Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 185 additions & 34 deletions pkg/submit/direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -19,7 +20,10 @@ const (
maxSequenceRetryRounds = 2
)

var errSequenceMismatch = errors.New("account sequence mismatch")
var (
// errSequenceMismatch identifies a broadcast that failed because of an
// account sequence mismatch; broadcastTx matches it with errors.Is to decide
// whether another retry round is warranted.
errSequenceMismatch = errors.New("account sequence mismatch")
// errTooManyInFlight is returned by startSubmission when the number of
// in-flight submissions has reached a positive maxInFlight limit.
errTooManyInFlight = errors.New("too many in-flight submissions")
)

// DirectConfig contains the fixed submission settings Apex owns for direct
// celestia-app writes.
Expand All @@ -42,6 +46,12 @@ type DirectSubmitter struct {
pollInterval time.Duration
feeDenom string
mu sync.Mutex
inFlight int
accountNumber uint64
nextSequence uint64
sequenceReady bool
pendingSequences map[string]uint64
maxInFlight int
}

// NewDirectSubmitter builds a concrete single-account submitter.
Expand Down Expand Up @@ -77,6 +87,7 @@ func NewDirectSubmitter(app AppClient, signer *Signer, cfg DirectConfig) (*Direc
confirmationTimeout: cfg.ConfirmationTimeout,
pollInterval: defaultPollInterval,
feeDenom: defaultFeeDenom,
pendingSequences: make(map[string]uint64),
}, nil
}

Expand All @@ -87,29 +98,23 @@ func (s *DirectSubmitter) Close() error {
return s.app.Close()
}

