Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.parquet.column.statistics;

import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.ColumnOrder;
import org.apache.parquet.schema.Float16;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

Expand All @@ -28,6 +31,7 @@ public class BinaryStatistics extends Statistics<Binary> {
private static final PrimitiveType DEFAULT_FAKE_TYPE =
Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("fake_binary_type");

private final boolean isFloat16;
private Binary max;
private Binary min;

Expand All @@ -41,26 +45,51 @@ public BinaryStatistics() {

BinaryStatistics(PrimitiveType type) {
super(type);
this.isFloat16 = type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.Float16LogicalTypeAnnotation;
if (isFloat16) {
incrementNanCount(0);
}
}

private BinaryStatistics(BinaryStatistics other) {
super(other.type());
this.isFloat16 = other.isFloat16;
if (other.hasNonNullValue()) {
initializeStats(other.min, other.max);
}
setNumNulls(other.getNumNulls());
incrementNanCount(other.getNanCount());
}

@Override
public void updateStats(Binary value) {
if (isFloat16 && Float16.isNaN(value.get2BytesLittleEndian())) {
incrementNanCount();
}
if (!this.hasNonNullValue()) {
min = value.copy();
max = value.copy();
this.markAsNotEmpty();
} else if (comparator().compare(min, value) > 0) {
min = value.copy();
} else if (comparator().compare(max, value) < 0) {
max = value.copy();
} else {
if (isFloat16 && type().columnOrder().equals(ColumnOrder.ieee754TotalOrder())) {
if (!Float16.isNaN(value.get2BytesLittleEndian())) {
if (Float16.isNaN(min.get2BytesLittleEndian())
|| comparator().compare(min, value) > 0) {
min = value.copy();
}
if (Float16.isNaN(max.get2BytesLittleEndian())
|| comparator().compare(max, value) < 0) {
max = value.copy();
}
}
return;
}

if (comparator().compare(min, value) > 0) {
min = value.copy();
} else if (comparator().compare(max, value) < 0) {
max = value.copy();
}
}
}

Expand Down Expand Up @@ -126,6 +155,20 @@ public boolean isSmallerThanWithTruncation(long size, int truncationLength) {
*/
@Deprecated
public void updateStats(Binary min_value, Binary max_value) {
if (isFloat16 && type().columnOrder().equals(ColumnOrder.ieee754TotalOrder())) {
if (!Float16.isNaN(min_value.get2BytesLittleEndian())) {
if (Float16.isNaN(min.get2BytesLittleEndian()) || comparator().compare(min, min_value) > 0) {
min = min_value.copy();
}
}
if (!Float16.isNaN(max_value.get2BytesLittleEndian())) {
if (Float16.isNaN(max.get2BytesLittleEndian()) || comparator().compare(max, max_value) < 0) {
max = max_value.copy();
}
}
return;
}

if (comparator().compare(min, min_value) > 0) {
min = min_value.copy();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.parquet.column.statistics;

import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.schema.ColumnOrder;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

Expand All @@ -41,6 +42,7 @@ public DoubleStatistics() {

DoubleStatistics(PrimitiveType type) {
super(type);
incrementNanCount(0);
}

private DoubleStatistics(DoubleStatistics other) {
Expand All @@ -49,10 +51,14 @@ private DoubleStatistics(DoubleStatistics other) {
initializeStats(other.min, other.max);
}
setNumNulls(other.getNumNulls());
incrementNanCount(other.getNanCount());
}

@Override
public void updateStats(double value) {
if (Double.isNaN(value)) {
incrementNanCount();
}
if (!this.hasNonNullValue()) {
initializeStats(value, value);
} else {
Expand Down Expand Up @@ -98,6 +104,20 @@ public boolean isSmallerThan(long size) {
}

public void updateStats(double min_value, double max_value) {
if (type().columnOrder().equals(ColumnOrder.ieee754TotalOrder())) {
if (!Double.isNaN(min_value)) {
if (Double.isNaN(min) || comparator().compare(min, min_value) > 0) {
min = min_value;
}
}
if (!Double.isNaN(max_value)) {
if (Double.isNaN(max) || comparator().compare(max, max_value) < 0) {
max = max_value;
}
}
return;
}

if (comparator().compare(min, min_value) > 0) {
min = min_value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.parquet.column.statistics;

import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.schema.ColumnOrder;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

Expand All @@ -42,6 +43,7 @@ public FloatStatistics() {

FloatStatistics(PrimitiveType type) {
super(type);
incrementNanCount(0);
}

private FloatStatistics(FloatStatistics other) {
Expand All @@ -50,10 +52,14 @@ private FloatStatistics(FloatStatistics other) {
initializeStats(other.min, other.max);
}
setNumNulls(other.getNumNulls());
incrementNanCount(other.getNanCount());
}

@Override
public void updateStats(float value) {
if (Float.isNaN(value)) {
incrementNanCount();
}
if (!this.hasNonNullValue()) {
initializeStats(value, value);
} else {
Expand Down Expand Up @@ -99,6 +105,20 @@ public boolean isSmallerThan(long size) {
}

public void updateStats(float min_value, float max_value) {
if (type().columnOrder().equals(ColumnOrder.ieee754TotalOrder())) {
if (!Float.isNaN(min_value)) {
if (Float.isNaN(min) || comparator().compare(min, min_value) > 0) {
min = min_value;
}
}
if (!Float.isNaN(max_value)) {
if (Float.isNaN(max) || comparator().compare(max, max_value) < 0) {
max = max_value;
}
}
return;
}

if (comparator().compare(min, min_value) > 0) {
min = min_value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.parquet.column.statistics;

import java.util.Arrays;
import org.apache.parquet.Preconditions;
import org.apache.parquet.column.UnknownColumnTypeException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.ColumnOrder;
import org.apache.parquet.schema.Float16;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveComparator;
Expand All @@ -40,10 +42,11 @@ public abstract class Statistics<T extends Comparable<T>> {
* Builder class to build Statistics objects. Used to read the statistics from the Parquet file.
*/
public static class Builder {
private final PrimitiveType type;
protected final PrimitiveType type;
private byte[] min;
private byte[] max;
private long numNulls = -1;
private long nanCount = -1;

private Builder(PrimitiveType type) {
this.type = type;
Expand All @@ -64,12 +67,21 @@ public Builder withNumNulls(long numNulls) {
return this;
}

public Builder withNanCount(long nanCount) {
this.nanCount = nanCount;
return this;
}

public Statistics<?> build() {
Statistics<?> stats = createStats(type);
if (min != null && max != null) {
stats.setMinMaxFromBytes(min, max);
}
stats.num_nulls = this.numNulls;
stats.nan_count = this.nanCount;
Preconditions.checkState(
!type.columnOrder().equals(ColumnOrder.ieee754TotalOrder()) || stats.nan_count >= 0,
"nan_count is required by IEEE 754 column order with type " + type);
return stats;
}
}
Expand All @@ -87,10 +99,12 @@ public Statistics<?> build() {
if (stats.hasNonNullValue()) {
Float min = stats.genericGetMin();
Float max = stats.genericGetMax();
// Drop min/max values in case of NaN as the sorting order of values is undefined for this case
if (min.isNaN() || max.isNaN()) {
stats.setMinMax(0.0f, 0.0f);
((Statistics<?>) stats).hasNonNullValue = false;
if (!type.columnOrder().equals(ColumnOrder.ieee754TotalOrder())) {
// For TYPE_DEFINED_ORDER: drop min/max values as NaN ordering is undefined
stats.setMinMax(0.0f, 0.0f);
((Statistics<?>) stats).hasNonNullValue = false;
}
} else {
// Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped
if (Float.compare(min, 0.0f) == 0) {
Expand Down Expand Up @@ -120,10 +134,12 @@ public Statistics<?> build() {
if (stats.hasNonNullValue()) {
Double min = stats.genericGetMin();
Double max = stats.genericGetMax();
// Drop min/max values in case of NaN as the sorting order of values is undefined for this case
if (min.isNaN() || max.isNaN()) {
stats.setMinMax(0.0, 0.0);
((Statistics<?>) stats).hasNonNullValue = false;
if (!type.columnOrder().equals(ColumnOrder.ieee754TotalOrder())) {
// For TYPE_DEFINED_ORDER: drop min/max values as NaN ordering is undefined
stats.setMinMax(0.0, 0.0);
((Statistics<?>) stats).hasNonNullValue = false;
}
} else {
// Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped
if (Double.compare(min, 0.0) == 0) {
Expand Down Expand Up @@ -160,10 +176,12 @@ public Statistics<?> build() {
Binary bMax = stats.genericGetMax();
short min = bMin.get2BytesLittleEndian();
short max = bMax.get2BytesLittleEndian();
// Drop min/max values in case of NaN as the sorting order of values is undefined for this case
if (Float16.isNaN(min) || Float16.isNaN(max)) {
stats.setMinMax(POSITIVE_ZERO_LITTLE_ENDIAN, NEGATIVE_ZERO_LITTLE_ENDIAN);
((Statistics<?>) stats).hasNonNullValue = false;
if (!type.columnOrder().equals(ColumnOrder.ieee754TotalOrder())) {
// For TYPE_DEFINED_ORDER: drop min/max values as NaN ordering is undefined
stats.setMinMax(POSITIVE_ZERO_LITTLE_ENDIAN, NEGATIVE_ZERO_LITTLE_ENDIAN);
((Statistics<?>) stats).hasNonNullValue = false;
}
} else {
// Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped
if (min == (short) 0x0000) {
Expand All @@ -182,6 +200,7 @@ public Statistics<?> build() {
private final PrimitiveComparator<T> comparator;
private boolean hasNonNullValue;
private long num_nulls;
private long nan_count = -1;
final PrimitiveStringifier stringifier;

Statistics(PrimitiveType type) {
Expand Down Expand Up @@ -351,7 +370,8 @@ public boolean equals(Object other) {
return type.equals(stats.type)
&& Arrays.equals(stats.getMaxBytes(), this.getMaxBytes())
&& Arrays.equals(stats.getMinBytes(), this.getMinBytes())
&& stats.getNumNulls() == this.getNumNulls();
&& stats.getNumNulls() == this.getNumNulls()
&& stats.getNanCount() == this.getNanCount();
}

/**
Expand Down Expand Up @@ -384,6 +404,11 @@ public void mergeStatistics(Statistics stats) {
mergeStatisticsMinMax(stats);
markAsNotEmpty();
}
if (isNanCountSet() && stats.isNanCountSet()) {
incrementNanCount(stats.getNanCount());
} else {
unsetNanCount();
}
} else {
throw StatisticsClassException.create(this, stats);
}
Expand Down Expand Up @@ -535,6 +560,53 @@ public void incrementNumNulls(long increment) {
num_nulls += increment;
}

/**
* Increments the NaN count by one. If nan_count was not set (-1), initializes it to 1.
*/
public void incrementNanCount() {
if (nan_count < 0) {
nan_count = 1;
} else {
nan_count++;
}
}

/**
* Increments the NaN count by the parameter value. If nan_count was not set (-1), initializes it to increment.
*
* @param increment value to increment the NaN count by
*/
public void incrementNanCount(long increment) {
if (nan_count < 0) {
nan_count = increment;
} else {
nan_count += increment;
}
}

/**
* Returns the NaN count
*
* @return NaN count or {@code -1} if the NaN count is not set
*/
public long getNanCount() {
return nan_count;
}

/**
* @return whether nanCount is set and can be used
*/
public boolean isNanCountSet() {
return nan_count >= 0;
}

/**
* Unsets the NaN count to -1.
*/
public void unsetNanCount() {
nan_count = -1;
}

/**
* Returns the null count
*
Expand Down
Loading
Loading