
Distributed Metrics Logging & Aggregation

Difficulty: Hard · 12 min · Tags: metrics, logging, streaming, kafka, flink, parquet, timeseries, data-warehouse, elasticsearch, etl
Asked at: Misc

Design a robust pipeline to ingest, enrich, aggregate, store, and query high-volume distributed metrics, logs, and structured and unstructured events, with support for analysis and replay.

1. The Question

Design a scalable distributed system that ingests high-volume telemetry from many application servers (text logs, structured objects, time-series metrics, and unstructured payloads), persists raw events durably, supports stream enrichment and windowed aggregation, and provides fast time-series queries for metrics, full-text search over logs, and analytic queries over historical data (via parquet/data warehouse). The system should also support replay, schema evolution, retention policies, and efficient bulk deletes.

2. Clarifying Questions

  • What are the main read patterns and SLAs? (real-time dashboards, ad-hoc analysis, debug searches, long-term analytics)
  • Expected peak events/sec, average event size, and retention period?
  • Per-tenant or per-service isolation requirements? Multi-tenant?
  • Is ordering per source required? Exactly-once semantics?
  • Compliance needs (GDPR, right-to-be-forgotten) and retention policies?
  • Do we need live alerting on incoming streams or only post-aggregation analytics?

3. Requirements

Functional:

  • Ingest logs, structured events, metrics, and opaque payloads from many servers.
  • Durable, replicated ingestion with replay capability.
  • Stream enrichment (join against reference tables) and windowed aggregations (tumbling/hopping/sliding).
  • Persist aggregated metrics to a time-series store; persist logs for full-text search; persist raw/normalized events to a data lake/warehouse.
  • Provide APIs for real-time metrics queries, log search, and batch analytics.

Non-functional:

  • High throughput (≈100k events/sec at peak, i.e., billions of events/day; example scale provided below).
  • Low-latency streaming aggregation (seconds to sub-minute for dashboards).
  • Cost-effective long-term storage for analytics (S3 + parquet).
  • Scalability (horizontal), HA and fault tolerance.
  • Schema evolution support and operational observability.

4. Scale Estimates

Example baseline to design for (adjustable):

  • Peak ingest: 100k events/sec across sources.
  • Average event size: ~1 KB for logs and unstructured payloads; structured events are smaller thanks to protobuf encoding.
  • Daily data volume: ~8.6 TB/day (100k events/sec × 1 KB × 86,400 s; see the back-of-envelope check below).
  • Retention: raw events 30 days, aggregated metrics 13 months, logs 90 days (hot/cold tiers).
  • Cardinality: ~1M unique metric/tag combinations, ~100k active sources.
  • Design targets: sustain peak ingestion and processing with linear horizontal scaling, using partitioning strategies that avoid hot partitions.
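
A quick back-of-envelope check of those figures (a throwaway sketch; every input is one of the baseline assumptions above):

```python
# Back-of-envelope sizing for the baseline above.
peak_events_per_sec = 100_000
avg_event_bytes = 1_000            # ~1 KB/event, using decimal KB for rough sizing
seconds_per_day = 86_400

daily_bytes = peak_events_per_sec * avg_event_bytes * seconds_per_day
print(f"daily ingest ~{daily_bytes / 1e12:.2f} TB")        # ~8.64 TB/day

raw_retention_days = 30
raw_bytes = daily_bytes * raw_retention_days
print(f"raw retained ~{raw_bytes / 1e12:.0f} TB (before replication and compression)")  # ~259 TB
```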

5. Data Model

Event types and storage mappings:

  • Raw events (unstructured JSON): one Kafka topic per ingest source or per event type. Store the raw payload plus metadata (source_id, timestamp, schema_id) for replay.

  • Structured events: use compact binary encoding (Protobuf/Avro/Thrift). Each message includes schema_id (from schema registry) and logical fields. In Kafka the serialized payload is stored; downstream systems use schema registry to decode.

  • Metrics (time-series points): capture as {metric_name, tags:{k:v}, value, timestamp, source_id}. Aggregated points become time-series DB rows partitioned by metric and time chunk (hypertable with one chunk per day and source).

  • Text logs: store line entries {source_id, run_id, timestamp, level, message, metadata}. For full-text, index message and selected metadata in a distributed inverted index (Elasticsearch/OpenSearch).
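
A minimal sketch of these record shapes as Python dataclasses; field names follow the bullets above, and anything beyond them is illustrative.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class MetricPoint:
    """One time-series sample."""
    metric_name: str
    tags: Dict[str, str]
    value: float
    timestamp: int                  # epoch millis
    source_id: str


@dataclass
class LogLine:
    """One text-log entry; `message` is what gets full-text indexed."""
    source_id: str
    run_id: str
    timestamp: int
    level: str
    message: str
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class RawEvent:
    """Opaque payload kept for replay; schema_id points into the schema registry."""
    source_id: str
    timestamp: int
    schema_id: Optional[int]
    payload: bytes
```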

Parquet/Column store for analytics: normalized tables (events, user_joins, session, metrics_aggregates) written as parquet files partitioned by time and an optional frequently filtered key (e.g., tenant_id or vendor_id). Include file-level statistics for predicate pushdown.
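
One way to produce that partitioned layout is with pyarrow, sketched below under the assumption that normalized events arrive as columnar batches; the bucket path and column names are illustrative.

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Illustrative batch of normalized events.
table = pa.table({
    "event_date":  ["2024-05-01", "2024-05-01", "2024-05-02"],
    "tenant_id":   ["t1", "t2", "t1"],
    "metric_name": ["cpu", "cpu", "mem"],
    "value":       [0.42, 0.91, 512.0],
})

# Writes .../event_date=2024-05-01/tenant_id=t1/<uuid>.parquet and so on.
# Parquet row groups carry min/max column statistics used for predicate pushdown.
pq.write_to_dataset(
    table,
    root_path="s3://metrics-lake/events",   # hypothetical bucket; credentials/filesystem setup omitted
    partition_cols=["event_date", "tenant_id"],
)
```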

6. API Design

Producer APIs (ingest):

  • HTTP/gRPC producer endpoint or SDK: accepts structured (protobuf) or raw JSON payloads, with source_id and authentication attached. Producers write to Kafka directly or via a Kafka REST/gRPC gateway.
  • Batching & retries: client SDKs should batch events and publish to Kafka asynchronously, retrying with backoff on failure (a minimal producer sketch follows).
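
A minimal producer sketch, assuming the confluent-kafka Python client; the broker address, topic name, and the schema_id header convention are illustrative, not fixed parts of the design.

```python
from confluent_kafka import Producer  # assumes the confluent-kafka package

# Batching, compression, and retry behaviour are producer-side settings; values are illustrative.
producer = Producer({
    "bootstrap.servers": "kafka:9092",    # hypothetical brokers
    "linger.ms": 50,                      # wait briefly so batches fill up
    "compression.type": "lz4",
    "enable.idempotence": True,           # avoid duplicates on broker retries
})


def publish_event(topic: str, source_id: str, schema_id: int, payload: bytes) -> None:
    """Asynchronously publish one serialized event, keyed by source_id so all
    events from a source land on the same partition (per-source ordering)."""
    producer.produce(
        topic,
        key=source_id.encode(),
        value=payload,                    # Protobuf/Avro bytes from the SDK serializer
        headers=[("schema_id", str(schema_id).encode())],
        on_delivery=lambda err, msg: err and print(f"delivery failed: {err}"),
    )
    producer.poll(0)                      # serve delivery callbacks without blocking


# On shutdown, drain the internal queue: producer.flush()
```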

Schema registry API:

  • Register and look up schemas, with semantic versioning and compatibility checks (a sketch of the registration flow follows).
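
If the registry follows the Confluent Schema Registry REST conventions, compatibility checking and registration look roughly like this; the endpoint, subject name, and schema are assumptions.

```python
import json
import requests

REGISTRY = "http://schema-registry:8081"   # hypothetical endpoint
SUBJECT = "metrics-value"                  # illustrative subject name
HEADERS = {"Content-Type": "application/vnd.schemaregistry.v1+json"}

avro_schema = {
    "type": "record",
    "name": "MetricPoint",
    "fields": [
        {"name": "metric_name", "type": "string"},
        {"name": "value", "type": "double"},
        {"name": "timestamp", "type": "long"},
    ],
}
body = json.dumps({"schema": json.dumps(avro_schema)})

# Check compatibility against the latest registered version (skip on the very first
# registration, where this call returns 404 because no version exists yet).
compat = requests.post(
    f"{REGISTRY}/compatibility/subjects/{SUBJECT}/versions/latest",
    headers=HEADERS, data=body,
).json()

if compat.get("is_compatible", False):
    resp = requests.post(f"{REGISTRY}/subjects/{SUBJECT}/versions", headers=HEADERS, data=body)
    print("registered schema id:", resp.json().get("id"))
```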

Query APIs (read):

  • Real-time metrics API: time-series query endpoint supporting start/end time ranges, aggregation functions (sum, avg, p99), group-by on tags, downsampling, and pagination.
  • Logs search API: full-text query with filters (source_id, run_id, time range), plus a tail endpoint that streams live logs for debugging (a sample query body is sketched after this list).
  • Analytics export API: submit ad-hoc SQL queries to the data warehouse, or return pointers to the underlying parquet files in S3.
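
Roughly what the logs-search API would run against the index: a full-text match on message combined with source and time filters. Field names follow the log data model above; the index pattern, host, and example values are assumptions.

```python
import requests

query = {
    "query": {
        "bool": {
            "must": [{"match": {"message": "connection timeout"}}],
            "filter": [
                {"term": {"source_id": "web-42"}},
                {"range": {"timestamp": {"gte": "now-1h", "lte": "now"}}},
            ],
        }
    },
    "sort": [{"timestamp": "desc"}],
    "size": 100,
}

# POST the query DSL to the search endpoint of the hot log indices.
resp = requests.post("http://opensearch:9200/logs-*/_search", json=query)
for hit in resp.json()["hits"]["hits"]:
    print(hit["_source"]["timestamp"], hit["_source"]["message"])
```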

Admin APIs: retention policies, schema management, topic/partition management, replay control (seek/rewind consumer groups).

7. High-Level Architecture

Components:

  1. Producers/SDKs: apps push telemetry via SDK/agent to ingestion gateway.
  2. Ingestion gateway (optional): validates, authenticates, rate-limits, forwards to Kafka.
  3. Kafka (or durable log broker): central durable buffer with partitioning and replication. Topic design: separate topics for logs, structured events, metrics, and CDC streams.
  4. Schema Registry: centralized store for Avro/Proto/Thrift schemas used by producers & consumers.
  5. Stream Processing Layer (Flink or Spark Streaming): stateful consumers that perform enrichment (lookup/cached joins), windowed aggregations (tumbling/hopping/sliding), deduplication, and write outputs. Use RocksDB (or state backend) for local state and caching.
  6. Time-Series Database (TSDB): store metric aggregates and fine-grained series (e.g., Prometheus remote-write compatible or TimescaleDB/InfluxDB) with hypertable/chunking by time+source.
  7. Search Index (Elasticsearch/OpenSearch): index logs and metadata for fast full-text search and run-level retrieval.
  8. Data Lake (S3/HDFS) + Parquet: sink normalized events and aggregated batches into parquet files partitioned by time and key; used by data warehouse or query engines.
  9. Data Warehouse / Query Engine (Snowflake/BigQuery/Redshift/Presto): for ad-hoc analytics and business intelligence.
  10. CDC pipeline: capture DB changes and publish to Kafka for enrichment state updates.
  11. Observability / Monitoring: metrics about ingest rates, consumer lag, processing latency, errors, and SLO alerts.

Data flow: Producers -> Kafka -> Stream processors (enrich & aggregate) -> TSDB / ES / Parquet Sinks -> Query APIs / Data Warehouse.

8. Detailed Design Decisions

Key trade-offs and chosen approaches:

  • Broker choice: Kafka for durability, partitioning, and replay. Use a topic per data type, with messages keyed by source_id or tenant_id to preserve per-source ordering.

  • Stream engine: Flink for true streaming with low-latency stateful operations and exactly-once semantics (with checkpointing). Spark Structured Streaming is acceptable if micro-batching is preferred.

  • Windowing: implement tumbling/hopping/sliding windows inside the stream engine (a toy sketch of window assignment appears after this list):

    • Tumbling for fixed-interval aggregates.
    • Hopping for overlapping intervals (compose from tumbling panes).
    • Sliding using stateful ordered buffers (linked lists or indexed windows) and eviction logic.
  • Schema & serialization: use Protobuf/Avro with schema registry for compactness and evolvability. Store schema_id in messages.

  • Enrichment: cache reference data in state backend; update via CDC topics or periodic snapshots. Partition caches by the same key used in Kafka partitions to colocate keys and minimize remote lookups.

  • TSDB: use TimescaleDB or a purpose-built TSDB; adopt hypertables with chunks partitioned by time (daily) and source, so related data stays co-located and whole chunks can be compacted or dropped cheaply (a SQL sketch appears after this list).

  • Logs index: index hot logs in ES for a recent time window. Use rollover indices by time and source, and move older logs to S3 cold storage, keeping pointers in the index.

  • Parquet + Data Lake: write parquet files partitioned by time and a frequently filtered key; include min/max statistics for predicate pushdown. Run periodic compaction jobs to merge small files.

  • Retention & delete: implement lifecycle policies. For bulk deletes, drop whole storage units (a TSDB chunk or a parquet partition) where possible to avoid expensive tombstone operations.

  • Exactly-once vs at-least-once: prefer exactly-once for aggregates (use stream engine transactions + Kafka sinks) and at-least-once for logs where duplicates are acceptable and deduplication is complex.
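
To make the window semantics concrete, here is a toy pure-Python sketch of window assignment and a tumbling-sum operator. In a real deployment this logic lives in the stream engine's keyed state (e.g., Flink's TumblingEventTimeWindows/SlidingEventTimeWindows with an aggregate function); the names below are illustrative.

```python
from collections import defaultdict


def tumbling_window(ts: int, size: int) -> int:
    """Start of the single tumbling window of length `size` that contains `ts`."""
    return (ts // size) * size


def hopping_windows(ts: int, size: int, hop: int) -> list[int]:
    """Starts of every hopping window (length `size`, advancing by `hop` <= `size`)
    that contains `ts`; overlapping windows mean an event lands in several."""
    starts = []
    start = (ts // hop) * hop              # latest window start that could contain ts
    while start > ts - size:
        starts.append(start)
        start -= hop
    return starts


class TumblingSumAggregator:
    """Toy stateful operator: per-key sums over tumbling windows, emitted once the
    watermark passes a window's end (late events are simply dropped here)."""

    def __init__(self, size: int):
        self.size = size
        self.state = defaultdict(float)    # (key, window_start) -> running sum

    def on_event(self, key: str, ts: int, value: float) -> None:
        self.state[(key, tumbling_window(ts, self.size))] += value

    def on_watermark(self, watermark: int):
        closed = [kw for kw in self.state if kw[1] + self.size <= watermark]
        for key, window_start in sorted(closed, key=lambda kw: kw[1]):
            yield key, window_start, self.state.pop((key, window_start))
```

For the chunked TSDB layout and retention-by-chunk-drop, the TimescaleDB flavor looks roughly like the sketch below. Table and column names are assumptions, and the exact create_hypertable/drop_chunks signatures vary by TimescaleDB version, so treat the SQL as illustrative.

```python
import psycopg2  # assumes a TimescaleDB-enabled Postgres instance

DDL = """
CREATE TABLE IF NOT EXISTS metrics (
    ts          TIMESTAMPTZ NOT NULL,
    source_id   TEXT        NOT NULL,
    metric_name TEXT        NOT NULL,
    tags        JSONB,
    value       DOUBLE PRECISION
);
-- One chunk per day, space-partitioned by source, so a (source, day) slice
-- lives in a small number of chunks and can be dropped cheaply.
SELECT create_hypertable('metrics', 'ts',
                         partitioning_column => 'source_id',
                         number_partitions   => 16,
                         chunk_time_interval => INTERVAL '1 day',
                         if_not_exists       => TRUE);
"""

# Retention: dropping whole chunks avoids row-level deletes and tombstones.
RETENTION = "SELECT drop_chunks('metrics', older_than => INTERVAL '30 days');"

with psycopg2.connect("dbname=metrics user=ingest host=tsdb") as conn:  # hypothetical DSN
    with conn.cursor() as cur:
        cur.execute(DDL)
        cur.execute(RETENTION)
```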

9. Bottlenecks & Scaling

Potential bottlenecks and mitigations:

  • Kafka cluster saturation: monitor broker throughput, increase partitions, add brokers, use compression, and tune producer batching.
  • Partition hot spots: choose partition keys that spread load (hash tenant+source) and shard heavy sources across several partitions (a small keying sketch follows this list).
  • Stream processing state explosion: use RocksDB with TTL/eviction, partition state across workers, and scale out stateful operators.
  • Network / S3 read latency for analytics: use caching, co-locate compute with storage (e.g., an HDFS-backed cluster), or use cloud-native warehouses that read from S3 efficiently.
  • Elasticsearch indexing cost: reduce indexed fields, use ingest pipelines for light parsing, and tier older indices to warm/cold nodes.
  • Large-cardinality tags in TSDB: use tag cardinality limits or rollup aggregations and avoid indexing highly dynamic tags directly.
  • Compaction and small files: run periodic compaction tasks to merge parquet files and reduce overhead.
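
A small illustration of the keying idea from the hot-spot bullet: hash tenant+source to spread load, and salt keys for known-hot sources so their traffic fans out over a few partitions (at the cost of strict per-source ordering for those sources). The helper, salt count, and hot-source list are hypothetical; Kafka's default partitioner uses murmur2 rather than MD5.

```python
import hashlib

NUM_PARTITIONS = 64
HOT_SOURCES = {"tenant-7:firehose"}        # sources known to overwhelm a single partition


def partition_for(tenant_id: str, source_id: str, seq: int = 0) -> int:
    """Pick a partition by hashing tenant+source; hot sources get a rotating salt
    so their load spreads across several partitions instead of hammering one."""
    key = f"{tenant_id}:{source_id}"
    if key in HOT_SOURCES:
        key = f"{key}#{seq % 4}"           # shard a hot source over 4 partitions
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS
```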

10. Follow-up Questions / Extensions

  • Support multi-region ingest and geo-failover? (Cross-region replication, global Kafka clusters or MirrorMaker)
  • Provide per-tenant quotas and billing based on ingress/egress/storage?
  • Real-time alerting and anomaly detection on streams?
  • Support trace-level correlation (link logs, traces, metrics) using a global trace_id?
  • Add data lineage and auditing for compliance and debugging?
  • Include schema governance and automated compatibility checks?

11. Wrap-up

A robust design centralizes durable ingestion (Kafka), performs low-latency stateful processing (Flink/Spark) for enrichment and windowed aggregation, stores metrics in a TSDB optimized for time-chunked data, indexes logs in a search engine for fast full-text queries, and persists normalized data to a parquet-backed data lake and data warehouse for analytics. Key operational concerns are partitioning, schema management, state scaling, retention and delete strategies, and cost trade-offs between hot query latency and cold storage economics.
