diff --git a/core/data/bus.go b/core/data/bus.go deleted file mode 100644 index a73a3f1..0000000 --- a/core/data/bus.go +++ /dev/null @@ -1,91 +0,0 @@ -package data - -import ( - "sync" - - "github.com/google/uuid" -) - -type Bus interface { - Publish(id uuid.UUID, env Envelope) error - Subscribe(id uuid.UUID) (<-chan Envelope, func(), error) -} - -type InMemBus struct { - mu sync.RWMutex - subs map[uuid.UUID][]chan Envelope - publishChan chan published -} - -type published struct { - id uuid.UUID - env Envelope -} - -const ( - busBufferSize = 256 // buffer size for publishChan - subscriberBufferSize = 128 // buffer size for each subscriber channel -) - -func NewInMemBus() *InMemBus { - b := &InMemBus{ - subs: make(map[uuid.UUID][]chan Envelope), - publishChan: make(chan published, busBufferSize), - } - go b.run() - return b -} - -func (b *InMemBus) Publish(id uuid.UUID, env Envelope) error { - b.publishChan <- published{id: id, env: env} - return nil -} - -func (b *InMemBus) Subscribe(id uuid.UUID) (<-chan Envelope, func(), error) { - ch := make(chan Envelope, subscriberBufferSize) - - b.mu.Lock() - b.subs[id] = append(b.subs[id], ch) - b.mu.Unlock() - - cancel := func() { - b.mu.Lock() - defer b.mu.Unlock() - - chans := b.subs[id] - for i, c := range chans { - if c == ch { - // swap-delete - chans[i] = chans[len(chans)-1] - chans = chans[:len(chans)-1] - break - } - } - - if len(chans) == 0 { - delete(b.subs, id) - } else { - b.subs[id] = chans - } - } - - return ch, cancel, nil -} - -func (b *InMemBus) run() { - for pub := range b.publishChan { - id := pub.id - - b.mu.RLock() - chans := b.subs[id] - for _, ch := range chans { - select { - case ch <- pub.env: - default: - <-ch - ch <- pub.env - } - } - b.mu.RUnlock() - } -} diff --git a/core/data/coordinator.go b/core/data/coordinator.go deleted file mode 100644 index 0ad59c2..0000000 --- a/core/data/coordinator.go +++ /dev/null @@ -1 +0,0 @@ -package data diff --git a/core/data/router.go b/core/data/router.go deleted file mode 100644 index b84d74c..0000000 --- a/core/data/router.go +++ /dev/null @@ -1,16 +0,0 @@ -package data - -import "github.com/google/uuid" - -type Router struct { - busses map[string]Bus -} - -func NewRouter() *Router { - return &Router{ - busses: make(map[string]Bus), - } -} - -func (r *Router) Emit(namespace string, id uuid.UUID, envelope Envelope) error { return nil } -func (r *Router) EmitBatch(namespace string, id uuid.UUID, enveloeps []Envelope) error { return nil } diff --git a/core/execution/execution.go b/core/execution/execution.go deleted file mode 100644 index e69de29..0000000 diff --git a/core/portfolio/portfolio.go b/core/portfolio/portfolio.go deleted file mode 100644 index e69de29..0000000 diff --git a/core/risk/risk.go b/core/risk/risk.go deleted file mode 100644 index e69de29..0000000 diff --git a/core/strategy/strategy.go b/core/strategy/strategy.go deleted file mode 100644 index e69de29..0000000 diff --git a/pkg/data/coordinator/coordinator.go b/pkg/data/coordinator/coordinator.go new file mode 100644 index 0000000..aedabf3 --- /dev/null +++ b/pkg/data/coordinator/coordinator.go @@ -0,0 +1,2 @@ +// Package coordinator ... +package coordinator diff --git a/core/data/descriptor.go b/pkg/data/descriptor.go similarity index 100% rename from core/data/descriptor.go rename to pkg/data/descriptor.go diff --git a/core/data/egress.go b/pkg/data/egress.go similarity index 100% rename from core/data/egress.go rename to pkg/data/egress.go diff --git a/core/data/envelope.go b/pkg/data/envelope.go similarity index 100% rename from core/data/envelope.go rename to pkg/data/envelope.go diff --git a/core/data/events/bar.go b/pkg/data/events/bar.go similarity index 100% rename from core/data/events/bar.go rename to pkg/data/events/bar.go diff --git a/core/data/events/custom.go b/pkg/data/events/custom.go similarity index 100% rename from core/data/events/custom.go rename to pkg/data/events/custom.go diff --git a/core/data/events/domain.go b/pkg/data/events/domain.go similarity index 100% rename from core/data/events/domain.go rename to pkg/data/events/domain.go diff --git a/core/data/events/mbodelta.go b/pkg/data/events/mbodelta.go similarity index 100% rename from core/data/events/mbodelta.go rename to pkg/data/events/mbodelta.go diff --git a/core/data/events/mbosnapshot.go b/pkg/data/events/mbosnapshot.go similarity index 100% rename from core/data/events/mbosnapshot.go rename to pkg/data/events/mbosnapshot.go diff --git a/core/data/events/mbpdelta.go b/pkg/data/events/mbpdelta.go similarity index 100% rename from core/data/events/mbpdelta.go rename to pkg/data/events/mbpdelta.go diff --git a/core/data/events/mbpsnapshot.go b/pkg/data/events/mbpsnapshot.go similarity index 100% rename from core/data/events/mbpsnapshot.go rename to pkg/data/events/mbpsnapshot.go diff --git a/core/data/events/quote.go b/pkg/data/events/quote.go similarity index 100% rename from core/data/events/quote.go rename to pkg/data/events/quote.go diff --git a/core/data/events/trade.go b/pkg/data/events/trade.go similarity index 100% rename from core/data/events/trade.go rename to pkg/data/events/trade.go diff --git a/core/data/ingress.go b/pkg/data/ingress.go similarity index 100% rename from core/data/ingress.go rename to pkg/data/ingress.go diff --git a/core/data/processor.go b/pkg/data/processor.go similarity index 100% rename from core/data/processor.go rename to pkg/data/processor.go diff --git a/core/data/registry.go b/pkg/data/registry.go similarity index 100% rename from core/data/registry.go rename to pkg/data/registry.go diff --git a/pkg/data/router/router.go b/pkg/data/router/router.go new file mode 100644 index 0000000..ee0d945 --- /dev/null +++ b/pkg/data/router/router.go @@ -0,0 +1,2 @@ +// Package router ... +package router