From e56bb210f883779d4105b0bf94779f9a513a9322 Mon Sep 17 00:00:00 2001 From: Phillip Michelsen Date: Wed, 10 Sep 2025 13:05:34 +0000 Subject: [PATCH] Refactor command handling: rename types.go to commands.go, update command response types, and streamline session management logic --- .../data_service/internal/manager/commands.go | 78 ++++ .../data_service/internal/manager/helper.go | 69 +--- .../data_service/internal/manager/manager.go | 380 ++++++------------ .../data_service/internal/manager/session.go | 143 +++---- .../data_service/internal/manager/types.go | 71 ---- .../internal/provider/provider.go | 8 +- 6 files changed, 275 insertions(+), 474 deletions(-) create mode 100644 services/data_service/internal/manager/commands.go delete mode 100644 services/data_service/internal/manager/types.go diff --git a/services/data_service/internal/manager/commands.go b/services/data_service/internal/manager/commands.go new file mode 100644 index 0000000..b310dc5 --- /dev/null +++ b/services/data_service/internal/manager/commands.go @@ -0,0 +1,78 @@ +package manager + +import ( + "time" + + "github.com/google/uuid" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider" +) + +// Commands posted into the manager loop. One struct per action. +type addProviderCmd struct { + name string + p provider.Provider + resp chan addProviderResult +} + +type addProviderResult struct { + err error +} + +type removeProviderCmd struct { + name string + resp chan removeProviderResult +} + +type removeProviderResult struct { + err error +} + +type newSessionCmd struct { + idleAfter time.Duration + resp chan newSessionResult +} + +type newSessionResult struct { + id uuid.UUID +} + +type attachCmd struct { + sid uuid.UUID + inBuf, outBuf int + resp chan attachResult +} + +type attachResult struct { + cin chan<- domain.Message + cout <-chan domain.Message + err error +} + +type detachCmd struct { + sid uuid.UUID + resp chan detachResult +} + +type detachResult struct { + err error +} + +type configureCmd struct { + sid uuid.UUID + next []domain.Identifier + resp chan configureResult +} + +type configureResult struct { + err error +} + +type closeSessionCmd struct { + sid uuid.UUID + resp chan closeSessionResult +} + +type closeSessionResult struct { + err error +} diff --git a/services/data_service/internal/manager/helper.go b/services/data_service/internal/manager/helper.go index 7b16770..1f93814 100644 --- a/services/data_service/internal/manager/helper.go +++ b/services/data_service/internal/manager/helper.go @@ -1,75 +1,40 @@ package manager import ( - "fmt" + "log/slog" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider" ) -// Lightweight error helper to define package-level errors inline. -type constErr string +func lg() *slog.Logger { return slog.Default().With("cmp", "manager") } -func (e constErr) Error() string { return string(e) } -func errorf(s string) error { return constErr(s) } - -// copySet copies a set of identifiers to a new map. -func copySet(in map[domain.Identifier]struct{}) map[domain.Identifier]struct{} { - out := make(map[domain.Identifier]struct{}, len(in)) - for k := range in { - out[k] = struct{}{} +func identifierSetDifferences(oldIDs, nextIDs []domain.Identifier) (toAdd, toDel []domain.Identifier) { + oldSet := make(map[domain.Identifier]struct{}, len(oldIDs)) + for _, id := range oldIDs { + oldSet[id] = struct{}{} } - return out -} -// identifierSetDifferences computes additions and deletions from old -> next. -func identifierSetDifferences(old map[domain.Identifier]struct{}, next []domain.Identifier) (toAdd, toDel []domain.Identifier) { - newSet := make(map[domain.Identifier]struct{}, len(next)) - for _, id := range next { + newSet := make(map[domain.Identifier]struct{}, len(nextIDs)) + for _, id := range nextIDs { newSet[id] = struct{}{} - if _, ok := old[id]; !ok { + if _, ok := oldSet[id]; !ok { toAdd = append(toAdd, id) } } - for id := range old { + + for _, id := range oldIDs { if _, ok := newSet[id]; !ok { toDel = append(toDel, id) } } + return } -// resolveProvider parses a raw identifier and looks up the provider. -func (m *Manager) resolveProvider(id domain.Identifier) (provider.Provider, string, error) { - provName, subj, ok := id.ProviderSubject() - if !ok || provName == "" || subj == "" { - return nil, "", ErrInvalidIdentifier +func identifierMapToSlice(m map[domain.Identifier]struct{}) []domain.Identifier { + ids := make([]domain.Identifier, 0, len(m)) + for id := range m { + ids = append(ids, id) } - p := m.providers[provName] - if p == nil { - return nil, "", fmt.Errorf("%w: %s", ErrUnknownProvider, provName) - } - return p, subj, nil -} - -// incrementStreamRefCount increments refcount and returns true if transitioning 0->1. -func (m *Manager) incrementStreamRefCount(id domain.Identifier) bool { - rc := m.streamRef[id] + 1 - m.streamRef[id] = rc - return rc == 1 -} - -// decrementStreamRefCount decrements refcount and returns true if transitioning 1->0. -func (m *Manager) decrementStreamRefCount(id domain.Identifier) bool { - rc, ok := m.streamRef[id] - if !ok { - return false - } - rc-- - if rc <= 0 { - delete(m.streamRef, id) - return true - } - m.streamRef[id] = rc - return false + return ids } diff --git a/services/data_service/internal/manager/manager.go b/services/data_service/internal/manager/manager.go index adc018d..075d433 100644 --- a/services/data_service/internal/manager/manager.go +++ b/services/data_service/internal/manager/manager.go @@ -12,7 +12,11 @@ import ( "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router" ) -func lg() *slog.Logger { return slog.Default().With("cmp", "manager") } +var ( + ErrSessionNotFound = errors.New("session not found") + ErrClientAlreadyAttached = errors.New("client already attached") + ErrClientNotAttached = errors.New("client not attached") +) // Manager is a single-goroutine actor that owns all state. type Manager struct { @@ -22,7 +26,6 @@ type Manager struct { // State (loop-owned) providers map[string]provider.Provider sessions map[uuid.UUID]*session - streamRef map[domain.Identifier]int // Router router *router.Router @@ -34,7 +37,6 @@ func NewManager(r *router.Router) *Manager { cmdCh: make(chan any, 256), providers: make(map[string]provider.Provider), sessions: make(map[uuid.UUID]*session), - streamRef: make(map[domain.Identifier]int), router: r, } go r.Run() @@ -50,40 +52,39 @@ func NewManager(r *router.Router) *Manager { // AddProvider adds and starts a new provider. func (m *Manager) AddProvider(name string, p provider.Provider) error { lg().Debug("add provider request", slog.String("name", name)) - resp := make(chan error, 1) + resp := make(chan addProviderResult, 1) m.cmdCh <- addProviderCmd{name: name, p: p, resp: resp} - return <-resp + + r := <-resp + return r.err } // RemoveProvider stops and removes a provider, cleaning up all sessions. func (m *Manager) RemoveProvider(name string) error { lg().Debug("remove provider request", slog.String("name", name)) - resp := make(chan error, 1) + resp := make(chan removeProviderResult, 1) m.cmdCh <- removeProviderCmd{name: name, resp: resp} - return <-resp + + r := <-resp + return r.err } // NewSession creates a new session with the given idle timeout. -func (m *Manager) NewSession(idleAfter time.Duration) (uuid.UUID, error) { +func (m *Manager) NewSession(idleAfter time.Duration) uuid.UUID { lg().Debug("new session request", slog.Duration("idle_after", idleAfter)) - resp := make(chan struct { - id uuid.UUID - err error - }, 1) + resp := make(chan newSessionResult, 1) m.cmdCh <- newSessionCmd{idleAfter: idleAfter, resp: resp} + r := <-resp - return r.id, r.err + return r.id } // AttachClient attaches a client to a session, creates and returns client channels for the session. func (m *Manager) AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error) { lg().Debug("attach client request", slog.String("session", id.String()), slog.Int("in_buf", inBuf), slog.Int("out_buf", outBuf)) - resp := make(chan struct { - cin chan<- domain.Message - cout <-chan domain.Message - err error - }, 1) + resp := make(chan attachResult, 1) m.cmdCh <- attachCmd{sid: id, inBuf: inBuf, outBuf: outBuf, resp: resp} + r := <-resp return r.cin, r.cout, r.err } @@ -91,25 +92,31 @@ func (m *Manager) AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.M // DetachClient detaches the client from the session, closes client channels and arms timeout. func (m *Manager) DetachClient(id uuid.UUID) error { lg().Debug("detach client request", slog.String("session", id.String())) - resp := make(chan error, 1) + resp := make(chan detachResult, 1) m.cmdCh <- detachCmd{sid: id, resp: resp} - return <-resp + + r := <-resp + return r.err } // ConfigureSession sets the next set of identifiers for the session, starting and stopping streams as needed. func (m *Manager) ConfigureSession(id uuid.UUID, next []domain.Identifier) error { lg().Debug("configure session request", slog.String("session", id.String()), slog.Int("idents", len(next))) - resp := make(chan error, 1) + resp := make(chan configureResult, 1) m.cmdCh <- configureCmd{sid: id, next: next, resp: resp} - return <-resp + + r := <-resp + return r.err } // CloseSession closes and removes the session, cleaning up all bindings. func (m *Manager) CloseSession(id uuid.UUID) error { lg().Debug("close session request", slog.String("session", id.String())) - resp := make(chan error, 1) + resp := make(chan closeSessionResult, 1) m.cmdCh <- closeSessionCmd{sid: id, resp: resp} - return <-resp + + r := <-resp + return r.err } // The main loop of the manager, processing commands serially. @@ -140,307 +147,172 @@ func (m *Manager) run() { func (m *Manager) handleAddProvider(cmd addProviderCmd) { if _, ok := m.providers[cmd.name]; ok { lg().Warn("provider already exists", slog.String("name", cmd.name)) - cmd.resp <- fmt.Errorf("provider exists: %s", cmd.name) + cmd.resp <- addProviderResult{err: fmt.Errorf("provider exists: %s", cmd.name)} return } if err := cmd.p.Start(); err != nil { lg().Warn("failed to start provider", slog.String("name", cmd.name), slog.String("err", err.Error())) - cmd.resp <- fmt.Errorf("start provider %s: %w", cmd.name, err) + cmd.resp <- addProviderResult{err: fmt.Errorf("failed to start provider %s: %w", cmd.name, err)} return } m.providers[cmd.name] = cmd.p - cmd.resp <- nil + cmd.resp <- addProviderResult{err: nil} } func (m *Manager) handleRemoveProvider(cmd removeProviderCmd) { - p, ok := m.providers[cmd.name] - if !ok { - lg().Warn("provider not found", slog.String("name", cmd.name)) - cmd.resp <- fmt.Errorf("provider not found: %s", cmd.name) - return - } - - // Clean all identifiers belonging to this provider. Iterates through sessions to reduce provider burden. - for _, s := range m.sessions { - for ident := range s.bound { - provName, subj, ok := ident.ProviderSubject() - if !ok || provName != cmd.name { - // TODO: add log warning, but basically should never ever happen - lg().Warn("identifier with mismatched provider found in session during provider removal", slog.String("session", s.id.String()), slog.String("ident", ident.Key()), slog.String("expected_provider", cmd.name), slog.String("found_provider", provName)) - continue - } - if s.attached && s.clientOut != nil { - m.router.DeregisterRoute(ident, s.clientOut) - } - delete(s.bound, ident) - - // decrementStreamRefCount returns true if this was the last ref. In which case we want to stop the stream. - if ident.IsRaw() && m.decrementStreamRefCount(ident) && subj != "" { - _ = p.StopStreams([]string{subj}) // best-effort as we will remove the provider anyway - } - } - } - - // Defensive sweep: log and clear any dangling streamRef entries for this provider. - for id := range m.streamRef { - provName, _, ok := id.ProviderSubject() - if !ok || provName != cmd.name { - continue - } - delete(m.streamRef, id) - lg().Warn("dangling streamRef entry found during provider removal", slog.String("ident", id.Key()), slog.String("provider", cmd.name)) - } - - p.Stop() - delete(m.providers, cmd.name) - cmd.resp <- nil + panic("unimplemented") } func (m *Manager) handleNewSession(cmd newSessionCmd) { - s := &session{ - id: uuid.New(), - bound: make(map[domain.Identifier]struct{}), - idleAfter: cmd.idleAfter, - } - - // Arm idle timer to auto-close the session. - s.idleTimer = time.AfterFunc(cmd.idleAfter, func() { - m.cmdCh <- closeSessionCmd{sid: s.id, resp: make(chan error, 1)} + s := newSession(cmd.idleAfter) + s.armIdleTimer(func() { + resp := make(chan closeSessionResult, 1) + m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp} + <-resp }) - m.sessions[s.id] = s // added after arming in the case of immediate timeout or error in arming timer + m.sessions[s.id] = s - cmd.resp <- struct { - id uuid.UUID - err error - }{id: s.id, err: nil} - - lg().Info("new session created", slog.String("session", s.id.String()), slog.Duration("idle_after", cmd.idleAfter)) + cmd.resp <- newSessionResult{id: s.id} } func (m *Manager) handleAttach(cmd attachCmd) { s, ok := m.sessions[cmd.sid] if !ok { - cmd.resp <- struct { - cin chan<- domain.Message - cout <-chan domain.Message - err error - }{nil, nil, ErrSessionNotFound} - return - } - if s.closed { - cmd.resp <- struct { - cin chan<- domain.Message - cout <-chan domain.Message - err error - }{nil, nil, ErrSessionClosed} + cmd.resp <- attachResult{nil, nil, ErrSessionNotFound} return } if s.attached { - cmd.resp <- struct { - cin chan<- domain.Message - cout <-chan domain.Message - err error - }{nil, nil, ErrClientAlreadyAttached} + cmd.resp <- attachResult{nil, nil, ErrClientAlreadyAttached} return } - cin, cout, err := m.attachSession(s, cmd.inBuf, cmd.outBuf) + cin, cout := s.generateNewChannels(cmd.inBuf, cmd.outBuf) + s.attached = true + s.disarmIdleTimer() - cmd.resp <- struct { - cin chan<- domain.Message - cout <-chan domain.Message - err error - }{cin, cout, err} - - lg().Info("client attached to session", slog.String("session", s.id.String())) + cmd.resp <- attachResult{cin: cin, cout: cout, err: nil} } func (m *Manager) handleDetach(cmd detachCmd) { s, ok := m.sessions[cmd.sid] if !ok { - cmd.resp <- ErrSessionNotFound - return - } - if s.closed { - cmd.resp <- ErrSessionClosed + cmd.resp <- detachResult{ErrSessionNotFound} return } if !s.attached { - cmd.resp <- ErrClientNotAttached + cmd.resp <- detachResult{ErrClientNotAttached} return } - _ = m.detachSession(cmd.sid, s) + s.clearChannels() + s.armIdleTimer(func() { + resp := make(chan closeSessionResult, 1) + m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp} + <-resp + }) - cmd.resp <- nil + s.attached = false - lg().Info("client detached from session", slog.String("session", s.id.String())) + cmd.resp <- detachResult{nil} } +// handleConfigure updates the session bindings, starting and stopping streams as needed. Currently only supports Raw streams. func (m *Manager) handleConfigure(cmd configureCmd) { s, ok := m.sessions[cmd.sid] if !ok { - cmd.resp <- ErrSessionNotFound - return - } - if s.closed { - cmd.resp <- ErrSessionClosed + cmd.resp <- configureResult{ErrSessionNotFound} return } - old := copySet(s.bound) - toAdd, toDel := identifierSetDifferences(old, cmd.next) + toAdd, toRemove := identifierSetDifferences(identifierMapToSlice(s.bound), cmd.next) - var aggErrs error + pendingSub := make(map[domain.Identifier]<-chan error) + pendingUnsub := make(map[domain.Identifier]<-chan error) + var added, removed []domain.Identifier + var errs error - // 1) Build batches: provider → starts(starters) and stops(subjects) - type starter struct { - id domain.Identifier - subj string - } - startsByProv := make(map[provider.Provider][]starter) - stopsByProv := make(map[provider.Provider][]string) - - // Removals - for _, ident := range toDel { - if s.attached && s.clientOut != nil { - m.router.DeregisterRoute(ident, s.clientOut) - } - delete(s.bound, ident) - - if !ident.IsRaw() { + // Adds + for _, id := range toAdd { + pName, subject, ok := id.ProviderSubject() + if !ok || subject == "" || pName == "" { + errs = errors.Join(errs, fmt.Errorf("invalid identifier: %s", id.Key())) continue } - - p, subj, err := m.resolveProvider(ident) - if err != nil { - aggErrs = errors.Join(aggErrs, fmt.Errorf("stop %s: %w", ident.Key(), err)) + p, ok := m.providers[pName] + if !ok { + errs = errors.Join(errs, fmt.Errorf("provider not found: %s", pName)) continue } - if subj == "" { + if p.IsStreamActive(subject) { + s.bound[id] = struct{}{} + added = append(added, id) continue } - - if m.decrementStreamRefCount(ident) { // only when last ref - stopsByProv[p] = append(stopsByProv[p], subj) - } + pendingSub[id] = p.Subscribe(subject) } - // Additions - for _, ident := range toAdd { - if !ident.IsRaw() { - if s.attached && s.clientOut != nil { - m.router.RegisterRoute(ident, s.clientOut) - } - s.bound[ident] = struct{}{} + // Removes + for _, id := range toRemove { + pName, subject, ok := id.ProviderSubject() + if !ok || subject == "" || pName == "" { + errs = errors.Join(errs, fmt.Errorf("invalid identifier: %s", id.Key())) continue } - - p, subj, err := m.resolveProvider(ident) - if err != nil { - aggErrs = errors.Join(aggErrs, err) + p, ok := m.providers[pName] + if !ok { + errs = errors.Join(errs, fmt.Errorf("provider not found: %s", pName)) continue } - if !p.IsValidSubject(subj, false) { - aggErrs = errors.Join(aggErrs, fmt.Errorf("invalid subject %q", subj)) - continue - } - - if m.incrementStreamRefCount(ident) { // first ref → start later - startsByProv[p] = append(startsByProv[p], starter{id: ident, subj: subj}) - } else { - // already active → bind+route now - if s.attached && s.clientOut != nil { - m.router.RegisterRoute(ident, s.clientOut) - } - s.bound[ident] = struct{}{} - } - } - - // 2) Fire provider calls - type batchRes struct { - prov provider.Provider - err error - op string // "start"/"stop" - } - done := make(chan batchRes, len(startsByProv)+len(stopsByProv)) - - // Start batches - for p, items := range startsByProv { - subjs := make([]string, 0, len(items)) - for _, it := range items { - subjs = append(subjs, it.subj) - } - ack := p.StartStreams(subjs) - go func(p provider.Provider, ack <-chan error) { - var err error - select { - case err = <-ack: - case <-time.After(statusWaitTotal): - err = fmt.Errorf("timeout") - } - done <- batchRes{prov: p, err: err, op: "start"} - }(p, ack) - } - - // Stop batches - for p, subjs := range stopsByProv { - ack := p.StopStreams(subjs) - go func(p provider.Provider, ack <-chan error) { - var err error - select { - case err = <-ack: - case <-time.After(statusWaitTotal): - err = fmt.Errorf("timeout") - } - done <- batchRes{prov: p, err: err, op: "stop"} - }(p, ack) - } - - // 3) Collect results - for i := 0; i < len(startsByProv)+len(stopsByProv); i++ { - r := <-done - switch r.op { - case "start": - items := startsByProv[r.prov] - if r.err != nil { - // Roll back refcounts for each ident in this provider batch - for _, it := range items { - _ = m.decrementStreamRefCount(it.id) - aggErrs = errors.Join(aggErrs, fmt.Errorf("start %s: %w", it.id.Key(), r.err)) - } + stillNeeded := false + for _, other := range m.sessions { + if other.id == s.id { continue } - // Success → bind and route - for _, it := range items { - if s.attached && s.clientOut != nil { - m.router.RegisterRoute(it.id, s.clientOut) - } - s.bound[it.id] = struct{}{} - } - case "stop": - if r.err != nil { - for _, subj := range stopsByProv[r.prov] { - aggErrs = errors.Join(aggErrs, fmt.Errorf("stop %s/%s: %w", "raw", subj, r.err)) - } + if _, bound := other.bound[id]; bound { + stillNeeded = true + break } } + if stillNeeded { + delete(s.bound, id) + removed = append(removed, id) + continue + } + pendingUnsub[id] = p.Unsubscribe(subject) } - cmd.resp <- aggErrs + // Wait for subscribes + for id, ch := range pendingSub { + if err := <-ch; err != nil { + errs = errors.Join(errs, fmt.Errorf("failed to subscribe to %s: %w", id.Key(), err)) + continue + } + s.bound[id] = struct{}{} + added = append(added, id) + } - lg().Info("session configured", slog.String("session", s.id.String()), slog.Int("bound", len(s.bound)), slog.Int("to_add", len(toAdd)), slog.Int("to_del", len(toDel))) + // Wait for unsubscribes + for id, ch := range pendingUnsub { + if err := <-ch; err != nil { + errs = errors.Join(errs, fmt.Errorf("failed to unsubscribe from %s: %w", id.Key(), err)) + continue + } + delete(s.bound, id) + removed = append(removed, id) + } + + // Update the router routes to reflect the new successful bindings + for _, id := range added { + m.router.RegisterRoute(id, s.outChannel) + } + for _, id := range removed { + m.router.DeregisterRoute(id, s.outChannel) + } + + cmd.resp <- configureResult{err: errs} } func (m *Manager) handleCloseSession(c closeSessionCmd) { - s, ok := m.sessions[c.sid] - if !ok { - c.resp <- ErrSessionNotFound - return - } - m.closeSession(c.sid, s) - c.resp <- nil - - lg().Info("session closed", slog.String("session", s.id.String())) + panic("unimplemented") } diff --git a/services/data_service/internal/manager/session.go b/services/data_service/internal/manager/session.go index 9bf4a7b..b4067bd 100644 --- a/services/data_service/internal/manager/session.go +++ b/services/data_service/internal/manager/session.go @@ -7,123 +7,80 @@ import ( "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" ) -// Session holds per-session state. Owned by the manager loop. +const ( + defaultClientBuf = 256 +) + +// Session holds per-session state. Owned by the manager loop. So we do not need a mutex. type session struct { id uuid.UUID - clientIn chan domain.Message // caller writes - clientOut chan domain.Message // caller reads + inChannel chan domain.Message // caller writes + outChannel chan domain.Message // caller reads bound map[domain.Identifier]struct{} - closed bool attached bool idleAfter time.Duration idleTimer *time.Timer } -// attachSession wires channels, stops idle timer, and registers ready routes. -// Precondition: session exists and is not attached/closed. Runs in loop. -func (m *Manager) attachSession(s *session, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error) { +func newSession(idleAfter time.Duration) *session { + return &session{ + id: uuid.New(), + bound: make(map[domain.Identifier]struct{}), + attached: false, + idleAfter: idleAfter, + } +} + +func (s *session) armIdleTimer(f func()) { + if s.idleTimer != nil { + s.idleTimer.Stop() + } + s.idleTimer = time.AfterFunc(s.idleAfter, f) +} + +func (s *session) disarmIdleTimer() { + if s.idleTimer != nil { + s.idleTimer.Stop() + s.idleTimer = nil + } +} + +// generateNewChannels creates new in/out channels for the session, will not close existing channels. +func (s *session) generateNewChannels(inBuf, outBuf int) (chan domain.Message, chan domain.Message) { if inBuf <= 0 { inBuf = defaultClientBuf } if outBuf <= 0 { outBuf = defaultClientBuf } - - cin := make(chan domain.Message, inBuf) - cout := make(chan domain.Message, outBuf) - s.clientIn, s.clientOut = cin, cout - - if s.idleTimer != nil { - s.idleTimer.Stop() - s.idleTimer = nil - } - - // Forward clientIn to router.Incoming with drop on backpressure. - go func(src <-chan domain.Message, dst chan<- domain.Message) { - for msg := range src { - select { - case dst <- msg: - default: - lg().Warn("drop message on clientIn backpressure", "identifier", msg.Identifier.Key()) - } - } - }(cin, m.router.IncomingChannel()) - - // Register all currently bound that are ready. - for ident := range s.bound { - if !ident.IsRaw() { - m.router.RegisterRoute(ident, cout) - continue - } - // Raw: register only if provider stream is active. - if p, subj, err := m.resolveProvider(ident); err == nil && p.IsStreamActive(subj) { - m.router.RegisterRoute(ident, cout) - } - } - - s.attached = true - return cin, cout, nil + s.inChannel = make(chan domain.Message, inBuf) + s.outChannel = make(chan domain.Message, outBuf) + return s.inChannel, s.outChannel } -// detachSession deregisters all routes, closes channels, and arms idle timer. -// Precondition: session exists and is attached. Runs in loop. -func (m *Manager) detachSession(sid uuid.UUID, s *session) error { - if s.clientOut != nil { - for ident := range s.bound { - m.router.DeregisterRoute(ident, s.clientOut) - } - close(s.clientOut) +// clearChannels closes and nils the in/out channels. +func (s *session) clearChannels() { + if s.inChannel != nil { + close(s.inChannel) + s.inChannel = nil } - if s.clientIn != nil { - close(s.clientIn) + if s.outChannel != nil { + close(s.outChannel) + s.outChannel = nil } - s.clientIn, s.clientOut = nil, nil - s.attached = false - - // Arm idle timer to auto-close the session. - s.idleTimer = time.AfterFunc(s.idleAfter, func() { - m.cmdCh <- closeSessionCmd{sid: sid, resp: make(chan error, 1)} - }) - return nil } -// closeSession performs full teardown and refcount drops. Runs in loop. -func (m *Manager) closeSession(sid uuid.UUID, s *session) { - if s.closed { - return +func (m *Manager) getSessionChannels(sid uuid.UUID) (chan<- domain.Message, <-chan domain.Message, error) { + s, ok := m.sessions[sid] + if !ok { + return nil, nil, ErrSessionNotFound } - s.closed = true - - // Detach if attached. - if s.attached { - if s.clientOut != nil { - for ident := range s.bound { - m.router.DeregisterRoute(ident, s.clientOut) - } - close(s.clientOut) - } - if s.clientIn != nil { - close(s.clientIn) - } - } else if s.idleTimer != nil { - s.idleTimer.Stop() - s.idleTimer = nil + if !s.attached { + return nil, nil, ErrClientNotAttached } - // Drop refs for raw identifiers and stop streams if last ref. Fire-and-forget. - for ident := range s.bound { - if !ident.IsRaw() { - continue - } - if last := m.decrementStreamRefCount(ident); last { - if p, subj, err := m.resolveProvider(ident); err == nil { - _ = p.StopStreams([]string{subj}) // do not wait - } - } - } - - delete(m.sessions, sid) + return s.inChannel, s.outChannel, nil } diff --git a/services/data_service/internal/manager/types.go b/services/data_service/internal/manager/types.go deleted file mode 100644 index 89c548e..0000000 --- a/services/data_service/internal/manager/types.go +++ /dev/null @@ -1,71 +0,0 @@ -package manager - -import ( - "time" - - "github.com/google/uuid" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider" -) - -// Shared constants. -const ( - defaultClientBuf = 256 - statusWaitTotal = 10 * time.Second -) - -// Manager-level errors. -var ( - ErrSessionNotFound = errorf("session not found") - ErrSessionClosed = errorf("session closed") - ErrClientAlreadyAttached = errorf("client already attached") - ErrClientNotAttached = errorf("client not attached") - ErrInvalidIdentifier = errorf("invalid identifier") - ErrUnknownProvider = errorf("unknown provider") -) - -// Commands posted into the manager loop. One struct per action. -type addProviderCmd struct { - name string - p provider.Provider - resp chan error -} - -type removeProviderCmd struct { - name string - resp chan error -} - -type newSessionCmd struct { - idleAfter time.Duration - resp chan struct { - id uuid.UUID - err error - } -} - -type attachCmd struct { - sid uuid.UUID - inBuf, outBuf int - resp chan struct { - cin chan<- domain.Message - cout <-chan domain.Message - err error - } -} - -type detachCmd struct { - sid uuid.UUID - resp chan error -} - -type configureCmd struct { - sid uuid.UUID - next []domain.Identifier - resp chan error // returns after starts from this call succeed or timeout -} - -type closeSessionCmd struct { - sid uuid.UUID - resp chan error -} diff --git a/services/data_service/internal/provider/provider.go b/services/data_service/internal/provider/provider.go index 17877ba..663b10c 100644 --- a/services/data_service/internal/provider/provider.go +++ b/services/data_service/internal/provider/provider.go @@ -6,11 +6,11 @@ type Provider interface { Start() error Stop() - StartStreams(keys []string) <-chan error - StopStreams(key []string) <-chan error - - Fetch(key string) (domain.Message, error) + Subscribe(subject string) <-chan error + Unsubscribe(subject string) <-chan error + Fetch(subject string) (domain.Message, error) + GetActiveStreams() []string IsStreamActive(key string) bool IsValidSubject(key string, isFetch bool) bool }