Merge branch '1-deprecate-providers-package-in-favor-of-worker' into 'main'

Resolve "Deprecate Providers package in favor of Worker"

Closes #1

See merge request phillmichelsen/tessera!2
This commit is contained in:
2025-10-09 15:53:02 +00:00
26 changed files with 1695 additions and 2239 deletions

View File

@@ -18,6 +18,10 @@ message Identifier {
string key = 1;
}
message Pattern {
string key = 2;
}
message Message {
Identifier identifier = 1;
bytes payload = 2;
@@ -29,7 +33,7 @@ message StartStreamResponse { string stream_uuid = 1; }
message ConfigureStreamRequest {
string stream_uuid = 1;
repeated Identifier identifiers = 2;
repeated Pattern patterns = 2;
}
message ConfigureStreamResponse {}

View File

@@ -2,20 +2,13 @@ package main
import (
"log/slog"
"net"
"os"
"time"
"github.com/lmittmann/tint"
pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider/providers/binance/ws"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider/providers/test"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/server"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/worker"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
func initLogger() *slog.Logger {
@@ -58,72 +51,8 @@ func main() {
// Setup
wr := worker.NewRegistry()
r := router.NewRouter(2048)
m := manager.NewManager(r, wr)
// Providers
testProvider := test.NewTestProvider(r.IncomingChannel(), time.Microsecond*100)
if err := m.AddProvider("test_provider", testProvider); err != nil {
slog.Error("add provider failed", "err", err)
os.Exit(1)
}
binanceFuturesWebsocket := ws.NewBinanceFuturesWebsocket(ws.Config{}, r.IncomingChannel())
if err := m.AddProvider("binance_futures", binanceFuturesWebsocket); err != nil {
slog.Error("add provider failed", "err", err)
os.Exit(1)
}
// gRPC Control Server
grpcControlServer := grpc.NewServer()
go func() {
pb.RegisterDataServiceControlServer(grpcControlServer, server.NewGRPCControlServer(m))
reflection.Register(grpcControlServer)
lis, err := net.Listen("tcp", ":50051")
if err != nil {
slog.Error("listen failed", "cmp", "grpc-control", "addr", ":50051", "err", err)
os.Exit(1)
}
slog.Info("listening", "cmp", "grpc-control", "addr", ":50051")
if err := grpcControlServer.Serve(lis); err != nil {
slog.Error("serve failed", "cmp", "grpc-control", "err", err)
os.Exit(1)
}
}()
// gRPC Streaming Server
grpcStreamingServer := grpc.NewServer()
go func() {
pb.RegisterDataServiceStreamingServer(grpcStreamingServer, server.NewGRPCStreamingServer(m))
reflection.Register(grpcStreamingServer)
lis, err := net.Listen("tcp", ":50052")
if err != nil {
slog.Error("listen failed", "cmp", "grpc-streaming", "addr", ":50052", "err", err)
os.Exit(1)
}
slog.Info("listening", "cmp", "grpc-streaming", "addr", ":50052")
if err := grpcStreamingServer.Serve(lis); err != nil {
slog.Error("serve failed", "cmp", "grpc-streaming", "err", err)
os.Exit(1)
}
}()
// Socket Streaming Server
socketStreamingServer := server.NewSocketStreamingServer(m)
go func() {
lis, err := net.Listen("tcp", ":50060")
if err != nil {
slog.Error("listen failed", "cmp", "socket-streaming", "addr", ":50060", "err", err)
os.Exit(1)
}
slog.Info("listening", "cmp", "socket-streaming", "addr", ":50060")
if err := socketStreamingServer.Serve(lis); err != nil {
slog.Error("serve failed", "cmp", "socket-streaming", "err", err)
os.Exit(1)
}
}()
r, _ := router.NewRouter("actor", 2048, 512)
_ = manager.NewManager(r, wr)
select {}
}

View File

@@ -1,308 +0,0 @@
package main
import (
"context"
"flag"
"fmt"
"math"
"os"
"os/signal"
"strings"
"sync/atomic"
"syscall"
"time"
pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
)
type idsFlag []string
func (i *idsFlag) String() string { return strings.Join(*i, ",") }
func (i *idsFlag) Set(v string) error {
if v == "" {
return nil
}
*i = append(*i, v)
return nil
}
func parseIDPair(s string) (provider, subject string, err error) {
parts := strings.SplitN(s, ":", 2)
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return "", "", fmt.Errorf("want provider:subject, got %q", s)
}
return parts[0], parts[1], nil
}
func toIdentifierKey(input string) (string, error) {
if strings.Contains(input, "::") {
return input, nil
}
prov, subj, err := parseIDPair(input)
if err != nil {
return "", err
}
return "raw::" + strings.ToLower(prov) + "." + subj, nil
}
func waitReady(ctx context.Context, conn *grpc.ClientConn) error {
for {
s := conn.GetState()
if s == connectivity.Ready {
return nil
}
if !conn.WaitForStateChange(ctx, s) {
if ctx.Err() != nil {
return ctx.Err()
}
return fmt.Errorf("WaitForStateChange returned without state change")
}
}
}
type streamStats struct {
TotalMsgs int64
TotalBytes int64
TickMsgs int64
TickBytes int64
}
type stats struct {
TotalMsgs int64
TotalBytes int64
ByStream map[string]*streamStats
}
func main() {
var ids idsFlag
var ctlAddr string
var strAddr string
var timeout time.Duration
var refresh time.Duration
flag.Var(&ids, "id", "identifier (provider:subject or canonical key); repeatable")
flag.StringVar(&ctlAddr, "ctl", "127.0.0.1:50051", "gRPC control address")
flag.StringVar(&strAddr, "str", "127.0.0.1:50052", "gRPC streaming address")
flag.DurationVar(&timeout, "timeout", 10*time.Second, "start/config/connect timeout")
flag.DurationVar(&refresh, "refresh", 1*time.Second, "dashboard refresh interval")
flag.Parse()
if len(ids) == 0 {
_, _ = fmt.Fprintln(os.Stderr, "provide at least one --id (provider:subject or canonical key)")
os.Exit(2)
}
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
// Control channel
ccCtl, err := grpc.NewClient(
ctlAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "new control client: %v\n", err)
os.Exit(1)
}
defer ccCtl.Close()
ccCtl.Connect()
ctlConnCtx, cancelCtlConn := context.WithTimeout(ctx, timeout)
if err := waitReady(ctlConnCtx, ccCtl); err != nil {
cancelCtlConn()
_, _ = fmt.Fprintf(os.Stderr, "connect control: %v\n", err)
os.Exit(1)
}
cancelCtlConn()
ctl := pb.NewDataServiceControlClient(ccCtl)
// Start stream
ctxStart, cancelStart := context.WithTimeout(ctx, timeout)
startResp, err := ctl.StartStream(ctxStart, &pb.StartStreamRequest{})
cancelStart()
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "StartStream: %v\n", err)
os.Exit(1)
}
streamUUID := startResp.GetStreamUuid()
fmt.Printf("stream: %s\n", streamUUID)
// Configure identifiers
var pbIDs []*pb.Identifier
orderedIDs := make([]string, 0, len(ids))
for _, s := range ids {
key, err := toIdentifierKey(s)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "bad --id: %v\n", err)
os.Exit(2)
}
pbIDs = append(pbIDs, &pb.Identifier{Key: key})
orderedIDs = append(orderedIDs, key) // preserve CLI order
}
ctxCfg, cancelCfg := context.WithTimeout(ctx, timeout)
_, err = ctl.ConfigureStream(ctxCfg, &pb.ConfigureStreamRequest{
StreamUuid: streamUUID,
Identifiers: pbIDs,
})
cancelCfg()
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ConfigureStream: %v\n", err)
os.Exit(1)
}
fmt.Printf("configured %d identifiers\n", len(pbIDs))
// Streaming connection
ccStr, err := grpc.NewClient(
strAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "new streaming client: %v\n", err)
os.Exit(1)
}
defer ccStr.Close()
ccStr.Connect()
strConnCtx, cancelStrConn := context.WithTimeout(ctx, timeout)
if err := waitReady(strConnCtx, ccStr); err != nil {
cancelStrConn()
_, _ = fmt.Fprintf(os.Stderr, "connect streaming: %v\n", err)
os.Exit(1)
}
cancelStrConn()
str := pb.NewDataServiceStreamingClient(ccStr)
streamCtx, streamCancel := context.WithCancel(ctx)
defer streamCancel()
srv, err := str.ConnectStream(streamCtx, &pb.ConnectStreamRequest{StreamUuid: streamUUID})
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ConnectStream: %v\n", err)
os.Exit(1)
}
fmt.Println("connected; streaming… (Ctrl-C to quit)")
// Receiver goroutine → channel
type msgWrap struct {
idKey string
size int
err error
}
msgCh := make(chan msgWrap, 1024)
go func() {
for {
m, err := srv.Recv()
if err != nil {
msgCh <- msgWrap{err: err}
close(msgCh)
return
}
id := m.GetIdentifier().GetKey()
msgCh <- msgWrap{idKey: id, size: len(m.GetPayload())}
}
}()
// Stats and dashboard
st := &stats{ByStream: make(map[string]*streamStats)}
seen := make(map[string]bool, len(orderedIDs))
for _, id := range orderedIDs {
seen[id] = true
}
tick := time.NewTicker(refresh)
defer tick.Stop()
clear := func() { fmt.Print("\033[H\033[2J") }
header := func() {
fmt.Printf("stream: %s now: %s refresh: %s\n",
streamUUID, time.Now().Format(time.RFC3339), refresh)
fmt.Println("--------------------------------------------------------------------------------------")
fmt.Printf("%-56s %10s %14s %12s %16s\n", "identifier", "msgs/s", "bytes/s", "total", "total_bytes")
fmt.Println("--------------------------------------------------------------------------------------")
}
printAndReset := func() {
clear()
header()
var totMsgsPS, totBytesPS float64
for _, id := range orderedIDs {
s, ok := st.ByStream[id]
var msgsPS, bytesPS float64
var totMsgs, totBytes int64
if ok {
// Convert window counts into per-second rates.
msgsPS = float64(atomic.SwapInt64(&s.TickMsgs, 0)) / refresh.Seconds()
bytesPS = float64(atomic.SwapInt64(&s.TickBytes, 0)) / refresh.Seconds()
totMsgs = atomic.LoadInt64(&s.TotalMsgs)
totBytes = atomic.LoadInt64(&s.TotalBytes)
}
totMsgsPS += msgsPS
totBytesPS += bytesPS
fmt.Printf("%-56s %10d %14d %12d %16d\n",
id,
int64(math.Round(msgsPS)),
int64(math.Round(bytesPS)),
totMsgs,
totBytes,
)
}
fmt.Println("--------------------------------------------------------------------------------------")
fmt.Printf("%-56s %10d %14d %12d %16d\n",
"TOTAL",
int64(math.Round(totMsgsPS)),
int64(math.Round(totBytesPS)),
atomic.LoadInt64(&st.TotalMsgs),
atomic.LoadInt64(&st.TotalBytes),
)
}
for {
select {
case <-ctx.Done():
fmt.Println("\nshutting down")
return
case <-tick.C:
printAndReset()
case mw, ok := <-msgCh:
if !ok {
return
}
if mw.err != nil {
if ctx.Err() != nil {
return
}
_, _ = fmt.Fprintf(os.Stderr, "recv: %v\n", mw.err)
os.Exit(1)
}
// Maintain stable order: append new identifiers at first sight.
if !seen[mw.idKey] {
seen[mw.idKey] = true
orderedIDs = append(orderedIDs, mw.idKey)
}
// Account
atomic.AddInt64(&st.TotalMsgs, 1)
atomic.AddInt64(&st.TotalBytes, int64(mw.size))
ss := st.ByStream[mw.idKey]
if ss == nil {
ss = &streamStats{}
st.ByStream[mw.idKey] = ss
}
atomic.AddInt64(&ss.TotalMsgs, 1)
atomic.AddInt64(&ss.TotalBytes, int64(mw.size))
atomic.AddInt64(&ss.TickMsgs, 1)
atomic.AddInt64(&ss.TickBytes, int64(mw.size))
}
}
}

View File

@@ -1,331 +0,0 @@
package main
import (
"bufio"
"context"
"encoding/binary"
"flag"
"fmt"
"io"
"math"
"net"
"os"
"os/signal"
"strings"
"sync/atomic"
"syscall"
"time"
pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
)
type idsFlag []string
func (i *idsFlag) String() string { return strings.Join(*i, ",") }
func (i *idsFlag) Set(v string) error {
if v == "" {
return nil
}
*i = append(*i, v)
return nil
}
func parseIDPair(s string) (provider, subject string, err error) {
parts := strings.SplitN(s, ":", 2)
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return "", "", fmt.Errorf("want provider:subject, got %q", s)
}
return parts[0], parts[1], nil
}
func toIdentifierKey(input string) (string, error) {
if strings.Contains(input, "::") {
return input, nil
}
prov, subj, err := parseIDPair(input)
if err != nil {
return "", err
}
return "raw::" + strings.ToLower(prov) + "." + subj, nil
}
func waitReady(ctx context.Context, conn *grpc.ClientConn) error {
for {
s := conn.GetState()
if s == connectivity.Ready {
return nil
}
if !conn.WaitForStateChange(ctx, s) {
if ctx.Err() != nil {
return ctx.Err()
}
return fmt.Errorf("WaitForStateChange returned without state change")
}
}
}
type streamStats struct {
TotalMsgs int64
TotalBytes int64
TickMsgs int64
TickBytes int64
}
type stats struct {
TotalMsgs int64
TotalBytes int64
ByStream map[string]*streamStats
}
func main() {
var ids idsFlag
var ctlAddr string
var strAddr string
var timeout time.Duration
var refresh time.Duration
flag.Var(&ids, "id", "identifier (provider:subject or canonical key); repeatable")
flag.StringVar(&ctlAddr, "ctl", "127.0.0.1:50051", "gRPC control address")
flag.StringVar(&strAddr, "str", "127.0.0.1:50060", "socket streaming address host:port")
flag.DurationVar(&timeout, "timeout", 10*time.Second, "start/config/connect timeout")
flag.DurationVar(&refresh, "refresh", 1*time.Second, "dashboard refresh interval")
flag.Parse()
if len(ids) == 0 {
_, _ = fmt.Fprintln(os.Stderr, "provide at least one --id (provider:subject or canonical key)")
os.Exit(2)
}
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
// Control channel
ccCtl, err := grpc.NewClient(
ctlAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "new control client: %v\n", err)
os.Exit(1)
}
defer ccCtl.Close()
ccCtl.Connect()
ctlConnCtx, cancelCtlConn := context.WithTimeout(ctx, timeout)
if err := waitReady(ctlConnCtx, ccCtl); err != nil {
cancelCtlConn()
_, _ = fmt.Fprintf(os.Stderr, "connect control: %v\n", err)
os.Exit(1)
}
cancelCtlConn()
ctl := pb.NewDataServiceControlClient(ccCtl)
// Start stream
ctxStart, cancelStart := context.WithTimeout(ctx, timeout)
startResp, err := ctl.StartStream(ctxStart, &pb.StartStreamRequest{})
cancelStart()
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "StartStream: %v\n", err)
os.Exit(1)
}
streamUUID := startResp.GetStreamUuid()
fmt.Printf("stream: %s\n", streamUUID)
// Configure identifiers
var pbIDs []*pb.Identifier
orderedIDs := make([]string, 0, len(ids))
for _, s := range ids {
key, err := toIdentifierKey(s)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "bad --id: %v\n", err)
os.Exit(2)
}
pbIDs = append(pbIDs, &pb.Identifier{Key: key})
orderedIDs = append(orderedIDs, key)
}
ctxCfg, cancelCfg := context.WithTimeout(ctx, timeout)
_, err = ctl.ConfigureStream(ctxCfg, &pb.ConfigureStreamRequest{
StreamUuid: streamUUID,
Identifiers: pbIDs,
})
cancelCfg()
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ConfigureStream: %v\n", err)
os.Exit(1)
}
fmt.Printf("configured %d identifiers\n", len(pbIDs))
// Socket streaming connection
d := net.Dialer{Timeout: timeout, KeepAlive: 30 * time.Second}
conn, err := d.DialContext(ctx, "tcp", strAddr)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "dial socket: %v\n", err)
os.Exit(1)
}
defer conn.Close()
if tc, ok := conn.(*net.TCPConn); ok {
_ = tc.SetNoDelay(true)
_ = tc.SetWriteBuffer(512 * 1024)
_ = tc.SetReadBuffer(512 * 1024)
_ = tc.SetKeepAlive(true)
_ = tc.SetKeepAlivePeriod(30 * time.Second)
}
// Send the stream UUID followed by '\n' per socket server contract.
if _, err := io.WriteString(conn, streamUUID+"\n"); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "send stream UUID: %v\n", err)
os.Exit(1)
}
fmt.Println("connected; streaming… (Ctrl-C to quit)")
// Receiver goroutine → channel
type msgWrap struct {
idKey string
size int
err error
}
msgCh := make(chan msgWrap, 1024)
go func() {
defer close(msgCh)
r := bufio.NewReaderSize(conn, 256*1024)
var hdr [4]byte
for {
if err := conn.SetReadDeadline(time.Now().Add(120 * time.Second)); err != nil {
msgCh <- msgWrap{err: err}
return
}
if _, err := io.ReadFull(r, hdr[:]); err != nil {
msgCh <- msgWrap{err: err}
return
}
n := binary.BigEndian.Uint32(hdr[:])
if n == 0 || n > 64*1024*1024 {
msgCh <- msgWrap{err: fmt.Errorf("invalid frame length: %d", n)}
return
}
buf := make([]byte, n)
if _, err := io.ReadFull(r, buf); err != nil {
msgCh <- msgWrap{err: err}
return
}
var m pb.Message
if err := proto.Unmarshal(buf, &m); err != nil {
msgCh <- msgWrap{err: fmt.Errorf("unmarshal: %w", err)}
return
}
id := m.GetIdentifier().GetKey()
msgCh <- msgWrap{idKey: id, size: len(m.GetPayload())}
}
}()
// Stats and dashboard
st := &stats{ByStream: make(map[string]*streamStats)}
seen := make(map[string]bool, len(orderedIDs))
for _, id := range orderedIDs {
seen[id] = true
}
tick := time.NewTicker(refresh)
defer tick.Stop()
clear := func() { fmt.Print("\033[H\033[2J") }
header := func() {
fmt.Printf("stream: %s now: %s refresh: %s\n",
streamUUID, time.Now().Format(time.RFC3339), refresh)
fmt.Println("--------------------------------------------------------------------------------------")
fmt.Printf("%-56s %10s %14s %12s %16s\n", "identifier", "msgs/s", "bytes/s", "total", "total_bytes")
fmt.Println("--------------------------------------------------------------------------------------")
}
printAndReset := func() {
clear()
header()
var totMsgsPS, totBytesPS float64
for _, id := range orderedIDs {
s, ok := st.ByStream[id]
var msgsPS, bytesPS float64
var totMsgs, totBytes int64
if ok {
msgsPS = float64(atomic.SwapInt64(&s.TickMsgs, 0)) / refresh.Seconds()
bytesPS = float64(atomic.SwapInt64(&s.TickBytes, 0)) / refresh.Seconds()
totMsgs = atomic.LoadInt64(&s.TotalMsgs)
totBytes = atomic.LoadInt64(&s.TotalBytes)
}
totMsgsPS += msgsPS
totBytesPS += bytesPS
fmt.Printf("%-56s %10d %14d %12d %16d\n",
id,
int64(math.Round(msgsPS)),
int64(math.Round(bytesPS)),
totMsgs,
totBytes,
)
}
fmt.Println("--------------------------------------------------------------------------------------")
fmt.Printf("%-56s %10d %14d %12d %16d\n",
"TOTAL",
int64(math.Round(totMsgsPS)),
int64(math.Round(totBytesPS)),
atomic.LoadInt64(&st.TotalMsgs),
atomic.LoadInt64(&st.TotalBytes),
)
}
for {
select {
case <-ctx.Done():
fmt.Println("\nshutting down")
return
case <-tick.C:
printAndReset()
case mw, ok := <-msgCh:
if !ok {
return
}
if mw.err != nil {
if ctx.Err() != nil {
return
}
if ne, ok := mw.err.(net.Error); ok && ne.Timeout() {
_, _ = fmt.Fprintln(os.Stderr, "recv timeout")
} else if mw.err == io.EOF {
_, _ = fmt.Fprintln(os.Stderr, "server closed stream")
} else {
_, _ = fmt.Fprintf(os.Stderr, "recv: %v\n", mw.err)
}
os.Exit(1)
}
if !seen[mw.idKey] {
seen[mw.idKey] = true
orderedIDs = append(orderedIDs, mw.idKey)
}
atomic.AddInt64(&st.TotalMsgs, 1)
atomic.AddInt64(&st.TotalBytes, int64(mw.size))
ss := st.ByStream[mw.idKey]
if ss == nil {
ss = &streamStats{}
st.ByStream[mw.idKey] = ss
}
atomic.AddInt64(&ss.TotalMsgs, 1)
atomic.AddInt64(&ss.TotalBytes, int64(mw.size))
atomic.AddInt64(&ss.TickMsgs, 1)
atomic.AddInt64(&ss.TickBytes, int64(mw.size))
}
}
}

View File

