
worker_process

Contains classes and methods for spawning the Worker in a dedicated process.

The parent owns the real worker mailbox and message-service connection. The child receives a proxy mailbox that forwards transport-sensitive operations back to the parent over IPC.
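The split described above can be sketched as a small request/response broker. This is an illustrative sketch only: `Request`, `Response`, `RealMailbox`, `ProxyMailbox` and `serve_requests` are hypothetical names, not part of this module, and `queue.Queue` plus a thread stand in for the cross-process queues and the parent process so that the snippet runs in a single interpreter.

```python
import queue
import threading
from dataclasses import dataclass
from typing import Any


@dataclass
class Request:
    """Hypothetical child-to-parent request."""
    op: str
    payload: Any


@dataclass
class Response:
    """Hypothetical parent-to-child reply."""
    ok: bool
    value: Any = None


class RealMailbox:
    """Stand-in for the parent's real mailbox, which owns the transport."""

    def __init__(self) -> None:
        self.sent: list = []

    def send(self, payload: Any) -> int:
        self.sent.append(payload)
        return len(self.sent)


class ProxyMailbox:
    """Child-side proxy: forwards operations instead of touching the transport."""

    def __init__(self, requests: queue.Queue, responses: queue.Queue) -> None:
        self._requests = requests
        self._responses = responses

    def send(self, payload: Any) -> int:
        self._requests.put(Request(op="send", payload=payload))
        reply = self._responses.get(timeout=5)
        if not reply.ok:
            raise RuntimeError(reply.value)
        return reply.value


def serve_requests(mailbox: RealMailbox, requests: queue.Queue, responses: queue.Queue) -> None:
    """Parent-side broker loop; a None request stops it."""
    while True:
        request = requests.get()
        if request is None:
            break
        try:
            result = getattr(mailbox, request.op)(request.payload)
            responses.put(Response(ok=True, value=result))
        except Exception as exc:  # report failures to the child instead of dying
            responses.put(Response(ok=False, value=repr(exc)))


requests_q: queue.Queue = queue.Queue()
responses_q: queue.Queue = queue.Queue()
real = RealMailbox()
broker = threading.Thread(
    target=serve_requests, args=(real, requests_q, responses_q), daemon=True
)
broker.start()

proxy = ProxyMailbox(requests_q, responses_q)
count = proxy.send({"type": "hello"})  # executed by the broker on the real mailbox

requests_q.put(None)  # ask the broker to stop
broker.join(timeout=5)
```

The point of the design is that only one side ever holds the transport connection; the other side sees the same method surface but every call becomes a message.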

Module

Functions

_run_protocol_in_child

def _run_protocol_in_child(config: _WorkerProcessConfig, result_queue: multiprocessing.Queue[_ChildResultMessage], request_queue: Any, response_queue: multiprocessing.Queue[_ParentResponse] | None = None, modeller_ready_event: multiprocessing.synchronize.Event | None = None, heartbeat_state: Any = None):

Target function for the child worker multiprocessing.Process.

Must be a plain module-level function (not a method) so it can be pickled by the spawn start method.

All exceptions are caught and placed on result_queue; this function must never raise.

Arguments

  • config: Serialisable configuration for reconstructing the worker.
  • result_queue: Queue for sending results back to the parent process.
  • request_queue: Queue for child requests that the parent broker fulfills, or the legacy modeller_ready_event when called via older test wrappers.
  • response_queue: Queue for parent responses to child IPC requests. Optional for backwards-compatible test wrappers.
  • modeller_ready_event: Cross-process event set by the parent when the modeller's TASK_START message is received. A daemon thread in this child watches it and sets mailbox.modeller_ready. Optional for backwards-compatible test wrappers.
  • heartbeat_state: Shared runtime state reserved for child liveness tracking in later milestones.
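The "never raise, always report" contract described above might look roughly like the following. This is a minimal sketch under stated assumptions: `ResultMessage` and `run_in_child` are hypothetical names, the status strings are invented for illustration, and a plain `queue.Queue` stands in for the multiprocessing queue so the function can be exercised in-process.

```python
import queue
import traceback
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class ResultMessage:
    """Hypothetical terminal message placed on the result queue."""
    status: str
    detail: Optional[str] = None


def run_in_child(work: Callable[[], Any], result_queue: Any) -> None:
    """Module-level target: runs `work` and reports the outcome on the queue.

    Defined at module level (not as a method or closure) so that a spawn-based
    process start could pickle a reference to it.
    """
    try:
        work()
        result_queue.put(ResultMessage(status="success"))
    except Exception:
        # Assumption: only Exception subclasses are reported here; whether to
        # widen this to BaseException is a design decision for the real code.
        result_queue.put(
            ResultMessage(status="error", detail=traceback.format_exc())
        )


def failing_work() -> None:
    raise ValueError("bad input")


results: queue.Queue = queue.Queue()
run_in_child(failing_work, results)       # does not raise
run_in_child(lambda: None, results)       # reports success
first = results.get_nowait()
second = results.get_nowait()
```

Because the failure travels as data, the parent can distinguish a reported error from a child that died without posting anything.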

_run_protocol_in_child_target

def _run_protocol_in_child_target(config: _WorkerProcessConfig, result_queue: multiprocessing.Queue[_ChildResultMessage], request_queue: Any, response_queue: multiprocessing.Queue[_ParentResponse] | None = None, modeller_ready_event: multiprocessing.synchronize.Event | None = None, heartbeat_state: Any = None) -> Any:

Target function for the child worker multiprocessing.Process.

Must be a plain module-level function (not a method) so it can be pickled by the spawn start method.

All exceptions are caught and placed on result_queue; this function must never raise.

Arguments

  • config: Serialisable configuration for reconstructing the worker.
  • result_queue: Queue for sending results back to the parent process.
  • request_queue: Queue for child requests that the parent broker fulfills, or the legacy modeller_ready_event when called via older test wrappers.
  • response_queue: Queue for parent responses to child IPC requests. Optional for backwards-compatible test wrappers.
  • modeller_ready_event: Cross-process event set by the parent when the modeller's TASK_START message is received. A daemon thread in this child watches it and sets mailbox.modeller_ready. Optional for backwards-compatible test wrappers.
  • heartbeat_state: Shared runtime state reserved for child liveness tracking in later milestones.
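The modeller_ready_event bridging described above can be sketched as a daemon thread that blocks on one event and sets another. In this sketch `watch_ready` is a hypothetical helper, and `threading.Event` stands in for the cross-process event (both expose `wait()` and `set()`), so the snippet runs in a single process.

```python
import threading


def watch_ready(remote_event, local_event) -> None:
    """Block until the parent-side event fires, then set the local flag."""
    remote_event.wait()
    local_event.set()


# Stand-in for the cross-process event set by the parent.
cross_process_ready = threading.Event()
# Stand-in for the child mailbox's own readiness flag.
mailbox_ready = threading.Event()

watcher = threading.Thread(
    target=watch_ready,
    args=(cross_process_ready, mailbox_ready),
    daemon=True,  # a daemon thread never keeps the child alive on its own
)
watcher.start()

cross_process_ready.set()  # simulates the parent observing the start message
watcher.join(timeout=5)
```

Making the watcher a daemon thread means a task that ends before the event fires does not leave the child hanging on the blocked `wait()`.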

Classes

AlgorithmProgress

class AlgorithmProgress():

Emits AlgorithmProgressEvent at algorithm run and epoch boundaries.

Initialise the hook.

Variables

  • type : HookType - Return the hook type.

Methods


on_init_end

def on_init_end(self, algorithm: _BaseAlgorithm, *args: Any, **kwargs: Any) -> None:

Inherited from: BaseAlgorithmHook.on_init_end

Run the hook at the very end of algorithm initialisation.

on_init_start

def on_init_start(self, algorithm: _BaseAlgorithm, *args: Any, **kwargs: Any) -> None:

Inherited from: BaseAlgorithmHook.on_init_start

Run the hook at the very start of algorithm initialisation.

on_progress

def on_progress(self, algorithm: _BaseAlgorithm, context: Optional[TaskContext], step: str, current_epoch: Optional[int] = None, max_epochs: Optional[int] = None, step_info: Optional[str] = None, *args: Any, **kwargs: Any):

Emit telemetry for an arbitrary in-run algorithm progress update.

on_run_end

def on_run_end(self, algorithm: _BaseAlgorithm, context: Optional[TaskContext], *args: Any, **kwargs: Any):

Emit telemetry when algorithm execution ends.

on_run_start

def on_run_start(self, algorithm: _BaseAlgorithm, context: Optional[TaskContext], *args: Any, **kwargs: Any):

Emit telemetry when algorithm execution starts.

on_train_epoch_end

def on_train_epoch_end(self, current_epoch: int, min_epochs: Optional[int], max_epochs: Optional[int], *args: Any, **kwargs: Any) -> None:

Emit telemetry when an algorithm training epoch ends.

on_train_epoch_start

def on_train_epoch_start(self, current_epoch: int, min_epochs: Optional[int], max_epochs: Optional[int], *args: Any, **kwargs: Any) -> None:

Emit telemetry when an algorithm training epoch starts.
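The hook methods above follow a common pattern: each lifecycle callback turns into one telemetry event. The sketch below shows that pattern in isolation. `ProgressEvent` and `ProgressHook` are hypothetical stand-ins, not the real AlgorithmProgressEvent or AlgorithmProgress classes, and the signatures are simplified (no algorithm or context arguments).

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class ProgressEvent:
    """Hypothetical telemetry record."""
    step: str
    current_epoch: Optional[int] = None
    max_epochs: Optional[int] = None


class ProgressHook:
    """Emits one event per run or epoch boundary via an injected callback."""

    def __init__(self, emit: Callable[[ProgressEvent], None]) -> None:
        self._emit = emit

    def on_run_start(self) -> None:
        self._emit(ProgressEvent("run_start"))

    def on_run_end(self) -> None:
        self._emit(ProgressEvent("run_end"))

    def on_train_epoch_start(self, current_epoch: int, max_epochs: Optional[int] = None) -> None:
        self._emit(ProgressEvent("epoch_start", current_epoch, max_epochs))

    def on_train_epoch_end(self, current_epoch: int, max_epochs: Optional[int] = None) -> None:
        self._emit(ProgressEvent("epoch_end", current_epoch, max_epochs))


# Collect events in a list; a real emitter might forward them elsewhere.
events: list = []
hook = ProgressHook(events.append)

hook.on_run_start()
for epoch in range(2):
    hook.on_train_epoch_start(epoch, max_epochs=2)
    hook.on_train_epoch_end(epoch, max_epochs=2)
hook.on_run_end()

steps = [event.step for event in events]
```

Injecting the emit callback keeps the hook independent of where the telemetry ends up, which also makes it straightforward to test.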

ProtocolTaskBatchRun

class ProtocolTaskBatchRun():

Hook to report dataset statistics before and after protocol run.

Initialise the hook.

Methods


on_run_end

def on_run_end(self, protocol: bitfount.federated.protocols.base._BaseProtocol, context: TaskContext, *args: Any, **kwargs: Any) -> None:

Runs after protocol run to report dataset statistics.

SaveFailedFilesToDatabase

class SaveFailedFilesToDatabase():

Hook to save failed files to database.

Initialise the hook.

Methods


on_resilience_end

def on_resilience_end(self, protocol: bitfount.federated.protocols.base._BaseProtocol, context: TaskContext, *args: Any, **kwargs: Any) -> None:

Runs after protocol run to save failed files to database.

SaveResultsToDatabase

class SaveResultsToDatabase():

Hook to save protocol results to database.

Initialise the hook.

Methods


on_run_end

def on_run_end(self, protocol: bitfount.federated.protocols.base._BaseProtocol, context: TaskContext, *args: Any, **kwargs: Any) -> None:

Runs after protocol run to save results to database.

_ChildResultMessage

class _ChildResultMessage(status: bitfount.federated.worker_process.WorkerProcessStatus, datasource_state: dict[str, typing.Any] | None = None):

Base class for child-to-parent terminal messages on the result queue.

Variables

  • static datasource_state : dict[str, typing.Any] | None
  • static status : bitfount.federated.worker_process.WorkerProcessStatus

_WorkerProcessConfig

class _WorkerProcessConfig(*, ms_config: MessageServiceConfig, modeller_mailbox_id: str, modeller_name: str, pod_mailbox_ids: dict[str, str], task_id: str, aes_encryption_key: bytes, datasource: BaseSource, datasource_name: str, serialized_protocol: SerializedProtocol, serialized_protocol_bytes: bytes, parent_pod_identifier: str, mailbox_pod_identifier: str, schema: BitfountSchema, batched_execution: bool, test_run: bool, project_id: Optional[str], run_on_new_data_only: bool, force_rerun_failed_files: bool, inference_limits: dict[str, InferenceLimits], model_urls: dict[str, ModelURLs], pod_vitals: bitfount.federated.pod_vitals._PodVitals | None = None, pod_dp: DPPodConfig | None = None, project_db_connector: ProjectDbConnector | None = None, data_identifier: Optional[str] = None, secrets: APIKeys | RefreshableJWT | dict[typing.Literal['bitfount', 'ehr'], APIKeys | RefreshableJWT] | None = None, ehr_config: Union[bitfount.federated.types.NextGenEHRConfig, SMARTStandaloneEHRConfig, SMARTBackendEHRConfig, ForwardRef(None)] = None, data_splitter: DatasetSplitter | None = None, task_hash: Optional[str] = None, username: Optional[str] = None, pod_public_keys_pem: dict[str, bytes] | None = None, private_key_pem: bytes | None = None, hook_factories: list[typing.Callable[[], NoneType]] = []):

A serialisable dataclass that holds the configuration needed to re-instantiate a Worker in a separate process.

All fields are picklable. The only _Worker attribute intentionally absent is mailbox: the subprocess builds a fresh _WorkerMailbox from the fields below.
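The idea of shipping only picklable fields and rebuilding the live object on the other side can be sketched as follows. Everything here is hypothetical: `WorkerConfig` is a four-field stand-in for the real config, `Mailbox` is a toy class holding a lock (which cannot be pickled), and `build_mailbox` is an invented helper. The round trip goes through `dataclasses.asdict` and `pickle` to simulate the data crossing a process boundary; this is an assumption of the sketch, not a description of how the real class is transferred.

```python
import pickle
import threading
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class WorkerConfig:
    """Hypothetical, heavily reduced config: plain picklable data only."""
    task_id: str
    modeller_name: str
    aes_encryption_key: bytes
    private_key_pem: Optional[bytes] = None


class Mailbox:
    """Toy live object; the lock makes instances unpicklable."""

    def __init__(self, task_id: str, key: bytes) -> None:
        self.task_id = task_id
        self._key = key
        self._lock = threading.Lock()


def build_mailbox(config: WorkerConfig) -> Mailbox:
    """Rebuild the live object from config fields on the receiving side."""
    return Mailbox(config.task_id, config.aes_encryption_key)


config = WorkerConfig(
    task_id="task-123",
    modeller_name="alice",
    aes_encryption_key=b"\x00" * 32,
)

# Simulate the process boundary: data out as bytes, data back in.
payload = pickle.dumps(asdict(config))
restored = WorkerConfig(**pickle.loads(payload))
mailbox = build_mailbox(restored)

# The live object itself cannot make the same trip.
try:
    pickle.dumps(mailbox)
    mailbox_picklable = True
except TypeError:
    mailbox_picklable = False
```

Keeping key material as bytes fields (for example PEM-encoded keys) rather than as key objects follows the same reasoning: bytes always pickle, richer objects may not.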

Variables

  • static aes_encryption_key : bytes
  • static batched_execution : bool
  • static data_identifier : Optional[str]
  • static datasource_name : str
  • static force_rerun_failed_files : bool
  • static mailbox_pod_identifier : str
  • static modeller_mailbox_id : str
  • static modeller_name : str
  • static parent_pod_identifier : str
  • static pod_mailbox_ids : dict[str, str]
  • static pod_public_keys_pem : dict[str, bytes] | None
  • static pod_vitals : bitfount.federated.pod_vitals._PodVitals | None
  • static private_key_pem : bytes | None
  • static project_id : Optional[str]
  • static run_on_new_data_only : bool
  • static serialized_protocol_bytes : bytes
  • static task_hash : Optional[str]
  • static task_id : str
  • static test_run : bool
  • static username : Optional[str]