diff --git a/cmd/datad/main.go b/cmd/datad/main.go index 38dd16d..119c10d 100644 --- a/cmd/datad/main.go +++ b/cmd/datad/main.go @@ -1,3 +1,173 @@ package main -func main() {} +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/google/uuid" + "gitlab.michelsen.id/phillmichelsen/tessera/pkg/data" + "gitlab.michelsen.id/phillmichelsen/tessera/pkg/data/routing" +) + +type SeqPayload struct { + Seq uint64 +} + +type streamStats struct { + sent uint64 + observed uint64 + missed uint64 + lastSeen uint64 + lastReport time.Time +} + +func main() { + ctx := context.Background() + + // ---- Knobs ---- + N := 10 + duration := 5 * time.Second + totalTargetPerSec := 5_000 // total across all streams + // ---------------- + + rt := routing.NewInprocRouter() + + senders := make([]data.Sender, N) + receivers := make([]data.Receiver, N) + + for i := range N { + st, err := rt.OpenStream(data.StreamID(uuid.New())) + if err != nil { + panic(err) + } + senders[i] = st.Sender() + receivers[i] = st.Receiver() + } + + perStreamTarget := totalTargetPerSec / N + if perStreamTarget == 0 { + perStreamTarget = 1 + } + + fmt.Printf("N=%d duration=%s totalTarget=%d/s perStreamTarget=%d/s\n", + N, duration, totalTargetPerSec, perStreamTarget) + + stopAt := time.Now().Add(duration) + + stats := make([]streamStats, N) + + var wg sync.WaitGroup + wg.Add(N + 1) + + // Publisher: per-stream sender sequence in envelope payload. + go func() { + defer wg.Done() + + tick := time.NewTicker(1 * time.Millisecond) + defer tick.Stop() + + perTick := perStreamTarget / 1000 + rem := perStreamTarget % 1000 + remAcc := make([]int, N) + + seq := make([]uint64, N) + + for time.Now().Before(stopAt) { + <-tick.C + + for i := range N { + n := int(perTick) + remAcc[i] += rem + if remAcc[i] >= 1000 { + n++ + remAcc[i] -= 1000 + } + + for j := 0; j < n; j++ { + seq[i]++ + + env := data.Envelope{ + Payload: SeqPayload{Seq: seq[i]}, + } + _ = senders[i].Send(ctx, env) + stats[i].sent++ + } + } + } + }() + + // Consumers: detect missed sender sequence numbers. + for i := range N { + idx := i + rx := receivers[i] + + go func() { + defer wg.Done() + + for time.Now().Before(stopAt) { + env, ok, err := rx.TryReceive() + if err != nil { + return + } + if !ok { + continue + } + + p, ok := env.Payload.(SeqPayload) + if !ok { + // If your Payload is pointer/interface-heavy, adjust accordingly. + continue + } + + stats[idx].observed++ + + if stats[idx].lastSeen == 0 { + stats[idx].lastSeen = p.Seq + continue + } + + if p.Seq > stats[idx].lastSeen+1 { + stats[idx].missed += (p.Seq - stats[idx].lastSeen - 1) + } + stats[idx].lastSeen = p.Seq + } + }() + } + + wg.Wait() + + var totalSent, totalObs, totalMiss uint64 + minDrop, maxDrop := 100.0, 0.0 + + for i := range N { + totalSent += stats[i].sent + totalObs += stats[i].observed + totalMiss += stats[i].missed + + den := stats[i].observed + stats[i].missed + dropPct := 0.0 + if den > 0 { + dropPct = 100.0 * float64(stats[i].missed) / float64(den) + } + if dropPct < minDrop { + minDrop = dropPct + } + if dropPct > maxDrop { + maxDrop = dropPct + } + + fmt.Printf("stream[%02d] sent=%6d observed=%6d missed=%6d lastSeen=%6d drop=%5.2f%%\n", + i, stats[i].sent, stats[i].observed, stats[i].missed, stats[i].lastSeen, dropPct) + } + + totalDen := totalObs + totalMiss + totalDrop := 0.0 + if totalDen > 0 { + totalDrop = 100.0 * float64(totalMiss) / float64(totalDen) + } + + fmt.Printf("\nTOTAL sent=%d observed=%d missed=%d drop=%.2f%% (min=%.2f%% max=%.2f%%)\n", + totalSent, totalObs, totalMiss, totalDrop, minDrop, maxDrop) +} diff --git a/pkg/data/component.go b/pkg/data/component.go new file mode 100644 index 0000000..c72a716 --- /dev/null +++ b/pkg/data/component.go @@ -0,0 +1,33 @@ +package data + +import "context" + +type Component interface { + Start(ctx context.Context) error + Stop(ctx context.Context) error + + ValidateConfig(cfg ComponentConfig) (ComponentConfigValidation, error) + ApplyConfig(cfg ComponentConfig) error + + Status() ComponentStatus +} + +type ComponentRuntime interface { + OpenStream(id StreamID) (Stream, error) + ReportError(fatal bool, err error) +} + +type ComponentStatus struct { + Active bool + Patching bool + Config any + Error error +} + +type ComponentConfig any + +type ComponentConfigValidation struct { + Valid bool + RequiresRestart bool + Warnings []string +} diff --git a/pkg/data/coordinator/coordinator.go b/pkg/data/coordinator/coordinator.go deleted file mode 100644 index aedabf3..0000000 --- a/pkg/data/coordinator/coordinator.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package coordinator ... -package coordinator diff --git a/pkg/data/egress.go b/pkg/data/egress.go deleted file mode 100644 index 7f27c8c..0000000 --- a/pkg/data/egress.go +++ /dev/null @@ -1,18 +0,0 @@ -package data - -import ( - "context" - - "github.com/google/uuid" -) - -type Egress interface { - Name() string - Configure(cfg map[string]any) error - Connect(ctx context.Context, actions EgresActions) error - Disconnect(ctx context.Context) error -} - -type EgresActions interface { - Subscribe(ctx context.Context, namespace string, id uuid.UUID) (<-chan Envelope, func(), error) -} diff --git a/pkg/data/registry.go b/pkg/data/errors.go similarity index 100% rename from pkg/data/registry.go rename to pkg/data/errors.go diff --git a/pkg/data/ingress.go b/pkg/data/ingress.go deleted file mode 100644 index b1c562a..0000000 --- a/pkg/data/ingress.go +++ /dev/null @@ -1,34 +0,0 @@ -package data - -import ( - "context" - "time" - - "github.com/google/uuid" -) - -type Ingress interface { - Name() string - Configure(cfg map[string]any) error - Connect(ctx context.Context, actions IngressActions) error - Disconnect(ctx context.Context) error -} - -type IngressActions interface { - Emit(namespace string, id uuid.UUID, envelope Envelope) error - EmitBatch(namespace string, id uuid.UUID, envelopes []Envelope) error -} - -type LiveIngress interface { - Subscribe(ctx context.Context, key ...string) error - Unsubscribe(ctx context.Context, key ...string) error - ListAvailableKeys(ctx context.Context) []string - IsValidKey(ctx context.Context, key string) bool -} - -type HistoricalIngress interface { - Fetch(ctx context.Context, key string, start time.Time, end time.Time) ([]Envelope, error) - ListAvailableKeys(ctx context.Context) []string - IsValidKey(ctx context.Context, key string) bool - // some other method that gives the avalible time period for a key -} diff --git a/pkg/data/processor.go b/pkg/data/processor.go deleted file mode 100644 index ada97ac..0000000 --- a/pkg/data/processor.go +++ /dev/null @@ -1,14 +0,0 @@ -package data - -import "context" - -type Processor interface { - Name() string - Configure(cfg map[string]any) error - Run(ctx context.Context, actions ProcessorActions) error -} - -type ProcessorActions interface { - IngressActions - EgresActions -} diff --git a/pkg/data/router/router.go b/pkg/data/router/router.go deleted file mode 100644 index ee0d945..0000000 --- a/pkg/data/router/router.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package router ... -package router diff --git a/pkg/data/routing/inproc.go b/pkg/data/routing/inproc.go new file mode 100644 index 0000000..704ea77 --- /dev/null +++ b/pkg/data/routing/inproc.go @@ -0,0 +1,130 @@ +package routing + +import ( + "context" + "errors" + "sync" + "time" + + "gitlab.michelsen.id/phillmichelsen/tessera/pkg/data" +) + +type InprocRouter struct { + mu sync.RWMutex + streams map[data.StreamID]*inprocStream +} + +func NewInprocRouter() *InprocRouter { + return &InprocRouter{ + streams: make(map[data.StreamID]*inprocStream), + } +} + +func (r *InprocRouter) OpenStream(id data.StreamID) (data.Stream, error) { + r.mu.Lock() + defer r.mu.Unlock() + + s := r.streams[id] + if s != nil { + return s, nil + } + + s = newInprocStream(id) + r.streams[id] = s + return s, nil +} + +type inprocStream struct { + id data.StreamID + + seq uint64 + latest data.Envelope + + streamClosed bool + mu sync.RWMutex +} + +func newInprocStream(id data.StreamID) *inprocStream { + return &inprocStream{ + id: id, + } +} + +func (s *inprocStream) ID() data.StreamID { + return s.id +} + +func (s *inprocStream) Sender() data.Sender { + return &inprocSender{stream: s} +} + +func (s *inprocStream) Receiver() data.Receiver { + s.mu.RLock() + cur := s.seq + s.mu.RUnlock() + + return &inprocReceiver{ + stream: s, + lastSeenSeq: cur, + } + +} + +type inprocSender struct { + stream *inprocStream +} + +func (tx *inprocSender) Send(ctx context.Context, env data.Envelope) error { + if err := ctx.Err(); err != nil { + return err + } + + s := tx.stream + s.mu.Lock() + defer s.mu.Unlock() + + if s.streamClosed { + return errors.New("stream closed") + } + + env.SendTime = time.Now().UTC() + + s.seq++ + s.latest = env + + return nil +} + +func (tx *inprocSender) SendBatch(ctx context.Context, envs []data.Envelope) error { + panic("unimplemented") +} + +type inprocReceiver struct { + stream *inprocStream + lastSeenSeq uint64 +} + +func (rx *inprocReceiver) TryReceive() (data.Envelope, bool, error) { + s := rx.stream + s.mu.RLock() + defer s.mu.RUnlock() + + if s.streamClosed { + return data.Envelope{}, false, errors.New("stream closed") + } + + if s.seq == 0 || s.seq == rx.lastSeenSeq { + return data.Envelope{}, false, nil + } + + rx.lastSeenSeq = s.seq + return s.latest, true, nil +} + +func (rx *inprocReceiver) ReceiveNext(ctx context.Context) (data.Envelope, error) { + panic("unimplemented") +} + +func (rx *inprocReceiver) Seq() uint64 { + return rx.lastSeenSeq +} diff --git a/pkg/data/stream.go b/pkg/data/stream.go new file mode 100644 index 0000000..71de943 --- /dev/null +++ b/pkg/data/stream.go @@ -0,0 +1,27 @@ +package data + +import ( + "context" + + "github.com/google/uuid" +) + +type StreamID uuid.UUID + +type Stream interface { + ID() StreamID + + Sender() Sender + Receiver() Receiver +} + +type Sender interface { + Send(context.Context, Envelope) error + SendBatch(context.Context, []Envelope) error +} + +type Receiver interface { + TryReceive() (Envelope, bool, error) + ReceiveNext(context.Context) (Envelope, error) + Seq() uint64 +}