Skip to content

Commit 4a9b45e

Browse files
committed
Implement config options
1 parent 2208665 commit 4a9b45e

File tree

10 files changed

+258
-126
lines changed

10 files changed

+258
-126
lines changed

Cargo.lock

Lines changed: 100 additions & 114 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,9 @@ crate-type = ["cdylib", "rlib"]
9191
[profile.release]
9292
lto = true
9393
codegen-units = 1
94+
95+
[patch.crates-io]
96+
datafusion = { git = "https://github.com/timsaucer/datafusion", rev = "33ea91c5fa37aa0cfa787d58c6bf84ec4628db88" }
97+
datafusion-substrait = { git = "https://github.com/timsaucer/datafusion", rev = "33ea91c5fa37aa0cfa787d58c6bf84ec4628db88" }
98+
datafusion-proto = { git = "https://github.com/timsaucer/datafusion", rev = "33ea91c5fa37aa0cfa787d58c6bf84ec4628db88" }
99+
datafusion-ffi = { git = "https://github.com/timsaucer/datafusion", rev = "33ea91c5fa37aa0cfa787d58c6bf84ec4628db88" }
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import pyarrow as pa
2+
from datafusion import SessionConfig
3+
from datafusion_ffi_example import MyConfig
4+
5+
def test_catalog_provider():
6+
config = MyConfig()
7+
config = SessionConfig().with_extension(config)
8+
config.set("my_config.baz_count", "42")
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use datafusion::common::{config_err, DataFusionError};
2+
use datafusion::config::{ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, Visit};
3+
use pyo3::{pyclass, pymethods, Bound, PyResult, Python};
4+
use std::any::Any;
5+
use datafusion_ffi::config::extension_options::FFI_ExtensionOptions;
6+
use pyo3::exceptions::PyRuntimeError;
7+
use pyo3::types::PyCapsule;
8+
9+
/// My own config options.
10+
#[pyclass(name = "MyConfig", module = "datafusion_ffi_example", subclass)]
11+
#[derive(Clone, Debug)]
12+
pub struct MyConfig {
13+
/// Should "foo" be replaced by "bar"?
14+
pub foo_to_bar: bool,
15+
16+
/// How many "baz" should be created?
17+
pub baz_count: usize,
18+
}
19+
20+
#[pymethods]
21+
impl MyConfig {
22+
#[new]
23+
fn new() -> Self {
24+
Self::default()
25+
}
26+
27+
28+
fn __datafusion_extension_options__<'py>(
29+
&self,
30+
py: Python<'py>,
31+
) -> PyResult<Bound<'py, PyCapsule>> {
32+
let name = cr"datafusion_extension_options".into();
33+
34+
let mut config = FFI_ExtensionOptions::default();
35+
config.add_config(self).map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
36+
37+
PyCapsule::new(py, config, Some(name))
38+
}
39+
}
40+
41+
impl Default for MyConfig {
42+
fn default() -> Self {
43+
Self {
44+
foo_to_bar: true,
45+
baz_count: 1337,
46+
}
47+
}
48+
}
49+
50+
impl ConfigExtension for MyConfig {
51+
const PREFIX: &'static str = "my_config";
52+
}
53+
54+
impl ExtensionOptions for MyConfig {
55+
fn as_any(&self) -> &dyn Any {
56+
self
57+
}
58+
59+
fn as_any_mut(&mut self) -> &mut dyn Any {
60+
self
61+
}
62+
63+
fn cloned(&self) -> Box<dyn ExtensionOptions> {
64+
Box::new(self.clone())
65+
}
66+
67+
fn set(&mut self, key: &str, value: &str) -> datafusion::common::Result<()> {
68+
datafusion::config::ConfigField::set(self, key, value)
69+
}
70+
71+
fn entries(&self) -> Vec<ConfigEntry> {
72+
vec![
73+
ConfigEntry {
74+
key: "foo_to_bar".to_owned(),
75+
value: Some(format!("{}", self.foo_to_bar)),
76+
description: "foo to bar",
77+
},
78+
ConfigEntry {
79+
key: "baz_count".to_owned(),
80+
value: Some(format!("{}", self.baz_count)),
81+
description: "baz count",
82+
},
83+
]
84+
}
85+
}
86+
87+
impl ConfigField for MyConfig {
88+
fn visit<V: Visit>(&self, v: &mut V, _key: &str, _description: &'static str) {
89+
let key = "foo_to_bar";
90+
let desc = "foo to bar";
91+
self.foo_to_bar.visit(v, key, desc);
92+
93+
let key = "baz_count";
94+
let desc = "baz count";
95+
self.baz_count.visit(v, key, desc);
96+
}
97+
98+
fn set(&mut self, key: &str, value: &str) -> Result<(), DataFusionError> {
99+
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
100+
match key {
101+
"foo_to_bar" => self.foo_to_bar.set(rem, value.as_ref()),
102+
"baz_count" => self.baz_count.set(rem, value.as_ref()),
103+
104+
_ => config_err!("Config value \"{}\" not found on MyConfig", key),
105+
}
106+
107+
}
108+
}

