diff --git a/cmd/datad/main.go b/cmd/datad/main.go new file mode 100644 index 0000000..38dd16d --- /dev/null +++ b/cmd/datad/main.go @@ -0,0 +1,3 @@ +package main + +func main() {} diff --git a/core/data/bus.go b/core/data/bus.go new file mode 100644 index 0000000..a73a3f1 --- /dev/null +++ b/core/data/bus.go @@ -0,0 +1,91 @@ +package data + +import ( + "sync" + + "github.com/google/uuid" +) + +type Bus interface { + Publish(id uuid.UUID, env Envelope) error + Subscribe(id uuid.UUID) (<-chan Envelope, func(), error) +} + +type InMemBus struct { + mu sync.RWMutex + subs map[uuid.UUID][]chan Envelope + publishChan chan published +} + +type published struct { + id uuid.UUID + env Envelope +} + +const ( + busBufferSize = 256 // buffer size for publishChan + subscriberBufferSize = 128 // buffer size for each subscriber channel +) + +func NewInMemBus() *InMemBus { + b := &InMemBus{ + subs: make(map[uuid.UUID][]chan Envelope), + publishChan: make(chan published, busBufferSize), + } + go b.run() + return b +} + +func (b *InMemBus) Publish(id uuid.UUID, env Envelope) error { + b.publishChan <- published{id: id, env: env} + return nil +} + +func (b *InMemBus) Subscribe(id uuid.UUID) (<-chan Envelope, func(), error) { + ch := make(chan Envelope, subscriberBufferSize) + + b.mu.Lock() + b.subs[id] = append(b.subs[id], ch) + b.mu.Unlock() + + cancel := func() { + b.mu.Lock() + defer b.mu.Unlock() + + chans := b.subs[id] + for i, c := range chans { + if c == ch { + // swap-delete + chans[i] = chans[len(chans)-1] + chans = chans[:len(chans)-1] + break + } + } + + if len(chans) == 0 { + delete(b.subs, id) + } else { + b.subs[id] = chans + } + } + + return ch, cancel, nil +} + +func (b *InMemBus) run() { + for pub := range b.publishChan { + id := pub.id + + b.mu.RLock() + chans := b.subs[id] + for _, ch := range chans { + select { + case ch <- pub.env: + default: + <-ch + ch <- pub.env + } + } + b.mu.RUnlock() + } +} diff --git a/core/data/wire.go b/core/data/coordinator.go similarity index 100% rename from core/data/wire.go rename to core/data/coordinator.go diff --git a/core/data/data.go b/core/data/data.go deleted file mode 100644 index 1585260..0000000 --- a/core/data/data.go +++ /dev/null @@ -1,33 +0,0 @@ -// Package data ... -package data - -import "time" - -type Envelope struct { - Identifier string - Timestamp time.Time - Sequence uint64 - Payload []byte -} - -type Source interface { - Start(config any) error - Stop() - Name() string - - Subscribe(key string) (<-chan Envelope, error) - Unsubscribe(key string) error - - IsValidKey(key string) bool - GetSubscriptions() []string -} - -type Processor interface { - Start(config any, send chan<- Envelope, receive <-chan Envelope) error - Stop() -} - -type Sink interface { - Start(config any, receive <-chan Envelope) error - Stop() -} diff --git a/core/data/descriptor.go b/core/data/descriptor.go new file mode 100644 index 0000000..188f23f --- /dev/null +++ b/core/data/descriptor.go @@ -0,0 +1,9 @@ +package data + +type Descriptor struct { + Type string + Key string + Schema string + Attributes map[string]string + Tags []string +} diff --git a/core/data/egress.go b/core/data/egress.go new file mode 100644 index 0000000..7f27c8c --- /dev/null +++ b/core/data/egress.go @@ -0,0 +1,18 @@ +package data + +import ( + "context" + + "github.com/google/uuid" +) + +type Egress interface { + Name() string + Configure(cfg map[string]any) error + Connect(ctx context.Context, actions EgresActions) error + Disconnect(ctx context.Context) error +} + +type EgresActions interface { + Subscribe(ctx context.Context, namespace string, id uuid.UUID) (<-chan Envelope, func(), error) +} diff --git a/core/data/envelope.go b/core/data/envelope.go new file mode 100644 index 0000000..272a417 --- /dev/null +++ b/core/data/envelope.go @@ -0,0 +1,13 @@ +// Package data ... +package data + +import ( + "time" +) + +type Envelope struct { + SendTime time.Time + + Descriptor Descriptor + Payload any +} diff --git a/core/data/errors.go b/core/data/errors.go deleted file mode 100644 index a136936..0000000 --- a/core/data/errors.go +++ /dev/null @@ -1,10 +0,0 @@ -package data - -import "errors" - -var ( - ErrUnknownKey = errors.New("unknown key") - ErrAlreadySubscribed = errors.New("already subscribed") - ErrNotSubscribed = errors.New("not subscribed") - ErrDroppedMessage = errors.New("dropped message: buffer full") -) diff --git a/core/data/events/bar.go b/core/data/events/bar.go new file mode 100644 index 0000000..f8c282e --- /dev/null +++ b/core/data/events/bar.go @@ -0,0 +1,12 @@ +package events + +import "time" + +type Bar struct { + Open float64 + High float64 + Low float64 + Close float64 + Volume float64 + Interval time.Duration +} diff --git a/core/data/events/custom.go b/core/data/events/custom.go new file mode 100644 index 0000000..eba4536 --- /dev/null +++ b/core/data/events/custom.go @@ -0,0 +1,6 @@ +package events + +type Custom struct { + Bytes []byte + ContentType string +} diff --git a/core/data/events/domain.go b/core/data/events/domain.go new file mode 100644 index 0000000..1ed13b6 --- /dev/null +++ b/core/data/events/domain.go @@ -0,0 +1,22 @@ +// Package events ... +package events + +type DataType uint8 + +const ( + TradeType DataType = iota + QuoteType + BarType + MBPDeltaType + MBPSnapshotType + MBODeltaType + MBOSnapshotType + CustomType +) + +type Side uint8 + +const ( + Bid Side = iota + Ask +) diff --git a/core/data/events/mbodelta.go b/core/data/events/mbodelta.go new file mode 100644 index 0000000..b24f3b4 --- /dev/null +++ b/core/data/events/mbodelta.go @@ -0,0 +1,20 @@ +package events + +type MBODelta struct { + Operation MBOOrderOp + OrderID string + Side Side + Price float64 + Size float64 + IsMaker bool + Seq uint64 + ParentID string +} + +type MBOOrderOp uint8 + +const ( + OrderAdd MBOOrderOp = iota + OrderMod + OrderDel +) diff --git a/core/data/events/mbosnapshot.go b/core/data/events/mbosnapshot.go new file mode 100644 index 0000000..f1a2cf5 --- /dev/null +++ b/core/data/events/mbosnapshot.go @@ -0,0 +1,14 @@ +package events + +type MBOSnapshot struct { + Orders []OrderEntry + Seq uint64 +} + +type OrderEntry struct { + OrderID string + Side Side + Price float64 + Size float64 + IsMaker bool +} diff --git a/core/data/events/mbpdelta.go b/core/data/events/mbpdelta.go new file mode 100644 index 0000000..5eedab3 --- /dev/null +++ b/core/data/events/mbpdelta.go @@ -0,0 +1,8 @@ +package events + +type MBPDelta struct { + Side Side + Price float64 + Size float64 + Seq uint64 +} diff --git a/core/data/events/mbpsnapshot.go b/core/data/events/mbpsnapshot.go new file mode 100644 index 0000000..2fb0a5a --- /dev/null +++ b/core/data/events/mbpsnapshot.go @@ -0,0 +1,13 @@ +package events + +type MBPSnapshot struct { + Bids []PriceLevel + Asks []PriceLevel + Depth int + Seq uint64 +} + +type PriceLevel struct { + Price float64 + Size float64 +} diff --git a/core/data/events/quote.go b/core/data/events/quote.go new file mode 100644 index 0000000..10ab9fa --- /dev/null +++ b/core/data/events/quote.go @@ -0,0 +1,8 @@ +package events + +type Quote struct { + BidPrice float64 + BidSize float64 + AskPrice float64 + AskSize float64 +} diff --git a/core/data/events/trade.go b/core/data/events/trade.go new file mode 100644 index 0000000..a3650fb --- /dev/null +++ b/core/data/events/trade.go @@ -0,0 +1,16 @@ +package events + +type Trade struct { + Price float64 + Qty float64 + Aggressor AggressorSide + TradeID string +} + +type AggressorSide uint8 + +const ( + AggUnknown AggressorSide = iota + AggBuy + AggSell +) diff --git a/core/data/ingress.go b/core/data/ingress.go new file mode 100644 index 0000000..b1c562a --- /dev/null +++ b/core/data/ingress.go @@ -0,0 +1,34 @@ +package data + +import ( + "context" + "time" + + "github.com/google/uuid" +) + +type Ingress interface { + Name() string + Configure(cfg map[string]any) error + Connect(ctx context.Context, actions IngressActions) error + Disconnect(ctx context.Context) error +} + +type IngressActions interface { + Emit(namespace string, id uuid.UUID, envelope Envelope) error + EmitBatch(namespace string, id uuid.UUID, envelopes []Envelope) error +} + +type LiveIngress interface { + Subscribe(ctx context.Context, key ...string) error + Unsubscribe(ctx context.Context, key ...string) error + ListAvailableKeys(ctx context.Context) []string + IsValidKey(ctx context.Context, key string) bool +} + +type HistoricalIngress interface { + Fetch(ctx context.Context, key string, start time.Time, end time.Time) ([]Envelope, error) + ListAvailableKeys(ctx context.Context) []string + IsValidKey(ctx context.Context, key string) bool + // some other method that gives the avalible time period for a key +} diff --git a/core/data/processor.go b/core/data/processor.go new file mode 100644 index 0000000..ada97ac --- /dev/null +++ b/core/data/processor.go @@ -0,0 +1,14 @@ +package data + +import "context" + +type Processor interface { + Name() string + Configure(cfg map[string]any) error + Run(ctx context.Context, actions ProcessorActions) error +} + +type ProcessorActions interface { + IngressActions + EgresActions +} diff --git a/core/data/registry.go b/core/data/registry.go new file mode 100644 index 0000000..0ad59c2 --- /dev/null +++ b/core/data/registry.go @@ -0,0 +1 @@ +package data diff --git a/core/data/registry/processor.go b/core/data/registry/processor.go deleted file mode 100644 index e1d3011..0000000 --- a/core/data/registry/processor.go +++ /dev/null @@ -1,34 +0,0 @@ -// Package registry ... -package registry - -import ( - "fmt" - "sync" - - "gitlab.michelsen.id/phillmichelsen/tessera/core/data" -) - -type ProcessorFactory func(string) (data.Processor, error) - -type Processors struct { - mu sync.RWMutex - m map[string]ProcessorFactory -} - -func NewProcessors() *Processors { return &Processors{m: make(map[string]ProcessorFactory)} } - -func (r *Processors) Register(name string, f ProcessorFactory) { - r.mu.Lock() - defer r.mu.Unlock() - r.m[name] = f -} - -func (r *Processors) New(name string, params string) (data.Processor, error) { - r.mu.RLock() - f, ok := r.m[name] - r.mu.RUnlock() - if !ok { - return nil, fmt.Errorf("unknown processor: %s", name) - } - return f(params) -} diff --git a/core/data/registry/sinks.go b/core/data/registry/sinks.go deleted file mode 100644 index 3d00064..0000000 --- a/core/data/registry/sinks.go +++ /dev/null @@ -1,33 +0,0 @@ -package registry - -import ( - "fmt" - "sync" - - "gitlab.michelsen.id/phillmichelsen/tessera/core/data" -) - -type SinkFactory func(string) (data.Sink, error) - -type Sinks struct { - mu sync.RWMutex - m map[string]SinkFactory -} - -func NewSinks() *Sinks { return &Sinks{m: make(map[string]SinkFactory)} } - -func (r *Sinks) Register(name string, f SinkFactory) { - r.mu.Lock() - defer r.mu.Unlock() - r.m[name] = f -} - -func (r *Sinks) New(name string, params string) (data.Sink, error) { - r.mu.RLock() - f, ok := r.m[name] - r.mu.RUnlock() - if !ok { - return nil, fmt.Errorf("unknown sink: %s", name) - } - return f(params) -} diff --git a/core/data/registry/sources.go b/core/data/registry/sources.go deleted file mode 100644 index 174a458..0000000 --- a/core/data/registry/sources.go +++ /dev/null @@ -1,33 +0,0 @@ -package registry - -import ( - "fmt" - "sync" - - "gitlab.michelsen.id/phillmichelsen/tessera/core/data" -) - -type SourceFactory func(string) (data.Source, error) - -type Sources struct { - mu sync.RWMutex - m map[string]SourceFactory -} - -func NewSources() *Sources { return &Sources{m: make(map[string]SourceFactory)} } - -func (r *Sources) Register(name string, f SourceFactory) { - r.mu.Lock() - defer r.mu.Unlock() - r.m[name] = f -} - -func (r *Sources) New(name string, params string) (data.Source, error) { - r.mu.RLock() - f, ok := r.m[name] - r.mu.RUnlock() - if !ok { - return nil, fmt.Errorf("unknown source: %s", name) - } - return f(params) -} diff --git a/core/data/router.go b/core/data/router.go new file mode 100644 index 0000000..b84d74c --- /dev/null +++ b/core/data/router.go @@ -0,0 +1,16 @@ +package data + +import "github.com/google/uuid" + +type Router struct { + busses map[string]Bus +} + +func NewRouter() *Router { + return &Router{ + busses: make(map[string]Bus), + } +} + +func (r *Router) Emit(namespace string, id uuid.UUID, envelope Envelope) error { return nil } +func (r *Router) EmitBatch(namespace string, id uuid.UUID, enveloeps []Envelope) error { return nil } diff --git a/core/data/router/router.go b/core/data/router/router.go deleted file mode 100644 index e69de29..0000000 diff --git a/core/worker/worker.go b/core/worker/worker.go deleted file mode 100644 index e69de29..0000000 diff --git a/go.mod b/go.mod index ac1743e..7d9ae90 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module gitlab.michelsen.id/phillmichelsen/tessera go 1.25.1 + +require github.com/google/uuid v1.6.0 // indirect diff --git a/go.sum b/go.sum index e69de29..7790d7c 100644 --- a/go.sum +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=