Merged
23 commits
73540a5
add metrics fetching
quinna-h Nov 19, 2025
4b1df94
wip
quinna-h Nov 20, 2025
9c2998c
wip
quinna-h Nov 20, 2025
93c431e
update metrics
quinna-h Nov 21, 2025
e336138
wip
quinna-h Nov 21, 2025
90e5dc0
remove unnecessary file
quinna-h Nov 21, 2025
03e30f1
fix linting
quinna-h Nov 24, 2025
0891a91
Bring rollback, vacuums, blocks_read, and index scans metrics
wantsui Nov 24, 2025
4eaa37f
Bring back commented metrics and add queries for temp files.
wantsui Nov 25, 2025
442125b
refactor out into otel validators
quinna-h Nov 25, 2025
5b0df00
Merge branch 'refactor-postgres-metrics-test-module' into add-missing…
quinna-h Nov 25, 2025
3b7f661
update delay for querying backend
quinna-h Nov 25, 2025
d9cf687
Add in query to trigger buffer metrics
wantsui Nov 25, 2025
ffeee9b
Merge branch 'refactor-postgres-metrics-test-module' into add-missing…
wantsui Nov 25, 2025
77d133f
refactor
quinna-h Nov 25, 2025
18f317b
update test
quinna-h Nov 25, 2025
5601f2a
Merge branch 'refactor-postgres-metrics-test-module' into add-missing…
quinna-h Nov 25, 2025
03c9cf6
linting fix
quinna-h Nov 25, 2025
1970ffc
Merge branch 'main' into add-missing-postgres-metrics
quinna-h Nov 25, 2025
60bcc57
update otel collector image version (#5783)
quinna-h Dec 1, 2025
5af1742
Merge branch 'main' into add-missing-postgres-metrics
quinna-h Dec 1, 2025
b1ffbde
Merge branch 'main' into add-missing-postgres-metrics
quinna-h Dec 1, 2025
8c0df2c
Merge branch 'main' into add-missing-postgres-metrics
quinna-h Dec 5, 2025
63 changes: 43 additions & 20 deletions tests/otel_postgres_metrics_e2e/test_postgres_metrics.py
@@ -12,26 +12,10 @@
# Load PostgreSQL metrics specification
# Exclude metrics that require a replica database
_EXCLUDED_POSTGRES_METRICS = {
"postgresql.wal.delay",
"postgresql.wal.age",
"postgresql.replication.data_delay",
"postgresql.wal.lag",
# Background writer metrics (require more sustained activity)
"postgresql.backends",
"postgresql.bgwriter.buffers.allocated",
"postgresql.bgwriter.buffers.writes",
"postgresql.bgwriter.checkpoint.count",
"postgresql.bgwriter.duration",
"postgresql.bgwriter.maxwritten",
"postgresql.blks_hit",
"postgresql.blks_read",
"postgresql.temp.io",
"postgresql.tup_deleted",
"postgresql.tup_fetched",
"postgresql.tup_inserted",
"postgresql.tup_returned",
"postgresql.tup_updated",
"postgresql.function.calls",
"postgresql.wal.delay", # requires replica
"postgresql.wal.age", # requires replica
"postgresql.replication.data_delay", # requires replica
"postgresql.wal.lag", # requires replica
}

postgresql_metrics = OtelMetricsValidator.load_metrics_from_file(
Expand Down Expand Up @@ -128,6 +112,45 @@ def setup_main(self) -> None:
)

r = container.exec_run('psql -U system_tests_user -d system_tests_dbname -c "SELECT 1;"')

# Rollback
r = container.exec_run(
'psql -U system_tests_user -d system_tests_dbname -c "BEGIN; INSERT INTO test_table DEFAULT VALUES; ROLLBACK;"'
)

# Vacuums and forces block reads (VACUUM FULL triggers the blocks_read metric)
r = container.exec_run('psql -U system_tests_user -d system_tests_dbname -c "VACUUM FULL test_table;"')
r = container.exec_run('psql -U system_tests_user -d system_tests_dbname -c "VACUUM test_table;"')

# Forces an index scan via the following two psql commands
r = container.exec_run(
"psql -U system_tests_user -d system_tests_dbname -c "
'"INSERT INTO test_table DEFAULT VALUES FROM generate_series(1, 800);"'
)

r = container.exec_run(
"psql -U system_tests_user -d system_tests_dbname -c "
'"SET enable_seqscan = off; SET enable_bitmapscan = off; '
'SELECT * FROM test_table WHERE id = 300;"'
)

# Forces temp files for postgresql.temp.io and postgresql.temp_files
r = container.exec_run(
"psql -U system_tests_user -d system_tests_dbname -c "
"\"SET work_mem = '64kB'; "
'SELECT * FROM generate_series(1, 1000000) g ORDER BY g;"'
)

# Hit the buffer + maxwritten metrics
r = container.exec_run(
'psql -U system_tests_user -d system_tests_dbname -c "'
"CREATE TABLE IF NOT EXISTS bg_test AS "
"SELECT i, md5(random()::text) FROM generate_series(1, 2000000) g(i); "
"UPDATE bg_test SET i = i + 1; "
"UPDATE bg_test SET i = i + 1; "
'SELECT pg_sleep(2);"'
)

logger.info(r.output)

def test_main(self) -> None:
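The body of test_main is collapsed in this diff. As a rough sketch (an assumption, not the merged test body), the new validator could be exercised against the collector's file-exporter output roughly like this, where scenario stands in for the running OtelCollectorScenario:

# Hypothetical sketch only -- the merged test_main is not shown in this hunk.
from utils.otel_validators import OtelMetricsValidator, get_collector_metrics_from_scenario

validator = OtelMetricsValidator(postgresql_metrics)  # spec loaded at module import time above
batches = get_collector_metrics_from_scenario(scenario)  # `scenario` assumed to be the running OtelCollectorScenario
found, mismatches, results, failures = validator.process_and_validate_metrics(batches)
assert not failures, "\n".join(failures)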
2 changes: 1 addition & 1 deletion utils/_context/containers.py
@@ -1320,7 +1320,7 @@ def __init__(

super().__init__(
name="collector",
image_name="otel/opentelemetry-collector-contrib:0.110.0",
image_name="otel/opentelemetry-collector-contrib:0.137.0",
binary_file_name="otel_collector-image",
command="--config=/etc/otelcol-config.yml",
environment=environment,
56 changes: 28 additions & 28 deletions utils/build/docker/otelcol-config-with-postgres.yaml
@@ -13,43 +13,43 @@ receivers:
collection_interval: 10s
tls:
insecure: true
# events: # from the opentelemetry-collector-contrib/receiver/postgresqlreceiver/README.md
# db.server.query_sample:
# enabled: true
# db.server.top_query:
# enabled: true
# # query_sample_collection:
# # max_rows_per_query: 100
# # top_query_collection:
# # max_rows_per_query: 100
# # top_n_query: 100
events: # from the opentelemetry-collector-contrib/receiver/postgresqlreceiver/README.md
db.server.query_sample:
enabled: true
db.server.top_query:
enabled: true
query_sample_collection:
max_rows_per_query: 100
top_query_collection:
max_rows_per_query: 100
top_n_query: 100
metrics:
# postgresql.blks_read:
# enabled: true
# postgresql.blks_hit:
# enabled: true
postgresql.blks_read:
enabled: true
postgresql.blks_hit:
enabled: true
postgresql.database.locks:
enabled: true
postgresql.deadlocks:
enabled: true
# postgresql.function.calls:
# enabled: true
postgresql.function.calls:
enabled: true
postgresql.sequential_scans:
enabled: true
# postgresql.temp.io:
# enabled: true
postgresql.temp.io:
enabled: true
postgresql.temp_files:
enabled: true
# postgresql.tup_deleted:
# enabled: true
# postgresql.tup_fetched:
# enabled: true
# postgresql.tup_inserted:
# enabled: true
# postgresql.tup_returned:
# enabled: true
# postgresql.tup_updated:
# enabled: true
postgresql.tup_deleted:
enabled: true
postgresql.tup_fetched:
enabled: true
postgresql.tup_inserted:
enabled: true
postgresql.tup_returned:
enabled: true
postgresql.tup_updated:
enabled: true
postgresql.wal.delay:
enabled: true
postgresql.wal.age:
5 changes: 5 additions & 0 deletions utils/otel_validators/__init__.py
@@ -0,0 +1,5 @@
"""OpenTelemetry validators for system tests."""

from utils.otel_validators.validator_metrics import OtelMetricsValidator, get_collector_metrics_from_scenario

__all__ = ["OtelMetricsValidator", "get_collector_metrics_from_scenario"]
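For orientation, the spec files consumed by OtelMetricsValidator.load_metrics_from_file are keyed by metric name; judging from the fields read in _validate_metric_specification in the module below, each entry carries at least a data_type and a description. A minimal illustrative entry (values are examples, not copied from the repository's spec file):

# Illustrative spec shape, inferred from _validate_metric_specification; not the actual file contents.
example_spec = {
    "postgresql.deadlocks": {
        "data_type": "Sum",
        "description": "The number of deadlocks.",
    },
}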
231 changes: 231 additions & 0 deletions utils/otel_validators/validator_metrics.py
@@ -0,0 +1,231 @@
"""Utility module for validating OpenTelemetry integration metrics.

This module provides reusable components for testing OTel receiver metrics:
- Loading metric specifications from JSON files
- Retrieving metrics from OTel Collector logs
- Validating metrics against specifications
- Querying metrics from the Datadog backend
"""

import json
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any

from utils import interfaces, logger

if TYPE_CHECKING:
from utils._context._scenarios.otel_collector import OtelCollectorScenario


class OtelMetricsValidator:
"""Base class for validating OTel integration metrics."""

def __init__(self, metrics_spec: dict[str, dict[str, str]]) -> None:
"""Initialize the validator with a metrics specification."""
self.metrics_spec = metrics_spec

@staticmethod
def load_metrics_from_file(
metrics_file: Path, excluded_metrics: set[str] | None = None
) -> dict[str, dict[str, str]]:
"""Load metric specifications from a JSON file, excluding excluded_metrics (if provided).
Return transformed metrics
"""
if not metrics_file.exists():
raise FileNotFoundError(f"Metrics file not found: {metrics_file}")

with open(metrics_file, encoding="utf-8") as f:
metrics = json.load(f)

if excluded_metrics:
return {k: v for k, v in metrics.items() if k not in excluded_metrics}

return metrics

@staticmethod
def get_collector_metrics(collector_log_path: str) -> list[dict[str, Any]]:
"""Retrieve metrics from the OTel Collector's file exporter logs.
Given the path to the collector's metrics log file, return a list of metric batch dictionaries.
"""
assert Path(collector_log_path).exists(), f"Metrics log file not found: {collector_log_path}"

metrics_batch = []
with open(collector_log_path, "r", encoding="utf-8") as f:
for row in f:
if row.strip():
metrics_batch.append(json.loads(row.strip()))

return metrics_batch

def process_and_validate_metrics(
self, metrics_batch: list[dict[str, Any]]
) -> tuple[set[str], set[str], list[str], list[str]]:
"""Process metrics batch and validate against specifications from backend.
Returns (found_metrics, metrics_dont_match_spec, validation_results, failed_validations)
"""
found_metrics: set[str] = set()
metrics_dont_match_spec: set[str] = set()

for data in metrics_batch:
self._process_metrics_data(data, found_metrics, metrics_dont_match_spec)

validation_results = []
failed_validations = []

# Check that all expected metrics were found
for metric_name in self.metrics_spec:
if metric_name in found_metrics:
result = f"✅ {metric_name}"
validation_results.append(result)
else:
result = f"❌ {metric_name}"
validation_results.append(result)
failed_validations.append(result)

# Add spec mismatches to failures
for spec_mismatch in metrics_dont_match_spec:
failed_validations.append(f"❌ Spec mismatch: {spec_mismatch}")
validation_results.append(f"❌ Spec mismatch: {spec_mismatch}")

return found_metrics, metrics_dont_match_spec, validation_results, failed_validations

def _process_metrics_data(
self, data: dict[str, Any], found_metrics: set[str], metrics_dont_match_spec: set[str]
) -> None:
"""Process top-level metrics data structure."""
if "resourceMetrics" not in data:
return

for resource_metric in data["resourceMetrics"]:
self._process_resource_metric(resource_metric, found_metrics, metrics_dont_match_spec)

def _process_resource_metric(
self, resource_metric: dict[str, Any], found_metrics: set[str], metrics_dont_match_spec: set[str]
) -> None:
"""Process resource-level metrics."""
if "scopeMetrics" not in resource_metric:
return

for scope_metric in resource_metric["scopeMetrics"]:
self._process_scope_metric(scope_metric, found_metrics, metrics_dont_match_spec)

def _process_scope_metric(
self, scope_metric: dict[str, Any], found_metrics: set[str], metrics_dont_match_spec: set[str]
) -> None:
"""Process scope-level metrics."""
if "metrics" not in scope_metric:
return

for metric in scope_metric["metrics"]:
self._process_individual_metric(metric, found_metrics, metrics_dont_match_spec)

def _process_individual_metric(
self, metric: dict[str, Any], found_metrics: set[str], metrics_dont_match_spec: set[str]
) -> None:
"""Process and validate individual metric."""
if "name" not in metric:
return

metric_name = metric["name"]
found_metrics.add(metric_name)

# Skip validation if metric is not in our expected list
if metric_name not in self.metrics_spec:
return

self._validate_metric_specification(metric, metrics_dont_match_spec)

def _validate_metric_specification(self, metric: dict[str, Any], metrics_dont_match_spec: set[str]) -> None:
"""Validate that a metric matches its expected specification."""
metric_name = metric["name"]
description = metric["description"]
gauge_type = "gauge" in metric
sum_type = "sum" in metric

expected_spec = self.metrics_spec[metric_name]
expected_type = expected_spec["data_type"].lower()
expected_description = expected_spec["description"]

# Validate metric type
if expected_type == "sum" and not sum_type:
metrics_dont_match_spec.add(f"{metric_name}: Expected Sum type but got Gauge")
elif expected_type == "gauge" and not gauge_type:
metrics_dont_match_spec.add(f"{metric_name}: Expected Gauge type but got Sum")

# Validate description (sometimes the spec has a period, but the actual logs don't)
if description.rstrip(".") != expected_description.rstrip("."):
metrics_dont_match_spec.add(
f"{metric_name}: Description mismatch - Expected: '{expected_description}', Got: '{description}'"
)

def query_backend_for_metrics(
self,
metric_names: list[str],
query_tags: dict[str, str],
lookback_seconds: int = 300,
retries: int = 3,
initial_delay_s: float = 15.0,
semantic_mode: str = "combined",
) -> tuple[list[str], list[str]]:
"""Query the Datadog backend to validate metrics were received.
Returns (validated_metrics, failed_metrics)
"""
end_time = int(time.time())
start_time = end_time - lookback_seconds

validated_metrics = []
failed_metrics = []

# Build tag string for query
tag_string = ",".join(f"{k}:{v}" for k, v in query_tags.items())

for metric_name in metric_names:
logger.info(f"Looking at metric: {metric_name}")
try:
start_time_ms = start_time * 1000
end_time_ms = end_time * 1000

query_str = f"avg:{metric_name}{{{tag_string}}}"
logger.info(f"Query: {query_str}, time range: {start_time_ms} to {end_time_ms} ({lookback_seconds}s)")

metric_data = interfaces.backend.query_ui_timeseries(
query=query_str,
start=start_time_ms,
end=end_time_ms,
semantic_mode=semantic_mode,
retries=retries,
initial_delay_s=initial_delay_s,
)

if metric_data and metric_data.get("data") and len(metric_data["data"]) > 0:
data_item = metric_data["data"][0]
attributes = data_item.get("attributes", {})

meta_responses = metric_data.get("meta", {}).get("responses", [])
results_warning = meta_responses[0].get("results_warnings") if meta_responses else None
if results_warning:
logger.warning(f"Results warning: {results_warning}")

times = attributes.get("times", [])
values = attributes.get("values", [])

if times and values and len(values) > 0 and len(values[0]) > 0:
validated_metrics.append(metric_name)
else:
failed_metrics.append(f"{metric_name}: No data points found")
else:
failed_metrics.append(f"{metric_name}: No series data returned")

except Exception as e:
failed_metrics.append(f"❌ {metric_name}: Failed to query semantic mode {semantic_mode} - {e!s}")

return validated_metrics, failed_metrics


def get_collector_metrics_from_scenario(scenario: "OtelCollectorScenario") -> list[dict[str, Any]]:
"""Helper function to get metrics from an OtelCollectorScenario.
Returns a list of metric batch dictionaries
"""
collector_log_path = f"{scenario.collector_container.log_folder_path}/logs/metrics.json"
return OtelMetricsValidator.get_collector_metrics(collector_log_path)
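Taken together, a hedged end-to-end usage sketch of this module for the backend path (the file path and tag values below are illustrative assumptions, not taken from the merged tests):

# Hypothetical usage sketch of OtelMetricsValidator; path and tag values are placeholders.
from pathlib import Path
from utils.otel_validators import OtelMetricsValidator

spec = OtelMetricsValidator.load_metrics_from_file(Path("postgresql_metrics.json"))  # illustrative path
validator = OtelMetricsValidator(spec)

# Backend check for a subset of metrics; query_tags are whatever tags the collector attaches.
validated, failed = validator.query_backend_for_metrics(
    metric_names=["postgresql.deadlocks", "postgresql.temp_files"],
    query_tags={"env": "system-tests"},  # illustrative tag, not from the PR
    lookback_seconds=300,
)
assert not failed, failed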