From ba650785bc91c5ff1751a13d21b755f4291e477f Mon Sep 17 00:00:00 2001 From: Phillip Michelsen Date: Sun, 21 Sep 2025 18:14:47 +0000 Subject: [PATCH] Adding IsRunning() method to Worker interface; Minor fixes to BaseWorker methods --- .../data_service/internal/worker/worker.go | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/services/data_service/internal/worker/worker.go b/services/data_service/internal/worker/worker.go index 00d5c48..7921a4a 100644 --- a/services/data_service/internal/worker/worker.go +++ b/services/data_service/internal/worker/worker.go @@ -19,9 +19,10 @@ var ( ) type Worker interface { - ID() uuid.UUID Start(workerID uuid.UUID, controller manager.SessionController, cfg map[string]string) error Stop() error + IsRunning() bool + ID() uuid.UUID } type BaseStatefulWorker struct { @@ -36,7 +37,7 @@ type BaseStatefulWorker struct { mu sync.RWMutex } -func (w *BaseStatefulWorker) Start(workerID uuid.UUID, sessionController manager.SessionController, _ map[string]string) error { +func (w *BaseStatefulWorker) Start(workerUUID uuid.UUID, sessionController manager.SessionController, _ map[string]string) error { if sessionController == nil { return errors.New("nil SessionController provided") } @@ -44,7 +45,7 @@ func (w *BaseStatefulWorker) Start(workerID uuid.UUID, sessionController manager w.mu.Lock() if w.running { w.mu.Unlock() - return ErrWorkerNotRunning + return ErrWorkerRunning } sid := sessionController.NewSession(time.Second * 30) @@ -55,6 +56,7 @@ func (w *BaseStatefulWorker) Start(workerID uuid.UUID, sessionController manager } w.sc, w.in, w.out = sessionController, in, out + w.workerUUID = workerUUID w.running = true w.mu.Unlock() @@ -86,15 +88,30 @@ func (w *BaseStatefulWorker) Stop() error { return nil } -func (w *BaseStatefulWorker) ID() uuid.UUID { return w.workerUUID } +func (w *BaseStatefulWorker) IsRunning() bool { + w.mu.RLock() + running := w.running + w.mu.RUnlock() -func (w *BaseStatefulWorker) Configure(ids []domain.Identifier) error { + return running +} + +func (w *BaseStatefulWorker) ID() uuid.UUID { + w.mu.RLock() + id := w.workerUUID + w.mu.RUnlock() + + return id +} + +func (w *BaseStatefulWorker) SetStreams(ids []domain.Identifier) error { w.mu.RLock() if !w.running { w.mu.RUnlock() return ErrWorkerNotRunning } + w.mu.RUnlock() return w.sc.ConfigureSession(w.sid, ids) }