5 Commits

45 changed files with 506 additions and 1201 deletions

View File

@@ -1,11 +0,0 @@
PROTO_DIR := pkg/pb
PROTO_FILES := $(shell find $(PROTO_DIR) -name '*.proto')
.PHONY: proto
proto:
@echo "Generating Go code from Protobuf..."
protoc \
--go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
$(PROTO_FILES)
@echo "Protobuf generation complete."

173
cmd/datad/main.go Normal file
View File

@@ -0,0 +1,173 @@
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data/routing"
)
type SeqPayload struct {
Seq uint64
}
type streamStats struct {
sent uint64
observed uint64
missed uint64
lastSeen uint64
lastReport time.Time
}
func main() {
ctx := context.Background()
// ---- Knobs ----
N := 10
duration := 5 * time.Second
totalTargetPerSec := 5_000 // total across all streams
// ----------------
rt := routing.NewInprocRouter()
senders := make([]data.Sender, N)
receivers := make([]data.Receiver, N)
for i := range N {
st, err := rt.OpenStream(data.StreamID(uuid.New()))
if err != nil {
panic(err)
}
senders[i] = st.Sender()
receivers[i] = st.Receiver()
}
perStreamTarget := totalTargetPerSec / N
if perStreamTarget == 0 {
perStreamTarget = 1
}
fmt.Printf("N=%d duration=%s totalTarget=%d/s perStreamTarget=%d/s\n",
N, duration, totalTargetPerSec, perStreamTarget)
stopAt := time.Now().Add(duration)
stats := make([]streamStats, N)
var wg sync.WaitGroup
wg.Add(N + 1)
// Publisher: per-stream sender sequence in envelope payload.
go func() {
defer wg.Done()
tick := time.NewTicker(1 * time.Millisecond)
defer tick.Stop()
perTick := perStreamTarget / 1000
rem := perStreamTarget % 1000
remAcc := make([]int, N)
seq := make([]uint64, N)
for time.Now().Before(stopAt) {
<-tick.C
for i := range N {
n := int(perTick)
remAcc[i] += rem
if remAcc[i] >= 1000 {
n++
remAcc[i] -= 1000
}
for j := 0; j < n; j++ {
seq[i]++
env := data.Envelope{
Payload: SeqPayload{Seq: seq[i]},
}
_ = senders[i].Send(ctx, env)
stats[i].sent++
}
}
}
}()
// Consumers: detect missed sender sequence numbers.
for i := range N {
idx := i
rx := receivers[i]
go func() {
defer wg.Done()
for time.Now().Before(stopAt) {
env, ok, err := rx.TryReceive()
if err != nil {
return
}
if !ok {
continue
}
p, ok := env.Payload.(SeqPayload)
if !ok {
// If your Payload is pointer/interface-heavy, adjust accordingly.
continue
}
stats[idx].observed++
if stats[idx].lastSeen == 0 {
stats[idx].lastSeen = p.Seq
continue
}
if p.Seq > stats[idx].lastSeen+1 {
stats[idx].missed += (p.Seq - stats[idx].lastSeen - 1)
}
stats[idx].lastSeen = p.Seq
}
}()
}
wg.Wait()
var totalSent, totalObs, totalMiss uint64
minDrop, maxDrop := 100.0, 0.0
for i := range N {
totalSent += stats[i].sent
totalObs += stats[i].observed
totalMiss += stats[i].missed
den := stats[i].observed + stats[i].missed
dropPct := 0.0
if den > 0 {
dropPct = 100.0 * float64(stats[i].missed) / float64(den)
}
if dropPct < minDrop {
minDrop = dropPct
}
if dropPct > maxDrop {
maxDrop = dropPct
}
fmt.Printf("stream[%02d] sent=%6d observed=%6d missed=%6d lastSeen=%6d drop=%5.2f%%\n",
i, stats[i].sent, stats[i].observed, stats[i].missed, stats[i].lastSeen, dropPct)
}
totalDen := totalObs + totalMiss
totalDrop := 0.0
if totalDen > 0 {
totalDrop = 100.0 * float64(totalMiss) / float64(totalDen)
}
fmt.Printf("\nTOTAL sent=%d observed=%d missed=%d drop=%.2f%% (min=%.2f%% max=%.2f%%)\n",
totalSent, totalObs, totalMiss, totalDrop, minDrop, maxDrop)
}

16
go.mod
View File

@@ -2,18 +2,4 @@ module gitlab.michelsen.id/phillmichelsen/tessera
go 1.25.1
require (
github.com/google/uuid v1.6.0
github.com/lmittmann/tint v1.1.2
google.golang.org/grpc v1.75.0
google.golang.org/protobuf v1.36.8
)
require (
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.28.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1 // indirect
lukechampine.com/blake3 v1.4.1 // indirect
)
require github.com/google/uuid v1.6.0 // indirect

40
go.sum
View File

@@ -1,42 +1,2 @@
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/lmittmann/tint v1.1.2 h1:2CQzrL6rslrsyjqLDwD11bZ5OpLBPU+g3G/r5LSfS8w=
github.com/lmittmann/tint v1.1.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE=
go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E=
go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI=
go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg=
go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc=
go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps=
go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1 h1:pmJpJEvT846VzausCQ5d7KreSROcDqmO388w5YbnltA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og=
google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4=
google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
lukechampine.com/blake3 v1.4.1 h1:I3Smz7gso8w4/TunLKec6K2fn+kyKtDxr/xcQEN84Wg=
lukechampine.com/blake3 v1.4.1/go.mod h1:QFosUxmjB8mnrWFSNwKmvxHpfY72bmD2tQ0kBMM3kwo=

33
pkg/data/component.go Normal file
View File

@@ -0,0 +1,33 @@
package data
import "context"
type Component interface {
Start(ctx context.Context) error
Stop(ctx context.Context) error
ValidateConfig(cfg ComponentConfig) (ComponentConfigValidation, error)
ApplyConfig(cfg ComponentConfig) error
Status() ComponentStatus
}
type ComponentRuntime interface {
OpenStream(id StreamID) (Stream, error)
ReportError(fatal bool, err error)
}
type ComponentStatus struct {
Active bool
Patching bool
Config any
Error error
}
type ComponentConfig any
type ComponentConfigValidation struct {
Valid bool
RequiresRestart bool
Warnings []string
}

9
pkg/data/descriptor.go Normal file
View File

@@ -0,0 +1,9 @@
package data
type Descriptor struct {
Type string
Key string
Schema string
Attributes map[string]string
Tags []string
}

13
pkg/data/envelope.go Normal file
View File

@@ -0,0 +1,13 @@
// Package data ...
package data
import (
"time"
)
type Envelope struct {
SendTime time.Time
Descriptor Descriptor
Payload any
}

1
pkg/data/errors.go Normal file
View File

@@ -0,0 +1 @@
package data

12
pkg/data/events/bar.go Normal file
View File

@@ -0,0 +1,12 @@
package events
import "time"
type Bar struct {
Open float64
High float64
Low float64
Close float64
Volume float64
Interval time.Duration
}

View File

@@ -0,0 +1,6 @@
package events
type Custom struct {
Bytes []byte
ContentType string
}

22
pkg/data/events/domain.go Normal file
View File

@@ -0,0 +1,22 @@
// Package events ...
package events
type DataType uint8
const (
TradeType DataType = iota
QuoteType
BarType
MBPDeltaType
MBPSnapshotType
MBODeltaType
MBOSnapshotType
CustomType
)
type Side uint8
const (
Bid Side = iota
Ask
)

View File

@@ -0,0 +1,20 @@
package events
type MBODelta struct {
Operation MBOOrderOp
OrderID string
Side Side
Price float64
Size float64
IsMaker bool
Seq uint64
ParentID string
}
type MBOOrderOp uint8
const (
OrderAdd MBOOrderOp = iota
OrderMod
OrderDel
)

View File

@@ -0,0 +1,14 @@
package events
type MBOSnapshot struct {
Orders []OrderEntry
Seq uint64
}
type OrderEntry struct {
OrderID string
Side Side
Price float64
Size float64
IsMaker bool
}

View File

@@ -0,0 +1,8 @@
package events
type MBPDelta struct {
Side Side
Price float64
Size float64
Seq uint64
}

View File

@@ -0,0 +1,13 @@
package events
type MBPSnapshot struct {
Bids []PriceLevel
Asks []PriceLevel
Depth int
Seq uint64
}
type PriceLevel struct {
Price float64
Size float64
}

8
pkg/data/events/quote.go Normal file
View File

@@ -0,0 +1,8 @@
package events
type Quote struct {
BidPrice float64
BidSize float64
AskPrice float64
AskSize float64
}

16
pkg/data/events/trade.go Normal file
View File

@@ -0,0 +1,16 @@
package events
type Trade struct {
Price float64
Qty float64
Aggressor AggressorSide
TradeID string
}
type AggressorSide uint8
const (
AggUnknown AggressorSide = iota
AggBuy
AggSell
)

130
pkg/data/routing/inproc.go Normal file
View File

