package framework

Published: Feb 18, 2026 License: Apache-2.0

Documentation

Overview

Package framework provides orchestration for running analysis pipelines.

Constants

const (
	FileModeCommit = 0o160000
	FileModeTree   = 0o040000
	FileModeBlob   = 0o100644
	FileModeExec   = 0o100755
	FileModeLink   = 0o120000
)

File mode constants for git tree entries.
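A tree walker typically switches on these modes to classify entries. A self-contained sketch (the mode values are copied from the constants above; the entryKind helper is illustrative, not part of this package):

```go
package main

import "fmt"

// Git tree-entry modes as octal literals, mirroring the package constants.
const (
	FileModeCommit = 0o160000 // submodule (gitlink)
	FileModeTree   = 0o040000 // directory
	FileModeBlob   = 0o100644 // regular file
	FileModeExec   = 0o100755 // executable file
	FileModeLink   = 0o120000 // symlink
)

// entryKind is a hypothetical helper that names a tree-entry mode.
func entryKind(mode uint32) string {
	switch mode {
	case FileModeCommit:
		return "submodule"
	case FileModeTree:
		return "tree"
	case FileModeBlob:
		return "blob"
	case FileModeExec:
		return "executable"
	case FileModeLink:
		return "symlink"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(entryKind(0o100755)) // executable
}
```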

const DefaultBlobBatchArenaSize = 4 * 1024 * 1024

DefaultBlobBatchArenaSize is the default size of the memory arena for blob loading (4 MiB).

const DefaultDiffCacheSize = 10000

DefaultDiffCacheSize is the default maximum number of diff entries to cache.

const DefaultGlobalCacheSize = 128 * 1024 * 1024

DefaultGlobalCacheSize is the default maximum memory size for the global blob cache (128 MiB).

const (
	// MaxStallRetries is the maximum number of retry attempts before giving up.
	MaxStallRetries = 3
)

Watchdog constants.

Variables

var (
	ErrInvalidSizeFormat = errors.New("invalid size format")
	ErrInvalidGCPercent  = errors.New("invalid GC percent")
)

Sentinel errors for configuration.

var ErrNotParallelizable = errors.New("leaf does not implement Parallelizable")

ErrNotParallelizable is returned when a leaf analyzer does not implement analyze.Parallelizable.

var ErrWorkerStalled = errors.New("worker stalled: CGO call did not return within timeout")

ErrWorkerStalled is returned when a worker does not respond within the timeout and all retry attempts with exponential backoff are exhausted.

Functions

func BackoffDuration

func BackoffDuration(attempt int) time.Duration

BackoffDuration returns the backoff duration for the given retry attempt (0-indexed). Sequence: 0s, 1s, 4s.

func CanResumeWithCheckpoint

func CanResumeWithCheckpoint(totalAnalyzers, checkpointableCount int) bool

CanResumeWithCheckpoint returns true if all analyzers support checkpointing.

func DefaultMemoryBudget

func DefaultMemoryBudget() int64

DefaultMemoryBudget returns a sensible memory budget based on available system memory. Returns min(50% of total RAM, 2 GiB), or 0 if detection fails.

func MaybeStartCPUProfile

func MaybeStartCPUProfile(path string) (func(), error)

MaybeStartCPUProfile starts CPU profiling and writes it to the given file. It returns a stop function that must be deferred; if path is empty, the stop function is a no-op.
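The documented contract (no-op stop function for an empty path, otherwise profile into the file) can be sketched with the standard runtime/pprof package; the real implementation may differ in details such as error wrapping:

```go
package main

import (
	"fmt"
	"os"
	"runtime/pprof"
)

// maybeStartCPUProfile sketches the documented contract: a no-op stop
// function when path is empty, otherwise CPU profiling into the file.
func maybeStartCPUProfile(path string) (func(), error) {
	if path == "" {
		return func() {}, nil // no-op stop
	}
	f, err := os.Create(path)
	if err != nil {
		return nil, err
	}
	if err := pprof.StartCPUProfile(f); err != nil {
		f.Close()
		return nil, err
	}
	return func() {
		pprof.StopCPUProfile()
		f.Close()
	}, nil
}

func main() {
	stop, err := maybeStartCPUProfile(os.Getenv("CPU_PROFILE")) // empty env → no-op
	if err != nil {
		fmt.Println("profile error:", err)
		return
	}
	defer stop() // the stop function must always be deferred
	fmt.Println("profiling wired up")
}
```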

func MaybeWriteHeapProfile

func MaybeWriteHeapProfile(path string, logger *slog.Logger)

MaybeWriteHeapProfile writes a heap profile to the given file. No-op if path is empty. Uses the provided logger for error reporting.

func ParseOptionalSize

func ParseOptionalSize(sizeValue string) (int64, error)

ParseOptionalSize parses a human-readable size string, returning 0 for empty or "0".
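A minimal self-contained sketch of this behavior for a few common suffixes follows; the package likely accepts the full humanize grammar (see ConfigParams), so this is illustrative only:

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

var errInvalidSizeFormat = errors.New("invalid size format")

// parseOptionalSize sketches the documented behavior: 0 for "" or "0",
// otherwise a size with a binary (KiB/MiB/GiB) or decimal (KB/MB/GB) suffix.
func parseOptionalSize(s string) (int64, error) {
	s = strings.TrimSpace(s)
	if s == "" || s == "0" {
		return 0, nil
	}
	units := []struct {
		suffix string
		factor int64
	}{
		{"GiB", 1 << 30}, {"MiB", 1 << 20}, {"KiB", 1 << 10},
		{"GB", 1e9}, {"MB", 1e6}, {"KB", 1e3}, {"B", 1},
	}
	for _, u := range units {
		if strings.HasSuffix(s, u.suffix) {
			n, err := strconv.ParseInt(strings.TrimSpace(strings.TrimSuffix(s, u.suffix)), 10, 64)
			if err != nil {
				return 0, errInvalidSizeFormat
			}
			return n * u.factor, nil
		}
	}
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, errInvalidSizeFormat
	}
	return n, nil
}

func main() {
	for _, s := range []string{"", "256MB", "1GiB"} {
		n, _ := parseOptionalSize(s)
		fmt.Printf("%q -> %d\n", s, n)
	}
}
```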

func RunStreaming

func RunStreaming(
	ctx context.Context,
	runner *Runner,
	commits []*gitlib.Commit,
	analyzers []analyze.HistoryAnalyzer,
	config StreamingConfig,
) (map[analyze.HistoryAnalyzer]analyze.Report, error)

RunStreaming executes the pipeline in streaming chunks with optional checkpoint support. When the memory budget is sufficient and multiple chunks are needed, it enables double-buffered chunk pipelining to overlap pipeline execution with analyzer consumption.
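The double-buffering idea can be sketched with a buffered channel of capacity 1: the producer runs one chunk ahead of the consumer, so pipeline work for chunk N+1 overlaps analyzer consumption of chunk N. This is a simplified model, not the package's actual mechanism:

```go
package main

import "fmt"

// prefetchChunks sketches double-buffered chunk pipelining: a buffered
// channel of capacity 1 lets the producer load one chunk ahead of the
// consumer, overlapping pipeline execution with analyzer consumption.
func prefetchChunks(chunks [][]string, load func([]string) []string) <-chan []string {
	out := make(chan []string, 1) // one chunk of lookahead
	go func() {
		defer close(out)
		for _, c := range chunks {
			out <- load(c) // loads chunk N+1 while N is being consumed
		}
	}()
	return out
}

func main() {
	chunks := [][]string{{"c1", "c2"}, {"c3"}}
	loaded := prefetchChunks(chunks, func(c []string) []string { return c })
	for data := range loaded {
		fmt.Println("consuming", data)
	}
}
```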

func SafeInt

func SafeInt(v uint64) int

SafeInt converts uint64 to int, clamping to maxInt to prevent overflow.

func SafeInt64

func SafeInt64(v uint64) int64

SafeInt64 converts uint64 to int64, clamping to maxInt64 to prevent overflow.
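The clamping behavior both helpers document can be sketched as:

```go
package main

import (
	"fmt"
	"math"
)

// safeInt64 clamps a uint64 into int64 range instead of letting the
// plain conversion wrap negative (mirrors the documented SafeInt64).
func safeInt64(v uint64) int64 {
	if v > math.MaxInt64 {
		return math.MaxInt64
	}
	return int64(v)
}

func main() {
	fmt.Println(safeInt64(42))             // 42
	fmt.Println(safeInt64(math.MaxUint64)) // clamped to math.MaxInt64
}
```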

func StallError

func StallError(reqType string, retries int) error

StallError creates a descriptive error for a stalled worker.

Types

type BlobData

type BlobData struct {
	Commit    *gitlib.Commit
	Index     int
	Changes   gitlib.Changes
	BlobCache map[gitlib.Hash]*gitlib.CachedBlob
	Error     error
}

BlobData holds loaded blob data for a commit.

type BlobPipeline

type BlobPipeline struct {
	SeqWorkerChan  chan<- gitlib.WorkerRequest
	PoolWorkerChan chan<- gitlib.WorkerRequest
	BufferSize     int
	WorkerCount    int
	BlobCache      *GlobalBlobCache
	ArenaSize      int
}

BlobPipeline processes commit batches to load blobs.

func NewBlobPipeline

func NewBlobPipeline(
	seqChan chan<- gitlib.WorkerRequest,
	poolChan chan<- gitlib.WorkerRequest,
	bufferSize int,
	workerCount int,
) *BlobPipeline

NewBlobPipeline creates a new blob pipeline.

func NewBlobPipelineWithCache

func NewBlobPipelineWithCache(
	seqChan chan<- gitlib.WorkerRequest,
	poolChan chan<- gitlib.WorkerRequest,
	bufferSize int,
	workerCount int,
	cache *GlobalBlobCache,
) *BlobPipeline

NewBlobPipelineWithCache creates a new blob pipeline with an optional global blob cache.

func (*BlobPipeline) Process

func (p *BlobPipeline) Process(ctx context.Context, commits <-chan CommitBatch) <-chan BlobData

Process receives commit batches and outputs blob data.

type BudgetSolver

type BudgetSolver func(budgetBytes int64) (CoordinatorConfig, error)

BudgetSolver resolves a memory budget (in bytes) to a CoordinatorConfig.

type CacheStats

type CacheStats struct {
	Hits        int64
	Misses      int64
	Entries     int
	CurrentSize int64
	MaxSize     int64
}

CacheStats holds cache performance metrics.

func (CacheStats) HitRate

func (s CacheStats) HitRate() float64

HitRate returns the cache hit rate (0.0 to 1.0).
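The rate is hits / (hits + misses); returning 0 when there were no lookups at all is an assumption here, not stated by the docs:

```go
package main

import "fmt"

type cacheStats struct {
	Hits, Misses int64
}

// hitRate mirrors the documented contract: hits / (hits + misses).
// The zero-lookup convention (return 0) is an assumption.
func (s cacheStats) hitRate() float64 {
	total := s.Hits + s.Misses
	if total == 0 {
		return 0
	}
	return float64(s.Hits) / float64(total)
}

func main() {
	s := cacheStats{Hits: 75, Misses: 25}
	fmt.Println(s.hitRate()) // 0.75
}
```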

type CheckpointParams

type CheckpointParams struct {
	Enabled   bool
	Dir       string
	Resume    bool
	ClearPrev bool
}

CheckpointParams holds checkpoint-related configuration.

type CommitBatch

type CommitBatch struct {
	// Commits in this batch.
	Commits []*gitlib.Commit

	// StartIndex is the index of the first commit in the full sequence.
	StartIndex int

	// BatchID identifies this batch for ordering.
	BatchID int
}

CommitBatch represents a batch of commits for processing.

type CommitData

type CommitData struct {
	Commit      *gitlib.Commit
	Index       int
	Changes     gitlib.Changes
	BlobCache   map[gitlib.Hash]*gitlib.CachedBlob
	FileDiffs   map[string]plumbing.FileDiffData
	UASTChanges []uast.Change // Pre-computed UAST changes (nil if not computed).
	Error       error
}

CommitData holds all processed data for a commit.

type CommitStreamer

type CommitStreamer struct {
	// BatchSize is the number of commits per batch.
	BatchSize int

	// Lookahead is the number of batches to prefetch.
	Lookahead int
}

CommitStreamer iterates commits and groups them into batches for efficient processing.

func NewCommitStreamer

func NewCommitStreamer() *CommitStreamer

NewCommitStreamer creates a new commit streamer with default settings.

func (*CommitStreamer) Stream

func (s *CommitStreamer) Stream(ctx context.Context, commits []*gitlib.Commit) <-chan CommitBatch

Stream takes a slice of commits and streams them as batches. The output channel is closed when all commits have been sent.
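The batching shape (fixed-size batches, StartIndex, BatchID, channel closed when done) can be sketched generically, with ints standing in for *gitlib.Commit:

```go
package main

import "fmt"

type batch struct {
	Items      []int
	StartIndex int
	BatchID    int
}

// stream groups items into fixed-size batches on a channel and closes
// it when all items have been sent, mirroring CommitStreamer.Stream.
func stream(items []int, batchSize int) <-chan batch {
	out := make(chan batch)
	go func() {
		defer close(out)
		for i, id := 0, 0; i < len(items); i, id = i+batchSize, id+1 {
			end := i + batchSize
			if end > len(items) {
				end = len(items)
			}
			out <- batch{Items: items[i:end], StartIndex: i, BatchID: id}
		}
	}()
	return out
}

func main() {
	for b := range stream([]int{1, 2, 3, 4, 5, 6, 7}, 3) {
		fmt.Printf("batch %d at %d: %v\n", b.BatchID, b.StartIndex, b.Items)
	}
}
```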

func (*CommitStreamer) StreamFromIterator

func (s *CommitStreamer) StreamFromIterator(ctx context.Context, iter *gitlib.CommitIter, limit int) <-chan CommitBatch

StreamFromIterator streams commits from a commit iterator. This is more memory-efficient for large repositories.

func (*CommitStreamer) StreamSingle

func (s *CommitStreamer) StreamSingle(ctx context.Context, commits []*gitlib.Commit) <-chan CommitBatch

StreamSingle streams commits one at a time (batch size = 1). This is compatible with the existing sequential processing model.

type ConfigParams

type ConfigParams struct {
	Workers         int
	BufferSize      int
	CommitBatchSize int
	BlobCacheSize   string
	DiffCacheSize   int
	BlobArenaSize   string
	MemoryBudget    string
	GCPercent       int
	BallastSize     string
}

ConfigParams holds raw CLI parameter values for building a CoordinatorConfig. All size strings use humanize format (e.g. "256MB", "1GiB").

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator orchestrates the full data processing pipeline.

func NewCoordinator

func NewCoordinator(repo *gitlib.Repository, config CoordinatorConfig) *Coordinator

NewCoordinator creates a new coordinator for the repository.

func (*Coordinator) Config

func (c *Coordinator) Config() CoordinatorConfig

Config returns the coordinator configuration.

func (*Coordinator) Process

func (c *Coordinator) Process(ctx context.Context, commits []*gitlib.Commit) <-chan CommitData

Process runs the full pipeline on a slice of commits. After the returned channel is fully drained, call Stats() to retrieve pipeline timing and cache metrics.

func (*Coordinator) ProcessSingle

func (c *Coordinator) ProcessSingle(ctx context.Context, commit *gitlib.Commit, _ int) CommitData

ProcessSingle processes a single commit.

func (*Coordinator) Stats

func (c *Coordinator) Stats() PipelineStats

Stats returns the pipeline stats collected during Process(). Only valid after the channel returned by Process() is fully drained.
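Why draining matters: the producer goroutine finishes recording stats before it closes the output channel, so completion of the range loop is what guarantees the stats are final. A self-contained model of that ordering (miniCoordinator is illustrative, not the package type):

```go
package main

import "fmt"

type stats struct{ Processed int }

type miniCoordinator struct{ stats stats }

// process sends results on a channel and finishes updating stats before
// closing it, so stats are complete only once the channel is drained —
// the same contract Coordinator.Stats documents.
func (c *miniCoordinator) process(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- i
			c.stats.Processed++
		}
	}()
	return out
}

func (c *miniCoordinator) Stats() stats { return c.stats }

func main() {
	c := &miniCoordinator{}
	for range c.process(5) {
	}
	fmt.Println(c.Stats().Processed) // 5, valid only after the drain
}
```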

type CoordinatorConfig

type CoordinatorConfig struct {
	// BatchConfig configures batch sizes for blob and diff operations.
	BatchConfig gitlib.BatchConfig

	// CommitBatchSize is the number of commits to process in each batch.
	CommitBatchSize int

	// Workers is the number of parallel workers for processing.
	Workers int

	// BufferSize is the size of internal channels.
	BufferSize int

	// BlobCacheSize is the maximum size of the global blob cache in bytes.
	// Set to 0 to disable caching.
	BlobCacheSize int64

	// DiffCacheSize is the maximum number of diff results to cache.
	// Set to 0 to disable caching.
	DiffCacheSize int

	// BlobArenaSize is the size of the memory arena for blob loading.
	// Defaults to 16MB if 0.
	BlobArenaSize int

	// UASTPipelineWorkers is the number of goroutines for parallel UAST parsing
	// in the pipeline stage. Set to 0 to disable the UAST pipeline stage.
	UASTPipelineWorkers int

	// LeafWorkers is the number of goroutines for parallel leaf analyzer consumption.
	// Each worker processes a disjoint subset of commits via Fork/Merge.
	// Set to 0 to disable parallel leaf consumption (serial path).
	LeafWorkers int

	// GCPercent controls Go's GC aggressiveness.
	// Set to 0 to use auto mode (200 when system memory > 32 GiB).
	GCPercent int

	// BallastSize reserves bytes in a long-lived slice to smooth GC behavior.
	// Set to 0 to disable ballast allocation.
	BallastSize int64

	// WorkerTimeout is the maximum time to wait for a worker response before
	// considering it stalled. Set to 0 to disable the watchdog.
	WorkerTimeout time.Duration
}

CoordinatorConfig configures the pipeline coordinator.

func BuildConfigFromParams

func BuildConfigFromParams(params ConfigParams, budgetSolver BudgetSolver) (CoordinatorConfig, int64, error)

BuildConfigFromParams builds a CoordinatorConfig from raw parameters. Returns the config and the memory budget in bytes (0 if not set). The budgetSolver is called when params.MemoryBudget is set; pass nil if memory-budget is not supported.

func DefaultCoordinatorConfig

func DefaultCoordinatorConfig() CoordinatorConfig

DefaultCoordinatorConfig returns the default coordinator configuration.

func (CoordinatorConfig) EstimatedOverhead

func (c CoordinatorConfig) EstimatedOverhead() int64

EstimatedOverhead returns the estimated memory consumed by the pipeline infrastructure (runtime, workers, caches, buffers) — everything except analyzer state. This allows the streaming planner to accurately compute how much memory remains for analyzer state growth.

type DiffCache

type DiffCache struct {
	// contains filtered or unexported fields
}

DiffCache provides an LRU cache for diff results. It caches computed diffs to avoid redundant diff computations.

func NewDiffCache

func NewDiffCache(maxEntries int) *DiffCache

NewDiffCache creates a new diff cache with the specified maximum entries.

func (*DiffCache) CacheHits

func (c *DiffCache) CacheHits() int64

CacheHits returns the total cache hit count (atomic, lock-free).

func (*DiffCache) CacheMisses

func (c *DiffCache) CacheMisses() int64

CacheMisses returns the total cache miss count (atomic, lock-free).

func (*DiffCache) Clear

func (c *DiffCache) Clear()

Clear removes all entries from the cache.

func (*DiffCache) Get

func (c *DiffCache) Get(key DiffKey) (plumbing.FileDiffData, bool)

Get retrieves a cached diff result.

func (*DiffCache) Put

func (c *DiffCache) Put(key DiffKey, diff plumbing.FileDiffData)

Put adds a diff result to the cache.
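The Get/Put pair behaves like a classic LRU keyed by the (old, new) hash pair. A minimal single-threaded sketch using container/list (strings stand in for gitlib.Hash and plumbing.FileDiffData; the real cache is concurrency-safe with atomic hit/miss counters):

```go
package main

import (
	"container/list"
	"fmt"
)

type diffKey struct{ oldHash, newHash string }

type entry struct {
	key  diffKey
	diff string // stands in for plumbing.FileDiffData
}

// lruCache is a minimal LRU sketch of DiffCache: Get promotes an entry,
// Put evicts the least recently used entry past maxEntries.
type lruCache struct {
	maxEntries int
	order      *list.List // front = most recently used
	items      map[diffKey]*list.Element
}

func newLRUCache(maxEntries int) *lruCache {
	return &lruCache{maxEntries: maxEntries, order: list.New(), items: map[diffKey]*list.Element{}}
}

func (c *lruCache) Get(k diffKey) (string, bool) {
	el, ok := c.items[k]
	if !ok {
		return "", false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).diff, true
}

func (c *lruCache) Put(k diffKey, diff string) {
	if el, ok := c.items[k]; ok {
		el.Value.(*entry).diff = diff
		c.order.MoveToFront(el)
		return
	}
	c.items[k] = c.order.PushFront(&entry{key: k, diff: diff})
	if c.order.Len() > c.maxEntries {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
}

func main() {
	c := newLRUCache(2)
	c.Put(diffKey{"a", "b"}, "diff-ab")
	c.Put(diffKey{"b", "c"}, "diff-bc")
	c.Get(diffKey{"a", "b"})            // touch: a→b is now most recent
	c.Put(diffKey{"c", "d"}, "diff-cd") // evicts b→c
	_, ok := c.Get(diffKey{"b", "c"})
	fmt.Println(ok) // false
}
```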

func (*DiffCache) Stats

func (c *DiffCache) Stats() DiffCacheStats

Stats returns current cache statistics.

type DiffCacheStats

type DiffCacheStats struct {
	Hits       int64
	Misses     int64
	Entries    int
	MaxEntries int
}

DiffCacheStats holds statistics about diff cache usage.

func (DiffCacheStats) HitRate

func (s DiffCacheStats) HitRate() float64

HitRate returns the cache hit rate as a fraction.

type DiffKey

type DiffKey struct {
	OldHash gitlib.Hash
	NewHash gitlib.Hash
}

DiffKey uniquely identifies a diff computation by blob hashes.

type DiffPipeline

type DiffPipeline struct {
	PoolWorkerChan chan<- gitlib.WorkerRequest
	BufferSize     int
	DiffCache      *DiffCache
}

DiffPipeline processes blob data to compute file diffs.

func NewDiffPipeline

func NewDiffPipeline(workerChan chan<- gitlib.WorkerRequest, bufferSize int) *DiffPipeline

NewDiffPipeline creates a new diff pipeline.

func NewDiffPipelineWithCache

func NewDiffPipelineWithCache(workerChan chan<- gitlib.WorkerRequest, bufferSize int, cache *DiffCache) *DiffPipeline

NewDiffPipelineWithCache creates a new diff pipeline with an optional diff cache.

func (*DiffPipeline) Process

func (p *DiffPipeline) Process(ctx context.Context, blobs <-chan BlobData) <-chan CommitData

Process receives blob data and outputs commit data with computed diffs.

type GlobalBlobCache

type GlobalBlobCache struct {
	// contains filtered or unexported fields
}

GlobalBlobCache provides a cross-commit LRU cache for blob data. It tracks memory usage and evicts least recently used entries when the limit is exceeded.

func NewGlobalBlobCache

func NewGlobalBlobCache(maxSize int64) *GlobalBlobCache

NewGlobalBlobCache creates a new global blob cache with the specified maximum size in bytes.

func (*GlobalBlobCache) CacheHits

func (c *GlobalBlobCache) CacheHits() int64

CacheHits returns the total cache hit count (atomic, lock-free).

func (*GlobalBlobCache) CacheMisses

func (c *GlobalBlobCache) CacheMisses() int64

CacheMisses returns the total cache miss count (atomic, lock-free).

func (*GlobalBlobCache) Clear

func (c *GlobalBlobCache) Clear()

Clear removes all entries from the cache.

func (*GlobalBlobCache) Get

func (c *GlobalBlobCache) Get(hash gitlib.Hash) *gitlib.CachedBlob

Get retrieves a blob from the cache. Returns nil if not found.

func (*GlobalBlobCache) GetMulti

func (c *GlobalBlobCache) GetMulti(hashes []gitlib.Hash) (found map[gitlib.Hash]*gitlib.CachedBlob, missing []gitlib.Hash)

GetMulti retrieves multiple blobs from the cache. Returns a map of found blobs and a slice of missing hashes.
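The found/missing partition can be modeled over a plain map (hash strings stand in for gitlib.Hash, byte slices for *gitlib.CachedBlob; the real cache also updates LRU order and hit/miss counters):

```go
package main

import "fmt"

// getMulti sketches the documented partition: a map of found blobs plus
// a slice of the hashes that were not in the cache.
func getMulti(cache map[string][]byte, hashes []string) (found map[string][]byte, missing []string) {
	found = make(map[string][]byte)
	for _, h := range hashes {
		if b, ok := cache[h]; ok {
			found[h] = b
		} else {
			missing = append(missing, h)
		}
	}
	return found, missing
}

func main() {
	cache := map[string][]byte{"aaaa": []byte("blob A")}
	found, missing := getMulti(cache, []string{"aaaa", "bbbb"})
	fmt.Println(len(found), missing) // 1 [bbbb]
}
```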

func (*GlobalBlobCache) Put

func (c *GlobalBlobCache) Put(hash gitlib.Hash, blob *gitlib.CachedBlob)

Put adds a blob to the cache. If the cache exceeds maxSize, LRU entries are evicted.

func (*GlobalBlobCache) PutMulti

func (c *GlobalBlobCache) PutMulti(blobs map[gitlib.Hash]*gitlib.CachedBlob)

PutMulti adds multiple blobs to the cache.

func (*GlobalBlobCache) Stats

func (c *GlobalBlobCache) Stats() CacheStats

Stats returns cache statistics.

type PipelineStats

type PipelineStats struct {
	BlobDuration time.Duration
	DiffDuration time.Duration
	UASTDuration time.Duration

	BlobCacheHits   int64
	BlobCacheMisses int64
	DiffCacheHits   int64
	DiffCacheMisses int64
}

PipelineStats holds cumulative pipeline metrics for a single Coordinator run. Populated during Process(); valid after the returned channel is fully drained.

func (*PipelineStats) Add

func (s *PipelineStats) Add(other PipelineStats)

Add accumulates another PipelineStats into this one (cross-chunk aggregation).

type Runner

type Runner struct {
	Repo      *gitlib.Repository
	RepoPath  string
	Analyzers []analyze.HistoryAnalyzer
	Config    CoordinatorConfig

	// Tracer is the OTel tracer for creating pipeline spans.
	// When nil, falls back to otel.Tracer("codefang").
	Tracer trace.Tracer

	// CoreCount is the number of leading analyzers in the Analyzers slice that are
	// core (plumbing) analyzers. These run sequentially. Analyzers after CoreCount
	// are leaf analyzers that can be parallelized via Fork/Merge.
	// Set to 0 to disable parallel leaf consumption.
	CoreCount int
	// contains filtered or unexported fields
}

Runner orchestrates multiple HistoryAnalyzers over a commit sequence. It always uses the Coordinator pipeline (batch blob load + batch diff in C).

func NewRunner

func NewRunner(repo *gitlib.Repository, repoPath string, analyzers ...analyze.HistoryAnalyzer) *Runner

NewRunner creates a new Runner for the given repository and analyzers. Uses DefaultCoordinatorConfig(). Use NewRunnerWithConfig for custom configuration.

func NewRunnerWithConfig

func NewRunnerWithConfig(
	repo *gitlib.Repository,
	repoPath string,
	config CoordinatorConfig,
	analyzers ...analyze.HistoryAnalyzer,
) *Runner

NewRunnerWithConfig creates a new Runner with custom coordinator configuration.

func (*Runner) Finalize

func (runner *Runner) Finalize() (map[analyze.HistoryAnalyzer]analyze.Report, error)

Finalize finalizes all analyzers and returns their reports.

func (*Runner) Initialize

func (runner *Runner) Initialize() error

Initialize initializes all analyzers. Call once before processing chunks.

func (*Runner) ProcessChunk

func (runner *Runner) ProcessChunk(ctx context.Context, commits []*gitlib.Commit, indexOffset, chunkIndex int) (PipelineStats, error)

ProcessChunk processes a chunk of commits without Initialize/Finalize. Use this for streaming mode where Initialize is called once at start and Finalize once at end. The indexOffset is added to the commit index to maintain correct ordering across chunks. chunkIndex is the zero-based chunk number used for span naming.

func (*Runner) ProcessChunkFromData

func (runner *Runner) ProcessChunkFromData(ctx context.Context, data []CommitData, indexOffset, chunkIndex int) (PipelineStats, error)

ProcessChunkFromData consumes pre-fetched CommitData through analyzers, bypassing Coordinator creation. Used by double-buffered chunk pipelining where the pipeline has already run and collected data. Returns zero PipelineStats since the real stats come from the prefetch Coordinator.

func (*Runner) Run

func (runner *Runner) Run(ctx context.Context, commits []*gitlib.Commit) (map[analyze.HistoryAnalyzer]analyze.Report, error)

Run executes all analyzers over the given commits: initialize, consume each commit via pipeline, then finalize.
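The initialize → consume → finalize shape can be sketched with a toy analyzer interface (the real analyze.HistoryAnalyzer interface is richer; commit strings stand in for *gitlib.Commit and pipeline data):

```go
package main

import "fmt"

// analyzer stands in for analyze.HistoryAnalyzer with just the
// lifecycle the Runner docs describe.
type analyzer interface {
	Initialize() error
	Consume(commit string) error
	Finalize() string
}

type counter struct{ n int }

func (c *counter) Initialize() error           { c.n = 0; return nil }
func (c *counter) Consume(commit string) error { c.n++; return nil }
func (c *counter) Finalize() string            { return fmt.Sprintf("%d commits", c.n) }

// run mirrors Runner.Run's documented shape: initialize all analyzers,
// feed every commit to each in order, then finalize and collect reports.
func run(commits []string, analyzers ...analyzer) (map[analyzer]string, error) {
	for _, a := range analyzers {
		if err := a.Initialize(); err != nil {
			return nil, err
		}
	}
	for _, c := range commits {
		for _, a := range analyzers {
			if err := a.Consume(c); err != nil {
				return nil, err
			}
		}
	}
	reports := make(map[analyzer]string)
	for _, a := range analyzers {
		reports[a] = a.Finalize()
	}
	return reports, nil
}

func main() {
	a := &counter{}
	reports, _ := run([]string{"c1", "c2", "c3"}, a)
	fmt.Println(reports[a]) // 3 commits
}
```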

type StreamingConfig

type StreamingConfig struct {
	MemBudget     int64
	Checkpoint    CheckpointParams
	RepoPath      string
	AnalyzerNames []string

	// Logger is the structured logger for streaming operations.
	// When nil, a discard logger is used.
	Logger *slog.Logger

	// DebugTrace enables 100% trace sampling for debugging.
	DebugTrace bool

	// AnalysisMetrics records analysis-specific OTel metrics (commits, chunks,
	// cache stats). Nil-safe: when nil, no metrics are recorded.
	AnalysisMetrics *observability.AnalysisMetrics
}

StreamingConfig holds configuration for streaming pipeline execution.

type UASTPipeline

type UASTPipeline struct {
	Parser     *uast.Parser
	Workers    int
	BufferSize int
}

UASTPipeline pre-computes UAST changes for each commit in the pipeline, enabling cross-commit parallelism. It sits between DiffPipeline and the serial analyzer consumption loop.

func NewUASTPipeline

func NewUASTPipeline(parser *uast.Parser, workers, bufferSize int) *UASTPipeline

NewUASTPipeline creates a new UAST pipeline stage.

func (*UASTPipeline) Process

func (p *UASTPipeline) Process(ctx context.Context, diffs <-chan CommitData) <-chan CommitData

Process receives commit data with blobs and diffs, and adds pre-computed UAST changes. Multiple commits are processed concurrently by worker goroutines. Output order matches input order via a slot-based approach.

type Watchdog

type Watchdog struct {
	// contains filtered or unexported fields
}

Watchdog monitors worker pool health and recreates stalled workers. It wraps the pool worker channel with timeout-aware dispatch and provides exponential backoff retry on stall detection.

func NewWatchdog

func NewWatchdog(cfg WatchdogConfig) *Watchdog

NewWatchdog creates a Watchdog that monitors the worker pool. Returns nil if timeout is zero (disabled).

func (*Watchdog) StalledCount

func (wd *Watchdog) StalledCount() int

StalledCount returns the total number of stall events observed.

func (*Watchdog) WaitForDiffResponse

func (wd *Watchdog) WaitForDiffResponse(ch <-chan gitlib.DiffBatchResponse) (gitlib.DiffBatchResponse, bool)

WaitForDiffResponse waits for a diff response with timeout.

func (*Watchdog) WaitForResponse

func (wd *Watchdog) WaitForResponse(ch <-chan gitlib.BlobBatchResponse) (gitlib.BlobBatchResponse, bool)

WaitForResponse waits for a response on the given channel with timeout. Returns true if the response was received, false if the worker stalled. On stall, it recreates one pool worker and returns false so the caller can retry.

func (*Watchdog) WaitForTreeDiffResponse

func (wd *Watchdog) WaitForTreeDiffResponse(ch <-chan gitlib.TreeDiffResponse) (gitlib.TreeDiffResponse, bool)

WaitForTreeDiffResponse waits for a tree diff response with timeout.

type WatchdogConfig

type WatchdogConfig struct {
	Span         trace.Span
	RepoPath     string
	Config       CoordinatorConfig
	PoolRepos    []*gitlib.Repository
	PoolWorkers  []*gitlib.Worker
	PoolRequests chan gitlib.WorkerRequest
	Logger       *slog.Logger
}

WatchdogConfig holds parameters for creating a Watchdog.