python/datafusion/context.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,10 @@ def set(self, key: str, value: str) -> SessionConfig:
294294
self.config_internal = self.config_internal.set(key, value)
295295
return self
296296

297+
def with_extension(self, extension: Any) -> SessionConfig:
298+
self.config_internal = self.config_internal.with_extension(extension)
299+
return self
300+
297301

298302
class RuntimeEnvBuilder:
299303
"""Runtime configuration options."""

src/common/data_type.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ impl DataTypeMap {
347347
ScalarValue::Map(_) => Err(PyNotImplementedError::new_err(
348348
"ScalarValue::Map".to_string(),
349349
)),
350+
ScalarValue::RunEndEncoded(_, _, _) => todo!(),
350351
}
351352
}
352353
}

src/context.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,11 @@ use datafusion::execution::options::ReadOptions;
4444
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
4545
use datafusion::execution::session_state::SessionStateBuilder;
4646
use datafusion::prelude::{
47-
AvroReadOptions, CsvReadOptions, DataFrame, NdJsonReadOptions, ParquetReadOptions,
47+
AvroReadOptions, CsvReadOptions, DataFrame, JsonReadOptions, ParquetReadOptions,
4848
};
4949
use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
5050
use datafusion_ffi::catalog_provider_list::FFI_CatalogProviderList;
51+
use datafusion_ffi::config::extension_options::FFI_ExtensionOptions;
5152
use datafusion_ffi::execution::FFI_TaskContextProvider;
5253
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
5354
use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;
@@ -176,6 +177,27 @@ impl PySessionConfig {
176177
fn set(&self, key: &str, value: &str) -> Self {
177178
Self::from(self.config.clone().set_str(key, value))
178179
}
180+
181+
pub fn with_extension(&self, extension: Bound<PyAny>) -> PyResult<Self> {
182+
let capsule = extension.call_method0("__datafusion_extension_options__")?;
183+
let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
184+
185+
validate_pycapsule(capsule, "datafusion_extension_options")?;
186+
187+
let mut extension = unsafe { capsule.reference::<FFI_ExtensionOptions>() }.clone();
188+
189+
let mut config = self.config.clone();
190+
let options = config.options_mut();
191+
if let Some(prior_extension) = options.extensions.get::<FFI_ExtensionOptions>() {
192+
extension
193+
.merge(prior_extension)
194+
.map_err(py_datafusion_err)?;
195+
}
196+
197+
options.extensions.insert(extension);
198+
199+
Ok(Self::from(config))
200+
}
179201
}
180202

