Documentation
¶
Overview ¶
Package lplex is a CAN bus HTTP bridge for NMEA 2000.
It reads raw CAN frames from a SocketCAN interface, reassembles fast-packets, tracks device discovery, and streams frames to clients over SSE with session management, filtering, and replay.
The package can be embedded into other Go services. Create a Broker to manage frame routing, a Server to expose the HTTP API, and optionally a JournalWriter to record frames to disk.
broker := lplex.NewBroker(lplex.BrokerConfig{
RingSize: 65536,
MaxBufferDuration: 5 * time.Minute,
Logger: logger,
})
go broker.Run(ctx)
srv := lplex.NewServer(broker, logger, sendpolicy.SendPolicy{})
mux.Handle("/nmea/", http.StripPrefix("/nmea", srv))
Feed frames into the broker via Broker.RxFrames:
broker.RxFrames() <- lplex.RxFrame{
Timestamp: time.Now(),
Header: lplex.CANHeader{Priority: 2, PGN: 129025, Source: 10, Destination: 0xFF},
Data: payload,
}
When done, close the rx channel and the broker goroutine exits:
broker.CloseRx()
Index ¶
- Constants
- Variables
- func ApplySlots(broker *Broker, slots []ClientSlot, logger *slog.Logger)
- func CANReader(ctx context.Context, iface string, rxFrames chan<- RxFrame, ...) error
- func CANWriter(ctx context.Context, iface string, txFrames <-chan TxRequest, ...) error
- func CompressHandler(next http.Handler) http.Handler
- func FragmentFastPacket(data []byte, seqCounter uint8) [][]byte
- func HealthHandler(cfg HealthConfig) http.HandlerFunc
- func InitTracing(ctx context.Context, cfg TracingConfig) (shutdown func(context.Context) error, err error)
- func IsFastPacket(pgnNum uint32) bool
- func LivenessHandler() http.HandlerFunc
- func MetricsHandler(broker *Broker, replStatus func() *ReplicationStatus, ...) http.HandlerFunc
- func ParseISO8601Duration(s string) (time.Duration, error)
- func ReadinessHandler(cfg HealthConfig) http.HandlerFunc
- func SDNotify(state string) bool
- func SDReady() bool
- func SDWatchdog() bool
- func Tracer(name string) trace.Tracer
- type AlertEvent
- type AlertManager
- func (am *AlertManager) Fire(event AlertEvent)
- func (am *AlertManager) FireBusResumed(silenceDuration time.Duration)
- func (am *AlertManager) FireBusSilence(lastFrame time.Time, elapsed time.Duration)
- func (am *AlertManager) FireDeviceRemoved(src uint8, dev *Device)
- func (am *AlertManager) FireReplicationDisconnected(reason string)
- func (am *AlertManager) FireReplicationReconnected()
- func (am *AlertManager) Run(ctx context.Context)
- type AlertManagerConfig
- type AlertType
- type BlockWriter
- type BlockWriterConfig
- type Broker
- func (b *Broker) AckSession(id string, seq uint64) error
- func (b *Broker) CloseRx()
- func (b *Broker) CreateSession(id string, bufferTimeout time.Duration, filter *EventFilter) (*ClientSession, uint64)
- func (b *Broker) CurrentSeq() uint64
- func (b *Broker) Devices() *DeviceRegistry
- func (b *Broker) Done() <-chan struct{}
- func (b *Broker) GetSession(id string) *ClientSession
- func (b *Broker) LastFrameTime() time.Time
- func (b *Broker) NewConsumer(cfg ConsumerConfig) *Consumer
- func (b *Broker) QueueTx(req TxRequest) bool
- func (b *Broker) Run(ctx context.Context)
- func (b *Broker) RxFrames() chan<- RxFrame
- func (b *Broker) SendISORequest(bus string, dst uint8, pgn uint32) error
- func (b *Broker) SetAlerts(am *AlertManager)
- func (b *Broker) SetJournal(ch chan<- RxFrame)
- func (b *Broker) Stats() BrokerStats
- func (b *Broker) Subscribe(filter *EventFilter) (*subscriber, func())
- func (b *Broker) TouchSession(id string)
- func (b *Broker) TxFrames() <-chan TxRequest
- func (b *Broker) Values() *ValueStore
- func (b *Broker) VirtualDevices() *VirtualDeviceManager
- type BrokerConfig
- type BrokerHealth
- type BrokerStats
- type BusSilenceMonitor
- type BusSource
- type CANBus
- type CANHeader
- type ClaimState
- type ClientSession
- type ClientSlot
- type ClientSlotConfig
- type ComponentHealth
- type Consumer
- type ConsumerConfig
- type DecodedDeviceValues
- type DecodedPGNValue
- type Device
- type DeviceNotFoundError
- type DeviceRegistry
- func (r *DeviceRegistry) ExpireIdle(cutoff time.Time) []BusSource
- func (r *DeviceRegistry) Get(bus string, source uint8) *Device
- func (r *DeviceRegistry) HandleAddressClaim(bus string, source uint8, data []byte) (dev *Device, evictedSrc uint8, evicted bool)
- func (r *DeviceRegistry) HandleProductInfo(bus string, source uint8, data []byte) *Device
- func (r *DeviceRegistry) RecordPacket(bus string, source uint8, ts time.Time, dataLen int) bool
- func (r *DeviceRegistry) Snapshot() []Device
- func (r *DeviceRegistry) SnapshotJSON() json.RawMessage
- func (r *DeviceRegistry) SourcesMissingProductInfo() []BusSource
- func (r *DeviceRegistry) SynthesizeFrames(ts time.Time) []RxFrame
- type DeviceValues
- type EventFilter
- type EventLog
- type FastPacketAssembler
- type Frame
- type HealthConfig
- type HealthStatus
- type HoleTracker
- type InstanceManager
- func (im *InstanceManager) Get(id string) *InstanceState
- func (im *InstanceManager) GetOrCreate(id string) *InstanceState
- func (im *InstanceManager) List() []InstanceSummary
- func (im *InstanceManager) SetDeviceIdleTimeout(d time.Duration)
- func (im *InstanceManager) SetInstancePaused(instanceID string, paused bool)
- func (im *InstanceManager) SetJournalRotation(duration time.Duration, size int64)
- func (im *InstanceManager) SetOnRotate(fn func(instanceID string, rf keeper.RotatedFile))
- func (im *InstanceManager) SetRingSize(size int)
- func (im *InstanceManager) Shutdown()
- type InstanceState
- type InstanceStatus
- type InstanceSummary
- type JournalConfig
- type JournalWriter
- type JournalWriterStats
- type LoopbackBus
- type MQTTBridge
- type MQTTBridgeConfig
- type PGNValue
- type ReplicationClient
- type ReplicationClientConfig
- type ReplicationEvent
- type ReplicationEventType
- type ReplicationHealth
- type ReplicationServer
- func (s *ReplicationServer) Backfill(stream pb.Replication_BackfillServer) error
- func (s *ReplicationServer) GetInstanceBroker(instanceID string) *Broker
- func (s *ReplicationServer) GetInstanceState(instanceID string) *InstanceState
- func (s *ReplicationServer) Handshake(ctx context.Context, req *pb.HandshakeRequest) (*pb.HandshakeResponse, error)
- func (s *ReplicationServer) Live(stream pb.Replication_LiveServer) error
- type ReplicationStatus
- type RxFrame
- type SeqRange
- type SequenceGapError
- type Server
- func (s *Server) HandleEphemeralSSE(w http.ResponseWriter, r *http.Request)
- func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
- func (s *Server) HandleWebSocket(w http.ResponseWriter, r *http.Request)
- func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (s *Server) SetAPIKey(key string)
- func (s *Server) SetReadOnly(readOnly bool)
- func (s *Server) SetSendPolicy(policy sendpolicy.SendPolicy)
- func (s *Server) SetSendRateLimit(rps float64, burst int)
- type SessionNotFoundError
- type SlotFilterConfig
- type SocketCANBus
- type SyncState
- type TracingConfig
- type TxRequest
- type ValueStore
- func (vs *ValueStore) DecodedSnapshot(devices *DeviceRegistry, filter *EventFilter) []DecodedDeviceValues
- func (vs *ValueStore) DecodedSnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage
- func (vs *ValueStore) Record(bus string, source uint8, pgn uint32, ts time.Time, data []byte, seq uint64)
- func (vs *ValueStore) RemoveSource(bus string, source uint8)
- func (vs *ValueStore) Snapshot(devices *DeviceRegistry, filter *EventFilter) []DeviceValues
- func (vs *ValueStore) SnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage
- type VirtualDevice
- type VirtualDeviceConfig
- type VirtualDeviceManager
- func (m *VirtualDeviceManager) Add(cfg VirtualDeviceConfig)
- func (m *VirtualDeviceManager) ClaimedSource() (uint8, bool)
- func (m *VirtualDeviceManager) Devices() []VirtualDeviceStatus
- func (m *VirtualDeviceManager) HandleBusClaim(source uint8, name uint64) bool
- func (m *VirtualDeviceManager) HandleISORequest(dst uint8, requestedPGN uint32, requesterSrc uint8)
- func (m *VirtualDeviceManager) Heartbeat()
- func (m *VirtualDeviceManager) ProductInfoPayload(source uint8) []byte
- func (m *VirtualDeviceManager) Ready() bool
- func (m *VirtualDeviceManager) StartAfterDiscovery(ctx context.Context, delay time.Duration)
- type VirtualDeviceStatus
- type VirtualProductInfo
Constants ¶
const (
	// DefaultClaimHeartbeat is how often held virtual devices re-broadcast
	// their address claim (PGN 60928).
	DefaultClaimHeartbeat = 60 * time.Second

	// DefaultProductInfoHeartbeat is how often held virtual devices
	// re-broadcast product info (PGN 126996). Longer interval since it's a
	// 134-byte fast-packet.
	DefaultProductInfoHeartbeat = 5 * time.Minute
)
const DefaultLagCheckInterval = 1000
DefaultLagCheckInterval controls how often the boat checks for lag in the live send loop. The check runs every N frames sent rather than on a wall-clock timer because, when lagging, consumer.Next() returns instantly and the loop spins at CPU speed. At max bus rate (2000 fps) this checks roughly every 0.5s.
const DefaultMaxFrameRate = 2000
DefaultMaxFrameRate is the default frame rate ceiling, in frames per second. NMEA 2000 runs on CAN 2.0B at 250 kbit/s. An extended frame (29-bit ID, 8-byte payload) is roughly 131-157 bits depending on bit stuffing. Theoretical max is ~1800 frames/sec. The default of 2000 gives ~10% headroom for measurement jitter and bit-stuffing variance.
const DefaultMaxLiveLag uint64 = 10_000
DefaultMaxLiveLag is the frame count threshold for live lag detection. If the live stream falls this far behind the broker head (boat-side) or the boat's reported head (cloud-side), the stream is killed and the gap switches to backfill mode. ~5 seconds at max bus rate.
const DefaultMinLagReconnectInterval = 30 * time.Second
DefaultMinLagReconnectInterval prevents thrashing when the system is persistently overloaded. If lag keeps recurring, we wait at least this long between lag-triggered reconnects.
const DefaultRateBurst = 500
DefaultRateBurst is the burst allowance for transient spikes. Power-on storms (every device announces simultaneously) can briefly exceed the sustained rate. 500 frames absorbs a ~250ms burst at max bus load.
Variables ¶
var (
	ParseCANID = canbus.ParseCANID
	BuildCANID = canbus.BuildCANID
)
var ErrFallenBehind = errors.New("consumer fallen behind: data no longer available")
ErrFallenBehind is returned by Consumer.Next when the consumer's cursor has fallen behind both the ring buffer and available journal files.
Functions ¶
func ApplySlots ¶ added in v0.4.0
func ApplySlots(broker *Broker, slots []ClientSlot, logger *slog.Logger)
ApplySlots creates pre-configured client sessions on the broker.
func CANReader ¶
func CANReader(ctx context.Context, iface string, rxFrames chan<- RxFrame, logger *slog.Logger) error
CANReader reads frames from SocketCAN, reassembles fast-packets, and sends completed frames to the broker's rxFrames channel. The iface name (e.g. "can0") is used as the Bus tag on each frame.
func CANWriter ¶
func CANWriter(ctx context.Context, iface string, txFrames <-chan TxRequest, logger *slog.Logger) error
CANWriter reads from the broker's txFrames channel and writes to SocketCAN. Handles fast-packet fragmentation for payloads > 8 bytes.
func CompressHandler ¶ added in v0.4.0
CompressHandler wraps an http.Handler with gzip compression. Responses are compressed when the client sends Accept-Encoding: gzip and the response Content-Type is not text/event-stream (SSE streams are left uncompressed because per-event gzip flushing adds overhead without meaningful compression on small JSON payloads).
func FragmentFastPacket ¶
FragmentFastPacket splits a payload into CAN frames for fast-packet TX. seqCounter should be incremented per transfer; it is a 3-bit field, so it wraps after 7. Returns a slice of 8-byte CAN frame payloads.
func HealthHandler ¶ added in v0.3.1
func HealthHandler(cfg HealthConfig) http.HandlerFunc
HealthHandler returns an http.HandlerFunc that serves the /healthz endpoint. It returns the full health status including broker, replication, and component health. For Kubernetes, prefer /livez and /readyz instead.
func InitTracing ¶ added in v0.4.0
func InitTracing(ctx context.Context, cfg TracingConfig) (shutdown func(context.Context) error, err error)
InitTracing sets up the OpenTelemetry TracerProvider and returns a shutdown function. If tracing is disabled, it sets a no-op provider and returns a no-op shutdown function.
func IsFastPacket ¶
IsFastPacket returns true if the PGN uses fast-packet transfer.
func LivenessHandler ¶ added in v0.4.0
func LivenessHandler() http.HandlerFunc
LivenessHandler returns an http.HandlerFunc for /livez. It reports whether the process is alive. Always returns 200 OK with a minimal JSON body. Suitable for Kubernetes livenessProbe.
func MetricsHandler ¶ added in v0.3.1
func MetricsHandler(broker *Broker, replStatus func() *ReplicationStatus, journalStats func() *JournalWriterStats) http.HandlerFunc
MetricsHandler returns an http.HandlerFunc that serves Prometheus-format metrics from the broker's Stats(). Optional callbacks provide journal and replication metrics for deployments that use those features.
func ParseISO8601Duration ¶
ParseISO8601Duration parses a subset of ISO 8601 durations (PT format). Supports hours (H), minutes (M), and seconds (S). Examples: "PT5M", "PT1H30M", "PT30S", "PT1H"
func ReadinessHandler ¶ added in v0.4.0
func ReadinessHandler(cfg HealthConfig) http.HandlerFunc
ReadinessHandler returns an http.HandlerFunc for /readyz. It reports whether the service is ready to handle traffic by checking CAN bus activity and replication connectivity. Returns 200 for "ok" or "degraded", 503 for "unhealthy". Suitable for Kubernetes readinessProbe.
func SDNotify ¶ added in v0.4.0
SDNotify sends a notification to systemd via the NOTIFY_SOCKET. Returns false if NOTIFY_SOCKET is not set (not running under systemd). This is a minimal implementation that avoids external dependencies.
func SDReady ¶ added in v0.4.0
func SDReady() bool
SDReady sends READY=1 to systemd, indicating the service is ready.
func SDWatchdog ¶ added in v0.4.0
func SDWatchdog() bool
SDWatchdog sends WATCHDOG=1 to systemd, resetting the watchdog timer.
Types ¶
type AlertEvent ¶ added in v0.4.0
type AlertEvent struct {
Timestamp time.Time `json:"timestamp"`
Type AlertType `json:"type"`
Severity string `json:"severity"` // "warning", "info"
Summary string `json:"summary"`
InstanceID string `json:"instance_id,omitempty"`
Details json.RawMessage `json:"details,omitempty"`
}
AlertEvent is the payload sent to the webhook endpoint.
type AlertManager ¶ added in v0.4.0
type AlertManager struct {
// contains filtered or unexported fields
}
AlertManager dispatches alert events to a webhook endpoint. Alerts are sent asynchronously via a buffered channel to avoid blocking the caller. Duplicate alerts of the same type are suppressed within the dedup window.
func NewAlertManager ¶ added in v0.4.0
func NewAlertManager(cfg AlertManagerConfig) *AlertManager
NewAlertManager creates an alert manager. Call Run to start dispatching. Returns nil if cfg.WebhookURL is empty (alerting disabled).
func (*AlertManager) Fire ¶ added in v0.4.0
func (am *AlertManager) Fire(event AlertEvent)
Fire queues an alert for delivery. Non-blocking; drops the alert if the send buffer is full.
func (*AlertManager) FireBusResumed ¶ added in v0.4.0
func (am *AlertManager) FireBusResumed(silenceDuration time.Duration)
FireBusResumed is a convenience method for bus activity resumed alerts.
func (*AlertManager) FireBusSilence ¶ added in v0.4.0
func (am *AlertManager) FireBusSilence(lastFrame time.Time, elapsed time.Duration)
FireBusSilence is a convenience method for bus silence alerts.
func (*AlertManager) FireDeviceRemoved ¶ added in v0.4.0
func (am *AlertManager) FireDeviceRemoved(src uint8, dev *Device)
FireDeviceRemoved is a convenience method for device removal alerts.
func (*AlertManager) FireReplicationDisconnected ¶ added in v0.4.0
func (am *AlertManager) FireReplicationDisconnected(reason string)
FireReplicationDisconnected alerts on replication disconnect.
func (*AlertManager) FireReplicationReconnected ¶ added in v0.4.0
func (am *AlertManager) FireReplicationReconnected()
FireReplicationReconnected alerts when replication reconnects.
func (*AlertManager) Run ¶ added in v0.4.0
func (am *AlertManager) Run(ctx context.Context)
Run processes queued alerts until ctx is cancelled.
type AlertManagerConfig ¶ added in v0.4.0
type AlertManagerConfig struct {
WebhookURL string // HTTP POST target (empty = alerting disabled)
Timeout time.Duration // HTTP request timeout (default 5s)
DedupWindow time.Duration // suppress duplicate alerts within this window (default 5m)
InstanceID string // identifies the boat/instance
Logger *slog.Logger
}
AlertManagerConfig configures the alert manager.
type BlockWriter ¶
type BlockWriter struct {
// contains filtered or unexported fields
}
BlockWriter appends pre-encoded journal blocks to .lpj files. Unlike JournalWriter, it receives blocks that are already serialized (with CRC, device table, frame data). Used by the cloud replication server to write backfill blocks byte-for-byte without decompression or re-encoding.
func NewBlockWriter ¶
func NewBlockWriter(cfg BlockWriterConfig) (*BlockWriter, error)
NewBlockWriter creates a new BlockWriter. Call AppendBlock to write blocks. Call Close when done to finalize the current file.
func (*BlockWriter) AppendBlock ¶
func (w *BlockWriter) AppendBlock(baseSeq uint64, baseTimeUs int64, data []byte, compressed bool) error
AppendBlock writes a pre-encoded block to the current journal file. For compressed blocks, the data is the compressed payload (written with a 20-byte v2 header). For uncompressed blocks, data is the full block bytes (must be exactly BlockSize with valid CRC).
func (*BlockWriter) Close ¶
func (w *BlockWriter) Close() error
Close finalizes the current file (writes block index for compressed files, syncs, and closes). Safe to call multiple times or on a writer with no open file.
type BlockWriterConfig ¶
type BlockWriterConfig struct {
Dir string
Prefix string // default: "nmea2k"
BlockSize int // uncompressed block size (from source journal)
Compression journal.CompressionType
RotateDuration time.Duration // 0 = no limit
RotateSize int64 // 0 = no limit
OnRotate func(keeper.RotatedFile) // called after a journal file is closed by rotation
Logger *slog.Logger
}
BlockWriterConfig configures a BlockWriter.
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker is the central coordinator. A single goroutine reads from rxFrames, assigns sequence numbers, appends to the ring buffer, updates the device registry, and fans out to client sessions and ephemeral subscribers.
func NewBroker ¶
func NewBroker(cfg BrokerConfig) *Broker
NewBroker creates a new broker with the given config.
func (*Broker) AckSession ¶
AckSession updates the cursor for a session.
func (*Broker) CloseRx ¶
func (b *Broker) CloseRx()
CloseRx closes the rxFrames channel, signaling the broker to stop processing. Wait on Done() to know when the broker goroutine has actually exited.
func (*Broker) CreateSession ¶
func (b *Broker) CreateSession(id string, bufferTimeout time.Duration, filter *EventFilter) (*ClientSession, uint64)
CreateSession creates or retrieves a client session. Returns the session and the current head sequence number.
When bufferTimeout is 0, the session cursor is reset so no frames are replayed on the next connect (fresh start).
func (*Broker) CurrentSeq ¶
CurrentSeq returns the most recently assigned sequence number.
func (*Broker) Devices ¶
func (b *Broker) Devices() *DeviceRegistry
Devices returns the broker's device registry.
func (*Broker) Done ¶
func (b *Broker) Done() <-chan struct{}
Done returns a channel that is closed when the broker's Run() method returns.
func (*Broker) GetSession ¶
func (b *Broker) GetSession(id string) *ClientSession
GetSession returns a session by ID, or nil if not found.
func (*Broker) LastFrameTime ¶ added in v0.3.1
LastFrameTime returns the timestamp of the most recently received frame, or zero if no frames have been received yet.
func (*Broker) NewConsumer ¶
func (b *Broker) NewConsumer(cfg ConsumerConfig) *Consumer
NewConsumer creates a pull-based consumer starting at the given cursor. The consumer is registered with the broker for live notifications.
func (*Broker) QueueTx ¶ added in v0.4.0
QueueTx queues a frame for transmission on the CAN bus. Returns false if the TX queue is full. The frame will appear in the ring buffer, journal, and SSE when the SocketCAN echo comes back through CANReader.
func (*Broker) Run ¶
Run is the broker's main loop. Call in its own goroutine. Exits when ctx is cancelled or rxFrames is closed.
func (*Broker) RxFrames ¶
RxFrames returns the channel for submitting received CAN frames to the broker.
func (*Broker) SendISORequest ¶ added in v0.3.1
SendISORequest sends an ISO Request (PGN 59904) to the given destination, asking it to transmit the specified PGN. Returns an error if the TX queue is full. The bus parameter routes the request to the correct CAN interface (empty = default).
func (*Broker) SetAlerts ¶ added in v0.4.0
func (b *Broker) SetAlerts(am *AlertManager)
SetAlerts sets the alert manager for device removal notifications. Must be called before Run.
func (*Broker) SetJournal ¶
SetJournal sets the journal channel. Must be called before Run.
func (*Broker) Stats ¶ added in v0.3.1
func (b *Broker) Stats() BrokerStats
Stats returns a point-in-time snapshot of broker metrics.
func (*Broker) Subscribe ¶
func (b *Broker) Subscribe(filter *EventFilter) (*subscriber, func())
Subscribe creates an ephemeral fan-out channel with the given filter. Returns the subscriber and a cleanup function that must be called on disconnect.
func (*Broker) TouchSession ¶
TouchSession updates the LastActivity timestamp for a session.
func (*Broker) Values ¶
func (b *Broker) Values() *ValueStore
Values returns the broker's last-values store.
func (*Broker) VirtualDevices ¶ added in v0.4.0
func (b *Broker) VirtualDevices() *VirtualDeviceManager
VirtualDevices returns the broker's virtual device manager, or nil if disabled.
type BrokerConfig ¶
type BrokerConfig struct {
RingSize int // must be power of 2
MaxBufferDuration time.Duration // cap on client buffer_timeout
JournalDir string // directory containing .lpj files (for consumer journal fallback)
Logger *slog.Logger
// ReplicaMode makes the broker honor frame.Seq instead of auto-incrementing.
// Used by the cloud replication server where sequence numbers originate
// from the boat's broker.
ReplicaMode bool
// InitialHead sets the starting head value. Use this when resuming a
// replica broker from persisted state so the ring starts at the right
// position. Zero means start at 1 (the default).
InitialHead uint64
// DeviceIdleTimeout removes devices that haven't been seen for this
// duration. Zero means use the default (5 minutes). Use -1 to disable.
DeviceIdleTimeout time.Duration
// VirtualDevices configures virtual NMEA 2000 devices that claim
// addresses on the bus. Nil or empty means disabled.
VirtualDevices []VirtualDeviceConfig
// ClaimHeartbeat is how often virtual devices re-broadcast their address
// claim (PGN 60928). Zero uses DefaultClaimHeartbeat (60s).
ClaimHeartbeat time.Duration
// ProductInfoHeartbeat is how often virtual devices re-broadcast product
// info (PGN 126996). Zero uses DefaultProductInfoHeartbeat (5m).
ProductInfoHeartbeat time.Duration
}
BrokerConfig holds broker configuration.
type BrokerHealth ¶ added in v0.3.1
type BrokerHealth struct {
Status string `json:"status"` // "ok" or "unhealthy"
FramesTotal uint64 `json:"frames_total"`
HeadSeq uint64 `json:"head_seq"`
LastFrameTime time.Time `json:"last_frame_time,omitempty"`
DeviceCount int `json:"device_count"`
RingEntries uint64 `json:"ring_entries"`
RingCapacity int `json:"ring_capacity"`
}
BrokerHealth reports the broker's health.
type BrokerStats ¶ added in v0.3.1
type BrokerStats struct {
FramesTotal uint64 // total frames processed
LastFrameTime time.Time // timestamp of most recent frame (zero if none)
RingEntries uint64 // current entries in ring buffer
RingCapacity int // ring buffer size
HeadSeq uint64 // next sequence number
ActiveSessions int // buffered client sessions
ActiveSubscribers int // ephemeral SSE subscribers
ActiveConsumers int // pull-based consumers
DeviceCount int // discovered NMEA 2000 devices
JournalDrops uint64 // frames dropped due to full journal channel
DevicesAdded uint64 // cumulative device discovery events
DevicesRemoved uint64 // cumulative device eviction events
ConsumerMaxLag uint64 // max consumer lag (head - cursor) across all consumers
}
BrokerStats is a point-in-time snapshot of broker metrics.
type BusSilenceMonitor ¶ added in v0.3.1
type BusSilenceMonitor struct {
// contains filtered or unexported fields
}
BusSilenceMonitor watches for periods of CAN bus inactivity and logs alerts when no frames have been received for a configurable duration. This helps detect CAN bus disconnection or power issues on the boat.
func NewBusSilenceMonitor ¶ added in v0.3.1
func NewBusSilenceMonitor(timeout time.Duration, broker *Broker, logger *slog.Logger, alerts *AlertManager) *BusSilenceMonitor
NewBusSilenceMonitor creates a monitor that alerts when no frames have been received for the given timeout duration. The timeout must be positive. The optional AlertManager fires webhook alerts on silence/resume transitions.
func (*BusSilenceMonitor) IsSilent ¶ added in v0.3.1
func (m *BusSilenceMonitor) IsSilent() bool
IsSilent reports whether the bus is currently in a silence state.
func (*BusSilenceMonitor) Run ¶ added in v0.3.1
func (m *BusSilenceMonitor) Run(ctx context.Context)
Run checks for bus silence periodically and logs warnings. It exits when ctx is cancelled.
type CANBus ¶ added in v0.4.0
type CANBus interface {
// ReadFrames reads CAN frames from the bus, reassembles fast-packets,
// and sends completed frames to rxFrames. Blocks until ctx is cancelled
// or an unrecoverable error occurs.
ReadFrames(ctx context.Context, rxFrames chan<- RxFrame) error
// WriteFrames reads TX requests from txFrames and transmits them on the
// bus. Blocks until ctx is cancelled or txFrames is closed.
WriteFrames(ctx context.Context, txFrames <-chan TxRequest) error
// Name returns the bus name (e.g. "can0", "loopback0").
Name() string
}
CANBus provides read and write access to a CAN bus. Implementations handle the transport (SocketCAN, loopback, etc.) while the broker remains transport-agnostic.
type ClaimState ¶ added in v0.4.0
type ClaimState int
ClaimState represents where a virtual device is in the address claim lifecycle.
const (
	// ClaimIdle is the initial state before any claim attempt.
	ClaimIdle ClaimState = iota
	// ClaimInProgress means a claim frame has been sent but the 250ms holdoff hasn't elapsed.
	ClaimInProgress
	// ClaimHeld means the address is successfully claimed and ready for use.
	ClaimHeld
	// ClaimCannotClaim means all 253 addresses (0-252) were exhausted.
	ClaimCannotClaim
)
func (ClaimState) String ¶ added in v0.4.0
func (s ClaimState) String() string
type ClientSession ¶
type ClientSession struct {
ID string
Cursor uint64 // last ACK'd sequence number (0 = never ACK'd)
BufferTimeout time.Duration // how long to keep buffering after disconnect
LastActivity time.Time
Filter *EventFilter // nil = receive all frames
}
ClientSession tracks a buffered client's metadata for persistence across HTTP reconnects. The actual frame reading is done by Consumer.
type ClientSlot ¶ added in v0.4.0
type ClientSlot struct {
ID string `json:"id"`
BufferTimeout time.Duration `json:"buffer_timeout"`
Filter *EventFilter `json:"filter,omitempty"`
}
ClientSlot defines a pre-configured client session that is created at startup. This allows test environments and Docker containers to have named sessions ready before any HTTP client connects.
func ParseClientSlot ¶ added in v0.4.0
func ParseClientSlot(cfg ClientSlotConfig) (ClientSlot, error)
ParseClientSlot converts a ClientSlotConfig into a ClientSlot, parsing durations and hex NAME values.
type ClientSlotConfig ¶ added in v0.4.0
type ClientSlotConfig struct {
ID string `json:"id"`
BufferTimeout string `json:"buffer_timeout"`
Filter *SlotFilterConfig `json:"filter,omitempty"`
}
ClientSlotConfig is the JSON/HOCON-friendly representation of a client slot before duration parsing.
type ComponentHealth ¶ added in v0.3.1
type ComponentHealth struct {
Status string `json:"status"`
Message string `json:"message,omitempty"`
}
ComponentHealth is a generic component status.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a pull-based reader that iterates frames at its own pace. It reads from a tiered log: journal files on disk (oldest), ring buffer in memory (recent), and live notification (current head, blocking wait).
func (*Consumer) Close ¶
Close stops the consumer and removes it from the broker. Safe to call multiple times.
type ConsumerConfig ¶
type ConsumerConfig struct {
Cursor uint64 // starting position (next seq to read)
Filter *EventFilter // nil = all frames
}
ConsumerConfig configures a new Consumer.
type DecodedDeviceValues ¶ added in v0.3.1
type DecodedDeviceValues struct {
Name string `json:"name"`
Bus string `json:"bus,omitempty"`
Source uint8 `json:"src"`
Manufacturer string `json:"manufacturer,omitempty"`
ModelID string `json:"model_id,omitempty"`
Values []DecodedPGNValue `json:"values"`
}
DecodedDeviceValues groups decoded PGN values by device.
type DecodedPGNValue ¶ added in v0.3.1
type DecodedPGNValue struct {
PGN uint32 `json:"pgn"`
Description string `json:"description"`
Ts string `json:"ts"`
Seq uint64 `json:"seq"`
Fields any `json:"fields"`
}
DecodedPGNValue is a single PGN's last-known value decoded into named fields.
type Device ¶
type Device struct {
Bus string `json:"bus,omitempty"`
Source uint8 `json:"src"`
NAME uint64 `json:"-"`
NAMEHex string `json:"name"`
Manufacturer string `json:"manufacturer"`
ManufacturerCode uint16 `json:"manufacturer_code"`
DeviceClass uint8 `json:"device_class"`
DeviceFunction uint8 `json:"device_function"`
DeviceInstance uint8 `json:"device_instance"`
UniqueNumber uint32 `json:"unique_number,omitempty"`
// PGN 126996 Product Information fields.
ModelID string `json:"model_id,omitempty"`
SoftwareVersion string `json:"software_version,omitempty"`
ModelVersion string `json:"model_version,omitempty"`
ModelSerial string `json:"model_serial,omitempty"`
ProductCode uint16 `json:"product_code,omitempty"`
// Per-source packet statistics.
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
PacketCount uint64 `json:"packet_count"`
ByteCount uint64 `json:"byte_count"`
}
Device represents an NMEA 2000 device discovered via ISO Address Claim (PGN 60928) and optionally enriched with Product Information (PGN 126996).
type DeviceNotFoundError ¶ added in v0.4.0
DeviceNotFoundError indicates that no device was found at the given bus and source address.
func (*DeviceNotFoundError) Error ¶ added in v0.4.0
func (e *DeviceNotFoundError) Error() string
type DeviceRegistry ¶
type DeviceRegistry struct {
// contains filtered or unexported fields
}
DeviceRegistry tracks NMEA 2000 devices discovered via PGN 60928. Thread-safe for concurrent reads (SSE streams) and writes (broker goroutine). Devices are keyed by (bus, source address) to support multiple CAN interfaces.
func NewDeviceRegistry ¶
func NewDeviceRegistry() *DeviceRegistry
NewDeviceRegistry creates an empty device registry.
func (*DeviceRegistry) ExpireIdle ¶ added in v0.4.0
func (r *DeviceRegistry) ExpireIdle(cutoff time.Time) []BusSource
ExpireIdle removes all devices whose LastSeen is before cutoff. Returns the (bus, source) pairs of evicted entries.
func (*DeviceRegistry) Get ¶
func (r *DeviceRegistry) Get(bus string, source uint8) *Device
Get returns a snapshot of the device at the given (bus, source), or nil.
func (*DeviceRegistry) HandleAddressClaim ¶
func (r *DeviceRegistry) HandleAddressClaim(bus string, source uint8, data []byte) (dev *Device, evictedSrc uint8, evicted bool)
HandleAddressClaim processes a PGN 60928 ISO Address Claim. Returns the device if this is a new or changed device, nil otherwise. If a different source address on the same bus previously held the same NAME, that old entry is evicted: its source address is returned in evictedSrc and evicted is true.
func (*DeviceRegistry) HandleProductInfo ¶
func (r *DeviceRegistry) HandleProductInfo(bus string, source uint8, data []byte) *Device
HandleProductInfo processes a PGN 126996 Product Information response. Returns the device if fields changed, nil if source is unknown or unchanged.
func (*DeviceRegistry) RecordPacket ¶
RecordPacket updates per-source packet statistics. Returns true if this is a previously unseen (bus, source) pair. Source 254 (Cannot Claim Address) and 255 (broadcast) are ignored since they are not real devices.
func (*DeviceRegistry) Snapshot ¶
func (r *DeviceRegistry) Snapshot() []Device
Snapshot returns a copy of all known devices.
func (*DeviceRegistry) SnapshotJSON ¶
func (r *DeviceRegistry) SnapshotJSON() json.RawMessage
SnapshotJSON returns the device list as pre-serialized JSON.
func (*DeviceRegistry) SourcesMissingProductInfo ¶ added in v0.4.0
func (r *DeviceRegistry) SourcesMissingProductInfo() []BusSource
SourcesMissingProductInfo returns (bus, source) pairs of devices that have a NAME (address claim received) but no product info (PGN 126996) yet.
func (*DeviceRegistry) SynthesizeFrames ¶
func (r *DeviceRegistry) SynthesizeFrames(ts time.Time) []RxFrame
SynthesizeFrames generates RxFrame slices for PGN 60928 (Address Claim) and PGN 126996 (Product Info) from all known devices. Used to seed a remote broker's device registry on live stream connect.
type DeviceValues ¶
type DeviceValues struct {
Name string `json:"name"`
Bus string `json:"bus,omitempty"`
Source uint8 `json:"src"`
Manufacturer string `json:"manufacturer,omitempty"`
ModelID string `json:"model_id,omitempty"`
Values []PGNValue `json:"values"`
}
DeviceValues groups PGN values by device in the JSON response.
type EventFilter ¶
type EventFilter struct {
PGNs []uint32
ExcludePGNs []uint32
Manufacturers []string
Instances []uint8
Names []uint64 // 64-bit CAN NAMEs (include)
ExcludeNames []uint64 // 64-bit CAN NAMEs (exclude)
Buses []string // SocketCAN interface names (e.g. "can0", "can1")
}
EventFilter specifies which CAN frames a session receives. Categories are AND'd (all set categories must match), values within a category are OR'd (any value in the list matches).
func ParseFilterParams ¶
func ParseFilterParams(r *http.Request) (*EventFilter, error)
ParseFilterParams reads optional filter query params from a request. Supported params: pgn (decimal), manufacturer (name or code), instance (decimal), name (hex CAN NAME), bus (SocketCAN interface name). Returns nil filter if no params are set.
func (*EventFilter) IsEmpty ¶
func (f *EventFilter) IsEmpty() bool
IsEmpty returns true if no filter criteria are set.
type EventLog ¶
type EventLog struct {
// contains filtered or unexported fields
}
EventLog is a fixed-size ring buffer of replication events.
func (*EventLog) Recent ¶
func (l *EventLog) Recent(n int) []ReplicationEvent
Recent returns up to n events, newest first.
type FastPacketAssembler ¶
type FastPacketAssembler struct {
// contains filtered or unexported fields
}
FastPacketAssembler reassembles multi-frame fast-packet PGNs.
Fast-packet protocol:
- Frame 0: byte[0] = seq_counter(3 bits) | frame_number(5 bits), byte[1] = total_bytes, bytes[2:8] = first 6 data bytes
- Frame N: byte[0] = seq_counter(3 bits) | frame_number(5 bits), bytes[1:8] = next 7 data bytes
func NewFastPacketAssembler ¶
func NewFastPacketAssembler(timeout time.Duration) *FastPacketAssembler
NewFastPacketAssembler creates a new assembler with the given reassembly timeout.
func (*FastPacketAssembler) Process ¶
Process handles a CAN frame that is part of a fast-packet transfer. Returns the complete reassembled payload when all frames are received, nil otherwise.
func (*FastPacketAssembler) PurgeStale ¶
func (a *FastPacketAssembler) PurgeStale(now time.Time)
PurgeStale removes any in-progress assemblies older than the timeout.
type Frame ¶
type Frame struct {
Seq uint64
Timestamp time.Time
Header CANHeader
Bus string // SocketCAN interface name (empty for journal-replayed frames)
Data []byte // raw CAN payload
// contains filtered or unexported fields
}
Frame is a single CAN frame returned by Consumer.Next.
type HealthConfig ¶ added in v0.3.1
type HealthConfig struct {
Broker *Broker
ReplStatus func() *ReplicationStatus // nil if replication not configured
// BusSilenceThreshold is the duration after which no frames indicates
// a CAN bus problem. Zero disables bus silence detection.
BusSilenceThreshold time.Duration
}
HealthConfig configures the health check endpoint.
type HealthStatus ¶ added in v0.3.1
type HealthStatus struct {
Status string `json:"status"` // "ok", "degraded", or "unhealthy"
Broker BrokerHealth `json:"broker"`
Replication *ReplicationHealth `json:"replication,omitempty"`
Components map[string]ComponentHealth `json:"components,omitempty"`
}
HealthStatus is the structured response from the /healthz endpoint.
type HoleTracker ¶
type HoleTracker struct {
// contains filtered or unexported fields
}
HoleTracker tracks gaps (holes) in a sequence number space. Holes are non-overlapping, sorted by Start, and represent ranges of missing data.
Typical case: 0-3 holes. All operations are linear on the slice, which is perfectly fine for the expected cardinality.
func NewHoleTracker ¶
func NewHoleTracker() *HoleTracker
NewHoleTracker creates an empty hole tracker.
func (*HoleTracker) Add ¶
func (h *HoleTracker) Add(start, end uint64)
Add inserts a new hole [start, end). Merges with any overlapping or adjacent holes. No-op if start >= end.
func (*HoleTracker) ContinuousThrough ¶
func (h *HoleTracker) ContinuousThrough(cursor uint64) uint64
ContinuousThrough returns the highest sequence number with no holes below it. Given a base cursor and the hole set, this is the seq just before the first hole (or the cursor itself if no holes exist before it).
Examples:
- cursor=100, holes=[(200,300)] -> returns 199 (continuous through 199)
- cursor=100, holes=[(100,200)] -> returns 99 (hole starts at cursor)
- cursor=100, no holes -> returns max uint64 (no bound from holes)
func (*HoleTracker) Fill ¶
func (h *HoleTracker) Fill(start, end uint64) bool
Fill marks [start, end) as received, removing that range from any holes. Returns true if any holes were actually affected.
func (*HoleTracker) Holes ¶
func (h *HoleTracker) Holes() []SeqRange
Holes returns a copy of current holes, sorted by Start.
func (*HoleTracker) TotalMissing ¶
func (h *HoleTracker) TotalMissing() uint64
TotalMissing returns the total number of missing sequence numbers across all holes.
type InstanceManager ¶
type InstanceManager struct {
// contains filtered or unexported fields
}
InstanceManager manages per-instance state on the cloud side.
func NewInstanceManager ¶
func NewInstanceManager(dataDir string, logger *slog.Logger) (*InstanceManager, error)
NewInstanceManager creates a new instance manager, loading any persisted state.
func (*InstanceManager) Get ¶
func (im *InstanceManager) Get(id string) *InstanceState
Get returns the instance state, or nil if not found.
func (*InstanceManager) GetOrCreate ¶
func (im *InstanceManager) GetOrCreate(id string) *InstanceState
GetOrCreate returns the instance state, creating it if necessary.
func (*InstanceManager) List ¶
func (im *InstanceManager) List() []InstanceSummary
List returns a snapshot of all instance IDs and their basic state.
func (*InstanceManager) SetDeviceIdleTimeout ¶ added in v0.4.0
func (im *InstanceManager) SetDeviceIdleTimeout(d time.Duration)
SetDeviceIdleTimeout configures the device idle timeout for all existing and future instance brokers. Must be called before instances connect.
func (*InstanceManager) SetInstancePaused ¶
func (im *InstanceManager) SetInstancePaused(instanceID string, paused bool)
SetInstancePaused pauses or unpauses journal writing for a specific instance. Used by the JournalKeeper overflow policy to stop/resume writes.
func (*InstanceManager) SetJournalRotation ¶ added in v0.3.1
func (im *InstanceManager) SetJournalRotation(duration time.Duration, size int64)
SetJournalRotation configures rotation for live journal writers. Must be called before any connections are accepted. Retroactively updates all existing instances loaded at startup.
func (*InstanceManager) SetOnRotate ¶
func (im *InstanceManager) SetOnRotate(fn func(instanceID string, rf keeper.RotatedFile))
SetOnRotate sets a callback invoked when any instance's journal or backfill file is rotated. Used by the cloud binary to feed the JournalKeeper. Must be called before any connections are accepted. Retroactively updates all existing instances loaded at startup.
func (*InstanceManager) SetRingSize ¶ added in v0.4.0
func (im *InstanceManager) SetRingSize(size int)
SetRingSize configures the ring buffer size for all existing and future instance brokers. Must be called before instances connect.
func (*InstanceManager) Shutdown ¶
func (im *InstanceManager) Shutdown()
Shutdown stops all instance brokers and persists state.
type InstanceState ¶
type InstanceState struct {
ID string `json:"id"`
Cursor uint64 `json:"cursor"` // continuous data through this seq
BoatHeadSeq uint64 `json:"boat_head_seq"` // last reported by boat
BoatJournalBytes uint64 `json:"boat_journal_bytes"`
LastSeen time.Time `json:"last_seen"`
Connected bool `json:"-"`
HoleTracker *HoleTracker `json:"-"`
// Persisted hole state
PersistedHoles []SeqRange `json:"holes,omitempty"`
// contains filtered or unexported fields
}
InstanceState tracks the replication state for a single boat instance on the cloud side. Thread-safe.
func (*InstanceState) RecentEvents ¶
func (s *InstanceState) RecentEvents(n int) []ReplicationEvent
RecentEvents returns up to n recent replication events, newest first.
func (*InstanceState) RecordEvent ¶
func (s *InstanceState) RecordEvent(typ ReplicationEventType, detail map[string]any)
RecordEvent appends a diagnostic event to this instance's event log.
func (*InstanceState) Status ¶
func (s *InstanceState) Status() InstanceStatus
Status returns a thread-safe snapshot of this instance's replication state.
type InstanceStatus ¶
type InstanceStatus struct {
ID string `json:"id"`
Connected bool `json:"connected"`
Cursor uint64 `json:"cursor"`
BoatHeadSeq uint64 `json:"boat_head_seq"`
BoatJournalBytes uint64 `json:"boat_journal_bytes"`
Holes []SeqRange `json:"holes,omitzero"`
LagSeqs uint64 `json:"lag_seqs"`
LastSeen time.Time `json:"last_seen"`
}
InstanceStatus is a detailed snapshot of an instance for status reporting.
type InstanceSummary ¶
type InstanceSummary struct {
ID string `json:"id"`
Connected bool `json:"connected"`
Cursor uint64 `json:"cursor"`
BoatHeadSeq uint64 `json:"boat_head_seq"`
Holes int `json:"holes"`
LagSeqs uint64 `json:"lag_seqs"`
LastSeen time.Time `json:"last_seen"`
}
InstanceSummary is a snapshot of an instance for listing.
type JournalConfig ¶
type JournalConfig struct {
Dir string
Prefix string // default: "nmea2k"
BlockSize int // default: 262144, power of 2, min 4096
Compression journal.CompressionType // default: CompressionNone
RotateDuration time.Duration // 0 = no limit
RotateSize int64 // 0 = no limit
RotateCount int64 // 0 = no limit
OnRotate func(keeper.RotatedFile) // called after a journal file is closed by rotation
Logger *slog.Logger
}
JournalConfig configures the journal writer.
type JournalWriter ¶
type JournalWriter struct {
// contains filtered or unexported fields
}
JournalWriter writes CAN frames to block-based journal files.
func NewJournalWriter ¶
func NewJournalWriter(cfg JournalConfig, devices *DeviceRegistry, ch <-chan RxFrame) (*JournalWriter, error)
NewJournalWriter creates a writer. Call Run to start.
func (*JournalWriter) Run ¶
func (w *JournalWriter) Run(ctx context.Context) error
Run is the main loop. Blocks until ctx is cancelled or the channel is closed.
func (*JournalWriter) SetPaused ¶
func (w *JournalWriter) SetPaused(p bool)
SetPaused sets whether the writer should discard incoming frames. Used by the overflow policy to stop writes when disk is full.
func (*JournalWriter) Stats ¶ added in v0.4.0
func (w *JournalWriter) Stats() JournalWriterStats
Stats returns a point-in-time snapshot of journal write metrics.
type JournalWriterStats ¶ added in v0.4.0
type JournalWriterStats struct {
BlocksWritten uint64 // total blocks flushed
BytesWritten uint64 // total bytes written to disk
LastBlockWriteDuration time.Duration // duration of last block write
}
JournalWriterStats holds point-in-time journal write metrics.
type LoopbackBus ¶ added in v0.4.0
type LoopbackBus struct {
// contains filtered or unexported fields
}
LoopbackBus implements CANBus as an in-memory loopback: transmitted frames are echoed back as received frames (matching SocketCAN's kernel echo behavior). Useful for testing and development on platforms without SocketCAN (e.g. macOS).
func NewLoopbackBus ¶ added in v0.4.0
func NewLoopbackBus(name string, bufSize int, logger *slog.Logger) *LoopbackBus
NewLoopbackBus creates a CANBus that echoes transmitted frames back as received frames. The buffer size controls how many frames can be queued via Inject before further frames are dropped.
func (*LoopbackBus) Inject ¶ added in v0.4.0
func (b *LoopbackBus) Inject(frame RxFrame) bool
Inject adds a frame to the bus as if it were received from CAN hardware. The Bus field is set to the loopback bus name. Non-blocking: drops the frame if the internal buffer is full.
func (*LoopbackBus) Name ¶ added in v0.4.0
func (b *LoopbackBus) Name() string
func (*LoopbackBus) ReadFrames ¶ added in v0.4.0
func (b *LoopbackBus) ReadFrames(ctx context.Context, rxFrames chan<- RxFrame) error
func (*LoopbackBus) WriteFrames ¶ added in v0.4.0
func (b *LoopbackBus) WriteFrames(ctx context.Context, txFrames <-chan TxRequest) error
type MQTTBridge ¶ added in v0.4.0
type MQTTBridge struct {
// contains filtered or unexported fields
}
MQTTBridge subscribes to the broker's frame stream and publishes each frame to an MQTT broker. Frames are published to per-PGN topics ({prefix}/frames/{pgn}) with the pre-serialized JSON payload.
func NewMQTTBridge ¶ added in v0.4.0
func NewMQTTBridge(cfg MQTTBridgeConfig, broker *Broker) *MQTTBridge
NewMQTTBridge creates a new MQTT bridge. Call Run to start publishing.
type MQTTBridgeConfig ¶ added in v0.4.0
type MQTTBridgeConfig struct {
// BrokerURL is the MQTT broker address (e.g. "tcp://localhost:1883").
BrokerURL string
// TopicPrefix is prepended to all published topics (default "lplex").
// Frames are published to {prefix}/frames/{pgn} or {prefix}/frames/all.
TopicPrefix string
// ClientID identifies this client to the MQTT broker (default "lplex-server").
ClientID string
// QoS is the MQTT quality of service level (0, 1, or 2; default 0).
QoS byte
// Username and Password for MQTT broker authentication (optional).
Username string
Password string
// Filter restricts which CAN frames are published.
Filter *EventFilter
// Logger for diagnostic output.
Logger *slog.Logger
}
MQTTBridgeConfig configures the MQTT publisher bridge.
type PGNValue ¶
type PGNValue struct {
PGN uint32 `json:"pgn"`
Ts string `json:"ts"`
Data string `json:"data"`
Seq uint64 `json:"seq"`
}
PGNValue is a single PGN's last-known value in the JSON response.
type ReplicationClient ¶
type ReplicationClient struct {
// contains filtered or unexported fields
}
ReplicationClient streams frames from the local broker to a cloud replication server over gRPC. It runs two independent streams: one for live frames (from the broker's head forward) and one for backfilling historical gaps (from journal files). On disconnect, it reconnects with exponential backoff and resumes both streams.
func NewReplicationClient ¶
func NewReplicationClient(cfg ReplicationClientConfig, broker *Broker) *ReplicationClient
NewReplicationClient creates a new replication client. Call Run to start.
func (*ReplicationClient) Run ¶
func (c *ReplicationClient) Run(ctx context.Context) error
Run is the main loop. Connects to the cloud, performs handshake, and starts live + backfill streams. Reconnects on failure with exponential backoff. Blocks until ctx is cancelled.
func (*ReplicationClient) SetAlerts ¶ added in v0.4.0
func (c *ReplicationClient) SetAlerts(am *AlertManager)
SetAlerts sets the alert manager for replication alerts. Must be called before Run.
func (*ReplicationClient) Status ¶
func (c *ReplicationClient) Status() ReplicationStatus
Status returns the current replication state for status reporting.
type ReplicationClientConfig ¶
type ReplicationClientConfig struct {
Target string // cloud gRPC address (host:port)
InstanceID string
CertFile string // client certificate
KeyFile string // client private key
CAFile string // CA certificate for verifying server
Logger *slog.Logger
// Resource protection tuning. Zero values use defaults from
// replication_limits.go.
MaxLiveLag uint64 // max frames live can lag before reconnect (default: DefaultMaxLiveLag)
LagCheckInterval int // check lag every N frames sent (default: DefaultLagCheckInterval)
MinLagReconnectInterval time.Duration // min wait between lag-triggered reconnects (default: DefaultMinLagReconnectInterval)
}
ReplicationClientConfig configures the boat-side replication client.
type ReplicationEvent ¶
type ReplicationEvent struct {
Time time.Time `json:"time"`
Type ReplicationEventType `json:"type"`
Detail map[string]any `json:"detail,omitempty"`
}
ReplicationEvent is a single diagnostic event from the replication pipeline.
type ReplicationEventType ¶
type ReplicationEventType string
ReplicationEventType identifies the kind of replication event.
const (
EventLiveStart ReplicationEventType = "live_start"
EventLiveStop ReplicationEventType = "live_stop"
EventBackfillStart ReplicationEventType = "backfill_start"
EventBackfillStop ReplicationEventType = "backfill_stop"
EventBlockReceived ReplicationEventType = "block_received"
EventCheckpoint ReplicationEventType = "checkpoint"
)
type ReplicationHealth ¶ added in v0.3.1
type ReplicationHealth struct {
Status string `json:"status"` // "ok", "degraded", or "disconnected"
Connected bool `json:"connected"`
LiveLag uint64 `json:"live_lag"`
BackfillRemaining uint64 `json:"backfill_remaining_seqs"`
LastAck time.Time `json:"last_ack,omitempty"`
}
ReplicationHealth reports the replication client's health.
type ReplicationServer ¶
type ReplicationServer struct {
pb.UnimplementedReplicationServer
// Resource protection tuning. Zero values use defaults from
// replication_limits.go.
MaxFrameRate float64 // max frames/sec per live stream (default: DefaultMaxFrameRate)
RateBurst int // burst allowance for transient spikes (default: DefaultRateBurst)
MaxLiveLag uint64 // max frames live can lag before closing stream (default: DefaultMaxLiveLag)
// contains filtered or unexported fields
}
ReplicationServer implements the gRPC Replication service.
func NewReplicationServer ¶
func NewReplicationServer(im *InstanceManager, logger *slog.Logger) *ReplicationServer
NewReplicationServer creates a new replication gRPC server.
func (*ReplicationServer) Backfill ¶
func (s *ReplicationServer) Backfill(stream pb.Replication_BackfillServer) error
Backfill handles bulk block transfer for filling gaps.
func (*ReplicationServer) GetInstanceBroker ¶
func (s *ReplicationServer) GetInstanceBroker(instanceID string) *Broker
GetInstanceBroker returns the broker for an instance (starting it if needed). Used by HTTP handlers to serve SSE from a cloud instance.
func (*ReplicationServer) GetInstanceState ¶
func (s *ReplicationServer) GetInstanceState(instanceID string) *InstanceState
GetInstanceState returns the instance state for status reporting.
func (*ReplicationServer) Handshake ¶
func (s *ReplicationServer) Handshake(ctx context.Context, req *pb.HandshakeRequest) (*pb.HandshakeResponse, error)
Handshake exchanges sync state between boat and cloud.
func (*ReplicationServer) Live ¶
func (s *ReplicationServer) Live(stream pb.Replication_LiveServer) error
Live handles the realtime frame stream from a boat.
type ReplicationStatus ¶
type ReplicationStatus struct {
Connected bool `json:"connected"`
InstanceID string `json:"instance_id"`
LocalHeadSeq uint64 `json:"local_head_seq"`
CloudCursor uint64 `json:"cloud_cursor"`
Holes []SeqRange `json:"holes,omitzero"`
LiveLag uint64 `json:"live_lag"`
BackfillRemainingSeqs uint64 `json:"backfill_remaining_seqs"`
LastAck time.Time `json:"last_ack,omitempty"`
LiveFramesSent uint64 `json:"live_frames_sent"`
BackfillBlocksSent uint64 `json:"backfill_blocks_sent"`
BackfillBytesSent uint64 `json:"backfill_bytes_sent"`
Reconnects uint64 `json:"reconnects"`
}
ReplicationStatus is the boat-side view of replication state.
type RxFrame ¶
type RxFrame struct {
Timestamp time.Time
Header CANHeader
Data []byte
Bus string // SocketCAN interface name (e.g. "can0"); empty for single-bus or unknown
Seq uint64 // assigned by broker in handleFrame; zero when fed by external code
}
RxFrame is a reassembled CAN frame ready for the broker.
type SequenceGapError ¶ added in v0.4.0
SequenceGapError indicates that journal data has a gap in sequence numbers. The consumer expected ExpectedSeq but the next available sequence was ActualSeq.
func (*SequenceGapError) Error ¶ added in v0.4.0
func (e *SequenceGapError) Error() string
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server handles HTTP API requests for lplex.
func NewServer ¶
func NewServer(broker *Broker, logger *slog.Logger, policy sendpolicy.SendPolicy) *Server
NewServer creates a new HTTP server wired to the given broker. Use SetAPIKey to enable authentication.
func (*Server) HandleEphemeralSSE ¶
func (s *Server) HandleEphemeralSSE(w http.ResponseWriter, r *http.Request)
HandleEphemeralSSE handles GET /events. Ephemeral SSE stream, no session, no ACK, no replay. Optional query params for filtering: pgn, manufacturer, instance, name (hex).
func (*Server) HandleFunc ¶
HandleFunc registers an additional HTTP handler on the server's mux.
func (*Server) HandleWebSocket ¶ added in v0.4.0
func (s *Server) HandleWebSocket(w http.ResponseWriter, r *http.Request)
HandleWebSocket upgrades an HTTP connection to a WebSocket and provides bidirectional communication: CAN frames flow to the client (filtered like /events), and the client can send CAN frames (like /send) on the same connection.
Query params for filtering are the same as /events: pgn, exclude_pgn, manufacturer, instance, name, exclude_name.
func (*Server) ServeHTTP ¶
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements http.Handler.
func (*Server) SetAPIKey ¶ added in v0.4.0
SetAPIKey enables API key authentication. When set, all HTTP requests must include the key via either the Authorization header (Bearer token) or the X-API-Key header. Health/liveness endpoints are exempt.
func (*Server) SetReadOnly ¶ added in v0.4.0
SetReadOnly completely disables the /send and /query endpoints, regardless of the send policy. This is a defense-in-depth kill switch.
func (*Server) SetSendPolicy ¶ added in v0.4.0
func (s *Server) SetSendPolicy(policy sendpolicy.SendPolicy)
SetSendPolicy updates the send policy at runtime (for config reload).
func (*Server) SetSendRateLimit ¶ added in v0.4.0
SetSendRateLimit enables rate limiting on the /send and /query endpoints. rps is the sustained requests per second; burst is the maximum burst size.
type SessionNotFoundError ¶ added in v0.4.0
type SessionNotFoundError struct {
SessionID string
}
SessionNotFoundError indicates that a client session ID was not found.
func (*SessionNotFoundError) Error ¶ added in v0.4.0
func (e *SessionNotFoundError) Error() string
type SlotFilterConfig ¶ added in v0.4.0
type SlotFilterConfig struct {
PGN []uint32 `json:"pgn,omitempty"`
ExcludePGN []uint32 `json:"exclude_pgn,omitempty"`
Manufacturer []string `json:"manufacturer,omitempty"`
Instance []uint8 `json:"instance,omitempty"`
Name []string `json:"name,omitempty"`
ExcludeName []string `json:"exclude_name,omitempty"`
Bus []string `json:"bus,omitempty"`
}
SlotFilterConfig is the JSON/HOCON-friendly representation of an EventFilter.
type SocketCANBus ¶ added in v0.4.0
type SocketCANBus struct {
// contains filtered or unexported fields
}
SocketCANBus implements CANBus using Linux SocketCAN.
func NewSocketCANBus ¶ added in v0.4.0
func NewSocketCANBus(iface string, logger *slog.Logger) *SocketCANBus
NewSocketCANBus creates a CANBus backed by a Linux SocketCAN interface.
func (*SocketCANBus) Name ¶ added in v0.4.0
func (b *SocketCANBus) Name() string
func (*SocketCANBus) ReadFrames ¶ added in v0.4.0
func (b *SocketCANBus) ReadFrames(ctx context.Context, rxFrames chan<- RxFrame) error
func (*SocketCANBus) WriteFrames ¶ added in v0.4.0
func (b *SocketCANBus) WriteFrames(ctx context.Context, txFrames <-chan TxRequest) error
type SyncState ¶
type SyncState struct {
Cursor uint64 // continuous data through this seq
Holes []SeqRange // sorted gaps
BoatHeadSeq uint64 // last reported by boat
BoatJournalBytes uint64
}
SyncState captures the replication state for an instance, used for persistence and handshake responses.
type TracingConfig ¶ added in v0.4.0
type TracingConfig struct {
// Enabled enables tracing. When false, a no-op tracer is used.
Enabled bool
// Endpoint is the OTLP gRPC collector endpoint (e.g. "localhost:4317").
Endpoint string
// ServiceName identifies this service in traces (e.g. "lplex-server", "lplex-cloud").
ServiceName string
// ServiceVersion is the build version.
ServiceVersion string
// SampleRatio controls probabilistic sampling (0.0 to 1.0).
// 1.0 = sample everything, 0.01 = sample 1%.
SampleRatio float64
}
TracingConfig configures OpenTelemetry distributed tracing.
type TxRequest ¶
type TxRequest struct {
Header CANHeader
Data []byte
Bus string // target SocketCAN interface; empty = default (first) bus
}
TxRequest is a frame to write to the CAN bus.
type ValueStore ¶
type ValueStore struct {
// contains filtered or unexported fields
}
ValueStore tracks the last-seen frame data for each (bus, source, PGN) tuple. The broker goroutine writes via Record; HTTP handlers read via Snapshot.
func (*ValueStore) DecodedSnapshot ¶ added in v0.3.1
func (vs *ValueStore) DecodedSnapshot(devices *DeviceRegistry, filter *EventFilter) []DecodedDeviceValues
DecodedSnapshot returns the current values grouped by device with PGN data decoded into named fields using the pgn.Registry. PGNs not in the registry or that fail to decode are omitted.
func (*ValueStore) DecodedSnapshotJSON ¶ added in v0.3.1
func (vs *ValueStore) DecodedSnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage
DecodedSnapshotJSON returns the decoded snapshot as pre-serialized JSON.
func (*ValueStore) Record ¶
func (vs *ValueStore) Record(bus string, source uint8, pgn uint32, ts time.Time, data []byte, seq uint64)
Record updates the stored value for the given (bus, source, PGN). Called by the broker goroutine on every frame.
func (*ValueStore) RemoveSource ¶ added in v0.4.0
func (vs *ValueStore) RemoveSource(bus string, source uint8)
RemoveSource deletes all stored values for the given (bus, source) pair.
func (*ValueStore) Snapshot ¶
func (vs *ValueStore) Snapshot(devices *DeviceRegistry, filter *EventFilter) []DeviceValues
Snapshot returns the current values grouped by device, resolved against the device registry for NAME and manufacturer info. An optional filter restricts results by PGN, bus, and/or device criteria (manufacturer, name, instance).
func (*ValueStore) SnapshotJSON ¶
func (vs *ValueStore) SnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage
SnapshotJSON returns the snapshot as pre-serialized JSON.
type VirtualDevice ¶ added in v0.4.0
type VirtualDevice struct {
// contains filtered or unexported fields
}
VirtualDevice is a single virtual NMEA 2000 device managed by the VirtualDeviceManager.
type VirtualDeviceConfig ¶ added in v0.4.0
type VirtualDeviceConfig struct {
// NAME is the 64-bit ISO NAME. Lower values win address conflicts.
NAME uint64
ProductInfo VirtualProductInfo
}
VirtualDeviceConfig configures a single virtual NMEA 2000 device.
type VirtualDeviceManager ¶ added in v0.4.0
type VirtualDeviceManager struct {
// contains filtered or unexported fields
}
VirtualDeviceManager manages a set of virtual NMEA 2000 devices that claim addresses on the CAN bus, making lplex-server a legitimate bus participant.
Thread safety: the manager is called from the broker's single goroutine for frame handling (HandleBusClaim, HandleISORequest) and from HTTP handlers for ClaimedSource/Ready. The mutex protects the device state for the latter case.
func NewVirtualDeviceManager ¶ added in v0.4.0
func NewVirtualDeviceManager(txFunc func(TxRequest), registry *DeviceRegistry, logger *slog.Logger, claimInterval, productInfoInterval time.Duration) *VirtualDeviceManager
NewVirtualDeviceManager creates a new manager. txFunc is called to send frames to the CAN bus. registry is consulted for address selection. claimInterval and productInfoInterval control heartbeat frequency; zero values use DefaultClaimHeartbeat and DefaultProductInfoHeartbeat.
func (*VirtualDeviceManager) Add ¶ added in v0.4.0
func (m *VirtualDeviceManager) Add(cfg VirtualDeviceConfig)
Add registers a virtual device configuration. Call before Start.
func (*VirtualDeviceManager) ClaimedSource ¶ added in v0.4.0
func (m *VirtualDeviceManager) ClaimedSource() (uint8, bool)
ClaimedSource returns the source address of the first virtual device that has successfully claimed an address. Returns (0, false) if none are claimed.
func (*VirtualDeviceManager) Devices ¶ added in v0.4.0
func (m *VirtualDeviceManager) Devices() []VirtualDeviceStatus
Devices returns a snapshot of virtual device states for diagnostics.
func (*VirtualDeviceManager) HandleBusClaim ¶ added in v0.4.0
func (m *VirtualDeviceManager) HandleBusClaim(source uint8, name uint64) bool
HandleBusClaim is called by the broker when a PGN 60928 address claim is received from the bus. It checks if any of our virtual devices conflict and resolves per NMEA 2000: lower NAME wins.
Returns true if the frame was handled (either as an echo or conflict) and should NOT be registered in the device registry.
func (*VirtualDeviceManager) HandleISORequest ¶ added in v0.4.0
func (m *VirtualDeviceManager) HandleISORequest(dst uint8, requestedPGN uint32, requesterSrc uint8)
HandleISORequest is called when PGN 59904 is received. If the request targets one of our virtual devices, we respond with the appropriate data.
func (*VirtualDeviceManager) Heartbeat ¶ added in v0.4.0
func (m *VirtualDeviceManager) Heartbeat()
Heartbeat re-broadcasts address claims and product info for all held virtual devices at their respective intervals. Called from the broker's ticker goroutine (single-threaded, no concurrent calls).
func (*VirtualDeviceManager) ProductInfoPayload ¶ added in v0.4.0
func (m *VirtualDeviceManager) ProductInfoPayload(source uint8) []byte
ProductInfoPayload returns the 134-byte PGN 126996 payload for the virtual device at the given source address, or nil if no device uses that address.
func (*VirtualDeviceManager) Ready ¶ added in v0.4.0
func (m *VirtualDeviceManager) Ready() bool
Ready returns true if at least one virtual device has claimed an address and the holdoff period has elapsed.
func (*VirtualDeviceManager) StartAfterDiscovery ¶ added in v0.4.0
func (m *VirtualDeviceManager) StartAfterDiscovery(ctx context.Context, delay time.Duration)
StartAfterDiscovery waits for the device table to populate (the broker broadcasts an ISO Request for PGN 60928 on startup), then claims addresses for all configured virtual devices.
Source Files
¶
- alerting.go
- block_writer.go
- broker.go
- bus_silence.go
- can.go
- can_bus.go
- canid.go
- compress.go
- consumer.go
- decode_sse.go
- devices.go
- doc.go
- errors.go
- fastpacket.go
- health.go
- history.go
- journal_writer.go
- metrics.go
- mqtt.go
- replication.go
- replication_client.go
- replication_events.go
- replication_limits.go
- replication_server.go
- sdnotify.go
- server.go
- slots.go
- tracing.go
- values.go
- virtual_device.go
- websocket.go
Directories
¶
| Path | Synopsis |
|---|---|
| canbus | Package canbus provides shared NMEA 2000 / CAN bus types and functions importable by both the server internals and client tools. |
| cmd | |
| cmd/journalbench (command) | journalbench reads frames from a .lpj journal file and re-encodes them with every combination of block size and compression method to find the optimal parameters for CAN bus journal data. |
| cmd/lplex (command) | |
| cmd/lplex-cloud (command) | |
| cmd/lplex-server (command) | |
| cmd/pgngen (command) | pgngen generates Go structs, Protocol Buffer definitions, and JSON Schema from NMEA 2000 PGN definition files written in the lplex PGN DSL. |
| filter | Package filter provides a BPF-inspired expression language for filtering decoded NMEA 2000 frames by field values. |
| journal | Package journal provides the block-based binary journal format (.lpj files) for recording and replaying NMEA 2000 CAN frames. |
| lplexc | Package lplexc provides a Go client for lplex, a CAN bus HTTP bridge for NMEA 2000. |
| pgn | Package pgn provides generated Go types for decoding and encoding NMEA 2000 PGN messages. |
| pgngen | Package pgngen provides a DSL parser and code generators for NMEA 2000 PGN definitions. |
| proto | |