ds

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultRingSize = 1024

Variables

View Source
var (
	// ErrTopicNotFound indicates the given topic does not exist.
	ErrTopicNotFound = errors.New("topic not found")

	// ErrTopicAlreadyExists indicates the topic already exists.
	ErrTopicAlreadyExists = errors.New("topic already exists")

	// ErrTopicQueueFull indicates the topic queue is full
	ErrTopicQueueFull = errors.New("topic queue is full")
)
View Source
var ErrConnPoolClosed = fmt.Errorf("conn pool is closed")
View Source
var (
	ErrHubClosed = errors.New("hub is closed")
)

ErrHubClosed is returned when operations are attempted on a closed Hub.

Functions

func NewConnPool added in v0.0.35

func NewConnPool[T io.Closer](
	factory func() (T, error),
	residence int,
	opts ...ConnPoolOption[T],
) *connPool[T]

NewConnPool returns a new connection pool.

Parameters:

  • factory: Required function that creates new connections
  • residence: Maximum number of connections to keep in the pool (capacity)
  • opts: Optional configuration options

Returns:

  • *connPool[T]: A new connection pool instance

Panics:

  • If factory is nil

func NewCounter added in v0.1.1

func NewCounter() *counter

func NewMutexCounter added in v0.1.1

func NewMutexCounter() *mutexCounter

Types

type ArrayStack

type ArrayStack[T any] struct {
	// contains filtered or unexported fields
}

ArrayStack is a stack implementation using a slice as the underlying storage. It provides O(1) average time complexity for push and pop operations.

Type parameters:

  • T: The element type stored in the stack

func NewArrayStack

func NewArrayStack[T any]() *ArrayStack[T]

NewArrayStack creates a new empty ArrayStack.

func (*ArrayStack[T]) Empty

func (m *ArrayStack[T]) Empty() bool

Empty returns true if the stack contains no elements.

func (*ArrayStack[T]) Peek

func (m *ArrayStack[T]) Peek() (value T, exist bool)

Peek returns the top element without removing it from the stack. Returns the element and a boolean indicating if the stack was not empty.

func (*ArrayStack[T]) Pop

func (m *ArrayStack[T]) Pop() (value T, exist bool)

Pop removes and returns the top element from the stack. Returns the element and a boolean indicating if the stack was not empty.

func (*ArrayStack[T]) Push

func (m *ArrayStack[T]) Push(value T)

Push adds an element to the top of the stack.

func (*ArrayStack[T]) Size

func (m *ArrayStack[T]) Size() int

Size returns the number of elements in the stack.

func (*ArrayStack[T]) String

func (m *ArrayStack[T]) String() string

String returns a string representation of the stack with elements from top to bottom.

type BroadcastBus added in v0.0.31

type BroadcastBus[T any] struct {
	// contains filtered or unexported fields
}

BroadcastBus is a multi-topic, multi-subscriber pub-sub bus.

Each topic is a FastHub (fan-out): - When you publish a message to a topic, all subscribers receive a copy. - If a topic has no subscribers, messages are discarded.

Typical use case: event broadcast, notifications, state updates.

func NewBroadcastBus added in v0.0.31

func NewBroadcastBus[T any]() *BroadcastBus[T]

NewBroadcastBus creates a new BroadcastBus with the given buffer size for each subscriber channel.

func (*BroadcastBus[T]) AddTopic added in v0.0.32

func (u *BroadcastBus[T]) AddTopic(topic string, bufSize int) error

AddTopic creates a new topic with its own buffered channel. Returns an error if the topic already exists.

func (*BroadcastBus[T]) Close added in v0.0.31

func (u *BroadcastBus[T]) Close() error

Close closes all topics and marks the bus as closed. Any further operations on the bus will return an error.

func (*BroadcastBus[T]) Publish added in v0.0.31

func (u *BroadcastBus[T]) Publish(topic string, msg T) error

Publish broadcasts a message to all subscribers of the given topic.

If the topic has no subscribers, returns ErrTopicNotFound. If the bus is closed, returns an error.

func (*BroadcastBus[T]) RemoveTopic added in v0.0.32

func (u *BroadcastBus[T]) RemoveTopic(topic string) error

RemoveTopic closes and removes the topic's channel. Messages in the buffer are dropped.

func (*BroadcastBus[T]) Subscribe added in v0.0.31

func (u *BroadcastBus[T]) Subscribe(topic string) (*Subscription[T], error)

