diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs
index 81db925a092..51891ad1bf9 100644
--- a/core/src/subgraph/runner.rs
+++ b/core/src/subgraph/runner.rs
@@ -810,8 +810,9 @@ where
             .offchain_monitor
             .ready_offchain_events()
             .non_deterministic()?;
+        let onchain_vid_seq = block_state.entity_cache.vid_seq;
         let (offchain_mods, processed_offchain_data_sources, persisted_off_chain_data_sources) =
-            self.handle_offchain_triggers(offchain_events, &block)
+            self.handle_offchain_triggers(offchain_events, &block, onchain_vid_seq)
                .await
                .non_deterministic()?;
         block_state
@@ -1161,6 +1162,7 @@ where
        &mut self,
        triggers: Vec<offchain::TriggerData>,
        block: &Arc<C::Block>,
+       mut next_vid_seq: u32,
    ) -> Result<
        (
            Vec<EntityModification>,
@@ -1179,6 +1181,11 @@
        let schema = ReadStore::input_schema(&self.inputs.store);
        let mut block_state = BlockState::new(EmptyStore::new(schema), LfuCache::new());

+       // Continue the vid sequence from the previous trigger (or from
+       // onchain processing) so that each offchain trigger does not
+       // reset to RESERVED_VIDS and produce duplicate VIDs.
+       block_state.entity_cache.vid_seq = next_vid_seq;
+
        // PoI ignores offchain events.
        // See also: poi-ignores-offchain
        let proof_of_indexing = SharedProofOfIndexing::ignored();
@@ -1244,6 +1251,10 @@
            return Err(anyhow!("{}", err));
        }

+       // Carry forward the vid sequence so the next iteration doesn't
+       // reset to RESERVED_VIDS and produce duplicate VIDs.
+       next_vid_seq = block_state.entity_cache.vid_seq;
+
        mods.extend(
            block_state
                .entity_cache
diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs
index 02b02579440..498d7c3d880 100644
--- a/store/test-store/tests/graph/entity_cache.rs
+++ b/store/test-store/tests/graph/entity_cache.rs
@@ -366,6 +366,107 @@ async fn check_vid_sequence() {
    }
}

+// Test that demonstrates the VID collision bug when multiple offchain triggers
+// (e.g. file data sources) are processed in the same block. Each trigger creates
+// a fresh EntityCache with vid_seq reset to RESERVED_VIDS (100). When two triggers
+// in the same block both insert an entity, they produce the same VID, violating
+// the primary key constraint.
+//
+// The fix is to thread vid_seq from one trigger's EntityCache to the next.
+#[graph::test]
+async fn offchain_trigger_vid_collision_without_fix() {
+    let block: i32 = 2_163_923; // any block number
+
+    // Simulate first offchain trigger: fresh EntityCache (vid_seq starts at 100)
+    let store1 = Arc::new(MockStore::new(BTreeMap::new()));
+    let mut cache1 = EntityCache::new(store1);
+    let band1_data = entity! { SCHEMA => id: "band1", name: "First Band" };
+    let band1_key = make_band_key("band1");
+    cache1
+        .set(band1_key.clone(), band1_data, block, None)
+        .await
+        .unwrap();
+    let result1 = cache1.as_modifications(block).await.unwrap();
+
+    // Simulate second offchain trigger: another fresh EntityCache (vid_seq ALSO starts at 100)
+    let store2 = Arc::new(MockStore::new(BTreeMap::new()));
+    let mut cache2 = EntityCache::new(store2);
+    let band2_data = entity! { SCHEMA => id: "band2", name: "Second Band" };
+    let band2_key = make_band_key("band2");
+    cache2
+        .set(band2_key.clone(), band2_data, block, None)
+        .await
+        .unwrap();
+    let result2 = cache2.as_modifications(block).await.unwrap();
+
+    // Extract VIDs from both modifications
+    let vid1 = match &result1.modifications[0] {
+        EntityModification::Insert { data, .. } => data.vid(),
+        _ => panic!("expected Insert"),
+    };
+    let vid2 = match &result2.modifications[0] {
+        EntityModification::Insert { data, .. } => data.vid(),
+        _ => panic!("expected Insert"),
+    };
+
+    // BUG: Both VIDs are identical! This is what causes
+    // "duplicate key value violates unique constraint"
+    // vid = (block << 32) + 100 for BOTH triggers
+    let expected_vid = ((block as i64) << 32) + 100;
+    assert_eq!(vid1, expected_vid, "first trigger vid should be (block << 32) + 100");
+    assert_eq!(vid2, expected_vid, "second trigger vid should ALSO be (block << 32) + 100 — the bug!");
+    assert_eq!(vid1, vid2, "VIDs collide — this is the bug that causes the DB constraint violation");
+}
+
+// Test that demonstrates the fix: threading vid_seq from one trigger's
+// EntityCache to the next prevents VID collisions.
+#[graph::test]
+async fn offchain_trigger_vid_no_collision_with_fix() {
+    let block: i32 = 2_163_923;
+
+    // First offchain trigger
+    let store1 = Arc::new(MockStore::new(BTreeMap::new()));
+    let mut cache1 = EntityCache::new(store1);
+    let band1_data = entity! { SCHEMA => id: "band1", name: "First Band" };
+    let band1_key = make_band_key("band1");
+    cache1
+        .set(band1_key.clone(), band1_data, block, None)
+        .await
+        .unwrap();
+
+    // THE FIX: capture vid_seq BEFORE as_modifications consumes the cache
+    let next_vid_seq = cache1.vid_seq;
+    let result1 = cache1.as_modifications(block).await.unwrap();
+
+    // Second offchain trigger: set vid_seq to where first trigger left off
+    let store2 = Arc::new(MockStore::new(BTreeMap::new()));
+    let mut cache2 = EntityCache::new(store2);
+    cache2.vid_seq = next_vid_seq; // <-- the fix
+    let band2_data = entity! { SCHEMA => id: "band2", name: "Second Band" };
+    let band2_key = make_band_key("band2");
+    cache2
+        .set(band2_key.clone(), band2_data, block, None)
+        .await
+        .unwrap();
+    let result2 = cache2.as_modifications(block).await.unwrap();
+
+    let vid1 = match &result1.modifications[0] {
+        EntityModification::Insert { data, .. } => data.vid(),
+        _ => panic!("expected Insert"),
+    };
+    let vid2 = match &result2.modifications[0] {
+        EntityModification::Insert { data, .. } => data.vid(),
+        _ => panic!("expected Insert"),
+    };
+
+    // With the fix, VIDs are different
+    assert_ne!(vid1, vid2, "VIDs should NOT collide when vid_seq is threaded");
+    let expected_vid1 = ((block as i64) << 32) + 100;
+    let expected_vid2 = ((block as i64) << 32) + 101;
+    assert_eq!(vid1, expected_vid1, "first trigger starts at vid_seq 100");
+    assert_eq!(vid2, expected_vid2, "second trigger continues at vid_seq 101");
+}
+
const ACCOUNT_GQL: &str = "
    type Account @entity {
        id: ID!