Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251001021608-1fe7b43fc4d6 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.20.0 // indirect
github.com/casbin/govaluate v1.8.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/consensys/gnark-crypto v0.19.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.7 // indirect
Expand Down Expand Up @@ -74,7 +75,7 @@ require (
github.com/itchyny/timefmt-go v0.1.7 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/kilic/bls12-381 v0.1.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/mailru/easyjson v0.9.0 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand All @@ -83,7 +84,8 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/pk910/dynamic-ssz v0.0.5 // indirect
github.com/pk910/dynamic-ssz v1.2.1 // indirect
github.com/pk910/hashtree-bindings v0.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
Expand Down Expand Up @@ -115,10 +117,11 @@ require (
golang.org/x/tools v0.38.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/Knetic/govaluate.v3 v3.0.0 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
modernc.org/libc v1.66.3 // indirect
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.11.0 // indirect
modernc.org/sqlite v1.38.2 // indirect
)

replace github.com/attestantio/go-eth2-client => github.com/pk910/go-eth2-client v0.0.0-20260219114320-6080c2df7e2f
18 changes: 10 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251001021608
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251001021608-1fe7b43fc4d6/go.mod h1:ioLG6R+5bUSO1oeGSDxOV3FADARuMoytZCSX6MEMQkI=
github.com/VictoriaMetrics/fastcache v1.13.0 h1:AW4mheMR5Vd9FkAPUv+NH6Nhw+fmbTMGMsNAoA/+4G0=
github.com/VictoriaMetrics/fastcache v1.13.0/go.mod h1:hHXhl4DA2fTL2HTZDJFXWgW0LNjo6B+4aj2Wmng3TjU=
github.com/attestantio/go-eth2-client v0.28.0 h1:2zIIIMPvSD+g6h3TgVXsoda/Yw3e+wjo1e8CZEanORU=
github.com/attestantio/go-eth2-client v0.28.0/go.mod h1:PO9sHFCq+1RiG+Eh3eOR2GYvYV64Qzg7idM3kLgCs5k=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.20.0 h1:2F+rfL86jE2d/bmw7OhqUg2Sj/1rURkBn3MdfoPyRVU=
github.com/bits-and-blooms/bitset v1.20.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/casbin/govaluate v1.8.0 h1:1dUaV/I0LFP2tcY1uNQEb6wBCbp8GMTcC/zhwQDWvZo=
github.com/casbin/govaluate v1.8.0/go.mod h1:G/UnbIjZk/0uMNaLwZZmFQrR72tYRZWQkO70si/iR7A=
github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk=
github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down Expand Up @@ -192,8 +192,8 @@ github.com/kilic/bls12-381 v0.1.0 h1:encrdjqKMEvabVQ7qYOKu1OvhqpK4s47wDYtNiPtlp4
github.com/kilic/bls12-381 v0.1.0/go.mod h1:vDTTHJONJ6G+P2R74EhnyotQDTliQDnFEwhdmfzw1ig=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY=
github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
Expand Down Expand Up @@ -251,8 +251,12 @@ github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
github.com/pion/transport/v3 v3.0.1 h1:gDTlPJwROfSfz6QfSi0ZmeCSkFcnWWiiR9ES0ouANiM=
github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0=
github.com/pk910/dynamic-ssz v0.0.5 h1:VP9heGYUwzlpyhk28P2nCAzhvGsePJOOOO5vQMDh2qQ=
github.com/pk910/dynamic-ssz v0.0.5/go.mod h1:b6CrLaB2X7pYA+OSEEbkgXDEcRnjLOZIxZTsMuO/Y9c=
github.com/pk910/dynamic-ssz v1.2.1 h1:84eNMiiOYDiNC2Y1m5A/UtIPs6u/9SsvG4RVSBRGE5U=
github.com/pk910/dynamic-ssz v1.2.1/go.mod h1:HXRWLNcgj3DL65Kznrb+RdL3DEKw2JBZ/6crooqGoII=
github.com/pk910/go-eth2-client v0.0.0-20260219114320-6080c2df7e2f h1:OjX1YemePnLes4JqeDFIJOZN9YPCc4R6cyvyXFtVA6c=
github.com/pk910/go-eth2-client v0.0.0-20260219114320-6080c2df7e2f/go.mod h1:8fpxrIBBVbOcVG3vcHe5ubOHIeqW3N5t7kS4oU5EeJU=
github.com/pk910/hashtree-bindings v0.0.1 h1:Sw+UlPlrBle4LUg04kqLFybVQcfmamwKL1QsrR3GU0g=
github.com/pk910/hashtree-bindings v0.0.1/go.mod h1:eayIpxMFkWzMsydESu/5bV8wglZzSE/c9mq6DQdn204=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -412,8 +416,6 @@ golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSm
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/Knetic/govaluate.v3 v3.0.0 h1:18mUyIt4ZlRlFZAAfVetz4/rzlJs9yhN+U02F4u1AOc=
gopkg.in/Knetic/govaluate.v3 v3.0.0/go.mod h1:csKLBORsPbafmSCGTEh3U7Ozmsuq8ZSIlKk1bcqph0E=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
67 changes: 47 additions & 20 deletions pkg/clients/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"runtime/debug"
"time"

"github.com/attestantio/go-eth2-client/spec"
"github.com/ethereum/go-ethereum/common"
"github.com/ethpandaops/assertoor/pkg/clients/consensus"
"github.com/ethpandaops/assertoor/pkg/clients/execution"
Expand Down Expand Up @@ -123,35 +124,61 @@ func (pool *ClientPool) processConsensusBlockNotification(poolClient *PoolClient
}
}()

subscription := poolClient.ConsensusClient.SubscribeBlockEvent(100)
defer subscription.Unsubscribe()
blockSubscription := poolClient.ConsensusClient.SubscribeBlockEvent(100)
defer blockSubscription.Unsubscribe()

payloadSubscription := pool.consensusPool.GetBlockCache().SubscribePayloadEvent(100)
defer payloadSubscription.Unsubscribe()

for {
select {
case <-pool.ctx.Done():
return
case block := <-subscription.Channel():
versionedBlock := block.AwaitBlock(context.Background(), 2*time.Second)
if versionedBlock == nil {
pool.logger.Warnf("cl/el block notification failed: AwaitBlock timeout (client: %v, slot: %v, root: 0x%x)", poolClient.Config.Name, block.Slot, block.Root)
break
}
case block := <-blockSubscription.Channel():
pool.notifyELBlockFromBeaconBlock(poolClient, block)
case block := <-payloadSubscription.Channel():
pool.notifyELBlockFromPayload(poolClient, block)
}
}
}

hash, err := versionedBlock.ExecutionBlockHash()
if err != nil {
pool.logger.Warnf("cl/el block notification failed: %s (client: %v, slot: %v, root: 0x%x)", err, poolClient.Config.Name, block.Slot, block.Root)
break
}
func (pool *ClientPool) notifyELBlockFromBeaconBlock(poolClient *PoolClient, block *consensus.Block) {
versionedBlock := block.AwaitBlock(context.Background(), 2*time.Second)
if versionedBlock == nil {
pool.logger.Warnf("cl/el block notification failed: AwaitBlock timeout (client: %v, slot: %v, root: 0x%x)", poolClient.Config.Name, block.Slot, block.Root)
return
}

number, err := versionedBlock.ExecutionBlockNumber()
if err != nil {
pool.logger.Warnf("cl/el block notification failed: %s (client: %v, slot: %v, root: 0x%x)", err, poolClient.Config.Name, block.Slot, block.Root)
break
}
// For gloas+ blocks, EL info comes from the payload, not the block body
if versionedBlock.Version >= spec.DataVersionGloas {
return
}

poolClient.ExecutionClient.NotifyNewBlock(common.Hash(hash), number)
}
hash, err := versionedBlock.ExecutionBlockHash()
if err != nil {
pool.logger.Warnf("cl/el block notification failed: %s (client: %v, slot: %v, root: 0x%x)", err, poolClient.Config.Name, block.Slot, block.Root)
return
}

number, err := versionedBlock.ExecutionBlockNumber()
if err != nil {
pool.logger.Warnf("cl/el block notification failed: %s (client: %v, slot: %v, root: 0x%x)", err, poolClient.Config.Name, block.Slot, block.Root)
return
}

poolClient.ExecutionClient.NotifyNewBlock(common.Hash(hash), number)
}

