Documentation ¶
Index ¶
- Constants
- type CastError
- type Handler
- type Hub
- func (h *Hub) Clear(ctx context.Context)
- func (h *Hub) Len() int
- func (h *Hub) Publish(ctx context.Context, topic *Topic, payload any, opts ...PublishOption)
- func (h *Hub) Subscribe(ctx context.Context, t *Topic, cb interface{}, opts ...SubscribeOption) (SubID, error)
- func (h *Hub) ToHandler(ctx context.Context, cb any) (Handler, error)
- func (h *Hub) Unsubscribe(ctx context.Context, id SubID)
- type HubOption
- type PublishOption
- type SubID
- type SubscribeOption
- type Topic
Constants ¶
const Any string = "*"
Any is a special value that matches any other value in topic matching
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CastError ¶ added in v1.0.1
type CastError struct {
// contains filtered or unexported fields
}
CastError represents an error that occurs during type casting.
type Handler ¶
Handler defines a function signature for processing events in the hub. It receives a context for cancellation/timeout and the event to process. Return an error to indicate processing failure.
Usage:
var myHandler Handler = func(ctx context.Context, e *Event) error {
	log.Printf("Processing event: %s", e.Topic())
	return nil // Return nil on success
}
h.Subscribe(ctx, topic, myHandler)
type Hub ¶
Hub implements a pub/sub system that uses multi-level indexes to match subscriptions efficiently during event distribution.
func (*Hub) Publish ¶
Publish sends an event to all subscribers of the specified topic with the given payload.
Parameters:
- ctx: Context for cancellation and timeouts
- topic: Destination topic for the event (required)
- payload: Event data (can be any type)
- opts: Optional publishing settings:
  - hub.Wait(true) - wait for all handlers to complete
  - hub.Sync(true) - process handlers synchronously
  - hub.OnFinish() - add completion callback
Behavior:
- Creates a new Event with the provided topic and payload
- Applies all specified PublishOptions
- Delivers to all matching subscribers
- Handles payload conversion automatically when subscribers use typed callbacks
Example usage:
// Simple publish
hub.Publish(ctx,
	hub.T("type=alert", "priority=high"),
	"server is down",
)
// With options
hub.Publish(ctx,
	hub.T("type=metrics"),
	map[string]any{"cpu": 85, "mem": 45},
	hub.Wait(true), // Wait for processing
	// The callback takes only a context, matching the OnFinish signature
	hub.OnFinish(func(ctx context.Context) {
		log.Println("Event processed")
	}),
)
Notes:
- The payload is converted automatically when subscribers use typed callbacks
- Topic is required (use hub.T() to create topics)
- Safe for concurrent use
func (*Hub) Subscribe ¶
func (h *Hub) Subscribe(ctx context.Context, t *Topic, cb interface{}, opts ...SubscribeOption) (SubID, error)
Subscribe registers an event handler with flexible callback signature options.
Supported callback formats:
- Minimal, without topic and payload:
  func(ctx context.Context) error
  func(ctx context.Context)
- With original topic:
  func(ctx context.Context, topic *Topic, payload any) error
  func(ctx context.Context, topic *Topic, payload any)
- Typed payload:
  func(ctx context.Context, payload Type) error
  func(ctx context.Context, payload Type)
- Generic payload without topic:
  func(ctx context.Context, payload any) error
  func(ctx context.Context, payload any)
Supported payload types (Type):
- All integer types (int8-int64, uint8-uint64)
- Floating point (float32, float64)
- String and boolean
- Time types (time.Time, time.Duration)
- Common collections ([]string, map[string]any)
Parameters:
- ctx: Context for cancellation and timeouts
- t: Topic to subscribe to (use hub.T() for all events)
- cb: Callback function in one of supported formats
- opts: Optional subscription settings (e.g., Once for single delivery)
Returns:
- Subscription ID that can be used for unsubscribing
- Error if:
  - Callback signature is invalid
  - Topic is nil
  - Unsupported parameter type in callback
Behavior:
- For typed callbacks, attempts direct type assertion first
- Falls back to automatic conversion using spf13/cast
- Returns conversion errors during event delivery
- Supports all standard SubscribeOption configurations
Example usage:
// Minimal callback
id, err := hub.Subscribe(ctx, topic, func(ctx context.Context) error {
	return nil
})
// Typed payload (id and err are reused, so plain assignment is used from here on)
id, err = hub.Subscribe(ctx, topic, func(ctx context.Context, id int) error {
	log.Printf("Processing ID: %d", id)
	return nil
})
// With options
id, err = hub.Subscribe(ctx, topic,
	func(ctx context.Context, msg string) error {
		return nil
	},
	hub.Once(true), // Auto-unsubscribe after first event
)
Notes:
- Prefer specific typed callbacks when possible for better performance
- The generic 'any' signature provides flexibility at a small performance cost
- Callback signatures are validated at subscription time; payload conversion errors can still surface during event delivery
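The "direct type assertion first, conversion as a fallback" behavior described for typed callbacks can be illustrated with a small standalone sketch. This is not the library's code: toInt is a hypothetical helper, and strconv stands in for spf13/cast only to keep the example limited to the standard library.

```go
package main

import (
	"fmt"
	"strconv"
)

// toInt is a hypothetical helper, not part of the hub package.
// It tries a direct type assertion first and only then falls back
// to a conversion (strconv here, as a stand-in for spf13/cast).
func toInt(payload any) (int, error) {
	if n, ok := payload.(int); ok {
		return n, nil // fast path: payload already has the requested type
	}
	switch v := payload.(type) {
	case string:
		return strconv.Atoi(v)
	case int64:
		return int(v), nil
	case float64:
		return int(v), nil // truncates toward zero
	}
	return 0, fmt.Errorf("cannot convert %T to int", payload)
}

func main() {
	for _, p := range []any{42, "7", 3.9, []string{"x"}} {
		n, err := toInt(p)
		fmt.Println(n, err)
	}
}
```

Under this model, a conversion failure is only detectable once a concrete payload arrives, which is why such errors belong to delivery rather than to subscription.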
type HubOption ¶
type HubOption interface {
// contains filtered or unexported methods
}
HubOption defines an interface for configuring Hub instances during creation.
func ToHandler ¶
ToHandler registers a custom callback converter function that transforms arbitrary functions into Handler types. This enables support for additional callback signatures beyond the built-in ones.
The converter function should:
- Accept a context and the callback value
- Return a Handler and nil error if conversion succeeds
- Return an error if something goes wrong
- Return nil Handler and nil error if the callback type is unsupported
Multiple converters can be registered. They are tried in registration order until one succeeds or all fail.
Example:
hub.New(
	hub.ToHandler(func(ctx context.Context, cb any) (Handler, error) {
		if fn, ok := cb.(func(string) error); ok {
			return func(ctx context.Context, e *Event) error {
				s, _ := e.Payload().(string)
				return fn(s)
			}, nil
		}
		// Unsupported callback type: return nil, nil so the next converter is tried
		return nil, nil
	}),
)
type PublishOption ¶
type PublishOption interface {
// contains filtered or unexported methods
}
PublishOption defines an interface for modifying event publishing behavior
func OnFinish ¶
func OnFinish(cb func(ctx context.Context)) PublishOption
OnFinish creates a PublishOption with a completion callback. The callback executes after all handlers have processed the event.
func Sync ¶
func Sync(v bool) PublishOption
Sync creates a PublishOption that controls synchronous event processing. When enabled (v=true), the event will be processed by all handlers sequentially in the publishing goroutine, without spawning additional goroutines. This ensures strict ordering but blocks the publisher during processing.
func Wait ¶
func Wait(v bool) PublishOption
Wait creates a PublishOption that controls waiting for handlers. When true, Publish blocks until all handlers complete.
type SubscribeOption ¶
type SubscribeOption interface {
// contains filtered or unexported methods
}
SubscribeOption defines an interface for modifying subscription parameters
func Once ¶
func Once(v bool) SubscribeOption
Once creates a SubscribeOption that controls single delivery. When true, the subscription is removed automatically after the first delivery.
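Single-delivery behavior can be modeled with a tiny registry that drops a subscription right after invoking it. The registry and sub types below are hypothetical and deliberately not safe for concurrent use; they only illustrate the documented effect of Once(true).

```go
package main

import "fmt"

// sub and registry are hypothetical types for illustration only.
type sub struct {
	fn   func(payload any)
	once bool
}

type registry struct {
	subs map[int]sub
	next int
}

// subscribe stores the callback and returns its ID.
func (r *registry) subscribe(fn func(any), once bool) int {
	r.next++
	r.subs[r.next] = sub{fn: fn, once: once}
	return r.next
}

// publish delivers to every subscriber and removes "once" subscriptions
// after their first delivery (deleting during range is allowed in Go).
func (r *registry) publish(payload any) {
	for id, s := range r.subs {
		s.fn(payload)
		if s.once {
			delete(r.subs, id)
		}
	}
}

func main() {
	r := &registry{subs: map[int]sub{}}
	calls := 0
	r.subscribe(func(any) { calls++ }, true)
	r.publish("first")
	r.publish("second")
	fmt.Println(calls) // 1
}
```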
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic represents a named channel for event distribution with key-value attributes. It's immutable after creation and safe for concurrent use.
func NewTopic ¶
NewTopic creates a new Topic from key-value pairs. Pairs can be provided in two formats:
- "key=value" strings
- Separate "key", "value" arguments
Returns error if input format is invalid.
Example:
t, err := NewTopic("type=alert", "severity=high")
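One way the two input formats could be accepted side by side is sketched below. The documentation does not say how NewTopic tells the formats apart, so the rule used here (an argument containing "=" is a complete pair, anything else is a key whose value is the next argument) is an assumption, and parsePairs is a hypothetical function.

```go
package main

import (
	"fmt"
	"strings"
)

// parsePairs is a hypothetical parser, not the library's NewTopic.
// Assumed rule: "k=v" is a full pair; otherwise the argument is a key
// and the following argument is its value.
func parsePairs(args ...string) (map[string]string, error) {
	out := make(map[string]string)
	for i := 0; i < len(args); i++ {
		if k, v, ok := strings.Cut(args[i], "="); ok {
			if k == "" {
				return nil, fmt.Errorf("empty key in %q", args[i])
			}
			out[k] = v
			continue
		}
		// No "=": the value must be supplied as the next argument.
		if i+1 >= len(args) {
			return nil, fmt.Errorf("key %q has no value", args[i])
		}
		out[args[i]] = args[i+1]
		i++ // skip the value that was just consumed
	}
	return out, nil
}

func main() {
	fmt.Println(parsePairs("type=alert", "severity=high"))
	fmt.Println(parsePairs("type", "alert"))
	fmt.Println(parsePairs("type")) // error: the key has no value
}
```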
func T ¶
T creates a new Topic from key-value pairs, panicking on error. Simplified version of NewTopic for use in tests and initialization.
Example:
t := T("type=alert", "severity=high")
func (*Topic) Each ¶
Each iterates over all key-value pairs in the Topic. Pairs are processed in sorted key order.
Example:
t := T("b=2", "a=1")
t.Each(func(k, v string) {
	fmt.Printf("%s=%s\n", k, v)
})
// Output:
// a=1
// b=2
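Go maps iterate in an unspecified order, so a sorted traversal like the one described for Each has to order the keys explicitly. The following is a minimal sketch of that idea over a plain map; eachSorted is a hypothetical helper, and the real Topic may store its attributes differently.

```go
package main

import (
	"fmt"
	"sort"
)

// eachSorted is a hypothetical helper that visits key-value pairs
// in ascending key order, regardless of map iteration order.
func eachSorted(attrs map[string]string, fn func(k, v string)) {
	keys := make([]string, 0, len(attrs))
	for k := range attrs {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fn(k, attrs[k])
	}
}

func main() {
	eachSorted(map[string]string{"b": "2", "a": "1"}, func(k, v string) {
		fmt.Printf("%s=%s\n", k, v)
	})
}
```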
func (*Topic) Get ¶
Get returns the value for the specified key. Returns empty string if key doesn't exist.
Example:
t := T("type=alert")
v := t.Get("type") // returns "alert"
func (*Topic) Match ¶
Match checks if this Topic matches another Topic. A Topic matches if:
- All keys in this Topic exist in the other Topic
- Corresponding values are equal or one of them is Any ("*")
Does not consider additional keys in the other Topic.
Example:
t1 := T("type=alert", "severity=high")
t2 := T("type=alert", "severity=*", "source=server")
t1.Match(t2) // returns true
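The matching rule above can be restated as a short function over plain maps. This is a sketch of the documented rule only, not the library's indexed implementation; matches and anyValue are hypothetical names, with anyValue mirroring the Any constant.

```go
package main

import "fmt"

// anyValue mirrors the documented Any constant ("*").
const anyValue = "*"

// matches is a hypothetical stand-in for Topic.Match.
// Every key of t must exist in other, and the paired values must be
// equal unless either side is the wildcard. Extra keys in other are ignored.
func matches(t, other map[string]string) bool {
	for k, v := range t {
		ov, ok := other[k]
		if !ok {
			return false
		}
		if v != ov && v != anyValue && ov != anyValue {
			return false
		}
	}
	return true
}

func main() {
	t1 := map[string]string{"type": "alert", "severity": "high"}
	t2 := map[string]string{"type": "alert", "severity": "*", "source": "server"}
	fmt.Println(matches(t1, t2)) // true: the extra "source" key in t2 is ignored
	fmt.Println(matches(t2, t1)) // false: t1 has no "source" key
}
```

Note that the relation is not symmetric: the receiver's keys drive the comparison, so swapping the two topics can change the result.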
func (*Topic) With ¶
With creates a new Topic by merging current attributes with new ones. New attributes override existing ones with the same keys. Panics if new attributes have invalid format.
Example:
t1 := T("type=alert", "severity=high")
t2 := t1.With("severity=low", "source=server")
// t2 now has: type=alert, severity=low, source=server