From 4fd7dcae91f0d6e0374cf5e1951a7ced7c3710b5 Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Wed, 18 Feb 2026 15:10:33 +0000 Subject: [PATCH] initial Signed-off-by: Mikhail Kot --- benchmarks/duckdb-bench/src/main.rs | 4 ++ vortex-duckdb/README.md | 20 ++++++ .../cpp/include/duckdb_vx/table_function.h | 4 +- vortex-duckdb/cpp/table_function.cpp | 13 +++- .../src/duckdb/table_function/mod.rs | 11 ++- .../table_function/table_scan_progress.rs | 19 +++++ .../src/e2e_test/object_cache_test.rs | 8 +++ vortex-duckdb/src/scan.rs | 70 ++++++++++++++++++- 8 files changed, 143 insertions(+), 6 deletions(-) create mode 100644 vortex-duckdb/src/duckdb/table_function/table_scan_progress.rs diff --git a/benchmarks/duckdb-bench/src/main.rs b/benchmarks/duckdb-bench/src/main.rs index 5da04c2950c..f25682038c8 100644 --- a/benchmarks/duckdb-bench/src/main.rs +++ b/benchmarks/duckdb-bench/src/main.rs @@ -84,6 +84,10 @@ fn main() -> anyhow::Result<()> { args.exclude_queries.as_ref(), ); + if args.formats.is_empty() { + anyhow::bail!("provide a format with --formats"); + } + // Generate Vortex files from Parquet for any Vortex formats requested if benchmark.data_url().scheme() == "file" { // This is ugly, but otherwise some complicated async interaction might result in a deadlock diff --git a/vortex-duckdb/README.md b/vortex-duckdb/README.md index c550cb7d05d..5c7661411db 100644 --- a/vortex-duckdb/README.md +++ b/vortex-duckdb/README.md @@ -53,3 +53,23 @@ VX_DUCKDB_DEBUG=1 cargo test -p vortex-duckdb # Link against the DuckDB debug build from source with ASAN & TSAN. ASAN_OPTIONS=detect_container_overflow=0 VX_DUCKDB_DEBUG=1 VX_DUCKDB_SAN=1 cargo test -p vortex-duckdb ``` + +## Testing the extension with DuckDB + +By default, our tests use a precompiled build which means you don't get an +.extension which you can load in DuckDB. If you want to test a full setup, + +1. Clone [duckdb-vortex](https://github.com/vortex-data/duckdb-vortex) + repository. + +2. 
If there is an API difference between duckdb-vortex's duckdb submodule and + vortex's vortex-duckdb/duckdb submodule, check out a previous duckdb-vortex + commit. For example, if duckdb-vortex's HEAD uses the 1.5 API but vortex's HEAD + uses 1.4.2, check out duckdb-vortex at 8a41ee6ebd9. + +3. Update duckdb-vortex's submodules. Replace the vortex/ submodule with a symlink to + your local vortex repository. +4. Inside duckdb-vortex, run `make -j`. + +`./target/release/duckdb` will be a DuckDB instance with vortex-duckdb already +loaded. diff --git a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h index 6a1b906701d..5299d773271 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h @@ -133,7 +133,9 @@ typedef struct { void *pushdown_expression; duckdb_vx_string_map (*to_string)(void *bind_data); // void *dynamic_to_string; - void *table_scan_progress; + + double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state); + idx_t (*get_partition_data)(const void *bind_data, void *init_global_data, void *init_local_data, diff --git a/vortex-duckdb/cpp/table_function.cpp b/vortex-duckdb/cpp/table_function.cpp index 5948379549d..d8f96287f7d 100644 --- a/vortex-duckdb/cpp/table_function.cpp +++ b/vortex-duckdb/cpp/table_function.cpp @@ -74,6 +74,16 @@ struct CTableBindResult { vector &names; }; +double c_table_scan_progress(ClientContext &context, + const FunctionData *bind_data, + const GlobalTableFunctionState *global_state) { + auto &bind = bind_data->Cast(); + duckdb_client_context c_ctx = reinterpret_cast(&context); + void *const c_bind_data = bind.ffi_data->DataPtr(); + void *const c_global_state = global_state->Cast().ffi_data->DataPtr(); + return bind.info->vtab.table_scan_progress(c_ctx, c_bind_data, c_global_state); +} + unique_ptr c_bind(ClientContext &context, TableFunctionBindInput &input, vector &return_types, @@ 
-351,6 +361,7 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d tf.get_partition_data = c_get_partition_data; tf.get_virtual_columns = c_get_virtual_columns; tf.to_string = c_to_string; + tf.table_scan_progress = c_table_scan_progress; // Set up the parameters tf.arguments.reserve(vtab->parameter_count); @@ -398,4 +409,4 @@ extern "C" void duckdb_vx_string_map_free(duckdb_vx_string_map map) { auto *cpp_map = reinterpret_cast *>(map); delete cpp_map; } -} // namespace vortex \ No newline at end of file +} // namespace vortex diff --git a/vortex-duckdb/src/duckdb/table_function/mod.rs b/vortex-duckdb/src/duckdb/table_function/mod.rs index a068765fcbe..ca197f46e6c 100644 --- a/vortex-duckdb/src/duckdb/table_function/mod.rs +++ b/vortex-duckdb/src/duckdb/table_function/mod.rs @@ -14,6 +14,7 @@ mod cardinality; mod init; mod partition; mod pushdown_complex_filter; +mod table_scan_progress; mod virtual_columns; pub use bind::*; @@ -30,6 +31,7 @@ use crate::duckdb::expr::Expression; use crate::duckdb::table_function::cardinality::cardinality_callback; use crate::duckdb::table_function::partition::get_partition_data_callback; use crate::duckdb::table_function::pushdown_complex_filter::pushdown_complex_filter_callback; +use crate::duckdb::table_function::table_scan_progress::table_scan_progress_callback; use crate::duckdb::table_function::virtual_columns::get_virtual_columns_callback; use crate::duckdb_try; @@ -103,6 +105,13 @@ pub trait TableFunction: Sized + Debug { global: &mut Self::GlobalState, ) -> VortexResult; + /// Return table scanning progress from 0. to 100. + fn table_scan_progress( + client_context: &ClientContext, + bind_data: &mut Self::BindData, + global_state: &mut Self::GlobalState, + ) -> f64; + /// Pushes down a filter expression to the table function. 
/// /// Returns `true` if the filter was successfully pushed down (and stored on the bind data), @@ -181,7 +190,7 @@ impl Database { pushdown_expression: ptr::null_mut::(), get_virtual_columns: Some(get_virtual_columns_callback::), to_string: Some(to_string_callback::), - table_scan_progress: ptr::null_mut::(), + table_scan_progress: Some(table_scan_progress_callback::), get_partition_data: Some(get_partition_data_callback::), projection_pushdown: T::PROJECTION_PUSHDOWN, filter_pushdown: T::FILTER_PUSHDOWN, diff --git a/vortex-duckdb/src/duckdb/table_function/table_scan_progress.rs b/vortex-duckdb/src/duckdb/table_function/table_scan_progress.rs new file mode 100644 index 00000000000..9eb18c4a4e8 --- /dev/null +++ b/vortex-duckdb/src/duckdb/table_function/table_scan_progress.rs @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex::error::VortexExpect; + +use crate::duckdb::TableFunction; + +pub(crate) unsafe extern "C-unwind" fn table_scan_progress_callback( + ctx: crate::cpp::duckdb_client_context, + bind_data: *mut ::std::os::raw::c_void, + global_state: *mut ::std::os::raw::c_void, +) -> f64 { + let ctx = unsafe { crate::duckdb::ClientContext::borrow(ctx) }; + let bind_data = + unsafe { bind_data.cast::().as_mut() }.vortex_expect("bind_data null pointer"); + let global_state = unsafe { global_state.cast::().as_mut() } + .vortex_expect("global_init_data null pointer"); + T::table_scan_progress(&ctx, bind_data, global_state) +} diff --git a/vortex-duckdb/src/e2e_test/object_cache_test.rs b/vortex-duckdb/src/e2e_test/object_cache_test.rs index 0613b4069d1..6179b6f914e 100644 --- a/vortex-duckdb/src/e2e_test/object_cache_test.rs +++ b/vortex-duckdb/src/e2e_test/object_cache_test.rs @@ -64,6 +64,14 @@ impl TableFunction for TestTableFunction { }) } + fn table_scan_progress( + _client_context: &ClientContext, + _bind_data: &mut Self::BindData, + _global_state: &mut Self::GlobalState, + ) 
-> f64 { + 100.0 + } + fn scan( _client_context: &ClientContext, _bind_data: &Self::BindData, diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index 2772130eb8d..239cbff1b10 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -113,6 +113,17 @@ pub struct VortexGlobalData { iterator: ThreadSafeIterator)>>, batch_id: AtomicU64, ctx: ExecutionCtx, + bytes_total: Arc, + bytes_read: AtomicU64, +} + +impl VortexGlobalData { + pub fn progress(&self) -> f64 { + let read = self.bytes_read.load(Ordering::Relaxed); + let mut total = self.bytes_total.load(Ordering::Relaxed); + total += (total == 0) as u64; + read as f64 / total as f64 * 100. // a zero total was bumped to 1 above, so 0 read of 0 total yields 0. (not 100., and not NaN) + } } pub struct VortexLocalData { @@ -344,7 +355,6 @@ impl TableFunction for VortexTableFunction { let Some(result) = local_state.iterator.next() else { return Ok(()); }; - let (array_result, conversion_cache) = result?; let array_result = array_result.optimize_recursive()?; @@ -380,6 +390,9 @@ impl TableFunction for VortexTableFunction { .vortex_expect("error: exporter missing"); let has_more_data = exporter.export(chunk)?; + global_state + .bytes_read + .fetch_add(chunk.len(), Ordering::Relaxed); if !has_more_data { // This exporter is fully consumed. @@ -415,6 +428,9 @@ impl TableFunction for VortexTableFunction { .map(|n| n.get()) .unwrap_or(1); + let bytes_total = Arc::new(AtomicU64::new(0)); + let bytes_total_copy = bytes_total.clone(); + let handle = RUNTIME.handle(); let fs = bind_data.file_system.clone(); let first_file = bind_data.first_file.clone(); @@ -428,6 +444,7 @@ impl TableFunction for VortexTableFunction { let conversion_cache = Arc::new(ConversionCache::new(idx as u64)); let object_cache = object_cache; + let bytes_total = bytes_total_copy.clone(); handle .spawn(async move { let vxf = if idx == 0 { @@ -451,13 +468,15 @@ impl TableFunction for VortexTableFunction { { return Ok(None); }; - let scan = vxf .scan()? 
.with_some_filter(filter_expr) .with_projection(projection_expr) .with_ordered(false) - .map(move |split| Ok((split, conversion_cache.clone()))) + .map(move |split| { + bytes_total.fetch_add(split.len() as u64, Ordering::Relaxed); + Ok((split, conversion_cache.clone())) + }) .into_stream()? .boxed(); @@ -479,6 +498,8 @@ impl TableFunction for VortexTableFunction { batch_id: AtomicU64::new(0), // TODO(joe): fetch this from somewhere??. ctx: ExecutionCtx::new(VortexSession::default()), + bytes_read: AtomicU64::new(0), + bytes_total, }) } @@ -508,6 +529,14 @@ impl TableFunction for VortexTableFunction { }) } + fn table_scan_progress( + _client_context: &ClientContext, + _bind_data: &mut Self::BindData, + global_state: &mut Self::GlobalState, + ) -> f64 { + global_state.progress() + } + fn pushdown_complex_filter( bind_data: &mut Self::BindData, expr: &Expression, @@ -637,3 +666,38 @@ impl<'rt, T: 'rt> Stream for MultiScan<'rt, T> { } } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::sync::atomic::AtomicU64; + use std::sync::atomic::Ordering::Relaxed; + + use vortex::VortexSessionDefault as _; + use vortex::io::runtime::current::CurrentThreadRuntime; + use vortex::session::VortexSession; + use vortex_array::ExecutionCtx; + + use crate::scan::VortexGlobalData; + + #[test] + fn test_table_scan_progress() { + let iterator = + CurrentThreadRuntime::new().block_on_stream_thread_safe(|_| futures::stream::empty()); + let state = VortexGlobalData { + iterator, + batch_id: AtomicU64::new(0), + ctx: ExecutionCtx::new(VortexSession::default()), + bytes_total: Arc::new(AtomicU64::new(100)), + bytes_read: AtomicU64::new(0), + }; + + assert_eq!(state.progress(), 0.0); + + state.bytes_read.fetch_add(100, Relaxed); + assert_eq!(state.progress(), 100.); + + state.bytes_total.fetch_add(100, Relaxed); + assert!((state.progress() - 50.).abs() < f64::EPSILON); + } +}