-
Notifications
You must be signed in to change notification settings - Fork 253
fix: startup races #3172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
fix: startup races #3172
Changes from all commits
522863f
24cb707
21ef2da
9e3e8e5
505bce1
ab0f35a
4736118
fd9a2f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,130 @@ | ||
| package evm | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "math/big" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/ethereum/go-ethereum/beacon/engine" | ||
| "github.com/ethereum/go-ethereum/core/types" | ||
| ds "github.com/ipfs/go-datastore" | ||
| dssync "github.com/ipfs/go-datastore/sync" | ||
| "github.com/rs/zerolog" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func TestReconcileExecutionAtHeight_StartedExecMeta(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| specs := map[string]struct { | ||
| execMetaTimestamp int64 | ||
| execMetaTxs [][]byte | ||
| requestedTxs [][]byte | ||
| requestedTime time.Time | ||
| expectFound bool | ||
| expectPayloadID bool | ||
| expectGetPayloads int | ||
| }{ | ||
| "resume_when_inputs_match": { | ||
| execMetaTimestamp: 1700000012, | ||
| execMetaTxs: [][]byte{[]byte("tx-1")}, | ||
| requestedTxs: [][]byte{[]byte("tx-1")}, | ||
| requestedTime: time.Unix(1700000012, 0), | ||
| expectFound: true, | ||
| expectPayloadID: true, | ||
| expectGetPayloads: 1, | ||
| }, | ||
| "ignore_when_timestamp_differs": { | ||
| execMetaTimestamp: 1700000010, | ||
| execMetaTxs: [][]byte{[]byte("tx-1")}, | ||
| requestedTxs: [][]byte{[]byte("tx-1")}, | ||
| requestedTime: time.Unix(1700000012, 0), | ||
| expectFound: false, | ||
| expectPayloadID: false, | ||
| expectGetPayloads: 0, | ||
| }, | ||
| "ignore_when_txs_differ": { | ||
| execMetaTimestamp: 1700000012, | ||
| execMetaTxs: [][]byte{[]byte("tx-old")}, | ||
| requestedTxs: [][]byte{[]byte("tx-new")}, | ||
| requestedTime: time.Unix(1700000012, 0), | ||
| expectFound: false, | ||
| expectPayloadID: false, | ||
| expectGetPayloads: 0, | ||
| }, | ||
| } | ||
|
|
||
| for name, spec := range specs { | ||
| t.Run(name, func(t *testing.T) { | ||
| t.Parallel() | ||
|
Comment on lines
+59
to
+61
Contributor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify current nondeterministic table pattern in this test file.
rg -n -C2 'specs := map\[string\]struct|for name, spec := range specs|t\.Run\(' execution/evm/execution_reconcile_test.go
Repository: evstack/ev-node
Length of output: 283
Use a slice-backed table to keep subtest execution order deterministic.
Map iteration order in Go is randomized, so line 59's map-backed `specs` table is ranged over in a different order on each run.
Suggested refactor:
- specs := map[string]struct {
+ specs := []struct {
+ name string
execMetaTimestamp int64
execMetaTxs [][]byte
requestedTxs [][]byte
requestedTime time.Time
expectFound bool
expectPayloadID bool
expectGetPayloads int
- }{
- "resume_when_inputs_match": {
+ }{
+ {
+ name: "resume_when_inputs_match",
execMetaTimestamp: 1700000012,
execMetaTxs: [][]byte{[]byte("tx-1")},
requestedTxs: [][]byte{[]byte("tx-1")},
requestedTime: time.Unix(1700000012, 0),
expectFound: true,
expectPayloadID: true,
expectGetPayloads: 1,
},
- "ignore_when_timestamp_differs": {
+ {
+ name: "ignore_when_timestamp_differs",
execMetaTimestamp: 1700000010,
execMetaTxs: [][]byte{[]byte("tx-1")},
requestedTxs: [][]byte{[]byte("tx-1")},
requestedTime: time.Unix(1700000012, 0),
expectFound: false,
expectPayloadID: false,
expectGetPayloads: 0,
},
- "ignore_when_txs_differ": {
+ {
+ name: "ignore_when_txs_differ",
execMetaTimestamp: 1700000012,
execMetaTxs: [][]byte{[]byte("tx-old")},
requestedTxs: [][]byte{[]byte("tx-new")},
requestedTime: time.Unix(1700000012, 0),
expectFound: false,
expectPayloadID: false,
expectGetPayloads: 0,
},
}
- for name, spec := range specs {
- t.Run(name, func(t *testing.T) {
+ for _, spec := range specs {
+ t.Run(spec.name, func(t *testing.T) {
🤖 Prompt for AI Agents |
||
|
|
||
| store := NewEVMStore(dssync.MutexWrap(ds.NewMapDatastore())) | ||
| payloadID := engine.PayloadID{1, 2, 3, 4, 5, 6, 7, 8} | ||
| require.NoError(t, store.SaveExecMeta(t.Context(), &ExecMeta{ | ||
| Height: 12, | ||
| PayloadID: payloadID[:], | ||
| TxHash: hashTxs(spec.execMetaTxs), | ||
| Timestamp: spec.execMetaTimestamp, | ||
| Stage: ExecStageStarted, | ||
| })) | ||
|
|
||
| engineRPC := &mockReconcileEngineRPCClient{ | ||
| payloads: map[engine.PayloadID]*engine.ExecutionPayloadEnvelope{ | ||
| payloadID: {}, | ||
| }, | ||
| } | ||
| client := &EngineClient{ | ||
| engineClient: engineRPC, | ||
| ethClient: mockReconcileEthRPCClient{}, | ||
| store: store, | ||
| logger: zerolog.Nop(), | ||
| } | ||
|
|
||
| stateRoot, gotPayloadID, found, err := client.reconcileExecutionAtHeight(t.Context(), 12, spec.requestedTime, spec.requestedTxs) | ||
|
|
||
| require.NoError(t, err) | ||
| require.Nil(t, stateRoot) | ||
| require.Equal(t, spec.expectFound, found) | ||
| require.Equal(t, spec.expectPayloadID, gotPayloadID != nil) | ||
| if spec.expectPayloadID { | ||
| require.Equal(t, payloadID, *gotPayloadID) | ||
| } | ||
| require.Equal(t, spec.expectGetPayloads, engineRPC.getPayloadCalls) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| type mockReconcileEngineRPCClient struct { | ||
| payloads map[engine.PayloadID]*engine.ExecutionPayloadEnvelope | ||
| getPayloadCalls int | ||
| } | ||
|
|
||
| func (m *mockReconcileEngineRPCClient) ForkchoiceUpdated(_ context.Context, _ engine.ForkchoiceStateV1, _ map[string]any) (*engine.ForkChoiceResponse, error) { | ||
| return nil, errors.New("unexpected ForkchoiceUpdated call") | ||
| } | ||
|
|
||
| func (m *mockReconcileEngineRPCClient) GetPayload(_ context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) { | ||
| m.getPayloadCalls++ | ||
| payload, ok := m.payloads[payloadID] | ||
| if !ok { | ||
| return nil, errors.New("payload not found") | ||
| } | ||
|
|
||
| return payload, nil | ||
| } | ||
|
|
||
| func (m *mockReconcileEngineRPCClient) NewPayload(_ context.Context, _ *engine.ExecutableData, _ []string, _ string, _ [][]byte) (*engine.PayloadStatusV1, error) { | ||
| return nil, errors.New("unexpected NewPayload call") | ||
| } | ||
|
|
||
| type mockReconcileEthRPCClient struct{} | ||
|
|
||
| func (mockReconcileEthRPCClient) HeaderByNumber(_ context.Context, _ *big.Int) (*types.Header, error) { | ||
| return nil, errors.New("header not found") | ||
| } | ||
|
|
||
| func (mockReconcileEthRPCClient) GetTxs(_ context.Context) ([]string, error) { | ||
| return nil, errors.New("unexpected GetTxs call") | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -6,6 +6,7 @@ import ( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "encoding/binary" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "errors" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ds "github.com/ipfs/go-datastore" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "google.golang.org/protobuf/proto" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -30,7 +31,21 @@ func New(ds ds.Batching) Store { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Close safely closes underlying data storage, to ensure that data is actually saved. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func (s *DefaultStore) Close() error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return s.db.Close() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| done := make(chan error, 1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| go func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| defer cancel() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _ = s.Sync(syncCtx) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| done <- s.db.Close() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| select { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case err := <-done: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-time.After(4 * time.Second): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+34
to
+47
Contributor
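There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't turn an incomplete close into a successful one.
This path drops the error returned by `s.Sync` and returns `nil` when the 4-second timeout fires, so the caller cannot tell that the datastore was not synced or closed.
Suggested fix:
func (s *DefaultStore) Close() error {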
done := make(chan error, 1)
go func() {
syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
- _ = s.Sync(syncCtx)
- done <- s.db.Close()
+ syncErr := s.Sync(syncCtx)
+ closeErr := s.db.Close()
+
+ switch {
+ case syncErr != nil && closeErr != nil:
+ done <- errors.Join(
+ fmt.Errorf("sync store before close: %w", syncErr),
+ fmt.Errorf("close datastore: %w", closeErr),
+ )
+ case syncErr != nil:
+ done <- fmt.Errorf("sync store before close: %w", syncErr)
+ default:
+ done <- closeErr
+ }
}()
select {
case err := <-done:
return err
case <-time.After(4 * time.Second):
- return nil
+ return fmt.Errorf("closing datastore timed out after 4s")
}
}
📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Height returns height of the highest block saved in the Store. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,22 @@ type mockBatch struct { | |
| commitError error | ||
| } | ||
|
|
||
| type syncingBatchingDatastore struct { | ||
| ds.Batching | ||
| syncCalled bool | ||
| closeCalled bool | ||
| } | ||
|
|
||
| func (m *syncingBatchingDatastore) Sync(ctx context.Context, key ds.Key) error { | ||
| m.syncCalled = true | ||
| return m.Batching.Sync(ctx, key) | ||
| } | ||
|
|
||
| func (m *syncingBatchingDatastore) Close() error { | ||
| m.closeCalled = true | ||
| return m.Batching.Close() | ||
| } | ||
|
Comment on lines
+38
to
+52
Contributor
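There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test doesn't verify the sync-before-close ordering.
Both booleans become `true` regardless of which call happens first, so the test still passes if `Close` runs before `Sync`. Record the call order instead.
Suggested fix:
+import "sync"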
...
type syncingBatchingDatastore struct {
ds.Batching
- syncCalled bool
- closeCalled bool
+ mu sync.Mutex
+ calls []string
}
func (m *syncingBatchingDatastore) Sync(ctx context.Context, key ds.Key) error {
- m.syncCalled = true
+ m.mu.Lock()
+ m.calls = append(m.calls, "sync")
+ m.mu.Unlock()
return m.Batching.Sync(ctx, key)
}
func (m *syncingBatchingDatastore) Close() error {
- m.closeCalled = true
+ m.mu.Lock()
+ m.calls = append(m.calls, "close")
+ m.mu.Unlock()
return m.Batching.Close()
}
...
require.NoError(t, s.Close())
- require.True(t, mock.syncCalled)
- require.True(t, mock.closeCalled)
+ mock.mu.Lock()
+ defer mock.mu.Unlock()
+ require.Equal(t, []string{"sync", "close"}, mock.calls)
}
Also applies to: 160-171
🤖 Prompt for AI Agents |
||
|
|
||
| func (m *mockBatchingDatastore) Put(ctx context.Context, key ds.Key, value []byte) error { | ||
| if m.putError != nil { | ||
| return m.putError | ||
|
|
@@ -141,6 +157,20 @@ func TestStoreHeight(t *testing.T) { | |
| } | ||
| } | ||
|
|
||
| func TestStoreCloseSyncsBeforeClose(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| kv, err := NewTestInMemoryKVStore() | ||
| require.NoError(t, err) | ||
|
|
||
| mock := &syncingBatchingDatastore{Batching: kv} | ||
| s := New(mock) | ||
|
|
||
| require.NoError(t, s.Close()) | ||
| require.True(t, mock.syncCalled) | ||
| require.True(t, mock.closeCalled) | ||
| } | ||
|
|
||
| func TestStoreLoad(t *testing.T) { | ||
| t.Parallel() | ||
| chainID := "TestStoreLoad" | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make the resume fingerprint unambiguous.
`hashTxs()` hashes the raw byte concatenation, so different ordered transaction slices can share the same digest (`[ab, c]` vs `[a, bc]`). The equality check on Line 760 can then resume a payload built from different inputs. Hash a length-delimited encoding instead.
Suggested fix
Also applies to: 1048-1058
🤖 Prompt for AI Agents