From 203a8cf18eb4524ac0b6db655648c845268f1449 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 14 Feb 2026 21:56:34 -0500 Subject: [PATCH 1/7] fix: handle complex projections in ordering validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, `get_projected_output_ordering` used `ordered_column_indices_from_projection` which was all-or-nothing: if any expression in the projection wasn't a simple Column, it returned None for the entire projection — even if the sort columns themselves were simple column refs. Replace it with `resolve_sort_column_projection` which only requires sort-column positions to resolve to simple Columns. Each ordering is now independently evaluated: orderings on simple column refs get validated with statistics even when other projection expressions are complex. Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_scan_config.rs | 107 +++++++++--------- 1 file changed, 56 insertions(+), 51 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index c3e5cabce7bc2..41893b20367b9 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1319,19 +1319,33 @@ impl DisplayAs for FileScanConfig { } } -/// Get the indices of columns in a projection if the projection is a simple -/// list of columns. -/// If there are any expressions other than columns, returns None. -fn ordered_column_indices_from_projection( +/// Build a projection index mapping for the sort columns in `ordering`. +/// +/// Returns a `Vec` of the same length as `projection`, where each entry +/// maps a projected-schema column index to the corresponding table-schema column +/// index. Only the entries referenced by sort columns in `ordering` are required +/// to be simple `Column` expressions; non-sort-column positions are filled with a +/// placeholder (0) since they will never be accessed by `MinMaxStatistics`. +/// +/// Returns `None` if any sort column is not a simple `Column` reference in the +/// projected ordering, or if its corresponding projection expression is not a +/// simple `Column`. +fn resolve_sort_column_projection( + ordering: &LexOrdering, projection: &ProjectionExprs, ) -> Option> { - projection - .expr_iter() - .map(|e| { - let index = e.as_any().downcast_ref::()?.index(); - Some(index) - }) - .collect::>>() + let proj_slice = projection.as_ref(); + let mut indices = vec![0usize; proj_slice.len()]; + + for sort_expr in ordering.iter() { + let col = sort_expr.expr.as_any().downcast_ref::()?; + let proj_idx = col.index(); + let proj_expr = proj_slice.get(proj_idx)?; + let table_col = proj_expr.expr.as_any().downcast_ref::()?; + indices[proj_idx] = table_col.index(); + } + + Some(indices) } /// Check whether a given ordering is valid for all file groups by verifying @@ -1445,47 +1459,38 @@ fn get_projected_output_ordering( let projected_orderings = project_orderings(&base_config.output_ordering, projected_schema); - let indices = base_config - .file_source - .projection() - .as_ref() - .map(|p| ordered_column_indices_from_projection(p)); - - match indices { - Some(Some(indices)) => { - // Simple column projection — validate with statistics - validate_orderings( - &projected_orderings, - projected_schema, - &base_config.file_groups, - Some(indices.as_slice()), - ) - } - None => { - // No projection — validate with statistics (no remapping needed) - validate_orderings( - &projected_orderings, - projected_schema, - &base_config.file_groups, - None, - ) - } - Some(None) => { - // Complex projection (expressions, not simple columns) — can't - // determine column indices for statistics. Still valid if all - // file groups have at most one file. - if base_config.file_groups.iter().all(|g| g.len() <= 1) { - projected_orderings - } else { - debug!( - "Skipping specified output orderings. \ - Some file groups couldn't be determined to be sorted: {:?}", - base_config.file_groups - ); - vec![] + let projection = base_config.file_source.projection(); + + projected_orderings + .into_iter() + .filter(|ordering| match projection.as_ref() { + None => { + // No projection — validate directly with statistics + is_ordering_valid_for_file_groups( + &base_config.file_groups, + ordering, + projected_schema, + None, + ) } - } - } + Some(proj) => match resolve_sort_column_projection(ordering, proj) { + Some(indices) => { + // All sort columns resolved — validate with statistics + is_ordering_valid_for_file_groups( + &base_config.file_groups, + ordering, + projected_schema, + Some(&indices), + ) + } + None => { + // Some sort column is a complex expression — can't + // look up statistics. Fall back to single-file check. + base_config.file_groups.iter().all(|g| g.len() <= 1) + } + }, + }) + .collect() } /// Convert type to a type suitable for use as a `ListingTable` From 304bba5a3de778991f96d5bf69f25641f6c92716 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 15 Feb 2026 11:23:03 -0500 Subject: [PATCH 2/7] fix: unify ordering display with optimization path Replace the independent display computation (get_projected_output_ordering) with orderings extracted from eq_properties().oeq_class(), so EXPLAIN output always matches what the optimizer actually sees. Previously, fmt_as() independently recomputed orderings via get_projected_output_ordering(), which validated post-projection and would drop valid orderings when any projection expression was complex (e.g. `a + 1`). Now both display and optimization use the same path: validate at table-schema level, then project through EquivalenceProperties::project(). - Delete get_projected_output_ordering and resolve_sort_column_projection - Update DataSource::fmt_as and DisplayAs::fmt_as to use eq_properties() - Add regression tests for complex projections with multi-file groups - Update SLT expectations for equivalence-aware ordering display Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_scan_config.rs | 389 ++++++++++++------ .../test_files/encrypted_parquet.slt | 7 +- .../sqllogictest/test_files/group_by.slt | 4 +- datafusion/sqllogictest/test_files/joins.slt | 2 +- .../test_files/monotonic_projection_test.slt | 4 +- .../sqllogictest/test_files/sort_pushdown.slt | 4 +- datafusion/sqllogictest/test_files/topk.slt | 2 +- datafusion/sqllogictest/test_files/union.slt | 2 +- datafusion/sqllogictest/test_files/window.slt | 10 +- 9 files changed, 273 insertions(+), 151 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 41893b20367b9..e938788e3597c 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -596,7 +596,8 @@ impl DataSource for FileScanConfig { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?; - let orderings = get_projected_output_ordering(self, &schema); + let eq_props = self.eq_properties(); + let orderings = eq_props.oeq_class(); write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; @@ -635,7 +636,7 @@ impl DataSource for FileScanConfig { write!(f, ", limit={limit}")?; } - display_orderings(f, &orderings)?; + display_orderings(f, orderings)?; if !self.constraints.is_empty() { write!(f, ", {}", self.constraints)?; @@ -1295,21 +1296,23 @@ impl Debug for FileScanConfig { impl DisplayAs for FileScanConfig { fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { - let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?; - let orderings = get_projected_output_ordering(self, &schema); + let eq_props = self.eq_properties(); + let orderings = eq_props.oeq_class(); write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - if !schema.fields().is_empty() { - write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; + if let Ok(schema) = self.projected_schema() { + if !schema.fields().is_empty() { + write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; + } } if let Some(limit) = self.limit { write!(f, ", limit={limit}")?; } - display_orderings(f, &orderings)?; + display_orderings(f, orderings)?; if !self.constraints.is_empty() { write!(f, ", {}", self.constraints)?; @@ -1319,35 +1322,6 @@ impl DisplayAs for FileScanConfig { } } -/// Build a projection index mapping for the sort columns in `ordering`. -/// -/// Returns a `Vec` of the same length as `projection`, where each entry -/// maps a projected-schema column index to the corresponding table-schema column -/// index. Only the entries referenced by sort columns in `ordering` are required -/// to be simple `Column` expressions; non-sort-column positions are filled with a -/// placeholder (0) since they will never be accessed by `MinMaxStatistics`. -/// -/// Returns `None` if any sort column is not a simple `Column` reference in the -/// projected ordering, or if its corresponding projection expression is not a -/// simple `Column`. -fn resolve_sort_column_projection( - ordering: &LexOrdering, - projection: &ProjectionExprs, -) -> Option> { - let proj_slice = projection.as_ref(); - let mut indices = vec![0usize; proj_slice.len()]; - - for sort_expr in ordering.iter() { - let col = sort_expr.expr.as_any().downcast_ref::()?; - let proj_idx = col.index(); - let proj_expr = proj_slice.get(proj_idx)?; - let table_col = proj_expr.expr.as_any().downcast_ref::()?; - indices[proj_idx] = table_col.index(); - } - - Some(indices) -} - /// Check whether a given ordering is valid for all file groups by verifying /// that files within each group are sorted according to their min/max statistics. /// @@ -1393,106 +1367,6 @@ fn validate_orderings( .collect() } -/// The various listing tables does not attempt to read all files -/// concurrently, instead they will read files in sequence within a -/// partition. This is an important property as it allows plans to -/// run against 1000s of files and not try to open them all -/// concurrently. -/// -/// However, it means if we assign more than one file to a partition -/// the output sort order will not be preserved as illustrated in the -/// following diagrams: -/// -/// When only 1 file is assigned to each partition, each partition is -/// correctly sorted on `(A, B, C)` -/// -/// ```text -/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓ -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃ -/// │ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃ -/// │ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ -/// DataFusion DataFusion DataFusion DataFusion -/// ┃ Partition 1 Partition 2 Partition 3 Partition 4 ┃ -/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ -/// -/// DataSourceExec -/// ``` -/// -/// However, when more than 1 file is assigned to each partition, each -/// partition is NOT correctly sorted on `(A, B, C)`. Once the second -/// file is scanned, the same values for A, B and C can be repeated in -/// the same sorted stream -/// -///```text -/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ -/// │ │ 1.parquet │ │ │ │ 2.parquet │ ┃ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ -/// │ └───────────────┘ │ │ └──────────────┘ ┃ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ -/// │ │ 3.parquet │ │ │ │ 4.parquet │ ┃ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ -/// │ └───────────────┘ │ │ └──────────────┘ ┃ -/// ┃ │ -/// │ │ │ ┃ -/// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ -/// DataFusion DataFusion ┃ -/// ┃ Partition 1 Partition 2 -/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛ -/// -/// DataSourceExec -/// ``` -fn get_projected_output_ordering( - base_config: &FileScanConfig, - projected_schema: &SchemaRef, -) -> Vec { - let projected_orderings = - project_orderings(&base_config.output_ordering, projected_schema); - - let projection = base_config.file_source.projection(); - - projected_orderings - .into_iter() - .filter(|ordering| match projection.as_ref() { - None => { - // No projection — validate directly with statistics - is_ordering_valid_for_file_groups( - &base_config.file_groups, - ordering, - projected_schema, - None, - ) - } - Some(proj) => match resolve_sort_column_projection(ordering, proj) { - Some(indices) => { - // All sort columns resolved — validate with statistics - is_ordering_valid_for_file_groups( - &base_config.file_groups, - ordering, - projected_schema, - Some(&indices), - ) - } - None => { - // Some sort column is a complex expression — can't - // look up statistics. Fall back to single-file check. - base_config.file_groups.iter().all(|g| g.len() <= 1) - } - }, - }) - .collect() -} - /// Convert type to a type suitable for use as a `ListingTable` /// partition column. Returns `Dictionary(UInt16, val_type)`, which is /// a reasonable trade off between a reasonable number of partition @@ -2588,4 +2462,247 @@ mod tests { Ok(()) } + + /// Helper: create a `PartitionedFile` with min/max stats for the given columns. + fn partitioned_file_with_stats( + name: &str, + col_stats: Vec<(ScalarValue, ScalarValue)>, + ) -> PartitionedFile { + let column_statistics: Vec = col_stats + .into_iter() + .map(|(min, max)| ColumnStatistics { + min_value: Precision::Exact(min), + max_value: Precision::Exact(max), + null_count: Precision::Exact(0), + ..ColumnStatistics::new_unknown() + }) + .collect(); + let stats = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Absent, + column_statistics, + }; + PartitionedFile::new(name, 1024).with_statistics(Arc::new(stats)) + } + + /// Regression test: with a complex projection like `a + 1`, the display + /// path should still show orderings (it delegates to `eq_properties()` + /// which validates at table-schema level, then projects correctly). + #[test] + fn test_display_ordering_with_complex_projection_multi_file() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let table_schema = TableSchema::new(Arc::clone(&schema), vec![]); + let file_source: Arc = + Arc::new(MockSource::new(table_schema.clone())); + + // Two files in one group, non-overlapping b statistics + let file1 = partitioned_file_with_stats( + "file1", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), // a + (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))), // b + ], + ); + let file2 = partitioned_file_with_stats( + "file2", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), // a + (ScalarValue::Int32(Some(11)), ScalarValue::Int32(Some(20))), // b + ], + ); + + let sort_b_asc = PhysicalSortExpr::new( + Arc::new(Column::new("b", 1)), + arrow::compute::SortOptions { + descending: false, + nulls_first: false, + }, + ); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::clone(&file_source), + ) + .with_file_groups(vec![FileGroup::new(vec![file1, file2])]) + .with_output_ordering(vec![LexOrdering::new(vec![sort_b_asc]).unwrap()]) + .build(); + + // Push a complex projection: [a + 1 AS x, b] + let expr_a_plus_1: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + )); + let exprs = ProjectionExprs::new(vec![ + ProjectionExpr::new(expr_a_plus_1, "x"), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"), + ]); + + let data_source = config + .try_swapping_with_projection(&exprs) + .unwrap() + .expect("projection swap should succeed"); + let new_config = data_source + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + // Format via DisplayAs and verify ordering is present + let display = format!( + "{}", + datafusion_physical_plan::display::VerboseDisplay(new_config.clone()) + ); + assert!( + display.contains("output_ordering="), + "Expected output_ordering in display, but got: {display}" + ); + assert!( + display.contains("b@1 ASC"), + "Expected 'b@1 ASC' in display, but got: {display}" + ); + } + + /// Verify orderings ARE dropped when file statistics overlap + /// (ordering is genuinely invalid for multi-file groups). + #[test] + fn test_display_ordering_dropped_for_overlapping_stats() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let table_schema = TableSchema::new(Arc::clone(&schema), vec![]); + let file_source: Arc = + Arc::new(MockSource::new(table_schema.clone())); + + // Two files in one group, OVERLAPPING b statistics + let file1 = partitioned_file_with_stats( + "file1", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))), + ], + ); + let file2 = partitioned_file_with_stats( + "file2", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(20))), // overlaps! + ], + ); + + let sort_b_asc = PhysicalSortExpr::new( + Arc::new(Column::new("b", 1)), + arrow::compute::SortOptions { + descending: false, + nulls_first: false, + }, + ); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::clone(&file_source), + ) + .with_file_groups(vec![FileGroup::new(vec![file1, file2])]) + .with_output_ordering(vec![LexOrdering::new(vec![sort_b_asc]).unwrap()]) + .build(); + + // Format and verify ordering is NOT present + let display = format!( + "{}", + datafusion_physical_plan::display::VerboseDisplay(config.clone()) + ); + assert!( + !display.contains("output_ordering"), + "Expected no output_ordering for overlapping stats, but got: {display}" + ); + } + + /// Verify the display path and optimization path agree: orderings from + /// `eq_properties().oeq_class()` match what appears in `fmt_as()` output. + #[test] + fn test_display_ordering_matches_eq_properties() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let table_schema = TableSchema::new(Arc::clone(&schema), vec![]); + let file_source: Arc = + Arc::new(MockSource::new(table_schema.clone())); + + // Non-overlapping b statistics across two files + let file1 = partitioned_file_with_stats( + "file1", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))), + ], + ); + let file2 = partitioned_file_with_stats( + "file2", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(11)), ScalarValue::Int32(Some(20))), + ], + ); + + let sort_b_asc = PhysicalSortExpr::new( + Arc::new(Column::new("b", 1)), + arrow::compute::SortOptions { + descending: false, + nulls_first: false, + }, + ); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::clone(&file_source), + ) + .with_file_groups(vec![FileGroup::new(vec![file1, file2])]) + .with_output_ordering(vec![LexOrdering::new(vec![sort_b_asc]).unwrap()]) + .build(); + + // Simple projection [a, b] + let exprs = ProjectionExprs::new(vec![ + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"), + ]); + + let data_source = config + .try_swapping_with_projection(&exprs) + .unwrap() + .expect("projection swap should succeed"); + let new_config = data_source + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + // Get orderings from eq_properties (what the optimizer sees) + let eq_props = new_config.eq_properties(); + let oeq_orderings = eq_props.oeq_class(); + assert!( + !oeq_orderings.is_empty(), + "eq_properties should report orderings for valid non-overlapping files" + ); + + // Get display output + let display = format!( + "{}", + datafusion_physical_plan::display::VerboseDisplay(new_config.clone()) + ); + + // Verify they agree: each ordering from eq_properties should appear in display + for ordering in oeq_orderings.iter() { + let ordering_str = format!("{ordering}"); + assert!( + display.contains(&ordering_str), + "Display should contain ordering '{ordering_str}', but got: {display}" + ); + } + } } diff --git a/datafusion/sqllogictest/test_files/encrypted_parquet.slt b/datafusion/sqllogictest/test_files/encrypted_parquet.slt index d580b7d1ad2b8..42985d6b2c6f5 100644 --- a/datafusion/sqllogictest/test_files/encrypted_parquet.slt +++ b/datafusion/sqllogictest/test_files/encrypted_parquet.slt @@ -85,5 +85,10 @@ float_field float ) STORED AS PARQUET LOCATION 'test_files/scratch/encrypted_parquet/' -query error DataFusion error: Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided +query RR SELECT * FROM parquet_table +---- +1 2 +5 6 +3 4 +-1 -1 diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 294841552a66d..539a6d96b1743 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -2274,7 +2274,7 @@ logical_plan 02)--TableScan: multiple_ordered_table projection=[a0, a, b, c, d] physical_plan 01)SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT a, b, ARRAY_AGG(d ORDER BY d) @@ -5001,7 +5001,7 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, b, c] physical_plan 01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[array_agg(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]], ordering_mode=Sorted -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true query II? SELECT a, b, ARRAY_AGG(c ORDER BY c DESC) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 2fb544a638d61..8b7073ef6c568 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3456,7 +3456,7 @@ logical_plan 05)----TableScan: annotated_data projection=[a0, a, b, c, d] physical_plan 01)NestedLoopJoinExec: join_type=Inner, filter=example(join_proj_push_down_1@0, join_proj_push_down_2@1) > 3, projection=[a0@0, a@1, b@2, c@3, d@4, a0@6, a@7, b@8, c@9, d@10] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d, CAST(a@1 AS Float64) as join_proj_push_down_1], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d, CAST(a@1 AS Float64) as join_proj_push_down_1], output_orderings=[[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [join_proj_push_down_1@5 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]], file_type=csv, has_header=true 03)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, CAST(a@1 AS Float64) as join_proj_push_down_2] 04)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt index 7feefc169fcab..3144977b678f2 100644 --- a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt +++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -97,7 +97,7 @@ logical_plan 01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_ordering=[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN @@ -109,7 +109,7 @@ logical_plan 01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_ordering=[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true # test for cast Utf8 diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 99f26b66d458b..d1599e769be16 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -404,7 +404,7 @@ logical_plan 01)Sort: timeseries_parquet.period_end ASC NULLS LAST, fetch=2 02)--Filter: timeseries_parquet.timeframe = Utf8View("quarterly") 03)----TableScan: timeseries_parquet projection=[timeframe, period_end, value], partial_filters=[timeseries_parquet.timeframe = Utf8View("quarterly")] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], limit=2, output_ordering=[timeframe@0 ASC NULLS LAST, period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly, pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], limit=2, output_ordering=[period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly, pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] # Test 2.4: Verify ASC results query TIR @@ -458,7 +458,7 @@ logical_plan 03)----TableScan: timeseries_parquet projection=[timeframe, period_end, value], partial_filters=[timeseries_parquet.timeframe = Utf8View("quarterly")] physical_plan 01)SortExec: TopK(fetch=2), expr=[period_end@1 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], output_ordering=[timeframe@0 ASC NULLS LAST, period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly AND DynamicFilter [ empty ], pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], output_ordering=[period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly AND DynamicFilter [ empty ], pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] # Results should still be correct query TIR diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 8a1fef0722297..3f135c25c86e3 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -371,7 +371,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[number@0 DESC, column5@4 ASC NULLS LAST], [column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age) query TT diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index d858d0ae3ea4e..d158fa50a107d 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -592,7 +592,7 @@ physical_plan 01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST] 02)--UnionExec 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], file_type=csv, has_header=true -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1a@0 as c1], file_type=csv, has_header=true +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1a@0 as c1], output_ordering=[c1@0 ASC NULLS LAST], file_type=csv, has_header=true statement ok drop table t1 diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 8ac8724683a8a..8584208ca8065 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3145,7 +3145,7 @@ physical_plan 11)--------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], mode=[Sorted] 12)----------------------SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST, c@3 ASC NULLS LAST], preserve_partitioning=[false] 13)------------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING], mode=[Sorted] -14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true +14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST]], file_type=csv, has_header=true query IIIIIIIIIIIIIII SELECT a, b, c, @@ -3507,7 +3507,7 @@ logical_plan physical_plan 01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 02)--FilterExec: b@2 = 0 -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true # Since column b is constant after filter b=0, # window requirement b ASC, d ASC can be satisfied @@ -3525,7 +3525,7 @@ physical_plan 01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 02)--SortExec: expr=[d@4 ASC NULLS LAST], preserve_partitioning=[false] 03)----FilterExec: b@2 = 0 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true # Create an unbounded source where there is multiple orderings. @@ -3561,7 +3561,7 @@ physical_plan 02)--BoundedWindowAggExec: wdw=[min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 03)----ProjectionExec: expr=[c@2 as c, d@3 as d, max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 04)------BoundedWindowAggExec: wdw=[max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT MAX(c) OVER(PARTITION BY d ORDER BY c ASC) as max_c @@ -3605,7 +3605,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 02)--BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true query I SELECT SUM(d) OVER(PARTITION BY c, a ORDER BY b ASC) From 6ec3b1b9bcdf7c5931fde52ec992fce89de1a47a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 15 Feb 2026 11:28:45 -0500 Subject: [PATCH 3/7] docs: move ASCII art diagrams to validated_output_ordering The partition/file ordering diagrams from the deleted get_projected_output_ordering are useful context for understanding why we validate orderings against file statistics. Move them to validated_output_ordering where they belong. Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_scan_config.rs | 74 ++++++++++++++++--- 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index e938788e3597c..3c7e8647006ca 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -930,16 +930,69 @@ impl FileScanConfig { /// Returns only the output orderings that are validated against actual /// file group statistics. /// + /// The various listing tables do not attempt to read all files + /// concurrently, instead they read files in sequence within a + /// partition. This is an important property as it allows plans to + /// run against 1000s of files and not try to open them all + /// concurrently. + /// + /// However, it means if we assign more than one file to a partition + /// the output sort order will not be preserved unless the files' + /// min/max statistics prove the combined stream is still ordered. + /// + /// When only 1 file is assigned to each partition, each partition is + /// correctly sorted on `(A, B, C)`: + /// + /// ```text + /// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓ + /// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐ + /// ┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃ + /// │ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │ + /// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃ + /// │ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │ + /// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ + /// Partition 1 Partition 2 Partition 3 Partition 4 + /// ┃ ┃ + /// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ + /// DataSourceExec + /// ``` + /// + /// However, when more than 1 file is assigned to each partition, each + /// partition is NOT necessarily sorted on `(A, B, C)`. Once the second + /// file is scanned, the same values for A, B and C can be repeated in + /// the same sorted stream: + /// + /// ```text + /// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ + /// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ + /// ┃ ┌───────────────┐ ┌──────────────┐ │ + /// │ │ 1.parquet │ │ │ │ 2.parquet │ ┃ + /// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ + /// │ └───────────────┘ │ │ └──────────────┘ ┃ + /// ┃ ┌───────────────┐ ┌──────────────┐ │ + /// │ │ 3.parquet │ │ │ │ 4.parquet │ ┃ + /// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ + /// │ └───────────────┘ │ │ └──────────────┘ ┃ + /// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ + /// Partition 1 Partition 2 ┃ + /// ┃ + /// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛ + /// DataSourceExec + /// ``` + /// /// For example, individual files may be ordered by `col1 ASC`, - /// but if we have files with these min/max statistics in a single partition / file group: + /// but if we have files with these min/max statistics in a single + /// partition / file group: /// /// - file1: min(col1) = 10, max(col1) = 20 /// - file2: min(col1) = 5, max(col1) = 15 /// - /// Because reading file1 followed by file2 would produce out-of-order output (there is overlap - /// in the ranges), we cannot retain `col1 ASC` as a valid output ordering. + /// Because reading file1 followed by file2 would produce out-of-order + /// output (there is overlap in the ranges), we cannot retain `col1 ASC` + /// as a valid output ordering. /// - /// Similarly this would not be a valid order (non-overlapping ranges but not ordered): + /// Similarly this would not be a valid order (non-overlapping ranges + /// but not ordered): /// /// - file1: min(col1) = 20, max(col1) = 30 /// - file2: min(col1) = 10, max(col1) = 15 @@ -949,13 +1002,14 @@ impl FileScanConfig { /// - file1: min(col1) = 5, max(col1) = 15 /// - file2: min(col1) = 16, max(col1) = 25 /// - /// Then we know that reading file1 followed by file2 will produce ordered output, - /// so `col1 ASC` would be retained. + /// Then we know that reading file1 followed by file2 will produce + /// ordered output, so `col1 ASC` would be retained. /// - /// Note that we are checking for ordering *within* *each* file group / partition, - /// files in different partitions are read independently and do not affect each other's ordering. - /// Merging of the multiple partition streams into a single ordered stream is handled - /// upstream e.g. by `SortPreservingMergeExec`. + /// Note that we are checking for ordering *within* *each* file group / + /// partition — files in different partitions are read independently and + /// do not affect each other's ordering. Merging of the multiple + /// partition streams into a single ordered stream is handled upstream + /// e.g. by `SortPreservingMergeExec`. fn validated_output_ordering(&self) -> Vec { let schema = self.file_source.table_schema().table_schema(); validate_orderings(&self.output_ordering, schema, &self.file_groups, None) From a0a474d6c93d881845a69c6ba8841e23747f12d2 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 15 Feb 2026 11:42:14 -0500 Subject: [PATCH 4/7] lint --- datafusion/datasource/src/file_scan_config.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 3c7e8647006ca..fc2f060d31574 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1356,10 +1356,10 @@ impl DisplayAs for FileScanConfig { write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - if let Ok(schema) = self.projected_schema() { - if !schema.fields().is_empty() { - write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; - } + if let Ok(schema) = self.projected_schema() + && !schema.fields().is_empty() + { + write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; } if let Some(limit) = self.limit { From 73af179eefdb1b260d556f251046edc562e7790f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 15 Feb 2026 13:26:37 -0500 Subject: [PATCH 5/7] fix --- datafusion/sqllogictest/test_files/encrypted_parquet.slt | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/datafusion/sqllogictest/test_files/encrypted_parquet.slt b/datafusion/sqllogictest/test_files/encrypted_parquet.slt index 42985d6b2c6f5..d580b7d1ad2b8 100644 --- a/datafusion/sqllogictest/test_files/encrypted_parquet.slt +++ b/datafusion/sqllogictest/test_files/encrypted_parquet.slt @@ -85,10 +85,5 @@ float_field float ) STORED AS PARQUET LOCATION 'test_files/scratch/encrypted_parquet/' -query RR +query error DataFusion error: Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided SELECT * FROM parquet_table ----- -1 2 -5 6 -3 4 --1 -1 From b1ca9c7f8bddef4744622092014e48afae0eec63 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 22 Feb 2026 11:03:34 +0000 Subject: [PATCH 6/7] Update datafusion/datasource/src/file_scan_config.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- datafusion/datasource/src/file_scan_config.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index fc2f060d31574..b1c4d494d1222 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1356,9 +1356,11 @@ impl DisplayAs for FileScanConfig { write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - if let Ok(schema) = self.projected_schema() - && !schema.fields().is_empty() - { + let schema = self + .projected_schema() + .map_err(|_| std::fmt::Error)?; + + if !schema.fields().is_empty() { write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; } From 9ceb271a8dc83520757ff7ab3a130cd3c02ff555 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 22 Feb 2026 11:11:53 +0000 Subject: [PATCH 7/7] Apply suggestion from @adriangb --- datafusion/datasource/src/file_scan_config.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index b1c4d494d1222..ef5b6497e3835 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1356,10 +1356,7 @@ impl DisplayAs for FileScanConfig { write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - let schema = self - .projected_schema() - .map_err(|_| std::fmt::Error)?; - + let schema = self.projected_schema().map_err(|_| std::fmt::Error)?; if !schema.fields().is_empty() { write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; }