Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use arrow_ipc::CompressionType;
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
use crate::parquet_config::DFParquetWriterVersion;
use crate::parquet_config::{DFParquetWriterVersion, DFTimeUnit};
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
Expand Down Expand Up @@ -732,13 +732,16 @@ config_namespace! {
/// BLOB instead.
pub binary_as_string: bool, default = false

/// (reading) If true, parquet reader will read columns of
/// (reading) If set, parquet reader will read columns of
/// physical type int96 as originating from a different resolution
/// than nanosecond. This is useful for reading data from systems like Spark
/// which stores microsecond resolution timestamps in an int96 allowing it
/// to write values with a larger date range than 64-bit timestamps with
/// nanosecond resolution.
pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
/// than nanosecond.
/// Note that `None` means **do not apply any coercion**,
/// which is different from using `Some(DFTimeUnit::Nanosecond)`.
/// This is useful
/// for reading data from systems like Spark which stores microsecond
/// resolution timestamps in an int96 allowing it to write values with
/// a larger date range than 64-bit timestamps with nanosecond resolution.
pub coerce_int96: Option<DFTimeUnit>, default = None

/// (reading) Use any available bloom filters when reading parquet files
pub bloom_filter_on_read: bool, default = true
Expand Down Expand Up @@ -3553,4 +3556,29 @@ mod tests {
"Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
);
}
#[cfg(feature = "parquet")]
#[test]
fn test_parquet_coerce_int96_validation() {
    use crate::{config::ConfigOptions, parquet_config::DFTimeUnit};

    let mut config = ConfigOptions::default();

    // Every supported abbreviation must be accepted at SET time and
    // parse to the corresponding enum variant (not just "ns").
    let valid = [
        ("ns", DFTimeUnit::Nanosecond),
        ("us", DFTimeUnit::Microsecond),
        ("ms", DFTimeUnit::Millisecond),
        ("s", DFTimeUnit::Second),
    ];
    for (text, expected) in valid {
        config
            .set("datafusion.execution.parquet.coerce_int96", text)
            .unwrap();
        assert_eq!(config.execution.parquet.coerce_int96, Some(expected));
    }

    // An unsupported value must error immediately at SET time,
    // not later when the option is first read.
    let err = config
        .set("datafusion.execution.parquet.coerce_int96", "invalid")
        .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Invalid or Unsupported Configuration: Invalid parquet coerce_int96: invalid. Expected one of: ns, us, ms, s"
    );
}
}
85 changes: 85 additions & 0 deletions datafusion/common/src/parquet_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,88 @@ impl From<parquet::file::properties::WriterVersion> for DFParquetWriterVersion {
}
}
}

/// Time unit options for Parquet INT96 timestamp coercion.
///
/// This enum validates time unit values at configuration time,
/// ensuring only supported units ("ns", "us", "ms", "s") can be set
/// via `SET` commands or proto deserialization.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DFTimeUnit {
    /// Nanosecond resolution; parsed from / displayed as `"ns"`. The default.
    #[default]
    Nanosecond,
    /// Microsecond resolution; parsed from / displayed as `"us"`.
    Microsecond,
    /// Millisecond resolution; parsed from / displayed as `"ms"`.
    Millisecond,
    /// Second resolution; parsed from / displayed as `"s"`.
    Second,
}

impl FromStr for DFTimeUnit {
    type Err = DataFusionError;

    /// Parses a case-insensitive time-unit abbreviation: "ns", "us", "ms", or "s".
    ///
    /// Any other input yields a `DataFusionError::Configuration` so invalid
    /// settings are rejected up front.
    fn from_str(s: &str) -> Result<Self> {
        let lowered = s.to_lowercase();
        match lowered.as_str() {
            "ns" => Ok(Self::Nanosecond),
            "us" => Ok(Self::Microsecond),
            "ms" => Ok(Self::Millisecond),
            "s" => Ok(Self::Second),
            unknown => Err(DataFusionError::Configuration(format!(
                "Invalid parquet coerce_int96: {unknown}. Expected one of: ns, us, ms, s"
            ))),
        }
    }
}

impl Display for DFTimeUnit {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
DFTimeUnit::Nanosecond => "ns",
DFTimeUnit::Microsecond => "us",
DFTimeUnit::Millisecond => "ms",
DFTimeUnit::Second => "s",
};
write!(f, "{s}")
}
}

impl ConfigField for DFTimeUnit {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    /// Sets this leaf config value from its string form; the key argument is
    /// ignored because a time unit has no nested fields. Invalid strings
    /// propagate the configuration error from `FromStr`.
    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = value.parse::<DFTimeUnit>()?;
        Ok(())
    }
}

