Updating router to provide new advanced routing logic
This commit is contained in:
@@ -1,181 +1,131 @@
|
||||
// Package domain defines external message identifiers.
|
||||
package domain
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
prefixRaw = "raw::"
|
||||
prefixInternal = "internal::"
|
||||
)
|
||||
var ErrBadIdentifier = errors.New("identifier: invalid format")
|
||||
|
||||
// Identifier is a canonical representation of a data stream identifier.
|
||||
// Identifier is a lightweight wrapper around the canonical key.
|
||||
type Identifier struct{ key string }
|
||||
|
||||
func (id Identifier) IsRaw() bool { return strings.HasPrefix(id.key, prefixRaw) }
|
||||
func (id Identifier) IsInternal() bool { return strings.HasPrefix(id.key, prefixInternal) }
|
||||
func (id Identifier) Key() string { return id.key }
|
||||
// NewIdentifier builds a canonical key: "namespace::l1.l2[param=v;...] .l3".
|
||||
// Labels and params are sorted for determinism.
|
||||
func NewIdentifier(namespace string, labels map[string]map[string]string) Identifier {
|
||||
var b strings.Builder
|
||||
// rough prealloc: ns + "::" + avg label + some params
|
||||
b.Grow(len(namespace) + 2 + 10*len(labels) + 20)
|
||||
|
||||
func (id Identifier) ProviderSubject() (provider, subject string, ok bool) {
|
||||
if !id.IsRaw() {
|
||||
return "", "", false
|
||||
// namespace
|
||||
b.WriteString(namespace)
|
||||
b.WriteString("::")
|
||||
|
||||
// sort label names for stable output
|
||||
labelNames := make([]string, 0, len(labels))
|
||||
for name := range labels {
|
||||
labelNames = append(labelNames, name)
|
||||
}
|
||||
body := strings.TrimPrefix(id.key, prefixRaw)
|
||||
prov, subj, ok := strings.Cut(body, ".")
|
||||
return prov, subj, ok
|
||||
sort.Strings(labelNames)
|
||||
|
||||
for i, name := range labelNames {
|
||||
if i > 0 {
|
||||
b.WriteByte('.')
|
||||
}
|
||||
b.WriteString(name)
|
||||
|
||||
// params (sorted)
|
||||
params := labels[name]
|
||||
if len(params) > 0 {
|
||||
keys := make([]string, 0, len(params))
|
||||
for k := range params {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
|
||||
b.WriteByte('[')
|
||||
for j, k := range keys {
|
||||
if j > 0 {
|
||||
b.WriteByte(';')
|
||||
}
|
||||
b.WriteString(k)
|
||||
b.WriteByte('=')
|
||||
b.WriteString(params[k])
|
||||
}
|
||||
b.WriteByte(']')
|
||||
}
|
||||
}
|
||||
|
||||
return Identifier{key: b.String()}
|
||||
}
|
||||
|
||||
func (id Identifier) InternalParts() (venue, stream, symbol string, params map[string]string, ok bool) {
|
||||
if !id.IsInternal() {
|
||||
return "", "", "", nil, false
|
||||
// NewIdentifierFromRaw wraps a raw key without validation.
|
||||
func NewIdentifierFromRaw(raw string) Identifier { return Identifier{key: raw} }
|
||||
|
||||
// Key returns the canonical key string.
|
||||
func (id Identifier) Key() string { return id.key }
|
||||
|
||||
// Parse returns namespace and labels from Key.
|
||||
// Format: "namespace::label1.label2[param=a;foo=bar].label3"
|
||||
func (id Identifier) Parse() (string, map[string]map[string]string, error) {
|
||||
k := id.key
|
||||
i := strings.Index(k, "::")
|
||||
if i <= 0 {
|
||||
return "", nil, ErrBadIdentifier
|
||||
}
|
||||
body := strings.TrimPrefix(id.key, prefixInternal)
|
||||
before, bracket, _ := strings.Cut(body, "[")
|
||||
parts := strings.Split(before, ".")
|
||||
if len(parts) != 3 {
|
||||
return "", "", "", nil, false
|
||||
ns := k[:i]
|
||||
if ns == "" {
|
||||
return "", nil, ErrBadIdentifier
|
||||
}
|
||||
return parts[0], parts[1], parts[2], decodeParams(strings.TrimSuffix(bracket, "]")), true
|
||||
raw := k[i+2:]
|
||||
|
||||
lbls := make(map[string]map[string]string, 8)
|
||||
if raw == "" {
|
||||
return ns, lbls, nil
|
||||
}
|
||||
|
||||
for tok := range strings.SplitSeq(raw, ".") {
|
||||
if tok == "" {
|
||||
continue
|
||||
}
|
||||
name, params, err := parseLabel(tok)
|
||||
if err != nil || name == "" {
|
||||
return "", nil, ErrBadIdentifier
|
||||
}
|
||||
lbls[name] = params
|
||||
}
|
||||
return ns, lbls, nil
|
||||
}
|
||||
|
||||
func RawID(provider, subject string) (Identifier, error) {
|
||||
p := strings.ToLower(strings.TrimSpace(provider))
|
||||
s := strings.TrimSpace(subject)
|
||||
|
||||
if err := validateComponent("provider", p, false); err != nil {
|
||||
return Identifier{}, err
|
||||
// parseLabel parses "name" or "name[k=v;...]" into (name, params).
|
||||
func parseLabel(tok string) (string, map[string]string, error) {
|
||||
lb := strings.IndexByte(tok, '[')
|
||||
if lb == -1 {
|
||||
return tok, map[string]string{}, nil
|
||||
}
|
||||
if err := validateComponent("subject", s, true); err != nil {
|
||||
return Identifier{}, err
|
||||
}
|
||||
return Identifier{key: prefixRaw + p + "." + s}, nil
|
||||
}
|
||||
|
||||
func InternalID(venue, stream, symbol string, params map[string]string) (Identifier, error) {
|
||||
v := strings.ToLower(strings.TrimSpace(venue))
|
||||
t := strings.ToLower(strings.TrimSpace(stream))
|
||||
sym := strings.ToUpper(strings.TrimSpace(symbol))
|
||||
|
||||
if err := validateComponent("venue", v, false); err != nil {
|
||||
return Identifier{}, err
|
||||
}
|
||||
if err := validateComponent("stream", t, false); err != nil {
|
||||
return Identifier{}, err
|
||||
}
|
||||
if err := validateComponent("symbol", sym, false); err != nil {
|
||||
return Identifier{}, err
|
||||
rb := strings.LastIndexByte(tok, ']')
|
||||
if rb == -1 || rb < lb {
|
||||
return "", nil, ErrBadIdentifier
|
||||
}
|
||||
|
||||
paramStr, err := encodeParams(params) // "k=v;..." or ""
|
||||
if err != nil {
|
||||
return Identifier{}, err
|
||||
}
|
||||
name := tok[:lb]
|
||||
paramStr := tok[lb+1 : rb]
|
||||
params := map[string]string{}
|
||||
if paramStr == "" {
|
||||
paramStr = "[]"
|
||||
} else {
|
||||
paramStr = "[" + paramStr + "]"
|
||||
return name, params, nil
|
||||
}
|
||||
return Identifier{key: prefixInternal + v + "." + t + "." + sym + paramStr}, nil
|
||||
}
|
||||
|
||||
func ParseIdentifier(s string) (Identifier, error) {
|
||||
s = strings.TrimSpace(s)
|
||||
switch {
|
||||
case strings.HasPrefix(s, prefixRaw):
|
||||
// raw::provider.subject
|
||||
body := strings.TrimPrefix(s, prefixRaw)
|
||||
prov, subj, ok := strings.Cut(body, ".")
|
||||
if !ok {
|
||||
return Identifier{}, errors.New("invalid raw identifier: missing '.'")
|
||||
}
|
||||
return RawID(prov, subj)
|
||||
|
||||
case strings.HasPrefix(s, prefixInternal):
|
||||
// internal::venue.stream.symbol[...]
|
||||
body := strings.TrimPrefix(s, prefixInternal)
|
||||
before, bracket, _ := strings.Cut(body, "[")
|
||||
parts := strings.Split(before, ".")
|
||||
if len(parts) != 3 {
|
||||
return Identifier{}, errors.New("invalid internal identifier: need venue.stream.symbol")
|
||||
}
|
||||
params := decodeParams(strings.TrimSuffix(bracket, "]"))
|
||||
return InternalID(parts[0], parts[1], parts[2], params)
|
||||
}
|
||||
return Identifier{}, errors.New("unknown identifier prefix")
|
||||
}
|
||||
|
||||
var (
|
||||
segDisallow = regexp.MustCompile(`[ \t\r\n\[\]]`) // forbid whitespace/brackets in fixed segments
|
||||
dotDisallow = regexp.MustCompile(`[.]`) // fixed segments cannot contain '.'
|
||||
)
|
||||
|
||||
// allowAny=true (for subject) skips dot checks but still forbids whitespace/brackets.
|
||||
func validateComponent(name, v string, allowAny bool) error {
|
||||
if v == "" {
|
||||
return fmt.Errorf("%s cannot be empty", name)
|
||||
}
|
||||
if allowAny {
|
||||
if segDisallow.MatchString(v) {
|
||||
return fmt.Errorf("%s contains illegal chars [] or whitespace", name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if segDisallow.MatchString(v) || dotDisallow.MatchString(v) {
|
||||
return fmt.Errorf("%s contains illegal chars (dot/brackets/whitespace)", name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// encodeParams renders sorted k=v pairs separated by ';'.
|
||||
func encodeParams(params map[string]string) (string, error) {
|
||||
if len(params) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
keys := make([]string, 0, len(params))
|
||||
for k := range params {
|
||||
k = strings.ToLower(strings.TrimSpace(k))
|
||||
if k == "" {
|
||||
for pair := range strings.SplitSeq(paramStr, ";") {
|
||||
if pair == "" {
|
||||
continue
|
||||
}
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
|
||||
out := make([]string, 0, len(keys))
|
||||
for _, k := range keys {
|
||||
v := strings.TrimSpace(params[k])
|
||||
// prevent breaking delimiters
|
||||
if strings.ContainsAny(k, ";]") || strings.ContainsAny(v, ";]") {
|
||||
return "", fmt.Errorf("param %q contains illegal ';' or ']'", k)
|
||||
kv := strings.SplitN(pair, "=", 2)
|
||||
if len(kv) != 2 || kv[0] == "" {
|
||||
return "", nil, ErrBadIdentifier
|
||||
}
|
||||
out = append(out, k+"="+v)
|
||||
params[kv[0]] = kv[1]
|
||||
}
|
||||
return strings.Join(out, ";"), nil
|
||||
}
|
||||
|
||||
func decodeParams(s string) map[string]string {
|
||||
s = strings.TrimSpace(s)
|
||||
if s == "" {
|
||||
return map[string]string{}
|
||||
}
|
||||
out := make(map[string]string, 4)
|
||||
for _, p := range strings.Split(s, ";") {
|
||||
if p == "" {
|
||||
continue
|
||||
}
|
||||
kv := strings.SplitN(p, "=", 2)
|
||||
if len(kv) != 2 {
|
||||
continue
|
||||
}
|
||||
k := strings.ToLower(strings.TrimSpace(kv[0]))
|
||||
v := strings.TrimSpace(kv[1])
|
||||
if k != "" {
|
||||
out[k] = v
|
||||
}
|
||||
}
|
||||
return out
|
||||
return name, params, nil
|
||||
}
|
||||
|
||||
94
services/data_service/internal/domain/pattern.go
Normal file
94
services/data_service/internal/domain/pattern.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package domain
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Pattern struct {
|
||||
Namespace string
|
||||
Labels map[string]map[string]string
|
||||
Exact bool
|
||||
}
|
||||
|
||||
// Canonical returns a canonical string representation of the Pattern struct
|
||||
// TODO: Ensure labels and namespaces are set to lowercase
|
||||
func (p *Pattern) Canonical() string {
|
||||
var b strings.Builder
|
||||
b.Grow(len(p.Namespace) + 10*len(p.Labels) + 20) // preallocate a rough size estimate
|
||||
|
||||
b.WriteString(p.Namespace)
|
||||
b.WriteString("::")
|
||||
|
||||
labelNames := make([]string, 0, len(p.Labels))
|
||||
for name := range p.Labels {
|
||||
labelNames = append(labelNames, name)
|
||||
}
|
||||
sort.Strings(labelNames) // sort the labels for determinism
|
||||
|
||||
for i, name := range labelNames {
|
||||
if i > 0 {
|
||||
b.WriteByte('|')
|
||||
}
|
||||
b.WriteString(name)
|
||||
|
||||
params := p.Labels[name]
|
||||
if len(params) > 0 {
|
||||
keys := make([]string, 0, len(params))
|
||||
for k := range params {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys) // sort params for determinism
|
||||
|
||||
b.WriteByte('[')
|
||||
for j, k := range keys {
|
||||
if j > 0 {
|
||||
b.WriteByte(';')
|
||||
}
|
||||
b.WriteString(k)
|
||||
b.WriteByte('=')
|
||||
b.WriteString(params[k])
|
||||
}
|
||||
b.WriteByte(']')
|
||||
}
|
||||
}
|
||||
|
||||
b.WriteString("::")
|
||||
if p.Exact {
|
||||
b.WriteString("t")
|
||||
} else {
|
||||
b.WriteString("f")
|
||||
}
|
||||
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// Satisfies checks if a domain.Identifier satisfies the pattern.
|
||||
func (p *Pattern) Satisfies(id Identifier) bool {
|
||||
ns, idLabels, err := id.Parse()
|
||||
if err != nil || ns != p.Namespace {
|
||||
return false
|
||||
}
|
||||
|
||||
// Every pattern label must be present in the identifier.
|
||||
for lname, wantParams := range p.Labels {
|
||||
haveParams, ok := idLabels[lname]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
// If pattern specifies params, they must be a subset of identifier's params.
|
||||
for k, v := range wantParams {
|
||||
hv, ok := haveParams[k]
|
||||
if !ok || hv != v {
|
||||
return false
|
||||
}
|
||||
}
|
||||
// If pattern has no params for this label, it matches any/none params in the identifier.
|
||||
}
|
||||
|
||||
// Exact applies to label names only: no extras allowed.
|
||||
if p.Exact && len(idLabels) != len(p.Labels) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -60,7 +60,7 @@ type detachResult struct {
|
||||
|
||||
type configureCmd struct {
|
||||
sid uuid.UUID
|
||||
next []domain.Identifier
|
||||
next []domain.Pattern
|
||||
resp chan configureResult
|
||||
}
|
||||
|
||||
|
||||
@@ -1,36 +1 @@
|
||||
package manager
|
||||
|
||||
import (
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
|
||||
)
|
||||
|
||||
func identifierSetDifferences(oldIDs, nextIDs []domain.Identifier) (toAdd, toDel []domain.Identifier) {
|
||||
oldSet := make(map[domain.Identifier]struct{}, len(oldIDs))
|
||||
for _, id := range oldIDs {
|
||||
oldSet[id] = struct{}{}
|
||||
}
|
||||
|
||||
newSet := make(map[domain.Identifier]struct{}, len(nextIDs))
|
||||
for _, id := range nextIDs {
|
||||
newSet[id] = struct{}{}
|
||||
if _, ok := oldSet[id]; !ok {
|
||||
toAdd = append(toAdd, id)
|
||||
}
|
||||
}
|
||||
|
||||
for _, id := range oldIDs {
|
||||
if _, ok := newSet[id]; !ok {
|
||||
toDel = append(toDel, id)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func identifierMapToSlice(m map[domain.Identifier]struct{}) []domain.Identifier {
|
||||
ids := make([]domain.Identifier, 0, len(m))
|
||||
for id := range m {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ func NewManager(router *router.Router, workerRegistry *worker.Registry) *Manager
|
||||
sessions: make(map[uuid.UUID]*session),
|
||||
router: router,
|
||||
}
|
||||
go router.Run()
|
||||
go router.Start()
|
||||
go m.run()
|
||||
|
||||
slog.Default().Info("manager started", slog.String("cmp", "manager"))
|
||||
@@ -113,9 +113,9 @@ func (m *Manager) DetachClient(id uuid.UUID) error {
|
||||
return r.err
|
||||
}
|
||||
|
||||
// ConfigureSession sets the next set of identifiers for the session, starting and stopping streams as needed.
|
||||
func (m *Manager) ConfigureSession(id uuid.UUID, next []domain.Identifier) error {
|
||||
slog.Default().Debug("configure session request", slog.String("cmp", "manager"), slog.String("session", id.String()), slog.Int("idents", len(next)))
|
||||
// ConfigureSession sets the next set of patterns for the session, starting and stopping streams as needed.
|
||||
func (m *Manager) ConfigureSession(id uuid.UUID, next []domain.Pattern) error {
|
||||
slog.Default().Debug("configure session request", slog.String("cmp", "manager"), slog.String("session", id.String()), slog.Int("patterns", len(next)))
|
||||
resp := make(chan configureResult, 1)
|
||||
m.cmdCh <- configureCmd{sid: id, next: next, resp: resp}
|
||||
|
||||
@@ -218,10 +218,6 @@ func (m *Manager) handleAttach(cmd attachCmd) {
|
||||
s.attached = true
|
||||
s.disarmIdleTimer()
|
||||
|
||||
for id := range s.bound {
|
||||
m.router.RegisterRoute(id, cout)
|
||||
}
|
||||
|
||||
cmd.resp <- attachResult{cin: cin, cout: cout, err: nil}
|
||||
}
|
||||
|
||||
@@ -237,10 +233,6 @@ func (m *Manager) handleDetach(cmd detachCmd) {
|
||||
return
|
||||
}
|
||||
|
||||
for id := range s.bound {
|
||||
m.router.DeregisterRoute(id, s.outChannel)
|
||||
}
|
||||
|
||||
s.clearChannels()
|
||||
|
||||
// Only rearm the idle timer if the timeout is positive.
|
||||
@@ -260,109 +252,20 @@ func (m *Manager) handleDetach(cmd detachCmd) {
|
||||
// handleConfigure updates the session bindings, starting and stopping streams as needed. Currently only supports Raw streams.
|
||||
// TODO: Change this configuration to be an atomic operation, so that partial failures do not end in a half-configured state.
|
||||
func (m *Manager) handleConfigure(cmd configureCmd) {
|
||||
s, ok := m.sessions[cmd.sid]
|
||||
_, ok := m.sessions[cmd.sid]
|
||||
if !ok {
|
||||
cmd.resp <- configureResult{ErrSessionNotFound}
|
||||
return
|
||||
}
|
||||
|
||||
toAdd, toRemove := identifierSetDifferences(identifierMapToSlice(s.bound), cmd.next)
|
||||
|
||||
pendingSub := make(map[domain.Identifier]<-chan error)
|
||||
pendingUnsub := make(map[domain.Identifier]<-chan error)
|
||||
var added, removed []domain.Identifier
|
||||
var errs error
|
||||
|
||||
// Adds
|
||||
for _, id := range toAdd {
|
||||
pName, subject, ok := id.ProviderSubject()
|
||||
if !ok || subject == "" || pName == "" {
|
||||
errs = errors.Join(errs, fmt.Errorf("invalid identifier: %s", id.Key()))
|
||||
continue
|
||||
}
|
||||
p, ok := m.providers[pName]
|
||||
if !ok {
|
||||
errs = errors.Join(errs, fmt.Errorf("provider not found: %s", pName))
|
||||
continue
|
||||
}
|
||||
if p.IsStreamActive(subject) {
|
||||
s.bound[id] = struct{}{}
|
||||
added = append(added, id)
|
||||
continue
|
||||
}
|
||||
pendingSub[id] = p.Subscribe(subject)
|
||||
}
|
||||
|
||||
// Removes
|
||||
for _, id := range toRemove {
|
||||
pName, subject, ok := id.ProviderSubject()
|
||||
if !ok || subject == "" || pName == "" {
|
||||
errs = errors.Join(errs, fmt.Errorf("invalid identifier: %s", id.Key()))
|
||||
continue
|
||||
}
|
||||
p, ok := m.providers[pName]
|
||||
if !ok {
|
||||
errs = errors.Join(errs, fmt.Errorf("provider not found: %s", pName))
|
||||
continue
|
||||
}
|
||||
stillNeeded := false
|
||||
for _, other := range m.sessions {
|
||||
if other.id == s.id {
|
||||
continue
|
||||
}
|
||||
if _, bound := other.bound[id]; bound {
|
||||
stillNeeded = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if stillNeeded {
|
||||
delete(s.bound, id)
|
||||
removed = append(removed, id)
|
||||
continue
|
||||
}
|
||||
pendingUnsub[id] = p.Unsubscribe(subject)
|
||||
}
|
||||
|
||||
// Wait for subscribes
|
||||
for id, ch := range pendingSub {
|
||||
if err := <-ch; err != nil {
|
||||
errs = errors.Join(errs, fmt.Errorf("failed to subscribe to %s: %w", id.Key(), err))
|
||||
continue
|
||||
}
|
||||
s.bound[id] = struct{}{}
|
||||
added = append(added, id)
|
||||
}
|
||||
|
||||
// Wait for unsubscribes
|
||||
for id, ch := range pendingUnsub {
|
||||
if err := <-ch; err != nil {
|
||||
errs = errors.Join(errs, fmt.Errorf("failed to unsubscribe from %s: %w", id.Key(), err))
|
||||
continue
|
||||
}
|
||||
delete(s.bound, id)
|
||||
removed = append(removed, id)
|
||||
}
|
||||
|
||||
if s.attached {
|
||||
if s.inChannel == nil || s.outChannel == nil {
|
||||
errs = errors.Join(errs, fmt.Errorf("channels do not exist despite attached state")) // error should never be hit
|
||||
slog.Default().Error("no channels despite attached state", slog.String("cmp", "manager"), slog.String("session", cmd.sid.String()))
|
||||
} else {
|
||||
for _, id := range added {
|
||||
m.router.RegisterRoute(id, s.outChannel)
|
||||
}
|
||||
for _, id := range removed {
|
||||
m.router.DeregisterRoute(id, s.outChannel)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cmd.resp <- configureResult{err: errs}
|
||||
}
|
||||
|
||||
// handleCloseSession closes and removes the session, cleaning up all bindings.
|
||||
func (m *Manager) handleCloseSession(cmd closeSessionCmd) {
|
||||
s, ok := m.sessions[cmd.sid]
|
||||
_, ok := m.sessions[cmd.sid]
|
||||
if !ok {
|
||||
cmd.resp <- closeSessionResult{err: ErrSessionNotFound}
|
||||
return
|
||||
@@ -370,60 +273,5 @@ func (m *Manager) handleCloseSession(cmd closeSessionCmd) {
|
||||
|
||||
var errs error
|
||||
|
||||
// Deregister attached routes
|
||||
if s.attached {
|
||||
if s.outChannel == nil {
|
||||
errs = errors.Join(errs, fmt.Errorf("channels do not exist despite attached state"))
|
||||
slog.Default().Error("no channels despite attached state", slog.String("cmp", "manager"), slog.String("session", cmd.sid.String()))
|
||||
} else {
|
||||
for id := range s.bound {
|
||||
m.router.DeregisterRoute(id, s.outChannel)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Unsubscribe from all streams if no other session needs them.
|
||||
pendingUnsub := make(map[domain.Identifier]<-chan error)
|
||||
|
||||
for id := range s.bound {
|
||||
pName, subject, ok := id.ProviderSubject()
|
||||
if !ok || subject == "" || pName == "" {
|
||||
errs = errors.Join(errs, fmt.Errorf("invalid identifier: %s", id.Key()))
|
||||
continue
|
||||
}
|
||||
p, ok := m.providers[pName]
|
||||
if !ok {
|
||||
errs = errors.Join(errs, fmt.Errorf("provider not found: %s", pName))
|
||||
continue
|
||||
}
|
||||
|
||||
stillNeeded := false
|
||||
for _, other := range m.sessions {
|
||||
if other.id == s.id {
|
||||
continue
|
||||
}
|
||||
if _, bound := other.bound[id]; bound {
|
||||
stillNeeded = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if stillNeeded {
|
||||
continue
|
||||
}
|
||||
|
||||
pendingUnsub[id] = p.Unsubscribe(subject)
|
||||
}
|
||||
|
||||
for id, ch := range pendingUnsub {
|
||||
if err := <-ch; err != nil {
|
||||
errs = errors.Join(errs, fmt.Errorf("failed to unsubscribe from %s: %w", id.Key(), err))
|
||||
}
|
||||
}
|
||||
|
||||
// Stop timers and channels, remove session.
|
||||
s.disarmIdleTimer()
|
||||
s.clearChannels()
|
||||
delete(m.sessions, s.id)
|
||||
|
||||
cmd.resp <- closeSessionResult{err: errs}
|
||||
}
|
||||
|
||||
165
services/data_service/internal/router/partition.go
Normal file
165
services/data_service/internal/router/partition.go
Normal file
@@ -0,0 +1,165 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
|
||||
)
|
||||
|
||||
// actorPartition: single goroutine owns state.
|
||||
type actorPartition struct {
|
||||
opCh chan any
|
||||
wg sync.WaitGroup
|
||||
memo map[domain.Identifier][]chan<- domain.Message
|
||||
rules map[string]*ruleEntry // the string is to be a pattern.Canonical()
|
||||
}
|
||||
|
||||
type ruleEntry struct {
|
||||
pattern domain.Pattern
|
||||
channels map[chan<- domain.Message]struct{}
|
||||
}
|
||||
|
||||
func newActorPartition(bufferSize int) *actorPartition {
|
||||
return &actorPartition{
|
||||
opCh: make(chan any, bufferSize),
|
||||
memo: make(map[domain.Identifier][]chan<- domain.Message),
|
||||
rules: make(map[string]*ruleEntry),
|
||||
}
|
||||
}
|
||||
|
||||
// External (though not exported) methods to implement the pattern interface
|
||||
|
||||
func (p *actorPartition) start() {
|
||||
p.wg.Go(p.loop)
|
||||
}
|
||||
|
||||
func (p *actorPartition) stop() {
|
||||
close(p.opCh)
|
||||
p.wg.Wait()
|
||||
}
|
||||
|
||||
func (p *actorPartition) registerRoute(pat domain.Pattern, ch chan<- domain.Message) {
|
||||
done := make(chan struct{}, 1)
|
||||
p.opCh <- opRegister{pattern: pat, channel: ch, done: done}
|
||||
<-done
|
||||
}
|
||||
|
||||
func (p *actorPartition) deregisterRoute(pat domain.Pattern, ch chan<- domain.Message) {
|
||||
done := make(chan struct{}, 1)
|
||||
p.opCh <- opDeregister{pattern: pat, channel: ch, done: done}
|
||||
<-done
|
||||
}
|
||||
|
||||
func (p *actorPartition) deliver(msg domain.Message) {
|
||||
p.opCh <- opDeliver{msg: msg}
|
||||
}
|
||||
|
||||
// Internal
|
||||
|
||||
type opRegister struct {
|
||||
pattern domain.Pattern
|
||||
channel chan<- domain.Message
|
||||
done chan struct{}
|
||||
}
|
||||
type opDeregister struct {
|
||||
pattern domain.Pattern
|
||||
channel chan<- domain.Message
|
||||
done chan struct{}
|
||||
}
|
||||
type opDeliver struct{ msg domain.Message }
|
||||
|
||||
func (p *actorPartition) loop() {
|
||||
for op := range p.opCh {
|
||||
switch v := op.(type) {
|
||||
|
||||
case opDeliver:
|
||||
id := v.msg.Identifier
|
||||
subs, exists := p.memo[id]
|
||||
if !exists {
|
||||
uniqueChannels := make(map[chan<- domain.Message]struct{})
|
||||
for _, e := range p.rules {
|
||||
if e.pattern.Satisfies(id) {
|
||||
for ch := range e.channels {
|
||||
uniqueChannels[ch] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(uniqueChannels) > 0 {
|
||||
uniqueChannelsSlice := make([]chan<- domain.Message, 0, len(uniqueChannels))
|
||||
for ch := range uniqueChannels {
|
||||
uniqueChannelsSlice = append(uniqueChannelsSlice, ch)
|
||||
}
|
||||
p.memo[id] = uniqueChannelsSlice
|
||||
subs = uniqueChannelsSlice
|
||||
} else {
|
||||
p.memo[id] = nil // cache "no subscribers", fast hot-path.
|
||||
subs = nil
|
||||
}
|
||||
}
|
||||
|
||||
for _, ch := range subs {
|
||||
select {
|
||||
case ch <- v.msg:
|
||||
default: // drop on full ch
|
||||
}
|
||||
}
|
||||
|
||||
case opRegister:
|
||||
key := v.pattern.Canonical()
|
||||
e, exists := p.rules[key]
|
||||
if !exists {
|
||||
e = &ruleEntry{pattern: v.pattern, channels: make(map[chan<- domain.Message]struct{})}
|
||||
p.rules[key] = e
|
||||
}
|
||||
if _, exists := e.channels[v.channel]; exists {
|
||||
v.done <- struct{}{}
|
||||
continue
|
||||
}
|
||||
|
||||
e.channels[v.channel] = struct{}{}
|
||||
|
||||
for id, subs := range p.memo {
|
||||
if v.pattern.Satisfies(id) && !slices.Contains(subs, v.channel) {
|
||||
p.memo[id] = append(subs, v.channel)
|
||||
}
|
||||
}
|
||||
|
||||
v.done <- struct{}{}
|
||||
|
||||
case opDeregister:
|
||||
key := v.pattern.Canonical()
|
||||
e, ok := p.rules[key]
|
||||
if !ok {
|
||||
v.done <- struct{}{}
|
||||
continue
|
||||
}
|
||||
if _, ok := e.channels[v.channel]; !ok {
|
||||
v.done <- struct{}{}
|
||||
continue
|
||||
}
|
||||
|
||||
delete(e.channels, v.channel)
|
||||
if len(e.channels) == 0 {
|
||||
delete(p.rules, key)
|
||||
}
|
||||
|
||||
for id, subs := range p.memo {
|
||||
if v.pattern.Satisfies(id) {
|
||||
for i := range subs {
|
||||
if subs[i] == v.channel {
|
||||
last := len(subs) - 1
|
||||
subs[i] = subs[last]
|
||||
subs[last] = nil // help GC
|
||||
p.memo[id] = subs[:last]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
v.done <- struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,70 +0,0 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"sync"
|
||||
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
|
||||
)
|
||||
|
||||
type Router struct {
|
||||
incoming chan domain.Message
|
||||
routes map[domain.Identifier][]chan<- domain.Message
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewRouter(buffer int) *Router {
|
||||
return &Router{
|
||||
incoming: make(chan domain.Message, buffer), // Buffered channel for incoming messages
|
||||
routes: make(map[domain.Identifier][]chan<- domain.Message),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Router) IncomingChannel() chan<- domain.Message {
|
||||
return r.incoming
|
||||
}
|
||||
|
||||
func (r *Router) Run() {
|
||||
slog.Default().Info("router started", "cmp", "router")
|
||||
for msg := range r.incoming {
|
||||
r.mu.RLock()
|
||||
channels := r.routes[msg.Identifier]
|
||||
|
||||
for _, ch := range channels {
|
||||
select {
|
||||
case ch <- msg:
|
||||
default:
|
||||
slog.Default().Warn("dropping message due to backpressure", "cmp", "router", "identifier", msg.Identifier.Key())
|
||||
}
|
||||
}
|
||||
r.mu.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Router) RegisterRoute(id domain.Identifier, ch chan<- domain.Message) {
|
||||
r.mu.Lock()
|
||||
r.routes[id] = append(r.routes[id], ch)
|
||||
r.mu.Unlock()
|
||||
|
||||
slog.Default().Debug("registered route", "cmp", "router", "identifier", id.Key(), "channel", ch)
|
||||
}
|
||||
|
||||
func (r *Router) DeregisterRoute(id domain.Identifier, ch chan<- domain.Message) {
|
||||
r.mu.Lock()
|
||||
slice := r.routes[id]
|
||||
for i := 0; i < len(slice); i++ {
|
||||
if slice[i] == ch {
|
||||
slice[i] = slice[len(slice)-1]
|
||||
slice = slice[:len(slice)-1]
|
||||
i--
|
||||
}
|
||||
}
|
||||
if len(slice) == 0 {
|
||||
delete(r.routes, id)
|
||||
} else {
|
||||
r.routes[id] = slice
|
||||
}
|
||||
r.mu.Unlock()
|
||||
|
||||
slog.Default().Debug("deregistered route", "cmp", "router", "identifier", id.Key(), "channel", ch)
|
||||
}
|
||||
127
services/data_service/internal/router/routerAdv.go
Normal file
127
services/data_service/internal/router/routerAdv.go
Normal file
@@ -0,0 +1,127 @@
|
||||
// Package router for routing!
|
||||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
|
||||
)
|
||||
|
||||
type partition interface {
|
||||
registerRoute(domain.Pattern, chan<- domain.Message)
|
||||
deregisterRoute(domain.Pattern, chan<- domain.Message)
|
||||
deliver(domain.Message)
|
||||
start()
|
||||
stop()
|
||||
}
|
||||
|
||||
// Factories take a buffer size (internal), router stores a zero-arg thunk.
|
||||
var partitionFactories = map[string]func(int) partition{
|
||||
"actor": func(buf int) partition { return newActorPartition(buf) },
|
||||
}
|
||||
|
||||
type Router struct {
|
||||
incoming chan domain.Message
|
||||
mu sync.RWMutex
|
||||
partitions map[string]partition
|
||||
newPart func() partition // zero-arg thunk
|
||||
wg sync.WaitGroup
|
||||
started bool
|
||||
}
|
||||
|
||||
func NewRouter(kind string, incomingBuf, partBuf int) (*Router, error) {
|
||||
if incomingBuf <= 0 {
|
||||
incomingBuf = 2048
|
||||
}
|
||||
if partBuf <= 0 {
|
||||
partBuf = 1024
|
||||
}
|
||||
|
||||
makePartWithBuf, ok := partitionFactories[kind]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown partition kind %q", kind)
|
||||
}
|
||||
// Curry (!!!) to zero-arg
|
||||
makePart := func() partition { return makePartWithBuf(partBuf) }
|
||||
|
||||
return &Router{
|
||||
incoming: make(chan domain.Message, incomingBuf),
|
||||
partitions: make(map[string]partition),
|
||||
newPart: makePart,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *Router) Start() {
|
||||
r.mu.Lock()
|
||||
if r.started {
|
||||
r.mu.Unlock()
|
||||
return
|
||||
}
|
||||
r.started = true
|
||||
r.mu.Unlock()
|
||||
|
||||
r.wg.Go(func() {
|
||||
for msg := range r.incoming {
|
||||
ns, _, _ := msg.Identifier.Parse()
|
||||
r.mu.RLock()
|
||||
p, exists := r.partitions[ns]
|
||||
r.mu.RUnlock()
|
||||
if exists {
|
||||
p.deliver(msg)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (r *Router) Stop() {
|
||||
r.mu.Lock()
|
||||
if !r.started {
|
||||
r.mu.Unlock()
|
||||
return
|
||||
}
|
||||
r.started = false
|
||||
close(r.incoming)
|
||||
|
||||
ps := make([]partition, 0, len(r.partitions))
|
||||
for _, p := range r.partitions {
|
||||
ps = append(ps, p)
|
||||
}
|
||||
r.partitions = make(map[string]partition)
|
||||
r.mu.Unlock()
|
||||
|
||||
for _, p := range ps {
|
||||
p.stop()
|
||||
}
|
||||
r.wg.Wait()
|
||||
}
|
||||
|
||||
func (r *Router) Incoming() chan<- domain.Message { return r.incoming }
|
||||
|
||||
func (r *Router) RegisterPattern(pat domain.Pattern, ch chan<- domain.Message) {
|
||||
// Inline ensurePartition
|
||||
ns := pat.Namespace
|
||||
r.mu.RLock()
|
||||
p := r.partitions[ns]
|
||||
r.mu.RUnlock()
|
||||
if p == nil {
|
||||
r.mu.Lock()
|
||||
// recheck under write lock
|
||||
if p = r.partitions[ns]; p == nil {
|
||||
p = r.newPart()
|
||||
p.start()
|
||||
r.partitions[ns] = p
|
||||
}
|
||||
r.mu.Unlock()
|
||||
}
|
||||
p.registerRoute(pat, ch)
|
||||
}
|
||||
|
||||
func (r *Router) DeregisterPattern(pat domain.Pattern, ch chan<- domain.Message) {
|
||||
r.mu.RLock()
|
||||
p := r.partitions[pat.Namespace]
|
||||
r.mu.RUnlock()
|
||||
if p != nil {
|
||||
p.deregisterRoute(pat, ch)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user