telemetry
Datadog logging handlers for sending logs to Datadog's API.
This module provides handlers that integrate with Python's logging framework to send logs to Datadog, with automatic batching and size management.
To send file-related skip events to Datadog, set config.settings.enable_skipped_file_telemetry to True and provide dd_client_token and dd_site. Skip payloads are sanitized (made UTF-8 safe) before sending.
Module
Functions
flush_datadog_metrics
def flush_datadog_metrics() -> None:
Flush the Datadog metrics buffer.
Should be called after each task completes, mirroring flush_datadog_telemetry.
flush_datadog_telemetry
def flush_datadog_telemetry() -> None:
Flush the Datadog telemetry buffer.
This should be called to ensure all buffered logs are sent to Datadog.
log_execution_time
def log_execution_time(func: ~_F) -> ~_F:
Decorator that logs function execution time to Datadog telemetry.
Supports both sync and async callables. Only active when config.settings.enable_execution_time_telemetry is True.
Arguments
func: The function to wrap.
Returns
The wrapped function.
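As a rough illustration of how one decorator can cover both sync and async callables, here is a minimal sketch using only the standard library. The names ENABLED, telemetry_logger and log_execution_time_sketch are hypothetical stand-ins, not the module's own; in particular, checking the flag at call time rather than at decoration time is an assumption.

```python
import asyncio
import functools
import inspect
import logging
import time
from typing import Any, Callable, TypeVar

_F = TypeVar("_F", bound=Callable[..., Any])

# Hypothetical stand-ins for config.settings.enable_execution_time_telemetry
# and for the module's telemetry logger.
ENABLED = True
telemetry_logger = logging.getLogger("telemetry.execution_time_sketch")


def log_execution_time_sketch(func: _F) -> _F:
    """Log how long func takes; works for sync and async callables."""
    if inspect.iscoroutinefunction(func):

        @functools.wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            if not ENABLED:  # assumption: the flag is read on every call
                return await func(*args, **kwargs)
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                elapsed = time.perf_counter() - start
                telemetry_logger.info("%s took %.6fs", func.__qualname__, elapsed)

        return async_wrapper  # type: ignore[return-value]

    @functools.wraps(func)
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        if not ENABLED:
            return func(*args, **kwargs)
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            telemetry_logger.info("%s took %.6fs", func.__qualname__, elapsed)

    return sync_wrapper  # type: ignore[return-value]


@log_execution_time_sketch
def add(a: int, b: int) -> int:
    return a + b


@log_execution_time_sketch
async def add_async(a: int, b: int) -> int:
    await asyncio.sleep(0)  # yield to the event loop once
    return a + b
```

The timing is logged in a finally block so that a duration is recorded even when the wrapped callable raises.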
setup_datadog_metrics
def setup_datadog_metrics(
    dd_client_token: Optional[str] = None,
    dd_site: Optional[str] = None,
    metrics: Optional[dict[str, datadog_api_client.v2.model.metric_intake_type.MetricIntakeType]] = None,
    hostname: Optional[str] = None,
    tags: Optional[list[str]] = None,
) -> None:
Set up Datadog metrics if credentials are available.
Mirrors setup_datadog_telemetry but routes to the Datadog Metrics API
(/api/v2/series) rather than the Logs API. Must be called alongside
setup_datadog_telemetry at pod startup.
This function is idempotent - calling it multiple times is safe.
Arguments
dd_client_token: The Datadog API key.
dd_site: The Datadog site (e.g. 'datadoghq.com', 'datadoghq.eu').
metrics: Pre-registered metric names mapped to their intake type. Metrics emitted via metrics_logger that are not in this dict are auto-registered as GAUGE on first use.
hostname: The host name attached to every metric series. Defaults to the system hostname.
tags: Static tags applied to every metric series (e.g. ['env:prod']).
Returns
None
setup_datadog_telemetry
def setup_datadog_telemetry(
    dd_client_token: Optional[str] = None,
    dd_site: Optional[str] = None,
    service: str = 'pod',
    hostname: Optional[str] = None,
    tags: Optional[list[str]] = None,
    log_level: str = 'INFO',
) -> None:
Set up Datadog telemetry logging if credentials are available.
If credentials are not provided, the telemetry logger will exist but have no handlers, meaning all telemetry logs will be silently dropped.
This function is idempotent - calling it multiple times is safe.
Arguments
dd_client_token: The Datadog client token.
dd_site: The Datadog site to use (e.g., 'datadoghq.com', 'datadoghq.eu').
service: The service to use for the logs.
hostname: The hostname to use for the logs. Defaults to system hostname.
tags: The tags to use for the logs.
log_level: The log level for the Datadog handler (e.g., 'INFO', 'DEBUG').
Returns
None
shutdown_datadog_metrics
def shutdown_datadog_metrics() -> None:
Shut down Datadog metrics and flush any pending series.
Should be called at pod shutdown, mirroring shutdown_datadog_telemetry.
shutdown_datadog_telemetry
def shutdown_datadog_telemetry() -> None:
Shut down Datadog telemetry logging and flush any pending logs.
This should be called during application shutdown to ensure all buffered logs are sent to Datadog.
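The setup, flush and shutdown functions above describe a lifecycle: set up once at startup, flush after each task, shut down on exit. The sketch below shows one way an idempotent, credential-gated setup could be arranged with the standard library alone. Every name in it (setup_sketch, flush_sketch, shutdown_sketch, ListHandler, telemetry_logger) is hypothetical, and a plain MemoryHandler stands in for the Datadog handler.

```python
import logging
import logging.handlers
from typing import Optional

telemetry_logger = logging.getLogger("telemetry.lifecycle_sketch")
telemetry_logger.setLevel(logging.INFO)
telemetry_logger.propagate = False

active_handler: Optional[logging.handlers.MemoryHandler] = None


class ListHandler(logging.Handler):
    """Hypothetical sink that stands in for the Datadog API."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def setup_sketch(
    client_token: Optional[str],
    site: Optional[str],
    target: Optional[logging.Handler] = None,
) -> None:
    """Attach one buffering handler, but only if credentials are present."""
    global active_handler
    if active_handler is not None:
        return  # already set up, so a repeated call is a no-op
    if not client_token or not site:
        return  # no credentials: the logger keeps no handlers
    active_handler = logging.handlers.MemoryHandler(capacity=1000, target=target)
    telemetry_logger.addHandler(active_handler)


def flush_sketch() -> None:
    """Forward everything buffered so far to the target."""
    if active_handler is not None:
        active_handler.flush()


def shutdown_sketch() -> None:
    """Detach the handler; MemoryHandler.close() flushes by default."""
    global active_handler
    if active_handler is not None:
        telemetry_logger.removeHandler(active_handler)
        active_handler.close()
        active_handler = None
```

A caller would run setup_sketch once at startup, flush_sketch after each task, and shutdown_sketch during exit; calling setup_sketch a second time changes nothing.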
Classes
DatadogLogsHandler
class DatadogLogsHandler(
    api_instance: datadog_api_client.v2.api.logs_api.LogsApi,
    source: str,
    hostname: str,
    service: str,
    tags: Optional[list[str]] = None,
    capacity: int = 1000,
):
A MemoryHandler that sends logs to Datadog.
This handler automatically flushes when the buffer approaches a 5 MB limit. The size limit and the default capacity of 1000 are taken from Datadog's Logs API documentation: https://docs.datadoghq.com/api/latest/logs/
We piggyback off Python's MemoryHandler class since we do not want to implement our own buffer management system, including what happens when there are records left in the buffer when the handler is closed.
Initialize the DatadogLogsHandler.
Arguments
api_instance: The Datadog API instance.
source: The source of the logs.
hostname: The hostname of the logs.
service: The service of the logs.
tags: The tags of the logs.
capacity: The capacity of the buffer. Defaults to 1000, following Datadog's documentation.
Methods
emit
def emit(self, record: logging.LogRecord) -> None:
Emit a record to the buffer.
Note that we immediately build the HTTPLogItem object and add it to a separate buffer, rather than waiting for the flush operation to do so.
Arguments
record: The log record.
flush
def flush(self) -> None:
Flush the buffer by sending all records to Datadog in a single request.
Overrides the parent's flush method to send the buffered records as one batch instead of calling emit() once per record.
shouldFlush
def shouldFlush(self, record: logging.LogRecord, item_size: int = 0) -> bool:
Determine if we should flush the buffer.
Arguments
record: The log record.
item_size: The size of the item to add to the buffer.
Returns
True if we should flush the buffer, False otherwise.
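The interplay of emit, shouldFlush and flush described above can be sketched with a MemoryHandler subclass that builds each payload at emit time, tracks the running byte size, and sends one batch per flush. This is an illustration, not the handler's actual code: BatchingHandlerSketch, send_batch and the payload shape are hypothetical, the byte size is estimated from the JSON encoding, and reading 5 MB as 5 * 1024 * 1024 bytes is an assumption.

```python
import json
import logging
import logging.handlers
from typing import Callable

MAX_BATCH_BYTES = 5 * 1024 * 1024  # assumed reading of the 5 MB limit


class BatchingHandlerSketch(logging.handlers.MemoryHandler):
    """Buffers pre-built payload items and sends them as one batch."""

    def __init__(
        self,
        send_batch: Callable[[list[dict]], None],
        capacity: int = 1000,
        max_bytes: int = MAX_BATCH_BYTES,
    ) -> None:
        super().__init__(capacity=capacity)
        self.send_batch = send_batch  # hypothetical stand-in for the API call
        self.max_bytes = max_bytes
        self.items: list[dict] = []
        self.items_size = 0

    def shouldFlush(self, record: logging.LogRecord, item_size: int = 0) -> bool:
        # Flush when the buffer is full by count, or when adding the next
        # item would push the batch past the byte limit.
        return (
            len(self.items) >= self.capacity
            or self.items_size + item_size > self.max_bytes
        )

    def emit(self, record: logging.LogRecord) -> None:
        # Build the payload item immediately rather than at flush time.
        item = {"message": record.getMessage(), "status": record.levelname}
        item_size = len(json.dumps(item).encode("utf-8"))
        if self.items and self.shouldFlush(record, item_size):
            self.flush()
        self.items.append(item)
        self.items_size += item_size

    def flush(self) -> None:
        # One request for the whole batch instead of one call per record.
        self.acquire()
        try:
            if self.items:
                self.send_batch(list(self.items))
                self.items.clear()
                self.items_size = 0
        finally:
            self.release()
```

Because the parent class already calls flush() from close(), records still in the buffer at shutdown are sent without extra code, which is the benefit of building on MemoryHandler that the class description mentions.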
DatadogMetricsHandler
class DatadogMetricsHandler(
    api_instance: datadog_api_client.v2.api.metrics_api.MetricsApi,
    metrics: dict[str, datadog_api_client.v2.model.metric_intake_type.MetricIntakeType],
    hostname: str,
    tags: Optional[list[str]] = None,
):
A logging handler that sends custom metrics to Datadog's Metrics API.
Keeps per-metric state as dict[metric_name, list[MetricPoint]].
Each metric's list is monitored independently and flushed as a single
MetricSeries when it approaches 90% of the 5 MB uncompressed limit.
Call sites emit via:

    metrics_logger.info(metric_name, timestamp, value)

where metric_name maps to record.msg and (timestamp, value)
map to record.args. An optional metric_type attribute can be
attached via extra={"metric_type": MetricIntakeType.COUNT}; unknown
metrics default to GAUGE.
See: https://docs.datadoghq.com/api/latest/metrics/
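To make the call-site convention concrete, the sketch below logs two metrics through a standard logging.Logger and records what a handler would see on the LogRecord. CaptureHandler, IntakeTypeSketch and the metric names are hypothetical; IntakeTypeSketch only stands in for MetricIntakeType so the example runs without the Datadog client installed.

```python
import logging
import time
from enum import Enum


class IntakeTypeSketch(Enum):
    """Hypothetical stand-in for MetricIntakeType."""

    COUNT = "count"
    GAUGE = "gauge"


class CaptureHandler(logging.Handler):
    """Records the fields a metrics handler would read from each record."""

    def __init__(self) -> None:
        super().__init__()
        self.seen: list[tuple] = []

    def emit(self, record: logging.LogRecord) -> None:
        # extra={"metric_type": ...} becomes an attribute on the record;
        # fall back to GAUGE when the caller did not supply one.
        metric_type = getattr(record, "metric_type", IntakeTypeSketch.GAUGE)
        self.seen.append((record.msg, record.args, metric_type))


metrics_logger = logging.getLogger("metrics.call_site_sketch")
metrics_logger.setLevel(logging.INFO)
metrics_logger.propagate = False  # keep records away from formatting handlers
capture = CaptureHandler()
metrics_logger.addHandler(capture)

now = int(time.time())
metrics_logger.info("files.processed", now, 12.0)
metrics_logger.info(
    "files.skipped", now, 1.0, extra={"metric_type": IntakeTypeSketch.COUNT}
)
```

Note that the handler reads record.msg and record.args directly and never calls record.getMessage(): the metric name contains no format placeholders, so ordinary %-formatting of these records would fail. That is why the sketch disables propagation to other handlers.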
Initialise the handler.
Arguments
api_instance: A configured Datadog MetricsApi instance.
metrics: Pre-registered metric names mapped to their intake type. Any metric name emitted that is not in this dict is auto-registered as GAUGE on first use.
hostname: Attached to every MetricSeries via resources so metrics are filterable by host in Datadog.
tags: Static tags applied to every series (e.g. ['env:prod']).
Ancestors
Variables
MAX_SERIES_SIZE (static)
Methods
close
def close(self) -> None:
Flush and close the handler.
emit
def emit(self, record: logging.LogRecord) -> None:
Buffer a metric point.
Expects record.msg to be the metric name and record.args to be
a two-element tuple of (timestamp: int, value: float). Per-call tags
can be supplied via extra={"tags": [...]}. Records that do not match
this shape are silently dropped.
Each unique (metric_name, tags) combination is buffered
independently and flushed as its own MetricSeries when it
approaches the size limit.
Arguments
record: The log record.
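The record shape check and the per-(metric_name, tags) buffering described above might look roughly like the following. This is a sketch under assumptions: MetricBufferSketch is a hypothetical name, tags are sorted so that tag order does not create separate buffers, and the size-based flush is left out to keep the example short.

```python
import logging
from collections import defaultdict


class MetricBufferSketch(logging.Handler):
    """Buffers (timestamp, value) points per (metric_name, tags) key."""

    def __init__(self) -> None:
        super().__init__()
        self.buffers: dict[
            tuple[str, tuple[str, ...]], list[tuple[int, float]]
        ] = defaultdict(list)

    def emit(self, record: logging.LogRecord) -> None:
        args = record.args
        # Expect msg to be the metric name and args to be (timestamp, value).
        if (
            not isinstance(record.msg, str)
            or not isinstance(args, tuple)
            or len(args) != 2
        ):
            return  # wrong shape: drop silently
        timestamp, value = args
        if isinstance(timestamp, bool) or not isinstance(timestamp, int):
            return
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return
        # Per-call tags arrive via extra={"tags": [...]}.
        tags = getattr(record, "tags", None) or []
        key = (record.msg, tuple(sorted(tags)))  # assumption: order-insensitive
        self.buffers[key].append((timestamp, float(value)))
```

With this layout, two calls that share a metric name but carry different tags land in separate buffers, so each can later be sent as its own series.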
flush
def flush(self) -> None:
Flush all series buffers.