Begun data module
This commit is contained in:
@@ -4,15 +4,14 @@ package data
|
|||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
type Envelope struct {
|
type Envelope struct {
|
||||||
Source string
|
Identifier string
|
||||||
Key string
|
Timestamp time.Time
|
||||||
Timestamp time.Time
|
Sequence uint64
|
||||||
Sequence uint64
|
Payload []byte
|
||||||
Payload []byte
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Source interface {
|
type Source interface {
|
||||||
Start() error
|
Start(config any) error
|
||||||
Stop()
|
Stop()
|
||||||
Name() string
|
Name() string
|
||||||
|
|
||||||
@@ -23,19 +22,12 @@ type Source interface {
|
|||||||
GetSubscriptions() []string
|
GetSubscriptions() []string
|
||||||
}
|
}
|
||||||
|
|
||||||
type Sink interface {
|
|
||||||
Start() error
|
|
||||||
Stop()
|
|
||||||
|
|
||||||
Publish(envelope Envelope) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type Processor interface {
|
type Processor interface {
|
||||||
Start(context ProcessorContext) error
|
Start(config any, send chan<- Envelope, receive <-chan Envelope) error
|
||||||
Stop()
|
Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
type ProcessorContext interface {
|
type Sink interface {
|
||||||
Send(inPort string, messsage Envelope) error
|
Start(config any, receive <-chan Envelope) error
|
||||||
Receive(outPort string) Envelope
|
Stop()
|
||||||
}
|
}
|
||||||
|
|||||||
10
core/data/errors.go
Normal file
10
core/data/errors.go
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
package data
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrUnknownKey = errors.New("unknown key")
|
||||||
|
ErrAlreadySubscribed = errors.New("already subscribed")
|
||||||
|
ErrNotSubscribed = errors.New("not subscribed")
|
||||||
|
ErrDroppedMessage = errors.New("dropped message: buffer full")
|
||||||
|
)
|
||||||
34
core/data/registry/processor.go
Normal file
34
core/data/registry/processor.go
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
// Package registry ...
|
||||||
|
package registry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"gitlab.michelsen.id/phillmichelsen/tessera/core/data"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ProcessorFactory func(string) (data.Processor, error)
|
||||||
|
|
||||||
|
type Processors struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
m map[string]ProcessorFactory
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProcessors() *Processors { return &Processors{m: make(map[string]ProcessorFactory)} }
|
||||||
|
|
||||||
|
func (r *Processors) Register(name string, f ProcessorFactory) {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
r.m[name] = f
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Processors) New(name string, params string) (data.Processor, error) {
|
||||||
|
r.mu.RLock()
|
||||||
|
f, ok := r.m[name]
|
||||||
|
r.mu.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unknown processor: %s", name)
|
||||||
|
}
|
||||||
|
return f(params)
|
||||||
|
}
|
||||||
33
core/data/registry/sinks.go
Normal file
33
core/data/registry/sinks.go
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package registry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"gitlab.michelsen.id/phillmichelsen/tessera/core/data"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SinkFactory func(string) (data.Sink, error)
|
||||||
|
|
||||||
|
type Sinks struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
m map[string]SinkFactory
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSinks() *Sinks { return &Sinks{m: make(map[string]SinkFactory)} }
|
||||||
|
|
||||||
|
func (r *Sinks) Register(name string, f SinkFactory) {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
r.m[name] = f
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Sinks) New(name string, params string) (data.Sink, error) {
|
||||||
|
r.mu.RLock()
|
||||||
|
f, ok := r.m[name]
|
||||||
|
r.mu.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unknown sink: %s", name)
|
||||||
|
}
|
||||||
|
return f(params)
|
||||||
|
}
|
||||||
33
core/data/registry/sources.go
Normal file
33
core/data/registry/sources.go
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package registry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"gitlab.michelsen.id/phillmichelsen/tessera/core/data"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SourceFactory func(string) (data.Source, error)
|
||||||
|
|
||||||
|
type Sources struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
m map[string]SourceFactory
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSources() *Sources { return &Sources{m: make(map[string]SourceFactory)} }
|
||||||
|
|
||||||
|
func (r *Sources) Register(name string, f SourceFactory) {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
r.m[name] = f
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Sources) New(name string, params string) (data.Source, error) {
|
||||||
|
r.mu.RLock()
|
||||||
|
f, ok := r.m[name]
|
||||||
|
r.mu.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unknown source: %s", name)
|
||||||
|
}
|
||||||
|
return f(params)
|
||||||
|
}
|
||||||
0
core/data/router/router.go
Normal file
0
core/data/router/router.go
Normal file
1
core/data/wire.go
Normal file
1
core/data/wire.go
Normal file
@@ -0,0 +1 @@
|
|||||||
|
package data
|
||||||
Reference in New Issue
Block a user