diff --git a/Makefile b/Makefile
index 7cd94cf..e69de29 100644
--- a/Makefile
+++ b/Makefile
@@ -1,11 +0,0 @@
-PROTO_DIR := pkg/pb
-PROTO_FILES := $(shell find $(PROTO_DIR) -name '*.proto')
-
-.PHONY: proto
-proto:
-	@echo "Generating Go code from Protobuf..."
-	protoc \
-		--go_out=. --go_opt=paths=source_relative \
-		--go-grpc_out=. --go-grpc_opt=paths=source_relative \
-		$(PROTO_FILES)
-	@echo "Protobuf generation complete."
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/core/data/data.go b/core/data/data.go
new file mode 100644
index 0000000..c68cf6f
--- /dev/null
+++ b/core/data/data.go
@@ -0,0 +1,41 @@
+// Package data ...
+package data
+
+import "time"
+
+type Envelope struct {
+	Source    string
+	Key       string
+	Timestamp time.Time
+	Sequence  uint64
+	Payload   []byte
+}
+
+type Source interface {
+	Start() error
+	Stop()
+	Name() string
+
+	Subscribe(key string) (<-chan Envelope, error)
+	Unsubscribe(key string) error
+
+	IsValidKey(key string) bool
+	GetSubscriptions() []string
+}
+
+type Sink interface {
+	Start() error
+	Stop()
+
+	Publish(envelope Envelope) error
+}
+
+type Processor interface {
+	Start(context ProcessorContext) error
+	Stop()
+}
+
+type ProcessorContext interface {
+	Send(inPort string, message Envelope) error
+	Receive(outPort string) Envelope
+}
diff --git a/core/execution/execution.go b/core/execution/execution.go
new file mode 100644
index 0000000..e69de29
diff --git a/core/portfolio/portfolio.go b/core/portfolio/portfolio.go
new file mode 100644
index 0000000..e69de29
diff --git a/core/risk/risk.go b/core/risk/risk.go
new file mode 100644
index 0000000..e69de29
diff --git a/core/strategy/strategy.go b/core/strategy/strategy.go
new file mode 100644
index 0000000..e69de29
diff --git a/core/worker/worker.go b/core/worker/worker.go
new file mode 100644
index 0000000..e69de29
diff --git a/go.mod b/go.mod
index f803688..ac1743e 100644
--- a/go.mod
+++ b/go.mod
@@ -1,18 +1,3 @@
 module gitlab.michelsen.id/phillmichelsen/tessera
 
 go 1.25.1
-
-require (
-	github.com/coder/websocket v1.8.14
-	github.com/google/uuid v1.6.0
-	github.com/lmittmann/tint v1.1.2
-	google.golang.org/grpc v1.75.0
-	google.golang.org/protobuf v1.36.8
-)
-
-require (
-	golang.org/x/net v0.43.0 // indirect
-	golang.org/x/sys v0.35.0 // indirect
-	golang.org/x/text v0.28.0 // indirect
-	google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1 // indirect
-)
diff --git a/go.sum b/go.sum
index 9d0e5c6..e69de29 100644
--- a/go.sum
+++ b/go.sum
@@ -1,40 +0,0 @@
-github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g=
-github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg=
-github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
-github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
-github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
-github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
-github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
-github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
-github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
-github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
-github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
-github.com/google/uuid v1.6.0/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/lmittmann/tint v1.1.2 h1:2CQzrL6rslrsyjqLDwD11bZ5OpLBPU+g3G/r5LSfS8w= -github.com/lmittmann/tint v1.1.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= -go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= -go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= -go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= -go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= -go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= -go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= -go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= -go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= -go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc= -go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= -go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= -go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= -golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= -golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= -golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= -golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= -golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= -gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= -gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1 h1:pmJpJEvT846VzausCQ5d7KreSROcDqmO388w5YbnltA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og= -google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4= -google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= -google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= -google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= diff --git a/pkg/pb/data_service/data_service.pb.go b/pkg/pb/data_service/data_service.pb.go deleted file mode 100644 index b53c0e3..0000000 --- a/pkg/pb/data_service/data_service.pb.go +++ /dev/null @@ -1,523 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.36.6 -// protoc v6.32.0 -// source: pkg/pb/data_service/data_service.proto - -package data_service - -import ( - reflect "reflect" - sync "sync" - unsafe "unsafe" - - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. 
- _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type Identifier struct { - state protoimpl.MessageState `protogen:"open.v1"` - Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *Identifier) Reset() { - *x = Identifier{} - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *Identifier) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Identifier) ProtoMessage() {} - -func (x *Identifier) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[0] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Identifier.ProtoReflect.Descriptor instead. -func (*Identifier) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{0} -} - -func (x *Identifier) GetKey() string { - if x != nil { - return x.Key - } - return "" -} - -type Message struct { - state protoimpl.MessageState `protogen:"open.v1"` - Identifier *Identifier `protobuf:"bytes,1,opt,name=identifier,proto3" json:"identifier,omitempty"` - Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` - Encoding string `protobuf:"bytes,3,opt,name=encoding,proto3" json:"encoding,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *Message) Reset() { - *x = Message{} - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *Message) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Message) ProtoMessage() {} - -func (x *Message) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[1] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Message.ProtoReflect.Descriptor instead. 
-func (*Message) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{1} -} - -func (x *Message) GetIdentifier() *Identifier { - if x != nil { - return x.Identifier - } - return nil -} - -func (x *Message) GetPayload() []byte { - if x != nil { - return x.Payload - } - return nil -} - -func (x *Message) GetEncoding() string { - if x != nil { - return x.Encoding - } - return "" -} - -type StartStreamRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *StartStreamRequest) Reset() { - *x = StartStreamRequest{} - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *StartStreamRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*StartStreamRequest) ProtoMessage() {} - -func (x *StartStreamRequest) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[2] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use StartStreamRequest.ProtoReflect.Descriptor instead. -func (*StartStreamRequest) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{2} -} - -type StartStreamResponse struct { - state protoimpl.MessageState `protogen:"open.v1"` - StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *StartStreamResponse) Reset() { - *x = StartStreamResponse{} - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *StartStreamResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*StartStreamResponse) ProtoMessage() {} - -func (x *StartStreamResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[3] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use StartStreamResponse.ProtoReflect.Descriptor instead. 
-func (*StartStreamResponse) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{3} -} - -func (x *StartStreamResponse) GetStreamUuid() string { - if x != nil { - return x.StreamUuid - } - return "" -} - -type ConfigureStreamRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"` - Identifiers []*Identifier `protobuf:"bytes,2,rep,name=identifiers,proto3" json:"identifiers,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *ConfigureStreamRequest) Reset() { - *x = ConfigureStreamRequest{} - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ConfigureStreamRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ConfigureStreamRequest) ProtoMessage() {} - -func (x *ConfigureStreamRequest) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[4] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ConfigureStreamRequest.ProtoReflect.Descriptor instead. -func (*ConfigureStreamRequest) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{4} -} - -func (x *ConfigureStreamRequest) GetStreamUuid() string { - if x != nil { - return x.StreamUuid - } - return "" -} - -func (x *ConfigureStreamRequest) GetIdentifiers() []*Identifier { - if x != nil { - return x.Identifiers - } - return nil -} - -type ConfigureStreamResponse struct { - state protoimpl.MessageState `protogen:"open.v1"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *ConfigureStreamResponse) Reset() { - *x = ConfigureStreamResponse{} - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[5] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ConfigureStreamResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ConfigureStreamResponse) ProtoMessage() {} - -func (x *ConfigureStreamResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[5] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ConfigureStreamResponse.ProtoReflect.Descriptor instead. 
-func (*ConfigureStreamResponse) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{5} -} - -type StopStreamRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *StopStreamRequest) Reset() { - *x = StopStreamRequest{} - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[6] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *StopStreamRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*StopStreamRequest) ProtoMessage() {} - -func (x *StopStreamRequest) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[6] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use StopStreamRequest.ProtoReflect.Descriptor instead. -func (*StopStreamRequest) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{6} -} - -func (x *StopStreamRequest) GetStreamUuid() string { - if x != nil { - return x.StreamUuid - } - return "" -} - -type StopStreamResponse struct { - state protoimpl.MessageState `protogen:"open.v1"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *StopStreamResponse) Reset() { - *x = StopStreamResponse{} - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[7] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *StopStreamResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*StopStreamResponse) ProtoMessage() {} - -func (x *StopStreamResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[7] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use StopStreamResponse.ProtoReflect.Descriptor instead. -func (*StopStreamResponse) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{7} -} - -type ConnectStreamRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *ConnectStreamRequest) Reset() { - *x = ConnectStreamRequest{} - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[8] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ConnectStreamRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ConnectStreamRequest) ProtoMessage() {} - -func (x *ConnectStreamRequest) ProtoReflect() protoreflect.Message { - mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[8] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ConnectStreamRequest.ProtoReflect.Descriptor instead. 
-func (*ConnectStreamRequest) Descriptor() ([]byte, []int) { - return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{8} -} - -func (x *ConnectStreamRequest) GetStreamUuid() string { - if x != nil { - return x.StreamUuid - } - return "" -} - -var File_pkg_pb_data_service_data_service_proto protoreflect.FileDescriptor - -const file_pkg_pb_data_service_data_service_proto_rawDesc = "" + - "\n" + - "&pkg/pb/data_service/data_service.proto\x12\fdata_service\"\x1e\n" + - "\n" + - "Identifier\x12\x10\n" + - "\x03key\x18\x01 \x01(\tR\x03key\"y\n" + - "\aMessage\x128\n" + - "\n" + - "identifier\x18\x01 \x01(\v2\x18.data_service.IdentifierR\n" + - "identifier\x12\x18\n" + - "\apayload\x18\x02 \x01(\fR\apayload\x12\x1a\n" + - "\bencoding\x18\x03 \x01(\tR\bencoding\"\x14\n" + - "\x12StartStreamRequest\"6\n" + - "\x13StartStreamResponse\x12\x1f\n" + - "\vstream_uuid\x18\x01 \x01(\tR\n" + - "streamUuid\"u\n" + - "\x16ConfigureStreamRequest\x12\x1f\n" + - "\vstream_uuid\x18\x01 \x01(\tR\n" + - "streamUuid\x12:\n" + - "\videntifiers\x18\x02 \x03(\v2\x18.data_service.IdentifierR\videntifiers\"\x19\n" + - "\x17ConfigureStreamResponse\"4\n" + - "\x11StopStreamRequest\x12\x1f\n" + - "\vstream_uuid\x18\x01 \x01(\tR\n" + - "streamUuid\"\x14\n" + - "\x12StopStreamResponse\"7\n" + - "\x14ConnectStreamRequest\x12\x1f\n" + - "\vstream_uuid\x18\x01 \x01(\tR\n" + - "streamUuid2\x99\x02\n" + - "\x12DataServiceControl\x12R\n" + - "\vStartStream\x12 .data_service.StartStreamRequest\x1a!.data_service.StartStreamResponse\x12O\n" + - "\n" + - "StopStream\x12\x1f.data_service.StopStreamRequest\x1a .data_service.StopStreamResponse\x12^\n" + - "\x0fConfigureStream\x12$.data_service.ConfigureStreamRequest\x1a%.data_service.ConfigureStreamResponse2d\n" + - "\x14DataServiceStreaming\x12L\n" + - "\rConnectStream\x12\".data_service.ConnectStreamRequest\x1a\x15.data_service.Message0\x01BMZKgitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service;data_serviceb\x06proto3" - -var ( - file_pkg_pb_data_service_data_service_proto_rawDescOnce sync.Once - file_pkg_pb_data_service_data_service_proto_rawDescData []byte -) - -func file_pkg_pb_data_service_data_service_proto_rawDescGZIP() []byte { - file_pkg_pb_data_service_data_service_proto_rawDescOnce.Do(func() { - file_pkg_pb_data_service_data_service_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_pkg_pb_data_service_data_service_proto_rawDesc), len(file_pkg_pb_data_service_data_service_proto_rawDesc))) - }) - return file_pkg_pb_data_service_data_service_proto_rawDescData -} - -var file_pkg_pb_data_service_data_service_proto_msgTypes = make([]protoimpl.MessageInfo, 9) -var file_pkg_pb_data_service_data_service_proto_goTypes = []any{ - (*Identifier)(nil), // 0: data_service.Identifier - (*Message)(nil), // 1: data_service.Message - (*StartStreamRequest)(nil), // 2: data_service.StartStreamRequest - (*StartStreamResponse)(nil), // 3: data_service.StartStreamResponse - (*ConfigureStreamRequest)(nil), // 4: data_service.ConfigureStreamRequest - (*ConfigureStreamResponse)(nil), // 5: data_service.ConfigureStreamResponse - (*StopStreamRequest)(nil), // 6: data_service.StopStreamRequest - (*StopStreamResponse)(nil), // 7: data_service.StopStreamResponse - (*ConnectStreamRequest)(nil), // 8: data_service.ConnectStreamRequest -} -var file_pkg_pb_data_service_data_service_proto_depIdxs = []int32{ - 0, // 0: data_service.Message.identifier:type_name -> data_service.Identifier - 0, // 1: data_service.ConfigureStreamRequest.identifiers:type_name 
-> data_service.Identifier - 2, // 2: data_service.DataServiceControl.StartStream:input_type -> data_service.StartStreamRequest - 6, // 3: data_service.DataServiceControl.StopStream:input_type -> data_service.StopStreamRequest - 4, // 4: data_service.DataServiceControl.ConfigureStream:input_type -> data_service.ConfigureStreamRequest - 8, // 5: data_service.DataServiceStreaming.ConnectStream:input_type -> data_service.ConnectStreamRequest - 3, // 6: data_service.DataServiceControl.StartStream:output_type -> data_service.StartStreamResponse - 7, // 7: data_service.DataServiceControl.StopStream:output_type -> data_service.StopStreamResponse - 5, // 8: data_service.DataServiceControl.ConfigureStream:output_type -> data_service.ConfigureStreamResponse - 1, // 9: data_service.DataServiceStreaming.ConnectStream:output_type -> data_service.Message - 6, // [6:10] is the sub-list for method output_type - 2, // [2:6] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name -} - -func init() { file_pkg_pb_data_service_data_service_proto_init() } -func file_pkg_pb_data_service_data_service_proto_init() { - if File_pkg_pb_data_service_data_service_proto != nil { - return - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_pkg_pb_data_service_data_service_proto_rawDesc), len(file_pkg_pb_data_service_data_service_proto_rawDesc)), - NumEnums: 0, - NumMessages: 9, - NumExtensions: 0, - NumServices: 2, - }, - GoTypes: file_pkg_pb_data_service_data_service_proto_goTypes, - DependencyIndexes: file_pkg_pb_data_service_data_service_proto_depIdxs, - MessageInfos: file_pkg_pb_data_service_data_service_proto_msgTypes, - }.Build() - File_pkg_pb_data_service_data_service_proto = out.File - file_pkg_pb_data_service_data_service_proto_goTypes = nil - file_pkg_pb_data_service_data_service_proto_depIdxs = nil -} diff --git a/pkg/pb/data_service/data_service.proto b/pkg/pb/data_service/data_service.proto deleted file mode 100644 index 47ff69e..0000000 --- a/pkg/pb/data_service/data_service.proto +++ /dev/null @@ -1,43 +0,0 @@ -syntax = "proto3"; - -package data_service; - -option go_package = "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service;data_service"; - -service DataServiceControl { - rpc StartStream(StartStreamRequest) returns (StartStreamResponse); - rpc StopStream(StopStreamRequest) returns (StopStreamResponse); - rpc ConfigureStream(ConfigureStreamRequest) returns (ConfigureStreamResponse); -} - -service DataServiceStreaming { - rpc ConnectStream(ConnectStreamRequest) returns (stream Message); -} - -message Identifier { - string key = 1; -} - -message Pattern { - string key = 2; -} - -message Message { - Identifier identifier = 1; - bytes payload = 2; - string encoding = 3; -} - -message StartStreamRequest {} -message StartStreamResponse { string stream_uuid = 1; } - -message ConfigureStreamRequest { - string stream_uuid = 1; - repeated Pattern patterns = 2; -} -message ConfigureStreamResponse {} - -message StopStreamRequest { string stream_uuid = 1; } -message StopStreamResponse {} - -message ConnectStreamRequest { string stream_uuid = 1; } diff --git a/pkg/pb/data_service/data_service_grpc.pb.go b/pkg/pb/data_service/data_service_grpc.pb.go deleted file mode 100644 index ab580a0..0000000 --- 
a/pkg/pb/data_service/data_service_grpc.pb.go +++ /dev/null @@ -1,303 +0,0 @@ -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.5.1 -// - protoc v6.32.0 -// source: pkg/pb/data_service/data_service.proto - -package data_service - -import ( - context "context" - - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.64.0 or later. -const _ = grpc.SupportPackageIsVersion9 - -const ( - DataServiceControl_StartStream_FullMethodName = "/data_service.DataServiceControl/StartStream" - DataServiceControl_StopStream_FullMethodName = "/data_service.DataServiceControl/StopStream" - DataServiceControl_ConfigureStream_FullMethodName = "/data_service.DataServiceControl/ConfigureStream" -) - -// DataServiceControlClient is the client API for DataServiceControl service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type DataServiceControlClient interface { - StartStream(ctx context.Context, in *StartStreamRequest, opts ...grpc.CallOption) (*StartStreamResponse, error) - StopStream(ctx context.Context, in *StopStreamRequest, opts ...grpc.CallOption) (*StopStreamResponse, error) - ConfigureStream(ctx context.Context, in *ConfigureStreamRequest, opts ...grpc.CallOption) (*ConfigureStreamResponse, error) -} - -type dataServiceControlClient struct { - cc grpc.ClientConnInterface -} - -func NewDataServiceControlClient(cc grpc.ClientConnInterface) DataServiceControlClient { - return &dataServiceControlClient{cc} -} - -func (c *dataServiceControlClient) StartStream(ctx context.Context, in *StartStreamRequest, opts ...grpc.CallOption) (*StartStreamResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(StartStreamResponse) - err := c.cc.Invoke(ctx, DataServiceControl_StartStream_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *dataServiceControlClient) StopStream(ctx context.Context, in *StopStreamRequest, opts ...grpc.CallOption) (*StopStreamResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(StopStreamResponse) - err := c.cc.Invoke(ctx, DataServiceControl_StopStream_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *dataServiceControlClient) ConfigureStream(ctx context.Context, in *ConfigureStreamRequest, opts ...grpc.CallOption) (*ConfigureStreamResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(ConfigureStreamResponse) - err := c.cc.Invoke(ctx, DataServiceControl_ConfigureStream_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - -// DataServiceControlServer is the server API for DataServiceControl service. -// All implementations must embed UnimplementedDataServiceControlServer -// for forward compatibility. 
-type DataServiceControlServer interface { - StartStream(context.Context, *StartStreamRequest) (*StartStreamResponse, error) - StopStream(context.Context, *StopStreamRequest) (*StopStreamResponse, error) - ConfigureStream(context.Context, *ConfigureStreamRequest) (*ConfigureStreamResponse, error) - mustEmbedUnimplementedDataServiceControlServer() -} - -// UnimplementedDataServiceControlServer must be embedded to have -// forward compatible implementations. -// -// NOTE: this should be embedded by value instead of pointer to avoid a nil -// pointer dereference when methods are called. -type UnimplementedDataServiceControlServer struct{} - -func (UnimplementedDataServiceControlServer) StartStream(context.Context, *StartStreamRequest) (*StartStreamResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method StartStream not implemented") -} -func (UnimplementedDataServiceControlServer) StopStream(context.Context, *StopStreamRequest) (*StopStreamResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method StopStream not implemented") -} -func (UnimplementedDataServiceControlServer) ConfigureStream(context.Context, *ConfigureStreamRequest) (*ConfigureStreamResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ConfigureStream not implemented") -} -func (UnimplementedDataServiceControlServer) mustEmbedUnimplementedDataServiceControlServer() {} -func (UnimplementedDataServiceControlServer) testEmbeddedByValue() {} - -// UnsafeDataServiceControlServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to DataServiceControlServer will -// result in compilation errors. -type UnsafeDataServiceControlServer interface { - mustEmbedUnimplementedDataServiceControlServer() -} - -func RegisterDataServiceControlServer(s grpc.ServiceRegistrar, srv DataServiceControlServer) { - // If the following call pancis, it indicates UnimplementedDataServiceControlServer was - // embedded by pointer and is nil. This will cause panics if an - // unimplemented method is ever invoked, so we test this at initialization - // time to prevent it from happening at runtime later due to I/O. 
- if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { - t.testEmbeddedByValue() - } - s.RegisterService(&DataServiceControl_ServiceDesc, srv) -} - -func _DataServiceControl_StartStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(StartStreamRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(DataServiceControlServer).StartStream(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: DataServiceControl_StartStream_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(DataServiceControlServer).StartStream(ctx, req.(*StartStreamRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _DataServiceControl_StopStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(StopStreamRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(DataServiceControlServer).StopStream(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: DataServiceControl_StopStream_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(DataServiceControlServer).StopStream(ctx, req.(*StopStreamRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _DataServiceControl_ConfigureStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ConfigureStreamRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(DataServiceControlServer).ConfigureStream(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: DataServiceControl_ConfigureStream_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(DataServiceControlServer).ConfigureStream(ctx, req.(*ConfigureStreamRequest)) - } - return interceptor(ctx, in, info, handler) -} - -// DataServiceControl_ServiceDesc is the grpc.ServiceDesc for DataServiceControl service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var DataServiceControl_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "data_service.DataServiceControl", - HandlerType: (*DataServiceControlServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "StartStream", - Handler: _DataServiceControl_StartStream_Handler, - }, - { - MethodName: "StopStream", - Handler: _DataServiceControl_StopStream_Handler, - }, - { - MethodName: "ConfigureStream", - Handler: _DataServiceControl_ConfigureStream_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "pkg/pb/data_service/data_service.proto", -} - -const ( - DataServiceStreaming_ConnectStream_FullMethodName = "/data_service.DataServiceStreaming/ConnectStream" -) - -// DataServiceStreamingClient is the client API for DataServiceStreaming service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. 
-type DataServiceStreamingClient interface { - ConnectStream(ctx context.Context, in *ConnectStreamRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Message], error) -} - -type dataServiceStreamingClient struct { - cc grpc.ClientConnInterface -} - -func NewDataServiceStreamingClient(cc grpc.ClientConnInterface) DataServiceStreamingClient { - return &dataServiceStreamingClient{cc} -} - -func (c *dataServiceStreamingClient) ConnectStream(ctx context.Context, in *ConnectStreamRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Message], error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - stream, err := c.cc.NewStream(ctx, &DataServiceStreaming_ServiceDesc.Streams[0], DataServiceStreaming_ConnectStream_FullMethodName, cOpts...) - if err != nil { - return nil, err - } - x := &grpc.GenericClientStream[ConnectStreamRequest, Message]{ClientStream: stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. -type DataServiceStreaming_ConnectStreamClient = grpc.ServerStreamingClient[Message] - -// DataServiceStreamingServer is the server API for DataServiceStreaming service. -// All implementations must embed UnimplementedDataServiceStreamingServer -// for forward compatibility. -type DataServiceStreamingServer interface { - ConnectStream(*ConnectStreamRequest, grpc.ServerStreamingServer[Message]) error - mustEmbedUnimplementedDataServiceStreamingServer() -} - -// UnimplementedDataServiceStreamingServer must be embedded to have -// forward compatible implementations. -// -// NOTE: this should be embedded by value instead of pointer to avoid a nil -// pointer dereference when methods are called. -type UnimplementedDataServiceStreamingServer struct{} - -func (UnimplementedDataServiceStreamingServer) ConnectStream(*ConnectStreamRequest, grpc.ServerStreamingServer[Message]) error { - return status.Errorf(codes.Unimplemented, "method ConnectStream not implemented") -} -func (UnimplementedDataServiceStreamingServer) mustEmbedUnimplementedDataServiceStreamingServer() {} -func (UnimplementedDataServiceStreamingServer) testEmbeddedByValue() {} - -// UnsafeDataServiceStreamingServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to DataServiceStreamingServer will -// result in compilation errors. -type UnsafeDataServiceStreamingServer interface { - mustEmbedUnimplementedDataServiceStreamingServer() -} - -func RegisterDataServiceStreamingServer(s grpc.ServiceRegistrar, srv DataServiceStreamingServer) { - // If the following call pancis, it indicates UnimplementedDataServiceStreamingServer was - // embedded by pointer and is nil. This will cause panics if an - // unimplemented method is ever invoked, so we test this at initialization - // time to prevent it from happening at runtime later due to I/O. 
- if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { - t.testEmbeddedByValue() - } - s.RegisterService(&DataServiceStreaming_ServiceDesc, srv) -} - -func _DataServiceStreaming_ConnectStream_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(ConnectStreamRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(DataServiceStreamingServer).ConnectStream(m, &grpc.GenericServerStream[ConnectStreamRequest, Message]{ServerStream: stream}) -} - -// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. -type DataServiceStreaming_ConnectStreamServer = grpc.ServerStreamingServer[Message] - -// DataServiceStreaming_ServiceDesc is the grpc.ServiceDesc for DataServiceStreaming service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var DataServiceStreaming_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "data_service.DataServiceStreaming", - HandlerType: (*DataServiceStreamingServer)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "ConnectStream", - Handler: _DataServiceStreaming_ConnectStream_Handler, - ServerStreams: true, - }, - }, - Metadata: "pkg/pb/data_service/data_service.proto", -} diff --git a/services/controller_service/cmd/main.go b/services/controller_service/cmd/main.go deleted file mode 100644 index 1d619dd..0000000 --- a/services/controller_service/cmd/main.go +++ /dev/null @@ -1 +0,0 @@ -package cmd diff --git a/services/data_service/Dockerfile b/services/data_service/Dockerfile deleted file mode 100644 index 0385164..0000000 --- a/services/data_service/Dockerfile +++ /dev/null @@ -1,21 +0,0 @@ -# ---- Builder ---- -FROM golang:1.24-alpine AS builder - -ENV CGO_ENABLED=0 GOOS=linux -WORKDIR /app - -COPY go.mod go.sum ./ -RUN go mod download - -COPY . . 
-RUN --mount=type=cache,target=/root/.cache/go-build \
-    --mount=type=cache,target=/go/pkg/mod \
-    go build -trimpath -ldflags="-s -w" \
-    -o /out/data-service ./services/data_service/cmd/data_service
-
-# ---- Runtime ----
-FROM gcr.io/distroless/static:nonroot
-EXPOSE 50051 50052 6000
-COPY --from=builder /out/data-service /bin/data-service
-USER nonroot:nonroot
-ENTRYPOINT ["/bin/data-service"]
diff --git a/services/data_service/cmd/data_service/main.go b/services/data_service/cmd/data_service/main.go
deleted file mode 100644
index 49e8254..0000000
--- a/services/data_service/cmd/data_service/main.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package main
-
-import (
-	"log/slog"
-	"os"
-	"time"
-
-	"github.com/lmittmann/tint"
-	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager"
-	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router"
-	"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/worker"
-)
-
-func initLogger() *slog.Logger {
-	level := parseLevel(env("LOG_LEVEL", "debug"))
-	if env("LOG_FORMAT", "pretty") == "json" {
-		return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
-			Level: level,
-		}))
-	}
-	return slog.New(tint.NewHandler(os.Stdout, &tint.Options{
-		Level:      level,
-		TimeFormat: time.RFC3339Nano,
-		NoColor:    os.Getenv("NO_COLOR") != "",
-	}))
-}
-
-func parseLevel(s string) slog.Level {
-	switch s {
-	case "debug":
-		return slog.LevelDebug
-	case "warn":
-		return slog.LevelWarn
-	case "error":
-		return slog.LevelError
-	default:
-		return slog.LevelInfo
-	}
-}
-
-func env(k, def string) string {
-	if v := os.Getenv(k); v != "" {
-		return v
-	}
-	return def
-}
-
-func main() {
-	slog.SetDefault(initLogger())
-	slog.Info("starting", "svc", "data-service")
-
-	// Setup
-	wr := worker.NewRegistry()
-	r, _ := router.NewRouter("actor", 2048, 512)
-	_ = manager.NewManager(r, wr)
-
-	select {}
-}
diff --git a/services/data_service/internal/config/config.go b/services/data_service/internal/config/config.go
deleted file mode 100644
index d912156..0000000
--- a/services/data_service/internal/config/config.go
+++ /dev/null
@@ -1 +0,0 @@
-package config
diff --git a/services/data_service/internal/domain/identifier.go b/services/data_service/internal/domain/identifier.go
deleted file mode 100644
index 3e62b55..0000000
--- a/services/data_service/internal/domain/identifier.go
+++ /dev/null
@@ -1,261 +0,0 @@
-package domain
-
-import (
-	"errors"
-	"sort"
-	"strings"
-)
-
-var ErrBadIdentifier = errors.New("identifier: invalid format")
-
-// Identifier is an immutable canonical key.
-// Canonical form: "namespace::tag1[] . tag2[k=v;foo=bar] . tag3[]"
-type Identifier struct{ key string }
-
-// NewIdentifier builds a canonical key with strict validation.
-// Tags and param keys are sorted. Tags with no params emit "name[]".
-// Rejects on: empty namespace, bad tag names, bad keys/values.
-func NewIdentifier(namespace string, tags map[string]map[string]string) (Identifier, error) {
-	ns := strings.TrimSpace(namespace)
-	if !validNamespace(ns) {
-		return Identifier{}, ErrBadIdentifier
-	}
-
-	// Validate and copy to protect immutability and reject dup keys early. 
- clean := make(map[string]map[string]string, len(tags)) - for name, params := range tags { - n := strings.TrimSpace(name) - if !validIDTagName(n) { - return Identifier{}, ErrBadIdentifier - } - if _, exists := clean[n]; exists { - // impossible via map input, but keep the intent explicit - return Identifier{}, ErrBadIdentifier - } - if len(params) == 0 { - clean[n] = map[string]string{} - continue - } - dst := make(map[string]string, len(params)) - for k, v := range params { - kk := strings.TrimSpace(k) - vv := strings.TrimSpace(v) - if !validParamKey(kk) || !validIDParamValue(vv) { - return Identifier{}, ErrBadIdentifier - } - if _, dup := dst[kk]; dup { - return Identifier{}, ErrBadIdentifier - } - dst[kk] = vv - } - clean[n] = dst - } - - var b strings.Builder - // Rough capacity hint. - b.Grow(len(ns) + 2 + 16*len(clean) + 32) - - // namespace - b.WriteString(ns) - b.WriteString("::") - - // stable tag order - names := make([]string, 0, len(clean)) - for n := range clean { - names = append(names, n) - } - sort.Strings(names) - - for i, name := range names { - if i > 0 { - b.WriteByte('.') - } - b.WriteString(name) - - params := clean[name] - if len(params) == 0 { - b.WriteString("[]") - continue - } - - // stable param order - keys := make([]string, 0, len(params)) - for k := range params { - keys = append(keys, k) - } - sort.Strings(keys) - - b.WriteByte('[') - for j, k := range keys { - if j > 0 { - b.WriteByte(';') - } - b.WriteString(k) - b.WriteByte('=') - b.WriteString(params[k]) - } - b.WriteByte(']') - } - - return Identifier{key: b.String()}, nil -} - -// NewIdentifierFromRaw wraps a raw key without validation. -func NewIdentifierFromRaw(raw string) Identifier { return Identifier{key: raw} } - -// Key returns the canonical key string. -func (id Identifier) Key() string { return id.key } - -// Parse returns namespace and tags from Key. -// Accepts "tag" (bare) as "tag[]". Emits "name[]"/"[k=v;...]". First token wins on duplicates. 
-func (id Identifier) Parse() (string, map[string]map[string]string, error) { - k := id.key - - // namespace - i := strings.Index(k, "::") - if i <= 0 { - return "", nil, ErrBadIdentifier - } - ns := strings.TrimSpace(k[:i]) - if !validNamespace(ns) { - return "", nil, ErrBadIdentifier - } - raw := k[i+2:] - - tags := make(map[string]map[string]string, 8) - if raw == "" { - return ns, tags, nil - } - - for tok := range strings.SplitSeq(raw, ".") { - tok = strings.TrimSpace(tok) - if tok == "" { - continue - } - - lb := strings.IndexByte(tok, '[') - if lb == -1 { - // bare tag => empty params - name := strings.TrimSpace(tok) - if !validIDTagName(name) { - return "", nil, ErrBadIdentifier - } - if _, exists := tags[name]; !exists { - tags[name] = map[string]string{} - } - continue - } - - rb := strings.LastIndexByte(tok, ']') - if rb == -1 || rb < lb || rb != len(tok)-1 { - return "", nil, ErrBadIdentifier - } - - name := strings.TrimSpace(tok[:lb]) - if !validIDTagName(name) { - return "", nil, ErrBadIdentifier - } - // first tag wins - if _, exists := tags[name]; exists { - continue - } - - body := tok[lb+1 : rb] - // forbid outer whitespace like "[ x=1 ]" - if body != strings.TrimSpace(body) { - return "", nil, ErrBadIdentifier - } - if body == "" { - tags[name] = map[string]string{} - continue - } - - // parse "k=v;foo=bar" - params := make(map[string]string, 4) - for pair := range strings.SplitSeq(body, ";") { - pair = strings.TrimSpace(pair) - if pair == "" { - continue - } - kv := strings.SplitN(pair, "=", 2) - if len(kv) != 2 { - return "", nil, ErrBadIdentifier - } - key := strings.TrimSpace(kv[0]) - val := strings.TrimSpace(kv[1]) - if !validParamKey(key) || !validIDParamValue(val) || val == "" { - return "", nil, ErrBadIdentifier - } - // first key wins - if _, dup := params[key]; !dup { - params[key] = val - } - } - tags[name] = params - } - - return ns, tags, nil -} - -// --- validation helpers --- - -func validNamespace(s string) bool { - if s == "" { - return false - } - for _, r := range s { - switch r { - case '[', ']', ':': - return false - } - if isSpace(r) { - return false - } - } - return true -} - -func validIDTagName(s string) bool { - if s == "" { - return false - } - for _, r := range s { - switch r { - case '[', ']', '.', ':': // added ':' - return false - } - if isSpace(r) { - return false - } - } - return true -} - -func validParamKey(s string) bool { - if s == "" { - return false - } - for _, r := range s { - switch r { - case '[', ']', ';', '=': - return false - } - if isSpace(r) { - return false - } - } - return true -} - -func validIDParamValue(s string) bool { - // allow spaces; forbid only bracket, pair, and kv delimiters - for _, r := range s { - switch r { - case '[', ']', ';', '=': - return false - } - } - return true -} - -func isSpace(r rune) bool { return r == ' ' || r == '\t' || r == '\n' || r == '\r' } diff --git a/services/data_service/internal/domain/identifier_test.go b/services/data_service/internal/domain/identifier_test.go deleted file mode 100644 index 130cff5..0000000 --- a/services/data_service/internal/domain/identifier_test.go +++ /dev/null @@ -1,141 +0,0 @@ -package domain - -import ( - "reflect" - "testing" -) - -func TestNewIdentifier_CanonicalAndValidation(t *testing.T) { - t.Run("canonical ordering and formatting", func(t *testing.T) { - id, err := NewIdentifier("ns", map[string]map[string]string{ - "b": {"y": "2", "x": "1"}, - "a": {}, - }) - if err != nil { - t.Fatalf("unexpected err: %v", err) - } - got := id.Key() - want := 
"ns::a[].b[x=1;y=2]" - if got != want { - t.Fatalf("key mismatch\ngot: %q\nwant: %q", got, want) - } - }) - - t.Run("trim whitespace and validate", func(t *testing.T) { - id, err := NewIdentifier(" ns ", map[string]map[string]string{ - " tag ": {" k ": " v "}, - }) - if err != nil { - t.Fatalf("unexpected err: %v", err) - } - if id.Key() != "ns::tag[k=v]" { - t.Fatalf("unexpected canonical: %q", id.Key()) - } - }) - - t.Run("reject bad namespace", func(t *testing.T) { - cases := []string{"", "a:b", "a[b]"} - for _, ns := range cases { - if _, err := NewIdentifier(ns, nil); err == nil { - t.Fatalf("expected error for ns=%q", ns) - } - } - }) - - t.Run("reject bad tag names", func(t *testing.T) { - for _, name := range []string{"", "bad.", "bad[", "bad]", "a:b"} { - _, err := NewIdentifier("ns", map[string]map[string]string{ - name: {}, - }) - if err == nil { - t.Fatalf("expected error for tag name %q", name) - } - } - }) - - t.Run("reject bad param keys and values", func(t *testing.T) { - badKeys := []string{"", "k;", "k[", "k]", "k="} - for _, k := range badKeys { - if _, err := NewIdentifier("ns", map[string]map[string]string{ - "t": {k: "ok"}, - }); err == nil { - t.Fatalf("expected error for bad key %q", k) - } - } - for _, v := range []string{"bad;", "bad[", "bad]", "a=b"} { - if _, err := NewIdentifier("ns", map[string]map[string]string{ - "t": {"k": v}, - }); err == nil { - t.Fatalf("expected error for bad value %q", v) - } - } - }) -} - -func TestIdentifier_Parse_RoundTripAndTolerance(t *testing.T) { - t.Run("round trip from constructor", func(t *testing.T) { - id, err := NewIdentifier("ns", map[string]map[string]string{ - "a": {}, - "b": {"x": "1", "y": "2"}, - }) - if err != nil { - t.Fatal(err) - } - ns, tags, err := id.Parse() - if err != nil { - t.Fatal(err) - } - if ns != "ns" { - t.Fatalf("ns: got %q", ns) - } - want := map[string]map[string]string{"a": {}, "b": {"x": "1", "y": "2"}} - if !reflect.DeepEqual(tags, want) { - t.Fatalf("tags mismatch\ngot: %#v\nwant: %#v", tags, want) - } - }) - - t.Run("parse bare tag as empty params", func(t *testing.T) { - id := NewIdentifierFromRaw("ns::a.b[]") - _, tags, err := id.Parse() - if err != nil { - t.Fatal(err) - } - if len(tags["a"]) != 0 || len(tags["b"]) != 0 { - t.Fatalf("expected empty params, got %#v", tags) - } - }) - - t.Run("first token wins on duplicate tags and params", func(t *testing.T) { - id := NewIdentifierFromRaw("ns::t[x=1;y=2].t[x=9].u[k=1;k=2]") - _, tags, err := id.Parse() - if err != nil { - t.Fatal(err) - } - if tags["t"]["x"] != "1" || tags["t"]["y"] != "2" { - t.Fatalf("first tag should win, got %#v", tags["t"]) - } - if tags["u"]["k"] != "1" { - t.Fatalf("first param key should win, got %#v", tags["u"]) - } - }) - - t.Run("reject malformed", func(t *testing.T) { - bads := []string{ - "", "no_ns", "ns:onecolon", "::missingns::tag[]", "ns::tag[", "ns::tag]", "ns::[]", - "ns::tag[]junk", "ns::tag[x=1;y]", "ns::tag[=1]", "ns::tag[ x=1 ]", // spaces inside keys are rejected - } - for _, s := range bads { - _, _, err := NewIdentifierFromRaw(s).Parse() - if err == nil { - t.Fatalf("expected parse error for %q", s) - } - } - }) -} - -func TestIdentifier_NewThenParse_ForbidsColonInTagName(t *testing.T) { - _, err := NewIdentifier("ns", map[string]map[string]string{"a:b": {}}) - if err == nil { - t.Fatal("expected error due to ':' in tag name") - } -} diff --git a/services/data_service/internal/domain/message.go b/services/data_service/internal/domain/message.go deleted file mode 100644 index 11ee6fc..0000000 --- 
a/services/data_service/internal/domain/message.go +++ /dev/null @@ -1,7 +0,0 @@ -// Package domain contains all key domain types -package domain - -type Message struct { - Identifier Identifier - Payload []byte -} diff --git a/services/data_service/internal/domain/pattern.go b/services/data_service/internal/domain/pattern.go deleted file mode 100644 index 15aa013..0000000 --- a/services/data_service/internal/domain/pattern.go +++ /dev/null @@ -1,347 +0,0 @@ -package domain - -import ( - "errors" - "sort" - "strings" -) - -var ErrBadPattern = errors.New("pattern: invalid format") - -// ParamMatchKind selects how a tag's params must match. -type ParamMatchKind uint8 - -const ( - MatchAny ParamMatchKind = iota // "tag" or "tag[*]" - MatchNone // "tag[]" - MatchExact // "tag[k=v;...]" -) - -// TagSpec is the per-tag constraint. -type TagSpec struct { - Kind ParamMatchKind - Params map[string]string // only for MatchExact; keys sorted on emit -} - -// Pattern is an immutable canonical key. -// Canonical form (tags unordered in input, sorted on emit): -// -// namespace::tag1[]:tag2[*]:tag3[k=v;foo=bar].* // superset -// namespace::tag1[]:tag2[*]:tag3[k=v;foo=bar] // exact set -type Pattern struct{ key string } - -// NewPattern builds the canonical key from structured input with strict validation. -// If a tag name equals "*" it sets superset and omits it from canonical tags. -func NewPattern(namespace string, tags map[string]TagSpec, superset bool) (Pattern, error) { - ns := strings.TrimSpace(namespace) - if !validNamespace(ns) { - return Pattern{}, ErrBadPattern - } - - // Validate tags and normalize. - clean := make(map[string]TagSpec, len(tags)) - for name, spec := range tags { - n := strings.TrimSpace(name) - if n == "*" { - superset = true - continue - } - if !validPatternTagName(n) { - return Pattern{}, ErrBadPattern - } - switch spec.Kind { - case MatchAny: - clean[n] = TagSpec{Kind: MatchAny} - case MatchNone: - clean[n] = TagSpec{Kind: MatchNone} - case MatchExact: - if len(spec.Params) == 0 { - // Treat empty exact as none. - clean[n] = TagSpec{Kind: MatchNone} - continue - } - dst := make(map[string]string, len(spec.Params)) - for k, v := range spec.Params { - kk := strings.TrimSpace(k) - vv := strings.TrimSpace(v) - if !validParamKey(kk) || !validPatternParamValue(vv) { - return Pattern{}, ErrBadPattern - } - if _, dup := dst[kk]; dup { - return Pattern{}, ErrBadPattern - } - dst[kk] = vv - } - clean[n] = TagSpec{Kind: MatchExact, Params: dst} - default: - // Reject unknown kinds rather than silently defaulting. 
- return Pattern{}, ErrBadPattern - } - } - - var b strings.Builder - b.Grow(len(ns) + 2 + 16*len(clean) + 32 + 2) - - b.WriteString(ns) - b.WriteString("::") - - names := make([]string, 0, len(clean)) - for n := range clean { - names = append(names, n) - } - sort.Strings(names) - - for i, name := range names { - if i > 0 { - b.WriteByte(':') - } - b.WriteString(name) - - spec := clean[name] - switch spec.Kind { - case MatchAny: - b.WriteString("[*]") - case MatchNone: - b.WriteString("[]") - case MatchExact: - keys := make([]string, 0, len(spec.Params)) - for k := range spec.Params { - keys = append(keys, k) - } - sort.Strings(keys) - b.WriteByte('[') - for j, k := range keys { - if j > 0 { - b.WriteByte(';') - } - b.WriteString(k) - b.WriteByte('=') - b.WriteString(spec.Params[k]) - } - b.WriteByte(']') - } - } - - if superset { - if len(names) > 0 { - b.WriteByte('.') - } - b.WriteByte('*') - } - return Pattern{key: b.String()}, nil -} - -// NewPatternFromRaw wraps a raw key without validation. -func NewPatternFromRaw(raw string) Pattern { return Pattern{key: raw} } - -// Key returns the canonical key string. -func (p Pattern) Key() string { return p.key } - -// Parse returns namespace, tag specs, and superset flag. -// Accepts tokens: "tag", "tag[*]", "tag[]", "tag[k=v;...]". Also accepts ".*" suffix or a ":*" token anywhere. -// First token wins on duplicate tag names; first key wins on duplicate params. -func (p Pattern) Parse() (string, map[string]TagSpec, bool, error) { - k := p.key - - // namespace - i := strings.Index(k, "::") - if i <= 0 { - return "", nil, false, ErrBadPattern - } - ns := strings.TrimSpace(k[:i]) - if !validNamespace(ns) { - return "", nil, false, ErrBadPattern - } - raw := k[i+2:] - - // suffix superset ".*" - superset := false - if strings.HasSuffix(raw, ".*") { - superset = true - raw = raw[:len(raw)-2] - } - - specs := make(map[string]TagSpec, 8) - if raw == "" { - return ns, specs, superset, nil - } - - for tok := range strings.SplitSeq(raw, ":") { - tok = strings.TrimSpace(tok) - if tok == "" { - continue - } - if tok == "*" { - superset = true - continue - } - - lb := strings.IndexByte(tok, '[') - if lb == -1 { - name := tok - if !validPatternTagName(name) { - return "", nil, false, ErrBadPattern - } - if _, exists := specs[name]; !exists { - specs[name] = TagSpec{Kind: MatchAny} - } - continue - } - - rb := strings.LastIndexByte(tok, ']') - if rb == -1 || rb < lb || rb != len(tok)-1 { - return "", nil, false, ErrBadPattern - } - - name := strings.TrimSpace(tok[:lb]) - if !validPatternTagName(name) { - return "", nil, false, ErrBadPattern - } - // first tag wins - if _, exists := specs[name]; exists { - continue - } - - rawBody := tok[lb+1 : rb] - // forbid outer whitespace like "[ x=1 ]" - if rawBody != strings.TrimSpace(rawBody) { - return "", nil, false, ErrBadPattern - } - body := strings.TrimSpace(rawBody) - - switch body { - case "": - specs[name] = TagSpec{Kind: MatchNone} - case "*": - specs[name] = TagSpec{Kind: MatchAny} - default: - params := make(map[string]string, 4) - for pair := range strings.SplitSeq(body, ";") { - pair = strings.TrimSpace(pair) - if pair == "" { - continue - } - kv := strings.SplitN(pair, "=", 2) - if len(kv) != 2 { - return "", nil, false, ErrBadPattern - } - key := strings.TrimSpace(kv[0]) - val := strings.TrimSpace(kv[1]) - if !validParamKey(key) || !validPatternParamValue(val) || val == "" { - return "", nil, false, ErrBadPattern - } - // first key wins - if _, dup := params[key]; !dup { - params[key] = val - } - } - 
specs[name] = TagSpec{Kind: MatchExact, Params: params} - } - } - - return ns, specs, superset, nil -} - -// Equal compares canonical keys. -func (p Pattern) Equal(q Pattern) bool { return p.key == q.key } - -// CompiledPattern is a parsed pattern optimized for matching. -type CompiledPattern struct { - ns string - superset bool - specs map[string]TagSpec -} - -// Compile parses and returns a compiled form. -func (p Pattern) Compile() (CompiledPattern, error) { - ns, specs, sup, err := p.Parse() - if err != nil { - return CompiledPattern{}, err - } - return CompiledPattern{ns: ns, specs: specs, superset: sup}, nil -} - -// Parse on CompiledPattern returns the structured contents without error. -func (cp CompiledPattern) Parse() (namespace string, tags map[string]TagSpec, superset bool) { - return cp.ns, cp.specs, cp.superset -} - -// Match parses id and tests it against the pattern. -// Returns false on parse error. -func (p Pattern) Match(id Identifier) bool { - cp, err := p.Compile() - if err != nil { - return false - } - return cp.Match(id) -} - -// Match tests id against the compiled pattern. -func (cp CompiledPattern) Match(id Identifier) bool { - ns, tags, err := id.Parse() - if err != nil || ns != cp.ns { - return false - } - - // All pattern tags must be satisfied. - for name, spec := range cp.specs { - params, ok := tags[name] - if !ok { - return false - } - switch spec.Kind { - case MatchAny: - // any or none is fine - case MatchNone: - if len(params) != 0 { - return false - } - case MatchExact: - if len(params) != len(spec.Params) { - return false - } - for k, v := range spec.Params { - if params[k] != v { - return false - } - } - default: - return false - } - } - - // If exact-set match, forbid extra tags. - if !cp.superset && len(tags) != len(cp.specs) { - return false - } - return true -} - -// --- validation helpers --- - -func validPatternTagName(s string) bool { - if s == "" || s == "*" { - return false - } - for _, r := range s { - switch r { - case '[', ']', ':': - return false - } - if isSpace(r) { - return false - } - } - return true -} - -func validPatternParamValue(s string) bool { - // allow spaces; forbid only bracket, pair, and kv delimiters - for _, r := range s { - switch r { - case '[', ']', ';', '=': - return false - } - } - return true -} diff --git a/services/data_service/internal/domain/pattern_test.go b/services/data_service/internal/domain/pattern_test.go deleted file mode 100644 index 2132212..0000000 --- a/services/data_service/internal/domain/pattern_test.go +++ /dev/null @@ -1,209 +0,0 @@ -package domain - -import ( - "reflect" - "testing" -) - -func TestNewPattern_Canonical_And_Superset(t *testing.T) { - t.Run("canonical ordering and kinds", func(t *testing.T) { - p, err := NewPattern("ns", map[string]TagSpec{ - "b": {Kind: MatchExact, Params: map[string]string{"y": "2", "x": "1"}}, - "a": {Kind: MatchNone}, - "c": {Kind: MatchAny}, - }, false) - if err != nil { - t.Fatal(err) - } - if got, want := p.Key(), "ns::a[]:b[x=1;y=2]:c[*]"; got != want { - t.Fatalf("got %q want %q", got, want) - } - }) - - t.Run("superset via flag", func(t *testing.T) { - p, err := NewPattern("ns", map[string]TagSpec{"a": {Kind: MatchNone}}, true) - if err != nil { - t.Fatal(err) - } - if got, want := p.Key(), "ns::a[].*"; got != want { - t.Fatalf("got %q want %q", got, want) - } - }) - - t.Run("superset via '*' tag anywhere", func(t *testing.T) { - p, err := NewPattern("ns", map[string]TagSpec{ - "*": {Kind: MatchAny}, // triggers superset; omitted from canonical - "a": 
{Kind: MatchNone}, - }, false) - if err != nil { - t.Fatal(err) - } - if got, want := p.Key(), "ns::a[].*"; got != want { - t.Fatalf("got %q want %q", got, want) - } - }) - - t.Run("trim and validate", func(t *testing.T) { - p, err := NewPattern(" ns ", map[string]TagSpec{ - " tag ": {Kind: MatchAny}, - }, false) - if err != nil { - t.Fatal(err) - } - if p.Key() != "ns::tag[*]" { - t.Fatalf("unexpected canonical: %q", p.Key()) - } - }) - - t.Run("reject invalid inputs", func(t *testing.T) { - _, err := NewPattern("", nil, false) - if err == nil { - t.Fatal("expected error for empty namespace") - } - _, err = NewPattern("ns", map[string]TagSpec{"": {Kind: MatchAny}}, false) - if err == nil { - t.Fatal("expected error for empty tag") - } - _, err = NewPattern("ns", map[string]TagSpec{"bad:": {Kind: MatchAny}}, false) - if err == nil { - t.Fatal("expected error for ':' in tag") - } - _, err = NewPattern("ns", map[string]TagSpec{"a": {Kind: MatchExact, Params: map[string]string{"": "1"}}}, false) - if err == nil { - t.Fatal("expected error for empty param key") - } - _, err = NewPattern("ns", map[string]TagSpec{"a": {Kind: MatchExact, Params: map[string]string{"k": "bad;val"}}}, false) - if err == nil { - t.Fatal("expected error for bad param value") - } - }) - - t.Run("MatchExact with empty params downgrades to []", func(t *testing.T) { - // Behavior matches current constructor: empty exact => MatchNone - p, err := NewPattern("ns", map[string]TagSpec{ - "a": {Kind: MatchExact, Params: map[string]string{}}, - }, false) - if err != nil { - t.Fatal(err) - } - if p.Key() != "ns::a[]" { - t.Fatalf("unexpected canonical for empty exact: %q", p.Key()) - } - }) -} - -func TestPattern_Parse_Tokens_And_SupersetRecognition(t *testing.T) { - t.Run("accept :* token and .*", func(t *testing.T) { - ns, specs, sup, err := NewPatternFromRaw("ns::a[]:*:b[*]").Parse() - if err != nil { - t.Fatal(err) - } - if ns != "ns" || !sup { - t.Fatalf("ns=%q sup=%v", ns, sup) - } - if specs["a"].Kind != MatchNone || specs["b"].Kind != MatchAny { - t.Fatalf("unexpected specs: %#v", specs) - } - - _, specs2, sup2, err := NewPatternFromRaw("ns::a[]:b[*].*").Parse() - if err != nil || !sup2 { - t.Fatalf("parse superset suffix failed: err=%v sup=%v", err, sup2) - } - if !reflect.DeepEqual(specs, specs2) { - t.Fatalf("specs mismatch between forms") - } - }) - - t.Run("first-wins on duplicate tags and params", func(t *testing.T) { - _, specs, sup, err := NewPatternFromRaw("ns::t[x=1;y=2]:t[*]:u[k=1;k=2]").Parse() - if err != nil || sup { - t.Fatalf("err=%v sup=%v", err, sup) - } - if specs["t"].Kind != MatchExact || specs["t"].Params["x"] != "1" { - t.Fatalf("first tag should win: %#v", specs["t"]) - } - if specs["u"].Params["k"] != "1" { - t.Fatalf("first param key should win: %#v", specs["u"]) - } - }) - - t.Run("reject malformed", func(t *testing.T) { - bads := []string{ - "", "no_ns", "ns:onecolon", "::missingns::tag[]", - "ns::tag[", "ns::tag]", "ns::[]", "ns::tag[]junk", - "ns::a[=1]", "ns::a[x=]", "ns::a[ x=1 ]", - } - for _, s := range bads { - _, _, _, err := NewPatternFromRaw(s).Parse() - if err == nil { - t.Fatalf("expected parse error for %q", s) - } - } - }) -} - -func TestPattern_Match_Matrix(t *testing.T) { - makeID := func(key string) Identifier { return NewIdentifierFromRaw(key) } - - t.Run("namespace mismatch", func(t *testing.T) { - p := NewPatternFromRaw("ns::a[]") - if p.Match(makeID("other::a[]")) { - t.Fatal("should not match different namespace") - } - }) - - t.Run("MatchAny accepts empty and nonempty", 
func(t *testing.T) { - p := NewPatternFromRaw("ns::a[*]") - if !p.Match(makeID("ns::a[]")) || !p.Match(makeID("ns::a[x=1]")) { - t.Fatal("MatchAny should accept both") - } - }) - - t.Run("MatchNone requires empty", func(t *testing.T) { - p := NewPatternFromRaw("ns::a[]") - if !p.Match(makeID("ns::a[]")) { - t.Fatal("empty should match") - } - if p.Match(makeID("ns::a[x=1]")) { - t.Fatal("nonempty should not match MatchNone") - } - }) - - t.Run("MatchExact equals, order independent", func(t *testing.T) { - p := NewPatternFromRaw("ns::a[x=1;y=2]") - if !p.Match(makeID("ns::a[y=2;x=1]")) { - t.Fatal("param order should not matter") - } - if p.Match(makeID("ns::a[x=1]")) { - t.Fatal("missing param should fail") - } - if p.Match(makeID("ns::a[x=1;y=2;z=3]")) { - t.Fatal("extra param should fail exact") - } - if p.Match(makeID("ns::a[x=9;y=2]")) { - t.Fatal("different value should fail") - } - }) - - t.Run("exact-set vs superset", func(t *testing.T) { - exact := NewPatternFromRaw("ns::a[]:b[*]") - super := NewPatternFromRaw("ns::a[]:b[*].*") - - if !exact.Match(makeID("ns::a[].b[x=1]")) { - t.Fatal("exact should match same set") - } - if exact.Match(makeID("ns::a[].b[x=1].c[]")) { - t.Fatal("exact should not allow extra tags") - } - if !super.Match(makeID("ns::a[].b[x=1].c[]")) { - t.Fatal("superset should allow extra tags") - } - }) - - t.Run("all pattern tags must exist", func(t *testing.T) { - p := NewPatternFromRaw("ns::a[]:b[*]") - if p.Match(makeID("ns::a[]")) { - t.Fatal("missing b should fail") - } - }) -} diff --git a/services/data_service/internal/manager/commands.go b/services/data_service/internal/manager/commands.go deleted file mode 100644 index aff8bde..0000000 --- a/services/data_service/internal/manager/commands.go +++ /dev/null @@ -1,73 +0,0 @@ -package manager - -import ( - "github.com/google/uuid" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" -) - -// Session Commands - -type createSessionCommand struct { - resp chan createSessionResult -} - -type createSessionResult struct { - sid uuid.UUID -} - -type leaseSessionReceiverCommand struct { - sid uuid.UUID - resp chan leaseSessionReceiverResult -} - -type leaseSessionReceiverResult struct { - receiveFunc func() (domain.Message, error) - err error -} - -type leaseSessionSenderCommand struct { - sid uuid.UUID - resp chan leaseSessionSenderResult -} - -type leaseSessionSenderResult struct { - sendFunc func(domain.Message) error - err error -} - -type releaseSessionReceiverCommand struct { - sid uuid.UUID - resp chan releaseSessionReceiverResult -} - -type releaseSessionReceiverResult struct { - err error -} - -type releaseSessionSenderCommand struct { - sid uuid.UUID - resp chan releaseSessionSenderResult -} - -type releaseSessionSenderResult struct { - err error -} - -type configureSessionCommand struct { - sid uuid.UUID - cfg any - resp chan configureSessionResult -} - -type configureSessionResult struct { - err error -} - -type closeSessionCommand struct { - sid uuid.UUID - resp chan closeSessionResult -} - -type closeSessionResult struct { - err error -} diff --git a/services/data_service/internal/manager/helper.go b/services/data_service/internal/manager/helper.go deleted file mode 100644 index 6187b07..0000000 --- a/services/data_service/internal/manager/helper.go +++ /dev/null @@ -1,27 +0,0 @@ -package manager - -func workerEntryKey(w WorkerEntry) string { - return w.Type + "|" + string(w.Spec) + "|" + string(w.Unit) -} - -func workerEntryDiffs(old, nw []WorkerEntry) (added, removed 
[]WorkerEntry) { - oldKeys := make(map[string]struct{}, len(old)) - newKeys := make(map[string]struct{}, len(nw)) - - for _, w := range old { - oldKeys[workerEntryKey(w)] = struct{}{} - } - for _, w := range nw { - k := workerEntryKey(w) - newKeys[k] = struct{}{} - if _, ok := oldKeys[k]; !ok { - added = append(added, w) - } - } - for _, w := range old { - if _, ok := newKeys[workerEntryKey(w)]; !ok { - removed = append(removed, w) - } - } - return added, removed -} diff --git a/services/data_service/internal/manager/manager.go b/services/data_service/internal/manager/manager.go deleted file mode 100644 index 3a2c9c9..0000000 --- a/services/data_service/internal/manager/manager.go +++ /dev/null @@ -1,500 +0,0 @@ -// Package manager is the manager package!!! -package manager - -import ( - "errors" - "log/slog" - - "github.com/google/uuid" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/worker" -) - -var ErrSessionNotFound = errors.New("session not found") - -// Manager is a single-goroutine actor that owns all state. -type Manager struct { - cmdCh chan any - sessions map[uuid.UUID]*session - router *router.Router - - workerRegistry *WorkerRegistry - workerInstances map[string]map[string]worker.Worker - workerUnitRefCounts map[string]map[string]map[string]int -} - -// NewManager creates a manager and starts its run loop. -func NewManager(r *router.Router, _ *WorkerRegistry) *Manager { - m := &Manager{ - cmdCh: make(chan any, 256), - sessions: make(map[uuid.UUID]*session), - router: r, - } - go r.Start() - go m.run() - slog.Default().Info("manager started", slog.String("cmp", "manager")) - return m -} - -// API - -// CreateSession creates a new session. Arms a 1m idle timer immediately. -func (m *Manager) CreateSession() uuid.UUID { - slog.Default().Debug("create session request", slog.String("cmp", "manager")) - resp := make(chan createSessionResult, 1) - m.cmdCh <- createSessionCommand{resp: resp} - r := <-resp - slog.Default().Info("new session created", slog.String("cmp", "manager"), slog.String("session", r.sid.String())) - return r.sid -} - -// LeaseSessionReceiver leases a receiver and returns the receive func and its close func. -func (m *Manager) LeaseSessionReceiver(sid uuid.UUID) (func() (domain.Message, error), error) { - slog.Default().Debug("lease session receiver request", slog.String("cmp", "manager"), slog.String("session", sid.String())) - resp := make(chan leaseSessionReceiverResult, 1) - m.cmdCh <- leaseSessionReceiverCommand{sid: sid, resp: resp} - r := <-resp - return r.receiveFunc, r.err -} - -// LeaseSessionSender leases a sender and returns the send func and its close func. 
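The Manager above is a single-goroutine actor: each exported method wraps its arguments in a command struct carrying a buffered response channel and blocks until the run loop replies. A minimal, self-contained sketch of that request/response shape (hypothetical names, not the deleted API):

package main

import "fmt"

// echoCommand is a request handled by the actor loop; resp is buffered
// so the loop never blocks when replying.
type echoCommand struct {
	text string
	resp chan string
}

type actor struct{ cmdCh chan any }

func newActor() *actor {
	a := &actor{cmdCh: make(chan any, 16)}
	go a.run()
	return a
}

func (a *actor) run() {
	for msg := range a.cmdCh {
		switch c := msg.(type) {
		case echoCommand:
			c.resp <- "echo: " + c.text // reply on the caller's channel
		}
	}
}

// Echo is the caller-facing wrapper: build a command, send it, wait for the reply.
func (a *actor) Echo(text string) string {
	resp := make(chan string, 1)
	a.cmdCh <- echoCommand{text: text, resp: resp}
	return <-resp
}

func main() {
	fmt.Println(newActor().Echo("hello"))
}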
-func (m *Manager) LeaseSessionSender(sid uuid.UUID) (func(domain.Message) error, error) { - slog.Default().Debug("lease sender request", slog.String("cmp", "manager"), slog.String("session", sid.String())) - resp := make(chan leaseSessionSenderResult, 1) - m.cmdCh <- leaseSessionSenderCommand{sid: sid, resp: resp} - r := <-resp - return r.sendFunc, r.err -} - -// ReleaseSessionReceiver releases the currently held receiver lease -func (m *Manager) ReleaseSessionReceiver(sid uuid.UUID) error { - slog.Default().Debug("release session receiver request", slog.String("cmp", "manager"), slog.String("session", sid.String())) - resp := make(chan releaseSessionReceiverResult, 1) - m.cmdCh <- releaseSessionReceiverCommand{sid: sid, resp: resp} - r := <-resp - return r.err -} - -// ReleaseSessionSender releases the currently held receiver lease -func (m *Manager) ReleaseSessionSender(sid uuid.UUID) error { - slog.Default().Debug("release sender request", slog.String("cmp", "manager"), slog.String("session", sid.String())) - resp := make(chan releaseSessionSenderResult, 1) - m.cmdCh <- releaseSessionSenderCommand{sid: sid, resp: resp} - r := <-resp - return r.err -} - -// ConfigureSession applies a session config. Pattern wiring left TODO. -func (m *Manager) ConfigureSession(sid uuid.UUID, cfg any) error { - slog.Default().Debug("configure session request", slog.String("cmp", "manager"), slog.String("session", sid.String())) - resp := make(chan configureSessionResult, 1) - m.cmdCh <- configureSessionCommand{sid: sid, cfg: cfg, resp: resp} - r := <-resp - return r.err -} - -// CloseSession closes and removes the session. -func (m *Manager) CloseSession(sid uuid.UUID) error { - slog.Default().Debug("close session request", slog.String("cmp", "manager"), slog.String("session", sid.String())) - resp := make(chan closeSessionResult, 1) - m.cmdCh <- closeSessionCommand{sid: sid, resp: resp} - r := <-resp - return r.err -} - -// --- Loop --- - -func (m *Manager) run() { - for msg := range m.cmdCh { - switch c := msg.(type) { - case createSessionCommand: - m.handleNewSession(c) - case leaseSessionReceiverCommand: - m.handleLeaseSessionReceiver(c) - case leaseSessionSenderCommand: - m.handleLeaseSessionSender(c) - case releaseSessionReceiverCommand: - m.handleReleaseSessionReceiver(c) - case releaseSessionSenderCommand: - m.handleReleaseSessionSender(c) - case configureSessionCommand: - m.handleConfigureSession(c) - case closeSessionCommand: - m.handleCloseSession(c) - } - } -} - -// --- Handlers --- - -func (m *Manager) handleNewSession(cmd createSessionCommand) { - var s *session - idleCallback := func() { // Generate callback function for the session to be created. - resp := make(chan closeSessionResult, 1) - m.cmdCh <- closeSessionCommand{sid: s.id, resp: resp} - <-resp - } - s = newSession(m.router.Incoming(), idleCallback) - m.sessions[s.id] = s - cmd.resp <- createSessionResult{sid: s.id} -} - -func (m *Manager) handleLeaseSessionReceiver(cmd leaseSessionReceiverCommand) { - s, ok := m.sessions[cmd.sid] - if !ok { - cmd.resp <- leaseSessionReceiverResult{err: ErrSessionNotFound} - return - } - recv, err := s.leaseReceiver() - if err != nil { - cmd.resp <- leaseSessionReceiverResult{err: err} - return - } - - // Register the patterns and egress channel for the session with the router. 
- patterns := s.getPatterns() - egressChan, ok := s.getEgress() - if !ok { - cmd.resp <- leaseSessionReceiverResult{err: errors.New("egress channel doesn't exist despite successful lease")} - } - - for _, pattern := range patterns { - m.router.RegisterPattern(pattern, egressChan) - } - - cmd.resp <- leaseSessionReceiverResult{receiveFunc: recv, err: nil} -} - -func (m *Manager) handleLeaseSessionSender(cmd leaseSessionSenderCommand) { - s, ok := m.sessions[cmd.sid] - if !ok { - cmd.resp <- leaseSessionSenderResult{err: ErrSessionNotFound} - return - } - send, err := s.leaseSender() - if err != nil { - cmd.resp <- leaseSessionSenderResult{err: err} - return - } - cmd.resp <- leaseSessionSenderResult{sendFunc: send, err: nil} -} - -func (m *Manager) handleReleaseSessionReceiver(cmd releaseSessionReceiverCommand) { - s, ok := m.sessions[cmd.sid] - if !ok { - cmd.resp <- releaseSessionReceiverResult{err: ErrSessionNotFound} - return - } - err := s.releaseReceiver() - if err != nil { - cmd.resp <- releaseSessionReceiverResult{err: err} - return - } - cmd.resp <- releaseSessionReceiverResult{err: nil} -} - -func (m *Manager) handleReleaseSessionSender(cmd releaseSessionSenderCommand) { - s, ok := m.sessions[cmd.sid] - if !ok { - cmd.resp <- releaseSessionSenderResult{err: ErrSessionNotFound} - return - } - err := s.releaseSender() - if err != nil { - cmd.resp <- releaseSessionSenderResult{err: err} - return - } - cmd.resp <- releaseSessionSenderResult{err: nil} -} - -func (m *Manager) handleConfigureSession(cmd configureSessionCommand) { - s, ok := m.sessions[cmd.sid] - if !ok { - cmd.resp <- configureSessionResult{err: ErrSessionNotFound} - return - } - - newCfg, ok := cmd.cfg.(SessionConfig) - if !ok { - cmd.resp <- configureSessionResult{err: ErrBadConfig} - return - } - - // Normalize workers. - normalized := make([]WorkerEntry, len(newCfg.Workers)) - for i, we := range newCfg.Workers { - spec, err := m.workerRegistry.NormalizeSpecificationBytes(we.Type, we.Spec) - if err != nil { - cmd.resp <- configureSessionResult{err: err} - return - } - unit, err := m.workerRegistry.NormalizeUnitBytes(we.Type, we.Unit) - if err != nil { - cmd.resp <- configureSessionResult{err: err} - return - } - normalized[i] = WorkerEntry{Type: we.Type, Spec: spec, Unit: unit} - } - newCfg.Workers = normalized - - // Compute diffs. - curr := append([]WorkerEntry(nil), s.cfg.Workers...) - next := append([]WorkerEntry(nil), newCfg.Workers...) - additions, removals := workerEntryDiffs(curr, next) - - // Per-instance delta: type -> spec -> {add, remove} - type delta struct{ add, remove [][]byte } - changes := make(map[string]map[string]delta) - addTo := func(typ, spec string, u []byte, isAdd bool) { - if changes[typ] == nil { - changes[typ] = make(map[string]delta) - } - d := changes[typ][spec] - if isAdd { - d.add = append(d.add, u) - } else { - d.remove = append(d.remove, u) - } - changes[typ][spec] = d - } - for _, e := range additions { - addTo(e.Type, string(e.Spec), e.Unit, true) - } - for _, e := range removals { - addTo(e.Type, string(e.Spec), e.Unit, false) - } - - // Ensure manager maps. - if m.workerInstances == nil { - m.workerInstances = make(map[string]map[string]worker.Worker) - } - if m.workerUnitRefCounts == nil { - m.workerUnitRefCounts = make(map[string]map[string]map[string]int) - } - - // Rollback snapshots. 
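Before any workers are touched, the old and new worker lists are reduced to added/removed sets keyed by type|spec|unit. The same key-set diff in isolation (illustrative string keys only, same idea as workerEntryDiffs above):

package main

import "fmt"

// diff returns the keys added in next and removed from prev.
func diff(prev, next []string) (added, removed []string) {
	prevSet := make(map[string]struct{}, len(prev))
	nextSet := make(map[string]struct{}, len(next))
	for _, k := range prev {
		prevSet[k] = struct{}{}
	}
	for _, k := range next {
		nextSet[k] = struct{}{}
		if _, ok := prevSet[k]; !ok {
			added = append(added, k)
		}
	}
	for _, k := range prev {
		if _, ok := nextSet[k]; !ok {
			removed = append(removed, k)
		}
	}
	return added, removed
}

func main() {
	add, rem := diff(
		[]string{"a|s1|u1", "a|s1|u2"},
		[]string{"a|s1|u2", "b|s2|u1"},
	)
	fmt.Println(add, rem) // [b|s2|u1] [a|s1|u1]
}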
- type snap struct { - hadInst bool - prevRef map[string]int - } - snaps := make(map[string]map[string]snap) // type -> spec -> snap - created := make(map[string]map[string]bool) - - saveSnap := func(typ, spec string) { - if snaps[typ] == nil { - snaps[typ] = make(map[string]snap) - } - if _, ok := snaps[typ][spec]; ok { - return - } - had := false - if m.workerInstances[typ] != nil { - _, had = m.workerInstances[typ][spec] - } - prev := make(map[string]int) - if m.workerUnitRefCounts[typ] != nil && m.workerUnitRefCounts[typ][spec] != nil { - for k, v := range m.workerUnitRefCounts[typ][spec] { - prev[k] = v - } - } - snaps[typ][spec] = snap{hadInst: had, prevRef: prev} - } - markCreated := func(typ, spec string) { - if created[typ] == nil { - created[typ] = make(map[string]bool) - } - created[typ][spec] = true - } - - toBytesSlice := func(ref map[string]int) [][]byte { - out := make([][]byte, 0, len(ref)) - for k, c := range ref { - if c > 0 { - out = append(out, []byte(k)) - } - } - return out - } - - restore := func(err error) { - // Restore refcounts and instance unit sets. - for typ, specs := range snaps { - for spec, sn := range specs { - // Restore refcounts exactly. - if m.workerUnitRefCounts[typ] == nil { - m.workerUnitRefCounts[typ] = make(map[string]map[string]int) - } - rc := make(map[string]int) - for k, v := range sn.prevRef { - rc[k] = v - } - m.workerUnitRefCounts[typ][spec] = rc - - prevUnits := toBytesSlice(rc) - - inst := m.workerInstances[typ][spec] - switch { - case sn.hadInst: - // Ensure instance exists and set units back. - if inst == nil { - wi, ierr := m.workerRegistry.Spawn(typ) - if ierr == nil { - m.workerInstances[typ][spec] = wi - inst = wi - // TODO: pass the correct SessionController - _ = wi.Start([]byte(spec), s) // best-effort - } - } - if inst != nil { - _ = inst.SetUnits(prevUnits) // best-effort - } - default: - // We did not have an instance before. Stop and remove if present. - if inst != nil { - _ = inst.Stop() - delete(m.workerInstances[typ], spec) - if len(m.workerInstances[typ]) == 0 { - delete(m.workerInstances, typ) - } - } - // If no refs remain, clean refcounts map too. - if len(rc) == 0 { - delete(m.workerUnitRefCounts[typ], spec) - if len(m.workerUnitRefCounts[typ]) == 0 { - delete(m.workerUnitRefCounts, typ) - } - } - } - } - } - // Clean up instances created during this op that shouldn't exist. - for typ, specs := range created { - for spec := range specs { - if snaps[typ] != nil && snaps[typ][spec].hadInst { - continue - } - if inst := m.workerInstances[typ][spec]; inst != nil { - _ = inst.Stop() - delete(m.workerInstances[typ], spec) - if len(m.workerInstances[typ]) == 0 { - delete(m.workerInstances, typ) - } - } - } - } - cmd.resp <- configureSessionResult{err: err} - } - - // Apply deltas per instance. - for typ, specMap := range changes { - if m.workerUnitRefCounts[typ] == nil { - m.workerUnitRefCounts[typ] = make(map[string]map[string]int) - } - if m.workerInstances[typ] == nil { - m.workerInstances[typ] = make(map[string]worker.Worker) - } - - for spec, d := range specMap { - saveSnap(typ, spec) - - // Update refcounts. 
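The refcount update that follows is the core of the delta: removals decrement and prune at zero, additions increment, and the keys left with a positive count become the desired unit set handed to SetUnits. A compact standalone sketch of that bookkeeping (assumed map shapes):

package main

import "fmt"

// applyDelta mutates rc in place: each remove decrements (pruning at zero),
// each add increments. Keys with a positive count form the desired set.
func applyDelta(rc map[string]int, add, remove []string) []string {
	for _, k := range remove {
		if rc[k] > 0 {
			rc[k]--
		}
		if rc[k] == 0 {
			delete(rc, k)
		}
	}
	for _, k := range add {
		rc[k]++
	}
	desired := make([]string, 0, len(rc))
	for k, c := range rc {
		if c > 0 {
			desired = append(desired, k)
		}
	}
	return desired
}

func main() {
	rc := map[string]int{"unitA": 1, "unitB": 2}
	fmt.Println(applyDelta(rc, []string{"unitC"}, []string{"unitA", "unitB"}))
	// unitA dropped to 0 and was pruned; unitB stays at 1; unitC was added.
}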
- rc := m.workerUnitRefCounts[typ][spec] - if rc == nil { - rc = make(map[string]int) - m.workerUnitRefCounts[typ][spec] = rc - } - for _, u := range d.remove { - k := string(u) - if rc[k] > 0 { - rc[k]-- - } - if rc[k] == 0 { - delete(rc, k) - } - } - for _, u := range d.add { - k := string(u) - rc[k]++ - } - - desired := toBytesSlice(rc) - inst := m.workerInstances[typ][spec] - - switch { - case len(desired) == 0: - // No units desired: stop and prune if instance exists. - if inst != nil { - if err := inst.Stop(); err != nil { - restore(err) - return - } - delete(m.workerInstances[typ], spec) - if len(m.workerInstances[typ]) == 0 { - delete(m.workerInstances, typ) - } - } - // If no refs left, prune refcounts too. - delete(m.workerUnitRefCounts[typ], spec) - if len(m.workerUnitRefCounts[typ]) == 0 { - delete(m.workerUnitRefCounts, typ) - } - - default: - // Need instance with desired units. - if inst == nil { - wi, err := m.workerRegistry.Instantiate(typ, []byte(spec)) - if err != nil { - restore(err) - return - } - m.workerInstances[typ][spec] = wi - markCreated(typ, spec) - // TODO: pass correct SessionController implementation - if err := wi.Start([]byte(spec), s); err != nil { - restore(err) - return - } - inst = wi - } - if err := inst.SetUnits(desired); err != nil { - restore(err) - return - } - } - } - } - - // Commit config last. - if err := s.setConfig(newCfg); err != nil { - restore(err) - return - } - - cmd.resp <- configureSessionResult{err: nil} -} - -func (m *Manager) handleCloseSession(cmd closeSessionCommand) { - s, ok := m.sessions[cmd.sid] - if !ok { - cmd.resp <- closeSessionResult{err: ErrSessionNotFound} - return - } - - // TODO: Ensure workers are correctly scrapped - - patterns := s.getPatterns() - egress, ok := s.getEgress() - if ok { // We only need to deregister if there is an active receiver lease. - for _, pattern := range patterns { - m.router.DeregisterPattern(pattern, egress) - } - } - - // Release leases and ensure idle timer is disarmed. - s.closeAll() - s.disarmIdleTimer() - delete(m.sessions, cmd.sid) - - cmd.resp <- closeSessionResult{err: nil} -} diff --git a/services/data_service/internal/manager/session.go b/services/data_service/internal/manager/session.go deleted file mode 100644 index f12c1ba..0000000 --- a/services/data_service/internal/manager/session.go +++ /dev/null @@ -1,238 +0,0 @@ -package manager - -import ( - "errors" - "log/slog" - "time" - - "github.com/google/uuid" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" -) - -var ( - // Lease lifecycle errors. - ErrAlreadyReleased = errors.New("lease already released") - ErrSenderAlreadyLeased = errors.New("sender already leased") - ErrReceiverAlreadyLeased = errors.New("receiver already leased") - ErrSenderNotLeased = errors.New("no sender lease active") - ErrReceiverNotLeased = errors.New("no receiver lease active") - - // Config errors - ErrBadConfig = errors.New("config not valid") - ErrConfigActiveLeases = errors.New("cannot configure while a lease is active") -) - -type WorkerEntry struct { - Type string - Spec []byte - Unit []byte -} - -// SessionConfig carries non-live-tunable knobs for a session. -// Manager mutates this directly; session does not expose Configure anymore. -type SessionConfig struct { - IdleAfter time.Duration // <=0 disables idle timer - EgressBuffer int // receiver egress buffer size - Patterns []domain.Pattern - Workers []WorkerEntry -} - -// session is manager-owned state. Single goroutine access. 
-type session struct { - id uuid.UUID - - // Router pipes - ingress chan<- domain.Message // router.Incoming(); router-owned - egress chan domain.Message // current receiver lease egress; owned here - - // Config and timers - cfg SessionConfig - idleTimer *time.Timer - idleCallback func() // stored on creation - - // Sender lease - sendOpen bool - sendDone chan struct{} - - // Receiver lease - receiveOpen bool - receiveDone chan struct{} -} - -// newSession arms a 1-minute idle timer immediately. Manager must -// configure before it fires. idleCb is invoked by the timer. -func newSession(ingress chan<- domain.Message, idleCb func()) *session { - s := &session{ - id: uuid.New(), - ingress: ingress, - cfg: SessionConfig{ - IdleAfter: time.Minute, // default 1m on creation - EgressBuffer: 256, // default buffer - }, - idleCallback: idleCb, - } - s.armIdleTimer() - return s -} - -func (s *session) setConfig(cfg any) error { - if s.sendOpen || s.receiveOpen { - return ErrConfigActiveLeases - } - - cfgParsed, ok := cfg.(SessionConfig) - if !ok { - return ErrBadConfig - } - - s.cfg = cfgParsed - - return nil -} - -func (s *session) getEgress() (chan<- domain.Message, bool) { - if s.egress == nil { - return nil, false - } - return s.egress, true -} - -func (s *session) getPatterns() []domain.Pattern { - return nil -} - -// leaseSender opens a sender lease and returns send(m) error. -func (s *session) leaseSender() (func(domain.Message) error, error) { - if s.sendOpen { - return nil, ErrSenderAlreadyLeased - } - s.sendOpen = true - s.sendDone = make(chan struct{}) - s.disarmIdleTimer() - - // Snapshot for lease-scoped handle. - done := s.sendDone - - sendFunc := func(m domain.Message) error { - select { - case <-done: - return ErrAlreadyReleased - case s.ingress <- m: - return nil - } - } - - return sendFunc, nil -} - -// releaseSender releases the current sender lease. -func (s *session) releaseSender() error { - if !s.sendOpen { - return ErrSenderNotLeased - } - s.sendOpen = false - if s.sendDone != nil { - close(s.sendDone) // invalidates all prior send funcs - s.sendDone = nil - } - if !s.receiveOpen { - s.armIdleTimer() - } - return nil -} - -// leaseReceiver opens a receiver lease and returns receive() (Message, error). -func (s *session) leaseReceiver() (func() (domain.Message, error), error) { - if s.receiveOpen { - return nil, ErrReceiverAlreadyLeased - } - s.receiveOpen = true - s.receiveDone = make(chan struct{}) - s.egress = make(chan domain.Message, s.cfg.EgressBuffer) - s.disarmIdleTimer() - - // Snapshots for lease-scoped handles. - done := s.receiveDone - eg := s.egress - - receiveFunc := func() (domain.Message, error) { - select { - case <-done: - return domain.Message{}, ErrAlreadyReleased - case msg, ok := <-eg: - if !ok { - return domain.Message{}, ErrAlreadyReleased - } - return msg, nil - } - } - - return receiveFunc, nil -} - -// releaseReceiver releases the current receiver lease. -// Manager must stop any routing into s.egress before calling this. -func (s *session) releaseReceiver() error { - if !s.receiveOpen { - return ErrReceiverNotLeased - } - s.receiveOpen = false - if s.receiveDone != nil { - close(s.receiveDone) // invalidates all prior receive funcs - s.receiveDone = nil - } - if s.egress != nil { - close(s.egress) - s.egress = nil - } - if !s.sendOpen { - s.armIdleTimer() - } - return nil -} - -// closeAll force-releases both sender and receiver leases. Safe to call multiple times. 
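The lease functions above all follow one pattern: the caller receives a closure bound to a per-lease done channel, and closing that channel on release invalidates every handle previously given out without any extra locking. A self-contained sketch of the idiom (hypothetical names):

package main

import (
	"errors"
	"fmt"
)

var errReleased = errors.New("lease already released")

// lease returns a send function tied to a done channel plus a release func.
// Closing done lets old send functions observe the release instead of blocking.
func lease(out chan<- string) (send func(string) error, release func()) {
	done := make(chan struct{})
	send = func(msg string) error {
		select {
		case <-done:
			return errReleased
		case out <- msg:
			return nil
		}
	}
	release = func() { close(done) }
	return send, release
}

func main() {
	out := make(chan string, 1)
	send, release := lease(out)
	fmt.Println(send("hello")) // <nil>
	release()
	fmt.Println(send("again")) // lease already released
}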
-func (s *session) closeAll() { - // Sender - if s.sendOpen { - s.sendOpen = false - if s.sendDone != nil { - close(s.sendDone) - s.sendDone = nil - } - } - // Receiver - if s.receiveOpen { - s.receiveOpen = false - if s.receiveDone != nil { - close(s.receiveDone) - s.receiveDone = nil - } - if s.egress != nil { - close(s.egress) - s.egress = nil - } - } -} - -// armIdleTimer arms a timer using stored cfg.IdleAfter and idleCb. -func (s *session) armIdleTimer() { - if s.idleCallback == nil { - slog.Warn("nil idle callback function provided to session") - } - if s.idleTimer != nil { - s.idleTimer.Stop() - s.idleTimer = nil - } - if s.cfg.IdleAfter > 0 && s.idleCallback != nil { - s.idleTimer = time.AfterFunc(s.cfg.IdleAfter, s.idleCallback) - } -} - -// disarmIdleTimer disarms the idle timer if active. -func (s *session) disarmIdleTimer() { - if s.idleTimer != nil { - s.idleTimer.Stop() - s.idleTimer = nil - } -} diff --git a/services/data_service/internal/manager/worker_registry.go b/services/data_service/internal/manager/worker_registry.go deleted file mode 100644 index b363446..0000000 --- a/services/data_service/internal/manager/worker_registry.go +++ /dev/null @@ -1,120 +0,0 @@ -package manager - -import ( - "errors" - "sync" - - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/worker" -) - -var ( - ErrWorkerAlreadyRegistered = errors.New("worker type already registered") - ErrWorkerTypeUnknown = errors.New("unknown worker type") - ErrNilFactory = errors.New("nil worker factory") - ErrNilNormalizer = errors.New("nil worker normalizer") -) - -type registryEntry struct { - factory worker.Factory - normalizer worker.Normalizer -} - -type WorkerRegistry struct { - mu sync.RWMutex - m map[string]registryEntry -} - -func NewWorkerRegistry() *WorkerRegistry { - return &WorkerRegistry{m: make(map[string]registryEntry)} -} - -// Register a worker type with its factory and keyer. -func (wr *WorkerRegistry) Register(workerType string, factory worker.Factory, normalizer worker.Normalizer) error { - if factory == nil { - return ErrNilFactory - } - if normalizer == nil { - return ErrNilNormalizer - } - wr.mu.Lock() - defer wr.mu.Unlock() - if _, ok := wr.m[workerType]; ok { - return ErrWorkerAlreadyRegistered - } - wr.m[workerType] = registryEntry{factory: factory, normalizer: normalizer} - return nil -} - -// Deregister removes a worker type. -func (wr *WorkerRegistry) Deregister(workerType string) error { - wr.mu.Lock() - defer wr.mu.Unlock() - if _, ok := wr.m[workerType]; !ok { - return ErrWorkerTypeUnknown - } - delete(wr.m, workerType) - return nil -} - -// Spawn constructs a new worker instance for the given type. -func (wr *WorkerRegistry) Spawn(workerType string) (worker.Worker, error) { - wr.mu.RLock() - entry, ok := wr.m[workerType] - wr.mu.RUnlock() - if !ok { - return nil, ErrWorkerTypeUnknown - } - return entry.factory(), nil -} - -func (wr *WorkerRegistry) NormalizeSpecificationBytes(workerType string, spec []byte) ([]byte, error) { - wr.mu.RLock() - entry, ok := wr.m[workerType] - wr.mu.RUnlock() - if !ok { - return nil, ErrWorkerTypeUnknown - } - return entry.normalizer.NormalizeSpecification(spec) -} - -func (wr *WorkerRegistry) NormalizeUnitBytes(workerType string, unit []byte) ([]byte, error) { - wr.mu.RLock() - entry, ok := wr.m[workerType] - wr.mu.RUnlock() - if !ok { - return nil, ErrWorkerTypeUnknown - } - return entry.normalizer.NormalizeUnit(unit) -} - -// Factory returns the registered factory. 
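Idle handling above is plain time.AfterFunc: the timer is armed on creation and whenever the last lease is released, and stopped whenever a lease is taken. A minimal arm/disarm sketch using only the standard library:

package main

import (
	"fmt"
	"time"
)

// idleTimer wraps time.AfterFunc so callers can re-arm or disarm it.
type idleTimer struct {
	t  *time.Timer
	cb func()
}

func (it *idleTimer) arm(d time.Duration) {
	it.disarm() // never leave two timers racing
	if d > 0 && it.cb != nil {
		it.t = time.AfterFunc(d, it.cb)
	}
}

func (it *idleTimer) disarm() {
	if it.t != nil {
		it.t.Stop()
		it.t = nil
	}
}

func main() {
	done := make(chan struct{})
	it := &idleTimer{cb: func() {
		fmt.Println("idle: closing session")
		close(done)
	}}
	it.arm(50 * time.Millisecond)
	<-done
}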
-func (wr *WorkerRegistry) Factory(workerType string) (worker.Factory, error) { - wr.mu.RLock() - entry, ok := wr.m[workerType] - wr.mu.RUnlock() - if !ok { - return nil, ErrWorkerTypeUnknown - } - return entry.factory, nil -} - -func (wr *WorkerRegistry) Normalizer(workerType string) (worker.Normalizer, error) { - wr.mu.RLock() - entry, ok := wr.m[workerType] - wr.mu.RUnlock() - if !ok { - return nil, ErrWorkerTypeUnknown - } - return entry.normalizer, nil -} - -// RegisteredTypes lists all worker types. -func (wr *WorkerRegistry) RegisteredTypes() []string { - wr.mu.RLock() - defer wr.mu.RUnlock() - out := make([]string, 0, len(wr.m)) - for t := range wr.m { - out = append(out, t) - } - return out -} diff --git a/services/data_service/internal/router/partition.go b/services/data_service/internal/router/partition.go deleted file mode 100644 index 511a1ff..0000000 --- a/services/data_service/internal/router/partition.go +++ /dev/null @@ -1,165 +0,0 @@ -package router - -import ( - "slices" - "sync" - - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" -) - -// actorPartition: single goroutine owns state. -type actorPartition struct { - opCh chan any - wg sync.WaitGroup - memo map[domain.Identifier][]chan<- domain.Message - rules map[string]*ruleEntry // the string is to be a pattern.Canonical() -} - -type ruleEntry struct { - pattern domain.Pattern - channels map[chan<- domain.Message]struct{} -} - -func newActorPartition(bufferSize int) *actorPartition { - return &actorPartition{ - opCh: make(chan any, bufferSize), - memo: make(map[domain.Identifier][]chan<- domain.Message), - rules: make(map[string]*ruleEntry), - } -} - -// External (though not exported) methods to implement the pattern interface - -func (p *actorPartition) start() { - p.wg.Go(p.loop) -} - -func (p *actorPartition) stop() { - close(p.opCh) - p.wg.Wait() -} - -func (p *actorPartition) registerRoute(pat domain.Pattern, ch chan<- domain.Message) { - done := make(chan struct{}, 1) - p.opCh <- opRegister{pattern: pat, channel: ch, done: done} - <-done -} - -func (p *actorPartition) deregisterRoute(pat domain.Pattern, ch chan<- domain.Message) { - done := make(chan struct{}, 1) - p.opCh <- opDeregister{pattern: pat, channel: ch, done: done} - <-done -} - -func (p *actorPartition) deliver(msg domain.Message) { - p.opCh <- opDeliver{msg: msg} -} - -// Internal - -type opRegister struct { - pattern domain.Pattern - channel chan<- domain.Message - done chan struct{} -} -type opDeregister struct { - pattern domain.Pattern - channel chan<- domain.Message - done chan struct{} -} -type opDeliver struct{ msg domain.Message } - -func (p *actorPartition) loop() { - for op := range p.opCh { - switch v := op.(type) { - - case opDeliver: - id := v.msg.Identifier - subs, exists := p.memo[id] - if !exists { - uniqueChannels := make(map[chan<- domain.Message]struct{}) - for _, e := range p.rules { - if e.pattern.Match(id) { - for ch := range e.channels { - uniqueChannels[ch] = struct{}{} - } - } - } - - if len(uniqueChannels) > 0 { - uniqueChannelsSlice := make([]chan<- domain.Message, 0, len(uniqueChannels)) - for ch := range uniqueChannels { - uniqueChannelsSlice = append(uniqueChannelsSlice, ch) - } - p.memo[id] = uniqueChannelsSlice - subs = uniqueChannelsSlice - } else { - p.memo[id] = nil // cache "no subscribers", fast hot-path. 
- subs = nil - } - } - - for _, ch := range subs { - select { - case ch <- v.msg: - default: // drop on full ch - } - } - - case opRegister: - key := v.pattern.Key() - e, exists := p.rules[key] - if !exists { - e = &ruleEntry{pattern: v.pattern, channels: make(map[chan<- domain.Message]struct{})} - p.rules[key] = e - } - if _, exists := e.channels[v.channel]; exists { - v.done <- struct{}{} - continue - } - - e.channels[v.channel] = struct{}{} - - for id, subs := range p.memo { - if v.pattern.Match(id) && !slices.Contains(subs, v.channel) { - p.memo[id] = append(subs, v.channel) - } - } - - v.done <- struct{}{} - - case opDeregister: - key := v.pattern.Key() - e, ok := p.rules[key] - if !ok { - v.done <- struct{}{} - continue - } - if _, ok := e.channels[v.channel]; !ok { - v.done <- struct{}{} - continue - } - - delete(e.channels, v.channel) - if len(e.channels) == 0 { - delete(p.rules, key) - } - - for id, subs := range p.memo { - if v.pattern.Match(id) { - for i := range subs { - if subs[i] == v.channel { - last := len(subs) - 1 - subs[i] = subs[last] - subs[last] = nil // help GC - p.memo[id] = subs[:last] - break - } - } - } - } - - v.done <- struct{}{} - } - } -} diff --git a/services/data_service/internal/router/routerAdv.go b/services/data_service/internal/router/routerAdv.go deleted file mode 100644 index d21a032..0000000 --- a/services/data_service/internal/router/routerAdv.go +++ /dev/null @@ -1,128 +0,0 @@ -// Package router for routing! -package router - -import ( - "fmt" - "sync" - - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" -) - -type partition interface { - registerRoute(domain.Pattern, chan<- domain.Message) - deregisterRoute(domain.Pattern, chan<- domain.Message) - deliver(domain.Message) - start() - stop() -} - -// Factories take a buffer size (internal), router stores a zero-arg thunk. -var partitionFactories = map[string]func(int) partition{ - "actor": func(buf int) partition { return newActorPartition(buf) }, -} - -type Router struct { - incoming chan domain.Message - mu sync.RWMutex - partitions map[string]partition - newPart func() partition // zero-arg thunk - wg sync.WaitGroup - started bool -} - -func NewRouter(kind string, incomingBuf, partBuf int) (*Router, error) { - if incomingBuf <= 0 { - incomingBuf = 2048 - } - if partBuf <= 0 { - partBuf = 1024 - } - - makePartWithBuf, ok := partitionFactories[kind] - if !ok { - return nil, fmt.Errorf("unknown partition kind %q", kind) - } - // Curry (!!!) 
to zero-arg - makePart := func() partition { return makePartWithBuf(partBuf) } - - return &Router{ - incoming: make(chan domain.Message, incomingBuf), - partitions: make(map[string]partition), - newPart: makePart, - }, nil -} - -func (r *Router) Start() { - r.mu.Lock() - if r.started { - r.mu.Unlock() - return - } - r.started = true - r.mu.Unlock() - - r.wg.Go(func() { - for msg := range r.incoming { - ns, _, _ := msg.Identifier.Parse() - r.mu.RLock() - p, exists := r.partitions[ns] - r.mu.RUnlock() - if exists { - p.deliver(msg) - } - } - }) -} - -func (r *Router) Stop() { - r.mu.Lock() - if !r.started { - r.mu.Unlock() - return - } - r.started = false - close(r.incoming) - - ps := make([]partition, 0, len(r.partitions)) - for _, p := range r.partitions { - ps = append(ps, p) - } - r.partitions = make(map[string]partition) - r.mu.Unlock() - - for _, p := range ps { - p.stop() - } - r.wg.Wait() -} - -func (r *Router) Incoming() chan<- domain.Message { return r.incoming } - -func (r *Router) RegisterPattern(pat domain.Pattern, ch chan<- domain.Message) { - // Inline ensurePartition - ns, _, _, _ := pat.Parse() // Note: Error ignored, pattern assumed to be valid if passed to router - r.mu.RLock() - p := r.partitions[ns] - r.mu.RUnlock() - if p == nil { - r.mu.Lock() - // recheck under write lock - if p = r.partitions[ns]; p == nil { - p = r.newPart() - p.start() - r.partitions[ns] = p - } - r.mu.Unlock() - } - p.registerRoute(pat, ch) -} - -func (r *Router) DeregisterPattern(pat domain.Pattern, ch chan<- domain.Message) { - r.mu.RLock() - ns, _, _, _ := pat.Parse() - p := r.partitions[ns] - r.mu.RUnlock() - if p != nil { - p.deregisterRoute(pat, ch) - } -} diff --git a/services/data_service/internal/worker/worker.go b/services/data_service/internal/worker/worker.go deleted file mode 100644 index 2bee65e..0000000 --- a/services/data_service/internal/worker/worker.go +++ /dev/null @@ -1,51 +0,0 @@ -// Package worker provides background processing and task management for the tessera data_service. -// It handles the execution, coordination, and lifecycle of worker routines responsible for data operations. 
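RegisterPattern above creates a namespace partition lazily using the usual double-checked pattern: an optimistic lookup under the read lock, then a re-check under the write lock before constructing. The same idiom in isolation (generic names, not the router types):

package main

import (
	"fmt"
	"sync"
)

type lazyMap struct {
	mu sync.RWMutex
	m  map[string]*int
}

// get returns the value for key, creating it at most once even with
// concurrent callers: fast path under RLock, re-check under Lock.
func (l *lazyMap) get(key string) *int {
	l.mu.RLock()
	v := l.m[key]
	l.mu.RUnlock()
	if v != nil {
		return v
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	if v = l.m[key]; v == nil { // re-check: another goroutine may have won
		v = new(int)
		l.m[key] = v
	}
	return v
}

func main() {
	l := &lazyMap{m: make(map[string]*int)}
	a, b := l.get("ns"), l.get("ns")
	fmt.Println(a == b) // true: the entry was created exactly once
}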
-package worker - -import ( - "errors" - "time" - - "github.com/google/uuid" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" -) - -var ( - ErrWorkerNotRunning = errors.New("worker not running") - ErrWorkerRunning = errors.New("worker already running") -) - -type ( - ReceiverFunc func() (domain.Message, error) - SenderFunc func(m domain.Message) error -) - -type SessionController interface { - CreateSession(idleAfter time.Duration) uuid.UUID - - LeaseSessionReceiver(sid uuid.UUID) (ReceiverFunc, error) - LeaseSessionSender(sid uuid.UUID) (SenderFunc, error) - ReleaseSessionReceiver(sid uuid.UUID) error - ReleaseSessionSender(sid uuid.UUID) error - - ConfigureSession(sid uuid.UUID, cfg any) error - CloseSession(sid uuid.UUID) error -} - -type Worker interface { - Start(spec []byte, ctrl SessionController) error - Stop() error - IsRunning() bool - - SetUnits(units [][]byte) error - - GetSpecification() []byte - GetUnits() [][]byte -} - -type Normalizer interface { - NormalizeSpecification(spec []byte) ([]byte, error) - NormalizeUnit(unit []byte) ([]byte, error) -} - -type Factory func() Worker diff --git a/services/data_service/internal/worker/workers/binance/ws/binance_futures.go b/services/data_service/internal/worker/workers/binance/ws/binance_futures.go deleted file mode 100644 index bd8e0ce..0000000 --- a/services/data_service/internal/worker/workers/binance/ws/binance_futures.go +++ /dev/null @@ -1,252 +0,0 @@ -package ws - -import ( - "context" - "fmt" - "log/slog" - "sync" - "sync/atomic" - "time" - - "github.com/google/uuid" - - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" -) - -const providerName = "binance_futures" - -type Config struct { - Endpoint string - MaxStreamsPerShard uint16 - RateLimitPerSec uint16 -} - -type BinanceFutures struct { - cfg Config - bus chan<- domain.Message - - mu sync.RWMutex - shards map[uuid.UUID]*shard - assignOrder []uuid.UUID - streamAssignments map[string]*shard - pendingGlobal map[string][]chan error - - ctx context.Context - cancel context.CancelFunc - - idSeq atomic.Uint64 -} - -func NewBinanceFuturesWebsocket(cfg Config, bus chan<- domain.Message) *BinanceFutures { - if cfg.Endpoint == "" { - cfg.Endpoint = "wss://fstream.binance.com/stream" - } - if cfg.RateLimitPerSec <= 0 { - cfg.RateLimitPerSec = 5 - } - if cfg.MaxStreamsPerShard == 0 { - cfg.MaxStreamsPerShard = 15 - } - return &BinanceFutures{ - cfg: cfg, - bus: bus, - shards: make(map[uuid.UUID]*shard), - streamAssignments: make(map[string]*shard), - pendingGlobal: make(map[string][]chan error), - } -} - -func (b *BinanceFutures) Start() error { - b.mu.Lock() - defer b.mu.Unlock() - if b.ctx != nil { - return nil - } - b.ctx, b.cancel = context.WithCancel(context.Background()) - - slog.Default().Info("started", slog.String("cmp", providerName)) - sh, err := newShard(b.ctx, b.cfg, b.bus, b.nextReqID) - if err != nil { - slog.Default().Error("", "error", err) - return err - } - b.shards[sh.ID] = sh - b.assignOrder = []uuid.UUID{sh.ID} - - // idle shard GC - go b.gcIdleShards() - - return nil -} - -func (b *BinanceFutures) Stop() { - b.mu.Lock() - if b.cancel != nil { - b.cancel() - } - // snapshot shards, then clear maps - shs := make([]*shard, 0, len(b.shards)) - for _, sh := range b.shards { - shs = append(shs, sh) - } - b.shards = map[uuid.UUID]*shard{} - b.assignOrder = nil - b.streamAssignments = map[string]*shard{} - - for subj, waiters := range b.pendingGlobal { - for _, ch := range waiters { - select { - case 
ch <- context.Canceled: - default: - } - } - delete(b.pendingGlobal, subj) - } - slog.Default().Info("stopped", slog.String("cmp", providerName)) - b.mu.Unlock() - - for _, sh := range shs { - sh.stop() - } -} - -func (b *BinanceFutures) Subscribe(subject string) <-chan error { - ch := make(chan error, 1) - if !IsValidSubject(subject) { - ch <- fmt.Errorf("invalid subject: %s", subject) - return ch - } - - b.mu.Lock() - if sh, ok := b.streamAssignments[subject]; ok && sh.isActive(subject) { - b.mu.Unlock() - ch <- nil - return ch - } - sh := b.pickShardLocked() - b.streamAssignments[subject] = sh - sh.enqueueSubscribe(subject, ch) - b.mu.Unlock() - return ch -} - -func (b *BinanceFutures) Unsubscribe(subject string) <-chan error { - ch := make(chan error, 1) - - b.mu.Lock() - sh, ok := b.streamAssignments[subject] - if ok { - delete(b.streamAssignments, subject) // allow reassignment later - } - b.mu.Unlock() - - if !ok { - ch <- nil - return ch - } - sh.enqueueUnsubscribe(subject, ch) - return ch -} - -func (b *BinanceFutures) Fetch(_ string) (domain.Message, error) { - return domain.Message{}, fmt.Errorf("fetch not supported by provider") -} - -func (b *BinanceFutures) GetActiveStreams() []string { - b.mu.RLock() - defer b.mu.RUnlock() - out := make([]string, 0) - for _, sh := range b.shards { - out = append(out, sh.activeList()...) - } - return out -} - -func (b *BinanceFutures) IsStreamActive(key string) bool { - b.mu.RLock() - sh := b.streamAssignments[key] - b.mu.RUnlock() - if sh == nil { - return false - } - return sh.isActive(key) -} - -func (b *BinanceFutures) IsValidSubject(key string, _ bool) bool { return IsValidSubject(key) } - -// pick shard by lowest load = active + pending subs; enforce cap -func (b *BinanceFutures) pickShardLocked() *shard { - var chosen *shard - minLoad := int(^uint(0) >> 1) // max int - - for _, id := range b.assignOrder { - sh := b.shards[id] - if sh == nil { - continue - } - load := sh.loadEstimate() - if load < int(b.cfg.MaxStreamsPerShard) && load < minLoad { - minLoad = load - chosen = sh - } - } - if chosen != nil { - return chosen - } - - // need a new shard - sh, err := newShard(b.ctx, b.cfg, b.bus, b.nextReqID) - if err != nil { - if len(b.assignOrder) > 0 { - return b.shards[b.assignOrder[0]] - } - return sh - } - b.shards[sh.ID] = sh - b.assignOrder = append(b.assignOrder, sh.ID) - return sh -} - -func (b *BinanceFutures) nextReqID() uint64 { return b.idSeq.Add(1) } - -// Close idle shards periodically. Keep at least one. -func (b *BinanceFutures) gcIdleShards() { - t := time.NewTicker(30 * time.Second) - defer t.Stop() - for { - select { - case <-b.ctx.Done(): - return - case <-t.C: - var toStop []*shard - - b.mu.Lock() - if len(b.shards) <= 1 { - b.mu.Unlock() - continue - } - for id, sh := range b.shards { - if len(b.shards)-len(toStop) <= 1 { - break // keep one - } - if sh.isIdle() { - toStop = append(toStop, sh) - delete(b.shards, id) - // prune order list - for i, v := range b.assignOrder { - if v == id { - b.assignOrder = append(b.assignOrder[:i], b.assignOrder[i+1:]...) 
- break - } - } - } - } - b.mu.Unlock() - - for _, sh := range toStop { - slog.Default().Info("close idle shard", "cmp", providerName, "shard", sh.ID) - sh.stop() - } - } - } -} diff --git a/services/data_service/internal/worker/workers/binance/ws/shard.go b/services/data_service/internal/worker/workers/binance/ws/shard.go deleted file mode 100644 index 18c304a..0000000 --- a/services/data_service/internal/worker/workers/binance/ws/shard.go +++ /dev/null @@ -1,465 +0,0 @@ -package ws - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "log/slog" - "sync" - "time" - - "github.com/coder/websocket" - "github.com/google/uuid" - - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" -) - -type opType uint8 - -const ( - opSubscribe opType = iota + 1 - opUnsubscribe -) - -type pendingBatch struct { - Op opType - Subjects []string - Waiters map[string][]chan error -} - -type shard struct { - ID uuid.UUID - url string - cfg Config - - ctx context.Context - cancel context.CancelFunc - - conn *websocket.Conn - - mu sync.RWMutex - active map[string]struct{} - - subBatch map[string][]chan error - unsubBatch map[string][]chan error - - sendQ chan []byte - rateTicker *time.Ticker - pingTicker *time.Ticker - - pendingMu sync.Mutex - pendingByID map[uint64]*pendingBatch - nextReqID func() uint64 - - wg sync.WaitGroup - bus chan<- domain.Message -} - -func newShard(pctx context.Context, cfg Config, bus chan<- domain.Message, next func() uint64) (*shard, error) { - id := uuid.New() - ctx, cancel := context.WithCancel(pctx) - sh := &shard{ - ID: id, - url: cfg.Endpoint, - cfg: cfg, - ctx: ctx, - cancel: cancel, - active: make(map[string]struct{}), - subBatch: make(map[string][]chan error), - unsubBatch: make(map[string][]chan error), - sendQ: make(chan []byte, 256), - pendingByID: make(map[uint64]*pendingBatch), - nextReqID: next, - bus: bus, - } - - // per-shard rate limiter; also drives batch flushing - rate := cfg.RateLimitPerSec - if rate <= 0 { - rate = 1 - } - interval := time.Second / time.Duration(rate) - sh.rateTicker = time.NewTicker(interval) - sh.pingTicker = time.NewTicker(30 * time.Second) - - slog.Default().Info("shard created", "cmp", providerName, "shard", sh.ID.String()) - - if err := sh.connect(); err != nil { - slog.Default().Error("shard connection failed", "cmp", providerName, "shard", sh.ID.String(), "error", err) - return nil, err - } - sh.startLoops() - return sh, nil -} - -func (s *shard) connect() error { - dctx, cancel := context.WithTimeout(s.ctx, 10*time.Second) - defer cancel() - c, _, err := websocket.Dial(dctx, s.url, &websocket.DialOptions{}) - if err != nil { - slog.Default().Error("shard connection error", "cmp", providerName, "shard", s.ID.String(), "error", err) - return err - } - s.conn = c - slog.Default().Info("shard connected", "cmp", providerName, "shard", s.ID.String()) - return nil -} - -func (s *shard) startLoops() { - s.wg.Add(3) - go s.writeLoop() - go s.readLoop() - go s.pingLoop() -} - -func (s *shard) stop() { - s.cancel() - if s.conn != nil { - _ = s.conn.Close(websocket.StatusNormalClosure, "shutdown") - } - if s.rateTicker != nil { - s.rateTicker.Stop() - } - if s.pingTicker != nil { - s.pingTicker.Stop() - } - s.wg.Wait() - - s.pendingMu.Lock() - for _, p := range s.pendingByID { - for _, arr := range p.Waiters { - for _, ch := range arr { - select { - case ch <- context.Canceled: - default: - } - } - } - } - s.pendingByID = map[uint64]*pendingBatch{} - s.pendingMu.Unlock() - - s.mu.Lock() - for _, arr := 
range s.subBatch { - for _, ch := range arr { - select { - case ch <- context.Canceled: - default: - } - } - } - for _, arr := range s.unsubBatch { - for _, ch := range arr { - select { - case ch <- context.Canceled: - default: - } - } - } - s.subBatch = map[string][]chan error{} - s.unsubBatch = map[string][]chan error{} - s.mu.Unlock() - - slog.Default().Info("shard stopped", "cmp", providerName, "shard", s.ID.String()) -} - -func (s *shard) enqueueSubscribe(subject string, ch chan error) { - s.mu.Lock() - s.subBatch[subject] = append(s.subBatch[subject], ch) - s.mu.Unlock() - slog.Default().Debug("shard enqueue subscribe", "cmp", providerName, "shard", s.ID, "subject", subject) -} - -func (s *shard) enqueueUnsubscribe(subject string, ch chan error) { - s.mu.Lock() - s.unsubBatch[subject] = append(s.unsubBatch[subject], ch) - s.mu.Unlock() - slog.Default().Debug("shard enqueue unsubscribe", "cmp", providerName, "shard", s.ID, "subject", subject) -} - -func (s *shard) isActive(subj string) bool { - s.mu.RLock() - _, ok := s.active[subj] - s.mu.RUnlock() - return ok -} - -func (s *shard) activeCount() int { - s.mu.RLock() - n := len(s.active) - s.mu.RUnlock() - return n -} - -func (s *shard) loadEstimate() int { // active + pending subscribes - s.mu.RLock() - n := len(s.active) + len(s.subBatch) - s.mu.RUnlock() - return n -} - -func (s *shard) isIdle() bool { - s.mu.RLock() - idle := len(s.active) == 0 && len(s.subBatch) == 0 && len(s.unsubBatch) == 0 - s.mu.RUnlock() - return idle -} - -func (s *shard) activeList() []string { - s.mu.RLock() - defer s.mu.RUnlock() - out := make([]string, 0, len(s.active)) - for k := range s.active { - out = append(out, k) - } - return out -} - -func (s *shard) writeLoop() { - defer s.wg.Done() - for { - select { - case <-s.ctx.Done(): - return - case <-s.rateTicker.C: - // snapshot and clear pending batch operations - var subs, unsubs map[string][]chan error - s.mu.Lock() - if len(s.subBatch) > 0 { - subs = s.subBatch - s.subBatch = make(map[string][]chan error) - } - if len(s.unsubBatch) > 0 { - unsubs = s.unsubBatch - s.unsubBatch = make(map[string][]chan error) - } - s.mu.Unlock() - - // send SUBSCRIBE batch - if len(subs) > 0 { - params := make([]string, 0, len(subs)) - waiters := make(map[string][]chan error, len(subs)) - for k, v := range subs { - params = append(params, k) - waiters[k] = v - } - id := s.nextReqID() - frame := map[string]any{"method": "SUBSCRIBE", "params": params, "id": id} - payload, _ := json.Marshal(frame) - s.recordPending(id, opSubscribe, params, waiters) - if err := s.writeFrame(payload); err != nil { - s.reconnect() - return - } - } - - // send UNSUBSCRIBE batch - if len(unsubs) > 0 { - params := make([]string, 0, len(unsubs)) - waiters := make(map[string][]chan error, len(unsubs)) - for k, v := range unsubs { - params = append(params, k) - waiters[k] = v - } - id := s.nextReqID() - frame := map[string]any{"method": "UNSUBSCRIBE", "params": params, "id": id} - payload, _ := json.Marshal(frame) - s.recordPending(id, opUnsubscribe, params, waiters) - if err := s.writeFrame(payload); err != nil { - s.reconnect() - return - } - } - - // optional: one queued ad-hoc frame per tick - select { - case msg := <-s.sendQ: - if err := s.writeFrame(msg); err != nil { - s.reconnect() - return - } - default: - } - } - } -} - -func (s *shard) writeFrame(msg []byte) error { - wctx, cancel := context.WithTimeout(s.ctx, 5*time.Second) - defer cancel() - err := s.conn.Write(wctx, websocket.MessageText, msg) - if err != nil { - 
slog.Default().Warn("shard write error", "cmp", providerName, "shard", s.ID, "error", err) - } - return err -} - -func (s *shard) readLoop() { - defer s.wg.Done() - for { - select { - case <-s.ctx.Done(): - return - default: - // longer idle timeout when no active subscriptions - timeout := 60 * time.Second - if s.activeCount() == 0 { - timeout = 5 * time.Minute - } - rctx, cancel := context.WithTimeout(s.ctx, timeout) - _, data, err := s.conn.Read(rctx) - cancel() - if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - slog.Default().Debug("shard read idle timeout", "cmp", providerName, "shard", s.ID) - continue - } - slog.Default().Warn("shard read error", "cmp", providerName, "shard", s.ID, "error", err) - s.reconnect() - return - } - - if bytes.Contains(data, []byte("\"id\"")) { - var ack struct { - ID uint64 `json:"id"` - Result *json.RawMessage `json:"result"` - Error *struct { - Code int `json:"code"` - Msg string `json:"msg"` - } `json:"error"` - } - if json.Unmarshal(data, &ack) == nil && ack.ID != 0 { - if ack.Error != nil { - slog.Default().Warn("shard ack error", "cmp", providerName, "shard", s.ID, "id", ack.ID, "code", ack.Error.Code, "msg", ack.Error.Msg) - s.resolvePending(ack.ID, fmt.Errorf("binance error %d: %s", ack.Error.Code, ack.Error.Msg)) - } else { - slog.Default().Debug("shard ack ok", "cmp", providerName, "shard", s.ID, "id", ack.ID) - s.resolvePending(ack.ID, nil) - } - continue - } - } - - var frame struct { - Stream string `json:"stream"` - Data json.RawMessage `json:"data"` - } - if json.Unmarshal(data, &frame) == nil && frame.Stream != "" { - id, err := domain.RawID(providerName, frame.Stream) - if err == nil { - select { - case s.bus <- domain.Message{Identifier: id, Payload: frame.Data}: - default: - } - } - continue - } - slog.Default().Debug("shard unknown message", "cmp", providerName, "shard", s.ID, "data", string(data)) - } - } -} - -func (s *shard) pingLoop() { - defer s.wg.Done() - for { - select { - case <-s.ctx.Done(): - return - case <-s.pingTicker.C: - ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second) - err := s.conn.Ping(ctx) - cancel() - if err != nil { - slog.Default().Warn("shard ping failed", "cmp", providerName, "shard", s.ID, "error", err) - s.reconnect() - return - } - } - } -} - -func (s *shard) recordPending(id uint64, op opType, subjects []string, waiters map[string][]chan error) { - s.pendingMu.Lock() - s.pendingByID[id] = &pendingBatch{Op: op, Subjects: subjects, Waiters: waiters} - s.pendingMu.Unlock() -} - -func (s *shard) resolvePending(id uint64, err error) { - s.pendingMu.Lock() - p := s.pendingByID[id] - delete(s.pendingByID, id) - s.pendingMu.Unlock() - if p == nil { - return - } - - if err == nil { - s.mu.Lock() - if p.Op == opSubscribe { - for _, subj := range p.Subjects { - s.active[subj] = struct{}{} - } - slog.Default().Debug("shard subscribed", "cmp", providerName, "shard", s.ID, "subjects", p.Subjects) - } else { - for _, subj := range p.Subjects { - delete(s.active, subj) - } - slog.Default().Debug("shard unsubscribed", "cmp", providerName, "shard", s.ID, "subjects", p.Subjects) - } - s.mu.Unlock() - } else { - slog.Default().Warn("shard pending error", "cmp", providerName, "shard", s.ID, "error", err) - } - - for _, arr := range p.Waiters { - for _, ch := range arr { - select { - case ch <- err: - default: - } - } - } -} - -func (s *shard) queue(payload []byte) { - select { - case s.sendQ <- payload: - default: - slog.Default().Warn("shard sendQ full, dropping one message", "cmp", providerName, 
"shard", s.ID) - <-s.sendQ - s.sendQ <- payload - } -} - -func (s *shard) reconnect() { - reconnectStartTime := time.Now() - if s.conn != nil { - _ = s.conn.Close(websocket.StatusGoingAway, "reconnect") - } - - for { - select { - case <-s.ctx.Done(): - return - default: - if err := s.connect(); err != nil { - time.Sleep(200 * time.Millisecond) - continue - } - - // re-stage current actives for batch subscribe on next tick - s.mu.RLock() - for k := range s.active { - s.subBatch[k] = append(s.subBatch[k], nil) - } - s.mu.RUnlock() - - // restart loops - s.startLoops() - slog.Default().Info("shard reconnected", "cmp", providerName, "shard", s.ID, "downtime", time.Since(reconnectStartTime).String()) - return - } - } -} diff --git a/services/data_service/internal/worker/workers/binance/ws/subjects.go b/services/data_service/internal/worker/workers/binance/ws/subjects.go deleted file mode 100644 index 81023a1..0000000 --- a/services/data_service/internal/worker/workers/binance/ws/subjects.go +++ /dev/null @@ -1,21 +0,0 @@ -package ws - -import "regexp" - -var ( - reAggTrade = regexp.MustCompile(`^[a-z0-9]+@aggTrade$`) - reTrade = regexp.MustCompile(`^[a-z0-9]+@trade$`) - reMarkPrice = regexp.MustCompile(`^[a-z0-9]+@markPrice(@1s)?$`) - reKline = regexp.MustCompile(`^[a-z0-9]+@kline_(1s|1m|3m|5m|15m|30m|1h|2h|4h|6h|8h|12h|1d|3d|1w|1M)$`) - reBookTicker = regexp.MustCompile(`^[a-z0-9]+@bookTicker$`) - reDepth = regexp.MustCompile(`^[a-z0-9]+@depth(@100ms)?$`) -) - -func IsValidSubject(s string) bool { - return reAggTrade.MatchString(s) || - reTrade.MatchString(s) || - reMarkPrice.MatchString(s) || - reKline.MatchString(s) || - reBookTicker.MatchString(s) || - reDepth.MatchString(s) -} diff --git a/services/portfolio_service/cmd/main.go b/services/portfolio_service/cmd/main.go deleted file mode 100644 index 1d619dd..0000000 --- a/services/portfolio_service/cmd/main.go +++ /dev/null @@ -1 +0,0 @@ -package cmd diff --git a/services/strategies/strategy1/cmd/main.go b/services/strategies/strategy1/cmd/main.go deleted file mode 100644 index 1d619dd..0000000 --- a/services/strategies/strategy1/cmd/main.go +++ /dev/null @@ -1 +0,0 @@ -package cmd diff --git a/services/strategies/strategy1/service.go b/services/strategies/strategy1/service.go deleted file mode 100644 index fece8d8..0000000 --- a/services/strategies/strategy1/service.go +++ /dev/null @@ -1 +0,0 @@ -package marketdata