@@ -0,0 +1,130 @@
package routing
import (
"context"
"errors"
"sync"
"time"
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
)
type InprocRouter struct {
mu sync.RWMutex
streams map[data.StreamID]*inprocStream
}
func NewInprocRouter() *InprocRouter {
return &InprocRouter{
streams: make(map[data.StreamID]*inprocStream),
}
}
func (r *InprocRouter) OpenStream(id data.StreamID) (data.Stream, error) {
r.mu.Lock()
defer r.mu.Unlock()
s := r.streams[id]
if s != nil {
return s, nil
}
s = newInprocStream(id)
r.streams[id] = s
return s, nil
}
type inprocStream struct {
id data.StreamID
seq uint64
latest data.Envelope
streamClosed bool
mu sync.RWMutex
}
func newInprocStream(id data.StreamID) *inprocStream {
return &inprocStream{
id: id,
}
}
func (s *inprocStream) ID() data.StreamID {
return s.id
}
func (s *inprocStream) Sender() data.Sender {
return &inprocSender{stream: s}
}
func (s *inprocStream) Receiver() data.Receiver {
s.mu.RLock()
cur := s.seq
s.mu.RUnlock()
return &inprocReceiver{
stream: s,
lastSeenSeq: cur,
}
}
type inprocSender struct {
stream *inprocStream
}
func (tx *inprocSender) Send(ctx context.Context, env data.Envelope) error {
if err := ctx.Err(); err != nil {
return err
}
s := tx.stream
s.mu.Lock()
defer s.mu.Unlock()
if s.streamClosed {
return errors.New("stream closed")
}
env.SendTime = time.Now().UTC()
s.seq++
s.latest = env
return nil
}
func (tx *inprocSender) SendBatch(ctx context.Context, envs []data.Envelope) error {
panic("unimplemented")
}
type inprocReceiver struct {
stream *inprocStream
lastSeenSeq uint64
}
func (rx *inprocReceiver) TryReceive() (data.Envelope, bool, error) {
s := rx.stream
s.mu.RLock()
defer s.mu.RUnlock()
if s.streamClosed {
return data.Envelope{}, false, errors.New("stream closed")
}
if s.seq == 0 || s.seq == rx.lastSeenSeq {
return data.Envelope{}, false, nil
}
rx.lastSeenSeq = s.seq
return s.latest, true, nil
}
func (rx *inprocReceiver) ReceiveNext(ctx context.Context) (data.Envelope, error) {
panic("unimplemented")
}
func (rx *inprocReceiver) Seq() uint64 {
return rx.lastSeenSeq
}

27
pkg/data/stream.go Normal file
View File

@@ -0,0 +1,27 @@
package data
import (
"context"
"github.com/google/uuid"
)
type StreamID uuid.UUID
type Stream interface {
ID() StreamID
Sender() Sender
Receiver() Receiver
}
type Sender interface {
Send(context.Context, Envelope) error
SendBatch(context.Context, []Envelope) error
}
type Receiver interface {
TryReceive() (Envelope, bool, error)
ReceiveNext(context.Context) (Envelope, error)
Seq() uint64
}

View File