// Submit serializes submissions for the configured signer so sequence handling
// stays bounded and explicit in v1.
// Submit serializes sequence reservation and broadcast for the configured
// signer, then waits for confirmation without blocking the next nonce.
func (s *DirectSubmitter) Submit(ctx context.Context, req *Request) (*Result, error) {
if err := validateSubmitRequest(req); err != nil {
return nil, err
}
if err := s.startSubmission(); err != nil {
return nil, err
}
defer s.finishSubmission()

s.mu.Lock()
defer s.mu.Unlock()

var lastErr error
for range maxSequenceRetryRounds {
result, err := s.submitOnce(ctx, req)
if err == nil {
return result, nil
}
lastErr = err
if !errors.Is(err, errSequenceMismatch) {
return nil, err
}
broadcast, err := s.broadcastTx(ctx, req)
if err != nil {
return nil, err
}

return nil, lastErr
return s.waitForConfirmation(ctx, broadcast.Hash)
}

func validateSubmitRequest(req *Request) error {
Expand All @@ -127,32 +132,154 @@ func validateSubmitRequest(req *Request) error {
return nil
}

func (s *DirectSubmitter) submitOnce(ctx context.Context, req *Request) (*Result, error) {
account, err := s.app.AccountInfo(ctx, s.signer.Address())
if err != nil {
return nil, fmt.Errorf("query submission account: %w", err)
func (s *DirectSubmitter) broadcastTx(ctx context.Context, req *Request) (*TxStatus, error) {
s.mu.Lock()
defer s.mu.Unlock()

var lastErr error
for range maxSequenceRetryRounds {
account, err := s.nextAccountLocked(ctx)
if err != nil {
return nil, err
}

txBytes, err := s.buildBlobTx(req, account)
if err != nil {
return nil, err
}

broadcast, err := s.app.BroadcastTx(ctx, txBytes)
if err != nil {
if isSequenceMismatchText(err.Error()) {
s.recoverSequenceLocked(account, err.Error())
lastErr = fmt.Errorf("%w: %w", errSequenceMismatch, err)
continue
}
return nil, fmt.Errorf("broadcast blob tx: %w", err)
}
if err := checkTxStatus("broadcast", broadcast); err != nil {
if errors.Is(err, errSequenceMismatch) {
s.recoverSequenceLocked(account, err.Error())
lastErr = err
continue
}
return nil, err
}

if broadcast.Hash != "" {
s.rememberPendingLocked(broadcast.Hash, account.Sequence)
}
s.nextSequence = account.Sequence + 1
s.sequenceReady = true
return broadcast, nil
}
if account == nil {
return nil, errors.New("query submission account: empty response")

return nil, lastErr
}

func (s *DirectSubmitter) nextAccountLocked(ctx context.Context) (*AccountInfo, error) {
if !s.sequenceReady {
account, err := s.app.AccountInfo(ctx, s.signer.Address())
if err != nil {
return nil, fmt.Errorf("query submission account: %w", err)
}
if account == nil {
return nil, errors.New("query submission account: empty response")
}

s.accountNumber = account.AccountNumber
s.nextSequence = account.Sequence
s.sequenceReady = true
if err := s.reconcilePendingLocked(ctx); err != nil {
return nil, err
Comment on lines +192 to +194

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep sequence cache invalid when pending reconcile fails

nextAccountLocked sets sequenceReady to true before running reconcilePendingLocked, so a transient GetTx error during reconciliation returns from Submit while leaving the cache marked ready. After that, later submissions skip both account refresh and reconciliation, continuing with potentially stale nextSequence/pending state instead of retrying reconciliation on the next request.

Useful? React with 👍 / 👎.

}
}

txBytes, err := s.buildBlobTx(req, account)
if err != nil {
return nil, err
return &AccountInfo{
Address: s.signer.Address(),
AccountNumber: s.accountNumber,
Sequence: s.nextSequence,
}, nil
}

// invalidateSequenceLocked drops the cached account number and sequence and
// marks the cache as not ready, so the next nextAccountLocked call queries the
// account again instead of reusing cached values.
//
// The method does not acquire s.mu itself; as the Locked suffix suggests, the
// caller is expected to already hold it.
func (s *DirectSubmitter) invalidateSequenceLocked() {
	// Clear the ready flag first; the numeric fields are meaningless without it.
	s.sequenceReady = false
	s.nextSequence = 0
	s.accountNumber = 0
}

// startSubmission registers one more in-flight submission under s.mu.
//
// It returns errTooManyInFlight, leaving the counter untouched, when
// maxInFlight is positive and the counter has already reached it. Otherwise it
// increments the counter and returns nil.
func (s *DirectSubmitter) startSubmission() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// A non-positive maxInFlight means no cap is enforced.
	capped := s.maxInFlight > 0
	if capped && s.inFlight >= s.maxInFlight {
		return errTooManyInFlight
	}

	s.inFlight++
	return nil
}

broadcast, err := s.app.BroadcastTx(ctx, txBytes)
if err != nil {
if isSequenceMismatchText(err.Error()) {
return nil, fmt.Errorf("%w: %w", errSequenceMismatch, err)
// finishSubmission releases one in-flight slot under s.mu. The counter is
// never driven below zero, so an unmatched call is a no-op.
func (s *DirectSubmitter) finishSubmission() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.inFlight <= 0 {
		return
	}
	s.inFlight--
}

// recoverSequenceLocked updates the cached sequence state after a sequence
// mismatch.
//
// When errText carries a parseable "expected <n>" value, that value becomes the
// next sequence, the account number is taken from account, and the cache is
// marked ready. When no value can be parsed, the cache is invalidated instead
// so the next attempt re-queries the account.
//
// The method does not acquire s.mu itself; the caller is expected to hold it.
func (s *DirectSubmitter) recoverSequenceLocked(account *AccountInfo, errText string) {
	if want, parsed := expectedSequenceFromMismatchText(errText); parsed {
		s.accountNumber = account.AccountNumber
		s.nextSequence = want
		s.sequenceReady = true
		return
	}

	// No usable hint in the error text: fall back to a fresh account query.
	s.invalidateSequenceLocked()
}

func (s *DirectSubmitter) reconcilePendingLocked(ctx context.Context) error {
if len(s.pendingSequences) == 0 {
return nil
}

nextSequence := s.nextSequence
for hash, sequence := range s.pendingSequences {
_, err := s.app.GetTx(ctx, hash)
if err == nil {
delete(s.pendingSequences, hash)
continue
}
return nil, fmt.Errorf("broadcast blob tx: %w", err)
if isTxNotFound(err) {
if sequence >= nextSequence {
nextSequence = sequence + 1
}
continue
}
return fmt.Errorf("reconcile pending blob tx %s: %w", hash, err)
}
if err := checkTxStatus("broadcast", broadcast); err != nil {
return nil, err

s.nextSequence = nextSequence
return nil
}
Comment on lines +243 to +266
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential unbounded growth of pendingSequences map.

When GetTx returns NotFound, the entry is not deleted from pendingSequences. This is correct for in-flight transactions that haven't been indexed yet. However, if a transaction is dropped from the mempool or evicted, the entry will remain indefinitely, leading to slow memory growth over time.

Consider adding a cleanup mechanism, such as removing entries older than a certain age or limiting the map size.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/submit/direct.go` around lines 243 - 266, The reconcilePendingLocked loop
can leave entries in the pendingSequences map forever when GetTx returns
NotFound; update the data model and logic: change pendingSequences to store a
struct with the original sequence and a timestamp (e.g., pendingEntry{seq uint64,
added time.Time}), and in reconcilePendingLocked (and wherever entries are
added) remove entries whose added timestamp is older than a configurable
threshold (or enforce a max map size by evicting oldest entries) instead of
keeping them indefinitely; reference the reconcilePendingLocked method,
pendingSequences map, GetTx call and isTxNotFound branch to implement the
time-based cleanup or size eviction and update s.nextSequence logic accordingly.


// rememberPendingLocked records the sequence used by the transaction with the
// given hash in pendingSequences. An empty hash is ignored.
//
// The method does not acquire s.mu itself; the caller is expected to hold it.
func (s *DirectSubmitter) rememberPendingLocked(hash string, sequence uint64) {
	if hash != "" {
		s.pendingSequences[hash] = sequence
	}
}

return s.waitForConfirmation(ctx, broadcast.Hash)
// clearPending removes hash from pendingSequences while holding s.mu. An empty
// hash returns immediately without taking the lock.
func (s *DirectSubmitter) clearPending(hash string) {
	if hash != "" {
		s.mu.Lock()
		delete(s.pendingSequences, hash)
		s.mu.Unlock()
	}
}

func (s *DirectSubmitter) buildBlobTx(req *Request, account *AccountInfo) ([]byte, error) {
Expand Down Expand Up @@ -297,6 +424,7 @@ func (s *DirectSubmitter) waitForConfirmation(parent context.Context, hash strin
for {
tx, err := s.app.GetTx(ctx, hash)
if err == nil {
s.clearPending(hash)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid mutex-coupling confirmation completion to broadcasts

Calling s.clearPending(hash) here can block a confirmed submission behind another goroutine that is still inside broadcastTx, because clearPending and broadcastTx share s.mu and broadcastTx holds it across RPC calls. In that situation, this request has already observed confirmation but cannot return until the unrelated broadcast finishes, which can add unbounded latency and bypass the intended confirmation-timeout behavior.

Useful? React with 👍 / 👎.

if err := checkTxStatus("confirm", tx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -339,6 +467,29 @@ func isSequenceMismatchText(text string) bool {
return strings.Contains(text, "account sequence mismatch") || strings.Contains(text, "incorrect account sequence")
}

// expectedSequenceFromMismatchText extracts the decimal number that directly
// follows the first case-insensitive occurrence of "expected " in text.
//
// It returns (value, true) on success. It returns (0, false) when the marker is
// absent, when the marker is not immediately followed by at least one ASCII
// digit, or when the digit run does not fit in a uint64. Only the first
// occurrence of the marker is considered.
func expectedSequenceFromMismatchText(text string) (uint64, bool) {
	const marker = "expected "

	_, rest, found := strings.Cut(strings.ToLower(text), marker)
	if !found {
		return 0, false
	}

	// Keep only the leading run of ASCII digits after the marker.
	digits := rest
	notDigit := func(r rune) bool { return r < '0' || r > '9' }
	if cut := strings.IndexFunc(rest, notDigit); cut >= 0 {
		digits = rest[:cut]
	}
	if digits == "" {
		return 0, false
	}

	value, err := strconv.ParseUint(digits, 10, 64)
	if err != nil {
		return 0, false
	}
	return value, true
}

// isTxNotFound reports whether the status code derived from err is
// codes.NotFound.
func isTxNotFound(err error) bool {
	code := status.Code(err)
	return code == codes.NotFound
}
Loading
Loading