lplex

v0.4.1
Published: Mar 25, 2026 License: MIT Imports: 55 Imported by: 0

README

lplex

CAN bus HTTP bridge for NMEA 2000. Reads raw CAN frames from a SocketCAN interface, reassembles fast-packets, tracks device discovery, and streams frames to clients over SSE with session management, filtering, and replay. Supports cloud replication for remote access to boat data over intermittent connections.

  • Real-time SSE streaming with ephemeral and buffered session modes, per-client filtering by PGN, manufacturer, instance, or device name
  • Fast-packet reassembly for multi-frame NMEA 2000 PGNs, with automatic device discovery via ISO requests
  • PGN decoding of known NMEA 2000 message types into human-readable field values, with a DSL-based code generator supporting variant dispatch for proprietary PGNs and per-PGN metadata (fast-packet, transmission interval, on-demand)
  • Journal recording to block-based .lpj files with zstd compression, CRC32C checksums, and O(log N) time seeking
  • Retention and archival with max-age/min-keep/max-size knobs, soft/hard thresholds, configurable overflow policy, and pluggable archive scripts
  • Cloud replication over gRPC with mTLS, live + backfill streams, hole tracking, and lazy per-instance Broker on the cloud side
  • Pull-based Consumer with tiered replay (journal files → ring buffer → live), so clients can catch up from any point in history
  • Embeddable core as a Go package: mount the HTTP handler on any ServeMux
  • Go client library (lplexc) with mDNS discovery, subscriptions, device queries, and transmit
  • TypeScript client library (@sixfathoms/lplex) for browsers and Node.js, with CloudClient for lplex-cloud
  • CAN transmit via POST /send with automatic fast-packet fragmentation

Installation

Client (lplex)
# Homebrew (macOS / Linux)
brew install sixfathoms/tap/lplex

# From source
go install github.com/sixfathoms/lplex/cmd/lplex@latest
Server (Linux only, requires SocketCAN)
# Debian/Ubuntu (.deb includes both lplex-server and lplex)
sudo dpkg -i lplex_*.deb
sudo systemctl start lplex-server

# Docker
docker run --network host --device /dev/can0 ghcr.io/sixfathoms/lplex:latest

# From source
go install github.com/sixfathoms/lplex/cmd/lplex-server@latest
Cloud Server
# From source
go install github.com/sixfathoms/lplex/cmd/lplex-cloud@latest

Download .deb packages from GitHub Releases.

Go Client Library
go get github.com/sixfathoms/lplex/lplexc@latest
TypeScript Client Library
npm install @sixfathoms/lplex

Zero runtime dependencies. Works in browsers and Node 18+. Ships ESM, CJS, and TypeScript declarations. See @sixfathoms/lplex on npm.

Embedding lplex

The core package is importable, so you can embed lplex into your own service:

go get github.com/sixfathoms/lplex@latest
import (
    "log/slog"
    "net/http"
    "time"

    "github.com/sixfathoms/lplex"
)

func main() {
    logger := slog.Default()

    // Create the broker (owns ring buffer, device registry, fan-out).
    broker := lplex.NewBroker(lplex.BrokerConfig{
        RingSize:          65536,
        MaxBufferDuration: 5 * time.Minute,
        Logger:            logger,
    })
    go broker.Run()

    // Mount the HTTP handler on a sub-path.
    srv := lplex.NewServer(broker, logger)
    mux := http.NewServeMux()
    mux.Handle("/nmea/", http.StripPrefix("/nmea", srv))

    // Feed frames from your own CAN source.
    go func() {
        for frame := range myFrameSource() {
            broker.RxFrames() <- lplex.RxFrame{
                Timestamp: frame.Time,
                Header:    lplex.CANHeader{Priority: 2, PGN: frame.PGN, Source: frame.Src, Destination: 0xFF},
                Data:      frame.Data,
            }
        }
    }()

    // Optional: enable journal recording.
    journalCh := make(chan lplex.RxFrame, 16384)
    broker.SetJournal(journalCh)
    // ... create JournalWriter and call Run in a goroutine.

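    // ListenAndServe blocks until the server fails; log the reason instead of dropping it.
    if err := http.ListenAndServe(":8080", mux); err != nil {
        logger.Error("http server stopped", "err", err)
    }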
}

Lifecycle: the broker goroutine exits when you call broker.CloseRx(). Close the journal channel after that, then wait for the journal writer to finish.

Quick Start

Server
# Start the server (requires SocketCAN interface)
lplex-server -interface can0 -port 8089

# With a config file
lplex-server -config /etc/lplex/lplex-server.conf

# With journal recording enabled
lplex-server -interface can0 -port 8089 -journal-dir /var/log/lplex

# With cloud replication
lplex-server -interface can0 -replication-target cloud.example.com:9443 \
  -replication-instance-id boat-001 \
  -replication-tls-cert /etc/lplex/boat.crt \
  -replication-tls-key /etc/lplex/boat.key \
  -replication-tls-ca /etc/lplex/ca.crt

# Or with systemd
sudo systemctl enable --now lplex-server
Cloud Server
# Start the cloud server with mTLS
lplex-cloud -data-dir /data/lplex \
  -tls-cert /etc/lplex-cloud/server.crt \
  -tls-key /etc/lplex-cloud/server.key \
  -tls-client-ca /etc/lplex-cloud/ca.crt

# With a config file
lplex-cloud -config /etc/lplex-cloud/lplex-cloud.conf
Client (lplex)
# Auto-discover via mDNS and stream all frames
lplex dump

# Connect to a specific server with filtering
lplex dump --server http://inuc1.local:8089 --pgn 129025 --manufacturer Garmin

# Decode known PGNs into human-readable fields
lplex dump --decode

# Filter on decoded field values (auto-enables --decode)
lplex dump --where "pgn == 130310 && water_temperature < 280"
lplex dump --where 'register.name == "State of Charge"'

# Only show frames with significant changes (suppress sensor noise)
lplex dump --changes --decode

# Buffered mode with automatic reconnect replay
lplex dump --server http://inuc1.local:8089 --buffer-timeout PT5M

# List devices on the bus
lplex devices

# Show last-known decoded values
lplex values

# Request a specific PGN from all devices
lplex request --pgn 126996 --decode

# Inspect a journal file
lplex inspect recording.lpj

# Simulate a boat from recorded journals (no CAN bus needed)
lplex simulate --dir /path/to/journals/
lplex simulate --file recording.lpj --speed 10

# Docker: simulate from journal files, exit when done
docker run --rm -p 8090:8090 -v ./journals:/data:ro \
  --entrypoint /lplex ghcr.io/sixfathoms/lplex:latest \
  simulate --dir /data --speed 0 --exit-when-done
Go Client Library (lplexc)
import "github.com/sixfathoms/lplex/lplexc"

// Auto-discover the server
addr, _ := lplexc.Discover(ctx)
client := lplexc.NewClient(addr)

// Get devices on the bus
devices, _ := client.Devices(ctx)

// Subscribe to position updates from Garmin devices
sub, _ := client.Subscribe(ctx, &lplexc.Filter{
    PGNs:          []uint32{129025},
    Manufacturers: []string{"Garmin"},
})
defer sub.Close()

for {
    ev, err := sub.Next()
    if err != nil {
        break
    }
    fmt.Printf("Position: src=%d data=%s\n", ev.Frame.Src, ev.Frame.Data)
}
TypeScript Client Library (@sixfathoms/lplex)
import { Client } from "@sixfathoms/lplex";

const client = new Client("http://inuc1.local:8089");

// Get devices on the bus
const devices = await client.devices();

// Get current bus state snapshot
const snapshot = await client.values();

// Subscribe to position updates from Garmin devices
const stream = await client.subscribe({
  pgn: [129025],
  manufacturer: ["Garmin"],
});

for await (const event of stream) {
  if (event.type === "frame") {
    console.log(`Position: src=${event.frame.src} data=${event.frame.data}`);
  }
}

A CloudClient is also available for the lplex-cloud management API:

import { CloudClient } from "@sixfathoms/lplex";

const cloud = new CloudClient("https://cloud.example.com");
const instances = await cloud.instances();

// Get a regular Client scoped to a specific instance
const client = cloud.client("boat-001");
const devices = await client.devices();

Configuration

lplex can be configured with CLI flags, a HOCON config file, or both. CLI flags always take precedence over config file values.

Config file discovery

Use -config path/to/lplex-server.conf to specify a config file explicitly. If -config is not set, lplex-server searches for:

  1. ./lplex-server.conf
  2. /etc/lplex/lplex-server.conf
  3. ./lplex.conf (backward compat)
  4. /etc/lplex/lplex.conf (backward compat)

If no config file is found, lplex-server continues with defaults (fully backward compatible).
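
The lookup order above can be sketched as a small function. This is an illustration only, not lplex-server's actual code: findConfig, fileExists, and searchPaths are hypothetical names, and how the real binary reacts to an explicit -config path that does not exist is not covered here.

```go
package main

import (
	"fmt"
	"os"
)

// searchPaths mirrors the documented lookup order.
var searchPaths = []string{
	"./lplex-server.conf",
	"/etc/lplex/lplex-server.conf",
	"./lplex.conf",          // backward compat
	"/etc/lplex/lplex.conf", // backward compat
}

// findConfig is a hypothetical helper. It returns the explicit path if one
// was given, otherwise the first search path for which exists reports true,
// otherwise "" (meaning: continue with defaults).
func findConfig(explicit string, exists func(string) bool) string {
	if explicit != "" {
		return explicit
	}
	for _, p := range searchPaths {
		if exists(p) {
			return p
		}
	}
	return ""
}

// fileExists reports whether path names an existing non-directory.
func fileExists(path string) bool {
	info, err := os.Stat(path)
	return err == nil && !info.IsDir()
}

func main() {
	if p := findConfig("", fileExists); p != "" {
		fmt.Println("using config:", p)
	} else {
		fmt.Println("no config file found, using defaults")
	}
}
```

Passing the existence check as a function keeps the ordering logic testable without touching the filesystem.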

Example config (boat)
interface = can0
port = 8089
max-buffer-duration = PT5M

journal {
  dir = /var/log/lplex
  prefix = nmea2k
  block-size = 262144
  compression = zstd

  rotate {
    duration = PT1H
    size = 0
  }

  retention {
    max-age = P30D
    min-keep = PT24H
  }

  archive {
    command = "/usr/local/bin/archive-to-s3"
    trigger = "on-rotate"
  }
}

replication {
  target = "cloud.example.com:9443"
  instance-id = "boat-001"
  tls {
    cert = "/etc/lplex/boat.crt"
    key = "/etc/lplex/boat.key"
    ca = "/etc/lplex/ca.crt"
  }
}
Example config (cloud)
grpc {
  listen = ":9443"
  tls {
    cert = "/etc/lplex-cloud/server.crt"
    key = "/etc/lplex-cloud/server.key"
    client-ca = "/etc/lplex-cloud/ca.crt"
  }
}
http {
  listen = ":8080"
}
data-dir = "/data/lplex"

journal {
  rotate-duration = PT1H
  retention {
    max-age = P90D
    max-size = 53687091200
  }
  archive {
    command = "/usr/local/bin/archive-to-gcs"
    trigger = "before-expire"
  }
}

See lplex-server.conf.example and lplex-cloud.conf.example for the full annotated versions.

Architecture

SocketCAN (can0)
    |
CANReader goroutine
    |  reads extended CAN frames
    |  reassembles fast-packets (multi-frame PGNs)
    |
    v
rxFrames chan
    |
Broker goroutine (single writer, owns all state)
    |  assigns monotonic sequence numbers
    |  appends pre-serialized JSON to ring buffer (64k entries)
    |  updates device registry (PGN 60928, PGN 126996)
    |  fans out to sessions and ephemeral subscribers
    |  sends ISO requests to discover new devices
    |  feeds journal writer (if enabled)
    |
    +---> ring buffer (pre-serialized JSON, power-of-2)
    +---> DeviceRegistry (keyed by source address)
    +---> ValueStore (last frame per source+PGN)
    +---> sessions map (buffered clients with cursors)
    +---> subscribers map (ephemeral clients, no state)
    +---> journal chan (optional, 16k buffer)
    |
    v
HTTP Server (:8089)                JournalWriter goroutine
    |                                   |  block-based .lpj files
    +-- GET  /events                    |  zstd block compression
    +-- PUT  /clients/{id}              |  CRC32C checksums
    +-- GET  /clients/{id}/events       |  device table per block
    +-- PUT  /clients/{id}/ack          |  O(log N) time seeking
    +-- POST /send                      |  ~2-3 MB/hour at 200 fps
    +-- POST /query                     v
    +-- GET  /devices                .lpj journal files
    +-- GET  /values
    +-- GET  /replication/status

CANWriter goroutine            ReplicationClient (optional)
    |  fragments for TX            |  gRPC to cloud server
    |  writes to SocketCAN         +-- Live: Consumer -> LiveFrame stream
                                   +-- Backfill: raw blocks -> Block stream
                                   +-- Reconnect: exponential backoff

API

Ephemeral streaming

GET /events with optional query params: pgn, exclude_pgn, manufacturer, instance, name (hex).

No session, no replay, no ACK. Zero server-side state after disconnect.
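
A minimal sketch of consuming the ephemeral stream from Go with only the standard library, for cases where lplexc is not wanted. The host name is taken from the examples above; eventsURL and dataPayload are hypothetical helpers, and the JSON shape of each event is not assumed here, so payloads are printed as raw text.

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"net/url"
	"strings"
)

// eventsURL builds the /events URL with optional filter query params.
func eventsURL(base string, filters url.Values) string {
	u := strings.TrimRight(base, "/") + "/events"
	if len(filters) > 0 {
		u += "?" + filters.Encode()
	}
	return u
}

// dataPayload returns the payload of an SSE "data:" line.
// It reports false for comments, blank lines, and other SSE fields.
func dataPayload(line string) (string, bool) {
	if !strings.HasPrefix(line, "data:") {
		return "", false
	}
	return strings.TrimSpace(strings.TrimPrefix(line, "data:")), true
}

func main() {
	filters := url.Values{"pgn": {"129025"}, "manufacturer": {"Garmin"}}
	resp, err := http.Get(eventsURL("http://inuc1.local:8089", filters))
	if err != nil {
		fmt.Println("connect failed:", err)
		return
	}
	defer resp.Body.Close()

	// Read the stream line by line until the server or network closes it.
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		if payload, ok := dataPayload(sc.Text()); ok {
			fmt.Println(payload)
		}
	}
	if err := sc.Err(); err != nil {
		fmt.Println("stream ended:", err)
	}
}
```

Because the server keeps no state for ephemeral clients, a reconnect simply starts from the live head again.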

Buffered sessions
  1. PUT /clients/{id} with {"buffer_timeout": "PT5M"} to create/reconnect
  2. GET /clients/{id}/events for SSE (replays from cursor, then live)
  3. PUT /clients/{id}/ack with {"seq": N} to advance cursor

Disconnected sessions keep their cursor for the buffer duration.
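
The three steps can be written out as plain HTTP calls. The sketch below only builds and prints the requests; the call type and helper names are hypothetical, the session id "nav-display" is made up, and the Content-Type header is an assumption about what the server expects.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

// call describes one HTTP request of the buffered-session flow.
type call struct {
	Method, URL, Body string
}

// createSession is step 1: create or reconnect a session.
func createSession(base, id, bufferTimeout string) call {
	body, _ := json.Marshal(map[string]string{"buffer_timeout": bufferTimeout})
	return call{http.MethodPut, base + "/clients/" + id, string(body)}
}

// streamEvents is step 2: open the SSE stream (replay from cursor, then live).
func streamEvents(base, id string) call {
	return call{http.MethodGet, base + "/clients/" + id + "/events", ""}
}

// ack is step 3: advance the session cursor to seq.
func ack(base, id string, seq uint64) call {
	body, _ := json.Marshal(map[string]uint64{"seq": seq})
	return call{http.MethodPut, base + "/clients/" + id + "/ack", string(body)}
}

// toRequest turns a call into an *http.Request ready for http.Client.Do.
func toRequest(c call) (*http.Request, error) {
	req, err := http.NewRequest(c.Method, c.URL, strings.NewReader(c.Body))
	if err != nil {
		return nil, err
	}
	if c.Body != "" {
		req.Header.Set("Content-Type", "application/json") // assumed
	}
	return req, nil
}

func main() {
	base, id := "http://inuc1.local:8089", "nav-display"
	steps := []call{createSession(base, id, "PT5M"), streamEvents(base, id), ack(base, id, 1042)}
	for _, c := range steps {
		req, err := toRequest(c)
		if err != nil {
			fmt.Println("bad request:", err)
			return
		}
		fmt.Println(req.Method, req.URL.String(), c.Body)
	}
}
```

In a real client the ack would be sent periodically while the stream from step 2 is being read, using the sequence number of the last frame that was fully processed.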

Transmit

Both /send and /query are disabled by default. Enable with -send-enabled or send.enabled = true in the config file. Use send.rules (HOCON string or object array) or -send-rules (semicolon-separated DSL) to define ordered allow/deny rules with PGN ranges and CAN NAME lists. HOCON config supports both string rules ("pgn:59904") and native objects ({ pgn = "59904", name = "..." }). Rules are evaluated top-to-bottom, first match wins. Internal device discovery (ISO requests at startup) is not affected.

POST /send with {"pgn": 59904, "src": 254, "dst": 255, "prio": 6, "data": "00ee00"}
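
The "data" value in this example is the payload of an ISO Request: the requested PGN packed into 3 little-endian bytes and hex-encoded, so "00ee00" asks for PGN 60928 (address claim). A small sketch of building that body follows; the sendRequest struct and helper names are hypothetical and simply mirror the JSON keys shown above.

```go
package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// sendRequest mirrors the JSON body shown for POST /send.
type sendRequest struct {
	PGN  uint32 `json:"pgn"`
	Src  uint8  `json:"src"`
	Dst  uint8  `json:"dst"`
	Prio uint8  `json:"prio"`
	Data string `json:"data"` // hex-encoded payload
}

// isoRequestData encodes the requested PGN as 3 little-endian bytes in hex,
// matching the 24-bit requested_pgn field of PGN 59904.
func isoRequestData(requestedPGN uint32) string {
	b := []byte{byte(requestedPGN), byte(requestedPGN >> 8), byte(requestedPGN >> 16)}
	return hex.EncodeToString(b)
}

// isoRequestBody builds the JSON body that asks dst to transmit requestedPGN.
func isoRequestBody(requestedPGN uint32, src, dst uint8) ([]byte, error) {
	return json.Marshal(sendRequest{
		PGN:  59904,
		Src:  src,
		Dst:  dst,
		Prio: 6,
		Data: isoRequestData(requestedPGN),
	})
}

func main() {
	body, err := isoRequestBody(60928, 254, 255)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(body))
}
```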

Query on demand

POST /query with {"pgn": 129025, "dst": 255} sends an ISO Request (PGN 59904) and waits for the response. Returns the first matching frame as JSON. Optional "timeout": "PT5S" (default 2s). Returns 504 Gateway Timeout if no response arrives.

Devices

GET /devices returns a JSON array of all discovered NMEA 2000 devices.

Last values

GET /values returns the most recently received frame for each (device, PGN) pair. Grouped by device, sorted by source address. Useful for getting a snapshot of bus state without subscribing to SSE.

Supports the same filter query params as /events: pgn, exclude_pgn, manufacturer, instance, name (hex). Example: GET /values?pgn=129025&manufacturer=Garmin.

Replication status (boat)

GET /replication/status returns current replication state (available when replication is configured).

Cloud Replication

lplex can replicate CAN bus data from a boat to a cloud instance over gRPC with mTLS. The boat initiates all connections (no public IP required). Data flows over two independent gRPC streams:

  • Live stream: realtime frames from the broker's head, delivered to the cloud within seconds
  • Backfill stream: raw journal blocks for filling historical gaps, newest-first

On reconnect after a connectivity gap, live data resumes immediately while backfill works through the gap in the background. The cloud runs a replica Broker per instance, so web clients connect to the cloud and get the same SSE API as if they were on the boat.

See docs/cloud-replication.md for the full protocol specification.

Cloud HTTP API
Endpoint                                          Description
GET /instances                                    List all instances
GET /instances/{id}/status                        Instance status (cursor, holes, lag)
GET /instances/{id}/events                        SSE stream from instance's broker
GET /instances/{id}/devices                       Device table
GET /instances/{id}/values                        Last-seen values per (device, PGN). Query params: pgn, manufacturer, instance, name.
GET /instances/{id}/replication/events?limit=N    Replication event log (newest first, default 100, max 1024)

Journal Recording

lplex can record all CAN frames to disk as block-based binary journal files (.lpj) for future replay and analysis.

# Enable recording (zstd compression by default)
lplex-server -interface can0 -journal-dir /var/log/lplex

# With rotation (new file every hour)
lplex-server -interface can0 -journal-dir /var/log/lplex -journal-rotate-duration PT1H

# Disable compression
lplex-server -interface can0 -journal-dir /var/log/lplex -journal-compression none

Flags:

Flag                                  Default              Description
-journal-dir                          (disabled)           Directory for journal files
-journal-prefix                       nmea2k               Journal file name prefix
-journal-block-size                   262144               Block size (power of 2, min 4096)
-journal-compression                  zstd                 Block compression: none, zstd, zstd-dict
-journal-rotate-duration              PT1H                 Rotate after duration (ISO 8601)
-journal-rotate-size                  0                    Rotate after bytes (0 = disabled)
-journal-retention-max-age            (disabled)           Delete files older than this (ISO 8601, e.g. P30D)
-journal-retention-min-keep           (disabled)           Never delete files younger than this, unless max-size exceeded
-journal-retention-max-size           0                    Hard size cap in bytes; delete oldest files when exceeded
-journal-retention-soft-pct           80                   Proactive archive threshold as % of max-size (1-99)
-journal-retention-overflow-policy    delete-unarchived    What to do when hard cap hit with failed archives
-journal-archive-command              (disabled)           Path to archive script
-journal-archive-trigger              (disabled)           When to archive: on-rotate or before-expire

Blocks are compressed individually with zstd (~4x ratio at 256KB blocks on typical CAN data, ~158 MB/day at 200 fps). Each block carries a device table so consumers can resolve source addresses without external state. A block index at end-of-file enables fast seeking; crash-truncated files are recovered via forward-scan. See docs/format.md for the binary format specification.

Retention and Archival

Journal files accumulate indefinitely unless you configure a retention policy. Retention and archival are available on both boat and cloud binaries.

# Keep at most 30 days of journals, but never delete files less than 24 hours old
lplex-server -interface can0 -journal-dir /var/log/lplex \
  -journal-retention-max-age P30D -journal-retention-min-keep PT24H

# Hard size cap: keep at most 10 GB, oldest files deleted first
lplex-server -interface can0 -journal-dir /var/log/lplex \
  -journal-retention-max-size 10737418240

# Archive to S3 on rotation, then delete after 30 days
lplex-server -interface can0 -journal-dir /var/log/lplex \
  -journal-retention-max-age P30D \
  -journal-archive-command /usr/local/bin/archive-to-s3 \
  -journal-archive-trigger on-rotate

Retention algorithm: files are sorted oldest-first. Three zones govern behavior when max-size is set with archival:

  1. Normal (total <= soft threshold): standard age-based expiration, archive-then-delete
  2. Soft zone (soft < total <= hard): proactively queue oldest non-archived files for archive
  3. Hard zone (total > hard): expire files; if archives have failed, apply the overflow policy

Precedence: max-size overrides min-keep, which in turn overrides max-age. The soft threshold defaults to 80% of max-size and only applies when both max-size and an archive command are configured.
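
The zone boundaries can be expressed as a small classifier. This is a sketch under the rules stated above, not the retention code itself: classify and the zone constants are hypothetical names, and treating a missing archive command as "no soft zone" follows from the soft threshold only applying when archival is configured.

```go
package main

import "fmt"

type zone int

const (
	zoneNormal zone = iota // age-based expiration only
	zoneSoft               // proactively archive oldest non-archived files
	zoneHard               // expire files, apply overflow policy if archives failed
)

func (z zone) String() string {
	return [...]string{"normal", "soft", "hard"}[z]
}

// classify maps the total journal size onto a retention zone.
// maxSize <= 0 means no size cap; softPct is a percentage of maxSize.
func classify(total, maxSize int64, softPct int, archiving bool) zone {
	if maxSize <= 0 {
		return zoneNormal
	}
	if total > maxSize {
		return zoneHard
	}
	if archiving && total > maxSize*int64(softPct)/100 {
		return zoneSoft
	}
	return zoneNormal
}

func main() {
	const maxSize = 10737418240 // 10 GiB, as in the size-cap example
	for _, total := range []int64{4 << 30, 9 << 30, 11 << 30} {
		fmt.Printf("%d bytes: %s zone\n", total, classify(total, maxSize, 80, true))
	}
}
```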

Overflow policies (when hard cap is hit and archives have failed):

  • delete-unarchived (default): delete files even if not archived, prioritizing continued recording
  • pause-recording: stop journal writes until archives free space, prioritizing archive completeness

Archive script protocol: the script receives file paths as arguments and JSONL metadata on stdin (one line per file with path, instance_id, size, created). It must write JSONL to stdout with per-file status ("ok" or "error"). Failed files are retried with exponential backoff.
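
A minimal archive script sketch in Go, to make the stdin/stdout protocol concrete. Several details are assumptions rather than documented facts: the output keys ("path", "status", "error"), the types of size and created (kept as raw JSON here), and the stand-in upload step, which only checks that the file exists. See the lplex docs for the authoritative schema.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
)

// fileMeta is one JSONL input line. Size and Created are kept raw because
// their JSON types are not specified in this README.
type fileMeta struct {
	Path       string          `json:"path"`
	InstanceID string          `json:"instance_id"`
	Size       json.RawMessage `json:"size"`
	Created    json.RawMessage `json:"created"`
}

// result is one JSONL output line. Key names are assumed.
type result struct {
	Path   string `json:"path"`
	Status string `json:"status"` // "ok" or "error"
	Error  string `json:"error,omitempty"`
}

var errMissing = errors.New("file missing")

// statUpload is a placeholder for a real upload (S3, GCS, ...).
func statUpload(m fileMeta) error {
	if _, err := os.Stat(m.Path); err != nil {
		return errMissing
	}
	return nil
}

// process reads JSONL metadata and returns one JSONL status line per file.
func process(input string, upload func(fileMeta) error) (string, error) {
	dec := json.NewDecoder(strings.NewReader(input))
	var out strings.Builder
	enc := json.NewEncoder(&out)
	for {
		var m fileMeta
		err := dec.Decode(&m)
		if err == io.EOF {
			return out.String(), nil
		}
		if err != nil {
			return out.String(), err
		}
		res := result{Path: m.Path, Status: "ok"}
		if err := upload(m); err != nil {
			res.Status, res.Error = "error", err.Error()
		}
		if err := enc.Encode(res); err != nil {
			return out.String(), err
		}
	}
}

func main() {
	in, err := io.ReadAll(os.Stdin)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	out, err := process(string(in), statUpload)
	fmt.Print(out)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```

Reporting a per-file status rather than failing the whole run lets the caller retry only the files that failed.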

Archive triggers:

  • on-rotate: archive immediately after a journal file is closed (eager, minimizes data loss window)
  • before-expire: archive only when a file is about to be deleted by retention (lazy, minimizes archive traffic)

PGN Decoding

lplex can decode known NMEA 2000 PGNs into human-readable field values using the --decode flag:

# Terminal: decoded fields appear below each frame
lplex dump --decode

# JSON output: adds a "decoded" object to each frame
lplex dump --decode --json

# Journal replay with decoding
lplex dump --file recording.lpj --decode

The registry contains ~120 PGNs, of which ~30 have full decoders (position, heading, wind, depth, engine, battery, environment, etc.). The remaining PGNs are name-only: they carry descriptions and metadata (fast-packet, interval) but no field layout. Unknown PGNs pass through with raw hex data as usual.

Packet tests

PGN decoders are verified by table-driven tests in pgn/packets_test.go. Each test vector specifies hex packet data and the expected decoded struct, with automatic round-trip verification. To add a test from real device data, capture a frame with lplex dump --decode --json and copy the data and decoded fields into a new entry.

PGN DSL

PGN definitions live in pgn/defs/*.pgn using a compact DSL that describes bit-level field layouts. The code generator (pgngen) reads these files and produces Go structs with Decode*/Encode methods, a Registry map, Protobuf definitions, and JSON Schema.

go generate ./pgn/...   # regenerate from pgn/defs/*.pgn
Basic syntax
# Line comments start with #

pgn 129025 "Position Rapid Update" interval=100ms {
  latitude   int32  :32  scale=1e-7  unit="deg"
  longitude  int32  :32  scale=1e-7  unit="deg"
}

pgn 129029 "GNSS Position Data" fast_packet interval=1000ms {
  sid              uint8   :8
  days_since_1970  uint16  :16
  # ... more fields
}

pgn 59904 "ISO Request" on_demand {
  requested_pgn  uint32  :24
}
PGN-level attributes

Attributes between the description and opening { apply to the PGN as a whole:

Attribute              Description
fast_packet            PGN uses multi-frame fast-packet protocol
interval=<duration>    Default transmission interval (100ms, 500ms, 1s, 2500ms, 60s). Stored as time.Duration in the registry.
on_demand              Event-driven PGN, no periodic transmission
draft                  Definition is incomplete or reverse-engineered. Propagated to PGNInfo.Draft.

These are code-generated into PGNInfo fields in pgn.Registry and used by IsFastPacket() to identify fast-packet PGNs.

Name-only PGNs

A PGN definition without braces registers the PGN's name and metadata (fast-packet, interval, etc.) without defining a field layout. The generated Registry entry has Decode: nil.

pgn 129038 "AIS Class A Position Report" fast_packet
pgn 126983 "Alert" fast_packet
pgn 127493 "Transmission Parameters Dynamic" draft

This is the canonical form for PGNs whose structure is unknown or not yet implemented. Use this instead of hardcoded name maps.

Field definitions

Each field has: name type :bits [attributes...]

Element        Description
name           Field name (snake_case). Use _ for reserved/padding bits, ? for unknown/undocumented data.
type           uint8, uint16, uint32, uint64, int8, int16, int32, int64, float32, float64, string, or an enum name
:bits          Bit width of the field
scale=N        Scaling factor: decoded = raw * scale. Output type becomes float64.
offset=N       Offset: decoded = raw * scale + offset
unit="..."     Human-readable unit (e.g. "deg", "m/s", "rad")
trim="..."     Right-trim these characters from decoded string fields (e.g. trim="@ " for AIS names)
tolerance=N    Change detection threshold for ChangeTracker. Fields with changes smaller than N are suppressed by lplex dump --changes.
value=N        Dispatch constraint for variant PGNs (see below)
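
To make the scale/offset rule concrete, here is a hand-written sketch of decoding the latitude field of PGN 129025 (int32, 32 bits, scale=1e-7). It is not the generated decoder: decodeScaled and decodeLatitude are hypothetical names, and little-endian byte order is taken from the dispatch example further down.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// decodeScaled applies the DSL rule: decoded = raw * scale + offset.
func decodeScaled(raw int64, scale, offset float64) float64 {
	return float64(raw)*scale + offset
}

// decodeLatitude reads the first field of PGN 129025:
//   latitude  int32  :32  scale=1e-7  unit="deg"
func decodeLatitude(data []byte) (float64, error) {
	if len(data) < 4 {
		return 0, errors.New("payload too short for latitude")
	}
	raw := int32(binary.LittleEndian.Uint32(data[0:4])) // sign-extends via int32
	return decodeScaled(int64(raw), 1e-7, 0), nil
}

func main() {
	// 0x1C40AA80 little-endian = 474000000 raw units.
	lat, err := decodeLatitude([]byte{0x80, 0xAA, 0x40, 0x1C})
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("latitude: %.7f deg\n", lat)

	// heading  uint16  :16  scale=0.0001  unit="rad"
	fmt.Printf("heading: %.4f rad\n", decodeScaled(31416, 0.0001, 0))
}
```
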
Enums

Named enumerations for lookup fields:

enum HeadingReference {
  0 = "true"
  1 = "magnetic"
}

pgn 127250 "Vessel Heading" {
  sid                uint8             :8
  heading            uint16            :16  scale=0.0001  unit="rad"
  heading_reference  HeadingReference  :2
  _                                    :6
}
Lookups

Lookup tables map integer keys to human-readable names. Unlike enums, lookups don't change the field's Go type; the field stays its raw integer type and gets a Name() method for display.

lookup VictronRegister uint16 {
  0x0100 = "Product ID"
  0x0200 = "Device Mode"
  0xED8F = "DC Channel 1 Current"
}

pgn 61184 "Victron Battery Register" {
  manufacturer_code  uint16  :11  value=358
  _                          :2
  industry_code      uint8   :3
  register           uint16  :16  lookup=VictronRegister
  payload            uint32  :32
}

The generator produces:

  • A map[uint16]string variable (victronRegisterNames) with all key-name pairs
  • A RegisterName() string method on the struct that returns the human-readable name (or empty string if unknown)
  • A LookupFields() map[string]string method for display code to wrap the field as {"id": <raw>, "name": "..."}

Keys support hex (0xFF) and decimal (255) literals. Valid key types: uint8, uint16, uint32, uint64.
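
Roughly, the generated code for the lookup above could look like the following. This is an approximation written by hand from the description, not pgngen output: the struct field names and the value receiver are assumptions, and LookupFields is left out.

```go
package main

import "fmt"

// victronRegisterNames holds the key-name pairs from the lookup block.
var victronRegisterNames = map[uint16]string{
	0x0100: "Product ID",
	0x0200: "Device Mode",
	0xED8F: "DC Channel 1 Current",
}

// VictronBatteryRegister keeps Register as its raw integer type.
type VictronBatteryRegister struct {
	ManufacturerCode uint16
	IndustryCode     uint8
	Register         uint16
	Payload          uint32
}

// RegisterName returns the human-readable register name,
// or the empty string if the key is unknown.
func (m VictronBatteryRegister) RegisterName() string {
	return victronRegisterNames[m.Register]
}

func main() {
	msg := VictronBatteryRegister{ManufacturerCode: 358, Register: 0xED8F}
	fmt.Printf("register 0x%04X = %q\n", msg.Register, msg.RegisterName())
}
```

Keeping the raw integer on the struct means filters such as register.name == "State of Charge" can resolve names at display time without losing the original value.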

Variant dispatch (value=)

Some PGN numbers (notably 61184, Proprietary Single Frame) carry different payloads depending on a discriminator field value. The DSL supports this by allowing multiple pgn blocks with the same number, differentiated by value= constraints on a shared discriminator field.

# Victron devices use manufacturer_code=358
pgn 61184 "Victron Battery Register" {
  manufacturer_code  uint16  :11  value=358
  _                          :2
  industry_code      uint8   :3
  register           uint16  :16
  payload            uint32  :32
}

# Garmin devices use manufacturer_code=229
pgn 61184 "Garmin Proprietary" {
  manufacturer_code  uint16  :11  value=229
  _                          :2
  industry_code      uint8   :3
  data               uint32  :32
}

The generator produces:

  • A separate struct and Decode*/Encode for each variant (VictronBatteryRegister, GarminProprietary)
  • A dispatch function Decode61184(data []byte) (any, error) that reads the discriminator from raw bytes and routes to the correct variant decoder
  • A single Registry entry for the PGN number pointing to the dispatch function

Rules and constraints:

Rule                          Detail
Discriminator field           All constrained variants must use the same field name, bit position, and bit width as the discriminator
Unique values                 Each value=N must be unique across all variants of the same PGN
Default variant               A variant with no value= on any field acts as the fallback for unrecognized discriminator values. This is optional, not required.
At most one default           Only one default variant (without value=) is allowed per PGN
Minimum one constraint        At least one variant must have a value= constraint. Two defaults with no constraints is an error.
Single constrained variant    Even a single pgn block with value= gets a dispatch function that rejects non-matching discriminator values
No default means error        Without a default variant, unknown discriminator values return an error from the dispatch function
Constrained encode            Encode() hardcodes the value=N literal instead of reading the struct field, so encoded frames always have the correct discriminator
Reserved/unknown fields       _ (padding) and ? (unknown) fields cannot have value=

Generated dispatch (conceptual):

func Decode61184(data []byte) (any, error) {
    disc := binary.LittleEndian.Uint16(data[0:2]) & 0x07FF
    switch uint64(disc) {
    case 358:
        return DecodeVictronBatteryRegister(data)
    case 229:
        return DecodeGarminProprietary(data)
    default:
        return nil, fmt.Errorf("PGN 61184: unknown manufacturer_code value %d", disc)
    }
}
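
The mask in the dispatch function can be checked by hand on the first two payload bytes. The sketch below splits them according to the DSL layout (manufacturer_code :11, reserved :2, industry_code :3). splitHeader is a hypothetical helper; the sample bytes assume the reserved bits are set to 1 and an industry code of 4.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitHeader extracts the 11-bit manufacturer code (bits 0-10) and the
// 3-bit industry code (bits 13-15) from the first two payload bytes.
func splitHeader(data []byte) (manufacturer uint16, industry uint8, err error) {
	if len(data) < 2 {
		return 0, 0, errors.New("payload too short for proprietary header")
	}
	v := binary.LittleEndian.Uint16(data[0:2])
	return v & 0x07FF, uint8(v >> 13), nil
}

func main() {
	// 0x66 0x99 -> little-endian 0x9966 -> manufacturer 0x166 = 358.
	for _, payload := range [][]byte{{0x66, 0x99}, {0xE5, 0x98}} {
		mfr, ind, err := splitHeader(payload)
		if err != nil {
			fmt.Println("decode failed:", err)
			continue
		}
		fmt.Printf("% X -> manufacturer_code=%d industry_code=%d\n", payload, mfr, ind)
	}
}
```
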
Repeated fields (repeat=)

When a PGN has N identical consecutive fields (e.g. 28 two-bit switch indicators), use repeat=N to collapse them into a single line. The generator expands them at code-generation time into a slice or map in Go.

# Array mode (default): generates []uint8
pgn 127501 "Binary Switch Bank Status" {
  instance    uint8   :8
  indicator   uint8   :2  repeat=28
}

# Map mode: generates map[int]uint8 with 1-based keys
pgn 127501 "Binary Switch Bank Status" {
  instance    uint8   :8
  indicator   uint8   :2  repeat=28  group="map"
}

# Override the auto-pluralized field name
pgn 127501 "Binary Switch Bank Status" {
  instance    uint8   :8
  indicator   uint8   :2  repeat=28  as="switches"
}

Attribute      Description
repeat=N       Repeat this field N times (N >= 2). Expands to N consecutive fields of the same type/width.
group="map"    Use map[int]T instead of []T in Go. Keys are 1-based (NMEA convention). Default is array.
as="name"      Override the auto-pluralized field name. Default: basic English pluralization (indicator -> indicators).

Constraints: repeat= cannot be used on reserved (_) or unknown (?) fields, or combined with value=, lookup=, or enum types. group= and as= require repeat=.

Generated code: Decode produces a slice/map literal with unrolled bit reads. Encode uses bounds-checked (array) or key-checked (map) writes. Fields after a repeated field get correct bit offsets automatically.
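
For intuition, the array and map forms of the 127501 example can be decoded by hand as below. The generated code unrolls the bit reads; this sketch uses a loop instead, assumes fields are packed least-significant-bit first within each byte, and uses hypothetical function names.

```go
package main

import (
	"errors"
	"fmt"
)

// decodeSwitchBank reads the 8-bit instance followed by 28 two-bit
// indicators (8 + 28*2 = 64 bits, exactly one CAN frame).
func decodeSwitchBank(data []byte) (instance uint8, indicators []uint8, err error) {
	if len(data) < 8 {
		return 0, nil, errors.New("payload too short for PGN 127501")
	}
	instance = data[0]
	indicators = make([]uint8, 28)
	for i := range indicators {
		bit := 8 + 2*i // absolute bit offset of indicator i
		indicators[i] = (data[bit/8] >> (bit % 8)) & 0x03
	}
	return instance, indicators, nil
}

// toMap converts the array form to the group="map" form with 1-based keys.
func toMap(indicators []uint8) map[int]uint8 {
	m := make(map[int]uint8, len(indicators))
	for i, v := range indicators {
		m[i+1] = v
	}
	return m
}

func main() {
	payload := []byte{0x03, 0xD1, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
	instance, indicators, err := decodeSwitchBank(payload)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println("instance:", instance)
	fmt.Println("first four indicators:", indicators[:4])
	fmt.Println("switch 1 (map form):", toMap(indicators)[1])
}
```

Two-bit fields starting on an even bit offset never straddle a byte boundary, which is why a single shift and mask per element is enough here.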

Deployment

The .deb package installs a systemd service that binds to can0. Configure with a config file or environment variable:

# Option 1: config file (recommended)
sudo cp lplex-server.conf.example /etc/lplex/lplex-server.conf
sudo vi /etc/lplex/lplex-server.conf

# Option 2: environment variable
# Edit /etc/default/lplex-server:
LPLEX_ARGS="-interface can0 -port 8089 -journal-dir /var/log/lplex -journal-compression zstd"

License

MIT

Documentation

Overview

Package lplex is a CAN bus HTTP bridge for NMEA 2000.

It reads raw CAN frames from a SocketCAN interface, reassembles fast-packets, tracks device discovery, and streams frames to clients over SSE with session management, filtering, and replay.

The package can be embedded into other Go services. Create a Broker to manage frame routing, a Server to expose the HTTP API, and optionally a JournalWriter to record frames to disk.

broker := lplex.NewBroker(lplex.BrokerConfig{
    RingSize:          65536,
    MaxBufferDuration: 5 * time.Minute,
    Logger:            logger,
})
go broker.Run(ctx)

srv := lplex.NewServer(broker, logger, sendpolicy.SendPolicy{})
mux.Handle("/nmea/", http.StripPrefix("/nmea", srv))

Feed frames into the broker via Broker.RxFrames:

broker.RxFrames() <- lplex.RxFrame{
    Timestamp: time.Now(),
    Header:    lplex.CANHeader{Priority: 2, PGN: 129025, Source: 10, Destination: 0xFF},
    Data:      payload,
}

When done, close the rx channel and the broker goroutine exits:

broker.CloseRx()

Constants

const (
	// DefaultClaimHeartbeat is how often held virtual devices re-broadcast
	// their address claim (PGN 60928).
	DefaultClaimHeartbeat = 60 * time.Second

	// DefaultProductInfoHeartbeat is how often held virtual devices
	// re-broadcast product info (PGN 126996). Longer interval since it's a
	// 134-byte fast-packet.
	DefaultProductInfoHeartbeat = 5 * time.Minute
)
const DefaultLagCheckInterval = 1000

DefaultLagCheckInterval controls how often the boat checks for lag in the live send loop. Checked every N frames sent rather than by wall clock because when lagging, consumer.Next() returns instantly and the loop spins at CPU speed. At max bus rate (2000 fps) this checks roughly every 0.5s.

const DefaultMaxFrameRate = 2000

NMEA 2000 runs on CAN 2.0B at 250 kbit/s. An extended frame (29-bit ID, 8-byte payload) is roughly 131-157 bits depending on bit stuffing. Theoretical max is ~1800 frames/sec. We allow 2000 to give ~10% headroom for measurement jitter and bit-stuffing variance.
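
The figures in this comment follow from dividing the bit rate by the frame length. A short worked calculation, with framesPerSecond as a hypothetical helper:

```go
package main

import "fmt"

const bitRate = 250_000 // NMEA 2000 bus speed in bits per second

// framesPerSecond returns how many back-to-back frames of the given
// on-wire length fit into one second at bitRate.
func framesPerSecond(bitsPerFrame int) float64 {
	return float64(bitRate) / float64(bitsPerFrame)
}

func main() {
	best := framesPerSecond(131)  // least bit stuffing
	worst := framesPerSecond(157) // most bit stuffing
	fmt.Printf("frame rate range: %.0f to %.0f frames/sec\n", worst, best)

	// Headroom of the 2000 fps limit over the ~1800 fps estimate.
	fmt.Printf("headroom: %.1f%%\n", (2000.0/1800.0-1)*100)
}
```

The quoted ~1800 frames/sec sits inside the computed range, and 2000 is about 11% above it.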

const DefaultMaxLiveLag uint64 = 10_000

DefaultMaxLiveLag is the frame count threshold for live lag detection. If the live stream falls this far behind the broker head (boat-side) or the boat's reported head (cloud-side), the stream is killed and the gap switches to backfill mode. ~5 seconds at max bus rate.

const DefaultMinLagReconnectInterval = 30 * time.Second

DefaultMinLagReconnectInterval prevents thrashing when the system is persistently overloaded. If lag keeps recurring, we wait at least this long between lag-triggered reconnects.

const DefaultRateBurst = 500

DefaultRateBurst is the burst allowance for transient spikes. Power-on storms (every device announces simultaneously) can briefly exceed the sustained rate. 500 frames absorbs a ~250ms burst at max bus load.
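
How the limiter is implemented is not shown on this page. A token bucket is one common way to combine a sustained rate with a burst allowance, and the sketch below uses it purely as an illustration: the bucket type is hypothetical and takes timestamps as float seconds to stay deterministic.

```go
package main

import "fmt"

// bucket is a simple token bucket: it refills at rate tokens per second
// and never holds more than burst tokens.
type bucket struct {
	rate, burst  float64
	tokens, last float64
}

func newBucket(rate, burst float64) *bucket {
	return &bucket{rate: rate, burst: burst, tokens: burst}
}

// allow reports whether one frame may pass at time now (seconds).
func (b *bucket) allow(now float64) bool {
	if now > b.last {
		b.tokens += (now - b.last) * b.rate
		if b.tokens > b.burst {
			b.tokens = b.burst
		}
		b.last = now
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// countAllowed offers attempts frames at the same instant and counts passes.
func countAllowed(b *bucket, now float64, attempts int) int {
	n := 0
	for i := 0; i < attempts; i++ {
		if b.allow(now) {
			n++
		}
	}
	return n
}

func main() {
	b := newBucket(2000, 500) // DefaultMaxFrameRate, DefaultRateBurst
	fmt.Println("instant storm of 600 frames, allowed:", countAllowed(b, 0, 600))
	fmt.Println("125 ms later, allowed:", countAllowed(b, 0.125, 300))
}
```

With these numbers an empty bucket refills completely in 500 / 2000 = 0.25 s, which matches the ~250ms figure above.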

Variables

var (
	ParseCANID = canbus.ParseCANID
	BuildCANID = canbus.BuildCANID
)
var ErrFallenBehind = errors.New("consumer fallen behind: data no longer available")

ErrFallenBehind is returned by Consumer.Next when the consumer's cursor has fallen behind both the ring buffer and available journal files.

Functions

func ApplySlots added in v0.4.0

func ApplySlots(broker *Broker, slots []ClientSlot, logger *slog.Logger)

ApplySlots creates pre-configured client sessions on the broker.

func CANReader

func CANReader(ctx context.Context, iface string, rxFrames chan<- RxFrame, logger *slog.Logger) error

CANReader reads frames from SocketCAN, reassembles fast-packets, and sends completed frames to the broker's rxFrames channel. The iface name (e.g. "can0") is used as the Bus tag on each frame.

func CANWriter

func CANWriter(ctx context.Context, iface string, txFrames <-chan TxRequest, logger *slog.Logger) error

CANWriter reads from the broker's txFrames channel and writes to SocketCAN. Handles fast-packet fragmentation for payloads > 8 bytes.

func CompressHandler added in v0.4.0

func CompressHandler(next http.Handler) http.Handler

CompressHandler wraps an http.Handler with gzip compression. Responses are compressed when the client sends Accept-Encoding: gzip and the response Content-Type is not text/event-stream (SSE streams are left uncompressed because per-event gzip flushing adds overhead without meaningful compression on small JSON payloads).

func FragmentFastPacket

func FragmentFastPacket(data []byte, seqCounter uint8) [][]byte

FragmentFastPacket splits a payload into CAN frames for fast-packet TX. seqCounter should be incremented once per transfer; it is a 3-bit field, so it wraps after 7. Returns a slice of 8-byte CAN frame payloads.
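The fragmentation step can be sketched as follows. This is a minimal illustration based on the fast-packet layout documented under FastPacketAssembler, not the package's actual code. The function name fragmentFastPacket, the 0xFF padding of unused bytes, and the 223-byte payload limit are assumptions of the sketch.

```go
package main

import "fmt"

// fragmentFastPacket is an illustrative stand-in, not lplex's implementation.
// It assumes len(data) <= 223, the most a 5-bit frame number can address
// (6 bytes in frame 0 plus 7 bytes in each of frames 1..31).
func fragmentFastPacket(data []byte, seqCounter uint8) [][]byte {
	seq := (seqCounter & 0x07) << 5 // 3-bit sequence counter in the top bits

	newFrame := func() []byte {
		f := make([]byte, 8)
		for i := range f {
			f[i] = 0xFF // assumed padding value for unused bytes
		}
		return f
	}

	// Frame 0: header byte, total length, then the first 6 data bytes.
	first := newFrame()
	first[0] = seq
	first[1] = byte(len(data))
	n := copy(first[2:], data)
	frames := [][]byte{first}

	// Frames 1..N: header byte, then the next 7 data bytes.
	for frameNum := uint8(1); n < len(data); frameNum++ {
		f := newFrame()
		f[0] = seq | (frameNum & 0x1F)
		n += copy(f[1:], data[n:])
		frames = append(frames, f)
	}
	return frames
}

func main() {
	payload := make([]byte, 20)
	for i := range payload {
		payload[i] = byte(i)
	}
	for _, f := range fragmentFastPacket(payload, 3) {
		fmt.Printf("% X\n", f)
	}
}
```

A 20-byte payload needs three frames in this scheme: 6 bytes in frame 0 and 7 bytes in each of the next two.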

func HealthHandler added in v0.3.1

func HealthHandler(cfg HealthConfig) http.HandlerFunc

HealthHandler returns an http.HandlerFunc that serves the /healthz endpoint. It returns the full health status including broker, replication, and component health. For Kubernetes, prefer /livez and /readyz instead.

func InitTracing added in v0.4.0

func InitTracing(ctx context.Context, cfg TracingConfig) (shutdown func(context.Context) error, err error)

InitTracing sets up the OpenTelemetry TracerProvider and returns a shutdown function. If tracing is disabled, sets a no-op provider and returns a no-op shutdown.

func IsFastPacket

func IsFastPacket(pgnNum uint32) bool

IsFastPacket returns true if the PGN uses fast-packet transfer.

func LivenessHandler added in v0.4.0

func LivenessHandler() http.HandlerFunc

LivenessHandler returns an http.HandlerFunc for /livez. It reports whether the process is alive. Always returns 200 OK with a minimal JSON body. Suitable for Kubernetes livenessProbe.

func MetricsHandler added in v0.3.1

func MetricsHandler(broker *Broker, replStatus func() *ReplicationStatus, journalStats func() *JournalWriterStats) http.HandlerFunc

MetricsHandler returns an http.HandlerFunc that serves Prometheus-format metrics from the broker's Stats(). Optional callbacks provide journal and replication metrics for deployments that use those features.

func ParseISO8601Duration

func ParseISO8601Duration(s string) (time.Duration, error)

ParseISO8601Duration parses a subset of ISO 8601 durations (PT format). Supports hours (H), minutes (M), and seconds (S), for example "PT5M", "PT1H30M", "PT30S", and "PT1H".
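A parser for this subset might look like the sketch below. It is a hypothetical reimplementation for illustration only. The name parsePTDuration, the integer-only components, and the strict H, M, S ordering are assumptions, not confirmed behavior of ParseISO8601Duration.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// parsePTDuration parses strings such as "PT1H30M" (illustrative sketch).
// Components must appear in H, M, S order and be unsigned integers.
func parsePTDuration(s string) (time.Duration, error) {
	if !strings.HasPrefix(s, "PT") || len(s) == 2 {
		return 0, fmt.Errorf("invalid duration %q", s)
	}
	rest := s[2:]
	units := []struct {
		suffix byte
		unit   time.Duration
	}{{'H', time.Hour}, {'M', time.Minute}, {'S', time.Second}}

	var total time.Duration
	for _, u := range units {
		i := strings.IndexByte(rest, u.suffix)
		if i < 0 {
			continue // this component is absent
		}
		// 16-bit values keep the sum well inside time.Duration's range.
		n, err := strconv.ParseUint(rest[:i], 10, 16)
		if err != nil {
			return 0, fmt.Errorf("invalid duration %q: %w", s, err)
		}
		total += time.Duration(n) * u.unit
		rest = rest[i+1:]
	}
	if rest != "" {
		return 0, fmt.Errorf("invalid duration %q: trailing %q", s, rest)
	}
	return total, nil
}

func main() {
	for _, s := range []string{"PT5M", "PT1H30M", "PT30S", "PT1H", "P1D"} {
		d, err := parsePTDuration(s)
		fmt.Println(s, d, err)
	}
}
```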

func ReadinessHandler added in v0.4.0

func ReadinessHandler(cfg HealthConfig) http.HandlerFunc

ReadinessHandler returns an http.HandlerFunc for /readyz. It reports whether the service is ready to handle traffic by checking CAN bus activity and replication connectivity. Returns 200 for "ok" or "degraded", 503 for "unhealthy". Suitable for Kubernetes readinessProbe.

func SDNotify added in v0.4.0

func SDNotify(state string) bool

SDNotify sends a notification to systemd via the NOTIFY_SOCKET. Returns false if NOTIFY_SOCKET is not set (not running under systemd). This is a minimal implementation that avoids external dependencies.
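The systemd notification protocol is small enough to sketch with the standard library alone: the state string is sent as a single datagram to the Unix socket named by NOTIFY_SOCKET. The helper names notifyTo and sdNotify below are hypothetical, and returning false on a failed send is an assumption about the real function.

```go
package main

import (
	"fmt"
	"net"
	"os"
)

// notifyTo sends state as one datagram to the Unix socket at path.
// It reports false if path is empty or the send fails.
func notifyTo(path, state string) bool {
	if path == "" {
		return false // not running under systemd
	}
	conn, err := net.Dial("unixgram", path)
	if err != nil {
		return false
	}
	defer conn.Close()
	_, err = conn.Write([]byte(state))
	return err == nil
}

// sdNotify reads the socket path from the environment, as systemd expects.
func sdNotify(state string) bool {
	return notifyTo(os.Getenv("NOTIFY_SOCKET"), state)
}

func main() {
	fmt.Println("notified:", sdNotify("READY=1"))
}
```

Splitting the socket path out of the environment lookup keeps the send logic testable without modifying the process environment.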

func SDReady added in v0.4.0

func SDReady() bool

SDReady sends READY=1 to systemd, indicating the service is ready.

func SDWatchdog added in v0.4.0

func SDWatchdog() bool

SDWatchdog sends WATCHDOG=1 to systemd, resetting the watchdog timer.

func Tracer added in v0.4.0

func Tracer(name string) trace.Tracer

Tracer returns a named tracer from the global provider.

Types

type AlertEvent added in v0.4.0

type AlertEvent struct {
	Timestamp  time.Time       `json:"timestamp"`
	Type       AlertType       `json:"type"`
	Severity   string          `json:"severity"` // "warning", "info"
	Summary    string          `json:"summary"`
	InstanceID string          `json:"instance_id,omitempty"`
	Details    json.RawMessage `json:"details,omitempty"`
}

AlertEvent is the payload sent to the webhook endpoint.

type AlertManager added in v0.4.0

type AlertManager struct {
	// contains filtered or unexported fields
}

AlertManager dispatches alert events to a webhook endpoint. Alerts are sent asynchronously via a buffered channel to avoid blocking the caller. Duplicate alerts of the same type are suppressed within the dedup window.

func NewAlertManager added in v0.4.0

func NewAlertManager(cfg AlertManagerConfig) *AlertManager

NewAlertManager creates an alert manager. Call Run to start dispatching. Returns nil if cfg.WebhookURL is empty (alerting disabled).

func (*AlertManager) Fire added in v0.4.0

func (am *AlertManager) Fire(event AlertEvent)

Fire queues an alert for delivery. Non-blocking; drops the alert if the send buffer is full.
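The non-blocking, drop-when-full behavior described here is the usual Go pattern of a select with a default case on a buffered channel. The sketch below illustrates that pattern with hypothetical types (alert, alertQueue). It is not the AlertManager implementation and omits deduplication and delivery.

```go
package main

import "fmt"

// alert is a hypothetical stand-in for AlertEvent.
type alert struct {
	Type    string
	Summary string
}

// alertQueue buffers alerts for a separate dispatch goroutine.
type alertQueue struct {
	ch chan alert
}

func newAlertQueue(size int) *alertQueue {
	return &alertQueue{ch: make(chan alert, size)}
}

// fire enqueues without blocking. It reports false when the buffer is
// full and the alert was dropped.
func (q *alertQueue) fire(a alert) bool {
	select {
	case q.ch <- a:
		return true
	default:
		return false
	}
}

func main() {
	q := newAlertQueue(1)
	fmt.Println(q.fire(alert{Type: "bus_silence", Summary: "no frames received"}))
	fmt.Println(q.fire(alert{Type: "bus_resumed", Summary: "frames flowing again"}))
}
```

With a buffer of one and nothing draining the channel, the first call is queued and the second is dropped, so the caller never stalls on a slow webhook.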

func (*AlertManager) FireBusResumed added in v0.4.0

func (am *AlertManager) FireBusResumed(silenceDuration time.Duration)

FireBusResumed is a convenience method for bus activity resumed alerts.

func (*AlertManager) FireBusSilence added in v0.4.0

func (am *AlertManager) FireBusSilence(lastFrame time.Time, elapsed time.Duration)

FireBusSilence is a convenience method for bus silence alerts.

func (*AlertManager) FireDeviceRemoved added in v0.4.0

func (am *AlertManager) FireDeviceRemoved(src uint8, dev *Device)

FireDeviceRemoved is a convenience method for device removal alerts.

func (*AlertManager) FireReplicationDisconnected added in v0.4.0

func (am *AlertManager) FireReplicationDisconnected(reason string)

FireReplicationDisconnected alerts on replication disconnect.

func (*AlertManager) FireReplicationReconnected added in v0.4.0

func (am *AlertManager) FireReplicationReconnected()

FireReplicationReconnected alerts when replication reconnects.

func (*AlertManager) Run added in v0.4.0

func (am *AlertManager) Run(ctx context.Context)

Run processes queued alerts until ctx is cancelled.

type AlertManagerConfig added in v0.4.0

type AlertManagerConfig struct {
	WebhookURL  string        // HTTP POST target (empty = alerting disabled)
	Timeout     time.Duration // HTTP request timeout (default 5s)
	DedupWindow time.Duration // suppress duplicate alerts within this window (default 5m)
	InstanceID  string        // identifies the boat/instance
	Logger      *slog.Logger
}

AlertManagerConfig configures the alert manager.

type AlertType added in v0.4.0

type AlertType string

AlertType identifies the kind of alert.

const (
	AlertBusSilence              AlertType = "bus_silence"
	AlertBusResumed              AlertType = "bus_resumed"
	AlertDeviceRemoved           AlertType = "device_removed"
	AlertReplicationDisconnected AlertType = "replication_disconnected"
	AlertReplicationReconnected  AlertType = "replication_reconnected"
)

type BlockWriter

type BlockWriter struct {
	// contains filtered or unexported fields
}

BlockWriter appends pre-encoded journal blocks to .lpj files. Unlike JournalWriter, it receives blocks that are already serialized (with CRC, device table, frame data). Used by the cloud replication server to write backfill blocks byte-for-byte without decompression or re-encoding.

func NewBlockWriter

func NewBlockWriter(cfg BlockWriterConfig) (*BlockWriter, error)

NewBlockWriter creates a new BlockWriter. Call AppendBlock to write blocks. Call Close when done to finalize the current file.

func (*BlockWriter) AppendBlock

func (w *BlockWriter) AppendBlock(baseSeq uint64, baseTimeUs int64, data []byte, compressed bool) error

AppendBlock writes a pre-encoded block to the current journal file. For compressed blocks, the data is the compressed payload (written with a 20-byte v2 header). For uncompressed blocks, data is the full block bytes (must be exactly BlockSize with valid CRC).

func (*BlockWriter) Close

func (w *BlockWriter) Close() error

Close finalizes the current file (writes block index for compressed files, syncs, and closes). Safe to call multiple times or on a writer with no open file.

type BlockWriterConfig

type BlockWriterConfig struct {
	Dir            string
	Prefix         string // default: "nmea2k"
	BlockSize      int    // uncompressed block size (from source journal)
	Compression    journal.CompressionType
	RotateDuration time.Duration            // 0 = no limit
	RotateSize     int64                    // 0 = no limit
	OnRotate       func(keeper.RotatedFile) // called after a journal file is closed by rotation
	Logger         *slog.Logger
}

BlockWriterConfig configures a BlockWriter.

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker is the central coordinator. A single goroutine reads from rxFrames, assigns sequence numbers, appends to the ring buffer, updates the device registry, and fans out to client sessions and ephemeral subscribers.

func NewBroker

func NewBroker(cfg BrokerConfig) *Broker

NewBroker creates a new broker with the given config.

func (*Broker) AckSession

func (b *Broker) AckSession(id string, seq uint64) error

AckSession updates the cursor for a session.

func (*Broker) CloseRx

func (b *Broker) CloseRx()

CloseRx closes the rxFrames channel, signaling the broker to stop processing. Wait on Done() to know when the broker goroutine has actually exited.

func (*Broker) CreateSession

func (b *Broker) CreateSession(id string, bufferTimeout time.Duration, filter *EventFilter) (*ClientSession, uint64)

CreateSession creates or retrieves a client session. Returns the session and the current head sequence number.

When bufferTimeout is 0, the session cursor is reset so no frames are replayed on the next connect (fresh start).

func (*Broker) CurrentSeq

func (b *Broker) CurrentSeq() uint64

CurrentSeq returns the most recently assigned sequence number.

func (*Broker) Devices

func (b *Broker) Devices() *DeviceRegistry

Devices returns the broker's device registry.

func (*Broker) Done

func (b *Broker) Done() <-chan struct{}

Done returns a channel that is closed when the broker's Run() method returns.

func (*Broker) GetSession

func (b *Broker) GetSession(id string) *ClientSession

GetSession returns a session by ID, or nil if not found.

func (*Broker) LastFrameTime added in v0.3.1

func (b *Broker) LastFrameTime() time.Time

LastFrameTime returns the timestamp of the most recently received frame, or zero if no frames have been received yet.

func (*Broker) NewConsumer

func (b *Broker) NewConsumer(cfg ConsumerConfig) *Consumer

NewConsumer creates a pull-based consumer starting at the given cursor. The consumer is registered with the broker for live notifications.

func (*Broker) QueueTx added in v0.4.0

func (b *Broker) QueueTx(req TxRequest) bool

QueueTx queues a frame for transmission on the CAN bus. Returns false if the TX queue is full. The frame will appear in the ring buffer, journal, and SSE when the SocketCAN echo comes back through CANReader.

func (*Broker) Run

func (b *Broker) Run(ctx context.Context)

Run is the broker's main loop. Call in its own goroutine. Exits when ctx is cancelled or rxFrames is closed.

func (*Broker) RxFrames

func (b *Broker) RxFrames() chan<- RxFrame

RxFrames returns the channel for submitting received CAN frames to the broker.

func (*Broker) SendISORequest added in v0.3.1

func (b *Broker) SendISORequest(bus string, dst uint8, pgn uint32) error

SendISORequest sends an ISO Request (PGN 59904) to the given destination, asking it to transmit the specified PGN. Returns an error if the TX queue is full. The bus parameter routes the request to the correct CAN interface (empty = default).

func (*Broker) SetAlerts added in v0.4.0

func (b *Broker) SetAlerts(am *AlertManager)

SetAlerts sets the alert manager for device removal notifications. Must be called before Run.

func (*Broker) SetJournal

func (b *Broker) SetJournal(ch chan<- RxFrame)

SetJournal sets the journal channel. Must be called before Run.

func (*Broker) Stats added in v0.3.1

func (b *Broker) Stats() BrokerStats

Stats returns a point-in-time snapshot of broker metrics.

func (*Broker) Subscribe

func (b *Broker) Subscribe(filter *EventFilter) (*subscriber, func())

Subscribe creates an ephemeral fan-out channel with the given filter. Returns the subscriber and a cleanup function that must be called on disconnect.

func (*Broker) TouchSession

func (b *Broker) TouchSession(id string)

TouchSession updates the LastActivity timestamp for a session.

func (*Broker) TxFrames

func (b *Broker) TxFrames() <-chan TxRequest

TxFrames returns the channel for reading CAN frames to transmit.

func (*Broker) Values

func (b *Broker) Values() *ValueStore

Values returns the broker's last-values store.

func (*Broker) VirtualDevices added in v0.4.0

func (b *Broker) VirtualDevices() *VirtualDeviceManager

VirtualDevices returns the broker's virtual device manager, or nil if disabled.

type BrokerConfig

type BrokerConfig struct {
	RingSize          int           // must be power of 2
	MaxBufferDuration time.Duration // cap on client buffer_timeout
	JournalDir        string        // directory containing .lpj files (for consumer journal fallback)
	Logger            *slog.Logger

	// ReplicaMode makes the broker honor frame.Seq instead of auto-incrementing.
	// Used by the cloud replication server where sequence numbers originate
	// from the boat's broker.
	ReplicaMode bool

	// InitialHead sets the starting head value. Use this when resuming a
	// replica broker from persisted state so the ring starts at the right
	// position. Zero means start at 1 (the default).
	InitialHead uint64

	// DeviceIdleTimeout removes devices that haven't been seen for this
	// duration. Zero means use the default (5 minutes). Use -1 to disable.
	DeviceIdleTimeout time.Duration

	// VirtualDevices configures virtual NMEA 2000 devices that claim
	// addresses on the bus. Nil or empty means disabled.
	VirtualDevices []VirtualDeviceConfig

	// ClaimHeartbeat is how often virtual devices re-broadcast their address
	// claim (PGN 60928). Zero uses DefaultClaimHeartbeat (60s).
	ClaimHeartbeat time.Duration

	// ProductInfoHeartbeat is how often virtual devices re-broadcast product
	// info (PGN 126996). Zero uses DefaultProductInfoHeartbeat (5m).
	ProductInfoHeartbeat time.Duration
}

BrokerConfig holds broker configuration.

type BrokerHealth added in v0.3.1

type BrokerHealth struct {
	Status        string    `json:"status"` // "ok" or "unhealthy"
	FramesTotal   uint64    `json:"frames_total"`
	HeadSeq       uint64    `json:"head_seq"`
	LastFrameTime time.Time `json:"last_frame_time,omitempty"`
	DeviceCount   int       `json:"device_count"`
	RingEntries   uint64    `json:"ring_entries"`
	RingCapacity  int       `json:"ring_capacity"`
}

BrokerHealth reports the broker's health.

type BrokerStats added in v0.3.1

type BrokerStats struct {
	FramesTotal       uint64    // total frames processed
	LastFrameTime     time.Time // timestamp of most recent frame (zero if none)
	RingEntries       uint64    // current entries in ring buffer
	RingCapacity      int       // ring buffer size
	HeadSeq           uint64    // next sequence number
	ActiveSessions    int       // buffered client sessions
	ActiveSubscribers int       // ephemeral SSE subscribers
	ActiveConsumers   int       // pull-based consumers
	DeviceCount       int       // discovered NMEA 2000 devices
	JournalDrops      uint64    // frames dropped due to full journal channel
	DevicesAdded      uint64    // cumulative device discovery events
	DevicesRemoved    uint64    // cumulative device eviction events
	ConsumerMaxLag    uint64    // max consumer lag (head - cursor) across all consumers
}

BrokerStats is a point-in-time snapshot of broker metrics.

type BusSilenceMonitor added in v0.3.1

type BusSilenceMonitor struct {
	// contains filtered or unexported fields
}

BusSilenceMonitor watches for periods of CAN bus inactivity and logs alerts when no frames have been received for a configurable duration. This helps detect CAN bus disconnection or power issues on the boat.

func NewBusSilenceMonitor added in v0.3.1

func NewBusSilenceMonitor(timeout time.Duration, broker *Broker, logger *slog.Logger, alerts *AlertManager) *BusSilenceMonitor

NewBusSilenceMonitor creates a monitor that alerts when no frames have been received for the given timeout duration. The timeout must be positive. The optional AlertManager fires webhook alerts on silence/resume transitions.

func (*BusSilenceMonitor) IsSilent added in v0.3.1

func (m *BusSilenceMonitor) IsSilent() bool

IsSilent reports whether the bus is currently in a silence state.

func (*BusSilenceMonitor) Run added in v0.3.1

func (m *BusSilenceMonitor) Run(ctx context.Context)

Run checks for bus silence periodically and logs warnings. It exits when ctx is cancelled.

type BusSource added in v0.4.0

type BusSource struct {
	Bus    string
	Source uint8
}

BusSource identifies a device on a specific CAN bus.

type CANBus added in v0.4.0

type CANBus interface {
	// ReadFrames reads CAN frames from the bus, reassembles fast-packets,
	// and sends completed frames to rxFrames. Blocks until ctx is cancelled
	// or an unrecoverable error occurs.
	ReadFrames(ctx context.Context, rxFrames chan<- RxFrame) error

	// WriteFrames reads TX requests from txFrames and transmits them on the
	// bus. Blocks until ctx is cancelled or txFrames is closed.
	WriteFrames(ctx context.Context, txFrames <-chan TxRequest) error

	// Name returns the bus name (e.g. "can0", "loopback0").
	Name() string
}

CANBus provides read and write access to a CAN bus. Implementations handle the transport (SocketCAN, loopback, etc.) while the broker remains transport-agnostic.

type CANHeader

type CANHeader = canbus.CANHeader

CANHeader is a type alias for canbus.CANHeader, kept so existing server code compiles unchanged.

type ClaimState added in v0.4.0

type ClaimState int

ClaimState represents where a virtual device is in the address claim lifecycle.

const (
	// ClaimIdle is the initial state before any claim attempt.
	ClaimIdle ClaimState = iota
	// ClaimInProgress means a claim frame has been sent but the 250ms holdoff hasn't elapsed.
	ClaimInProgress
	// ClaimHeld means the address is successfully claimed and ready for use.
	ClaimHeld
	// ClaimCannotClaim means all 253 addresses (0-252) were exhausted.
	ClaimCannotClaim
)

func (ClaimState) String added in v0.4.0

func (s ClaimState) String() string

type ClientSession

type ClientSession struct {
	ID            string
	Cursor        uint64        // last ACK'd sequence number (0 = never ACK'd)
	BufferTimeout time.Duration // how long to keep buffering after disconnect
	LastActivity  time.Time
	Filter        *EventFilter // nil = receive all frames
}

ClientSession tracks a buffered client's metadata for persistence across HTTP reconnects. The actual frame reading is done by Consumer.

type ClientSlot added in v0.4.0

type ClientSlot struct {
	ID            string        `json:"id"`
	BufferTimeout time.Duration `json:"buffer_timeout"`
	Filter        *EventFilter  `json:"filter,omitempty"`
}

ClientSlot defines a pre-configured client session that is created at startup. This allows test environments and Docker containers to have named sessions ready before any HTTP client connects.

func ParseClientSlot added in v0.4.0

func ParseClientSlot(cfg ClientSlotConfig) (ClientSlot, error)

ParseClientSlot converts a ClientSlotConfig into a ClientSlot, parsing durations and hex NAME values.

type ClientSlotConfig added in v0.4.0

type ClientSlotConfig struct {
	ID            string            `json:"id"`
	BufferTimeout string            `json:"buffer_timeout"`
	Filter        *SlotFilterConfig `json:"filter,omitempty"`
}

ClientSlotConfig is the JSON/HOCON-friendly representation of a client slot before duration parsing.

type ComponentHealth added in v0.3.1

type ComponentHealth struct {
	Status  string `json:"status"`
	Message string `json:"message,omitempty"`
}

ComponentHealth is a generic component status.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a pull-based reader that iterates frames at its own pace. It reads from a tiered log: journal files on disk (oldest), ring buffer in memory (recent), and live notification (current head, blocking wait).

func (*Consumer) Close

func (c *Consumer) Close() error

Close stops the consumer and removes it from the broker. Safe to call multiple times.

func (*Consumer) Cursor

func (c *Consumer) Cursor() uint64

Cursor returns the consumer's current position (next seq to read).

func (*Consumer) Next

func (c *Consumer) Next(ctx context.Context) (*Frame, error)

Next returns the next matching frame, blocking until one is available. Returns ErrFallenBehind if the consumer's cursor has fallen behind all available data. Returns ctx.Err() if the context is cancelled.
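A typical consume loop calls Next until it fails, then uses errors.Is to tell ErrFallenBehind apart from context cancellation. The sketch below shows that shape using local stand-ins (errFallenBehind, frame, nexter, fakeConsumer) so it compiles without the package. In real code these would be the package's ErrFallenBehind, Frame, and Consumer.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins so the sketch compiles alone.
var errFallenBehind = errors.New("consumer fallen behind: data no longer available")

type frame struct{ Seq uint64 }

type nexter interface {
	Next(ctx context.Context) (*frame, error)
}

// fakeConsumer yields frames with Seq 1..limit, then reports errFallenBehind.
type fakeConsumer struct{ seq, limit uint64 }

func (c *fakeConsumer) Next(ctx context.Context) (*frame, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if c.seq >= c.limit {
		return nil, errFallenBehind
	}
	c.seq++
	return &frame{Seq: c.seq}, nil
}

// drain calls Next until it fails and returns the frame count and the error.
func drain(ctx context.Context, c nexter, handle func(*frame)) (int, error) {
	for n := 0; ; n++ {
		f, err := c.Next(ctx)
		if err != nil {
			return n, err
		}
		handle(f)
	}
}

// runDemo drains a fake consumer and reports whether it ended by falling behind.
func runDemo() (int, bool) {
	n, err := drain(context.Background(), &fakeConsumer{limit: 3}, func(f *frame) {
		fmt.Println("frame", f.Seq)
	})
	return n, errors.Is(err, errFallenBehind)
}

func main() {
	n, fellBehind := runDemo()
	fmt.Println("frames:", n, "fell behind:", fellBehind)
}
```

How to recover after falling behind is up to the caller; creating a new consumer at a newer cursor is one plausible choice, but that policy is not specified by the documentation above.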

type ConsumerConfig

type ConsumerConfig struct {
	Cursor uint64       // starting position (next seq to read)
	Filter *EventFilter // nil = all frames
}

ConsumerConfig configures a new Consumer.

type DecodedDeviceValues added in v0.3.1

type DecodedDeviceValues struct {
	Name         string            `json:"name"`
	Bus          string            `json:"bus,omitempty"`
	Source       uint8             `json:"src"`
	Manufacturer string            `json:"manufacturer,omitempty"`
	ModelID      string            `json:"model_id,omitempty"`
	Values       []DecodedPGNValue `json:"values"`
}

DecodedDeviceValues groups decoded PGN values by device.

type DecodedPGNValue added in v0.3.1

type DecodedPGNValue struct {
	PGN         uint32 `json:"pgn"`
	Description string `json:"description"`
	Ts          string `json:"ts"`
	Seq         uint64 `json:"seq"`
	Fields      any    `json:"fields"`
}

DecodedPGNValue is a single PGN's last-known value decoded into named fields.

type Device

type Device struct {
	Bus              string `json:"bus,omitempty"`
	Source           uint8  `json:"src"`
	NAME             uint64 `json:"-"`
	NAMEHex          string `json:"name"`
	Manufacturer     string `json:"manufacturer"`
	ManufacturerCode uint16 `json:"manufacturer_code"`
	DeviceClass      uint8  `json:"device_class"`
	DeviceFunction   uint8  `json:"device_function"`
	DeviceInstance   uint8  `json:"device_instance"`
	UniqueNumber     uint32 `json:"unique_number,omitempty"`

	// PGN 126996 Product Information fields.
	ModelID         string `json:"model_id,omitempty"`
	SoftwareVersion string `json:"software_version,omitempty"`
	ModelVersion    string `json:"model_version,omitempty"`
	ModelSerial     string `json:"model_serial,omitempty"`
	ProductCode     uint16 `json:"product_code,omitempty"`

	// Per-source packet statistics.
	FirstSeen   time.Time `json:"first_seen"`
	LastSeen    time.Time `json:"last_seen"`
	PacketCount uint64    `json:"packet_count"`
	ByteCount   uint64    `json:"byte_count"`
}

Device represents an NMEA 2000 device discovered via ISO Address Claim (PGN 60928) and optionally enriched with Product Information (PGN 126996).
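For orientation, the 8-byte Address Claim payload is a little-endian 64-bit NAME whose bit fields map onto several of the struct fields above. The sketch below decodes it using the standard ISO 11783 / NMEA 2000 NAME layout. The type nameFields and function decodeName are hypothetical, and whether the package decodes the fields exactly this way is an assumption.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// nameFields holds a subset of the NAME bit fields (illustrative type).
type nameFields struct {
	Name             uint64
	UniqueNumber     uint32 // bits 0-20
	ManufacturerCode uint16 // bits 21-31
	DeviceInstance   uint8  // bits 32-39 (lower 3 bits + upper 5 bits)
	DeviceFunction   uint8  // bits 40-47
	DeviceClass      uint8  // bits 49-55 (bit 48 is reserved)
}

// decodeName unpacks an 8-byte PGN 60928 payload. It reports false if
// the payload is too short.
func decodeName(data []byte) (nameFields, bool) {
	if len(data) < 8 {
		return nameFields{}, false
	}
	name := binary.LittleEndian.Uint64(data)
	return nameFields{
		Name:             name,
		UniqueNumber:     uint32(name & 0x1FFFFF),
		ManufacturerCode: uint16((name >> 21) & 0x7FF),
		DeviceInstance:   uint8(name >> 32),
		DeviceFunction:   uint8(name >> 40),
		DeviceClass:      uint8((name >> 49) & 0x7F),
	}, true
}

func main() {
	payload := []byte{0x01, 0x00, 0xA0, 0x1C, 0x02, 0x82, 0x96, 0xC0}
	f, ok := decodeName(payload)
	fmt.Printf("%+v %v\n", f, ok)
	fmt.Printf("name hex: %016x\n", f.Name)
}
```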

type DeviceNotFoundError added in v0.4.0

type DeviceNotFoundError struct {
	Bus    string
	Source uint8
}

DeviceNotFoundError indicates that no device was found at the given bus and source address.

func (*DeviceNotFoundError) Error added in v0.4.0

func (e *DeviceNotFoundError) Error() string

type DeviceRegistry

type DeviceRegistry struct {
	// contains filtered or unexported fields
}

DeviceRegistry tracks NMEA 2000 devices discovered via PGN 60928. Thread-safe for concurrent reads (SSE streams) and writes (broker goroutine). Devices are keyed by (bus, source address) to support multiple CAN interfaces.

func NewDeviceRegistry

func NewDeviceRegistry() *DeviceRegistry

NewDeviceRegistry creates an empty device registry.

func (*DeviceRegistry) ExpireIdle added in v0.4.0

func (r *DeviceRegistry) ExpireIdle(cutoff time.Time) []BusSource

ExpireIdle removes all devices whose LastSeen is before cutoff. Returns the (bus, source) pairs of evicted entries.

func (*DeviceRegistry) Get

func (r *DeviceRegistry) Get(bus string, source uint8) *Device

Get returns a snapshot of the device at the given (bus, source), or nil.

func (*DeviceRegistry) HandleAddressClaim

func (r *DeviceRegistry) HandleAddressClaim(bus string, source uint8, data []byte) (dev *Device, evictedSrc uint8, evicted bool)

HandleAddressClaim processes a PGN 60928 ISO Address Claim. Returns the device if this is a new or changed device, nil otherwise. If a different source address on the same bus previously held the same NAME, that old entry is evicted and its source address is returned in evictedSrc.

func (*DeviceRegistry) HandleProductInfo

func (r *DeviceRegistry) HandleProductInfo(bus string, source uint8, data []byte) *Device

HandleProductInfo processes a PGN 126996 Product Information response. Returns the device if fields changed, nil if source is unknown or unchanged.

func (*DeviceRegistry) RecordPacket

func (r *DeviceRegistry) RecordPacket(bus string, source uint8, ts time.Time, dataLen int) bool

RecordPacket updates per-source packet statistics. Returns true if this is a previously unseen (bus, source) pair. Source 254 (Cannot Claim Address) and 255 (broadcast) are ignored since they are not real devices.

func (*DeviceRegistry) Snapshot

func (r *DeviceRegistry) Snapshot() []Device

Snapshot returns a copy of all known devices.

func (*DeviceRegistry) SnapshotJSON

func (r *DeviceRegistry) SnapshotJSON() json.RawMessage

SnapshotJSON returns the device list as pre-serialized JSON.

func (*DeviceRegistry) SourcesMissingProductInfo added in v0.4.0

func (r *DeviceRegistry) SourcesMissingProductInfo() []BusSource

SourcesMissingProductInfo returns (bus, source) pairs of devices that have a NAME (address claim received) but no product info (PGN 126996) yet.

func (*DeviceRegistry) SynthesizeFrames

func (r *DeviceRegistry) SynthesizeFrames(ts time.Time) []RxFrame

SynthesizeFrames generates RxFrame slices for PGN 60928 (Address Claim) and PGN 126996 (Product Info) from all known devices. Used to seed a remote broker's device registry on live stream connect.

type DeviceValues

type DeviceValues struct {
	Name         string     `json:"name"`
	Bus          string     `json:"bus,omitempty"`
	Source       uint8      `json:"src"`
	Manufacturer string     `json:"manufacturer,omitempty"`
	ModelID      string     `json:"model_id,omitempty"`
	Values       []PGNValue `json:"values"`
}

DeviceValues groups PGN values by device in the JSON response.

type EventFilter

type EventFilter struct {
	PGNs          []uint32
	ExcludePGNs   []uint32
	Manufacturers []string
	Instances     []uint8
	Names         []uint64 // 64-bit CAN NAMEs (include)
	ExcludeNames  []uint64 // 64-bit CAN NAMEs (exclude)
	Buses         []string // SocketCAN interface names (e.g. "can0", "can1")
}

EventFilter specifies which CAN frames a session receives. Categories are AND'd (all set categories must match), values within a category are OR'd (any value in the list matches).
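The AND-across-categories, OR-within-category rule can be sketched as below. The types filter and frameInfo are simplified, hypothetical stand-ins covering three of the categories. How the exclude lists interact with the include lists is not described here, so the sketch leaves them out.

```go
package main

import "fmt"

// filter is a simplified stand-in for EventFilter (include lists only).
type filter struct {
	PGNs      []uint32
	Instances []uint8
	Buses     []string
}

// frameInfo carries the frame attributes the filter inspects.
type frameInfo struct {
	PGN      uint32
	Instance uint8
	Bus      string
}

func contains[T comparable](list []T, v T) bool {
	for _, x := range list {
		if x == v {
			return true
		}
	}
	return false
}

// matches applies AND across the categories that are set and OR across
// the values inside each category. An unset category matches everything.
func (f filter) matches(fr frameInfo) bool {
	if len(f.PGNs) > 0 && !contains(f.PGNs, fr.PGN) {
		return false
	}
	if len(f.Instances) > 0 && !contains(f.Instances, fr.Instance) {
		return false
	}
	if len(f.Buses) > 0 && !contains(f.Buses, fr.Bus) {
		return false
	}
	return true
}

func main() {
	f := filter{PGNs: []uint32{129025, 129026}, Buses: []string{"can0"}}
	fmt.Println(f.matches(frameInfo{PGN: 129026, Bus: "can0"}))
	fmt.Println(f.matches(frameInfo{PGN: 129026, Bus: "can1"}))
	fmt.Println(f.matches(frameInfo{PGN: 130306, Bus: "can0"}))
}
```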

func ParseFilterParams

func ParseFilterParams(r *http.Request) (*EventFilter, error)

ParseFilterParams reads optional filter query params from a request. Supported params: pgn (decimal), manufacturer (name or code), instance (decimal), name (hex CAN NAME), bus (SocketCAN interface name). Returns nil filter if no params are set.

func (*EventFilter) IsEmpty

func (f *EventFilter) IsEmpty() bool

IsEmpty returns true if no filter criteria are set.

type EventLog

type EventLog struct {
	// contains filtered or unexported fields
}

EventLog is a fixed-size ring buffer of replication events.

func NewEventLog

func NewEventLog() *EventLog

NewEventLog creates an empty event log.

func (*EventLog) Recent

func (l *EventLog) Recent(n int) []ReplicationEvent

Recent returns up to n events, newest first.

func (*EventLog) Record

func (l *EventLog) Record(typ ReplicationEventType, detail map[string]any)

Record appends a new event to the log.

type FastPacketAssembler

type FastPacketAssembler struct {
	// contains filtered or unexported fields
}

FastPacketAssembler reassembles multi-frame fast-packet PGNs.

Fast-packet protocol:

  • Frame 0: byte[0] = seq_counter(3 bits) | frame_number(5 bits), byte[1] = total_bytes, bytes[2:8] = first 6 data bytes
  • Frame N: byte[0] = seq_counter(3 bits) | frame_number(5 bits), bytes[1:8] = next 7 data bytes
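The layout above can be exercised with a small reassembly sketch for a single (PGN, source) stream. The assembly type is hypothetical and deliberately minimal: it has no timeout handling, and it simply ignores out-of-order frames, which may differ from what FastPacketAssembler does.

```go
package main

import "fmt"

// assembly tracks one in-progress transfer for a single (PGN, source) pair.
type assembly struct {
	active bool
	seq    uint8 // sequence counter of the transfer in progress
	next   uint8 // frame number expected next
	total  int   // total payload length announced by frame 0
	buf    []byte
}

// process consumes one CAN frame payload and returns the reassembled
// payload once all bytes have arrived, nil otherwise.
func (a *assembly) process(frame []byte) []byte {
	if len(frame) < 2 {
		return nil
	}
	seq, num := frame[0]>>5, frame[0]&0x1F
	switch {
	case num == 0:
		// Frame 0 starts (or restarts) a transfer.
		a.active, a.seq, a.next, a.total = true, seq, 1, int(frame[1])
		a.buf = append(a.buf[:0], frame[2:]...)
	case a.active && seq == a.seq && num == a.next:
		a.buf = append(a.buf, frame[1:]...)
		a.next++
	default:
		return nil // unexpected frame: ignored in this sketch
	}
	if len(a.buf) < a.total {
		return nil
	}
	a.active = false
	return append([]byte(nil), a.buf[:a.total]...) // trim padding, copy out
}

func main() {
	var a assembly
	frames := [][]byte{
		{0x40, 10, 0, 1, 2, 3, 4, 5},
		{0x41, 6, 7, 8, 9, 0xFF, 0xFF, 0xFF},
	}
	for _, f := range frames {
		if p := a.process(f); p != nil {
			fmt.Println("payload:", p)
		}
	}
}
```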

func NewFastPacketAssembler

func NewFastPacketAssembler(timeout time.Duration) *FastPacketAssembler

NewFastPacketAssembler creates a new assembler with the given reassembly timeout.

func (*FastPacketAssembler) Process

func (a *FastPacketAssembler) Process(pgn uint32, source uint8, data []byte, now time.Time) []byte

Process handles a CAN frame that is part of a fast-packet transfer. Returns the complete reassembled payload when all frames are received, nil otherwise.

func (*FastPacketAssembler) PurgeStale

func (a *FastPacketAssembler) PurgeStale(now time.Time)

PurgeStale removes any in-progress assemblies older than the timeout.

type Frame

type Frame struct {
	Seq       uint64
	Timestamp time.Time
	Header    CANHeader
	Bus       string // SocketCAN interface name (empty for journal-replayed frames)
	Data      []byte // raw CAN payload
	// contains filtered or unexported fields
}

Frame is a single CAN frame returned by Consumer.Next.

func (*Frame) JSON

func (f *Frame) JSON() ([]byte, error)

JSON returns the pre-serialized JSON for this frame (SSE format). If not cached (e.g. from journal replay), it serializes on demand.

type HealthConfig added in v0.3.1

type HealthConfig struct {
	Broker     *Broker
	ReplStatus func() *ReplicationStatus // nil if replication not configured

	// BusSilenceThreshold is the duration after which no frames indicates
	// a CAN bus problem. Zero disables bus silence detection.
	BusSilenceThreshold time.Duration
}

HealthConfig configures the health check endpoint.

type HealthStatus added in v0.3.1

type HealthStatus struct {
	Status      string                     `json:"status"` // "ok", "degraded", or "unhealthy"
	Broker      BrokerHealth               `json:"broker"`
	Replication *ReplicationHealth         `json:"replication,omitempty"`
	Components  map[string]ComponentHealth `json:"components,omitempty"`
}

HealthStatus is the structured response from the /healthz endpoint.

type HoleTracker

type HoleTracker struct {
	// contains filtered or unexported fields
}

HoleTracker tracks gaps (holes) in a sequence number space. Holes are non-overlapping, sorted by Start, and represent ranges of missing data.

The typical case is 0 to 3 holes. All operations do a linear scan of the slice, which is fine at that cardinality.

func NewHoleTracker

func NewHoleTracker() *HoleTracker

NewHoleTracker creates an empty hole tracker.

func (*HoleTracker) Add

func (h *HoleTracker) Add(start, end uint64)

Add inserts a new hole [start, end). Merges with any overlapping or adjacent holes. No-op if start >= end.
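The merge rule can be illustrated with a linear pass over a sorted slice of half-open ranges. This is a sketch under assumptions: seqRange and addHole are hypothetical names, and the real HoleTracker may structure the operation differently.

```go
package main

import "fmt"

// seqRange is a half-open range [Start, End) of missing sequence numbers.
type seqRange struct{ Start, End uint64 }

// addHole inserts [start, end) into a sorted, non-overlapping slice and
// merges it with any hole it overlaps or touches. It returns a new slice.
func addHole(holes []seqRange, start, end uint64) []seqRange {
	if start >= end {
		return holes // empty range: no-op
	}
	out := make([]seqRange, 0, len(holes)+1)
	inserted := false
	for _, h := range holes {
		switch {
		case h.End < start:
			// Entirely before the new hole and not adjacent.
			out = append(out, h)
		case h.Start > end:
			// Entirely after: emit the (possibly grown) new hole first.
			if !inserted {
				out = append(out, seqRange{start, end})
				inserted = true
			}
			out = append(out, h)
		default:
			// Overlapping or adjacent: absorb h into the new hole.
			if h.Start < start {
				start = h.Start
			}
			if h.End > end {
				end = h.End
			}
		}
	}
	if !inserted {
		out = append(out, seqRange{start, end})
	}
	return out
}

func main() {
	var holes []seqRange
	holes = addHole(holes, 100, 200)
	holes = addHole(holes, 200, 300) // adjacent: merges into [100, 300)
	holes = addHole(holes, 10, 20)   // disjoint: stays separate
	fmt.Println(holes)
}
```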

func (*HoleTracker) ContinuousThrough

func (h *HoleTracker) ContinuousThrough(cursor uint64) uint64

ContinuousThrough returns the highest sequence number with no holes below it. Given a base cursor and the hole set, this is the seq just before the first hole (or the cursor itself if no holes exist before it).

Examples:

  • cursor=100, holes=[(200,300)] -> returns 199 (continuous through 199)
  • cursor=100, holes=[(100,200)] -> returns 99 (hole starts at cursor)
  • cursor=100, no holes -> returns max uint64 (no bound from holes)

func (*HoleTracker) Fill

func (h *HoleTracker) Fill(start, end uint64) bool

Fill marks [start, end) as received, removing that range from any holes. Returns true if any holes were actually affected.
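Filling a range can shrink a hole, remove it, or split it in two. The sketch below shows one way to do that over a sorted slice of half-open ranges. The names seqRange and fillHoles are hypothetical and the code is not taken from the package.

```go
package main

import "fmt"

// seqRange is a half-open range [Start, End) of missing sequence numbers.
type seqRange struct{ Start, End uint64 }

// fillHoles removes [start, end) from every hole it intersects and
// reports whether any hole changed. It returns a new slice.
func fillHoles(holes []seqRange, start, end uint64) ([]seqRange, bool) {
	if start >= end {
		return holes, false
	}
	out := make([]seqRange, 0, len(holes)+1)
	changed := false
	for _, h := range holes {
		if h.End <= start || h.Start >= end {
			out = append(out, h) // no intersection: keep as-is
			continue
		}
		changed = true
		if h.Start < start {
			out = append(out, seqRange{h.Start, start}) // left remainder
		}
		if h.End > end {
			out = append(out, seqRange{end, h.End}) // right remainder
		}
	}
	return out, changed
}

func main() {
	holes := []seqRange{{100, 200}}
	holes, changed := fillHoles(holes, 120, 150)
	fmt.Println(holes, changed) // the hole is split around the filled range
}
```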

func (*HoleTracker) Holes

func (h *HoleTracker) Holes() []SeqRange

Holes returns a copy of current holes, sorted by Start.

func (*HoleTracker) Len

func (h *HoleTracker) Len() int

Len returns the number of holes.

func (*HoleTracker) TotalMissing

func (h *HoleTracker) TotalMissing() uint64

TotalMissing returns the total number of missing sequence numbers across all holes.

type InstanceManager

type InstanceManager struct {
	// contains filtered or unexported fields
}

InstanceManager manages per-instance state on the cloud side.

func NewInstanceManager

func NewInstanceManager(dataDir string, logger *slog.Logger) (*InstanceManager, error)

NewInstanceManager creates a new instance manager, loading any persisted state.

func (*InstanceManager) Get

func (im *InstanceManager) Get(id string) *InstanceState

Get returns the instance state, or nil if not found.

func (*InstanceManager) GetOrCreate

func (im *InstanceManager) GetOrCreate(id string) *InstanceState

GetOrCreate returns the instance state, creating it if necessary.

func (*InstanceManager) List

func (im *InstanceManager) List() []InstanceSummary

List returns a snapshot of all instance IDs and their basic state.

func (*InstanceManager) SetDeviceIdleTimeout added in v0.4.0

func (im *InstanceManager) SetDeviceIdleTimeout(d time.Duration)

SetDeviceIdleTimeout configures the device idle timeout for all existing and future instance brokers. Must be called before instances connect.

func (*InstanceManager) SetInstancePaused

func (im *InstanceManager) SetInstancePaused(instanceID string, paused bool)

SetInstancePaused pauses or unpauses journal writing for a specific instance. Used by the JournalKeeper overflow policy to stop/resume writes.

func (*InstanceManager) SetJournalRotation added in v0.3.1

func (im *InstanceManager) SetJournalRotation(duration time.Duration, size int64)

SetJournalRotation configures rotation for live journal writers. Must be called before any connections are accepted. Retroactively updates all existing instances loaded at startup.

func (*InstanceManager) SetOnRotate

func (im *InstanceManager) SetOnRotate(fn func(instanceID string, rf keeper.RotatedFile))

SetOnRotate sets a callback invoked when any instance's journal or backfill file is rotated. Used by the cloud binary to feed the JournalKeeper. Must be called before any connections are accepted. Retroactively updates all existing instances loaded at startup.

func (*InstanceManager) SetRingSize added in v0.4.0

func (im *InstanceManager) SetRingSize(size int)

SetRingSize configures the ring buffer size for all existing and future instance brokers. Must be called before instances connect.

func (*InstanceManager) Shutdown

func (im *InstanceManager) Shutdown()

Shutdown stops all instance brokers and persists state.

type InstanceState

type InstanceState struct {
	ID               string       `json:"id"`
	Cursor           uint64       `json:"cursor"`        // continuous data through this seq
	BoatHeadSeq      uint64       `json:"boat_head_seq"` // last reported by boat
	BoatJournalBytes uint64       `json:"boat_journal_bytes"`
	LastSeen         time.Time    `json:"last_seen"`
	Connected        bool         `json:"-"`
	HoleTracker      *HoleTracker `json:"-"`

	// Persisted hole state
	PersistedHoles []SeqRange `json:"holes,omitempty"`
	// contains filtered or unexported fields
}

InstanceState tracks the replication state for a single boat instance on the cloud side. Thread-safe.

func (*InstanceState) RecentEvents

func (s *InstanceState) RecentEvents(n int) []ReplicationEvent

RecentEvents returns up to n recent replication events, newest first.

func (*InstanceState) RecordEvent

func (s *InstanceState) RecordEvent(typ ReplicationEventType, detail map[string]any)

RecordEvent appends a diagnostic event to this instance's event log.

func (*InstanceState) Status

func (s *InstanceState) Status() InstanceStatus

Status returns a thread-safe snapshot of this instance's replication state.

type InstanceStatus

type InstanceStatus struct {
	ID               string     `json:"id"`
	Connected        bool       `json:"connected"`
	Cursor           uint64     `json:"cursor"`
	BoatHeadSeq      uint64     `json:"boat_head_seq"`
	BoatJournalBytes uint64     `json:"boat_journal_bytes"`
	Holes            []SeqRange `json:"holes,omitzero"`
	LagSeqs          uint64     `json:"lag_seqs"`
	LastSeen         time.Time  `json:"last_seen"`
}

InstanceStatus is a detailed snapshot of an instance for status reporting.

type InstanceSummary

type InstanceSummary struct {
	ID          string    `json:"id"`
	Connected   bool      `json:"connected"`
	Cursor      uint64    `json:"cursor"`
	BoatHeadSeq uint64    `json:"boat_head_seq"`
	Holes       int       `json:"holes"`
	LagSeqs     uint64    `json:"lag_seqs"`
	LastSeen    time.Time `json:"last_seen"`
}

InstanceSummary is a snapshot of an instance for listing.

type JournalConfig

type JournalConfig struct {
	Dir            string
	Prefix         string                   // default: "nmea2k"
	BlockSize      int                      // default: 262144, power of 2, min 4096
	Compression    journal.CompressionType  // default: CompressionNone
	RotateDuration time.Duration            // 0 = no limit
	RotateSize     int64                    // 0 = no limit
	RotateCount    int64                    // 0 = no limit
	OnRotate       func(keeper.RotatedFile) // called after a journal file is closed by rotation
	Logger         *slog.Logger
}

JournalConfig configures the journal writer.
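The BlockSize comment states three constraints: a default of 262144, a power of 2, and a minimum of 4096. A minimal sketch of how such a value could be checked before building a config follows. normalizeBlockSize is a hypothetical helper, and treating 0 as "use the default" is an assumption based on the field comment.

```go
package main

import "fmt"

const (
	defaultBlockSize = 262144 // from the BlockSize field comment
	minBlockSize     = 4096   // from the BlockSize field comment
)

// normalizeBlockSize applies the default for 0 and rejects values that are
// too small or not a power of two.
func normalizeBlockSize(n int) (int, error) {
	if n == 0 {
		return defaultBlockSize, nil
	}
	if n < minBlockSize {
		return 0, fmt.Errorf("block size %d is below the minimum %d", n, minBlockSize)
	}
	if n&(n-1) != 0 { // a power of two has exactly one bit set
		return 0, fmt.Errorf("block size %d is not a power of 2", n)
	}
	return n, nil
}

func main() {
	for _, n := range []int{0, 4096, 5000, 2048} {
		size, err := normalizeBlockSize(n)
		fmt.Println(n, size, err)
	}
}
```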

type JournalWriter

type JournalWriter struct {
	// contains filtered or unexported fields
}

JournalWriter writes CAN frames to block-based journal files.

func NewJournalWriter

func NewJournalWriter(cfg JournalConfig, devices *DeviceRegistry, ch <-chan RxFrame) (*JournalWriter, error)

NewJournalWriter creates a writer. Call Run to start.

func (*JournalWriter) Run

func (w *JournalWriter) Run(ctx context.Context) error

Run is the main loop. Blocks until ctx is cancelled or the channel is closed.

func (*JournalWriter) SetPaused

func (w *JournalWriter) SetPaused(p bool)

SetPaused sets whether the writer should discard incoming frames. Used by the overflow policy to stop writes when disk is full.

func (*JournalWriter) Stats added in v0.4.0

func (w *JournalWriter) Stats() JournalWriterStats

Stats returns a point-in-time snapshot of journal write metrics.

type JournalWriterStats added in v0.4.0

type JournalWriterStats struct {
	BlocksWritten          uint64        // total blocks flushed
	BytesWritten           uint64        // total bytes written to disk
	LastBlockWriteDuration time.Duration // duration of last block write
}

JournalWriterStats holds point-in-time journal write metrics.

type LoopbackBus added in v0.4.0

type LoopbackBus struct {
	// contains filtered or unexported fields
}

LoopbackBus implements CANBus as an in-memory loopback: transmitted frames are echoed back as received frames (matching SocketCAN's kernel echo behavior). Useful for testing and development on platforms without SocketCAN (e.g. macOS).

func NewLoopbackBus added in v0.4.0

func NewLoopbackBus(name string, bufSize int, logger *slog.Logger) *LoopbackBus

NewLoopbackBus creates a CANBus that echoes transmitted frames back as received frames. The buffer size controls how many frames can be queued via Inject before blocking.

func (*LoopbackBus) Inject added in v0.4.0

func (b *LoopbackBus) Inject(frame RxFrame) bool

Inject adds a frame to the bus as if it were received from CAN hardware. The Bus field is set to the loopback bus name. Non-blocking: drops the frame if the internal buffer is full.
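The non-blocking, drop-when-full behavior corresponds to a common Go idiom: a select with a default case on a buffered channel. The sketch below is standalone. frame and tryInject are hypothetical stand-ins, and whether LoopbackBus uses exactly this pattern internally is an assumption.

```go
package main

import "fmt"

// frame is a hypothetical stand-in for RxFrame.
type frame struct{ Seq uint64 }

// tryInject queues f without blocking and reports whether it was accepted.
func tryInject(ch chan<- frame, f frame) bool {
	select {
	case ch <- f:
		return true
	default:
		return false // buffer full: the frame is dropped
	}
}

func main() {
	ch := make(chan frame, 2) // buffer size 2, like a small bufSize
	fmt.Println(tryInject(ch, frame{1})) // true
	fmt.Println(tryInject(ch, frame{2})) // true
	fmt.Println(tryInject(ch, frame{3})) // false
}
```

A caller that cannot tolerate drops would check the returned bool and retry or count the loss.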

func (*LoopbackBus) Name added in v0.4.0

func (b *LoopbackBus) Name() string

func (*LoopbackBus) ReadFrames added in v0.4.0

func (b *LoopbackBus) ReadFrames(ctx context.Context, rxFrames chan<- RxFrame) error

func (*LoopbackBus) WriteFrames added in v0.4.0

func (b *LoopbackBus) WriteFrames(ctx context.Context, txFrames <-chan TxRequest) error

type MQTTBridge added in v0.4.0

type MQTTBridge struct {
	// contains filtered or unexported fields
}

MQTTBridge subscribes to the broker's frame stream and publishes each frame to an MQTT broker. Frames are published to per-PGN topics ({prefix}/frames/{pgn}) with the pre-serialized JSON payload.

func NewMQTTBridge added in v0.4.0

func NewMQTTBridge(cfg MQTTBridgeConfig, broker *Broker) *MQTTBridge

NewMQTTBridge creates a new MQTT bridge. Call Run to start publishing.

func (*MQTTBridge) Run added in v0.4.0

func (m *MQTTBridge) Run(ctx context.Context) error

Run connects to the MQTT broker, subscribes to the lplex broker's frame stream, and publishes frames until ctx is cancelled. Reconnection is handled automatically by the paho MQTT client.

type MQTTBridgeConfig added in v0.4.0

type MQTTBridgeConfig struct {
	// BrokerURL is the MQTT broker address (e.g. "tcp://localhost:1883").
	BrokerURL string

	// TopicPrefix is prepended to all published topics (default "lplex").
	// Frames are published to {prefix}/frames/{pgn} or {prefix}/frames/all.
	TopicPrefix string

	// ClientID identifies this client to the MQTT broker (default "lplex-server").
	ClientID string

	// QoS is the MQTT quality of service level (0, 1, or 2; default 0).
	QoS byte

	// Username and Password for MQTT broker authentication (optional).
	Username string
	Password string

	// Filter restricts which CAN frames are published.
	Filter *EventFilter

	// Logger for diagnostic output.
	Logger *slog.Logger
}

MQTTBridgeConfig configures the MQTT publisher bridge.
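For subscribers, the topic layout {prefix}/frames/{pgn} can be built as in the sketch below. frameTopic is a hypothetical helper, and formatting the PGN as a decimal number is an assumption, since the documentation does not state the number format used in topics.

```go
package main

import "fmt"

// frameTopic builds the per-PGN topic. An empty prefix falls back to the
// documented default "lplex".
func frameTopic(prefix string, pgn uint32) string {
	if prefix == "" {
		prefix = "lplex"
	}
	return fmt.Sprintf("%s/frames/%d", prefix, pgn) // decimal PGN is assumed
}

func main() {
	fmt.Println(frameTopic("", 129025))     // lplex/frames/129025
	fmt.Println(frameTopic("boat1", 60928)) // boat1/frames/60928
}
```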

type PGNValue

type PGNValue struct {
	PGN  uint32 `json:"pgn"`
	Ts   string `json:"ts"`
	Data string `json:"data"`
	Seq  uint64 `json:"seq"`
}

PGNValue is a single PGN's last-known value in the JSON response.

type ReplicationClient

type ReplicationClient struct {
	// contains filtered or unexported fields
}

ReplicationClient streams frames from the local broker to a cloud replication server over gRPC. It runs two independent streams: one for live frames (from the broker's head forward) and one for backfilling historical gaps (from journal files). On disconnect, it reconnects with exponential backoff and resumes both streams.

func NewReplicationClient

func NewReplicationClient(cfg ReplicationClientConfig, broker *Broker) *ReplicationClient

NewReplicationClient creates a new replication client. Call Run to start.

func (*ReplicationClient) Run

func (c *ReplicationClient) Run(ctx context.Context) error

Run is the main loop. Connects to the cloud, performs handshake, and starts live + backfill streams. Reconnects on failure with exponential backoff. Blocks until ctx is cancelled.
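A reconnect loop with exponential backoff generally has the shape sketched below. This is not the ReplicationClient implementation: runWithBackoff, nextBackoff, and the delay values are hypothetical, and the actual initial delay, cap, and reset policy are not documented here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// nextBackoff doubles the delay and caps it at max.
func nextBackoff(cur, max time.Duration) time.Duration {
	next := cur * 2
	if next > max || next <= 0 { // next <= 0 guards against overflow
		return max
	}
	return next
}

// runWithBackoff calls connect repeatedly until ctx is cancelled. The delay
// grows after each failure and resets after a session that ended cleanly.
func runWithBackoff(ctx context.Context, initial, max time.Duration, connect func(context.Context) error) error {
	delay := initial
	for {
		err := connect(ctx)
		if ctx.Err() != nil {
			return ctx.Err()
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		if err != nil {
			delay = nextBackoff(delay, max)
		} else {
			delay = initial
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	attempts := 0
	err := runWithBackoff(ctx, time.Millisecond, 8*time.Millisecond, func(context.Context) error {
		attempts++
		if attempts == 4 {
			cancel() // stop the loop after a few failed attempts
		}
		return errors.New("connection refused")
	})
	fmt.Println(attempts, err) // 4 context canceled
}
```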

func (*ReplicationClient) SetAlerts added in v0.4.0

func (c *ReplicationClient) SetAlerts(am *AlertManager)

SetAlerts sets the alert manager for replication alerts. Must be called before Run.

func (*ReplicationClient) Status

Status returns the current replication state for status reporting.

type ReplicationClientConfig

type ReplicationClientConfig struct {
	Target     string // cloud gRPC address (host:port)
	InstanceID string
	CertFile   string // client certificate
	KeyFile    string // client private key
	CAFile     string // CA certificate for verifying server
	Logger     *slog.Logger

	// Resource protection tuning. Zero values use defaults from
	// replication_limits.go.
	MaxLiveLag              uint64        // max frames live can lag before reconnect (default: DefaultMaxLiveLag)
	LagCheckInterval        int           // check lag every N frames sent (default: DefaultLagCheckInterval)
	MinLagReconnectInterval time.Duration // min wait between lag-triggered reconnects (default: DefaultMinLagReconnectInterval)
}

ReplicationClientConfig configures the boat-side replication client.

type ReplicationEvent

type ReplicationEvent struct {
	Time   time.Time            `json:"time"`
	Type   ReplicationEventType `json:"type"`
	Detail map[string]any       `json:"detail,omitempty"`
}

ReplicationEvent is a single diagnostic event from the replication pipeline.

type ReplicationEventType

type ReplicationEventType string

ReplicationEventType identifies the kind of replication event.

const (
	EventLiveStart     ReplicationEventType = "live_start"
	EventLiveStop      ReplicationEventType = "live_stop"
	EventBackfillStart ReplicationEventType = "backfill_start"
	EventBackfillStop  ReplicationEventType = "backfill_stop"
	EventBlockReceived ReplicationEventType = "block_received"
	EventCheckpoint    ReplicationEventType = "checkpoint"
)

type ReplicationHealth added in v0.3.1

type ReplicationHealth struct {
	Status            string    `json:"status"` // "ok", "degraded", or "disconnected"
	Connected         bool      `json:"connected"`
	LiveLag           uint64    `json:"live_lag"`
	BackfillRemaining uint64    `json:"backfill_remaining_seqs"`
	LastAck           time.Time `json:"last_ack,omitempty"`
}

ReplicationHealth reports the replication client's health.

type ReplicationServer

type ReplicationServer struct {
	pb.UnimplementedReplicationServer

	// Resource protection tuning. Zero values use defaults from
	// replication_limits.go.
	MaxFrameRate float64 // max frames/sec per live stream (default: DefaultMaxFrameRate)
	RateBurst    int     // burst allowance for transient spikes (default: DefaultRateBurst)
	MaxLiveLag   uint64  // max frames live can lag before closing stream (default: DefaultMaxLiveLag)
	// contains filtered or unexported fields
}

ReplicationServer implements the gRPC Replication service.
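A sustained rate combined with a burst allowance is the usual parameterization of a token bucket. The sketch below shows how MaxFrameRate and RateBurst would interact under that model. It is an assumption that the server uses a token bucket at all, tokenBucket is a hypothetical type, and time is passed in as seconds to keep the example deterministic.

```go
package main

import (
	"fmt"
	"math"
)

// tokenBucket admits up to burst frames at once and refills at rate tokens/sec.
type tokenBucket struct {
	rate   float64 // tokens added per second (cf. MaxFrameRate)
	burst  float64 // maximum stored tokens (cf. RateBurst)
	tokens float64
	last   float64 // time of the previous call, in seconds
}

func newTokenBucket(rate float64, burst int) *tokenBucket {
	return &tokenBucket{rate: rate, burst: float64(burst), tokens: float64(burst)}
}

// allow refills the bucket for the elapsed time, then spends one token if available.
func (b *tokenBucket) allow(now float64) bool {
	b.tokens = math.Min(b.burst, b.tokens+(now-b.last)*b.rate)
	b.last = now
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	b := newTokenBucket(2, 3) // 2 frames/sec sustained, bursts of 3
	for i := 0; i < 4; i++ {
		fmt.Println(b.allow(0)) // true, true, true, false
	}
	fmt.Println(b.allow(0.5)) // true: half a second refills one token
	fmt.Println(b.allow(0.5)) // false
}
```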

func NewReplicationServer

func NewReplicationServer(im *InstanceManager, logger *slog.Logger) *ReplicationServer

NewReplicationServer creates a new replication gRPC server.

func (*ReplicationServer) Backfill

Backfill handles bulk block transfer for filling gaps.

func (*ReplicationServer) GetInstanceBroker

func (s *ReplicationServer) GetInstanceBroker(instanceID string) *Broker

GetInstanceBroker returns the broker for an instance (starting it if needed). Used by HTTP handlers to serve SSE from a cloud instance.

func (*ReplicationServer) GetInstanceState

func (s *ReplicationServer) GetInstanceState(instanceID string) *InstanceState

GetInstanceState returns the instance state for status reporting.

func (*ReplicationServer) Handshake

Handshake exchanges sync state between boat and cloud.

func (*ReplicationServer) Live

Live handles the realtime frame stream from a boat.

type ReplicationStatus

type ReplicationStatus struct {
	Connected             bool       `json:"connected"`
	InstanceID            string     `json:"instance_id"`
	LocalHeadSeq          uint64     `json:"local_head_seq"`
	CloudCursor           uint64     `json:"cloud_cursor"`
	Holes                 []SeqRange `json:"holes,omitzero"`
	LiveLag               uint64     `json:"live_lag"`
	BackfillRemainingSeqs uint64     `json:"backfill_remaining_seqs"`
	LastAck               time.Time  `json:"last_ack,omitempty"`
	LiveFramesSent        uint64     `json:"live_frames_sent"`
	BackfillBlocksSent    uint64     `json:"backfill_blocks_sent"`
	BackfillBytesSent     uint64     `json:"backfill_bytes_sent"`
	Reconnects            uint64     `json:"reconnects"`
}

ReplicationStatus is the boat-side view of replication state.

type RxFrame

type RxFrame struct {
	Timestamp time.Time
	Header    CANHeader
	Data      []byte
	Bus       string // SocketCAN interface name (e.g. "can0"); empty for single-bus or unknown
	Seq       uint64 // assigned by broker in handleFrame; zero when fed by external code
}

RxFrame is a reassembled CAN frame ready for the broker.

type SeqRange

type SeqRange struct {
	Start uint64 // inclusive
	End   uint64 // exclusive
}

SeqRange represents a half-open interval [Start, End) of sequence numbers.

type SequenceGapError added in v0.4.0

type SequenceGapError struct {
	ExpectedSeq uint64
	ActualSeq   uint64
}

SequenceGapError indicates that journal data has a gap in sequence numbers. The consumer expected ExpectedSeq but the next available sequence was ActualSeq.

func (*SequenceGapError) Error added in v0.4.0

func (e *SequenceGapError) Error() string
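Because the error is a pointer type with exported fields, callers can detect it with errors.As and inspect the gap. The sketch below uses a local mirror type so that it runs standalone. sequenceGapError, readNext, and the error message format are hypothetical, and which lplex calls return this error is not specified in this section.

```go
package main

import (
	"errors"
	"fmt"
)

// sequenceGapError mirrors the exported fields of SequenceGapError.
type sequenceGapError struct {
	ExpectedSeq uint64
	ActualSeq   uint64
}

func (e *sequenceGapError) Error() string {
	return fmt.Sprintf("sequence gap: expected %d, got %d", e.ExpectedSeq, e.ActualSeq)
}

// readNext is a hypothetical call that fails with a wrapped gap error.
func readNext() error {
	return fmt.Errorf("replay: %w", &sequenceGapError{ExpectedSeq: 1001, ActualSeq: 1500})
}

// missingFrames reports how many sequence numbers were skipped, if err is a gap.
func missingFrames(err error) (uint64, bool) {
	var gap *sequenceGapError
	if errors.As(err, &gap) {
		return gap.ActualSeq - gap.ExpectedSeq, true
	}
	return 0, false
}

func main() {
	n, ok := missingFrames(readNext())
	fmt.Println(n, ok) // 499 true
}
```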

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server handles HTTP API requests for lplex.

func NewServer

func NewServer(broker *Broker, logger *slog.Logger, policy sendpolicy.SendPolicy) *Server

NewServer creates a new HTTP server wired to the given broker. Use SetAPIKey to enable authentication.

func (*Server) HandleEphemeralSSE

func (s *Server) HandleEphemeralSSE(w http.ResponseWriter, r *http.Request)

HandleEphemeralSSE handles GET /events. Ephemeral SSE stream, no session, no ACK, no replay. Optional query params for filtering: pgn, manufacturer, instance, name (hex).
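A client consuming this stream needs only a Server-Sent Events line parser. The sketch below follows the standard SSE framing (data: lines, events terminated by a blank line, lines starting with a colon ignored) and works on any io.Reader, such as an HTTP response body. readSSE and the JSON payloads in the sample stream are hypothetical; the actual event payload format is not described in this section.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readSSE calls handle once per event with the event's joined data lines.
func readSSE(r io.Reader, handle func(data string)) error {
	sc := bufio.NewScanner(r)
	var data []string
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "": // a blank line ends the current event
			if len(data) > 0 {
				handle(strings.Join(data, "\n"))
				data = data[:0]
			}
		case strings.HasPrefix(line, "data:"):
			// Strip the field name and at most one leading space.
			v := strings.TrimPrefix(strings.TrimPrefix(line, "data:"), " ")
			data = append(data, v)
		}
		// Comments (":...") and other fields are ignored in this sketch.
	}
	return sc.Err()
}

// collect gathers every event's data from a stream held in a string.
func collect(stream string) ([]string, error) {
	var out []string
	err := readSSE(strings.NewReader(stream), func(d string) { out = append(out, d) })
	return out, err
}

func main() {
	// Hypothetical stream contents.
	events, err := collect("data: {\"pgn\":129025}\n\n: keepalive\n\ndata: {\"pgn\":129026}\n\n")
	if err != nil {
		fmt.Println("read error:", err)
		return
	}
	for _, e := range events {
		fmt.Println(e)
	}
}
```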

func (*Server) HandleFunc

func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))

HandleFunc registers an additional HTTP handler on the server's mux.

func (*Server) HandleWebSocket added in v0.4.0

func (s *Server) HandleWebSocket(w http.ResponseWriter, r *http.Request)

HandleWebSocket upgrades an HTTP connection to a WebSocket and provides bidirectional communication: CAN frames flow to the client (filtered like /events), and the client can send CAN frames (like /send) on the same connection.

Query params for filtering are the same as /events: pgn, exclude_pgn, manufacturer, instance, name, exclude_name.

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler.

func (*Server) SetAPIKey added in v0.4.0

func (s *Server) SetAPIKey(key string)

SetAPIKey enables API key authentication. When set, all HTTP requests must include the key via either the Authorization header (Bearer token) or the X-API-Key header. Health/liveness endpoints are exempt.
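On the client side, either header form can be attached to a request as sketched below. newRequest is a hypothetical helper, and the URL (host and port) is an assumption chosen for illustration.

```go
package main

import (
	"fmt"
	"net/http"
)

// newRequest builds a GET request carrying the API key either as a Bearer
// token in the Authorization header or in the X-API-Key header.
func newRequest(url, apiKey string, bearer bool) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	if bearer {
		req.Header.Set("Authorization", "Bearer "+apiKey)
	} else {
		req.Header.Set("X-API-Key", apiKey)
	}
	return req, nil
}

func main() {
	// Hypothetical server address.
	req, err := newRequest("http://localhost:8089/events", "s3cret", true)
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	fmt.Println(req.Header.Get("Authorization")) // Bearer s3cret
}
```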

func (*Server) SetReadOnly added in v0.4.0

func (s *Server) SetReadOnly(readOnly bool)

SetReadOnly completely disables the /send and /query endpoints, regardless of the send policy. This is a defense-in-depth kill switch.

func (*Server) SetSendPolicy added in v0.4.0

func (s *Server) SetSendPolicy(policy sendpolicy.SendPolicy)

SetSendPolicy updates the send policy at runtime (for config reload).

func (*Server) SetSendRateLimit added in v0.4.0

func (s *Server) SetSendRateLimit(rps float64, burst int)

SetSendRateLimit enables rate limiting on the /send and /query endpoints. rps is the sustained requests per second; burst is the maximum burst size.

type SessionNotFoundError added in v0.4.0

type SessionNotFoundError struct {
	SessionID string
}

SessionNotFoundError indicates that a client session ID was not found.

func (*SessionNotFoundError) Error added in v0.4.0

func (e *SessionNotFoundError) Error() string

type SlotFilterConfig added in v0.4.0

type SlotFilterConfig struct {
	PGN          []uint32 `json:"pgn,omitempty"`
	ExcludePGN   []uint32 `json:"exclude_pgn,omitempty"`
	Manufacturer []string `json:"manufacturer,omitempty"`
	Instance     []uint8  `json:"instance,omitempty"`
	Name         []string `json:"name,omitempty"`
	ExcludeName  []string `json:"exclude_name,omitempty"`
	Bus          []string `json:"bus,omitempty"`
}

SlotFilterConfig is the JSON/HOCON-friendly representation of an EventFilter.
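The JSON tags above determine the key names a filter document would use. The sketch below decodes a sample document into a local mirror of a subset of the fields. slotFilter, parseFilter, and the sample values are hypothetical, and the accepted value formats (for example manufacturer names) are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// slotFilter mirrors a subset of SlotFilterConfig, with tags copied from the
// struct definition above.
type slotFilter struct {
	PGN          []uint32 `json:"pgn,omitempty"`
	ExcludePGN   []uint32 `json:"exclude_pgn,omitempty"`
	Manufacturer []string `json:"manufacturer,omitempty"`
	Bus          []string `json:"bus,omitempty"`
}

// parseFilter decodes a JSON filter document.
func parseFilter(s string) (slotFilter, error) {
	var f slotFilter
	err := json.Unmarshal([]byte(s), &f)
	return f, err
}

func main() {
	// Hypothetical filter: two PGNs from one manufacturer on one bus.
	f, err := parseFilter(`{"pgn":[129025,129026],"manufacturer":["Garmin"],"bus":["can0"]}`)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("%+v\n", f)
}
```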

type SocketCANBus added in v0.4.0

type SocketCANBus struct {
	// contains filtered or unexported fields
}

SocketCANBus implements CANBus using Linux SocketCAN.

func NewSocketCANBus added in v0.4.0

func NewSocketCANBus(iface string, logger *slog.Logger) *SocketCANBus

NewSocketCANBus creates a CANBus backed by a Linux SocketCAN interface.

func (*SocketCANBus) Name added in v0.4.0

func (b *SocketCANBus) Name() string

func (*SocketCANBus) ReadFrames added in v0.4.0

func (b *SocketCANBus) ReadFrames(ctx context.Context, rxFrames chan<- RxFrame) error

func (*SocketCANBus) WriteFrames added in v0.4.0

func (b *SocketCANBus) WriteFrames(ctx context.Context, txFrames <-chan TxRequest) error

type SyncState

type SyncState struct {
	Cursor           uint64     // continuous data through this seq
	Holes            []SeqRange // sorted gaps
	BoatHeadSeq      uint64     // last reported by boat
	BoatJournalBytes uint64
}

SyncState captures the replication state for an instance, used for persistence and handshake responses.

type TracingConfig added in v0.4.0

type TracingConfig struct {
	// Enabled enables tracing. When false, a no-op tracer is used.
	Enabled bool

	// Endpoint is the OTLP gRPC collector endpoint (e.g. "localhost:4317").
	Endpoint string

	// ServiceName identifies this service in traces (e.g. "lplex-server", "lplex-cloud").
	ServiceName string

	// ServiceVersion is the build version.
	ServiceVersion string

	// SampleRatio controls probabilistic sampling (0.0 to 1.0).
	// 1.0 = sample everything, 0.01 = sample 1%.
	SampleRatio float64
}

TracingConfig configures OpenTelemetry distributed tracing.

type TxRequest

type TxRequest struct {
	Header CANHeader
	Data   []byte
	Bus    string // target SocketCAN interface; empty = default (first) bus
}

TxRequest is a frame to write to the CAN bus.

type ValueStore

type ValueStore struct {
	// contains filtered or unexported fields
}

ValueStore tracks the last-seen frame data for each (bus, source, PGN) tuple. The broker goroutine writes via Record; HTTP handlers read via Snapshot.
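A last-value store keyed by a (bus, source, PGN) tuple maps naturally onto a Go map with a comparable struct key, guarded by a read-write mutex for the single-writer, many-reader access pattern described above. The sketch below is standalone; key, value, and store are hypothetical types and do not reflect the actual ValueStore internals.

```go
package main

import (
	"fmt"
	"sync"
)

// key identifies one value slot; structs of comparable fields can be map keys.
type key struct {
	Bus    string
	Source uint8
	PGN    uint32
}

type value struct {
	Data []byte
	Seq  uint64
}

type store struct {
	mu sync.RWMutex
	m  map[key]value
}

func newStore() *store { return &store{m: make(map[key]value)} }

// record overwrites the slot for (bus, source, pgn) with the latest frame.
func (s *store) record(bus string, source uint8, pgn uint32, data []byte, seq uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Copy data so later reuse of the caller's buffer cannot alter the stored value.
	s.m[key{bus, source, pgn}] = value{Data: append([]byte(nil), data...), Seq: seq}
}

// get returns the last recorded value for the tuple, if any.
func (s *store) get(bus string, source uint8, pgn uint32) (value, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.m[key{bus, source, pgn}]
	return v, ok
}

// removeSource deletes every slot belonging to (bus, source).
func (s *store) removeSource(bus string, source uint8) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for k := range s.m {
		if k.Bus == bus && k.Source == source {
			delete(s.m, k)
		}
	}
}

func (s *store) size() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.m)
}

func main() {
	s := newStore()
	s.record("can0", 10, 129025, []byte{0x01}, 1)
	s.record("can0", 10, 129025, []byte{0x02}, 2) // same tuple: replaces the first
	s.record("can0", 11, 129025, []byte{0x03}, 3)
	v, _ := s.get("can0", 10, 129025)
	fmt.Println(s.size(), v.Seq) // 2 2
	s.removeSource("can0", 10)
	fmt.Println(s.size()) // 1
}
```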

func NewValueStore

func NewValueStore() *ValueStore

NewValueStore creates an empty value store.

func (*ValueStore) DecodedSnapshot added in v0.3.1

func (vs *ValueStore) DecodedSnapshot(devices *DeviceRegistry, filter *EventFilter) []DecodedDeviceValues

DecodedSnapshot returns the current values grouped by device with PGN data decoded into named fields using the pgn.Registry. PGNs not in the registry or that fail to decode are omitted.

func (*ValueStore) DecodedSnapshotJSON added in v0.3.1

func (vs *ValueStore) DecodedSnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage

DecodedSnapshotJSON returns the decoded snapshot as pre-serialized JSON.

func (*ValueStore) Record

func (vs *ValueStore) Record(bus string, source uint8, pgn uint32, ts time.Time, data []byte, seq uint64)

Record updates the stored value for the given (bus, source, PGN). Called by the broker goroutine on every frame.

func (*ValueStore) RemoveSource added in v0.4.0

func (vs *ValueStore) RemoveSource(bus string, source uint8)

RemoveSource deletes all stored values for the given (bus, source) pair.

func (*ValueStore) Snapshot

func (vs *ValueStore) Snapshot(devices *DeviceRegistry, filter *EventFilter) []DeviceValues

Snapshot returns the current values grouped by device, resolved against the device registry for NAME and manufacturer info. An optional filter restricts results by PGN, bus, and/or device criteria (manufacturer, name, instance).

func (*ValueStore) SnapshotJSON

func (vs *ValueStore) SnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage

SnapshotJSON returns the snapshot as pre-serialized JSON.

type VirtualDevice added in v0.4.0

type VirtualDevice struct {
	// contains filtered or unexported fields
}

VirtualDevice is a single virtual NMEA 2000 device managed by the VirtualDeviceManager.

type VirtualDeviceConfig added in v0.4.0

type VirtualDeviceConfig struct {
	// NAME is the 64-bit ISO NAME. Lower values win address conflicts.
	NAME        uint64
	ProductInfo VirtualProductInfo
}

VirtualDeviceConfig configures a single virtual NMEA 2000 device.

type VirtualDeviceManager added in v0.4.0

type VirtualDeviceManager struct {
	// contains filtered or unexported fields
}

VirtualDeviceManager manages a set of virtual NMEA 2000 devices that claim addresses on the CAN bus, making lplex-server a legitimate bus participant.

Thread safety: the manager is called from the broker's single goroutine for frame handling (HandleBusClaim, HandleISORequest) and from HTTP handlers for ClaimedSource/Ready. The mutex protects the device state for the latter case.

func NewVirtualDeviceManager added in v0.4.0

func NewVirtualDeviceManager(txFunc func(TxRequest), registry *DeviceRegistry, logger *slog.Logger, claimInterval, productInfoInterval time.Duration) *VirtualDeviceManager

NewVirtualDeviceManager creates a new manager. txFunc is called to send frames to the CAN bus. registry is consulted for address selection. claimInterval and productInfoInterval control heartbeat frequency; zero values use DefaultClaimHeartbeat and DefaultProductInfoHeartbeat.

func (*VirtualDeviceManager) Add added in v0.4.0

Add registers a virtual device configuration. Call before Start.

func (*VirtualDeviceManager) ClaimedSource added in v0.4.0

func (m *VirtualDeviceManager) ClaimedSource() (uint8, bool)

ClaimedSource returns the source address of the first virtual device that has successfully claimed an address. Returns (0, false) if none are claimed.

func (*VirtualDeviceManager) Devices added in v0.4.0

Devices returns a snapshot of virtual device states for diagnostics.

func (*VirtualDeviceManager) HandleBusClaim added in v0.4.0

func (m *VirtualDeviceManager) HandleBusClaim(source uint8, name uint64) bool

HandleBusClaim is called by the broker when a PGN 60928 address claim is received from the bus. It checks if any of our virtual devices conflict and resolves per NMEA 2000: lower NAME wins.

Returns true if the frame was handled (either as an echo or conflict) and should NOT be registered in the device registry.
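The "lower NAME wins" rule reduces to an integer comparison on the 64-bit NAME. The sketch below classifies an incoming claim against one virtual device. resolveClaim and the outcome values are hypothetical, and treating a claim with our own address and NAME as an echo is an assumption about how echoes are recognized.

```go
package main

import "fmt"

type outcome string

const (
	unrelated outcome = "unrelated" // claim is for a different address
	echo      outcome = "echo"      // our own claim reflected back
	keep      outcome = "keep"      // conflict, our NAME is lower: we keep the address
	yield     outcome = "yield"     // conflict, their NAME is lower: we must give it up
)

// resolveClaim compares an incoming PGN 60928 claim (src, name) with the
// address and NAME held by one virtual device.
func resolveClaim(ourSrc uint8, ourName uint64, src uint8, name uint64) outcome {
	switch {
	case src != ourSrc:
		return unrelated
	case name == ourName:
		return echo
	case ourName < name:
		return keep
	default:
		return yield
	}
}

func main() {
	const ourSrc, ourName = 42, 0x1000
	fmt.Println(resolveClaim(ourSrc, ourName, 7, 0x0001))  // unrelated
	fmt.Println(resolveClaim(ourSrc, ourName, 42, 0x1000)) // echo
	fmt.Println(resolveClaim(ourSrc, ourName, 42, 0x2000)) // keep
	fmt.Println(resolveClaim(ourSrc, ourName, 42, 0x0800)) // yield
}
```

Under this model, every outcome except unrelated corresponds to a frame that was "handled" in the sense of the return value described above.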

func (*VirtualDeviceManager) HandleISORequest added in v0.4.0

func (m *VirtualDeviceManager) HandleISORequest(dst uint8, requestedPGN uint32, requesterSrc uint8)

HandleISORequest is called when PGN 59904 is received. If the request targets one of our virtual devices, we respond with the appropriate data.

func (*VirtualDeviceManager) Heartbeat added in v0.4.0

func (m *VirtualDeviceManager) Heartbeat()

Heartbeat re-broadcasts address claims and product info for all held virtual devices at their respective intervals. Called from the broker's ticker goroutine (single-threaded, no concurrent calls).

func (*VirtualDeviceManager) ProductInfoPayload added in v0.4.0

func (m *VirtualDeviceManager) ProductInfoPayload(source uint8) []byte

ProductInfoPayload returns the 134-byte PGN 126996 payload for the virtual device at the given source address, or nil if no device uses that address.

func (*VirtualDeviceManager) Ready added in v0.4.0

func (m *VirtualDeviceManager) Ready() bool

Ready returns true if at least one virtual device has claimed an address and the holdoff period has elapsed.

func (*VirtualDeviceManager) StartAfterDiscovery added in v0.4.0

func (m *VirtualDeviceManager) StartAfterDiscovery(ctx context.Context, delay time.Duration)

StartAfterDiscovery waits for the device table to populate (the broker broadcasts an ISO Request for PGN 60928 on startup), then claims addresses for all configured virtual devices.

type VirtualDeviceStatus added in v0.4.0

type VirtualDeviceStatus struct {
	NAME    string `json:"name"`
	Source  uint8  `json:"src"`
	State   string `json:"state"`
	ModelID string `json:"model_id,omitempty"`
}

VirtualDeviceStatus is a diagnostic snapshot of a virtual device.

type VirtualProductInfo added in v0.4.0

type VirtualProductInfo struct {
	ModelID         string
	SoftwareVersion string
	ModelVersion    string
	ModelSerial     string
	ProductCode     uint16
}

VirtualProductInfo holds the PGN 126996 fields for a virtual device.

Directories

Path Synopsis
Package canbus provides shared NMEA 2000 / CAN bus types and functions importable by both the server internals and client tools.
cmd
journalbench command
journalbench reads frames from a .lpj journal file and re-encodes them with every combination of block size and compression method to find the optimal parameters for CAN bus journal data.
lplex command
lplex-cloud command
lplex-server command
pgngen command
pgngen generates Go structs, Protocol Buffer definitions, and JSON Schema from NMEA 2000 PGN definition files written in the lplex PGN DSL.
Package filter provides a BPF-inspired expression language for filtering decoded NMEA 2000 frames by field values.
Package journal provides the block-based binary journal format (.lpj files) for recording and replaying NMEA 2000 CAN frames.
Package lplexc provides a Go client for lplex, a CAN bus HTTP bridge for NMEA 2000.
Package pgn provides generated Go types for decoding and encoding NMEA 2000 PGN messages.
Package pgngen provides a DSL parser and code generators for NMEA 2000 PGN definitions.
proto
