core

package
v0.0.0-...-1bb08d0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2026 License: MIT Imports: 30 Imported by: 0

Documentation

Overview

Package core provides business logic for the Marionette server.

Index

Constants

View Source
const (
	PermissionStatusPending  = "pending"
	PermissionStatusApproved = "approved"
	PermissionStatusDenied   = "denied"
	PermissionStatusCanceled = "canceled"
)

Permission status constants.

View Source
const (
	RiskLevelLow      = "low"
	RiskLevelMedium   = "medium"
	RiskLevelHigh     = "high"
	RiskLevelCritical = "critical"
)

Risk level constants.

View Source
const (
	StatusOffline = "offline"
	StatusIdle    = "idle"
	StatusBusy    = "busy"
	StatusPaused  = "paused"
)

Runner status constants.

View Source
const (
	DefaultCheckInterval  = 30 * time.Second // How often to check for stale runners
	DefaultStaleThreshold = 90 * time.Second // 3 missed heartbeats (30s interval)
)

Default stale detection configuration.

View Source
const (
	DefaultScheduledTaskCheckInterval = 30 * time.Second
	DefaultScheduledTaskBatchSize     = 50
)

Default scheduled task executor configuration.

View Source
const (
	ScheduledTaskStatusActive   = "active"
	ScheduledTaskStatusPaused   = "paused"
	ScheduledTaskStatusDisabled = "disabled"
)

ScheduledTask status constants.

View Source
const (
	OnFailureContinue         = "continue"
	OnFailurePauseOnFailure   = "pause_on_failure"
	OnFailureDisableOnFailure = "disable_on_failure"
)

ScheduledTask OnFailure policies.

View Source
const (
	DefaultScheduledTaskTimeoutSeconds = 3600 // 1 hour
	DefaultScheduledTaskMaxRetries     = 0
	DefaultMaxConsecutiveFailures      = 3
	DefaultScheduledTaskTimezone       = "UTC"
)

Default scheduled task configuration.

View Source
const (
	SessionStatusPending    = "pending"
	SessionStatusActive     = "active"
	SessionStatusSuspended  = "suspended"
	SessionStatusResuming   = "resuming"
	SessionStatusTerminated = "terminated"
)

Session status constants.

View Source
const (
	NetworkPolicyNone      = "none"
	NetworkPolicyAllowList = "allow_list"
	NetworkPolicyProxy     = "proxy"
	NetworkPolicyAirGapped = "air_gapped"
)

Network policy constants.

View Source
const (
	LifecycleModeOnDemand  = "on_demand"
	LifecycleModeAlwaysOn  = "always_on"
	LifecycleModeScheduled = "scheduled"
)

Lifecycle mode constants.

View Source
const (
	TaskStatusPending   = "pending"
	TaskStatusRunning   = "running"
	TaskStatusCompleted = "completed"
	TaskStatusFailed    = "failed"
	TaskStatusCanceled  = "canceled"
)

Task status constants.

View Source
const (
	TaskRunStatusPending   = "pending"
	TaskRunStatusAssigned  = "assigned"
	TaskRunStatusRunning   = "running"
	TaskRunStatusCompleted = "completed"
	TaskRunStatusFailed    = "failed"
	TaskRunStatusTimeout   = "timeout"
	TaskRunStatusCanceled  = "canceled"
)

TaskRun status constants.

View Source
const (
	DefaultTaskTimeoutSeconds = 3600 // 1 hour
	DefaultMaxRetries         = 0
)

Default task configuration.

View Source
const (
	WorkspaceMobilityLocal      = "local"
	WorkspaceMobilityShared     = "shared"
	WorkspaceMobilityObjectSync = "object_sync"
)

Workspace mobility constants.

View Source
const CurrentSnapshotVersion = 1

CurrentSnapshotVersion is the current snapshot format version.

View Source
const (
	DefaultPermissionTimeoutCheckInterval = 60 * time.Second
)

Default permission timeout enforcement configuration.

View Source
const (
	DefaultSuspendAfterSeconds = 1800 // 30 minutes
)

Default permission timeout configuration.

View Source
const (
	DefaultTimeoutCheckInterval = 30 * time.Second
)

Default timeout enforcement configuration.

View Source
const (
	DefaultWorkspaceBaseDir = "/var/marionette/workspaces"
)

Default workspace settings.

View Source
const (
	WorkspaceStorageTypeVolume = "volume"
)

Workspace storage type constants.

Variables

View Source
var (
	ErrPermissionNotFound         = errors.New("permission request not found")
	ErrPermissionAlreadyResponded = errors.New("permission request already responded")
	ErrPermissionNotPending       = errors.New("permission request is not pending")
)

Permission-related errors.

View Source
var (
	ErrScheduledTaskNotFound       = errors.New("scheduled task not found")
	ErrInvalidCronExpression       = errors.New("invalid cron expression")
	ErrInvalidTimezone             = errors.New("invalid timezone")
	ErrScheduledTaskNameRequired   = errors.New("name is required")
	ErrScheduledTaskPromptRequired = errors.New("prompt_template is required")
	ErrScheduledTaskCronRequired   = errors.New("cron_expression is required")
	ErrScheduledTaskAlreadyActive  = errors.New("scheduled task is already active")
	ErrScheduledTaskAlreadyPaused  = errors.New("scheduled task is already paused")
	ErrScheduledTaskDisabled       = errors.New("scheduled task is disabled")
)

ScheduledTask-related errors.

View Source
var (
	ErrSessionNotFound          = errors.New("session not found")
	ErrInvalidSessionTransition = errors.New("invalid session status transition")
	ErrSessionAlreadyTerminated = errors.New("session is already terminated")
	ErrSessionNotActive         = errors.New("session is not active")
	ErrSessionNotSuspended      = errors.New("session is not suspended")
	ErrSessionNoRunner          = errors.New("session has no runner attached")
	ErrSessionAlreadyHasRunner  = errors.New("session already has a runner attached")
	ErrWorkspaceRequired        = errors.New("workspace_id is required")
	ErrAgentRequired            = errors.New("agent is required")
	ErrRunnerNotIdle            = errors.New("runner is not idle")
	ErrScheduleCronRequired     = errors.New("schedule_cron is required for scheduled lifecycle mode")
)

Session-related errors.

View Source
var (
	ErrTaskNotFound             = errors.New("task not found")
	ErrTaskRunNotFound          = errors.New("task run not found")
	ErrInvalidTaskTransition    = errors.New("invalid task status transition")
	ErrInvalidTaskRunTransition = errors.New("invalid task run status transition")
	ErrTaskAlreadyCompleted     = errors.New("task is already completed")
	ErrTaskAlreadyCanceled      = errors.New("task is already canceled")
	ErrSessionRequired          = errors.New("session_id is required")
	ErrPromptRequired           = errors.New("prompt is required")
	ErrNoRunnerAttached         = errors.New("no runner attached to session")
	ErrMaxRetriesExceeded       = errors.New("max retries exceeded")
)

Task-related errors.

View Source
var (
	ErrWebhookNotFound      = errors.New("webhook not found")
	ErrWebhookNameExists    = errors.New("webhook name already exists")
	ErrWebhookEventNotFound = errors.New("webhook event not found")
	ErrInvalidEventPattern  = errors.New("invalid event pattern")
)

Webhook-related errors.

View Source
var (
	ErrWorkspaceNotFound      = errors.New("workspace not found")
	ErrWorkspaceAlreadyExists = errors.New("workspace already exists")
	ErrWorkspaceDeleted       = errors.New("workspace has been deleted")
	ErrWorkspaceInUse         = errors.New("workspace is in use by a session")
	ErrInvalidWorkspaceName   = errors.New("invalid workspace name")
)