Subscribe subscribes to the given topic, creating the topic if needed.

Returns a new Subscription which has a buffered channel for receiving messages.

If the bus is closed, returns an error.

func (*BroadcastBus[T]) Unsubscribe added in v0.0.31

func (u *BroadcastBus[T]) Unsubscribe(topic string, sub *Subscription[T]) error

Unsubscribe removes a subscriber from the given topic. If no subscribers remain for that topic, the topic is closed and removed.

Returns an error if the bus is closed or the topic does not exist.

type Cache

type Cache[T any] struct {
	// contains filtered or unexported fields
}

Cache is a thread-safe in-memory cache with optional expiration and cleanup. It stores key-value pairs with configurable expiration times and automatic cleanup of expired entries.

Type parameters:

  • T: The type of values stored in the cache

func NewCache

func NewCache[T any](expire, cleanup time.Duration) *Cache[T]

NewCache creates a new cache with the specified expiration and cleanup intervals.

Parameters:

  • expire: Duration after which entries expire. Use 0 for no expiration.
  • cleanup: Interval for automatic cleanup of expired entries. Use 0 for no automatic cleanup.

Returns:

  • *Cache[T]: A new cache instance

Example:

// Create a cache with 1-minute expiration and 5-minute cleanup interval
cache := NewCache[string](time.Minute, 5*time.Minute)

func (*Cache[T]) Delete

func (c *Cache[T]) Delete(key string)

Delete removes an entry from the cache.

Parameters:

  • key: The key to delete

func (*Cache[T]) Get

func (c *Cache[T]) Get(key string) (value T, loaded bool)

Get retrieves a value from the cache by key.

Parameters:

  • key: The key to look up

Returns:

  • value: The retrieved value (zero value if not found)
  • loaded: True if the key was found and not expired

func (*Cache[T]) Release added in v0.0.32

func (c *Cache[T]) Release() error

Release stops the cleanup goroutine and releases resources. Call this method when the cache is no longer needed to prevent goroutine leaks.

Returns:

  • error: Always returns nil

func (*Cache[T]) Set

func (c *Cache[T]) Set(key string, value T)

Set stores a value in the cache with the default expiration time.

Parameters:

  • key: The key to store the value under
  • value: The value to store

func (*Cache[T]) SetWithExpire

func (c *Cache[T]) SetWithExpire(key string, value T, expire time.Duration)

SetWithExpire stores a value in the cache with a custom expiration time.

Parameters:

  • key: The key to store the value under
  • value: The value to store
  • expire: Custom expiration duration for this entry

type ConnPoolOption added in v0.0.35

type ConnPoolOption[T io.Closer] func(*connPool[T])

ConnPoolOption defines optional configuration for the connection pool.

func WithCheck added in v0.0.35

func WithCheck[T io.Closer](check func(T) bool) ConnPoolOption[T]

WithCheck returns a ConnPoolOption that sets a connection validation function. The check function is called when retrieving a connection from the pool to verify it's still usable. If check returns false, the connection is closed and a new one is created.

type Counter added in v0.1.1

type Counter interface {
	// Diff calculates the difference between the new value and the previous value.
	// It updates the counter with the new value and returns the difference.
	//
	// Parameters:
	//   - val: The new value to update the counter with
	//
	// Returns:
	//   - The difference between the new value and the previous value
	//   - Returns 0 if this is the first update
	Diff(float64) float64

	// Rate calculates the rate of change per second.
	// Equivalent to RateIn(val, time.Second).
	//
	// Parameters:
	//   - val: The new value to update the counter with
	//
	// Returns:
	//   - The rate of change per second
	//   - Returns 0 if this is the first update or time elapsed is 0
	Rate(float64) float64

	// RateIn calculates the rate of change per specified time interval.
	// Formula: (new_value - old_value) / (time_elapsed / interval)
	//
	// Parameters:
	//   - val: The new value to update the counter with
	//   - interval: The time interval to normalize the rate to (e.g., time.Second for per-second rate)
	//
	// Returns:
	//   - The rate of change per specified interval
	//   - Returns 0 if this is the first update, interval <= 0, or time elapsed is 0
	RateIn(float64, time.Duration) float64
}

type FastHub added in v0.0.31

type FastHub[T any] struct {
	// contains filtered or unexported fields
}

FastHub is a simple in-memory publish-subscribe hub. It broadcasts each published message to all active subscribers. FastHub does not guarantee message ordering or reliable delivery. If a subscriber's channel buffer is full, the message is dropped.

