Compare commits
2 Commits
3-rework-a
...
2-rework-w
| Author | SHA1 | Date | |
|---|---|---|---|
| c3e0eba4b1 | |||
| 2e5fd96014 |
11
Makefile
11
Makefile
@@ -0,0 +1,11 @@
|
|||||||
|
PROTO_DIR := pkg/pb
|
||||||
|
PROTO_FILES := $(shell find $(PROTO_DIR) -name '*.proto')
|
||||||
|
|
||||||
|
.PHONY: proto
|
||||||
|
proto:
|
||||||
|
@echo "Generating Go code from Protobuf..."
|
||||||
|
protoc \
|
||||||
|
--go_out=. --go_opt=paths=source_relative \
|
||||||
|
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
|
||||||
|
$(PROTO_FILES)
|
||||||
|
@echo "Protobuf generation complete."
|
||||||
|
|||||||
@@ -1,314 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"math/rand"
|
|
||||||
"runtime"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
|
|
||||||
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data/routing"
|
|
||||||
)
|
|
||||||
|
|
||||||
/*
|
|
||||||
Realistic-ish market-data style test.
|
|
||||||
|
|
||||||
Model:
|
|
||||||
- 1 publisher per topic (instrument / feed partition)
|
|
||||||
- Each message carries a strictly increasing sequence number (per topic)
|
|
||||||
- Subscribers validate in-order, gap-free delivery
|
|
||||||
- Publishers send with bursty pacing to approximate L3-ish behavior:
|
|
||||||
send BurstSize messages back-to-back, then sleep to maintain AvgRate.
|
|
||||||
|
|
||||||
Defaults are intentionally moderate. Increase topics/rates to stress.
|
|
||||||
*/
|
|
||||||
|
|
||||||
const (
|
|
||||||
NumTopics = 8 // topics/instruments/partitions
|
|
||||||
SubsPerTopic = 6 // fan-out per topic
|
|
||||||
RingCapacity = 1 << 14
|
|
||||||
TestDuration = 60 * time.Second
|
|
||||||
AvgRatePerTopic = 500_000 // msgs/sec per topic (average)
|
|
||||||
BurstSize = 512 // burst messages then sleep to preserve avg
|
|
||||||
|
|
||||||
// If true, subscribers spin-poll (TryReceive). If false, blocking Receive.
|
|
||||||
UseTryReceive = false
|
|
||||||
)
|
|
||||||
|
|
||||||
type topicStats struct {
|
|
||||||
published atomic.Uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
type subStats struct {
|
|
||||||
received atomic.Uint64
|
|
||||||
errors atomic.Uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
fmt.Printf("Market-Data Routing Test\n")
|
|
||||||
fmt.Printf("Topics: %d | Subs/Topic: %d | Duration: %v\n", NumTopics, SubsPerTopic, TestDuration)
|
|
||||||
fmt.Printf("AvgRate/Topic: %d msg/s | BurstSize: %d | Mode: %s\n\n",
|
|
||||||
AvgRatePerTopic, BurstSize, modeName())
|
|
||||||
|
|
||||||
broker := routing.NewBroker()
|
|
||||||
|
|
||||||
topics := make([]string, NumTopics)
|
|
||||||
for i := 0; i < NumTopics; i++ {
|
|
||||||
topics[i] = fmt.Sprintf("FUT_L3_%02d", i)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create publishers first to size the rings.
|
|
||||||
pubs := make([]routing.Publisher, NumTopics)
|
|
||||||
for i := 0; i < NumTopics; i++ {
|
|
||||||
pubs[i] = broker.RegisterPublisher(topics[i], RingCapacity)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Per-topic published counters (ground truth).
|
|
||||||
tStats := make([]*topicStats, NumTopics)
|
|
||||||
for i := range tStats {
|
|
||||||
tStats[i] = &topicStats{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subscribers: attach evenly, validate ordering.
|
|
||||||
var subsWG sync.WaitGroup
|
|
||||||
sStats := make([][]*subStats, NumTopics) // [topic][sub]
|
|
||||||
for ti := 0; ti < NumTopics; ti++ {
|
|
||||||
sStats[ti] = make([]*subStats, SubsPerTopic)
|
|
||||||
for si := 0; si < SubsPerTopic; si++ {
|
|
||||||
sStats[ti][si] = &subStats{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), TestDuration)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
start := time.Now()
|
|
||||||
|
|
||||||
for ti := 0; ti < NumTopics; ti++ {
|
|
||||||
topic := topics[ti]
|
|
||||||
for si := 0; si < SubsPerTopic; si++ {
|
|
||||||
sub := broker.RegisterSubscriber(topic)
|
|
||||||
stats := sStats[ti][si]
|
|
||||||
subsWG.Add(1)
|
|
||||||
|
|
||||||
go func(topicIndex int, subIndex int, subscriber routing.Subscriber, st *subStats) {
|
|
||||||
defer subsWG.Done()
|
|
||||||
|
|
||||||
var expected uint64 = 0
|
|
||||||
|
|
||||||
for {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
env data.Envelope
|
|
||||||
ok bool
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
|
|
||||||
if UseTryReceive {
|
|
||||||
env, ok, err = subscriber.TryReceive()
|
|
||||||
if err != nil {
|
|
||||||
st.errors.Add(1)
|
|
||||||
cancel()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
runtime.Gosched()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
env, err = subscriber.Receive(ctx)
|
|
||||||
if err != nil {
|
|
||||||
// Context cancellation is normal at end of test.
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
st.errors.Add(1)
|
|
||||||
cancel()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
seq, parseOK := parseSeq(env)
|
|
||||||
if !parseOK {
|
|
||||||
st.errors.Add(1)
|
|
||||||
cancel()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if seq != expected {
|
|
||||||
// Out-of-order or gap detected.
|
|
||||||
st.errors.Add(1)
|
|
||||||
cancel()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
expected++
|
|
||||||
st.received.Add(1)
|
|
||||||
}
|
|
||||||
}(ti, si, sub, stats)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Publishers: bursty pacing to approximate “average rate with bursts”.
|
|
||||||
var pubsWG sync.WaitGroup
|
|
||||||
for ti := 0; ti < NumTopics; ti++ {
|
|
||||||
pub := pubs[ti]
|
|
||||||
stats := tStats[ti]
|
|
||||||
pubsWG.Add(1)
|
|
||||||
|
|
||||||
go func(topicIndex int, p routing.Publisher, st *topicStats) {
|
|
||||||
defer pubsWG.Done()
|
|
||||||
|
|
||||||
// Maintain AvgRatePerTopic as an average by sleeping after bursts.
|
|
||||||
// burstDuration = BurstSize / AvgRatePerTopic seconds
|
|
||||||
burstNs := int64(0)
|
|
||||||
if AvgRatePerTopic > 0 {
|
|
||||||
burstNs = int64(time.Second) * int64(BurstSize) / int64(AvgRatePerTopic)
|
|
||||||
}
|
|
||||||
if burstNs <= 0 {
|
|
||||||
burstNs = 1
|
|
||||||
}
|
|
||||||
|
|
||||||
var seq uint64 = 0
|
|
||||||
|
|
||||||
// Optional small jitter to avoid perfect lockstep across topics.
|
|
||||||
jitter := time.Duration(rand.Intn(200)) * time.Microsecond
|
|
||||||
|
|
||||||
nextBurstAt := time.Now().Add(jitter)
|
|
||||||
|
|
||||||
for {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
if now.Before(nextBurstAt) {
|
|
||||||
time.Sleep(nextBurstAt.Sub(now))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send BurstSize messages back-to-back.
|
|
||||||
sendTime := time.Now()
|
|
||||||
for i := 0; i < BurstSize; i++ {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
env := data.Envelope{
|
|
||||||
SendTime: sendTime,
|
|
||||||
Descriptor: data.Descriptor{Key: "SEQ"}, // keep your existing descriptor usage
|
|
||||||
Payload: formatSeq(seq),
|
|
||||||
// Any other fields you use can be set here.
|
|
||||||
}
|
|
||||||
|
|
||||||
p.Publish(env)
|
|
||||||
seq++
|
|
||||||
}
|
|
||||||
st.published.Add(uint64(BurstSize))
|
|
||||||
|
|
||||||
// Schedule next burst to maintain average rate.
|
|
||||||
nextBurstAt = nextBurstAt.Add(time.Duration(burstNs))
|
|
||||||
}
|
|
||||||
}(ti, pub, stats)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for timeout, then stop and drain.
|
|
||||||
<-ctx.Done()
|
|
||||||
|
|
||||||
// Ensure publishers exit.
|
|
||||||
pubsWG.Wait()
|
|
||||||
|
|
||||||
// Subscribers may still be blocked; cancel again and wait.
|
|
||||||
cancel()
|
|
||||||
subsWG.Wait()
|
|
||||||
|
|
||||||
totalTime := time.Since(start)
|
|
||||||
|
|
||||||
// Report.
|
|
||||||
var totalPublished uint64
|
|
||||||
var totalReceived uint64
|
|
||||||
var totalErrors uint64
|
|
||||||
|
|
||||||
for ti := 0; ti < NumTopics; ti++ {
|
|
||||||
pub := tStats[ti].published.Load()
|
|
||||||
totalPublished += pub
|
|
||||||
|
|
||||||
var topicRecv uint64
|
|
||||||
var topicErr uint64
|
|
||||||
for si := 0; si < SubsPerTopic; si++ {
|
|
||||||
topicRecv += sStats[ti][si].received.Load()
|
|
||||||
topicErr += sStats[ti][si].errors.Load()
|
|
||||||
}
|
|
||||||
totalReceived += topicRecv
|
|
||||||
totalErrors += topicErr
|
|
||||||
|
|
||||||
// Each subscriber should have received ~published for that topic.
|
|
||||||
avgPerSub := uint64(0)
|
|
||||||
if SubsPerTopic > 0 {
|
|
||||||
avgPerSub = topicRecv / uint64(SubsPerTopic)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("Topic %s: published=%d | avg_received_per_sub=%d | sub_errors=%d\n",
|
|
||||||
topics[ti], pub, avgPerSub, topicErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
pubRate := float64(totalPublished) / totalTime.Seconds()
|
|
||||||
deliveriesRate := float64(totalReceived) / totalTime.Seconds()
|
|
||||||
|
|
||||||
fmt.Printf("\nTotal Time: %v\n", totalTime)
|
|
||||||
fmt.Printf("Total Published: %d msgs\n", totalPublished)
|
|
||||||
fmt.Printf("Total Deliveries: %d (published * subs/topic, minus cancellations)\n", totalReceived)
|
|
||||||
fmt.Printf("Publish Rate: %.2f msgs/sec (aggregate)\n", pubRate)
|
|
||||||
fmt.Printf("Delivery Rate: %.2f deliveries/sec (aggregate)\n", deliveriesRate)
|
|
||||||
fmt.Printf("Validation Errors: %d\n", totalErrors)
|
|
||||||
|
|
||||||
if totalErrors == 0 {
|
|
||||||
fmt.Printf("Result: PASS (in-order, gap-free until cancellation)\n")
|
|
||||||
} else {
|
|
||||||
fmt.Printf("Result: FAIL (see errors above; test cancels on first detected issue)\n")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func modeName() string {
|
|
||||||
if UseTryReceive {
|
|
||||||
return "TryReceive (spin)"
|
|
||||||
}
|
|
||||||
return "Receive (blocking)"
|
|
||||||
}
|
|
||||||
|
|
||||||
// formatSeq encodes the per-topic sequence into a string payload.
|
|
||||||
// This compiles whether Envelope.Payload is string or interface{} accepting string.
|
|
||||||
func formatSeq(seq uint64) string {
|
|
||||||
// Keep it cheap to parse: decimal only.
|
|
||||||
return strconv.FormatUint(seq, 10)
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseSeq(env data.Envelope) (uint64, bool) {
|
|
||||||
// If you later switch Payload to a structured type, change this accordingly.
|
|
||||||
s, ok := env.Payload.(string)
|
|
||||||
if !ok {
|
|
||||||
// If Payload is defined as string (not interface{}), remove this type assert and just use env.Payload.
|
|
||||||
// This branch is for interface{} payloads where non-string could appear.
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fast path: no extra fields.
|
|
||||||
// If you later include pubID:seq, you can parse with strings.Cut.
|
|
||||||
if strings.IndexByte(s, ':') >= 0 {
|
|
||||||
_, right, ok := strings.Cut(s, ":")
|
|
||||||
if !ok {
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
s = right
|
|
||||||
}
|
|
||||||
|
|
||||||
v, err := strconv.ParseUint(s, 10, 64)
|
|
||||||
return v, err == nil
|
|
||||||
}
|
|
||||||
16
go.mod
16
go.mod
@@ -2,4 +2,18 @@ module gitlab.michelsen.id/phillmichelsen/tessera
|
|||||||
|
|
||||||
go 1.25.1
|
go 1.25.1
|
||||||
|
|
||||||
require github.com/google/uuid v1.6.0 // indirect
|
require (
|
||||||
|
github.com/google/uuid v1.6.0
|
||||||
|
github.com/lmittmann/tint v1.1.2
|
||||||
|
google.golang.org/grpc v1.75.0
|
||||||
|
google.golang.org/protobuf v1.36.8
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
|
||||||
|
golang.org/x/net v0.43.0 // indirect
|
||||||
|
golang.org/x/sys v0.35.0 // indirect
|
||||||
|
golang.org/x/text v0.28.0 // indirect
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1 // indirect
|
||||||
|
lukechampine.com/blake3 v1.4.1 // indirect
|
||||||
|
)
|
||||||
|
|||||||
40
go.sum
40
go.sum
@@ -1,2 +1,42 @@
|
|||||||
|
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
|
||||||
|
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||||
|
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||||
|
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||||
|
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||||
|
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||||
|
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||||
|
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
|
||||||
|
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||||
|
github.com/lmittmann/tint v1.1.2 h1:2CQzrL6rslrsyjqLDwD11bZ5OpLBPU+g3G/r5LSfS8w=
|
||||||
|
github.com/lmittmann/tint v1.1.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
|
||||||
|
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||||
|
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||||
|
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
|
||||||
|
go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
|
||||||
|
go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE=
|
||||||
|
go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E=
|
||||||
|
go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI=
|
||||||
|
go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg=
|
||||||
|
go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc=
|
||||||
|
go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps=
|
||||||
|
go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
|
||||||
|
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
|
||||||
|
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
|
||||||
|
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
|
||||||
|
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
|
||||||
|
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||||
|
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
|
||||||
|
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
|
||||||
|
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
|
||||||
|
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1 h1:pmJpJEvT846VzausCQ5d7KreSROcDqmO388w5YbnltA=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og=
|
||||||
|
google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4=
|
||||||
|
google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
|
||||||
|
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
|
||||||
|
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
|
||||||
|
lukechampine.com/blake3 v1.4.1 h1:I3Smz7gso8w4/TunLKec6K2fn+kyKtDxr/xcQEN84Wg=
|
||||||
|
lukechampine.com/blake3 v1.4.1/go.mod h1:QFosUxmjB8mnrWFSNwKmvxHpfY72bmD2tQ0kBMM3kwo=
|
||||||
|
|||||||
@@ -1,33 +0,0 @@
|
|||||||
package data
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
type Component interface {
|
|
||||||
Start(ctx context.Context) error
|
|
||||||
Stop(ctx context.Context) error
|
|
||||||
|
|
||||||
ValidateConfig(cfg ComponentConfig) (ComponentConfigValidation, error)
|
|
||||||
ApplyConfig(cfg ComponentConfig) error
|
|
||||||
|
|
||||||
Status() ComponentStatus
|
|
||||||
}
|
|
||||||
|
|
||||||
type ComponentRuntime interface {
|
|
||||||
OpenStream(id StreamID) (Stream, error)
|
|
||||||
ReportError(fatal bool, err error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type ComponentStatus struct {
|
|
||||||
Active bool
|
|
||||||
Patching bool
|
|
||||||
Config any
|
|
||||||
Error error
|
|
||||||
}
|
|
||||||
|
|
||||||
type ComponentConfig any
|
|
||||||
|
|
||||||
type ComponentConfigValidation struct {
|
|
||||||
Valid bool
|
|
||||||
RequiresRestart bool
|
|
||||||
Warnings []string
|
|
||||||
}
|
|
||||||
@@ -1,9 +0,0 @@
|
|||||||
package data
|
|
||||||
|
|
||||||
type Descriptor struct {
|
|
||||||
Type string
|
|
||||||
Key string
|
|
||||||
Schema string
|
|
||||||
Attributes map[string]string
|
|
||||||
Tags []string
|
|
||||||
}
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
// Package data ...
|
|
||||||
package data
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Envelope struct {
|
|
||||||
SendTime time.Time
|
|
||||||
|
|
||||||
Descriptor Descriptor
|
|
||||||
Payload any
|
|
||||||
}
|
|
||||||
@@ -1 +0,0 @@
|
|||||||
package data
|
|
||||||
@@ -1,12 +0,0 @@
|
|||||||
package events
|
|
||||||
|
|
||||||
import "time"
|
|
||||||
|
|
||||||
type Bar struct {
|
|
||||||
Open float64
|
|
||||||
High float64
|
|
||||||
Low float64
|
|
||||||
Close float64
|
|
||||||
Volume float64
|
|
||||||
Interval time.Duration
|
|
||||||
}
|
|
||||||
@@ -1,6 +0,0 @@
|
|||||||
package events
|
|
||||||
|
|
||||||
type Custom struct {
|
|
||||||
Bytes []byte
|
|
||||||
ContentType string
|
|
||||||
}
|
|
||||||
@@ -1,22 +0,0 @@
|
|||||||
// Package events ...
|
|
||||||
package events
|
|
||||||
|
|
||||||
type DataType uint8
|
|
||||||
|
|
||||||
const (
|
|
||||||
TradeType DataType = iota
|
|
||||||
QuoteType
|
|
||||||
BarType
|
|
||||||
MBPDeltaType
|
|
||||||
MBPSnapshotType
|
|
||||||
MBODeltaType
|
|
||||||
MBOSnapshotType
|
|
||||||
CustomType
|
|
||||||
)
|
|
||||||
|
|
||||||
type Side uint8
|
|
||||||
|
|
||||||
const (
|
|
||||||
Bid Side = iota
|
|
||||||
Ask
|
|
||||||
)
|
|
||||||
@@ -1,20 +0,0 @@
|
|||||||
package events
|
|
||||||
|
|
||||||
type MBODelta struct {
|
|
||||||
Operation MBOOrderOp
|
|
||||||
OrderID string
|
|
||||||
Side Side
|
|
||||||
Price float64
|
|
||||||
Size float64
|
|
||||||
IsMaker bool
|
|
||||||
Seq uint64
|
|
||||||
ParentID string
|
|
||||||
}
|
|
||||||
|
|
||||||
type MBOOrderOp uint8
|
|
||||||
|
|
||||||
const (
|
|
||||||
OrderAdd MBOOrderOp = iota
|
|
||||||
OrderMod
|
|
||||||
OrderDel
|
|
||||||
)
|
|
||||||
@@ -1,14 +0,0 @@
|
|||||||
package events
|
|
||||||
|
|
||||||
type MBOSnapshot struct {
|
|
||||||
Orders []OrderEntry
|
|
||||||
Seq uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
type OrderEntry struct {
|
|
||||||
OrderID string
|
|
||||||
Side Side
|
|
||||||
Price float64
|
|
||||||
Size float64
|
|
||||||
IsMaker bool
|
|
||||||
}
|
|
||||||
@@ -1,8 +0,0 @@
|
|||||||
package events
|
|
||||||
|
|
||||||
type MBPDelta struct {
|
|
||||||
Side Side
|
|
||||||
Price float64
|
|
||||||
Size float64
|
|
||||||
Seq uint64
|
|
||||||
}
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
package events
|
|
||||||
|
|
||||||
type MBPSnapshot struct {
|
|
||||||
Bids []PriceLevel
|
|
||||||
Asks []PriceLevel
|
|
||||||
Depth int
|
|
||||||
Seq uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
type PriceLevel struct {
|
|
||||||
Price float64
|
|
||||||
Size float64
|
|
||||||
}
|
|
||||||
@@ -1,8 +0,0 @@
|
|||||||
package events
|
|
||||||
|
|
||||||
type Quote struct {
|
|
||||||
BidPrice float64
|
|
||||||
BidSize float64
|
|
||||||
AskPrice float64
|
|
||||||
AskSize float64
|
|
||||||
}
|
|
||||||
@@ -1,16 +0,0 @@
|
|||||||
package events
|
|
||||||
|
|
||||||
type Trade struct {
|
|
||||||
Price float64
|
|
||||||
Qty float64
|
|
||||||
Aggressor AggressorSide
|
|
||||||
TradeID string
|
|
||||||
}
|
|
||||||
|
|
||||||
type AggressorSide uint8
|
|
||||||
|
|
||||||
const (
|
|
||||||
AggUnknown AggressorSide = iota
|
|
||||||
AggBuy
|
|
||||||
AggSell
|
|
||||||
)
|
|
||||||
@@ -1,56 +0,0 @@
|
|||||||
package routing
|
|
||||||
|
|
||||||
import "sync"
|
|
||||||
|
|
||||||
const DefaultRingCapacity = 1 << 8 // Best if power of 2 (or so I am told)
|
|
||||||
|
|
||||||
// Broker manages topics and issues publisher/subscriber handles.
|
|
||||||
type Broker struct {
|
|
||||||
mu sync.RWMutex
|
|
||||||
topics map[string]*TopicRing
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewBroker() *Broker {
|
|
||||||
return &Broker{
|
|
||||||
topics: make(map[string]*TopicRing),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// getOrCreateRing handles the race condition where a subscriber might attach
|
|
||||||
// to a topic before the publisher has created it, and vice versa.
|
|
||||||
// This is because we allow either a publisher or a subscriber to 'create' the topic
|
|
||||||
func (b *Broker) getOrCreateRing(topicKey string, requestedCap int) *TopicRing {
|
|
||||||
b.mu.Lock()
|
|
||||||
defer b.mu.Unlock()
|
|
||||||
|
|
||||||
ring, exists := b.topics[topicKey]
|
|
||||||
if !exists {
|
|
||||||
cap := requestedCap
|
|
||||||
if cap <= 0 {
|
|
||||||
cap = DefaultRingCapacity
|
|
||||||
}
|
|
||||||
ring = newTopicRing(cap)
|
|
||||||
b.topics[topicKey] = ring
|
|
||||||
}
|
|
||||||
return ring
|
|
||||||
}
|
|
||||||
|
|
||||||
// RegisterPublisher returns a fast-path Publisher.
|
|
||||||
func (b *Broker) RegisterPublisher(topicKey string, capacity int) Publisher {
|
|
||||||
ring := b.getOrCreateRing(topicKey, capacity)
|
|
||||||
return &ringPublisher{
|
|
||||||
ring: ring,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// RegisterSubscriber attaches a consumer to a topic and returns a fast-path Subscriber.
|
|
||||||
// We don't allow subscribrs to specify a buffer capacity size.
|
|
||||||
// As a general rule, a publisher takes precedence over a subscriber
|
|
||||||
func (b *Broker) RegisterSubscriber(topicKey string) Subscriber {
|
|
||||||
ring := b.getOrCreateRing(topicKey, 0)
|
|
||||||
consumer := ring.addConsumer()
|
|
||||||
return &ringSubscriber{
|
|
||||||
ring: ring,
|
|
||||||
consumer: consumer,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
package routing
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Publisher is the write-side handle given to data sources.
|
|
||||||
type Publisher interface {
|
|
||||||
Publish(env data.Envelope)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subscriber is the read-side handle given to consumers (data sinks).
|
|
||||||
type Subscriber interface {
|
|
||||||
// Receive blocks until a message is available or the context cancels.
|
|
||||||
// Best for general low-latency consumers that shouldn't burn CPU.
|
|
||||||
// Typically more than enough for most situations
|
|
||||||
Receive(ctx context.Context) (data.Envelope, error)
|
|
||||||
|
|
||||||
// TryReceive attempts to read one message lock-free.
|
|
||||||
// Returns (envelope, true, nil) if successful, or false if nothing is available.
|
|
||||||
// Polling TryReceive without a wait will most likely spike the CPU
|
|
||||||
TryReceive() (data.Envelope, bool, error)
|
|
||||||
}
|
|
||||||
@@ -1,201 +0,0 @@
|
|||||||
// router.go
|
|
||||||
package routing
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
|
|
||||||
)
|
|
||||||
|
|
||||||
var ErrDisconnected = errors.New("subscriber disconnected: failed to consume fast enough")
|
|
||||||
|
|
||||||
// IMPLEMENTATIONS
|
|
||||||
|
|
||||||
// Implements the Publisher interface
|
|
||||||
type ringPublisher struct {
|
|
||||||
ring *TopicRing
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *ringPublisher) Publish(env data.Envelope) {
|
|
||||||
p.ring.publish(env)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements the Subscriber interface
|
|
||||||
type ringSubscriber struct {
|
|
||||||
ring *TopicRing
|
|
||||||
consumer *ringConsumer
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ringSubscriber) Receive(ctx context.Context) (data.Envelope, error) {
|
|
||||||
return s.ring.receive(ctx, s.consumer)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ringSubscriber) TryReceive() (data.Envelope, bool, error) {
|
|
||||||
return s.ring.tryReceive(s.consumer)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ringConsumer represents a single subscriber's read state within a TopicRing
|
|
||||||
// The 56-byte pads are added to prevent 'False Sharing' due to 64-byte cache sizes
|
|
||||||
type ringConsumer struct {
|
|
||||||
ID uint64 // monotonically increasing identifier
|
|
||||||
_ [56]byte
|
|
||||||
|
|
||||||
Cursor atomic.Uint64 // next expected sequence number, advanced monotonically
|
|
||||||
_ [56]byte
|
|
||||||
|
|
||||||
Dead atomic.Bool // set true if the consumer has fallen behind ring capacity, consumer should be disconnected
|
|
||||||
_ [56]byte
|
|
||||||
|
|
||||||
notify chan struct{} // size-1 wakeup channel for subscribers to block whilst waiting for new data
|
|
||||||
}
|
|
||||||
|
|
||||||
// TopicRing is a broadcast ring buffer for a topic
|
|
||||||
// It is designed to be minimize locks, same 56-byte pads used here as well
|
|
||||||
// The publisher appends sequentially whilst each subscriber maintains its own cursor (ringConsumer)
|
|
||||||
// We typically aim for a capacity that is power-of-two sized for reasons beyond my knowledge
|
|
||||||
type TopicRing struct {
|
|
||||||
Capacity uint64
|
|
||||||
Mask uint64
|
|
||||||
Ring []data.Envelope
|
|
||||||
|
|
||||||
_ [56]byte
|
|
||||||
writeTail atomic.Uint64
|
|
||||||
|
|
||||||
_ [56]byte
|
|
||||||
cachedMinConsumer uint64
|
|
||||||
|
|
||||||
consumers atomic.Pointer[[]*ringConsumer] // Copy-on-Write slice
|
|
||||||
mu sync.Mutex
|
|
||||||
nextSubID uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
// newTopicRing creates a TopicRing
|
|
||||||
// The capacity should be specified as a power-of-two (as the N in 2^N)
|
|
||||||
func newTopicRing(pow2 int) *TopicRing {
|
|
||||||
cap := uint64(1)
|
|
||||||
for cap < uint64(pow2) {
|
|
||||||
cap <<= 1
|
|
||||||
}
|
|
||||||
t := &TopicRing{
|
|
||||||
Capacity: cap,
|
|
||||||
Mask: cap - 1,
|
|
||||||
Ring: make([]data.Envelope, cap),
|
|
||||||
}
|
|
||||||
|
|
||||||
empty := make([]*ringConsumer, 0)
|
|
||||||
t.consumers.Store(&empty)
|
|
||||||
return t
|
|
||||||
}
|
|
||||||
|
|
||||||
// addConsumer registers a new subscriber on the ring
|
|
||||||
// The consumer starts at the current write tail
|
|
||||||
func (t *TopicRing) addConsumer() *ringConsumer {
|
|
||||||
t.mu.Lock()
|
|
||||||
defer t.mu.Unlock()
|
|
||||||
|
|
||||||
t.nextSubID++
|
|
||||||
c := &ringConsumer{
|
|
||||||
ID: t.nextSubID,
|
|
||||||
notify: make(chan struct{}, 1),
|
|
||||||
}
|
|
||||||
// Start at the current write tail so we don't read historical data
|
|
||||||
c.Cursor.Store(t.writeTail.Load())
|
|
||||||
|
|
||||||
// Copy-on-write update
|
|
||||||
old := *t.consumers.Load()
|
|
||||||
newSubs := make([]*ringConsumer, len(old), len(old)+1)
|
|
||||||
copy(newSubs, old)
|
|
||||||
newSubs = append(newSubs, c)
|
|
||||||
t.consumers.Store(&newSubs)
|
|
||||||
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
// publish appends one message to the ring and notifies subscribers (with the 'notify' channel)
|
|
||||||
// Assumes a single publisher per topic
|
|
||||||
func (t *TopicRing) publish(env data.Envelope) {
|
|
||||||
seq := t.writeTail.Load() // we expect only one publisher per topic
|
|
||||||
// in the case we do want more than one publisher, switch to using atomic.AddUint64
|
|
||||||
|
|
||||||
if seq-t.cachedMinConsumer >= t.Capacity {
|
|
||||||
t.enforceCapacity(seq)
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Ring[seq&t.Mask] = env
|
|
||||||
|
|
||||||
t.writeTail.Store(seq + 1)
|
|
||||||
|
|
||||||
subs := *t.consumers.Load()
|
|
||||||
for _, c := range subs {
|
|
||||||
select {
|
|
||||||
case c.notify <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// enforceCapacity 'evicts' consumers that have fallen beyond the ring capacity
|
|
||||||
func (t *TopicRing) enforceCapacity(targetSeq uint64) {
|
|
||||||
subs := *t.consumers.Load()
|
|
||||||
newMin := targetSeq
|
|
||||||
|
|
||||||
for _, c := range subs {
|
|
||||||
if c.Dead.Load() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
cCursor := c.Cursor.Load()
|
|
||||||
if targetSeq-cCursor >= t.Capacity {
|
|
||||||
c.Dead.Store(true) // Evict slow consumer
|
|
||||||
} else if cCursor < newMin {
|
|
||||||
newMin = cCursor
|
|
||||||
}
|
|
||||||
}
|
|
||||||
t.cachedMinConsumer = newMin
|
|
||||||
}
|
|
||||||
|
|
||||||
// receive blocks until a new message is available, the consumer is evicted, or the context is cancelled
|
|
||||||
// Ordering is preserved per consumer (naturally)
|
|
||||||
func (t *TopicRing) receive(ctx context.Context, c *ringConsumer) (data.Envelope, error) {
|
|
||||||
for {
|
|
||||||
if c.Dead.Load() {
|
|
||||||
return data.Envelope{}, ErrDisconnected
|
|
||||||
}
|
|
||||||
|
|
||||||
currentCursor := c.Cursor.Load()
|
|
||||||
availableTail := t.writeTail.Load()
|
|
||||||
|
|
||||||
if currentCursor < availableTail {
|
|
||||||
env := t.Ring[currentCursor&t.Mask]
|
|
||||||
c.Cursor.Store(currentCursor + 1)
|
|
||||||
return env, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return data.Envelope{}, ctx.Err()
|
|
||||||
case <-c.notify:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// tryReceive is a non-blocking variant of receive
|
|
||||||
// Returns immediately if no new data is available
|
|
||||||
func (t *TopicRing) tryReceive(c *ringConsumer) (data.Envelope, bool, error) {
|
|
||||||
if c.Dead.Load() {
|
|
||||||
return data.Envelope{}, false, ErrDisconnected
|
|
||||||
}
|
|
||||||
|
|
||||||
currentCursor := c.Cursor.Load()
|
|
||||||
availableTail := t.writeTail.Load()
|
|
||||||
|
|
||||||
if currentCursor >= availableTail {
|
|
||||||
return data.Envelope{}, false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
env := t.Ring[currentCursor&t.Mask]
|
|
||||||
c.Cursor.Store(currentCursor + 1)
|
|
||||||
return env, true, nil
|
|
||||||
}
|
|
||||||
@@ -1,27 +0,0 @@
|
|||||||
package data
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
)
|
|
||||||
|
|
||||||
type StreamID uuid.UUID
|
|
||||||
|
|
||||||
type Stream interface {
|
|
||||||
ID() StreamID
|
|
||||||
|
|
||||||
Sender() Sender
|
|
||||||
Receiver() Receiver
|
|
||||||
}
|
|
||||||
|
|
||||||
type Sender interface {
|
|
||||||
Send(context.Context, Envelope) error
|
|
||||||
SendBatch(context.Context, []Envelope) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type Receiver interface {
|
|
||||||
TryReceive() (Envelope, bool, error)
|
|
||||||
ReceiveNext(context.Context) (Envelope, error)
|
|
||||||
Seq() uint64
|
|
||||||
}
|
|
||||||
523
pkg/pb/data_service/data_service.pb.go
Normal file
523
pkg/pb/data_service/data_service.pb.go
Normal file
@@ -0,0 +1,523 @@
|
|||||||
|
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||||
|
// versions:
|
||||||
|
// protoc-gen-go v1.36.6
|
||||||
|
// protoc v6.32.0
|
||||||
|
// source: pkg/pb/data_service/data_service.proto
|
||||||
|
|
||||||
|
package data_service
|
||||||
|
|
||||||
|
import (
|
||||||
|
reflect "reflect"
|
||||||
|
sync "sync"
|
||||||
|
unsafe "unsafe"
|
||||||
|
|
||||||
|
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||||
|
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Verify that this generated code is sufficiently up-to-date.
|
||||||
|
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
|
||||||
|
// Verify that runtime/protoimpl is sufficiently up-to-date.
|
||||||
|
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
|
||||||
|
)
|
||||||
|
|
||||||
|
type Identifier struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Identifier) Reset() {
|
||||||
|
*x = Identifier{}
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[0]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Identifier) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*Identifier) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *Identifier) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[0]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use Identifier.ProtoReflect.Descriptor instead.
|
||||||
|
func (*Identifier) Descriptor() ([]byte, []int) {
|
||||||
|
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{0}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Identifier) GetKey() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.Key
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
type Message struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
Identifier *Identifier `protobuf:"bytes,1,opt,name=identifier,proto3" json:"identifier,omitempty"`
|
||||||
|
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
|
||||||
|
Encoding string `protobuf:"bytes,3,opt,name=encoding,proto3" json:"encoding,omitempty"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Message) Reset() {
|
||||||
|
*x = Message{}
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[1]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Message) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*Message) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *Message) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[1]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use Message.ProtoReflect.Descriptor instead.
|
||||||
|
func (*Message) Descriptor() ([]byte, []int) {
|
||||||
|
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{1}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Message) GetIdentifier() *Identifier {
|
||||||
|
if x != nil {
|
||||||
|
return x.Identifier
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Message) GetPayload() []byte {
|
||||||
|
if x != nil {
|
||||||
|
return x.Payload
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Message) GetEncoding() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.Encoding
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
type StartStreamRequest struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *StartStreamRequest) Reset() {
|
||||||
|
*x = StartStreamRequest{}
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[2]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *StartStreamRequest) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*StartStreamRequest) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *StartStreamRequest) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[2]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use StartStreamRequest.ProtoReflect.Descriptor instead.
|
||||||
|
func (*StartStreamRequest) Descriptor() ([]byte, []int) {
|
||||||
|
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{2}
|
||||||
|
}
|
||||||
|
|
||||||
|
type StartStreamResponse struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *StartStreamResponse) Reset() {
|
||||||
|
*x = StartStreamResponse{}
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[3]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *StartStreamResponse) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*StartStreamResponse) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *StartStreamResponse) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[3]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use StartStreamResponse.ProtoReflect.Descriptor instead.
|
||||||
|
func (*StartStreamResponse) Descriptor() ([]byte, []int) {
|
||||||
|
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{3}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *StartStreamResponse) GetStreamUuid() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.StreamUuid
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConfigureStreamRequest struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"`
|
||||||
|
Identifiers []*Identifier `protobuf:"bytes,2,rep,name=identifiers,proto3" json:"identifiers,omitempty"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *ConfigureStreamRequest) Reset() {
|
||||||
|
*x = ConfigureStreamRequest{}
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[4]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *ConfigureStreamRequest) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*ConfigureStreamRequest) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *ConfigureStreamRequest) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[4]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use ConfigureStreamRequest.ProtoReflect.Descriptor instead.
|
||||||
|
func (*ConfigureStreamRequest) Descriptor() ([]byte, []int) {
|
||||||
|
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{4}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *ConfigureStreamRequest) GetStreamUuid() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.StreamUuid
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *ConfigureStreamRequest) GetIdentifiers() []*Identifier {
|
||||||
|
if x != nil {
|
||||||
|
return x.Identifiers
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConfigureStreamResponse struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *ConfigureStreamResponse) Reset() {
|
||||||
|
*x = ConfigureStreamResponse{}
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[5]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *ConfigureStreamResponse) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*ConfigureStreamResponse) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *ConfigureStreamResponse) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[5]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use ConfigureStreamResponse.ProtoReflect.Descriptor instead.
|
||||||
|
func (*ConfigureStreamResponse) Descriptor() ([]byte, []int) {
|
||||||
|
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{5}
|
||||||
|
}
|
||||||
|
|
||||||
|
type StopStreamRequest struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *StopStreamRequest) Reset() {
|
||||||
|
*x = StopStreamRequest{}
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[6]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *StopStreamRequest) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*StopStreamRequest) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *StopStreamRequest) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[6]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use StopStreamRequest.ProtoReflect.Descriptor instead.
|
||||||
|
func (*StopStreamRequest) Descriptor() ([]byte, []int) {
|
||||||
|
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{6}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *StopStreamRequest) GetStreamUuid() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.StreamUuid
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
type StopStreamResponse struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *StopStreamResponse) Reset() {
|
||||||
|
*x = StopStreamResponse{}
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[7]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *StopStreamResponse) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*StopStreamResponse) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *StopStreamResponse) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[7]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use StopStreamResponse.ProtoReflect.Descriptor instead.
|
||||||
|
func (*StopStreamResponse) Descriptor() ([]byte, []int) {
|
||||||
|
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{7}
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConnectStreamRequest struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
StreamUuid string `protobuf:"bytes,1,opt,name=stream_uuid,json=streamUuid,proto3" json:"stream_uuid,omitempty"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *ConnectStreamRequest) Reset() {
|
||||||
|
*x = ConnectStreamRequest{}
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[8]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *ConnectStreamRequest) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*ConnectStreamRequest) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *ConnectStreamRequest) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_pkg_pb_data_service_data_service_proto_msgTypes[8]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use ConnectStreamRequest.ProtoReflect.Descriptor instead.
|
||||||
|
func (*ConnectStreamRequest) Descriptor() ([]byte, []int) {
|
||||||
|
return file_pkg_pb_data_service_data_service_proto_rawDescGZIP(), []int{8}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *ConnectStreamRequest) GetStreamUuid() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.StreamUuid
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
var File_pkg_pb_data_service_data_service_proto protoreflect.FileDescriptor
|
||||||
|
|
||||||
|
const file_pkg_pb_data_service_data_service_proto_rawDesc = "" +
|
||||||
|
"\n" +
|
||||||
|
"&pkg/pb/data_service/data_service.proto\x12\fdata_service\"\x1e\n" +
|
||||||
|
"\n" +
|
||||||
|
"Identifier\x12\x10\n" +
|
||||||
|
"\x03key\x18\x01 \x01(\tR\x03key\"y\n" +
|
||||||
|
"\aMessage\x128\n" +
|
||||||
|
"\n" +
|
||||||
|
"identifier\x18\x01 \x01(\v2\x18.data_service.IdentifierR\n" +
|
||||||
|
"identifier\x12\x18\n" +
|
||||||
|
"\apayload\x18\x02 \x01(\fR\apayload\x12\x1a\n" +
|
||||||
|
"\bencoding\x18\x03 \x01(\tR\bencoding\"\x14\n" +
|
||||||
|
"\x12StartStreamRequest\"6\n" +
|
||||||
|
"\x13StartStreamResponse\x12\x1f\n" +
|
||||||
|
"\vstream_uuid\x18\x01 \x01(\tR\n" +
|
||||||
|
"streamUuid\"u\n" +
|
||||||
|
"\x16ConfigureStreamRequest\x12\x1f\n" +
|
||||||
|
"\vstream_uuid\x18\x01 \x01(\tR\n" +
|
||||||
|
"streamUuid\x12:\n" +
|
||||||
|
"\videntifiers\x18\x02 \x03(\v2\x18.data_service.IdentifierR\videntifiers\"\x19\n" +
|
||||||
|
"\x17ConfigureStreamResponse\"4\n" +
|
||||||
|
"\x11StopStreamRequest\x12\x1f\n" +
|
||||||
|
"\vstream_uuid\x18\x01 \x01(\tR\n" +
|
||||||
|
"streamUuid\"\x14\n" +
|
||||||
|
"\x12StopStreamResponse\"7\n" +
|
||||||
|
"\x14ConnectStreamRequest\x12\x1f\n" +
|
||||||
|
"\vstream_uuid\x18\x01 \x01(\tR\n" +
|
||||||
|
"streamUuid2\x99\x02\n" +
|
||||||
|
"\x12DataServiceControl\x12R\n" +
|
||||||
|
"\vStartStream\x12 .data_service.StartStreamRequest\x1a!.data_service.StartStreamResponse\x12O\n" +
|
||||||
|
"\n" +
|
||||||
|
"StopStream\x12\x1f.data_service.StopStreamRequest\x1a .data_service.StopStreamResponse\x12^\n" +
|
||||||
|
"\x0fConfigureStream\x12$.data_service.ConfigureStreamRequest\x1a%.data_service.ConfigureStreamResponse2d\n" +
|
||||||
|
"\x14DataServiceStreaming\x12L\n" +
|
||||||
|
"\rConnectStream\x12\".data_service.ConnectStreamRequest\x1a\x15.data_service.Message0\x01BMZKgitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service;data_serviceb\x06proto3"
|
||||||
|
|
||||||
|
var (
|
||||||
|
file_pkg_pb_data_service_data_service_proto_rawDescOnce sync.Once
|
||||||
|
file_pkg_pb_data_service_data_service_proto_rawDescData []byte
|
||||||
|
)
|
||||||
|
|
||||||
|
func file_pkg_pb_data_service_data_service_proto_rawDescGZIP() []byte {
|
||||||
|
file_pkg_pb_data_service_data_service_proto_rawDescOnce.Do(func() {
|
||||||
|
file_pkg_pb_data_service_data_service_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_pkg_pb_data_service_data_service_proto_rawDesc), len(file_pkg_pb_data_service_data_service_proto_rawDesc)))
|
||||||
|
})
|
||||||
|
return file_pkg_pb_data_service_data_service_proto_rawDescData
|
||||||
|
}
|
||||||
|
|
||||||
|
var file_pkg_pb_data_service_data_service_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
|
||||||
|
var file_pkg_pb_data_service_data_service_proto_goTypes = []any{
|
||||||
|
(*Identifier)(nil), // 0: data_service.Identifier
|
||||||
|
(*Message)(nil), // 1: data_service.Message
|
||||||
|
(*StartStreamRequest)(nil), // 2: data_service.StartStreamRequest
|
||||||
|
(*StartStreamResponse)(nil), // 3: data_service.StartStreamResponse
|
||||||
|
(*ConfigureStreamRequest)(nil), // 4: data_service.ConfigureStreamRequest
|
||||||
|
(*ConfigureStreamResponse)(nil), // 5: data_service.ConfigureStreamResponse
|
||||||
|
(*StopStreamRequest)(nil), // 6: data_service.StopStreamRequest
|
||||||
|
(*StopStreamResponse)(nil), // 7: data_service.StopStreamResponse
|
||||||
|
(*ConnectStreamRequest)(nil), // 8: data_service.ConnectStreamRequest
|
||||||
|
}
|
||||||
|
var file_pkg_pb_data_service_data_service_proto_depIdxs = []int32{
|
||||||
|
0, // 0: data_service.Message.identifier:type_name -> data_service.Identifier
|
||||||
|
0, // 1: data_service.ConfigureStreamRequest.identifiers:type_name -> data_service.Identifier
|
||||||
|
2, // 2: data_service.DataServiceControl.StartStream:input_type -> data_service.StartStreamRequest
|
||||||
|
6, // 3: data_service.DataServiceControl.StopStream:input_type -> data_service.StopStreamRequest
|
||||||
|
4, // 4: data_service.DataServiceControl.ConfigureStream:input_type -> data_service.ConfigureStreamRequest
|
||||||
|
8, // 5: data_service.DataServiceStreaming.ConnectStream:input_type -> data_service.ConnectStreamRequest
|
||||||
|
3, // 6: data_service.DataServiceControl.StartStream:output_type -> data_service.StartStreamResponse
|
||||||
|
7, // 7: data_service.DataServiceControl.StopStream:output_type -> data_service.StopStreamResponse
|
||||||
|
5, // 8: data_service.DataServiceControl.ConfigureStream:output_type -> data_service.ConfigureStreamResponse
|
||||||
|
1, // 9: data_service.DataServiceStreaming.ConnectStream:output_type -> data_service.Message
|
||||||
|
6, // [6:10] is the sub-list for method output_type
|
||||||
|
2, // [2:6] is the sub-list for method input_type
|
||||||
|
2, // [2:2] is the sub-list for extension type_name
|
||||||
|
2, // [2:2] is the sub-list for extension extendee
|
||||||
|
0, // [0:2] is the sub-list for field type_name
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() { file_pkg_pb_data_service_data_service_proto_init() }
|
||||||
|
func file_pkg_pb_data_service_data_service_proto_init() {
|
||||||
|
if File_pkg_pb_data_service_data_service_proto != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
type x struct{}
|
||||||
|
out := protoimpl.TypeBuilder{
|
||||||
|
File: protoimpl.DescBuilder{
|
||||||
|
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||||
|
RawDescriptor: unsafe.Slice(unsafe.StringData(file_pkg_pb_data_service_data_service_proto_rawDesc), len(file_pkg_pb_data_service_data_service_proto_rawDesc)),
|
||||||
|
NumEnums: 0,
|
||||||
|
NumMessages: 9,
|
||||||
|
NumExtensions: 0,
|
||||||
|
NumServices: 2,
|
||||||
|
},
|
||||||
|
GoTypes: file_pkg_pb_data_service_data_service_proto_goTypes,
|
||||||
|
DependencyIndexes: file_pkg_pb_data_service_data_service_proto_depIdxs,
|
||||||
|
MessageInfos: file_pkg_pb_data_service_data_service_proto_msgTypes,
|
||||||
|
}.Build()
|
||||||
|
File_pkg_pb_data_service_data_service_proto = out.File
|
||||||
|
file_pkg_pb_data_service_data_service_proto_goTypes = nil
|
||||||
|
file_pkg_pb_data_service_data_service_proto_depIdxs = nil
|
||||||
|
}
|
||||||
43
pkg/pb/data_service/data_service.proto
Normal file
43
pkg/pb/data_service/data_service.proto
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package data_service;
|
||||||
|
|
||||||
|
option go_package = "gitlab.michelsen.id/phillmichelsen/tessera/pkg/pb/data_service;data_service";
|
||||||
|
|
||||||
|
service DataServiceControl {
|
||||||
|
rpc StartStream(StartStreamRequest) returns (StartStreamResponse);
|
||||||
|
rpc StopStream(StopStreamRequest) returns (StopStreamResponse);
|
||||||
|
rpc ConfigureStream(ConfigureStreamRequest) returns (ConfigureStreamResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
service DataServiceStreaming {
|
||||||
|
rpc ConnectStream(ConnectStreamRequest) returns (stream Message);
|
||||||
|
}
|
||||||
|
|
||||||
|
message Identifier {
|
||||||
|
string key = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message Pattern {
|
||||||
|
string key = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message Message {
|
||||||
|
Identifier identifier = 1;
|
||||||
|
bytes payload = 2;
|
||||||
|
string encoding = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message StartStreamRequest {}
|
||||||
|
message StartStreamResponse { string stream_uuid = 1; }
|
||||||
|
|
||||||
|
message ConfigureStreamRequest {
|
||||||
|
string stream_uuid = 1;
|
||||||
|
repeated Pattern patterns = 2;
|
||||||
|
}
|
||||||
|
message ConfigureStreamResponse {}
|
||||||
|
|
||||||
|
message StopStreamRequest { string stream_uuid = 1; }
|
||||||
|
message StopStreamResponse {}
|
||||||
|
|
||||||
|
message ConnectStreamRequest { string stream_uuid = 1; }
|
||||||
303
pkg/pb/data_service/data_service_grpc.pb.go
Normal file
303
pkg/pb/data_service/data_service_grpc.pb.go
Normal file
@@ -0,0 +1,303 @@
|
|||||||
|
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||||
|
// versions:
|
||||||
|
// - protoc-gen-go-grpc v1.5.1
|
||||||
|
// - protoc v6.32.0
|
||||||
|
// source: pkg/pb/data_service/data_service.proto
|
||||||
|
|
||||||
|
package data_service
|
||||||
|
|
||||||
|
import (
|
||||||
|
context "context"
|
||||||
|
|
||||||
|
grpc "google.golang.org/grpc"
|
||||||
|
codes "google.golang.org/grpc/codes"
|
||||||
|
status "google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This is a compile-time assertion to ensure that this generated file
|
||||||
|
// is compatible with the grpc package it is being compiled against.
|
||||||
|
// Requires gRPC-Go v1.64.0 or later.
|
||||||
|
const _ = grpc.SupportPackageIsVersion9
|
||||||
|
|
||||||
|
const (
|
||||||
|
DataServiceControl_StartStream_FullMethodName = "/data_service.DataServiceControl/StartStream"
|
||||||
|
DataServiceControl_StopStream_FullMethodName = "/data_service.DataServiceControl/StopStream"
|
||||||
|
DataServiceControl_ConfigureStream_FullMethodName = "/data_service.DataServiceControl/ConfigureStream"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DataServiceControlClient is the client API for DataServiceControl service.
|
||||||
|
//
|
||||||
|
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||||
|
type DataServiceControlClient interface {
|
||||||
|
StartStream(ctx context.Context, in *StartStreamRequest, opts ...grpc.CallOption) (*StartStreamResponse, error)
|
||||||
|
StopStream(ctx context.Context, in *StopStreamRequest, opts ...grpc.CallOption) (*StopStreamResponse, error)
|
||||||
|
ConfigureStream(ctx context.Context, in *ConfigureStreamRequest, opts ...grpc.CallOption) (*ConfigureStreamResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type dataServiceControlClient struct {
|
||||||
|
cc grpc.ClientConnInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDataServiceControlClient(cc grpc.ClientConnInterface) DataServiceControlClient {
|
||||||
|
return &dataServiceControlClient{cc}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dataServiceControlClient) StartStream(ctx context.Context, in *StartStreamRequest, opts ...grpc.CallOption) (*StartStreamResponse, error) {
|
||||||
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
|
out := new(StartStreamResponse)
|
||||||
|
err := c.cc.Invoke(ctx, DataServiceControl_StartStream_FullMethodName, in, out, cOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dataServiceControlClient) StopStream(ctx context.Context, in *StopStreamRequest, opts ...grpc.CallOption) (*StopStreamResponse, error) {
|
||||||
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
|
out := new(StopStreamResponse)
|
||||||
|
err := c.cc.Invoke(ctx, DataServiceControl_StopStream_FullMethodName, in, out, cOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dataServiceControlClient) ConfigureStream(ctx context.Context, in *ConfigureStreamRequest, opts ...grpc.CallOption) (*ConfigureStreamResponse, error) {
|
||||||
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
|
out := new(ConfigureStreamResponse)
|
||||||
|
err := c.cc.Invoke(ctx, DataServiceControl_ConfigureStream_FullMethodName, in, out, cOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DataServiceControlServer is the server API for DataServiceControl service.
|
||||||
|
// All implementations must embed UnimplementedDataServiceControlServer
|
||||||
|
// for forward compatibility.
|
||||||
|
type DataServiceControlServer interface {
|
||||||
|
StartStream(context.Context, *StartStreamRequest) (*StartStreamResponse, error)
|
||||||
|
StopStream(context.Context, *StopStreamRequest) (*StopStreamResponse, error)
|
||||||
|
ConfigureStream(context.Context, *ConfigureStreamRequest) (*ConfigureStreamResponse, error)
|
||||||
|
mustEmbedUnimplementedDataServiceControlServer()
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnimplementedDataServiceControlServer must be embedded to have
|
||||||
|
// forward compatible implementations.
|
||||||
|
//
|
||||||
|
// NOTE: this should be embedded by value instead of pointer to avoid a nil
|
||||||
|
// pointer dereference when methods are called.
|
||||||
|
type UnimplementedDataServiceControlServer struct{}
|
||||||
|
|
||||||
|
func (UnimplementedDataServiceControlServer) StartStream(context.Context, *StartStreamRequest) (*StartStreamResponse, error) {
|
||||||
|
return nil, status.Errorf(codes.Unimplemented, "method StartStream not implemented")
|
||||||
|
}
|
||||||
|
func (UnimplementedDataServiceControlServer) StopStream(context.Context, *StopStreamRequest) (*StopStreamResponse, error) {
|
||||||
|
return nil, status.Errorf(codes.Unimplemented, "method StopStream not implemented")
|
||||||
|
}
|
||||||
|
func (UnimplementedDataServiceControlServer) ConfigureStream(context.Context, *ConfigureStreamRequest) (*ConfigureStreamResponse, error) {
|
||||||
|
return nil, status.Errorf(codes.Unimplemented, "method ConfigureStream not implemented")
|
||||||
|
}
|
||||||
|
func (UnimplementedDataServiceControlServer) mustEmbedUnimplementedDataServiceControlServer() {}
|
||||||
|
func (UnimplementedDataServiceControlServer) testEmbeddedByValue() {}
|
||||||
|
|
||||||
|
// UnsafeDataServiceControlServer may be embedded to opt out of forward compatibility for this service.
|
||||||
|
// Use of this interface is not recommended, as added methods to DataServiceControlServer will
|
||||||
|
// result in compilation errors.
|
||||||
|
type UnsafeDataServiceControlServer interface {
|
||||||
|
mustEmbedUnimplementedDataServiceControlServer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func RegisterDataServiceControlServer(s grpc.ServiceRegistrar, srv DataServiceControlServer) {
|
||||||
|
// If the following call pancis, it indicates UnimplementedDataServiceControlServer was
|
||||||
|
// embedded by pointer and is nil. This will cause panics if an
|
||||||
|
// unimplemented method is ever invoked, so we test this at initialization
|
||||||
|
// time to prevent it from happening at runtime later due to I/O.
|
||||||
|
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
|
||||||
|
t.testEmbeddedByValue()
|
||||||
|
}
|
||||||
|
s.RegisterService(&DataServiceControl_ServiceDesc, srv)
|
||||||
|
}
|
||||||
|
|
||||||
|
func _DataServiceControl_StartStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
|
in := new(StartStreamRequest)
|
||||||
|
if err := dec(in); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if interceptor == nil {
|
||||||
|
return srv.(DataServiceControlServer).StartStream(ctx, in)
|
||||||
|
}
|
||||||
|
info := &grpc.UnaryServerInfo{
|
||||||
|
Server: srv,
|
||||||
|
FullMethod: DataServiceControl_StartStream_FullMethodName,
|
||||||
|
}
|
||||||
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return srv.(DataServiceControlServer).StartStream(ctx, req.(*StartStreamRequest))
|
||||||
|
}
|
||||||
|
return interceptor(ctx, in, info, handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
func _DataServiceControl_StopStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
|
in := new(StopStreamRequest)
|
||||||
|
if err := dec(in); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if interceptor == nil {
|
||||||
|
return srv.(DataServiceControlServer).StopStream(ctx, in)
|
||||||
|
}
|
||||||
|
info := &grpc.UnaryServerInfo{
|
||||||
|
Server: srv,
|
||||||
|
FullMethod: DataServiceControl_StopStream_FullMethodName,
|
||||||
|
}
|
||||||
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return srv.(DataServiceControlServer).StopStream(ctx, req.(*StopStreamRequest))
|
||||||
|
}
|
||||||
|
return interceptor(ctx, in, info, handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
func _DataServiceControl_ConfigureStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
|
in := new(ConfigureStreamRequest)
|
||||||
|
if err := dec(in); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if interceptor == nil {
|
||||||
|
return srv.(DataServiceControlServer).ConfigureStream(ctx, in)
|
||||||
|
}
|
||||||
|
info := &grpc.UnaryServerInfo{
|
||||||
|
Server: srv,
|
||||||
|
FullMethod: DataServiceControl_ConfigureStream_FullMethodName,
|
||||||
|
}
|
||||||
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return srv.(DataServiceControlServer).ConfigureStream(ctx, req.(*ConfigureStreamRequest))
|
||||||
|
}
|
||||||
|
return interceptor(ctx, in, info, handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DataServiceControl_ServiceDesc is the grpc.ServiceDesc for DataServiceControl service.
|
||||||
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
|
// and not to be introspected or modified (even as a copy)
|
||||||
|
var DataServiceControl_ServiceDesc = grpc.ServiceDesc{
|
||||||
|
ServiceName: "data_service.DataServiceControl",
|
||||||
|
HandlerType: (*DataServiceControlServer)(nil),
|
||||||
|
Methods: []grpc.MethodDesc{
|
||||||
|
{
|
||||||
|
MethodName: "StartStream",
|
||||||
|
Handler: _DataServiceControl_StartStream_Handler,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
MethodName: "StopStream",
|
||||||
|
Handler: _DataServiceControl_StopStream_Handler,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
MethodName: "ConfigureStream",
|
||||||
|
Handler: _DataServiceControl_ConfigureStream_Handler,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Streams: []grpc.StreamDesc{},
|
||||||
|
Metadata: "pkg/pb/data_service/data_service.proto",
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
DataServiceStreaming_ConnectStream_FullMethodName = "/data_service.DataServiceStreaming/ConnectStream"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DataServiceStreamingClient is the client API for DataServiceStreaming service.
|
||||||
|
//
|
||||||
|
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||||
|
type DataServiceStreamingClient interface {
|
||||||
|
ConnectStream(ctx context.Context, in *ConnectStreamRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Message], error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type dataServiceStreamingClient struct {
|
||||||
|
cc grpc.ClientConnInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDataServiceStreamingClient(cc grpc.ClientConnInterface) DataServiceStreamingClient {
|
||||||
|
return &dataServiceStreamingClient{cc}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dataServiceStreamingClient) ConnectStream(ctx context.Context, in *ConnectStreamRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Message], error) {
|
||||||
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
|
stream, err := c.cc.NewStream(ctx, &DataServiceStreaming_ServiceDesc.Streams[0], DataServiceStreaming_ConnectStream_FullMethodName, cOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
x := &grpc.GenericClientStream[ConnectStreamRequest, Message]{ClientStream: stream}
|
||||||
|
if err := x.ClientStream.SendMsg(in); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := x.ClientStream.CloseSend(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return x, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||||
|
type DataServiceStreaming_ConnectStreamClient = grpc.ServerStreamingClient[Message]
|
||||||
|
|
||||||
|
// DataServiceStreamingServer is the server API for DataServiceStreaming service.
|
||||||
|
// All implementations must embed UnimplementedDataServiceStreamingServer
|
||||||
|
// for forward compatibility.
|
||||||
|
type DataServiceStreamingServer interface {
|
||||||
|
ConnectStream(*ConnectStreamRequest, grpc.ServerStreamingServer[Message]) error
|
||||||
|
mustEmbedUnimplementedDataServiceStreamingServer()
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnimplementedDataServiceStreamingServer must be embedded to have
|
||||||
|
// forward compatible implementations.
|
||||||
|
//
|
||||||
|
// NOTE: this should be embedded by value instead of pointer to avoid a nil
|
||||||
|
// pointer dereference when methods are called.
|
||||||
|
type UnimplementedDataServiceStreamingServer struct{}
|
||||||
|
|
||||||
|
func (UnimplementedDataServiceStreamingServer) ConnectStream(*ConnectStreamRequest, grpc.ServerStreamingServer[Message]) error {
|
||||||
|
return status.Errorf(codes.Unimplemented, "method ConnectStream not implemented")
|
||||||
|
}
|
||||||
|
func (UnimplementedDataServiceStreamingServer) mustEmbedUnimplementedDataServiceStreamingServer() {}
|
||||||
|
func (UnimplementedDataServiceStreamingServer) testEmbeddedByValue() {}
|
||||||
|
|
||||||
|
// UnsafeDataServiceStreamingServer may be embedded to opt out of forward compatibility for this service.
|
||||||
|
// Use of this interface is not recommended, as added methods to DataServiceStreamingServer will
|
||||||
|
// result in compilation errors.
|
||||||
|
type UnsafeDataServiceStreamingServer interface {
|
||||||
|
mustEmbedUnimplementedDataServiceStreamingServer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func RegisterDataServiceStreamingServer(s grpc.ServiceRegistrar, srv DataServiceStreamingServer) {
|
||||||
|
// If the following call pancis, it indicates UnimplementedDataServiceStreamingServer was
|
||||||
|
// embedded by pointer and is nil. This will cause panics if an
|
||||||
|
// unimplemented method is ever invoked, so we test this at initialization
|
||||||
|
// time to prevent it from happening at runtime later due to I/O.
|
||||||
|
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
|
||||||
|
t.testEmbeddedByValue()
|
||||||
|
}
|
||||||
|
s.RegisterService(&DataServiceStreaming_ServiceDesc, srv)
|
||||||
|
}
|
||||||
|
|
||||||
|
func _DataServiceStreaming_ConnectStream_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||||
|
m := new(ConnectStreamRequest)
|
||||||
|
if err := stream.RecvMsg(m); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return srv.(DataServiceStreamingServer).ConnectStream(m, &grpc.GenericServerStream[ConnectStreamRequest, Message]{ServerStream: stream})
|
||||||
|
}
|
||||||
|
|
||||||
|
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||||
|
type DataServiceStreaming_ConnectStreamServer = grpc.ServerStreamingServer[Message]
|
||||||
|
|
||||||
|
// DataServiceStreaming_ServiceDesc is the grpc.ServiceDesc for DataServiceStreaming service.
|
||||||
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
|
// and not to be introspected or modified (even as a copy)
|
||||||
|
var DataServiceStreaming_ServiceDesc = grpc.ServiceDesc{
|
||||||
|
ServiceName: "data_service.DataServiceStreaming",
|
||||||
|
HandlerType: (*DataServiceStreamingServer)(nil),
|
||||||
|
Methods: []grpc.MethodDesc{},
|
||||||
|
Streams: []grpc.StreamDesc{
|
||||||
|
{
|
||||||
|
StreamName: "ConnectStream",
|
||||||
|
Handler: _DataServiceStreaming_ConnectStream_Handler,
|
||||||
|
ServerStreams: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Metadata: "pkg/pb/data_service/data_service.proto",
|
||||||
|
}
|
||||||
1
services/controller_service/cmd/main.go
Normal file
1
services/controller_service/cmd/main.go
Normal file
@@ -0,0 +1 @@
|
|||||||
|
package cmd
|
||||||
21
services/data_service/Dockerfile
Normal file
21
services/data_service/Dockerfile
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
# ---- Builder ----
|
||||||
|
FROM golang:1.24-alpine AS builder
|
||||||
|
|
||||||
|
ENV CGO_ENABLED=0 GOOS=linux
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
COPY go.mod go.sum ./
|
||||||
|
RUN go mod download
|
||||||
|
|
||||||
|
COPY . .
|
||||||
|
RUN --mount=type=cache,target=/root/.cache/go-build \
|
||||||
|
--mount=type=cache,target=/go/pkg/mod \
|
||||||
|
go build -trimpath -ldflags="-s -w" \
|
||||||
|
-o /out/data-service ./services/data_service/cmd/data_service
|
||||||
|
|
||||||
|
# ---- Runtime ----
|
||||||
|
FROM gcr.io/distroless/static:nonroot
|
||||||
|
EXPOSE 50051 50052 6000
|
||||||
|
COPY --from=builder /out/data-service /bin/data-service
|
||||||
|
USER nonroot:nonroot
|
||||||
|
ENTRYPOINT ["/bin/data-service"]
|
||||||
50
services/data_service/cmd/data_service/main.go
Normal file
50
services/data_service/cmd/data_service/main.go
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lmittmann/tint"
|
||||||
|
)
|
||||||
|
|
||||||
|
func initLogger() *slog.Logger {
|
||||||
|
level := parseLevel(env("LOG_LEVEL", "debug"))
|
||||||
|
if env("LOG_FORMAT", "pretty") == "json" {
|
||||||
|
return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
|
||||||
|
Level: level,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
return slog.New(tint.NewHandler(os.Stdout, &tint.Options{
|
||||||
|
Level: level,
|
||||||
|
TimeFormat: time.RFC3339Nano,
|
||||||
|
NoColor: os.Getenv("NO_COLOR") != "",
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseLevel(s string) slog.Level {
|
||||||
|
switch s {
|
||||||
|
case "debug":
|
||||||
|
return slog.LevelDebug
|
||||||
|
case "warn":
|
||||||
|
return slog.LevelWarn
|
||||||
|
case "error":
|
||||||
|
return slog.LevelError
|
||||||
|
default:
|
||||||
|
return slog.LevelInfo
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func env(k, def string) string {
|
||||||
|
if v := os.Getenv(k); v != "" {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
return def
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
slog.SetDefault(initLogger())
|
||||||
|
slog.Info("starting", "svc", "data-service")
|
||||||
|
|
||||||
|
select {}
|
||||||
|
}
|
||||||
25
services/data_service/internal/control/control.go
Normal file
25
services/data_service/internal/control/control.go
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
// Package control
|
||||||
|
package control
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node"
|
||||||
|
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node/processor"
|
||||||
|
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/router"
|
||||||
|
)
|
||||||
|
|
||||||
|
type nodeEntry struct {
|
||||||
|
Template node.Template
|
||||||
|
TemplateFingerprint string
|
||||||
|
}
|
||||||
|
|
||||||
|
type Controller struct {
|
||||||
|
router router.Router
|
||||||
|
sourceRegistry any // source.Registry
|
||||||
|
processorRegistry processor.Registry
|
||||||
|
sinkRegistry any // sink.Registry
|
||||||
|
|
||||||
|
sourceNodes map[uuid.UUID]any
|
||||||
|
processorNodes map[uuid.UUID]processor.Processor
|
||||||
|
sinkNodes map[uuid.UUID]any
|
||||||
|
}
|
||||||
43
services/data_service/internal/control/fingerprint.go
Normal file
43
services/data_service/internal/control/fingerprint.go
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
package control
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"encoding/hex"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"lukechampine.com/blake3"
|
||||||
|
)
|
||||||
|
|
||||||
|
func StreamFingerprint(templateFP, outPort string, inMap map[string]string) string {
|
||||||
|
// Sort input keys for determinism.
|
||||||
|
keys := make([]string, 0, len(inMap))
|
||||||
|
for k := range inMap {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
|
||||||
|
h := blake3.New(32, nil)
|
||||||
|
|
||||||
|
write := func(s string) {
|
||||||
|
var lenbuf [4]byte
|
||||||
|
binary.LittleEndian.PutUint32(lenbuf[:], uint32(len(s)))
|
||||||
|
_, _ = h.Write(lenbuf[:])
|
||||||
|
_, _ = h.Write([]byte(s))
|
||||||
|
}
|
||||||
|
|
||||||
|
// templateFP, outPort, input count, then pairs.
|
||||||
|
write(templateFP)
|
||||||
|
write(outPort)
|
||||||
|
|
||||||
|
var nbuf [4]byte
|
||||||
|
binary.LittleEndian.PutUint32(nbuf[:], uint32(len(keys)))
|
||||||
|
_, _ = h.Write(nbuf[:])
|
||||||
|
|
||||||
|
for _, k := range keys {
|
||||||
|
write(k)
|
||||||
|
write(inMap[k])
|
||||||
|
}
|
||||||
|
|
||||||
|
sum := h.Sum(nil)
|
||||||
|
return hex.EncodeToString(sum)
|
||||||
|
}
|
||||||
0
services/data_service/internal/control/registry.go
Normal file
0
services/data_service/internal/control/registry.go
Normal file
0
services/data_service/internal/control/wiring.go
Normal file
0
services/data_service/internal/control/wiring.go
Normal file
11
services/data_service/internal/domain/message.go
Normal file
11
services/data_service/internal/domain/message.go
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
// Package domain
|
||||||
|
package domain
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
type Message struct {
|
||||||
|
Payload []byte
|
||||||
|
SchemaID string
|
||||||
|
Seq uint64
|
||||||
|
Timestamp time.Time
|
||||||
|
}
|
||||||
14
services/data_service/internal/node/io.go
Normal file
14
services/data_service/internal/node/io.go
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
// Package node
|
||||||
|
package node
|
||||||
|
|
||||||
|
import (
|
||||||
|
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/domain"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Receiver interface {
|
||||||
|
Receive() (port string, message domain.Message, ok bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Sender interface {
|
||||||
|
Send(port string, message domain.Message) error
|
||||||
|
}
|
||||||
16
services/data_service/internal/node/node.go
Normal file
16
services/data_service/internal/node/node.go
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
package node
|
||||||
|
|
||||||
|
type Type string
|
||||||
|
|
||||||
|
const (
|
||||||
|
Source Type = "source"
|
||||||
|
Processor Type = "processor"
|
||||||
|
Sink Type = "sink"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Template struct {
|
||||||
|
Kind Type
|
||||||
|
Type string
|
||||||
|
Version string
|
||||||
|
Config string
|
||||||
|
}
|
||||||
18
services/data_service/internal/node/processor/processor.go
Normal file
18
services/data_service/internal/node/processor/processor.go
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
// Package processor
|
||||||
|
package processor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Processor interface {
|
||||||
|
Start(ctx context.Context, cfg string, io IO) error
|
||||||
|
Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
type IO interface {
|
||||||
|
node.Receiver
|
||||||
|
node.Sender
|
||||||
|
}
|
||||||
11
services/data_service/internal/node/processor/registry.go
Normal file
11
services/data_service/internal/node/processor/registry.go
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
package processor
|
||||||
|
|
||||||
|
type Registry struct{}
|
||||||
|
|
||||||
|
type Factory interface {
|
||||||
|
New() Processor
|
||||||
|
Type() string
|
||||||
|
Version() string
|
||||||
|
|
||||||
|
TemplateFingerprint(cfg string) (string, error)
|
||||||
|
}
|
||||||
17
services/data_service/internal/node/sink/sink.go
Normal file
17
services/data_service/internal/node/sink/sink.go
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
// Package sink
|
||||||
|
package sink
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Sink interface {
|
||||||
|
Start(ctx context.Context, io IO) error
|
||||||
|
Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
type IO interface {
|
||||||
|
node.Sender
|
||||||
|
}
|
||||||
1
services/data_service/internal/node/source/registry.go
Normal file
1
services/data_service/internal/node/source/registry.go
Normal file
@@ -0,0 +1 @@
|
|||||||
|
package source
|
||||||
20
services/data_service/internal/node/source/source.go
Normal file
20
services/data_service/internal/node/source/source.go
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
// Package source
|
||||||
|
package source
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"gitlab.michelsen.id/phillmichelsen/tessera/services/data_service/internal/node"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Source interface {
|
||||||
|
Start(ctx context.Context, cfg string, io IO) error
|
||||||
|
Stop()
|
||||||
|
|
||||||
|
Serve(key string) error
|
||||||
|
Unserve(key string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type IO interface {
|
||||||
|
node.Sender
|
||||||
|
}
|
||||||
9
services/data_service/internal/router/router.go
Normal file
9
services/data_service/internal/router/router.go
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
// Package router
|
||||||
|
package router
|
||||||
|
|
||||||
|
import "github.com/google/uuid"
|
||||||
|
|
||||||
|
type Router interface {
|
||||||
|
Route(fromNode uuid.UUID, fromOutPort string, toNode uuid.UUID, toInPort string)
|
||||||
|
Unroute(fromNode uuid.UUID, fromOutPort string, toNode uuid.UUID, toInPort string)
|
||||||
|
}
|
||||||
2
services/data_service/internal/router/single/engine.go
Normal file
2
services/data_service/internal/router/single/engine.go
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
// Package single
|
||||||
|
package single
|
||||||
2
services/data_service/internal/server/server.go
Normal file
2
services/data_service/internal/server/server.go
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
// Package server
|
||||||
|
package server
|
||||||
2
services/data_service/internal/session/session.go
Normal file
2
services/data_service/internal/session/session.go
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
// Package session
|
||||||
|
package session
|
||||||
1
services/portfolio_service/cmd/main.go
Normal file
1
services/portfolio_service/cmd/main.go
Normal file
@@ -0,0 +1 @@
|
|||||||
|
package cmd
|
||||||
1
services/strategies/strategy1/cmd/main.go
Normal file
1
services/strategies/strategy1/cmd/main.go
Normal file
@@ -0,0 +1 @@
|
|||||||
|
package cmd
|
||||||
1
services/strategies/strategy1/service.go
Normal file
1
services/strategies/strategy1/service.go
Normal file
@@ -0,0 +1 @@
|
|||||||
|
package marketdata
|
||||||
Reference in New Issue
Block a user