Workspace-related errors.

View Source
var ErrInvalidStatusTransition = errors.New("invalid status transition")

ErrInvalidStatusTransition is returned when an invalid status transition is attempted.

View Source
var ErrTokenBoundToOtherRunner = errors.New("token is bound to a different runner")

ErrTokenBoundToOtherRunner is returned when token is bound to a different runner.

View Source
var ErrTokenRequired = errors.New("runner token is required")

ErrTokenRequired is returned when no token is provided.

Functions

func IsValidSessionTransition

func IsValidSessionTransition(from, to string) bool

IsValidSessionTransition is exported for testing.

func IsValidTaskRunTransition

func IsValidTaskRunTransition(from, to string) bool

IsValidTaskRunTransition is exported for testing.

func IsValidTaskTransition

func IsValidTaskTransition(from, to string) bool

IsValidTaskTransition is exported for testing.

func IsValidTransition

func IsValidTransition(from, to string) bool

IsValidTransition is exported for testing.

Types

type CommandSender

type CommandSender interface {
	SendCommand(runnerID string, cmd *pb.ServerCommand) error
}

CommandSender defines the interface for sending commands to runners. This is implemented by grpc.ConnectionManager.

type ConnectionManagerInterface

type ConnectionManagerInterface interface {
	IsConnected(runnerID string) bool
	UpdateLastSeen(runnerID string) error
}

ConnectionManagerInterface defines the interface for connection management. This is implemented by grpc.ConnectionManager.

type ContextSnapshot

type ContextSnapshot struct {
	// WorkingDirectory is the agent's current working directory.
	WorkingDirectory string `json:"working_directory,omitempty"`

	// Environment contains environment variables to restore.
	Environment map[string]string `json:"environment,omitempty"`

	// ConversationID is the agent's conversation ID (for agents that support it).
	// This allows the agent to continue the conversation after resume.
	ConversationID string `json:"conversation_id,omitempty"`

	// AgentState contains agent-specific state as raw JSON.
	// This is opaque to the server and passed directly to the agent on resume.
	AgentState json.RawMessage `json:"agent_state,omitempty"`

	// LastActivity is when the session was last active.
	LastActivity time.Time `json:"last_activity"`

	// Version is the snapshot format version for compatibility checking.
	Version int `json:"version"`

	// AgentVersion is the agent version that created this snapshot.
	// Used to check compatibility on resume.
	AgentVersion string `json:"agent_version,omitempty"`

	// PendingPermissions contains IDs of pending permission requests.
	// These need to be delivered to the agent on resume.
	PendingPermissions []string `json:"pending_permissions,omitempty"`

	// TaskContext contains information about the current/last task.
	TaskContext *TaskContextSnapshot `json:"task_context,omitempty"`
}

ContextSnapshot represents the saved state of a session for suspend/resume. This allows sessions to be resumed on different runners while preserving the agent's context and working state.

func NewContextSnapshot

func NewContextSnapshot() *ContextSnapshot

NewContextSnapshot creates a new context snapshot with default values.

func ParseContextSnapshot

func ParseContextSnapshot(data json.RawMessage) (*ContextSnapshot, error)

ParseContextSnapshot deserializes a snapshot from JSON.

func (*ContextSnapshot) IsCompatible

func (s *ContextSnapshot) IsCompatible(agentVersion string) bool

IsCompatible checks if the snapshot is compatible with the given agent version. For now, we only check the snapshot version, not the agent version.

func (*ContextSnapshot) Merge

func (s *ContextSnapshot) Merge(other *ContextSnapshot)

Merge merges another snapshot into this one, overwriting non-empty fields.

func (*ContextSnapshot) ToJSON

func (s *ContextSnapshot) ToJSON() (json.RawMessage, error)

ToJSON serializes the snapshot to JSON.

type CreatePermissionRequestInput

type CreatePermissionRequestInput struct {
	OriginalRequestID   string // Original request ID from agent (e.g., tool_use_id)
	SessionID           string
	TaskID              string
	RunID               string
	Tool                string
	Action              string
	Context             string
	RiskLevel           string
	SuspendAfterSeconds int
	TenantID            *string
}

CreatePermissionRequestInput contains input for creating a permission request.

type CreateScheduledTaskOptions

type CreateScheduledTaskOptions struct {
	SessionID              string            // Required
	Name                   string            // Required
	Description            string            // Optional
	CronExpression         string            // Required: e.g., "0 9 * * 1-5"
	Timezone               string            // Default: "UTC"
	PromptTemplate         string            // Required: may contain {{.Date}}, {{.RunNumber}}
	TimeoutSeconds         int               // Default: 3600
	MaxRetries             int               // Default: 0
	OnFailure              string            // Default: "continue"
	MaxConsecutiveFailures *int              // Default: 3
	TenantID               *string           // For multi-tenant deployments
	Labels                 map[string]string // Optional metadata labels
	Annotations            map[string]string // Optional metadata annotations
}

CreateScheduledTaskOptions contains options for creating a scheduled task.

type CreateSessionOptions

type CreateSessionOptions struct {
	Name          *string           // Optional session name
	WorkspaceID   string            // Required
	Agent         string            // Required (e.g., "claude")
	IsBYOK        bool              // Whether using BYOK mode
	AgentConfigID *string           // Optional, for managed credentials
	ProfileID     *string           // Optional, for runner configuration
	LifecycleMode string            // on_demand, always_on, scheduled
	IdleTimeout   *int              // Seconds (for on_demand mode)
	NetworkPolicy string            // none, allow_list, proxy, air_gapped
	AllowedHosts  []string          // For allow_list network policy
	ScheduleCron  *string           // Required for scheduled mode
	ScheduleTZ    *string           // Timezone for scheduled mode
	TenantID      *string           // For multi-tenant deployments
	Labels        map[string]string // Optional metadata labels
	Annotations   map[string]string // Optional metadata annotations
}

CreateSessionOptions contains options for creating a new session.

type CreateTaskOptions

type CreateTaskOptions struct {
	SessionID      string            // Required
	Prompt         string            // Required
	MaxRetries     int               // Default: 0
	TimeoutSeconds int               // Default: 3600
	TenantID       *string           // For multi-tenant deployments
	Labels         map[string]string // Optional metadata labels
	Annotations    map[string]string // Optional metadata annotations
}

CreateTaskOptions contains options for creating a new task.

type CreateWebhookInput

type CreateWebhookInput struct {
	Name              string
	URL               string
	Events            []string
	MaxRetries        *int
	RetryDelaySeconds *int
	TimeoutSeconds    *int
	Headers           map[string]string
	TenantID          *string
	Labels            map[string]string
	Annotations       map[string]string
}

CreateWebhookInput contains input for creating a webhook.

type CreateWorkspaceOptions

type CreateWorkspaceOptions struct {
	Name        string            // Optional workspace name
	Persist     *bool             // Whether to persist workspace (default: true)
	StorageType string            // Storage type (default: "volume")
	Mobility    string            // Mobility mode (default: "local")
	DiskQuotaMB *int              // Disk quota in MB (nil for default)
	TenantID    *string           // For multi-tenant deployments
	Labels      map[string]string // Optional metadata labels
	Annotations map[string]string // Optional annotations
}

CreateWorkspaceOptions contains options for creating a workspace.

type ListPermissionRequestsOptions

type ListPermissionRequestsOptions = store.ListPermissionRequestsOptions

