Major update to the data service. Added gRPC and socket servers. Switched to using only a single go.mod at the root.
This commit is contained in:
54
services/data_service/cmd/main.go
Normal file
54
services/data_service/cmd/main.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider/binance"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/server"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/reflection"
|
||||
"log"
|
||||
"net"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Setup
|
||||
r := router.NewRouter()
|
||||
m := manager.NewManager(r)
|
||||
binanceFutures := binance.NewFuturesWebsocket()
|
||||
m.AddProvider("binance_futures_websocket", binanceFutures)
|
||||
|
||||
// gRPC Server
|
||||
grpcServer := grpc.NewServer()
|
||||
streamingServer := server.NewGRPCStreamingServer(m)
|
||||
pb.RegisterDataServiceStreamingServer(grpcServer, streamingServer)
|
||||
reflection.Register(grpcServer)
|
||||
|
||||
go func() {
|
||||
grpcLis, err := net.Listen("tcp", ":50051")
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to listen for gRPC: %v", err)
|
||||
}
|
||||
log.Println("gRPC server listening on :50051")
|
||||
if err := grpcServer.Serve(grpcLis); err != nil {
|
||||
log.Fatalf("Failed to serve gRPC: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Socket Server
|
||||
socketServer := server.NewSocketStreamingServer(m)
|
||||
go func() {
|
||||
socketLis, err := net.Listen("tcp", ":6000")
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to listen for socket: %v", err)
|
||||
}
|
||||
log.Println("Socket server listening on :6000")
|
||||
if err := socketServer.Serve(socketLis); err != nil {
|
||||
log.Fatalf("Socket server error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Block main forever
|
||||
select {}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package identifier
|
||||
package domain
|
||||
|
||||
type Identifier struct {
|
||||
Provider string
|
||||
6
services/data_service/internal/domain/message.go
Normal file
6
services/data_service/internal/domain/message.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package domain
|
||||
|
||||
type Message struct {
|
||||
Identifier Identifier
|
||||
Payload any
|
||||
}
|
||||
196
services/data_service/internal/manager/manager.go
Normal file
196
services/data_service/internal/manager/manager.go
Normal file
@@ -0,0 +1,196 @@
|
||||
package manager
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Manager struct {
|
||||
providers map[string]provider.Provider
|
||||
providerStreams map[domain.Identifier]chan domain.Message
|
||||
|
||||
clientStreams map[uuid.UUID]*ClientStream
|
||||
|
||||
router *router.Router
|
||||
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
type ClientStream struct {
|
||||
UUID uuid.UUID
|
||||
Identifiers []domain.Identifier
|
||||
OutChannel chan domain.Message
|
||||
Timer *time.Timer
|
||||
}
|
||||
|
||||
func NewManager(router *router.Router) *Manager {
|
||||
go router.Run() // Start the router in a separate goroutine
|
||||
return &Manager{
|
||||
providers: make(map[string]provider.Provider),
|
||||
providerStreams: make(map[domain.Identifier]chan domain.Message),
|
||||
clientStreams: make(map[uuid.UUID]*ClientStream),
|
||||
router: router,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) StartStream(ids []domain.Identifier) (uuid.UUID, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
// Validate Identifiers, prevents unnecessary calls to providers
|
||||
for _, id := range ids {
|
||||
if id.Provider == "" || id.Subject == "" {
|
||||
return uuid.Nil, fmt.Errorf("invalid identifier: %v", id)
|
||||
}
|
||||
if _, ok := m.providers[id.Provider]; !ok {
|
||||
return uuid.Nil, fmt.Errorf("unknown provider: %s", id.Provider)
|
||||
}
|
||||
if !m.providers[id.Provider].IsValidSubject(id.Subject, false) {
|
||||
return uuid.Nil, fmt.Errorf("invalid subject for provider %s: %s", id.Provider, id.Subject)
|
||||
}
|
||||
}
|
||||
|
||||
// Provision provider streams
|
||||
for _, id := range ids {
|
||||
if _, ok := m.providerStreams[id]; ok {
|
||||
continue // Skip if requested stream is already being provided
|
||||
}
|
||||
|
||||
ch := make(chan domain.Message, 64)
|
||||
if err := m.providers[id.Provider].RequestStream(id.Subject, ch); err != nil {
|
||||
return uuid.Nil, fmt.Errorf("provision %v: %w", id, err)
|
||||
}
|
||||
m.providerStreams[id] = ch
|
||||
|
||||
// Start routine to route the provider stream to the router's input channel
|
||||
go func(ch chan domain.Message) {
|
||||
for msg := range ch {
|
||||
m.router.IncomingChannel() <- msg
|
||||
}
|
||||
}(ch)
|
||||
}
|
||||
|
||||
streamID := uuid.New()
|
||||
|
||||
m.clientStreams[streamID] = &ClientStream{
|
||||
UUID: streamID,
|
||||
Identifiers: ids,
|
||||
OutChannel: nil, // Initially nil, will be set when connected
|
||||
Timer: time.AfterFunc(1*time.Minute, func() {
|
||||
fmt.Printf("stream %s expired due to inactivity\n", streamID)
|
||||
m.StopStream(streamID)
|
||||
}),
|
||||
}
|
||||
|
||||
return streamID, nil
|
||||
}
|
||||
|
||||
func (m *Manager) StopStream(streamID uuid.UUID) {
|
||||
m.DisconnectStream(streamID)
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
stream, ok := m.clientStreams[streamID]
|
||||
if !ok {
|
||||
return // Stream not found
|
||||
}
|
||||
|
||||
stream.Timer.Stop()
|
||||
|
||||
delete(m.clientStreams, streamID)
|
||||
|
||||
// Find provider streams that are used by other client streams
|
||||
used := make(map[domain.Identifier]bool)
|
||||
for _, s := range m.clientStreams {
|
||||
for _, id := range s.Identifiers {
|
||||
used[id] = true
|
||||
}
|
||||
}
|
||||
|
||||
// Cancel provider streams that are not used by any client stream
|
||||
for id, ch := range m.providerStreams {
|
||||
if !used[id] {
|
||||
m.providers[id.Provider].CancelStream(id.Subject)
|
||||
close(ch)
|
||||
delete(m.providerStreams, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) ConnectStream(streamID uuid.UUID) (<-chan domain.Message, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
stream, ok := m.clientStreams[streamID]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("stream not found: %s", streamID)
|
||||
}
|
||||
|
||||
if stream.OutChannel != nil {
|
||||
return nil, fmt.Errorf("stream already connected")
|
||||
}
|
||||
|
||||
ch := make(chan domain.Message, 128)
|
||||
stream.OutChannel = ch
|
||||
|
||||
for _, ident := range stream.Identifiers {
|
||||
m.router.RegisterRoute(ident, ch)
|
||||
}
|
||||
|
||||
if stream.Timer != nil {
|
||||
stream.Timer.Stop()
|
||||
stream.Timer = nil
|
||||
}
|
||||
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
func (m *Manager) DisconnectStream(streamID uuid.UUID) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
stream, ok := m.clientStreams[streamID]
|
||||
if !ok || stream.OutChannel == nil {
|
||||
return // already disconnected or does not exist
|
||||
}
|
||||
|
||||
// Deregister all identifiers from the router
|
||||
for _, ident := range stream.Identifiers {
|
||||
m.router.DeregisterRoute(ident, stream.OutChannel)
|
||||
}
|
||||
|
||||
// Close the output channel
|
||||
close(stream.OutChannel)
|
||||
stream.OutChannel = nil
|
||||
|
||||
// Set up the expiry timer
|
||||
stream.Timer = time.AfterFunc(1*time.Minute, func() {
|
||||
fmt.Printf("stream %s expired due to inactivity\n", streamID)
|
||||
m.StopStream(streamID)
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) AddProvider(name string, p provider.Provider) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
if _, exists := m.providers[name]; exists {
|
||||
panic(fmt.Sprintf("provider %s already exists", name))
|
||||
}
|
||||
|
||||
if err := p.Start(); err != nil {
|
||||
panic(fmt.Errorf("failed to start provider %s: %w", name, err))
|
||||
}
|
||||
|
||||
m.providers[name] = p
|
||||
}
|
||||
|
||||
func (m *Manager) RemoveProvider(name string) {
|
||||
panic("not implemented yet") // TODO: Implement provider removal logic
|
||||
}
|
||||
@@ -0,0 +1,153 @@
|
||||
package binance
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
|
||||
)
|
||||
|
||||
type FuturesWebsocket struct {
|
||||
conn *websocket.Conn
|
||||
activeStreams map[string]chan domain.Message
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewFuturesWebsocket() *FuturesWebsocket {
|
||||
return &FuturesWebsocket{
|
||||
activeStreams: make(map[string]chan domain.Message),
|
||||
}
|
||||
}
|
||||
|
||||
func (b *FuturesWebsocket) Start() error {
|
||||
c, _, err := websocket.DefaultDialer.Dial("wss://fstream.binance.com/stream", nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("connect failed: %w", err)
|
||||
}
|
||||
b.conn = c
|
||||
go b.readLoop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *FuturesWebsocket) Stop() {
|
||||
if b.conn != nil {
|
||||
err := b.conn.Close()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("failed to close websocket connection: %w", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *FuturesWebsocket) RequestStream(subject string, ch chan domain.Message) error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
if _, ok := b.activeStreams[subject]; ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
msg := map[string]interface{}{
|
||||
"method": "SUBSCRIBE",
|
||||
"params": []string{subject},
|
||||
"id": len(b.activeStreams) + 1,
|
||||
}
|
||||
if err := b.conn.WriteJSON(msg); err != nil {
|
||||
return fmt.Errorf("subscribe failed: %w", err)
|
||||
}
|
||||
|
||||
b.activeStreams[subject] = ch
|
||||
fmt.Println("Subscribed to stream:", subject)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *FuturesWebsocket) CancelStream(subject string) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
if _, ok := b.activeStreams[subject]; !ok {
|
||||
return
|
||||
}
|
||||
|
||||
msg := map[string]interface{}{
|
||||
"method": "UNSUBSCRIBE",
|
||||
"params": []string{subject},
|
||||
"id": len(b.activeStreams) + 1000,
|
||||
}
|
||||
_ = b.conn.WriteJSON(msg)
|
||||
|
||||
delete(b.activeStreams, subject)
|
||||
}
|
||||
|
||||
func (b *FuturesWebsocket) GetActiveStreams() []string {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
var streams []string
|
||||
for k := range b.activeStreams {
|
||||
streams = append(streams, k)
|
||||
}
|
||||
return streams
|
||||
}
|
||||
|
||||
func (b *FuturesWebsocket) IsStreamActive(subject string) bool {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
_, ok := b.activeStreams[subject]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (b *FuturesWebsocket) Fetch(subject string) (domain.Message, error) {
|
||||
return domain.Message{}, fmt.Errorf("not supported: websocket provider does not implement fetch")
|
||||
}
|
||||
|
||||
func (b *FuturesWebsocket) IsValidSubject(subject string, isFetch bool) bool {
|
||||
if isFetch {
|
||||
return false
|
||||
}
|
||||
return len(subject) > 0 // Extend with regex or lookup if needed
|
||||
}
|
||||
|
||||
func (b *FuturesWebsocket) readLoop() {
|
||||
for {
|
||||
_, msgBytes, err := b.conn.ReadMessage()
|
||||
if err != nil {
|
||||
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||
return
|
||||
}
|
||||
fmt.Printf("read error: %v\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
var container struct {
|
||||
Stream string `json:"stream"`
|
||||
Data json.RawMessage `json:"data"`
|
||||
}
|
||||
if err := json.Unmarshal(msgBytes, &container); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
ch, ok := b.activeStreams[container.Stream]
|
||||
b.mu.Unlock()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
msg := domain.Message{
|
||||
Identifier: domain.Identifier{
|
||||
Provider: "binance_futures_websocket",
|
||||
Subject: container.Stream,
|
||||
},
|
||||
Payload: container.Data,
|
||||
}
|
||||
|
||||
select {
|
||||
case ch <- msg:
|
||||
default:
|
||||
fmt.Printf("channel for %s is full, dropping message\n", container.Stream)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,19 +1,19 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/router"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
|
||||
)
|
||||
|
||||
type Provider interface {
|
||||
Start() error
|
||||
Stop() error
|
||||
Stop()
|
||||
|
||||
RequestStream(subject string, channel chan router.Message) error
|
||||
RequestStream(subject string, channel chan domain.Message) error
|
||||
CancelStream(subject string)
|
||||
GetActiveStreams() []string
|
||||
IsStreamActive(subject string) bool
|
||||
|
||||
Fetch(subject string) (router.Message, error)
|
||||
Fetch(subject string) (domain.Message, error)
|
||||
|
||||
IsValidSubject(subject string, isFetch bool) bool
|
||||
}
|
||||
67
services/data_service/internal/router/router.go
Normal file
67
services/data_service/internal/router/router.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Router struct {
|
||||
incoming chan domain.Message
|
||||
routes map[domain.Identifier][]chan<- domain.Message
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewRouter() *Router {
|
||||
return &Router{
|
||||
incoming: make(chan domain.Message, 64), // Buffered channel for incoming messages
|
||||
routes: make(map[domain.Identifier][]chan<- domain.Message),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Router) IncomingChannel() chan<- domain.Message {
|
||||
return r.incoming
|
||||
}
|
||||
|
||||
func (r *Router) Run() {
|
||||
for msg := range r.incoming {
|
||||
startTime := time.Now()
|
||||
r.mu.RLock()
|
||||
channels := r.routes[msg.Identifier]
|
||||
|
||||
for _, ch := range channels {
|
||||
select {
|
||||
case ch <- msg:
|
||||
default:
|
||||
fmt.Println("Dropped message, buffer full!!!")
|
||||
}
|
||||
}
|
||||
r.mu.RUnlock()
|
||||
fmt.Printf("Message routed to %d channels in %v\n", len(channels), time.Since(startTime))
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Router) RegisterRoute(id domain.Identifier, ch chan<- domain.Message) {
|
||||
r.mu.Lock()
|
||||
r.routes[id] = append(r.routes[id], ch)
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *Router) DeregisterRoute(id domain.Identifier, ch chan<- domain.Message) {
|
||||
r.mu.Lock()
|
||||
slice := r.routes[id]
|
||||
for i := 0; i < len(slice); i++ {
|
||||
if slice[i] == ch {
|
||||
slice[i] = slice[len(slice)-1]
|
||||
slice = slice[:len(slice)-1]
|
||||
i--
|
||||
}
|
||||
}
|
||||
if len(slice) == 0 {
|
||||
delete(r.routes, id)
|
||||
} else {
|
||||
r.routes[id] = slice
|
||||
}
|
||||
r.mu.Unlock()
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager"
|
||||
)
|
||||
|
||||
type GRPCStreamingServer struct {
|
||||
pb.UnimplementedDataServiceStreamingServer
|
||||
manager *manager.Manager
|
||||
}
|
||||
|
||||
func NewGRPCStreamingServer(m *manager.Manager) *GRPCStreamingServer {
|
||||
return &GRPCStreamingServer{
|
||||
manager: m,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *GRPCStreamingServer) StartStream(ctx context.Context, req *pb.StartStreamRequest) (*pb.StartStreamResponse, error) {
|
||||
var ids []domain.Identifier
|
||||
for _, id := range req.Identifiers {
|
||||
ids = append(ids, domain.Identifier{
|
||||
Provider: id.Provider,
|
||||
Subject: id.Subject,
|
||||
})
|
||||
}
|
||||
|
||||
streamID, err := s.manager.StartStream(ids)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to start stream: %w", err)
|
||||
}
|
||||
|
||||
return &pb.StartStreamResponse{StreamUuid: streamID.String()}, nil
|
||||
}
|
||||
|
||||
func (s *GRPCStreamingServer) ConnectStream(req *pb.ConnectStreamRequest, stream pb.DataServiceStreaming_ConnectStreamServer) error {
|
||||
streamUUID, err := uuid.Parse(req.StreamUuid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid UUID: %w", err)
|
||||
}
|
||||
|
||||
ch, err := s.manager.ConnectStream(streamUUID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect: %w", err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stream.Context().Done():
|
||||
s.manager.DisconnectStream(streamUUID)
|
||||
return nil
|
||||
case msg, ok := <-ch:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := stream.Send(&pb.Message{
|
||||
Identifier: &pb.Identifier{
|
||||
Provider: msg.Identifier.Provider,
|
||||
Subject: msg.Identifier.Subject,
|
||||
},
|
||||
Payload: fmt.Sprintf("%s", msg.Payload),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type SocketStreamingServer struct {
|
||||
manager *manager.Manager
|
||||
}
|
||||
|
||||
func NewSocketStreamingServer(m *manager.Manager) *SocketStreamingServer {
|
||||
return &SocketStreamingServer{
|
||||
manager: m,
|
||||
}
|
||||
}
|
||||
|
||||
// Serve accepts a listener (TCP or Unix) and begins handling incoming connections.
|
||||
func (s *SocketStreamingServer) Serve(lis net.Listener) error {
|
||||
for {
|
||||
conn, err := lis.Accept()
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to accept connection: %v\n", err)
|
||||
continue
|
||||
}
|
||||
go s.handleConnection(conn)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SocketStreamingServer) handleConnection(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
reader := bufio.NewReader(conn)
|
||||
|
||||
raw, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to read stream UUID: %v\n", err)
|
||||
return
|
||||
}
|
||||
streamUUIDStr := strings.TrimSpace(raw)
|
||||
streamUUID, err := uuid.Parse(streamUUIDStr)
|
||||
if err != nil {
|
||||
fmt.Fprintf(conn, "Invalid stream UUID\n")
|
||||
return
|
||||
}
|
||||
|
||||
outCh, err := s.manager.ConnectStream(streamUUID)
|
||||
if err != nil {
|
||||
fmt.Fprintf(conn, "Failed to connect to stream: %v\n", err)
|
||||
return
|
||||
}
|
||||
defer s.manager.DisconnectStream(streamUUID)
|
||||
|
||||
for msg := range outCh {
|
||||
payload := struct {
|
||||
Provider string `json:"provider"`
|
||||
Subject string `json:"subject"`
|
||||
Data string `json:"data"`
|
||||
}{
|
||||
Provider: msg.Identifier.Provider,
|
||||
Subject: msg.Identifier.Subject,
|
||||
Data: fmt.Sprintf("%s", msg.Payload),
|
||||
}
|
||||
|
||||
bytes, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to encode message: %v\n", err)
|
||||
continue
|
||||
}
|
||||
_, err = conn.Write(append(bytes, '\n'))
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
fmt.Printf("Write error: %v\n", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,5 +0,0 @@
|
||||
module gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata
|
||||
|
||||
go 1.24.2
|
||||
|
||||
require github.com/google/uuid v1.6.0 // indirect
|
||||
@@ -1,2 +0,0 @@
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
@@ -1,187 +0,0 @@
|
||||
package manager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/identifier"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/provider"
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/router"
|
||||
)
|
||||
|
||||
type Manager struct {
|
||||
Router *router.Router
|
||||
incoming chan router.Message // Aggregated incoming messages from providers, for the Router
|
||||
|
||||
Providers map[string]provider.Provider
|
||||
providerStreams map[identifier.Identifier]chan router.Message // Channels for the streams that the providers are running
|
||||
|
||||
subscribers map[identifier.Identifier][]chan router.Message // Map of identifiers to subscriber channels, one to many mapping
|
||||
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewManager constructs a Manager and starts its Router loop.
|
||||
func NewManager() *Manager {
|
||||
incoming := make(chan router.Message, 128)
|
||||
r := router.NewRouter(incoming)
|
||||
|
||||
m := &Manager{
|
||||
Router: r,
|
||||
incoming: incoming,
|
||||
Providers: make(map[string]provider.Provider),
|
||||
providerStreams: make(map[identifier.Identifier]chan router.Message),
|
||||
subscribers: make(map[identifier.Identifier][]chan router.Message),
|
||||
}
|
||||
go r.Run()
|
||||
return m
|
||||
}
|
||||
|
||||
// AddProvider registers and starts a new Provider under the given name.
|
||||
func (m *Manager) AddProvider(name string, p provider.Provider) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
if _, exists := m.Providers[name]; exists {
|
||||
return fmt.Errorf("provider %q already exists", name)
|
||||
}
|
||||
if err := p.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start provider %q: %w", name, err)
|
||||
}
|
||||
m.Providers[name] = p
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveProvider stops and unregisters a Provider, tearing down all its streams.
|
||||
func (m *Manager) RemoveProvider(name string) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
p, exists := m.Providers[name]
|
||||
if !exists {
|
||||
return fmt.Errorf("provider %q not found", name)
|
||||
}
|
||||
if err := p.Stop(); err != nil {
|
||||
return fmt.Errorf("failed to stop provider %q: %w", name, err)
|
||||
}
|
||||
|
||||
// tear down every active stream for this provider
|
||||
for id, streamCh := range m.providerStreams {
|
||||
if id.Provider != name {
|
||||
continue
|
||||
}
|
||||
// stop the provider's internal stream
|
||||
p.CancelStream(id.Subject)
|
||||
close(streamCh)
|
||||
delete(m.providerStreams, id)
|
||||
|
||||
// deregister & close all subscriber channels
|
||||
for _, subCh := range m.subscribers[id] {
|
||||
m.Router.Deregister(id, subCh)
|
||||
close(subCh)
|
||||
}
|
||||
delete(m.subscribers, id)
|
||||
}
|
||||
|
||||
delete(m.Providers, name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stream establishes a single gRPC‐stream channel that multiplexes
|
||||
// live updates for all requested identifiers.
|
||||
// When ctx is canceled, it will deregister and—if a stream has no more
|
||||
// subscribers—call CancelStream on the provider.
|
||||
func (m *Manager) Stream(
|
||||
ctx context.Context,
|
||||
reqs []identifier.Identifier,
|
||||
) (<-chan router.Message, error) {
|
||||
m.mu.Lock()
|
||||
// 1) Validate and ensure each provider/subject is streaming
|
||||
for _, id := range reqs {
|
||||
p, ok := m.Providers[id.Provider]
|
||||
if !ok {
|
||||
m.mu.Unlock()
|
||||
return nil, fmt.Errorf("provider %q not found", id.Provider)
|
||||
}
|
||||
if !p.IsValidSubject(id.Subject, false) {
|
||||
m.mu.Unlock()
|
||||
return nil, fmt.Errorf("invalid subject %q for provider %q", id.Subject, id.Provider)
|
||||
}
|
||||
// start the provider stream if not already running
|
||||
if _, exists := m.providerStreams[id]; !exists {
|
||||
ch := make(chan router.Message, 64)
|
||||
if err := p.RequestStream(id.Subject, ch); err != nil {
|
||||
m.mu.Unlock()
|
||||
return nil, fmt.Errorf("could not request stream for %v: %w", id, err)
|
||||
}
|
||||
m.providerStreams[id] = ch
|
||||
// pump into the central incoming channel
|
||||
go func(c chan router.Message) {
|
||||
for msg := range c {
|
||||
m.incoming <- msg
|
||||
}
|
||||
}(ch)
|
||||
}
|
||||
}
|
||||
|
||||
// 2) Create one channel for this RPC and register it for every ID
|
||||
subCh := make(chan router.Message, 128)
|
||||
for _, id := range reqs {
|
||||
m.subscribers[id] = append(m.subscribers[id], subCh)
|
||||
m.Router.Register(id, subCh)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
|
||||
// 3) Teardown logic when context is done
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
for _, id := range reqs {
|
||||
// deregister this subscriber channel
|
||||
m.Router.Deregister(id, subCh)
|
||||
|
||||
// remove it from the list
|
||||
subs := m.subscribers[id]
|
||||
for i, ch := range subs {
|
||||
if ch == subCh {
|
||||
subs = append(subs[:i], subs[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if len(subs) == 0 {
|
||||
// no more listeners: cancel provider stream
|
||||
delete(m.subscribers, id)
|
||||
if streamCh, ok := m.providerStreams[id]; ok {
|
||||
m.Providers[id.Provider].CancelStream(id.Subject)
|
||||
close(streamCh)
|
||||
delete(m.providerStreams, id)
|
||||
}
|
||||
} else {
|
||||
m.subscribers[id] = subs
|
||||
}
|
||||
}
|
||||
|
||||
close(subCh)
|
||||
}()
|
||||
|
||||
return subCh, nil
|
||||
}
|
||||
|
||||
// Fetch performs a single request/response fetch against the named provider.
|
||||
func (m *Manager) Fetch(providerName, subject string) (router.Message, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
p, ok := m.Providers[providerName]
|
||||
if !ok {
|
||||
return router.Message{}, fmt.Errorf("provider %q not found", providerName)
|
||||
}
|
||||
if !p.IsValidSubject(subject, true) {
|
||||
return router.Message{}, fmt.Errorf("invalid subject %q for provider %q", subject, providerName)
|
||||
}
|
||||
return p.Fetch(subject)
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
package binance
|
||||
@@ -1 +0,0 @@
|
||||
package binance
|
||||
@@ -1 +0,0 @@
|
||||
package binance
|
||||
@@ -1,59 +0,0 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/identifier"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
Identifier identifier.Identifier
|
||||
Payload any
|
||||
}
|
||||
|
||||
type Router struct {
|
||||
incoming <-chan Message
|
||||
routes map[identifier.Identifier][]chan<- Message
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewRouter(incoming <-chan Message) *Router {
|
||||
return &Router{
|
||||
incoming: incoming,
|
||||
routes: make(map[identifier.Identifier][]chan<- Message),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Router) Run() {
|
||||
for msg := range r.incoming {
|
||||
r.mu.RLock()
|
||||
chans := r.routes[msg.Identifier]
|
||||
for _, ch := range chans {
|
||||
ch <- msg
|
||||
}
|
||||
r.mu.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Router) Register(id identifier.Identifier, ch chan<- Message) {
|
||||
r.mu.Lock()
|
||||
r.routes[id] = append(r.routes[id], ch)
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *Router) Deregister(id identifier.Identifier, ch chan<- Message) {
|
||||
r.mu.Lock()
|
||||
slice := r.routes[id]
|
||||
for i := 0; i < len(slice); i++ {
|
||||
if slice[i] == ch {
|
||||
slice[i] = slice[len(slice)-1]
|
||||
slice = slice[:len(slice)-1]
|
||||
i--
|
||||
}
|
||||
}
|
||||
if len(slice) == 0 {
|
||||
delete(r.routes, id)
|
||||
} else {
|
||||
r.routes[id] = slice
|
||||
}
|
||||
r.mu.Unlock()
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
package grpc
|
||||
@@ -1,3 +0,0 @@
|
||||
module gitlab.michelsen.id/phillmichelsen/tessera/services/orchestrator
|
||||
|
||||
go 1.24.2
|
||||
@@ -1 +0,0 @@
|
||||
package cmd
|
||||
Reference in New Issue
Block a user