From 40fda456eb69bbeca2c3a144b1d15eb36c772c66 Mon Sep 17 00:00:00 2001 From: Phillip Michelsen Date: Fri, 5 Sep 2025 07:07:33 +0000 Subject: [PATCH] Updated grpc servers to support new Manager API --- .../data_service/cmd/data_service/main.go | 24 +++++----- .../internal/server/gprc_control_server.go | 45 ++++++++++++++----- .../internal/server/grpc_streaming_server.go | 30 +++++++++---- ...rver.go => socket_streaming_server.go.bak} | 0 4 files changed, 70 insertions(+), 29 deletions(-) rename services/data_service/internal/server/{socket_streaming_server.go => socket_streaming_server.go.bak} (100%) diff --git a/services/data_service/cmd/data_service/main.go b/services/data_service/cmd/data_service/main.go index 0b76daf..3b08ecd 100644 --- a/services/data_service/cmd/data_service/main.go +++ b/services/data_service/cmd/data_service/main.go @@ -53,17 +53,19 @@ func main() { }() // Socket Streaming Server - socketStreamingServer := server.NewSocketStreamingServer(m) - go func() { - socketLis, err := net.Listen("tcp", ":6000") - if err != nil { - log.Fatalf("Failed to listen for socket: %v", err) - } - log.Println("Socket server listening on :6000") - if err := socketStreamingServer.Serve(socketLis); err != nil { - log.Fatalf("Socket server error: %v", err) - } - }() + /* + socketStreamingServer := server.NewSocketStreamingServer(m) + go func() { + socketLis, err := net.Listen("tcp", ":6000") + if err != nil { + log.Fatalf("Failed to listen for socket: %v", err) + } + log.Println("Socket server listening on :6000") + if err := socketStreamingServer.Serve(socketLis); err != nil { + log.Fatalf("Socket server error: %v", err) + } + }() + */ // Block main forever select {} diff --git a/services/data_service/internal/server/gprc_control_server.go b/services/data_service/internal/server/gprc_control_server.go index 52f180f..4fc0257 100644 --- a/services/data_service/internal/server/gprc_control_server.go +++ b/services/data_service/internal/server/gprc_control_server.go @@ -2,7 +2,7 @@ package server import ( "context" - "fmt" + "time" "github.com/google/uuid" pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" @@ -21,21 +21,28 @@ func NewGRPCControlServer(m *manager.Manager) *GRPCControlServer { return &GRPCControlServer{manager: m} } -func (s *GRPCControlServer) StartStream(_ context.Context, _ *pb.StartStreamRequest) (*pb.StartStreamResponse, error) { - streamID, err := s.manager.StartClientStream() +// StartStream creates a new session. It does NOT attach client channels. +// Your streaming RPC should later call GetChannels(sessionID, opts). +func (s *GRPCControlServer) StartStream(_ context.Context, req *pb.StartStreamRequest) (*pb.StartStreamResponse, error) { + sessionID, err := s.manager.NewSession(time.Duration(1) * time.Minute) // timeout set to 1 minute if err != nil { - return nil, fmt.Errorf("failed to start stream: %w", err) + return nil, status.Errorf(codes.Internal, "new session: %v", err) } - return &pb.StartStreamResponse{StreamUuid: streamID.String()}, nil + return &pb.StartStreamResponse{StreamUuid: sessionID.String()}, nil } +// ConfigureStream sets the session's subscriptions in one shot. +// It does NOT require channels to be attached. func (s *GRPCControlServer) ConfigureStream(_ context.Context, req *pb.ConfigureStreamRequest) (*pb.ConfigureStreamResponse, error) { + if req == nil { + return nil, status.Error(codes.InvalidArgument, "nil request") + } streamID, err := uuid.Parse(req.StreamUuid) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "invalid stream_uuid %q: %v", req.StreamUuid, err) } - var ids []domain.Identifier + ids := make([]domain.Identifier, 0, len(req.Identifiers)) for _, in := range req.Identifiers { id, e := domain.ParseIdentifier(in.Key) if e != nil { @@ -44,19 +51,37 @@ func (s *GRPCControlServer) ConfigureStream(_ context.Context, req *pb.Configure ids = append(ids, id) } - if err := s.manager.ConfigureClientStream(streamID, ids); err != nil { - return nil, status.Errorf(codes.InvalidArgument, "configure failed: %v", err) + if err := s.manager.SetSubscriptions(streamID, ids); err != nil { + // Map common manager errors to gRPC codes. + switch err { + case manager.ErrSessionNotFound: + return nil, status.Errorf(codes.NotFound, "session not found: %v", err) + case manager.ErrSessionClosed: + return nil, status.Errorf(codes.FailedPrecondition, "session closed: %v", err) + default: + return nil, status.Errorf(codes.Internal, "set subscriptions: %v", err) + } } return &pb.ConfigureStreamResponse{}, nil } +// StopStream closes the session and tears down routes and streams. func (s *GRPCControlServer) StopStream(_ context.Context, req *pb.StopStreamRequest) (*pb.StopStreamResponse, error) { + if req == nil { + return nil, status.Error(codes.InvalidArgument, "nil request") + } streamID, err := uuid.Parse(req.StreamUuid) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "invalid stream_uuid %q: %v", req.StreamUuid, err) } - if err := s.manager.StopClientStream(streamID); err != nil { - return nil, status.Errorf(codes.Internal, "failed to stop stream: %v", err) + + if err := s.manager.CloseSession(streamID); err != nil { + switch err { + case manager.ErrSessionNotFound: + return nil, status.Errorf(codes.NotFound, "session not found: %v", err) + default: + return nil, status.Errorf(codes.Internal, "close session: %v", err) + } } return &pb.StopStreamResponse{}, nil } diff --git a/services/data_service/internal/server/grpc_streaming_server.go b/services/data_service/internal/server/grpc_streaming_server.go index c0768f6..65afb13 100644 --- a/services/data_service/internal/server/grpc_streaming_server.go +++ b/services/data_service/internal/server/grpc_streaming_server.go @@ -17,25 +17,39 @@ func NewGRPCStreamingServer(m *manager.Manager) *GRPCStreamingServer { return &GRPCStreamingServer{manager: m} } +// ConnectStream attaches a client to an existing session and streams outbound messages. +// This is server-streaming only; inbound use is optional and ignored here. func (s *GRPCStreamingServer) ConnectStream(req *pb.ConnectStreamRequest, stream pb.DataServiceStreaming_ConnectStreamServer) error { - streamUUID, err := uuid.Parse(req.StreamUuid) + if req == nil { + return fmt.Errorf("nil request") + } + sessionID, err := uuid.Parse(req.StreamUuid) if err != nil { return fmt.Errorf("invalid UUID: %w", err) } - ch, err := s.manager.ConnectClientStream(streamUUID) - if err != nil { - return fmt.Errorf("failed to connect: %w", err) + // Defaults; tune or map from req if your proto carries options. + opts := manager.ChannelOpts{ + InBufSize: 256, + OutBufSize: 1024, + DropOutbound: true, // do not let slow clients stall producers + DropInbound: true, // irrelevant here (we don't send inbound), safe default } + _, out, err := s.manager.GetChannels(sessionID, opts) + if err != nil { + return fmt.Errorf("attach channels: %w", err) + } + defer func() { _ = s.manager.DetachClient(sessionID) }() + + ctx := stream.Context() for { select { - case <-stream.Context().Done(): - s.manager.DisconnectClientStream(streamUUID) + case <-ctx.Done(): return nil - case msg, ok := <-ch: + case msg, ok := <-out: if !ok { - return nil + return nil // session closed } if err := stream.Send(&pb.Message{ Identifier: &pb.Identifier{Key: msg.Identifier.Key()}, diff --git a/services/data_service/internal/server/socket_streaming_server.go b/services/data_service/internal/server/socket_streaming_server.go.bak similarity index 100% rename from services/data_service/internal/server/socket_streaming_server.go rename to services/data_service/internal/server/socket_streaming_server.go.bak