ListPermissionRequestsOptions wraps store.ListPermissionRequestsOptions for convenience.

type ListScheduledTasksOptions

type ListScheduledTasksOptions = store.ListScheduledTasksOptions

ListScheduledTasksOptions wraps store.ListScheduledTasksOptions.

type ListSessionsOptions

type ListSessionsOptions = store.ListSessionsOptions

ListSessionsOptions wraps store.ListSessionsOptions for convenience.

type ListTaskRunsOptions

type ListTaskRunsOptions = store.ListTaskRunsOptions

ListTaskRunsOptions wraps store.ListTaskRunsOptions for convenience.

type ListTasksOptions

type ListTasksOptions = store.ListTasksOptions

ListTasksOptions wraps store.ListTasksOptions for convenience.

type ListWorkspacesOptions

type ListWorkspacesOptions struct {
	Limit    int
	Cursor   string
	TenantID *string
}

ListWorkspacesOptions contains options for listing workspaces.

type LogSubscriberManager

type LogSubscriberManager struct {
	// contains filtered or unexported fields
}

LogSubscriberManager manages real-time log subscribers. Full WebSocket integration will be implemented in G6.

func NewLogSubscriberManager

func NewLogSubscriberManager(logger *zap.Logger) *LogSubscriberManager

NewLogSubscriberManager creates a new LogSubscriberManager.

func (*LogSubscriberManager) Broadcast

func (m *LogSubscriberManager) Broadcast(log *store.Log)

Broadcast sends a log entry to all subscribers for the session. In G4, this is a stub that logs the broadcast. Full implementation in G6.

func (*LogSubscriberManager) Subscribe

func (m *LogSubscriberManager) Subscribe(sessionID string, ch chan *store.Log)

Subscribe registers a channel to receive logs for a session.

func (*LogSubscriberManager) SubscriberCount

func (m *LogSubscriberManager) SubscriberCount(sessionID string) int

SubscriberCount returns the number of subscribers for a session. Used for testing and monitoring.

func (*LogSubscriberManager) Unsubscribe

func (m *LogSubscriberManager) Unsubscribe(sessionID string, ch chan *store.Log)

Unsubscribe removes a channel from session subscriptions.

type LogSubscriberManagerInterface

type LogSubscriberManagerInterface interface {
	// Broadcast sends a log entry to all subscribers for the session.
	Broadcast(log *store.Log)
	// Subscribe registers a channel to receive logs for a session.
	Subscribe(sessionID string, ch chan *store.Log)
	// Unsubscribe removes a channel from session subscriptions.
	Unsubscribe(sessionID string, ch chan *store.Log)
}

LogSubscriberManagerInterface defines the interface for managing real-time log subscribers. This is implemented by LogSubscriberManager and used for dependency injection.

type PermissionManager

type PermissionManager struct {
	// contains filtered or unexported fields
}

PermissionManager handles permission request lifecycle.

func NewPermissionManager

func NewPermissionManager(
	store store.Store,
	cmdSender CommandSender,
	sessionMgr SessionManagerInterface,
	auditLog audit.Logger,
	logger *zap.Logger,
) *PermissionManager

NewPermissionManager creates a new PermissionManager.

func (*PermissionManager) Cancel

func (m *PermissionManager) Cancel(ctx context.Context, permID string) error

Cancel cancels a pending permission request.

func (*PermissionManager) Create

Create stores a new permission request from runner.

func (*PermissionManager) Get

Get retrieves a permission request by ID.

func (*PermissionManager) List

List retrieves permission requests with filters.

func (*PermissionManager) Respond

func (m *PermissionManager) Respond(ctx context.Context, permID string, approved bool, reason, respondedBy string) error

Respond approves or denies a permission request.

func (*PermissionManager) SetWebhookIntegration

func (m *PermissionManager) SetWebhookIntegration(wi *WebhookIntegration)

SetWebhookIntegration sets the webhook integration for dispatching events.

type PermissionManagerInterface

type PermissionManagerInterface interface {
	// Create stores a new permission request from runner.
	Create(ctx context.Context, req *CreatePermissionRequestInput) (*store.PermissionRequest, error)
	// Respond approves or denies a permission request.
	Respond(ctx context.Context, permID string, approved bool, reason, respondedBy string) error
	// Get retrieves a permission request by ID.
	Get(ctx context.Context, permID string) (*store.PermissionRequest, error)
	// List retrieves permission requests with filters.
	List(ctx context.Context, opts ListPermissionRequestsOptions) (*store.ListResult[store.PermissionRequest], error)
	// Cancel cancels a pending permission request.
	Cancel(ctx context.Context, permID string) error
}

PermissionManagerInterface defines the interface for permission request management. This is used for dependency injection in other components.

type PermissionTimeoutEnforcer

type PermissionTimeoutEnforcer struct {
	// contains filtered or unexported fields
}

PermissionTimeoutEnforcer monitors pending permission requests and suspends sessions that exceed their suspend_after_seconds timeout.

func NewPermissionTimeoutEnforcer

func NewPermissionTimeoutEnforcer(
	store store.Store,
	sessionMgr SessionManagerInterface,
	logger *zap.Logger,
	opts ...PermissionTimeoutEnforcerOption,
) *PermissionTimeoutEnforcer

NewPermissionTimeoutEnforcer creates a new PermissionTimeoutEnforcer.

func (*PermissionTimeoutEnforcer) CheckTimeouts

func (e *PermissionTimeoutEnforcer) CheckTimeouts(ctx context.Context) error

CheckTimeouts is exported for testing.

func (*PermissionTimeoutEnforcer) Start

Start begins the background permission timeout enforcement loop.

func (*PermissionTimeoutEnforcer) Stop

func (e *PermissionTimeoutEnforcer) Stop()

Stop stops the permission timeout enforcer.

type PermissionTimeoutEnforcerOption

type PermissionTimeoutEnforcerOption func(*PermissionTimeoutEnforcer)

PermissionTimeoutEnforcerOption is a functional option for PermissionTimeoutEnforcer.

func WithPermissionTimeoutCheckInterval

func WithPermissionTimeoutCheckInterval(d time.Duration) PermissionTimeoutEnforcerOption

WithPermissionTimeoutCheckInterval sets the check interval for the timeout enforcer.

type ProfileNetwork

type ProfileNetwork struct {
	Level        string   `json:"level"`         // "none", "allow_list", "proxy", "air_gapped"
	AllowedHosts []string `json:"allowed_hosts"` // For allow_list mode
}

ProfileNetwork defines the network configuration from a profile.

type ProfileResources

type ProfileResources struct {
	CPU    int    `json:"cpu"`
	Memory string `json:"memory"` // e.g., "8GB"
	Disk   string `json:"disk"`   // e.g., "50GB"
}

ProfileResources defines the resource configuration from a profile.

type ProfileSelector

type ProfileSelector struct {
	OS           string   `json:"os,omitempty"`           // e.g., "darwin", "linux"
	Arch         string   `json:"arch,omitempty"`         // e.g., "arm64", "amd64"
	Capabilities []string `json:"capabilities,omitempty"` // e.g., ["gpu", "xcode"]
}

ProfileSelector defines the runner selector constraints from a profile.

type PromptTemplateData

type PromptTemplateData struct {
	Date         string // Current date in YYYY-MM-DD format
	DateTime     string // Current date and time in RFC3339 format
	RunNumber    int    // Total number of runs including this one
	TaskName     string // Name of the scheduled task
	SessionID    string // Session ID
	ScheduledFor string // Scheduled run time in RFC3339 format
}

PromptTemplateData contains data for rendering prompt templates.

