1. The Question
Design a scalable distributed system that ingests high-volume telemetry from many application servers (text logs, structured objects, time-series metrics, and unstructured payloads), persists raw events durably, supports stream enrichment and windowed aggregation, provides fast time-series queries for metrics, full-text search for logs, and analytic queries over historical data (via parquet/data warehouse). The system should support replay, schema evolution, retention/deletion, and efficient bulk deletes.
2. Clarifying Questions
- What are the main read patterns and SLAs? (real-time dashboards, ad-hoc analysis, debug searches, long-term analytics)
- Expected peak events/sec, average event size, and retention period?
- Per-tenant or per-service isolation requirements? Multi-tenant?
- Is ordering per source required? Exactly-once semantics?
- Compliance needs (GDPR, right-to-be-forgotten) and retention policies?
- Do we need live alerting on incoming streams or only post-aggregation analytics?
3. Requirements
Functional:
- Ingest logs, structured events, metrics, and opaque payloads from many servers.
- Durable, replicated ingestion with replay capability.
- Stream enrichment (join against reference tables) and windowed aggregations (tumbling/hopping/sliding).
- Persist aggregated metrics to a time-series store; persist logs for full-text search; persist raw/normalized events to a data lake/warehouse.
- Provide APIs for real-time metrics queries, log search, and batch analytics.
Non-functional:
- High throughput (on the order of 100k events/sec at peak; example scale provided below).
- Low-latency streaming aggregation (seconds to sub-minute for dashboards).
- Cost-effective long-term storage for analytics (S3 + parquet).
- Scalability (horizontal), HA and fault tolerance.
- Schema evolution support and operational observability.
4. Scale Estimates
Example baseline to design for (adjustable):
- Peak ingest: 100k events/sec across sources.
- Average event size: ~1 KB for logs and unstructured payloads; structured events are smaller thanks to Protobuf encoding.
- Daily data volume: ~8.6 TB/day (100k events/s × 1 KB × 86,400 s; worked out in the sketch below).
- Retention: raw events 30 days, aggregated metrics 13 months, logs 90 days (hot/cold tiers).
- Cardinality: ~1M unique metric/tag combinations, ~100k active sources.
- Design targets: sustain peak ingestion and processing with linear horizontal scaling and partitioning strategies that avoid hot partitions.
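A quick back-of-envelope calculation makes the storage implications concrete. This is a minimal sketch using the assumed figures above; the replication factor of 3 is an added assumption, not a stated requirement:

```python
# Back-of-envelope sizing from the assumptions above (100k events/sec, ~1 KB average event).
PEAK_EVENTS_PER_SEC = 100_000
AVG_EVENT_BYTES = 1_000          # ~1 KB average payload
SECONDS_PER_DAY = 86_400
RAW_RETENTION_DAYS = 30
REPLICATION_FACTOR = 3           # assumed Kafka/storage replication

daily_bytes = PEAK_EVENTS_PER_SEC * AVG_EVENT_BYTES * SECONDS_PER_DAY
daily_tb = daily_bytes / 1e12                      # ~8.64 TB/day
raw_30d_tb = daily_tb * RAW_RETENTION_DAYS         # ~259 TB before replication
raw_30d_replicated_tb = raw_30d_tb * REPLICATION_FACTOR

print(f"Daily ingest:            {daily_tb:.2f} TB")
print(f"30-day raw (1x):         {raw_30d_tb:.0f} TB")
print(f"30-day raw (3x replica): {raw_30d_replicated_tb:.0f} TB")
```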
5. Data Model
Event types and storage mappings (a sketch of the record shapes follows this list):
- Raw events (unstructured JSON): one Kafka topic per ingest source or per event type. Store the raw payload plus metadata (source_id, timestamp, schema_id) for replay.
- Structured events: use a compact binary encoding (Protobuf/Avro/Thrift). Each message carries a schema_id (from the schema registry) and logical fields. Kafka stores the serialized payload; downstream systems use the schema registry to decode it.
- Metrics (time-series points): capture as {metric_name, tags:{k:v}, value, timestamp, source_id}. Aggregated points become time-series DB rows partitioned by metric and time chunk (hypertable with a chunk per day/server).
- Text logs: store line entries {source_id, run_id, timestamp, level, message, metadata}. For full-text search, index message and selected metadata in a distributed inverted index (Elasticsearch/OpenSearch).
- Parquet/column store for analytics: normalized tables (events, user_joins, session, metrics_aggregates) written as partitioned parquet files by time and an optional heavy-filter key (e.g., tenant_id or vendor_id). Include file-level statistics for predicate pushdown.
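A minimal sketch of the record shapes described above, expressed as Python dataclasses. Field names are illustrative rather than a fixed wire format:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@dataclass
class RawEvent:
    """Raw payload as ingested, kept verbatim for replay."""
    source_id: str
    timestamp_ms: int
    schema_id: Optional[int]      # None for opaque/unstructured payloads
    payload: bytes                # original bytes (JSON, Protobuf, etc.)

@dataclass
class MetricPoint:
    """One time-series sample; aggregated points share the same shape."""
    metric_name: str
    tags: Dict[str, str]
    value: float
    timestamp_ms: int
    source_id: str

@dataclass
class LogLine:
    """One log entry; `message` is what gets full-text indexed."""
    source_id: str
    run_id: str
    timestamp_ms: int
    level: str
    message: str
    metadata: Dict[str, Any] = field(default_factory=dict)
```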
6. API Design
Producer APIs (ingest):
- HTTP/gRPC producer endpoint or SDK: Accept structured (protobuf) or raw JSON, attach source_id and auth. Producers write to Kafka (via a Kafka producer or Kafka REST/gRPC gateway).
- Batching & retries: client SDKs should batch and async publish to Kafka.
Schema registry API:
- Register/lookup schemas, semantic versioning, compatibility checks.
Query APIs (read):
- Real-time metrics API: time-series query endpoint supporting start/end time ranges, aggregations (sum, avg, p99), group-by on tags, downsampling, and pagination (example request below).
- Logs search API: full-text query with filters (source_id, run_id, time range), tail logs stream endpoint for live debugging.
- Analytics export API: submit ad-hoc SQL queries to data warehouse or return S3 parquet pointers.
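A sketch of what a metrics query could look like from a client, assuming a hypothetical `/api/v1/metrics/query` endpoint, parameter names, and response shape; none of this is a fixed contract:

```python
import requests  # assumed HTTP client

# Hypothetical endpoint and parameters; a real API (PromQL, SQL, etc.) will differ.
resp = requests.get(
    "https://telemetry.example.com/api/v1/metrics/query",
    params={
        "metric": "http.request.latency_ms",
        "start": "2024-05-01T00:00:00Z",
        "end": "2024-05-01T06:00:00Z",
        "agg": "p99",                  # sum | avg | p99 | ...
        "group_by": "service,region",  # group-by tags
        "step": "60s",                 # downsampling interval
        "limit": 1000,                 # pagination page size
    },
    timeout=10,
)
resp.raise_for_status()

# Assumed response shape: {"series": [{"tags": {...}, "points": [[ts, value], ...]}]}
for series in resp.json().get("series", []):
    print(series["tags"], series["points"][:3])
```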
Admin APIs: retention policies, schema management, topic/partition management, replay control (seek/rewind consumer groups).
7. High-Level Architecture
Components:
- Producers/SDKs: apps push telemetry via SDK/agent to ingestion gateway.
- Ingestion gateway (optional): validates, authenticates, rate-limits, forwards to Kafka.
- Kafka (or durable log broker): central durable buffer with partitioning and replication. Topic design: separate topics for logs, structured events, metrics, and CDC streams.
- Schema Registry: centralized store for Avro/Proto/Thrift schemas used by producers & consumers.
- Stream Processing Layer (Flink or Spark Streaming): stateful consumers that perform enrichment (lookup/cached joins), windowed aggregations (tumbling/hopping/sliding), deduplication, and write outputs. Use RocksDB (or state backend) for local state and caching.
- Time-Series Database (TSDB): store metric aggregates and fine-grained series (e.g., Prometheus remote-write compatible or TimescaleDB/InfluxDB) with hypertable/chunking by time+source.
- Search Index (Elasticsearch/OpenSearch): index logs and metadata for fast full-text search and run-level retrieval.
- Data Lake (S3/HDFS) + Parquet: sink normalized events and aggregated batches into parquet files partitioned by time and key; used by data warehouse or query engines.
- Data Warehouse / Query Engine (Snowflake/BigQuery/Redshift/Presto): for ad-hoc analytics and business intelligence.
- CDC pipeline: capture DB changes and publish to Kafka for enrichment state updates.
- Observability / Monitoring: metrics about ingest rates, consumer lag, processing latency, errors, and SLO alerts.
Data flow: Producers -> Kafka -> Stream processors (enrich & aggregate) -> TSDB / ES / Parquet Sinks -> Query APIs / Data Warehouse.
8. Detailed Design Decisions
Key trade-offs and chosen approaches:
- Broker choice: Kafka for durability, partitioning, and replay. Use a topic per data type with the partition key set to source_id or tenant_id to preserve per-source ordering.
- Stream engine: Flink for true streaming with low-latency stateful operations and exactly-once semantics (with checkpointing). Spark Structured Streaming is acceptable if micro-batching is preferred.
- Windowing: implement tumbling/hopping/sliding windows inside the stream engine (a minimal tumbling-window sketch follows this list):
- Tumbling for fixed-interval aggregates.
- Hopping for overlapping intervals (compose from tumbling panes).
- Sliding using stateful ordered buffers (linked lists or indexed windows) and eviction logic.
- Schema & serialization: use Protobuf/Avro with a schema registry for compactness and evolvability. Store the schema_id in each message.
- Enrichment: cache reference data in the state backend; update it via CDC topics or periodic snapshots. Partition caches by the same key used for Kafka partitions to colocate keys and minimize remote lookups.
- TSDB: use TimescaleDB or a purpose-built TSDB; adopt a hypertable with chunks per (source, date) for fast deletes and locality. Partitioning chunks by time (daily) and source keeps compaction and retention drops efficient.
- Logs index: index hot logs in ES for the recent time window. Use rollover indices by time and source; store older logs in S3 cold storage with pointers.
- Parquet + data lake: write parquet files partitioned by time and heavy-filter keys; include min/max statistics for predicate pushdown. Run compaction jobs to merge small files.
- Retention & delete: implement lifecycle policies. For bulk deletes, drop whole chunks/files (a TSDB chunk or a parquet partition) when possible to avoid expensive tombstone operations.
- Exactly-once vs at-least-once: prefer exactly-once for aggregates (stream engine checkpoints/transactions with Kafka sinks) and at-least-once for logs, where duplicates are acceptable and deduplication is complex.
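A minimal, framework-agnostic sketch of event-time tumbling-window aggregation keyed by metric/tag key. A production job would use Flink's windowing and a RocksDB state backend rather than an in-memory dict, and the watermark logic here is deliberately simplified:

```python
from collections import defaultdict
from typing import Dict, Tuple

WINDOW_MS = 60_000            # 1-minute tumbling windows
ALLOWED_LATENESS_MS = 30_000  # simplified lateness allowance

# (window_start_ms, key) -> (running_sum, count)
windows: Dict[Tuple[int, str], Tuple[float, int]] = defaultdict(lambda: (0.0, 0))
max_event_time_ms = 0

def window_start(ts_ms: int) -> int:
    """Align an event timestamp to the start of its tumbling window."""
    return ts_ms - (ts_ms % WINDOW_MS)

def emit(key: str, start_ms: int, avg: float):
    """Stand-in for the real sink, e.g., writing the average to the TSDB."""
    print(f"{key} window={start_ms} avg={avg:.2f}")

def process(key: str, value: float, ts_ms: int):
    """Add one metric point to its window and close windows behind the watermark."""
    global max_event_time_ms
    max_event_time_ms = max(max_event_time_ms, ts_ms)

    w = window_start(ts_ms)
    s, c = windows[(w, key)]
    windows[(w, key)] = (s + value, c + 1)

    # Simplified watermark: close windows that ended before (max event time - lateness).
    watermark = max_event_time_ms - ALLOWED_LATENESS_MS
    for start, k in [wk for wk in windows if wk[0] + WINDOW_MS <= watermark]:
        total, count = windows.pop((start, k))
        emit(k, start, total / count)
```

Hopping windows can be composed from these tumbling panes, and sliding windows need per-key ordered buffers with eviction, as noted in the windowing bullet above.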
9. Bottlenecks & Scaling
Potential bottlenecks and mitigations:
- Kafka cluster saturation: monitor broker throughput, increase partitions, add brokers, use compression, and tune producer batching.
- Partition hot spots: choose partition keys that spread load (e.g., a hash of tenant_id + source_id) and shard known-heavy sources across multiple partitions (see the sketch after this list).
- Stream processing state explosion: use RocksDB with TTL/eviction, partition state across workers, and scale out stateful operators.
- Network / S3 read latency for analytics: use caching and colocated compute (e.g., cluster with HDFS or use cloud-native warehouses that read from S3 efficiently).
- Elasticsearch indexing cost: reduce indexed fields, use ingest pipelines for light parsing, and tier older indices to warm/cold nodes.
- Large-cardinality tags in TSDB: use tag cardinality limits or rollup aggregations and avoid indexing highly dynamic tags directly.
- Compaction and small files: run periodic compaction tasks to merge parquet files and reduce overhead.
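A small sketch of the partition-key idea above: hash tenant_id + source_id so load spreads across partitions, and salt known-heavy sources across a few partitions. The constants and helper are illustrative, and note that Kafka's default partitioner applies its own hash (murmur2) to the message key; this only shows how the key could be constructed:

```python
import hashlib
import random

NUM_PARTITIONS = 128                      # illustrative topic partition count
HEAVY_SOURCES = {"tenantA:firehose-01"}   # hypothetical sources known to overload one partition

def partition_for(tenant_id: str, source_id: str) -> int:
    """Spread load by hashing tenant+source; salt heavy sources over a few partitions."""
    key = f"{tenant_id}:{source_id}"
    if key in HEAVY_SOURCES:
        # Trades strict per-source ordering for throughput on known-hot keys.
        key = f"{key}#{random.randrange(4)}"
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS
```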
10. Follow-up Questions / Extensions
- Support multi-region ingest and geo-failover? (Cross-region replication, global Kafka clusters or MirrorMaker)
- Provide per-tenant quotas and billing based on ingress/egress/storage?
- Real-time alerting and anomaly detection on streams?
- Support trace-level correlation (link logs, traces, metrics) using a global trace_id?
- Add data lineage and auditing for compliance and debugging?
- Include schema governance and automated compatibility checks?
11. Wrap-up
A robust design centralizes durable ingestion (Kafka), performs low-latency stateful processing (Flink/Spark) for enrichment and windowed aggregation, stores metrics in a TSDB optimized for time-chunked data, indexes logs in a search engine for fast full-text queries, and persists normalized data to a parquet-backed data lake and data warehouse for analytics. Key operational concerns are partitioning, schema management, state scaling, retention and delete strategies, and cost trade-offs between hot query latency and cold storage economics.