@@ -1,523 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v6.32.0
// source: pkg/pb/data_service/data_service.proto
package data_service
import (
reflect "reflect"
sync "sync"
unsafe "unsafe"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Identifier struct {
state protoimpl.MessageState `protogen:"open.v1"`
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *Identifier) Reset() {
*x = Identifier{}
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *Identifier) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Identifier) ProtoMessage() {}
func (x *Identifier) ProtoReflect() protoreflect.Message {
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[0]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Identifier.ProtoReflect.Descriptor instead.
func (*Identifier) Descriptor() ([]byte, []int) {
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{0}
}
func (x *Identifier) GetKey() string {
if x != nil {
return x.Key
}
return ""
}
type Message struct {
state protoimpl.MessageState `protogen:"open.v1"`
Identifier *Identifier `protobuf:"bytes,1,opt,name=identifier,proto3" json:"identifier,omitempty"`
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
Encoding string `protobuf:"bytes,3,opt,name=encoding,proto3" json:"encoding,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *Message) Reset() {
*x = Message{}
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *Message) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Message) ProtoMessage() {}
func (x *Message) ProtoReflect() protoreflect.Message {
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Message.ProtoReflect.Descriptor instead.
func (*Message) Descriptor() ([]byte, []int) {
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{1}
}
func (x *Message) GetIdentifier() *Identifier {
if x != nil {
return x.Identifier
}
return nil
}
func (x *Message) GetPayload() []byte {
if x != nil {
return x.Payload
}
return nil
}
func (x *Message) GetEncoding() string {
if x != nil {
return x.Encoding
}
return ""
}
type StartStreamRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *StartStreamRequest) Reset() {
*x = StartStreamRequest{}
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *StartStreamRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StartStreamRequest) ProtoMessage() {}
func (x *StartStreamRequest) ProtoReflect() protoreflect.Message {
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StartStreamRequest.ProtoReflect.Descriptor instead.
func (*StartStreamRequest) Descriptor() ([]byte, []int) {
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{2}
}
type StartStreamResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *StartStreamResponse) Reset() {
*x = StartStreamResponse{}
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *StartStreamResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StartStreamResponse) ProtoMessage() {}
func (x *StartStreamResponse) ProtoReflect() protoreflect.Message {
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[3]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StartStreamResponse.ProtoReflect.Descriptor instead.
func (*StartStreamResponse) Descriptor() ([]byte, []int) {
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{3}
}
func (x *StartStreamResponse) GetStreamUuid() string {
if x != nil {
return x.StreamUuid
}
return ""
}
type ConfigureStreamRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"`
Identifiers []*Identifier `protobuf:"bytes,2,rep,name=identifiers,proto3" json:"identifiers,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ConfigureStreamRequest) Reset() {
*x = ConfigureStreamRequest{}
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ConfigureStreamRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ConfigureStreamRequest) ProtoMessage() {}
func (x *ConfigureStreamRequest) ProtoReflect() protoreflect.Message {
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[4]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ConfigureStreamRequest.ProtoReflect.Descriptor instead.
func (*ConfigureStreamRequest) Descriptor() ([]byte, []int) {
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{4}
}
func (x *ConfigureStreamRequest) GetStreamUuid() string {
if x != nil {
return x.StreamUuid
}
return ""
}
func (x *ConfigureStreamRequest) GetIdentifiers() []*Identifier {
if x != nil {
return x.Identifiers
}
return nil
}
type ConfigureStreamResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ConfigureStreamResponse) Reset() {
*x = ConfigureStreamResponse{}
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ConfigureStreamResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ConfigureStreamResponse) ProtoMessage() {}
func (x *ConfigureStreamResponse) ProtoReflect() protoreflect.Message {
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[5]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ConfigureStreamResponse.ProtoReflect.Descriptor instead.
func (*ConfigureStreamResponse) Descriptor() ([]byte, []int) {
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{5}
}
type StopStreamRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *StopStreamRequest) Reset() {
*x = StopStreamRequest{}
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *StopStreamRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StopStreamRequest) ProtoMessage() {}
func (x *StopStreamRequest) ProtoReflect() protoreflect.Message {
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[6]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StopStreamRequest.ProtoReflect.Descriptor instead.
func (*StopStreamRequest) Descriptor() ([]byte, []int) {
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{6}
}
func (x *StopStreamRequest) GetStreamUuid() string {
if x != nil {
return x.StreamUuid
}
return ""
}
type StopStreamResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *StopStreamResponse) Reset() {
*x = StopStreamResponse{}
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *StopStreamResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StopStreamResponse) ProtoMessage() {}
func (x *StopStreamResponse) ProtoReflect() protoreflect.Message {
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[7]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StopStreamResponse.ProtoReflect.Descriptor instead.
func (*StopStreamResponse) Descriptor() ([]byte, []int) {
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{7}
}
type ConnectStreamRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ConnectStreamRequest) Reset() {
*x = ConnectStreamRequest{}
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ConnectStreamRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ConnectStreamRequest) ProtoMessage() {}
func (x *ConnectStreamRequest) ProtoReflect() protoreflect.Message {
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[8]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ConnectStreamRequest.ProtoReflect.Descriptor instead.
func (*ConnectStreamRequest) Descriptor() ([]byte, []int) {
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{8}
}
func (x *ConnectStreamRequest) GetStreamUuid() string {
if x != nil {
return x.StreamUuid
}
return ""
}
var File_pkg_pb_data_service_data_service_proto protoreflect.FileDescriptor
const file_pkg_pb_data_service_data_service_proto_rawDesc = "" +
"\n" +
"&pkg/pb/data_service/data_service.proto\x12\fdata_service\"\x1e\n" +
"\n" +
"Identifier\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\"y\n" +
"\aMessage\x128\n" +
"\n" +
"identifier\x18\x01 \x01(\v2\x18.data_service.IdentifierR\n" +
"identifier\x12\x18\n" +
"\apayload\x18\x02 \x01(\fR\apayload\x12\x1a\n" +
"\bencoding\x18\x03 \x01(\tR\bencoding\"\x14\n" +
"\x12StartStreamRequest\"6\n" +
"\x13StartStreamResponse\x12\x1f\n" +
"\vstream_uuid\x18\x01 \x01(\tR\n" +
"streamUuid\"u\n" +
"\x16ConfigureStreamRequest\x12\x1f\n" +
"\vstream_uuid\x18\x01 \x01(\tR\n" +
"streamUuid\x12:\n" +
"\videntifiers\x18\x02 \x03(\v2\x18.data_service.IdentifierR\videntifiers\"\x19\n" +
"\x17ConfigureStreamResponse\"4\n" +
"\x11StopStreamRequest\x12\x1f\n" +
"\vstream_uuid\x18\x01 \x01(\tR\n" +
"streamUuid\"\x14\n" +
"\x12StopStreamResponse\"7\n" +
"\x14ConnectStreamRequest\x12\x1f\n" +
"\vstream_uuid\x18\x01 \x01(\tR\n" +
"streamUuid2\x99\x02\n" +
"\x12DataServiceControl\x12R\n" +
"\vStartStream\x12 .data_service.StartStreamRequest\x1a!.data_service.StartStreamResponse\x12O\n" +
"\n" +
"StopStream\x12\x1f.data_service.StopStreamRequest\x1a .data_service.StopStreamResponse\x12^\n" +
"\x0fConfigureStream\x12$.data_service.ConfigureStreamRequest\x1a%.data_service.ConfigureStreamResponse2d\n" +
"\x14DataServiceStreaming\x12L\n" +
"\rConnectStream\x12\".data_service.ConnectStreamRequest\x1a\x15.data_service.Message0\x01BMZKgitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service;data_serviceb\x06proto3"
var (
file_pkg_pb_data_service_data_service_proto_rawDescOnce sync.Once
file_pkg_pb_data_service_data_service_proto_rawDescData []byte
)
func file_pkg_pb_data_service_data_service_proto_rawDescGZIP() []byte {
file_pkg_pb_data_service_data_service_proto_rawDescOnce.Do(func() {
file_pkg_pb_data_service_data_service_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_pkg_pb_data_service_data_service_proto_rawDesc), len(file_pkg_pb_data_service_data_service_proto_rawDesc)))
})
return file_pkg_pb_data_service_data_service_proto_rawDescData
}
var file_pkg_pb_data_service_data_service_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
var file_pkg_pb_data_service_data_service_proto_goTypes = []any{
(*Identifier)(nil), // 0: data_service.Identifier
(*Message)(nil), // 1: data_service.Message
(*StartStreamRequest)(nil), // 2: data_service.StartStreamRequest
(*StartStreamResponse)(nil), // 3: data_service.StartStreamResponse
(*ConfigureStreamRequest)(nil), // 4: data_service.ConfigureStreamRequest
(*ConfigureStreamResponse)(nil), // 5: data_service.ConfigureStreamResponse
(*StopStreamRequest)(nil), // 6: data_service.StopStreamRequest
(*StopStreamResponse)(nil), // 7: data_service.StopStreamResponse
(*ConnectStreamRequest)(nil), // 8: data_service.ConnectStreamRequest
}
var file_pkg_pb_data_service_data_service_proto_depIdxs = []int32{
0, // 0: data_service.Message.identifier:type_name -> data_service.Identifier
0, // 1: data_service.ConfigureStreamRequest.identifiers:type_name -> data_service.Identifier
2, // 2: data_service.DataServiceControl.StartStream:input_type -> data_service.StartStreamRequest
6, // 3: data_service.DataServiceControl.StopStream:input_type -> data_service.StopStreamRequest
4, // 4: data_service.DataServiceControl.ConfigureStream:input_type -> data_service.ConfigureStreamRequest
8, // 5: data_service.DataServiceStreaming.ConnectStream:input_type -> data_service.ConnectStreamRequest
3, // 6: data_service.DataServiceControl.StartStream:output_type -> data_service.StartStreamResponse
7, // 7: data_service.DataServiceControl.StopStream:output_type -> data_service.StopStreamResponse
5, // 8: data_service.DataServiceControl.ConfigureStream:output_type -> data_service.ConfigureStreamResponse
1, // 9: data_service.DataServiceStreaming.ConnectStream:output_type -> data_service.Message
6, // [6:10] is the sub-list for method output_type
2, // [2:6] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
}
func init() { file_pkg_pb_data_service_data_service_proto_init() }
func file_pkg_pb_data_service_data_service_proto_init() {
if File_pkg_pb_data_service_data_service_proto != nil {
return
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_pkg_pb_data_service_data_service_proto_rawDesc), len(file_pkg_pb_data_service_data_service_proto_rawDesc)),
NumEnums: 0,
NumMessages: 9,
NumExtensions: 0,
NumServices: 2,
},
GoTypes: file_pkg_pb_data_service_data_service_proto_goTypes,
DependencyIndexes: file_pkg_pb_data_service_data_service_proto_depIdxs,
MessageInfos: file_pkg_pb_data_service_data_service_proto_msgTypes,
}.Build()
File_pkg_pb_data_service_data_service_proto = out.File
file_pkg_pb_data_service_data_service_proto_goTypes = nil
file_pkg_pb_data_service_data_service_proto_depIdxs = nil
}

View File

@@ -1,43 +0,0 @@
syntax = "proto3";
package data_service;
option go_package = "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service;data_service";
service DataServiceControl {
rpc StartStream(StartStreamRequest) returns (StartStreamResponse);
rpc StopStream(StopStreamRequest) returns (StopStreamResponse);
rpc ConfigureStream(ConfigureStreamRequest) returns (ConfigureStreamResponse);
}
service DataServiceStreaming {
rpc ConnectStream(ConnectStreamRequest) returns (stream Message);
}
message Identifier {
string key = 1;
}
message Pattern {
string key = 2;
}
message Message {
Identifier identifier = 1;
bytes payload = 2;
string encoding = 3;
}
message StartStreamRequest {}
message StartStreamResponse { string stream_uuid = 1; }
message ConfigureStreamRequest {
string stream_uuid = 1;
repeated Pattern patterns = 2;
}
message ConfigureStreamResponse {}
message StopStreamRequest { string stream_uuid = 1; }
message StopStreamResponse {}
message ConnectStreamRequest { string stream_uuid = 1; }

View File

@@ -1,303 +0,0 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v6.32.0
// source: pkg/pb/data_service/data_service.proto
package data_service
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
DataServiceControl_StartStream_FullMethodName = "/data_service.DataServiceControl/StartStream"
DataServiceControl_StopStream_FullMethodName = "/data_service.DataServiceControl/StopStream"
DataServiceControl_ConfigureStream_FullMethodName = "/data_service.DataServiceControl/ConfigureStream"
)
// DataServiceControlClient is the client API for DataServiceControl service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type DataServiceControlClient interface {
StartStream(ctx context.Context, in *StartStreamRequest, opts ...grpc.CallOption) (*StartStreamResponse, error)
StopStream(ctx context.Context, in *StopStreamRequest, opts ...grpc.CallOption) (*StopStreamResponse, error)
ConfigureStream(ctx context.Context, in *ConfigureStreamRequest, opts ...grpc.CallOption) (*ConfigureStreamResponse, error)
}
type dataServiceControlClient struct {
cc grpc.ClientConnInterface
}
func NewDataServiceControlClient(cc grpc.ClientConnInterface) DataServiceControlClient {
return &dataServiceControlClient{cc}
}
func (c *dataServiceControlClient) StartStream(ctx context.Context, in *StartStreamRequest, opts ...grpc.CallOption) (*StartStreamResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(StartStreamResponse)
err := c.cc.Invoke(ctx, DataServiceControl_StartStream_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dataServiceControlClient) StopStream(ctx context.Context, in *StopStreamRequest, opts ...grpc.CallOption) (*StopStreamResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(StopStreamResponse)
err := c.cc.Invoke(ctx, DataServiceControl_StopStream_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dataServiceControlClient) ConfigureStream(ctx context.Context, in *ConfigureStreamRequest, opts ...grpc.CallOption) (*ConfigureStreamResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ConfigureStreamResponse)
err := c.cc.Invoke(ctx, DataServiceControl_ConfigureStream_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// DataServiceControlServer is the server API for DataServiceControl service.
// All implementations must embed UnimplementedDataServiceControlServer
// for forward compatibility.
type DataServiceControlServer interface {
StartStream(context.Context, *StartStreamRequest) (*StartStreamResponse, error)
StopStream(context.Context, *StopStreamRequest) (*StopStreamResponse, error)
ConfigureStream(context.Context, *ConfigureStreamRequest) (*ConfigureStreamResponse, error)
mustEmbedUnimplementedDataServiceControlServer()
}
// UnimplementedDataServiceControlServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedDataServiceControlServer struct{}
func (UnimplementedDataServiceControlServer) StartStream(context.Context, *StartStreamRequest) (*StartStreamResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StartStream not implemented")
}
func (UnimplementedDataServiceControlServer) StopStream(context.Context, *StopStreamRequest) (*StopStreamResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StopStream not implemented")
}
func (UnimplementedDataServiceControlServer) ConfigureStream(context.Context, *ConfigureStreamRequest) (*ConfigureStreamResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ConfigureStream not implemented")
}
func (UnimplementedDataServiceControlServer) mustEmbedUnimplementedDataServiceControlServer() {}
func (UnimplementedDataServiceControlServer) testEmbeddedByValue() {}
// UnsafeDataServiceControlServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to DataServiceControlServer will
// result in compilation errors.
type UnsafeDataServiceControlServer interface {
mustEmbedUnimplementedDataServiceControlServer()
}
func RegisterDataServiceControlServer(s grpc.ServiceRegistrar, srv DataServiceControlServer) {
// If the following call pancis, it indicates UnimplementedDataServiceControlServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&DataServiceControl_ServiceDesc, srv)
}
func _DataServiceControl_StartStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StartStreamRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DataServiceControlServer).StartStream(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: DataServiceControl_StartStream_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DataServiceControlServer).StartStream(ctx, req.(*StartStreamRequest))
}
return interceptor(ctx, in, info, handler)
}
func _DataServiceControl_StopStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StopStreamRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DataServiceControlServer).StopStream(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: DataServiceControl_StopStream_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DataServiceControlServer).StopStream(ctx, req.(*StopStreamRequest))
}
return interceptor(ctx, in, info, handler)
}
func _DataServiceControl_ConfigureStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ConfigureStreamRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DataServiceControlServer).ConfigureStream(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: DataServiceControl_ConfigureStream_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DataServiceControlServer).ConfigureStream(ctx, req.(*ConfigureStreamRequest))
}
return interceptor(ctx, in, info, handler)
}
// DataServiceControl_ServiceDesc is the grpc.ServiceDesc for DataServiceControl service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var DataServiceControl_ServiceDesc = grpc.ServiceDesc{
ServiceName: "data_service.DataServiceControl",
HandlerType: (*DataServiceControlServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "StartStream",
Handler: _DataServiceControl_StartStream_Handler,
},
{
MethodName: "StopStream",
Handler: _DataServiceControl_StopStream_Handler,
},
{
MethodName: "ConfigureStream",
Handler: _DataServiceControl_ConfigureStream_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pkg/pb/data_service/data_service.proto",
}
const (
DataServiceStreaming_ConnectStream_FullMethodName = "/data_service.DataServiceStreaming/ConnectStream"
)
// DataServiceStreamingClient is the client API for DataServiceStreaming service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type DataServiceStreamingClient interface {
ConnectStream(ctx context.Context, in *ConnectStreamRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Message], error)
}
type dataServiceStreamingClient struct {
cc grpc.ClientConnInterface
}
func NewDataServiceStreamingClient(cc grpc.ClientConnInterface) DataServiceStreamingClient {
return &dataServiceStreamingClient{cc}
}
func (c *dataServiceStreamingClient) ConnectStream(ctx context.Context, in *ConnectStreamRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Message], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &DataServiceStreaming_ServiceDesc.Streams[0], DataServiceStreaming_ConnectStream_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[ConnectStreamRequest, Message]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type DataServiceStreaming_ConnectStreamClient = grpc.ServerStreamingClient[Message]
// DataServiceStreamingServer is the server API for DataServiceStreaming service.
// All implementations must embed UnimplementedDataServiceStreamingServer
// for forward compatibility.
type DataServiceStreamingServer interface {
ConnectStream(*ConnectStreamRequest, grpc.ServerStreamingServer[Message]) error
mustEmbedUnimplementedDataServiceStreamingServer()
}
// UnimplementedDataServiceStreamingServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedDataServiceStreamingServer struct{}
func (UnimplementedDataServiceStreamingServer) ConnectStream(*ConnectStreamRequest, grpc.ServerStreamingServer[Message]) error {
return status.Errorf(codes.Unimplemented, "method ConnectStream not implemented")
}
func (UnimplementedDataServiceStreamingServer) mustEmbedUnimplementedDataServiceStreamingServer() {}
func (UnimplementedDataServiceStreamingServer) testEmbeddedByValue() {}
// UnsafeDataServiceStreamingServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to DataServiceStreamingServer will
// result in compilation errors.
type UnsafeDataServiceStreamingServer interface {
mustEmbedUnimplementedDataServiceStreamingServer()
}
func RegisterDataServiceStreamingServer(s grpc.ServiceRegistrar, srv DataServiceStreamingServer) {
// If the following call pancis, it indicates UnimplementedDataServiceStreamingServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&DataServiceStreaming_ServiceDesc, srv)
}
func _DataServiceStreaming_ConnectStream_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(ConnectStreamRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DataServiceStreamingServer).ConnectStream(m, &grpc.GenericServerStream[ConnectStreamRequest, Message]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type DataServiceStreaming_ConnectStreamServer = grpc.ServerStreamingServer[Message]
// DataServiceStreaming_ServiceDesc is the grpc.ServiceDesc for DataServiceStreaming service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var DataServiceStreaming_ServiceDesc = grpc.ServiceDesc{
ServiceName: "data_service.DataServiceStreaming",
HandlerType: (*DataServiceStreamingServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "ConnectStream",
Handler: _DataServiceStreaming_ConnectStream_Handler,
ServerStreams: true,
},
},
Metadata: "pkg/pb/data_service/data_service.proto",
}

View File

@@ -1 +0,0 @@
package cmd

View File

@@ -1,21 +0,0 @@
# ---- Builder ----
FROM golang:1.24-alpine AS builder
ENV CGO_ENABLED=0 GOOS=linux
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/go/pkg/mod \
go build -trimpath -ldflags="-s -w" \
-o /out/data-service ./services/data_service/cmd/data_service
# ---- Runtime ----
FROM gcr.io/distroless/static:nonroot
EXPOSE 50051 50052 6000
COPY --from=builder /out/data-service /bin/data-service
USER nonroot:nonroot
ENTRYPOINT ["/bin/data-service"]

View File

@@ -1,50 +0,0 @@
package main
import (
"log/slog"
"os"
"time"
"github.com/lmittmann/tint"
)
func initLogger() *slog.Logger {
level := parseLevel(env("LOG_LEVEL", "debug"))
if env("LOG_FORMAT", "pretty") == "json" {
return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: level,
}))
}
return slog.New(tint.NewHandler(os.Stdout, &tint.Options{
Level: level,
TimeFormat: time.RFC3339Nano,
NoColor: os.Getenv("NO_COLOR") != "",
}))
}
func parseLevel(s string) slog.Level {
switch s {
case "debug":
return slog.LevelDebug
case "warn":
return slog.LevelWarn
case "error":
return slog.LevelError
default:
return slog.LevelInfo
}
}
func env(k, def string) string {
if v := os.Getenv(k); v != "" {
return v
}
return def
}
func main() {
slog.SetDefault(initLogger())
slog.Info("starting", "svc", "data-service")
select {}
}

View File

@@ -1,25 +0,0 @@
// Package control
package control
import (
"github.com/google/uuid"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node/processor"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router"
)
type nodeEntry struct {
Template node.Template
TemplateFingerprint string
}
type Controller struct {
router router.Router
sourceRegistry any // source.Registry
processorRegistry processor.Registry
sinkRegistry any // sink.Registry
sourceNodes map[uuid.UUID]any
processorNodes map[uuid.UUID]processor.Processor
sinkNodes map[uuid.UUID]any
}

View File

@@ -1,43 +0,0 @@
package control
import (
"encoding/binary"
"encoding/hex"
"sort"
"lukechampine.com/blake3"
)
func StreamFingerprint(templateFP, outPort string, inMap map[string]string) string {
// Sort input keys for determinism.
keys := make([]string, 0, len(inMap))
for k := range inMap {
keys = append(keys, k)
}
sort.Strings(keys)
h := blake3.New(32, nil)
write := func(s string) {
var lenbuf [4]byte
binary.LittleEndian.PutUint32(lenbuf[:], uint32(len(s)))
_, _ = h.Write(lenbuf[:])
_, _ = h.Write([]byte(s))
}
// templateFP, outPort, input count, then pairs.
write(templateFP)
write(outPort)
var nbuf [4]byte
binary.LittleEndian.PutUint32(nbuf[:], uint32(len(keys)))
_, _ = h.Write(nbuf[:])
for _, k := range keys {
write(k)
write(inMap[k])
}
sum := h.Sum(nil)
return hex.EncodeToString(sum)
}

View File

@@ -1,11 +0,0 @@
// Package domain
package domain
import "time"
type Message struct {
Payload []byte
SchemaID string
Seq uint64
Timestamp time.Time
}

View File

@@ -1,14 +0,0 @@
// Package node
package node
import (
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
)
type Receiver interface {
Receive() (port string, message domain.Message, ok bool)
}
type Sender interface {
Send(port string, message domain.Message) error
}

View File

@@ -1,16 +0,0 @@
package node
type Type string
const (
Source Type = "source"
Processor Type = "processor"
Sink Type = "sink"
)
type Template struct {
Kind Type
Type string
Version string
Config string
}

View File

@@ -1,18 +0,0 @@
// Package processor
package processor
import (
"context"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node"
)
type Processor interface {
Start(ctx context.Context, cfg string, io IO) error
Stop()
}
type IO interface {
node.Receiver
node.Sender
}

View File

@@ -1,11 +0,0 @@
package processor
type Registry struct{}
type Factory interface {
New() Processor
Type() string
Version() string
TemplateFingerprint(cfg string) (string, error)
}

View File

@@ -1,17 +0,0 @@
// Package sink
package sink
import (
"context"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node"
)
type Sink interface {
Start(ctx context.Context, io IO) error
Stop()
}
type IO interface {
node.Sender
}

View File

@@ -1 +0,0 @@
package source

View File

@@ -1,20 +0,0 @@
// Package source
package source
import (
"context"
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node"
)
type Source interface {
Start(ctx context.Context, cfg string, io IO) error
Stop()
Serve(key string) error
Unserve(key string) error
}
type IO interface {
node.Sender
}

View File

@@ -1,9 +0,0 @@
// Package router
package router
import "github.com/google/uuid"
type Router interface {
Route(fromNode uuid.UUID, fromOutPort string, toNode uuid.UUID, toInPort string)
Unroute(fromNode uuid.UUID, fromOutPort string, toNode uuid.UUID, toInPort string)
}

View File

@@ -1,2 +0,0 @@
// Package single
package single

View File

@@ -1,2 +0,0 @@
// Package server
package server

View File

@@ -1,2 +0,0 @@
// Package session
package session

View File

@@ -1 +0,0 @@
package cmd

View File

@@ -1 +0,0 @@
package cmd

View File

@@ -1 +0,0 @@
package marketdata