WIP Reworking data module

This commit is contained in:
2026-02-05 22:55:48 +08:00
parent 9b9d9e2156
commit b3841f5647
10 changed files with 361 additions and 71 deletions

View File

@@ -1,3 +1,173 @@
package main
func main() {}
import (
"context"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data/routing"
)
type SeqPayload struct {
Seq uint64
}
type streamStats struct {
sent uint64
observed uint64
missed uint64
lastSeen uint64
lastReport time.Time
}
func main() {
ctx := context.Background()
// ---- Knobs ----
N := 10
duration := 5 * time.Second
totalTargetPerSec := 5_000 // total across all streams
// ----------------
rt := routing.NewInprocRouter()
senders := make([]data.Sender, N)
receivers := make([]data.Receiver, N)
for i := range N {
st, err := rt.OpenStream(data.StreamID(uuid.New()))
if err != nil {
panic(err)
}
senders[i] = st.Sender()
receivers[i] = st.Receiver()
}
perStreamTarget := totalTargetPerSec / N
if perStreamTarget == 0 {
perStreamTarget = 1
}
fmt.Printf("N=%d duration=%s totalTarget=%d/s perStreamTarget=%d/s\n",
N, duration, totalTargetPerSec, perStreamTarget)
stopAt := time.Now().Add(duration)
stats := make([]streamStats, N)
var wg sync.WaitGroup
wg.Add(N + 1)
// Publisher: per-stream sender sequence in envelope payload.
go func() {
defer wg.Done()
tick := time.NewTicker(1 * time.Millisecond)
defer tick.Stop()
perTick := perStreamTarget / 1000
rem := perStreamTarget % 1000
remAcc := make([]int, N)
seq := make([]uint64, N)
for time.Now().Before(stopAt) {
<-tick.C
for i := range N {
n := int(perTick)
remAcc[i] += rem
if remAcc[i] >= 1000 {
n++
remAcc[i] -= 1000
}
for j := 0; j < n; j++ {
seq[i]++
env := data.Envelope{
Payload: SeqPayload{Seq: seq[i]},
}
_ = senders[i].Send(ctx, env)
stats[i].sent++
}
}
}
}()
// Consumers: detect missed sender sequence numbers.
for i := range N {
idx := i
rx := receivers[i]
go func() {
defer wg.Done()
for time.Now().Before(stopAt) {
env, ok, err := rx.TryReceive()
if err != nil {
return
}
if !ok {
continue
}
p, ok := env.Payload.(SeqPayload)
if !ok {
// If your Payload is pointer/interface-heavy, adjust accordingly.
continue
}
stats[idx].observed++
if stats[idx].lastSeen == 0 {
stats[idx].lastSeen = p.Seq
continue
}
if p.Seq > stats[idx].lastSeen+1 {
stats[idx].missed += (p.Seq - stats[idx].lastSeen - 1)
}
stats[idx].lastSeen = p.Seq
}
}()
}
wg.Wait()
var totalSent, totalObs, totalMiss uint64
minDrop, maxDrop := 100.0, 0.0
for i := range N {
totalSent += stats[i].sent
totalObs += stats[i].observed
totalMiss += stats[i].missed
den := stats[i].observed + stats[i].missed
dropPct := 0.0
if den > 0 {
dropPct = 100.0 * float64(stats[i].missed) / float64(den)
}
if dropPct < minDrop {
minDrop = dropPct
}
if dropPct > maxDrop {
maxDrop = dropPct
}
fmt.Printf("stream[%02d] sent=%6d observed=%6d missed=%6d lastSeen=%6d drop=%5.2f%%\n",
i, stats[i].sent, stats[i].observed, stats[i].missed, stats[i].lastSeen, dropPct)
}
totalDen := totalObs + totalMiss
totalDrop := 0.0
if totalDen > 0 {
totalDrop = 100.0 * float64(totalMiss) / float64(totalDen)
}
fmt.Printf("\nTOTAL sent=%d observed=%d missed=%d drop=%.2f%% (min=%.2f%% max=%.2f%%)\n",
totalSent, totalObs, totalMiss, totalDrop, minDrop, maxDrop)
}

33
pkg/data/component.go Normal file
View File

@@ -0,0 +1,33 @@
package data
import "context"
type Component interface {
Start(ctx context.Context) error
Stop(ctx context.Context) error
ValidateConfig(cfg ComponentConfig) (ComponentConfigValidation, error)
ApplyConfig(cfg ComponentConfig) error
Status() ComponentStatus
}
type ComponentRuntime interface {
OpenStream(id StreamID) (Stream, error)
ReportError(fatal bool, err error)
}
type ComponentStatus struct {
Active bool
Patching bool
Config any
Error error
}
type ComponentConfig any
type ComponentConfigValidation struct {
Valid bool
RequiresRestart bool
Warnings []string
}

View File

@@ -1,2 +0,0 @@
// Package coordinator ...
package coordinator

View File

