From a81343ad46ba43802cde84b7b67b46f03d49228c Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Fri, 16 Jan 2026 10:54:27 -0800
Subject: [PATCH 1/9] graph, store: Replace std::sync::RwLock with
 parking_lot::RwLock for pool metrics

Use parking_lot::RwLock instead of std::sync::RwLock for connection pool
metric recording. parking_lot::RwLock is faster for short-held locks as it
uses efficient spinning before parking, reducing tokio worker thread
blocking during metric recording.

This change helps reduce tokio threadpool contention when the connection
pool is under heavy load, as the metric recording locks are held for only
microseconds.

Co-Authored-By: Claude Opus 4.5
---
 graph/src/components/store/mod.rs      | 4 +++-
 graph/src/data/graphql/load_manager.rs | 2 +-
 store/postgres/src/pool/manager.rs     | 4 ++--
 store/postgres/src/pool/mod.rs         | 6 +++---
 4 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs
index 77675967c25..c9f78cece5d 100644
--- a/graph/src/components/store/mod.rs
+++ b/graph/src/components/store/mod.rs
@@ -25,9 +25,11 @@ use std::collections::{BTreeMap, BTreeSet, HashSet};
 use std::fmt;
 use std::fmt::Display;
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::time::Duration;
 
+use crate::parking_lot::RwLock;
+
 use async_trait::async_trait;
 
 use crate::blockchain::{Block, BlockHash, BlockPtr};
diff --git a/graph/src/data/graphql/load_manager.rs b/graph/src/data/graphql/load_manager.rs
index b8bdb4a63d0..96fa23d7fa7 100644
--- a/graph/src/data/graphql/load_manager.rs
+++ b/graph/src/data/graphql/load_manager.rs
@@ -457,7 +457,7 @@ impl LoadManager {
     }
 
     fn overloaded(&self, wait_stats: &PoolWaitStats) -> (bool, Duration) {
-        let store_avg = wait_stats.read().unwrap().average();
+        let store_avg = wait_stats.read().average();
         let overloaded = store_avg
             .map(|average| average > ENV_VARS.load_threshold)
             .unwrap_or(false);
diff --git a/store/postgres/src/pool/manager.rs b/store/postgres/src/pool/manager.rs
index 4677ea6276b..2f5fe0fa00a 100644
--- a/store/postgres/src/pool/manager.rs
+++ b/store/postgres/src/pool/manager.rs
@@ -22,9 +22,10 @@ use std::collections::HashMap;
 use std::sync::atomic::AtomicBool;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
-use std::sync::RwLock;
 use std::time::{Duration, Instant};
 
+use graph::parking_lot::RwLock;
+
 use crate::pool::AsyncPool;
 
 /// Our own connection manager. It is pretty much the same as
@@ -308,7 +309,6 @@ impl WaitMeter {
     pub(crate) fn add_conn_wait_time(&self, duration: Duration) {
         self.wait_stats
             .write()
-            .unwrap()
             .add_and_register(duration, &self.wait_gauge);
     }
 }
diff --git a/store/postgres/src/pool/mod.rs b/store/postgres/src/pool/mod.rs
index 20d332616a2..cf176db64c2 100644
--- a/store/postgres/src/pool/mod.rs
+++ b/store/postgres/src/pool/mod.rs
@@ -19,11 +19,13 @@ use graph::slog::warn;
 use graph::util::timed_rw_lock::TimedMutex;
 use tokio::sync::OwnedSemaphorePermit;
 
+use std::collections::HashMap;
 use std::fmt::{self};
 use std::ops::{Deref, DerefMut};
 use std::sync::Arc;
 use std::time::Duration;
-use std::{collections::HashMap, sync::RwLock};
+
+use graph::parking_lot::RwLock;
 
 use crate::catalog;
 use crate::pool::manager::{ConnectionManager, WaitMeter};
@@ -720,7 +722,6 @@ impl PoolInner {
         let permit = self.query_semaphore.cheap_clone().acquire_owned().await;
         self.semaphore_wait_stats
             .write()
-            .unwrap()
             .add_and_register(start.elapsed(), &self.semaphore_wait_gauge);
         permit.unwrap()
     }
@@ -734,7 +735,6 @@ impl PoolInner {
         let elapsed = start.elapsed();
         self.indexing_semaphore_wait_stats
             .write()
-            .unwrap()
             .add_and_register(elapsed, &self.indexing_semaphore_wait_gauge);
         (permit.unwrap(), elapsed)
     }

From cdb894ac9c3bd40f32609be26e935a2ad3b3e68e Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Fri, 16 Jan 2026 12:10:49 -0800
Subject: [PATCH 2/9] graph: Replace std::sync::RwLock with parking_lot::RwLock
 in LoadManager

These locks are accessed on every GraphQL query, so using the faster
parking_lot::RwLock reduces lock contention in the query path.

Co-Authored-By: Claude Opus 4.5
---
 graph/src/data/graphql/load_manager.rs | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/graph/src/data/graphql/load_manager.rs b/graph/src/data/graphql/load_manager.rs
index 96fa23d7fa7..0abba9521fc 100644
--- a/graph/src/data/graphql/load_manager.rs
+++ b/graph/src/data/graphql/load_manager.rs
@@ -4,9 +4,11 @@ use prometheus::core::GenericCounter;
 use rand::{prelude::Rng, rng};
 use std::collections::{HashMap, HashSet};
 use std::iter::FromIterator;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::time::{Duration, Instant};
 
+use crate::parking_lot::RwLock;
+
 use crate::components::metrics::{Counter, GaugeVec, MetricsRegistry};
 use crate::components::store::{DeploymentId, PoolWaitStats};
 use crate::data::graphql::shape_hash::shape_hash;
@@ -57,7 +59,7 @@ impl ShardEffort {
     }
 
     pub fn add(&self, shard: &str, qref: QueryRef, duration: Duration, gauge: &GaugeVec) {
-        let mut inner = self.inner.write().unwrap();
+        let mut inner = self.inner.write();
         inner.add(qref, duration);
         gauge
             .with_label_values(&[shard])
@@ -70,7 +72,7 @@ impl ShardEffort {
     /// data for the particular query, return `None` as the effort
     /// for the query
     pub fn current_effort(&self, qref: &QueryRef) -> (Option<Duration>, Duration) {
-        let inner = self.inner.read().unwrap();
+        let inner = self.inner.read();
         let total_effort = inner.total.duration();
         let query_effort = inner.effort.get(qref).map(|stats| stats.duration());
         (query_effort, total_effort)
@@ -381,7 +383,7 @@ impl LoadManager {
 
         let qref = QueryRef::new(deployment, shape_hash);
 
-        if self.jailed_queries.read().unwrap().contains(&qref) {
+        if self.jailed_queries.read().contains(&qref) {
             return if ENV_VARS.load_simulate {
                 Proceed
             } else {
@@ -426,7 +428,7 @@ impl LoadManager {
                 "query_effort_ms" => query_effort,
                 "total_effort_ms" => total_effort,
                 "ratio" => format!("{:.4}", query_effort/total_effort));
-            self.jailed_queries.write().unwrap().insert(qref);
+            self.jailed_queries.write().insert(qref);
             return if ENV_VARS.load_simulate {
                 Proceed
             } else {
@@ -465,7 +467,7 @@ impl LoadManager {
     }
 
     fn kill_state(&self, shard: &str) -> (f64, Instant) {
-        let state = self.kill_state.get(shard).unwrap().read().unwrap();
+        let state = self.kill_state.get(shard).unwrap().read();
         (state.kill_rate, state.last_update)
     }
 
@@ -505,7 +507,7 @@ impl LoadManager {
             kill_rate = (kill_rate - KILL_RATE_STEP_DOWN).max(0.0);
         }
         let event = {
-            let mut state = self.kill_state.get(shard).unwrap().write().unwrap();
+            let mut state = self.kill_state.get(shard).unwrap().write();
             state.kill_rate = kill_rate;
             state.last_update = now;
             state.log_event(now, kill_rate, overloaded)

From 6ba8f82b05a55de7b3f2ef9802b9f876bffcb6f2 Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Fri, 16 Jan 2026 12:19:13 -0800
Subject: [PATCH 3/9] store: Replace std::sync::RwLock with parking_lot::RwLock
 in store_events

Replace std::sync::RwLock with parking_lot::RwLock in the
SubscriptionManager to reduce lock contention. parking_lot's RwLock is
faster for short-held locks due to efficient spinning before parking,
which helps reduce tokio threadpool contention.

Co-Authored-By: Claude Opus 4.5
---
 store/postgres/src/store_events.rs | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/store/postgres/src/store_events.rs b/store/postgres/src/store_events.rs
index 6189120f602..572c3a339a3 100644
--- a/store/postgres/src/store_events.rs
+++ b/store/postgres/src/store_events.rs
@@ -2,7 +2,9 @@ use graph::futures01::Stream;
 use graph::futures03::compat::Stream01CompatExt;
 use graph::futures03::stream::StreamExt;
 use graph::futures03::TryStreamExt;
-use std::sync::{atomic::Ordering, Arc, RwLock};
+use std::sync::{atomic::Ordering, Arc};
+
+use graph::parking_lot::RwLock;
 use std::{collections::HashMap, sync::atomic::AtomicUsize};
 use tokio::sync::mpsc::{channel, Sender};
 use tokio_stream::wrappers::ReceiverStream;
@@ -127,7 +129,7 @@ impl SubscriptionManager {
 
             // Send to `subscriptions`.
             {
-                let senders = subscriptions.read().unwrap().clone();
+                let senders = subscriptions.read().clone();
 
                 // Write change to all matching subscription streams; remove subscriptions
                 // whose receiving end has been dropped
@@ -138,7 +140,7 @@ impl SubscriptionManager {
                             "Failed to send store event to subscriber {}: {}", id, e
                         );
                         // Receiver was dropped
-                        subscriptions.write().unwrap().remove(&id);
+                        subscriptions.write().remove(&id);
                     }
                 }
             }
@@ -187,7 +189,7 @@ impl SubscriptionManager {
 
             // Cleanup `subscriptions`.
             {
-                let mut subscriptions = subscriptions.write().unwrap();
+                let mut subscriptions = subscriptions.write();
 
                 // Obtain IDs of subscriptions whose receiving end has gone
                 let stale_ids = subscriptions
@@ -218,7 +220,7 @@ impl SubscriptionManagerTrait for SubscriptionManager {
         let (sender, receiver) = channel(100);
 
         // Add the new subscription
-        self.subscriptions.write().unwrap().insert(id, sender);
+        self.subscriptions.write().insert(id, sender);
 
         // Return the subscription ID and entity change stream
         ReceiverStream::new(receiver)

From 5db5ff871bc7b10fbf35d6f448254df5aaf60f45 Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Fri, 16 Jan 2026 12:22:25 -0800
Subject: [PATCH 4/9] store: Replace std::sync::RwLock with parking_lot::RwLock
 in writable

Replace std::sync::RwLock with parking_lot::RwLock in the background
writer's Request::Write batch handling.
This reduces lock contention as parking_lot's RwLock is faster for
short-held locks due to efficient spinning before parking.

Co-Authored-By: Claude Opus 4.5
---
 store/postgres/src/writable.rs | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs
index ff5ffb2d45b..928cbdbe76f 100644
--- a/store/postgres/src/writable.rs
+++ b/store/postgres/src/writable.rs
@@ -1,7 +1,9 @@
 use std::collections::BTreeSet;
 use std::ops::{Deref, Range};
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Mutex, RwLock, TryLockError as RwLockError};
+use std::sync::Mutex;
+
+use graph::parking_lot::RwLock;
 use std::time::Instant;
 use std::{collections::BTreeMap, sync::Arc};
@@ -574,7 +576,7 @@ impl BlockTracker {
         // processed.
         let res = queue.find_map(|req| match req.as_ref() {
             Request::Write { batch, .. } => {
-                let batch = batch.read().unwrap();
+                let batch = batch.read();
                 tracker.write(&batch.block_ptr);
                 if batch.first_block <= tracker.revert {
                     let res = f(batch.deref(), tracker.revert);
@@ -613,7 +615,7 @@ impl BlockTracker {
         let accum = queue.fold(init, |accum, req| {
             match req.as_ref() {
                 Request::Write { batch, .. } => {
-                    let batch = batch.read().unwrap();
+                    let batch = batch.read();
                     let mut accum = accum;
                     tracker.write(&batch.block_ptr);
                     if batch.first_block <= tracker.revert {
@@ -740,7 +742,7 @@ impl std::fmt::Debug for Request {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Self::Write { batch, store, .. } => {
                let batch = batch.read();
                 write!(
                     f,
                     "write[{}, {:p}, {} entities]",
@@ -811,7 +813,7 @@ impl Request {
             } => {
                 let start = Instant::now();
 
-                let batch = batch.write().unwrap().close();
+                let batch = batch.write().close();
 
                 if let Some(err) = &batch.error {
                     // This can happen when appending to the batch failed
@@ -850,7 +852,7 @@ impl Request {
     fn should_process(&self) -> bool {
         match self {
             Request::Write { queued, batch, .. } => {
-                batch.read().unwrap().weight() >= ENV_VARS.store.write_batch_size
+                batch.read().weight() >= ENV_VARS.store.write_batch_size
                     || queued.elapsed() >= ENV_VARS.store.write_batch_duration
             }
             Request::RevertTo { .. } | Request::Stop => true,
@@ -1169,7 +1171,7 @@ impl Queue {
                     // duration of the write, and we do not want to
                     // slow down queueing requests unnecessarily
                     match existing.try_write() {
-                        Ok(mut existing) => {
+                        Some(mut existing) => {
                             if existing.weight() < ENV_VARS.store.write_batch_size {
                                 let res = existing.append(batch).map(|()| None);
                                 if existing.weight() >= ENV_VARS.store.write_batch_size {
@@ -1180,16 +1182,13 @@ impl Queue {
                                 Ok(Some(batch))
                             }
                         }
-                        Err(RwLockError::WouldBlock) => {
+                        None => {
                             // This branch can cause batches that
                             // are not 'full' at the head of the
                             // queue, something that start_writer
                             // has to take into account
                             Ok(Some(batch))
                         }
-                        Err(RwLockError::Poisoned(e)) => {
-                            panic!("rwlock on batch was poisoned {:?}", e);
-                        }
                     }
                 } else {
                     Ok(Some(batch))

From 6469e4bf0e36665794a8ce90a615a0d95a539eb3 Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Fri, 16 Jan 2026 12:24:18 -0800
Subject: [PATCH 5/9] graph: Replace std::sync::RwLock with parking_lot::RwLock
 in TimedCache

Replace std::sync::RwLock with parking_lot::RwLock in TimedCache for
faster lock acquisition on cache gets and sets. parking_lot's RwLock
uses efficient spinning before parking, reducing contention.
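
To make the API change used throughout this series concrete, here is a
minimal standalone sketch (illustrative only, not code from this
repository; it assumes parking_lot is available as a dependency):

    use parking_lot::RwLock;

    fn main() {
        let stats = RwLock::new(Vec::<u64>::new());

        // std::sync::RwLock returns Result<Guard, PoisonError> and needs
        // `.unwrap()`; parking_lot's lock methods cannot fail, so the
        // guard is returned directly.
        stats.write().push(1);

        // try_write returns Option<Guard> instead of
        // Result<Guard, TryLockError>, which is why the writable.rs change
        // earlier in this series matches on Some/None rather than Ok/Err.
        if let Some(mut guard) = stats.try_write() {
            guard.push(2);
        }
        assert_eq!(*stats.read(), vec![1, 2]);
    }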
Co-Authored-By: Claude Opus 4.5
---
 graph/src/util/timed_cache.rs | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/graph/src/util/timed_cache.rs b/graph/src/util/timed_cache.rs
index 8f64c844630..587a49a176a 100644
--- a/graph/src/util/timed_cache.rs
+++ b/graph/src/util/timed_cache.rs
@@ -3,10 +3,12 @@ use std::{
     cmp::Eq,
     collections::HashMap,
     hash::Hash,
-    sync::{Arc, RwLock},
+    sync::Arc,
     time::{Duration, Instant},
 };
 
+use crate::parking_lot::RwLock;
+
 /// Caching of values for a specified amount of time
 #[derive(Debug)]
 struct CacheEntry<T> {
@@ -49,7 +51,7 @@ impl<K, V> TimedCache<K, V> {
         K: Borrow<Q> + Eq + Hash,
         Q: Hash + Eq + ?Sized,
     {
-        match self.entries.read().unwrap().get(key) {
+        match self.entries.read().get(key) {
             Some(CacheEntry { value, expires }) if expires >= &now => Some(value.clone()),
             _ => None,
         }
@@ -72,11 +74,11 @@ impl<K, V> TimedCache<K, V> {
             value,
             expires: now + self.ttl,
         };
-        self.entries.write().unwrap().insert(key, entry);
+        self.entries.write().insert(key, entry);
     }
 
     pub fn clear(&self) {
-        self.entries.write().unwrap().clear();
+        self.entries.write().clear();
     }
 
     pub fn find<F>(&self, pred: F) -> Option<Arc<V>>
     where
         F: Fn(&V) -> bool,
     {
         self.entries
             .read()
-            .unwrap()
             .values()
             .find(move |entry| pred(entry.value.as_ref()))
             .map(|entry| entry.value.clone())
@@ -101,7 +102,6 @@ impl<K, V> TimedCache<K, V> {
     {
         self.entries
             .write()
-            .unwrap()
             .remove(key)
             .map(|CacheEntry { value, expires }| (value, expires >= Instant::now()))
     }

From 6e2009b5b982368d0ecaaba337fc8691b17c89bb Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Fri, 16 Jan 2026 12:28:01 -0800
Subject: [PATCH 6/9] store: Replace std::sync::RwLock with parking_lot::RwLock
 in BlockStore

Replace std::sync::RwLock with parking_lot::RwLock for the chain stores
map in BlockStore. This reduces lock contention when looking up or
modifying chain stores.
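
The lookup path in BlockStore::store below follows a read-then-write,
get-or-create idiom. A minimal standalone sketch of the pattern
(illustrative only; `get_or_create` and `build` are hypothetical names, and
the String value stands in for Arc<ChainStore>):

    use parking_lot::RwLock;
    use std::collections::HashMap;
    use std::sync::Arc;

    fn get_or_create(
        stores: &RwLock<HashMap<String, Arc<String>>>,
        chain: &str,
        build: impl FnOnce() -> Arc<String>,
    ) -> Arc<String> {
        // Fast path: only a shared read lock is taken.
        if let Some(store) = stores.read().get(chain) {
            return store.clone();
        }
        // Slow path: build outside any lock, then take the write lock to
        // insert. Two racing callers may both build; the later insert wins,
        // which is acceptable when the built values are equivalent.
        let store = build();
        stores.write().insert(chain.to_string(), store.clone());
        store
    }

    fn main() {
        let stores = RwLock::new(HashMap::new());
        let s = get_or_create(&stores, "mainnet", || Arc::new("store".to_string()));
        assert_eq!(s.as_str(), "store");
    }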
Co-Authored-By: Claude Opus 4.5
---
 store/postgres/src/block_store.rs | 20 +++++---------------
 1 file changed, 5 insertions(+), 15 deletions(-)

diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs
index 674c274ac5c..48ea768c033 100644
--- a/store/postgres/src/block_store.rs
+++ b/store/postgres/src/block_store.rs
@@ -1,8 +1,6 @@
-use std::{
-    collections::HashMap,
-    sync::{Arc, RwLock},
-    time::Duration,
-};
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use graph::parking_lot::RwLock;
 
 use anyhow::anyhow;
 use async_trait::async_trait;
@@ -321,7 +319,6 @@
         let configured_chains = block_store
             .stores
             .read()
-            .unwrap()
             .keys()
             .cloned()
             .collect::<Vec<_>>();
@@ -410,7 +407,6 @@ impl BlockStore {
         let store = Arc::new(store);
         self.stores
             .write()
-            .unwrap()
             .insert(chain.name.clone(), store.clone());
         Ok(store)
     }
@@ -475,12 +471,7 @@ impl BlockStore {
     }
 
     async fn store(&self, chain: &str) -> Option<Arc<ChainStore>> {
-        let store = self
-            .stores
-            .read()
-            .unwrap()
-            .get(chain)
-            .map(CheapClone::cheap_clone);
+        let store = self.stores.read().get(chain).map(CheapClone::cheap_clone);
         if store.is_some() {
             return store;
         }
@@ -506,7 +497,7 @@ impl BlockStore {
 
         chain_store.drop_chain().await?;
 
-        self.stores.write().unwrap().remove(chain);
+        self.stores.write().remove(chain);
 
         Ok(())
     }
@@ -516,7 +507,6 @@ impl BlockStore {
     fn stores(&self) -> Vec<Arc<ChainStore>> {
         self.stores
             .read()
-            .unwrap()
             .values()
             .map(CheapClone::cheap_clone)
             .collect()

From 0a8b92ea3ca72e3c56833df0c28c924a4a02a2c3 Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Fri, 16 Jan 2026 12:30:30 -0800
Subject: [PATCH 7/9] graph: Replace std::sync::RwLock with parking_lot::RwLock
 in MetricsRegistry

Replace std::sync::RwLock with parking_lot::RwLock for the global
metrics caches in MetricsRegistry. This reduces lock contention when
registering or looking up global metrics.
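
One behavioral difference worth noting: std::sync::RwLock poisons itself
when a thread panics while holding the guard, and every later `.unwrap()`
then panics too; parking_lot has no poisoning, the lock is simply released
on unwind. A standalone sketch (illustrative only):

    use parking_lot::RwLock;
    use std::panic::{catch_unwind, AssertUnwindSafe};

    fn main() {
        let lock = RwLock::new(0u32);

        // Panic while the write guard is held; the guard is dropped during
        // unwinding and the lock is left in a normal, unlocked state.
        let _ = catch_unwind(AssertUnwindSafe(|| {
            let _guard = lock.write();
            panic!("registration failed");
        }));

        // With std::sync::RwLock, read() would now return a PoisonError and
        // the old `.unwrap()` calls would panic; here it just succeeds.
        assert_eq!(*lock.read(), 0);
    }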
Co-Authored-By: Claude Opus 4.5
---
 graph/src/components/metrics/registry.rs | 31 +++++++++---------------
 1 file changed, 11 insertions(+), 20 deletions(-)

diff --git a/graph/src/components/metrics/registry.rs b/graph/src/components/metrics/registry.rs
index cb210040952..4777ea6f62f 100644
--- a/graph/src/components/metrics/registry.rs
+++ b/graph/src/components/metrics/registry.rs
@@ -1,5 +1,7 @@
 use std::collections::HashMap;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
+
+use crate::parking_lot::RwLock;
 
 use prometheus::{labels, Histogram, IntCounterVec};
 use prometheus::{IntCounter, IntGauge};
@@ -109,14 +111,13 @@ impl MetricsRegistry {
         };
         let counters = CounterVec::new(opts, variable_labels)?;
         let id = counters.desc().first().unwrap().id;
-        let maybe_counter = self.global_counter_vecs.read().unwrap().get(&id).cloned();
+        let maybe_counter = self.global_counter_vecs.read().get(&id).cloned();
         if let Some(counters) = maybe_counter {
             Ok(counters)
         } else {
             self.register(name, Box::new(counters.clone()));
             self.global_counter_vecs
                 .write()
-                .unwrap()
                 .insert(id, counters.clone());
             Ok(counters)
         }
@@ -161,15 +162,12 @@ impl MetricsRegistry {
     ) -> Result<Counter, PrometheusError> {
         let counter = counter_with_labels(name, help, const_labels)?;
         let id = counter.desc().first().unwrap().id;
-        let maybe_counter = self.global_counters.read().unwrap().get(&id).cloned();
+        let maybe_counter = self.global_counters.read().get(&id).cloned();
         if let Some(counter) = maybe_counter {
             Ok(counter)
         } else {
             self.register(name, Box::new(counter.clone()));
-            self.global_counters
-                .write()
-                .unwrap()
-                .insert(id, counter.clone());
+            self.global_counters.write().insert(id, counter.clone());
             Ok(counter)
         }
     }
@@ -210,15 +208,12 @@ impl MetricsRegistry {
     ) -> Result<Gauge, PrometheusError> {
         let gauge = gauge_with_labels(name, help, const_labels)?;
         let id = gauge.desc().first().unwrap().id;
-        let maybe_gauge = self.global_gauges.read().unwrap().get(&id).cloned();
+        let maybe_gauge = self.global_gauges.read().get(&id).cloned();
         if let Some(gauge) = maybe_gauge {
             Ok(gauge)
         } else {
             self.register(name, Box::new(gauge.clone()));
-            self.global_gauges
-                .write()
-                .unwrap()
-                .insert(id, gauge.clone());
+            self.global_gauges.write().insert(id, gauge.clone());
             Ok(gauge)
         }
     }
@@ -232,15 +227,12 @@ impl MetricsRegistry {
     ) -> Result<GaugeVec, PrometheusError> {
         let opts = Opts::new(name, help);
         let gauges = GaugeVec::new(opts, variable_labels)?;
         let id = gauges.desc().first().unwrap().id;
-        let maybe_gauge = self.global_gauge_vecs.read().unwrap().get(&id).cloned();
+        let maybe_gauge = self.global_gauge_vecs.read().get(&id).cloned();
         if let Some(gauges) = maybe_gauge {
             Ok(gauges)
         } else {
             self.register(name, Box::new(gauges.clone()));
-            self.global_gauge_vecs
-                .write()
-                .unwrap()
-                .insert(id, gauges.clone());
+            self.global_gauge_vecs.write().insert(id, gauges.clone());
             Ok(gauges)
         }
     }
@@ -254,14 +246,13 @@ impl MetricsRegistry {
     ) -> Result<HistogramVec, PrometheusError> {
         let opts = HistogramOpts::new(name, help);
         let histograms = HistogramVec::new(opts, variable_labels)?;
         let id = histograms.desc().first().unwrap().id;
-        let maybe_histogram = self.global_histogram_vecs.read().unwrap().get(&id).cloned();
+        let maybe_histogram = self.global_histogram_vecs.read().get(&id).cloned();
         if let Some(histograms) = maybe_histogram {
             Ok(histograms)
         } else {
             self.register(name, Box::new(histograms.clone()));
             self.global_histogram_vecs
                 .write()
-                .unwrap()
                 .insert(id, histograms.clone());
             Ok(histograms)
         }

From dfd627bd0bb7e7d802f58b567692a53d40e2a253 Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Fri, 16 Jan 2026 12:32:53 -0800
Subject: [PATCH 8/9] core: Replace std::sync::RwLock with parking_lot::RwLock
 in SubgraphKeepAlive

Replace std::sync::RwLock with parking_lot::RwLock for the alive_map in
SubgraphKeepAlive. This reduces lock contention when tracking running
subgraph deployments.

Co-Authored-By: Claude Opus 4.5
---
 core/src/subgraph/context/mod.rs | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/subgraph/context/mod.rs b/core/src/subgraph/context/mod.rs
index e6f485e2552..2b7d560dfc1 100644
--- a/core/src/subgraph/context/mod.rs
+++ b/core/src/subgraph/context/mod.rs
@@ -23,7 +23,9 @@ use graph::{
     slog::Logger,
 };
 use std::collections::HashMap;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
+
+use graph::parking_lot::RwLock;
 use tokio::sync::mpsc;
 
 use self::instance::SubgraphInstance;
@@ -44,18 +46,18 @@ impl SubgraphKeepAlive {
     }
 
     pub fn remove(&self, deployment_id: &DeploymentId) {
-        self.alive_map.write().unwrap().remove(deployment_id);
+        self.alive_map.write().remove(deployment_id);
         self.sg_metrics.running_count.dec();
     }
 
     pub fn insert(&self, deployment_id: DeploymentId, guard: CancelGuard) {
-        let old = self.alive_map.write().unwrap().insert(deployment_id, guard);
+        let old = self.alive_map.write().insert(deployment_id, guard);
         if old.is_none() {
             self.sg_metrics.running_count.inc();
         }
     }
 
     pub fn contains(&self, deployment_id: &DeploymentId) -> bool {
-        self.alive_map.read().unwrap().contains_key(deployment_id)
+        self.alive_map.read().contains_key(deployment_id)
     }
 }

From 7ecdbda0fd4017cf584a539c689ecafbfb2274c6 Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Fri, 16 Jan 2026 14:17:52 -0800
Subject: [PATCH 9/9] store: Add adaptive TTL cache for chain_head_ptr()

With many subgraphs, chain_head_ptr() was querying the database on every
call, leading to connection pool saturation. This adds an adaptive cache
that learns an optimal TTL from the observed block frequency.

The cache uses an EWMA to estimate the block time and sets the TTL to 1/4
of that estimate (bounded by 20ms-2000ms). During warmup (the first 5
blocks), it uses the minimum TTL to avoid missing blocks on unknown
chains.

New metrics:
- chain_head_ptr_cache_hits: cache hit counter
- chain_head_ptr_cache_misses: cache miss counter (DB queries)
- chain_head_ptr_cache_block_time_ms: estimated block time per chain

Safety escape hatch: set GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE=true
to revert to the previous uncached behavior.

Co-Authored-By: Claude Opus 4.5
---
 graph/src/env/store.rs            |   6 +
 store/postgres/src/chain_store.rs | 191 +++++++++++++++++++++++++++++-
 2 files changed, 194 insertions(+), 3 deletions(-)

diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs
index 7a3c16d5b81..23294ad0a77 100644
--- a/graph/src/env/store.rs
+++ b/graph/src/env/store.rs
@@ -163,6 +163,9 @@ pub struct EnvVarsStore {
     /// Disables storing or reading `eth_call` results from the store call cache.
     /// Set by `GRAPH_STORE_DISABLE_CALL_CACHE`. Defaults to false.
     pub disable_call_cache: bool,
+    /// Set by `GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE`. Defaults to false.
+    /// Set to true to disable chain_head_ptr caching (safety escape hatch).
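+    /// When this is set, `ChainHeadPtrCache::get` and `set` below become
+    /// no-ops and every `chain_head_ptr()` call goes to the database, as it
+    /// did before this change.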
+    pub disable_chain_head_ptr_cache: bool,
 }
 
 // This does not print any values to avoid accidentally leaking any sensitive env vars
@@ -224,6 +227,7 @@ impl TryFrom<InnerStore> for EnvVarsStore {
             account_like_min_versions_count: x.account_like_min_versions_count,
             account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0),
             disable_call_cache: x.disable_call_cache,
+            disable_chain_head_ptr_cache: x.disable_chain_head_ptr_cache,
         };
         if let Some(timeout) = vars.batch_timeout {
             if timeout < 2 * vars.batch_target_duration {
@@ -331,6 +335,8 @@ pub struct InnerStore {
     account_like_max_unique_ratio: Option,
     #[envconfig(from = "GRAPH_STORE_DISABLE_CALL_CACHE", default = "false")]
     disable_call_cache: bool,
+    #[envconfig(from = "GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE", default = "false")]
+    disable_chain_head_ptr_cache: bool,
 }
 
 #[derive(Clone, Copy, Debug)]
diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs
index cc3b6949fa8..556c6b80d2d 100644
--- a/store/postgres/src/chain_store.rs
+++ b/store/postgres/src/chain_store.rs
@@ -17,6 +17,8 @@ use graph::util::herd_cache::HerdCache;
 
 use std::collections::BTreeMap;
 use std::future::Future;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, Instant};
 use std::{
     collections::HashMap,
     convert::{TryFrom, TryInto},
@@ -1873,6 +1875,10 @@ pub struct ChainStoreMetrics {
     chain_head_cache_latest_block_num: Box<GaugeVec>,
     chain_head_cache_hits: Box<CounterVec>,
     chain_head_cache_misses: Box<CounterVec>,
+    // Metrics for chain_head_ptr() cache
+    chain_head_ptr_cache_hits: Box<CounterVec>,
+    chain_head_ptr_cache_misses: Box<CounterVec>,
+    chain_head_ptr_cache_block_time_ms: Box<GaugeVec>,
 }
 
 impl ChainStoreMetrics {
@@ -1914,12 +1920,37 @@ impl ChainStoreMetrics {
             )
             .expect("Can't register the counter");
 
+        let chain_head_ptr_cache_hits = registry
+            .new_counter_vec(
+                "chain_head_ptr_cache_hits",
+                "Number of times the chain_head_ptr cache was hit",
+                vec!["network".to_string()],
+            )
+            .expect("Can't register the counter");
+        let chain_head_ptr_cache_misses = registry
+            .new_counter_vec(
+                "chain_head_ptr_cache_misses",
+                "Number of times the chain_head_ptr cache was missed",
+                vec!["network".to_string()],
+            )
+            .expect("Can't register the counter");
+        let chain_head_ptr_cache_block_time_ms = registry
+            .new_gauge_vec(
+                "chain_head_ptr_cache_block_time_ms",
+                "Estimated block time in milliseconds used for adaptive cache TTL",
+                vec!["network".to_string()],
+            )
+            .expect("Can't register the gauge");
+
         Self {
             chain_head_cache_size,
             chain_head_cache_oldest_block_num,
             chain_head_cache_latest_block_num,
             chain_head_cache_hits,
             chain_head_cache_misses,
+            chain_head_ptr_cache_hits,
+            chain_head_ptr_cache_misses,
+            chain_head_ptr_cache_block_time_ms,
         }
     }
@@ -1959,6 +1990,143 @@ impl ChainStoreMetrics {
             .unwrap()
             .inc_by(misses as f64);
     }
+
+    pub fn record_chain_head_ptr_cache_hit(&self, network: &str) {
+        self.chain_head_ptr_cache_hits
+            .with_label_values(&[network])
+            .inc();
+    }
+
+    pub fn record_chain_head_ptr_cache_miss(&self, network: &str) {
+        self.chain_head_ptr_cache_misses
+            .with_label_values(&[network])
+            .inc();
+    }
+
+    pub fn set_chain_head_ptr_block_time(&self, network: &str, block_time_ms: u64) {
+        self.chain_head_ptr_cache_block_time_ms
+            .with_label_values(&[network])
+            .set(block_time_ms as f64);
+    }
+}
+
+const MIN_TTL_MS: u64 = 20;
+const MAX_TTL_MS: u64 = 2000;
+const MIN_OBSERVATIONS: u64 = 5;
+
+/// Adaptive cache for chain_head_ptr() that learns the optimal TTL from block frequency.
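+///
+/// The TTL is derived from an EWMA of the observed inter-block times: once
+/// MIN_OBSERVATIONS block changes have been seen, the TTL is a quarter of
+/// the estimated block time, clamped to [MIN_TTL_MS, MAX_TTL_MS]; during
+/// warmup the minimum TTL is used so new chains are not over-cached.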
+struct ChainHeadPtrCache {
+    /// Cached value and when it expires
+    entry: RwLock<Option<(BlockPtr, Instant)>>,
+    /// Estimated milliseconds between blocks (EWMA)
+    estimated_block_time_ms: AtomicU64,
+    /// When we last observed the chain head change
+    last_change: RwLock<Instant>,
+    /// Number of block changes observed (for warmup)
+    observations: AtomicU64,
+    /// Metrics for recording cache hits/misses
+    metrics: Arc<ChainStoreMetrics>,
+    /// Chain name for metric labels
+    chain: String,
+}
+
+impl ChainHeadPtrCache {
+    fn new(metrics: Arc<ChainStoreMetrics>, chain: String) -> Self {
+        Self {
+            entry: RwLock::new(None),
+            estimated_block_time_ms: AtomicU64::new(0),
+            last_change: RwLock::new(Instant::now()),
+            observations: AtomicU64::new(0),
+            metrics,
+            chain,
+        }
+    }
+
+    /// Returns the cached value if it is still valid, or None if the cache
+    /// is disabled or missed. Records hit/miss metrics automatically.
+    fn get(&self) -> Option<BlockPtr> {
+        if ENV_VARS.store.disable_chain_head_ptr_cache {
+            return None;
+        }
+        let guard = self.entry.read();
+        if let Some((value, expires)) = guard.as_ref() {
+            if Instant::now() < *expires {
+                self.metrics.record_chain_head_ptr_cache_hit(&self.chain);
+                return Some(value.clone());
+            }
+        }
+        self.metrics.record_chain_head_ptr_cache_miss(&self.chain);
+        None
+    }
+
+    /// Compute the current TTL: MIN_TTL_MS during warmup, then 1/4 of the
+    /// estimated block time
+    fn current_ttl(&self) -> Duration {
+        let obs = self.observations.load(Ordering::Relaxed);
+        if obs < MIN_OBSERVATIONS {
+            return Duration::from_millis(MIN_TTL_MS);
+        }
+
+        let block_time = self.estimated_block_time_ms.load(Ordering::Relaxed);
+        let ttl_ms = (block_time / 4).clamp(MIN_TTL_MS, MAX_TTL_MS);
+        Duration::from_millis(ttl_ms)
+    }
+
+    /// Cache a new value, updating the block time estimate if the value
+    /// changed. Does nothing if the cache is disabled.
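+    /// The estimate is an integer EWMA, `new = (4 * old + observed) / 5`;
+    /// deltas of 60s or more are discarded as idle gaps rather than block
+    /// time, and blocks whose number does not advance (reorgs) are skipped.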
+    fn set(&self, new_value: BlockPtr) {
+        if ENV_VARS.store.disable_chain_head_ptr_cache {
+            return;
+        }
+        let now = Instant::now();
+
+        // Check if the block changed
+        let old_value = {
+            let guard = self.entry.read();
+            guard.as_ref().map(|(v, _)| v.clone())
+        };
+
+        // Only update the estimate if we have a previous value and the block
+        // number advanced (skip reorgs where the new block number <= old)
+        if let Some(old_ptr) = old_value.as_ref() {
+            if new_value.number > old_ptr.number {
+                let mut last_change = self.last_change.write();
+                let delta_ms = now.duration_since(*last_change).as_millis() as u64;
+                *last_change = now;
+
+                let blocks_advanced = (new_value.number - old_ptr.number) as u64;
+
+                // Increment the observation count
+                let obs = self.observations.fetch_add(1, Ordering::Relaxed);
+
+                // Ignore unreasonable deltas (> 60s)
+                if delta_ms > 0 && delta_ms < 60_000 {
+                    let per_block_ms = delta_ms / blocks_advanced;
+                    let new_estimate = if obs == 0 {
+                        // First observation - use it as the initial estimate
+                        per_block_ms
+                    } else {
+                        // EWMA: new = 0.8 * old + 0.2 * observed
+                        let old_estimate =
+                            self.estimated_block_time_ms.load(Ordering::Relaxed);
+                        (old_estimate * 4 + per_block_ms) / 5
+                    };
+                    self.estimated_block_time_ms
+                        .store(new_estimate, Ordering::Relaxed);
+
+                    // Update the metric gauge
+                    self.metrics
+                        .set_chain_head_ptr_block_time(&self.chain, new_estimate);
+                }
+            }
+        }
+
+        // Compute the TTL and store the value with its expiry
+        let ttl = self.current_ttl();
+        *self.entry.write() = Some((new_value, now + ttl));
+    }
 }
 
 pub struct ChainStore {
@@ -1980,6 +2148,8 @@ pub struct ChainStore {
     blocks_by_number_cache: HerdCache<Arc<Result<Vec<Arc<JsonBlock>>, StoreError>>>,
     ancestor_cache: HerdCache<Arc<Result<Option<JsonBlock>, StoreError>>>,
+    /// Adaptive cache for chain_head_ptr()
+    chain_head_ptr_cache: ChainHeadPtrCache,
 }
 
 impl ChainStore {
@@ -1994,10 +2164,11 @@ impl ChainStore {
         metrics: Arc<ChainStoreMetrics>,
     ) -> Self {
         let recent_blocks_cache =
-            RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics);
+            RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics.clone());
         let blocks_by_hash_cache = HerdCache::new(format!("chain_{}_blocks_by_hash", chain));
         let blocks_by_number_cache = HerdCache::new(format!("chain_{}_blocks_by_number", chain));
         let ancestor_cache = HerdCache::new(format!("chain_{}_ancestor", chain));
+        let chain_head_ptr_cache = ChainHeadPtrCache::new(metrics, chain.clone());
         ChainStore {
             logger,
             pool,
@@ -2009,6 +2180,7 @@ impl ChainStore {
             blocks_by_hash_cache,
             blocks_by_number_cache,
             ancestor_cache,
+            chain_head_ptr_cache,
         }
     }
@@ -2351,8 +2523,14 @@ impl ChainHeadStore for ChainStore {
     async fn chain_head_ptr(self: Arc<Self>) -> Result<Option<BlockPtr>, Error> {
         use public::ethereum_networks::dsl::*;
 
+        // Check cache first (handles disabled check and metrics internally)
+        if let Some(cached) = self.chain_head_ptr_cache.get() {
+            return Ok(Some(cached));
+        }
+
+        // Query database
         let mut conn = self.pool.get_permitted().await?;
-        Ok(ethereum_networks
+        let result = ethereum_networks
             .select((head_block_hash, head_block_number))
             .filter(name.eq(&self.chain))
             .load::<(Option<String>, Option<i64>)>(&mut conn)
@@ -2375,7 +2553,14 @@ impl ChainHeadStore for ChainStore {
                 _ => unreachable!(),
             })
             .and_then(|opt: Option<BlockPtr>| opt)
-        })?)
+        })?;
+
+        // Cache the result (set() handles the disabled check internally)
+        if let Some(ref ptr) = result {
+            self.chain_head_ptr_cache.set(ptr.clone());
+        }
+
+        Ok(result)
     }
 
     async fn chain_head_cursor(&self) -> Result<Option<String>, Error> {
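
As a sanity check on the arithmetic in ChainHeadPtrCache, the following
standalone sketch replays the EWMA and clamping rules from the patch (the
constants are copied from above; `ewma` and `ttl_ms` are illustrative names,
not functions from this repository):

    const MIN_TTL_MS: u64 = 20;
    const MAX_TTL_MS: u64 = 2000;

    // Integer EWMA from the patch: new = 0.8 * old + 0.2 * observed.
    fn ewma(old_ms: u64, observed_ms: u64) -> u64 {
        (old_ms * 4 + observed_ms) / 5
    }

    // TTL rule from current_ttl(): a quarter of the block time, clamped.
    fn ttl_ms(block_time_ms: u64) -> u64 {
        (block_time_ms / 4).clamp(MIN_TTL_MS, MAX_TTL_MS)
    }

    fn main() {
        // An Ethereum-mainnet-like chain (~12s blocks): the estimate stays
        // at 12_000ms, and 12_000 / 4 = 3_000ms is clamped to the 2_000ms
        // ceiling, so the head is re-queried at most every 2s.
        let mut est = 12_000; // the first observation seeds the estimate
        for _ in 0..10 {
            est = ewma(est, 12_000);
        }
        assert_eq!(ttl_ms(est), MAX_TTL_MS);

        // A fast chain (~500ms blocks) settles at a 125ms TTL, well inside
        // the [20ms, 2000ms] bounds.
        let mut est = 500;
        for _ in 0..10 {
            est = ewma(est, 500);
        }
        assert_eq!(ttl_ms(est), 125);
    }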