
telemetry

Datadog logging handlers for sending logs to Datadog's API.

This module provides handlers that integrate with Python's logging framework to send logs to Datadog, with automatic batching and size management.

For file-related skip events in Datadog: set config.settings.enable_skipped_file_telemetry True and provide dd_client_token and dd_site. Skip payloads are sanitized (UTF-8 safe) before sending.
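The docs above only say that skip payloads are made UTF-8 safe before sending; one minimal way to do that (an assumption, not the module's actual implementation) is to round-trip through UTF-8 with a replacement error handler so lone surrogates and other unencodable characters cannot break the request body:

```python
def sanitize_payload(text: str) -> str:
    """Make a payload safe to UTF-8 encode.

    A sketch only: encoding with errors="replace" substitutes '?' for any
    character (e.g. a lone surrogate) that cannot be encoded as UTF-8.
    """
    return text.encode("utf-8", errors="replace").decode("utf-8")
```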


Functions

flush_datadog_metrics

def flush_datadog_metrics() -> None:

Flush the Datadog metrics buffer.

Should be called after each task completes, mirroring flush_datadog_telemetry.

flush_datadog_telemetry

def flush_datadog_telemetry() -> None:

Flush the Datadog telemetry buffer.

This should be called to ensure all buffered logs are sent to Datadog.

log_execution_time

def log_execution_time(func: ~_F) -> ~_F:

Decorator that logs function execution time to Datadog telemetry.

Supports both sync and async callables. Only active when config.settings.enable_execution_time_telemetry is True.

Arguments

  • func: The function to wrap.

Returns The wrapped function.
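The documentation says the decorator supports both sync and async callables. A self-contained sketch of that dual-path pattern is below; the real decorator additionally checks config.settings.enable_execution_time_telemetry and sends durations to Datadog rather than a plain logger, so everything here beyond the wrapping structure is an assumption:

```python
import asyncio
import functools
import logging
import time

logger = logging.getLogger("telemetry_sketch")


def log_execution_time(func):
    """Log wall-clock execution time of a sync or async callable (sketch)."""
    if asyncio.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                # The real decorator routes this to Datadog telemetry.
                logger.info("%s took %.3fs", func.__name__, time.perf_counter() - start)
        return async_wrapper

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            logger.info("%s took %.3fs", func.__name__, time.perf_counter() - start)
    return wrapper
```

Branching on `asyncio.iscoroutinefunction` before wrapping is what lets one decorator serve both kinds of callables without awaiting a plain function or dropping a coroutine unawaited.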

setup_datadog_metrics

def setup_datadog_metrics(
    dd_client_token: Optional[str] = None,
    dd_site: Optional[str] = None,
    metrics: Optional[dict[str, datadog_api_client.v2.model.metric_intake_type.MetricIntakeType]] = None,
    hostname: Optional[str] = None,
    tags: Optional[list[str]] = None,
) -> None:

Set up Datadog metrics if credentials are available.

Mirrors setup_datadog_telemetry but routes to the Datadog Metrics API (/api/v2/series) rather than the Logs API. Must be called alongside setup_datadog_telemetry at pod startup.

This function is idempotent: calling it multiple times is safe.

Arguments

  • dd_client_token: The Datadog API key.
  • dd_site: The Datadog site (e.g. 'datadoghq.com', 'datadoghq.eu').
  • metrics: Pre-registered metric names mapped to their intake type. Metrics emitted via metrics_logger that are not in this dict are auto-registered as GAUGE on first use.
  • hostname: The host name attached to every metric series. Defaults to the system hostname.
  • tags: Static tags applied to every metric series (e.g. ['env:prod']).

Returns None

setup_datadog_telemetry

def setup_datadog_telemetry(
    dd_client_token: Optional[str] = None,
    dd_site: Optional[str] = None,
    service: str = 'pod',
    hostname: Optional[str] = None,
    tags: Optional[list[str]] = None,
    log_level: str = 'INFO',
) -> None:

Set up Datadog telemetry logging if credentials are available.

If credentials are not provided, the telemetry logger will exist but have no handlers, meaning all telemetry logs will be silently dropped.

This function is idempotent: calling it multiple times is safe.

Arguments

  • dd_client_token: The Datadog client token.
  • dd_site: The Datadog site to use (e.g., 'datadoghq.com', 'datadoghq.eu').
  • service: The service to use for the logs.
  • hostname: The hostname to use for the logs. Defaults to system hostname.
  • tags: The tags to use for the logs.
  • log_level: The log level for the Datadog handler (e.g., 'INFO', 'DEBUG').

Returns None

shutdown_datadog_metrics

def shutdown_datadog_metrics() -> None:

Shutdown Datadog metrics and flush any pending series.

Should be called at pod shutdown, mirroring shutdown_datadog_telemetry.

shutdown_datadog_telemetry

def shutdown_datadog_telemetry() -> None:

Shutdown Datadog telemetry logging and flush any pending logs.

This should be called during application shutdown to ensure all buffered logs are sent to Datadog.

Classes

DatadogLogsHandler

class DatadogLogsHandler(
    api_instance: datadog_api_client.v2.api.logs_api.LogsApi,
    source: str,
    hostname: str,
    service: str,
    tags: Optional[list[str]] = None,
    capacity: int = 1000,
):

A MemoryHandler that sends logs to Datadog.

This handler automatically flushes when the buffer approaches the 5 MB maximum uncompressed payload size given in Datadog's Logs API documentation: https://docs.datadoghq.com/api/latest/logs/

We subclass Python's MemoryHandler rather than implementing our own buffer management, including the handling of records left in the buffer when the handler is closed.

Initialize the DatadogLogsHandler.

Arguments

  • api_instance: The Datadog API instance.
  • source: The source of the logs.
  • hostname: The hostname of the logs.
  • service: The service of the logs.
  • tags: The tags of the logs.
  • capacity: The capacity of the buffer; defaults to 1000 per Datadog's documentation.

Variables

  • static BUFFER_PERCENTAGE
  • static MAX_BUFFER_SIZE

Methods


emit

def emit(self, record: logging.LogRecord) -> None:

Emit a record to the buffer.

Note that we immediately build the HTTPLogItem object and add it to a separate buffer, rather than waiting for the flush operation to do so.

Arguments

  • record: The log record.

flush

def flush(self) -> None:

Flush the buffer by sending all records to Datadog in a single request.

Override the parent's flush method to flush records instead of calling emit() per record.

shouldFlush

def shouldFlush(self, record: logging.LogRecord, item_size: int = 0) -> bool:

Determine if we should flush the buffer.

Arguments

  • record: The log record.
  • item_size: The size of the item to add to the buffer.

Returns True if we should flush the buffer, False otherwise.
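The docs above describe two flush triggers: the MemoryHandler capacity check and a size check against the 5 MB payload limit. A plausible shape for that combined check is sketched below; the 0.9 headroom factor is an assumption borrowed from the metrics handler's documented "90% of the 5 MB limit", and the real BUFFER_PERCENTAGE may differ:

```python
MAX_BUFFER_SIZE = 5 * 1024 * 1024  # Datadog Logs API uncompressed payload limit
BUFFER_PERCENTAGE = 0.9            # assumed headroom factor, not stated for this class


def should_flush(buffer_size: int, buffer_len: int, capacity: int, item_size: int = 0) -> bool:
    """Sketch of the documented check: flush when the record count reaches
    capacity, or when adding the next item would push the serialized
    payload past the size threshold."""
    if buffer_len >= capacity:
        return True
    return buffer_size + item_size >= MAX_BUFFER_SIZE * BUFFER_PERCENTAGE
```

Checking `item_size` before appending is what lets the handler flush the current batch and start a fresh one instead of building an oversized request.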

DatadogMetricsHandler

class DatadogMetricsHandler(
    api_instance: datadog_api_client.v2.api.metrics_api.MetricsApi,
    metrics: dict[str, datadog_api_client.v2.model.metric_intake_type.MetricIntakeType],
    hostname: str,
    tags: Optional[list[str]] = None,
):

A logging handler that sends custom metrics to Datadog's Metrics API.

Keeps per-metric state as dict[metric_name, list[MetricPoint]]. Each metric's list is monitored independently and flushed as a single MetricSeries when it approaches 90% of the 5 MB uncompressed limit.

Call sites emit via:

metrics_logger.info(metric_name, timestamp, value)

where metric_name maps to record.msg and (timestamp, value) map to record.args. An optional metric_type attribute can be attached via extra={"metric_type": MetricIntakeType.COUNT}; unknown metrics default to GAUGE.

See: https://docs.datadoghq.com/api/latest/metrics/
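The record contract above (metric name in record.msg, the (timestamp, value) pair in record.args, metric_type via extra) falls out of how Python's logging passes positional arguments through. The sketch below demonstrates it with a capturing handler; the string "count"/"gauge" values stand in for the real MetricIntakeType enum members:

```python
import logging

captured = []


class CaptureHandler(logging.Handler):
    def emit(self, record):
        # Same contract as DatadogMetricsHandler: record.msg is the metric
        # name, record.args is the (timestamp, value) pair, and metric_type
        # arrives as an extra attribute (unknown metrics default to GAUGE).
        metric_type = getattr(record, "metric_type", "gauge")
        captured.append((record.msg, record.args, metric_type))


metrics_logger = logging.getLogger("metrics_sketch")
metrics_logger.setLevel(logging.INFO)
metrics_logger.addHandler(CaptureHandler())

metrics_logger.info("jobs.completed", 1_700_000_000, 1.0, extra={"metric_type": "count"})
metrics_logger.info("queue.depth", 1_700_000_000, 42.0)
```

Because the handler reads record.msg directly and never calls record.getMessage(), the positional arguments are never interpolated into the metric name.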

Initialize the handler.

Arguments

  • api_instance: A configured Datadog MetricsApi instance.
  • metrics: Pre-registered metric names mapped to their intake type. Any metric name emitted that is not in this dict is auto-registered as GAUGE on first use.
  • hostname: Attached to every MetricSeries via resources so metrics are filterable by host in Datadog.
  • tags: Static tags applied to every series (e.g. ['env:prod']).

Variables

  • static MAX_SERIES_SIZE

Methods


close

def close(self) -> None:

Flush and close the handler.

emit

def emit(self, record: logging.LogRecord) -> None:

Buffer a metric point.

Expects record.msg to be the metric name and record.args to be a two-element tuple of (timestamp: int, value: float). Per-call tags can be supplied via extra={"tags": [...]}. Records that do not match this shape are silently dropped.

Each unique (metric_name, tags) combination is buffered independently and flushed as its own MetricSeries when it approaches the size limit.

Arguments

  • record: The log record.
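Buffering each unique (metric_name, tags) combination independently implies a dict keyed on both. One way to build such a key is sketched here; sorting the tags into a tuple makes the key hashable and order-insensitive, which is an assumption about the real implementation rather than documented behavior:

```python
def buffer_key(metric_name: str, tags=None) -> tuple:
    """Sketch: one buffer per unique (metric_name, tags) combination.

    Tags are normalized to a sorted tuple so ['a', 'b'] and ['b', 'a']
    share a buffer; the real handler's normalization may differ.
    """
    return (metric_name, tuple(sorted(tags or ())))


# Each key's list would be flushed as its own MetricSeries.
buffers: dict[tuple, list] = {}
buffers.setdefault(buffer_key("queue.depth", ["env:prod"]), []).append((1_700_000_000, 42.0))
```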

flush

def flush(self) -> None:

Flush all series buffers.