etl

package module
v0.0.0-...-25492d0
Published: Feb 7, 2026 License: MIT Imports: 9 Imported by: 0

README

etl


A flexible, high-performance Extract-Transform-Load pipeline framework for Go with interface-based configuration and checkpoint support.

Features

  • Interface-Based Design — Implement only the interfaces you need; the pipeline auto-detects capabilities
  • Concurrent Processing — Configurable worker pools for transform and load stages
  • Checkpointing — Resume interrupted pipelines from the last successful checkpoint
  • Flexible Batching — Multiple batching strategies including size, weight, grouping, and striping
  • Graceful Shutdown — Configurable drain timeout for clean shutdowns
  • Progress Reporting — Built-in progress callbacks with statistics
  • Error Handling — Per-stage error handling with skip or fail actions
  • Iterator-Based Extraction — Uses Go 1.23+ iter.Seq2 for memory-efficient streaming
  • Minimal Dependencies — Only golang.org/x/sync (plus testify for tests)

Installation

go get github.com/bjaus/etl

Requires Go 1.25 or later.

Quick Start

package main

import (
    "context"
    "fmt"
    "iter"

    "github.com/bjaus/etl"
)

type Source struct {
    ID   int
    Name string
}

type Target struct {
    ID   int
    Name string
}

type MyJob struct {
    data []Source
}

func (j *MyJob) Extract(_ context.Context, _ *int) iter.Seq2[Source, error] {
    return func(yield func(Source, error) bool) {
        for _, row := range j.data {
            if !yield(row, nil) {
                return
            }
        }
    }
}

func (j *MyJob) Transform(_ context.Context, src Source) (Target, error) {
    return Target{ID: src.ID, Name: src.Name + "!"}, nil
}

func (j *MyJob) Load(_ context.Context, batch []Target) error {
    for _, t := range batch {
        fmt.Printf("Loaded: %d %s\n", t.ID, t.Name)
    }
    return nil
}

func main() {
    job := &MyJob{
        data: []Source{
            {ID: 1, Name: "Alice"},
            {ID: 2, Name: "Bob"},
        },
    }

    err := etl.New[Source, Target, int](job).Run(context.Background())
    if err != nil {
        panic(err)
    }
}

Core Interfaces

Required: Job Interface

Your job must implement three methods:

type Job[S, T any, C comparable] interface {
    Extract(ctx context.Context, cursor *C) iter.Seq2[S, error]
    Transform(ctx context.Context, src S) (T, error)
    Load(ctx context.Context, batch []T) error
}
Optional Interfaces

Implement these for additional capabilities:

  • Filter[S] — Include(S) bool — Pre-transform filtering
  • Expander[S, T] — Expand(ctx, S) ([]T, error) — One-to-many transformation
  • Batcher[T] — Batch([]T) [][]T — Custom batching logic
  • ErrorHandler — OnError(ctx, Stage, error) Action — Per-stage error handling
  • ProgressReporter — ReportInterval() int, OnProgress(ctx, *Stats) — Progress callbacks
  • Checkpointer[S, C] — CheckpointInterval() int, Cursor(S) C, LoadCheckpoint(ctx), SaveCheckpoint(ctx, C, *Stats), ClearCheckpoint(ctx) — Resumable pipelines
  • Starter — Start(ctx) context.Context — Pre-run setup
  • Stopper — Stop(ctx, *Stats, error) — Post-run cleanup

Configuration

Configure via method chaining or implement corresponding interfaces:

err := etl.New[Source, Target, int](job).
    WithTransformWorkers(4).      // Concurrent transform workers
    WithLoadWorkers(2).           // Concurrent load workers
    WithLoadBatchSize(100).       // Records per batch
    WithReportInterval(1000).     // Progress report every N records
    WithDrainTimeout(time.Minute).// Graceful shutdown timeout
    Run(ctx)

Or implement interfaces in your job:

func (j *MyJob) TransformWorkers() int       { return 4 }
func (j *MyJob) LoadWorkers() int            { return 2 }
func (j *MyJob) LoadBatchSize() int          { return 100 }
func (j *MyJob) DrainTimeout() time.Duration { return time.Minute }

Configuration priority: WithXxx() methods > interface implementations > defaults.

Batching Strategies

Size-Based (Default)
batcher := etl.SizeBatcher[Target](100)
// [[0-99], [100-199], [200-249]]
Weight-Based

Batch by cumulative weight (e.g., SQL parameter limits):

// Each row uses 5 params; Postgres allows 65535 per statement
batcher := etl.WeightedBatcher(func(t Target) int { return 5 }, 65535)
Group By Field

Keep related records together:

batcher := etl.GroupByField(func(t Target) string { return t.Category })
// All items with same category in one batch
Group By Field with Size Limit
batcher := etl.GroupByFieldWithSizeLimit(
    func(t Target) string { return t.Category },
    50, // Max 50 items per batch
)
Stripe Batcher

Hash-stripe for parallel load workers without lock contention:

batcher := etl.StripeBatcher(
    func(t Target) string { return t.UserID },
    4,   // Number of stripes (match load workers)
    500, // Max batch size
)
Combine Batchers

Chain multiple strategies:

batcher := etl.CombineBatchers(
    etl.GroupByField(func(t Target) string { return t.Category }),
    etl.WeightedBatcher(func(t Target) int { return t.Size }, 10*1024),
)
No Batching

Send everything in one batch:

batcher := etl.NoBatcher[Target]()

Checkpointing

For resumable pipelines, implement Checkpointer:

func (j *MyJob) CheckpointInterval() int { return 5000 }

func (j *MyJob) Cursor(src Source) int64 { return src.ID }

func (j *MyJob) LoadCheckpoint(ctx context.Context) (*int64, *etl.Stats, error) {
    // Load from database/file
    cursor, stats, err := j.db.LoadCheckpoint(ctx, j.jobID)
    return cursor, stats, err
}

func (j *MyJob) SaveCheckpoint(ctx context.Context, cursor int64, stats *etl.Stats) error {
    return j.db.SaveCheckpoint(ctx, j.jobID, cursor, stats)
}

func (j *MyJob) ClearCheckpoint(ctx context.Context) error {
    return j.db.ClearCheckpoint(ctx, j.jobID)
}

When checkpointing is enabled:

  • Records are processed in epochs of CheckpointInterval() records
  • Checkpoint is saved after each successful epoch
  • On restart, pipeline resumes from the last saved cursor

Error Handling

Without ErrorHandler, the pipeline stops on the first error. With it:

func (j *MyJob) OnError(ctx context.Context, stage etl.Stage, err error) etl.Action {
    switch stage {
    case etl.StageExtract:
        slog.Warn("skipping bad record", "error", err)
        return etl.ActionSkip
    case etl.StageTransform:
        slog.Error("transform failed", "error", err)
        return etl.ActionSkip
    case etl.StageLoad:
        return etl.ActionFail // Stop on load errors
    }
    return etl.ActionFail
}

Progress Reporting

func (j *MyJob) ReportInterval() int { return 10000 }

func (j *MyJob) OnProgress(ctx context.Context, stats *etl.Stats) {
    slog.Info("progress",
        "extracted", stats.Extracted(),
        "transformed", stats.Transformed(),
        "loaded", stats.Loaded(),
        "errors", stats.Errors(),
    )
}

Lifecycle Hooks

func (j *MyJob) Start(ctx context.Context) context.Context {
    j.startTime = time.Now()
    slog.Info("pipeline starting")
    return ctx
}

func (j *MyJob) Stop(ctx context.Context, stats *etl.Stats, err error) {
    elapsed := time.Since(j.startTime)
    rate := float64(stats.Loaded()) / elapsed.Seconds()
    if err != nil {
        slog.Error("pipeline failed", "error", err, "loaded", stats.Loaded())
    } else {
        slog.Info("pipeline complete", "loaded", stats.Loaded(), "rate", rate)
    }
}

Graceful Shutdown

Handle SIGINT/SIGTERM for graceful shutdown:

ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer cancel()

err := etl.New[Source, Target, int64](job).
    WithDrainTimeout(30 * time.Second).
    Run(ctx)

Execution Modes

Streaming Mode (Default)

Records flow through the pipeline with concurrent stages. This mode is used when Checkpointer is not implemented.

Epoch Mode

When Checkpointer is implemented, records are processed in epochs:

  1. Extract up to CheckpointInterval() records
  2. Transform and load with concurrency
  3. Save checkpoint
  4. Repeat

Best Practices

  1. Idempotent Loads — Use UPSERT for safe restarts and checkpoint boundaries
  2. Generous Checkpoint Intervals — An interval of 1000-10000 records balances recovery granularity against checkpoint overhead
  3. Match Load Workers to Connection Pool — Avoid connection exhaustion
  4. Use Stripe Batching for Parallel Loads — Prevents lock contention on the same rows

Testing

Comprehensive test coverage including concurrent scenarios:

go test -v ./...

License

MIT License - see LICENSE for details.

Documentation

Overview

Package etl provides a flexible Extract-Transform-Load pipeline framework.

The ETL package uses an interface-based API where your job type implements only the interfaces it needs. The pipeline auto-detects implemented interfaces and configures itself accordingly. Runtime configuration overrides are also available via method chaining.

Quick Start

Implement the required Job interface:

type MyJob struct {
    db *sql.DB
}

func (j *MyJob) Extract(ctx context.Context, cursor *int64) iter.Seq2[Source, error] {
    return func(yield func(Source, error) bool) {
        var after int64
        if cursor != nil {
            after = *cursor
        }
        // ORDER BY keeps the cursor monotonic, which checkpointing requires.
        rows, err := j.db.QueryContext(ctx, "SELECT id, name FROM source WHERE id > $1 ORDER BY id", after)
        if err != nil {
            yield(Source{}, err)
            return
        }
        defer rows.Close()
        for rows.Next() {
            var r Source
            if err := rows.Scan(&r.ID, &r.Name); err != nil {
                if !yield(Source{}, err) {
                    return
                }
                continue
            }
            if !yield(r, nil) {
                return
            }
        }
        if err := rows.Err(); err != nil {
            yield(Source{}, err)
        }
    }
}

func (j *MyJob) Transform(ctx context.Context, src Source) (Target, error) {
    return Target{ID: src.ID, Name: strings.ToUpper(src.Name)}, nil
}

func (j *MyJob) Load(ctx context.Context, batch []Target) error {
    // UPSERT batch to destination
    return j.db.BulkUpsert(ctx, batch)
}

// Run the pipeline
err := etl.New[Source, Target, int64](&MyJob{db: db}).Run(ctx)

Interface-Based Design

The pipeline auto-detects optional interfaces. Just implement what you need:

// Add filtering by implementing Filter[S]
func (j *MyJob) Include(src Source) bool {
    return src.Active
}

// Add error handling by implementing ErrorHandler
func (j *MyJob) OnError(ctx context.Context, stage etl.Stage, err error) etl.Action {
    slog.Error("error in pipeline", "stage", stage, "error", err)
    return etl.ActionSkip // or etl.ActionFail to stop
}

// Add progress tracking by implementing ProgressReporter
func (j *MyJob) ReportInterval() int { return 10000 }
func (j *MyJob) OnProgress(ctx context.Context, stats *etl.Stats) {
    slog.Info("progress", "loaded", stats.Loaded())
}

Checkpointing for Resumability

Implement Checkpointer[S, C] for checkpoint-based resumability:

func (j *MyJob) CheckpointInterval() int { return 5000 }
func (j *MyJob) Cursor(src Source) int64 { return src.ID }

func (j *MyJob) LoadCheckpoint(ctx context.Context) (*int64, *etl.Stats, error) {
    var c sql.NullInt64
    var extracted, filtered, transformed, loaded, errCount int64
    err := j.db.QueryRowContext(ctx,
        `SELECT cursor, extracted, filtered, transformed, loaded, errors
         FROM checkpoints WHERE job_id = $1`, j.jobID,
    ).Scan(&c, &extracted, &filtered, &transformed, &loaded, &errCount)
    if errors.Is(err, sql.ErrNoRows) {
        return nil, nil, nil // Fresh start
    }
    if err != nil {
        return nil, nil, err
    }
    if !c.Valid {
        return nil, nil, nil // Fresh start
    }
    return &c.Int64, etl.NewStats(extracted, filtered, transformed, loaded, errCount), nil
}

func (j *MyJob) SaveCheckpoint(ctx context.Context, cursor int64, stats *etl.Stats) error {
    _, err := j.db.ExecContext(ctx,
        `INSERT INTO checkpoints (job_id, cursor, extracted, filtered, transformed, loaded, errors, updated_at)
         VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
         ON CONFLICT (job_id) DO UPDATE SET
             cursor = $2, extracted = $3, filtered = $4, transformed = $5, loaded = $6, errors = $7, updated_at = NOW()`,
        j.jobID, cursor, stats.Extracted(), stats.Filtered(), stats.Transformed(), stats.Loaded(), stats.Errors(),
    )
    return err
}

func (j *MyJob) ClearCheckpoint(ctx context.Context) error {
    _, err := j.db.ExecContext(ctx,
        "DELETE FROM checkpoints WHERE job_id = $1", j.jobID,
    )
    return err
}

When checkpointing is enabled, the pipeline:

  • Processes records in epochs of CheckpointInterval() records
  • Waits for all transforms and loads in an epoch to complete
  • Saves the checkpoint cursor after each successful epoch
  • On restart, resumes from the last saved cursor
  • Clears the checkpoint after successful completion

Execution Modes

The pipeline operates in one of two modes:

Streaming Mode (default): When Checkpointer is not implemented, records flow through the pipeline in a streaming fashion with concurrent stages.

Epoch Mode: When Checkpointer is implemented, records are processed in epochs. Each epoch collects CheckpointInterval() records, processes them with concurrency, then saves a checkpoint before starting the next epoch.

Configuration

Every configuration knob follows the same pattern: a WithXxx builder method and a matching Xxx interface with an Xxx() method. The builder always takes priority.

Configure the pipeline with method chaining:

err := etl.New[Source, Target, int64](&MyJob{}).
    WithTransformWorkers(4).       // Concurrent transform workers
    WithLoadWorkers(2).            // Concurrent load workers
    WithLoadBatchSize(100).        // Targets per load batch
    WithReportInterval(5000).      // Progress interval override
    WithDrainTimeout(time.Minute). // Graceful shutdown timeout
    Run(ctx)

Or implement the corresponding interfaces in your job:

func (j *MyJob) TransformWorkers() int       { return 4 }
func (j *MyJob) LoadWorkers() int            { return 2 }
func (j *MyJob) LoadBatchSize() int          { return 100 }
func (j *MyJob) DrainTimeout() time.Duration { return time.Minute }

Configuration priority (highest to lowest):

  1. WithXxx() method overrides
  2. Interface implementations
  3. Default values

Batching

By default the pipeline batches records by count using LoadBatchSize (default 100). For custom batching logic, implement Batcher[T] on your job:

func (j *MyJob) Batch(items []Target) [][]Target {
    // Group by category
    byCategory := make(map[string][]Target)
    for _, item := range items {
        byCategory[item.Category] = append(byCategory[item.Category], item)
    }
    var batches [][]Target
    for _, batch := range byCategory {
        batches = append(batches, batch)
    }
    return batches
}

The package also provides ready-made batchers that can be used inside a Batch method implementation:

// Fixed-size batches (same as default behavior)
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.SizeBatcher[Target](500).Batch(items)
}

// Group by a field value so related records are loaded together
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.GroupByField(func(t Target) string { return t.Category }).Batch(items)
}

// Group by field, then cap each group at 50
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.GroupByFieldWithSizeLimit(
        func(t Target) string { return t.Category },
        50,
    ).Batch(items)
}

// Batch by cumulative weight (e.g., SQL parameter limits).
// Each row uses 5 INSERT parameters; Postgres allows 65535 per statement.
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.WeightedBatcher(func(t Target) int { return 5 }, 65535).Batch(items)
}

// Hash-stripe for parallel load workers without lock contention.
// Items with the same key always land in the same stripe; batches are interleaved
// across stripes so concurrent workers naturally avoid contending on the same rows.
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.StripeBatcher(
        func(t Target) string { return t.Category },
        4,   // numStripes (match WithLoadWorkers)
        500, // maxBatchSize
    ).Batch(items)
}

// Compose multiple strategies: group by category, then cap at 10 KB per batch
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.CombineBatchers(
        etl.GroupByField(func(t Target) string { return t.Category }),
        etl.WeightedBatcher(func(t Target) int { return t.Size }, 10*1024),
    ).Batch(items)
}

// Send everything in a single batch (disable batching entirely)
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.NoBatcher[Target]().Batch(items)
}

Lifecycle Hooks

Implement Starter and/or Stopper for setup and cleanup:

func (j *MyJob) Start(ctx context.Context) context.Context {
    j.startedAt = time.Now()
    slog.Info("starting pipeline")
    return ctx
}

func (j *MyJob) Stop(ctx context.Context, stats *etl.Stats, err error) {
    elapsed := time.Since(j.startedAt)
    rate := float64(stats.Loaded()) / elapsed.Seconds()
    if err != nil {
        slog.Error("pipeline failed", "error", err, "processed", stats.Loaded())
    } else {
        slog.Info("pipeline complete", "loaded", stats.Loaded(), "elapsed", elapsed, "rate", rate)
    }
}

Transform Patterns

One-to-one transformation (implement Transformer[S, T]):

func (j *MyJob) Transform(ctx context.Context, src Source) (Target, error) {
    return Target{ID: src.ID, Name: src.Name}, nil
}

One-to-many transformation (implement Expander[S, T]):

func (j *MyJob) Expand(ctx context.Context, src Source) ([]Target, error) {
    return src.Items, nil // One source row produces multiple output records
}

Filtering Targets

Use the Filter[S] interface for pre-transform filtering:

func (j *MyJob) Include(src Source) bool {
    return !src.Deleted // Skip deleted records before transform
}

This is more efficient than filtering in Transform since it skips the transformation step entirely.

Error Handling

Without ErrorHandler, the pipeline stops on the first error.

With ErrorHandler, you control error behavior:

func (j *MyJob) OnError(ctx context.Context, stage etl.Stage, err error) etl.Action {
    switch stage {
    case etl.StageExtract:
        // Skip malformed records
        slog.Warn("skipping record", "error", err)
        return etl.ActionSkip
    case etl.StageTransform:
        // Log and skip transform errors
        slog.Error("transform error", "error", err)
        return etl.ActionSkip
    case etl.StageLoad:
        // Fail on load errors (data integrity)
        return etl.ActionFail
    }
    return etl.ActionFail
}

Best Practices

Load operations should be idempotent (use UPSERT) to handle:

  • Pipeline restarts after partial completion
  • Re-processing of records near checkpoint boundaries

Use generous CheckpointInterval() values (1000-10000) to balance:

  • Recovery granularity (smaller = less re-processing on restart)
  • Checkpoint overhead (larger = fewer database writes)
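For intuition, the trade-off can be made concrete with a little arithmetic over an illustrative 1,000,000-record job:

```go
package main

import "fmt"

func main() {
	const total = 1_000_000
	for _, interval := range []int{1000, 5000, 10000} {
		// One checkpoint write per completed epoch.
		writes := (total + interval - 1) / interval
		fmt.Printf("interval=%5d: %4d checkpoint writes, at most %5d records re-processed after a crash\n",
			interval, writes, interval)
	}
}
```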

Match LoadWorkers() to your database connection pool size.

For graceful shutdown on SIGINT/SIGTERM, set up signal handling before calling Run:

ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer cancel()
err := etl.New[Source, Target, int64](&MyJob{}).Run(ctx)


Constants

const (
	DefaultTransformWorkers = 1
	DefaultLoadWorkers      = 1
	DefaultLoadBatchSize    = 100
	DefaultReportInterval   = 10000
	DefaultDrainTimeout     = 5 * time.Minute
)

Default configuration values.

Variables

This section is empty.

Functions

This section is empty.

Types

type Action

type Action string

Action tells the pipeline what to do after an error.

const (
	ActionFail Action = "fail" // Stop pipeline and return error
	ActionSkip Action = "skip" // Skip this record and continue
)

type Batcher

type Batcher[T any] interface {
	// Batch groups items into batches for loading.
	Batch(items []T) [][]T
}

Batcher groups transformed records into batches for loading. Implement this interface on your job when the default size-based batching is insufficient.

The pipeline calls Batch after accumulating records from the transform stage. In streaming mode, Batch is called whenever the pending buffer reaches LoadBatchSize, and once more to flush remaining records. In epoch mode, Batch is called once per epoch with all transformed records from that epoch.

The default batcher (used when Batcher is not implemented) is equivalent to SizeBatcher with the resolved LoadBatchSize.

When to implement a custom Batcher:

  • Records must be grouped by a key (e.g., tenant ID) so each Load call targets a single partition
  • Batch size depends on record weight (e.g., SQL parameter count limits)
  • You need stripe-based batching for parallel load workers

Ready-made batchers are available for common patterns:

Example:

func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.WeightedBatcher(func(t Target) int { return 5 }, 65535).Batch(items)
}

func CombineBatchers

func CombineBatchers[Target any](batchers ...Batcher[Target]) Batcher[Target]

CombineBatchers applies multiple batching strategies in sequence. Each batcher processes the output of the previous batcher.

Example:

// First group by category, then limit each group to 25 items
batcher := etl.CombineBatchers(
	etl.GroupByField(func(t Target) string { return t.Category }),
	etl.SizeBatcher[Target](25),
)
Example
package main

import (
	"fmt"

	"github.com/bjaus/etl"
)

// Target is a target record type for examples.
type Target struct {
	ID       int
	Name     string
	Category string
	Size     int
}

func main() {
	items := []Target{
		{ID: 1, Category: "x"},
		{ID: 2, Category: "x"},
		{ID: 3, Category: "x"},
		{ID: 4, Category: "y"},
		{ID: 5, Category: "y"},
	}

	// First group by category, then split groups into batches of at most 2
	batcher := etl.CombineBatchers(
		etl.GroupByField(func(t Target) string { return t.Category }),
		etl.SizeBatcher[Target](2),
	)
	batches := batcher.Batch(items)

	fmt.Println("batches:", len(batches))
	for _, batch := range batches {
		fmt.Printf("  category=%s count=%d\n", batch[0].Category, len(batch))
	}

}
Output:

batches: 3
  category=x count=2
  category=x count=1
  category=y count=2

func GroupByField

func GroupByField[Target any, K comparable](keyExtractor func(Target) K) Batcher[Target]

GroupByField creates batches by grouping items that have the same value for a given field. All items with the same key will be in the same batch, regardless of batch size.

Example:

// Group all records by category
batcher := etl.GroupByField(func(t Target) string {
	return t.Category
})
Example
package main

import (
	"fmt"

	"github.com/bjaus/etl"
)

// Target is a target record type for examples.
type Target struct {
	ID       int
	Name     string
	Category string
	Size     int
}

func main() {
	items := []Target{
		{ID: 1, Category: "x"},
		{ID: 2, Category: "y"},
		{ID: 3, Category: "x"},
		{ID: 4, Category: "y"},
	}

	batcher := etl.GroupByField(func(t Target) string { return t.Category })
	batches := batcher.Batch(items)

	fmt.Println("batches:", len(batches))
	for _, batch := range batches {
		fmt.Printf("  category=%s count=%d\n", batch[0].Category, len(batch))
	}

}
Output:

batches: 2
  category=x count=2
  category=y count=2

func GroupByFieldWithSizeLimit

func GroupByFieldWithSizeLimit[Target any, K comparable](
	keyExtractor func(Target) K,
	maxGroupSize int,
) Batcher[Target]

GroupByFieldWithSizeLimit combines field grouping with size limits per group. Items are first grouped by field value, then large groups are split into smaller batches.

Example:

// Group by category, but limit each group to 50 records
batcher := etl.GroupByFieldWithSizeLimit(
	func(t Target) string { return t.Category },
	50,
)
Example
package main

import (
	"fmt"

	"github.com/bjaus/etl"
)

// Target is a target record type for examples.
type Target struct {
	ID       int
	Name     string
	Category string
	Size     int
}

func main() {
	items := []Target{
		{ID: 1, Category: "x"},
		{ID: 2, Category: "x"},
		{ID: 3, Category: "x"},
		{ID: 4, Category: "y"},
	}

	// Group by category, but cap each group at 2 items
	batcher := etl.GroupByFieldWithSizeLimit(
		func(t Target) string { return t.Category },
		2,
	)
	batches := batcher.Batch(items)

	fmt.Println("batches:", len(batches))
	for _, batch := range batches {
		fmt.Printf("  category=%s count=%d\n", batch[0].Category, len(batch))
	}

}
Output:

batches: 3
  category=x count=2
  category=x count=1
  category=y count=1

func NoBatcher

func NoBatcher[Target any]() Batcher[Target]

NoBatcher returns items as a single batch (no batching).

Example
package main

import (
	"fmt"

	"github.com/bjaus/etl"
)

func main() {
	batcher := etl.NoBatcher[string]()
	batches := batcher.Batch([]string{"a", "b", "c"})
	fmt.Println(batches)

}
Output:

[[a b c]]

func SizeBatcher

func SizeBatcher[Target any](maxSize int) Batcher[Target]

SizeBatcher creates batches with a maximum number of items per batch.

Example:

// Create batches of up to 100 items each
batcher := etl.SizeBatcher[MyRecord](100)
Example
package main

import (
	"fmt"

	"github.com/bjaus/etl"
)

func main() {
	batcher := etl.SizeBatcher[string](2)
	batches := batcher.Batch([]string{"a", "b", "c", "d", "e"})
	fmt.Println(batches)

}
Output:

[[a b] [c d] [e]]

func StripeBatcher

func StripeBatcher[T any, K comparable](keyFn func(T) K, numStripes int, maxBatchSize int) Batcher[T]

StripeBatcher assigns items to stripes via a hash of their key, then interleaves batches across stripes in round-robin order. This is designed for parallel load workers: items with the same key always land in the same stripe, and the interleaving ensures concurrent workers naturally process different stripes, avoiding lock contention on shared resources (e.g., database rows).

Within each stripe, items are grouped by key for locality, then chunked by maxBatchSize. The output order is: stripe 0 batch 0, stripe 1 batch 0, ..., stripe 0 batch 1, stripe 1 batch 1, etc.
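The stripe assignment can be sketched with a stdlib hash. The package does not document which hash it uses, so FNV-1a here is an assumption for illustration; the property that matters is only that the mapping is deterministic:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// stripeFor assigns a key to one of n stripes. Because the hash is a
// pure function of the key, items sharing a key always land in the
// same stripe, and distinct keys spread across stripes.
func stripeFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

func main() {
	for _, key := range []string{"tenant-a", "tenant-b", "tenant-c"} {
		fmt.Printf("%s -> stripe %d of 4\n", key, stripeFor(key, 4))
	}
}
```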

Example:

// 4 stripes (match load worker count), max 500 items per batch
batcher := etl.StripeBatcher(
    func(r Record) string { return r.TenantID },
    4,   // numStripes
    500, // maxBatchSize
)
Example
package main

import (
	"fmt"

	"github.com/bjaus/etl"
)

// Target is a target record type for examples.
type Target struct {
	ID       int
	Name     string
	Category string
	Size     int
}

func main() {
	items := []Target{
		{ID: 1, Category: "a"},
		{ID: 2, Category: "b"},
		{ID: 3, Category: "a"},
		{ID: 4, Category: "c"},
		{ID: 5, Category: "b"},
		{ID: 6, Category: "a"},
	}

	// 2 stripes, max 10 items per batch
	batcher := etl.StripeBatcher(
		func(t Target) string { return t.Category },
		2,
		10,
	)
	batches := batcher.Batch(items)

	fmt.Println("batches:", len(batches))
	total := 0
	for _, batch := range batches {
		total += len(batch)
	}
	fmt.Println("total items:", total)

}
Output:

batches: 2
total items: 6

func WeightedBatcher

func WeightedBatcher[Target any](weigher func(Target) int, maxWeight int) Batcher[Target]

WeightedBatcher creates batches where the total weight does not exceed maxWeight. The weigher function returns the weight of each individual item. Items are accumulated into a batch until adding the next item would exceed maxWeight, at which point a new batch is started.

If a single item exceeds maxWeight, it is placed in its own batch (never dropped).
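A stdlib sketch of those semantics (not the package's code) that shows the oversize rule in action:

```go
package main

import "fmt"

// weightedSplit mimics the documented WeightedBatcher behavior:
// accumulate until the next item would exceed maxWeight; an item
// heavier than maxWeight still gets its own batch, never dropped.
func weightedSplit(weights []int, maxWeight int) [][]int {
	var batches [][]int
	var cur []int
	total := 0
	for _, w := range weights {
		if len(cur) > 0 && total+w > maxWeight {
			batches = append(batches, cur)
			cur, total = nil, 0
		}
		cur = append(cur, w)
		total += w
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	// The 90 exceeds maxWeight=70, so it is isolated in its own batch.
	fmt.Println(weightedSplit([]int{30, 30, 90, 30}, 70)) // [[30 30] [90] [30]]
}
```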

Common use cases:

  • SQL parameter limits: each record contributes N parameters to an INSERT
  • Payload size limits: each record contributes a variable number of bytes
  • API rate/cost budgets: each record has a variable cost

Example:

// Batch by SQL parameter count (5 columns per row, 65535 param limit)
batcher := etl.WeightedBatcher(func(t Target) int { return 5 }, 65535)

// Batch by estimated payload size
batcher := etl.WeightedBatcher(func(t Target) int { return len(t.Body) }, 10*1024*1024)
Example
package main

import (
	"fmt"

	"github.com/bjaus/etl"
)

// Target is a target record type for examples.
type Target struct {
	ID       int
	Name     string
	Category string
	Size     int
}

func main() {
	items := []Target{
		{ID: 1, Size: 30},
		{ID: 2, Size: 30},
		{ID: 3, Size: 30},
		{ID: 4, Size: 30},
		{ID: 5, Size: 30},
	}

	// Batch so total size per batch does not exceed 70
	batcher := etl.WeightedBatcher(func(t Target) int { return t.Size }, 70)
	batches := batcher.Batch(items)

	for i, batch := range batches {
		total := 0
		for _, t := range batch {
			total += t.Size
		}
		fmt.Printf("batch %d: %d items, total size %d\n", i, len(batch), total)
	}

}
Output:

batch 0: 2 items, total size 60
batch 1: 2 items, total size 60
batch 2: 1 items, total size 30

type BatcherFunc

type BatcherFunc[T any] func(items []T) [][]T

BatcherFunc adapts a plain function to the Batcher interface.

Example:

batcher := etl.BatcherFunc[Target](func(items []Target) [][]Target {
    // custom batching logic
    return [][]Target{items}
})

func (BatcherFunc[T]) Batch

func (f BatcherFunc[T]) Batch(items []T) [][]T

type Checkpointer

type Checkpointer[S any, C comparable] interface {
	// CheckpointInterval returns the number of records to process per epoch.
	CheckpointInterval() int

	// Cursor extracts a checkpoint cursor from a source record.
	// The cursor must increase monotonically with source ordering.
	Cursor(src S) C

	// LoadCheckpoint retrieves the last saved cursor and stats.
	// Return (nil, nil, nil) if no checkpoint exists (fresh start).
	LoadCheckpoint(ctx context.Context) (cursor *C, stats *Stats, err error)

	// SaveCheckpoint persists cursor and stats.
	// Called by the pipeline after each epoch completes successfully.
	SaveCheckpoint(ctx context.Context, cursor C, stats *Stats) error

	// ClearCheckpoint removes the saved checkpoint.
	// Called by the pipeline after successful completion.
	ClearCheckpoint(ctx context.Context) error
}

Checkpointer enables checkpoint-based resumability for long-running ETL jobs.

When implemented, the pipeline switches from streaming mode to epoch-based mode. Records are processed in epochs of CheckpointInterval() records:

  1. Extract up to CheckpointInterval() records
  2. Transform and load them with configured concurrency
  3. Save a checkpoint (cursor + stats) after the epoch completes
  4. Repeat from step 1 until the source is exhausted
  5. Clear the checkpoint on successful completion

On restart, LoadCheckpoint returns the saved cursor and stats so the pipeline resumes from where it left off. Extract receives the cursor to skip already-processed records.

Use Checkpointer when:

  • The dataset is large enough that re-processing from scratch is expensive
  • The job may be interrupted (deploys, spot instances, timeouts)
  • You need progress durability across process restarts

Idempotency requirement: because records near a checkpoint boundary may be re-processed on restart, the Load implementation should be idempotent (use UPSERT rather than INSERT). Cursor values must increase monotonically with source ordering so that resuming from a cursor skips the right records.

Choosing CheckpointInterval:

  • Smaller values (500-2000): finer recovery granularity, more checkpoint writes
  • Larger values (5000-10000): fewer checkpoint writes, more re-processing on restart
  • Match to your source's natural page size when possible

Interaction with graceful shutdown: when the parent context is cancelled, the current epoch is allowed to complete (within DrainTimeout), and the checkpoint is saved before the pipeline exits. This means the next restart will not re-process the completed epoch.

Example:

func (j *MyJob) CheckpointInterval() int { return 5000 }

func (j *MyJob) Cursor(src Source) int64 { return src.ID }

func (j *MyJob) LoadCheckpoint(ctx context.Context) (*int64, *etl.Stats, error) {
    // Return (nil, nil, nil) for a fresh start
    row := j.db.QueryRowContext(ctx, "SELECT cursor, stats FROM checkpoints WHERE job = $1", j.id)
    var cursor int64
    var statsJSON []byte
    if err := row.Scan(&cursor, &statsJSON); err != nil {
        if errors.Is(err, sql.ErrNoRows) { return nil, nil, nil }
        return nil, nil, err
    }
    var stats etl.Stats
    if err := stats.UnmarshalJSON(statsJSON); err != nil { return nil, nil, err }
    return &cursor, &stats, nil
}

func (j *MyJob) SaveCheckpoint(ctx context.Context, cursor int64, stats *etl.Stats) error {
    data, err := stats.MarshalJSON()
    if err != nil {
        return err
    }
    _, err = j.db.ExecContext(ctx,
        `INSERT INTO checkpoints (job, cursor, stats) VALUES ($1, $2, $3)
         ON CONFLICT (job) DO UPDATE SET cursor = $2, stats = $3`, j.id, cursor, data)
    return err
}

func (j *MyJob) ClearCheckpoint(ctx context.Context) error {
    _, err := j.db.ExecContext(ctx, "DELETE FROM checkpoints WHERE job = $1", j.id)
    return err
}

type DrainTimeout

type DrainTimeout interface {
	// DrainTimeout returns the maximum time to wait for in-flight operations
	// to complete after the parent context is cancelled.
	// A zero value disables graceful shutdown (immediate abort).
	DrainTimeout() time.Duration
}

DrainTimeout controls graceful shutdown behavior. When the parent context is cancelled (e.g., SIGTERM), the pipeline:

  1. Stops extracting new records immediately
  2. Allows in-flight transform/load operations to complete within the timeout
  3. If operations complete within timeout: exits cleanly with nil error
  4. If timeout expires: forces abort with error

Implement this interface to set the timeout from the job struct rather than the pipeline builder.

The value can be overridden at runtime via WithDrainTimeout, which takes precedence. If neither is set, DefaultDrainTimeout (5 minutes) is used.

Set to 0 to disable graceful shutdown entirely (immediate abort on context cancellation). Negative values passed to WithDrainTimeout are ignored.

When to customize:

  • Jobs with fast loads (< 1s per batch): a short timeout (30s) is sufficient
  • Jobs with slow external API calls: increase to match worst-case latency
  • Jobs where partial progress is not useful: set to 0 to fail fast

In checkpoint mode, a successful drain saves the checkpoint before exiting, so the next restart picks up exactly where the shutdown left off.

Example:

func (j *MyJob) DrainTimeout() time.Duration { return 30 * time.Second }

type ErrorHandler

type ErrorHandler interface {
	// OnError is called when an error occurs during any stage.
	// Return ActionSkip to continue processing, ActionFail to stop the pipeline.
	OnError(ctx context.Context, stage Stage, err error) Action
}

ErrorHandler customizes error handling per pipeline stage. Without an ErrorHandler, the pipeline stops on the first error in any stage.

Implement this interface when you want to:

  • Log errors and continue processing (return ActionSkip)
  • Apply different strategies per stage (e.g., skip extract errors, fail on load errors)
  • Track error counts for alerting or metrics

Common patterns:

// Log-and-skip for transform, fail for load (data integrity)
func (j *MyJob) OnError(ctx context.Context, stage etl.Stage, err error) etl.Action {
    switch stage {
    case etl.StageExtract:
        slog.Warn("skipping malformed record", "error", err)
        return etl.ActionSkip
    case etl.StageTransform:
        slog.Error("transform error", "error", err)
        return etl.ActionSkip
    case etl.StageLoad:
        return etl.ActionFail
    }
    return etl.ActionFail
}

Skipped errors still increment Stats.Errors. The err parameter passed to Stopper.Stop only contains the fatal error that caused the pipeline to fail (i.e., when OnError returned ActionFail or when no ErrorHandler is present).

type Expander

type Expander[S, T any] interface {
	Expand(ctx context.Context, src S) ([]T, error)
}

Expander converts one input record to multiple output records. Use this when a single source record needs to produce a variable number of target records.

Returning an empty or nil slice effectively filters the record out — no target records are produced and nothing reaches the load stage.

When to use:

  • Denormalization: a source row with nested items produces one target per item
  • Splitting: a source record contains multiple logical entities
  • Conditional expansion: some records produce targets, others don't

Example:

func (j *MyJob) Expand(ctx context.Context, src Source) ([]Target, error) {
    targets := make([]Target, 0, len(src.Items))
    for _, item := range src.Items {
        targets = append(targets, Target{ParentID: src.ID, ItemID: item.ID})
    }
    return targets, nil
}

type Filter

type Filter[S any] interface {
	// Include returns true if the record should be processed.
	// Returning false skips the record before it reaches the transform stage.
	Include(src S) bool
}

Filter excludes records before transformation. Implement this interface when you need to skip records based on their content without incurring the cost of transformation.

Filter runs in the extract stage, before any concurrency. This makes it the most efficient place to drop records because filtered records never enter the transform or load stages and never consume worker capacity.

Use Filter when you have:

  • Soft-deleted records that should be ignored
  • Records outside a target date range
  • Inactive or disabled entities
  • Any condition that can be evaluated from the source record alone

Example:

func (j *MyJob) Include(src Source) bool {
    return !src.Deleted && src.UpdatedAt.After(j.since)
}

If you need access to context or want to produce errors, consider filtering inside Transformer or Expander instead. Returning an empty slice from Expander has the same effect, but the record still passes through the transform stage.

Filter interacts with Checkpointer: in epoch mode, filtered records still advance the cursor so that resuming from a checkpoint does not re-process records that were already seen and skipped.

type Job

type Job[S, T any, C comparable] interface {
	// Extract yields records from the source.
	// cursor is nil on fresh start, or the last checkpoint cursor on resume.
	// If Checkpointer[S, C] is not implemented, cursor is always nil.
	Extract(ctx context.Context, cursor *C) iter.Seq2[S, error]

	// Load writes a batch of records to the destination.
	// Should be idempotent (UPSERT) to handle potential re-processing.
	Load(ctx context.Context, batch []T) error
}

Job defines the core ETL operations. This is the only required interface to implement.

The type parameters are:

  • S: source record type (extracted from the data source)
  • T: target record type (loaded to the destination)
  • C: cursor type for checkpoint-based resumability (use any if not needed)

For transformation, implement one of:

  • Transformer: 1:1 transform (one input record -> one output record)
  • Expander: 1:N transform (one input record -> multiple output records)

If both are implemented, Transformer takes precedence.

type LoadBatchSize

type LoadBatchSize interface {
	// LoadBatchSize returns the number of records to batch before loading.
	LoadBatchSize() int
}

LoadBatchSize controls the number of records batched together before calling Load. Implement this interface to set the batch size from the job struct rather than the pipeline builder.

The value can be overridden at runtime via WithLoadBatchSize, which takes precedence. If neither is set, DefaultLoadBatchSize (100) is used.

This value is used as the default batch size when no custom Batcher is implemented. When a custom Batcher is present, this value is ignored in favor of the Batcher's own logic.

Tuning guidance:

  • For SQL INSERTs: balance between fewer round-trips (larger batches) and staying within parameter limits (e.g., PostgreSQL's 65535 param limit)
  • For API calls: match the external API's preferred batch size
  • For file writes: larger batches reduce I/O overhead

Example:

func (j *MyJob) LoadBatchSize() int { return 500 }

type LoadWorkers

type LoadWorkers interface {
	// LoadWorkers returns the number of concurrent load workers.
	LoadWorkers() int
}

LoadWorkers controls worker parallelism for the load stage. Implement this interface to set the concurrency level from the job struct rather than the pipeline builder.

The value can be overridden at runtime via WithLoadWorkers, which takes precedence. If neither is set, DefaultLoadWorkers (1) is used.

Tuning guidance:

  • Match to your database connection pool size to avoid pool exhaustion
  • For batch INSERTs/UPSERTs, 2-4 workers often saturates a single database
  • Consider using StripeBatcher with matching stripe count to avoid lock contention when multiple workers write to the same table

Example:

func (j *MyJob) LoadWorkers() int { return 4 }

type Pipeline

type Pipeline[S, T any, C comparable] struct {
	// contains filtered or unexported fields
}

Pipeline orchestrates the ETL process.

func New

func New[S, T any, C comparable](job Job[S, T, C]) *Pipeline[S, T, C]

New creates a new Pipeline for the given job. The job must implement Job[S, T, C]. Optional interfaces are auto-detected.

For transformation, the job must implement one of:

  • Transformer[S, T]: 1:1 transform (one input record -> one output record)
  • Expander[S, T]: 1:N transform (one input record -> multiple output records)

If both Transformer and Expander are implemented, Transformer takes precedence. Panics if neither is implemented.

Example
package main

import (
	"context"
	"fmt"
	"iter"

	"github.com/bjaus/etl"
)

// Source is a source record type for examples.
type Source struct {
	ID   int
	Name string
}

// Target is a target record type for examples.
type Target struct {
	ID       int
	Name     string
	Category string
	Size     int
}

type basicJob struct {
	rows []Source
}

func (j *basicJob) Extract(_ context.Context, _ *int) iter.Seq2[Source, error] {
	return func(yield func(Source, error) bool) {
		for _, r := range j.rows {
			if !yield(r, nil) {
				return
			}
		}
	}
}

func (j *basicJob) Transform(_ context.Context, src Source) (Target, error) {
	return Target{ID: src.ID, Name: src.Name + "!"}, nil
}

func (j *basicJob) Load(_ context.Context, batch []Target) error {
	for _, r := range batch {
		fmt.Printf("loaded: %d %s\n", r.ID, r.Name)
	}
	return nil
}

func main() {
	job := &basicJob{
		rows: []Source{
			{ID: 1, Name: "Alice"},
			{ID: 2, Name: "Bob"},
		},
	}

	err := etl.New[Source, Target, int](job).Run(context.Background())
	if err != nil {
		fmt.Println("error:", err)
	}

}
Output:

loaded: 1 Alice!
loaded: 2 Bob!

func (*Pipeline[S, T, C]) Run

func (p *Pipeline[S, T, C]) Run(ctx context.Context) error

Run executes the pipeline.

Example
package main

import (
	"context"
	"fmt"
	"iter"

	"github.com/bjaus/etl"
)

// Source is a source record type for examples.
type Source struct {
	ID   int
	Name string
}

// Target is a target record type for examples.
type Target struct {
	ID       int
	Name     string
	Category string
	Size     int
}

type basicJob struct {
	rows []Source
}

func (j *basicJob) Extract(_ context.Context, _ *int) iter.Seq2[Source, error] {
	return func(yield func(Source, error) bool) {
		for _, r := range j.rows {
			if !yield(r, nil) {
				return
			}
		}
	}
}

func (j *basicJob) Transform(_ context.Context, src Source) (Target, error) {
	return Target{ID: src.ID, Name: src.Name + "!"}, nil
}

func (j *basicJob) Load(_ context.Context, batch []Target) error {
	for _, r := range batch {
		fmt.Printf("loaded: %d %s\n", r.ID, r.Name)
	}
	return nil
}

func main() {
	job := &basicJob{
		rows: []Source{
			{ID: 1, Name: "Alice"},
			{ID: 2, Name: "Bob"},
			{ID: 3, Name: "Charlie"},
		},
	}

	err := etl.New[Source, Target, int](job).
		WithLoadBatchSize(2).
		WithTransformWorkers(2).
		Run(context.Background())
	if err != nil {
		fmt.Println("error:", err)
	}

}
Output:

loaded: 1 Alice!
loaded: 2 Bob!
loaded: 3 Charlie!

func (*Pipeline[S, T, C]) WithDrainTimeout

func (p *Pipeline[S, T, C]) WithDrainTimeout(d time.Duration) *Pipeline[S, T, C]

WithDrainTimeout overrides the graceful shutdown timeout. When the parent context is cancelled, the pipeline will wait up to this duration for in-flight operations to complete before forcing an abort. Priority: this method > DrainTimeout interface > DefaultDrainTimeout. Set to 0 to disable graceful shutdown (immediate abort). Negative values are ignored.

func (*Pipeline[S, T, C]) WithLoadBatchSize

func (p *Pipeline[S, T, C]) WithLoadBatchSize(n int) *Pipeline[S, T, C]

WithLoadBatchSize overrides the number of records to batch before loading. Priority: this method > LoadBatchSize interface > DefaultLoadBatchSize. Values less than 1 are ignored.

func (*Pipeline[S, T, C]) WithLoadWorkers

func (p *Pipeline[S, T, C]) WithLoadWorkers(n int) *Pipeline[S, T, C]

WithLoadWorkers overrides the number of concurrent load workers. Priority: this method > LoadWorkers interface > DefaultLoadWorkers. Values less than 1 are ignored.

func (*Pipeline[S, T, C]) WithReportInterval

func (p *Pipeline[S, T, C]) WithReportInterval(n int) *Pipeline[S, T, C]

WithReportInterval overrides how often to report progress (in records). Priority: this method > ProgressReporter interface > DefaultReportInterval. Values less than 1 are ignored.

func (*Pipeline[S, T, C]) WithTransformWorkers

func (p *Pipeline[S, T, C]) WithTransformWorkers(n int) *Pipeline[S, T, C]

WithTransformWorkers overrides the number of concurrent transform workers. Priority: this method > TransformWorkers interface > DefaultTransformWorkers. Values less than 1 are ignored.

type ProgressReporter

type ProgressReporter interface {
	ReportInterval

	// OnProgress is called periodically during execution.
	OnProgress(ctx context.Context, stats *Stats)
}

ProgressReporter receives periodic progress updates during pipeline execution. Implement this interface when you want to log throughput, emit metrics, or update an external dashboard while the pipeline is running.

OnProgress is called each time the cumulative loaded count crosses a ReportInterval boundary. In streaming mode this happens inside load workers; in epoch mode it happens during batch loading within each epoch.

The Stats snapshot passed to OnProgress is safe to read concurrently. Avoid performing blocking I/O inside OnProgress since it runs on a load worker goroutine.

Use ProgressReporter when:

  • You want periodic log lines showing loaded/error counts
  • You need to push throughput metrics to Prometheus, Datadog, etc.
  • You want a heartbeat to detect stalled pipelines

Example:

func (j *MyJob) ReportInterval() int { return 10000 }

func (j *MyJob) OnProgress(ctx context.Context, stats *etl.Stats) {
    slog.InfoContext(ctx, "progress",
        "extracted", stats.Extracted(),
        "loaded", stats.Loaded(),
        "errors", stats.Errors(),
    )
}

To override the interval at runtime without changing the job struct, use WithReportInterval on the pipeline builder instead.

type ReportInterval

type ReportInterval interface {
	// ReportInterval returns how often to call OnProgress (in records loaded).
	ReportInterval() int
}

ReportInterval controls how often progress is reported, measured in records loaded. This interface can be implemented independently of ProgressReporter when you want to set the interval via the job struct rather than the builder.

The value can be overridden at runtime via WithReportInterval, which takes precedence over this interface. If neither is set, DefaultReportInterval (10,000 records) is used.

This interface is embedded in ProgressReporter, so implementing ProgressReporter automatically satisfies ReportInterval.

Example:

func (j *MyJob) ReportInterval() int { return 5000 }

type Stage

type Stage string

Stage identifies where in the pipeline an event occurred.

const (
	StageExtract   Stage = "extract"
	StageTransform Stage = "transform"
	StageLoad      Stage = "load"
)

type Starter

type Starter interface {
	// Start is called before extraction begins.
	// The returned context is used for the entire pipeline.
	Start(ctx context.Context) context.Context
}

Starter is called before pipeline execution begins. Implement this interface when you need to perform setup work or enrich the context before extraction starts.

Use Starter for:

  • Adding values to the context (request IDs, trace spans, logger fields)
  • Recording the pipeline start time for elapsed-time metrics
  • Acquiring resources that must be held for the pipeline's lifetime
  • Logging the start of a pipeline run

The context returned by Start is propagated to all pipeline stages and to Stopper.Stop. This makes it the right place to attach tracing spans or cancellation signals.

Example:

func (j *MyJob) Start(ctx context.Context) context.Context {
    j.startedAt = time.Now()
    slog.InfoContext(ctx, "pipeline starting")
    return ctx
}

Start is called exactly once, before the first call to Extract.

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

Stats provides pipeline statistics with thread-safe access. Counter fields use atomic operations for safe concurrent access from worker goroutines.

func NewStats

func NewStats(extracted, filtered, transformed, loaded, errors int64) *Stats

NewStats creates a Stats with initial counter values. Use this when loading checkpoint data from storage.

func (*Stats) Errors

func (s *Stats) Errors() int64

Errors returns the number of errors encountered.

func (*Stats) Extracted

func (s *Stats) Extracted() int64

Extracted returns the number of records extracted.

func (*Stats) Filtered

func (s *Stats) Filtered() int64

Filtered returns the number of records filtered out before transformation.

func (*Stats) Loaded

func (s *Stats) Loaded() int64

Loaded returns the number of records loaded.

func (*Stats) LogValue

func (s *Stats) LogValue() slog.Value

LogValue implements slog.LogValuer for structured logging.

func (*Stats) MarshalJSON

func (s *Stats) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler for Stats serialization.

func (*Stats) Transformed

func (s *Stats) Transformed() int64

Transformed returns the number of records transformed.

func (*Stats) UnmarshalJSON

func (s *Stats) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler for Stats deserialization.

type Stopper

type Stopper interface {
	// Stop is called exactly once, after the pipeline Run method returns.
	Stop(ctx context.Context, stats *Stats, err error)
}

Stopper is called after pipeline execution completes, regardless of whether the pipeline succeeded, failed, or was shut down gracefully. Implement this interface for cleanup, final logging, or metrics reporting.

Use Stopper for:

  • Logging final stats and elapsed time
  • Reporting success/failure metrics to an observability system
  • Releasing resources acquired in Starter
  • Sending completion notifications

The ctx passed to Stop is derived from the pipeline's drain context, which remains valid even during graceful shutdown. This allows Stop to perform cleanup operations (database writes, API calls) after the parent context has been cancelled.

The err parameter is the same error value returned by Run: the unrecoverable error that caused Run to fail (no ErrorHandler, or ErrorHandler returned ActionFail). Errors handled with ActionSkip do not appear in err, even though they increment stats.Errors while the pipeline continues processing.

Example:

func (j *MyJob) Stop(ctx context.Context, stats *etl.Stats, err error) {
    elapsed := time.Since(j.startedAt)
    if err != nil {
        slog.ErrorContext(ctx, "pipeline failed", "error", err, "stats", stats, "elapsed", elapsed)
    } else {
        slog.InfoContext(ctx, "pipeline complete", "stats", stats, "elapsed", elapsed)
    }
}

Stop is called exactly once, after the pipeline Run method returns.

type TransformWorkers

type TransformWorkers interface {
	// TransformWorkers returns the number of concurrent transform workers.
	TransformWorkers() int
}

TransformWorkers controls worker parallelism for the transform stage. Implement this interface to set the concurrency level from the job struct rather than the pipeline builder.

The value can be overridden at runtime via WithTransformWorkers, which takes precedence. If neither is set, DefaultTransformWorkers (1) is used.

Tuning guidance:

  • CPU-bound transforms (parsing, hashing, serialization): set to runtime.NumCPU()
  • I/O-bound transforms (HTTP calls, cache lookups): set higher (10-50) depending on the external service's capacity
  • Pure field mapping with no I/O: 1 worker is usually sufficient since the overhead of goroutine coordination outweighs the parallelism benefit

Example:

func (j *MyJob) TransformWorkers() int { return runtime.NumCPU() }

type Transformer

type Transformer[S, T any] interface {
	Transform(ctx context.Context, src S) (T, error)
}

Transformer converts one input record to one output record. Use this for simple 1:1 mappings where each source record produces exactly one target record.

When to use Transformer vs Expander:

  • Transformer: field mapping, format conversion, enrichment — one in, one out
  • Expander: denormalization, splitting — one source record produces a variable number of output records (including zero, which acts as a filter)

Example:

func (j *MyJob) Transform(ctx context.Context, src Source) (Target, error) {
    return Target{
        ID:   src.ID,
        Name: strings.ToUpper(src.Name),
    }, nil
}
