base
Pod communication protocols.
These classes take an algorithm and are responsible for organising the communication between Pods and Modeller.
Attributes:
- registry: A read-only dictionary mapping protocol factory names to their implementation classes.
Current message flow between the Workers (running 2 batches) and the Modeller for streaming batched execution:
- Workers send NUM_BATCHES=-1 to Modeller.
- Workers send CURRENT_BATCH_ID=1 to Modeller; Modeller states that it is processing batch 1.
- Workers send TASK_START to Modeller; Modeller waits for all Pods to be ready.
- Modeller sends TASK_START to Workers.
- Workers run batch 1.
- Workers send EVALUATION_RESULTS to Modeller.
- Workers send CURRENT_BATCH_ID=2 to Modeller; Modeller states that it is processing batch 2.
- Workers run batch 2 (the final batch).
- Workers send EVALUATION_RESULTS to Modeller.
- Workers send BATCHES_COMPLETE("BATCHES_ONLY") to Modeller.
- Workers run the resilience phase (if enabled).
- Workers send BATCHES_COMPLETE("TASK_COMPLETE") to Modeller; Modeller exits the streaming loop.
- Modeller sends TASK_COMPLETE to Workers.
Batched execution is currently only supported for the single-worker case. For future reference, if multiple workers were expected to execute the same task, the modeller would display to the user the current batch ID corresponding to the slowest worker.
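The same sequence can be replayed with a small, purely illustrative script. The `send` helper and the way the messages are represented here are invented for the sketch; only the ordering mirrors the flow described above.

```python
# Illustrative replay of the streaming batched message flow for a single worker.
# The transport is faked; only the ordering mirrors the description above.

def send(sender: str, recipient: str, message: str) -> None:
    """Stand-in for the real message service: just prints the exchange."""
    print(f"{sender} -> {recipient}: {message}")

def streaming_batched_flow(num_batches: int = 2, resilience_enabled: bool = False) -> None:
    send("Worker", "Modeller", "NUM_BATCHES=-1")        # total batch count unknown up front
    send("Worker", "Modeller", "CURRENT_BATCH_ID=1")    # modeller reports it is processing batch 1
    send("Worker", "Modeller", "TASK_START")            # modeller waits for all pods to be ready
    send("Modeller", "Worker", "TASK_START")
    for batch_id in range(1, num_batches + 1):
        # ... worker runs batch `batch_id` here ...
        send("Worker", "Modeller", "EVALUATION_RESULTS")
        if batch_id < num_batches:
            send("Worker", "Modeller", f"CURRENT_BATCH_ID={batch_id + 1}")
    send("Worker", "Modeller", 'BATCHES_COMPLETE("BATCHES_ONLY")')
    if resilience_enabled:
        pass                                            # resilience phase would run here
    send("Worker", "Modeller", 'BATCHES_COMPLETE("TASK_COMPLETE")')  # modeller exits streaming loop
    send("Modeller", "Worker", "TASK_COMPLETE")

if __name__ == "__main__":
    streaming_batched_flow()
```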
Classes
BaseCompatibleAlgoFactoryWorkerHubNeeded
class BaseCompatibleAlgoFactoryWorkerHubNeeded(*args, **kwargs):Protocol defining base algorithm factory compatibility.
For the case where the worker() call explicitly needs a hub instance.
Ancestors
- bitfount.federated.protocols.base._BaseCompatibleAlgoFactoryCommon
- typing.Protocol
- typing.Generic
Subclasses
- bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleAlgoFactory
- bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleHuggingFaceAlgoFactory
- bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleModelAlgoFactory
- bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleHuggingFaceAlgoFactory
- bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleModelAlgoFactory
- bitfount.federated.protocols.results_only._ResultsOnlyCompatibleModelAlgoFactory
Methods
worker
def worker( self, *, hub: BitfountHub, context: ProtocolContext, **kwargs: Any,) ‑> T_WorkerSide:Worker-side of the algorithm.
BaseCompatibleAlgoFactoryWorkerStandard
class BaseCompatibleAlgoFactoryWorkerStandard(*args, **kwargs):Protocol defining base algorithm factory compatibility.
For the case where the worker() call has no explicit requirements.
Ancestors
- bitfount.federated.protocols.base._BaseCompatibleAlgoFactoryCommon
- typing.Protocol
- typing.Generic
Subclasses
- bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleAlgoFactory
- bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleAlgoFactory
- bitfount.federated.protocols.results_only._ResultsOnlyCompatibleNonModelAlgoFactory
Methods
worker
def worker(self, *, context: ProtocolContext, **kwargs: Any) ‑> T_WorkerSide:Worker-side of the algorithm.
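As a minimal sketch of what structurally satisfying these two factory protocols looks like: the hub, context and worker-side classes below are stand-ins rather than the real bitfount types, and both factories are hypothetical.

```python
from typing import Any

# Stand-ins for the real types; purely illustrative.
class BitfountHub: ...
class ProtocolContext: ...

class _MyWorkerSide:
    """Hypothetical worker-side algorithm returned by the factories."""

class StandardAlgoFactory:
    """Structurally satisfies BaseCompatibleAlgoFactoryWorkerStandard:
    worker() needs nothing beyond the run-time context."""

    def worker(self, *, context: ProtocolContext, **kwargs: Any) -> _MyWorkerSide:
        return _MyWorkerSide()

class HubNeededAlgoFactory:
    """Structurally satisfies BaseCompatibleAlgoFactoryWorkerHubNeeded:
    worker() additionally requires an explicit hub instance."""

    def worker(self, *, hub: BitfountHub, context: ProtocolContext, **kwargs: Any) -> _MyWorkerSide:
        # The hub could be used here, e.g. to fetch model weights, before
        # constructing the worker side.
        return _MyWorkerSide()
```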
BaseCompatibleModellerAlgorithm
class BaseCompatibleModellerAlgorithm(*args, **kwargs):Protocol defining base modeller-side algorithm compatibility.
Subclasses
- bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleModeller
- bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleModellerAlgorithm
- bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleModellerAlgorithm
- bitfount.federated.protocols.results_only._ResultsOnlyCompatibleModellerAlgorithm
Methods
initialise
def initialise(self, *, task_id: str, **kwargs: Any) ‑> None:Initialises the algorithm.
BaseCompatibleWorkerAlgorithm
class BaseCompatibleWorkerAlgorithm(*args, **kwargs):Protocol defining base worker-side algorithm compatibility.
Subclasses
- bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleWorker
- bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleWorkerAlgorithm
- bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleWorkerAlgorithm
- bitfount.federated.protocols.results_only._ResultsOnlyCompatibleWorkerAlgorithm
Methods
initialise
def initialise( self, *, datasource: BaseSource, task_id: str, data_splitter: Optional[DatasetSplitter] = None, pod_dp: Optional[DPPodConfig] = None, pod_identifier: Optional[str] = None, **kwargs: Any,) ‑> None:Initialises the algorithm.
initialise_data
def initialise_data( self, datasource: BaseSource, data_splitter: Optional[DatasetSplitter] = None,) ‑> None:Initialises the data for the algorithm.
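A minimal sketch of algorithm classes that structurally satisfy the two compatibility protocols above. `BaseSource`, `DatasetSplitter` and `DPPodConfig` are stand-ins here, and the method bodies are illustrative only.

```python
from typing import Any, Optional

# Stand-ins for the real bitfount types; illustrative only.
class BaseSource: ...
class DatasetSplitter: ...
class DPPodConfig: ...

class MyModellerAlgorithm:
    """Structurally satisfies BaseCompatibleModellerAlgorithm."""

    def initialise(self, *, task_id: str, **kwargs: Any) -> None:
        self.task_id = task_id

class MyWorkerAlgorithm:
    """Structurally satisfies BaseCompatibleWorkerAlgorithm."""

    def initialise(
        self,
        *,
        datasource: BaseSource,
        task_id: str,
        data_splitter: Optional[DatasetSplitter] = None,
        pod_dp: Optional[DPPodConfig] = None,
        pod_identifier: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self.task_id = task_id
        self.pod_identifier = pod_identifier
        self.pod_dp = pod_dp
        self.initialise_data(datasource, data_splitter)

    def initialise_data(
        self,
        datasource: BaseSource,
        data_splitter: Optional[DatasetSplitter] = None,
    ) -> None:
        # Bind (and optionally split) the pod's data for later run() calls.
        self.datasource = datasource
        self.data_splitter = data_splitter
```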
BaseModellerProtocol
class BaseModellerProtocol( *, algorithm: Union[BaseCompatibleModellerAlgorithm, Sequence[BaseCompatibleModellerAlgorithm]], mailbox: _ModellerMailbox, **kwargs: Any,):Modeller side of the protocol.
Calls the modeller side of the algorithm.
Ancestors
- bitfount.federated.protocols.base._BaseProtocol
- typing.Generic
- abc.ABC
Subclasses
- bitfount.federated.protocols.ehr.data_extraction_protocol_charcoal._ModellerSide
- bitfount.federated.protocols.ehr.nextgen_search_protocol._ModellerSide
- bitfount.federated.protocols.model_protocols.federated_averaging._ModellerSide
- bitfount.federated.protocols.model_protocols.inference_csv_report._ModellerSide
- bitfount.federated.protocols.model_protocols.inference_image_output._ModellerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._ModellerSide
- bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._ModellerSide
- GenericOphthalmologyModellerSide
- bitfount.federated.protocols.results_only._ModellerSide
Variables
- static task_id : str
Methods
initialise
def initialise(self, *, task_id: str, **kwargs: Any) ‑> None:Initialises the component algorithms.
run
async def run(self, *, context: ProtocolContext, **kwargs: Any) ‑> Any:Runs Modeller side of the protocol.
Arguments
- context: Optional. Run-time context for the protocol.
- **kwargs**: Additional keyword arguments.
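A rough sketch of the lifecycle a modeller-side protocol follows: construct with an algorithm and mailbox, initialise with the task ID, then await run. Everything below is a stand-in (including the mailbox method name); it does not subclass the real BaseModellerProtocol.

```python
import asyncio
from typing import Any

class ProtocolContext: ...

class FakeModellerMailbox:
    async def get_evaluation_results_from_workers(self) -> list[Any]:
        return [{"accuracy": 0.9}]  # hypothetical worker payload

class MyModellerAlgorithm:
    def initialise(self, *, task_id: str, **kwargs: Any) -> None:
        self.task_id = task_id

    def run(self, results: list[Any]) -> Any:
        return results

class MyModellerSide:
    """Shape of a BaseModellerProtocol subclass (illustrative, not the real base)."""

    def __init__(self, *, algorithm: MyModellerAlgorithm, mailbox: FakeModellerMailbox, **kwargs: Any) -> None:
        self.algorithm = algorithm
        self.mailbox = mailbox

    def initialise(self, *, task_id: str, **kwargs: Any) -> None:
        self.task_id = task_id
        self.algorithm.initialise(task_id=task_id)

    async def run(self, *, context: ProtocolContext, **kwargs: Any) -> Any:
        # Wait for the workers' results and hand them to the modeller-side algorithm.
        results = await self.mailbox.get_evaluation_results_from_workers()
        return self.algorithm.run(results)

async def main() -> None:
    protocol = MyModellerSide(algorithm=MyModellerAlgorithm(), mailbox=FakeModellerMailbox())
    protocol.initialise(task_id="task-123")
    print(await protocol.run(context=ProtocolContext()))

asyncio.run(main())
```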
BaseProtocolFactory
class BaseProtocolFactory( *, algorithm: Union[BaseCompatibleAlgoFactory, Sequence[BaseCompatibleAlgoFactory]], primary_results_path: Optional[str] = None, **kwargs: Any,):Base Protocol from which all other protocols must inherit.
Subclasses
- DataExtractionProtocolCharcoal
- NextGenSearchProtocol
- FederatedAveraging
- InferenceAndCSVReport
- InferenceAndImageOutput
- FluidVolumeScreeningProtocol
- GAScreeningProtocolAmethyst
- GAScreeningProtocolBronze
- GAScreeningProtocolBronzeWithEHR
- GAScreeningProtocolCharcoal
- GAScreeningProtocolJade
- InSiteInsightsProtocol
- RetinalDiseaseProtocolCobalt
- ResultsOnly
Variables
- static fields_dict : ClassVar[dict[str, marshmallow.fields.Field]]
- static nested_fields : ClassVar[dict[str, collections.abc.Mapping[str, Any]]]
- algorithms : list[BaseCompatibleAlgoFactoryWorkerStandard | BaseCompatibleAlgoFactoryWorkerHubNeeded] - Returns the algorithms in the protocol.
Methods
dump
def dump(self) ‑> SerializedProtocol:Returns the JSON-serializable representation of the protocol.
modeller
def modeller( self, *, mailbox: _ModellerMailbox, context: ProtocolContext, **kwargs: Any,) ‑> BaseModellerProtocol:Creates an instance of the modeller-side for this protocol.
run
def run( self, pod_identifiers: Collection[str], session: Optional[BitfountSession] = None, username: Optional[str] = None, hub: Optional[BitfountHub] = None, ms_config: Optional[MessageServiceConfig] = None, message_service: Optional[_MessageService] = None, pod_public_key_paths: Optional[Mapping[str, Path]] = None, identity_verification_method: IdentityVerificationMethod = IdentityVerificationMethod.OIDC_DEVICE_CODE, private_key_or_file: Optional[Union[RSAPrivateKey, Path]] = None, idp_url: Optional[str] = None, require_all_pods: bool = False, run_on_new_data_only: bool = False, project_id: Optional[str] = None, batched_execution: Optional[bool] = None, test_run: bool = False, force_rerun_failed_files: bool = True,) ‑> Optional[Any]:Sets up a local Modeller instance and runs the protocol.
Arguments
- pod_identifiers: The BitfountHub pod identifiers to run against.
- session: Optional. Session to use for authenticated requests. Created if needed.
- username: Username to run as. Defaults to logged in user.
- hub: BitfountHub instance. Default: hub.bitfount.com.
- ms_config: Message service config. Default: messaging.bitfount.com.
- message_service: Message service instance, created from ms_config if not provided. Defaults to "messaging.bitfount.com".
- pod_public_key_paths: Public keys of pods to be checked against.
- identity_verification_method: The identity verification method to use.
- private_key_or_file: Private key (to be removed).
- idp_url: The IDP URL.
- require_all_pods: If true, raise PodResponseError if at least one specified pod identifier rejects or fails to respond to a task request.
- run_on_new_data_only: Whether to run the task on new datapoints only. Defaults to False.
- project_id: The project ID to run the task under.
- batched_execution: Whether to run the task in batched mode. Defaults to False.
- test_run: If True, runs the task in test mode, on a limited number of datapoints. Defaults to False.
- force_rerun_failed_files: If True, forces a rerun on files that the task previously failed on. If False, the task will skip files that have previously failed. Note: this option can only be enabled if both enable_batch_resilience and individual_file_retry_enabled are True. Defaults to True.
Returns
Results of the protocol.
Raises
- PodResponseError: If require_all_pods is true and at least one specified pod identifier rejects or fails to respond to a task request.
- ValueError: If attempting to train on multiple pods and the DataStructure table name is given as a string.
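As a hedged usage sketch, a concrete factory such as ResultsOnly (one of the subclasses listed above) can be constructed with a compatible algorithm and run against a pod. The import path is assumed from the module names shown in the subclass lists, the algorithm is left as a parameter, and only a few of run()'s keyword arguments are shown.

```python
from typing import Any, Optional

# Assumed import path, based on the module names shown in the subclass lists above.
from bitfount.federated.protocols.results_only import ResultsOnly

def run_results_only_task(algorithm: Any, pod_identifier: str) -> Optional[Any]:
    """Construct the protocol and run it against a single pod (sketch only)."""
    protocol = ResultsOnly(algorithm=algorithm)
    return protocol.run(
        pod_identifiers=[pod_identifier],
        batched_execution=True,   # stream batch by batch, as described above
        require_all_pods=False,   # don't raise if the pod rejects the task
        test_run=False,
    )
```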
worker
def worker( self, *, mailbox: _WorkerMailbox, hub: BitfountHub, context: ProtocolContext, **kwargs: Any,) ‑> BaseWorkerProtocol:Creates an instance of the worker-side for this protocol.
BaseWorkerProtocol
class BaseWorkerProtocol( *, algorithm: Union[BaseCompatibleWorkerAlgorithm, Sequence[BaseCompatibleWorkerAlgorithm]], mailbox: _WorkerMailbox, **kwargs: Any,):Worker side of the protocol.
Calls the worker side of the algorithm.
Ancestors
- bitfount.federated.protocols.base._BaseProtocol
- typing.Generic
- abc.ABC
Subclasses
- bitfount.federated.protocols.ehr.data_extraction_protocol_charcoal._WorkerSide
- bitfount.federated.protocols.ehr.nextgen_search_protocol._WorkerSide
- bitfount.federated.protocols.model_protocols.federated_averaging._WorkerSide
- bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
- bitfount.federated.protocols.model_protocols.inference_image_output._WorkerSide
- bitfount.federated.protocols.ophthalmology.fluid_volume_screening_protocol._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze_base._WorkerSideBronzeBase
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
- bitfount.federated.protocols.ophthalmology.insite_insights_protocol._WorkerSide
- bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
- bitfount.federated.protocols.results_only._WorkerSide
Variables
- static datasource : BaseSource
- static mailbox : bitfount.federated.transport.worker_transport._WorkerMailbox
- static project_id : Optional[str]
Methods
initialise
def initialise( self, *, datasource: BaseSource, task_id: str, data_splitter: Optional[DatasetSplitter] = None, pod_dp: Optional[DPPodConfig] = None, pod_identifier: Optional[str] = None, project_id: Optional[str] = None, ehr_secrets: Optional[ExternallyManagedJWT] = None, ehr_config: Optional[EHRConfig] = None, parent_pod_identifier: Optional[str] = None, **kwargs: Any,) ‑> None:Initialises the component algorithms.
run
async def run( self, *, pod_vitals: Optional[_PodVitals] = None, context: ProtocolContext, **kwargs: Any,) ‑> Any:Runs the worker-side of the algorithm.
Arguments
- pod_vitals: Optional. Pod vitals instance for recording run-time details from the protocol run.
- context: Optional. Run-time context for the protocol.
- **kwargs**: Additional keyword arguments.
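The worker side follows the mirror-image lifecycle: initialise binds the pod's datasource to the algorithms, and the async run executes them and reports results back to the modeller. The sketch below uses stand-in types and a fake mailbox method; it does not subclass the real BaseWorkerProtocol.

```python
import asyncio
from typing import Any, Optional

# Stand-ins for the real transport and data types; illustrative only.
class BaseSource: ...
class ProtocolContext: ...

class FakeWorkerMailbox:
    async def send_evaluation_results(self, results: Any) -> None:
        print(f"EVALUATION_RESULTS -> Modeller: {results}")

class MyWorkerAlgorithm:
    def initialise(self, *, datasource: BaseSource, task_id: str, **kwargs: Any) -> None:
        self.datasource = datasource

    def run(self) -> dict[str, float]:
        return {"n_processed": 42.0}  # hypothetical per-batch output

class MyWorkerSide:
    """Shape of a BaseWorkerProtocol subclass (illustrative, not the real base)."""

    def __init__(self, *, algorithm: MyWorkerAlgorithm, mailbox: FakeWorkerMailbox, **kwargs: Any) -> None:
        self.algorithm = algorithm
        self.mailbox = mailbox

    def initialise(self, *, datasource: BaseSource, task_id: str, **kwargs: Any) -> None:
        self.datasource = datasource
        self.algorithm.initialise(datasource=datasource, task_id=task_id)

    async def run(self, *, pod_vitals: Optional[Any] = None, context: ProtocolContext, **kwargs: Any) -> Any:
        results = self.algorithm.run()
        await self.mailbox.send_evaluation_results(results)
        return results

async def main() -> None:
    worker = MyWorkerSide(algorithm=MyWorkerAlgorithm(), mailbox=FakeWorkerMailbox())
    worker.initialise(datasource=BaseSource(), task_id="task-123")
    await worker.run(context=ProtocolContext())

asyncio.run(main())
```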
FinalStepProtocol
class FinalStepProtocol():Tagging class for protocols that contain a final step.
These protocols will have a number of steps that can be operated batch-wise (batch steps) followed by step(s) to be executed at the end.
Type Arguments
- T_FinalStepAlgo: Type of the final step algorithm; must implement FinalStepAlgorithm.
Subclasses
- FinalStepReduceProtocol
- bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
- bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
- bitfount.federated.protocols.results_only._WorkerSide
Variables
- algorithms : list[BaseCompatibleModellerAlgorithm] | list[BaseCompatibleWorkerAlgorithm] - Get the algorithms for this protocol.
Methods
run_final_step
async def run_final_step(self, *, context: ProtocolContext, **kwargs: Any) ‑> Any:Execute the final reduce step.
FinalStepReduceProtocol
class FinalStepReduceProtocol():Tagging class for protocols that contain a final "reduce" step.
These protocols will have a number of steps that can be operated batch-wise (batch steps) followed by step(s) at the end that cannot be executed batch-wise but instead require access to the outputs from all batch steps (reduce step(s)).
Ancestors
- FinalStepProtocol
Subclasses
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze_base._WorkerSideBronzeBase
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
- bitfount.federated.protocols.ophthalmology.insite_insights_protocol._WorkerSide
Variables
- algorithms : list[BaseCompatibleModellerAlgorithm] | list[BaseCompatibleWorkerAlgorithm] - Get the algorithms for this protocol.
Methods
run_final_step
async def run_final_step(self, *, context: ProtocolContext, **kwargs: Any) ‑> Any:Inherited from FinalStepProtocol.run_final_step: Execute the final reduce step.
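A schematic of the reduce pattern, with hypothetical names and metrics: each batch step stores its output, and run_final_step needs all of them, so it can only run once batching is complete.

```python
from typing import Any

class ProtocolContext: ...

class MyReduceWorkerSide:
    """Illustrative only: batch-wise steps collect outputs, run_final_step reduces them."""

    def __init__(self) -> None:
        self._batch_outputs: list[dict[str, float]] = []

    async def run(self, *, context: ProtocolContext, **kwargs: Any) -> dict[str, float]:
        # Called once per batch; store the output for the final reduce step.
        batch_result = {"ga_area_mm2": 1.2}  # hypothetical per-batch metric
        self._batch_outputs.append(batch_result)
        return batch_result

    async def run_final_step(self, *, context: ProtocolContext, **kwargs: Any) -> dict[str, float]:
        # Needs the outputs from *all* batch steps, so it cannot run batch-wise.
        total = sum(out["ga_area_mm2"] for out in self._batch_outputs)
        return {"ga_area_mm2_total": total}
```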
InitialSetupProtocol
class InitialSetupProtocol(*args: Any, **kwargs: Any):Tagging class for protocols that contain an initial setup step.
These protocols will have an initial step that must be executed before any batching, followed by steps that can be operated batch-wise.
Type Arguments
- T_InitialSetupAlgo: Type of the setup algorithm; must implement InitialSetupAlgorithm.
Subclasses
- bitfount.federated.protocols.ehr.nextgen_search_protocol._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._WorkerSide
- bitfount.federated.protocols.ophthalmology.insite_insights_protocol._WorkerSide
Variables
- algorithms : list[BaseCompatibleModellerAlgorithm] | list[BaseCompatibleWorkerAlgorithm] - Get the algorithms for this protocol.
Methods
run_initial_setup
def run_initial_setup(self, **kwargs: Any) ‑> None:Run the initial setup phase.
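A schematic of the setup pattern, again with hypothetical names: run_initial_setup executes once, before any batching, to prepare state that the batch-wise steps then reuse.

```python
from typing import Any

class MySetupWorkerSide:
    """Illustrative only: an initial setup phase that runs once, before any batching."""

    def __init__(self) -> None:
        self._setup_done = False
        self._shared_index: dict[str, Any] = {}

    def run_initial_setup(self, **kwargs: Any) -> None:
        # e.g. build a lookup index or authenticate against an external system
        # exactly once, so the subsequent batch steps can reuse the result.
        self._shared_index = {"example-key": "example-value"}  # hypothetical shared state
        self._setup_done = True

    async def run(self, *, batch_id: int, **kwargs: Any) -> dict[str, Any]:
        # Batch-wise step; relies on the state prepared during setup.
        assert self._setup_done, "run_initial_setup() must be called before batch steps"
        return {"batch_id": batch_id, "index_size": len(self._shared_index)}
```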
LimitsExceededInfo
class LimitsExceededInfo(overrun: int, allowed: int):Named tuple describing an exceeded usage limit: overrun is the number of predictions by which usage exceeded the limit, and allowed is the number of predictions actually permitted.
Ancestors
- builtins.tuple
ModelInferenceProtocolMixin
class ModelInferenceProtocolMixin():Mixin class for protocols that may contain one or more model inference steps.
These protocols will have to respect any model inference usage limits that are associated with the model(s) in use.
Subclasses
- bitfount.federated.protocols.ehr.data_extraction_protocol_charcoal._WorkerSide
- bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
- bitfount.federated.protocols.model_protocols.inference_image_output._WorkerSide
- bitfount.federated.protocols.ophthalmology.fluid_volume_screening_protocol._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze_base._WorkerSideBronzeBase
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
- bitfount.federated.protocols.ophthalmology.insite_insights_protocol._WorkerSide
- bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
- bitfount.federated.protocols.results_only._WorkerSide
Static methods
apply_actual_usage_to_resources_consumed
def apply_actual_usage_to_resources_consumed( inference_algorithm: ModelInferenceWorkerSideAlgorithm, limits_exceeded_info: LimitsExceededInfo | None,) ‑> list[ResourceConsumed]:Generate a resources consumed list from an algorithm that respects limits.
Given information on the actual number of inferences that were allowed/used, updates resources consumed entries from the given algorithm to reflect this limit.
If limits were not exceeded, just returns the resources consumed information unchanged.
Arguments
- inference_algorithm: The inference algorithm used for the inferences.
- limits_exceeded_info: If not None, contains information on the actual number of inferences that were allowed/used.
Returns
The list of resources consumed, as generated by the algorithm, with model inference resources consumed entries modified to reflect the actually used inferences. If limits were not exceeded, returns the list of resources consumed unchanged.
check_usage_limits
def check_usage_limits( limits: dict[str, InferenceLimits], inference_algorithm: ModelInferenceWorkerSideAlgorithm,) ‑> Optional[LimitsExceededInfo]:Check if the most recent inference run has exceeded the usage limits.
Updates the total usage count associated with the model in question, regardless of whether the limits are exceeded.
Arguments
- limits: The inference usage limits, as a mapping of model_id to usage limits.
- inference_algorithm: The inference algorithm instance that has just been run.
Returns
If limits were not exceeded, returns None. Otherwise, returns a container with .overrun and .allowed attributes, which indicate the number of predictions by which usage was exceeded and the number of predictions actually allowed to be used, respectively. For example, for an initial total_usage of 10, a limit of 20, and an inference run that used 14 more inferences, this returns (overrun=4, allowed=10).
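The arithmetic in the worked example above can be captured in a short sketch. The FakeInferenceLimits shape (limit / total_usage fields) and the check function are assumptions for illustration, not the real InferenceLimits or check_usage_limits implementation.

```python
from typing import NamedTuple, Optional

class LimitsExceededInfo(NamedTuple):
    overrun: int   # how many predictions over the limit the last run went
    allowed: int   # how many of that run's predictions were actually permitted

class FakeInferenceLimits:
    """Hypothetical stand-in with just the fields the check needs."""
    def __init__(self, limit: int, total_usage: int = 0) -> None:
        self.limit = limit
        self.total_usage = total_usage

def check_usage(limits: FakeInferenceLimits, new_inferences: int) -> Optional[LimitsExceededInfo]:
    # Usage is always recorded, whether or not the limit ends up exceeded.
    previous_usage = limits.total_usage
    limits.total_usage += new_inferences
    if limits.total_usage <= limits.limit:
        return None
    allowed = max(limits.limit - previous_usage, 0)
    return LimitsExceededInfo(overrun=new_inferences - allowed, allowed=allowed)

# Matches the example above: total_usage 10, limit 20, 14 new inferences -> (overrun=4, allowed=10).
print(check_usage(FakeInferenceLimits(limit=20, total_usage=10), 14))
```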
handle_limits_exceeded
async def handle_limits_exceeded( exceeded_inference_algo: ModelInferenceWorkerSideAlgorithm, limits_exceeded_info: LimitsExceededInfo, limits_info: dict[str, InferenceLimits], mailbox: _WorkerMailbox,) ‑> NoReturn:Handles when usage limits are exceeded within the protocol.
In particular, sends a TASK_ABORT message from Worker to Modeller, letting it know that the limits have been exceeded, and raises a TaskAbortError to do the same within the Worker side.
ProtocolDecoratorMetaClass
class ProtocolDecoratorMetaClass(*args, **kwargs):Decorates the __init__ and run protocol methods.
Ancestors
- bitfount.hooks.BaseDecoratorMetaClass
- builtins.type
Subclasses
- types.AbstractProtocolDecoratorMetaClass
Static methods
decorator
def decorator(f: Callable) ‑> collections.abc.Callable:Hook decorator which logs before and after the hook it decorates.
do_decorate
def do_decorate(attr: str, value: Any) ‑> bool:Checks if an object should be decorated.
Only the init and run methods should be decorated.
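A simplified sketch of the pattern (not the real BaseDecoratorMetaClass, and without the hook machinery or async handling): do_decorate selects only __init__ and run, and decorator wraps them with logging.

```python
import functools
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)

class SimpleProtocolDecoratorMeta(type):
    """Illustrative metaclass that wraps only __init__ and run, as described above."""

    @staticmethod
    def do_decorate(attr: str, value: Any) -> bool:
        return attr in ("__init__", "run") and callable(value)

    @staticmethod
    def decorator(f: Callable) -> Callable:
        @functools.wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            logger.debug("Entering %s", f.__qualname__)
            try:
                return f(*args, **kwargs)
            finally:
                logger.debug("Exiting %s", f.__qualname__)
        return wrapper

    def __new__(mcs, name: str, bases: tuple, namespace: dict) -> type:
        for attr, value in list(namespace.items()):
            if mcs.do_decorate(attr, value):
                namespace[attr] = mcs.decorator(value)
        return super().__new__(mcs, name, bases, namespace)

class ExampleProtocol(metaclass=SimpleProtocolDecoratorMeta):
    def run(self) -> str:
        return "ran"   # entry/exit are logged; other methods are left untouched
```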
ProtocolExecution
class ProtocolExecution( protocol: _BaseProtocol, run_method: Callable, context: ProtocolContext, batched_execution: Optional[bool], hook_kwargs: Optional[_StrAnyDict], processed_files_cache: Optional[dict[str, datetime]] = None, failed_files_cache: Optional[dict[str, dict[str, str]]] = None, test_run: bool = False, **kwargs: Any,):Handles protocol execution with proper context management.
Arguments
- protocol: The protocol instance to run.
- run_method: The method to execute on the protocol.
- context: Optional. The context in which the protocol is being run.
- batched_execution: Whether to run the protocol in batched mode.
- hook_kwargs: Optional. Keyword arguments to pass to the hooks.
- processed_files_cache: Optional. A dictionary of processed files with their last modified dates. Defaults to None.
- failed_files_cache: Optional. A dictionary of previously failed files for the current task run. Defaults to None.
- test_run: Whether this is a test run. Defaults to False.
- **kwargs**: Additional keyword arguments for the run method.
Methods
execute
async def execute(self) ‑> Union[Any, list[Any]]:Main execution entry point.
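A rough wiring sketch follows. ProtocolExecution is normally driven by the worker/modeller machinery rather than by user code, so the protocol and context parameters below are placeholders, and the import path is assumed to be this module.

```python
from typing import Any, Optional, Union

# Assumed import path: the class documented in this module.
from bitfount.federated.protocols.base import ProtocolExecution

async def execute_protocol(
    protocol: Any,                 # a worker-side protocol instance
    context: Any,                  # the run-time ProtocolContext
    batched: Optional[bool] = True,
) -> Union[Any, list[Any]]:
    """Sketch of driving a protocol through ProtocolExecution."""
    execution = ProtocolExecution(
        protocol=protocol,
        run_method=protocol.run,   # the coroutine method to execute
        context=context,
        batched_execution=batched,
        hook_kwargs={},            # keyword arguments passed to the hooks
    )
    # Returns a single result, or a list of per-batch results when batched.
    return await execution.execute()
```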