diff --git a/Makefile b/Makefile index 8c514e7..7cd94cf 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,11 @@ -PROTOC_GEN_GO := protoc-gen-go -PROTOC_GEN_GO_GRPC := protoc-gen-go-grpc PROTO_DIR := pkg/pb -PROTO_FILE := $(PROTO_DIR)/data_service/data.proto -OUT_DIR := $(PROTO_DIR) +PROTO_FILES := $(shell find $(PROTO_DIR) -name '*.proto') .PHONY: proto proto: @echo "Generating Go code from Protobuf..." protoc \ - --go_out=$(OUT_DIR) --go_opt=paths=source_relative \ - --go-grpc_out=$(OUT_DIR) --go-grpc_opt=paths=source_relative \ - $(PROTO_FILE) + --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + $(PROTO_FILES) @echo "Protobuf generation complete." diff --git a/go.mod b/go.mod index 2d75f1e..ee52e3b 100644 --- a/go.mod +++ b/go.mod @@ -1,17 +1,17 @@ module gitlab.michelsen.id/phillmichelsen/tessera -go 1.24.4 +go 1.24.5 require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 - google.golang.org/grpc v1.73.0 + google.golang.org/grpc v1.74.2 google.golang.org/protobuf v1.36.6 ) require ( - golang.org/x/net v0.41.0 // indirect - golang.org/x/sys v0.33.0 // indirect - golang.org/x/text v0.26.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect + golang.org/x/net v0.42.0 // indirect + golang.org/x/sys v0.34.0 // indirect + golang.org/x/text v0.27.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect ) diff --git a/go.sum b/go.sum index 09bdb1d..6a8987a 100644 --- a/go.sum +++ b/go.sum @@ -26,19 +26,29 @@ golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= +golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= +golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= +golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 h1:e0AIkUUhxyBKh6ssZNrAMeqhA7RKUj42346d1y02i2g= google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 h1:pFyd6EwwL2TqFf8emdthzeX+gZE1ElRq3iM8pui4KBY= google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b h1:zPKJod4w6F1+nRGDI9ubnXYhU9NSWoFAijkHkUXeTK8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok= google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc= +google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4= +google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM= google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= diff --git a/pkg/pb/data_service/data_service_streaming.pb.go b/pkg/pb/data_service/data_service.pb.go similarity index 56% rename from pkg/pb/data_service/data_service_streaming.pb.go rename to pkg/pb/data_service/data_service.pb.go index 488b834..a61b1fe 100644 --- a/pkg/pb/data_service/data_service_streaming.pb.go +++ b/pkg/pb/data_service/data_service.pb.go @@ -2,7 +2,7 @@ // versions: // protoc-gen-go v1.36.6 // protoc v6.31.1 -// source: pkg/pb/data_service/data_service_streaming.proto +// source: pkg/pb/data_service/data_service.proto package data_service @@ -21,6 +21,7 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +// Domain Models type Identifier struct { state protoimpl.MessageState `protogen:"open.v1"` Provider string `protobuf:"bytes,1,opt,name=provider,proto3" json:"provider,omitempty"` // e.g., "binance" @@ -31,7 +32,7 @@ type Identifier struct { func (x *Identifier) Reset() { *x = Identifier{} - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[0] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -43,7 +44,7 @@ func (x *Identifier) String() string { func (*Identifier) ProtoMessage() {} func (x *Identifier) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[0] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -56,7 +57,7 @@ func (x *Identifier) ProtoReflect() protoreflect.Message { // Deprecated: Use Identifier.ProtoReflect.Descriptor instead. func (*Identifier) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{0} + return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{0} } func (x *Identifier) GetProvider() string { @@ -83,7 +84,7 @@ type Message struct { func (x *Message) Reset() { *x = Message{} - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[1] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -95,7 +96,7 @@ func (x *Message) String() string { func (*Message) ProtoMessage() {} func (x *Message) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[1] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -108,7 +109,7 @@ func (x *Message) ProtoReflect() protoreflect.Message { // Deprecated: Use Message.ProtoReflect.Descriptor instead. func (*Message) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{1} + return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{1} } func (x *Message) GetIdentifier() *Identifier { @@ -125,16 +126,16 @@ func (x *Message) GetPayload() string { return "" } +// Control Requests and Responses type StartStreamRequest struct { state protoimpl.MessageState `protogen:"open.v1"` - Identifiers []*Identifier `protobuf:"bytes,1,rep,name=identifiers,proto3" json:"identifiers,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *StartStreamRequest) Reset() { *x = StartStreamRequest{} - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[2] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -146,7 +147,7 @@ func (x *StartStreamRequest) String() string { func (*StartStreamRequest) ProtoMessage() {} func (x *StartStreamRequest) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[2] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -159,14 +160,7 @@ func (x *StartStreamRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use StartStreamRequest.ProtoReflect.Descriptor instead. func (*StartStreamRequest) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{2} -} - -func (x *StartStreamRequest) GetIdentifiers() []*Identifier { - if x != nil { - return x.Identifiers - } - return nil + return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{2} } type StartStreamResponse struct { @@ -178,7 +172,7 @@ type StartStreamResponse struct { func (x *StartStreamResponse) Reset() { *x = StartStreamResponse{} - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[3] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -190,7 +184,7 @@ func (x *StartStreamResponse) String() string { func (*StartStreamResponse) ProtoMessage() {} func (x *StartStreamResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[3] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -203,7 +197,7 @@ func (x *StartStreamResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use StartStreamResponse.ProtoReflect.Descriptor instead. func (*StartStreamResponse) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{3} + return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{3} } func (x *StartStreamResponse) GetStreamUuid() string { @@ -213,7 +207,7 @@ func (x *StartStreamResponse) GetStreamUuid() string { return "" } -type UpdateStreamRequest struct { +type ConfigureStreamRequest struct { state protoimpl.MessageState `protogen:"open.v1"` StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"` Identifiers []*Identifier `protobuf:"bytes,2,rep,name=identifiers,proto3" json:"identifiers,omitempty"` @@ -221,21 +215,21 @@ type UpdateStreamRequest struct { sizeCache protoimpl.SizeCache } -func (x *UpdateStreamRequest) Reset() { - *x = UpdateStreamRequest{} - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[4] +func (x *ConfigureStreamRequest) Reset() { + *x = ConfigureStreamRequest{} + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *UpdateStreamRequest) String() string { +func (x *ConfigureStreamRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*UpdateStreamRequest) ProtoMessage() {} +func (*ConfigureStreamRequest) ProtoMessage() {} -func (x *UpdateStreamRequest) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[4] +func (x *ConfigureStreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -246,47 +240,46 @@ func (x *UpdateStreamRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use UpdateStreamRequest.ProtoReflect.Descriptor instead. -func (*UpdateStreamRequest) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{4} +// Deprecated: Use ConfigureStreamRequest.ProtoReflect.Descriptor instead. +func (*ConfigureStreamRequest) Descriptor() ([]byte, []int) { + return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{4} } -func (x *UpdateStreamRequest) GetStreamUuid() string { +func (x *ConfigureStreamRequest) GetStreamUuid() string { if x != nil { return x.StreamUuid } return "" } -func (x *UpdateStreamRequest) GetIdentifiers() []*Identifier { +func (x *ConfigureStreamRequest) GetIdentifiers() []*Identifier { if x != nil { return x.Identifiers } return nil } -type UpdateStreamResponse struct { +type ConfigureStreamResponse struct { state protoimpl.MessageState `protogen:"open.v1"` - Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } -func (x *UpdateStreamResponse) Reset() { - *x = UpdateStreamResponse{} - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[5] +func (x *ConfigureStreamResponse) Reset() { + *x = ConfigureStreamResponse{} + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *UpdateStreamResponse) String() string { +func (x *ConfigureStreamResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*UpdateStreamResponse) ProtoMessage() {} +func (*ConfigureStreamResponse) ProtoMessage() {} -func (x *UpdateStreamResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[5] +func (x *ConfigureStreamResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -297,16 +290,9 @@ func (x *UpdateStreamResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use UpdateStreamResponse.ProtoReflect.Descriptor instead. -func (*UpdateStreamResponse) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{5} -} - -func (x *UpdateStreamResponse) GetSuccess() bool { - if x != nil { - return x.Success - } - return false +// Deprecated: Use ConfigureStreamResponse.ProtoReflect.Descriptor instead. +func (*ConfigureStreamResponse) Descriptor() ([]byte, []int) { + return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{5} } type StopStreamRequest struct { @@ -318,7 +304,7 @@ type StopStreamRequest struct { func (x *StopStreamRequest) Reset() { *x = StopStreamRequest{} - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[6] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -330,7 +316,7 @@ func (x *StopStreamRequest) String() string { func (*StopStreamRequest) ProtoMessage() {} func (x *StopStreamRequest) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[6] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -343,7 +329,7 @@ func (x *StopStreamRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use StopStreamRequest.ProtoReflect.Descriptor instead. func (*StopStreamRequest) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{6} + return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{6} } func (x *StopStreamRequest) GetStreamUuid() string { @@ -355,14 +341,13 @@ func (x *StopStreamRequest) GetStreamUuid() string { type StopStreamResponse struct { state protoimpl.MessageState `protogen:"open.v1"` - Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *StopStreamResponse) Reset() { *x = StopStreamResponse{} - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[7] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -374,7 +359,7 @@ func (x *StopStreamResponse) String() string { func (*StopStreamResponse) ProtoMessage() {} func (x *StopStreamResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[7] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -387,16 +372,10 @@ func (x *StopStreamResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use StopStreamResponse.ProtoReflect.Descriptor instead. func (*StopStreamResponse) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{7} -} - -func (x *StopStreamResponse) GetSuccess() bool { - if x != nil { - return x.Success - } - return false + return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{7} } +// Stream Requests and Responses type ConnectStreamRequest struct { state protoimpl.MessageState `protogen:"open.v1"` StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"` @@ -406,7 +385,7 @@ type ConnectStreamRequest struct { func (x *ConnectStreamRequest) Reset() { *x = ConnectStreamRequest{} - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[8] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -418,7 +397,7 @@ func (x *ConnectStreamRequest) String() string { func (*ConnectStreamRequest) ProtoMessage() {} func (x *ConnectStreamRequest) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[8] + mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -431,7 +410,7 @@ func (x *ConnectStreamRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ConnectStreamRequest.ProtoReflect.Descriptor instead. func (*ConnectStreamRequest) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{8} + return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{8} } func (x *ConnectStreamRequest) GetStreamUuid() string { @@ -441,11 +420,11 @@ func (x *ConnectStreamRequest) GetStreamUuid() string { return "" } -var File_pkg_pb_data_service_data_service_streaming_proto protoreflect.FileDescriptor +var File_pkg_pb_data_service_data_service_proto protoreflect.FileDescriptor -const file_pkg_pb_data_service_data_service_streaming_proto_rawDesc = "" + +const file_pkg_pb_data_service_data_service_proto_rawDesc = "" + "\n" + - "0pkg/pb/data_service/data_service_streaming.proto\x12\fdata_service\"B\n" + + "&pkg/pb/data_service/data_service.proto\x12\fdata_service\"B\n" + "\n" + "Identifier\x12\x1a\n" + "\bprovider\x18\x01 \x01(\tR\bprovider\x12\x18\n" + @@ -454,96 +433,93 @@ const file_pkg_pb_data_service_data_service_streaming_proto_rawDesc = "" + "\n" + "identifier\x18\x01 \x01(\v2\x18.data_service.IdentifierR\n" + "identifier\x12\x18\n" + - "\apayload\x18\x02 \x01(\tR\apayload\"P\n" + - "\x12StartStreamRequest\x12:\n" + - "\videntifiers\x18\x01 \x03(\v2\x18.data_service.IdentifierR\videntifiers\"6\n" + + "\apayload\x18\x02 \x01(\tR\apayload\"\x14\n" + + "\x12StartStreamRequest\"6\n" + "\x13StartStreamResponse\x12\x1f\n" + "\vstream_uuid\x18\x01 \x01(\tR\n" + - "streamUuid\"r\n" + - "\x13UpdateStreamRequest\x12\x1f\n" + + "streamUuid\"u\n" + + "\x16ConfigureStreamRequest\x12\x1f\n" + "\vstream_uuid\x18\x01 \x01(\tR\n" + "streamUuid\x12:\n" + - "\videntifiers\x18\x02 \x03(\v2\x18.data_service.IdentifierR\videntifiers\"0\n" + - "\x14UpdateStreamResponse\x12\x18\n" + - "\asuccess\x18\x01 \x01(\bR\asuccess\"4\n" + + "\videntifiers\x18\x02 \x03(\v2\x18.data_service.IdentifierR\videntifiers\"\x19\n" + + "\x17ConfigureStreamResponse\"4\n" + "\x11StopStreamRequest\x12\x1f\n" + "\vstream_uuid\x18\x01 \x01(\tR\n" + - "streamUuid\".\n" + - "\x12StopStreamResponse\x12\x18\n" + - "\asuccess\x18\x01 \x01(\bR\asuccess\"7\n" + + "streamUuid\"\x14\n" + + "\x12StopStreamResponse\"7\n" + "\x14ConnectStreamRequest\x12\x1f\n" + "\vstream_uuid\x18\x01 \x01(\tR\n" + - "streamUuid2\xe0\x02\n" + - "\x14DataServiceStreaming\x12R\n" + - "\vStartStream\x12 .data_service.StartStreamRequest\x1a!.data_service.StartStreamResponse\x12U\n" + - "\fUpdateStream\x12!.data_service.UpdateStreamRequest\x1a\".data_service.UpdateStreamResponse\x12O\n" + + "streamUuid2\x99\x02\n" + + "\x12DataServiceControl\x12R\n" + + "\vStartStream\x12 .data_service.StartStreamRequest\x1a!.data_service.StartStreamResponse\x12O\n" + "\n" + - "StopStream\x12\x1f.data_service.StopStreamRequest\x1a .data_service.StopStreamResponse\x12L\n" + + "StopStream\x12\x1f.data_service.StopStreamRequest\x1a .data_service.StopStreamResponse\x12^\n" + + "\x0fConfigureStream\x12$.data_service.ConfigureStreamRequest\x1a%.data_service.ConfigureStreamResponse2d\n" + + "\x14DataServiceStreaming\x12L\n" + "\rConnectStream\x12\".data_service.ConnectStreamRequest\x1a\x15.data_service.Message0\x01BMZKgitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service;data_serviceb\x06proto3" var ( - file_pkg_pb_data_service_data_service_streaming_proto_rawDescOnce sync.Once - file_pkg_pb_data_service_data_service_streaming_proto_rawDescData []byte + file_pkg_pb_data_service_data_service_proto_rawDescOnce sync.Once + file_pkg_pb_data_service_data_service_proto_rawDescData []byte ) -func file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP() []byte { - file_pkg_pb_data_service_data_service_streaming_proto_rawDescOnce.Do(func() { - file_pkg_pb_data_service_data_service_streaming_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_pkg_pb_data_service_data_service_streaming_proto_rawDesc), len(file_pkg_pb_data_service_data_service_streaming_proto_rawDesc))) +func file_pkg_pb_data_service_data_service_proto_rawDescGZIP() []byte { + file_pkg_pb_data_service_data_service_proto_rawDescOnce.Do(func() { + file_pkg_pb_data_service_data_service_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_pkg_pb_data_service_data_service_proto_rawDesc), len(file_pkg_pb_data_service_data_service_proto_rawDesc))) }) - return file_pkg_pb_data_service_data_service_streaming_proto_rawDescData + return file_pkg_pb_data_service_data_service_proto_rawDescData } -var file_pkg_pb_data_service_data_service_streaming_proto_msgTypes = make([]protoimpl.MessageInfo, 9) -var file_pkg_pb_data_service_data_service_streaming_proto_goTypes = []any{ - (*Identifier)(nil), // 0: data_service.Identifier - (*Message)(nil), // 1: data_service.Message - (*StartStreamRequest)(nil), // 2: data_service.StartStreamRequest - (*StartStreamResponse)(nil), // 3: data_service.StartStreamResponse - (*UpdateStreamRequest)(nil), // 4: data_service.UpdateStreamRequest - (*UpdateStreamResponse)(nil), // 5: data_service.UpdateStreamResponse - (*StopStreamRequest)(nil), // 6: data_service.StopStreamRequest - (*StopStreamResponse)(nil), // 7: data_service.StopStreamResponse - (*ConnectStreamRequest)(nil), // 8: data_service.ConnectStreamRequest +var file_pkg_pb_data_service_data_service_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_pkg_pb_data_service_data_service_proto_goTypes = []any{ + (*Identifier)(nil), // 0: data_service.Identifier + (*Message)(nil), // 1: data_service.Message + (*StartStreamRequest)(nil), // 2: data_service.StartStreamRequest + (*StartStreamResponse)(nil), // 3: data_service.StartStreamResponse + (*ConfigureStreamRequest)(nil), // 4: data_service.ConfigureStreamRequest + (*ConfigureStreamResponse)(nil), // 5: data_service.ConfigureStreamResponse + (*StopStreamRequest)(nil), // 6: data_service.StopStreamRequest + (*StopStreamResponse)(nil), // 7: data_service.StopStreamResponse + (*ConnectStreamRequest)(nil), // 8: data_service.ConnectStreamRequest } -var file_pkg_pb_data_service_data_service_streaming_proto_depIdxs = []int32{ +var file_pkg_pb_data_service_data_service_proto_depIdxs = []int32{ 0, // 0: data_service.Message.identifier:type_name -> data_service.Identifier - 0, // 1: data_service.StartStreamRequest.identifiers:type_name -> data_service.Identifier - 0, // 2: data_service.UpdateStreamRequest.identifiers:type_name -> data_service.Identifier - 2, // 3: data_service.DataServiceStreaming.StartStream:input_type -> data_service.StartStreamRequest - 4, // 4: data_service.DataServiceStreaming.UpdateStream:input_type -> data_service.UpdateStreamRequest - 6, // 5: data_service.DataServiceStreaming.StopStream:input_type -> data_service.StopStreamRequest - 8, // 6: data_service.DataServiceStreaming.ConnectStream:input_type -> data_service.ConnectStreamRequest - 3, // 7: data_service.DataServiceStreaming.StartStream:output_type -> data_service.StartStreamResponse - 5, // 8: data_service.DataServiceStreaming.UpdateStream:output_type -> data_service.UpdateStreamResponse - 7, // 9: data_service.DataServiceStreaming.StopStream:output_type -> data_service.StopStreamResponse - 1, // 10: data_service.DataServiceStreaming.ConnectStream:output_type -> data_service.Message - 7, // [7:11] is the sub-list for method output_type - 3, // [3:7] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 0, // 1: data_service.ConfigureStreamRequest.identifiers:type_name -> data_service.Identifier + 2, // 2: data_service.DataServiceControl.StartStream:input_type -> data_service.StartStreamRequest + 6, // 3: data_service.DataServiceControl.StopStream:input_type -> data_service.StopStreamRequest + 4, // 4: data_service.DataServiceControl.ConfigureStream:input_type -> data_service.ConfigureStreamRequest + 8, // 5: data_service.DataServiceStreaming.ConnectStream:input_type -> data_service.ConnectStreamRequest + 3, // 6: data_service.DataServiceControl.StartStream:output_type -> data_service.StartStreamResponse + 7, // 7: data_service.DataServiceControl.StopStream:output_type -> data_service.StopStreamResponse + 5, // 8: data_service.DataServiceControl.ConfigureStream:output_type -> data_service.ConfigureStreamResponse + 1, // 9: data_service.DataServiceStreaming.ConnectStream:output_type -> data_service.Message + 6, // [6:10] is the sub-list for method output_type + 2, // [2:6] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } -func init() { file_pkg_pb_data_service_data_service_streaming_proto_init() } -func file_pkg_pb_data_service_data_service_streaming_proto_init() { - if File_pkg_pb_data_service_data_service_streaming_proto != nil { +func init() { file_pkg_pb_data_service_data_service_proto_init() } +func file_pkg_pb_data_service_data_service_proto_init() { + if File_pkg_pb_data_service_data_service_proto != nil { return } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_pkg_pb_data_service_data_service_streaming_proto_rawDesc), len(file_pkg_pb_data_service_data_service_streaming_proto_rawDesc)), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_pkg_pb_data_service_data_service_proto_rawDesc), len(file_pkg_pb_data_service_data_service_proto_rawDesc)), NumEnums: 0, NumMessages: 9, NumExtensions: 0, - NumServices: 1, + NumServices: 2, }, - GoTypes: file_pkg_pb_data_service_data_service_streaming_proto_goTypes, - DependencyIndexes: file_pkg_pb_data_service_data_service_streaming_proto_depIdxs, - MessageInfos: file_pkg_pb_data_service_data_service_streaming_proto_msgTypes, + GoTypes: file_pkg_pb_data_service_data_service_proto_goTypes, + DependencyIndexes: file_pkg_pb_data_service_data_service_proto_depIdxs, + MessageInfos: file_pkg_pb_data_service_data_service_proto_msgTypes, }.Build() - File_pkg_pb_data_service_data_service_streaming_proto = out.File - file_pkg_pb_data_service_data_service_streaming_proto_goTypes = nil - file_pkg_pb_data_service_data_service_streaming_proto_depIdxs = nil + File_pkg_pb_data_service_data_service_proto = out.File + file_pkg_pb_data_service_data_service_proto_goTypes = nil + file_pkg_pb_data_service_data_service_proto_depIdxs = nil } diff --git a/pkg/pb/data_service/data_service_streaming.proto b/pkg/pb/data_service/data_service.proto similarity index 72% rename from pkg/pb/data_service/data_service_streaming.proto rename to pkg/pb/data_service/data_service.proto index 209cbca..44219e5 100644 --- a/pkg/pb/data_service/data_service_streaming.proto +++ b/pkg/pb/data_service/data_service.proto @@ -4,14 +4,17 @@ package data_service; option go_package = "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service;data_service"; -service DataServiceStreaming { +service DataServiceControl { rpc StartStream(StartStreamRequest) returns (StartStreamResponse); - rpc UpdateStream(UpdateStreamRequest) returns (UpdateStreamResponse); rpc StopStream(StopStreamRequest) returns (StopStreamResponse); + rpc ConfigureStream(ConfigureStreamRequest) returns (ConfigureStreamResponse); +} +service DataServiceStreaming { rpc ConnectStream(ConnectStreamRequest) returns (stream Message); } +// Domain Models message Identifier { string provider = 1; // e.g., "binance" string subject = 2; // e.g., "BTCUSDT" @@ -22,32 +25,27 @@ message Message { string payload = 2; // JSON-encoded data } -message StartStreamRequest { - repeated Identifier identifiers = 1; -} +// Control Requests and Responses +message StartStreamRequest {} message StartStreamResponse { string stream_uuid = 1; } -message UpdateStreamRequest { +message ConfigureStreamRequest { string stream_uuid = 1; repeated Identifier identifiers = 2; } -message UpdateStreamResponse { - bool success = 1; -} +message ConfigureStreamResponse {} message StopStreamRequest { string stream_uuid = 1; } -message StopStreamResponse { - bool success = 1; -} - +message StopStreamResponse {} +// Stream Requests and Responses message ConnectStreamRequest { string stream_uuid = 1; } diff --git a/pkg/pb/data_service/data_service_streaming_grpc.pb.go b/pkg/pb/data_service/data_service_grpc.pb.go similarity index 54% rename from pkg/pb/data_service/data_service_streaming_grpc.pb.go rename to pkg/pb/data_service/data_service_grpc.pb.go index b600064..bb4181c 100644 --- a/pkg/pb/data_service/data_service_streaming_grpc.pb.go +++ b/pkg/pb/data_service/data_service_grpc.pb.go @@ -2,7 +2,7 @@ // versions: // - protoc-gen-go-grpc v1.5.1 // - protoc v6.31.1 -// source: pkg/pb/data_service/data_service_streaming.proto +// source: pkg/pb/data_service/data_service.proto package data_service @@ -19,9 +19,184 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - DataServiceStreaming_StartStream_FullMethodName = "/data_service.DataServiceStreaming/StartStream" - DataServiceStreaming_UpdateStream_FullMethodName = "/data_service.DataServiceStreaming/UpdateStream" - DataServiceStreaming_StopStream_FullMethodName = "/data_service.DataServiceStreaming/StopStream" + DataServiceControl_StartStream_FullMethodName = "/data_service.DataServiceControl/StartStream" + DataServiceControl_StopStream_FullMethodName = "/data_service.DataServiceControl/StopStream" + DataServiceControl_ConfigureStream_FullMethodName = "/data_service.DataServiceControl/ConfigureStream" +) + +// DataServiceControlClient is the client API for DataServiceControl service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type DataServiceControlClient interface { + StartStream(ctx context.Context, in *StartStreamRequest, opts ...grpc.CallOption) (*StartStreamResponse, error) + StopStream(ctx context.Context, in *StopStreamRequest, opts ...grpc.CallOption) (*StopStreamResponse, error) + ConfigureStream(ctx context.Context, in *ConfigureStreamRequest, opts ...grpc.CallOption) (*ConfigureStreamResponse, error) +} + +type dataServiceControlClient struct { + cc grpc.ClientConnInterface +} + +func NewDataServiceControlClient(cc grpc.ClientConnInterface) DataServiceControlClient { + return &dataServiceControlClient{cc} +} + +func (c *dataServiceControlClient) StartStream(ctx context.Context, in *StartStreamRequest, opts ...grpc.CallOption) (*StartStreamResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(StartStreamResponse) + err := c.cc.Invoke(ctx, DataServiceControl_StartStream_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dataServiceControlClient) StopStream(ctx context.Context, in *StopStreamRequest, opts ...grpc.CallOption) (*StopStreamResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(StopStreamResponse) + err := c.cc.Invoke(ctx, DataServiceControl_StopStream_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dataServiceControlClient) ConfigureStream(ctx context.Context, in *ConfigureStreamRequest, opts ...grpc.CallOption) (*ConfigureStreamResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ConfigureStreamResponse) + err := c.cc.Invoke(ctx, DataServiceControl_ConfigureStream_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// DataServiceControlServer is the server API for DataServiceControl service. +// All implementations must embed UnimplementedDataServiceControlServer +// for forward compatibility. +type DataServiceControlServer interface { + StartStream(context.Context, *StartStreamRequest) (*StartStreamResponse, error) + StopStream(context.Context, *StopStreamRequest) (*StopStreamResponse, error) + ConfigureStream(context.Context, *ConfigureStreamRequest) (*ConfigureStreamResponse, error) + mustEmbedUnimplementedDataServiceControlServer() +} + +// UnimplementedDataServiceControlServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedDataServiceControlServer struct{} + +func (UnimplementedDataServiceControlServer) StartStream(context.Context, *StartStreamRequest) (*StartStreamResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method StartStream not implemented") +} +func (UnimplementedDataServiceControlServer) StopStream(context.Context, *StopStreamRequest) (*StopStreamResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method StopStream not implemented") +} +func (UnimplementedDataServiceControlServer) ConfigureStream(context.Context, *ConfigureStreamRequest) (*ConfigureStreamResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ConfigureStream not implemented") +} +func (UnimplementedDataServiceControlServer) mustEmbedUnimplementedDataServiceControlServer() {} +func (UnimplementedDataServiceControlServer) testEmbeddedByValue() {} + +// UnsafeDataServiceControlServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to DataServiceControlServer will +// result in compilation errors. +type UnsafeDataServiceControlServer interface { + mustEmbedUnimplementedDataServiceControlServer() +} + +func RegisterDataServiceControlServer(s grpc.ServiceRegistrar, srv DataServiceControlServer) { + // If the following call pancis, it indicates UnimplementedDataServiceControlServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&DataServiceControl_ServiceDesc, srv) +} + +func _DataServiceControl_StartStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StartStreamRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DataServiceControlServer).StartStream(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: DataServiceControl_StartStream_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DataServiceControlServer).StartStream(ctx, req.(*StartStreamRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _DataServiceControl_StopStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StopStreamRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DataServiceControlServer).StopStream(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: DataServiceControl_StopStream_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DataServiceControlServer).StopStream(ctx, req.(*StopStreamRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _DataServiceControl_ConfigureStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ConfigureStreamRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DataServiceControlServer).ConfigureStream(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: DataServiceControl_ConfigureStream_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DataServiceControlServer).ConfigureStream(ctx, req.(*ConfigureStreamRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// DataServiceControl_ServiceDesc is the grpc.ServiceDesc for DataServiceControl service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var DataServiceControl_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "data_service.DataServiceControl", + HandlerType: (*DataServiceControlServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "StartStream", + Handler: _DataServiceControl_StartStream_Handler, + }, + { + MethodName: "StopStream", + Handler: _DataServiceControl_StopStream_Handler, + }, + { + MethodName: "ConfigureStream", + Handler: _DataServiceControl_ConfigureStream_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "pkg/pb/data_service/data_service.proto", +} + +const ( DataServiceStreaming_ConnectStream_FullMethodName = "/data_service.DataServiceStreaming/ConnectStream" ) @@ -29,9 +204,6 @@ const ( // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type DataServiceStreamingClient interface { - StartStream(ctx context.Context, in *StartStreamRequest, opts ...grpc.CallOption) (*StartStreamResponse, error) - UpdateStream(ctx context.Context, in *UpdateStreamRequest, opts ...grpc.CallOption) (*UpdateStreamResponse, error) - StopStream(ctx context.Context, in *StopStreamRequest, opts ...grpc.CallOption) (*StopStreamResponse, error) ConnectStream(ctx context.Context, in *ConnectStreamRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Message], error) } @@ -43,36 +215,6 @@ func NewDataServiceStreamingClient(cc grpc.ClientConnInterface) DataServiceStrea return &dataServiceStreamingClient{cc} } -func (c *dataServiceStreamingClient) StartStream(ctx context.Context, in *StartStreamRequest, opts ...grpc.CallOption) (*StartStreamResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(StartStreamResponse) - err := c.cc.Invoke(ctx, DataServiceStreaming_StartStream_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *dataServiceStreamingClient) UpdateStream(ctx context.Context, in *UpdateStreamRequest, opts ...grpc.CallOption) (*UpdateStreamResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(UpdateStreamResponse) - err := c.cc.Invoke(ctx, DataServiceStreaming_UpdateStream_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *dataServiceStreamingClient) StopStream(ctx context.Context, in *StopStreamRequest, opts ...grpc.CallOption) (*StopStreamResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(StopStreamResponse) - err := c.cc.Invoke(ctx, DataServiceStreaming_StopStream_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *dataServiceStreamingClient) ConnectStream(ctx context.Context, in *ConnectStreamRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Message], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &DataServiceStreaming_ServiceDesc.Streams[0], DataServiceStreaming_ConnectStream_FullMethodName, cOpts...) @@ -96,9 +238,6 @@ type DataServiceStreaming_ConnectStreamClient = grpc.ServerStreamingClient[Messa // All implementations must embed UnimplementedDataServiceStreamingServer // for forward compatibility. type DataServiceStreamingServer interface { - StartStream(context.Context, *StartStreamRequest) (*StartStreamResponse, error) - UpdateStream(context.Context, *UpdateStreamRequest) (*UpdateStreamResponse, error) - StopStream(context.Context, *StopStreamRequest) (*StopStreamResponse, error) ConnectStream(*ConnectStreamRequest, grpc.ServerStreamingServer[Message]) error mustEmbedUnimplementedDataServiceStreamingServer() } @@ -110,15 +249,6 @@ type DataServiceStreamingServer interface { // pointer dereference when methods are called. type UnimplementedDataServiceStreamingServer struct{} -func (UnimplementedDataServiceStreamingServer) StartStream(context.Context, *StartStreamRequest) (*StartStreamResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method StartStream not implemented") -} -func (UnimplementedDataServiceStreamingServer) UpdateStream(context.Context, *UpdateStreamRequest) (*UpdateStreamResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method UpdateStream not implemented") -} -func (UnimplementedDataServiceStreamingServer) StopStream(context.Context, *StopStreamRequest) (*StopStreamResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method StopStream not implemented") -} func (UnimplementedDataServiceStreamingServer) ConnectStream(*ConnectStreamRequest, grpc.ServerStreamingServer[Message]) error { return status.Errorf(codes.Unimplemented, "method ConnectStream not implemented") } @@ -143,60 +273,6 @@ func RegisterDataServiceStreamingServer(s grpc.ServiceRegistrar, srv DataService s.RegisterService(&DataServiceStreaming_ServiceDesc, srv) } -func _DataServiceStreaming_StartStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(StartStreamRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(DataServiceStreamingServer).StartStream(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: DataServiceStreaming_StartStream_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(DataServiceStreamingServer).StartStream(ctx, req.(*StartStreamRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _DataServiceStreaming_UpdateStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(UpdateStreamRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(DataServiceStreamingServer).UpdateStream(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: DataServiceStreaming_UpdateStream_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(DataServiceStreamingServer).UpdateStream(ctx, req.(*UpdateStreamRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _DataServiceStreaming_StopStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(StopStreamRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(DataServiceStreamingServer).StopStream(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: DataServiceStreaming_StopStream_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(DataServiceStreamingServer).StopStream(ctx, req.(*StopStreamRequest)) - } - return interceptor(ctx, in, info, handler) -} - func _DataServiceStreaming_ConnectStream_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(ConnectStreamRequest) if err := stream.RecvMsg(m); err != nil { @@ -214,20 +290,7 @@ type DataServiceStreaming_ConnectStreamServer = grpc.ServerStreamingServer[Messa var DataServiceStreaming_ServiceDesc = grpc.ServiceDesc{ ServiceName: "data_service.DataServiceStreaming", HandlerType: (*DataServiceStreamingServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "StartStream", - Handler: _DataServiceStreaming_StartStream_Handler, - }, - { - MethodName: "UpdateStream", - Handler: _DataServiceStreaming_UpdateStream_Handler, - }, - { - MethodName: "StopStream", - Handler: _DataServiceStreaming_StopStream_Handler, - }, - }, + Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { StreamName: "ConnectStream", @@ -235,5 +298,5 @@ var DataServiceStreaming_ServiceDesc = grpc.ServiceDesc{ ServerStreams: true, }, }, - Metadata: "pkg/pb/data_service/data_service_streaming.proto", + Metadata: "pkg/pb/data_service/data_service.proto", } diff --git a/services/data_service/cmd/main.go b/services/data_service/cmd/main.go index 134601a..0b76daf 100644 --- a/services/data_service/cmd/main.go +++ b/services/data_service/cmd/main.go @@ -1,6 +1,10 @@ package main import ( + "fmt" + "log" + "net" + pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider/binance" @@ -8,41 +12,55 @@ import ( "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/server" "google.golang.org/grpc" "google.golang.org/grpc/reflection" - "log" - "net" ) func main() { + fmt.Println("Starting Data Service...") // Setup r := router.NewRouter() m := manager.NewManager(r) binanceFutures := binance.NewFuturesWebsocket() m.AddProvider("binance_futures_websocket", binanceFutures) - // gRPC Server - grpcServer := grpc.NewServer() + // gRPC Control Server + grpcControlServer := grpc.NewServer() go func() { - pb.RegisterDataServiceStreamingServer(grpcServer, server.NewGRPCStreamingServer(m)) - reflection.Register(grpcServer) + pb.RegisterDataServiceControlServer(grpcControlServer, server.NewGRPCControlServer(m)) + reflection.Register(grpcControlServer) grpcLis, err := net.Listen("tcp", ":50051") + if err != nil { + log.Fatalf("Failed to listen for gRPC control: %v", err) + } + log.Println("gRPC control server listening on :50051") + if err := grpcControlServer.Serve(grpcLis); err != nil { + log.Fatalf("Failed to serve gRPC control: %v", err) + } + }() + + // gRPC Streaming Server + grpcStreamingServer := grpc.NewServer() + go func() { + pb.RegisterDataServiceStreamingServer(grpcStreamingServer, server.NewGRPCStreamingServer(m)) + reflection.Register(grpcStreamingServer) + grpcLis, err := net.Listen("tcp", ":50052") if err != nil { log.Fatalf("Failed to listen for gRPC: %v", err) } - log.Println("gRPC server listening on :50051") - if err := grpcServer.Serve(grpcLis); err != nil { + log.Println("gRPC streaming server listening on :50052") + if err := grpcStreamingServer.Serve(grpcLis); err != nil { log.Fatalf("Failed to serve gRPC: %v", err) } }() - // Socket Server - socketServer := server.NewSocketStreamingServer(m) + // Socket Streaming Server + socketStreamingServer := server.NewSocketStreamingServer(m) go func() { socketLis, err := net.Listen("tcp", ":6000") if err != nil { log.Fatalf("Failed to listen for socket: %v", err) } log.Println("Socket server listening on :6000") - if err := socketServer.Serve(socketLis); err != nil { + if err := socketStreamingServer.Serve(socketLis); err != nil { log.Fatalf("Socket server error: %v", err) } }() diff --git a/services/data_service/internal/manager/manager.go b/services/data_service/internal/manager/manager.go index 9892644..c350087 100644 --- a/services/data_service/internal/manager/manager.go +++ b/services/data_service/internal/manager/manager.go @@ -2,12 +2,13 @@ package manager import ( "fmt" + "sync" + "time" + "github.com/google/uuid" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router" - "sync" - "time" ) type Manager struct { @@ -38,59 +39,118 @@ func NewManager(router *router.Router) *Manager { } } -func (m *Manager) StartStream(ids []domain.Identifier) (uuid.UUID, error) { +func (m *Manager) StartStream() (uuid.UUID, error) { m.mu.Lock() defer m.mu.Unlock() - // Validate Identifiers, prevents unnecessary calls to providers - for _, id := range ids { - if id.Provider == "" || id.Subject == "" { - return uuid.Nil, fmt.Errorf("invalid identifier: %v", id) - } - if _, ok := m.providers[id.Provider]; !ok { - return uuid.Nil, fmt.Errorf("unknown provider: %s", id.Provider) - } - if !m.providers[id.Provider].IsValidSubject(id.Subject, false) { - return uuid.Nil, fmt.Errorf("invalid subject for provider %s: %s", id.Provider, id.Subject) - } - } - - // Provision provider streams - for _, id := range ids { - if _, ok := m.providerStreams[id]; ok { - continue // Skip if requested stream is already being provided - } - - ch := make(chan domain.Message, 64) - if err := m.providers[id.Provider].RequestStream(id.Subject, ch); err != nil { - return uuid.Nil, fmt.Errorf("provision %v: %w", id, err) - } - m.providerStreams[id] = ch - - // Start routine to route the provider stream to the router's input channel - go func(ch chan domain.Message) { - for msg := range ch { - m.router.IncomingChannel() <- msg - } - }(ch) - } - streamID := uuid.New() - m.clientStreams[streamID] = &ClientStream{ UUID: streamID, - Identifiers: ids, - OutChannel: nil, // Initially nil, will be set when connected + Identifiers: nil, // start empty + OutChannel: nil, // not yet connected Timer: time.AfterFunc(1*time.Minute, func() { fmt.Printf("stream %s expired due to inactivity\n", streamID) - m.StopStream(streamID) + err := m.StopStream(streamID) + if err != nil { + fmt.Printf("failed to stop stream after timeout: %v\n", err) + } }), } return streamID, nil } -func (m *Manager) StopStream(streamID uuid.UUID) { +func (m *Manager) ConfigureStream(streamID uuid.UUID, newIds []domain.Identifier) error { + m.mu.Lock() + defer m.mu.Unlock() + + stream, ok := m.clientStreams[streamID] + if !ok { + return fmt.Errorf("stream not found: %s", streamID) + } + + // Validate new identifiers. + for _, id := range newIds { + if id.Provider == "" || id.Subject == "" { + return fmt.Errorf("empty identifier: %v", id) + } + prov, exists := m.providers[id.Provider] + if !exists { + return fmt.Errorf("unknown provider: %s", id.Provider) + } + if !prov.IsValidSubject(id.Subject, false) { + return fmt.Errorf("invalid subject %q for provider %s", id.Subject, id.Provider) + } + } + + // Generate old and new sets of identifiers + oldSet := make(map[domain.Identifier]struct{}, len(stream.Identifiers)) + for _, id := range stream.Identifiers { + oldSet[id] = struct{}{} + } + newSet := make(map[domain.Identifier]struct{}, len(newIds)) + for _, id := range newIds { + newSet[id] = struct{}{} + } + + // Add identifiers that are in newIds but not in oldSet + for _, id := range newIds { + if _, seen := oldSet[id]; !seen { + // Provision the stream from the provider if needed + if _, ok := m.providerStreams[id]; !ok { + ch := make(chan domain.Message, 64) + if err := m.providers[id.Provider].RequestStream(id.Subject, ch); err != nil { + return fmt.Errorf("provision %v: %w", id, err) + } + m.providerStreams[id] = ch + + incomingChannel := m.router.IncomingChannel() + go func(c chan domain.Message) { + for msg := range c { + incomingChannel <- msg + } + }(ch) + } + + // Register the new identifier with the router, only if there's an active output channel (meaning the stream is connected) + if stream.OutChannel != nil { + m.router.RegisterRoute(id, stream.OutChannel) + } + } + } + + // Remove identifiers that are in oldSet but not in newSet + for _, oldId := range stream.Identifiers { + if _, keep := newSet[oldId]; !keep { + // Deregister the identifier from the router, only if there's an active output channel (meaning the stream is connected) + if stream.OutChannel != nil { + m.router.DeregisterRoute(oldId, stream.OutChannel) + } + } + } + + // Set the new identifiers for the stream + stream.Identifiers = newIds + + // Clean up provider streams that are no longer used + used := make(map[domain.Identifier]bool) + for _, cs := range m.clientStreams { + for _, id := range cs.Identifiers { + used[id] = true + } + } + for id, ch := range m.providerStreams { + if !used[id] { + m.providers[id.Provider].CancelStream(id.Subject) + close(ch) + delete(m.providerStreams, id) + } + } + + return nil +} + +func (m *Manager) StopStream(streamID uuid.UUID) error { m.DisconnectStream(streamID) m.mu.Lock() @@ -98,7 +158,7 @@ func (m *Manager) StopStream(streamID uuid.UUID) { stream, ok := m.clientStreams[streamID] if !ok { - return // Stream not found + return fmt.Errorf("stream not found: %s", streamID) } stream.Timer.Stop() @@ -121,6 +181,8 @@ func (m *Manager) StopStream(streamID uuid.UUID) { delete(m.providerStreams, id) } } + + return nil } func (m *Manager) ConnectStream(streamID uuid.UUID) (<-chan domain.Message, error) { @@ -172,7 +234,10 @@ func (m *Manager) DisconnectStream(streamID uuid.UUID) { // Set up the expiry timer stream.Timer = time.AfterFunc(1*time.Minute, func() { fmt.Printf("stream %s expired due to inactivity\n", streamID) - m.StopStream(streamID) + err := m.StopStream(streamID) + if err != nil { + fmt.Printf("failed to stop stream after disconnect: %v\n", err) + } }) } diff --git a/services/data_service/internal/provider/binance/futures_websocket.go b/services/data_service/internal/provider/binance/futures_websocket.go index f6683da..3461e59 100644 --- a/services/data_service/internal/provider/binance/futures_websocket.go +++ b/services/data_service/internal/provider/binance/futures_websocket.go @@ -99,7 +99,7 @@ func (b *FuturesWebsocket) IsStreamActive(subject string) bool { return ok } -func (b *FuturesWebsocket) Fetch(subject string) (domain.Message, error) { +func (b *FuturesWebsocket) Fetch(_ string) (domain.Message, error) { return domain.Message{}, fmt.Errorf("not supported: websocket provider does not implement fetch") } diff --git a/services/data_service/internal/router/router.go b/services/data_service/internal/router/router.go index 29820fc..80b2111 100644 --- a/services/data_service/internal/router/router.go +++ b/services/data_service/internal/router/router.go @@ -2,9 +2,9 @@ package router import ( "fmt" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" "sync" - "time" + + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" ) type Router struct { @@ -15,7 +15,7 @@ type Router struct { func NewRouter() *Router { return &Router{ - incoming: make(chan domain.Message, 64), // Buffered channel for incoming messages + incoming: make(chan domain.Message, 512), // Buffered channel for incoming messages routes: make(map[domain.Identifier][]chan<- domain.Message), } } @@ -26,7 +26,6 @@ func (r *Router) IncomingChannel() chan<- domain.Message { func (r *Router) Run() { for msg := range r.incoming { - startTime := time.Now() r.mu.RLock() channels := r.routes[msg.Identifier] @@ -38,7 +37,6 @@ func (r *Router) Run() { } } r.mu.RUnlock() - fmt.Printf("Message routed to %d channels in %v\n", len(channels), time.Since(startTime)) } } diff --git a/services/data_service/internal/server/gprc_control_server.go b/services/data_service/internal/server/gprc_control_server.go new file mode 100644 index 0000000..5dd8dd6 --- /dev/null +++ b/services/data_service/internal/server/gprc_control_server.go @@ -0,0 +1,69 @@ +package server + +import ( + "context" + "fmt" + + "github.com/google/uuid" + pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type GRPCControlServer struct { + pb.UnimplementedDataServiceControlServer + manager *manager.Manager +} + +func NewGRPCControlServer(m *manager.Manager) *GRPCControlServer { + return &GRPCControlServer{ + manager: m, + } +} + +func (s *GRPCControlServer) StartStream(_ context.Context, _ *pb.StartStreamRequest) (*pb.StartStreamResponse, error) { + streamID, err := s.manager.StartStream() + if err != nil { + return nil, fmt.Errorf("failed to start stream: %w", err) + } + + return &pb.StartStreamResponse{StreamUuid: streamID.String()}, nil +} + +func (s *GRPCControlServer) ConfigureStream(_ context.Context, req *pb.ConfigureStreamRequest) (*pb.ConfigureStreamResponse, error) { + streamID, err := uuid.Parse(req.StreamUuid) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid stream_uuid %q: %v", req.StreamUuid, err) + } + + // Transform identifiers from protobuf to domain format + var ids []domain.Identifier + for _, i := range req.Identifiers { + ids = append(ids, domain.Identifier{ + Provider: i.Provider, + Subject: i.Subject, + }) + } + + if err := s.manager.ConfigureStream(streamID, ids); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "configure failed: %v", err) + } + + return &pb.ConfigureStreamResponse{}, nil +} + +func (s *GRPCControlServer) StopStream(_ context.Context, req *pb.StopStreamRequest) (*pb.StopStreamResponse, error) { + streamID, err := uuid.Parse(req.StreamUuid) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid stream_uuid %q: %v", req.StreamUuid, err) + } + + err = s.manager.StopStream(streamID) // Should only error if the stream doesn't exist + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to stop stream: %v", err) + } + + return &pb.StopStreamResponse{}, nil +} diff --git a/services/data_service/internal/server/grpc_streaming_server.go b/services/data_service/internal/server/grpc_streaming_server.go index 58fa415..416606d 100644 --- a/services/data_service/internal/server/grpc_streaming_server.go +++ b/services/data_service/internal/server/grpc_streaming_server.go @@ -1,11 +1,10 @@ package server import ( - "context" "fmt" + "github.com/google/uuid" pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" ) @@ -20,23 +19,6 @@ func NewGRPCStreamingServer(m *manager.Manager) *GRPCStreamingServer { } } -func (s *GRPCStreamingServer) StartStream(ctx context.Context, req *pb.StartStreamRequest) (*pb.StartStreamResponse, error) { - var ids []domain.Identifier - for _, id := range req.Identifiers { - ids = append(ids, domain.Identifier{ - Provider: id.Provider, - Subject: id.Subject, - }) - } - - streamID, err := s.manager.StartStream(ids) - if err != nil { - return nil, fmt.Errorf("failed to start stream: %w", err) - } - - return &pb.StartStreamResponse{StreamUuid: streamID.String()}, nil -} - func (s *GRPCStreamingServer) ConnectStream(req *pb.ConnectStreamRequest, stream pb.DataServiceStreaming_ConnectStreamServer) error { streamUUID, err := uuid.Parse(req.StreamUuid) if err != nil { @@ -63,7 +45,7 @@ func (s *GRPCStreamingServer) ConnectStream(req *pb.ConnectStreamRequest, stream Provider: msg.Identifier.Provider, Subject: msg.Identifier.Subject, }, - Payload: fmt.Sprintf("%s", msg.Payload), + Payload: fmt.Sprintf("%v", msg.Payload), }) if err != nil { diff --git a/services/data_service/internal/server/socket_streaming_server.go b/services/data_service/internal/server/socket_streaming_server.go index 656c089..fd166f8 100644 --- a/services/data_service/internal/server/socket_streaming_server.go +++ b/services/data_service/internal/server/socket_streaming_server.go @@ -4,11 +4,12 @@ import ( "bufio" "encoding/json" "fmt" - "github.com/google/uuid" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" "io" "net" "strings" + + "github.com/google/uuid" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" ) type SocketStreamingServer struct { @@ -34,7 +35,14 @@ func (s *SocketStreamingServer) Serve(lis net.Listener) error { } func (s *SocketStreamingServer) handleConnection(conn net.Conn) { - defer conn.Close() + defer func(conn net.Conn) { + err := conn.Close() + if err != nil { + fmt.Printf("Failed to close connection: %v\n", err) + } else { + fmt.Println("Connection closed") + } + }(conn) reader := bufio.NewReader(conn) raw, err := reader.ReadString('\n')