@@ -1,18 +0,0 @@
package data
import (
"context"
"github.com/google/uuid"
)
type Egress interface {
Name() string
Configure(cfg map[string]any) error
Connect(ctx context.Context, actions EgresActions) error
Disconnect(ctx context.Context) error
}
type EgresActions interface {
Subscribe(ctx context.Context, namespace string, id uuid.UUID) (<-chan Envelope, func(), error)
}

View File

@@ -1,34 +0,0 @@
package data
import (
"context"
"time"
"github.com/google/uuid"
)
type Ingress interface {
Name() string
Configure(cfg map[string]any) error
Connect(ctx context.Context, actions IngressActions) error
Disconnect(ctx context.Context) error
}
type IngressActions interface {
Emit(namespace string, id uuid.UUID, envelope Envelope) error
EmitBatch(namespace string, id uuid.UUID, envelopes []Envelope) error
}
type LiveIngress interface {
Subscribe(ctx context.Context, key ...string) error
Unsubscribe(ctx context.Context, key ...string) error
ListAvailableKeys(ctx context.Context) []string
IsValidKey(ctx context.Context, key string) bool
}
type HistoricalIngress interface {
Fetch(ctx context.Context, key string, start time.Time, end time.Time) ([]Envelope, error)
ListAvailableKeys(ctx context.Context) []string
IsValidKey(ctx context.Context, key string) bool
// some other method that gives the avalible time period for a key
}

View File

@@ -1,14 +0,0 @@
package data
import "context"
type Processor interface {
Name() string
Configure(cfg map[string]any) error
Run(ctx context.Context, actions ProcessorActions) error
}
type ProcessorActions interface {
IngressActions
EgresActions
}

View File

@@ -1,2 +0,0 @@
// Package router ...
package router

130
pkg/data/routing/inproc.go Normal file
View File

@@ -0,0 +1,130 @@
package routing
import (
"context"
"errors"
"sync"
"time"
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
)
type InprocRouter struct {
mu sync.RWMutex
streams map[data.StreamID]*inprocStream
}
func NewInprocRouter() *InprocRouter {
return &InprocRouter{
streams: make(map[data.StreamID]*inprocStream),
}
}
func (r *InprocRouter) OpenStream(id data.StreamID) (data.Stream, error) {
r.mu.Lock()
defer r.mu.Unlock()
s := r.streams[id]
if s != nil {
return s, nil
}
s = newInprocStream(id)
r.streams[id] = s
return s, nil
}
type inprocStream struct {
id data.StreamID
seq uint64
latest data.Envelope
streamClosed bool
mu sync.RWMutex
}
func newInprocStream(id data.StreamID) *inprocStream {
return &inprocStream{
id: id,
}
}
func (s *inprocStream) ID() data.StreamID {
return s.id
}
func (s *inprocStream) Sender() data.Sender {
return &inprocSender{stream: s}
}
func (s *inprocStream) Receiver() data.Receiver {
s.mu.RLock()
cur := s.seq
s.mu.RUnlock()
return &inprocReceiver{
stream: s,
lastSeenSeq: cur,
}
}
type inprocSender struct {
stream *inprocStream
}
func (tx *inprocSender) Send(ctx context.Context, env data.Envelope) error {
if err := ctx.Err(); err != nil {
return err
}
s := tx.stream
s.mu.Lock()
defer s.mu.Unlock()
if s.streamClosed {
return errors.New("stream closed")
}
env.SendTime = time.Now().UTC()
s.seq++
s.latest = env
return nil
}
func (tx *inprocSender) SendBatch(ctx context.Context, envs []data.Envelope) error {
panic("unimplemented")
}
type inprocReceiver struct {
stream *inprocStream
lastSeenSeq uint64
}
func (rx *inprocReceiver) TryReceive() (data.Envelope, bool, error) {
s := rx.stream
s.mu.RLock()
defer s.mu.RUnlock()
if s.streamClosed {
return data.Envelope{}, false, errors.New("stream closed")
}
if s.seq == 0 || s.seq == rx.lastSeenSeq {
return data.Envelope{}, false, nil
}
rx.lastSeenSeq = s.seq
return s.latest, true, nil
}
func (rx *inprocReceiver) ReceiveNext(ctx context.Context) (data.Envelope, error) {
panic("unimplemented")
}
func (rx *inprocReceiver) Seq() uint64 {
return rx.lastSeenSeq
}

27
pkg/data/stream.go Normal file
View File

@@ -0,0 +1,27 @@
package data
import (
"context"
"github.com/google/uuid"
)
type StreamID uuid.UUID
type Stream interface {
ID() StreamID
Sender() Sender
Receiver() Receiver
}
type Sender interface {
Send(context.Context, Envelope) error
SendBatch(context.Context, []Envelope) error
}
type Receiver interface {
TryReceive() (Envelope, bool, error)
ReceiveNext(context.Context) (Envelope, error)
Seq() uint64
}