diff --git a/CHANGELOG.md b/CHANGELOG.md index fa389c5b..2b0ba711 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 1.8.0 [unreleased] +### Features + +1. [#352](https://github.com/InfluxCommunity/influxdb3-java/pull/352): Update Apache Flight client dependencies. + ### CI 1. [#313](https://github.com/InfluxCommunity/influxdb3-java/pull/313): Clarify JDK 25+ requirements. diff --git a/pom.xml b/pom.xml index c8cd7fd4..3f935a14 100644 --- a/pom.xml +++ b/pom.xml @@ -90,7 +90,11 @@ 5.14.3 1.14.3 - 4.2.10.Final + 18.3.0 + + 1.78.0 + + 4.1.127.Final 3.5.4 0.8.14 @@ -98,12 +102,39 @@ 3.12.0 + + + + org.apache.arrow + arrow-bom + ${arrow.version} + pom + import + + + + io.grpc + grpc-bom + ${grpc-bom.version} + pom + import + + + + io.netty + netty-bom + ${netty-bom.version} + pom + import + + + + org.apache.arrow flight-core - 18.3.0 org.slf4j @@ -171,19 +202,16 @@ io.netty netty-handler - ${netty-handler.version} io.netty netty-buffer - ${netty-handler.version} io.netty netty-codec - ${netty-handler.version} io.netty @@ -195,7 +223,6 @@ io.netty netty-codec-http2 - ${netty-handler.version} io.netty @@ -211,13 +238,11 @@ io.netty netty-transport-native-unix-common - ${netty-handler.version} io.netty netty-tcnative-boringssl-static - 2.0.75.Final diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 63c95afa..6ec5ee72 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -59,7 +59,7 @@ public class E2ETest { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - void testQueryWithProxy() { + void testQueryWithProxy() throws Exception { String proxyUrl = "http://localhost:10000"; try { @@ -82,17 +82,18 @@ void testQueryWithProxy() { .proxyUrl(proxyUrl) .build(); - InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig); - influxDBClient.writePoint( - Point.measurement("test1") - .setField("field", "field1") - ); + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + influxDBClient.writePoint( + Point.measurement("test1") + .setField("field", "field1") + ); - try (Stream stream = influxDBClient.queryPoints("SELECT * FROM test1")) { - stream.findFirst() - .ifPresent(pointValues -> { - Assertions.assertThat(pointValues.getField("field")).isEqualTo("field1"); - }); + try (Stream stream = influxDBClient.queryPoints("SELECT * FROM test1")) { + stream.findFirst() + .ifPresent(pointValues -> { + Assertions.assertThat(pointValues.getField("field")).isEqualTo("field1"); + }); + } } } @@ -110,15 +111,16 @@ void correctSslCertificates() throws Exception { .database(System.getenv("TESTING_INFLUXDB_DATABASE")) .sslRootsFilePath(influxDBcertificateFile) .build(); - InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig); - assertGetDataSuccess(influxDBClient); + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + assertGetDataSuccess(influxDBClient); + } } @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - void disableServerCertificateValidation() { + void disableServerCertificateValidation() throws Exception { String wrongCertificateFile = "src/test/java/com/influxdb/v3/client/testdata/docker.com.pem"; ClientConfig clientConfig = new ClientConfig.Builder() @@ -130,8 +132,9 @@ void disableServerCertificateValidation() { .build(); // Test succeeded with wrong certificate file because disableServerCertificateValidation is true - InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig); - assertGetDataSuccess(influxDBClient); + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + assertGetDataSuccess(influxDBClient); + } } @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index c17e118f..e9083b7a 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -27,6 +27,7 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -39,8 +40,12 @@ import org.apache.arrow.flight.CallOption; import org.apache.arrow.flight.CallOptions; import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.FlightProducer.CallContext; +import org.apache.arrow.flight.FlightProducer.ServerStreamListener; import org.apache.arrow.flight.FlightRuntimeException; import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.flight.Ticket; import org.apache.arrow.flight.impl.FlightServiceGrpc; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -48,6 +53,7 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import com.influxdb.v3.client.InfluxDBClient; import com.influxdb.v3.client.PointValues; @@ -186,6 +192,62 @@ void setInboundMessageSizeLarge() throws Exception { } } + @Test + @Timeout(5) + void queryTimeout() throws Exception { + int freePort = findFreePort(); + URI uri = URI.create("http://127.0.0.1:" + freePort); + CountDownLatch serverStreamFinished = new CountDownLatch(1); + try (VectorSchemaRoot vectorSchemaRoot = TestUtils.generateVectorSchemaRoot(1, 1); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FlightServer flightServer = TestUtils.simpleFlightServer(uri, allocator, new NoOpFlightProducer() { + @Override + public void getStream(final CallContext context, + final Ticket ticket, + final ServerStreamListener listener) { + listener.start(vectorSchemaRoot); + try { + Thread.sleep(1000); + if (!context.isCancelled() && !listener.isCancelled()) { + listener.completed(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + serverStreamFinished.countDown(); + } + } + }) + ) { + flightServer.start(); + + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); + ClientConfig clientConfig = new ClientConfig.Builder() + .host(host) + .database("test") + .queryTimeout(Duration.ofMillis(200)) + .build(); + + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + Throwable thrown = Assertions.catchThrowable(() -> { + try (Stream stream = influxDBClient.queryPoints( + "Select * from \"nothing\"" + )) { + stream.count(); + } + }); + + Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class); + FlightRuntimeException fre = (FlightRuntimeException) thrown; + Assertions.assertThat(fre.status().code()).isEqualTo(CallStatus.TIMED_OUT.code()); + } + + Assertions.assertThat(serverStreamFinished.await(2, TimeUnit.SECONDS)).isTrue(); + flightServer.shutdown(); + flightServer.awaitTermination(2, TimeUnit.SECONDS); + } + } + @Test void defaultGrpcCallOptions() { GrpcCallOptions grpcCallOptions = new QueryOptions("test").grpcCallOptions();