From b017291a5a811cd2a7030fb99ecf53047df50d75 Mon Sep 17 00:00:00 2001 From: Phillip Michelsen Date: Fri, 13 Jun 2025 16:09:35 +0700 Subject: [PATCH] Add Manager and Provider structures with streaming capabilities --- services/marketdata/go.mod | 2 + services/marketdata/go.sum | 2 + .../internal/identifier/identifier.go | 5 +- .../marketdata/internal/manager/manager.go | 186 ++++++++++++++++++ .../binance/spot_fix.go | 0 .../binance/spot_rest.go | 0 .../binance/spot_websocket.go | 0 .../marketdata/internal/provider/provider.go | 19 ++ .../internal/providers/interface.go | 13 -- services/marketdata/internal/router/router.go | 58 ++++++ 10 files changed, 269 insertions(+), 16 deletions(-) create mode 100644 services/marketdata/go.sum rename services/marketdata/internal/{providers => provider}/binance/spot_fix.go (100%) rename services/marketdata/internal/{providers => provider}/binance/spot_rest.go (100%) rename services/marketdata/internal/{providers => provider}/binance/spot_websocket.go (100%) create mode 100644 services/marketdata/internal/provider/provider.go delete mode 100644 services/marketdata/internal/providers/interface.go diff --git a/services/marketdata/go.mod b/services/marketdata/go.mod index 2a6fcd6..c30f89e 100644 --- a/services/marketdata/go.mod +++ b/services/marketdata/go.mod @@ -1,3 +1,5 @@ module gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata go 1.24.2 + +require github.com/google/uuid v1.6.0 // indirect diff --git a/services/marketdata/go.sum b/services/marketdata/go.sum new file mode 100644 index 0000000..7790d7c --- /dev/null +++ b/services/marketdata/go.sum @@ -0,0 +1,2 @@ +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/services/marketdata/internal/identifier/identifier.go b/services/marketdata/internal/identifier/identifier.go index a2171aa..d9daeed 100644 --- a/services/marketdata/internal/identifier/identifier.go +++ b/services/marketdata/internal/identifier/identifier.go @@ -1,7 +1,6 @@ package identifier type Identifier struct { - Source string - Subject string - Type string + Provider string + Subject string } diff --git a/services/marketdata/internal/manager/manager.go b/services/marketdata/internal/manager/manager.go index 5d04392..2b873bb 100644 --- a/services/marketdata/internal/manager/manager.go +++ b/services/marketdata/internal/manager/manager.go @@ -1 +1,187 @@ package manager + +import ( + "context" + "fmt" + "sync" + + "gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/identifier" + "gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/provider" + "gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/router" +) + +type Manager struct { + Router *router.Router + incoming chan router.Message // Aggregated incoming messages from providers, for the Router + + Providers map[string]provider.Provider + providerStreams map[identifier.Identifier]chan router.Message // Channels for the streams that the providers are running + + subscribers map[identifier.Identifier][]chan router.Message // Map of identifiers to subscriber channels, one to many mapping + + mu sync.Mutex +} + +// NewManager constructs a Manager and starts its Router loop. +func NewManager() *Manager { + incoming := make(chan router.Message, 128) + r := router.NewRouter(incoming) + + m := &Manager{ + Router: r, + incoming: incoming, + Providers: make(map[string]provider.Provider), + providerStreams: make(map[identifier.Identifier]chan router.Message), + subscribers: make(map[identifier.Identifier][]chan router.Message), + } + go r.Run() + return m +} + +// AddProvider registers and starts a new Provider under the given name. +func (m *Manager) AddProvider(name string, p provider.Provider) error { + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.Providers[name]; exists { + return fmt.Errorf("provider %q already exists", name) + } + if err := p.Start(); err != nil { + return fmt.Errorf("failed to start provider %q: %w", name, err) + } + m.Providers[name] = p + return nil +} + +// RemoveProvider stops and unregisters a Provider, tearing down all its streams. +func (m *Manager) RemoveProvider(name string) error { + m.mu.Lock() + defer m.mu.Unlock() + + p, exists := m.Providers[name] + if !exists { + return fmt.Errorf("provider %q not found", name) + } + if err := p.Stop(); err != nil { + return fmt.Errorf("failed to stop provider %q: %w", name, err) + } + + // tear down every active stream for this provider + for id, streamCh := range m.providerStreams { + if id.Provider != name { + continue + } + // stop the provider's internal stream + p.CancelStream(id.Subject) + close(streamCh) + delete(m.providerStreams, id) + + // deregister & close all subscriber channels + for _, subCh := range m.subscribers[id] { + m.Router.Deregister(id, subCh) + close(subCh) + } + delete(m.subscribers, id) + } + + delete(m.Providers, name) + return nil +} + +// Stream establishes a single gRPC‐stream channel that multiplexes +// live updates for all requested identifiers. +// When ctx is canceled, it will deregister and—if a stream has no more +// subscribers—call CancelStream on the provider. +func (m *Manager) Stream( + ctx context.Context, + reqs []identifier.Identifier, +) (<-chan router.Message, error) { + m.mu.Lock() + // 1) Validate and ensure each provider/subject is streaming + for _, id := range reqs { + p, ok := m.Providers[id.Provider] + if !ok { + m.mu.Unlock() + return nil, fmt.Errorf("provider %q not found", id.Provider) + } + if !p.IsValidSubject(id.Subject, false) { + m.mu.Unlock() + return nil, fmt.Errorf("invalid subject %q for provider %q", id.Subject, id.Provider) + } + // start the provider stream if not already running + if _, exists := m.providerStreams[id]; !exists { + ch := make(chan router.Message, 64) + if err := p.RequestStream(id.Subject, ch); err != nil { + m.mu.Unlock() + return nil, fmt.Errorf("could not request stream for %v: %w", id, err) + } + m.providerStreams[id] = ch + // pump into the central incoming channel + go func(c chan router.Message) { + for msg := range c { + m.incoming <- msg + } + }(ch) + } + } + + // 2) Create one channel for this RPC and register it for every ID + subCh := make(chan router.Message, 128) + for _, id := range reqs { + m.subscribers[id] = append(m.subscribers[id], subCh) + m.Router.Register(id, subCh) + } + m.mu.Unlock() + + // 3) Teardown logic when context is done + go func() { + <-ctx.Done() + m.mu.Lock() + defer m.mu.Unlock() + + for _, id := range reqs { + // deregister this subscriber channel + m.Router.Deregister(id, subCh) + + // remove it from the list + subs := m.subscribers[id] + for i, ch := range subs { + if ch == subCh { + subs = append(subs[:i], subs[i+1:]...) + break + } + } + + if len(subs) == 0 { + // no more listeners: cancel provider stream + delete(m.subscribers, id) + if streamCh, ok := m.providerStreams[id]; ok { + m.Providers[id.Provider].CancelStream(id.Subject) + close(streamCh) + delete(m.providerStreams, id) + } + } else { + m.subscribers[id] = subs + } + } + + close(subCh) + }() + + return subCh, nil +} + +// Fetch performs a single request/response fetch against the named provider. +func (m *Manager) Fetch(providerName, subject string) (router.Message, error) { + m.mu.Lock() + defer m.mu.Unlock() + + p, ok := m.Providers[providerName] + if !ok { + return router.Message{}, fmt.Errorf("provider %q not found", providerName) + } + if !p.IsValidSubject(subject, true) { + return router.Message{}, fmt.Errorf("invalid subject %q for provider %q", subject, providerName) + } + return p.Fetch(subject) +} diff --git a/services/marketdata/internal/providers/binance/spot_fix.go b/services/marketdata/internal/provider/binance/spot_fix.go similarity index 100% rename from services/marketdata/internal/providers/binance/spot_fix.go rename to services/marketdata/internal/provider/binance/spot_fix.go diff --git a/services/marketdata/internal/providers/binance/spot_rest.go b/services/marketdata/internal/provider/binance/spot_rest.go similarity index 100% rename from services/marketdata/internal/providers/binance/spot_rest.go rename to services/marketdata/internal/provider/binance/spot_rest.go diff --git a/services/marketdata/internal/providers/binance/spot_websocket.go b/services/marketdata/internal/provider/binance/spot_websocket.go similarity index 100% rename from services/marketdata/internal/providers/binance/spot_websocket.go rename to services/marketdata/internal/provider/binance/spot_websocket.go diff --git a/services/marketdata/internal/provider/provider.go b/services/marketdata/internal/provider/provider.go new file mode 100644 index 0000000..118c191 --- /dev/null +++ b/services/marketdata/internal/provider/provider.go @@ -0,0 +1,19 @@ +package provider + +import ( + "gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/router" +) + +type Provider interface { + Start() error + Stop() error + + RequestStream(subject string, channel chan router.Message) error + CancelStream(subject string) + GetActiveStreams() []string + IsStreamActive(subject string) bool + + Fetch(subject string) (router.Message, error) + + IsValidSubject(subject string, isFetch bool) bool +} diff --git a/services/marketdata/internal/providers/interface.go b/services/marketdata/internal/providers/interface.go deleted file mode 100644 index a5ee95f..0000000 --- a/services/marketdata/internal/providers/interface.go +++ /dev/null @@ -1,13 +0,0 @@ -package providers - -import ( - "gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/identifier" -) - -type Provider interface { - Start() error - Stop() error - Subscribe(identifier identifier.Identifier, channel chan<- any) error - Unsubscribe(identifier identifier.Identifier) - Fetch(identifier identifier.Identifier) (any, error) -} diff --git a/services/marketdata/internal/router/router.go b/services/marketdata/internal/router/router.go index 7ef135b..0294c97 100644 --- a/services/marketdata/internal/router/router.go +++ b/services/marketdata/internal/router/router.go @@ -1 +1,59 @@ package router + +import ( + "gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/identifier" + "sync" +) + +type Message struct { + Identifier identifier.Identifier + Payload any +} + +type Router struct { + incoming <-chan Message + routes map[identifier.Identifier][]chan<- Message + mu sync.RWMutex +} + +func NewRouter(incoming <-chan Message) *Router { + return &Router{ + incoming: incoming, + routes: make(map[identifier.Identifier][]chan<- Message), + } +} + +func (r *Router) Run() { + for msg := range r.incoming { + r.mu.RLock() + chans := r.routes[msg.Identifier] + for _, ch := range chans { + ch <- msg + } + r.mu.RUnlock() + } +} + +func (r *Router) Register(id identifier.Identifier, ch chan<- Message) { + r.mu.Lock() + r.routes[id] = append(r.routes[id], ch) + r.mu.Unlock() +} + +func (r *Router) Deregister(id identifier.Identifier, ch chan<- Message) { + r.mu.Lock() + slice := r.routes[id] + for i := 0; i < len(slice); i++ { + if slice[i] == ch { + slice[i] = slice[len(slice)-1] + slice = slice[:len(slice)-1] + i-- + } + } + if len(slice) == 0 { + delete(r.routes, id) + } else { + r.routes[id] = slice + } + r.mu.Unlock() +}