diff --git a/services/data_service/internal/manager/api.go b/services/data_service/internal/manager/api.go deleted file mode 100644 index 8995f04..0000000 --- a/services/data_service/internal/manager/api.go +++ /dev/null @@ -1,22 +0,0 @@ -package manager - -import ( - "time" - - "github.com/google/uuid" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider" -) - -type SessionController interface { - NewSession(idleAfter time.Duration) uuid.UUID - AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error) - DetachClient(id uuid.UUID) error - ConfigureSession(id uuid.UUID, next []domain.Identifier) error - CloseSession(id uuid.UUID) error -} - -type ProviderController interface { - AddProvider(name string, p provider.Provider) error - RemoveProvider(name string) error -} diff --git a/services/data_service/internal/manager/manager.go b/services/data_service/internal/manager/manager.go index 67c9c72..7afc32b 100644 --- a/services/data_service/internal/manager/manager.go +++ b/services/data_service/internal/manager/manager.go @@ -13,6 +13,7 @@ import ( "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/worker" ) var ( @@ -35,14 +36,14 @@ type Manager struct { } // NewManager creates a manager and starts its run loop. -func NewManager(r *router.Router) *Manager { +func NewManager(router *router.Router, workerRegistry *worker.Registry) *Manager { m := &Manager{ cmdCh: make(chan any, 256), providers: make(map[string]provider.Provider), sessions: make(map[uuid.UUID]*session), - router: r, + router: router, } - go r.Run() + go router.Run() go m.run() slog.Default().Info("manager started", slog.String("cmp", "manager")) @@ -186,11 +187,15 @@ func (m *Manager) handleRemoveProvider(_ removeProviderCmd) { // handleNewSession creates a new session with the given idle timeout. The idle timeout is typically not set by the client, but by the server configuration. func (m *Manager) handleNewSession(cmd newSessionCmd) { s := newSession(cmd.idleAfter) - s.armIdleTimer(func() { - resp := make(chan closeSessionResult, 1) - m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp} - <-resp - }) + + // Only arm the idle timer if the timeout is positive. We allow a zero or negative timeout to indicate "never timeout". + if s.idleAfter <= 0 { + s.armIdleTimer(func() { + resp := make(chan closeSessionResult, 1) + m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp} + <-resp + }) + } m.sessions[s.id] = s @@ -237,11 +242,15 @@ func (m *Manager) handleDetach(cmd detachCmd) { } s.clearChannels() - s.armIdleTimer(func() { - resp := make(chan closeSessionResult, 1) - m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp} - <-resp - }) + + // Only rearm the idle timer if the timeout is positive. + if s.idleAfter > 0 { + s.armIdleTimer(func() { + resp := make(chan closeSessionResult, 1) + m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp} + <-resp + }) + } s.attached = false diff --git a/services/data_service/internal/manager/session.go b/services/data_service/internal/manager/session.go index 023efd4..6cbbd49 100644 --- a/services/data_service/internal/manager/session.go +++ b/services/data_service/internal/manager/session.go @@ -43,7 +43,7 @@ func (s *session) armIdleTimer(f func()) { s.idleTimer = time.AfterFunc(s.idleAfter, f) } -// disarmIdleTimer stops and nils the idle timer if any. +// disarmIdleTimer stops and nils the idle timer if any. This call is idempotent. func (s *session) disarmIdleTimer() { if s.idleTimer != nil { s.idleTimer.Stop() diff --git a/services/data_service/internal/worker/worker.go b/services/data_service/internal/worker/worker.go index 7921a4a..6aca591 100644 --- a/services/data_service/internal/worker/worker.go +++ b/services/data_service/internal/worker/worker.go @@ -10,7 +10,6 @@ import ( "github.com/google/uuid" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" ) var ( @@ -18,9 +17,18 @@ var ( ErrWorkerRunning = errors.New("worker already running") ) +type SessionController interface { + NewSession(idleAfter time.Duration) uuid.UUID + AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error) + DetachClient(id uuid.UUID) error + ConfigureSession(id uuid.UUID, next []domain.Identifier) error + CloseSession(id uuid.UUID) error +} + type Worker interface { - Start(workerID uuid.UUID, controller manager.SessionController, cfg map[string]string) error + Start(workerID uuid.UUID, controller SessionController, cfg []byte) error Stop() error + IsRunning() bool ID() uuid.UUID } @@ -28,7 +36,7 @@ type Worker interface { type BaseStatefulWorker struct { workerUUID uuid.UUID - sc manager.SessionController + sc SessionController sid uuid.UUID in chan<- domain.Message out <-chan domain.Message @@ -37,7 +45,7 @@ type BaseStatefulWorker struct { mu sync.RWMutex } -func (w *BaseStatefulWorker) Start(workerUUID uuid.UUID, sessionController manager.SessionController, _ map[string]string) error { +func (w *BaseStatefulWorker) Start(workerUUID uuid.UUID, sessionController SessionController, _ []byte) error { if sessionController == nil { return errors.New("nil SessionController provided") } @@ -48,7 +56,7 @@ func (w *BaseStatefulWorker) Start(workerUUID uuid.UUID, sessionController manag return ErrWorkerRunning } - sid := sessionController.NewSession(time.Second * 30) + sid := sessionController.NewSession(time.Duration(0)) // set a zero duration to disable idle timeout in, out, err := sessionController.AttachClient(sid, 256, 256) if err != nil { w.mu.Unlock() @@ -71,12 +79,12 @@ func (w *BaseStatefulWorker) Stop() error { } err := w.sc.DetachClient(w.sid) - if err != nil && err != manager.ErrSessionNotFound { + if err != nil { slog.Default().Error("error when detaching client", "error", err.Error()) } err = w.sc.CloseSession(w.sid) - if err != nil && err != manager.ErrSessionNotFound { + if err != nil { slog.Default().Error("error when closing session", "error", err.Error()) } @@ -104,7 +112,7 @@ func (w *BaseStatefulWorker) ID() uuid.UUID { return id } -func (w *BaseStatefulWorker) SetStreams(ids []domain.Identifier) error { +func (w *BaseStatefulWorker) SetReceiveIdentifiers(ids []domain.Identifier) error { w.mu.RLock() if !w.running { w.mu.RUnlock()