2 Commits

12 changed files with 654 additions and 71 deletions

View File

@@ -1,3 +1,314 @@
package main package main
func main() {} import (
"context"
"fmt"
"math/rand"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data/routing"
)
/*
Realistic-ish market-data style test.
Model:
- 1 publisher per topic (instrument / feed partition)
- Each message carries a strictly increasing sequence number (per topic)
- Subscribers validate in-order, gap-free delivery
- Publishers send with bursty pacing to approximate L3-ish behavior:
send BurstSize messages back-to-back, then sleep to maintain AvgRate.
Defaults are intentionally moderate. Increase topics/rates to stress.
*/
// Tunable parameters of the routing stress test.
const (
	// NumTopics is the number of topics (instruments / partitions); one
	// publisher is started per topic.
	NumTopics = 8
	// SubsPerTopic is the number of subscribers attached to each topic (fan-out).
	SubsPerTopic = 6
	// RingCapacity is the ring size requested when registering each publisher.
	RingCapacity = 1 << 14
	// TestDuration is the timeout of the test context and so bounds the run.
	TestDuration = 60 * time.Second
	// AvgRatePerTopic is the target average publish rate, in messages per
	// second per topic.
	AvgRatePerTopic = 500_000
	// BurstSize is the number of messages sent back-to-back before the
	// publisher sleeps to preserve the average rate.
	BurstSize = 512
	// UseTryReceive selects the subscriber mode: if true, subscribers
	// spin-poll TryReceive; if false, they use the blocking Receive.
	UseTryReceive = false
)
// topicStats holds the publisher-side counter for one topic.
type topicStats struct {
	published atomic.Uint64 // messages published on the topic
}
// subStats holds the counters of a single subscriber.
type subStats struct {
	received atomic.Uint64 // envelopes received carrying the expected sequence number
	errors   atomic.Uint64 // receive, parse, or sequence errors; a subscriber stops after its first
}
// main runs the market-data routing stress test and prints a summary.
//
// It registers one publisher per topic and SubsPerTopic subscribers per topic
// on a single broker. Publishers send sequence-numbered envelopes in bursts;
// every subscriber checks that sequence numbers arrive as 0, 1, 2, ... and
// cancels the shared context on the first error. The run ends when the
// TestDuration timeout fires or a subscriber cancels, after which per-topic
// and aggregate counters are printed together with a PASS/FAIL line.
func main() {
	fmt.Printf("Market-Data Routing Test\n")
	fmt.Printf("Topics: %d | Subs/Topic: %d | Duration: %v\n", NumTopics, SubsPerTopic, TestDuration)
	fmt.Printf("AvgRate/Topic: %d msg/s | BurstSize: %d | Mode: %s\n\n",
		AvgRatePerTopic, BurstSize, modeName())

	broker := routing.NewBroker()

	topics := make([]string, NumTopics)
	for i := 0; i < NumTopics; i++ {
		topics[i] = fmt.Sprintf("FUT_L3_%02d", i)
	}

	// Create publishers first so the rings are created with RingCapacity.
	pubs := make([]routing.Publisher, NumTopics)
	for i := 0; i < NumTopics; i++ {
		pubs[i] = broker.RegisterPublisher(topics[i], RingCapacity)
	}

	// Per-topic published counters (ground truth).
	tStats := make([]*topicStats, NumTopics)
	for i := range tStats {
		tStats[i] = &topicStats{}
	}

	// Per-subscriber counters, indexed [topic][sub].
	var subsWG sync.WaitGroup
	sStats := make([][]*subStats, NumTopics)
	for ti := 0; ti < NumTopics; ti++ {
		sStats[ti] = make([]*subStats, SubsPerTopic)
		for si := 0; si < SubsPerTopic; si++ {
			sStats[ti][si] = &subStats{}
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), TestDuration)
	defer cancel()

	start := time.Now()

	// Subscribers: register before any publisher goroutine starts, then
	// validate in-order, gap-free delivery.
	for ti := 0; ti < NumTopics; ti++ {
		topic := topics[ti]
		for si := 0; si < SubsPerTopic; si++ {
			sub := broker.RegisterSubscriber(topic)
			stats := sStats[ti][si]
			subsWG.Add(1)
			go func(topicIndex int, subIndex int, subscriber routing.Subscriber, st *subStats) {
				defer subsWG.Done()
				var expected uint64
				for {
					if ctx.Err() != nil {
						return
					}
					var (
						env data.Envelope
						ok  bool
						err error
					)
					if UseTryReceive {
						env, ok, err = subscriber.TryReceive()
						if err != nil {
							st.errors.Add(1)
							cancel()
							return
						}
						if !ok {
							// Nothing available yet: yield and poll again.
							runtime.Gosched()
							continue
						}
					} else {
						env, err = subscriber.Receive(ctx)
						if err != nil {
							// Context cancellation is normal at end of test.
							if ctx.Err() != nil {
								return
							}
							st.errors.Add(1)
							cancel()
							return
						}
					}
					seq, parseOK := parseSeq(env)
					if !parseOK {
						st.errors.Add(1)
						cancel()
						return
					}
					if seq != expected {
						// Out-of-order or gap detected.
						st.errors.Add(1)
						cancel()
						return
					}
					expected++
					st.received.Add(1)
				}
			}(ti, si, sub, stats)
		}
	}

	// Publishers: bursty pacing to approximate "average rate with bursts".
	var pubsWG sync.WaitGroup
	for ti := 0; ti < NumTopics; ti++ {
		pub := pubs[ti]
		stats := tStats[ti]
		pubsWG.Add(1)
		go func(topicIndex int, p routing.Publisher, st *topicStats) {
			defer pubsWG.Done()
			// Maintain AvgRatePerTopic as an average by sleeping after bursts.
			// burstDuration = BurstSize / AvgRatePerTopic seconds
			burstNs := int64(0)
			if AvgRatePerTopic > 0 {
				burstNs = int64(time.Second) * int64(BurstSize) / int64(AvgRatePerTopic)
			}
			if burstNs <= 0 {
				burstNs = 1
			}
			var seq uint64
			// Optional small jitter to avoid perfect lockstep across topics.
			jitter := time.Duration(rand.Intn(200)) * time.Microsecond
			nextBurstAt := time.Now().Add(jitter)
			for {
				if ctx.Err() != nil {
					return
				}
				now := time.Now()
				if now.Before(nextBurstAt) {
					time.Sleep(nextBurstAt.Sub(now))
				}
				// Send BurstSize messages back-to-back.
				sendTime := time.Now()
				for i := 0; i < BurstSize; i++ {
					if ctx.Err() != nil {
						// Account for the messages already sent in this
						// partial burst so the published counter stays the
						// ground truth that deliveries are compared against.
						st.published.Add(uint64(i))
						return
					}
					env := data.Envelope{
						SendTime:   sendTime,
						Descriptor: data.Descriptor{Key: "SEQ"}, // keep your existing descriptor usage
						Payload:    formatSeq(seq),
						// Any other fields you use can be set here.
					}
					p.Publish(env)
					seq++
				}
				st.published.Add(uint64(BurstSize))
				// Schedule the next burst relative to the previous deadline
				// (not to "now") so the average rate does not drift.
				nextBurstAt = nextBurstAt.Add(time.Duration(burstNs))
			}
		}(ti, pub, stats)
	}

	// Wait for the timeout (or an early cancel from a subscriber).
	<-ctx.Done()
	// Ensure publishers exit.
	pubsWG.Wait()
	// Subscribers may still be blocked; cancel again and wait.
	cancel()
	subsWG.Wait()
	totalTime := time.Since(start)

	// Report.
	var totalPublished uint64
	var totalReceived uint64
	var totalErrors uint64
	for ti := 0; ti < NumTopics; ti++ {
		pub := tStats[ti].published.Load()
		totalPublished += pub
		var topicRecv uint64
		var topicErr uint64
		for si := 0; si < SubsPerTopic; si++ {
			topicRecv += sStats[ti][si].received.Load()
			topicErr += sStats[ti][si].errors.Load()
		}
		totalReceived += topicRecv
		totalErrors += topicErr
		// Each subscriber should have received ~published for that topic.
		avgPerSub := uint64(0)
		if SubsPerTopic > 0 {
			avgPerSub = topicRecv / uint64(SubsPerTopic)
		}
		fmt.Printf("Topic %s: published=%d | avg_received_per_sub=%d | sub_errors=%d\n",
			topics[ti], pub, avgPerSub, topicErr)
	}
	pubRate := float64(totalPublished) / totalTime.Seconds()
	deliveriesRate := float64(totalReceived) / totalTime.Seconds()
	fmt.Printf("\nTotal Time: %v\n", totalTime)
	fmt.Printf("Total Published: %d msgs\n", totalPublished)
	fmt.Printf("Total Deliveries: %d (published * subs/topic, minus cancellations)\n", totalReceived)
	fmt.Printf("Publish Rate: %.2f msgs/sec (aggregate)\n", pubRate)
	fmt.Printf("Delivery Rate: %.2f deliveries/sec (aggregate)\n", deliveriesRate)
	fmt.Printf("Validation Errors: %d\n", totalErrors)
	if totalErrors == 0 {
		fmt.Printf("Result: PASS (in-order, gap-free until cancellation)\n")
	} else {
		fmt.Printf("Result: FAIL (see errors above; test cancels on first detected issue)\n")
	}
}
// modeName returns the human-readable label of the receive mode selected by
// UseTryReceive, as printed in the test banner.
func modeName() string {
	label := "Receive (blocking)"
	if UseTryReceive {
		label = "TryReceive (spin)"
	}
	return label
}
// formatSeq renders a per-topic sequence number as the string payload of an
// envelope. Plain base-10 digits are used so that parsing stays cheap.
func formatSeq(seq uint64) string {
	const decimal = 10
	return strconv.FormatUint(seq, decimal)
}
// parseSeq extracts the sequence number carried in an envelope payload.
//
// The payload must be a string holding either a bare decimal number ("42") or
// a "prefix:number" form (for example a future "pubID:seq"), in which case
// everything up to and including the first ':' is discarded before parsing.
//
// The boolean result is false when the payload is not a string or the
// remaining text is not a valid base-10 uint64; the numeric result is not
// meaningful in that case.
func parseSeq(env data.Envelope) (uint64, bool) {
	// If Payload is later declared as string (not an interface), drop this
	// type assertion and use env.Payload directly.
	s, ok := env.Payload.(string)
	if !ok {
		return 0, false
	}
	// strings.Cut reports whether the separator was present, so a separate
	// search for ':' beforehand is unnecessary.
	if _, right, found := strings.Cut(s, ":"); found {
		s = right
	}
	v, err := strconv.ParseUint(s, 10, 64)
	return v, err == nil
}

33
pkg/data/component.go Normal file
View File

@@ -0,0 +1,33 @@
package data
import "context"
// Component is the lifecycle and configuration contract of a pluggable unit.
//
// NOTE(review): no implementation is visible in this file; the per-method
// notes are inferred from the signatures and should be confirmed.
type Component interface {
	// Start begins operation; presumably ctx bounds the start-up.
	Start(ctx context.Context) error
	// Stop ends operation; presumably ctx bounds the shutdown.
	Stop(ctx context.Context) error
	// ValidateConfig inspects cfg and returns a ComponentConfigValidation
	// describing the outcome, or an error.
	ValidateConfig(cfg ComponentConfig) (ComponentConfigValidation, error)
	// ApplyConfig takes cfg and reports failure through the returned error.
	ApplyConfig(cfg ComponentConfig) error
	// Status returns the component's current ComponentStatus.
	Status() ComponentStatus
}
// ComponentRuntime is a set of services for opening streams and reporting
// errors.
//
// NOTE(review): presumably this is what the host hands to a Component; the
// wiring is not visible in this file — confirm.
type ComponentRuntime interface {
	// OpenStream returns the Stream identified by id, or an error.
	OpenStream(id StreamID) (Stream, error)
	// ReportError reports err; fatal flags whether the error is fatal.
	ReportError(fatal bool, err error)
}
// ComponentStatus is the snapshot returned by Component.Status.
//
// NOTE(review): field meanings below are inferred from the names — confirm.
type ComponentStatus struct {
	Active   bool  // presumably true while the component is running
	Patching bool  // presumably true while a configuration change is being applied
	Config   any   // presumably the configuration currently in effect
	Error    error // presumably the most recent error, nil if none
}
// ComponentConfig is an opaque, component-specific configuration value; its
// concrete type is not constrained here.
type ComponentConfig any
// ComponentConfigValidation is the result type of Component.ValidateConfig.
//
// NOTE(review): field meanings below are inferred from the names — confirm.
type ComponentConfigValidation struct {
	Valid           bool     // presumably whether the configuration is acceptable
	RequiresRestart bool     // presumably whether applying it needs a restart
	Warnings        []string // presumably non-fatal findings
}

