Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions encodings/fastlanes/src/for/array/for_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ mod test {

use super::*;
use crate::BitPackedArray;
use crate::r#for::array::for_decompress::decompress;
use crate::r#for::array::for_decompress::apply_reference;
use crate::r#for::array::for_decompress::fused_decompress;
use crate::r#for::array::for_decompress::try_fused_decompress;

static SESSION: LazyLock<VortexSession> =
LazyLock::new(|| VortexSession::empty().with::<ArraySession>());
Expand Down Expand Up @@ -169,7 +170,13 @@ mod test {
let expected_unsigned = PrimitiveArray::from_iter(unsigned);
assert_arrays_eq!(encoded, expected_unsigned);

let decompressed = decompress(&compressed, &mut SESSION.create_execution_ctx())?;
let mut ctx = SESSION.create_execution_ctx();
let decompressed = if let Some(result) = try_fused_decompress(&compressed, &mut ctx)? {
result
} else {
let encoded = compressed.encoded().to_primitive();
apply_reference(&compressed, encoded)
};
array
.as_slice::<i8>()
.iter()
Expand Down
23 changes: 14 additions & 9 deletions encodings/fastlanes/src/for/array/for_decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,28 @@ impl<T: PhysicalPType<Physical = T> + FoR> UnpackStrategy<T> for FoRStrategy<T>
}
}

pub fn decompress(array: &FoRArray, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
let ptype = array.ptype();

// Try to do fused unpack.
/// Try the fused BitPacked decompression path. Returns `None` if the child is not BitPacked
/// or the reference type is not unsigned.
pub fn try_fused_decompress(
array: &FoRArray,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<PrimitiveArray>> {
if array.reference_scalar().dtype().is_unsigned_int()
&& let Some(bp) = array.encoded().as_opt::<BitPackedVTable>()
{
return match_each_unsigned_integer_ptype!(array.ptype(), |T| {
fused_decompress::<T>(array, bp, ctx)
fused_decompress::<T>(array, bp, ctx).map(Some)
});
}
Ok(None)
}

// TODO(ngates): Do we need this to be into_encoded() somehow?
let encoded = array.encoded().clone().execute::<PrimitiveArray>(ctx)?;
/// Apply the FoR reference value to an already-decoded PrimitiveArray.
pub fn apply_reference(array: &FoRArray, encoded: PrimitiveArray) -> PrimitiveArray {
let ptype = array.ptype();
let validity = encoded.validity().clone();

Ok(match_each_integer_ptype!(ptype, |T| {
match_each_integer_ptype!(ptype, |T| {
let min = array
.reference_scalar()
.as_primitive()
Expand All @@ -75,7 +80,7 @@ pub fn decompress(array: &FoRArray, ctx: &mut ExecutionCtx) -> VortexResult<Prim
validity,
)
}
}))
})
}

pub(crate) fn fused_decompress<
Expand Down
57 changes: 55 additions & 2 deletions encodings/fastlanes/src/for/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ use vortex_array::ExecutionCtx;
use vortex_array::ExecutionStep;
use vortex_array::IntoArray;
use vortex_array::Precision;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::ConstantVTable;
use vortex_array::arrays::PrimitiveVTable;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::match_each_integer_ptype;
use vortex_array::scalar::Scalar;
use vortex_array::scalar::ScalarValue;
use vortex_array::serde::ArrayChildren;
Expand All @@ -21,14 +25,16 @@ use vortex_array::vtable;
use vortex_array::vtable::ArrayId;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityVTableFromChild;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;

use crate::FoRArray;
use crate::r#for::array::for_decompress::decompress;
use crate::r#for::array::for_decompress::apply_reference;
use crate::r#for::array::for_decompress::try_fused_decompress;
use crate::r#for::vtable::kernels::PARENT_KERNELS;
use crate::r#for::vtable::rules::PARENT_RULES;

Expand Down Expand Up @@ -167,7 +173,54 @@ impl VTable for FoRVTable {
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
Ok(ExecutionStep::Done(decompress(array, ctx)?.into_array()))
// Try fused decompress with BitPacked child (no child execution needed).
if let Some(result) = try_fused_decompress(array, ctx)? {
return Ok(ExecutionStep::Done(result.into_array()));
}

// If child is already a PrimitiveArray, add the reference value.
if array.encoded().is::<PrimitiveVTable>() {
let encoded = array.encoded().as_::<PrimitiveVTable>().clone();
return Ok(ExecutionStep::Done(
apply_reference(array, encoded).into_array(),
));
}

// If child is a constant, compute the result as a constant.
if let Some(constant) = array.encoded().as_opt::<ConstantVTable>() {
let scalar = constant.scalar();
if scalar.is_null() {
return Ok(ExecutionStep::Done(
ConstantArray::new(Scalar::null(array.dtype().clone()), array.len())
.into_array(),
));
}
return Ok(ExecutionStep::Done(match_each_integer_ptype!(
array.ptype(),
|T| {
let enc_val = scalar
.as_primitive()
.typed_value::<T>()
.vortex_expect("constant must be non-null after check");
let ref_val = array
.reference_scalar()
.as_primitive()
.typed_value::<T>()
.vortex_expect("reference must be non-null");
ConstantArray::new(
Scalar::primitive(
enc_val.wrapping_add(ref_val),
scalar.dtype().nullability(),
),
array.len(),
)
.into_array()
}
)));
}

// Otherwise, ask the scheduler to execute the child first.
Ok(ExecutionStep::ExecuteChild(0))
}

fn execute_parent(
Expand Down
2 changes: 1 addition & 1 deletion encodings/zigzag/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub fn vortex_zigzag::ZigZagVTable::deserialize(_bytes: &[u8], _dtype: &vortex_a

pub fn vortex_zigzag::ZigZagVTable::dtype(array: &vortex_zigzag::ZigZagArray) -> &vortex_array::dtype::DType

pub fn vortex_zigzag::ZigZagVTable::execute(array: &Self::Array, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::executor::ExecutionStep>
pub fn vortex_zigzag::ZigZagVTable::execute(array: &Self::Array, _ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::executor::ExecutionStep>

pub fn vortex_zigzag::ZigZagVTable::execute_parent(array: &Self::Array, parent: &vortex_array::array::ArrayRef, child_idx: usize, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::array::ArrayRef>>

Expand Down
40 changes: 36 additions & 4 deletions encodings/zigzag/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use vortex_array::ExecutionCtx;
use vortex_array::ExecutionStep;
use vortex_array::IntoArray;
use vortex_array::Precision;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::ConstantVTable;
use vortex_array::arrays::PrimitiveVTable;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::dtype::PType;
Expand Down Expand Up @@ -149,10 +152,39 @@ impl VTable for ZigZagVTable {
Ok(())
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
Ok(ExecutionStep::Done(
zigzag_decode(array.encoded().clone().execute(ctx)?).into_array(),
))
fn execute(array: &Self::Array, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
// If child is already a PrimitiveArray, decode it directly.
if array.encoded().is::<PrimitiveVTable>() {
let encoded = array.encoded().as_::<PrimitiveVTable>().clone();
return Ok(ExecutionStep::Done(zigzag_decode(encoded).into_array()));
}

// If child is a constant, decode the scalar value.
if let Some(constant) = array.encoded().as_opt::<ConstantVTable>() {
let scalar = constant.scalar();
if scalar.is_null() {
return Ok(ExecutionStep::Done(
ConstantArray::new(Scalar::null(array.dtype().clone()), array.len())
.into_array(),
));
}
let result = match_each_unsigned_integer_ptype!(scalar.as_primitive().ptype(), |P| {
let val = scalar
.as_primitive()
.typed_value::<P>()
.vortex_expect("constant must be non-null after check");
Scalar::primitive(
<<P as ZigZagEncoded>::Int>::decode(val),
array.dtype().nullability(),
)
});
return Ok(ExecutionStep::Done(
ConstantArray::new(result, array.len()).into_array(),
));
}

// Otherwise, ask the scheduler to execute the child first.
Ok(ExecutionStep::ExecuteChild(0))
}

fn reduce_parent(
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3138,7 +3138,7 @@ pub fn vortex_array::arrays::SliceVTable::deserialize(_bytes: &[u8], _dtype: &vo

pub fn vortex_array::arrays::SliceVTable::dtype(array: &vortex_array::arrays::SliceArray) -> &vortex_array::dtype::DType

pub fn vortex_array::arrays::SliceVTable::execute(array: &Self::Array, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::ExecutionStep>
pub fn vortex_array::arrays::SliceVTable::execute(array: &Self::Array, _ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::ExecutionStep>

pub fn vortex_array::arrays::SliceVTable::execute_parent(array: &Self::Array, parent: &vortex_array::ArrayRef, child_idx: usize, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::ArrayRef>>

Expand Down Expand Up @@ -17402,7 +17402,7 @@ pub fn vortex_array::arrays::SliceVTable::deserialize(_bytes: &[u8], _dtype: &vo

pub fn vortex_array::arrays::SliceVTable::dtype(array: &vortex_array::arrays::SliceArray) -> &vortex_array::dtype::DType

pub fn vortex_array::arrays::SliceVTable::execute(array: &Self::Array, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::ExecutionStep>
pub fn vortex_array::arrays::SliceVTable::execute(array: &Self::Array, _ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::ExecutionStep>

pub fn vortex_array::arrays::SliceVTable::execute_parent(array: &Self::Array, parent: &vortex_array::ArrayRef, child_idx: usize, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::ArrayRef>>

Expand Down
30 changes: 23 additions & 7 deletions vortex-array/src/arrays/filter/vtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ use vortex_error::vortex_panic;
use vortex_mask::Mask;
use vortex_session::VortexSession;

use crate::AnyCanonical;
use crate::Array;
use crate::ArrayEq;
use crate::ArrayHash;
use crate::ArrayRef;
use crate::Canonical;
use crate::IntoArray;
use crate::Precision;
use crate::arrays::ConstantArray;
use crate::arrays::ConstantVTable;
use crate::arrays::filter::array::FilterArray;
use crate::arrays::filter::execute::execute_filter;
use crate::arrays::filter::execute::execute_filter_fast_paths;
Expand Down Expand Up @@ -156,18 +160,30 @@ impl VTable for FilterVTable {
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
if let Some(canonical) = execute_filter_fast_paths(array, ctx)? {
return Ok(ExecutionStep::Done(canonical));
if let Some(result) = execute_filter_fast_paths(array, ctx)? {
return Ok(ExecutionStep::Done(result));
}
let Mask::Values(mask_values) = &array.mask else {
unreachable!("`execute_filter_fast_paths` handles AllTrue and AllFalse")
};

// We rely on the optimization pass that runs prior to this execution for filter pushdown,
// so now we can just execute the filter without worrying.
Ok(ExecutionStep::Done(
execute_filter(array.child.clone().execute(ctx)?, mask_values).into_array(),
))
// If child is already canonical, filter it directly.
if let Some(canonical) = array.child.as_opt::<AnyCanonical>() {
return Ok(ExecutionStep::Done(
execute_filter(Canonical::from(canonical), mask_values).into_array(),
));
}

// If child is a constant, filtering just changes the length.
if let Some(constant) = array.child.as_opt::<ConstantVTable>() {
return Ok(ExecutionStep::Done(
ConstantArray::new(constant.scalar().clone(), mask_values.true_count())
.into_array(),
));
}

// Otherwise, ask the scheduler to execute the child first.
Ok(ExecutionStep::ExecuteChild(0))
}

fn reduce_parent(
Expand Down
25 changes: 21 additions & 4 deletions vortex-array/src/arrays/masked/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ use vortex_error::vortex_ensure;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;

use crate::AnyCanonical;
use crate::ArrayRef;
use crate::Canonical;
use crate::EmptyMetadata;
use crate::IntoArray;
use crate::Precision;
use crate::arrays::ConstantArray;
use crate::arrays::ConstantVTable;
use crate::arrays::constant::constant_canonicalize;
use crate::arrays::masked::MaskedArray;
use crate::arrays::masked::compute::rules::PARENT_RULES;
use crate::arrays::masked::mask_validity_canonical;
Expand Down Expand Up @@ -178,10 +181,24 @@ impl VTable for MaskedVTable {
// While we could manually convert the dtype, `mask_validity_canonical` is already O(1) for
// `AllTrue` masks (no data copying), so there's no benefit.

let child = array.child().clone().execute::<Canonical>(ctx)?;
Ok(ExecutionStep::Done(
mask_validity_canonical(child, &validity_mask, ctx)?.into_array(),
))
// If child is already canonical, apply the validity mask directly.
if let Some(canonical) = array.child().as_opt::<AnyCanonical>() {
return Ok(ExecutionStep::Done(
mask_validity_canonical(Canonical::from(canonical), &validity_mask, ctx)?
.into_array(),
));
}

// If child is a constant, expand to canonical then apply the validity mask.
if let Some(constant) = array.child().as_opt::<ConstantVTable>() {
let canonical = constant_canonicalize(constant)?;
return Ok(ExecutionStep::Done(
mask_validity_canonical(canonical, &validity_mask, ctx)?.into_array(),
));
}

// Otherwise, ask the scheduler to execute the child first.
Ok(ExecutionStep::ExecuteChild(0))
}

fn reduce_parent(
Expand Down
35 changes: 20 additions & 15 deletions vortex-array/src/arrays/slice/vtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use crate::ArrayEq;
use crate::ArrayHash;
use crate::ArrayRef;
use crate::Canonical;
use crate::IntoArray;
use crate::Precision;
use crate::arrays::ConstantArray;
use crate::arrays::ConstantVTable;
use crate::arrays::slice::array::SliceArray;
use crate::arrays::slice::rules::PARENT_RULES;
use crate::buffer::BufferHandle;
Expand Down Expand Up @@ -155,23 +158,25 @@ impl VTable for SliceVTable {
Ok(())
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
// Execute the child to get canonical form, then slice it
let Some(canonical) = array.child.as_opt::<AnyCanonical>() else {
// If the child is not canonical, recurse.
return array
.child
.clone()
.execute::<ArrayRef>(ctx)?
.slice(array.slice_range().clone())
fn execute(array: &Self::Array, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
// If child is already canonical, slice it directly.
if let Some(canonical) = array.child.as_opt::<AnyCanonical>() {
// TODO(ngates): we should inline canonical slice logic here.
return Canonical::from(canonical)
.as_ref()
.slice(array.range.clone())
.map(ExecutionStep::Done);
};
}

// If child is a constant, slicing just changes the length.
if let Some(constant) = array.child.as_opt::<ConstantVTable>() {
return Ok(ExecutionStep::Done(
ConstantArray::new(constant.scalar().clone(), array.range.len()).into_array(),
));
}

// TODO(ngates): we should inline canonical slice logic here.
Canonical::from(canonical)
.as_ref()
.slice(array.range.clone())
.map(ExecutionStep::Done)
// Otherwise, ask the scheduler to execute the child first.
Ok(ExecutionStep::ExecuteChild(0))
}

fn reduce_parent(
Expand Down
Loading