From 42021197d99460a69eda9eacd70f26d769163781 Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Tue, 16 Dec 2025 23:39:41 +0200 Subject: [PATCH 1/4] store: Reject incompatible graft schemas during site allocation Signed-off-by: Maksim Dimitrov --- store/postgres/src/deployment_store.rs | 16 +----- store/postgres/src/subgraph_store.rs | 75 ++++++++++++++++---------- 2 files changed, 49 insertions(+), 42 deletions(-) diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 3703534979c..a9fcc833e99 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -184,7 +184,6 @@ impl DeploymentStore { schema: &InputSchema, deployment: DeploymentCreate, site: Arc, - graft_base: Option>, replace: bool, on_sync: OnSync, index_def: Option, @@ -217,7 +216,7 @@ impl DeploymentStore { let query = format!("create schema {}", &site.namespace); conn.batch_execute(&query).await?; - let layout = Layout::create_relational_schema( + let _ = Layout::create_relational_schema( conn, site.clone(), schema, @@ -225,19 +224,6 @@ impl DeploymentStore { index_def, ) .await?; - // See if we are grafting and check that the graft is permissible - if let Some(base) = graft_base { - let errors = layout.can_copy_from(&base); - if !errors.is_empty() { - return Err(StoreError::Unknown(anyhow!( - "The subgraph `{}` cannot be used as the graft base \ - for `{}` because the schemas are incompatible:\n - {}", - &base.catalog.site.namespace, - &layout.catalog.site.namespace, - errors.join("\n - ") - ))); - } - } // Create data sources table if site.schema_version.private_data_sources() { diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 478d21eba02..f47049d0e39 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -39,6 +39,7 @@ use graph::{ use graph::{derive::CheapClone, futures03::future::join_all, prelude::alloy::primitives::Address}; use crate::{ + catalog::Catalog, deployment::{OnSync, SubgraphHealth}, primary::{self, DeploymentId, Mirror as PrimaryMirror, Primary, Site}, relational::{ @@ -88,7 +89,7 @@ impl Shard { .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') { return Err(StoreError::InvalidIdentifier(format!( - "shard name `{}` is invalid: shard names must only contain lowercase alphanumeric characters or '_'", name + "shard name `{name}` is invalid: shard names must only contain lowercase alphanumeric characters or '_'" ))); } Ok(Shard(name)) @@ -351,33 +352,60 @@ impl SubgraphStore { // assignment that we used last time to avoid creating // the same deployment in another shard let (shard, node_id) = self.place(&name, &network_name, node_id).await?; + let mut conn = self.primary_conn().await?; - let (site, site_was_created) = conn - .allocate_site(shard, schema.id(), network_name, graft_base) - .await?; - let node_id = conn.assigned_node(&site).await?.unwrap_or(node_id); - (site, !site_was_created, node_id) + conn.transaction(|conn| { + async { + let (site, site_was_created) = conn + .allocate_site(shard, schema.id(), network_name, graft_base) + .await?; + let node_id = conn.assigned_node(&site).await?.unwrap_or(node_id); + let site = Arc::new(site); + + if let Some(graft_base) = graft_base { + // Ensure that the graft base exists + let base_layout = self.layout(graft_base).await?; + let entities_with_causality_region = + deployment.manifest.entities_with_causality_region.clone(); + let catalog = Catalog::for_tests( + site.cheap_clone(), + entities_with_causality_region.into_iter().collect(), + )?; + let layout = Layout::new(site.cheap_clone(), schema, catalog)?; + + let errors = layout.can_copy_from(&base_layout); + if !errors.is_empty() { + return Err(StoreError::Unknown(anyhow!( + "The subgraph `{}` cannot be used as the graft base \ + for `{}` because the schemas are incompatible:\n - {}", + &base_layout.catalog.site.namespace, + &layout.catalog.site.namespace, + errors.join("\n - ") + ))); + } + } + + Ok((site, !site_was_created, node_id)) + } + .scope_boxed() + }) + .await? }; - let site = Arc::new(site); - // if the deployment already exists, we don't need to perform any copying - // so we can set graft_base to None - // if it doesn't exist, we need to copy the graft base to the new deployment - let graft_base_layout = if !exists { - let graft_base = match deployment.graft_base.as_ref() { + // If the deployment already exists, we don't need to perform any copying + // If it doesn't exist, we need to copy the graft base to the new deployment + if !exists { + let graft_base_layout = match graft_base { Some(base) => Some(self.layout(base).await?), None => None, }; - if let Some(graft_base) = &graft_base { + if let Some(graft_base_layout) = &graft_base_layout { self.primary_conn() .await? - .record_active_copy(graft_base.site.as_ref(), site.as_ref()) + .record_active_copy(graft_base_layout.site.as_ref(), site.as_ref()) .await?; } - graft_base - } else { - None }; // Create the actual databases schema and metadata entries @@ -386,7 +414,7 @@ impl SubgraphStore { .get(&site.shard) .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; - let index_def = if let Some(graft) = &graft_base.clone() { + let index_def = if let Some(graft) = graft_base { if let Some(site) = self.sites.get(graft) { let store = self .stores @@ -406,7 +434,6 @@ impl SubgraphStore { schema, deployment, site.clone(), - graft_base_layout, replace, OnSync::None, index_def, @@ -731,8 +758,7 @@ impl Inner { if src.id == dst.id { return Err(StoreError::Unknown(anyhow!( - "can not copy deployment {} onto itself", - src_loc + "can not copy deployment {src_loc} onto itself" ))); } // The very last thing we do when we set up a copy here is assign it @@ -740,9 +766,7 @@ impl Inner { // should not have been called. if let Some(node) = self.mirror.assigned_node(dst.as_ref()).await? { return Err(StoreError::Unknown(anyhow!( - "can not copy into deployment {} since it is already assigned to node `{}`", - dst_loc, - node + "can not copy into deployment {dst_loc} since it is already assigned to node `{node}`" ))); } let deployment = src_store.load_deployment(src.clone()).await?; @@ -758,8 +782,6 @@ impl Inner { history_blocks_override: None, }; - let graft_base = self.layout(&src.deployment).await?; - self.primary_conn() .await? .record_active_copy(src.as_ref(), dst.as_ref()) @@ -776,7 +798,6 @@ impl Inner { &src_layout.input_schema, deployment, dst.clone(), - Some(graft_base), false, on_sync, Some(index_def), From 526b5c30c4771438fb599d747939eb3d7d06c065 Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Thu, 22 Jan 2026 11:28:06 +0200 Subject: [PATCH 2/4] store: Move recording the active_copy in the transaction Signed-off-by: Maksim Dimitrov --- store/postgres/src/subgraph_store.rs | 108 +++++++++++++-------------- 1 file changed, 51 insertions(+), 57 deletions(-) diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index f47049d0e39..d56c2e29792 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -340,7 +340,7 @@ impl SubgraphStore { self.evict(schema.id())?; let graft_base = deployment.graft_base.as_ref(); - let (site, exists, node_id) = { + let (site, deployment_store, node_id) = { // We need to deal with two situations: // (1) We are really creating a new subgraph; it therefore needs // to go in the shard and onto the node that the placement @@ -353,67 +353,61 @@ impl SubgraphStore { // the same deployment in another shard let (shard, node_id) = self.place(&name, &network_name, node_id).await?; - let mut conn = self.primary_conn().await?; - conn.transaction(|conn| { - async { - let (site, site_was_created) = conn - .allocate_site(shard, schema.id(), network_name, graft_base) - .await?; - let node_id = conn.assigned_node(&site).await?.unwrap_or(node_id); - let site = Arc::new(site); - - if let Some(graft_base) = graft_base { - // Ensure that the graft base exists - let base_layout = self.layout(graft_base).await?; - let entities_with_causality_region = - deployment.manifest.entities_with_causality_region.clone(); - let catalog = Catalog::for_tests( - site.cheap_clone(), - entities_with_causality_region.into_iter().collect(), - )?; - let layout = Layout::new(site.cheap_clone(), schema, catalog)?; - - let errors = layout.can_copy_from(&base_layout); - if !errors.is_empty() { - return Err(StoreError::Unknown(anyhow!( - "The subgraph `{}` cannot be used as the graft base \ - for `{}` because the schemas are incompatible:\n - {}", - &base_layout.catalog.site.namespace, - &layout.catalog.site.namespace, - errors.join("\n - ") - ))); + let mut pconn = self.primary_conn().await?; + pconn + .transaction(|pconn| { + async { + let (site, site_was_created) = pconn + .allocate_site(shard, schema.id(), network_name, graft_base) + .await?; + let node_id = pconn.assigned_node(&site).await?.unwrap_or(node_id); + let site = Arc::new(site); + let deployment_store = self + .stores + .get(&site.shard) + .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; + + if site_was_created { + if let Some(graft_base) = graft_base { + // Ensure that the graft base exists + let base_layout = self.layout(graft_base).await?; + let mut shard_conn = + deployment_store.get_replica_conn(ReplicaId::Main).await?; + let entities_with_causality_region = + deployment.manifest.entities_with_causality_region.clone(); + let catalog = Catalog::for_creation( + &mut shard_conn, + site.cheap_clone(), + entities_with_causality_region.into_iter().collect(), + ) + .await?; + let layout = Layout::new(site.cheap_clone(), schema, catalog)?; + + let errors = layout.can_copy_from(&base_layout); + if !errors.is_empty() { + return Err(StoreError::Unknown(anyhow!( + "The subgraph `{}` cannot be used as the graft base \ + for `{}` because the schemas are incompatible:\n - {}", + &base_layout.catalog.site.namespace, + &layout.catalog.site.namespace, + errors.join("\n - ") + ))); + } + + pconn + .record_active_copy(base_layout.site.as_ref(), site.as_ref()) + .await?; + } } - } - - Ok((site, !site_was_created, node_id)) - } - .scope_boxed() - }) - .await? - }; - // If the deployment already exists, we don't need to perform any copying - // If it doesn't exist, we need to copy the graft base to the new deployment - if !exists { - let graft_base_layout = match graft_base { - Some(base) => Some(self.layout(base).await?), - None => None, - }; - - if let Some(graft_base_layout) = &graft_base_layout { - self.primary_conn() - .await? - .record_active_copy(graft_base_layout.site.as_ref(), site.as_ref()) - .await?; - } + Ok((site, deployment_store, node_id)) + } + .scope_boxed() + }) + .await? }; // Create the actual databases schema and metadata entries - let deployment_store = self - .stores - .get(&site.shard) - .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; - let index_def = if let Some(graft) = graft_base { if let Some(site) = self.sites.get(graft) { let store = self From e4e809acd912cc249ad63468f50c7c73217872a0 Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Tue, 27 Jan 2026 14:54:26 +0200 Subject: [PATCH 3/4] fix(store): handle orphaned sites in graft compatibility check Move graft compatibility validation out of primary transaction to avoid holding primary and shard connections simultaneously, which could exhaust connection pools when they share the same database. - Detect orphaned sites (site exists but deployment doesn't) and re-run the graft `can_copy_from` check on redeploy - Only insert into `active_copies` when a copy is actually needed, avoiding spurious records for already-copied deployments - Maintain idempotency: failed deployments leave state that will be properly validated on the next attempt Signed-off-by: Maksim Dimitrov --- store/postgres/src/subgraph_store.rs | 104 ++++++++++++++------------- 1 file changed, 55 insertions(+), 49 deletions(-) diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index d56c2e29792..74ec326de6c 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -354,57 +354,63 @@ impl SubgraphStore { let (shard, node_id) = self.place(&name, &network_name, node_id).await?; let mut pconn = self.primary_conn().await?; - pconn - .transaction(|pconn| { - async { - let (site, site_was_created) = pconn - .allocate_site(shard, schema.id(), network_name, graft_base) - .await?; - let node_id = pconn.assigned_node(&site).await?.unwrap_or(node_id); - let site = Arc::new(site); - let deployment_store = self - .stores - .get(&site.shard) - .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; - - if site_was_created { - if let Some(graft_base) = graft_base { - // Ensure that the graft base exists - let base_layout = self.layout(graft_base).await?; - let mut shard_conn = - deployment_store.get_replica_conn(ReplicaId::Main).await?; - let entities_with_causality_region = - deployment.manifest.entities_with_causality_region.clone(); - let catalog = Catalog::for_creation( - &mut shard_conn, - site.cheap_clone(), - entities_with_causality_region.into_iter().collect(), - ) - .await?; - let layout = Layout::new(site.cheap_clone(), schema, catalog)?; - - let errors = layout.can_copy_from(&base_layout); - if !errors.is_empty() { - return Err(StoreError::Unknown(anyhow!( - "The subgraph `{}` cannot be used as the graft base \ + + let (site, site_was_created) = pconn + .allocate_site(shard, schema.id(), network_name, graft_base) + .await?; + let node_id = pconn.assigned_node(&site).await?.unwrap_or(node_id); + let site = Arc::new(site); + let deployment_store = self + .stores + .get(&site.shard) + .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; + + let mut shard_conn = deployment_store.get_replica_conn(ReplicaId::Main).await?; + let needs_check = if site_was_created { + true + } else { + // If deployment does not exist, but site exists it means + // that we are recovering from a failed deployment creation with an orphaned site. + // In that case, we should check graft compatibility again. + let exists = crate::deployment::exists(&mut shard_conn, &site).await?; + !exists + }; + + if let Some(graft_base) = graft_base { + let base_layout = self.layout(graft_base).await?; + + if needs_check { + let entities_with_causality_region = + deployment.manifest.entities_with_causality_region.clone(); + let catalog = Catalog::for_creation( + &mut shard_conn, + site.cheap_clone(), + entities_with_causality_region.into_iter().collect(), + ) + .await?; + let layout = Layout::new(site.cheap_clone(), schema, catalog)?; + + let errors = layout.can_copy_from(&base_layout); + if !errors.is_empty() { + return Err(StoreError::Unknown(anyhow!( + "The subgraph `{}` cannot be used as the graft base \ for `{}` because the schemas are incompatible:\n - {}", - &base_layout.catalog.site.namespace, - &layout.catalog.site.namespace, - errors.join("\n - ") - ))); - } - - pconn - .record_active_copy(base_layout.site.as_ref(), site.as_ref()) - .await?; - } - } - - Ok((site, deployment_store, node_id)) + &base_layout.catalog.site.namespace, + &layout.catalog.site.namespace, + errors.join("\n - ") + ))); } - .scope_boxed() - }) - .await? + + // Only record active copy when the graft check passes and a copy is needed. + // If deployment already exists, the copy has either completed (no active_copies + // record) or is in progress (active_copies record already exists). + pconn + .record_active_copy(base_layout.site.as_ref(), site.as_ref()) + .await?; + } + } + + (site, deployment_store, node_id) }; // Create the actual databases schema and metadata entries From c50f89f8380d0ec65ecd8a63d1f6400bdb59658d Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Thu, 29 Jan 2026 17:23:00 +0200 Subject: [PATCH 4/4] store: Address comments and move evaluations until they are needed. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move deployment exists check into the graft_base branch so the DB exists call is only executed when we actually are creating a graft. Rename needs_check → should_validate and add a clear comment describing the validation cases. Avoid calling layout unless validation is required and keep shard connections short‑lived (acquire/drop per use) to reduce connection deadlock risk. Signed-off-by: Maksim Dimitrov --- store/postgres/src/subgraph_store.rs | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 74ec326de6c..b4b6a65edce 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -365,23 +365,21 @@ impl SubgraphStore { .get(&site.shard) .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; - let mut shard_conn = deployment_store.get_replica_conn(ReplicaId::Main).await?; - let needs_check = if site_was_created { - true - } else { - // If deployment does not exist, but site exists it means - // that we are recovering from a failed deployment creation with an orphaned site. - // In that case, we should check graft compatibility again. - let exists = crate::deployment::exists(&mut shard_conn, &site).await?; - !exists - }; - if let Some(graft_base) = graft_base { - let base_layout = self.layout(graft_base).await?; - - if needs_check { + // Perform schema compatibility validation when: + // - the site was just created (first-time deployment), or + // - the site exists but the deployment is missing (recovering from a failed create). + // The `exists` DB call is only made if `site_was_created` is false. + let should_validate = { + let mut shard_conn = deployment_store.get_replica_conn(ReplicaId::Main).await?; + site_was_created || !crate::deployment::exists(&mut shard_conn, &site).await? + }; + + if should_validate { + let base_layout = self.layout(graft_base).await?; let entities_with_causality_region = deployment.manifest.entities_with_causality_region.clone(); + let mut shard_conn = deployment_store.get_replica_conn(ReplicaId::Main).await?; let catalog = Catalog::for_creation( &mut shard_conn, site.cheap_clone(),