Aggregation & Rollups

This recipe demonstrates grouping records and computing summary statistics. The pipeline filters active sales records, then rolls them up by department.

Input data

sales.csv:

id,department,amount,status,rep
1,Engineering,5000,active,Alice
2,Marketing,3000,active,Bob
3,Engineering,7000,active,Carol
4,Sales,4000,inactive,Dave
5,Marketing,2000,active,Eva
6,Engineering,9500,active,Frank
7,Sales,6000,active,Grace
8,Marketing,1500,inactive,Hank

Pipeline

dept_rollup.yaml:

pipeline:
  name: dept_rollup

nodes:
  - type: source
    name: sales
    config:
      name: sales
      type: csv
      path: "./sales.csv"
      schema:
        - { name: id, type: int }
        - { name: department, type: string }
        - { name: amount, type: float }
        - { name: status, type: string }
        - { name: rep, type: string }

  - type: transform
    name: active_only
    input: sales
    config:
      cxl: |
        filter status == "active"

  - type: aggregate
    name: rollup
    input: active_only
    config:
      group_by: [department]
      cxl: |
        emit total = sum(amount)
        emit count = count(*)
        emit average = avg(amount)
        emit maximum = max(amount)
        emit minimum = min(amount)

  - type: output
    name: report
    input: rollup
    config:
      name: dept_totals
      type: csv
      path: "./output/dept_totals.csv"

Run it

clinker run dept_rollup.yaml --dry-run
clinker run dept_rollup.yaml

Expected output

output/dept_totals.csv:

department,total,count,average,maximum,minimum
Engineering,21500,3,7166.67,9500,5000
Marketing,5000,2,2500,3000,2000
Sales,6000,1,6000,6000,6000

One row per department. The inactive records (Dave’s $4000, Hank’s $1500) are excluded by the filter.
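The expected numbers can be checked with a short standalone Python sketch. It inlines the rows of sales.csv instead of reading the file. It mirrors the active_only filter and the rollup aggregate, and rounds the average to two decimals to match the CSV. The rounding rule and the sketch as a whole are illustrative assumptions; they are not Clinker's implementation.

```python
import csv
import io
from collections import defaultdict

# Rows of sales.csv, inlined so the sketch runs without the file.
SALES_CSV = """\
id,department,amount,status,rep
1,Engineering,5000,active,Alice
2,Marketing,3000,active,Bob
3,Engineering,7000,active,Carol
4,Sales,4000,inactive,Dave
5,Marketing,2000,active,Eva
6,Engineering,9500,active,Frank
7,Sales,6000,active,Grace
8,Marketing,1500,inactive,Hank
"""

def dept_rollup(text):
    """Keep active rows, then group by department (mirrors dept_rollup.yaml)."""
    groups = defaultdict(list)
    for row in csv.DictReader(io.StringIO(text)):
        if row["status"] == "active":              # the active_only transform
            groups[row["department"]].append(float(row["amount"]))
    result = {}
    for dept, amounts in groups.items():           # the rollup aggregate
        result[dept] = {
            "total": sum(amounts),
            "count": len(amounts),
            "average": round(sum(amounts) / len(amounts), 2),
            "maximum": max(amounts),
            "minimum": min(amounts),
        }
    return result

rollup = dept_rollup(SALES_CSV)
for dept in sorted(rollup):
    print(dept, rollup[dept])
```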

How aggregation works

Group-by keys

The group_by field lists the columns that define each group. Records with the same values for all group-by columns are aggregated together. The group-by columns appear automatically in the output, so you do not need to emit them.

Aggregate functions

Available aggregate functions in CXL:

| Function | Description |
|---|---|
| sum(expr) | Sum of values |
| count(*) | Number of records |
| avg(expr) | Arithmetic mean |
| min(expr) | Minimum value |
| max(expr) | Maximum value |
| first(expr) | First value encountered |
| last(expr) | Last value encountered |
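One way to think about these functions is as running state that is updated once per record and read when the group closes. The Python sketch below follows that model. It assumes non-null numeric inputs, because null handling in Clinker is not covered here. It also shows that first and last depend on arrival order, while the other functions do not.

```python
def fold(values):
    """Single pass over one group's values, updating every aggregate's state."""
    state = {"sum": 0, "count": 0, "min": None, "max": None,
             "first": None, "last": None}
    for v in values:
        state["sum"] += v
        state["count"] += 1
        state["min"] = v if state["min"] is None else min(state["min"], v)
        state["max"] = v if state["max"] is None else max(state["max"], v)
        if state["count"] == 1:
            state["first"] = v          # first value encountered
        state["last"] = v               # overwritten on every record
    # avg is derived at finalize time from sum and count (None for an empty group).
    state["avg"] = state["sum"] / state["count"] if state["count"] else None
    return state

in_file_order = fold([5000, 7000, 9500])    # Engineering's active amounts
reversed_order = fold([9500, 7000, 5000])
print(in_file_order["first"], reversed_order["first"])  # prints: 5000 9500
```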

Per-document aggregation

When the source is document-aware — a glob: / paths: source that treats each file as its own document, or an enveloped format like XML or EDI — the Aggregate produces one set of grouped rows per document rather than a single aggregate spanning every file. Each document’s groups finalize and emit at that document’s close boundary, so a glob over twelve monthly files yields twelve independent monthly roll-ups, and only one document’s groups are live in memory at a time. A plain single-file source is one document and still emits a single aggregate. See Envelopes & Document Context for the boundary rules (including how a Merge of distinct sources folds them back into one aggregate).
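The difference between one aggregate per document and one aggregate overall can be modeled in a few lines of Python. In this sketch each document is a (name, records) pair, and the monthly file names and amounts are made up. Grouped state is created fresh for each document and discarded after that document's rows are emitted. The sketch illustrates the boundary rule only; it does not model envelopes or Merge.

```python
from collections import defaultdict

def per_document_rollup(documents, key, value):
    """Yield (document, group, total), finalizing each document's groups at its close."""
    for doc_name, records in documents:
        totals = defaultdict(float)        # only this document's groups are live
        for rec in records:
            totals[rec[key]] += rec[value]
        # Close boundary: emit this document's groups, then drop the state.
        for group in sorted(totals):
            yield doc_name, group, totals[group]

# Hypothetical monthly files matched by a glob.
documents = [
    ("2024-01.csv", [{"department": "Sales", "amount": 100.0},
                     {"department": "Sales", "amount": 50.0}]),
    ("2024-02.csv", [{"department": "Sales", "amount": 70.0},
                     {"department": "Marketing", "amount": 20.0}]),
]
rows = list(per_document_rollup(documents, "department", "amount"))
for row in rows:
    print(row)
```

Sales appears once per file rather than once overall, which is the per-document behavior described above.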

Strategy selection

Clinker offers two aggregation strategies:

  • Hash aggregation (default): Builds an in-memory hash map keyed by the group-by columns. Works with any input order. Memory usage is proportional to the number of distinct groups.

  • Streaming aggregation: Processes records in order, emitting each group’s result as soon as the next group starts. Requires input sorted by the group-by keys. Uses minimal memory regardless of the number of groups.
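The following Python sketch contrasts the two strategies. hash_aggregate keeps one running total per distinct key. streaming_aggregate keeps only the current group and emits it when the key changes. The ascending-order check is added here for illustration only. The text above says that streaming requires sorted input, but not how Clinker reacts when the input is unsorted.

```python
def hash_aggregate(records, key):
    """One running total per distinct key; input order does not matter."""
    totals = {}
    for rec in records:
        totals[rec[key]] = totals.get(rec[key], 0) + rec["amount"]
    return totals

def streaming_aggregate(records, key):
    """Keep only the current group; emit it when the key changes (input sorted by key)."""
    current, total = None, 0
    for rec in records:
        k = rec[key]
        if k != current:
            if current is not None:
                if k < current:           # illustrative ascending-order check
                    raise ValueError(f"input not sorted by {key!r}: {k!r} after {current!r}")
                yield current, total      # previous group is complete
            current, total = k, 0
        total += rec["amount"]
    if current is not None:
        yield current, total              # end of input closes the last group

rows = [{"department": d, "amount": a} for d, a in
        [("Engineering", 5000), ("Engineering", 7000), ("Engineering", 9500),
         ("Marketing", 3000), ("Marketing", 2000), ("Sales", 6000)]]
print(list(streaming_aggregate(rows, "department")))
print(hash_aggregate(rows, "department"))
```

The hash version holds every group until the end of input. The streaming version holds a single (key, total) pair at any moment, which is where its memory advantage comes from.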

The default strategy (auto) selects streaming when the optimizer can prove the input is sorted by the group-by keys, and hash otherwise. You can force a strategy:

    config:
      group_by: [department]
      strategy: streaming   # requires sorted input

See Memory Tuning for details on memory implications.

Variations

Multiple group-by keys

    config:
      group_by: [department, region]
      cxl: |
        emit total = sum(amount)
        emit count = count(*)

Produces one row per unique (department, region) combination.
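Conceptually, the group key is the tuple of all group_by values. The Python sketch below assumes a region column, which the sample sales.csv does not have. The region values are invented purely for illustration.

```python
from collections import defaultdict

def rollup_by(records, keys):
    """Group on the tuple of key columns; return {key_tuple: (total, count)}."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for rec in records:
        group = tuple(rec[k] for k in keys)   # e.g. (department, region)
        totals[group] += rec["amount"]
        counts[group] += 1
    return {g: (totals[g], counts[g]) for g in totals}

records = [  # hypothetical rows with an added region column
    {"department": "Engineering", "region": "EU", "amount": 5000.0},
    {"department": "Engineering", "region": "US", "amount": 7000.0},
    {"department": "Engineering", "region": "EU", "amount": 9500.0},
    {"department": "Marketing", "region": "EU", "amount": 3000.0},
]
result = rollup_by(records, ["department", "region"])
print(result)
```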

Pre-aggregation transform

Compute derived fields before aggregating:

  - type: transform
    name: prepare
    input: sales
    config:
      cxl: |
        filter status == "active"
        emit department = department
        emit amount = amount
        emit is_large = amount >= 5000

  - type: aggregate
    name: rollup
    input: prepare
    config:
      group_by: [department]
      cxl: |
        emit total = sum(amount)
        emit large_count = sum(if is_large then 1 else 0)
        emit small_count = sum(if not is_large then 1 else 0)
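The sum(if ... then 1 else 0) pattern is a conditional count. The Python sketch below applies it to the six active rows of sales.csv, mirroring the prepare transform and the rollup aggregate. It models the arithmetic, not Clinker's execution.

```python
from collections import defaultdict

# Active rows of sales.csv as (department, amount), in file order.
active = [("Engineering", 5000), ("Marketing", 3000), ("Engineering", 7000),
          ("Marketing", 2000), ("Engineering", 9500), ("Sales", 6000)]

stats = defaultdict(lambda: {"total": 0, "large_count": 0, "small_count": 0})
for department, amount in active:
    is_large = amount >= 5000                   # the prepare transform's derived field
    s = stats[department]
    s["total"] += amount
    s["large_count"] += 1 if is_large else 0    # sum(if is_large then 1 else 0)
    s["small_count"] += 1 if not is_large else 0

for department in sorted(stats):
    print(department, stats[department])
```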

Aggregation followed by routing

Aggregate first, then route the summary rows:

  - type: aggregate
    name: rollup
    input: active_only
    config:
      group_by: [department]
      cxl: |
        emit total = sum(amount)

  - type: route
    name: split_by_total
    input: rollup
    config:
      mode: exclusive
      conditions:
        large: "total >= 10000"
      default: small

This routes departments whose total is $10,000 or more to the large branch and all other departments to the small (default) branch.
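The Python sketch below applies the routing step to the department totals from the expected output. It assumes that exclusive mode sends each row to the first matching condition and falls back to default otherwise. With a single condition, first-match and any-match behave the same.

```python
def route_exclusive(rows, conditions, default):
    """Send each row to the first branch whose predicate matches, else to `default`."""
    branches = {name: [] for name in [*conditions, default]}
    for row in rows:
        for name, predicate in conditions.items():
            if predicate(row):
                branches[name].append(row)
                break
        else:                             # no condition matched
            branches[default].append(row)
    return branches

rollup = [{"department": "Engineering", "total": 21500},
          {"department": "Marketing", "total": 5000},
          {"department": "Sales", "total": 6000}]
branches = route_exclusive(rollup, {"large": lambda r: r["total"] >= 10000}, "small")
print({name: [r["department"] for r in rs] for name, rs in branches.items()})
# prints: {'large': ['Engineering'], 'small': ['Marketing', 'Sales']}
```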

No group-by (grand total)

Omit group_by to aggregate all records into a single output row:

    config:
      cxl: |
        emit grand_total = sum(amount)
        emit record_count = count(*)
        emit average_amount = avg(amount)
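This sketch assumes the aggregate reads from active_only, as in the main recipe. The short Python check below reproduces the arithmetic over the six active amounts.

```python
active_amounts = [5000, 3000, 7000, 2000, 9500, 6000]  # active rows of sales.csv

grand_total = sum(active_amounts)              # 21500 + 5000 + 6000
record_count = len(active_amounts)
average_amount = grand_total / record_count    # 32500 / 6

print(grand_total, record_count, round(average_amount, 2))
# prints: 32500 6 5416.67
```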

Time-windowed rollups

When the grouping dimension is an event-time bucket, declare a watermark: on every source and a time_window: on the aggregate. Three patterns cover the common shapes, and all three ship as runnable pipelines under examples/pipelines/.

Tumbling: hourly click counts

Non-overlapping one-hour buckets per user. Use when each record should contribute to exactly one reporting bucket.

examples/pipelines/tumbling_clicks.yaml:

pipeline:
  name: tumbling_clicks

nodes:
  - type: source
    name: clicks
    description: Per-user click stream with an event-time column.
    config:
      name: clicks
      type: csv
      path: ./data/tumbling_clicks.csv
      options:
        has_header: true
      watermark:
        column: event_ts
      schema:
        - { name: user_id, type: string }
        - { name: event_ts, type: date_time }
        - { name: kind, type: string }

  - type: aggregate
    name: hourly_clicks
    description: Per-user click count, bucketed by event-time hour.
    input: clicks
    config:
      group_by: [user_id]
      time_window:
        tumbling: { size: 1h }
      cxl: |
        emit user_id = user_id
        emit n = count(*)

  - type: output
    name: results
    input: hourly_clicks
    config:
      name: results
      type: csv
      path: ./output/tumbling_clicks.csv

error_handling:
  strategy: fail_fast

Run:

cargo run -p clinker -- run examples/pipelines/tumbling_clicks.yaml

The source’s watermark advances with each record’s event_ts, and each hour-aligned bucket emits one row per user_id as soon as the watermark crosses bucket_end. Records that arrive out of order land in the DLQ as late_record. If the input has a known out-of-order tail, add delay: on the source or allowed_lateness: on the aggregate.
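Here is a simplified Python model of this behavior. It makes three assumptions: buckets are aligned to the Unix epoch in UTC, the watermark is the largest event_ts seen so far with no delay, and a bucket's rows are emitted once the watermark reaches the bucket's end. The model also assumes input in event-time order, so it does not cover the DLQ path for late records.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)   # assumed bucket alignment origin

def tumbling_bucket(ts, size):
    """Return the [start, end) bucket of width `size` that contains ts."""
    index = (ts - EPOCH) // size          # timedelta // timedelta -> int
    start = EPOCH + index * size
    return start, start + size

def hourly_counts(records, size=timedelta(hours=1)):
    """records: (user_id, event_ts) pairs in event-time order.
    Yields (bucket_start, user_id, n) once the watermark reaches the bucket's end."""
    open_counts = {}                      # (bucket_start, user_id) -> running count
    watermark = None
    for user, ts in records:
        watermark = ts if watermark is None else max(watermark, ts)
        start, _ = tumbling_bucket(ts, size)
        open_counts[(start, user)] = open_counts.get((start, user), 0) + 1
        closable = sorted(k for k in open_counts if k[0] + size <= watermark)
        for key in closable:
            yield key[0], key[1], open_counts.pop(key)
    for key in sorted(open_counts):       # end of input flushes what is left
        yield key[0], key[1], open_counts[key]

def t(h, m):
    return datetime(2024, 1, 1, h, m, tzinfo=timezone.utc)

clicks = [("u1", t(9, 5)), ("u2", t(9, 10)), ("u1", t(9, 50)),
          ("u1", t(10, 0)), ("u2", t(10, 30))]
for row in hourly_counts(clicks):
    print(row)
```

In this run the 10:00 click moves the watermark to the end of the 09:00 bucket. That closes the 09:00 bucket for both users before any 10:00 bucket is emitted.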

Hopping: 1-hour sums advanced every 5 minutes

Overlapping one-hour windows that move forward every 5 minutes. Use for moving averages and rolling sums where one record should contribute to multiple overlapping reports.

examples/pipelines/hopping_sliding_5m_1h.yaml:

pipeline:
  name: hopping_sliding_5m_1h

nodes:
  - type: source
    name: clicks
    config:
      name: clicks
      type: csv
      path: ./data/hopping_clicks.csv
      options:
        has_header: true
      watermark:
        column: event_ts
        delay: 5s
      schema:
        - { name: user_id, type: string }
        - { name: event_ts, type: date_time }
        - { name: amount, type: int }

  - type: aggregate
    name: sliding_amount
    input: clicks
    config:
      group_by: [user_id]
      time_window:
        hopping:
          size: 1h
          slide: 5m
      allowed_lateness: 30s
      cxl: |
        emit user_id = user_id
        emit total = sum(amount)
        emit n = count(*)

  - type: output
    name: results
    input: sliding_amount
    config:
      name: results
      type: csv
      path: ./output/hopping_sliding_5m_1h.csv

error_handling:
  strategy: fail_fast

Run:

cargo run -p clinker -- run examples/pipelines/hopping_sliding_5m_1h.yaml

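Each record fans into ceil(size / slide) = ceil(1h / 5m) = 12 overlapping windows, so every record updates 12 window accumulators. A continuously active user therefore gets one output row per 5-minute slide instead of one per hour, roughly 12× the rows of an equivalent one-hour tumbling rollup. The source’s delay: 5s plus the aggregate’s allowed_lateness: 30s give the pipeline 35 seconds of total grace beyond strict event-time order before a record drops to the DLQ.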
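To see where the factor of 12 comes from, the Python sketch below lists every hopping window that contains a given timestamp. It assumes two details that the text above does not specify: window starts are multiples of slide since the Unix epoch (UTC), and windows are half-open [start, start + size).

```python
import math
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)   # assumed alignment origin

def hopping_windows(ts, size, slide):
    """Every [start, start + size) window, start a multiple of slide, that contains ts."""
    start = EPOCH + ((ts - EPOCH) // slide) * slide   # latest window start <= ts
    windows = []
    while start + size > ts:              # this window still covers ts
        windows.append((start, start + size))
        start -= slide
    return windows[::-1]                  # oldest window first

ts = datetime(2024, 1, 1, 10, 7, tzinfo=timezone.utc)
size, slide = timedelta(hours=1), timedelta(minutes=5)
windows = hopping_windows(ts, size, slide)
print(len(windows), math.ceil(size / slide))   # prints: 12 12
print(windows[0][0], windows[-1][0])            # oldest and newest window starts
```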

Session: per-user multi-source login sessions

Variable-duration windows bounded by inactivity, computed across two independent sources. Use for activity grouping where the window length is data-driven rather than clock-aligned.

examples/pipelines/multi_source_session.yaml:

pipeline:
  name: multi_source_session

nodes:
  - type: source
    name: src_web
    description: Web login events.
    config:
      name: src_web
      type: csv
      path: ./data/session_logins.csv
      options:
        has_header: true
      watermark:
        column: event_ts
      schema:
        - { name: user_id, type: string }
        - { name: event_ts, type: date_time }
        - { name: source, type: string }

  - type: source
    name: src_mobile
    description: Mobile login events.
    config:
      name: src_mobile
      type: csv
      path: ./data/session_mobile.csv
      options:
        has_header: true
      watermark:
        column: event_ts
      schema:
        - { name: user_id, type: string }
        - { name: event_ts, type: date_time }
        - { name: source, type: string }

  - type: merge
    name: all_logins
    inputs: [src_web, src_mobile]

  - type: aggregate
    name: user_sessions
    input: all_logins
    config:
      group_by: [user_id]
      time_window:
        session: { gap: 5m }
      allowed_lateness: 30s
      cxl: |
        emit user_id = user_id
        emit logins = count(*)

  - type: output
    name: results
    input: user_sessions
    config:
      name: results
      type: csv
      path: ./output/multi_source_session.csv

error_handling:
  strategy: fail_fast

Run:

cargo run -p clinker -- run examples/pipelines/multi_source_session.yaml

Each source declares its own watermark.column independently. The aggregate’s close decision reads min_across_sources across both sources’ partitions: a session can’t emit until both src_web and src_mobile have advanced past session_end + allowed_lateness. Drop the watermark: block on either source and the planner rejects the pipeline with E156.
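The session logic and the multi-source close rule can be sketched in Python. The sketch rests on two labeled assumptions. First, session_end is the last event's timestamp plus gap, and sessions are half-open. Second, a watermark that has reached session_end + allowed_lateness counts as having advanced past it. Because the sketch sorts all events up front, it models the result, not Clinker's incremental execution.

```python
from datetime import datetime, timedelta, timezone

def sessionize(events, gap):
    """events: (user_id, event_ts) pairs from any number of sources.
    Returns {user_id: [(session_start, session_end, logins), ...]} where
    session_end = last event + gap (assumed); a later event extends the session
    only if it falls before the current session_end."""
    sessions = {}
    for user, ts in sorted(events):       # sorts by user, then event time
        user_sessions = sessions.setdefault(user, [])
        if user_sessions and ts < user_sessions[-1][1]:
            start, _, n = user_sessions[-1]
            user_sessions[-1] = (start, ts + gap, n + 1)
        else:
            user_sessions.append((ts, ts + gap, 1))
    return sessions

def can_close(session_end, source_watermarks, allowed_lateness):
    """min_across_sources: the slowest source decides whether a session may emit."""
    return min(source_watermarks.values()) >= session_end + allowed_lateness

def t(h, m):
    return datetime(2024, 1, 1, h, m, tzinfo=timezone.utc)

web = [("u1", t(9, 0)), ("u1", t(9, 12))]
mobile = [("u1", t(9, 3)), ("u2", t(9, 1))]
gap, lateness = timedelta(minutes=5), timedelta(seconds=30)
sessions = sessionize(web + mobile, gap)
print(sessions["u1"])   # two sessions: 09:00-09:08 (2 logins), 09:12-09:17 (1 login)
# Web has moved on, but mobile has not passed 09:08 + 30s, so the first session waits.
print(can_close(sessions["u1"][0][1], {"src_web": t(9, 20), "src_mobile": t(9, 8)}, lateness))
```

The mobile login at 09:03 bridges the two web logins only partially. Without it, the 09:00 web login would form a one-login session of its own. This is why both sources must feed the same aggregate.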

When to pick each

| Kind | Bucket shape | Typical use |
|---|---|---|
| tumbling | Disjoint, clock-aligned, fixed width | Hourly metrics, daily rollups, billing periods. |
| hopping | Overlapping, clock-aligned, fixed width | Moving averages, sliding sums, anomaly detection where each record should affect multiple reports. |
| session | Variable width, gap-bounded, per-key | User sessions, telemetry burst grouping, activity envelopes where the window length is data-driven. |