diff --git a/CHANGELOG.md b/CHANGELOG.md
index fa389c5b..2b0ba711 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
## 1.8.0 [unreleased]
+### Features
+
+1. [#352](https://github.com/InfluxCommunity/influxdb3-java/pull/352): Update Apache Flight client dependencies.
+
### CI
1. [#313](https://github.com/InfluxCommunity/influxdb3-java/pull/313): Clarify JDK 25+ requirements.
diff --git a/pom.xml b/pom.xml
index c8cd7fd4..3f935a14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,7 +90,11 @@
5.14.3
1.14.3
- 4.2.10.Final
+ 18.3.0
+
+ 1.78.0
+
+ 4.1.127.Final
3.5.4
0.8.14
@@ -98,12 +102,39 @@
3.12.0
+
+
+
+ org.apache.arrow
+ arrow-bom
+ ${arrow.version}
+ pom
+ import
+
+
+
+ io.grpc
+ grpc-bom
+ ${grpc-bom.version}
+ pom
+ import
+
+
+
+ io.netty
+ netty-bom
+ ${netty-bom.version}
+ pom
+ import
+
+
+
+
org.apache.arrow
flight-core
- 18.3.0
org.slf4j
@@ -171,19 +202,16 @@
io.netty
netty-handler
- ${netty-handler.version}
io.netty
netty-buffer
- ${netty-handler.version}
io.netty
netty-codec
- ${netty-handler.version}
io.netty
@@ -195,7 +223,6 @@
io.netty
netty-codec-http2
- ${netty-handler.version}
io.netty
@@ -211,13 +238,11 @@
io.netty
netty-transport-native-unix-common
- ${netty-handler.version}
io.netty
netty-tcnative-boringssl-static
- 2.0.75.Final
diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java
index 63c95afa..6ec5ee72 100644
--- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java
+++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java
@@ -59,7 +59,7 @@ public class E2ETest {
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
@Test
- void testQueryWithProxy() {
+ void testQueryWithProxy() throws Exception {
String proxyUrl = "http://localhost:10000";
try {
@@ -82,17 +82,18 @@ void testQueryWithProxy() {
.proxyUrl(proxyUrl)
.build();
- InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
- influxDBClient.writePoint(
- Point.measurement("test1")
- .setField("field", "field1")
- );
+ try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) {
+ influxDBClient.writePoint(
+ Point.measurement("test1")
+ .setField("field", "field1")
+ );
- try (Stream stream = influxDBClient.queryPoints("SELECT * FROM test1")) {
- stream.findFirst()
- .ifPresent(pointValues -> {
- Assertions.assertThat(pointValues.getField("field")).isEqualTo("field1");
- });
+ try (Stream stream = influxDBClient.queryPoints("SELECT * FROM test1")) {
+ stream.findFirst()
+ .ifPresent(pointValues -> {
+ Assertions.assertThat(pointValues.getField("field")).isEqualTo("field1");
+ });
+ }
}
}
@@ -110,15 +111,16 @@ void correctSslCertificates() throws Exception {
.database(System.getenv("TESTING_INFLUXDB_DATABASE"))
.sslRootsFilePath(influxDBcertificateFile)
.build();
- InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
- assertGetDataSuccess(influxDBClient);
+ try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) {
+ assertGetDataSuccess(influxDBClient);
+ }
}
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
@Test
- void disableServerCertificateValidation() {
+ void disableServerCertificateValidation() throws Exception {
String wrongCertificateFile = "src/test/java/com/influxdb/v3/client/testdata/docker.com.pem";
ClientConfig clientConfig = new ClientConfig.Builder()
@@ -130,8 +132,9 @@ void disableServerCertificateValidation() {
.build();
// Test succeeded with wrong certificate file because disableServerCertificateValidation is true
- InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
- assertGetDataSuccess(influxDBClient);
+ try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) {
+ assertGetDataSuccess(influxDBClient);
+ }
}
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java
index c17e118f..e9083b7a 100644
--- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java
+++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java
@@ -27,6 +27,7 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -39,8 +40,12 @@
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.CallOptions;
import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.FlightProducer.CallContext;
+import org.apache.arrow.flight.FlightProducer.ServerStreamListener;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.NoOpFlightProducer;
+import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
@@ -48,6 +53,7 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.PointValues;
@@ -186,6 +192,62 @@ void setInboundMessageSizeLarge() throws Exception {
}
}
+ @Test
+ @Timeout(5)
+ void queryTimeout() throws Exception {
+ int freePort = findFreePort();
+ URI uri = URI.create("http://127.0.0.1:" + freePort);
+ CountDownLatch serverStreamFinished = new CountDownLatch(1);
+ try (VectorSchemaRoot vectorSchemaRoot = TestUtils.generateVectorSchemaRoot(1, 1);
+ BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ FlightServer flightServer = TestUtils.simpleFlightServer(uri, allocator, new NoOpFlightProducer() {
+ @Override
+ public void getStream(final CallContext context,
+ final Ticket ticket,
+ final ServerStreamListener listener) {
+ listener.start(vectorSchemaRoot);
+ try {
+ Thread.sleep(1000);
+ if (!context.isCancelled() && !listener.isCancelled()) {
+ listener.completed();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ serverStreamFinished.countDown();
+ }
+ }
+ })
+ ) {
+ flightServer.start();
+
+ String host = String.format("http://%s:%d", uri.getHost(), uri.getPort());
+ ClientConfig clientConfig = new ClientConfig.Builder()
+ .host(host)
+ .database("test")
+ .queryTimeout(Duration.ofMillis(200))
+ .build();
+
+ try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) {
+ Throwable thrown = Assertions.catchThrowable(() -> {
+ try (Stream stream = influxDBClient.queryPoints(
+ "Select * from \"nothing\""
+ )) {
+ stream.count();
+ }
+ });
+
+ Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class);
+ FlightRuntimeException fre = (FlightRuntimeException) thrown;
+ Assertions.assertThat(fre.status().code()).isEqualTo(CallStatus.TIMED_OUT.code());
+ }
+
+ Assertions.assertThat(serverStreamFinished.await(2, TimeUnit.SECONDS)).isTrue();
+ flightServer.shutdown();
+ flightServer.awaitTermination(2, TimeUnit.SECONDS);
+ }
+ }
+
@Test
void defaultGrpcCallOptions() {
GrpcCallOptions grpcCallOptions = new QueryOptions("test").grpcCallOptions();