From 3a3f890ed6f4471faa50c9da0d482f3eabcb8670 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 16 Mar 2026 19:02:10 +0100 Subject: [PATCH 1/9] remove backfill, update docs, update frontend --- .env.example | 1 + backend/crates/atlas-server/Cargo.toml | 2 +- .../atlas-server/src/api/handlers/sse.rs | 107 +++++----- .../atlas-server/src/api/handlers/status.rs | 7 + backend/crates/atlas-server/src/api/mod.rs | 3 + backend/crates/atlas-server/src/config.rs | 12 +- backend/crates/atlas-server/src/head.rs | 185 ++++++++++++++++++ .../crates/atlas-server/src/indexer/batch.rs | 51 +++++ .../crates/atlas-server/src/indexer/copy.rs | 22 ++- .../atlas-server/src/indexer/indexer.rs | 26 ++- backend/crates/atlas-server/src/main.rs | 14 +- docs/API.md | 33 +++- docs/ARCHITECTURE.md | 50 ++--- frontend/src/hooks/useBlockSSE.ts | 31 ++- 14 files changed, 431 insertions(+), 113 deletions(-) create mode 100644 backend/crates/atlas-server/src/head.rs diff --git a/.env.example b/.env.example index 290c93e..31ef3ba 100644 --- a/.env.example +++ b/.env.example @@ -20,3 +20,4 @@ RPC_BATCH_SIZE=20 # API_HOST=127.0.0.1 # API_PORT=3000 # API_DB_MAX_CONNECTIONS=20 +# SSE_REPLAY_BUFFER_BLOCKS=4096 # replay tail used only for active connected clients diff --git a/backend/crates/atlas-server/Cargo.toml b/backend/crates/atlas-server/Cargo.toml index 40f894a..858b78f 100644 --- a/backend/crates/atlas-server/Cargo.toml +++ b/backend/crates/atlas-server/Cargo.toml @@ -32,7 +32,7 @@ async-stream = { workspace = true } num-bigint = "0.4" async-channel = "2.3" governor = "0.6" -tokio-postgres = { version = "0.7" } +tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } tokio-postgres-rustls = "0.12" rustls = "0.23" webpki-roots = "0.26" diff --git a/backend/crates/atlas-server/src/api/handlers/sse.rs b/backend/crates/atlas-server/src/api/handlers/sse.rs index f1cddad..fd7a3ba 100644 --- a/backend/crates/atlas-server/src/api/handlers/sse.rs +++ b/backend/crates/atlas-server/src/api/handlers/sse.rs @@ -5,6 +5,7 @@ use axum::{ use futures::stream::Stream; use serde::Serialize; use std::convert::Infallible; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast; @@ -16,70 +17,92 @@ use tracing::warn; const BLOCK_COLUMNS: &str = "number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at"; -const FETCH_BATCH_SIZE: i64 = 256; +type SseStream = Pin> + Send>>; #[derive(Serialize, Debug)] struct NewBlockEvent { block: Block, } -/// GET /api/events — Server-Sent Events stream for live block updates. -/// Seeds from the latest indexed block, then requeries the DB for blocks added -/// after that point whenever the shared notification fanout emits a wake-up. -pub async fn block_events( - State(state): State>, -) -> Sse>> { +/// GET /api/events — Server-Sent Events stream for live committed block updates. +/// New connections receive only the current latest block and then stream +/// forward from in-memory committed head state. Historical catch-up stays on +/// the canonical block endpoints. +pub async fn block_events(State(state): State>) -> Sse { let pool = state.pool.clone(); + let head_tracker = state.head_tracker.clone(); let mut rx = state.block_events_tx.subscribe(); let stream = async_stream::stream! { let mut last_block_number: Option = None; - match fetch_latest_block(&pool).await { - Ok(Some(block)) => { - let block_number = block.number; - last_block_number = Some(block_number); + match head_tracker.latest().await { + Some(block) => { + last_block_number = Some(block.number); if let Some(event) = block_to_event(block) { yield Ok(event); } } - Ok(None) => {} - Err(e) => warn!(error = ?e, "sse: failed to fetch initial block"), + None => match fetch_latest_block(&pool).await { + Ok(Some(block)) => { + last_block_number = Some(block.number); + if let Some(event) = block_to_event(block) { + yield Ok(event); + } + } + Ok(None) => {} + Err(e) => warn!(error = ?e, "sse: failed to fetch initial block"), + }, } while let Ok(()) | Err(broadcast::error::RecvError::Lagged(_)) = rx.recv().await { - let mut cursor = last_block_number; - - loop { - match fetch_blocks_after(&pool, cursor).await { - Ok(blocks) => { - if blocks.is_empty() { - break; - } - - let batch_len = blocks.len(); - for block in blocks { - let block_number = block.number; - last_block_number = Some(block_number); - cursor = Some(block_number); - if let Some(event) = block_to_event(block) { - yield Ok(event); - } - } + if let Some(cursor) = last_block_number { + let snapshot = head_tracker.replay_after(Some(cursor)).await; + + if let Some(buffer_start) = snapshot.buffer_start { + if cursor < buffer_start.saturating_sub(1) { + warn!( + last_seen = cursor, + buffer_start, + buffer_end = ?snapshot.buffer_end, + "sse head-only: client fell behind replay tail; closing stream for canonical refetch" + ); + break; + } + } - if batch_len < FETCH_BATCH_SIZE as usize { - break; + if !snapshot.blocks_after_cursor.is_empty() { + for block in snapshot.blocks_after_cursor { + last_block_number = Some(block.number); + if let Some(event) = block_to_event(block) { + yield Ok(event); } } - Err(e) => { - warn!(error = ?e, cursor = ?last_block_number, "sse: failed to fetch blocks after wake-up"); - break; + continue; + } + } + + match head_tracker.latest().await { + Some(block) if last_block_number.is_none_or(|last_seen| block.number > last_seen) => { + last_block_number = Some(block.number); + if let Some(event) = block_to_event(block) { + yield Ok(event); } } + Some(_) | None => {} } } }; + sse_response(stream) +} + +fn sse_response(stream: S) -> Sse +where + S: Stream> + Send + 'static, +{ + let stream: SseStream = Box::pin(stream); + Sse::new(stream).keep_alive( axum::response::sse::KeepAlive::new() .interval(Duration::from_secs(15)) @@ -96,18 +119,6 @@ async fn fetch_latest_block(pool: &PgPool) -> Result, sqlx::Error> .await } -async fn fetch_blocks_after(pool: &PgPool, cursor: Option) -> Result, sqlx::Error> { - let lower_bound = cursor.unwrap_or(-1); - - sqlx::query_as(&format!( - "SELECT {} FROM blocks WHERE number > $1 ORDER BY number ASC LIMIT {}", - BLOCK_COLUMNS, FETCH_BATCH_SIZE - )) - .bind(lower_bound) - .fetch_all(pool) - .await -} - fn block_to_event(block: Block) -> Option { let event = NewBlockEvent { block }; serde_json::to_string(&event) diff --git a/backend/crates/atlas-server/src/api/handlers/status.rs b/backend/crates/atlas-server/src/api/handlers/status.rs index f7c7d49..1b5aae9 100644 --- a/backend/crates/atlas-server/src/api/handlers/status.rs +++ b/backend/crates/atlas-server/src/api/handlers/status.rs @@ -14,6 +14,13 @@ pub struct ChainStatus { /// GET /api/status - Lightweight endpoint for current chain status /// Returns in <1ms, optimized for frequent polling pub async fn get_status(State(state): State>) -> ApiResult> { + if let Some(block) = state.head_tracker.latest().await { + return Ok(Json(ChainStatus { + block_height: block.number, + indexed_at: block.indexed_at.to_rfc3339(), + })); + } + let result: (String, chrono::DateTime) = sqlx::query_as( "SELECT value, updated_at FROM indexer_state WHERE key = 'last_indexed_block'", ) diff --git a/backend/crates/atlas-server/src/api/mod.rs b/backend/crates/atlas-server/src/api/mod.rs index 9c0f63b..d2d62f8 100644 --- a/backend/crates/atlas-server/src/api/mod.rs +++ b/backend/crates/atlas-server/src/api/mod.rs @@ -10,9 +10,12 @@ use tower_http::cors::{Any, CorsLayer}; use tower_http::timeout::TimeoutLayer; use tower_http::trace::TraceLayer; +use crate::head::HeadTracker; + pub struct AppState { pub pool: PgPool, pub block_events_tx: broadcast::Sender<()>, + pub head_tracker: Arc, pub rpc_url: String, } diff --git a/backend/crates/atlas-server/src/config.rs b/backend/crates/atlas-server/src/config.rs index 9be99d1..ea7d243 100644 --- a/backend/crates/atlas-server/src/config.rs +++ b/backend/crates/atlas-server/src/config.rs @@ -1,4 +1,4 @@ -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use std::env; #[derive(Debug, Clone)] @@ -27,10 +27,19 @@ pub struct Config { // API-specific pub api_host: String, pub api_port: u16, + pub sse_replay_buffer_blocks: usize, } impl Config { pub fn from_env() -> Result { + let sse_replay_buffer_blocks: usize = env::var("SSE_REPLAY_BUFFER_BLOCKS") + .unwrap_or_else(|_| "4096".to_string()) + .parse() + .context("Invalid SSE_REPLAY_BUFFER_BLOCKS")?; + if sse_replay_buffer_blocks == 0 { + bail!("SSE_REPLAY_BUFFER_BLOCKS must be greater than 0"); + } + Ok(Self { database_url: env::var("DATABASE_URL").context("DATABASE_URL must be set")?, rpc_url: env::var("RPC_URL").context("RPC_URL must be set")?, @@ -84,6 +93,7 @@ impl Config { .unwrap_or_else(|_| "3000".to_string()) .parse() .context("Invalid API_PORT")?, + sse_replay_buffer_blocks, }) } } diff --git a/backend/crates/atlas-server/src/head.rs b/backend/crates/atlas-server/src/head.rs new file mode 100644 index 0000000..471266c --- /dev/null +++ b/backend/crates/atlas-server/src/head.rs @@ -0,0 +1,185 @@ +use atlas_common::Block; +use sqlx::PgPool; +use std::collections::VecDeque; +use tokio::sync::RwLock; +use tracing::warn; + +const BLOCK_COLUMNS: &str = + "number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at"; + +pub(crate) struct HeadTracker { + replay_capacity: usize, + state: RwLock, +} + +#[derive(Default)] +struct HeadState { + latest: Option, + replay: VecDeque, +} + +pub(crate) struct ReplaySnapshot { + pub buffer_start: Option, + pub buffer_end: Option, + pub blocks_after_cursor: Vec, +} + +impl HeadTracker { + pub(crate) async fn bootstrap( + pool: &PgPool, + replay_capacity: usize, + ) -> Result { + let mut blocks = sqlx::query_as::<_, Block>(&format!( + "SELECT {} FROM blocks ORDER BY number DESC LIMIT $1", + BLOCK_COLUMNS + )) + .bind(replay_capacity as i64) + .fetch_all(pool) + .await?; + blocks.reverse(); + + let latest = blocks.last().cloned(); + let replay = VecDeque::from(blocks); + + Ok(Self { + replay_capacity, + state: RwLock::new(HeadState { latest, replay }), + }) + } + + pub(crate) fn empty(replay_capacity: usize) -> Self { + Self { + replay_capacity, + state: RwLock::new(HeadState::default()), + } + } + + pub(crate) async fn clear(&self) { + let mut state = self.state.write().await; + *state = HeadState::default(); + } + + pub(crate) async fn publish_committed_batch(&self, blocks: &[Block]) { + if blocks.is_empty() { + return; + } + + let mut state = self.state.write().await; + let mut latest_number = state.latest.as_ref().map(|block| block.number); + + for block in blocks { + if latest_number.is_some_and(|latest| block.number <= latest) { + warn!( + current_latest = ?latest_number, + incoming_block = block.number, + "ignoring non-advancing committed block publication" + ); + continue; + } + + state.latest = Some(block.clone()); + state.replay.push_back(block.clone()); + latest_number = Some(block.number); + } + + while state.replay.len() > self.replay_capacity { + state.replay.pop_front(); + } + } + + pub(crate) async fn latest(&self) -> Option { + self.state.read().await.latest.clone() + } + + pub(crate) async fn replay_after(&self, after_block: Option) -> ReplaySnapshot { + let state = self.state.read().await; + + let blocks_after_cursor = state + .replay + .iter() + .filter(|block| after_block.is_none_or(|cursor| block.number > cursor)) + .cloned() + .collect(); + + ReplaySnapshot { + buffer_start: state.replay.front().map(|block| block.number), + buffer_end: state.replay.back().map(|block| block.number), + blocks_after_cursor, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{TimeZone, Utc}; + + fn sample_block(number: i64) -> Block { + Block { + number, + hash: format!("0x{number:064x}"), + parent_hash: format!("0x{:064x}", number.saturating_sub(1)), + timestamp: 1_700_000_000 + number, + gas_used: 21_000, + gas_limit: 30_000_000, + transaction_count: 1, + indexed_at: Utc.timestamp_opt(1_700_000_000 + number, 0).unwrap(), + } + } + + #[tokio::test] + async fn replay_after_returns_full_buffer_for_empty_cursor() { + let tracker = HeadTracker::empty(3); + tracker + .publish_committed_batch(&[sample_block(10), sample_block(11)]) + .await; + + let snapshot = tracker.replay_after(None).await; + let numbers: Vec = snapshot + .blocks_after_cursor + .into_iter() + .map(|block| block.number) + .collect(); + + assert_eq!(numbers, vec![10, 11]); + assert_eq!(snapshot.buffer_start, Some(10)); + assert_eq!(snapshot.buffer_end, Some(11)); + } + + #[tokio::test] + async fn publish_committed_batch_trims_oldest_blocks() { + let tracker = HeadTracker::empty(2); + tracker + .publish_committed_batch(&[sample_block(10), sample_block(11), sample_block(12)]) + .await; + + let snapshot = tracker.replay_after(None).await; + let numbers: Vec = snapshot + .blocks_after_cursor + .into_iter() + .map(|block| block.number) + .collect(); + + assert_eq!(numbers, vec![11, 12]); + assert_eq!(tracker.latest().await.unwrap().number, 12); + } + + #[tokio::test] + async fn publish_committed_batch_ignores_non_advancing_blocks() { + let tracker = HeadTracker::empty(3); + tracker.publish_committed_batch(&[sample_block(10)]).await; + tracker + .publish_committed_batch(&[sample_block(9), sample_block(10)]) + .await; + + let snapshot = tracker.replay_after(None).await; + let numbers: Vec = snapshot + .blocks_after_cursor + .into_iter() + .map(|block| block.number) + .collect(); + + assert_eq!(numbers, vec![10]); + assert_eq!(tracker.latest().await.unwrap().number, 10); + } +} diff --git a/backend/crates/atlas-server/src/indexer/batch.rs b/backend/crates/atlas-server/src/indexer/batch.rs index b4fb8f4..7c49f1c 100644 --- a/backend/crates/atlas-server/src/indexer/batch.rs +++ b/backend/crates/atlas-server/src/indexer/batch.rs @@ -1,6 +1,9 @@ use bigdecimal::BigDecimal; use std::collections::{HashMap, HashSet}; +use atlas_common::Block; +use chrono::{DateTime, Utc}; + // --------------------------------------------------------------------------- // Batch accumulator - collects data from multiple blocks before writing to DB // --------------------------------------------------------------------------- @@ -154,12 +157,35 @@ impl BlockBatch { entry.delta += delta; entry.last_block = entry.last_block.max(block); } + + pub(crate) fn materialize_blocks(&self, indexed_at: DateTime) -> Vec { + debug_assert_eq!(self.b_numbers.len(), self.b_hashes.len()); + debug_assert_eq!(self.b_numbers.len(), self.b_parent_hashes.len()); + debug_assert_eq!(self.b_numbers.len(), self.b_timestamps.len()); + debug_assert_eq!(self.b_numbers.len(), self.b_gas_used.len()); + debug_assert_eq!(self.b_numbers.len(), self.b_gas_limits.len()); + debug_assert_eq!(self.b_numbers.len(), self.b_tx_counts.len()); + + (0..self.b_numbers.len()) + .map(|i| Block { + number: self.b_numbers[i], + hash: self.b_hashes[i].clone(), + parent_hash: self.b_parent_hashes[i].clone(), + timestamp: self.b_timestamps[i], + gas_used: self.b_gas_used[i], + gas_limit: self.b_gas_limits[i], + transaction_count: self.b_tx_counts[i], + indexed_at, + }) + .collect() + } } #[cfg(test)] mod tests { use super::*; use bigdecimal::BigDecimal; + use chrono::{TimeZone, Utc}; // --- touch_addr tests --- @@ -230,4 +256,29 @@ mod tests { .unwrap(); assert_eq!(entry.last_block, 100); } + + #[test] + fn materialize_blocks_preserves_parallel_block_fields() { + let mut batch = BlockBatch::new(); + batch.b_numbers.push(42); + batch.b_hashes.push("0xabc".to_string()); + batch.b_parent_hashes.push("0xdef".to_string()); + batch.b_timestamps.push(1_700_000_042); + batch.b_gas_used.push(21_000); + batch.b_gas_limits.push(30_000_000); + batch.b_tx_counts.push(3); + + let indexed_at = Utc.timestamp_opt(1_700_000_100, 0).unwrap(); + let blocks = batch.materialize_blocks(indexed_at); + + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0].number, 42); + assert_eq!(blocks[0].hash, "0xabc"); + assert_eq!(blocks[0].parent_hash, "0xdef"); + assert_eq!(blocks[0].timestamp, 1_700_000_042); + assert_eq!(blocks[0].gas_used, 21_000); + assert_eq!(blocks[0].gas_limit, 30_000_000); + assert_eq!(blocks[0].transaction_count, 3); + assert_eq!(blocks[0].indexed_at, indexed_at); + } } diff --git a/backend/crates/atlas-server/src/indexer/copy.rs b/backend/crates/atlas-server/src/indexer/copy.rs index 0dfa7ed..1c9dff3 100644 --- a/backend/crates/atlas-server/src/indexer/copy.rs +++ b/backend/crates/atlas-server/src/indexer/copy.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use chrono::{DateTime, Utc}; use tokio::pin; use tokio_postgres::{ binary_copy::BinaryCopyInWriter, @@ -8,7 +9,11 @@ use tokio_postgres::{ use super::batch::BlockBatch; -pub async fn copy_blocks(tx: &mut Transaction<'_>, batch: &BlockBatch) -> Result<()> { +pub async fn copy_blocks( + tx: &mut Transaction<'_>, + batch: &BlockBatch, + indexed_at: DateTime, +) -> Result<()> { if batch.b_numbers.is_empty() { return Ok(()); } @@ -21,7 +26,8 @@ pub async fn copy_blocks(tx: &mut Transaction<'_>, batch: &BlockBatch) -> Result timestamp BIGINT, gas_used BIGINT, gas_limit BIGINT, - transaction_count INT + transaction_count INT, + indexed_at TIMESTAMPTZ ) ON COMMIT DELETE ROWS; TRUNCATE tmp_blocks;", ) @@ -29,7 +35,7 @@ pub async fn copy_blocks(tx: &mut Transaction<'_>, batch: &BlockBatch) -> Result let sink = tx .copy_in( - "COPY tmp_blocks (number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count) FROM STDIN BINARY", + "COPY tmp_blocks (number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at) FROM STDIN BINARY", ) .await?; let writer = BinaryCopyInWriter::new( @@ -42,12 +48,13 @@ pub async fn copy_blocks(tx: &mut Transaction<'_>, batch: &BlockBatch) -> Result Type::INT8, Type::INT8, Type::INT4, + Type::TIMESTAMPTZ, ], ); pin!(writer); for i in 0..batch.b_numbers.len() { - let row: [&(dyn ToSql + Sync); 7] = [ + let row: [&(dyn ToSql + Sync); 8] = [ &batch.b_numbers[i], &batch.b_hashes[i], &batch.b_parent_hashes[i], @@ -55,6 +62,7 @@ pub async fn copy_blocks(tx: &mut Transaction<'_>, batch: &BlockBatch) -> Result &batch.b_gas_used[i], &batch.b_gas_limits[i], &batch.b_tx_counts[i], + &indexed_at, ]; writer.as_mut().write(&row).await?; } @@ -62,8 +70,8 @@ pub async fn copy_blocks(tx: &mut Transaction<'_>, batch: &BlockBatch) -> Result writer.finish().await?; tx.execute( - "INSERT INTO blocks (number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count) - SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count + "INSERT INTO blocks (number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at) + SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at FROM tmp_blocks ON CONFLICT (number) DO UPDATE SET hash = EXCLUDED.hash, @@ -72,7 +80,7 @@ pub async fn copy_blocks(tx: &mut Transaction<'_>, batch: &BlockBatch) -> Result gas_used = EXCLUDED.gas_used, gas_limit = EXCLUDED.gas_limit, transaction_count = EXCLUDED.transaction_count, - indexed_at = NOW()", + indexed_at = EXCLUDED.indexed_at", &[], ) .await?; diff --git a/backend/crates/atlas-server/src/indexer/indexer.rs b/backend/crates/atlas-server/src/indexer/indexer.rs index dcdb79d..be32f92 100644 --- a/backend/crates/atlas-server/src/indexer/indexer.rs +++ b/backend/crates/atlas-server/src/indexer/indexer.rs @@ -3,6 +3,7 @@ use alloy::providers::ProviderBuilder; use alloy::rpc::types::TransactionReceipt; use anyhow::Result; use bigdecimal::BigDecimal; +use chrono::{DateTime, Utc}; use governor::{Quota, RateLimiter}; use sqlx::PgPool; use std::collections::{BTreeMap, HashMap, HashSet}; @@ -23,6 +24,7 @@ use super::fetcher::{ WorkItem, }; use crate::config::Config; +use crate::head::HeadTracker; /// Partition size: 10 million blocks per partition const PARTITION_SIZE: u64 = 10_000_000; @@ -40,16 +42,24 @@ pub struct Indexer { current_max_partition: std::sync::atomic::AtomicU64, /// Broadcast channel to notify SSE subscribers of new blocks block_events_tx: broadcast::Sender<()>, + /// Shared in-memory tracker for the latest committed head and replay tail + head_tracker: Arc, } impl Indexer { - pub fn new(pool: PgPool, config: Config, block_events_tx: broadcast::Sender<()>) -> Self { + pub fn new( + pool: PgPool, + config: Config, + block_events_tx: broadcast::Sender<()>, + head_tracker: Arc, + ) -> Self { Self { pool, config, // Will be initialized on first run based on start block current_max_partition: std::sync::atomic::AtomicU64::new(0), block_events_tx, + head_tracker, } } @@ -102,6 +112,7 @@ impl Indexer { // Handle reindex flag if self.config.reindex { tracing::warn!("Reindex flag set - truncating all tables"); + self.head_tracker.clear().await; self.truncate_tables().await?; } @@ -632,8 +643,10 @@ impl Indexer { } let mut pg_tx = copy_client.transaction().await?; + let indexed_at: DateTime = pg_tx.query_one("SELECT NOW()", &[]).await?.get(0); + let committed_blocks = update_watermark.then(|| batch.materialize_blocks(indexed_at)); - copy_blocks(&mut pg_tx, &batch).await?; + copy_blocks(&mut pg_tx, &batch, indexed_at).await?; copy_transactions(&mut pg_tx, &batch).await?; copy_event_logs(&mut pg_tx, &batch).await?; copy_nft_transfers(&mut pg_tx, &batch).await?; @@ -781,9 +794,9 @@ impl Indexer { pg_tx .execute( "INSERT INTO indexer_state (key, value, updated_at) - VALUES ('last_indexed_block', $1, NOW()) - ON CONFLICT (key) DO UPDATE SET value = $1, updated_at = NOW()", - &[&last_value], + VALUES ('last_indexed_block', $1, $2) + ON CONFLICT (key) DO UPDATE SET value = $1, updated_at = $2", + &[&last_value, &indexed_at], ) .await?; } @@ -791,6 +804,9 @@ impl Indexer { pg_tx.commit().await?; if update_watermark { + if let Some(blocks) = committed_blocks.as_ref() { + self.head_tracker.publish_committed_batch(blocks).await; + } // Notify SSE subscribers only after the batch commit is visible. let _ = self.block_events_tx.send(()); } diff --git a/backend/crates/atlas-server/src/main.rs b/backend/crates/atlas-server/src/main.rs index a80caed..b52290f 100644 --- a/backend/crates/atlas-server/src/main.rs +++ b/backend/crates/atlas-server/src/main.rs @@ -6,6 +6,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; mod api; mod config; +mod head; mod indexer; /// Retry delays for exponential backoff (in seconds) @@ -42,16 +43,27 @@ async fn main() -> Result<()> { // Shared broadcast channel for SSE notifications let (block_events_tx, _) = broadcast::channel(1024); + let head_tracker = Arc::new(if config.reindex { + head::HeadTracker::empty(config.sse_replay_buffer_blocks) + } else { + head::HeadTracker::bootstrap(&api_pool, config.sse_replay_buffer_blocks).await? + }); // Build AppState for API let state = Arc::new(api::AppState { pool: api_pool, block_events_tx: block_events_tx.clone(), + head_tracker: head_tracker.clone(), rpc_url: config.rpc_url.clone(), }); // Spawn indexer task with retry logic - let indexer = indexer::Indexer::new(indexer_pool.clone(), config.clone(), block_events_tx); + let indexer = indexer::Indexer::new( + indexer_pool.clone(), + config.clone(), + block_events_tx, + head_tracker, + ); tokio::spawn(async move { if let Err(e) = run_with_retry(|| indexer.run()).await { tracing::error!("Indexer terminated with error: {}", e); diff --git a/docs/API.md b/docs/API.md index 1214c53..3e53168 100644 --- a/docs/API.md +++ b/docs/API.md @@ -29,22 +29,41 @@ Response format: | Method | Path | Description | |--------|------|-------------| -| GET | `/api/height` | Current block height and indexer timestamp (lightweight, safe to poll frequently) | -| GET | `/api/status` | Full chain status: chain ID, chain name, block height, total transactions, total addresses | +| GET | `/api/status` | Current indexed block height and index timestamp (lightweight, safe to poll frequently) | +| GET | `/api/events` | SSE stream of committed `new_block` events | | GET | `/health` | Health check (returns "OK") | **`/api/status` response:** ```json { - "chain_id": 1, - "chain_name": "My Chain", "block_height": 1000000, - "total_transactions": 5000000, - "total_addresses": 200000, "indexed_at": "2026-01-01T00:00:00+00:00" } ``` -`chain_name` is set via the `CHAIN_NAME` environment variable. + +`block_height` and `indexed_at` refer to the latest committed/indexed head. + +**`/api/events` SSE details:** + +- Event name: `new_block` +- Payload: + +```json +{ + "block": { + "number": 1000000, + "hash": "0x...", + "parent_hash": "0x...", + "timestamp": 1700000000, + "gas_used": 21000, + "gas_limit": 30000000, + "transaction_count": 1, + "indexed_at": "2026-01-01T00:00:00+00:00" + } +} +``` + +The stream represents the committed indexed head, not a speculative node-observed head. It is a head/tail stream, not a history replay API: new or reconnected clients resume from the current live tail, while canonical catch-up stays on `/api/blocks` and `/api/status`. ### Blocks diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index a541aac..91ffdc6 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -1,32 +1,35 @@ - # Architecture +# Architecture ## Overview -Atlas is a modular Ethereum L2 blockchain indexer and API server built in Rust. It indexes blocks, transactions, ERC-20 tokens, and NFTs from any EVM-compatible chain. +Atlas is a modular Ethereum L2 blockchain indexer and API server built in Rust. The `atlas-server` process runs both the indexer and the HTTP API, indexing blocks, transactions, ERC-20 tokens, and NFTs from any EVM-compatible chain. ## System Diagram ``` -┌─────────────────────────────────────────────────────────────┐ -│ PostgreSQL Database │ -│ (Partitioned tables for blocks, transactions, transfers) │ -└─────────────────────────────────────────────────────────────┘ - ↑ ↑ - │ (Read-Write) │ (Read-Only) - ┌────────────────────┐ ┌────────────────────────┐ - │ Atlas Indexer │ │ Atlas API Server │ - │ ─────────────── │ │ ──────────────────── │ - │ • Block Fetcher │ │ • REST Endpoints │ - │ • TX Processing │ │ • Contract ABIs │ - │ • Event Parsing │ │ • Etherscan Compat │ - │ • Metadata Fetcher │ │ • Search │ - └────────────────────┘ └────────────────────────┘ - │ - ↓ - ┌─────────────────────┐ - │ Ethereum Node │ - │ (JSON-RPC) │ - └─────────────────────┘ +┌────────────────────────────────────────────────────────────────────┐ +│ atlas-server process │ +│ │ +│ ┌────────────────────┐ post-commit publish ┌──────────┐ │ +│ │ Indexer │ ────────────────────────────► │HeadTracker│ │ +│ │ • RPC block fetch │ │ latest │ │ +│ │ • Batch assembly │ │ live tail │ │ +│ │ • DB writes │ └─────┬────┘ │ +│ └─────────┬──────────┘ │ │ +│ │ │ │ +│ ▼ ▼ │ +│ ┌────────────────────┐ ┌────────────────┐│ +│ │ PostgreSQL │ │ HTTP API ││ +│ │ canonical history │ │ • REST ││ +│ │ blocks/indexes │ │ • SSE events ││ +│ └────────────────────┘ └────────────────┘│ +└────────────────────────────────────────────────────────────────────┘ + ▲ + │ + ┌─────────────────────┐ + │ Ethereum Node │ + │ (JSON-RPC) │ + └─────────────────────┘ ``` ## Project Structure @@ -36,8 +39,7 @@ atlas/ ├── backend/ │ ├── crates/ │ │ ├── atlas-common/ # Shared types, DB models, error handling -│ │ ├── atlas-indexer/ # Block indexer + metadata fetcher -│ │ └── atlas-api/ # REST API server (Axum) +│ │ └── atlas-server/ # Combined indexer + API server (Axum) │ └── migrations/ # PostgreSQL migrations ├── frontend/ # React frontend (Vite + Tailwind) └── docker-compose.yml diff --git a/frontend/src/hooks/useBlockSSE.ts b/frontend/src/hooks/useBlockSSE.ts index a278133..1f20312 100644 --- a/frontend/src/hooks/useBlockSSE.ts +++ b/frontend/src/hooks/useBlockSSE.ts @@ -73,9 +73,9 @@ export default function useBlockSSE(): BlockSSEState { const [error, setError] = useState(null); const [bps, setBps] = useState(null); const esRef = useRef(null); - const reconnectTimeoutRef = useRef(null); const queueRef = useRef([]); const drainTimerRef = useRef(null); + const drainOneRef = useRef<(() => void) | null>(null); const lastDrainAtRef = useRef(0); // Rolling window of (blockNumber, blockTimestamp) for chain-rate calculation. // We keep up to 500 samples (~45s at 11 bps) and use two windows: @@ -115,7 +115,7 @@ export default function useBlockSSE(): BlockSSEState { delay = Math.max(delay, burstLeadIn); } - drainTimerRef.current = window.setTimeout(drainOne, delay); + drainTimerRef.current = window.setTimeout(() => drainOneRef.current?.(), delay); }, []); // Kick the drain loop when new items arrive. @@ -173,22 +173,15 @@ export default function useBlockSSE(): BlockSSEState { es.onerror = (e) => { setConnected(false); - setError(`SSE ${e.type || 'error'}; retrying`); - es.close(); - esRef.current = null; - - if (reconnectTimeoutRef.current !== null) { - clearTimeout(reconnectTimeoutRef.current); - } - reconnectTimeoutRef.current = window.setTimeout(() => { - reconnectTimeoutRef.current = null; - connectSSE(); - }, 2000); + // Preserve the browser-managed EventSource reconnect path. The server's + // head-only stream resumes from the current live tail rather than replaying + // missed history; canonical catch-up stays on normal block/status fetches. + setError(`SSE ${e.type || 'error'}; reconnecting`); }; }, [kickDrain]); // Drain one block from the queue at the chain's natural cadence. - function drainOne() { + const drainOne = useCallback(() => { const queue = queueRef.current; drainTimerRef.current = null; @@ -210,7 +203,11 @@ export default function useBlockSSE(): BlockSSEState { if (queue.length > 0) { scheduleDrain(false); } - } + }, [scheduleDrain]); + + useEffect(() => { + drainOneRef.current = drainOne; + }, [drainOne]); useEffect(() => { connect(); @@ -220,10 +217,6 @@ export default function useBlockSSE(): BlockSSEState { esRef.current.close(); esRef.current = null; } - if (reconnectTimeoutRef.current !== null) { - clearTimeout(reconnectTimeoutRef.current); - reconnectTimeoutRef.current = null; - } if (drainTimerRef.current !== null) { clearTimeout(drainTimerRef.current); drainTimerRef.current = null; From 631c00f3eccd9ca957870fdc64ea4628d4eba2b8 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 16 Mar 2026 19:09:21 +0100 Subject: [PATCH 2/9] wrapup --- .../atlas-server/src/api/handlers/mod.rs | 13 +++++++++++++ .../atlas-server/src/api/handlers/sse.rs | 15 ++------------- .../atlas-server/src/api/handlers/status.rs | 18 +++++++++--------- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/backend/crates/atlas-server/src/api/handlers/mod.rs b/backend/crates/atlas-server/src/api/handlers/mod.rs index 97f09d3..559db35 100644 --- a/backend/crates/atlas-server/src/api/handlers/mod.rs +++ b/backend/crates/atlas-server/src/api/handlers/mod.rs @@ -10,8 +10,21 @@ pub mod status; pub mod tokens; pub mod transactions; +use atlas_common::Block; use sqlx::PgPool; +const BLOCK_COLUMNS: &str = + "number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at"; + +pub async fn get_latest_block(pool: &PgPool) -> Result, sqlx::Error> { + sqlx::query_as(&format!( + "SELECT {} FROM blocks ORDER BY number DESC LIMIT 1", + BLOCK_COLUMNS + )) + .fetch_optional(pool) + .await +} + /// Get transactions table row count efficiently. /// - For tables > 100k rows: uses PostgreSQL's approximate count (instant, ~99% accurate) /// - For smaller tables: uses exact COUNT(*) (fast enough) diff --git a/backend/crates/atlas-server/src/api/handlers/sse.rs b/backend/crates/atlas-server/src/api/handlers/sse.rs index fd7a3ba..ecd7903 100644 --- a/backend/crates/atlas-server/src/api/handlers/sse.rs +++ b/backend/crates/atlas-server/src/api/handlers/sse.rs @@ -10,13 +10,11 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast; +use crate::api::handlers::get_latest_block; use crate::api::AppState; use atlas_common::Block; -use sqlx::PgPool; use tracing::warn; -const BLOCK_COLUMNS: &str = - "number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at"; type SseStream = Pin> + Send>>; #[derive(Serialize, Debug)] @@ -43,7 +41,7 @@ pub async fn block_events(State(state): State>) -> Sse yield Ok(event); } } - None => match fetch_latest_block(&pool).await { + None => match get_latest_block(&pool).await { Ok(Some(block)) => { last_block_number = Some(block.number); if let Some(event) = block_to_event(block) { @@ -110,15 +108,6 @@ where ) } -async fn fetch_latest_block(pool: &PgPool) -> Result, sqlx::Error> { - sqlx::query_as(&format!( - "SELECT {} FROM blocks ORDER BY number DESC LIMIT 1", - BLOCK_COLUMNS - )) - .fetch_optional(pool) - .await -} - fn block_to_event(block: Block) -> Option { let event = NewBlockEvent { block }; serde_json::to_string(&event) diff --git a/backend/crates/atlas-server/src/api/handlers/status.rs b/backend/crates/atlas-server/src/api/handlers/status.rs index 1b5aae9..5545e4e 100644 --- a/backend/crates/atlas-server/src/api/handlers/status.rs +++ b/backend/crates/atlas-server/src/api/handlers/status.rs @@ -3,6 +3,7 @@ use serde::Serialize; use std::sync::Arc; use crate::api::error::ApiResult; +use crate::api::handlers::get_latest_block; use crate::api::AppState; #[derive(Serialize)] @@ -21,16 +22,15 @@ pub async fn get_status(State(state): State>) -> ApiResult) = sqlx::query_as( - "SELECT value, updated_at FROM indexer_state WHERE key = 'last_indexed_block'", - ) - .fetch_one(&state.pool) - .await?; - - let block_height: i64 = result.0.parse().unwrap_or(0); + if let Some(block) = get_latest_block(&state.pool).await? { + return Ok(Json(ChainStatus { + block_height: block.number, + indexed_at: block.indexed_at.to_rfc3339(), + })); + } Ok(Json(ChainStatus { - block_height, - indexed_at: result.1.to_rfc3339(), + block_height: 0, + indexed_at: String::new(), })) } From a131152abee476be7eab150960b4a31deae28b26 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 16 Mar 2026 19:25:39 +0100 Subject: [PATCH 3/9] changes --- backend/crates/atlas-common/src/types.rs | 4 ++ .../atlas-server/src/api/handlers/mod.rs | 5 +- .../atlas-server/src/api/handlers/sse.rs | 2 +- .../atlas-server/src/api/handlers/status.rs | 9 ++-- backend/crates/atlas-server/src/head.rs | 50 ++++++++++++------- .../atlas-server/src/indexer/indexer.rs | 4 +- frontend/src/api/status.ts | 2 +- 7 files changed, 46 insertions(+), 30 deletions(-) diff --git a/backend/crates/atlas-common/src/types.rs b/backend/crates/atlas-common/src/types.rs index 174fde2..e32ad2e 100644 --- a/backend/crates/atlas-common/src/types.rs +++ b/backend/crates/atlas-common/src/types.rs @@ -226,6 +226,10 @@ pub struct ContractAbi { pub verified_at: DateTime, } +/// SQL column list for the `blocks` table, matching the field order in [`Block`]. +pub const BLOCK_COLUMNS: &str = + "number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at"; + /// Pagination parameters #[derive(Debug, Clone, Deserialize)] pub struct Pagination { diff --git a/backend/crates/atlas-server/src/api/handlers/mod.rs b/backend/crates/atlas-server/src/api/handlers/mod.rs index 559db35..c0f72c2 100644 --- a/backend/crates/atlas-server/src/api/handlers/mod.rs +++ b/backend/crates/atlas-server/src/api/handlers/mod.rs @@ -10,12 +10,9 @@ pub mod status; pub mod tokens; pub mod transactions; -use atlas_common::Block; +use atlas_common::{Block, BLOCK_COLUMNS}; use sqlx::PgPool; -const BLOCK_COLUMNS: &str = - "number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at"; - pub async fn get_latest_block(pool: &PgPool) -> Result, sqlx::Error> { sqlx::query_as(&format!( "SELECT {} FROM blocks ORDER BY number DESC LIMIT 1", diff --git a/backend/crates/atlas-server/src/api/handlers/sse.rs b/backend/crates/atlas-server/src/api/handlers/sse.rs index ecd7903..f9dcdab 100644 --- a/backend/crates/atlas-server/src/api/handlers/sse.rs +++ b/backend/crates/atlas-server/src/api/handlers/sse.rs @@ -58,7 +58,7 @@ pub async fn block_events(State(state): State>) -> Sse let snapshot = head_tracker.replay_after(Some(cursor)).await; if let Some(buffer_start) = snapshot.buffer_start { - if cursor < buffer_start.saturating_sub(1) { + if cursor + 1 < buffer_start { warn!( last_seen = cursor, buffer_start, diff --git a/backend/crates/atlas-server/src/api/handlers/status.rs b/backend/crates/atlas-server/src/api/handlers/status.rs index 5545e4e..48cbbae 100644 --- a/backend/crates/atlas-server/src/api/handlers/status.rs +++ b/backend/crates/atlas-server/src/api/handlers/status.rs @@ -9,7 +9,8 @@ use crate::api::AppState; #[derive(Serialize)] pub struct ChainStatus { pub block_height: i64, - pub indexed_at: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub indexed_at: Option, } /// GET /api/status - Lightweight endpoint for current chain status @@ -18,19 +19,19 @@ pub async fn get_status(State(state): State>) -> ApiResult) { if blocks.is_empty() { return; } @@ -77,9 +79,9 @@ impl HeadTracker { continue; } - state.latest = Some(block.clone()); - state.replay.push_back(block.clone()); latest_number = Some(block.number); + state.latest = Some(block.clone()); + state.replay.push_back(block); } while state.replay.len() > self.replay_capacity { @@ -94,12 +96,22 @@ impl HeadTracker { pub(crate) async fn replay_after(&self, after_block: Option) -> ReplaySnapshot { let state = self.state.read().await; - let blocks_after_cursor = state - .replay - .iter() - .filter(|block| after_block.is_none_or(|cursor| block.number > cursor)) - .cloned() - .collect(); + let blocks_after_cursor = match after_block { + None => state.replay.iter().cloned().collect(), + Some(cursor) => { + // The replay deque is sorted by block number. Binary search on + // each contiguous slice to skip past blocks <= cursor. + let (head, tail) = state.replay.as_slices(); + let mut out = Vec::new(); + for slc in [head, tail] { + let start = slc.partition_point(|b| b.number <= cursor); + if start < slc.len() { + out.extend_from_slice(&slc[start..]); + } + } + out + } + }; ReplaySnapshot { buffer_start: state.replay.front().map(|block| block.number), @@ -131,7 +143,7 @@ mod tests { async fn replay_after_returns_full_buffer_for_empty_cursor() { let tracker = HeadTracker::empty(3); tracker - .publish_committed_batch(&[sample_block(10), sample_block(11)]) + .publish_committed_batch(vec![sample_block(10), sample_block(11)]) .await; let snapshot = tracker.replay_after(None).await; @@ -150,7 +162,7 @@ mod tests { async fn publish_committed_batch_trims_oldest_blocks() { let tracker = HeadTracker::empty(2); tracker - .publish_committed_batch(&[sample_block(10), sample_block(11), sample_block(12)]) + .publish_committed_batch(vec![sample_block(10), sample_block(11), sample_block(12)]) .await; let snapshot = tracker.replay_after(None).await; @@ -167,9 +179,11 @@ mod tests { #[tokio::test] async fn publish_committed_batch_ignores_non_advancing_blocks() { let tracker = HeadTracker::empty(3); - tracker.publish_committed_batch(&[sample_block(10)]).await; tracker - .publish_committed_batch(&[sample_block(9), sample_block(10)]) + .publish_committed_batch(vec![sample_block(10)]) + .await; + tracker + .publish_committed_batch(vec![sample_block(9), sample_block(10)]) .await; let snapshot = tracker.replay_after(None).await; diff --git a/backend/crates/atlas-server/src/indexer/indexer.rs b/backend/crates/atlas-server/src/indexer/indexer.rs index be32f92..73c468f 100644 --- a/backend/crates/atlas-server/src/indexer/indexer.rs +++ b/backend/crates/atlas-server/src/indexer/indexer.rs @@ -643,7 +643,7 @@ impl Indexer { } let mut pg_tx = copy_client.transaction().await?; - let indexed_at: DateTime = pg_tx.query_one("SELECT NOW()", &[]).await?.get(0); + let indexed_at: DateTime = Utc::now(); let committed_blocks = update_watermark.then(|| batch.materialize_blocks(indexed_at)); copy_blocks(&mut pg_tx, &batch, indexed_at).await?; @@ -804,7 +804,7 @@ impl Indexer { pg_tx.commit().await?; if update_watermark { - if let Some(blocks) = committed_blocks.as_ref() { + if let Some(blocks) = committed_blocks { self.head_tracker.publish_committed_batch(blocks).await; } // Notify SSE subscribers only after the batch commit is visible. diff --git a/frontend/src/api/status.ts b/frontend/src/api/status.ts index 4f5df2c..87dd4b6 100644 --- a/frontend/src/api/status.ts +++ b/frontend/src/api/status.ts @@ -2,7 +2,7 @@ import client from './client'; export interface StatusResponse { block_height: number; - indexed_at: string; // ISO timestamp + indexed_at?: string; // ISO timestamp, absent when no blocks indexed } export async function getStatus(): Promise { From 9666d5c2e73889f3bd3e600b8025ebfbc67b9440 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 16 Mar 2026 20:00:57 +0100 Subject: [PATCH 4/9] simplify frontend --- frontend/src/components/Layout.tsx | 99 +-------- frontend/src/hooks/useBlockSSE.ts | 231 +++++++++------------ frontend/src/hooks/useLatestBlockHeight.ts | 110 ---------- frontend/src/hooks/useStats.ts | 6 +- 4 files changed, 108 insertions(+), 338 deletions(-) delete mode 100644 frontend/src/hooks/useLatestBlockHeight.ts diff --git a/frontend/src/components/Layout.tsx b/frontend/src/components/Layout.tsx index 2d3d63b..aa024f5 100644 --- a/frontend/src/components/Layout.tsx +++ b/frontend/src/components/Layout.tsx @@ -1,7 +1,6 @@ import { Link, NavLink, Outlet, useLocation } from 'react-router-dom'; -import { useEffect, useMemo, useRef, useState } from 'react'; +import { useMemo } from 'react'; import SearchBar from './SearchBar'; -import useLatestBlockHeight from '../hooks/useLatestBlockHeight'; import useBlockSSE from '../hooks/useBlockSSE'; import SmoothCounter from './SmoothCounter'; import logoImg from '../assets/logo.png'; @@ -12,99 +11,17 @@ export default function Layout() { const location = useLocation(); const isHome = location.pathname === '/'; const sse = useBlockSSE(); - const { height, lastUpdatedAt, bps } = useLatestBlockHeight(2000, sse); - const [now, setNow] = useState(() => Date.now()); - const recentlyUpdated = lastUpdatedAt ? (now - lastUpdatedAt) < 10000 : false; - const [displayedHeight, setDisplayedHeight] = useState(null); - const rafRef = useRef(null); - const displayRafRef = useRef(null); - const lastFrameRef = useRef(0); - const displayedRef = useRef(0); - const displayInitializedRef = useRef(false); - useEffect(() => { - const id = window.setInterval(() => setNow(Date.now()), 1000); - return () => window.clearInterval(id); - }, []); - - // Update displayed height - // When SSE is connected: show exact height from SSE (increments one-by-one) - // When polling: use bps prediction to smooth between poll intervals - useEffect(() => { - if (height == null) { - if (displayRafRef.current !== null) { - cancelAnimationFrame(displayRafRef.current); - } - displayRafRef.current = window.requestAnimationFrame(() => { - setDisplayedHeight(null); - displayInitializedRef.current = false; - displayRafRef.current = null; - }); - return; - } - - // Initialize displayed to at least current height on first run - if (!displayInitializedRef.current || height > displayedRef.current) { - displayedRef.current = Math.max(displayedRef.current || 0, height); - if (displayRafRef.current !== null) { - cancelAnimationFrame(displayRafRef.current); - } - displayRafRef.current = window.requestAnimationFrame(() => { - setDisplayedHeight(displayedRef.current); - displayInitializedRef.current = true; - displayRafRef.current = null; - }); - } - - // When SSE is connected, just track the real height directly — no prediction. - // The initialization block above already scheduled a RAF to call setDisplayedHeight - // whenever height changes, so no synchronous setState needed here. - if (sse.connected) { - displayedRef.current = height; - return; - } - - // Polling mode: use bps prediction to smooth between poll intervals - const loop = (t: number) => { - if (!bps || bps <= 0) { - if (displayedRef.current !== height) { - displayedRef.current = height; - setDisplayedHeight(displayedRef.current); - } - } else { - const now = t || performance.now(); - const dt = lastFrameRef.current ? (now - lastFrameRef.current) / 1000 : 0; - lastFrameRef.current = now; - - const predicted = displayedRef.current + bps * dt; - const next = Math.max(height, Math.floor(predicted)); - if (next !== displayedRef.current) { - displayedRef.current = next; - setDisplayedHeight(next); - } - } - rafRef.current = window.requestAnimationFrame(loop); - }; - - rafRef.current = window.requestAnimationFrame(loop); - return () => { - if (rafRef.current) cancelAnimationFrame(rafRef.current); - if (displayRafRef.current) cancelAnimationFrame(displayRafRef.current); - rafRef.current = null; - displayRafRef.current = null; - lastFrameRef.current = 0; - }; - }, [height, bps, sse.connected]); const blockTimeLabel = useMemo(() => { - if (bps !== null && bps > 0) { - const secs = 1 / bps; + if (sse.bps !== null && sse.bps > 0) { + const secs = 1 / sse.bps; if (secs < 1) { return `${Math.round(secs * 1000)} ms`; } return `${secs.toFixed(1)} s`; } return '—'; - }, [bps]); + }, [sse.bps]); const navLinkClass = ({ isActive }: { isActive: boolean }) => `inline-flex items-center h-10 px-4 rounded-full leading-none transition-colors duration-150 ${ isActive @@ -185,10 +102,10 @@ export default function Layout() {
- + |
- +
diff --git a/frontend/src/hooks/useBlockSSE.ts b/frontend/src/hooks/useBlockSSE.ts index 1f20312..bc2c0ab 100644 --- a/frontend/src/hooks/useBlockSSE.ts +++ b/frontend/src/hooks/useBlockSSE.ts @@ -1,6 +1,7 @@ import { useCallback, useEffect, useRef, useState } from 'react'; import type { Block } from '../types'; import { API_BASE_URL } from '../api/client'; +import { getStatus } from '../api/status'; export interface NewBlockEvent { block: Block; @@ -12,16 +13,14 @@ export interface BlockSSEState { connected: boolean; error: string | null; bps: number | null; + lastUpdatedAt: number | null; } type BlockLog = { num: number; ts: number }[]; -type QueuedBlock = { event: NewBlockEvent; receivedAt: number }; -const MIN_DRAIN_INTERVAL_MS = 30; -const MAX_DRAIN_INTERVAL_MS = 500; -const STREAM_BUFFER_BLOCKS = 3; -const MAX_BUFFER_WAIT_MS = 320; -const MAX_BURST_LEAD_IN_MS = 320; +const MIN_DRAIN_MS = 30; +const MAX_DRAIN_MS = 500; +const POLL_INTERVAL_MS = 2000; /** * Compute bps from a rolling log of (blockNumber, blockTimestamp) samples. @@ -39,32 +38,13 @@ function computeBpsFromLog(log: BlockLog, minSpan: number, fallbackMinSpan: numb return null; } -function getDrainInterval(baseInterval: number, queueLength: number): number { - let interval = baseInterval; - - // Speed up gently when backlog builds, but avoid collapsing back into bursts. - if (queueLength > 24) interval *= 0.62; - else if (queueLength > 12) interval *= 0.72; - else if (queueLength > 6) interval *= 0.82; - else if (queueLength > 2) interval *= 0.9; - - return Math.max(MIN_DRAIN_INTERVAL_MS, interval); -} - /** - * Connects to the SSE endpoint and delivers block events one-by-one. + * Connects to the SSE endpoint and delivers block events at the chain's natural + * cadence. Falls back to polling /api/status when SSE disconnects. * - * SSE events can still arrive in bursts when the backend indexes a backlog. - * React batching would collapse rapid setState calls, so we buffer into a - * ref-based queue and drain one at a time at a steadier visual cadence. - * - * The drain interval is derived from the chain's true block time, computed - * from on-chain block timestamps (not arrival/indexed times). We collect a - * rolling window of (blockNumber, blockTimestamp) samples and compute - * bps = deltaBlocks / deltaTimestamp. Since timestamps have second-level - * granularity, we require at least 2 seconds of block-time span for accuracy. - * This makes the visual cadence match the chain's actual speed, regardless of - * indexer lag, network batching, or SSE delivery timing. + * The drain queue rate-limits setState calls so React doesn't collapse rapid + * arrivals from backend batch commits. The interval is derived from on-chain + * block timestamps (not arrival times), making visual cadence match the chain. */ export default function useBlockSSE(): BlockSSEState { const [latestBlock, setLatestBlock] = useState(null); @@ -72,158 +52,141 @@ export default function useBlockSSE(): BlockSSEState { const [connected, setConnected] = useState(false); const [error, setError] = useState(null); const [bps, setBps] = useState(null); + const [lastUpdatedAt, setLastUpdatedAt] = useState(null); + const esRef = useRef(null); - const queueRef = useRef([]); + const queueRef = useRef([]); const drainTimerRef = useRef(null); const drainOneRef = useRef<(() => void) | null>(null); - const lastDrainAtRef = useRef(0); - // Rolling window of (blockNumber, blockTimestamp) for chain-rate calculation. - // We keep up to 500 samples (~45s at 11 bps) and use two windows: - // - 30s of block-time for the displayed bps (very stable) - // - 10s of block-time for drain pacing (responsive enough to adapt) - const blockLogRef = useRef([]); - // Cached drain interval in ms, derived from chain block timestamps const drainIntervalRef = useRef(90); // initial guess ~11 bps + const blockLogRef = useRef([]); + const pollTimerRef = useRef(null); + const connectedRef = useRef(false); - const scheduleDrain = useCallback((fromBurstStart: boolean) => { + // --- Drain: emit one block per chain-rate interval --- + + const scheduleDrain = useCallback(() => { if (drainTimerRef.current !== null || queueRef.current.length === 0) return; + const interval = Math.max(MIN_DRAIN_MS, drainIntervalRef.current); + drainTimerRef.current = window.setTimeout(() => drainOneRef.current?.(), interval); + }, []); - const now = performance.now(); + const drainOne = useCallback(() => { + drainTimerRef.current = null; const queue = queueRef.current; - const interval = getDrainInterval(drainIntervalRef.current, queue.length); - const sinceLastDrain = lastDrainAtRef.current > 0 ? now - lastDrainAtRef.current : null; + if (queue.length === 0) return; - let delay = 0; - if (sinceLastDrain !== null) { - delay = Math.max(0, interval - sinceLastDrain); + // If queue backs up, skip to near the end + if (queue.length > 50) { + const skip = queue.splice(0, queue.length - 5); + const last = skip[skip.length - 1]; + setHeight(last.block.number); } - // Keep a small client-side buffer so the UI doesn't spend its final couple - // of items too early and expose the backend's batch cadence as a pause. - const oldestBufferedFor = now - queue[0].receivedAt; - if (queue.length < STREAM_BUFFER_BLOCKS && oldestBufferedFor < MAX_BUFFER_WAIT_MS) { - delay = Math.max(delay, Math.min(interval, MAX_BUFFER_WAIT_MS - oldestBufferedFor)); + const next = queue.shift()!; + setLatestBlock(next); + setHeight(next.block.number); + setLastUpdatedAt(Date.now()); + + if (queue.length > 0) scheduleDrain(); + }, [scheduleDrain]); + + useEffect(() => { + drainOneRef.current = drainOne; + }, [drainOne]); + + const enqueue = useCallback((data: NewBlockEvent) => { + // Update bps from on-chain block timestamps + const log = blockLogRef.current; + log.push({ num: data.block.number, ts: data.block.timestamp }); + if (log.length > 500) blockLogRef.current = log.slice(-500); + + // Displayed bps: 30s window for stability, 5s fallback while bootstrapping + const displayBps = computeBpsFromLog(blockLogRef.current, 30, 5); + if (displayBps !== null) setBps(displayBps); + + // Drain pacing: 10s window for moderate responsiveness, 2s fallback + const drainBps = computeBpsFromLog(blockLogRef.current, 10, 2); + if (drainBps !== null) { + drainIntervalRef.current = Math.max(MIN_DRAIN_MS, Math.min(MAX_DRAIN_MS, 1000 / drainBps)); } - // When a backlog arrives after idle time, add a brief lead-in so the UI - // starts "dripping" rather than snapping to the first block immediately. - if (fromBurstStart && queue.length > 1) { - const burstLeadIn = Math.min( - MAX_BURST_LEAD_IN_MS, - interval * Math.min(queue.length - 1, STREAM_BUFFER_BLOCKS + 1), - ); - delay = Math.max(delay, burstLeadIn); + queueRef.current.push(data); + scheduleDrain(); + }, [scheduleDrain]); + + // --- Polling fallback --- + + const stopPolling = useCallback(() => { + if (pollTimerRef.current !== null) { + clearInterval(pollTimerRef.current); + pollTimerRef.current = null; } + }, []); + + const startPolling = useCallback(() => { + if (pollTimerRef.current !== null) return; - drainTimerRef.current = window.setTimeout(() => drainOneRef.current?.(), delay); + const poll = async () => { + try { + const status = await getStatus(); + if (typeof status?.block_height === 'number' && !connectedRef.current) { + setHeight(status.block_height); + setLastUpdatedAt(Date.now()); + } + } catch { + // ignore polling errors + } + }; + poll(); + pollTimerRef.current = window.setInterval(poll, POLL_INTERVAL_MS); }, []); - // Kick the drain loop when new items arrive. - const kickDrain = useCallback(() => { - scheduleDrain(true); - }, [scheduleDrain]); + // --- SSE connection --- - const connect = useCallback(function connectSSE() { - if (esRef.current) { - esRef.current.close(); - } + const connect = useCallback(() => { + if (esRef.current) esRef.current.close(); - const url = `${API_BASE_URL}/events`; - const es = new EventSource(url); + const es = new EventSource(`${API_BASE_URL}/events`); esRef.current = es; es.onopen = () => { + connectedRef.current = true; setConnected(true); setError(null); + stopPolling(); }; es.addEventListener('new_block', (e: MessageEvent) => { try { - const data: NewBlockEvent = JSON.parse(e.data); - - // Log block number + on-chain timestamp for true chain-rate calculation - const log = blockLogRef.current; - log.push({ num: data.block.number, ts: data.block.timestamp }); - - // Keep last 500 samples (~45s at 11 bps) - if (log.length > 500) { - blockLogRef.current = log.slice(-500); - } - - // Displayed bps: 30s window for stability, 5s fallback while bootstrapping - const displayBps = computeBpsFromLog(blockLogRef.current, 30, 5); - if (displayBps !== null) setBps(displayBps); - - // Drain pacing: 10s window for moderate responsiveness, 2s fallback - const drainBps = computeBpsFromLog(blockLogRef.current, 10, 2); - if (drainBps !== null) { - drainIntervalRef.current = Math.max( - MIN_DRAIN_INTERVAL_MS, - Math.min(MAX_DRAIN_INTERVAL_MS, 1000 / drainBps), - ); - } - - // Push to ref queue — synchronous, never lost by React batching - queueRef.current.push({ event: data, receivedAt: performance.now() }); - kickDrain(); + enqueue(JSON.parse(e.data)); } catch { - // Ignore malformed events + // ignore malformed events } }); es.onerror = (e) => { + connectedRef.current = false; setConnected(false); - // Preserve the browser-managed EventSource reconnect path. The server's - // head-only stream resumes from the current live tail rather than replaying - // missed history; canonical catch-up stays on normal block/status fetches. setError(`SSE ${e.type || 'error'}; reconnecting`); + startPolling(); }; - }, [kickDrain]); - - // Drain one block from the queue at the chain's natural cadence. - const drainOne = useCallback(() => { - const queue = queueRef.current; - drainTimerRef.current = null; - - if (queue.length === 0) return; // idle — kickDrain will restart when items arrive - - // If queue is backing up (> 50 items), skip to near the end - if (queue.length > 50) { - const skip = queue.splice(0, queue.length - 5); - const lastSkipped = skip[skip.length - 1].event; - setHeight(lastSkipped.block.number); - } - - const next = queue.shift()!.event; - setLatestBlock(next); - setHeight(next.block.number); - lastDrainAtRef.current = performance.now(); - - // Schedule next drain if more items remain - if (queue.length > 0) { - scheduleDrain(false); - } - }, [scheduleDrain]); - - useEffect(() => { - drainOneRef.current = drainOne; - }, [drainOne]); + }, [enqueue, stopPolling, startPolling]); useEffect(() => { connect(); - return () => { if (esRef.current) { esRef.current.close(); esRef.current = null; } + stopPolling(); if (drainTimerRef.current !== null) { clearTimeout(drainTimerRef.current); drainTimerRef.current = null; } - lastDrainAtRef.current = 0; }; - }, [connect]); + }, [connect, stopPolling]); - return { latestBlock, height, connected, error, bps }; + return { latestBlock, height, connected, error, bps, lastUpdatedAt }; } diff --git a/frontend/src/hooks/useLatestBlockHeight.ts b/frontend/src/hooks/useLatestBlockHeight.ts deleted file mode 100644 index a0ff29e..0000000 --- a/frontend/src/hooks/useLatestBlockHeight.ts +++ /dev/null @@ -1,110 +0,0 @@ -import { useCallback, useEffect, useRef, useState } from 'react'; -import { getStatus } from '../api/status'; - -export interface SSEState { - height: number | null; - connected: boolean; - bps: number | null; -} - -export interface LatestHeightState { - height: number | null; - loading: boolean; - error: string | null; - lastUpdatedAt: number | null; - bps: number | null; -} - -/** - * Tracks the latest block height and computes blocks-per-second (bps). - * When SSE is connected, uses sseBps (derived from on-chain block timestamps) - * for a stable block-time display. Falls back to wall-clock EMA when polling. - */ -export default function useLatestBlockHeight( - pollMs = 2000, - sse: SSEState | null = null, -): LatestHeightState { - const [height, setHeight] = useState(null); - const heightRef = useRef(null); - const [loading, setLoading] = useState(true); - const [error, setError] = useState(null); - const [lastUpdatedAt, setLastUpdatedAt] = useState(null); - const fetchingRef = useRef(false); - const [bps, setBps] = useState(null); - const prevSampleRef = useRef<{ h: number; t: number } | null>(null); - const alphaRef = useRef(0.25); // smoothing factor for EMA - - const sseConnected = sse?.connected ?? false; - const sseHeight = sse?.height ?? null; - const sseBps = sse?.bps ?? null; - - // When SSE provides bps from block timestamps, use it directly - useEffect(() => { - if (sseConnected && sseBps != null) { - setBps(sseBps); - } - }, [sseConnected, sseBps]); - - // Process a new height value (from either SSE or polling) - const processHeight = useCallback((latestHeight: number, fromSSE: boolean) => { - const now = Date.now(); - if (latestHeight !== heightRef.current) { - heightRef.current = latestHeight; - setHeight(latestHeight); - setLastUpdatedAt(now); - } - // Only compute wall-clock EMA bps when polling (not SSE) - if (!fromSSE) { - const prev = prevSampleRef.current; - const curr = { h: latestHeight, t: now }; - if (prev && curr.t > prev.t && curr.h >= prev.h) { - const dh = curr.h - prev.h; - const dt = (curr.t - prev.t) / 1000; - const inst = dt > 0 ? dh / dt : 0; - const alpha = alphaRef.current; - setBps((prevBps) => (prevBps == null ? inst : prevBps + alpha * (inst - prevBps))); - } - prevSampleRef.current = curr; - } - setError(null); - setLoading(false); - }, []); - - // Handle SSE height updates - useEffect(() => { - if (sseHeight != null) { - processHeight(sseHeight, true); - } - }, [sseHeight, processHeight]); - - // HTTP polling fallback — only active when SSE is not connected - const fetchHeight = useCallback(async () => { - if (fetchingRef.current) return; - fetchingRef.current = true; - try { - const status = await getStatus(); - const latestHeight = status?.block_height; - if (typeof latestHeight === 'number') { - processHeight(latestHeight, false); - } else { - setHeight(null); - } - } catch (e: unknown) { - setError(e instanceof Error ? e.message : 'Failed to fetch latest height'); - } finally { - setLoading(false); - fetchingRef.current = false; - } - }, [processHeight]); - - useEffect(() => { - // Skip polling when SSE is connected - if (sseConnected) return; - - fetchHeight(); - const id = setInterval(fetchHeight, pollMs); - return () => clearInterval(id); - }, [pollMs, fetchHeight, sseConnected]); - - return { height, loading, error, lastUpdatedAt, bps }; -} diff --git a/frontend/src/hooks/useStats.ts b/frontend/src/hooks/useStats.ts index 5f47d6c..fd4e4f8 100644 --- a/frontend/src/hooks/useStats.ts +++ b/frontend/src/hooks/useStats.ts @@ -1,13 +1,13 @@ -import { useCallback, useEffect, useMemo, useState } from 'react'; +import { useCallback, useContext, useEffect, useMemo, useState } from 'react'; import { getTotals, getDailyTxCount, type Totals } from '../api/stats'; -import useLatestBlockHeight from './useLatestBlockHeight'; +import { BlockStatsContext } from '../context/BlockStatsContext'; export default function useStats() { const [totals, setTotals] = useState(null); const [dailyTx, setDailyTx] = useState(null); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); - const { bps } = useLatestBlockHeight(4000); + const { bps } = useContext(BlockStatsContext); const fetchAll = useCallback(async () => { setLoading(true); From 17d568cbb7f6dd413269991246683707e0f8f42a Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 16 Mar 2026 20:06:50 +0100 Subject: [PATCH 5/9] comments --- .../atlas-server/src/api/handlers/sse.rs | 38 +++++++++---------- .../atlas-server/src/api/handlers/status.rs | 14 +++++-- backend/crates/atlas-server/src/config.rs | 4 +- backend/crates/atlas-server/src/head.rs | 16 ++++++++ 4 files changed, 46 insertions(+), 26 deletions(-) diff --git a/backend/crates/atlas-server/src/api/handlers/sse.rs b/backend/crates/atlas-server/src/api/handlers/sse.rs index f9dcdab..c32cd28 100644 --- a/backend/crates/atlas-server/src/api/handlers/sse.rs +++ b/backend/crates/atlas-server/src/api/handlers/sse.rs @@ -54,30 +54,28 @@ pub async fn block_events(State(state): State>) -> Sse } while let Ok(()) | Err(broadcast::error::RecvError::Lagged(_)) = rx.recv().await { - if let Some(cursor) = last_block_number { - let snapshot = head_tracker.replay_after(Some(cursor)).await; - - if let Some(buffer_start) = snapshot.buffer_start { - if cursor + 1 < buffer_start { - warn!( - last_seen = cursor, - buffer_start, - buffer_end = ?snapshot.buffer_end, - "sse head-only: client fell behind replay tail; closing stream for canonical refetch" - ); - break; - } + let snapshot = head_tracker.replay_after(last_block_number).await; + + if let (Some(cursor), Some(buffer_start)) = (last_block_number, snapshot.buffer_start) { + if cursor + 1 < buffer_start { + warn!( + last_seen = cursor, + buffer_start, + buffer_end = ?snapshot.buffer_end, + "sse head-only: client fell behind replay tail; closing stream for canonical refetch" + ); + break; } + } - if !snapshot.blocks_after_cursor.is_empty() { - for block in snapshot.blocks_after_cursor { - last_block_number = Some(block.number); - if let Some(event) = block_to_event(block) { - yield Ok(event); - } + if !snapshot.blocks_after_cursor.is_empty() { + for block in snapshot.blocks_after_cursor { + last_block_number = Some(block.number); + if let Some(event) = block_to_event(block) { + yield Ok(event); } - continue; } + continue; } match head_tracker.latest().await { diff --git a/backend/crates/atlas-server/src/api/handlers/status.rs b/backend/crates/atlas-server/src/api/handlers/status.rs index 48cbbae..f98fdc0 100644 --- a/backend/crates/atlas-server/src/api/handlers/status.rs +++ b/backend/crates/atlas-server/src/api/handlers/status.rs @@ -3,7 +3,6 @@ use serde::Serialize; use std::sync::Arc; use crate::api::error::ApiResult; -use crate::api::handlers::get_latest_block; use crate::api::AppState; #[derive(Serialize)] @@ -23,10 +22,17 @@ pub async fn get_status(State(state): State>) -> ApiResult)> = sqlx::query_as( + "SELECT value, updated_at FROM indexer_state WHERE key = 'last_indexed_block'", + ) + .fetch_optional(&state.pool) + .await?; + + if let Some((value, updated_at)) = row { return Ok(Json(ChainStatus { - block_height: block.number, - indexed_at: Some(block.indexed_at.to_rfc3339()), + block_height: value.parse().unwrap_or(0), + indexed_at: Some(updated_at.to_rfc3339()), })); } diff --git a/backend/crates/atlas-server/src/config.rs b/backend/crates/atlas-server/src/config.rs index ea7d243..f96d51a 100644 --- a/backend/crates/atlas-server/src/config.rs +++ b/backend/crates/atlas-server/src/config.rs @@ -36,8 +36,8 @@ impl Config { .unwrap_or_else(|_| "4096".to_string()) .parse() .context("Invalid SSE_REPLAY_BUFFER_BLOCKS")?; - if sse_replay_buffer_blocks == 0 { - bail!("SSE_REPLAY_BUFFER_BLOCKS must be greater than 0"); + if sse_replay_buffer_blocks == 0 || sse_replay_buffer_blocks > 100_000 { + bail!("SSE_REPLAY_BUFFER_BLOCKS must be between 1 and 100000"); } Ok(Self { diff --git a/backend/crates/atlas-server/src/head.rs b/backend/crates/atlas-server/src/head.rs index 21f82cb..e00970a 100644 --- a/backend/crates/atlas-server/src/head.rs +++ b/backend/crates/atlas-server/src/head.rs @@ -196,4 +196,20 @@ mod tests { assert_eq!(numbers, vec![10]); assert_eq!(tracker.latest().await.unwrap().number, 10); } + + #[tokio::test] + async fn clear_resets_state_to_empty() { + let tracker = HeadTracker::empty(3); + tracker + .publish_committed_batch(vec![sample_block(10)]) + .await; + assert!(tracker.latest().await.is_some()); + + tracker.clear().await; + + assert!(tracker.latest().await.is_none()); + let snapshot = tracker.replay_after(None).await; + assert!(snapshot.blocks_after_cursor.is_empty()); + assert!(snapshot.buffer_start.is_none()); + } } From 9ac5bda8925d34f36aff4ddb7c7478687310358d Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 16 Mar 2026 20:16:05 +0100 Subject: [PATCH 6/9] sse before write --- .../atlas-server/src/indexer/indexer.rs | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/backend/crates/atlas-server/src/indexer/indexer.rs b/backend/crates/atlas-server/src/indexer/indexer.rs index 73c468f..a289608 100644 --- a/backend/crates/atlas-server/src/indexer/indexer.rs +++ b/backend/crates/atlas-server/src/indexer/indexer.rs @@ -281,6 +281,18 @@ impl Indexer { let new_erc20 = std::mem::take(&mut batch.new_erc20); let new_nft = std::mem::take(&mut batch.new_nft); + // Publish to head tracker + SSE *before* the DB write so subscribers + // see new blocks without waiting for the full transaction to commit. + // The SSE handler reads from head_tracker (in-memory), not from DB, + // so this is safe even if the DB write is slow. If write_batch fails + // the indexer retries the same blocks and head_tracker ignores + // non-advancing publishes. + let committed_blocks = batch.materialize_blocks(Utc::now()); + self.head_tracker + .publish_committed_batch(committed_blocks) + .await; + let _ = self.block_events_tx.send(()); + // One DB transaction for the entire batch self.write_batch(&mut copy_client, batch, true).await?; @@ -644,7 +656,6 @@ impl Indexer { let mut pg_tx = copy_client.transaction().await?; let indexed_at: DateTime = Utc::now(); - let committed_blocks = update_watermark.then(|| batch.materialize_blocks(indexed_at)); copy_blocks(&mut pg_tx, &batch, indexed_at).await?; copy_transactions(&mut pg_tx, &batch).await?; @@ -803,14 +814,6 @@ impl Indexer { pg_tx.commit().await?; - if update_watermark { - if let Some(blocks) = committed_blocks { - self.head_tracker.publish_committed_batch(blocks).await; - } - // Notify SSE subscribers only after the batch commit is visible. - let _ = self.block_events_tx.send(()); - } - Ok(()) } From f7cf2177f3bb2d79c5b6e5b9cb052c308a935eb1 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 17 Mar 2026 10:55:31 +0100 Subject: [PATCH 7/9] address comments --- .../atlas-server/src/api/handlers/sse.rs | 142 ++++++++++++++++-- .../atlas-server/src/api/handlers/status.rs | 69 ++++++++- backend/crates/atlas-server/src/config.rs | 40 +++++ frontend/src/hooks/useBlockSSE.ts | 7 +- 4 files changed, 242 insertions(+), 16 deletions(-) diff --git a/backend/crates/atlas-server/src/api/handlers/sse.rs b/backend/crates/atlas-server/src/api/handlers/sse.rs index c32cd28..80c005f 100644 --- a/backend/crates/atlas-server/src/api/handlers/sse.rs +++ b/backend/crates/atlas-server/src/api/handlers/sse.rs @@ -12,7 +12,9 @@ use tokio::sync::broadcast; use crate::api::handlers::get_latest_block; use crate::api::AppState; +use crate::head::HeadTracker; use atlas_common::Block; +use sqlx::PgPool; use tracing::warn; type SseStream = Pin> + Send>>; @@ -22,16 +24,13 @@ struct NewBlockEvent { block: Block, } -/// GET /api/events — Server-Sent Events stream for live committed block updates. -/// New connections receive only the current latest block and then stream -/// forward from in-memory committed head state. Historical catch-up stays on -/// the canonical block endpoints. -pub async fn block_events(State(state): State>) -> Sse { - let pool = state.pool.clone(); - let head_tracker = state.head_tracker.clone(); - let mut rx = state.block_events_tx.subscribe(); - - let stream = async_stream::stream! { +/// Build the SSE block stream. Separated from the handler for testability. +fn make_block_stream( + pool: PgPool, + head_tracker: Arc, + mut rx: broadcast::Receiver<()>, +) -> impl Stream> + Send { + async_stream::stream! { let mut last_block_number: Option = None; match head_tracker.latest().await { @@ -88,8 +87,19 @@ pub async fn block_events(State(state): State>) -> Sse Some(_) | None => {} } } - }; + } +} +/// GET /api/events — Server-Sent Events stream for live committed block updates. +/// New connections receive only the current latest block and then stream +/// forward from in-memory committed head state. Historical catch-up stays on +/// the canonical block endpoints. +pub async fn block_events(State(state): State>) -> Sse { + let stream = make_block_stream( + state.pool.clone(), + state.head_tracker.clone(), + state.block_events_tx.subscribe(), + ); sse_response(stream) } @@ -116,7 +126,9 @@ fn block_to_event(block: Block) -> Option { #[cfg(test)] mod tests { use super::*; + use crate::head::HeadTracker; use chrono::Utc; + use futures::StreamExt; fn sample_block(number: i64) -> Block { Block { @@ -131,6 +143,13 @@ mod tests { } } + /// Lazy PgPool that never connects — safe for tests that don't hit the DB. + fn dummy_pool() -> PgPool { + sqlx::postgres::PgPoolOptions::new() + .connect_lazy("postgres://test@localhost:5432/test") + .expect("lazy pool creation should not fail") + } + #[test] fn new_block_event_serializes_with_block_wrapper() { let event = NewBlockEvent { @@ -173,4 +192,105 @@ mod tests { ); } } + + #[tokio::test] + async fn stream_seeds_from_head_tracker() { + let tracker = Arc::new(HeadTracker::empty(10)); + tracker + .publish_committed_batch(vec![sample_block(42)]) + .await; + + let (tx, _) = broadcast::channel::<()>(16); + let rx = tx.subscribe(); + let stream = make_block_stream(dummy_pool(), tracker, rx); + tokio::pin!(stream); + + // Drop sender so loop terminates after the initial seed + drop(tx); + + let first = tokio::time::timeout(Duration::from_secs(1), stream.next()).await; + assert!( + first.is_ok(), + "stream should yield initial event without blocking" + ); + assert!( + first.unwrap().is_some(), + "stream should yield at least one event" + ); + } + + #[tokio::test] + async fn stream_replays_new_blocks_after_broadcast() { + let tracker = Arc::new(HeadTracker::empty(10)); + tracker + .publish_committed_batch(vec![sample_block(42)]) + .await; + + let (tx, _) = broadcast::channel::<()>(16); + let rx = tx.subscribe(); + let stream = make_block_stream(dummy_pool(), tracker.clone(), rx); + tokio::pin!(stream); + + // Consume initial seed + let _ = tokio::time::timeout(Duration::from_secs(1), stream.next()) + .await + .unwrap(); + + // Publish a new block and broadcast + tracker + .publish_committed_batch(vec![sample_block(43)]) + .await; + tx.send(()).unwrap(); + + let second = tokio::time::timeout(Duration::from_secs(1), stream.next()).await; + assert!(second.is_ok(), "stream should yield event after broadcast"); + assert!( + second.unwrap().is_some(), + "broadcast should trigger a new event" + ); + + drop(tx); + } + + #[tokio::test] + async fn stream_terminates_when_client_behind_tail() { + // Buffer capacity 3: only keeps 3 most recent blocks + let tracker = Arc::new(HeadTracker::empty(3)); + tracker + .publish_committed_batch(vec![sample_block(10), sample_block(11), sample_block(12)]) + .await; + + let (tx, _) = broadcast::channel::<()>(16); + let rx = tx.subscribe(); + let stream = make_block_stream(dummy_pool(), tracker.clone(), rx); + tokio::pin!(stream); + + // Consume initial seed (latest = block 12) + let _ = tokio::time::timeout(Duration::from_secs(1), stream.next()) + .await + .unwrap(); + + // Advance buffer far ahead: client cursor=12, buffer will be [23,24,25] + tracker + .publish_committed_batch(vec![ + sample_block(20), + sample_block(21), + sample_block(22), + sample_block(23), + sample_block(24), + sample_block(25), + ]) + .await; + tx.send(()).unwrap(); + + // Stream should detect behind-tail and terminate + let result = tokio::time::timeout(Duration::from_secs(2), async { + while (stream.next().await).is_some() {} + }) + .await; + assert!( + result.is_ok(), + "stream should terminate when client falls behind replay tail" + ); + } } diff --git a/backend/crates/atlas-server/src/api/handlers/status.rs b/backend/crates/atlas-server/src/api/handlers/status.rs index f98fdc0..acff0d4 100644 --- a/backend/crates/atlas-server/src/api/handlers/status.rs +++ b/backend/crates/atlas-server/src/api/handlers/status.rs @@ -23,15 +23,15 @@ pub async fn get_status(State(state): State>) -> ApiResult)> = sqlx::query_as( - "SELECT value, updated_at FROM indexer_state WHERE key = 'last_indexed_block'", + let row: Option<(i64, chrono::DateTime)> = sqlx::query_as( + "SELECT value::bigint, updated_at FROM indexer_state WHERE key = 'last_indexed_block'", ) .fetch_optional(&state.pool) .await?; - if let Some((value, updated_at)) = row { + if let Some((block_height, updated_at)) = row { return Ok(Json(ChainStatus { - block_height: value.parse().unwrap_or(0), + block_height, indexed_at: Some(updated_at.to_rfc3339()), })); } @@ -41,3 +41,64 @@ pub async fn get_status(State(state): State>) -> ApiResult Block { + Block { + number, + hash: format!("0x{:064x}", number), + parent_hash: format!("0x{:064x}", number.saturating_sub(1)), + timestamp: 1_700_000_000 + number, + gas_used: 21_000, + gas_limit: 30_000_000, + transaction_count: 1, + indexed_at: Utc::now(), + } + } + + fn test_state(head_tracker: Arc) -> State> { + let (tx, _) = tokio::sync::broadcast::channel(1); + let pool = sqlx::postgres::PgPoolOptions::new() + .connect_lazy("postgres://test@localhost:5432/test") + .expect("lazy pool"); + State(Arc::new(AppState { + pool, + block_events_tx: tx, + head_tracker, + rpc_url: String::new(), + })) + } + + #[tokio::test] + async fn status_returns_head_tracker_block() { + let tracker = Arc::new(HeadTracker::empty(10)); + tracker + .publish_committed_batch(vec![sample_block(42)]) + .await; + + let result = get_status(test_state(tracker)).await; + let Json(status) = result.unwrap_or_else(|_| panic!("get_status should not fail")); + + assert_eq!(status.block_height, 42); + assert!(status.indexed_at.is_some()); + } + + #[tokio::test] + async fn status_returns_latest_head_after_multiple_publishes() { + let tracker = Arc::new(HeadTracker::empty(10)); + tracker + .publish_committed_batch(vec![sample_block(10), sample_block(11), sample_block(12)]) + .await; + + let result = get_status(test_state(tracker)).await; + let Json(status) = result.unwrap_or_else(|_| panic!("get_status should not fail")); + + assert_eq!(status.block_height, 12); + } +} diff --git a/backend/crates/atlas-server/src/config.rs b/backend/crates/atlas-server/src/config.rs index f96d51a..c871259 100644 --- a/backend/crates/atlas-server/src/config.rs +++ b/backend/crates/atlas-server/src/config.rs @@ -97,3 +97,43 @@ impl Config { }) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Mutex; + + static ENV_LOCK: Mutex<()> = Mutex::new(()); + + fn set_required_env() { + env::set_var("DATABASE_URL", "postgres://test@localhost/test"); + env::set_var("RPC_URL", "http://localhost:8545"); + } + + #[test] + fn sse_replay_buffer_validation() { + let _lock = ENV_LOCK.lock().unwrap(); + set_required_env(); + + // Default + env::remove_var("SSE_REPLAY_BUFFER_BLOCKS"); + assert_eq!(Config::from_env().unwrap().sse_replay_buffer_blocks, 4096); + + // Valid custom value + env::set_var("SSE_REPLAY_BUFFER_BLOCKS", "12345"); + assert_eq!(Config::from_env().unwrap().sse_replay_buffer_blocks, 12345); + + // Out-of-range (0 and above max) + for val in ["0", "100001"] { + env::set_var("SSE_REPLAY_BUFFER_BLOCKS", val); + let err = Config::from_env().unwrap_err(); + assert!(err.to_string().contains("must be between 1 and 100000"), "expected range error for {val}"); + } + + // Non-numeric + env::set_var("SSE_REPLAY_BUFFER_BLOCKS", "abc"); + assert!(Config::from_env().unwrap_err().to_string().contains("Invalid SSE_REPLAY_BUFFER_BLOCKS")); + + env::remove_var("SSE_REPLAY_BUFFER_BLOCKS"); + } +} diff --git a/frontend/src/hooks/useBlockSSE.ts b/frontend/src/hooks/useBlockSSE.ts index bc2c0ab..9ee954b 100644 --- a/frontend/src/hooks/useBlockSSE.ts +++ b/frontend/src/hooks/useBlockSSE.ts @@ -62,6 +62,7 @@ export default function useBlockSSE(): BlockSSEState { const blockLogRef = useRef([]); const pollTimerRef = useRef(null); const connectedRef = useRef(false); + const highestSeenRef = useRef(-1); // --- Drain: emit one block per chain-rate interval --- @@ -96,6 +97,9 @@ export default function useBlockSSE(): BlockSSEState { }, [drainOne]); const enqueue = useCallback((data: NewBlockEvent) => { + if (data.block.number <= highestSeenRef.current) return; + highestSeenRef.current = data.block.number; + // Update bps from on-chain block timestamps const log = blockLogRef.current; log.push({ num: data.block.number, ts: data.block.timestamp }); @@ -130,7 +134,8 @@ export default function useBlockSSE(): BlockSSEState { const poll = async () => { try { const status = await getStatus(); - if (typeof status?.block_height === 'number' && !connectedRef.current) { + if (typeof status?.block_height === 'number' && !connectedRef.current && status.block_height > highestSeenRef.current) { + highestSeenRef.current = status.block_height; setHeight(status.block_height); setLastUpdatedAt(Date.now()); } From 0e3b18264204466a420a4664c4ba5d5895b5c304 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 17 Mar 2026 12:19:18 +0100 Subject: [PATCH 8/9] fmt --- backend/crates/atlas-server/src/config.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/backend/crates/atlas-server/src/config.rs b/backend/crates/atlas-server/src/config.rs index c871259..a0c7da7 100644 --- a/backend/crates/atlas-server/src/config.rs +++ b/backend/crates/atlas-server/src/config.rs @@ -127,12 +127,18 @@ mod tests { for val in ["0", "100001"] { env::set_var("SSE_REPLAY_BUFFER_BLOCKS", val); let err = Config::from_env().unwrap_err(); - assert!(err.to_string().contains("must be between 1 and 100000"), "expected range error for {val}"); + assert!( + err.to_string().contains("must be between 1 and 100000"), + "expected range error for {val}" + ); } // Non-numeric env::set_var("SSE_REPLAY_BUFFER_BLOCKS", "abc"); - assert!(Config::from_env().unwrap_err().to_string().contains("Invalid SSE_REPLAY_BUFFER_BLOCKS")); + assert!(Config::from_env() + .unwrap_err() + .to_string() + .contains("Invalid SSE_REPLAY_BUFFER_BLOCKS")); env::remove_var("SSE_REPLAY_BUFFER_BLOCKS"); } From 6bdf64905afb1d02dcb59262468354de44530dcd Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 17 Mar 2026 13:53:42 +0100 Subject: [PATCH 9/9] add test system just command --- Justfile | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/Justfile b/Justfile index 931b3da..be4d70a 100644 --- a/Justfile +++ b/Justfile @@ -29,5 +29,26 @@ backend-test: backend-server: cd backend && cargo run --bin atlas-server +# Docker +rpc_url := "https://ev-reth-eden-testnet.binarybuilders.services:8545" + +# Run full stack against eden testnet, starting from latest block +test-run: + #!/usr/bin/env bash + latest=$(curl -s -X POST {{rpc_url}} \ + -H 'Content-Type: application/json' \ + -d '{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}' \ + | jq -r '.result') + start_block=$((latest)) + echo "Latest block: $start_block" + RPC_URL={{rpc_url}} \ + START_BLOCK=$start_block \ + REINDEX=false \ + RPC_REQUESTS_PER_SECOND=10000 \ + FETCH_WORKERS=10 \ + BATCH_SIZE=10000 \ + RPC_BATCH_SIZE=100 \ + docker compose up --build + # Combined checks ci: backend-fmt backend-clippy backend-test frontend-install frontend-lint frontend-build