sqlite
A data persistence implementation backed by an SQLite database.
Classes
CacheInfoTableBase
class CacheInfoTableBase():Cache information entry ORM.
Represents the table in the database that corresponds to cache validity
information. In particular, it stores the table's primary key, file
(the canonical path of the file in question), and the time the cache
was last updated for that file.
This is a mix-in designed to be used with the EntityName pattern: https://github.com/sqlalchemy/sqlalchemy/wiki/EntityName
Variables
- static
cache_updated_at : sqlalchemy.orm.base.Mapped[datetime.datetime]
- static
data
- static
file : sqlalchemy.orm.base.Mapped[str]
DataTableBase
class DataTableBase():Cached data entry ORM.
The specific structure of this table depends on the data being stored in
it (which is why deferred reflection is used); the table is initialised at the
first set() call, and its schema is determined at that point.
Some things are consistent though; the data must have:
- an integer primary key column (data_cache_id)
- a text column called _source_canonical_path, which stores a canonical filepath and has a foreign key constraint on the cache info table.
This is a mix-in designed to be used with the EntityName pattern: https://github.com/sqlalchemy/sqlalchemy/wiki/EntityName
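The dynamic-schema idea can be illustrated with a minimal sketch using the stdlib sqlite3 module (the real class uses SQLAlchemy with deferred reflection, so the helper name and type mapping here are hypothetical):

```python
import sqlite3

def init_data_table(conn, first_row):
    """Create a cached-data table whose columns are derived from the first row.

    Hypothetical sketch: column names come from the first set() payload, plus
    the two mandatory metadata columns described above.
    """
    type_map = {int: "INTEGER", float: "REAL"}
    cols = ", ".join(
        f'"{name}" {type_map.get(type(value), "TEXT")}'
        for name, value in first_row.items()
    )
    conn.execute(
        "CREATE TABLE IF NOT EXISTS cached_data ("
        "data_cache_id INTEGER PRIMARY KEY, "
        "_source_canonical_path TEXT, "
        f"{cols})"
    )

conn = sqlite3.connect(":memory:")
init_data_table(conn, {"patient_id": "p1", "age": 42})
cols = [row[1] for row in conn.execute("PRAGMA table_info(cached_data)")]
```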
SQLiteDataPersister
class SQLiteDataPersister(sqlite_path: Path, *args: Any, **kwargs: Any):A data caching implementation that uses an SQLite database.
This implementation maintains three related tables in the SQLite database:
1. cache_info table:
- Tracks metadata for successfully cached files
- Schema: file (TEXT PRIMARY KEY), cache_updated_at (DATETIME)
- Purpose: Determines cache validity by comparing file modification times
2. cached_data table:
- Stores the actual processed data from successful files
- Schema: Dynamically determined from first cached file DataFrame + metadata columns
- Always includes: data_cache_id (INT PRIMARY KEY), _source_canonical_path (TEXT)
- Purpose: Fast retrieval of processed data without re-parsing files
- Relationship: Foreign key to cache_info.file with CASCADE DELETE
- Note: If images are present in the datasource, they will not be cached and the file will have to be processed again to obtain them.
3. skipped_files table:
- Tracks files that were skipped during processing
- Schema: file_path (TEXT PRIMARY KEY), reason_code (INT), skip_time (DATETIME)
- Purpose: Avoid reprocessing files that will inevitably fail
- Reason codes map to specific failure types in FileSkipReason enum
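The three-table layout, including the CASCADE DELETE relationship, can be sketched as raw DDL (the real schema is managed by SQLAlchemy, so exact types may differ; note that SQLite only enforces the cascade when foreign keys are enabled on the connection):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # required for CASCADE in SQLite
conn.executescript("""
CREATE TABLE cache_info (
    file TEXT PRIMARY KEY,
    cache_updated_at DATETIME
);
CREATE TABLE cached_data (
    data_cache_id INTEGER PRIMARY KEY,
    _source_canonical_path TEXT
        REFERENCES cache_info(file) ON DELETE CASCADE
);
CREATE TABLE skipped_files (
    file_path TEXT PRIMARY KEY,
    reason_code INT,
    skip_time DATETIME
);
""")

# Deleting the cache_info row removes the dependent cached_data rows too.
conn.execute("INSERT INTO cache_info VALUES ('/data/a.csv', '2024-01-01')")
conn.execute(
    "INSERT INTO cached_data (_source_canonical_path) VALUES ('/data/a.csv')"
)
conn.execute("DELETE FROM cache_info WHERE file = '/data/a.csv'")
remaining = conn.execute("SELECT COUNT(*) FROM cached_data").fetchone()[0]
```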
Database Lifecycle:
- cache_info + cached_data tables: Created on first successful file processing
- skipped_files table: Created immediately on SQLiteDataPersister initialization
- All tables support concurrent access via SQLAlchemy sessions and optional locking
Performance Benefits:
- Data cache: Eliminates re-parsing of files when only tabular data is needed
- Skip tracking: Eliminates re-parsing of incompatible files
Skip Tracking Methods:
is_file_skipped(): Check if a file was previously skipped
mark_file_skipped(): Mark a file as skipped with a specific reason
get_all_skipped_files(): Get a detailed report of all skipped files
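A minimal stand-alone version of the skip-tracking pair, using sqlite3 directly and a plain integer where the real API takes a FileSkipReason enum member:

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE skipped_files ("
    "file_path TEXT PRIMARY KEY, reason_code INT, skip_time DATETIME)"
)

def mark_file_skipped(path, reason_code):
    # INSERT OR REPLACE keeps only the latest skip record per path.
    conn.execute(
        "INSERT OR REPLACE INTO skipped_files VALUES (?, ?, ?)",
        (path, reason_code, datetime.now(timezone.utc).isoformat()),
    )

def is_file_skipped(path):
    row = conn.execute(
        "SELECT 1 FROM skipped_files WHERE file_path = ?", (path,)
    ).fetchone()
    return row is not None

mark_file_skipped("/data/bad.dcm", 3)
skipped = is_file_skipped("/data/bad.dcm")
not_skipped = is_file_skipped("/data/good.dcm")
```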
Arguments
sqlite_path: Path to the SQLite database file
*args, **kwargs: Additional arguments passed to DataPersister
Ancestors
- DataPersister
Variables
db_prepped : bool - Whether the database has been fully initialised.
Static methods
prep_data_for_caching
def prep_data_for_caching( data: pd.DataFrame, image_cols: Optional[Collection[str]] = None,) ‑> pd.DataFrame:Inherited from:
DataPersister.prep_data_for_caching :
Prepares data ready for caching.
This involves removing/replacing things that aren't supposed to be cached or that it makes no sense to cache, such as image data or file paths that won't be relevant except for when the files are actually being used.
Does not mutate input dataframe.
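The same idea, sketched with plain dicts rather than a DataFrame (the function name and shape here are illustrative, not the real implementation): columns that should not be cached, such as image payloads, are dropped from copies of the rows, leaving the input untouched.

```python
def prep_rows_for_caching(rows, image_cols=()):
    """Return copies of rows with image columns removed; does not mutate input."""
    drop = set(image_cols)
    return [{k: v for k, v in row.items() if k not in drop} for row in rows]

rows = [{"file": "/data/a.dcm", "pixels": b"\x00\x01", "age": 42}]
cleaned = prep_rows_for_caching(rows, image_cols=["pixels"])
```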
Methods
bulk_get
def bulk_get(self, files: Sequence[Union[str, Path]]) ‑> BulkResult:Inherited from:
Get the persisted data for several files.
Returns only misses if no data has been persisted, if it is out of date, or an error was otherwise encountered.
bulk_set
def bulk_set( self, data: pd.DataFrame, original_file_col: str = '_original_filename',) ‑> None:Inherited from:
Bulk set a bunch of cache entries from a dataframe.
The dataframe must indicate the original file that each row is associated
with. This is the _original_filename column by default.
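Internally this means partitioning the rows by their source file before writing each group's cache entry. A plain-dict sketch of that grouping step (the real method operates on a pandas DataFrame):

```python
from collections import defaultdict

def group_rows_by_file(rows, original_file_col="_original_filename"):
    """Group rows by the file they came from, one cache entry per group."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row[original_file_col]].append(row)
    return dict(grouped)

rows = [
    {"_original_filename": "/data/a.csv", "value": 1},
    {"_original_filename": "/data/b.csv", "value": 2},
    {"_original_filename": "/data/a.csv", "value": 3},
]
grouped = group_rows_by_file(rows)
```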
clear_cache_file
def clear_cache_file(self) ‑> CacheClearResult:Inherited from:
DataPersister.clear_cache_file :
Delete the cache storage completely.
Returns Dictionary with results of the cache clearing operation.
get
def get(self, file: Union[str, Path]) ‑> Optional[pd.DataFrame]:Inherited from:
Get the persisted data for a given file.
Returns None if no data has been persisted, if it is out of date, or an error was otherwise encountered.
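The out-of-date check works by comparing the file's modification time against the stored cache_updated_at timestamp. A self-contained sketch of that comparison (the helper name is illustrative):

```python
import os
import tempfile
from datetime import datetime, timezone

def cache_is_stale(path, cache_updated_at):
    """True if the file was modified after the cache entry was written."""
    mtime = datetime.fromtimestamp(os.path.getmtime(path), tz=timezone.utc)
    return mtime > cache_updated_at

# A file written just before the cache timestamp is considered fresh.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"data")
    path = f.name

fresh = cache_is_stale(path, datetime.now(timezone.utc))
os.unlink(path)
```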
get_all_cached_file_paths
def get_all_cached_file_paths(self) ‑> list[str]:Inherited from:
DataPersister.get_all_cached_file_paths :
Get list of all cached file paths.
Returns List of canonical file paths (as strings) that have entries in the cache.
get_all_cached_files
def get_all_cached_files(self) ‑> list[str]:Get all file paths currently stored in the cache.
Returns A list of canonical file paths (as strings) that have entries in the cache. Returns an empty list if the database hasn't been initialised yet.
get_all_skipped_files
def get_all_skipped_files(self) ‑> list[str]:Inherited from:
DataPersister.get_all_skipped_files :
Get list of all skipped file paths.
Returns List of file paths that have been marked as skipped.
get_cached_distinct_values
def get_cached_distinct_values( self, columns: Sequence[str], file_paths: Optional[Sequence[Union[str, Path]]] = None,) ‑> dict[str, list[Any]]:Inherited from:
DataPersister.get_cached_distinct_values :
Get distinct values for columns from cache, optionally scoped to files.
get_cached_dtype_sample
def get_cached_dtype_sample( self, file_paths: Optional[Sequence[Union[str, Path]]] = None, limit: int = 100,) ‑> pd.DataFrame:Inherited from:
DataPersister.get_cached_dtype_sample :
Get a bounded cache sample for dtype reconciliation.
get_cached_row_count
def get_cached_row_count( self, file_paths: Optional[Sequence[Union[str, Path]]] = None,) ‑> int:Inherited from:
DataPersister.get_cached_row_count :
Get row count from cached data, optionally scoped to selected files.
get_cached_table_columns
def get_cached_table_columns(self) ‑> list[str]:Inherited from:
DataPersister.get_cached_table_columns :
Get all column names currently present in cached data storage.
Returns an empty list if cache is not initialised or an error occurs.
get_column_for_id
def get_column_for_id( self, id_value: str, id_column: str, target_column: str,) ‑> list[Any]:Inherited from:
DataPersister.get_column_for_id :
Get all values of a target column for rows matching a given ID.
Queries the cached data for all entries where id_column equals
id_value and returns the corresponding values from
target_column.
Arguments
id_value: The ID value to match against.
id_column: The name of the column containing IDs to filter on.
target_column: The name of the column whose values should be returned.
Returns
A list of values from target_column for all matching rows.
Returns an empty list if no matches are found, the cache is not
initialised, or an error occurs.
get_column_values_for_files
def get_column_values_for_files( self, file_paths: Sequence[Union[str, Path]], columns: Sequence[str],) ‑> dict[str, dict[str, Any]]:Inherited from:
DataPersister.get_column_values_for_files :
Get specific column values for multiple files via targeted queries.
Retrieves only the requested columns from the cache for the given
files, avoiding loading full rows into DataFrames. This is
significantly more efficient than bulk_get when only a subset of
columns is needed (e.g. during filtering).
Arguments
file_paths: The file paths to query.
columns: The column names to retrieve from the cached data.
Returns
A nested dict mapping file_path -> {column_name -> value}.
Files not found in the cache are omitted from the result.
Returns an empty dict if the cache is not initialised or an
error occurs.
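The targeted-query optimisation can be sketched with sqlite3: select only the requested columns for the given paths and build the nested dict, so files missing from the cache simply produce no rows (table and column names here are illustrative).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cached_data ("
    "data_cache_id INTEGER PRIMARY KEY, "
    "_source_canonical_path TEXT, label TEXT, size INT)"
)
conn.executemany(
    "INSERT INTO cached_data (_source_canonical_path, label, size) "
    "VALUES (?, ?, ?)",
    [("/data/a.csv", "cat", 10), ("/data/b.csv", "dog", 20)],
)

def column_values_for_files(file_paths, columns):
    # Fetch only the requested columns rather than whole rows.
    placeholders = ",".join("?" * len(file_paths))
    sql = (
        f"SELECT _source_canonical_path, {', '.join(columns)} "
        f"FROM cached_data WHERE _source_canonical_path IN ({placeholders})"
    )
    return {
        row[0]: dict(zip(columns, row[1:]))
        for row in conn.execute(sql, list(file_paths))
    }

values = column_values_for_files(["/data/a.csv", "/data/missing.csv"], ["label"])
```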
get_skip_reason_summary
def get_skip_reason_summary(self) ‑> pandas.core.frame.DataFrame:Inherited from:
DataPersister.get_skip_reason_summary :
Get aggregate statistics of skip reasons.
Returns DataFrame with columns: reason_code, reason_description, file_count
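The aggregation itself is a GROUP BY over the skipped_files table; a minimal sketch (the real method additionally maps reason codes to FileSkipReason descriptions and returns a DataFrame):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE skipped_files ("
    "file_path TEXT PRIMARY KEY, reason_code INT, skip_time DATETIME)"
)
conn.executemany(
    "INSERT INTO skipped_files VALUES (?, ?, ?)",
    [("/a", 1, "2024-01-01"), ("/b", 1, "2024-01-02"), ("/c", 2, "2024-01-03")],
)

# Count skipped files per reason code.
summary = conn.execute(
    "SELECT reason_code, COUNT(*) AS file_count "
    "FROM skipped_files GROUP BY reason_code ORDER BY reason_code"
).fetchall()
```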
is_file_skipped
def is_file_skipped(self, file: Union[str, Path]) ‑> bool:Inherited from:
DataPersister.is_file_skipped :
Check if a file has been previously skipped.
Arguments
file: The file path to check.
Returns True if the file has been marked as skipped, False otherwise.
mark_file_skipped
def mark_file_skipped(self, file: Union[str, Path], reason: FileSkipReason) ‑> None:Inherited from:
DataPersister.mark_file_skipped :
Mark a file as skipped with the given reason.
Wraps the underlying _mark_file_skipped implementation with error
handling so that a failure to persist the skip record (e.g. a transient
OS/network error) does not propagate up and crash the caller.
Arguments
file: The file path that was skipped.
reason: The reason why the file was skipped.
set
def set(self, file: Union[str, Path], data: pd.DataFrame) ‑> None:Inherited from:
Set the persisted data for a given file.
If existing data is already set, it will be overwritten.
The data should only be the data that is related to that file.
touch
def touch(self, file_paths: Optional[Sequence[Union[str, Path]]] = None) ‑> None:Inherited from:
Mark the given cached entries as recently validated.
This signals to the cache that the entries for the given files are still current and should not be considered stale. The concrete effect depends on the implementation.
Files not present in the cache are silently ignored.
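For the SQLite backend the natural effect is refreshing cache_updated_at for the matched rows; an UPDATE simply matches nothing for unknown paths, which is why they are silently ignored. A sketch under those assumptions:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cache_info (file TEXT PRIMARY KEY, cache_updated_at TEXT)"
)
conn.execute(
    "INSERT INTO cache_info VALUES ('/data/a.csv', '2024-01-01T00:00:00')"
)

def touch(file_paths, now="2024-06-01T00:00:00"):
    # Paths with no cache entry match no rows, so they are silently ignored.
    conn.executemany(
        "UPDATE cache_info SET cache_updated_at = ? WHERE file = ?",
        [(now, p) for p in file_paths],
    )

touch(["/data/a.csv", "/data/unknown.csv"])
updated = conn.execute(
    "SELECT cache_updated_at FROM cache_info WHERE file = '/data/a.csv'"
).fetchone()[0]
```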
unset
def unset(self, file: Union[str, Path]) ‑> None:Deletes the persisted data for the given file.
SkippedFilesTableBase
class SkippedFilesTableBase():Skipped files tracking table ORM.
Tracks files that have been skipped during processing to avoid reprocessing them.
Variables
- static
file_path : sqlalchemy.orm.base.Mapped[str]
- static
reason_code : sqlalchemy.orm.base.Mapped[int]
- static
skip_time : sqlalchemy.orm.base.Mapped[datetime.datetime]