Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 1.8.0 [unreleased]

### Features

1. [#352](https://github.com/InfluxCommunity/influxdb3-java/pull/352): Update Apache Flight client dependencies.

### CI

1. [#313](https://github.com/InfluxCommunity/influxdb3-java/pull/313): Clarify JDK 25+ requirements.
Expand Down
41 changes: 33 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,51 @@

<junit-jupiter.version>5.14.3</junit-jupiter.version>
<junit-platform.version>1.14.3</junit-platform.version>
<netty-handler.version>4.2.10.Final</netty-handler.version>
<arrow.version>18.3.0</arrow.version>
<!-- grpc-bom 1.79.0 is very likely incompatible with Arrow 18.3.0 -->
<grpc-bom.version>1.78.0</grpc-bom.version>
<!-- keep aligned with gRPC/Arrow -->
<netty-bom.version>4.1.127.Final</netty-bom.version>

<plugin.surefire.version>3.5.4</plugin.surefire.version>
<plugin.jacoco.version>0.8.14</plugin.jacoco.version>
<plugin.checkstyle.version>3.6.0</plugin.checkstyle.version>
<plugin.javadoc.version>3.12.0</plugin.javadoc.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-bom</artifactId>
<version>${arrow.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>${grpc-bom.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
<version>${netty-bom.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>

<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-core</artifactId>
<version>18.3.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand Down Expand Up @@ -171,19 +202,16 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty-handler.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty-handler.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${netty-handler.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
Expand All @@ -195,7 +223,6 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>${netty-handler.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
Expand All @@ -211,13 +238,11 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
<version>${netty-handler.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>2.0.75.Final</version>
</dependency>

<dependency>
Expand Down
35 changes: 19 additions & 16 deletions src/test/java/com/influxdb/v3/client/integration/E2ETest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class E2ETest {
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
@Test
void testQueryWithProxy() {
void testQueryWithProxy() throws Exception {
String proxyUrl = "http://localhost:10000";

try {
Expand All @@ -82,17 +82,18 @@ void testQueryWithProxy() {
.proxyUrl(proxyUrl)
.build();

InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
influxDBClient.writePoint(
Point.measurement("test1")
.setField("field", "field1")
);
try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) {
influxDBClient.writePoint(
Point.measurement("test1")
.setField("field", "field1")
);

try (Stream<PointValues> stream = influxDBClient.queryPoints("SELECT * FROM test1")) {
stream.findFirst()
.ifPresent(pointValues -> {
Assertions.assertThat(pointValues.getField("field")).isEqualTo("field1");
});
try (Stream<PointValues> stream = influxDBClient.queryPoints("SELECT * FROM test1")) {
stream.findFirst()
.ifPresent(pointValues -> {
Assertions.assertThat(pointValues.getField("field")).isEqualTo("field1");
});
}
}
}

Expand All @@ -110,15 +111,16 @@ void correctSslCertificates() throws Exception {
.database(System.getenv("TESTING_INFLUXDB_DATABASE"))
.sslRootsFilePath(influxDBcertificateFile)
.build();
InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
assertGetDataSuccess(influxDBClient);
try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) {
assertGetDataSuccess(influxDBClient);
}
}

@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
@Test
void disableServerCertificateValidation() {
void disableServerCertificateValidation() throws Exception {
String wrongCertificateFile = "src/test/java/com/influxdb/v3/client/testdata/docker.com.pem";

ClientConfig clientConfig = new ClientConfig.Builder()
Expand All @@ -130,8 +132,9 @@ void disableServerCertificateValidation() {
.build();

// Test succeeded with wrong certificate file because disableServerCertificateValidation is true
InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
assertGetDataSuccess(influxDBClient);
try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) {
assertGetDataSuccess(influxDBClient);
}
}

@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
Expand Down
62 changes: 62 additions & 0 deletions src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,15 +40,20 @@
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.CallOptions;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.FlightProducer.CallContext;
import org.apache.arrow.flight.FlightProducer.ServerStreamListener;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.PointValues;
Expand Down Expand Up @@ -186,6 +192,62 @@ void setInboundMessageSizeLarge() throws Exception {
}
}

@Test
@Timeout(5)
void queryTimeout() throws Exception {
int freePort = findFreePort();
URI uri = URI.create("http://127.0.0.1:" + freePort);
CountDownLatch serverStreamFinished = new CountDownLatch(1);
try (VectorSchemaRoot vectorSchemaRoot = TestUtils.generateVectorSchemaRoot(1, 1);
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
FlightServer flightServer = TestUtils.simpleFlightServer(uri, allocator, new NoOpFlightProducer() {
@Override
public void getStream(final CallContext context,
final Ticket ticket,
final ServerStreamListener listener) {
listener.start(vectorSchemaRoot);
try {
Thread.sleep(1000);
if (!context.isCancelled() && !listener.isCancelled()) {
listener.completed();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
serverStreamFinished.countDown();
}
}
})
) {
flightServer.start();

String host = String.format("http://%s:%d", uri.getHost(), uri.getPort());
ClientConfig clientConfig = new ClientConfig.Builder()
.host(host)
.database("test")
.queryTimeout(Duration.ofMillis(200))
.build();

try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) {
Throwable thrown = Assertions.catchThrowable(() -> {
try (Stream<PointValues> stream = influxDBClient.queryPoints(
"Select * from \"nothing\""
)) {
stream.count();
}
});

Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class);
FlightRuntimeException fre = (FlightRuntimeException) thrown;
Assertions.assertThat(fre.status().code()).isEqualTo(CallStatus.TIMED_OUT.code());
}

Assertions.assertThat(serverStreamFinished.await(2, TimeUnit.SECONDS)).isTrue();
flightServer.shutdown();
flightServer.awaitTermination(2, TimeUnit.SECONDS);
}
}

@Test
void defaultGrpcCallOptions() {
GrpcCallOptions grpcCallOptions = new QueryOptions("test").grpcCallOptions();
Expand Down
Loading