Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,10 @@ RPC_BATCH_SIZE=20
# API_PORT=3000
# API_DB_MAX_CONNECTIONS=20
# SSE_REPLAY_BUFFER_BLOCKS=4096 # replay tail used only for active connected clients

# Optional: ev-node Connect RPC URL for L2 DA (Data Availability) inclusion tracking.
# When set, a background worker queries ev-node for Celestia DA heights per block.
# EVNODE_URL=http://localhost:7331

# Number of concurrent requests to ev-node for DA status backfill (default: 50)
# DA_WORKER_CONCURRENCY=50
17 changes: 11 additions & 6 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,21 @@ For large tables (transactions, addresses), use `pg_class.reltuples` instead of
### AppState (API)
```rust
pub struct AppState {
pub pool: PgPool, // API pool only
pub block_events_tx: broadcast::Sender<()>, // shared with indexer
pub pool: PgPool, // API pool only
pub block_events_tx: broadcast::Sender<()>, // shared with indexer
pub da_events_tx: broadcast::Sender<Vec<i64>>, // shared with DA worker
pub rpc_url: String,
pub solc_path: String,
pub admin_api_key: Option<String>,
pub evnode_url: Option<String>, // DA feature flag
}
```

### DA tracking (optional)
When `EVNODE_URL` is set, a background DA worker queries ev-node for Celestia inclusion heights per block. Updates are pushed to SSE clients via an in-process `broadcast::Sender<Vec<i64>>` (block numbers that were updated). The SSE handler fetches DA status from the database and streams `da_batch` events.

### Frontend API client
- Base URL: `/api` (proxied by nginx to `atlas-server:3000`)
- `GET /api/status` → `{ block_height, indexed_at }` — single key-value lookup from `indexer_state`, sub-ms. Used by the navbar as a polling fallback when SSE is disconnected.
- `GET /api/events` → SSE stream of `new_block` events, one per block in order. Primary live-update path for navbar counter and blocks page. Falls back to `/api/status` polling on disconnect.
- `GET /api/status` → `{ block_height, indexed_at, features: { da_tracking } }` — single key-value lookup from `indexer_state`, sub-ms. Used by the navbar as a polling fallback when SSE is disconnected.
- `GET /api/events` → SSE stream of `new_block` and `da_batch` events. Primary live-update path for navbar counter, blocks page, and DA status. Falls back to `/api/status` polling on disconnect.

## Important Conventions

Expand All @@ -115,6 +118,8 @@ Key vars (see `.env.example` for full list):
| `ADMIN_API_KEY` | API | none |
| `API_HOST` | API | `127.0.0.1` |
| `API_PORT` | API | `3000` |
| `EVNODE_URL` | server | none (DA tracking disabled) |
| `DA_WORKER_CONCURRENCY` | DA worker | `50` |

## Running Locally

Expand Down
12 changes: 12 additions & 0 deletions backend/crates/atlas-common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ pub struct Block {
pub indexed_at: DateTime<Utc>,
}

/// DA (Data Availability) status for a block on L2 chains using Celestia.
/// Only populated when EVNODE_URL is configured and the DA worker has checked the block.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct BlockDaStatus {
pub block_number: i64,
/// Celestia height where the block header was submitted. 0 = pending.
pub header_da_height: i64,
/// Celestia height where the block data was submitted. 0 = pending.
pub data_da_height: i64,
pub updated_at: DateTime<Utc>,
}

/// Transaction data as stored in the database
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct Transaction {
Expand Down
58 changes: 50 additions & 8 deletions backend/crates/atlas-server/src/api/handlers/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,27 @@ use axum::{
extract::{Path, Query, State},
Json,
};
use serde::Serialize;
use std::sync::Arc;

use crate::api::error::ApiResult;
use crate::api::AppState;
use atlas_common::{AtlasError, Block, PaginatedResponse, Pagination, Transaction};
use atlas_common::{AtlasError, Block, BlockDaStatus, PaginatedResponse, Pagination, Transaction};

/// Block response with optional DA status.
/// DA fields are always present in the JSON (null when no data),
/// so the frontend can rely on a stable schema.
#[derive(Serialize)]
pub struct BlockResponse {
#[serde(flatten)]
pub block: Block,
pub da_status: Option<BlockDaStatus>,
}

pub async fn list_blocks(
State(state): State<Arc<AppState>>,
Query(pagination): Query<Pagination>,
) -> ApiResult<Json<PaginatedResponse<Block>>> {
) -> ApiResult<Json<PaginatedResponse<BlockResponse>>> {
// Use MAX(number) + 1 instead of COUNT(*) - blocks are sequential so this is accurate
// This is ~6500x faster than COUNT(*) on large tables
let total: (Option<i64>,) = sqlx::query_as("SELECT MAX(number) + 1 FROM blocks")
Expand All @@ -30,15 +41,37 @@ pub async fn list_blocks(
FROM blocks
WHERE number <= $2
ORDER BY number DESC
LIMIT $1"
LIMIT $1",
)
.bind(limit)
.bind(cursor)
.fetch_all(&state.pool)
.await?;

// Batch-fetch DA status for all blocks in this page
let block_numbers: Vec<i64> = blocks.iter().map(|b| b.number).collect();
let da_rows: Vec<BlockDaStatus> = sqlx::query_as(
"SELECT block_number, header_da_height, data_da_height, updated_at
FROM block_da_status
WHERE block_number = ANY($1)",
)
.bind(&block_numbers)
.fetch_all(&state.pool)
.await?;

let da_map: std::collections::HashMap<i64, BlockDaStatus> =
da_rows.into_iter().map(|d| (d.block_number, d)).collect();

let responses: Vec<BlockResponse> = blocks
.into_iter()
.map(|block| {
let da_status = da_map.get(&block.number).cloned();
BlockResponse { block, da_status }
})
.collect();

Ok(Json(PaginatedResponse::new(
blocks,
responses,
pagination.page,
pagination.limit,
total_count,
Expand All @@ -48,18 +81,27 @@ pub async fn list_blocks(
pub async fn get_block(
State(state): State<Arc<AppState>>,
Path(number): Path<i64>,
) -> ApiResult<Json<Block>> {
) -> ApiResult<Json<BlockResponse>> {
let block: Block = sqlx::query_as(
"SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
FROM blocks
WHERE number = $1"
WHERE number = $1",
)
.bind(number)
.fetch_optional(&state.pool)
.await?
.ok_or_else(|| AtlasError::NotFound(format!("Block {} not found", number)))?;

Ok(Json(block))
let da_status: Option<BlockDaStatus> = sqlx::query_as(
"SELECT block_number, header_da_height, data_da_height, updated_at
FROM block_da_status
WHERE block_number = $1",
)
.bind(number)
.fetch_optional(&state.pool)
.await?;

Ok(Json(BlockResponse { block, da_status }))
}

pub async fn get_block_transactions(
Expand All @@ -77,7 +119,7 @@ pub async fn get_block_transactions(
FROM transactions
WHERE block_number = $1
ORDER BY block_index ASC
LIMIT $2 OFFSET $3"
LIMIT $2 OFFSET $3",
)
.bind(number)
.bind(pagination.limit())
Expand Down
Loading
Loading