View File

@@ -1,2 +0,0 @@
// Package coordinator ...
package coordinator

View File

@@ -1,18 +0,0 @@
package data
import (
"context"
"github.com/google/uuid"
)
// Egress is the contract of a named, configurable output connection.
//
// NOTE(review): no implementation is visible; method notes are inferred from
// the signatures only.
type Egress interface {
	// Name returns a string; presumably an identifier for the egress.
	Name() string
	// Configure takes a free-form key/value configuration.
	Configure(cfg map[string]any) error
	// Connect receives the EgresActions it may use; presumably ctx bounds the
	// connection attempt.
	Connect(ctx context.Context, actions EgresActions) error
	// Disconnect is the counterpart of Connect.
	Disconnect(ctx context.Context) error
}
// EgresActions is the set of operations passed to Egress.Connect.
//
// NOTE(review): the name looks like a misspelling of "EgressActions"; it is
// referenced elsewhere under this spelling, so it is left unchanged.
type EgresActions interface {
	// Subscribe returns a receive-only channel of Envelope values for the given
	// namespace and id, plus a func() — presumably a cancel/unsubscribe hook —
	// and an error.
	Subscribe(ctx context.Context, namespace string, id uuid.UUID) (<-chan Envelope, func(), error)
}

View File

@@ -1,34 +0,0 @@
package data
import (
"context"
"time"
"github.com/google/uuid"
)
// Ingress is the contract of a named, configurable input connection.
//
// NOTE(review): no implementation is visible; method notes are inferred from
// the signatures only.
type Ingress interface {
	// Name returns a string; presumably an identifier for the ingress.
	Name() string
	// Configure takes a free-form key/value configuration.
	Configure(cfg map[string]any) error
	// Connect receives the IngressActions it may use; presumably ctx bounds
	// the connection attempt.
	Connect(ctx context.Context, actions IngressActions) error
	// Disconnect is the counterpart of Connect.
	Disconnect(ctx context.Context) error
}
// IngressActions is the set of operations passed to Ingress.Connect.
type IngressActions interface {
	// Emit submits one envelope addressed by namespace and id.
	Emit(namespace string, id uuid.UUID, envelope Envelope) error
	// EmitBatch submits several envelopes addressed by namespace and id.
	EmitBatch(namespace string, id uuid.UUID, envelopes []Envelope) error
}
// LiveIngress describes key-based subscription management.
//
// NOTE(review): presumably implemented by ingresses that deliver live data;
// method notes are inferred from the signatures only.
type LiveIngress interface {
	// Subscribe takes one or more keys.
	Subscribe(ctx context.Context, key ...string) error
	// Unsubscribe takes one or more keys.
	Unsubscribe(ctx context.Context, key ...string) error
	// ListAvailableKeys returns a list of keys.
	ListAvailableKeys(ctx context.Context) []string
	// IsValidKey reports a yes/no answer for a single key.
	IsValidKey(ctx context.Context, key string) bool
}
// HistoricalIngress describes time-ranged retrieval of envelopes by key.
//
// NOTE(review): method notes are inferred from the signatures only.
type HistoricalIngress interface {
	// Fetch returns envelopes for key given a start and end time; whether the
	// bounds are inclusive is not visible here.
	Fetch(ctx context.Context, key string, start time.Time, end time.Time) ([]Envelope, error)
	// ListAvailableKeys returns a list of keys.
	ListAvailableKeys(ctx context.Context) []string
	// IsValidKey reports a yes/no answer for a single key.
	IsValidKey(ctx context.Context, key string) bool
	// TODO: add a method that reports the available time period for a key.
}

