worker_process
Contains classes and methods for spawning the Worker in a dedicated process.
The parent owns the real worker mailbox and message-service connection. The child receives a proxy mailbox that forwards transport-sensitive operations back to the parent over IPC.
Module
Functions
_run_protocol_in_child
def _run_protocol_in_child(
    config: _WorkerProcessConfig,
    result_queue: multiprocessing.Queue[_ChildResultMessage],
    request_queue: Any,
    response_queue: multiprocessing.Queue[_ParentResponse] | None = None,
    modeller_ready_event: multiprocessing.synchronize.Event | None = None,
    heartbeat_state: Any = None,
)

Target function for the child worker multiprocessing.Process.
Must be a plain module-level function (not a method) so it can be pickled
by the spawn start method.
All exceptions are caught and placed on result_queue; this function
must never raise.
Arguments
config: Serialisable configuration for reconstructing the worker.
result_queue: Queue for sending results back to the parent process.
request_queue: Queue for child requests that the parent broker fulfills, or the legacy modeller_ready_event when called via older test wrappers.
response_queue: Queue for parent responses to child IPC requests. Optional for backwards-compatible test wrappers.
modeller_ready_event: Cross-process event set by the parent when the modeller's TASK_START message is received. A daemon thread in this child watches it and sets mailbox.modeller_ready. Optional for backwards-compatible test wrappers.
heartbeat_state: Shared runtime state reserved for child liveness tracking in later milestones.
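The two constraints above (module-level target, never raises) can be sketched with plain multiprocessing primitives. Everything in this sketch is illustrative, not the bitfount API: the target name, the config dict, and the tuple shape on the queue are stand-ins, and the target is exercised in-process for brevity rather than via Process(target=...).

```python
import multiprocessing as mp
import pickle
import traceback


def _child_target(config: dict, result_queue) -> None:
    """Module-level target: picklable, so usable with the 'spawn' start method.

    All exceptions are caught and put on the result queue; never raises.
    """
    try:
        value = config["x"] * 2  # stand-in for "rebuild worker and run protocol"
        result_queue.put(("success", value))
    except Exception:
        result_queue.put(("error", traceback.format_exc()))


# spawn pickles the target by qualified name, so module-level functions work...
pickle.loads(pickle.dumps(_child_target))
# ...but a lambda (or bound method) would not:
try:
    pickle.dumps(lambda config, q: None)
    lambda_picklable = True
except Exception:
    lambda_picklable = False

# In real use this queue and target go to Process(target=..., args=...).
q = mp.get_context("spawn").Queue()
_child_target({"x": 21}, q)
ok_status, ok_payload = q.get(timeout=5)
_child_target({}, q)  # missing key: the error is captured, not raised
err_status, err_traceback = q.get(timeout=5)
print(ok_status, ok_payload, err_status, lambda_picklable)
```

The "never raise" contract matters because an uncaught exception in a child process would otherwise surface only as a nonzero exit code, with no structured error for the parent to report.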
_run_protocol_in_child_target
def _run_protocol_in_child_target(
    config: _WorkerProcessConfig,
    result_queue: multiprocessing.Queue[_ChildResultMessage],
    request_queue: Any,
    response_queue: multiprocessing.Queue[_ParentResponse] | None = None,
    modeller_ready_event: multiprocessing.synchronize.Event | None = None,
    heartbeat_state: Any = None,
) -> Any

Target function for the child worker multiprocessing.Process.
Must be a plain module-level function (not a method) so it can be pickled
by the spawn start method.
All exceptions are caught and placed on result_queue; this function
must never raise.
Arguments
config: Serialisable configuration for reconstructing the worker.
result_queue: Queue for sending results back to the parent process.
request_queue: Queue for child requests that the parent broker fulfills, or the legacy modeller_ready_event when called via older test wrappers.
response_queue: Queue for parent responses to child IPC requests. Optional for backwards-compatible test wrappers.
modeller_ready_event: Cross-process event set by the parent when the modeller's TASK_START message is received. A daemon thread in this child watches it and sets mailbox.modeller_ready. Optional for backwards-compatible test wrappers.
heartbeat_state: Shared runtime state reserved for child liveness tracking in later milestones.
Classes
AlgorithmProgress
class AlgorithmProgress()

Emits AlgorithmProgressEvent at algorithm run and epoch boundaries.
Initialise the hook.
Methods
on_init_end
def on_init_end(self, algorithm: _BaseAlgorithm, *args: Any, **kwargs: Any) -> None

Inherited from BaseAlgorithmHook.on_init_end:
Run the hook at the very end of algorithm initialisation.
on_init_start
def on_init_start(self, algorithm: _BaseAlgorithm, *args: Any, **kwargs: Any) -> None

Inherited from BaseAlgorithmHook.on_init_start:
Run the hook at the very start of algorithm initialisation.
on_progress
def on_progress(
    self,
    algorithm: _BaseAlgorithm,
    context: Optional[TaskContext],
    step: str,
    current_epoch: Optional[int] = None,
    max_epochs: Optional[int] = None,
    step_info: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
)

Emit telemetry for an arbitrary in-run algorithm progress update.
on_run_end
def on_run_end(
    self,
    algorithm: _BaseAlgorithm,
    context: Optional[TaskContext],
    *args: Any,
    **kwargs: Any,
)

Emit telemetry when algorithm execution ends.
on_run_start
def on_run_start(
    self,
    algorithm: _BaseAlgorithm,
    context: Optional[TaskContext],
    *args: Any,
    **kwargs: Any,
)

Emit telemetry when algorithm execution starts.
on_train_epoch_end
def on_train_epoch_end(
    self,
    current_epoch: int,
    min_epochs: Optional[int],
    max_epochs: Optional[int],
    *args: Any,
    **kwargs: Any,
) -> None

Emit telemetry when an algorithm training epoch ends.
on_train_epoch_start
def on_train_epoch_start(
    self,
    current_epoch: int,
    min_epochs: Optional[int],
    max_epochs: Optional[int],
    *args: Any,
    **kwargs: Any,
) -> None

Emit telemetry when an algorithm training epoch starts.
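A minimal hook with the same epoch-boundary method shape can be sketched as follows. This is illustrative only: the real AlgorithmProgress derives from bitfount's hook base and emits AlgorithmProgressEvent telemetry, whereas this stand-in just records tuples so the call pattern is visible.

```python
from typing import Any, Optional


class EpochProgressHook:
    """Illustrative stand-in: records epoch boundaries instead of emitting telemetry."""

    def __init__(self) -> None:
        self.events: list[tuple[str, int, Optional[int]]] = []

    def on_train_epoch_start(
        self,
        current_epoch: int,
        min_epochs: Optional[int],
        max_epochs: Optional[int],
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.events.append(("start", current_epoch, max_epochs))

    def on_train_epoch_end(
        self,
        current_epoch: int,
        min_epochs: Optional[int],
        max_epochs: Optional[int],
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.events.append(("end", current_epoch, max_epochs))


# A training loop calls the hook at each epoch boundary:
hook = EpochProgressHook()
for epoch in range(1, 3):
    hook.on_train_epoch_start(epoch, None, 2)
    hook.on_train_epoch_end(epoch, None, 2)
print(hook.events)
```

The *args/**kwargs tails mirror the signatures above, letting the caller pass extra context without breaking older hook implementations.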
ProtocolTaskBatchRun
class ProtocolTaskBatchRun()

Hook to report dataset statistics before and after protocol run.
Initialise the hook.
Methods
on_run_end
def on_run_end(
    self,
    protocol: bitfount.federated.protocols.base._BaseProtocol,
    context: TaskContext,
    *args: Any,
    **kwargs: Any,
) -> None

Runs after protocol run to report dataset statistics.
SaveFailedFilesToDatabase
class SaveFailedFilesToDatabase()

Hook to save failed files to the database.
Initialise the hook.
Methods
on_resilience_end
def on_resilience_end(
    self,
    protocol: bitfount.federated.protocols.base._BaseProtocol,
    context: TaskContext,
    *args: Any,
    **kwargs: Any,
) -> None

Runs after protocol run to save failed files to the database.
SaveResultsToDatabase
class SaveResultsToDatabase()

Hook to save protocol results to the database.
Initialise the hook.
Methods
on_run_end
def on_run_end(
    self,
    protocol: bitfount.federated.protocols.base._BaseProtocol,
    context: TaskContext,
    *args: Any,
    **kwargs: Any,
) -> None

Runs after protocol run to save results to the database.
_ChildResultMessage
class _ChildResultMessage(
    status: bitfount.federated.worker_process.WorkerProcessStatus,
    datasource_state: dict[str, typing.Any] | None = None,
)

Base class for child-to-parent terminal messages on the result queue.
Variables

- static datasource_state : dict[str, typing.Any] | None
- static status : bitfount.federated.worker_process.WorkerProcessStatus
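The shape of these terminal messages can be sketched with a stand-in dataclass and enum. Both class bodies here are illustrative (the real WorkerProcessStatus members and _ChildResultMessage subclasses are not documented in this section); the sketch only shows the status-plus-optional-state structure the parent reads off the result queue.

```python
import queue
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class WorkerProcessStatus(Enum):
    """Illustrative stand-in for the real status enum."""

    SUCCESS = "success"
    ERROR = "error"


@dataclass
class ChildResultMessage:
    """Stand-in for _ChildResultMessage: a terminal status plus optional state."""

    status: WorkerProcessStatus
    datasource_state: Optional[dict[str, Any]] = None


# The parent blocks on the result queue and branches on the status field:
results: "queue.Queue[ChildResultMessage]" = queue.Queue()
results.put(ChildResultMessage(WorkerProcessStatus.SUCCESS, {"files_seen": 3}))
msg = results.get_nowait()
print(msg.status.value, msg.datasource_state)
```

Carrying datasource_state in the terminal message lets the parent fold state mutated in the child back into its own copy of the datasource, since the two processes do not share memory.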
_WorkerProcessConfig
class _WorkerProcessConfig(
    *,
    ms_config: MessageServiceConfig,
    modeller_mailbox_id: str,
    modeller_name: str,
    pod_mailbox_ids: dict[str, str],
    task_id: str,
    aes_encryption_key: bytes,
    datasource: BaseSource,
    datasource_name: str,
    serialized_protocol: SerializedProtocol,
    serialized_protocol_bytes: bytes,
    parent_pod_identifier: str,
    mailbox_pod_identifier: str,
    schema: BitfountSchema,
    batched_execution: bool,
    test_run: bool,
    project_id: Optional[str],
    run_on_new_data_only: bool,
    force_rerun_failed_files: bool,
    inference_limits: dict[str, InferenceLimits],
    model_urls: dict[str, ModelURLs],
    pod_vitals: bitfount.federated.pod_vitals._PodVitals | None = None,
    pod_dp: DPPodConfig | None = None,
    project_db_connector: ProjectDbConnector | None = None,
    data_identifier: Optional[str] = None,
    secrets: APIKeys | RefreshableJWT | dict[typing.Literal['bitfount', 'ehr'], APIKeys | RefreshableJWT] | None = None,
    ehr_config: Union[bitfount.federated.types.NextGenEHRConfig, SMARTStandaloneEHRConfig, SMARTBackendEHRConfig, None] = None,
    data_splitter: DatasetSplitter | None = None,
    task_hash: Optional[str] = None,
    username: Optional[str] = None,
    pod_public_keys_pem: dict[str, bytes] | None = None,
    private_key_pem: bytes | None = None,
    hook_factories: list[typing.Callable[[], NoneType]] = [],
)

A serialisable dataclass holding the configuration needed to reinstantiate a Worker in the child process.

All fields are picklable. The only _Worker attribute intentionally absent is mailbox: the subprocess builds a fresh _WorkerMailbox from the fields below.
Variables

- static aes_encryption_key : bytes
- static batched_execution : bool
- static data_identifier : Optional[str]
- static data_splitter : DatasetSplitter | None
- static datasource : BaseSource
- static datasource_name : str
- static ehr_config : Union[bitfount.federated.types.NextGenEHRConfig, SMARTStandaloneEHRConfig, SMARTBackendEHRConfig, None]
- static force_rerun_failed_files : bool
- static hook_factories : list[typing.Callable[[], NoneType]]
- static inference_limits : dict[str, InferenceLimits]
- static mailbox_pod_identifier : str
- static model_urls : dict[str, ModelURLs]
- static modeller_mailbox_id : str
- static modeller_name : str
- static ms_config : MessageServiceConfig
- static parent_pod_identifier : str
- static pod_dp : DPPodConfig | None
- static pod_mailbox_ids : dict[str, str]
- static pod_public_keys_pem : dict[str, bytes] | None
- static pod_vitals : bitfount.federated.pod_vitals._PodVitals | None
- static private_key_pem : bytes | None
- static project_db_connector : ProjectDbConnector | None
- static project_id : Optional[str]
- static run_on_new_data_only : bool
- static schema : BitfountSchema
- static secrets : APIKeys | RefreshableJWT | dict[typing.Literal['bitfount', 'ehr'], APIKeys | RefreshableJWT] | None
- static serialized_protocol : SerializedProtocol
- static serialized_protocol_bytes : bytes
- static task_hash : Optional[str]
- static task_id : str
- static test_run : bool
- static username : Optional[str]
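Because the spawn start method hands the config to the child by pickling it, every field above must survive a pickle round-trip. A quick way to illustrate the constraint is with a small stand-in dataclass; the class and its values are illustrative (field names borrowed from above), not the real _WorkerProcessConfig.

```python
import pickle
from dataclasses import dataclass, field


@dataclass
class WorkerConfig:
    """Illustrative stand-in: the real _WorkerProcessConfig has ~30 fields."""

    task_id: str
    modeller_name: str
    aes_encryption_key: bytes
    pod_mailbox_ids: dict[str, str] = field(default_factory=dict)


cfg = WorkerConfig("task-1", "alice", b"\x00" * 16, {"some-pod": "mailbox-1"})
# Round-trip through pickle, as spawn does when handing args to the child:
restored = pickle.loads(pickle.dumps(cfg))
print(restored == cfg)
```

This is why the live mailbox is excluded: objects holding sockets or other transport state cannot be pickled, so the child must rebuild its mailbox from the plain fields instead.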