sqlite

A data persistence implementation backed by an SQLite database.

Classes

CacheInfoTableBase

class CacheInfoTableBase():

Cache information entry ORM.

Represents the table in the database that corresponds to cache validity information. In particular, stores the primary key of the cache, file, which is the canonical path of the file in question, and the time the cache was last updated for that file.

This is a mix-in designed to be used with the EntityName pattern: https://github.com/sqlalchemy/sqlalchemy/wiki/EntityName

Variables

  • static data
  • static file : sqlalchemy.orm.base.Mapped[str]

DataTableBase

class DataTableBase():

Cached data entry ORM.

The specific structure of this table depends on the data being stored in it (hence the use of deferred reflection); the table is initialised at the first set() call, and its schema is determined at that point.

Some things are consistent though; the data must have:

  • an integer primary key column (data_cache_id)
a text column called _source_canonical_path, which stores a canonical filepath and has a foreign key constraint on the cache info table.

This is a mix-in designed to be used with the EntityName pattern: https://github.com/sqlalchemy/sqlalchemy/wiki/EntityName

Variables

  • static cache_info
  • static data_cache_id : sqlalchemy.orm.base.Mapped[int]

SQLiteDataPersister

class SQLiteDataPersister(sqlite_path: Path, *args: Any, **kwargs: Any):

A data caching implementation that uses an SQLite database.

This implementation maintains three related tables in the SQLite database:

1. cache_info table:

  • Tracks metadata for successfully cached files
  • Schema: file (TEXT PRIMARY KEY), cache_updated_at (DATETIME)
  • Purpose: Determines cache validity by comparing file modification times

2. cached_data table:

  • Stores the actual processed data from successful files
  • Schema: Dynamically determined from first cached file DataFrame + metadata columns
  • Always includes: data_cache_id (INT PRIMARY KEY), _source_canonical_path (TEXT)
  • Purpose: Fast retrieval of processed data without re-parsing files
  • Relationship: Foreign key to cache_info.file with CASCADE DELETE
  • Note: If images are present in the datasource, they will not be cached and the file will have to be processed again to obtain them.

3. skipped_files table:

  • Tracks files that were skipped during processing
  • Schema: file_path (TEXT PRIMARY KEY), reason_code (INT), skip_time (DATETIME)
  • Purpose: Avoid reprocessing files that will inevitably fail
  • Reason codes map to specific failure types in FileSkipReason enum
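The three schemas above can be sketched in plain SQL. Column names follow the doc; the exact types, constraints, and the illustrative `value` column are assumptions, since the real cached_data columns are reflected from the first cached DataFrame:

```python
import sqlite3

# In-memory stand-in for the persister's SQLite file.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Cache validity metadata.
    CREATE TABLE cache_info (
        file TEXT PRIMARY KEY,
        cache_updated_at DATETIME
    );

    -- Processed data; real columns come from the first cached DataFrame,
    -- so the 'value' column here is purely illustrative.
    CREATE TABLE cached_data (
        data_cache_id INTEGER PRIMARY KEY,
        _source_canonical_path TEXT
            REFERENCES cache_info (file) ON DELETE CASCADE,
        value TEXT
    );

    -- Files skipped during processing.
    CREATE TABLE skipped_files (
        file_path TEXT PRIMARY KEY,
        reason_code INTEGER,
        skip_time DATETIME
    );
    """
)
tables = {r[0] for r in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'")}
```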

Database Lifecycle:

  • cache_info + cached_data tables: Created on first successful file processing
  • skipped_files table: Created immediately on SQLiteDataPersister initialization
  • All tables support concurrent access via SQLAlchemy sessions and optional locking

Performance Benefits:

  • Data cache: Eliminates re-parsing of files when only tabular data is needed
  • Skip tracking: Eliminates re-parsing of incompatible files

Skip Tracking Methods:

  • is_file_skipped(): Check if a file was previously skipped
  • mark_file_skipped(): Mark a file as skipped with specific reason
  • get_all_skipped_files(): Get detailed report of all skipped files
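A minimal sketch of the skip-tracking flow, using a raw sqlite3 table rather than the library itself (method and column names follow the doc; everything else, including the reason code used, is an assumption):

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE skipped_files ("
    " file_path TEXT PRIMARY KEY, reason_code INTEGER, skip_time DATETIME)"
)

def mark_file_skipped(path: str, reason_code: int) -> None:
    # INSERT OR REPLACE so re-marking a file just refreshes its record.
    conn.execute(
        "INSERT OR REPLACE INTO skipped_files VALUES (?, ?, ?)",
        (path, reason_code, datetime.now(timezone.utc).isoformat()),
    )

def is_file_skipped(path: str) -> bool:
    row = conn.execute(
        "SELECT 1 FROM skipped_files WHERE file_path = ?", (path,)
    ).fetchone()
    return row is not None

mark_file_skipped("/data/bad.csv", reason_code=2)
```

Once a file is marked, subsequent runs can consult is_file_skipped() before attempting to parse it again.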

Arguments

  • sqlite_path: Path to the SQLite database file.
  • *args, **kwargs: Additional arguments passed to DataPersister.

Variables

  • db_prepped : bool - Whether the database has been fully initialised.

Static methods


prep_data_for_caching

def prep_data_for_caching(data: pd.DataFrame, image_cols: Optional[Collection[str]] = None) -> pd.DataFrame:

Inherited from:

DataPersister.prep_data_for_caching :

Prepares data ready for caching.

This involves removing/replacing things that aren't supposed to be cached or that it makes no sense to cache, such as image data or file paths that won't be relevant except for when the files are actually being used.

Does not mutate input dataframe.
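Conceptually, preparation drops columns that make no sense to cache while leaving the caller's data untouched. This stand-in works on plain dicts rather than a pandas DataFrame, and the drop-the-columns logic is an assumption about the general idea, not the library's exact behaviour:

```python
from typing import Collection, Optional

def prep_rows_for_caching(
    rows: list[dict], image_cols: Optional[Collection[str]] = None
) -> list[dict]:
    # Build fresh row dicts so the input is not mutated.
    drop = set(image_cols or ())
    return [{k: v for k, v in row.items() if k not in drop} for row in rows]

rows = [{"id": 1, "scan": b"\x89PNG...", "label": "a"}]
cached = prep_rows_for_caching(rows, image_cols=["scan"])
```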

Methods


bulk_get

def bulk_get(self, files: Sequence[Union[str, Path]]) -> BulkResult:

Inherited from:

DataPersister.bulk_get :

Get the persisted data for several files.

Returns only misses if no data has been persisted, if it is out of date, or an error was otherwise encountered.

bulk_set

def bulk_set(self, data: pd.DataFrame, original_file_col: str = '_original_filename') -> None:

Inherited from:

DataPersister.bulk_set :

Bulk set a bunch of cache entries from a dataframe.

The dataframe must indicate the original file that each row is associated with. This is the _original_filename column by default.
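bulk_set has to split the combined frame back into per-file entries keyed on that filename column. A stdlib sketch of the grouping step (the column name comes from the doc; the rest is assumed):

```python
from collections import defaultdict

def group_by_source(
    rows: list[dict], original_file_col: str = "_original_filename"
) -> dict[str, list[dict]]:
    # Each cache entry receives only the rows belonging to its source file.
    grouped: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        grouped[row[original_file_col]].append(row)
    return dict(grouped)

rows = [
    {"_original_filename": "a.csv", "x": 1},
    {"_original_filename": "b.csv", "x": 2},
    {"_original_filename": "a.csv", "x": 3},
]
per_file = group_by_source(rows)
```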

clear_cache_file

def clear_cache_file(self) -> CacheClearResult:

Inherited from:

DataPersister.clear_cache_file :

Delete the cache storage completely.

Returns Dictionary with results of the cache clearing operation.

get

def get(self, file: Union[str, Path]) -> Optional[pd.DataFrame]:

Inherited from:

DataPersister.get :

Get the persisted data for a given file.

Returns None if no data has been persisted, if it is out of date, or an error was otherwise encountered.
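Out-of-date detection hinges on comparing the file's modification time with the cache_info entry's cache_updated_at. A sketch of that check (the comparison rule is an assumption about how such caches typically decide staleness):

```python
import os
import tempfile
from datetime import datetime, timezone

def is_cache_stale(path: str, cache_updated_at: datetime) -> bool:
    # Stale if the file changed after the cache entry was last written.
    mtime = datetime.fromtimestamp(os.path.getmtime(path), tz=timezone.utc)
    return mtime > cache_updated_at

with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"data")
    path = f.name

# The cache timestamp is taken after the write, so the entry is fresh.
fresh = is_cache_stale(path, datetime.now(timezone.utc))
os.unlink(path)
```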

get_all_cached_file_paths

def get_all_cached_file_paths(self) -> list[str]:

Inherited from:

DataPersister.get_all_cached_file_paths :

Get list of all cached file paths.

Returns List of canonical file paths (as strings) that have entries in the cache.

get_all_cached_files

def get_all_cached_files(self) -> list[str]:

Get all file paths currently stored in the cache.

Returns A list of canonical file paths (as strings) that have entries in the cache. Returns an empty list if the database hasn't been initialized yet.

get_all_skipped_files

def get_all_skipped_files(self) -> list[str]:

Inherited from:

DataPersister.get_all_skipped_files :

Get list of all skipped file paths.

Returns List of file paths that have been marked as skipped.

get_cached_distinct_values

def get_cached_distinct_values(self, columns: Sequence[str], file_paths: Optional[Sequence[Union[str, Path]]] = None) -> dict[str, list[Any]]:

Inherited from:

DataPersister.get_cached_distinct_values :

Get distinct values for columns from cache, optionally scoped to files.

get_cached_dtype_sample

def get_cached_dtype_sample(self, file_paths: Optional[Sequence[Union[str, Path]]] = None, limit: int = 100) -> pd.DataFrame:

Inherited from:

DataPersister.get_cached_dtype_sample :

Get a bounded cache sample for dtype reconciliation.

get_cached_row_count

def get_cached_row_count(self, file_paths: Optional[Sequence[Union[str, Path]]] = None) -> int:

Inherited from:

DataPersister.get_cached_row_count :

Get row count from cached data, optionally scoped to selected files.

get_cached_table_columns

def get_cached_table_columns(self) -> list[str]:

Inherited from:

DataPersister.get_cached_table_columns :

Get all column names currently present in cached data storage.

Returns an empty list if cache is not initialised or an error occurs.

get_column_for_id

def get_column_for_id(self, id_value: str, id_column: str, target_column: str) -> list[Any]:

Inherited from:

DataPersister.get_column_for_id :

Get all values of a target column for rows matching a given ID.

Queries the cached data for all entries where id_column equals id_value and returns the corresponding values from target_column.

Arguments

  • id_value: The ID value to match against.
  • id_column: The name of the column containing IDs to filter on.
  • target_column: The name of the column whose values should be returned.

Returns A list of values from target_column for all matching rows. Returns an empty list if no matches are found, the cache is not initialised, or an error occurs.
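This maps naturally onto a targeted SELECT against the cached data. A sketch over a toy cached_data table (the query shape and column names are assumptions):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cached_data (patient_id TEXT, result TEXT)")
conn.executemany(
    "INSERT INTO cached_data VALUES (?, ?)",
    [("p1", "pos"), ("p2", "neg"), ("p1", "neg")],
)

def get_column_for_id(id_value, id_column, target_column):
    # Column names cannot be bound parameters, so quote them as identifiers;
    # real code should validate them against the table's known columns first.
    sql = 'SELECT "{}" FROM cached_data WHERE "{}" = ?'.format(
        target_column, id_column
    )
    return [row[0] for row in conn.execute(sql, (id_value,))]

values = get_column_for_id("p1", "patient_id", "result")
```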

get_column_values_for_files

def get_column_values_for_files(self, file_paths: Sequence[Union[str, Path]], columns: Sequence[str]) -> dict[str, dict[str, Any]]:

Inherited from:

DataPersister.get_column_values_for_files :

Get specific column values for multiple files via targeted queries.

Retrieves only the requested columns from the cache for the given files, avoiding loading full rows into DataFrames. This is significantly more efficient than bulk_get when only a subset of columns is needed (e.g. during filtering).

Arguments

  • file_paths: The file paths to query.
  • columns: The column names to retrieve from the cached data.

Returns A nested dict mapping file_path -> {column_name -> value}. Files not found in the cache are omitted from the result. Returns an empty dict if the cache is not initialised or an error occurs.

get_skip_reason_summary

def get_skip_reason_summary(self) -> pd.DataFrame:

Inherited from:

DataPersister.get_skip_reason_summary :

Get aggregate statistics of skip reasons.

Returns DataFrame with columns: reason_code, reason_description, file_count
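The aggregate view corresponds to a simple GROUP BY over skipped_files. A sketch of the counting step (the reason codes inserted here are made up; real descriptions come from the FileSkipReason enum):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE skipped_files ("
    " file_path TEXT PRIMARY KEY, reason_code INTEGER, skip_time DATETIME)"
)
conn.executemany(
    "INSERT INTO skipped_files VALUES (?, ?, NULL)",
    [("a.csv", 1), ("b.csv", 1), ("c.csv", 2)],
)

# One row per reason code with the number of files skipped for that reason.
summary = conn.execute(
    "SELECT reason_code, COUNT(*) AS file_count"
    " FROM skipped_files GROUP BY reason_code ORDER BY reason_code"
).fetchall()
```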

is_file_skipped

def is_file_skipped(self, file: Union[str, Path]) -> bool:

Inherited from:

DataPersister.is_file_skipped :

Check if a file has been previously skipped.

Arguments

  • file: The file path to check.

Returns True if the file has been marked as skipped, False otherwise.

mark_file_skipped

def mark_file_skipped(self, file: Union[str, Path], reason: FileSkipReason) -> None:

Inherited from:

DataPersister.mark_file_skipped :

Mark a file as skipped with the given reason.

Wraps the underlying _mark_file_skipped implementation with error handling so that a failure to persist the skip record (e.g. a transient OS/network error) does not propagate up and crash the caller.

Arguments

  • file: The file path that was skipped.
  • reason: The reason why the file was skipped.

set

def set(self, file: Union[str, Path], data: pd.DataFrame) -> None:

Inherited from:

DataPersister.set :

Set the persisted data for a given file.

If existing data is already set, it will be overwritten.

The data should only be the data that is related to that file.

touch

def touch(self, file_paths: Optional[Sequence[Union[str, Path]]] = None) -> None:

Inherited from:

DataPersister.touch :

Mark the given cached entries as recently validated.

This signals to the cache that the entries for the given files are still current and should not be considered stale. The concrete effect depends on the implementation.

Files not present in the cache are silently ignored.
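One plausible implementation is a single UPDATE on cache_info's cache_updated_at column. This is an assumption (the doc only says the concrete effect is implementation-dependent), but it shows why absent files are ignored: they simply match no rows.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cache_info (file TEXT PRIMARY KEY, cache_updated_at TEXT)"
)
conn.executemany(
    "INSERT INTO cache_info VALUES (?, ?)",
    [("a.csv", "2020-01-01T00:00:00"), ("b.csv", "2020-01-01T00:00:00")],
)

def touch(file_paths):
    now = datetime.now(timezone.utc).isoformat()
    placeholders = ",".join("?" for _ in file_paths)
    # Paths missing from cache_info match no rows and are silently ignored.
    conn.execute(
        f"UPDATE cache_info SET cache_updated_at = ?"
        f" WHERE file IN ({placeholders})",
        [now, *file_paths],
    )

touch(["a.csv", "missing.csv"])
rows = dict(conn.execute("SELECT file, cache_updated_at FROM cache_info"))
```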

unset

def unset(self, file: Union[str, Path]) -> None:

Deletes the persisted data for the given file.

SkippedFilesTableBase

class SkippedFilesTableBase():

Skipped files tracking table ORM.

Tracks files that have been skipped during processing to avoid reprocessing them.

Variables

  • static file_path : sqlalchemy.orm.base.Mapped[str]
  • static reason_code : sqlalchemy.orm.base.Mapped[int]