Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
102 commits
Select commit Hold shift + click to select a range
7c5e216
Created a cache for flatKV.
Mar 5, 2026
4a404ee
checkpoint
Mar 5, 2026
d36e825
incremental progress
Mar 5, 2026
2ccbe62
address feedback
Mar 5, 2026
f412e85
more fixes
Mar 5, 2026
e310037
bugfix
Mar 5, 2026
cf1071c
wire in cache
Mar 5, 2026
11232ff
Merge branch 'main' into cjl/flatkv-cache
Mar 6, 2026
a8c1c75
incremental improvements
Mar 6, 2026
221d114
checkin
Mar 6, 2026
8eca079
Moved where the cache sits
Mar 6, 2026
267feae
bugfix
Mar 6, 2026
50b0be6
Batch update the cache
Mar 6, 2026
2ca00d6
Add batch read to cache
Mar 6, 2026
8f8534a
Add batch get to db interface
Mar 6, 2026
23c0277
integrate batch reads
Mar 6, 2026
02d3ca1
wire in cache
Mar 6, 2026
7ee1b08
Introduce work pool, size caches differently
Mar 6, 2026
20c70c3
bugfix
Mar 6, 2026
b714789
Add unit constants
Mar 9, 2026
cc9d41d
refactor threading utils
Mar 9, 2026
53b2bd8
cleanup
Mar 9, 2026
c10e0cd
Cleanup, fix race condition
Mar 9, 2026
04f40fa
cleanup
Mar 9, 2026
b4e4d2c
cleanup
Mar 9, 2026
e53fefa
use pool
Mar 9, 2026
438fc8d
fix ctx lifecycle
Mar 9, 2026
19a8a19
Merge branch 'main' into cjl/flatkv-cache
Mar 9, 2026
23440f6
rename package
Mar 9, 2026
4ecc8fd
Clean up string copies
Mar 9, 2026
7a315c6
simplify gc
Mar 9, 2026
a3f3907
better error handling
Mar 9, 2026
f255b87
use config to configure cache params
Mar 9, 2026
cf0a73d
Allow flatkv config to be set in tests
Mar 9, 2026
0b34737
tweak config
Mar 10, 2026
452aa4d
incremental progress
Mar 10, 2026
1c804a8
move data dir into config
Mar 10, 2026
663b2ea
fix config file
Mar 10, 2026
bb530b5
cleanup
Mar 10, 2026
04daf75
move pebble metrics to proper location
Mar 10, 2026
354818e
clean up metrics
Mar 10, 2026
b1574ac
updated dashboard
Mar 10, 2026
07e071c
fix histograms
Mar 10, 2026
d090796
threading tests
Mar 10, 2026
dfd92c1
test lru queue
Mar 10, 2026
f751a9b
unit tests for shard
Mar 10, 2026
7b5538e
cache tests
Mar 10, 2026
dc8d0c9
moar unit tests
Mar 10, 2026
e9cc9ca
cleanup
Mar 10, 2026
c7a418c
Merge branch 'main' into cjl/flatkv-cache
Mar 10, 2026
087fd0f
Merge branch 'main' into cjl/flatkv-cache
Mar 10, 2026
eb9bc51
Merge branch 'main' into cjl/flatkv-cache
Mar 11, 2026
cea0ebb
unit test fixes
Mar 11, 2026
e58bec2
fix hash bug
Mar 11, 2026
c3f34b1
fixed path bug
Mar 11, 2026
111459f
Helper files for flatKV cache
Mar 11, 2026
d40395f
add missing struct
Mar 11, 2026
c8e85d2
Merge branch 'main' into cjl/cache-auxilery
Mar 12, 2026
ed7e4b6
made suggested changes
Mar 12, 2026
5c46647
fix tests
Mar 12, 2026
be0d4f5
Merge branch 'main' into cjl/flatkv-cache
Mar 12, 2026
9ff2199
Merge branch 'cjl/cache-auxilery' into cjl/flatkv-cache
Mar 12, 2026
bb2fe7e
Made suggested change to cache structure
Mar 13, 2026
f4b8326
rename cache -> dbcache to avoid gitignore
Mar 13, 2026
4b2247b
Helper files for the flatKV cache implementation
Mar 13, 2026
36d7328
bugfix
Mar 13, 2026
d759a9b
Merge branch 'cjl/cache-auxilery-2' into cjl/flatkv-cache
Mar 16, 2026
4ba242b
fix merge problems
Mar 16, 2026
e19a998
refactor API
Mar 16, 2026
94ae673
made suggested changes
Mar 16, 2026
ed10a26
made suggested changes
Mar 16, 2026
81dfd46
fix bug
Mar 16, 2026
480839d
Merge branch 'main' into cjl/flatkv-cache
Mar 16, 2026
7835683
Implement a standard cache.
Mar 16, 2026
950197c
cleanup
Mar 16, 2026
cff96ab
Merge branch 'main' into cjl/cache-impl
Mar 17, 2026
003fcc9
made suggested changes
Mar 17, 2026
a208a1b
made suggested change
Mar 17, 2026
157a600
made suggested changes
Mar 17, 2026
b41639f
fix unit test
Mar 17, 2026
fe31475
fix unit test
Mar 17, 2026
0702197
Merge branch 'cjl/cache-impl' into cjl/flatkv-cache
Mar 17, 2026
6d435f5
Merge branch 'main' into cjl/flatkv-cache
Mar 20, 2026
64f8530
fixed merge bugs
Mar 20, 2026
d9c5fc1
fix teardown race
Mar 20, 2026
14593ec
Add logging metric, clean up log files before/after run
Mar 20, 2026
2d88076
fix unit test
Mar 20, 2026
ccad074
fix unit tests
Mar 20, 2026
38ffd35
fix unit test
Mar 20, 2026
f143d30
made suggested changes
Mar 25, 2026
a18fd93
config changes
Mar 25, 2026
34e711d
made suggested changes
Mar 25, 2026
b596f89
Merge branch 'main' into cjl/flatkv-cache
Mar 25, 2026
33378ce
bugfix
Mar 27, 2026
a402238
don't ignore errors from batch get
Mar 30, 2026
b18bc4e
Merge branch 'main' into cjl/flatkv-cache
Mar 30, 2026
ee30ca9
made suggested changes
Mar 31, 2026
c8fd5ec
Merge branch 'main' into cjl/flatkv-cache
Mar 31, 2026
8094375
Merge branch 'main' into cjl/flatkv-cache
Mar 31, 2026
faf4871
make suggested change to pool
Apr 1, 2026
9c8454f
Merge branch 'main' into cjl/flatkv-cache
Apr 1, 2026
872b0a6
fix merge problem
Apr 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions sei-db/common/utils/chan_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package utils

