transport

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package transport provides pluggable transports for JSON-RPC 2.0 communication.

Overview

The transport package implements bidirectional message passing over various backends while maintaining JSON-RPC 2.0 as the protocol. All transports implement the Transport interface with channel-based APIs for Go-idiomatic concurrent use.

Available Transports

  • StdioTransport: Communication over stdin/stdout (for CLI tools)
  • WebSocketTransport: Bidirectional over WebSocket (for real-time UIs)
  • SSETransport: Server-Sent Events + HTTP POST (for web clients)

Usage

All transports follow the same pattern:

t := transport.NewStdioTransport(os.Stdin, os.Stdout, transport.DefaultConfig())
go t.Run(ctx)

for msg := range t.Recv() {
    if msg.Request != nil {
        // Handle request, send response
        t.Send(&transport.OutboundMessage{
            Response: &transport.Response{
                JSONRPC: "2.0",
                ID:      msg.Request.ID,
                Result:  result,
            },
        })
    }
}

Design Decisions

  • Channel-based API: Go-idiomatic for concurrent use
  • JSON-RPC 2.0: Protocol for all transports
  • Reconnection: Handled by agent implementations, not agentkit

Thread Safety

All transport methods are safe for concurrent use. The Recv() channel is closed when the transport shuts down.

Package transport provides JSON-RPC 2.0 stdio transport.

Package transport provides pluggable transports for JSON-RPC 2.0 communication.

The Transport interface enables bidirectional message passing over various backends (stdio, WebSocket, SSE) while maintaining JSON-RPC 2.0 as the protocol.

Index

Constants

View Source
const (
	ParseError     = -32700
	InvalidRequest = -32600
	MethodNotFound = -32601
	InvalidParams  = -32602
	InternalError  = -32603
)

Standard error codes

View Source
const (
	EventGoalStarted   = "goal_started"
	EventGoalComplete  = "goal_complete"
	EventToolCall      = "tool_call"
	EventLoopIteration = "loop_iteration"
	EventError         = "error"
)

Event types for notifications

Variables

View Source
var (
	ErrClosed      = errors.New("transport closed")
	ErrSendTimeout = errors.New("send timeout")
)

Common errors.

Functions

func MarshalOutbound

func MarshalOutbound(msg *OutboundMessage) ([]byte, error)

MarshalOutbound serializes an OutboundMessage to JSON.

func NewWebSocketUpgrader

func NewWebSocketUpgrader() *websocket.Upgrader

NewWebSocketUpgrader creates an upgrader for accepting WebSocket connections.

Types

type Config

type Config struct {
	// RecvBufferSize is the size of the receive channel buffer.
	// Default: 100
	RecvBufferSize int

	// SendBufferSize is the size of the internal send buffer.
	// Default: 100
	SendBufferSize int
}

Config holds common transport configuration.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns configuration with sensible defaults.

type Error

type Error struct {
	Code    int         `json:"code"`
	Message string      `json:"message"`
	Data    interface{} `json:"data,omitempty"`
}

Error represents a JSON-RPC 2.0 error.

func (*Error) Error

func (e *Error) Error() string

Error implements the error interface.

type GoalCompleteParams

type GoalCompleteParams struct {
	SessionID string `json:"session_id"`
	Goal      string `json:"goal"`
	Output    string `json:"output"`
}

GoalCompleteParams are params for goal_complete event.

type GoalStartedParams

type GoalStartedParams struct {
	SessionID string `json:"session_id"`
	Goal      string `json:"goal"`
}

GoalStartedParams are params for goal_started event.

type Handler

type Handler interface {
	Handle(ctx context.Context, method string, params json.RawMessage) (interface{}, error)
}

Handler handles JSON-RPC requests.

type HandlerFunc

type HandlerFunc func(ctx context.Context, method string, params json.RawMessage) (interface{}, error)

HandlerFunc is a function adapter for Handler.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(ctx context.Context, method string, params json.RawMessage) (interface{}, error)

type InboundMessage

type InboundMessage struct {
	// Request is set if this is a JSON-RPC request (has ID).
	Request *Request

	// Notification is set if this is a notification (no ID).
	Notification *Notification

	// Raw contains the original bytes for passthrough scenarios.
	Raw json.RawMessage
}

InboundMessage wraps an incoming JSON-RPC message.

func ParseInbound

func ParseInbound(data []byte) (*InboundMessage, error)

ParseInbound parses raw JSON into an InboundMessage.

type Notification

type Notification struct {
	JSONRPC string      `json:"jsonrpc"`
	Method  string      `json:"method"`
	Params  interface{} `json:"params,omitempty"`
}

Notification represents a JSON-RPC 2.0 notification (no ID).

type OutboundMessage

type OutboundMessage struct {
	// Response is set when replying to a request.
	Response *Response

	// Notification is set when sending an unsolicited notification.
	Notification *Notification
}

OutboundMessage wraps an outgoing JSON-RPC message.

type Request

type Request struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      interface{}     `json:"id,omitempty"`
	Method  string          `json:"method"`
	Params  json.RawMessage `json:"params,omitempty"`
}

Request represents a JSON-RPC 2.0 request.

type Response

type Response struct {
	JSONRPC string      `json:"jsonrpc"`
	ID      interface{} `json:"id,omitempty"`
	Result  interface{} `json:"result,omitempty"`
	Error   *Error      `json:"error,omitempty"`
}

Response represents a JSON-RPC 2.0 response.

type RunParams

type RunParams struct {
	File   string            `json:"file"`
	Inputs map[string]string `json:"inputs,omitempty"`
}

RunParams are the parameters for the "run" method.

type RunResult

type RunResult struct {
	SessionID  string            `json:"session_id"`
	Status     string            `json:"status"`
	Outputs    map[string]string `json:"outputs,omitempty"`
	Iterations map[string]int    `json:"iterations,omitempty"`
	Error      string            `json:"error,omitempty"`
}

RunResult is the result of the "run" method.

type SSEClient

type SSEClient struct {
	// contains filtered or unexported fields
}

SSEClient connects to an SSE endpoint and receives messages.

func NewSSEClient

func NewSSEClient(url string, bufferSize int) *SSEClient

NewSSEClient creates a client for connecting to an SSE endpoint.

func (*SSEClient) Close

func (c *SSEClient) Close() error

Close closes the SSE client.

func (*SSEClient) Connect

func (c *SSEClient) Connect(ctx context.Context) error

Connect establishes the SSE connection and starts receiving.

func (*SSEClient) Recv

func (c *SSEClient) Recv() <-chan *InboundMessage

Recv returns the channel for incoming messages.

type SSEConfig

type SSEConfig struct {
	Config // Embed base config

	// FlushInterval for SSE writes.
	FlushInterval time.Duration

	// HeartbeatInterval sends SSE comments as keepalive (0 = disabled).
	HeartbeatInterval time.Duration
}

SSEConfig holds SSE transport configuration.

func DefaultSSEConfig

func DefaultSSEConfig() SSEConfig

DefaultSSEConfig returns configuration with sensible defaults.

type SSETransport

type SSETransport struct {
	// contains filtered or unexported fields
}

SSETransport implements Transport using Server-Sent Events for server→client and HTTP POST for client→server communication.

func NewSSETransport

func NewSSETransport(cfg SSEConfig) *SSETransport

NewSSETransport creates a new SSE transport.

func (*SSETransport) Close

func (t *SSETransport) Close() error

Close initiates graceful shutdown.

func (*SSETransport) HandlePost

func (t *SSETransport) HandlePost(w http.ResponseWriter, r *http.Request)

HandlePost is an HTTP handler for receiving JSON-RPC requests. Mount this at your request endpoint (e.g., /rpc).

func (*SSETransport) HandleSSE

func (t *SSETransport) HandleSSE(w http.ResponseWriter, r *http.Request)

HandleSSE is an HTTP handler for SSE connections. Mount this at your SSE endpoint (e.g., /events).

func (*SSETransport) Recv

func (t *SSETransport) Recv() <-chan *InboundMessage

Recv returns the channel for incoming messages.

