Continued scaffolding and interface definitions

This commit is contained in:
2025-10-12 02:20:20 +08:00
parent 2e5fd96014
commit c3e0eba4b1
18 changed files with 164 additions and 15 deletions

View File

@@ -1,2 +1,25 @@
// Package control
package control
import (
"github.com/google/uuid"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node/processor"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router"
)
type nodeEntry struct {
Template node.Template
TemplateFingerprint string
}
type Controller struct {
router router.Router
sourceRegistry any // source.Registry
processorRegistry processor.Registry
sinkRegistry any // sink.Registry
sourceNodes map[uuid.UUID]any
processorNodes map[uuid.UUID]processor.Processor
sinkNodes map[uuid.UUID]any
}

View File

@@ -0,0 +1,43 @@
package control
import (
"encoding/binary"
"encoding/hex"
"sort"
"lukechampine.com/blake3"
)
func StreamFingerprint(templateFP, outPort string, inMap map[string]string) string {
// Sort input keys for determinism.
keys := make([]string, 0, len(inMap))
for k := range inMap {
keys = append(keys, k)
}
sort.Strings(keys)
h := blake3.New(32, nil)
write := func(s string) {
var lenbuf [4]byte
binary.LittleEndian.PutUint32(lenbuf[:], uint32(len(s)))
_, _ = h.Write(lenbuf[:])
_, _ = h.Write([]byte(s))
}
// templateFP, outPort, input count, then pairs.
write(templateFP)
write(outPort)
var nbuf [4]byte
binary.LittleEndian.PutUint32(nbuf[:], uint32(len(keys)))
_, _ = h.Write(nbuf[:])
for _, k := range keys {
write(k)
write(inMap[k])
}
sum := h.Sum(nil)
return hex.EncodeToString(sum)
}

View File

@@ -1,2 +0,0 @@
// Package exit
package exit

View File

@@ -5,10 +5,10 @@ import (
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
)
type In interface {
type Receiver interface {
Receive() (port string, message domain.Message, ok bool)
}
type Out interface {
type Sender interface {
Send(port string, message domain.Message) error
}

View File

@@ -0,0 +1,16 @@
package node
type Type string
const (
Source Type = "source"
Processor Type = "processor"
Sink Type = "sink"
)
type Template struct {
Kind Type
Type string
Version string
Config string
}

View File

@@ -1,2 +1,18 @@
// Package processor
package processor
import (
"context"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node"
)
type Processor interface {
Start(ctx context.Context, cfg string, io IO) error
Stop()
}
type IO interface {
node.Receiver
node.Sender
}

View File

@@ -0,0 +1,11 @@
package processor
type Registry struct{}
type Factory interface {
New() Processor
Type() string
Version() string
TemplateFingerprint(cfg string) (string, error)
}

View File

@@ -0,0 +1,17 @@
// Package sink
package sink
import (
"context"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node"
)
type Sink interface {
Start(ctx context.Context, io IO) error
Stop()
}
type IO interface {
node.Sender
}

View File

@@ -0,0 +1 @@
package source

View File

@@ -1,2 +1,20 @@
// Package source
package source
import (
"context"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node"
)
type Source interface {
Start(ctx context.Context, cfg string, io IO) error
Stop()
Serve(key string) error
Unserve(key string) error
}
type IO interface {
node.Sender
}

View File

@@ -0,0 +1,9 @@
// Package router
package router
import "github.com/google/uuid"
type Router interface {
Route(fromNode uuid.UUID, fromOutPort string, toNode uuid.UUID, toInPort string)
Unroute(fromNode uuid.UUID, fromOutPort string, toNode uuid.UUID, toInPort string)
}

View File

@@ -0,0 +1,2 @@
// Package single
package single