Documentation
¶
Index ¶
- Constants
- func BatchLoadExpectedChildrenByDSTIDs(database *db.DB, dstParentIDs []string, dstIDToPath map[string]string) (expectedFoldersMap map[string][]types.Folder, ...)
- func BatchLoadRetryDstCleanup(database *db.DB, srcFolderIDs []string) (map[string]*RetryDstCleanup, error)
- func BuildExpectedMapsFromDstWithChildren(dstBatch []db.FetchResult, childrenByDstID map[string][]*db.NodeState) (expectedFoldersMap map[string][]types.Folder, ...)
- func SeedRootTask(queueType string, rootFolder types.Folder, rootNodeID string, database *db.DB) error
- func SeedRootTaskWithSrcID(queueType string, rootFolder types.Folder, rootNodeID string, srcID string, ...) error
- func SeedRootTasks(srcRoot types.Folder, dstRoot types.Folder, database *db.DB) error
- type ChildResult
- type CompletionCheckOptions
- type CopyTask
- type CopyWorker
- type ExternalQueueMetrics
- type InternalQueueMetrics
- type ProgressWatchdog
- type Queue
- func (q *Queue) Add(task *TaskBase) bool
- func (q *Queue) AddWorker(worker Worker)
- func (q *Queue) AdvanceCopyRound()
- func (q *Queue) AdvanceTraversalRound()
- func (q *Queue) CheckCopyCompletion(currentRound int) bool
- func (q *Queue) CheckTraversalCompletion(currentRound int) bool
- func (q *Queue) Clear()
- func (q *Queue) Close()
- func (q *Queue) CompleteCopyTask(task *TaskBase, executionDelta time.Duration)
- func (q *Queue) CompleteTraversalTask(task *TaskBase, executionDelta time.Duration)
- func (q *Queue) EnsureRoundExpectedFromStats()
- func (q *Queue) FailCopyTask(task *TaskBase, executionDelta time.Duration)
- func (q *Queue) FailTraversalTask(task *TaskBase, executionDelta time.Duration)
- func (q *Queue) GetAverageExecutionTime() time.Duration
- func (q *Queue) GetBytesTransferredTotal() int64
- func (q *Queue) GetCopyPass() int
- func (q *Queue) GetExecutionTimeBufferSize() int
- func (q *Queue) GetExecutionTimeDeltas() []time.Duration
- func (q *Queue) GetFilesCreatedTotal() int64
- func (q *Queue) GetFilesDiscoveredTotal() int64
- func (q *Queue) GetFoldersCreatedTotal() int64
- func (q *Queue) GetFoldersDiscoveredTotal() int64
- func (q *Queue) GetLastPullWasPartial() bool
- func (q *Queue) GetMode() QueueMode
- func (q *Queue) GetPendingCount() int
- func (q *Queue) GetRound() int
- func (q *Queue) GetRoundStats(round int) *RoundStats
- func (q *Queue) GetTotalCompleted() int
- func (q *Queue) GetTotalDiscovered() int64
- func (q *Queue) GetTotalFailed() int
- func (q *Queue) GetWorkerCount() int
- func (q *Queue) InProgressCount() int
- func (q *Queue) InitializeCopyWithContext(database *db.DB, srcAdapter, dstAdapter types.FSAdapter, ...)
- func (q *Queue) InitializeWithContext(database *db.DB, adapter types.FSAdapter, shutdownCtx context.Context)
- func (q *Queue) IsExhausted() bool
- func (q *Queue) IsPaused() bool
- func (q *Queue) Lease() *TaskBase
- func (q *Queue) Name() string
- func (q *Queue) Pause()
- func (q *Queue) PullCopyTasks(force bool)
- func (q *Queue) PullRetryTasks(force bool)
- func (q *Queue) PullTasksIfNeeded(force bool)
- func (q *Queue) PullTraversalTasks(force bool)
- func (q *Queue) ReportTaskResult(task *TaskBase, result TaskExecutionResult)
- func (q *Queue) Resume()
- func (q *Queue) Run()
- func (q *Queue) SetCopyPass(pass int)
- func (q *Queue) SetCopyResumeDstExistenceWindow(anchorPass, anchorRound int)
- func (q *Queue) SetMaxKnownDepth(depth int)
- func (q *Queue) SetMode(mode QueueMode)
- func (q *Queue) SetObserver(observer *QueueObserver)
- func (q *Queue) SetRound(round int)
- func (q *Queue) SetShutdownContext(ctx context.Context)
- func (q *Queue) SetState(state QueueState)
- func (q *Queue) SetStatsChannel(ch chan QueueStats)
- func (q *Queue) SetTraversalCacheLoaded(loaded bool)
- func (q *Queue) SetWorkers(workers []Worker)
- func (q *Queue) Shutdown()
- func (q *Queue) State() QueueState
- func (q *Queue) Stats() QueueStats
- func (q *Queue) TotalTracked() int
- type QueueCoordinator
- func (c *QueueCoordinator) CanDstStartRound(targetRound int) bool
- func (c *QueueCoordinator) GetRound(which string) int
- func (c *QueueCoordinator) IsCompleted(queueType string) bool
- func (c *QueueCoordinator) MarkCompleted(which string)
- func (c *QueueCoordinator) UpdateRound(which string, round int)
- func (c *QueueCoordinator) WaitSealBackpressure(_ string, round int, database *db.DB) bool
- type QueueMode
- type QueueObserver
- type QueueState
- type QueueStateSnapshot
- type QueueStats
- type QueueWatchdog
- type RetryDstChild
- type RetryDstCleanup
- type RoundInfo
- type RoundStats
- type SrcNodeMeta
- type TaskBase
- type TaskExecutionResult
- type TraversalWorker
- type UploadTask
- type Worker
Constants ¶
const (
	TaskTypeSrcTraversal = "src-traversal"
	TaskTypeDstTraversal = "dst-traversal"
	TaskTypeUpload       = "upload"
	TaskTypeCopy         = "copy"
	TaskTypeCopyFolder   = "copy-folder" // Copy phase: create folder
	TaskTypeCopyFile     = "copy-file"   // Copy phase: copy file with streaming
)
Task types
Variables ¶
This section is empty.
Functions ¶
func BatchLoadExpectedChildrenByDSTIDs ¶
func BatchLoadExpectedChildrenByDSTIDs(database *db.DB, dstParentIDs []string, dstIDToPath map[string]string) ( expectedFoldersMap map[string][]types.Folder, expectedFilesMap map[string][]types.File, srcIDMap map[string]map[string]string, srcIDToMeta map[string]SrcNodeMeta, err error, )
BatchLoadExpectedChildrenByDSTIDs loads SRC children for the given DST folder IDs by joining on parent_path = dst.path. Prefer ListDstBatchWithSrcChildren + BuildExpectedMapsFromDstWithChildren for keyset-based DST pull (no IN query). Returns expectedFoldersMap, expectedFilesMap (keyed by DST task ID), srcIDMap (Type+DisplayName -> SRC node ID per DST ID), and srcIDToMeta (SRC ID -> meta).
func BatchLoadRetryDstCleanup ¶
func BatchLoadRetryDstCleanup(database *db.DB, srcFolderIDs []string) (map[string]*RetryDstCleanup, error)
BatchLoadRetryDstCleanup loads DST counterpart and children meta for each SRC folder ID (for retry mode DST cleanup).
func BuildExpectedMapsFromDstWithChildren ¶
func BuildExpectedMapsFromDstWithChildren(dstBatch []db.FetchResult, childrenByDstID map[string][]*db.NodeState) ( expectedFoldersMap map[string][]types.Folder, expectedFilesMap map[string][]types.File, srcIDMap map[string]map[string]string, srcIDToMeta map[string]SrcNodeMeta, )
BuildExpectedMapsFromDstWithChildren builds expectedFoldersMap, expectedFilesMap, srcIDMap, and srcIDToMeta from a DST batch and its SRC children (e.g. from ListDstBatchWithSrcChildren). Keyed by DST node ID.
func SeedRootTask ¶
func SeedRootTask(queueType string, rootFolder types.Folder, rootNodeID string, database *db.DB) error
SeedRootTask inserts the initial root folder task into DuckDB to kickstart traversal. For src: sets traversal_status='Pending' and copy_status='Successful' (root is not a copy task). For dst: sets traversal_status='Pending'. Ensures the stats bucket exists and is updated with root task counts. rootNodeID should be a deterministic ID generated by db.DeterministicNodeID().
func SeedRootTaskWithSrcID ¶
func SeedRootTaskWithSrcID(queueType string, rootFolder types.Folder, rootNodeID string, srcID string, database *db.DB) error
SeedRootTaskWithSrcID inserts the initial root folder task into DuckDB with a pre-set SrcID. This is used for DST root nodes that need to be matched to SRC root nodes. rootNodeID should be a deterministic ID generated by db.DeterministicNodeID().
func SeedRootTasks ¶
SeedRootTasks is a convenience function to seed both src and dst root tasks at once. Writes root tasks to DuckDB. Generates deterministic IDs for both root nodes and matches DST root to SRC root.
Types ¶
type ChildResult ¶
type ChildResult struct {
Folder types.Folder // Folder info (if folder)
File types.File // File info (if file)
Status string // "pending", "successful", "missing", "not_on_src"
IsFile bool // true if this is a file, false if folder
SrcID string // ULID of corresponding SRC node (for DST nodes only, set during matching)
SrcCopyStatus string // Copy status to update on SRC node (if SrcID is set and match found): "pending" or "successful", empty if no update needed
}
ChildResult represents a discovered child node with its traversal status.
type CompletionCheckOptions ¶
type CompletionCheckOptions struct {
CheckRoundComplete bool // Check if current round is complete
CheckFinalCompletion bool // Check if traversal/copy is complete (mode-specific logic in CheckTraversalCompletion / CheckCopyCompletion)
AdvanceRoundIfComplete bool // Advance to next round if current round is complete
}
CompletionCheckOptions configures what actions to take during completion checks.
type CopyTask ¶
type CopyTask struct {
TaskBase
SrcId string
DstId string
DstCtx types.ServiceContext
}
CopyTask represents a generic copy operation.
type CopyWorker ¶
type CopyWorker struct {
// contains filtered or unexported fields
}
CopyWorker executes copy tasks by creating folders or streaming files from source to destination. Each worker runs independently in its own goroutine, continuously polling the queue for work.
func NewCopyWorker ¶
func NewCopyWorker( id string, queue *Queue, srcAdapter types.FSAdapter, dstAdapter types.FSAdapter, shutdownCtx context.Context, ) *CopyWorker
NewCopyWorker creates a worker that executes copy tasks. shutdownCtx is optional - if provided, the worker will check for cancellation and exit on shutdown.
func (*CopyWorker) Run ¶
func (w *CopyWorker) Run()
Run is the main worker loop. It continuously polls the queue for tasks. When a task is found, it leases it, executes it, and reports the result. When no work is available or queue is paused, it briefly sleeps before polling again. When queue is exhausted, the worker exits.
type ExternalQueueMetrics ¶
type ExternalQueueMetrics struct {
// Monotonic counters (traversal phase)
FilesDiscoveredTotal int64 `json:"files_discovered_total"`
FoldersDiscoveredTotal int64 `json:"folders_discovered_total"`
// EMA-smoothed rates (2-5 second window) - traversal phase
DiscoveryRateItemsPerSec float64 `json:"discovery_rate_items_per_sec"`
// Verification counts (for O(1) stats bucket lookups)
TotalDiscovered int64 `json:"total_discovered"` // files + folders
TotalPending int `json:"total_pending"` // pending across all rounds (from DB)
TotalFailed int `json:"total_failed"` // failed across all rounds
// Copy phase metrics (monotonic counters)
Folders int64 `json:"folders"` // Total folders created
Files int64 `json:"files"` // Total files created
Total int64 `json:"total"` // Total items (folders + files)
Bytes int64 `json:"bytes"` // Total bytes transferred
// Copy phase rates (EMA-smoothed)
ItemsPerSecond float64 `json:"items_per_second"` // Items/sec (folders + files)
BytesPerSecond float64 `json:"bytes_per_second"` // Bytes/sec
// Current state (for API)
QueueStats
Round int `json:"round"`
}
ExternalQueueMetrics contains user-facing metrics published to DuckDB for API access.
type InternalQueueMetrics ¶
type InternalQueueMetrics struct {
// State-based time tracking (additive counters)
TimeProcessing time.Duration
TimeWaitingOnQueue time.Duration
TimeWaitingOnFS time.Duration
TimeRateLimited time.Duration
TimePausedRoundBoundary time.Duration
TimeIdleNoWork time.Duration
// Capacity metrics
TasksCompletedWhileActive int64
ActiveProcessingTime time.Duration
// Utilization metrics
WallClockTime time.Duration
LastState QueueState
LastStateChangeTime time.Time
}
InternalQueueMetrics contains control system metrics stored in memory for autoscaling decisions.
type ProgressWatchdog ¶ added in v0.1.41
type ProgressWatchdog struct {
// contains filtered or unexported fields
}
ProgressWatchdog detects stalled tasks by requiring progress (Beat) within a timeout. If no progress occurs, it cancels the context so adapter operations abort.
func NewProgressWatchdog ¶ added in v0.1.41
func NewProgressWatchdog(parent context.Context, timeout time.Duration) (*ProgressWatchdog, context.Context)
NewProgressWatchdog creates a watchdog and a child context. When no Beat() occurs within timeout, the context is cancelled. Call Stop() when the task ends to release resources.
func (*ProgressWatchdog) Beat ¶ added in v0.1.41
func (w *ProgressWatchdog) Beat()
Beat records progress. Call whenever meaningful work completes.
func (*ProgressWatchdog) Stop ¶ added in v0.1.41
func (w *ProgressWatchdog) Stop()
Stop cancels the context and stops the monitor goroutine.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue maintains round-based task queues for BFS traversal coordination. It handles task leasing, retry logic, and cross-queue task propagation. All operational state lives in DuckDB, flushed via per-queue buffers.
func NewQueue ¶
func NewQueue(name string, maxRetries int, workerCount int, coordinator *QueueCoordinator) *Queue
NewQueue creates a new Queue instance.
func (*Queue) Add ¶
Add enqueues a task into the pending buffer. Returns false if the task is nil, has an empty ID, or is already in progress.
func (*Queue) AddWorker ¶
AddWorker registers a worker with this queue for reference. Workers manage their own lifecycle - this is just for tracking/debugging.
func (*Queue) AdvanceCopyRound ¶
func (q *Queue) AdvanceCopyRound()
AdvanceCopyRound handles copy-specific round advancement logic. Round completion is determined by lastPullWasPartial (memory/keyset only); we never query the DB for in-round advancement. When called, the current round has just completed - we always advance to currentRound+1.
func (*Queue) AdvanceTraversalRound ¶
func (q *Queue) AdvanceTraversalRound()
AdvanceTraversalRound handles traversal/retry-specific round advancement logic. For traversal/retry modes, simply increments the round by 1.
func (*Queue) CheckCopyCompletion ¶
CheckCopyCompletion checks if the copy phase should switch passes or complete. Only called when we're past maxKnownDepth - the pass has exhausted itself round-by-round. Trust the per-round logic; no re-checking of pending/inProgress/wasFirstPull.
func (*Queue) CheckTraversalCompletion ¶
CheckTraversalCompletion checks whether the traversal/retry phase should complete. The check is DB-backed and requires all of the following: no pending tasks at depth (from GetPendingTraversalCountAtDepthFromLive), a pull has been attempted, and the first pull returned 0 tasks.
func (*Queue) Clear ¶
func (q *Queue) Clear()
Clear removes all tasks from DuckDB and resets in-progress tracking. Note: This is a destructive operation - use with caution.
func (*Queue) Close ¶
func (q *Queue) Close()
Close stops the queue and cleans up its resources (the stats publishing loop and the output buffer), then sets the state to Stopped unless it is already Completed or Stopped.
func (*Queue) CompleteCopyTask ¶
CompleteCopyTask handles successful completion of copy tasks. Updates copy status, creates DST node entry, and updates join-lookup mapping.
func (*Queue) CompleteTraversalTask ¶
CompleteTraversalTask handles successful completion of traversal/retry tasks. This includes child discovery, status updates, and buffer operations.
func (*Queue) EnsureRoundExpectedFromStats ¶
func (q *Queue) EnsureRoundExpectedFromStats()
EnsureRoundExpectedFromStats sets Expected for the current round from the stats bucket (O(1) lookup). Call after SetRound (e.g. on init or resume) so Expected reflects actual pending count and survives restarts.
func (*Queue) FailCopyTask ¶
FailCopyTask handles failure of copy tasks. Updates copy status to failed if max retries exceeded, or back to pending if retrying.
func (*Queue) FailTraversalTask ¶
FailTraversalTask handles failure of traversal/retry tasks. Retries up to maxRetries, then marks as failed.
func (*Queue) GetAverageExecutionTime ¶
GetAverageExecutionTime returns the current average task execution time.
func (*Queue) GetBytesTransferredTotal ¶
GetBytesTransferredTotal returns the total bytes transferred during copy phase.
func (*Queue) GetCopyPass ¶
GetCopyPass returns the current copy pass.
func (*Queue) GetExecutionTimeBufferSize ¶
GetExecutionTimeBufferSize returns the current size of the execution time buffer.
func (*Queue) GetExecutionTimeDeltas ¶
GetExecutionTimeDeltas returns a copy of the execution time deltas buffer.
func (*Queue) GetFilesCreatedTotal ¶
GetFilesCreatedTotal returns the total files created during copy phase.
func (*Queue) GetFilesDiscoveredTotal ¶
func (*Queue) GetFoldersCreatedTotal ¶
GetFoldersCreatedTotal returns the total folders created during copy phase.
func (*Queue) GetFoldersDiscoveredTotal ¶
func (*Queue) GetLastPullWasPartial ¶ added in v0.1.41
func (*Queue) GetPendingCount ¶
GetPendingCount returns the number of tasks in the pending buffer.
func (*Queue) GetRoundStats ¶
func (q *Queue) GetRoundStats(round int) *RoundStats
GetRoundStats returns the statistics for a specific round. Returns nil if the round has no stats yet.
func (*Queue) GetTotalCompleted ¶
GetTotalCompleted returns the total number of completed tasks across all rounds.
func (*Queue) GetTotalDiscovered ¶
GetTotalDiscovered returns the total number of items discovered (files + folders).
func (*Queue) GetTotalFailed ¶
GetTotalFailed returns the total number of failed tasks across all rounds.
func (*Queue) GetWorkerCount ¶ added in v0.1.41
GetWorkerCount returns the number of workers registered with this queue.
func (*Queue) InProgressCount ¶
InProgressCount returns the number of tasks currently being executed.
func (*Queue) InitializeCopyWithContext ¶
func (q *Queue) InitializeCopyWithContext(database *db.DB, srcAdapter, dstAdapter types.FSAdapter, shutdownCtx context.Context)
InitializeCopyWithContext sets up a copy queue with both source and destination adapters. This is specifically for copy mode which requires both adapters.
func (*Queue) InitializeWithContext ¶
func (q *Queue) InitializeWithContext(database *db.DB, adapter types.FSAdapter, shutdownCtx context.Context)
InitializeWithContext sets up the queue with DuckDB, context, and filesystem adapter references. Creates and starts workers immediately - they'll poll for tasks autonomously. shutdownCtx is optional - if provided, workers will check for cancellation and exit on shutdown. For copy mode, InitializeCopyWithContext should be used instead to provide both adapters.
func (*Queue) IsExhausted ¶
IsExhausted returns true if the queue has finished all traversal or has been stopped.
func (*Queue) Lease ¶
Lease attempts to lease a task for execution atomically. Returns nil if no tasks are available or if the queue is paused or completed.
func (*Queue) Pause ¶
func (q *Queue) Pause()
Pause pauses the queue (workers will not lease new tasks).
func (*Queue) PullCopyTasks ¶
PullCopyTasks pulls copy tasks from DuckDB for the current round. Pulls from SRC copy status buckets, filters by pass (folders vs files), and skips round 0. Uses getter/setter methods - no direct mutex access.
func (*Queue) PullRetryTasks ¶
PullRetryTasks pulls retry tasks from failed/pending status buckets. Checks maxKnownDepth and scans all known levels up to maxKnownDepth, then uses normal traversal logic for deeper levels. Uses getter/setter methods - no direct mutex access.
func (*Queue) PullTasksIfNeeded ¶
func (*Queue) PullTraversalTasks ¶
PullTraversalTasks refills the queue from DuckDB for the current round (ID-offset pagination, ~10K batch). SRC: ListNodesByDepthKeyset; DST: ListDstBatchWithSrcChildren (join for expected children). Pushed directly to queue.
func (*Queue) ReportTaskResult ¶
func (q *Queue) ReportTaskResult(task *TaskBase, result TaskExecutionResult)
ReportTaskResult reports the result of a task execution and handles post-processing. This is the event-driven entry point that replaces separate Complete()/Fail() calls. After processing the result, it checks if we need to pull more tasks or advance rounds.
func (*Queue) Run ¶
func (q *Queue) Run()
Run is the main queue coordination loop. It has an outer loop for rounds and an inner loop for each round. The outer loop checks coordinator gates before starting each round (DST only). The inner loop processes tasks until the round is complete.
func (*Queue) SetCopyPass ¶
SetCopyPass sets the current copy pass.
func (*Queue) SetCopyResumeDstExistenceWindow ¶ added in v0.1.41
SetCopyResumeDstExistenceWindow enables the copy worker dst ListChildren precheck until the anchor pass+round is left (see AdvanceCopyRound). Only for normal copy mode (not copy-retry); call from RunCopyPhase when resuming a partially completed copy (Successful>0 and Pending>0 in status events).
func (*Queue) SetMaxKnownDepth ¶
SetMaxKnownDepth sets the maximum depth for traversal/copy. Set to -1 to auto-detect.
func (*Queue) SetObserver ¶
func (q *Queue) SetObserver(observer *QueueObserver)
SetObserver registers this queue with an observer for DuckDB stats publishing. The observer will poll this queue directly for statistics.
func (*Queue) SetShutdownContext ¶
SetShutdownContext sets the shutdown context for the queue.
func (*Queue) SetState ¶
func (q *Queue) SetState(state QueueState)
SetState sets the queue lifecycle state.
func (*Queue) SetStatsChannel ¶
func (q *Queue) SetStatsChannel(ch chan QueueStats)
SetStatsChannel sets the channel for publishing queue statistics for UDP logging. The queue will periodically publish stats to this channel.
func (*Queue) SetTraversalCacheLoaded ¶
SetTraversalCacheLoaded sets whether we have completed the first pull for the current round. Until true, CheckTraversalCompletion returns false so the queue does not complete before the first pull.
func (*Queue) SetWorkers ¶
SetWorkers sets the workers associated with this queue.
func (*Queue) Shutdown ¶
func (q *Queue) Shutdown()
Shutdown stops the stats publishing loop and cleans up resources.
func (*Queue) State ¶
func (q *Queue) State() QueueState
State returns the current queue lifecycle state.
func (*Queue) Stats ¶
func (q *Queue) Stats() QueueStats
Stats returns a snapshot of the queue's current state. Uses only in-memory counters - no database queries.
func (*Queue) TotalTracked ¶
TotalTracked returns the total number of tasks across all rounds (pending + in-progress).
type QueueCoordinator ¶
type QueueCoordinator struct {
// contains filtered or unexported fields
}
QueueCoordinator manages round advancement gates for dual-BFS traversal. It enforces: DST cannot advance to round N until SRC has completed rounds N and N+1 (or SRC traversal is done). SRC is not level-gated relative to DST; frontier is streamed to the database in chunks.
func NewQueueCoordinator ¶
func NewQueueCoordinator() *QueueCoordinator
NewQueueCoordinator creates a new coordinator.
func (*QueueCoordinator) CanDstStartRound ¶
func (c *QueueCoordinator) CanDstStartRound(targetRound int) bool
CanDstStartRound returns true if DST can start processing the specified round. DST can start round N if:
- SRC has completed traversal entirely (DST can proceed freely), OR
- SRC has completed rounds N and N+1 (SRC is at round N+2 or higher)
Note: Since DST can now freely advance to a round and then pause, the check is N+2 (if DST wants to start round 4, SRC needs to have completed rounds 4 and 5, so SRC >= 6). Once SRC is completed, DST can proceed at full speed with no restrictions.
func (*QueueCoordinator) GetRound ¶
func (c *QueueCoordinator) GetRound(which string) int
GetRound returns the current round for SRC or DST.
func (*QueueCoordinator) IsCompleted ¶
func (c *QueueCoordinator) IsCompleted(queueType string) bool
IsCompleted returns true if the specified queue ("src", "dst", or "both") has completed traversal.
func (*QueueCoordinator) MarkCompleted ¶
func (c *QueueCoordinator) MarkCompleted(which string)
MarkCompleted marks SRC or DST as completed based on the argument ("src" or "dst").
func (*QueueCoordinator) UpdateRound ¶
func (c *QueueCoordinator) UpdateRound(which string, round int)
UpdateRound updates the current round for SRC or DST.
func (*QueueCoordinator) WaitSealBackpressure ¶
WaitSealBackpressure ensures the seal buffer has flushed through the given round before the caller drops that level from node cache. Call after enqueueing the round's seal data and before dropping the level. Prevents the DB flush buffer from growing unbounded. Returns false if flushing fails; callers should fail closed (do not drop level or advance round).
type QueueMode ¶
type QueueMode string
QueueMode represents the operation mode of a queue.
const (
	QueueModeTraversal QueueMode = "traversal"  // Normal BFS traversal
	QueueModeRetry     QueueMode = "retry"      // Retry failed tasks sweep
	QueueModeCopy      QueueMode = "copy"       // Copy phase (folders then files)
	QueueModeCopyRetry QueueMode = "copy-retry" // Copy retry: only copy_status = failed, max-depth guarded completion
)
type QueueObserver ¶
type QueueObserver struct {
// contains filtered or unexported fields
}
QueueObserver collects statistics from queues by polling them directly and publishes them to DuckDB periodically. Similar to QueueCoordinator, but focused on observability rather than coordination.
func NewQueueObserver ¶
func NewQueueObserver(database *db.DB, updateInterval time.Duration) *QueueObserver
NewQueueObserver creates a new observer that will publish stats to DuckDB. updateInterval is how often stats are written to DuckDB (default: 200ms).
func (*QueueObserver) RegisterQueue ¶
func (o *QueueObserver) RegisterQueue(queueName string, queue *Queue)
RegisterQueue registers a queue with the observer. The observer will poll this queue directly for statistics.
func (*QueueObserver) Start ¶
func (o *QueueObserver) Start()
Start begins the observer loop that publishes stats to DuckDB. This is called automatically when the first queue is registered, but can be called manually.
func (*QueueObserver) Stop ¶
func (o *QueueObserver) Stop()
Stop stops the observer loop and cleans up resources. It is intended to be called once; additional calls are safe and have no effect.
func (*QueueObserver) UnregisterQueue ¶
func (o *QueueObserver) UnregisterQueue(queueName string)
UnregisterQueue removes a queue from the observer.
type QueueState ¶
type QueueState string
QueueState represents the lifecycle state of a queue.
const (
	QueueStateRunning   QueueState = "running"   // Queue is active and processing
	QueueStatePaused    QueueState = "paused"    // Queue is paused
	QueueStateStopped   QueueState = "stopped"   // Queue is stopped
	QueueStateWaiting   QueueState = "waiting"   // Queue is waiting for coordinator to allow advancement (DST only)
	QueueStateCompleted QueueState = "completed" // Traversal complete (max depth reached)
)
type QueueStateSnapshot ¶
type QueueStateSnapshot struct {
State QueueState
Round int
PendingCount int
InProgressCount int
Pulling bool
LastPullWasPartial bool
FirstPullForRound bool
PullLowWM int
Database *db.DB
Mode QueueMode
}
QueueStateSnapshot is a snapshot of queue state (returned by getStateSnapshot) for use in logic functions.
type QueueStats ¶
type QueueStats struct {
Name string
Round int
Pending int
InProgress int
TotalTracked int
Workers int
}
QueueStats holds current queue statistics, as returned by Queue.Stats.
type QueueWatchdog ¶ added in v0.1.41
type QueueWatchdog struct {
// contains filtered or unexported fields
}
QueueWatchdog monitors queue progress and dumps state when stalled. A stall is detected when no tasks complete for the configured timeout while tasks remain in-progress or pending.
func NewQueueWatchdog ¶ added in v0.1.41
func NewQueueWatchdog(q *Queue, stallTimeout time.Duration) *QueueWatchdog
NewQueueWatchdog creates a watchdog for the given queue.
func (*QueueWatchdog) Beat ¶ added in v0.1.41
func (wd *QueueWatchdog) Beat()
Beat records progress (call when a task completes).
func (*QueueWatchdog) Start ¶ added in v0.1.41
func (wd *QueueWatchdog) Start()
Start begins monitoring in a background goroutine.
func (*QueueWatchdog) Stop ¶ added in v0.1.41
func (wd *QueueWatchdog) Stop()
Stop stops the watchdog monitoring.
type RetryDstChild ¶
RetryDstChild holds DST child node meta for retry DST cleanup; populated at pull to avoid per-child DB lookups.
type RetryDstCleanup ¶
type RetryDstCleanup struct {
DstID string
DstDepth int
DstOldStatus string
Children []RetryDstChild
}
RetryDstCleanup holds DST counterpart and its children meta for SRC folder tasks in retry mode; populated at pull.
type RoundInfo ¶
type RoundInfo struct {
Round int // Round number
PullCount int // Number of pull operations (queries) performed this round
ItemsYielded int // Pulled amount: total items actually returned from DB queries this round (like completed count but for pulls)
ExpectedCount int // Expected items from DB (if known)
TasksCompleted int // Successfully completed tasks
TasksFailed int // Failed tasks
StartTime time.Time // When this round started
LastPullTime time.Time // Timestamp of last pull operation
AvgTasksPerSec float64 // Rolling average tasks/sec
LastPartialPull bool // Whether the last pull was partial (< batch size)
}
RoundInfo tracks statistics and metadata for a specific BFS round.
type RoundStats ¶
type RoundStats struct {
Expected int // Expected tasks for this round (folder children inserted)
Completed int // Tasks completed in this round (successful + failed)
Failed int // Tasks failed in this round
}
RoundStats tracks statistics for a specific round.
type SrcNodeMeta ¶
SrcNodeMeta holds Depth and CopyStatus for an SRC node; used by DST tasks at completion to avoid per-child DB lookups.
type TaskBase ¶
type TaskBase struct {
ID string // ULID for internal tracking (database keys)
Type string // Task type: "src-traversal", "dst-traversal", "upload", etc.
Folder types.Folder // Folder to process (if applicable)
File types.File // File to process (if applicable)
Locked bool // Whether this task is currently leased by a worker
Attempts int // Number of execution attempts
Status string // Execution result: "successful", "failed" (set by queue)
WorkerResult string // Worker execution result: "success", "error" (set by worker before ReportTaskResult)
LastError string // Error message from last execution attempt (set by worker, written by queue)
ExpectedFolders []types.Folder // Expected folders (dst tasks only)
ExpectedFiles []types.File // Expected files (dst tasks only)
ExpectedSrcIDMap map[string]string // Map of Type+Name -> SRC node ID for matching (dst tasks only)
ExpectedSrcNodeMeta map[string]SrcNodeMeta // SRC node Depth/CopyStatus keyed by SRC ID (dst tasks only, populated at pull)
RetryDstCleanup *RetryDstCleanup // DST counterpart + children meta for SRC folder in retry mode (populated at pull)
DiscoveredChildren []ChildResult // Children discovered during execution
Round int // The round this task belongs to (for buffer coordination)
LeaseTime time.Time // Time when task was leased (for execution time tracking)
CopyStatus string // Current SRC copy status from DB (used to preserve copy_status on traversal completion events)
// Copy phase specific fields
CopyPass int // Copy pass number (1 for folders, 2 for files)
SrcTraversalStatus string // SRC node traversal_status at pull time (preserved when writing copy_status events)
BytesTransferred int64 // Bytes transferred for file copy tasks
DstParentID string // Destination parent folder ID for creation
}
TaskBase represents the foundational structure for all task types. Workers lease tasks, mark them Locked, and attempt execution. Tasks are identified by ULID (ID) for internal tracking.
func (*TaskBase) Identifier ¶
Identifier returns the unique identifier for this task (absolute path).
func (*TaskBase) IsFolder ¶
IsFolder returns whether this task represents a folder. In the copy phase it uses the task Type (copy-folder), so folder tasks are recognized even when ServiceID is empty; traversal/retry tasks, whose Type is not copy-folder or copy-file, still use Folder.ServiceID.
func (*TaskBase) LocationPath ¶
LocationPath returns the logical, root-relative path for this task.
type TaskExecutionResult ¶
type TaskExecutionResult string
TaskExecutionResult represents the result of a task execution.
const (
	TaskExecutionResultSuccessful TaskExecutionResult = "successful"
	TaskExecutionResultFailed     TaskExecutionResult = "failed"
)
type TraversalWorker ¶
type TraversalWorker struct {
// contains filtered or unexported fields
}
TraversalWorker executes traversal tasks by listing children and recording them to DuckDB. Each worker runs independently in its own goroutine, continuously polling the queue for work.
func NewTraversalWorker ¶
func NewTraversalWorker( id string, queue *Queue, adapter types.FSAdapter, queueName string, shutdownCtx context.Context, ) *TraversalWorker
NewTraversalWorker creates a worker that executes traversal tasks. shutdownCtx is optional - if provided, the worker will check for cancellation and exit on shutdown.
func (*TraversalWorker) Run ¶
func (w *TraversalWorker) Run()
Run is the main worker loop. It continuously polls the queue for tasks. When a task is found, it leases it, executes it, and reports the result. When no work is available or queue is paused, it briefly sleeps before polling again. When queue is exhausted, the worker exits.
type UploadTask ¶
type UploadTask struct {
TaskBase
SrcId string // Source file identifier
DstId string // Destination parent folder identifier
DstCtx types.ServiceContext
}
UploadTask represents a task to upload a file from source to destination.