From 6ebc541de055d6da2e99bc53b9f30dc3752df7d9 Mon Sep 17 00:00:00 2001 From: Phillip Michelsen Date: Tue, 9 Sep 2025 00:08:57 +0000 Subject: [PATCH] Refactor FuturesWebsocket: implement batch subscription handling, enhance connection management, and improve logging --- go.mod | 3 +- go.sum | 4 + .../data_service/cmd/data_service/main.go | 91 +- services/data_service/cmd/stream_tap/main.go | 185 +++- .../data_service/internal/domain/message.go | 8 - .../data_service/internal/manager/helper.go | 25 - .../data_service/internal/manager/manager.go | 233 +++-- .../data_service/internal/manager/session.go | 19 +- .../data_service/internal/manager/types.go | 17 +- .../provider/binance/futures_websocket.go | 885 +++++++----------- .../internal/provider/provider.go | 4 +- .../data_service/internal/router/router.go | 9 +- .../internal/server/grpc_streaming_server.go | 1 - 13 files changed, 700 insertions(+), 784 deletions(-) diff --git a/go.mod b/go.mod index 582989b..f5b2117 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,13 @@ go 1.25.1 require ( github.com/google/uuid v1.6.0 - github.com/gorilla/websocket v1.5.3 google.golang.org/grpc v1.75.0 google.golang.org/protobuf v1.36.8 ) require ( + github.com/coder/websocket v1.8.14 // indirect + github.com/lmittmann/tint v1.1.2 // indirect golang.org/x/net v0.43.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/text v0.28.0 // indirect diff --git a/go.sum b/go.sum index 2e5d3fd..ef17517 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g= +github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -10,6 +12,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/lmittmann/tint v1.1.2 h1:2CQzrL6rslrsyjqLDwD11bZ5OpLBPU+g3G/r5LSfS8w= +github.com/lmittmann/tint v1.1.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg= diff --git a/services/data_service/cmd/data_service/main.go b/services/data_service/cmd/data_service/main.go index 3c13160..5f3ddf3 100644 --- a/services/data_service/cmd/data_service/main.go +++ b/services/data_service/cmd/data_service/main.go @@ -1,10 +1,12 @@ package main import ( - "fmt" - "log" + "log/slog" "net" + "os" + "time" + "github.com/lmittmann/tint" pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider/binance" @@ -14,26 +16,67 @@ import ( "google.golang.org/grpc/reflection" ) +func initLogger() *slog.Logger { + level := parseLevel(env("LOG_LEVEL", "debug")) + if env("LOG_FORMAT", "pretty") == "json" { + return 
slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: level, + })) + } + return slog.New(tint.NewHandler(os.Stdout, &tint.Options{ + Level: level, + TimeFormat: time.RFC3339Nano, + NoColor: os.Getenv("NO_COLOR") != "", + })) +} + +func parseLevel(s string) slog.Level { + switch s { + case "debug": + return slog.LevelDebug + case "warn": + return slog.LevelWarn + case "error": + return slog.LevelError + default: + return slog.LevelInfo + } +} + +func env(k, def string) string { + if v := os.Getenv(k); v != "" { + return v + } + return def +} + func main() { - fmt.Println("Starting Data Service...") + slog.SetDefault(initLogger()) + slog.Info("starting", "svc", "data-service") + // Setup r := router.NewRouter(2048) m := manager.NewManager(r) - binanceFutures := binance.NewFuturesWebsocket() - _ = m.AddProvider("binance_futures_websocket", binanceFutures) + binanceFutures := binance.NewFuturesWebsocket(r.IncomingChannel()) + if err := m.AddProvider("binance_futures_websocket", binanceFutures); err != nil { + slog.Error("add provider failed", "err", err) + os.Exit(1) + } // gRPC Control Server grpcControlServer := grpc.NewServer() go func() { pb.RegisterDataServiceControlServer(grpcControlServer, server.NewGRPCControlServer(m)) reflection.Register(grpcControlServer) - grpcLis, err := net.Listen("tcp", ":50051") + lis, err := net.Listen("tcp", ":50051") if err != nil { - log.Fatalf("Failed to listen for gRPC control: %v", err) + slog.Error("listen failed", "cmp", "grpc-control", "addr", ":50051", "err", err) + os.Exit(1) } - log.Println("gRPC control server listening on :50051") - if err := grpcControlServer.Serve(grpcLis); err != nil { - log.Fatalf("Failed to serve gRPC control: %v", err) + slog.Info("listening", "cmp", "grpc-control", "addr", ":50051") + if err := grpcControlServer.Serve(lis); err != nil { + slog.Error("serve failed", "cmp", "grpc-control", "err", err) + os.Exit(1) } }() @@ -42,31 +85,17 @@ func main() { go func() { pb.RegisterDataServiceStreamingServer(grpcStreamingServer, server.NewGRPCStreamingServer(m)) reflection.Register(grpcStreamingServer) - grpcLis, err := net.Listen("tcp", ":50052") + lis, err := net.Listen("tcp", ":50052") if err != nil { - log.Fatalf("Failed to listen for gRPC: %v", err) + slog.Error("listen failed", "cmp", "grpc-streaming", "addr", ":50052", "err", err) + os.Exit(1) } - log.Println("gRPC streaming server listening on :50052") - if err := grpcStreamingServer.Serve(grpcLis); err != nil { - log.Fatalf("Failed to serve gRPC: %v", err) + slog.Info("listening", "cmp", "grpc-streaming", "addr", ":50052") + if err := grpcStreamingServer.Serve(lis); err != nil { + slog.Error("serve failed", "cmp", "grpc-streaming", "err", err) + os.Exit(1) } }() - // Socket Streaming Server - /* - socketStreamingServer := server.NewSocketStreamingServer(m) - go func() { - socketLis, err := net.Listen("tcp", ":6000") - if err != nil { - log.Fatalf("Failed to listen for socket: %v", err) - } - log.Println("Socket server listening on :6000") - if err := socketStreamingServer.Serve(socketLis); err != nil { - log.Fatalf("Socket server error: %v", err) - } - }() - */ - - // Block main forever select {} } diff --git a/services/data_service/cmd/stream_tap/main.go b/services/data_service/cmd/stream_tap/main.go index d870359..e86f1ce 100644 --- a/services/data_service/cmd/stream_tap/main.go +++ b/services/data_service/cmd/stream_tap/main.go @@ -2,12 +2,13 @@ package main import ( "context" - "encoding/json" "flag" "fmt" + "math" "os" "os/signal" "strings" + 
"sync/atomic" "syscall" "time" @@ -47,21 +48,6 @@ func toIdentifierKey(input string) (string, error) { return "raw::" + strings.ToLower(prov) + "." + subj, nil } -func prettyOrRaw(b []byte, pretty bool) string { - if !pretty || len(b) == 0 { - return string(b) - } - var tmp any - if err := json.Unmarshal(b, &tmp); err != nil { - return string(b) - } - out, err := json.MarshalIndent(tmp, "", " ") - if err != nil { - return string(b) - } - return string(out) -} - func waitReady(ctx context.Context, conn *grpc.ClientConn) error { for { s := conn.GetState() @@ -77,18 +63,31 @@ func waitReady(ctx context.Context, conn *grpc.ClientConn) error { } } +type streamStats struct { + TotalMsgs int64 + TotalBytes int64 + TickMsgs int64 + TickBytes int64 +} + +type stats struct { + TotalMsgs int64 + TotalBytes int64 + ByStream map[string]*streamStats +} + func main() { var ids idsFlag var ctlAddr string var strAddr string - var pretty bool var timeout time.Duration + var refresh time.Duration flag.Var(&ids, "id", "identifier (provider:subject or canonical key); repeatable") flag.StringVar(&ctlAddr, "ctl", "127.0.0.1:50051", "gRPC control address") flag.StringVar(&strAddr, "str", "127.0.0.1:50052", "gRPC streaming address") - flag.BoolVar(&pretty, "pretty", true, "pretty-print JSON payloads when possible") flag.DurationVar(&timeout, "timeout", 10*time.Second, "start/config/connect timeout") + flag.DurationVar(&refresh, "refresh", 1*time.Second, "dashboard refresh interval") flag.Parse() if len(ids) == 0 { @@ -99,6 +98,7 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() + // Control channel ccCtl, err := grpc.NewClient( ctlAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -107,15 +107,7 @@ func main() { _, _ = fmt.Fprintf(os.Stderr, "new control client: %v\n", err) os.Exit(1) } - defer func(ccCtl *grpc.ClientConn) { - err := ccCtl.Close() - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "close control client: %v\n", err) - os.Exit(1) - } else { - fmt.Println("closed control client") - } - }(ccCtl) + defer ccCtl.Close() ccCtl.Connect() ctlConnCtx, cancelCtlConn := context.WithTimeout(ctx, timeout) @@ -128,17 +120,20 @@ func main() { ctl := pb.NewDataServiceControlClient(ccCtl) + // Start stream ctxStart, cancelStart := context.WithTimeout(ctx, timeout) startResp, err := ctl.StartStream(ctxStart, &pb.StartStreamRequest{}) cancelStart() if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "StartClientStream: %v\n", err) + _, _ = fmt.Fprintf(os.Stderr, "StartStream: %v\n", err) os.Exit(1) } streamUUID := startResp.GetStreamUuid() fmt.Printf("stream: %s\n", streamUUID) + // Configure identifiers var pbIDs []*pb.Identifier + orderedIDs := make([]string, 0, len(ids)) for _, s := range ids { key, err := toIdentifierKey(s) if err != nil { @@ -146,6 +141,7 @@ func main() { os.Exit(2) } pbIDs = append(pbIDs, &pb.Identifier{Key: key}) + orderedIDs = append(orderedIDs, key) // preserve CLI order } ctxCfg, cancelCfg := context.WithTimeout(ctx, timeout) @@ -155,11 +151,12 @@ func main() { }) cancelCfg() if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "ConfigureClientStream: %v\n", err) + _, _ = fmt.Fprintf(os.Stderr, "ConfigureStream: %v\n", err) os.Exit(1) } fmt.Printf("configured %d identifiers\n", len(pbIDs)) + // Streaming connection ccStr, err := grpc.NewClient( strAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -168,15 +165,7 @@ func main() { _, _ = fmt.Fprintf(os.Stderr, "new streaming client: %v\n", err) 
os.Exit(1) } - defer func(ccStr *grpc.ClientConn) { - err := ccStr.Close() - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "close streaming client: %v\n", err) - os.Exit(1) - } else { - fmt.Println("closed streaming client") - } - }(ccStr) + defer ccStr.Close() ccStr.Connect() strConnCtx, cancelStrConn := context.WithTimeout(ctx, timeout) @@ -192,34 +181,128 @@ func main() { streamCtx, streamCancel := context.WithCancel(ctx) defer streamCancel() - stream, err := str.ConnectStream(streamCtx, &pb.ConnectStreamRequest{StreamUuid: streamUUID}) + srv, err := str.ConnectStream(streamCtx, &pb.ConnectStreamRequest{StreamUuid: streamUUID}) if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "ConnectClientStream: %v\n", err) + _, _ = fmt.Fprintf(os.Stderr, "ConnectStream: %v\n", err) os.Exit(1) } fmt.Println("connected; streaming… (Ctrl-C to quit)") + // Receiver goroutine → channel + type msgWrap struct { + idKey string + size int + err error + } + msgCh := make(chan msgWrap, 1024) + go func() { + for { + m, err := srv.Recv() + if err != nil { + msgCh <- msgWrap{err: err} + close(msgCh) + return + } + id := m.GetIdentifier().GetKey() + msgCh <- msgWrap{idKey: id, size: len(m.GetPayload())} + } + }() + + // Stats and dashboard + st := &stats{ByStream: make(map[string]*streamStats)} + seen := make(map[string]bool, len(orderedIDs)) + for _, id := range orderedIDs { + seen[id] = true + } + tick := time.NewTicker(refresh) + defer tick.Stop() + + clear := func() { fmt.Print("\033[H\033[2J") } + header := func() { + fmt.Printf("stream: %s now: %s refresh: %s\n", + streamUUID, time.Now().Format(time.RFC3339), refresh) + fmt.Println("--------------------------------------------------------------------------------------") + fmt.Printf("%-56s %10s %14s %12s %16s\n", "identifier", "msgs/s", "bytes/s", "total", "total_bytes") + fmt.Println("--------------------------------------------------------------------------------------") + } + + printAndReset := func() { + clear() + header() + + var totMsgsPS, totBytesPS float64 + for _, id := range orderedIDs { + s, ok := st.ByStream[id] + var msgsPS, bytesPS float64 + var totMsgs, totBytes int64 + if ok { + // Convert window counts into per-second rates. 
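+				// SwapInt64 reads and zeroes the per-tick counters in one atomic step,
+				// so the next refresh window starts from zero without losing counts.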
+ msgsPS = float64(atomic.SwapInt64(&s.TickMsgs, 0)) / refresh.Seconds() + bytesPS = float64(atomic.SwapInt64(&s.TickBytes, 0)) / refresh.Seconds() + totMsgs = atomic.LoadInt64(&s.TotalMsgs) + totBytes = atomic.LoadInt64(&s.TotalBytes) + } + totMsgsPS += msgsPS + totBytesPS += bytesPS + fmt.Printf("%-56s %10d %14d %12d %16d\n", + id, + int64(math.Round(msgsPS)), + int64(math.Round(bytesPS)), + totMsgs, + totBytes, + ) + } + + fmt.Println("--------------------------------------------------------------------------------------") + fmt.Printf("%-56s %10d %14d %12d %16d\n", + "TOTAL", + int64(math.Round(totMsgsPS)), + int64(math.Round(totBytesPS)), + atomic.LoadInt64(&st.TotalMsgs), + atomic.LoadInt64(&st.TotalBytes), + ) + } + for { select { case <-ctx.Done(): fmt.Println("\nshutting down") return - default: - msg, err := stream.Recv() - if err != nil { + + case <-tick.C: + printAndReset() + + case mw, ok := <-msgCh: + if !ok { + return + } + if mw.err != nil { if ctx.Err() != nil { return } - _, _ = fmt.Fprintf(os.Stderr, "recv: %v\n", err) + _, _ = fmt.Fprintf(os.Stderr, "recv: %v\n", mw.err) os.Exit(1) } - id := msg.GetIdentifier() - fmt.Printf("[%s] bytes=%d enc=%s t=%s\n", - id.GetKey(), len(msg.GetPayload()), msg.GetEncoding(), - time.Now().Format(time.RFC3339Nano), - ) - fmt.Println(prettyOrRaw(msg.GetPayload(), pretty)) - fmt.Println("---") + + // Maintain stable order: append new identifiers at first sight. + if !seen[mw.idKey] { + seen[mw.idKey] = true + orderedIDs = append(orderedIDs, mw.idKey) + } + + // Account + atomic.AddInt64(&st.TotalMsgs, 1) + atomic.AddInt64(&st.TotalBytes, int64(mw.size)) + + ss := st.ByStream[mw.idKey] + if ss == nil { + ss = &streamStats{} + st.ByStream[mw.idKey] = ss + } + atomic.AddInt64(&ss.TotalMsgs, 1) + atomic.AddInt64(&ss.TotalBytes, int64(mw.size)) + atomic.AddInt64(&ss.TickMsgs, 1) + atomic.AddInt64(&ss.TickBytes, int64(mw.size)) } } } diff --git a/services/data_service/internal/domain/message.go b/services/data_service/internal/domain/message.go index c73dd04..516260d 100644 --- a/services/data_service/internal/domain/message.go +++ b/services/data_service/internal/domain/message.go @@ -1,14 +1,6 @@ package domain -type Encoding string - -const ( - EncodingJSON Encoding = "json" - EncodingProtobuf Encoding = "protobuf" -) - type Message struct { Identifier Identifier Payload []byte - Encoding Encoding } diff --git a/services/data_service/internal/manager/helper.go b/services/data_service/internal/manager/helper.go index e8957ee..7b16770 100644 --- a/services/data_service/internal/manager/helper.go +++ b/services/data_service/internal/manager/helper.go @@ -39,31 +39,6 @@ func identifierSetDifferences(old map[domain.Identifier]struct{}, next []domain. return } -// joinErrors aggregates multiple errors. -type joined struct{ es []error } - -func (j joined) Error() string { - switch n := len(j.es); { - case n == 0: - return "" - case n == 1: - return j.es[0].Error() - default: - s := j.es[0].Error() - for i := 1; i < n; i++ { - s += "; " + j.es[i].Error() - } - return s - } -} - -func join(es []error) error { - if len(es) == 0 { - return nil - } - return joined{es} -} - // resolveProvider parses a raw identifier and looks up the provider. 
func (m *Manager) resolveProvider(id domain.Identifier) (provider.Provider, string, error) { provName, subj, ok := id.ProviderSubject() diff --git a/services/data_service/internal/manager/manager.go b/services/data_service/internal/manager/manager.go index a6abea1..adc018d 100644 --- a/services/data_service/internal/manager/manager.go +++ b/services/data_service/internal/manager/manager.go @@ -1,7 +1,9 @@ package manager import ( + "errors" "fmt" + "log/slog" "time" "github.com/google/uuid" @@ -10,6 +12,8 @@ import ( "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router" ) +func lg() *slog.Logger { return slog.Default().With("cmp", "manager") } + // Manager is a single-goroutine actor that owns all state. type Manager struct { // Command channel @@ -24,8 +28,8 @@ type Manager struct { router *router.Router } -// New creates a manager and starts its run loop. -func New(r *router.Router) *Manager { +// NewManager creates a manager and starts its run loop. +func NewManager(r *router.Router) *Manager { m := &Manager{ cmdCh: make(chan any, 256), providers: make(map[string]provider.Provider), @@ -35,6 +39,9 @@ func New(r *router.Router) *Manager { } go r.Run() go m.run() + + lg().Info("manager started") + return m } @@ -42,6 +49,7 @@ func New(r *router.Router) *Manager { // AddProvider adds and starts a new provider. func (m *Manager) AddProvider(name string, p provider.Provider) error { + lg().Debug("add provider request", slog.String("name", name)) resp := make(chan error, 1) m.cmdCh <- addProviderCmd{name: name, p: p, resp: resp} return <-resp @@ -49,6 +57,7 @@ func (m *Manager) AddProvider(name string, p provider.Provider) error { // RemoveProvider stops and removes a provider, cleaning up all sessions. func (m *Manager) RemoveProvider(name string) error { + lg().Debug("remove provider request", slog.String("name", name)) resp := make(chan error, 1) m.cmdCh <- removeProviderCmd{name: name, resp: resp} return <-resp @@ -56,6 +65,7 @@ func (m *Manager) RemoveProvider(name string) error { // NewSession creates a new session with the given idle timeout. func (m *Manager) NewSession(idleAfter time.Duration) (uuid.UUID, error) { + lg().Debug("new session request", slog.Duration("idle_after", idleAfter)) resp := make(chan struct { id uuid.UUID err error @@ -67,6 +77,7 @@ func (m *Manager) NewSession(idleAfter time.Duration) (uuid.UUID, error) { // AttachClient attaches a client to a session, creates and returns client channels for the session. func (m *Manager) AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error) { + lg().Debug("attach client request", slog.String("session", id.String()), slog.Int("in_buf", inBuf), slog.Int("out_buf", outBuf)) resp := make(chan struct { cin chan<- domain.Message cout <-chan domain.Message @@ -79,6 +90,7 @@ func (m *Manager) AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.M // DetachClient detaches the client from the session, closes client channels and arms timeout. func (m *Manager) DetachClient(id uuid.UUID) error { + lg().Debug("detach client request", slog.String("session", id.String())) resp := make(chan error, 1) m.cmdCh <- detachCmd{sid: id, resp: resp} return <-resp @@ -86,6 +98,7 @@ func (m *Manager) DetachClient(id uuid.UUID) error { // ConfigureSession sets the next set of identifiers for the session, starting and stopping streams as needed. 
func (m *Manager) ConfigureSession(id uuid.UUID, next []domain.Identifier) error { + lg().Debug("configure session request", slog.String("session", id.String()), slog.Int("idents", len(next))) resp := make(chan error, 1) m.cmdCh <- configureCmd{sid: id, next: next, resp: resp} return <-resp @@ -93,6 +106,7 @@ func (m *Manager) ConfigureSession(id uuid.UUID, next []domain.Identifier) error // CloseSession closes and removes the session, cleaning up all bindings. func (m *Manager) CloseSession(id uuid.UUID) error { + lg().Debug("close session request", slog.String("session", id.String())) resp := make(chan error, 1) m.cmdCh <- closeSessionCmd{sid: id, resp: resp} return <-resp @@ -125,10 +139,12 @@ func (m *Manager) run() { func (m *Manager) handleAddProvider(cmd addProviderCmd) { if _, ok := m.providers[cmd.name]; ok { + lg().Warn("provider already exists", slog.String("name", cmd.name)) cmd.resp <- fmt.Errorf("provider exists: %s", cmd.name) return } if err := cmd.p.Start(); err != nil { + lg().Warn("failed to start provider", slog.String("name", cmd.name), slog.String("err", err.Error())) cmd.resp <- fmt.Errorf("start provider %s: %w", cmd.name, err) return } @@ -139,6 +155,7 @@ func (m *Manager) handleAddProvider(cmd addProviderCmd) { func (m *Manager) handleRemoveProvider(cmd removeProviderCmd) { p, ok := m.providers[cmd.name] if !ok { + lg().Warn("provider not found", slog.String("name", cmd.name)) cmd.resp <- fmt.Errorf("provider not found: %s", cmd.name) return } @@ -149,6 +166,7 @@ func (m *Manager) handleRemoveProvider(cmd removeProviderCmd) { provName, subj, ok := ident.ProviderSubject() if !ok || provName != cmd.name { // TODO: add log warning, but basically should never ever happen + lg().Warn("identifier with mismatched provider found in session during provider removal", slog.String("session", s.id.String()), slog.String("ident", ident.Key()), slog.String("expected_provider", cmd.name), slog.String("found_provider", provName)) continue } if s.attached && s.clientOut != nil { @@ -158,19 +176,19 @@ func (m *Manager) handleRemoveProvider(cmd removeProviderCmd) { // decrementStreamRefCount returns true if this was the last ref. In which case we want to stop the stream. if ident.IsRaw() && m.decrementStreamRefCount(ident) && subj != "" { - _ = p.StopStream(subj) // best-effort as we will remove the provider anyway + _ = p.StopStreams([]string{subj}) // best-effort as we will remove the provider anyway } } } - // first iteration above is sound, but as a precaution we also clean up any dangling streamRef entries here + // Defensive sweep: log and clear any dangling streamRef entries for this provider. 
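+	// Anything surviving the per-session cleanup above indicates a refcount bookkeeping bug;
+	// such entries are logged and dropped so the map cannot pin state for a removed provider.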
for id := range m.streamRef { provName, _, ok := id.ProviderSubject() if !ok || provName != cmd.name { continue } - fmt.Printf("manager: warning — dangling streamRef for %s after removing provider %s\n", id.Key(), cmd.name) delete(m.streamRef, id) + lg().Warn("dangling streamRef entry found during provider removal", slog.String("ident", id.Key()), slog.String("provider", cmd.name)) } p.Stop() @@ -196,6 +214,8 @@ func (m *Manager) handleNewSession(cmd newSessionCmd) { id uuid.UUID err error }{id: s.id, err: nil} + + lg().Info("new session created", slog.String("session", s.id.String()), slog.Duration("idle_after", cmd.idleAfter)) } func (m *Manager) handleAttach(cmd attachCmd) { @@ -232,6 +252,8 @@ func (m *Manager) handleAttach(cmd attachCmd) { cout <-chan domain.Message err error }{cin, cout, err} + + lg().Info("client attached to session", slog.String("session", s.id.String())) } func (m *Manager) handleDetach(cmd detachCmd) { @@ -252,126 +274,163 @@ func (m *Manager) handleDetach(cmd detachCmd) { _ = m.detachSession(cmd.sid, s) cmd.resp <- nil + + lg().Info("client detached from session", slog.String("session", s.id.String())) } -func (m *Manager) handleConfigure(c configureCmd) { - s, ok := m.sessions[c.sid] +func (m *Manager) handleConfigure(cmd configureCmd) { + s, ok := m.sessions[cmd.sid] if !ok { - c.resp <- ErrSessionNotFound + cmd.resp <- ErrSessionNotFound return } if s.closed { - c.resp <- ErrSessionClosed + cmd.resp <- ErrSessionClosed return } old := copySet(s.bound) - toAdd, toDel := identifierSetDifferences(old, c.next) + toAdd, toDel := identifierSetDifferences(old, cmd.next) - // 1) Handle removals first. + var aggErrs error + + // 1) Build batches: provider → starts(starters) and stops(subjects) + type starter struct { + id domain.Identifier + subj string + } + startsByProv := make(map[provider.Provider][]starter) + stopsByProv := make(map[provider.Provider][]string) + + // Removals for _, ident := range toDel { if s.attached && s.clientOut != nil { m.router.DeregisterRoute(ident, s.clientOut) } delete(s.bound, ident) - if ident.IsRaw() { - if m.decrementStreamRefCount(ident) { - if p, subj, err := m.resolveProvider(ident); err == nil { - _ = p.StopStream(subj) // fire-and-forget - } - } - } - } - - // 2) Handle additions. Collect starts to await. - type startItem struct { - id domain.Identifier - ch <-chan error - } - var starts []startItem - var initErrs []error - - for _, ident := range toAdd { - // Bind intent now. 
- s.bound[ident] = struct{}{} - if !ident.IsRaw() { - if s.attached && s.clientOut != nil { - m.router.RegisterRoute(ident, s.clientOut) - } continue } p, subj, err := m.resolveProvider(ident) if err != nil { - delete(s.bound, ident) - initErrs = append(initErrs, err) + aggErrs = errors.Join(aggErrs, fmt.Errorf("stop %s: %w", ident.Key(), err)) + continue + } + if subj == "" { + continue + } + + if m.decrementStreamRefCount(ident) { // only when last ref + stopsByProv[p] = append(stopsByProv[p], subj) + } + } + + // Additions + for _, ident := range toAdd { + if !ident.IsRaw() { + if s.attached && s.clientOut != nil { + m.router.RegisterRoute(ident, s.clientOut) + } + s.bound[ident] = struct{}{} + continue + } + + p, subj, err := m.resolveProvider(ident) + if err != nil { + aggErrs = errors.Join(aggErrs, err) continue } if !p.IsValidSubject(subj, false) { - delete(s.bound, ident) - initErrs = append(initErrs, fmt.Errorf("invalid subject %q for provider", subj)) + aggErrs = errors.Join(aggErrs, fmt.Errorf("invalid subject %q", subj)) continue } - first := m.incrementStreamRefCount(ident) - - if first || !p.IsStreamActive(subj) { - ch := p.StartStream(subj, m.router.IncomingChannel()) - starts = append(starts, startItem{id: ident, ch: ch}) - } else if s.attached && s.clientOut != nil { - // Already active, just register for this session. - m.router.RegisterRoute(ident, s.clientOut) - } - } - - // 3) Wait for starts initiated by this call, each with its own timeout. - if len(starts) == 0 { - c.resp <- join(initErrs) - return - } - - type result struct { - id domain.Identifier - err error - } - done := make(chan result, len(starts)) - - for _, si := range starts { - // Per-start waiter. - go func(id domain.Identifier, ch <-chan error) { - select { - case err := <-ch: - done <- result{id: id, err: err} - case <-time.After(statusWaitTotal): - done <- result{id: id, err: fmt.Errorf("timeout")} + if m.incrementStreamRefCount(ident) { // first ref → start later + startsByProv[p] = append(startsByProv[p], starter{id: ident, subj: subj}) + } else { + // already active → bind+route now + if s.attached && s.clientOut != nil { + m.router.RegisterRoute(ident, s.clientOut) } - }(si.id, si.ch) + s.bound[ident] = struct{}{} + } } - // Collect results and apply. - for i := 0; i < len(starts); i++ { - r := <-done - if r.err != nil { - // Roll back this session's bind and drop ref. - delete(s.bound, r.id) - _ = m.decrementStreamRefCount(r.id) - initErrs = append(initErrs, fmt.Errorf("start %v: %w", r.id, r.err)) - continue + // 2) Fire provider calls + type batchRes struct { + prov provider.Provider + err error + op string // "start"/"stop" + } + done := make(chan batchRes, len(startsByProv)+len(stopsByProv)) + + // Start batches + for p, items := range startsByProv { + subjs := make([]string, 0, len(items)) + for _, it := range items { + subjs = append(subjs, it.subj) } - // Success: register for any attached sessions that are bound. 
- for _, sess := range m.sessions { - if !sess.attached || sess.clientOut == nil { + ack := p.StartStreams(subjs) + go func(p provider.Provider, ack <-chan error) { + var err error + select { + case err = <-ack: + case <-time.After(statusWaitTotal): + err = fmt.Errorf("timeout") + } + done <- batchRes{prov: p, err: err, op: "start"} + }(p, ack) + } + + // Stop batches + for p, subjs := range stopsByProv { + ack := p.StopStreams(subjs) + go func(p provider.Provider, ack <-chan error) { + var err error + select { + case err = <-ack: + case <-time.After(statusWaitTotal): + err = fmt.Errorf("timeout") + } + done <- batchRes{prov: p, err: err, op: "stop"} + }(p, ack) + } + + // 3) Collect results + for i := 0; i < len(startsByProv)+len(stopsByProv); i++ { + r := <-done + switch r.op { + case "start": + items := startsByProv[r.prov] + if r.err != nil { + // Roll back refcounts for each ident in this provider batch + for _, it := range items { + _ = m.decrementStreamRefCount(it.id) + aggErrs = errors.Join(aggErrs, fmt.Errorf("start %s: %w", it.id.Key(), r.err)) + } continue } - if _, bound := sess.bound[r.id]; bound { - m.router.RegisterRoute(r.id, sess.clientOut) + // Success → bind and route + for _, it := range items { + if s.attached && s.clientOut != nil { + m.router.RegisterRoute(it.id, s.clientOut) + } + s.bound[it.id] = struct{}{} + } + case "stop": + if r.err != nil { + for _, subj := range stopsByProv[r.prov] { + aggErrs = errors.Join(aggErrs, fmt.Errorf("stop %s/%s: %w", "raw", subj, r.err)) + } } } } - c.resp <- join(initErrs) + cmd.resp <- aggErrs + + lg().Info("session configured", slog.String("session", s.id.String()), slog.Int("bound", len(s.bound)), slog.Int("to_add", len(toAdd)), slog.Int("to_del", len(toDel))) } func (m *Manager) handleCloseSession(c closeSessionCmd) { @@ -382,4 +441,6 @@ func (m *Manager) handleCloseSession(c closeSessionCmd) { } m.closeSession(c.sid, s) c.resp <- nil + + lg().Info("session closed", slog.String("session", s.id.String())) } diff --git a/services/data_service/internal/manager/session.go b/services/data_service/internal/manager/session.go index ebddeb1..9bf4a7b 100644 --- a/services/data_service/internal/manager/session.go +++ b/services/data_service/internal/manager/session.go @@ -7,6 +7,21 @@ import ( "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" ) +// Session holds per-session state. Owned by the manager loop. +type session struct { + id uuid.UUID + + clientIn chan domain.Message // caller writes + clientOut chan domain.Message // caller reads + + bound map[domain.Identifier]struct{} + + closed bool + attached bool + idleAfter time.Duration + idleTimer *time.Timer +} + // attachSession wires channels, stops idle timer, and registers ready routes. // Precondition: session exists and is not attached/closed. Runs in loop. 
func (m *Manager) attachSession(s *session, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error) { @@ -32,7 +47,7 @@ func (m *Manager) attachSession(s *session, inBuf, outBuf int) (chan<- domain.Me select { case dst <- msg: default: - // drop + lg().Warn("drop message on clientIn backpressure", "identifier", msg.Identifier.Key()) } } }(cin, m.router.IncomingChannel()) @@ -105,7 +120,7 @@ func (m *Manager) closeSession(sid uuid.UUID, s *session) { } if last := m.decrementStreamRefCount(ident); last { if p, subj, err := m.resolveProvider(ident); err == nil { - _ = p.StopStream(subj) // do not wait + _ = p.StopStreams([]string{subj}) // do not wait } } } diff --git a/services/data_service/internal/manager/types.go b/services/data_service/internal/manager/types.go index 4b01bef..89c548e 100644 --- a/services/data_service/internal/manager/types.go +++ b/services/data_service/internal/manager/types.go @@ -11,7 +11,7 @@ import ( // Shared constants. const ( defaultClientBuf = 256 - statusWaitTotal = 8 * time.Second + statusWaitTotal = 10 * time.Second ) // Manager-level errors. @@ -24,21 +24,6 @@ var ( ErrUnknownProvider = errorf("unknown provider") ) -// Session holds per-session state. Owned by the manager loop. -type session struct { - id uuid.UUID - - clientIn chan domain.Message // caller writes - clientOut chan domain.Message // caller reads - - bound map[domain.Identifier]struct{} - - closed bool - attached bool - idleAfter time.Duration - idleTimer *time.Timer -} - // Commands posted into the manager loop. One struct per action. type addProviderCmd struct { name string diff --git a/services/data_service/internal/provider/binance/futures_websocket.go b/services/data_service/internal/provider/binance/futures_websocket.go index 329350f..7129f1e 100644 --- a/services/data_service/internal/provider/binance/futures_websocket.go +++ b/services/data_service/internal/provider/binance/futures_websocket.go @@ -1,643 +1,410 @@ package binance import ( + "context" "encoding/json" "errors" - "fmt" - "net/http" + "log/slog" "sync" "sync/atomic" "time" - "github.com/gorilla/websocket" + "github.com/coder/websocket" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" ) const ( - wsURL = "wss://fstream.binance.com/stream" - writeRatePerSecond = 8 // hard cap per second - writeBurst = 8 // token bucket burst - writeWait = 5 * time.Second // per write deadline + endpoint = "wss://stream.binance.com:9443/stream" + cmpName = "binance_futures_websocket" - batchPeriod = 1 * time.Second // batch SUB/UNSUB every second - - reconnectMin = 500 * time.Millisecond - reconnectMax = 10 * time.Second + // I/O limits + readLimitBytes = 8 << 20 + writeTimeout = 5 * time.Second + dialTimeout = 10 * time.Second + reconnectMaxBackoff = 30 * time.Second ) -// internal stream states (provider stays simple; manager relies on IsStreamActive) -type streamState uint8 +type wsReq struct { + Method string `json:"method"` + Params []string `json:"params,omitempty"` + ID uint64 `json:"id"` +} -const ( - stateUnknown streamState = iota - statePendingSub - stateActive - statePendingUnsub - stateInactive - stateError -) +type wsAck struct { + Result any `json:"result"` + ID uint64 `json:"id"` +} + +type combinedEvent struct { + Stream string `json:"stream"` + Data json.RawMessage `json:"data"` +} type FuturesWebsocket struct { - dial websocket.Dialer - hdr http.Header + out chan<- domain.Message - // desired subscriptions and sinks - mu sync.Mutex - desired map[string]bool // subject -> 
want subscribed - sinks map[string]chan<- domain.Message // subject -> destination - states map[string]streamState // subject -> state + mu sync.RWMutex + active map[string]bool - // waiters per subject - startWaiters map[string][]chan error - stopWaiters map[string][]chan error + connMu sync.Mutex + conn *websocket.Conn + cancel context.CancelFunc - // batching queues - subQ chan string - unsubQ chan string + reqID atomic.Uint64 + pending map[uint64]chan error + pmu sync.Mutex - // websocket - writeMu sync.Mutex - conn *websocket.Conn - - // rate limit tokens - tokensCh chan struct{} - stopRate chan struct{} - - // lifecycle + // pumps + writer chan []byte + once sync.Once stopCh chan struct{} - wg sync.WaitGroup - - // ack tracking - ackMu sync.Mutex - idSeq uint64 - pendingA map[int64]ackBatch } -type ackBatch struct { - method string // "SUBSCRIBE" or "UNSUBSCRIBE" - subjects []string -} - -func NewFuturesWebsocket() *FuturesWebsocket { +func NewFuturesWebsocket(out chan<- domain.Message) *FuturesWebsocket { return &FuturesWebsocket{ - desired: make(map[string]bool), - sinks: make(map[string]chan<- domain.Message), - states: make(map[string]streamState), - startWaiters: make(map[string][]chan error), - stopWaiters: make(map[string][]chan error), - subQ: make(chan string, 4096), - unsubQ: make(chan string, 4096), - tokensCh: make(chan struct{}, writeBurst), - stopRate: make(chan struct{}), - stopCh: make(chan struct{}), - pendingA: make(map[int64]ackBatch), + out: out, + active: make(map[string]bool), + pending: make(map[uint64]chan error), + writer: make(chan []byte, 256), + stopCh: make(chan struct{}), } } -/* provider.Provider */ +func (p *FuturesWebsocket) Start() error { + var startErr error + p.once.Do(func() { + go p.run() + }) + return startErr +} -func (b *FuturesWebsocket) Start() error { - // token bucket - b.wg.Add(1) +func (p *FuturesWebsocket) Stop() { + close(p.stopCh) + p.connMu.Lock() + if p.cancel != nil { + p.cancel() + } + if p.conn != nil { + _ = p.conn.Close(websocket.StatusNormalClosure, "shutdown") + p.conn = nil + } + p.connMu.Unlock() + + // fail pending waiters + p.pmu.Lock() + for id, ch := range p.pending { + ch <- errors.New("provider stopped") + close(ch) + delete(p.pending, id) + } + p.pmu.Unlock() + + slog.Default().Info("stopped", "cmp", cmpName) +} + +func (p *FuturesWebsocket) StartStreams(keys []string) <-chan error { + ch := make(chan error, 1) go func() { - defer b.wg.Done() - t := time.NewTicker(time.Second / writeRatePerSecond) - defer t.Stop() - // prime burst - for i := 0; i < writeBurst; i++ { - select { - case b.tokensCh <- struct{}{}: - default: - } + defer close(ch) + if len(keys) == 0 { + ch <- nil + return } - for { - select { - case <-b.stopRate: - return - case <-t.C: - select { - case b.tokensCh <- struct{}{}: - default: + id, ack := p.sendReq("SUBSCRIBE", keys) + if ack == nil { + ch <- errors.New("not connected") + slog.Default().Error("subscribe failed; not connected", "cmp", cmpName, "keys", keys) + return + } + if err := <-ack; err != nil { + ch <- err + slog.Default().Error("subscribe NACK", "cmp", cmpName, "id", id, "keys", keys, "err", err) + return + } + p.mu.Lock() + for _, k := range keys { + p.active[k] = true + } + p.mu.Unlock() + slog.Default().Info("subscribed", "cmp", cmpName, "id", id, "keys", keys) + ch <- nil + }() + return ch +} + +func (p *FuturesWebsocket) StopStreams(keys []string) <-chan error { + ch := make(chan error, 1) + go func() { + defer close(ch) + if len(keys) == 0 { + ch <- nil + return + } + id, ack 
:= p.sendReq("UNSUBSCRIBE", keys) + if ack == nil { + ch <- errors.New("not connected") + slog.Default().Error("unsubscribe failed; not connected", "cmp", cmpName, "keys", keys) + return + } + if err := <-ack; err != nil { + ch <- err + slog.Default().Error("unsubscribe NACK", "cmp", cmpName, "id", id, "keys", keys, "err", err) + return + } + p.mu.Lock() + for _, k := range keys { + delete(p.active, k) + } + p.mu.Unlock() + slog.Default().Info("unsubscribed", "cmp", cmpName, "id", id, "keys", keys) + ch <- nil + }() + return ch +} + +func (p *FuturesWebsocket) Fetch(key string) (domain.Message, error) { + return domain.Message{}, errors.New("not implemented") +} + +func (p *FuturesWebsocket) IsStreamActive(key string) bool { + p.mu.RLock() + defer p.mu.RUnlock() + return p.active[key] +} + +func (p *FuturesWebsocket) IsValidSubject(key string, _ bool) bool { + return len(key) > 0 +} + +// internal + +func (p *FuturesWebsocket) run() { + backoff := time.Second + + for { + // stop? + select { + case <-p.stopCh: + return + default: + } + + if err := p.connect(); err != nil { + slog.Default().Error("dial failed", "cmp", cmpName, "err", err) + time.Sleep(backoff) + if backoff < reconnectMaxBackoff { + backoff *= 2 + } + continue + } + backoff = time.Second + + // resubscribe existing keys + func() { + p.mu.RLock() + if len(p.active) > 0 { + keys := make([]string, 0, len(p.active)) + for k := range p.active { + keys = append(keys, k) + } + _, ack := p.sendReq("SUBSCRIBE", keys) + if ack != nil { + if err := <-ack; err != nil { + slog.Default().Warn("resubscribe error", "cmp", cmpName, "err", err) + } else { + slog.Default().Info("resubscribed", "cmp", cmpName, "count", len(keys)) + } } } + p.mu.RUnlock() + }() + + // run read and write pumps + ctx, cancel := context.WithCancel(context.Background()) + errc := make(chan error, 2) + go func() { errc <- p.readLoop(ctx) }() + go func() { errc <- p.writeLoop(ctx) }() + + // wait for failure or stop + var err error + select { + case <-p.stopCh: + cancel() + p.cleanupConn() + return + case err = <-errc: + cancel() } - }() - // connection manager - b.wg.Add(1) - go b.run() + // fail pendings on error + p.pmu.Lock() + for id, ch := range p.pending { + ch <- err + close(ch) + delete(p.pending, id) + } + p.pmu.Unlock() - // batcher - b.wg.Add(1) - go b.batcher() + slog.Default().Error("ws loop error; reconnecting", "cmp", cmpName, "err", err) + p.cleanupConn() + } +} +func (p *FuturesWebsocket) connect() error { + p.connMu.Lock() + defer p.connMu.Unlock() + if p.conn != nil { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + + c, _, err := websocket.Dial(ctx, endpoint, &websocket.DialOptions{ + CompressionMode: websocket.CompressionDisabled, + OnPingReceived: func(ctx context.Context, _ []byte) bool { + slog.Default().Info("ping received", "cmp", cmpName) + return true + }, + }) + if err != nil { + cancel() + return err + } + + c.SetReadLimit(8 << 20) + + p.conn = c + p.cancel = cancel + slog.Default().Info("connected", "cmp", cmpName, "endpoint", endpoint) return nil } -func (b *FuturesWebsocket) Stop() { - close(b.stopCh) - close(b.stopRate) - - b.writeMu.Lock() - if b.conn != nil { - _ = b.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "bye")) - _ = b.conn.Close() - b.conn = nil +func (p *FuturesWebsocket) cleanupConn() { + p.connMu.Lock() + defer p.connMu.Unlock() + if p.cancel != nil { + p.cancel() + p.cancel = nil } - b.writeMu.Unlock() - - b.wg.Wait() 
- - // resolve any remaining waiters with an error - b.mu.Lock() - defer b.mu.Unlock() - for subj, ws := range b.startWaiters { - for _, ch := range ws { - select { - case ch <- errors.New("provider stopped"): - default: - } - close(ch) - } - delete(b.startWaiters, subj) - } - for subj, ws := range b.stopWaiters { - for _, ch := range ws { - select { - case ch <- errors.New("provider stopped"): - default: - } - close(ch) - } - delete(b.stopWaiters, subj) + if p.conn != nil { + _ = p.conn.Close(websocket.StatusAbnormalClosure, "reconnect") + p.conn = nil } } -func (b *FuturesWebsocket) StartStream(subject string, dst chan<- domain.Message) <-chan error { - fmt.Println("Starting stream for subject:", subject) - ch := make(chan error, 1) - - if subject == "" { - ch <- fmt.Errorf("empty subject") - close(ch) - return ch - } - - b.mu.Lock() - // mark desired, update sink - b.desired[subject] = true - b.sinks[subject] = dst - - // fast path: already active - if b.states[subject] == stateActive { - b.mu.Unlock() - ch <- nil - close(ch) - return ch - } - - // enqueue waiter and transition if needed - b.startWaiters[subject] = append(b.startWaiters[subject], ch) - if b.states[subject] != statePendingSub { - b.states[subject] = statePendingSub - select { - case b.subQ <- subject: - default: - // queue full → fail fast - ws := b.startWaiters[subject] - delete(b.startWaiters, subject) - b.states[subject] = stateError - b.mu.Unlock() - for _, w := range ws { - w <- fmt.Errorf("subscribe queue full") - close(w) - } - return ch - } - } - b.mu.Unlock() - return ch -} - -func (b *FuturesWebsocket) StopStream(subject string) <-chan error { - fmt.Println("Stopping stream for subject:", subject) - ch := make(chan error, 1) - - if subject == "" { - ch <- fmt.Errorf("empty subject") - close(ch) - return ch - } - - b.mu.Lock() - // mark no longer desired; keep sink until UNSUB ack to avoid drops - b.desired[subject] = false - - // already inactive - if b.states[subject] == stateInactive { - b.mu.Unlock() - ch <- nil - close(ch) - return ch - } - - // enqueue waiter and transition if needed - b.stopWaiters[subject] = append(b.stopWaiters[subject], ch) - if b.states[subject] != statePendingUnsub { - b.states[subject] = statePendingUnsub - select { - case b.unsubQ <- subject: - default: - // queue full → fail fast - ws := b.stopWaiters[subject] - delete(b.stopWaiters, subject) - b.states[subject] = stateError - b.mu.Unlock() - for _, w := range ws { - w <- fmt.Errorf("unsubscribe queue full") - close(w) - } - return ch - } - } - b.mu.Unlock() - return ch -} - -func (b *FuturesWebsocket) Fetch(_ string) (domain.Message, error) { - return domain.Message{}, fmt.Errorf("fetch not supported") -} - -func (b *FuturesWebsocket) IsStreamActive(subject string) bool { - b.mu.Lock() - defer b.mu.Unlock() - return b.states[subject] == stateActive -} - -func (b *FuturesWebsocket) IsValidSubject(subject string, isFetch bool) bool { - return !isFetch && subject != "" -} - -/* internals */ - -func (b *FuturesWebsocket) run() { - defer b.wg.Done() - - backoff := reconnectMin - - dial := func() (*websocket.Conn, error) { - c, _, err := b.dial.Dial(wsURL, b.hdr) - if err != nil { - return nil, err - } - return c, nil - } - +func (p *FuturesWebsocket) writeLoop(ctx context.Context) error { for { select { - case <-b.stopCh: - return - default: - } + case <-ctx.Done(): + return ctx.Err() - c, err := dial() - if err != nil { - time.Sleep(backoff) - backoff = minDur(backoff*2, reconnectMax) - continue - } - backoff = reconnectMin - - 
b.writeMu.Lock() - b.conn = c - b.writeMu.Unlock() - - // Resubscribe desired subjects in one batched SUB. - want := b.snapshotDesired(true) // only desired==true - if len(want) > 0 { - _ = b.sendSubscribe(want) - b.mu.Lock() - for _, s := range want { - if b.states[s] != stateActive { - b.states[s] = statePendingSub - } + case b := <-p.writer: + p.connMu.Lock() + c := p.conn + p.connMu.Unlock() + if c == nil { + return errors.New("conn nil") + } + wctx, cancel := context.WithTimeout(ctx, writeTimeout) + err := c.Write(wctx, websocket.MessageText, b) + cancel() + if err != nil { + return err } - b.mu.Unlock() - } - - err = b.readLoop(c) - - // tear down connection - b.writeMu.Lock() - if b.conn != nil { - _ = b.conn.Close() - b.conn = nil - } - b.writeMu.Unlock() - - select { - case <-b.stopCh: - return - default: - time.Sleep(backoff) - backoff = minDur(backoff*2, reconnectMax) } } } -func (b *FuturesWebsocket) batcher() { - defer b.wg.Done() - - t := time.NewTicker(batchPeriod) - defer t.Stop() - - var subs, unsubs []string - - flush := func() { - if len(subs) > 0 { - _ = b.sendSubscribe(subs) - subs = subs[:0] - } - if len(unsubs) > 0 { - _ = b.sendUnsubscribe(unsubs) - unsubs = unsubs[:0] - } - } +func (p *FuturesWebsocket) readLoop(ctx context.Context) error { + slog.Default().Info("read loop started", "cmp", cmpName) + defer slog.Default().Info("read loop exited", "cmp", cmpName) for { - select { - case <-b.stopCh: - return - case s := <-b.subQ: - if s != "" { - subs = append(subs, s) - } - case s := <-b.unsubQ: - if s != "" { - unsubs = append(unsubs, s) - } - case <-t.C: - flush() + p.connMu.Lock() + c := p.conn + p.connMu.Unlock() + if c == nil { + return errors.New("conn nil") } - } -} -func (b *FuturesWebsocket) readLoop(c *websocket.Conn) error { - for { - _, raw, err := c.ReadMessage() + _, data, err := c.Read(ctx) if err != nil { return err } - fmt.Println("Received message:", string(raw)) - - // Stream data or command ack - if hasField(raw, `"stream"`) { - var container struct { - Stream string `json:"stream"` - Data json.RawMessage `json:"data"` - } - if err := json.Unmarshal(raw, &container); err != nil || container.Stream == "" { - continue + // ACK + var ack wsAck + if json.Unmarshal(data, &ack) == nil && ack.ID != 0 { + p.pmu.Lock() + if ch, ok := p.pending[ack.ID]; ok { + if ack.Result == nil { + ch <- nil + slog.Default().Debug("ack ok", "cmp", cmpName, "id", ack.ID) + } else { + resb, _ := json.Marshal(ack.Result) + ch <- errors.New(string(resb)) + slog.Default().Warn("ack error", "cmp", cmpName, "id", ack.ID, "result", string(resb)) + } + close(ch) + delete(p.pending, ack.ID) + } else { + slog.Default().Warn("ack with unknown id", "cmp", cmpName, "id", ack.ID) } + p.pmu.Unlock() + continue + } - b.mu.Lock() - dst, ok := b.sinks[container.Stream] - st := b.states[container.Stream] - b.mu.Unlock() - - if !ok || st == stateInactive || st == statePendingUnsub { - continue - } - - id, err := domain.RawID("binance_futures_websocket", container.Stream) - if err != nil { - continue - } + // Combined stream payload + var evt combinedEvent + if json.Unmarshal(data, &evt) == nil && evt.Stream != "" { + ident, _ := domain.RawID(cmpName, evt.Stream) msg := domain.Message{ - Identifier: id, - Payload: container.Data, - Encoding: domain.EncodingJSON, + Identifier: ident, + Payload: evt.Data, } - select { - case dst <- msg: + case p.out <- msg: default: - // drop on backpressure + slog.Default().Warn("dropping message since router buffer full", "cmp", cmpName, "stream", evt.Stream) 
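+				// Drop rather than block: the read loop must keep draining the socket
+				// even when the router's incoming buffer is full.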
} continue } - // Ack path - var ack struct { - Result json.RawMessage `json:"result"` - ID int64 `json:"id"` - } - if err := json.Unmarshal(raw, &ack); err != nil || ack.ID == 0 { - continue - } - - b.ackMu.Lock() - batch, ok := b.pendingA[ack.ID] - if ok { - delete(b.pendingA, ack.ID) - } - b.ackMu.Unlock() - if !ok { - continue - } - - ackErr := (len(ack.Result) > 0 && string(ack.Result) != "null") - - switch batch.method { - case "SUBSCRIBE": - b.mu.Lock() - for _, s := range batch.subjects { - if ackErr { - b.states[s] = stateError - // fail all start waiters - ws := b.startWaiters[s] - delete(b.startWaiters, s) - b.mu.Unlock() - for _, ch := range ws { - ch <- fmt.Errorf("subscribe failed") - close(ch) - } - b.mu.Lock() - continue - } - // success - b.states[s] = stateActive - ws := b.startWaiters[s] - delete(b.startWaiters, s) - dst := b.sinks[s] - b.mu.Unlock() - - for _, ch := range ws { - ch <- nil - close(ch) - } - _ = dst // messages will flow via readLoop - b.mu.Lock() - } - b.mu.Unlock() - - case "UNSUBSCRIBE": - b.mu.Lock() - for _, s := range batch.subjects { - if ackErr { - b.states[s] = stateError - ws := b.stopWaiters[s] - delete(b.stopWaiters, s) - b.mu.Unlock() - for _, ch := range ws { - ch <- fmt.Errorf("unsubscribe failed") - close(ch) - } - b.mu.Lock() - continue - } - // success - b.states[s] = stateInactive - delete(b.sinks, s) // stop delivering - ws := b.stopWaiters[s] - delete(b.stopWaiters, s) - b.mu.Unlock() - for _, ch := range ws { - ch <- nil - close(ch) - } - b.mu.Lock() - } - b.mu.Unlock() + // Unknown frame + const maxSample = 512 + if len(data) > maxSample { + slog.Default().Debug("unparsed frame", "cmp", cmpName, "size", len(data)) + } else { + slog.Default().Debug("unparsed frame", "cmp", cmpName, "size", len(data), "body", string(data)) } } } -func (b *FuturesWebsocket) nextID() int64 { - return int64(atomic.AddUint64(&b.idSeq, 1)) -} - -func (b *FuturesWebsocket) sendSubscribe(subjects []string) error { - if len(subjects) == 0 { - return nil - } - id := b.nextID() - req := map[string]any{ - "method": "SUBSCRIBE", - "params": subjects, - "id": id, - } - if err := b.writeJSON(req); err != nil { - // mark error and fail waiters - b.mu.Lock() - for _, s := range subjects { - b.states[s] = stateError - ws := b.startWaiters[s] - delete(b.startWaiters, s) - b.mu.Unlock() - for _, ch := range ws { - ch <- fmt.Errorf("subscribe send failed") - close(ch) - } - b.mu.Lock() - } - b.mu.Unlock() - return err - } - b.ackMu.Lock() - b.pendingA[id] = ackBatch{method: "SUBSCRIBE", subjects: append([]string(nil), subjects...)} - b.ackMu.Unlock() - return nil -} - -func (b *FuturesWebsocket) sendUnsubscribe(subjects []string) error { - if len(subjects) == 0 { - return nil - } - id := b.nextID() - req := map[string]any{ - "method": "UNSUBSCRIBE", - "params": subjects, - "id": id, - } - if err := b.writeJSON(req); err != nil { - b.mu.Lock() - for _, s := range subjects { - b.states[s] = stateError - ws := b.stopWaiters[s] - delete(b.stopWaiters, s) - b.mu.Unlock() - for _, ch := range ws { - ch <- fmt.Errorf("unsubscribe send failed") - close(ch) - } - b.mu.Lock() - } - b.mu.Unlock() - return err - } - b.ackMu.Lock() - b.pendingA[id] = ackBatch{method: "UNSUBSCRIBE", subjects: append([]string(nil), subjects...)} - b.ackMu.Unlock() - return nil -} - -func (b *FuturesWebsocket) writeJSON(v any) error { - // token bucket - select { - case <-b.stopCh: - return fmt.Errorf("stopped") - case <-b.tokensCh: - } - - b.writeMu.Lock() - c := b.conn - b.writeMu.Unlock() +func (p 
*FuturesWebsocket) sendReq(method string, params []string) (uint64, <-chan error) { + p.connMu.Lock() + c := p.conn + p.connMu.Unlock() if c == nil { - return fmt.Errorf("not connected") + return 0, nil } - _ = c.SetWriteDeadline(time.Now().Add(writeWait)) - return c.WriteJSON(v) -} + id := p.reqID.Add(1) + req := wsReq{Method: method, Params: params, ID: id} + b, _ := json.Marshal(req) -/* utilities */ + ack := make(chan error, 1) + p.pmu.Lock() + p.pending[id] = ack + p.pmu.Unlock() -func (b *FuturesWebsocket) snapshotDesired(onlyTrue bool) []string { - b.mu.Lock() - defer b.mu.Unlock() - var out []string - for s, want := range b.desired { - if !onlyTrue || want { - out = append(out, s) - } + // enqueue to single writer to avoid concurrent writes + select { + case p.writer <- b: + default: + // avoid blocking the caller; offload + go func() { p.writer <- b }() } - return out -} -func minDur(a, b time.Duration) time.Duration { - if a < b { - return a - } - return b -} - -func hasField(raw []byte, needle string) bool { - // cheap check; avoids another allocation if it's obviously an ACK - return json.Valid(raw) && byteContains(raw, needle) -} - -func byteContains(b []byte, sub string) bool { - n := len(sub) - if n == 0 || len(b) < n { - return false - } - // naive search; sufficient for small frames - for i := 0; i <= len(b)-n; i++ { - if string(b[i:i+n]) == sub { - return true - } - } - return false + slog.Default().Debug("request enqueued", "cmp", cmpName, "id", id, "method", method, "params", params) + return id, ack } diff --git a/services/data_service/internal/provider/provider.go b/services/data_service/internal/provider/provider.go index 5dd6ae7..17877ba 100644 --- a/services/data_service/internal/provider/provider.go +++ b/services/data_service/internal/provider/provider.go @@ -6,8 +6,8 @@ type Provider interface { Start() error Stop() - StartStream(key string, destination chan<- domain.Message) <-chan error - StopStream(key string) <-chan error + StartStreams(keys []string) <-chan error + StopStreams(key []string) <-chan error Fetch(key string) (domain.Message, error) diff --git a/services/data_service/internal/router/router.go b/services/data_service/internal/router/router.go index 31b0fdd..ddb0c26 100644 --- a/services/data_service/internal/router/router.go +++ b/services/data_service/internal/router/router.go @@ -1,7 +1,7 @@ package router import ( - "fmt" + "log/slog" "sync" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" @@ -25,6 +25,7 @@ func (r *Router) IncomingChannel() chan<- domain.Message { } func (r *Router) Run() { + slog.Default().Info("router started", "cmp", "router") for msg := range r.incoming { r.mu.RLock() channels := r.routes[msg.Identifier] @@ -33,7 +34,7 @@ func (r *Router) Run() { select { case ch <- msg: default: - fmt.Println("Router could not push message to a full buffer...") // TODO: Handle full buffer case more gracefully + slog.Default().Warn("dropping message due to backpressure", "cmp", "router", "identifier", msg.Identifier.Key()) } } r.mu.RUnlock() @@ -44,6 +45,8 @@ func (r *Router) RegisterRoute(id domain.Identifier, ch chan<- domain.Message) { r.mu.Lock() r.routes[id] = append(r.routes[id], ch) r.mu.Unlock() + + slog.Default().Debug("registered route", "cmp", "router", "identifier", id.Key(), "channel", ch) } func (r *Router) DeregisterRoute(id domain.Identifier, ch chan<- domain.Message) { @@ -62,4 +65,6 @@ func (r *Router) DeregisterRoute(id domain.Identifier, ch chan<- domain.Message) r.routes[id] = slice } 
r.mu.Unlock() + + slog.Default().Debug("deregistered route", "cmp", "router", "identifier", id.Key(), "channel", ch) } diff --git a/services/data_service/internal/server/grpc_streaming_server.go b/services/data_service/internal/server/grpc_streaming_server.go index 50c510d..7e7ce81 100644 --- a/services/data_service/internal/server/grpc_streaming_server.go +++ b/services/data_service/internal/server/grpc_streaming_server.go @@ -46,7 +46,6 @@ func (s *GRPCStreamingServer) ConnectStream(req *pb.ConnectStreamRequest, stream if err := stream.Send(&pb.Message{ Identifier: &pb.Identifier{Key: msg.Identifier.Key()}, Payload: msg.Payload, - Encoding: string(msg.Encoding), }); err != nil { return err }
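---

Usage sketch (illustrative, not part of the patch): the snippet below shows how a small command inside this module could drive the new batched StartStreams/StopStreams API end to end. The stream names, buffer size, and the 10-second ack wait are assumptions made for the example; only the constructor and method signatures come from the change above. Note that StartStreams reports "not connected" until the background dial completes, so a real caller would retry briefly or go through the manager, which already waits on the ack with a timeout.

    package main

    import (
        "log/slog"
        "time"

        "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
        "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider/binance"
    )

    func main() {
        // Stands in for router.IncomingChannel(); the provider only needs a send-capable channel.
        out := make(chan domain.Message, 1024)

        p := binance.NewFuturesWebsocket(out)
        if err := p.Start(); err != nil {
            slog.Error("start provider", "err", err)
            return
        }
        defer p.Stop()

        // One batched SUBSCRIBE frame; the returned channel yields nil on ack or an error once.
        ack := p.StartStreams([]string{"btcusdt@aggTrade", "ethusdt@bookTicker"})
        select {
        case err := <-ack:
            if err != nil {
                slog.Error("subscribe failed", "err", err)
                return
            }
        case <-time.After(10 * time.Second):
            slog.Error("subscribe ack timed out")
            return
        }

        // Consume combined-stream frames fanned in by the provider.
        for msg := range out {
            slog.Info("frame", "id", msg.Identifier.Key(), "bytes", len(msg.Payload))
        }
    }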