type ProviderRegistryInterface

type ProviderRegistryInterface interface {
	// GetDefault returns the default provider.
	GetDefault(ctx context.Context) (provider.Provider, error)
	// Get returns a provider by name.
	Get(ctx context.Context, name string) (provider.Provider, error)
}

ProviderRegistryInterface defines the interface for provider operations needed by SessionManager.

type RegisterRequest

type RegisterRequest struct {
	Token        string            // Runner token (required)
	Name         string            // Runner name
	Hostname     string            // Runner hostname
	SandboxMode  string            // Sandbox mode
	SandboxTypes []string          // Available sandbox types
	Capabilities []string          // Runner capabilities
	Labels       map[string]string // Key-value labels
}

RegisterRequest contains the data needed to register a runner.

type RegisterResult

type RegisterResult struct {
	RunnerID string // The runner ID (new or existing)
	IsNew    bool   // True if a new runner was created
	PoolName string // Pool name from token
}

RegisterResult contains the result of runner registration.

type ResumeResult

type ResumeResult struct {
	// Session is the resumed session.
	Session *store.Session

	// ContextSnapshot is the saved context to restore.
	ContextSnapshot *ContextSnapshot

	// SuspendStrategy is the strategy used when suspended.
	SuspendStrategy string

	// SnapshotID is the snapshot ID if one was created during suspend.
	SnapshotID string

	// WorkspaceSynced indicates if workspace was synced during suspend.
	WorkspaceSynced bool
}

ResumeResult contains information about a resumed session.

type RunnerManager

type RunnerManager struct {
	// contains filtered or unexported fields
}

RunnerManager handles runner lifecycle and status transitions.

func NewRunnerManager

func NewRunnerManager(store store.Store, connManager ConnectionManagerInterface, logger *zap.Logger, opts ...RunnerManagerOption) *RunnerManager

NewRunnerManager creates a new RunnerManager.

func (*RunnerManager) OnConnect

func (m *RunnerManager) OnConnect(ctx context.Context, runnerID string) error

OnConnect is called when a runner connects to the server. Transitions the runner from offline to idle. Also checks for resuming sessions that need a runner and auto-attaches.

func (*RunnerManager) OnDisconnect

func (m *RunnerManager) OnDisconnect(ctx context.Context, runnerID string) error

OnDisconnect is called when a runner disconnects from the server. Transitions the runner to offline status.

func (*RunnerManager) OnHeartbeat

func (m *RunnerManager) OnHeartbeat(ctx context.Context, runnerID string, hb *pb.Heartbeat) error

OnHeartbeat is called when a heartbeat is received from a runner. Updates last_seen timestamp and optionally status from heartbeat.

func (*RunnerManager) SetStatus

func (m *RunnerManager) SetStatus(ctx context.Context, runnerID, status string) error

SetStatus updates a runner's status with validation.

func (*RunnerManager) SetWebhookIntegration

func (m *RunnerManager) SetWebhookIntegration(wi *WebhookIntegration)

SetWebhookIntegration sets the webhook integration for dispatching events.

type RunnerManagerInterface

type RunnerManagerInterface interface {
	OnConnect(ctx context.Context, runnerID string) error
	OnDisconnect(ctx context.Context, runnerID string) error
	OnHeartbeat(ctx context.Context, runnerID string, hb *pb.Heartbeat) error
	SetStatus(ctx context.Context, runnerID, status string) error
}

RunnerManagerInterface defines the interface for runner management. This is used for dependency injection in other components.

type RunnerManagerOption

type RunnerManagerOption func(*RunnerManager)

RunnerManagerOption is a functional option for RunnerManager.

func WithSessionManager

func WithSessionManager(sm SessionManagerInterface) RunnerManagerOption

WithSessionManager sets the session manager for the runner manager.

func WithTaskManager

func WithTaskManager(tm TaskManagerInterface) RunnerManagerOption

WithTaskManager sets the task manager for the runner manager.

type RunnerRegistry

type RunnerRegistry struct {
	// contains filtered or unexported fields
}

RunnerRegistry handles runner registration and lookup.

func NewRunnerRegistry

func NewRunnerRegistry(store store.Store, tokenSvc *auth.RunnerTokenService, logger *zap.Logger) *RunnerRegistry

NewRunnerRegistry creates a new RunnerRegistry.

func (*RunnerRegistry) Get

func (r *RunnerRegistry) Get(ctx context.Context, runnerID string) (*store.Runner, error)

Get retrieves a runner by ID.

func (*RunnerRegistry) GetByName

func (r *RunnerRegistry) GetByName(ctx context.Context, name string) (*store.Runner, error)

GetByName retrieves a runner by name.

func (*RunnerRegistry) Register

Register registers a new runner or updates an existing one. The registration flow: 1. Validate token via tokenSvc.Validate() 2. If token is bound to a runner -> check it exists, update it 3. If token is not bound -> look up by name or create new runner 4. Bind token to runner if not already bound

type ScheduledSessionActivator

type ScheduledSessionActivator struct {
	// contains filtered or unexported fields
}

ScheduledSessionActivator is a background job that monitors sessions with lifecycle_mode='scheduled' and activates them when their next_scheduled_at time arrives.

This enables the scheduled session lifecycle mode where: - Sessions stay suspended between scheduled runs - Auto-resume at scheduled times (based on schedule_cron) - Execute their scheduled tasks when active - Auto-suspend again after scheduled tasks complete (handled elsewhere)

func NewScheduledSessionActivator

func NewScheduledSessionActivator(cfg ScheduledSessionActivatorConfig) *ScheduledSessionActivator

NewScheduledSessionActivator creates a new ScheduledSessionActivator.

func (*ScheduledSessionActivator) Start

func (a *ScheduledSessionActivator) Start() error

Start begins the background job.

func (*ScheduledSessionActivator) Stop

func (a *ScheduledSessionActivator) Stop()

Stop gracefully stops the background job.

type ScheduledSessionActivatorConfig

type ScheduledSessionActivatorConfig struct {
	Store         store.Store
	SessionMgr    SessionManagerInterface
	CheckInterval time.Duration
	BatchSize     int
	Logger        *zap.Logger
}

ScheduledSessionActivatorConfig holds configuration for the activator.

type ScheduledTaskExecutor

type ScheduledTaskExecutor struct {
	// contains filtered or unexported fields
}

ScheduledTaskExecutor polls for due scheduled tasks and executes them.

func NewScheduledTaskExecutor

func NewScheduledTaskExecutor(
	scheduledTaskSvc ScheduledTaskServiceInterface,
	taskMgr TaskManagerInterface,
	logger *zap.Logger,
	opts ...ScheduledTaskExecutorOption,
) *ScheduledTaskExecutor

NewScheduledTaskExecutor creates a new ScheduledTaskExecutor.

func (*ScheduledTaskExecutor) IsRunning

func (e *ScheduledTaskExecutor) IsRunning() bool

IsRunning returns whether the executor is running.

func (*ScheduledTaskExecutor) RunNow

func (e *ScheduledTaskExecutor) RunNow(ctx context.Context) error

RunNow immediately processes due tasks (for testing).

func (*ScheduledTaskExecutor) Start

func (e *ScheduledTaskExecutor) Start(ctx context.Context)

Start begins the background scheduled task execution loop.

func (*ScheduledTaskExecutor) Stop

func (e *ScheduledTaskExecutor) Stop()

Stop stops the scheduled task executor.

type ScheduledTaskExecutorOption

type ScheduledTaskExecutorOption func(*ScheduledTaskExecutor)

