diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index d71af206c78d5..4d0cc609c6377 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -23,7 +23,7 @@ use arrow_ipc::CompressionType;
 use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
 use crate::error::_config_err;
 use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
-use crate::parquet_config::DFParquetWriterVersion;
+use crate::parquet_config::{DFParquetWriterVersion, DFTimeUnit};
 use crate::parsers::CompressionTypeVariant;
 use crate::utils::get_available_parallelism;
 use crate::{DataFusionError, Result};
@@ -732,13 +732,16 @@ config_namespace! {
         /// BLOB instead.
         pub binary_as_string: bool, default = false
 
-        /// (reading) If true, parquet reader will read columns of
+        /// (reading) If set, parquet reader will read columns of
         /// physical type int96 as originating from a different resolution
-        /// than nanosecond. This is useful for reading data from systems like Spark
-        /// which stores microsecond resolution timestamps in an int96 allowing it
-        /// to write values with a larger date range than 64-bit timestamps with
-        /// nanosecond resolution.
-        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
+        /// than nanosecond.
+        /// Note that `None` means **do not apply any coercion**,
+        /// which is different from using `Some(DFTimeUnit::Nanosecond)`.
+        /// This is useful
+        /// for reading data from systems like Spark which stores microsecond
+        /// resolution timestamps in an int96 allowing it to write values with
+        /// a larger date range than 64-bit timestamps with nanosecond resolution.
+        pub coerce_int96: Option<DFTimeUnit>, default = None
 
         /// (reading) Use any available bloom filters when reading parquet files
         pub bloom_filter_on_read: bool, default = true
@@ -3553,4 +3556,29 @@ mod tests {
             "Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
         );
     }
+    #[cfg(feature = "parquet")]
+    #[test]
+    fn test_parquet_coerce_int96_validation() {
+        use crate::{config::ConfigOptions, parquet_config::DFTimeUnit};
+
+        let mut config = ConfigOptions::default();
+
+        // Valid values should work
+        config
+            .set("datafusion.execution.parquet.coerce_int96", "ns")
+            .unwrap();
+        assert_eq!(
+            config.execution.parquet.coerce_int96,
+            Some(DFTimeUnit::Nanosecond)
+        );
+
+        // Invalid value should error immediately at SET time
+        let err = config
+            .set("datafusion.execution.parquet.coerce_int96", "invalid")
+            .unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Invalid or Unsupported Configuration: Invalid parquet coerce_int96: invalid. Expected one of: ns, us, ms, s"
+        );
+    }
 }
diff --git a/datafusion/common/src/parquet_config.rs b/datafusion/common/src/parquet_config.rs
index 9d6d7a88566a7..de68969a0d455 100644
--- a/datafusion/common/src/parquet_config.rs
+++ b/datafusion/common/src/parquet_config.rs
@@ -106,3 +106,88 @@ impl From for DFParquetWriterVersion {
         }
     }
 }
