From ef4a28fb29c8264de21d1a5dcf3b5b7274f17d8f Mon Sep 17 00:00:00 2001 From: Phillip Michelsen Date: Sat, 9 Aug 2025 15:17:51 +0000 Subject: [PATCH] Update Dockerfile and main.go: upgrade Go version, optimize build stages, and adjust binary output path --- services/data_service/Dockerfile | 30 ++- .../cmd/{ => data_service}/main.go | 0 services/data_service/cmd/stream_tap/main.go | 209 ++++++++++++++++++ .../provider/binance/futures_websocket.go | 2 + 4 files changed, 224 insertions(+), 17 deletions(-) rename services/data_service/cmd/{ => data_service}/main.go (100%) create mode 100644 services/data_service/cmd/stream_tap/main.go diff --git a/services/data_service/Dockerfile b/services/data_service/Dockerfile index 8b834d0..0385164 100644 --- a/services/data_service/Dockerfile +++ b/services/data_service/Dockerfile @@ -1,25 +1,21 @@ -# Stage 1: Builder -FROM golang:latest AS builder +# ---- Builder ---- +FROM golang:1.24-alpine AS builder +ENV CGO_ENABLED=0 GOOS=linux WORKDIR /app -# Copy go mod and download deps COPY go.mod go.sum ./ RUN go mod download -# Copy source COPY . . +RUN --mount=type=cache,target=/root/.cache/go-build \ + --mount=type=cache,target=/go/pkg/mod \ + go build -trimpath -ldflags="-s -w" \ + -o /out/data-service ./services/data_service/cmd/data_service -# Build binary -RUN CGO_ENABLED=0 GOOS=linux go build -o /data_service ./services/data_service/cmd - -# Stage 2: Runner -FROM debian:bullseye-slim - -# Minimal setup -RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* - -# Copy binary -COPY --from=builder /data_service /usr/local/bin/data_service - -ENTRYPOINT ["data_service"] +# ---- Runtime ---- +FROM gcr.io/distroless/static:nonroot +EXPOSE 50051 50052 6000 +COPY --from=builder /out/data-service /bin/data-service +USER nonroot:nonroot +ENTRYPOINT ["/bin/data-service"] diff --git a/services/data_service/cmd/main.go b/services/data_service/cmd/data_service/main.go similarity index 100% rename from services/data_service/cmd/main.go rename to services/data_service/cmd/data_service/main.go diff --git a/services/data_service/cmd/stream_tap/main.go b/services/data_service/cmd/stream_tap/main.go new file mode 100644 index 0000000..32d4c52 --- /dev/null +++ b/services/data_service/cmd/stream_tap/main.go @@ -0,0 +1,209 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "os" + "os/signal" + "strings" + "syscall" + "time" + + pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" +) + +type idsFlag []string + +func (i *idsFlag) String() string { return strings.Join(*i, ",") } +func (i *idsFlag) Set(v string) error { + if v == "" { + return nil + } + *i = append(*i, v) + return nil +} + +func parseID(s string) (provider, subject string, err error) { + parts := strings.SplitN(s, ":", 2) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return "", "", fmt.Errorf("want provider:subject, got %q", s) + } + return parts[0], parts[1], nil +} + +func prettyOrRaw(b []byte, pretty bool) string { + if !pretty || len(b) == 0 { + return string(b) + } + var tmp any + if err := json.Unmarshal(b, &tmp); err != nil { + return string(b) // not JSON + } + out, err := json.MarshalIndent(tmp, "", " ") + if err != nil { + return string(b) + } + return string(out) +} + +// waitReady blocks until conn is READY or ctx times out/cancels. +func waitReady(ctx context.Context, conn *grpc.ClientConn) error { + for { + s := conn.GetState() + if s == connectivity.Ready { + return nil + } + if !conn.WaitForStateChange(ctx, s) { + if ctx.Err() != nil { + return ctx.Err() + } + return fmt.Errorf("WaitForStateChange returned without state change") + } + } +} + +//goland:noinspection GoUnhandledErrorResult +func main() { + var ids idsFlag + var ctlAddr string + var strAddr string + var pretty bool + var timeout time.Duration + + flag.Var(&ids, "id", "identifier in form provider:subject (repeatable)") + flag.StringVar(&ctlAddr, "ctl", "127.0.0.1:50051", "gRPC control address") + flag.StringVar(&strAddr, "str", "127.0.0.1:50052", "gRPC streaming address") + flag.BoolVar(&pretty, "pretty", true, "pretty-print JSON payloads when possible") + flag.DurationVar(&timeout, "timeout", 10*time.Second, "start/config/connect timeout") + flag.Parse() + + if len(ids) == 0 { + fmt.Fprintln(os.Stderr, "provide at least one --id provider:subject") + os.Exit(2) + } + + // Ctrl-C handling + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + // ----- Control client ----- + ccCtl, err := grpc.NewClient( + ctlAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + fmt.Fprintf(os.Stderr, "new control client: %v\n", err) + os.Exit(1) + } + defer ccCtl.Close() + + ccCtl.Connect() // start dialing in background + + ctlConnCtx, cancelCtlConn := context.WithTimeout(ctx, timeout) + if err := waitReady(ctlConnCtx, ccCtl); err != nil { + cancelCtlConn() + fmt.Fprintf(os.Stderr, "connect control: %v\n", err) + os.Exit(1) + } + cancelCtlConn() + + ctl := pb.NewDataServiceControlClient(ccCtl) + + // Start stream + ctxStart, cancelStart := context.WithTimeout(ctx, timeout) + startResp, err := ctl.StartStream(ctxStart, &pb.StartStreamRequest{}) + cancelStart() + if err != nil { + fmt.Fprintf(os.Stderr, "StartStream: %v\n", err) + os.Exit(1) + } + streamUUID := startResp.GetStreamUuid() + fmt.Printf("stream: %s\n", streamUUID) + + // Configure + var pbIDs []*pb.Identifier + for _, s := range ids { + prov, subj, err := parseID(s) + if err != nil { + fmt.Fprintf(os.Stderr, "bad --id: %v\n", err) + os.Exit(2) + } + pbIDs = append(pbIDs, &pb.Identifier{Provider: prov, Subject: subj}) + } + + ctxCfg, cancelCfg := context.WithTimeout(ctx, timeout) + _, err = ctl.ConfigureStream(ctxCfg, &pb.ConfigureStreamRequest{ + StreamUuid: streamUUID, + Identifiers: pbIDs, + }) + cancelCfg() + if err != nil { + fmt.Fprintf(os.Stderr, "ConfigureStream: %v\n", err) + os.Exit(1) + } + fmt.Printf("configured %d identifiers\n", len(pbIDs)) + + // ----- Streaming client ----- + ccStr, err := grpc.NewClient( + strAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + fmt.Fprintf(os.Stderr, "new streaming client: %v\n", err) + os.Exit(1) + } + defer ccStr.Close() + + ccStr.Connect() + + strConnCtx, cancelStrConn := context.WithTimeout(ctx, timeout) + if err := waitReady(strConnCtx, ccStr); err != nil { + cancelStrConn() + fmt.Fprintf(os.Stderr, "connect streaming: %v\n", err) + os.Exit(1) + } + cancelStrConn() + + str := pb.NewDataServiceStreamingClient(ccStr) + + // This context lives until Ctrl-C + streamCtx, streamCancel := context.WithCancel(ctx) + defer streamCancel() + + stream, err := str.ConnectStream(streamCtx, &pb.ConnectStreamRequest{StreamUuid: streamUUID}) + if err != nil { + fmt.Fprintf(os.Stderr, "ConnectStream: %v\n", err) + os.Exit(1) + } + fmt.Println("connected; streaming… (Ctrl-C to quit)") + + // Receive loop until Ctrl-C + for { + select { + case <-ctx.Done(): + fmt.Println("\nshutting down") + return + default: + msg, err := stream.Recv() + if err != nil { + if ctx.Err() != nil { + return // normal shutdown + } + fmt.Fprintf(os.Stderr, "recv: %v\n", err) + os.Exit(1) + } + id := msg.GetIdentifier() + fmt.Printf("[%s] %s bytes=%d enc=%s t=%s\n", + id.GetProvider(), id.GetSubject(), len(msg.GetPayload()), msg.GetEncoding(), + time.Now().Format(time.RFC3339Nano), + ) + fmt.Println(prettyOrRaw(msg.GetPayload(), pretty)) + fmt.Println("---") + } + } +} diff --git a/services/data_service/internal/provider/binance/futures_websocket.go b/services/data_service/internal/provider/binance/futures_websocket.go index 4a3faa7..ab82d6e 100644 --- a/services/data_service/internal/provider/binance/futures_websocket.go +++ b/services/data_service/internal/provider/binance/futures_websocket.go @@ -77,6 +77,8 @@ func (b *FuturesWebsocket) CancelStream(subject string) { } _ = b.conn.WriteJSON(msg) + fmt.Println("Unsubscribed from stream:", subject) + delete(b.activeStreams, subject) }