ScheduledTaskExecutorOption is a functional option for ScheduledTaskExecutor.

func WithScheduledTaskBatchSize

func WithScheduledTaskBatchSize(size int) ScheduledTaskExecutorOption

WithScheduledTaskBatchSize sets the number of tasks to process per poll.

func WithScheduledTaskCheckInterval

func WithScheduledTaskCheckInterval(d time.Duration) ScheduledTaskExecutorOption

WithScheduledTaskCheckInterval sets the polling interval.

type ScheduledTaskService

type ScheduledTaskService struct {
	// contains filtered or unexported fields
}

ScheduledTaskService handles scheduled task lifecycle.

func NewScheduledTaskService

func NewScheduledTaskService(
	store store.Store,
	taskMgr TaskManagerInterface,
	auditLog audit.Logger,
	logger *zap.Logger,
) *ScheduledTaskService

NewScheduledTaskService creates a new ScheduledTaskService.

func (*ScheduledTaskService) CalculateNextRunAt

func (s *ScheduledTaskService) CalculateNextRunAt(cronExpr, timezone string, after time.Time) (*time.Time, error)

CalculateNextRunAt calculates the next run time for a cron expression.

func (*ScheduledTaskService) Create

Create creates a new scheduled task.

func (*ScheduledTaskService) Delete

func (s *ScheduledTaskService) Delete(ctx context.Context, taskID string) error

Delete deletes a scheduled task.

func (*ScheduledTaskService) ExecuteScheduledTask

func (s *ScheduledTaskService) ExecuteScheduledTask(ctx context.Context, scheduledTask *store.ScheduledTask) (*store.Task, error)

ExecuteScheduledTask executes a scheduled task by creating a regular task.

func (*ScheduledTaskService) Get

Get retrieves a scheduled task by ID.

func (*ScheduledTaskService) GetDue

func (s *ScheduledTaskService) GetDue(ctx context.Context, limit int) ([]*store.ScheduledTask, error)

GetDue retrieves scheduled tasks that are due to run.

func (*ScheduledTaskService) List

List retrieves scheduled tasks with filtering.

func (*ScheduledTaskService) MarkTaskCompleted

func (s *ScheduledTaskService) MarkTaskCompleted(ctx context.Context, scheduledTaskID string, success bool) error

MarkTaskCompleted marks a scheduled task's last run as completed (success or failure). This should be called by the task completion handler.

func (*ScheduledTaskService) Pause

func (s *ScheduledTaskService) Pause(ctx context.Context, taskID string) error

Pause pauses a scheduled task.

func (*ScheduledTaskService) Resume

func (s *ScheduledTaskService) Resume(ctx context.Context, taskID string) error

Resume resumes a paused scheduled task.

func (*ScheduledTaskService) Trigger

func (s *ScheduledTaskService) Trigger(ctx context.Context, taskID string) (*store.Task, error)

Trigger manually triggers a scheduled task immediately.

func (*ScheduledTaskService) Update

Update updates a scheduled task.

type ScheduledTaskServiceInterface

type ScheduledTaskServiceInterface interface {
	Create(ctx context.Context, opts CreateScheduledTaskOptions) (*store.ScheduledTask, error)
	Get(ctx context.Context, taskID string) (*store.ScheduledTask, error)
	List(ctx context.Context, opts ListScheduledTasksOptions) (*store.ListResult[store.ScheduledTask], error)
	Update(ctx context.Context, taskID string, opts UpdateScheduledTaskOptions) (*store.ScheduledTask, error)
	Delete(ctx context.Context, taskID string) error
	Pause(ctx context.Context, taskID string) error
	Resume(ctx context.Context, taskID string) error
	Trigger(ctx context.Context, taskID string) (*store.Task, error)
	GetDue(ctx context.Context, limit int) ([]*store.ScheduledTask, error)
	ExecuteScheduledTask(ctx context.Context, scheduledTask *store.ScheduledTask) (*store.Task, error)
	MarkTaskCompleted(ctx context.Context, scheduledTaskID string, success bool) error
	CalculateNextRunAt(cronExpr, timezone string, after time.Time) (*time.Time, error)
}

ScheduledTaskServiceInterface defines the interface for scheduled task management.

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager handles session lifecycle and state transitions.

func NewSessionManager

func NewSessionManager(store store.Store, connManager ConnectionManagerInterface, cmdSender CommandSender, logger *zap.Logger) *SessionManager

NewSessionManager creates a new SessionManager.

func NewSessionManagerWithConfig

func NewSessionManagerWithConfig(cfg SessionManagerConfig) *SessionManager

NewSessionManagerWithConfig creates a new SessionManager with full configuration.

func (*SessionManager) Activate

func (m *SessionManager) Activate(ctx context.Context, sessionID, runnerID string) error

Activate transitions a session from pending/resuming to active. This is called when a runner is assigned to the session.

func (*SessionManager) AttachRunner

func (m *SessionManager) AttachRunner(ctx context.Context, sessionID, runnerID string) error

AttachRunner attaches a runner to a session. The session must be in pending or resuming state.

func (*SessionManager) Create

Create creates a new session.

func (*SessionManager) DetachRunner

func (m *SessionManager) DetachRunner(ctx context.Context, sessionID string) error

DetachRunner detaches the runner from a session without changing status. This is called internally when a runner disconnects.

func (*SessionManager) Get

func (m *SessionManager) Get(ctx context.Context, sessionID string) (*store.Session, error)

Get retrieves a session by ID.

func (*SessionManager) GetContextSnapshot

func (m *SessionManager) GetContextSnapshot(ctx context.Context, sessionID string) (*ContextSnapshot, error)

GetContextSnapshot retrieves the context snapshot for a session.

func (*SessionManager) GetWorkspaceHostPath

func (m *SessionManager) GetWorkspaceHostPath(ctx context.Context, sessionID string) (string, error)

GetWorkspaceHostPath returns the host filesystem path for a session's workspace. This path is used for mounting the workspace into containers.

func (*SessionManager) List

List retrieves sessions matching the given options.

func (*SessionManager) Resume

func (m *SessionManager) Resume(ctx context.Context, sessionID string) error

Resume transitions a session from suspended to resuming. The session will be fully activated once a runner is attached.

func (*SessionManager) ResumeWithResult

func (m *SessionManager) ResumeWithResult(ctx context.Context, sessionID string) (*ResumeResult, error)

ResumeWithResult transitions a session from suspended to resuming and returns resume info.

func (*SessionManager) SetProviderRegistry

func (m *SessionManager) SetProviderRegistry(pr ProviderRegistryInterface)

SetProviderRegistry sets the provider registry. This allows optional injection.

func (*SessionManager) SetTaskManager

func (m *SessionManager) SetTaskManager(tm TaskManagerInterface)

SetTaskManager sets the task manager. This allows optional injection.

func (*SessionManager) SetWebhookIntegration

func (m *SessionManager) SetWebhookIntegration(wi *WebhookIntegration)

SetWebhookIntegration sets the webhook integration for dispatching events.

func (*SessionManager) SetWorkspaceManager

func (m *SessionManager) SetWorkspaceManager(wm WorkspaceManagerInterface)

SetWorkspaceManager sets the workspace manager. This allows optional injection.

func (*SessionManager) Suspend

func (m *SessionManager) Suspend(ctx context.Context, sessionID, strategy string) error

Suspend transitions a session from active to suspended. The strategy parameter specifies how to handle the suspend (pause, snapshot, etc.).

func (*SessionManager) SuspendWithOptions

func (m *SessionManager) SuspendWithOptions(ctx context.Context, sessionID string, opts SuspendOptions) error

