diff --git a/pkg/pb/data_service/data_service.pb.go b/pkg/pb/data_service/data_service.pb.go index a61b1fe..abe760f 100644 --- a/pkg/pb/data_service/data_service.pb.go +++ b/pkg/pb/data_service/data_service.pb.go @@ -77,7 +77,8 @@ func (x *Identifier) GetSubject() string { type Message struct { state protoimpl.MessageState `protogen:"open.v1"` Identifier *Identifier `protobuf:"bytes,1,opt,name=identifier,proto3" json:"identifier,omitempty"` - Payload string `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` // JSON-encoded data + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` // JSON-encoded data + Encoding string `protobuf:"bytes,3,opt,name=encoding,proto3" json:"encoding,omitempty"` // e.g., "json", "protobuf" unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -119,10 +120,17 @@ func (x *Message) GetIdentifier() *Identifier { return nil } -func (x *Message) GetPayload() string { +func (x *Message) GetPayload() []byte { if x != nil { return x.Payload } + return nil +} + +func (x *Message) GetEncoding() string { + if x != nil { + return x.Encoding + } return "" } @@ -428,12 +436,13 @@ const file_pkg_pb_data_service_data_service_proto_rawDesc = "" + "\n" + "Identifier\x12\x1a\n" + "\bprovider\x18\x01 \x01(\tR\bprovider\x12\x18\n" + - "\asubject\x18\x02 \x01(\tR\asubject\"]\n" + + "\asubject\x18\x02 \x01(\tR\asubject\"y\n" + "\aMessage\x128\n" + "\n" + "identifier\x18\x01 \x01(\v2\x18.data_service.IdentifierR\n" + "identifier\x12\x18\n" + - "\apayload\x18\x02 \x01(\tR\apayload\"\x14\n" + + "\apayload\x18\x02 \x01(\fR\apayload\x12\x1a\n" + + "\bencoding\x18\x03 \x01(\tR\bencoding\"\x14\n" + "\x12StartStreamRequest\"6\n" + "\x13StartStreamResponse\x12\x1f\n" + "\vstream_uuid\x18\x01 \x01(\tR\n" + diff --git a/pkg/pb/data_service/data_service.proto b/pkg/pb/data_service/data_service.proto index 44219e5..a6b2b0f 100644 --- a/pkg/pb/data_service/data_service.proto +++ b/pkg/pb/data_service/data_service.proto @@ -22,7 +22,8 @@ message Identifier { message Message { Identifier identifier = 1; - string payload = 2; // JSON-encoded data + bytes payload = 2; // JSON-encoded data + string encoding = 3; // e.g., "json", "protobuf" } // Control Requests and Responses diff --git a/services/data_service/internal/domain/message.go b/services/data_service/internal/domain/message.go index c211306..c73dd04 100644 --- a/services/data_service/internal/domain/message.go +++ b/services/data_service/internal/domain/message.go @@ -1,6 +1,14 @@ package domain +type Encoding string + +const ( + EncodingJSON Encoding = "json" + EncodingProtobuf Encoding = "protobuf" +) + type Message struct { Identifier Identifier - Payload any + Payload []byte + Encoding Encoding } diff --git a/services/data_service/internal/provider/binance/futures_websocket.go b/services/data_service/internal/provider/binance/futures_websocket.go index 3461e59..4a3faa7 100644 --- a/services/data_service/internal/provider/binance/futures_websocket.go +++ b/services/data_service/internal/provider/binance/futures_websocket.go @@ -141,7 +141,8 @@ func (b *FuturesWebsocket) readLoop() { Provider: "binance_futures_websocket", Subject: container.Stream, }, - Payload: container.Data, + Payload: []byte(container.Data), + Encoding: domain.EncodingJSON, } select { diff --git a/services/data_service/internal/server/grpc_streaming_server.go b/services/data_service/internal/server/grpc_streaming_server.go index 416606d..225e2a4 100644 --- a/services/data_service/internal/server/grpc_streaming_server.go +++ b/services/data_service/internal/server/grpc_streaming_server.go @@ -45,7 +45,8 @@ func (s *GRPCStreamingServer) ConnectStream(req *pb.ConnectStreamRequest, stream Provider: msg.Identifier.Provider, Subject: msg.Identifier.Subject, }, - Payload: fmt.Sprintf("%v", msg.Payload), + Payload: msg.Payload, + Encoding: string(msg.Encoding), }) if err != nil {