Documentation
¶
Overview ¶
Package executor provides convergence goal execution.
Package executor provides workflow and goal execution.
Utility functions for the executor.
Session event logging functions for the executor.
Skill handling functions for the executor.
Sub-agent spawning and execution functions for the executor.
Tool execution functions for the executor.
Tracing instrumentation for the executor.
Package executor provides workflow execution.
Package executor provides workflow execution with XML-structured context.
Index ¶
- Constants
- func BuildTaskContext(role, parentGoal, task string) string
- func BuildTaskContextWithCorrection(role, parentGoal, task, correction string) string
- func BuildTaskContextWithPriorGoals(role, parentGoal, task string, priorGoals []GoalOutput) string
- func BuildWorkspaceContext(workspace string) string
- func FormatInterruptsBlock(goalDescription string, interrupts []InterruptMessage) string
- type AgentContribution
- type AgentIdentity
- type AgentState
- type ConvergenceIteration
- type ConvergenceResult
- type DiscussMessage
- type Executor
- func (e *Executor) AddUntrustedContent(ctx context.Context, content, source string)
- func (e *Executor) AddUntrustedContentWithTaint(ctx context.Context, content, source string, taintedBy []string)
- func (e *Executor) ClearDiscussPublisher()
- func (e *Executor) ClearEventPublisher()
- func (e *Executor) GetConvergenceFailures() map[string]int
- func (e *Executor) InterruptBuffer() *InterruptBuffer
- func (e *Executor) LogBashSecurity(command, step string, allowed bool, reason string, durationMs int64, ...)
- func (e *Executor) PreFlight() error
- func (e *Executor) Registry() *tools.Registry
- func (e *Executor) Run(ctx context.Context, inputs map[string]string) (*Result, error)
- func (e *Executor) SetDebug(debug bool)
- func (e *Executor) SetDiscussPublisher(fn func(goalName, content string))
- func (e *Executor) SetEventPublisher(fn func(event session.Event))
- func (e *Executor) SetInterruptBuffer(buf *InterruptBuffer)
- func (e *Executor) SetMCPManager(m *mcp.Manager)
- func (e *Executor) SetMetricsCollector(mc MetricsCollector)
- func (e *Executor) SetObservationExtraction(extractor ObservationExtractor, store ObservationStore)
- func (e *Executor) SetPersistentSession(persistent bool)
- func (e *Executor) SetSecurityResearchScope(scope string)
- func (e *Executor) SetSecurityVerifier(v *security.Verifier)
- func (e *Executor) SetSession(sess *session.Session, mgr session.SessionManager)
- func (e *Executor) SetSkills(refs []skills.SkillRef)
- func (e *Executor) SetSupervision(store *checkpoint.Store, supervisorProvider llm.Provider, humanAvailable bool, ...)
- func (e *Executor) SetTimeouts(mcp, webSearch, webFetch int)
- func (e *Executor) SetWorkspaceContext(ctx string)
- type GoalOutput
- type GoalResult
- type InterruptBuffer
- type InterruptMessage
- type MetricsCollector
- type ObservationExtractor
- type ObservationStore
- type Result
- type Status
- type SwarmContext
- func (sc *SwarmContext) AddDiscussMessage(taskID string, msg DiscussMessage)
- func (sc *SwarmContext) FormatForLLM(relevantTaskID string) string
- func (sc *SwarmContext) GetAgentStates() []AgentState
- func (sc *SwarmContext) GetCompleted() map[string]DiscussMessage
- func (sc *SwarmContext) GetDiscussion(taskID string) []DiscussMessage
- func (sc *SwarmContext) GetDiscussions() []string
- func (sc *SwarmContext) UpdateAgent(agentID, status, capability, currentTask string, timestamp time.Time)
- type TaskDiscussion
- type XMLContextBuilder
- func (b *XMLContextBuilder) AddConvergenceIteration(n int, output string)
- func (b *XMLContextBuilder) AddDiscussContribution(id, capability string, round int, output string)
- func (b *XMLContextBuilder) AddPriorGoal(id, output string)
- func (b *XMLContextBuilder) Build() string
- func (b *XMLContextBuilder) SetConvergenceMode()
- func (b *XMLContextBuilder) SetCorrection(correction string)
- func (b *XMLContextBuilder) SetCurrentGoal(id, description string)
- func (b *XMLContextBuilder) SetDiscussTaskID(taskID string)
Constants ¶
const InformationProcessingGuidance = `` /* 642-byte string literal not displayed */
InformationProcessingGuidance is prepended to all system prompts. Based on Shannon's information theory: the LLM should weight surprising/contradictory signals more heavily than routine/expected ones across all context it receives.
const OrchestratorSystemPromptPrefix = `` /* 901-byte string literal not displayed */
OrchestratorSystemPromptPrefix is the prefix to inject when spawn_agent is available.
const ScratchpadGuidancePrefix = `` /* 839-byte string literal not displayed */
ScratchpadGuidancePrefix is injected when scratchpad tools are available.
const SemanticMemoryGuidancePrefix = `` /* 831-byte string literal not displayed */
SemanticMemoryGuidancePrefix is injected when semantic memory tools are available.
const TersenessGuidance = `` /* 939-byte string literal not displayed */
TersenessGuidance is prepended to execution prompts to reduce verbosity.
Variables ¶
This section is empty.
Functions ¶
func BuildTaskContext ¶
BuildTaskContext builds XML context for a dynamic sub-agent task. All data content is escaped to prevent injection attacks.
func BuildTaskContextWithCorrection ¶
BuildTaskContextWithCorrection builds XML context for a sub-agent task with supervisor correction. All data content is escaped to prevent injection attacks.
func BuildTaskContextWithPriorGoals ¶
func BuildTaskContextWithPriorGoals(role, parentGoal, task string, priorGoals []GoalOutput) string
BuildTaskContextWithPriorGoals builds XML context for a sub-agent task including prior goal outputs. This ensures sub-agents have access to the workflow context, not just the raw task. All data content is escaped to prevent injection attacks.
func BuildWorkspaceContext ¶ added in v0.5.1
BuildWorkspaceContext scans the workspace directory and produces a concise context string describing the project layout, type, and key files. This is injected into the system prompt so agents don't waste cycles discovering the project structure.
func FormatInterruptsBlock ¶ added in v0.5.1
func FormatInterruptsBlock(goalDescription string, interrupts []InterruptMessage) string
FormatInterruptsBlock formats buffered interrupt messages into an XML block for injection into the LLM's next turn. The goalDescription provides the <context> element — just the current goal, nothing more.
Types ¶
type AgentContribution ¶
type AgentContribution struct {
ID string // Agent identifier
Capability string // Agent's announced capability
Round int // Which round this was (0 = first contribution from another agent)
Output string // The agent's output
}
AgentContribution represents an agent's output in a discuss round.
type AgentIdentity ¶
type AgentIdentity struct {
Name string // Agent name (e.g., "researcher", "writer", or workflow name for main)
Role string // Agent role (for dynamic sub-agents, same as name; for static agents, defined role)
}
AgentIdentity holds agent name and role for logging/attribution.
type AgentState ¶ added in v0.5.1
type AgentState struct {
AgentID string
Status string // "replay", "monitoring", "deliberating", "executing"
Capability string
CurrentTask string
LastSeen time.Time
}
AgentState represents an agent's current state as observed from heartbeats.
type ConvergenceIteration ¶
type ConvergenceIteration struct {
N int // Iteration number (1-indexed)
Output string // The output from this iteration
}
ConvergenceIteration represents a completed convergence iteration.
type ConvergenceResult ¶
type ConvergenceResult struct {
Converged bool // true if the goal converged before hitting the limit
Iterations int // number of iterations executed
Output string // final output (last substantive iteration)
}
ConvergenceResult tracks the outcome of a convergence goal.
type DiscussMessage ¶ added in v0.5.1
type DiscussMessage struct {
From string
Timestamp time.Time
Content string
Signal string // "CLAIM", "NEED_INFO", "DONE", or empty
}
DiscussMessage represents a single message in a task's discussion log.
type Executor ¶
type Executor struct {
// Callbacks
OnGoalStart func(name string)
OnGoalComplete func(name string, output string)
OnToolCall func(name string, args map[string]interface{}, result interface{}, agentRole string)
OnToolError func(name string, args map[string]interface{}, err error, agentRole string)
OnLLMError func(err error)
OnSkillLoaded func(name string)
OnMCPToolCall func(server, tool string, args map[string]interface{}, result interface{})
OnSubAgentStart func(name string, input map[string]string)
OnSubAgentComplete func(name string, output string)
OnSupervisionEvent func(stepID string, phase string, data interface{})
// contains filtered or unexported fields
}
Executor executes workflows.
func NewExecutor ¶
func NewExecutor(wf *agentfile.Workflow, provider llm.Provider, registry *tools.Registry, pol *policy.Policy) *Executor
NewExecutor creates a new executor.
func NewExecutorWithFactory ¶
func NewExecutorWithFactory(wf *agentfile.Workflow, factory llm.ProviderFactory, registry *tools.Registry, pol *policy.Policy) *Executor
NewExecutorWithFactory creates an executor with a provider factory for profile support.
func (*Executor) AddUntrustedContent ¶
AddUntrustedContent registers untrusted content with the security verifier.
func (*Executor) AddUntrustedContentWithTaint ¶
func (e *Executor) AddUntrustedContentWithTaint(ctx context.Context, content, source string, taintedBy []string)
AddUntrustedContentWithTaint registers untrusted content with explicit taint lineage.
func (*Executor) ClearDiscussPublisher ¶ added in v0.5.1
func (e *Executor) ClearDiscussPublisher()
ClearDiscussPublisher removes the discuss publisher (e.g., after task completes).
func (*Executor) ClearEventPublisher ¶ added in v0.5.1
func (e *Executor) ClearEventPublisher()
ClearEventPublisher removes the event publisher.
func (*Executor) GetConvergenceFailures ¶
GetConvergenceFailures returns goals that failed to converge.
func (*Executor) InterruptBuffer ¶ added in v0.5.1
func (e *Executor) InterruptBuffer() *InterruptBuffer
InterruptBuffer returns the current interrupt buffer (may be nil).
func (*Executor) LogBashSecurity ¶
func (e *Executor) LogBashSecurity(command, step string, allowed bool, reason string, durationMs int64, inputTokens, outputTokens int)
LogBashSecurity logs a bash security decision to the session. This is called by the bash security checker callback.
func (*Executor) PreFlight ¶
PreFlight checks if the workflow can execute successfully. Returns an error if SUPERVISED HUMAN steps exist but no human connection is available.
func (*Executor) SetDebug ¶
SetDebug enables verbose logging of prompts, responses, and tool outputs. When disabled (default), content is redacted to prevent PII leakage in production.
func (*Executor) SetDiscussPublisher ¶ added in v0.5.1
SetDiscussPublisher attaches a callback that publishes non-tool-call LLM responses to the swarm discuss channel. In non-swarm mode, leave unset (nil) — the publish call short-circuits with zero overhead. The caller typically binds the task ID in a closure.
func (*Executor) SetEventPublisher ¶ added in v0.5.1
SetEventPublisher attaches a callback that fires for every session event. Used by swarm mode to publish structured events to NATS in real time.
func (*Executor) SetInterruptBuffer ¶ added in v0.5.1
func (e *Executor) SetInterruptBuffer(buf *InterruptBuffer)
SetInterruptBuffer attaches an interrupt buffer for swarm collaboration. When set, the executor drains the buffer between LLM turns and injects interrupt messages into the context. A nil buffer disables interrupts.
func (*Executor) SetMCPManager ¶
SetMCPManager sets the MCP manager for external tool access.
func (*Executor) SetMetricsCollector ¶
func (e *Executor) SetMetricsCollector(mc MetricsCollector)
SetMetricsCollector sets the metrics collector for heartbeat reporting.
func (*Executor) SetObservationExtraction ¶
func (e *Executor) SetObservationExtraction(extractor ObservationExtractor, store ObservationStore)
SetObservationExtraction enables observation extraction and storage for semantic memory.
func (*Executor) SetPersistentSession ¶ added in v0.5.1
SetPersistentSession marks the session as long-lived (serve mode). When set, Run() flushes but does not close the session — the caller is responsible for closing it on shutdown.
func (*Executor) SetSecurityResearchScope ¶
SetSecurityResearchScope sets the security research scope for defensive framing. When set, system prompts will include context indicating authorized security research.
func (*Executor) SetSecurityVerifier ¶
SetSecurityVerifier sets the security verifier for tool call verification.
func (*Executor) SetSession ¶
func (e *Executor) SetSession(sess *session.Session, mgr session.SessionManager)
SetSession sets the session for logging events and starts the batched writer.
func (*Executor) SetSupervision ¶
func (e *Executor) SetSupervision(store *checkpoint.Store, supervisorProvider llm.Provider, humanAvailable bool, humanInputChan chan string)
SetSupervision configures supervision for the executor.
func (*Executor) SetTimeouts ¶
SetTimeouts configures timeouts for network operations. Values are in seconds. Zero means use default (60s for MCP/WebFetch, 30s for WebSearch).
func (*Executor) SetWorkspaceContext ¶ added in v0.5.1
SetWorkspaceContext sets pre-computed workspace context that gets injected into the system prompt, so the agent knows the project layout without discovery.
type GoalOutput ¶
type GoalOutput struct {
ID string // Goal identifier
Output string // The LLM's response for this goal
}
GoalOutput represents a completed goal's output for context building.
type GoalResult ¶
GoalResult contains the result of executing a goal.
type InterruptBuffer ¶ added in v0.5.1
type InterruptBuffer struct {
// contains filtered or unexported fields
}
InterruptBuffer is a thread-safe FIFO queue that collects discuss.* messages arriving during execution. The NATS subscriber writes to it; the executor drains it between LLM turns.
A nil buffer is valid and indicates non-swarm mode — all operations short-circuit immediately with zero overhead.
func NewInterruptBuffer ¶ added in v0.5.1
func NewInterruptBuffer() *InterruptBuffer
NewInterruptBuffer creates a new interrupt buffer.
func (*InterruptBuffer) Drain ¶ added in v0.5.1
func (b *InterruptBuffer) Drain() []InterruptMessage
Drain removes and returns all buffered messages. Returns nil if empty. Safe to call from any goroutine.
func (*InterruptBuffer) Len ¶ added in v0.5.1
func (b *InterruptBuffer) Len() int
Len returns the current buffer size.
func (*InterruptBuffer) Push ¶ added in v0.5.1
func (b *InterruptBuffer) Push(msg InterruptMessage)
Push adds a message to the buffer. Safe to call from any goroutine.
type InterruptMessage ¶ added in v0.5.1
type InterruptMessage struct {
From string // Agent name or "operator"
Timestamp time.Time // When the message was published
Content string // Raw message content
TaskID string // Task ID from the NATS subject
}
InterruptMessage represents a message received from discuss.* during execution.
type MetricsCollector ¶
type MetricsCollector interface {
RecordLLMCall(inputTokens, outputTokens, cacheCreation, cacheRead int, latencyMs int64)
RecordSupervision(approved bool)
SetSubagents(count int)
}
MetricsCollector receives LLM and supervision metrics for heartbeat reporting.
type ObservationExtractor ¶
type ObservationExtractor interface {
Extract(ctx context.Context, stepName, stepType, output string) (interface{}, error)
}
ObservationExtractor extracts observations from step outputs.
type ObservationStore ¶
type ObservationStore interface {
StoreObservation(ctx context.Context, obs interface{}) error
QueryRelevantObservations(ctx context.Context, query string, limit int) ([]interface{}, error)
}
ObservationStore stores and retrieves observations.
type Result ¶
type Result struct {
Status Status
Outputs map[string]string
Iterations map[string]int
Error string
}
Result represents the execution result.
type SwarmContext ¶ added in v0.5.1
type SwarmContext struct {
// contains filtered or unexported fields
}
SwarmContext maintains the agent's personal, ephemeral view of the swarm. It is passively updated from NATS message handlers and read during deliberation and interrupt processing.
Thread-safe — written by NATS handler goroutines, read by the executor.
func NewSwarmContext ¶ added in v0.5.1
func NewSwarmContext() *SwarmContext
NewSwarmContext creates an empty swarm context.
func (*SwarmContext) AddDiscussMessage ¶ added in v0.5.1
func (sc *SwarmContext) AddDiscussMessage(taskID string, msg DiscussMessage)
AddDiscussMessage records a discuss message for a task.
func (*SwarmContext) FormatForLLM ¶ added in v0.5.1
func (sc *SwarmContext) FormatForLLM(relevantTaskID string) string
FormatForLLM formats the swarm context as a compact text block for injection into LLM context during deliberation or interrupt processing.
func (*SwarmContext) GetAgentStates ¶ added in v0.5.1
func (sc *SwarmContext) GetAgentStates() []AgentState
GetAgentStates returns a snapshot of all agent states.
func (*SwarmContext) GetCompleted ¶ added in v0.5.1
func (sc *SwarmContext) GetCompleted() map[string]DiscussMessage
GetCompleted returns completed task IDs and their DONE messages.
func (*SwarmContext) GetDiscussion ¶ added in v0.5.1
func (sc *SwarmContext) GetDiscussion(taskID string) []DiscussMessage
GetDiscussion returns the discussion log for a task.
func (*SwarmContext) GetDiscussions ¶ added in v0.5.1
func (sc *SwarmContext) GetDiscussions() []string
GetDiscussions returns all task IDs that have active discussions.
func (*SwarmContext) UpdateAgent ¶ added in v0.5.1
func (sc *SwarmContext) UpdateAgent(agentID, status, capability, currentTask string, timestamp time.Time)
UpdateAgent updates an agent's state from a heartbeat.
type TaskDiscussion ¶ added in v0.5.1
type TaskDiscussion struct {
TaskID string
Messages []DiscussMessage
}
TaskDiscussion holds the discussion history for a single task.
type XMLContextBuilder ¶
type XMLContextBuilder struct {
// contains filtered or unexported fields
}
XMLContextBuilder builds XML-structured prompts for LLM communication.
func NewXMLContextBuilder ¶
func NewXMLContextBuilder(workflowName string) *XMLContextBuilder
NewXMLContextBuilder creates a new context builder for a workflow.
func (*XMLContextBuilder) AddConvergenceIteration ¶
func (b *XMLContextBuilder) AddConvergenceIteration(n int, output string)
AddConvergenceIteration adds a completed convergence iteration to the context.
func (*XMLContextBuilder) AddDiscussContribution ¶
func (b *XMLContextBuilder) AddDiscussContribution(id, capability string, round int, output string)
AddDiscussContribution adds an agent's output from a discuss round.
func (*XMLContextBuilder) AddPriorGoal ¶
func (b *XMLContextBuilder) AddPriorGoal(id, output string)
AddPriorGoal adds a completed goal's output to the context.
func (*XMLContextBuilder) Build ¶
func (b *XMLContextBuilder) Build() string
Build generates the XML-structured prompt. All data content is escaped to prevent injection attacks.
func (*XMLContextBuilder) SetConvergenceMode ¶
func (b *XMLContextBuilder) SetConvergenceMode()
SetConvergenceMode enables convergence mode for the context builder.
func (*XMLContextBuilder) SetCorrection ¶
func (b *XMLContextBuilder) SetCorrection(correction string)
SetCorrection sets the supervisor correction for the current goal.
func (*XMLContextBuilder) SetCurrentGoal ¶
func (b *XMLContextBuilder) SetCurrentGoal(id, description string)
SetCurrentGoal sets the current goal to be executed.
func (*XMLContextBuilder) SetDiscussTaskID ¶
func (b *XMLContextBuilder) SetDiscussTaskID(taskID string)
SetDiscussTaskID sets the task ID for discuss context.