SuspendWithOptions transitions a session from active to suspended with full options.

func (*SessionManager) Terminate

func (m *SessionManager) Terminate(ctx context.Context, sessionID string) error

Terminate transitions a session to terminated status. This can be called from any non-terminated state.

func (*SessionManager) UpdateContextSnapshot

func (m *SessionManager) UpdateContextSnapshot(ctx context.Context, sessionID string, snapshot *ContextSnapshot) error

UpdateContextSnapshot updates the context snapshot for a session.

type SessionManagerConfig

type SessionManagerConfig struct {
	Store            store.Store
	ConnManager      ConnectionManagerInterface
	CmdSender        CommandSender
	WorkspaceManager WorkspaceManagerInterface
	AuditLog         audit.Logger
	ProviderRegistry ProviderRegistryInterface
	Logger           *zap.Logger
}

SessionManagerConfig holds configuration for SessionManager.

type SessionManagerInterface

type SessionManagerInterface interface {
	Create(ctx context.Context, opts CreateSessionOptions) (*store.Session, error)
	Get(ctx context.Context, sessionID string) (*store.Session, error)
	List(ctx context.Context, opts ListSessionsOptions) (*store.ListResult[store.Session], error)
	Activate(ctx context.Context, sessionID, runnerID string) error
	Suspend(ctx context.Context, sessionID, strategy string) error
	Resume(ctx context.Context, sessionID string) error
	Terminate(ctx context.Context, sessionID string) error
	AttachRunner(ctx context.Context, sessionID, runnerID string) error
	DetachRunner(ctx context.Context, sessionID string) error
	UpdateContextSnapshot(ctx context.Context, sessionID string, snapshot *ContextSnapshot) error
}

SessionManagerInterface defines the interface for session management. This is used for dependency injection in other components.

type StaleDetector

type StaleDetector struct {
	// contains filtered or unexported fields
}

StaleDetector monitors runners and marks stale ones as offline.

func NewStaleDetector

func NewStaleDetector(
	store store.Store,
	connManager ConnectionManagerInterface,
	runnerManager *RunnerManager,
	logger *zap.Logger,
	opts ...StaleDetectorOption,
) *StaleDetector

NewStaleDetector creates a new StaleDetector.

func (*StaleDetector) Start

func (sd *StaleDetector) Start(ctx context.Context)

Start begins the background stale detection loop.

func (*StaleDetector) Stop

func (sd *StaleDetector) Stop()

Stop stops the stale detector.

type StaleDetectorOption

type StaleDetectorOption func(*StaleDetector)

StaleDetectorOption is a functional option for StaleDetector.

func WithCheckInterval

func WithCheckInterval(d time.Duration) StaleDetectorOption

WithCheckInterval sets the check interval for the stale detector.

func WithStaleThreshold

func WithStaleThreshold(d time.Duration) StaleDetectorOption

WithStaleThreshold sets the stale threshold for the stale detector.

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager integrates the streaming manager with the server. It provides an interface for HTTP handlers and WebSocket connections to interact with the streaming infrastructure.

func NewStreamManager

func NewStreamManager(config StreamManagerConfig, s store.Store, logger *zap.Logger) (*StreamManager, error)

NewStreamManager creates a new StreamManager.

func (*StreamManager) GetSFU

func (m *StreamManager) GetSFU() *sfu.SFU

GetSFU returns the SFU instance.

func (*StreamManager) GetSignalingHandler

func (m *StreamManager) GetSignalingHandler() *sfu.SignalingHandler

GetSignalingHandler returns the signaling handler.

func (*StreamManager) GetStream

func (m *StreamManager) GetStream(ctx context.Context, streamID string) (*streaming.Stream, error)

GetStream returns a stream by ID.

func (*StreamManager) GetStreamBySession

func (m *StreamManager) GetStreamBySession(ctx context.Context, sessionID string, streamType streaming.StreamType) (*streaming.Stream, error)

GetStreamBySession returns the active stream for a session and type.

func (*StreamManager) ListSessionStreams

func (m *StreamManager) ListSessionStreams(ctx context.Context, sessionID string) ([]*streaming.Stream, error)

ListSessionStreams lists all streams for a session.

func (*StreamManager) ListStreams

func (m *StreamManager) ListStreams(ctx context.Context, params streaming.ListStreamsParams) ([]*streaming.Stream, int, error)

ListStreams lists streams matching the given parameters.

func (*StreamManager) RegisterProvider

func (m *StreamManager) RegisterProvider(provider streaming.StreamProvider) error

RegisterProvider registers a stream provider.

func (*StreamManager) Start

func (m *StreamManager) Start(ctx context.Context) error

Start starts the stream manager.

func (*StreamManager) StartStream

func (m *StreamManager) StartStream(ctx context.Context, opts streaming.StreamOptions) (*streaming.Stream, error)

StartStream starts a new stream.

func (*StreamManager) Stats

func (m *StreamManager) Stats() manager.Stats

Stats returns manager statistics.

func (*StreamManager) Stop

func (m *StreamManager) Stop(ctx context.Context) error

Stop stops the stream manager.

func (*StreamManager) StopStream

func (m *StreamManager) StopStream(ctx context.Context, streamID string) error

StopStream stops a stream.

func (*StreamManager) UnregisterProvider

func (m *StreamManager) UnregisterProvider(name string) bool

UnregisterProvider unregisters a stream provider.

func (*StreamManager) UpgradeWebSocket

func (m *StreamManager) UpgradeWebSocket(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error)

UpgradeWebSocket upgrades an HTTP connection to WebSocket.

type StreamManagerConfig

type StreamManagerConfig struct {
	// Manager configuration
	Manager manager.Config

	// WebSocket configuration
	WebSocketReadBufferSize  int
	WebSocketWriteBufferSize int
	AllowedOrigins           []string
}

StreamManagerConfig contains configuration for the StreamManager.

func DefaultStreamManagerConfig

func DefaultStreamManagerConfig() StreamManagerConfig

DefaultStreamManagerConfig returns a default configuration.

type SuspendOptions

type SuspendOptions struct {
	// Strategy specifies how to handle the suspend (pause, snapshot, etc.).
	Strategy string

	// ContextSnapshot is the context to save with the session.
	// If nil, a basic snapshot is created automatically.
	ContextSnapshot *ContextSnapshot

	// WorkspaceSynced indicates if workspace was synced to object storage.
	WorkspaceSynced bool

	// SnapshotID is the ID of any snapshot created during suspend.
	SnapshotID string
}

SuspendOptions contains options for suspending a session.

type TaskCompletedResult

type TaskCompletedResult struct {
	RunID        string
	Success      bool
	Error        string
	ExitCode     *int
	TokensInput  int
	TokensOutput int
}

TaskCompletedResult contains the result of a completed task run.

type TaskContextSnapshot

type TaskContextSnapshot struct {
	// TaskID is the current task ID.
	TaskID string `json:"task_id,omitempty"`

	// RunID is the current task run ID.
	RunID string `json:"run_id,omitempty"`

	// Status is the task status when suspended.
	Status string `json:"status,omitempty"`

	// Progress is the estimated task progress (0-100).
	Progress int `json:"progress,omitempty"`
}

TaskContextSnapshot contains task-related state for resume.

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

TaskManager handles task lifecycle and execution.

func NewTaskManager

func NewTaskManager(
	store store.Store,
	cmdSender CommandSender,
	sessionMgr SessionManagerInterface,
	auditLog audit.Logger,
	logger *zap.Logger,
) *TaskManager

