
base

Pod communication protocols.

These classes take an algorithm and are responsible for organising the communication between Pods and Modeller.

Attributes

  • registry: A read-only dictionary of protocol factory names to their implementation classes.

Current message flow between the Workers and Modeller for streaming batched execution with 2 batches:

  1. Workers send NUM_BATCHES=-1 to Modeller.
  2. Workers send CURRENT_BATCH_ID=1 to Modeller.
  3. Modeller states it is processing batch 1; Workers send a TASK_START to Modeller.
  4. Modeller waits for all Pods to be ready.
  5. Modeller sends TASK_START to Worker.
  6. Workers run batch 1.
  7. Workers send EVALUATION_RESULTS to Modeller.
  8. Workers send CURRENT_BATCH_ID=2 to Modeller.
  9. Workers run batch 2 (the final batch); Modeller states it is processing batch 2.
  10. Workers send EVALUATION_RESULTS to Modeller.
  11. Workers send BATCHES_COMPLETE("BATCHES_ONLY") to Modeller.
  12. Workers run the resilience phase (if enabled).
  13. Workers send BATCHES_COMPLETE("TASK_COMPLETE") to Modeller; Modeller exits the streaming loop.
  14. Modeller sends TASK_COMPLETE to Workers.
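The steps above can be sketched as a simple message-consuming loop on the Modeller side. This is an illustrative simulation only (the message and function names here are hypothetical; the real implementation is asynchronous and uses the mailbox classes):

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Message:
    """Hypothetical stand-in for a worker->modeller message."""

    type: str
    body: Any = None


# Hypothetical trace mirroring the documented 2-batch flow.
trace = [
    Message("NUM_BATCHES", -1),
    Message("CURRENT_BATCH_ID", 1),
    Message("TASK_START"),
    Message("EVALUATION_RESULTS", {"batch": 1}),
    Message("CURRENT_BATCH_ID", 2),
    Message("EVALUATION_RESULTS", {"batch": 2}),
    Message("BATCHES_COMPLETE", "BATCHES_ONLY"),
    Message("BATCHES_COMPLETE", "TASK_COMPLETE"),
]


def modeller_streaming_loop(messages: list[Message]) -> tuple[list, Any]:
    """Consume worker messages until the final BATCHES_COMPLETE arrives."""
    results = []
    current_batch = None
    for msg in messages:
        if msg.type == "CURRENT_BATCH_ID":
            current_batch = msg.body  # batch the worker is now processing
        elif msg.type == "EVALUATION_RESULTS":
            results.append(msg.body)
        elif msg.type == "BATCHES_COMPLETE" and msg.body == "TASK_COMPLETE":
            break  # exit the streaming loop; TASK_COMPLETE would be sent back
    return results, current_batch


results, last_batch = modeller_streaming_loop(trace)
# results holds one EVALUATION_RESULTS entry per batch; last_batch is 2.
```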

Batched execution is currently only supported for single-worker cases. For future reference, in the case where multiple workers are expected to execute the same task, the Modeller will display to the user the current batch ID corresponding to the slowest worker.
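Since multi-worker batched execution is not yet implemented, the "slowest worker" display is only a design note; it could be as simple as taking the minimum reported batch ID. A hypothetical sketch (the pod names are illustrative):

```python
# Hypothetical per-worker progress reports, keyed by pod identifier.
current_batch_ids = {"alice/pod-a": 5, "bob/pod-b": 3, "carol/pod-c": 4}

# The Modeller would display the batch ID of the slowest worker.
displayed_batch_id = min(current_batch_ids.values())
# displayed_batch_id is 3 here.
```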

Classes

BaseCompatibleAlgoFactoryWorkerHubNeeded

class BaseCompatibleAlgoFactoryWorkerHubNeeded(*args, **kwargs):

Protocol defining base algorithm factory compatibility.

For the case where the worker() call explicitly needs a hub instance.

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleAlgoFactory
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleHuggingFaceAlgoFactory
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleModelAlgoFactory
  • bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleHuggingFaceAlgoFactory
  • bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleModelAlgoFactory
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleModelAlgoFactory

Methods


worker

def worker(self, *, hub: BitfountHub, context: ProtocolContext, **kwargs: Any) -> T_WorkerSide:

Worker-side of the algorithm.
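Because this is a structural `typing.Protocol`, an algorithm factory conforms simply by providing a compatible `worker()` method; no inheritance is required. A minimal, self-contained sketch (the class names and simplified `Any` annotations here are illustrative stand-ins for the real `BitfountHub` and `ProtocolContext` types):

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class WorkerFactoryHubNeeded(Protocol):
    """Simplified stand-in for BaseCompatibleAlgoFactoryWorkerHubNeeded."""

    def worker(self, *, hub: Any, context: Any, **kwargs: Any) -> Any: ...


class MyAlgoFactory:
    """Structurally conforms: no inheritance from the Protocol is needed."""

    def worker(self, *, hub: Any, context: Any, **kwargs: Any) -> str:
        # A real factory would build and return the worker-side algorithm
        # bound to the given hub instance.
        return f"worker side using hub={hub}"


factory = MyAlgoFactory()
# runtime_checkable allows a (method-presence-only) isinstance check.
assert isinstance(factory, WorkerFactoryHubNeeded)
```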

BaseCompatibleAlgoFactoryWorkerStandard

class BaseCompatibleAlgoFactoryWorkerStandard(*args, **kwargs):

Protocol defining base algorithm factory compatibility.

For the case where the worker() call has no explicit requirements.

Subclasses

  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleAlgoFactory
  • bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleAlgoFactory
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleNonModelAlgoFactory

Methods


worker

def worker(self, *, context: ProtocolContext, **kwargs: Any) -> T_WorkerSide:

Worker-side of the algorithm.

BaseCompatibleModellerAlgorithm

class BaseCompatibleModellerAlgorithm(*args, **kwargs):

Protocol defining base modeller-side algorithm compatibility.

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleModeller
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleModellerAlgorithm
  • bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleModellerAlgorithm
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleModellerAlgorithm

Methods


initialise

def initialise(self, *, task_id: str, **kwargs: Any) -> None:

Initialises the algorithm.

BaseCompatibleWorkerAlgorithm

class BaseCompatibleWorkerAlgorithm(*args, **kwargs):

Protocol defining base worker-side algorithm compatibility.

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleWorker
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleWorkerAlgorithm
  • bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleWorkerAlgorithm
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleWorkerAlgorithm

Methods


initialise

def initialise(self, *, datasource: BaseSource, task_id: str, data_splitter: Optional[DatasetSplitter] = None, pod_dp: Optional[DPPodConfig] = None, pod_identifier: Optional[str] = None, **kwargs: Any) -> None:

Initialises the algorithm.

initialise_data

def initialise_data(self, datasource: BaseSource, data_splitter: Optional[DatasetSplitter] = None) -> None:

Initialises the data for the algorithm.

BaseModellerProtocol

class BaseModellerProtocol(*, algorithm: Union[BaseCompatibleModellerAlgorithm, Sequence[BaseCompatibleModellerAlgorithm]], mailbox: _ModellerMailbox, **kwargs: Any):

Modeller side of the protocol.

Calls the modeller side of the algorithm.

Subclasses

  • bitfount.federated.protocols.ehr.data_extraction_protocol_charcoal._ModellerSide
  • bitfount.federated.protocols.ehr.nextgen_search_protocol._ModellerSide
  • bitfount.federated.protocols.model_protocols.federated_averaging._ModellerSide
  • bitfount.federated.protocols.model_protocols.inference_csv_report._ModellerSide
  • bitfount.federated.protocols.model_protocols.inference_image_output._ModellerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._ModellerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._ModellerSide
  • GenericOphthalmologyModellerSide
  • bitfount.federated.protocols.results_only._ModellerSide

Variables

  • static task_id : str

Methods


initialise

def initialise(self, *, task_id: str, **kwargs: Any) -> None:

Initialises the component algorithms.

run

async def run(self, *, context: ProtocolContext, **kwargs: Any) -> Any:

Runs Modeller side of the protocol.

Arguments

  • context: Optional. Run-time context for the protocol.
  • **kwargs: Additional keyword arguments.

BaseProtocolFactory

class BaseProtocolFactory(*, algorithm: Union[BaseCompatibleAlgoFactory, Sequence[BaseCompatibleAlgoFactory]], primary_results_path: Optional[str] = None, **kwargs: Any):

Base Protocol from which all other protocols must inherit.

Ancestors

  • abc.ABC
  • bitfount.federated.roles._RolesMixIn
  • bitfount.types._BaseSerializableObjectMixIn

Methods


dump

def dump(self) -> SerializedProtocol:

Returns the JSON-serializable representation of the protocol.

modeller

def modeller(self, *, mailbox: _ModellerMailbox, context: ProtocolContext, **kwargs: Any) -> BaseModellerProtocol:

Creates an instance of the modeller-side for this protocol.

run

def run(self, pod_identifiers: Collection[str], session: Optional[BitfountSession] = None, username: Optional[str] = None, hub: Optional[BitfountHub] = None, ms_config: Optional[MessageServiceConfig] = None, message_service: Optional[_MessageService] = None, pod_public_key_paths: Optional[Mapping[str, Path]] = None, identity_verification_method: IdentityVerificationMethod = IdentityVerificationMethod.OIDC_DEVICE_CODE, private_key_or_file: Optional[Union[RSAPrivateKey, Path]] = None, idp_url: Optional[str] = None, require_all_pods: bool = False, run_on_new_data_only: bool = False, project_id: Optional[str] = None, batched_execution: Optional[bool] = None, test_run: bool = False, force_rerun_failed_files: bool = True) -> Optional[Any]:

Sets up a local Modeller instance and runs the protocol.

Arguments

  • pod_identifiers: The BitfountHub pod identifiers to run against.
  • session: Optional. Session to use for authenticated requests. Created if needed.
  • username: Username to run as. Defaults to logged in user.
  • hub: BitfountHub instance. Default: hub.bitfount.com.
  • ms_config: Message service config. Default: messaging.bitfount.com.
  • message_service: Optional. Message service instance; created from ms_config if not provided.
  • pod_public_key_paths: Public keys of pods to be checked against.
  • identity_verification_method: The identity verification method to use.
  • private_key_or_file: Private key (to be removed).
  • idp_url: The IDP URL.
  • require_all_pods: If True, raises PodResponseError if at least one of the specified pods rejects or fails to respond to the task request.
  • run_on_new_data_only: Whether to run the task on new datapoints only. Defaults to False.
  • project_id: The project ID to run the task under.
  • batched_execution: Whether to run the task in batched mode. Defaults to False.
  • test_run: If True, runs the task in test mode, on a limited number of datapoints. Defaults to False.
  • force_rerun_failed_files: If True, forces a rerun on files that the task previously failed on. If False, the task will skip files that have previously failed. Note: This option can only be enabled if both enable_batch_resilience and individual_file_retry_enabled are True. Defaults to True.

Returns Results of the protocol.

Raises

  • PodResponseError: If require_all_pods is true and at least one pod identifier specified rejects or fails to respond to a task request.
  • ValueError: If attempting to train on multiple pods, and the DataStructure table name is given as a string.

worker

def worker(self, *, mailbox: _WorkerMailbox, hub: BitfountHub, context: ProtocolContext, **kwargs: Any) -> BaseWorkerProtocol:

Creates an instance of the worker-side for this protocol.

BaseWorkerProtocol

class BaseWorkerProtocol(*, algorithm: Union[BaseCompatibleWorkerAlgorithm, Sequence[BaseCompatibleWorkerAlgorithm]], mailbox: _WorkerMailbox, **kwargs: Any):

Worker side of the protocol.

Calls the worker side of the algorithm.

Subclasses

  • bitfount.federated.protocols.ehr.data_extraction_protocol_charcoal._WorkerSide
  • bitfount.federated.protocols.ehr.nextgen_search_protocol._WorkerSide
  • bitfount.federated.protocols.model_protocols.federated_averaging._WorkerSide
  • bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
  • bitfount.federated.protocols.model_protocols.inference_image_output._WorkerSide
  • bitfount.federated.protocols.ophthalmology.fluid_volume_screening_protocol._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze_base._WorkerSideBronzeBase
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
  • bitfount.federated.protocols.ophthalmology.insite_insights_protocol._WorkerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
  • bitfount.federated.protocols.results_only._WorkerSide

Variables

  • static mailbox : bitfount.federated.transport.worker_transport._WorkerMailbox
  • static project_id : Optional[str]

Methods


initialise

def initialise(self, *, datasource: BaseSource, task_id: str, data_splitter: Optional[DatasetSplitter] = None, pod_dp: Optional[DPPodConfig] = None, pod_identifier: Optional[str] = None, project_id: Optional[str] = None, ehr_secrets: Optional[ExternallyManagedJWT] = None, ehr_config: Optional[EHRConfig] = None, parent_pod_identifier: Optional[str] = None, **kwargs: Any) -> None:

Initialises the component algorithms.

run

async def run(self, *, pod_vitals: Optional[_PodVitals] = None, context: ProtocolContext, **kwargs: Any) -> Any:

Runs the worker-side of the algorithm.

Arguments

  • pod_vitals: Optional. Pod vitals instance for recording run-time details from the protocol run.
  • context: Optional. Run-time context for the protocol.
  • **kwargs: Additional keyword arguments.

FinalStepProtocol

class FinalStepProtocol():

Tagging class for protocols that contain a final step.

These protocols will have a number of steps that can be operated batch-wise (batch steps) followed by step(s) to be executed at the end.
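The batch-then-final shape described above can be sketched as follows. This is purely illustrative (the real protocols run asynchronously against a ProtocolContext; the function names here are hypothetical):

```python
from typing import Any


def run_batch_step(batch: list[int]) -> dict[str, Any]:
    """A step that can be operated batch-wise, independently per batch."""
    return {"count": len(batch), "total": sum(batch)}


def run_final_step(batch_outputs: list[dict[str, Any]]) -> dict[str, Any]:
    """The final step runs once, after all batch steps have completed."""
    return {
        "n_batches": len(batch_outputs),
        "grand_total": sum(o["total"] for o in batch_outputs),
    }


batches = [[1, 2], [3, 4], [5]]
summary = run_final_step([run_batch_step(b) for b in batches])
# summary is {"n_batches": 3, "grand_total": 15}
```

In the "reduce" variant (FinalStepReduceProtocol below), the final step additionally requires access to the outputs of every batch step, exactly as `run_final_step` does here.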

Type arguments

  • T_FinalStepAlgo: Type of the setup algorithm, must implement FinalStepAlgorithm

Subclasses

  • FinalStepReduceProtocol
  • bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
  • bitfount.federated.protocols.results_only._WorkerSide

Methods


run_final_step

async def run_final_step(self, *, context: ProtocolContext, **kwargs: Any) -> Any:

Execute the final reduce step.

FinalStepReduceProtocol

class FinalStepReduceProtocol():

Tagging class for protocols that contain a final "reduce" step.

These protocols will have a number of steps that can be operated batch-wise (batch steps) followed by step(s) at the end that cannot be executed batch-wise but instead require access to the outputs from all batch steps (reduce step(s)).

Subclasses

  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze_base._WorkerSideBronzeBase
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
  • bitfount.federated.protocols.ophthalmology.insite_insights_protocol._WorkerSide

Methods


run_final_step

async def run_final_step(self, *, context: ProtocolContext, **kwargs: Any) -> Any:

Inherited from:

FinalStepProtocol.run_final_step :

Execute the final reduce step.

InitialSetupProtocol

class InitialSetupProtocol(*args: Any, **kwargs: Any):

Tagging class for protocols that contain an initial setup step.

These protocols will have an initial step that must be executed before any batching, followed by steps that can be operated batch-wise.

Type arguments

  • T_InitialSetupAlgo: Type of the setup algorithm, must implement InitialSetupAlgorithm

Subclasses

  • bitfount.federated.protocols.ehr.nextgen_search_protocol._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._WorkerSide
  • bitfount.federated.protocols.ophthalmology.insite_insights_protocol._WorkerSide

Methods


run_initial_setup

def run_initial_setup(self, **kwargs: Any) -> None:

Run the initial setup phase.

LimitsExceededInfo

class LimitsExceededInfo(overrun: int, allowed: int):

A named tuple recording by how much an inference run exceeded the usage limits (overrun) and how many inferences were actually permitted (allowed).

Ancestors

  • builtins.tuple

Variables

  • allowed : int - Alias for field number 1
  • overrun : int - Alias for field number 0

ModelInferenceProtocolMixin

class ModelInferenceProtocolMixin():

Mixin class for protocols that may contain one or more model inference steps.

These protocols will have to respect any model inference usage limits that are associated with the model(s) in use.

Subclasses

  • bitfount.federated.protocols.ehr.data_extraction_protocol_charcoal._WorkerSide
  • bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
  • bitfount.federated.protocols.model_protocols.inference_image_output._WorkerSide
  • bitfount.federated.protocols.ophthalmology.fluid_volume_screening_protocol._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze_base._WorkerSideBronzeBase
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
  • bitfount.federated.protocols.ophthalmology.insite_insights_protocol._WorkerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
  • bitfount.federated.protocols.results_only._WorkerSide

Static methods


apply_actual_usage_to_resources_consumed

def apply_actual_usage_to_resources_consumed(inference_algorithm: ModelInferenceWorkerSideAlgorithm, limits_exceeded_info: LimitsExceededInfo | None) -> list[ResourceConsumed]:

Generate a resources consumed list from an algorithm that respects limits.

Given information on the actual number of inferences that were allowed/used, updates resources consumed entries from the given algorithm to reflect this limit.

If limits were not exceeded, just returns the resources consumed information unchanged.

Arguments

  • inference_algorithm: The inference algorithm used for the inferences.
  • limits_exceeded_info: If not None, contains information on the actual number of inferences that were allowed/used.

Returns The list of resources consumed, as generated by the algorithm, with model inference resources consumed entries modified to reflect the actually used inferences. If limits were not exceeded, returns the list of resources consumed, unchanged.

check_usage_limits

def check_usage_limits(limits: dict[str, InferenceLimits], inference_algorithm: ModelInferenceWorkerSideAlgorithm) -> Optional[LimitsExceededInfo]:

Check if the most recent inference run has exceeded the usage limits.

Updates the total usage count associated with the model in question, regardless of whether the limits are exceeded.

Arguments

  • limits: The inference usage limits as a mapping of model_id to usage limits.
  • inference_algorithm: The inference algorithm instance that has just been run.

Returns None if limits were not exceeded. Otherwise, returns a container with overrun and allowed attributes, indicating the number of predictions by which usage exceeded the limit and the number of predictions actually allowed to be used, respectively. For example, for an initial total_usage of 10, a limit of 20, and an inference run that performed 14 more inferences, this returns (overrun=4, allowed=10).
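The arithmetic behind the worked example above can be reproduced in a few lines. This is a self-contained sketch of the limit check, not the library implementation (check_usage and its parameters are hypothetical; the real method reads usage from the inference algorithm and the InferenceLimits mapping):

```python
from typing import NamedTuple, Optional


class LimitsExceededInfo(NamedTuple):
    """Mirrors the documented named tuple: (overrun, allowed)."""

    overrun: int
    allowed: int


def check_usage(
    total_usage: int, limit: int, new_inferences: int
) -> Optional[LimitsExceededInfo]:
    """Return None if within limits, else how far over and how many fit."""
    new_total = total_usage + new_inferences
    if new_total <= limit:
        return None
    # overrun: predictions beyond the limit; allowed: remaining headroom
    # that the run was actually permitted to use.
    return LimitsExceededInfo(overrun=new_total - limit, allowed=limit - total_usage)


# The worked example from the docstring: usage 10, limit 20, 14 new
# inferences -> 4 over the limit, with only 10 of the 14 allowed.
info = check_usage(total_usage=10, limit=20, new_inferences=14)
# info == LimitsExceededInfo(overrun=4, allowed=10)
```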

handle_limits_exceeded

async def handle_limits_exceeded(exceeded_inference_algo: ModelInferenceWorkerSideAlgorithm, limits_exceeded_info: LimitsExceededInfo, limits_info: dict[str, InferenceLimits], mailbox: _WorkerMailbox) -> NoReturn:

Handles when usage limits are exceeded within the protocol.

In particular, sends a TASK_ABORT message from Worker to Modeller, letting it know that the limits have been exceeded, and raises a TaskAbortError to do the same within the Worker side.

ProtocolDecoratorMetaClass

class ProtocolDecoratorMetaClass(*args, **kwargs):

Decorates the __init__ and run protocol methods.

Ancestors

  • bitfount.hooks.BaseDecoratorMetaClass
  • builtins.type

Subclasses

  • types.AbstractProtocolDecoratorMetaClass

Static methods


decorator

def decorator(f: Callable) -> collections.abc.Callable:

Hook decorator which logs before and after the hook it decorates.

do_decorate

def do_decorate(attr: str, value: Any) -> bool:

Checks if an object should be decorated.

Only the __init__ and run methods should be decorated.
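The metaclass pattern described here (decorate only __init__ and run at class-creation time) can be sketched in a self-contained way. This is an illustrative reconstruction, not the bitfount.hooks implementation; log_calls, DecoratorMeta, and MyProtocol are hypothetical names:

```python
import functools

calls: list[str] = []


def log_calls(f):
    """Hypothetical hook decorator: record the call, then delegate."""

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        calls.append(f.__name__)
        return f(*args, **kwargs)

    return wrapper


def do_decorate(attr: str, value) -> bool:
    # Only the __init__ and run methods should be decorated.
    return attr in ("__init__", "run") and callable(value)


class DecoratorMeta(type):
    """Wraps selected methods when the class object is created."""

    def __new__(mcs, name, bases, namespace):
        for attr, value in list(namespace.items()):
            if do_decorate(attr, value):
                namespace[attr] = log_calls(value)
        return super().__new__(mcs, name, bases, namespace)


class MyProtocol(metaclass=DecoratorMeta):
    def __init__(self) -> None:
        pass

    def run(self) -> str:
        return "done"

    def helper(self) -> str:
        return "untouched"


p = MyProtocol()
p.run()
p.helper()
# calls is now ["__init__", "run"]: helper was left undecorated.
```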

ProtocolExecution

class ProtocolExecution(protocol: _BaseProtocol, run_method: Callable, context: ProtocolContext, batched_execution: Optional[bool], hook_kwargs: Optional[_StrAnyDict], processed_files_cache: Optional[dict[str, datetime]] = None, failed_files_cache: Optional[dict[str, dict[str, str]]] = None, test_run: bool = False, **kwargs: Any):

Handles protocol execution with proper context management.

Arguments

  • protocol: The protocol instance to run.
  • run_method: The method to execute on the protocol.
  • context: Optional. The context in which the protocol is being run.
  • batched_execution: Whether to run the protocol in batched mode.
  • hook_kwargs: Optional. Keyword arguments to pass to the hooks.
  • processed_files_cache: Optional. A dictionary of processed files with their last modified dates. Defaults to None.
  • failed_files_cache: Optional. A dictionary of previously failed files for the current task run. Defaults to None.
  • test_run: Whether this is a test run. Defaults to False.
  • **kwargs: Additional keyword arguments for the run method.

Methods


execute

async def execute(self) -> Union[Any, list[Any]]:

Main execution entry point.