Skip to content

Commit 7a2287a

Browse files
cody-littleyCody Littley
and authored
Implement a standard cache. (#3077)
## Describe your changes and provide context This PR contains the standard cache implementation. It intentionally does not integrate this cache with FlatKV, that will be done in the following PR. ## Testing performed to validate your change Tested locally, run in benchmark for ~6 days so far. --------- Co-authored-by: Cody Littley <cody.littley@seinetwork.io>
1 parent cc96111 commit 7a2287a

8 files changed

Lines changed: 2515 additions & 0 deletions

File tree

sei-db/common/metrics/buckets.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package metrics
2+
3+
import "github.com/sei-protocol/sei-chain/sei-db/common/unit"
4+
5+
// Shared histogram bucket boundaries for use across the codebase.
6+
// The OTel defaults are too coarse for meaningful percentile queries in Grafana.
7+
8+
// LatencyBuckets spans 10μs through 5 minutes, so one bucket layout serves
// both sub-millisecond key lookups and multi-second compactions/flushes
// without per-metric tuning.
var LatencyBuckets = []float64{
	1e-05, 2.5e-05, 5e-05, 1e-04, 2.5e-04, 5e-04, // 10μs–500μs
	1e-03, 2.5e-03, 5e-03, 1e-02, 2.5e-02, 5e-02, // 1ms–50ms
	1e-01, 2.5e-01, 5e-01, 1, 2.5, 5, 10, 30, 60, 120, 300, // 100ms–5min
}
15+
16+
// ByteSizeBuckets covers 256B to 1GB for data size histograms.
17+
var ByteSizeBuckets = []float64{
18+
256, unit.KB, 4 * unit.KB, 16 * unit.KB, 64 * unit.KB, 256 * unit.KB,
19+
unit.MB, 4 * unit.MB, 16 * unit.MB, 64 * unit.MB, 256 * unit.MB, unit.GB,
20+
}
21+
22+
// CountBuckets spans 1 through 1e6 for histograms of per-operation step or
// iteration counts.
var CountBuckets = []float64{
	1, 5, 10, 50, 100, 500,
	1e3, 5e3, 1e4, 1e5, 1e6,
}

sei-db/common/threading/pool.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,12 @@ type Pool interface {
99
// If Submit is called concurrently with or after shutdown (i.e. when ctx is done/cancelled), the task may
1010
// be silently dropped. Callers that need a guarantee of execution must
1111
// ensure Submit happens-before shutdown.
12+
//
13+
// This method is permitted to return an error only under the following conditions:
14+
// - the pool is shutting down (i.e. its context is done/cancelled)
15+
// - the provided ctx parameter is done/cancelled before this method returns
16+
// - invalid input (e.g. the task is nil)
17+
//
18+
// If this method returns an error, the task may or may not have been executed.
1219
Submit(ctx context.Context, task func()) error
1320
}

sei-db/db_engine/dbcache/cache.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package dbcache
22

33
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/sei-protocol/sei-chain/sei-db/common/threading"
49
"github.com/sei-protocol/sei-chain/sei-db/db_engine/types"
510
)
611

@@ -22,6 +27,9 @@ type Reader func(key []byte) (value []byte, found bool, err error)
2227
// - the Reader method returns an error (for methods that accept a Reader)
2328
// - the cache is shutting down
2429
// - the cache's work pools are shutting down
30+
//
31+
// Cache errors are generally not recoverable, and it should be assumed that a cache that has returned an error
32+
// is in a corrupted state, and should be discarded.
2533
type Cache interface {
2634

2735
// Get returns the value for the given key, or (nil, false, nil) if not found.
@@ -64,6 +72,14 @@ type Cache interface {
6472
BatchSet(updates []CacheUpdate) error
6573
}
6674

75+
// DefaultEstimatedOverheadPerEntry approximates the fixed per-entry heap cost
// of the cache on 64-bit platforms (amd64/arm64). The 250-byte figure sums the
// shardEntry struct (48 B), list.Element (48 B), lruQueueEntry (32 B), two
// map-entry costs (~64 B), string allocation rounding (~16 B), plus a margin
// for the duplicate key copy held by the LRU. It comes from static analysis of
// Go size classes and map bucket layout; validate experimentally on your
// target platform.
const DefaultEstimatedOverheadPerEntry uint64 = 250
82+
6783
// CacheUpdate describes a single key-value mutation to apply to the cache.
6884
type CacheUpdate struct {
6985
// The key to update.
@@ -76,3 +92,35 @@ type CacheUpdate struct {
7692
// IsDelete reports whether this update removes the key rather than storing a
// value: a nil Value marks a deletion. Note that an empty non-nil slice is a
// set, not a delete.
func (u *CacheUpdate) IsDelete() bool {
	return u.Value == nil
}
95+
96+
// BuildCache creates a new Cache.
97+
func BuildCache(
98+
ctx context.Context,
99+
shardCount uint64,
100+
maxSize uint64,
101+
readPool threading.Pool,
102+
miscPool threading.Pool,
103+
estimatedOverheadPerEntry uint64,
104+
cacheName string,
105+
metricsScrapeInterval time.Duration,
106+
) (Cache, error) {
107+
108+
if maxSize == 0 {
109+
return NewNoOpCache(), nil
110+
}
111+
112+
cache, err := NewStandardCache(
113+
ctx,
114+
shardCount,
115+
maxSize,
116+
readPool,
117+
miscPool,
118+
estimatedOverheadPerEntry,
119+
cacheName,
120+
metricsScrapeInterval,
121+
)
122+
if err != nil {
123+
return nil, fmt.Errorf("failed to create cache: %w", err)
124+
}
125+
return cache, nil
126+
}
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
package dbcache
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/sei-protocol/sei-chain/sei-db/common/threading"
10+
"github.com/sei-protocol/sei-chain/sei-db/db_engine/types"
11+
)
12+
13+
// Compile-time assertion that *cache implements the Cache interface.
var _ Cache = (*cache)(nil)

// cache is the standard sharded Cache implementation: keys are hashed to one
// of a fixed set of shards so concurrent operations on different shards do
// not contend on a single lock.
type cache struct {
	// ctx bounds the lifetime of work submitted to the pools and of the
	// metrics scrape loop.
	ctx context.Context

	// A utility for assigning keys to shard indices.
	shardManager *shardManager

	// The shards in the cache.
	shards []*shard

	// A pool for asynchronous reads.
	readPool threading.Pool

	// A pool for miscellaneous operations that are neither computationally intensive nor IO bound.
	miscPool threading.Pool
}
31+
32+
// Creates a new Cache. If cacheName is non-empty, OTel metrics are enabled and the
33+
// background size scrape runs every metricsScrapeInterval.
34+
func NewStandardCache(
35+
ctx context.Context,
36+
// The number of shards in the cache. Must be a power of two and greater than 0.
37+
shardCount uint64,
38+
// The maximum size of the cache, in bytes.
39+
maxSize uint64,
40+
// A work pool for reading from the DB.
41+
readPool threading.Pool,
42+
// A work pool for miscellaneous operations that are neither computationally intensive nor IO bound.
43+
miscPool threading.Pool,
44+
// The estimated overhead per entry, in bytes. This is used to calculate the maximum size of the cache.
45+
// This value should be derived experimentally, and may differ between different builds and architectures.
46+
estimatedOverheadPerEntry uint64,
47+
// Name used as the "cache" attribute on metrics. Empty string disables metrics.
48+
cacheName string,
49+
// How often to scrape cache size for metrics. Ignored if cacheName is empty.
50+
metricsScrapeInterval time.Duration,
51+
) (Cache, error) {
52+
if shardCount == 0 || (shardCount&(shardCount-1)) != 0 {
53+
return nil, ErrNumShardsNotPowerOfTwo
54+
}
55+
if maxSize == 0 {
56+
return nil, fmt.Errorf("maxSize must be greater than 0")
57+
}
58+
59+
shardManager, err := newShardManager(shardCount)
60+
if err != nil {
61+
return nil, fmt.Errorf("failed to create shard manager: %w", err)
62+
}
63+
sizePerShard := maxSize / shardCount
64+
if sizePerShard == 0 {
65+
return nil, fmt.Errorf("maxSize must be greater than shardCount")
66+
}
67+
68+
shards := make([]*shard, shardCount)
69+
for i := uint64(0); i < shardCount; i++ {
70+
shards[i], err = NewShard(ctx, readPool, sizePerShard, estimatedOverheadPerEntry)
71+
if err != nil {
72+
return nil, fmt.Errorf("failed to create shard: %w", err)
73+
}
74+
}
75+
76+
c := &cache{
77+
ctx: ctx,
78+
shardManager: shardManager,
79+
shards: shards,
80+
readPool: readPool,
81+
miscPool: miscPool,
82+
}
83+
84+
if cacheName != "" {
85+
metrics := newCacheMetrics(ctx, cacheName, metricsScrapeInterval, c.getCacheSizeInfo)
86+
for _, s := range c.shards {
87+
s.metrics = metrics
88+
}
89+
}
90+
91+
return c, nil
92+
}
93+
94+
func (c *cache) getCacheSizeInfo() (bytes uint64, entries uint64) {
95+
for _, s := range c.shards {
96+
b, e := s.getSizeInfo()
97+
bytes += b
98+
entries += e
99+
}
100+
return bytes, entries
101+
}
102+
103+
// BatchSet applies all updates, grouping them by shard so that each shard's
// lock is taken at most once, and applying the per-shard groups concurrently
// on the misc pool.
//
// NOTE(review): if Submit fails partway through, this returns without waiting
// for previously submitted groups, which may still be executing; per the
// Cache contract, a cache that has returned an error must be treated as
// corrupted and discarded.
func (c *cache) BatchSet(updates []CacheUpdate) error {
	// Sort entries by shard index so each shard is locked only once.
	shardMap := make(map[uint64][]CacheUpdate)
	for i := range updates {
		idx := c.shardManager.Shard(updates[i].Key)
		shardMap[idx] = append(shardMap[idx], updates[i])
	}

	var wg sync.WaitGroup
	for shardIndex, shardEntries := range shardMap {
		wg.Add(1)
		// The closure captures shardIndex/shardEntries; this relies on
		// per-iteration loop variables (Go 1.22+) — confirm the module's Go
		// version, otherwise shadow them before Submit.
		err := c.miscPool.Submit(c.ctx, func() {
			defer wg.Done()
			c.shards[shardIndex].BatchSet(shardEntries)
		})
		if err != nil {
			return fmt.Errorf("failed to submit batch set: %w", err)
		}
	}
	// Block until every shard group has been applied.
	wg.Wait()

	return nil
}
126+
127+
// BatchGet resolves every key in the keys map. Keys are partitioned by owning
// shard and each shard's lookup runs concurrently on the misc pool; the
// provided Reader is passed through for the shards to consult. After all
// lookups finish, per-key results (including per-shard errors) are written
// back into the caller's keys map.
func (c *cache) BatchGet(read Reader, keys map[string]types.BatchGetResult) error {
	// Partition the requested keys by owning shard.
	work := make(map[uint64]map[string]types.BatchGetResult)
	for key := range keys {
		idx := c.shardManager.Shard([]byte(key))
		if work[idx] == nil {
			work[idx] = make(map[string]types.BatchGetResult)
		}
		work[idx][key] = types.BatchGetResult{}
	}

	var wg sync.WaitGroup
	for shardIndex, subMap := range work {
		wg.Add(1)

		// Captures shardIndex/subMap per iteration (Go 1.22+ loop-variable
		// semantics — confirm the module's Go version).
		err := c.miscPool.Submit(c.ctx, func() {
			defer wg.Done()
			err := c.shards[shardIndex].BatchGet(read, subMap)
			if err != nil {
				// A shard-level failure poisons every key routed to that shard.
				for key := range subMap {
					subMap[key] = types.BatchGetResult{Error: err}
				}
			}
		})
		if err != nil {
			// NOTE(review): returning here leaves earlier submissions running;
			// per the Cache contract, an errored cache is considered corrupted.
			return fmt.Errorf("failed to submit batch get: %w", err)
		}
	}
	wg.Wait()

	// Copy per-shard results back into the caller's map. This is safe only
	// after wg.Wait(): the workers no longer touch the sub-maps.
	for _, subMap := range work {
		for key, result := range subMap {
			keys[key] = result
		}
	}

	return nil
}
164+
165+
func (c *cache) Delete(key []byte) {
166+
shardIndex := c.shardManager.Shard(key)
167+
shard := c.shards[shardIndex]
168+
shard.Delete(key)
169+
}
170+
171+
func (c *cache) Get(read Reader, key []byte, updateLru bool) ([]byte, bool, error) {
172+
shardIndex := c.shardManager.Shard(key)
173+
shard := c.shards[shardIndex]
174+
175+
value, ok, err := shard.Get(read, key, updateLru)
176+
if err != nil {
177+
return nil, false, fmt.Errorf("failed to get value from shard: %w", err)
178+
}
179+
if !ok {
180+
return nil, false, nil
181+
}
182+
return value, ok, nil
183+
}
184+
185+
func (c *cache) Set(key []byte, value []byte) {
186+
shardIndex := c.shardManager.Shard(key)
187+
shard := c.shards[shardIndex]
188+
189+
if value == nil {
190+
shard.Delete(key)
191+
} else {
192+
shard.Set(key, value)
193+
}
194+
}

0 commit comments

Comments
 (0)