import (
"context"
"fmt"
)

// TODO unit test before merge

// Push to a channel, returning an error if the context is cancelled before the value is pushed.
func InterruptiblePush[T any](ctx context.Context, ch chan T, value T) error {
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled: %w", ctx.Err())
case ch <- value:
return nil
}
}

// Pull from a channel, returning an error if the context is cancelled before the value is pulled.
func InterruptiblePull[T any](ctx context.Context, ch <-chan T) (T, error) {
var zero T
select {
case <-ctx.Done():
return zero, fmt.Errorf("context cancelled: %w", ctx.Err())
case value, ok := <-ch:
if !ok {
return zero, fmt.Errorf("channel closed")
}
return value, nil
}
}
32 changes: 25 additions & 7 deletions sei-db/db_engine/pebbledb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,55 @@ package pebbledb

import (
	"bytes"

	"github.com/cockroachdb/pebble/v2"
	"github.com/sei-protocol/sei-chain/sei-db/db_engine/pebbledb/flatcache"
	"github.com/sei-protocol/sei-chain/sei-db/db_engine/types"
)

// pebbleBatch wraps a Pebble batch for atomic writes.
// Important: Callers must call Close() after Commit() to release batch resources,
// even if Commit() succeeds. Failure to Close() will leak memory.
type pebbleBatch struct {
b *pebble.Batch
b *pebble.Batch
cache flatcache.Cache

// Writes are tracked so the cache can be updated after a successful commit.
pendingCacheUpdates []flatcache.CacheUpdate
}

var _ types.Batch = (*pebbleBatch)(nil)

func newPebbleBatch(db *pebble.DB) *pebbleBatch {
return &pebbleBatch{b: db.NewBatch()}
func newPebbleBatch(db *pebble.DB, cache flatcache.Cache) *pebbleBatch {
return &pebbleBatch{b: db.NewBatch(), cache: cache}
}

func (p *pebbleDB) NewBatch() types.Batch {
return newPebbleBatch(p.db)
return newPebbleBatch(p.db, p.cache)
}

// Set stages key/value in the pebble batch and records a matching cache
// update to be published after a successful Commit.
func (pb *pebbleBatch) Set(key, value []byte) error {
	// Pebble copies key and value into the batch, so callers are free to
	// reuse those buffers after Set returns. The pending cache update must
	// therefore hold its own copies, otherwise a caller mutating the slices
	// before Commit would corrupt what gets written into the cache.
	pb.pendingCacheUpdates = append(pb.pendingCacheUpdates, flatcache.CacheUpdate{
		Key:   bytes.Clone(key),
		Value: bytes.Clone(value),
	})
	// Durability options are applied on Commit.
	return pb.b.Set(key, value, nil)
}

// Delete stages a deletion of key in the pebble batch and records a matching
// cache eviction to be published after a successful Commit.
func (pb *pebbleBatch) Delete(key []byte) error {
	// Clone the key: pebble copies it into the batch, so the caller may reuse
	// the buffer before Commit; the pending cache update needs a stable copy.
	pb.pendingCacheUpdates = append(pb.pendingCacheUpdates, flatcache.CacheUpdate{
		Key:      bytes.Clone(key),
		IsDelete: true,
	})
	// Durability options are applied on Commit.
	return pb.b.Delete(key, nil)
}

// Commit flushes the batch to pebble and, only on success, replays the
// tracked writes into the cache. When the pebble commit fails, the pending
// updates are left in place so nothing stale reaches the cache.
func (pb *pebbleBatch) Commit(opts types.WriteOptions) error {
	if err := pb.b.Commit(toPebbleWriteOpts(opts)); err != nil {
		return err
	}
	updates := pb.pendingCacheUpdates
	pb.pendingCacheUpdates = nil
	pb.cache.BatchSet(updates)
	return nil
}

func (pb *pebbleBatch) Len() int {
Expand All @@ -42,6 +59,7 @@ func (pb *pebbleBatch) Len() int {

// Reset clears the underlying pebble batch and discards any cache updates
// tracked since the last Commit or Reset, returning the batch to an empty,
// reusable state.
func (pb *pebbleBatch) Reset() {
	pb.b.Reset()
	pb.pendingCacheUpdates = nil
}

func (pb *pebbleBatch) Close() error {
Expand Down
73 changes: 60 additions & 13 deletions sei-db/db_engine/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

errorutils "github.com/sei-protocol/sei-chain/sei-db/common/errors"
"github.com/sei-protocol/sei-chain/sei-db/common/metrics"
"github.com/sei-protocol/sei-chain/sei-db/db_engine/pebbledb/flatcache"
"github.com/sei-protocol/sei-chain/sei-db/db_engine/types"
)

Expand All @@ -23,6 +24,7 @@ const metricsScrapeInterval = 10 * time.Second
type pebbleDB struct {
db *pebble.DB
metricsCancel context.CancelFunc
cache flatcache.Cache
}

var _ types.KeyValueDB = (*pebbleDB)(nil)
Expand All @@ -44,11 +46,12 @@ func Open(
}
}

cache := pebble.NewCache(1024 * 1024 * 512) // 512MB cache
defer cache.Unref()
// Internal pebbleDB cache, used to cache pages in memory. // TODO verify accuracy of this statement
pebbleCache := pebble.NewCache(1024 * 1024 * 512) // 512MB cache
defer pebbleCache.Unref()

popts := &pebble.Options{
Cache: cache,
Cache: pebbleCache,
Comparer: cmp,
// FormatMajorVersion is pinned to a specific version to prevent accidental
// breaking changes when updating the pebble dependency. Using FormatNewest
Expand Down Expand Up @@ -92,33 +95,77 @@ func Open(
return nil, err
}

readFunction := func(key []byte) []byte { // TODO error handling!
val, closer, err := db.Get(key)
if err != nil {
return nil
}
cloned := bytes.Clone(val)
_ = closer.Close()
return cloned
}

// A high level cache per key.
cache, err := flatcache.NewCache(
ctx,
readFunction,
8,
1024*1024*1024,
20,
64,
10*time.Second)
if err != nil {
return nil, fmt.Errorf("failed to create flatcache: %w", err)
}

ctx, cancel := context.WithCancel(ctx)
if enableMetrics {
metrics.NewPebbleMetrics(ctx, db, filepath.Base(path), metricsScrapeInterval)
}

return &pebbleDB{db: db, metricsCancel: cancel}, nil
return &pebbleDB{
db: db,
metricsCancel: cancel,
cache: cache,
}, nil
}

// Get returns the value stored for key, routed through the flatcache layer
// (which falls back to pebble on a miss via the read function wired in Open).
// It returns errorutils.ErrNotFound when the key does not exist.
func (p *pebbleDB) Get(key []byte) ([]byte, error) {
	val, found, err := p.cache.Get(key, true)
	if err != nil {
		// NOTE(review): kept from the pre-cache path — confirm the cache can
		// actually surface pebble.ErrNotFound; the read function in Open
		// appears to swallow read errors, which may make this branch dead.
		if errors.Is(err, pebble.ErrNotFound) {
			return nil, errorutils.ErrNotFound
		}
		return nil, fmt.Errorf("failed to get value from cache: %w", err)
	}
	if !found {
		return nil, errorutils.ErrNotFound
	}

	return val, nil
}

// BatchGet resolves every requested key through the cache, writing results
// into the supplied map in place. Per the flatcache.Cache contract, the map
// must not be read or mutated concurrently with this call.
func (p *pebbleDB) BatchGet(keys map[string]types.BatchGetResult) {
	p.cache.BatchGet(keys)
}

// Set writes key/value to pebble and, on success, mirrors the write into the
// cache. The cache is updated only AFTER the DB write succeeds — updating it
// first (as the previous version did) could leave the cache claiming a value
// the DB never durably stored. This also matches pebbleBatch.Commit, which
// publishes cache updates only after a successful commit.
// TODO batch set!
func (p *pebbleDB) Set(key, value []byte, opts types.WriteOptions) error {
	if err := p.db.Set(key, value, toPebbleWriteOpts(opts)); err != nil {
		return err
	}
	p.cache.Set(key, value)
	return nil
}

// Delete removes key from pebble and, on success, evicts it from the cache.
// As with Set, the cache is touched only AFTER the DB operation succeeds so a
// failed delete cannot desynchronize the cache from the DB.
// TODO batch delete!
func (p *pebbleDB) Delete(key []byte, opts types.WriteOptions) error {
	if err := p.db.Delete(key, toPebbleWriteOpts(opts)); err != nil {
		return err
	}
	p.cache.Delete(key)
	return nil
}

Expand Down
45 changes: 45 additions & 0 deletions sei-db/db_engine/pebbledb/flatcache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package flatcache // TODO rename the flatcache package!

import "github.com/sei-protocol/sei-chain/sei-db/db_engine/types"

// CacheUpdate describes a single key-value mutation to apply to the cache.
type CacheUpdate struct {
	// Key is the key to update.
	Key []byte
	// Value is the value to store when IsDelete is false; it is not consulted
	// for deletions. NOTE(review): the previous comment said a nil Value
	// deletes the key, which contradicts IsDelete below — deletion is
	// signalled via IsDelete (see pebbleBatch.Delete); confirm against the
	// cache implementation.
	Value []byte
	// IsDelete selects the operation: if true, Key is deleted;
	// if false, Key is set to Value.
	IsDelete bool
}

// Cache describes a cache capable of being used by a FlatKV store.
type Cache interface {

	// TODO decide if we should support individual modifications

	// Get returns the value for the given key, or (nil, false) if not found.
	Get(
		// The entry to fetch.
		key []byte,
		// If true, the LRU queue will be updated. If false, the LRU queue will not be updated.
		// Useful for when an operation is performed multiple times in close succession on the same key,
		// since updating the LRU has non-zero overhead for little benefit in that case.
		updateLru bool,
	) ([]byte, bool, error)

	// BatchGet performs a batch read operation. Given a map of keys to read,
	// it performs the reads and updates the map in place with the results.
	//
	// It is not thread safe to read or mutate the map while this method is running.
	BatchGet(keys map[string]types.BatchGetResult)

	// Set sets the value for the given key.
	Set(key []byte, value []byte)

	// Delete deletes the value for the given key.
	Delete(key []byte)

	// BatchSet applies the given updates to the cache.
	BatchSet(updates []CacheUpdate)
}
Loading
Loading