NewTaskManager creates a new TaskManager.

func (*TaskManager) Cancel

func (m *TaskManager) Cancel(ctx context.Context, taskID string) error

Cancel cancels a task. This can be called from pending or running state.

func (*TaskManager) Create

func (m *TaskManager) Create(ctx context.Context, opts CreateTaskOptions) (*store.Task, error)

Create creates a new task.

func (*TaskManager) CreateRun

func (m *TaskManager) CreateRun(ctx context.Context, taskID string) (*store.TaskRun, error)

CreateRun creates a new task run.

func (*TaskManager) Execute

func (m *TaskManager) Execute(ctx context.Context, taskID string) error

Execute sends a task to a runner for execution.

func (*TaskManager) FailRun

func (m *TaskManager) FailRun(ctx context.Context, runID, reason string) error

FailRun marks a task run as failed.

func (*TaskManager) Get

func (m *TaskManager) Get(ctx context.Context, taskID string) (*store.Task, error)

Get retrieves a task by ID.

func (*TaskManager) List

List retrieves tasks matching the given options.

func (*TaskManager) OnTaskAccepted

func (m *TaskManager) OnTaskAccepted(ctx context.Context, runID string) error

OnTaskAccepted is called when a runner accepts a task (pending → assigned).

func (*TaskManager) OnTaskCompleted

func (m *TaskManager) OnTaskCompleted(ctx context.Context, result *TaskCompletedResult) error

OnTaskCompleted is called when a task run completes (running → completed/failed).

func (*TaskManager) OnTaskProgress

func (m *TaskManager) OnTaskProgress(_ context.Context, runID string, progress int) error

OnTaskProgress is called when a runner reports progress. Currently a no-op since we don't store progress in DB, but could be used for real-time updates.

func (*TaskManager) OnTaskStarted

func (m *TaskManager) OnTaskStarted(ctx context.Context, runID string) error

OnTaskStarted is called when a runner starts executing a task (assigned → running).

func (*TaskManager) ReExecute

func (m *TaskManager) ReExecute(ctx context.Context, taskID string) error

ReExecute re-sends a running task to a runner after session resume. Unlike Execute, this reuses the existing task_run instead of creating a new one. This is used when a session resumes and needs to continue a task that was interrupted by suspend.

func (*TaskManager) Retry

func (m *TaskManager) Retry(ctx context.Context, taskID string) (*store.TaskRun, error)

Retry creates a new run for a failed task.

func (*TaskManager) SetWebhookIntegration

func (m *TaskManager) SetWebhookIntegration(wi *WebhookIntegration)

SetWebhookIntegration sets the webhook integration for dispatching events.

func (*TaskManager) ShouldRetry

func (m *TaskManager) ShouldRetry(ctx context.Context, taskID string) (bool, error)

ShouldRetry checks if a task should be retried.

type TaskManagerInterface

type TaskManagerInterface interface {
	Create(ctx context.Context, opts CreateTaskOptions) (*store.Task, error)
	Get(ctx context.Context, taskID string) (*store.Task, error)
	List(ctx context.Context, opts ListTasksOptions) (*store.ListResult[store.Task], error)
	Cancel(ctx context.Context, taskID string) error
	Execute(ctx context.Context, taskID string) error
	ReExecute(ctx context.Context, taskID string) error
	CreateRun(ctx context.Context, taskID string) (*store.TaskRun, error)
	OnTaskAccepted(ctx context.Context, runID string) error
	OnTaskStarted(ctx context.Context, runID string) error
	OnTaskProgress(ctx context.Context, runID string, progress int) error
	OnTaskCompleted(ctx context.Context, result *TaskCompletedResult) error
	FailRun(ctx context.Context, runID, reason string) error
	ShouldRetry(ctx context.Context, taskID string) (bool, error)
	Retry(ctx context.Context, taskID string) (*store.TaskRun, error)
}

TaskManagerInterface defines the interface for task management. This is used for dependency injection in other components.

type TaskTimeoutEnforcer

type TaskTimeoutEnforcer struct {
	// contains filtered or unexported fields
}

TaskTimeoutEnforcer monitors running tasks and enforces timeouts.

func NewTaskTimeoutEnforcer

func NewTaskTimeoutEnforcer(
	store store.Store,
	taskMgr *TaskManager,
	cmdSender CommandSender,
	logger *zap.Logger,
	opts ...TaskTimeoutEnforcerOption,
) *TaskTimeoutEnforcer

NewTaskTimeoutEnforcer creates a new TaskTimeoutEnforcer.

func (*TaskTimeoutEnforcer) Start

func (e *TaskTimeoutEnforcer) Start(ctx context.Context)

Start begins the background timeout enforcement loop.

func (*TaskTimeoutEnforcer) Stop

func (e *TaskTimeoutEnforcer) Stop()

Stop stops the timeout enforcer.

type TaskTimeoutEnforcerOption

type TaskTimeoutEnforcerOption func(*TaskTimeoutEnforcer)

TaskTimeoutEnforcerOption is a functional option for TaskTimeoutEnforcer.

func WithTimeoutCheckInterval

func WithTimeoutCheckInterval(d time.Duration) TaskTimeoutEnforcerOption

WithTimeoutCheckInterval sets the check interval for the timeout enforcer.

type UpdateScheduledTaskOptions

type UpdateScheduledTaskOptions struct {
	Name                   *string
	Description            *string
	CronExpression         *string
	Timezone               *string
	PromptTemplate         *string
	TimeoutSeconds         *int
	MaxRetries             *int
	OnFailure              *string
	MaxConsecutiveFailures *int
	Labels                 map[string]string
	Annotations            map[string]string
}

UpdateScheduledTaskOptions contains options for updating a scheduled task.

type UpdateWebhookInput

type UpdateWebhookInput struct {
	Name              *string
	URL               *string
	Events            []string
	IsActive          *bool
	MaxRetries        *int
	RetryDelaySeconds *int
	TimeoutSeconds    *int
	Headers           map[string]string
	Labels            map[string]string
	Annotations       map[string]string
}

UpdateWebhookInput contains input for updating a webhook.

type WebhookDispatcher

type WebhookDispatcher interface {
	Dispatch(ctx context.Context, eventType string, resource webhook.ResourceInfo, data any, tenantID *string) error
}

WebhookDispatcher is the interface for dispatching webhook events.

type WebhookIntegration

type WebhookIntegration struct {
	// contains filtered or unexported fields
}

WebhookIntegration provides methods for dispatching webhook events from managers.

func NewWebhookIntegration

func NewWebhookIntegration(dispatcher WebhookDispatcher, logger *zap.Logger) *WebhookIntegration

NewWebhookIntegration creates a new WebhookIntegration.

func (*WebhookIntegration) DispatchPermissionEvent

func (w *WebhookIntegration) DispatchPermissionEvent(ctx context.Context, eventType string, perm *store.PermissionRequest)

DispatchPermissionEvent dispatches a permission-related webhook event.

func (*WebhookIntegration) DispatchRunnerEvent

func (w *WebhookIntegration) DispatchRunnerEvent(ctx context.Context, eventType string, runner *store.Runner, sessionID *string)

DispatchRunnerEvent dispatches a runner-related webhook event.

func (*WebhookIntegration) DispatchSessionEvent

func (w *WebhookIntegration) DispatchSessionEvent(ctx context.Context, eventType string, session *store.Session)

DispatchSessionEvent dispatches a session-related webhook event.

func (*WebhookIntegration) DispatchTaskEvent

func (w *WebhookIntegration) DispatchTaskEvent(ctx context.Context, eventType string, task *store.Task, run *store.TaskRun)

