Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions encodings/alp/src/alp_rd/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use vortex_array::IntoArray;
use vortex_array::Precision;
use vortex_array::ProstMetadata;
use vortex_array::SerializeMetadata;
use vortex_array::arrays::ConstantVTable;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::PrimitiveVTable;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
Expand All @@ -41,7 +43,6 @@ use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_mask::Mask;
use vortex_session::VortexSession;

use crate::alp_rd::kernel::PARENT_KERNELS;
Expand Down Expand Up @@ -297,17 +298,27 @@ impl VTable for ALPRDVTable {
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
let left_parts = array.left_parts().clone().execute::<PrimitiveArray>(ctx)?;
let right_parts = array.right_parts().clone().execute::<PrimitiveArray>(ctx)?;
// Ensure left_parts (child 0) is a PrimitiveArray.
let left_parts = if let Some(primitive) = array.left_parts().as_opt::<PrimitiveVTable>() {
primitive.clone()
} else if array.left_parts().is::<ConstantVTable>() {
array.left_parts().to_canonical()?.into_primitive()
} else {
return Ok(ExecutionStep::ExecuteChild(0));
};

// Ensure right_parts (child 1) is a PrimitiveArray.
let right_parts = if let Some(primitive) = array.right_parts().as_opt::<PrimitiveVTable>() {
primitive.clone()
} else if array.right_parts().is::<ConstantVTable>() {
array.right_parts().to_canonical()?.into_primitive()
} else {
return Ok(ExecutionStep::ExecuteChild(1));
};

// Decode the left_parts using our builtin dictionary.
let left_parts_dict = array.left_parts_dictionary();

let validity = array
.left_parts()
.validity()?
.to_array(array.len())
.execute::<Mask>(ctx)?;
let validity = left_parts.validity_mask()?;

let decoded_array = if array.is_f32() {
PrimitiveArray::new(
Expand Down
2 changes: 1 addition & 1 deletion encodings/decimal-byte-parts/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub fn vortex_decimal_byte_parts::DecimalBytePartsVTable::deserialize(bytes: &[u

pub fn vortex_decimal_byte_parts::DecimalBytePartsVTable::dtype(array: &vortex_decimal_byte_parts::DecimalBytePartsArray) -> &vortex_array::dtype::DType

pub fn vortex_decimal_byte_parts::DecimalBytePartsVTable::execute(array: &Self::Array, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::executor::ExecutionStep>
pub fn vortex_decimal_byte_parts::DecimalBytePartsVTable::execute(array: &Self::Array, _ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::executor::ExecutionStep>

pub fn vortex_decimal_byte_parts::DecimalBytePartsVTable::execute_parent(array: &Self::Array, parent: &vortex_array::array::ArrayRef, child_idx: usize, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::array::ArrayRef>>

Expand Down
54 changes: 27 additions & 27 deletions encodings/decimal-byte-parts/src/decimal_byte_parts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use vortex_array::IntoArray;
use vortex_array::Precision;
use vortex_array::ProstMetadata;
use vortex_array::SerializeMetadata;
use vortex_array::arrays::ConstantVTable;
use vortex_array::arrays::DecimalArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::PrimitiveVTable;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::dtype::DecimalDType;
Expand Down Expand Up @@ -190,8 +191,31 @@ impl VTable for DecimalBytePartsVTable {
PARENT_RULES.evaluate(array, parent, child_idx)
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
to_canonical_decimal(array, ctx).map(ExecutionStep::Done)
fn execute(array: &Self::Array, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
// Ensure msp (child 0) is a PrimitiveArray.
let prim = if let Some(primitive) = array.msp.as_opt::<PrimitiveVTable>() {
primitive.clone()
} else if array.msp.is::<ConstantVTable>() {
array.msp.to_canonical()?.into_primitive()
} else {
return Ok(ExecutionStep::ExecuteChild(0));
};

Ok(ExecutionStep::Done(match_each_signed_integer_ptype!(
prim.ptype(),
|P| {
// SAFETY: The primitive array's buffer is already validated with correct type.
// The decimal dtype matches the array's dtype, and validity is preserved.
unsafe {
DecimalArray::new_unchecked(
prim.to_buffer::<P>(),
*array.decimal_dtype(),
prim.validity().clone(),
)
}
.into_array()
}
)))
}

fn execute_parent(
Expand Down Expand Up @@ -277,30 +301,6 @@ impl DecimalBytePartsVTable {
pub const ID: ArrayId = ArrayId::new_ref("vortex.decimal_byte_parts");
}

/// Converts a DecimalBytePartsArray to its canonical DecimalArray representation.
fn to_canonical_decimal(
array: &DecimalBytePartsArray,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
// TODO(joe): support parts len != 1
let prim = array.msp.clone().execute::<PrimitiveArray>(ctx)?;
// Depending on the decimal type and the min/max of the primitive array we can choose
// the correct buffer size

Ok(match_each_signed_integer_ptype!(prim.ptype(), |P| {
// SAFETY: The primitive array's buffer is already validated with correct type.
// The decimal dtype matches the array's dtype, and validity is preserved.
unsafe {
DecimalArray::new_unchecked(
prim.to_buffer::<P>(),
*array.decimal_dtype(),
prim.validity().clone(),
)
}
.into_array()
}))
}

impl OperationsVTable<DecimalBytePartsVTable> for DecimalBytePartsVTable {
fn scalar_at(array: &DecimalBytePartsArray, index: usize) -> VortexResult<Scalar> {
// TODO(joe): support parts len != 1
Expand Down
11 changes: 9 additions & 2 deletions encodings/fastlanes/src/for/array/for_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ mod test {

use super::*;
use crate::BitPackedArray;
use crate::r#for::array::for_decompress::decompress;
use crate::r#for::array::for_decompress::apply_reference;
use crate::r#for::array::for_decompress::fused_decompress;
use crate::r#for::array::for_decompress::try_fused_decompress;

static SESSION: LazyLock<VortexSession> =
LazyLock::new(|| VortexSession::empty().with::<ArraySession>());
Expand Down Expand Up @@ -169,7 +170,13 @@ mod test {
let expected_unsigned = PrimitiveArray::from_iter(unsigned);
assert_arrays_eq!(encoded, expected_unsigned);

let decompressed = decompress(&compressed, &mut SESSION.create_execution_ctx())?;
let mut ctx = SESSION.create_execution_ctx();
let decompressed = if let Some(result) = try_fused_decompress(&compressed, &mut ctx)? {
result
} else {
let encoded = compressed.encoded().to_primitive();
apply_reference(&compressed, encoded)
};
array
.as_slice::<i8>()
.iter()
Expand Down
23 changes: 14 additions & 9 deletions encodings/fastlanes/src/for/array/for_decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,28 @@ impl<T: PhysicalPType<Physical = T> + FoR> UnpackStrategy<T> for FoRStrategy<T>
}
}

pub fn decompress(array: &FoRArray, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
let ptype = array.ptype();

// Try to do fused unpack.
/// Try the fused BitPacked decompression path. Returns `None` if the child is not BitPacked
/// or the reference type is not unsigned.
pub fn try_fused_decompress(
array: &FoRArray,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<PrimitiveArray>> {
if array.reference_scalar().dtype().is_unsigned_int()
&& let Some(bp) = array.encoded().as_opt::<BitPackedVTable>()
{
return match_each_unsigned_integer_ptype!(array.ptype(), |T| {
fused_decompress::<T>(array, bp, ctx)
fused_decompress::<T>(array, bp, ctx).map(Some)
});
}
Ok(None)
}

// TODO(ngates): Do we need this to be into_encoded() somehow?
let encoded = array.encoded().clone().execute::<PrimitiveArray>(ctx)?;
/// Apply the FoR reference value to an already-decoded PrimitiveArray.
pub fn apply_reference(array: &FoRArray, encoded: PrimitiveArray) -> PrimitiveArray {
let ptype = array.ptype();
let validity = encoded.validity().clone();

Ok(match_each_integer_ptype!(ptype, |T| {
match_each_integer_ptype!(ptype, |T| {
let min = array
.reference_scalar()
.as_primitive()
Expand All @@ -75,7 +80,7 @@ pub fn decompress(array: &FoRArray, ctx: &mut ExecutionCtx) -> VortexResult<Prim
validity,
)
}
}))
})
}

pub(crate) fn fused_decompress<
Expand Down
57 changes: 55 additions & 2 deletions encodings/fastlanes/src/for/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ use vortex_array::ExecutionCtx;
use vortex_array::ExecutionStep;
use vortex_array::IntoArray;
use vortex_array::Precision;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::ConstantVTable;
use vortex_array::arrays::PrimitiveVTable;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::match_each_integer_ptype;
use vortex_array::scalar::Scalar;
use vortex_array::scalar::ScalarValue;
use vortex_array::serde::ArrayChildren;
Expand All @@ -21,14 +25,16 @@ use vortex_array::vtable;
use vortex_array::vtable::ArrayId;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityVTableFromChild;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;

use crate::FoRArray;
use crate::r#for::array::for_decompress::decompress;
use crate::r#for::array::for_decompress::apply_reference;
use crate::r#for::array::for_decompress::try_fused_decompress;
use crate::r#for::vtable::kernels::PARENT_KERNELS;
use crate::r#for::vtable::rules::PARENT_RULES;

Expand Down Expand Up @@ -167,7 +173,54 @@ impl VTable for FoRVTable {
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
Ok(ExecutionStep::Done(decompress(array, ctx)?.into_array()))
// Try fused decompress with BitPacked child (no child execution needed).
if let Some(result) = try_fused_decompress(array, ctx)? {
return Ok(ExecutionStep::Done(result.into_array()));
}

// If child is already a PrimitiveArray, add the reference value.
if array.encoded().is::<PrimitiveVTable>() {
let encoded = array.encoded().as_::<PrimitiveVTable>().clone();
return Ok(ExecutionStep::Done(
apply_reference(array, encoded).into_array(),
));
}

// If child is a constant, compute the result as a constant.
if let Some(constant) = array.encoded().as_opt::<ConstantVTable>() {
let scalar = constant.scalar();
if scalar.is_null() {
return Ok(ExecutionStep::Done(
ConstantArray::new(Scalar::null(array.dtype().clone()), array.len())
.into_array(),
));
}
return Ok(ExecutionStep::Done(match_each_integer_ptype!(
array.ptype(),
|T| {
let enc_val = scalar
.as_primitive()
.typed_value::<T>()
.vortex_expect("constant must be non-null after check");
let ref_val = array
.reference_scalar()
.as_primitive()
.typed_value::<T>()
.vortex_expect("reference must be non-null");
ConstantArray::new(
Scalar::primitive(
enc_val.wrapping_add(ref_val),
scalar.dtype().nullability(),
),
array.len(),
)
.into_array()
}
)));
}

// Otherwise, ask the scheduler to execute the child first.
Ok(ExecutionStep::ExecuteChild(0))
}

fn execute_parent(
Expand Down
2 changes: 1 addition & 1 deletion encodings/zigzag/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub fn vortex_zigzag::ZigZagVTable::deserialize(_bytes: &[u8], _dtype: &vortex_a

pub fn vortex_zigzag::ZigZagVTable::dtype(array: &vortex_zigzag::ZigZagArray) -> &vortex_array::dtype::DType

pub fn vortex_zigzag::ZigZagVTable::execute(array: &Self::Array, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::executor::ExecutionStep>
pub fn vortex_zigzag::ZigZagVTable::execute(array: &Self::Array, _ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::executor::ExecutionStep>

pub fn vortex_zigzag::ZigZagVTable::execute_parent(array: &Self::Array, parent: &vortex_array::array::ArrayRef, child_idx: usize, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::array::ArrayRef>>

Expand Down
40 changes: 36 additions & 4 deletions encodings/zigzag/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use vortex_array::ExecutionCtx;
use vortex_array::ExecutionStep;
use vortex_array::IntoArray;
use vortex_array::Precision;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::ConstantVTable;
use vortex_array::arrays::PrimitiveVTable;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::dtype::PType;
Expand Down Expand Up @@ -149,10 +152,39 @@ impl VTable for ZigZagVTable {
Ok(())
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
Ok(ExecutionStep::Done(
zigzag_decode(array.encoded().clone().execute(ctx)?).into_array(),
))
fn execute(array: &Self::Array, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
// If child is already a PrimitiveArray, decode it directly.
if array.encoded().is::<PrimitiveVTable>() {
let encoded = array.encoded().as_::<PrimitiveVTable>().clone();
return Ok(ExecutionStep::Done(zigzag_decode(encoded).into_array()));
}

// If child is a constant, decode the scalar value.
if let Some(constant) = array.encoded().as_opt::<ConstantVTable>() {
let scalar = constant.scalar();
if scalar.is_null() {
return Ok(ExecutionStep::Done(
ConstantArray::new(Scalar::null(array.dtype().clone()), array.len())
.into_array(),
));
}
let result = match_each_unsigned_integer_ptype!(scalar.as_primitive().ptype(), |P| {
let val = scalar
.as_primitive()
.typed_value::<P>()
.vortex_expect("constant must be non-null after check");
Scalar::primitive(
<<P as ZigZagEncoded>::Int>::decode(val),
array.dtype().nullability(),
)
});
return Ok(ExecutionStep::Done(
ConstantArray::new(result, array.len()).into_array(),
));
}

// Otherwise, ask the scheduler to execute the child first.
Ok(ExecutionStep::ExecuteChild(0))
}

fn reduce_parent(
Expand Down
2 changes: 1 addition & 1 deletion encodings/zstd/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ pub fn vortex_zstd::ZstdVTable::deserialize(bytes: &[u8], _dtype: &vortex_array:

pub fn vortex_zstd::ZstdVTable::dtype(array: &vortex_zstd::ZstdArray) -> &vortex_array::dtype::DType

pub fn vortex_zstd::ZstdVTable::execute(array: &Self::Array, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::executor::ExecutionStep>
pub fn vortex_zstd::ZstdVTable::execute(array: &Self::Array, _ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::executor::ExecutionStep>

pub fn vortex_zstd::ZstdVTable::id(_array: &Self::Array) -> vortex_array::vtable::dyn_::ArrayId

Expand Down
7 changes: 2 additions & 5 deletions encodings/zstd/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,8 @@ impl VTable for ZstdVTable {
Ok(())
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
array
.decompress()?
.execute::<ArrayRef>(ctx)
.map(ExecutionStep::Done)
fn execute(array: &Self::Array, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
Ok(ExecutionStep::Done(array.decompress()?))
}

fn reduce_parent(
Expand Down
4 changes: 1 addition & 3 deletions encodings/zstd/src/zstd_buffers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,9 +470,7 @@ impl VTable for ZstdBuffersVTable {
fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
let session = ctx.session();
let inner_array = array.decompress_and_build_inner(session)?;
inner_array
.execute::<ArrayRef>(ctx)
.map(ExecutionStep::Done)
Ok(ExecutionStep::Done(inner_array))
}
}

Expand Down
Loading
Loading