func NewFastHub added in v0.0.31

func NewFastHub[T any](bufSize int) *FastHub[T]

NewFastHub creates a new FastBus with the given buffer size for each subscriber.

func (*FastHub[T]) Close added in v0.0.31

func (b *FastHub[T]) Close() error

Close closes the bus and all active subscriber channels. After closing, Publish and Subscribe will return ErrHubClosed.

func (*FastHub) Increment added in v0.0.31

func (g *FastHub) Increment() uint64

Increment atomically increments and returns a new unique ID.

func (*FastHub[T]) Publish added in v0.0.31

func (b *FastHub[T]) Publish(v T) error

Publish broadcasts the given value to all active subscribers. If a subscriber's channel is full, the message is dropped. If the bus is closed, it returns ErrHubClosed.

func (*FastHub) SetPublishCallback added in v0.0.39

func (h *FastHub) SetPublishCallback(f func(id uint64, v T))

func (*FastHub[T]) Subscribe added in v0.0.31

func (b *FastHub[T]) Subscribe() (*Subscription[T], error)

Subscribe registers a new subscriber and returns its Subscription. If the bus is closed, it returns ErrHubClosed.

func (*FastHub[T]) Unsubscribe added in v0.0.31

func (b *FastHub[T]) Unsubscribe(s *Subscription[T])

Unsubscribe removes a subscriber from the bus and closes its channel.

type HashSet

type HashSet[T comparable] map[T]struct{}

HashSet is a simple set implementation using Go's built-in map. It provides O(1) average time complexity for basic operations.

Type parameters:

  • T: The element type, must be comparable

func NewHashSet

func NewHashSet[T comparable](items ...T) HashSet[T]

NewHashSet creates a new HashSet with optional initial elements.

Parameters:

  • items: Optional initial elements to add to the set

Returns:

  • HashSet[T]: A new HashSet instance

func (HashSet[T]) Append

func (s HashSet[T]) Append(items ...T)

Append adds one or more elements to the set. Duplicate elements are ignored.

func (HashSet[T]) Contains

func (s HashSet[T]) Contains(value T) (exist bool)

Contains checks if an element exists in the set.

func (HashSet[T]) Difference

func (s HashSet[T]) Difference(s1 Set[T]) Set[T]

Difference returns a new set containing elements in this set but not in the other.

func (HashSet[T]) Intersection

func (s HashSet[T]) Intersection(s1 Set[T]) Set[T]

Intersection returns a new set containing elements present in both sets.

func (HashSet[T]) Len

func (s HashSet[T]) Len() int

Len returns the number of elements in the set.

func (HashSet[T]) Remove

func (s HashSet[T]) Remove(values ...T) bool

Remove removes one or more elements from the set. Non-existent elements are ignored.

func (HashSet[T]) String

func (s HashSet[T]) String() string

String returns a string representation of the set.

func (HashSet[T]) SymmetricDifference

func (s HashSet[T]) SymmetricDifference(s1 Set[T]) Set[T]

SymmetricDifference returns a new set containing elements present in either set but not both.

func (HashSet[T]) Union

func (s HashSet[T]) Union(s1 Set[T]) Set[T]

Union returns a new set containing all elements from both sets.

func (HashSet[T]) Values

func (s HashSet[T]) Values() (values []T)

Values returns a slice containing all elements in the set.

type Hub added in v0.0.31

type Hub[T any] interface {
	// Subscribe registers a new subscriber and returns its Subscription.
	// If the Hub is closed, it returns ErrHubClosed.
	Subscribe() (*Subscription[T], error)

	// Publish sends the given value to all active subscribers.
	// If the Hub is closed, it returns ErrHubClosed.
	Publish(v T) error

	// Unsubscribe removes the given Subscription from the Hub.
	// After unsubscription, the subscriber will no longer receive messages.
	Unsubscribe(*Subscription[T])

	// SetPublishCallback sets a custom publish callback function.
	// callback is called asynchronously when a message cannot be delivered to a subscriber due to a full buffer.
	// The callback must be non-blocking and must NOT call back into Hub.
	SetPublishCallback(callback func(id uint64, v T))

	// Close closes the Hub and all active subscriptions.
	// After closing, Subscribe and Publish will return ErrHubClosed.
	Close() error
}

Hub defines a simple publish-subscribe broadcast center. All subscribers receive every published message. It does not guarantee message ordering or delivery reliability.

type LinkStack

type LinkStack[T any] struct {
	// contains filtered or unexported fields
}

LinkStack is a stack implementation using a linked list as the underlying storage. It provides O(1) time complexity for push and pop operations.

Type parameters:

  • T: The element type stored in the stack

func NewLinkStack

func NewLinkStack[T any]() *LinkStack[T]

NewLinkStack creates a new empty LinkStack.

func (*LinkStack[T]) Empty

func (m *LinkStack[T]) Empty() bool

Empty returns true if the stack contains no elements.

func (*LinkStack[T]) Peek

func (m *LinkStack[T]) Peek() (value T, exist bool)

Peek returns the top element without removing it from the stack. Returns the element and a boolean indicating if the stack was not empty.

func (*LinkStack[T]) Pop

func (m *LinkStack[T]) Pop() (value T, exist bool)

Pop removes and returns the top element from the stack. Returns the element and a boolean indicating if the stack was not empty.

func (*LinkStack[T]) Push

func (m *LinkStack[T]) Push(value T)

Push adds an element to the top of the stack.

func (*LinkStack[T]) Size

func (m *LinkStack[T]) Size() int

Size returns the number of elements in the stack.

type Map

type Map[K comparable, V any] interface {
	// Store stores a value for a key.
	Store(K, V)

	// Load returns the value stored for a key, or false if no value is present.
	Load(K) (V, bool)

	// Delete deletes the value for a key.
	Delete(K) bool

	// Swap swaps the value for a key and returns the previous value if any.
	// The loaded result reports whether the key was present.
	Swap(key K, newValue V) (old V, loaded bool)

	// Range calls iterator sequentially for each key and value present in the map.
	// If iterator returns false, range stops the iteration.
	Range(iterator func(K, V) bool)

	// LoadOrStore returns the existing value for the key if present.
	// Otherwise, it stores and returns the given value.
	// The loaded result is true if the value was loaded, false if stored.
	LoadOrStore(key K, newValue V) (value V, loaded bool)

	// LoadAndDelete deletes the value for a key, returning the previous value if any.
	// The loaded result reports whether the key was present.
	LoadAndDelete(K) (value V, loaded bool)

	// CompareAndSwap swaps the value for a key if the current value equals old.
	// Returns true if the swap was performed.
	CompareAndSwap(key K, old, newValue V) bool

	// CompareAndDelete deletes the entry for a key if its value equals old.
	// Returns true if the entry was deleted.
	CompareAndDelete(key K, value V) bool

	// CompareFnAndSwap swaps the value for a key using a custom comparison function.
	// The function fn is called with the current value and old value.
	// Returns true if fn returns true and the swap was performed.
	CompareFnAndSwap(key K, fn func(V, V) bool, old, newValue V) bool

	// CompareFnAndDelete deletes the entry for a key using a custom comparison function.
	// The function fn is called with the current value and old value.
	// Returns true if fn returns true and the entry was deleted.
	CompareFnAndDelete(key K, fn func(V, V) bool, old V) bool

	// Keys returns a slice containing all keys in the map.
	Keys() []K

	// Values returns a slice containing all values in the map.
	Values() []V

	// Filter returns a new Map containing only entries that satisfy the filter function.
	Filter(filter func(K, V) bool) Map[K, V]
}

Map is a generic interface for thread-safe key-value storage. It provides operations similar to Go's sync.Map but with a more comprehensive API.

Type parameters:

  • K: The key type, must be comparable
  • V: The value type

Implementations:

  • SyncMap: Wraps Go's sync.Map with type safety
  • MutexMap: Uses a mutex-protected map for simpler concurrency control

type MutexMap

type MutexMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

MutexMap is a thread-safe map implementation using a mutex for synchronization. It provides a simpler alternative to SyncMap with mutex-based concurrency control.

func NewMutexMap

func NewMutexMap[K comparable, V any]() *MutexMap[K, V]

NewMutexMap creates a new MutexMap instance.

func (*MutexMap[K, V]) CompareAndDelete

func (m *MutexMap[K, V]) CompareAndDelete(key K, old V) bool

CompareAndDelete deletes the entry for a key if its value equals old. Returns true if the entry was deleted.

func (*MutexMap[K, V]) CompareAndSwap

func (m *MutexMap[K, V]) CompareAndSwap(key K, old, new V) bool

CompareAndSwap swaps the value for a key if the current value equals old. Returns true if the swap was performed.

func (*MutexMap[K, V]) CompareFnAndDelete

