Refactor FuturesWebsocket: implement batch subscription handling, enhance connection management, and improve logging

2025-09-09 00:08:57 +00:00
parent 70f3714d2f
commit 6ebc541de0
13 changed files with 700 additions and 784 deletions

go.mod (3 changed lines)

@@ -4,12 +4,13 @@ go 1.25.1

 require (
 	github.com/google/uuid v1.6.0
-	github.com/gorilla/websocket v1.5.3
 	google.golang.org/grpc v1.75.0
 	google.golang.org/protobuf v1.36.8
 )

 require (
+	github.com/coder/websocket v1.8.14 // indirect
+	github.com/lmittmann/tint v1.1.2 // indirect
 	golang.org/x/net v0.43.0 // indirect
 	golang.org/x/sys v0.35.0 // indirect
 	golang.org/x/text v0.28.0 // indirect

go.sum (4 changed lines)

@@ -1,3 +1,5 @@
+github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g=
+github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg=
 github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
 github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
 github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
@@ -10,6 +12,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
 github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/lmittmann/tint v1.1.2 h1:2CQzrL6rslrsyjqLDwD11bZ5OpLBPU+g3G/r5LSfS8w=
+github.com/lmittmann/tint v1.1.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
 go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
 go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
 go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=


@@ -1,10 +1,12 @@
 package main

 import (
-	"fmt"
-	"log"
+	"log/slog"
 	"net"
+	"os"
+	"time"

+	"github.com/lmittmann/tint"
 	pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service"
 	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager"
 	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider/binance"
@@ -14,26 +16,67 @@ import (
 	"google.golang.org/grpc/reflection"
 )

+func initLogger() *slog.Logger {
+	level := parseLevel(env("LOG_LEVEL", "debug"))
+	if env("LOG_FORMAT", "pretty") == "json" {
+		return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
+			Level: level,
+		}))
+	}
+	return slog.New(tint.NewHandler(os.Stdout, &tint.Options{
+		Level:      level,
+		TimeFormat: time.RFC3339Nano,
+		NoColor:    os.Getenv("NO_COLOR") != "",
+	}))
+}
+
+func parseLevel(s string) slog.Level {
+	switch s {
+	case "debug":
+		return slog.LevelDebug
+	case "warn":
+		return slog.LevelWarn
+	case "error":
+		return slog.LevelError
+	default:
+		return slog.LevelInfo
+	}
+}
+
+func env(k, def string) string {
+	if v := os.Getenv(k); v != "" {
+		return v
+	}
+	return def
+}
+
 func main() {
-	fmt.Println("Starting Data Service...")
+	slog.SetDefault(initLogger())
+	slog.Info("starting", "svc", "data-service")

 	// Setup
 	r := router.NewRouter(2048)
 	m := manager.NewManager(r)
-	binanceFutures := binance.NewFuturesWebsocket()
-	_ = m.AddProvider("binance_futures_websocket", binanceFutures)
+	binanceFutures := binance.NewFuturesWebsocket(r.IncomingChannel())
+	if err := m.AddProvider("binance_futures_websocket", binanceFutures); err != nil {
+		slog.Error("add provider failed", "err", err)
+		os.Exit(1)
+	}

 	// gRPC Control Server
 	grpcControlServer := grpc.NewServer()
 	go func() {
 		pb.RegisterDataServiceControlServer(grpcControlServer, server.NewGRPCControlServer(m))
 		reflection.Register(grpcControlServer)
-		grpcLis, err := net.Listen("tcp", ":50051")
+		lis, err := net.Listen("tcp", ":50051")
 		if err != nil {
-			log.Fatalf("Failed to listen for gRPC control: %v", err)
+			slog.Error("listen failed", "cmp", "grpc-control", "addr", ":50051", "err", err)
+			os.Exit(1)
 		}
-		log.Println("gRPC control server listening on :50051")
-		if err := grpcControlServer.Serve(grpcLis); err != nil {
-			log.Fatalf("Failed to serve gRPC control: %v", err)
+		slog.Info("listening", "cmp", "grpc-control", "addr", ":50051")
+		if err := grpcControlServer.Serve(lis); err != nil {
+			slog.Error("serve failed", "cmp", "grpc-control", "err", err)
+			os.Exit(1)
 		}
 	}()
@@ -42,31 +85,17 @@ func main() {
 	go func() {
 		pb.RegisterDataServiceStreamingServer(grpcStreamingServer, server.NewGRPCStreamingServer(m))
 		reflection.Register(grpcStreamingServer)
-		grpcLis, err := net.Listen("tcp", ":50052")
+		lis, err := net.Listen("tcp", ":50052")
 		if err != nil {
-			log.Fatalf("Failed to listen for gRPC: %v", err)
+			slog.Error("listen failed", "cmp", "grpc-streaming", "addr", ":50052", "err", err)
+			os.Exit(1)
 		}
-		log.Println("gRPC streaming server listening on :50052")
-		if err := grpcStreamingServer.Serve(grpcLis); err != nil {
-			log.Fatalf("Failed to serve gRPC: %v", err)
+		slog.Info("listening", "cmp", "grpc-streaming", "addr", ":50052")
+		if err := grpcStreamingServer.Serve(lis); err != nil {
+			slog.Error("serve failed", "cmp", "grpc-streaming", "err", err)
+			os.Exit(1)
 		}
 	}()

-	// Socket Streaming Server
-	/*
-		socketStreamingServer := server.NewSocketStreamingServer(m)
-		go func() {
-			socketLis, err := net.Listen("tcp", ":6000")
-			if err != nil {
-				log.Fatalf("Failed to listen for socket: %v", err)
-			}
-			log.Println("Socket server listening on :6000")
-			if err := socketStreamingServer.Serve(socketLis); err != nil {
-				log.Fatalf("Socket server error: %v", err)
-			}
-		}()
-	*/
-	// Block main forever
 	select {}
 }


@@ -2,12 +2,13 @@ package main
import ( import (
"context" "context"
"encoding/json"
"flag" "flag"
"fmt" "fmt"
"math"
"os" "os"
"os/signal" "os/signal"
"strings" "strings"
"sync/atomic"
"syscall" "syscall"
"time" "time"
@@ -47,21 +48,6 @@ func toIdentifierKey(input string) (string, error) {
return "raw::" + strings.ToLower(prov) + "." + subj, nil return "raw::" + strings.ToLower(prov) + "." + subj, nil
} }
func prettyOrRaw(b []byte, pretty bool) string {
if !pretty || len(b) == 0 {
return string(b)
}
var tmp any
if err := json.Unmarshal(b, &tmp); err != nil {
return string(b)
}
out, err := json.MarshalIndent(tmp, "", " ")
if err != nil {
return string(b)
}
return string(out)
}
func waitReady(ctx context.Context, conn *grpc.ClientConn) error { func waitReady(ctx context.Context, conn *grpc.ClientConn) error {
for { for {
s := conn.GetState() s := conn.GetState()
@@ -77,18 +63,31 @@ func waitReady(ctx context.Context, conn *grpc.ClientConn) error {
} }
} }
type streamStats struct {
TotalMsgs int64
TotalBytes int64
TickMsgs int64
TickBytes int64
}
type stats struct {
TotalMsgs int64
TotalBytes int64
ByStream map[string]*streamStats
}
func main() { func main() {
var ids idsFlag var ids idsFlag
var ctlAddr string var ctlAddr string
var strAddr string var strAddr string
var pretty bool
var timeout time.Duration var timeout time.Duration
var refresh time.Duration
flag.Var(&ids, "id", "identifier (provider:subject or canonical key); repeatable") flag.Var(&ids, "id", "identifier (provider:subject or canonical key); repeatable")
flag.StringVar(&ctlAddr, "ctl", "127.0.0.1:50051", "gRPC control address") flag.StringVar(&ctlAddr, "ctl", "127.0.0.1:50051", "gRPC control address")
flag.StringVar(&strAddr, "str", "127.0.0.1:50052", "gRPC streaming address") flag.StringVar(&strAddr, "str", "127.0.0.1:50052", "gRPC streaming address")
flag.BoolVar(&pretty, "pretty", true, "pretty-print JSON payloads when possible")
flag.DurationVar(&timeout, "timeout", 10*time.Second, "start/config/connect timeout") flag.DurationVar(&timeout, "timeout", 10*time.Second, "start/config/connect timeout")
flag.DurationVar(&refresh, "refresh", 1*time.Second, "dashboard refresh interval")
flag.Parse() flag.Parse()
if len(ids) == 0 { if len(ids) == 0 {
@@ -99,6 +98,7 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel() defer cancel()
// Control channel
ccCtl, err := grpc.NewClient( ccCtl, err := grpc.NewClient(
ctlAddr, ctlAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithTransportCredentials(insecure.NewCredentials()),
@@ -107,15 +107,7 @@ func main() {
_, _ = fmt.Fprintf(os.Stderr, "new control client: %v\n", err) _, _ = fmt.Fprintf(os.Stderr, "new control client: %v\n", err)
os.Exit(1) os.Exit(1)
} }
defer func(ccCtl *grpc.ClientConn) { defer ccCtl.Close()
err := ccCtl.Close()
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "close control client: %v\n", err)
os.Exit(1)
} else {
fmt.Println("closed control client")
}
}(ccCtl)
ccCtl.Connect() ccCtl.Connect()
ctlConnCtx, cancelCtlConn := context.WithTimeout(ctx, timeout) ctlConnCtx, cancelCtlConn := context.WithTimeout(ctx, timeout)
@@ -128,17 +120,20 @@ func main() {
ctl := pb.NewDataServiceControlClient(ccCtl) ctl := pb.NewDataServiceControlClient(ccCtl)
// Start stream
ctxStart, cancelStart := context.WithTimeout(ctx, timeout) ctxStart, cancelStart := context.WithTimeout(ctx, timeout)
startResp, err := ctl.StartStream(ctxStart, &pb.StartStreamRequest{}) startResp, err := ctl.StartStream(ctxStart, &pb.StartStreamRequest{})
cancelStart() cancelStart()
if err != nil { if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "StartClientStream: %v\n", err) _, _ = fmt.Fprintf(os.Stderr, "StartStream: %v\n", err)
os.Exit(1) os.Exit(1)
} }
streamUUID := startResp.GetStreamUuid() streamUUID := startResp.GetStreamUuid()
fmt.Printf("stream: %s\n", streamUUID) fmt.Printf("stream: %s\n", streamUUID)
// Configure identifiers
var pbIDs []*pb.Identifier var pbIDs []*pb.Identifier
orderedIDs := make([]string, 0, len(ids))
for _, s := range ids { for _, s := range ids {
key, err := toIdentifierKey(s) key, err := toIdentifierKey(s)
if err != nil { if err != nil {
@@ -146,6 +141,7 @@ func main() {
os.Exit(2) os.Exit(2)
} }
pbIDs = append(pbIDs, &pb.Identifier{Key: key}) pbIDs = append(pbIDs, &pb.Identifier{Key: key})
orderedIDs = append(orderedIDs, key) // preserve CLI order
} }
ctxCfg, cancelCfg := context.WithTimeout(ctx, timeout) ctxCfg, cancelCfg := context.WithTimeout(ctx, timeout)
@@ -155,11 +151,12 @@ func main() {
}) })
cancelCfg() cancelCfg()
if err != nil { if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ConfigureClientStream: %v\n", err) _, _ = fmt.Fprintf(os.Stderr, "ConfigureStream: %v\n", err)
os.Exit(1) os.Exit(1)
} }
fmt.Printf("configured %d identifiers\n", len(pbIDs)) fmt.Printf("configured %d identifiers\n", len(pbIDs))
// Streaming connection
ccStr, err := grpc.NewClient( ccStr, err := grpc.NewClient(
strAddr, strAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithTransportCredentials(insecure.NewCredentials()),
@@ -168,15 +165,7 @@ func main() {
_, _ = fmt.Fprintf(os.Stderr, "new streaming client: %v\n", err) _, _ = fmt.Fprintf(os.Stderr, "new streaming client: %v\n", err)
os.Exit(1) os.Exit(1)
} }
defer func(ccStr *grpc.ClientConn) { defer ccStr.Close()
err := ccStr.Close()
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "close streaming client: %v\n", err)
os.Exit(1)
} else {
fmt.Println("closed streaming client")
}
}(ccStr)
ccStr.Connect() ccStr.Connect()
strConnCtx, cancelStrConn := context.WithTimeout(ctx, timeout) strConnCtx, cancelStrConn := context.WithTimeout(ctx, timeout)
@@ -192,34 +181,128 @@ func main() {
streamCtx, streamCancel := context.WithCancel(ctx) streamCtx, streamCancel := context.WithCancel(ctx)
defer streamCancel() defer streamCancel()
stream, err := str.ConnectStream(streamCtx, &pb.ConnectStreamRequest{StreamUuid: streamUUID}) srv, err := str.ConnectStream(streamCtx, &pb.ConnectStreamRequest{StreamUuid: streamUUID})
if err != nil { if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ConnectClientStream: %v\n", err) _, _ = fmt.Fprintf(os.Stderr, "ConnectStream: %v\n", err)
os.Exit(1) os.Exit(1)
} }
fmt.Println("connected; streaming… (Ctrl-C to quit)") fmt.Println("connected; streaming… (Ctrl-C to quit)")
// Receiver goroutine → channel
type msgWrap struct {
idKey string
size int
err error
}
msgCh := make(chan msgWrap, 1024)
go func() {
for {
m, err := srv.Recv()
if err != nil {
msgCh <- msgWrap{err: err}
close(msgCh)
return
}
id := m.GetIdentifier().GetKey()
msgCh <- msgWrap{idKey: id, size: len(m.GetPayload())}
}
}()
// Stats and dashboard
st := &stats{ByStream: make(map[string]*streamStats)}
seen := make(map[string]bool, len(orderedIDs))
for _, id := range orderedIDs {
seen[id] = true
}
tick := time.NewTicker(refresh)
defer tick.Stop()
clear := func() { fmt.Print("\033[H\033[2J") }
header := func() {
fmt.Printf("stream: %s now: %s refresh: %s\n",
streamUUID, time.Now().Format(time.RFC3339), refresh)
fmt.Println("--------------------------------------------------------------------------------------")
fmt.Printf("%-56s %10s %14s %12s %16s\n", "identifier", "msgs/s", "bytes/s", "total", "total_bytes")
fmt.Println("--------------------------------------------------------------------------------------")
}
printAndReset := func() {
clear()
header()
var totMsgsPS, totBytesPS float64
for _, id := range orderedIDs {
s, ok := st.ByStream[id]
var msgsPS, bytesPS float64
var totMsgs, totBytes int64
if ok {
// Convert window counts into per-second rates.
msgsPS = float64(atomic.SwapInt64(&s.TickMsgs, 0)) / refresh.Seconds()
bytesPS = float64(atomic.SwapInt64(&s.TickBytes, 0)) / refresh.Seconds()
totMsgs = atomic.LoadInt64(&s.TotalMsgs)
totBytes = atomic.LoadInt64(&s.TotalBytes)
}
totMsgsPS += msgsPS
totBytesPS += bytesPS
fmt.Printf("%-56s %10d %14d %12d %16d\n",
id,
int64(math.Round(msgsPS)),
int64(math.Round(bytesPS)),
totMsgs,
totBytes,
)
}
fmt.Println("--------------------------------------------------------------------------------------")
fmt.Printf("%-56s %10d %14d %12d %16d\n",
"TOTAL",
int64(math.Round(totMsgsPS)),
int64(math.Round(totBytesPS)),
atomic.LoadInt64(&st.TotalMsgs),
atomic.LoadInt64(&st.TotalBytes),
)
}
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
fmt.Println("\nshutting down") fmt.Println("\nshutting down")
return return
default:
msg, err := stream.Recv() case <-tick.C:
if err != nil { printAndReset()
case mw, ok := <-msgCh:
if !ok {
return
}
if mw.err != nil {
if ctx.Err() != nil { if ctx.Err() != nil {
return return
} }
_, _ = fmt.Fprintf(os.Stderr, "recv: %v\n", err) _, _ = fmt.Fprintf(os.Stderr, "recv: %v\n", mw.err)
os.Exit(1) os.Exit(1)
} }
id := msg.GetIdentifier()
fmt.Printf("[%s] bytes=%d enc=%s t=%s\n", // Maintain stable order: append new identifiers at first sight.
id.GetKey(), len(msg.GetPayload()), msg.GetEncoding(), if !seen[mw.idKey] {
time.Now().Format(time.RFC3339Nano), seen[mw.idKey] = true
) orderedIDs = append(orderedIDs, mw.idKey)
fmt.Println(prettyOrRaw(msg.GetPayload(), pretty)) }
fmt.Println("---")
// Account
atomic.AddInt64(&st.TotalMsgs, 1)
atomic.AddInt64(&st.TotalBytes, int64(mw.size))
ss := st.ByStream[mw.idKey]
if ss == nil {
ss = &streamStats{}
st.ByStream[mw.idKey] = ss
}
atomic.AddInt64(&ss.TotalMsgs, 1)
atomic.AddInt64(&ss.TotalBytes, int64(mw.size))
atomic.AddInt64(&ss.TickMsgs, 1)
atomic.AddInt64(&ss.TickBytes, int64(mw.size))
} }
} }
} }

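The CLI dashboard above turns per-window counters into rates on every refresh tick. A minimal, self-contained sketch of that accounting (the names here are illustrative, not the client's actual helpers):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var tickMsgs int64
	refresh := time.Second

	// Simulate ten messages arriving during one refresh window.
	for i := 0; i < 10; i++ {
		atomic.AddInt64(&tickMsgs, 1)
	}

	// On each tick the window counter is swapped back to zero and converted into a
	// per-second rate, which is what the dashboard does per identifier.
	rate := float64(atomic.SwapInt64(&tickMsgs, 0)) / refresh.Seconds()
	fmt.Printf("%.0f msgs/s\n", rate)
}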

@@ -1,14 +1,6 @@
 package domain

-type Encoding string
-
-const (
-	EncodingJSON     Encoding = "json"
-	EncodingProtobuf Encoding = "protobuf"
-)
-
 type Message struct {
 	Identifier Identifier
 	Payload    []byte
-	Encoding   Encoding
 }


@@ -39,31 +39,6 @@ func identifierSetDifferences(old map[domain.Identifier]struct{}, next []domain.
 	return
 }

-// joinErrors aggregates multiple errors.
-type joined struct{ es []error }
-
-func (j joined) Error() string {
-	switch n := len(j.es); {
-	case n == 0:
-		return ""
-	case n == 1:
-		return j.es[0].Error()
-	default:
-		s := j.es[0].Error()
-		for i := 1; i < n; i++ {
-			s += "; " + j.es[i].Error()
-		}
-		return s
-	}
-}
-
-func join(es []error) error {
-	if len(es) == 0 {
-		return nil
-	}
-	return joined{es}
-}
-
 // resolveProvider parses a raw identifier and looks up the provider.
 func (m *Manager) resolveProvider(id domain.Identifier) (provider.Provider, string, error) {
 	provName, subj, ok := id.ProviderSubject()

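The removed joined/join helpers are superseded by the standard library's errors.Join, which the manager now imports. A small sketch of the behaviour it relies on (nil inputs are dropped, non-nil errors are aggregated):

package main

import (
	"errors"
	"fmt"
)

func main() {
	var agg error
	agg = errors.Join(agg, nil) // still nil
	agg = errors.Join(agg, errors.New("start btcusdt@trade: timeout"))
	agg = errors.Join(agg, errors.New(`invalid subject ""`))
	fmt.Println(agg) // both messages, one per line
}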

@@ -1,7 +1,9 @@
 package manager

 import (
+	"errors"
 	"fmt"
+	"log/slog"
 	"time"

 	"github.com/google/uuid"
@@ -10,6 +12,8 @@ import (
 	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router"
 )

+func lg() *slog.Logger { return slog.Default().With("cmp", "manager") }
+
 // Manager is a single-goroutine actor that owns all state.
 type Manager struct {
 	// Command channel
@@ -24,8 +28,8 @@ type Manager struct {
 	router *router.Router
 }

-// New creates a manager and starts its run loop.
-func New(r *router.Router) *Manager {
+// NewManager creates a manager and starts its run loop.
+func NewManager(r *router.Router) *Manager {
 	m := &Manager{
 		cmdCh:     make(chan any, 256),
 		providers: make(map[string]provider.Provider),
@@ -35,6 +39,9 @@ func New(r *router.Router) *Manager {
 	}
 	go r.Run()
 	go m.run()
+
+	lg().Info("manager started")
+
 	return m
 }

@@ -42,6 +49,7 @@ func New(r *router.Router) *Manager {
 // AddProvider adds and starts a new provider.
 func (m *Manager) AddProvider(name string, p provider.Provider) error {
+	lg().Debug("add provider request", slog.String("name", name))
 	resp := make(chan error, 1)
 	m.cmdCh <- addProviderCmd{name: name, p: p, resp: resp}
 	return <-resp
@@ -49,6 +57,7 @@ func (m *Manager) AddProvider(name string, p provider.Provider) error {
 // RemoveProvider stops and removes a provider, cleaning up all sessions.
 func (m *Manager) RemoveProvider(name string) error {
+	lg().Debug("remove provider request", slog.String("name", name))
 	resp := make(chan error, 1)
 	m.cmdCh <- removeProviderCmd{name: name, resp: resp}
 	return <-resp
@@ -56,6 +65,7 @@ func (m *Manager) RemoveProvider(name string) error {
 // NewSession creates a new session with the given idle timeout.
 func (m *Manager) NewSession(idleAfter time.Duration) (uuid.UUID, error) {
+	lg().Debug("new session request", slog.Duration("idle_after", idleAfter))
 	resp := make(chan struct {
 		id  uuid.UUID
 		err error
@@ -67,6 +77,7 @@ func (m *Manager) NewSession(idleAfter time.Duration) (uuid.UUID, error) {
 // AttachClient attaches a client to a session, creates and returns client channels for the session.
 func (m *Manager) AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error) {
+	lg().Debug("attach client request", slog.String("session", id.String()), slog.Int("in_buf", inBuf), slog.Int("out_buf", outBuf))
 	resp := make(chan struct {
 		cin  chan<- domain.Message
 		cout <-chan domain.Message
@@ -79,6 +90,7 @@ func (m *Manager) AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.M
 // DetachClient detaches the client from the session, closes client channels and arms timeout.
 func (m *Manager) DetachClient(id uuid.UUID) error {
+	lg().Debug("detach client request", slog.String("session", id.String()))
 	resp := make(chan error, 1)
 	m.cmdCh <- detachCmd{sid: id, resp: resp}
 	return <-resp
@@ -86,6 +98,7 @@ func (m *Manager) DetachClient(id uuid.UUID) error {
 // ConfigureSession sets the next set of identifiers for the session, starting and stopping streams as needed.
 func (m *Manager) ConfigureSession(id uuid.UUID, next []domain.Identifier) error {
+	lg().Debug("configure session request", slog.String("session", id.String()), slog.Int("idents", len(next)))
 	resp := make(chan error, 1)
 	m.cmdCh <- configureCmd{sid: id, next: next, resp: resp}
 	return <-resp
@@ -93,6 +106,7 @@ func (m *Manager) ConfigureSession(id uuid.UUID, next []domain.Identifier) error
 // CloseSession closes and removes the session, cleaning up all bindings.
 func (m *Manager) CloseSession(id uuid.UUID) error {
+	lg().Debug("close session request", slog.String("session", id.String()))
 	resp := make(chan error, 1)
 	m.cmdCh <- closeSessionCmd{sid: id, resp: resp}
 	return <-resp
@@ -125,10 +139,12 @@ func (m *Manager) run() {
 func (m *Manager) handleAddProvider(cmd addProviderCmd) {
 	if _, ok := m.providers[cmd.name]; ok {
+		lg().Warn("provider already exists", slog.String("name", cmd.name))
 		cmd.resp <- fmt.Errorf("provider exists: %s", cmd.name)
 		return
 	}
 	if err := cmd.p.Start(); err != nil {
+		lg().Warn("failed to start provider", slog.String("name", cmd.name), slog.String("err", err.Error()))
 		cmd.resp <- fmt.Errorf("start provider %s: %w", cmd.name, err)
 		return
 	}
@@ -139,6 +155,7 @@ func (m *Manager) handleAddProvider(cmd addProviderCmd) {
 func (m *Manager) handleRemoveProvider(cmd removeProviderCmd) {
 	p, ok := m.providers[cmd.name]
 	if !ok {
+		lg().Warn("provider not found", slog.String("name", cmd.name))
 		cmd.resp <- fmt.Errorf("provider not found: %s", cmd.name)
 		return
 	}
@@ -149,6 +166,7 @@ func (m *Manager) handleRemoveProvider(cmd removeProviderCmd) {
 		provName, subj, ok := ident.ProviderSubject()
 		if !ok || provName != cmd.name {
 			// TODO: add log warning, but basically should never ever happen
+			lg().Warn("identifier with mismatched provider found in session during provider removal", slog.String("session", s.id.String()), slog.String("ident", ident.Key()), slog.String("expected_provider", cmd.name), slog.String("found_provider", provName))
 			continue
 		}
 		if s.attached && s.clientOut != nil {
@@ -158,19 +176,19 @@ func (m *Manager) handleRemoveProvider(cmd removeProviderCmd) {
 			// decrementStreamRefCount returns true if this was the last ref. In which case we want to stop the stream.
 			if ident.IsRaw() && m.decrementStreamRefCount(ident) && subj != "" {
-				_ = p.StopStream(subj) // best-effort as we will remove the provider anyway
+				_ = p.StopStreams([]string{subj}) // best-effort as we will remove the provider anyway
 			}
 		}
 	}

-	// first iteration above is sound, but as a precaution we also clean up any dangling streamRef entries here
+	// Defensive sweep: log and clear any dangling streamRef entries for this provider.
 	for id := range m.streamRef {
 		provName, _, ok := id.ProviderSubject()
 		if !ok || provName != cmd.name {
 			continue
 		}
-		fmt.Printf("manager: warning — dangling streamRef for %s after removing provider %s\n", id.Key(), cmd.name)
 		delete(m.streamRef, id)
+		lg().Warn("dangling streamRef entry found during provider removal", slog.String("ident", id.Key()), slog.String("provider", cmd.name))
 	}

 	p.Stop()
@@ -196,6 +214,8 @@ func (m *Manager) handleNewSession(cmd newSessionCmd) {
 		id  uuid.UUID
 		err error
 	}{id: s.id, err: nil}
+
+	lg().Info("new session created", slog.String("session", s.id.String()), slog.Duration("idle_after", cmd.idleAfter))
 }

 func (m *Manager) handleAttach(cmd attachCmd) {
@@ -232,6 +252,8 @@ func (m *Manager) handleAttach(cmd attachCmd) {
 		cout <-chan domain.Message
 		err  error
 	}{cin, cout, err}
+
+	lg().Info("client attached to session", slog.String("session", s.id.String()))
 }

 func (m *Manager) handleDetach(cmd detachCmd) {
@@ -252,126 +274,163 @@ func (m *Manager) handleDetach(cmd detachCmd) {
 	_ = m.detachSession(cmd.sid, s)
 	cmd.resp <- nil
+	lg().Info("client detached from session", slog.String("session", s.id.String()))
 }

-func (m *Manager) handleConfigure(c configureCmd) {
-	s, ok := m.sessions[c.sid]
-	if !ok {
-		c.resp <- ErrSessionNotFound
-		return
-	}
-	if s.closed {
-		c.resp <- ErrSessionClosed
-		return
-	}
-
-	old := copySet(s.bound)
-	toAdd, toDel := identifierSetDifferences(old, c.next)
-
-	// 1) Handle removals first.
-	for _, ident := range toDel {
-		if s.attached && s.clientOut != nil {
-			m.router.DeregisterRoute(ident, s.clientOut)
-		}
-		delete(s.bound, ident)
-		if ident.IsRaw() {
-			if m.decrementStreamRefCount(ident) {
-				if p, subj, err := m.resolveProvider(ident); err == nil {
-					_ = p.StopStream(subj) // fire-and-forget
-				}
-			}
-		}
-	}
-
-	// 2) Handle additions. Collect starts to await.
-	type startItem struct {
-		id domain.Identifier
-		ch <-chan error
-	}
-	var starts []startItem
-	var initErrs []error
-
-	for _, ident := range toAdd {
-		// Bind intent now.
-		s.bound[ident] = struct{}{}
-
-		if !ident.IsRaw() {
-			if s.attached && s.clientOut != nil {
-				m.router.RegisterRoute(ident, s.clientOut)
-			}
-			continue
-		}
-
-		p, subj, err := m.resolveProvider(ident)
-		if err != nil {
-			delete(s.bound, ident)
-			initErrs = append(initErrs, err)
-			continue
-		}
-		if !p.IsValidSubject(subj, false) {
-			delete(s.bound, ident)
-			initErrs = append(initErrs, fmt.Errorf("invalid subject %q for provider", subj))
-			continue
-		}
-
-		first := m.incrementStreamRefCount(ident)
-
-		if first || !p.IsStreamActive(subj) {
-			ch := p.StartStream(subj, m.router.IncomingChannel())
-			starts = append(starts, startItem{id: ident, ch: ch})
-		} else if s.attached && s.clientOut != nil {
-			// Already active, just register for this session.
-			m.router.RegisterRoute(ident, s.clientOut)
-		}
-	}
-
-	// 3) Wait for starts initiated by this call, each with its own timeout.
-	if len(starts) == 0 {
-		c.resp <- join(initErrs)
-		return
-	}
-
-	type result struct {
-		id  domain.Identifier
-		err error
-	}
-	done := make(chan result, len(starts))
-	for _, si := range starts {
-		// Per-start waiter.
-		go func(id domain.Identifier, ch <-chan error) {
-			select {
-			case err := <-ch:
-				done <- result{id: id, err: err}
-			case <-time.After(statusWaitTotal):
-				done <- result{id: id, err: fmt.Errorf("timeout")}
-			}
-		}(si.id, si.ch)
-	}
-
-	// Collect results and apply.
-	for i := 0; i < len(starts); i++ {
-		r := <-done
-		if r.err != nil {
-			// Roll back this session's bind and drop ref.
-			delete(s.bound, r.id)
-			_ = m.decrementStreamRefCount(r.id)
-			initErrs = append(initErrs, fmt.Errorf("start %v: %w", r.id, r.err))
-			continue
-		}
-		// Success: register for any attached sessions that are bound.
-		for _, sess := range m.sessions {
-			if !sess.attached || sess.clientOut == nil {
-				continue
-			}
-			if _, bound := sess.bound[r.id]; bound {
-				m.router.RegisterRoute(r.id, sess.clientOut)
-			}
-		}
-	}
-
-	c.resp <- join(initErrs)
-}
+func (m *Manager) handleConfigure(cmd configureCmd) {
+	s, ok := m.sessions[cmd.sid]
+	if !ok {
+		cmd.resp <- ErrSessionNotFound
+		return
+	}
+	if s.closed {
+		cmd.resp <- ErrSessionClosed
+		return
+	}
+
+	old := copySet(s.bound)
+	toAdd, toDel := identifierSetDifferences(old, cmd.next)
+
+	var aggErrs error
+
+	// 1) Build batches: provider → starts(starters) and stops(subjects)
+	type starter struct {
+		id   domain.Identifier
+		subj string
+	}
+	startsByProv := make(map[provider.Provider][]starter)
+	stopsByProv := make(map[provider.Provider][]string)
+
+	// Removals
+	for _, ident := range toDel {
+		if s.attached && s.clientOut != nil {
+			m.router.DeregisterRoute(ident, s.clientOut)
+		}
+		delete(s.bound, ident)
+		if !ident.IsRaw() {
+			continue
+		}
+		p, subj, err := m.resolveProvider(ident)
+		if err != nil {
+			aggErrs = errors.Join(aggErrs, fmt.Errorf("stop %s: %w", ident.Key(), err))
+			continue
+		}
+		if subj == "" {
+			continue
+		}
+		if m.decrementStreamRefCount(ident) { // only when last ref
+			stopsByProv[p] = append(stopsByProv[p], subj)
+		}
+	}
+
+	// Additions
+	for _, ident := range toAdd {
+		if !ident.IsRaw() {
+			if s.attached && s.clientOut != nil {
+				m.router.RegisterRoute(ident, s.clientOut)
+			}
+			s.bound[ident] = struct{}{}
+			continue
+		}
+		p, subj, err := m.resolveProvider(ident)
+		if err != nil {
+			aggErrs = errors.Join(aggErrs, err)
+			continue
+		}
+		if !p.IsValidSubject(subj, false) {
+			aggErrs = errors.Join(aggErrs, fmt.Errorf("invalid subject %q", subj))
+			continue
+		}
+		if m.incrementStreamRefCount(ident) { // first ref → start later
+			startsByProv[p] = append(startsByProv[p], starter{id: ident, subj: subj})
+		} else {
+			// already active → bind+route now
+			if s.attached && s.clientOut != nil {
+				m.router.RegisterRoute(ident, s.clientOut)
+			}
+			s.bound[ident] = struct{}{}
+		}
+	}
+
+	// 2) Fire provider calls
+	type batchRes struct {
+		prov provider.Provider
+		err  error
+		op   string // "start"/"stop"
+	}
+	done := make(chan batchRes, len(startsByProv)+len(stopsByProv))
+
+	// Start batches
+	for p, items := range startsByProv {
+		subjs := make([]string, 0, len(items))
+		for _, it := range items {
+			subjs = append(subjs, it.subj)
+		}
+		ack := p.StartStreams(subjs)
+		go func(p provider.Provider, ack <-chan error) {
+			var err error
+			select {
+			case err = <-ack:
+			case <-time.After(statusWaitTotal):
+				err = fmt.Errorf("timeout")
+			}
+			done <- batchRes{prov: p, err: err, op: "start"}
+		}(p, ack)
+	}
+
+	// Stop batches
+	for p, subjs := range stopsByProv {
+		ack := p.StopStreams(subjs)
+		go func(p provider.Provider, ack <-chan error) {
+			var err error
+			select {
+			case err = <-ack:
+			case <-time.After(statusWaitTotal):
+				err = fmt.Errorf("timeout")
+			}
+			done <- batchRes{prov: p, err: err, op: "stop"}
+		}(p, ack)
+	}
+
+	// 3) Collect results
+	for i := 0; i < len(startsByProv)+len(stopsByProv); i++ {
+		r := <-done
+		switch r.op {
+		case "start":
+			items := startsByProv[r.prov]
+			if r.err != nil {
+				// Roll back refcounts for each ident in this provider batch
+				for _, it := range items {
+					_ = m.decrementStreamRefCount(it.id)
+					aggErrs = errors.Join(aggErrs, fmt.Errorf("start %s: %w", it.id.Key(), r.err))
+				}
+				continue
+			}
+			// Success → bind and route
+			for _, it := range items {
+				if s.attached && s.clientOut != nil {
+					m.router.RegisterRoute(it.id, s.clientOut)
+				}
+				s.bound[it.id] = struct{}{}
+			}
+		case "stop":
+			if r.err != nil {
+				for _, subj := range stopsByProv[r.prov] {
+					aggErrs = errors.Join(aggErrs, fmt.Errorf("stop %s/%s: %w", "raw", subj, r.err))
+				}
+			}
+		}
+	}
+
+	cmd.resp <- aggErrs
+	lg().Info("session configured", slog.String("session", s.id.String()), slog.Int("bound", len(s.bound)), slog.Int("to_add", len(toAdd)), slog.Int("to_del", len(toDel)))
+}

 func (m *Manager) handleCloseSession(c closeSessionCmd) {
@@ -382,4 +441,6 @@ func (m *Manager) handleCloseSession(c closeSessionCmd) {
 	}
 	m.closeSession(c.sid, s)
 	c.resp <- nil
+
+	lg().Info("session closed", slog.String("session", s.id.String()))
 }

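handleConfigure now waits for one acknowledgement per provider batch instead of one per stream. A stripped-down, self-contained sketch of that fan-out/fan-in pattern under an assumed ack-channel contract (the waitBatch name is illustrative, not a function in this codebase):

package main

import (
	"fmt"
	"time"
)

// waitBatch mirrors the per-batch waiter handleConfigure spawns: one goroutine per
// provider batch, each racing the ack channel against a timeout, with results
// fanned into a single channel.
func waitBatch(acks []<-chan error, timeout time.Duration) []error {
	done := make(chan error, len(acks))
	for _, ack := range acks {
		go func(ack <-chan error) {
			select {
			case err := <-ack:
				done <- err
			case <-time.After(timeout):
				done <- fmt.Errorf("timeout")
			}
		}(ack)
	}
	errs := make([]error, 0, len(acks))
	for range acks {
		errs = append(errs, <-done)
	}
	return errs
}

func main() {
	ok := make(chan error, 1)
	ok <- nil                 // a provider that acks immediately
	stuck := make(chan error) // a provider that never answers
	fmt.Println(waitBatch([]<-chan error{ok, stuck}, 100*time.Millisecond))
}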

@@ -7,6 +7,21 @@ import (
 	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
 )

+// Session holds per-session state. Owned by the manager loop.
+type session struct {
+	id        uuid.UUID
+	clientIn  chan domain.Message // caller writes
+	clientOut chan domain.Message // caller reads
+	bound     map[domain.Identifier]struct{}
+	closed    bool
+	attached  bool
+	idleAfter time.Duration
+	idleTimer *time.Timer
+}
+
 // attachSession wires channels, stops idle timer, and registers ready routes.
 // Precondition: session exists and is not attached/closed. Runs in loop.
 func (m *Manager) attachSession(s *session, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error) {
@@ -32,7 +47,7 @@ func (m *Manager) attachSession(s *session, inBuf, outBuf int) (chan<- domain.Me
 			select {
 			case dst <- msg:
 			default:
-				// drop
+				lg().Warn("drop message on clientIn backpressure", "identifier", msg.Identifier.Key())
 			}
 		}
 	}(cin, m.router.IncomingChannel())
@@ -105,7 +120,7 @@ func (m *Manager) closeSession(sid uuid.UUID, s *session) {
 		}
 		if last := m.decrementStreamRefCount(ident); last {
 			if p, subj, err := m.resolveProvider(ident); err == nil {
-				_ = p.StopStream(subj) // do not wait
+				_ = p.StopStreams([]string{subj}) // do not wait
 			}
 		}
 	}


@@ -11,7 +11,7 @@ import (
 // Shared constants.
 const (
 	defaultClientBuf = 256
-	statusWaitTotal  = 8 * time.Second
+	statusWaitTotal  = 10 * time.Second
 )

 // Manager-level errors.
@@ -24,21 +24,6 @@ var (
 	ErrUnknownProvider = errorf("unknown provider")
 )

-// Session holds per-session state. Owned by the manager loop.
-type session struct {
-	id        uuid.UUID
-	clientIn  chan domain.Message // caller writes
-	clientOut chan domain.Message // caller reads
-	bound     map[domain.Identifier]struct{}
-	closed    bool
-	attached  bool
-	idleAfter time.Duration
-	idleTimer *time.Timer
-}
-
 // Commands posted into the manager loop. One struct per action.
 type addProviderCmd struct {
 	name string


@@ -1,643 +1,410 @@
package binance package binance
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "log/slog"
"net/http"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/gorilla/websocket" "github.com/coder/websocket"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
) )
const ( const (
wsURL = "wss://fstream.binance.com/stream" endpoint = "wss://stream.binance.com:9443/stream"
writeRatePerSecond = 8 // hard cap per second cmpName = "binance_futures_websocket"
writeBurst = 8 // token bucket burst
writeWait = 5 * time.Second // per write deadline
batchPeriod = 1 * time.Second // batch SUB/UNSUB every second // I/O limits
readLimitBytes = 8 << 20
reconnectMin = 500 * time.Millisecond writeTimeout = 5 * time.Second
reconnectMax = 10 * time.Second dialTimeout = 10 * time.Second
reconnectMaxBackoff = 30 * time.Second
) )
// internal stream states (provider stays simple; manager relies on IsStreamActive) type wsReq struct {
type streamState uint8 Method string `json:"method"`
Params []string `json:"params,omitempty"`
ID uint64 `json:"id"`
}
const ( type wsAck struct {
stateUnknown streamState = iota Result any `json:"result"`
statePendingSub ID uint64 `json:"id"`
stateActive }
statePendingUnsub
stateInactive type combinedEvent struct {
stateError Stream string `json:"stream"`
) Data json.RawMessage `json:"data"`
}
type FuturesWebsocket struct { type FuturesWebsocket struct {
dial websocket.Dialer out chan<- domain.Message
hdr http.Header
// desired subscriptions and sinks mu sync.RWMutex
mu sync.Mutex active map[string]bool
desired map[string]bool // subject -> want subscribed
sinks map[string]chan<- domain.Message // subject -> destination
states map[string]streamState // subject -> state
// waiters per subject connMu sync.Mutex
startWaiters map[string][]chan error conn *websocket.Conn
stopWaiters map[string][]chan error cancel context.CancelFunc
// batching queues reqID atomic.Uint64
subQ chan string pending map[uint64]chan error
unsubQ chan string pmu sync.Mutex
// websocket // pumps
writeMu sync.Mutex writer chan []byte
conn *websocket.Conn once sync.Once
// rate limit tokens
tokensCh chan struct{}
stopRate chan struct{}
// lifecycle
stopCh chan struct{} stopCh chan struct{}
wg sync.WaitGroup
// ack tracking
ackMu sync.Mutex
idSeq uint64
pendingA map[int64]ackBatch
} }
type ackBatch struct { func NewFuturesWebsocket(out chan<- domain.Message) *FuturesWebsocket {
method string // "SUBSCRIBE" or "UNSUBSCRIBE"
subjects []string
}
func NewFuturesWebsocket() *FuturesWebsocket {
return &FuturesWebsocket{ return &FuturesWebsocket{
desired: make(map[string]bool), out: out,
sinks: make(map[string]chan<- domain.Message), active: make(map[string]bool),
states: make(map[string]streamState), pending: make(map[uint64]chan error),
startWaiters: make(map[string][]chan error), writer: make(chan []byte, 256),
stopWaiters: make(map[string][]chan error), stopCh: make(chan struct{}),
subQ: make(chan string, 4096),
unsubQ: make(chan string, 4096),
tokensCh: make(chan struct{}, writeBurst),
stopRate: make(chan struct{}),
stopCh: make(chan struct{}),
pendingA: make(map[int64]ackBatch),
} }
} }
/* provider.Provider */ func (p *FuturesWebsocket) Start() error {
var startErr error
p.once.Do(func() {
go p.run()
})
return startErr
}
func (b *FuturesWebsocket) Start() error { func (p *FuturesWebsocket) Stop() {
// token bucket close(p.stopCh)
b.wg.Add(1) p.connMu.Lock()
if p.cancel != nil {
p.cancel()
}
if p.conn != nil {
_ = p.conn.Close(websocket.StatusNormalClosure, "shutdown")
p.conn = nil
}
p.connMu.Unlock()
// fail pending waiters
p.pmu.Lock()
for id, ch := range p.pending {
ch <- errors.New("provider stopped")
close(ch)
delete(p.pending, id)
}
p.pmu.Unlock()
slog.Default().Info("stopped", "cmp", cmpName)
}
func (p *FuturesWebsocket) StartStreams(keys []string) <-chan error {
ch := make(chan error, 1)
go func() { go func() {
defer b.wg.Done() defer close(ch)
t := time.NewTicker(time.Second / writeRatePerSecond) if len(keys) == 0 {
defer t.Stop() ch <- nil
// prime burst return
for i := 0; i < writeBurst; i++ {
select {
case b.tokensCh <- struct{}{}:
default:
}
} }
for { id, ack := p.sendReq("SUBSCRIBE", keys)
select { if ack == nil {
case <-b.stopRate: ch <- errors.New("not connected")
return slog.Default().Error("subscribe failed; not connected", "cmp", cmpName, "keys", keys)
case <-t.C: return
select { }
case b.tokensCh <- struct{}{}: if err := <-ack; err != nil {
default: ch <- err
slog.Default().Error("subscribe NACK", "cmp", cmpName, "id", id, "keys", keys, "err", err)
return
}
p.mu.Lock()
for _, k := range keys {
p.active[k] = true
}
p.mu.Unlock()
slog.Default().Info("subscribed", "cmp", cmpName, "id", id, "keys", keys)
ch <- nil
}()
return ch
}
func (p *FuturesWebsocket) StopStreams(keys []string) <-chan error {
ch := make(chan error, 1)
go func() {
defer close(ch)
if len(keys) == 0 {
ch <- nil
return
}
id, ack := p.sendReq("UNSUBSCRIBE", keys)
if ack == nil {
ch <- errors.New("not connected")
slog.Default().Error("unsubscribe failed; not connected", "cmp", cmpName, "keys", keys)
return
}
if err := <-ack; err != nil {
ch <- err
slog.Default().Error("unsubscribe NACK", "cmp", cmpName, "id", id, "keys", keys, "err", err)
return
}
p.mu.Lock()
for _, k := range keys {
delete(p.active, k)
}
p.mu.Unlock()
slog.Default().Info("unsubscribed", "cmp", cmpName, "id", id, "keys", keys)
ch <- nil
}()
return ch
}
func (p *FuturesWebsocket) Fetch(key string) (domain.Message, error) {
return domain.Message{}, errors.New("not implemented")
}
func (p *FuturesWebsocket) IsStreamActive(key string) bool {
p.mu.RLock()
defer p.mu.RUnlock()
return p.active[key]
}
func (p *FuturesWebsocket) IsValidSubject(key string, _ bool) bool {
return len(key) > 0
}
// internal
func (p *FuturesWebsocket) run() {
backoff := time.Second
for {
// stop?
select {
case <-p.stopCh:
return
default:
}
if err := p.connect(); err != nil {
slog.Default().Error("dial failed", "cmp", cmpName, "err", err)
time.Sleep(backoff)
if backoff < reconnectMaxBackoff {
backoff *= 2
}
continue
}
backoff = time.Second
// resubscribe existing keys
func() {
p.mu.RLock()
if len(p.active) > 0 {
keys := make([]string, 0, len(p.active))
for k := range p.active {
keys = append(keys, k)
}
_, ack := p.sendReq("SUBSCRIBE", keys)
if ack != nil {
if err := <-ack; err != nil {
slog.Default().Warn("resubscribe error", "cmp", cmpName, "err", err)
} else {
slog.Default().Info("resubscribed", "cmp", cmpName, "count", len(keys))
}
} }
} }
p.mu.RUnlock()
}()
// run read and write pumps
ctx, cancel := context.WithCancel(context.Background())
errc := make(chan error, 2)
go func() { errc <- p.readLoop(ctx) }()
go func() { errc <- p.writeLoop(ctx) }()
// wait for failure or stop
var err error
select {
case <-p.stopCh:
cancel()
p.cleanupConn()
return
case err = <-errc:
cancel()
} }
}()
// connection manager // fail pendings on error
b.wg.Add(1) p.pmu.Lock()
go b.run() for id, ch := range p.pending {
ch <- err
close(ch)
delete(p.pending, id)
}
p.pmu.Unlock()
// batcher slog.Default().Error("ws loop error; reconnecting", "cmp", cmpName, "err", err)
b.wg.Add(1) p.cleanupConn()
go b.batcher() }
}
func (p *FuturesWebsocket) connect() error {
p.connMu.Lock()
defer p.connMu.Unlock()
if p.conn != nil {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
c, _, err := websocket.Dial(ctx, endpoint, &websocket.DialOptions{
CompressionMode: websocket.CompressionDisabled,
OnPingReceived: func(ctx context.Context, _ []byte) bool {
slog.Default().Info("ping received", "cmp", cmpName)
return true
},
})
if err != nil {
cancel()
return err
}
c.SetReadLimit(8 << 20)
p.conn = c
p.cancel = cancel
slog.Default().Info("connected", "cmp", cmpName, "endpoint", endpoint)
return nil return nil
} }
func (b *FuturesWebsocket) Stop() { func (p *FuturesWebsocket) cleanupConn() {
close(b.stopCh) p.connMu.Lock()
close(b.stopRate) defer p.connMu.Unlock()
if p.cancel != nil {
b.writeMu.Lock() p.cancel()
if b.conn != nil { p.cancel = nil
_ = b.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "bye"))
_ = b.conn.Close()
b.conn = nil
} }
b.writeMu.Unlock() if p.conn != nil {
_ = p.conn.Close(websocket.StatusAbnormalClosure, "reconnect")
b.wg.Wait() p.conn = nil
// resolve any remaining waiters with an error
b.mu.Lock()
defer b.mu.Unlock()
for subj, ws := range b.startWaiters {
for _, ch := range ws {
select {
case ch <- errors.New("provider stopped"):
default:
}
close(ch)
}
delete(b.startWaiters, subj)
}
for subj, ws := range b.stopWaiters {
for _, ch := range ws {
select {
case ch <- errors.New("provider stopped"):
default:
}
close(ch)
}
delete(b.stopWaiters, subj)
} }
} }
func (b *FuturesWebsocket) StartStream(subject string, dst chan<- domain.Message) <-chan error { func (p *FuturesWebsocket) writeLoop(ctx context.Context) error {
fmt.Println("Starting stream for subject:", subject)
ch := make(chan error, 1)
if subject == "" {
ch <- fmt.Errorf("empty subject")
close(ch)
return ch
}
b.mu.Lock()
// mark desired, update sink
b.desired[subject] = true
b.sinks[subject] = dst
// fast path: already active
if b.states[subject] == stateActive {
b.mu.Unlock()
ch <- nil
close(ch)
return ch
}
// enqueue waiter and transition if needed
b.startWaiters[subject] = append(b.startWaiters[subject], ch)
if b.states[subject] != statePendingSub {
b.states[subject] = statePendingSub
select {
case b.subQ <- subject:
default:
// queue full → fail fast
ws := b.startWaiters[subject]
delete(b.startWaiters, subject)
b.states[subject] = stateError
b.mu.Unlock()
for _, w := range ws {
w <- fmt.Errorf("subscribe queue full")
close(w)
}
return ch
}
}
b.mu.Unlock()
return ch
}
func (b *FuturesWebsocket) StopStream(subject string) <-chan error {
fmt.Println("Stopping stream for subject:", subject)
ch := make(chan error, 1)
if subject == "" {
ch <- fmt.Errorf("empty subject")
close(ch)
return ch
}
b.mu.Lock()
// mark no longer desired; keep sink until UNSUB ack to avoid drops
b.desired[subject] = false
// already inactive
if b.states[subject] == stateInactive {
b.mu.Unlock()
ch <- nil
close(ch)
return ch
}
// enqueue waiter and transition if needed
b.stopWaiters[subject] = append(b.stopWaiters[subject], ch)
if b.states[subject] != statePendingUnsub {
b.states[subject] = statePendingUnsub
select {
case b.unsubQ <- subject:
default:
// queue full → fail fast
ws := b.stopWaiters[subject]
delete(b.stopWaiters, subject)
b.states[subject] = stateError
b.mu.Unlock()
for _, w := range ws {
w <- fmt.Errorf("unsubscribe queue full")
close(w)
}
return ch
}
}
b.mu.Unlock()
return ch
}
func (b *FuturesWebsocket) Fetch(_ string) (domain.Message, error) {
return domain.Message{}, fmt.Errorf("fetch not supported")
}
func (b *FuturesWebsocket) IsStreamActive(subject string) bool {
b.mu.Lock()
defer b.mu.Unlock()
return b.states[subject] == stateActive
}
func (b *FuturesWebsocket) IsValidSubject(subject string, isFetch bool) bool {
return !isFetch && subject != ""
}
/* internals */
func (b *FuturesWebsocket) run() {
defer b.wg.Done()
backoff := reconnectMin
dial := func() (*websocket.Conn, error) {
c, _, err := b.dial.Dial(wsURL, b.hdr)
if err != nil {
return nil, err
}
return c, nil
}
for { for {
select { select {
case <-b.stopCh: case <-ctx.Done():
return return ctx.Err()
default:
}
c, err := dial() case b := <-p.writer:
if err != nil { p.connMu.Lock()
time.Sleep(backoff) c := p.conn
backoff = minDur(backoff*2, reconnectMax) p.connMu.Unlock()
continue if c == nil {
} return errors.New("conn nil")
backoff = reconnectMin }
wctx, cancel := context.WithTimeout(ctx, writeTimeout)
b.writeMu.Lock() err := c.Write(wctx, websocket.MessageText, b)
b.conn = c cancel()
b.writeMu.Unlock() if err != nil {
return err
// Resubscribe desired subjects in one batched SUB.
want := b.snapshotDesired(true) // only desired==true
if len(want) > 0 {
_ = b.sendSubscribe(want)
b.mu.Lock()
for _, s := range want {
if b.states[s] != stateActive {
b.states[s] = statePendingSub
}
} }
b.mu.Unlock()
}
err = b.readLoop(c)
// tear down connection
b.writeMu.Lock()
if b.conn != nil {
_ = b.conn.Close()
b.conn = nil
}
b.writeMu.Unlock()
select {
case <-b.stopCh:
return
default:
time.Sleep(backoff)
backoff = minDur(backoff*2, reconnectMax)
} }
} }
} }
func (b *FuturesWebsocket) batcher() { func (p *FuturesWebsocket) readLoop(ctx context.Context) error {
defer b.wg.Done() slog.Default().Info("read loop started", "cmp", cmpName)
defer slog.Default().Info("read loop exited", "cmp", cmpName)
t := time.NewTicker(batchPeriod)
defer t.Stop()
var subs, unsubs []string
flush := func() {
if len(subs) > 0 {
_ = b.sendSubscribe(subs)
subs = subs[:0]
}
if len(unsubs) > 0 {
_ = b.sendUnsubscribe(unsubs)
unsubs = unsubs[:0]
}
}
for { for {
select { p.connMu.Lock()
case <-b.stopCh: c := p.conn
return p.connMu.Unlock()
case s := <-b.subQ: if c == nil {
if s != "" { return errors.New("conn nil")
subs = append(subs, s)
}
case s := <-b.unsubQ:
if s != "" {
unsubs = append(unsubs, s)
}
case <-t.C:
flush()
} }
}
}
func (b *FuturesWebsocket) readLoop(c *websocket.Conn) error { _, data, err := c.Read(ctx)
for {
_, raw, err := c.ReadMessage()
if err != nil { if err != nil {
return err return err
} }
fmt.Println("Received message:", string(raw)) // ACK
var ack wsAck
// Stream data or command ack if json.Unmarshal(data, &ack) == nil && ack.ID != 0 {
if hasField(raw, `"stream"`) { p.pmu.Lock()
var container struct { if ch, ok := p.pending[ack.ID]; ok {
Stream string `json:"stream"` if ack.Result == nil {
Data json.RawMessage `json:"data"` ch <- nil
} slog.Default().Debug("ack ok", "cmp", cmpName, "id", ack.ID)
if err := json.Unmarshal(raw, &container); err != nil || container.Stream == "" { } else {
continue resb, _ := json.Marshal(ack.Result)
ch <- errors.New(string(resb))
slog.Default().Warn("ack error", "cmp", cmpName, "id", ack.ID, "result", string(resb))
}
close(ch)
delete(p.pending, ack.ID)
} else {
slog.Default().Warn("ack with unknown id", "cmp", cmpName, "id", ack.ID)
} }
p.pmu.Unlock()
continue
}
b.mu.Lock() // Combined stream payload
dst, ok := b.sinks[container.Stream] var evt combinedEvent
st := b.states[container.Stream] if json.Unmarshal(data, &evt) == nil && evt.Stream != "" {
b.mu.Unlock() ident, _ := domain.RawID(cmpName, evt.Stream)
if !ok || st == stateInactive || st == statePendingUnsub {
continue
}
id, err := domain.RawID("binance_futures_websocket", container.Stream)
if err != nil {
continue
}
msg := domain.Message{ msg := domain.Message{
Identifier: id, Identifier: ident,
Payload: container.Data, Payload: evt.Data,
Encoding: domain.EncodingJSON,
} }
select { select {
case dst <- msg: case p.out <- msg:
default: default:
// drop on backpressure slog.Default().Warn("dropping message since router buffer full", "cmp", cmpName, "stream", evt.Stream)
} }
continue continue
} }
// Ack path // Unknown frame
var ack struct { const maxSample = 512
Result json.RawMessage `json:"result"` if len(data) > maxSample {
ID int64 `json:"id"` slog.Default().Debug("unparsed frame", "cmp", cmpName, "size", len(data))
} } else {
if err := json.Unmarshal(raw, &ack); err != nil || ack.ID == 0 { slog.Default().Debug("unparsed frame", "cmp", cmpName, "size", len(data), "body", string(data))
continue
}
b.ackMu.Lock()
batch, ok := b.pendingA[ack.ID]
if ok {
delete(b.pendingA, ack.ID)
}
b.ackMu.Unlock()
if !ok {
continue
}
ackErr := (len(ack.Result) > 0 && string(ack.Result) != "null")
switch batch.method {
case "SUBSCRIBE":
b.mu.Lock()
for _, s := range batch.subjects {
if ackErr {
b.states[s] = stateError
// fail all start waiters
ws := b.startWaiters[s]
delete(b.startWaiters, s)
b.mu.Unlock()
for _, ch := range ws {
ch <- fmt.Errorf("subscribe failed")
close(ch)
}
b.mu.Lock()
continue
}
// success
b.states[s] = stateActive
ws := b.startWaiters[s]
delete(b.startWaiters, s)
dst := b.sinks[s]
b.mu.Unlock()
for _, ch := range ws {
ch <- nil
close(ch)
}
_ = dst // messages will flow via readLoop
b.mu.Lock()
}
b.mu.Unlock()
case "UNSUBSCRIBE":
b.mu.Lock()
for _, s := range batch.subjects {
if ackErr {
b.states[s] = stateError
ws := b.stopWaiters[s]
delete(b.stopWaiters, s)
b.mu.Unlock()
for _, ch := range ws {
ch <- fmt.Errorf("unsubscribe failed")
close(ch)
}
b.mu.Lock()
continue
}
// success
b.states[s] = stateInactive
delete(b.sinks, s) // stop delivering
ws := b.stopWaiters[s]
delete(b.stopWaiters, s)
b.mu.Unlock()
for _, ch := range ws {
ch <- nil
close(ch)
}
b.mu.Lock()
}
b.mu.Unlock()
} }
} }
} }
func (b *FuturesWebsocket) nextID() int64 { func (p *FuturesWebsocket) sendReq(method string, params []string) (uint64, <-chan error) {
return int64(atomic.AddUint64(&b.idSeq, 1)) p.connMu.Lock()
} c := p.conn
p.connMu.Unlock()
func (b *FuturesWebsocket) sendSubscribe(subjects []string) error {
if len(subjects) == 0 {
return nil
}
id := b.nextID()
req := map[string]any{
"method": "SUBSCRIBE",
"params": subjects,
"id": id,
}
if err := b.writeJSON(req); err != nil {
// mark error and fail waiters
b.mu.Lock()
for _, s := range subjects {
b.states[s] = stateError
ws := b.startWaiters[s]
delete(b.startWaiters, s)
b.mu.Unlock()
for _, ch := range ws {
ch <- fmt.Errorf("subscribe send failed")
close(ch)
}
b.mu.Lock()
}
b.mu.Unlock()
return err
}
b.ackMu.Lock()
b.pendingA[id] = ackBatch{method: "SUBSCRIBE", subjects: append([]string(nil), subjects...)}
b.ackMu.Unlock()
return nil
}
func (b *FuturesWebsocket) sendUnsubscribe(subjects []string) error {
if len(subjects) == 0 {
return nil
}
id := b.nextID()
req := map[string]any{
"method": "UNSUBSCRIBE",
"params": subjects,
"id": id,
}
if err := b.writeJSON(req); err != nil {
b.mu.Lock()
for _, s := range subjects {
b.states[s] = stateError
ws := b.stopWaiters[s]
delete(b.stopWaiters, s)
b.mu.Unlock()
for _, ch := range ws {
ch <- fmt.Errorf("unsubscribe send failed")
close(ch)
}
b.mu.Lock()
}
b.mu.Unlock()
return err
}
b.ackMu.Lock()
b.pendingA[id] = ackBatch{method: "UNSUBSCRIBE", subjects: append([]string(nil), subjects...)}
b.ackMu.Unlock()
return nil
}
func (b *FuturesWebsocket) writeJSON(v any) error {
// token bucket
select {
case <-b.stopCh:
return fmt.Errorf("stopped")
case <-b.tokensCh:
}
b.writeMu.Lock()
c := b.conn
b.writeMu.Unlock()
if c == nil { if c == nil {
return fmt.Errorf("not connected") return 0, nil
} }
_ = c.SetWriteDeadline(time.Now().Add(writeWait)) id := p.reqID.Add(1)
return c.WriteJSON(v) req := wsReq{Method: method, Params: params, ID: id}
} b, _ := json.Marshal(req)
/* utilities */ ack := make(chan error, 1)
p.pmu.Lock()
p.pending[id] = ack
p.pmu.Unlock()
func (b *FuturesWebsocket) snapshotDesired(onlyTrue bool) []string { // enqueue to single writer to avoid concurrent writes
b.mu.Lock() select {
defer b.mu.Unlock() case p.writer <- b:
var out []string default:
for s, want := range b.desired { // avoid blocking the caller; offload
if !onlyTrue || want { go func() { p.writer <- b }()
out = append(out, s)
}
} }
return out
}
func minDur(a, b time.Duration) time.Duration { slog.Default().Debug("request enqueued", "cmp", cmpName, "id", id, "method", method, "params", params)
if a < b { return id, ack
return a
}
return b
}
func hasField(raw []byte, needle string) bool {
// cheap check; avoids another allocation if it's obviously an ACK
return json.Valid(raw) && byteContains(raw, needle)
}
func byteContains(b []byte, sub string) bool {
n := len(sub)
if n == 0 || len(b) < n {
return false
}
// naive search; sufficient for small frames
for i := 0; i <= len(b)-n; i++ {
if string(b[i:i+n]) == sub {
return true
}
}
return false
} }

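The rewritten provider speaks the combined-stream protocol with the wsReq/wsAck/combinedEvent shapes shown above. A minimal sketch of the wire frames involved, using a struct that mirrors wsReq (the stream name btcusdt@aggTrade is only an example):

package main

import (
	"encoding/json"
	"fmt"
)

// wsReq mirrors the request shape defined in the provider above.
type wsReq struct {
	Method string   `json:"method"`
	Params []string `json:"params,omitempty"`
	ID     uint64   `json:"id"`
}

func main() {
	b, _ := json.Marshal(wsReq{Method: "SUBSCRIBE", Params: []string{"btcusdt@aggTrade"}, ID: 1})
	fmt.Println(string(b)) // {"method":"SUBSCRIBE","params":["btcusdt@aggTrade"],"id":1}

	// A successful acknowledgement comes back as {"result":null,"id":1}, and combined-stream
	// data frames arrive as {"stream":"btcusdt@aggTrade","data":{...}}, which is what the
	// read loop distinguishes via wsAck and combinedEvent.
}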

@@ -6,8 +6,8 @@ type Provider interface {
 	Start() error
 	Stop()
-	StartStream(key string, destination chan<- domain.Message) <-chan error
-	StopStream(key string) <-chan error
+	StartStreams(keys []string) <-chan error
+	StopStreams(key []string) <-chan error
 	Fetch(key string) (domain.Message, error)

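With this change, callers subscribe whole batches and wait on a single ack channel per call. A sketch of the contract as it could look after the commit; IsStreamActive and IsValidSubject are assumed from their use by the manager, and Fetch/domain.Message are elided to keep the snippet self-contained:

package provider

import (
	"errors"
	"time"
)

// Provider reflects the batch-oriented contract from the diff above.
type Provider interface {
	Start() error
	Stop()
	StartStreams(keys []string) <-chan error
	StopStreams(keys []string) <-chan error
	IsStreamActive(key string) bool
	IsValidSubject(key string, isFetch bool) bool
}

// AwaitAck shows how a caller would wait for one batch acknowledgement, mirroring
// the timeout handling the manager wraps around StartStreams/StopStreams.
func AwaitAck(ack <-chan error, timeout time.Duration) error {
	select {
	case err := <-ack:
		return err // nil on success, provider error otherwise
	case <-time.After(timeout):
		return errors.New("ack timeout")
	}
}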

@@ -1,7 +1,7 @@
 package router

 import (
-	"fmt"
+	"log/slog"
 	"sync"

 	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
@@ -25,6 +25,7 @@ func (r *Router) IncomingChannel() chan<- domain.Message {
 }

 func (r *Router) Run() {
+	slog.Default().Info("router started", "cmp", "router")
 	for msg := range r.incoming {
 		r.mu.RLock()
 		channels := r.routes[msg.Identifier]
@@ -33,7 +34,7 @@ func (r *Router) Run() {
 			select {
 			case ch <- msg:
 			default:
-				fmt.Println("Router could not push message to a full buffer...") // TODO: Handle full buffer case more gracefully
+				slog.Default().Warn("dropping message due to backpressure", "cmp", "router", "identifier", msg.Identifier.Key())
 			}
 		}
 		r.mu.RUnlock()
@@ -44,6 +45,8 @@ func (r *Router) RegisterRoute(id domain.Identifier, ch chan<- domain.Message) {
 	r.mu.Lock()
 	r.routes[id] = append(r.routes[id], ch)
 	r.mu.Unlock()
+
+	slog.Default().Debug("registered route", "cmp", "router", "identifier", id.Key(), "channel", ch)
 }

 func (r *Router) DeregisterRoute(id domain.Identifier, ch chan<- domain.Message) {
@@ -62,4 +65,6 @@ func (r *Router) DeregisterRoute(id domain.Identifier, ch chan<- domain.Message)
 		r.routes[id] = slice
 	}
 	r.mu.Unlock()
+
+	slog.Default().Debug("deregistered route", "cmp", "router", "identifier", id.Key(), "channel", ch)
 }


@@ -46,7 +46,6 @@ func (s *GRPCStreamingServer) ConnectStream(req *pb.ConnectStreamRequest, stream
 		if err := stream.Send(&pb.Message{
 			Identifier: &pb.Identifier{Key: msg.Identifier.Key()},
 			Payload:    msg.Payload,
-			Encoding:   string(msg.Encoding),
 		}); err != nil {
 			return err
 		}