Documentation
¶
Overview ¶
Package core provides business logic for the Marionette server.
Index ¶
- Constants
- Variables
- func IsValidSessionTransition(from, to string) bool
- func IsValidTaskRunTransition(from, to string) bool
- func IsValidTaskTransition(from, to string) bool
- func IsValidTransition(from, to string) bool
- type CommandSender
- type ConnectionManagerInterface
- type ContextSnapshot
- type CreatePermissionRequestInput
- type CreateScheduledTaskOptions
- type CreateSessionOptions
- type CreateTaskOptions
- type CreateWebhookInput
- type CreateWorkspaceOptions
- type ListPermissionRequestsOptions
- type ListScheduledTasksOptions
- type ListSessionsOptions
- type ListTaskRunsOptions
- type ListTasksOptions
- type ListWorkspacesOptions
- type LogSubscriberManager
- type LogSubscriberManagerInterface
- type PermissionManager
- func (m *PermissionManager) Cancel(ctx context.Context, permID string) error
- func (m *PermissionManager) Create(ctx context.Context, req *CreatePermissionRequestInput) (*store.PermissionRequest, error)
- func (m *PermissionManager) Get(ctx context.Context, permID string) (*store.PermissionRequest, error)
- func (m *PermissionManager) List(ctx context.Context, opts ListPermissionRequestsOptions) (*store.ListResult[store.PermissionRequest], error)
- func (m *PermissionManager) Respond(ctx context.Context, permID string, approved bool, reason, respondedBy string) error
- func (m *PermissionManager) SetWebhookIntegration(wi *WebhookIntegration)
- type PermissionManagerInterface
- type PermissionTimeoutEnforcer
- type PermissionTimeoutEnforcerOption
- type ProfileNetwork
- type ProfileResources
- type ProfileSelector
- type PromptTemplateData
- type ProviderRegistryInterface
- type RegisterRequest
- type RegisterResult
- type ResumeResult
- type RunnerManager
- func (m *RunnerManager) OnConnect(ctx context.Context, runnerID string) error
- func (m *RunnerManager) OnDisconnect(ctx context.Context, runnerID string) error
- func (m *RunnerManager) OnHeartbeat(ctx context.Context, runnerID string, hb *pb.Heartbeat) error
- func (m *RunnerManager) SetStatus(ctx context.Context, runnerID, status string) error
- func (m *RunnerManager) SetWebhookIntegration(wi *WebhookIntegration)
- type RunnerManagerInterface
- type RunnerManagerOption
- type RunnerRegistry
- type ScheduledSessionActivator
- type ScheduledSessionActivatorConfig
- type ScheduledTaskExecutor
- type ScheduledTaskExecutorOption
- type ScheduledTaskService
- func (s *ScheduledTaskService) CalculateNextRunAt(cronExpr, timezone string, after time.Time) (*time.Time, error)
- func (s *ScheduledTaskService) Create(ctx context.Context, opts CreateScheduledTaskOptions) (*store.ScheduledTask, error)
- func (s *ScheduledTaskService) Delete(ctx context.Context, taskID string) error
- func (s *ScheduledTaskService) ExecuteScheduledTask(ctx context.Context, scheduledTask *store.ScheduledTask) (*store.Task, error)
- func (s *ScheduledTaskService) Get(ctx context.Context, taskID string) (*store.ScheduledTask, error)
- func (s *ScheduledTaskService) GetDue(ctx context.Context, limit int) ([]*store.ScheduledTask, error)
- func (s *ScheduledTaskService) List(ctx context.Context, opts ListScheduledTasksOptions) (*store.ListResult[store.ScheduledTask], error)
- func (s *ScheduledTaskService) MarkTaskCompleted(ctx context.Context, scheduledTaskID string, success bool) error
- func (s *ScheduledTaskService) Pause(ctx context.Context, taskID string) error
- func (s *ScheduledTaskService) Resume(ctx context.Context, taskID string) error
- func (s *ScheduledTaskService) Trigger(ctx context.Context, taskID string) (*store.Task, error)
- func (s *ScheduledTaskService) Update(ctx context.Context, taskID string, opts UpdateScheduledTaskOptions) (*store.ScheduledTask, error)
- type ScheduledTaskServiceInterface
- type SessionManager
- func (m *SessionManager) Activate(ctx context.Context, sessionID, runnerID string) error
- func (m *SessionManager) AttachRunner(ctx context.Context, sessionID, runnerID string) error
- func (m *SessionManager) Create(ctx context.Context, opts CreateSessionOptions) (*store.Session, error)
- func (m *SessionManager) DetachRunner(ctx context.Context, sessionID string) error
- func (m *SessionManager) Get(ctx context.Context, sessionID string) (*store.Session, error)
- func (m *SessionManager) GetContextSnapshot(ctx context.Context, sessionID string) (*ContextSnapshot, error)
- func (m *SessionManager) GetWorkspaceHostPath(ctx context.Context, sessionID string) (string, error)
- func (m *SessionManager) List(ctx context.Context, opts ListSessionsOptions) (*store.ListResult[store.Session], error)
- func (m *SessionManager) Resume(ctx context.Context, sessionID string) error
- func (m *SessionManager) ResumeWithResult(ctx context.Context, sessionID string) (*ResumeResult, error)
- func (m *SessionManager) SetProviderRegistry(pr ProviderRegistryInterface)
- func (m *SessionManager) SetTaskManager(tm TaskManagerInterface)
- func (m *SessionManager) SetWebhookIntegration(wi *WebhookIntegration)
- func (m *SessionManager) SetWorkspaceManager(wm WorkspaceManagerInterface)
- func (m *SessionManager) Suspend(ctx context.Context, sessionID, strategy string) error
- func (m *SessionManager) SuspendWithOptions(ctx context.Context, sessionID string, opts SuspendOptions) error
- func (m *SessionManager) Terminate(ctx context.Context, sessionID string) error
- func (m *SessionManager) UpdateContextSnapshot(ctx context.Context, sessionID string, snapshot *ContextSnapshot) error
- type SessionManagerConfig
- type SessionManagerInterface
- type StaleDetector
- type StaleDetectorOption
- type StreamManager
- func (m *StreamManager) GetSFU() *sfu.SFU
- func (m *StreamManager) GetSignalingHandler() *sfu.SignalingHandler
- func (m *StreamManager) GetStream(ctx context.Context, streamID string) (*streaming.Stream, error)
- func (m *StreamManager) GetStreamBySession(ctx context.Context, sessionID string, streamType streaming.StreamType) (*streaming.Stream, error)
- func (m *StreamManager) ListSessionStreams(ctx context.Context, sessionID string) ([]*streaming.Stream, error)
- func (m *StreamManager) ListStreams(ctx context.Context, params streaming.ListStreamsParams) ([]*streaming.Stream, int, error)
- func (m *StreamManager) RegisterProvider(provider streaming.StreamProvider) error
- func (m *StreamManager) Start(ctx context.Context) error
- func (m *StreamManager) StartStream(ctx context.Context, opts streaming.StreamOptions) (*streaming.Stream, error)
- func (m *StreamManager) Stats() manager.Stats
- func (m *StreamManager) Stop(ctx context.Context) error
- func (m *StreamManager) StopStream(ctx context.Context, streamID string) error
- func (m *StreamManager) UnregisterProvider(name string) bool
- func (m *StreamManager) UpgradeWebSocket(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error)
- type StreamManagerConfig
- type SuspendOptions
- type TaskCompletedResult
- type TaskContextSnapshot
- type TaskManager
- func (m *TaskManager) Cancel(ctx context.Context, taskID string) error
- func (m *TaskManager) Create(ctx context.Context, opts CreateTaskOptions) (*store.Task, error)
- func (m *TaskManager) CreateRun(ctx context.Context, taskID string) (*store.TaskRun, error)
- func (m *TaskManager) Execute(ctx context.Context, taskID string) error
- func (m *TaskManager) FailRun(ctx context.Context, runID, reason string) error
- func (m *TaskManager) Get(ctx context.Context, taskID string) (*store.Task, error)
- func (m *TaskManager) List(ctx context.Context, opts ListTasksOptions) (*store.ListResult[store.Task], error)
- func (m *TaskManager) OnTaskAccepted(ctx context.Context, runID string) error
- func (m *TaskManager) OnTaskCompleted(ctx context.Context, result *TaskCompletedResult) error
- func (m *TaskManager) OnTaskProgress(_ context.Context, runID string, progress int) error
- func (m *TaskManager) OnTaskStarted(ctx context.Context, runID string) error
- func (m *TaskManager) ReExecute(ctx context.Context, taskID string) error
- func (m *TaskManager) Retry(ctx context.Context, taskID string) (*store.TaskRun, error)
- func (m *TaskManager) SetWebhookIntegration(wi *WebhookIntegration)
- func (m *TaskManager) ShouldRetry(ctx context.Context, taskID string) (bool, error)
- type TaskManagerInterface
- type TaskTimeoutEnforcer
- type TaskTimeoutEnforcerOption
- type UpdateScheduledTaskOptions
- type UpdateWebhookInput
- type WebhookDispatcher
- type WebhookIntegration
- func (w *WebhookIntegration) DispatchPermissionEvent(ctx context.Context, eventType string, perm *store.PermissionRequest)
- func (w *WebhookIntegration) DispatchRunnerEvent(ctx context.Context, eventType string, runner *store.Runner, sessionID *string)
- func (w *WebhookIntegration) DispatchSessionEvent(ctx context.Context, eventType string, session *store.Session)
- func (w *WebhookIntegration) DispatchTaskEvent(ctx context.Context, eventType string, task *store.Task, run *store.TaskRun)
- type WebhookManager
- func (m *WebhookManager) Create(ctx context.Context, input *CreateWebhookInput) (*store.Webhook, string, error)
- func (m *WebhookManager) Delete(ctx context.Context, webhookID string) error
- func (m *WebhookManager) DeliverPendingEvents(ctx context.Context, limit int) (int, error)
- func (m *WebhookManager) Dispatch(ctx context.Context, eventType string, resource webhook.ResourceInfo, data any, ...) error
- func (m *WebhookManager) Get(ctx context.Context, webhookID string) (*store.Webhook, error)
- func (m *WebhookManager) GetByName(ctx context.Context, name string, tenantID *string) (*store.Webhook, error)
- func (m *WebhookManager) GetEvent(ctx context.Context, eventID string) (*store.WebhookEvent, error)
- func (m *WebhookManager) List(ctx context.Context, opts store.ListWebhooksOptions) (*store.ListResult[store.Webhook], error)
- func (m *WebhookManager) ListEvents(ctx context.Context, opts store.ListWebhookEventsOptions) (*store.ListResult[store.WebhookEvent], error)
- func (m *WebhookManager) RetryEvent(ctx context.Context, eventID string) error
- func (m *WebhookManager) RotateSecret(ctx context.Context, webhookID string) (string, error)
- func (m *WebhookManager) Stop()
- func (m *WebhookManager) Update(ctx context.Context, webhookID string, input *UpdateWebhookInput) error
- type WebhookManagerInterface
- type WorkspaceManager
- func (m *WorkspaceManager) CleanupHostDirectory(ctx context.Context, workspaceID string) error
- func (m *WorkspaceManager) Create(ctx context.Context, opts CreateWorkspaceOptions) (*store.Workspace, error)
- func (m *WorkspaceManager) Delete(ctx context.Context, workspaceID string) error
- func (m *WorkspaceManager) EnsureHostDirectory(ctx context.Context, workspaceID string) (string, error)
- func (m *WorkspaceManager) Get(ctx context.Context, workspaceID string) (*store.Workspace, error)
- func (m *WorkspaceManager) GetBaseDir() string
- func (m *WorkspaceManager) GetHostPath(_ context.Context, workspaceID string) (string, error)
- func (m *WorkspaceManager) IsInUse(ctx context.Context, workspaceID string) (bool, error)
- func (m *WorkspaceManager) List(ctx context.Context, opts ListWorkspacesOptions) (*store.ListResult[store.Workspace], error)
- func (m *WorkspaceManager) Update(ctx context.Context, workspaceID string, updates store.WorkspaceUpdates) (*store.Workspace, error)
- type WorkspaceManagerInterface
- type WorkspaceStore
Constants ¶
const ( PermissionStatusPending = "pending" PermissionStatusApproved = "approved" PermissionStatusDenied = "denied" PermissionStatusCanceled = "canceled" )
Permission status constants.
const ( RiskLevelLow = "low" RiskLevelMedium = "medium" RiskLevelHigh = "high" RiskLevelCritical = "critical" )
Risk level constants.
const ( StatusOffline = "offline" StatusIdle = "idle" StatusBusy = "busy" StatusPaused = "paused" )
Runner status constants.
const (
	DefaultCheckInterval  = 30 * time.Second // How often to check for stale runners
	DefaultStaleThreshold = 90 * time.Second // 3 missed heartbeats (30s interval)
)
Default stale detection configuration.
const ( DefaultScheduledTaskCheckInterval = 30 * time.Second DefaultScheduledTaskBatchSize = 50 )
Default scheduled task executor configuration.
const ( ScheduledTaskStatusActive = "active" ScheduledTaskStatusPaused = "paused" ScheduledTaskStatusDisabled = "disabled" )
ScheduledTask status constants.
const ( OnFailureContinue = "continue" OnFailurePauseOnFailure = "pause_on_failure" OnFailureDisableOnFailure = "disable_on_failure" )
ScheduledTask OnFailure policies.
const (
	DefaultScheduledTaskTimeoutSeconds = 3600 // 1 hour
	DefaultScheduledTaskMaxRetries     = 0
	DefaultMaxConsecutiveFailures      = 3
	DefaultScheduledTaskTimezone       = "UTC"
)
Default scheduled task configuration.
const ( SessionStatusPending = "pending" SessionStatusActive = "active" SessionStatusSuspended = "suspended" SessionStatusResuming = "resuming" SessionStatusTerminated = "terminated" )
Session status constants.
const ( NetworkPolicyNone = "none" NetworkPolicyAllowList = "allow_list" NetworkPolicyProxy = "proxy" NetworkPolicyAirGapped = "air_gapped" )
Network policy constants.
const ( LifecycleModeOnDemand = "on_demand" LifecycleModeAlwaysOn = "always_on" LifecycleModeScheduled = "scheduled" )
Lifecycle mode constants.
const ( TaskStatusPending = "pending" TaskStatusRunning = "running" TaskStatusCompleted = "completed" TaskStatusFailed = "failed" TaskStatusCanceled = "canceled" )
Task status constants.
const ( TaskRunStatusPending = "pending" TaskRunStatusAssigned = "assigned" TaskRunStatusRunning = "running" TaskRunStatusCompleted = "completed" TaskRunStatusFailed = "failed" TaskRunStatusTimeout = "timeout" TaskRunStatusCanceled = "canceled" )
TaskRun status constants.
const (
	DefaultTaskTimeoutSeconds = 3600 // 1 hour
	DefaultMaxRetries         = 0
)
Default task configuration.
const ( WorkspaceMobilityLocal = "local" WorkspaceMobilityObjectSync = "object_sync" )
Workspace mobility constants.
const CurrentSnapshotVersion = 1
CurrentSnapshotVersion is the current snapshot format version.
const (
DefaultPermissionTimeoutCheckInterval = 60 * time.Second
)
Default permission timeout enforcement configuration.
const (
DefaultSuspendAfterSeconds = 1800 // 30 minutes
)
Default permission timeout configuration.
const (
DefaultTimeoutCheckInterval = 30 * time.Second
)
Default timeout enforcement configuration.
const (
DefaultWorkspaceBaseDir = "/var/marionette/workspaces"
)
Default workspace settings.
const (
WorkspaceStorageTypeVolume = "volume"
)
Workspace storage type constants.
Variables ¶
var ( ErrPermissionNotFound = errors.New("permission request not found") ErrPermissionAlreadyResponded = errors.New("permission request already responded") ErrPermissionNotPending = errors.New("permission request is not pending") )
Permission-related errors.
var ( ErrScheduledTaskNotFound = errors.New("scheduled task not found") ErrInvalidCronExpression = errors.New("invalid cron expression") ErrInvalidTimezone = errors.New("invalid timezone") ErrScheduledTaskNameRequired = errors.New("name is required") ErrScheduledTaskPromptRequired = errors.New("prompt_template is required") ErrScheduledTaskCronRequired = errors.New("cron_expression is required") ErrScheduledTaskAlreadyActive = errors.New("scheduled task is already active") ErrScheduledTaskAlreadyPaused = errors.New("scheduled task is already paused") ErrScheduledTaskDisabled = errors.New("scheduled task is disabled") )
ScheduledTask-related errors.
var ( ErrSessionNotFound = errors.New("session not found") ErrInvalidSessionTransition = errors.New("invalid session status transition") ErrSessionAlreadyTerminated = errors.New("session is already terminated") ErrSessionNotActive = errors.New("session is not active") ErrSessionNotSuspended = errors.New("session is not suspended") ErrSessionNoRunner = errors.New("session has no runner attached") ErrSessionAlreadyHasRunner = errors.New("session already has a runner attached") ErrWorkspaceRequired = errors.New("workspace_id is required") ErrAgentRequired = errors.New("agent is required") ErrRunnerNotIdle = errors.New("runner is not idle") ErrScheduleCronRequired = errors.New("schedule_cron is required for scheduled lifecycle mode") )
Session-related errors.
var ( ErrTaskNotFound = errors.New("task not found") ErrTaskRunNotFound = errors.New("task run not found") ErrInvalidTaskTransition = errors.New("invalid task status transition") ErrInvalidTaskRunTransition = errors.New("invalid task run status transition") ErrTaskAlreadyCompleted = errors.New("task is already completed") ErrTaskAlreadyCanceled = errors.New("task is already canceled") ErrSessionRequired = errors.New("session_id is required") ErrPromptRequired = errors.New("prompt is required") ErrNoRunnerAttached = errors.New("no runner attached to session") ErrMaxRetriesExceeded = errors.New("max retries exceeded") )
Task-related errors.
var ( ErrWebhookNotFound = errors.New("webhook not found") ErrWebhookNameExists = errors.New("webhook name already exists") ErrWebhookEventNotFound = errors.New("webhook event not found") ErrInvalidEventPattern = errors.New("invalid event pattern") )
Webhook-related errors.
var ( ErrWorkspaceNotFound = errors.New("workspace not found") ErrWorkspaceAlreadyExists = errors.New("workspace already exists") ErrWorkspaceDeleted = errors.New("workspace has been deleted") ErrWorkspaceInUse = errors.New("workspace is in use by a session") ErrInvalidWorkspaceName = errors.New("invalid workspace name") )
Workspace-related errors.
var ErrInvalidStatusTransition = errors.New("invalid status transition")
ErrInvalidStatusTransition is returned when an invalid status transition is attempted.
var ErrTokenBoundToOtherRunner = errors.New("token is bound to a different runner")
ErrTokenBoundToOtherRunner is returned when token is bound to a different runner.
var ErrTokenRequired = errors.New("runner token is required")
ErrTokenRequired is returned when no token is provided.
Functions ¶
func IsValidSessionTransition ¶
IsValidSessionTransition reports whether a session status transition from the status `from` to the status `to` is valid. It is exported for testing.
func IsValidTaskRunTransition ¶
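IsValidTaskRunTransition reports whether a task run status transition from the status `from` to the status `to` is valid. It is exported for testing.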
func IsValidTaskTransition ¶
IsValidTaskTransition reports whether a task status transition from the status `from` to the status `to` is valid. It is exported for testing.
func IsValidTransition ¶
IsValidTransition reports whether a status transition from the status `from` to the status `to` is valid. It is exported for testing.
Types ¶
type CommandSender ¶
type CommandSender interface {
SendCommand(runnerID string, cmd *pb.ServerCommand) error
}
CommandSender defines the interface for sending commands to runners. This is implemented by grpc.ConnectionManager.
type ConnectionManagerInterface ¶
type ConnectionManagerInterface interface {
IsConnected(runnerID string) bool
UpdateLastSeen(runnerID string) error
}
ConnectionManagerInterface defines the interface for connection management. This is implemented by grpc.ConnectionManager.
type ContextSnapshot ¶
type ContextSnapshot struct {
// WorkingDirectory is the agent's current working directory.
WorkingDirectory string `json:"working_directory,omitempty"`
// Environment contains environment variables to restore.
Environment map[string]string `json:"environment,omitempty"`
// ConversationID is the agent's conversation ID (for agents that support it).
// This allows the agent to continue the conversation after resume.
ConversationID string `json:"conversation_id,omitempty"`
// AgentState contains agent-specific state as raw JSON.
// This is opaque to the server and passed directly to the agent on resume.
AgentState json.RawMessage `json:"agent_state,omitempty"`
// LastActivity is when the session was last active.
LastActivity time.Time `json:"last_activity"`
// Version is the snapshot format version for compatibility checking.
Version int `json:"version"`
// AgentVersion is the agent version that created this snapshot.
// Used to check compatibility on resume.
AgentVersion string `json:"agent_version,omitempty"`
// PendingPermissions contains IDs of pending permission requests.
// These need to be delivered to the agent on resume.
PendingPermissions []string `json:"pending_permissions,omitempty"`
// TaskContext contains information about the current/last task.
TaskContext *TaskContextSnapshot `json:"task_context,omitempty"`
}
ContextSnapshot represents the saved state of a session for suspend/resume. This allows sessions to be resumed on different runners while preserving the agent's context and working state.
func NewContextSnapshot ¶
func NewContextSnapshot() *ContextSnapshot
NewContextSnapshot creates a new context snapshot with default values.
func ParseContextSnapshot ¶
func ParseContextSnapshot(data json.RawMessage) (*ContextSnapshot, error)
ParseContextSnapshot deserializes a snapshot from JSON.
func (*ContextSnapshot) IsCompatible ¶
func (s *ContextSnapshot) IsCompatible(agentVersion string) bool
IsCompatible checks if the snapshot is compatible with the given agent version. For now, we only check the snapshot version, not the agent version.
func (*ContextSnapshot) Merge ¶
func (s *ContextSnapshot) Merge(other *ContextSnapshot)
Merge merges another snapshot into this one, overwriting non-empty fields.
func (*ContextSnapshot) ToJSON ¶
func (s *ContextSnapshot) ToJSON() (json.RawMessage, error)
ToJSON serializes the snapshot to JSON.
type CreatePermissionRequestInput ¶
type CreatePermissionRequestInput struct {
OriginalRequestID string // Original request ID from agent (e.g., tool_use_id)
SessionID string
TaskID string
RunID string
Tool string
Action string
Context string
RiskLevel string
SuspendAfterSeconds int
TenantID *string
}
CreatePermissionRequestInput contains input for creating a permission request.
type CreateScheduledTaskOptions ¶
type CreateScheduledTaskOptions struct {
SessionID string // Required
Name string // Required
Description string // Optional
CronExpression string // Required: e.g., "0 9 * * 1-5"
Timezone string // Default: "UTC"
PromptTemplate string // Required: may contain {{.Date}}, {{.RunNumber}}
TimeoutSeconds int // Default: 3600
MaxRetries int // Default: 0
OnFailure string // Default: "continue"
MaxConsecutiveFailures *int // Default: 3
TenantID *string // For multi-tenant deployments
Labels map[string]string // Optional metadata labels
Annotations map[string]string // Optional metadata annotations
}
CreateScheduledTaskOptions contains options for creating a scheduled task.
type CreateSessionOptions ¶
type CreateSessionOptions struct {
Name *string // Optional session name
WorkspaceID string // Required
Agent string // Required (e.g., "claude")
IsBYOK bool // Whether using BYOK mode
AgentConfigID *string // Optional, for managed credentials
ProfileID *string // Optional, for runner configuration
LifecycleMode string // on_demand, always_on, scheduled
IdleTimeout *int // Seconds (for on_demand mode)
NetworkPolicy string // none, allow_list, proxy, air_gapped
AllowedHosts []string // For allow_list network policy
ScheduleCron *string // Required for scheduled mode
ScheduleTZ *string // Timezone for scheduled mode
TenantID *string // For multi-tenant deployments
Labels map[string]string // Optional metadata labels
Annotations map[string]string // Optional metadata annotations
}
CreateSessionOptions contains options for creating a new session.
type CreateTaskOptions ¶
type CreateTaskOptions struct {
SessionID string // Required
Prompt string // Required
MaxRetries int // Default: 0
TimeoutSeconds int // Default: 3600
TenantID *string // For multi-tenant deployments
Labels map[string]string // Optional metadata labels
Annotations map[string]string // Optional metadata annotations
}
CreateTaskOptions contains options for creating a new task.
type CreateWebhookInput ¶
type CreateWebhookInput struct {
Name string
URL string
Events []string
MaxRetries *int
RetryDelaySeconds *int
TimeoutSeconds *int
Headers map[string]string
TenantID *string
Labels map[string]string
Annotations map[string]string
}
CreateWebhookInput contains input for creating a webhook.
type CreateWorkspaceOptions ¶
type CreateWorkspaceOptions struct {
Name string // Optional workspace name
Persist *bool // Whether to persist workspace (default: true)
StorageType string // Storage type (default: "volume")
Mobility string // Mobility mode (default: "local")
DiskQuotaMB *int // Disk quota in MB (nil for default)
TenantID *string // For multi-tenant deployments
Labels map[string]string // Optional metadata labels
Annotations map[string]string // Optional annotations
}
CreateWorkspaceOptions contains options for creating a workspace.
type ListPermissionRequestsOptions ¶
type ListPermissionRequestsOptions = store.ListPermissionRequestsOptions
ListPermissionRequestsOptions is an alias for store.ListPermissionRequestsOptions, provided for convenience.
type ListScheduledTasksOptions ¶
type ListScheduledTasksOptions = store.ListScheduledTasksOptions
ListScheduledTasksOptions is an alias for store.ListScheduledTasksOptions, provided for convenience.
type ListSessionsOptions ¶
type ListSessionsOptions = store.ListSessionsOptions
ListSessionsOptions is an alias for store.ListSessionsOptions, provided for convenience.
type ListTaskRunsOptions ¶
type ListTaskRunsOptions = store.ListTaskRunsOptions
ListTaskRunsOptions is an alias for store.ListTaskRunsOptions, provided for convenience.
type ListTasksOptions ¶
type ListTasksOptions = store.ListTasksOptions
ListTasksOptions is an alias for store.ListTasksOptions, provided for convenience.
type ListWorkspacesOptions ¶
ListWorkspacesOptions contains options for listing workspaces.
type LogSubscriberManager ¶
type LogSubscriberManager struct {
// contains filtered or unexported fields
}
LogSubscriberManager manages real-time log subscribers. Full WebSocket integration will be implemented in G6.
func NewLogSubscriberManager ¶
func NewLogSubscriberManager(logger *zap.Logger) *LogSubscriberManager
NewLogSubscriberManager creates a new LogSubscriberManager.
func (*LogSubscriberManager) Broadcast ¶
func (m *LogSubscriberManager) Broadcast(log *store.Log)
Broadcast sends a log entry to all subscribers for the session. In G4, this is a stub that logs the broadcast. Full implementation in G6.
func (*LogSubscriberManager) Subscribe ¶
func (m *LogSubscriberManager) Subscribe(sessionID string, ch chan *store.Log)
Subscribe registers a channel to receive logs for a session.
func (*LogSubscriberManager) SubscriberCount ¶
func (m *LogSubscriberManager) SubscriberCount(sessionID string) int
SubscriberCount returns the number of subscribers for a session. Used for testing and monitoring.
func (*LogSubscriberManager) Unsubscribe ¶
func (m *LogSubscriberManager) Unsubscribe(sessionID string, ch chan *store.Log)
Unsubscribe removes a channel from session subscriptions.
type LogSubscriberManagerInterface ¶
type LogSubscriberManagerInterface interface {
// Broadcast sends a log entry to all subscribers for the session.
Broadcast(log *store.Log)
// Subscribe registers a channel to receive logs for a session.
Subscribe(sessionID string, ch chan *store.Log)
// Unsubscribe removes a channel from session subscriptions.
Unsubscribe(sessionID string, ch chan *store.Log)
}
LogSubscriberManagerInterface defines the interface for managing real-time log subscribers. This is implemented by LogSubscriberManager and used for dependency injection.
type PermissionManager ¶
type PermissionManager struct {
// contains filtered or unexported fields
}
PermissionManager handles permission request lifecycle.
func NewPermissionManager ¶
func NewPermissionManager( store store.Store, cmdSender CommandSender, sessionMgr SessionManagerInterface, auditLog audit.Logger, logger *zap.Logger, ) *PermissionManager
NewPermissionManager creates a new PermissionManager.
func (*PermissionManager) Cancel ¶
func (m *PermissionManager) Cancel(ctx context.Context, permID string) error
Cancel cancels a pending permission request.
func (*PermissionManager) Create ¶
func (m *PermissionManager) Create(ctx context.Context, req *CreatePermissionRequestInput) (*store.PermissionRequest, error)
Create stores a new permission request from runner.
func (*PermissionManager) Get ¶
func (m *PermissionManager) Get(ctx context.Context, permID string) (*store.PermissionRequest, error)
Get retrieves a permission request by ID.
func (*PermissionManager) List ¶
func (m *PermissionManager) List(ctx context.Context, opts ListPermissionRequestsOptions) (*store.ListResult[store.PermissionRequest], error)
List retrieves permission requests with filters.
func (*PermissionManager) Respond ¶
func (m *PermissionManager) Respond(ctx context.Context, permID string, approved bool, reason, respondedBy string) error
Respond approves or denies a permission request.
func (*PermissionManager) SetWebhookIntegration ¶
func (m *PermissionManager) SetWebhookIntegration(wi *WebhookIntegration)
SetWebhookIntegration sets the webhook integration for dispatching events.
type PermissionManagerInterface ¶
type PermissionManagerInterface interface {
// Create stores a new permission request from runner.
Create(ctx context.Context, req *CreatePermissionRequestInput) (*store.PermissionRequest, error)
// Respond approves or denies a permission request.
Respond(ctx context.Context, permID string, approved bool, reason, respondedBy string) error
// Get retrieves a permission request by ID.
Get(ctx context.Context, permID string) (*store.PermissionRequest, error)
// List retrieves permission requests with filters.
List(ctx context.Context, opts ListPermissionRequestsOptions) (*store.ListResult[store.PermissionRequest], error)
// Cancel cancels a pending permission request.
Cancel(ctx context.Context, permID string) error
}
PermissionManagerInterface defines the interface for permission request management. This is used for dependency injection in other components.
type PermissionTimeoutEnforcer ¶
type PermissionTimeoutEnforcer struct {
// contains filtered or unexported fields
}
PermissionTimeoutEnforcer monitors pending permission requests and suspends sessions that exceed their suspend_after_seconds timeout.
func NewPermissionTimeoutEnforcer ¶
func NewPermissionTimeoutEnforcer( store store.Store, sessionMgr SessionManagerInterface, logger *zap.Logger, opts ...PermissionTimeoutEnforcerOption, ) *PermissionTimeoutEnforcer
NewPermissionTimeoutEnforcer creates a new PermissionTimeoutEnforcer.
func (*PermissionTimeoutEnforcer) CheckTimeouts ¶
func (e *PermissionTimeoutEnforcer) CheckTimeouts(ctx context.Context) error
CheckTimeouts is exported for testing.
func (*PermissionTimeoutEnforcer) Start ¶
func (e *PermissionTimeoutEnforcer) Start(ctx context.Context)
Start begins the background permission timeout enforcement loop.
func (*PermissionTimeoutEnforcer) Stop ¶
func (e *PermissionTimeoutEnforcer) Stop()
Stop stops the permission timeout enforcer.
type PermissionTimeoutEnforcerOption ¶
type PermissionTimeoutEnforcerOption func(*PermissionTimeoutEnforcer)
PermissionTimeoutEnforcerOption is a functional option for PermissionTimeoutEnforcer.
func WithPermissionTimeoutCheckInterval ¶
func WithPermissionTimeoutCheckInterval(d time.Duration) PermissionTimeoutEnforcerOption
WithPermissionTimeoutCheckInterval sets the check interval for the timeout enforcer.
type ProfileNetwork ¶
type ProfileNetwork struct {
Level string `json:"level"` // "none", "allow_list", "proxy", "air_gapped"
AllowedHosts []string `json:"allowed_hosts"` // For allow_list mode
}
ProfileNetwork defines the network configuration from a profile.
type ProfileResources ¶
type ProfileResources struct {
CPU int `json:"cpu"`
Memory string `json:"memory"` // e.g., "8GB"
Disk string `json:"disk"` // e.g., "50GB"
}
ProfileResources defines the resource configuration from a profile.
type ProfileSelector ¶
type ProfileSelector struct {
OS string `json:"os,omitempty"` // e.g., "darwin", "linux"
Arch string `json:"arch,omitempty"` // e.g., "arm64", "amd64"
Capabilities []string `json:"capabilities,omitempty"` // e.g., ["gpu", "xcode"]
}
ProfileSelector defines the runner selector constraints from a profile.
type PromptTemplateData ¶
type PromptTemplateData struct {
Date string // Current date in YYYY-MM-DD format
DateTime string // Current date and time in RFC3339 format
RunNumber int // Total number of runs including this one
TaskName string // Name of the scheduled task
SessionID string // Session ID
ScheduledFor string // Scheduled run time in RFC3339 format
}
PromptTemplateData contains data for rendering prompt templates.
type ProviderRegistryInterface ¶
type ProviderRegistryInterface interface {
// GetDefault returns the default provider.
GetDefault(ctx context.Context) (provider.Provider, error)
// Get returns a provider by name.
Get(ctx context.Context, name string) (provider.Provider, error)
}
ProviderRegistryInterface defines the interface for provider operations needed by SessionManager.
type RegisterRequest ¶
type RegisterRequest struct {
Token string // Runner token (required)
Name string // Runner name
Hostname string // Runner hostname
SandboxMode string // Sandbox mode
SandboxTypes []string // Available sandbox types
Capabilities []string // Runner capabilities
Labels map[string]string // Key-value labels
}
RegisterRequest contains the data needed to register a runner.
type RegisterResult ¶
type RegisterResult struct {
RunnerID string // The runner ID (new or existing)
IsNew bool // True if a new runner was created
PoolName string // Pool name from token
}
RegisterResult contains the result of runner registration.
type ResumeResult ¶
type ResumeResult struct {
// Session is the resumed session.
Session *store.Session
// ContextSnapshot is the saved context to restore.
ContextSnapshot *ContextSnapshot
// SuspendStrategy is the strategy used when suspended.
SuspendStrategy string
// SnapshotID is the snapshot ID if one was created during suspend.
SnapshotID string
// WorkspaceSynced indicates if workspace was synced during suspend.
WorkspaceSynced bool
}
ResumeResult contains information about a resumed session.
type RunnerManager ¶
type RunnerManager struct {
// contains filtered or unexported fields
}
RunnerManager handles runner lifecycle and status transitions.
func NewRunnerManager ¶
func NewRunnerManager(store store.Store, connManager ConnectionManagerInterface, logger *zap.Logger, opts ...RunnerManagerOption) *RunnerManager
NewRunnerManager creates a new RunnerManager.
func (*RunnerManager) OnConnect ¶
func (m *RunnerManager) OnConnect(ctx context.Context, runnerID string) error
OnConnect is called when a runner connects to the server. Transitions the runner from offline to idle. Also checks for resuming sessions that need a runner and auto-attaches.
func (*RunnerManager) OnDisconnect ¶
func (m *RunnerManager) OnDisconnect(ctx context.Context, runnerID string) error
OnDisconnect is called when a runner disconnects from the server. Transitions the runner to offline status.
func (*RunnerManager) OnHeartbeat ¶
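func (m *RunnerManager) OnHeartbeat(ctx context.Context, runnerID string, hb *pb.Heartbeat) error
OnHeartbeat is called when a heartbeat is received from a runner. It updates the last_seen timestamp and, optionally, the status from the heartbeat.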
func (*RunnerManager) SetStatus ¶
func (m *RunnerManager) SetStatus(ctx context.Context, runnerID, status string) error
SetStatus updates a runner's status with validation.
func (*RunnerManager) SetWebhookIntegration ¶
func (m *RunnerManager) SetWebhookIntegration(wi *WebhookIntegration)
SetWebhookIntegration sets the webhook integration for dispatching events.
type RunnerManagerInterface ¶
type RunnerManagerInterface interface {
OnConnect(ctx context.Context, runnerID string) error
OnDisconnect(ctx context.Context, runnerID string) error
OnHeartbeat(ctx context.Context, runnerID string, hb *pb.Heartbeat) error
SetStatus(ctx context.Context, runnerID, status string) error
}
RunnerManagerInterface defines the interface for runner management. This is used for dependency injection in other components.
type RunnerManagerOption ¶
type RunnerManagerOption func(*RunnerManager)
RunnerManagerOption is a functional option for RunnerManager.
func WithSessionManager ¶
func WithSessionManager(sm SessionManagerInterface) RunnerManagerOption
WithSessionManager sets the session manager for the runner manager.
func WithTaskManager ¶
func WithTaskManager(tm TaskManagerInterface) RunnerManagerOption
WithTaskManager sets the task manager for the runner manager.
type RunnerRegistry ¶
type RunnerRegistry struct {
// contains filtered or unexported fields
}
RunnerRegistry handles runner registration and lookup.
func NewRunnerRegistry ¶
func NewRunnerRegistry(store store.Store, tokenSvc *auth.RunnerTokenService, logger *zap.Logger) *RunnerRegistry
NewRunnerRegistry creates a new RunnerRegistry.
func (*RunnerRegistry) Register ¶
func (r *RunnerRegistry) Register(ctx context.Context, req *RegisterRequest) (*RegisterResult, error)
Register registers a new runner or updates an existing one.
The registration flow:
1. Validate the token via tokenSvc.Validate().
2. If the token is bound to a runner, check that the runner exists and update it.
3. If the token is not bound, look up the runner by name or create a new runner.
4. Bind the token to the runner if it is not already bound.
type ScheduledSessionActivator ¶
type ScheduledSessionActivator struct {
// contains filtered or unexported fields
}
ScheduledSessionActivator is a background job that monitors sessions with lifecycle_mode='scheduled' and activates them when their next_scheduled_at time arrives.
This enables the scheduled session lifecycle mode, where:
- Sessions stay suspended between scheduled runs.
- Sessions auto-resume at scheduled times (based on schedule_cron).
- Sessions execute their scheduled tasks when active.
- Sessions auto-suspend again after scheduled tasks complete (handled elsewhere).
func NewScheduledSessionActivator ¶
func NewScheduledSessionActivator(cfg ScheduledSessionActivatorConfig) *ScheduledSessionActivator
NewScheduledSessionActivator creates a new ScheduledSessionActivator.
func (*ScheduledSessionActivator) Start ¶
func (a *ScheduledSessionActivator) Start() error
Start begins the background job.
func (*ScheduledSessionActivator) Stop ¶
func (a *ScheduledSessionActivator) Stop()
Stop gracefully stops the background job.
type ScheduledSessionActivatorConfig ¶
type ScheduledSessionActivatorConfig struct {
Store store.Store
SessionMgr SessionManagerInterface
CheckInterval time.Duration
BatchSize int
Logger *zap.Logger
}
ScheduledSessionActivatorConfig holds configuration for the activator.
type ScheduledTaskExecutor ¶
type ScheduledTaskExecutor struct {
// contains filtered or unexported fields
}
ScheduledTaskExecutor polls for due scheduled tasks and executes them.
func NewScheduledTaskExecutor ¶
func NewScheduledTaskExecutor( scheduledTaskSvc ScheduledTaskServiceInterface, taskMgr TaskManagerInterface, logger *zap.Logger, opts ...ScheduledTaskExecutorOption, ) *ScheduledTaskExecutor
NewScheduledTaskExecutor creates a new ScheduledTaskExecutor.
func (*ScheduledTaskExecutor) IsRunning ¶
func (e *ScheduledTaskExecutor) IsRunning() bool
IsRunning returns whether the executor is running.
func (*ScheduledTaskExecutor) RunNow ¶
func (e *ScheduledTaskExecutor) RunNow(ctx context.Context) error
RunNow immediately processes due tasks (for testing).
func (*ScheduledTaskExecutor) Start ¶
func (e *ScheduledTaskExecutor) Start(ctx context.Context)
Start begins the background scheduled task execution loop.
func (*ScheduledTaskExecutor) Stop ¶
func (e *ScheduledTaskExecutor) Stop()
Stop stops the scheduled task executor.
type ScheduledTaskExecutorOption ¶
type ScheduledTaskExecutorOption func(*ScheduledTaskExecutor)
ScheduledTaskExecutorOption is a functional option for ScheduledTaskExecutor.
func WithScheduledTaskBatchSize ¶
func WithScheduledTaskBatchSize(size int) ScheduledTaskExecutorOption
WithScheduledTaskBatchSize sets the number of tasks to process per poll.
func WithScheduledTaskCheckInterval ¶
func WithScheduledTaskCheckInterval(d time.Duration) ScheduledTaskExecutorOption
WithScheduledTaskCheckInterval sets the polling interval.
type ScheduledTaskService ¶
type ScheduledTaskService struct {
// contains filtered or unexported fields
}
ScheduledTaskService handles scheduled task lifecycle.
func NewScheduledTaskService ¶
func NewScheduledTaskService( store store.Store, taskMgr TaskManagerInterface, auditLog audit.Logger, logger *zap.Logger, ) *ScheduledTaskService
NewScheduledTaskService creates a new ScheduledTaskService.
func (*ScheduledTaskService) CalculateNextRunAt ¶
func (s *ScheduledTaskService) CalculateNextRunAt(cronExpr, timezone string, after time.Time) (*time.Time, error)
CalculateNextRunAt calculates the next run time for a cron expression.
func (*ScheduledTaskService) Create ¶
func (s *ScheduledTaskService) Create(ctx context.Context, opts CreateScheduledTaskOptions) (*store.ScheduledTask, error)
Create creates a new scheduled task.
func (*ScheduledTaskService) Delete ¶
func (s *ScheduledTaskService) Delete(ctx context.Context, taskID string) error
Delete deletes a scheduled task.
func (*ScheduledTaskService) ExecuteScheduledTask ¶
func (s *ScheduledTaskService) ExecuteScheduledTask(ctx context.Context, scheduledTask *store.ScheduledTask) (*store.Task, error)
ExecuteScheduledTask executes a scheduled task by creating a regular task.
func (*ScheduledTaskService) Get ¶
func (s *ScheduledTaskService) Get(ctx context.Context, taskID string) (*store.ScheduledTask, error)
Get retrieves a scheduled task by ID.
func (*ScheduledTaskService) GetDue ¶
func (s *ScheduledTaskService) GetDue(ctx context.Context, limit int) ([]*store.ScheduledTask, error)
GetDue retrieves scheduled tasks that are due to run.
func (*ScheduledTaskService) List ¶
func (s *ScheduledTaskService) List(ctx context.Context, opts ListScheduledTasksOptions) (*store.ListResult[store.ScheduledTask], error)
List retrieves scheduled tasks with filtering.
func (*ScheduledTaskService) MarkTaskCompleted ¶
func (s *ScheduledTaskService) MarkTaskCompleted(ctx context.Context, scheduledTaskID string, success bool) error
MarkTaskCompleted marks a scheduled task's last run as completed (success or failure). This should be called by the task completion handler.
func (*ScheduledTaskService) Pause ¶
func (s *ScheduledTaskService) Pause(ctx context.Context, taskID string) error
Pause pauses a scheduled task.
func (*ScheduledTaskService) Resume ¶
func (s *ScheduledTaskService) Resume(ctx context.Context, taskID string) error
Resume resumes a paused scheduled task.
func (*ScheduledTaskService) Update ¶
func (s *ScheduledTaskService) Update(ctx context.Context, taskID string, opts UpdateScheduledTaskOptions) (*store.ScheduledTask, error)
Update updates a scheduled task.
type ScheduledTaskServiceInterface ¶
type ScheduledTaskServiceInterface interface {
Create(ctx context.Context, opts CreateScheduledTaskOptions) (*store.ScheduledTask, error)
Get(ctx context.Context, taskID string) (*store.ScheduledTask, error)
List(ctx context.Context, opts ListScheduledTasksOptions) (*store.ListResult[store.ScheduledTask], error)
Update(ctx context.Context, taskID string, opts UpdateScheduledTaskOptions) (*store.ScheduledTask, error)
Delete(ctx context.Context, taskID string) error
Pause(ctx context.Context, taskID string) error
Resume(ctx context.Context, taskID string) error
Trigger(ctx context.Context, taskID string) (*store.Task, error)
GetDue(ctx context.Context, limit int) ([]*store.ScheduledTask, error)
ExecuteScheduledTask(ctx context.Context, scheduledTask *store.ScheduledTask) (*store.Task, error)
MarkTaskCompleted(ctx context.Context, scheduledTaskID string, success bool) error
CalculateNextRunAt(cronExpr, timezone string, after time.Time) (*time.Time, error)
}
ScheduledTaskServiceInterface defines the interface for scheduled task management.
type SessionManager ¶
type SessionManager struct {
// contains filtered or unexported fields
}
SessionManager handles session lifecycle and state transitions.
func NewSessionManager ¶
func NewSessionManager(store store.Store, connManager ConnectionManagerInterface, cmdSender CommandSender, logger *zap.Logger) *SessionManager
NewSessionManager creates a new SessionManager.
func NewSessionManagerWithConfig ¶
func NewSessionManagerWithConfig(cfg SessionManagerConfig) *SessionManager
NewSessionManagerWithConfig creates a new SessionManager with full configuration.
func (*SessionManager) Activate ¶
func (m *SessionManager) Activate(ctx context.Context, sessionID, runnerID string) error
Activate transitions a session from pending/resuming to active. This is called when a runner is assigned to the session.
func (*SessionManager) AttachRunner ¶
func (m *SessionManager) AttachRunner(ctx context.Context, sessionID, runnerID string) error
AttachRunner attaches a runner to a session. The session must be in pending or resuming state.
func (*SessionManager) Create ¶
func (m *SessionManager) Create(ctx context.Context, opts CreateSessionOptions) (*store.Session, error)
Create creates a new session.
func (*SessionManager) DetachRunner ¶
func (m *SessionManager) DetachRunner(ctx context.Context, sessionID string) error
DetachRunner detaches the runner from a session without changing status. This is called internally when a runner disconnects.
func (*SessionManager) GetContextSnapshot ¶
func (m *SessionManager) GetContextSnapshot(ctx context.Context, sessionID string) (*ContextSnapshot, error)
GetContextSnapshot retrieves the context snapshot for a session.
func (*SessionManager) GetWorkspaceHostPath ¶
func (m *SessionManager) GetWorkspaceHostPath(ctx context.Context, sessionID string) (string, error)
GetWorkspaceHostPath returns the host filesystem path for a session's workspace. This path is used for mounting the workspace into containers.
func (*SessionManager) List ¶
func (m *SessionManager) List(ctx context.Context, opts ListSessionsOptions) (*store.ListResult[store.Session], error)
List retrieves sessions matching the given options.
func (*SessionManager) Resume ¶
func (m *SessionManager) Resume(ctx context.Context, sessionID string) error
Resume transitions a session from suspended to resuming. The session will be fully activated once a runner is attached.
func (*SessionManager) ResumeWithResult ¶
func (m *SessionManager) ResumeWithResult(ctx context.Context, sessionID string) (*ResumeResult, error)
ResumeWithResult transitions a session from suspended to resuming and returns resume info.
func (*SessionManager) SetProviderRegistry ¶
func (m *SessionManager) SetProviderRegistry(pr ProviderRegistryInterface)
SetProviderRegistry sets the provider registry. This allows optional injection.
func (*SessionManager) SetTaskManager ¶
func (m *SessionManager) SetTaskManager(tm TaskManagerInterface)
SetTaskManager sets the task manager. This allows optional injection.
func (*SessionManager) SetWebhookIntegration ¶
func (m *SessionManager) SetWebhookIntegration(wi *WebhookIntegration)
SetWebhookIntegration sets the webhook integration for dispatching events.
func (*SessionManager) SetWorkspaceManager ¶
func (m *SessionManager) SetWorkspaceManager(wm WorkspaceManagerInterface)
SetWorkspaceManager sets the workspace manager. This allows optional injection.
func (*SessionManager) Suspend ¶
func (m *SessionManager) Suspend(ctx context.Context, sessionID, strategy string) error
Suspend transitions a session from active to suspended. The strategy parameter specifies how to handle the suspend (pause, snapshot, etc.).
func (*SessionManager) SuspendWithOptions ¶
func (m *SessionManager) SuspendWithOptions(ctx context.Context, sessionID string, opts SuspendOptions) error
SuspendWithOptions transitions a session from active to suspended with full options.
func (*SessionManager) Terminate ¶
func (m *SessionManager) Terminate(ctx context.Context, sessionID string) error
Terminate transitions a session to terminated status. This can be called from any non-terminated state.
func (*SessionManager) UpdateContextSnapshot ¶
func (m *SessionManager) UpdateContextSnapshot(ctx context.Context, sessionID string, snapshot *ContextSnapshot) error
UpdateContextSnapshot updates the context snapshot for a session.
type SessionManagerConfig ¶
type SessionManagerConfig struct {
Store store.Store
ConnManager ConnectionManagerInterface
CmdSender CommandSender
WorkspaceManager WorkspaceManagerInterface
AuditLog audit.Logger
ProviderRegistry ProviderRegistryInterface
Logger *zap.Logger
}
SessionManagerConfig holds configuration for SessionManager.
type SessionManagerInterface ¶
type SessionManagerInterface interface {
Create(ctx context.Context, opts CreateSessionOptions) (*store.Session, error)
Get(ctx context.Context, sessionID string) (*store.Session, error)
List(ctx context.Context, opts ListSessionsOptions) (*store.ListResult[store.Session], error)
Activate(ctx context.Context, sessionID, runnerID string) error
Suspend(ctx context.Context, sessionID, strategy string) error
Resume(ctx context.Context, sessionID string) error
Terminate(ctx context.Context, sessionID string) error
AttachRunner(ctx context.Context, sessionID, runnerID string) error
DetachRunner(ctx context.Context, sessionID string) error
UpdateContextSnapshot(ctx context.Context, sessionID string, snapshot *ContextSnapshot) error
}
SessionManagerInterface defines the interface for session management. This is used for dependency injection in other components.
type StaleDetector ¶
type StaleDetector struct {
// contains filtered or unexported fields
}
StaleDetector monitors runners and marks stale ones as offline.
func NewStaleDetector ¶
func NewStaleDetector( store store.Store, connManager ConnectionManagerInterface, runnerManager *RunnerManager, logger *zap.Logger, opts ...StaleDetectorOption, ) *StaleDetector
NewStaleDetector creates a new StaleDetector.
func (*StaleDetector) Start ¶
func (sd *StaleDetector) Start(ctx context.Context)
Start begins the background stale detection loop.
type StaleDetectorOption ¶
type StaleDetectorOption func(*StaleDetector)
StaleDetectorOption is a functional option for StaleDetector.
func WithCheckInterval ¶
func WithCheckInterval(d time.Duration) StaleDetectorOption
WithCheckInterval sets the check interval for the stale detector.
func WithStaleThreshold ¶
func WithStaleThreshold(d time.Duration) StaleDetectorOption
WithStaleThreshold sets the stale threshold for the stale detector.
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager integrates the streaming manager with the server. It provides an interface for HTTP handlers and WebSocket connections to interact with the streaming infrastructure.
func NewStreamManager ¶
func NewStreamManager(config StreamManagerConfig, s store.Store, logger *zap.Logger) (*StreamManager, error)
NewStreamManager creates a new StreamManager.
func (*StreamManager) GetSFU ¶
func (m *StreamManager) GetSFU() *sfu.SFU
GetSFU returns the SFU instance.
func (*StreamManager) GetSignalingHandler ¶
func (m *StreamManager) GetSignalingHandler() *sfu.SignalingHandler
GetSignalingHandler returns the signaling handler.
func (*StreamManager) GetStreamBySession ¶
func (m *StreamManager) GetStreamBySession(ctx context.Context, sessionID string, streamType streaming.StreamType) (*streaming.Stream, error)
GetStreamBySession returns the active stream for a session and type.
func (*StreamManager) ListSessionStreams ¶
func (m *StreamManager) ListSessionStreams(ctx context.Context, sessionID string) ([]*streaming.Stream, error)
ListSessionStreams lists all streams for a session.
func (*StreamManager) ListStreams ¶
func (m *StreamManager) ListStreams(ctx context.Context, params streaming.ListStreamsParams) ([]*streaming.Stream, int, error)
ListStreams lists streams matching the given parameters.
func (*StreamManager) RegisterProvider ¶
func (m *StreamManager) RegisterProvider(provider streaming.StreamProvider) error
RegisterProvider registers a stream provider.
func (*StreamManager) Start ¶
func (m *StreamManager) Start(ctx context.Context) error
Start starts the stream manager.
func (*StreamManager) StartStream ¶
func (m *StreamManager) StartStream(ctx context.Context, opts streaming.StreamOptions) (*streaming.Stream, error)
StartStream starts a new stream.
func (*StreamManager) Stats ¶
func (m *StreamManager) Stats() manager.Stats
Stats returns manager statistics.
func (*StreamManager) Stop ¶
func (m *StreamManager) Stop(ctx context.Context) error
Stop stops the stream manager.
func (*StreamManager) StopStream ¶
func (m *StreamManager) StopStream(ctx context.Context, streamID string) error
StopStream stops a stream.
func (*StreamManager) UnregisterProvider ¶
func (m *StreamManager) UnregisterProvider(name string) bool
UnregisterProvider unregisters a stream provider.
func (*StreamManager) UpgradeWebSocket ¶
func (m *StreamManager) UpgradeWebSocket(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error)
UpgradeWebSocket upgrades an HTTP connection to WebSocket.
type StreamManagerConfig ¶
type StreamManagerConfig struct {
// Manager configuration
Manager manager.Config
// WebSocket configuration
WebSocketReadBufferSize int
WebSocketWriteBufferSize int
AllowedOrigins []string
}
StreamManagerConfig contains configuration for the StreamManager.
func DefaultStreamManagerConfig ¶
func DefaultStreamManagerConfig() StreamManagerConfig
DefaultStreamManagerConfig returns a default configuration.
type SuspendOptions ¶
type SuspendOptions struct {
// Strategy specifies how to handle the suspend (pause, snapshot, etc.).
Strategy string
// ContextSnapshot is the context to save with the session.
// If nil, a basic snapshot is created automatically.
ContextSnapshot *ContextSnapshot
// WorkspaceSynced indicates if workspace was synced to object storage.
WorkspaceSynced bool
// SnapshotID is the ID of any snapshot created during suspend.
SnapshotID string
}
SuspendOptions contains options for suspending a session.
type TaskCompletedResult ¶
type TaskCompletedResult struct {
RunID string
Success bool
Error string
ExitCode *int
TokensInput int
TokensOutput int
}
TaskCompletedResult contains the result of a completed task run.
type TaskContextSnapshot ¶
type TaskContextSnapshot struct {
// TaskID is the current task ID.
TaskID string `json:"task_id,omitempty"`
// RunID is the current task run ID.
RunID string `json:"run_id,omitempty"`
// Status is the task status when suspended.
Status string `json:"status,omitempty"`
// Progress is the estimated task progress (0-100).
Progress int `json:"progress,omitempty"`
}
TaskContextSnapshot contains task-related state for resume.
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
TaskManager handles task lifecycle and execution.
func NewTaskManager ¶
func NewTaskManager( store store.Store, cmdSender CommandSender, sessionMgr SessionManagerInterface, auditLog audit.Logger, logger *zap.Logger, ) *TaskManager
NewTaskManager creates a new TaskManager.
func (*TaskManager) Cancel ¶
func (m *TaskManager) Cancel(ctx context.Context, taskID string) error
Cancel cancels a task. This can be called from pending or running state.
func (*TaskManager) Create ¶
func (m *TaskManager) Create(ctx context.Context, opts CreateTaskOptions) (*store.Task, error)
Create creates a new task.
func (*TaskManager) Execute ¶
func (m *TaskManager) Execute(ctx context.Context, taskID string) error
Execute sends a task to a runner for execution.
func (*TaskManager) FailRun ¶
func (m *TaskManager) FailRun(ctx context.Context, runID, reason string) error
FailRun marks a task run as failed.
func (*TaskManager) List ¶
func (m *TaskManager) List(ctx context.Context, opts ListTasksOptions) (*store.ListResult[store.Task], error)
List retrieves tasks matching the given options.
func (*TaskManager) OnTaskAccepted ¶
func (m *TaskManager) OnTaskAccepted(ctx context.Context, runID string) error
OnTaskAccepted is called when a runner accepts a task (pending → assigned).
func (*TaskManager) OnTaskCompleted ¶
func (m *TaskManager) OnTaskCompleted(ctx context.Context, result *TaskCompletedResult) error
OnTaskCompleted is called when a task run completes (running → completed/failed).
func (*TaskManager) OnTaskProgress ¶
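func (m *TaskManager) OnTaskProgress(ctx context.Context, runID string, progress int) error
OnTaskProgress is called when a runner reports progress. It is currently a no-op, since progress is not stored in the database, but it could be used for real-time updates.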
func (*TaskManager) OnTaskStarted ¶
func (m *TaskManager) OnTaskStarted(ctx context.Context, runID string) error
OnTaskStarted is called when a runner starts executing a task (assigned → running).
func (*TaskManager) ReExecute ¶
func (m *TaskManager) ReExecute(ctx context.Context, taskID string) error
ReExecute re-sends a running task to a runner after session resume. Unlike Execute, this reuses the existing task_run instead of creating a new one. This is used when a session resumes and needs to continue a task that was interrupted by suspend.
func (*TaskManager) SetWebhookIntegration ¶
func (m *TaskManager) SetWebhookIntegration(wi *WebhookIntegration)
SetWebhookIntegration sets the webhook integration for dispatching events.
func (*TaskManager) ShouldRetry ¶
func (m *TaskManager) ShouldRetry(ctx context.Context, taskID string) (bool, error)
ShouldRetry checks if a task should be retried.
type TaskManagerInterface ¶
type TaskManagerInterface interface {
Create(ctx context.Context, opts CreateTaskOptions) (*store.Task, error)
Get(ctx context.Context, taskID string) (*store.Task, error)
List(ctx context.Context, opts ListTasksOptions) (*store.ListResult[store.Task], error)
Cancel(ctx context.Context, taskID string) error
Execute(ctx context.Context, taskID string) error
ReExecute(ctx context.Context, taskID string) error
CreateRun(ctx context.Context, taskID string) (*store.TaskRun, error)
OnTaskAccepted(ctx context.Context, runID string) error
OnTaskStarted(ctx context.Context, runID string) error
OnTaskProgress(ctx context.Context, runID string, progress int) error
OnTaskCompleted(ctx context.Context, result *TaskCompletedResult) error
FailRun(ctx context.Context, runID, reason string) error
ShouldRetry(ctx context.Context, taskID string) (bool, error)
Retry(ctx context.Context, taskID string) (*store.TaskRun, error)
}
TaskManagerInterface defines the interface for task management. This is used for dependency injection in other components.
type TaskTimeoutEnforcer ¶
type TaskTimeoutEnforcer struct {
// contains filtered or unexported fields
}
TaskTimeoutEnforcer monitors running tasks and enforces timeouts.
func NewTaskTimeoutEnforcer ¶
func NewTaskTimeoutEnforcer( store store.Store, taskMgr *TaskManager, cmdSender CommandSender, logger *zap.Logger, opts ...TaskTimeoutEnforcerOption, ) *TaskTimeoutEnforcer
NewTaskTimeoutEnforcer creates a new TaskTimeoutEnforcer.
func (*TaskTimeoutEnforcer) Start ¶
func (e *TaskTimeoutEnforcer) Start(ctx context.Context)
Start begins the background timeout enforcement loop.
func (*TaskTimeoutEnforcer) Stop ¶
func (e *TaskTimeoutEnforcer) Stop()
Stop stops the timeout enforcer.
type TaskTimeoutEnforcerOption ¶
type TaskTimeoutEnforcerOption func(*TaskTimeoutEnforcer)
TaskTimeoutEnforcerOption is a functional option for TaskTimeoutEnforcer.
func WithTimeoutCheckInterval ¶
func WithTimeoutCheckInterval(d time.Duration) TaskTimeoutEnforcerOption
WithTimeoutCheckInterval sets the check interval for the timeout enforcer.
type UpdateScheduledTaskOptions ¶
type UpdateScheduledTaskOptions struct {
Name *string
Description *string
CronExpression *string
Timezone *string
PromptTemplate *string
TimeoutSeconds *int
MaxRetries *int
OnFailure *string
MaxConsecutiveFailures *int
Labels map[string]string
Annotations map[string]string
}
UpdateScheduledTaskOptions contains options for updating a scheduled task.
type UpdateWebhookInput ¶
type UpdateWebhookInput struct {
Name *string
URL *string
Events []string
IsActive *bool
MaxRetries *int
RetryDelaySeconds *int
TimeoutSeconds *int
Headers map[string]string
Labels map[string]string
Annotations map[string]string
}
UpdateWebhookInput contains input for updating a webhook.
type WebhookDispatcher ¶
type WebhookDispatcher interface {
Dispatch(ctx context.Context, eventType string, resource webhook.ResourceInfo, data any, tenantID *string) error
}
WebhookDispatcher is the interface for dispatching webhook events.
type WebhookIntegration ¶
type WebhookIntegration struct {
// contains filtered or unexported fields
}
WebhookIntegration provides methods for dispatching webhook events from managers.
func NewWebhookIntegration ¶
func NewWebhookIntegration(dispatcher WebhookDispatcher, logger *zap.Logger) *WebhookIntegration
NewWebhookIntegration creates a new WebhookIntegration.
func (*WebhookIntegration) DispatchPermissionEvent ¶
func (w *WebhookIntegration) DispatchPermissionEvent(ctx context.Context, eventType string, perm *store.PermissionRequest)
DispatchPermissionEvent dispatches a permission-related webhook event.
func (*WebhookIntegration) DispatchRunnerEvent ¶
func (w *WebhookIntegration) DispatchRunnerEvent(ctx context.Context, eventType string, runner *store.Runner, sessionID *string)
DispatchRunnerEvent dispatches a runner-related webhook event.
func (*WebhookIntegration) DispatchSessionEvent ¶
func (w *WebhookIntegration) DispatchSessionEvent(ctx context.Context, eventType string, session *store.Session)
DispatchSessionEvent dispatches a session-related webhook event.
func (*WebhookIntegration) DispatchTaskEvent ¶
func (w *WebhookIntegration) DispatchTaskEvent(ctx context.Context, eventType string, task *store.Task, run *store.TaskRun)
DispatchTaskEvent dispatches a task-related webhook event.
type WebhookManager ¶
type WebhookManager struct {
// contains filtered or unexported fields
}
WebhookManager handles webhook configuration and event dispatch.
func NewWebhookManager ¶
func NewWebhookManager( store store.Store, config webhook.Config, logger *zap.Logger, ) *WebhookManager
NewWebhookManager creates a new WebhookManager.
func (*WebhookManager) Create ¶
func (m *WebhookManager) Create(ctx context.Context, input *CreateWebhookInput) (*store.Webhook, string, error)
Create creates a new webhook and returns its secret (only shown once).
func (*WebhookManager) Delete ¶
func (m *WebhookManager) Delete(ctx context.Context, webhookID string) error
Delete deletes a webhook and cancels its pending events.
func (*WebhookManager) DeliverPendingEvents ¶
DeliverPendingEvents processes pending webhook events. This is called by the retry job.
func (*WebhookManager) Dispatch ¶
func (m *WebhookManager) Dispatch(ctx context.Context, eventType string, resource webhook.ResourceInfo, data any, tenantID *string) error
Dispatch sends an event to all matching webhooks.
func (*WebhookManager) GetByName ¶
func (m *WebhookManager) GetByName(ctx context.Context, name string, tenantID *string) (*store.Webhook, error)
GetByName retrieves a webhook by name.
func (*WebhookManager) GetEvent ¶
func (m *WebhookManager) GetEvent(ctx context.Context, eventID string) (*store.WebhookEvent, error)
GetEvent retrieves a webhook event by ID.
func (*WebhookManager) List ¶
func (m *WebhookManager) List(ctx context.Context, opts store.ListWebhooksOptions) (*store.ListResult[store.Webhook], error)
List retrieves webhooks with filters.
func (*WebhookManager) ListEvents ¶
func (m *WebhookManager) ListEvents(ctx context.Context, opts store.ListWebhookEventsOptions) (*store.ListResult[store.WebhookEvent], error)
ListEvents retrieves webhook events with filters.
func (*WebhookManager) RetryEvent ¶
func (m *WebhookManager) RetryEvent(ctx context.Context, eventID string) error
RetryEvent manually retries a failed webhook event.
func (*WebhookManager) RotateSecret ¶
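func (m *WebhookManager) RotateSecret(ctx context.Context, webhookID string) (string, error)
RotateSecret generates a new secret for a webhook. It returns the new secret, which is only shown once.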
func (*WebhookManager) Stop ¶
func (m *WebhookManager) Stop()
Stop gracefully stops the webhook manager.
func (*WebhookManager) Update ¶
func (m *WebhookManager) Update(ctx context.Context, webhookID string, input *UpdateWebhookInput) error
Update updates a webhook.
type WebhookManagerInterface ¶
// WebhookManagerInterface defines the interface for webhook management.
// It covers three groups of operations: webhook CRUD, event dispatch, and
// inspection/retry of webhook events. The per-method notes below restate the
// documentation of the same-named WebhookManager methods where available.
type WebhookManagerInterface interface {
// Webhook CRUD
// Create creates a new webhook; the returned string is its secret, which is
// only shown once.
Create(ctx context.Context, input *CreateWebhookInput) (*store.Webhook, string, error)
// Get returns the webhook identified by webhookID.
Get(ctx context.Context, webhookID string) (*store.Webhook, error)
// GetByName retrieves a webhook by name.
// NOTE(review): tenantID is a pointer, presumably so that nil means "no
// tenant scope" — confirm against the implementation.
GetByName(ctx context.Context, name string, tenantID *string) (*store.Webhook, error)
// List retrieves webhooks with filters.
List(ctx context.Context, opts store.ListWebhooksOptions) (*store.ListResult[store.Webhook], error)
// Update updates a webhook.
Update(ctx context.Context, webhookID string, input *UpdateWebhookInput) error
// Delete deletes a webhook and cancels its pending events.
Delete(ctx context.Context, webhookID string) error
// RotateSecret generates a new secret for a webhook and returns it (only
// shown once).
RotateSecret(ctx context.Context, webhookID string) (string, error)
// Event dispatch
// Dispatch sends an event to all matching webhooks.
Dispatch(ctx context.Context, eventType string, resource webhook.ResourceInfo, data any, tenantID *string) error
// Webhook events
// GetEvent retrieves a webhook event by ID.
GetEvent(ctx context.Context, eventID string) (*store.WebhookEvent, error)
// ListEvents retrieves webhook events with filters.
ListEvents(ctx context.Context, opts store.ListWebhookEventsOptions) (*store.ListResult[store.WebhookEvent], error)
// RetryEvent manually retries a failed webhook event.
RetryEvent(ctx context.Context, eventID string) error
}
WebhookManagerInterface defines the interface for webhook management.
type WorkspaceManager ¶
// WorkspaceManager handles workspace lifecycle and storage.
// All of its fields are unexported; construct one with NewWorkspaceManager.
type WorkspaceManager struct {
// contains filtered or unexported fields
}
WorkspaceManager handles workspace lifecycle and storage.
func NewWorkspaceManager ¶
func NewWorkspaceManager(store store.Store, cfg config.WorkspaceStorageConfig, logger *zap.Logger) *WorkspaceManager
NewWorkspaceManager creates a new WorkspaceManager.
func (*WorkspaceManager) CleanupHostDirectory ¶
func (m *WorkspaceManager) CleanupHostDirectory(ctx context.Context, workspaceID string) error
CleanupHostDirectory removes the host directory for a workspace.
func (*WorkspaceManager) Create ¶
func (m *WorkspaceManager) Create(ctx context.Context, opts CreateWorkspaceOptions) (*store.Workspace, error)
Create creates a new workspace with a corresponding host directory.
func (*WorkspaceManager) Delete ¶
func (m *WorkspaceManager) Delete(ctx context.Context, workspaceID string) error
Delete soft-deletes a workspace. The actual host directory cleanup is handled separately based on configuration.
func (*WorkspaceManager) EnsureHostDirectory ¶
func (m *WorkspaceManager) EnsureHostDirectory(ctx context.Context, workspaceID string) (string, error)
EnsureHostDirectory creates the host directory for a workspace if it doesn't exist. Returns the host path that was created or already exists.
func (*WorkspaceManager) GetBaseDir ¶
func (m *WorkspaceManager) GetBaseDir() string
GetBaseDir returns the configured base directory for workspaces.
func (*WorkspaceManager) GetHostPath ¶
func (m *WorkspaceManager) GetHostPath(ctx context.Context, workspaceID string) (string, error)
GetHostPath returns the host filesystem path for a workspace. This path is used for mounting the workspace into containers.
func (*WorkspaceManager) IsInUse ¶
func (m *WorkspaceManager) IsInUse(ctx context.Context, workspaceID string) (bool, error)
IsInUse checks if a workspace is currently being used by any active session.
func (*WorkspaceManager) List ¶
func (m *WorkspaceManager) List(ctx context.Context, opts ListWorkspacesOptions) (*store.ListResult[store.Workspace], error)
List returns workspaces matching the filter options.
type WorkspaceManagerInterface ¶
// WorkspaceManagerInterface defines the interface for workspace management.
// The per-method notes below restate the documentation of the same-named
// WorkspaceManager methods where that documentation is available.
type WorkspaceManagerInterface interface {
// Create creates a new workspace with a corresponding host directory.
Create(ctx context.Context, opts CreateWorkspaceOptions) (*store.Workspace, error)
// Get returns the workspace identified by workspaceID.
Get(ctx context.Context, workspaceID string) (*store.Workspace, error)
// List returns workspaces matching the filter options.
List(ctx context.Context, opts ListWorkspacesOptions) (*store.ListResult[store.Workspace], error)
// Update applies updates to the workspace identified by workspaceID.
// NOTE(review): the returned *store.Workspace is presumably the updated
// record — confirm against the implementation.
Update(ctx context.Context, workspaceID string, updates store.WorkspaceUpdates) (*store.Workspace, error)
// Delete soft-deletes a workspace. Host directory cleanup is handled
// separately based on configuration.
Delete(ctx context.Context, workspaceID string) error
// GetHostPath returns the host filesystem path for a workspace; this path is
// used for mounting the workspace into containers.
GetHostPath(ctx context.Context, workspaceID string) (string, error)
// EnsureHostDirectory creates the host directory for a workspace if it does
// not exist and returns the host path that was created or already exists.
EnsureHostDirectory(ctx context.Context, workspaceID string) (string, error)
// CleanupHostDirectory removes the host directory for a workspace.
CleanupHostDirectory(ctx context.Context, workspaceID string) error
// IsInUse checks if a workspace is currently being used by any active session.
IsInUse(ctx context.Context, workspaceID string) (bool, error)
}
WorkspaceManagerInterface defines the interface for workspace management.
type WorkspaceStore ¶
// WorkspaceStore defines the minimal store interface needed by
// WorkspaceManager: workspace persistence plus a session listing.
type WorkspaceStore interface {
// CreateWorkspace persists the given workspace record.
CreateWorkspace(ctx context.Context, ws *store.Workspace) error
// GetWorkspace returns the workspace with the given id.
GetWorkspace(ctx context.Context, id string) (*store.Workspace, error)
// ListWorkspaces returns workspaces selected by opts.
ListWorkspaces(ctx context.Context, opts store.ListWorkspacesOptions) (*store.ListResult[store.Workspace], error)
// UpdateWorkspace applies updates to the workspace with the given id. It
// reports only an error and does not return the updated record.
UpdateWorkspace(ctx context.Context, id string, updates store.WorkspaceUpdates) error
// DeleteWorkspace deletes the workspace with the given id.
// NOTE(review): WorkspaceManager.Delete is documented as a soft delete;
// whether the soft-delete semantics live here or in the manager is not
// visible from this page — confirm.
DeleteWorkspace(ctx context.Context, id string) error
// ListSessions returns sessions selected by opts.
// NOTE(review): presumably needed so WorkspaceManager.IsInUse can look for
// active sessions that reference a workspace — confirm.
ListSessions(ctx context.Context, opts store.ListSessionsOptions) (*store.ListResult[store.Session], error)
}
WorkspaceStore defines the minimal store interface needed by WorkspaceManager.
Source Files
¶
- context_snapshot.go
- interfaces.go
- log_subscriber_manager.go
- permission_manager.go
- permission_timeout_enforcer.go
- runner_manager.go
- runner_registry.go
- scheduled_session_activator.go
- scheduled_task_executor.go
- scheduled_task_service.go
- session_manager.go
- stream_manager.go
- task_manager.go
- webhook_integration.go
- webhook_manager.go
- workspace_manager.go