func (m *MutexMap[K, V]) CompareFnAndDelete(key K, fn func(V, V) bool, old V) bool

CompareFnAndDelete deletes the entry for a key using a custom comparison function. The function fn is called with the current value and old value. Returns true if fn returns true and the entry was deleted.

func (*MutexMap[K, V]) CompareFnAndSwap

func (m *MutexMap[K, V]) CompareFnAndSwap(key K, fn func(V, V) bool, old, new V) bool

CompareFnAndSwap swaps the value for a key using a custom comparison function. The function fn is called with the current value and old value. Returns true if fn returns true and the swap was performed.

func (*MutexMap[K, V]) Delete

func (m *MutexMap[K, V]) Delete(key K) bool

Delete deletes the value for a key.

func (*MutexMap[K, V]) Filter added in v0.0.32

func (m *MutexMap[K, V]) Filter(filter func(K, V) bool) Map[K, V]

Filter returns a new Map containing only entries that satisfy the filter function.

func (*MutexMap[K, V]) Keys added in v0.0.32

func (m *MutexMap[K, V]) Keys() []K

Keys returns a slice containing all keys in the map.

func (*MutexMap[K, V]) Len added in v0.0.35

func (m *MutexMap[K, V]) Len() int

Len returns the number of entries in the map.

func (*MutexMap[K, V]) Load

func (m *MutexMap[K, V]) Load(key K) (value V, ok bool)

Load returns the value stored for a key, or false if no value is present.

func (*MutexMap[K, V]) LoadAndDelete

func (m *MutexMap[K, V]) LoadAndDelete(key K) (value V, loaded bool)

LoadAndDelete deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.

func (*MutexMap[K, V]) LoadOrStore

func (m *MutexMap[K, V]) LoadOrStore(key K, new V) (actual V, loaded bool)

LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.

func (*MutexMap[K, V]) Range

func (m *MutexMap[K, V]) Range(iterator func(key K, value V) bool)

Range calls iterator sequentially for each key and value present in the map. If iterator returns false, range stops the iteration.

func (*MutexMap[K, V]) Store

func (m *MutexMap[K, V]) Store(key K, value V)

Store stores a value for a key.

func (*MutexMap[K, V]) Swap

func (m *MutexMap[K, V]) Swap(key K, value V) (pre V, loaded bool)

Swap swaps the value for a key and returns the previous value if any. The loaded result reports whether the key was present.

func (*MutexMap[K, V]) Values added in v0.0.32

func (m *MutexMap[K, V]) Values() []V

Values returns a slice containing all values in the map.

type OrderHub added in v0.0.31

type OrderHub[T any] struct {
	// contains filtered or unexported fields
}

OrderHub is a simple in-memory publish-subscribe hub that delivers messages to subscribers in the order they subscribed. Unlike FastBus, OrderHub maintains a slice to preserve subscription order.

func NewOrderHub added in v0.0.31

func NewOrderHub[T any](bufSize int) *OrderHub[T]

NewOrderHub creates a new OrderBus with the given buffer size for each subscriber.

func (*OrderHub[T]) Close added in v0.0.31

func (b *OrderHub[T]) Close() error

Close closes the bus and all subscriber channels. After closing, Subscribe and Publish will return ErrHubClosed.

func (*OrderHub) Increment added in v0.0.39

func (g *OrderHub) Increment() uint64

Increment atomically increments and returns a new unique ID.

func (*OrderHub[T]) Publish added in v0.0.31

func (b *OrderHub[T]) Publish(v T) error

Publish sends the given value to all subscribers in subscription order. If a subscriber's channel is full, the message is dropped for that subscriber. If the bus is closed, it returns ErrHubClosed.

func (*OrderHub) SetPublishCallback added in v0.0.39

func (h *OrderHub) SetPublishCallback(f func(id uint64, v T))

func (*OrderHub[T]) Subscribe added in v0.0.31

func (b *OrderHub[T]) Subscribe() (*Subscription[T], error)

Subscribe registers a new subscriber and returns its Subscription. Messages will be delivered in the order subscribers were added. If the bus is closed, it returns ErrHubClosed.

func (*OrderHub[T]) Unsubscribe added in v0.0.31

func (b *OrderHub[T]) Unsubscribe(s *Subscription[T])

Unsubscribe removes a subscriber from the bus and closes its channel. After removal, the subscriber will no longer receive messages.

type Pool

type Pool[K, V any] struct {
	// New is a function that creates a new object when the pool doesn't have
	// an available instance for the given key.
	New poolNewFunc[K, V]

	// Identifier converts a key to a string identifier used for internal storage.
	// This allows the pool to use SyncMap for thread-safe operations.
	Identifier poolIDFunc[K]

	// Destroy is called when an object is removed from the pool (borrow count reaches zero).
	// Use this to clean up resources (e.g., close connections, free memory).
	Destroy poolDestroyFunc[V]
	// contains filtered or unexported fields
}

Pool is a generic, thread-safe object pool that manages reusable resources. It maintains a collection of objects that can be borrowed and returned, reducing the overhead of creating new objects repeatedly.

Type parameters:

  • K: The key type used to identify objects in the pool
  • V: The value type of objects stored in the pool

The pool tracks borrow counts for each object and automatically removes objects when they are no longer in use (borrow count reaches zero).

func (*Pool[K, V]) Get

func (p *Pool[K, V]) Get(ctx context.Context, key K) (value V, err error)

Get retrieves a value from the pool for the given key. If an object exists in the pool, its borrow count is incremented and the object is returned. If no object exists, a new one is created using the New function. This method uses context.Background() internally.

Parameters:

  • ctx: Context for cancellation/timeout (not used in this method, see GetWithCtx)
  • key: The key identifying the object to retrieve

Returns:

  • value: The retrieved or newly created object
  • err: Error if object creation fails

func (*Pool[K, V]) GetWithCtx

func (p *Pool[K, V]) GetWithCtx(ctx context.Context, key K) (value V, err error)

GetWithCtx retrieves a value from the pool for the given key with context support. If an object exists in the pool, its borrow count is incremented and the object is returned. If no object exists, a new one is created using the New function with the provided context.

Parameters:

  • ctx: Context for cancellation/timeout during object creation
  • key: The key identifying the object to retrieve

Returns:

  • value: The retrieved or newly created object
  • err: Error if object creation fails or context is cancelled

func (*Pool[K, V]) Put

func (p *Pool[K, V]) Put(key K) (err error)

Put returns a borrowed object to the pool, decrementing its borrow count. When the borrow count reaches zero, the object is removed from the pool and the Destroy function is called to clean up resources. This method uses context.Background() internally.

Parameters:

  • key: The key identifying the object to return

Returns:

  • err: Error if the Destroy function fails

func (*Pool[K, V]) PutWithCtx

func (p *Pool[K, V]) PutWithCtx(ctx context.Context, key K) (err error)

PutWithCtx returns a borrowed object to the pool with context support. When the borrow count reaches zero, the object is removed from the pool and the Destroy function is called with the provided context.

Parameters:

  • ctx: Context for cancellation/timeout during cleanup
  • key: The key identifying the object to return

Returns:

  • err: Error if the Destroy function fails or context is cancelled

type RingBuffer added in v0.0.31

type RingBuffer[T any] struct {
	// contains filtered or unexported fields
}

RingBuffer Non-concurrency-safe Ring

func NewRingBuffer added in v0.0.31

func NewRingBuffer[T any]() *RingBuffer[T]

NewRingBuffer creates a new RingBuffer with default size (1024).

func NewRingBufferWithSize added in v0.0.31

func NewRingBufferWithSize[T any](size int) *RingBuffer[T]

NewRingBufferWithSize creates a new RingBuffer with the specified size. If size <= 0, DefaultRingSize (1024) is used.

func (*RingBuffer[T]) Cap added in v0.0.31

func (r *RingBuffer[T]) Cap() int

Cap returns the capacity of the ring buffer.

func (*RingBuffer[T]) Iterator added in v0.0.31

func (r *RingBuffer[T]) Iterator() iter.Seq[T]

Iterator returns an iterator that yields all values in the ring buffer in chronological order (oldest to newest).

func (*RingBuffer[T]) Len added in v0.0.31

func (r *RingBuffer[T]) Len() int

Len returns the number of elements currently stored in the ring buffer.

func (*RingBuffer[T]) Push added in v0.0.31

func (r *RingBuffer[T]) Push(values ...T)

Push adds one or more values to the ring buffer. When the buffer is full, older values are overwritten (circular behavior).

func (*RingBuffer[T]) Values added in v0.0.31

func (r *RingBuffer[T]) Values() (result []T)

Values returns a slice containing all values in the ring buffer in chronological order (oldest to newest).

type Set