func (pool *ClientPool) notifyELBlockFromPayload(poolClient *PoolClient, block *consensus.Block) {
payload := block.GetPayload()
if payload == nil || payload.Message == nil || payload.Message.Payload == nil {
return
}

hash := common.Hash(payload.Message.Payload.BlockHash)
number := payload.Message.Payload.BlockNumber

poolClient.ExecutionClient.NotifyNewBlock(hash, number)
}

func (pool *ClientPool) GetConsensusPool() *consensus.Pool {
Expand Down
68 changes: 58 additions & 10 deletions pkg/clients/consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@ import (
"time"

"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/gloas"
"github.com/attestantio/go-eth2-client/spec/phase0"
)

type Block struct {
Root phase0.Root
Slot phase0.Slot
headerMutex sync.Mutex
headerChan chan bool
header *phase0.SignedBeaconBlockHeader
blockMutex sync.Mutex
blockChan chan bool
block *spec.VersionedSignedBeaconBlock
seenMutex sync.RWMutex
seenMap map[uint16]*Client
Root phase0.Root
Slot phase0.Slot
headerMutex sync.Mutex
headerChan chan bool
header *phase0.SignedBeaconBlockHeader
blockMutex sync.Mutex
blockChan chan bool
block *spec.VersionedSignedBeaconBlock
payloadMutex sync.Mutex
payloadChan chan bool
payload *gloas.SignedExecutionPayloadEnvelope
seenMutex sync.RWMutex
seenMap map[uint16]*Client
}

func (block *Block) GetSeenBy() []*Client {
Expand Down Expand Up @@ -140,3 +144,47 @@ func (block *Block) EnsureBlock(loadBlock func() (*spec.VersionedSignedBeaconBlo

return true, nil
}

// GetPayload returns the execution payload envelope if available.
func (block *Block) GetPayload() *gloas.SignedExecutionPayloadEnvelope {
return block.payload
}

// AwaitPayload waits for the execution payload envelope to become available.
func (block *Block) AwaitPayload(ctx context.Context, timeout time.Duration) *gloas.SignedExecutionPayloadEnvelope {
if ctx == nil {
ctx = context.Background()
}

select {
case <-block.payloadChan:
case <-time.After(timeout):
case <-ctx.Done():
}

return block.payload
}

// EnsurePayload loads and sets the execution payload envelope if not already set.
func (block *Block) EnsurePayload(loadPayload func() (*gloas.SignedExecutionPayloadEnvelope, error)) (bool, error) {
if block.payload != nil {
return false, nil
}

block.payloadMutex.Lock()
defer block.payloadMutex.Unlock()

if block.payload != nil {
return false, nil
}

payload, err := loadPayload()
if err != nil {
return false, err
}

block.payload = payload
close(block.payloadChan)

return true, nil
}
32 changes: 31 additions & 1 deletion pkg/clients/consensus/block_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"

"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/gloas"
)

func GetExecutionExtraData(v *spec.VersionedSignedBeaconBlock) ([]byte, error) {
Expand All @@ -23,15 +24,38 @@ func GetExecutionExtraData(v *spec.VersionedSignedBeaconBlock) ([]byte, error) {
return v.Capella.Message.Body.ExecutionPayload.ExtraData, nil
case spec.DataVersionDeneb:
if v.Deneb == nil || v.Deneb.Message == nil || v.Deneb.Message.Body == nil || v.Deneb.Message.Body.ExecutionPayload == nil {
return nil, errors.New("no denb block")
return nil, errors.New("no deneb block")
}

return v.Deneb.Message.Body.ExecutionPayload.ExtraData, nil
case spec.DataVersionElectra:
if v.Electra == nil || v.Electra.Message == nil || v.Electra.Message.Body == nil || v.Electra.Message.Body.ExecutionPayload == nil {
return nil, errors.New("no electra block")
}

return v.Electra.Message.Body.ExecutionPayload.ExtraData, nil
case spec.DataVersionFulu:
if v.Fulu == nil || v.Fulu.Message == nil || v.Fulu.Message.Body == nil || v.Fulu.Message.Body.ExecutionPayload == nil {
return nil, errors.New("no fulu block")
}

return v.Fulu.Message.Body.ExecutionPayload.ExtraData, nil
case spec.DataVersionGloas:
return nil, errors.New("gloas extra data is in separate payload")
default:
return nil, errors.New("unknown version")
}
}

// GetPayloadExtraData returns the extra data from a gloas execution payload envelope.
func GetPayloadExtraData(payload *gloas.SignedExecutionPayloadEnvelope) ([]byte, error) {
if payload == nil || payload.Message == nil || payload.Message.Payload == nil {
return nil, errors.New("no payload")
}

return payload.Message.Payload.ExtraData, nil
}

func GetBlockBody(v *spec.VersionedSignedBeaconBlock) any {
//nolint:exhaustive // ignore
switch v.Version {
Expand All @@ -45,6 +69,12 @@ func GetBlockBody(v *spec.VersionedSignedBeaconBlock) any {
return v.Capella
case spec.DataVersionDeneb:
return v.Deneb
case spec.DataVersionElectra:
return v.Electra
case spec.DataVersionFulu:
return v.Fulu
case spec.DataVersionGloas:
return v.Gloas
default:
return nil
}
Expand Down
20 changes: 15 additions & 5 deletions pkg/clients/consensus/blockcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type BlockCache struct {
maxSlotIdx int64

blockDispatcher Dispatcher[*Block]
payloadDispatcher Dispatcher[*Block]
checkpointDispatcher Dispatcher[*FinalizedCheckpoint]
wallclockEpochDispatcher Dispatcher[*ethwallclock.Epoch]
wallclockSlotDispatcher Dispatcher[*ethwallclock.Slot]
Expand Down Expand Up @@ -99,6 +100,14 @@ func (cache *BlockCache) notifyBlockReady(block *Block) {
cache.blockDispatcher.Fire(block)
}

func (cache *BlockCache) SubscribePayloadEvent(capacity int) *Subscription[*Block] {
return cache.payloadDispatcher.Subscribe(capacity)
}

func (cache *BlockCache) notifyPayloadReady(block *Block) {
cache.payloadDispatcher.Fire(block)
}

func (cache *BlockCache) SetMinFollowDistance(followDistance uint64) {
if followDistance > 10000 {
followDistance = 10000
Expand Down Expand Up @@ -266,11 +275,12 @@ func (cache *BlockCache) AddBlock(root phase0.Root, slot phase0.Slot) (*Block, b
}

cacheBlock := &Block{
Root: root,
Slot: slot,
seenMap: make(map[uint16]*Client),
headerChan: make(chan bool),
blockChan: make(chan bool),
Root: root,
Slot: slot,
seenMap: make(map[uint16]*Client),
headerChan: make(chan bool),
blockChan: make(chan bool),
payloadChan: make(chan bool),
}
cache.blockRootMap[root] = cacheBlock

Expand Down
13 changes: 13 additions & 0 deletions pkg/clients/consensus/chainspec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,24 @@ type ChainSpec struct {
BellatrixForkEpoch uint64 `yaml:"BELLATRIX_FORK_EPOCH"`
CappellaForkVersion phase0.Version `yaml:"CAPELLA_FORK_VERSION"`
CappellaForkEpoch uint64 `yaml:"CAPELLA_FORK_EPOCH"`
DenebForkEpoch uint64 `yaml:"DENEB_FORK_EPOCH"`
ElectraForkEpoch uint64 `yaml:"ELECTRA_FORK_EPOCH"`
FuluForkEpoch uint64 `yaml:"FULU_FORK_EPOCH"`
GloasForkEpoch uint64 `yaml:"GLOAS_FORK_EPOCH"`
SecondsPerSlot time.Duration `yaml:"SECONDS_PER_SLOT"`
SlotsPerEpoch uint64 `yaml:"SLOTS_PER_EPOCH"`
MaxCommitteesPerSlot uint64 `yaml:"MAX_COMMITTEES_PER_SLOT"`
}

// IsGloasActive returns true if the gloas fork is active at the given slot.
func (chain *ChainSpec) IsGloasActive(slot phase0.Slot) bool {
if chain.GloasForkEpoch == 0 || chain.SlotsPerEpoch == 0 {
return false
}

return uint64(slot) >= chain.GloasForkEpoch*chain.SlotsPerEpoch
}

func (chain *ChainSpec) CheckMismatch(chain2 *ChainSpec) []string {
mismatches := []string{}

Expand Down
Loading