From c512f51a57357ef523f5ad1dc7005e1e1e690765 Mon Sep 17 00:00:00 2001 From: Phillip Michelsen Date: Thu, 9 Oct 2025 15:53:02 +0000 Subject: [PATCH] Resolve "Deprecate Providers package in favor of Worker" --- pkg/pb/data_service/data_service.proto | 6 +- .../data_service/cmd/data_service/main.go | 75 +-- services/data_service/cmd/stream_tap/main.go | 308 --------- .../data_service/cmd/stream_tap_v2/main.go | 331 ---------- .../internal/domain/identifier.go | 274 +++++--- .../internal/domain/identifier_test.go | 141 +++++ .../data_service/internal/domain/message.go | 1 + .../data_service/internal/domain/pattern.go | 341 ++++++++-- .../internal/domain/pattern_test.go | 209 ++++++ .../data_service/internal/manager/commands.go | 93 ++- .../data_service/internal/manager/helper.go | 26 + .../data_service/internal/manager/manager.go | 595 ++++++++++++------ .../data_service/internal/manager/session.go | 251 ++++++-- .../internal/manager/worker_registry.go | 120 ++++ .../internal/provider/provider.go | 16 - .../provider/providers/test/test_provider.go | 542 ---------------- .../data_service/internal/router/partition.go | 10 +- .../data_service/internal/router/routerAdv.go | 5 +- .../internal/server/gprc_control_server.go | 82 --- .../internal/server/grpc_streaming_server.go | 54 -- .../server/socket_streaming_server.go | 242 ------- .../data_service/internal/worker/registry.go | 68 -- .../data_service/internal/worker/worker.go | 144 +---- .../workers}/binance/ws/binance_futures.go | 0 .../workers}/binance/ws/shard.go | 0 .../workers}/binance/ws/subjects.go | 0 26 files changed, 1695 insertions(+), 2239 deletions(-) delete mode 100644 services/data_service/cmd/stream_tap/main.go delete mode 100644 services/data_service/cmd/stream_tap_v2/main.go create mode 100644 services/data_service/internal/domain/identifier_test.go create mode 100644 services/data_service/internal/domain/pattern_test.go create mode 100644 services/data_service/internal/manager/worker_registry.go delete mode 100644 services/data_service/internal/provider/provider.go delete mode 100644 services/data_service/internal/provider/providers/test/test_provider.go delete mode 100644 services/data_service/internal/server/gprc_control_server.go delete mode 100644 services/data_service/internal/server/grpc_streaming_server.go delete mode 100644 services/data_service/internal/server/socket_streaming_server.go delete mode 100644 services/data_service/internal/worker/registry.go rename services/data_service/internal/{provider/providers => worker/workers}/binance/ws/binance_futures.go (100%) rename services/data_service/internal/{provider/providers => worker/workers}/binance/ws/shard.go (100%) rename services/data_service/internal/{provider/providers => worker/workers}/binance/ws/subjects.go (100%) diff --git a/pkg/pb/data_service/data_service.proto b/pkg/pb/data_service/data_service.proto index a99eb20..47ff69e 100644 --- a/pkg/pb/data_service/data_service.proto +++ b/pkg/pb/data_service/data_service.proto @@ -18,6 +18,10 @@ message Identifier { string key = 1; } +message Pattern { + string key = 2; +} + message Message { Identifier identifier = 1; bytes payload = 2; @@ -29,7 +33,7 @@ message StartStreamResponse { string stream_uuid = 1; } message ConfigureStreamRequest { string stream_uuid = 1; - repeated Identifier identifiers = 2; + repeated Pattern patterns = 2; } message ConfigureStreamResponse {} diff --git a/services/data_service/cmd/data_service/main.go b/services/data_service/cmd/data_service/main.go index aac2c7a..49e8254 100644 --- 
a/services/data_service/cmd/data_service/main.go +++ b/services/data_service/cmd/data_service/main.go @@ -2,20 +2,13 @@ package main import ( "log/slog" - "net" "os" "time" "github.com/lmittmann/tint" - pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider/providers/binance/ws" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider/providers/test" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/server" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/worker" - "google.golang.org/grpc" - "google.golang.org/grpc/reflection" ) func initLogger() *slog.Logger { @@ -58,72 +51,8 @@ func main() { // Setup wr := worker.NewRegistry() - - r := router.NewRouter(2048) - m := manager.NewManager(r, wr) - - // Providers - - testProvider := test.NewTestProvider(r.IncomingChannel(), time.Microsecond*100) - if err := m.AddProvider("test_provider", testProvider); err != nil { - slog.Error("add provider failed", "err", err) - os.Exit(1) - } - - binanceFuturesWebsocket := ws.NewBinanceFuturesWebsocket(ws.Config{}, r.IncomingChannel()) - if err := m.AddProvider("binance_futures", binanceFuturesWebsocket); err != nil { - slog.Error("add provider failed", "err", err) - os.Exit(1) - } - - // gRPC Control Server - grpcControlServer := grpc.NewServer() - go func() { - pb.RegisterDataServiceControlServer(grpcControlServer, server.NewGRPCControlServer(m)) - reflection.Register(grpcControlServer) - lis, err := net.Listen("tcp", ":50051") - if err != nil { - slog.Error("listen failed", "cmp", "grpc-control", "addr", ":50051", "err", err) - os.Exit(1) - } - slog.Info("listening", "cmp", "grpc-control", "addr", ":50051") - if err := grpcControlServer.Serve(lis); err != nil { - slog.Error("serve failed", "cmp", "grpc-control", "err", err) - os.Exit(1) - } - }() - - // gRPC Streaming Server - grpcStreamingServer := grpc.NewServer() - go func() { - pb.RegisterDataServiceStreamingServer(grpcStreamingServer, server.NewGRPCStreamingServer(m)) - reflection.Register(grpcStreamingServer) - lis, err := net.Listen("tcp", ":50052") - if err != nil { - slog.Error("listen failed", "cmp", "grpc-streaming", "addr", ":50052", "err", err) - os.Exit(1) - } - slog.Info("listening", "cmp", "grpc-streaming", "addr", ":50052") - if err := grpcStreamingServer.Serve(lis); err != nil { - slog.Error("serve failed", "cmp", "grpc-streaming", "err", err) - os.Exit(1) - } - }() - - // Socket Streaming Server - socketStreamingServer := server.NewSocketStreamingServer(m) - go func() { - lis, err := net.Listen("tcp", ":50060") - if err != nil { - slog.Error("listen failed", "cmp", "socket-streaming", "addr", ":50060", "err", err) - os.Exit(1) - } - slog.Info("listening", "cmp", "socket-streaming", "addr", ":50060") - if err := socketStreamingServer.Serve(lis); err != nil { - slog.Error("serve failed", "cmp", "socket-streaming", "err", err) - os.Exit(1) - } - }() + r, _ := router.NewRouter("actor", 2048, 512) + _ = manager.NewManager(r, wr) select {} } diff --git a/services/data_service/cmd/stream_tap/main.go b/services/data_service/cmd/stream_tap/main.go deleted file mode 100644 index e86f1ce..0000000 --- a/services/data_service/cmd/stream_tap/main.go +++ /dev/null @@ -1,308 +0,0 @@ -package main - -import ( - 
"context" - "flag" - "fmt" - "math" - "os" - "os/signal" - "strings" - "sync/atomic" - "syscall" - "time" - - pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" - "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials/insecure" -) - -type idsFlag []string - -func (i *idsFlag) String() string { return strings.Join(*i, ",") } -func (i *idsFlag) Set(v string) error { - if v == "" { - return nil - } - *i = append(*i, v) - return nil -} - -func parseIDPair(s string) (provider, subject string, err error) { - parts := strings.SplitN(s, ":", 2) - if len(parts) != 2 || parts[0] == "" || parts[1] == "" { - return "", "", fmt.Errorf("want provider:subject, got %q", s) - } - return parts[0], parts[1], nil -} - -func toIdentifierKey(input string) (string, error) { - if strings.Contains(input, "::") { - return input, nil - } - prov, subj, err := parseIDPair(input) - if err != nil { - return "", err - } - return "raw::" + strings.ToLower(prov) + "." + subj, nil -} - -func waitReady(ctx context.Context, conn *grpc.ClientConn) error { - for { - s := conn.GetState() - if s == connectivity.Ready { - return nil - } - if !conn.WaitForStateChange(ctx, s) { - if ctx.Err() != nil { - return ctx.Err() - } - return fmt.Errorf("WaitForStateChange returned without state change") - } - } -} - -type streamStats struct { - TotalMsgs int64 - TotalBytes int64 - TickMsgs int64 - TickBytes int64 -} - -type stats struct { - TotalMsgs int64 - TotalBytes int64 - ByStream map[string]*streamStats -} - -func main() { - var ids idsFlag - var ctlAddr string - var strAddr string - var timeout time.Duration - var refresh time.Duration - - flag.Var(&ids, "id", "identifier (provider:subject or canonical key); repeatable") - flag.StringVar(&ctlAddr, "ctl", "127.0.0.1:50051", "gRPC control address") - flag.StringVar(&strAddr, "str", "127.0.0.1:50052", "gRPC streaming address") - flag.DurationVar(&timeout, "timeout", 10*time.Second, "start/config/connect timeout") - flag.DurationVar(&refresh, "refresh", 1*time.Second, "dashboard refresh interval") - flag.Parse() - - if len(ids) == 0 { - _, _ = fmt.Fprintln(os.Stderr, "provide at least one --id (provider:subject or canonical key)") - os.Exit(2) - } - - ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - defer cancel() - - // Control channel - ccCtl, err := grpc.NewClient( - ctlAddr, - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "new control client: %v\n", err) - os.Exit(1) - } - defer ccCtl.Close() - ccCtl.Connect() - - ctlConnCtx, cancelCtlConn := context.WithTimeout(ctx, timeout) - if err := waitReady(ctlConnCtx, ccCtl); err != nil { - cancelCtlConn() - _, _ = fmt.Fprintf(os.Stderr, "connect control: %v\n", err) - os.Exit(1) - } - cancelCtlConn() - - ctl := pb.NewDataServiceControlClient(ccCtl) - - // Start stream - ctxStart, cancelStart := context.WithTimeout(ctx, timeout) - startResp, err := ctl.StartStream(ctxStart, &pb.StartStreamRequest{}) - cancelStart() - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "StartStream: %v\n", err) - os.Exit(1) - } - streamUUID := startResp.GetStreamUuid() - fmt.Printf("stream: %s\n", streamUUID) - - // Configure identifiers - var pbIDs []*pb.Identifier - orderedIDs := make([]string, 0, len(ids)) - for _, s := range ids { - key, err := toIdentifierKey(s) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "bad --id: %v\n", err) - os.Exit(2) - } - pbIDs = append(pbIDs, 
&pb.Identifier{Key: key}) - orderedIDs = append(orderedIDs, key) // preserve CLI order - } - - ctxCfg, cancelCfg := context.WithTimeout(ctx, timeout) - _, err = ctl.ConfigureStream(ctxCfg, &pb.ConfigureStreamRequest{ - StreamUuid: streamUUID, - Identifiers: pbIDs, - }) - cancelCfg() - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "ConfigureStream: %v\n", err) - os.Exit(1) - } - fmt.Printf("configured %d identifiers\n", len(pbIDs)) - - // Streaming connection - ccStr, err := grpc.NewClient( - strAddr, - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "new streaming client: %v\n", err) - os.Exit(1) - } - defer ccStr.Close() - ccStr.Connect() - - strConnCtx, cancelStrConn := context.WithTimeout(ctx, timeout) - if err := waitReady(strConnCtx, ccStr); err != nil { - cancelStrConn() - _, _ = fmt.Fprintf(os.Stderr, "connect streaming: %v\n", err) - os.Exit(1) - } - cancelStrConn() - - str := pb.NewDataServiceStreamingClient(ccStr) - - streamCtx, streamCancel := context.WithCancel(ctx) - defer streamCancel() - - srv, err := str.ConnectStream(streamCtx, &pb.ConnectStreamRequest{StreamUuid: streamUUID}) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "ConnectStream: %v\n", err) - os.Exit(1) - } - fmt.Println("connected; streaming… (Ctrl-C to quit)") - - // Receiver goroutine → channel - type msgWrap struct { - idKey string - size int - err error - } - msgCh := make(chan msgWrap, 1024) - go func() { - for { - m, err := srv.Recv() - if err != nil { - msgCh <- msgWrap{err: err} - close(msgCh) - return - } - id := m.GetIdentifier().GetKey() - msgCh <- msgWrap{idKey: id, size: len(m.GetPayload())} - } - }() - - // Stats and dashboard - st := &stats{ByStream: make(map[string]*streamStats)} - seen := make(map[string]bool, len(orderedIDs)) - for _, id := range orderedIDs { - seen[id] = true - } - tick := time.NewTicker(refresh) - defer tick.Stop() - - clear := func() { fmt.Print("\033[H\033[2J") } - header := func() { - fmt.Printf("stream: %s now: %s refresh: %s\n", - streamUUID, time.Now().Format(time.RFC3339), refresh) - fmt.Println("--------------------------------------------------------------------------------------") - fmt.Printf("%-56s %10s %14s %12s %16s\n", "identifier", "msgs/s", "bytes/s", "total", "total_bytes") - fmt.Println("--------------------------------------------------------------------------------------") - } - - printAndReset := func() { - clear() - header() - - var totMsgsPS, totBytesPS float64 - for _, id := range orderedIDs { - s, ok := st.ByStream[id] - var msgsPS, bytesPS float64 - var totMsgs, totBytes int64 - if ok { - // Convert window counts into per-second rates. 
- msgsPS = float64(atomic.SwapInt64(&s.TickMsgs, 0)) / refresh.Seconds() - bytesPS = float64(atomic.SwapInt64(&s.TickBytes, 0)) / refresh.Seconds() - totMsgs = atomic.LoadInt64(&s.TotalMsgs) - totBytes = atomic.LoadInt64(&s.TotalBytes) - } - totMsgsPS += msgsPS - totBytesPS += bytesPS - fmt.Printf("%-56s %10d %14d %12d %16d\n", - id, - int64(math.Round(msgsPS)), - int64(math.Round(bytesPS)), - totMsgs, - totBytes, - ) - } - - fmt.Println("--------------------------------------------------------------------------------------") - fmt.Printf("%-56s %10d %14d %12d %16d\n", - "TOTAL", - int64(math.Round(totMsgsPS)), - int64(math.Round(totBytesPS)), - atomic.LoadInt64(&st.TotalMsgs), - atomic.LoadInt64(&st.TotalBytes), - ) - } - - for { - select { - case <-ctx.Done(): - fmt.Println("\nshutting down") - return - - case <-tick.C: - printAndReset() - - case mw, ok := <-msgCh: - if !ok { - return - } - if mw.err != nil { - if ctx.Err() != nil { - return - } - _, _ = fmt.Fprintf(os.Stderr, "recv: %v\n", mw.err) - os.Exit(1) - } - - // Maintain stable order: append new identifiers at first sight. - if !seen[mw.idKey] { - seen[mw.idKey] = true - orderedIDs = append(orderedIDs, mw.idKey) - } - - // Account - atomic.AddInt64(&st.TotalMsgs, 1) - atomic.AddInt64(&st.TotalBytes, int64(mw.size)) - - ss := st.ByStream[mw.idKey] - if ss == nil { - ss = &streamStats{} - st.ByStream[mw.idKey] = ss - } - atomic.AddInt64(&ss.TotalMsgs, 1) - atomic.AddInt64(&ss.TotalBytes, int64(mw.size)) - atomic.AddInt64(&ss.TickMsgs, 1) - atomic.AddInt64(&ss.TickBytes, int64(mw.size)) - } - } -} diff --git a/services/data_service/cmd/stream_tap_v2/main.go b/services/data_service/cmd/stream_tap_v2/main.go deleted file mode 100644 index e2e9941..0000000 --- a/services/data_service/cmd/stream_tap_v2/main.go +++ /dev/null @@ -1,331 +0,0 @@ -package main - -import ( - "bufio" - "context" - "encoding/binary" - "flag" - "fmt" - "io" - "math" - "net" - "os" - "os/signal" - "strings" - "sync/atomic" - "syscall" - "time" - - pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" - "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/protobuf/proto" -) - -type idsFlag []string - -func (i *idsFlag) String() string { return strings.Join(*i, ",") } -func (i *idsFlag) Set(v string) error { - if v == "" { - return nil - } - *i = append(*i, v) - return nil -} - -func parseIDPair(s string) (provider, subject string, err error) { - parts := strings.SplitN(s, ":", 2) - if len(parts) != 2 || parts[0] == "" || parts[1] == "" { - return "", "", fmt.Errorf("want provider:subject, got %q", s) - } - return parts[0], parts[1], nil -} - -func toIdentifierKey(input string) (string, error) { - if strings.Contains(input, "::") { - return input, nil - } - prov, subj, err := parseIDPair(input) - if err != nil { - return "", err - } - return "raw::" + strings.ToLower(prov) + "." 
+ subj, nil -} - -func waitReady(ctx context.Context, conn *grpc.ClientConn) error { - for { - s := conn.GetState() - if s == connectivity.Ready { - return nil - } - if !conn.WaitForStateChange(ctx, s) { - if ctx.Err() != nil { - return ctx.Err() - } - return fmt.Errorf("WaitForStateChange returned without state change") - } - } -} - -type streamStats struct { - TotalMsgs int64 - TotalBytes int64 - TickMsgs int64 - TickBytes int64 -} - -type stats struct { - TotalMsgs int64 - TotalBytes int64 - ByStream map[string]*streamStats -} - -func main() { - var ids idsFlag - var ctlAddr string - var strAddr string - var timeout time.Duration - var refresh time.Duration - - flag.Var(&ids, "id", "identifier (provider:subject or canonical key); repeatable") - flag.StringVar(&ctlAddr, "ctl", "127.0.0.1:50051", "gRPC control address") - flag.StringVar(&strAddr, "str", "127.0.0.1:50060", "socket streaming address host:port") - flag.DurationVar(&timeout, "timeout", 10*time.Second, "start/config/connect timeout") - flag.DurationVar(&refresh, "refresh", 1*time.Second, "dashboard refresh interval") - flag.Parse() - - if len(ids) == 0 { - _, _ = fmt.Fprintln(os.Stderr, "provide at least one --id (provider:subject or canonical key)") - os.Exit(2) - } - - ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - defer cancel() - - // Control channel - ccCtl, err := grpc.NewClient( - ctlAddr, - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "new control client: %v\n", err) - os.Exit(1) - } - defer ccCtl.Close() - ccCtl.Connect() - - ctlConnCtx, cancelCtlConn := context.WithTimeout(ctx, timeout) - if err := waitReady(ctlConnCtx, ccCtl); err != nil { - cancelCtlConn() - _, _ = fmt.Fprintf(os.Stderr, "connect control: %v\n", err) - os.Exit(1) - } - cancelCtlConn() - - ctl := pb.NewDataServiceControlClient(ccCtl) - - // Start stream - ctxStart, cancelStart := context.WithTimeout(ctx, timeout) - startResp, err := ctl.StartStream(ctxStart, &pb.StartStreamRequest{}) - cancelStart() - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "StartStream: %v\n", err) - os.Exit(1) - } - streamUUID := startResp.GetStreamUuid() - fmt.Printf("stream: %s\n", streamUUID) - - // Configure identifiers - var pbIDs []*pb.Identifier - orderedIDs := make([]string, 0, len(ids)) - for _, s := range ids { - key, err := toIdentifierKey(s) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "bad --id: %v\n", err) - os.Exit(2) - } - pbIDs = append(pbIDs, &pb.Identifier{Key: key}) - orderedIDs = append(orderedIDs, key) - } - - ctxCfg, cancelCfg := context.WithTimeout(ctx, timeout) - _, err = ctl.ConfigureStream(ctxCfg, &pb.ConfigureStreamRequest{ - StreamUuid: streamUUID, - Identifiers: pbIDs, - }) - cancelCfg() - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "ConfigureStream: %v\n", err) - os.Exit(1) - } - fmt.Printf("configured %d identifiers\n", len(pbIDs)) - - // Socket streaming connection - d := net.Dialer{Timeout: timeout, KeepAlive: 30 * time.Second} - conn, err := d.DialContext(ctx, "tcp", strAddr) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "dial socket: %v\n", err) - os.Exit(1) - } - defer conn.Close() - - if tc, ok := conn.(*net.TCPConn); ok { - _ = tc.SetNoDelay(true) - _ = tc.SetWriteBuffer(512 * 1024) - _ = tc.SetReadBuffer(512 * 1024) - _ = tc.SetKeepAlive(true) - _ = tc.SetKeepAlivePeriod(30 * time.Second) - } - - // Send the stream UUID followed by '\n' per socket server contract. 
- if _, err := io.WriteString(conn, streamUUID+"\n"); err != nil { - _, _ = fmt.Fprintf(os.Stderr, "send stream UUID: %v\n", err) - os.Exit(1) - } - fmt.Println("connected; streaming… (Ctrl-C to quit)") - - // Receiver goroutine → channel - type msgWrap struct { - idKey string - size int - err error - } - msgCh := make(chan msgWrap, 1024) - - go func() { - defer close(msgCh) - r := bufio.NewReaderSize(conn, 256*1024) - var hdr [4]byte - for { - if err := conn.SetReadDeadline(time.Now().Add(120 * time.Second)); err != nil { - msgCh <- msgWrap{err: err} - return - } - - if _, err := io.ReadFull(r, hdr[:]); err != nil { - msgCh <- msgWrap{err: err} - return - } - n := binary.BigEndian.Uint32(hdr[:]) - if n == 0 || n > 64*1024*1024 { - msgCh <- msgWrap{err: fmt.Errorf("invalid frame length: %d", n)} - return - } - buf := make([]byte, n) - if _, err := io.ReadFull(r, buf); err != nil { - msgCh <- msgWrap{err: err} - return - } - - var m pb.Message - if err := proto.Unmarshal(buf, &m); err != nil { - msgCh <- msgWrap{err: fmt.Errorf("unmarshal: %w", err)} - return - } - id := m.GetIdentifier().GetKey() - msgCh <- msgWrap{idKey: id, size: len(m.GetPayload())} - } - }() - - // Stats and dashboard - st := &stats{ByStream: make(map[string]*streamStats)} - seen := make(map[string]bool, len(orderedIDs)) - for _, id := range orderedIDs { - seen[id] = true - } - tick := time.NewTicker(refresh) - defer tick.Stop() - - clear := func() { fmt.Print("\033[H\033[2J") } - header := func() { - fmt.Printf("stream: %s now: %s refresh: %s\n", - streamUUID, time.Now().Format(time.RFC3339), refresh) - fmt.Println("--------------------------------------------------------------------------------------") - fmt.Printf("%-56s %10s %14s %12s %16s\n", "identifier", "msgs/s", "bytes/s", "total", "total_bytes") - fmt.Println("--------------------------------------------------------------------------------------") - } - - printAndReset := func() { - clear() - header() - - var totMsgsPS, totBytesPS float64 - for _, id := range orderedIDs { - s, ok := st.ByStream[id] - var msgsPS, bytesPS float64 - var totMsgs, totBytes int64 - if ok { - msgsPS = float64(atomic.SwapInt64(&s.TickMsgs, 0)) / refresh.Seconds() - bytesPS = float64(atomic.SwapInt64(&s.TickBytes, 0)) / refresh.Seconds() - totMsgs = atomic.LoadInt64(&s.TotalMsgs) - totBytes = atomic.LoadInt64(&s.TotalBytes) - } - totMsgsPS += msgsPS - totBytesPS += bytesPS - fmt.Printf("%-56s %10d %14d %12d %16d\n", - id, - int64(math.Round(msgsPS)), - int64(math.Round(bytesPS)), - totMsgs, - totBytes, - ) - } - - fmt.Println("--------------------------------------------------------------------------------------") - fmt.Printf("%-56s %10d %14d %12d %16d\n", - "TOTAL", - int64(math.Round(totMsgsPS)), - int64(math.Round(totBytesPS)), - atomic.LoadInt64(&st.TotalMsgs), - atomic.LoadInt64(&st.TotalBytes), - ) - } - - for { - select { - case <-ctx.Done(): - fmt.Println("\nshutting down") - return - - case <-tick.C: - printAndReset() - - case mw, ok := <-msgCh: - if !ok { - return - } - if mw.err != nil { - if ctx.Err() != nil { - return - } - if ne, ok := mw.err.(net.Error); ok && ne.Timeout() { - _, _ = fmt.Fprintln(os.Stderr, "recv timeout") - } else if mw.err == io.EOF { - _, _ = fmt.Fprintln(os.Stderr, "server closed stream") - } else { - _, _ = fmt.Fprintf(os.Stderr, "recv: %v\n", mw.err) - } - os.Exit(1) - } - - if !seen[mw.idKey] { - seen[mw.idKey] = true - orderedIDs = append(orderedIDs, mw.idKey) - } - - atomic.AddInt64(&st.TotalMsgs, 1) - atomic.AddInt64(&st.TotalBytes, 
int64(mw.size)) - - ss := st.ByStream[mw.idKey] - if ss == nil { - ss = &streamStats{} - st.ByStream[mw.idKey] = ss - } - atomic.AddInt64(&ss.TotalMsgs, 1) - atomic.AddInt64(&ss.TotalBytes, int64(mw.size)) - atomic.AddInt64(&ss.TickMsgs, 1) - atomic.AddInt64(&ss.TickBytes, int64(mw.size)) - } - } -} diff --git a/services/data_service/internal/domain/identifier.go b/services/data_service/internal/domain/identifier.go index d6b75da..3e62b55 100644 --- a/services/data_service/internal/domain/identifier.go +++ b/services/data_service/internal/domain/identifier.go @@ -1,4 +1,3 @@ -// Package domain defines external message identifiers. package domain import ( @@ -9,56 +8,96 @@ import ( var ErrBadIdentifier = errors.New("identifier: invalid format") -// Identifier is a lightweight wrapper around the canonical key. +// Identifier is an immutable canonical key. +// Canonical form: "namespace::tag1[] . tag2[k=v;foo=bar] . tag3[]" type Identifier struct{ key string } -// NewIdentifier builds a canonical key: "namespace::l1.l2[param=v;...] .l3". -// Labels and params are sorted for determinism. -func NewIdentifier(namespace string, labels map[string]map[string]string) Identifier { +// NewIdentifier builds a canonical key with strict validation. +// Tags and param keys are sorted. Tags with no params emit "name[]". +// Rejects on: empty namespace, bad tag names, bad keys/values. +func NewIdentifier(namespace string, tags map[string]map[string]string) (Identifier, error) { + ns := strings.TrimSpace(namespace) + if !validNamespace(ns) { + return Identifier{}, ErrBadIdentifier + } + + // Validate and copy to protect immutability and reject dup keys early. + clean := make(map[string]map[string]string, len(tags)) + for name, params := range tags { + n := strings.TrimSpace(name) + if !validIDTagName(n) { + return Identifier{}, ErrBadIdentifier + } + if _, exists := clean[n]; exists { + // impossible via map input, but keep the intent explicit + return Identifier{}, ErrBadIdentifier + } + if len(params) == 0 { + clean[n] = map[string]string{} + continue + } + dst := make(map[string]string, len(params)) + for k, v := range params { + kk := strings.TrimSpace(k) + vv := strings.TrimSpace(v) + if !validParamKey(kk) || !validIDParamValue(vv) { + return Identifier{}, ErrBadIdentifier + } + if _, dup := dst[kk]; dup { + return Identifier{}, ErrBadIdentifier + } + dst[kk] = vv + } + clean[n] = dst + } + var b strings.Builder - // rough prealloc: ns + "::" + avg label + some params - b.Grow(len(namespace) + 2 + 10*len(labels) + 20) + // Rough capacity hint. 
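+	// Illustrative round trip (hypothetical values): namespace "md" with tags
+	// {"trade": {"sym": "BTCUSDT"}, "agg": {}} canonicalizes, after sorting,
+	// to "md::agg[].trade[sym=BTCUSDT]".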
+ b.Grow(len(ns) + 2 + 16*len(clean) + 32) // namespace - b.WriteString(namespace) + b.WriteString(ns) b.WriteString("::") - // sort label names for stable output - labelNames := make([]string, 0, len(labels)) - for name := range labels { - labelNames = append(labelNames, name) + // stable tag order + names := make([]string, 0, len(clean)) + for n := range clean { + names = append(names, n) } - sort.Strings(labelNames) + sort.Strings(names) - for i, name := range labelNames { + for i, name := range names { if i > 0 { b.WriteByte('.') } b.WriteString(name) - // params (sorted) - params := labels[name] - if len(params) > 0 { - keys := make([]string, 0, len(params)) - for k := range params { - keys = append(keys, k) - } - sort.Strings(keys) - - b.WriteByte('[') - for j, k := range keys { - if j > 0 { - b.WriteByte(';') - } - b.WriteString(k) - b.WriteByte('=') - b.WriteString(params[k]) - } - b.WriteByte(']') + params := clean[name] + if len(params) == 0 { + b.WriteString("[]") + continue } + + // stable param order + keys := make([]string, 0, len(params)) + for k := range params { + keys = append(keys, k) + } + sort.Strings(keys) + + b.WriteByte('[') + for j, k := range keys { + if j > 0 { + b.WriteByte(';') + } + b.WriteString(k) + b.WriteByte('=') + b.WriteString(params[k]) + } + b.WriteByte(']') } - return Identifier{key: b.String()} + return Identifier{key: b.String()}, nil } // NewIdentifierFromRaw wraps a raw key without validation. @@ -67,65 +106,156 @@ func NewIdentifierFromRaw(raw string) Identifier { return Identifier{key: raw} } // Key returns the canonical key string. func (id Identifier) Key() string { return id.key } -// Parse returns namespace and labels from Key. -// Format: "namespace::label1.label2[param=a;foo=bar].label3" +// Parse returns namespace and tags from Key. +// Accepts "tag" (bare) as "tag[]". Emits "name[]"/"[k=v;...]". First token wins on duplicates. func (id Identifier) Parse() (string, map[string]map[string]string, error) { k := id.key + + // namespace i := strings.Index(k, "::") if i <= 0 { return "", nil, ErrBadIdentifier } - ns := k[:i] - if ns == "" { + ns := strings.TrimSpace(k[:i]) + if !validNamespace(ns) { return "", nil, ErrBadIdentifier } raw := k[i+2:] - lbls := make(map[string]map[string]string, 8) + tags := make(map[string]map[string]string, 8) if raw == "" { - return ns, lbls, nil + return ns, tags, nil } for tok := range strings.SplitSeq(raw, ".") { + tok = strings.TrimSpace(tok) if tok == "" { continue } - name, params, err := parseLabel(tok) - if err != nil || name == "" { - return "", nil, ErrBadIdentifier - } - lbls[name] = params - } - return ns, lbls, nil -} -// parseLabel parses "name" or "name[k=v;...]" into (name, params). 
-func parseLabel(tok string) (string, map[string]string, error) { - lb := strings.IndexByte(tok, '[') - if lb == -1 { - return tok, map[string]string{}, nil - } - rb := strings.LastIndexByte(tok, ']') - if rb == -1 || rb < lb { - return "", nil, ErrBadIdentifier - } - - name := tok[:lb] - paramStr := tok[lb+1 : rb] - params := map[string]string{} - if paramStr == "" { - return name, params, nil - } - - for pair := range strings.SplitSeq(paramStr, ";") { - if pair == "" { + lb := strings.IndexByte(tok, '[') + if lb == -1 { + // bare tag => empty params + name := strings.TrimSpace(tok) + if !validIDTagName(name) { + return "", nil, ErrBadIdentifier + } + if _, exists := tags[name]; !exists { + tags[name] = map[string]string{} + } continue } - kv := strings.SplitN(pair, "=", 2) - if len(kv) != 2 || kv[0] == "" { + + rb := strings.LastIndexByte(tok, ']') + if rb == -1 || rb < lb || rb != len(tok)-1 { return "", nil, ErrBadIdentifier } - params[kv[0]] = kv[1] + + name := strings.TrimSpace(tok[:lb]) + if !validIDTagName(name) { + return "", nil, ErrBadIdentifier + } + // first tag wins + if _, exists := tags[name]; exists { + continue + } + + body := tok[lb+1 : rb] + // forbid outer whitespace like "[ x=1 ]" + if body != strings.TrimSpace(body) { + return "", nil, ErrBadIdentifier + } + if body == "" { + tags[name] = map[string]string{} + continue + } + + // parse "k=v;foo=bar" + params := make(map[string]string, 4) + for pair := range strings.SplitSeq(body, ";") { + pair = strings.TrimSpace(pair) + if pair == "" { + continue + } + kv := strings.SplitN(pair, "=", 2) + if len(kv) != 2 { + return "", nil, ErrBadIdentifier + } + key := strings.TrimSpace(kv[0]) + val := strings.TrimSpace(kv[1]) + if !validParamKey(key) || !validIDParamValue(val) || val == "" { + return "", nil, ErrBadIdentifier + } + // first key wins + if _, dup := params[key]; !dup { + params[key] = val + } + } + tags[name] = params } - return name, params, nil + + return ns, tags, nil } + +// --- validation helpers --- + +func validNamespace(s string) bool { + if s == "" { + return false + } + for _, r := range s { + switch r { + case '[', ']', ':': + return false + } + if isSpace(r) { + return false + } + } + return true +} + +func validIDTagName(s string) bool { + if s == "" { + return false + } + for _, r := range s { + switch r { + case '[', ']', '.', ':': // added ':' + return false + } + if isSpace(r) { + return false + } + } + return true +} + +func validParamKey(s string) bool { + if s == "" { + return false + } + for _, r := range s { + switch r { + case '[', ']', ';', '=': + return false + } + if isSpace(r) { + return false + } + } + return true +} + +func validIDParamValue(s string) bool { + // allow spaces; forbid only bracket, pair, and kv delimiters + for _, r := range s { + switch r { + case '[', ']', ';', '=': + return false + } + } + return true +} + +func isSpace(r rune) bool { return r == ' ' || r == '\t' || r == '\n' || r == '\r' } diff --git a/services/data_service/internal/domain/identifier_test.go b/services/data_service/internal/domain/identifier_test.go new file mode 100644 index 0000000..130cff5 --- /dev/null +++ b/services/data_service/internal/domain/identifier_test.go @@ -0,0 +1,141 @@ +package domain + +import ( + "reflect" + "testing" +) + +func TestNewIdentifier_CanonicalAndValidation(t *testing.T) { + t.Run("canonical ordering and formatting", func(t *testing.T) { + id, err := NewIdentifier("ns", map[string]map[string]string{ + "b": {"y": "2", "x": "1"}, + "a": {}, + }) + if err != nil { + 
t.Fatalf("unexpected err: %v", err) + } + got := id.Key() + want := "ns::a[].b[x=1;y=2]" + if got != want { + t.Fatalf("key mismatch\ngot: %q\nwant: %q", got, want) + } + }) + + t.Run("trim whitespace and validate", func(t *testing.T) { + id, err := NewIdentifier(" ns ", map[string]map[string]string{ + " tag ": {" k ": " v "}, + }) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if id.Key() != "ns::tag[k=v]" { + t.Fatalf("unexpected canonical: %q", id.Key()) + } + }) + + t.Run("reject bad namespace", func(t *testing.T) { + cases := []string{"", "a:b", "a[b]"} + for _, ns := range cases { + if _, err := NewIdentifier(ns, nil); err == nil { + t.Fatalf("expected error for ns=%q", ns) + } + } + }) + + t.Run("reject bad tag names", func(t *testing.T) { + for _, name := range []string{"", "bad.", "bad[", "bad]", "a:b"} { + _, err := NewIdentifier("ns", map[string]map[string]string{ + name: {}, + }) + if err == nil { + t.Fatalf("expected error for tag name %q", name) + } + } + }) + + t.Run("reject bad param keys and values", func(t *testing.T) { + badKeys := []string{"", "k;", "k[", "k]", "k="} + for _, k := range badKeys { + if _, err := NewIdentifier("ns", map[string]map[string]string{ + "t": {k: "ok"}, + }); err == nil { + t.Fatalf("expected error for bad key %q", k) + } + } + for _, v := range []string{"bad;", "bad[", "bad]", "a=b"} { + if _, err := NewIdentifier("ns", map[string]map[string]string{ + "t": {"k": v}, + }); err == nil { + t.Fatalf("expected error for bad value %q", v) + } + } + }) +} + +func TestIdentifier_Parse_RoundTripAndTolerance(t *testing.T) { + t.Run("round trip from constructor", func(t *testing.T) { + id, err := NewIdentifier("ns", map[string]map[string]string{ + "a": {}, + "b": {"x": "1", "y": "2"}, + }) + if err != nil { + t.Fatal(err) + } + ns, tags, err := id.Parse() + if err != nil { + t.Fatal(err) + } + if ns != "ns" { + t.Fatalf("ns: got %q", ns) + } + want := map[string]map[string]string{"a": {}, "b": {"x": "1", "y": "2"}} + if !reflect.DeepEqual(tags, want) { + t.Fatalf("tags mismatch\ngot: %#v\nwant: %#v", tags, want) + } + }) + + t.Run("parse bare tag as empty params", func(t *testing.T) { + id := NewIdentifierFromRaw("ns::a.b[]") + _, tags, err := id.Parse() + if err != nil { + t.Fatal(err) + } + if len(tags["a"]) != 0 || len(tags["b"]) != 0 { + t.Fatalf("expected empty params, got %#v", tags) + } + }) + + t.Run("first token wins on duplicate tags and params", func(t *testing.T) { + id := NewIdentifierFromRaw("ns::t[x=1;y=2].t[x=9].u[k=1;k=2]") + _, tags, err := id.Parse() + if err != nil { + t.Fatal(err) + } + if tags["t"]["x"] != "1" || tags["t"]["y"] != "2" { + t.Fatalf("first tag should win, got %#v", tags["t"]) + } + if tags["u"]["k"] != "1" { + t.Fatalf("first param key should win, got %#v", tags["u"]) + } + }) + + t.Run("reject malformed", func(t *testing.T) { + bads := []string{ + "", "no_ns", "ns:onecolon", "::missingns::tag[]", "ns::tag[", "ns::tag]", "ns::[]", + "ns::tag[]junk", "ns::tag[x=1;y]", "ns::tag[=1]", "ns::tag[ x=1 ]", // spaces inside keys are rejected + } + for _, s := range bads { + _, _, err := NewIdentifierFromRaw(s).Parse() + if err == nil { + t.Fatalf("expected parse error for %q", s) + } + } + }) +} + +func TestIdentifier_NewThenParse_ForbidsColonInTagName(t *testing.T) { + _, err := NewIdentifier("ns", map[string]map[string]string{"a:b": {}}) + if err == nil { + t.Fatal("expected error due to ':' in tag name") + } +} diff --git a/services/data_service/internal/domain/message.go 
b/services/data_service/internal/domain/message.go index 516260d..11ee6fc 100644
--- a/services/data_service/internal/domain/message.go
+++ b/services/data_service/internal/domain/message.go
@@ -1,3 +1,4 @@
+// Package domain contains the core domain types: identifiers, patterns, and messages.
 package domain
 
 type Message struct {
diff --git a/services/data_service/internal/domain/pattern.go b/services/data_service/internal/domain/pattern.go
index 333434d..15aa013 100644
--- a/services/data_service/internal/domain/pattern.go
+++ b/services/data_service/internal/domain/pattern.go
@@ -1,45 +1,114 @@
 package domain
 
 import (
+	"errors"
 	"sort"
 	"strings"
 )
 
-type Pattern struct {
-	Namespace string
-	Labels    map[string]map[string]string
-	Exact     bool
+var ErrBadPattern = errors.New("pattern: invalid format")
+
+// ParamMatchKind selects how a tag's params must match.
+type ParamMatchKind uint8
+
+const (
+	MatchAny   ParamMatchKind = iota // "tag" or "tag[*]"
+	MatchNone                        // "tag[]"
+	MatchExact                       // "tag[k=v;...]"
+)
+
+// TagSpec is the per-tag constraint.
+type TagSpec struct {
+	Kind   ParamMatchKind
+	Params map[string]string // only for MatchExact; keys sorted on emit
 }
 
-// Canonical returns a canonical string representation of the Pattern struct
-// TODO: Ensure labels and namespaces are set to lowercase
-func (p *Pattern) Canonical() string {
-	var b strings.Builder
-	b.Grow(len(p.Namespace) + 10*len(p.Labels) + 20) // preallocate a rough size estimate
+// Pattern is an immutable canonical key.
+// Canonical form (tags unordered in input, sorted on emit):
+//
+//	namespace::tag1[]:tag2[*]:tag3[k=v;foo=bar].* // superset
+//	namespace::tag1[]:tag2[*]:tag3[k=v;foo=bar]   // exact set
+type Pattern struct{ key string }
 
-	b.WriteString(p.Namespace)
+// NewPattern builds the canonical key from structured input with strict validation.
+// If a tag name equals "*" it sets superset and omits it from canonical tags.
+func NewPattern(namespace string, tags map[string]TagSpec, superset bool) (Pattern, error) {
+	ns := strings.TrimSpace(namespace)
+	if !validNamespace(ns) {
+		return Pattern{}, ErrBadPattern
+	}
+
+	// Validate tags and normalize.
+	clean := make(map[string]TagSpec, len(tags))
+	for name, spec := range tags {
+		n := strings.TrimSpace(name)
+		if n == "*" {
+			superset = true
+			continue
+		}
+		if !validPatternTagName(n) {
+			return Pattern{}, ErrBadPattern
+		}
+		switch spec.Kind {
+		case MatchAny:
+			clean[n] = TagSpec{Kind: MatchAny}
+		case MatchNone:
+			clean[n] = TagSpec{Kind: MatchNone}
+		case MatchExact:
+			if len(spec.Params) == 0 {
+				// Treat empty exact as none.
+				clean[n] = TagSpec{Kind: MatchNone}
+				continue
+			}
+			dst := make(map[string]string, len(spec.Params))
+			for k, v := range spec.Params {
+				kk := strings.TrimSpace(k)
+				vv := strings.TrimSpace(v)
+				if !validParamKey(kk) || !validPatternParamValue(vv) {
+					return Pattern{}, ErrBadPattern
+				}
+				if _, dup := dst[kk]; dup {
+					return Pattern{}, ErrBadPattern
+				}
+				dst[kk] = vv
+			}
+			clean[n] = TagSpec{Kind: MatchExact, Params: dst}
+		default:
+			// Reject unknown kinds rather than silently defaulting.
+ return Pattern{}, ErrBadPattern + } + } + + var b strings.Builder + b.Grow(len(ns) + 2 + 16*len(clean) + 32 + 2) + + b.WriteString(ns) b.WriteString("::") - labelNames := make([]string, 0, len(p.Labels)) - for name := range p.Labels { - labelNames = append(labelNames, name) + names := make([]string, 0, len(clean)) + for n := range clean { + names = append(names, n) } - sort.Strings(labelNames) // sort the labels for determinism + sort.Strings(names) - for i, name := range labelNames { + for i, name := range names { if i > 0 { - b.WriteByte('|') + b.WriteByte(':') } b.WriteString(name) - params := p.Labels[name] - if len(params) > 0 { - keys := make([]string, 0, len(params)) - for k := range params { + spec := clean[name] + switch spec.Kind { + case MatchAny: + b.WriteString("[*]") + case MatchNone: + b.WriteString("[]") + case MatchExact: + keys := make([]string, 0, len(spec.Params)) + for k := range spec.Params { keys = append(keys, k) } - sort.Strings(keys) // sort params for determinism - + sort.Strings(keys) b.WriteByte('[') for j, k := range keys { if j > 0 { @@ -47,48 +116,232 @@ func (p *Pattern) Canonical() string { } b.WriteString(k) b.WriteByte('=') - b.WriteString(params[k]) + b.WriteString(spec.Params[k]) } b.WriteByte(']') } } - b.WriteString("::") - if p.Exact { - b.WriteString("t") - } else { - b.WriteString("f") + if superset { + if len(names) > 0 { + b.WriteByte('.') + } + b.WriteByte('*') } - - return b.String() + return Pattern{key: b.String()}, nil } -// Satisfies checks if a domain.Identifier satisfies the pattern. -func (p *Pattern) Satisfies(id Identifier) bool { - ns, idLabels, err := id.Parse() - if err != nil || ns != p.Namespace { +// NewPatternFromRaw wraps a raw key without validation. +func NewPatternFromRaw(raw string) Pattern { return Pattern{key: raw} } + +// Key returns the canonical key string. +func (p Pattern) Key() string { return p.key } + +// Parse returns namespace, tag specs, and superset flag. +// Accepts tokens: "tag", "tag[*]", "tag[]", "tag[k=v;...]". Also accepts ".*" suffix or a ":*" token anywhere. +// First token wins on duplicate tag names; first key wins on duplicate params. 
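+// Example (illustrative key): "md::trade[sym=BTCUSDT]:depth[*].*" parses to
+// namespace "md", specs {trade: MatchExact{sym=BTCUSDT}, depth: MatchAny},
+// and superset=true.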
+func (p Pattern) Parse() (string, map[string]TagSpec, bool, error) { + k := p.key + + // namespace + i := strings.Index(k, "::") + if i <= 0 { + return "", nil, false, ErrBadPattern + } + ns := strings.TrimSpace(k[:i]) + if !validNamespace(ns) { + return "", nil, false, ErrBadPattern + } + raw := k[i+2:] + + // suffix superset ".*" + superset := false + if strings.HasSuffix(raw, ".*") { + superset = true + raw = raw[:len(raw)-2] + } + + specs := make(map[string]TagSpec, 8) + if raw == "" { + return ns, specs, superset, nil + } + + for tok := range strings.SplitSeq(raw, ":") { + tok = strings.TrimSpace(tok) + if tok == "" { + continue + } + if tok == "*" { + superset = true + continue + } + + lb := strings.IndexByte(tok, '[') + if lb == -1 { + name := tok + if !validPatternTagName(name) { + return "", nil, false, ErrBadPattern + } + if _, exists := specs[name]; !exists { + specs[name] = TagSpec{Kind: MatchAny} + } + continue + } + + rb := strings.LastIndexByte(tok, ']') + if rb == -1 || rb < lb || rb != len(tok)-1 { + return "", nil, false, ErrBadPattern + } + + name := strings.TrimSpace(tok[:lb]) + if !validPatternTagName(name) { + return "", nil, false, ErrBadPattern + } + // first tag wins + if _, exists := specs[name]; exists { + continue + } + + rawBody := tok[lb+1 : rb] + // forbid outer whitespace like "[ x=1 ]" + if rawBody != strings.TrimSpace(rawBody) { + return "", nil, false, ErrBadPattern + } + body := strings.TrimSpace(rawBody) + + switch body { + case "": + specs[name] = TagSpec{Kind: MatchNone} + case "*": + specs[name] = TagSpec{Kind: MatchAny} + default: + params := make(map[string]string, 4) + for pair := range strings.SplitSeq(body, ";") { + pair = strings.TrimSpace(pair) + if pair == "" { + continue + } + kv := strings.SplitN(pair, "=", 2) + if len(kv) != 2 { + return "", nil, false, ErrBadPattern + } + key := strings.TrimSpace(kv[0]) + val := strings.TrimSpace(kv[1]) + if !validParamKey(key) || !validPatternParamValue(val) || val == "" { + return "", nil, false, ErrBadPattern + } + // first key wins + if _, dup := params[key]; !dup { + params[key] = val + } + } + specs[name] = TagSpec{Kind: MatchExact, Params: params} + } + } + + return ns, specs, superset, nil +} + +// Equal compares canonical keys. +func (p Pattern) Equal(q Pattern) bool { return p.key == q.key } + +// CompiledPattern is a parsed pattern optimized for matching. +type CompiledPattern struct { + ns string + superset bool + specs map[string]TagSpec +} + +// Compile parses and returns a compiled form. +func (p Pattern) Compile() (CompiledPattern, error) { + ns, specs, sup, err := p.Parse() + if err != nil { + return CompiledPattern{}, err + } + return CompiledPattern{ns: ns, specs: specs, superset: sup}, nil +} + +// Parse on CompiledPattern returns the structured contents without error. +func (cp CompiledPattern) Parse() (namespace string, tags map[string]TagSpec, superset bool) { + return cp.ns, cp.specs, cp.superset +} + +// Match parses id and tests it against the pattern. +// Returns false on parse error. +func (p Pattern) Match(id Identifier) bool { + cp, err := p.Compile() + if err != nil { + return false + } + return cp.Match(id) +} + +// Match tests id against the compiled pattern. +func (cp CompiledPattern) Match(id Identifier) bool { + ns, tags, err := id.Parse() + if err != nil || ns != cp.ns { return false } - // Every pattern label must be present in the identifier. - for lname, wantParams := range p.Labels { - haveParams, ok := idLabels[lname] + // All pattern tags must be satisfied. 
+ for name, spec := range cp.specs { + params, ok := tags[name] if !ok { return false } - // If pattern specifies params, they must be a subset of identifier's params. - for k, v := range wantParams { - hv, ok := haveParams[k] - if !ok || hv != v { + switch spec.Kind { + case MatchAny: + // any or none is fine + case MatchNone: + if len(params) != 0 { return false } + case MatchExact: + if len(params) != len(spec.Params) { + return false + } + for k, v := range spec.Params { + if params[k] != v { + return false + } + } + default: + return false } - // If pattern has no params for this label, it matches any/none params in the identifier. } - // Exact applies to label names only: no extras allowed. - if p.Exact && len(idLabels) != len(p.Labels) { + // If exact-set match, forbid extra tags. + if !cp.superset && len(tags) != len(cp.specs) { return false } return true } + +// --- validation helpers --- + +func validPatternTagName(s string) bool { + if s == "" || s == "*" { + return false + } + for _, r := range s { + switch r { + case '[', ']', ':': + return false + } + if isSpace(r) { + return false + } + } + return true +} + +func validPatternParamValue(s string) bool { + // allow spaces; forbid only bracket, pair, and kv delimiters + for _, r := range s { + switch r { + case '[', ']', ';', '=': + return false + } + } + return true +} diff --git a/services/data_service/internal/domain/pattern_test.go b/services/data_service/internal/domain/pattern_test.go new file mode 100644 index 0000000..2132212 --- /dev/null +++ b/services/data_service/internal/domain/pattern_test.go @@ -0,0 +1,209 @@ +package domain + +import ( + "reflect" + "testing" +) + +func TestNewPattern_Canonical_And_Superset(t *testing.T) { + t.Run("canonical ordering and kinds", func(t *testing.T) { + p, err := NewPattern("ns", map[string]TagSpec{ + "b": {Kind: MatchExact, Params: map[string]string{"y": "2", "x": "1"}}, + "a": {Kind: MatchNone}, + "c": {Kind: MatchAny}, + }, false) + if err != nil { + t.Fatal(err) + } + if got, want := p.Key(), "ns::a[]:b[x=1;y=2]:c[*]"; got != want { + t.Fatalf("got %q want %q", got, want) + } + }) + + t.Run("superset via flag", func(t *testing.T) { + p, err := NewPattern("ns", map[string]TagSpec{"a": {Kind: MatchNone}}, true) + if err != nil { + t.Fatal(err) + } + if got, want := p.Key(), "ns::a[].*"; got != want { + t.Fatalf("got %q want %q", got, want) + } + }) + + t.Run("superset via '*' tag anywhere", func(t *testing.T) { + p, err := NewPattern("ns", map[string]TagSpec{ + "*": {Kind: MatchAny}, // triggers superset; omitted from canonical + "a": {Kind: MatchNone}, + }, false) + if err != nil { + t.Fatal(err) + } + if got, want := p.Key(), "ns::a[].*"; got != want { + t.Fatalf("got %q want %q", got, want) + } + }) + + t.Run("trim and validate", func(t *testing.T) { + p, err := NewPattern(" ns ", map[string]TagSpec{ + " tag ": {Kind: MatchAny}, + }, false) + if err != nil { + t.Fatal(err) + } + if p.Key() != "ns::tag[*]" { + t.Fatalf("unexpected canonical: %q", p.Key()) + } + }) + + t.Run("reject invalid inputs", func(t *testing.T) { + _, err := NewPattern("", nil, false) + if err == nil { + t.Fatal("expected error for empty namespace") + } + _, err = NewPattern("ns", map[string]TagSpec{"": {Kind: MatchAny}}, false) + if err == nil { + t.Fatal("expected error for empty tag") + } + _, err = NewPattern("ns", map[string]TagSpec{"bad:": {Kind: MatchAny}}, false) + if err == nil { + t.Fatal("expected error for ':' in tag") + } + _, err = NewPattern("ns", map[string]TagSpec{"a": {Kind: MatchExact, 
Params: map[string]string{"": "1"}}}, false) + if err == nil { + t.Fatal("expected error for empty param key") + } + _, err = NewPattern("ns", map[string]TagSpec{"a": {Kind: MatchExact, Params: map[string]string{"k": "bad;val"}}}, false) + if err == nil { + t.Fatal("expected error for bad param value") + } + }) + + t.Run("MatchExact with empty params downgrades to []", func(t *testing.T) { + // Behavior matches current constructor: empty exact => MatchNone + p, err := NewPattern("ns", map[string]TagSpec{ + "a": {Kind: MatchExact, Params: map[string]string{}}, + }, false) + if err != nil { + t.Fatal(err) + } + if p.Key() != "ns::a[]" { + t.Fatalf("unexpected canonical for empty exact: %q", p.Key()) + } + }) +} + +func TestPattern_Parse_Tokens_And_SupersetRecognition(t *testing.T) { + t.Run("accept :* token and .*", func(t *testing.T) { + ns, specs, sup, err := NewPatternFromRaw("ns::a[]:*:b[*]").Parse() + if err != nil { + t.Fatal(err) + } + if ns != "ns" || !sup { + t.Fatalf("ns=%q sup=%v", ns, sup) + } + if specs["a"].Kind != MatchNone || specs["b"].Kind != MatchAny { + t.Fatalf("unexpected specs: %#v", specs) + } + + _, specs2, sup2, err := NewPatternFromRaw("ns::a[]:b[*].*").Parse() + if err != nil || !sup2 { + t.Fatalf("parse superset suffix failed: err=%v sup=%v", err, sup2) + } + if !reflect.DeepEqual(specs, specs2) { + t.Fatalf("specs mismatch between forms") + } + }) + + t.Run("first-wins on duplicate tags and params", func(t *testing.T) { + _, specs, sup, err := NewPatternFromRaw("ns::t[x=1;y=2]:t[*]:u[k=1;k=2]").Parse() + if err != nil || sup { + t.Fatalf("err=%v sup=%v", err, sup) + } + if specs["t"].Kind != MatchExact || specs["t"].Params["x"] != "1" { + t.Fatalf("first tag should win: %#v", specs["t"]) + } + if specs["u"].Params["k"] != "1" { + t.Fatalf("first param key should win: %#v", specs["u"]) + } + }) + + t.Run("reject malformed", func(t *testing.T) { + bads := []string{ + "", "no_ns", "ns:onecolon", "::missingns::tag[]", + "ns::tag[", "ns::tag]", "ns::[]", "ns::tag[]junk", + "ns::a[=1]", "ns::a[x=]", "ns::a[ x=1 ]", + } + for _, s := range bads { + _, _, _, err := NewPatternFromRaw(s).Parse() + if err == nil { + t.Fatalf("expected parse error for %q", s) + } + } + }) +} + +func TestPattern_Match_Matrix(t *testing.T) { + makeID := func(key string) Identifier { return NewIdentifierFromRaw(key) } + + t.Run("namespace mismatch", func(t *testing.T) { + p := NewPatternFromRaw("ns::a[]") + if p.Match(makeID("other::a[]")) { + t.Fatal("should not match different namespace") + } + }) + + t.Run("MatchAny accepts empty and nonempty", func(t *testing.T) { + p := NewPatternFromRaw("ns::a[*]") + if !p.Match(makeID("ns::a[]")) || !p.Match(makeID("ns::a[x=1]")) { + t.Fatal("MatchAny should accept both") + } + }) + + t.Run("MatchNone requires empty", func(t *testing.T) { + p := NewPatternFromRaw("ns::a[]") + if !p.Match(makeID("ns::a[]")) { + t.Fatal("empty should match") + } + if p.Match(makeID("ns::a[x=1]")) { + t.Fatal("nonempty should not match MatchNone") + } + }) + + t.Run("MatchExact equals, order independent", func(t *testing.T) { + p := NewPatternFromRaw("ns::a[x=1;y=2]") + if !p.Match(makeID("ns::a[y=2;x=1]")) { + t.Fatal("param order should not matter") + } + if p.Match(makeID("ns::a[x=1]")) { + t.Fatal("missing param should fail") + } + if p.Match(makeID("ns::a[x=1;y=2;z=3]")) { + t.Fatal("extra param should fail exact") + } + if p.Match(makeID("ns::a[x=9;y=2]")) { + t.Fatal("different value should fail") + } + }) + + t.Run("exact-set vs superset", func(t *testing.T) { + 
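+		// The two patterns below differ only in the trailing ".*": without it
+		// the identifier's tag set must equal the pattern's tag set; with it,
+		// extra identifier tags are tolerated.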
exact := NewPatternFromRaw("ns::a[]:b[*]") + super := NewPatternFromRaw("ns::a[]:b[*].*") + + if !exact.Match(makeID("ns::a[].b[x=1]")) { + t.Fatal("exact should match same set") + } + if exact.Match(makeID("ns::a[].b[x=1].c[]")) { + t.Fatal("exact should not allow extra tags") + } + if !super.Match(makeID("ns::a[].b[x=1].c[]")) { + t.Fatal("superset should allow extra tags") + } + }) + + t.Run("all pattern tags must exist", func(t *testing.T) { + p := NewPatternFromRaw("ns::a[]:b[*]") + if p.Match(makeID("ns::a[]")) { + t.Fatal("missing b should fail") + } + }) +} diff --git a/services/data_service/internal/manager/commands.go b/services/data_service/internal/manager/commands.go index 6aabcf4..aff8bde 100644 --- a/services/data_service/internal/manager/commands.go +++ b/services/data_service/internal/manager/commands.go @@ -1,74 +1,69 @@ package manager import ( - "time" - "github.com/google/uuid" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider" ) -// Commands posted into the manager loop. One struct per action. -type addProviderCmd struct { - name string - p provider.Provider - resp chan addProviderResult +// Session Commands + +type createSessionCommand struct { + resp chan createSessionResult } -type addProviderResult struct { - err error +type createSessionResult struct { + sid uuid.UUID } -type removeProviderCmd struct { - name string - resp chan removeProviderResult -} - -type removeProviderResult struct { - err error -} - -type newSessionCmd struct { - idleAfter time.Duration - resp chan newSessionResult -} - -type newSessionResult struct { - id uuid.UUID -} - -type attachCmd struct { - sid uuid.UUID - inBuf, outBuf int - resp chan attachResult -} - -type attachResult struct { - cin chan<- domain.Message - cout <-chan domain.Message - err error -} - -type detachCmd struct { +type leaseSessionReceiverCommand struct { sid uuid.UUID - resp chan detachResult + resp chan leaseSessionReceiverResult } -type detachResult struct { - err error +type leaseSessionReceiverResult struct { + receiveFunc func() (domain.Message, error) + err error } -type configureCmd struct { +type leaseSessionSenderCommand struct { sid uuid.UUID - next []domain.Pattern - resp chan configureResult + resp chan leaseSessionSenderResult } -type configureResult struct { +type leaseSessionSenderResult struct { + sendFunc func(domain.Message) error + err error +} + +type releaseSessionReceiverCommand struct { + sid uuid.UUID + resp chan releaseSessionReceiverResult +} + +type releaseSessionReceiverResult struct { err error } -type closeSessionCmd struct { +type releaseSessionSenderCommand struct { + sid uuid.UUID + resp chan releaseSessionSenderResult +} + +type releaseSessionSenderResult struct { + err error +} + +type configureSessionCommand struct { + sid uuid.UUID + cfg any + resp chan configureSessionResult +} + +type configureSessionResult struct { + err error +} + +type closeSessionCommand struct { sid uuid.UUID resp chan closeSessionResult } diff --git a/services/data_service/internal/manager/helper.go b/services/data_service/internal/manager/helper.go index 5d04392..6187b07 100644 --- a/services/data_service/internal/manager/helper.go +++ b/services/data_service/internal/manager/helper.go @@ -1 +1,27 @@ package manager + +func workerEntryKey(w WorkerEntry) string { + return w.Type + "|" + string(w.Spec) + "|" + string(w.Unit) +} + +func workerEntryDiffs(old, nw []WorkerEntry) (added, removed 
[]WorkerEntry) {
	oldKeys := make(map[string]struct{}, len(old))
	newKeys := make(map[string]struct{}, len(nw))

	for _, w := range old {
		oldKeys[workerEntryKey(w)] = struct{}{}
	}
	for _, w := range nw {
		k := workerEntryKey(w)
		newKeys[k] = struct{}{}
		if _, ok := oldKeys[k]; !ok {
			added = append(added, w)
		}
	}
	for _, w := range old {
		if _, ok := newKeys[workerEntryKey(w)]; !ok {
			removed = append(removed, w)
		}
	}
	return added, removed
}
diff --git a/services/data_service/internal/manager/manager.go b/services/data_service/internal/manager/manager.go
index 639bff3..3a2c9c9 100644
--- a/services/data_service/internal/manager/manager.go
+++ b/services/data_service/internal/manager/manager.go
@@ -1,277 +1,500 @@
-// Package manager implements the core orchestration logic for data providers and client sessions
-// in the tessera data_service. It manages provider registration, session lifecycle, client attachment,
-// stream configuration, and routing of messages between clients and providers.
+// Package manager implements the single-goroutine actor that owns session lifecycle, sender/receiver leases, worker instances, and router wiring for the data_service.
 package manager
 
 import (
 	"errors"
-	"fmt"
 	"log/slog"
-	"time"
 
 	"github.com/google/uuid"
 	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
-	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider"
 	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router"
 	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/worker"
)
 
-var (
-	ErrSessionNotFound       = errors.New("session not found")
-	ErrClientAlreadyAttached = errors.New("client already attached")
-	ErrClientNotAttached     = errors.New("client not attached")
-)
+var ErrSessionNotFound = errors.New("session not found")
 
 // Manager is a single-goroutine actor that owns all state.
 type Manager struct {
-	// Command channel
-	cmdCh chan any
+	cmdCh    chan any
+	sessions map[uuid.UUID]*session
+	router   *router.Router
 
-	// State (loop-owned)
-	providers map[string]provider.Provider
-	sessions  map[uuid.UUID]*session
-
-	// Router
-	router *router.Router
+	workerRegistry      *WorkerRegistry
+	workerInstances     map[string]map[string]worker.Worker
+	workerUnitRefCounts map[string]map[string]map[string]int
 }
 
 // NewManager creates a manager and starts its run loop.
-func NewManager(router *router.Router, workerRegistry *worker.Registry) *Manager {
+func NewManager(r *router.Router, _ *WorkerRegistry) *Manager {
 	m := &Manager{
-		cmdCh:     make(chan any, 256),
-		providers: make(map[string]provider.Provider),
-		sessions:  make(map[uuid.UUID]*session),
-		router:    router,
+		cmdCh:    make(chan any, 256),
+		sessions: make(map[uuid.UUID]*session),
+		router:   r,
 	}
 
-	go router.Start()
+	go r.Start()
 	go m.run()
 
-	slog.Default().Info("manager started", slog.String("cmp", "manager"))
-
 	return m
 }
 
 // API
 
-// AddProvider adds and starts a new provider.
-func (m *Manager) AddProvider(name string, p provider.Provider) error {
-	slog.Default().Debug("add provider request", slog.String("cmp", "manager"), slog.String("name", name))
-	resp := make(chan addProviderResult, 1)
-	m.cmdCh <- addProviderCmd{name: name, p: p, resp: resp}
-
+// CreateSession creates a new session. Arms a 1m idle timer immediately.
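+//
+// A typical client round trip against this API (illustrative; error handling
+// elided):
+//
+//	sid := m.CreateSession()
+//	recv, _ := m.LeaseSessionReceiver(sid)
+//	defer m.ReleaseSessionReceiver(sid)
+//	msg, _ := recv()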
+func (m *Manager) CreateSession() uuid.UUID {
+	slog.Default().Debug("create session request", slog.String("cmp", "manager"))
+	resp := make(chan createSessionResult, 1)
+	m.cmdCh <- createSessionCommand{resp: resp}
+	r := <-resp
+	slog.Default().Info("new session created", slog.String("cmp", "manager"), slog.String("session", r.sid.String()))
+	return r.sid
+}
-	r := <-resp
-
-	slog.Default().Info("provider added", slog.String("cmp", "manager"), slog.String("name", name))
-	return r.err
-}

+// LeaseSessionReceiver leases the session's receiver and returns its receive func.
+// Release it again via ReleaseSessionReceiver.
+func (m *Manager) LeaseSessionReceiver(sid uuid.UUID) (func() (domain.Message, error), error) {
+	slog.Default().Debug("lease session receiver request", slog.String("cmp", "manager"), slog.String("session", sid.String()))
+	resp := make(chan leaseSessionReceiverResult, 1)
+	m.cmdCh <- leaseSessionReceiverCommand{sid: sid, resp: resp}
+	r := <-resp
+	return r.receiveFunc, r.err
+}
+
+// LeaseSessionSender leases the session's sender and returns its send func.
+// Release it again via ReleaseSessionSender.
+func (m *Manager) LeaseSessionSender(sid uuid.UUID) (func(domain.Message) error, error) {
+	slog.Default().Debug("lease session sender request", slog.String("cmp", "manager"), slog.String("session", sid.String()))
+	resp := make(chan leaseSessionSenderResult, 1)
+	m.cmdCh <- leaseSessionSenderCommand{sid: sid, resp: resp}
+	r := <-resp
+	return r.sendFunc, r.err
+}
+
+// ReleaseSessionReceiver releases the currently held receiver lease.
+func (m *Manager) ReleaseSessionReceiver(sid uuid.UUID) error {
+	slog.Default().Debug("release session receiver request", slog.String("cmp", "manager"), slog.String("session", sid.String()))
+	resp := make(chan releaseSessionReceiverResult, 1)
+	m.cmdCh <- releaseSessionReceiverCommand{sid: sid, resp: resp}
+	r := <-resp
+	return r.err
+}

-// RemoveProvider stops and removes a provider, cleaning up all sessions.
-func (m *Manager) RemoveProvider(name string) error {
-	slog.Default().Debug("remove provider request", slog.String("cmp", "manager"), slog.String("name", name))
-	resp := make(chan removeProviderResult, 1)
-	m.cmdCh <- removeProviderCmd{name: name, resp: resp}
-
+// ReleaseSessionSender releases the currently held sender lease.
+func (m *Manager) ReleaseSessionSender(sid uuid.UUID) error {
+	slog.Default().Debug("release session sender request", slog.String("cmp", "manager"), slog.String("session", sid.String()))
+	resp := make(chan releaseSessionSenderResult, 1)
+	m.cmdCh <- releaseSessionSenderCommand{sid: sid, resp: resp}
 	r := <-resp
-
-	slog.Default().Info("provider removed", slog.String("cmp", "manager"), slog.String("name", name))
 	return r.err
 }

-// NewSession creates a new session with the given idle timeout.
-func (m *Manager) NewSession(idleAfter time.Duration) uuid.UUID {
-	slog.Default().Debug("new session request", slog.String("cmp", "manager"), slog.Duration("idle_after", idleAfter))
-	resp := make(chan newSessionResult, 1)
-	m.cmdCh <- newSessionCmd{idleAfter: idleAfter, resp: resp}
-
+// ConfigureSession applies a session config; patterns take effect on the next receiver lease.
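+// cfg must be a manager.SessionConfig; any other concrete type is rejected
+// with ErrBadConfig. Worker deltas are applied with snapshot/rollback, so a
+// failed configure leaves the previous worker set and refcounts intact.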
+func (m *Manager) ConfigureSession(sid uuid.UUID, cfg any) error { + slog.Default().Debug("configure session request", slog.String("cmp", "manager"), slog.String("session", sid.String())) + resp := make(chan configureSessionResult, 1) + m.cmdCh <- configureSessionCommand{sid: sid, cfg: cfg, resp: resp} r := <-resp - - slog.Default().Info("new session created", slog.String("cmp", "manager"), slog.String("session", r.id.String())) - return r.id -} - -// AttachClient attaches a client to a session, creates and returns client channels for the session. -func (m *Manager) AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error) { - slog.Default().Debug("attach client request", slog.String("cmp", "manager"), slog.String("session", id.String()), slog.Int("in_buf", inBuf), slog.Int("out_buf", outBuf)) - resp := make(chan attachResult, 1) - m.cmdCh <- attachCmd{sid: id, inBuf: inBuf, outBuf: outBuf, resp: resp} - - r := <-resp - - slog.Default().Info("client attached", slog.String("cmp", "manager"), slog.String("session", id.String())) - return r.cin, r.cout, r.err -} - -// DetachClient detaches the client from the session, closes client channels and arms timeout. -func (m *Manager) DetachClient(id uuid.UUID) error { - slog.Default().Debug("detach client request", slog.String("cmp", "manager"), slog.String("session", id.String())) - resp := make(chan detachResult, 1) - m.cmdCh <- detachCmd{sid: id, resp: resp} - - r := <-resp - - slog.Default().Info("client detached", slog.String("cmp", "manager"), slog.String("session", id.String())) return r.err } -// ConfigureSession sets the next set of patterns for the session, starting and stopping streams as needed. -func (m *Manager) ConfigureSession(id uuid.UUID, next []domain.Pattern) error { - slog.Default().Debug("configure session request", slog.String("cmp", "manager"), slog.String("session", id.String()), slog.Int("patterns", len(next))) - resp := make(chan configureResult, 1) - m.cmdCh <- configureCmd{sid: id, next: next, resp: resp} - - r := <-resp - - slog.Default().Info("session configured", slog.String("cmp", "manager"), slog.String("session", id.String()), slog.String("err", fmt.Sprintf("%v", r.err))) - return r.err -} - -// CloseSession closes and removes the session, cleaning up all bindings. -func (m *Manager) CloseSession(id uuid.UUID) error { - slog.Default().Debug("close session request", slog.String("cmp", "manager"), slog.String("session", id.String())) +// CloseSession closes and removes the session. +func (m *Manager) CloseSession(sid uuid.UUID) error { + slog.Default().Debug("close session request", slog.String("cmp", "manager"), slog.String("session", sid.String())) resp := make(chan closeSessionResult, 1) - m.cmdCh <- closeSessionCmd{sid: id, resp: resp} - + m.cmdCh <- closeSessionCommand{sid: sid, resp: resp} r := <-resp - - slog.Default().Info("session closed", slog.String("cmp", "manager"), slog.String("session", id.String())) return r.err } -// The main loop of the manager, processing commands serially. 
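+// A typical caller flow under the lease API, as a sketch: assumes a wired
+// *Manager m and a populated SessionConfig cfg; error handling elided.
+//
+//	sid := m.CreateSession()
+//	_ = m.ConfigureSession(sid, cfg)
+//	recv, _ := m.LeaseSessionReceiver(sid) // registers cfg.Patterns with the router
+//	send, _ := m.LeaseSessionSender(sid)   // send feeds router.Incoming()
+//	msg, _ := recv()                       // blocks until a routed message or release
+//	_ = send(msg)
+//	_ = m.ReleaseSessionReceiver(sid)
+//	_ = m.ReleaseSessionSender(sid)
+//	_ = m.CloseSession(sid)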
+// --- Loop --- + func (m *Manager) run() { - for { - msg := <-m.cmdCh + for msg := range m.cmdCh { switch c := msg.(type) { - case addProviderCmd: - m.handleAddProvider(c) - case removeProviderCmd: - m.handleRemoveProvider(c) - case newSessionCmd: + case createSessionCommand: m.handleNewSession(c) - case attachCmd: - m.handleAttach(c) - case detachCmd: - m.handleDetach(c) - case configureCmd: - m.handleConfigure(c) - case closeSessionCmd: + case leaseSessionReceiverCommand: + m.handleLeaseSessionReceiver(c) + case leaseSessionSenderCommand: + m.handleLeaseSessionSender(c) + case releaseSessionReceiverCommand: + m.handleReleaseSessionReceiver(c) + case releaseSessionSenderCommand: + m.handleReleaseSessionSender(c) + case configureSessionCommand: + m.handleConfigureSession(c) + case closeSessionCommand: m.handleCloseSession(c) } } } -// Command handlers, run in loop goroutine. With a single goroutine, no locking is needed. +// --- Handlers --- -// handleAddProvider adds and starts a new provider. -func (m *Manager) handleAddProvider(cmd addProviderCmd) { - if _, ok := m.providers[cmd.name]; ok { - slog.Default().Warn("provider already exists", slog.String("cmp", "manager"), slog.String("name", cmd.name)) - cmd.resp <- addProviderResult{err: fmt.Errorf("provider exists: %s", cmd.name)} - return +func (m *Manager) handleNewSession(cmd createSessionCommand) { + var s *session + idleCallback := func() { // Generate callback function for the session to be created. + resp := make(chan closeSessionResult, 1) + m.cmdCh <- closeSessionCommand{sid: s.id, resp: resp} + <-resp } - if err := cmd.p.Start(); err != nil { - slog.Default().Warn("failed to start provider", slog.String("cmp", "manager"), slog.String("name", cmd.name), slog.String("err", err.Error())) - cmd.resp <- addProviderResult{err: fmt.Errorf("failed to start provider %s: %w", cmd.name, err)} - return - } - m.providers[cmd.name] = cmd.p - cmd.resp <- addProviderResult{err: nil} -} - -// handleRemoveProvider stops and removes a provider, removing the bindings from all sessions that use streams from it. -// TODO: Implement this function. -func (m *Manager) handleRemoveProvider(_ removeProviderCmd) { - panic("unimplemented") -} - -// handleNewSession creates a new session with the given idle timeout. The idle timeout is typically not set by the client, but by the server configuration. -func (m *Manager) handleNewSession(cmd newSessionCmd) { - s := newSession(cmd.idleAfter) - - // Only arm the idle timer if the timeout is positive. We allow a zero or negative timeout to indicate "never timeout". - if s.idleAfter <= 0 { - s.armIdleTimer(func() { - resp := make(chan closeSessionResult, 1) - m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp} - <-resp - }) - } - + s = newSession(m.router.Incoming(), idleCallback) m.sessions[s.id] = s - - cmd.resp <- newSessionResult{id: s.id} + cmd.resp <- createSessionResult{sid: s.id} } -// handleAttach attaches a client to a session, creating new client channels for the session. If the session is already attached, returns an error. 
-func (m *Manager) handleAttach(cmd attachCmd) {
+func (m *Manager) handleLeaseSessionReceiver(cmd leaseSessionReceiverCommand) {
 	s, ok := m.sessions[cmd.sid]
 	if !ok {
-		cmd.resp <- attachResult{nil, nil, ErrSessionNotFound}
+		cmd.resp <- leaseSessionReceiverResult{err: ErrSessionNotFound}
 		return
 	}
-	if s.attached {
-		cmd.resp <- attachResult{nil, nil, ErrClientAlreadyAttached}
+	recv, err := s.leaseReceiver()
+	if err != nil {
+		cmd.resp <- leaseSessionReceiverResult{err: err}
 		return
 	}

-	cin, cout := s.generateNewChannels(cmd.inBuf, cmd.outBuf)
-	s.attached = true
-	s.disarmIdleTimer()
+	// Register the patterns and egress channel for the session with the router.
+	patterns := s.getPatterns()
+	egressChan, ok := s.getEgress()
+	if !ok {
+		_ = s.releaseReceiver() // don't leak the half-opened lease
+		cmd.resp <- leaseSessionReceiverResult{err: errors.New("egress channel doesn't exist despite successful lease")}
+		return // without this return the handler would fall through and respond twice
+	}

-	cmd.resp <- attachResult{cin: cin, cout: cout, err: nil}
+	for _, pattern := range patterns {
+		m.router.RegisterPattern(pattern, egressChan)
+	}
+
+	cmd.resp <- leaseSessionReceiverResult{receiveFunc: recv, err: nil}
 }

-// handleDetach detaches the client from the session, closing client channels and arming the idle timeout. If the session is not attached, returns an error.
-func (m *Manager) handleDetach(cmd detachCmd) {
+func (m *Manager) handleLeaseSessionSender(cmd leaseSessionSenderCommand) {
 	s, ok := m.sessions[cmd.sid]
 	if !ok {
-		cmd.resp <- detachResult{ErrSessionNotFound}
+		cmd.resp <- leaseSessionSenderResult{err: ErrSessionNotFound}
 		return
 	}
-	if !s.attached {
-		cmd.resp <- detachResult{ErrClientNotAttached}
+	send, err := s.leaseSender()
+	if err != nil {
+		cmd.resp <- leaseSessionSenderResult{err: err}
 		return
 	}
-
-	s.clearChannels()
-
-	// Only rearm the idle timer if the timeout is positive.
-	if s.idleAfter > 0 {
-		s.armIdleTimer(func() {
-			resp := make(chan closeSessionResult, 1)
-			m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp}
-			<-resp
-		})
-	}
-
-	s.attached = false
-
-	cmd.resp <- detachResult{nil}
+	cmd.resp <- leaseSessionSenderResult{sendFunc: send, err: nil}
 }

-// handleConfigure updates the session bindings, starting and stopping streams as needed. Currently only supports Raw streams.
-// TODO: Change this configuration to be an atomic operation, so that partial failures do not end in a half-configured state.
-func (m *Manager) handleConfigure(cmd configureCmd) { - _, ok := m.sessions[cmd.sid] +func (m *Manager) handleReleaseSessionReceiver(cmd releaseSessionReceiverCommand) { + s, ok := m.sessions[cmd.sid] if !ok { - cmd.resp <- configureResult{ErrSessionNotFound} + cmd.resp <- releaseSessionReceiverResult{err: ErrSessionNotFound} + return + } + err := s.releaseReceiver() + if err != nil { + cmd.resp <- releaseSessionReceiverResult{err: err} + return + } + cmd.resp <- releaseSessionReceiverResult{err: nil} +} + +func (m *Manager) handleReleaseSessionSender(cmd releaseSessionSenderCommand) { + s, ok := m.sessions[cmd.sid] + if !ok { + cmd.resp <- releaseSessionSenderResult{err: ErrSessionNotFound} + return + } + err := s.releaseSender() + if err != nil { + cmd.resp <- releaseSessionSenderResult{err: err} + return + } + cmd.resp <- releaseSessionSenderResult{err: nil} +} + +func (m *Manager) handleConfigureSession(cmd configureSessionCommand) { + s, ok := m.sessions[cmd.sid] + if !ok { + cmd.resp <- configureSessionResult{err: ErrSessionNotFound} return } - var errs error + newCfg, ok := cmd.cfg.(SessionConfig) + if !ok { + cmd.resp <- configureSessionResult{err: ErrBadConfig} + return + } - cmd.resp <- configureResult{err: errs} + // Normalize workers. + normalized := make([]WorkerEntry, len(newCfg.Workers)) + for i, we := range newCfg.Workers { + spec, err := m.workerRegistry.NormalizeSpecificationBytes(we.Type, we.Spec) + if err != nil { + cmd.resp <- configureSessionResult{err: err} + return + } + unit, err := m.workerRegistry.NormalizeUnitBytes(we.Type, we.Unit) + if err != nil { + cmd.resp <- configureSessionResult{err: err} + return + } + normalized[i] = WorkerEntry{Type: we.Type, Spec: spec, Unit: unit} + } + newCfg.Workers = normalized + + // Compute diffs. + curr := append([]WorkerEntry(nil), s.cfg.Workers...) + next := append([]WorkerEntry(nil), newCfg.Workers...) + additions, removals := workerEntryDiffs(curr, next) + + // Per-instance delta: type -> spec -> {add, remove} + type delta struct{ add, remove [][]byte } + changes := make(map[string]map[string]delta) + addTo := func(typ, spec string, u []byte, isAdd bool) { + if changes[typ] == nil { + changes[typ] = make(map[string]delta) + } + d := changes[typ][spec] + if isAdd { + d.add = append(d.add, u) + } else { + d.remove = append(d.remove, u) + } + changes[typ][spec] = d + } + for _, e := range additions { + addTo(e.Type, string(e.Spec), e.Unit, true) + } + for _, e := range removals { + addTo(e.Type, string(e.Spec), e.Unit, false) + } + + // Ensure manager maps. + if m.workerInstances == nil { + m.workerInstances = make(map[string]map[string]worker.Worker) + } + if m.workerUnitRefCounts == nil { + m.workerUnitRefCounts = make(map[string]map[string]map[string]int) + } + + // Rollback snapshots. 
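+	// Each (type, spec) pair we touch gets a snapshot of whether an instance
+	// existed and a copy of its unit refcounts, so restore() can rebuild the
+	// exact prior state if any step below fails.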
+	type snap struct {
+		hadInst bool
+		prevRef map[string]int
+	}
+	snaps := make(map[string]map[string]snap) // type -> spec -> snap
+	created := make(map[string]map[string]bool)
+
+	saveSnap := func(typ, spec string) {
+		if snaps[typ] == nil {
+			snaps[typ] = make(map[string]snap)
+		}
+		if _, ok := snaps[typ][spec]; ok {
+			return
+		}
+		had := false
+		if m.workerInstances[typ] != nil {
+			_, had = m.workerInstances[typ][spec]
+		}
+		prev := make(map[string]int)
+		if m.workerUnitRefCounts[typ] != nil && m.workerUnitRefCounts[typ][spec] != nil {
+			for k, v := range m.workerUnitRefCounts[typ][spec] {
+				prev[k] = v
+			}
+		}
+		snaps[typ][spec] = snap{hadInst: had, prevRef: prev}
+	}
+	markCreated := func(typ, spec string) {
+		if created[typ] == nil {
+			created[typ] = make(map[string]bool)
+		}
+		created[typ][spec] = true
+	}
+
+	toBytesSlice := func(ref map[string]int) [][]byte {
+		out := make([][]byte, 0, len(ref))
+		for k, c := range ref {
+			if c > 0 {
+				out = append(out, []byte(k))
+			}
+		}
+		return out
+	}
+
+	restore := func(err error) {
+		// Restore refcounts and instance unit sets.
+		for typ, specs := range snaps {
+			for spec, sn := range specs {
+				// Restore refcounts exactly.
+				if m.workerUnitRefCounts[typ] == nil {
+					m.workerUnitRefCounts[typ] = make(map[string]map[string]int)
+				}
+				rc := make(map[string]int)
+				for k, v := range sn.prevRef {
+					rc[k] = v
+				}
+				m.workerUnitRefCounts[typ][spec] = rc
+
+				prevUnits := toBytesSlice(rc)
+
+				inst := m.workerInstances[typ][spec]
+				switch {
+				case sn.hadInst:
+					// Ensure instance exists and set units back.
+					if inst == nil {
+						wi, ierr := m.workerRegistry.Spawn(typ)
+						if ierr == nil {
+							if m.workerInstances[typ] == nil { // type map may have been pruned during apply
+								m.workerInstances[typ] = make(map[string]worker.Worker)
+							}
+							m.workerInstances[typ][spec] = wi
+							inst = wi
+							// TODO: pass the correct SessionController
+							_ = wi.Start([]byte(spec), s) // best-effort
+						}
+					}
+					if inst != nil {
+						_ = inst.SetUnits(prevUnits) // best-effort
+					}
+				default:
+					// We did not have an instance before. Stop and remove if present.
+					if inst != nil {
+						_ = inst.Stop()
+						delete(m.workerInstances[typ], spec)
+						if len(m.workerInstances[typ]) == 0 {
+							delete(m.workerInstances, typ)
+						}
+					}
+					// If no refs remain, clean refcounts map too.
+					if len(rc) == 0 {
+						delete(m.workerUnitRefCounts[typ], spec)
+						if len(m.workerUnitRefCounts[typ]) == 0 {
+							delete(m.workerUnitRefCounts, typ)
+						}
+					}
+				}
+			}
+		}
+		// Clean up instances created during this op that shouldn't exist.
+		for typ, specs := range created {
+			for spec := range specs {
+				if snaps[typ] != nil && snaps[typ][spec].hadInst {
+					continue
+				}
+				if inst := m.workerInstances[typ][spec]; inst != nil {
+					_ = inst.Stop()
+					delete(m.workerInstances[typ], spec)
+					if len(m.workerInstances[typ]) == 0 {
+						delete(m.workerInstances, typ)
+					}
+				}
+			}
+		}
+		cmd.resp <- configureSessionResult{err: err}
+	}
+
+	// Apply deltas per instance.
+	for typ, specMap := range changes {
+		if m.workerUnitRefCounts[typ] == nil {
+			m.workerUnitRefCounts[typ] = make(map[string]map[string]int)
+		}
+		if m.workerInstances[typ] == nil {
+			m.workerInstances[typ] = make(map[string]worker.Worker)
+		}
+
+		for spec, d := range specMap {
+			saveSnap(typ, spec)
+
+			// Update refcounts.
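+			// Removals decrement and drop keys that hit zero; additions
+			// increment. The desired unit set is whatever stays positive.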
+			rc := m.workerUnitRefCounts[typ][spec]
+			if rc == nil {
+				rc = make(map[string]int)
+				m.workerUnitRefCounts[typ][spec] = rc
+			}
+			for _, u := range d.remove {
+				k := string(u)
+				if rc[k] > 0 {
+					rc[k]--
+				}
+				if rc[k] == 0 {
+					delete(rc, k)
+				}
+			}
+			for _, u := range d.add {
+				k := string(u)
+				rc[k]++
+			}
+
+			desired := toBytesSlice(rc)
+			inst := m.workerInstances[typ][spec]
+
+			switch {
+			case len(desired) == 0:
+				// No units desired: stop and prune if instance exists.
+				if inst != nil {
+					if err := inst.Stop(); err != nil {
+						restore(err)
+						return
+					}
+					delete(m.workerInstances[typ], spec)
+					if len(m.workerInstances[typ]) == 0 {
+						delete(m.workerInstances, typ)
+					}
+				}
+				// If no refs left, prune refcounts too.
+				delete(m.workerUnitRefCounts[typ], spec)
+				if len(m.workerUnitRefCounts[typ]) == 0 {
+					delete(m.workerUnitRefCounts, typ)
+				}
+
+			default:
+				// Need instance with desired units.
+				if inst == nil {
+					wi, err := m.workerRegistry.Spawn(typ) // the registry constructs by type; the spec is handed to Start below
+					if err != nil {
+						restore(err)
+						return
+					}
+					m.workerInstances[typ][spec] = wi
+					markCreated(typ, spec)
+					// TODO: pass correct SessionController implementation
+					if err := wi.Start([]byte(spec), s); err != nil {
+						restore(err)
+						return
+					}
+					inst = wi
+				}
+				if err := inst.SetUnits(desired); err != nil {
+					restore(err)
+					return
+				}
+			}
+		}
+	}
+
+	// Commit config last.
+	if err := s.setConfig(newCfg); err != nil {
+		restore(err)
+		return
+	}
+
+	cmd.resp <- configureSessionResult{err: nil}
 }

-// handleCloseSession closes and removes the session, cleaning up all bindings.
-func (m *Manager) handleCloseSession(cmd closeSessionCmd) {
-	_, ok := m.sessions[cmd.sid]
+func (m *Manager) handleCloseSession(cmd closeSessionCommand) {
+	s, ok := m.sessions[cmd.sid]
 	if !ok {
 		cmd.resp <- closeSessionResult{err: ErrSessionNotFound}
 		return
 	}

-	var errs error
+	// TODO: Ensure workers are correctly torn down

-	cmd.resp <- closeSessionResult{err: errs}
+	patterns := s.getPatterns()
+	egress, ok := s.getEgress()
+	if ok { // We only need to deregister if there is an active receiver lease.
+		for _, pattern := range patterns {
+			m.router.DeregisterPattern(pattern, egress)
+		}
+	}
+
+	// Release leases and ensure idle timer is disarmed.
+	s.closeAll()
+	s.disarmIdleTimer()
+	delete(m.sessions, cmd.sid)
+
+	cmd.resp <- closeSessionResult{err: nil}
 }
diff --git a/services/data_service/internal/manager/session.go b/services/data_service/internal/manager/session.go
index 6cbbd49..f12c1ba 100644
--- a/services/data_service/internal/manager/session.go
+++ b/services/data_service/internal/manager/session.go
@@ -1,77 +1,238 @@
 package manager

 import (
+	"errors"
+	"log/slog"
 	"time"

 	"github.com/google/uuid"
 	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
 )

-const (
-	defaultClientBuf = 256
+var (
+	// Lease lifecycle errors.
+	ErrAlreadyReleased       = errors.New("lease already released")
+	ErrSenderAlreadyLeased   = errors.New("sender already leased")
+	ErrReceiverAlreadyLeased = errors.New("receiver already leased")
+	ErrSenderNotLeased       = errors.New("no sender lease active")
+	ErrReceiverNotLeased     = errors.New("no receiver lease active")
+
+	// Config errors.
+	ErrBadConfig          = errors.New("config not valid")
+	ErrConfigActiveLeases = errors.New("cannot configure while a lease is active")
 )

-// session holds per-session state.
-// Owned by the manager loop. So we do not need a mutex.
+type WorkerEntry struct {
+	Type string
+	Spec []byte
+	Unit []byte
+}
+
+// SessionConfig carries non-live-tunable knobs for a session.
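+// Defaults (IdleAfter 1m, EgressBuffer 256) are set by newSession and hold
+// until the first successful configure.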
+// Manager mutates this directly; session does not expose Configure anymore.
+type SessionConfig struct {
+	IdleAfter    time.Duration // <=0 disables idle timer
+	EgressBuffer int           // receiver egress buffer size
+	Patterns     []domain.Pattern
+	Workers      []WorkerEntry
+}
+
+// session is manager-owned state. Single goroutine access.
 type session struct {
 	id uuid.UUID

-	inChannel  chan domain.Message // caller writes
-	outChannel chan domain.Message // caller reads
+	// Router pipes
+	ingress chan<- domain.Message // router.Incoming(); router-owned
+	egress  chan domain.Message   // current receiver lease egress; owned here

-	bound map[domain.Identifier]struct{}
+	// Config and timers
+	cfg          SessionConfig
+	idleTimer    *time.Timer
+	idleCallback func() // stored on creation

-	attached  bool
-	idleAfter time.Duration
-	idleTimer *time.Timer
+	// Sender lease
+	sendOpen bool
+	sendDone chan struct{}
+
+	// Receiver lease
+	receiveOpen bool
+	receiveDone chan struct{}
 }

-func newSession(idleAfter time.Duration) *session {
-	return &session{
-		id:        uuid.New(),
-		bound:     make(map[domain.Identifier]struct{}),
-		attached:  false,
-		idleAfter: idleAfter,
+// newSession arms a 1-minute idle timer immediately. Manager must
+// configure before it fires. idleCb is invoked by the timer.
+func newSession(ingress chan<- domain.Message, idleCb func()) *session {
+	s := &session{
+		id:      uuid.New(),
+		ingress: ingress,
+		cfg: SessionConfig{
+			IdleAfter:    time.Minute, // default 1m on creation
+			EgressBuffer: 256,         // default buffer
+		},
+		idleCallback: idleCb,
+	}
+	s.armIdleTimer()
+	return s
+}
+
+func (s *session) setConfig(cfg any) error {
+	if s.sendOpen || s.receiveOpen {
+		return ErrConfigActiveLeases
+	}
+
+	cfgParsed, ok := cfg.(SessionConfig)
+	if !ok {
+		return ErrBadConfig
+	}
+
+	s.cfg = cfgParsed
+
+	return nil
+}
+
+func (s *session) getEgress() (chan<- domain.Message, bool) {
+	if s.egress == nil {
+		return nil, false
+	}
+	return s.egress, true
+}
+
+func (s *session) getPatterns() []domain.Pattern {
+	return s.cfg.Patterns // patterns come from the last applied SessionConfig
+}
+
+// leaseSender opens a sender lease and returns send(m) error.
+func (s *session) leaseSender() (func(domain.Message) error, error) {
+	if s.sendOpen {
+		return nil, ErrSenderAlreadyLeased
+	}
+	s.sendOpen = true
+	s.sendDone = make(chan struct{})
+	s.disarmIdleTimer()
+
+	// Snapshot for lease-scoped handle.
+	done := s.sendDone
+
+	sendFunc := func(m domain.Message) error {
+		select {
+		case <-done:
+			return ErrAlreadyReleased
+		case s.ingress <- m:
+			return nil
+		}
+	}
+
+	return sendFunc, nil
+}
+
+// releaseSender releases the current sender lease.
+func (s *session) releaseSender() error {
+	if !s.sendOpen {
+		return ErrSenderNotLeased
+	}
+	s.sendOpen = false
+	if s.sendDone != nil {
+		close(s.sendDone) // invalidates all prior send funcs
+		s.sendDone = nil
+	}
+	if !s.receiveOpen {
+		s.armIdleTimer()
+	}
+	return nil
+}
+
+// leaseReceiver opens a receiver lease and returns receive() (Message, error).
+func (s *session) leaseReceiver() (func() (domain.Message, error), error) {
+	if s.receiveOpen {
+		return nil, ErrReceiverAlreadyLeased
+	}
+	s.receiveOpen = true
+	s.receiveDone = make(chan struct{})
+	s.egress = make(chan domain.Message, s.cfg.EgressBuffer)
+	s.disarmIdleTimer()
+
+	// Snapshots for lease-scoped handles.
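+	// Capturing them means funcs handed out for this lease keep failing with
+	// ErrAlreadyReleased even after the session's fields are reused.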
+ done := s.receiveDone + eg := s.egress + + receiveFunc := func() (domain.Message, error) { + select { + case <-done: + return domain.Message{}, ErrAlreadyReleased + case msg, ok := <-eg: + if !ok { + return domain.Message{}, ErrAlreadyReleased + } + return msg, nil + } + } + + return receiveFunc, nil +} + +// releaseReceiver releases the current receiver lease. +// Manager must stop any routing into s.egress before calling this. +func (s *session) releaseReceiver() error { + if !s.receiveOpen { + return ErrReceiverNotLeased + } + s.receiveOpen = false + if s.receiveDone != nil { + close(s.receiveDone) // invalidates all prior receive funcs + s.receiveDone = nil + } + if s.egress != nil { + close(s.egress) + s.egress = nil + } + if !s.sendOpen { + s.armIdleTimer() + } + return nil +} + +// closeAll force-releases both sender and receiver leases. Safe to call multiple times. +func (s *session) closeAll() { + // Sender + if s.sendOpen { + s.sendOpen = false + if s.sendDone != nil { + close(s.sendDone) + s.sendDone = nil + } + } + // Receiver + if s.receiveOpen { + s.receiveOpen = false + if s.receiveDone != nil { + close(s.receiveDone) + s.receiveDone = nil + } + if s.egress != nil { + close(s.egress) + s.egress = nil + } } } -// armIdleTimer sets the idle timer to call f after idleAfter duration (resets existing timer if any). -func (s *session) armIdleTimer(f func()) { +// armIdleTimer arms a timer using stored cfg.IdleAfter and idleCb. +func (s *session) armIdleTimer() { + if s.idleCallback == nil { + slog.Warn("nil idle callback function provided to session") + } if s.idleTimer != nil { s.idleTimer.Stop() + s.idleTimer = nil + } + if s.cfg.IdleAfter > 0 && s.idleCallback != nil { + s.idleTimer = time.AfterFunc(s.cfg.IdleAfter, s.idleCallback) } - s.idleTimer = time.AfterFunc(s.idleAfter, f) } -// disarmIdleTimer stops and nils the idle timer if any. This call is idempotent. +// disarmIdleTimer disarms the idle timer if active. func (s *session) disarmIdleTimer() { if s.idleTimer != nil { s.idleTimer.Stop() s.idleTimer = nil } } - -// generateNewChannels creates new in/out channels for the session, will not close existing channels. -func (s *session) generateNewChannels(inBuf, outBuf int) (chan domain.Message, chan domain.Message) { - if inBuf <= 0 { - inBuf = defaultClientBuf - } - if outBuf <= 0 { - outBuf = defaultClientBuf - } - s.inChannel = make(chan domain.Message, inBuf) - s.outChannel = make(chan domain.Message, outBuf) - return s.inChannel, s.outChannel -} - -// clearChannels closes and nils the in/out channels. 
-func (s *session) clearChannels() {
-	if s.inChannel != nil {
-		close(s.inChannel)
-		s.inChannel = nil
-	}
-	if s.outChannel != nil {
-		close(s.outChannel)
-		s.outChannel = nil
-	}
-}
diff --git a/services/data_service/internal/manager/worker_registry.go b/services/data_service/internal/manager/worker_registry.go
new file mode 100644
index 0000000..b363446
--- /dev/null
+++ b/services/data_service/internal/manager/worker_registry.go
@@ -0,0 +1,120 @@
+package manager
+
+import (
+	"errors"
+	"sync"
+
+	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/worker"
+)
+
+var (
+	ErrWorkerAlreadyRegistered = errors.New("worker type already registered")
+	ErrWorkerTypeUnknown       = errors.New("unknown worker type")
+	ErrNilFactory              = errors.New("nil worker factory")
+	ErrNilNormalizer           = errors.New("nil worker normalizer")
+)
+
+type registryEntry struct {
+	factory    worker.Factory
+	normalizer worker.Normalizer
+}
+
+type WorkerRegistry struct {
+	mu sync.RWMutex
+	m  map[string]registryEntry
+}
+
+func NewWorkerRegistry() *WorkerRegistry {
+	return &WorkerRegistry{m: make(map[string]registryEntry)}
+}
+
+// Register a worker type with its factory and normalizer.
+func (wr *WorkerRegistry) Register(workerType string, factory worker.Factory, normalizer worker.Normalizer) error {
+	if factory == nil {
+		return ErrNilFactory
+	}
+	if normalizer == nil {
+		return ErrNilNormalizer
+	}
+	wr.mu.Lock()
+	defer wr.mu.Unlock()
+	if _, ok := wr.m[workerType]; ok {
+		return ErrWorkerAlreadyRegistered
+	}
+	wr.m[workerType] = registryEntry{factory: factory, normalizer: normalizer}
+	return nil
+}
+
+// Deregister removes a worker type.
+func (wr *WorkerRegistry) Deregister(workerType string) error {
+	wr.mu.Lock()
+	defer wr.mu.Unlock()
+	if _, ok := wr.m[workerType]; !ok {
+		return ErrWorkerTypeUnknown
+	}
+	delete(wr.m, workerType)
+	return nil
+}
+
+// Spawn constructs a new worker instance for the given type.
+func (wr *WorkerRegistry) Spawn(workerType string) (worker.Worker, error) {
+	wr.mu.RLock()
+	entry, ok := wr.m[workerType]
+	wr.mu.RUnlock()
+	if !ok {
+		return nil, ErrWorkerTypeUnknown
+	}
+	return entry.factory(), nil
+}
+
+func (wr *WorkerRegistry) NormalizeSpecificationBytes(workerType string, spec []byte) ([]byte, error) {
+	wr.mu.RLock()
+	entry, ok := wr.m[workerType]
+	wr.mu.RUnlock()
+	if !ok {
+		return nil, ErrWorkerTypeUnknown
+	}
+	return entry.normalizer.NormalizeSpecification(spec)
+}
+
+func (wr *WorkerRegistry) NormalizeUnitBytes(workerType string, unit []byte) ([]byte, error) {
+	wr.mu.RLock()
+	entry, ok := wr.m[workerType]
+	wr.mu.RUnlock()
+	if !ok {
+		return nil, ErrWorkerTypeUnknown
+	}
+	return entry.normalizer.NormalizeUnit(unit)
+}
+
+// Factory returns the registered factory.
+func (wr *WorkerRegistry) Factory(workerType string) (worker.Factory, error) {
+	wr.mu.RLock()
+	entry, ok := wr.m[workerType]
+	wr.mu.RUnlock()
+	if !ok {
+		return nil, ErrWorkerTypeUnknown
+	}
+	return entry.factory, nil
+}
+
+func (wr *WorkerRegistry) Normalizer(workerType string) (worker.Normalizer, error) {
+	wr.mu.RLock()
+	entry, ok := wr.m[workerType]
+	wr.mu.RUnlock()
+	if !ok {
+		return nil, ErrWorkerTypeUnknown
+	}
+	return entry.normalizer, nil
+}
+
+// RegisteredTypes lists all worker types.
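+// Order is unspecified (Go map iteration).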
+func (wr *WorkerRegistry) RegisteredTypes() []string { + wr.mu.RLock() + defer wr.mu.RUnlock() + out := make([]string, 0, len(wr.m)) + for t := range wr.m { + out = append(out, t) + } + return out +} diff --git a/services/data_service/internal/provider/provider.go b/services/data_service/internal/provider/provider.go deleted file mode 100644 index 663b10c..0000000 --- a/services/data_service/internal/provider/provider.go +++ /dev/null @@ -1,16 +0,0 @@ -package provider - -import "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" - -type Provider interface { - Start() error - Stop() - - Subscribe(subject string) <-chan error - Unsubscribe(subject string) <-chan error - Fetch(subject string) (domain.Message, error) - - GetActiveStreams() []string - IsStreamActive(key string) bool - IsValidSubject(key string, isFetch bool) bool -} diff --git a/services/data_service/internal/provider/providers/test/test_provider.go b/services/data_service/internal/provider/providers/test/test_provider.go deleted file mode 100644 index 5615f05..0000000 --- a/services/data_service/internal/provider/providers/test/test_provider.go +++ /dev/null @@ -1,542 +0,0 @@ -// Package test implements a configurable synthetic data provider. -// -// Config via subject string. Two syntaxes are accepted: -// -// Query style: "foo?period=7us&size=64&mode=const&burst=1&jitter=0.02&drop=1&ts=1&log=1" -// Path style: "foo/period/7us/size/64/mode/poisson/rate/120000/jitter/0.05/drop/0/ts/1/log/1" -// -// Parameters: -// -// period: Go duration. Inter-message target (wins over rate). -// rate: Messages per second. Used if period absent. -// mode: const | poisson | onoff -// burst: Messages emitted per tick (>=1). -// jitter: ±fraction jitter on period (e.g., 0.05 = ±5%). -// on/off: Durations for onoff mode (e.g., on=5ms&off=1ms). -// size: Payload bytes (>=1). If ts=1 and size<16, auto-extends to 16. -// ptype: bytes | counter | json (payload content generator) -// drop: 1=non-blocking send (drop on backpressure), 0=block. -// ts: 1=prepend 16B header: [sendUnixNano int64][seq int64]. -// log: 1=emit per-second metrics via slog. -// -// Notes: -// - Constant mode uses sleep-then-spin pacer for sub-10µs. -// - Poisson mode draws inter-arrivals from Exp(rate). -// - On/Off emits at period during "on", silent during "off" windows. -// - Metrics include msgs/s, bytes/s, drops/s per stream. -// - Fetch is unsupported (returns error). 
-package test - -import ( - "context" - "errors" - "fmt" - "math/rand/v2" - "net/url" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" -) - -type TestProvider struct { - mu sync.Mutex - streams map[string]*stream - out chan<- domain.Message - defaults cfg -} - -type stream struct { - cancel context.CancelFunc - done chan struct{} - stats *metrics -} - -type metrics struct { - sent, dropped atomic.Uint64 - prevSent uint64 - prevDropped uint64 - startUnix int64 -} - -type mode int - -const ( - modeConst mode = iota - modePoisson - modeOnOff -) - -type ptype int - -const ( - ptBytes ptype = iota - ptCounter - ptJSON -) - -type cfg struct { - period time.Duration // inter-arrival target - rate float64 // msgs/sec if period == 0 - jitter float64 // ±fraction - mode mode - onDur time.Duration // for onoff - offDur time.Duration // for onoff - burst int - size int - pType ptype - dropIfSlow bool - embedTS bool - logEverySec bool -} - -// NewTestProvider returns a provider with sane defaults. -func NewTestProvider(out chan<- domain.Message, defaultPeriod time.Duration) *TestProvider { - if defaultPeriod <= 0 { - defaultPeriod = 100 * time.Microsecond - } - return &TestProvider{ - streams: make(map[string]*stream), - out: out, - defaults: cfg{ - period: defaultPeriod, - rate: 0, - jitter: 0, - mode: modeConst, - onDur: 5 * time.Millisecond, - offDur: 1 * time.Millisecond, - burst: 1, - size: 32, - pType: ptBytes, - dropIfSlow: true, - embedTS: true, - }, - } -} - -func (p *TestProvider) Start() error { return nil } - -func (p *TestProvider) Stop() { - p.mu.Lock() - defer p.mu.Unlock() - for key, s := range p.streams { - s.cancel() - <-s.done - delete(p.streams, key) - } -} - -func (p *TestProvider) Subscribe(subject string) <-chan error { - errCh := make(chan error, 1) - - if !p.IsValidSubject(subject, false) { - errCh <- errors.New("invalid subject") - close(errCh) - return errCh - } - - p.mu.Lock() - if _, exists := p.streams[subject]; exists { - p.mu.Unlock() - errCh <- nil - return errCh - } - - ctx, cancel := context.WithCancel(context.Background()) - s := &stream{ - cancel: cancel, - done: make(chan struct{}), - stats: &metrics{startUnix: time.Now().Unix()}, - } - p.streams[subject] = s - out := p.out - conf := p.parseCfg(subject) - p.mu.Unlock() - - go run(ctx, s, out, subject, conf) - - errCh <- nil - return errCh -} - -func (p *TestProvider) Unsubscribe(subject string) <-chan error { - errCh := make(chan error, 1) - - p.mu.Lock() - s, ok := p.streams[subject] - if !ok { - p.mu.Unlock() - errCh <- errors.New("not subscribed") - return errCh - } - delete(p.streams, subject) - p.mu.Unlock() - - go func() { - s.cancel() - <-s.done - errCh <- nil - }() - return errCh -} - -func (p *TestProvider) Fetch(_ string) (domain.Message, error) { - return domain.Message{}, fmt.Errorf("fetch not supported by provider") -} - -func (p *TestProvider) GetActiveStreams() []string { - p.mu.Lock() - defer p.mu.Unlock() - keys := make([]string, 0, len(p.streams)) - for k := range p.streams { - keys = append(keys, k) - } - return keys -} - -func (p *TestProvider) IsStreamActive(key string) bool { - p.mu.Lock() - _, ok := p.streams[key] - p.mu.Unlock() - return ok -} - -func (p *TestProvider) IsValidSubject(key string, _ bool) bool { - if key == "" { - return false - } - // Accept anything parseable via parseCfg; fallback true. 
- return true -} - -// --- core --- - -func run(ctx context.Context, s *stream, out chan<- domain.Message, subject string, c cfg) { - defer close(s.done) - - ident, _ := domain.RawID("test_provider", subject) - - // Sanitize - if c.burst < 1 { - c.burst = 1 - } - if c.size < 1 { - c.size = 1 - } - if c.embedTS && c.size < 16 { - c.size = 16 - } - if c.period <= 0 { - if c.rate > 0 { - c.period = time.Duration(float64(time.Second) / c.rate) - } else { - c.period = 10 * time.Microsecond - } - } - if c.jitter < 0 { - c.jitter = 0 - } - if c.jitter > 0.95 { - c.jitter = 0.95 - } - - // Per-second logging - var logTicker *time.Ticker - if c.logEverySec { - logTicker = time.NewTicker(time.Second) - defer logTicker.Stop() - } - - var seq uint64 - base := make([]byte, c.size) - - // On/Off state - onUntil := time.Time{} - offUntil := time.Time{} - inOn := true - now := time.Now() - onUntil = now.Add(c.onDur) - - // Scheduling - next := time.Now() - - for { - select { - case <-ctx.Done(): - return - default: - } - - switch c.mode { - case modeConst: - // sleep-then-spin to hit sub-10µs with isolated core - if d := time.Until(next); d > 0 { - if d > 30*time.Microsecond { - time.Sleep(d - 30*time.Microsecond) - } - for time.Now().Before(next) { - } - } - case modePoisson: - // draw from exponential with mean=period - lam := 1.0 / float64(c.period) - ia := time.Duration(rand.ExpFloat64() / lam) - next = time.Now().Add(ia) - // No pre-wait here; emit immediately then sleep to next - case modeOnOff: - now = time.Now() - if inOn { - if now.After(onUntil) { - inOn = false - offUntil = now.Add(c.offDur) - continue - } - } else { - if now.After(offUntil) { - inOn = true - onUntil = now.Add(c.onDur) - } - // While off, push next and wait - // Small sleep to avoid busy loop during off - time.Sleep(minDur(c.offDur/4, 200*time.Microsecond)) - continue - } - // For on state, behave like const - if d := time.Until(next); d > 0 { - if d > 30*time.Microsecond { - time.Sleep(d - 30*time.Microsecond) - } - for time.Now().Before(next) { - } - } - } - - // Emit burst - for i := 0; i < c.burst; i++ { - seq++ - payload := base[:c.size] - switch c.pType { - case ptBytes: - fillPattern(payload, uint64(seq)) - case ptCounter: - fillCounter(payload, uint64(seq)) - case ptJSON: - // build minimal, fixed-size-ish JSON into payload - n := buildJSON(payload, uint64(seq)) - payload = payload[:n] - } - - if c.embedTS { - ensureCap(&payload, 16) - ts := time.Now().UnixNano() - putInt64(payload[0:8], ts) - putInt64(payload[8:16], int64(seq)) - } - - msg := domain.Message{ - Identifier: ident, - Payload: payload, - } - - if out != nil { - if c.dropIfSlow { - select { - case out <- msg: - s.stats.sent.Add(1) - default: - s.stats.dropped.Add(1) - } - } else { - select { - case out <- msg: - s.stats.sent.Add(1) - case <-ctx.Done(): - return - } - } - } - } - - // Schedule next - adj := c.period - if c.mode == modePoisson { - // next already chosen - } else { - if c.jitter > 0 { - j := (rand.Float64()*2 - 1) * c.jitter - adj = time.Duration(float64(c.period) * (1 + j)) - if adj < 0 { - adj = 0 - } - } - next = next.Add(adj) - } - - // For poisson, actively wait to next - if c.mode == modePoisson { - if d := time.Until(next); d > 0 { - if d > 30*time.Microsecond { - time.Sleep(d - 30*time.Microsecond) - } - for time.Now().Before(next) { - } - } - } - } -} - -// --- config parsing --- - -func (p *TestProvider) parseCfg(subject string) cfg { - c := p.defaults - - // Query style first - if i := strings.Index(subject, "?"); i >= 0 && i < 
len(subject)-1 { - if qv, err := url.ParseQuery(subject[i+1:]); err == nil { - c = applyQuery(c, qv) - } - } - - // Path segments like /key/value/ pairs - parts := strings.Split(subject, "/") - for i := 0; i+1 < len(parts); i += 2 { - k := strings.ToLower(parts[i]) - v := parts[i+1] - if k == "" { - continue - } - applyKV(&c, k, v) - } - return c -} - -func applyQuery(c cfg, v url.Values) cfg { - for k, vals := range v { - if len(vals) == 0 { - continue - } - applyKV(&c, strings.ToLower(k), vals[0]) - } - return c -} - -func applyKV(c *cfg, key, val string) { - switch key { - case "period": - if d, err := time.ParseDuration(val); err == nil && d > 0 { - c.period = d - } - case "rate": - if f, err := strconv.ParseFloat(val, 64); err == nil && f > 0 { - c.rate = f - c.period = 0 // let rate take effect if period unset later - } - case "mode": - switch strings.ToLower(val) { - case "const", "steady": - c.mode = modeConst - case "poisson": - c.mode = modePoisson - case "onoff", "burst": - c.mode = modeOnOff - } - case "on": - if d, err := time.ParseDuration(val); err == nil && d >= 0 { - c.onDur = d - } - case "off": - if d, err := time.ParseDuration(val); err == nil && d >= 0 { - c.offDur = d - } - case "burst": - if n, err := strconv.Atoi(val); err == nil && n > 0 { - c.burst = n - } - case "jitter": - if f, err := strconv.ParseFloat(val, 64); err == nil && f >= 0 && f < 1 { - c.jitter = f - } - case "size": - if n, err := strconv.Atoi(val); err == nil && n > 0 { - c.size = n - } - case "ptype": - switch strings.ToLower(val) { - case "bytes": - c.pType = ptBytes - case "counter": - c.pType = ptCounter - case "json": - c.pType = ptJSON - } - case "drop": - c.dropIfSlow = val == "1" || strings.EqualFold(val, "true") - case "ts": - c.embedTS = val == "1" || strings.EqualFold(val, "true") - case "log": - c.logEverySec = val == "1" || strings.EqualFold(val, "true") - } -} - -// --- payload builders --- - -func fillPattern(b []byte, seed uint64) { - // xorshift for deterministic but non-trivial bytes - if len(b) == 0 { - return - } - x := seed | 1 - for i := range b { - x ^= x << 13 - x ^= x >> 7 - x ^= x << 17 - b[i] = byte(x) - } -} - -func fillCounter(b []byte, seq uint64) { - for i := range b { - b[i] = byte((seq + uint64(i)) & 0xFF) - } -} - -func buildJSON(buf []byte, seq uint64) int { - // Small fixed fields. Truncate if buffer small. 
- // Example: {"t":1694490000000000,"s":12345,"p":100.12} - ts := time.Now().UnixNano() - price := 10000 + float64(seq%1000)*0.01 - str := fmt.Sprintf(`{"t":%d,"s":%d,"p":%.2f}`, ts, seq, price) - n := copy(buf, str) - return n -} - -func ensureCap(b *[]byte, need int) { - if len(*b) >= need { - return - } - nb := make([]byte, need) - copy(nb, *b) - *b = nb -} - -func putInt64(b []byte, v int64) { - _ = b[7] - b[0] = byte(v >> 56) - b[1] = byte(v >> 48) - b[2] = byte(v >> 40) - b[3] = byte(v >> 32) - b[4] = byte(v >> 24) - b[5] = byte(v >> 16) - b[6] = byte(v >> 8) - b[7] = byte(v) -} - -func minDur(a, b time.Duration) time.Duration { - if a < b { - return a - } - return b -} diff --git a/services/data_service/internal/router/partition.go b/services/data_service/internal/router/partition.go index 983673a..511a1ff 100644 --- a/services/data_service/internal/router/partition.go +++ b/services/data_service/internal/router/partition.go @@ -79,7 +79,7 @@ func (p *actorPartition) loop() { if !exists { uniqueChannels := make(map[chan<- domain.Message]struct{}) for _, e := range p.rules { - if e.pattern.Satisfies(id) { + if e.pattern.Match(id) { for ch := range e.channels { uniqueChannels[ch] = struct{}{} } @@ -107,7 +107,7 @@ func (p *actorPartition) loop() { } case opRegister: - key := v.pattern.Canonical() + key := v.pattern.Key() e, exists := p.rules[key] if !exists { e = &ruleEntry{pattern: v.pattern, channels: make(map[chan<- domain.Message]struct{})} @@ -121,7 +121,7 @@ func (p *actorPartition) loop() { e.channels[v.channel] = struct{}{} for id, subs := range p.memo { - if v.pattern.Satisfies(id) && !slices.Contains(subs, v.channel) { + if v.pattern.Match(id) && !slices.Contains(subs, v.channel) { p.memo[id] = append(subs, v.channel) } } @@ -129,7 +129,7 @@ func (p *actorPartition) loop() { v.done <- struct{}{} case opDeregister: - key := v.pattern.Canonical() + key := v.pattern.Key() e, ok := p.rules[key] if !ok { v.done <- struct{}{} @@ -146,7 +146,7 @@ func (p *actorPartition) loop() { } for id, subs := range p.memo { - if v.pattern.Satisfies(id) { + if v.pattern.Match(id) { for i := range subs { if subs[i] == v.channel { last := len(subs) - 1 diff --git a/services/data_service/internal/router/routerAdv.go b/services/data_service/internal/router/routerAdv.go index 4d8a770..d21a032 100644 --- a/services/data_service/internal/router/routerAdv.go +++ b/services/data_service/internal/router/routerAdv.go @@ -100,7 +100,7 @@ func (r *Router) Incoming() chan<- domain.Message { return r.incoming } func (r *Router) RegisterPattern(pat domain.Pattern, ch chan<- domain.Message) { // Inline ensurePartition - ns := pat.Namespace + ns, _, _, _ := pat.Parse() // Note: Error ignored, pattern assumed to be valid if passed to router r.mu.RLock() p := r.partitions[ns] r.mu.RUnlock() @@ -119,7 +119,8 @@ func (r *Router) RegisterPattern(pat domain.Pattern, ch chan<- domain.Message) { func (r *Router) DeregisterPattern(pat domain.Pattern, ch chan<- domain.Message) { r.mu.RLock() - p := r.partitions[pat.Namespace] + ns, _, _, _ := pat.Parse() + p := r.partitions[ns] r.mu.RUnlock() if p != nil { p.deregisterRoute(pat, ch) diff --git a/services/data_service/internal/server/gprc_control_server.go b/services/data_service/internal/server/gprc_control_server.go deleted file mode 100644 index 0a5b1a8..0000000 --- a/services/data_service/internal/server/gprc_control_server.go +++ /dev/null @@ -1,82 +0,0 @@ -package server - -import ( - "context" - "time" - - "github.com/google/uuid" - pb 
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -type GRPCControlServer struct { - pb.UnimplementedDataServiceControlServer - manager *manager.Manager -} - -func NewGRPCControlServer(m *manager.Manager) *GRPCControlServer { - return &GRPCControlServer{manager: m} -} - -// StartStream creates a new session. It does NOT attach client channels. -// Your streaming RPC should later call AttachClient(sessionID, opts). -func (s *GRPCControlServer) StartStream(_ context.Context, req *pb.StartStreamRequest) (*pb.StartStreamResponse, error) { - sessionID := s.manager.NewSession(time.Duration(1) * time.Minute) // timeout set to 1 minute - return &pb.StartStreamResponse{StreamUuid: sessionID.String()}, nil -} - -// ConfigureStream sets the session's subscriptions in one shot. -// It does NOT require channels to be attached. -func (s *GRPCControlServer) ConfigureStream(_ context.Context, req *pb.ConfigureStreamRequest) (*pb.ConfigureStreamResponse, error) { - if req == nil { - return nil, status.Error(codes.InvalidArgument, "nil request") - } - streamID, err := uuid.Parse(req.StreamUuid) - if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "invalid stream_uuid %q: %v", req.StreamUuid, err) - } - - ids := make([]domain.Identifier, 0, len(req.Identifiers)) - for _, in := range req.Identifiers { - id, e := domain.ParseIdentifier(in.Key) - if e != nil { - return nil, status.Errorf(codes.InvalidArgument, "invalid identifier %q: %v", in.Key, e) - } - ids = append(ids, id) - } - - if err := s.manager.ConfigureSession(streamID, ids); err != nil { - // Map common manager errors to gRPC codes. - switch err { - case manager.ErrSessionNotFound: - return nil, status.Errorf(codes.NotFound, "session not found: %v", err) - default: - return nil, status.Errorf(codes.Internal, "set subscriptions: %v", err) - } - } - return &pb.ConfigureStreamResponse{}, nil -} - -// StopStream closes the session and tears down routes and streams. 
-func (s *GRPCControlServer) StopStream(_ context.Context, req *pb.StopStreamRequest) (*pb.StopStreamResponse, error) { - if req == nil { - return nil, status.Error(codes.InvalidArgument, "nil request") - } - streamID, err := uuid.Parse(req.StreamUuid) - if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "invalid stream_uuid %q: %v", req.StreamUuid, err) - } - - if err := s.manager.CloseSession(streamID); err != nil { - switch err { - case manager.ErrSessionNotFound: - return nil, status.Errorf(codes.NotFound, "session not found: %v", err) - default: - return nil, status.Errorf(codes.Internal, "close session: %v", err) - } - } - return &pb.StopStreamResponse{}, nil -} diff --git a/services/data_service/internal/server/grpc_streaming_server.go b/services/data_service/internal/server/grpc_streaming_server.go deleted file mode 100644 index 7e7ce81..0000000 --- a/services/data_service/internal/server/grpc_streaming_server.go +++ /dev/null @@ -1,54 +0,0 @@ -package server - -import ( - "fmt" - - "github.com/google/uuid" - pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" -) - -type GRPCStreamingServer struct { - pb.UnimplementedDataServiceStreamingServer - manager *manager.Manager -} - -func NewGRPCStreamingServer(m *manager.Manager) *GRPCStreamingServer { - return &GRPCStreamingServer{manager: m} -} - -// ConnectStream attaches a client to an existing session and streams outbound messages. -// This is server-streaming only; inbound use is optional and ignored here. -func (s *GRPCStreamingServer) ConnectStream(req *pb.ConnectStreamRequest, stream pb.DataServiceStreaming_ConnectStreamServer) error { - if req == nil { - return fmt.Errorf("nil request") - } - sessionID, err := uuid.Parse(req.StreamUuid) - if err != nil { - return fmt.Errorf("invalid UUID: %w", err) - } - - _, out, err := s.manager.AttachClient(sessionID, 256, 1024) - if err != nil { - return fmt.Errorf("attach channels: %w", err) - } - defer func() { _ = s.manager.DetachClient(sessionID) }() - - ctx := stream.Context() - for { - select { - case <-ctx.Done(): - return nil - case msg, ok := <-out: - if !ok { - return nil // session closed - } - if err := stream.Send(&pb.Message{ - Identifier: &pb.Identifier{Key: msg.Identifier.Key()}, - Payload: msg.Payload, - }); err != nil { - return err - } - } - } -} diff --git a/services/data_service/internal/server/socket_streaming_server.go b/services/data_service/internal/server/socket_streaming_server.go deleted file mode 100644 index f34b9a3..0000000 --- a/services/data_service/internal/server/socket_streaming_server.go +++ /dev/null @@ -1,242 +0,0 @@ -package server - -import ( - "bufio" - "bytes" - "encoding/binary" - "fmt" - "io" - "net" - "sync" - "time" - - "github.com/google/uuid" - pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" - "google.golang.org/protobuf/proto" -) - -type SocketStreamingServer struct { - manager *manager.Manager -} - -func NewSocketStreamingServer(m *manager.Manager) *SocketStreamingServer { - return &SocketStreamingServer{manager: m} -} - -func (s *SocketStreamingServer) Serve(lis net.Listener) error { - for { - conn, err := lis.Accept() - if err != nil { - fmt.Printf("accept error: %v\n", err) - continue - } - go s.handleConnection(conn) - } -} - -func (s *SocketStreamingServer) handleConnection(conn net.Conn) { - defer func() { - if err 
:= conn.Close(); err != nil { - fmt.Printf("conn close error: %v\n", err) - } else { - fmt.Println("connection closed") - } - }() - - if tc, ok := conn.(*net.TCPConn); ok { - _ = tc.SetNoDelay(true) // low latency - _ = tc.SetWriteBuffer(2 * 1024 * 1024) // bigger kernel sndbuf - _ = tc.SetReadBuffer(256 * 1024) - _ = tc.SetKeepAlive(true) - _ = tc.SetKeepAlivePeriod(30 * time.Second) - // Note: avoid SetLinger>0; default is fine. - } - - reader := bufio.NewReaderSize(conn, 64*1024) - line, err := reader.ReadBytes('\n') - if err != nil { - fmt.Printf("read stream UUID error: %v\n", err) - _, _ = fmt.Fprint(conn, "Failed to read stream UUID\n") - return - } - streamUUID, err := uuid.Parse(string(trimLineEnding(line))) - if err != nil { - _, _ = fmt.Fprint(conn, "Invalid stream UUID\n") - return - } - - // Give the socket server room before router drops. Make out chan larger. - // Tune per your pressure. (in=256, out=8192 as example) - _, out, err := s.manager.AttachClient(streamUUID, 256, 8192) - if err != nil { - _, _ = fmt.Fprintf(conn, "Failed to attach to stream: %v\n", err) - return - } - defer func() { _ = s.manager.DetachClient(streamUUID) }() - - // Large bufio writer to reduce syscalls. - writer := bufio.NewWriterSize(conn, 1*1024*1024) - defer func() { - if err := writer.Flush(); err != nil { - fmt.Printf("final flush error: %v\n", err) - } - }() - - // ---- Throughput optimizations ---- - const ( - maxBatchMsgs = 128 // cap number of msgs per batch - maxBatchBytes = 1 * 1024 * 1024 // cap bytes per batch - idleFlush = 2 * time.Millisecond // small idle flush timer - ) - var ( - hdr [4]byte - batchBuf = &bytes.Buffer{} - bufPool = sync.Pool{New: func() any { return make([]byte, 64*1024) }} - timer = time.NewTimer(idleFlush) - timerAlive = true - ) - - stopTimer := func() { - if timerAlive && timer.Stop() { - // drain if fired - select { - case <-timer.C: - default: - } - } - timerAlive = false - } - resetTimer := func() { - if !timerAlive { - timer.Reset(idleFlush) - timerAlive = true - } else { - // re-arm - stopTimer() - timer.Reset(idleFlush) - timerAlive = true - } - } - - // Main loop: drain out channel into a single write. - for { - // Block for at least one message or close. - msg, ok := <-out - if !ok { - _ = writer.Flush() - return - } - - batchBuf.Reset() - bytesInBatch := 0 - msgsInBatch := 0 - - // Start with the message we just popped. - { - m := pb.Message{ - Identifier: &pb.Identifier{Key: msg.Identifier.Key()}, - Payload: msg.Payload, - } - - // Use pooled scratch to avoid per-message allocs in Marshal. - scratch := bufPool.Get().([]byte)[:0] - b, err := proto.MarshalOptions{}.MarshalAppend(scratch, &m) - if err != nil { - fmt.Printf("proto marshal error: %v\n", err) - bufPool.Put(scratch[:0]) - // skip message - } else { - binary.BigEndian.PutUint32(hdr[:], uint32(len(b))) - _, _ = batchBuf.Write(hdr[:]) - _, _ = batchBuf.Write(b) - bytesInBatch += 4 + len(b) - msgsInBatch++ - bufPool.Put(b[:0]) - } - } - - // Opportunistically drain without blocking. - drain := true - resetTimer() - for drain && msgsInBatch < maxBatchMsgs && bytesInBatch < maxBatchBytes { - select { - case msg, ok = <-out: - if !ok { - // peer closed while batching; flush what we have. 
- if batchBuf.Len() > 0 { - if _, err := writer.Write(batchBuf.Bytes()); err != nil { - if err == io.EOF { - return - } - fmt.Printf("write error: %v\n", err) - return - } - if err := writer.Flush(); err != nil { - fmt.Printf("flush error: %v\n", err) - } - } - return - } - m := pb.Message{ - Identifier: &pb.Identifier{Key: msg.Identifier.Key()}, - Payload: msg.Payload, - } - scratch := bufPool.Get().([]byte)[:0] - b, err := proto.MarshalOptions{}.MarshalAppend(scratch, &m) - if err != nil { - fmt.Printf("proto marshal error: %v\n", err) - bufPool.Put(scratch[:0]) - continue - } - binary.BigEndian.PutUint32(hdr[:], uint32(len(b))) - _, _ = batchBuf.Write(hdr[:]) - _, _ = batchBuf.Write(b) - bytesInBatch += 4 + len(b) - msgsInBatch++ - bufPool.Put(b[:0]) - case <-timer.C: - timerAlive = false - // idle window hit; stop draining further this round - drain = false - } - } - - // Single write for the whole batch. - // Avoid per-message SetWriteDeadline. Let TCP handle buffering. - if _, err := writer.Write(batchBuf.Bytes()); err != nil { - if err == io.EOF { - return - } - fmt.Printf("write error: %v\n", err) - return - } - - // Flush when batch is sizable or we saw the idle timer. - // This keeps latency low without flushing every message. - if msgsInBatch >= maxBatchMsgs || - bytesInBatch >= maxBatchBytes || - !timerAlive { - if err := writer.Flush(); err != nil { - fmt.Printf("flush error: %v\n", err) - return - } - } - } -} - -// trimLineEnding trims a single trailing '\n' and optional '\r' before it. -func trimLineEnding(b []byte) []byte { - n := len(b) - if n == 0 { - return b - } - if b[n-1] == '\n' { - n-- - if n > 0 && b[n-1] == '\r' { - n-- - } - return b[:n] - } - return b -} diff --git a/services/data_service/internal/worker/registry.go b/services/data_service/internal/worker/registry.go deleted file mode 100644 index d33109e..0000000 --- a/services/data_service/internal/worker/registry.go +++ /dev/null @@ -1,68 +0,0 @@ -package worker - -import ( - "fmt" - "sync" -) - -type Factory func() Worker - -type Registry struct { - mu sync.RWMutex - workerFactories map[string]Factory -} - -func NewRegistry() *Registry { - return &Registry{ - workerFactories: make(map[string]Factory), - } -} - -func (r *Registry) Register(workerType string, workerFactory Factory) error { - r.mu.Lock() - defer r.mu.Unlock() - - if _, ok := r.workerFactories[workerType]; ok { - return fmt.Errorf("worker already registered: %s", workerType) - } - - if workerFactory == nil { - return fmt.Errorf("nil workerFactory provided for: %s", workerType) - } - - r.workerFactories[workerType] = workerFactory - - return nil -} - -func (r *Registry) Spawn(workerType string) (Worker, error) { - r.mu.RLock() - defer r.mu.RUnlock() - - workerFactory, ok := r.workerFactories[workerType] - if !ok { - return nil, fmt.Errorf("unknown worker type: %s", workerType) - } - - return workerFactory(), nil -} - -func (r *Registry) RegisteredWorkers() []string { - r.mu.RLock() - defer r.mu.RUnlock() - - workerTypes := make([]string, 0, len(r.workerFactories)) - for k := range r.workerFactories { - workerTypes = append(workerTypes, k) - } - - return workerTypes -} - -func (r *Registry) Factory(workerType string) (Factory, bool) { - r.mu.RLock() - defer r.mu.RUnlock() - - workerFactory, ok := r.workerFactories[workerType] - return workerFactory, ok -} diff --git a/services/data_service/internal/worker/worker.go b/services/data_service/internal/worker/worker.go index 384c8f7..2bee65e 100644 --- a/services/data_service/internal/worker/worker.go 
diff --git a/services/data_service/internal/worker/worker.go b/services/data_service/internal/worker/worker.go
index 384c8f7..2bee65e 100644
--- a/services/data_service/internal/worker/worker.go
+++ b/services/data_service/internal/worker/worker.go
@@ -3,10 +3,7 @@ package worker
 
 import (
-	"context"
 	"errors"
-	"log/slog"
-	"sync"
 	"time"
 
 	"github.com/google/uuid"
@@ -18,128 +15,37 @@ var (
 	ErrWorkerRunning = errors.New("worker already running")
 )
 
-type SessionController interface {
-	NewSession(idleAfter time.Duration) uuid.UUID
-	AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error)
-	DetachClient(id uuid.UUID) error
-	ConfigureSession(id uuid.UUID, next []domain.Identifier) error
-	CloseSession(id uuid.UUID) error
-}
+type (
+	ReceiverFunc func() (domain.Message, error)
+	SenderFunc   func(m domain.Message) error
+)
 
-type Instruction struct{}
+type SessionController interface {
+	CreateSession(idleAfter time.Duration) uuid.UUID
+
+	LeaseSessionReceiver(sid uuid.UUID) (ReceiverFunc, error)
+	LeaseSessionSender(sid uuid.UUID) (SenderFunc, error)
+	ReleaseSessionReceiver(sid uuid.UUID) error
+	ReleaseSessionSender(sid uuid.UUID) error
+
+	ConfigureSession(sid uuid.UUID, cfg any) error
+	CloseSession(sid uuid.UUID) error
+}
 
 type Worker interface {
-	Start(workerID uuid.UUID, controller SessionController, cfg []byte) error
+	Start(spec []byte, ctrl SessionController) error
 	Stop() error
-
-	Execute(ctx context.Context, inst Instruction) error
-	IsRunning() bool
-	ID() uuid.UUID
+
+	SetUnits(units [][]byte) error
+
+	GetSpecification() []byte
+	GetUnits() [][]byte
 }
 
-type BaseStatefulWorker struct {
-	workerUUID uuid.UUID
-
-	sc  SessionController
-	sid uuid.UUID
-	in  chan<- domain.Message
-	out <-chan domain.Message
-
-	running bool
-	mu      sync.RWMutex
+type Normalizer interface {
+	NormalizeSpecification(spec []byte) ([]byte, error)
+	NormalizeUnit(unit []byte) ([]byte, error)
 }
 
-func (w *BaseStatefulWorker) Start(workerUUID uuid.UUID, sessionController SessionController, _ []byte) error {
-	if sessionController == nil {
-		return errors.New("nil SessionController provided")
-	}
-
-	w.mu.Lock()
-	if w.running {
-		w.mu.Unlock()
-		return ErrWorkerRunning
-	}
-
-	sid := sessionController.NewSession(time.Duration(0)) // set a zero duration to disable idle timeout
-	in, out, err := sessionController.AttachClient(sid, 256, 256)
-	if err != nil {
-		w.mu.Unlock()
-		return err
-	}
-
-	w.sc, w.in, w.out = sessionController, in, out
-	w.workerUUID = workerUUID
-	w.running = true
-
-	w.mu.Unlock()
-	return nil
-}
-
-func (w *BaseStatefulWorker) Stop() error {
-	w.mu.Lock()
-	if !w.running {
-		w.mu.Unlock()
-		return ErrWorkerNotRunning
-	}
-
-	err := w.sc.DetachClient(w.sid)
-	if err != nil {
-		slog.Default().Error("error when detaching client", "error", err.Error())
-	}
-
-	err = w.sc.CloseSession(w.sid)
-	if err != nil {
-		slog.Default().Error("error when closing session", "error", err.Error())
-	}
-
-	w.sc, w.in, w.out = nil, nil, nil
-	w.workerUUID, w.sid = uuid.Nil, uuid.Nil
-	w.running = false
-
-	w.mu.Unlock()
-	return nil
-}
-
-func (w *BaseStatefulWorker) IsRunning() bool {
-	w.mu.RLock()
-	running := w.running
-	w.mu.RUnlock()
-
-	return running
-}
-
-func (w *BaseStatefulWorker) ID() uuid.UUID {
-	w.mu.RLock()
-	id := w.workerUUID
-	w.mu.RUnlock()
-
-	return id
-}
-
-func (w *BaseStatefulWorker) SetReceiveIdentifiers(ids []domain.Identifier) error {
-	w.mu.RLock()
-	if !w.running {
-		w.mu.RUnlock()
-		return ErrWorkerNotRunning
-	}
-
-	w.mu.RUnlock()
-	return w.sc.ConfigureSession(w.sid, ids)
-}
-
-func (w *BaseStatefulWorker) In() chan<- domain.Message {
-	w.mu.RLock()
-	ch := w.in
-	w.mu.RUnlock()
-
-	return ch
-}
-
-func (w *BaseStatefulWorker) Out() <-chan domain.Message {
-	w.mu.RLock()
-	ch := w.out
-	w.mu.RUnlock()
-
-	return ch
-}
+type Factory func() Worker
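Note: under the new contract a worker no longer holds raw channels; it leases a ReceiverFunc/SenderFunc pair from the SessionController and releases them when done. A minimal sketch of the pump loop a Worker implementation might run, written as if inside package worker; runSession is hypothetical, and the assumption that a zero idleAfter disables the idle timeout mirrors the comment on the old NewSession:

	// runSession is a hypothetical pump loop over the lease-based API.
	func runSession(ctrl SessionController, idleAfter time.Duration) error {
		// Assumption: a zero idleAfter disables the idle timeout,
		// as the old NewSession comment stated.
		sid := ctrl.CreateSession(idleAfter)
		defer ctrl.CloseSession(sid) // deferred first, so it runs after the releases

		recv, err := ctrl.LeaseSessionReceiver(sid)
		if err != nil {
			return err
		}
		defer ctrl.ReleaseSessionReceiver(sid)

		send, err := ctrl.LeaseSessionSender(sid)
		if err != nil {
			return err
		}
		defer ctrl.ReleaseSessionSender(sid)

		for {
			m, err := recv() // ReceiverFunc: pull the next domain.Message
			if err != nil {
				return err // e.g. the session was closed
			}
			if err := send(m); err != nil { // SenderFunc: push it back out
				return err
			}
		}
	}

The lease pairing presumably makes ownership explicit: the controller can track one active receiver and one active sender per session, which the old AttachClient channel pair left implicit.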
diff --git a/services/data_service/internal/provider/providers/binance/ws/binance_futures.go b/services/data_service/internal/worker/workers/binance/ws/binance_futures.go
similarity index 100%
rename from services/data_service/internal/provider/providers/binance/ws/binance_futures.go
rename to services/data_service/internal/worker/workers/binance/ws/binance_futures.go
diff --git a/services/data_service/internal/provider/providers/binance/ws/shard.go b/services/data_service/internal/worker/workers/binance/ws/shard.go
similarity index 100%
rename from services/data_service/internal/provider/providers/binance/ws/shard.go
rename to services/data_service/internal/worker/workers/binance/ws/shard.go
diff --git a/services/data_service/internal/provider/providers/binance/ws/subjects.go b/services/data_service/internal/worker/workers/binance/ws/subjects.go
similarity index 100%
rename from services/data_service/internal/provider/providers/binance/ws/subjects.go
rename to services/data_service/internal/worker/workers/binance/ws/subjects.go