diff --git a/.env.example b/.env.example index f382b9f..28be630 100644 --- a/.env.example +++ b/.env.example @@ -30,6 +30,18 @@ RPC_BATCH_SIZE=20 # API_DB_MAX_CONNECTIONS=20 # SSE_REPLAY_BUFFER_BLOCKS=4096 # replay tail used only for active connected clients +# Optional: enable DA (Data Availability) inclusion tracking from ev-node. +# Set this to true only when you also provide EVNODE_URL below. +ENABLE_DA_TRACKING=false + +# Required when ENABLE_DA_TRACKING=true. +# Must be reachable from the atlas-server process/container. +# EVNODE_URL=http://ev-node:7331 + +# Optional when ENABLE_DA_TRACKING=true. +# DA_RPC_REQUESTS_PER_SECOND=50 +# DA_WORKER_CONCURRENCY=50 + # Optional faucet feature # FAUCET_ENABLED=false # FAUCET_PRIVATE_KEY=0x... diff --git a/CLAUDE.md b/CLAUDE.md index 68e11d6..cebbaf7 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -66,7 +66,7 @@ let cursor = (total_count - 1) - (pagination.page.saturating_sub(1) as i64) * li ### Row count estimation For large tables (transactions, addresses), use `pg_class.reltuples` instead of `COUNT(*)`: ```rust -// handlers/mod.rs — get_table_count(pool) +// handlers/mod.rs — get_table_count(pool, table_name) // Partition-aware: sums child reltuples, falls back to parent // For tables < 100k rows: falls back to exact COUNT(*) ``` @@ -77,19 +77,25 @@ For large tables (transactions, addresses), use `pg_class.reltuples` instead of ### AppState (API) ```rust pub struct AppState { - pub pool: PgPool, // API pool only - pub block_events_tx: broadcast::Sender<()>, // shared with indexer + pub pool: PgPool, // API pool only + pub block_events_tx: broadcast::Sender<()>, // shared with indexer + pub da_events_tx: broadcast::Sender<Vec<DaSseUpdate>>, // shared with DA worker + pub head_tracker: Arc<HeadTracker>, pub rpc_url: String, - pub solc_path: String, - pub admin_api_key: Option<String>, + pub da_tracking_enabled: bool, + pub chain_id: u64, + pub chain_name: String, } ``` +### DA tracking (optional) +When `ENABLE_DA_TRACKING=true`, a background DA worker queries ev-node 
for Celestia inclusion heights per block. `EVNODE_URL` is required only in that mode. Updates are pushed to SSE clients via an in-process `broadcast::Sender<Vec<DaSseUpdate>>`. The SSE handler streams `da_batch` events for incremental updates and emits `da_resync` when a client falls behind and should refetch visible DA state. + ### Frontend API client - Base URL: `/api` (proxied by nginx to `atlas-server:3000`) -- Fast polling endpoint: `GET /api/height` → `{ block_height, indexed_at }` — single key-value lookup from `indexer_state`, sub-ms. Used by the navbar as a polling fallback when SSE is disconnected. +- Fast polling endpoint: `GET /api/height` → `{ block_height, indexed_at, features: { da_tracking } }` — serves from `head_tracker` first and falls back to `indexer_state` when the in-memory head is empty. Used by the navbar as a polling fallback when SSE is disconnected and by feature-flag consumers. - Chain status: `GET /api/status` → `{ chain_id, chain_name, block_height, total_transactions, total_addresses, indexed_at }` — full chain info, fetched once on page load. -- `GET /api/events` → SSE stream of `new_block` events, one per block in order. Primary live-update path for navbar counter and blocks page. Falls back to `/api/height` polling on disconnect. +- `GET /api/events` → SSE stream of `new_block`, `da_batch`, and `da_resync` events. Primary live-update path for navbar counter, blocks page, block detail DA status, and DA resync handling. Falls back to `/api/height` polling on disconnect. 
## Important Conventions @@ -109,6 +115,7 @@ Key vars (see `.env.example` for full list): |---|---|---| | `DATABASE_URL` | all | required | | `RPC_URL` | server | required | +| `CHAIN_NAME` | server | `"Unknown"` | | `DB_MAX_CONNECTIONS` | indexer pool | `20` | | `API_DB_MAX_CONNECTIONS` | API pool | `20` | | `BATCH_SIZE` | indexer | `100` | @@ -116,6 +123,10 @@ Key vars (see `.env.example` for full list): | `ADMIN_API_KEY` | API | none | | `API_HOST` | API | `127.0.0.1` | | `API_PORT` | API | `3000` | +| `ENABLE_DA_TRACKING` | server | `false` | +| `EVNODE_URL` | server | none | +| `DA_RPC_REQUESTS_PER_SECOND` | DA worker | `50` | +| `DA_WORKER_CONCURRENCY` | DA worker | `50` | ## Running Locally diff --git a/backend/crates/atlas-common/src/types.rs b/backend/crates/atlas-common/src/types.rs index 699e2f3..6edd8a9 100644 --- a/backend/crates/atlas-common/src/types.rs +++ b/backend/crates/atlas-common/src/types.rs @@ -16,6 +16,18 @@ pub struct Block { pub indexed_at: DateTime, } +/// DA (Data Availability) status for a block on L2 chains using Celestia. +/// Only populated when DA tracking is enabled and the DA worker has checked the block. +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct BlockDaStatus { + pub block_number: i64, + /// Celestia height where the block header was submitted. 0 = pending. + pub header_da_height: i64, + /// Celestia height where the block data was submitted. 0 = pending. 
+ pub data_da_height: i64, + pub updated_at: DateTime<Utc>, +} + /// Transaction data as stored in the database #[derive(Debug, Clone, Serialize, Deserialize, FromRow)] pub struct Transaction { diff --git a/backend/crates/atlas-server/src/api/handlers/blocks.rs b/backend/crates/atlas-server/src/api/handlers/blocks.rs index f4a08cb..67106f0 100644 --- a/backend/crates/atlas-server/src/api/handlers/blocks.rs +++ b/backend/crates/atlas-server/src/api/handlers/blocks.rs @@ -2,16 +2,27 @@ use axum::{ extract::{Path, Query, State}, Json, }; +use serde::Serialize; use std::sync::Arc; use crate::api::error::ApiResult; use crate::api::AppState; -use atlas_common::{AtlasError, Block, PaginatedResponse, Pagination, Transaction}; +use atlas_common::{AtlasError, Block, BlockDaStatus, PaginatedResponse, Pagination, Transaction}; + +/// Block response with optional DA status. +/// DA fields are always present in the JSON (null when no data), +/// so the frontend can rely on a stable schema. +#[derive(Serialize)] +pub struct BlockResponse { + #[serde(flatten)] + pub block: Block, + pub da_status: Option<BlockDaStatus>, +} pub async fn list_blocks( State(state): State<Arc<AppState>>, Query(pagination): Query<Pagination>, -) -> ApiResult<Json<PaginatedResponse<Block>>> { +) -> ApiResult<Json<PaginatedResponse<BlockResponse>>> { // Use MAX(number) + 1 instead of COUNT(*) - blocks are sequential so this is accurate // This is ~6500x faster than COUNT(*) on large tables let total: (Option<i64>,) = sqlx::query_as("SELECT MAX(number) + 1 FROM blocks") @@ -30,15 +41,37 @@ FROM blocks WHERE number <= $2 ORDER BY number DESC - LIMIT $1" + LIMIT $1", ) .bind(limit) .bind(cursor) .fetch_all(&state.pool) .await?; + // Batch-fetch DA status for all blocks in this page + let block_numbers: Vec<i64> = blocks.iter().map(|b| b.number).collect(); + let da_rows: Vec<BlockDaStatus> = sqlx::query_as( + "SELECT block_number, header_da_height, data_da_height, updated_at + FROM block_da_status + WHERE block_number = ANY($1)", + ) + .bind(&block_numbers) + .fetch_all(&state.pool) + .await?; + + let da_map: 
std::collections::HashMap<i64, BlockDaStatus> = + da_rows.into_iter().map(|d| (d.block_number, d)).collect(); + + let responses: Vec<BlockResponse> = blocks + .into_iter() + .map(|block| { + let da_status = da_map.get(&block.number).cloned(); + BlockResponse { block, da_status } + }) + .collect(); + Ok(Json(PaginatedResponse::new( - blocks, + responses, pagination.page, pagination.limit, total_count, @@ -48,18 +81,27 @@ pub async fn get_block( State(state): State<Arc<AppState>>, Path(number): Path<i64>, -) -> ApiResult<Json<Block>> { +) -> ApiResult<Json<BlockResponse>> { let block: Block = sqlx::query_as( "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at FROM blocks - WHERE number = $1" + WHERE number = $1", ) .bind(number) .fetch_optional(&state.pool) .await? .ok_or_else(|| AtlasError::NotFound(format!("Block {} not found", number)))?; - Ok(Json(block)) + let da_status: Option<BlockDaStatus> = sqlx::query_as( + "SELECT block_number, header_da_height, data_da_height, updated_at + FROM block_da_status + WHERE block_number = $1", + ) + .bind(number) + .fetch_optional(&state.pool) + .await?; + + Ok(Json(BlockResponse { block, da_status })) } pub async fn get_block_transactions( @@ -77,7 +119,7 @@ FROM transactions WHERE block_number = $1 ORDER BY block_index ASC - LIMIT $2 OFFSET $3" + LIMIT $2 OFFSET $3", ) .bind(number) .bind(pagination.limit()) diff --git a/backend/crates/atlas-server/src/api/handlers/faucet.rs b/backend/crates/atlas-server/src/api/handlers/faucet.rs index 80efb5c..0302383 100644 --- a/backend/crates/atlas-server/src/api/handlers/faucet.rs +++ b/backend/crates/atlas-server/src/api/handlers/faucet.rs @@ -161,11 +161,14 @@ mod tests { .expect("lazy pool"); let head_tracker = Arc::new(crate::head::HeadTracker::empty(10)); let (tx, _) = broadcast::channel(1); + let (da_tx, _) = broadcast::channel(1); Arc::new(AppState { pool, block_events_tx: tx, + da_events_tx: da_tx, head_tracker, rpc_url: String::new(), + da_tracking_enabled: false, 
faucet, chain_id: 1, chain_name: "Test Chain".to_string(), diff --git a/backend/crates/atlas-server/src/api/handlers/sse.rs b/backend/crates/atlas-server/src/api/handlers/sse.rs index b53b407..16c60ae 100644 --- a/backend/crates/atlas-server/src/api/handlers/sse.rs +++ b/backend/crates/atlas-server/src/api/handlers/sse.rs @@ -1,12 +1,10 @@ use axum::{ extract::State, response::sse::{Event, Sse}, - response::IntoResponse, }; use futures::stream::Stream; use serde::Serialize; use std::convert::Infallible; -use std::pin::Pin; use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast; @@ -14,22 +12,39 @@ use tokio::sync::broadcast; use crate::api::handlers::get_latest_block; use crate::api::AppState; use crate::head::HeadTracker; +use crate::indexer::DaSseUpdate; use atlas_common::Block; use sqlx::PgPool; use tracing::warn; -type SseStream = Pin<Box<dyn Stream<Item = Result<Event, Infallible>> + Send>>; - #[derive(Serialize, Debug)] struct NewBlockEvent { block: Block, } -/// Build the SSE block stream. Separated from the handler for testability. -fn make_block_stream( +#[derive(Serialize, Debug)] +struct DaUpdateEvent { + block_number: i64, + header_da_height: i64, + data_da_height: i64, +} + +#[derive(Serialize, Debug)] +struct DaBatchEvent { + updates: Vec<DaUpdateEvent>, +} + +#[derive(Serialize, Debug)] +struct DaResyncEvent { + required: bool, +} + +/// Build the SSE stream. Separated from the handler for testability. +fn make_event_stream( pool: PgPool, head_tracker: Arc<HeadTracker>, - mut rx: broadcast::Receiver<()>, + mut block_rx: broadcast::Receiver<()>, + mut da_rx: broadcast::Receiver<Vec<DaSseUpdate>>, ) -> impl Stream<Item = Result<Event, Infallible>> + Send { async_stream::stream! 
{ let mut last_block_number: Option = None; @@ -53,63 +68,86 @@ fn make_block_stream( }, } - while let Ok(()) | Err(broadcast::error::RecvError::Lagged(_)) = rx.recv().await { - let snapshot = head_tracker.replay_after(last_block_number).await; - - if let (Some(cursor), Some(buffer_start)) = (last_block_number, snapshot.buffer_start) { - if cursor + 1 < buffer_start { - warn!( - last_seen = cursor, - buffer_start, - buffer_end = ?snapshot.buffer_end, - "sse head-only: client fell behind replay tail; closing stream for canonical refetch" - ); - break; - } - } - - if !snapshot.blocks_after_cursor.is_empty() { - for block in snapshot.blocks_after_cursor { - last_block_number = Some(block.number); - if let Some(event) = block_to_event(block) { - yield Ok(event); + loop { + tokio::select! { + result = block_rx.recv() => { + match result { + Ok(()) | Err(broadcast::error::RecvError::Lagged(_)) => { + let snapshot = head_tracker.replay_after(last_block_number).await; + + if let (Some(cursor), Some(buffer_start)) = (last_block_number, snapshot.buffer_start) { + if cursor + 1 < buffer_start { + warn!( + last_seen = cursor, + buffer_start, + buffer_end = ?snapshot.buffer_end, + "sse head-only: client fell behind replay tail; closing stream for canonical refetch" + ); + break; + } + } + + if !snapshot.blocks_after_cursor.is_empty() { + for block in snapshot.blocks_after_cursor { + last_block_number = Some(block.number); + if let Some(event) = block_to_event(block) { + yield Ok(event); + } + } + continue; + } + + match head_tracker.latest().await { + Some(block) if last_block_number.is_none_or(|last_seen| block.number > last_seen) => { + last_block_number = Some(block.number); + if let Some(event) = block_to_event(block) { + yield Ok(event); + } + } + Some(_) | None => {} + } + } + Err(broadcast::error::RecvError::Closed) => break, } } - continue; - } - - match head_tracker.latest().await { - Some(block) if last_block_number.is_none_or(|last_seen| block.number > last_seen) => 
{ - last_block_number = Some(block.number); - if let Some(event) = block_to_event(block) { - yield Ok(event); + result = da_rx.recv() => { + match result { + Ok(updates) => { + if let Some(event) = da_batch_to_event(&updates) { + yield Ok(event); + } + } + Err(broadcast::error::RecvError::Lagged(skipped)) => { + warn!( + skipped, + "sse da: client fell behind DA update stream; requesting resync" + ); + if let Some(event) = da_resync_event() { + yield Ok(event); + } + } + Err(broadcast::error::RecvError::Closed) => break, } } - Some(_) | None => {} } } } } /// GET /api/events — Server-Sent Events stream for live committed block updates. -/// New connections receive only the current latest block and then stream -/// forward from in-memory committed head state. Historical catch-up stays on -/// the canonical block endpoints. -pub async fn block_events(State(state): State>) -> impl IntoResponse { - let stream = make_block_stream( +/// New connections receive the current latest block and then stream forward from +/// in-memory committed head state, plus DA status update batches. If the DA +/// stream lags, the handler emits `da_resync` so the frontend can refetch the +/// visible DA state instead of silently going stale. 
+pub async fn block_events( + State(state): State<Arc<AppState>>, +) -> Sse<impl Stream<Item = Result<Event, Infallible>>> { + let stream = make_event_stream( state.pool.clone(), state.head_tracker.clone(), state.block_events_tx.subscribe(), + state.da_events_tx.subscribe(), ); - sse_response(stream) -} - -fn sse_response<S>(stream: S) -> impl IntoResponse -where - S: Stream<Item = Result<Event, Infallible>> + Send + 'static, -{ - let stream: SseStream = Box::pin(stream); - Sse::new(stream).keep_alive( axum::response::sse::KeepAlive::new() .interval(Duration::from_secs(15)) @@ -124,6 +162,31 @@ fn block_to_event(block: Block) -> Option<Event> { .map(|json| Event::default().event("new_block").data(json)) } +fn da_batch_to_event(updates: &[DaSseUpdate]) -> Option<Event> { + if updates.is_empty() { + return None; + } + let batch = DaBatchEvent { + updates: updates + .iter() + .map(|da| DaUpdateEvent { + block_number: da.block_number, + header_da_height: da.header_da_height, + data_da_height: da.data_da_height, + }) + .collect(), + }; + serde_json::to_string(&batch) + .ok() + .map(|json| Event::default().event("da_batch").data(json)) +} + +fn da_resync_event() -> Option<Event> { + serde_json::to_string(&DaResyncEvent { required: true }) + .ok() + .map(|json| Event::default().event("da_resync").data(json)) +} + #[cfg(test)] mod tests { use super::*; @@ -144,6 +207,14 @@ } } + fn sample_da_update(block_number: i64) -> DaSseUpdate { + DaSseUpdate { + block_number, + header_da_height: block_number * 10, + data_da_height: block_number * 10 + 1, + } + } + /// Lazy PgPool that never connects — safe for tests that don't hit the DB. 
fn dummy_pool() -> PgPool { sqlx::postgres::PgPoolOptions::new() @@ -194,6 +265,13 @@ mod tests { } } + #[test] + fn da_resync_event_serializes_with_required_flag() { + let event = da_resync_event().expect("event should serialize"); + let debug = format!("{event:?}"); + assert!(debug.contains("da_resync")); + } + #[tokio::test] async fn stream_seeds_from_head_tracker() { let tracker = Arc::new(HeadTracker::empty(10)); @@ -202,12 +280,13 @@ mod tests { .await; let (tx, _) = broadcast::channel::<()>(16); - let rx = tx.subscribe(); - let stream = make_block_stream(dummy_pool(), tracker, rx); + let (da_tx, _) = broadcast::channel::>(16); + let stream = make_event_stream(dummy_pool(), tracker, tx.subscribe(), da_tx.subscribe()); tokio::pin!(stream); - // Drop sender so loop terminates after the initial seed + // Drop sender so loop terminates after the initial seed. drop(tx); + drop(da_tx); let first = tokio::time::timeout(Duration::from_secs(1), stream.next()).await; assert!( @@ -228,16 +307,21 @@ mod tests { .await; let (tx, _) = broadcast::channel::<()>(16); - let rx = tx.subscribe(); - let stream = make_block_stream(dummy_pool(), tracker.clone(), rx); + let (da_tx, _) = broadcast::channel::>(16); + let stream = make_event_stream( + dummy_pool(), + tracker.clone(), + tx.subscribe(), + da_tx.subscribe(), + ); tokio::pin!(stream); - // Consume initial seed + // Consume initial seed. let _ = tokio::time::timeout(Duration::from_secs(1), stream.next()) .await .unwrap(); - // Publish a new block and broadcast + // Publish a new block and broadcast. tracker .publish_committed_batch(vec![sample_block(43)]) .await; @@ -251,27 +335,33 @@ mod tests { ); drop(tx); + drop(da_tx); } #[tokio::test] async fn stream_terminates_when_client_behind_tail() { - // Buffer capacity 3: only keeps 3 most recent blocks + // Buffer capacity 3: only keeps 3 most recent blocks. 
let tracker = Arc::new(HeadTracker::empty(3)); tracker .publish_committed_batch(vec![sample_block(10), sample_block(11), sample_block(12)]) .await; let (tx, _) = broadcast::channel::<()>(16); - let rx = tx.subscribe(); - let stream = make_block_stream(dummy_pool(), tracker.clone(), rx); + let (da_tx, _) = broadcast::channel::>(16); + let stream = make_event_stream( + dummy_pool(), + tracker.clone(), + tx.subscribe(), + da_tx.subscribe(), + ); tokio::pin!(stream); - // Consume initial seed (latest = block 12) + // Consume initial seed (latest = block 12). let _ = tokio::time::timeout(Duration::from_secs(1), stream.next()) .await .unwrap(); - // Advance buffer far ahead: client cursor=12, buffer will be [23,24,25] + // Advance buffer far ahead: client cursor=12, buffer will be [23,24,25]. tracker .publish_committed_batch(vec![ sample_block(20), @@ -284,7 +374,7 @@ mod tests { .await; tx.send(()).unwrap(); - // Stream should detect behind-tail and terminate + // Stream should detect behind-tail and terminate. 
let result = tokio::time::timeout(Duration::from_secs(2), async { while (stream.next().await).is_some() {} }) @@ -293,5 +383,39 @@ mod tests { result.is_ok(), "stream should terminate when client falls behind replay tail" ); + + drop(tx); + drop(da_tx); + } + + #[tokio::test] + async fn stream_emits_da_resync_when_da_updates_lag() { + let tracker = Arc::new(HeadTracker::empty(10)); + tracker + .publish_committed_batch(vec![sample_block(42)]) + .await; + + let (tx, _) = broadcast::channel::<()>(16); + let (da_tx, _) = broadcast::channel::>(1); + let stream = make_event_stream(dummy_pool(), tracker, tx.subscribe(), da_tx.subscribe()); + tokio::pin!(stream); + + let _ = tokio::time::timeout(Duration::from_secs(1), stream.next()) + .await + .unwrap(); + + da_tx.send(vec![sample_da_update(100)]).unwrap(); + da_tx.send(vec![sample_da_update(101)]).unwrap(); + + let next = tokio::time::timeout(Duration::from_secs(1), stream.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let debug = format!("{next:?}"); + assert!(debug.contains("da_resync")); + + drop(tx); + drop(da_tx); } } diff --git a/backend/crates/atlas-server/src/api/handlers/status.rs b/backend/crates/atlas-server/src/api/handlers/status.rs index 792e3e2..1053274 100644 --- a/backend/crates/atlas-server/src/api/handlers/status.rs +++ b/backend/crates/atlas-server/src/api/handlers/status.rs @@ -6,10 +6,16 @@ use crate::api::error::ApiResult; use crate::api::handlers::get_table_count; use crate::api::AppState; +#[derive(Serialize)] +pub struct ChainFeatures { + pub da_tracking: bool, +} + #[derive(Serialize)] pub struct HeightResponse { pub block_height: i64, pub indexed_at: String, + pub features: ChainFeatures, } #[derive(Serialize)] @@ -45,10 +51,14 @@ async fn latest_height_and_indexed_at(state: &AppState) -> Result<(i64, String), /// Returns in <1ms, optimized for frequent polling. 
pub async fn get_height(State(state): State<Arc<AppState>>) -> ApiResult<Json<HeightResponse>> { let (block_height, indexed_at) = latest_height_and_indexed_at(&state).await?; + let features = ChainFeatures { + da_tracking: state.da_tracking_enabled, + }; Ok(Json(HeightResponse { block_height, indexed_at, + features, })) } @@ -89,15 +99,18 @@ mod tests { } fn test_state(head_tracker: Arc<HeadTracker>) -> State<Arc<AppState>> { - let (tx, _) = tokio::sync::broadcast::channel(1); + let (block_tx, _) = tokio::sync::broadcast::channel(1); + let (da_tx, _) = tokio::sync::broadcast::channel(1); let pool = sqlx::postgres::PgPoolOptions::new() .connect_lazy("postgres://test@localhost:5432/test") .expect("lazy pool"); State(Arc::new(AppState { pool, - block_events_tx: tx, + block_events_tx: block_tx, + da_events_tx: da_tx, head_tracker, rpc_url: String::new(), + da_tracking_enabled: false, faucet: None, chain_id: 1, chain_name: "Test Chain".to_string(), @@ -116,6 +129,7 @@ assert_eq!(status.block_height, 42); assert!(!status.indexed_at.is_empty()); + assert!(!status.features.da_tracking); } #[tokio::test] diff --git a/backend/crates/atlas-server/src/api/mod.rs b/backend/crates/atlas-server/src/api/mod.rs index 81950c4..ae8440e 100644 --- a/backend/crates/atlas-server/src/api/mod.rs +++ b/backend/crates/atlas-server/src/api/mod.rs @@ -12,12 +12,15 @@ use tower_http::trace::TraceLayer; use crate::faucet::SharedFaucetBackend; use crate::head::HeadTracker; +use crate::indexer::DaSseUpdate; pub struct AppState { pub pool: PgPool, pub block_events_tx: broadcast::Sender<()>, + pub da_events_tx: broadcast::Sender<Vec<DaSseUpdate>>, pub head_tracker: Arc<HeadTracker>, pub rpc_url: String, + pub da_tracking_enabled: bool, pub faucet: Option<SharedFaucetBackend>, pub chain_id: u64, pub chain_name: String, } @@ -236,11 +239,14 @@ mod tests { .expect("lazy pool"); let head_tracker = Arc::new(crate::head::HeadTracker::empty(10)); let (tx, _) = broadcast::channel(1); + let (da_tx, _) = broadcast::channel(1); Arc::new(AppState { pool, block_events_tx: tx, + da_events_tx: da_tx, head_tracker, 
rpc_url: String::new(), + da_tracking_enabled: false, faucet, chain_id: 1, chain_name: "Test Chain".to_string(), diff --git a/backend/crates/atlas-server/src/config.rs b/backend/crates/atlas-server/src/config.rs index ab9d691..6d0bb73 100644 --- a/backend/crates/atlas-server/src/config.rs +++ b/backend/crates/atlas-server/src/config.rs @@ -3,6 +3,9 @@ use alloy::signers::local::PrivateKeySigner; use anyhow::{bail, Context, Result}; use std::{env, str::FromStr}; +const DEFAULT_DA_WORKER_CONCURRENCY: u32 = 50; +const DEFAULT_DA_RPC_REQUESTS_PER_SECOND: u32 = 50; + #[derive(Debug, Clone)] pub struct Config { // Shared @@ -26,6 +29,12 @@ pub struct Config { pub fetch_workers: u32, pub rpc_batch_size: u32, + // DA tracking (optional) + pub da_tracking_enabled: bool, + pub evnode_url: Option<String>, + pub da_worker_concurrency: u32, + pub da_rpc_requests_per_second: u32, + // API-specific pub api_host: String, pub api_port: u16, @@ -68,6 +77,50 @@ impl Config { bail!("SSE_REPLAY_BUFFER_BLOCKS must be between 1 and 100000"); } + let da_tracking_enabled: bool = env::var("ENABLE_DA_TRACKING") + .unwrap_or_else(|_| "false".to_string()) + .parse() + .context("Invalid ENABLE_DA_TRACKING")?; + + let raw_evnode_url = env::var("EVNODE_URL") + .ok() + .map(|url| url.trim().to_string()) + .filter(|url| !url.is_empty()); + + let evnode_url = if da_tracking_enabled { + Some(raw_evnode_url.ok_or_else(|| { + anyhow::anyhow!("EVNODE_URL must be set when DA tracking is enabled") + })?) 
+ } else { + None + }; + + let da_worker_concurrency = if da_tracking_enabled { + let value: u32 = env::var("DA_WORKER_CONCURRENCY") + .unwrap_or_else(|_| DEFAULT_DA_WORKER_CONCURRENCY.to_string()) + .parse() + .context("Invalid DA_WORKER_CONCURRENCY")?; + if value == 0 { + bail!("DA_WORKER_CONCURRENCY must be greater than 0"); + } + value + } else { + DEFAULT_DA_WORKER_CONCURRENCY + }; + + let da_rpc_requests_per_second = if da_tracking_enabled { + let value: u32 = env::var("DA_RPC_REQUESTS_PER_SECOND") + .unwrap_or_else(|_| DEFAULT_DA_RPC_REQUESTS_PER_SECOND.to_string()) + .parse() + .context("Invalid DA_RPC_REQUESTS_PER_SECOND")?; + if value == 0 { + bail!("DA_RPC_REQUESTS_PER_SECOND must be greater than 0"); + } + value + } else { + DEFAULT_DA_RPC_REQUESTS_PER_SECOND + }; + Ok(Self { database_url: env::var("DATABASE_URL").context("DATABASE_URL must be set")?, rpc_url: env::var("RPC_URL").context("RPC_URL must be set")?, @@ -116,6 +169,11 @@ impl Config { .parse() .context("Invalid RPC_BATCH_SIZE")?, + da_tracking_enabled, + evnode_url, + da_worker_concurrency, + da_rpc_requests_per_second, + api_host: env::var("API_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()), api_port: env::var("API_PORT") .unwrap_or_else(|_| "3000".to_string()) @@ -230,6 +288,20 @@ mod tests { env::set_var("RPC_URL", "http://localhost:8545"); } + fn clear_da_env() { + env::remove_var("ENABLE_DA_TRACKING"); + env::remove_var("EVNODE_URL"); + env::remove_var("DA_WORKER_CONCURRENCY"); + env::remove_var("DA_RPC_REQUESTS_PER_SECOND"); + } + + fn clear_faucet_env() { + env::remove_var("FAUCET_ENABLED"); + env::remove_var("FAUCET_PRIVATE_KEY"); + env::remove_var("FAUCET_AMOUNT"); + env::remove_var("FAUCET_COOLDOWN_MINUTES"); + } + fn set_valid_faucet_env() { env::set_var("FAUCET_ENABLED", "true"); env::set_var( @@ -244,6 +316,7 @@ mod tests { fn sse_replay_buffer_validation() { let _lock = ENV_LOCK.lock().unwrap(); set_required_env(); + clear_da_env(); // Default 
env::remove_var("SSE_REPLAY_BUFFER_BLOCKS"); @@ -271,15 +344,97 @@ mod tests { .contains("Invalid SSE_REPLAY_BUFFER_BLOCKS")); env::remove_var("SSE_REPLAY_BUFFER_BLOCKS"); + clear_da_env(); + } + + #[test] + fn da_tracking_is_disabled_by_default_and_ignores_da_specific_env() { + let _lock = ENV_LOCK.lock().unwrap(); + set_required_env(); + clear_da_env(); + + env::set_var("EVNODE_URL", "http://ev-node:7331"); + env::set_var("DA_WORKER_CONCURRENCY", "not-a-number"); + env::set_var("DA_RPC_REQUESTS_PER_SECOND", "not-a-number"); + + let config = Config::from_env().unwrap(); + assert!(!config.da_tracking_enabled); + assert!(config.evnode_url.is_none()); + assert_eq!(config.da_worker_concurrency, DEFAULT_DA_WORKER_CONCURRENCY); + assert_eq!( + config.da_rpc_requests_per_second, + DEFAULT_DA_RPC_REQUESTS_PER_SECOND + ); + + clear_da_env(); + } + + #[test] + fn da_tracking_requires_non_empty_evnode_url_when_enabled() { + let _lock = ENV_LOCK.lock().unwrap(); + set_required_env(); + clear_da_env(); + + env::set_var("ENABLE_DA_TRACKING", "true"); + let err = Config::from_env().unwrap_err(); + assert!(err + .to_string() + .contains("EVNODE_URL must be set when DA tracking is enabled")); + + env::set_var("EVNODE_URL", " "); + let err = Config::from_env().unwrap_err(); + assert!(err + .to_string() + .contains("EVNODE_URL must be set when DA tracking is enabled")); + + clear_da_env(); + } + + #[test] + fn evnode_url_alone_does_not_enable_da_tracking() { + let _lock = ENV_LOCK.lock().unwrap(); + set_required_env(); + clear_da_env(); + + env::set_var("EVNODE_URL", "http://ev-node:7331"); + + let config = Config::from_env().unwrap(); + assert!(!config.da_tracking_enabled); + assert!(config.evnode_url.is_none()); + assert_eq!(config.da_worker_concurrency, DEFAULT_DA_WORKER_CONCURRENCY); + assert_eq!( + config.da_rpc_requests_per_second, + DEFAULT_DA_RPC_REQUESTS_PER_SECOND + ); + + clear_da_env(); + } + + #[test] + fn da_tracking_parses_enabled_config() { + let _lock = 
ENV_LOCK.lock().unwrap(); + set_required_env(); + clear_da_env(); + + env::set_var("ENABLE_DA_TRACKING", "true"); + env::set_var("EVNODE_URL", "http://localhost:7331/"); + env::set_var("DA_WORKER_CONCURRENCY", "12"); + env::set_var("DA_RPC_REQUESTS_PER_SECOND", "34"); + + let config = Config::from_env().unwrap(); + assert!(config.da_tracking_enabled); + assert_eq!(config.evnode_url.as_deref(), Some("http://localhost:7331/")); + assert_eq!(config.da_worker_concurrency, 12); + assert_eq!(config.da_rpc_requests_per_second, 34); + + clear_da_env(); } #[test] fn faucet_config_defaults_disabled() { let _lock = ENV_LOCK.lock().unwrap(); env::remove_var("FAUCET_ENABLED"); - env::remove_var("FAUCET_PRIVATE_KEY"); - env::remove_var("FAUCET_AMOUNT"); - env::remove_var("FAUCET_COOLDOWN_MINUTES"); + clear_faucet_env(); let faucet = FaucetConfig::from_env().unwrap(); assert!(!faucet.enabled); diff --git a/backend/crates/atlas-server/src/indexer/da_worker.rs b/backend/crates/atlas-server/src/indexer/da_worker.rs new file mode 100644 index 0000000..17b1fcf --- /dev/null +++ b/backend/crates/atlas-server/src/indexer/da_worker.rs @@ -0,0 +1,363 @@ +//! Background DA (Data Availability) worker for tracking Celestia inclusion status. +//! +//! This worker queries ev-node's Connect RPC service to determine at which Celestia +//! height each block's header and data were submitted. +//! +//! ## Two-phase design +//! +//! The worker runs in a loop with a fixed RPC budget per cycle (BATCH_SIZE): +//! +//! 1. **Backfill** — Discovers blocks in the `blocks` table that are missing from +//! `block_da_status`. Queries ev-node for each and INSERTs the result. +//! **Always inserts a row, even when DA heights are 0** (block not yet included +//! on Celestia). This marks the block as "checked" so the backfill phase won't +//! re-query it on the next cycle. Processes newest blocks first so the UI shows +//! current data immediately. +//! +//! 2. 
**Update pending** — Finds rows where `header_da_height = 0 OR data_da_height = 0` +//! and re-queries ev-node. Updates with new values when the block has been included. +//! Processes newest pending blocks first (most relevant to UI users). +//! +//! Both phases share the same per-cycle RPC budget. Backfill runs first and takes +//! what it needs; pending gets the remainder. This ensures new blocks are checked +//! promptly while pending blocks still make progress every cycle. +//! +//! A block flows: backfill (phase 1) → update-pending (phase 2) → done. +//! +//! After each batch, the worker sends updated block numbers through an in-process +//! broadcast channel so the SSE handler can push live DA status changes to clients. + +use anyhow::Result; +use futures::stream::{self, StreamExt}; +use governor::{Quota, RateLimiter}; +use sqlx::PgPool; +use std::num::NonZeroU32; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::broadcast; + +use super::evnode::EvnodeClient; + +/// Total RPC budget per cycle, split between backfill and pending. +const BATCH_SIZE: i64 = 100; + +/// Sleep when idle (no work in either phase). 
+const IDLE_SLEEP: Duration = Duration::from_millis(500); + +const SELECT_MISSING_BLOCKS_SQL: &str = "SELECT b.number FROM blocks b + LEFT JOIN block_da_status d ON d.block_number = b.number + WHERE d.block_number IS NULL + ORDER BY b.number DESC + LIMIT $1"; + +const INSERT_DA_STATUS_SQL: &str = + "INSERT INTO block_da_status (block_number, header_da_height, data_da_height) + VALUES ($1, $2, $3) + ON CONFLICT (block_number) DO UPDATE SET + header_da_height = EXCLUDED.header_da_height, + data_da_height = EXCLUDED.data_da_height, + updated_at = NOW()"; + +const SELECT_PENDING_BLOCKS_SQL: &str = "SELECT block_number FROM block_da_status + WHERE header_da_height = 0 OR data_da_height = 0 + ORDER BY block_number DESC + LIMIT $1"; + +const UPDATE_PENDING_DA_STATUS_SQL: &str = "UPDATE block_da_status + SET header_da_height = $2, data_da_height = $3, updated_at = NOW() + WHERE block_number = $1 + AND (header_da_height, data_da_height) IS DISTINCT FROM ($2, $3)"; + +#[derive(Clone, Debug)] +pub struct DaSseUpdate { + pub block_number: i64, + pub header_da_height: i64, + pub data_da_height: i64, +} + +pub struct DaWorker { + pool: PgPool, + client: EvnodeClient, + concurrency: usize, + requests_per_second: u32, + rate_limiter: Arc< + RateLimiter< + governor::state::NotKeyed, + governor::state::InMemoryState, + governor::clock::DefaultClock, + >, + >, + da_events_tx: broadcast::Sender<Vec<DaSseUpdate>>, +} + +impl DaWorker { + pub fn new( + pool: PgPool, + evnode_url: &str, + concurrency: u32, + requests_per_second: u32, + da_events_tx: broadcast::Sender<Vec<DaSseUpdate>>, + ) -> Result<Self> { + let concurrency = NonZeroU32::new(concurrency) + .ok_or_else(|| anyhow::anyhow!("DA_WORKER_CONCURRENCY must be greater than 0"))?; + let rate = NonZeroU32::new(requests_per_second) + .ok_or_else(|| anyhow::anyhow!("DA_RPC_REQUESTS_PER_SECOND must be greater than 0"))?; + Ok(Self { + pool, + client: EvnodeClient::new(evnode_url), + concurrency: concurrency.get() as usize, + requests_per_second, + rate_limiter: 
Arc::new(RateLimiter::direct(Quota::per_second(rate))), + da_events_tx, + }) + } + + pub async fn run(&self) -> Result<()> { + tracing::info!( + "DA worker started (concurrency: {}, rate_limit: {} req/s)", + self.concurrency, + self.requests_per_second + ); + + loop { + // Phase 1: backfill gets first pick of the budget + let backfilled = self.backfill_new_blocks(BATCH_SIZE).await?; + + // Phase 2: pending gets whatever budget remains + let remaining = BATCH_SIZE - backfilled as i64; + let updated = if remaining > 0 { + self.update_pending_blocks(remaining).await? + } else { + 0 + }; + + let did_work = backfilled > 0 || updated > 0; + if did_work { + tracing::info!( + "DA worker cycle: backfilled {}, updated {} pending", + backfilled, + updated + ); + } else { + tokio::time::sleep(IDLE_SLEEP).await; + } + } + } + + /// Notify SSE subscribers of DA status changes via in-process broadcast channel. + fn notify_da_updates(&self, updates: &[DaSseUpdate]) { + if updates.is_empty() { + return; + } + let _ = self.da_events_tx.send(updates.to_vec()); + } + + /// Phase 1: Find blocks missing from block_da_status and query ev-node. + /// Returns the number of blocks processed. 
+ async fn backfill_new_blocks(&self, limit: i64) -> Result { + let missing: Vec<(i64,)> = sqlx::query_as(SELECT_MISSING_BLOCKS_SQL) + .bind(limit) + .fetch_all(&self.pool) + .await?; + + if missing.is_empty() { + return Ok(0); + } + + let pool = &self.pool; + let client = &self.client; + let rate_limiter = &self.rate_limiter; + + let results: Vec> = stream::iter(missing) + .map(|(block_number,)| async move { + rate_limiter.until_ready().await; + match client.get_da_status(block_number as u64).await { + Ok((header_da, data_da)) => { + if let Err(e) = sqlx::query(INSERT_DA_STATUS_SQL) + .bind(block_number) + .bind(header_da as i64) + .bind(data_da as i64) + .execute(pool) + .await + { + tracing::warn!( + "Failed to insert DA status for block {}: {}", + block_number, + e + ); + return None; + } + Some(DaSseUpdate { + block_number, + header_da_height: header_da as i64, + data_da_height: data_da as i64, + }) + } + Err(e) => { + tracing::warn!( + "Failed to fetch DA status for block {}: {}", + block_number, + e + ); + None + } + } + }) + .buffer_unordered(self.concurrency) + .collect() + .await; + + let updates: Vec = results.into_iter().flatten().collect(); + self.notify_da_updates(&updates); + + Ok(updates.len()) + } + + /// Phase 2: Re-check blocks where DA heights are still 0. + /// Returns the number of blocks processed. 
+ async fn update_pending_blocks(&self, limit: i64) -> Result { + let pending: Vec<(i64,)> = sqlx::query_as(SELECT_PENDING_BLOCKS_SQL) + .bind(limit) + .fetch_all(&self.pool) + .await?; + + if pending.is_empty() { + return Ok(0); + } + + let pool = &self.pool; + let client = &self.client; + let rate_limiter = &self.rate_limiter; + + let results: Vec> = stream::iter(pending) + .map(|(block_number,)| async move { + rate_limiter.until_ready().await; + match client.get_da_status(block_number as u64).await { + Ok((header_da, data_da)) => { + match sqlx::query(UPDATE_PENDING_DA_STATUS_SQL) + .bind(block_number) + .bind(header_da as i64) + .bind(data_da as i64) + .execute(pool) + .await + { + Ok(result) if result.rows_affected() > 0 => Some(DaSseUpdate { + block_number, + header_da_height: header_da as i64, + data_da_height: data_da as i64, + }), + Ok(_) => None, + Err(e) => { + tracing::warn!( + "Failed to update DA status for block {}: {}", + block_number, + e + ); + None + } + } + } + Err(e) => { + tracing::warn!( + "Failed to fetch DA status for block {}: {}", + block_number, + e + ); + None + } + } + }) + .buffer_unordered(self.concurrency) + .collect() + .await; + + let updates: Vec = results.into_iter().flatten().collect(); + self.notify_da_updates(&updates); + + Ok(updates.len()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_pool() -> PgPool { + sqlx::postgres::PgPoolOptions::new() + .connect_lazy("postgres://test@localhost:5432/test") + .expect("lazy pool creation should not fail") + } + + #[tokio::test] + async fn new_rejects_zero_concurrency() { + let (tx, _) = broadcast::channel(1); + let err = DaWorker::new(test_pool(), "http://localhost:7331", 0, 50, tx) + .err() + .expect("zero concurrency should fail"); + + assert!(err + .to_string() + .contains("DA_WORKER_CONCURRENCY must be greater than 0")); + } + + #[tokio::test] + async fn new_rejects_zero_rate_limit() { + let (tx, _) = broadcast::channel(1); + let err = DaWorker::new(test_pool(), 
"http://localhost:7331", 4, 0, tx) + .err() + .expect("zero rate limit should fail"); + + assert!(err + .to_string() + .contains("DA_RPC_REQUESTS_PER_SECOND must be greater than 0")); + } + + #[tokio::test] + async fn notify_da_updates_sends_full_batch() { + let (tx, mut rx) = broadcast::channel(1); + let worker = DaWorker::new(test_pool(), "http://localhost:7331", 4, 50, tx).unwrap(); + let updates = vec![ + DaSseUpdate { + block_number: 10, + header_da_height: 100, + data_da_height: 101, + }, + DaSseUpdate { + block_number: 11, + header_da_height: 110, + data_da_height: 111, + }, + ]; + + worker.notify_da_updates(&updates); + + let received = rx.recv().await.expect("batch should be broadcast"); + assert_eq!(received.len(), 2); + assert_eq!(received[0].block_number, 10); + assert_eq!(received[1].data_da_height, 111); + } + + #[tokio::test] + async fn notify_da_updates_skips_empty_batch() { + let (tx, mut rx) = broadcast::channel(1); + let worker = DaWorker::new(test_pool(), "http://localhost:7331", 4, 50, tx).unwrap(); + + worker.notify_da_updates(&[]); + + let result = tokio::time::timeout(Duration::from_millis(50), rx.recv()).await; + assert!(result.is_err(), "empty batch should not be broadcast"); + } + + #[test] + fn scheduler_queries_prioritize_newest_blocks() { + assert!(SELECT_MISSING_BLOCKS_SQL.contains("ORDER BY b.number DESC")); + assert!(SELECT_PENDING_BLOCKS_SQL.contains("ORDER BY block_number DESC")); + assert!(SELECT_MISSING_BLOCKS_SQL.contains("LIMIT $1")); + assert!(SELECT_PENDING_BLOCKS_SQL.contains("LIMIT $1")); + } + + #[test] + fn pending_update_sql_suppresses_noop_writes() { + assert!(UPDATE_PENDING_DA_STATUS_SQL.contains("IS DISTINCT FROM")); + } +} diff --git a/backend/crates/atlas-server/src/indexer/evnode.rs b/backend/crates/atlas-server/src/indexer/evnode.rs new file mode 100644 index 0000000..fb8d51d --- /dev/null +++ b/backend/crates/atlas-server/src/indexer/evnode.rs @@ -0,0 +1,195 @@ +//! 
ev-node Connect RPC client for querying DA (Data Availability) status. +//! +//! ev-node exposes a Connect RPC service (`StoreService`) that provides +//! consensus/DA layer data separate from the standard EVM JSON-RPC API. +//! This module wraps the `GetBlock` RPC to extract DA inclusion heights. +//! +//! Uses the Connect RPC JSON codec (`application/json`), which ev-node +//! supports out of the box alongside protobuf. + +use anyhow::{bail, Result}; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +/// Connect RPC JSON request for StoreService.GetBlock. +/// uint64 fields are encoded as strings per Connect RPC convention. +#[derive(Serialize)] +struct GetBlockRequest { + height: String, +} + +/// Connect RPC JSON response for StoreService.GetBlock. +/// We only extract the DA height fields. +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct GetBlockResponse { + #[serde(default, deserialize_with = "deserialize_u64_string")] + header_da_height: u64, + #[serde(default, deserialize_with = "deserialize_u64_string")] + data_da_height: u64, +} + +/// Connect RPC encodes uint64 as JSON strings (e.g., `"123"` not `123`). +/// This deserializer handles both string and numeric representations. +fn deserialize_u64_string<'de, D>(deserializer: D) -> std::result::Result +where + D: serde::Deserializer<'de>, +{ + use serde::de; + + struct U64Visitor; + impl<'de> de::Visitor<'de> for U64Visitor { + type Value = u64; + fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.write_str("a u64 as a string or number") + } + fn visit_u64(self, v: u64) -> std::result::Result { + Ok(v) + } + fn visit_str(self, v: &str) -> std::result::Result { + v.parse().map_err(de::Error::custom) + } + } + deserializer.deserialize_any(U64Visitor) +} + +/// Retry delays for ev-node RPC calls (in milliseconds). +/// Fail fast — the background loop will retry on the next cycle anyway. 
+const RETRY_DELAYS_MS: &[u64] = &[100, 500, 1000]; +const MAX_RETRIES: usize = 3; + +/// Client for ev-node's Connect RPC StoreService. +pub struct EvnodeClient { + client: reqwest::Client, + url: String, +} + +impl EvnodeClient { + /// Create a new client pointing at the given ev-node Connect RPC URL. + pub fn new(evnode_url: &str) -> Self { + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(2)) + .build() + .expect("failed to create HTTP client"); + + let base = evnode_url.trim_end_matches('/'); + Self { + client, + url: format!("{base}/evnode.v1.StoreService/GetBlock"), + } + } + + /// Fetch DA inclusion heights for a block. + /// + /// Returns `(header_da_height, data_da_height)`. + /// Both are 0 if the block has not yet been submitted to Celestia. + /// + /// Retries with backoff on transient errors. + pub async fn get_da_status(&self, height: u64) -> Result<(u64, u64)> { + let mut last_error = None; + + for attempt in 0..MAX_RETRIES { + match self.do_request(height).await { + Ok((h, d)) => return Ok((h, d)), + Err(e) => { + last_error = Some(e); + if attempt + 1 < MAX_RETRIES { + let delay_ms = RETRY_DELAYS_MS + .get(attempt) + .copied() + .unwrap_or(*RETRY_DELAYS_MS.last().unwrap()); + + tracing::warn!( + "ev-node GetBlock failed for height {} (attempt {}): {}. 
Retrying in {}ms", + height, + attempt + 1, + last_error.as_ref().unwrap(), + delay_ms, + ); + + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + } + } + } + } + + bail!( + "ev-node GetBlock failed for height {} after {} attempts: {}", + height, + MAX_RETRIES, + last_error.unwrap() + ) + } + + async fn do_request(&self, height: u64) -> Result<(u64, u64)> { + let request = GetBlockRequest { + height: height.to_string(), + }; + + let response = self + .client + .post(&self.url) + .header("Content-Type", "application/json") + .json(&request) + .send() + .await?; + + if !response.status().is_success() { + bail!( + "HTTP {}: {}", + response.status(), + response.text().await.unwrap_or_default() + ); + } + + let resp: GetBlockResponse = response.json().await?; + Ok((resp.header_da_height, resp.data_da_height)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn client_trims_trailing_slash() { + let client = EvnodeClient::new("http://localhost:7331/"); + assert_eq!( + client.url, + "http://localhost:7331/evnode.v1.StoreService/GetBlock" + ); + } + + #[test] + fn request_serializes_height_as_string() { + let req = GetBlockRequest { + height: 42.to_string(), + }; + let json = serde_json::to_string(&req).unwrap(); + assert_eq!(json, r#"{"height":"42"}"#); + } + + #[test] + fn response_deserializes_string_heights() { + let json = r#"{"headerDaHeight":"100","dataDaHeight":"200"}"#; + let resp: GetBlockResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.header_da_height, 100); + assert_eq!(resp.data_da_height, 200); + } + + #[test] + fn response_deserializes_numeric_heights() { + let json = r#"{"headerDaHeight":100,"dataDaHeight":200}"#; + let resp: GetBlockResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.header_da_height, 100); + assert_eq!(resp.data_da_height, 200); + } + + #[test] + fn response_defaults_missing_fields_to_zero() { + let json = r#"{}"#; + let resp: GetBlockResponse = 
serde_json::from_str(json).unwrap(); + assert_eq!(resp.header_da_height, 0); + assert_eq!(resp.data_da_height, 0); + } +} diff --git a/backend/crates/atlas-server/src/indexer/mod.rs b/backend/crates/atlas-server/src/indexer/mod.rs index c46af3c..05bf8a7 100644 --- a/backend/crates/atlas-server/src/indexer/mod.rs +++ b/backend/crates/atlas-server/src/indexer/mod.rs @@ -1,9 +1,12 @@ pub(crate) mod batch; pub(crate) mod copy; +pub mod da_worker; +pub(crate) mod evnode; pub(crate) mod fetcher; #[allow(clippy::module_inception)] pub mod indexer; pub mod metadata; +pub use da_worker::{DaSseUpdate, DaWorker}; pub use indexer::Indexer; pub use metadata::MetadataFetcher; diff --git a/backend/crates/atlas-server/src/main.rs b/backend/crates/atlas-server/src/main.rs index 4de168e..e24c31f 100644 --- a/backend/crates/atlas-server/src/main.rs +++ b/backend/crates/atlas-server/src/main.rs @@ -99,8 +99,9 @@ async fn main() -> Result<()> { let api_pool = atlas_common::db::create_pool(&config.database_url, config.api_db_max_connections).await?; - // Shared broadcast channel for SSE notifications + // Shared broadcast channels for SSE notifications let (block_events_tx, _) = broadcast::channel(1024); + let (da_events_tx, _) = broadcast::channel::>(256); let head_tracker = Arc::new(if config.reindex { head::HeadTracker::empty(config.sse_replay_buffer_blocks) } else { @@ -111,14 +112,17 @@ async fn main() -> Result<()> { let state = Arc::new(api::AppState { pool: api_pool, block_events_tx: block_events_tx.clone(), + da_events_tx: da_events_tx.clone(), head_tracker: head_tracker.clone(), rpc_url: config.rpc_url.clone(), + da_tracking_enabled: config.da_tracking_enabled, faucet, chain_id, chain_name: config.chain_name.clone(), }); // Spawn indexer task with retry logic + let da_pool = indexer_pool.clone(); let indexer = indexer::Indexer::new( indexer_pool.clone(), config.clone(), @@ -131,6 +135,31 @@ async fn main() -> Result<()> { } }); + // Spawn DA worker when DA tracking is 
explicitly enabled. + if config.da_tracking_enabled { + let evnode_url = config + .evnode_url + .as_deref() + .expect("DA tracking requires EVNODE_URL"); + tracing::info!( + "DA tracking enabled (workers: {}, rate_limit: {} req/s)", + config.da_worker_concurrency, + config.da_rpc_requests_per_second + ); + let da_worker = indexer::DaWorker::new( + da_pool, + evnode_url, + config.da_worker_concurrency, + config.da_rpc_requests_per_second, + da_events_tx, + )?; + tokio::spawn(async move { + if let Err(e) = run_with_retry(|| da_worker.run()).await { + tracing::error!("DA worker terminated with error: {}", e); + } + }); + } + // Spawn metadata fetcher in background let metadata_pool = indexer_pool; let metadata_config = config.clone(); diff --git a/backend/migrations/20240108000001_block_da_status.sql b/backend/migrations/20240108000001_block_da_status.sql index df6bd3f..5be2202 100644 --- a/backend/migrations/20240108000001_block_da_status.sql +++ b/backend/migrations/20240108000001_block_da_status.sql @@ -1,5 +1,5 @@ -- Block DA (Data Availability) status for L2 chains using Celestia. --- Only populated when EVNODE_URL is configured and the DA worker is running. +-- Only populated when ENABLE_DA_TRACKING=true and the DA worker is running. -- -- The DA worker has two phases: -- 1. Backfill: discovers blocks missing from this table, queries ev-node, and INSERTs. 
diff --git a/docker-compose.yml b/docker-compose.yml index 4e4be78..ffcc4cf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -28,6 +28,10 @@ services: FETCH_WORKERS: ${FETCH_WORKERS:-10} RPC_REQUESTS_PER_SECOND: ${RPC_REQUESTS_PER_SECOND:-100} RPC_BATCH_SIZE: ${RPC_BATCH_SIZE:-20} + ENABLE_DA_TRACKING: ${ENABLE_DA_TRACKING:-false} + EVNODE_URL: ${EVNODE_URL:-} + DA_RPC_REQUESTS_PER_SECOND: ${DA_RPC_REQUESTS_PER_SECOND:-50} + DA_WORKER_CONCURRENCY: ${DA_WORKER_CONCURRENCY:-50} FAUCET_ENABLED: ${FAUCET_ENABLED:-false} FAUCET_PRIVATE_KEY: ${FAUCET_PRIVATE_KEY:-} FAUCET_AMOUNT: ${FAUCET_AMOUNT:-} diff --git a/frontend/src/api/status.ts b/frontend/src/api/status.ts index 390dfe8..3ab94b2 100644 --- a/frontend/src/api/status.ts +++ b/frontend/src/api/status.ts @@ -1,8 +1,10 @@ import client from './client'; +import type { ChainFeatures } from '../types'; export interface HeightResponse { block_height: number; indexed_at?: string; // ISO timestamp, absent when no blocks indexed + features: ChainFeatures; } export interface ChainStatusResponse { diff --git a/frontend/src/components/Layout.tsx b/frontend/src/components/Layout.tsx index b6ac519..9ac2abc 100644 --- a/frontend/src/components/Layout.tsx +++ b/frontend/src/components/Layout.tsx @@ -206,7 +206,16 @@ export default function Layout() { {/* Main content */}
- + diff --git a/frontend/src/context/BlockStatsContext.tsx b/frontend/src/context/BlockStatsContext.tsx index e07a305..63e7efa 100644 --- a/frontend/src/context/BlockStatsContext.tsx +++ b/frontend/src/context/BlockStatsContext.tsx @@ -1,17 +1,22 @@ import { createContext } from 'react'; -import type { NewBlockEvent } from '../hooks/useBlockSSE'; +import type { NewBlockEvent, DaSubscriber, DaResyncSubscriber } from '../hooks/useBlockSSE'; export interface BlockStats { bps: number | null; height: number | null; latestBlockEvent: NewBlockEvent | null; sseConnected: boolean; + subscribeDa: (cb: DaSubscriber) => () => void; + subscribeDaResync: (cb: DaResyncSubscriber) => () => void; } +const noopSubscribe = () => () => {}; + export const BlockStatsContext = createContext({ bps: null, height: null, latestBlockEvent: null, sseConnected: false, + subscribeDa: noopSubscribe, + subscribeDaResync: noopSubscribe, }); - diff --git a/frontend/src/hooks/index.ts b/frontend/src/hooks/index.ts index d9f3851..be81c3b 100644 --- a/frontend/src/hooks/index.ts +++ b/frontend/src/hooks/index.ts @@ -10,3 +10,4 @@ export * from './useProxies'; export { default as useFaucetInfo } from './useFaucetInfo'; export { default as useEthBalance } from './useEthBalance'; export { default as useEthPrice } from './useEthPrice'; +export { default as useFeatures } from './useFeatures'; diff --git a/frontend/src/hooks/useBlockSSE.ts b/frontend/src/hooks/useBlockSSE.ts index 49bf632..68cd179 100644 --- a/frontend/src/hooks/useBlockSSE.ts +++ b/frontend/src/hooks/useBlockSSE.ts @@ -7,13 +7,27 @@ export interface NewBlockEvent { block: Block; } +export interface DaUpdateEvent { + block_number: number; + header_da_height: number; + data_da_height: number; +} + +export interface DaBatchEvent { + updates: DaUpdateEvent[]; +} + +export type DaSubscriber = (updates: DaUpdateEvent[]) => void; +export type DaResyncSubscriber = () => void; + export interface BlockSSEState { latestBlock: NewBlockEvent | null; 
height: number | null; connected: boolean; error: string | null; bps: number | null; - lastUpdatedAt: number | null; + subscribeDa: (cb: DaSubscriber) => () => void; + subscribeDaResync: (cb: DaResyncSubscriber) => () => void; } type BlockLog = { num: number; ts: number }[]; @@ -22,11 +36,6 @@ const MIN_DRAIN_MS = 30; const MAX_DRAIN_MS = 500; const POLL_INTERVAL_MS = 2000; -/** - * Compute bps from a rolling log of (blockNumber, blockTimestamp) samples. - * Returns the rate from the first sample whose span >= minSpan, or from the - * oldest sample if its span >= fallbackMinSpan. Returns null if insufficient data. - */ function computeBpsFromLog(log: BlockLog, minSpan: number, fallbackMinSpan: number): number | null { if (log.length < 2) return null; const newest = log[log.length - 1]; @@ -38,21 +47,27 @@ function computeBpsFromLog(log: BlockLog, minSpan: number, fallbackMinSpan: numb return null; } -/** - * Connects to the SSE endpoint and delivers block events at the chain's natural - * cadence. Falls back to polling /api/status when SSE disconnects. - * - * The drain queue rate-limits setState calls so React doesn't collapse rapid - * arrivals from backend batch commits. The interval is derived from on-chain - * block timestamps (not arrival times), making visual cadence match the chain. 
- */ export default function useBlockSSE(): BlockSSEState { const [latestBlock, setLatestBlock] = useState(null); const [height, setHeight] = useState(null); const [connected, setConnected] = useState(false); const [error, setError] = useState(null); const [bps, setBps] = useState(null); - const [lastUpdatedAt, setLastUpdatedAt] = useState(null); + + const daSubscribersRef = useRef>(new Set()); + const daResyncSubscribersRef = useRef>(new Set()); + const subscribeDa = useCallback((cb: DaSubscriber) => { + daSubscribersRef.current.add(cb); + return () => { + daSubscribersRef.current.delete(cb); + }; + }, []); + const subscribeDaResync = useCallback((cb: DaResyncSubscriber) => { + daResyncSubscribersRef.current.add(cb); + return () => { + daResyncSubscribersRef.current.delete(cb); + }; + }, []); const esRef = useRef(null); const queueRef = useRef([]); @@ -64,8 +79,6 @@ export default function useBlockSSE(): BlockSSEState { const connectedRef = useRef(false); const highestSeenRef = useRef(-1); - // --- Drain: emit one block per chain-rate interval --- - const scheduleDrain = useCallback(() => { if (drainTimerRef.current !== null || queueRef.current.length === 0) return; const interval = Math.max(MIN_DRAIN_MS, drainIntervalRef.current); @@ -77,7 +90,6 @@ export default function useBlockSSE(): BlockSSEState { const queue = queueRef.current; if (queue.length === 0) return; - // If queue backs up, skip to near the end if (queue.length > 50) { const skip = queue.splice(0, queue.length - 5); const last = skip[skip.length - 1]; @@ -87,7 +99,6 @@ export default function useBlockSSE(): BlockSSEState { const next = queue.shift()!; setLatestBlock(next); setHeight(next.block.number); - setLastUpdatedAt(Date.now()); if (queue.length > 0) scheduleDrain(); }, [scheduleDrain]); @@ -100,16 +111,13 @@ export default function useBlockSSE(): BlockSSEState { if (data.block.number <= highestSeenRef.current) return; highestSeenRef.current = data.block.number; - // Update bps from on-chain 
block timestamps const log = blockLogRef.current; log.push({ num: data.block.number, ts: data.block.timestamp }); if (log.length > 500) blockLogRef.current = log.slice(-500); - // Displayed bps: 30s window for stability, 5s fallback while bootstrapping const displayBps = computeBpsFromLog(blockLogRef.current, 30, 5); if (displayBps !== null) setBps(displayBps); - // Drain pacing: 10s window for moderate responsiveness, 2s fallback const drainBps = computeBpsFromLog(blockLogRef.current, 10, 2); if (drainBps !== null) { drainIntervalRef.current = Math.max(MIN_DRAIN_MS, Math.min(MAX_DRAIN_MS, 1000 / drainBps)); @@ -119,8 +127,6 @@ export default function useBlockSSE(): BlockSSEState { scheduleDrain(); }, [scheduleDrain]); - // --- Polling fallback --- - const stopPolling = useCallback(() => { if (pollTimerRef.current !== null) { clearInterval(pollTimerRef.current); @@ -137,18 +143,17 @@ export default function useBlockSSE(): BlockSSEState { if (typeof status?.block_height === 'number' && !connectedRef.current && status.block_height > highestSeenRef.current) { highestSeenRef.current = status.block_height; setHeight(status.block_height); - setLastUpdatedAt(Date.now()); } } catch { // ignore polling errors } }; - poll(); - pollTimerRef.current = window.setInterval(poll, POLL_INTERVAL_MS); + void poll(); + pollTimerRef.current = window.setInterval(() => { + void poll(); + }, POLL_INTERVAL_MS); }, []); - // --- SSE connection --- - const connect = useCallback(() => { if (esRef.current) esRef.current.close(); @@ -170,6 +175,21 @@ export default function useBlockSSE(): BlockSSEState { } }); + es.addEventListener('da_batch', (e: MessageEvent) => { + try { + const data: DaBatchEvent = JSON.parse(e.data); + if (data.updates?.length) { + for (const cb of daSubscribersRef.current) cb(data.updates); + } + } catch { + // ignore malformed events + } + }); + + es.addEventListener('da_resync', () => { + for (const cb of daResyncSubscribersRef.current) cb(); + }); + es.onerror = (e) => 
{ connectedRef.current = false; setConnected(false); @@ -193,5 +213,5 @@ export default function useBlockSSE(): BlockSSEState { }; }, [connect, stopPolling]); - return { latestBlock, height, connected, error, bps, lastUpdatedAt }; + return { latestBlock, height, connected, error, bps, subscribeDa, subscribeDaResync }; } diff --git a/frontend/src/hooks/useFeatures.ts b/frontend/src/hooks/useFeatures.ts new file mode 100644 index 0000000..f9d3987 --- /dev/null +++ b/frontend/src/hooks/useFeatures.ts @@ -0,0 +1,68 @@ +import { useEffect, useState } from 'react'; +import { getHeight } from '../api/status'; +import type { ChainFeatures } from '../types'; + +const defaultFeatures: ChainFeatures = { da_tracking: false }; +type FeaturesListener = (features: ChainFeatures) => void; + +let cachedFeatures: ChainFeatures | null = null; +let featuresPromise: Promise | null = null; +const listeners = new Set(); + +function notifyFeatures(features: ChainFeatures) { + for (const listener of listeners) { + listener(features); + } +} + +function loadFeatures(): Promise { + if (cachedFeatures) { + return Promise.resolve(cachedFeatures); + } + + if (!featuresPromise) { + featuresPromise = getHeight() + .then((status) => status.features ?? defaultFeatures) + .catch(() => defaultFeatures) + .then((features) => { + cachedFeatures = features; + notifyFeatures(features); + return features; + }) + .finally(() => { + featuresPromise = null; + }); + } + + return featuresPromise; +} + +/** + * Returns cached chain feature flags, loading them only once per app session. + */ +export default function useFeatures(): ChainFeatures { + const [features, setFeatures] = useState(cachedFeatures ?? 
defaultFeatures); + + useEffect(() => { + let active = true; + const listener: FeaturesListener = (nextFeatures) => { + if (active) { + setFeatures(nextFeatures); + } + }; + + listeners.add(listener); + void loadFeatures().then((nextFeatures) => { + if (active) { + setFeatures(nextFeatures); + } + }); + + return () => { + active = false; + listeners.delete(listener); + }; + }, []); + + return features; +} diff --git a/frontend/src/index.css b/frontend/src/index.css index d6f3cd8..ca46cc6 100644 --- a/frontend/src/index.css +++ b/frontend/src/index.css @@ -168,6 +168,17 @@ 100% { opacity: 0.55; } } +/* DA dot pulse when status changes via SSE */ +.animate-da-pulse { + animation: da-pulse 1.5s ease-out; +} + +@keyframes da-pulse { + 0% { transform: scale(2.2); opacity: 0.5; box-shadow: 0 0 8px currentcolor; } + 40% { transform: scale(1.4); opacity: 0.8; } + 100% { transform: scale(1); opacity: 1; box-shadow: none; } +} + /* Smooth counter appearance on change */ .fade-in-up { animation: fadeInUp 280ms cubic-bezier(0.22, 1, 0.36, 1); diff --git a/frontend/src/pages/BlockDetailPage.tsx b/frontend/src/pages/BlockDetailPage.tsx index 67e1de2..ede4818 100644 --- a/frontend/src/pages/BlockDetailPage.tsx +++ b/frontend/src/pages/BlockDetailPage.tsx @@ -1,16 +1,64 @@ import { useParams, Link } from 'react-router-dom'; -import { useBlock, useBlockTransactions } from '../hooks'; +import { useBlock, useBlockTransactions, useFeatures } from '../hooks'; import { CopyButton, Loading, AddressLink, TxHashLink, StatusBadge } from '../components'; import { formatNumber, formatTimestamp, formatGas, truncateHash, formatTimeAgo, formatEther } from '../utils'; -import { useState } from 'react'; +import { useContext, useEffect, useState } from 'react'; import type { ReactNode } from 'react'; +import { BlockStatsContext } from '../context/BlockStatsContext'; +import type { BlockDaStatus } from '../types'; + +/** Format a DA height as a status indicator. 
*/ +function formatDaStatus(daHeight: number): ReactNode { + if (daHeight > 0) { + return ( + + + {formatNumber(daHeight)} + + ); + } + return ( + + + Pending + + ); +} export default function BlockDetailPage() { const { number } = useParams<{ number: string }>(); const blockNumber = number ? parseInt(number, 10) : undefined; - const { block, loading: blockLoading, error: blockError } = useBlock(blockNumber); + const { block, loading: blockLoading, error: blockError, refetch: refetchBlock } = useBlock(blockNumber); + const features = useFeatures(); const [txPage, setTxPage] = useState(1); const { transactions, pagination, loading } = useBlockTransactions(blockNumber, { page: txPage, limit: 20 }); + const { subscribeDa, subscribeDaResync } = useContext(BlockStatsContext); + const [daOverride, setDaOverride] = useState(null); + + // Persist DA updates for this block until navigation or a full refetch catches up. + useEffect(() => { + if (!features.da_tracking) return; + return subscribeDa((updates) => { + const match = updates.find((update) => update.block_number === blockNumber); + if (!match) return; + setDaOverride({ + block_number: match.block_number, + header_da_height: match.header_da_height, + data_da_height: match.data_da_height, + updated_at: new Date().toISOString(), + }); + }); + }, [features.da_tracking, subscribeDa, blockNumber]); + + useEffect(() => { + if (!features.da_tracking) return; + return subscribeDaResync(() => { + setDaOverride(null); + void refetchBlock(); + }); + }, [features.da_tracking, refetchBlock, subscribeDaResync]); + + const currentDaOverride = daOverride?.block_number === blockNumber ? daOverride : null; type DetailRow = { label: string; value: ReactNode; stacked?: boolean }; const details: DetailRow[] = block ? 
[ @@ -44,6 +92,20 @@ export default function BlockDetailPage() { }, { label: 'Gas Used', value: formatGas(block.gas_used.toString()) }, { label: 'Gas Limit', value: formatGas(block.gas_limit.toString()) }, + // DA status rows — only shown when da_tracking feature is enabled + ...(features.da_tracking ? (() => { + const daStatus = currentDaOverride ?? block.da_status; + return [ + { + label: 'Header DA', + value: formatDaStatus(daStatus?.header_da_height ?? 0), + }, + { + label: 'Data DA', + value: formatDaStatus(daStatus?.data_da_height ?? 0), + }, + ]; + })() as DetailRow[] : []), ] : [ { label: 'Block Height', value: '---' }, { label: 'Timestamp', value: '---' }, diff --git a/frontend/src/pages/BlocksPage.tsx b/frontend/src/pages/BlocksPage.tsx index e6ae3bc..3e82e8a 100644 --- a/frontend/src/pages/BlocksPage.tsx +++ b/frontend/src/pages/BlocksPage.tsx @@ -1,9 +1,16 @@ import { useContext, useEffect, useMemo, useRef, useState } from 'react'; import { Link, useNavigate } from 'react-router-dom'; -import { useBlocks } from '../hooks'; +import { useBlocks, useFeatures } from '../hooks'; import { CopyButton, Loading } from '../components'; import { formatNumber, formatTimeAgo, formatGas, truncateHash } from '../utils'; import { BlockStatsContext } from '../context/BlockStatsContext'; +import type { BlockDaStatus } from '../types'; + +const BLOCKS_PAGE_SIZE = 20; + +function isDaIncluded(status: Pick | null | undefined): boolean { + return !!status && status.header_da_height > 0 && status.data_da_height > 0; +} export default function BlocksPage() { const [page, setPage] = useState(1); @@ -15,9 +22,19 @@ export default function BlocksPage() { return true; } }); - const { blocks: fetchedBlocks, pagination, refetch, loading } = useBlocks({ page, limit: 20 }); + const { blocks: fetchedBlocks, pagination, refetch, loading } = useBlocks({ page, limit: BLOCKS_PAGE_SIZE }); + const features = useFeatures(); const hasLoaded = !loading || pagination !== null; - const { 
latestBlockEvent, sseConnected } = useContext(BlockStatsContext); + const { latestBlockEvent, sseConnected, subscribeDa, subscribeDaResync } = useContext(BlockStatsContext); + const [daOverrides, setDaOverrides] = useState>(new Map()); + const [daHighlight, setDaHighlight] = useState>(new Set()); + const daOverridesRef = useRef>(new Map()); + const daOverridesSyncRafRef = useRef(null); + const daHighlightTimeoutsRef = useRef>(new Map()); + const baseDaIncludedRef = useRef>(new Map()); + const visibleDaBlocksRef = useRef>(new Set()); + const bufferedDaBlocksRef = useRef>(new Set()); + const [, setTick] = useState(0); const [sseBlocks, setSseBlocks] = useState([]); const lastSseBlockRef = useRef(null); const ssePrependRafRef = useRef(null); @@ -39,6 +56,7 @@ export default function BlocksPage() { if (lastSseBlockRef.current != null && block.number <= lastSseBlockRef.current) return; lastSseBlockRef.current = block.number; pendingSseBlocksRef.current.push(block); + bufferedDaBlocksRef.current.add(block.number); if (ssePrependRafRef.current !== null) return; // RAF already scheduled; block is buffered ssePrependRafRef.current = window.requestAnimationFrame(() => { const pending = pendingSseBlocksRef.current; @@ -52,12 +70,25 @@ export default function BlocksPage() { prepend.push(b); } prepend.reverse(); - return [...prepend, ...prev].slice(0, 20); + return [...prepend, ...prev].slice(0, BLOCKS_PAGE_SIZE); }); ssePrependRafRef.current = null; }); }, [latestBlockEvent, page, autoRefresh]); + useEffect(() => { + if (page !== 1 || !autoRefresh) { + bufferedDaBlocksRef.current = new Set(); + return; + } + + const next = new Set(sseBlocks.map((block) => block.number)); + for (const block of pendingSseBlocksRef.current) { + next.add(block.number); + } + bufferedDaBlocksRef.current = next; + }, [autoRefresh, page, sseBlocks]); + // Drop SSE blocks that are now present in fetchedBlocks to avoid duplicates, // but keep any that haven't been fetched yet. 
useEffect(() => { @@ -75,8 +106,140 @@ export default function BlocksPage() { const blocks = useMemo(() => { if (page !== 1 || !sseBlocks.length) return fetchedBlocks; const unique = sseBlocks.filter((b) => !fetchedNumberSet.has(b.number)); - return [...unique, ...fetchedBlocks].slice(0, 20); + return [...unique, ...fetchedBlocks].slice(0, BLOCKS_PAGE_SIZE); }, [fetchedBlocks, fetchedNumberSet, sseBlocks, page]); + + useEffect(() => { + if (!features.da_tracking) { + baseDaIncludedRef.current = new Map(); + visibleDaBlocksRef.current = new Set(); + if (daOverridesRef.current.size > 0) { + const empty = new Map(); + daOverridesRef.current = empty; + if (daOverridesSyncRafRef.current !== null) { + cancelAnimationFrame(daOverridesSyncRafRef.current); + } + daOverridesSyncRafRef.current = window.requestAnimationFrame(() => { + setDaOverrides(empty); + daOverridesSyncRafRef.current = null; + }); + } + return; + } + + const visible = new Set(); + const next = new Map(); + for (const block of blocks) { + visible.add(block.number); + next.set(block.number, isDaIncluded(block.da_status)); + } + baseDaIncludedRef.current = next; + visibleDaBlocksRef.current = visible; + const buffered = bufferedDaBlocksRef.current; + + let changed = false; + const nextOverrides = new Map(); + for (const [blockNumber, status] of daOverridesRef.current) { + if (!visible.has(blockNumber) && !buffered.has(blockNumber)) { + changed = true; + continue; + } + nextOverrides.set(blockNumber, status); + } + + if (changed || nextOverrides.size !== daOverridesRef.current.size) { + daOverridesRef.current = nextOverrides; + if (daOverridesSyncRafRef.current !== null) { + cancelAnimationFrame(daOverridesSyncRafRef.current); + } + daOverridesSyncRafRef.current = window.requestAnimationFrame(() => { + setDaOverrides(nextOverrides); + daOverridesSyncRafRef.current = null; + }); + } + }, [blocks, features.da_tracking]); + + // Subscribe to DA updates from SSE. 
setState is called inside the subscription + // callback (not synchronously in the effect body), satisfying react-hooks/set-state-in-effect. + useEffect(() => { + if (!features.da_tracking) return; + return subscribeDa((updates) => { + const visible = visibleDaBlocksRef.current; + const buffered = bufferedDaBlocksRef.current; + if (visible.size === 0 && buffered.size === 0) return; + + const next = new Map(); + for (const [blockNumber, status] of daOverridesRef.current) { + if (visible.has(blockNumber) || buffered.has(blockNumber)) { + next.set(blockNumber, status); + } + } + + const transitionedToIncluded: number[] = []; + let changed = next.size !== daOverridesRef.current.size; + + for (const update of updates) { + if (!visible.has(update.block_number) && !buffered.has(update.block_number)) continue; + + const prevStatus = next.get(update.block_number); + const wasIncluded = prevStatus + ? isDaIncluded(prevStatus) + : (baseDaIncludedRef.current.get(update.block_number) ?? false); + const nextStatus = { + block_number: update.block_number, + header_da_height: update.header_da_height, + data_da_height: update.data_da_height, + updated_at: new Date().toISOString(), + }; + + if ( + prevStatus?.header_da_height === nextStatus.header_da_height + && prevStatus?.data_da_height === nextStatus.data_da_height + ) { + continue; + } + + if (!wasIncluded && isDaIncluded(nextStatus)) { + transitionedToIncluded.push(update.block_number); + } + + next.set(update.block_number, nextStatus); + changed = true; + } + + if (!changed) return; + + daOverridesRef.current = next; + setDaOverrides(next); + + // Flash dots for 1.5s only when status transitions from pending -> included. 
+ for (const blockNumber of transitionedToIncluded) { + setDaHighlight((prev) => new Set(prev).add(blockNumber)); + const existing = daHighlightTimeoutsRef.current.get(blockNumber); + if (existing !== undefined) clearTimeout(existing); + const t = window.setTimeout(() => { + setDaHighlight((prev) => { + const nextHighlight = new Set(prev); + nextHighlight.delete(blockNumber); + return nextHighlight; + }); + daHighlightTimeoutsRef.current.delete(blockNumber); + }, 1500); + daHighlightTimeoutsRef.current.set(blockNumber, t); + } + }); + }, [features.da_tracking, subscribeDa]); + + useEffect(() => { + if (!features.da_tracking) return; + return subscribeDaResync(() => { + const empty = new Map(); + daOverridesRef.current = empty; + setDaOverrides(empty); + void refetch(); + }); + }, [features.da_tracking, refetch, subscribeDaResync]); + const navigate = useNavigate(); const [sort, setSort] = useState<{ key: 'number' | 'hash' | 'timestamp' | 'transaction_count' | 'gas_used' | null; direction: 'asc' | 'desc'; }>({ key: null, direction: 'desc' }); const seenBlocksRef = useRef>(new Set()); @@ -84,7 +247,6 @@ export default function BlocksPage() { const [highlightBlocks, setHighlightBlocks] = useState>(new Set()); const timeoutsRef = useRef>(new Map()); const highlightRafRef = useRef(null); - const [, setTick] = useState(0); const handleSort = (key: 'number' | 'hash' | 'timestamp' | 'transaction_count' | 'gas_used') => { setSort((prev) => { @@ -202,11 +364,16 @@ export default function BlocksPage() { // Cleanup on unmount useEffect(() => { const activeTimeouts = timeoutsRef.current; + const activeDaTimeouts = daHighlightTimeoutsRef.current; return () => { if (highlightRafRef.current !== null) { window.cancelAnimationFrame(highlightRafRef.current); highlightRafRef.current = null; } + if (daOverridesSyncRafRef.current !== null) { + cancelAnimationFrame(daOverridesSyncRafRef.current); + daOverridesSyncRafRef.current = null; + } if (ssePrependRafRef.current !== null) { 
cancelAnimationFrame(ssePrependRafRef.current); ssePrependRafRef.current = null; @@ -218,6 +385,8 @@ export default function BlocksPage() { } for (const [, t] of activeTimeouts) clearTimeout(t); activeTimeouts.clear(); + for (const [, t] of activeDaTimeouts) clearTimeout(t); + activeDaTimeouts.clear(); }; }, []); @@ -323,6 +492,9 @@ export default function BlocksPage() { )} + {features.da_tracking && ( + DA + )} @@ -360,7 +532,23 @@ export default function BlocksPage() { {formatGas(block.gas_used.toString())} - + {features.da_tracking && (() => { + const daStatus = daOverrides.get(block.number) ?? block.da_status; + const flash = daHighlight.has(block.number); + const included = isDaIncluded(daStatus); + const includedTitle = daStatus + ? `Header: ${daStatus.header_da_height}, Data: ${daStatus.data_da_height}` + : 'DA included'; + return ( + + {included ? ( + + ) : ( + + )} + + ); + })()} ))} diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index c7e3563..73f560a 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -8,6 +8,21 @@ export interface Block { gas_limit: number; transaction_count: number; indexed_at: string; + da_status?: BlockDaStatus | null; +} + +// DA (Data Availability) status for L2 blocks using Celestia. +// Only present when the DA worker has checked the block. +export interface BlockDaStatus { + block_number: number; + header_da_height: number; + data_da_height: number; + updated_at: string; +} + +// Chain feature flags returned by /api/status +export interface ChainFeatures { + da_tracking: boolean; } // Transaction types