Documentation
Overview ¶
Package etl provides a flexible Extract-Transform-Load pipeline framework.
The ETL package uses an interface-based API where your job type implements only the interfaces it needs. The pipeline auto-detects implemented interfaces and configures itself accordingly. Runtime configuration overrides are also available via method chaining.
Quick Start ¶
Implement the required Job interface:
type MyJob struct {
    db *sql.DB
}

func (j *MyJob) Extract(ctx context.Context, cursor *int64) iter.Seq2[Source, error] {
    return func(yield func(Source, error) bool) {
        var since int64
        if cursor != nil {
            since = *cursor
        }
        rows, err := j.db.QueryContext(ctx, "SELECT id, name FROM source WHERE id > $1", since)
        if err != nil {
            yield(Source{}, err)
            return
        }
        defer rows.Close()
        for rows.Next() {
            var r Source
            if err := rows.Scan(&r.ID, &r.Name); err != nil {
                if !yield(Source{}, err) {
                    return
                }
                continue
            }
            if !yield(r, nil) {
                return
            }
        }
        if err := rows.Err(); err != nil {
            yield(Source{}, err)
        }
    }
}

func (j *MyJob) Transform(ctx context.Context, src Source) (Target, error) {
    return Target{ID: src.ID, Name: strings.ToUpper(src.Name)}, nil
}

func (j *MyJob) Load(ctx context.Context, batch []Target) error {
    // UPSERT batch to destination
    return j.db.BulkUpsert(ctx, batch)
}

// Run the pipeline
err := etl.New[Source, Target, int64](&MyJob{db: db}).Run(ctx)
Interface-Based Design ¶
The pipeline auto-detects optional interfaces. Just implement what you need:
// Add filtering by implementing Filter[S]
func (j *MyJob) Include(src Source) bool {
    return src.Active
}

// Add error handling by implementing ErrorHandler
func (j *MyJob) OnError(ctx context.Context, stage etl.Stage, err error) etl.Action {
    slog.Error("error in pipeline", "stage", stage, "error", err)
    return etl.ActionSkip // or etl.ActionFail to stop
}

// Add progress tracking by implementing ProgressReporter
func (j *MyJob) ReportInterval() int { return 10000 }

func (j *MyJob) OnProgress(ctx context.Context, stats *etl.Stats) {
    slog.Info("progress", "loaded", stats.Loaded())
}
Checkpointing for Resumability ¶
Implement Checkpointer[S, C] for checkpoint-based resumability:
func (j *MyJob) CheckpointInterval() int { return 5000 }

func (j *MyJob) Cursor(src Source) int64 { return src.ID }

func (j *MyJob) LoadCheckpoint(ctx context.Context) (*int64, *etl.Stats, error) {
    var c sql.NullInt64
    var extracted, filtered, transformed, loaded, errCount int64
    err := j.db.QueryRowContext(ctx,
        `SELECT cursor, extracted, filtered, transformed, loaded, errors
         FROM checkpoints WHERE job_id = $1`, j.jobID,
    ).Scan(&c, &extracted, &filtered, &transformed, &loaded, &errCount)
    if errors.Is(err, sql.ErrNoRows) {
        return nil, nil, nil // Fresh start
    }
    if err != nil {
        return nil, nil, err
    }
    if !c.Valid {
        return nil, nil, nil // Fresh start
    }
    return &c.Int64, etl.NewStats(extracted, filtered, transformed, loaded, errCount), nil
}

func (j *MyJob) SaveCheckpoint(ctx context.Context, cursor int64, stats *etl.Stats) error {
    _, err := j.db.ExecContext(ctx,
        `INSERT INTO checkpoints (job_id, cursor, extracted, filtered, transformed, loaded, errors, updated_at)
         VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
         ON CONFLICT (job_id) DO UPDATE SET
           cursor = $2, extracted = $3, filtered = $4, transformed = $5, loaded = $6, errors = $7, updated_at = NOW()`,
        j.jobID, cursor, stats.Extracted(), stats.Filtered(), stats.Transformed(), stats.Loaded(), stats.Errors(),
    )
    return err
}

func (j *MyJob) ClearCheckpoint(ctx context.Context) error {
    _, err := j.db.ExecContext(ctx,
        "DELETE FROM checkpoints WHERE job_id = $1", j.jobID,
    )
    return err
}
When checkpointing is enabled, the pipeline:
- Processes records in epochs of CheckpointInterval() records
- Waits for all transforms and loads in an epoch to complete
- Saves the checkpoint cursor after each successful epoch
- On restart, resumes from the last saved cursor
- Clears the checkpoint after successful completion
Execution Modes ¶
The pipeline operates in one of two modes:
Streaming Mode (default): When Checkpointer is not implemented, records flow through the pipeline in a streaming fashion with concurrent stages.
Epoch Mode: When Checkpointer is implemented, records are processed in epochs. Each epoch collects CheckpointInterval() records, processes them with concurrency, then saves a checkpoint before starting the next epoch.
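Conceptually, mode selection is plain optional-interface detection. The self-contained sketch below (hypothetical names, not the package's actual internals) shows how implementing a Checkpointer-style method can flip a pipeline into epoch mode:

```go
package main

import "fmt"

// checkpointer is a stand-in for this sketch; the real pipeline checks the
// package's own Checkpointer[S, C] interface.
type checkpointer interface {
	CheckpointInterval() int
}

type plainJob struct{}

type resumableJob struct{}

func (resumableJob) CheckpointInterval() int { return 5000 }

// pickMode mirrors the documented behavior: implementing the optional
// interface switches the pipeline from streaming to epoch mode.
func pickMode(job any) string {
	if _, ok := job.(checkpointer); ok {
		return "epoch"
	}
	return "streaming"
}

func main() {
	fmt.Println(pickMode(plainJob{}))     // streaming
	fmt.Println(pickMode(resumableJob{})) // epoch
}
```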
Configuration ¶
Every configuration knob follows the same pattern: a WithXxx builder method and a matching Xxx interface with an Xxx() method. The builder always takes priority.
Configure the pipeline with method chaining:
err := etl.New[Source, Target, int64](&MyJob{}).
    WithTransformWorkers(4).       // Concurrent transform workers
    WithLoadWorkers(2).            // Concurrent load workers
    WithLoadBatchSize(100).        // Targets per load batch
    WithReportInterval(5000).      // Progress interval override
    WithDrainTimeout(time.Minute). // Graceful shutdown timeout
    Run(ctx)
Or implement the corresponding interfaces in your job:
func (j *MyJob) TransformWorkers() int { return 4 }
func (j *MyJob) LoadWorkers() int { return 2 }
func (j *MyJob) LoadBatchSize() int { return 100 }
func (j *MyJob) DrainTimeout() time.Duration { return time.Minute }
Configuration priority (highest to lowest):
- WithXxx() method overrides
- Interface implementations
- Default values
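The priority chain can be illustrated with a hypothetical resolver for a single knob (a sketch of the documented behavior, not the package's internals):

```go
package main

import "fmt"

const defaultLoadWorkers = 1 // mirrors DefaultLoadWorkers

// resolveLoadWorkers applies the documented priority:
// builder override > interface implementation > default value.
func resolveLoadWorkers(override *int, job any) int {
	if override != nil {
		return *override
	}
	if lw, ok := job.(interface{ LoadWorkers() int }); ok {
		return lw.LoadWorkers()
	}
	return defaultLoadWorkers
}

type tunedJob struct{}

func (tunedJob) LoadWorkers() int { return 2 }

func main() {
	eight := 8
	fmt.Println(resolveLoadWorkers(nil, struct{}{}))    // 1: default
	fmt.Println(resolveLoadWorkers(nil, tunedJob{}))    // 2: interface
	fmt.Println(resolveLoadWorkers(&eight, tunedJob{})) // 8: builder wins
}
```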
Batching ¶
By default the pipeline batches records by count using LoadBatchSize (default 100). For custom batching logic, implement Batcher[T] on your job:
func (j *MyJob) Batch(items []Target) [][]Target {
    // Group by category
    byCategory := make(map[string][]Target)
    for _, item := range items {
        byCategory[item.Category] = append(byCategory[item.Category], item)
    }
    var batches [][]Target
    for _, batch := range byCategory {
        batches = append(batches, batch)
    }
    return batches
}
The package also provides ready-made batchers that can be used inside a Batch method implementation:
// Fixed-size batches (same as default behavior)
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.SizeBatcher[Target](500).Batch(items)
}

// Group by a field value so related records are loaded together
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.GroupByField(func(t Target) string { return t.Category }).Batch(items)
}

// Group by field, then cap each group at 50
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.GroupByFieldWithSizeLimit(
        func(t Target) string { return t.Category },
        50,
    ).Batch(items)
}

// Batch by cumulative weight (e.g., SQL parameter limits).
// Each row uses 5 INSERT parameters; Postgres allows 65535 per statement.
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.WeightedBatcher(func(t Target) int { return 5 }, 65535).Batch(items)
}

// Hash-stripe for parallel load workers without lock contention.
// Items with the same key always land in the same stripe; batches are interleaved
// across stripes so concurrent workers naturally avoid contending on the same rows.
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.StripeBatcher(
        func(t Target) string { return t.Category },
        4,   // numStripes (match WithLoadWorkers)
        500, // maxBatchSize
    ).Batch(items)
}

// Compose multiple strategies: group by category, then cap at 10 KB per batch
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.CombineBatchers(
        etl.GroupByField(func(t Target) string { return t.Category }),
        etl.WeightedBatcher(func(t Target) int { return t.Size }, 10*1024),
    ).Batch(items)
}

// Send everything in a single batch (disable batching entirely)
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.NoBatcher[Target]().Batch(items)
}
Lifecycle Hooks ¶
Implement Starter and/or Stopper for setup and cleanup:
func (j *MyJob) Start(ctx context.Context) context.Context {
    j.startedAt = time.Now()
    slog.Info("starting pipeline")
    return ctx
}

func (j *MyJob) Stop(ctx context.Context, stats *etl.Stats, err error) {
    elapsed := time.Since(j.startedAt)
    rate := float64(stats.Loaded()) / elapsed.Seconds()
    if err != nil {
        slog.Error("pipeline failed", "error", err, "processed", stats.Loaded())
    } else {
        slog.Info("pipeline complete", "loaded", stats.Loaded(), "elapsed", elapsed, "rate", rate)
    }
}
Transform Patterns ¶
One-to-one transformation (implement Transformer[S, T]):
func (j *MyJob) Transform(ctx context.Context, src Source) (Target, error) {
    return Target{ID: src.ID, Name: src.Name}, nil
}
One-to-many transformation (implement Expander[S, T]):
func (j *MyJob) Expand(ctx context.Context, src Source) ([]Target, error) {
    return src.Items, nil // One source row produces multiple output records
}
Filtering Targets ¶
Use the Filter[S] interface for pre-transform filtering:
func (j *MyJob) Include(src Source) bool {
    return !src.Deleted // Skip deleted records before transform
}
This is more efficient than filtering in Transform since it skips the transformation step entirely.
Error Handling ¶
Without ErrorHandler, the pipeline stops on the first error.
With ErrorHandler, you control error behavior:
func (j *MyJob) OnError(ctx context.Context, stage etl.Stage, err error) etl.Action {
    switch stage {
    case etl.StageExtract:
        // Skip malformed records
        slog.Warn("skipping record", "error", err)
        return etl.ActionSkip
    case etl.StageTransform:
        // Log and skip transform errors
        slog.Error("transform error", "error", err)
        return etl.ActionSkip
    case etl.StageLoad:
        // Fail on load errors (data integrity)
        return etl.ActionFail
    }
    return etl.ActionFail
}
Best Practices ¶
Load operations should be idempotent (use UPSERT) to handle:
- Pipeline restarts after partial completion
- Re-processing of records near checkpoint boundaries
Use generous CheckpointInterval() values (1000-10000) to balance:
- Recovery granularity (smaller = less re-processing on restart)
- Checkpoint overhead (larger = fewer database writes)
Match LoadWorkers() to your database connection pool size.
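Idempotency is cheap to verify: loading the same batch twice must leave the destination unchanged. A minimal in-memory sketch of an upsert-style Load (stand-in types for illustration, not the package API):

```go
package main

import "fmt"

type row struct {
	ID   int
	Name string
}

// memStore is an in-memory stand-in for an UPSERT destination.
type memStore struct {
	rows map[int]string
}

// Load upserts by primary key, so replaying a batch near a checkpoint
// boundary is harmless: the second write overwrites with identical data.
func (s *memStore) Load(batch []row) error {
	for _, r := range batch {
		s.rows[r.ID] = r.Name
	}
	return nil
}

func main() {
	s := &memStore{rows: map[int]string{}}
	batch := []row{{1, "a"}, {2, "b"}}
	s.Load(batch)
	s.Load(batch)            // simulated re-processing after a restart
	fmt.Println(len(s.rows)) // 2: still two rows, not four
}
```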
For graceful shutdown on SIGINT/SIGTERM, set up signal handling before calling Run:
ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer cancel()
err := etl.New[Source, Target, int64](&MyJob{}).Run(ctx)
Index ¶
- Constants
- type Action
- type Batcher
- func CombineBatchers[Target any](batchers ...Batcher[Target]) Batcher[Target]
- func GroupByField[Target any, K comparable](keyExtractor func(Target) K) Batcher[Target]
- func GroupByFieldWithSizeLimit[Target any, K comparable](keyExtractor func(Target) K, maxGroupSize int) Batcher[Target]
- func NoBatcher[Target any]() Batcher[Target]
- func SizeBatcher[Target any](maxSize int) Batcher[Target]
- func StripeBatcher[T any, K comparable](keyFn func(T) K, numStripes int, maxBatchSize int) Batcher[T]
- func WeightedBatcher[Target any](weigher func(Target) int, maxWeight int) Batcher[Target]
- type BatcherFunc
- type Checkpointer
- type DrainTimeout
- type ErrorHandler
- type Expander
- type Filter
- type Job
- type LoadBatchSize
- type LoadWorkers
- type Pipeline
- func (p *Pipeline[S, T, C]) Run(ctx context.Context) error
- func (p *Pipeline[S, T, C]) WithDrainTimeout(d time.Duration) *Pipeline[S, T, C]
- func (p *Pipeline[S, T, C]) WithLoadBatchSize(n int) *Pipeline[S, T, C]
- func (p *Pipeline[S, T, C]) WithLoadWorkers(n int) *Pipeline[S, T, C]
- func (p *Pipeline[S, T, C]) WithReportInterval(n int) *Pipeline[S, T, C]
- func (p *Pipeline[S, T, C]) WithTransformWorkers(n int) *Pipeline[S, T, C]
- type ProgressReporter
- type ReportInterval
- type Stage
- type Starter
- type Stats
- type Stopper
- type TransformWorkers
- type Transformer
Examples ¶
Constants ¶
const (
    DefaultTransformWorkers = 1
    DefaultLoadWorkers      = 1
    DefaultLoadBatchSize    = 100
    DefaultReportInterval   = 10000
    DefaultDrainTimeout     = 5 * time.Minute
)
Default configuration values.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
type Batcher[T any] interface {
    // Batch groups items into batches for loading.
    Batch(items []T) [][]T
}
Batcher groups transformed records into batches for loading. Implement this interface on your job when the default size-based batching is insufficient.
The pipeline calls Batch after accumulating records from the transform stage. In streaming mode, Batch is called whenever the pending buffer reaches LoadBatchSize, and once more to flush remaining records. In epoch mode, Batch is called once per epoch with all transformed records from that epoch.
The default batcher (used when Batcher is not implemented) is equivalent to SizeBatcher with the resolved LoadBatchSize.
When to implement a custom Batcher:
- Records must be grouped by a key (e.g., tenant ID) so each Load call targets a single partition
- Batch size depends on record weight (e.g., SQL parameter count limits)
- You need stripe-based batching for parallel load workers
Ready-made batchers are available for common patterns:
- SizeBatcher: fixed number of items per batch
- GroupByField: group by a key extracted from each record
- GroupByFieldWithSizeLimit: group by key with a per-group size cap
- WeightedBatcher: batch by cumulative weight (e.g., SQL param limits)
- StripeBatcher: hash-stripe for parallel load workers
- NoBatcher: send everything in a single batch
- CombineBatchers: compose multiple strategies in sequence
Example:
func (j *MyJob) Batch(items []Target) [][]Target {
    return etl.WeightedBatcher(func(t Target) int { return 5 }, 65535).Batch(items)
}
func CombineBatchers ¶
func CombineBatchers[Target any](batchers ...Batcher[Target]) Batcher[Target]
CombineBatchers applies multiple batching strategies in sequence. Each batcher processes the output of the previous batcher.
Example:
// First group by category, then limit each group to 25 items
batcher := etl.CombineBatchers(
    etl.GroupByField(func(t Target) string { return t.Category }),
    etl.SizeBatcher[Target](25),
)
Example ¶
package main

import (
    "fmt"

    "github.com/bjaus/etl"
)

// Target is a target record type for examples.
type Target struct {
    ID       int
    Name     string
    Category string
    Size     int
}

func main() {
    items := []Target{
        {ID: 1, Category: "x"},
        {ID: 2, Category: "x"},
        {ID: 3, Category: "x"},
        {ID: 4, Category: "y"},
        {ID: 5, Category: "y"},
    }

    // First group by category, then split groups into batches of at most 2
    batcher := etl.CombineBatchers(
        etl.GroupByField(func(t Target) string { return t.Category }),
        etl.SizeBatcher[Target](2),
    )

    batches := batcher.Batch(items)
    fmt.Println("batches:", len(batches))
    for _, batch := range batches {
        fmt.Printf(" category=%s count=%d\n", batch[0].Category, len(batch))
    }
}

Output:

batches: 3
 category=x count=2
 category=x count=1
 category=y count=2
func GroupByField ¶
func GroupByField[Target any, K comparable](keyExtractor func(Target) K) Batcher[Target]
GroupByField creates batches by grouping items that have the same value for a given field. All items with the same key will be in the same batch, regardless of batch size.
Example:
// Group all records by category
batcher := etl.GroupByField(func(t Target) string {
    return t.Category
})
Example ¶
package main

import (
    "fmt"

    "github.com/bjaus/etl"
)

// Target is a target record type for examples.
type Target struct {
    ID       int
    Name     string
    Category string
    Size     int
}

func main() {
    items := []Target{
        {ID: 1, Category: "x"},
        {ID: 2, Category: "y"},
        {ID: 3, Category: "x"},
        {ID: 4, Category: "y"},
    }

    batcher := etl.GroupByField(func(t Target) string { return t.Category })

    batches := batcher.Batch(items)
    fmt.Println("batches:", len(batches))
    for _, batch := range batches {
        fmt.Printf(" category=%s count=%d\n", batch[0].Category, len(batch))
    }
}

Output:

batches: 2
 category=x count=2
 category=y count=2
func GroupByFieldWithSizeLimit ¶
func GroupByFieldWithSizeLimit[Target any, K comparable](keyExtractor func(Target) K, maxGroupSize int) Batcher[Target]
GroupByFieldWithSizeLimit combines field grouping with size limits per group. Items are first grouped by field value, then large groups are split into smaller batches.
Example:
// Group by category, but limit each group to 50 records
batcher := etl.GroupByFieldWithSizeLimit(
    func(t Target) string { return t.Category },
    50,
)
Example ¶
package main

import (
    "fmt"

    "github.com/bjaus/etl"
)

// Target is a target record type for examples.
type Target struct {
    ID       int
    Name     string
    Category string
    Size     int
}

func main() {
    items := []Target{
        {ID: 1, Category: "x"},
        {ID: 2, Category: "x"},
        {ID: 3, Category: "x"},
        {ID: 4, Category: "y"},
    }

    // Group by category, but cap each group at 2 items
    batcher := etl.GroupByFieldWithSizeLimit(
        func(t Target) string { return t.Category },
        2,
    )

    batches := batcher.Batch(items)
    fmt.Println("batches:", len(batches))
    for _, batch := range batches {
        fmt.Printf(" category=%s count=%d\n", batch[0].Category, len(batch))
    }
}

Output:

batches: 3
 category=x count=2
 category=x count=1
 category=y count=1
func NoBatcher ¶
func NoBatcher[Target any]() Batcher[Target]
NoBatcher returns items as a single batch (no batching).
Example ¶
package main

import (
    "fmt"

    "github.com/bjaus/etl"
)

func main() {
    batcher := etl.NoBatcher[string]()
    batches := batcher.Batch([]string{"a", "b", "c"})
    fmt.Println(batches)
}
Output: [[a b c]]
func SizeBatcher ¶
func SizeBatcher[Target any](maxSize int) Batcher[Target]
SizeBatcher creates batches with a maximum number of items per batch.
Example:
// Create batches of up to 100 items each
batcher := etl.SizeBatcher[MyRecord](100)
Example ¶
package main

import (
    "fmt"

    "github.com/bjaus/etl"
)

func main() {
    batcher := etl.SizeBatcher[string](2)
    batches := batcher.Batch([]string{"a", "b", "c", "d", "e"})
    fmt.Println(batches)
}
Output: [[a b] [c d] [e]]
func StripeBatcher ¶
func StripeBatcher[T any, K comparable](keyFn func(T) K, numStripes int, maxBatchSize int) Batcher[T]
StripeBatcher assigns items to stripes via a hash of their key, then interleaves batches across stripes in round-robin order. This is designed for parallel load workers: items with the same key always land in the same stripe, and the interleaving ensures concurrent workers naturally process different stripes, avoiding lock contention on shared resources (e.g., database rows).
Within each stripe, items are grouped by key for locality, then chunked by maxBatchSize. The output order is: stripe 0 batch 0, stripe 1 batch 0, ..., stripe 0 batch 1, stripe 1 batch 1, etc.
Example:
// 4 stripes (match load worker count), max 500 items per batch
batcher := etl.StripeBatcher(
    func(r Record) string { return r.TenantID },
    4,   // numStripes
    500, // maxBatchSize
)
Example ¶
package main

import (
    "fmt"

    "github.com/bjaus/etl"
)

// Target is a target record type for examples.
type Target struct {
    ID       int
    Name     string
    Category string
    Size     int
}

func main() {
    items := []Target{
        {ID: 1, Category: "a"},
        {ID: 2, Category: "b"},
        {ID: 3, Category: "a"},
        {ID: 4, Category: "c"},
        {ID: 5, Category: "b"},
        {ID: 6, Category: "a"},
    }

    // 2 stripes, max 10 items per batch
    batcher := etl.StripeBatcher(
        func(t Target) string { return t.Category },
        2,
        10,
    )

    batches := batcher.Batch(items)
    fmt.Println("batches:", len(batches))
    total := 0
    for _, batch := range batches {
        total += len(batch)
    }
    fmt.Println("total items:", total)
}

Output:

batches: 2
total items: 6
func WeightedBatcher ¶
func WeightedBatcher[Target any](weigher func(Target) int, maxWeight int) Batcher[Target]
WeightedBatcher creates batches where the total weight does not exceed maxWeight. The weigher function returns the weight of each individual item. Items are accumulated into a batch until adding the next item would exceed maxWeight, at which point a new batch is started.
If a single item exceeds maxWeight, it is placed in its own batch (never dropped).
Common use cases:
- SQL parameter limits: each record contributes N parameters to an INSERT
- Payload size limits: each record contributes a variable number of bytes
- API rate/cost budgets: each record has a variable cost
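The accumulate-and-flush rule, including the oversize case, can be sketched in a few lines (a re-implementation for illustration only, not the package's code):

```go
package main

import "fmt"

// weightedChunks illustrates the documented rule: accumulate items until the
// next one would exceed maxWeight; an item heavier than maxWeight still gets
// its own batch rather than being dropped.
func weightedChunks(items []string, weight func(string) int, maxWeight int) [][]string {
	var batches [][]string
	var cur []string
	curWeight := 0
	for _, it := range items {
		w := weight(it)
		if len(cur) > 0 && curWeight+w > maxWeight {
			batches = append(batches, cur)
			cur, curWeight = nil, 0
		}
		cur = append(cur, it)
		curWeight += w
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	byLen := func(s string) int { return len(s) }
	// "abcdef" (weight 6) exceeds maxWeight 3, so it lands in its own batch.
	fmt.Println(weightedChunks([]string{"abcdef", "ab", "c"}, byLen, 3)) // [[abcdef] [ab c]]
}
```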
Example:
// Batch by SQL parameter count (5 columns per row, 65535 param limit)
batcher := etl.WeightedBatcher(func(t Target) int { return 5 }, 65535)
// Batch by estimated payload size
batcher := etl.WeightedBatcher(func(t Target) int { return len(t.Body) }, 10*1024*1024)
Example ¶
package main

import (
    "fmt"

    "github.com/bjaus/etl"
)

// Target is a target record type for examples.
type Target struct {
    ID       int
    Name     string
    Category string
    Size     int
}

func main() {
    items := []Target{
        {ID: 1, Size: 30},
        {ID: 2, Size: 30},
        {ID: 3, Size: 30},
        {ID: 4, Size: 30},
        {ID: 5, Size: 30},
    }

    // Batch so total size per batch does not exceed 70
    batcher := etl.WeightedBatcher(func(t Target) int { return t.Size }, 70)

    batches := batcher.Batch(items)
    for i, batch := range batches {
        total := 0
        for _, t := range batch {
            total += t.Size
        }
        fmt.Printf("batch %d: %d items, total size %d\n", i, len(batch), total)
    }
}

Output:

batch 0: 2 items, total size 60
batch 1: 2 items, total size 60
batch 2: 1 items, total size 30
type BatcherFunc ¶
type BatcherFunc[T any] func(items []T) [][]T
BatcherFunc adapts a plain function to the Batcher interface.
Example:
batcher := etl.BatcherFunc[Target](func(items []Target) [][]Target {
    // custom batching logic
    return [][]Target{items}
})
func (BatcherFunc[T]) Batch ¶
func (f BatcherFunc[T]) Batch(items []T) [][]T
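The adapter is the standard func-to-interface pattern (the same shape as http.HandlerFunc). A self-contained sketch using local copies of the two types, so the mechanism is visible without the package:

```go
package main

import "fmt"

// Local copies of the Batcher/BatcherFunc shapes, for illustration only.
type Batcher[T any] interface {
	Batch(items []T) [][]T
}

type BatcherFunc[T any] func(items []T) [][]T

// Batch makes the bare function satisfy Batcher by forwarding the call.
func (f BatcherFunc[T]) Batch(items []T) [][]T { return f(items) }

func main() {
	// A one-off batcher defined inline: reverse the input, single batch.
	var b Batcher[int] = BatcherFunc[int](func(items []int) [][]int {
		out := make([]int, len(items))
		for i, v := range items {
			out[len(items)-1-i] = v
		}
		return [][]int{out}
	})
	fmt.Println(b.Batch([]int{1, 2, 3})) // [[3 2 1]]
}
```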
type Checkpointer ¶
type Checkpointer[S any, C comparable] interface {
    // CheckpointInterval returns the number of records to process per epoch.
    CheckpointInterval() int

    // Cursor extracts a checkpoint cursor from a source record.
    // The cursor must increase monotonically with source ordering.
    Cursor(src S) C

    // LoadCheckpoint retrieves the last saved cursor and stats.
    // Return (nil, nil, nil) if no checkpoint exists (fresh start).
    LoadCheckpoint(ctx context.Context) (cursor *C, stats *Stats, err error)

    // SaveCheckpoint persists cursor and stats.
    // Called by the pipeline after each epoch completes successfully.
    SaveCheckpoint(ctx context.Context, cursor C, stats *Stats) error

    // ClearCheckpoint removes the saved checkpoint.
    // Called by the pipeline after successful completion.
    ClearCheckpoint(ctx context.Context) error
}
Checkpointer enables checkpoint-based resumability for long-running ETL jobs.
When implemented, the pipeline switches from streaming mode to epoch-based mode. Records are processed in epochs of CheckpointInterval() records:
- Extract up to CheckpointInterval() records
- Transform and load them with configured concurrency
- Save a checkpoint (cursor + stats) after the epoch completes
- Repeat from step 1 until the source is exhausted
- Clear the checkpoint on successful completion
On restart, LoadCheckpoint returns the saved cursor and stats so the pipeline resumes from where it left off. Extract receives the cursor to skip already-processed records.
Use Checkpointer when:
- The dataset is large enough that re-processing from scratch is expensive
- The job may be interrupted (deploys, spot instances, timeouts)
- You need progress durability across process restarts
Idempotency requirement: because records near a checkpoint boundary may be re-processed on restart, the Load implementation should be idempotent (use UPSERT rather than INSERT). Cursor values must increase monotonically with source ordering so that resuming from a cursor skips the right records.
Choosing CheckpointInterval:
- Smaller values (500-2000): finer recovery granularity, more checkpoint writes
- Larger values (5000-10000): fewer checkpoint writes, more re-processing on restart
- Match to your source's natural page size when possible
Interaction with graceful shutdown: when the parent context is cancelled, the current epoch is allowed to complete (within DrainTimeout), and the checkpoint is saved before the pipeline exits. This means the next restart will not re-process the completed epoch.
Example:
func (j *MyJob) CheckpointInterval() int { return 5000 }

func (j *MyJob) Cursor(src Source) int64 { return src.ID }

func (j *MyJob) LoadCheckpoint(ctx context.Context) (*int64, *etl.Stats, error) {
    // Return (nil, nil, nil) for a fresh start
    row := j.db.QueryRowContext(ctx, "SELECT cursor, stats FROM checkpoints WHERE job = $1", j.id)
    var cursor int64
    var statsJSON []byte
    if err := row.Scan(&cursor, &statsJSON); err != nil {
        if errors.Is(err, sql.ErrNoRows) {
            return nil, nil, nil
        }
        return nil, nil, err
    }
    var stats etl.Stats
    if err := stats.UnmarshalJSON(statsJSON); err != nil {
        return nil, nil, err
    }
    return &cursor, &stats, nil
}

func (j *MyJob) SaveCheckpoint(ctx context.Context, cursor int64, stats *etl.Stats) error {
    data, err := stats.MarshalJSON()
    if err != nil {
        return err
    }
    _, err = j.db.ExecContext(ctx,
        `INSERT INTO checkpoints (job, cursor, stats) VALUES ($1, $2, $3)
         ON CONFLICT (job) DO UPDATE SET cursor = $2, stats = $3`, j.id, cursor, data)
    return err
}

func (j *MyJob) ClearCheckpoint(ctx context.Context) error {
    _, err := j.db.ExecContext(ctx, "DELETE FROM checkpoints WHERE job = $1", j.id)
    return err
}
type DrainTimeout ¶
type DrainTimeout interface {
// DrainTimeout returns the maximum time to wait for in-flight operations
// to complete after the parent context is cancelled.
// A zero value disables graceful shutdown (immediate abort).
DrainTimeout() time.Duration
}
DrainTimeout controls graceful shutdown behavior. When the parent context is cancelled (e.g., SIGTERM), the pipeline:
- Stops extracting new records immediately
- Allows in-flight transform/load operations to complete within the timeout
- If operations complete within timeout: exits cleanly with nil error
- If timeout expires: forces abort with error
Implement this interface to set the timeout from the job struct rather than the pipeline builder.
The value can be overridden at runtime via WithDrainTimeout, which takes precedence. If neither is set, DefaultDrainTimeout (5 minutes) is used.
Set to 0 to disable graceful shutdown entirely (immediate abort on context cancellation). Negative values passed to WithDrainTimeout are ignored.
When to customize:
- Jobs with fast loads (< 1s per batch): a short timeout (30s) is sufficient
- Jobs with slow external API calls: increase to match worst-case latency
- Jobs where partial progress is not useful: set to 0 to fail fast
In checkpoint mode, a successful drain saves the checkpoint before exiting, so the next restart picks up exactly where the shutdown left off.
Example:
func (j *MyJob) DrainTimeout() time.Duration { return 30 * time.Second }
type ErrorHandler ¶
type ErrorHandler interface {
// OnError is called when an error occurs during any stage.
// Return ActionSkip to continue processing, ActionFail to stop the pipeline.
OnError(ctx context.Context, stage Stage, err error) Action
}
ErrorHandler customizes error handling per pipeline stage. Without an ErrorHandler, the pipeline stops on the first error in any stage.
Implement this interface when you want to:
- Log errors and continue processing (return ActionSkip)
- Apply different strategies per stage (e.g., skip extract errors, fail on load errors)
- Track error counts for alerting or metrics
Common patterns:
// Log-and-skip for transform, fail for load (data integrity)
func (j *MyJob) OnError(ctx context.Context, stage etl.Stage, err error) etl.Action {
    switch stage {
    case etl.StageExtract:
        slog.Warn("skipping malformed record", "error", err)
        return etl.ActionSkip
    case etl.StageTransform:
        slog.Error("transform error", "error", err)
        return etl.ActionSkip
    case etl.StageLoad:
        return etl.ActionFail
    }
    return etl.ActionFail
}
Skipped errors still increment Stats.Errors. The err parameter passed to Stopper.Stop only contains the fatal error that caused the pipeline to fail (i.e., when OnError returned ActionFail or when no ErrorHandler is present).
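For the "track error counts for alerting" use case, per-stage counters are a common shape. Because the handler may be invoked from concurrent workers, atomic counters are a safe choice. A self-contained sketch with stand-in stage values (the real package provides etl.Stage and etl.Action):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// stage is a stand-in for etl.Stage in this sketch.
type stage int

const (
	stageExtract stage = iota
	stageTransform
	stageLoad
)

// errorCounter tallies errors per stage; safe under concurrent OnError calls.
type errorCounter struct {
	counts [3]atomic.Int64
}

func (c *errorCounter) record(s stage) {
	c.counts[s].Add(1)
}

func main() {
	var c errorCounter
	c.record(stageTransform)
	c.record(stageTransform)
	c.record(stageLoad)
	fmt.Println(c.counts[stageTransform].Load(), c.counts[stageLoad].Load()) // 2 1
}
```

Inside a real OnError, you would call record(stage) before returning the chosen Action, then export the counts to your metrics system from OnProgress or Stop.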
type Expander ¶
Expander converts one input record to multiple output records. Use this when a single source record needs to produce a variable number of target records.
Returning an empty or nil slice effectively filters the record out — no target records are produced and nothing reaches the load stage.
When to use:
- Denormalization: a source row with nested items produces one target per item
- Splitting: a source record contains multiple logical entities
- Conditional expansion: some records produce targets, others don't
Example:
func (j *MyJob) Expand(ctx context.Context, src Source) ([]Target, error) {
    targets := make([]Target, 0, len(src.Items))
    for _, item := range src.Items {
        targets = append(targets, Target{ParentID: src.ID, ItemID: item.ID})
    }
    return targets, nil
}
type Filter ¶
type Filter[S any] interface {
    // Include returns true if the record should be processed.
    // Returning false skips the record before it reaches the transform stage.
    Include(src S) bool
}
Filter excludes records before transformation. Implement this interface when you need to skip records based on their content without incurring the cost of transformation.
Filter runs in the extract stage, before any concurrency. This makes it the most efficient place to drop records because filtered records never enter the transform or load stages and never consume worker capacity.
Use Filter when you have:
- Soft-deleted records that should be ignored
- Records outside a target date range
- Inactive or disabled entities
- Any condition that can be evaluated from the source record alone
Example:
func (j *MyJob) Include(src Source) bool {
    return !src.Deleted && src.UpdatedAt.After(j.since)
}
If you need access to context or want to produce errors, consider filtering inside Transformer or Expander instead. Returning an empty slice from Expander has the same effect, but the record still passes through the transform stage.
Filter interacts with Checkpointer: in epoch mode, filtered records still advance the cursor so that resuming from a checkpoint does not re-process records that were already seen and skipped.
type Job ¶
type Job[S, T any, C comparable] interface {
    // Extract yields records from the source.
    // cursor is nil on fresh start, or the last checkpoint cursor on resume.
    // If Checkpointer[S, C] is not implemented, cursor is always nil.
    Extract(ctx context.Context, cursor *C) iter.Seq2[S, error]

    // Load writes a batch of records to the destination.
    // Should be idempotent (UPSERT) to handle potential re-processing.
    Load(ctx context.Context, batch []T) error
}
Job defines the core ETL operations. This is the only required interface to implement.
The type parameters are:
- S: source record type (extracted from the data source)
- T: target record type (loaded to the destination)
- C: cursor type for checkpoint-based resumability (use any if not needed)
For transformation, implement one of:
- Transformer: 1:1 transform (one input record -> one output record)
- Expander: 1:N transform (one input record -> multiple output records)
If both are implemented, Transformer takes precedence.
type LoadBatchSize ¶
type LoadBatchSize interface {
// LoadBatchSize returns the number of records to batch before loading.
LoadBatchSize() int
}
LoadBatchSize controls the number of records batched together before calling Load. Implement this interface to set the batch size from the job struct rather than the pipeline builder.
The value can be overridden at runtime via WithLoadBatchSize, which takes precedence. If neither is set, DefaultLoadBatchSize (100) is used.
This value is used as the default batch size when no custom Batcher is implemented. When a custom Batcher is present, this value is ignored in favor of the Batcher's own logic.
Tuning guidance:
- For SQL INSERTs: balance between fewer round-trips (larger batches) and staying within parameter limits (e.g., PostgreSQL's 65535 param limit)
- For API calls: match the external API's preferred batch size
- For file writes: larger batches reduce I/O overhead
Example:
func (j *MyJob) LoadBatchSize() int { return 500 }
type LoadWorkers ¶
type LoadWorkers interface {
// LoadWorkers returns the number of concurrent load workers.
LoadWorkers() int
}
LoadWorkers controls worker parallelism for the load stage. Implement this interface to set the concurrency level from the job struct rather than the pipeline builder.
The value can be overridden at runtime via WithLoadWorkers, which takes precedence. If neither is set, DefaultLoadWorkers (1) is used.
Tuning guidance:
- Match to your database connection pool size to avoid pool exhaustion
- For batch INSERTs/UPSERTs, 2-4 workers often saturates a single database
- Consider using StripeBatcher with matching stripe count to avoid lock contention when multiple workers write to the same table
Example:
func (j *MyJob) LoadWorkers() int { return 4 }
type Pipeline ¶
type Pipeline[S, T any, C comparable] struct {
    // contains filtered or unexported fields
}
Pipeline orchestrates the ETL process.
func New ¶
func New[S, T any, C comparable](job Job[S, T, C]) *Pipeline[S, T, C]
New creates a new Pipeline for the given job. The job must implement Job[S, T, C]. Optional interfaces are auto-detected.
For transformation, the job must implement one of:
- Transformer[S, T]: 1:1 transform (one input record -> one output record)
- Expander[S, T]: 1:N transform (one input record -> multiple output records)
If both Transformer and Expander are implemented, Transformer takes precedence. Panics if neither is implemented.
Example ¶
package main
import (
"context"
"fmt"
"iter"
"github.com/bjaus/etl"
)
// Source is a source record type for examples.
type Source struct {
ID int
Name string
}
// Target is a target record type for examples.
type Target struct {
ID int
Name string
Category string
Size int
}
type basicJob struct {
rows []Source
}
func (j *basicJob) Extract(_ context.Context, _ *int) iter.Seq2[Source, error] {
return func(yield func(Source, error) bool) {
for _, r := range j.rows {
if !yield(r, nil) {
return
}
}
}
}
func (j *basicJob) Transform(_ context.Context, src Source) (Target, error) {
return Target{ID: src.ID, Name: src.Name + "!"}, nil
}
func (j *basicJob) Load(_ context.Context, batch []Target) error {
for _, r := range batch {
fmt.Printf("loaded: %d %s\n", r.ID, r.Name)
}
return nil
}
func main() {
job := &basicJob{
rows: []Source{
{ID: 1, Name: "Alice"},
{ID: 2, Name: "Bob"},
},
}
err := etl.New[Source, Target, int](job).Run(context.Background())
if err != nil {
fmt.Println("error:", err)
}
}
Output:

loaded: 1 Alice!
loaded: 2 Bob!
func (*Pipeline[S, T, C]) Run ¶
func (p *Pipeline[S, T, C]) Run(ctx context.Context) error
Run executes the pipeline.
Example ¶
package main
import (
"context"
"fmt"
"iter"
"github.com/bjaus/etl"
)
// Source is a source record type for examples.
type Source struct {
ID int
Name string
}
// Target is a target record type for examples.
type Target struct {
ID int
Name string
Category string
Size int
}
type basicJob struct {
rows []Source
}
func (j *basicJob) Extract(_ context.Context, _ *int) iter.Seq2[Source, error] {
return func(yield func(Source, error) bool) {
for _, r := range j.rows {
if !yield(r, nil) {
return
}
}
}
}
func (j *basicJob) Transform(_ context.Context, src Source) (Target, error) {
return Target{ID: src.ID, Name: src.Name + "!"}, nil
}
func (j *basicJob) Load(_ context.Context, batch []Target) error {
for _, r := range batch {
fmt.Printf("loaded: %d %s\n", r.ID, r.Name)
}
return nil
}
func main() {
job := &basicJob{
rows: []Source{
{ID: 1, Name: "Alice"},
{ID: 2, Name: "Bob"},
{ID: 3, Name: "Charlie"},
},
}
err := etl.New[Source, Target, int](job).
WithLoadBatchSize(2).
WithTransformWorkers(2).
Run(context.Background())
if err != nil {
fmt.Println("error:", err)
}
}
Output:

loaded: 1 Alice!
loaded: 2 Bob!
loaded: 3 Charlie!
func (*Pipeline[S, T, C]) WithDrainTimeout ¶
func (p *Pipeline[S, T, C]) WithDrainTimeout(d time.Duration) *Pipeline[S, T, C]
WithDrainTimeout overrides the graceful shutdown timeout. When the parent context is cancelled, the pipeline will wait up to this duration for in-flight operations to complete before forcing an abort. Priority: this method > DrainTimeout interface > DefaultDrainTimeout. Set to 0 to disable graceful shutdown (immediate abort). Negative values are ignored.
func (*Pipeline[S, T, C]) WithLoadBatchSize ¶
func (p *Pipeline[S, T, C]) WithLoadBatchSize(n int) *Pipeline[S, T, C]
WithLoadBatchSize overrides the number of records to batch before loading. Priority: this method > LoadBatchSize interface > DefaultLoadBatchSize. Values less than 1 are ignored.
func (*Pipeline[S, T, C]) WithLoadWorkers ¶
func (p *Pipeline[S, T, C]) WithLoadWorkers(n int) *Pipeline[S, T, C]
WithLoadWorkers overrides the number of concurrent load workers. Priority: this method > LoadWorkers interface > DefaultLoadWorkers. Values less than 1 are ignored.
func (*Pipeline[S, T, C]) WithReportInterval ¶
func (p *Pipeline[S, T, C]) WithReportInterval(n int) *Pipeline[S, T, C]
WithReportInterval overrides how often to report progress (in records). Priority: this method > ProgressReporter interface > DefaultReportInterval. Values less than 1 are ignored.
func (*Pipeline[S, T, C]) WithTransformWorkers ¶
func (p *Pipeline[S, T, C]) WithTransformWorkers(n int) *Pipeline[S, T, C]
WithTransformWorkers overrides the number of concurrent transform workers. Priority: this method > TransformWorkers interface > DefaultTransformWorkers. Values less than 1 are ignored.
type ProgressReporter ¶
type ProgressReporter interface {
ReportInterval
// OnProgress is called periodically during execution.
OnProgress(ctx context.Context, stats *Stats)
}
ProgressReporter receives periodic progress updates during pipeline execution. Implement this interface when you want to log throughput, emit metrics, or update an external dashboard while the pipeline is running.
OnProgress is called each time the cumulative loaded count crosses a ReportInterval boundary. In streaming mode this happens inside load workers; in epoch mode it happens during batch loading within each epoch.
The Stats snapshot passed to OnProgress is safe to read concurrently. Avoid performing blocking I/O inside OnProgress since it runs on a load worker goroutine.
Use ProgressReporter when:
- You want periodic log lines showing loaded/error counts
- You need to push throughput metrics to Prometheus, Datadog, etc.
- You want a heartbeat to detect stalled pipelines
Example:
func (j *MyJob) ReportInterval() int { return 10000 }
func (j *MyJob) OnProgress(ctx context.Context, stats *etl.Stats) {
slog.InfoContext(ctx, "progress",
"extracted", stats.Extracted(),
"loaded", stats.Loaded(),
"errors", stats.Errors(),
)
}
To override the interval at runtime without changing the job struct, use WithReportInterval on the pipeline builder instead.
type ReportInterval ¶
type ReportInterval interface {
// ReportInterval returns how often to call OnProgress (in records loaded).
ReportInterval() int
}
ReportInterval controls how often progress is reported, measured in records loaded. This interface can be implemented independently of ProgressReporter when you want to set the interval via the job struct rather than the builder.
The value can be overridden at runtime via WithReportInterval, which takes precedence over this interface. If neither is set, DefaultReportInterval (10,000 records) is used.
This interface is embedded in ProgressReporter, so implementing ProgressReporter automatically satisfies ReportInterval.
Example:
func (j *MyJob) ReportInterval() int { return 5000 }
type Starter ¶
type Starter interface {
// Start is called before extraction begins.
// The returned context is used for the entire pipeline.
Start(ctx context.Context) context.Context
}
Starter is called before pipeline execution begins. Implement this interface when you need to perform setup work or enrich the context before extraction starts.
Use Starter for:
- Adding values to the context (request IDs, trace spans, logger fields)
- Recording the pipeline start time for elapsed-time metrics
- Acquiring resources that must be held for the pipeline's lifetime
- Logging the start of a pipeline run
The context returned by Start is propagated to all pipeline stages and to Stopper.Stop. This makes it the right place to attach tracing spans or cancellation signals.
Example:
func (j *MyJob) Start(ctx context.Context) context.Context {
j.startedAt = time.Now()
slog.InfoContext(ctx, "pipeline starting")
return ctx
}
Start is called exactly once, before the first call to Extract.
type Stats ¶
type Stats struct {
// contains filtered or unexported fields
}
Stats provides pipeline statistics with thread-safe access. Counter fields use atomic operations for safe concurrent access from worker goroutines.
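To illustrate the atomic-counter pattern the doc describes (this is a standalone sketch, not the etl package's actual implementation): many worker goroutines can increment a counter concurrently without a mutex when the counter uses sync/atomic.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counter mimics the atomic-counter pattern described for Stats:
// increments from many worker goroutines are safe without a mutex.
// Illustrative type, not the etl package's actual implementation.
type counter struct{ n atomic.Int64 }

func (c *counter) Add(delta int64) { c.n.Add(delta) }
func (c *counter) Load() int64     { return c.n.Load() }

func main() {
	var loaded counter
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ { // four concurrent "load workers"
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				loaded.Add(1)
			}
		}()
	}
	wg.Wait()
	fmt.Println(loaded.Load()) // 4000
}
```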
func NewStats ¶
NewStats creates a Stats with initial counter values. Use this when loading checkpoint data from storage.
func (*Stats) MarshalJSON ¶
func (s *Stats) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler for Stats serialization.
func (*Stats) Transformed ¶
Transformed returns the number of records transformed.
func (*Stats) UnmarshalJSON ¶
func (s *Stats) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler for Stats deserialization.
type Stopper ¶
type Stopper interface {
// Stop is called exactly once, after the pipeline Run method returns.
Stop(ctx context.Context, stats *Stats, err error)
}
Stopper is called after pipeline execution completes, regardless of whether the pipeline succeeded, failed, or was shut down gracefully. Implement this interface for cleanup, final logging, or metrics reporting.
Use Stopper for:
- Logging final stats and elapsed time
- Reporting success/failure metrics to an observability system
- Releasing resources acquired in Starter
- Sending completion notifications
The ctx passed to Stop is derived from the pipeline's drain context, which remains valid even during graceful shutdown. This allows Stop to perform cleanup operations (database writes, API calls) after the parent context has been cancelled.
The err parameter is the same error value returned by Run: the unrecoverable error that caused Run to fail (no ErrorHandler, or ErrorHandler returned ActionFail). Errors handled with ActionSkip do not appear in err, even though they increment stats.Errors while the pipeline continues processing.
Example:
func (j *MyJob) Stop(ctx context.Context, stats *etl.Stats, err error) {
elapsed := time.Since(j.startedAt)
if err != nil {
slog.ErrorContext(ctx, "pipeline failed", "error", err, "stats", stats, "elapsed", elapsed)
} else {
slog.InfoContext(ctx, "pipeline complete", "stats", stats, "elapsed", elapsed)
}
}
Stop is called exactly once, after the pipeline Run method returns.
type TransformWorkers ¶
type TransformWorkers interface {
// TransformWorkers returns the number of concurrent transform workers.
TransformWorkers() int
}
TransformWorkers controls worker parallelism for the transform stage. Implement this interface to set the concurrency level from the job struct rather than the pipeline builder.
The value can be overridden at runtime via WithTransformWorkers, which takes precedence. If neither is set, DefaultTransformWorkers (1) is used.
Tuning guidance:
- CPU-bound transforms (parsing, hashing, serialization): set to runtime.NumCPU()
- I/O-bound transforms (HTTP calls, cache lookups): set higher (10-50) depending on the external service's capacity
- Pure field mapping with no I/O: 1 worker is usually sufficient since the overhead of goroutine coordination outweighs the parallelism benefit
Example:
func (j *MyJob) TransformWorkers() int { return runtime.NumCPU() }
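The three tuning cases above can be collapsed into a small heuristic. The helper and its workload labels ("cpu", "io") are hypothetical, shown only to make the guidance concrete; nothing here is part of the etl package.

```go
package main

import (
	"fmt"
	"runtime"
)

// transformWorkersFor applies the tuning guidance above: NumCPU for
// CPU-bound work, a higher fixed fan-out for I/O-bound work, and a
// single worker for trivial field mapping. Illustrative helper with
// made-up workload labels, not part of the etl package.
func transformWorkersFor(workload string) int {
	switch workload {
	case "cpu": // parsing, hashing, serialization
		return runtime.NumCPU()
	case "io": // HTTP calls, cache lookups
		return 25 // mid-range of the 10-50 guidance
	default: // pure field mapping with no I/O
		return 1
	}
}

func main() {
	fmt.Println(transformWorkersFor("io"))
}
```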
type Transformer ¶
type Transformer[S, T any] interface {
	// Transform converts one source record to one target record.
	Transform(ctx context.Context, src S) (T, error)
}
Transformer converts one input record to one output record. Use this for simple 1:1 mappings where each source record produces exactly one target record.
When to use Transformer vs Expander:
- Transformer: field mapping, format conversion, enrichment — one in, one out
- Expander: denormalization, splitting — one source record produces a variable number of output records (including zero, which acts as a filter)
Example:
func (j *MyJob) Transform(ctx context.Context, src Source) (Target, error) {
return Target{
ID: src.ID,
Name: strings.ToUpper(src.Name),
}, nil
}