Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,9 @@ where
.offchain_monitor
.ready_offchain_events()
.non_deterministic()?;
let onchain_vid_seq = block_state.entity_cache.vid_seq;
let (offchain_mods, processed_offchain_data_sources, persisted_off_chain_data_sources) =
- self.handle_offchain_triggers(offchain_events, &block)
+ self.handle_offchain_triggers(offchain_events, &block, onchain_vid_seq)
.await
.non_deterministic()?;
block_state
Expand Down Expand Up @@ -1161,6 +1162,7 @@ where
&mut self,
triggers: Vec<offchain::TriggerData>,
block: &Arc<C::Block>,
mut next_vid_seq: u32,
) -> Result<
(
Vec<EntityModification>,
Expand All @@ -1179,6 +1181,11 @@ where
let schema = ReadStore::input_schema(&self.inputs.store);
let mut block_state = BlockState::new(EmptyStore::new(schema), LfuCache::new());

// Continue the vid sequence from the previous trigger (or from
// onchain processing) so that each offchain trigger does not
// reset to RESERVED_VIDS and produce duplicate VIDs.
block_state.entity_cache.vid_seq = next_vid_seq;

// PoI ignores offchain events.
// See also: poi-ignores-offchain
let proof_of_indexing = SharedProofOfIndexing::ignored();
Expand Down Expand Up @@ -1244,6 +1251,10 @@ where
return Err(anyhow!("{}", err));
}

// Carry forward the vid sequence so the next iteration doesn't
// reset to RESERVED_VIDS and produce duplicate VIDs.
next_vid_seq = block_state.entity_cache.vid_seq;

mods.extend(
block_state
.entity_cache
Expand Down
101 changes: 101 additions & 0 deletions store/test-store/tests/graph/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,107 @@ async fn check_vid_sequence() {
}
}

// Test that demonstrates the VID collision bug when multiple offchain triggers
// (e.g. file data sources) are processed in the same block. Each trigger creates
// a fresh EntityCache with vid_seq reset to RESERVED_VIDS (100). When two triggers
// in the same block both insert an entity, they produce the same VID, violating
// the primary key constraint.
//
// The fix is to thread vid_seq from one trigger's EntityCache to the next.
#[graph::test]
async fn offchain_trigger_vid_collision_without_fix() {
let block: i32 = 2_163_923; // any block number

// Simulate first offchain trigger: fresh EntityCache (vid_seq starts at 100)
let store1 = Arc::new(MockStore::new(BTreeMap::new()));
let mut cache1 = EntityCache::new(store1);
let band1_data = entity! { SCHEMA => id: "band1", name: "First Band" };
let band1_key = make_band_key("band1");
cache1
.set(band1_key.clone(), band1_data, block, None)
.await
.unwrap();
let result1 = cache1.as_modifications(block).await.unwrap();

// Simulate second offchain trigger: another fresh EntityCache (vid_seq ALSO starts at 100)
let store2 = Arc::new(MockStore::new(BTreeMap::new()));
let mut cache2 = EntityCache::new(store2);
let band2_data = entity! { SCHEMA => id: "band2", name: "Second Band" };
let band2_key = make_band_key("band2");
cache2
.set(band2_key.clone(), band2_data, block, None)
.await
.unwrap();
let result2 = cache2.as_modifications(block).await.unwrap();

// Extract VIDs from both modifications
let vid1 = match &result1.modifications[0] {
EntityModification::Insert { data, .. } => data.vid(),
_ => panic!("expected Insert"),
};
let vid2 = match &result2.modifications[0] {
EntityModification::Insert { data, .. } => data.vid(),
_ => panic!("expected Insert"),
};

// BUG: Both VIDs are identical! This is what causes
// "duplicate key value violates unique constraint"
// vid = (block << 32) + 100 for BOTH triggers
let expected_vid = ((block as i64) << 32) + 100;
assert_eq!(vid1, expected_vid, "first trigger vid should be (block << 32) + 100");
assert_eq!(vid2, expected_vid, "second trigger vid should ALSO be (block << 32) + 100 — the bug!");
assert_eq!(vid1, vid2, "VIDs collide — this is the bug that causes the DB constraint violation");
}

// Test that demonstrates the fix: threading vid_seq from one trigger's
// EntityCache to the next prevents VID collisions.
#[graph::test]
async fn offchain_trigger_vid_no_collision_with_fix() {
let block: i32 = 2_163_923;

// First offchain trigger
let store1 = Arc::new(MockStore::new(BTreeMap::new()));
let mut cache1 = EntityCache::new(store1);
let band1_data = entity! { SCHEMA => id: "band1", name: "First Band" };
let band1_key = make_band_key("band1");
cache1
.set(band1_key.clone(), band1_data, block, None)
.await
.unwrap();

// THE FIX: capture vid_seq BEFORE as_modifications consumes the cache
let next_vid_seq = cache1.vid_seq;
let result1 = cache1.as_modifications(block).await.unwrap();

// Second offchain trigger: set vid_seq to where first trigger left off
let store2 = Arc::new(MockStore::new(BTreeMap::new()));
let mut cache2 = EntityCache::new(store2);
cache2.vid_seq = next_vid_seq; // <-- the fix
let band2_data = entity! { SCHEMA => id: "band2", name: "Second Band" };
let band2_key = make_band_key("band2");
cache2
.set(band2_key.clone(), band2_data, block, None)
.await
.unwrap();
let result2 = cache2.as_modifications(block).await.unwrap();

let vid1 = match &result1.modifications[0] {
EntityModification::Insert { data, .. } => data.vid(),
_ => panic!("expected Insert"),
};
let vid2 = match &result2.modifications[0] {
EntityModification::Insert { data, .. } => data.vid(),
_ => panic!("expected Insert"),
};

// With the fix, VIDs are different
assert_ne!(vid1, vid2, "VIDs should NOT collide when vid_seq is threaded");
let expected_vid1 = ((block as i64) << 32) + 100;
let expected_vid2 = ((block as i64) << 32) + 101;
assert_eq!(vid1, expected_vid1, "first trigger starts at vid_seq 100");
assert_eq!(vid2, expected_vid2, "second trigger continues at vid_seq 101");
}

const ACCOUNT_GQL: &str = "
type Account @entity {
id: ID!
Expand Down