From b77efe8e553750624a98ec16244ba3700ba693bf Mon Sep 17 00:00:00 2001 From: Phillip Michelsen Date: Sat, 27 Sep 2025 11:07:43 +0000 Subject: [PATCH] Updating router to provide new advanced routing logic --- .../internal/domain/identifier.go | 252 +++++++----------- .../data_service/internal/domain/pattern.go | 94 +++++++ .../data_service/internal/manager/commands.go | 2 +- .../data_service/internal/manager/helper.go | 35 --- .../data_service/internal/manager/manager.go | 164 +----------- .../data_service/internal/router/partition.go | 165 ++++++++++++ .../data_service/internal/router/router.go | 70 ----- .../data_service/internal/router/routerAdv.go | 127 +++++++++ 8 files changed, 494 insertions(+), 415 deletions(-) create mode 100644 services/data_service/internal/domain/pattern.go create mode 100644 services/data_service/internal/router/partition.go delete mode 100644 services/data_service/internal/router/router.go create mode 100644 services/data_service/internal/router/routerAdv.go diff --git a/services/data_service/internal/domain/identifier.go b/services/data_service/internal/domain/identifier.go index 867cc31..d6b75da 100644 --- a/services/data_service/internal/domain/identifier.go +++ b/services/data_service/internal/domain/identifier.go @@ -1,181 +1,131 @@ +// Package domain defines external message identifiers. package domain import ( "errors" - "fmt" - "regexp" "sort" "strings" ) -const ( - prefixRaw = "raw::" - prefixInternal = "internal::" -) +var ErrBadIdentifier = errors.New("identifier: invalid format") -// Identifier is a canonical representation of a data stream identifier. +// Identifier is a lightweight wrapper around the canonical key. type Identifier struct{ key string } -func (id Identifier) IsRaw() bool { return strings.HasPrefix(id.key, prefixRaw) } -func (id Identifier) IsInternal() bool { return strings.HasPrefix(id.key, prefixInternal) } -func (id Identifier) Key() string { return id.key } +// NewIdentifier builds a canonical key: "namespace::l1.l2[param=v;...] .l3". +// Labels and params are sorted for determinism. +func NewIdentifier(namespace string, labels map[string]map[string]string) Identifier { + var b strings.Builder + // rough prealloc: ns + "::" + avg label + some params + b.Grow(len(namespace) + 2 + 10*len(labels) + 20) -func (id Identifier) ProviderSubject() (provider, subject string, ok bool) { - if !id.IsRaw() { - return "", "", false + // namespace + b.WriteString(namespace) + b.WriteString("::") + + // sort label names for stable output + labelNames := make([]string, 0, len(labels)) + for name := range labels { + labelNames = append(labelNames, name) } - body := strings.TrimPrefix(id.key, prefixRaw) - prov, subj, ok := strings.Cut(body, ".") - return prov, subj, ok + sort.Strings(labelNames) + + for i, name := range labelNames { + if i > 0 { + b.WriteByte('.') + } + b.WriteString(name) + + // params (sorted) + params := labels[name] + if len(params) > 0 { + keys := make([]string, 0, len(params)) + for k := range params { + keys = append(keys, k) + } + sort.Strings(keys) + + b.WriteByte('[') + for j, k := range keys { + if j > 0 { + b.WriteByte(';') + } + b.WriteString(k) + b.WriteByte('=') + b.WriteString(params[k]) + } + b.WriteByte(']') + } + } + + return Identifier{key: b.String()} } -func (id Identifier) InternalParts() (venue, stream, symbol string, params map[string]string, ok bool) { - if !id.IsInternal() { - return "", "", "", nil, false +// NewIdentifierFromRaw wraps a raw key without validation. +func NewIdentifierFromRaw(raw string) Identifier { return Identifier{key: raw} } + +// Key returns the canonical key string. +func (id Identifier) Key() string { return id.key } + +// Parse returns namespace and labels from Key. +// Format: "namespace::label1.label2[param=a;foo=bar].label3" +func (id Identifier) Parse() (string, map[string]map[string]string, error) { + k := id.key + i := strings.Index(k, "::") + if i <= 0 { + return "", nil, ErrBadIdentifier } - body := strings.TrimPrefix(id.key, prefixInternal) - before, bracket, _ := strings.Cut(body, "[") - parts := strings.Split(before, ".") - if len(parts) != 3 { - return "", "", "", nil, false + ns := k[:i] + if ns == "" { + return "", nil, ErrBadIdentifier } - return parts[0], parts[1], parts[2], decodeParams(strings.TrimSuffix(bracket, "]")), true + raw := k[i+2:] + + lbls := make(map[string]map[string]string, 8) + if raw == "" { + return ns, lbls, nil + } + + for tok := range strings.SplitSeq(raw, ".") { + if tok == "" { + continue + } + name, params, err := parseLabel(tok) + if err != nil || name == "" { + return "", nil, ErrBadIdentifier + } + lbls[name] = params + } + return ns, lbls, nil } -func RawID(provider, subject string) (Identifier, error) { - p := strings.ToLower(strings.TrimSpace(provider)) - s := strings.TrimSpace(subject) - - if err := validateComponent("provider", p, false); err != nil { - return Identifier{}, err +// parseLabel parses "name" or "name[k=v;...]" into (name, params). +func parseLabel(tok string) (string, map[string]string, error) { + lb := strings.IndexByte(tok, '[') + if lb == -1 { + return tok, map[string]string{}, nil } - if err := validateComponent("subject", s, true); err != nil { - return Identifier{}, err - } - return Identifier{key: prefixRaw + p + "." + s}, nil -} - -func InternalID(venue, stream, symbol string, params map[string]string) (Identifier, error) { - v := strings.ToLower(strings.TrimSpace(venue)) - t := strings.ToLower(strings.TrimSpace(stream)) - sym := strings.ToUpper(strings.TrimSpace(symbol)) - - if err := validateComponent("venue", v, false); err != nil { - return Identifier{}, err - } - if err := validateComponent("stream", t, false); err != nil { - return Identifier{}, err - } - if err := validateComponent("symbol", sym, false); err != nil { - return Identifier{}, err + rb := strings.LastIndexByte(tok, ']') + if rb == -1 || rb < lb { + return "", nil, ErrBadIdentifier } - paramStr, err := encodeParams(params) // "k=v;..." or "" - if err != nil { - return Identifier{}, err - } + name := tok[:lb] + paramStr := tok[lb+1 : rb] + params := map[string]string{} if paramStr == "" { - paramStr = "[]" - } else { - paramStr = "[" + paramStr + "]" + return name, params, nil } - return Identifier{key: prefixInternal + v + "." + t + "." + sym + paramStr}, nil -} -func ParseIdentifier(s string) (Identifier, error) { - s = strings.TrimSpace(s) - switch { - case strings.HasPrefix(s, prefixRaw): - // raw::provider.subject - body := strings.TrimPrefix(s, prefixRaw) - prov, subj, ok := strings.Cut(body, ".") - if !ok { - return Identifier{}, errors.New("invalid raw identifier: missing '.'") - } - return RawID(prov, subj) - - case strings.HasPrefix(s, prefixInternal): - // internal::venue.stream.symbol[...] - body := strings.TrimPrefix(s, prefixInternal) - before, bracket, _ := strings.Cut(body, "[") - parts := strings.Split(before, ".") - if len(parts) != 3 { - return Identifier{}, errors.New("invalid internal identifier: need venue.stream.symbol") - } - params := decodeParams(strings.TrimSuffix(bracket, "]")) - return InternalID(parts[0], parts[1], parts[2], params) - } - return Identifier{}, errors.New("unknown identifier prefix") -} - -var ( - segDisallow = regexp.MustCompile(`[ \t\r\n\[\]]`) // forbid whitespace/brackets in fixed segments - dotDisallow = regexp.MustCompile(`[.]`) // fixed segments cannot contain '.' -) - -// allowAny=true (for subject) skips dot checks but still forbids whitespace/brackets. -func validateComponent(name, v string, allowAny bool) error { - if v == "" { - return fmt.Errorf("%s cannot be empty", name) - } - if allowAny { - if segDisallow.MatchString(v) { - return fmt.Errorf("%s contains illegal chars [] or whitespace", name) - } - return nil - } - if segDisallow.MatchString(v) || dotDisallow.MatchString(v) { - return fmt.Errorf("%s contains illegal chars (dot/brackets/whitespace)", name) - } - return nil -} - -// encodeParams renders sorted k=v pairs separated by ';'. -func encodeParams(params map[string]string) (string, error) { - if len(params) == 0 { - return "", nil - } - keys := make([]string, 0, len(params)) - for k := range params { - k = strings.ToLower(strings.TrimSpace(k)) - if k == "" { + for pair := range strings.SplitSeq(paramStr, ";") { + if pair == "" { continue } - keys = append(keys, k) - } - sort.Strings(keys) - - out := make([]string, 0, len(keys)) - for _, k := range keys { - v := strings.TrimSpace(params[k]) - // prevent breaking delimiters - if strings.ContainsAny(k, ";]") || strings.ContainsAny(v, ";]") { - return "", fmt.Errorf("param %q contains illegal ';' or ']'", k) + kv := strings.SplitN(pair, "=", 2) + if len(kv) != 2 || kv[0] == "" { + return "", nil, ErrBadIdentifier } - out = append(out, k+"="+v) + params[kv[0]] = kv[1] } - return strings.Join(out, ";"), nil -} - -func decodeParams(s string) map[string]string { - s = strings.TrimSpace(s) - if s == "" { - return map[string]string{} - } - out := make(map[string]string, 4) - for _, p := range strings.Split(s, ";") { - if p == "" { - continue - } - kv := strings.SplitN(p, "=", 2) - if len(kv) != 2 { - continue - } - k := strings.ToLower(strings.TrimSpace(kv[0])) - v := strings.TrimSpace(kv[1]) - if k != "" { - out[k] = v - } - } - return out + return name, params, nil } diff --git a/services/data_service/internal/domain/pattern.go b/services/data_service/internal/domain/pattern.go new file mode 100644 index 0000000..333434d --- /dev/null +++ b/services/data_service/internal/domain/pattern.go @@ -0,0 +1,94 @@ +package domain + +import ( + "sort" + "strings" +) + +type Pattern struct { + Namespace string + Labels map[string]map[string]string + Exact bool +} + +// Canonical returns a canonical string representation of the Pattern struct +// TODO: Ensure labels and namespaces are set to lowercase +func (p *Pattern) Canonical() string { + var b strings.Builder + b.Grow(len(p.Namespace) + 10*len(p.Labels) + 20) // preallocate a rough size estimate + + b.WriteString(p.Namespace) + b.WriteString("::") + + labelNames := make([]string, 0, len(p.Labels)) + for name := range p.Labels { + labelNames = append(labelNames, name) + } + sort.Strings(labelNames) // sort the labels for determinism + + for i, name := range labelNames { + if i > 0 { + b.WriteByte('|') + } + b.WriteString(name) + + params := p.Labels[name] + if len(params) > 0 { + keys := make([]string, 0, len(params)) + for k := range params { + keys = append(keys, k) + } + sort.Strings(keys) // sort params for determinism + + b.WriteByte('[') + for j, k := range keys { + if j > 0 { + b.WriteByte(';') + } + b.WriteString(k) + b.WriteByte('=') + b.WriteString(params[k]) + } + b.WriteByte(']') + } + } + + b.WriteString("::") + if p.Exact { + b.WriteString("t") + } else { + b.WriteString("f") + } + + return b.String() +} + +// Satisfies checks if a domain.Identifier satisfies the pattern. +func (p *Pattern) Satisfies(id Identifier) bool { + ns, idLabels, err := id.Parse() + if err != nil || ns != p.Namespace { + return false + } + + // Every pattern label must be present in the identifier. + for lname, wantParams := range p.Labels { + haveParams, ok := idLabels[lname] + if !ok { + return false + } + // If pattern specifies params, they must be a subset of identifier's params. + for k, v := range wantParams { + hv, ok := haveParams[k] + if !ok || hv != v { + return false + } + } + // If pattern has no params for this label, it matches any/none params in the identifier. + } + + // Exact applies to label names only: no extras allowed. + if p.Exact && len(idLabels) != len(p.Labels) { + return false + } + return true +} diff --git a/services/data_service/internal/manager/commands.go b/services/data_service/internal/manager/commands.go index b310dc5..6aabcf4 100644 --- a/services/data_service/internal/manager/commands.go +++ b/services/data_service/internal/manager/commands.go @@ -60,7 +60,7 @@ type detachResult struct { type configureCmd struct { sid uuid.UUID - next []domain.Identifier + next []domain.Pattern resp chan configureResult } diff --git a/services/data_service/internal/manager/helper.go b/services/data_service/internal/manager/helper.go index 0f95a5a..5d04392 100644 --- a/services/data_service/internal/manager/helper.go +++ b/services/data_service/internal/manager/helper.go @@ -1,36 +1 @@ package manager - -import ( - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" -) - -func identifierSetDifferences(oldIDs, nextIDs []domain.Identifier) (toAdd, toDel []domain.Identifier) { - oldSet := make(map[domain.Identifier]struct{}, len(oldIDs)) - for _, id := range oldIDs { - oldSet[id] = struct{}{} - } - - newSet := make(map[domain.Identifier]struct{}, len(nextIDs)) - for _, id := range nextIDs { - newSet[id] = struct{}{} - if _, ok := oldSet[id]; !ok { - toAdd = append(toAdd, id) - } - } - - for _, id := range oldIDs { - if _, ok := newSet[id]; !ok { - toDel = append(toDel, id) - } - } - - return -} - -func identifierMapToSlice(m map[domain.Identifier]struct{}) []domain.Identifier { - ids := make([]domain.Identifier, 0, len(m)) - for id := range m { - ids = append(ids, id) - } - return ids -} diff --git a/services/data_service/internal/manager/manager.go b/services/data_service/internal/manager/manager.go index 7afc32b..639bff3 100644 --- a/services/data_service/internal/manager/manager.go +++ b/services/data_service/internal/manager/manager.go @@ -43,7 +43,7 @@ func NewManager(router *router.Router, workerRegistry *worker.Registry) *Manager sessions: make(map[uuid.UUID]*session), router: router, } - go router.Run() + go router.Start() go m.run() slog.Default().Info("manager started", slog.String("cmp", "manager")) @@ -113,9 +113,9 @@ func (m *Manager) DetachClient(id uuid.UUID) error { return r.err } -// ConfigureSession sets the next set of identifiers for the session, starting and stopping streams as needed. -func (m *Manager) ConfigureSession(id uuid.UUID, next []domain.Identifier) error { - slog.Default().Debug("configure session request", slog.String("cmp", "manager"), slog.String("session", id.String()), slog.Int("idents", len(next))) +// ConfigureSession sets the next set of patterns for the session, starting and stopping streams as needed. +func (m *Manager) ConfigureSession(id uuid.UUID, next []domain.Pattern) error { + slog.Default().Debug("configure session request", slog.String("cmp", "manager"), slog.String("session", id.String()), slog.Int("patterns", len(next))) resp := make(chan configureResult, 1) m.cmdCh <- configureCmd{sid: id, next: next, resp: resp} @@ -218,10 +218,6 @@ func (m *Manager) handleAttach(cmd attachCmd) { s.attached = true s.disarmIdleTimer() - for id := range s.bound { - m.router.RegisterRoute(id, cout) - } - cmd.resp <- attachResult{cin: cin, cout: cout, err: nil} } @@ -237,10 +233,6 @@ func (m *Manager) handleDetach(cmd detachCmd) { return } - for id := range s.bound { - m.router.DeregisterRoute(id, s.outChannel) - } - s.clearChannels() // Only rearm the idle timer if the timeout is positive. @@ -260,109 +252,20 @@ func (m *Manager) handleDetach(cmd detachCmd) { // handleConfigure updates the session bindings, starting and stopping streams as needed. Currently only supports Raw streams. // TODO: Change this configuration to be an atomic operation, so that partial failures do not end in a half-configured state. func (m *Manager) handleConfigure(cmd configureCmd) { - s, ok := m.sessions[cmd.sid] + _, ok := m.sessions[cmd.sid] if !ok { cmd.resp <- configureResult{ErrSessionNotFound} return } - toAdd, toRemove := identifierSetDifferences(identifierMapToSlice(s.bound), cmd.next) - - pendingSub := make(map[domain.Identifier]<-chan error) - pendingUnsub := make(map[domain.Identifier]<-chan error) - var added, removed []domain.Identifier var errs error - // Adds - for _, id := range toAdd { - pName, subject, ok := id.ProviderSubject() - if !ok || subject == "" || pName == "" { - errs = errors.Join(errs, fmt.Errorf("invalid identifier: %s", id.Key())) - continue - } - p, ok := m.providers[pName] - if !ok { - errs = errors.Join(errs, fmt.Errorf("provider not found: %s", pName)) - continue - } - if p.IsStreamActive(subject) { - s.bound[id] = struct{}{} - added = append(added, id) - continue - } - pendingSub[id] = p.Subscribe(subject) - } - - // Removes - for _, id := range toRemove { - pName, subject, ok := id.ProviderSubject() - if !ok || subject == "" || pName == "" { - errs = errors.Join(errs, fmt.Errorf("invalid identifier: %s", id.Key())) - continue - } - p, ok := m.providers[pName] - if !ok { - errs = errors.Join(errs, fmt.Errorf("provider not found: %s", pName)) - continue - } - stillNeeded := false - for _, other := range m.sessions { - if other.id == s.id { - continue - } - if _, bound := other.bound[id]; bound { - stillNeeded = true - break - } - } - if stillNeeded { - delete(s.bound, id) - removed = append(removed, id) - continue - } - pendingUnsub[id] = p.Unsubscribe(subject) - } - - // Wait for subscribes - for id, ch := range pendingSub { - if err := <-ch; err != nil { - errs = errors.Join(errs, fmt.Errorf("failed to subscribe to %s: %w", id.Key(), err)) - continue - } - s.bound[id] = struct{}{} - added = append(added, id) - } - - // Wait for unsubscribes - for id, ch := range pendingUnsub { - if err := <-ch; err != nil { - errs = errors.Join(errs, fmt.Errorf("failed to unsubscribe from %s: %w", id.Key(), err)) - continue - } - delete(s.bound, id) - removed = append(removed, id) - } - - if s.attached { - if s.inChannel == nil || s.outChannel == nil { - errs = errors.Join(errs, fmt.Errorf("channels do not exist despite attached state")) // error should never be hit - slog.Default().Error("no channels despite attached state", slog.String("cmp", "manager"), slog.String("session", cmd.sid.String())) - } else { - for _, id := range added { - m.router.RegisterRoute(id, s.outChannel) - } - for _, id := range removed { - m.router.DeregisterRoute(id, s.outChannel) - } - } - } - cmd.resp <- configureResult{err: errs} } // handleCloseSession closes and removes the session, cleaning up all bindings. func (m *Manager) handleCloseSession(cmd closeSessionCmd) { - s, ok := m.sessions[cmd.sid] + _, ok := m.sessions[cmd.sid] if !ok { cmd.resp <- closeSessionResult{err: ErrSessionNotFound} return @@ -370,60 +273,5 @@ func (m *Manager) handleCloseSession(cmd closeSessionCmd) { var errs error - // Deregister attached routes - if s.attached { - if s.outChannel == nil { - errs = errors.Join(errs, fmt.Errorf("channels do not exist despite attached state")) - slog.Default().Error("no channels despite attached state", slog.String("cmp", "manager"), slog.String("session", cmd.sid.String())) - } else { - for id := range s.bound { - m.router.DeregisterRoute(id, s.outChannel) - } - } - } - - // Unsubscribe from all streams if no other session needs them. - pendingUnsub := make(map[domain.Identifier]<-chan error) - - for id := range s.bound { - pName, subject, ok := id.ProviderSubject() - if !ok || subject == "" || pName == "" { - errs = errors.Join(errs, fmt.Errorf("invalid identifier: %s", id.Key())) - continue - } - p, ok := m.providers[pName] - if !ok { - errs = errors.Join(errs, fmt.Errorf("provider not found: %s", pName)) - continue - } - - stillNeeded := false - for _, other := range m.sessions { - if other.id == s.id { - continue - } - if _, bound := other.bound[id]; bound { - stillNeeded = true - break - } - } - if stillNeeded { - continue - } - - pendingUnsub[id] = p.Unsubscribe(subject) - } - - for id, ch := range pendingUnsub { - if err := <-ch; err != nil { - errs = errors.Join(errs, fmt.Errorf("failed to unsubscribe from %s: %w", id.Key(), err)) - } - } - - // Stop timers and channels, remove session. - s.disarmIdleTimer() - s.clearChannels() - delete(m.sessions, s.id) - cmd.resp <- closeSessionResult{err: errs} } diff --git a/services/data_service/internal/router/partition.go b/services/data_service/internal/router/partition.go new file mode 100644 index 0000000..983673a --- /dev/null +++ b/services/data_service/internal/router/partition.go @@ -0,0 +1,165 @@ +package router + +import ( + "slices" + "sync" + + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" +) + +// actorPartition: single goroutine owns state. +type actorPartition struct { + opCh chan any + wg sync.WaitGroup + memo map[domain.Identifier][]chan<- domain.Message + rules map[string]*ruleEntry // the string is to be a pattern.Canonical() +} + +type ruleEntry struct { + pattern domain.Pattern + channels map[chan<- domain.Message]struct{} +} + +func newActorPartition(bufferSize int) *actorPartition { + return &actorPartition{ + opCh: make(chan any, bufferSize), + memo: make(map[domain.Identifier][]chan<- domain.Message), + rules: make(map[string]*ruleEntry), + } +} + +// External (though not exported) methods to implement the pattern interface + +func (p *actorPartition) start() { + p.wg.Go(p.loop) +} + +func (p *actorPartition) stop() { + close(p.opCh) + p.wg.Wait() +} + +func (p *actorPartition) registerRoute(pat domain.Pattern, ch chan<- domain.Message) { + done := make(chan struct{}, 1) + p.opCh <- opRegister{pattern: pat, channel: ch, done: done} + <-done +} + +func (p *actorPartition) deregisterRoute(pat domain.Pattern, ch chan<- domain.Message) { + done := make(chan struct{}, 1) + p.opCh <- opDeregister{pattern: pat, channel: ch, done: done} + <-done +} + +func (p *actorPartition) deliver(msg domain.Message) { + p.opCh <- opDeliver{msg: msg} +} + +// Internal + +type opRegister struct { + pattern domain.Pattern + channel chan<- domain.Message + done chan struct{} +} +type opDeregister struct { + pattern domain.Pattern + channel chan<- domain.Message + done chan struct{} +} +type opDeliver struct{ msg domain.Message } + +func (p *actorPartition) loop() { + for op := range p.opCh { + switch v := op.(type) { + + case opDeliver: + id := v.msg.Identifier + subs, exists := p.memo[id] + if !exists { + uniqueChannels := make(map[chan<- domain.Message]struct{}) + for _, e := range p.rules { + if e.pattern.Satisfies(id) { + for ch := range e.channels { + uniqueChannels[ch] = struct{}{} + } + } + } + + if len(uniqueChannels) > 0 { + uniqueChannelsSlice := make([]chan<- domain.Message, 0, len(uniqueChannels)) + for ch := range uniqueChannels { + uniqueChannelsSlice = append(uniqueChannelsSlice, ch) + } + p.memo[id] = uniqueChannelsSlice + subs = uniqueChannelsSlice + } else { + p.memo[id] = nil // cache "no subscribers", fast hot-path. + subs = nil + } + } + + for _, ch := range subs { + select { + case ch <- v.msg: + default: // drop on full ch + } + } + + case opRegister: + key := v.pattern.Canonical() + e, exists := p.rules[key] + if !exists { + e = &ruleEntry{pattern: v.pattern, channels: make(map[chan<- domain.Message]struct{})} + p.rules[key] = e + } + if _, exists := e.channels[v.channel]; exists { + v.done <- struct{}{} + continue + } + + e.channels[v.channel] = struct{}{} + + for id, subs := range p.memo { + if v.pattern.Satisfies(id) && !slices.Contains(subs, v.channel) { + p.memo[id] = append(subs, v.channel) + } + } + + v.done <- struct{}{} + + case opDeregister: + key := v.pattern.Canonical() + e, ok := p.rules[key] + if !ok { + v.done <- struct{}{} + continue + } + if _, ok := e.channels[v.channel]; !ok { + v.done <- struct{}{} + continue + } + + delete(e.channels, v.channel) + if len(e.channels) == 0 { + delete(p.rules, key) + } + + for id, subs := range p.memo { + if v.pattern.Satisfies(id) { + for i := range subs { + if subs[i] == v.channel { + last := len(subs) - 1 + subs[i] = subs[last] + subs[last] = nil // help GC + p.memo[id] = subs[:last] + break + } + } + } + } + + v.done <- struct{}{} + } + } +} diff --git a/services/data_service/internal/router/router.go b/services/data_service/internal/router/router.go deleted file mode 100644 index ddb0c26..0000000 --- a/services/data_service/internal/router/router.go +++ /dev/null @@ -1,70 +0,0 @@ -package router - -import ( - "log/slog" - "sync" - - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" -) - -type Router struct { - incoming chan domain.Message - routes map[domain.Identifier][]chan<- domain.Message - mu sync.RWMutex -} - -func NewRouter(buffer int) *Router { - return &Router{ - incoming: make(chan domain.Message, buffer), // Buffered channel for incoming messages - routes: make(map[domain.Identifier][]chan<- domain.Message), - } -} - -func (r *Router) IncomingChannel() chan<- domain.Message { - return r.incoming -} - -func (r *Router) Run() { - slog.Default().Info("router started", "cmp", "router") - for msg := range r.incoming { - r.mu.RLock() - channels := r.routes[msg.Identifier] - - for _, ch := range channels { - select { - case ch <- msg: - default: - slog.Default().Warn("dropping message due to backpressure", "cmp", "router", "identifier", msg.Identifier.Key()) - } - } - r.mu.RUnlock() - } -} - -func (r *Router) RegisterRoute(id domain.Identifier, ch chan<- domain.Message) { - r.mu.Lock() - r.routes[id] = append(r.routes[id], ch) - r.mu.Unlock() - - slog.Default().Debug("registered route", "cmp", "router", "identifier", id.Key(), "channel", ch) -} - -func (r *Router) DeregisterRoute(id domain.Identifier, ch chan<- domain.Message) { - r.mu.Lock() - slice := r.routes[id] - for i := 0; i < len(slice); i++ { - if slice[i] == ch { - slice[i] = slice[len(slice)-1] - slice = slice[:len(slice)-1] - i-- - } - } - if len(slice) == 0 { - delete(r.routes, id) - } else { - r.routes[id] = slice - } - r.mu.Unlock() - - slog.Default().Debug("deregistered route", "cmp", "router", "identifier", id.Key(), "channel", ch) -} diff --git a/services/data_service/internal/router/routerAdv.go b/services/data_service/internal/router/routerAdv.go new file mode 100644 index 0000000..4d8a770 --- /dev/null +++ b/services/data_service/internal/router/routerAdv.go @@ -0,0 +1,127 @@ +// Package router for routing! +package router + +import ( + "fmt" + "sync" + + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" +) + +type partition interface { + registerRoute(domain.Pattern, chan<- domain.Message) + deregisterRoute(domain.Pattern, chan<- domain.Message) + deliver(domain.Message) + start() + stop() +} + +// Factories take a buffer size (internal), router stores a zero-arg thunk. +var partitionFactories = map[string]func(int) partition{ + "actor": func(buf int) partition { return newActorPartition(buf) }, +} + +type Router struct { + incoming chan domain.Message + mu sync.RWMutex + partitions map[string]partition + newPart func() partition // zero-arg thunk + wg sync.WaitGroup + started bool +} + +func NewRouter(kind string, incomingBuf, partBuf int) (*Router, error) { + if incomingBuf <= 0 { + incomingBuf = 2048 + } + if partBuf <= 0 { + partBuf = 1024 + } + + makePartWithBuf, ok := partitionFactories[kind] + if !ok { + return nil, fmt.Errorf("unknown partition kind %q", kind) + } + // Curry (!!!) to zero-arg + makePart := func() partition { return makePartWithBuf(partBuf) } + + return &Router{ + incoming: make(chan domain.Message, incomingBuf), + partitions: make(map[string]partition), + newPart: makePart, + }, nil +} + +func (r *Router) Start() { + r.mu.Lock() + if r.started { + r.mu.Unlock() + return + } + r.started = true + r.mu.Unlock() + + r.wg.Go(func() { + for msg := range r.incoming { + ns, _, _ := msg.Identifier.Parse() + r.mu.RLock() + p, exists := r.partitions[ns] + r.mu.RUnlock() + if exists { + p.deliver(msg) + } + } + }) +} + +func (r *Router) Stop() { + r.mu.Lock() + if !r.started { + r.mu.Unlock() + return + } + r.started = false + close(r.incoming) + + ps := make([]partition, 0, len(r.partitions)) + for _, p := range r.partitions { + ps = append(ps, p) + } + r.partitions = make(map[string]partition) + r.mu.Unlock() + + for _, p := range ps { + p.stop() + } + r.wg.Wait() +} + +func (r *Router) Incoming() chan<- domain.Message { return r.incoming } + +func (r *Router) RegisterPattern(pat domain.Pattern, ch chan<- domain.Message) { + // Inline ensurePartition + ns := pat.Namespace + r.mu.RLock() + p := r.partitions[ns] + r.mu.RUnlock() + if p == nil { + r.mu.Lock() + // recheck under write lock + if p = r.partitions[ns]; p == nil { + p = r.newPart() + p.start() + r.partitions[ns] = p + } + r.mu.Unlock() + } + p.registerRoute(pat, ch) +} + +func (r *Router) DeregisterPattern(pat domain.Pattern, ch chan<- domain.Message) { + r.mu.RLock() + p := r.partitions[pat.Namespace] + r.mu.RUnlock() + if p != nil { + p.deregisterRoute(pat, ch) + } +}