type Set[T comparable] interface {
	fmt.Stringer
	// Len returns the number of elements in the set.
	Len() int
	// Append adds one or more elements to the set.
	// Duplicate elements are ignored.
	Append(...T)
	// Remove removes one or more elements from the set.
	// Non-existent elements are ignored.
	Remove(...T) bool
	// Contains checks if an element exists in the set.
	Contains(T) bool
	// Values returns a slice containing all elements in the set.
	Values() []T
	// Union returns a new set containing all elements from both sets.
	Union(Set[T]) Set[T]
	// Intersection returns a new set containing elements present in both sets.
	Intersection(Set[T]) Set[T]
	// Difference returns a new set containing elements in this set but not in the other.
	Difference(Set[T]) Set[T]
	// SymmetricDifference returns a new set containing elements present in either set but not both.
	SymmetricDifference(Set[T]) Set[T]
}

Set is a generic interface for collections of unique elements. It provides standard set operations like union, intersection, difference, etc.

Type parameters:

  • T: The element type, must be comparable

type Stack

type Stack[T any] interface {
	// Push adds an element to the top of the stack.
	Push(T)
	// Pop removes and returns the top element from the stack.
	// Returns the element and a boolean indicating if the stack was not empty.
	Pop() (T, bool)
	// Peek returns the top element without removing it from the stack.
	// Returns the element and a boolean indicating if the stack was not empty.
	Peek() (T, bool)
	// Empty returns true if the stack contains no elements.
	Empty() bool
	// Size returns the number of elements in the stack.
	Size() int
}

Stack is a generic interface for Last-In-First-Out (LIFO) data structures. It provides basic stack operations like push, pop, and peek.

Type parameters:

  • T: The element type stored in the stack

type Subscription added in v0.0.31

type Subscription[T any] struct {
	// ID is the unique identifier for this subscriber.
	ID uint64
	// contains filtered or unexported fields
}

Subscription represents a single subscriber to a Hub. It holds the unique subscription ID and the receive-only channel that delivers published messages.

func (*Subscription[T]) Channel added in v0.0.39

func (s *Subscription[T]) Channel() <-chan T

Channel returns the receive-only channel for this subscription. Use this channel to receive messages published to the Hub.

type SyncMap

type SyncMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

SyncMap Generic wrapper for sync.Map

func NewSyncMap

func NewSyncMap[K comparable, V any]() *SyncMap[K, V]

NewSyncMap creates a new SyncMap instance.

func (*SyncMap[K, V]) CompareAndDelete

func (m *SyncMap[K, V]) CompareAndDelete(key K, old V) bool

CompareAndDelete deletes the entry for a key if its value equals old. Returns true if the entry was deleted.

func (*SyncMap[K, V]) CompareAndSwap

func (m *SyncMap[K, V]) CompareAndSwap(key K, old, new V) bool

CompareAndSwap swaps the value for a key if the current value equals old. Returns true if the swap was performed.

func (*SyncMap[K, V]) CompareFnAndDelete

func (m *SyncMap[K, V]) CompareFnAndDelete(key K, fn func(V, V) bool, old V) bool

CompareFnAndDelete deletes the entry for a key using a custom comparison function. The function fn is called with the current value and old value. Returns true if fn returns true and the entry was deleted.

func (*SyncMap[K, V]) CompareFnAndSwap

func (m *SyncMap[K, V]) CompareFnAndSwap(key K, fn func(V, V) bool, old, new V) bool

CompareFnAndSwap swaps the value for a key using a custom comparison function. The function fn is called with the current value and old value. Returns true if fn returns true and the swap was performed.

func (*SyncMap[K, V]) Delete

func (m *SyncMap[K, V]) Delete(key K) bool

Delete deletes the value for a key.

func (*SyncMap[K, V]) Filter added in v0.0.32

func (m *SyncMap[K, V]) Filter(filter func(K, V) bool) Map[K, V]

Filter returns a new Map containing only entries that satisfy the filter function.

func (*SyncMap[K, V]) Keys added in v0.0.32

func (m *SyncMap[K, V]) Keys() []K

Keys returns a slice containing all keys in the map.

func (*SyncMap[K, V]) Load

func (m *SyncMap[K, V]) Load(key K) (value V, ok bool)

Load returns the value stored for a key, or false if no value is present.

func (*SyncMap[K, V]) LoadAndDelete

func (m *SyncMap[K, V]) LoadAndDelete(key K) (value V, loaded bool)

LoadAndDelete deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.

