common

package
v0.2.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 19, 2025 License: ISC Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// Error type identifiers. Each constant is the string name of one EnSync error
// category.
// NOTE(review): presumably these are the values passed as errType to
// NewEnSyncError and stored in EnSyncError.Type — confirm against the callers.
const (
	ErrTypeGeneric        = "EnSyncGenericError"
	ErrTypeAuth           = "EnSyncAuthError"
	ErrTypeConnection     = "EnSyncConnectionError"
	ErrTypePublish        = "EnSyncPublishError"
	ErrTypeSubscription   = "EnSyncSubscriptionError"
	ErrTypeTimeout        = "EnSyncTimeoutError"
	ErrTypeReplay         = "EnSyncReplayError"
	ErrTypeDefer          = "EnSyncDeferError"
	ErrTypeDiscard        = "EnSyncDiscardError"
	ErrTypePause          = "EnSyncPauseError"
	ErrTypeContinue       = "EnSyncContinueError"
	ErrTypeValidation     = "EnSyncValidationError"
	ErrTypeCircuitBreaker = "EnSyncCircuitBreakerError"
	ErrTypeMaxRetries     = "EnSyncMaxRetriesError"
	ErrTypeResume         = "EnSyncResumeError"
)
View Source
// Human-readable message strings.
// NOTE(review): the call sites are not visible here; presumably these are used
// as EnSyncError messages — confirm.
// RecipientsRequired has the same text as the ErrInvalidRecipients sentinel, and
// NotAuthenticated differs from the ErrNotAuthenticated text only in
// capitalization.
const (
	GenericMessage        = "Verify your EnSync engine is still operating"
	GeneralResponse       = "Failed to establish a connection with an EnSync engine"
	EventNotFound         = "Event not found or no longer available"
	InvalidDelay          = "Delay must be between 1000ms and 24 hours"
	NotAuthenticated      = "Not authenticated"
	RecipientsRequired    = "recipients array cannot be empty"
	RecipientsMustBeArray = "recipients must be an array"
)
View Source
// EncryptionTypeHybrid is the string "hybrid".
// NOTE(review): presumably the value written to HybridEncryptedMessage.Type —
// confirm against HybridEncrypt.
const (
	EncryptionTypeHybrid = "hybrid"
)

Variables

View Source
// Sentinel errors. Each is a distinct value created with errors.New, so callers
// can test for a specific condition with errors.Is(err, ErrXxx), including
// through wrapped errors.
var (
	ErrNotAuthenticated     = errors.New("not authenticated")
	ErrNotConnected         = errors.New("not connected")
	ErrMaxRetriesExceeded   = errors.New("max reconnection attempts exceeded")
	ErrCircuitBreakerOpen   = errors.New("circuit breaker is open")
	ErrInvalidRecipients    = errors.New("recipients array cannot be empty")
	ErrConnectionClosed     = errors.New("connection closed")
	ErrSubscriptionNotFound = errors.New("subscription not found")
	ErrInvalidURL           = errors.New("invalid URL")
	ErrInvalidAccessKey     = errors.New("invalid access key")
)

Functions

func DecryptEd25519

func DecryptEd25519(encrypted *EncryptedMessage, secretKey []byte) (string, error)

func DecryptMessageKey

func DecryptMessageKey(encryptedKey *EncryptedMessage, recipientEd25519SecretKey []byte) ([]byte, error)

func GenerateMessageKey

func GenerateMessageKey() ([]byte, error)

func IsRetryableError

func IsRetryableError(err error) bool

func ValidateEd25519KeyPair added in v0.1.3

func ValidateEd25519KeyPair(publicKey, privateKey []byte) bool

Types

type BaseEngine

// BaseEngine holds the exported state of an engine: credentials, client
// identity, a logger and a context. Instances are created with NewBaseEngine
// or BaseEngineBuilder.Build.
// NOTE(review): presumably this is the shared base embedded by concrete Engine
// implementations — confirm.
type BaseEngine struct {
	AccessKey    string
	AppSecretKey string
	// State has the unexported type engineState, so code outside this package
	// cannot name its type.
	State        engineState
	ClientID     string
	Logger       Logger
	ClientHash   string
	// NOTE(review): the context package documentation advises against storing a
	// Context in a struct; the field is part of the exported API, so it is left
	// as-is.
	Ctx          context.Context
	// contains filtered or unexported fields
}

func NewBaseEngine

func NewBaseEngine(ctx context.Context, opts ...Option) (*BaseEngine, error)

func (*BaseEngine) DecryptEventPayload added in v0.1.4