@@ -1,4 +1,3 @@
// Package domain defines external message identifiers.
package domain
import (
@@ -9,36 +8,77 @@ import (
var ErrBadIdentifier = errors.New("identifier: invalid format")
// Identifier is a lightweight wrapper around the canonical key.
// Identifier is an immutable canonical key.
// Canonical form: "namespace::tag1[] . tag2[k=v;foo=bar] . tag3[]"
type Identifier struct{ key string }
// NewIdentifier builds a canonical key: "namespace::l1.l2[param=v;...] .l3".
// Labels and params are sorted for determinism.
func NewIdentifier(namespace string, labels map[string]map[string]string) Identifier {
// NewIdentifier builds a canonical key with strict validation.
// Tags and param keys are sorted. Tags with no params emit "name[]".
// Rejects on: empty namespace, bad tag names, bad keys/values.
func NewIdentifier(namespace string, tags map[string]map[string]string) (Identifier, error) {
ns := strings.TrimSpace(namespace)
if !validNamespace(ns) {
return Identifier{}, ErrBadIdentifier
}
// Validate and copy to protect immutability and reject dup keys early.
clean := make(map[string]map[string]string, len(tags))
for name, params := range tags {
n := strings.TrimSpace(name)
if !validIDTagName(n) {
return Identifier{}, ErrBadIdentifier
}
if _, exists := clean[n]; exists {
// impossible via map input, but keep the intent explicit
return Identifier{}, ErrBadIdentifier
}
if len(params) == 0 {
clean[n] = map[string]string{}
continue
}
dst := make(map[string]string, len(params))
for k, v := range params {
kk := strings.TrimSpace(k)
vv := strings.TrimSpace(v)
if !validParamKey(kk) || !validIDParamValue(vv) {
return Identifier{}, ErrBadIdentifier
}
if _, dup := dst[kk]; dup {
return Identifier{}, ErrBadIdentifier
}
dst[kk] = vv
}
clean[n] = dst
}
var b strings.Builder
// rough prealloc: ns + "::" + avg label + some params
b.Grow(len(namespace) + 2 + 10*len(labels) + 20)
// Rough capacity hint.
b.Grow(len(ns) + 2 + 16*len(clean) + 32)
// namespace
b.WriteString(namespace)
b.WriteString(ns)
b.WriteString("::")
// sort label names for stable output
labelNames := make([]string, 0, len(labels))
for name := range labels {
labelNames = append(labelNames, name)
// stable tag order
names := make([]string, 0, len(clean))
for n := range clean {
names = append(names, n)
}
sort.Strings(labelNames)
sort.Strings(names)
for i, name := range labelNames {
for i, name := range names {
if i > 0 {
b.WriteByte('.')
}
b.WriteString(name)
// params (sorted)
params := labels[name]
if len(params) > 0 {
params := clean[name]
if len(params) == 0 {
b.WriteString("[]")
continue
}
// stable param order
keys := make([]string, 0, len(params))
for k := range params {
keys = append(keys, k)
@@ -56,9 +96,8 @@ func NewIdentifier(namespace string, labels map[string]map[string]string) Identi
}
b.WriteByte(']')
}
}
return Identifier{key: b.String()}
return Identifier{key: b.String()}, nil
}
// NewIdentifierFromRaw wraps a raw key without validation.
@@ -67,65 +106,156 @@ func NewIdentifierFromRaw(raw string) Identifier { return Identifier{key: raw} }
// Key returns the canonical key string.
func (id Identifier) Key() string { return id.key }
// Parse returns namespace and labels from Key.
// Format: "namespace::label1.label2[param=a;foo=bar].label3"
// Parse returns namespace and tags from Key.
// Accepts "tag" (bare) as "tag[]". Emits "name[]"/"[k=v;...]". First token wins on duplicates.
func (id Identifier) Parse() (string, map[string]map[string]string, error) {
k := id.key
// namespace
i := strings.Index(k, "::")
if i <= 0 {
return "", nil, ErrBadIdentifier
}
ns := k[:i]
if ns == "" {
ns := strings.TrimSpace(k[:i])
if !validNamespace(ns) {
return "", nil, ErrBadIdentifier
}
raw := k[i+2:]
lbls := make(map[string]map[string]string, 8)
tags := make(map[string]map[string]string, 8)
if raw == "" {
return ns, lbls, nil
return ns, tags, nil
}
for tok := range strings.SplitSeq(raw, ".") {
tok = strings.TrimSpace(tok)
if tok == "" {
continue
}
name, params, err := parseLabel(tok)
if err != nil || name == "" {
return "", nil, ErrBadIdentifier
}
lbls[name] = params
}
return ns, lbls, nil
}
// parseLabel parses "name" or "name[k=v;...]" into (name, params).
func parseLabel(tok string) (string, map[string]string, error) {
lb := strings.IndexByte(tok, '[')
if lb == -1 {
return tok, map[string]string{}, nil
// bare tag => empty params
name := strings.TrimSpace(tok)
if !validIDTagName(name) {
return "", nil, ErrBadIdentifier
}
if _, exists := tags[name]; !exists {
tags[name] = map[string]string{}
}
continue
}
rb := strings.LastIndexByte(tok, ']')
if rb == -1 || rb < lb {
if rb == -1 || rb < lb || rb != len(tok)-1 {
return "", nil, ErrBadIdentifier
}
name := tok[:lb]
paramStr := tok[lb+1 : rb]
params := map[string]string{}
if paramStr == "" {
return name, params, nil
name := strings.TrimSpace(tok[:lb])
if !validIDTagName(name) {
return "", nil, ErrBadIdentifier
}
// first tag wins
if _, exists := tags[name]; exists {
continue
}
for pair := range strings.SplitSeq(paramStr, ";") {
body := tok[lb+1 : rb]
// forbid outer whitespace like "[ x=1 ]"
if body != strings.TrimSpace(body) {
return "", nil, ErrBadIdentifier
}
if body == "" {
tags[name] = map[string]string{}
continue
}
// parse "k=v;foo=bar"
params := make(map[string]string, 4)
for pair := range strings.SplitSeq(body, ";") {
pair = strings.TrimSpace(pair)
if pair == "" {
continue
}
kv := strings.SplitN(pair, "=", 2)
if len(kv) != 2 || kv[0] == "" {
if len(kv) != 2 {
return "", nil, ErrBadIdentifier
}
params[kv[0]] = kv[1]
key := strings.TrimSpace(kv[0])
val := strings.TrimSpace(kv[1])
if !validParamKey(key) || !validIDParamValue(val) || val == "" {
return "", nil, ErrBadIdentifier
}
return name, params, nil
// first key wins
if _, dup := params[key]; !dup {
params[key] = val
}
}
tags[name] = params
}
return ns, tags, nil
}
// --- validation helpers ---
func validNamespace(s string) bool {
if s == "" {
return false
}
for _, r := range s {
switch r {
case '[', ']', ':':
return false
}
if isSpace(r) {
return false
}
}
return true
}
func validIDTagName(s string) bool {
if s == "" {
return false
}
for _, r := range s {
switch r {
case '[', ']', '.', ':': // added ':'
return false
}
if isSpace(r) {
return false
}
}
return true
}
func validParamKey(s string) bool {
if s == "" {
return false
}
for _, r := range s {
switch r {
case '[', ']', ';', '=':
return false
}
if isSpace(r) {
return false
}
}
return true
}
func validIDParamValue(s string) bool {
// allow spaces; forbid only bracket, pair, and kv delimiters
for _, r := range s {
switch r {
case '[', ']', ';', '=':
return false
}
}
return true
}
func isSpace(r rune) bool { return r == ' ' || r == '\t' || r == '\n' || r == '\r' }

View File

@@ -0,0 +1,141 @@
package domain
import (
"reflect"
"testing"
)
func TestNewIdentifier_CanonicalAndValidation(t *testing.T) {
t.Run("canonical ordering and formatting", func(t *testing.T) {
id, err := NewIdentifier("ns", map[string]map[string]string{
"b": {"y": "2", "x": "1"},
"a": {},
})
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
got := id.Key()
want := "ns::a[].b[x=1;y=2]"
if got != want {
t.Fatalf("key mismatch\ngot: %q\nwant: %q", got, want)
}
})
t.Run("trim whitespace and validate", func(t *testing.T) {
id, err := NewIdentifier(" ns ", map[string]map[string]string{
" tag ": {" k ": " v "},
})
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if id.Key() != "ns::tag[k=v]" {
t.Fatalf("unexpected canonical: %q", id.Key())
}
})
t.Run("reject bad namespace", func(t *testing.T) {
cases := []string{"", "a:b", "a[b]"}
for _, ns := range cases {
if _, err := NewIdentifier(ns, nil); err == nil {
t.Fatalf("expected error for ns=%q", ns)
}
}
})
t.Run("reject bad tag names", func(t *testing.T) {
for _, name := range []string{"", "bad.", "bad[", "bad]", "a:b"} {
_, err := NewIdentifier("ns", map[string]map[string]string{
name: {},
})
if err == nil {
t.Fatalf("expected error for tag name %q", name)
}
}
})
t.Run("reject bad param keys and values", func(t *testing.T) {
badKeys := []string{"", "k;", "k[", "k]", "k="}
for _, k := range badKeys {
if _, err := NewIdentifier("ns", map[string]map[string]string{
"t": {k: "ok"},
}); err == nil {
t.Fatalf("expected error for bad key %q", k)
}
}
for _, v := range []string{"bad;", "bad[", "bad]", "a=b"} {
if _, err := NewIdentifier("ns", map[string]map[string]string{
"t": {"k": v},
}); err == nil {
t.Fatalf("expected error for bad value %q", v)
}
}
})
}
func TestIdentifier_Parse_RoundTripAndTolerance(t *testing.T) {
t.Run("round trip from constructor", func(t *testing.T) {
id, err := NewIdentifier("ns", map[string]map[string]string{
"a": {},
"b": {"x": "1", "y": "2"},
})
if err != nil {
t.Fatal(err)
}
ns, tags, err := id.Parse()
if err != nil {
t.Fatal(err)
}
if ns != "ns" {
t.Fatalf("ns: got %q", ns)
}
want := map[string]map[string]string{"a": {}, "b": {"x": "1", "y": "2"}}
if !reflect.DeepEqual(tags, want) {
t.Fatalf("tags mismatch\ngot: %#v\nwant: %#v", tags, want)
}
})
t.Run("parse bare tag as empty params", func(t *testing.T) {
id := NewIdentifierFromRaw("ns::a.b[]")
_, tags, err := id.Parse()
if err != nil {
t.Fatal(err)
}
if len(tags["a"]) != 0 || len(tags["b"]) != 0 {
t.Fatalf("expected empty params, got %#v", tags)
}
})
t.Run("first token wins on duplicate tags and params", func(t *testing.T) {
id := NewIdentifierFromRaw("ns::t[x=1;y=2].t[x=9].u[k=1;k=2]")
_, tags, err := id.Parse()
if err != nil {
t.Fatal(err)
}
if tags["t"]["x"] != "1" || tags["t"]["y"] != "2" {
t.Fatalf("first tag should win, got %#v", tags["t"])
}
if tags["u"]["k"] != "1" {
t.Fatalf("first param key should win, got %#v", tags["u"])
}
})
t.Run("reject malformed", func(t *testing.T) {
bads := []string{
"", "no_ns", "ns:onecolon", "::missingns::tag[]", "ns::tag[", "ns::tag]", "ns::[]",
"ns::tag[]junk", "ns::tag[x=1;y]", "ns::tag[=1]", "ns::tag[ x=1 ]", // spaces inside keys are rejected
}
for _, s := range bads {
_, _, err := NewIdentifierFromRaw(s).Parse()
if err == nil {
t.Fatalf("expected parse error for %q", s)
}
}
})
}
func TestIdentifier_NewThenParse_ForbidsColonInTagName(t *testing.T) {
_, err := NewIdentifier("ns", map[string]map[string]string{"a:b": {}})
if err == nil {
t.Fatal("expected error due to ':' in tag name")
}
}

View File

@@ -1,3 +1,4 @@
// Package domain contains all key domain types
package domain
type Message struct {

View File

@@ -1,45 +1,114 @@
package domain
import (
"errors"
"sort"
"strings"
)
type Pattern struct {
Namespace string
Labels map[string]map[string]string
Exact bool
var ErrBadPattern = errors.New("pattern: invalid format")
// ParamMatchKind selects how a tag's params must match.
type ParamMatchKind uint8
const (
MatchAny ParamMatchKind = iota // "tag" or "tag[*]"
MatchNone // "tag[]"
MatchExact // "tag[k=v;...]"
)
// TagSpec is the per-tag constraint.
type TagSpec struct {
Kind ParamMatchKind
Params map[string]string // only for MatchExact; keys sorted on emit
}
// Canonical returns a canonical string representation of the Pattern struct
// TODO: Ensure labels and namespaces are set to lowercase
func (p *Pattern) Canonical() string {
var b strings.Builder
b.Grow(len(p.Namespace) + 10*len(p.Labels) + 20) // preallocate a rough size estimate
// Pattern is an immutable canonical key.
// Canonical form (tags unordered in input, sorted on emit):
//
// namespace::tag1[]:tag2[*]:tag3[k=v;foo=bar].* // superset
// namespace::tag1[]:tag2[*]:tag3[k=v;foo=bar] // exact set
type Pattern struct{ key string }
b.WriteString(p.Namespace)
// NewPattern builds the canonical key from structured input with strict validation.
// If a tag name equals "*" it sets superset and omits it from canonical tags.
func NewPattern(namespace string, tags map[string]TagSpec, superset bool) (Pattern, error) {
ns := strings.TrimSpace(namespace)
if !validNamespace(ns) {
return Pattern{}, ErrBadPattern
}
// Validate tags and normalize.
clean := make(map[string]TagSpec, len(tags))
for name, spec := range tags {
n := strings.TrimSpace(name)
if n == "*" {
superset = true
continue
}
if !validPatternTagName(n) {
return Pattern{}, ErrBadPattern
}
switch spec.Kind {
case MatchAny:
clean[n] = TagSpec{Kind: MatchAny}
case MatchNone:
clean[n] = TagSpec{Kind: MatchNone}
case MatchExact:
if len(spec.Params) == 0 {
// Treat empty exact as none.
clean[n] = TagSpec{Kind: MatchNone}
continue
}
dst := make(map[string]string, len(spec.Params))
for k, v := range spec.Params {
kk := strings.TrimSpace(k)
vv := strings.TrimSpace(v)
if !validParamKey(kk) || !validPatternParamValue(vv) {
return Pattern{}, ErrBadPattern
}
if _, dup := dst[kk]; dup {
return Pattern{}, ErrBadPattern
}
dst[kk] = vv
}
clean[n] = TagSpec{Kind: MatchExact, Params: dst}
default:
// Reject unknown kinds rather than silently defaulting.
return Pattern{}, ErrBadPattern
}
}
var b strings.Builder
b.Grow(len(ns) + 2 + 16*len(clean) + 32 + 2)
b.WriteString(ns)
b.WriteString("::")
labelNames := make([]string, 0, len(p.Labels))
for name := range p.Labels {
labelNames = append(labelNames, name)
names := make([]string, 0, len(clean))
for n := range clean {
names = append(names, n)
}
sort.Strings(labelNames) // sort the labels for determinism
sort.Strings(names)
for i, name := range labelNames {
for i, name := range names {
if i > 0 {
b.WriteByte('|')
b.WriteByte(':')
}
b.WriteString(name)
params := p.Labels[name]
if len(params) > 0 {
keys := make([]string, 0, len(params))
for k := range params {
spec := clean[name]
switch spec.Kind {
case MatchAny:
b.WriteString("[*]")
case MatchNone:
b.WriteString("[]")
case MatchExact:
keys := make([]string, 0, len(spec.Params))
for k := range spec.Params {
keys = append(keys, k)
}
sort.Strings(keys) // sort params for determinism
sort.Strings(keys)
b.WriteByte('[')
for j, k := range keys {
if j > 0 {
@@ -47,48 +116,232 @@ func (p *Pattern) Canonical() string {
}
b.WriteString(k)
b.WriteByte('=')
b.WriteString(params[k])
b.WriteString(spec.Params[k])
}
b.WriteByte(']')
}
}
b.WriteString("::")
if p.Exact {
b.WriteString("t")
} else {
b.WriteString("f")
if superset {
if len(names) > 0 {
b.WriteByte('.')
}
b.WriteByte('*')
}
return Pattern{key: b.String()}, nil
}
return b.String()
// NewPatternFromRaw wraps a raw key without validation.
func NewPatternFromRaw(raw string) Pattern { return Pattern{key: raw} }
// Key returns the canonical key string.
func (p Pattern) Key() string { return p.key }
// Parse returns namespace, tag specs, and superset flag.
// Accepts tokens: "tag", "tag[*]", "tag[]", "tag[k=v;...]". Also accepts ".*" suffix or a ":*" token anywhere.
// First token wins on duplicate tag names; first key wins on duplicate params.
func (p Pattern) Parse() (string, map[string]TagSpec, bool, error) {
k := p.key
// namespace
i := strings.Index(k, "::")
if i <= 0 {
return "", nil, false, ErrBadPattern
}
ns := strings.TrimSpace(k[:i])
if !validNamespace(ns) {
return "", nil, false, ErrBadPattern
}
raw := k[i+2:]
// suffix superset ".*"
superset := false
if strings.HasSuffix(raw, ".*") {
superset = true
raw = raw[:len(raw)-2]
}
// Satisfies checks if a domain.Identifier satisfies the pattern.
func (p *Pattern) Satisfies(id Identifier) bool {
ns, idLabels, err := id.Parse()
if err != nil || ns != p.Namespace {
specs := make(map[string]TagSpec, 8)
if raw == "" {
return ns, specs, superset, nil
}
for tok := range strings.SplitSeq(raw, ":") {
tok = strings.TrimSpace(tok)
if tok == "" {
continue
}
if tok == "*" {
superset = true
continue
}
lb := strings.IndexByte(tok, '[')
if lb == -1 {
name := tok
if !validPatternTagName(name) {
return "", nil, false, ErrBadPattern
}
if _, exists := specs[name]; !exists {
specs[name] = TagSpec{Kind: MatchAny}
}
continue
}
rb := strings.LastIndexByte(tok, ']')
if rb == -1 || rb < lb || rb != len(tok)-1 {
return "", nil, false, ErrBadPattern
}
name := strings.TrimSpace(tok[:lb])
if !validPatternTagName(name) {
return "", nil, false, ErrBadPattern
}
// first tag wins
if _, exists := specs[name]; exists {
continue
}
rawBody := tok[lb+1 : rb]
// forbid outer whitespace like "[ x=1 ]"
if rawBody != strings.TrimSpace(rawBody) {
return "", nil, false, ErrBadPattern
}
body := strings.TrimSpace(rawBody)
switch body {
case "":
specs[name] = TagSpec{Kind: MatchNone}
case "*":
specs[name] = TagSpec{Kind: MatchAny}
default:
params := make(map[string]string, 4)
for pair := range strings.SplitSeq(body, ";") {
pair = strings.TrimSpace(pair)
if pair == "" {
continue
}
kv := strings.SplitN(pair, "=", 2)
if len(kv) != 2 {
return "", nil, false, ErrBadPattern
}
key := strings.TrimSpace(kv[0])
val := strings.TrimSpace(kv[1])
if !validParamKey(key) || !validPatternParamValue(val) || val == "" {
return "", nil, false, ErrBadPattern
}
// first key wins
if _, dup := params[key]; !dup {
params[key] = val
}
}
specs[name] = TagSpec{Kind: MatchExact, Params: params}
}
}
return ns, specs, superset, nil
}
// Equal compares canonical keys.
func (p Pattern) Equal(q Pattern) bool { return p.key == q.key }
// CompiledPattern is a parsed pattern optimized for matching.
type CompiledPattern struct {
ns string
superset bool
specs map[string]TagSpec
}
// Compile parses and returns a compiled form.
func (p Pattern) Compile() (CompiledPattern, error) {
ns, specs, sup, err := p.Parse()
if err != nil {
return CompiledPattern{}, err
}
return CompiledPattern{ns: ns, specs: specs, superset: sup}, nil
}
// Parse on CompiledPattern returns the structured contents without error.
func (cp CompiledPattern) Parse() (namespace string, tags map[string]TagSpec, superset bool) {
return cp.ns, cp.specs, cp.superset
}
// Match parses id and tests it against the pattern.
// Returns false on parse error.
func (p Pattern) Match(id Identifier) bool {
cp, err := p.Compile()
if err != nil {
return false
}
return cp.Match(id)
}
// Match tests id against the compiled pattern.
func (cp CompiledPattern) Match(id Identifier) bool {
ns, tags, err := id.Parse()
if err != nil || ns != cp.ns {
return false
}
// Every pattern label must be present in the identifier.
for lname, wantParams := range p.Labels {
haveParams, ok := idLabels[lname]
// All pattern tags must be satisfied.
for name, spec := range cp.specs {
params, ok := tags[name]
if !ok {
return false
}
// If pattern specifies params, they must be a subset of identifier's params.
for k, v := range wantParams {
hv, ok := haveParams[k]
if !ok || hv != v {
switch spec.Kind {
case MatchAny:
// any or none is fine
case MatchNone:
if len(params) != 0 {
return false
}
case MatchExact:
if len(params) != len(spec.Params) {
return false
}
for k, v := range spec.Params {
if params[k] != v {
return false
}
}
// If pattern has no params for this label, it matches any/none params in the identifier.
default:
return false
}
}
// Exact applies to label names only: no extras allowed.
if p.Exact && len(idLabels) != len(p.Labels) {
// If exact-set match, forbid extra tags.
if !cp.superset && len(tags) != len(cp.specs) {
return false
}
return true
}
// --- validation helpers ---
func validPatternTagName(s string) bool {
if s == "" || s == "*" {
return false
}
for _, r := range s {
switch r {
case '[', ']', ':':
return false
}
if isSpace(r) {
return false
}
}
return true
}
func validPatternParamValue(s string) bool {
// allow spaces; forbid only bracket, pair, and kv delimiters
for _, r := range s {
switch r {
case '[', ']', ';', '=':
return false
}
}
return true
}

View File

@@ -0,0 +1,209 @@
package domain
import (
"reflect"
"testing"
)
func TestNewPattern_Canonical_And_Superset(t *testing.T) {
t.Run("canonical ordering and kinds", func(t *testing.T) {
p, err := NewPattern("ns", map[string]TagSpec{
"b": {Kind: MatchExact, Params: map[string]string{"y": "2", "x": "1"}},
"a": {Kind: MatchNone},
"c": {Kind: MatchAny},
}, false)
if err != nil {
t.Fatal(err)
}
if got, want := p.Key(), "ns::a[]:b[x=1;y=2]:c[*]"; got != want {
t.Fatalf("got %q want %q", got, want)
}
})
t.Run("superset via flag", func(t *testing.T) {
p, err := NewPattern("ns", map[string]TagSpec{"a": {Kind: MatchNone}}, true)
if err != nil {
t.Fatal(err)
}
if got, want := p.Key(), "ns::a[].*"; got != want {
t.Fatalf("got %q want %q", got, want)
}
})
t.Run("superset via '*' tag anywhere", func(t *testing.T) {
p, err := NewPattern("ns", map[string]TagSpec{
"*": {Kind: MatchAny}, // triggers superset; omitted from canonical
"a": {Kind: MatchNone},
}, false)
if err != nil {
t.Fatal(err)
}
if got, want := p.Key(), "ns::a[].*"; got != want {
t.Fatalf("got %q want %q", got, want)
}
})
t.Run("trim and validate", func(t *testing.T) {
p, err := NewPattern(" ns ", map[string]TagSpec{
" tag ": {Kind: MatchAny},
}, false)
if err != nil {
t.Fatal(err)
}
if p.Key() != "ns::tag[*]" {
t.Fatalf("unexpected canonical: %q", p.Key())
}
})
t.Run("reject invalid inputs", func(t *testing.T) {
_, err := NewPattern("", nil, false)
if err == nil {
t.Fatal("expected error for empty namespace")
}
_, err = NewPattern("ns", map[string]TagSpec{"": {Kind: MatchAny}}, false)
if err == nil {
t.Fatal("expected error for empty tag")
}
_, err = NewPattern("ns", map[string]TagSpec{"bad:": {Kind: MatchAny}}, false)
if err == nil {
t.Fatal("expected error for ':' in tag")
}
_, err = NewPattern("ns", map[string]TagSpec{"a": {Kind: MatchExact, Params: map[string]string{"": "1"}}}, false)
if err == nil {
t.Fatal("expected error for empty param key")
}
_, err = NewPattern("ns", map[string]TagSpec{"a": {Kind: MatchExact, Params: map[string]string{"k": "bad;val"}}}, false)
if err == nil {
t.Fatal("expected error for bad param value")
}
})
t.Run("MatchExact with empty params downgrades to []", func(t *testing.T) {
// Behavior matches current constructor: empty exact => MatchNone
p, err := NewPattern("ns", map[string]TagSpec{
"a": {Kind: MatchExact, Params: map[string]string{}},
}, false)
if err != nil {
t.Fatal(err)
}
if p.Key() != "ns::a[]" {
t.Fatalf("unexpected canonical for empty exact: %q", p.Key())
}
})
}
func TestPattern_Parse_Tokens_And_SupersetRecognition(t *testing.T) {
t.Run("accept :* token and .*", func(t *testing.T) {
ns, specs, sup, err := NewPatternFromRaw("ns::a[]:*:b[*]").Parse()
if err != nil {
t.Fatal(err)
}
if ns != "ns" || !sup {
t.Fatalf("ns=%q sup=%v", ns, sup)
}
if specs["a"].Kind != MatchNone || specs["b"].Kind != MatchAny {
t.Fatalf("unexpected specs: %#v", specs)
}
_, specs2, sup2, err := NewPatternFromRaw("ns::a[]:b[*].*").Parse()
if err != nil || !sup2 {
t.Fatalf("parse superset suffix failed: err=%v sup=%v", err, sup2)
}
if !reflect.DeepEqual(specs, specs2) {
t.Fatalf("specs mismatch between forms")
}
})
t.Run("first-wins on duplicate tags and params", func(t *testing.T) {
_, specs, sup, err := NewPatternFromRaw("ns::t[x=1;y=2]:t[*]:u[k=1;k=2]").Parse()
if err != nil || sup {
t.Fatalf("err=%v sup=%v", err, sup)
}
if specs["t"].Kind != MatchExact || specs["t"].Params["x"] != "1" {
t.Fatalf("first tag should win: %#v", specs["t"])
}
if specs["u"].Params["k"] != "1" {
t.Fatalf("first param key should win: %#v", specs["u"])
}
})
t.Run("reject malformed", func(t *testing.T) {
bads := []string{
"", "no_ns", "ns:onecolon", "::missingns::tag[]",
"ns::tag[", "ns::tag]", "ns::[]", "ns::tag[]junk",
"ns::a[=1]", "ns::a[x=]", "ns::a[ x=1 ]",
}
for _, s := range bads {
_, _, _, err := NewPatternFromRaw(s).Parse()
if err == nil {
t.Fatalf("expected parse error for %q", s)
}
}
})
}
func TestPattern_Match_Matrix(t *testing.T) {
makeID := func(key string) Identifier { return NewIdentifierFromRaw(key) }
t.Run("namespace mismatch", func(t *testing.T) {
p := NewPatternFromRaw("ns::a[]")
if p.Match(makeID("other::a[]")) {
t.Fatal("should not match different namespace")
}
})
t.Run("MatchAny accepts empty and nonempty", func(t *testing.T) {
p := NewPatternFromRaw("ns::a[*]")
if !p.Match(makeID("ns::a[]")) || !p.Match(makeID("ns::a[x=1]")) {
t.Fatal("MatchAny should accept both")
}
})
t.Run("MatchNone requires empty", func(t *testing.T) {
p := NewPatternFromRaw("ns::a[]")
if !p.Match(makeID("ns::a[]")) {
t.Fatal("empty should match")
}
if p.Match(makeID("ns::a[x=1]")) {
t.Fatal("nonempty should not match MatchNone")
}
})
t.Run("MatchExact equals, order independent", func(t *testing.T) {
p := NewPatternFromRaw("ns::a[x=1;y=2]")
if !p.Match(makeID("ns::a[y=2;x=1]")) {
t.Fatal("param order should not matter")
}
if p.Match(makeID("ns::a[x=1]")) {
t.Fatal("missing param should fail")
}
if p.Match(makeID("ns::a[x=1;y=2;z=3]")) {
t.Fatal("extra param should fail exact")
}
if p.Match(makeID("ns::a[x=9;y=2]")) {
t.Fatal("different value should fail")
}
})
t.Run("exact-set vs superset", func(t *testing.T) {
exact := NewPatternFromRaw("ns::a[]:b[*]")
super := NewPatternFromRaw("ns::a[]:b[*].*")
if !exact.Match(makeID("ns::a[].b[x=1]")) {
t.Fatal("exact should match same set")
}
if exact.Match(makeID("ns::a[].b[x=1].c[]")) {
t.Fatal("exact should not allow extra tags")
}
if !super.Match(makeID("ns::a[].b[x=1].c[]")) {
t.Fatal("superset should allow extra tags")
}
})
t.Run("all pattern tags must exist", func(t *testing.T) {
p := NewPatternFromRaw("ns::a[]:b[*]")
if p.Match(makeID("ns::a[]")) {
t.Fatal("missing b should fail")
}
})
}

View File

@@ -1,74 +1,69 @@
package manager
import (
"time"
"github.com/google/uuid"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider"
)
// Commands posted into the manager loop. One struct per action.
type addProviderCmd struct {
name string
p provider.Provider
resp chan addProviderResult
// Session Commands
type createSessionCommand struct {
resp chan createSessionResult
}
type addProviderResult struct {
err error
}
type removeProviderCmd struct {
name string
resp chan removeProviderResult
}
type removeProviderResult struct {
err error
}
type newSessionCmd struct {
idleAfter time.Duration
resp chan newSessionResult
}
type newSessionResult struct {
id uuid.UUID
}
type attachCmd struct {
type createSessionResult struct {
sid uuid.UUID
inBuf, outBuf int
resp chan attachResult
}
type attachResult struct {
cin chan<- domain.Message
cout <-chan domain.Message
err error
}
type detachCmd struct {
type leaseSessionReceiverCommand struct {
sid uuid.UUID
resp chan detachResult
resp chan leaseSessionReceiverResult
}
type detachResult struct {
type leaseSessionReceiverResult struct {
receiveFunc func() (domain.Message, error)
err error
}
type configureCmd struct {
type leaseSessionSenderCommand struct {
sid uuid.UUID
next []domain.Pattern
resp chan configureResult
resp chan leaseSessionSenderResult
}
type configureResult struct {
type leaseSessionSenderResult struct {
sendFunc func(domain.Message) error
err error
}
type closeSessionCmd struct {
type releaseSessionReceiverCommand struct {
sid uuid.UUID
resp chan releaseSessionReceiverResult
}
type releaseSessionReceiverResult struct {
err error
}
type releaseSessionSenderCommand struct {
sid uuid.UUID
resp chan releaseSessionSenderResult
}
type releaseSessionSenderResult struct {
err error
}
type configureSessionCommand struct {
sid uuid.UUID
cfg any
resp chan configureSessionResult
}
type configureSessionResult struct {
err error
}
type closeSessionCommand struct {
sid uuid.UUID
resp chan closeSessionResult
}

View File

@@ -1 +1,27 @@
package manager
func workerEntryKey(w WorkerEntry) string {
return w.Type + "|" + string(w.Spec) + "|" + string(w.Unit)
}
func workerEntryDiffs(old, nw []WorkerEntry) (added, removed []WorkerEntry) {
oldKeys := make(map[string]struct{}, len(old))
newKeys := make(map[string]struct{}, len(nw))
for _, w := range old {
oldKeys[workerEntryKey(w)] = struct{}{}
}
for _, w := range nw {
k := workerEntryKey(w)
newKeys[k] = struct{}{}
if _, ok := oldKeys[k]; !ok {
added = append(added, w)
}
}
for _, w := range old {
if _, ok := newKeys[workerEntryKey(w)]; !ok {
removed = append(removed, w)
}
}
return added, removed
}

View File

@@ -1,277 +1,500 @@
// Package manager implements the core orchestration logic for data providers and client sessions
// in the tessera data_service. It manages provider registration, session lifecycle, client attachment,
// stream configuration, and routing of messages between clients and providers.
// Package manager is the manager package!!!
package manager
import (
"errors"
"fmt"
"log/slog"
"time"
"github.com/google/uuid"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/worker"
)
var (
ErrSessionNotFound = errors.New("session not found")
ErrClientAlreadyAttached = errors.New("client already attached")
ErrClientNotAttached = errors.New("client not attached")
)
var ErrSessionNotFound = errors.New("session not found")
// Manager is a single-goroutine actor that owns all state.
type Manager struct {
// Command channel
cmdCh chan any
// State (loop-owned)
providers map[string]provider.Provider
sessions map[uuid.UUID]*session
// Router
router *router.Router
workerRegistry *WorkerRegistry
workerInstances map[string]map[string]worker.Worker
workerUnitRefCounts map[string]map[string]map[string]int
}
// NewManager creates a manager and starts its run loop.
func NewManager(router *router.Router, workerRegistry *worker.Registry) *Manager {
func NewManager(r *router.Router, _ *WorkerRegistry) *Manager {
m := &Manager{
cmdCh: make(chan any, 256),
providers: make(map[string]provider.Provider),
sessions: make(map[uuid.UUID]*session),
router: router,
router: r,
}
go router.Start()
go r.Start()
go m.run()
slog.Default().Info("manager started", slog.String("cmp", "manager"))
return m
}
// API
// AddProvider adds and starts a new provider.
func (m *Manager) AddProvider(name string, p provider.Provider) error {
slog.Default().Debug("add provider request", slog.String("cmp", "manager"), slog.String("name", name))
resp := make(chan addProviderResult, 1)
m.cmdCh <- addProviderCmd{name: name, p: p, resp: resp}
// CreateSession creates a new session. Arms a 1m idle timer immediately.
func (m *Manager) CreateSession() uuid.UUID {
slog.Default().Debug("create session request", slog.String("cmp", "manager"))
resp := make(chan createSessionResult, 1)
m.cmdCh <- createSessionCommand{resp: resp}
r := <-resp
slog.Default().Info("new session created", slog.String("cmp", "manager"), slog.String("session", r.sid.String()))
return r.sid
}
slog.Default().Info("provider added", slog.String("cmp", "manager"), slog.String("name", name))
// LeaseSessionReceiver leases a receiver and returns the receive func and its close func.
func (m *Manager) LeaseSessionReceiver(sid uuid.UUID) (func() (domain.Message, error), error) {
slog.Default().Debug("lease session receiver request", slog.String("cmp", "manager"), slog.String("session", sid.String()))
resp := make(chan leaseSessionReceiverResult, 1)
m.cmdCh <- leaseSessionReceiverCommand{sid: sid, resp: resp}
r := <-resp
return r.receiveFunc, r.err
}
// LeaseSessionSender leases a sender and returns the send func and its close func.
func (m *Manager) LeaseSessionSender(sid uuid.UUID) (func(domain.Message) error, error) {
slog.Default().Debug("lease sender request", slog.String("cmp", "manager"), slog.String("session", sid.String()))
resp := make(chan leaseSessionSenderResult, 1)
m.cmdCh <- leaseSessionSenderCommand{sid: sid, resp: resp}
r := <-resp
return r.sendFunc, r.err
}
// ReleaseSessionReceiver releases the currently held receiver lease
func (m *Manager) ReleaseSessionReceiver(sid uuid.UUID) error {
slog.Default().Debug("release session receiver request", slog.String("cmp", "manager"), slog.String("session", sid.String()))
resp := make(chan releaseSessionReceiverResult, 1)
m.cmdCh <- releaseSessionReceiverCommand{sid: sid, resp: resp}
r := <-resp
return r.err
}
// RemoveProvider stops and removes a provider, cleaning up all sessions.
func (m *Manager) RemoveProvider(name string) error {
slog.Default().Debug("remove provider request", slog.String("cmp", "manager"), slog.String("name", name))
resp := make(chan removeProviderResult, 1)
m.cmdCh <- removeProviderCmd{name: name, resp: resp}
// ReleaseSessionSender releases the currently held receiver lease
func (m *Manager) ReleaseSessionSender(sid uuid.UUID) error {
slog.Default().Debug("release sender request", slog.String("cmp", "manager"), slog.String("session", sid.String()))
resp := make(chan releaseSessionSenderResult, 1)
m.cmdCh <- releaseSessionSenderCommand{sid: sid, resp: resp}
r := <-resp
slog.Default().Info("provider removed", slog.String("cmp", "manager"), slog.String("name", name))
return r.err
}
// NewSession creates a new session with the given idle timeout.
func (m *Manager) NewSession(idleAfter time.Duration) uuid.UUID {
slog.Default().Debug("new session request", slog.String("cmp", "manager"), slog.Duration("idle_after", idleAfter))
resp := make(chan newSessionResult, 1)
m.cmdCh <- newSessionCmd{idleAfter: idleAfter, resp: resp}
// ConfigureSession applies a session config. Pattern wiring left TODO.
func (m *Manager) ConfigureSession(sid uuid.UUID, cfg any) error {
slog.Default().Debug("configure session request", slog.String("cmp", "manager"), slog.String("session", sid.String()))
resp := make(chan configureSessionResult, 1)
m.cmdCh <- configureSessionCommand{sid: sid, cfg: cfg, resp: resp}
r := <-resp
slog.Default().Info("new session created", slog.String("cmp", "manager"), slog.String("session", r.id.String()))
return r.id
}
// AttachClient attaches a client to a session, creates and returns client channels for the session.
func (m *Manager) AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error) {
slog.Default().Debug("attach client request", slog.String("cmp", "manager"), slog.String("session", id.String()), slog.Int("in_buf", inBuf), slog.Int("out_buf", outBuf))
resp := make(chan attachResult, 1)
m.cmdCh <- attachCmd{sid: id, inBuf: inBuf, outBuf: outBuf, resp: resp}
r := <-resp
slog.Default().Info("client attached", slog.String("cmp", "manager"), slog.String("session", id.String()))
return r.cin, r.cout, r.err
}
// DetachClient detaches the client from the session, closes client channels and arms timeout.
func (m *Manager) DetachClient(id uuid.UUID) error {
slog.Default().Debug("detach client request", slog.String("cmp", "manager"), slog.String("session", id.String()))
resp := make(chan detachResult, 1)
m.cmdCh <- detachCmd{sid: id, resp: resp}
r := <-resp
slog.Default().Info("client detached", slog.String("cmp", "manager"), slog.String("session", id.String()))
return r.err
}
// ConfigureSession sets the next set of patterns for the session, starting and stopping streams as needed.
func (m *Manager) ConfigureSession(id uuid.UUID, next []domain.Pattern) error {
slog.Default().Debug("configure session request", slog.String("cmp", "manager"), slog.String("session", id.String()), slog.Int("patterns", len(next)))
resp := make(chan configureResult, 1)
m.cmdCh <- configureCmd{sid: id, next: next, resp: resp}
r := <-resp
slog.Default().Info("session configured", slog.String("cmp", "manager"), slog.String("session", id.String()), slog.String("err", fmt.Sprintf("%v", r.err)))
return r.err
}
// CloseSession closes and removes the session, cleaning up all bindings.
func (m *Manager) CloseSession(id uuid.UUID) error {
slog.Default().Debug("close session request", slog.String("cmp", "manager"), slog.String("session", id.String()))
// CloseSession closes and removes the session.
func (m *Manager) CloseSession(sid uuid.UUID) error {
slog.Default().Debug("close session request", slog.String("cmp", "manager"), slog.String("session", sid.String()))
resp := make(chan closeSessionResult, 1)
m.cmdCh <- closeSessionCmd{sid: id, resp: resp}
m.cmdCh <- closeSessionCommand{sid: sid, resp: resp}
r := <-resp
slog.Default().Info("session closed", slog.String("cmp", "manager"), slog.String("session", id.String()))
return r.err
}
// The main loop of the manager, processing commands serially.
// --- Loop ---
func (m *Manager) run() {
for {
msg := <-m.cmdCh
for msg := range m.cmdCh {
switch c := msg.(type) {
case addProviderCmd:
m.handleAddProvider(c)
case removeProviderCmd:
m.handleRemoveProvider(c)
case newSessionCmd:
case createSessionCommand:
m.handleNewSession(c)
case attachCmd:
m.handleAttach(c)
case detachCmd:
m.handleDetach(c)
case configureCmd:
m.handleConfigure(c)
case closeSessionCmd:
case leaseSessionReceiverCommand:
m.handleLeaseSessionReceiver(c)
case leaseSessionSenderCommand:
m.handleLeaseSessionSender(c)
case releaseSessionReceiverCommand:
m.handleReleaseSessionReceiver(c)
case releaseSessionSenderCommand:
m.handleReleaseSessionSender(c)
case configureSessionCommand:
m.handleConfigureSession(c)
case closeSessionCommand:
m.handleCloseSession(c)
}
}
}
// Command handlers, run in loop goroutine. With a single goroutine, no locking is needed.
// --- Handlers ---
// handleAddProvider adds and starts a new provider.
func (m *Manager) handleAddProvider(cmd addProviderCmd) {
if _, ok := m.providers[cmd.name]; ok {
slog.Default().Warn("provider already exists", slog.String("cmp", "manager"), slog.String("name", cmd.name))
cmd.resp <- addProviderResult{err: fmt.Errorf("provider exists: %s", cmd.name)}
return
}
if err := cmd.p.Start(); err != nil {
slog.Default().Warn("failed to start provider", slog.String("cmp", "manager"), slog.String("name", cmd.name), slog.String("err", err.Error()))
cmd.resp <- addProviderResult{err: fmt.Errorf("failed to start provider %s: %w", cmd.name, err)}
return
}
m.providers[cmd.name] = cmd.p
cmd.resp <- addProviderResult{err: nil}
}
// handleRemoveProvider stops and removes a provider, removing the bindings from all sessions that use streams from it.
// TODO: Implement this function.
func (m *Manager) handleRemoveProvider(_ removeProviderCmd) {
panic("unimplemented")
}
// handleNewSession creates a new session with the given idle timeout. The idle timeout is typically not set by the client, but by the server configuration.
func (m *Manager) handleNewSession(cmd newSessionCmd) {
s := newSession(cmd.idleAfter)
// Only arm the idle timer if the timeout is positive. We allow a zero or negative timeout to indicate "never timeout".
if s.idleAfter <= 0 {
s.armIdleTimer(func() {
func (m *Manager) handleNewSession(cmd createSessionCommand) {
var s *session
idleCallback := func() { // Generate callback function for the session to be created.
resp := make(chan closeSessionResult, 1)
m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp}
m.cmdCh <- closeSessionCommand{sid: s.id, resp: resp}
<-resp
})
}
s = newSession(m.router.Incoming(), idleCallback)
m.sessions[s.id] = s
cmd.resp <- newSessionResult{id: s.id}
cmd.resp <- createSessionResult{sid: s.id}
}
// handleAttach attaches a client to a session, creating new client channels for the session. If the session is already attached, returns an error.
func (m *Manager) handleAttach(cmd attachCmd) {
func (m *Manager) handleLeaseSessionReceiver(cmd leaseSessionReceiverCommand) {
s, ok := m.sessions[cmd.sid]
if !ok {
cmd.resp <- attachResult{nil, nil, ErrSessionNotFound}
cmd.resp <- leaseSessionReceiverResult{err: ErrSessionNotFound}
return
}
if s.attached {
cmd.resp <- attachResult{nil, nil, ErrClientAlreadyAttached}
recv, err := s.leaseReceiver()
if err != nil {
cmd.resp <- leaseSessionReceiverResult{err: err}
return
}
cin, cout := s.generateNewChannels(cmd.inBuf, cmd.outBuf)
s.attached = true
s.disarmIdleTimer()
cmd.resp <- attachResult{cin: cin, cout: cout, err: nil}
// Register the patterns and egress channel for the session with the router.
patterns := s.getPatterns()
egressChan, ok := s.getEgress()
if !ok {
cmd.resp <- leaseSessionReceiverResult{err: errors.New("egress channel doesn't exist despite successful lease")}
}
// handleDetach detaches the client from the session, closing client channels and arming the idle timeout. If the session is not attached, returns an error.
func (m *Manager) handleDetach(cmd detachCmd) {
for _, pattern := range patterns {
m.router.RegisterPattern(pattern, egressChan)
}
cmd.resp <- leaseSessionReceiverResult{receiveFunc: recv, err: nil}
}
func (m *Manager) handleLeaseSessionSender(cmd leaseSessionSenderCommand) {
s, ok := m.sessions[cmd.sid]
if !ok {
cmd.resp <- detachResult{ErrSessionNotFound}
cmd.resp <- leaseSessionSenderResult{err: ErrSessionNotFound}
return
}
if !s.attached {
cmd.resp <- detachResult{ErrClientNotAttached}
send, err := s.leaseSender()
if err != nil {
cmd.resp <- leaseSessionSenderResult{err: err}
return
}
s.clearChannels()
// Only rearm the idle timer if the timeout is positive.
if s.idleAfter > 0 {
s.armIdleTimer(func() {
resp := make(chan closeSessionResult, 1)
m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp}
<-resp
})
cmd.resp <- leaseSessionSenderResult{sendFunc: send, err: nil}
}
s.attached = false
cmd.resp <- detachResult{nil}
}
// handleConfigure updates the session bindings, starting and stopping streams as needed. Currently only supports Raw streams.
// TODO: Change this configuration to be an atomic operation, so that partial failures do not end in a half-configured state.
func (m *Manager) handleConfigure(cmd configureCmd) {
_, ok := m.sessions[cmd.sid]
func (m *Manager) handleReleaseSessionReceiver(cmd releaseSessionReceiverCommand) {
s, ok := m.sessions[cmd.sid]
if !ok {
cmd.resp <- configureResult{ErrSessionNotFound}
cmd.resp <- releaseSessionReceiverResult{err: ErrSessionNotFound}
return
}
err := s.releaseReceiver()
if err != nil {
cmd.resp <- releaseSessionReceiverResult{err: err}
return
}
cmd.resp <- releaseSessionReceiverResult{err: nil}
}
func (m *Manager) handleReleaseSessionSender(cmd releaseSessionSenderCommand) {
s, ok := m.sessions[cmd.sid]
if !ok {
cmd.resp <- releaseSessionSenderResult{err: ErrSessionNotFound}
return
}
err := s.releaseSender()
if err != nil {
cmd.resp <- releaseSessionSenderResult{err: err}
return
}
cmd.resp <- releaseSessionSenderResult{err: nil}
}
func (m *Manager) handleConfigureSession(cmd configureSessionCommand) {
s, ok := m.sessions[cmd.sid]
if !ok {
cmd.resp <- configureSessionResult{err: ErrSessionNotFound}
return
}
var errs error
cmd.resp <- configureResult{err: errs}
newCfg, ok := cmd.cfg.(SessionConfig)
if !ok {
cmd.resp <- configureSessionResult{err: ErrBadConfig}
return
}
// handleCloseSession closes and removes the session, cleaning up all bindings.
func (m *Manager) handleCloseSession(cmd closeSessionCmd) {
_, ok := m.sessions[cmd.sid]
// Normalize workers.
normalized := make([]WorkerEntry, len(newCfg.Workers))
for i, we := range newCfg.Workers {
spec, err := m.workerRegistry.NormalizeSpecificationBytes(we.Type, we.Spec)
if err != nil {
cmd.resp <- configureSessionResult{err: err}
return
}
unit, err := m.workerRegistry.NormalizeUnitBytes(we.Type, we.Unit)
if err != nil {
cmd.resp <- configureSessionResult{err: err}
return
}
normalized[i] = WorkerEntry{Type: we.Type, Spec: spec, Unit: unit}
}
newCfg.Workers = normalized
// Compute diffs.
curr := append([]WorkerEntry(nil), s.cfg.Workers...)
next := append([]WorkerEntry(nil), newCfg.Workers...)
additions, removals := workerEntryDiffs(curr, next)
// Per-instance delta: type -> spec -> {add, remove}
type delta struct{ add, remove [][]byte }
changes := make(map[string]map[string]delta)
addTo := func(typ, spec string, u []byte, isAdd bool) {
if changes[typ] == nil {
changes[typ] = make(map[string]delta)
}
d := changes[typ][spec]
if isAdd {
d.add = append(d.add, u)
} else {
d.remove = append(d.remove, u)
}
changes[typ][spec] = d
}
for _, e := range additions {
addTo(e.Type, string(e.Spec), e.Unit, true)
}
for _, e := range removals {
addTo(e.Type, string(e.Spec), e.Unit, false)
}
// Ensure manager maps.
if m.workerInstances == nil {
m.workerInstances = make(map[string]map[string]worker.Worker)
}
if m.workerUnitRefCounts == nil {
m.workerUnitRefCounts = make(map[string]map[string]map[string]int)
}
// Rollback snapshots.
type snap struct {
hadInst bool
prevRef map[string]int
}
snaps := make(map[string]map[string]snap) // type -> spec -> snap
created := make(map[string]map[string]bool)
saveSnap := func(typ, spec string) {
if snaps[typ] == nil {
snaps[typ] = make(map[string]snap)
}
if _, ok := snaps[typ][spec]; ok {
return
}
had := false
if m.workerInstances[typ] != nil {
_, had = m.workerInstances[typ][spec]
}
prev := make(map[string]int)
if m.workerUnitRefCounts[typ] != nil && m.workerUnitRefCounts[typ][spec] != nil {
for k, v := range m.workerUnitRefCounts[typ][spec] {
prev[k] = v
}
}
snaps[typ][spec] = snap{hadInst: had, prevRef: prev}
}
markCreated := func(typ, spec string) {
if created[typ] == nil {
created[typ] = make(map[string]bool)
}
created[typ][spec] = true
}
toBytesSlice := func(ref map[string]int) [][]byte {
out := make([][]byte, 0, len(ref))
for k, c := range ref {
if c > 0 {
out = append(out, []byte(k))
}
}
return out
}
restore := func(err error) {
// Restore refcounts and instance unit sets.
for typ, specs := range snaps {
for spec, sn := range specs {
// Restore refcounts exactly.
if m.workerUnitRefCounts[typ] == nil {
m.workerUnitRefCounts[typ] = make(map[string]map[string]int)
}
rc := make(map[string]int)
for k, v := range sn.prevRef {
rc[k] = v
}
m.workerUnitRefCounts[typ][spec] = rc
prevUnits := toBytesSlice(rc)
inst := m.workerInstances[typ][spec]
switch {
case sn.hadInst:
// Ensure instance exists and set units back.
if inst == nil {
wi, ierr := m.workerRegistry.Spawn(typ)
if ierr == nil {
m.workerInstances[typ][spec] = wi
inst = wi
// TODO: pass the correct SessionController
_ = wi.Start([]byte(spec), s) // best-effort
}
}
if inst != nil {
_ = inst.SetUnits(prevUnits) // best-effort
}
default:
// We did not have an instance before. Stop and remove if present.
if inst != nil {
_ = inst.Stop()
delete(m.workerInstances[typ], spec)
if len(m.workerInstances[typ]) == 0 {
delete(m.workerInstances, typ)
}
}
// If no refs remain, clean refcounts map too.
if len(rc) == 0 {
delete(m.workerUnitRefCounts[typ], spec)
if len(m.workerUnitRefCounts[typ]) == 0 {
delete(m.workerUnitRefCounts, typ)
}
}
}
}
}
// Clean up instances created during this op that shouldn't exist.
for typ, specs := range created {
for spec := range specs {
if snaps[typ] != nil && snaps[typ][spec].hadInst {
continue
}
if inst := m.workerInstances[typ][spec]; inst != nil {
_ = inst.Stop()
delete(m.workerInstances[typ], spec)
if len(m.workerInstances[typ]) == 0 {
delete(m.workerInstances, typ)
}
}
}
}
cmd.resp <- configureSessionResult{err: err}
}
// Apply deltas per instance.
for typ, specMap := range changes {
if m.workerUnitRefCounts[typ] == nil {
m.workerUnitRefCounts[typ] = make(map[string]map[string]int)
}
if m.workerInstances[typ] == nil {
m.workerInstances[typ] = make(map[string]worker.Worker)
}
for spec, d := range specMap {
saveSnap(typ, spec)
// Update refcounts.
rc := m.workerUnitRefCounts[typ][spec]
if rc == nil {
rc = make(map[string]int)
m.workerUnitRefCounts[typ][spec] = rc
}
for _, u := range d.remove {
k := string(u)
if rc[k] > 0 {
rc[k]--
}
if rc[k] == 0 {
delete(rc, k)
}
}
for _, u := range d.add {
k := string(u)
rc[k]++
}
desired := toBytesSlice(rc)
inst := m.workerInstances[typ][spec]
switch {
case len(desired) == 0:
// No units desired: stop and prune if instance exists.
if inst != nil {
if err := inst.Stop(); err != nil {
restore(err)
return
}
delete(m.workerInstances[typ], spec)
if len(m.workerInstances[typ]) == 0 {
delete(m.workerInstances, typ)
}
}
// If no refs left, prune refcounts too.
delete(m.workerUnitRefCounts[typ], spec)
if len(m.workerUnitRefCounts[typ]) == 0 {
delete(m.workerUnitRefCounts, typ)
}
default:
// Need instance with desired units.
if inst == nil {
wi, err := m.workerRegistry.Instantiate(typ, []byte(spec))
if err != nil {
restore(err)
return
}
m.workerInstances[typ][spec] = wi
markCreated(typ, spec)
// TODO: pass correct SessionController implementation
if err := wi.Start([]byte(spec), s); err != nil {
restore(err)
return
}
inst = wi
}
if err := inst.SetUnits(desired); err != nil {
restore(err)
return
}
}
}
}
// Commit config last.
if err := s.setConfig(newCfg); err != nil {
restore(err)
return
}
cmd.resp <- configureSessionResult{err: nil}
}
func (m *Manager) handleCloseSession(cmd closeSessionCommand) {
s, ok := m.sessions[cmd.sid]
if !ok {
cmd.resp <- closeSessionResult{err: ErrSessionNotFound}
return
}
var errs error
// TODO: Ensure workers are correctly scrapped
cmd.resp <- closeSessionResult{err: errs}
patterns := s.getPatterns()
egress, ok := s.getEgress()
if ok { // We only need to deregister if there is an active receiver lease.
for _, pattern := range patterns {
m.router.DeregisterPattern(pattern, egress)
}
}
// Release leases and ensure idle timer is disarmed.
s.closeAll()
s.disarmIdleTimer()
delete(m.sessions, cmd.sid)
cmd.resp <- closeSessionResult{err: nil}
}

View File

@@ -1,77 +1,238 @@
package manager
import (
"errors"
"log/slog"
"time"
"github.com/google/uuid"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
)
const (
defaultClientBuf = 256
var (
// Lease lifecycle errors.
ErrAlreadyReleased = errors.New("lease already released")
ErrSenderAlreadyLeased = errors.New("sender already leased")
ErrReceiverAlreadyLeased = errors.New("receiver already leased")
ErrSenderNotLeased = errors.New("no sender lease active")
ErrReceiverNotLeased = errors.New("no receiver lease active")
// Config errors
ErrBadConfig = errors.New("config not valid")
ErrConfigActiveLeases = errors.New("cannot configure while a lease is active")
)
// session holds per-session state.
// Owned by the manager loop. So we do not need a mutex.
type WorkerEntry struct {
Type string
Spec []byte
Unit []byte
}
// SessionConfig carries non-live-tunable knobs for a session.
// Manager mutates this directly; session does not expose Configure anymore.
type SessionConfig struct {
IdleAfter time.Duration // <=0 disables idle timer
EgressBuffer int // receiver egress buffer size
Patterns []domain.Pattern
Workers []WorkerEntry
}
// session is manager-owned state. Single goroutine access.
type session struct {
id uuid.UUID
inChannel chan domain.Message // caller writes
outChannel chan domain.Message // caller reads
// Router pipes
ingress chan<- domain.Message // router.Incoming(); router-owned
egress chan domain.Message // current receiver lease egress; owned here
bound map[domain.Identifier]struct{}
attached bool
idleAfter time.Duration
// Config and timers
cfg SessionConfig
idleTimer *time.Timer
idleCallback func() // stored on creation
// Sender lease
sendOpen bool
sendDone chan struct{}
// Receiver lease
receiveOpen bool
receiveDone chan struct{}
}
func newSession(idleAfter time.Duration) *session {
return &session{
// newSession arms a 1-minute idle timer immediately. Manager must
// configure before it fires. idleCb is invoked by the timer.
func newSession(ingress chan<- domain.Message, idleCb func()) *session {
s := &session{
id: uuid.New(),
bound: make(map[domain.Identifier]struct{}),
attached: false,
idleAfter: idleAfter,
ingress: ingress,
cfg: SessionConfig{
IdleAfter: time.Minute, // default 1m on creation
EgressBuffer: 256, // default buffer
},
idleCallback: idleCb,
}
s.armIdleTimer()
return s
}
func (s *session) setConfig(cfg any) error {
if s.sendOpen || s.receiveOpen {
return ErrConfigActiveLeases
}
cfgParsed, ok := cfg.(SessionConfig)
if !ok {
return ErrBadConfig
}
s.cfg = cfgParsed
return nil
}
func (s *session) getEgress() (chan<- domain.Message, bool) {
if s.egress == nil {
return nil, false
}
return s.egress, true
}
func (s *session) getPatterns() []domain.Pattern {
return nil
}
// leaseSender opens a sender lease and returns send(m) error.
func (s *session) leaseSender() (func(domain.Message) error, error) {
if s.sendOpen {
return nil, ErrSenderAlreadyLeased
}
s.sendOpen = true
s.sendDone = make(chan struct{})
s.disarmIdleTimer()
// Snapshot for lease-scoped handle.
done := s.sendDone
sendFunc := func(m domain.Message) error {
select {
case <-done:
return ErrAlreadyReleased
case s.ingress <- m:
return nil
}
}
// armIdleTimer sets the idle timer to call f after idleAfter duration (resets existing timer if any).
func (s *session) armIdleTimer(f func()) {
return sendFunc, nil
}
// releaseSender releases the current sender lease.
func (s *session) releaseSender() error {
if !s.sendOpen {
return ErrSenderNotLeased
}
s.sendOpen = false
if s.sendDone != nil {
close(s.sendDone) // invalidates all prior send funcs
s.sendDone = nil
}
if !s.receiveOpen {
s.armIdleTimer()
}
return nil
}
// leaseReceiver opens a receiver lease and returns receive() (Message, error).
func (s *session) leaseReceiver() (func() (domain.Message, error), error) {
if s.receiveOpen {
return nil, ErrReceiverAlreadyLeased
}
s.receiveOpen = true
s.receiveDone = make(chan struct{})
s.egress = make(chan domain.Message, s.cfg.EgressBuffer)
s.disarmIdleTimer()
// Snapshots for lease-scoped handles.
done := s.receiveDone
eg := s.egress
receiveFunc := func() (domain.Message, error) {
select {
case <-done:
return domain.Message{}, ErrAlreadyReleased
case msg, ok := <-eg:
if !ok {
return domain.Message{}, ErrAlreadyReleased
}
return msg, nil
}
}
return receiveFunc, nil
}
// releaseReceiver releases the current receiver lease.
// Manager must stop any routing into s.egress before calling this.
func (s *session) releaseReceiver() error {
if !s.receiveOpen {
return ErrReceiverNotLeased
}
s.receiveOpen = false
if s.receiveDone != nil {
close(s.receiveDone) // invalidates all prior receive funcs
s.receiveDone = nil
}
if s.egress != nil {
close(s.egress)
s.egress = nil
}
if !s.sendOpen {
s.armIdleTimer()
}
return nil
}
// closeAll force-releases both sender and receiver leases. Safe to call multiple times.
func (s *session) closeAll() {
// Sender
if s.sendOpen {
s.sendOpen = false
if s.sendDone != nil {
close(s.sendDone)
s.sendDone = nil
}
}
// Receiver
if s.receiveOpen {
s.receiveOpen = false
if s.receiveDone != nil {
close(s.receiveDone)
s.receiveDone = nil
}
if s.egress != nil {
close(s.egress)
s.egress = nil
}
}
}
// armIdleTimer arms a timer using stored cfg.IdleAfter and idleCb.
func (s *session) armIdleTimer() {
if s.idleCallback == nil {
slog.Warn("nil idle callback function provided to session")
}
if s.idleTimer != nil {
s.idleTimer.Stop()
s.idleTimer = nil
}
if s.cfg.IdleAfter > 0 && s.idleCallback != nil {
s.idleTimer = time.AfterFunc(s.cfg.IdleAfter, s.idleCallback)
}
s.idleTimer = time.AfterFunc(s.idleAfter, f)
}
// disarmIdleTimer stops and nils the idle timer if any. This call is idempotent.
// disarmIdleTimer disarms the idle timer if active.
func (s *session) disarmIdleTimer() {
if s.idleTimer != nil {
s.idleTimer.Stop()
s.idleTimer = nil
}
}
// generateNewChannels creates new in/out channels for the session, will not close existing channels.
func (s *session) generateNewChannels(inBuf, outBuf int) (chan domain.Message, chan domain.Message) {
if inBuf <= 0 {
inBuf = defaultClientBuf
}
if outBuf <= 0 {
outBuf = defaultClientBuf
}
s.inChannel = make(chan domain.Message, inBuf)
s.outChannel = make(chan domain.Message, outBuf)
return s.inChannel, s.outChannel
}
// clearChannels closes and nils the in/out channels.
func (s *session) clearChannels() {
if s.inChannel != nil {
close(s.inChannel)
s.inChannel = nil
}
if s.outChannel != nil {
close(s.outChannel)
s.outChannel = nil
}
}

View File

@@ -0,0 +1,120 @@
package manager
import (
"errors"
"sync"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/worker"
)
var (
ErrWorkerAlreadyRegistered = errors.New("worker type already registered")
ErrWorkerTypeUnknown = errors.New("unknown worker type")
ErrNilFactory = errors.New("nil worker factory")
ErrNilNormalizer = errors.New("nil worker normalizer")
)
type registryEntry struct {
factory worker.Factory
normalizer worker.Normalizer
}
type WorkerRegistry struct {
mu sync.RWMutex
m map[string]registryEntry
}
func NewWorkerRegistry() *WorkerRegistry {
return &WorkerRegistry{m: make(map[string]registryEntry)}
}
// Register a worker type with its factory and keyer.
func (wr *WorkerRegistry) Register(workerType string, factory worker.Factory, normalizer worker.Normalizer) error {
if factory == nil {
return ErrNilFactory
}
if normalizer == nil {
return ErrNilNormalizer
}
wr.mu.Lock()
defer wr.mu.Unlock()
if _, ok := wr.m[workerType]; ok {
return ErrWorkerAlreadyRegistered
}
wr.m[workerType] = registryEntry{factory: factory, normalizer: normalizer}
return nil
}
// Deregister removes a worker type.
func (wr *WorkerRegistry) Deregister(workerType string) error {
wr.mu.Lock()
defer wr.mu.Unlock()
if _, ok := wr.m[workerType]; !ok {
return ErrWorkerTypeUnknown
}
delete(wr.m, workerType)
return nil
}
// Spawn constructs a new worker instance for the given type.
func (wr *WorkerRegistry) Spawn(workerType string) (worker.Worker, error) {
wr.mu.RLock()
entry, ok := wr.m[workerType]
wr.mu.RUnlock()
if !ok {
return nil, ErrWorkerTypeUnknown
}
return entry.factory(), nil
}
func (wr *WorkerRegistry) NormalizeSpecificationBytes(workerType string, spec []byte) ([]byte, error) {
wr.mu.RLock()
entry, ok := wr.m[workerType]
wr.mu.RUnlock()
if !ok {
return nil, ErrWorkerTypeUnknown
}
return entry.normalizer.NormalizeSpecification(spec)
}
func (wr *WorkerRegistry) NormalizeUnitBytes(workerType string, unit []byte) ([]byte, error) {
wr.mu.RLock()
entry, ok := wr.m[workerType]
wr.mu.RUnlock()
if !ok {
return nil, ErrWorkerTypeUnknown
}
return entry.normalizer.NormalizeUnit(unit)
}
// Factory returns the registered factory.
func (wr *WorkerRegistry) Factory(workerType string) (worker.Factory, error) {
wr.mu.RLock()
entry, ok := wr.m[workerType]
wr.mu.RUnlock()
if !ok {
return nil, ErrWorkerTypeUnknown
}
return entry.factory, nil
}
func (wr *WorkerRegistry) Normalizer(workerType string) (worker.Normalizer, error) {
wr.mu.RLock()
entry, ok := wr.m[workerType]
wr.mu.RUnlock()
if !ok {
return nil, ErrWorkerTypeUnknown
}
return entry.normalizer, nil
}
// RegisteredTypes lists all worker types.
func (wr *WorkerRegistry) RegisteredTypes() []string {
wr.mu.RLock()
defer wr.mu.RUnlock()
out := make([]string, 0, len(wr.m))
for t := range wr.m {
out = append(out, t)
}
return out
}

View File

@@ -1,16 +0,0 @@
package provider
import "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
type Provider interface {
Start() error
Stop()
Subscribe(subject string) <-chan error
Unsubscribe(subject string) <-chan error
Fetch(subject string) (domain.Message, error)
GetActiveStreams() []string
IsStreamActive(key string) bool
IsValidSubject(key string, isFetch bool) bool
}

View File

@@ -1,542 +0,0 @@
// Package test implements a configurable synthetic data provider.
//
// Config via subject string. Two syntaxes are accepted:
//
// Query style: "foo?period=7us&size=64&mode=const&burst=1&jitter=0.02&drop=1&ts=1&log=1"
// Path style: "foo/period/7us/size/64/mode/poisson/rate/120000/jitter/0.05/drop/0/ts/1/log/1"
//
// Parameters:
//
// period: Go duration. Inter-message target (wins over rate).
// rate: Messages per second. Used if period absent.
// mode: const | poisson | onoff
// burst: Messages emitted per tick (>=1).
// jitter: ±fraction jitter on period (e.g., 0.05 = ±5%).
// on/off: Durations for onoff mode (e.g., on=5ms&off=1ms).
// size: Payload bytes (>=1). If ts=1 and size<16, auto-extends to 16.
// ptype: bytes | counter | json (payload content generator)
// drop: 1=non-blocking send (drop on backpressure), 0=block.
// ts: 1=prepend 16B header: [sendUnixNano int64][seq int64].
// log: 1=emit per-second metrics via slog.
//
// Notes:
// - Constant mode uses sleep-then-spin pacer for sub-10µs.
// - Poisson mode draws inter-arrivals from Exp(rate).
// - On/Off emits at period during "on", silent during "off" windows.
// - Metrics include msgs/s, bytes/s, drops/s per stream.
// - Fetch is unsupported (returns error).
package test
import (
"context"
"errors"
"fmt"
"math/rand/v2"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
)
type TestProvider struct {
mu sync.Mutex
streams map[string]*stream
out chan<- domain.Message
defaults cfg
}
type stream struct {
cancel context.CancelFunc
done chan struct{}
stats *metrics
}
type metrics struct {
sent, dropped atomic.Uint64
prevSent uint64
prevDropped uint64
startUnix int64
}
type mode int
const (
modeConst mode = iota
modePoisson
modeOnOff
)
type ptype int
const (
ptBytes ptype = iota
ptCounter
ptJSON
)
type cfg struct {
period time.Duration // inter-arrival target
rate float64 // msgs/sec if period == 0
jitter float64 // ±fraction
mode mode
onDur time.Duration // for onoff
offDur time.Duration // for onoff
burst int
size int
pType ptype
dropIfSlow bool
embedTS bool
logEverySec bool
}
// NewTestProvider returns a provider with sane defaults.
func NewTestProvider(out chan<- domain.Message, defaultPeriod time.Duration) *TestProvider {
if defaultPeriod <= 0 {
defaultPeriod = 100 * time.Microsecond
}
return &TestProvider{
streams: make(map[string]*stream),
out: out,
defaults: cfg{
period: defaultPeriod,
rate: 0,
jitter: 0,
mode: modeConst,
onDur: 5 * time.Millisecond,
offDur: 1 * time.Millisecond,
burst: 1,
size: 32,
pType: ptBytes,
dropIfSlow: true,
embedTS: true,
},
}
}
func (p *TestProvider) Start() error { return nil }
func (p *TestProvider) Stop() {
p.mu.Lock()
defer p.mu.Unlock()
for key, s := range p.streams {
s.cancel()
<-s.done
delete(p.streams, key)
}
}
func (p *TestProvider) Subscribe(subject string) <-chan error {
errCh := make(chan error, 1)
if !p.IsValidSubject(subject, false) {
errCh <- errors.New("invalid subject")
close(errCh)
return errCh
}
p.mu.Lock()
if _, exists := p.streams[subject]; exists {
p.mu.Unlock()
errCh <- nil
return errCh
}
ctx, cancel := context.WithCancel(context.Background())
s := &stream{
cancel: cancel,
done: make(chan struct{}),
stats: &metrics{startUnix: time.Now().Unix()},
}
p.streams[subject] = s
out := p.out
conf := p.parseCfg(subject)
p.mu.Unlock()
go run(ctx, s, out, subject, conf)
errCh <- nil
return errCh
}
func (p *TestProvider) Unsubscribe(subject string) <-chan error {
errCh := make(chan error, 1)
p.mu.Lock()
s, ok := p.streams[subject]
if !ok {
p.mu.Unlock()
errCh <- errors.New("not subscribed")
return errCh
}
delete(p.streams, subject)
p.mu.Unlock()
go func() {
s.cancel()
<-s.done
errCh <- nil
}()
return errCh
}
func (p *TestProvider) Fetch(_ string) (domain.Message, error) {
return domain.Message{}, fmt.Errorf("fetch not supported by provider")
}
func (p *TestProvider) GetActiveStreams() []string {
p.mu.Lock()
defer p.mu.Unlock()
keys := make([]string, 0, len(p.streams))
for k := range p.streams {
keys = append(keys, k)
}
return keys
}
func (p *TestProvider) IsStreamActive(key string) bool {
p.mu.Lock()
_, ok := p.streams[key]
p.mu.Unlock()
return ok
}
func (p *TestProvider) IsValidSubject(key string, _ bool) bool {
if key == "" {
return false
}
// Accept anything parseable via parseCfg; fallback true.
return true
}
// --- core ---
func run(ctx context.Context, s *stream, out chan<- domain.Message, subject string, c cfg) {
defer close(s.done)
ident, _ := domain.RawID("test_provider", subject)
// Sanitize
if c.burst < 1 {
c.burst = 1
}
if c.size < 1 {
c.size = 1
}
if c.embedTS && c.size < 16 {
c.size = 16
}
if c.period <= 0 {
if c.rate > 0 {
c.period = time.Duration(float64(time.Second) / c.rate)
} else {
c.period = 10 * time.Microsecond
}
}
if c.jitter < 0 {
c.jitter = 0
}
if c.jitter > 0.95 {
c.jitter = 0.95
}
// Per-second logging
var logTicker *time.Ticker
if c.logEverySec {
logTicker = time.NewTicker(time.Second)
defer logTicker.Stop()
}
var seq uint64
base := make([]byte, c.size)
// On/Off state
onUntil := time.Time{}
offUntil := time.Time{}
inOn := true
now := time.Now()
onUntil = now.Add(c.onDur)
// Scheduling
next := time.Now()
for {
select {
case <-ctx.Done():
return
default:
}
switch c.mode {
case modeConst:
// sleep-then-spin to hit sub-10µs with isolated core
if d := time.Until(next); d > 0 {
if d > 30*time.Microsecond {
time.Sleep(d - 30*time.Microsecond)
}
for time.Now().Before(next) {
}
}
case modePoisson:
// draw from exponential with mean=period
lam := 1.0 / float64(c.period)
ia := time.Duration(rand.ExpFloat64() / lam)
next = time.Now().Add(ia)
// No pre-wait here; emit immediately then sleep to next
case modeOnOff:
now = time.Now()
if inOn {
if now.After(onUntil) {
inOn = false
offUntil = now.Add(c.offDur)
continue
}
} else {
if now.After(offUntil) {
inOn = true
onUntil = now.Add(c.onDur)
}
// While off, push next and wait
// Small sleep to avoid busy loop during off
time.Sleep(minDur(c.offDur/4, 200*time.Microsecond))
continue
}
// For on state, behave like const
if d := time.Until(next); d > 0 {
if d > 30*time.Microsecond {
time.Sleep(d - 30*time.Microsecond)
}
for time.Now().Before(next) {
}
}
}
// Emit burst
for i := 0; i < c.burst; i++ {
seq++
payload := base[:c.size]
switch c.pType {
case ptBytes:
fillPattern(payload, uint64(seq))
case ptCounter:
fillCounter(payload, uint64(seq))
case ptJSON:
// build minimal, fixed-size-ish JSON into payload
n := buildJSON(payload, uint64(seq))
payload = payload[:n]
}
if c.embedTS {
ensureCap(&payload, 16)
ts := time.Now().UnixNano()
putInt64(payload[0:8], ts)
putInt64(payload[8:16], int64(seq))
}
msg := domain.Message{
Identifier: ident,
Payload: payload,
}
if out != nil {
if c.dropIfSlow {
select {
case out <- msg:
s.stats.sent.Add(1)
default:
s.stats.dropped.Add(1)
}
} else {
select {
case out <- msg:
s.stats.sent.Add(1)
case <-ctx.Done():
return
}
}
}
}
// Schedule next
adj := c.period
if c.mode == modePoisson {
// next already chosen
} else {
if c.jitter > 0 {
j := (rand.Float64()*2 - 1) * c.jitter
adj = time.Duration(float64(c.period) * (1 + j))
if adj < 0 {
adj = 0
}
}
next = next.Add(adj)
}
// For poisson, actively wait to next
if c.mode == modePoisson {
if d := time.Until(next); d > 0 {
if d > 30*time.Microsecond {
time.Sleep(d - 30*time.Microsecond)
}
for time.Now().Before(next) {
}
}
}
}
}
// --- config parsing ---
func (p *TestProvider) parseCfg(subject string) cfg {
c := p.defaults
// Query style first
if i := strings.Index(subject, "?"); i >= 0 && i < len(subject)-1 {
if qv, err := url.ParseQuery(subject[i+1:]); err == nil {
c = applyQuery(c, qv)
}
}
// Path segments like /key/value/ pairs
parts := strings.Split(subject, "/")
for i := 0; i+1 < len(parts); i += 2 {
k := strings.ToLower(parts[i])
v := parts[i+1]
if k == "" {
continue
}
applyKV(&c, k, v)
}
return c
}
func applyQuery(c cfg, v url.Values) cfg {
for k, vals := range v {
if len(vals) == 0 {
continue
}
applyKV(&c, strings.ToLower(k), vals[0])
}
return c
}
func applyKV(c *cfg, key, val string) {
switch key {
case "period":
if d, err := time.ParseDuration(val); err == nil && d > 0 {
c.period = d
}
case "rate":
if f, err := strconv.ParseFloat(val, 64); err == nil && f > 0 {
c.rate = f
c.period = 0 // let rate take effect if period unset later
}
case "mode":
switch strings.ToLower(val) {
case "const", "steady":
c.mode = modeConst
case "poisson":
c.mode = modePoisson
case "onoff", "burst":
c.mode = modeOnOff
}
case "on":
if d, err := time.ParseDuration(val); err == nil && d >= 0 {
c.onDur = d
}
case "off":
if d, err := time.ParseDuration(val); err == nil && d >= 0 {
c.offDur = d
}
case "burst":
if n, err := strconv.Atoi(val); err == nil && n > 0 {
c.burst = n
}
case "jitter":
if f, err := strconv.ParseFloat(val, 64); err == nil && f >= 0 && f < 1 {
c.jitter = f
}
case "size":
if n, err := strconv.Atoi(val); err == nil && n > 0 {
c.size = n
}
case "ptype":
switch strings.ToLower(val) {
case "bytes":
c.pType = ptBytes
case "counter":
c.pType = ptCounter
case "json":
c.pType = ptJSON
}
case "drop":
c.dropIfSlow = val == "1" || strings.EqualFold(val, "true")
case "ts":
c.embedTS = val == "1" || strings.EqualFold(val, "true")
case "log":
c.logEverySec = val == "1" || strings.EqualFold(val, "true")
}
}
// --- payload builders ---
func fillPattern(b []byte, seed uint64) {
// xorshift for deterministic but non-trivial bytes
if len(b) == 0 {
return
}
x := seed | 1
for i := range b {
x ^= x << 13
x ^= x >> 7
x ^= x << 17
b[i] = byte(x)
}
}
func fillCounter(b []byte, seq uint64) {
for i := range b {
b[i] = byte((seq + uint64(i)) & 0xFF)
}
}
func buildJSON(buf []byte, seq uint64) int {
// Small fixed fields. Truncate if buffer small.
// Example: {"t":1694490000000000,"s":12345,"p":100.12}
ts := time.Now().UnixNano()
price := 10000 + float64(seq%1000)*0.01
str := fmt.Sprintf(`{"t":%d,"s":%d,"p":%.2f}`, ts, seq, price)
n := copy(buf, str)
return n
}
func ensureCap(b *[]byte, need int) {
if len(*b) >= need {
return
}
nb := make([]byte, need)
copy(nb, *b)
*b = nb
}
func putInt64(b []byte, v int64) {
_ = b[7]
b[0] = byte(v >> 56)
b[1] = byte(v >> 48)
b[2] = byte(v >> 40)
b[3] = byte(v >> 32)
b[4] = byte(v >> 24)
b[5] = byte(v >> 16)
b[6] = byte(v >> 8)
b[7] = byte(v)
}
func minDur(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}

View File

@@ -79,7 +79,7 @@ func (p *actorPartition) loop() {
if !exists {
uniqueChannels := make(map[chan<- domain.Message]struct{})
for _, e := range p.rules {
if e.pattern.Satisfies(id) {
if e.pattern.Match(id) {
for ch := range e.channels {
uniqueChannels[ch] = struct{}{}
}
@@ -107,7 +107,7 @@ func (p *actorPartition) loop() {
}
case opRegister:
key := v.pattern.Canonical()
key := v.pattern.Key()
e, exists := p.rules[key]
if !exists {
e = &ruleEntry{pattern: v.pattern, channels: make(map[chan<- domain.Message]struct{})}
@@ -121,7 +121,7 @@ func (p *actorPartition) loop() {
e.channels[v.channel] = struct{}{}
for id, subs := range p.memo {
if v.pattern.Satisfies(id) && !slices.Contains(subs, v.channel) {
if v.pattern.Match(id) && !slices.Contains(subs, v.channel) {
p.memo[id] = append(subs, v.channel)
}
}
@@ -129,7 +129,7 @@ func (p *actorPartition) loop() {
v.done <- struct{}{}
case opDeregister:
key := v.pattern.Canonical()
key := v.pattern.Key()
e, ok := p.rules[key]
if !ok {
v.done <- struct{}{}
@@ -146,7 +146,7 @@ func (p *actorPartition) loop() {
}
for id, subs := range p.memo {
if v.pattern.Satisfies(id) {
if v.pattern.Match(id) {
for i := range subs {
if subs[i] == v.channel {
last := len(subs) - 1

View File

@@ -100,7 +100,7 @@ func (r *Router) Incoming() chan<- domain.Message { return r.incoming }
func (r *Router) RegisterPattern(pat domain.Pattern, ch chan<- domain.Message) {
// Inline ensurePartition
ns := pat.Namespace
ns, _, _, _ := pat.Parse() // Note: Error ignored, pattern assumed to be valid if passed to router
r.mu.RLock()
p := r.partitions[ns]
r.mu.RUnlock()
@@ -119,7 +119,8 @@ func (r *Router) RegisterPattern(pat domain.Pattern, ch chan<- domain.Message) {
func (r *Router) DeregisterPattern(pat domain.Pattern, ch chan<- domain.Message) {
r.mu.RLock()
p := r.partitions[pat.Namespace]
ns, _, _, _ := pat.Parse()
p := r.partitions[ns]
r.mu.RUnlock()
if p != nil {
p.deregisterRoute(pat, ch)

View File

@@ -1,82 +0,0 @@
package server
import (
"context"
"time"
"github.com/google/uuid"
pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type GRPCControlServer struct {
pb.UnimplementedDataServiceControlServer
manager *manager.Manager
}
func NewGRPCControlServer(m *manager.Manager) *GRPCControlServer {
return &GRPCControlServer{manager: m}
}
// StartStream creates a new session. It does NOT attach client channels.
// Your streaming RPC should later call AttachClient(sessionID, opts).
func (s *GRPCControlServer) StartStream(_ context.Context, req *pb.StartStreamRequest) (*pb.StartStreamResponse, error) {
sessionID := s.manager.NewSession(time.Duration(1) * time.Minute) // timeout set to 1 minute
return &pb.StartStreamResponse{StreamUuid: sessionID.String()}, nil
}
// ConfigureStream sets the session's subscriptions in one shot.
// It does NOT require channels to be attached.
func (s *GRPCControlServer) ConfigureStream(_ context.Context, req *pb.ConfigureStreamRequest) (*pb.ConfigureStreamResponse, error) {
if req == nil {
return nil, status.Error(codes.InvalidArgument, "nil request")
}
streamID, err := uuid.Parse(req.StreamUuid)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid stream_uuid %q: %v", req.StreamUuid, err)
}
ids := make([]domain.Identifier, 0, len(req.Identifiers))
for _, in := range req.Identifiers {
id, e := domain.ParseIdentifier(in.Key)
if e != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid identifier %q: %v", in.Key, e)
}
ids = append(ids, id)
}
if err := s.manager.ConfigureSession(streamID, ids); err != nil {
// Map common manager errors to gRPC codes.
switch err {
case manager.ErrSessionNotFound:
return nil, status.Errorf(codes.NotFound, "session not found: %v", err)
default:
return nil, status.Errorf(codes.Internal, "set subscriptions: %v", err)
}
}
return &pb.ConfigureStreamResponse{}, nil
}
// StopStream closes the session and tears down routes and streams.
func (s *GRPCControlServer) StopStream(_ context.Context, req *pb.StopStreamRequest) (*pb.StopStreamResponse, error) {
if req == nil {
return nil, status.Error(codes.InvalidArgument, "nil request")
}
streamID, err := uuid.Parse(req.StreamUuid)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid stream_uuid %q: %v", req.StreamUuid, err)
}
if err := s.manager.CloseSession(streamID); err != nil {
switch err {
case manager.ErrSessionNotFound:
return nil, status.Errorf(codes.NotFound, "session not found: %v", err)
default:
return nil, status.Errorf(codes.Internal, "close session: %v", err)
}
}
return &pb.StopStreamResponse{}, nil
}

View File

@@ -1,54 +0,0 @@
package server
import (
"fmt"
"github.com/google/uuid"
pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager"
)
type GRPCStreamingServer struct {
pb.UnimplementedDataServiceStreamingServer
manager *manager.Manager
}
func NewGRPCStreamingServer(m *manager.Manager) *GRPCStreamingServer {
return &GRPCStreamingServer{manager: m}
}
// ConnectStream attaches a client to an existing session and streams outbound messages.
// This is server-streaming only; inbound use is optional and ignored here.
func (s *GRPCStreamingServer) ConnectStream(req *pb.ConnectStreamRequest, stream pb.DataServiceStreaming_ConnectStreamServer) error {
if req == nil {
return fmt.Errorf("nil request")
}
sessionID, err := uuid.Parse(req.StreamUuid)
if err != nil {
return fmt.Errorf("invalid UUID: %w", err)
}
_, out, err := s.manager.AttachClient(sessionID, 256, 1024)
if err != nil {
return fmt.Errorf("attach channels: %w", err)
}
defer func() { _ = s.manager.DetachClient(sessionID) }()
ctx := stream.Context()
for {
select {
case <-ctx.Done():
return nil
case msg, ok := <-out:
if !ok {
return nil // session closed
}
if err := stream.Send(&pb.Message{
Identifier: &pb.Identifier{Key: msg.Identifier.Key()},
Payload: msg.Payload,
}); err != nil {
return err
}
}
}
}

View File

@@ -1,242 +0,0 @@
package server
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"net"
"sync"
"time"
"github.com/google/uuid"
pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager"
"google.golang.org/protobuf/proto"
)
type SocketStreamingServer struct {
manager *manager.Manager
}
func NewSocketStreamingServer(m *manager.Manager) *SocketStreamingServer {
return &SocketStreamingServer{manager: m}
}
func (s *SocketStreamingServer) Serve(lis net.Listener) error {
for {
conn, err := lis.Accept()
if err != nil {
fmt.Printf("accept error: %v\n", err)
continue
}
go s.handleConnection(conn)
}
}
func (s *SocketStreamingServer) handleConnection(conn net.Conn) {
defer func() {
if err := conn.Close(); err != nil {
fmt.Printf("conn close error: %v\n", err)
} else {
fmt.Println("connection closed")
}
}()
if tc, ok := conn.(*net.TCPConn); ok {
_ = tc.SetNoDelay(true) // low latency
_ = tc.SetWriteBuffer(2 * 1024 * 1024) // bigger kernel sndbuf
_ = tc.SetReadBuffer(256 * 1024)
_ = tc.SetKeepAlive(true)
_ = tc.SetKeepAlivePeriod(30 * time.Second)
// Note: avoid SetLinger>0; default is fine.
}
reader := bufio.NewReaderSize(conn, 64*1024)
line, err := reader.ReadBytes('\n')
if err != nil {
fmt.Printf("read stream UUID error: %v\n", err)
_, _ = fmt.Fprint(conn, "Failed to read stream UUID\n")
return
}
streamUUID, err := uuid.Parse(string(trimLineEnding(line)))
if err != nil {
_, _ = fmt.Fprint(conn, "Invalid stream UUID\n")
return
}
// Give the socket server room before router drops. Make out chan larger.
// Tune per your pressure. (in=256, out=8192 as example)
_, out, err := s.manager.AttachClient(streamUUID, 256, 8192)
if err != nil {
_, _ = fmt.Fprintf(conn, "Failed to attach to stream: %v\n", err)
return
}
defer func() { _ = s.manager.DetachClient(streamUUID) }()
// Large bufio writer to reduce syscalls.
writer := bufio.NewWriterSize(conn, 1*1024*1024)
defer func() {
if err := writer.Flush(); err != nil {
fmt.Printf("final flush error: %v\n", err)
}
}()
// ---- Throughput optimizations ----
const (
maxBatchMsgs = 128 // cap number of msgs per batch
maxBatchBytes = 1 * 1024 * 1024 // cap bytes per batch
idleFlush = 2 * time.Millisecond // small idle flush timer
)
var (
hdr [4]byte
batchBuf = &bytes.Buffer{}
bufPool = sync.Pool{New: func() any { return make([]byte, 64*1024) }}
timer = time.NewTimer(idleFlush)
timerAlive = true
)
stopTimer := func() {
if timerAlive && timer.Stop() {
// drain if fired
select {
case <-timer.C:
default:
}
}
timerAlive = false
}
resetTimer := func() {
if !timerAlive {
timer.Reset(idleFlush)
timerAlive = true
} else {
// re-arm
stopTimer()
timer.Reset(idleFlush)
timerAlive = true
}
}
// Main loop: drain out channel into a single write.
for {
// Block for at least one message or close.
msg, ok := <-out
if !ok {
_ = writer.Flush()
return
}
batchBuf.Reset()
bytesInBatch := 0
msgsInBatch := 0
// Start with the message we just popped.
{
m := pb.Message{
Identifier: &pb.Identifier{Key: msg.Identifier.Key()},
Payload: msg.Payload,
}
// Use pooled scratch to avoid per-message allocs in Marshal.
scratch := bufPool.Get().([]byte)[:0]
b, err := proto.MarshalOptions{}.MarshalAppend(scratch, &m)
if err != nil {
fmt.Printf("proto marshal error: %v\n", err)
bufPool.Put(scratch[:0])
// skip message
} else {
binary.BigEndian.PutUint32(hdr[:], uint32(len(b)))
_, _ = batchBuf.Write(hdr[:])
_, _ = batchBuf.Write(b)
bytesInBatch += 4 + len(b)
msgsInBatch++
bufPool.Put(b[:0])
}
}
// Opportunistically drain without blocking.
drain := true
resetTimer()
for drain && msgsInBatch < maxBatchMsgs && bytesInBatch < maxBatchBytes {
select {
case msg, ok = <-out:
if !ok {
// peer closed while batching; flush what we have.
if batchBuf.Len() > 0 {
if _, err := writer.Write(batchBuf.Bytes()); err != nil {
if err == io.EOF {
return
}
fmt.Printf("write error: %v\n", err)
return
}
if err := writer.Flush(); err != nil {
fmt.Printf("flush error: %v\n", err)
}
}
return
}
m := pb.Message{
Identifier: &pb.Identifier{Key: msg.Identifier.Key()},
Payload: msg.Payload,
}
scratch := bufPool.Get().([]byte)[:0]
b, err := proto.MarshalOptions{}.MarshalAppend(scratch, &m)
if err != nil {
fmt.Printf("proto marshal error: %v\n", err)
bufPool.Put(scratch[:0])
continue
}
binary.BigEndian.PutUint32(hdr[:], uint32(len(b)))
_, _ = batchBuf.Write(hdr[:])
_, _ = batchBuf.Write(b)
bytesInBatch += 4 + len(b)
msgsInBatch++
bufPool.Put(b[:0])
case <-timer.C:
timerAlive = false
// idle window hit; stop draining further this round
drain = false
}
}
// Single write for the whole batch.
// Avoid per-message SetWriteDeadline. Let TCP handle buffering.
if _, err := writer.Write(batchBuf.Bytes()); err != nil {
if err == io.EOF {
return
}
fmt.Printf("write error: %v\n", err)
return
}
// Flush when batch is sizable or we saw the idle timer.
// This keeps latency low without flushing every message.
if msgsInBatch >= maxBatchMsgs ||
bytesInBatch >= maxBatchBytes ||
!timerAlive {
if err := writer.Flush(); err != nil {
fmt.Printf("flush error: %v\n", err)
return
}
}
}
}
// trimLineEnding trims a single trailing '\n' and optional '\r' before it.
func trimLineEnding(b []byte) []byte {
n := len(b)
if n == 0 {
return b
}
if b[n-1] == '\n' {
n--
if n > 0 && b[n-1] == '\r' {
n--
}
return b[:n]
}
return b
}

View File

@@ -1,68 +0,0 @@
package worker
import (
"fmt"
"sync"
)
type Factory func() Worker
type Registry struct {
mu sync.RWMutex
workerFactories map[string]Factory
}
func NewRegistry() *Registry {
return &Registry{
workerFactories: make(map[string]Factory),
}
}
func (r *Registry) Register(workerType string, workerFactory Factory) error {
r.mu.Lock()
defer r.mu.Unlock()
if _, ok := r.workerFactories[workerType]; ok {
return fmt.Errorf("worker already registered: %s", workerType)
}
if workerFactory == nil {
return fmt.Errorf("nil workerFactory provided for: %s", workerType)
}
r.workerFactories[workerType] = workerFactory
return nil
}
func (r *Registry) Spawn(workerType string) (Worker, error) {
r.mu.RLock()
defer r.mu.RUnlock()
workerFactory, ok := r.workerFactories[workerType]
if !ok {
return nil, fmt.Errorf("unknown worker type: %s", workerType)
}
return workerFactory(), nil
}
func (r *Registry) RegisteredWorkers() []string {
r.mu.RLock()
defer r.mu.RUnlock()
workerTypes := make([]string, 0, len(r.workerFactories))
for k := range r.workerFactories {
workerTypes = append(workerTypes, k)
}
return workerTypes
}
func (r *Registry) Factory(workerType string) (Factory, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
workerFactory, ok := r.workerFactories[workerType]
return workerFactory, ok
}

View File

@@ -3,10 +3,7 @@
package worker
import (
"context"
"errors"
"log/slog"
"sync"
"time"
"github.com/google/uuid"
@@ -18,128 +15,37 @@ var (
ErrWorkerRunning = errors.New("worker already running")
)
type SessionController interface {
NewSession(idleAfter time.Duration) uuid.UUID
AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error)
DetachClient(id uuid.UUID) error
ConfigureSession(id uuid.UUID, next []domain.Identifier) error
CloseSession(id uuid.UUID) error
}
type (
ReceiverFunc func() (domain.Message, error)
SenderFunc func(m domain.Message) error
)
type Instruction struct{}
type SessionController interface {
CreateSession(idleAfter time.Duration) uuid.UUID
LeaseSessionReceiver(sid uuid.UUID) (ReceiverFunc, error)
LeaseSessionSender(sid uuid.UUID) (SenderFunc, error)
ReleaseSessionReceiver(sid uuid.UUID) error
ReleaseSessionSender(sid uuid.UUID) error
ConfigureSession(sid uuid.UUID, cfg any) error
CloseSession(sid uuid.UUID) error
}
type Worker interface {
Start(workerID uuid.UUID, controller SessionController, cfg []byte) error
Start(spec []byte, ctrl SessionController) error
Stop() error
Execute(ctx context.Context, inst Instruction) error
IsRunning() bool
ID() uuid.UUID
SetUnits(units [][]byte) error
GetSpecification() []byte
GetUnits() [][]byte
}
type BaseStatefulWorker struct {
workerUUID uuid.UUID
sc SessionController
sid uuid.UUID
in chan<- domain.Message
out <-chan domain.Message
running bool
mu sync.RWMutex
type Normalizer interface {
NormalizeSpecification(spec []byte) ([]byte, error)
NormalizeUnit(unit []byte) ([]byte, error)
}
func (w *BaseStatefulWorker) Start(workerUUID uuid.UUID, sessionController SessionController, _ []byte) error {
if sessionController == nil {
return errors.New("nil SessionController provided")
}
w.mu.Lock()
if w.running {
w.mu.Unlock()
return ErrWorkerRunning
}
sid := sessionController.NewSession(time.Duration(0)) // set a zero duration to disable idle timeout
in, out, err := sessionController.AttachClient(sid, 256, 256)
if err != nil {
w.mu.Unlock()
return err
}
w.sc, w.in, w.out = sessionController, in, out
w.workerUUID = workerUUID
w.running = true
w.mu.Unlock()
return nil
}
func (w *BaseStatefulWorker) Stop() error {
w.mu.Lock()
if !w.running {
w.mu.Unlock()
return ErrWorkerNotRunning
}
err := w.sc.DetachClient(w.sid)
if err != nil {
slog.Default().Error("error when detaching client", "error", err.Error())
}
err = w.sc.CloseSession(w.sid)
if err != nil {
slog.Default().Error("error when closing session", "error", err.Error())
}
w.sc, w.in, w.out = nil, nil, nil
w.workerUUID, w.sid = uuid.Nil, uuid.Nil
w.running = false
w.mu.Unlock()
return nil
}
func (w *BaseStatefulWorker) IsRunning() bool {
w.mu.RLock()
running := w.running
w.mu.RUnlock()
return running
}
func (w *BaseStatefulWorker) ID() uuid.UUID {
w.mu.RLock()
id := w.workerUUID
w.mu.RUnlock()
return id
}
func (w *BaseStatefulWorker) SetReceiveIdentifiers(ids []domain.Identifier) error {
w.mu.RLock()
if !w.running {
w.mu.RUnlock()
return ErrWorkerNotRunning
}
w.mu.RUnlock()
return w.sc.ConfigureSession(w.sid, ids)
}
func (w *BaseStatefulWorker) In() chan<- domain.Message {
w.mu.RLock()
ch := w.in
w.mu.RUnlock()
return ch
}
func (w *BaseStatefulWorker) Out() <-chan domain.Message {
w.mu.RLock()
ch := w.out
w.mu.RUnlock()
return ch
}
type Factory func() Worker