Documentation ¶
Overview ¶
Package semstreams provides a stream processor that builds semantic knowledge graphs from event data, with automatic community detection and progressive AI enhancement.
SemStreams transforms event streams into a living knowledge graph stored in NATS KV. You define a vocabulary of predicates, implement a simple interface, and the system maintains entities, relationships, indexes, and communities automatically.
Key characteristics:
- Edge-first: Deploy on a Raspberry Pi with just NATS, or scale to clusters
- Offline-capable: NATS JetStream provides local persistence and sync
- Progressive: Start with rules, add search, then embeddings and LLM as needed
- Domain-driven: No mandatory AI dependencies—you enable what you need
Core Concept ¶
Events → Graphable Interface → Knowledge Graph → Queries
1. Events arrive (telemetry, records, notifications)
2. Your processor transforms them into entities with triples
3. SemStreams maintains the graph, indexes, and communities
4. Query by relationships, predicates, or semantic similarity
The Graphable Interface ¶
Your domain types implement Graphable to become graph entities:
    type Graphable interface {
        EntityID() string          // 6-part federated identifier
        Triples() []message.Triple // Facts about this entity
    }
Example:
    func (d *DroneTelemetry) EntityID() string {
        return fmt.Sprintf("acme.ops.robotics.gcs.drone.%s", d.DroneID)
    }

    func (d *DroneTelemetry) Triples() []message.Triple {
        id := d.EntityID()
        return []message.Triple{
            {Subject: id, Predicate: "drone.telemetry.battery", Object: d.Battery},
            {Subject: id, Predicate: "fleet.membership.current", Object: d.FleetID},
        }
    }
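For experimenting outside the module, the same pattern can be written as a self-contained program. This is a minimal sketch: Triple is a local stand-in for message.Triple, and the field types on DroneTelemetry and the fleet identifier value are assumptions made for illustration.

```go
package main

import "fmt"

// Triple is a local stand-in for message.Triple so the sketch compiles
// without the semstreams module; field names follow the example above.
type Triple struct {
	Subject   string
	Predicate string
	Object    any
}

// Graphable mirrors the interface above, using the stand-in Triple.
type Graphable interface {
	EntityID() string
	Triples() []Triple
}

// DroneTelemetry is a hypothetical domain event; the field types are assumed.
type DroneTelemetry struct {
	DroneID string
	FleetID string
	Battery float64
}

// EntityID builds the 6-part identifier for this drone.
func (d *DroneTelemetry) EntityID() string {
	return fmt.Sprintf("acme.ops.robotics.gcs.drone.%s", d.DroneID)
}

// Triples returns one property fact and one relationship fact.
func (d *DroneTelemetry) Triples() []Triple {
	id := d.EntityID()
	return []Triple{
		{Subject: id, Predicate: "drone.telemetry.battery", Object: d.Battery},
		{Subject: id, Predicate: "fleet.membership.current", Object: d.FleetID},
	}
}

// Compile-time check that *DroneTelemetry satisfies Graphable.
var _ Graphable = (*DroneTelemetry)(nil)

func main() {
	d := &DroneTelemetry{
		DroneID: "001",
		FleetID: "acme.ops.robotics.gcs.fleet.alpha", // hypothetical fleet entity ID
		Battery: 87.5,
	}
	for _, t := range d.Triples() {
		fmt.Printf("%s %s %v\n", t.Subject, t.Predicate, t.Object)
	}
}
```

The compile-time assertion catches a missing or misspelled method before any event reaches the processor.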
Entity ID Format ¶
Use 6-part hierarchical identifiers for federation and queryability:
org.platform.domain.system.type.instance
Example: acme.ops.robotics.gcs.drone.001
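A small validator can catch malformed identifiers before they reach the graph. The sketch below is illustrative only: EntityID and ParseEntityID are hypothetical helpers, not part of semstreams, and the rule that every part must be non-empty and free of dots is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// EntityID holds the six parts named in the format above.
// Hypothetical helper type, not the semstreams API.
type EntityID struct {
	Org, Platform, Domain, System, Type, Instance string
}

// ParseEntityID splits a dotted identifier and rejects anything that does
// not have exactly six non-empty parts.
func ParseEntityID(s string) (EntityID, error) {
	parts := strings.Split(s, ".")
	if len(parts) != 6 {
		return EntityID{}, fmt.Errorf("entity ID %q has %d parts, want 6", s, len(parts))
	}
	for _, p := range parts {
		if p == "" {
			return EntityID{}, errors.New("entity ID contains an empty part")
		}
	}
	return EntityID{parts[0], parts[1], parts[2], parts[3], parts[4], parts[5]}, nil
}

// String reassembles the identifier in its dotted form.
func (e EntityID) String() string {
	return strings.Join([]string{e.Org, e.Platform, e.Domain, e.System, e.Type, e.Instance}, ".")
}

func main() {
	id, err := ParseEntityID("acme.ops.robotics.gcs.drone.001")
	if err != nil {
		fmt.Println("invalid:", err)
		return
	}
	fmt.Println(id.Type, id.Instance)
}
```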
Predicates ¶
Predicates follow domain.category.property format:
    sensor.measurement.celsius
    geo.location.zone
    fleet.membership.current
Dotted notation enables NATS wildcard queries (sensor.measurement.*) and provides SQL-like query semantics via prefix matching.
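To make the wildcard behavior concrete, the sketch below applies NATS subject-matching rules to predicates: "*" matches exactly one token and ">" matches one or more trailing tokens. matchPredicate is a hypothetical helper written for illustration; it is not how SemStreams or NATS implement matching internally.

```go
package main

import (
	"fmt"
	"strings"
)

// matchPredicate reports whether predicate matches pattern using
// NATS-style tokens: "*" matches exactly one token, ">" matches one or
// more trailing tokens. Hypothetical helper, not the semstreams API.
func matchPredicate(pattern, predicate string) bool {
	pt := strings.Split(pattern, ".")
	pr := strings.Split(predicate, ".")
	for i, tok := range pt {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least
			// one predicate token left to consume.
			return i == len(pt)-1 && len(pr) > i
		}
		if i >= len(pr) {
			return false
		}
		if tok != "*" && tok != pr[i] {
			return false
		}
	}
	// Without ">", pattern and predicate must have the same token count.
	return len(pt) == len(pr)
}

func main() {
	predicates := []string{
		"sensor.measurement.celsius",
		"geo.location.zone",
		"fleet.membership.current",
	}
	for _, p := range predicates {
		fmt.Println(p, matchPredicate("sensor.measurement.*", p))
	}
}
```

Because "*" covers a single token only, a pattern such as sensor.* does not match sensor.measurement.celsius; sensor.> does.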
Progressive Enhancement (Tiers) ¶
SemStreams supports three capability tiers:
    Tier 0: Rules engine, explicit relationships, structural indexing (NATS only)
    Tier 1: + BM25 search, statistical communities (+ search index)
    Tier 2: + Neural embeddings, LLM summaries (+ embedding service, LLM)
Start with Tier 0. Add capabilities as resources allow.
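The tiers are cumulative: each one keeps everything below it. A minimal sketch of that relationship follows; the Tier type and Capabilities function are hypothetical and only restate the tier list above, they are not SemStreams configuration.

```go
package main

import "fmt"

// Tier is a hypothetical enum for the three capability tiers.
type Tier int

const (
	Tier0 Tier = iota
	Tier1
	Tier2
)

// added lists what each tier adds on top of the one below it,
// copied from the tier list above.
var added = map[Tier][]string{
	Tier0: {"rules engine", "explicit relationships", "structural indexing"},
	Tier1: {"BM25 search", "statistical communities"},
	Tier2: {"neural embeddings", "LLM summaries"},
}

// Capabilities returns everything available at tier t, including the
// capabilities inherited from lower tiers.
func Capabilities(t Tier) []string {
	var caps []string
	for tier := Tier0; tier <= t; tier++ {
		caps = append(caps, added[tier]...)
	}
	return caps
}

func main() {
	for _, t := range []Tier{Tier0, Tier1, Tier2} {
		fmt.Printf("Tier %d: %v\n", t, Capabilities(t))
	}
}
```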
Architecture ¶
Components connect via NATS subjects in flow-based configurations:
    Input → Processor → Storage → Graph → Gateway
      │         │          │        │        │
     UDP    iot_sensor ObjectStore KV+    GraphQL
     File   document   (raw docs) Indexes   MCP
Component types:
- Input: UDP, WebSocket, File - ingest external data
- Processor: Graph, JSONMap, Rule - transform and enrich
- Output: File, HTTPPost, WebSocket - export data
- Storage: ObjectStore - persist to NATS JetStream
- Gateway: HTTP, GraphQL, MCP - expose query APIs
State: NATS KV Buckets ¶
All state lives in NATS JetStream KV buckets:
Core buckets (always created):
- ENTITY_STATES: Entity records with triples and version
- PREDICATE_INDEX: Predicate → entity IDs
- INCOMING_INDEX: Entity ID → referencing entities
- OUTGOING_INDEX: Entity ID → referenced entities
- ALIAS_INDEX: Alias → entity ID
- SPATIAL_INDEX: Geohash → entity IDs
- TEMPORAL_INDEX: Time bucket → entity IDs
- RULE_STATE: Rule evaluation state per entity
Optional buckets (created when features enabled):
- STRUCTURAL_INDEX: K-core levels and pivot distances
- EMBEDDING_INDEX: Entity ID → embedding vector
- COMMUNITY_INDEX: Community records with members and summaries
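The relationship between triples and three of the index buckets can be sketched in memory. This is an illustration under stated assumptions, not the SemStreams implementation: the maps stand in for KV buckets, Triple is a local stand-in for message.Triple, and treating any string object with six dotted parts as an entity reference is an assumed heuristic.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Triple is a local stand-in for message.Triple.
type Triple struct {
	Subject   string
	Predicate string
	Object    any
}

// index maps a key to a set of entity IDs, standing in for one KV bucket.
type index map[string]map[string]struct{}

func (ix index) add(key, id string) {
	if ix[key] == nil {
		ix[key] = make(map[string]struct{})
	}
	ix[key][id] = struct{}{}
}

// get returns the IDs stored under key in sorted order.
func (ix index) get(key string) []string {
	ids := make([]string, 0, len(ix[key]))
	for id := range ix[key] {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

// Indexes groups in-memory stand-ins for PREDICATE_INDEX,
// OUTGOING_INDEX, and INCOMING_INDEX.
type Indexes struct {
	Predicate, Outgoing, Incoming index
}

// entityRef assumes that a string object with six dotted parts
// references another entity.
func entityRef(obj any) (string, bool) {
	s, ok := obj.(string)
	if !ok || strings.Count(s, ".") != 5 {
		return "", false
	}
	return s, true
}

// BuildIndexes derives all three indexes from a batch of triples.
func BuildIndexes(triples []Triple) Indexes {
	ix := Indexes{Predicate: index{}, Outgoing: index{}, Incoming: index{}}
	for _, t := range triples {
		ix.Predicate.add(t.Predicate, t.Subject)
		if target, ok := entityRef(t.Object); ok {
			ix.Outgoing.add(t.Subject, target)
			ix.Incoming.add(target, t.Subject)
		}
	}
	return ix
}

func main() {
	fleet := "acme.ops.robotics.gcs.fleet.alpha" // hypothetical fleet entity
	ix := BuildIndexes([]Triple{
		{"acme.ops.robotics.gcs.drone.001", "fleet.membership.current", fleet},
		{"acme.ops.robotics.gcs.drone.002", "fleet.membership.current", fleet},
		{"acme.ops.robotics.gcs.drone.001", "drone.telemetry.battery", 87.5},
	})
	fmt.Println("members of fleet:", ix.Incoming.get(fleet))
}
```

Keeping incoming and outgoing indexes separately means a question such as "which drones reference this fleet" is a single key lookup rather than a scan over all entities.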
Package Organization ¶
Core packages:
- component: Component lifecycle, registry, port definitions
- componentregistry: Registration of all component types
- engine: Component orchestration and lifecycle
- flowstore: Flow persistence (NATS KV)
- config: Configuration loading and validation
Graph packages:
- graph: Knowledge graph processing core
- message: Triple and entity message types
- vocabulary: Predicate definitions and standards
Infrastructure:
- natsclient: NATS connection management
- gateway: HTTP, GraphQL, MCP API endpoints
- service: Discovery, flow-builder, metrics services
- metric: Prometheus metrics
- health: Health check system
Components:
- input/: UDP, WebSocket, File inputs
- output/: File, HTTPPost, WebSocket outputs
- processor/: Graph, JSONMap, JSONFilter, Rule processors
- storage/: ObjectStore for raw document persistence
Utilities:
- pkg/buffer: Ring buffer for streaming
- pkg/cache: LRU caching
- pkg/retry: Retry policies
- pkg/worker: Worker pools
Usage ¶
Build and run:
    task build
    ./bin/semstreams --config configs/semantic-flow.json
The binary uses componentregistry.Register() to register all component types. Flow configuration determines which components are instantiated.
Documentation ¶
See docs/ for comprehensive documentation:
- docs/basics/: Getting started, core interfaces
- docs/concepts/: Background knowledge, algorithms
- docs/advanced/: Clustering, LLM, performance tuning
- docs/rules/: Rules engine reference
- docs/contributing/: Development, testing, CI
Version ¶
Current: v0.5.0-alpha
Directories ¶
| Path | Synopsis |
|---|---|
| | Package agentic provides shared types for the SemStreams agentic processing system. |
| identity | Package identity provides DID-based cryptographic identity for AGNTCY integration. |
| cmd | |
| domain-graphql-generator (command) | Package main provides a GraphQL code generator for semstreams. |
| e2e (command) | Package main provides structured result comparison for tier runs |
| e2e-semstreams (command) | Package main provides the E2E test application for SemStreams. |
| openapi-generator (command) | Package main generates a complete OpenAPI 3.0 specification for SemStreams. |
| semstreams (command) | Package main implements the entry point for the SemStreams application. |
| | Package component defines the Discoverable interface and related types |
| flowgraph | Package flowgraph provides flow graph analysis and validation for component connections. |
| | Package componentregistry provides component registration for SemStreams framework. |
| | Package config provides configuration management for StreamKit applications. |
| | Package flowengine translates Flow entities to ComponentConfigs and manages deployment. |
| examples | |
| github-pr-workflow | Package githubprworkflow provides Graphable entity types for the GitHub issue-to-PR automation workflow. |
| processors/document | Package document provides a generic document processor demonstrating the Graphable implementation pattern for text-rich content like documents, maintenance records, and observations. |
| processors/iot_sensor | Package iotsensor provides an example domain processor demonstrating the correct Graphable implementation pattern for SemStreams. |
| processors/weather_station | Package weatherstation provides an example weather station processor demonstrating how to build a domain processor following the tutorial. |
| | Package flowstore provides flow persistence and management. |
| | Package gateway provides bidirectional protocol bridging for SemStreams. |
| graph-gateway | Package graphgateway provides the graph-gateway component for exposing graph operations via HTTP. |
| http | Package http provides an HTTP gateway implementation for bridging REST APIs to NATS services. |
| | Package graph provides shared types and error definitions for graph processing |
| clustering | Package clustering provides community detection algorithms and graph clustering for discovering structural patterns in the knowledge graph. |
| datamanager | Package datamanager consolidates entity and edge operations into a unified data management service. |
| embedding | Package embedding provides vector embedding generation and caching for semantic search in the knowledge graph. |
| inference | Package inference provides structural anomaly detection for missing relationships. |
| llm | Package llm provides LLM client abstractions for OpenAI-compatible APIs. |
| messagemanager | Package messagemanager provides message processing and entity extraction for the knowledge graph. |
| query | Package query provides a clean interface for reading graph data from NATS KV buckets. |
| structural | Package structural provides structural graph indexing algorithms for query optimization and inference detection. |
| | Package health provides health monitoring functionality for StreamKit components and systems with thread-safe status tracking and aggregation. |
| input | |
| a2a | Package a2a provides an input component that implements the A2A (Agent-to-Agent) protocol for receiving task delegations from external agents. |
| cli | Package cli provides a CLI input component for interactive user sessions. |
| file | Package file provides a file input component for reading JSONL/JSON files and publishing to NATS. |
| github-webhook | Package githubwebhook provides an HTTP input component that receives GitHub webhook events and publishes them to NATS JetStream. |
| slim | Package slim provides an input component that bridges SLIM (Secure Lightweight Instant Messaging) groups to SemStreams using MLS (Messaging Layer Security). |
| trustgraph | Package trustgraph provides a TrustGraph input component that polls TrustGraph's triples-query API and emits SemStreams entity messages. |
| udp | Package udp provides a UDP input component for receiving data over UDP sockets. |
| websocket | Package websocket provides WebSocket input component for receiving federated data |
| | Package message provides the core message infrastructure for the SemStreams platform. |
| | Package metric provides Prometheus-based metrics collection and HTTP server for StreamKit platform monitoring and observability. |
| | Package model provides a unified model registry for centralized endpoint configuration, capability-based routing, and tool capability metadata. |
| | Package natsclient provides a client for managing NATS connections with circuit breaker pattern. |
| output | |
| directory-bridge | Package directorybridge provides an output component that registers agents with AGNTCY directories using OASF (Open Agent Specification Framework) records. |
| file | Package file provides a file output component for writing messages to files. |
| httppost | Package httppost provides an HTTP POST output component for sending messages to HTTP endpoints. |
| otel | Package otel provides an OpenTelemetry exporter for SemStreams agent telemetry. |
| trustgraph | Package trustgraph provides a TrustGraph output component that watches SemStreams entity state changes and exports them as RDF triples to TrustGraph knowledge cores. |
| websocket | Package websocket provides a WebSocket server output component for streaming messages to WebSocket clients. |
| pkg | |
| acme | Package acme provides ACME client functionality for automated certificate management |
| buffer | Package buffer provides generic, thread-safe buffer implementations with various overflow policies. |
| cache | Package cache provides generic, thread-safe cache implementations with various eviction policies. |
| context | Package context provides building blocks for context construction in agentic systems. |
| errs | Package errs provides standardized error handling patterns for SemStreams components. |
| logging | Package logging provides slog handlers for structured logging with multi-destination support, NATS publishing, and graceful fallback behavior. |
| resource | Package resource provides utilities for monitoring resource availability. |
| retry | Package retry provides simple exponential backoff retry logic for transient failures. |
| security | Package security provides platform-wide security configuration types |
| text | Package text provides text manipulation utilities. |
| timestamp | Package timestamp provides standardized Unix timestamp handling utilities. |
| tlsutil | Package tlsutil provides TLS configuration utilities for secure connections. |
| types | Package types provides core type definitions for the semantic event mesh. |
| worker | Package worker provides a generic, thread-safe worker pool for concurrent task processing. |
| workflow | Package workflow provides workflow state management primitives for components that participate in stateful workflows. |
| processor | |
| agentic-dispatch | Package agenticdispatch provides message routing between users and agentic loops. |
| agentic-governance | Package agenticgovernance provides a governance layer processor component that enforces content policies, PII redaction, injection detection, and rate limiting for agentic message flows. |
| agentic-loop | Package agenticloop provides the agentic loop orchestrator component. |
| agentic-memory | Package agenticmemory provides a graph-backed agent memory processor component that manages context hydration, fact extraction, and memory checkpointing for agentic loops. |
| agentic-model | Package agenticmodel provides an OpenAI-compatible agentic model processor component that routes agent requests to configured LLM endpoints with retry logic and tool calling support. |
| agentic-tools | Package agentictools provides a tool executor processor component that routes tool calls to registered tool executors with filtering and timeout support. |
| agentic-tools/executors | Package executors provides tool executor implementations for the agentic-tools component. |
| graph-clustering | Package graphclustering provides anomaly detection integration for graph-clustering. |
| graph-embedding | Package graphembedding provides the graph-embedding component for generating entity embeddings. |
| graph-index | Package graphindex provides the graph-index component for maintaining graph relationship indexes. |
| graph-index-spatial | Package graphindexspatial provides the graph-index-spatial component for spatial indexing. |
| graph-index-temporal | Package graphindextemporal provides the graph-index-temporal component for temporal indexing. |
| graph-ingest | Package graphingest provides the graph-ingest component for entity and triple ingestion. |
| graph-query | Package graphquery community cache implementation |
| json_filter | Package jsonfilter provides a processor for filtering GenericJSON messages based on field values and comparison rules. |
| json_generic | Package jsongeneric provides a processor for wrapping plain JSON into GenericJSON (core.json.v1) format for integration with StreamKit pipelines. |
| json_map | Package jsonmapprocessor provides a processor for transforming GenericJSON messages through field mapping, adding, removing, and string transformations. |
| oasf-generator | Package oasfgenerator provides an OASF (Open Agent Specification Framework) record generator processor component for SemStreams. |
| parser | Package parser provides utilities for parsing various data formats into structured maps for use with StreamKit message processing. |
| reactive | Package reactive provides a reactive workflow engine built on rules engine primitives. |
| reactive/testutil | Package testutil provides testing utilities for the reactive workflow engine. |
| rule | Package rule - Action execution for ECA rules |
| rule/boid | Package boid implements Boids-inspired local coordination rules for multi-agent teams. |
| rule/expression | Package expression - Expression evaluator implementation |
| | Package service provides base functionality and common patterns for long-running services in the semstreams platform. |
| | Package storage provides pluggable backend interfaces for storage operations. |
| objectstore | Package objectstore provides a NATS ObjectStore-based storage component for immutable message storage with time-bucketed keys and caching. |
| | Package subjects provides typed NATS subject definitions for compile-time type safety. |
| test | |
| e2e/client | Package client provides test utilities for SemStreams E2E tests |
| e2e/config | Package config provides configuration for SemStreams E2E tests |
| e2e/mock | Package mock provides mock servers for E2E testing. |
| e2e/mock/cmd (command) | Package main provides the standalone mock servers for E2E testing. |
| e2e/results | Package results provides structured result writing for E2E test scenarios |
| e2e/scenarios | Package scenarios provides E2E test scenarios for SemStreams |
| e2e/scenarios/agentic | Package agentic provides the agentic E2E test scenario. |
| e2e/scenarios/anomaly | Package anomaly provides ground truth validation for anomaly detection. |
| e2e/scenarios/boids | Package boids provides the Boids coordination A/B test scenario. |
| e2e/scenarios/community | Package community provides ground truth validation for community detection. |
| e2e/scenarios/search | Package search provides unified search query execution for e2e tests. |
| e2e/scenarios/stages | Package stages contains extracted stage implementations for tiered E2E tests |
| e2e/scenarios/throughput | Package throughput provides a high-throughput E2E scenario for performance profiling. |
| | Package testutil provides testing utilities for StreamKit integration tests. |
| | Package trustgraph provides an HTTP client for TrustGraph REST APIs. |
| | Package types contains shared domain types used across the semstreams platform |
| | Package vocabulary provides semantic vocabulary management for the SemStreams platform. |
| agentic | Package agentic provides vocabulary constants for AI agent interoperability. |
| bfo | Package bfo provides IRI constants for the Basic Formal Ontology (BFO) 2.0. |
| cco | Package cco provides IRI constants for the Common Core Ontologies (CCO). |
| examples | Package examples provides reference vocabulary implementations. |
| export | Package export serializes []message.Triple to standard RDF formats. |
| trustgraph | Package trustgraph provides vocabulary translation between SemStreams dotted notation and RDF URIs for TrustGraph integration. |