diff --git a/core/src/subgraph/context/mod.rs b/core/src/subgraph/context/mod.rs index e6f485e2552..2b7d560dfc1 100644 --- a/core/src/subgraph/context/mod.rs +++ b/core/src/subgraph/context/mod.rs @@ -23,7 +23,9 @@ use graph::{ slog::Logger, }; use std::collections::HashMap; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; + +use graph::parking_lot::RwLock; use tokio::sync::mpsc; use self::instance::SubgraphInstance; @@ -44,18 +46,18 @@ impl SubgraphKeepAlive { } pub fn remove(&self, deployment_id: &DeploymentId) { - self.alive_map.write().unwrap().remove(deployment_id); + self.alive_map.write().remove(deployment_id); self.sg_metrics.running_count.dec(); } pub fn insert(&self, deployment_id: DeploymentId, guard: CancelGuard) { - let old = self.alive_map.write().unwrap().insert(deployment_id, guard); + let old = self.alive_map.write().insert(deployment_id, guard); if old.is_none() { self.sg_metrics.running_count.inc(); } } pub fn contains(&self, deployment_id: &DeploymentId) -> bool { - self.alive_map.read().unwrap().contains_key(deployment_id) + self.alive_map.read().contains_key(deployment_id) } } diff --git a/graph/src/components/metrics/registry.rs b/graph/src/components/metrics/registry.rs index cb210040952..4777ea6f62f 100644 --- a/graph/src/components/metrics/registry.rs +++ b/graph/src/components/metrics/registry.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; + +use crate::parking_lot::RwLock; use prometheus::{labels, Histogram, IntCounterVec}; use prometheus::{IntCounter, IntGauge}; @@ -109,14 +111,13 @@ impl MetricsRegistry { }; let counters = CounterVec::new(opts, variable_labels)?; let id = counters.desc().first().unwrap().id; - let maybe_counter = self.global_counter_vecs.read().unwrap().get(&id).cloned(); + let maybe_counter = self.global_counter_vecs.read().get(&id).cloned(); if let Some(counters) = maybe_counter { Ok(counters) } else { self.register(name, Box::new(counters.clone())); self.global_counter_vecs .write() - .unwrap() .insert(id, counters.clone()); Ok(counters) } @@ -161,15 +162,12 @@ impl MetricsRegistry { ) -> Result { let counter = counter_with_labels(name, help, const_labels)?; let id = counter.desc().first().unwrap().id; - let maybe_counter = self.global_counters.read().unwrap().get(&id).cloned(); + let maybe_counter = self.global_counters.read().get(&id).cloned(); if let Some(counter) = maybe_counter { Ok(counter) } else { self.register(name, Box::new(counter.clone())); - self.global_counters - .write() - .unwrap() - .insert(id, counter.clone()); + self.global_counters.write().insert(id, counter.clone()); Ok(counter) } } @@ -210,15 +208,12 @@ impl MetricsRegistry { ) -> Result { let gauge = gauge_with_labels(name, help, const_labels)?; let id = gauge.desc().first().unwrap().id; - let maybe_gauge = self.global_gauges.read().unwrap().get(&id).cloned(); + let maybe_gauge = self.global_gauges.read().get(&id).cloned(); if let Some(gauge) = maybe_gauge { Ok(gauge) } else { self.register(name, Box::new(gauge.clone())); - self.global_gauges - .write() - .unwrap() - .insert(id, gauge.clone()); + self.global_gauges.write().insert(id, gauge.clone()); Ok(gauge) } } @@ -232,15 +227,12 @@ impl MetricsRegistry { let opts = Opts::new(name, help); let gauges = GaugeVec::new(opts, variable_labels)?; let id = gauges.desc().first().unwrap().id; - let maybe_gauge = self.global_gauge_vecs.read().unwrap().get(&id).cloned(); + let maybe_gauge = self.global_gauge_vecs.read().get(&id).cloned(); if let Some(gauges) = maybe_gauge { Ok(gauges) } else { self.register(name, Box::new(gauges.clone())); - self.global_gauge_vecs - .write() - .unwrap() - .insert(id, gauges.clone()); + self.global_gauge_vecs.write().insert(id, gauges.clone()); Ok(gauges) } } @@ -254,14 +246,13 @@ impl MetricsRegistry { let opts = HistogramOpts::new(name, help); let histograms = HistogramVec::new(opts, variable_labels)?; let id = histograms.desc().first().unwrap().id; - let maybe_histogram = self.global_histogram_vecs.read().unwrap().get(&id).cloned(); + let maybe_histogram = self.global_histogram_vecs.read().get(&id).cloned(); if let Some(histograms) = maybe_histogram { Ok(histograms) } else { self.register(name, Box::new(histograms.clone())); self.global_histogram_vecs .write() - .unwrap() .insert(id, histograms.clone()); Ok(histograms) } diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 77675967c25..c9f78cece5d 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -25,9 +25,11 @@ use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::fmt; use std::fmt::Display; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; +use crate::parking_lot::RwLock; + use async_trait::async_trait; use crate::blockchain::{Block, BlockHash, BlockPtr}; diff --git a/graph/src/data/graphql/load_manager.rs b/graph/src/data/graphql/load_manager.rs index b8bdb4a63d0..0abba9521fc 100644 --- a/graph/src/data/graphql/load_manager.rs +++ b/graph/src/data/graphql/load_manager.rs @@ -4,9 +4,11 @@ use prometheus::core::GenericCounter; use rand::{prelude::Rng, rng}; use std::collections::{HashMap, HashSet}; use std::iter::FromIterator; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::{Duration, Instant}; +use crate::parking_lot::RwLock; + use crate::components::metrics::{Counter, GaugeVec, MetricsRegistry}; use crate::components::store::{DeploymentId, PoolWaitStats}; use crate::data::graphql::shape_hash::shape_hash; @@ -57,7 +59,7 @@ impl ShardEffort { } pub fn add(&self, shard: &str, qref: QueryRef, duration: Duration, gauge: &GaugeVec) { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write(); inner.add(qref, duration); gauge .with_label_values(&[shard]) @@ -70,7 +72,7 @@ impl ShardEffort { /// data for the particular query, return `None` as the effort /// for the query pub fn current_effort(&self, qref: &QueryRef) -> (Option, Duration) { - let inner = self.inner.read().unwrap(); + let inner = self.inner.read(); let total_effort = inner.total.duration(); let query_effort = inner.effort.get(qref).map(|stats| stats.duration()); (query_effort, total_effort) @@ -381,7 +383,7 @@ impl LoadManager { let qref = QueryRef::new(deployment, shape_hash); - if self.jailed_queries.read().unwrap().contains(&qref) { + if self.jailed_queries.read().contains(&qref) { return if ENV_VARS.load_simulate { Proceed } else { @@ -426,7 +428,7 @@ impl LoadManager { "query_effort_ms" => query_effort, "total_effort_ms" => total_effort, "ratio" => format!("{:.4}", query_effort/total_effort)); - self.jailed_queries.write().unwrap().insert(qref); + self.jailed_queries.write().insert(qref); return if ENV_VARS.load_simulate { Proceed } else { @@ -457,7 +459,7 @@ impl LoadManager { } fn overloaded(&self, wait_stats: &PoolWaitStats) -> (bool, Duration) { - let store_avg = wait_stats.read().unwrap().average(); + let store_avg = wait_stats.read().average(); let overloaded = store_avg .map(|average| average > ENV_VARS.load_threshold) .unwrap_or(false); @@ -465,7 +467,7 @@ impl LoadManager { } fn kill_state(&self, shard: &str) -> (f64, Instant) { - let state = self.kill_state.get(shard).unwrap().read().unwrap(); + let state = self.kill_state.get(shard).unwrap().read(); (state.kill_rate, state.last_update) } @@ -505,7 +507,7 @@ impl LoadManager { kill_rate = (kill_rate - KILL_RATE_STEP_DOWN).max(0.0); } let event = { - let mut state = self.kill_state.get(shard).unwrap().write().unwrap(); + let mut state = self.kill_state.get(shard).unwrap().write(); state.kill_rate = kill_rate; state.last_update = now; state.log_event(now, kill_rate, overloaded) diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index 7a3c16d5b81..23294ad0a77 100644 --- a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -163,6 +163,9 @@ pub struct EnvVarsStore { /// Disables storing or reading `eth_call` results from the store call cache. /// Set by `GRAPH_STORE_DISABLE_CALL_CACHE`. Defaults to false. pub disable_call_cache: bool, + /// Set by `GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE`. Default is false. + /// Set to true to disable chain_head_ptr caching (safety escape hatch). + pub disable_chain_head_ptr_cache: bool, } // This does not print any values avoid accidentally leaking any sensitive env vars @@ -224,6 +227,7 @@ impl TryFrom for EnvVarsStore { account_like_min_versions_count: x.account_like_min_versions_count, account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0), disable_call_cache: x.disable_call_cache, + disable_chain_head_ptr_cache: x.disable_chain_head_ptr_cache, }; if let Some(timeout) = vars.batch_timeout { if timeout < 2 * vars.batch_target_duration { @@ -331,6 +335,8 @@ pub struct InnerStore { account_like_max_unique_ratio: Option, #[envconfig(from = "GRAPH_STORE_DISABLE_CALL_CACHE", default = "false")] disable_call_cache: bool, + #[envconfig(from = "GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE", default = "false")] + disable_chain_head_ptr_cache: bool, } #[derive(Clone, Copy, Debug)] diff --git a/graph/src/util/timed_cache.rs b/graph/src/util/timed_cache.rs index 8f64c844630..587a49a176a 100644 --- a/graph/src/util/timed_cache.rs +++ b/graph/src/util/timed_cache.rs @@ -3,10 +3,12 @@ use std::{ cmp::Eq, collections::HashMap, hash::Hash, - sync::{Arc, RwLock}, + sync::Arc, time::{Duration, Instant}, }; +use crate::parking_lot::RwLock; + /// Caching of values for a specified amount of time #[derive(Debug)] struct CacheEntry { @@ -49,7 +51,7 @@ impl TimedCache { K: Borrow + Eq + Hash, Q: Hash + Eq + ?Sized, { - match self.entries.read().unwrap().get(key) { + match self.entries.read().get(key) { Some(CacheEntry { value, expires }) if expires >= &now => Some(value.clone()), _ => None, } @@ -72,11 +74,11 @@ impl TimedCache { value, expires: now + self.ttl, }; - self.entries.write().unwrap().insert(key, entry); + self.entries.write().insert(key, entry); } pub fn clear(&self) { - self.entries.write().unwrap().clear(); + self.entries.write().clear(); } pub fn find(&self, pred: F) -> Option> @@ -85,7 +87,6 @@ impl TimedCache { { self.entries .read() - .unwrap() .values() .find(move |entry| pred(entry.value.as_ref())) .map(|entry| entry.value.clone()) @@ -101,7 +102,6 @@ impl TimedCache { { self.entries .write() - .unwrap() .remove(key) .map(|CacheEntry { value, expires }| (value, expires >= Instant::now())) } diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs index 674c274ac5c..48ea768c033 100644 --- a/store/postgres/src/block_store.rs +++ b/store/postgres/src/block_store.rs @@ -1,8 +1,6 @@ -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, - time::Duration, -}; +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use graph::parking_lot::RwLock; use anyhow::anyhow; use async_trait::async_trait; @@ -321,7 +319,6 @@ impl BlockStore { let configured_chains = block_store .stores .read() - .unwrap() .keys() .cloned() .collect::>(); @@ -410,7 +407,6 @@ impl BlockStore { let store = Arc::new(store); self.stores .write() - .unwrap() .insert(chain.name.clone(), store.clone()); Ok(store) } @@ -475,12 +471,7 @@ impl BlockStore { } async fn store(&self, chain: &str) -> Option> { - let store = self - .stores - .read() - .unwrap() - .get(chain) - .map(CheapClone::cheap_clone); + let store = self.stores.read().get(chain).map(CheapClone::cheap_clone); if store.is_some() { return store; } @@ -506,7 +497,7 @@ impl BlockStore { chain_store.drop_chain().await?; - self.stores.write().unwrap().remove(chain); + self.stores.write().remove(chain); Ok(()) } @@ -516,7 +507,6 @@ impl BlockStore { fn stores(&self) -> Vec> { self.stores .read() - .unwrap() .values() .map(CheapClone::cheap_clone) .collect() diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index cc3b6949fa8..556c6b80d2d 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -17,6 +17,8 @@ use graph::util::herd_cache::HerdCache; use std::collections::BTreeMap; use std::future::Future; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, Instant}; use std::{ collections::HashMap, convert::{TryFrom, TryInto}, @@ -1873,6 +1875,10 @@ pub struct ChainStoreMetrics { chain_head_cache_latest_block_num: Box, chain_head_cache_hits: Box, chain_head_cache_misses: Box, + // Metrics for chain_head_ptr() cache + chain_head_ptr_cache_hits: Box, + chain_head_ptr_cache_misses: Box, + chain_head_ptr_cache_block_time_ms: Box, } impl ChainStoreMetrics { @@ -1914,12 +1920,37 @@ impl ChainStoreMetrics { ) .expect("Can't register the counter"); + let chain_head_ptr_cache_hits = registry + .new_counter_vec( + "chain_head_ptr_cache_hits", + "Number of times the chain_head_ptr cache was hit", + vec!["network".to_string()], + ) + .expect("Can't register the counter"); + let chain_head_ptr_cache_misses = registry + .new_counter_vec( + "chain_head_ptr_cache_misses", + "Number of times the chain_head_ptr cache was missed", + vec!["network".to_string()], + ) + .expect("Can't register the counter"); + let chain_head_ptr_cache_block_time_ms = registry + .new_gauge_vec( + "chain_head_ptr_cache_block_time_ms", + "Estimated block time in milliseconds used for adaptive cache TTL", + vec!["network".to_string()], + ) + .expect("Can't register the gauge"); + Self { chain_head_cache_size, chain_head_cache_oldest_block_num, chain_head_cache_latest_block_num, chain_head_cache_hits, chain_head_cache_misses, + chain_head_ptr_cache_hits, + chain_head_ptr_cache_misses, + chain_head_ptr_cache_block_time_ms, } } @@ -1959,6 +1990,143 @@ impl ChainStoreMetrics { .unwrap() .inc_by(misses as f64); } + + pub fn record_chain_head_ptr_cache_hit(&self, network: &str) { + self.chain_head_ptr_cache_hits + .with_label_values(&[network]) + .inc(); + } + + pub fn record_chain_head_ptr_cache_miss(&self, network: &str) { + self.chain_head_ptr_cache_misses + .with_label_values(&[network]) + .inc(); + } + + pub fn set_chain_head_ptr_block_time(&self, network: &str, block_time_ms: u64) { + self.chain_head_ptr_cache_block_time_ms + .with_label_values(&[network]) + .set(block_time_ms as f64); + } +} + +const MIN_TTL_MS: u64 = 20; +const MAX_TTL_MS: u64 = 2000; +const MIN_OBSERVATIONS: u64 = 5; + +/// Adaptive cache for chain_head_ptr() that learns optimal TTL from block frequency. +struct ChainHeadPtrCache { + /// Cached value and when it expires + entry: RwLock>, + /// Estimated milliseconds between blocks (EWMA) + estimated_block_time_ms: AtomicU64, + /// When we last observed the chain head change + last_change: RwLock, + /// Number of block changes observed (for warmup) + observations: AtomicU64, + /// Metrics for recording cache hits/misses + metrics: Arc, + /// Chain name for metric labels + chain: String, +} + +impl ChainHeadPtrCache { + fn new(metrics: Arc, chain: String) -> Self { + Self { + entry: RwLock::new(None), + estimated_block_time_ms: AtomicU64::new(0), + last_change: RwLock::new(Instant::now()), + observations: AtomicU64::new(0), + metrics, + chain, + } + } + + /// Returns cached value if still valid, or None if cache is disabled/missed. + /// Records hit/miss metrics automatically. + fn get(&self) -> Option { + if ENV_VARS.store.disable_chain_head_ptr_cache { + return None; + } + let guard = self.entry.read(); + if let Some((value, expires)) = guard.as_ref() { + if Instant::now() < *expires { + self.metrics.record_chain_head_ptr_cache_hit(&self.chain); + return Some(value.clone()); + } + } + self.metrics.record_chain_head_ptr_cache_miss(&self.chain); + None + } + + /// Compute current TTL - MIN_TTL during warmup, then 1/4 of estimated block time + fn current_ttl(&self) -> Duration { + let obs = AtomicU64::load(&self.observations, Ordering::Relaxed); + if obs < MIN_OBSERVATIONS { + return Duration::from_millis(MIN_TTL_MS); + } + + let block_time = AtomicU64::load(&self.estimated_block_time_ms, Ordering::Relaxed); + let ttl_ms = (block_time / 4).clamp(MIN_TTL_MS, MAX_TTL_MS); + Duration::from_millis(ttl_ms) + } + + /// Cache a new value, updating block time estimate if value changed. + /// Does nothing if cache is disabled. + fn set(&self, new_value: BlockPtr) { + if ENV_VARS.store.disable_chain_head_ptr_cache { + return; + } + let now = Instant::now(); + + // Check if block changed + let old_value = { + let guard = self.entry.read(); + guard.as_ref().map(|(v, _)| v.clone()) + }; + + // Only update estimate if we have a previous value and block number advanced + // (skip reorgs where new block number <= old) + if let Some(old_ptr) = old_value.as_ref() { + if new_value.number > old_ptr.number { + let mut last_change = self.last_change.write(); + let delta_ms = now.duration_since(*last_change).as_millis() as u64; + *last_change = now; + + let blocks_advanced = (new_value.number - old_ptr.number) as u64; + + // Increment observation count + let obs = AtomicU64::fetch_add(&self.observations, 1, Ordering::Relaxed); + + // Ignore unreasonable deltas (> 60s) + if delta_ms > 0 && delta_ms < 60_000 { + let per_block_ms = delta_ms / blocks_advanced; + let new_estimate = if obs == 0 { + // First observation - use as initial estimate + per_block_ms + } else { + // EWMA: new = 0.8 * old + 0.2 * observed + let old_estimate = + AtomicU64::load(&self.estimated_block_time_ms, Ordering::Relaxed); + (old_estimate * 4 + per_block_ms) / 5 + }; + AtomicU64::store( + &self.estimated_block_time_ms, + new_estimate, + Ordering::Relaxed, + ); + + // Update metric gauge + self.metrics + .set_chain_head_ptr_block_time(&self.chain, new_estimate); + } + } + } + + // Compute TTL and store with expiry + let ttl = self.current_ttl(); + *self.entry.write() = Some((new_value, now + ttl)); + } } pub struct ChainStore { @@ -1980,6 +2148,8 @@ pub struct ChainStore { blocks_by_number_cache: HerdCache>, StoreError>>>, ancestor_cache: HerdCache, StoreError>>>, + /// Adaptive cache for chain_head_ptr() + chain_head_ptr_cache: ChainHeadPtrCache, } impl ChainStore { @@ -1994,10 +2164,11 @@ impl ChainStore { metrics: Arc, ) -> Self { let recent_blocks_cache = - RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics); + RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics.clone()); let blocks_by_hash_cache = HerdCache::new(format!("chain_{}_blocks_by_hash", chain)); let blocks_by_number_cache = HerdCache::new(format!("chain_{}_blocks_by_number", chain)); let ancestor_cache = HerdCache::new(format!("chain_{}_ancestor", chain)); + let chain_head_ptr_cache = ChainHeadPtrCache::new(metrics, chain.clone()); ChainStore { logger, pool, @@ -2009,6 +2180,7 @@ impl ChainStore { blocks_by_hash_cache, blocks_by_number_cache, ancestor_cache, + chain_head_ptr_cache, } } @@ -2351,8 +2523,14 @@ impl ChainHeadStore for ChainStore { async fn chain_head_ptr(self: Arc) -> Result, Error> { use public::ethereum_networks::dsl::*; + // Check cache first (handles disabled check and metrics internally) + if let Some(cached) = self.chain_head_ptr_cache.get() { + return Ok(Some(cached)); + } + + // Query database let mut conn = self.pool.get_permitted().await?; - Ok(ethereum_networks + let result = ethereum_networks .select((head_block_hash, head_block_number)) .filter(name.eq(&self.chain)) .load::<(Option, Option)>(&mut conn) @@ -2375,7 +2553,14 @@ impl ChainHeadStore for ChainStore { _ => unreachable!(), }) .and_then(|opt: Option| opt) - })?) + })?; + + // Cache the result (set() handles disabled check internally) + if let Some(ref ptr) = result { + self.chain_head_ptr_cache.set(ptr.clone()); + } + + Ok(result) } async fn chain_head_cursor(&self) -> Result, Error> { diff --git a/store/postgres/src/pool/manager.rs b/store/postgres/src/pool/manager.rs index 4677ea6276b..2f5fe0fa00a 100644 --- a/store/postgres/src/pool/manager.rs +++ b/store/postgres/src/pool/manager.rs @@ -22,9 +22,10 @@ use std::collections::HashMap; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::RwLock; use std::time::{Duration, Instant}; +use graph::parking_lot::RwLock; + use crate::pool::AsyncPool; /// Our own connection manager. It is pretty much the same as @@ -308,7 +309,6 @@ impl WaitMeter { pub(crate) fn add_conn_wait_time(&self, duration: Duration) { self.wait_stats .write() - .unwrap() .add_and_register(duration, &self.wait_gauge); } } diff --git a/store/postgres/src/pool/mod.rs b/store/postgres/src/pool/mod.rs index 20d332616a2..cf176db64c2 100644 --- a/store/postgres/src/pool/mod.rs +++ b/store/postgres/src/pool/mod.rs @@ -19,11 +19,13 @@ use graph::slog::warn; use graph::util::timed_rw_lock::TimedMutex; use tokio::sync::OwnedSemaphorePermit; +use std::collections::HashMap; use std::fmt::{self}; use std::ops::{Deref, DerefMut}; use std::sync::Arc; use std::time::Duration; -use std::{collections::HashMap, sync::RwLock}; + +use graph::parking_lot::RwLock; use crate::catalog; use crate::pool::manager::{ConnectionManager, WaitMeter}; @@ -720,7 +722,6 @@ impl PoolInner { let permit = self.query_semaphore.cheap_clone().acquire_owned().await; self.semaphore_wait_stats .write() - .unwrap() .add_and_register(start.elapsed(), &self.semaphore_wait_gauge); permit.unwrap() } @@ -734,7 +735,6 @@ impl PoolInner { let elapsed = start.elapsed(); self.indexing_semaphore_wait_stats .write() - .unwrap() .add_and_register(elapsed, &self.indexing_semaphore_wait_gauge); (permit.unwrap(), elapsed) } diff --git a/store/postgres/src/store_events.rs b/store/postgres/src/store_events.rs index 6189120f602..572c3a339a3 100644 --- a/store/postgres/src/store_events.rs +++ b/store/postgres/src/store_events.rs @@ -2,7 +2,9 @@ use graph::futures01::Stream; use graph::futures03::compat::Stream01CompatExt; use graph::futures03::stream::StreamExt; use graph::futures03::TryStreamExt; -use std::sync::{atomic::Ordering, Arc, RwLock}; +use std::sync::{atomic::Ordering, Arc}; + +use graph::parking_lot::RwLock; use std::{collections::HashMap, sync::atomic::AtomicUsize}; use tokio::sync::mpsc::{channel, Sender}; use tokio_stream::wrappers::ReceiverStream; @@ -127,7 +129,7 @@ impl SubscriptionManager { // Send to `subscriptions`. { - let senders = subscriptions.read().unwrap().clone(); + let senders = subscriptions.read().clone(); // Write change to all matching subscription streams; remove subscriptions // whose receiving end has been dropped @@ -138,7 +140,7 @@ impl SubscriptionManager { "Failed to send store event to subscriber {}: {}", id, e ); // Receiver was dropped - subscriptions.write().unwrap().remove(&id); + subscriptions.write().remove(&id); } } } @@ -187,7 +189,7 @@ impl SubscriptionManager { // Cleanup `subscriptions`. { - let mut subscriptions = subscriptions.write().unwrap(); + let mut subscriptions = subscriptions.write(); // Obtain IDs of subscriptions whose receiving end has gone let stale_ids = subscriptions @@ -218,7 +220,7 @@ impl SubscriptionManagerTrait for SubscriptionManager { let (sender, receiver) = channel(100); // Add the new subscription - self.subscriptions.write().unwrap().insert(id, sender); + self.subscriptions.write().insert(id, sender); // Return the subscription ID and entity change stream ReceiverStream::new(receiver) diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index ff5ffb2d45b..928cbdbe76f 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -1,7 +1,9 @@ use std::collections::BTreeSet; use std::ops::{Deref, Range}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Mutex, RwLock, TryLockError as RwLockError}; +use std::sync::Mutex; + +use graph::parking_lot::RwLock; use std::time::Instant; use std::{collections::BTreeMap, sync::Arc}; @@ -574,7 +576,7 @@ impl BlockTracker { // processed. let res = queue.find_map(|req| match req.as_ref() { Request::Write { batch, .. } => { - let batch = batch.read().unwrap(); + let batch = batch.read(); tracker.write(&batch.block_ptr); if batch.first_block <= tracker.revert { let res = f(batch.deref(), tracker.revert); @@ -613,7 +615,7 @@ impl BlockTracker { let accum = queue.fold(init, |accum, req| { match req.as_ref() { Request::Write { batch, .. } => { - let batch = batch.read().unwrap(); + let batch = batch.read(); let mut accum = accum; tracker.write(&batch.block_ptr); if batch.first_block <= tracker.revert { @@ -740,7 +742,7 @@ impl std::fmt::Debug for Request { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Write { batch, store, .. } => { - let batch = batch.read().unwrap(); + let batch = batch.read(); write!( f, "write[{}, {:p}, {} entities]", @@ -811,7 +813,7 @@ impl Request { } => { let start = Instant::now(); - let batch = batch.write().unwrap().close(); + let batch = batch.write().close(); if let Some(err) = &batch.error { // This can happen when appending to the batch failed @@ -850,7 +852,7 @@ impl Request { fn should_process(&self) -> bool { match self { Request::Write { queued, batch, .. } => { - batch.read().unwrap().weight() >= ENV_VARS.store.write_batch_size + batch.read().weight() >= ENV_VARS.store.write_batch_size || queued.elapsed() >= ENV_VARS.store.write_batch_duration } Request::RevertTo { .. } | Request::Stop => true, @@ -1169,7 +1171,7 @@ impl Queue { // duration of the write, and we do not want to // slow down queueing requests unnecessarily match existing.try_write() { - Ok(mut existing) => { + Some(mut existing) => { if existing.weight() < ENV_VARS.store.write_batch_size { let res = existing.append(batch).map(|()| None); if existing.weight() >= ENV_VARS.store.write_batch_size { @@ -1180,16 +1182,13 @@ impl Queue { Ok(Some(batch)) } } - Err(RwLockError::WouldBlock) => { + None => { // This branch can cause batches that // are not 'full' at the head of the // queue, something that start_writer // has to take into account Ok(Some(batch)) } - Err(RwLockError::Poisoned(e)) => { - panic!("rwlock on batch was poisoned {:?}", e); - } } } else { Ok(Some(batch))