func (e *BaseEngine) DecryptEventPayload(
	encryptedPayloadBase64 string,
	decryptionKey string,
) (map[string]interface{}, error)

func (*BaseEngine) ExecuteOperation added in v0.1.4

func (e *BaseEngine) ExecuteOperation(op Operation) error

func (*BaseEngine) GetGracefulShutdownTimeout added in v0.1.5

func (b *BaseEngine) GetGracefulShutdownTimeout() time.Duration

func (*BaseEngine) GetOperationTimeout added in v0.1.5

func (b *BaseEngine) GetOperationTimeout() time.Duration

func (*BaseEngine) GetPingIntervalTimeout added in v0.2.1

func (b *BaseEngine) GetPingIntervalTimeout() time.Duration

func (*BaseEngine) ParseMetadata added in v0.1.4

func (e *BaseEngine) ParseMetadata(
	metadata map[string]interface{},
) map[string]interface{}

func (*BaseEngine) PreparePublishData added in v0.1.4

func (e *BaseEngine) PreparePublishData(
	payload map[string]interface{},
	metadata *EventMetadata,
) (*PublishData, error)

func (*BaseEngine) Retry

func (b *BaseEngine) Retry(ctx context.Context, fn func() error, cfg *retryConfig) error

func (*BaseEngine) ValidatePublishInput added in v0.1.4

func (e *BaseEngine) ValidatePublishInput(eventName string, recipients []string) error

func (*BaseEngine) ValidateSubscribeInput added in v0.1.4

func (e *BaseEngine) ValidateSubscribeInput(eventName string) error

func (*BaseEngine) WithRetry

func (b *BaseEngine) WithRetry(ctx context.Context, fn func() error) error

type BaseEngineBuilder

// BaseEngineBuilder builds a *BaseEngine. It has no exported fields: obtain one
// from NewBaseEngineBuilder, add options with WithConfigOptions, and call Build.
type BaseEngineBuilder struct {
	// contains filtered or unexported fields
}

func NewBaseEngineBuilder

func NewBaseEngineBuilder() *BaseEngineBuilder

func (*BaseEngineBuilder) Build

func (b *BaseEngineBuilder) Build(ctx context.Context) (*BaseEngine, error)

func (*BaseEngineBuilder) WithConfigOptions

func (b *BaseEngineBuilder) WithConfigOptions(opts ...Option) *BaseEngineBuilder

type BaseSubscription

// BaseSubscription holds the event name and registered handlers of a
// subscription. Create one with NewBaseSubscription.
// Because it contains a sync.RWMutex, a BaseSubscription must not be copied
// after first use; pass it by pointer.
type BaseSubscription struct {
	EventName string
	Handlers  []EventHandler
	// NOTE(review): presumably guards Handlers — confirm against AddHandler,
	// GetHandlers and CallHandlers.
	Mu        sync.RWMutex
	// contains filtered or unexported fields
}

func NewBaseSubscription added in v0.1.5

func NewBaseSubscription(eventName string, logger Logger) *BaseSubscription

func (*BaseSubscription) AddHandler

func (s *BaseSubscription) AddHandler(handler EventHandler) func()

func (*BaseSubscription) CallHandlers

func (s *BaseSubscription) CallHandlers(event *EventPayload)

func (*BaseSubscription) GetHandlers

func (s *BaseSubscription) GetHandlers() []EventHandler

func (*BaseSubscription) StartWorkerPool added in v0.2.1

func (s *BaseSubscription) StartWorkerPool() error

func (*BaseSubscription) StopWorkerPool added in v0.2.1

func (s *BaseSubscription) StopWorkerPool()

func (*BaseSubscription) SubmitJob added in v0.2.1

func (s *BaseSubscription) SubmitJob(event *EventPayload) error

type ClientConfig

// ClientConfig is the configuration value that ClientOption functions receive
// and modify.
// NOTE(review): presumably AppSecretKey is set by WithAppSecretKey and ClientID
// by WithClientID — confirm.
type ClientConfig struct {
	AppSecretKey string
	ClientID     string
}

type ClientOption

// ClientOption is a functional option that modifies a *ClientConfig. Values are
// returned by WithAppSecretKey and WithClientID and accepted by
// Engine.CreateClient.
type ClientOption func(*ClientConfig)

func WithAppSecretKey

func WithAppSecretKey(secretKey string) ClientOption

func WithClientID

func WithClientID(clientID string) ClientOption

type ConnectionError

// ConnectionError is an error type that pairs an address with an underlying
// error. *ConnectionError has Error and Unwrap methods, so it can be extracted
// from an error chain with errors.As.
// NOTE(review): Unwrap presumably returns Err — confirm.
type ConnectionError struct {
	Addr string
	Err  error
}

func (*ConnectionError) Error

func (e *ConnectionError) Error() string

func (*ConnectionError) Unwrap

func (e *ConnectionError) Unwrap() error

type ContinueResponse

// ContinueResponse represents the response from a continue operation.
// Its fields map to the JSON keys "status", "action" and "eventName".
type ContinueResponse struct {
	Status    string `json:"status"`
	Action    string `json:"action"`
	EventName string `json:"eventName"`
}

ContinueResponse represents the response from a continue operation

type DeferResponse

// DeferResponse represents the response from a defer operation. It is the
// result type of Subscription.Defer.
type DeferResponse struct {
	Status            string    `json:"status"`
	Action            string    `json:"action"`
	EventID           string    `json:"eventId"`
	// NOTE(review): presumably the applied delay in milliseconds, matching the
	// delayMs argument of Subscription.Defer — confirm.
	DelayMs           int64     `json:"delayMs"`
	ScheduledDelivery time.Time `json:"scheduledDelivery"`
	Timestamp         time.Time `json:"timestamp"`
}

DeferResponse represents the response from a defer operation

type DiscardResponse

// DiscardResponse represents the response from a discard operation. It is the
// result type of Subscription.Discard.
type DiscardResponse struct {
	Status    string    `json:"status"`
	Action    string    `json:"action"`
	EventID   string    `json:"eventId"`
	Timestamp time.Time `json:"timestamp"`
}

DiscardResponse represents the response from a discard operation

type EnSyncError

// EnSyncError is an error type carrying a message, a type string and an
// optional underlying error. Construct one with NewEnSyncError. *EnSyncError
// has Error and Unwrap methods, so it can be extracted with errors.As.
type EnSyncError struct {
	Message string
	// NOTE(review): presumably one of the ErrType* constants — confirm.
	Type    string
	// NOTE(review): presumably the wrapped cause returned by Unwrap — confirm.
	Err     error
}

func NewEnSyncError

func NewEnSyncError(message, errType string, err error) *EnSyncError

func (*EnSyncError) Error

func (e *EnSyncError) Error() string

func (*EnSyncError) Unwrap

func (e *EnSyncError) Unwrap() error

type EncryptedMessage

// EncryptedMessage is the result type of EncryptEd25519 and the input type of
// DecryptEd25519 and DecryptMessageKey. It is also the value type of
// HybridEncryptedMessage.Keys.
// All three fields are strings; their text encoding is not visible from this
// declaration.
type EncryptedMessage struct {
	Nonce              string `json:"nonce"`
	Ciphertext         string `json:"ciphertext"`
	EphemeralPublicKey string `json:"ephemeralPublicKey"`
}

func EncryptEd25519

func EncryptEd25519(payload string, publicKey []byte) (*EncryptedMessage, error)

type EncryptedSymmetric

// EncryptedSymmetric is a nonce/ciphertext pair. It is the type of
// HybridEncryptedMessage.Payload.
// NOTE(review): presumably the message body encrypted under the key from
// GenerateMessageKey — confirm against HybridEncrypt.
type EncryptedSymmetric struct {
	Nonce      string `json:"nonce"`
	Ciphertext string `json:"ciphertext"`
}

type Engine

// Engine is the main interface for EnSync clients. It covers client creation,
// publishing, subscribing, shutdown, and access to the engine's context and
// logger.
type Engine interface {
	// CreateClient creates and authenticates a new client
	CreateClient(accessKey string, options ...ClientOption) error

	// Publish publishes an event to the EnSync system
	// NOTE(review): the meaning of the returned string is not visible here
	// (presumably an event identifier) — confirm.
	Publish(
		eventName string,
		recipients []string,
		payload map[string]interface{},
		metadata *EventMetadata,
		options *PublishOptions,
	) (string, error)

	// Subscribe subscribes to an event
	Subscribe(eventName string, options *SubscribeOptions) (Subscription, error)

	// Close closes the connection
	Close() error

	// GetClientPublicKey returns the client's public key
	GetClientPublicKey() string

	// AnalyzePayload analyzes a payload and returns metadata
	AnalyzePayload(payload map[string]interface{}) *PayloadMetadata

	// Context returns the engine's context
	Context() context.Context

	// Logger returns the engine's logger
	Logger() Logger
}

Engine is the main interface for EnSync clients

type EventHandler