func (*SSETransport) Run

func (t *SSETransport) Run(ctx context.Context) error

Run starts the transport, blocking until shutdown.

func (*SSETransport) Send

func (t *SSETransport) Send(msg *OutboundMessage) error

Send queues a message for delivery to all connected SSE clients.

type Server

type Server struct {

	// NotifyFunc is called to send notifications
	NotifyFunc func(method string, params interface{})
	// contains filtered or unexported fields
}

Server is a JSON-RPC 2.0 server over stdio.

func NewServer

func NewServer(r io.Reader, w io.Writer, handler Handler) *Server

NewServer creates a new JSON-RPC server.

func (*Server) Notify

func (s *Server) Notify(method string, params interface{})

Notify sends a notification to the client.

func (*Server) Serve

func (s *Server) Serve(ctx context.Context) error

Serve reads and handles requests until EOF or error.

type StdioTransport

type StdioTransport struct {
	// contains filtered or unexported fields
}

StdioTransport implements Transport over stdin/stdout.

func NewStdioTransport

func NewStdioTransport(r io.Reader, w io.Writer, cfg Config) *StdioTransport

NewStdioTransport creates a new stdio transport.

func (*StdioTransport) Close

func (t *StdioTransport) Close() error

Close initiates graceful shutdown.

func (*StdioTransport) Recv

func (t *StdioTransport) Recv() <-chan *InboundMessage

Recv returns the channel for incoming messages.

func (*StdioTransport) Run

func (t *StdioTransport) Run(ctx context.Context) error

Run starts the transport, blocking until shutdown.

func (*StdioTransport) Send

func (t *StdioTransport) Send(msg *OutboundMessage) error

Send queues a message for delivery.

type ToolCallParams

type ToolCallParams struct {
	SessionID string                 `json:"session_id"`
	Goal      string                 `json:"goal"`
	Tool      string                 `json:"tool"`
	Args      map[string]interface{} `json:"args"`
	Result    interface{}            `json:"result,omitempty"`
	Error     string                 `json:"error,omitempty"`
}

ToolCallParams are params for tool_call event.

type Transport

type Transport interface {
	// Recv returns channel for incoming messages.
	// Channel is closed when transport shuts down.
	Recv() <-chan *InboundMessage

	// Send queues a message for delivery.
	// Returns ErrClosed if transport is closed.
	Send(msg *OutboundMessage) error

	// Run starts the transport, blocks until ctx cancelled or error.
	// Returns nil on graceful shutdown, error otherwise.
	Run(ctx context.Context) error

	// Close initiates graceful shutdown.
	// Drains pending sends before returning.
	Close() error
}

Transport provides bidirectional JSON-RPC message passing.

type WebSocketConfig

type WebSocketConfig struct {
	Config // Embed base config

	// WriteTimeout for write operations.
	WriteTimeout time.Duration

	// ReadTimeout for read operations (0 = no timeout).
	ReadTimeout time.Duration

	// MaxMessageSize limits incoming message size.
	MaxMessageSize int64

	// PingInterval for keepalive pings (0 = disabled).
	PingInterval time.Duration
}

WebSocketConfig holds WebSocket transport configuration.

func DefaultWebSocketConfig

func DefaultWebSocketConfig() WebSocketConfig

DefaultWebSocketConfig returns configuration with sensible defaults.

type WebSocketTransport

type WebSocketTransport struct {
	// contains filtered or unexported fields
}

WebSocketTransport implements Transport over WebSocket.

func NewWebSocketTransport

func NewWebSocketTransport(conn *websocket.Conn, cfg WebSocketConfig) *WebSocketTransport

NewWebSocketTransport creates a transport from an existing connection.

func (*WebSocketTransport) Close

func (t *WebSocketTransport) Close() error

Close initiates graceful shutdown.

func (*WebSocketTransport) Recv

func (t *WebSocketTransport) Recv() <-chan *InboundMessage

Recv returns the channel for incoming messages.

func (*WebSocketTransport) Run

Run starts the transport, blocking until shutdown.

func (*WebSocketTransport) Send

Send queues a message for delivery.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL