From 582759bb3b1c923bc91760e6fa2876ad3cc267a8 Mon Sep 17 00:00:00 2001 From: Phillip Michelsen Date: Sun, 24 Aug 2025 08:09:43 +0000 Subject: [PATCH] Small session management change: enhance ChannelOpts with inbound drop control, improve session struct for client attachment handling, and streamline message forwarding logic --- .../data_service/internal/manager/manager.go | 176 +++++++++--------- 1 file changed, 92 insertions(+), 84 deletions(-) diff --git a/services/data_service/internal/manager/manager.go b/services/data_service/internal/manager/manager.go index 9d5ae94..0684cc6 100644 --- a/services/data_service/internal/manager/manager.go +++ b/services/data_service/internal/manager/manager.go @@ -27,10 +27,10 @@ const ( ) type ChannelOpts struct { - InBufSize int - OutBufSize int - // If true, drop to clientOut when its buffer is full. If false, block. - DropOutbound bool + InBufSize int + OutBufSize int + DropOutbound bool // If true, drop outbound to client when its buffer is full. If false, block. + DropInbound bool // If true, drop inbound from client when internalIn is full. If false, block. } // Manager owns providers, sessions, and the router fanout. @@ -48,20 +48,26 @@ type Manager struct { type session struct { id uuid.UUID - // Stable internal channels. Only the session writes internalOut and reads internalIn. + // Stable internal channels. internalIn chan domain.Message // forwarded into router.IncomingChannel() - internalOut chan domain.Message // registered as router route target + internalOut chan domain.Message // registered as router route target, forwarded to clientOut (or dropped if unattached) - // Current client attachment (optional). Created by GetChannels. + // Client Channels (optional). Created on GetChannels and cleared on DetachClient. clientIn chan domain.Message // caller writes clientOut chan domain.Message // caller reads - // Cancels the permanent internalIn forwarder. + // Controls the permanent internalIn forwarder. cancelInternal context.CancelFunc - // Cancels current client forwarders. - cancelClient context.CancelFunc - bound map[domain.Identifier]struct{} + // Permanent outbound drain control. + egressWG sync.WaitGroup + + // Policy + dropWhenUnattached bool // always drop when no client attached + dropWhenSlow bool // mirror ChannelOpts.DropOutbound + dropInbound bool // mirror ChannelOpts.DropInbound + + bound map[domain.Identifier]struct{} // map for quick existence checks closed bool idleAfter time.Duration idleTimer *time.Timer @@ -78,15 +84,16 @@ func NewManager(r *router.Router) *Manager { } } -// NewSession creates a session with stable internal channels and a permanent -// forwarder that pipes internalIn into router.IncomingChannel(). +// NewSession creates a session with stable internal channels and two permanent workers: +// 1) internalIn -> router.Incoming 2) internalOut -> clientOut (or discard if unattached) func (m *Manager) NewSession(idleAfter time.Duration) (uuid.UUID, error) { s := &session{ - id: uuid.New(), - internalIn: make(chan domain.Message, defaultInternalBuf), - internalOut: make(chan domain.Message, defaultInternalBuf), - bound: make(map[domain.Identifier]struct{}), - idleAfter: idleAfter, + id: uuid.New(), + internalIn: make(chan domain.Message, defaultInternalBuf), + internalOut: make(chan domain.Message, defaultInternalBuf), + bound: make(map[domain.Identifier]struct{}), + idleAfter: idleAfter, + dropWhenUnattached: true, } ctx, cancel := context.WithCancel(context.Background()) s.cancelInternal = cancel @@ -106,17 +113,52 @@ func (m *Manager) NewSession(idleAfter time.Duration) (uuid.UUID, error) { if !ok { return } - // Place to filter, validate, meter, or throttle. + // Hook: filter/validate/meter/throttle inbound to router here. incoming <- msg } } }(ctx, s.internalIn) + // Permanent drain: internalOut -> clientOut (drop if unattached) + s.egressWG.Add(1) + go func(sid uuid.UUID) { + defer s.egressWG.Done() + for msg := range s.internalOut { + m.mu.Lock() + // Session might be gone; re-fetch safely. + s, ok := m.sessions[sid] + var cout chan domain.Message + var dropSlow, attached bool + if ok { + cout = s.clientOut + dropSlow = s.dropWhenSlow + attached = cout != nil + } + m.mu.Unlock() + + switch { + case !attached: + // unattached => drop + + case dropSlow: // typical case when attached + select { + case cout <- msg: + default: + // drop on slow consumer + } + + default: + cout <- msg // push to client, block if slow + } + } + }(s.id) + return s.id, nil } -// GetChannels creates a fresh client attachment and hooks both directions: -// clientIn -> internalIn and internalOut -> clientOut. Only one attachment at a time. +// GetChannels creates a fresh client attachment and hooks inbound (clientIn -> internalIn). +// Outbound delivery is handled by the permanent drain. +// Only one attachment at a time. func (m *Manager) GetChannels(id uuid.UUID, opts ChannelOpts) (chan<- domain.Message, <-chan domain.Message, error) { if opts.InBufSize <= 0 { opts.InBufSize = defaultClientBuf @@ -144,6 +186,8 @@ func (m *Manager) GetChannels(id uuid.UUID, opts ChannelOpts) (chan<- domain.Mes cin := make(chan domain.Message, opts.InBufSize) cout := make(chan domain.Message, opts.OutBufSize) s.clientIn, s.clientOut = cin, cout + s.dropWhenSlow = opts.DropOutbound + s.dropInbound = opts.DropInbound // Stop idle timer while attached. if s.idleTimer != nil { @@ -152,62 +196,30 @@ func (m *Manager) GetChannels(id uuid.UUID, opts ChannelOpts) (chan<- domain.Mes } internalIn := s.internalIn - internalOut := s.internalOut - - // Prepare per-attachment cancel. - attachCtx, attachCancel := context.WithCancel(context.Background()) - s.cancelClient = attachCancel - m.mu.Unlock() // Forward clientIn -> internalIn - go func(ctx context.Context, src <-chan domain.Message, dst chan<- domain.Message) { - for { - select { - case <-ctx.Done(): - return - case msg, ok := <-src: - if !ok { - // Client closed input; stop forwarding. - return + go func(src <-chan domain.Message, dst chan<- domain.Message, drop bool) { + for msg := range src { + if drop { + select { + case dst <- msg: + default: + // drop inbound on internal backpressure } - // Per-client checks could go here. + } else { dst <- msg } } - }(attachCtx, cin, internalIn) - - // Forward internalOut -> clientOut - go func(ctx context.Context, src <-chan domain.Message, dst chan<- domain.Message, drop bool) { - defer close(dst) - for { - select { - case <-ctx.Done(): - return - case msg, ok := <-src: - if !ok { - // Session is closing; signal EOF to client. - return - } - if drop { - select { - case dst <- msg: - default: - // Drop on client backpressure. Add metrics if desired. - } - } else { - dst <- msg - } - } - } - }(attachCtx, internalOut, cout, opts.DropOutbound) + // client closed input; forwarder exits + }(cin, internalIn, opts.DropInbound) // Return directional views. return (chan<- domain.Message)(cin), (<-chan domain.Message)(cout), nil } -// DetachClient cancels current client forwarders and clears the attachment. -// It starts the idle close timer if configured. +// DetachClient clears the client attachment and starts the idle close timer if configured. +// Does not close clientOut to avoid send-on-closed races with the permanent drain. func (m *Manager) DetachClient(id uuid.UUID) error { m.mu.Lock() s, ok := m.sessions[id] @@ -219,19 +231,14 @@ func (m *Manager) DetachClient(id uuid.UUID) error { m.mu.Unlock() return ErrSessionClosed } - // Capture and clear client state. - cancel := s.cancelClient cin := s.clientIn - s.cancelClient = nil + // Make unattached; permanent drain will drop while nil. s.clientIn, s.clientOut = nil, nil after := s.idleAfter m.mu.Unlock() - if cancel != nil { - cancel() - } - // Close clientIn to terminate clientIn->internalIn forwarder if client forgot. if cin != nil { + // We own the channel. Closing signals writers to stop. close(cin) } @@ -369,11 +376,10 @@ func (m *Manager) CloseSession(id uuid.UUID) error { ids = append(ids, k) } cancelInternal := s.cancelInternal - cancelClient := s.cancelClient - // Clear attachments before unlock to avoid races. - s.cancelClient = nil + // Snapshot clientIn/Out for shutdown signals after unlock. cin := s.clientIn - s.clientIn, s.clientOut = nil, nil + cout := s.clientOut + // Remove from map before unlock to prevent new work. delete(m.sessions, id) m.mu.Unlock() @@ -385,17 +391,19 @@ func (m *Manager) CloseSession(id uuid.UUID) error { } } - // Stop forwarders and close internal channels. - if cancelClient != nil { - cancelClient() - } + // Stop inbound forwarder and close internals. if cancelInternal != nil { cancelInternal() } - close(s.internalIn) - close(s.internalOut) // will close clientOut via forwarder + close(s.internalIn) // end internalIn forwarder + close(s.internalOut) // signal drain to finish - // Close clientIn to ensure its forwarder exits even if client forgot. + // Wait drain exit, then close clientOut if attached at close time. + s.egressWG.Wait() + if cout != nil { + close(cout) + } + // Close clientIn to stop client writers if still attached. if cin != nil { close(cin) } @@ -427,7 +435,7 @@ func (m *Manager) RemoveProvider(name string) error { if !ok { return fmt.Errorf("provider not found: %s", name) } - // Optional: implement full drain and cancel of all streams for this provider. + // TODO: implement full drain and cancel of all streams for this provider if needed. return fmt.Errorf("RemoveProvider not implemented") }