
base_source

Module containing BaseSource class.

BaseSource is the abstract data source class from which all concrete data sources must inherit.

Classes

BaseSource

class BaseSource(data_splitter: Optional[DatasetSplitter] = None, seed: Optional[int] = None, ignore_cols: Optional[Union[str, Sequence[str]]] = None, iterable: bool = True, modifiers: Optional[dict[str, DataPathModifiers]] = None, partition_size: int = 16, required_fields: Optional[dict[str, Any]] = None, **kwargs: Any):

Abstract Base Source from which all other data sources must inherit.

This is used for streaming data in batches as opposed to loading the entire dataset into memory.

Arguments

  • data_splitter: Deprecated argument, will be removed in a future release. Defaults to None. Not used.
  • seed: Random number seed. Used for setting random seed for all libraries. Defaults to None.
  • ignore_cols: Column/list of columns to be ignored from the data. Defaults to None.
  • modifiers: Dictionary used for modifying paths/extensions in the dataframe. Defaults to None.
  • partition_size: The size of each partition when iterating over the data in a batched fashion.

Attributes

  • seed: Random number seed. Used for setting random seed for all libraries.

Subclasses

Variables

  • static has_predefined_schema : bool
  • is_initialised : bool - Checks if BaseSource was initialised.
  • is_task_running : bool - Returns True if a task is running.
  • supports_project_db : bool - Whether the datasource supports the project database.

    Each datasource needs to implement its own methods to define what its project database table should look like. If a datasource does not implement the methods that return the table creation query and columns, it does not support the project database.

Methods


add_hook

def add_hook(self, hook: DataSourceHook) -> None:

Add a hook to the datasource.

apply_ignore_cols

def apply_ignore_cols(self, df: pd.DataFrame) -> pandas.core.frame.DataFrame:

Apply ignored columns to dataframe, dropping columns as needed.

Returns A copy of the dataframe with ignored columns removed, or the original dataframe if this datasource does not specify any ignore columns.
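
As a brief illustration, a sketch only, assuming source is an already-constructed concrete datasource created with ignore_cols=["internal_id"]:

    import pandas as pd

    df = pd.DataFrame({"internal_id": [1, 2], "value": [0.1, 0.2]})

    # "internal_id" is dropped because it was listed in ignore_cols; "value" is kept.
    cleaned = source.apply_ignore_cols(df)
    print(list(cleaned.columns))  # ["value"]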

apply_ignore_cols_iter

def apply_ignore_cols_iter(self, dfs: Iterator[pd.DataFrame]) -> collections.abc.Iterator[pandas.core.frame.DataFrame]:

Apply ignored columns to dataframes from iterator.

apply_modifiers

def apply_modifiers(self, df: pd.DataFrame) -> pandas.core.frame.DataFrame:

Apply column modifiers to the dataframe.

If no modifiers are specified, returns the dataframe unchanged.

get_data

def get_data(self, data_keys: SingleOrMulti[str], *, use_cache: bool = True, **kwargs: Any) -> Optional[pandas.core.frame.DataFrame]:

Get data corresponding to the provided data key(s).

Can be used to return data for a single data key or for multiple at once. If used for multiple, the order of the output dataframe must match the order of the keys provided.

Arguments

  • data_keys: Key(s) to retrieve data for. These may be things such as file names, UUIDs, etc.
  • use_cache: Whether the cache should be used to retrieve data for these keys. Note that cached data may have some elements, particularly image-related fields such as image data or file paths, replaced with placeholder values when stored in the cache. If data_cache is set on the instance, data will be set in the cache regardless of this argument.
  • **kwargs: Additional keyword arguments.

Returns A dataframe containing the data, ordered to match the order of keys in data_keys, or None if no data for those keys was available.
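
A minimal usage sketch, assuming source is a concrete BaseSource subclass instance whose data keys are file names:

    # Single key: returns a dataframe for that key, or None if it is unavailable.
    df = source.get_data("scan_001.dcm")

    # Multiple keys: row order in the returned dataframe matches the key order.
    df = source.get_data(["scan_001.dcm", "scan_002.dcm"], use_cache=False)
    if df is not None:
        print(len(df))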

get_datasource_metrics

def get_datasource_metrics(self, use_skip_codes: bool = False) -> DatasourceSummaryStats:

Get metadata about this datasource.

This can be used to store information about the datasource that may be useful for debugging or tracking purposes. The metadata will be stored in the project database.

Arguments

  • use_skip_codes: Whether to use the skip reason codes as the keys in the skip_reasons dictionary, rather than the existing reason descriptions.

Returns A dictionary containing metadata about this datasource.

get_project_db_sqlite_columns

def get_project_db_sqlite_columns(self) -> list[str]:

Implement this method to get the required columns.

This is used by the "run on new data only" feature to add data to the task table in the project database.

get_project_db_sqlite_create_table_query

def get_project_db_sqlite_create_table_query(self) -> str:

Implement this method to return the required columns and types.

This is used by the "run on new data only" feature. The returned string should be in a format that can follow a "CREATE TABLE" statement and is used to create the task table in the project database.
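
For orientation only, a hedged sketch of what a subclass might return from these two methods; the column names below are illustrative and not those used by any shipped datasource:

    class MyDatabaseSource(BaseSource):
        # Other abstract methods omitted for brevity.

        def get_project_db_sqlite_columns(self) -> list[str]:
            # Columns that identify a processed data point in the task table.
            return ["record_id", "last_modified"]

        def get_project_db_sqlite_create_table_query(self) -> str:
            # Fragment placed after "CREATE TABLE <table name>" by the caller.
            return "(record_id TEXT PRIMARY KEY, last_modified TIMESTAMP)"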

get_schema

def get_schema(self) -> dict[str, typing.Any]:

Get the pre-defined schema for this datasource.

This method should be overridden by datasources that have pre-defined schemas (i.e., those with has_predefined_schema = True).

Returns The schema as a dictionary.

Raises

  • NotImplementedError: If the datasource doesn't have a pre-defined schema.
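
A hedged sketch of how a datasource with a pre-defined schema might override this; the field names and schema layout are assumptions for illustration only:

    from typing import Any

    class MyFixedSchemaSource(BaseSource):
        has_predefined_schema = True  # other abstract methods omitted for brevity

        def get_schema(self) -> dict[str, Any]:
            # Return the pre-defined schema as a plain dictionary.
            return {
                "patient_id": "str",
                "acquisition_date": "datetime",
                "image_path": "str",
            }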

partition

def partition(self, iterable: Iterable[_I], partition_size: int = 1) -> collections.abc.Iterable[collections.abc.Sequence[~_I]]:

Takes an iterable and yields partitions of size partition_size.

The final partition may be smaller than partition_size if the length of the iterable is not an exact multiple of the partition size.
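
For example, assuming source is a concrete datasource instance, partition behaves as a simple chunker:

    # Chunk sizes will be 4, 4 and 2; the final chunk is shorter because the
    # iterable's length is not a multiple of partition_size.
    for chunk in source.partition(range(10), partition_size=4):
        print(len(chunk))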

remove_hook

def remove_hook(self, hook: DataSourceHook) -> None:

Remove a hook from the datasource.

yield_data

def yield_data(self, data_keys: Optional[SingleOrMulti[str]] = None, *, use_cache: bool = True, partition_size: Optional[int] = None, **kwargs: Any) -> collections.abc.Iterator[pandas.core.frame.DataFrame]:

Yields data in batches from this source.

If data_keys is specified, only yield from that subset of the data. Otherwise, iterate through the whole datasource.

Arguments

  • data_keys: An optional list of data keys to use for yielding data. Otherwise, all data in the datasource will be considered. data_keys is always provided when this method is called from the Dataset as part of a task.
  • use_cache: Whether the cache should be used to retrieve data for these data points. Note that cached data may have some elements, particularly image-related fields such as image data or file paths, replaced with placeholder values when stored in the cache. If data_cache is set on the instance, data will be set in the cache regardless of this argument.
  • partition_size: The number of data elements to load/yield in each iteration. If not provided, defaults to the partition size configured in the datasource.
  • **kwargs: Additional keyword arguments.
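
A minimal streaming sketch, assuming MyFileSource is a hypothetical concrete subclass and process is a placeholder for downstream handling:

    source = MyFileSource(path="/data/scans", partition_size=16)

    # Iterate the whole datasource in dataframe batches of up to 16 rows each.
    for batch_df in source.yield_data(use_cache=True):
        process(batch_df)

    # Restrict iteration to a known subset of keys, with a smaller batch size.
    keys = ["scan_001.dcm", "scan_002.dcm", "scan_003.dcm"]
    for batch_df in source.yield_data(keys, partition_size=2):
        process(batch_df)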

FileSystemIterableSource

class FileSystemIterableSource(path: Union[os.PathLike, str], output_path: Optional[Union[os.PathLike, str]] = None, iterable: bool = True, fast_load: bool = True, cache_images: bool = False, filter: Optional[FileSystemFilter] = None, data_splitter: Optional[DatasetSplitter] = None, seed: Optional[int] = None, ignore_cols: Optional[Union[str, Sequence[str]]] = None, modifiers: Optional[dict[str, DataPathModifiers]] = None, partition_size: int = 16, required_fields: Optional[dict[str, Any]] = None):

Abstract base source that supports iterating over file-based data.

This is used for iterable data sources whose data is stored as files on disk.

Arguments

  • cache_images: Whether to cache images in the file system. Defaults to False. This is ignored if fast_load is True.
  • data_splitter: Deprecated argument, will be removed in a future release. Defaults to None. Not used.
  • fast_load: Whether the data will be loaded in fast mode. This is used to determine whether the data will be iterated over during setup for schema generation and splitting (where necessary). Only relevant if iterable is True, otherwise it is ignored. Defaults to True.
  • ignore_cols: Column/list of columns to be ignored from the data. Defaults to None.
  • iterable: Whether the data source is iterable. This is used to determine whether the data source can be used in a streaming context during a task. Defaults to True.
  • modifiers: Dictionary used for modifying paths/extensions in the dataframe. Defaults to None.
  • output_path: The path at which to save intermediary output files. Defaults to 'preprocessed/'.
  • partition_size: The size of each partition when iterating over the data in a batched fashion.
  • path: Path to the directory which contains the data files. Subdirectories will be searched recursively.
  • seed: Random number seed. Used for setting random seed for all libraries. Defaults to None.

Attributes

  • seed: Random number seed. Used for setting random seed for all libraries.

Raises

  • ValueError: If iterable is False or fast_load is False or cache_images is True.

Variables

  • file_names : list[str] - Returns a list of file names in the specified directory.

    Deprecated: The file_names property is deprecated and will be removed in a future release. Use file_names_iter(as_strs=True) for memory-efficient iteration, or list(file_names_iter(as_strs=True)) if you need a list.

    This property accounts for files skipped at runtime by filtering them out of the list of cached file names. Files may get skipped at runtime due to errors or because they don't contain any image data and images_only is True. This allows us to skip these files again more quickly if they are still present in the directory.

  • is_initialised : bool - Checks if BaseSource was initialised.
  • is_task_running : bool - Returns True if a task is running.
  • path : pathlib.Path - Resolved absolute path to data.

    Provides a consistent version of the path provided by the user, which should work regardless of operating system and directory structure.

  • selected_file_names : list[str] - Returns a list of selected file names as strings.

    Selected file names are affected by the selected_file_names_override and new_file_names_only attributes.

    WARNING: This property loads all file names into memory. For large datasets, consider using selected_file_names_iter() instead.

  • selected_file_names_differ : bool - Returns True if selected_file_names will differ from default.

    In particular, returns True iff there is a selected file names override in place and/or filtering for new file names only is enabled.

  • supports_project_db : bool - Whether the datasource supports the project database.

    Each datasource needs to implement its own methods to define what its project database table should look like. If a datasource does not implement the methods that return the table creation query and columns, it does not support the project database.

Static methods


get_num_workers

def get_num_workers(file_names: Sequence[str]) -> int:

Inherited from:

MultiProcessingMixIn.get_num_workers :

Gets the number of workers to use for multiprocessing.

Ensures that the number of workers is at least 1 and at most MAX_NUM_MULTIPROCESSING_WORKERS. If the number of files is less than MAX_NUM_MULTIPROCESSING_WORKERS, we use the number of files as the number of workers, unless the number of machine cores is also less than MAX_NUM_MULTIPROCESSING_WORKERS, in which case we use the lower of the two.

Arguments

  • file_names: The list of file names to load.

Returns The number of workers to use for multiprocessing.

Methods


add_hook

def add_hook(self, hook: DataSourceHook) -> None:

Inherited from:

BaseSource.add_hook :

Add a hook to the datasource.

apply_ignore_cols

def apply_ignore_cols(self, df: pd.DataFrame) -> pandas.core.frame.DataFrame:

Inherited from:

BaseSource.apply_ignore_cols :

Apply ignored columns to dataframe, dropping columns as needed.

Returns A copy of the dataframe with ignored columns removed, or the original dataframe if this datasource does not specify any ignore columns.

apply_ignore_cols_iter

def apply_ignore_cols_iter(self, dfs: Iterator[pd.DataFrame]) -> collections.abc.Iterator[pandas.core.frame.DataFrame]:

Inherited from:

BaseSource.apply_ignore_cols_iter :

Apply ignored columns to dataframes from iterator.

apply_modifiers

def apply_modifiers(self, df: pd.DataFrame) -> pandas.core.frame.DataFrame:

Inherited from:

BaseSource.apply_modifiers :

Apply column modifiers to the dataframe.

If no modifiers are specified, returns the dataframe unchanged.

clear_dataset_cache

def clear_dataset_cache(self) -> dict[str, typing.Any]:

Clear all dataset cache for this data source.

This clears both:

  1. The file names cache (Python cached_property)
  2. The dataset cache file (deletes the SQLite database file completely)

Returns Dictionary with cache clearing results.

clear_file_names_cache

def clear_file_names_cache(self) -> None:

Clears the list of selected file names.

This allows the datasource to pick up any new files that have been added to the directory since the last time it was cached.

file_names_iter

def file_names_iter(self, as_strs: bool = False) -> Union[collections.abc.Iterator[pathlib.Path], collections.abc.Iterator[str]]:

Iterate over files in a directory, yielding those that match the criteria.

Arguments

  • as_strs: By default, files are yielded as Path objects. If this is True, they are yielded as strings instead.
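
A short usage sketch, assuming source is an instance of a concrete FileSystemIterableSource subclass:

    # Lazily walk the matching files as pathlib.Path objects.
    for file_path in source.file_names_iter():
        print(file_path.name)

    # Yield plain strings instead, e.g. to build a key list for yield_data.
    keys = list(source.file_names_iter(as_strs=True))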

get_all_cached_file_paths

def get_all_cached_file_paths(self) -> list[str]:

Get all file paths that are currently stored in the cache.

Returns A list of file paths that have cache entries, or an empty list if there is no cache or the cache hasn't been initialized.

get_data

def get_data(self, data_keys: SingleOrMulti[str], *, use_cache: bool = True, **kwargs: Any) -> Optional[pandas.core.frame.DataFrame]:

Inherited from:

BaseSource.get_data :

Get data corresponding to the provided data key(s).

Can be used to return data for a single data key or for multiple at once. If used for multiple, the order of the output dataframe must match the order of the keys provided.

Arguments

  • data_keys: Key(s) to retrieve data for. These may be things such as file names, UUIDs, etc.
  • use_cache: Whether the cache should be used to retrieve data for these keys. Note that cached data may have some elements, particularly image-related fields such as image data or file paths, replaced with placeholder values when stored in the cache. If data_cache is set on the instance, data will be set in the cache regardless of this argument.
  • **kwargs: Additional keyword arguments.

Returns A dataframe containing the data, ordered to match the order of keys in data_keys, or None if no data for those keys was available.

get_datasource_metrics

def get_datasource_metrics(self, use_skip_codes: bool = False) -> DatasourceSummaryStats:

Inherited from:

BaseSource.get_datasource_metrics :

Get metadata about this datasource.

This can be used to store information about the datasource that may be useful for debugging or tracking purposes. The metadata will be stored in the project database.

Arguments

  • use_skip_codes: Whether to use the skip reason codes as the keys in the skip_reasons dictionary, rather than the existing reason descriptions.

Returns A dictionary containing metadata about this datasource.

get_project_db_sqlite_columns

def get_project_db_sqlite_columns(self) -> list[str]:

Returns the required columns to identify a data point.

get_project_db_sqlite_create_table_query

def get_project_db_sqlite_create_table_query(self) -> str:

Returns the required columns and types to identify a data point.

The file name is used as the primary key and the last modified date is used to determine if the file has been updated since the last time it was processed. If there is a conflict on the file name, the row is replaced with the new data to ensure that the last modified date is always up to date.
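
To make the described behaviour concrete, here is a hedged sketch of how such a table fragment could be used with the standard sqlite3 module; the column names and types are assumptions, not the exact ones returned by the library:

    import sqlite3

    create_fragment = "(filename TEXT PRIMARY KEY, last_modified TIMESTAMP)"

    conn = sqlite3.connect(":memory:")
    conn.execute(f"CREATE TABLE IF NOT EXISTS task_data {create_fragment}")

    # A conflicting file name replaces the existing row, so last_modified stays current.
    conn.execute(
        "INSERT OR REPLACE INTO task_data (filename, last_modified) VALUES (?, ?)",
        ("scan_001.dcm", "2024-01-01T00:00:00"),
    )
    conn.commit()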

get_schema

def get_schema(self) -> dict[str, typing.Any]:

Inherited from:

BaseSource.get_schema :

Get the pre-defined schema for this datasource.

This method should be overridden by datasources that have pre-defined schemas (i.e., those with has_predefined_schema = True).

Returns The schema as a dictionary.

Raises

  • NotImplementedError: If the datasource doesn't have a pre-defined schema.

has_uncached_files

def has_uncached_files(self) -> bool:

Returns True if there are any files in the datasource not yet cached.

partition

def partition(self, iterable: Iterable[_I], partition_size: int = 1) -> collections.abc.Iterable[collections.abc.Sequence[~_I]]:

Partition the iterable into chunks of the given size.

remove_hook

def remove_hook(self, hook: DataSourceHook) -> None:

Inherited from:

BaseSource.remove_hook :

Remove a hook from the datasource.

selected_file_names_iter

def selected_file_names_iter(self) -> collections.abc.Iterator[str]:

Returns an iterator over selected file names.

Selected file names are affected by the selected_file_names_override and new_file_names_only attributes.

Returns Iterator over selected file names.

skip_file

def skip_file(self, filename: str, reason: FileSkipReason) -> None:

Skip a file by updating cache and skipped_files set.

The first reason is always the one recorded in the data cache.

Arguments

  • filename: Path to the file being skipped.
  • reason: Reason for skipping the file.

use_file_multiprocessing

def use_file_multiprocessing(self, file_names: Sequence[str]) -> bool:

Inherited from:

MultiProcessingMixIn.use_file_multiprocessing :

Check if file multiprocessing should be used.

Returns True if file multiprocessing has been enabled by the environment variable and the number of workers would be greater than 1, otherwise False. There is no need to use file multiprocessing if we are just going to use one worker; it would be slower than just loading the data in the main process.

Returns True if file multiprocessing should be used, otherwise False.

yield_data

def yield_data(self, data_keys: Optional[SingleOrMulti[str]] = None, *, use_cache: bool = True, partition_size: Optional[int] = None, **kwargs: Any) -> collections.abc.Iterator[pandas.core.frame.DataFrame]:

Inherited from:

BaseSource.yield_data :

Yields data in batches from this source.

If data_keys is specified, only yield from that subset of the data. Otherwise, iterate through the whole datasource.

Arguments

  • data_keys: An optional list of data keys to use for yielding data. Otherwise, all data in the datasource will be considered. data_keys is always provided when this method is called from the Dataset as part of a task.
  • use_cache: Whether the cache should be used to retrieve data for these data points. Note that cached data may have some elements, particularly image-related fields such as image data or file paths, replaced with placeholder values when stored in the cache. If data_cache is set on the instance, data will be set in the cache regardless of this argument.
  • partition_size: The number of data elements to load/yield in each iteration. If not provided, defaults to the partition size configured in the datasource.
  • **kwargs: Additional keyword arguments.

FileSystemIterableSourceInferrable

class FileSystemIterableSourceInferrable(path: Union[os.PathLike, str], data_cache: Optional[DataPersister] = None, infer_class_labels_from_filepaths: bool = False, output_path: Optional[Union[os.PathLike, str]] = None, iterable: bool = True, fast_load: bool = True, cache_images: bool = False, filter: Optional[FileSystemFilter] = None, data_splitter: Optional[DatasetSplitter] = None, seed: Optional[int] = None, ignore_cols: Optional[Union[str, Sequence[str]]] = None, modifiers: Optional[dict[str, DataPathModifiers]] = None, partition_size: int = 16, required_fields: Optional[dict[str, Any]] = None):

Base source that supports iterating over folder-labelled, file-based data.

This is used for data sources whose data is stored as files on disk, and for which the folder structure (potentially) contains labelling information (e.g. the files are split into "test/", "train/", and "validate/" folders).

Arguments

  • cache_images: Whether to cache images in the file system. Defaults to False. This is ignored if fast_load is True.
  • data_cache: A DataPersister instance to use for data caching.
  • data_splitter: Deprecated argument, will be removed in a future release. Defaults to None. Not used.
  • fast_load: Whether the data will be loaded in fast mode. This is used to determine whether the data will be iterated over during setup for schema generation and splitting (where necessary). Only relevant if iterable is True, otherwise it is ignored. Defaults to True.
  • ignore_cols: Column/list of columns to be ignored from the data. Defaults to None.
  • infer_class_labels_from_filepaths: Whether class labels should be added to the data based on the filepath of the files. Defaults to the first directory within self.path, but can go a level deeper if the data splitter is provided with infer_data_split_labels set to True.
  • iterable: Whether the data source is iterable. This is used to determine whether the data source can be used in a streaming context during a task. Defaults to True.
  • modifiers: Dictionary used for modifying paths/extensions in the dataframe. Defaults to None.
  • output_path: The path at which to save intermediary output files. Defaults to 'preprocessed/'.
  • partition_size: The size of each partition when iterating over the data in a batched fashion.
  • path: Path to the directory which contains the data files. Subdirectories will be searched recursively.
  • seed: Random number seed. Used for setting random seed for all libraries. Defaults to None.

Attributes

  • seed: Random number seed. Used for setting random seed for all libraries.

Raises

  • ValueError: If iterable is False or fast_load is False or cache_images is True.

Subclasses

  • DICOMSource
  • ImageSource
  • bitfount.data.datasources.ophthalmology.ophthalmology_base_source._OphthalmologySource

Variables

  • file_names : list[str] - Returns a list of file names in the specified directory.

    Deprecated: The file_names property is deprecated and will be removed in a future release. Use file_names_iter(as_strs=True) for memory-efficient iteration, or list(file_names_iter(as_strs=True)) if you need a list.

    This property accounts for files skipped at runtime by filtering them out of the list of cached file names. Files may get skipped at runtime due to errors or because they don't contain any image data and images_only is True. This allows us to skip these files again more quickly if they are still present in the directory.

  • is_initialised : bool - Checks if BaseSource was initialised.
  • is_task_running : bool - Returns True if a task is running.
  • path : pathlib.Path - Resolved absolute path to data.

    Provides a consistent version of the path provided by the user, which should work regardless of operating system and directory structure.

  • selected_file_names : list[str] - Returns a list of selected file names as strings.

    Selected file names are affected by the selected_file_names_override and new_file_names_only attributes.

    WARNING: This property loads all file names into memory. For large datasets, consider using selected_file_names_iter() instead.

  • selected_file_names_differ : bool - Returns True if selected_file_names will differ from default.

    In particular, returns True iff there is a selected file names override in place and/or filtering for new file names only is enabled.

  • supports_project_db : bool - Whether the datasource supports the project database.

    Each datasource needs to implement its own methods to define what its project database table should look like. If a datasource does not implement the methods that return the table creation query and columns, it does not support the project database.

Static methods


get_num_workers

def get_num_workers(file_names: Sequence[str]) -> int:

Inherited from:

FileSystemIterableSource.get_num_workers :

Gets the number of workers to use for multiprocessing.

Ensures that the number of workers is at least 1 and at most MAX_NUM_MULTIPROCESSING_WORKERS. If the number of files is less than MAX_NUM_MULTIPROCESSING_WORKERS, we use the number of files as the number of workers, unless the number of machine cores is also less than MAX_NUM_MULTIPROCESSING_WORKERS, in which case we use the lower of the two.

Arguments

  • file_names: The list of file names to load.

Returns The number of workers to use for multiprocessing.

Methods


add_hook

def add_hook(self, hook: DataSourceHook) -> None:

Inherited from:

FileSystemIterableSource.add_hook :

Add a hook to the datasource.

apply_ignore_cols

def apply_ignore_cols(self, df: pd.DataFrame) -> pandas.core.frame.DataFrame:

Inherited from:

FileSystemIterableSource.apply_ignore_cols :

Apply ignored columns to dataframe, dropping columns as needed.

Returns A copy of the dataframe with ignored columns removed, or the original dataframe if this datasource does not specify any ignore columns.

apply_ignore_cols_iter

def apply_ignore_cols_iter(self, dfs: Iterator[pd.DataFrame]) -> collections.abc.Iterator[pandas.core.frame.DataFrame]:

Inherited from:

FileSystemIterableSource.apply_ignore_cols_iter :

Apply ignored columns to dataframes from iterator.

apply_modifiers

def apply_modifiers(self, df: pd.DataFrame) -> pandas.core.frame.DataFrame:

Inherited from:

FileSystemIterableSource.apply_modifiers :

Apply column modifiers to the dataframe.

If no modifiers are specified, returns the dataframe unchanged.

clear_dataset_cache

def clear_dataset_cache(self) -> dict[str, typing.Any]:

Inherited from:

FileSystemIterableSource.clear_dataset_cache :

Clear all dataset cache for this data source.

This clears both:

  1. The file names cache (Python cached_property)
  2. The dataset cache file (deletes the SQLite database file completely)

Returns Dictionary with cache clearing results.

clear_file_names_cache

def clear_file_names_cache(self) -> None:

Inherited from:

FileSystemIterableSource.clear_file_names_cache :

Clears the list of selected file names.

This allows the datasource to pick up any new files that have been added to the directory since the last time it was cached.

file_names_iter

def file_names_iter(self, as_strs: bool = False) -> Union[collections.abc.Iterator[pathlib.Path], collections.abc.Iterator[str]]:

Inherited from:

FileSystemIterableSource.file_names_iter :

Iterate over files in a directory, yielding those that match the criteria.

Arguments

  • as_strs: By default, files are yielded as Path objects. If this is True, they are yielded as strings instead.

get_all_cached_file_paths

def get_all_cached_file_paths(self) -> list[str]:

Inherited from:

FileSystemIterableSource.get_all_cached_file_paths :

Get all file paths that are currently stored in the cache.

Returns A list of file paths that have cache entries, or an empty list if there is no cache or the cache hasn't been initialized.

get_data

def get_data(self, data_keys: SingleOrMulti[str], *, use_cache: bool = True, **kwargs: Any) -> Optional[pandas.core.frame.DataFrame]:

Inherited from:

FileSystemIterableSource.get_data :

Get data corresponding to the provided data key(s).

Can be used to return data for a single data key or for multiple at once. If used for multiple, the order of the output dataframe must match the order of the keys provided.

Arguments

  • data_keys: Key(s) to retrieve data for. These may be things such as file names, UUIDs, etc.
  • use_cache: Whether the cache should be used to retrieve data for these keys. Note that cached data may have some elements, particularly image-related fields such as image data or file paths, replaced with placeholder values when stored in the cache. If data_cache is set on the instance, data will be set in the cache regardless of this argument.
  • **kwargs: Additional keyword arguments.

Returns A dataframe containing the data, ordered to match the order of keys in data_keys, or None if no data for those keys was available.

get_datasource_metrics

def get_datasource_metrics(self, use_skip_codes: bool = False) -> DatasourceSummaryStats:

Inherited from:

FileSystemIterableSource.get_datasource_metrics :

Get metadata about this datasource.

This can be used to store information about the datasource that may be useful for debugging or tracking purposes. The metadata will be stored in the project database.

Arguments

  • use_skip_codes: Whether to use the skip reason codes as the keys in the skip_reasons dictionary, rather than the existing reason descriptions.

Returns A dictionary containing metadata about this datasource.

get_project_db_sqlite_columns

def get_project_db_sqlite_columns(self) -> list[str]:

Inherited from:

FileSystemIterableSource.get_project_db_sqlite_columns :

Returns the required columns to identify a data point.

get_project_db_sqlite_create_table_query

def get_project_db_sqlite_create_table_query(self) -> str:

Inherited from:

FileSystemIterableSource.get_project_db_sqlite_create_table_query :

Returns the required columns and types to identify a data point.

The file name is used as the primary key and the last modified date is used to determine if the file has been updated since the last time it was processed. If there is a conflict on the file name, the row is replaced with the new data to ensure that the last modified date is always up to date.

get_schema

def get_schema(self) -> dict[str, typing.Any]:

Inherited from:

FileSystemIterableSource.get_schema :

Get the pre-defined schema for this datasource.

This method should be overridden by datasources that have pre-defined schemas (i.e., those with has_predefined_schema = True).

Returns The schema as a dictionary.

Raises

  • NotImplementedError: If the datasource doesn't have a pre-defined schema.

has_uncached_files

def has_uncached_files(self) -> bool:

Inherited from:

FileSystemIterableSource.has_uncached_files :

Returns True if there are any files in the datasource not yet cached.

partition

def partition(self, iterable: Iterable[_I], partition_size: int = 1) -> collections.abc.Iterable[collections.abc.Sequence[~_I]]:

Inherited from:

FileSystemIterableSource.partition :

Partition the iterable into chunks of the given size.

remove_hook

def remove_hook(self, hook: DataSourceHook) -> None:

Inherited from:

FileSystemIterableSource.remove_hook :

Remove a hook from the datasource.

selected_file_names_iter

def selected_file_names_iter(self) -> collections.abc.Iterator[str]:

Inherited from:

FileSystemIterableSource.selected_file_names_iter :

Returns an iterator over selected file names.

Selected file names are affected by the selected_file_names_override and new_file_names_only attributes.

Returns Iterator over selected file names.

skip_file

def skip_file(self, filename: str, reason: FileSkipReason) -> None:

Inherited from:

FileSystemIterableSource.skip_file :

Skip a file by updating cache and skipped_files set.

The first reason is always the one recorded in the data cache.

Arguments

  • filename: Path to the file being skipped.
  • reason: Reason for skipping the file.

use_file_multiprocessing

def use_file_multiprocessing(self, file_names: Sequence[str]) -> bool:

Inherited from:

FileSystemIterableSource.use_file_multiprocessing :

Check if file multiprocessing should be used.

Returns True if file multiprocessing has been enabled by the environment variable and the number of workers would be greater than 1, otherwise False. There is no need to use file multiprocessing if we are just going to use one worker; it would be slower than just loading the data in the main process.

Returns True if file multiprocessing should be used, otherwise False.

yield_data

def yield_data(self, data_keys: Optional[SingleOrMulti[str]] = None, *, use_cache: bool = True, partition_size: Optional[int] = None, **kwargs: Any) -> collections.abc.Iterator[pandas.core.frame.DataFrame]:

Inherited from:

FileSystemIterableSource.yield_data :

Yields data in batches from this source.

If data_keys is specified, only yield from that subset of the data. Otherwise, iterate through the whole datasource.

Arguments

  • data_keys: An optional list of data keys to use for yielding data. Otherwise, all data in the datasource will be considered. data_keys is always provided when this method is called from the Dataset as part of a task.
  • use_cache: Whether the cache should be used to retrieve data for these data points. Note that cached data may have some elements, particularly image-related fields such as image data or file paths, replaced with placeholder values when stored in the cache. If data_cache is set on the instance, data will be set in the cache regardless of this argument.
  • partition_size: The number of data elements to load/yield in each iteration. If not provided, defaults to the partition size configured in the datasource.
  • **kwargs: Additional keyword arguments.

FileYieldingPerformanceHook

class FileYieldingPerformanceHook():

Hooks to measure the performance of the file yielding process.

Attributes

  • _last_seen_file_num: Most recent cumulative file counter observed.
  • _last_event_time: Epoch time of the most recent event, in seconds.
  • _rolling_count: Number of files observed.
  • _rolling_mean: Rolling mean of per-file durations in seconds.
  • _rolling_M2: Accumulated sum of squares of differences from the current mean (for variance computation).

Initialise the hook.
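
The _rolling_mean and _rolling_M2 attributes correspond to the standard Welford online update for mean and variance; a self-contained sketch of that update (not necessarily the hook's exact code) is:

    def welford_update(count: int, mean: float, m2: float, duration: float) -> tuple[int, float, float]:
        """Fold one per-file duration (seconds) into the rolling statistics."""
        count += 1
        delta = duration - mean
        mean += delta / count
        m2 += delta * (duration - mean)
        return count, mean, m2

    # Example: the variance of the durations seen so far is m2 / count.
    count, mean, m2 = 0, 0.0, 0.0
    for duration in (0.8, 1.2, 1.0):
        count, mean, m2 = welford_update(count, mean, m2, duration)
    print(mean, m2 / count)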

Ancestors

Variables

  • type : HookType - Return the hook type.

Methods


on_batches_complete

def on_batches_complete(self, task_id: str, modeller_username: str, total_batches: int, total_files: int, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_batches_complete :

Run the hook when all batches are processed but before resilience starts.

on_file_filter_progress

def on_file_filter_progress(self, total_files: int, total_skipped: int, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_file_filter_progress :

Run the hook when filtering files to track progress.

on_file_process_end

def on_file_process_end(self, datasource: FileSystemIterableSource, file_num: int, total_num_files: Optional[int], *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_file_process_end :

Run the hook when a file processing ends.

on_file_process_start

def on_file_process_start(self, datasource: FileSystemIterableSource, file_num: int, total_num_files: Optional[int], *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_file_process_start :

Run the hook when a file starts to be processed.

on_files_partition

def on_files_partition(self, datasource: FileSystemIterableSource, total_num_files: Optional[int], batch_size: int, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_files_partition :

Run the hook when we partition files to be processed.

on_pod_init_end

def on_pod_init_end(self, pod: Pod, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_pod_init_end :

Run the hook at the end of pod initialisation.

on_pod_init_error

def on_pod_init_error(self, pod: Pod, exception: BaseException, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_pod_init_error :

Run the hook if an uncaught exception is raised during pod initialisation.

Raises

  • NotImplementedError: If the hook is not implemented. This is to ensure that underlying exceptions are not swallowed if the hook is not implemented. This error is caught further up the chain and the underlying exception is raised instead.

on_pod_init_progress

def on_pod_init_progress(self, pod: Pod, message: str, datasource_name: Optional[str] = None, base_datasource_names: Optional[list[str]] = None, pod_db_enabled: Optional[bool] = None, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_pod_init_progress :

Run the hook at key points of pod initialisation.

on_pod_init_start

def on_pod_init_start(self, pod: Pod, pod_name: str, username: Optional[str] = None, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_pod_init_start :

Run the hook at the very start of pod initialisation.

on_pod_shutdown_end

def on_pod_shutdown_end(self, pod: Pod, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_pod_shutdown_end :

Run the hook at the very end of pod shutdown.

on_pod_shutdown_start

def on_pod_shutdown_start(self, pod: Pod, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_pod_shutdown_start :

Run the hook at the very start of pod shutdown.

on_pod_startup_end

def on_pod_startup_end(self, pod: Pod, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_pod_startup_end :

Run the hook at the end of pod startup.

on_pod_startup_error

def on_pod_startup_error(self, pod: Pod, exception: BaseException, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_pod_startup_error :

Run the hook if an uncaught exception is raised during pod startup.

Raises

  • NotImplementedError: If the hook is not implemented. This is to ensure that underlying exceptions are not swallowed if the hook is not implemented. This error is caught further up the chain and the underlying exception is raised instead.

on_pod_startup_start

def on_pod_startup_start(self, pod: Pod, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_pod_startup_start :

Run the hook at the very start of pod startup.

on_pod_task_data_check

def on_pod_task_data_check(self, task_id: str, message: str, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_pod_task_data_check :

Run the hook at start of a job request to check that the pod has data.

on_resilience_complete

def on_resilience_complete(self, task_id: str, modeller_username: str, total_attempted: int, total_succeeded: int, total_failed: int, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_resilience_complete :

Run the hook when individual file retry phase is complete.

on_resilience_progress

def on_resilience_progress(self, task_id: str, modeller_username: str, current_file: int, total_files: int, file_name: str, success: bool, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_resilience_progress :

Run the hook for each individual file retry attempt.

on_resilience_start

def on_resilience_start(self, task_id: str, modeller_username: str, total_failed_files: int, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_resilience_start :

Run the hook when individual file retry phase begins.

on_task_abort

def on_task_abort(self, pod: Pod, message: str, task_id: str, project_id: Optional[str], *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_task_abort :

Run the hook when there is an exception in a task.

on_task_end

def on_task_end(self, pod: Pod, task_id: str, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_task_end :

Run the hook when a new task is received at the end.

on_task_error

def on_task_error(self, pod: Pod, exception: BaseException, task_id: str, project_id: Optional[str], *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_task_error :

Run the hook when there is an exception in a task.

on_task_progress

def on_task_progress(self, task_id: str, message: str, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_task_progress :

Run the hook at key points of the task.

on_task_start

def on_task_start(self, pod: Pod, task_id: str, project_id: Optional[str], modeller_username: str, protocol_name: str, save_path: Optional[str] = None, primary_results_path: Optional[str] = None, dataset_name: Optional[str] = None, *args: Any, **kwargs: Any) -> None:

Inherited from:

BasePodHook.on_task_start :

Run the hook when a new task is received at the start.

MultiProcessingMixIn

class MultiProcessingMixIn():

MixIn class for multiprocessing of _get_data.

Variables

  • static image_columns : set[str]
  • static skipped_files : set[str]

Static methods


get_num_workers

def get_num_workers(file_names: Sequence[str]) -> int:

Gets the number of workers to use for multiprocessing.

Ensures that the number of workers is at least 1 and at most MAX_NUM_MULTIPROCESSING_WORKERS. If the number of files is less than MAX_NUM_MULTIPROCESSING_WORKERS, we use the number of files as the number of workers, unless the number of machine cores is also less than MAX_NUM_MULTIPROCESSING_WORKERS, in which case we use the lower of the two.

Arguments

  • file_names: The list of file names to load.

Returns The number of workers to use for multiprocessing.
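
The rule above amounts to a clamp; a hedged equivalent sketch (the value of MAX_NUM_MULTIPROCESSING_WORKERS is assumed here for illustration only):

    import os

    MAX_NUM_MULTIPROCESSING_WORKERS = 8  # assumed value, for illustration only

    def sketch_get_num_workers(file_names: list[str]) -> int:
        # At least 1 worker; no more than the number of files, the number of
        # machine cores, or MAX_NUM_MULTIPROCESSING_WORKERS.
        cores = os.cpu_count() or 1
        return max(1, min(len(file_names), cores, MAX_NUM_MULTIPROCESSING_WORKERS))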

Methods


use_file_multiprocessing

def use_file_multiprocessing(self, file_names: Sequence[str]) -> bool:

Check if file multiprocessing should be used.

Returns True if file multiprocessing has been enabled by the environment variable and the number of workers would be greater than 1, otherwise False. There is no need to use file multiprocessing if we are just going to use one worker; it would be slower than just loading the data in the main process.

Returns True if file multiprocessing should be used, otherwise False.