diff --git a/core/data/data.go b/core/data/data.go index c68cf6f..1585260 100644 --- a/core/data/data.go +++ b/core/data/data.go @@ -4,15 +4,14 @@ package data import "time" type Envelope struct { - Source string - Key string - Timestamp time.Time - Sequence uint64 - Payload []byte + Identifier string + Timestamp time.Time + Sequence uint64 + Payload []byte } type Source interface { - Start() error + Start(config any) error Stop() Name() string @@ -23,19 +22,12 @@ type Source interface { GetSubscriptions() []string } -type Sink interface { - Start() error - Stop() - - Publish(envelope Envelope) error -} - type Processor interface { - Start(context ProcessorContext) error + Start(config any, send chan<- Envelope, receive <-chan Envelope) error Stop() } -type ProcessorContext interface { - Send(inPort string, messsage Envelope) error - Receive(outPort string) Envelope +type Sink interface { + Start(config any, receive <-chan Envelope) error + Stop() } diff --git a/core/data/errors.go b/core/data/errors.go new file mode 100644 index 0000000..a136936 --- /dev/null +++ b/core/data/errors.go @@ -0,0 +1,10 @@ +package data + +import "errors" + +var ( + ErrUnknownKey = errors.New("unknown key") + ErrAlreadySubscribed = errors.New("already subscribed") + ErrNotSubscribed = errors.New("not subscribed") + ErrDroppedMessage = errors.New("dropped message: buffer full") +) diff --git a/core/data/registry/processor.go b/core/data/registry/processor.go new file mode 100644 index 0000000..e1d3011 --- /dev/null +++ b/core/data/registry/processor.go @@ -0,0 +1,34 @@ +// Package registry ... +package registry + +import ( + "fmt" + "sync" + + "gitlab.michelsen.id/phillmichelsen/tessera/core/data" +) + +type ProcessorFactory func(string) (data.Processor, error) + +type Processors struct { + mu sync.RWMutex + m map[string]ProcessorFactory +} + +func NewProcessors() *Processors { return &Processors{m: make(map[string]ProcessorFactory)} } + +func (r *Processors) Register(name string, f ProcessorFactory) { + r.mu.Lock() + defer r.mu.Unlock() + r.m[name] = f +} + +func (r *Processors) New(name string, params string) (data.Processor, error) { + r.mu.RLock() + f, ok := r.m[name] + r.mu.RUnlock() + if !ok { + return nil, fmt.Errorf("unknown processor: %s", name) + } + return f(params) +} diff --git a/core/data/registry/sinks.go b/core/data/registry/sinks.go new file mode 100644 index 0000000..3d00064 --- /dev/null +++ b/core/data/registry/sinks.go @@ -0,0 +1,33 @@ +package registry + +import ( + "fmt" + "sync" + + "gitlab.michelsen.id/phillmichelsen/tessera/core/data" +) + +type SinkFactory func(string) (data.Sink, error) + +type Sinks struct { + mu sync.RWMutex + m map[string]SinkFactory +} + +func NewSinks() *Sinks { return &Sinks{m: make(map[string]SinkFactory)} } + +func (r *Sinks) Register(name string, f SinkFactory) { + r.mu.Lock() + defer r.mu.Unlock() + r.m[name] = f +} + +func (r *Sinks) New(name string, params string) (data.Sink, error) { + r.mu.RLock() + f, ok := r.m[name] + r.mu.RUnlock() + if !ok { + return nil, fmt.Errorf("unknown sink: %s", name) + } + return f(params) +} diff --git a/core/data/registry/sources.go b/core/data/registry/sources.go new file mode 100644 index 0000000..174a458 --- /dev/null +++ b/core/data/registry/sources.go @@ -0,0 +1,33 @@ +package registry + +import ( + "fmt" + "sync" + + "gitlab.michelsen.id/phillmichelsen/tessera/core/data" +) + +type SourceFactory func(string) (data.Source, error) + +type Sources struct { + mu sync.RWMutex + m map[string]SourceFactory +} + +func NewSources() *Sources { return &Sources{m: make(map[string]SourceFactory)} } + +func (r *Sources) Register(name string, f SourceFactory) { + r.mu.Lock() + defer r.mu.Unlock() + r.m[name] = f +} + +func (r *Sources) New(name string, params string) (data.Source, error) { + r.mu.RLock() + f, ok := r.m[name] + r.mu.RUnlock() + if !ok { + return nil, fmt.Errorf("unknown source: %s", name) + } + return f(params) +} diff --git a/core/data/router/router.go b/core/data/router/router.go new file mode 100644 index 0000000..e69de29 diff --git a/core/data/wire.go b/core/data/wire.go new file mode 100644 index 0000000..0ad59c2 --- /dev/null +++ b/core/data/wire.go @@ -0,0 +1 @@ +package data