base

Pod communication protocols.

These classes take an algorithm and are responsible for organising the communication between the Pods and the Modeller.

Attributes

  • registry: A read-only dictionary of protocol factory names to their implementation classes.

Classes

BaseCompatibleAlgoFactory

class BaseCompatibleAlgoFactory(*args, **kwargs):

Protocol defining base algorithm factory compatibility.

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleAlgoFactory
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleAlgoFactory
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleAlgoFactory

Variables

  • static class_name : str

BaseCompatibleModellerAlgorithm

class BaseCompatibleModellerAlgorithm(*args, **kwargs):

Protocol defining base modeller-side algorithm compatibility.

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleModeller
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleModellerAlgorithm
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleModellerAlgorithm

Methods


initialise

def initialise(self, task_id: Optional[str] = None, **kwargs: Any) -> None:

Initialises the algorithm.

BaseCompatibleWorkerAlgorithm

class BaseCompatibleWorkerAlgorithm(*args, **kwargs):

Protocol defining base worker-side algorithm compatibility.

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleWorker
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleWorkerAlgorithm
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleWorkerAlgorithm

Methods


initialise

def initialise(
    self,
    datasource: BaseSource,
    data_splitter: Optional[DatasetSplitter] = None,
    pod_dp: Optional[DPPodConfig] = None,
    pod_identifier: Optional[str] = None,
    **kwargs: Any,
) -> None:

Initialises the algorithm.

initialise_data

def initialise_data(
    self,
    datasource: BaseSource,
    data_splitter: Optional[DatasetSplitter] = None,
) -> None:

Initialises the data for the algorithm.

BaseModellerProtocol

class BaseModellerProtocol(
    *,
    algorithm: Union[BaseCompatibleModellerAlgorithm, Sequence[BaseCompatibleModellerAlgorithm]],
    mailbox: _ModellerMailbox,
    **kwargs: Any,
):

Modeller side of the protocol.

Calls the modeller side of the algorithm.

Subclasses

  • bitfount.federated.protocols.ehr.nextgen_search_protocol._ModellerSide
  • bitfount.federated.protocols.model_protocols.federated_averaging._ModellerSide
  • bitfount.federated.protocols.model_protocols.inference_csv_report._ModellerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._ModellerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze._ModellerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._ModellerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._ModellerSide
  • bitfount.federated.protocols.results_only._ModellerSide

Methods


initialise

def initialise(self, task_id: Optional[str] = None, **kwargs: Any) -> None:

Initialises the component algorithms.

run

async def run(self, *, context: Optional[ProtocolContext] = None, **kwargs: Any) -> Any:

Runs Modeller side of the protocol.

Arguments

  • context: Optional. Run-time context for the protocol.
  • **kwargs: Additional keyword arguments.

BaseProtocolFactory

class BaseProtocolFactory(
    *,
    algorithm: Union[BaseCompatibleAlgoFactory, Sequence[BaseCompatibleAlgoFactory]],
    **kwargs: Any,
):

Base Protocol from which all other protocols must inherit.

Ancestors

  • abc.ABC
  • bitfount.federated.roles._RolesMixIn
  • bitfount.types._BaseSerializableObjectMixIn

Methods


dump

def dump(self) -> SerializedProtocol:

Returns the JSON-serializable representation of the protocol.

modeller

def modeller(
    self,
    mailbox: _ModellerMailbox,
    **kwargs: Any,
) -> BaseModellerProtocol:

Creates an instance of the modeller-side for this protocol.

run

def run(
    self,
    pod_identifiers: Collection[str],
    session: Optional[BitfountSession] = None,
    username: Optional[str] = None,
    hub: Optional[BitfountHub] = None,
    ms_config: Optional[MessageServiceConfig] = None,
    message_service: Optional[_MessageService] = None,
    pod_public_key_paths: Optional[Mapping[str, Path]] = None,
    identity_verification_method: IdentityVerificationMethod = IdentityVerificationMethod.OIDC_DEVICE_CODE,
    private_key_or_file: Optional[Union[RSAPrivateKey, Path]] = None,
    idp_url: Optional[str] = None,
    require_all_pods: bool = False,
    run_on_new_data_only: bool = False,
    model_out: Optional[Union[Path, str]] = None,
    project_id: Optional[str] = None,
    batched_execution: Optional[bool] = None,
) -> Optional[Any]:

Sets up a local Modeller instance and runs the protocol.

Arguments

  • pod_identifiers: The BitfountHub pod identifiers to run against.
  • session: Optional. Session to use for authenticated requests. Created if needed.
  • username: Username to run as. Defaults to the logged-in user.
  • hub: BitfountHub instance. Default: hub.bitfount.com.
  • ms_config: Message service config. Default: messaging.bitfount.com.
  • message_service: Message service instance; created from ms_config if not provided.
  • pod_public_key_paths: Public keys of pods to be checked against.
  • identity_verification_method: The identity verification method to use.
  • private_key_or_file: Private key (to be removed).
  • idp_url: The IDP URL.
  • require_all_pods: If True, raises PodResponseError if any of the specified pods rejects or fails to respond to the task request.
  • run_on_new_data_only: Whether to run the task on new datapoints only. Defaults to False.
  • model_out: The path to save the model to.
  • project_id: The project ID to run the task under.
  • batched_execution: Whether to run the task in batched mode. Defaults to False.

Returns Results of the protocol.

Raises

  • PodResponseError: Raised if require_all_pods is True and any of the specified pods rejects or fails to respond to the task request.
  • ValueError: Raised if attempting to train on multiple pods and the DataStructure table name is given as a string.

worker

def worker(
    self,
    mailbox: _WorkerMailbox,
    hub: BitfountHub,
    context: Optional[ProtocolContext] = None,
    **kwargs: Any,
) -> BaseWorkerProtocol:

Creates an instance of the worker-side for this protocol.

BaseWorkerProtocol

class BaseWorkerProtocol(
    *,
    algorithm: Union[BaseCompatibleWorkerAlgorithm, Sequence[BaseCompatibleWorkerAlgorithm]],
    mailbox: _WorkerMailbox,
    **kwargs: Any,
):

Worker side of the protocol.

Calls the worker side of the algorithm.

Subclasses

  • bitfount.federated.protocols.ehr.nextgen_search_protocol._WorkerSide
  • bitfount.federated.protocols.model_protocols.federated_averaging._WorkerSide
  • bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
  • bitfount.federated.protocols.results_only._WorkerSide

Variables

  • static mailbox : bitfount.federated.transport.worker_transport._WorkerMailbox
  • static project_id : Optional[str]

Methods


initialise

def initialise(
    self,
    datasource: BaseSource,
    data_splitter: Optional[DatasetSplitter] = None,
    pod_dp: Optional[DPPodConfig] = None,
    pod_identifier: Optional[str] = None,
    project_id: Optional[str] = None,
    **kwargs: Any,
) -> None:

Initialises the component algorithms.

run

async def run(
    self,
    *,
    pod_vitals: Optional[_PodVitals] = None,
    context: Optional[ProtocolContext] = None,
    **kwargs: Any,
) -> Any:

Runs the worker-side of the algorithm.

Arguments

  • pod_vitals: Optional. Pod vitals instance for recording run-time details from the protocol run.
  • context: Optional. Run-time context for the protocol.
  • **kwargs: Additional keyword arguments.

BatchConfig

class BatchConfig(
    num_batches: int,
    batch_size: int,
    original_test_files: list[str],
    original_file_names_override: Optional[list[str]],
):

Holds batch configuration and state.
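As a rough illustration of the batching arithmetic such a config implies, a sketch with a simplified stand-in class (the `from_files` constructor is hypothetical, not the library's API): the batch count is typically the ceiling of the file count divided by the batch size.

```python
import math
from dataclasses import dataclass
from typing import Optional


@dataclass
class BatchConfigSketch:
    """Simplified stand-in for BatchConfig, holding batch state."""

    num_batches: int
    batch_size: int
    original_test_files: list[str]
    original_file_names_override: Optional[list[str]] = None

    @classmethod
    def from_files(cls, files: list[str], batch_size: int) -> "BatchConfigSketch":
        # Ceiling division: a partial final batch still counts as a batch.
        return cls(
            num_batches=math.ceil(len(files) / batch_size),
            batch_size=batch_size,
            original_test_files=files,
            original_file_names_override=None,
        )


cfg = BatchConfigSketch.from_files([f"scan_{i}.dcm" for i in range(10)], batch_size=4)
print(cfg.num_batches)  # 10 files / 4 per batch -> 3 batches
```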

FinalStepReduceProtocol

class FinalStepReduceProtocol():

Tagging class for protocols that contain a final "reduce" step.

These protocols will have a number of steps that can be operated batch-wise (batch steps) followed by step(s) at the end that cannot be executed batch-wise but instead require access to the outputs from all batch steps (reduce step(s)).
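The batch/reduce split described above can be sketched as a generic map-then-reduce loop (the function names here are illustrative, not the library's API):

```python
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_batchwise_then_reduce(
    batches: Iterable[T],
    batch_step: Callable[[T], R],
    reduce_step: Callable[[list[R]], R],
) -> R:
    """Run the batch-capable step once per batch, then a final reduce
    step that requires the outputs from every batch at once."""
    batch_outputs = [batch_step(batch) for batch in batches]
    return reduce_step(batch_outputs)


# Toy example: per-batch sums, then a final aggregate over all batches.
result = run_batchwise_then_reduce(
    [[1, 2], [3, 4], [5]],
    batch_step=sum,
    reduce_step=sum,
)
print(result)  # 15
```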

Subclasses

  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide

LimitsExceededInfo

class LimitsExceededInfo(overrun: int, allowed: int):

Named tuple of (overrun, allowed), describing by how much a usage limit was exceeded and how many inferences were actually permitted.

Ancestors

  • builtins.tuple

Variables

  • allowed : int - Alias for field number 1
  • overrun : int - Alias for field number 0
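Given the tuple ancestry and the field-number aliases, LimitsExceededInfo behaves like a NamedTuple; a behavioural sketch with a stand-in class:

```python
from typing import NamedTuple


class LimitsExceededInfoSketch(NamedTuple):
    """Mirror of LimitsExceededInfo's shape: (overrun, allowed)."""

    overrun: int
    allowed: int


info = LimitsExceededInfoSketch(overrun=4, allowed=10)
# Field access and positional access agree, as with any NamedTuple.
print(info.overrun, info[0])  # 4 4
print(info.allowed, info[1])  # 10 10
print(info == (4, 10))  # True: it is still a plain tuple underneath
```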

ModelInferenceProtocolMixin

class ModelInferenceProtocolMixin():

Mixin class for protocols that may contain one or more model inference steps.

These protocols will have to respect any model inference usage limits that are associated with the model(s) in use.

Subclasses

  • bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
  • bitfount.federated.protocols.results_only._WorkerSide

Static methods


apply_actual_usage_to_resources_consumed

def apply_actual_usage_to_resources_consumed(
    inference_algorithm: ModelInferenceWorkerSideAlgorithm,
    limits_exceeded_info: LimitsExceededInfo | None,
) -> list[ResourceConsumed]:

Generate a resources consumed list from an algorithm that respects limits.

Given information on the actual number of inferences that were allowed/used, updates resources consumed entries from the given algorithm to reflect this limit.

If limits were not exceeded, just returns the resources consumed information unchanged.

Arguments

  • inference_algorithm: The inference algorithm used for the inferences.
  • limits_exceeded_info: If not None, contains information on the actual number of inferences that were allowed/used.

Returns The list of resources consumed, as generated by the algorithm, with model-inference entries modified to reflect the inferences actually used. If limits were not exceeded, the list is returned unchanged.
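The capping behaviour can be sketched with a simplified resources-consumed representation (plain dicts stand in here for ResourceConsumed, which has a richer structure in the library):

```python
from typing import Optional


def cap_inference_usage(
    resources: list[dict],
    allowed: Optional[int],
) -> list[dict]:
    """If a limit was exceeded, cap model-inference entries at the
    allowed amount; otherwise pass everything through unchanged."""
    if allowed is None:  # limits were not exceeded
        return resources
    return [
        {**r, "amount": min(r["amount"], allowed)}
        if r["resource_type"] == "model_inference"
        else r
        for r in resources
    ]


consumed = [
    {"resource_type": "model_inference", "amount": 14},
    {"resource_type": "storage", "amount": 3},
]
capped = cap_inference_usage(consumed, allowed=10)
print(capped[0]["amount"])  # 14 capped down to 10
print(capped[1]["amount"])  # non-inference entries untouched: 3
```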

check_usage_limits

def check_usage_limits(
    limits: dict[str, InferenceLimits],
    inference_algorithm: ModelInferenceWorkerSideAlgorithm,
) -> Optional[LimitsExceededInfo]:

Check if the most recent inference run has exceeded the usage limits.

Updates the total usage count associated with the model in question, regardless of whether the limits are exceeded.

Arguments

  • limits: The inference usage limits as a mapping of model_id to usage limits.
  • inference_algorithm: The inference algorithm instance that has just been run.

Returns None if limits were not exceeded. Otherwise, returns a container with .overrun and .allowed attributes, indicating respectively the number of predictions by which usage exceeded the limit and the number of predictions actually allowed. For example, for an initial total_usage of 10, a limit of 20, and an inference run that used 14 more inferences, returns (overrun=4, allowed=10).
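The arithmetic behind this check can be sketched as follows (a simplified stand-in, not the library's implementation, using the worked example from the description above):

```python
from typing import NamedTuple, Optional


class LimitsExceeded(NamedTuple):
    overrun: int
    allowed: int


def check_usage(
    total_usage: int, limit: int, new_inferences: int
) -> Optional[LimitsExceeded]:
    """Sketch of the limit arithmetic: usage is always accumulated,
    and an overrun is reported only if the new total passes the limit."""
    new_total = total_usage + new_inferences
    if new_total <= limit:
        return None
    overrun = new_total - limit
    allowed = limit - total_usage  # how many of the new inferences fit
    return LimitsExceeded(overrun=overrun, allowed=allowed)


# The example above: total 10, limit 20, 14 new inferences -> (4, 10).
print(check_usage(10, 20, 14))  # LimitsExceeded(overrun=4, allowed=10)
print(check_usage(10, 20, 5))   # None: still within limits
```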

handle_limits_exceeded

async def handle_limits_exceeded(
    exceeded_inference_algo: ModelInferenceWorkerSideAlgorithm,
    limits_exceeded_info: LimitsExceededInfo,
    limits_info: dict[str, InferenceLimits],
    mailbox: _WorkerMailbox,
) -> NoReturn:

Handles when usage limits are exceeded within the protocol.

In particular, sends a TASK_ABORT message from Worker to Modeller, notifying it that the limits are exceeded, and raises a TaskAbortError to do the same on the Worker side.

ProtocolDecoratorMetaClass

class ProtocolDecoratorMetaClass(*args, **kwargs):

Decorates the __init__ and run protocol methods.

Ancestors

  • bitfount.hooks.BaseDecoratorMetaClass
  • builtins.type

Subclasses

  • types.AbstractProtocolDecoratorMetaClass

Static methods


decorator

def decorator(f: Callable) -> collections.abc.Callable:

Hook decorator which logs before and after the hook it decorates.

do_decorate

def do_decorate(attr: str, value: Any) -> bool:

Checks if an object should be decorated.

Only the init and run methods should be decorated.
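The selective-decoration pattern described here, intercepting only chosen attributes at class-creation time, can be sketched as follows (simplified; the real decorator adds hook logging rather than this toy call recorder):

```python
from typing import Any, Callable


def logged(f: Callable) -> Callable:
    """Stand-in for the hook decorator: record each decorated call."""

    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        self.calls.append(f.__name__)
        return f(self, *args, **kwargs)

    return wrapper


class ProtocolMeta(type):
    """Decorate only __init__ and run, mirroring do_decorate's rule."""

    def __new__(mcs, name, bases, namespace):
        for attr, value in list(namespace.items()):
            if attr in ("__init__", "run") and callable(value):
                namespace[attr] = logged(value)
        return super().__new__(mcs, name, bases, namespace)


class MyProtocol(metaclass=ProtocolMeta):
    calls: list = []

    def __init__(self) -> None:
        pass

    def run(self) -> str:
        return "done"

    def helper(self) -> str:  # not decorated: neither __init__ nor run
        return "helper"


p = MyProtocol()
print(p.run())    # done
p.helper()
print(p.calls)    # ['__init__', 'run']: helper was never recorded
```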

ProtocolExecution

class ProtocolExecution(
    protocol: _BaseProtocol,
    run_method: Callable,
    context: Optional[ProtocolContext],
    batched_execution: Optional[bool],
    hook_kwargs: Optional[_StrAnyDict],
    **kwargs: Any,
):

Handles protocol execution with proper context management.

Methods


execute

async def execute(self) -> Union[Any, list[Any]]:

Main execution entry point.