DispatchTaskEvent dispatches a task-related webhook event.

type WebhookManager

type WebhookManager struct {
	// contains filtered or unexported fields
}

WebhookManager handles webhook configuration and event dispatch.

func NewWebhookManager

func NewWebhookManager(
	store store.Store,
	config webhook.Config,
	logger *zap.Logger,
) *WebhookManager

NewWebhookManager creates a new WebhookManager.

func (*WebhookManager) Create

Create creates a new webhook and returns its secret (only shown once).

func (*WebhookManager) Delete

func (m *WebhookManager) Delete(ctx context.Context, webhookID string) error

Delete deletes a webhook and cancels its pending events.

func (*WebhookManager) DeliverPendingEvents

func (m *WebhookManager) DeliverPendingEvents(ctx context.Context, limit int) (int, error)

DeliverPendingEvents processes pending webhook events. This is called by the retry job.

func (*WebhookManager) Dispatch

func (m *WebhookManager) Dispatch(ctx context.Context, eventType string, resource webhook.ResourceInfo, data any, tenantID *string) error

Dispatch sends an event to all matching webhooks.

func (*WebhookManager) Get

func (m *WebhookManager) Get(ctx context.Context, webhookID string) (*store.Webhook, error)

Get retrieves a webhook by ID.

func (*WebhookManager) GetByName

func (m *WebhookManager) GetByName(ctx context.Context, name string, tenantID *string) (*store.Webhook, error)

GetByName retrieves a webhook by name.

func (*WebhookManager) GetEvent

func (m *WebhookManager) GetEvent(ctx context.Context, eventID string) (*store.WebhookEvent, error)

GetEvent retrieves a webhook event by ID.

func (*WebhookManager) List

List retrieves webhooks with filters.

func (*WebhookManager) ListEvents

ListEvents retrieves webhook events with filters.

func (*WebhookManager) RetryEvent

func (m *WebhookManager) RetryEvent(ctx context.Context, eventID string) error

RetryEvent manually retries a failed webhook event.

func (*WebhookManager) RotateSecret

func (m *WebhookManager) RotateSecret(ctx context.Context, webhookID string) (string, error)

RotateSecret generates a new secret for a webhook. Returns the new secret (only shown once).

func (*WebhookManager) Stop

func (m *WebhookManager) Stop()

Stop gracefully stops the webhook manager.

func (*WebhookManager) Update

func (m *WebhookManager) Update(ctx context.Context, webhookID string, input *UpdateWebhookInput) error

Update updates a webhook.

type WebhookManagerInterface

type WebhookManagerInterface interface {
	// Webhook CRUD
	Create(ctx context.Context, input *CreateWebhookInput) (*store.Webhook, string, error)
	Get(ctx context.Context, webhookID string) (*store.Webhook, error)
	GetByName(ctx context.Context, name string, tenantID *string) (*store.Webhook, error)
	List(ctx context.Context, opts store.ListWebhooksOptions) (*store.ListResult[store.Webhook], error)
	Update(ctx context.Context, webhookID string, input *UpdateWebhookInput) error
	Delete(ctx context.Context, webhookID string) error
	RotateSecret(ctx context.Context, webhookID string) (string, error)

	// Event dispatch
	Dispatch(ctx context.Context, eventType string, resource webhook.ResourceInfo, data any, tenantID *string) error

	// Webhook events
	GetEvent(ctx context.Context, eventID string) (*store.WebhookEvent, error)
	ListEvents(ctx context.Context, opts store.ListWebhookEventsOptions) (*store.ListResult[store.WebhookEvent], error)
	RetryEvent(ctx context.Context, eventID string) error
}

WebhookManagerInterface defines the interface for webhook management.

type WorkspaceManager

type WorkspaceManager struct {
	// contains filtered or unexported fields
}

WorkspaceManager handles workspace lifecycle and storage.

func NewWorkspaceManager

func NewWorkspaceManager(store store.Store, cfg config.WorkspaceStorageConfig, logger *zap.Logger) *WorkspaceManager

NewWorkspaceManager creates a new WorkspaceManager.

func (*WorkspaceManager) CleanupHostDirectory

func (m *WorkspaceManager) CleanupHostDirectory(ctx context.Context, workspaceID string) error

CleanupHostDirectory removes the host directory for a workspace.

func (*WorkspaceManager) Create

Create creates a new workspace with a corresponding host directory.

func (*WorkspaceManager) Delete

func (m *WorkspaceManager) Delete(ctx context.Context, workspaceID string) error

Delete soft-deletes a workspace. The actual host directory cleanup is handled separately based on configuration.

func (*WorkspaceManager) EnsureHostDirectory

func (m *WorkspaceManager) EnsureHostDirectory(ctx context.Context, workspaceID string) (string, error)

EnsureHostDirectory creates the host directory for a workspace if it doesn't exist. Returns the host path that was created or already exists.

func (*WorkspaceManager) Get

func (m *WorkspaceManager) Get(ctx context.Context, workspaceID string) (*store.Workspace, error)

Get retrieves a workspace by ID.

func (*WorkspaceManager) GetBaseDir

func (m *WorkspaceManager) GetBaseDir() string

GetBaseDir returns the configured base directory for workspaces.

func (*WorkspaceManager) GetHostPath

func (m *WorkspaceManager) GetHostPath(_ context.Context, workspaceID string) (string, error)

GetHostPath returns the host filesystem path for a workspace. This path is used for mounting the workspace into containers.

func (*WorkspaceManager) IsInUse

func (m *WorkspaceManager) IsInUse(ctx context.Context, workspaceID string) (bool, error)

IsInUse checks if a workspace is currently being used by any active session.

func (*WorkspaceManager) List

List returns workspaces matching the filter options.

func (*WorkspaceManager) Update

func (m *WorkspaceManager) Update(ctx context.Context, workspaceID string, updates store.WorkspaceUpdates) (*store.Workspace, error)

Update updates a workspace's mutable fields.

type WorkspaceManagerInterface

type WorkspaceManagerInterface interface {
	Create(ctx context.Context, opts CreateWorkspaceOptions) (*store.Workspace, error)
	Get(ctx context.Context, workspaceID string) (*store.Workspace, error)
	List(ctx context.Context, opts ListWorkspacesOptions) (*store.ListResult[store.Workspace], error)
	Update(ctx context.Context, workspaceID string, updates store.WorkspaceUpdates) (*store.Workspace, error)
	Delete(ctx context.Context, workspaceID string) error
	GetHostPath(ctx context.Context, workspaceID string) (string, error)
	EnsureHostDirectory(ctx context.Context, workspaceID string) (string, error)
	CleanupHostDirectory(ctx context.Context, workspaceID string) error
	IsInUse(ctx context.Context, workspaceID string) (bool, error)
}

WorkspaceManagerInterface defines the interface for workspace management.

type WorkspaceStore

type WorkspaceStore interface {
	CreateWorkspace(ctx context.Context, ws *store.Workspace) error
	GetWorkspace(ctx context.Context, id string) (*store.Workspace, error)
	ListWorkspaces(ctx context.Context, opts store.ListWorkspacesOptions) (*store.ListResult[store.Workspace], error)
	UpdateWorkspace(ctx context.Context, id string, updates store.WorkspaceUpdates) error
	DeleteWorkspace(ctx context.Context, id string) error
	ListSessions(ctx context.Context, opts store.ListSessionsOptions) (*store.ListResult[store.Session], error)
}

WorkspaceStore defines the minimal store interface needed by WorkspaceManager.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL