Skip to content

Commit 9865f72

Browse files
committed
Merge remote-tracking branch 'origin/hm/head-change-revert' into hm/sqlite-indexer
2 parents 59f9c0b + 279d588 commit 9865f72

16 files changed

Lines changed: 265 additions & 152 deletions

File tree

src/chain/store/chain_store.rs

Lines changed: 50 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ use super::{
66
index::{ChainIndex, ResolveNullTipset},
77
tipset_tracker::TipsetTracker,
88
};
9-
use crate::db::{EthMappingsStore, EthMappingsStoreExt};
10-
use crate::interpreter::{BlockMessages, VMTrace};
119
use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
1210
use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage};
1311
use crate::networks::{ChainConfig, Height};
@@ -23,7 +21,15 @@ use crate::{
2321
blocks::{CachingBlockHeader, Tipset, TipsetKey, TxMeta},
2422
db::HeaviestTipsetKeyProvider,
2523
};
24+
use crate::{
25+
db::{EthMappingsStore, EthMappingsStoreExt},
26+
rpc::chain::PathChange,
27+
};
2628
use crate::{fil_cns, utils::cache::SizeTrackingLruCache};
29+
use crate::{
30+
interpreter::{BlockMessages, VMTrace},
31+
rpc::chain::PathChanges,
32+
};
2733
use ahash::{HashMap, HashMapExt, HashSet};
2834
use anyhow::Context as _;
2935
use cid::Cid;
@@ -47,17 +53,16 @@ pub type ChainEpochDelta = ChainEpoch;
4753

4854
/// `Enum` for `pubsub` channel that defines message type variant and data
4955
/// contained in message type.
50-
#[derive(Clone, Debug)]
51-
pub enum HeadChange {
52-
Apply(Tipset),
53-
}
56+
pub type HeadChange = PathChange<Tipset>;
57+
58+
pub type HeadChanges = PathChanges<Tipset>;
5459

5560
/// Stores chain data such as heaviest tipset and cached tipset info at each
5661
/// epoch. This structure is thread-safe, and all caches are wrapped in a mutex
5762
/// to allow a consistent `ChainStore` to be shared across tasks.
5863
pub struct ChainStore<DB> {
5964
/// Publisher for head change events
60-
publisher: Publisher<HeadChange>,
65+
publisher: Publisher<HeadChanges>,
6166

6267
/// key-value `datastore`.
6368
db: Arc<DB>,
@@ -66,7 +71,7 @@ pub struct ChainStore<DB> {
6671
heaviest_tipset_key_provider: Arc<dyn HeaviestTipsetKeyProvider + Sync + Send>,
6772

6873
/// Heaviest tipset cache
69-
heaviest_tipset_cache: Arc<RwLock<Option<Tipset>>>,
74+
heaviest_tipset_cache: Arc<RwLock<Tipset>>,
7075

7176
/// Used as a cache for tipset `lookbacks`.
7277
chain_index: Arc<ChainIndex<Arc<DB>>>,
@@ -124,14 +129,24 @@ where
124129
let (publisher, _) = broadcast::channel(SINK_CAP);
125130
let chain_index = Arc::new(ChainIndex::new(Arc::clone(&db)));
126131
let validated_blocks = Mutex::new(HashSet::default());
127-
132+
let head = if let Some(head_tsk) = heaviest_tipset_key_provider
133+
.heaviest_tipset_key()
134+
.context("failed to load head tipset key")?
135+
&& let Some(head) = chain_index
136+
.load_tipset(&head_tsk)
137+
.context("failed to load head tipset")?
138+
{
139+
head
140+
} else {
141+
Tipset::from(&genesis_block_header)
142+
};
128143
let cs = Self {
129144
publisher,
130145
chain_index,
131146
tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config.clone()),
132147
db,
133148
heaviest_tipset_key_provider,
134-
heaviest_tipset_cache: Default::default(),
149+
heaviest_tipset_cache: Arc::new(RwLock::new(head)),
135150
genesis_block_header,
136151
validated_blocks,
137152
eth_mappings,
@@ -142,14 +157,31 @@ where
142157
}
143158

144159
/// Sets heaviest tipset
145-
pub fn set_heaviest_tipset(&self, ts: Tipset) -> Result<(), Error> {
160+
pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> {
161+
head.key().save(self.blockstore())?;
146162
self.heaviest_tipset_key_provider
147-
.set_heaviest_tipset_key(ts.key())?;
148-
*self.heaviest_tipset_cache.write() = Some(ts.clone());
149-
ts.key().save(self.blockstore())?;
150-
if self.publisher.send(HeadChange::Apply(ts)).is_err() {
151-
debug!("did not publish head change, no active receivers");
163+
.set_heaviest_tipset_key(head.key())?;
164+
let old_head = std::mem::replace(&mut *self.heaviest_tipset_cache.write(), head.clone());
165+
166+
let changes = match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) {
167+
Ok(changes) => changes,
168+
Err(e) => {
169+
// Do not warn when the old head is genesis
170+
if old_head.epoch() > 0 {
171+
warn!("failed to get chain path changes: {e}");
172+
}
173+
// Fallback to single apply
174+
PathChanges {
175+
applies: vec![head],
176+
reverts: vec![],
177+
}
178+
}
179+
};
180+
181+
if self.publisher.send(changes).is_err() {
182+
debug!("did not publish changes, no active receivers");
152183
}
184+
153185
Ok(())
154186
}
155187

@@ -200,16 +232,7 @@ where
200232

201233
/// Returns the currently tracked heaviest tipset.
202234
pub fn heaviest_tipset(&self) -> Tipset {
203-
if let Some(ts) = &*self.heaviest_tipset_cache.read() {
204-
return ts.clone();
205-
}
206-
let tsk = self
207-
.heaviest_tipset_key_provider
208-
.heaviest_tipset_key()
209-
.unwrap_or_else(|_| TipsetKey::from(nunny::vec![*self.genesis_block_header.cid()]));
210-
self.chain_index
211-
.load_required_tipset(&tsk)
212-
.expect("failed to load heaviest tipset")
235+
self.heaviest_tipset_cache.read().clone()
213236
}
214237

215238
/// Returns the genesis tipset.
@@ -218,7 +241,7 @@ where
218241
}
219242

220243
/// Returns a reference to the publisher of head changes.
221-
pub fn publisher(&self) -> &Publisher<HeadChange> {
244+
pub fn publisher(&self) -> &Publisher<HeadChanges> {
222245
&self.publisher
223246
}
224247

src/chain/store/indexer.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use sqlx::Row as _;
1414

1515
use crate::{
1616
blocks::Tipset,
17-
chain::{ChainStore, HeadChange, index::ResolveNullTipset},
17+
chain::{ChainStore, HeadChanges, index::ResolveNullTipset},
1818
message::{ChainMessage, SignedMessage},
1919
rpc::{
2020
chain::types::ChainIndexValidation,
@@ -122,18 +122,18 @@ where
122122

123123
pub async fn index_loop(
124124
&self,
125-
mut head_change_subscriber: tokio::sync::broadcast::Receiver<HeadChange>,
125+
mut head_change_subscriber: tokio::sync::broadcast::Receiver<HeadChanges>,
126126
) -> anyhow::Result<()> {
127127
loop {
128-
match head_change_subscriber.recv().await? {
129-
HeadChange::Apply(ts) => {
130-
if let Err(e) = self.index_tipset(&ts).await {
131-
tracing::warn!(
132-
"failed to index new head@{}({}): {e}",
133-
ts.epoch(),
134-
ts.key()
135-
);
136-
}
128+
let HeadChanges { reverts, applies } = head_change_subscriber.recv().await?;
129+
for ts in reverts {
130+
if let Err(e) = self.revert_tipset(&ts).await {
131+
tracing::warn!("failed to index new head@{}({}): {e}", ts.epoch(), ts.key());
132+
}
133+
}
134+
for ts in applies {
135+
if let Err(e) = self.index_tipset(&ts).await {
136+
tracing::warn!("failed to index new head@{}({}): {e}", ts.epoch(), ts.key());
137137
}
138138
}
139139
}
@@ -556,6 +556,27 @@ where
556556
Ok(())
557557
}
558558

559+
pub async fn revert_tipset(&self, ts: &Tipset) -> anyhow::Result<()> {
560+
let tsk_cid_bytes = ts.key().cid()?.to_bytes();
561+
// Because of deferred execution in Filecoin, events at tipset T are reverted when a tipset T+1 is reverted.
562+
// However, the tipet `T` itself is not reverted.
563+
let pts = Tipset::load_required(self.cs.blockstore(), ts.parents())?;
564+
let events_tsk_cid_bytes = pts.key().cid()?.to_bytes();
565+
let mut tx = self.db.begin().await?;
566+
sqlx::query(self.stmts.update_events_to_reverted)
567+
.bind(&tsk_cid_bytes)
568+
.execute(tx.deref_mut())
569+
.await?;
570+
// events are indexed against the message inclusion tipset, not the message execution tipset.
571+
// So we need to revert the events for the message inclusion tipset
572+
sqlx::query(self.stmts.update_events_to_reverted)
573+
.bind(&events_tsk_cid_bytes)
574+
.execute(tx.deref_mut())
575+
.await?;
576+
tx.commit().await?;
577+
Ok(())
578+
}
579+
559580
pub async fn index_tipset(&self, ts: &Tipset) -> anyhow::Result<()> {
560581
let mut tx = self.db.begin().await?;
561582
self.index_tipset_and_parent_events_with_tx(&mut tx, ts)

src/chain/store/indexer/ddls.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ pub struct PreparedStatements {
5656
pub insert_eth_tx_hash: &'static str,
5757
pub insert_tipset_message: &'static str,
5858
pub update_tipset_to_non_reverted: &'static str,
59+
pub update_tipset_to_reverted: &'static str,
5960
pub update_events_to_non_reverted: &'static str,
61+
pub update_events_to_reverted: &'static str,
6062
pub get_msg_id_for_msg_cid_and_tipset: &'static str,
6163
pub insert_event: &'static str,
6264
pub insert_event_entry: &'static str,
@@ -81,7 +83,10 @@ impl Default for PreparedStatements {
8183
let insert_tipset_message = "INSERT INTO tipset_message (tipset_key_cid, height, reverted, message_cid, message_index) VALUES (?, ?, ?, ?, ?) ON CONFLICT (tipset_key_cid, message_cid) DO UPDATE SET reverted = 0";
8284
let update_tipset_to_non_reverted =
8385
"UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?";
86+
let update_tipset_to_reverted =
87+
"UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?";
8488
let update_events_to_non_reverted = "UPDATE event SET reverted = 0 WHERE message_id IN (SELECT id FROM tipset_message WHERE tipset_key_cid = ?)";
89+
let update_events_to_reverted = "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT id FROM tipset_message WHERE height >= ?)";
8590
let get_msg_id_for_msg_cid_and_tipset = "SELECT id FROM tipset_message WHERE tipset_key_cid = ? AND message_cid = ? AND reverted = 0";
8691
let insert_event = "INSERT INTO event (message_id, event_index, emitter_id, emitter_addr, reverted) VALUES (?, ?, ?, ?, ?)";
8792
let insert_event_entry = "INSERT INTO event_entry (event_id, indexed, flags, key, codec, value) VALUES (?, ?, ?, ?, ?, ?)";
@@ -102,7 +107,9 @@ impl Default for PreparedStatements {
102107
insert_eth_tx_hash,
103108
insert_tipset_message,
104109
update_tipset_to_non_reverted,
110+
update_tipset_to_reverted,
105111
update_events_to_non_reverted,
112+
update_events_to_reverted,
106113
get_msg_id_for_msg_cid_and_tipset,
107114
insert_event,
108115
insert_event_entry,

src/daemon/mod.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ pub mod db_util;
77
pub mod main;
88

99
use crate::blocks::Tipset;
10-
use crate::chain::HeadChange;
1110
use crate::chain::index::ResolveNullTipset;
1211
use crate::chain_sync::network_context::SyncNetworkContext;
1312
use crate::chain_sync::{ChainFollower, SyncStatus};
@@ -507,21 +506,19 @@ fn maybe_start_indexer_service(
507506
{
508507
// Old indexer
509508
{
510-
let mut receiver = ctx.state_manager.chain_store().publisher().subscribe();
509+
let mut head_changes_subscriber =
510+
ctx.state_manager.chain_store().publisher().subscribe();
511511
let chain_store = ctx.state_manager.chain_store().clone();
512512
services.spawn(async move {
513513
tracing::info!("Starting indexer service");
514-
515514
// Continuously listen for head changes
516515
loop {
517-
let HeadChange::Apply(ts) = receiver.recv().await?;
518-
519-
tracing::debug!("Indexing tipset {}", ts.key());
520-
521-
let delegated_messages =
522-
chain_store.headers_delegated_messages(ts.block_headers().iter())?;
523-
524-
chain_store.process_signed_messages(&delegated_messages)?;
516+
for ts in head_changes_subscriber.recv().await?.applies {
517+
tracing::debug!("Indexing tipset {}", ts.key());
518+
let delegated_messages =
519+
chain_store.headers_delegated_messages(ts.block_headers().iter())?;
520+
chain_store.process_signed_messages(&delegated_messages)?;
521+
}
525522
}
526523
});
527524

@@ -544,11 +541,11 @@ fn maybe_start_indexer_service(
544541
// New SQLITE indexer
545542
if let Some(indexer) = &ctx.chain_indexer {
546543
services.spawn({
547-
let head_change_subscriber =
544+
let head_changes_subscriber =
548545
ctx.state_manager.chain_store().publisher().subscribe();
549546
let indexer = indexer.clone();
550547
async move {
551-
if let Err(e) = indexer.index_loop(head_change_subscriber).await {
548+
if let Err(e) = indexer.index_loop(head_changes_subscriber).await {
552549
tracing::warn!("indexer stopped unexpectedly: {e}");
553550
}
554551
Ok(())

src/db/car/many.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,12 @@ impl<WriterT> ManyCar<WriterT> {
130130
Ok(())
131131
}
132132

133-
pub fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
134-
self.read_only
133+
pub fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
134+
Ok(self
135+
.read_only
135136
.read()
136137
.peek()
137-
.map(|w| AnyCar::heaviest_tipset_key(&w.car))
138-
.context("ManyCar store doesn't have a heaviest tipset key")
138+
.map(|w| AnyCar::heaviest_tipset_key(&w.car)))
139139
}
140140

141141
pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
@@ -252,9 +252,9 @@ impl<WriterT: EthMappingsStore> EthMappingsStore for ManyCar<WriterT> {
252252
}
253253

254254
impl<T: Blockstore + SettingsStore> super::super::HeaviestTipsetKeyProvider for ManyCar<T> {
255-
fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
255+
fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
256256
match SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)? {
257-
Some(tsk) => Ok(tsk),
257+
Some(tsk) => Ok(Some(tsk)),
258258
None => self.heaviest_tipset_key(),
259259
}
260260
}

src/db/gc/snapshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ where
282282
tracing::warn!("{e}");
283283
}
284284

285-
*self.memory_db_head_key.write() = db.heaviest_tipset_key().ok();
285+
*self.memory_db_head_key.write() = db.heaviest_tipset_key()?;
286286
db.unsubscribe_write_ops();
287287
match joinset.join_next().await {
288288
Some(Ok(map)) => {

src/db/memory.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,8 @@ impl BitswapStoreReadWrite for MemoryDB {
154154
}
155155

156156
impl super::HeaviestTipsetKeyProvider for MemoryDB {
157-
fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
158-
SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)?
159-
.context("head key not found")
157+
fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
158+
SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)
160159
}
161160

162161
fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {

src/db/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,14 +239,14 @@ impl<T: PersistentStore> PersistentStore for &Arc<T> {
239239

240240
pub trait HeaviestTipsetKeyProvider {
241241
/// Returns the currently tracked heaviest tipset.
242-
fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey>;
242+
fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>>;
243243

244244
/// Sets heaviest tipset.
245245
fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()>;
246246
}
247247

248248
impl<T: HeaviestTipsetKeyProvider> HeaviestTipsetKeyProvider for Arc<T> {
249-
fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
249+
fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
250250
self.as_ref().heaviest_tipset_key()
251251
}
252252

src/db/parity_db.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,8 @@ impl SettingsStore for ParityDb {
173173
}
174174

175175
impl super::HeaviestTipsetKeyProvider for ParityDb {
176-
fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
177-
super::SettingsStoreExt::read_obj::<TipsetKey>(self, super::setting_keys::HEAD_KEY)?
178-
.context("head key not found")
176+
fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
177+
super::SettingsStoreExt::read_obj::<TipsetKey>(self, super::setting_keys::HEAD_KEY)
179178
}
180179

181180
fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {

0 commit comments

Comments
 (0)