func (*SyncMap[K, V]) LoadOrStore

func (m *SyncMap[K, V]) LoadOrStore(key K, new V) (actual V, loaded bool)

LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.

func (*SyncMap[K, V]) Range

func (m *SyncMap[K, V]) Range(iterator func(key K, value V) bool)

Range calls iterator sequentially for each key and value present in the map. If iterator returns false, range stops the iteration.

func (*SyncMap[K, V]) Store

func (m *SyncMap[K, V]) Store(key K, value V)

Store stores a value for a key.

func (*SyncMap[K, V]) Swap

func (m *SyncMap[K, V]) Swap(key K, value V) (pre V, loaded bool)

Swap swaps the value for a key and returns the previous value if any. The loaded result reports whether the key was present.

func (*SyncMap[K, V]) Values added in v0.0.32

func (m *SyncMap[K, V]) Values() []V

Values returns a slice containing all values in the map.

type SyncSet

type SyncSet[T comparable] struct {
	// contains filtered or unexported fields
}

SyncSet is a thread-safe set implementation for comparable types.

func NewSyncSet

func NewSyncSet[T comparable](items ...T) *SyncSet[T]

NewSyncSet creates a new SyncSet and optionally adds initial items.

func (*SyncSet[T]) Append

func (s *SyncSet[T]) Append(values ...T)

Append adds one or more elements to the set. Duplicates are ignored.

func (*SyncSet[T]) Contains

func (s *SyncSet[T]) Contains(v T) bool

Contains reports whether the set contains the specified element.

func (*SyncSet[T]) Difference

func (s *SyncSet[T]) Difference(s1 Set[T]) Set[T]

Difference returns a new set containing elements in the current set but not in the other.

func (*SyncSet[T]) Intersection

func (s *SyncSet[T]) Intersection(s1 Set[T]) Set[T]

Intersection returns a new set containing elements present in both sets.

func (*SyncSet[T]) Len

func (s *SyncSet[T]) Len() int

Len returns the number of elements in the set.

func (*SyncSet[T]) Remove

func (s *SyncSet[T]) Remove(values ...T) bool

Remove deletes one or more elements from the set. Non-existent elements are ignored.

func (*SyncSet[T]) String

func (s *SyncSet[T]) String() string

String returns the string representation of the set's elements.

func (*SyncSet[T]) SymmetricDifference

func (s *SyncSet[T]) SymmetricDifference(s1 Set[T]) Set[T]

SymmetricDifference returns a new set containing elements present in either of the sets but not both.

func (*SyncSet[T]) Union

func (s *SyncSet[T]) Union(s1 Set[T]) Set[T]

Union returns a new set containing all elements from the current set and another set.

func (*SyncSet[T]) Values

func (s *SyncSet[T]) Values() []T

Values returns a slice containing all elements in the set (unordered).

type WorkQueue added in v0.0.31

type WorkQueue[T any] struct {
	// contains filtered or unexported fields
}

WorkQueue is a simple pub-sub style queue where each topic has exactly one subscriber (worker). A published message goes to exactly one consumer.

Typical use case: task queues, job workers.

T is the type of the message.

func NewWorkQueue added in v0.0.31

func NewWorkQueue[T any]() *WorkQueue[T]

NewWorkQueue creates a new WorkQueue with a given channel buffer size.

func (*WorkQueue[T]) AddTopic added in v0.0.31

func (u *WorkQueue[T]) AddTopic(topic string, bufSize int) error

AddTopic creates a new topic with its own buffered channel. Returns an error if the topic already exists.

func (*WorkQueue[T]) Close added in v0.0.31

func (u *WorkQueue[T]) Close()

Close closes all topic channels. Topics are not removed from the map.

func (*WorkQueue[T]) Publish added in v0.0.31

func (u *WorkQueue[T]) Publish(topic string, msg T) error

Publish sends a message to the topic's channel. If the channel buffer is full, the message is dropped silently. Returns an error if the topic does not exist.

func (*WorkQueue[T]) RemoveTopic added in v0.0.31

func (u *WorkQueue[T]) RemoveTopic(topic string) error

RemoveTopic closes and removes the topic's channel. Messages in the buffer are dropped.

func (*WorkQueue[T]) Subscribe added in v0.0.31

func (u *WorkQueue[T]) Subscribe(topic string) (<-chan T, error)

Subscribe returns the channel for the given topic. Only one subscriber should consume from the channel. Returns an error if the topic does not exist.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL