Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,16 @@ jobs:
- name: Install just
uses: extractions/setup-just@v3
- name: E2E Tests
run: just test-e2e
run: |
mkdir -p .artifacts/e2e-server-logs
EV_E2E_LOG_DIR="${{ github.workspace }}/.artifacts/e2e-server-logs" just test-e2e
- name: Upload E2E server logs
if: failure()
uses: actions/upload-artifact@v4
with:
name: e2e-server-logs-${{ github.sha }}
path: ./.artifacts/e2e-server-logs
if-no-files-found: warn

evm-tests:
name: Run EVM Execution Tests
Expand Down
60 changes: 38 additions & 22 deletions execution/evm/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,24 +756,33 @@ func (c *EngineClient) reconcileExecutionAtHeight(ctx context.Context, height ui
// If we have a started execution with a payloadID, validate it still exists before resuming.
// After node restart, the EL's payload cache is ephemeral and the payloadID may be stale.
if execMeta.Stage == ExecStageStarted && len(execMeta.PayloadID) == 8 {
var pid engine.PayloadID
copy(pid[:], execMeta.PayloadID)
requestedTxHash := hashTxs(txs)
if execMeta.Timestamp != timestamp.Unix() || !bytes.Equal(execMeta.TxHash, requestedTxHash) {
Comment on lines +759 to +760
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Make the resume fingerprint unambiguous.

hashTxs() hashes the raw byte concatenation, so different ordered transaction slices can share the same digest ([ab,c] vs [a,bc]). The equality check on Line 760 can then resume a payload built from different inputs. Hash a length-delimited encoding instead.

Suggested fix
+import "encoding/binary"
 ...
 func hashTxs(txs [][]byte) []byte {
 	if len(txs) == 0 {
 		return nil
 	}
 
 	h := sha256.New()
+	var lenBuf [8]byte
 	for _, tx := range txs {
-		h.Write(tx)
+		binary.LittleEndian.PutUint64(lenBuf[:], uint64(len(tx)))
+		_, _ = h.Write(lenBuf[:])
+		_, _ = h.Write(tx)
 	}
 
 	return h.Sum(nil)
 }

Also applies to: 1048-1058

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@execution/evm/execution.go` around lines 759 - 760, The resume fingerprint is
ambiguous because hashTxs() currently hashes raw concatenation of tx bytes;
change hashTxs to compute a length-delimited digest (prefix each transaction
with its length using a fixed or varint encoding, then hash the sequence) so
different segmentations/orderings cannot collide, and update the comparisons
that use hashTxs (where execMeta.TxHash is compared to requestedTxHash alongside
execMeta.Timestamp/timestamp.Unix()) to use the new length-delimited hash
function; also apply the same replacement to the other places that call hashTxs
(the later equality check block referenced around the second usage) so all
resume checks use the unambiguous, length-prefixed hash.

c.logger.Warn().
Uint64("height", height).
Int64("execmeta_timestamp", execMeta.Timestamp).
Int64("requested_timestamp", timestamp.Unix()).
Msg("ExecuteTxs: ignoring stale in-progress execution for different block inputs")
} else {
var pid engine.PayloadID
copy(pid[:], execMeta.PayloadID)

// Validate payload still exists by attempting to retrieve it
if _, err = c.engineClient.GetPayload(ctx, pid); err == nil {
c.logger.Info().
// Validate payload still exists by attempting to retrieve it
if _, err = c.engineClient.GetPayload(ctx, pid); err == nil {
c.logger.Info().
Uint64("height", height).
Str("stage", execMeta.Stage).
Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
return nil, &pid, true, nil
}
// Payload is stale (expired or node restarted) - proceed with fresh execution
c.logger.Debug().
Uint64("height", height).
Str("stage", execMeta.Stage).
Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
return nil, &pid, true, nil
Str("payloadID", pid.String()).
Err(err).
Msg("ExecuteTxs: stale ExecMeta payloadID no longer valid in EL, will re-execute")
// Don't return - fall through to fresh execution
}
// Payload is stale (expired or node restarted) - proceed with fresh execution
c.logger.Debug().
Uint64("height", height).
Str("payloadID", pid.String()).
Err(err).
Msg("ExecuteTxs: stale ExecMeta payloadID no longer valid in EL, will re-execute")
// Don't return - fall through to fresh execution
}
}

Expand Down Expand Up @@ -1023,13 +1032,7 @@ func (c *EngineClient) saveExecMeta(ctx context.Context, height uint64, timestam
}

// Compute tx hash for sanity checks on retry
if len(txs) > 0 {
h := sha256.New()
for _, tx := range txs {
h.Write(tx)
}
execMeta.TxHash = h.Sum(nil)
}
execMeta.TxHash = hashTxs(txs)

if err := c.store.SaveExecMeta(ctx, execMeta); err != nil {
c.logger.Warn().Err(err).Uint64("height", height).Msg("saveExecMeta: failed to save exec meta")
Expand All @@ -1042,6 +1045,19 @@ func (c *EngineClient) saveExecMeta(ctx context.Context, height uint64, timestam
Msg("saveExecMeta: saved execution metadata")
}

// hashTxs returns a SHA-256 digest that uniquely identifies an ordered
// transaction list, used as a fingerprint for ExecMeta resume checks.
//
// Each transaction is prefixed with its 8-byte little-endian length before
// being hashed, so differently segmented inputs (e.g. ["ab","c"] vs
// ["a","bc"]) can never produce the same digest — raw concatenation would.
// Returns nil for an empty list.
func hashTxs(txs [][]byte) []byte {
	if len(txs) == 0 {
		return nil
	}

	h := sha256.New()
	var lenBuf [8]byte
	for _, tx := range txs {
		// Little-endian length prefix, written without encoding/binary to
		// avoid a new import.
		l := uint64(len(tx))
		for i := range lenBuf {
			lenBuf[i] = byte(l >> (8 * i))
		}
		h.Write(lenBuf[:])
		h.Write(tx)
	}

	return h.Sum(nil)
}

// GetLatestHeight returns the current block height of the execution layer
func (c *EngineClient) GetLatestHeight(ctx context.Context) (uint64, error) {
header, err := c.ethClient.HeaderByNumber(ctx, nil) // nil = latest block
Expand Down
130 changes: 130 additions & 0 deletions execution/evm/execution_reconcile_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package evm

import (
"context"
"errors"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/core/types"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
)

func TestReconcileExecutionAtHeight_StartedExecMeta(t *testing.T) {
t.Parallel()

specs := map[string]struct {
execMetaTimestamp int64
execMetaTxs [][]byte
requestedTxs [][]byte
requestedTime time.Time
expectFound bool
expectPayloadID bool
expectGetPayloads int
}{
"resume_when_inputs_match": {
execMetaTimestamp: 1700000012,
execMetaTxs: [][]byte{[]byte("tx-1")},
requestedTxs: [][]byte{[]byte("tx-1")},
requestedTime: time.Unix(1700000012, 0),
expectFound: true,
expectPayloadID: true,
expectGetPayloads: 1,
},
"ignore_when_timestamp_differs": {
execMetaTimestamp: 1700000010,
execMetaTxs: [][]byte{[]byte("tx-1")},
requestedTxs: [][]byte{[]byte("tx-1")},
requestedTime: time.Unix(1700000012, 0),
expectFound: false,
expectPayloadID: false,
expectGetPayloads: 0,
},
"ignore_when_txs_differ": {
execMetaTimestamp: 1700000012,
execMetaTxs: [][]byte{[]byte("tx-old")},
requestedTxs: [][]byte{[]byte("tx-new")},
requestedTime: time.Unix(1700000012, 0),
expectFound: false,
expectPayloadID: false,
expectGetPayloads: 0,
},
}

for name, spec := range specs {
t.Run(name, func(t *testing.T) {
t.Parallel()
Comment on lines +59 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify current nondeterministic table pattern in this test file.
rg -n -C2 'specs := map\[string\]struct|for name, spec := range specs|t\.Run\(' execution/evm/execution_reconcile_test.go

Repository: evstack/ev-node

Length of output: 283


Use a slice-backed table to keep subtest execution order deterministic.

Map iteration in Go is randomized, so line 59's for name, spec := range specs produces non-deterministic subtest ordering. Refactor to use a []struct{...} table with a name field to satisfy the test determinism requirement.

Suggested refactor
- specs := map[string]struct {
+ specs := []struct {
+   name              string
    execMetaTimestamp int64
    execMetaTxs       [][]byte
    requestedTxs      [][]byte
    requestedTime     time.Time
    expectFound       bool
    expectPayloadID   bool
    expectGetPayloads int
- }{
-   "resume_when_inputs_match": {
+ }{
+   {
+     name:              "resume_when_inputs_match",
      execMetaTimestamp: 1700000012,
      execMetaTxs:       [][]byte{[]byte("tx-1")},
      requestedTxs:      [][]byte{[]byte("tx-1")},
      requestedTime:     time.Unix(1700000012, 0),
      expectFound:       true,
      expectPayloadID:   true,
      expectGetPayloads: 1,
    },
-   "ignore_when_timestamp_differs": {
+   {
+     name:              "ignore_when_timestamp_differs",
      execMetaTimestamp: 1700000010,
      execMetaTxs:       [][]byte{[]byte("tx-1")},
      requestedTxs:      [][]byte{[]byte("tx-1")},
      requestedTime:     time.Unix(1700000012, 0),
      expectFound:       false,
      expectPayloadID:   false,
      expectGetPayloads: 0,
    },
-   "ignore_when_txs_differ": {
+   {
+     name:              "ignore_when_txs_differ",
      execMetaTimestamp: 1700000012,
      execMetaTxs:       [][]byte{[]byte("tx-old")},
      requestedTxs:      [][]byte{[]byte("tx-new")},
      requestedTime:     time.Unix(1700000012, 0),
      expectFound:       false,
      expectPayloadID:   false,
      expectGetPayloads: 0,
    },
  }

- for name, spec := range specs {
-   t.Run(name, func(t *testing.T) {
+ for _, spec := range specs {
+   t.Run(spec.name, func(t *testing.T) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@execution/evm/execution_reconcile_test.go` around lines 59 - 61, The test
uses a map-backed table `specs` and iterates with `for name, spec := range
specs` which yields non-deterministic subtest order; refactor `specs` into a
slice-backed table (e.g. `[]struct { name string; spec <type> }`) and iterate
`for _, tc := range specsSlice { t.Run(tc.name, func(t *testing.T) {
t.Parallel(); spec := tc.spec; ... }) }` so subtest order is deterministic and
the loop-local `spec` is captured correctly; update any references to `specs`
and the `t.Run` closure accordingly.


store := NewEVMStore(dssync.MutexWrap(ds.NewMapDatastore()))
payloadID := engine.PayloadID{1, 2, 3, 4, 5, 6, 7, 8}
require.NoError(t, store.SaveExecMeta(t.Context(), &ExecMeta{
Height: 12,
PayloadID: payloadID[:],
TxHash: hashTxs(spec.execMetaTxs),
Timestamp: spec.execMetaTimestamp,
Stage: ExecStageStarted,
}))

engineRPC := &mockReconcileEngineRPCClient{
payloads: map[engine.PayloadID]*engine.ExecutionPayloadEnvelope{
payloadID: {},
},
}
client := &EngineClient{
engineClient: engineRPC,
ethClient: mockReconcileEthRPCClient{},
store: store,
logger: zerolog.Nop(),
}

stateRoot, gotPayloadID, found, err := client.reconcileExecutionAtHeight(t.Context(), 12, spec.requestedTime, spec.requestedTxs)

require.NoError(t, err)
require.Nil(t, stateRoot)
require.Equal(t, spec.expectFound, found)
require.Equal(t, spec.expectPayloadID, gotPayloadID != nil)
if spec.expectPayloadID {
require.Equal(t, payloadID, *gotPayloadID)
}
require.Equal(t, spec.expectGetPayloads, engineRPC.getPayloadCalls)
})
}
}

// mockReconcileEngineRPCClient is a test double for the engine RPC client.
// It serves payloads from a fixed map and counts GetPayload invocations;
// every other engine call is unexpected and returns an error.
type mockReconcileEngineRPCClient struct {
	payloads        map[engine.PayloadID]*engine.ExecutionPayloadEnvelope
	getPayloadCalls int
}

// ForkchoiceUpdated always fails: reconcile tests must not reach it.
func (m *mockReconcileEngineRPCClient) ForkchoiceUpdated(_ context.Context, _ engine.ForkchoiceStateV1, _ map[string]any) (*engine.ForkChoiceResponse, error) {
	return nil, errors.New("unexpected ForkchoiceUpdated call")
}

// GetPayload records the call and returns the canned payload, if any.
func (m *mockReconcileEngineRPCClient) GetPayload(_ context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) {
	m.getPayloadCalls++
	if payload, ok := m.payloads[payloadID]; ok {
		return payload, nil
	}

	return nil, errors.New("payload not found")
}

// NewPayload always fails: reconcile tests must not reach it.
func (m *mockReconcileEngineRPCClient) NewPayload(_ context.Context, _ *engine.ExecutableData, _ []string, _ string, _ [][]byte) (*engine.PayloadStatusV1, error) {
	return nil, errors.New("unexpected NewPayload call")
}

