
resilience

Resilience handling for batch execution in federated protocols.

This module handles batch resilience, individual file retry logic, error reporting, and failure recovery mechanisms extracted from the main ProtocolExecution class.

Classes

ResilienceHandler

class ResilienceHandler(protocol: BaseWorkerProtocol, hook_kwargs: dict, execute_run_func: Callable, context: Optional[ProtocolContext] = None):

Handles batch resilience, retry logic, and error reporting.

Methods


abort_task_all_batches_failed

async def abort_task_all_batches_failed(self, batch_config: BatchConfig) -> None:

Abort the task when all batches failed and no recovery is possible.

Arguments

  • batch_config: The batch configuration object

Raises

  • BatchResilienceAbortError: Always raised to abort the task
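A minimal sketch of what an "always raises" abort can look like. The `BatchConfig` fields (`total_batches`, `failed_batches`) and the exception class below are hypothetical stand-ins, since this page does not document their real definitions. The sketch folds the per-batch errors into the message so the abort is actionable.

```python
import asyncio
from dataclasses import dataclass, field


class BatchResilienceAbortError(Exception):
    """Stand-in for the documented exception; the real class may differ."""


@dataclass
class BatchConfig:
    # Hypothetical fields, for illustration only.
    total_batches: int
    failed_batches: dict = field(default_factory=dict)  # batch_num -> error message


async def abort_task_all_batches_failed(batch_config: BatchConfig) -> None:
    # Summarise every batch failure, in batch order, so the abort message is actionable.
    summary = ", ".join(
        f"batch {num}: {err}" for num, err in sorted(batch_config.failed_batches.items())
    )
    # Always raise: by the time this is called, no recovery path remains.
    raise BatchResilienceAbortError(
        f"All {batch_config.total_batches} batches failed; aborting task. {summary}"
    )
```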

build_batch_level_report_data

def build_batch_level_report_data(self, batch_config: BatchConfig, report_timestamp: str) -> list:

Build the rows for a batch-level failure report (the original reporting behavior, used when individual file retry is not enabled).

Arguments

  • batch_config: The batch configuration object
  • report_timestamp: Timestamp for the report

Returns

  • A list of dictionaries containing batch-level failure data
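The following sketch shows one plausible shape for batch-level report rows: one dictionary per failed batch. It assumes a hypothetical `BatchConfig.failed_batches` mapping of batch number to `(files, error)`; the real attribute names and column set are not documented here.

```python
from dataclasses import dataclass, field


@dataclass
class BatchConfig:
    # Hypothetical shape: failed batch number -> (file names in batch, error message).
    failed_batches: dict = field(default_factory=dict)


def build_batch_level_report_data(batch_config: BatchConfig, report_timestamp: str) -> list:
    rows = []
    for batch_num, (files, error) in sorted(batch_config.failed_batches.items()):
        # One row per failed batch, listing every file the batch contained.
        rows.append(
            {
                "timestamp": report_timestamp,
                "batch_number": batch_num,
                "file_count": len(files),
                "files": ";".join(files),
                "error": error,
            }
        )
    return rows
```

Joining file names into a single cell keeps one row per batch, which matches the "batch-level" granularity; the file-level report is where each file gets its own row.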

build_file_level_report_data

def build_file_level_report_data(self, batch_config: BatchConfig, report_timestamp: str) -> list:

Build the rows for a file-level diagnosis report.

Arguments

  • batch_config: The batch configuration object
  • report_timestamp: Timestamp for the report

Returns

  • A list of dictionaries containing file-level diagnosis data
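A sketch of a file-level diagnosis, assuming hypothetical `failed_batches` (batch number to file list) and `individual_results` (file to error string, or `None` on success) fields. The key idea is the three-way status: a file that succeeds when retried alone suggests a batch-level cause, while a file that fails alone is likely problematic itself.

```python
from dataclasses import dataclass, field


@dataclass
class BatchConfig:
    # Hypothetical fields, for illustration only.
    failed_batches: dict = field(default_factory=dict)      # batch_num -> list of files
    individual_results: dict = field(default_factory=dict)  # file -> error str, or None if it succeeded


def build_file_level_report_data(batch_config: BatchConfig, report_timestamp: str) -> list:
    rows = []
    for batch_num, files in sorted(batch_config.failed_batches.items()):
        for file in files:
            if file not in batch_config.individual_results:
                status, error = "not_retried", ""
            elif batch_config.individual_results[file] is None:
                # Succeeded alone: the original failure was probably batch-level.
                status, error = "succeeded", ""
            else:
                # Failed alone: the file itself is the likely culprit.
                status, error = "failed", batch_config.individual_results[file]
            rows.append(
                {
                    "timestamp": report_timestamp,
                    "original_batch": batch_num,
                    "file": file,
                    "individual_status": status,
                    "error": error,
                }
            )
    return rows
```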

check_and_handle_zero_successful_batches

async def check_and_handle_zero_successful_batches(self, batch_config: BatchConfig) -> None:

Check whether all batches failed and, if so, handle the task abort.

Arguments

  • batch_config: The batch configuration object

Raises

  • BatchResilienceAbortError: If all batches failed and no successful processing occurred, and individual file retry is not enabled or not applicable.
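The decision described above can be sketched as follows. All `BatchConfig` fields are hypothetical, and so is the rule for "not applicable": here it is assumed that individual retry adds nothing when every batch held a single file, because each file was already run on its own. When retry is applicable, the check returns without raising so the abort decision can be deferred until after individual diagnosis.

```python
import asyncio
from dataclasses import dataclass


class BatchResilienceAbortError(Exception):
    """Stand-in for the documented exception."""


@dataclass
class BatchConfig:
    # Hypothetical fields; the real class may track these differently.
    successful_batches: int
    failed_batches: int
    individual_file_retry_enabled: bool
    batch_size: int


async def check_and_handle_zero_successful_batches(batch_config: BatchConfig) -> None:
    all_failed = batch_config.successful_batches == 0 and batch_config.failed_batches > 0
    if not all_failed:
        return
    # Assumption: retrying "individually" is pointless if batches already had one file each.
    retry_applicable = (
        batch_config.individual_file_retry_enabled and batch_config.batch_size > 1
    )
    if not retry_applicable:
        raise BatchResilienceAbortError(
            "All batches failed and individual file retry is unavailable."
        )
    # Otherwise, defer the abort decision until files have been retried one by one.
```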

check_and_handle_zero_successful_files

async def check_and_handle_zero_successful_files(self, batch_config: BatchConfig) -> None:

Check whether all batches failed and all individually retried files also failed.

This is called after individual file diagnosis to determine whether the task should be aborted.

Arguments

  • batch_config: The batch configuration object

Raises

  • BatchResilienceAbortError: If no files succeeded in either batch or individual processing
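A minimal sketch of the second-stage check, assuming a hypothetical `individual_results` mapping of file name to `True` if the file succeeded when retried alone. The task aborts only when neither batch processing nor individual retries produced a single success.

```python
import asyncio
from dataclasses import dataclass, field


class BatchResilienceAbortError(Exception):
    """Stand-in for the documented exception."""


@dataclass
class BatchConfig:
    successful_batches: int
    # Hypothetical: file -> True if it succeeded when retried on its own.
    individual_results: dict = field(default_factory=dict)


async def check_and_handle_zero_successful_files(batch_config: BatchConfig) -> None:
    any_file_recovered = any(batch_config.individual_results.values())
    # Abort only if nothing succeeded at either the batch or the file level.
    if batch_config.successful_batches == 0 and not any_file_recovered:
        raise BatchResilienceAbortError(
            "No files succeeded in batch or individual processing; aborting task."
        )
```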

get_error_reports_save_dir

def get_error_reports_save_dir(self) -> pathlib.Path:

Determine the directory in which to save error reports.

Uses the task results folder if one is available; otherwise falls back to the current directory.

Returns

  • A Path object for the directory where the batch resilience report is saved
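The fallback rule can be sketched as a free function. Unlike the documented method, this sketch takes the task results folder as an explicit (hypothetical) `task_results_dir` argument, and it treats "available" as "configured and an existing directory", which is an assumption.

```python
import pathlib
from typing import Optional


def get_error_reports_save_dir(task_results_dir: Optional[pathlib.Path]) -> pathlib.Path:
    # Prefer the task results folder when one is configured and exists on disk.
    if task_results_dir is not None and task_results_dir.is_dir():
        return task_results_dir
    # Otherwise fall back to the current working directory.
    return pathlib.Path.cwd()
```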

handle_batch_failure

async def handle_batch_failure(self, error: Exception, batch_num: int, batch_config: BatchConfig) -> None:

Handle a batch failure with resilience logic.

Arguments

  • error: The exception that caused the batch to fail
  • batch_num: The batch number that failed
  • batch_config: The batch configuration object

handle_batch_resilience_and_reporting

async def handle_batch_resilience_and_reporting(self, batch_config: BatchConfig) -> None:

Handle batch resilience reporting and individual file retry logic.

Arguments

  • batch_config: The batch configuration containing success/failure info.
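One way to orchestrate the steps documented on this page, sketched with the individual steps injected as callables so the ordering is easy to see. The parameters `retry`, `check_files`, and `write_report` are hypothetical stand-ins for `retry_failed_files_individually`, `check_and_handle_zero_successful_files`, and `write_failed_files_report`; the actual call order in the module is not documented here.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable


@dataclass
class BatchConfig:
    # Hypothetical fields, for illustration only.
    failed_batches: dict = field(default_factory=dict)
    individual_file_retry_enabled: bool = True


async def handle_batch_resilience_and_reporting(
    batch_config: BatchConfig,
    retry: Callable[[BatchConfig], Awaitable[None]],
    check_files: Callable[[BatchConfig], Awaitable[None]],
    write_report: Callable[[BatchConfig], None],
) -> None:
    if not batch_config.failed_batches:
        return  # Nothing failed, so there is nothing to retry or report.
    if batch_config.individual_file_retry_enabled:
        await retry(batch_config)
    # Write the report before the abort check so it exists even if the task aborts.
    write_report(batch_config)
    if batch_config.individual_file_retry_enabled:
        await check_files(batch_config)  # May raise to abort the task.
```

Writing the report before the final check is a design choice in this sketch: if the check raises, the diagnostic CSV is already on disk.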

retry_failed_files_individually

async def retry_failed_files_individually(self, batch_config: BatchConfig) -> None:

Retry files from failed batches individually to identify problematic files.

Test each file from failed batches individually to determine if the file itself is problematic or if the issue was batch-level (e.g., resource constraints).

This runs as part of the batch execution flow, before the batch complete message is sent.

Arguments

  • batch_config: The batch configuration for the worker.
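A sketch of the individual retry loop: each file from a failed batch is re-run as a batch of one, isolating it from its former batch-mates. The `run_on_files` callable is a hypothetical stand-in for the `execute_run_func` passed to the constructor, and the `BatchConfig` fields are likewise assumed.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Dict, List, Optional


@dataclass
class BatchConfig:
    # Hypothetical fields, for illustration only.
    failed_batch_files: Dict[int, List[str]] = field(default_factory=dict)
    individual_results: Dict[str, Optional[str]] = field(default_factory=dict)


async def retry_failed_files_individually(
    batch_config: BatchConfig,
    run_on_files: Callable[[List[str]], Awaitable[None]],
) -> None:
    for batch_num in sorted(batch_config.failed_batch_files):
        for file in batch_config.failed_batch_files[batch_num]:
            try:
                # A batch of one isolates the file from its former batch-mates.
                await run_on_files([file])
            except Exception as exc:  # Broad on purpose: any failure marks the file.
                batch_config.individual_results[file] = str(exc)
            else:
                batch_config.individual_results[file] = None  # None means it succeeded alone.
```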

write_failed_files_report

def write_failed_files_report(self, batch_config: BatchConfig) -> None:

Write a CSV report covering all failed batches and their files.

If individual file retry is enabled, a detailed file-level report is created.
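Given rows such as those produced by the report-data builders above, the CSV itself can be written with the standard library's `csv.DictWriter`. The function name, the `filename` default, and returning the path are assumptions of this sketch (the documented method returns `None`).

```python
import csv
import pathlib
from typing import Dict, List


def write_failed_files_report(
    rows: List[Dict[str, object]],
    save_dir: pathlib.Path,
    filename: str = "failed_files_report.csv",  # Hypothetical file name.
) -> pathlib.Path:
    path = save_dir / filename
    if not rows:
        return path  # Nothing failed, so no file is created.
    # newline="" lets the csv module control line endings, as its docs recommend.
    with path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return path
```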

write_immediate_batch_failure_report

def write_immediate_batch_failure_report(self, batch_config: BatchConfig, batch_num: int) -> None:

Write an immediate batch failure report to a single CSV file.

Appends the current batch failure to an ongoing report file, creating it with headers if it doesn't exist.

Arguments

  • batch_config: The batch configuration object
  • batch_num: The batch number that failed
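The append-or-create behavior described above reduces to a small pattern: check whether the report file exists before opening it in append mode, and write the header only on creation. `append_batch_failure_row` and its row shape are hypothetical names used for illustration.

```python
import csv
import pathlib
from typing import Dict


def append_batch_failure_row(report_path: pathlib.Path, row: Dict[str, object]) -> None:
    # Decide on the header before opening, since "a" mode creates the file.
    write_header = not report_path.exists()
    with report_path.open("a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(row.keys()))
        if write_header:
            writer.writeheader()
        writer.writerow(row)
```

Usage: calling it once per failed batch with the same keys yields a single growing CSV with one header line, so a partially completed run still leaves a readable report.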