From dec67f2565e26bf59be570cd743ffa14f1c046ee Mon Sep 17 00:00:00 2001 From: Phillip Michelsen Date: Sat, 9 Aug 2025 14:45:05 +0000 Subject: [PATCH] Refactor socket streaming server: improve connection handling, switch to protobuf for message encoding, and optimize write operations --- .../server/socket_streaming_server.go | 115 +++++++++++++----- 1 file changed, 85 insertions(+), 30 deletions(-) diff --git a/services/data_service/internal/server/socket_streaming_server.go b/services/data_service/internal/server/socket_streaming_server.go index fd166f8..a551391 100644 --- a/services/data_service/internal/server/socket_streaming_server.go +++ b/services/data_service/internal/server/socket_streaming_server.go @@ -2,14 +2,16 @@ package server import ( "bufio" - "encoding/json" + "encoding/binary" "fmt" "io" "net" "strings" "github.com/google/uuid" + pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" + "google.golang.org/protobuf/proto" ) type SocketStreamingServer struct { @@ -17,17 +19,15 @@ type SocketStreamingServer struct { } func NewSocketStreamingServer(m *manager.Manager) *SocketStreamingServer { - return &SocketStreamingServer{ - manager: m, - } + return &SocketStreamingServer{manager: m} } -// Serve accepts a listener (TCP or Unix) and begins handling incoming connections. +// Accepts connections and hands each off to handleConnection. func (s *SocketStreamingServer) Serve(lis net.Listener) error { for { conn, err := lis.Accept() if err != nil { - fmt.Printf("Failed to accept connection: %v\n", err) + fmt.Printf("accept error: %v\n", err) continue } go s.handleConnection(conn) @@ -35,57 +35,112 @@ func (s *SocketStreamingServer) Serve(lis net.Listener) error { } func (s *SocketStreamingServer) handleConnection(conn net.Conn) { - defer func(conn net.Conn) { - err := conn.Close() - if err != nil { - fmt.Printf("Failed to close connection: %v\n", err) + defer func() { + if err := conn.Close(); err != nil { + fmt.Printf("conn close error: %v\n", err) } else { - fmt.Println("Connection closed") + fmt.Println("connection closed") } - }(conn) + }() + + // Low-latency socket hints (best-effort). + if tc, ok := conn.(*net.TCPConn); ok { + _ = tc.SetNoDelay(true) + _ = tc.SetWriteBuffer(512 * 1024) + _ = tc.SetReadBuffer(512 * 1024) + } + reader := bufio.NewReader(conn) + // Protocol header: first line is the stream UUID. raw, err := reader.ReadString('\n') if err != nil { - fmt.Printf("Failed to read stream UUID: %v\n", err) + fmt.Printf("read stream UUID error: %v\n", err) + _, _ = fmt.Fprint(conn, "Failed to read stream UUID\n") return } streamUUIDStr := strings.TrimSpace(raw) streamUUID, err := uuid.Parse(streamUUIDStr) if err != nil { - fmt.Fprintf(conn, "Invalid stream UUID\n") + _, _ = fmt.Fprint(conn, "Invalid stream UUID\n") return } outCh, err := s.manager.ConnectStream(streamUUID) if err != nil { - fmt.Fprintf(conn, "Failed to connect to stream: %v\n", err) + _, _ = fmt.Fprintf(conn, "Failed to connect to stream: %v\n", err) return } defer s.manager.DisconnectStream(streamUUID) + writer := bufio.NewWriterSize(conn, 256*1024) + defer func(writer *bufio.Writer) { + err := writer.Flush() + if err != nil { + fmt.Printf("final flush error: %v\n", err) + } + }(writer) + + const flushEvery = 32 + batch := 0 + for msg := range outCh { - payload := struct { - Provider string `json:"provider"` - Subject string `json:"subject"` - Data string `json:"data"` - }{ - Provider: msg.Identifier.Provider, - Subject: msg.Identifier.Subject, - Data: fmt.Sprintf("%s", msg.Payload), + // Build protobuf payload. + message := pb.Message{ + Identifier: &pb.Identifier{ + Provider: msg.Identifier.Provider, + Subject: msg.Identifier.Subject, + }, + Payload: msg.Payload, // []byte + Encoding: string(msg.Encoding), // e.g., "application/json" } - bytes, err := json.Marshal(payload) + // Marshal protobuf. + // Use MarshalAppend to reuse capacity and avoid an extra alloc. + size := proto.Size(&message) + buf := make([]byte, 0, size) + b, err := proto.MarshalOptions{}.MarshalAppend(buf, &message) if err != nil { - fmt.Printf("Failed to encode message: %v\n", err) + fmt.Printf("proto marshal error: %v\n", err) continue } - _, err = conn.Write(append(bytes, '\n')) - if err != nil { - if err != io.EOF { - fmt.Printf("Write error: %v\n", err) + + // Fixed 4-byte big-endian length prefix. + var hdr [4]byte + if len(b) > int(^uint32(0)) { + fmt.Printf("message too large: %d bytes\n", len(b)) + continue + } + binary.BigEndian.PutUint32(hdr[:], uint32(len(b))) + + // Write frame: [len][bytes]. + if _, err := writer.Write(hdr[:]); err != nil { + if err == io.EOF { + return } - break + fmt.Printf("write len error: %v\n", err) + return + } + if _, err := writer.Write(b); err != nil { + if err == io.EOF { + return + } + fmt.Printf("write body error: %v\n", err) + return + } + + batch++ + if batch >= flushEvery { + if err := writer.Flush(); err != nil { + fmt.Printf("flush error: %v\n", err) + return + } + batch = 0 } } + + // Final flush when channel closes. + if err := writer.Flush(); err != nil { + fmt.Printf("final flush error: %v\n", err) + } }