// mockReconcileEthRPCClient is a stub eth client for reconcile tests;
// every call fails, so the code under test must not depend on it.
type mockReconcileEthRPCClient struct{}

// HeaderByNumber always fails, regardless of the requested height.
func (mockReconcileEthRPCClient) HeaderByNumber(_ context.Context, _ *big.Int) (*types.Header, error) {
	err := errors.New("header not found")
	return nil, err
}

// GetTxs always fails: reconcile tests must not reach it.
func (mockReconcileEthRPCClient) GetTxs(_ context.Context) ([]string, error) {
	err := errors.New("unexpected GetTxs call")
	return nil, err
}
40 changes: 38 additions & 2 deletions node/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type failoverState struct {
dataSyncService *evsync.DataSyncService
rpcServer *http.Server
bc *block.Components
raftNode *raft.Node
isAggregator bool

// catchup fields — used when the aggregator needs to sync before producing
catchupEnabled bool
Expand Down Expand Up @@ -172,13 +174,34 @@ func setupFailoverState(
dataSyncService: dataSyncService,
rpcServer: rpcServer,
bc: bc,
raftNode: raftNode,
isAggregator: isAggregator,
store: rktStore,
catchupEnabled: catchupEnabled,
catchupTimeout: nodeConfig.Node.CatchupTimeout.Duration,
daBlockTime: nodeConfig.DA.BlockTime.Duration,
}, nil
}

// shouldStartSyncInPublisherMode reports whether the sync services should be
// started in publisher mode. Only a raft-leading aggregator whose local store
// is still empty (height 0) qualifies; in every other case — including when
// the height cannot be read — the regular blocking sync startup is kept.
func (f *failoverState) shouldStartSyncInPublisherMode(ctx context.Context) bool {
	isLeadingAggregator := f.isAggregator && f.raftNode != nil && f.raftNode.IsLeader()
	if !isLeadingAggregator {
		return false
	}

	localHeight, err := f.store.Height(ctx)
	switch {
	case err != nil:
		// Unknown height: err on the safe side and keep the default path.
		f.logger.Warn().Err(err).Msg("cannot determine local height; keeping blocking sync startup")
		return false
	case localHeight > 0:
		return false
	}

	f.logger.Info().
		Msg("raft leader with empty store: starting sync services in publisher mode")
	return true
}

func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
stopService := func(stoppable func(context.Context) error, name string) { //nolint:contextcheck // shutdown uses context.Background intentionally
// parent context is cancelled already, so we need to create a new one
Expand Down Expand Up @@ -207,15 +230,28 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
})

// start header and data sync services concurrently to avoid cumulative startup delay.
startSyncInPublisherMode := f.shouldStartSyncInPublisherMode(ctx)
syncWg, syncCtx := errgroup.WithContext(ctx)
syncWg.Go(func() error {
if err := f.headerSyncService.Start(syncCtx); err != nil {
var err error
if startSyncInPublisherMode {
err = f.headerSyncService.StartForPublishing(syncCtx)
} else {
err = f.headerSyncService.Start(syncCtx)
}
if err != nil {
return fmt.Errorf("header sync service: %w", err)
}
return nil
})
syncWg.Go(func() error {
if err := f.dataSyncService.Start(syncCtx); err != nil {
var err error
if startSyncInPublisherMode {
err = f.dataSyncService.StartForPublishing(syncCtx)
} else {
err = f.dataSyncService.Start(syncCtx)
}
if err != nil {
return fmt.Errorf("data sync service: %w", err)
}
return nil
Expand Down
5 changes: 5 additions & 0 deletions pkg/store/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,8 @@ func (cs *CachedStore) Close() error {
cs.ClearCache()
return cs.Store.Close()
}

// Sync flushes the underlying store to durable storage by delegating
// directly to the wrapped Store; the in-memory cache itself is not touched.
func (cs *CachedStore) Sync(ctx context.Context) error {
	return cs.Store.Sync(ctx)
}
17 changes: 16 additions & 1 deletion pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"time"

ds "github.com/ipfs/go-datastore"
"google.golang.org/protobuf/proto"
Expand All @@ -30,7 +31,21 @@ func New(ds ds.Batching) Store {

// Close safely closes underlying data storage, to ensure that data is actually saved.
func (s *DefaultStore) Close() error {
return s.db.Close()
done := make(chan error, 1)
go func() {
syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

_ = s.Sync(syncCtx)
done <- s.db.Close()
}()

select {
case err := <-done:
return err
case <-time.After(4 * time.Second):
return nil
Comment on lines +34 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't turn an incomplete close into a successful one.

This path drops Sync() failures and returns nil after 4 seconds even if the goroutine is still flushing or closing s.db. That lets callers reopen the same store while the old handle is still live and hides the durability failure this change is supposed to prevent.

Suggested fix
 func (s *DefaultStore) Close() error {
 	done := make(chan error, 1)
 	go func() {
 		syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
 		defer cancel()
 
-		_ = s.Sync(syncCtx)
-		done <- s.db.Close()
+		syncErr := s.Sync(syncCtx)
+		closeErr := s.db.Close()
+
+		switch {
+		case syncErr != nil && closeErr != nil:
+			done <- errors.Join(
+				fmt.Errorf("sync store before close: %w", syncErr),
+				fmt.Errorf("close datastore: %w", closeErr),
+			)
+		case syncErr != nil:
+			done <- fmt.Errorf("sync store before close: %w", syncErr)
+		default:
+			done <- closeErr
+		}
 	}()
 
 	select {
 	case err := <-done:
 		return err
 	case <-time.After(4 * time.Second):
-		return nil
+		return fmt.Errorf("closing datastore timed out after 4s")
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
done := make(chan error, 1)
go func() {
syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = s.Sync(syncCtx)
done <- s.db.Close()
}()
select {
case err := <-done:
return err
case <-time.After(4 * time.Second):
return nil
done := make(chan error, 1)
go func() {
syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
syncErr := s.Sync(syncCtx)
closeErr := s.db.Close()
switch {
case syncErr != nil && closeErr != nil:
done <- errors.Join(
fmt.Errorf("sync store before close: %w", syncErr),
fmt.Errorf("close datastore: %w", closeErr),
)
case syncErr != nil:
done <- fmt.Errorf("sync store before close: %w", syncErr)
default:
done <- closeErr
}
}()
select {
case err := <-done:
return err
case <-time.After(4 * time.Second):
return fmt.Errorf("closing datastore timed out after 4s")
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/store.go` around lines 34 - 47, The current close path spawns a
goroutine that discards the result of s.Sync and can return nil after 4s even if
Sync or s.db.Close failed or is still running; change the goroutine to capture
both the Sync error and Close error (e.g., syncErr := s.Sync(syncCtx); closeErr
:= s.db.Close()), send a combined/non-nil error on done (prefer returning
syncErr if non-nil, else closeErr), and update the select timeout branch to
return a non-nil error (e.g., context.DeadlineExceeded or a wrapped error
indicating close timed out) instead of nil so callers cannot reopen while the
old store may still be closing. Ensure you reference the goroutine closure where
s.Sync, s.db.Close and the done channel are used.

}
}

// Height returns height of the highest block saved in the Store.
Expand Down
30 changes: 30 additions & 0 deletions pkg/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ type mockBatch struct {
commitError error
}

// syncingBatchingDatastore wraps a Batching datastore and records both
// whether and in which order Sync and Close were invoked, so tests can
// verify the flush-before-close contract. The boolean flags are kept for
// backward compatibility with existing assertions; calls additionally
// captures the exact sequence (e.g. ["sync", "close"]), which the plain
// booleans cannot express.
type syncingBatchingDatastore struct {
	ds.Batching
	calls       []string // invocation order: "sync" / "close" entries
	syncCalled  bool
	closeCalled bool
}

// Sync records the invocation, then delegates to the wrapped datastore.
func (m *syncingBatchingDatastore) Sync(ctx context.Context, key ds.Key) error {
	m.syncCalled = true
	m.calls = append(m.calls, "sync")
	return m.Batching.Sync(ctx, key)
}

// Close records the invocation, then delegates to the wrapped datastore.
func (m *syncingBatchingDatastore) Close() error {
	m.closeCalled = true
	m.calls = append(m.calls, "close")
	return m.Batching.Close()
}
Comment on lines +38 to +52
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

This test doesn't verify the Sync()-before-Close() contract.

Both booleans become true even if the implementation calls Close() first, so the regression this test is named after would still pass. Record the call sequence and assert the exact order.

Suggested fix
+import "sync"
 ...
 type syncingBatchingDatastore struct {
 	ds.Batching
-	syncCalled  bool
-	closeCalled bool
+	mu    sync.Mutex
+	calls []string
 }
 
 func (m *syncingBatchingDatastore) Sync(ctx context.Context, key ds.Key) error {
-	m.syncCalled = true
+	m.mu.Lock()
+	m.calls = append(m.calls, "sync")
+	m.mu.Unlock()
 	return m.Batching.Sync(ctx, key)
 }
 
 func (m *syncingBatchingDatastore) Close() error {
-	m.closeCalled = true
+	m.mu.Lock()
+	m.calls = append(m.calls, "close")
+	m.mu.Unlock()
 	return m.Batching.Close()
 }
 ...
 	require.NoError(t, s.Close())
-	require.True(t, mock.syncCalled)
-	require.True(t, mock.closeCalled)
+	mock.mu.Lock()
+	defer mock.mu.Unlock()
+	require.Equal(t, []string{"sync", "close"}, mock.calls)
 }

Also applies to: 160-171

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/store_test.go` around lines 38 - 52, The test currently uses two
booleans (syncCalled/closeCalled) in syncingBatchingDatastore which can't detect
order; change the stub to record the call sequence (e.g., a slice of strings or
ints) and have Sync(ctx, key) append "Sync" and Close() append "Close", then
update the test assertions to assert the recorded sequence equals the expected
order (Sync before Close). Update both occurrences that define/instantiate
syncingBatchingDatastore (the methods Sync and Close in
syncingBatchingDatastore) and the corresponding assertions so they check the
exact sequence rather than just both flags being true.


func (m *mockBatchingDatastore) Put(ctx context.Context, key ds.Key, value []byte) error {
if m.putError != nil {
return m.putError
Expand Down Expand Up @@ -141,6 +157,20 @@ func TestStoreHeight(t *testing.T) {
}
}

// TestStoreCloseSyncsBeforeClose verifies that closing the store both
// flushes (Sync) and closes the underlying datastore.
func TestStoreCloseSyncsBeforeClose(t *testing.T) {
	t.Parallel()

	kv, err := NewTestInMemoryKVStore()
	require.NoError(t, err)
	spy := &syncingBatchingDatastore{Batching: kv}

	store := New(spy)
	require.NoError(t, store.Close())

	require.True(t, spy.syncCalled)
	require.True(t, spy.closeCalled)
}

func TestStoreLoad(t *testing.T) {
t.Parallel()
chainID := "TestStoreLoad"
Expand Down
Loading
Loading