From 59728a021c86ff5c1e24229c129ab4140e93a348 Mon Sep 17 00:00:00 2001 From: Aly Date: Tue, 10 Feb 2026 08:58:24 +0200 Subject: [PATCH 1/4] feat(config): add DFTimeUnit validation for parquet coerce_int96 - Introduce DFTimeUnit enum for INT96 timestamp coercion - Implement FromStr, Display, and ConfigField for validation and SET support - Add conversions to/from arrow::TimeUnit for engine integration - Replace old parse_coerce_int96_string usage with DFTimeUnit - Ensures invalid config values are rejected early --- datafusion/common/src/config.rs | 28 +++++- datafusion/common/src/parquet_config.rs | 90 +++++++++++++++++++ .../datasource-parquet/src/file_format.rs | 10 ++- datafusion/datasource-parquet/src/source.rs | 27 +----- datafusion/proto-common/src/from_proto/mod.rs | 10 ++- datafusion/proto-common/src/to_proto/mod.rs | 4 +- .../proto/src/logical_plan/file_formats.rs | 7 +- 7 files changed, 140 insertions(+), 36 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index dad12c1c6bc91..df9e189f06597 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -23,7 +23,7 @@ use arrow_ipc::CompressionType; use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties}; use crate::error::_config_err; use crate::format::{ExplainAnalyzeLevel, ExplainFormat}; -use crate::parquet_config::DFParquetWriterVersion; +use crate::parquet_config::{DFParquetWriterVersion, DFTimeUnit}; use crate::parsers::CompressionTypeVariant; use crate::utils::get_available_parallelism; use crate::{DataFusionError, Result}; @@ -738,7 +738,7 @@ config_namespace! { /// which stores microsecond resolution timestamps in an int96 allowing it /// to write values with a larger date range than 64-bit timestamps with /// nanosecond resolution. - pub coerce_int96: Option, transform = str::to_lowercase, default = None + pub coerce_int96: Option, default = Some(DFTimeUnit::default()) /// (reading) Use any available bloom filters when reading parquet files pub bloom_filter_on_read: bool, default = true @@ -3530,4 +3530,28 @@ mod tests { "Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0" ); } + #[cfg(feature = "parquet")] + #[test] + fn test_parquet_coerce_int96_validation() { + use crate::{config::ConfigOptions, parquet_config::DFTimeUnit}; + + let mut config = ConfigOptions::default(); + + // Valid values should work + config + .set("datafusion.execution.parquet.coerce_int96", "ns") + .unwrap(); + assert_eq!(config.execution.parquet.coerce_int96, Some(DFTimeUnit::NS)); + + // ... tests for "us", "ms", "s" ... + + // Invalid value should error immediately at SET time + let err = config + .set("datafusion.execution.parquet.coerce_int96", "invalid") + .unwrap_err(); + assert_eq!( + err.to_string(), + "Invalid or Unsupported Configuration: Invalid parquet coerce_int96: invalid. Expected one of: ns, us, ms, s" + ); + } } diff --git a/datafusion/common/src/parquet_config.rs b/datafusion/common/src/parquet_config.rs index 9d6d7a88566a7..928a0f89bb622 100644 --- a/datafusion/common/src/parquet_config.rs +++ b/datafusion/common/src/parquet_config.rs @@ -106,3 +106,93 @@ impl From for DFParquetWriterVersion { } } } + +/// Time unit options for Parquet INT96 timestamp coercion. +/// +/// This enum validates time unit values at configuration time, +/// ensuring only supported units ("ns", "us", "ms", "s") can be set +/// via `SET` commands or proto deserialization. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum DFTimeUnit { + /// Nanoseconds + #[default] + NS, + /// Microseconds + US, + /// Milliseconds + MS, + /// Seconds + S, +} + +/// Implement parsing strings to `DFTimeUnit` +impl FromStr for DFTimeUnit { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "ns" => Ok(DFTimeUnit::NS), + "us" => Ok(DFTimeUnit::US), + "ms" => Ok(DFTimeUnit::MS), + "s" => Ok(DFTimeUnit::S), + other => Err(DataFusionError::Configuration(format!( + "Invalid parquet coerce_int96: {other}. Expected one of: ns, us, ms, s" + ))), + } + } +} + +impl Display for DFTimeUnit { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + DFTimeUnit::NS => "ns", + DFTimeUnit::US => "us", + DFTimeUnit::MS => "ms", + DFTimeUnit::S => "s", + }; + write!(f, "{s}") + } +} + +impl ConfigField for DFTimeUnit { + fn visit(&self, v: &mut V, key: &str, description: &'static str) { + v.some(key, self, description) + } + + fn set(&mut self, _: &str, value: &str) -> Result<()> { + *self = DFTimeUnit::from_str(value)?; + Ok(()) + } +} + +/// Convert `DFTimeUnit` to parquet crate's `arrow::datatypes::TimeUnit` +/// +/// This conversion is infallible since `DFTimeUnit` only contains +/// valid values that have been validated at configuration time. +#[cfg(feature = "parquet")] +impl From for arrow::datatypes::TimeUnit { + fn from(value: DFTimeUnit) -> Self { + match value { + DFTimeUnit::NS => arrow::datatypes::TimeUnit::Nanosecond, + DFTimeUnit::US => arrow::datatypes::TimeUnit::Microsecond, + DFTimeUnit::MS => arrow::datatypes::TimeUnit::Millisecond, + DFTimeUnit::S => arrow::datatypes::TimeUnit::Second, + } + } +} + +/// Convert parquet crate's `arrow::datatypes::TimeUnit` to `DFTimeUnit` +/// +/// This is used when converting from existing parquet TimeUnit, +/// such as when reading from proto or test code. +#[cfg(feature = "parquet")] +impl From for DFTimeUnit { + fn from(value: arrow::datatypes::TimeUnit) -> Self { + match value { + arrow::datatypes::TimeUnit::Nanosecond => DFTimeUnit::NS, + arrow::datatypes::TimeUnit::Microsecond => DFTimeUnit::US, + arrow::datatypes::TimeUnit::Millisecond => DFTimeUnit::MS, + arrow::datatypes::TimeUnit::Second => DFTimeUnit::S, + } + } +} diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index d59b42ed15d15..34b95b58377e4 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -27,6 +27,7 @@ use std::{fmt, vec}; use arrow::array::RecordBatch; use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit}; +use datafusion_common::parquet_config::DFTimeUnit; use datafusion_datasource::TableSchema; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; @@ -60,7 +61,7 @@ use datafusion_session::Session; use crate::metadata::{DFParquetMetadata, lex_ordering_to_sorting_columns}; use crate::reader::CachedParquetFileReaderFactory; -use crate::source::{ParquetSource, parse_coerce_int96_string}; +use crate::source::ParquetSource; use async_trait::async_trait; use bytes::Bytes; use datafusion_datasource::source::DataSourceExec; @@ -273,11 +274,11 @@ impl ParquetFormat { self } - pub fn coerce_int96(&self) -> Option { + pub fn coerce_int96(&self) -> Option { self.options.global.coerce_int96.clone() } - pub fn with_coerce_int96(mut self, time_unit: Option) -> Self { + pub fn with_coerce_int96(mut self, time_unit: Option) -> Self { self.options.global.coerce_int96 = time_unit; self } @@ -365,7 +366,8 @@ impl FileFormat for ParquetFormat { objects: &[ObjectMeta], ) -> Result { let coerce_int96 = match self.coerce_int96() { - Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?), + Some(time_unit) => Some(time_unit.into()), + None => None, }; diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 75d87a4cd16fc..41b6a3a5dcb47 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -29,15 +29,12 @@ use crate::row_filter::can_expr_be_pushed_down_with_schemas; use datafusion_common::config::ConfigOptions; #[cfg(feature = "parquet_encryption")] use datafusion_common::config::EncryptionFactoryOptions; -use datafusion_datasource::as_file_source; -use datafusion_datasource::file_stream::FileOpener; - -use arrow::datatypes::TimeUnit; -use datafusion_common::DataFusionError; use datafusion_common::config::TableParquetOptions; use datafusion_datasource::TableSchema; +use datafusion_datasource::as_file_source; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::file_stream::FileOpener; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::{EquivalenceProperties, conjunction}; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; @@ -483,24 +480,6 @@ impl ParquetSource { } } -/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit -pub(crate) fn parse_coerce_int96_string( - str_setting: &str, -) -> datafusion_common::Result { - let str_setting_lower: &str = &str_setting.to_lowercase(); - - match str_setting_lower { - "ns" => Ok(TimeUnit::Nanosecond), - "us" => Ok(TimeUnit::Microsecond), - "ms" => Ok(TimeUnit::Millisecond), - "s" => Ok(TimeUnit::Second), - _ => Err(DataFusionError::Configuration(format!( - "Unknown or unsupported parquet coerce_int96: \ - {str_setting}. Valid values are: ns, us, ms, and s." - ))), - } -} - /// Allows easy conversion from ParquetSource to Arc<dyn FileSource> impl From for Arc { fn from(source: ParquetSource) -> Self { @@ -539,7 +518,7 @@ impl FileSource for ParquetSource { .global .coerce_int96 .as_ref() - .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap()); + .map(|time_unit| (*time_unit).into()); let opener = Arc::new(ParquetOpener { partition_index: partition, diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index ca8a269958d73..6f558d55ca7ac 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; +use std::str::FromStr; use std::sync::Arc; use crate::common::proto_error; @@ -35,6 +36,7 @@ use arrow::ipc::{ writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions}, }; +use datafusion_common::parquet_config::DFTimeUnit; use datafusion_common::{ Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef, DataFusionError, JoinSide, ScalarValue, Statistics, TableReference, @@ -1083,9 +1085,11 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize, schema_force_view_types: value.schema_force_view_types, binary_as_string: value.binary_as_string, - coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt { - protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v), - }).unwrap_or(None), + coerce_int96: value.coerce_int96_opt.clone().and_then(|opt| match opt { + protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => { + DFTimeUnit::from_str(&v).ok() + } + }), skip_arrow_metadata: value.skip_arrow_metadata, max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 79e3306a4df1b..588abcf217fbc 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -902,7 +902,9 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { schema_force_view_types: value.schema_force_view_types, binary_as_string: value.binary_as_string, skip_arrow_metadata: value.skip_arrow_metadata, - coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), + coerce_int96_opt: value.coerce_int96.map(|time_unit| { + protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(time_unit.to_string()) + }), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), }) } diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..e3095bfab2399 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -348,6 +348,8 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec { #[cfg(feature = "parquet")] mod parquet { + use std::str::FromStr; + use super::*; use crate::protobuf::{ @@ -359,6 +361,7 @@ mod parquet { use datafusion_common::config::{ ParquetColumnOptions, ParquetOptions, TableParquetOptions, }; + use datafusion_common::parquet_config::DFTimeUnit; use datafusion_datasource_parquet::file_format::ParquetFormatFactory; impl TableParquetOptionsProto { @@ -421,7 +424,7 @@ mod parquet { binary_as_string: global_options.global.binary_as_string, skip_arrow_metadata: global_options.global.skip_arrow_metadata, coerce_int96_opt: global_options.global.coerce_int96.map(|compression| { - parquet_options::CoerceInt96Opt::CoerceInt96(compression) + parquet_options::CoerceInt96Opt::CoerceInt96(compression.to_string()) }), max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) @@ -520,7 +523,7 @@ mod parquet { binary_as_string: proto.binary_as_string, skip_arrow_metadata: proto.skip_arrow_metadata, coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt { - parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(), + parquet_options::CoerceInt96Opt::CoerceInt96(v) => DFTimeUnit::from_str(v).unwrap(), }), max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, From 745f250aa91a11496f638515c442fa3b9060c985 Mon Sep 17 00:00:00 2001 From: Aly Date: Tue, 10 Feb 2026 09:15:36 +0200 Subject: [PATCH 2/4] fix(parquet): unwrap DFTimeUnit in test to fix type mismatch --- .../src/datasource/physical_plan/parquet.rs | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 4c6d915d5bcaa..7150e86caa40a 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -26,6 +26,7 @@ mod tests { // See also `parquet_exec` integration test use std::fs::{self, File}; use std::io::Write; + use std::str::FromStr; use std::sync::Arc; use std::sync::Mutex; @@ -47,6 +48,7 @@ mod tests { use arrow_schema::{SchemaRef, TimeUnit}; use bytes::{BufMut, BytesMut}; use datafusion_common::config::TableParquetOptions; + use datafusion_common::parquet_config::DFTimeUnit; use datafusion_common::test_util::{batches_to_sort_string, batches_to_string}; use datafusion_common::{Result, ScalarValue, assert_contains}; use datafusion_datasource::file_format::FileFormat; @@ -1342,18 +1344,18 @@ mod tests { let time_units_and_expected = vec![ ( - None, // Same as "ns" time_unit + None, // default: None = "ns" Arc::new(Int64Array::from(vec![ - Some(1704141296123456000), // Reads as nanosecond fine (note 3 extra 0s) - Some(1704070800000000000), // Reads as nanosecond fine (note 3 extra 0s) - Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999) - Some(1735599600000000000), // Reads as nanosecond fine (note 3 extra 0s) + Some(1704141296123456000), + Some(1704070800000000000), + Some(-4852191831933722624), + Some(1735599600000000000), None, - Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000) + Some(-4864435138808946688), ])), ), ( - Some("ns".to_string()), + Some(DFTimeUnit::NS), Arc::new(Int64Array::from(vec![ Some(1704141296123456000), Some(1704070800000000000), @@ -1364,7 +1366,7 @@ mod tests { ])), ), ( - Some("us".to_string()), + Some(DFTimeUnit::US), Arc::new(Int64Array::from(vec![ Some(1704141296123456), Some(1704070800000000), @@ -1379,7 +1381,7 @@ mod tests { for (time_unit, expected) in time_units_and_expected { let parquet_exec = scan_format( &state, - &ParquetFormat::default().with_coerce_int96(time_unit.clone()), + &ParquetFormat::default().with_coerce_int96(time_unit), Some(schema.clone()), &testdata, filename, @@ -1428,7 +1430,8 @@ mod tests { let parquet_exec = scan_format( &state, - &ParquetFormat::default().with_coerce_int96(Some("us".to_string())), + &ParquetFormat::default() + .with_coerce_int96(Some(DFTimeUnit::from_str("us").unwrap())), None, testdata, filename, From 2413c82174a9852a20f959e6e3976fab711713ef Mon Sep 17 00:00:00 2001 From: Aly Date: Tue, 10 Feb 2026 09:45:27 +0200 Subject: [PATCH 3/4] fixed for clappy and other CI/CD pipeline failures --- datafusion/common/src/config.rs | 2 +- datafusion/datasource-parquet/src/file_format.rs | 9 ++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index df9e189f06597..46b3b81eaef9e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -738,7 +738,7 @@ config_namespace! { /// which stores microsecond resolution timestamps in an int96 allowing it /// to write values with a larger date range than 64-bit timestamps with /// nanosecond resolution. - pub coerce_int96: Option, default = Some(DFTimeUnit::default()) + pub coerce_int96: Option, default = None /// (reading) Use any available bloom filters when reading parquet files pub bloom_filter_on_read: bool, default = true diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 34b95b58377e4..70f49bb0ca961 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -275,7 +275,7 @@ impl ParquetFormat { } pub fn coerce_int96(&self) -> Option { - self.options.global.coerce_int96.clone() + self.options.global.coerce_int96 } pub fn with_coerce_int96(mut self, time_unit: Option) -> Self { @@ -365,12 +365,7 @@ impl FileFormat for ParquetFormat { store: &Arc, objects: &[ObjectMeta], ) -> Result { - let coerce_int96 = match self.coerce_int96() { - Some(time_unit) => Some(time_unit.into()), - - None => None, - }; - + let coerce_int96 = self.coerce_int96().map(|time_unit| time_unit.into()); let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); From 079849a505bb511e8593f026b42fbde48d534c51 Mon Sep 17 00:00:00 2001 From: Aly Date: Sun, 15 Feb 2026 06:18:04 +0200 Subject: [PATCH 4/4] Refactor: Consolidate DFTimeUnit enum variants Renamin DFTimeUnit enum variants, and fix docstrings --- datafusion/common/src/config.rs | 20 +++++---- datafusion/common/src/parquet_config.rs | 45 +++++++++---------- .../src/datasource/physical_plan/parquet.rs | 20 ++++----- .../proto/src/logical_plan/file_formats.rs | 4 +- docs/source/user-guide/configs.md | 2 +- 5 files changed, 44 insertions(+), 47 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 46b3b81eaef9e..ab71b1a3c6984 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -732,12 +732,15 @@ config_namespace! { /// BLOB instead. pub binary_as_string: bool, default = false - /// (reading) If true, parquet reader will read columns of + /// (reading) If set, parquet reader will read columns of /// physical type int96 as originating from a different resolution - /// than nanosecond. This is useful for reading data from systems like Spark - /// which stores microsecond resolution timestamps in an int96 allowing it - /// to write values with a larger date range than 64-bit timestamps with - /// nanosecond resolution. + /// than nanosecond. + /// Note that `None` means **do not apply any coercion**, + /// which is different from using `Some(DFTimeUnit::Nanosecond)`. + /// This is useful + /// for reading data from systems like Spark which stores microsecond + /// resolution timestamps in an int96 allowing it to write values with + /// a larger date range than 64-bit timestamps with nanosecond resolution. pub coerce_int96: Option, default = None /// (reading) Use any available bloom filters when reading parquet files @@ -3541,9 +3544,10 @@ mod tests { config .set("datafusion.execution.parquet.coerce_int96", "ns") .unwrap(); - assert_eq!(config.execution.parquet.coerce_int96, Some(DFTimeUnit::NS)); - - // ... tests for "us", "ms", "s" ... + assert_eq!( + config.execution.parquet.coerce_int96, + Some(DFTimeUnit::Nanosecond) + ); // Invalid value should error immediately at SET time let err = config diff --git a/datafusion/common/src/parquet_config.rs b/datafusion/common/src/parquet_config.rs index 928a0f89bb622..de68969a0d455 100644 --- a/datafusion/common/src/parquet_config.rs +++ b/datafusion/common/src/parquet_config.rs @@ -114,27 +114,22 @@ impl From for DFParquetWriterVersion { /// via `SET` commands or proto deserialization. #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub enum DFTimeUnit { - /// Nanoseconds #[default] - NS, - /// Microseconds - US, - /// Milliseconds - MS, - /// Seconds - S, + Nanosecond, + Microsecond, + Millisecond, + Second, } -/// Implement parsing strings to `DFTimeUnit` impl FromStr for DFTimeUnit { type Err = DataFusionError; fn from_str(s: &str) -> Result { match s.to_lowercase().as_str() { - "ns" => Ok(DFTimeUnit::NS), - "us" => Ok(DFTimeUnit::US), - "ms" => Ok(DFTimeUnit::MS), - "s" => Ok(DFTimeUnit::S), + "ns" => Ok(DFTimeUnit::Nanosecond), + "us" => Ok(DFTimeUnit::Microsecond), + "ms" => Ok(DFTimeUnit::Millisecond), + "s" => Ok(DFTimeUnit::Second), other => Err(DataFusionError::Configuration(format!( "Invalid parquet coerce_int96: {other}. Expected one of: ns, us, ms, s" ))), @@ -145,10 +140,10 @@ impl FromStr for DFTimeUnit { impl Display for DFTimeUnit { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let s = match self { - DFTimeUnit::NS => "ns", - DFTimeUnit::US => "us", - DFTimeUnit::MS => "ms", - DFTimeUnit::S => "s", + DFTimeUnit::Nanosecond => "ns", + DFTimeUnit::Microsecond => "us", + DFTimeUnit::Millisecond => "ms", + DFTimeUnit::Second => "s", }; write!(f, "{s}") } @@ -173,10 +168,10 @@ impl ConfigField for DFTimeUnit { impl From for arrow::datatypes::TimeUnit { fn from(value: DFTimeUnit) -> Self { match value { - DFTimeUnit::NS => arrow::datatypes::TimeUnit::Nanosecond, - DFTimeUnit::US => arrow::datatypes::TimeUnit::Microsecond, - DFTimeUnit::MS => arrow::datatypes::TimeUnit::Millisecond, - DFTimeUnit::S => arrow::datatypes::TimeUnit::Second, + DFTimeUnit::Nanosecond => arrow::datatypes::TimeUnit::Nanosecond, + DFTimeUnit::Microsecond => arrow::datatypes::TimeUnit::Microsecond, + DFTimeUnit::Millisecond => arrow::datatypes::TimeUnit::Millisecond, + DFTimeUnit::Second => arrow::datatypes::TimeUnit::Second, } } } @@ -189,10 +184,10 @@ impl From for arrow::datatypes::TimeUnit { impl From for DFTimeUnit { fn from(value: arrow::datatypes::TimeUnit) -> Self { match value { - arrow::datatypes::TimeUnit::Nanosecond => DFTimeUnit::NS, - arrow::datatypes::TimeUnit::Microsecond => DFTimeUnit::US, - arrow::datatypes::TimeUnit::Millisecond => DFTimeUnit::MS, - arrow::datatypes::TimeUnit::Second => DFTimeUnit::S, + arrow::datatypes::TimeUnit::Nanosecond => DFTimeUnit::Nanosecond, + arrow::datatypes::TimeUnit::Microsecond => DFTimeUnit::Microsecond, + arrow::datatypes::TimeUnit::Millisecond => DFTimeUnit::Millisecond, + arrow::datatypes::TimeUnit::Second => DFTimeUnit::Second, } } } diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 7150e86caa40a..8efbfc1b8a95b 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -26,7 +26,6 @@ mod tests { // See also `parquet_exec` integration test use std::fs::{self, File}; use std::io::Write; - use std::str::FromStr; use std::sync::Arc; use std::sync::Mutex; @@ -1344,18 +1343,18 @@ mod tests { let time_units_and_expected = vec![ ( - None, // default: None = "ns" + None, Arc::new(Int64Array::from(vec![ - Some(1704141296123456000), - Some(1704070800000000000), - Some(-4852191831933722624), - Some(1735599600000000000), + Some(1704141296123456000), // Reads as nanosecond fine (note 3 extra 0s) + Some(1704070800000000000), // Reads as nanosecond fine (note 3 extra 0s) + Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999) + Some(1735599600000000000), // Reads as nanosecond fine (note 3 extra 0s) None, - Some(-4864435138808946688), + Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000) ])), ), ( - Some(DFTimeUnit::NS), + Some(DFTimeUnit::Nanosecond), Arc::new(Int64Array::from(vec![ Some(1704141296123456000), Some(1704070800000000000), @@ -1366,7 +1365,7 @@ mod tests { ])), ), ( - Some(DFTimeUnit::US), + Some(DFTimeUnit::Microsecond), Arc::new(Int64Array::from(vec![ Some(1704141296123456), Some(1704070800000000), @@ -1430,8 +1429,7 @@ mod tests { let parquet_exec = scan_format( &state, - &ParquetFormat::default() - .with_coerce_int96(Some(DFTimeUnit::from_str("us").unwrap())), + &ParquetFormat::default().with_coerce_int96(Some(DFTimeUnit::Microsecond)), None, testdata, filename, diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index e3095bfab2399..7df8a9eb98a4f 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -423,8 +423,8 @@ mod parquet { schema_force_view_types: global_options.global.schema_force_view_types, binary_as_string: global_options.global.binary_as_string, skip_arrow_metadata: global_options.global.skip_arrow_metadata, - coerce_int96_opt: global_options.global.coerce_int96.map(|compression| { - parquet_options::CoerceInt96Opt::CoerceInt96(compression.to_string()) + coerce_int96_opt: global_options.global.coerce_int96.map(|time_unit| { + parquet_options::CoerceInt96Opt::CoerceInt96(time_unit.to_string()) }), max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index e48f0a7c92276..1c6c95c3b51ec 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -89,7 +89,7 @@ The following configuration settings are available: | datafusion.execution.parquet.force_filter_selections | false | (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. | | datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | -| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | +| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If set, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. Note that `None` means **do not apply any coercion**, which is different from using `Some(DFTimeUnit::Nanosecond)`. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | | datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes |