Adding IsRunning() method to Worker interface; Minor fixes to BaseWorker methods

This commit is contained in:
2025-09-21 18:14:47 +00:00
parent 4e52950458
commit ba650785bc

View File

@@ -19,9 +19,10 @@ var (
) )
type Worker interface { type Worker interface {
ID() uuid.UUID
Start(workerID uuid.UUID, controller manager.SessionController, cfg map[string]string) error Start(workerID uuid.UUID, controller manager.SessionController, cfg map[string]string) error
Stop() error Stop() error
IsRunning() bool
ID() uuid.UUID
} }
type BaseStatefulWorker struct { type BaseStatefulWorker struct {
@@ -36,7 +37,7 @@ type BaseStatefulWorker struct {
mu sync.RWMutex mu sync.RWMutex
} }
func (w *BaseStatefulWorker) Start(workerID uuid.UUID, sessionController manager.SessionController, _ map[string]string) error { func (w *BaseStatefulWorker) Start(workerUUID uuid.UUID, sessionController manager.SessionController, _ map[string]string) error {
if sessionController == nil { if sessionController == nil {
return errors.New("nil SessionController provided") return errors.New("nil SessionController provided")
} }
@@ -44,7 +45,7 @@ func (w *BaseStatefulWorker) Start(workerID uuid.UUID, sessionController manager
w.mu.Lock() w.mu.Lock()
if w.running { if w.running {
w.mu.Unlock() w.mu.Unlock()
return ErrWorkerNotRunning return ErrWorkerRunning
} }
sid := sessionController.NewSession(time.Second * 30) sid := sessionController.NewSession(time.Second * 30)
@@ -55,6 +56,7 @@ func (w *BaseStatefulWorker) Start(workerID uuid.UUID, sessionController manager
} }
w.sc, w.in, w.out = sessionController, in, out w.sc, w.in, w.out = sessionController, in, out
w.workerUUID = workerUUID
w.running = true w.running = true
w.mu.Unlock() w.mu.Unlock()
@@ -86,15 +88,30 @@ func (w *BaseStatefulWorker) Stop() error {
return nil return nil
} }
func (w *BaseStatefulWorker) ID() uuid.UUID { return w.workerUUID } func (w *BaseStatefulWorker) IsRunning() bool {
w.mu.RLock()
running := w.running
w.mu.RUnlock()
func (w *BaseStatefulWorker) Configure(ids []domain.Identifier) error { return running
}
func (w *BaseStatefulWorker) ID() uuid.UUID {
w.mu.RLock()
id := w.workerUUID
w.mu.RUnlock()
return id
}
func (w *BaseStatefulWorker) SetStreams(ids []domain.Identifier) error {
w.mu.RLock() w.mu.RLock()
if !w.running { if !w.running {
w.mu.RUnlock() w.mu.RUnlock()
return ErrWorkerNotRunning return ErrWorkerNotRunning
} }
w.mu.RUnlock()
return w.sc.ConfigureSession(w.sid, ids) return w.sc.ConfigureSession(w.sid, ids)
} }