
sql_source

Module containing the SQLSource class.

The SQLSource class handles loading data from SQL databases.

Classes

OMOPSource

class OMOPSource(
    connection_string: str,
    version: str,
    read_sql_kwargs: Optional[dict[str, Any]] = None,
    **kwargs: Any,
):

Data source for connecting to OMOP databases.

The Observational Medical Outcomes Partnership (OMOP) datasource is specifically designed for OMOP Common Data Model databases. The chosen version determines the schema of the database displayed in the Bitfount Hub.

Arguments

  • connection_string: SQLAlchemy connection string to the OMOP database.
  • version: OMOP CDM version. Must be one of "v3.0", "v5.3", or "v5.4".
  • read_sql_kwargs: Additional arguments to be passed to pandas.read_sql. Defaults to None.
  • **kwargs: Additional arguments passed to BaseSource.
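
A minimal construction sketch, assuming OMOPSource is importable from the top-level bitfount package (the import path and connection string here are illustrative, not taken from this page):

    from bitfount import OMOPSource

    source = OMOPSource(
        connection_string="postgresql://user:password@localhost:5432/omop",
        version="v5.4",  # must be one of "v3.0", "v5.3", or "v5.4"
        read_sql_kwargs={"coerce_float": True},  # forwarded to pandas.read_sql
    )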

Ancestors

  • BaseSource

Variables

  • static SUPPORTED_VERSIONS
  • static has_predefined_schema : bool
  • is_initialised : bool - Checks if BaseSource was initialised.
  • is_task_running : bool - Returns True if a task is running.
  • supports_project_db : bool - Whether the datasource supports the project database.

    Each datasource needs to implement its own methods to define what its project database table should look like. If the datasource does not implement the methods to get the table creation query and columns, it does not support the project database.

Methods


add_hook

def add_hook(self, hook: DataSourceHook) -> None:

Inherited from:

BaseSource.add_hook :

Add a hook to the datasource.
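
For example, given an already-constructed DataSourceHook instance (the hook object, here called my_hook, is hypothetical and not defined on this page):

    source.add_hook(my_hook)  # my_hook will now be notified of datasource events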

apply_ignore_cols

def apply_ignore_cols(self, df: pd.DataFrame) -> pandas.core.frame.DataFrame:

Inherited from:

BaseSource.apply_ignore_cols :

Apply ignored columns to dataframe, dropping columns as needed.

Returns

A copy of the dataframe with ignored columns removed, or the original dataframe if this datasource does not specify any ignore columns.

apply_ignore_cols_iter

def apply_ignore_cols_iter(self, dfs: Iterator[pd.DataFrame]) -> collections.abc.Iterator:

Inherited from:

BaseSource.apply_ignore_cols_iter :

Apply ignored columns to dataframes from iterator.

apply_modifiers

def apply_modifiers(self, df: pd.DataFrame) -> pandas.core.frame.DataFrame:

Inherited from:

BaseSource.apply_modifiers :

Apply column modifiers to the dataframe.

If no modifiers are specified, returns the dataframe unchanged.
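
A short sketch of chaining these inherited helpers on a dataframe (the dataframe contents are invented; whether any ignore columns or modifiers actually apply depends on how the datasource was configured):

    import pandas as pd

    df = pd.DataFrame({"person_id": [1, 2], "note": ["a", "b"]})
    df = source.apply_ignore_cols(df)  # drops any configured ignore columns
    df = source.apply_modifiers(df)    # applies any configured column modifiers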

get_data

def get_data(
    self,
    data_keys: SingleOrMulti[str],
    *,
    use_cache: bool = True,
    **kwargs: Any,
) -> Optional[pandas.core.frame.DataFrame]:

Inherited from:

BaseSource.get_data :

Get data corresponding to the provided data key(s).

Can be used to return data for a single data key or for multiple at once. If used for multiple, the order of the output dataframe must match the order of the keys provided.

Arguments

  • data_keys: Key(s) for which to get the data. These may be things such as file names, UUIDs, etc.
  • use_cache: Whether the cache should be used to retrieve data for these keys. Note that cached data may have some elements, particularly image-related fields such as image data or file paths, replaced with placeholder values when stored in the cache. If data_cache is set on the instance, data will be set in the cache, regardless of this argument.
  • **kwargs: Additional keyword arguments.

Returns

A dataframe containing the data, ordered to match the order of keys in data_keys, or None if no data for those keys was available.
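
An illustrative call with two hypothetical keys, reusing the source object from the construction sketch above (what a valid key looks like depends on the datasource):

    df = source.get_data(["key-1", "key-2"], use_cache=True)
    if df is not None:
        print(len(df))  # rows are ordered to match the order of data_keys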

get_datasource_metrics

def get_datasource_metrics(
    self,
    use_skip_codes: bool = False,
) -> DatasourceSummaryStats:

Inherited from:

BaseSource.get_datasource_metrics :

Get metadata about this datasource.

This can be used to store information about the datasource that may be useful for debugging or tracking purposes. The metadata will be stored in the project database.

Arguments

  • use_skip_codes: Whether to use the skip reason codes as the keys in the skip_reasons dictionary, rather than the existing reason descriptions.

Returns

A dictionary containing metadata about this datasource.
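
A minimal usage sketch (the exact shape of the returned DatasourceSummaryStats is not spelled out on this page, so it is simply printed here):

    stats = source.get_datasource_metrics(use_skip_codes=True)
    print(stats)  # skip_reasons keyed by skip reason code rather than description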

get_project_db_sqlite_columns

def get_project_db_sqlite_columns(self) -> list:

Inherited from:

BaseSource.get_project_db_sqlite_columns :

Implement this method to get the required columns.

This is used by the "run on new data only" feature to add data to the task table in the project database.

get_project_db_sqlite_create_table_query

def get_project_db_sqlite_create_table_query(self) -> str:

Inherited from:

BaseSource.get_project_db_sqlite_create_table_query :

Implement this method to return the required columns and types.

This is used by the "run on new data only" feature. The returned string should be in a format that can be used after a "CREATE TABLE" statement, and is used to create the task table in the project database.
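
To illustrate the contract of these two methods, a hypothetical subclass might implement them as follows (the class and column names are invented for the sketch; other required BaseSource methods are omitted):

    class MyCustomSource(BaseSource):
        def get_project_db_sqlite_create_table_query(self) -> str:
            # Used after a "CREATE TABLE" statement to create the task table
            return "data_key TEXT PRIMARY KEY, processed_at TEXT"

        def get_project_db_sqlite_columns(self) -> list:
            return ["data_key", "processed_at"]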

get_schema

def get_schema(self) -> dict:

Get the pre-defined OMOP schema for this datasource's version.

Returns

The OMOP schema as a JSON dictionary.

Raises

  • DataSourceError: If the schema file cannot be opened or parsed.
  • FileNotFoundError: If the schema file doesn't exist for this version.
  • ValueError: If the version is not supported.
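
The call itself is simple; the structure of the returned dictionary depends on the chosen CDM version, so only the call and one of the documented failure modes is sketched:

    try:
        schema = source.get_schema()  # JSON dict for this source's OMOP version
    except FileNotFoundError:
        print("No schema file exists for this version")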

partition

def partition(
    self,
    iterable: Iterable[_I],
    partition_size: int = 1,
) -> collections.abc.Iterable:

Inherited from:

BaseSource.partition :

Takes an iterable and yields partitions of size partition_size.

The final partition may be smaller than partition_size if the length of the iterable is not an exact multiple of the partition size.
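
For example (grounded only in the signature above; the element type of each partition is not specified on this page, so each is converted to a list for printing):

    for batch in source.partition(range(10), partition_size=4):
        print(list(batch))  # at most 4 elements; the final batch here has 2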

remove_hook

def remove_hook(self, hook: DataSourceHook) -> None:

Inherited from:

BaseSource.remove_hook :

Remove a hook from the datasource.

yield_data

def yield_data(
    self,
    data_keys: Optional[SingleOrMulti[str]] = None,
    *,
    use_cache: bool = True,
    partition_size: Optional[int] = None,
    **kwargs: Any,
) -> collections.abc.Iterator:

Inherited from:

BaseSource.yield_data :

Yields data in batches from this source.

If data_keys is specified, only yield from that subset of the data. Otherwise, iterate through the whole datasource.

Arguments

  • data_keys: An optional list of data keys to use for yielding data. Otherwise, all data in the datasource will be considered. data_keys is always provided when this method is called from the Dataset as part of a task.
  • use_cache: Whether the cache should be used to retrieve data for these data points. Note that cached data may have some elements, particularly image-related fields such as image data or file paths, replaced with placeholder values when stored in the cache. If data_cache is set on the instance, data will be set in the cache, regardless of this argument.
  • partition_size: The number of data elements to load/yield in each iteration. If not provided, defaults to the partition size configured in the datasource.
  • **kwargs: Additional keyword arguments.
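
Putting it together, iterating over the whole datasource in batches of a chosen size might look like this (process is a placeholder for your own handling; the batches are assumed to be dataframes, consistent with get_data above):

    for batch_df in source.yield_data(partition_size=100):
        process(batch_df)  # each batch holds up to partition_size elements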