wheeltimer

package module
v0.0.0-...-f6cbd97 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2025 License: Apache-2.0 Imports: 12 Imported by: 0

README

wheeltimer

A simple wheel timer implemented based on Netty.

Documentation

Overview

Copyright 2014 Workiva, LLC

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const (
	// DefaultMaxPendingTimeouts is the default maximum number of pending timeouts.
	DefaultMaxPendingTimeouts = 512
	DefaultRingBufferSize     = 1024
)

Variables

View Source
var (
	// ErrDisposed is returned when an operation is performed on a disposed
	// queue.
	ErrDisposed = errors.New(`queue: disposed`)

	// ErrTimeout is returned when an applicable queue operation times out.
	ErrTimeout = timeoutError{}

	// ErrEmpty is returned when queue is empty
	ErrEmpty = errors.New(`queue: empty`)
)

Functions

This section is empty.

Types

type BlockingWaitStrategy

type BlockingWaitStrategy struct {
	// contains filtered or unexported fields
}

BlockingWaitStrategy is a strategy that uses a sync.Cond for waiting on a sequence to be available.

func (*BlockingWaitStrategy) SignalAll

func (s *BlockingWaitStrategy) SignalAll()

func (*BlockingWaitStrategy) WaitFor

func (s *BlockingWaitStrategy) WaitFor(timeout time.Duration) error

type DataTimerTask

type DataTimerTask[T any] struct {
	// contains filtered or unexported fields
}

func (DataTimerTask[T]) Run

func (f DataTimerTask[T]) Run(timeout Timeout) error

type Executor

type Executor interface {
	Execute(task func())
}

type PanicHandler

type PanicHandler func(interface{})

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

RingBuffer is a MPMC buffer that achieves threadsafety with CAS operations only. A put on full or get on empty call will block until an item is put or retrieved. Calling Dispose on the RingBuffer will unblock any blocked threads with an error. This buffer is similar to the buffer described here: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue with some minor additions.

func NewRingBuffer

func NewRingBuffer(size uint64, opts ...RingOption) *RingBuffer

NewRingBuffer will allocate, initialize, and return a ring buffer with the specified size.

func (*RingBuffer) Cap

func (rb *RingBuffer) Cap() uint64

Cap returns the capacity of this ring buffer.

func (*RingBuffer) Dispose

func (rb *RingBuffer) Dispose()

Dispose will dispose of this queue and free any blocked threads in the Put and/or Get methods. Calling those methods on a disposed queue will return an error.

func (*RingBuffer) Get

func (rb *RingBuffer) Get() (interface{}, error)

Get will return the next item in the queue. This call will block if the queue is empty. This call will unblock when an item is added to the queue or Dispose is called on the queue. An error will be returned if the queue is disposed.

func (*RingBuffer) IsDisposed

func (rb *RingBuffer) IsDisposed() bool

IsDisposed will return a bool indicating if this queue has been disposed.

func (*RingBuffer) Len

func (rb *RingBuffer) Len() uint64

Len returns the number of items in the queue.

func (*RingBuffer) Offer

func (rb *RingBuffer) Offer(item interface{}) (bool, error)

Offer adds the provided item to the queue if there is space. If the queue is full, this call will return false. An error will be returned if the queue is disposed.

func (*RingBuffer) Poll

func (rb *RingBuffer) Poll(timeout time.Duration) (interface{}, error)

Poll will return the next item in the queue. This call will block if the queue is empty. This call will unblock when an item is added to the queue, Dispose is called on the queue, or the timeout is reached. An error will be returned if the queue is disposed or a timeout occurs. A non-positive timeout will block indefinitely.

func (*RingBuffer) PollNonBlocking

func (rb *RingBuffer) PollNonBlocking(timeout time.Duration) (interface{}, error)

PollNonBlocking will return the next item in the queue. This call will unblock when an item is added to the queue, Dispose is called on the queue, queue is empty, or the timeout is reached. An error will be returned if the queue is disposed, a timeout occurs or queue is empty.

func (*RingBuffer) Put

func (rb *RingBuffer) Put(item interface{}) error

Put adds the provided item to the queue. If the queue is full, this call will block until an item is added to the queue or Dispose is called on the queue. An error will be returned if the queue is disposed.

type RingOption

type RingOption func(r *ringOption)

func WithWaitStrategy

func WithWaitStrategy(strategy WaitStrategy) RingOption

WithWaitStrategy sets the wait strategy for the ring buffer.

type Timeout

type Timeout interface {
	// Timer is Returns the Timer that created this handle.
	Timer() Timer

	// Task is Returns the TimerTask which is associated with this handle.
	Task() TimerTask

	// IsExpired is Returns true if and only if the TimerTask associated with this handle has been expired.
	IsExpired() bool

	// IsCancelled is Returns true if and only if the TimerTask associated with this handle has been cancelled.
	IsCancelled() bool

	// Cancel is Attempts to cancel the TimerTask associated with this handle.
	// If the task has been executed or cancelled already, it will return with no side effect.
	Cancel() bool
}

type Timer

type Timer interface {
	// NewTimeout is Schedules the specified TimerTask for one-time execution after the specified delay.
	NewTimeout(task TimerTask, delay time.Duration) (Timeout, error)

	// Stop is Releases all resources acquired by this Timer and cancels all
	// tasks which were scheduled but not executed yet.
	Stop() []Timeout
}

type TimerTask

type TimerTask interface {
	// Run is Executed after the delay specified with newTimeout.
	Run(timeout Timeout) error
}

func NewDataTimerTask

func NewDataTimerTask[T any](data T, f func(Timeout, T) error) TimerTask

NewDataTimerTask is a factory function that creates a new DataTimerTask.

type TimerTaskFunc

type TimerTaskFunc func(Timeout) error

TimerTaskFunc is a function type that implements TimerTask.

func (TimerTaskFunc) Run

func (f TimerTaskFunc) Run(timeout Timeout) error

type WaitStrategy

type WaitStrategy interface {
	WaitFor(timeout time.Duration) error
	SignalAll()
}

WaitStrategy is a strategy for waiting on a sequence to be available.

func NewBlockingWaitStrategy

func NewBlockingWaitStrategy() WaitStrategy

NewBlockingWaitStrategy creates a new blocking wait strategy.

func NewSleepingWaitStrategy

func NewSleepingWaitStrategy(sleepTime time.Duration) WaitStrategy

NewSleepingWaitStrategy creates a new sleeping wait strategy.

func NewYieldingWaitStrategy

func NewYieldingWaitStrategy() WaitStrategy

NewYieldingWaitStrategy creates a new yielding wait strategy.

type WheelBucket

type WheelBucket struct {
	// contains filtered or unexported fields
}

type WheelTimeout

type WheelTimeout struct {
	// contains filtered or unexported fields
}

func (*WheelTimeout) Cancel

func (timeout *WheelTimeout) Cancel() bool

func (*WheelTimeout) Expired

func (timeout *WheelTimeout) Expired()

func (*WheelTimeout) IsCancelled

func (timeout *WheelTimeout) IsCancelled() bool

func (*WheelTimeout) IsExpired

func (timeout *WheelTimeout) IsExpired() bool

func (*WheelTimeout) State

func (timeout *WheelTimeout) State() timeoutState

func (*WheelTimeout) String

func (timeout *WheelTimeout) String() string

func (*WheelTimeout) Task

func (timeout *WheelTimeout) Task() TimerTask

func (*WheelTimeout) Timer

func (timeout *WheelTimeout) Timer() Timer

type WheelTimer

type WheelTimer struct {
	// contains filtered or unexported fields
}

func NewWheelTimer

func NewWheelTimer(tickDuration time.Duration, ticksPerWheel uint32, opts ...WheelTimerOption) (*WheelTimer, error)

func (*WheelTimer) NewTimeout

func (tw *WheelTimer) NewTimeout(task TimerTask, delay time.Duration) (Timeout, error)

func (*WheelTimer) PendingTimeouts

func (tw *WheelTimer) PendingTimeouts() int64

func (*WheelTimer) Start

func (tw *WheelTimer) Start() error

func (*WheelTimer) State

func (tw *WheelTimer) State() workerState

func (*WheelTimer) Stop

func (tw *WheelTimer) Stop() []Timeout

type WheelTimerOption

type WheelTimerOption func(*option)

func WithExecutor

func WithExecutor(executor Executor) WheelTimerOption

func WithLogger

func WithLogger(logger *slog.Logger) WheelTimerOption

func WithMaxPendingTimeouts

func WithMaxPendingTimeouts(maxPendingTimeouts int64) WheelTimerOption

func WithPanicHandler

func WithPanicHandler(handler PanicHandler) WheelTimerOption

func WithRingBufferOptions

func WithRingBufferOptions(opts ...RingOption) WheelTimerOption

func WithRingBufferSize

func WithRingBufferSize(size uint64) WheelTimerOption

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL