Documentation
¶
Index ¶
- Constants
- Variables
- func DecryptEd25519(encrypted *EncryptedMessage, secretKey []byte) (string, error)
- func DecryptMessageKey(encryptedKey *EncryptedMessage, recipientEd25519SecretKey []byte) ([]byte, error)
- func GenerateMessageKey() ([]byte, error)
- func IsRetryableError(err error) bool
- func ValidateEd25519KeyPair(publicKey, privateKey []byte) bool
- type BaseEngine
- func (e *BaseEngine) DecryptEventPayload(encryptedPayloadBase64 string, decryptionKey string) (map[string]interface{}, error)
- func (e *BaseEngine) ExecuteOperation(op Operation) error
- func (b *BaseEngine) GetGracefulShutdownTimeout() time.Duration
- func (b *BaseEngine) GetOperationTimeout() time.Duration
- func (b *BaseEngine) GetPingIntervalTimeout() time.Duration
- func (e *BaseEngine) ParseMetadata(metadata map[string]interface{}) map[string]interface{}
- func (e *BaseEngine) PreparePublishData(payload map[string]interface{}, metadata *EventMetadata) (*PublishData, error)
- func (b *BaseEngine) Retry(ctx context.Context, fn func() error, cfg *retryConfig) error
- func (e *BaseEngine) ValidatePublishInput(eventName string, recipients []string) error
- func (e *BaseEngine) ValidateSubscribeInput(eventName string) error
- func (b *BaseEngine) WithRetry(ctx context.Context, fn func() error) error
- type BaseEngineBuilder
- type BaseSubscription
- func (s *BaseSubscription) AddHandler(handler EventHandler) func()
- func (s *BaseSubscription) CallHandlers(event *EventPayload)
- func (s *BaseSubscription) GetHandlers() []EventHandler
- func (s *BaseSubscription) StartWorkerPool() error
- func (s *BaseSubscription) StopWorkerPool()
- func (s *BaseSubscription) SubmitJob(event *EventPayload) error
- type ClientConfig
- type ClientOption
- type ConnectionError
- type ContinueResponse
- type DeferResponse
- type DiscardResponse
- type EnSyncError
- type EncryptedMessage
- type EncryptedSymmetric
- type Engine
- type EventHandler
- type EventMetadata
- type EventPayload
- type HybridEncryptedMessage
- type Logger
- type Operation
- type Option
- func WithCircuitBreaker(failureThreshold int, resetTimeout, maxResetTimeout time.Duration) Option
- func WithDefaultRetryConfig() Option
- func WithLogger(logger Logger) Option
- func WithRetryConfig(maxAttempts int, initialBackoff, maxBackoff time.Duration, jitter float64) Option
- func WithTimeoutOptions(opts ...TimeoutOption) Option
- type PauseResponse
- type PayloadMetadata
- type PublishData
- type PublishOptions
- type SubscribeOptions
- type Subscription
- type SubscriptionManager
- func (sm *SubscriptionManager) CloseAll(ctx context.Context) error
- func (sm *SubscriptionManager) Count() int
- func (sm *SubscriptionManager) Exists(eventName string) bool
- func (sm *SubscriptionManager) Get(eventName string) (Subscription, bool)
- func (sm *SubscriptionManager) GetAll() []Subscription
- func (sm *SubscriptionManager) Range(fn func(key, value interface{}) bool)
- func (sm *SubscriptionManager) Register(eventName string, sub Subscription) error
- func (sm *SubscriptionManager) Unregister(eventName string) error
- type TimeoutOption
Constants ¶
const (
ErrTypeGeneric = "EnSyncGenericError"
ErrTypeAuth = "EnSyncAuthError"
ErrTypeConnection = "EnSyncConnectionError"
ErrTypePublish = "EnSyncPublishError"
ErrTypeSubscription = "EnSyncSubscriptionError"
ErrTypeTimeout = "EnSyncTimeoutError"
ErrTypeReplay = "EnSyncReplayError"
ErrTypeDefer = "EnSyncDeferError"
ErrTypeDiscard = "EnSyncDiscardError"
ErrTypePause = "EnSyncPauseError"
ErrTypeContinue = "EnSyncContinueError"
ErrTypeValidation = "EnSyncValidationError"
ErrTypeCircuitBreaker = "EnSyncCircuitBreakerError"
ErrTypeMaxRetries = "EnSyncMaxRetriesError"
ErrTypeResume = "EnSyncResumeError"
)
const (
GenericMessage = "Verify your EnSync engine is still operating"
GeneralResponse = "Failed to establish a connection with an EnSync engine"
EventNotFound = "Event not found or no longer available"
InvalidDelay = "Delay must be between 1000ms and 24 hours"
NotAuthenticated = "Not authenticated"
RecipientsRequired = "recipients array cannot be empty"
RecipientsMustBeArray = "recipients must be an array"
)
const (
EncryptionTypeHybrid = "hybrid"
)
Variables ¶
var (
ErrNotAuthenticated = errors.New("not authenticated")
ErrNotConnected = errors.New("not connected")
ErrMaxRetriesExceeded = errors.New("max reconnection attempts exceeded")
ErrCircuitBreakerOpen = errors.New("circuit breaker is open")
ErrInvalidRecipients = errors.New("recipients array cannot be empty")
ErrConnectionClosed = errors.New("connection closed")
ErrSubscriptionNotFound = errors.New("subscription not found")
ErrInvalidURL = errors.New("invalid URL")
ErrInvalidAccessKey = errors.New("invalid access key")
)
Functions ¶
func DecryptEd25519 ¶
func DecryptEd25519(encrypted *EncryptedMessage, secretKey []byte) (string, error)
func DecryptMessageKey ¶
func DecryptMessageKey(encryptedKey *EncryptedMessage, recipientEd25519SecretKey []byte) ([]byte, error)
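func GenerateMessageKey ¶
func GenerateMessageKey() ([]byte, error)
func IsRetryableError ¶
func IsRetryableError(err error) bool
func ValidateEd25519KeyPair ¶ added in v0.1.3
func ValidateEd25519KeyPair(publicKey, privateKey []byte) bool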
Types ¶
type BaseEngine ¶
type BaseEngine struct {
AccessKey string
AppSecretKey string
State engineState
ClientID string
Logger Logger
ClientHash string
Ctx context.Context
// contains filtered or unexported fields
}
func NewBaseEngine ¶
func NewBaseEngine(ctx context.Context, opts ...Option) (*BaseEngine, error)
func (*BaseEngine) DecryptEventPayload ¶ added in v0.1.4
func (e *BaseEngine) DecryptEventPayload(encryptedPayloadBase64 string, decryptionKey string) (map[string]interface{}, error)
func (*BaseEngine) ExecuteOperation ¶ added in v0.1.4
func (e *BaseEngine) ExecuteOperation(op Operation) error
func (*BaseEngine) GetGracefulShutdownTimeout ¶ added in v0.1.5
func (b *BaseEngine) GetGracefulShutdownTimeout() time.Duration
func (*BaseEngine) GetOperationTimeout ¶ added in v0.1.5
func (b *BaseEngine) GetOperationTimeout() time.Duration
func (*BaseEngine) GetPingIntervalTimeout ¶ added in v0.2.1
func (b *BaseEngine) GetPingIntervalTimeout() time.Duration
func (*BaseEngine) ParseMetadata ¶ added in v0.1.4
func (e *BaseEngine) ParseMetadata(metadata map[string]interface{}) map[string]interface{}
func (*BaseEngine) PreparePublishData ¶ added in v0.1.4
func (e *BaseEngine) PreparePublishData(payload map[string]interface{}, metadata *EventMetadata) (*PublishData, error)
func (*BaseEngine) Retry ¶
func (b *BaseEngine) Retry(ctx context.Context, fn func() error, cfg *retryConfig) error
func (*BaseEngine) ValidatePublishInput ¶ added in v0.1.4
func (e *BaseEngine) ValidatePublishInput(eventName string, recipients []string) error
func (*BaseEngine) ValidateSubscribeInput ¶ added in v0.1.4
func (e *BaseEngine) ValidateSubscribeInput(eventName string) error
type BaseEngineBuilder ¶
type BaseEngineBuilder struct {
// contains filtered or unexported fields
}
func NewBaseEngineBuilder ¶
func NewBaseEngineBuilder() *BaseEngineBuilder
func (*BaseEngineBuilder) Build ¶
func (b *BaseEngineBuilder) Build(ctx context.Context) (*BaseEngine, error)
func (*BaseEngineBuilder) WithConfigOptions ¶
func (b *BaseEngineBuilder) WithConfigOptions(opts ...Option) *BaseEngineBuilder
type BaseSubscription ¶
type BaseSubscription struct {
EventName string
Handlers []EventHandler
Mu sync.RWMutex
// contains filtered or unexported fields
}
func NewBaseSubscription ¶ added in v0.1.5
func NewBaseSubscription(eventName string, logger Logger) *BaseSubscription
func (*BaseSubscription) AddHandler ¶
func (s *BaseSubscription) AddHandler(handler EventHandler) func()
func (*BaseSubscription) CallHandlers ¶
func (s *BaseSubscription) CallHandlers(event *EventPayload)
func (*BaseSubscription) GetHandlers ¶
func (s *BaseSubscription) GetHandlers() []EventHandler
func (*BaseSubscription) StartWorkerPool ¶ added in v0.2.1
func (s *BaseSubscription) StartWorkerPool() error
func (*BaseSubscription) StopWorkerPool ¶ added in v0.2.1
func (s *BaseSubscription) StopWorkerPool()
func (*BaseSubscription) SubmitJob ¶ added in v0.2.1
func (s *BaseSubscription) SubmitJob(event *EventPayload) error
type ClientConfig ¶
type ClientOption ¶
type ClientOption func(*ClientConfig)
func WithAppSecretKey ¶
func WithAppSecretKey(secretKey string) ClientOption
func WithClientID ¶
func WithClientID(clientID string) ClientOption
type ConnectionError ¶
func (*ConnectionError) Error ¶
func (e *ConnectionError) Error() string
func (*ConnectionError) Unwrap ¶
func (e *ConnectionError) Unwrap() error
type ContinueResponse ¶
type ContinueResponse struct {
Status string `json:"status"`
Action string `json:"action"`
EventName string `json:"eventName"`
}
ContinueResponse represents the response from a continue operation
type DeferResponse ¶
type DeferResponse struct {
Status string `json:"status"`
Action string `json:"action"`
EventID string `json:"eventId"`
DelayMs int64 `json:"delayMs"`
ScheduledDelivery time.Time `json:"scheduledDelivery"`
Timestamp time.Time `json:"timestamp"`
}
DeferResponse represents the response from a defer operation
type DiscardResponse ¶
type DiscardResponse struct {
Status string `json:"status"`
Action string `json:"action"`
EventID string `json:"eventId"`
Timestamp time.Time `json:"timestamp"`
}
DiscardResponse represents the response from a discard operation
type EnSyncError ¶
func NewEnSyncError ¶
func NewEnSyncError(message, errType string, err error) *EnSyncError
func (*EnSyncError) Error ¶
func (e *EnSyncError) Error() string
func (*EnSyncError) Unwrap ¶
func (e *EnSyncError) Unwrap() error
type EncryptedMessage ¶
type EncryptedMessage struct {
Nonce string `json:"nonce"`
Ciphertext string `json:"ciphertext"`
EphemeralPublicKey string `json:"ephemeralPublicKey"`
}
func EncryptEd25519 ¶
func EncryptEd25519(payload string, publicKey []byte) (*EncryptedMessage, error)
type EncryptedSymmetric ¶
type Engine ¶
type Engine interface {
// CreateClient creates and authenticates a new client
CreateClient(accessKey string, options ...ClientOption) error
// Publish publishes an event to the EnSync system
Publish(
eventName string,
recipients []string,
payload map[string]interface{},
metadata *EventMetadata,
options *PublishOptions,
) (string, error)
// Subscribe subscribes to an event
Subscribe(eventName string, options *SubscribeOptions) (Subscription, error)
// Close closes the connection
Close() error
// GetClientPublicKey returns the client's public key
GetClientPublicKey() string
// AnalyzePayload analyzes a payload and returns metadata
AnalyzePayload(payload map[string]interface{}) *PayloadMetadata
// Context returns the engine's context
Context() context.Context
// Logger returns the engine's logger
Logger() Logger
}
Engine is the main interface for EnSync clients
type EventHandler ¶
type EventHandler func(*EventPayload) error
EventHandler is a function that handles incoming events
type EventMetadata ¶
type EventMetadata struct {
Persist bool `json:"persist"`
Headers map[string]string `json:"headers"`
}
EventMetadata represents metadata for an event
type EventPayload ¶
type EventPayload struct {
EventName string `json:"eventName"`
Idem string `json:"idem"`
Block int64 `json:"block"`
Timestamp time.Time `json:"timestamp"`
Payload map[string]interface{} `json:"payload"`
Metadata map[string]interface{} `json:"metadata"`
Sender string `json:"sender"`
}
EventPayload represents a received event
type HybridEncryptedMessage ¶
type HybridEncryptedMessage struct {
Type string `json:"type"`
Payload EncryptedSymmetric `json:"payload"`
Keys map[string]EncryptedMessage `json:"keys"`
}
func HybridEncrypt ¶
func HybridEncrypt(message string, recipientPublicKeys []string) (*HybridEncryptedMessage, error)
type Option ¶
type Option func(*engineConfig)
func WithCircuitBreaker ¶
func WithCircuitBreaker(failureThreshold int, resetTimeout, maxResetTimeout time.Duration) Option
func WithDefaultRetryConfig ¶
func WithDefaultRetryConfig() Option
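func WithLogger ¶
func WithLogger(logger Logger) Option
func WithRetryConfig ¶
func WithRetryConfig(maxAttempts int, initialBackoff, maxBackoff time.Duration, jitter float64) Option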
func WithTimeoutOptions ¶ added in v0.2.1
func WithTimeoutOptions(opts ...TimeoutOption) Option
type PauseResponse ¶
type PauseResponse struct {
Status string `json:"status"`
Action string `json:"action"`
EventName string `json:"eventName"`
Reason string `json:"reason,omitempty"`
}
PauseResponse represents the response from a pause operation
type PayloadMetadata ¶
type PayloadMetadata struct {
ByteSize int `json:"byteSize"`
Skeleton map[string]string `json:"skeleton"`
}
PayloadMetadata represents metadata about a payload
type PublishData ¶ added in v0.1.4
PublishData is an internal struct for publishing events
type PublishOptions ¶
type PublishOptions struct {
UseHybridEncryption bool `json:"useHybridEncryption"`
}
PublishOptions contains options for publishing events
type SubscribeOptions ¶
type SubscribeOptions struct {
AutoAck bool `json:"autoAck"`
AppSecretKey string `json:"appSecretKey"`
}
SubscribeOptions contains options for subscribing to events
type Subscription ¶
type Subscription interface {
// AddHandler registers an event handler for this subscription
AddHandler(handler EventHandler) func()
// Ack acknowledges an event
Ack(eventIdem string, block int64) error
// Resume resumes event processing
Resume() error
// Pause pauses event processing
Pause(reason string) error
// Defer defers an event for later processing
Defer(eventIdem string, delayMs int64, reason string) (*DeferResponse, error)
// Discard permanently discards an event
Discard(eventIdem string, reason string) (*DiscardResponse, error)
// Rollback rolls back an event
Rollback(eventIdem string, block int64) error
// Replay requests a specific event to be sent again
Replay(eventIdem string) (*EventPayload, error)
// Unsubscribe unsubscribes from the event
Unsubscribe() error
}
Subscription represents an active subscription to an event
type SubscriptionManager ¶
type SubscriptionManager struct {
// contains filtered or unexported fields
}
func NewSubscriptionManager ¶ added in v0.2.1
func NewSubscriptionManager(logger Logger) *SubscriptionManager
func (*SubscriptionManager) CloseAll ¶ added in v0.2.1
func (sm *SubscriptionManager) CloseAll(ctx context.Context) error
func (*SubscriptionManager) Count ¶ added in v0.2.1
func (sm *SubscriptionManager) Count() int
func (*SubscriptionManager) Exists ¶
func (sm *SubscriptionManager) Exists(eventName string) bool
func (*SubscriptionManager) Get ¶ added in v0.2.1
func (sm *SubscriptionManager) Get(eventName string) (Subscription, bool)
func (*SubscriptionManager) GetAll ¶ added in v0.2.1
func (sm *SubscriptionManager) GetAll() []Subscription
func (*SubscriptionManager) Range ¶
func (sm *SubscriptionManager) Range(fn func(key, value interface{}) bool)
func (*SubscriptionManager) Register ¶ added in v0.2.1
func (sm *SubscriptionManager) Register(eventName string, sub Subscription) error
func (*SubscriptionManager) Unregister ¶ added in v0.2.1
func (sm *SubscriptionManager) Unregister(eventName string) error
type TimeoutOption ¶ added in v0.2.1
type TimeoutOption func(*timeoutConfig)
func WithGracefulShutdownTimeout ¶ added in v0.2.1
func WithGracefulShutdownTimeout(d time.Duration) TimeoutOption
func WithOperationTimeout ¶ added in v0.1.4
func WithOperationTimeout(d time.Duration) TimeoutOption
func WithPingInterval ¶ added in v0.2.1
func WithPingInterval(d time.Duration) TimeoutOption
func WithPingTimeout ¶ added in v0.2.1
func WithPingTimeout(d time.Duration) TimeoutOption