diff --git a/go.mod b/go.mod index f803688..a7ae7e1 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module gitlab.michelsen.id/phillmichelsen/tessera go 1.25.1 require ( - github.com/coder/websocket v1.8.14 github.com/google/uuid v1.6.0 github.com/lmittmann/tint v1.1.2 google.golang.org/grpc v1.75.0 @@ -11,8 +10,10 @@ require ( ) require ( + github.com/klauspost/cpuid/v2 v2.0.9 // indirect golang.org/x/net v0.43.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/text v0.28.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1 // indirect + lukechampine.com/blake3 v1.4.1 // indirect ) diff --git a/go.sum b/go.sum index 9d0e5c6..f35afde 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g= -github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -10,6 +8,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/lmittmann/tint v1.1.2 h1:2CQzrL6rslrsyjqLDwD11bZ5OpLBPU+g3G/r5LSfS8w= github.com/lmittmann/tint v1.1.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -38,3 +38,5 @@ google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4= google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +lukechampine.com/blake3 v1.4.1 h1:I3Smz7gso8w4/TunLKec6K2fn+kyKtDxr/xcQEN84Wg= +lukechampine.com/blake3 v1.4.1/go.mod h1:QFosUxmjB8mnrWFSNwKmvxHpfY72bmD2tQ0kBMM3kwo= diff --git a/services/data_service/cmd/data_service/main.go b/services/data_service/cmd/data_service/main.go index 49e8254..beb57ad 100644 --- a/services/data_service/cmd/data_service/main.go +++ b/services/data_service/cmd/data_service/main.go @@ -6,9 +6,6 @@ import ( "time" "github.com/lmittmann/tint" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/manager" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router" - "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/worker" ) func initLogger() *slog.Logger { @@ -49,10 +46,5 @@ func main() { slog.SetDefault(initLogger()) slog.Info("starting", "svc", "data-service") - // Setup - wr := worker.NewRegistry() - r, _ := router.NewRouter("actor", 2048, 512) - _ = manager.NewManager(r, wr) - select {} } diff --git a/services/data_service/internal/control/control.go b/services/data_service/internal/control/control.go index 9a3bee4..8f87ef2 100644 --- a/services/data_service/internal/control/control.go +++ b/services/data_service/internal/control/control.go @@ -1,2 +1,25 @@ // Package control package control + +import ( + "github.com/google/uuid" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node/processor" + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router" +) + +type nodeEntry struct { + Template node.Template + TemplateFingerprint string +} + +type Controller struct { + router router.Router + sourceRegistry any // source.Registry + processorRegistry processor.Registry + sinkRegistry any // sink.Registry + + sourceNodes map[uuid.UUID]any + processorNodes map[uuid.UUID]processor.Processor + sinkNodes map[uuid.UUID]any +} diff --git a/services/data_service/internal/control/fingerprint.go b/services/data_service/internal/control/fingerprint.go new file mode 100644 index 0000000..3078ca9 --- /dev/null +++ b/services/data_service/internal/control/fingerprint.go @@ -0,0 +1,43 @@ +package control + +import ( + "encoding/binary" + "encoding/hex" + "sort" + + "lukechampine.com/blake3" +) + +func StreamFingerprint(templateFP, outPort string, inMap map[string]string) string { + // Sort input keys for determinism. + keys := make([]string, 0, len(inMap)) + for k := range inMap { + keys = append(keys, k) + } + sort.Strings(keys) + + h := blake3.New(32, nil) + + write := func(s string) { + var lenbuf [4]byte + binary.LittleEndian.PutUint32(lenbuf[:], uint32(len(s))) + _, _ = h.Write(lenbuf[:]) + _, _ = h.Write([]byte(s)) + } + + // templateFP, outPort, input count, then pairs. + write(templateFP) + write(outPort) + + var nbuf [4]byte + binary.LittleEndian.PutUint32(nbuf[:], uint32(len(keys))) + _, _ = h.Write(nbuf[:]) + + for _, k := range keys { + write(k) + write(inMap[k]) + } + + sum := h.Sum(nil) + return hex.EncodeToString(sum) +} diff --git a/services/data_service/internal/control/planner.go b/services/data_service/internal/control/planner.go new file mode 100644 index 0000000..e69de29 diff --git a/services/data_service/internal/control/registry.go b/services/data_service/internal/control/registry.go new file mode 100644 index 0000000..e69de29 diff --git a/services/data_service/internal/control/wiring.go b/services/data_service/internal/control/wiring.go new file mode 100644 index 0000000..e69de29 diff --git a/services/data_service/internal/node/exit/session.go b/services/data_service/internal/node/exit/session.go deleted file mode 100644 index 2a668c7..0000000 --- a/services/data_service/internal/node/exit/session.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package exit -package exit diff --git a/services/data_service/internal/node/io.go b/services/data_service/internal/node/io.go index d418de2..06590f1 100644 --- a/services/data_service/internal/node/io.go +++ b/services/data_service/internal/node/io.go @@ -5,10 +5,10 @@ import ( "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain" ) -type In interface { +type Receiver interface { Receive() (port string, message domain.Message, ok bool) } -type Out interface { +type Sender interface { Send(port string, message domain.Message) error } diff --git a/services/data_service/internal/node/node.go b/services/data_service/internal/node/node.go new file mode 100644 index 0000000..2bb5f16 --- /dev/null +++ b/services/data_service/internal/node/node.go @@ -0,0 +1,16 @@ +package node + +type Type string + +const ( + Source Type = "source" + Processor Type = "processor" + Sink Type = "sink" +) + +type Template struct { + Kind Type + Type string + Version string + Config string +} diff --git a/services/data_service/internal/node/processor/processor.go b/services/data_service/internal/node/processor/processor.go index 0cc7738..028e348 100644 --- a/services/data_service/internal/node/processor/processor.go +++ b/services/data_service/internal/node/processor/processor.go @@ -1,2 +1,18 @@ // Package processor package processor + +import ( + "context" + + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node" +) + +type Processor interface { + Start(ctx context.Context, cfg string, io IO) error + Stop() +} + +type IO interface { + node.Receiver + node.Sender +} diff --git a/services/data_service/internal/node/processor/registry.go b/services/data_service/internal/node/processor/registry.go index e69de29..6e14d99 100644 --- a/services/data_service/internal/node/processor/registry.go +++ b/services/data_service/internal/node/processor/registry.go @@ -0,0 +1,11 @@ +package processor + +type Registry struct{} + +type Factory interface { + New() Processor + Type() string + Version() string + + TemplateFingerprint(cfg string) (string, error) +} diff --git a/services/data_service/internal/node/sink/sink.go b/services/data_service/internal/node/sink/sink.go new file mode 100644 index 0000000..13972e8 --- /dev/null +++ b/services/data_service/internal/node/sink/sink.go @@ -0,0 +1,17 @@ +// Package sink +package sink + +import ( + "context" + + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node" +) + +type Sink interface { + Start(ctx context.Context, io IO) error + Stop() +} + +type IO interface { + node.Sender +} diff --git a/services/data_service/internal/node/source/registry.go b/services/data_service/internal/node/source/registry.go index e69de29..d150341 100644 --- a/services/data_service/internal/node/source/registry.go +++ b/services/data_service/internal/node/source/registry.go @@ -0,0 +1 @@ +package source diff --git a/services/data_service/internal/node/source/source.go b/services/data_service/internal/node/source/source.go index 4d6e6bc..5f4006c 100644 --- a/services/data_service/internal/node/source/source.go +++ b/services/data_service/internal/node/source/source.go @@ -1,2 +1,20 @@ // Package source package source + +import ( + "context" + + "gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node" +) + +type Source interface { + Start(ctx context.Context, cfg string, io IO) error + Stop() + + Serve(key string) error + Unserve(key string) error +} + +type IO interface { + node.Sender +} diff --git a/services/data_service/internal/router/router.go b/services/data_service/internal/router/router.go index e69de29..fef5b05 100644 --- a/services/data_service/internal/router/router.go +++ b/services/data_service/internal/router/router.go @@ -0,0 +1,9 @@ +// Package router +package router + +import "github.com/google/uuid" + +type Router interface { + Route(fromNode uuid.UUID, fromOutPort string, toNode uuid.UUID, toInPort string) + Unroute(fromNode uuid.UUID, fromOutPort string, toNode uuid.UUID, toInPort string) +} diff --git a/services/data_service/internal/router/single/engine.go b/services/data_service/internal/router/single/engine.go index e69de29..62c7ddc 100644 --- a/services/data_service/internal/router/single/engine.go +++ b/services/data_service/internal/router/single/engine.go @@ -0,0 +1,2 @@ +// Package single +package single