From b9cd25e9b4ba967bbefd0b419120cd7a6819a973 Mon Sep 17 00:00:00 2001 From: Phillip Michelsen Date: Fri, 27 Jun 2025 18:24:10 +0800 Subject: [PATCH] Major update to the data service. Added gRPC and socket servers. Switched to using only a single go.mod at the root. --- go.mod | 17 + go.sum | 36 ++ go.work | 7 - pkg/go.mod | 3 - .../data_service/data_service_streaming.pb.go | 549 ++++++++++++++++++ .../data_service/data_service_streaming.proto | 53 ++ .../data_service_streaming_grpc.pb.go | 239 ++++++++ pkg/pb/marketdata/marketdata.proto | 0 .../cmd/main.go | 0 .../service.go | 0 services/data_service/cmd/main.go | 54 ++ .../internal/config/config.go | 0 .../internal/domain}/identifier.go | 2 +- .../data_service/internal/domain/message.go | 6 + .../data_service/internal/manager/manager.go | 196 +++++++ .../provider/binance/futures_websocket.go | 153 +++++ .../internal/provider/provider.go | 8 +- .../data_service/internal/router/router.go | 67 +++ .../internal/server/grpc_streaming_server.go | 74 +++ .../server/socket_streaming_server.go | 83 +++ services/marketdata/go.mod | 5 - services/marketdata/go.sum | 2 - .../marketdata/internal/manager/manager.go | 187 ------ .../internal/provider/binance/spot_fix.go | 1 - .../internal/provider/binance/spot_rest.go | 1 - .../provider/binance/spot_websocket.go | 1 - services/marketdata/internal/router/router.go | 59 -- .../internal/transport/grpc/server.go | 1 - services/orchestrator/go.mod | 3 - services/portfolio/cmd/main.go | 1 - .../cmd/main.go | 0 .../service.go | 0 32 files changed, 1532 insertions(+), 276 deletions(-) create mode 100644 go.mod create mode 100644 go.sum delete mode 100644 go.work delete mode 100644 pkg/go.mod create mode 100644 pkg/pb/data_service/data_service_streaming.pb.go create mode 100644 pkg/pb/data_service/data_service_streaming.proto create mode 100644 pkg/pb/data_service/data_service_streaming_grpc.pb.go delete mode 100644 pkg/pb/marketdata/marketdata.proto rename services/{marketdata => controller_service}/cmd/main.go (100%) rename services/{orchestrator => controller_service}/service.go (100%) create mode 100644 services/data_service/cmd/main.go rename services/{marketdata => data_service}/internal/config/config.go (100%) rename services/{marketdata/internal/identifier => data_service/internal/domain}/identifier.go (76%) create mode 100644 services/data_service/internal/domain/message.go create mode 100644 services/data_service/internal/manager/manager.go create mode 100644 services/data_service/internal/provider/binance/futures_websocket.go rename services/{marketdata => data_service}/internal/provider/provider.go (52%) create mode 100644 services/data_service/internal/router/router.go create mode 100644 services/data_service/internal/server/grpc_streaming_server.go create mode 100644 services/data_service/internal/server/socket_streaming_server.go delete mode 100644 services/marketdata/go.mod delete mode 100644 services/marketdata/go.sum delete mode 100644 services/marketdata/internal/manager/manager.go delete mode 100644 services/marketdata/internal/provider/binance/spot_fix.go delete mode 100644 services/marketdata/internal/provider/binance/spot_rest.go delete mode 100644 services/marketdata/internal/provider/binance/spot_websocket.go delete mode 100644 services/marketdata/internal/router/router.go delete mode 100644 services/marketdata/internal/transport/grpc/server.go delete mode 100644 services/orchestrator/go.mod delete mode 100644 services/portfolio/cmd/main.go rename services/{orchestrator => portfolio_service}/cmd/main.go (100%) rename services/{portfolio => portfolio_service}/service.go (100%) diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..72d38e8 --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module gitlab.michelsen.id/phillmichelsen/tessera + +go 1.24.4 + +require ( + github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 + google.golang.org/grpc v1.73.0 + google.golang.org/protobuf v1.36.6 +) + +require ( + golang.org/x/net v0.38.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..199b832 --- /dev/null +++ b/go.sum @@ -0,0 +1,36 @@ +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= +go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= +go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= +go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= +go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= +go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= +go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= +go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 h1:e0AIkUUhxyBKh6ssZNrAMeqhA7RKUj42346d1y02i2g= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok= +google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= diff --git a/go.work b/go.work deleted file mode 100644 index 7d9dc49..0000000 --- a/go.work +++ /dev/null @@ -1,7 +0,0 @@ -go 1.24.2 - -use ( - ./pkg - ./services/marketdata - ./services/orchestrator -) diff --git a/pkg/go.mod b/pkg/go.mod deleted file mode 100644 index 095bbe8..0000000 --- a/pkg/go.mod +++ /dev/null @@ -1,3 +0,0 @@ -module gitlab.michelsen.id/phillmichelsen/tessera/pkg - -go 1.24.2 diff --git a/pkg/pb/data_service/data_service_streaming.pb.go b/pkg/pb/data_service/data_service_streaming.pb.go new file mode 100644 index 0000000..488b834 --- /dev/null +++ b/pkg/pb/data_service/data_service_streaming.pb.go @@ -0,0 +1,549 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.6 +// protoc v6.31.1 +// source: pkg/pb/data_service/data_service_streaming.proto + +package data_service + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Identifier struct { + state protoimpl.MessageState `protogen:"open.v1"` + Provider string `protobuf:"bytes,1,opt,name=provider,proto3" json:"provider,omitempty"` // e.g., "binance" + Subject string `protobuf:"bytes,2,opt,name=subject,proto3" json:"subject,omitempty"` // e.g., "BTCUSDT" + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Identifier) Reset() { + *x = Identifier{} + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Identifier) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Identifier) ProtoMessage() {} + +func (x *Identifier) ProtoReflect() protoreflect.Message { + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Identifier.ProtoReflect.Descriptor instead. +func (*Identifier) Descriptor() ([]byte, []int) { + return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{0} +} + +func (x *Identifier) GetProvider() string { + if x != nil { + return x.Provider + } + return "" +} + +func (x *Identifier) GetSubject() string { + if x != nil { + return x.Subject + } + return "" +} + +type Message struct { + state protoimpl.MessageState `protogen:"open.v1"` + Identifier *Identifier `protobuf:"bytes,1,opt,name=identifier,proto3" json:"identifier,omitempty"` + Payload string `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` // JSON-encoded data + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Message) Reset() { + *x = Message{} + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{1} +} + +func (x *Message) GetIdentifier() *Identifier { + if x != nil { + return x.Identifier + } + return nil +} + +func (x *Message) GetPayload() string { + if x != nil { + return x.Payload + } + return "" +} + +type StartStreamRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Identifiers []*Identifier `protobuf:"bytes,1,rep,name=identifiers,proto3" json:"identifiers,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StartStreamRequest) Reset() { + *x = StartStreamRequest{} + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StartStreamRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StartStreamRequest) ProtoMessage() {} + +func (x *StartStreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StartStreamRequest.ProtoReflect.Descriptor instead. +func (*StartStreamRequest) Descriptor() ([]byte, []int) { + return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{2} +} + +func (x *StartStreamRequest) GetIdentifiers() []*Identifier { + if x != nil { + return x.Identifiers + } + return nil +} + +type StartStreamResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StartStreamResponse) Reset() { + *x = StartStreamResponse{} + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StartStreamResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StartStreamResponse) ProtoMessage() {} + +func (x *StartStreamResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StartStreamResponse.ProtoReflect.Descriptor instead. +func (*StartStreamResponse) Descriptor() ([]byte, []int) { + return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{3} +} + +func (x *StartStreamResponse) GetStreamUuid() string { + if x != nil { + return x.StreamUuid + } + return "" +} + +type UpdateStreamRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"` + Identifiers []*Identifier `protobuf:"bytes,2,rep,name=identifiers,proto3" json:"identifiers,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *UpdateStreamRequest) Reset() { + *x = UpdateStreamRequest{} + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *UpdateStreamRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpdateStreamRequest) ProtoMessage() {} + +func (x *UpdateStreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpdateStreamRequest.ProtoReflect.Descriptor instead. +func (*UpdateStreamRequest) Descriptor() ([]byte, []int) { + return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{4} +} + +func (x *UpdateStreamRequest) GetStreamUuid() string { + if x != nil { + return x.StreamUuid + } + return "" +} + +func (x *UpdateStreamRequest) GetIdentifiers() []*Identifier { + if x != nil { + return x.Identifiers + } + return nil +} + +type UpdateStreamResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *UpdateStreamResponse) Reset() { + *x = UpdateStreamResponse{} + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *UpdateStreamResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpdateStreamResponse) ProtoMessage() {} + +func (x *UpdateStreamResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpdateStreamResponse.ProtoReflect.Descriptor instead. +func (*UpdateStreamResponse) Descriptor() ([]byte, []int) { + return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{5} +} + +func (x *UpdateStreamResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +type StopStreamRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StopStreamRequest) Reset() { + *x = StopStreamRequest{} + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StopStreamRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StopStreamRequest) ProtoMessage() {} + +func (x *StopStreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StopStreamRequest.ProtoReflect.Descriptor instead. +func (*StopStreamRequest) Descriptor() ([]byte, []int) { + return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{6} +} + +func (x *StopStreamRequest) GetStreamUuid() string { + if x != nil { + return x.StreamUuid + } + return "" +} + +type StopStreamResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StopStreamResponse) Reset() { + *x = StopStreamResponse{} + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StopStreamResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StopStreamResponse) ProtoMessage() {} + +func (x *StopStreamResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StopStreamResponse.ProtoReflect.Descriptor instead. +func (*StopStreamResponse) Descriptor() ([]byte, []int) { + return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{7} +} + +func (x *StopStreamResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +type ConnectStreamRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ConnectStreamRequest) Reset() { + *x = ConnectStreamRequest{} + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ConnectStreamRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConnectStreamRequest) ProtoMessage() {} + +func (x *ConnectStreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_pb_data_service_data_service_streaming_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConnectStreamRequest.ProtoReflect.Descriptor instead. +func (*ConnectStreamRequest) Descriptor() ([]byte, []int) { + return file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP(), []int{8} +} + +func (x *ConnectStreamRequest) GetStreamUuid() string { + if x != nil { + return x.StreamUuid + } + return "" +} + +var File_pkg_pb_data_service_data_service_streaming_proto protoreflect.FileDescriptor + +const file_pkg_pb_data_service_data_service_streaming_proto_rawDesc = "" + + "\n" + + "0pkg/pb/data_service/data_service_streaming.proto\x12\fdata_service\"B\n" + + "\n" + + "Identifier\x12\x1a\n" + + "\bprovider\x18\x01 \x01(\tR\bprovider\x12\x18\n" + + "\asubject\x18\x02 \x01(\tR\asubject\"]\n" + + "\aMessage\x128\n" + + "\n" + + "identifier\x18\x01 \x01(\v2\x18.data_service.IdentifierR\n" + + "identifier\x12\x18\n" + + "\apayload\x18\x02 \x01(\tR\apayload\"P\n" + + "\x12StartStreamRequest\x12:\n" + + "\videntifiers\x18\x01 \x03(\v2\x18.data_service.IdentifierR\videntifiers\"6\n" + + "\x13StartStreamResponse\x12\x1f\n" + + "\vstream_uuid\x18\x01 \x01(\tR\n" + + "streamUuid\"r\n" + + "\x13UpdateStreamRequest\x12\x1f\n" + + "\vstream_uuid\x18\x01 \x01(\tR\n" + + "streamUuid\x12:\n" + + "\videntifiers\x18\x02 \x03(\v2\x18.data_service.IdentifierR\videntifiers\"0\n" + + "\x14UpdateStreamResponse\x12\x18\n" + + "\asuccess\x18\x01 \x01(\bR\asuccess\"4\n" + + "\x11StopStreamRequest\x12\x1f\n" + + "\vstream_uuid\x18\x01 \x01(\tR\n" + + "streamUuid\".\n" + + "\x12StopStreamResponse\x12\x18\n" + + "\asuccess\x18\x01 \x01(\bR\asuccess\"7\n" + + "\x14ConnectStreamRequest\x12\x1f\n" + + "\vstream_uuid\x18\x01 \x01(\tR\n" + + "streamUuid2\xe0\x02\n" + + "\x14DataServiceStreaming\x12R\n" + + "\vStartStream\x12 .data_service.StartStreamRequest\x1a!.data_service.StartStreamResponse\x12U\n" + + "\fUpdateStream\x12!.data_service.UpdateStreamRequest\x1a\".data_service.UpdateStreamResponse\x12O\n" + + "\n" + + "StopStream\x12\x1f.data_service.StopStreamRequest\x1a .data_service.StopStreamResponse\x12L\n" + + "\rConnectStream\x12\".data_service.ConnectStreamRequest\x1a\x15.data_service.Message0\x01BMZKgitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service;data_serviceb\x06proto3" + +var ( + file_pkg_pb_data_service_data_service_streaming_proto_rawDescOnce sync.Once + file_pkg_pb_data_service_data_service_streaming_proto_rawDescData []byte +) + +func file_pkg_pb_data_service_data_service_streaming_proto_rawDescGZIP() []byte { + file_pkg_pb_data_service_data_service_streaming_proto_rawDescOnce.Do(func() { + file_pkg_pb_data_service_data_service_streaming_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_pkg_pb_data_service_data_service_streaming_proto_rawDesc), len(file_pkg_pb_data_service_data_service_streaming_proto_rawDesc))) + }) + return file_pkg_pb_data_service_data_service_streaming_proto_rawDescData +} + +var file_pkg_pb_data_service_data_service_streaming_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_pkg_pb_data_service_data_service_streaming_proto_goTypes = []any{ + (*Identifier)(nil), // 0: data_service.Identifier + (*Message)(nil), // 1: data_service.Message + (*StartStreamRequest)(nil), // 2: data_service.StartStreamRequest + (*StartStreamResponse)(nil), // 3: data_service.StartStreamResponse + (*UpdateStreamRequest)(nil), // 4: data_service.UpdateStreamRequest + (*UpdateStreamResponse)(nil), // 5: data_service.UpdateStreamResponse + (*StopStreamRequest)(nil), // 6: data_service.StopStreamRequest + (*StopStreamResponse)(nil), // 7: data_service.StopStreamResponse + (*ConnectStreamRequest)(nil), // 8: data_service.ConnectStreamRequest +} +var file_pkg_pb_data_service_data_service_streaming_proto_depIdxs = []int32{ + 0, // 0: data_service.Message.identifier:type_name -> data_service.Identifier + 0, // 1: data_service.StartStreamRequest.identifiers:type_name -> data_service.Identifier + 0, // 2: data_service.UpdateStreamRequest.identifiers:type_name -> data_service.Identifier + 2, // 3: data_service.DataServiceStreaming.StartStream:input_type -> data_service.StartStreamRequest + 4, // 4: data_service.DataServiceStreaming.UpdateStream:input_type -> data_service.UpdateStreamRequest + 6, // 5: data_service.DataServiceStreaming.StopStream:input_type -> data_service.StopStreamRequest + 8, // 6: data_service.DataServiceStreaming.ConnectStream:input_type -> data_service.ConnectStreamRequest + 3, // 7: data_service.DataServiceStreaming.StartStream:output_type -> data_service.StartStreamResponse + 5, // 8: data_service.DataServiceStreaming.UpdateStream:output_type -> data_service.UpdateStreamResponse + 7, // 9: data_service.DataServiceStreaming.StopStream:output_type -> data_service.StopStreamResponse + 1, // 10: data_service.DataServiceStreaming.ConnectStream:output_type -> data_service.Message + 7, // [7:11] is the sub-list for method output_type + 3, // [3:7] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_pkg_pb_data_service_data_service_streaming_proto_init() } +func file_pkg_pb_data_service_data_service_streaming_proto_init() { + if File_pkg_pb_data_service_data_service_streaming_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_pkg_pb_data_service_data_service_streaming_proto_rawDesc), len(file_pkg_pb_data_service_data_service_streaming_proto_rawDesc)), + NumEnums: 0, + NumMessages: 9, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pkg_pb_data_service_data_service_streaming_proto_goTypes, + DependencyIndexes: file_pkg_pb_data_service_data_service_streaming_proto_depIdxs, + MessageInfos: file_pkg_pb_data_service_data_service_streaming_proto_msgTypes, + }.Build() + File_pkg_pb_data_service_data_service_streaming_proto = out.File + file_pkg_pb_data_service_data_service_streaming_proto_goTypes = nil + file_pkg_pb_data_service_data_service_streaming_proto_depIdxs = nil +} diff --git a/pkg/pb/data_service/data_service_streaming.proto b/pkg/pb/data_service/data_service_streaming.proto new file mode 100644 index 0000000..209cbca --- /dev/null +++ b/pkg/pb/data_service/data_service_streaming.proto @@ -0,0 +1,53 @@ +syntax = "proto3"; + +package data_service; + +option go_package = "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service;data_service"; + +service DataServiceStreaming { + rpc StartStream(StartStreamRequest) returns (StartStreamResponse); + rpc UpdateStream(UpdateStreamRequest) returns (UpdateStreamResponse); + rpc StopStream(StopStreamRequest) returns (StopStreamResponse); + + rpc ConnectStream(ConnectStreamRequest) returns (stream Message); +} + +message Identifier { + string provider = 1; // e.g., "binance" + string subject = 2; // e.g., "BTCUSDT" +} + +message Message { + Identifier identifier = 1; + string payload = 2; // JSON-encoded data +} + +message StartStreamRequest { + repeated Identifier identifiers = 1; +} + +message StartStreamResponse { + string stream_uuid = 1; +} + +message UpdateStreamRequest { + string stream_uuid = 1; + repeated Identifier identifiers = 2; +} + +message UpdateStreamResponse { + bool success = 1; +} + +message StopStreamRequest { + string stream_uuid = 1; +} + +message StopStreamResponse { + bool success = 1; +} + + +message ConnectStreamRequest { + string stream_uuid = 1; +} diff --git a/pkg/pb/data_service/data_service_streaming_grpc.pb.go b/pkg/pb/data_service/data_service_streaming_grpc.pb.go new file mode 100644 index 0000000..b600064 --- /dev/null +++ b/pkg/pb/data_service/data_service_streaming_grpc.pb.go @@ -0,0 +1,239 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v6.31.1 +// source: pkg/pb/data_service/data_service_streaming.proto + +package data_service + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + DataServiceStreaming_StartStream_FullMethodName = "/data_service.DataServiceStreaming/StartStream" + DataServiceStreaming_UpdateStream_FullMethodName = "/data_service.DataServiceStreaming/UpdateStream" + DataServiceStreaming_StopStream_FullMethodName = "/data_service.DataServiceStreaming/StopStream" + DataServiceStreaming_ConnectStream_FullMethodName = "/data_service.DataServiceStreaming/ConnectStream" +) + +// DataServiceStreamingClient is the client API for DataServiceStreaming service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type DataServiceStreamingClient interface { + StartStream(ctx context.Context, in *StartStreamRequest, opts ...grpc.CallOption) (*StartStreamResponse, error) + UpdateStream(ctx context.Context, in *UpdateStreamRequest, opts ...grpc.CallOption) (*UpdateStreamResponse, error) + StopStream(ctx context.Context, in *StopStreamRequest, opts ...grpc.CallOption) (*StopStreamResponse, error) + ConnectStream(ctx context.Context, in *ConnectStreamRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Message], error) +} + +type dataServiceStreamingClient struct { + cc grpc.ClientConnInterface +} + +func NewDataServiceStreamingClient(cc grpc.ClientConnInterface) DataServiceStreamingClient { + return &dataServiceStreamingClient{cc} +} + +func (c *dataServiceStreamingClient) StartStream(ctx context.Context, in *StartStreamRequest, opts ...grpc.CallOption) (*StartStreamResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(StartStreamResponse) + err := c.cc.Invoke(ctx, DataServiceStreaming_StartStream_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dataServiceStreamingClient) UpdateStream(ctx context.Context, in *UpdateStreamRequest, opts ...grpc.CallOption) (*UpdateStreamResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(UpdateStreamResponse) + err := c.cc.Invoke(ctx, DataServiceStreaming_UpdateStream_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dataServiceStreamingClient) StopStream(ctx context.Context, in *StopStreamRequest, opts ...grpc.CallOption) (*StopStreamResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(StopStreamResponse) + err := c.cc.Invoke(ctx, DataServiceStreaming_StopStream_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dataServiceStreamingClient) ConnectStream(ctx context.Context, in *ConnectStreamRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Message], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &DataServiceStreaming_ServiceDesc.Streams[0], DataServiceStreaming_ConnectStream_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[ConnectStreamRequest, Message]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type DataServiceStreaming_ConnectStreamClient = grpc.ServerStreamingClient[Message] + +// DataServiceStreamingServer is the server API for DataServiceStreaming service. +// All implementations must embed UnimplementedDataServiceStreamingServer +// for forward compatibility. +type DataServiceStreamingServer interface { + StartStream(context.Context, *StartStreamRequest) (*StartStreamResponse, error) + UpdateStream(context.Context, *UpdateStreamRequest) (*UpdateStreamResponse, error) + StopStream(context.Context, *StopStreamRequest) (*StopStreamResponse, error) + ConnectStream(*ConnectStreamRequest, grpc.ServerStreamingServer[Message]) error + mustEmbedUnimplementedDataServiceStreamingServer() +} + +// UnimplementedDataServiceStreamingServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedDataServiceStreamingServer struct{} + +func (UnimplementedDataServiceStreamingServer) StartStream(context.Context, *StartStreamRequest) (*StartStreamResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method StartStream not implemented") +} +func (UnimplementedDataServiceStreamingServer) UpdateStream(context.Context, *UpdateStreamRequest) (*UpdateStreamResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method UpdateStream not implemented") +} +func (UnimplementedDataServiceStreamingServer) StopStream(context.Context, *StopStreamRequest) (*StopStreamResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method StopStream not implemented") +} +func (UnimplementedDataServiceStreamingServer) ConnectStream(*ConnectStreamRequest, grpc.ServerStreamingServer[Message]) error { + return status.Errorf(codes.Unimplemented, "method ConnectStream not implemented") +} +func (UnimplementedDataServiceStreamingServer) mustEmbedUnimplementedDataServiceStreamingServer() {} +func (UnimplementedDataServiceStreamingServer) testEmbeddedByValue() {} + +// UnsafeDataServiceStreamingServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to DataServiceStreamingServer will +// result in compilation errors. +type UnsafeDataServiceStreamingServer interface { + mustEmbedUnimplementedDataServiceStreamingServer() +} + +func RegisterDataServiceStreamingServer(s grpc.ServiceRegistrar, srv DataServiceStreamingServer) { + // If the following call pancis, it indicates UnimplementedDataServiceStreamingServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&DataServiceStreaming_ServiceDesc, srv) +} + +func _DataServiceStreaming_StartStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StartStreamRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DataServiceStreamingServer).StartStream(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: DataServiceStreaming_StartStream_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DataServiceStreamingServer).StartStream(ctx, req.(*StartStreamRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _DataServiceStreaming_UpdateStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(UpdateStreamRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DataServiceStreamingServer).UpdateStream(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: DataServiceStreaming_UpdateStream_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DataServiceStreamingServer).UpdateStream(ctx, req.(*UpdateStreamRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _DataServiceStreaming_StopStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StopStreamRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DataServiceStreamingServer).StopStream(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: DataServiceStreaming_StopStream_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DataServiceStreamingServer).StopStream(ctx, req.(*StopStreamRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _DataServiceStreaming_ConnectStream_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(ConnectStreamRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(DataServiceStreamingServer).ConnectStream(m, &grpc.GenericServerStream[ConnectStreamRequest, Message]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type DataServiceStreaming_ConnectStreamServer = grpc.ServerStreamingServer[Message] + +// DataServiceStreaming_ServiceDesc is the grpc.ServiceDesc for DataServiceStreaming service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var DataServiceStreaming_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "data_service.DataServiceStreaming", + HandlerType: (*DataServiceStreamingServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "StartStream", + Handler: _DataServiceStreaming_StartStream_Handler, + }, + { + MethodName: "UpdateStream", + Handler: _DataServiceStreaming_UpdateStream_Handler, + }, + { + MethodName: "StopStream", + Handler: _DataServiceStreaming_StopStream_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "ConnectStream", + Handler: _DataServiceStreaming_ConnectStream_Handler, + ServerStreams: true, + }, + }, + Metadata: "pkg/pb/data_service/data_service_streaming.proto", +} diff --git a/pkg/pb/marketdata/marketdata.proto b/pkg/pb/marketdata/marketdata.proto deleted file mode 100644 index e69de29..0000000 diff --git a/services/marketdata/cmd/main.go b/services/controller_service/cmd/main.go similarity index 100% rename from services/marketdata/cmd/main.go rename to services/controller_service/cmd/main.go diff --git a/services/orchestrator/service.go b/services/controller_service/service.go similarity index 100% rename from services/orchestrator/service.go rename to services/controller_service/service.go diff --git a/services/data_service/cmd/main.go b/services/data_service/cmd/main.go new file mode 100644 index 0000000..74a21bf --- /dev/null +++ b/services/data_service/cmd/main.go @@ -0,0 +1,54 @@ +package main + +import ( + pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider/binance" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/server" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" + "log" + "net" +) + +func main() { + // Setup + r := router.NewRouter() + m := manager.NewManager(r) + binanceFutures := binance.NewFuturesWebsocket() + m.AddProvider("binance_futures_websocket", binanceFutures) + + // gRPC Server + grpcServer := grpc.NewServer() + streamingServer := server.NewGRPCStreamingServer(m) + pb.RegisterDataServiceStreamingServer(grpcServer, streamingServer) + reflection.Register(grpcServer) + + go func() { + grpcLis, err := net.Listen("tcp", ":50051") + if err != nil { + log.Fatalf("Failed to listen for gRPC: %v", err) + } + log.Println("gRPC server listening on :50051") + if err := grpcServer.Serve(grpcLis); err != nil { + log.Fatalf("Failed to serve gRPC: %v", err) + } + }() + + // Socket Server + socketServer := server.NewSocketStreamingServer(m) + go func() { + socketLis, err := net.Listen("tcp", ":6000") + if err != nil { + log.Fatalf("Failed to listen for socket: %v", err) + } + log.Println("Socket server listening on :6000") + if err := socketServer.Serve(socketLis); err != nil { + log.Fatalf("Socket server error: %v", err) + } + }() + + // Block main forever + select {} +} diff --git a/services/marketdata/internal/config/config.go b/services/data_service/internal/config/config.go similarity index 100% rename from services/marketdata/internal/config/config.go rename to services/data_service/internal/config/config.go diff --git a/services/marketdata/internal/identifier/identifier.go b/services/data_service/internal/domain/identifier.go similarity index 76% rename from services/marketdata/internal/identifier/identifier.go rename to services/data_service/internal/domain/identifier.go index d9daeed..3a61744 100644 --- a/services/marketdata/internal/identifier/identifier.go +++ b/services/data_service/internal/domain/identifier.go @@ -1,4 +1,4 @@ -package identifier +package domain type Identifier struct { Provider string diff --git a/services/data_service/internal/domain/message.go b/services/data_service/internal/domain/message.go new file mode 100644 index 0000000..c211306 --- /dev/null +++ b/services/data_service/internal/domain/message.go @@ -0,0 +1,6 @@ +package domain + +type Message struct { + Identifier Identifier + Payload any +} diff --git a/services/data_service/internal/manager/manager.go b/services/data_service/internal/manager/manager.go new file mode 100644 index 0000000..9892644 --- /dev/null +++ b/services/data_service/internal/manager/manager.go @@ -0,0 +1,196 @@ +package manager + +import ( + "fmt" + "github.com/google/uuid" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/provider" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router" + "sync" + "time" +) + +type Manager struct { + providers map[string]provider.Provider + providerStreams map[domain.Identifier]chan domain.Message + + clientStreams map[uuid.UUID]*ClientStream + + router *router.Router + + mu sync.Mutex +} + +type ClientStream struct { + UUID uuid.UUID + Identifiers []domain.Identifier + OutChannel chan domain.Message + Timer *time.Timer +} + +func NewManager(router *router.Router) *Manager { + go router.Run() // Start the router in a separate goroutine + return &Manager{ + providers: make(map[string]provider.Provider), + providerStreams: make(map[domain.Identifier]chan domain.Message), + clientStreams: make(map[uuid.UUID]*ClientStream), + router: router, + } +} + +func (m *Manager) StartStream(ids []domain.Identifier) (uuid.UUID, error) { + m.mu.Lock() + defer m.mu.Unlock() + + // Validate Identifiers, prevents unnecessary calls to providers + for _, id := range ids { + if id.Provider == "" || id.Subject == "" { + return uuid.Nil, fmt.Errorf("invalid identifier: %v", id) + } + if _, ok := m.providers[id.Provider]; !ok { + return uuid.Nil, fmt.Errorf("unknown provider: %s", id.Provider) + } + if !m.providers[id.Provider].IsValidSubject(id.Subject, false) { + return uuid.Nil, fmt.Errorf("invalid subject for provider %s: %s", id.Provider, id.Subject) + } + } + + // Provision provider streams + for _, id := range ids { + if _, ok := m.providerStreams[id]; ok { + continue // Skip if requested stream is already being provided + } + + ch := make(chan domain.Message, 64) + if err := m.providers[id.Provider].RequestStream(id.Subject, ch); err != nil { + return uuid.Nil, fmt.Errorf("provision %v: %w", id, err) + } + m.providerStreams[id] = ch + + // Start routine to route the provider stream to the router's input channel + go func(ch chan domain.Message) { + for msg := range ch { + m.router.IncomingChannel() <- msg + } + }(ch) + } + + streamID := uuid.New() + + m.clientStreams[streamID] = &ClientStream{ + UUID: streamID, + Identifiers: ids, + OutChannel: nil, // Initially nil, will be set when connected + Timer: time.AfterFunc(1*time.Minute, func() { + fmt.Printf("stream %s expired due to inactivity\n", streamID) + m.StopStream(streamID) + }), + } + + return streamID, nil +} + +func (m *Manager) StopStream(streamID uuid.UUID) { + m.DisconnectStream(streamID) + + m.mu.Lock() + defer m.mu.Unlock() + + stream, ok := m.clientStreams[streamID] + if !ok { + return // Stream not found + } + + stream.Timer.Stop() + + delete(m.clientStreams, streamID) + + // Find provider streams that are used by other client streams + used := make(map[domain.Identifier]bool) + for _, s := range m.clientStreams { + for _, id := range s.Identifiers { + used[id] = true + } + } + + // Cancel provider streams that are not used by any client stream + for id, ch := range m.providerStreams { + if !used[id] { + m.providers[id.Provider].CancelStream(id.Subject) + close(ch) + delete(m.providerStreams, id) + } + } +} + +func (m *Manager) ConnectStream(streamID uuid.UUID) (<-chan domain.Message, error) { + m.mu.Lock() + defer m.mu.Unlock() + + stream, ok := m.clientStreams[streamID] + if !ok { + return nil, fmt.Errorf("stream not found: %s", streamID) + } + + if stream.OutChannel != nil { + return nil, fmt.Errorf("stream already connected") + } + + ch := make(chan domain.Message, 128) + stream.OutChannel = ch + + for _, ident := range stream.Identifiers { + m.router.RegisterRoute(ident, ch) + } + + if stream.Timer != nil { + stream.Timer.Stop() + stream.Timer = nil + } + + return ch, nil +} + +func (m *Manager) DisconnectStream(streamID uuid.UUID) { + m.mu.Lock() + defer m.mu.Unlock() + + stream, ok := m.clientStreams[streamID] + if !ok || stream.OutChannel == nil { + return // already disconnected or does not exist + } + + // Deregister all identifiers from the router + for _, ident := range stream.Identifiers { + m.router.DeregisterRoute(ident, stream.OutChannel) + } + + // Close the output channel + close(stream.OutChannel) + stream.OutChannel = nil + + // Set up the expiry timer + stream.Timer = time.AfterFunc(1*time.Minute, func() { + fmt.Printf("stream %s expired due to inactivity\n", streamID) + m.StopStream(streamID) + }) +} + +func (m *Manager) AddProvider(name string, p provider.Provider) { + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.providers[name]; exists { + panic(fmt.Sprintf("provider %s already exists", name)) + } + + if err := p.Start(); err != nil { + panic(fmt.Errorf("failed to start provider %s: %w", name, err)) + } + + m.providers[name] = p +} + +func (m *Manager) RemoveProvider(name string) { + panic("not implemented yet") // TODO: Implement provider removal logic +} diff --git a/services/data_service/internal/provider/binance/futures_websocket.go b/services/data_service/internal/provider/binance/futures_websocket.go new file mode 100644 index 0000000..f6683da --- /dev/null +++ b/services/data_service/internal/provider/binance/futures_websocket.go @@ -0,0 +1,153 @@ +package binance + +import ( + "encoding/json" + "fmt" + "sync" + + "github.com/gorilla/websocket" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" +) + +type FuturesWebsocket struct { + conn *websocket.Conn + activeStreams map[string]chan domain.Message + mu sync.Mutex +} + +func NewFuturesWebsocket() *FuturesWebsocket { + return &FuturesWebsocket{ + activeStreams: make(map[string]chan domain.Message), + } +} + +func (b *FuturesWebsocket) Start() error { + c, _, err := websocket.DefaultDialer.Dial("wss://fstream.binance.com/stream", nil) + if err != nil { + return fmt.Errorf("connect failed: %w", err) + } + b.conn = c + go b.readLoop() + return nil +} + +func (b *FuturesWebsocket) Stop() { + if b.conn != nil { + err := b.conn.Close() + if err != nil { + panic(fmt.Errorf("failed to close websocket connection: %w", err)) + } + } +} + +func (b *FuturesWebsocket) RequestStream(subject string, ch chan domain.Message) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.activeStreams[subject]; ok { + return nil + } + + msg := map[string]interface{}{ + "method": "SUBSCRIBE", + "params": []string{subject}, + "id": len(b.activeStreams) + 1, + } + if err := b.conn.WriteJSON(msg); err != nil { + return fmt.Errorf("subscribe failed: %w", err) + } + + b.activeStreams[subject] = ch + fmt.Println("Subscribed to stream:", subject) + return nil +} + +func (b *FuturesWebsocket) CancelStream(subject string) { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.activeStreams[subject]; !ok { + return + } + + msg := map[string]interface{}{ + "method": "UNSUBSCRIBE", + "params": []string{subject}, + "id": len(b.activeStreams) + 1000, + } + _ = b.conn.WriteJSON(msg) + + delete(b.activeStreams, subject) +} + +func (b *FuturesWebsocket) GetActiveStreams() []string { + b.mu.Lock() + defer b.mu.Unlock() + + var streams []string + for k := range b.activeStreams { + streams = append(streams, k) + } + return streams +} + +func (b *FuturesWebsocket) IsStreamActive(subject string) bool { + b.mu.Lock() + defer b.mu.Unlock() + + _, ok := b.activeStreams[subject] + return ok +} + +func (b *FuturesWebsocket) Fetch(subject string) (domain.Message, error) { + return domain.Message{}, fmt.Errorf("not supported: websocket provider does not implement fetch") +} + +func (b *FuturesWebsocket) IsValidSubject(subject string, isFetch bool) bool { + if isFetch { + return false + } + return len(subject) > 0 // Extend with regex or lookup if needed +} + +func (b *FuturesWebsocket) readLoop() { + for { + _, msgBytes, err := b.conn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + return + } + fmt.Printf("read error: %v\n", err) + continue + } + + var container struct { + Stream string `json:"stream"` + Data json.RawMessage `json:"data"` + } + if err := json.Unmarshal(msgBytes, &container); err != nil { + continue + } + + b.mu.Lock() + ch, ok := b.activeStreams[container.Stream] + b.mu.Unlock() + if !ok { + continue + } + + msg := domain.Message{ + Identifier: domain.Identifier{ + Provider: "binance_futures_websocket", + Subject: container.Stream, + }, + Payload: container.Data, + } + + select { + case ch <- msg: + default: + fmt.Printf("channel for %s is full, dropping message\n", container.Stream) + } + } +} diff --git a/services/marketdata/internal/provider/provider.go b/services/data_service/internal/provider/provider.go similarity index 52% rename from services/marketdata/internal/provider/provider.go rename to services/data_service/internal/provider/provider.go index 118c191..fb1bede 100644 --- a/services/marketdata/internal/provider/provider.go +++ b/services/data_service/internal/provider/provider.go @@ -1,19 +1,19 @@ package provider import ( - "gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/router" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" ) type Provider interface { Start() error - Stop() error + Stop() - RequestStream(subject string, channel chan router.Message) error + RequestStream(subject string, channel chan domain.Message) error CancelStream(subject string) GetActiveStreams() []string IsStreamActive(subject string) bool - Fetch(subject string) (router.Message, error) + Fetch(subject string) (domain.Message, error) IsValidSubject(subject string, isFetch bool) bool } diff --git a/services/data_service/internal/router/router.go b/services/data_service/internal/router/router.go new file mode 100644 index 0000000..6e271bd --- /dev/null +++ b/services/data_service/internal/router/router.go @@ -0,0 +1,67 @@ +package router + +import ( + "fmt" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" + "sync" + "time" +) + +type Router struct { + incoming chan domain.Message + routes map[domain.Identifier][]chan<- domain.Message + mu sync.RWMutex +} + +func NewRouter() *Router { + return &Router{ + incoming: make(chan domain.Message, 64), // Buffered channel for incoming messages + routes: make(map[domain.Identifier][]chan<- domain.Message), + } +} + +func (r *Router) IncomingChannel() chan<- domain.Message { + return r.incoming +} + +func (r *Router) Run() { + for msg := range r.incoming { + startTime := time.Now() + r.mu.RLock() + channels := r.routes[msg.Identifier] + + for _, ch := range channels { + select { + case ch <- msg: + default: + fmt.Println("Dropped message, buffer full!!!") + } + } + r.mu.RUnlock() + fmt.Printf("Message routed to %d channels in %v\n", len(channels), time.Since(startTime)) + } +} + +func (r *Router) RegisterRoute(id domain.Identifier, ch chan<- domain.Message) { + r.mu.Lock() + r.routes[id] = append(r.routes[id], ch) + r.mu.Unlock() +} + +func (r *Router) DeregisterRoute(id domain.Identifier, ch chan<- domain.Message) { + r.mu.Lock() + slice := r.routes[id] + for i := 0; i < len(slice); i++ { + if slice[i] == ch { + slice[i] = slice[len(slice)-1] + slice = slice[:len(slice)-1] + i-- + } + } + if len(slice) == 0 { + delete(r.routes, id) + } else { + r.routes[id] = slice + } + r.mu.Unlock() +} diff --git a/services/data_service/internal/server/grpc_streaming_server.go b/services/data_service/internal/server/grpc_streaming_server.go new file mode 100644 index 0000000..58fa415 --- /dev/null +++ b/services/data_service/internal/server/grpc_streaming_server.go @@ -0,0 +1,74 @@ +package server + +import ( + "context" + "fmt" + "github.com/google/uuid" + pb "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" +) + +type GRPCStreamingServer struct { + pb.UnimplementedDataServiceStreamingServer + manager *manager.Manager +} + +func NewGRPCStreamingServer(m *manager.Manager) *GRPCStreamingServer { + return &GRPCStreamingServer{ + manager: m, + } +} + +func (s *GRPCStreamingServer) StartStream(ctx context.Context, req *pb.StartStreamRequest) (*pb.StartStreamResponse, error) { + var ids []domain.Identifier + for _, id := range req.Identifiers { + ids = append(ids, domain.Identifier{ + Provider: id.Provider, + Subject: id.Subject, + }) + } + + streamID, err := s.manager.StartStream(ids) + if err != nil { + return nil, fmt.Errorf("failed to start stream: %w", err) + } + + return &pb.StartStreamResponse{StreamUuid: streamID.String()}, nil +} + +func (s *GRPCStreamingServer) ConnectStream(req *pb.ConnectStreamRequest, stream pb.DataServiceStreaming_ConnectStreamServer) error { + streamUUID, err := uuid.Parse(req.StreamUuid) + if err != nil { + return fmt.Errorf("invalid UUID: %w", err) + } + + ch, err := s.manager.ConnectStream(streamUUID) + if err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + + for { + select { + case <-stream.Context().Done(): + s.manager.DisconnectStream(streamUUID) + return nil + case msg, ok := <-ch: + if !ok { + return nil + } + + err := stream.Send(&pb.Message{ + Identifier: &pb.Identifier{ + Provider: msg.Identifier.Provider, + Subject: msg.Identifier.Subject, + }, + Payload: fmt.Sprintf("%s", msg.Payload), + }) + + if err != nil { + return err + } + } + } +} diff --git a/services/data_service/internal/server/socket_streaming_server.go b/services/data_service/internal/server/socket_streaming_server.go new file mode 100644 index 0000000..656c089 --- /dev/null +++ b/services/data_service/internal/server/socket_streaming_server.go @@ -0,0 +1,83 @@ +package server + +import ( + "bufio" + "encoding/json" + "fmt" + "github.com/google/uuid" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" + "io" + "net" + "strings" +) + +type SocketStreamingServer struct { + manager *manager.Manager +} + +func NewSocketStreamingServer(m *manager.Manager) *SocketStreamingServer { + return &SocketStreamingServer{ + manager: m, + } +} + +// Serve accepts a listener (TCP or Unix) and begins handling incoming connections. +func (s *SocketStreamingServer) Serve(lis net.Listener) error { + for { + conn, err := lis.Accept() + if err != nil { + fmt.Printf("Failed to accept connection: %v\n", err) + continue + } + go s.handleConnection(conn) + } +} + +func (s *SocketStreamingServer) handleConnection(conn net.Conn) { + defer conn.Close() + reader := bufio.NewReader(conn) + + raw, err := reader.ReadString('\n') + if err != nil { + fmt.Printf("Failed to read stream UUID: %v\n", err) + return + } + streamUUIDStr := strings.TrimSpace(raw) + streamUUID, err := uuid.Parse(streamUUIDStr) + if err != nil { + fmt.Fprintf(conn, "Invalid stream UUID\n") + return + } + + outCh, err := s.manager.ConnectStream(streamUUID) + if err != nil { + fmt.Fprintf(conn, "Failed to connect to stream: %v\n", err) + return + } + defer s.manager.DisconnectStream(streamUUID) + + for msg := range outCh { + payload := struct { + Provider string `json:"provider"` + Subject string `json:"subject"` + Data string `json:"data"` + }{ + Provider: msg.Identifier.Provider, + Subject: msg.Identifier.Subject, + Data: fmt.Sprintf("%s", msg.Payload), + } + + bytes, err := json.Marshal(payload) + if err != nil { + fmt.Printf("Failed to encode message: %v\n", err) + continue + } + _, err = conn.Write(append(bytes, '\n')) + if err != nil { + if err != io.EOF { + fmt.Printf("Write error: %v\n", err) + } + break + } + } +} diff --git a/services/marketdata/go.mod b/services/marketdata/go.mod deleted file mode 100644 index c30f89e..0000000 --- a/services/marketdata/go.mod +++ /dev/null @@ -1,5 +0,0 @@ -module gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata - -go 1.24.2 - -require github.com/google/uuid v1.6.0 // indirect diff --git a/services/marketdata/go.sum b/services/marketdata/go.sum deleted file mode 100644 index 7790d7c..0000000 --- a/services/marketdata/go.sum +++ /dev/null @@ -1,2 +0,0 @@ -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/services/marketdata/internal/manager/manager.go b/services/marketdata/internal/manager/manager.go deleted file mode 100644 index 2b873bb..0000000 --- a/services/marketdata/internal/manager/manager.go +++ /dev/null @@ -1,187 +0,0 @@ -package manager - -import ( - "context" - "fmt" - "sync" - - "gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/identifier" - "gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/provider" - "gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/router" -) - -type Manager struct { - Router *router.Router - incoming chan router.Message // Aggregated incoming messages from providers, for the Router - - Providers map[string]provider.Provider - providerStreams map[identifier.Identifier]chan router.Message // Channels for the streams that the providers are running - - subscribers map[identifier.Identifier][]chan router.Message // Map of identifiers to subscriber channels, one to many mapping - - mu sync.Mutex -} - -// NewManager constructs a Manager and starts its Router loop. -func NewManager() *Manager { - incoming := make(chan router.Message, 128) - r := router.NewRouter(incoming) - - m := &Manager{ - Router: r, - incoming: incoming, - Providers: make(map[string]provider.Provider), - providerStreams: make(map[identifier.Identifier]chan router.Message), - subscribers: make(map[identifier.Identifier][]chan router.Message), - } - go r.Run() - return m -} - -// AddProvider registers and starts a new Provider under the given name. -func (m *Manager) AddProvider(name string, p provider.Provider) error { - m.mu.Lock() - defer m.mu.Unlock() - - if _, exists := m.Providers[name]; exists { - return fmt.Errorf("provider %q already exists", name) - } - if err := p.Start(); err != nil { - return fmt.Errorf("failed to start provider %q: %w", name, err) - } - m.Providers[name] = p - return nil -} - -// RemoveProvider stops and unregisters a Provider, tearing down all its streams. -func (m *Manager) RemoveProvider(name string) error { - m.mu.Lock() - defer m.mu.Unlock() - - p, exists := m.Providers[name] - if !exists { - return fmt.Errorf("provider %q not found", name) - } - if err := p.Stop(); err != nil { - return fmt.Errorf("failed to stop provider %q: %w", name, err) - } - - // tear down every active stream for this provider - for id, streamCh := range m.providerStreams { - if id.Provider != name { - continue - } - // stop the provider's internal stream - p.CancelStream(id.Subject) - close(streamCh) - delete(m.providerStreams, id) - - // deregister & close all subscriber channels - for _, subCh := range m.subscribers[id] { - m.Router.Deregister(id, subCh) - close(subCh) - } - delete(m.subscribers, id) - } - - delete(m.Providers, name) - return nil -} - -// Stream establishes a single gRPC‐stream channel that multiplexes -// live updates for all requested identifiers. -// When ctx is canceled, it will deregister and—if a stream has no more -// subscribers—call CancelStream on the provider. -func (m *Manager) Stream( - ctx context.Context, - reqs []identifier.Identifier, -) (<-chan router.Message, error) { - m.mu.Lock() - // 1) Validate and ensure each provider/subject is streaming - for _, id := range reqs { - p, ok := m.Providers[id.Provider] - if !ok { - m.mu.Unlock() - return nil, fmt.Errorf("provider %q not found", id.Provider) - } - if !p.IsValidSubject(id.Subject, false) { - m.mu.Unlock() - return nil, fmt.Errorf("invalid subject %q for provider %q", id.Subject, id.Provider) - } - // start the provider stream if not already running - if _, exists := m.providerStreams[id]; !exists { - ch := make(chan router.Message, 64) - if err := p.RequestStream(id.Subject, ch); err != nil { - m.mu.Unlock() - return nil, fmt.Errorf("could not request stream for %v: %w", id, err) - } - m.providerStreams[id] = ch - // pump into the central incoming channel - go func(c chan router.Message) { - for msg := range c { - m.incoming <- msg - } - }(ch) - } - } - - // 2) Create one channel for this RPC and register it for every ID - subCh := make(chan router.Message, 128) - for _, id := range reqs { - m.subscribers[id] = append(m.subscribers[id], subCh) - m.Router.Register(id, subCh) - } - m.mu.Unlock() - - // 3) Teardown logic when context is done - go func() { - <-ctx.Done() - m.mu.Lock() - defer m.mu.Unlock() - - for _, id := range reqs { - // deregister this subscriber channel - m.Router.Deregister(id, subCh) - - // remove it from the list - subs := m.subscribers[id] - for i, ch := range subs { - if ch == subCh { - subs = append(subs[:i], subs[i+1:]...) - break - } - } - - if len(subs) == 0 { - // no more listeners: cancel provider stream - delete(m.subscribers, id) - if streamCh, ok := m.providerStreams[id]; ok { - m.Providers[id.Provider].CancelStream(id.Subject) - close(streamCh) - delete(m.providerStreams, id) - } - } else { - m.subscribers[id] = subs - } - } - - close(subCh) - }() - - return subCh, nil -} - -// Fetch performs a single request/response fetch against the named provider. -func (m *Manager) Fetch(providerName, subject string) (router.Message, error) { - m.mu.Lock() - defer m.mu.Unlock() - - p, ok := m.Providers[providerName] - if !ok { - return router.Message{}, fmt.Errorf("provider %q not found", providerName) - } - if !p.IsValidSubject(subject, true) { - return router.Message{}, fmt.Errorf("invalid subject %q for provider %q", subject, providerName) - } - return p.Fetch(subject) -} diff --git a/services/marketdata/internal/provider/binance/spot_fix.go b/services/marketdata/internal/provider/binance/spot_fix.go deleted file mode 100644 index f5c710e..0000000 --- a/services/marketdata/internal/provider/binance/spot_fix.go +++ /dev/null @@ -1 +0,0 @@ -package binance diff --git a/services/marketdata/internal/provider/binance/spot_rest.go b/services/marketdata/internal/provider/binance/spot_rest.go deleted file mode 100644 index f5c710e..0000000 --- a/services/marketdata/internal/provider/binance/spot_rest.go +++ /dev/null @@ -1 +0,0 @@ -package binance diff --git a/services/marketdata/internal/provider/binance/spot_websocket.go b/services/marketdata/internal/provider/binance/spot_websocket.go deleted file mode 100644 index f5c710e..0000000 --- a/services/marketdata/internal/provider/binance/spot_websocket.go +++ /dev/null @@ -1 +0,0 @@ -package binance diff --git a/services/marketdata/internal/router/router.go b/services/marketdata/internal/router/router.go deleted file mode 100644 index 0294c97..0000000 --- a/services/marketdata/internal/router/router.go +++ /dev/null @@ -1,59 +0,0 @@ -package router - -import ( - "gitlab.michelsen.id/phillmichelsen/tessera/services/marketdata/internal/identifier" - "sync" -) - -type Message struct { - Identifier identifier.Identifier - Payload any -} - -type Router struct { - incoming <-chan Message - routes map[identifier.Identifier][]chan<- Message - mu sync.RWMutex -} - -func NewRouter(incoming <-chan Message) *Router { - return &Router{ - incoming: incoming, - routes: make(map[identifier.Identifier][]chan<- Message), - } -} - -func (r *Router) Run() { - for msg := range r.incoming { - r.mu.RLock() - chans := r.routes[msg.Identifier] - for _, ch := range chans { - ch <- msg - } - r.mu.RUnlock() - } -} - -func (r *Router) Register(id identifier.Identifier, ch chan<- Message) { - r.mu.Lock() - r.routes[id] = append(r.routes[id], ch) - r.mu.Unlock() -} - -func (r *Router) Deregister(id identifier.Identifier, ch chan<- Message) { - r.mu.Lock() - slice := r.routes[id] - for i := 0; i < len(slice); i++ { - if slice[i] == ch { - slice[i] = slice[len(slice)-1] - slice = slice[:len(slice)-1] - i-- - } - } - if len(slice) == 0 { - delete(r.routes, id) - } else { - r.routes[id] = slice - } - r.mu.Unlock() -} diff --git a/services/marketdata/internal/transport/grpc/server.go b/services/marketdata/internal/transport/grpc/server.go deleted file mode 100644 index 21e034e..0000000 --- a/services/marketdata/internal/transport/grpc/server.go +++ /dev/null @@ -1 +0,0 @@ -package grpc diff --git a/services/orchestrator/go.mod b/services/orchestrator/go.mod deleted file mode 100644 index dffbdf6..0000000 --- a/services/orchestrator/go.mod +++ /dev/null @@ -1,3 +0,0 @@ -module gitlab.michelsen.id/phillmichelsen/tessera/services/orchestrator - -go 1.24.2 diff --git a/services/portfolio/cmd/main.go b/services/portfolio/cmd/main.go deleted file mode 100644 index 1d619dd..0000000 --- a/services/portfolio/cmd/main.go +++ /dev/null @@ -1 +0,0 @@ -package cmd diff --git a/services/orchestrator/cmd/main.go b/services/portfolio_service/cmd/main.go similarity index 100% rename from services/orchestrator/cmd/main.go rename to services/portfolio_service/cmd/main.go diff --git a/services/portfolio/service.go b/services/portfolio_service/service.go similarity index 100% rename from services/portfolio/service.go rename to services/portfolio_service/service.go