Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
be549af
math
pompon0 Jun 23, 2026
3f2a3b6
accountPool -> AccountPool
pompon0 Jun 23, 2026
6aa8cec
global GenerateN
pompon0 Jun 23, 2026
8d7f4cd
removed mutexes
pompon0 Jun 24, 2026
ce7019e
accounts in accountpool
pompon0 Jun 24, 2026
1eea5cd
account registry
pompon0 Jun 24, 2026
be075a1
wip
pompon0 Jun 24, 2026
da0e16e
wip
pompon0 Jun 24, 2026
e551828
wip
pompon0 Jun 24, 2026
aa86ad5
WIP
pompon0 Jun 24, 2026
d349c0b
WIP
pompon0 Jun 24, 2026
1aa66b6
fmt
pompon0 Jun 24, 2026
3bbfad6
removed scheduler
pompon0 Jun 24, 2026
c3e58da
removed more stuff
pompon0 Jun 24, 2026
480cf5e
generatorBuilder
pompon0 Jun 24, 2026
26389cc
reduced generators logic
pompon0 Jun 24, 2026
d836c10
WIP
pompon0 Jun 25, 2026
3deb622
WIP
pompon0 Jun 25, 2026
a967777
sender rewrite
pompon0 Jun 26, 2026
3a5d556
fmt
pompon0 Jun 26, 2026
73f1327
WIP
pompon0 Jun 26, 2026
5d350f5
missing file
pompon0 Jun 29, 2026
0f63a9e
WIP
pompon0 Jun 29, 2026
4e72c22
fmt
pompon0 Jun 29, 2026
2b3e4a8
test fixes
pompon0 Jun 29, 2026
0aa8e23
nonce reset test
pompon0 Jun 29, 2026
d95ad8f
e2e test
pompon0 Jun 29, 2026
0cd70a0
more cases
pompon0 Jun 29, 2026
d015fb5
TxWriter coverage
pompon0 Jun 29, 2026
00db69f
fmt
pompon0 Jun 29, 2026
91db965
removed utils/rng
pompon0 Jun 29, 2026
fe10e8c
fix
pompon0 Jun 29, 2026
34ba3fe
added MaxInFlight validation
pompon0 Jun 29, 2026
e7e136a
lint
pompon0 Jun 29, 2026
b83e882
test fixes
pompon0 Jun 29, 2026
03ffbd1
applied comments
pompon0 Jun 29, 2026
a28e7c0
applied comment
pompon0 Jun 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 10 additions & 38 deletions config/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"encoding/json"
"fmt"
"math"
"math/rand/v2"
mrand "math/rand/v2"
"sync"

"github.com/sei-protocol/sei-load/utils/rng"
)

var (
Expand All @@ -18,7 +16,7 @@ var (

// indexSampler draws an index in [0, n) from some keyspace distribution.
type indexSampler interface {
SampleIndex(n uint64) (uint64, error)
SampleIndex(rng *mrand.Rand, n uint64) (uint64, error)
}

// Distribution is a tagged keyspace index sampler selected by a "Name"
Expand All @@ -31,25 +29,13 @@ type Distribution struct {

func (d *Distribution) Name() string { return d.name }

// SetStream binds the sampler to a deterministic sub-stream (nil = unseeded
// global RNG); a zero-value Distribution draws nothing, so it no-ops. See
// package doc for the reproducibility contract.
func (d *Distribution) SetStream(s *rng.Stream) {
switch delegate := d.delegate.(type) {
case *UniformDistribution:
delegate.stream = s
case *ZipfianDistribution:
delegate.stream = s
}
}

// SampleIndex delegates to the selected sampler; a zero-value (no Name)
// Distribution returns 0.
func (d *Distribution) SampleIndex(n uint64) (uint64, error) {
func (d *Distribution) SampleIndex(rng *mrand.Rand, n uint64) (uint64, error) {
if d.delegate == nil {
return 0, nil
}
return d.delegate.SampleIndex(n)
return d.delegate.SampleIndex(rng, n)
}

func (d *Distribution) UnmarshalJSON(data []byte) error {
Expand All @@ -64,7 +50,7 @@ func (d *Distribution) UnmarshalJSON(data []byte) error {
case "":
return nil
case "uniform":
// No JSON parameters; the stream is bound later via SetStream.
// No JSON parameters; the PRNG is supplied at draw time.
d.delegate = &UniformDistribution{}
return nil
case "zipfian":
Expand All @@ -83,20 +69,13 @@ func (d *Distribution) UnmarshalJSON(data []byte) error {
}

// UniformDistribution draws each index with equal probability.
//
// copy-safe: holds no mutex; the *rng.Stream pointer aliases on copy.
type UniformDistribution struct {
stream *rng.Stream
}
type UniformDistribution struct{}

func (u *UniformDistribution) SampleIndex(n uint64) (uint64, error) {
func (u *UniformDistribution) SampleIndex(rng *mrand.Rand, n uint64) (uint64, error) {
if n == 0 {
return 0, fmt.Errorf("uniform sample: empty keyspace (n == 0)")
}
if u.stream != nil {
return u.stream.Uint64N(n), nil
}
return rand.Uint64N(n), nil
return rng.Uint64N(n), nil
}

// ZipfianDistribution is the YCSB precomputed-zeta generator: zeta(n, theta) is
Expand All @@ -107,8 +86,6 @@ func (u *UniformDistribution) SampleIndex(n uint64) (uint64, error) {
type ZipfianDistribution struct {
Theta float64 `json:"theta"`

stream *rng.Stream

mu sync.Mutex
state *zipfState // memoized for state.n; recomputed when n changes.
}
Expand Down Expand Up @@ -169,7 +146,7 @@ func (z *ZipfianDistribution) validate() error {
// SampleIndex draws a Zipf-skewed index in [0, n). n must be stable per sampler:
// the zeta cache is keyed on n, so a changing n recomputes O(n) every draw. See
// package doc.
func (z *ZipfianDistribution) SampleIndex(n uint64) (uint64, error) {
func (z *ZipfianDistribution) SampleIndex(rng *mrand.Rand, n uint64) (uint64, error) {
if n == 0 {
return 0, fmt.Errorf("zipfian sample: empty keyspace (n == 0)")
}
Expand All @@ -181,12 +158,7 @@ func (z *ZipfianDistribution) SampleIndex(n uint64) (uint64, error) {
st := z.state
z.mu.Unlock()

var u float64
if z.stream != nil {
u = z.stream.Float64()
} else {
u = rand.Float64()
}
u := rng.Float64()
uz := u * st.zetaN
if uz < 1.0 {
return 0, nil
Expand Down
56 changes: 22 additions & 34 deletions config/distribution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,27 @@ package config_test
import (
"encoding/json"
"fmt"
mrand "math/rand/v2"
"os"
"path/filepath"
"testing"
"time"

"github.com/sei-protocol/sei-load/config"
"github.com/sei-protocol/sei-load/utils/rng"
"github.com/stretchr/testify/require"
)

func newDistributionTestRng(seed uint64) *mrand.Rand {
return mrand.New(mrand.NewPCG(seed, seed^0x9e3779b97f4a7c15))
}

func TestDistribution(t *testing.T) {
t.Parallel()
t.Run("empty", func(t *testing.T) {
var subject config.Distribution
require.NoError(t, subject.UnmarshalJSON([]byte(`{}`)))
require.Empty(t, subject.Name())
idx, err := subject.SampleIndex(100)
idx, err := subject.SampleIndex(newDistributionTestRng(1), 100)
require.NoError(t, err)
require.Zero(t, idx)
})
Expand Down Expand Up @@ -55,13 +59,12 @@ func distribution(t *testing.T, raw string) *config.Distribution {
return &d
}

// sample binds d to stream and pulls count draws over keyspace n.
func sample(t *testing.T, d *config.Distribution, s *rng.Stream, n uint64, count int) []uint64 {
// sample binds d to a PRNG and pulls count draws over keyspace n.
func sample(t *testing.T, d *config.Distribution, rng *mrand.Rand, n uint64, count int) []uint64 {
t.Helper()
d.SetStream(s)
out := make([]uint64, count)
for i := range out {
v, err := d.SampleIndex(n)
v, err := d.SampleIndex(rng, n)
require.NoError(t, err)
require.Less(t, v, n, "draw out of range [0, n)")
out[i] = v
Expand All @@ -74,7 +77,7 @@ func sample(t *testing.T, d *config.Distribution, s *rng.Stream, n uint64, count
func TestSampleIndexEmptyKeyspace(t *testing.T) {
t.Parallel()
for _, raw := range []string{`{"Name":"uniform"}`, `{"Name":"zipfian","theta":0.9}`} {
_, err := distribution(t, raw).SampleIndex(0)
_, err := distribution(t, raw).SampleIndex(newDistributionTestRng(1), 0)
require.Error(t, err, raw)
}
}
Expand All @@ -85,27 +88,12 @@ func TestSampleIndexDeterminism(t *testing.T) {
t.Parallel()
const seed, n, count = 99, 1000, 256
for _, raw := range []string{`{"Name":"uniform"}`, `{"Name":"zipfian","theta":0.8}`} {
a := sample(t, distribution(t, raw), rng.NewSource(seed).Stream(rng.KeyDistributionStream(0)), n, count)
b := sample(t, distribution(t, raw), rng.NewSource(seed).Stream(rng.KeyDistributionStream(0)), n, count)
a := sample(t, distribution(t, raw), newDistributionTestRng(seed), n, count)
b := sample(t, distribution(t, raw), newDistributionTestRng(seed), n, count)
require.Equal(t, a, b, "same seed must reproduce the draw sequence: %s", raw)
}
}

// TestSampleIndexSeededDiffersFromUnseeded guards the binding the way
// TestRandomGasPickerStreamSeeds does for gas: a bound sampler draws
// seed-determined values that differ from the unseeded global RNG path. If a
// refactor silently broke the binding, the seeded and unseeded sequences would
// match by accident only with probability ~0.
func TestSampleIndexSeededDiffersFromUnseeded(t *testing.T) {
t.Parallel()
const seed, n, count = 7, 1000, 128
for _, raw := range []string{`{"Name":"uniform"}`, `{"Name":"zipfian","theta":0.8}`} {
seeded := sample(t, distribution(t, raw), rng.NewSource(seed).Stream(rng.KeyDistributionStream(0)), n, count)
unseeded := sample(t, distribution(t, raw), nil, n, count)
require.NotEqual(t, seeded, unseeded, "seeded draws must differ from the unseeded global RNG: %s", raw)
}
}

// TestUniformIsUniform: a chi-square goodness-of-fit test over evenly-sized
// buckets. With B buckets and N draws the statistic should sit well under the
// upper critical value; a badly skewed "uniform" would blow far past it.
Expand All @@ -114,7 +102,7 @@ func TestUniformIsUniform(t *testing.T) {
const n, buckets, perBucket = 1000, 20, 5000
const draws = buckets * perBucket // 100k draws, expected 5k per bucket.

got := sample(t, distribution(t, `{"Name":"uniform"}`), rng.NewSource(1).Stream("x"), n, draws)
got := sample(t, distribution(t, `{"Name":"uniform"}`), newDistributionTestRng(1), n, draws)
counts := make([]float64, buckets)
width := uint64(n / buckets)
for _, v := range got {
Expand All @@ -140,7 +128,7 @@ func TestZipfianSkewRisesWithTheta(t *testing.T) {

topKMass := func(theta float64) float64 {
raw := fmt.Sprintf(`{"Name":"zipfian","theta":%v}`, theta)
got := sample(t, distribution(t, raw), rng.NewSource(5).Stream("x"), n, draws)
got := sample(t, distribution(t, raw), newDistributionTestRng(5), n, draws)
var hot int
for _, v := range got {
if v < topK {
Expand Down Expand Up @@ -176,17 +164,17 @@ func TestZipfianInitCostBounded(t *testing.T) {
t.Parallel()
const n = 1_000_000
d := distribution(t, `{"Name":"zipfian","theta":0.99}`)
d.SetStream(rng.NewSource(1).Stream("x"))
rand := newDistributionTestRng(1)

// Warmup outside the timer: pay the one-time O(n) zeta precompute here so the
// timed window measures only steady-state per-draw cost.
warm, err := d.SampleIndex(n)
warm, err := d.SampleIndex(rand, n)
require.NoError(t, err)
require.Less(t, warm, uint64(n))

start := time.Now()
for i := 0; i < 1000; i++ {
v, err := d.SampleIndex(n)
v, err := d.SampleIndex(rand, n)
require.NoError(t, err)
require.Less(t, v, uint64(n))
}
Expand All @@ -202,18 +190,18 @@ func TestZipfianInitCostBounded(t *testing.T) {
func TestZipfianRecomputesOnNChange(t *testing.T) {
t.Parallel()
d := distribution(t, `{"Name":"zipfian","theta":0.9}`)
d.SetStream(rng.NewSource(1).Stream("x"))
rand := newDistributionTestRng(1)

// Same seed + same draw index against two different keyspaces: if the cache
// ignored the n change, the second n would reuse the first's zetaN/eta and the
// draw could fall outside [0, n2). The in-range check is the recompute witness.
const n1, n2 = 1_000_000, 10
v1, err := d.SampleIndex(n1)
v1, err := d.SampleIndex(rand, n1)
require.NoError(t, err)
require.Less(t, v1, uint64(n1))

for i := 0; i < 1000; i++ {
v2, err := d.SampleIndex(n2)
v2, err := d.SampleIndex(rand, n2)
require.NoError(t, err)
require.Less(t, v2, uint64(n2), "draw must be in [0, n2) after n change; stale cache would overshoot")
}
Expand All @@ -228,9 +216,9 @@ func TestZipfianNoNaNAcrossThetaRange(t *testing.T) {
for _, n := range []uint64{2, 3, 100, 1000} {
raw := fmt.Sprintf(`{"Name":"zipfian","theta":%v}`, theta)
d := distribution(t, raw)
d.SetStream(rng.NewSource(1).Stream("x"))
rand := newDistributionTestRng(1)
for i := 0; i < 100; i++ {
v, err := d.SampleIndex(n)
v, err := d.SampleIndex(rand, n)
require.NoError(t, err)
// v is a uint64 index; the in-range check is the real guard that
// the internal zeta/eta math never produced a bad (NaN-derived) draw.
Expand Down
21 changes: 5 additions & 16 deletions config/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
//
// A Distribution is a tagged sampler that draws an index in [0, n) from some
// keyspace distribution (see distribution.go). It is selected on the JSON wire
// by a "Name" discriminator and bound at run time to a deterministic
// pseudo-random sub-stream (utils/rng) so that two runs at the same seed draw
// the same multiset of indices.
// by a "Name" discriminator and bound at run time to an explicit seeded PRNG so
// that two runs at the same seed draw the same sequence of indices.
//
// # Wire format (FROZEN one-way door)
//
Expand Down Expand Up @@ -81,17 +80,7 @@
//
// # Seeded-stream reproducibility (FROZEN inputs)
//
// Draws go through a bound *rng.Stream (see SetStream): a per-scenario
// substream derived from the run seed. This is what gives the workload its
// reproducibility contract — same seed + same config yields the same per-stream
// draw multiset (see package utils/rng for the precise contract and its limits
// above one worker).
//
// The stream ids feeding the samplers — "dist:%d:key" and "dist:%d:size" — are
// FROZEN, append-only inputs: a stream id is hashed to seed its sub-stream, so
// renaming one reseeds that stream and invalidates every saved replay for the
// same config_sha256. New ids may be added (they hash to their own sub-streams
// and do not perturb existing ones); existing ids must never be renamed. The
// canonical list and the full frozen-derivation note live in
// utils/rng/streams.go and utils/rng/rng.go — this is a one-way door.
// Draws go through an explicitly supplied *rand.Rand seeded from the run seed.
// This is what gives the workload its reproducibility contract: same seed +
// same config yields the same draw sequence for the same call order.
package config
33 changes: 7 additions & 26 deletions config/gas.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package config
import (
"encoding/json"
"fmt"
"math/rand/v2"

"github.com/sei-protocol/sei-load/utils/rng"
mrand "math/rand/v2"
)

var (
Expand All @@ -15,7 +13,7 @@ var (
)

type gasGenerator interface {
GenerateGas() (uint64, error)
GenerateGas(rng *mrand.Rand) (uint64, error)
}

type GasPicker struct {
Expand All @@ -25,23 +23,11 @@ type GasPicker struct {

func (g *GasPicker) Name() string { return g.name }

// SetStream binds the picker's random delegate to a deterministic sub-stream. A
// nil stream leaves the picker on the unseeded global RNG.
//
// Only a random delegate has anything to seed: fixed and empty pickers draw no
// randomness, so the type assertion intentionally no-ops for them rather than
// erroring.
func (g *GasPicker) SetStream(s *rng.Stream) {
if r, ok := g.delegate.(*RandomGasGenerator); ok {
r.stream = s
}
}

func (g *GasPicker) GenerateGas() (uint64, error) {
func (g *GasPicker) GenerateGas(rng *mrand.Rand) (uint64, error) {
if g.delegate == nil {
return 0, nil
}
return g.delegate.GenerateGas()
return g.delegate.GenerateGas(rng)
}

func (g *GasPicker) UnmarshalJSON(data []byte) error {
Expand Down Expand Up @@ -78,24 +64,19 @@ type FixedGasGenerator struct {
Gas uint64 `json:"Gas"`
}

func (f *FixedGasGenerator) GenerateGas() (uint64, error) {
func (f *FixedGasGenerator) GenerateGas(rng *mrand.Rand) (uint64, error) {
return f.Gas, nil
}

type RandomGasGenerator struct {
Min uint64 `json:"Min"`
Max uint64 `json:"Max"`

stream *rng.Stream
}

func (r *RandomGasGenerator) GenerateGas() (uint64, error) {
func (r *RandomGasGenerator) GenerateGas(rng *mrand.Rand) (uint64, error) {
if r.Min >= r.Max {
return 0, fmt.Errorf("invalid random gas range: min %d must be less than max %d", r.Min, r.Max)
}
span := r.Max - r.Min + 1
if r.stream != nil {
return r.Min + r.stream.Uint64N(span), nil
}
return r.Min + rand.Uint64N(span), nil
return r.Min + rng.Uint64N(span), nil
}
Loading
Loading