Documentation
¶
Index ¶
- Constants
- Variables
- func NewConnPool[T io.Closer](factory func() (T, error), residence int, opts ...ConnPoolOption[T]) *connPool[T]
- func NewCounter() *counter
- func NewMutexCounter() *mutexCounter
- type ArrayStack
- type BroadcastBus
- func (u *BroadcastBus[T]) AddTopic(topic string, bufSize int) error
- func (u *BroadcastBus[T]) Close() error
- func (u *BroadcastBus[T]) Publish(topic string, msg T) error
- func (u *BroadcastBus[T]) RemoveTopic(topic string) error
- func (u *BroadcastBus[T]) Subscribe(topic string) (*Subscription[T], error)
- func (u *BroadcastBus[T]) Unsubscribe(topic string, sub *Subscription[T]) error
- type Cache
- type ConnPoolOption
- type Counter
- type FastHub
- type HashSet
- func (s HashSet[T]) Append(items ...T)
- func (s HashSet[T]) Contains(value T) (exist bool)
- func (s HashSet[T]) Difference(s1 Set[T]) Set[T]
- func (s HashSet[T]) Intersection(s1 Set[T]) Set[T]
- func (s HashSet[T]) Len() int
- func (s HashSet[T]) Remove(values ...T) bool
- func (s HashSet[T]) String() string
- func (s HashSet[T]) SymmetricDifference(s1 Set[T]) Set[T]
- func (s HashSet[T]) Union(s1 Set[T]) Set[T]
- func (s HashSet[T]) Values() (values []T)
- type Hub
- type LinkStack
- type Map
- type MutexMap
- func (m *MutexMap[K, V]) CompareAndDelete(key K, old V) bool
- func (m *MutexMap[K, V]) CompareAndSwap(key K, old, new V) bool
- func (m *MutexMap[K, V]) CompareFnAndDelete(key K, fn func(V, V) bool, old V) bool
- func (m *MutexMap[K, V]) CompareFnAndSwap(key K, fn func(V, V) bool, old, new V) bool
- func (m *MutexMap[K, V]) Delete(key K) bool
- func (m *MutexMap[K, V]) Filter(filter func(K, V) bool) Map[K, V]
- func (m *MutexMap[K, V]) Keys() []K
- func (m *MutexMap[K, V]) Len() int
- func (m *MutexMap[K, V]) Load(key K) (value V, ok bool)
- func (m *MutexMap[K, V]) LoadAndDelete(key K) (value V, loaded bool)
- func (m *MutexMap[K, V]) LoadOrStore(key K, new V) (actual V, loaded bool)
- func (m *MutexMap[K, V]) Range(iterator func(key K, value V) bool)
- func (m *MutexMap[K, V]) Store(key K, value V)
- func (m *MutexMap[K, V]) Swap(key K, value V) (pre V, loaded bool)
- func (m *MutexMap[K, V]) Values() []V
- type OrderHub
- type Pool
- type RingBuffer
- type Set
- type Stack
- type Subscription
- type SyncMap
- func (m *SyncMap[K, V]) CompareAndDelete(key K, old V) bool
- func (m *SyncMap[K, V]) CompareAndSwap(key K, old, new V) bool
- func (m *SyncMap[K, V]) CompareFnAndDelete(key K, fn func(V, V) bool, old V) bool
- func (m *SyncMap[K, V]) CompareFnAndSwap(key K, fn func(V, V) bool, old, new V) bool
- func (m *SyncMap[K, V]) Delete(key K) bool
- func (m *SyncMap[K, V]) Filter(filter func(K, V) bool) Map[K, V]
- func (m *SyncMap[K, V]) Keys() []K
- func (m *SyncMap[K, V]) Load(key K) (value V, ok bool)
- func (m *SyncMap[K, V]) LoadAndDelete(key K) (value V, loaded bool)
- func (m *SyncMap[K, V]) LoadOrStore(key K, new V) (actual V, loaded bool)
- func (m *SyncMap[K, V]) Range(iterator func(key K, value V) bool)
- func (m *SyncMap[K, V]) Store(key K, value V)
- func (m *SyncMap[K, V]) Swap(key K, value V) (pre V, loaded bool)
- func (m *SyncMap[K, V]) Values() []V
- type SyncSet
- func (s *SyncSet[T]) Append(values ...T)
- func (s *SyncSet[T]) Contains(v T) bool
- func (s *SyncSet[T]) Difference(s1 Set[T]) Set[T]
- func (s *SyncSet[T]) Intersection(s1 Set[T]) Set[T]
- func (s *SyncSet[T]) Len() int
- func (s *SyncSet[T]) Remove(values ...T) bool
- func (s *SyncSet[T]) String() string
- func (s *SyncSet[T]) SymmetricDifference(s1 Set[T]) Set[T]
- func (s *SyncSet[T]) Union(s1 Set[T]) Set[T]
- func (s *SyncSet[T]) Values() []T
- type WorkQueue
Constants ¶
const DefaultRingSize = 1024
Variables ¶
var (
	// ErrTopicNotFound indicates the given topic does not exist.
	ErrTopicNotFound = errors.New("topic not found")
	// ErrTopicAlreadyExists indicates the topic already exists.
	ErrTopicAlreadyExists = errors.New("topic already exists")
	// ErrTopicQueueFull indicates the topic queue is full.
	ErrTopicQueueFull = errors.New("topic queue is full")
)
var ErrConnPoolClosed = fmt.Errorf("conn pool is closed")
var (
ErrHubClosed = errors.New("hub is closed")
)
ErrHubClosed is returned when operations are attempted on a closed Hub.
Functions ¶
func NewConnPool ¶ added in v0.0.35
func NewConnPool[T io.Closer]( factory func() (T, error), residence int, opts ...ConnPoolOption[T], ) *connPool[T]
NewConnPool returns a new connection pool.
Parameters:
- factory: Required function that creates new connections
- residence: Maximum number of connections to keep in the pool (capacity)
- opts: Optional configuration options
Returns:
- *connPool[T]: A new connection pool instance
Panics:
- If factory is nil
func NewCounter ¶ added in v0.1.1
func NewCounter() *counter
func NewMutexCounter ¶ added in v0.1.1
func NewMutexCounter() *mutexCounter
Types ¶
type ArrayStack ¶
type ArrayStack[T any] struct { // contains filtered or unexported fields }
ArrayStack is a stack implementation using a slice as the underlying storage. It provides amortized O(1) time complexity for push and pop operations.
Type parameters:
- T: The element type stored in the stack
func NewArrayStack ¶
func NewArrayStack[T any]() *ArrayStack[T]
NewArrayStack creates a new empty ArrayStack.
func (*ArrayStack[T]) Empty ¶
func (m *ArrayStack[T]) Empty() bool
Empty returns true if the stack contains no elements.
func (*ArrayStack[T]) Peek ¶
func (m *ArrayStack[T]) Peek() (value T, exist bool)
Peek returns the top element without removing it from the stack. Returns the element and a boolean indicating if the stack was not empty.
func (*ArrayStack[T]) Pop ¶
func (m *ArrayStack[T]) Pop() (value T, exist bool)
Pop removes and returns the top element from the stack. Returns the element and a boolean indicating if the stack was not empty.
func (*ArrayStack[T]) Push ¶
func (m *ArrayStack[T]) Push(value T)
Push adds an element to the top of the stack.
func (*ArrayStack[T]) Size ¶
func (m *ArrayStack[T]) Size() int
Size returns the number of elements in the stack.
func (*ArrayStack[T]) String ¶
func (m *ArrayStack[T]) String() string
String returns a string representation of the stack with elements from top to bottom.
type BroadcastBus ¶ added in v0.0.31
type BroadcastBus[T any] struct { // contains filtered or unexported fields }
BroadcastBus is a multi-topic, multi-subscriber pub-sub bus.
Each topic is a FastHub (fan-out):
- When you publish a message to a topic, all subscribers receive a copy.
- If a topic has no subscribers, messages are discarded.
Typical use case: event broadcast, notifications, state updates.
func NewBroadcastBus ¶ added in v0.0.31
func NewBroadcastBus[T any]() *BroadcastBus[T]
NewBroadcastBus creates a new BroadcastBus. It takes no buffer-size argument; buffer sizes are specified per topic through the bufSize parameter of AddTopic.
func (*BroadcastBus[T]) AddTopic ¶ added in v0.0.32
func (u *BroadcastBus[T]) AddTopic(topic string, bufSize int) error
AddTopic creates a new topic with its own buffered channel. Returns an error if the topic already exists.
func (*BroadcastBus[T]) Close ¶ added in v0.0.31
func (u *BroadcastBus[T]) Close() error
Close closes all topics and marks the bus as closed. Any further operations on the bus will return an error.
func (*BroadcastBus[T]) Publish ¶ added in v0.0.31
func (u *BroadcastBus[T]) Publish(topic string, msg T) error
Publish broadcasts a message to all subscribers of the given topic.
If the topic has no subscribers, returns ErrTopicNotFound. If the bus is closed, returns an error.
func (*BroadcastBus[T]) RemoveTopic ¶ added in v0.0.32
func (u *BroadcastBus[T]) RemoveTopic(topic string) error
RemoveTopic closes and removes the topic's channel. Messages in the buffer are dropped.
func (*BroadcastBus[T]) Subscribe ¶ added in v0.0.31
func (u *BroadcastBus[T]) Subscribe(topic string) (*Subscription[T], error)
Subscribe subscribes to the given topic, creating the topic if needed.
Returns a new Subscription which has a buffered channel for receiving messages.
If the bus is closed, returns an error.
func (*BroadcastBus[T]) Unsubscribe ¶ added in v0.0.31
func (u *BroadcastBus[T]) Unsubscribe(topic string, sub *Subscription[T]) error
Unsubscribe removes a subscriber from the given topic. If no subscribers remain for that topic, the topic is closed and removed.
Returns an error if the bus is closed or the topic does not exist.
type Cache ¶
type Cache[T any] struct { // contains filtered or unexported fields }
Cache is a thread-safe in-memory cache with optional expiration and cleanup. It stores key-value pairs with configurable expiration times and automatic cleanup of expired entries.
Type parameters:
- T: The type of values stored in the cache
func NewCache ¶
NewCache creates a new cache with the specified expiration and cleanup intervals.
Parameters:
- expire: Duration after which entries expire. Use 0 for no expiration.
- cleanup: Interval for automatic cleanup of expired entries. Use 0 for no automatic cleanup.
Returns:
- *Cache[T]: A new cache instance
Example:
// Create a cache with 1-minute expiration and 5-minute cleanup interval
cache := NewCache[string](time.Minute, 5*time.Minute)
func (*Cache[T]) Delete ¶
Delete removes an entry from the cache.
Parameters:
- key: The key to delete
func (*Cache[T]) Get ¶
Get retrieves a value from the cache by key.
Parameters:
- key: The key to look up
Returns:
- value: The retrieved value (zero value if not found)
- loaded: True if the key was found and not expired
func (*Cache[T]) Release ¶ added in v0.0.32
Release stops the cleanup goroutine and releases resources. Call this method when the cache is no longer needed to prevent goroutine leaks.
Returns:
- error: Always returns nil
func (*Cache[T]) Set ¶
Set stores a value in the cache with the default expiration time.
Parameters:
- key: The key to store the value under
- value: The value to store
func (*Cache[T]) SetWithExpire ¶
SetWithExpire stores a value in the cache with a custom expiration time.
Parameters:
- key: The key to store the value under
- value: The value to store
- expire: Custom expiration duration for this entry
type ConnPoolOption ¶ added in v0.0.35
ConnPoolOption defines optional configuration for the connection pool.
type Counter ¶ added in v0.1.1
type Counter interface {
// Diff calculates the difference between the new value and the previous value.
// It updates the counter with the new value and returns the difference.
//
// Parameters:
// - val: The new value to update the counter with
//
// Returns:
// - The difference between the new value and the previous value
// - Returns 0 if this is the first update
Diff(float64) float64
// Rate calculates the rate of change per second.
// Equivalent to RateIn(val, time.Second).
//
// Parameters:
// - val: The new value to update the counter with
//
// Returns:
// - The rate of change per second
// - Returns 0 if this is the first update or time elapsed is 0
Rate(float64) float64
// RateIn calculates the rate of change per specified time interval.
// Formula: (new_value - old_value) / (time_elapsed / interval)
//
// Parameters:
// - val: The new value to update the counter with
// - interval: The time interval to normalize the rate to (e.g., time.Second for per-second rate)
//
// Returns:
// - The rate of change per specified interval
// - Returns 0 if this is the first update, interval <= 0, or time elapsed is 0
RateIn(float64, time.Duration) float64
}
type FastHub ¶ added in v0.0.31
type FastHub[T any] struct { // contains filtered or unexported fields }
FastHub is a simple in-memory publish-subscribe hub. It broadcasts each published message to all active subscribers. FastHub does not guarantee message ordering or reliable delivery. If a subscriber's channel buffer is full, the message is dropped.
func NewFastHub ¶ added in v0.0.31
NewFastHub creates a new FastHub with the given buffer size for each subscriber.
func (*FastHub[T]) Close ¶ added in v0.0.31
Close closes the bus and all active subscriber channels. After closing, Publish and Subscribe will return ErrHubClosed.
func (*FastHub) Increment ¶ added in v0.0.31
func (g *FastHub) Increment() uint64
Increment atomically increments and returns a new unique ID.
func (*FastHub[T]) Publish ¶ added in v0.0.31
Publish broadcasts the given value to all active subscribers. If a subscriber's channel is full, the message is dropped. If the bus is closed, it returns ErrHubClosed.
func (*FastHub) SetPublishCallback ¶ added in v0.0.39
func (h *FastHub) SetPublishCallback(f func(id uint64, v T))
func (*FastHub[T]) Subscribe ¶ added in v0.0.31
func (b *FastHub[T]) Subscribe() (*Subscription[T], error)
Subscribe registers a new subscriber and returns its Subscription. If the bus is closed, it returns ErrHubClosed.
func (*FastHub[T]) Unsubscribe ¶ added in v0.0.31
func (b *FastHub[T]) Unsubscribe(s *Subscription[T])
Unsubscribe removes a subscriber from the bus and closes its channel.
type HashSet ¶
type HashSet[T comparable] map[T]struct{}
HashSet is a simple set implementation using Go's built-in map. It provides O(1) average time complexity for basic operations.
Type parameters:
- T: The element type, must be comparable
func NewHashSet ¶
func NewHashSet[T comparable](items ...T) HashSet[T]
NewHashSet creates a new HashSet with optional initial elements.
Parameters:
- items: Optional initial elements to add to the set
Returns:
- HashSet[T]: A new HashSet instance
func (HashSet[T]) Append ¶
func (s HashSet[T]) Append(items ...T)
Append adds one or more elements to the set. Duplicate elements are ignored.
func (HashSet[T]) Difference ¶
Difference returns a new set containing elements in this set but not in the other.
func (HashSet[T]) Intersection ¶
Intersection returns a new set containing elements present in both sets.
func (HashSet[T]) Remove ¶
Remove removes one or more elements from the set. Non-existent elements are ignored.
func (HashSet[T]) SymmetricDifference ¶
SymmetricDifference returns a new set containing elements present in either set but not both.
type Hub ¶ added in v0.0.31
type Hub[T any] interface {
	// Subscribe registers a new subscriber and returns its Subscription.
	// If the Hub is closed, it returns ErrHubClosed.
	Subscribe() (*Subscription[T], error)

	// Publish sends the given value to all active subscribers.
	// If the Hub is closed, it returns ErrHubClosed.
	Publish(v T) error

	// Unsubscribe removes the given Subscription from the Hub.
	// After unsubscription, the subscriber will no longer receive messages.
	Unsubscribe(*Subscription[T])

	// SetPublishCallback sets a custom publish callback function.
	// callback is called asynchronously when a message cannot be delivered to a subscriber due to a full buffer.
	// The callback must be non-blocking and must NOT call back into Hub.
	SetPublishCallback(callback func(id uint64, v T))

	// Close closes the Hub and all active subscriptions.
	// After closing, Subscribe and Publish will return ErrHubClosed.
	Close() error
}
Hub defines a simple publish-subscribe broadcast center. All subscribers receive every published message. It does not guarantee message ordering or delivery reliability.
type LinkStack ¶
type LinkStack[T any] struct { // contains filtered or unexported fields }
LinkStack is a stack implementation using a linked list as the underlying storage. It provides O(1) time complexity for push and pop operations.
Type parameters:
- T: The element type stored in the stack
func NewLinkStack ¶
NewLinkStack creates a new empty LinkStack.
func (*LinkStack[T]) Peek ¶
Peek returns the top element without removing it from the stack. Returns the element and a boolean indicating if the stack was not empty.
func (*LinkStack[T]) Pop ¶
Pop removes and returns the top element from the stack. Returns the element and a boolean indicating if the stack was not empty.
type Map ¶
type Map[K comparable, V any] interface {
	// Store stores a value for a key.
	Store(K, V)

	// Load returns the value stored for a key, or false if no value is present.
	Load(K) (V, bool)

	// Delete deletes the value for a key.
	Delete(K) bool

	// Swap swaps the value for a key and returns the previous value if any.
	// The loaded result reports whether the key was present.
	Swap(key K, newValue V) (old V, loaded bool)

	// Range calls iterator sequentially for each key and value present in the map.
	// If iterator returns false, range stops the iteration.
	Range(iterator func(K, V) bool)

	// LoadOrStore returns the existing value for the key if present.
	// Otherwise, it stores and returns the given value.
	// The loaded result is true if the value was loaded, false if stored.
	LoadOrStore(key K, newValue V) (value V, loaded bool)

	// LoadAndDelete deletes the value for a key, returning the previous value if any.
	// The loaded result reports whether the key was present.
	LoadAndDelete(K) (value V, loaded bool)

	// CompareAndSwap swaps the value for a key if the current value equals old.
	// Returns true if the swap was performed.
	CompareAndSwap(key K, old, newValue V) bool

	// CompareAndDelete deletes the entry for a key if its value equals old.
	// Returns true if the entry was deleted.
	CompareAndDelete(key K, value V) bool

	// CompareFnAndSwap swaps the value for a key using a custom comparison function.
	// The function fn is called with the current value and old value.
	// Returns true if fn returns true and the swap was performed.
	CompareFnAndSwap(key K, fn func(V, V) bool, old, newValue V) bool

	// CompareFnAndDelete deletes the entry for a key using a custom comparison function.
	// The function fn is called with the current value and old value.
	// Returns true if fn returns true and the entry was deleted.
	CompareFnAndDelete(key K, fn func(V, V) bool, old V) bool

	// Keys returns a slice containing all keys in the map.
	Keys() []K

	// Values returns a slice containing all values in the map.
	Values() []V

	// Filter returns a new Map containing only entries that satisfy the filter function.
	Filter(filter func(K, V) bool) Map[K, V]
}
Map is a generic interface for thread-safe key-value storage. It provides operations similar to Go's sync.Map but with a more comprehensive API.
Type parameters:
- K: The key type, must be comparable
- V: The value type
Implementations:
- SyncMap: Wraps Go's sync.Map with type safety
- MutexMap: Uses a mutex-protected map for simpler concurrency control
type MutexMap ¶
type MutexMap[K comparable, V any] struct { // contains filtered or unexported fields }
MutexMap is a thread-safe map implementation using a mutex for synchronization. It provides a simpler alternative to SyncMap with mutex-based concurrency control.
func NewMutexMap ¶
func NewMutexMap[K comparable, V any]() *MutexMap[K, V]
NewMutexMap creates a new MutexMap instance.
func (*MutexMap[K, V]) CompareAndDelete ¶
CompareAndDelete deletes the entry for a key if its value equals old. Returns true if the entry was deleted.
func (*MutexMap[K, V]) CompareAndSwap ¶
CompareAndSwap swaps the value for a key if the current value equals old. Returns true if the swap was performed.
func (*MutexMap[K, V]) CompareFnAndDelete ¶
CompareFnAndDelete deletes the entry for a key using a custom comparison function. The function fn is called with the current value and old value. Returns true if fn returns true and the entry was deleted.
func (*MutexMap[K, V]) CompareFnAndSwap ¶
CompareFnAndSwap swaps the value for a key using a custom comparison function. The function fn is called with the current value and old value. Returns true if fn returns true and the swap was performed.
func (*MutexMap[K, V]) Filter ¶ added in v0.0.32
Filter returns a new Map containing only entries that satisfy the filter function.
func (*MutexMap[K, V]) Keys ¶ added in v0.0.32
func (m *MutexMap[K, V]) Keys() []K
Keys returns a slice containing all keys in the map.
func (*MutexMap[K, V]) Load ¶
Load returns the value stored for a key, or false if no value is present.
func (*MutexMap[K, V]) LoadAndDelete ¶
LoadAndDelete deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.
func (*MutexMap[K, V]) LoadOrStore ¶
LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.
func (*MutexMap[K, V]) Range ¶
Range calls iterator sequentially for each key and value present in the map. If iterator returns false, range stops the iteration.
func (*MutexMap[K, V]) Store ¶
func (m *MutexMap[K, V]) Store(key K, value V)
Store stores a value for a key.
type OrderHub ¶ added in v0.0.31
type OrderHub[T any] struct { // contains filtered or unexported fields }
OrderHub is a simple in-memory publish-subscribe hub that delivers messages to subscribers in the order they subscribed. Unlike FastHub, OrderHub maintains a slice to preserve subscription order.
func NewOrderHub ¶ added in v0.0.31
NewOrderHub creates a new OrderHub with the given buffer size for each subscriber.
func (*OrderHub[T]) Close ¶ added in v0.0.31
Close closes the bus and all subscriber channels. After closing, Subscribe and Publish will return ErrHubClosed.
func (*OrderHub) Increment ¶ added in v0.0.39
func (g *OrderHub) Increment() uint64
Increment atomically increments and returns a new unique ID.
func (*OrderHub[T]) Publish ¶ added in v0.0.31
Publish sends the given value to all subscribers in subscription order. If a subscriber's channel is full, the message is dropped for that subscriber. If the bus is closed, it returns ErrHubClosed.
func (*OrderHub) SetPublishCallback ¶ added in v0.0.39
func (h *OrderHub) SetPublishCallback(f func(id uint64, v T))
func (*OrderHub[T]) Subscribe ¶ added in v0.0.31
func (b *OrderHub[T]) Subscribe() (*Subscription[T], error)
Subscribe registers a new subscriber and returns its Subscription. Messages will be delivered in the order subscribers were added. If the bus is closed, it returns ErrHubClosed.
func (*OrderHub[T]) Unsubscribe ¶ added in v0.0.31
func (b *OrderHub[T]) Unsubscribe(s *Subscription[T])
Unsubscribe removes a subscriber from the bus and closes its channel. After removal, the subscriber will no longer receive messages.
type Pool ¶
type Pool[K, V any] struct {
	// New is a function that creates a new object when the pool doesn't have
	// an available instance for the given key.
	New poolNewFunc[K, V]

	// Identifier converts a key to a string identifier used for internal storage.
	// This allows the pool to use SyncMap for thread-safe operations.
	Identifier poolIDFunc[K]

	// Destroy is called when an object is removed from the pool (borrow count reaches zero).
	// Use this to clean up resources (e.g., close connections, free memory).
	Destroy poolDestroyFunc[V]
	// contains filtered or unexported fields
}
Pool is a generic, thread-safe object pool that manages reusable resources. It maintains a collection of objects that can be borrowed and returned, reducing the overhead of creating new objects repeatedly.
Type parameters:
- K: The key type used to identify objects in the pool
- V: The value type of objects stored in the pool
The pool tracks borrow counts for each object and automatically removes objects when they are no longer in use (borrow count reaches zero).
func (*Pool[K, V]) Get ¶
Get retrieves a value from the pool for the given key. If an object exists in the pool, its borrow count is incremented and the object is returned. If no object exists, a new one is created using the New function. This method uses context.Background() internally.
Parameters:
- ctx: Context for cancellation/timeout (not used in this method, see GetWithCtx)
- key: The key identifying the object to retrieve
Returns:
- value: The retrieved or newly created object
- err: Error if object creation fails
func (*Pool[K, V]) GetWithCtx ¶
GetWithCtx retrieves a value from the pool for the given key with context support. If an object exists in the pool, its borrow count is incremented and the object is returned. If no object exists, a new one is created using the New function with the provided context.
Parameters:
- ctx: Context for cancellation/timeout during object creation
- key: The key identifying the object to retrieve
Returns:
- value: The retrieved or newly created object
- err: Error if object creation fails or context is cancelled
func (*Pool[K, V]) Put ¶
Put returns a borrowed object to the pool, decrementing its borrow count. When the borrow count reaches zero, the object is removed from the pool and the Destroy function is called to clean up resources. This method uses context.Background() internally.
Parameters:
- key: The key identifying the object to return
Returns:
- err: Error if the Destroy function fails
func (*Pool[K, V]) PutWithCtx ¶
PutWithCtx returns a borrowed object to the pool with context support. When the borrow count reaches zero, the object is removed from the pool and the Destroy function is called with the provided context.
Parameters:
- ctx: Context for cancellation/timeout during cleanup
- key: The key identifying the object to return
Returns:
- err: Error if the Destroy function fails or context is cancelled
type RingBuffer ¶ added in v0.0.31
type RingBuffer[T any] struct { // contains filtered or unexported fields }
RingBuffer is a ring (circular) buffer. It is not safe for concurrent use.
func NewRingBuffer ¶ added in v0.0.31
func NewRingBuffer[T any]() *RingBuffer[T]
NewRingBuffer creates a new RingBuffer with default size (1024).
func NewRingBufferWithSize ¶ added in v0.0.31
func NewRingBufferWithSize[T any](size int) *RingBuffer[T]
NewRingBufferWithSize creates a new RingBuffer with the specified size. If size <= 0, DefaultRingSize (1024) is used.
func (*RingBuffer[T]) Cap ¶ added in v0.0.31
func (r *RingBuffer[T]) Cap() int
Cap returns the capacity of the ring buffer.
func (*RingBuffer[T]) Iterator ¶ added in v0.0.31
func (r *RingBuffer[T]) Iterator() iter.Seq[T]
Iterator returns an iterator that yields all values in the ring buffer in chronological order (oldest to newest).
func (*RingBuffer[T]) Len ¶ added in v0.0.31
func (r *RingBuffer[T]) Len() int
Len returns the number of elements currently stored in the ring buffer.
func (*RingBuffer[T]) Push ¶ added in v0.0.31
func (r *RingBuffer[T]) Push(values ...T)
Push adds one or more values to the ring buffer. When the buffer is full, older values are overwritten (circular behavior).
func (*RingBuffer[T]) Values ¶ added in v0.0.31
func (r *RingBuffer[T]) Values() (result []T)
Values returns a slice containing all values in the ring buffer in chronological order (oldest to newest).
type Set ¶
type Set[T comparable] interface {
	fmt.Stringer

	// Len returns the number of elements in the set.
	Len() int

	// Append adds one or more elements to the set.
	// Duplicate elements are ignored.
	Append(...T)

	// Remove removes one or more elements from the set.
	// Non-existent elements are ignored.
	Remove(...T) bool

	// Contains checks if an element exists in the set.
	Contains(T) bool

	// Values returns a slice containing all elements in the set.
	Values() []T

	// Union returns a new set containing all elements from both sets.
	Union(Set[T]) Set[T]

	// Intersection returns a new set containing elements present in both sets.
	Intersection(Set[T]) Set[T]

	// Difference returns a new set containing elements in this set but not in the other.
	Difference(Set[T]) Set[T]

	// SymmetricDifference returns a new set containing elements present in either set but not both.
	SymmetricDifference(Set[T]) Set[T]
}
Set is a generic interface for collections of unique elements. It provides standard set operations like union, intersection, difference, etc.
Type parameters:
- T: The element type, must be comparable
type Stack ¶
type Stack[T any] interface {
	// Push adds an element to the top of the stack.
	Push(T)

	// Pop removes and returns the top element from the stack.
	// Returns the element and a boolean indicating if the stack was not empty.
	Pop() (T, bool)

	// Peek returns the top element without removing it from the stack.
	// Returns the element and a boolean indicating if the stack was not empty.
	Peek() (T, bool)

	// Empty returns true if the stack contains no elements.
	Empty() bool

	// Size returns the number of elements in the stack.
	Size() int
}
Stack is a generic interface for Last-In-First-Out (LIFO) data structures. It provides basic stack operations like push, pop, and peek.
Type parameters:
- T: The element type stored in the stack
type Subscription ¶ added in v0.0.31
type Subscription[T any] struct {
	// ID is the unique identifier for this subscriber.
	ID uint64
	// contains filtered or unexported fields
}
Subscription represents a single subscriber to a Hub. It holds the unique subscription ID and the receive-only channel that delivers published messages.
func (*Subscription[T]) Channel ¶ added in v0.0.39
func (s *Subscription[T]) Channel() <-chan T
Channel returns the receive-only channel for this subscription. Use this channel to receive messages published to the Hub.
type SyncMap ¶
type SyncMap[K comparable, V any] struct { // contains filtered or unexported fields }
SyncMap is a generic, type-safe wrapper around sync.Map.
func NewSyncMap ¶
func NewSyncMap[K comparable, V any]() *SyncMap[K, V]
NewSyncMap creates a new SyncMap instance.
func (*SyncMap[K, V]) CompareAndDelete ¶
CompareAndDelete deletes the entry for a key if its value equals old. Returns true if the entry was deleted.
func (*SyncMap[K, V]) CompareAndSwap ¶
CompareAndSwap swaps the value for a key if the current value equals old. Returns true if the swap was performed.
func (*SyncMap[K, V]) CompareFnAndDelete ¶
CompareFnAndDelete deletes the entry for a key using a custom comparison function. The function fn is called with the current value and old value. Returns true if fn returns true and the entry was deleted.
func (*SyncMap[K, V]) CompareFnAndSwap ¶
CompareFnAndSwap swaps the value for a key using a custom comparison function. The function fn is called with the current value and old value. Returns true if fn returns true and the swap was performed.
func (*SyncMap[K, V]) Filter ¶ added in v0.0.32
Filter returns a new Map containing only entries that satisfy the filter function.
func (*SyncMap[K, V]) Keys ¶ added in v0.0.32
func (m *SyncMap[K, V]) Keys() []K
Keys returns a slice containing all keys in the map.
func (*SyncMap[K, V]) Load ¶
Load returns the value stored for a key, or false if no value is present.
func (*SyncMap[K, V]) LoadAndDelete ¶
LoadAndDelete deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.
func (*SyncMap[K, V]) LoadOrStore ¶
LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.
func (*SyncMap[K, V]) Range ¶
Range calls iterator sequentially for each key and value present in the map. If iterator returns false, range stops the iteration.
func (*SyncMap[K, V]) Store ¶
func (m *SyncMap[K, V]) Store(key K, value V)
Store stores a value for a key.
type SyncSet ¶
type SyncSet[T comparable] struct { // contains filtered or unexported fields }
SyncSet is a thread-safe set implementation for comparable types.
func NewSyncSet ¶
func NewSyncSet[T comparable](items ...T) *SyncSet[T]
NewSyncSet creates a new SyncSet and optionally adds initial items.
func (*SyncSet[T]) Append ¶
func (s *SyncSet[T]) Append(values ...T)
Append adds one or more elements to the set. Duplicates are ignored.
func (*SyncSet[T]) Difference ¶
Difference returns a new set containing elements in the current set but not in the other.
func (*SyncSet[T]) Intersection ¶
Intersection returns a new set containing elements present in both sets.
func (*SyncSet[T]) Remove ¶
Remove deletes one or more elements from the set. Non-existent elements are ignored.
func (*SyncSet[T]) SymmetricDifference ¶
SymmetricDifference returns a new set containing elements present in either of the sets but not both.
type WorkQueue ¶ added in v0.0.31
type WorkQueue[T any] struct { // contains filtered or unexported fields }
WorkQueue is a simple pub-sub style queue where each topic has exactly one subscriber (worker). A published message goes to exactly one consumer.
Typical use case: task queues, job workers.
T is the type of the message.
func NewWorkQueue ¶ added in v0.0.31
NewWorkQueue creates a new WorkQueue with a given channel buffer size.
func (*WorkQueue[T]) AddTopic ¶ added in v0.0.31
AddTopic creates a new topic with its own buffered channel. Returns an error if the topic already exists.
func (*WorkQueue[T]) Close ¶ added in v0.0.31
func (u *WorkQueue[T]) Close()
Close closes all topic channels. Topics are not removed from the map.
func (*WorkQueue[T]) Publish ¶ added in v0.0.31
Publish sends a message to the topic's channel. If the channel buffer is full, the message is dropped silently. Returns an error if the topic does not exist.
func (*WorkQueue[T]) RemoveTopic ¶ added in v0.0.31
RemoveTopic closes and removes the topic's channel. Messages in the buffer are dropped.