181203
/// Runtime options for a SessionContext
@@ -815,7 +837,7 @@ impl PySessionContext {
815837
.to_str()
816838
.ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
817839

818-
let mut options = NdJsonReadOptions::default()
840+
let mut options = JsonReadOptions::default()
819841
.file_compression_type(parse_file_compression_type(file_compression_type)?)
820842
.table_partition_cols(
821843
table_partition_cols
@@ -976,7 +998,7 @@ impl PySessionContext {
976998
let path = path
977999
.to_str()
9781000
.ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
979-
let mut options = NdJsonReadOptions::default()
1001+
let mut options = JsonReadOptions::default()
9801002
.table_partition_cols(
9811003
table_partition_cols
9821004
.into_iter()

src/dataset_exec.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
3131
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
3232
use datafusion::physical_plan::{
3333
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
34-
SendableRecordBatchStream, Statistics,
34+
SendableRecordBatchStream,
3535
};
3636
use futures::{TryStreamExt, stream};
3737
/// Implements a Datafusion physical ExecutionPlan that delegates to a PyArrow Dataset
@@ -70,7 +70,6 @@ pub(crate) struct DatasetExec {
7070
fragments: Py<PyList>,
7171
columns: Option<Vec<String>>,
7272
filter_expr: Option<Py<PyAny>>,
73-
projected_statistics: Statistics,
7473
plan_properties: datafusion::physical_plan::PlanProperties,
7574
}
7675

@@ -111,7 +110,7 @@ impl DatasetExec {
111110

112111
let scanner = dataset.call_method("scanner", (), Some(&kwargs))?;
113112

114-
let schema = Arc::new(
113+
let schema: SchemaRef = Arc::new(
115114
scanner
116115
.getattr("projected_schema")?
117116
.extract::<PyArrowType<_>>()?
@@ -130,7 +129,6 @@ impl DatasetExec {
130129
let fragments_iter = pylist.call1((fragments_iterator,))?;
131130
let fragments = fragments_iter.downcast::<PyList>().map_err(PyErr::from)?;
132131

133-
let projected_statistics = Statistics::new_unknown(&schema);
134132
let plan_properties = datafusion::physical_plan::PlanProperties::new(
135133
EquivalenceProperties::new(schema.clone()),
136134
Partitioning::UnknownPartitioning(fragments.len()),
@@ -144,7 +142,6 @@ impl DatasetExec {
144142
fragments: fragments.clone().unbind(),
145143
columns,
146144
filter_expr,
147-
projected_statistics,
148145
plan_properties,
149146
})
150147
}
@@ -235,10 +232,6 @@ impl ExecutionPlan for DatasetExec {
235232
})
236233
}
237234

238-
fn statistics(&self) -> DFResult<Statistics> {
239-
Ok(self.projected_statistics.clone())
240-
}
241-
242235
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
243236
&self.plan_properties
244237
}

src/expr.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ impl PyExpr {
210210
Expr::Unnest(value) => {
211211
Ok(unnest_expr::PyUnnestExpr::from(value.clone()).into_bound_py_any(py)?)
212212
}
213+
Expr::SetComparison(_) => todo!(),
213214
})
214215
}
215216

@@ -382,6 +383,7 @@ impl PyExpr {
382383
Expr::Wildcard { .. } => {
383384
return Err(py_unsupported_variant_err("Expr::Wildcard is unsupported"));
384385
}
386+
Expr::SetComparison(_) => todo!(),
385387
})
386388
}
387389

@@ -518,6 +520,7 @@ impl PyExpr {
518520
Expr::Wildcard { .. } => {
519521
Err(py_unsupported_variant_err("Expr::Wildcard is unsupported"))
520522
}
523+
Expr::SetComparison(_) => todo!(),
521524
}
522525
}
523526

src/expr/dml.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ impl From<WriteOp> for PyWriteOp {
110110
WriteOp::Update => PyWriteOp::Update,
111111
WriteOp::Delete => PyWriteOp::Delete,
112112
WriteOp::Ctas => PyWriteOp::Ctas,
113+
WriteOp::Truncate => todo!(),
113114
}
114115
}
115116
}

0 commit comments

Comments
 (0)