Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Transform Nodes

Transform nodes apply CXL expressions to each record, producing new fields, filtering records, or both. They process one record at a time in streaming fashion with constant memory overhead.

Basic structure

- type: transform
  name: enrich
  input: customers
  config:
    cxl: |
      emit full_name = first_name + " " + last_name
      emit tier = if lifetime_value >= 10000 then "gold" else "standard"
      filter status == "active"

The cxl: field is required and contains a CXL program. The three core CXL statements for transforms are:

  • emit – produces an output field. Only emitted fields appear in downstream nodes.
  • filter – drops records that do not match the boolean condition.
  • let – binds a local variable for use in subsequent expressions (not emitted).
    cxl: |
      let margin = revenue - cost
      emit product_id = product_id
      emit margin = margin
      emit margin_pct = if revenue > 0 then margin / revenue * 100 else 0
      filter margin > 0

Analytic window

The analytic_window field enables cross-source lookups by joining a secondary dataset into the transform. The secondary source is loaded into memory and indexed by the join key.

- type: transform
  name: enrich_orders
  input: orders
  config:
    analytic_window:
      source: products
      on: product_id
      group_by: [product_id]
    cxl: |
      emit order_id = order_id
      emit product_name = $window.first()
      emit quantity = quantity
      emit line_total = quantity * price

The $window.* namespace provides access to the windowed data. Functions like $window.first(), $window.last(), and $window.count() operate over the matched group.

Validations

Declarative validation checks can be attached to a transform. They run against each record and either route failures to the DLQ (severity error) or log a warning and continue (severity warn).

- type: transform
  name: validate_orders
  input: raw_orders
  config:
    cxl: |
      emit order_id = order_id
      emit amount = amount
      emit email = email
    validations:
      - field: email
        check: "not_empty"
        severity: error
        message: "Email is required"
      - check: "amount > 0"
        severity: warn
        message: "Non-positive amount"
      - field: order_id
        check: "not_empty"
        severity: error

Validation fields

FieldRequiredDescription
fieldNoRestrict the check to a single field
checkYesValidation name (e.g. "not_empty") or CXL boolean expression
severityNoerror (default) routes to DLQ; warn logs and continues
messageNoCustom error message for DLQ entries
nameNoValidation name for DLQ reporting. Auto-derived from field + check if omitted
argsNoAdditional arguments as key-value pairs

Expansion cap (max_expansion)

When a transform body contains an emit each statement, every input record can fan out into multiple output records. The max_expansion field caps how many output records a single input record may produce – a safety bound against unexpectedly large arrays.

- type: transform
  name: explode_items
  input: orders
  config:
    max_expansion: 5000      # default: 10000
    cxl: |
      emit each it in items {
        emit order_id = order_id
        emit sku = it["sku"]
        emit price = it["price"]
      }
FieldTypeDefaultDescription
max_expansionu6410000Maximum cumulative output records per input record.

If a single input record’s emit each block produces more than max_expansion output records, the originating record routes to the DLQ with category expansion_limit_exceeded instead of producing a truncated or unbounded result. No partial output is emitted for that record – the cap is enforced eagerly so the writer never sees records from a runaway expansion.

When to tune

  • Lower (e.g. 100, 1000) when input arrays are bounded by a known business rule and you want hostile or malformed input to surface as a DLQ entry rather than as a flood of downstream records.
  • Higher (e.g. 100000, 1000000) when legitimate input carries large arrays – for example, an order with a long line-item list or an event carrying a per-second pricing curve.

The DLQ category expansion_limit_exceeded is distinct from generic CXL evaluation failures, so DLQ-side filters and metrics can target expansion runaway specifically. See Error Handling & DLQ for the wider DLQ contract.

Batch size (batch_size)

A streaming-eligible transform hands its output downstream in bounded batches rather than accumulating the whole stage before the next stage runs. batch_size sets how many events (records plus document-boundary punctuations) a batch holds. A per-transform batch_size overrides the pipeline-level pipeline.batch_size for this one stage; omit it to inherit the pipeline value (or the built-in default of 2048).

- type: transform
  name: enrich
  input: orders
  config:
    batch_size: 512         # override pipeline.batch_size for this stage
    cxl: |
      emit order_id = order_id
      emit total = quantity * unit_price
FieldTypeDefaultDescription
batch_sizeusizeinherits pipeline.batch_size (else 2048)Events per streaming batch for this transform. Must be >= 1.

A batch_size of 0 is rejected at config load (a zero-event batch never flushes). Smaller batches lower the in-flight memory of a streaming stage at the cost of more per-batch bookkeeping; larger batches amortize the bookkeeping at the cost of a larger live working set. The default suits typical record widths — tune it only when a profiling run shows a streaming stage’s per-batch footprint matters. See Streaming vs. Blocking Stages for which stages stream and which fully materialize.

Log directives

Log directives control diagnostic output during transform execution:

- type: transform
  name: process
  input: validated
  config:
    cxl: |
      emit id = id
      emit result = compute(value)
    log:
      - level: info
        when: per_record
        every: 1000
        message: "Processed record"
      - level: warn
        when: on_error
        message: "Record failed processing"
      - level: debug
        when: before_transform
        message: "Starting transform"

Log directive fields

FieldRequiredDescription
levelYestrace, debug, info, warn, or error
whenYesbefore_transform, after_transform, per_record, or on_error
messageYesLog message text
everyNoOnly log every N records (for per_record timing)
conditionNoCXL boolean expression – only log when true
fieldsNoList of field names to include in the log output
log_ruleNoReference to an external log rule definition

Complete example

- type: source
  name: employees
  config:
    name: employees
    type: csv
    path: "./data/employees.csv"
    schema:
      - { name: employee_id, type: string }
      - { name: first_name, type: string }
      - { name: last_name, type: string }
      - { name: department, type: string }
      - { name: salary, type: int }
      - { name: hire_date, type: date }

- type: transform
  name: enrich_employees
  description: "Compute display name and tenure"
  input: employees
  config:
    cxl: |
      emit employee_id = employee_id
      emit display_name = last_name + ", " + first_name
      emit department = department.upper()
      emit salary = salary
      emit annual_bonus = if salary >= 80000 then salary * 0.15
        else salary * 0.10
    validations:
      - field: employee_id
        check: "not_empty"
        severity: error
        message: "Employee ID is required"
      - check: "salary > 0"
        severity: warn
        message: "Salary should be positive"
    log:
      - level: info
        when: per_record
        every: 5000
        message: "Processing employees"