From 60d5a81b11ff4a4bf3854e573145193d7c57f643 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 23 Feb 2026 17:37:46 -0500 Subject: [PATCH 01/11] DuckDB to use the Scan API Signed-off-by: Nicholas Gates --- vortex-duckdb/src/datasource.rs | 509 ++++++++++++++++++++++ vortex-duckdb/src/lib.rs | 16 +- vortex-duckdb/src/multi_file.rs | 61 +++ vortex-duckdb/src/scan.rs | 730 -------------------------------- vortex-duckdb/src/scan_api.rs | 332 --------------- 5 files changed, 575 insertions(+), 1073 deletions(-) create mode 100644 vortex-duckdb/src/datasource.rs create mode 100644 vortex-duckdb/src/multi_file.rs delete mode 100644 vortex-duckdb/src/scan.rs delete mode 100644 vortex-duckdb/src/scan_api.rs diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs new file mode 100644 index 00000000000..cc27e7cb241 --- /dev/null +++ b/vortex-duckdb/src/datasource.rs @@ -0,0 +1,509 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Reusable logic for driving a [`DataSourceRef`] scan through DuckDB's table function interface. +//! +//! Table functions that resolve to a [`DataSourceRef`] can implement [`DataSourceTableFunction`] +//! to get a blanket [`TableFunction`] implementation covering init, scan, progress, filter +//! pushdown, cardinality, partitioning, and virtual columns. + +use std::ffi::CString; +use std::fmt::Debug; +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; + +use custom_labels::CURRENT_LABELSET; +use futures::StreamExt; +use futures::TryStreamExt; +use itertools::Itertools; +use num_traits::AsPrimitive; +use vortex::VortexSessionDefault; +use vortex::array::ArrayRef; +use vortex::array::Canonical; +use vortex::array::ExecutionCtx; +use vortex::array::arrays::ScalarFnVTable; +use vortex::array::arrays::StructArray; +use vortex::array::arrays::StructVTable; +use vortex::array::optimizer::ArrayOptimizer; +use vortex::dtype::DType; +use vortex::dtype::FieldNames; +use vortex::error::VortexExpect; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex::expr::Expression; +use vortex::expr::Pack; +use vortex::expr::and_collect; +use vortex::expr::col; +use vortex::expr::root; +use vortex::expr::select; +use vortex::expr::stats::Precision; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::runtime::current::ThreadSafeIterator; +use vortex::metrics::tracing::get_global_labels; +use vortex::scan::api::DataSourceRef; +use vortex::scan::api::ScanRequest; +use vortex::session::VortexSession; +use vortex_utils::aliases::hash_set::HashSet; + +use crate::RUNTIME; +use crate::convert::try_from_bound_expression; +use crate::convert::try_from_table_filter; +use crate::duckdb::BindInputRef; +use crate::duckdb::BindResultRef; +use crate::duckdb::Cardinality; +use crate::duckdb::ClientContextRef; +use crate::duckdb::DataChunkRef; +use crate::duckdb::ExpressionRef; +use crate::duckdb::LogicalType; +use crate::duckdb::TableFilterSetRef; +use crate::duckdb::TableFunction; +use crate::duckdb::TableInitInput; +use crate::duckdb::VirtualColumnsResultRef; +use crate::exporter::ArrayExporter; +use crate::exporter::ConversionCache; + +// taken from duckdb/common/constants.h COLUMN_IDENTIFIER_EMPTY +// This is used by duckdb whenever there is no projection id in a logical_get node. +// For some reason we cannot return an empty DataChunk and duckdb will look for the virtual column +// with this index and create a data chunk with a single vector of that type. +static EMPTY_COLUMN_IDX: u64 = 18446744073709551614; +static EMPTY_COLUMN_NAME: &str = ""; + +/// A trait for table functions that resolve to a [`DataSourceRef`]. +/// +/// Implementors only need to define how parameters are declared and how binding produces a +/// data source. All other [`TableFunction`] methods (init, scan, progress, filter pushdown, +/// cardinality, partitioning, virtual columns) are provided by a blanket implementation. +pub(crate) trait DataSourceTableFunction: Sized + Debug { + /// Returns the positional parameters of the table function. + fn parameters() -> Vec { + vec![] + } + + /// Returns the named parameters of the table function, if any. + fn named_parameters() -> Vec<(CString, LogicalType)> { + vec![] + } + + /// Bind the table function and return a [`DataSourceRef`]. + fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult; +} + +/// Bind data produced by a [`DataSourceTableFunction`]. +pub struct DataSourceBindData { + data_source: DataSourceRef, + filter_exprs: Vec, + column_names: Vec, + column_types: Vec, +} + +impl Clone for DataSourceBindData { + fn clone(&self) -> Self { + Self { + data_source: self.data_source.clone(), + // filter_exprs are consumed once in `init_global`. + filter_exprs: vec![], + column_names: self.column_names.clone(), + column_types: self.column_types.clone(), + } + } +} + +impl Debug for DataSourceBindData { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DataSourceBindData") + .field("column_names", &self.column_names) + .field("column_types", &self.column_types) + .field( + "filter_exprs", + &self + .filter_exprs + .iter() + .map(|e| e.to_string()) + .collect_vec(), + ) + .finish() + } +} + +/// Global scan state for driving a `DataSource` scan through DuckDB. +pub struct DataSourceGlobal { + iterator: ThreadSafeIterator)>>, + batch_id: AtomicU64, + ctx: ExecutionCtx, + bytes_total: Arc, + bytes_read: AtomicU64, +} + +/// Per-thread local scan state. +pub struct DataSourceLocal { + iterator: ThreadSafeIterator)>>, + exporter: Option, + /// The unique batch id of the last chunk exported via scan(). + batch_id: Option, +} + +/// Returns scan progress as a percentage (0.0–100.0). +fn progress(bytes_read: &AtomicU64, bytes_total: &AtomicU64) -> f64 { + let read = bytes_read.load(Ordering::Relaxed); + let mut total = bytes_total.load(Ordering::Relaxed); + total += (total == 0) as u64; + read as f64 / total as f64 * 100. +} + +// --------------------------------------------------------------------------- +// Blanket TableFunction implementation for any DataSourceTableFunction +// --------------------------------------------------------------------------- + +impl TableFunction for T { + type BindData = DataSourceBindData; + type GlobalState = DataSourceGlobal; + type LocalState = DataSourceLocal; + + const PROJECTION_PUSHDOWN: bool = true; + const FILTER_PUSHDOWN: bool = true; + const FILTER_PRUNE: bool = true; + + fn parameters() -> Vec { + T::parameters() + } + + fn named_parameters() -> Vec<(CString, LogicalType)> { + T::named_parameters() + } + + fn bind( + ctx: &ClientContextRef, + input: &BindInputRef, + result: &mut BindResultRef, + ) -> VortexResult { + let data_source = T::bind(ctx, input)?; + + let (column_names, column_types) = extract_schema_from_dtype(data_source.dtype())?; + + for (column_name, column_type) in column_names.iter().zip(&column_types) { + result.add_result_column(column_name, column_type); + } + + Ok(DataSourceBindData { + data_source, + filter_exprs: vec![], + column_names, + column_types, + }) + } + + fn init_global(init_input: &TableInitInput) -> VortexResult { + let bind_data = init_input.bind_data(); + let projection_ids = init_input.projection_ids().unwrap_or(&[]); + let column_ids = init_input.column_ids(); + + let projection_expr = + extract_projection_expr(projection_ids, column_ids, &bind_data.column_names); + let filter_expr = extract_table_filter_expr( + init_input.table_filter_set(), + column_ids, + &bind_data.column_names, + &bind_data.filter_exprs, + bind_data.data_source.dtype(), + )?; + + tracing::debug!( + "Global init Vortex scan SELECT {} WHERE {}", + &projection_expr, + filter_expr + .as_ref() + .map_or_else(|| "true".to_string(), |f| f.to_string()) + ); + + let request = ScanRequest { + projection: Some(projection_expr), + filter: filter_expr, + ..Default::default() + }; + + let scan = RUNTIME.block_on(bind_data.data_source.scan(request))?; + let conversion_cache = Arc::new(ConversionCache::new(0)); + + let num_workers = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + + // Each split.execute() returns a lazy stream whose early polls do preparation + // work (expression resolution, layout traversal, first I/O spawns). We use + // try_flatten_unordered to poll multiple split streams concurrently so that + // the next split is already warm when the current one finishes. + let scan_streams = scan.partitions().map(move |split_result| { + let cache = conversion_cache.clone(); + let split = split_result?; + let s = split.execute()?; + VortexResult::Ok(s.map(move |r| Ok((r?, cache.clone()))).boxed()) + }); + + let iterator = RUNTIME.block_on_stream_thread_safe(move |_| { + scan_streams.try_flatten_unordered(Some(num_workers * 2)) + }); + + Ok(DataSourceGlobal { + iterator, + batch_id: AtomicU64::new(0), + ctx: ExecutionCtx::new(VortexSession::default()), + bytes_total: Arc::new(AtomicU64::new(0)), + bytes_read: AtomicU64::new(0), + }) + } + + fn init_local( + _init: &TableInitInput, + global: &mut Self::GlobalState, + ) -> VortexResult { + unsafe { + use custom_labels::sys; + + if sys::current().is_null() { + let ls = sys::new(0); + sys::replace(ls); + }; + } + + let global_labels = get_global_labels(); + + for (key, value) in global_labels { + CURRENT_LABELSET.set(key, value); + } + + Ok(DataSourceLocal { + iterator: global.iterator.clone(), + exporter: None, + batch_id: None, + }) + } + + fn scan( + _client_context: &ClientContextRef, + _bind_data: &Self::BindData, + local_state: &mut Self::LocalState, + global_state: &mut Self::GlobalState, + chunk: &mut DataChunkRef, + ) -> VortexResult<()> { + loop { + if local_state.exporter.is_none() { + let Some(result) = local_state.iterator.next() else { + return Ok(()); + }; + let (array_result, conversion_cache) = result?; + + let array_result = array_result.optimize_recursive()?; + let array_result = if let Some(array) = array_result.as_opt::() { + array.clone() + } else if let Some(array) = array_result.as_opt::() + && let Some(pack_options) = array.scalar_fn().as_opt::() + { + StructArray::new( + pack_options.names.clone(), + array.children(), + array.len(), + pack_options.nullability.into(), + ) + } else { + array_result + .execute::(&mut global_state.ctx)? + .into_struct() + }; + + local_state.exporter = Some(ArrayExporter::try_new( + &array_result, + &conversion_cache, + &mut global_state.ctx, + )?); + // Relaxed since there is no intra-instruction ordering required. + local_state.batch_id = + Some(global_state.batch_id.fetch_add(1, Ordering::Relaxed)); + } + + let exporter = local_state + .exporter + .as_mut() + .vortex_expect("error: exporter missing"); + + let has_more_data = exporter.export(chunk)?; + global_state + .bytes_read + .fetch_add(chunk.len(), Ordering::Relaxed); + + if !has_more_data { + // This exporter is fully consumed. + local_state.exporter = None; + local_state.batch_id = None; + } else { + break; + } + } + + assert!(!chunk.is_empty()); + + Ok(()) + } + + fn table_scan_progress( + _client_context: &ClientContextRef, + _bind_data: &mut Self::BindData, + global_state: &mut Self::GlobalState, + ) -> f64 { + progress(&global_state.bytes_read, &global_state.bytes_total) + } + + fn pushdown_complex_filter( + bind_data: &mut Self::BindData, + expr: &ExpressionRef, + ) -> VortexResult { + tracing::debug!("Attempting to push down filter expression: {expr}"); + let Some(expr) = try_from_bound_expression(expr)? else { + return Ok(false); + }; + bind_data.filter_exprs.push(expr); + + // NOTE(ngates): Vortex does indeed run exact filters, so in theory we should return `true` + // here to tell DuckDB we've handled the filter. However, DuckDB applies some crude + // cardinality estimation heuristics (e.g. an equality filter => 20% selectivity) that + // means by returning false, DuckDB runs an additional filter (a little bit of overhead) + // but tends to end up with a better query plan. + // If we plumb row count estimation into the layout tree, perhaps we could use zone maps + // etc. to return estimates. But this function is probably called too late anyway. Maybe + // we need our own cardinality heuristics. + Ok(false) + } + + fn cardinality(bind_data: &Self::BindData) -> Cardinality { + match bind_data.data_source.row_count() { + Some(Precision::Exact(v)) => Cardinality::Maximum(v), + Some(Precision::Inexact(v)) => Cardinality::Estimate(v), + None => Cardinality::Unknown, + } + } + + fn partition_data( + _bind_data: &Self::BindData, + _global_init_data: &mut Self::GlobalState, + local_init_data: &mut Self::LocalState, + ) -> VortexResult { + local_init_data + .batch_id + .ok_or_else(|| vortex_err!("batch id missing, no batches exported")) + } + + fn to_string(bind_data: &Self::BindData) -> Option> { + let mut result = Vec::new(); + + result.push(("Function".to_string(), "Vortex Scan".to_string())); + + if !bind_data.filter_exprs.is_empty() { + let mut filters = bind_data.filter_exprs.iter().map(|f| format!("{}", f)); + result.push(("Filters".to_string(), filters.join(" /\\\n"))); + } + + Some(result) + } + + fn virtual_columns(_bind_data: &Self::BindData, result: &mut VirtualColumnsResultRef) { + result.register(EMPTY_COLUMN_IDX, EMPTY_COLUMN_NAME, &LogicalType::bool()); + } +} + +// --------------------------------------------------------------------------- +// Helper functions +// --------------------------------------------------------------------------- + +/// Extracts DuckDB column names and logical types from a Vortex struct DType. +fn extract_schema_from_dtype(dtype: &DType) -> VortexResult<(Vec, Vec)> { + let struct_dtype = dtype + .as_struct_fields_opt() + .ok_or_else(|| vortex_err!("Vortex file must contain a struct array at the top level"))?; + + let mut column_names = Vec::new(); + let mut column_types = Vec::new(); + + for (field_name, field_dtype) in struct_dtype.names().iter().zip(struct_dtype.fields()) { + let logical_type = LogicalType::try_from(&field_dtype)?; + column_names.push(field_name.to_string()); + column_types.push(logical_type); + } + + Ok((column_names, column_types)) +} + +/// Creates a projection expression from raw projection/column ID slices and column names. +fn extract_projection_expr( + projection_ids: &[u64], + column_ids: &[u64], + column_names: &[String], +) -> Expression { + select( + projection_ids + .iter() + .map(|p| { + let idx: usize = p.as_(); + let val: usize = column_ids[idx].as_(); + val + }) + .map(|idx| { + column_names + .get(idx) + .vortex_expect("prune idx in column names") + }) + .map(|s| Arc::from(s.as_str())) + .collect::(), + root(), + ) +} + +/// Creates a table filter expression from the table filter set, column metadata, additional +/// filter expressions, and the top-level DType. +fn extract_table_filter_expr( + table_filter_set: Option<&TableFilterSetRef>, + column_ids: &[u64], + column_names: &[String], + additional_filters: &[Expression], + dtype: &DType, +) -> VortexResult> { + let mut table_filter_exprs: HashSet = if let Some(filter) = table_filter_set { + filter + .into_iter() + .map(|(idx, ex)| { + let idx_u: usize = idx.as_(); + let col_idx: usize = column_ids[idx_u].as_(); + let name = column_names.get(col_idx).vortex_expect("exists"); + try_from_table_filter(ex, &col(name.as_str()), dtype) + }) + .collect::>>>()? + .unwrap_or_else(HashSet::new) + } else { + HashSet::new() + }; + + table_filter_exprs.extend(additional_filters.iter().cloned()); + Ok(and_collect(table_filter_exprs.into_iter().collect_vec())) +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicU64; + use std::sync::atomic::Ordering::Relaxed; + + use super::progress; + + #[test] + fn test_table_scan_progress() { + let bytes_total = AtomicU64::new(100); + let bytes_read = AtomicU64::new(0); + + assert_eq!(progress(&bytes_read, &bytes_total), 0.0); + + bytes_read.fetch_add(100, Relaxed); + assert_eq!(progress(&bytes_read, &bytes_total), 100.); + + bytes_total.fetch_add(100, Relaxed); + assert!((progress(&bytes_read, &bytes_total) - 50.).abs() < f64::EPSILON); + } +} diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index a00c61fe319..5e321fa2040 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -20,15 +20,14 @@ use crate::duckdb::Database; use crate::duckdb::DatabaseRef; use crate::duckdb::LogicalType; use crate::duckdb::Value; -use crate::scan::VortexTableFunction; -use crate::scan_api::VortexScanApiTableFunction; +use crate::multi_file::VortexMultiFileScan; mod convert; +mod datasource; pub mod duckdb; mod exporter; mod filesystem; -mod scan; -mod scan_api; +mod multi_file; #[rustfmt::skip] #[path = "./cpp.rs"] @@ -55,13 +54,8 @@ pub fn initialize(db: &DatabaseRef) -> VortexResult<()> { LogicalType::varchar(), Value::from("vortex"), )?; - if std::env::var("VORTEX_USE_SCAN_API").is_ok_and(|v| v == "1") { - db.register_table_function::(c"vortex_scan")?; - db.register_table_function::(c"read_vortex")?; - } else { - db.register_table_function::(c"vortex_scan")?; - db.register_table_function::(c"read_vortex")?; - } + db.register_table_function::(c"vortex_scan")?; + db.register_table_function::(c"read_vortex")?; db.register_copy_function::(c"vortex", c"vortex") } diff --git a/vortex-duckdb/src/multi_file.rs b/vortex-duckdb/src/multi_file.rs new file mode 100644 index 00000000000..3ce2ed3856e --- /dev/null +++ b/vortex-duckdb/src/multi_file.rs @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::path::Path; +use std::sync::Arc; + +use url::Url; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex::file::multi::MultiFileDataSource; +use vortex::io::runtime::BlockingRuntime; +use vortex::scan::api::DataSourceRef; + +use crate::RUNTIME; +use crate::SESSION; +use crate::datasource::DataSourceTableFunction; +use crate::duckdb::BindInputRef; +use crate::duckdb::ClientContextRef; +use crate::duckdb::LogicalType; +use crate::filesystem::resolve_filesystem; + +/// Vortex multi-file scan table function (`vortex_scan` / `read_vortex`). +/// +/// Takes a file glob parameter and resolves it into a [`MultiFileDataSource`]. +/// All other table function logic is provided by the blanket [`DataSourceTableFunction`] +/// implementation. +#[derive(Debug)] +pub struct VortexMultiFileScan; + +impl DataSourceTableFunction for VortexMultiFileScan { + fn parameters() -> Vec { + vec![LogicalType::varchar()] + } + + fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult { + let glob_url_parameter = input + .get_parameter(0) + .ok_or_else(|| vortex_err!("Missing file glob parameter"))?; + + // Parse the URL and separate the base URL (keep scheme, host, etc.) from the path. + let glob_url_str = glob_url_parameter.as_string(); + let glob_url = match Url::parse(glob_url_str.as_str()) { + Ok(url) => Ok(url), + Err(_) => Url::from_file_path(Path::new(glob_url_str.as_str())) + .map_err(|_| vortex_err!("Neither URL nor path: '{}' ", glob_url_str.as_str())), + }?; + + let mut base_url = glob_url.clone(); + base_url.set_path(""); + + let fs = resolve_filesystem(&base_url, ctx)?; + + RUNTIME.block_on(async { + let builder = MultiFileDataSource::new(SESSION.clone()) + .with_filesystem(fs) + .with_glob(glob_url.path()); + let ds = builder.build().await?; + VortexResult::Ok(Arc::new(ds) as DataSourceRef) + }) + } +} diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs deleted file mode 100644 index 58e3a90fd4c..00000000000 --- a/vortex-duckdb/src/scan.rs +++ /dev/null @@ -1,730 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::cmp::max; -use std::ffi::CString; -use std::fmt; -use std::fmt::Debug; -use std::fmt::Formatter; -use std::path::Path; -use std::pin::Pin; -use std::sync::Arc; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; -use std::task::Context; -use std::task::Poll; - -use custom_labels::CURRENT_LABELSET; -use futures::FutureExt; -use futures::Stream; -use futures::StreamExt; -use futures::stream; -use futures::stream::BoxStream; -use futures::stream::SelectAll; -use itertools::Itertools; -use num_traits::AsPrimitive; -use url::Url; -use vortex::VortexSessionDefault; -use vortex::array::ArrayRef; -use vortex::array::Canonical; -use vortex::array::ExecutionCtx; -use vortex::array::arrays::ScalarFnVTable; -use vortex::array::arrays::StructArray; -use vortex::array::arrays::StructVTable; -use vortex::array::optimizer::ArrayOptimizer; -use vortex::dtype::FieldNames; -use vortex::error::VortexExpect; -use vortex::error::VortexResult; -use vortex::error::vortex_bail; -use vortex::error::vortex_err; -use vortex::expr::Expression as VortexExpression; -use vortex::expr::Pack; -use vortex::expr::and_collect; -use vortex::expr::col; -use vortex::expr::root; -use vortex::expr::select; -use vortex::file::OpenOptionsSessionExt; -use vortex::file::VortexFile; -use vortex::io::filesystem::FileListing; -use vortex::io::filesystem::FileSystemRef; -use vortex::io::runtime::BlockingRuntime; -use vortex::io::runtime::current::ThreadSafeIterator; -use vortex::metrics::tracing::get_global_labels; -use vortex::session::VortexSession; -use vortex_utils::aliases::hash_set::HashSet; - -use crate::RUNTIME; -use crate::SESSION; -use crate::convert::try_from_bound_expression; -use crate::convert::try_from_table_filter; -use crate::duckdb::BindInputRef; -use crate::duckdb::BindResultRef; -use crate::duckdb::Cardinality; -use crate::duckdb::ClientContextRef; -use crate::duckdb::DataChunkRef; -use crate::duckdb::ExpressionRef; -use crate::duckdb::ExtractedValue; -use crate::duckdb::LogicalType; -use crate::duckdb::TableFunction; -use crate::duckdb::TableInitInput; -use crate::duckdb::VirtualColumnsResultRef; -use crate::duckdb::footer_cache::FooterCache; -use crate::exporter::ArrayExporter; -use crate::exporter::ConversionCache; -use crate::filesystem::resolve_filesystem; - -pub struct VortexBindData { - file_system: FileSystemRef, - first_file: VortexFile, - filter_exprs: Vec, - files: Vec, - column_names: Vec, - column_types: Vec, -} - -impl Clone for VortexBindData { - /// `VortexBindData` is cloned in case of multiple scan nodes. - fn clone(&self) -> Self { - Self { - file_system: self.file_system.clone(), - first_file: self.first_file.clone(), - // filter_expr don't need to be cloned as they are consumed once in `init_global`. - filter_exprs: vec![], - files: self.files.clone(), - column_names: self.column_names.clone(), - column_types: self.column_types.clone(), - } - } -} - -impl Debug for VortexBindData { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("VortexBindData") - .field("file_system", &self.file_system) - .field("file_urls", &self.files) - .field("column_names", &self.column_names) - .field("column_types", &self.column_types) - .field("filter_expr", &self.filter_exprs) - .finish() - } -} - -pub struct VortexGlobalData { - iterator: ThreadSafeIterator)>>, - batch_id: AtomicU64, - ctx: ExecutionCtx, - bytes_total: Arc, - bytes_read: AtomicU64, -} - -impl VortexGlobalData { - pub(crate) fn new( - iterator: ThreadSafeIterator)>>, - ) -> Self { - Self { - iterator, - batch_id: AtomicU64::new(0), - ctx: ExecutionCtx::new(VortexSession::default()), - bytes_total: Arc::new(AtomicU64::new(0)), - bytes_read: AtomicU64::new(0), - } - } - - pub fn progress(&self) -> f64 { - let read = self.bytes_read.load(Ordering::Relaxed); - let mut total = self.bytes_total.load(Ordering::Relaxed); - total += (total == 0) as u64; - read as f64 / total as f64 * 100. // return 100. when nothing is read - } - - pub(crate) fn new_local(&self) -> VortexLocalData { - VortexLocalData { - iterator: self.iterator.clone(), - exporter: None, - batch_id: None, - } - } - - /// Shared scan logic: pulls arrays from the thread-safe iterator, converts them to struct - /// arrays, and exports them into DuckDB data chunks. - pub(crate) fn scan( - &mut self, - local_state: &mut VortexLocalData, - chunk: &mut DataChunkRef, - ) -> VortexResult<()> { - loop { - if local_state.exporter.is_none() { - let Some(result) = local_state.iterator.next() else { - return Ok(()); - }; - let (array_result, conversion_cache) = result?; - - let array_result = array_result.optimize_recursive()?; - let array_result = if let Some(array) = array_result.as_opt::() { - array.clone() - } else if let Some(array) = array_result.as_opt::() - && let Some(pack_options) = array.scalar_fn().as_opt::() - { - StructArray::new( - pack_options.names.clone(), - array.children(), - array.len(), - pack_options.nullability.into(), - ) - } else { - array_result - .execute::(&mut self.ctx)? - .into_struct() - }; - - local_state.exporter = Some(ArrayExporter::try_new( - &array_result, - &conversion_cache, - &mut self.ctx, - )?); - // Relaxed since there is no intra-instruction ordering required. - local_state.batch_id = Some(self.batch_id.fetch_add(1, Ordering::Relaxed)); - } - - let exporter = local_state - .exporter - .as_mut() - .vortex_expect("error: exporter missing"); - - let has_more_data = exporter.export(chunk)?; - self.bytes_read.fetch_add(chunk.len(), Ordering::Relaxed); - - if !has_more_data { - // This exporter is fully consumed. - local_state.exporter = None; - local_state.batch_id = None; - } else { - break; - } - } - - assert!(!chunk.is_empty()); - - Ok(()) - } - - pub(crate) fn partition_data(local_state: &VortexLocalData) -> VortexResult { - local_state - .batch_id - .ok_or_else(|| vortex_err!("batch id missing, no batches exported")) - } -} - -pub struct VortexLocalData { - iterator: ThreadSafeIterator)>>, - exporter: Option, - // The unique batch id the of the last chunk exported via scan() - batch_id: Option, -} - -#[derive(Debug)] -pub struct VortexTableFunction; - -/// Extracts DuckDB column names and logical types from a Vortex struct DType. -pub(crate) fn extract_schema_from_dtype( - dtype: &vortex::dtype::DType, -) -> VortexResult<(Vec, Vec)> { - let struct_dtype = dtype - .as_struct_fields_opt() - .ok_or_else(|| vortex_err!("Vortex file must contain a struct array at the top level"))?; - - let mut column_names = Vec::new(); - let mut column_types = Vec::new(); - - for (field_name, field_dtype) in struct_dtype.names().iter().zip(struct_dtype.fields()) { - let logical_type = LogicalType::try_from(&field_dtype)?; - column_names.push(field_name.to_string()); - column_types.push(logical_type); - } - - Ok((column_names, column_types)) -} - -/// Creates a projection expression based on the table initialization input. -fn extract_projection_expr(init: &TableInitInput) -> VortexExpression { - let projection_ids = init.projection_ids().unwrap_or(&[]); - let column_ids = init.column_ids(); - - select( - projection_ids - .iter() - .map(|p| { - let idx: usize = p.as_(); - let val: usize = column_ids[idx].as_(); - val - }) - .map(|idx| { - init.bind_data() - .column_names - .get(idx) - .vortex_expect("prune idx in column names") - }) - .map(|s| Arc::from(s.as_str())) - .collect::(), - root(), - ) -} - -/// Creates a table filter expression from the table filter set. -fn extract_table_filter_expr( - init: &TableInitInput, - column_ids: &[u64], -) -> VortexResult> { - let mut table_filter_exprs: HashSet = if let Some(filter) = - init.table_filter_set() - { - filter - .into_iter() - .map(|(idx, ex)| { - let idx_u: usize = idx.as_(); - let col_idx: usize = column_ids[idx_u].as_(); - let name = init - .bind_data() - .column_names - .get(col_idx) - .vortex_expect("exists"); - try_from_table_filter(ex, &col(name.as_str()), init.bind_data().first_file.dtype()) - }) - .collect::>>>()? - .unwrap_or_else(HashSet::new) - } else { - HashSet::new() - }; - - table_filter_exprs.extend(init.bind_data().filter_exprs.clone()); - Ok(and_collect(table_filter_exprs.into_iter().collect_vec())) -} - -// taken from duckdb/common/constants.h COLUMN_IDENTIFIER_EMPTY -// This is used by duckdb whenever there is no projection id in a logical_get node. -// For some reason we cannot return an empty DataChunk and duckdb will look for the virtual column -// with this index and create a data chunk with a single vector of that type. -pub(crate) static EMPTY_COLUMN_IDX: u64 = 18446744073709551614; -pub(crate) static EMPTY_COLUMN_NAME: &str = ""; - -/// Shared local state initialization for both `VortexTableFunction` and `VortexScanApiTableFunction`. -pub(crate) fn init_local_shared(global: &mut VortexGlobalData) -> VortexResult { - unsafe { - use custom_labels::sys; - - if sys::current().is_null() { - let ls = sys::new(0); - sys::replace(ls); - }; - } - - let global_labels = get_global_labels(); - - for (key, value) in global_labels { - CURRENT_LABELSET.set(key, value); - } - - Ok(global.new_local()) -} - -impl TableFunction for VortexTableFunction { - type BindData = VortexBindData; - type GlobalState = VortexGlobalData; - type LocalState = VortexLocalData; - - const PROJECTION_PUSHDOWN: bool = true; - const FILTER_PUSHDOWN: bool = true; - const FILTER_PRUNE: bool = true; - - /// Input parameter types of the `vortex_scan` table function. - /// - // `vortex_scan` takes a single file glob parameter. - fn parameters() -> Vec { - vec![LogicalType::varchar()] - } - - fn bind( - ctx: &ClientContextRef, - input: &BindInputRef, - result: &mut BindResultRef, - ) -> VortexResult { - let glob_url_parameter = input - .get_parameter(0) - .ok_or_else(|| vortex_err!("Missing file glob parameter"))?; - - // Parse the URL and separate the base URL (keep scheme, host, etc.) from the path. - let glob_url_str = glob_url_parameter.as_string(); - let glob_url = match Url::parse(glob_url_str.as_str()) { - Ok(url) => url, - Err(_) => { - // Otherwise, we assume it's a file path. - let path = if !glob_url_str.as_str().starts_with("/") { - // We cannot use Path::canonicalize to resolve relative paths since it requires the file to exist, and the glob may contain wildcards. Instead, we resolve relative paths against the current working directory. - let current_dir = std::env::current_dir().map_err(|e| { - vortex_err!( - "Cannot get current working directory to resolve relative path {}: {}", - glob_url_str.as_str(), - e - ) - })?; - current_dir.join(glob_url_str.as_str()) - } else { - Path::new(glob_url_str.as_str()).to_path_buf() - }; - - Url::from_file_path(path).map_err(|_| { - vortex_err!("Cannot convert path to URL: {}", glob_url_str.as_str()) - })? - } - }; - - let mut base_url = glob_url.clone(); - base_url.set_path(""); - - let fs: FileSystemRef = resolve_filesystem(&base_url, ctx)?; - - // Read the vortex_max_threads setting from DuckDB configuration - let max_threads_cstr = CString::new("vortex_max_threads") - .map_err(|e| vortex_err!("Invalid setting name: {}", e))?; - let max_threads = ctx - .try_get_current_setting(&max_threads_cstr) - .and_then(|v| match v.extract() { - ExtractedValue::UBigInt(val) => usize::try_from(val).ok(), - ExtractedValue::BigInt(val) if val > 0 => usize::try_from(val as u64).ok(), - _ => None, - }) - .unwrap_or_else(|| { - std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1) - }); - - tracing::trace!("running scan with max_threads {max_threads}"); - - let glob_pattern = glob_url - .path() - .strip_prefix("/") - .unwrap_or_else(|| glob_url.path()); - let files: Vec = RUNTIME - .block_on_stream(fs.glob(glob_pattern)?) - .try_collect()?; - - // The first file is skipped in `create_file_paths_queue`. - let Some(first_file_listing) = files.first() else { - vortex_bail!("No files matched the glob"); - }; - - let footer_cache = FooterCache::new(ctx.object_cache()); - let entry = footer_cache.entry(&first_file_listing.path); - let fs2 = fs.clone(); - let first_file = RUNTIME.block_on(async move { - let options = entry - .apply_to_file(SESSION.open_options()) - .with_some_file_size(first_file_listing.size); - let read_at = fs2.open_read(&first_file_listing.path).await?; - let file = options.open(read_at).await?; - entry.put_if_absent(|| file.footer().clone()); - VortexResult::Ok(file) - })?; - - let (column_names, column_types) = extract_schema_from_dtype(first_file.dtype())?; - - // Add result columns based on the extracted schema. - for (column_name, column_type) in column_names.iter().zip(&column_types) { - result.add_result_column(column_name, column_type); - } - - Ok(VortexBindData { - file_system: fs, - files, - first_file, - filter_exprs: vec![], - column_names, - column_types, - }) - } - - fn scan( - _client_context: &ClientContextRef, - _bind_data: &Self::BindData, - local_state: &mut Self::LocalState, - global_state: &mut Self::GlobalState, - chunk: &mut DataChunkRef, - ) -> VortexResult<()> { - global_state.scan(local_state, chunk) - } - - fn init_global(init_input: &TableInitInput) -> VortexResult { - let bind_data = init_input.bind_data(); - let projection_expr = extract_projection_expr(init_input); - let filter_expr = extract_table_filter_expr(init_input, init_input.column_ids())?; - - tracing::trace!( - "Global init Vortex scan SELECT {} WHERE {}", - &projection_expr, - filter_expr - .as_ref() - .map_or_else(|| "true".to_string(), |f| f.to_string()) - ); - - let client_context = init_input.client_context()?; - // SAFETY: The ObjectCache is owned by the DatabaseInstance and lives as long as the - // database. DuckDB keeps the database alive for the duration of any query execution. - let object_cache = unsafe { client_context.object_cache().erase_lifetime() }; - - let num_workers = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - - let bytes_total = Arc::new(AtomicU64::new(0)); - let bytes_total_copy = bytes_total.clone(); - - let handle = RUNTIME.handle(); - let fs = bind_data.file_system.clone(); - let first_file = bind_data.first_file.clone(); - let scan_streams = stream::iter(bind_data.files.clone()) - .enumerate() - .map(move |(idx, file_listing)| { - let fs = fs.clone(); - let first_file = first_file.clone(); - let filter_expr = filter_expr.clone(); - let projection_expr = projection_expr.clone(); - let conversion_cache = Arc::new(ConversionCache::new(idx as u64)); - let object_cache = object_cache; - - let bytes_total = bytes_total_copy.clone(); - handle - .spawn(async move { - let vxf = if idx == 0 { - // The first path from `file_paths` is skipped as - // the first file was already opened during bind. - Ok(first_file) - } else { - let cache = FooterCache::new(object_cache); - let entry = cache.entry(&file_listing.path); - let file = entry - .apply_to_file(SESSION.open_options()) - .with_some_file_size(file_listing.size) - .open(fs.open_read(&file_listing.path).await?) - .await?; - entry.put_if_absent(|| file.footer().clone()); - VortexResult::Ok(file) - }?; - - if let Some(ref filter) = filter_expr - && vxf.can_prune(filter)? - { - return Ok(None); - }; - let scan = vxf - .scan()? - .with_some_filter(filter_expr) - .with_projection(projection_expr) - .with_ordered(false) - .map(move |split| { - bytes_total.fetch_add(split.len() as u64, Ordering::Relaxed); - Ok((split, conversion_cache.clone())) - }) - .into_stream()? - .boxed(); - - Ok(Some(scan)) - }) - .boxed() - }) - // Open up to num_workers * 2 files concurrently so we always have one ready to go. - .buffer_unordered(num_workers * 2) - .filter_map(|result| async move { result.transpose() }); - - Ok(VortexGlobalData { - iterator: RUNTIME.block_on_stream_thread_safe(move |_| MultiScan { - streams: scan_streams.boxed(), - streams_finished: false, - select_all: Default::default(), - max_concurrency: num_workers * 2, - }), - batch_id: AtomicU64::new(0), - // TODO(joe): fetch this from somewhere??. - ctx: ExecutionCtx::new(VortexSession::default()), - bytes_read: AtomicU64::new(0), - bytes_total, - }) - } - - fn init_local( - _init: &TableInitInput, - global: &mut Self::GlobalState, - ) -> VortexResult { - init_local_shared(global) - } - - fn table_scan_progress( - _client_context: &ClientContextRef, - _bind_data: &mut Self::BindData, - global_state: &mut Self::GlobalState, - ) -> f64 { - global_state.progress() - } - - fn pushdown_complex_filter( - bind_data: &mut Self::BindData, - expr: &ExpressionRef, - ) -> VortexResult { - let Some(expr) = try_from_bound_expression(expr)? else { - return Ok(false); - }; - bind_data.filter_exprs.push(expr); - // It seems like there is a regression in the DuckDB planner we actually delete filters?? - // TODO(joe): file and issue and fix. - Ok(false) - } - - fn cardinality(bind_data: &Self::BindData) -> Cardinality { - if bind_data.files.len() == 1 { - Cardinality::Maximum(bind_data.first_file.row_count()) - } else { - // This is the same behavior as DuckDB's Parquet extension, although we could - // test multiplying the row count by the number of files. - Cardinality::Estimate( - max(bind_data.first_file.row_count(), 1) * bind_data.files.len() as u64, - ) - } - } - - fn partition_data( - _bind_data: &Self::BindData, - _global_init_data: &mut Self::GlobalState, - local_init_data: &mut Self::LocalState, - ) -> VortexResult { - VortexGlobalData::partition_data(local_init_data) - } - - fn to_string(bind_data: &Self::BindData) -> Option> { - let mut result = Vec::new(); - - // Add function name - result.push(("Function".to_string(), "Vortex Scan".to_string())); - - // Add file information - if !bind_data.files.is_empty() { - result.push(("Files".to_string(), bind_data.files.len().to_string())); - } - - // Add filter information - if !bind_data.filter_exprs.is_empty() { - let mut filters = bind_data.filter_exprs.iter().map(|f| format!("{}", f)); - result.push(("Filters".to_string(), filters.join(" /\\\n"))); - } - // NOTE: Projection is already printed by the planner. - - Some(result) - } - - fn virtual_columns(_bind_data: &Self::BindData, result: &mut VirtualColumnsResultRef) { - result.register(EMPTY_COLUMN_IDX, EMPTY_COLUMN_NAME, &LogicalType::bool()); - } -} - -struct MultiScan<'rt, T> { - // A stream-of-streams of scan results. - streams: BoxStream<'rt, VortexResult>>>, - streams_finished: bool, - // The SelectAll used to drive the inner streams. - select_all: SelectAll>>, - // The maximum number of streams to be driving concurrently. - max_concurrency: usize, -} - -impl<'rt, T: 'rt> Stream for MultiScan<'rt, T> { - type Item = VortexResult; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = &mut *self; - - loop { - // First, try to pull from the SelectAll of active streams. - // This means we prefer to complete existing work before starting new work, unless it - // all returns Poll::Pending. - match this.select_all.poll_next_unpin(cx) { - Poll::Ready(None) => { - if this.streams_finished { - // All streams are done - return Poll::Ready(None); - } - } - Poll::Ready(Some(result)) => return Poll::Ready(Some(result)), - Poll::Pending => { - // None of the active streams are ready right now. - } - } - - // If all current streams returned `Poll::Pending`, then we try to fetch the next - // stream to drive. The idea here is to ensure our executors are always busy with - // CPU work by driving as many streams necessary to keep the I/O queues full. - if this.select_all.len() < this.max_concurrency { - match Pin::new(&mut this.streams).poll_next(cx) { - Poll::Ready(Some(Ok(stream))) => { - // Add the new stream to SelectAll, and continue the loop to poll it. - this.select_all.push(stream); - continue; - } - Poll::Ready(Some(Err(e))) => { - // Error opening one of the streams - return Poll::Ready(Some(Err(e))); - } - Poll::Ready(None) => { - // No more streams available from the source - this.streams_finished = true; - if this.select_all.is_empty() { - // No active streams, so we're done. - return Poll::Ready(None); - } - return Poll::Pending; - } - Poll::Pending => { - // Can't get more streams right now - return Poll::Pending; - } - } - } else { - // We have enough active streams, so just wait for one of them to yield. - return Poll::Pending; - } - } - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - use std::sync::atomic::AtomicU64; - use std::sync::atomic::Ordering::Relaxed; - - use vortex::VortexSessionDefault as _; - use vortex::io::runtime::current::CurrentThreadRuntime; - use vortex::session::VortexSession; - use vortex_array::ExecutionCtx; - - use crate::scan::VortexGlobalData; - - #[test] - fn test_table_scan_progress() { - let iterator = - CurrentThreadRuntime::new().block_on_stream_thread_safe(|_| futures::stream::empty()); - let state = VortexGlobalData { - iterator, - batch_id: AtomicU64::new(0), - ctx: ExecutionCtx::new(VortexSession::default()), - bytes_total: Arc::new(AtomicU64::new(100)), - bytes_read: AtomicU64::new(0), - }; - - assert_eq!(state.progress(), 0.0); - - state.bytes_read.fetch_add(100, Relaxed); - assert_eq!(state.progress(), 100.); - - state.bytes_total.fetch_add(100, Relaxed); - assert!((state.progress() - 50.).abs() < f64::EPSILON); - } -} diff --git a/vortex-duckdb/src/scan_api.rs b/vortex-duckdb/src/scan_api.rs deleted file mode 100644 index 07f4baf0b91..00000000000 --- a/vortex-duckdb/src/scan_api.rs +++ /dev/null @@ -1,332 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::fmt; -use std::fmt::Debug; -use std::fmt::Formatter; -use std::path::Path; -use std::sync::Arc; - -use futures::StreamExt; -use futures::TryStreamExt; -use itertools::Itertools; -use num_traits::AsPrimitive; -use url::Url; -use vortex::dtype::FieldNames; -use vortex::error::VortexExpect; -use vortex::error::VortexResult; -use vortex::error::vortex_err; -use vortex::expr::Expression; -use vortex::expr::and_collect; -use vortex::expr::col; -use vortex::expr::root; -use vortex::expr::select; -use vortex::expr::stats::Precision; -use vortex::file::multi::MultiFileDataSource; -use vortex::io::runtime::BlockingRuntime; -use vortex::scan::api::DataSourceRef; -use vortex::scan::api::ScanRequest; -use vortex_utils::aliases::hash_set::HashSet; - -use crate::RUNTIME; -use crate::SESSION; -use crate::convert::try_from_bound_expression; -use crate::convert::try_from_table_filter; -use crate::duckdb::BindInputRef; -use crate::duckdb::BindResultRef; -use crate::duckdb::Cardinality; -use crate::duckdb::ClientContextRef; -use crate::duckdb::DataChunkRef; -use crate::duckdb::ExpressionRef; -use crate::duckdb::LogicalType; -use crate::duckdb::TableFunction; -use crate::duckdb::TableInitInput; -use crate::duckdb::VirtualColumnsResultRef; -use crate::exporter::ConversionCache; -use crate::filesystem::resolve_filesystem; -use crate::scan::EMPTY_COLUMN_IDX; -use crate::scan::EMPTY_COLUMN_NAME; -use crate::scan::VortexGlobalData; -use crate::scan::VortexLocalData; -use crate::scan::extract_schema_from_dtype; -use crate::scan::init_local_shared; - -/// Bind data for the scan API table function, holding a [`DataSourceRef`] instead of -/// per-file URLs. -pub struct VortexScanApiBindData { - data_source: DataSourceRef, - filter_exprs: Vec, - column_names: Vec, - column_types: Vec, -} - -impl Clone for VortexScanApiBindData { - fn clone(&self) -> Self { - Self { - data_source: self.data_source.clone(), - // filter_exprs are consumed once in `init_global`. - filter_exprs: vec![], - column_names: self.column_names.clone(), - column_types: self.column_types.clone(), - } - } -} - -impl Debug for VortexScanApiBindData { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("VortexScanApiBindData") - .field("column_names", &self.column_names) - .field("column_types", &self.column_types) - .field( - "filter_exprs", - &self - .filter_exprs - .iter() - .map(|e| e.to_string()) - .collect_vec(), - ) - .finish() - } -} - -#[derive(Debug)] -pub struct VortexScanApiTableFunction; - -/// Creates a projection expression from the table initialization input. -fn extract_projection_expr(init: &TableInitInput) -> Expression { - let projection_ids = init.projection_ids().unwrap_or(&[]); - let column_ids = init.column_ids(); - - select( - projection_ids - .iter() - .map(|p| { - let idx: usize = p.as_(); - let val: usize = column_ids[idx].as_(); - val - }) - .map(|idx| { - init.bind_data() - .column_names - .get(idx) - .vortex_expect("prune idx in column names") - }) - .map(|s| Arc::from(s.as_str())) - .collect::(), - root(), - ) -} - -/// Creates a table filter expression from the table filter set. -fn extract_table_filter_expr( - init: &TableInitInput, - column_ids: &[u64], -) -> VortexResult> { - let mut table_filter_exprs: HashSet = if let Some(filter) = init.table_filter_set() - { - filter - .into_iter() - .map(|(idx, ex)| { - let idx_u: usize = idx.as_(); - let col_idx: usize = column_ids[idx_u].as_(); - let name = init - .bind_data() - .column_names - .get(col_idx) - .vortex_expect("exists"); - try_from_table_filter( - ex, - &col(name.as_str()), - init.bind_data().data_source.dtype(), - ) - }) - .collect::>>>()? - .unwrap_or_else(HashSet::new) - } else { - HashSet::new() - }; - - table_filter_exprs.extend(init.bind_data().filter_exprs.clone()); - Ok(and_collect(table_filter_exprs.into_iter().collect_vec())) -} - -impl TableFunction for VortexScanApiTableFunction { - type BindData = VortexScanApiBindData; - type GlobalState = VortexGlobalData; - type LocalState = VortexLocalData; - - const PROJECTION_PUSHDOWN: bool = true; - const FILTER_PUSHDOWN: bool = true; - const FILTER_PRUNE: bool = true; - - fn parameters() -> Vec { - vec![LogicalType::varchar()] - } - - fn bind( - ctx: &ClientContextRef, - input: &BindInputRef, - result: &mut BindResultRef, - ) -> VortexResult { - let glob_url_parameter = input - .get_parameter(0) - .ok_or_else(|| vortex_err!("Missing file glob parameter"))?; - - // Parse the URL and separate the base URL (keep scheme, host, etc.) from the path. - let glob_url_str = glob_url_parameter.as_string(); - let glob_url = match Url::parse(glob_url_str.as_str()) { - Ok(url) => Ok(url), - Err(_) => Url::from_file_path(Path::new(glob_url_str.as_str())) - .map_err(|_| vortex_err!("Neither URL nor path: '{}' ", glob_url_str.as_str())), - }?; - - let mut base_url = glob_url.clone(); - base_url.set_path(""); - - let fs = resolve_filesystem(&base_url, ctx)?; - - let data_source: DataSourceRef = RUNTIME.block_on(async { - let builder = MultiFileDataSource::new(SESSION.clone()) - .with_filesystem(fs) - .with_glob(glob_url.path()); - let ds = builder.build().await?; - VortexResult::Ok(Arc::new(ds)) - })?; - - let (column_names, column_types) = extract_schema_from_dtype(data_source.dtype())?; - - for (column_name, column_type) in column_names.iter().zip(&column_types) { - result.add_result_column(column_name, column_type); - } - - Ok(VortexScanApiBindData { - data_source, - filter_exprs: vec![], - column_names, - column_types, - }) - } - - fn scan( - _client_context: &ClientContextRef, - _bind_data: &Self::BindData, - local_state: &mut Self::LocalState, - global_state: &mut Self::GlobalState, - chunk: &mut DataChunkRef, - ) -> VortexResult<()> { - global_state.scan(local_state, chunk) - } - - fn init_global(init_input: &TableInitInput) -> VortexResult { - let bind_data = init_input.bind_data(); - let projection_expr = extract_projection_expr(init_input); - let filter_expr = extract_table_filter_expr(init_input, init_input.column_ids())?; - - tracing::debug!( - "Global init Vortex scan_api SELECT {} WHERE {}", - &projection_expr, - filter_expr - .as_ref() - .map_or_else(|| "true".to_string(), |f| f.to_string()) - ); - - let request = ScanRequest { - projection: Some(projection_expr), - filter: filter_expr, - ..Default::default() - }; - - let scan = RUNTIME.block_on(bind_data.data_source.scan(request))?; - let conversion_cache = Arc::new(ConversionCache::new(0)); - - let num_workers = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - - // Each split.execute() returns a lazy stream whose early polls do preparation - // work (expression resolution, layout traversal, first I/O spawns). We use - // try_flatten_unordered to poll multiple split streams concurrently so that - // the next split is already warm when the current one finishes. - let scan_streams = scan.partitions().map(move |split_result| { - let cache = conversion_cache.clone(); - let split = split_result?; - let s = split.execute()?; - VortexResult::Ok(s.map(move |r| Ok((r?, cache.clone()))).boxed()) - }); - - let iterator = RUNTIME.block_on_stream_thread_safe(move |_| { - scan_streams.try_flatten_unordered(Some(num_workers * 2)) - }); - - Ok(VortexGlobalData::new(iterator)) - } - - fn init_local( - _init: &TableInitInput, - global: &mut Self::GlobalState, - ) -> VortexResult { - init_local_shared(global) - } - - fn table_scan_progress( - _client_context: &ClientContextRef, - _bind_data: &mut Self::BindData, - global_state: &mut Self::GlobalState, - ) -> f64 { - global_state.progress() - } - - fn pushdown_complex_filter( - bind_data: &mut Self::BindData, - expr: &ExpressionRef, - ) -> VortexResult { - tracing::debug!("Attempting to push down filter expression: {expr}"); - let Some(expr) = try_from_bound_expression(expr)? else { - return Ok(false); - }; - bind_data.filter_exprs.push(expr); - - // NOTE(ngates): Vortex does indeed run exact filters, so in theory we should return `true` - // here to tell DuckDB we've handled the filter. However, DuckDB applies some crude - // cardinality estimation heuristics (e.g. an equality filter => 20% selectivity) that - // means by returning false, DuckDB runs an additional filter (a little bit of overhead) - // but tends to end up with a better query plan. - // If we plumb row count estimation into the layout tree, perhaps we could use zone maps - // etc. to return estimates. But this function is probably called too late anyway. Maybe - // we need our own cardinality heuristics. - Ok(false) - } - - fn cardinality(bind_data: &Self::BindData) -> Cardinality { - match bind_data.data_source.row_count() { - Some(Precision::Exact(v)) => Cardinality::Maximum(v), - Some(Precision::Inexact(v)) => Cardinality::Estimate(v), - None => Cardinality::Unknown, - } - } - - fn partition_data( - _bind_data: &Self::BindData, - _global_init_data: &mut Self::GlobalState, - local_init_data: &mut Self::LocalState, - ) -> VortexResult { - VortexGlobalData::partition_data(local_init_data) - } - - fn to_string(bind_data: &Self::BindData) -> Option> { - let mut result = Vec::new(); - - result.push(("Function".to_string(), "Vortex Scan (scan API)".to_string())); - - if !bind_data.filter_exprs.is_empty() { - let mut filters = bind_data.filter_exprs.iter().map(|f| format!("{}", f)); - result.push(("Filters".to_string(), filters.join(" /\\\n"))); - } - - Some(result) - } - - fn virtual_columns(_bind_data: &Self::BindData, result: &mut VirtualColumnsResultRef) { - result.register(EMPTY_COLUMN_IDX, EMPTY_COLUMN_NAME, &LogicalType::bool()); - } -} From c43ab1f1c5817818484f54bdb03195b91b278cf7 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 23 Feb 2026 19:02:29 -0500 Subject: [PATCH 02/11] DuckDB to use the Scan API Signed-off-by: Nicholas Gates --- vortex-duckdb/src/datasource.rs | 3 +- vortex-duckdb/src/filesystem.rs | 2 +- vortex-file/src/multi/mod.rs | 2 +- vortex-scan/src/multi.rs | 58 +++++++++++++++++++++------------ 4 files changed, 41 insertions(+), 24 deletions(-) diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index cc27e7cb241..9c8676fc185 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -318,8 +318,7 @@ impl TableFunction for T { &mut global_state.ctx, )?); // Relaxed since there is no intra-instruction ordering required. - local_state.batch_id = - Some(global_state.batch_id.fetch_add(1, Ordering::Relaxed)); + local_state.batch_id = Some(global_state.batch_id.fetch_add(1, Ordering::Relaxed)); } let exporter = local_state diff --git a/vortex-duckdb/src/filesystem.rs b/vortex-duckdb/src/filesystem.rs index b10ae65b125..e4bb279a567 100644 --- a/vortex-duckdb/src/filesystem.rs +++ b/vortex-duckdb/src/filesystem.rs @@ -63,7 +63,7 @@ pub(super) fn resolve_filesystem( ctx.erase_lifetime() })) } else if fs_config.as_str() == "vortex" { - tracing::info!( + tracing::debug!( "Using Vortex's object store filesystem for URL scheme '{}'", base_url.scheme() ); diff --git a/vortex-file/src/multi/mod.rs b/vortex-file/src/multi/mod.rs index 0aca57a50b6..36a5812588f 100644 --- a/vortex-file/src/multi/mod.rs +++ b/vortex-file/src/multi/mod.rs @@ -135,7 +135,7 @@ impl MultiFileDataSource { }) .collect(); - let inner = MultiLayoutDataSource::with_first(first_reader, factories, &self.session); + let inner = MultiLayoutDataSource::new_with_first(first_reader, factories, &self.session); debug!(file_count, dtype = %inner.dtype(), "built MultiFileDataSource"); diff --git a/vortex-scan/src/multi.rs b/vortex-scan/src/multi.rs index 4ca7276afe4..f9424d03421 100644 --- a/vortex-scan/src/multi.rs +++ b/vortex-scan/src/multi.rs @@ -98,7 +98,7 @@ impl MultiLayoutDataSource { /// /// The first reader determines the dtype. Remaining readers are opened lazily during /// scanning via their factories. - pub fn with_first( + pub fn new_with_first( first: LayoutReaderRef, remaining: Vec>, session: &VortexSession, @@ -125,7 +125,7 @@ impl MultiLayoutDataSource { /// The dtype must be provided externally since there is no pre-opened reader to infer it /// from. This avoids eagerly opening any file when the schema is already known (e.g. from /// a catalog or a prior scan). - pub fn all_deferred( + pub fn new_deferred( dtype: DType, factories: Vec>, session: &VortexSession, @@ -270,28 +270,46 @@ impl DataSourceScan for MultiLayoutScan { concurrency, } = *self; + let ordered = request.ordered; + // Pre-opened readers are immediately available. let ready_stream = stream::iter(ready).map(Ok); - // Deferred readers are opened concurrently via spawned tasks, mirroring the DuckDB - // scan pattern of `buffer_unordered(num_workers * 2)`. - let deferred_stream = stream::iter(deferred) - .map(move |factory| { - handle.spawn(async move { - factory - .open() - .instrument(tracing::info_span!("LayoutReaderFactory::open")) - .await - }) + // Deferred readers are opened concurrently via spawned tasks. + // When ordered, we use `buffered` to preserve the original partition order. + // When unordered, we use `buffer_unordered` to yield partitions as they open. + let spawned = stream::iter(deferred).map(move |factory| { + handle.spawn(async move { + factory + .open() + .instrument(tracing::info_span!("LayoutReaderFactory::open")) + .await }) - .buffer_unordered(concurrency) - .filter_map(|result| async move { - match result { - Ok(Some(reader)) => Some(Ok(reader)), - Ok(None) => None, - Err(e) => Some(Err(e)), - } - }); + }); + + let deferred_stream = if ordered { + spawned + .buffered(concurrency) + .filter_map(|result| async move { + match result { + Ok(Some(reader)) => Some(Ok(reader)), + Ok(None) => None, + Err(e) => Some(Err(e)), + } + }) + .boxed() + } else { + spawned + .buffer_unordered(concurrency) + .filter_map(|result| async move { + match result { + Ok(Some(reader)) => Some(Ok(reader)), + Ok(None) => None, + Err(e) => Some(Err(e)), + } + }) + .boxed() + }; // For each reader (ready or just-opened), generate a partition. // Partition generation is synchronous (just creates structs with row ranges), so From 758ce2aa32e5c0d1641a31b6f73c5633306e74ed Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 23 Feb 2026 19:07:21 -0500 Subject: [PATCH 03/11] DuckDB to use the Scan API Signed-off-by: Nicholas Gates --- vortex-scan/public-api.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vortex-scan/public-api.lock b/vortex-scan/public-api.lock index 8234f990d24..50ca8143739 100644 --- a/vortex-scan/public-api.lock +++ b/vortex-scan/public-api.lock @@ -162,11 +162,11 @@ pub struct vortex_scan::multi::MultiLayoutDataSource impl vortex_scan::multi::MultiLayoutDataSource -pub fn vortex_scan::multi::MultiLayoutDataSource::all_deferred(dtype: vortex_array::dtype::DType, factories: alloc::vec::Vec>, session: &vortex_session::VortexSession) -> Self +pub fn vortex_scan::multi::MultiLayoutDataSource::new_deferred(dtype: vortex_array::dtype::DType, factories: alloc::vec::Vec>, session: &vortex_session::VortexSession) -> Self -pub fn vortex_scan::multi::MultiLayoutDataSource::with_concurrency(self, concurrency: usize) -> Self +pub fn vortex_scan::multi::MultiLayoutDataSource::new_with_first(first: vortex_layout::reader::LayoutReaderRef, remaining: alloc::vec::Vec>, session: &vortex_session::VortexSession) -> Self -pub fn vortex_scan::multi::MultiLayoutDataSource::with_first(first: vortex_layout::reader::LayoutReaderRef, remaining: alloc::vec::Vec>, session: &vortex_session::VortexSession) -> Self +pub fn vortex_scan::multi::MultiLayoutDataSource::with_concurrency(self, concurrency: usize) -> Self impl vortex_scan::api::DataSource for vortex_scan::multi::MultiLayoutDataSource From 35833e3b4823edef78608fa387ddc278cfbdab57 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 23 Feb 2026 20:32:16 -0500 Subject: [PATCH 04/11] Close before re-opening Signed-off-by: Nicholas Gates --- benchmarks/duckdb-bench/src/lib.rs | 33 ++++++++++++++++++++--------- benchmarks/duckdb-bench/src/main.rs | 2 +- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/benchmarks/duckdb-bench/src/lib.rs b/benchmarks/duckdb-bench/src/lib.rs index 332c6b03547..5b7c4abb6d6 100644 --- a/benchmarks/duckdb-bench/src/lib.rs +++ b/benchmarks/duckdb-bench/src/lib.rs @@ -20,13 +20,19 @@ use vortex_duckdb::duckdb::Database; /// DuckDB context for benchmarks. pub struct DuckClient { - pub db: Database, - pub connection: Connection, + db: Option, + connection: Option, pub db_path: PathBuf, pub threads: Option, } impl DuckClient { + pub fn connection(&self) -> &Connection { + self.connection + .as_ref() + .vortex_expect("DuckClient connection accessed after close") + } + /// Create a new DuckDB context with a database at `{data_url}/{format}/duckdb.db`. pub fn new( benchmark: &dyn Benchmark, @@ -55,8 +61,8 @@ impl DuckClient { let (db, connection) = Self::open_and_setup_database(Some(db_path.clone()), threads)?; Ok(Self { - db, - connection, + db: Some(db), + connection: Some(connection), db_path, threads, }) @@ -100,11 +106,18 @@ impl DuckClient { } pub fn reopen(&mut self) -> Result<()> { - let (mut db, mut connection) = + // Close the old database before opening a new one on the same file path. + // DuckDB cannot have two instances on the same file simultaneously — the new + // instance may read an inconsistent state, causing deserialization + // errors. Drop connection before database (connection depends on database). + self.connection.take(); + self.db.take(); + + let (db, connection) = Self::open_and_setup_database(Some(self.db_path.clone()), self.threads)?; - std::mem::swap(&mut self.connection, &mut connection); - std::mem::swap(&mut self.db, &mut db); + self.db = Some(db); + self.connection = Some(connection); Ok(()) } @@ -117,8 +130,8 @@ impl DuckClient { let db_path = dir.join("duckdb.db"); let (db, connection) = Self::open_and_setup_database(Some(db_path.clone()), None)?; Ok(Self { - db, - connection, + db: Some(db), + connection: Some(connection), db_path, threads: None, }) @@ -130,7 +143,7 @@ impl DuckClient { pub fn execute_query(&self, query: &str) -> Result<(usize, Option)> { trace!("execute duckdb query: {query}"); let time_instant = Instant::now(); - let result = self.connection.query(query)?; + let result = self.connection().query(query)?; let query_time = time_instant.elapsed(); let row_count = usize::try_from(result.row_count()).vortex_expect("row count overflow"); diff --git a/benchmarks/duckdb-bench/src/main.rs b/benchmarks/duckdb-bench/src/main.rs index 96943f8064d..949f9b89964 100644 --- a/benchmarks/duckdb-bench/src/main.rs +++ b/benchmarks/duckdb-bench/src/main.rs @@ -145,7 +145,7 @@ fn main() -> anyhow::Result<()> { println!("=== Q{query_idx} [{format}] ==="); println!("{query}"); println!(); - let result = ctx.connection.query(&format!("EXPLAIN {query}"))?; + let result = ctx.connection().query(&format!("EXPLAIN {query}"))?; for chunk in result { let chunk_str = String::try_from(chunk.deref()).unwrap_or_else(|_| "".to_string()); From eda930cfdd31e6151b39d4d191962f9533a96aa1 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 23 Feb 2026 21:53:00 -0500 Subject: [PATCH 05/11] Spawn partitions and push into single rx channel Signed-off-by: Nicholas Gates --- vortex-duckdb/src/datasource.rs | 76 +++++++++++++++++++++++------ vortex-duckdb/src/exporter/cache.rs | 15 ------ vortex-duckdb/src/exporter/dict.rs | 4 +- 3 files changed, 61 insertions(+), 34 deletions(-) diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index 9c8676fc185..891e215fa6c 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -14,8 +14,9 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use custom_labels::CURRENT_LABELSET; +use futures::SinkExt; use futures::StreamExt; -use futures::TryStreamExt; +use futures::stream::FuturesUnordered; use itertools::Itertools; use num_traits::AsPrimitive; use vortex::VortexSessionDefault; @@ -39,6 +40,7 @@ use vortex::expr::root; use vortex::expr::select; use vortex::expr::stats::Precision; use vortex::io::runtime::BlockingRuntime; +use vortex::io::runtime::Task; use vortex::io::runtime::current::ThreadSafeIterator; use vortex::metrics::tracing::get_global_labels; use vortex::scan::api::DataSourceRef; @@ -224,26 +226,68 @@ impl TableFunction for T { }; let scan = RUNTIME.block_on(bind_data.data_source.scan(request))?; - let conversion_cache = Arc::new(ConversionCache::new(0)); + let handle = RUNTIME.handle(); let num_workers = std::thread::available_parallelism() .map(|n| n.get()) .unwrap_or(1); - // Each split.execute() returns a lazy stream whose early polls do preparation - // work (expression resolution, layout traversal, first I/O spawns). We use - // try_flatten_unordered to poll multiple split streams concurrently so that - // the next split is already warm when the current one finishes. - let scan_streams = scan.partitions().map(move |split_result| { - let cache = conversion_cache.clone(); - let split = split_result?; - let s = split.execute()?; - VortexResult::Ok(s.map(move |r| Ok((r?, cache.clone()))).boxed()) - }); - - let iterator = RUNTIME.block_on_stream_thread_safe(move |_| { - scan_streams.try_flatten_unordered(Some(num_workers * 2)) - }); + // Create a result channel so all workers can return whatever the next chunk is regardless + // of which partition it came from. + let (tx, rx) = futures::channel::mpsc::channel(num_workers * 2); + + // We want to drive `num_workers` partitions concurrently, and ensure we have the same + // number ready to go as soon as one finishes, so we spawn up to `num_workers * 2` tasks. + let stream = scan + .partitions() + // We spawn the partition execution on the CPU runtime in case it does anything + // expensive. + .map(|partition| RUNTIME.handle().spawn_cpu(move || partition?.execute())) + // We then buffer `num_workers` partitions so that we make sure we have this many + // partitions open in the background. + .buffer_unordered(num_workers) + // For each partition stream, we spawn a task that drives it and sends the resulting arrays through a channel to the local scan tasks. We clone the conversion cache for each partition task so that they can populate it in parallel. + .map(move |stream| { + // We create a new conversion cache scoped to the partition, since there's no point + // caching anything across partitions. + let conversion_cache = Arc::new(ConversionCache::default()); + let mut tx = tx.clone(); + handle.spawn(async move { + match stream { + Ok(mut stream) => { + while let Some(array) = stream.next().await { + if tx + .send(array.map(|a| (a, conversion_cache.clone()))) + .await + .is_err() + { + // If the receiver is dropped, we can stop processing and exit the task. + break; + }; + } + } + Err(e) => { + // If we fail to execute the partition, we send the error through the channel and exit the task. + let _ = tx.send(Err(e)).await; + } + } + }) + }) + .buffer_unordered(num_workers) + .boxed(); + + // Spawn the stream to completion while works await on the rx channel. + RUNTIME + .handle() + .spawn(async move { + stream.collect::<()>().await; + }) + .detach(); + + // Spawn a task per partition so that multiple executor threads can drive different + // partitions simultaneously, rather than funnelling all polling through a single + // stream-driving task. + let iterator = RUNTIME.block_on_stream_thread_safe(|_handle| rx); Ok(DataSourceGlobal { iterator, diff --git a/vortex-duckdb/src/exporter/cache.rs b/vortex-duckdb/src/exporter/cache.rs index 8d0d2afb7d3..d7e49845bd5 100644 --- a/vortex-duckdb/src/exporter/cache.rs +++ b/vortex-duckdb/src/exporter/cache.rs @@ -19,19 +19,4 @@ use crate::duckdb::Vector; pub struct ConversionCache { pub values_cache: DashMap>)>, pub canonical_cache: DashMap, - // A value which must be unique for a given DuckDB operator. - instance_id: u64, -} - -impl ConversionCache { - pub fn new(id: u64) -> Self { - Self { - instance_id: id, - ..Self::default() - } - } - - pub fn instance_id(&self) -> u64 { - self.instance_id - } } diff --git a/vortex-duckdb/src/exporter/dict.rs b/vortex-duckdb/src/exporter/dict.rs index ee561fb8056..0ea2bdebcd5 100644 --- a/vortex-duckdb/src/exporter/dict.rs +++ b/vortex-duckdb/src/exporter/dict.rs @@ -37,7 +37,6 @@ struct DictExporter { values_len: u32, codes: PrimitiveArray, codes_type: PhantomData, - cache_id: u64, value_id: usize, } @@ -133,7 +132,6 @@ pub(crate) fn new_exporter_with_flatten( values_len: values.len().as_u32(), codes, codes_type: PhantomData::, - cache_id: cache.instance_id(), value_id: values_key, })) }) @@ -167,7 +165,7 @@ impl> ColumnExporter for DictExporter { // Use a unique id for each dictionary data array -- telling duckdb that // the dict value vector is the same as reuse the hash in a join. - vector.set_dictionary_id(format!("{}-{}", self.cache_id, self.value_id)); + vector.set_dictionary_id(format!("{}", self.value_id)); Ok(()) } From 972787938a00034cf100ead42c62d9c15ae101c7 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 23 Feb 2026 22:13:51 -0500 Subject: [PATCH 06/11] Spawn partitions and push into single rx channel Signed-off-by: Nicholas Gates --- Cargo.lock | 1 + vortex-duckdb/Cargo.toml | 1 + vortex-duckdb/src/datasource.rs | 76 ++++++++++++--------------------- 3 files changed, 30 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5ff72208310..62ac7ca8096 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10284,6 +10284,7 @@ dependencies = [ "glob", "itertools 0.14.0", "jiff", + "kanal", "num-traits", "object_store", "once_cell", diff --git a/vortex-duckdb/Cargo.toml b/vortex-duckdb/Cargo.toml index d84096e45d0..eb12d25a216 100644 --- a/vortex-duckdb/Cargo.toml +++ b/vortex-duckdb/Cargo.toml @@ -32,6 +32,7 @@ custom-labels = { workspace = true } futures = { workspace = true } glob = { workspace = true } itertools = { workspace = true } +kanal = { workspace = true } num-traits = { workspace = true } object_store = { workspace = true, features = ["aws"] } parking_lot = { workspace = true } diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index 891e215fa6c..3f39f87b3fb 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -14,9 +14,7 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use custom_labels::CURRENT_LABELSET; -use futures::SinkExt; use futures::StreamExt; -use futures::stream::FuturesUnordered; use itertools::Itertools; use num_traits::AsPrimitive; use vortex::VortexSessionDefault; @@ -39,8 +37,8 @@ use vortex::expr::col; use vortex::expr::root; use vortex::expr::select; use vortex::expr::stats::Precision; +use vortex::io::kanal_ext::KanalExt; use vortex::io::runtime::BlockingRuntime; -use vortex::io::runtime::Task; use vortex::io::runtime::current::ThreadSafeIterator; use vortex::metrics::tracing::get_global_labels; use vortex::scan::api::DataSourceRef; @@ -226,68 +224,50 @@ impl TableFunction for T { }; let scan = RUNTIME.block_on(bind_data.data_source.scan(request))?; - let handle = RUNTIME.handle(); let num_workers = std::thread::available_parallelism() .map(|n| n.get()) .unwrap_or(1); - // Create a result channel so all workers can return whatever the next chunk is regardless - // of which partition it came from. - let (tx, rx) = futures::channel::mpsc::channel(num_workers * 2); + // We create an async bounded channel so that all thread-local workers can pull the next + // available array chunk regardless of which partition it came from. + let (tx, rx) = kanal::bounded_async(num_workers * 2); - // We want to drive `num_workers` partitions concurrently, and ensure we have the same - // number ready to go as soon as one finishes, so we spawn up to `num_workers * 2` tasks. + // We drive one partition per worker thread. Each partition is driven as a spawned task + // that pushes array chunks into the shared channel as they are produced. This spawning + // allows all worker threads to drive the polling of all partitions, and then return the + // first available array chunk. let stream = scan .partitions() - // We spawn the partition execution on the CPU runtime in case it does anything - // expensive. - .map(|partition| RUNTIME.handle().spawn_cpu(move || partition?.execute())) - // We then buffer `num_workers` partitions so that we make sure we have this many - // partitions open in the background. - .buffer_unordered(num_workers) - // For each partition stream, we spawn a task that drives it and sends the resulting arrays through a channel to the local scan tasks. We clone the conversion cache for each partition task so that they can populate it in parallel. - .map(move |stream| { + .map(move |partition| { // We create a new conversion cache scoped to the partition, since there's no point // caching anything across partitions. - let conversion_cache = Arc::new(ConversionCache::default()); - let mut tx = tx.clone(); - handle.spawn(async move { - match stream { - Ok(mut stream) => { - while let Some(array) = stream.next().await { - if tx - .send(array.map(|a| (a, conversion_cache.clone()))) - .await - .is_err() - { - // If the receiver is dropped, we can stop processing and exit the task. - break; - }; - } - } + let cache = Arc::new(ConversionCache::default()); + let tx = tx.clone(); + + RUNTIME.handle().spawn(async move { + let mut stream = match partition.and_then(|p| p.execute()) { + Ok(s) => s, Err(e) => { - // If we fail to execute the partition, we send the error through the channel and exit the task. let _ = tx.send(Err(e)).await; + return; + } + }; + while let Some(item) = stream.next().await { + if tx.send(item.map(|a| (a, cache.clone()))).await.is_err() { + // Exit early if the receiver has been dropped, which happens when the + // scan is complete or if an error has occurred in another partition. + return; } } }) }) - .buffer_unordered(num_workers) - .boxed(); - - // Spawn the stream to completion while works await on the rx channel. - RUNTIME - .handle() - .spawn(async move { - stream.collect::<()>().await; - }) - .detach(); + .buffer_unordered(num_workers); + + // Spawn a task to drive the partition stream and push array chunks into the channel. + RUNTIME.handle().spawn(stream.collect::<()>()).detach(); - // Spawn a task per partition so that multiple executor threads can drive different - // partitions simultaneously, rather than funnelling all polling through a single - // stream-driving task. - let iterator = RUNTIME.block_on_stream_thread_safe(|_handle| rx); + let iterator = RUNTIME.block_on_stream_thread_safe(|_handle| rx.into_stream()); Ok(DataSourceGlobal { iterator, From e1b43589f0d6c16b6245d2ee7a3a0c0e84fea3fe Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 23 Feb 2026 22:18:24 -0500 Subject: [PATCH 07/11] Spawn partitions and push into single rx channel Signed-off-by: Nicholas Gates --- vortex-duckdb/src/datasource.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index 3f39f87b3fb..b0cd89c6557 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -262,6 +262,11 @@ impl TableFunction for T { } }) }) + // TODO(ngates): I'd like to run an experiment with a custom buffer_unordered + // implementation that re-fills the queue before returning a value. The regular + // implementation will only re-fill the queue on the subsequent poll after a value + // is returned, which means we have to wait for one round of polling before all + // workers are driving the scan. .buffer_unordered(num_workers); // Spawn a task to drive the partition stream and push array chunks into the channel. From 13913494c563ab96ad88230cb51a76dfef4d2743 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 24 Feb 2026 09:52:27 -0500 Subject: [PATCH 08/11] merge Signed-off-by: Nicholas Gates --- .../bench_orchestrator/runner/executor.py | 4 + benchmarks/duckdb-bench/src/main.rs | 13 ++- vortex-datafusion/src/v2/source.rs | 2 +- vortex-duckdb/src/datasource.rs | 2 +- vortex-file/src/footer/file_statistics.rs | 9 ++ vortex-file/src/v2/file_stats_reader.rs | 99 +++++++++---------- vortex-scan/public-api.lock | 2 +- vortex-scan/src/api.rs | 20 +++- vortex-scan/src/layout.rs | 6 +- vortex-scan/src/multi.rs | 66 ++++++------- 10 files changed, 124 insertions(+), 99 deletions(-) diff --git a/bench-orchestrator/bench_orchestrator/runner/executor.py b/bench-orchestrator/bench_orchestrator/runner/executor.py index 40eced50b9a..a80080d8d5a 100644 --- a/bench-orchestrator/bench_orchestrator/runner/executor.py +++ b/bench-orchestrator/bench_orchestrator/runner/executor.py @@ -87,6 +87,10 @@ def run( else: cmd = cmd_prefix + cmd + if samply and self.engine == Engine.DUCKDB: + # Re-use the same DuckDB instance across runs make samply output readable + cmd += ["--reuse"] + if self.verbose: console.print(f"[dim]$ {' '.join(cmd)}[/dim]") diff --git a/benchmarks/duckdb-bench/src/main.rs b/benchmarks/duckdb-bench/src/main.rs index 949f9b89964..2bd5b0b99c1 100644 --- a/benchmarks/duckdb-bench/src/main.rs +++ b/benchmarks/duckdb-bench/src/main.rs @@ -73,6 +73,14 @@ struct Args { /// Print EXPLAIN output for each query instead of running benchmarks. #[arg(long, default_value_t = false)] explain: bool, + + #[arg( + long, + default_value_t = false, + help = "Whether to reuse the DuckDB connection across iterations. Helpful when profiling \ + to keep all work on the same threads" + )] + reuse: bool, } fn main() -> anyhow::Result<()> { @@ -187,8 +195,11 @@ fn main() -> anyhow::Result<()> { ("benchmark_name", benchmark_name.clone()), ("query_idx", query_idx.to_string()), ]); + // Make sure to reopen the duckdb connection between iterations - ctx.reopen()?; + if !args.reuse { + ctx.reopen()?; + } ctx.execute_query(query) }, )?; diff --git a/vortex-datafusion/src/v2/source.rs b/vortex-datafusion/src/v2/source.rs index 9fea7aad020..53dd9939a16 100644 --- a/vortex-datafusion/src/v2/source.rs +++ b/vortex-datafusion/src/v2/source.rs @@ -278,7 +278,7 @@ impl DataSource for VortexDataSource { // Build the scan request with pushed-down projection, filter, and limit. // The projection is included so the scan can prune columns at the I/O level. let scan_request = ScanRequest { - projection: Some(self.projected_projection.clone()), + projection: self.projected_projection.clone(), filter: self.filter.clone(), limit: self.limit.map(|l| u64::try_from(l).unwrap_or(u64::MAX)), ordered: self.ordered, diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index b0cd89c6557..82738eb9f0f 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -218,7 +218,7 @@ impl TableFunction for T { ); let request = ScanRequest { - projection: Some(projection_expr), + projection: projection_expr, filter: filter_expr, ..Default::default() }; diff --git a/vortex-file/src/footer/file_statistics.rs b/vortex-file/src/footer/file_statistics.rs index 2669f3fb6ff..704ae6a44c0 100644 --- a/vortex-file/src/footer/file_statistics.rs +++ b/vortex-file/src/footer/file_statistics.rs @@ -146,6 +146,15 @@ impl FileStatistics { } } +impl<'a> IntoIterator for &'a FileStatistics { + type Item = (&'a StatsSet, &'a DType); + type IntoIter = std::iter::Zip, std::slice::Iter<'a, DType>>; + + fn into_iter(self) -> Self::IntoIter { + self.stats.iter().zip(self.dtypes.iter()) + } +} + impl FlatBufferRoot for FileStatistics {} impl WriteFlatBuffer for FileStatistics { diff --git a/vortex-file/src/v2/file_stats_reader.rs b/vortex-file/src/v2/file_stats_reader.rs index 3db4dec1697..4e720949da4 100644 --- a/vortex-file/src/v2/file_stats_reader.rs +++ b/vortex-file/src/v2/file_stats_reader.rs @@ -11,17 +11,21 @@ use std::collections::BTreeSet; use std::ops::Range; use std::sync::Arc; -use vortex_array::Columnar; +use vortex_array::Canonical; +use vortex_array::IntoArray; use vortex_array::MaskFuture; use vortex_array::VortexSessionExecute; +use vortex_array::arrays::NullArray; use vortex_array::dtype::DType; -use vortex_array::dtype::Field; use vortex_array::dtype::FieldMask; use vortex_array::dtype::FieldPath; -use vortex_array::dtype::FieldPathSet; use vortex_array::dtype::StructFields; use vortex_array::expr::Expression; -use vortex_array::expr::pruning::checked_pruning_expr; +use vortex_array::expr::Literal; +use vortex_array::expr::StatsCatalog; +use vortex_array::expr::lit; +use vortex_array::expr::stats::Stat; +use vortex_array::scalar::Scalar; use vortex_error::VortexResult; use vortex_layout::ArrayFuture; use vortex_layout::LayoutReader; @@ -29,10 +33,8 @@ use vortex_layout::LayoutReaderRef; use vortex_mask::Mask; use vortex_session::VortexSession; use vortex_utils::aliases::dash_map::DashMap; -use vortex_utils::aliases::hash_map::HashMap; use crate::FileStatistics; -use crate::pruning::extract_relevant_file_stats_as_struct_row; /// A [`LayoutReader`] decorator that prunes entire files based on file-level statistics. /// @@ -46,7 +48,6 @@ pub struct FileStatsLayoutReader { child: LayoutReaderRef, file_stats: FileStatistics, struct_fields: StructFields, - available_stats: FieldPathSet, session: VortexSession, prune_cache: DashMap, } @@ -65,28 +66,12 @@ impl FileStatsLayoutReader { .cloned() .unwrap_or_default(); - let available_stats = FieldPathSet::from_iter( - struct_fields - .names() - .iter() - .zip(file_stats.stats_sets().iter()) - .flat_map(|(name, stats)| { - stats.iter().map(|(stat, _)| { - FieldPath::from_iter([ - Field::Name(name.clone()), - Field::Name(stat.name().into()), - ]) - }) - }), - ); - Self { child, file_stats, struct_fields, - available_stats, session, - prune_cache: DashMap::with_hasher(Default::default()), + prune_cache: Default::default(), } } @@ -94,37 +79,51 @@ impl FileStatsLayoutReader { /// /// Returns `true` if file-level stats prove no rows can match, `false` otherwise. fn evaluate_file_stats(&self, expr: &Expression) -> VortexResult { - let Some((predicate, required_stats)) = checked_pruning_expr(expr, &self.available_stats) - else { + let Some(pruning_expr) = expr.stat_falsification(self) else { + // If there is no pruning expression, we can't prune. return Ok(false); }; - let required_file_stats = HashMap::from_iter( - required_stats - .map() - .iter() - .map(|(path, stats)| (path.clone(), stats.clone())), - ); - - let Some(file_stats) = extract_relevant_file_stats_as_struct_row( - &required_file_stats, - self.file_stats.stats_sets(), - &self.struct_fields, - )? - else { - return Ok(false); - }; + // Given how we implemented the StatsCatalog, we know the expression must be all literals. + // We can therefore optimize with a null scope since there are no field references that + // need to be resolved. + let simplified = pruning_expr.optimize_recursive(&DType::Null)?; + if let Some(result) = simplified.as_opt::() { + // Can prune if the result is non-nullable and true + return Ok(result.as_bool().value() == Some(true)); + } + + // Sometimes expressions don't implement constant folding to literals... In this case, + // we just execute the expression over a null array. + let pruning = NullArray::new(1).into_array().apply(&pruning_expr)?; let mut ctx = self.session.create_execution_ctx(); - Ok( - match file_stats - .apply(&predicate)? - .execute::(&mut ctx)? - { - Columnar::Constant(s) => s.scalar().as_bool().value() == Some(true), - Columnar::Canonical(_) => false, - }, - ) + let result = pruning + .execute::(&mut ctx)? + .into_bool() + .scalar_at(0)?; + + Ok(result.as_bool().value() == Some(true)) + } +} + +/// Implements [`StatsCatalog`] to provide file-level stats to expressions during pruning evaluation. +impl StatsCatalog for FileStatsLayoutReader { + fn stats_ref(&self, field_path: &FieldPath, stat: Stat) -> Option { + // FileStats currently only holds top-level field statistics. + if field_path.parts().len() != 1 { + return None; + } + + let field_name = field_path.parts()[0].as_name()?; + let field_idx = self.struct_fields.find(field_name)?; + let field_stats = self.file_stats.stats_sets().get(field_idx)?; + + let stat_value = field_stats.get(stat)?.as_exact()?; + let field_dtype = self.struct_fields.field_by_index(field_idx)?; + let stat_scalar = Scalar::try_new(field_dtype, Some(stat_value)).ok()?; + + Some(lit(stat_scalar)) } } diff --git a/vortex-scan/public-api.lock b/vortex-scan/public-api.lock index 50ca8143739..907eacd274b 100644 --- a/vortex-scan/public-api.lock +++ b/vortex-scan/public-api.lock @@ -10,7 +10,7 @@ pub vortex_scan::api::ScanRequest::limit: core::option::Option pub vortex_scan::api::ScanRequest::ordered: bool -pub vortex_scan::api::ScanRequest::projection: core::option::Option +pub vortex_scan::api::ScanRequest::projection: vortex_array::expr::expression::Expression pub vortex_scan::api::ScanRequest::row_range: core::option::Option> diff --git a/vortex-scan/src/api.rs b/vortex-scan/src/api.rs index 4eea54cde9c..f090dd2cd79 100644 --- a/vortex-scan/src/api.rs +++ b/vortex-scan/src/api.rs @@ -28,6 +28,7 @@ use futures::stream::BoxStream; use vortex_array::dtype::DType; use vortex_array::dtype::FieldPath; use vortex_array::expr::Expression; +use vortex_array::expr::root; use vortex_array::expr::stats::Precision; use vortex_array::stats::StatsSet; use vortex_array::stream::SendableArrayStream; @@ -108,10 +109,10 @@ pub trait DataSource: 'static + Send + Sync { } /// A request to scan a data source. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct ScanRequest { - /// Projection expression, `None` implies `root()`. - pub projection: Option, + /// Projection expression. Defaults to `root()` which returns all columns. + pub projection: Expression, /// Filter expression, `None` implies no filter. pub filter: Option, /// The row range to read. @@ -127,6 +128,19 @@ pub struct ScanRequest { pub limit: Option, } +impl Default for ScanRequest { + fn default() -> Self { + Self { + projection: root(), + filter: None, + row_range: None, + selection: Selection::default(), + ordered: false, + limit: None, + } + } +} + /// A boxed data source scan. pub type DataSourceScanRef = Box; diff --git a/vortex-scan/src/layout.rs b/vortex-scan/src/layout.rs index 8b946b2f8f3..e2f11b30201 100644 --- a/vortex-scan/src/layout.rs +++ b/vortex-scan/src/layout.rs @@ -19,7 +19,6 @@ use vortex_array::dtype::DType; use vortex_array::dtype::FieldPath; use vortex_array::dtype::Nullability; use vortex_array::expr::Expression; -use vortex_array::expr::root; use vortex_array::expr::stats::Precision; use vortex_array::scalar::Scalar; use vortex_array::stats::StatsSet; @@ -116,8 +115,7 @@ impl DataSource for LayoutReaderDataSource { let total_rows = self.reader.row_count(); let row_range = scan_request.row_range.unwrap_or(0..total_rows); - let projection = scan_request.projection.unwrap_or_else(root); - let dtype = projection.return_dtype(self.reader.dtype())?; + let dtype = scan_request.projection.return_dtype(self.reader.dtype())?; // If the dtype is an empty struct, and there is no filter, we can return a special // length-only scan. @@ -163,7 +161,7 @@ impl DataSource for LayoutReaderDataSource { reader: self.reader.clone(), session: self.session.clone(), dtype, - projection, + projection: scan_request.projection, filter: scan_request.filter, limit: scan_request.limit, selection: scan_request.selection, diff --git a/vortex-scan/src/multi.rs b/vortex-scan/src/multi.rs index f9424d03421..df405011b6b 100644 --- a/vortex-scan/src/multi.rs +++ b/vortex-scan/src/multi.rs @@ -26,7 +26,6 @@ use std::any::Any; use std::collections::VecDeque; -use std::ops::Range; use std::sync::Arc; use async_trait::async_trait; @@ -36,8 +35,6 @@ use futures::stream; use tracing::Instrument; use vortex_array::dtype::DType; use vortex_array::dtype::FieldPath; -use vortex_array::expr::Expression; -use vortex_array::expr::root; use vortex_array::expr::stats::Precision; use vortex_array::stats::StatsSet; use vortex_array::stream::ArrayStreamAdapter; @@ -51,7 +48,6 @@ use vortex_mask::Mask; use vortex_session::VortexSession; use crate::ScanBuilder; -use crate::Selection; use crate::api::DataSource; use crate::api::DataSourceScan; use crate::api::DataSourceScanRef; @@ -213,13 +209,11 @@ impl DataSource for MultiLayoutDataSource { } } - let projection = scan_request.projection.clone().unwrap_or_else(root); - let dtype = projection.return_dtype(&self.dtype)?; + let dtype = scan_request.projection.return_dtype(&self.dtype)?; Ok(Box::new(MultiLayoutScan { session: self.session.clone(), dtype, - projection, request: scan_request, ready, deferred, @@ -236,7 +230,6 @@ impl DataSource for MultiLayoutDataSource { struct MultiLayoutScan { session: VortexSession, dtype: DType, - projection: Expression, request: ScanRequest, ready: VecDeque, deferred: VecDeque>, @@ -262,7 +255,6 @@ impl DataSourceScan for MultiLayoutScan { let Self { session, dtype: _, - projection, request, ready, deferred, @@ -317,9 +309,7 @@ impl DataSourceScan for MultiLayoutScan { ready_stream .chain(deferred_stream) .flat_map(move |reader_result| match reader_result { - Ok(reader) => { - reader_partition(reader, session.clone(), projection.clone(), request.clone()) - } + Ok(reader) => reader_partition(reader, session.clone(), request.clone()), Err(e) => stream::once(async move { Err(e) }).boxed(), }) .boxed() @@ -329,12 +319,11 @@ impl DataSourceScan for MultiLayoutScan { /// Generates a partition stream for a single layout reader. /// /// Checks file-level pruning first (via `pruning_evaluation`). If the filter proves no rows -/// can match, returns an empty stream. Otherwise yields a single partition covering the +/// can match, returns an empty stream. Otherwise, yields a single partition covering the /// reader's full row range. fn reader_partition( reader: LayoutReaderRef, session: VortexSession, - projection: Expression, request: ScanRequest, ) -> PartitionStream { let row_count = reader.row_count(); @@ -357,12 +346,10 @@ fn reader_partition( Ok(Box::new(MultiLayoutPartition { reader, session, - projection, - filter: request.filter, - limit: request.limit, - ordered: request.ordered, - row_range, - selection: request.selection, + request: ScanRequest { + row_range: Some(row_range), + ..request + }, }) as PartitionRef) }) .boxed() @@ -375,12 +362,7 @@ fn reader_partition( struct MultiLayoutPartition { reader: LayoutReaderRef, session: VortexSession, - projection: Expression, - filter: Option, - limit: Option, - ordered: bool, - row_range: Range, - selection: Selection, + request: ScanRequest, } impl Partition for MultiLayoutPartition { @@ -389,11 +371,15 @@ impl Partition for MultiLayoutPartition { } fn row_count(&self) -> Option> { - let row_count = self.row_range.end - self.row_range.start; - let row_count = self.selection.row_count(row_count); - let row_count = self.limit.map_or(row_count, |limit| row_count.min(limit)); - - Some(if self.filter.is_some() { + let row_range = self.request.row_range.as_ref()?; + let row_count = row_range.end - row_range.start; + let row_count = self.request.selection.row_count(row_count); + let row_count = self + .request + .limit + .map_or(row_count, |limit| row_count.min(limit)); + + Some(if self.request.filter.is_some() { Precision::inexact(row_count) } else { Precision::exact(row_count) @@ -405,13 +391,17 @@ impl Partition for MultiLayoutPartition { } fn execute(self: Box) -> VortexResult { - let builder = ScanBuilder::new(self.session, self.reader) - .with_row_range(self.row_range) - .with_selection(self.selection) - .with_projection(self.projection) - .with_some_filter(self.filter) - .with_some_limit(self.limit) - .with_ordered(self.ordered); + let request = self.request; + let mut builder = ScanBuilder::new(self.session, self.reader) + .with_selection(request.selection) + .with_projection(request.projection) + .with_some_filter(request.filter) + .with_some_limit(request.limit) + .with_ordered(request.ordered); + + if let Some(row_range) = request.row_range { + builder = builder.with_row_range(row_range); + } let dtype = builder.dtype()?; let stream = builder.into_stream()?; From af39f2dd6fa777aa9438bb004fcb68767fc64ae7 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 24 Feb 2026 09:55:17 -0500 Subject: [PATCH 09/11] merge Signed-off-by: Nicholas Gates --- vortex-file/public-api.lock | 12 ++++++++++++ vortex-scan/public-api.lock | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/vortex-file/public-api.lock b/vortex-file/public-api.lock index 3f9c2f482c1..3ade56875ae 100644 --- a/vortex-file/public-api.lock +++ b/vortex-file/public-api.lock @@ -72,6 +72,10 @@ impl vortex_file::v2::FileStatsLayoutReader pub fn vortex_file::v2::FileStatsLayoutReader::new(child: vortex_layout::reader::LayoutReaderRef, file_stats: vortex_file::FileStatistics, session: vortex_session::VortexSession) -> Self +impl vortex_array::expr::pruning::StatsCatalog for vortex_file::v2::FileStatsLayoutReader + +pub fn vortex_file::v2::FileStatsLayoutReader::stats_ref(&self, field_path: &vortex_array::dtype::field::FieldPath, stat: vortex_array::expr::stats::Stat) -> core::option::Option + impl vortex_layout::reader::LayoutReader for vortex_file::v2::FileStatsLayoutReader pub fn vortex_file::v2::FileStatsLayoutReader::dtype(&self) -> &vortex_array::dtype::DType @@ -156,6 +160,14 @@ pub type vortex_file::FileStatistics::Target<'a> = vortex_flatbuffers::footer::F pub fn vortex_file::FileStatistics::write_flatbuffer<'fb>(&self, fbb: &mut flatbuffers::builder::FlatBufferBuilder<'fb>) -> vortex_error::VortexResult> +impl<'a> core::iter::traits::collect::IntoIterator for &'a vortex_file::FileStatistics + +pub type &'a vortex_file::FileStatistics::IntoIter = core::iter::adapters::zip::Zip, core::slice::iter::Iter<'a, vortex_array::dtype::DType>> + +pub type &'a vortex_file::FileStatistics::Item = (&'a vortex_array::stats::stats_set::StatsSet, &'a vortex_array::dtype::DType) + +pub fn &'a vortex_file::FileStatistics::into_iter(self) -> Self::IntoIter + pub struct vortex_file::Footer impl vortex_file::Footer diff --git a/vortex-scan/public-api.lock b/vortex-scan/public-api.lock index 907eacd274b..4232ae1994c 100644 --- a/vortex-scan/public-api.lock +++ b/vortex-scan/public-api.lock @@ -22,7 +22,7 @@ pub fn vortex_scan::api::ScanRequest::clone(&self) -> vortex_scan::api::ScanRequ impl core::default::Default for vortex_scan::api::ScanRequest -pub fn vortex_scan::api::ScanRequest::default() -> vortex_scan::api::ScanRequest +pub fn vortex_scan::api::ScanRequest::default() -> Self impl core::fmt::Debug for vortex_scan::api::ScanRequest From f4990ea7664b510ce0a734396115cfebb5b54804 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 24 Feb 2026 11:38:54 -0500 Subject: [PATCH 10/11] merge Signed-off-by: Nicholas Gates --- vortex-duckdb/src/datasource.rs | 2 +- vortex-utils/src/stream/buffer_unordered.rs | 103 ++++++++++++++++++++ vortex-utils/src/stream/mod.rs | 30 ++++++ 3 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 vortex-utils/src/stream/buffer_unordered.rs create mode 100644 vortex-utils/src/stream/mod.rs diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index 82738eb9f0f..fba44577451 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -267,7 +267,7 @@ impl TableFunction for T { // implementation will only re-fill the queue on the subsequent poll after a value // is returned, which means we have to wait for one round of polling before all // workers are driving the scan. - .buffer_unordered(num_workers); + .buffer_unordered(num_workers * 2); // Spawn a task to drive the partition stream and push array chunks into the channel. RUNTIME.handle().spawn(stream.collect::<()>()).detach(); diff --git a/vortex-utils/src/stream/buffer_unordered.rs b/vortex-utils/src/stream/buffer_unordered.rs new file mode 100644 index 00000000000..32836d4a716 --- /dev/null +++ b/vortex-utils/src/stream/buffer_unordered.rs @@ -0,0 +1,103 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Stream for buffering unordered futures from a stream. + +use std::pin::Pin; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::task::Context; +use std::task::Poll; + +use futures::Stream; +use futures::StreamExt as _; +use futures::stream::Fuse; +use futures::stream::FusedStream; +use futures::stream::FuturesUnordered; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for [`super::StreamExt::buffer_unordered2`]. + #[must_use = "streams do nothing unless polled"] + pub struct BufferUnordered where S: Stream{ + #[pin] + stream: Fuse, + max: AtomicUsize, + in_progress_queue: FuturesUnordered, + } +} + +impl BufferUnordered { + pub(super) fn new(stream: S, concurrency: AtomicUsize) -> Self { + Self { + stream: stream.fuse(), + max: concurrency, + in_progress_queue: FuturesUnordered::new(), + } + } +} + +impl Stream for BufferUnordered +where + S: Stream, + S::Item: Future, +{ + type Item = ::Output; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + // First up, try to spawn off as many futures as possible by filling up + // our queue of futures. + while this.in_progress_queue.len() < this.max.load(Ordering::Relaxed) { + match this.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), + Poll::Ready(None) | Poll::Pending => break, + } + } + + // Attempt to pull the next value from the in_progress_queue + match this.in_progress_queue.poll_next_unpin(cx) { + x @ Poll::Pending | x @ Poll::Ready(Some(_)) => { + // After pulling the latest value, re-fill before returning it. + while this.in_progress_queue.len() < this.max.load(Ordering::Relaxed) { + match this.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), + Poll::Ready(None) | Poll::Pending => break, + } + } + + return x; + } + Poll::Ready(None) => {} + } + + // If more values are still coming from the stream, we're not done yet + if this.stream.is_done() { + Poll::Ready(None) + } else { + Poll::Pending + } + } + + fn size_hint(&self) -> (usize, Option) { + let queue_len = self.in_progress_queue.len(); + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(queue_len); + let upper = match upper { + Some(x) => x.checked_add(queue_len), + None => None, + }; + (lower, upper) + } +} + +impl FusedStream for BufferUnordered +where + S: Stream, + S::Item: Future, +{ + fn is_terminated(&self) -> bool { + self.in_progress_queue.is_terminated() && self.stream.is_terminated() + } +} diff --git a/vortex-utils/src/stream/mod.rs b/vortex-utils/src/stream/mod.rs new file mode 100644 index 00000000000..c6880aac1ca --- /dev/null +++ b/vortex-utils/src/stream/mod.rs @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Extension traits for `Stream`. + +mod buffer_unordered; + +use std::sync::atomic::AtomicUsize; + +pub use buffer_unordered::BufferUnordered; +use futures::Stream; + +/// Extension trait for `Stream`. +pub trait StreamExt: Sized + Stream { + /// Buffers unordered futures from this stream, with a maximum concurrency of `concurrency`. + /// + /// There are two main differences vs [`futures::stream::StreamExt::buffer_unordered`]: + /// 1. This version takes an `AtomicUsize` for concurrency, allowing it to be modified at + /// runtime to adjust concurrency on the fly. + /// 2. This version re-fills the in-progress queue prior to returning a value instead of + /// on the next iteration of `poll_next`. This can be important when the consumer does a lot + /// of work in between polls and the items of the stream are spawned. + fn buffer_unordered2(self, concurrency: AtomicUsize) -> BufferUnordered; +} + +impl StreamExt for S { + fn buffer_unordered2(self, concurrency: AtomicUsize) -> BufferUnordered { + BufferUnordered::new(self, concurrency) + } +} From 62ab9e3c5dd71456d5239841c3d76a6ad54e33dc Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 24 Feb 2026 12:16:31 -0500 Subject: [PATCH 11/11] ordered: false Signed-off-by: Nicholas Gates --- vortex-duckdb/src/datasource.rs | 8 +- vortex-duckdb/src/duckdb/footer_cache.rs | 54 ---------- vortex-duckdb/src/duckdb/mod.rs | 1 - vortex-utils/src/stream/buffer_unordered.rs | 103 -------------------- vortex-utils/src/stream/mod.rs | 30 ------ 5 files changed, 2 insertions(+), 194 deletions(-) delete mode 100644 vortex-duckdb/src/duckdb/footer_cache.rs delete mode 100644 vortex-utils/src/stream/buffer_unordered.rs delete mode 100644 vortex-utils/src/stream/mod.rs diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index fba44577451..e4dd07db70e 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -220,6 +220,7 @@ impl TableFunction for T { let request = ScanRequest { projection: projection_expr, filter: filter_expr, + ordered: false, ..Default::default() }; @@ -262,12 +263,7 @@ impl TableFunction for T { } }) }) - // TODO(ngates): I'd like to run an experiment with a custom buffer_unordered - // implementation that re-fills the queue before returning a value. The regular - // implementation will only re-fill the queue on the subsequent poll after a value - // is returned, which means we have to wait for one round of polling before all - // workers are driving the scan. - .buffer_unordered(num_workers * 2); + .buffer_unordered(num_workers); // Spawn a task to drive the partition stream and push array chunks into the channel. RUNTIME.handle().spawn(stream.collect::<()>()).detach(); diff --git a/vortex-duckdb/src/duckdb/footer_cache.rs b/vortex-duckdb/src/duckdb/footer_cache.rs deleted file mode 100644 index 9e4befced64..00000000000 --- a/vortex-duckdb/src/duckdb/footer_cache.rs +++ /dev/null @@ -1,54 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use vortex::file::Footer; -use vortex::file::VortexOpenOptions; - -use crate::duckdb::ObjectCacheRef; - -pub struct FooterCache<'a> { - object_cache: &'a ObjectCacheRef, -} - -pub struct Entry<'a> { - object_cache: &'a ObjectCacheRef, - key: String, - value: Option<&'a Footer>, -} - -impl Entry<'_> { - pub fn put_if_absent(self, value: impl FnOnce() -> Footer) { - if self.value.is_some() { - return; - } - self.object_cache.put(&self.key, value()); - } - - pub fn apply_to_file(&self, options: VortexOpenOptions) -> VortexOpenOptions { - if let Some(footer) = self.value { - options.with_footer(footer.clone()) - } else { - options - } - } -} - -impl<'a> FooterCache<'a> { - pub fn new(object_cache: &'a ObjectCacheRef) -> Self { - Self { object_cache } - } - - pub fn entry(&self, key: &str) -> Entry<'_> { - let key = Self::key(key); - let value = self.object_cache.get(&key); - Entry { - object_cache: self.object_cache, - key, - value, - } - } - - fn key(key: &str) -> String { - format!("vx_cache_key://{key}") - } -} diff --git a/vortex-duckdb/src/duckdb/mod.rs b/vortex-duckdb/src/duckdb/mod.rs index 774dc4af3a9..ec82613fd85 100644 --- a/vortex-duckdb/src/duckdb/mod.rs +++ b/vortex-duckdb/src/duckdb/mod.rs @@ -11,7 +11,6 @@ mod database; mod ddb_string; mod expr; mod file_system; -pub mod footer_cache; mod logical_type; mod macro_; mod object_cache; diff --git a/vortex-utils/src/stream/buffer_unordered.rs b/vortex-utils/src/stream/buffer_unordered.rs deleted file mode 100644 index 32836d4a716..00000000000 --- a/vortex-utils/src/stream/buffer_unordered.rs +++ /dev/null @@ -1,103 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -//! Stream for buffering unordered futures from a stream. - -use std::pin::Pin; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -use std::task::Context; -use std::task::Poll; - -use futures::Stream; -use futures::StreamExt as _; -use futures::stream::Fuse; -use futures::stream::FusedStream; -use futures::stream::FuturesUnordered; -use pin_project_lite::pin_project; - -pin_project! { - /// Stream for [`super::StreamExt::buffer_unordered2`]. - #[must_use = "streams do nothing unless polled"] - pub struct BufferUnordered where S: Stream{ - #[pin] - stream: Fuse, - max: AtomicUsize, - in_progress_queue: FuturesUnordered, - } -} - -impl BufferUnordered { - pub(super) fn new(stream: S, concurrency: AtomicUsize) -> Self { - Self { - stream: stream.fuse(), - max: concurrency, - in_progress_queue: FuturesUnordered::new(), - } - } -} - -impl Stream for BufferUnordered -where - S: Stream, - S::Item: Future, -{ - type Item = ::Output; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - // First up, try to spawn off as many futures as possible by filling up - // our queue of futures. - while this.in_progress_queue.len() < this.max.load(Ordering::Relaxed) { - match this.stream.as_mut().poll_next(cx) { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), - Poll::Ready(None) | Poll::Pending => break, - } - } - - // Attempt to pull the next value from the in_progress_queue - match this.in_progress_queue.poll_next_unpin(cx) { - x @ Poll::Pending | x @ Poll::Ready(Some(_)) => { - // After pulling the latest value, re-fill before returning it. - while this.in_progress_queue.len() < this.max.load(Ordering::Relaxed) { - match this.stream.as_mut().poll_next(cx) { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), - Poll::Ready(None) | Poll::Pending => break, - } - } - - return x; - } - Poll::Ready(None) => {} - } - - // If more values are still coming from the stream, we're not done yet - if this.stream.is_done() { - Poll::Ready(None) - } else { - Poll::Pending - } - } - - fn size_hint(&self) -> (usize, Option) { - let queue_len = self.in_progress_queue.len(); - let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(queue_len); - let upper = match upper { - Some(x) => x.checked_add(queue_len), - None => None, - }; - (lower, upper) - } -} - -impl FusedStream for BufferUnordered -where - S: Stream, - S::Item: Future, -{ - fn is_terminated(&self) -> bool { - self.in_progress_queue.is_terminated() && self.stream.is_terminated() - } -} diff --git a/vortex-utils/src/stream/mod.rs b/vortex-utils/src/stream/mod.rs deleted file mode 100644 index c6880aac1ca..00000000000 --- a/vortex-utils/src/stream/mod.rs +++ /dev/null @@ -1,30 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -//! Extension traits for `Stream`. - -mod buffer_unordered; - -use std::sync::atomic::AtomicUsize; - -pub use buffer_unordered::BufferUnordered; -use futures::Stream; - -/// Extension trait for `Stream`. -pub trait StreamExt: Sized + Stream { - /// Buffers unordered futures from this stream, with a maximum concurrency of `concurrency`. - /// - /// There are two main differences vs [`futures::stream::StreamExt::buffer_unordered`]: - /// 1. This version takes an `AtomicUsize` for concurrency, allowing it to be modified at - /// runtime to adjust concurrency on the fly. - /// 2. This version re-fills the in-progress queue prior to returning a value instead of - /// on the next iteration of `poll_next`. This can be important when the consumer does a lot - /// of work in between polls and the items of the stream are spawned. - fn buffer_unordered2(self, concurrency: AtomicUsize) -> BufferUnordered; -} - -impl StreamExt for S { - fn buffer_unordered2(self, concurrency: AtomicUsize) -> BufferUnordered { - BufferUnordered::new(self, concurrency) - } -}