Update Message structure: change Payload type to bytes and add Encoding field

This commit is contained in:
2025-08-09 13:45:45 +00:00
parent 20c1bd1a33
commit d1369643d2
5 changed files with 28 additions and 8 deletions

View File

@@ -77,7 +77,8 @@ func (x *Identifier) GetSubject() string {
type Message struct { type Message struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
Identifier *Identifier `protobuf:"bytes,1,opt,name=identifier,proto3" json:"identifier,omitempty"` Identifier *Identifier `protobuf:"bytes,1,opt,name=identifier,proto3" json:"identifier,omitempty"`
Payload string `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` // JSON-encoded data Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` // JSON-encoded data
Encoding string `protobuf:"bytes,3,opt,name=encoding,proto3" json:"encoding,omitempty"` // e.g., "json", "protobuf"
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
} }
@@ -119,10 +120,17 @@ func (x *Message) GetIdentifier() *Identifier {
return nil return nil
} }
func (x *Message) GetPayload() string { func (x *Message) GetPayload() []byte {
if x != nil { if x != nil {
return x.Payload return x.Payload
} }
return nil
}
func (x *Message) GetEncoding() string {
if x != nil {
return x.Encoding
}
return "" return ""
} }
@@ -428,12 +436,13 @@ const file_pkg_pb_data_service_data_service_proto_rawDesc = "" +
"\n" + "\n" +
"Identifier\x12\x1a\n" + "Identifier\x12\x1a\n" +
"\bprovider\x18\x01 \x01(\tR\bprovider\x12\x18\n" + "\bprovider\x18\x01 \x01(\tR\bprovider\x12\x18\n" +
"\asubject\x18\x02 \x01(\tR\asubject\"]\n" + "\asubject\x18\x02 \x01(\tR\asubject\"y\n" +
"\aMessage\x128\n" + "\aMessage\x128\n" +
"\n" + "\n" +
"identifier\x18\x01 \x01(\v2\x18.data_service.IdentifierR\n" + "identifier\x18\x01 \x01(\v2\x18.data_service.IdentifierR\n" +
"identifier\x12\x18\n" + "identifier\x12\x18\n" +
"\apayload\x18\x02 \x01(\tR\apayload\"\x14\n" + "\apayload\x18\x02 \x01(\fR\apayload\x12\x1a\n" +
"\bencoding\x18\x03 \x01(\tR\bencoding\"\x14\n" +
"\x12StartStreamRequest\"6\n" + "\x12StartStreamRequest\"6\n" +
"\x13StartStreamResponse\x12\x1f\n" + "\x13StartStreamResponse\x12\x1f\n" +
"\vstream_uuid\x18\x01 \x01(\tR\n" + "\vstream_uuid\x18\x01 \x01(\tR\n" +

View File

@@ -22,7 +22,8 @@ message Identifier {
message Message { message Message {
Identifier identifier = 1; Identifier identifier = 1;
string payload = 2; // JSON-encoded data bytes payload = 2; // JSON-encoded data
string encoding = 3; // e.g., "json", "protobuf"
} }
// Control Requests and Responses // Control Requests and Responses

View File

@@ -1,6 +1,14 @@
package domain package domain
type Encoding string
const (
EncodingJSON Encoding = "json"
EncodingProtobuf Encoding = "protobuf"
)
type Message struct { type Message struct {
Identifier Identifier Identifier Identifier
Payload any Payload []byte
Encoding Encoding
} }

View File

@@ -141,7 +141,8 @@ func (b *FuturesWebsocket) readLoop() {
Provider: "binance_futures_websocket", Provider: "binance_futures_websocket",
Subject: container.Stream, Subject: container.Stream,
}, },
Payload: container.Data, Payload: []byte(container.Data),
Encoding: domain.EncodingJSON,
} }
select { select {

View File

@@ -45,7 +45,8 @@ func (s *GRPCStreamingServer) ConnectStream(req *pb.ConnectStreamRequest, stream
Provider: msg.Identifier.Provider, Provider: msg.Identifier.Provider,
Subject: msg.Identifier.Subject, Subject: msg.Identifier.Subject,
}, },
Payload: fmt.Sprintf("%v", msg.Payload), Payload: msg.Payload,
Encoding: string(msg.Encoding),
}) })
if err != nil { if err != nil {