Add a performant in-process message broker to the data package's routing subpackage

This commit is contained in:
2026-02-25 02:35:52 +08:00
parent b3841f5647
commit 28d04e04e1
5 changed files with 540 additions and 247 deletions

View File

@@ -0,0 +1,56 @@
package routing
import "sync"
// DefaultRingCapacity is the ring size used when a topic is created without an
// explicit capacity (e.g. by a subscriber attaching first). Kept a power of
// two so the ring can wrap indices with a bitmask (Mask) instead of a modulo.
const DefaultRingCapacity = 1 << 8
// Broker manages topics and issues publisher/subscriber handles.
// mu guards the topics map only; each TopicRing performs its own
// synchronization internally.
type Broker struct {
	mu     sync.RWMutex
	topics map[string]*TopicRing
}
// NewBroker returns a Broker with no topics registered yet.
func NewBroker() *Broker {
	b := &Broker{topics: map[string]*TopicRing{}}
	return b
}
// getOrCreateRing returns the ring for topicKey, creating it on first use.
// This handles the race where a subscriber attaches to a topic before the
// publisher has created it (and vice versa): either side may 'create' the
// topic. A requestedCap <= 0 falls back to DefaultRingCapacity.
func (b *Broker) getOrCreateRing(topicKey string, requestedCap int) *TopicRing {
	// Fast path: topics usually already exist, so try under the read lock
	// first and avoid serializing every registration on the write lock.
	b.mu.RLock()
	ring, exists := b.topics[topicKey]
	b.mu.RUnlock()
	if exists {
		return ring
	}

	b.mu.Lock()
	defer b.mu.Unlock()
	// Re-check under the write lock: another goroutine may have created the
	// topic between the RUnlock above and the Lock here.
	if ring, exists = b.topics[topicKey]; exists {
		return ring
	}
	size := requestedCap
	if size <= 0 {
		size = DefaultRingCapacity
	}
	ring = newTopicRing(size)
	b.topics[topicKey] = ring
	return ring
}
// RegisterPublisher returns a fast-path Publisher for topicKey, creating the
// topic ring (sized by capacity, or the default when capacity <= 0) if it
// does not exist yet.
func (b *Broker) RegisterPublisher(topicKey string, capacity int) Publisher {
	return &ringPublisher{ring: b.getOrCreateRing(topicKey, capacity)}
}
// RegisterSubscriber attaches a new consumer to topicKey and returns a
// fast-path Subscriber. Subscribers cannot specify a buffer capacity: as a
// general rule a publisher takes precedence over a subscriber, so a
// subscriber-created ring simply uses the default size.
func (b *Broker) RegisterSubscriber(topicKey string) Subscriber {
	ring := b.getOrCreateRing(topicKey, 0)
	return &ringSubscriber{ring: ring, consumer: ring.addConsumer()}
}

View File