// EventHandler is a function that handles incoming events. It receives the
// event as *EventPayload and reports failure through its error result.
type EventHandler func(*EventPayload) error

EventHandler is a function that handles incoming events

type EventMetadata

// EventMetadata represents metadata for an event. It is passed to
// Engine.Publish and BaseEngine.PreparePublishData.
type EventMetadata struct {
	Persist bool              `json:"persist"`
	Headers map[string]string `json:"headers"`
}

EventMetadata represents metadata for an event

type EventPayload

// EventPayload represents a received event. It is the argument passed to each
// EventHandler and the result type of Subscription.Replay.
type EventPayload struct {
	EventName string                 `json:"eventName"`
	// NOTE(review): Idem and Block presumably correspond to the eventIdem and
	// block arguments of Subscription.Ack and Subscription.Rollback — confirm.
	Idem      string                 `json:"idem"`
	Block     int64                  `json:"block"`
	Timestamp time.Time              `json:"timestamp"`
	Payload   map[string]interface{} `json:"payload"`
	Metadata  map[string]interface{} `json:"metadata"`
	Sender    string                 `json:"sender"`
}

EventPayload represents a received event

type HybridEncryptedMessage

// HybridEncryptedMessage is the result type of HybridEncrypt. It combines one
// symmetric payload with a map of EncryptedMessage values.
type HybridEncryptedMessage struct {
	// NOTE(review): presumably set to EncryptionTypeHybrid — confirm.
	Type    string                      `json:"type"`
	Payload EncryptedSymmetric          `json:"payload"`
	// NOTE(review): presumably keyed by recipient public key, each value being
	// the message key encrypted for that recipient — confirm.
	Keys    map[string]EncryptedMessage `json:"keys"`
}

func HybridEncrypt

func HybridEncrypt(message string, recipientPublicKeys []string) (*HybridEncryptedMessage, error)

type Logger

// Logger is the logging interface used by this package. It has one method per
// level (Debug, Info, Warn, Error), each taking a message and a variadic list
// of additional values.
// NOTE(review): keysAndValues is presumably an alternating key, value, key,
// value... list — confirm against the implementations.
type Logger interface {
	Debug(msg string, keysAndValues ...interface{})
	Info(msg string, keysAndValues ...interface{})
	Warn(msg string, keysAndValues ...interface{})
	Error(msg string, keysAndValues ...interface{})
}

type Operation added in v0.1.4

// Operation is a named unit of work: a Name and an Execute function that takes
// a context and returns an error. It is the argument type of
// BaseEngine.ExecuteOperation.
type Operation struct {
	Name    string
	Execute func(context.Context) error
}

type Option

// Option is a functional option for engine construction. It is accepted by
// NewBaseEngine and BaseEngineBuilder.WithConfigOptions. Its parameter type,
// engineConfig, is unexported, so values come from this package's constructors:
// WithCircuitBreaker, WithDefaultRetryConfig, WithLogger, WithRetryConfig and
// WithTimeoutOptions.
type Option func(*engineConfig)

func WithCircuitBreaker

func WithCircuitBreaker(failureThreshold int, resetTimeout, maxResetTimeout time.Duration) Option

func WithDefaultRetryConfig

func WithDefaultRetryConfig() Option

func WithLogger

func WithLogger(logger Logger) Option

func WithRetryConfig

func WithRetryConfig(maxAttempts int, initialBackoff, maxBackoff time.Duration, jitter float64) Option

func WithTimeoutOptions added in v0.2.1

func WithTimeoutOptions(opts ...TimeoutOption) Option

type PauseResponse

// PauseResponse represents the response from a pause operation.
// Reason is tagged omitempty, so it is left out of the JSON encoding when
// empty.
type PauseResponse struct {
	Status    string `json:"status"`
	Action    string `json:"action"`
	EventName string `json:"eventName"`
	Reason    string `json:"reason,omitempty"`
}

PauseResponse represents the response from a pause operation

type PayloadMetadata

// PayloadMetadata represents metadata about a payload. It is the result type
// of Engine.AnalyzePayload.
type PayloadMetadata struct {
	ByteSize int               `json:"byteSize"`
	// NOTE(review): presumably maps each payload field name to a type
	// description — confirm against the AnalyzePayload implementations.
	Skeleton map[string]string `json:"skeleton"`
}

PayloadMetadata represents metadata about a payload

type PublishData added in v0.1.4

// PublishData is an internal struct for publishing events. It is the result
// type of BaseEngine.PreparePublishData and holds three byte slices.
// NOTE(review): presumably the JSON encodings of the payload, the event
// metadata and the payload metadata — confirm.
type PublishData struct {
	PayloadJSON     []byte
	MetadataJSON    []byte
	PayloadMetaJSON []byte
}

PublishData is an internal struct for publishing events

type PublishOptions

// PublishOptions contains options for publishing events. It is passed to
// Engine.Publish.
type PublishOptions struct {
	// NOTE(review): presumably selects HybridEncrypt over per-recipient
	// encryption — confirm against the Publish implementations.
	UseHybridEncryption bool `json:"useHybridEncryption"`
}

PublishOptions contains options for publishing events

type SubscribeOptions

// SubscribeOptions contains options for subscribing to events. It is passed to
// Engine.Subscribe.
type SubscribeOptions struct {
	// NOTE(review): presumably acknowledges events automatically in place of an
	// explicit Subscription.Ack call — confirm.
	AutoAck      bool   `json:"autoAck"`
	// NOTE(review): tagged for JSON as "appSecretKey"; confirm that this secret
	// is never serialized to logs or sent over the wire unintentionally.
	AppSecretKey string `json:"appSecretKey"`
}

SubscribeOptions contains options for subscribing to events

type Subscription

// Subscription represents an active subscription to an event. It is the result
// type of Engine.Subscribe and the value type stored in SubscriptionManager.
type Subscription interface {
	// AddHandler registers an event handler for this subscription
	// NOTE(review): the returned func() presumably removes the handler again —
	// confirm.
	AddHandler(handler EventHandler) func()

	// Ack acknowledges an event
	Ack(eventIdem string, block int64) error

	// Resume resumes event processing
	Resume() error

	// Pause pauses event processing
	Pause(reason string) error

	// Defer defers an event for later processing
	// NOTE(review): the InvalidDelay message suggests delayMs must lie between
	// 1000ms and 24 hours — confirm where that is enforced.
	Defer(eventIdem string, delayMs int64, reason string) (*DeferResponse, error)

	// Discard permanently discards an event
	Discard(eventIdem string, reason string) (*DiscardResponse, error)

	// Rollback rolls back an event
	Rollback(eventIdem string, block int64) error

	// Replay requests a specific event to be sent again
	Replay(eventIdem string) (*EventPayload, error)

	// Unsubscribe unsubscribes from the event
	Unsubscribe() error
}

Subscription represents an active subscription to an event

type SubscriptionManager

// SubscriptionManager is a registry of Subscription values looked up by event
// name (see Register, Get, Exists and Unregister). It has no exported fields;
// create one with NewSubscriptionManager.
type SubscriptionManager struct {
	// contains filtered or unexported fields
}

func NewSubscriptionManager added in v0.2.1

func NewSubscriptionManager(logger Logger) *SubscriptionManager

func (*SubscriptionManager) CloseAll added in v0.2.1

func (sm *SubscriptionManager) CloseAll(ctx context.Context) error

func (*SubscriptionManager) Count added in v0.2.1

func (sm *SubscriptionManager) Count() int

func (*SubscriptionManager) Exists

func (sm *SubscriptionManager) Exists(eventName string) bool

func (*SubscriptionManager) Get added in v0.2.1

func (sm *SubscriptionManager) Get(eventName string) (Subscription, bool)

func (*SubscriptionManager) GetAll added in v0.2.1

func (sm *SubscriptionManager) GetAll() []Subscription

func (*SubscriptionManager) Range

func (sm *SubscriptionManager) Range(fn func(key, value interface{}) bool)

func (*SubscriptionManager) Register added in v0.2.1

func (sm *SubscriptionManager) Register(eventName string, sub Subscription) error

func (*SubscriptionManager) Unregister added in v0.2.1

func (sm *SubscriptionManager) Unregister(eventName string) error

type TimeoutOption added in v0.2.1

// TimeoutOption is a functional option for timeout settings. Its parameter
// type, timeoutConfig, is unexported, so values come from
// WithGracefulShutdownTimeout, WithOperationTimeout, WithPingInterval and
// WithPingTimeout. Pass them to WithTimeoutOptions to obtain an Option.
type TimeoutOption func(*timeoutConfig)

func WithGracefulShutdownTimeout added in v0.2.1

func WithGracefulShutdownTimeout(d time.Duration) TimeoutOption

func WithOperationTimeout added in v0.1.4

func WithOperationTimeout(d time.Duration) TimeoutOption

func WithPingInterval added in v0.2.1

func WithPingInterval(d time.Duration) TimeoutOption

func WithPingTimeout added in v0.2.1

func WithPingTimeout(d time.Duration) TimeoutOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL