base
Pod communication protocols.
These classes take an algorithm and are responsible for organising the communication between Pods and Modeller.
Attributes
registry
: A read-only dictionary of protocol factory names to their implementation classes.
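A read-only registry like the one described above can be sketched with types.MappingProxyType over a private dict that is filled in as subclasses are defined. This is an illustration under assumptions only: _registry, ProtocolFactorySketch and ResultsOnlySketch are hypothetical names, and the sketch does not claim that Bitfount populates its registry via __init_subclass__.

```python
from types import MappingProxyType
from typing import Any

# Hypothetical private store; callers only ever see the read-only view below.
_registry: dict[str, type] = {}
registry = MappingProxyType(_registry)


class ProtocolFactorySketch:
    """Hypothetical base class: every subclass registers itself by name."""

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        _registry[cls.__name__] = cls


class ResultsOnlySketch(ProtocolFactorySketch):
    """Hypothetical protocol factory, registered at class-creation time."""
```

Because MappingProxyType is a live view, later registrations show up in registry, while assignments made through it raise TypeError.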
Classes
BaseCompatibleAlgoFactory
class BaseCompatibleAlgoFactory(*args, **kwargs):
Protocol defining base algorithm factory compatibility.
Ancestors
Subclasses
- bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleAlgoFactory
- bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleAlgoFactory
- bitfount.federated.protocols.results_only._ResultsOnlyCompatibleAlgoFactory
Variables
- static
class_name : str
- static
fields_dict : ClassVar[dict[str, marshmallow.fields.Field]]
- static
nested_fields : ClassVar[dict[str, collections.abc.Mapping[str, Any]]]
BaseCompatibleModellerAlgorithm
class BaseCompatibleModellerAlgorithm(*args, **kwargs):
Protocol defining base modeller-side algorithm compatibility.
Ancestors
Subclasses
- bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleModeller
- bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleModellerAlgorithm
- bitfount.federated.protocols.results_only._ResultsOnlyCompatibleModellerAlgorithm
Methods
initialise
def initialise(self, task_id: Optional[str] = None, **kwargs: Any) ‑> None:
Initialises the algorithm.
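Assuming these compatibility classes are typing.Protocol classes, as their descriptions suggest, an algorithm satisfies one by matching its methods rather than by inheriting from it. A minimal sketch with a hypothetical ModellerAlgorithmLike protocol that mirrors the initialise signature above:

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class ModellerAlgorithmLike(Protocol):
    """Hypothetical stand-in mirroring the documented initialise signature."""

    def initialise(self, task_id: Optional[str] = None, **kwargs: Any) -> None: ...


class EchoModellerAlgorithm:
    """Does not inherit from the protocol, but matches it structurally."""

    def __init__(self) -> None:
        self.task_id: Optional[str] = None

    def initialise(self, task_id: Optional[str] = None, **kwargs: Any) -> None:
        # Store the task ID so later steps could refer to it.
        self.task_id = task_id


algo = EchoModellerAlgorithm()
algo.initialise(task_id="task-123")
```

Note that runtime_checkable isinstance checks only test for the presence of the methods, not their signatures.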
BaseCompatibleWorkerAlgorithm
class BaseCompatibleWorkerAlgorithm(*args, **kwargs):
Protocol defining base worker-side algorithm compatibility.
Ancestors
Subclasses
- bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleWorker
- bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleWorkerAlgorithm
- bitfount.federated.protocols.results_only._ResultsOnlyCompatibleWorkerAlgorithm
Methods
initialise
def initialise( self, datasource: BaseSource, data_splitter: Optional[DatasetSplitter] = None, pod_dp: Optional[DPPodConfig] = None, pod_identifier: Optional[str] = None, **kwargs: Any,) ‑> None:
Initialises the algorithm.
initialise_data
def initialise_data( self, datasource: BaseSource, data_splitter: Optional[DatasetSplitter] = None,) ‑> None:
Initialises the data for the algorithm.
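The split between initialise and initialise_data is sketched below with a hypothetical CountingWorkerAlgorithm. The assumption, not stated by the source, is that initialise performs one-off setup and delegates to initialise_data, so that the data can be re-initialised (for example for a new batch) without repeating the rest of the setup. Parameter types are loosened to Any so the sketch runs without Bitfount installed.

```python
from typing import Any, Optional


class CountingWorkerAlgorithm:
    """Hypothetical worker-side algorithm; datasource is any object here."""

    def __init__(self) -> None:
        self.pod_identifier: Optional[str] = None
        self.datasource: Any = None
        self.setup_calls = 0

    def initialise(
        self,
        datasource: Any,
        data_splitter: Any = None,
        pod_dp: Any = None,
        pod_identifier: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        # One-off setup, then delegate the data-specific part.
        self.setup_calls += 1
        self.pod_identifier = pod_identifier
        self.initialise_data(datasource, data_splitter)

    def initialise_data(self, datasource: Any, data_splitter: Any = None) -> None:
        # Data-only setup; can be re-run without repeating initialise().
        self.datasource = datasource


algo = CountingWorkerAlgorithm()
algo.initialise([1, 2, 3], pod_identifier="user/pod")
algo.initialise_data([4, 5])  # e.g. switch to the next batch of data
```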
BaseModellerProtocol
class BaseModellerProtocol( *, algorithm: Union[BaseCompatibleModellerAlgorithm, Sequence[BaseCompatibleModellerAlgorithm]], mailbox: _ModellerMailbox, **kwargs: Any,):
Modeller side of the protocol.
Calls the modeller side of the algorithm.
Ancestors
- bitfount.federated.protocols.base._BaseProtocol
- typing.Generic
- abc.ABC
Subclasses
- bitfount.federated.protocols.ehr.nextgen_search_protocol._ModellerSide
- bitfount.federated.protocols.model_protocols.federated_averaging._ModellerSide
- bitfount.federated.protocols.model_protocols.inference_csv_report._ModellerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._ModellerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze._ModellerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._ModellerSide
- bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._ModellerSide
- bitfount.federated.protocols.results_only._ModellerSide
Methods
initialise
def initialise(self, task_id: Optional[str] = None, **kwargs: Any) ‑> None:
Initialises the component algorithms.
run
async def run(self, *, context: Optional[ProtocolContext] = None, **kwargs: Any) ‑> Any:
Runs Modeller side of the protocol.
Arguments
context
: Optional. Run-time context for the protocol.
**kwargs
: Additional keyword arguments.
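The modeller side accepts either one algorithm or a sequence of them and initialises each component algorithm. A minimal sketch of that shape, using hypothetical RecordingAlgorithm and ModellerProtocolSketch classes; the real run method exchanges messages through the mailbox, which is omitted here.

```python
import asyncio
from collections.abc import Sequence
from typing import Any, Optional, Union


class RecordingAlgorithm:
    """Hypothetical modeller-side algorithm that records its task ID."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.task_id: Optional[str] = None

    def initialise(self, task_id: Optional[str] = None, **kwargs: Any) -> None:
        self.task_id = task_id


class ModellerProtocolSketch:
    def __init__(
        self,
        *,
        algorithm: Union[RecordingAlgorithm, Sequence[RecordingAlgorithm]],
        mailbox: Any = None,
        **kwargs: Any,
    ) -> None:
        # Accept a single algorithm or a sequence; always store a list.
        if isinstance(algorithm, Sequence):
            self.algorithms = list(algorithm)
        else:
            self.algorithms = [algorithm]
        self.mailbox = mailbox

    def initialise(self, task_id: Optional[str] = None, **kwargs: Any) -> None:
        # Initialise every component algorithm with the same task ID.
        for algo in self.algorithms:
            algo.initialise(task_id=task_id, **kwargs)

    async def run(self, *, context: Optional[Any] = None, **kwargs: Any) -> list[str]:
        # Placeholder: a real protocol would talk to workers via the mailbox.
        return [algo.name for algo in self.algorithms]


protocol = ModellerProtocolSketch(algorithm=RecordingAlgorithm("a"))
protocol.initialise(task_id="t1")
result = asyncio.run(protocol.run())
```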
BaseProtocolFactory
class BaseProtocolFactory( *, algorithm: Union[BaseCompatibleAlgoFactory, Sequence[BaseCompatibleAlgoFactory]], **kwargs: Any,):
Base Protocol from which all other protocols must inherit.
Subclasses
Variables
- static
fields_dict : ClassVar[dict[str, marshmallow.fields.Field]]
- static
nested_fields : ClassVar[dict[str, collections.abc.Mapping[str, Any]]]
algorithms : list[BaseCompatibleAlgoFactory]
- Returns the algorithms in the protocol.
Methods
dump
def dump(self) ‑> SerializedProtocol:
Returns the JSON-serializable representation of the protocol.
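A sketch of how a protocol factory could expose its algorithms and produce a JSON-serializable dump by delegating to each algorithm factory. The dictionary layout (class_name and algorithm keys) and all class names are hypothetical; the actual SerializedProtocol schema is not shown in this section.

```python
import json
from typing import Any


class AlgoFactorySketch:
    """Hypothetical algorithm factory with arbitrary JSON-friendly params."""

    def __init__(self, class_name: str, **params: Any) -> None:
        self.class_name = class_name
        self.params = params

    def dump(self) -> dict[str, Any]:
        return {"class_name": self.class_name, **self.params}


class ProtocolFactoryDumpSketch:
    def __init__(self, algorithms: list[AlgoFactorySketch]) -> None:
        self._algorithms = algorithms

    @property
    def algorithms(self) -> list[AlgoFactorySketch]:
        return self._algorithms

    def dump(self) -> dict[str, Any]:
        # Hypothetical layout: protocol name plus each algorithm's own dump.
        return {
            "class_name": type(self).__name__,
            "algorithm": [algo.dump() for algo in self.algorithms],
        }


factory = ProtocolFactoryDumpSketch([AlgoFactorySketch("ExampleAlgorithm", batch_size=8)])
serialized = json.dumps(factory.dump())
```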
modeller
def modeller( self, mailbox: _ModellerMailbox, **kwargs: Any,) ‑> BaseModellerProtocol:
Creates an instance of the modeller-side for this protocol.
run
def run( self, pod_identifiers: Collection[str], session: Optional[BitfountSession] = None, username: Optional[str] = None, hub: Optional[BitfountHub] = None, ms_config: Optional[MessageServiceConfig] = None, message_service: Optional[_MessageService] = None, pod_public_key_paths: Optional[Mapping[str, Path]] = None, identity_verification_method: IdentityVerificationMethod = IdentityVerificationMethod.OIDC_DEVICE_CODE, private_key_or_file: Optional[Union[RSAPrivateKey, Path]] = None, idp_url: Optional[str] = None, require_all_pods: bool = False, run_on_new_data_only: bool = False, model_out: Optional[Union[Path, str]] = None, project_id: Optional[str] = None, batched_execution: Optional[bool] = None,) ‑> Optional[Any]:
Sets up a local Modeller instance and runs the protocol.
Arguments
pod_identifiers
: The BitfountHub pod identifiers to run against.
session
: Optional. Session to use for authenticated requests. Created if needed.
username
: Username to run as. Defaults to the logged-in user.
hub
: BitfountHub instance. Default: hub.bitfount.com.
ms_config
: Message service config. Default: messaging.bitfount.com.
message_service
: Message service instance, created from ms_config if not provided. Defaults to "messaging.bitfount.com".
pod_public_key_paths
: Paths to the public keys of pods to be checked against.
identity_verification_method
: The identity verification method to use.
private_key_or_file
: Private key (to be removed).
idp_url
: The IDP URL.
require_all_pods
: If true, raise PodResponseError if at least one specified pod rejects or fails to respond to a task request.
run_on_new_data_only
: Whether to run the task on new datapoints only. Defaults to False.
model_out
: The path to save the model to.
project_id
: The project ID to run the task under.
batched_execution
: Whether to run the task in batched mode. Defaults to False.
Returns Results of the protocol.
Raises
PodResponseError
: If require_all_pods is true and at least one specified pod rejects or fails to respond to a task request.
ValueError
: If attempting to train on multiple pods, and the DataStructure table name is given as a string.
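The require_all_pods behaviour described above can be expressed as a small filter over pod responses. This is a hypothetical helper (accepted_pods) with a local PodResponseError stand-in; it assumes a missing response is treated the same as a rejection.

```python
from collections.abc import Collection, Mapping


class PodResponseError(Exception):
    """Local stand-in for the documented exception."""


def accepted_pods(
    pod_identifiers: Collection[str],
    responses: Mapping[str, bool],
    require_all_pods: bool = False,
) -> list[str]:
    # A pod with no entry in responses is treated as having failed to respond.
    failed = [pod for pod in pod_identifiers if not responses.get(pod, False)]
    if require_all_pods and failed:
        raise PodResponseError(f"Pods rejected or did not respond: {failed}")
    # Otherwise, carry on with whichever pods accepted.
    return [pod for pod in pod_identifiers if responses.get(pod, False)]
```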
worker
def worker( self, mailbox: _WorkerMailbox, hub: BitfountHub, context: Optional[ProtocolContext] = None, **kwargs: Any,) ‑> BaseWorkerProtocol:
Creates an instance of the worker-side for this protocol.
BaseWorkerProtocol
class BaseWorkerProtocol( *, algorithm: Union[BaseCompatibleWorkerAlgorithm, Sequence[BaseCompatibleWorkerAlgorithm]], mailbox: _WorkerMailbox, **kwargs: Any,):
Worker side of the protocol.
Calls the worker side of the algorithm.
Ancestors
- bitfount.federated.protocols.base._BaseProtocol
- typing.Generic
- abc.ABC
Subclasses
- bitfount.federated.protocols.ehr.nextgen_search_protocol._WorkerSide
- bitfount.federated.protocols.model_protocols.federated_averaging._WorkerSide
- bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
- bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
- bitfount.federated.protocols.results_only._WorkerSide
Variables
- static
datasource : BaseSource
- static
mailbox : bitfount.federated.transport.worker_transport._WorkerMailbox
- static
project_id : Optional[str]
Methods
initialise
def initialise( self, datasource: BaseSource, data_splitter: Optional[DatasetSplitter] = None, pod_dp: Optional[DPPodConfig] = None, pod_identifier: Optional[str] = None, project_id: Optional[str] = None, **kwargs: Any,) ‑> None:
Initialises the component algorithms.
run
async def run( self, *, pod_vitals: Optional[_PodVitals] = None, context: Optional[ProtocolContext] = None, **kwargs: Any,) ‑> Any:
Runs the worker-side of the algorithm.
Arguments
pod_vitals
: Optional. Pod vitals instance for recording run-time details from the protocol run.
context
: Optional. Run-time context for the protocol.
**kwargs
: Additional keyword arguments.
BatchConfig
class BatchConfig( num_batches: int, batch_size: int, original_test_files: list[str], original_file_names_override: Optional[list[str]],):
Holds batch configuration and state.
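BatchConfig's constructor fields suggest the usual relationship between a file list, a batch size and a batch count. The sketch below is hypothetical (BatchConfigSketch, from_files and files_for_batch are not part of the documented API) and assumes batches are consecutive slices of original_test_files, with a trailing partial batch allowed.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BatchConfigSketch:
    """Hypothetical mirror of BatchConfig's constructor fields."""

    num_batches: int
    batch_size: int
    original_test_files: list[str]
    original_file_names_override: Optional[list[str]]

    @classmethod
    def from_files(cls, files: list[str], batch_size: int) -> "BatchConfigSketch":
        # Ceiling division: a trailing partial batch still counts as a batch.
        num_batches = (len(files) + batch_size - 1) // batch_size
        return cls(num_batches, batch_size, list(files), None)

    def files_for_batch(self, batch_index: int) -> list[str]:
        # Consecutive slices of the original file list.
        start = batch_index * self.batch_size
        return self.original_test_files[start : start + self.batch_size]


config = BatchConfigSketch.from_files(["a", "b", "c", "d", "e"], batch_size=2)
```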
FinalStepReduceProtocol
class FinalStepReduceProtocol():
Tagging class for protocols that contain a final "reduce" step.
These protocols will have a number of steps that can be operated batch-wise (batch steps) followed by step(s) at the end that cannot be executed batch-wise but instead require access to the outputs from all batch steps (reduce step(s)).
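The batch-then-reduce pattern can be sketched with a marker base class and an isinstance check. FinalStepReduceTag, MeanProtocolSketch and run_batched are hypothetical names; the example computes a mean across batches, which needs a reduce step over all batch outputs.

```python
from typing import Any, Union


class FinalStepReduceTag:
    """Local stand-in for the tagging class: no behaviour, just a marker."""


class MeanProtocolSketch(FinalStepReduceTag):
    """Hypothetical protocol: per-batch partial sums, then one reduce."""

    def batch_step(self, batch: list[float]) -> tuple[float, int]:
        # Batch step: needs only the current batch.
        return sum(batch), len(batch)

    def reduce_step(self, partials: list[tuple[float, int]]) -> float:
        # Reduce step: needs the outputs of every batch step.
        total = sum(s for s, _ in partials)
        count = sum(n for _, n in partials)
        return total / count


def run_batched(protocol: Any, batches: list[list[float]]) -> Union[float, list[tuple[float, int]]]:
    partials = [protocol.batch_step(batch) for batch in batches]
    # Only tagged protocols get the extra reduce over all batch outputs.
    if isinstance(protocol, FinalStepReduceTag):
        return protocol.reduce_step(partials)
    return partials


result = run_batched(MeanProtocolSketch(), [[1.0, 2.0], [3.0]])
```

Averaging the per-batch means instead (1.5 and 3.0) would give 2.25, which is why the reduce step needs the outputs of every batch rather than a running per-batch result.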
Subclasses
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
- bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
LimitsExceededInfo
class LimitsExceededInfo(overrun: int, allowed: int):
LimitsExceededInfo(overrun, allowed)
Ancestors
- builtins.tuple
ModelInferenceProtocolMixin
class ModelInferenceProtocolMixin():
Mixin class for protocols that may contain one or more model inference steps.
These protocols will have to respect any model inference usage limits that are associated with the model(s) in use.
Subclasses
- bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
- bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
- bitfount.federated.protocols.results_only._WorkerSide
Static methods
apply_actual_usage_to_resources_consumed
def apply_actual_usage_to_resources_consumed( inference_algorithm: ModelInferenceWorkerSideAlgorithm, limits_exceeded_info: LimitsExceededInfo | None,) ‑> list[ResourceConsumed]:
Generate a resources consumed list from an algorithm that respects limits.
Given information on the actual number of inferences that were allowed/used, updates resources consumed entries from the given algorithm to reflect this limit.
If limits were not exceeded, just returns the resources consumed information unchanged.
Arguments
inference_algorithm
: The inference algorithm used for the inferences.
limits_exceeded_info
: If not None, contains information on the actual number of inferences that were allowed/used.
Returns The list of resources consumed, as generated by the algorithm, with model inference resources consumed entries modified to reflect the actually used inferences. If limits were not exceeded, returns the list of resources consumed, unchanged.
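A minimal sketch of the adjustment described above: when limits were exceeded, model-inference entries are capped to the allowed count; otherwise the list is returned unchanged. ResourceUsage, the "MODEL_INFERENCE" and "STORAGE" type strings, and apply_actual_usage are hypothetical stand-ins, and the sketch assumes one inference entry per algorithm run.

```python
from dataclasses import dataclass, replace
from typing import NamedTuple, Optional


class LimitsExceededInfo(NamedTuple):
    overrun: int
    allowed: int


@dataclass(frozen=True)
class ResourceUsage:
    """Hypothetical stand-in for a resources-consumed entry."""

    resource_type: str
    amount: float


def apply_actual_usage(
    entries: list[ResourceUsage], limits_exceeded_info: Optional[LimitsExceededInfo]
) -> list[ResourceUsage]:
    if limits_exceeded_info is None:
        return entries  # limits not exceeded: return the list unchanged
    # Cap inference entries to the number of inferences actually allowed.
    return [
        replace(entry, amount=limits_exceeded_info.allowed)
        if entry.resource_type == "MODEL_INFERENCE"
        else entry
        for entry in entries
    ]


entries = [ResourceUsage("MODEL_INFERENCE", 14), ResourceUsage("STORAGE", 1)]
adjusted = apply_actual_usage(entries, LimitsExceededInfo(overrun=4, allowed=10))
```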
check_usage_limits
def check_usage_limits( limits: dict[str, InferenceLimits], inference_algorithm: ModelInferenceWorkerSideAlgorithm,) ‑> Optional[LimitsExceededInfo]:
Check if the most recent inference run has exceeded the usage limits.
Updates the total usage count associated with the model in question, regardless of whether the limits are exceeded.
Arguments
limits
: The inference usage limits as a mapping of model_id to usage limits.
inference_algorithm
: The inference algorithm instance that has just been run.
Returns
If limits were not exceeded, returns None. Otherwise, returns a container with .overrun and .allowed attributes, which indicate the number of predictions by which usage exceeded the limit and the number of predictions actually allowed to be used, respectively.
For example, for an initial total_usage of 10, a limit of 20, and an inference run that used 14 more inferences, this will return (4, 10).
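The worked example above can be checked with a short sketch. UsageLimit and check_usage are hypothetical; LimitsExceededInfo is re-declared as a NamedTuple with the documented overrun and allowed fields. The sketch assumes that reaching the limit exactly does not count as exceeding it, and that allowed is whatever remained of the limit before this run.

```python
from dataclasses import dataclass
from typing import NamedTuple, Optional


class LimitsExceededInfo(NamedTuple):
    overrun: int
    allowed: int


@dataclass
class UsageLimit:
    """Hypothetical stand-in for one model's limit and running usage."""

    limit: int
    total_usage: int = 0


def check_usage(limit: UsageLimit, new_inferences: int) -> Optional[LimitsExceededInfo]:
    previous = limit.total_usage
    # Usage is recorded whether or not the limit ends up exceeded.
    limit.total_usage += new_inferences
    if limit.total_usage <= limit.limit:  # assumption: hitting the limit exactly is allowed
        return None
    allowed = max(limit.limit - previous, 0)
    return LimitsExceededInfo(overrun=new_inferences - allowed, allowed=allowed)


usage = UsageLimit(limit=20, total_usage=10)
info = check_usage(usage, 14)
```

Here total usage becomes 24: 10 of the 14 new inferences fit under the limit and the remaining 4 are the overrun.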
handle_limits_exceeded
async def handle_limits_exceeded( exceeded_inference_algo: ModelInferenceWorkerSideAlgorithm, limits_exceeded_info: LimitsExceededInfo, limits_info: dict[str, InferenceLimits], mailbox: _WorkerMailbox,) ‑> NoReturn:
Handles when usage limits are exceeded within the protocol.
In particular, sends a TASK_ABORT message from Worker->Modeller, letting the Modeller know that the limits are exceeded, and raises a TaskAbortError to do the same within the Worker side.
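The notify-then-raise order described above is sketched below as an async function. FakeMailbox, its send method and the local TaskAbortError are hypothetical stand-ins, not the real _WorkerMailbox API.

```python
import asyncio
from typing import NoReturn


class TaskAbortError(Exception):
    """Local stand-in for the documented error."""


class FakeMailbox:
    """Hypothetical mailbox that just records outgoing messages."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, str]] = []

    async def send(self, message_type: str, body: str) -> None:
        self.sent.append((message_type, body))


async def handle_limits_exceeded(model_id: str, mailbox: FakeMailbox) -> NoReturn:
    reason = f"Model inference usage limits exceeded for {model_id}"
    # Tell the Modeller first, then abort locally on the Worker side.
    await mailbox.send("TASK_ABORT", reason)
    raise TaskAbortError(reason)


mailbox = FakeMailbox()
try:
    asyncio.run(handle_limits_exceeded("model-1", mailbox))
except TaskAbortError as err:
    print(f"Worker aborted: {err}")
```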
ProtocolDecoratorMetaClass
class ProtocolDecoratorMetaClass(*args, **kwargs):
Decorates the __init__ and run protocol methods.
Ancestors
- bitfount.hooks.BaseDecoratorMetaClass
- builtins.type
Subclasses
- types.AbstractProtocolDecoratorMetaClass
Static methods
decorator
def decorator(f: Callable) ‑> collections.abc.Callable:
Hook decorator which logs before and after the hook it decorates.
do_decorate
def do_decorate(attr: str, value: Any) ‑> bool:
Checks if an object should be decorated.
Only the __init__ and run methods should be decorated.
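A sketch of a metaclass that, like ProtocolDecoratorMetaClass, wraps only __init__ and run at class-creation time. DecoratingMetaSketch and ExampleProtocol are hypothetical, and events are appended to a list instead of logged so the effect is visible. Because run is async on the protocol classes above, the sketch assumes coroutine functions need an async wrapper so that the "after" event fires only once the call has been awaited.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable

calls: list[str] = []  # records hook events so the effect is observable


class DecoratingMetaSketch(type):
    """Hypothetical metaclass that wraps only __init__ and run."""

    @staticmethod
    def do_decorate(attr: str, value: Any) -> bool:
        return attr in ("__init__", "run") and callable(value)

    @staticmethod
    def decorator(f: Callable) -> Callable:
        # Async methods need an async wrapper so "after" fires once awaited.
        if inspect.iscoroutinefunction(f):
            @functools.wraps(f)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                calls.append(f"before {f.__qualname__}")
                result = await f(*args, **kwargs)
                calls.append(f"after {f.__qualname__}")
                return result
            return async_wrapper

        @functools.wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            calls.append(f"before {f.__qualname__}")
            result = f(*args, **kwargs)
            calls.append(f"after {f.__qualname__}")
            return result
        return wrapper

    def __new__(mcls, name: str, bases: tuple, namespace: dict[str, Any]) -> "DecoratingMetaSketch":
        # Replace qualifying attributes before the class object is created.
        for attr, value in list(namespace.items()):
            if mcls.do_decorate(attr, value):
                namespace[attr] = mcls.decorator(value)
        return super().__new__(mcls, name, bases, namespace)


class ExampleProtocol(metaclass=DecoratingMetaSketch):
    def __init__(self) -> None:
        self.ready = True

    async def run(self) -> str:
        return "done"

    def helper(self) -> int:  # not __init__ or run, so left undecorated
        return 1


result = asyncio.run(ExampleProtocol().run())
```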
ProtocolExecution
class ProtocolExecution( protocol: _BaseProtocol, run_method: Callable, context: Optional[ProtocolContext], batched_execution: Optional[bool], hook_kwargs: Optional[_StrAnyDict], **kwargs: Any,):
Handles protocol execution with proper context management.
Methods
execute
async def execute(self) ‑> Union[Any, list[Any]]:
Main execution entry point.
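The Union[Any, list[Any]] return type suggests a single result for a normal run and a list of results for a batched run. ProtocolExecutionSketch below is a hypothetical illustration of that reading: it treats batched_execution=None as False, matching the default documented for run, and passes a hypothetical batch_num keyword to the run method.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional, Union


class ProtocolExecutionSketch:
    """Hypothetical: run once, or once per batch when batched_execution is set."""

    def __init__(
        self,
        run_method: Callable[..., Awaitable[Any]],
        batched_execution: Optional[bool] = None,
        num_batches: int = 1,
    ) -> None:
        self.run_method = run_method
        self.batched_execution = bool(batched_execution)  # None treated as False
        self.num_batches = num_batches

    async def execute(self) -> Union[Any, list[Any]]:
        if not self.batched_execution:
            return await self.run_method()
        # Batched mode: one run per batch, results collected in order.
        results = []
        for batch_num in range(self.num_batches):
            results.append(await self.run_method(batch_num=batch_num))
        return results


async def fake_run(batch_num: int = 0) -> str:
    return f"batch {batch_num} done"
```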