@@ -1,130 +0,0 @@
package routing
import (
"context"
"errors"
"sync"
"time"
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
)
// InprocRouter multiplexes in-process streams by StreamID.
// mu guards the streams map; each inprocStream carries its own lock.
type InprocRouter struct {
	mu      sync.RWMutex
	streams map[data.StreamID]*inprocStream
}
// NewInprocRouter returns an in-process router with no streams open yet.
func NewInprocRouter() *InprocRouter {
	router := &InprocRouter{streams: map[data.StreamID]*inprocStream{}}
	return router
}
// OpenStream returns the stream registered under id, creating and registering
// a new one on first use. It never returns a non-nil error.
func (r *InprocRouter) OpenStream(id data.StreamID) (data.Stream, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if existing, ok := r.streams[id]; ok {
		return existing, nil
	}
	created := newInprocStream(id)
	r.streams[id] = created
	return created, nil
}
// inprocStream is a single-slot, latest-value stream: each send overwrites
// `latest` and bumps `seq`; receivers detect new data by comparing seq
// against the last sequence number they observed.
type inprocStream struct {
	id           data.StreamID
	seq          uint64        // count of envelopes sent; 0 means nothing sent yet
	latest       data.Envelope // most recently sent envelope (earlier ones are dropped)
	streamClosed bool
	mu           sync.RWMutex // guards seq, latest, and streamClosed
}
// newInprocStream builds a stream with the given ID; all other fields keep
// their zero values (nothing sent, not closed).
func newInprocStream(id data.StreamID) *inprocStream {
	stream := new(inprocStream)
	stream.id = id
	return stream
}
// ID returns the stream's identifier.
func (s *inprocStream) ID() data.StreamID {
	return s.id
}
// Sender returns a write handle bound to this stream.
func (s *inprocStream) Sender() data.Sender {
	return &inprocSender{stream: s}
}
// Receiver returns a read handle positioned at the current sequence number,
// so it only observes envelopes sent after this call.
func (s *inprocStream) Receiver() data.Receiver {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return &inprocReceiver{stream: s, lastSeenSeq: s.seq}
}
// inprocSender is the write side of an inprocStream.
type inprocSender struct {
	stream *inprocStream
}
// Send stamps env with the current UTC time, stores it as the stream's latest
// envelope, and bumps the sequence counter. It returns the context's error if
// ctx is already cancelled, or an error if the stream has been closed.
func (tx *inprocSender) Send(ctx context.Context, env data.Envelope) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	stream := tx.stream
	stream.mu.Lock()
	defer stream.mu.Unlock()
	if stream.streamClosed {
		return errors.New("stream closed")
	}
	// Overwrite semantics: only the newest envelope is retained.
	env.SendTime = time.Now().UTC()
	stream.seq++
	stream.latest = env
	return nil
}
// SendBatch publishes each envelope in order by delegating to Send, stopping
// at the first error (context cancellation or closed stream).
// Because the stream retains only the latest envelope, a receiver polling
// after the batch completes observes only its final envelope.
func (tx *inprocSender) SendBatch(ctx context.Context, envs []data.Envelope) error {
	for _, env := range envs {
		if err := tx.Send(ctx, env); err != nil {
			return err
		}
	}
	return nil
}
// inprocReceiver is the read side of an inprocStream. It is not safe for use
// by multiple goroutines at once: lastSeenSeq is mutated under the stream's
// read lock, which permits concurrent readers.
type inprocReceiver struct {
	stream      *inprocStream
	lastSeenSeq uint64 // sequence number of the last envelope observed
}
// TryReceive returns the latest envelope if one has arrived since the last
// call, without blocking. It reports an error once the stream is closed.
func (rx *inprocReceiver) TryReceive() (data.Envelope, bool, error) {
	stream := rx.stream
	stream.mu.RLock()
	defer stream.mu.RUnlock()
	switch {
	case stream.streamClosed:
		return data.Envelope{}, false, errors.New("stream closed")
	case stream.seq == 0, stream.seq == rx.lastSeenSeq:
		// Nothing ever sent, or nothing new since our last observation.
		return data.Envelope{}, false, nil
	default:
		rx.lastSeenSeq = stream.seq
		return stream.latest, true, nil
	}
}
// ReceiveNext is not implemented for the in-process receiver; callers should
// poll via TryReceive instead.
func (rx *inprocReceiver) ReceiveNext(ctx context.Context) (data.Envelope, error) {
	panic("unimplemented")
}
// Seq returns the sequence number of the last envelope this receiver observed.
func (rx *inprocReceiver) Seq() uint64 {
	return rx.lastSeenSeq
}

View File

@@ -0,0 +1,25 @@
package routing
import (
"context"
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
)
// Publisher is the write-side handle given to data sources.
// Publish never blocks on slow subscribers: consumers that fall a full ring
// capacity behind are evicted rather than exerting backpressure.
type Publisher interface {
	Publish(env data.Envelope)
}
// Subscriber is the read-side handle given to consumers (data sinks).
type Subscriber interface {
	// Receive blocks until a message is available or the context cancels.
	// Best for general low-latency consumers that shouldn't burn CPU;
	// typically more than enough for most situations.
	Receive(ctx context.Context) (data.Envelope, error)
	// TryReceive attempts to read one message without blocking or locking.
	// It returns (envelope, true, nil) on success, or false if nothing is
	// available. Polling TryReceive in a tight loop without any wait will
	// most likely spike the CPU.
	TryReceive() (data.Envelope, bool, error)
}

201
pkg/data/routing/router.go Normal file
View File

@@ -0,0 +1,201 @@
// router.go
package routing
import (
"context"
"errors"
"sync"
"sync/atomic"
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
)
// ErrDisconnected is returned to a subscriber whose consumer was evicted from
// its topic ring because it fell at least a full ring capacity behind the
// publisher.
var ErrDisconnected = errors.New("subscriber disconnected: failed to consume fast enough")
// IMPLEMENTATIONS

// ringPublisher adapts the write side of a TopicRing to the Publisher
// interface.
type ringPublisher struct {
	ring *TopicRing
}

// Publish forwards env to the underlying topic ring.
func (pub *ringPublisher) Publish(env data.Envelope) {
	pub.ring.publish(env)
}
// ringSubscriber pairs a TopicRing with one subscriber's read state,
// implementing the Subscriber interface.
type ringSubscriber struct {
	ring     *TopicRing
	consumer *ringConsumer
}

// Receive blocks for the next envelope (see TopicRing.receive).
func (sub *ringSubscriber) Receive(ctx context.Context) (data.Envelope, error) {
	return sub.ring.receive(ctx, sub.consumer)
}

// TryReceive polls for the next envelope without blocking (see
// TopicRing.tryReceive).
func (sub *ringSubscriber) TryReceive() (data.Envelope, bool, error) {
	return sub.ring.tryReceive(sub.consumer)
}
// ringConsumer represents a single subscriber's read state within a TopicRing.
// The 56-byte pads place each 8-byte hot field on its own 64-byte cache line
// to prevent false sharing between publisher and subscriber goroutines.
type ringConsumer struct {
	ID uint64 // monotonically increasing identifier
	_  [56]byte
	Cursor atomic.Uint64 // next expected sequence number, advanced monotonically
	_      [56]byte
	Dead atomic.Bool // set true once the consumer has fallen behind ring capacity; the consumer should disconnect
	_    [56]byte
	notify chan struct{} // size-1 wakeup channel subscribers block on while waiting for new data
}
// TopicRing is a broadcast ring buffer for a topic.
// It is designed to minimize locking: the single publisher appends
// sequentially whilst each subscriber advances its own cursor (ringConsumer).
// Capacity is always a power of two so index wrapping can use a cheap
// bitwise AND against Mask instead of a modulo; the same 56-byte cache-line
// pads as ringConsumer are used here.
type TopicRing struct {
	Capacity uint64
	Mask     uint64 // Capacity - 1; valid because Capacity is a power of two
	Ring     []data.Envelope
	_        [56]byte
	writeTail atomic.Uint64 // next sequence number the publisher will write
	_         [56]byte
	cachedMinConsumer uint64                          // publisher-local lower bound on live consumer cursors
	consumers         atomic.Pointer[[]*ringConsumer] // Copy-on-Write slice
	mu                sync.Mutex                      // guards consumer registration (nextSubID + COW swap)
	nextSubID         uint64
}
// newTopicRing creates a TopicRing whose capacity is the requested size
// rounded up to the next power of two (minimum 1), so index wrapping can use
// Mask instead of a modulo. The argument is the desired capacity itself, not
// an exponent. Non-positive requests are clamped to 1; callers wanting a
// sensible default should pass DefaultRingCapacity.
func newTopicRing(requested int) *TopicRing {
	if requested < 1 {
		// Guard: converting a negative value to uint64 would wrap to a huge
		// number and the doubling loop below would never terminate.
		requested = 1
	}
	size := uint64(1)
	for size < uint64(requested) {
		size <<= 1
	}
	t := &TopicRing{
		Capacity: size,
		Mask:     size - 1,
		Ring:     make([]data.Envelope, size),
	}
	// Store an empty (non-nil) consumer list so readers can always
	// dereference the pointer without a nil check.
	empty := make([]*ringConsumer, 0)
	t.consumers.Store(&empty)
	return t
}
// addConsumer registers a new subscriber on the ring and returns its state.
// The consumer's cursor starts at the current write tail, so it never replays
// historical data. Registration takes the ring mutex, but readers of the
// consumer list (publish/enforceCapacity) stay lock-free thanks to the
// copy-on-write swap of the consumers slice.
func (t *TopicRing) addConsumer() *ringConsumer {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.nextSubID++
	c := &ringConsumer{
		ID:     t.nextSubID,
		notify: make(chan struct{}, 1),
	}
	// Start at the current write tail so we don't read historical data
	c.Cursor.Store(t.writeTail.Load())
	// Copy-on-write update: build a fresh slice, then atomically swap the
	// pointer so concurrent readers never see a partially built list.
	old := *t.consumers.Load()
	newSubs := make([]*ringConsumer, len(old), len(old)+1)
	copy(newSubs, old)
	newSubs = append(newSubs, c)
	t.consumers.Store(&newSubs)
	return c
}
// publish appends one message to the ring and wakes subscribers via their
// size-1 notify channels. Assumes a single publisher per topic: writeTail is
// read and re-stored without an atomic read-modify-write.
func (t *TopicRing) publish(env data.Envelope) {
	seq := t.writeTail.Load() // we expect only one publisher per topic
	// in the case we do want more than one publisher, switch to using atomic.AddUint64
	// Only rescan consumers (and possibly evict slow ones) when the writer is
	// about to lap the cached slowest live cursor.
	if seq-t.cachedMinConsumer >= t.Capacity {
		t.enforceCapacity(seq)
	}
	// Write the slot first, then advance the tail: readers load writeTail
	// before touching Ring, so the slot is fully written once the tail moves.
	t.Ring[seq&t.Mask] = env
	t.writeTail.Store(seq + 1)
	// Best-effort wakeup: a full notify channel means the subscriber already
	// has a pending wakeup, so dropping the extra signal is harmless.
	subs := *t.consumers.Load()
	for _, c := range subs {
		select {
		case c.notify <- struct{}{}:
		default:
		}
	}
}
// enforceCapacity 'evicts' (marks Dead) consumers whose cursors have fallen a
// full ring capacity or more behind targetSeq, and refreshes the publisher's
// cached minimum cursor over the surviving consumers. Called only from
// publish, so it runs on the single publisher goroutine.
func (t *TopicRing) enforceCapacity(targetSeq uint64) {
	subs := *t.consumers.Load()
	newMin := targetSeq
	for _, c := range subs {
		if c.Dead.Load() {
			continue
		}
		cCursor := c.Cursor.Load()
		if targetSeq-cCursor >= t.Capacity {
			c.Dead.Store(true) // Evict slow consumer
		} else if cCursor < newMin {
			newMin = cCursor
		}
	}
	t.cachedMinConsumer = newMin
}
// receive blocks until a new message is available, the consumer is evicted,
// or the context is cancelled. Ordering is preserved per consumer: the cursor
// advances by exactly one past each returned envelope.
func (t *TopicRing) receive(ctx context.Context, c *ringConsumer) (data.Envelope, error) {
	for {
		if c.Dead.Load() {
			return data.Envelope{}, ErrDisconnected
		}
		currentCursor := c.Cursor.Load()
		availableTail := t.writeTail.Load()
		if currentCursor < availableTail {
			// NOTE(review): if the publisher laps this consumer between the
			// eviction check and this read, the slot may already have been
			// overwritten — presumably an accepted trade-off of the eviction
			// design; confirm.
			env := t.Ring[currentCursor&t.Mask]
			c.Cursor.Store(currentCursor + 1)
			return env, nil
		}
		// Nothing available: park on the notify channel until the publisher
		// signals (or the context cancels), then loop to re-check Dead/tail.
		select {
		case <-ctx.Done():
			return data.Envelope{}, ctx.Err()
		case <-c.notify:
		}
	}
}
// tryReceive is the non-blocking variant of receive.
// It returns (zero, false, nil) immediately when no new data is available,
// or ErrDisconnected once the consumer has been evicted for falling behind.
func (t *TopicRing) tryReceive(c *ringConsumer) (data.Envelope, bool, error) {
	if c.Dead.Load() {
		return data.Envelope{}, false, ErrDisconnected
	}
	currentCursor := c.Cursor.Load()
	availableTail := t.writeTail.Load()
	if currentCursor >= availableTail {
		// Caught up: nothing published beyond our cursor yet.
		return data.Envelope{}, false, nil
	}
	env := t.Ring[currentCursor&t.Mask]
	c.Cursor.Store(currentCursor + 1)
	return env, true, nil
}