/// Convert `DFTimeUnit` into the arrow crate's `arrow::datatypes::TimeUnit`.
///
/// Infallible: every `DFTimeUnit` variant has an exact arrow counterpart,
/// and values were already validated at configuration time.
#[cfg(feature = "parquet")]
impl From<DFTimeUnit> for arrow::datatypes::TimeUnit {
    fn from(value: DFTimeUnit) -> Self {
        use arrow::datatypes::TimeUnit;
        match value {
            DFTimeUnit::Nanosecond => TimeUnit::Nanosecond,
            DFTimeUnit::Microsecond => TimeUnit::Microsecond,
            DFTimeUnit::Millisecond => TimeUnit::Millisecond,
            DFTimeUnit::Second => TimeUnit::Second,
        }
    }
}

/// Convert the arrow crate's `arrow::datatypes::TimeUnit` into `DFTimeUnit`.
///
/// Used when a `TimeUnit` already exists, e.g. values coming from proto
/// deserialization or test code.
#[cfg(feature = "parquet")]
impl From<arrow::datatypes::TimeUnit> for DFTimeUnit {
    fn from(value: arrow::datatypes::TimeUnit) -> Self {
        use arrow::datatypes::TimeUnit;
        match value {
            TimeUnit::Nanosecond => Self::Nanosecond,
            TimeUnit::Microsecond => Self::Microsecond,
            TimeUnit::Millisecond => Self::Millisecond,
            TimeUnit::Second => Self::Second,
        }
    }
}
11 changes: 6 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ mod tests {
use arrow_schema::{SchemaRef, TimeUnit};
use bytes::{BufMut, BytesMut};
use datafusion_common::config::TableParquetOptions;
use datafusion_common::parquet_config::DFTimeUnit;
use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
use datafusion_common::{Result, ScalarValue, assert_contains};
use datafusion_datasource::file_format::FileFormat;
Expand Down Expand Up @@ -1342,7 +1343,7 @@ mod tests {

let time_units_and_expected = vec![
(
None, // Same as "ns" time_unit
None,
Arc::new(Int64Array::from(vec![
Some(1704141296123456000), // Reads as nanosecond fine (note 3 extra 0s)
Some(1704070800000000000), // Reads as nanosecond fine (note 3 extra 0s)
Expand All @@ -1353,7 +1354,7 @@ mod tests {
])),
),
(
Some("ns".to_string()),
Some(DFTimeUnit::Nanosecond),
Arc::new(Int64Array::from(vec![
Some(1704141296123456000),
Some(1704070800000000000),
Expand All @@ -1364,7 +1365,7 @@ mod tests {
])),
),
(
Some("us".to_string()),
Some(DFTimeUnit::Microsecond),
Arc::new(Int64Array::from(vec![
Some(1704141296123456),
Some(1704070800000000),
Expand All @@ -1379,7 +1380,7 @@ mod tests {
for (time_unit, expected) in time_units_and_expected {
let parquet_exec = scan_format(
&state,
&ParquetFormat::default().with_coerce_int96(time_unit.clone()),
&ParquetFormat::default().with_coerce_int96(time_unit),
Some(schema.clone()),
&testdata,
filename,
Expand Down Expand Up @@ -1428,7 +1429,7 @@ mod tests {

let parquet_exec = scan_format(
&state,
&ParquetFormat::default().with_coerce_int96(Some("us".to_string())),
&ParquetFormat::default().with_coerce_int96(Some(DFTimeUnit::Microsecond)),
None,
testdata,
filename,
Expand Down
15 changes: 6 additions & 9 deletions datafusion/datasource-parquet/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::{fmt, vec};

use arrow::array::RecordBatch;
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
use datafusion_common::parquet_config::DFTimeUnit;
use datafusion_datasource::TableSchema;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
Expand Down Expand Up @@ -60,7 +61,7 @@ use datafusion_session::Session;

use crate::metadata::{DFParquetMetadata, lex_ordering_to_sorting_columns};
use crate::reader::CachedParquetFileReaderFactory;
use crate::source::{ParquetSource, parse_coerce_int96_string};
use crate::source::ParquetSource;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
Expand Down Expand Up @@ -273,11 +274,11 @@ impl ParquetFormat {
self
}

pub fn coerce_int96(&self) -> Option<String> {
self.options.global.coerce_int96.clone()
pub fn coerce_int96(&self) -> Option<DFTimeUnit> {
self.options.global.coerce_int96
}

pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
pub fn with_coerce_int96(mut self, time_unit: Option<DFTimeUnit>) -> Self {
self.options.global.coerce_int96 = time_unit;
self
}
Expand Down Expand Up @@ -364,11 +365,7 @@ impl FileFormat for ParquetFormat {
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
let coerce_int96 = match self.coerce_int96() {
Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
None => None,
};

let coerce_int96 = self.coerce_int96().map(|time_unit| time_unit.into());
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();

Expand Down
27 changes: 3 additions & 24 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,12 @@ use crate::row_filter::can_expr_be_pushed_down_with_schemas;
use datafusion_common::config::ConfigOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file_stream::FileOpener;

use arrow::datatypes::TimeUnit;
use datafusion_common::DataFusionError;
use datafusion_common::config::TableParquetOptions;
use datafusion_datasource::TableSchema;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::{EquivalenceProperties, conjunction};
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
Expand Down Expand Up @@ -483,24 +480,6 @@ impl ParquetSource {
}
}

/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit
pub(crate) fn parse_coerce_int96_string(
str_setting: &str,
) -> datafusion_common::Result<TimeUnit> {
let str_setting_lower: &str = &str_setting.to_lowercase();

match str_setting_lower {
"ns" => Ok(TimeUnit::Nanosecond),
"us" => Ok(TimeUnit::Microsecond),
"ms" => Ok(TimeUnit::Millisecond),
"s" => Ok(TimeUnit::Second),
_ => Err(DataFusionError::Configuration(format!(
"Unknown or unsupported parquet coerce_int96: \
{str_setting}. Valid values are: ns, us, ms, and s."
))),
}
}

/// Allows easy conversion from ParquetSource to Arc&lt;dyn FileSource&gt;
impl From<ParquetSource> for Arc<dyn FileSource> {
fn from(source: ParquetSource) -> Self {
Expand Down Expand Up @@ -539,7 +518,7 @@ impl FileSource for ParquetSource {
.global
.coerce_int96
.as_ref()
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());
.map(|time_unit| (*time_unit).into());

let opener = Arc::new(ParquetOpener {
partition_index: partition,
Expand Down
10 changes: 7 additions & 3 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::str::FromStr;
use std::sync::Arc;

use crate::common::proto_error;
Expand All @@ -35,6 +36,7 @@ use arrow::ipc::{
writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions},
};

use datafusion_common::parquet_config::DFTimeUnit;
use datafusion_common::{
Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
Expand Down Expand Up @@ -1083,9 +1085,11 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
schema_force_view_types: value.schema_force_view_types,
binary_as_string: value.binary_as_string,
coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
}).unwrap_or(None),
coerce_int96: value.coerce_int96_opt.clone().and_then(|opt| match opt {
protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => {
DFTimeUnit::from_str(&v).ok()
}
}),
skip_arrow_metadata: value.skip_arrow_metadata,
max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
Expand Down
4 changes: 3 additions & 1 deletion datafusion/proto-common/src/to_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,9 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
schema_force_view_types: value.schema_force_view_types,
binary_as_string: value.binary_as_string,
skip_arrow_metadata: value.skip_arrow_metadata,
coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
coerce_int96_opt: value.coerce_int96.map(|time_unit| {
protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(time_unit.to_string())
}),
max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
})
}
Expand Down
9 changes: 6 additions & 3 deletions datafusion/proto/src/logical_plan/file_formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec {

#[cfg(feature = "parquet")]
mod parquet {
use std::str::FromStr;

use super::*;

use crate::protobuf::{
Expand All @@ -359,6 +361,7 @@ mod parquet {
use datafusion_common::config::{
ParquetColumnOptions, ParquetOptions, TableParquetOptions,
};
use datafusion_common::parquet_config::DFTimeUnit;
use datafusion_datasource_parquet::file_format::ParquetFormatFactory;

impl TableParquetOptionsProto {
Expand Down Expand Up @@ -420,8 +423,8 @@ mod parquet {
schema_force_view_types: global_options.global.schema_force_view_types,
binary_as_string: global_options.global.binary_as_string,
skip_arrow_metadata: global_options.global.skip_arrow_metadata,
coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
parquet_options::CoerceInt96Opt::CoerceInt96(compression)
coerce_int96_opt: global_options.global.coerce_int96.map(|time_unit| {
parquet_options::CoerceInt96Opt::CoerceInt96(time_unit.to_string())
}),
max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| {
parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64)
Expand Down Expand Up @@ -520,7 +523,7 @@ mod parquet {
binary_as_string: proto.binary_as_string,
skip_arrow_metadata: proto.skip_arrow_metadata,
coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
parquet_options::CoerceInt96Opt::CoerceInt96(v) => DFTimeUnit::from_str(v).unwrap(),
}),
max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt {
parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize,
Expand Down
Loading
Loading