Aggregation & Rollups
This recipe demonstrates grouping records and computing summary statistics. The pipeline filters active sales records, then rolls them up by department.
Input data
sales.csv:
```csv
id,department,amount,status,rep
1,Engineering,5000,active,Alice
2,Marketing,3000,active,Bob
3,Engineering,7000,active,Carol
4,Sales,4000,inactive,Dave
5,Marketing,2000,active,Eva
6,Engineering,9500,active,Frank
7,Sales,6000,active,Grace
8,Marketing,1500,inactive,Hank
```
Pipeline
dept_rollup.yaml:
```yaml
pipeline:
  name: dept_rollup
nodes:
  - type: source
    name: sales
    config:
      name: sales
      type: csv
      path: "./sales.csv"
      schema:
        - { name: id, type: int }
        - { name: department, type: string }
        - { name: amount, type: float }
        - { name: status, type: string }
        - { name: rep, type: string }
  - type: transform
    name: active_only
    input: sales
    config:
      cxl: |
        filter status == "active"
  - type: aggregate
    name: rollup
    input: active_only
    config:
      group_by: [department]
      cxl: |
        emit total = sum(amount)
        emit count = count(*)
        emit average = avg(amount)
        emit maximum = max(amount)
        emit minimum = min(amount)
  - type: output
    name: report
    input: rollup
    config:
      name: dept_totals
      type: csv
      path: "./output/dept_totals.csv"
```
Run it
```bash
clinker run dept_rollup.yaml --dry-run
clinker run dept_rollup.yaml
```
Expected output
output/dept_totals.csv:
```csv
department,total,count,average,maximum,minimum
Engineering,21500,3,7166.67,9500,5000
Marketing,5000,2,2500,3000,2000
Sales,6000,1,6000,6000,6000
```
One row per department. The inactive records (Dave’s $4000, Hank’s $1500) are excluded by the filter.
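To check the expected output by hand, here is a minimal Python sketch of the same semantics: filter active rows, group by department, then summarize. It only models what the pipeline computes, not how Clinker executes it; `dept_rollup` is a hypothetical name, and rounding the average to two places is an assumption made to match the 7166.67 shown above.

```python
import csv
import io
from collections import defaultdict

# sales.csv from above, inlined so the sketch runs on its own.
SALES_CSV = """id,department,amount,status,rep
1,Engineering,5000,active,Alice
2,Marketing,3000,active,Bob
3,Engineering,7000,active,Carol
4,Sales,4000,inactive,Dave
5,Marketing,2000,active,Eva
6,Engineering,9500,active,Frank
7,Sales,6000,active,Grace
8,Marketing,1500,inactive,Hank
"""


def dept_rollup(text):
    """Filter active rows, group by department, and summarize amounts."""
    groups = defaultdict(list)
    for row in csv.DictReader(io.StringIO(text)):
        if row["status"] != "active":  # the active_only transform
            continue
        groups[row["department"]].append(float(row["amount"]))

    summary = {}
    for dept, amounts in groups.items():  # one output row per group
        summary[dept] = {
            "total": sum(amounts),
            "count": len(amounts),
            "average": round(sum(amounts) / len(amounts), 2),
            "maximum": max(amounts),
            "minimum": min(amounts),
        }
    return summary


for dept, stats in dept_rollup(SALES_CSV).items():
    print(dept, stats)
```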
How aggregation works
Group-by keys
The group_by field lists the columns that define each group. Records with the same values for all group-by columns are aggregated together. The group-by columns appear automatically in the output – you do not need to emit them.
Aggregate functions
Available aggregate functions in CXL:
| Function | Description |
|---|---|
| sum(expr) | Sum of values |
| count(*) | Number of records |
| avg(expr) | Arithmetic mean |
| min(expr) | Minimum value |
| max(expr) | Maximum value |
| first(expr) | First value encountered |
| last(expr) | Last value encountered |
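Each function in the table needs only a small, fixed amount of running state per group. The Python sketch below shows that state, assuming numeric values and ignoring null handling, which this page does not describe. `GroupState` and its methods are hypothetical illustrations, not Clinker internals.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class GroupState:
    """Hypothetical per-group accumulator covering the seven functions."""
    count: int = 0
    total: float = 0.0
    minimum: Optional[float] = None
    maximum: Optional[float] = None
    first: Any = None
    last: Any = None

    def update(self, value):
        if self.count == 0:
            self.first = value  # first value encountered
        self.last = value  # overwritten on every record
        self.count += 1
        self.total += value
        self.minimum = value if self.minimum is None else min(self.minimum, value)
        self.maximum = value if self.maximum is None else max(self.maximum, value)

    def finish(self):
        """Finalize the group; avg is derived from sum and count."""
        return {
            "sum": self.total,
            "count": self.count,
            "avg": self.total / self.count,
            "min": self.minimum,
            "max": self.maximum,
            "first": self.first,
            "last": self.last,
        }


state = GroupState()
for amount in [5000, 7000, 9500]:  # Engineering's active amounts, in file order
    state.update(amount)
print(state.finish())
```

Because the state per group stays constant in size no matter how many records the group has, total memory grows with the number of groups rather than the number of records.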
Per-document aggregation
When the source is document-aware — a glob: / paths: source that
treats each file as its own document, or an enveloped format like XML or
EDI — the Aggregate produces one set of grouped rows per document
rather than a single aggregate spanning every file. Each document’s
groups finalize and emit at that document’s close boundary, so a glob
over twelve monthly files yields twelve independent monthly roll-ups, and
only one document’s groups are live in memory at a time. A plain
single-file source is one document and still emits a single aggregate.
See Envelopes & Document Context
for the boundary rules (including how a Merge of distinct sources folds
them back into one aggregate).
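The per-document behavior can be pictured as a fresh set of groups for each document, finalized at that document's close boundary. A minimal Python sketch, assuming each document arrives as a `(name, records)` pair. The function name and the monthly file names are hypothetical.

```python
from collections import defaultdict


def aggregate_per_document(documents):
    """documents: iterable of (doc_name, [(group_key, amount), ...])."""
    for doc_name, records in documents:
        groups = defaultdict(float)  # only this document's groups are live
        for key, amount in records:
            groups[key] += amount
        # Document close boundary: finalize and emit, then the state is dropped.
        for key, total in groups.items():
            yield doc_name, key, total


# Hypothetical monthly files matched by a glob.
MONTHS = [
    ("2024-01.csv", [("Engineering", 100.0), ("Sales", 50.0)]),
    ("2024-02.csv", [("Engineering", 30.0)]),
]
for row in aggregate_per_document(MONTHS):
    print(row)
```

Engineering appears once per month rather than as a single 130.0 total, which is the "twelve independent monthly roll-ups" behavior described above, scaled down to two files.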
Strategy selection
Clinker offers two aggregation strategies:
- Hash aggregation (used by default unless the input is provably sorted): Builds an in-memory hash map keyed by the group-by columns. Works with any input order. Memory usage is proportional to the number of distinct groups.
- Streaming aggregation: Processes records in order, emitting each group's result as soon as the next group starts. Requires input sorted by the group-by keys. Uses minimal memory regardless of the number of groups.
The default strategy (auto) selects streaming when the optimizer can prove the input is sorted by the group-by keys, and hash otherwise. You can force a strategy:
```yaml
config:
  group_by: [department]
  strategy: streaming # requires sorted input
```
See Memory Tuning for details on memory implications.
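The difference between the two strategies is easiest to see side by side. This Python sketch models each one for a single `sum` and assumes keys are sorted ascending for the streaming case. The unsorted-input check is an illustrative choice, not documented Clinker behavior, and both function names are hypothetical.

```python
def hash_aggregate(records):
    """Hash strategy: any input order; one dict entry per distinct group."""
    totals = {}
    for key, amount in records:
        totals[key] = totals.get(key, 0) + amount
    return list(totals.items())


def streaming_aggregate(records):
    """Streaming strategy: input sorted ascending by key; one group in memory."""
    current_key, running, started = None, 0, False
    for key, amount in records:
        if started and key != current_key:
            if key < current_key:
                raise ValueError(
                    f"input not sorted by group key: {key!r} after {current_key!r}"
                )
            yield current_key, running  # next group started: emit the previous one
            running = 0
        current_key = key
        running += amount
        started = True
    if started:
        yield current_key, running  # end of input closes the last group


rows = [("Engineering", 5000), ("Engineering", 7000), ("Marketing", 3000), ("Sales", 6000)]
print(list(streaming_aggregate(rows)))
print(hash_aggregate(reversed(rows)))
```

The streaming version only ever holds `current_key` and `running`, which is why it does not grow with the number of groups. It is also why it cannot tolerate unsorted input: a key that reappears after its group was emitted would otherwise produce a second, partial row.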
Variations
Multiple group-by keys
```yaml
config:
  group_by: [department, region]
  cxl: |
    emit total = sum(amount)
    emit count = count(*)
```
Produces one row per unique (department, region) combination.
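With several keys, the group identity is the tuple of all key values. A minimal Python sketch of that idea follows. The sample sales.csv has no region column, so the rows below are hypothetical, and `rollup_by` is an illustrative name.

```python
from collections import defaultdict


def rollup_by(records, keys):
    """Group on the tuple of all key values; one entry per unique combination."""
    groups = defaultdict(lambda: {"total": 0.0, "count": 0})
    for rec in records:
        group_key = tuple(rec[k] for k in keys)
        groups[group_key]["total"] += rec["amount"]
        groups[group_key]["count"] += 1
    return dict(groups)


# Hypothetical rows with a region column.
ROWS = [
    {"department": "Engineering", "region": "EU", "amount": 5000.0},
    {"department": "Engineering", "region": "US", "amount": 7000.0},
    {"department": "Engineering", "region": "EU", "amount": 9500.0},
]
print(rollup_by(ROWS, ["department", "region"]))
print(rollup_by(ROWS, ["department"]))
```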
Pre-aggregation transform
Compute derived fields before aggregating:
```yaml
- type: transform
  name: prepare
  input: sales
  config:
    cxl: |
      filter status == "active"
      emit department = department
      emit amount = amount
      emit is_large = amount >= 5000
- type: aggregate
  name: rollup
  input: prepare
  config:
    group_by: [department]
    cxl: |
      emit total = sum(amount)
      emit large_count = sum(if is_large then 1 else 0)
      emit small_count = sum(if not is_large then 1 else 0)
```
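Summing a 1/0 expression over a derived flag amounts to a conditional count. The Python model below runs that logic on the active rows of sales.csv. `prepare` and `rollup` only mirror the node names; they are not a Clinker API.

```python
from collections import defaultdict

# (department, amount) for the active rows of sales.csv, in file order.
ACTIVE = [
    ("Engineering", 5000.0), ("Marketing", 3000.0), ("Engineering", 7000.0),
    ("Marketing", 2000.0), ("Engineering", 9500.0), ("Sales", 6000.0),
]


def prepare(rows):
    """Pre-aggregation transform: attach the derived is_large flag."""
    for dept, amount in rows:
        yield {"department": dept, "amount": amount, "is_large": amount >= 5000}


def rollup(rows):
    """Aggregate: total plus conditional counts driven by is_large."""
    out = defaultdict(lambda: {"total": 0.0, "large_count": 0, "small_count": 0})
    for r in rows:
        g = out[r["department"]]
        g["total"] += r["amount"]
        g["large_count"] += 1 if r["is_large"] else 0  # sum(if is_large then 1 else 0)
        g["small_count"] += 0 if r["is_large"] else 1  # sum(if not is_large then 1 else 0)
    return dict(out)


print(rollup(prepare(ACTIVE)))
```

Because `>=` is used, Alice's 5000 counts as large, so all three Engineering records land in `large_count`.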
Aggregation followed by routing
Aggregate first, then route the summary rows:
```yaml
- type: aggregate
  name: rollup
  input: active_only
  config:
    group_by: [department]
    cxl: |
      emit total = sum(amount)
- type: route
  name: split_by_total
  input: rollup
  config:
    mode: exclusive
    conditions:
      large: "total >= 10000"
    default: small
```
This routes departments with at least $10,000 in total sales to the large branch and every other department to the default small branch.
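Applied to the rolled-up sample data, only Engineering (21500) meets the threshold. A Python sketch of the routing, assuming `mode: exclusive` means each row goes to the first matching branch, or to the default if none match. `route_exclusive` is a hypothetical helper.

```python
def route_exclusive(rows, conditions, default):
    """Send each row to the first matching branch, else to the default branch."""
    routed = {name: [] for name in list(conditions) + [default]}
    for row in rows:
        for name, predicate in conditions.items():
            if predicate(row):
                routed[name].append(row)
                break  # exclusive: at most one branch per row
        else:
            routed[default].append(row)
    return routed


ROLLUP = [
    {"department": "Engineering", "total": 21500.0},
    {"department": "Marketing", "total": 5000.0},
    {"department": "Sales", "total": 6000.0},
]
result = route_exclusive(ROLLUP, {"large": lambda r: r["total"] >= 10000}, "small")
print({name: [r["department"] for r in rows] for name, rows in result.items()})
```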
No group-by (grand total)
Omit group_by to aggregate all records into a single output row:
```yaml
config:
  cxl: |
    emit grand_total = sum(amount)
    emit record_count = count(*)
    emit average_amount = avg(amount)
```
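Without `group_by`, every record falls into one group. As a worked check, the sketch below assumes the aggregate reads from `active_only`, which the fragment above does not state. On that input the grand total is 32500 over 6 records. `grand_total` is a hypothetical name.

```python
def grand_total(amounts):
    """No group_by: all records form a single group, so one output row."""
    total = sum(amounts)
    count = len(amounts)
    return {"grand_total": total, "record_count": count, "average_amount": total / count}


# Active amounts from sales.csv (assumes the input is the active_only node).
ACTIVE_AMOUNTS = [5000.0, 3000.0, 7000.0, 2000.0, 9500.0, 6000.0]
print(grand_total(ACTIVE_AMOUNTS))
```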
Time-windowed rollups
When the grouping dimension is an event-time bucket, declare a
watermark: on every source and a time_window: on the aggregate.
Three patterns cover the common shapes; all three ship as runnable
pipelines under examples/pipelines/.
Tumbling: hourly click counts
Non-overlapping one-hour buckets per user. Use when each record should contribute to exactly one reporting bucket.
examples/pipelines/tumbling_clicks.yaml:
```yaml
pipeline:
  name: tumbling_clicks
nodes:
  - type: source
    name: clicks
    description: Per-user click stream with an event-time column.
    config:
      name: clicks
      type: csv
      path: ./data/tumbling_clicks.csv
      options:
        has_header: true
      watermark:
        column: event_ts
      schema:
        - { name: user_id, type: string }
        - { name: event_ts, type: date_time }
        - { name: kind, type: string }
  - type: aggregate
    name: hourly_clicks
    description: Per-user click count, bucketed by event-time hour.
    input: clicks
    config:
      group_by: [user_id]
      time_window:
        tumbling: { size: 1h }
      cxl: |
        emit user_id = user_id
        emit n = count(*)
  - type: output
    name: results
    input: hourly_clicks
    config:
      name: results
      type: csv
      path: ./output/tumbling_clicks.csv
error_handling:
  strategy: fail_fast
```
Run:
```bash
cargo run -p clinker -- run examples/pipelines/tumbling_clicks.yaml
```
The source’s watermark advances with each record’s event_ts; each
hour-aligned bucket emits one row per user_id as soon as the
watermark crosses bucket_end. Records observed out-of-order land
in the DLQ as late_record — add delay: on the source or
allowed_lateness: on the aggregate if the input has a known
out-of-order tail.
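A simplified Python model of the tumbling behavior helps make the watermark rules concrete. It assumes several things this page does not specify: buckets are half-open `[start, end)` and aligned to the Unix epoch; the watermark is the maximum `event_ts` seen (no `delay:`); a record counts as late only if its bucket has already been emitted; and any still-open buckets flush at end of input. `tumbling_counts` is a hypothetical name.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

SIZE = timedelta(hours=1)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def bucket_start(ts):
    """Hour-aligned bucket start: round ts down to a multiple of SIZE."""
    return ts - (ts - EPOCH) % SIZE


def tumbling_counts(events):
    """events: (user_id, event_ts) pairs in arrival order.

    Returns (emitted, dlq); emitted rows are (bucket_start, user_id, n).
    """
    open_buckets = defaultdict(lambda: defaultdict(int))  # start -> user -> n
    closed = set()
    watermark = None
    emitted, dlq = [], []
    for user, ts in events:
        start = bucket_start(ts)
        if start in closed:  # bucket already emitted: too late
            dlq.append(("late_record", user, ts))
            continue
        open_buckets[start][user] += 1
        watermark = ts if watermark is None else max(watermark, ts)
        for s in sorted(open_buckets):  # sorted() copies keys, so deleting is safe
            if watermark >= s + SIZE:  # watermark crossed bucket_end
                emitted.extend((s, u, n) for u, n in sorted(open_buckets[s].items()))
                del open_buckets[s]
                closed.add(s)
    for s in sorted(open_buckets):  # end of input: flush remaining buckets
        emitted.extend((s, u, n) for u, n in sorted(open_buckets[s].items()))
    return emitted, dlq


def t(h, m):
    return datetime(2024, 1, 1, h, m, tzinfo=timezone.utc)


emitted, dlq = tumbling_counts(
    [("u1", t(9, 10)), ("u2", t(9, 50)), ("u1", t(10, 5)), ("u1", t(9, 59))]
)
print(emitted)
print(dlq)
```

The 10:05 record pushes the watermark past 10:00, so the 09:00 bucket emits; the 09:59 record that arrives afterwards finds its bucket closed and goes to the DLQ.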
Hopping: 1-hour sums advanced every 5 minutes
Overlapping one-hour windows that move forward every 5 minutes. Use for moving averages and rolling sums where one record should contribute to multiple overlapping reports.
examples/pipelines/hopping_sliding_5m_1h.yaml:
```yaml
pipeline:
  name: hopping_sliding_5m_1h
nodes:
  - type: source
    name: clicks
    config:
      name: clicks
      type: csv
      path: ./data/hopping_clicks.csv
      options:
        has_header: true
      watermark:
        column: event_ts
        delay: 5s
      schema:
        - { name: user_id, type: string }
        - { name: event_ts, type: date_time }
        - { name: amount, type: int }
  - type: aggregate
    name: sliding_amount
    input: clicks
    config:
      group_by: [user_id]
      time_window:
        hopping:
          size: 1h
          slide: 5m
        allowed_lateness: 30s
      cxl: |
        emit user_id = user_id
        emit total = sum(amount)
        emit n = count(*)
  - type: output
    name: results
    input: sliding_amount
    config:
      name: results
      type: csv
      path: ./output/hopping_sliding_5m_1h.csv
error_handling:
  strategy: fail_fast
```
Run:
```bash
cargo run -p clinker -- run examples/pipelines/hopping_sliding_5m_1h.yaml
```
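Each record fans into ceil(size / slide) = 12 overlapping windows,
so every record contributes to 12 output rows, one per window that
contains it. When a user's records are more than an hour apart, that
works out to 12 output rows per input record; denser input shares
windows and yields fewer rows per record. The source's delay: 5s plus
the aggregate's allowed_lateness: 30s give the pipeline 35 seconds of
total grace beyond strict event-time order before a record drops to
the DLQ.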
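The fan-out and the grace arithmetic can be checked with a small Python sketch. It assumes windows are half-open and their starts are aligned to multiples of the slide from the Unix epoch ("clock-aligned" in the table at the end of this page). `hopping_windows` is a hypothetical name.

```python
import math
from datetime import datetime, timedelta, timezone

SIZE = timedelta(hours=1)
SLIDE = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOWS_PER_RECORD = math.ceil(SIZE / SLIDE)  # ceil(1h / 5m)


def hopping_windows(ts, size=SIZE, slide=SLIDE):
    """Return every [start, end) hopping window that contains ts, oldest first."""
    start = ts - (ts - EPOCH) % slide  # latest slide-aligned start <= ts
    windows = []
    while start > ts - size:  # [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return windows[::-1]


# Grace before a record is dead-lettered: source delay + allowed_lateness.
TOTAL_GRACE = timedelta(seconds=5) + timedelta(seconds=30)

ts = datetime(2024, 1, 1, 10, 7, tzinfo=timezone.utc)
for start, end in hopping_windows(ts):
    print(start.time(), "->", end.time())
```

A record at 10:07 lands in the 12 windows starting 09:10, 09:15, ..., 10:05; the 09:05 window ends exactly at 10:05 and so excludes it.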
Session: per-user multi-source login sessions
Variable-duration windows bounded by inactivity, computed across two independent sources. Use for activity grouping where the window length is data-driven rather than clock-aligned.
examples/pipelines/multi_source_session.yaml:
```yaml
pipeline:
  name: multi_source_session
nodes:
  - type: source
    name: src_web
    description: Web login events.
    config:
      name: src_web
      type: csv
      path: ./data/session_logins.csv
      options:
        has_header: true
      watermark:
        column: event_ts
      schema:
        - { name: user_id, type: string }
        - { name: event_ts, type: date_time }
        - { name: source, type: string }
  - type: source
    name: src_mobile
    description: Mobile login events.
    config:
      name: src_mobile
      type: csv
      path: ./data/session_mobile.csv
      options:
        has_header: true
      watermark:
        column: event_ts
      schema:
        - { name: user_id, type: string }
        - { name: event_ts, type: date_time }
        - { name: source, type: string }
  - type: merge
    name: all_logins
    inputs: [src_web, src_mobile]
  - type: aggregate
    name: user_sessions
    input: all_logins
    config:
      group_by: [user_id]
      time_window:
        session: { gap: 5m }
        allowed_lateness: 30s
      cxl: |
        emit user_id = user_id
        emit logins = count(*)
  - type: output
    name: results
    input: user_sessions
    config:
      name: results
      type: csv
      path: ./output/multi_source_session.csv
error_handling:
  strategy: fail_fast
```
Run:
```bash
cargo run -p clinker -- run examples/pipelines/multi_source_session.yaml
```
Each source declares its own watermark.column independently. The
aggregate’s close decision reads min_across_sources across both
sources’ partitions: a session can’t emit until both src_web and
src_mobile have advanced past session_end + allowed_lateness.
Drop the watermark: block on either source and the planner
rejects the pipeline with
E156.
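The session logic and the cross-source close rule can be modeled for a single user_id in Python. This is a batch sketch under stated assumptions, not Clinker's incremental implementation. It assumes events at most `gap` apart share a session, that session_end is the last event plus the gap, and that each source's watermark is a single timestamp. `sessionize` and `can_close` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone

GAP = timedelta(minutes=5)
ALLOWED_LATENESS = timedelta(seconds=30)


def sessionize(timestamps, gap=GAP):
    """Split one user's event times into sessions wherever the gap exceeds `gap`."""
    sessions = []
    for ts in sorted(timestamps):  # merged web + mobile events for one user
        if sessions and ts - sessions[-1]["last"] <= gap:
            sessions[-1]["last"] = ts
            sessions[-1]["logins"] += 1
        else:
            sessions.append({"first": ts, "last": ts, "logins": 1})
    return sessions


def can_close(session, source_watermarks, gap=GAP, lateness=ALLOWED_LATENESS):
    """A session may emit only once the slowest source is past its end + lateness."""
    session_end = session["last"] + gap  # assumed definition of session_end
    min_across_sources = min(source_watermarks.values())
    return min_across_sources >= session_end + lateness


def t(h, m):
    return datetime(2024, 1, 1, h, m, tzinfo=timezone.utc)


web, mobile = [t(9, 0), t(9, 3)], [t(9, 6), t(9, 30)]
sessions = sessionize(web + mobile)
print([(s["first"].time(), s["last"].time(), s["logins"]) for s in sessions])
print(can_close(sessions[0], {"src_web": t(9, 30), "src_mobile": t(9, 10)}))
```

The first session (09:00 to 09:06, three logins) spans both sources. It stays open while src_mobile's watermark sits at 09:10, because the slowest source has not yet passed 09:11:30.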
When to pick each
| Kind | Bucket shape | Typical use |
|---|---|---|
| tumbling | Disjoint, clock-aligned, fixed width | Hourly metrics, daily rollups, billing periods. |
| hopping | Overlapping, clock-aligned, fixed width | Moving averages, sliding sums, anomaly detection where each record should affect multiple reports. |
| session | Variable width, gap-bounded, per-key | User sessions, telemetry burst grouping, activity envelopes where the window length is data-driven. |