Output Nodes

Output nodes write processed records to files. They are the terminal nodes of a pipeline: every pipeline path must end at an output, otherwise the records on that path are silently dropped.

Basic structure

- type: output
  name: result
  input: transform_node
  config:
    name: output_stage
    type: csv
    path: "./output/result.csv"

The type: field selects the output format: csv, json, xml, fixed_width, edifact, or x12. The edifact and x12 writers reconstruct their EDI interchange envelopes around emitted records; see EDIFACT Format and X12 Format.

Field control

Output nodes can either pass every upstream field through to the writer or restrict output to the fields the upstream transform explicitly emitted. Several options control which fields appear and how they are named.

Unmapped input field passthrough

    include_unmapped: false    # Default: true

When true (the default), every field on an input record that the upstream transform did not explicitly emit still passes through to the output unchanged. This includes fields the source’s on_unmapped: auto_widen policy absorbed into the per-record $widened sidecar map – their contents expand back to top-level columns at the sink.

When false, only fields named by an emit statement in the upstream transform appear in the output. The $widened sidecar slot is stripped and undeclared input fields are dropped.

Migration notice

The default flipped from false to true in a recent release (see issue #90). Pipelines that relied on the previous behavior – where output records contained only the fields explicitly emitted upstream – must now set include_unmapped: false explicitly to restore that shape.

The flag composes independently with include_correlation_keys: true – see below. See Auto-Widen & Schema Drift -> Output controls for the full specification, cross-format flow examples, and the writer-rejection contract for Value::Map payloads.

Worked example

Suppose the upstream source emits records with order_id, customer_id, amount, and region, and a transform emits only one derived field:

- type: transform
  name: classify
  input: orders
  config:
    cxl: |
      emit amount_bucket = if amount >= 1000 then "high" else "low"

With include_unmapped: true (the default), each output record carries order_id, customer_id, amount, region, and amount_bucket. With include_unmapped: false, each output record carries only amount_bucket. The transform’s CXL is unchanged in both cases – the Output node decides the field set.
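
As a minimal sketch, an Output node that restricts the worked example to the emitted field could look like the following. The node name and path are illustrative placeholders, and placing include_unmapped directly under config is assumed from the other examples on this page.

```yaml
# Hypothetical Output node wired to the classify transform above.
- type: output
  name: buckets_only              # illustrative name
  input: classify
  config:
    name: buckets_only
    type: csv
    path: "./output/buckets.csv"  # illustrative path
    include_unmapped: false       # emit-only: each record carries just amount_bucket
    # Omit the flag (or set it to true) to also carry
    # order_id, customer_id, amount, and region.
```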

Include correlation-key shadow columns

    include_correlation_keys: true    # Default: false

When the pipeline declares error_handling.correlation_key: <field>, the engine adds shadow columns named $ck.<field> to the schema. These shadow columns preserve correlation-group identity through transforms that may rewrite the user-declared field. They are an internal engine namespace and are stripped from output by default.

Set include_correlation_keys: true to surface the shadow columns in the writer output – typically for debugging correlation-group routing or auditing DLQ behavior. See Correlation Keys for the full lifecycle.

include_correlation_keys does not surface the $widened sidecar – include_unmapped is the separate flag for that. The two are independent: each, both, or neither can be set.
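
The sketch below sets both flags on one Output node to show that they are independent. It assumes the pipeline declares error_handling.correlation_key: order_id elsewhere; order_id, the node name, and the path are illustrative.

```yaml
# Assumes error_handling.correlation_key: order_id is declared elsewhere
# in the pipeline; order_id is an illustrative field name.
- type: output
  name: audit_out                   # illustrative name
  input: transform_node
  config:
    name: audit_out
    type: json
    path: "./output/audit.json"     # illustrative path
    include_unmapped: false         # strip $widened and undeclared input fields
    include_correlation_keys: true  # surface the $ck.order_id shadow column
```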

Writer rejection of Value::Map payloads

CSV, XML, fixed-width, EDIFACT, and X12 writers refuse records carrying a Value::Map payload at any column slot, raising FormatError::UnserializableMapValue { format, column }. JSON serializes Value::Map natively as a nested object.

The typical cause is a $widened sidecar reaching a non-JSON writer because the Output node set include_unmapped: true. See Auto-Widen & Schema Drift -> Writer rejection for the rejection contract and remediation routes.
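
Since JSON is the one writer described above as serializing Value::Map natively, one possible way around the rejection is to route such records to a JSON output. This is a sketch based only on that statement; whether it matches the remediation routes in Auto-Widen & Schema Drift is not confirmed here, and the node name and path are placeholders.

```yaml
- type: output
  name: widened_json               # illustrative name
  input: transform_node
  config:
    name: widened_json
    type: json                     # JSON writes Value::Map as a nested object
    path: "./output/widened.json"  # illustrative path
    options:
      format: ndjson               # one JSON object per line
```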

Field mapping

Rename fields at output time without changing upstream CXL:

    mapping:
      "Customer Name": "full_name"
      "Order Total": "amount"

Keys are output column names; values are the source field names from upstream.

Excluding fields

Remove specific fields from output:

    exclude: [internal_id, _debug_flag, temp_calc]

Header control (CSV)

    include_header: true      # Default: true

Set to false to omit the CSV header row.

Null handling

    preserve_nulls: false     # Default: false

When false, null values are written as empty strings. When true, nulls are preserved in the output format’s native null representation (e.g., null in JSON).
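
For illustration, a JSON Output node that keeps nulls might be configured as follows. The node name is a placeholder, and placing preserve_nulls directly under config is assumed from the other examples on this page.

```yaml
- type: output
  name: json_nulls                # illustrative name
  input: processed
  config:
    name: json_nulls
    type: json
    path: "./output/result.json"
    preserve_nulls: true          # a null field is written as JSON null
    # With preserve_nulls: false (the default), the same field
    # is written as an empty string instead.
```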

Metadata inclusion

Control whether per-record $meta.* metadata fields appear in output:

    include_metadata: all       # Include all metadata fields
    include_metadata: none      # Default -- strip all metadata
    include_metadata:
      - source_file             # Include only listed metadata keys
      - source_row

Metadata fields are prefixed with meta. in the output.
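
A sketch of the list form inside a full Output node, with the resulting output names derived from the prefix rule above. The node name and path are illustrative.

```yaml
- type: output
  name: traced_out                # illustrative name
  input: processed
  config:
    name: traced_out
    type: csv
    path: "./output/traced.csv"   # illustrative path
    include_metadata:
      - source_file               # appears in output as meta.source_file
      - source_row                # appears in output as meta.source_row
```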

Output format options

CSV

- type: output
  name: csv_out
  input: processed
  config:
    name: csv_out
    type: csv
    path: "./output/result.csv"
    options:
      delimiter: "|"

JSON

- type: output
  name: json_out
  input: processed
  config:
    name: json_out
    type: json
    path: "./output/result.json"
    options:
      format: ndjson           # array | ndjson
      pretty: true             # Pretty-print JSON

The format option takes one of:

  • array (default) – writes a single JSON array containing all records.
  • ndjson – writes one JSON object per line.

XML

- type: output
  name: xml_out
  input: processed
  config:
    name: xml_out
    type: xml
    path: "./output/result.xml"
    options:
      root_element: "data"
      record_element: "row"

Fixed-width

- type: output
  name: fw_out
  input: processed
  config:
    name: fw_out
    type: fixed_width
    path: "./output/result.dat"
    schema: "./schemas/output.schema.yaml"
    options:
      line_separator: crlf

Fixed-width output requires a format schema defining field positions and widths.

EDIFACT

- type: output
  name: edi_out
  input: messages
  config:
    name: edi_out
    type: edifact
    path: "./out/result.edi"
    options:
      interchange: ["UNOA:1", "SENDER", "RECEIVER", "240101:1200", "REF1"]
      message_type: "ORDERS:D:96A:UN"
      write_una: false
      segment_newline: true

The EDIFACT writer reconstructs the interchange envelope around emitted records, recomputing the UNT/UNZ control counts and echoing the control references, and release-escapes any element data that carries a service character. The UNB header comes from interchange (literal elements) or interchange_from_doc (echoed from a $doc section). An interchange is a single envelope, so an edifact output cannot be combined with a split: block — the combination is rejected at config-validation time (E323). See EDIFACT Format for the full option reference, the record schema, and the round-trip semantics.
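
To make the split: restriction concrete, the following sketch shows a configuration that would be rejected with E323 according to the paragraph above. The node name and the max_records value are illustrative.

```yaml
# Rejected at config-validation time (E323): an interchange is a single envelope.
- type: output
  name: edi_split                 # illustrative name
  input: messages
  config:
    name: edi_split
    type: edifact
    path: "./out/result.edi"
    options:
      interchange: ["UNOA:1", "SENDER", "RECEIVER", "240101:1200", "REF1"]
      message_type: "ORDERS:D:96A:UN"
    split:                        # not allowed together with type: edifact
      max_records: 1000
```

Removing the split: block makes the configuration equivalent to the accepted example above.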

Sort order

Sort records before writing:

    sort_order:
      - { field: "name", order: asc }
      - { field: "amount", order: desc, null_order: last }

| Sort option | Values | Default |
|---|---|---|
| order | asc, desc | asc |
| null_order | first, last, drop | last |

  • first – nulls sort before all non-null values.
  • last – nulls sort after all non-null values.
  • drop – records with null sort keys are excluded from output.
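
The effect of these options can be traced on a handful of made-up records. The sketch below is a reading of the rules above, not engine output; the records are illustrative, and it assumes null_order: last places nulls after non-null values regardless of sort direction.

```yaml
sort_order:
  - { field: "name", order: asc }
  - { field: "amount", order: desc, null_order: last }

# Illustrative records (name, amount), in arrival order:
#   (b, 5)  (a, null)  (a, 10)  (a, 3)
#
# Written order with the sort above:
#   (a, 10)  (a, 3)  (a, null)  (b, 5)
#
# With null_order: drop on the amount key, (a, null) is excluded:
#   (a, 10)  (a, 3)  (b, 5)
```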

Shorthand: a bare string defaults to ascending with nulls last:

    sort_order:
      - "name"
      - { field: "amount", order: desc }

File splitting

Split output into multiple files based on record count, byte size, or group boundaries:

- type: output
  name: split_output
  input: processed
  config:
    name: split_output
    type: csv
    path: "./output/result.csv"
    split:
      max_records: 10000
      max_bytes: 10485760           # 10 MB
      group_key: "department"       # Never split mid-group
      naming: "{stem}_{seq:04}.{ext}"
      repeat_header: true           # Repeat CSV header in each file
      oversize_group: warn          # warn | error | allow

Split configuration fields

| Field | Required | Default | Description |
|---|---|---|---|
| max_records | No | | Soft record count limit per file |
| max_bytes | No | | Soft byte size limit per file |
| group_key | No | | Field name – never split within a group sharing this key value |
| naming | No | "{stem}_{seq:04}.{ext}" | File naming pattern. {stem} is the base name, {seq:04} is a zero-padded sequence number, {ext} is the file extension |
| repeat_header | No | true | Repeat CSV header row in each split file |
| oversize_group | No | warn | What to do when a single key group exceeds file limits |

At least one of max_records or max_bytes should be specified for splitting to have any effect.

Oversize group policies

  • warn (default) – log a warning and allow the oversized file.
  • error – stop the pipeline.
  • allow – silently allow the oversized file.

When group_key is set, the split point is the first group boundary after the threshold is reached (greedy). Without group_key, files are split at the exact limit.
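
The difference between the two split rules can be traced on a small, made-up record stream. This is a sketch of how the greedy rule reads, with an artificially low limit; the records are illustrative.

```yaml
split:
  max_records: 3
  group_key: "department"

# Illustrative records, by department, in write order:
#   A  A  B  B  C
#
# With group_key: the threshold is reached at the third record (the first B),
# and the split waits for the next group boundary, after the second B:
#   first file:  A  A  B  B      (4 records, one over the soft limit)
#   second file: C
#
# Without group_key: the split happens at the exact limit:
#   first file:  A  A  B
#   second file: B  C
```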

Streaming writes under fused Merge.interleave

When a single Output sits directly downstream of a Merge whose mode is interleave and whose every direct predecessor is a Source, the executor takes a streaming path: a bounded tokio::sync::mpsc::channel connects the Merge arm to the writer task, and Writer::write_record fires per record as Merge emits, concurrent with Merge production.

The buffered alternative, which still runs for every other Output topology, waits until the Merge arm has accumulated every record before invoking the writer. With a slow upstream Source, that defeats the live back-pressure the Merge.interleave fusion provides at the Source-channel layer: each record sits in node_buffers[merge] until the slow Source finishes.

Topology

- type: source
  name: src_a
  config: { type: csv, path: a.csv, schema: ... }
- type: source
  name: src_b
  config: { type: csv, path: b.csv, schema: ... }
- type: merge
  name: merged
  inputs: [src_a, src_b]
  config:
    mode: interleave        # required
- type: output
  name: out
  input: merged
  config:
    name: out
    type: csv
    path: out.csv

The streaming path is selected automatically — there is no opt-in setting. Pipelines that don’t match the topology keep the buffered path.

Eligibility

Every condition must hold for the streaming path to engage; if any fails, the buffered path runs:

  • The Output has exactly one incoming edge, and that predecessor is a Merge with mode: interleave.
  • Every direct predecessor of that Merge is a Source (same predicate the fused Merge.interleave arm uses for its live tokio::select!).
  • The Merge has no other downstream consumer besides this one Output (no fan-out).
  • The Output is not in the init-phase ancestor closure.
  • The OutputConfig has no split: block — splitting writers manage their own file rotation lifecycle.
  • The writer is registered in the single-file writer registry (not fan_out_per_source_file).
  • No Source in the pipeline declares a correlation key — the correlation-buffered output path defers writes to CorrelationCommit and is incompatible with per-record write.
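
As a counter-example, the sketch below keeps the sources and merge from the Topology example but adds a split: block to the Output. Per the conditions above, that alone should be enough to fall back to the buffered path; the max_records value is illustrative.

```yaml
# Same sources and merge as in the Topology example; only the Output differs.
- type: output
  name: out
  input: merged
  config:
    name: out
    type: csv
    path: out.csv
    split:                  # presence of split: makes the Output ineligible,
      max_records: 10000    # so the buffered path runs
```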

Back-pressure flow

Under the streaming path, back-pressure flows end-to-end:

writer slow → mpsc::Sender::send().await yields
             → Merge arm yields
             → Source mpsc::Receiver fills
             → Source ingest task blocks on send

The bounded handoff channel between Merge and Output (256 slots) and the existing per-Source ingest channels (issue #67) form a single pace-bound chain from the underlying Write sink back to the source reader. A slow file system, a saturated network sink, or a deliberately-paced writer no longer accumulates records in pipeline-internal Vecs; the upstream readers slow down to match.

Counter semantics

Counter behavior under the streaming path matches the buffered Output arm exactly: records_written increments once per Writer::write_record call, ok_count counts distinct source row_nums reaching the Output, and dlq_count is unaffected (DLQ entries originate upstream). Stage metrics (SchemaScan, Write, Projection) accumulate into the same fields the buffered path uses; the dispatcher folds the streaming task’s per-task accounting back into the run-wide totals at end of DAG.

Complete example

- type: output
  name: department_reports
  input: enriched_employees
  config:
    name: department_reports
    type: csv
    path: "./output/employees.csv"
    mapping:
      "Employee ID": "employee_id"
      "Full Name": "display_name"
      "Department": "department"
      "Annual Salary": "salary"
    exclude: [internal_flags]
    include_header: true
    sort_order:
      - { field: "department", order: asc }
      - { field: "display_name", order: asc }
    split:
      max_records: 5000
      group_key: "department"
      naming: "employees_{seq:03}.csv"
      repeat_header: true