From 28d04e04e122a9fb4bd57eaf75c07e55898faee6 Mon Sep 17 00:00:00 2001 From: Phillip Michelsen Date: Wed, 25 Feb 2026 02:35:52 +0800 Subject: [PATCH] Added performant in-process message broker to data's routing subpackage --- cmd/datad/main.go | 375 +++++++++++++++++++++++---------- pkg/data/routing/broker.go | 56 +++++ pkg/data/routing/inproc.go | 130 ------------ pkg/data/routing/interfaces.go | 25 +++ pkg/data/routing/router.go | 201 ++++++++++++++++++ 5 files changed, 540 insertions(+), 247 deletions(-) create mode 100644 pkg/data/routing/broker.go delete mode 100644 pkg/data/routing/inproc.go create mode 100644 pkg/data/routing/interfaces.go create mode 100644 pkg/data/routing/router.go diff --git a/cmd/datad/main.go b/cmd/datad/main.go index 119c10d..b32e948 100644 --- a/cmd/datad/main.go +++ b/cmd/datad/main.go @@ -3,171 +3,312 @@ package main import ( "context" "fmt" + "math/rand" + "runtime" + "strconv" + "strings" "sync" + "sync/atomic" "time" - "github.com/google/uuid" "gitlab.michelsen.id/phillmichelsen/tessera/pkg/data" "gitlab.michelsen.id/phillmichelsen/tessera/pkg/data/routing" ) -type SeqPayload struct { - Seq uint64 +/* +Realistic-ish market-data style test. + +Model: +- 1 publisher per topic (instrument / feed partition) +- Each message carries a strictly increasing sequence number (per topic) +- Subscribers validate in-order, gap-free delivery +- Publishers send with bursty pacing to approximate L3-ish behavior: + send BurstSize messages back-to-back, then sleep to maintain AvgRate. + +Defaults are intentionally moderate. Increase topics/rates to stress. +*/ + +const ( + NumTopics = 8 // topics/instruments/partitions + SubsPerTopic = 6 // fan-out per topic + RingCapacity = 1 << 14 + TestDuration = 60 * time.Second + AvgRatePerTopic = 500_000 // msgs/sec per topic (average) + BurstSize = 512 // burst messages then sleep to preserve avg + + // If true, subscribers spin-poll (TryReceive). If false, blocking Receive. 
+ UseTryReceive = false +) + +type topicStats struct { + published atomic.Uint64 } -type streamStats struct { - sent uint64 - observed uint64 - missed uint64 - lastSeen uint64 - lastReport time.Time +type subStats struct { + received atomic.Uint64 + errors atomic.Uint64 } func main() { - ctx := context.Background() + fmt.Printf("Market-Data Routing Test\n") + fmt.Printf("Topics: %d | Subs/Topic: %d | Duration: %v\n", NumTopics, SubsPerTopic, TestDuration) + fmt.Printf("AvgRate/Topic: %d msg/s | BurstSize: %d | Mode: %s\n\n", + AvgRatePerTopic, BurstSize, modeName()) - // ---- Knobs ---- - N := 10 - duration := 5 * time.Second - totalTargetPerSec := 5_000 // total across all streams - // ---------------- + broker := routing.NewBroker() - rt := routing.NewInprocRouter() + topics := make([]string, NumTopics) + for i := 0; i < NumTopics; i++ { + topics[i] = fmt.Sprintf("FUT_L3_%02d", i) + } - senders := make([]data.Sender, N) - receivers := make([]data.Receiver, N) + // Create publishers first to size the rings. + pubs := make([]routing.Publisher, NumTopics) + for i := 0; i < NumTopics; i++ { + pubs[i] = broker.RegisterPublisher(topics[i], RingCapacity) + } - for i := range N { - st, err := rt.OpenStream(data.StreamID(uuid.New())) - if err != nil { - panic(err) + // Per-topic published counters (ground truth). + tStats := make([]*topicStats, NumTopics) + for i := range tStats { + tStats[i] = &topicStats{} + } + + // Subscribers: attach evenly, validate ordering. 
+ var subsWG sync.WaitGroup + sStats := make([][]*subStats, NumTopics) // [topic][sub] + for ti := 0; ti < NumTopics; ti++ { + sStats[ti] = make([]*subStats, SubsPerTopic) + for si := 0; si < SubsPerTopic; si++ { + sStats[ti][si] = &subStats{} } - senders[i] = st.Sender() - receivers[i] = st.Receiver() } - perStreamTarget := totalTargetPerSec / N - if perStreamTarget == 0 { - perStreamTarget = 1 - } + ctx, cancel := context.WithTimeout(context.Background(), TestDuration) + defer cancel() - fmt.Printf("N=%d duration=%s totalTarget=%d/s perStreamTarget=%d/s\n", - N, duration, totalTargetPerSec, perStreamTarget) + start := time.Now() - stopAt := time.Now().Add(duration) + for ti := 0; ti < NumTopics; ti++ { + topic := topics[ti] + for si := 0; si < SubsPerTopic; si++ { + sub := broker.RegisterSubscriber(topic) + stats := sStats[ti][si] + subsWG.Add(1) - stats := make([]streamStats, N) + go func(topicIndex int, subIndex int, subscriber routing.Subscriber, st *subStats) { + defer subsWG.Done() - var wg sync.WaitGroup - wg.Add(N + 1) + var expected uint64 = 0 - // Publisher: per-stream sender sequence in envelope payload. 
- go func() { - defer wg.Done() - - tick := time.NewTicker(1 * time.Millisecond) - defer tick.Stop() - - perTick := perStreamTarget / 1000 - rem := perStreamTarget % 1000 - remAcc := make([]int, N) - - seq := make([]uint64, N) - - for time.Now().Before(stopAt) { - <-tick.C - - for i := range N { - n := int(perTick) - remAcc[i] += rem - if remAcc[i] >= 1000 { - n++ - remAcc[i] -= 1000 - } - - for j := 0; j < n; j++ { - seq[i]++ - - env := data.Envelope{ - Payload: SeqPayload{Seq: seq[i]}, + for { + if ctx.Err() != nil { + return } - _ = senders[i].Send(ctx, env) - stats[i].sent++ + + var ( + env data.Envelope + ok bool + err error + ) + + if UseTryReceive { + env, ok, err = subscriber.TryReceive() + if err != nil { + st.errors.Add(1) + cancel() + return + } + if !ok { + runtime.Gosched() + continue + } + } else { + env, err = subscriber.Receive(ctx) + if err != nil { + // Context cancellation is normal at end of test. + if ctx.Err() != nil { + return + } + st.errors.Add(1) + cancel() + return + } + } + + seq, parseOK := parseSeq(env) + if !parseOK { + st.errors.Add(1) + cancel() + return + } + + if seq != expected { + // Out-of-order or gap detected. + st.errors.Add(1) + cancel() + return + } + + expected++ + st.received.Add(1) } - } + }(ti, si, sub, stats) } - }() + } - // Consumers: detect missed sender sequence numbers. - for i := range N { - idx := i - rx := receivers[i] + // Publishers: bursty pacing to approximate “average rate with bursts”. + var pubsWG sync.WaitGroup + for ti := 0; ti < NumTopics; ti++ { + pub := pubs[ti] + stats := tStats[ti] + pubsWG.Add(1) - go func() { - defer wg.Done() + go func(topicIndex int, p routing.Publisher, st *topicStats) { + defer pubsWG.Done() - for time.Now().Before(stopAt) { - env, ok, err := rx.TryReceive() - if err != nil { + // Maintain AvgRatePerTopic as an average by sleeping after bursts. 
+ // burstDuration = BurstSize / AvgRatePerTopic seconds + burstNs := int64(0) + if AvgRatePerTopic > 0 { + burstNs = int64(time.Second) * int64(BurstSize) / int64(AvgRatePerTopic) + } + if burstNs <= 0 { + burstNs = 1 + } + + var seq uint64 = 0 + + // Optional small jitter to avoid perfect lockstep across topics. + jitter := time.Duration(rand.Intn(200)) * time.Microsecond + + nextBurstAt := time.Now().Add(jitter) + + for { + if ctx.Err() != nil { return } - if !ok { - continue + + now := time.Now() + if now.Before(nextBurstAt) { + time.Sleep(nextBurstAt.Sub(now)) } - p, ok := env.Payload.(SeqPayload) - if !ok { - // If your Payload is pointer/interface-heavy, adjust accordingly. - continue - } + // Send BurstSize messages back-to-back. + sendTime := time.Now() + for i := 0; i < BurstSize; i++ { + if ctx.Err() != nil { + return + } - stats[idx].observed++ + env := data.Envelope{ + SendTime: sendTime, + Descriptor: data.Descriptor{Key: "SEQ"}, // keep your existing descriptor usage + Payload: formatSeq(seq), + // Any other fields you use can be set here. + } - if stats[idx].lastSeen == 0 { - stats[idx].lastSeen = p.Seq - continue + p.Publish(env) + seq++ } + st.published.Add(uint64(BurstSize)) - if p.Seq > stats[idx].lastSeen+1 { - stats[idx].missed += (p.Seq - stats[idx].lastSeen - 1) - } - stats[idx].lastSeen = p.Seq + // Schedule next burst to maintain average rate. + nextBurstAt = nextBurstAt.Add(time.Duration(burstNs)) } - }() + }(ti, pub, stats) } - wg.Wait() + // Wait for timeout, then stop and drain. + <-ctx.Done() - var totalSent, totalObs, totalMiss uint64 - minDrop, maxDrop := 100.0, 0.0 + // Ensure publishers exit. + pubsWG.Wait() - for i := range N { - totalSent += stats[i].sent - totalObs += stats[i].observed - totalMiss += stats[i].missed + // Subscribers may still be blocked; cancel again and wait. 
+ cancel() + subsWG.Wait() - den := stats[i].observed + stats[i].missed - dropPct := 0.0 - if den > 0 { - dropPct = 100.0 * float64(stats[i].missed) / float64(den) + totalTime := time.Since(start) + + // Report. + var totalPublished uint64 + var totalReceived uint64 + var totalErrors uint64 + + for ti := 0; ti < NumTopics; ti++ { + pub := tStats[ti].published.Load() + totalPublished += pub + + var topicRecv uint64 + var topicErr uint64 + for si := 0; si < SubsPerTopic; si++ { + topicRecv += sStats[ti][si].received.Load() + topicErr += sStats[ti][si].errors.Load() } - if dropPct < minDrop { - minDrop = dropPct - } - if dropPct > maxDrop { - maxDrop = dropPct + totalReceived += topicRecv + totalErrors += topicErr + + // Each subscriber should have received ~published for that topic. + avgPerSub := uint64(0) + if SubsPerTopic > 0 { + avgPerSub = topicRecv / uint64(SubsPerTopic) } - fmt.Printf("stream[%02d] sent=%6d observed=%6d missed=%6d lastSeen=%6d drop=%5.2f%%\n", - i, stats[i].sent, stats[i].observed, stats[i].missed, stats[i].lastSeen, dropPct) + fmt.Printf("Topic %s: published=%d | avg_received_per_sub=%d | sub_errors=%d\n", + topics[ti], pub, avgPerSub, topicErr) } - totalDen := totalObs + totalMiss - totalDrop := 0.0 - if totalDen > 0 { - totalDrop = 100.0 * float64(totalMiss) / float64(totalDen) - } + pubRate := float64(totalPublished) / totalTime.Seconds() + deliveriesRate := float64(totalReceived) / totalTime.Seconds() - fmt.Printf("\nTOTAL sent=%d observed=%d missed=%d drop=%.2f%% (min=%.2f%% max=%.2f%%)\n", - totalSent, totalObs, totalMiss, totalDrop, minDrop, maxDrop) + fmt.Printf("\nTotal Time: %v\n", totalTime) + fmt.Printf("Total Published: %d msgs\n", totalPublished) + fmt.Printf("Total Deliveries: %d (published * subs/topic, minus cancellations)\n", totalReceived) + fmt.Printf("Publish Rate: %.2f msgs/sec (aggregate)\n", pubRate) + fmt.Printf("Delivery Rate: %.2f deliveries/sec (aggregate)\n", deliveriesRate) + fmt.Printf("Validation Errors: %d\n", 
totalErrors) + + if totalErrors == 0 { + fmt.Printf("Result: PASS (in-order, gap-free until cancellation)\n") + } else { + fmt.Printf("Result: FAIL (see errors above; test cancels on first detected issue)\n") + } +} + +func modeName() string { + if UseTryReceive { + return "TryReceive (spin)" + } + return "Receive (blocking)" +} + +// formatSeq encodes the per-topic sequence into a string payload. +// This compiles whether Envelope.Payload is string or interface{} accepting string. +func formatSeq(seq uint64) string { + // Keep it cheap to parse: decimal only. + return strconv.FormatUint(seq, 10) +} + +func parseSeq(env data.Envelope) (uint64, bool) { + // If you later switch Payload to a structured type, change this accordingly. + s, ok := env.Payload.(string) + if !ok { + // If Payload is defined as string (not interface{}), remove this type assert and just use env.Payload. + // This branch is for interface{} payloads where non-string could appear. + return 0, false + } + + // Fast path: no extra fields. + // If you later include pubID:seq, you can parse with strings.Cut. + if strings.IndexByte(s, ':') >= 0 { + _, right, ok := strings.Cut(s, ":") + if !ok { + return 0, false + } + s = right + } + + v, err := strconv.ParseUint(s, 10, 64) + return v, err == nil } diff --git a/pkg/data/routing/broker.go b/pkg/data/routing/broker.go new file mode 100644 index 0000000..d8ce09b --- /dev/null +++ b/pkg/data/routing/broker.go @@ -0,0 +1,56 @@ +package routing + +import "sync" + +const DefaultRingCapacity = 1 << 8 // Best if power of 2 (or so I am told) + +// Broker manages topics and issues publisher/subscriber handles. +type Broker struct { + mu sync.RWMutex + topics map[string]*TopicRing +} + +func NewBroker() *Broker { + return &Broker{ + topics: make(map[string]*TopicRing), + } +} + +// getOrCreateRing handles the race condition where a subscriber might attach +// to a topic before the publisher has created it, and vice versa. 
+// This is because we allow either a publisher or a subscriber to 'create' the topic
+func (b *Broker) getOrCreateRing(topicKey string, requestedCap int) *TopicRing {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	ring, exists := b.topics[topicKey]
+	if !exists {
+		cap := requestedCap
+		if cap <= 0 {
+			cap = DefaultRingCapacity
+		}
+		ring = newTopicRing(cap)
+		b.topics[topicKey] = ring
+	}
+	return ring
+}
+
+// RegisterPublisher returns a fast-path Publisher.
+func (b *Broker) RegisterPublisher(topicKey string, capacity int) Publisher {
+	ring := b.getOrCreateRing(topicKey, capacity)
+	return &ringPublisher{
+		ring: ring,
+	}
+}
+
+// RegisterSubscriber attaches a consumer to a topic and returns a fast-path Subscriber.
+// We don't allow subscribers to specify a buffer capacity size.
+// As a general rule, a publisher takes precedence over a subscriber
+func (b *Broker) RegisterSubscriber(topicKey string) Subscriber {
+	ring := b.getOrCreateRing(topicKey, 0)
+	consumer := ring.addConsumer()
+	return &ringSubscriber{
+		ring: ring,
+		consumer: consumer,
+	}
+}
diff --git a/pkg/data/routing/inproc.go b/pkg/data/routing/inproc.go
deleted file mode 100644
index 704ea77..0000000
--- a/pkg/data/routing/inproc.go
+++ /dev/null
@@ -1,130 +0,0 @@
-package routing
-
-import (
-	"context"
-	"errors"
-	"sync"
-	"time"
-
-	"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
-)
-
-type InprocRouter struct {
-	mu sync.RWMutex
-	streams map[data.StreamID]*inprocStream
-}
-
-func NewInprocRouter() *InprocRouter {
-	return &InprocRouter{
-		streams: make(map[data.StreamID]*inprocStream),
-	}
-}
-
-func (r *InprocRouter) OpenStream(id data.StreamID) (data.Stream, error) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-
-	s := r.streams[id]
-	if s != nil {
-		return s, nil
-	}
-
-	s = newInprocStream(id)
-	r.streams[id] = s
-	return s, nil
-}
-
-type inprocStream struct {
-	id data.StreamID
-
-	seq uint64
-	latest data.Envelope
-
-	streamClosed bool
-	mu sync.RWMutex
-}
-
-func newInprocStream(id 
data.StreamID) *inprocStream { - return &inprocStream{ - id: id, - } -} - -func (s *inprocStream) ID() data.StreamID { - return s.id -} - -func (s *inprocStream) Sender() data.Sender { - return &inprocSender{stream: s} -} - -func (s *inprocStream) Receiver() data.Receiver { - s.mu.RLock() - cur := s.seq - s.mu.RUnlock() - - return &inprocReceiver{ - stream: s, - lastSeenSeq: cur, - } - -} - -type inprocSender struct { - stream *inprocStream -} - -func (tx *inprocSender) Send(ctx context.Context, env data.Envelope) error { - if err := ctx.Err(); err != nil { - return err - } - - s := tx.stream - s.mu.Lock() - defer s.mu.Unlock() - - if s.streamClosed { - return errors.New("stream closed") - } - - env.SendTime = time.Now().UTC() - - s.seq++ - s.latest = env - - return nil -} - -func (tx *inprocSender) SendBatch(ctx context.Context, envs []data.Envelope) error { - panic("unimplemented") -} - -type inprocReceiver struct { - stream *inprocStream - lastSeenSeq uint64 -} - -func (rx *inprocReceiver) TryReceive() (data.Envelope, bool, error) { - s := rx.stream - s.mu.RLock() - defer s.mu.RUnlock() - - if s.streamClosed { - return data.Envelope{}, false, errors.New("stream closed") - } - - if s.seq == 0 || s.seq == rx.lastSeenSeq { - return data.Envelope{}, false, nil - } - - rx.lastSeenSeq = s.seq - return s.latest, true, nil -} - -func (rx *inprocReceiver) ReceiveNext(ctx context.Context) (data.Envelope, error) { - panic("unimplemented") -} - -func (rx *inprocReceiver) Seq() uint64 { - return rx.lastSeenSeq -} diff --git a/pkg/data/routing/interfaces.go b/pkg/data/routing/interfaces.go new file mode 100644 index 0000000..eec2d28 --- /dev/null +++ b/pkg/data/routing/interfaces.go @@ -0,0 +1,25 @@ +package routing + +import ( + "context" + + "gitlab.michelsen.id/phillmichelsen/tessera/pkg/data" +) + +// Publisher is the write-side handle given to data sources. 
+type Publisher interface {
+	Publish(env data.Envelope)
+}
+
+// Subscriber is the read-side handle given to consumers (data sinks).
+type Subscriber interface {
+	// Receive blocks until a message is available or the context cancels.
+	// Best for general low-latency consumers that shouldn't burn CPU.
+	// Typically more than enough for most situations
+	Receive(ctx context.Context) (data.Envelope, error)
+
+	// TryReceive attempts to read one message lock-free.
+	// Returns (envelope, true, nil) if successful, or false if nothing is available.
+	// Polling TryReceive without a wait will most likely spike the CPU
+	TryReceive() (data.Envelope, bool, error)
+}
diff --git a/pkg/data/routing/router.go b/pkg/data/routing/router.go
new file mode 100644
index 0000000..484c100
--- /dev/null
+++ b/pkg/data/routing/router.go
@@ -0,0 +1,201 @@
+// router.go
+package routing
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"sync/atomic"
+
+	"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
+)
+
+var ErrDisconnected = errors.New("subscriber disconnected: failed to consume fast enough")
+
+// IMPLEMENTATIONS
+
+// Implements the Publisher interface
+type ringPublisher struct {
+	ring *TopicRing
+}
+
+func (p *ringPublisher) Publish(env data.Envelope) {
+	p.ring.publish(env)
+}
+
+// Implements the Subscriber interface
+type ringSubscriber struct {
+	ring *TopicRing
+	consumer *ringConsumer
+}
+
+func (s *ringSubscriber) Receive(ctx context.Context) (data.Envelope, error) {
+	return s.ring.receive(ctx, s.consumer)
+}
+
+func (s *ringSubscriber) TryReceive() (data.Envelope, bool, error) {
+	return s.ring.tryReceive(s.consumer)
+}
+
+// ringConsumer represents a single subscriber's read state within a TopicRing
+// The 56-byte pads are added to prevent 'False Sharing' due to 64-byte cache lines
+type ringConsumer struct {
+	ID uint64 // monotonically increasing identifier
+	_ [56]byte
+
+	Cursor atomic.Uint64 // next expected sequence number, advanced monotonically
+	_ [56]byte 
+
+	Dead atomic.Bool // set true if the consumer has fallen behind ring capacity, consumer should be disconnected
+	_ [56]byte
+
+	notify chan struct{} // size-1 wakeup channel for subscribers to block whilst waiting for new data
+}
+
+// TopicRing is a broadcast ring buffer for a topic
+// It is designed to minimize locks; the same 56-byte pads are used here as well
+// The publisher appends sequentially whilst each subscriber maintains its own cursor (ringConsumer)
+// Capacity is power-of-two sized so that index masking (seq & Mask) can stand in for modulo arithmetic
+type TopicRing struct {
+	Capacity uint64
+	Mask uint64
+	Ring []data.Envelope
+
+	_ [56]byte
+	writeTail atomic.Uint64
+
+	_ [56]byte
+	cachedMinConsumer uint64
+
+	consumers atomic.Pointer[[]*ringConsumer] // Copy-on-Write slice
+	mu sync.Mutex
+	nextSubID uint64
+}
+
+// newTopicRing creates a TopicRing
+// The requested capacity is rounded up to the next power of two
+func newTopicRing(pow2 int) *TopicRing {
+	cap := uint64(1)
+	for cap < uint64(pow2) {
+		cap <<= 1
+	}
+	t := &TopicRing{
+		Capacity: cap,
+		Mask: cap - 1,
+		Ring: make([]data.Envelope, cap),
+	}
+
+	empty := make([]*ringConsumer, 0)
+	t.consumers.Store(&empty)
+	return t
+}
+
+// addConsumer registers a new subscriber on the ring
+// The consumer starts at the current write tail
+func (t *TopicRing) addConsumer() *ringConsumer {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	t.nextSubID++
+	c := &ringConsumer{
+		ID: t.nextSubID,
+		notify: make(chan struct{}, 1),
+	}
+	// Start at the current write tail so we don't read historical data
+	c.Cursor.Store(t.writeTail.Load())
+
+	// Copy-on-write update
+	old := *t.consumers.Load()
+	newSubs := make([]*ringConsumer, len(old), len(old)+1)
+	copy(newSubs, old)
+	newSubs = append(newSubs, c)
+	t.consumers.Store(&newSubs)
+
+	return c
+}
+
+// publish appends one message to the ring and notifies subscribers (with the 'notify' channel)
+// Assumes a single publisher per topic
+func (t *TopicRing) 
publish(env data.Envelope) { + seq := t.writeTail.Load() // we expect only one publisher per topic + // in the case we do want more than one publisher, switch to using atomic.AddUint64 + + if seq-t.cachedMinConsumer >= t.Capacity { + t.enforceCapacity(seq) + } + + t.Ring[seq&t.Mask] = env + + t.writeTail.Store(seq + 1) + + subs := *t.consumers.Load() + for _, c := range subs { + select { + case c.notify <- struct{}{}: + default: + } + } +} + +// enforceCapacity 'evicts' consumers that have fallen beyond the ring capacity +func (t *TopicRing) enforceCapacity(targetSeq uint64) { + subs := *t.consumers.Load() + newMin := targetSeq + + for _, c := range subs { + if c.Dead.Load() { + continue + } + cCursor := c.Cursor.Load() + if targetSeq-cCursor >= t.Capacity { + c.Dead.Store(true) // Evict slow consumer + } else if cCursor < newMin { + newMin = cCursor + } + } + t.cachedMinConsumer = newMin +} + +// receive blocks until a new message is available, the consumer is evicted, or the context is cancelled +// Ordering is preserved per consumer (naturally) +func (t *TopicRing) receive(ctx context.Context, c *ringConsumer) (data.Envelope, error) { + for { + if c.Dead.Load() { + return data.Envelope{}, ErrDisconnected + } + + currentCursor := c.Cursor.Load() + availableTail := t.writeTail.Load() + + if currentCursor < availableTail { + env := t.Ring[currentCursor&t.Mask] + c.Cursor.Store(currentCursor + 1) + return env, nil + } + + select { + case <-ctx.Done(): + return data.Envelope{}, ctx.Err() + case <-c.notify: + } + } +} + +// tryReceive is a non-blocking variant of receive +// Returns immediately if no new data is available +func (t *TopicRing) tryReceive(c *ringConsumer) (data.Envelope, bool, error) { + if c.Dead.Load() { + return data.Envelope{}, false, ErrDisconnected + } + + currentCursor := c.Cursor.Load() + availableTail := t.writeTail.Load() + + if currentCursor >= availableTail { + return data.Envelope{}, false, nil + } + + env := t.Ring[currentCursor&t.Mask] + 
c.Cursor.Store(currentCursor + 1) + return env, true, nil +}