Small session management change: enhance ChannelOpts with inbound drop control, improve session struct for client attachment handling, and streamline message forwarding logic
This commit is contained in:
@@ -29,8 +29,8 @@ const (
|
||||
type ChannelOpts struct {
|
||||
InBufSize int
|
||||
OutBufSize int
|
||||
// If true, drop to clientOut when its buffer is full. If false, block.
|
||||
DropOutbound bool
|
||||
DropOutbound bool // If true, drop outbound to client when its buffer is full. If false, block.
|
||||
DropInbound bool // If true, drop inbound from client when internalIn is full. If false, block.
|
||||
}
|
||||
|
||||
// Manager owns providers, sessions, and the router fanout.
|
||||
@@ -48,20 +48,26 @@ type Manager struct {
|
||||
type session struct {
|
||||
id uuid.UUID
|
||||
|
||||
// Stable internal channels. Only the session writes internalOut and reads internalIn.
|
||||
// Stable internal channels.
|
||||
internalIn chan domain.Message // forwarded into router.IncomingChannel()
|
||||
internalOut chan domain.Message // registered as router route target
|
||||
internalOut chan domain.Message // registered as router route target, forwarded to clientOut (or dropped if unattached)
|
||||
|
||||
// Current client attachment (optional). Created by GetChannels.
|
||||
// Client Channels (optional). Created on GetChannels and cleared on DetachClient.
|
||||
clientIn chan domain.Message // caller writes
|
||||
clientOut chan domain.Message // caller reads
|
||||
|
||||
// Cancels the permanent internalIn forwarder.
|
||||
// Controls the permanent internalIn forwarder.
|
||||
cancelInternal context.CancelFunc
|
||||
// Cancels current client forwarders.
|
||||
cancelClient context.CancelFunc
|
||||
|
||||
bound map[domain.Identifier]struct{}
|
||||
// Permanent outbound drain control.
|
||||
egressWG sync.WaitGroup
|
||||
|
||||
// Policy
|
||||
dropWhenUnattached bool // always drop when no client attached
|
||||
dropWhenSlow bool // mirror ChannelOpts.DropOutbound
|
||||
dropInbound bool // mirror ChannelOpts.DropInbound
|
||||
|
||||
bound map[domain.Identifier]struct{} // map for quick existence checks
|
||||
closed bool
|
||||
idleAfter time.Duration
|
||||
idleTimer *time.Timer
|
||||
@@ -78,8 +84,8 @@ func NewManager(r *router.Router) *Manager {
|
||||
}
|
||||
}
|
||||
|
||||
// NewSession creates a session with stable internal channels and a permanent
|
||||
// forwarder that pipes internalIn into router.IncomingChannel().
|
||||
// NewSession creates a session with stable internal channels and two permanent workers:
|
||||
// 1) internalIn -> router.Incoming 2) internalOut -> clientOut (or discard if unattached)
|
||||
func (m *Manager) NewSession(idleAfter time.Duration) (uuid.UUID, error) {
|
||||
s := &session{
|
||||
id: uuid.New(),
|
||||
@@ -87,6 +93,7 @@ func (m *Manager) NewSession(idleAfter time.Duration) (uuid.UUID, error) {
|
||||
internalOut: make(chan domain.Message, defaultInternalBuf),
|
||||
bound: make(map[domain.Identifier]struct{}),
|
||||
idleAfter: idleAfter,
|
||||
dropWhenUnattached: true,
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.cancelInternal = cancel
|
||||
@@ -106,17 +113,52 @@ func (m *Manager) NewSession(idleAfter time.Duration) (uuid.UUID, error) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// Place to filter, validate, meter, or throttle.
|
||||
// Hook: filter/validate/meter/throttle inbound to router here.
|
||||
incoming <- msg
|
||||
}
|
||||
}
|
||||
}(ctx, s.internalIn)
|
||||
|
||||
// Permanent drain: internalOut -> clientOut (drop if unattached)
|
||||
s.egressWG.Add(1)
|
||||
go func(sid uuid.UUID) {
|
||||
defer s.egressWG.Done()
|
||||
for msg := range s.internalOut {
|
||||
m.mu.Lock()
|
||||
// Session might be gone; re-fetch safely.
|
||||
s, ok := m.sessions[sid]
|
||||
var cout chan domain.Message
|
||||
var dropSlow, attached bool
|
||||
if ok {
|
||||
cout = s.clientOut
|
||||
dropSlow = s.dropWhenSlow
|
||||
attached = cout != nil
|
||||
}
|
||||
m.mu.Unlock()
|
||||
|
||||
switch {
|
||||
case !attached:
|
||||
// unattached => drop
|
||||
|
||||
case dropSlow: // typical case when attached
|
||||
select {
|
||||
case cout <- msg:
|
||||
default:
|
||||
// drop on slow consumer
|
||||
}
|
||||
|
||||
default:
|
||||
cout <- msg // push to client, block if slow
|
||||
}
|
||||
}
|
||||
}(s.id)
|
||||
|
||||
return s.id, nil
|
||||
}
|
||||
|
||||
// GetChannels creates a fresh client attachment and hooks both directions:
|
||||
// clientIn -> internalIn and internalOut -> clientOut. Only one attachment at a time.
|
||||
// GetChannels creates a fresh client attachment and hooks inbound (clientIn -> internalIn).
|
||||
// Outbound delivery is handled by the permanent drain.
|
||||
// Only one attachment at a time.
|
||||
func (m *Manager) GetChannels(id uuid.UUID, opts ChannelOpts) (chan<- domain.Message, <-chan domain.Message, error) {
|
||||
if opts.InBufSize <= 0 {
|
||||
opts.InBufSize = defaultClientBuf
|
||||
@@ -144,6 +186,8 @@ func (m *Manager) GetChannels(id uuid.UUID, opts ChannelOpts) (chan<- domain.Mes
|
||||
cin := make(chan domain.Message, opts.InBufSize)
|
||||
cout := make(chan domain.Message, opts.OutBufSize)
|
||||
s.clientIn, s.clientOut = cin, cout
|
||||
s.dropWhenSlow = opts.DropOutbound
|
||||
s.dropInbound = opts.DropInbound
|
||||
|
||||
// Stop idle timer while attached.
|
||||
if s.idleTimer != nil {
|
||||
@@ -152,62 +196,30 @@ func (m *Manager) GetChannels(id uuid.UUID, opts ChannelOpts) (chan<- domain.Mes
|
||||
}
|
||||
|
||||
internalIn := s.internalIn
|
||||
internalOut := s.internalOut
|
||||
|
||||
// Prepare per-attachment cancel.
|
||||
attachCtx, attachCancel := context.WithCancel(context.Background())
|
||||
s.cancelClient = attachCancel
|
||||
|
||||
m.mu.Unlock()
|
||||
|
||||
// Forward clientIn -> internalIn
|
||||
go func(ctx context.Context, src <-chan domain.Message, dst chan<- domain.Message) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case msg, ok := <-src:
|
||||
if !ok {
|
||||
// Client closed input; stop forwarding.
|
||||
return
|
||||
}
|
||||
// Per-client checks could go here.
|
||||
dst <- msg
|
||||
}
|
||||
}
|
||||
}(attachCtx, cin, internalIn)
|
||||
|
||||
// Forward internalOut -> clientOut
|
||||
go func(ctx context.Context, src <-chan domain.Message, dst chan<- domain.Message, drop bool) {
|
||||
defer close(dst)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case msg, ok := <-src:
|
||||
if !ok {
|
||||
// Session is closing; signal EOF to client.
|
||||
return
|
||||
}
|
||||
go func(src <-chan domain.Message, dst chan<- domain.Message, drop bool) {
|
||||
for msg := range src {
|
||||
if drop {
|
||||
select {
|
||||
case dst <- msg:
|
||||
default:
|
||||
// Drop on client backpressure. Add metrics if desired.
|
||||
// drop inbound on internal backpressure
|
||||
}
|
||||
} else {
|
||||
dst <- msg
|
||||
}
|
||||
}
|
||||
}
|
||||
}(attachCtx, internalOut, cout, opts.DropOutbound)
|
||||
// client closed input; forwarder exits
|
||||
}(cin, internalIn, opts.DropInbound)
|
||||
|
||||
// Return directional views.
|
||||
return (chan<- domain.Message)(cin), (<-chan domain.Message)(cout), nil
|
||||
}
|
||||
|
||||
// DetachClient cancels current client forwarders and clears the attachment.
|
||||
// It starts the idle close timer if configured.
|
||||
// DetachClient clears the client attachment and starts the idle close timer if configured.
|
||||
// Does not close clientOut to avoid send-on-closed races with the permanent drain.
|
||||
func (m *Manager) DetachClient(id uuid.UUID) error {
|
||||
m.mu.Lock()
|
||||
s, ok := m.sessions[id]
|
||||
@@ -219,19 +231,14 @@ func (m *Manager) DetachClient(id uuid.UUID) error {
|
||||
m.mu.Unlock()
|
||||
return ErrSessionClosed
|
||||
}
|
||||
// Capture and clear client state.
|
||||
cancel := s.cancelClient
|
||||
cin := s.clientIn
|
||||
s.cancelClient = nil
|
||||
// Make unattached; permanent drain will drop while nil.
|
||||
s.clientIn, s.clientOut = nil, nil
|
||||
after := s.idleAfter
|
||||
m.mu.Unlock()
|
||||
|
||||
if cancel != nil {
|
||||
cancel()
|
||||
}
|
||||
// Close clientIn to terminate clientIn->internalIn forwarder if client forgot.
|
||||
if cin != nil {
|
||||
// We own the channel. Closing signals writers to stop.
|
||||
close(cin)
|
||||
}
|
||||
|
||||
@@ -369,11 +376,10 @@ func (m *Manager) CloseSession(id uuid.UUID) error {
|
||||
ids = append(ids, k)
|
||||
}
|
||||
cancelInternal := s.cancelInternal
|
||||
cancelClient := s.cancelClient
|
||||
// Clear attachments before unlock to avoid races.
|
||||
s.cancelClient = nil
|
||||
// Snapshot clientIn/Out for shutdown signals after unlock.
|
||||
cin := s.clientIn
|
||||
s.clientIn, s.clientOut = nil, nil
|
||||
cout := s.clientOut
|
||||
// Remove from map before unlock to prevent new work.
|
||||
delete(m.sessions, id)
|
||||
m.mu.Unlock()
|
||||
|
||||
@@ -385,17 +391,19 @@ func (m *Manager) CloseSession(id uuid.UUID) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Stop forwarders and close internal channels.
|
||||
if cancelClient != nil {
|
||||
cancelClient()
|
||||
}
|
||||
// Stop inbound forwarder and close internals.
|
||||
if cancelInternal != nil {
|
||||
cancelInternal()
|
||||
}
|
||||
close(s.internalIn)
|
||||
close(s.internalOut) // will close clientOut via forwarder
|
||||
close(s.internalIn) // end internalIn forwarder
|
||||
close(s.internalOut) // signal drain to finish
|
||||
|
||||
// Close clientIn to ensure its forwarder exits even if client forgot.
|
||||
// Wait drain exit, then close clientOut if attached at close time.
|
||||
s.egressWG.Wait()
|
||||
if cout != nil {
|
||||
close(cout)
|
||||
}
|
||||
// Close clientIn to stop client writers if still attached.
|
||||
if cin != nil {
|
||||
close(cin)
|
||||
}
|
||||
@@ -427,7 +435,7 @@ func (m *Manager) RemoveProvider(name string) error {
|
||||
if !ok {
|
||||
return fmt.Errorf("provider not found: %s", name)
|
||||
}
|
||||
// Optional: implement full drain and cancel of all streams for this provider.
|
||||
// TODO: implement full drain and cancel of all streams for this provider if needed.
|
||||
return fmt.Errorf("RemoveProvider not implemented")
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user