hub

v1.0.1
Published: Apr 9, 2025 License: MIT Imports: 9 Imported by: 0

README

Hub - Event Pub/Sub Library for Go


Hub is a publish-subscribe event hub for Go applications, featuring:

  • Topic-based routing with key-value attributes
  • Efficient matching using multi-level indexes
  • Flexible subscription options (sync/async, one-time)
  • Thread-safe implementation

Installation

go get github.com/lomik/hub

Core Concepts

  • Event: Message with payload and topic attributes
  • Topic: Routing path defined by key-value pairs (e.g., "type=alert", "priority=high")
  • Subscription: Callback that receives matching events

Basic Usage

Initialization
package main

import "github.com/lomik/hub"

func main() {
    h := hub.New()
    _ = h // publish and subscribe through h (see below)
}
Publishing Events

The Publish method provides a streamlined way to send events:

// Simple event with string payload
h.Publish(ctx, 
    hub.T("type=alert", "priority=high"),
    "Server CPU overload",
)

// With structured data and options
h.Publish(ctx,
    hub.T("type=metrics", "source=api"),
    map[string]any{
        "cpu": 85.2,
        "mem": 45.7,
    },
    hub.Wait(true), // Wait for handlers
    hub.Sync(true), // Process synchronously
)
Subscribing to Events

The Subscribe method supports flexible callback signatures:

// Minimal callback
id1, _ := h.Subscribe(ctx, hub.T("type=alert"), func(ctx context.Context) error {
    log.Println("Alert received")
    return nil
})

// Typed payload
id2, _ := h.Subscribe(ctx, hub.T("type=metrics"), func(ctx context.Context, stats map[string]float64) error {
    log.Printf("CPU: %.1f%%, Mem: %.1f%%", stats["cpu"], stats["mem"])
    return nil
})

// Generic payload
id3, _ := h.Subscribe(ctx, hub.T(), func(ctx context.Context, payload any) error {
    log.Printf("Event received: %T", payload)
    return nil
})
Complete Example
h := hub.New()

// Subscribe to metrics
id, _ := h.Subscribe(ctx, hub.T("type=metrics"), func(ctx context.Context, m map[string]any) error {
    log.Printf("Metrics: %+v", m)
    return nil
})

// Publish metrics
h.Publish(ctx,
    hub.T("type=metrics"),
    map[string]any{
        "requests": 1423,
        "errors":   27,
    },
)

// Unsubscribe later
h.Unsubscribe(ctx, id)
Patterns
Topic Matching
// Subscriber wants alerts of any priority
subscriberTopic := hub.T("type=alert", "priority=*")

// Publisher sends high priority alert
eventTopic := hub.T("type=alert", "priority=high")

// This will match
h.Subscribe(ctx, subscriberTopic, func(ctx context.Context, p string) error {
    fmt.Println(p)
    return nil
})
h.Publish(ctx, eventTopic, "System overload")
Merging Topics
base := hub.T("app=web", "env=production")
extended := base.With("user=admin", "action=delete")

h.Publish(ctx, extended, "User deleted item")

Documentation

Index

Constants

const Any string = "*"

Any is a special value that matches any other value in topic matching

Variables

This section is empty.

Functions

This section is empty.

Types

type CastError added in v1.0.1

type CastError struct {
	// contains filtered or unexported fields
}

CastError represents an error that occurs during type casting.

func (*CastError) Error added in v1.0.1

func (e *CastError) Error() string

Error implements the error interface for CastError.

type Handler

type Handler func(ctx context.Context, t *Topic, p any) error

Handler defines a function signature for processing events in the hub. It receives a context for cancellation/timeout, the topic the event was published with, and the event payload. Return an error to indicate processing failure.

Usage:

var myHandler Handler = func(ctx context.Context, t *Topic, p any) error {
    log.Printf("Processing event: type=%s payload=%v", t.Get("type"), p)
    return nil // Return nil on success
}

h.Subscribe(ctx, topic, myHandler)

type Hub

type Hub struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Hub implements a pub/sub system with optimized subscription matching using multi-level indexes for efficient event distribution

func New

func New(opts ...HubOption) *Hub

New creates and initializes a new Hub instance

func (*Hub) Clear

func (h *Hub) Clear(ctx context.Context)

Clear removes all active subscriptions

func (*Hub) Len

func (h *Hub) Len() int

Len returns current number of active subscriptions

func (*Hub) Publish

func (h *Hub) Publish(ctx context.Context, topic *Topic, payload any, opts ...PublishOption)

Publish sends an event to all subscribers of the specified topic with the given payload.

Parameters:

  • ctx: Context for cancellation and timeouts
  • topic: Destination topic for the event (required)
  • payload: Event data (can be any type)
  • opts: Optional publishing settings:
      • hub.Wait(true) - wait for all handlers to complete
      • hub.Sync(true) - process handlers synchronously
      • hub.OnFinish() - add completion callback

Behavior:

  • Creates a new Event with the provided topic and payload
  • Applies all specified PublishOptions
  • Delivers to all matching subscribers
  • Handles payload conversion automatically when subscribers use typed callbacks

Example usage:

// h is a *hub.Hub created with hub.New()

// Simple publish
h.Publish(ctx,
    hub.T("type=alert", "priority=high"),
    "server is down",
)

// With options
h.Publish(ctx,
    hub.T("type=metrics"),
    map[string]any{"cpu": 85, "mem": 45},
    hub.Wait(true),          // Wait for processing
    hub.OnFinish(func(ctx context.Context) {
        log.Println("Event processed")
    }),
)

Notes:

  • The payload will be automatically converted when subscribers use typed callbacks
  • Topic is required (use hub.T() to create topics)
  • Safe for concurrent use

func (*Hub) Subscribe

func (h *Hub) Subscribe(ctx context.Context, t *Topic, cb interface{}, opts ...SubscribeOption) (SubID, error)

Subscribe registers an event handler with flexible callback signature options.

Supported callback formats:

  1. Minimal without topic and payload:
       func(ctx context.Context) error
       func(ctx context.Context)
  2. With original topic:
       func(ctx context.Context, topic *Topic, payload any) error
       func(ctx context.Context, topic *Topic, payload any)
  3. Typed payload:
       func(ctx context.Context, payload Type) error
       func(ctx context.Context, payload Type)
  4. Generic payload without topic:
       func(ctx context.Context, payload any) error
       func(ctx context.Context, payload any)

Supported payload types (Type):

  • All integer types (int8-int64, uint8-uint64)
  • Floating point (float32, float64)
  • String and boolean
  • Time types (time.Time, time.Duration)
  • Common collections ([]string, map[string]any)

Parameters:

  • ctx: Context for cancellation and timeouts
  • t: Topic to subscribe to (use hub.T() for all events)
  • cb: Callback function in one of supported formats
  • opts: Optional subscription settings (e.g., Once for single delivery)

Returns:

  • Subscription ID that can be used for unsubscribing
  • Error if:
      • Callback signature is invalid
      • Topic is nil
      • Unsupported parameter type in callback

Behavior:

  • For typed callbacks, attempts direct type assertion first
  • Falls back to automatic conversion using spf13/cast
  • Returns conversion errors during event delivery
  • Supports all standard SubscribeOption configurations
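
The "assert first, convert second" behavior described above can be sketched in isolation. This is only an illustration under stated assumptions: toInt is a hypothetical helper, and the fallback uses strconv and a small type switch as a stand-in for the spf13/cast conversion the library documents.

```go
package main

import (
	"fmt"
	"strconv"
)

// toInt is a hypothetical helper, not part of the hub package.
// It tries a direct type assertion first and only then falls back to
// conversion. The fallback here is a stand-in for spf13/cast.
func toInt(payload any) (int, error) {
	if v, ok := payload.(int); ok {
		return v, nil // fast path: payload already has the target type
	}
	switch v := payload.(type) {
	case int64:
		return int(v), nil
	case float64:
		return int(v), nil
	case string:
		return strconv.Atoi(v)
	default:
		return 0, fmt.Errorf("cannot convert %T to int", payload)
	}
}

func main() {
	for _, p := range []any{42, "17", 3.0, []string{"x"}} {
		n, err := toInt(p)
		fmt.Println(n, err)
	}
}
```

In this model a failed conversion is reported when the value is delivered, which mirrors the statement that conversion errors are returned during event delivery.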

Example usage:

// Minimal callback (h is a *hub.Hub created with hub.New())
id, err := h.Subscribe(ctx, topic, func(ctx context.Context) error {
    return nil
})

// Typed payload
id, err := h.Subscribe(ctx, topic, func(ctx context.Context, id int) error {
    log.Printf("Processing ID: %d", id)
    return nil
})

// With options
id, err := h.Subscribe(ctx, topic,
    func(ctx context.Context, msg string) error {
        return nil
    },
    hub.Once(true), // Auto-unsubscribe after first event
)

Notes:

  • Prefer specific typed callbacks when possible for better performance
  • The generic 'any' signature provides flexibility at a small performance cost
  • Callback signature validation occurs during subscription; payload conversion errors surface during event delivery

func (*Hub) ToHandler

func (h *Hub) ToHandler(ctx context.Context, cb any) (Handler, error)

ToHandler converts various callback signatures into a standard Handler function.

func (*Hub) Unsubscribe

func (h *Hub) Unsubscribe(ctx context.Context, id SubID)

Unsubscribe removes a subscription by ID

type HubOption

type HubOption interface {
	// contains filtered or unexported methods
}

HubOption defines an interface for configuring Hub instances during creation.

func ToHandler

func ToHandler(converter func(_ context.Context, cb any) (Handler, error)) HubOption

ToHandler registers a custom callback converter function that transforms arbitrary functions into Handler types. This enables support for additional callback signatures beyond the built-in ones.

The converter function should:

  • Accept a context and the callback value
  • Return a Handler and nil error if conversion succeeds
  • Return an error if something goes wrong
  • Return nil Handler and nil error if the callback type is unsupported

Multiple converters can be registered. They are tried in registration order until one succeeds or all fail.

Example:

hub.New(
    hub.ToHandler(func(ctx context.Context, cb any) (Handler, error) {
        if fn, ok := cb.(func(string) error); ok {
            return func(ctx context.Context, t *Topic, p any) error {
                s, _ := p.(string)
                return fn(s)
            }, nil
        }
        return nil, nil // unsupported callback type: let other converters try
    }),
)

type PublishOption

type PublishOption interface {
	// contains filtered or unexported methods
}

PublishOption defines an interface for modifying event publishing behavior

func OnFinish

func OnFinish(cb func(ctx context.Context)) PublishOption

OnFinish creates a PublishOption with a completion callback. The callback executes after all handlers have processed the event.

func Sync

func Sync(v bool) PublishOption

Sync creates a PublishOption that controls synchronous event processing. When enabled (v=true), the event will be processed by all handlers sequentially in the publishing goroutine, without spawning additional goroutines. This ensures strict ordering but blocks the publisher during processing.

func Wait

func Wait(v bool) PublishOption

Wait creates a PublishOption that controls waiting for handlers. When true, Publish blocks until all handlers complete.

type SubID

type SubID uint64

type SubscribeOption

type SubscribeOption interface {
	// contains filtered or unexported methods
}

SubscribeOption defines an interface for modifying subscription parameters

func Once

func Once(v bool) SubscribeOption

Once creates a SubscribeOption that controls single delivery. When true, the subscription is automatically removed after the first delivery.
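
One way to picture single delivery is a registry that drops a one-time subscription at the moment it is selected for delivery. The sketch below is hypothetical (registry, subscribe, publish, and size are not hub APIs) and far simpler than the indexed Hub; it only illustrates the removal-after-first-delivery idea.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription pairs a callback with its one-time flag.
type subscription struct {
	fn   func(payload any)
	once bool
}

// registry is a toy subscription store, hypothetical and not the
// library's data structure.
type registry struct {
	mu     sync.Mutex
	nextID uint64
	subs   map[uint64]subscription
}

func newRegistry() *registry {
	return &registry{subs: make(map[uint64]subscription)}
}

func (r *registry) subscribe(fn func(payload any), once bool) uint64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nextID++
	r.subs[r.nextID] = subscription{fn: fn, once: once}
	return r.nextID
}

// publish snapshots the current subscriptions under the lock and removes
// one-time entries before invoking them, so they cannot fire twice.
func (r *registry) publish(payload any) {
	r.mu.Lock()
	targets := make([]subscription, 0, len(r.subs))
	for id, s := range r.subs {
		targets = append(targets, s)
		if s.once {
			delete(r.subs, id)
		}
	}
	r.mu.Unlock()
	for _, s := range targets {
		s.fn(payload)
	}
}

func (r *registry) size() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.subs)
}

func main() {
	r := newRegistry()
	calls := 0
	r.subscribe(func(any) { calls++ }, true)
	r.publish("first")
	r.publish("second")
	fmt.Println(calls, r.size()) // 1 0
}
```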

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic represents a named channel for event distribution with key-value attributes. It's immutable after creation and safe for concurrent use.

func NewTopic

func NewTopic(args ...string) (*Topic, error)

NewTopic creates a new Topic from key-value pairs. Pairs can be provided in two formats:

  • "key=value" strings
  • Separate "key", "value" arguments

Returns error if input format is invalid.

Example:

t, err := NewTopic("type=alert", "severity=high")
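
The two accepted argument formats can be illustrated with a small parser. This is a sketch under an assumption the documentation does not spell out: an argument containing "=" is treated as a complete pair, and any other argument is treated as a key whose value is the next argument. parsePairs is a hypothetical helper, not the library's parser.

```go
package main

import (
	"fmt"
	"strings"
)

// parsePairs is a hypothetical model of NewTopic's argument handling.
// Assumption: "k=v" arguments are complete pairs; any other argument is
// a key followed by its value in the next argument.
func parsePairs(args ...string) (map[string]string, error) {
	pairs := make(map[string]string)
	for i := 0; i < len(args); i++ {
		key, value, found := strings.Cut(args[i], "=")
		if !found {
			// Separate "key", "value" form: the next argument is the value.
			if i+1 >= len(args) {
				return nil, fmt.Errorf("key %q has no value", args[i])
			}
			i++
			value = args[i]
		}
		if key == "" {
			return nil, fmt.Errorf("empty key near argument %d", i)
		}
		pairs[key] = value
	}
	return pairs, nil
}

func main() {
	pairs, err := parsePairs("type=alert", "severity", "high")
	fmt.Println(pairs, err)

	_, err = parsePairs("type")
	fmt.Println(err)
}
```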

func T

func T(args ...string) *Topic

T creates a new Topic from key-value pairs, panicking on error. Simplified version of NewTopic for use in tests and initialization.

Example:

t := T("type=alert", "severity=high")

func (*Topic) Each

func (t *Topic) Each(cb func(k, v string))

Each iterates over all key-value pairs in the Topic. Pairs are processed in sorted key order.

Example:

t := T("b=2", "a=1")
t.Each(func(k, v string) {
    fmt.Printf("%s=%s\n", k, v)
})
// Output:
// a=1
// b=2

func (*Topic) Get

func (t *Topic) Get(k string) string

Get returns the value for the specified key. Returns empty string if key doesn't exist.

Example:

t := T("type=alert")
v := t.Get("type") // returns "alert"

func (*Topic) Len

func (t *Topic) Len() int

Len returns the number of key-value pairs

func (*Topic) Match

func (t *Topic) Match(other *Topic) bool

Match checks if this Topic matches another Topic. A Topic matches if:

  • All keys in this Topic exist in the other Topic
  • Corresponding values are equal or one of them is Any ("*")

Does not consider additional keys in the other Topic.

Example:

t1 := T("type=alert", "severity=high")
t2 := T("type=alert", "severity=*", "source=server")
t1.Match(t2) // returns true
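
The matching rule can be restated as a few lines of standalone Go. The sketch below models topics as plain maps; matches and anyValue are hypothetical names, and the code only illustrates the documented rule rather than the library's index-based implementation.

```go
package main

import "fmt"

// anyValue mirrors the documented Any constant ("*").
const anyValue = "*"

// matches reports whether every key in t exists in other with an equal
// value, treating "*" on either side as a wildcard. Extra keys in other
// are ignored. Hypothetical model, not the library's code.
func matches(t, other map[string]string) bool {
	for k, v := range t {
		ov, ok := other[k]
		if !ok {
			return false
		}
		if v != ov && v != anyValue && ov != anyValue {
			return false
		}
	}
	return true
}

func main() {
	t1 := map[string]string{"type": "alert", "severity": "high"}
	t2 := map[string]string{"type": "alert", "severity": "*", "source": "server"}
	fmt.Println(matches(t1, t2)) // true: severity matches via wildcard, source is ignored
	fmt.Println(matches(t2, t1)) // false: t1 has no "source" key
}
```

The second call shows that the relation is not symmetric: the receiver's keys must all be present in the argument, but not the other way around.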

func (*Topic) With

func (t *Topic) With(args ...string) *Topic

With creates a new Topic by merging current attributes with new ones. New attributes override existing ones with the same keys. Panics if new attributes have invalid format.

Example:

t1 := T("type=alert", "severity=high")
t2 := t1.With("severity=low", "source=server")
// t2 now has: type=alert, severity=low, source=server
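
Because With returns a new Topic rather than modifying the receiver, the merge can be pictured as a copy followed by an override. The sketch below uses plain maps and a hypothetical with function to show that the base stays unchanged.

```go
package main

import "fmt"

// with is a hypothetical model of Topic.With: it copies base, then lets
// extra override keys that already exist. base itself is never modified.
func with(base, extra map[string]string) map[string]string {
	merged := make(map[string]string, len(base)+len(extra))
	for k, v := range base {
		merged[k] = v
	}
	for k, v := range extra {
		merged[k] = v
	}
	return merged
}

func main() {
	t1 := map[string]string{"type": "alert", "severity": "high"}
	t2 := with(t1, map[string]string{"severity": "low", "source": "server"})
	fmt.Println(t2) // map[severity:low source:server type:alert]
	fmt.Println(t1) // map[severity:high type:alert]
}
```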

Directories

Path Synopsis
pkg
kv
