Moved Manager API interfaces to consumers; Changed session logic to allow a for a 0 idle timeout session

This commit is contained in:
2025-09-23 01:48:07 +08:00
parent 9051fb7b6e
commit 740d9e3af6
4 changed files with 39 additions and 44 deletions

View File

@@ -1,22 +0,0 @@
package manager
import (
"time"
"github.com/google/uuid"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider"
)
type SessionController interface {
NewSession(idleAfter time.Duration) uuid.UUID
AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error)
DetachClient(id uuid.UUID) error
ConfigureSession(id uuid.UUID, next []domain.Identifier) error
CloseSession(id uuid.UUID) error
}
type ProviderController interface {
AddProvider(name string, p provider.Provider) error
RemoveProvider(name string) error
}

View File

@@ -13,6 +13,7 @@ import (
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/worker"
) )
var ( var (
@@ -35,14 +36,14 @@ type Manager struct {
} }
// NewManager creates a manager and starts its run loop. // NewManager creates a manager and starts its run loop.
func NewManager(r *router.Router) *Manager { func NewManager(router *router.Router, workerRegistry *worker.Registry) *Manager {
m := &Manager{ m := &Manager{
cmdCh: make(chan any, 256), cmdCh: make(chan any, 256),
providers: make(map[string]provider.Provider), providers: make(map[string]provider.Provider),
sessions: make(map[uuid.UUID]*session), sessions: make(map[uuid.UUID]*session),
router: r, router: router,
} }
go r.Run() go router.Run()
go m.run() go m.run()
slog.Default().Info("manager started", slog.String("cmp", "manager")) slog.Default().Info("manager started", slog.String("cmp", "manager"))
@@ -186,11 +187,15 @@ func (m *Manager) handleRemoveProvider(_ removeProviderCmd) {
// handleNewSession creates a new session with the given idle timeout. The idle timeout is typically not set by the client, but by the server configuration. // handleNewSession creates a new session with the given idle timeout. The idle timeout is typically not set by the client, but by the server configuration.
func (m *Manager) handleNewSession(cmd newSessionCmd) { func (m *Manager) handleNewSession(cmd newSessionCmd) {
s := newSession(cmd.idleAfter) s := newSession(cmd.idleAfter)
s.armIdleTimer(func() {
resp := make(chan closeSessionResult, 1) // Only arm the idle timer if the timeout is positive. We allow a zero or negative timeout to indicate "never timeout".
m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp} if s.idleAfter <= 0 {
<-resp s.armIdleTimer(func() {
}) resp := make(chan closeSessionResult, 1)
m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp}
<-resp
})
}
m.sessions[s.id] = s m.sessions[s.id] = s
@@ -237,11 +242,15 @@ func (m *Manager) handleDetach(cmd detachCmd) {
} }
s.clearChannels() s.clearChannels()
s.armIdleTimer(func() {
resp := make(chan closeSessionResult, 1) // Only rearm the idle timer if the timeout is positive.
m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp} if s.idleAfter > 0 {
<-resp s.armIdleTimer(func() {
}) resp := make(chan closeSessionResult, 1)
m.cmdCh <- closeSessionCmd{sid: s.id, resp: resp}
<-resp
})
}
s.attached = false s.attached = false

View File

@@ -43,7 +43,7 @@ func (s *session) armIdleTimer(f func()) {
s.idleTimer = time.AfterFunc(s.idleAfter, f) s.idleTimer = time.AfterFunc(s.idleAfter, f)
} }
// disarmIdleTimer stops and nils the idle timer if any. // disarmIdleTimer stops and nils the idle timer if any. This call is idempotent.
func (s *session) disarmIdleTimer() { func (s *session) disarmIdleTimer() {
if s.idleTimer != nil { if s.idleTimer != nil {
s.idleTimer.Stop() s.idleTimer.Stop()

View File

@@ -10,7 +10,6 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager"
) )
var ( var (
@@ -18,9 +17,18 @@ var (
ErrWorkerRunning = errors.New("worker already running") ErrWorkerRunning = errors.New("worker already running")
) )
type SessionController interface {
NewSession(idleAfter time.Duration) uuid.UUID
AttachClient(id uuid.UUID, inBuf, outBuf int) (chan<- domain.Message, <-chan domain.Message, error)
DetachClient(id uuid.UUID) error
ConfigureSession(id uuid.UUID, next []domain.Identifier) error
CloseSession(id uuid.UUID) error
}
type Worker interface { type Worker interface {
Start(workerID uuid.UUID, controller manager.SessionController, cfg map[string]string) error Start(workerID uuid.UUID, controller SessionController, cfg []byte) error
Stop() error Stop() error
IsRunning() bool IsRunning() bool
ID() uuid.UUID ID() uuid.UUID
} }
@@ -28,7 +36,7 @@ type Worker interface {
type BaseStatefulWorker struct { type BaseStatefulWorker struct {
workerUUID uuid.UUID workerUUID uuid.UUID
sc manager.SessionController sc SessionController
sid uuid.UUID sid uuid.UUID
in chan<- domain.Message in chan<- domain.Message
out <-chan domain.Message out <-chan domain.Message
@@ -37,7 +45,7 @@ type BaseStatefulWorker struct {
mu sync.RWMutex mu sync.RWMutex
} }
func (w *BaseStatefulWorker) Start(workerUUID uuid.UUID, sessionController manager.SessionController, _ map[string]string) error { func (w *BaseStatefulWorker) Start(workerUUID uuid.UUID, sessionController SessionController, _ []byte) error {
if sessionController == nil { if sessionController == nil {
return errors.New("nil SessionController provided") return errors.New("nil SessionController provided")
} }
@@ -48,7 +56,7 @@ func (w *BaseStatefulWorker) Start(workerUUID uuid.UUID, sessionController manag
return ErrWorkerRunning return ErrWorkerRunning
} }
sid := sessionController.NewSession(time.Second * 30) sid := sessionController.NewSession(time.Duration(0)) // set a zero duration to disable idle timeout
in, out, err := sessionController.AttachClient(sid, 256, 256) in, out, err := sessionController.AttachClient(sid, 256, 256)
if err != nil { if err != nil {
w.mu.Unlock() w.mu.Unlock()
@@ -71,12 +79,12 @@ func (w *BaseStatefulWorker) Stop() error {
} }
err := w.sc.DetachClient(w.sid) err := w.sc.DetachClient(w.sid)
if err != nil && err != manager.ErrSessionNotFound { if err != nil {
slog.Default().Error("error when detaching client", "error", err.Error()) slog.Default().Error("error when detaching client", "error", err.Error())
} }
err = w.sc.CloseSession(w.sid) err = w.sc.CloseSession(w.sid)
if err != nil && err != manager.ErrSessionNotFound { if err != nil {
slog.Default().Error("error when closing session", "error", err.Error()) slog.Default().Error("error when closing session", "error", err.Error())
} }
@@ -104,7 +112,7 @@ func (w *BaseStatefulWorker) ID() uuid.UUID {
return id return id
} }
func (w *BaseStatefulWorker) SetStreams(ids []domain.Identifier) error { func (w *BaseStatefulWorker) SetReceiveIdentifiers(ids []domain.Identifier) error {
w.mu.RLock() w.mu.RLock()
if !w.running { if !w.running {
w.mu.RUnlock() w.mu.RUnlock()