+
+/// Time unit options for Parquet INT96 timestamp coercion.
+///
+/// This enum validates time unit values at configuration time,
+/// ensuring only supported units ("ns", "us", "ms", "s") can be set
+/// via `SET` commands or proto deserialization.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum DFTimeUnit {
+    #[default]
+    Nanosecond,
+    Microsecond,
+    Millisecond,
+    Second,
+}
+
+impl FromStr for DFTimeUnit {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self> {
+        match s.to_lowercase().as_str() {
+            "ns" => Ok(DFTimeUnit::Nanosecond),
+            "us" => Ok(DFTimeUnit::Microsecond),
+            "ms" => Ok(DFTimeUnit::Millisecond),
+            "s" => Ok(DFTimeUnit::Second),
+            other => Err(DataFusionError::Configuration(format!(
+                "Invalid parquet coerce_int96: {other}. Expected one of: ns, us, ms, s"
+            ))),
+        }
+    }
+}
+
+impl Display for DFTimeUnit {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let s = match self {
+            DFTimeUnit::Nanosecond => "ns",
+            DFTimeUnit::Microsecond => "us",
+            DFTimeUnit::Millisecond => "ms",
+            DFTimeUnit::Second => "s",
+        };
+        write!(f, "{s}")
+    }
+}
+
+impl ConfigField for DFTimeUnit {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
+        v.some(key, self, description)
+    }
+
+    fn set(&mut self, _: &str, value: &str) -> Result<()> {
+        *self = DFTimeUnit::from_str(value)?;
+        Ok(())
+    }
+}
+
+/// Convert `DFTimeUnit` to the arrow crate's `arrow::datatypes::TimeUnit`
+///
+/// This conversion is infallible since `DFTimeUnit` only contains
+/// valid values that have been validated at configuration time.
+#[cfg(feature = "parquet")]
+impl From<DFTimeUnit> for arrow::datatypes::TimeUnit {
+    fn from(value: DFTimeUnit) -> Self {
+        match value {
+            DFTimeUnit::Nanosecond => arrow::datatypes::TimeUnit::Nanosecond,
+            DFTimeUnit::Microsecond => arrow::datatypes::TimeUnit::Microsecond,
+            DFTimeUnit::Millisecond => arrow::datatypes::TimeUnit::Millisecond,
+            DFTimeUnit::Second => arrow::datatypes::TimeUnit::Second,
+        }
+    }
+}
+
+/// Convert the arrow crate's `arrow::datatypes::TimeUnit` to `DFTimeUnit`
+///
+/// This is used when converting from existing parquet TimeUnit,
+/// such as when reading from proto or test code.
+#[cfg(feature = "parquet")]
+impl From<arrow::datatypes::TimeUnit> for DFTimeUnit {
+    fn from(value: arrow::datatypes::TimeUnit) -> Self {
+        match value {
+            arrow::datatypes::TimeUnit::Nanosecond => DFTimeUnit::Nanosecond,
+            arrow::datatypes::TimeUnit::Microsecond => DFTimeUnit::Microsecond,
+            arrow::datatypes::TimeUnit::Millisecond => DFTimeUnit::Millisecond,
+            arrow::datatypes::TimeUnit::Second => DFTimeUnit::Second,
+        }
+    }
+}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 4c6d915d5bcaa..8efbfc1b8a95b 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -47,6 +47,7 @@ mod tests {
     use arrow_schema::{SchemaRef, TimeUnit};
     use bytes::{BufMut, BytesMut};
     use datafusion_common::config::TableParquetOptions;
+    use datafusion_common::parquet_config::DFTimeUnit;
    use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
     use datafusion_common::{Result, ScalarValue, assert_contains};
     use datafusion_datasource::file_format::FileFormat;
@@ -1342,7 +1343,7 @@ mod tests {
         let time_units_and_expected = vec![
             (
-                None, // Same as "ns" time_unit
+                None,
                 Arc::new(Int64Array::from(vec![
                     Some(1704141296123456000), // Reads as nanosecond fine (note 3 extra 0s)
                     Some(1704070800000000000), // Reads as nanosecond fine (note 3 extra 0s)
@@ -1353,7 +1354,7 @@ mod tests {
                 ])),
             ),
             (
-                Some("ns".to_string()),
+                Some(DFTimeUnit::Nanosecond),
                 Arc::new(Int64Array::from(vec![
                     Some(1704141296123456000),
                     Some(1704070800000000000),
@@ -1364,7 +1365,7 @@ mod tests {
                 ])),
             ),
             (
-                Some("us".to_string()),
+                Some(DFTimeUnit::Microsecond),
                 Arc::new(Int64Array::from(vec![
                     Some(1704141296123456),
                     Some(1704070800000000),
@@ -1379,7 +1380,7 @@ mod tests {
         for (time_unit, expected) in time_units_and_expected {
             let parquet_exec = scan_format(
                 &state,
-                &ParquetFormat::default().with_coerce_int96(time_unit.clone()),
+                &ParquetFormat::default().with_coerce_int96(time_unit),
                 Some(schema.clone()),
                 &testdata,
                 filename,
@@ -1428,7 +1429,7 @@ mod tests {
 
         let parquet_exec = scan_format(
             &state,
-            &ParquetFormat::default().with_coerce_int96(Some("us".to_string())),
+            &ParquetFormat::default().with_coerce_int96(Some(DFTimeUnit::Microsecond)),
             None,
             testdata,
             filename,
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 6d1758abeb47b..2424484433338 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -27,6 +27,7 @@ use std::{fmt, vec};
 
 use arrow::array::RecordBatch;
 use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
+use datafusion_common::parquet_config::DFTimeUnit;
 use datafusion_datasource::TableSchema;
 use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
@@ -60,7 +61,7 @@ use datafusion_session::Session;
 
 use crate::metadata::{DFParquetMetadata, lex_ordering_to_sorting_columns};
 use crate::reader::CachedParquetFileReaderFactory;
-use crate::source::{ParquetSource, parse_coerce_int96_string};
+use crate::source::ParquetSource;
 use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion_datasource::source::DataSourceExec;
@@ -273,11 +274,11 @@ impl ParquetFormat {
         self
     }
 
-    pub fn coerce_int96(&self) -> Option<String> {
-        self.options.global.coerce_int96.clone()
+    pub fn coerce_int96(&self) -> Option<DFTimeUnit> {
+        self.options.global.coerce_int96
     }
 
-    pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
+    pub fn with_coerce_int96(mut self, time_unit: Option<DFTimeUnit>) -> Self {
         self.options.global.coerce_int96 = time_unit;
         self
     }
@@ -364,11 +365,7 @@ impl FileFormat for ParquetFormat {
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
-        let coerce_int96 = match self.coerce_int96() {
-            Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
-            None => None,
-        };
-
+        let coerce_int96 = self.coerce_int96().map(|time_unit| time_unit.into());
         let file_metadata_cache =
             state.runtime_env().cache_manager.get_file_metadata_cache();
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 75d87a4cd16fc..41b6a3a5dcb47 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -29,15 +29,12 @@ use crate::row_filter::can_expr_be_pushed_down_with_schemas;
 use datafusion_common::config::ConfigOptions;
 #[cfg(feature = "parquet_encryption")]
 use datafusion_common::config::EncryptionFactoryOptions;
-use datafusion_datasource::as_file_source;
-use datafusion_datasource::file_stream::FileOpener;
-
-use arrow::datatypes::TimeUnit;
-use datafusion_common::DataFusionError;
 use datafusion_common::config::TableParquetOptions;
 use datafusion_datasource::TableSchema;
+use datafusion_datasource::as_file_source;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::file_stream::FileOpener;
 use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::{EquivalenceProperties, conjunction};
 use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
@@ -483,24 +480,6 @@ impl ParquetSource {
     }
 }
 
-/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit
-pub(crate) fn parse_coerce_int96_string(
-    str_setting: &str,
-) -> datafusion_common::Result<TimeUnit> {
-    let str_setting_lower: &str = &str_setting.to_lowercase();
-
-    match str_setting_lower {
-        "ns" => Ok(TimeUnit::Nanosecond),
-        "us" => Ok(TimeUnit::Microsecond),
-        "ms" => Ok(TimeUnit::Millisecond),
-        "s" => Ok(TimeUnit::Second),
-        _ => Err(DataFusionError::Configuration(format!(
-            "Unknown or unsupported parquet coerce_int96: \
-            {str_setting}. Valid values are: ns, us, ms, and s."
-        ))),
-    }
-}
-
 /// Allows easy conversion from ParquetSource to Arc<dyn FileSource>
 impl From<ParquetSource> for Arc<dyn FileSource> {
     fn from(source: ParquetSource) -> Self {
@@ -539,7 +518,7 @@ impl FileSource for ParquetSource {
             .global
             .coerce_int96
             .as_ref()
-            .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());
+            .map(|time_unit| (*time_unit).into());
 
         let opener = Arc::new(ParquetOpener {
             partition_index: partition,
diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index ca8a269958d73..6f558d55ca7ac 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -17,6 +17,7 @@
 
 use std::collections::HashMap;
 use std::convert::{TryFrom, TryInto};
+use std::str::FromStr;
 use std::sync::Arc;
 
 use crate::common::proto_error;
@@ -35,6 +36,7 @@ use arrow::ipc::{
     writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions},
 };
 
+use datafusion_common::parquet_config::DFTimeUnit;
 use datafusion_common::{
     Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
     DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
@@ -1083,9 +1085,11 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
             maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
             schema_force_view_types: value.schema_force_view_types,
             binary_as_string: value.binary_as_string,
-            coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
-                protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
-            }).unwrap_or(None),
+            coerce_int96: value.coerce_int96_opt.clone().and_then(|opt| match opt {
+                protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => {
+                    DFTimeUnit::from_str(&v).ok()
+                }
+            }),
             skip_arrow_metadata: value.skip_arrow_metadata,
             max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
                 protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs
index 79e3306a4df1b..588abcf217fbc 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -902,7 +902,9 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
             schema_force_view_types: value.schema_force_view_types,
             binary_as_string: value.binary_as_string,
             skip_arrow_metadata: value.skip_arrow_metadata,
-            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
+            coerce_int96_opt: value.coerce_int96.map(|time_unit| {
+                protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(time_unit.to_string())
+            }),
             max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
         })
     }
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs
index 08f42b0af7290..7df8a9eb98a4f 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -348,6 +348,8 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
 
 #[cfg(feature = "parquet")]
 mod parquet {
+    use std::str::FromStr;
+
     use super::*;
 
     use crate::protobuf::{
@@ -359,6 +361,7 @@ mod parquet {
     use datafusion_common::config::{
         ParquetColumnOptions, ParquetOptions, TableParquetOptions,
     };
+    use datafusion_common::parquet_config::DFTimeUnit;
     use datafusion_datasource_parquet::file_format::ParquetFormatFactory;
 
     impl TableParquetOptionsProto {
@@ -420,8 +423,8 @@ mod parquet {
                 schema_force_view_types: global_options.global.schema_force_view_types,
                 binary_as_string: global_options.global.binary_as_string,
                 skip_arrow_metadata: global_options.global.skip_arrow_metadata,
-                coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
-                    parquet_options::CoerceInt96Opt::CoerceInt96(compression)
+                coerce_int96_opt: global_options.global.coerce_int96.map(|time_unit| {
+                    parquet_options::CoerceInt96Opt::CoerceInt96(time_unit.to_string())
                 }),
                 max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| {
                     parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64)
@@ -520,7 +523,7 @@ mod parquet {
                 binary_as_string: proto.binary_as_string,
                 skip_arrow_metadata: proto.skip_arrow_metadata,
                 coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
-                    parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
+                    parquet_options::CoerceInt96Opt::CoerceInt96(v) => DFTimeUnit::from_str(v).unwrap(),
                 }),
                 max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt {
                     parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize,
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index e48f0a7c92276..1c6c95c3b51ec 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -89,7 +89,7 @@ The following configuration settings are available:
 | datafusion.execution.parquet.force_filter_selections | false | (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. |
 | datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. |
 | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. |
-| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. |
+| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If set, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. Note that `None` means **do not apply any coercion**, which is different from using `Some(DFTimeUnit::Nanosecond)`. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. |
 | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files |
 | datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. |
 | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes |