Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ RPC_BATCH_SIZE=20
# API_HOST=127.0.0.1
# API_PORT=3000
# API_DB_MAX_CONNECTIONS=20
# SSE_REPLAY_BUFFER_BLOCKS=4096 # replay tail used only for active connected clients
21 changes: 21 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,26 @@ backend-test:
backend-server:
cd backend && cargo run --bin atlas-server

# Docker
rpc_url := "https://ev-reth-eden-testnet.binarybuilders.services:8545"

# Run full stack against eden testnet, starting from latest block
test-run:
    #!/usr/bin/env bash
    # eth_blockNumber returns the chain head as a hex quantity (e.g. "0x1a2b").
    latest=$(curl -s -X POST {{rpc_url}} \
        -H 'Content-Type: application/json' \
        -d '{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}' \
        | jq -r '.result')
    # Bash arithmetic accepts the 0x prefix, so this converts hex -> decimal.
    start_block=$((latest))
    echo "Latest block: $start_block"
    RPC_URL={{rpc_url}} \
    START_BLOCK=$start_block \
    REINDEX=false \
    RPC_REQUESTS_PER_SECOND=10000 \
    FETCH_WORKERS=10 \
    BATCH_SIZE=10000 \
    RPC_BATCH_SIZE=100 \
    docker compose up --build

# Combined checks
ci: backend-fmt backend-clippy backend-test frontend-install frontend-lint frontend-build
4 changes: 4 additions & 0 deletions backend/crates/atlas-common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ pub struct ContractAbi {
pub verified_at: DateTime<Utc>,
}

/// SQL column list for the `blocks` table, matching the field order in [`Block`].
///
/// Shared by query builders so every block query selects the same columns in
/// the same order. NOTE(review): keep in sync with the `Block` struct fields —
/// presumably rows are decoded positionally/by-name via sqlx; confirm against
/// the `Block` derive before reordering.
pub const BLOCK_COLUMNS: &str =
"number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at";

/// Pagination parameters
#[derive(Debug, Clone, Deserialize)]
pub struct Pagination {
Expand Down
2 changes: 1 addition & 1 deletion backend/crates/atlas-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async-stream = { workspace = true }
num-bigint = "0.4"
async-channel = "2.3"
governor = "0.6"
tokio-postgres = { version = "0.7" }
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
tokio-postgres-rustls = "0.12"
rustls = "0.23"
webpki-roots = "0.26"
10 changes: 10 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,18 @@ pub mod status;
pub mod tokens;
pub mod transactions;

use atlas_common::{Block, BLOCK_COLUMNS};
use sqlx::PgPool;

/// Fetch the most recently indexed block, if any.
///
/// Selects [`BLOCK_COLUMNS`] from the `blocks` table ordered by descending
/// block number and takes the top row; returns `Ok(None)` when the table is
/// empty.
pub async fn get_latest_block(pool: &PgPool) -> Result<Option<Block>, sqlx::Error> {
    let sql = format!("SELECT {BLOCK_COLUMNS} FROM blocks ORDER BY number DESC LIMIT 1");
    sqlx::query_as(&sql).fetch_optional(pool).await
}

/// Get transactions table row count efficiently.
/// - For tables > 100k rows: uses PostgreSQL's approximate count (instant, ~99% accurate)
/// - For smaller tables: uses exact COUNT(*) (fast enough)
Expand Down
250 changes: 184 additions & 66 deletions backend/crates/atlas-server/src/api/handlers/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,80 +5,109 @@ use axum::{
use futures::stream::Stream;
use serde::Serialize;
use std::convert::Infallible;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;

use crate::api::handlers::get_latest_block;
use crate::api::AppState;
use crate::head::HeadTracker;
use atlas_common::Block;
use sqlx::PgPool;
use tracing::warn;

const BLOCK_COLUMNS: &str =
"number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at";
const FETCH_BATCH_SIZE: i64 = 256;
type SseStream = Pin<Box<dyn Stream<Item = Result<Event, Infallible>> + Send>>;

#[derive(Serialize, Debug)]
struct NewBlockEvent {
block: Block,
}

/// GET /api/events — Server-Sent Events stream for live block updates.
/// Seeds from the latest indexed block, then requeries the DB for blocks added
/// after that point whenever the shared notification fanout emits a wake-up.
pub async fn block_events(
State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let pool = state.pool.clone();
let mut rx = state.block_events_tx.subscribe();

let stream = async_stream::stream! {
/// Build the SSE block stream. Separated from the handler for testability.
fn make_block_stream(
pool: PgPool,
head_tracker: Arc<HeadTracker>,
mut rx: broadcast::Receiver<()>,
) -> impl Stream<Item = Result<Event, Infallible>> + Send {
async_stream::stream! {
let mut last_block_number: Option<i64> = None;

match fetch_latest_block(&pool).await {
Ok(Some(block)) => {
let block_number = block.number;
last_block_number = Some(block_number);
match head_tracker.latest().await {
Some(block) => {
last_block_number = Some(block.number);
if let Some(event) = block_to_event(block) {
yield Ok(event);
}
}
Ok(None) => {}
Err(e) => warn!(error = ?e, "sse: failed to fetch initial block"),
None => match get_latest_block(&pool).await {
Ok(Some(block)) => {
last_block_number = Some(block.number);
if let Some(event) = block_to_event(block) {
yield Ok(event);
}
}
Ok(None) => {}
Err(e) => warn!(error = ?e, "sse: failed to fetch initial block"),
},
}

while let Ok(()) | Err(broadcast::error::RecvError::Lagged(_)) = rx.recv().await {
let mut cursor = last_block_number;

loop {
match fetch_blocks_after(&pool, cursor).await {
Ok(blocks) => {
if blocks.is_empty() {
break;
}

let batch_len = blocks.len();
for block in blocks {
let block_number = block.number;
last_block_number = Some(block_number);
cursor = Some(block_number);
if let Some(event) = block_to_event(block) {
yield Ok(event);
}
}

if batch_len < FETCH_BATCH_SIZE as usize {
break;
}
let snapshot = head_tracker.replay_after(last_block_number).await;

if let (Some(cursor), Some(buffer_start)) = (last_block_number, snapshot.buffer_start) {
if cursor + 1 < buffer_start {
warn!(
last_seen = cursor,
buffer_start,
buffer_end = ?snapshot.buffer_end,
"sse head-only: client fell behind replay tail; closing stream for canonical refetch"
);
break;
}
}

if !snapshot.blocks_after_cursor.is_empty() {
for block in snapshot.blocks_after_cursor {
last_block_number = Some(block.number);
if let Some(event) = block_to_event(block) {
yield Ok(event);
}
Err(e) => {
warn!(error = ?e, cursor = ?last_block_number, "sse: failed to fetch blocks after wake-up");
break;
}
continue;
}

match head_tracker.latest().await {
Some(block) if last_block_number.is_none_or(|last_seen| block.number > last_seen) => {
last_block_number = Some(block.number);
if let Some(event) = block_to_event(block) {
yield Ok(event);
}
}
Some(_) | None => {}
}
}
};
}
}

/// GET /api/events — Server-Sent Events stream for live committed block updates.
/// New connections receive only the current latest block and then stream
/// forward from in-memory committed head state. Historical catch-up stays on
/// the canonical block endpoints.
/// GET /api/events — Server-Sent Events stream for live committed block updates.
/// New connections receive only the current latest block and then stream
/// forward from in-memory committed head state. Historical catch-up stays on
/// the canonical block endpoints.
pub async fn block_events(State(state): State<Arc<AppState>>) -> Sse<SseStream> {
    // Each connection gets its own DB handle clone, head-tracker handle, and
    // broadcast receiver; the stream itself owns all three.
    let pool = state.pool.clone();
    let head_tracker = state.head_tracker.clone();
    let rx = state.block_events_tx.subscribe();
    sse_response(make_block_stream(pool, head_tracker, rx))
}

fn sse_response<S>(stream: S) -> Sse<SseStream>
where
S: Stream<Item = Result<Event, Infallible>> + Send + 'static,
{
let stream: SseStream = Box::pin(stream);

Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
Expand All @@ -87,27 +116,6 @@ pub async fn block_events(
)
}

/// Load the newest row from the `blocks` table, or `None` when it is empty.
async fn fetch_latest_block(pool: &PgPool) -> Result<Option<Block>, sqlx::Error> {
    let sql = format!("SELECT {BLOCK_COLUMNS} FROM blocks ORDER BY number DESC LIMIT 1");
    sqlx::query_as(&sql).fetch_optional(pool).await
}

/// Fetch up to `FETCH_BATCH_SIZE` blocks with number strictly greater than
/// `cursor`, in ascending order. A `None` cursor means "from the beginning"
/// (lower bound -1, so block 0 is included).
async fn fetch_blocks_after(pool: &PgPool, cursor: Option<i64>) -> Result<Vec<Block>, sqlx::Error> {
    let sql = format!(
        "SELECT {BLOCK_COLUMNS} FROM blocks WHERE number > $1 ORDER BY number ASC LIMIT {FETCH_BATCH_SIZE}"
    );
    sqlx::query_as(&sql)
        .bind(cursor.unwrap_or(-1))
        .fetch_all(pool)
        .await
}

fn block_to_event(block: Block) -> Option<Event> {
let event = NewBlockEvent { block };
serde_json::to_string(&event)
Expand All @@ -118,7 +126,9 @@ fn block_to_event(block: Block) -> Option<Event> {
#[cfg(test)]
mod tests {
use super::*;
use crate::head::HeadTracker;
use chrono::Utc;
use futures::StreamExt;

fn sample_block(number: i64) -> Block {
Block {
Expand All @@ -133,6 +143,13 @@ mod tests {
}
}

/// Build a `PgPool` lazily so no connection is ever attempted — tests that
/// never touch the database can hold one safely.
fn dummy_pool() -> PgPool {
    let options = sqlx::postgres::PgPoolOptions::new();
    options
        .connect_lazy("postgres://test@localhost:5432/test")
        .expect("lazy pool creation should not fail")
}

#[test]
fn new_block_event_serializes_with_block_wrapper() {
let event = NewBlockEvent {
Expand Down Expand Up @@ -175,4 +192,105 @@ mod tests {
);
}
}

#[tokio::test]
async fn stream_seeds_from_head_tracker() {
    // Publish one committed block so the tracker has a head to seed from.
    let tracker = Arc::new(HeadTracker::empty(10));
    tracker
        .publish_committed_batch(vec![sample_block(42)])
        .await;

    let (tx, _) = broadcast::channel::<()>(16);
    let rx = tx.subscribe();
    let stream = make_block_stream(dummy_pool(), tracker, rx);
    tokio::pin!(stream);

    // Dropping the sender lets the recv loop exit after the initial seed.
    drop(tx);

    let seeded = tokio::time::timeout(Duration::from_secs(1), stream.next()).await;
    assert!(
        seeded.is_ok(),
        "stream should yield initial event without blocking"
    );
    assert!(
        seeded.unwrap().is_some(),
        "stream should yield at least one event"
    );
}

#[tokio::test]
async fn stream_replays_new_blocks_after_broadcast() {
    // Seed the tracker with block 42 so the stream has an initial event.
    let tracker = Arc::new(HeadTracker::empty(10));
    tracker
        .publish_committed_batch(vec![sample_block(42)])
        .await;

    let (tx, _) = broadcast::channel::<()>(16);
    let rx = tx.subscribe();
    let stream = make_block_stream(dummy_pool(), tracker.clone(), rx);
    tokio::pin!(stream);

    // Swallow the initial seed event (block 42).
    tokio::time::timeout(Duration::from_secs(1), stream.next())
        .await
        .unwrap();

    // A newly committed block plus a wake-up signal should surface downstream.
    tracker
        .publish_committed_batch(vec![sample_block(43)])
        .await;
    tx.send(()).unwrap();

    let replayed = tokio::time::timeout(Duration::from_secs(1), stream.next()).await;
    assert!(replayed.is_ok(), "stream should yield event after broadcast");
    assert!(
        replayed.unwrap().is_some(),
        "broadcast should trigger a new event"
    );

    drop(tx);
}

#[tokio::test]
async fn stream_terminates_when_client_behind_tail() {
// Buffer capacity 3: only keeps 3 most recent blocks
let tracker = Arc::new(HeadTracker::empty(3));
tracker
.publish_committed_batch(vec![sample_block(10), sample_block(11), sample_block(12)])
.await;

let (tx, _) = broadcast::channel::<()>(16);
let rx = tx.subscribe();
let stream = make_block_stream(dummy_pool(), tracker.clone(), rx);
tokio::pin!(stream);

// Consume initial seed (latest = block 12)
let _ = tokio::time::timeout(Duration::from_secs(1), stream.next())
.await
.unwrap();

// Advance buffer far ahead: client cursor=12, buffer will be [23,24,25]
tracker
.publish_committed_batch(vec![
sample_block(20),
sample_block(21),
sample_block(22),
sample_block(23),
sample_block(24),
sample_block(25),
])
.await;
tx.send(()).unwrap();

// Stream should detect behind-tail and terminate
let result = tokio::time::timeout(Duration::from_secs(2), async {
while (stream.next().await).is_some() {}
})
.await;
assert!(
result.is_ok(),
"stream should terminate when client falls behind replay tail"
);
}
}
Loading
Loading