View File

@@ -1,14 +0,0 @@
package data
import "context"
// Processor is the contract of a named, configurable processing stage.
//
// NOTE(review): no implementation is visible; method notes are inferred from
// the signatures only.
type Processor interface {
	// Name returns a string; presumably an identifier for the processor.
	Name() string
	// Configure takes a free-form key/value configuration.
	Configure(cfg map[string]any) error
	// Run receives the ProcessorActions it may use; presumably it runs until
	// ctx is done or an error occurs.
	Run(ctx context.Context, actions ProcessorActions) error
}
// ProcessorActions is the union of IngressActions and EgresActions, so a
// holder can both emit and subscribe.
type ProcessorActions interface {
	IngressActions
	EgresActions
}

View File

@@ -1,2 +0,0 @@
// Package router ...
package router

View File

@@ -0,0 +1,56 @@
package routing
import "sync"
// DefaultRingCapacity is the ring size used when a topic is created with a
// non-positive requested capacity (for example by a subscriber). It is a power
// of two because the ring maps a sequence number to a slot with a bit mask
// (seq & Mask); newTopicRing rounds other values up to the next power of two.
const DefaultRingCapacity = 1 << 8
// Broker manages topics and issues publisher/subscriber handles.
// Each topic key maps to one TopicRing, created on first use.
type Broker struct {
	mu     sync.RWMutex          // guards topics; only the exclusive lock is taken in this file
	topics map[string]*TopicRing // topic key -> ring
}
// NewBroker returns an empty Broker that is ready to register publishers and
// subscribers.
func NewBroker() *Broker {
	b := new(Broker)
	b.topics = map[string]*TopicRing{}
	return b
}
// getOrCreateRing returns the ring registered under topicKey, creating it on
// first use.
//
// Either a publisher or a subscriber may be the first to reference a topic, so
// the lookup and the creation both happen under the broker lock and whoever
// arrives first creates the ring.
//
// requestedCap is used only at creation time; a non-positive value falls back
// to DefaultRingCapacity. If the ring already exists its capacity is left
// unchanged and requestedCap is ignored.
func (b *Broker) getOrCreateRing(topicKey string, requestedCap int) *TopicRing {
	b.mu.Lock()
	defer b.mu.Unlock()

	if ring, exists := b.topics[topicKey]; exists {
		return ring
	}

	// Named ringCap rather than cap so the built-in cap is not shadowed.
	ringCap := requestedCap
	if ringCap <= 0 {
		ringCap = DefaultRingCapacity
	}
	ring := newTopicRing(ringCap)
	b.topics[topicKey] = ring
	return ring
}
// RegisterPublisher returns a Publisher bound to the ring of topicKey.
// The ring is created with the given capacity if the topic does not exist yet;
// for an existing topic the capacity argument has no effect.
// The ring's publish path assumes a single publisher per topic.
func (b *Broker) RegisterPublisher(topicKey string, capacity int) Publisher {
	pub := new(ringPublisher)
	pub.ring = b.getOrCreateRing(topicKey, capacity)
	return pub
}
// RegisterSubscriber attaches a new consumer to the ring of topicKey and
// returns a Subscriber for it.
// Subscribers cannot specify a buffer capacity: if the topic does not exist
// yet, it is created with DefaultRingCapacity.
func (b *Broker) RegisterSubscriber(topicKey string) Subscriber {
	// A requested capacity of 0 makes getOrCreateRing use the default when the
	// topic has to be created.
	topicRing := b.getOrCreateRing(topicKey, 0)
	return &ringSubscriber{
		ring:     topicRing,
		consumer: topicRing.addConsumer(),
	}
}

View File

@@ -0,0 +1,25 @@
package routing
import (
"context"
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
)
// Publisher is the write-side handle given to data sources.
type Publisher interface {
	// Publish hands one envelope to the topic. It returns no error; the ring
	// implementation in this package never blocks on slow subscribers and
	// evicts them instead.
	Publish(env data.Envelope)
}
// Subscriber is the read-side handle given to consumers (data sinks).
type Subscriber interface {
	// Receive blocks until a message is available or the context is cancelled.
	// Best for general low-latency consumers that should not burn CPU, and
	// typically sufficient for most situations.
	// The ring implementation returns ErrDisconnected once the subscriber has
	// been evicted for falling behind, and ctx.Err() on cancellation.
	Receive(ctx context.Context) (data.Envelope, error)
	// TryReceive attempts to read one message without blocking.
	// It returns (envelope, true, nil) on success and (zero, false, nil) when
	// nothing is available; the ring implementation returns ErrDisconnected
	// once the subscriber has been evicted.
	// Polling TryReceive in a tight loop without waiting will most likely
	// spike the CPU.
	TryReceive() (data.Envelope, bool, error)
}

201
pkg/data/routing/router.go Normal file
View File

@@ -0,0 +1,201 @@
// router.go
package routing
import (
"context"
"errors"
"sync"
"sync/atomic"
"gitlab.michelsen.id/phillmichelsen/tessera/pkg/data"
)
// ErrDisconnected is returned by receive and tryReceive once a consumer has
// been marked dead, which enforceCapacity does when the consumer has fallen a
// full ring capacity behind the publisher.
var ErrDisconnected = errors.New("subscriber disconnected: failed to consume fast enough")
// IMPLEMENTATIONS
// ringPublisher implements the Publisher interface on top of a TopicRing.
type ringPublisher struct {
	ring *TopicRing // ring that Publish appends to
}
// Publish appends env to the publisher's ring by delegating to
// TopicRing.publish, which assumes a single publisher per topic.
func (p *ringPublisher) Publish(env data.Envelope) {
	p.ring.publish(env)
}
// ringSubscriber implements the Subscriber interface on top of a TopicRing.
type ringSubscriber struct {
	ring     *TopicRing    // ring to read from
	consumer *ringConsumer // this subscriber's read state within ring
}
// Receive delegates to TopicRing.receive: it blocks until a message is
// available, the consumer has been evicted (ErrDisconnected), or ctx is done.
func (s *ringSubscriber) Receive(ctx context.Context) (data.Envelope, error) {
	return s.ring.receive(ctx, s.consumer)
}
// TryReceive delegates to TopicRing.tryReceive: it returns immediately with
// ok == false when no new message is available, or with ErrDisconnected when
// the consumer has been evicted.
func (s *ringSubscriber) TryReceive() (data.Envelope, bool, error) {
	return s.ring.tryReceive(s.consumer)
}
// ringConsumer represents a single subscriber's read state within a TopicRing.
// The blank 56-byte fields are padding intended to keep the hot fields on
// separate 64-byte cache lines and so avoid false sharing.
// NOTE(review): this relies on 64-byte cache lines and on the allocation being
// suitably aligned, neither of which is checked here.
type ringConsumer struct {
	ID     uint64 // identifier assigned by addConsumer; increases with each new consumer
	_      [56]byte
	Cursor atomic.Uint64 // sequence number of the next message to read; only ever advanced
	_      [56]byte
	Dead   atomic.Bool // set by enforceCapacity once the consumer has fallen a full ring capacity behind; receive calls then fail with ErrDisconnected
	_      [56]byte
	notify chan struct{} // size-1 wakeup channel that receive blocks on while waiting for new data
}
// TopicRing is a broadcast ring buffer for a single topic.
// It is designed to minimize locking: the publisher appends sequentially while
// each subscriber maintains its own cursor (ringConsumer). The blank 56-byte
// fields are padding intended to reduce false sharing, as in ringConsumer.
// Capacity is a power of two (newTopicRing rounds up) so that the slot of a
// sequence number can be computed as seq & Mask.
type TopicRing struct {
	Capacity          uint64          // number of slots in Ring
	Mask              uint64          // Capacity - 1
	Ring              []data.Envelope // slot storage; slots are overwritten in place as the tail wraps
	_                 [56]byte
	writeTail         atomic.Uint64 // sequence number of the next message to be written
	_                 [56]byte
	cachedMinConsumer uint64                          // lowest live consumer cursor found by the last enforceCapacity; accessed only on the publish path
	consumers         atomic.Pointer[[]*ringConsumer] // Copy-on-Write slice; replaced wholesale by addConsumer
	mu                sync.Mutex                      // held by addConsumer while it updates nextSubID and consumers
	nextSubID         uint64                          // ID given to the most recently added consumer
}
// newTopicRing creates a TopicRing with room for at least minCapacity
// envelopes.
//
// minCapacity is a slot count, not an exponent: it is rounded up to the next
// power of two so that slot indices can be computed with a bit mask
// (seq & Mask). Values below 1 are treated as 1; without that clamp a negative
// argument would convert to a huge uint64 and the rounding loop below would
// never terminate.
func newTopicRing(minCapacity int) *TopicRing {
	if minCapacity < 1 {
		minCapacity = 1
	}
	size := uint64(1)
	for size < uint64(minCapacity) {
		size <<= 1
	}
	t := &TopicRing{
		Capacity: size,
		Mask:     size - 1,
		Ring:     make([]data.Envelope, size),
	}
	// Publish an empty consumer list up front so that readers of t.consumers
	// never observe a nil pointer.
	empty := make([]*ringConsumer, 0)
	t.consumers.Store(&empty)
	return t
}
// addConsumer registers a new subscriber on the ring and returns its read
// state. The new consumer begins at the current write tail, so it only sees
// messages published from now on.
func (t *TopicRing) addConsumer() *ringConsumer {
	t.mu.Lock()
	defer t.mu.Unlock()

	t.nextSubID++
	consumer := &ringConsumer{
		ID:     t.nextSubID,
		notify: make(chan struct{}, 1),
	}
	// Skip historical data: begin reading at the publisher's current position.
	consumer.Cursor.Store(t.writeTail.Load())

	// Copy-on-write: build a fresh slice and swap the pointer, so the slice a
	// concurrent publish has already loaded is never modified.
	current := *t.consumers.Load()
	updated := make([]*ringConsumer, 0, len(current)+1)
	updated = append(updated, current...)
	updated = append(updated, consumer)
	t.consumers.Store(&updated)
	return consumer
}
// publish appends one message to the ring and wakes subscribers through their
// notify channels.
// It assumes a single publisher per topic: the tail is read and written with
// separate Load/Store calls rather than an atomic add.
func (t *TopicRing) publish(env data.Envelope) {
	seq := t.writeTail.Load() // we expect only one publisher per topic
	// To support more than one publisher, switch to an atomic add on writeTail.
	// The slot about to be written may still be unread by the slowest consumer
	// known from the last scan: rescan the consumers, evict the ones that are
	// a full capacity behind, and refresh the cached minimum.
	if seq-t.cachedMinConsumer >= t.Capacity {
		t.enforceCapacity(seq)
	}
	// Write the slot first, then advance the tail; consumers compare their
	// cursor against writeTail to decide whether a slot may be read.
	t.Ring[seq&t.Mask] = env
	t.writeTail.Store(seq + 1)
	subs := *t.consumers.Load()
	for _, c := range subs {
		// Non-blocking wakeup: notify has a buffer of one, so if a signal is
		// already pending the default branch drops this one.
		select {
		case c.notify <- struct{}{}:
		default:
		}
	}
}
// enforceCapacity evicts consumers that have fallen a full ring capacity
// behind targetSeq and recomputes cachedMinConsumer from the survivors.
// targetSeq is the sequence number the publisher is about to write. When no
// live consumer remains, cachedMinConsumer becomes targetSeq.
func (t *TopicRing) enforceCapacity(targetSeq uint64) {
	subs := *t.consumers.Load()
	newMin := targetSeq
	for _, c := range subs {
		// Already-evicted consumers stay in the list but no longer count.
		if c.Dead.Load() {
			continue
		}
		cCursor := c.Cursor.Load()
		if targetSeq-cCursor >= t.Capacity {
			// The slot this consumer would read next is the one about to be
			// overwritten.
			c.Dead.Store(true) // Evict slow consumer
		} else if cCursor < newMin {
			newMin = cCursor
		}
	}
	t.cachedMinConsumer = newMin
}
// receive blocks until a new message is available, the consumer is evicted, or
// the context is cancelled. Messages are returned to a consumer in sequence
// order.
//
// It returns ErrDisconnected if the consumer has been marked dead and
// ctx.Err() if the context ends while waiting.
func (t *TopicRing) receive(ctx context.Context, c *ringConsumer) (data.Envelope, error) {
	for {
		if c.Dead.Load() {
			return data.Envelope{}, ErrDisconnected
		}
		cursor := c.Cursor.Load()
		if cursor < t.writeTail.Load() {
			env := t.Ring[cursor&t.Mask]
			// Re-validate after the copy: the publisher marks a lagging
			// consumer dead before it overwrites that consumer's slot, so if
			// the flag is set now the copy above may be overwritten or torn
			// and must not be handed out.
			// NOTE(review): this narrows the window but the slot read itself
			// is still not synchronized with the publisher's write.
			if c.Dead.Load() {
				return data.Envelope{}, ErrDisconnected
			}
			c.Cursor.Store(cursor + 1)
			return env, nil
		}
		// Nothing to read yet: wait for a publish signal or cancellation,
		// then loop and re-check all conditions.
		select {
		case <-ctx.Done():
			return data.Envelope{}, ctx.Err()
		case <-c.notify:
		}
	}
}
// tryReceive is the non-blocking variant of receive.
//
// It returns (envelope, true, nil) when a message was read, (zero, false, nil)
// when no new message is available, and (zero, false, ErrDisconnected) when
// the consumer has been marked dead.
func (t *TopicRing) tryReceive(c *ringConsumer) (data.Envelope, bool, error) {
	if c.Dead.Load() {
		return data.Envelope{}, false, ErrDisconnected
	}
	cursor := c.Cursor.Load()
	if cursor >= t.writeTail.Load() {
		return data.Envelope{}, false, nil
	}
	env := t.Ring[cursor&t.Mask]
	// Re-validate after the copy: the publisher marks a lagging consumer dead
	// before it overwrites that consumer's slot, so if the flag is set now the
	// copy above may be overwritten or torn and must not be handed out.
	// NOTE(review): this narrows the window but the slot read itself is still
	// not synchronized with the publisher's write.
	if c.Dead.Load() {
		return data.Envelope{}, false, ErrDisconnected
	}
	c.Cursor.Store(cursor + 1)
	return env, true, nil
}

27
pkg/data/stream.go Normal file
View File

@@ -0,0 +1,27 @@
package data
import (
"context"
"github.com/google/uuid"
)
// StreamID identifies a Stream. It is a distinct type whose underlying type is
// uuid.UUID, so conversion to and from uuid.UUID must be explicit.
type StreamID uuid.UUID
// Stream exposes the identity of a stream together with its write and read
// sides.
type Stream interface {
	// ID returns the stream's identifier.
	ID() StreamID
	// Sender returns the write side of the stream.
	Sender() Sender
	// Receiver returns the read side of the stream.
	// NOTE(review): whether each call yields a new, independent reader is not
	// visible here — confirm against the implementation.
	Receiver() Receiver
}
// Sender is the write side of a Stream.
type Sender interface {
	// Send submits one envelope; failure is reported through the error.
	Send(context.Context, Envelope) error
	// SendBatch submits several envelopes in one call.
	// NOTE(review): atomicity of a batch is not specified here — confirm.
	SendBatch(context.Context, []Envelope) error
}
// Receiver is the read side of a Stream.
//
// NOTE(review): method notes are inferred from the signatures and from the
// similarly shaped routing.Subscriber; confirm against an implementation.
type Receiver interface {
	// TryReceive presumably returns immediately, with the bool reporting
	// whether an envelope was available.
	TryReceive() (Envelope, bool, error)
	// ReceiveNext presumably blocks until an envelope is available or the
	// context is done.
	ReceiveNext(context.Context) (Envelope, error)
	// Seq returns a sequence number; presumably the receiver's current read
	// position.
	Seq() uint64
}