Documentation ¶
Overview ¶
Package transport provides pluggable transports for JSON-RPC 2.0 communication.
The transport package implements bidirectional message passing over various backends while maintaining JSON-RPC 2.0 as the protocol. All transports implement the Transport interface with channel-based APIs for Go-idiomatic concurrent use.
Available Transports ¶
- StdioTransport: Communication over stdin/stdout (for CLI tools)
- WebSocketTransport: Bidirectional over WebSocket (for real-time UIs)
- SSETransport: Server-Sent Events + HTTP POST (for web clients)
Usage ¶
All transports follow the same pattern:
t := transport.NewStdioTransport(os.Stdin, os.Stdout, transport.DefaultConfig())
go t.Run(ctx)
for msg := range t.Recv() {
	if msg.Request != nil {
		// Handle request, send response
		t.Send(&transport.OutboundMessage{
			Response: &transport.Response{
				JSONRPC: "2.0",
				ID:      msg.Request.ID,
				Result:  result,
			},
		})
	}
}
Design Decisions ¶
- Channel-based API: Go-idiomatic for concurrent use
- JSON-RPC 2.0: Protocol for all transports
- Reconnection: Handled by agent implementations, not agentkit
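Because reconnection is left to the agent implementation, a caller would typically wrap transport setup and Run in its own retry loop. The following is a minimal sketch of that idea and not part of the package: runWithReconnect, flakyRun, demo, and the fixed backoff are all hypothetical, and the run callback stands in for "establish a connection, build a transport, call Run".

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithReconnect calls run until it returns nil, the context is cancelled,
// or maxAttempts calls have failed. Hypothetical helper, not a package API.
func runWithReconnect(ctx context.Context, maxAttempts int, backoff time.Duration, run func(context.Context) error) error {
	for attempt := 1; ; attempt++ {
		err := run(ctx)
		if err == nil {
			return nil // graceful shutdown, nothing to retry
		}
		if attempt >= maxAttempts {
			return fmt.Errorf("giving up after %d attempts: %w", attempt, err)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
			// Fixed delay before the next attempt.
		}
	}
}

// flakyRun returns a run function that fails its first `failures` calls and
// counts every invocation in *calls.
func flakyRun(failures int, calls *int) func(context.Context) error {
	return func(context.Context) error {
		*calls++
		if *calls <= failures {
			return errors.New("connection lost")
		}
		return nil
	}
}

// demo runs the retry loop against a flaky run function and reports how many
// times run was called and the final error.
func demo(failures, maxAttempts int) (int, error) {
	calls := 0
	err := runWithReconnect(context.Background(), maxAttempts, time.Millisecond, flakyRun(failures, &calls))
	return calls, err
}

func main() {
	calls, err := demo(2, 5)
	fmt.Println(calls, err)
}
```

A production loop would more likely use exponential backoff with jitter; the fixed delay keeps the sketch short.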
Thread Safety ¶
All transport methods are safe for concurrent use. The Recv() channel is closed when the transport shuts down.
Index ¶
- Constants
- Variables
- func MarshalOutbound(msg *OutboundMessage) ([]byte, error)
- func NewWebSocketUpgrader() *websocket.Upgrader
- type Config
- type Error
- type GoalCompleteParams
- type GoalStartedParams
- type Handler
- type HandlerFunc
- type InboundMessage
- type Notification
- type OutboundMessage
- type Request
- type Response
- type RunParams
- type RunResult
- type SSEClient
- type SSEConfig
- type SSETransport
- func (t *SSETransport) Close() error
- func (t *SSETransport) HandlePost(w http.ResponseWriter, r *http.Request)
- func (t *SSETransport) HandleSSE(w http.ResponseWriter, r *http.Request)
- func (t *SSETransport) Recv() <-chan *InboundMessage
- func (t *SSETransport) Run(ctx context.Context) error
- func (t *SSETransport) Send(msg *OutboundMessage) error
- type Server
- type StdioTransport
- type ToolCallParams
- type Transport
- type WebSocketConfig
- type WebSocketTransport
Constants ¶
const (
	ParseError     = -32700
	InvalidRequest = -32600
	MethodNotFound = -32601
	InvalidParams  = -32602
	InternalError  = -32603
)
Standard JSON-RPC 2.0 error codes.
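As an illustration of how these codes end up on the wire, the sketch below builds an error reply and marshals it. The types are local stand-ins that mirror the documented Error and Response structs (the package import path is not given here), and errorResponse and encode are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins mirroring the documented constants and types.
const (
	MethodNotFound = -32601
	InvalidParams  = -32602
)

type Error struct {
	Code    int         `json:"code"`
	Message string      `json:"message"`
	Data    interface{} `json:"data,omitempty"`
}

type Response struct {
	JSONRPC string      `json:"jsonrpc"`
	ID      interface{} `json:"id,omitempty"`
	Result  interface{} `json:"result,omitempty"`
	Error   *Error      `json:"error,omitempty"`
}

// errorResponse builds an error reply for the request with the given ID.
// Hypothetical helper, not a package API.
func errorResponse(id interface{}, code int, message string) *Response {
	return &Response{
		JSONRPC: "2.0",
		ID:      id,
		Error:   &Error{Code: code, Message: message},
	}
}

// encode marshals a response and returns the JSON text, or "" on failure.
func encode(r *Response) string {
	b, err := json.Marshal(r)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(encode(errorResponse(1, MethodNotFound, "method not found")))
	// prints {"jsonrpc":"2.0","id":1,"error":{"code":-32601,"message":"method not found"}}
}
```

Note that with the documented omitempty tag on ID, a nil ID is left out of the encoded reply rather than written as null.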
const (
	EventGoalStarted   = "goal_started"
	EventGoalComplete  = "goal_complete"
	EventToolCall      = "tool_call"
	EventLoopIteration = "loop_iteration"
	EventError         = "error"
)
Event types for notifications
Variables ¶
var (
	ErrClosed      = errors.New("transport closed")
	ErrSendTimeout = errors.New("send timeout")
)
Common errors.
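Sentinel errors like these are usually matched with errors.Is so that wrapped errors are still recognized. The sketch below uses local copies of the two documented variables; describeSendError, wrap, and the returned action strings are hypothetical and only illustrate one way a caller might react.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented sentinel errors.
var (
	ErrClosed      = errors.New("transport closed")
	ErrSendTimeout = errors.New("send timeout")
)

// describeSendError maps the error returned by a Send call to a follow-up
// action. The action names are illustrative only.
func describeSendError(err error) string {
	switch {
	case err == nil:
		return "queued"
	case errors.Is(err, ErrClosed):
		return "stop sending"
	case errors.Is(err, ErrSendTimeout):
		return "retry later"
	default:
		return "unexpected"
	}
}

// wrap adds caller context while keeping the sentinel matchable via %w.
func wrap(err error) error {
	return fmt.Errorf("send response: %w", err)
}

func main() {
	fmt.Println(describeSendError(nil))
	fmt.Println(describeSendError(wrap(ErrClosed)))
	fmt.Println(describeSendError(wrap(ErrSendTimeout)))
}
```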
Functions ¶
func MarshalOutbound ¶
func MarshalOutbound(msg *OutboundMessage) ([]byte, error)
MarshalOutbound serializes an OutboundMessage to JSON.
func NewWebSocketUpgrader ¶
func NewWebSocketUpgrader() *websocket.Upgrader
NewWebSocketUpgrader creates an upgrader for accepting WebSocket connections.
Types ¶
type Config ¶
type Config struct {
	// RecvBufferSize is the size of the receive channel buffer.
	// Default: 100
	RecvBufferSize int
	// SendBufferSize is the size of the internal send buffer.
	// Default: 100
	SendBufferSize int
}
Config holds common transport configuration.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns configuration with sensible defaults.
type Error ¶
type Error struct {
	Code    int         `json:"code"`
	Message string      `json:"message"`
	Data    interface{} `json:"data,omitempty"`
}
Error represents a JSON-RPC 2.0 error.
type GoalCompleteParams ¶
type GoalCompleteParams struct {
	SessionID string `json:"session_id"`
	Goal      string `json:"goal"`
	Output    string `json:"output"`
}
GoalCompleteParams are the params for the goal_complete event.
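To show how an event type and its params struct might combine, the sketch below encodes a goal_complete notification. It assumes the event constant is carried in the Notification's Method field, which the documentation implies but does not state. The types are local stand-ins mirroring the documented ones, and goalCompleteJSON is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins mirroring the documented constant and types.
const EventGoalComplete = "goal_complete"

type Notification struct {
	JSONRPC string      `json:"jsonrpc"`
	Method  string      `json:"method"`
	Params  interface{} `json:"params,omitempty"`
}

type GoalCompleteParams struct {
	SessionID string `json:"session_id"`
	Goal      string `json:"goal"`
	Output    string `json:"output"`
}

// goalCompleteJSON encodes a goal_complete notification.
// Assumption: the event type is used as the JSON-RPC method name.
func goalCompleteJSON(sessionID, goal, output string) (string, error) {
	n := Notification{
		JSONRPC: "2.0",
		Method:  EventGoalComplete,
		Params: GoalCompleteParams{
			SessionID: sessionID,
			Goal:      goal,
			Output:    output,
		},
	}
	b, err := json.Marshal(n)
	return string(b), err
}

func main() {
	s, err := goalCompleteJSON("s1", "summarize", "done")
	fmt.Println(s, err)
}
```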
type GoalStartedParams ¶
GoalStartedParams are the params for the goal_started event.
type Handler ¶
type Handler interface {
	Handle(ctx context.Context, method string, params json.RawMessage) (interface{}, error)
}
Handler handles JSON-RPC requests.
type HandlerFunc ¶
type HandlerFunc func(ctx context.Context, method string, params json.RawMessage) (interface{}, error)
HandlerFunc is a function adapter for Handler.
func (HandlerFunc) Handle ¶
func (f HandlerFunc) Handle(ctx context.Context, method string, params json.RawMessage) (interface{}, error)
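The adapter lets a plain function satisfy Handler, in the same spirit as http.HandlerFunc. The sketch below redeclares the two documented types locally; the body of Handle is the conventional "call f" implementation and is an assumption about the package, and echo, call, and errUnknownMethod are hypothetical.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented Handler and HandlerFunc.
type Handler interface {
	Handle(ctx context.Context, method string, params json.RawMessage) (interface{}, error)
}

type HandlerFunc func(ctx context.Context, method string, params json.RawMessage) (interface{}, error)

// Handle calls f. This body is the conventional adapter implementation and is
// assumed, not taken from the package source.
func (f HandlerFunc) Handle(ctx context.Context, method string, params json.RawMessage) (interface{}, error) {
	return f(ctx, method, params)
}

var errUnknownMethod = errors.New("unknown method")

// echo is a hypothetical handler: it returns its params for the "echo"
// method and rejects everything else.
func echo(ctx context.Context, method string, params json.RawMessage) (interface{}, error) {
	if method != "echo" {
		return nil, errUnknownMethod
	}
	return params, nil
}

// call invokes any Handler and renders the result as JSON text.
func call(h Handler, method, params string) string {
	res, err := h.Handle(context.Background(), method, json.RawMessage(params))
	if err != nil {
		return "error: " + err.Error()
	}
	b, err := json.Marshal(res)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	var h Handler = HandlerFunc(echo) // plain function used as a Handler
	fmt.Println(call(h, "echo", `{"a":1}`))
	fmt.Println(call(h, "nope", `{}`))
}
```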
type InboundMessage ¶
type InboundMessage struct {
	// Request is set if this is a JSON-RPC request (has ID).
	Request *Request
	// Notification is set if this is a notification (no ID).
	Notification *Notification
	// Raw contains the original bytes for passthrough scenarios.
	Raw json.RawMessage
}
InboundMessage wraps an incoming JSON-RPC message.
func ParseInbound ¶
func ParseInbound(data []byte) (*InboundMessage, error)
ParseInbound parses raw JSON into an InboundMessage.
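The request/notification split described in the InboundMessage field comments (a request has an ID, a notification does not) can be sketched as follows. This is a hypothetical re-implementation for illustration, not the package's ParseInbound: parseInbound and kind are invented names, the types are local stand-ins, and the real function may validate differently.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented types.
type Request struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      interface{}     `json:"id,omitempty"`
	Method  string          `json:"method"`
	Params  json.RawMessage `json:"params,omitempty"`
}

type Notification struct {
	JSONRPC string      `json:"jsonrpc"`
	Method  string      `json:"method"`
	Params  interface{} `json:"params,omitempty"`
}

type InboundMessage struct {
	Request      *Request
	Notification *Notification
	Raw          json.RawMessage
}

// parseInbound classifies a message by whether an "id" member is present.
// Hypothetical sketch; the package's ParseInbound may behave differently.
func parseInbound(data []byte) (*InboundMessage, error) {
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(data, &fields); err != nil {
		return nil, fmt.Errorf("parse error: %w", err)
	}
	if _, ok := fields["method"]; !ok {
		return nil, errors.New("invalid request: missing method")
	}
	// Copy the input so Raw stays valid if the caller reuses its buffer.
	msg := &InboundMessage{Raw: append(json.RawMessage(nil), data...)}
	var err error
	if _, hasID := fields["id"]; hasID {
		msg.Request = new(Request)
		err = json.Unmarshal(data, msg.Request)
	} else {
		msg.Notification = new(Notification)
		err = json.Unmarshal(data, msg.Notification)
	}
	if err != nil {
		return nil, err
	}
	return msg, nil
}

// kind reports how parseInbound classified the input.
func kind(data string) string {
	msg, err := parseInbound([]byte(data))
	switch {
	case err != nil:
		return "invalid"
	case msg.Request != nil:
		return "request"
	default:
		return "notification"
	}
}

func main() {
	fmt.Println(kind(`{"jsonrpc":"2.0","id":1,"method":"run"}`))
	fmt.Println(kind(`{"jsonrpc":"2.0","method":"goal_started"}`))
	fmt.Println(kind(`not json`))
}
```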
type Notification ¶
type Notification struct {
	JSONRPC string      `json:"jsonrpc"`
	Method  string      `json:"method"`
	Params  interface{} `json:"params,omitempty"`
}
Notification represents a JSON-RPC 2.0 notification (no ID).
type OutboundMessage ¶
type OutboundMessage struct {
	// Response is set when replying to a request.
	Response *Response
	// Notification is set when sending an unsolicited notification.
	Notification *Notification
}
OutboundMessage wraps an outgoing JSON-RPC message.
type Request ¶
type Request struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      interface{}     `json:"id,omitempty"`
	Method  string          `json:"method"`
	Params  json.RawMessage `json:"params,omitempty"`
}
Request represents a JSON-RPC 2.0 request.
type Response ¶
type Response struct {
	JSONRPC string      `json:"jsonrpc"`
	ID      interface{} `json:"id,omitempty"`
	Result  interface{} `json:"result,omitempty"`
	Error   *Error      `json:"error,omitempty"`
}
Response represents a JSON-RPC 2.0 response.
type RunParams ¶
type RunParams struct {
	File   string            `json:"file"`
	Inputs map[string]string `json:"inputs,omitempty"`
}
RunParams are the parameters for the "run" method.
type RunResult ¶
type RunResult struct {
	SessionID  string            `json:"session_id"`
	Status     string            `json:"status"`
	Outputs    map[string]string `json:"outputs,omitempty"`
	Iterations map[string]int    `json:"iterations,omitempty"`
	Error      string            `json:"error,omitempty"`
}
RunResult is the result of the "run" method.
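A handler for the "run" method would typically decode the raw params into RunParams and reply with a RunResult. The sketch below shows that round trip with local copies of the two structs. handleRun and runJSON are hypothetical, and the session ID, the "completed" status value, the file name, and the rule that file is required are all assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented types.
type RunParams struct {
	File   string            `json:"file"`
	Inputs map[string]string `json:"inputs,omitempty"`
}

type RunResult struct {
	SessionID  string            `json:"session_id"`
	Status     string            `json:"status"`
	Outputs    map[string]string `json:"outputs,omitempty"`
	Iterations map[string]int    `json:"iterations,omitempty"`
	Error      string            `json:"error,omitempty"`
}

// handleRun decodes the params of a "run" request and returns a placeholder
// result. The session ID and status value are hypothetical.
func handleRun(params json.RawMessage) (*RunResult, error) {
	var p RunParams
	if err := json.Unmarshal(params, &p); err != nil {
		return nil, fmt.Errorf("invalid params: %w", err)
	}
	if p.File == "" {
		// Assumption: a run without a file is rejected.
		return nil, errors.New("invalid params: file is required")
	}
	return &RunResult{
		SessionID: "session-1",
		Status:    "completed",
		Outputs:   map[string]string{"file": p.File},
	}, nil
}

// runJSON feeds JSON params through handleRun and renders the outcome.
func runJSON(params string) string {
	res, err := handleRun(json.RawMessage(params))
	if err != nil {
		return "error: " + err.Error()
	}
	b, err := json.Marshal(res)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(runJSON(`{"file":"agent.yaml","inputs":{"topic":"go"}}`))
	fmt.Println(runJSON(`{}`))
}
```

A decoding failure like the one on the second call would map naturally onto the InvalidParams error code.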
type SSEClient ¶
type SSEClient struct {
	// contains filtered or unexported fields
}
SSEClient connects to an SSE endpoint and receives messages.
func NewSSEClient ¶
NewSSEClient creates a client for connecting to an SSE endpoint.
func (*SSEClient) Recv ¶
func (c *SSEClient) Recv() <-chan *InboundMessage
Recv returns the channel for incoming messages.
type SSEConfig ¶
type SSEConfig struct {
	Config // Embed base config
	// FlushInterval for SSE writes.
	FlushInterval time.Duration
	// HeartbeatInterval sends SSE comments as keepalive (0 = disabled).
	HeartbeatInterval time.Duration
}
SSEConfig holds SSE transport configuration.
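Because SSEConfig embeds Config, the buffer-size fields are promoted and can be read or set directly on the SSE config. The sketch below demonstrates this with local copies of both structs. exampleConfig and heartbeatEnabled are hypothetical, and the 15-second heartbeat and 256-entry buffer are illustrative values, not the package defaults (only the buffer default of 100 comes from the documentation).

```go
package main

import (
	"fmt"
	"time"
)

// Local stand-ins mirroring the documented types.
type Config struct {
	RecvBufferSize int
	SendBufferSize int
}

type SSEConfig struct {
	Config // embedded base config
	FlushInterval     time.Duration
	HeartbeatInterval time.Duration
}

// exampleConfig builds a config by hand. Values are illustrative only.
func exampleConfig() SSEConfig {
	cfg := SSEConfig{
		Config:            Config{RecvBufferSize: 100, SendBufferSize: 100},
		HeartbeatInterval: 15 * time.Second,
	}
	cfg.RecvBufferSize = 256 // promoted field from the embedded Config
	return cfg
}

// heartbeatEnabled applies the documented rule that 0 disables heartbeats.
func heartbeatEnabled(cfg SSEConfig) bool {
	return cfg.HeartbeatInterval > 0
}

func main() {
	cfg := exampleConfig()
	fmt.Println(cfg.RecvBufferSize, cfg.Config.SendBufferSize, heartbeatEnabled(cfg))
	// prints 256 100 true
}
```

The same promotion applies to WebSocketConfig, which embeds Config in the same way.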
func DefaultSSEConfig ¶
func DefaultSSEConfig() SSEConfig
DefaultSSEConfig returns configuration with sensible defaults.
type SSETransport ¶
type SSETransport struct {
	// contains filtered or unexported fields
}
SSETransport implements Transport using Server-Sent Events for server→client and HTTP POST for client→server communication.
func NewSSETransport ¶
func NewSSETransport(cfg SSEConfig) *SSETransport
NewSSETransport creates a new SSE transport.
func (*SSETransport) Close ¶
func (t *SSETransport) Close() error
Close initiates graceful shutdown.
func (*SSETransport) HandlePost ¶
func (t *SSETransport) HandlePost(w http.ResponseWriter, r *http.Request)
HandlePost is an HTTP handler for receiving JSON-RPC requests. Mount this at your request endpoint (e.g., /rpc).
func (*SSETransport) HandleSSE ¶
func (t *SSETransport) HandleSSE(w http.ResponseWriter, r *http.Request)
HandleSSE is an HTTP handler for SSE connections. Mount this at your SSE endpoint (e.g., /events).
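The two handlers are meant to be mounted on separate routes of the same HTTP server. The sketch below shows the wiring on a standard http.ServeMux using the example paths /rpc and /events. Since the package import path is not given, stubTransport is a hypothetical stand-in with the same two method signatures; its response bodies and status codes are invented, and sseHandlers, newMux, and status are illustrative helpers.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// sseHandlers captures the two documented SSETransport handler methods.
type sseHandlers interface {
	HandlePost(w http.ResponseWriter, r *http.Request)
	HandleSSE(w http.ResponseWriter, r *http.Request)
}

// stubTransport is a hypothetical stand-in for *SSETransport.
type stubTransport struct{}

func (stubTransport) HandlePost(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	w.WriteHeader(http.StatusAccepted)
}

func (stubTransport) HandleSSE(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/event-stream")
	fmt.Fprint(w, ": connected\n\n") // SSE comment line
}

// newMux mounts the request endpoint and the event stream endpoint.
func newMux(t sseHandlers) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/rpc", t.HandlePost)
	mux.HandleFunc("/events", t.HandleSSE)
	return mux
}

// status sends one in-memory request through the mux and returns the
// response status code.
func status(mux *http.ServeMux, method, path string) int {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(method, path, nil))
	return rec.Code
}

func main() {
	mux := newMux(stubTransport{})
	fmt.Println(status(mux, http.MethodPost, "/rpc"))
	fmt.Println(status(mux, http.MethodGet, "/events"))
	fmt.Println(status(mux, http.MethodGet, "/other"))
}
```

With the real transport, the same mux would be passed to http.ListenAndServe while Run executes in its own goroutine.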
func (*SSETransport) Recv ¶
func (t *SSETransport) Recv() <-chan *InboundMessage
Recv returns the channel for incoming messages.
func (*SSETransport) Run ¶
func (t *SSETransport) Run(ctx context.Context) error
Run starts the transport, blocking until shutdown.
func (*SSETransport) Send ¶
func (t *SSETransport) Send(msg *OutboundMessage) error
Send queues a message for delivery to all connected SSE clients.
type Server ¶
type Server struct {
	// NotifyFunc is called to send notifications.
	NotifyFunc func(method string, params interface{})
	// contains filtered or unexported fields
}
Server is a JSON-RPC 2.0 server over stdio.
type StdioTransport ¶
type StdioTransport struct {
	// contains filtered or unexported fields
}
StdioTransport implements Transport over stdin/stdout.
func NewStdioTransport ¶
NewStdioTransport creates a new stdio transport.
func (*StdioTransport) Close ¶
func (t *StdioTransport) Close() error
Close initiates graceful shutdown.
func (*StdioTransport) Recv ¶
func (t *StdioTransport) Recv() <-chan *InboundMessage
Recv returns the channel for incoming messages.
func (*StdioTransport) Run ¶
func (t *StdioTransport) Run(ctx context.Context) error
Run starts the transport, blocking until shutdown.
func (*StdioTransport) Send ¶
func (t *StdioTransport) Send(msg *OutboundMessage) error
Send queues a message for delivery.
type ToolCallParams ¶
type ToolCallParams struct {
	SessionID string                 `json:"session_id"`
	Goal      string                 `json:"goal"`
	Tool      string                 `json:"tool"`
	Args      map[string]interface{} `json:"args"`
	Result    interface{}            `json:"result,omitempty"`
	Error     string                 `json:"error,omitempty"`
}
ToolCallParams are the params for the tool_call event.
type Transport ¶
type Transport interface {
	// Recv returns channel for incoming messages.
	// Channel is closed when transport shuts down.
	Recv() <-chan *InboundMessage
	// Send queues a message for delivery.
	// Returns ErrClosed if transport is closed.
	Send(msg *OutboundMessage) error
	// Run starts the transport, blocks until ctx cancelled or error.
	// Returns nil on graceful shutdown, error otherwise.
	Run(ctx context.Context) error
	// Close initiates graceful shutdown.
	// Drains pending sends before returning.
	Close() error
}
Transport provides bidirectional JSON-RPC message passing.
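The interface contract (Recv closes on shutdown, Send returns ErrClosed afterwards, Run blocks until cancellation, concurrent Send is safe) can be exercised with a small in-memory implementation, which is also handy as a test double. The sketch below is hypothetical throughout: memTransport, sendConcurrently, and exercise are invented, the message types are simplified local stand-ins, and nothing here reflects how the package's own transports are implemented.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Simplified local stand-ins for the documented types.
type InboundMessage struct{ Raw []byte }
type OutboundMessage struct{ Body string }

var ErrClosed = errors.New("transport closed")

type Transport interface {
	Recv() <-chan *InboundMessage
	Send(msg *OutboundMessage) error
	Run(ctx context.Context) error
	Close() error
}

// memTransport records sent messages in memory. Hypothetical test double.
type memTransport struct {
	mu     sync.Mutex // guards closed and sent
	closed bool
	sent   []*OutboundMessage
	recv   chan *InboundMessage
	done   chan struct{}
}

var _ Transport = (*memTransport)(nil) // compile-time interface check

func newMemTransport() *memTransport {
	return &memTransport{recv: make(chan *InboundMessage, 100), done: make(chan struct{})}
}

func (t *memTransport) Recv() <-chan *InboundMessage { return t.recv }

func (t *memTransport) Send(msg *OutboundMessage) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.closed {
		return ErrClosed
	}
	t.sent = append(t.sent, msg)
	return nil
}

// Run blocks until ctx is cancelled or Close is called.
func (t *memTransport) Run(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return t.Close()
	case <-t.done:
		return nil
	}
}

// Close is idempotent and closes the Recv channel exactly once.
func (t *memTransport) Close() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if !t.closed {
		t.closed = true
		close(t.recv)
		close(t.done)
	}
	return nil
}

// sendConcurrently calls Send from n goroutines and counts the successes.
func sendConcurrently(t Transport, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	ok := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if t.Send(&OutboundMessage{Body: "x"}) == nil {
				mu.Lock()
				ok++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return ok
}

// exercise runs the transport, sends concurrently, cancels, and reports what
// a caller observes after shutdown.
func exercise(n int) (sent int, recvOpen bool, rejected bool) {
	t := newMemTransport()
	ctx, cancel := context.WithCancel(context.Background())
	runDone := make(chan error, 1)
	go func() { runDone <- t.Run(ctx) }()
	sent = sendConcurrently(t, n)
	cancel()
	<-runDone // Run has returned, so the transport is closed
	_, recvOpen = <-t.Recv()
	rejected = errors.Is(t.Send(&OutboundMessage{}), ErrClosed)
	return sent, recvOpen, rejected
}

func main() {
	fmt.Println(exercise(8))
}
```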
type WebSocketConfig ¶
type WebSocketConfig struct {
	Config // Embed base config
	// WriteTimeout for write operations.
	WriteTimeout time.Duration
	// ReadTimeout for read operations (0 = no timeout).
	ReadTimeout time.Duration
	// MaxMessageSize limits incoming message size.
	MaxMessageSize int64
	// PingInterval for keepalive pings (0 = disabled).
	PingInterval time.Duration
}
WebSocketConfig holds WebSocket transport configuration.
func DefaultWebSocketConfig ¶
func DefaultWebSocketConfig() WebSocketConfig
DefaultWebSocketConfig returns configuration with sensible defaults.
type WebSocketTransport ¶
type WebSocketTransport struct {
	// contains filtered or unexported fields
}
WebSocketTransport implements Transport over WebSocket.
func NewWebSocketTransport ¶
func NewWebSocketTransport(conn *websocket.Conn, cfg WebSocketConfig) *WebSocketTransport
NewWebSocketTransport creates a transport from an existing connection.
func (*WebSocketTransport) Close ¶
func (t *WebSocketTransport) Close() error
Close initiates graceful shutdown.
func (*WebSocketTransport) Recv ¶
func (t *WebSocketTransport) Recv() <-chan *InboundMessage
Recv returns the channel for incoming messages.
func (*WebSocketTransport) Run ¶
func (t *WebSocketTransport) Run(ctx context.Context) error
Run starts the transport, blocking until shutdown.
func (*WebSocketTransport) Send ¶
func (t *WebSocketTransport) Send(msg *OutboundMessage) error
Send queues a message for delivery.