diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java index fcba49a7ae1..07bbb953489 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java @@ -59,7 +59,6 @@ * Client stream for the cronet transport. */ class CronetClientStream extends AbstractClientStream { - private static final int READ_BUFFER_CAPACITY = 4 * 1024; private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0); private static final String LOG_TAG = "grpc-java-cronet"; @@ -69,6 +68,12 @@ class CronetClientStream extends AbstractClientStream { static final CallOptions.Key> CRONET_ANNOTATIONS_KEY = CallOptions.Key.create("cronet-annotations"); + /** + * Sets the read buffer size which the GRPC layer will use to read data from Cronet. Higher buffer + * size leads to less overhead but more memory consumption. The current default value is 4KB. + */ + static final CallOptions.Key CRONET_READ_BUFFER_SIZE_KEY = + CallOptions.Key.createWithDefault("cronet-read-buffer-size", 4 * 1024); private final String url; private final String userAgent; @@ -85,6 +90,8 @@ class CronetClientStream extends AbstractClientStream { private final Collection annotations; private final TransportState state; private final Sink sink = new Sink(); + @VisibleForTesting + final int readBufferSize; private StreamBuilderFactory streamFactory; CronetClientStream( @@ -120,6 +127,7 @@ class CronetClientStream extends AbstractClientStream { this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY); this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer, callOptions); + this.readBufferSize = callOptions.getOption(CRONET_READ_BUFFER_SIZE_KEY); // Tests expect the "plain" deframer behavior, not MigratingDeframer // https://github.com/grpc/grpc-java/issues/7140 @@ -309,7 +317,7 @@ public void bytesRead(int processedBytes) { if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { Log.v(LOG_TAG, "BidirectionalStream.read"); } - stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY)); + stream.read(ByteBuffer.allocateDirect(readBufferSize)); } } @@ -429,7 +437,7 @@ public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInf Log.v(LOG_TAG, "BidirectionalStream.read"); } reportHeaders(info.getAllHeadersAsList(), false); - stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY)); + stream.read(ByteBuffer.allocateDirect(readBufferSize)); } @Override diff --git a/cronet/src/main/java/io/grpc/cronet/InternalCronetCallOptions.java b/cronet/src/main/java/io/grpc/cronet/InternalCronetCallOptions.java index e7c4144e63a..9261a0a8f4b 100644 --- a/cronet/src/main/java/io/grpc/cronet/InternalCronetCallOptions.java +++ b/cronet/src/main/java/io/grpc/cronet/InternalCronetCallOptions.java @@ -36,6 +36,18 @@ public static CallOptions withAnnotation(CallOptions callOptions, Object annotat return CronetClientStream.withAnnotation(callOptions, annotation); } + public static CallOptions withReadBufferSize(CallOptions callOptions, int size) { + return callOptions.withOption(CronetClientStream.CRONET_READ_BUFFER_SIZE_KEY, size); + } + + /** + * Returns Cronet read buffer size for gRPC included in the given {@code callOptions}. Read + * buffer can be customized via {@link #withReadBufferSize(CallOptions, int)}. + */ + public static int getReadBufferSize(CallOptions callOptions) { + return callOptions.getOption(CronetClientStream.CRONET_READ_BUFFER_SIZE_KEY); + } + /** * Returns Cronet annotations for gRPC included in the given {@code callOptions}. Annotations * are attached via {@link #withAnnotation(CallOptions, Object)}. diff --git a/cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java b/cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java index 41f48bc03bb..be437b3c80b 100644 --- a/cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java +++ b/cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java @@ -16,7 +16,9 @@ package io.grpc.cronet; +import static io.grpc.cronet.CronetClientStream.CRONET_READ_BUFFER_SIZE_KEY; import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -92,6 +94,41 @@ public void alwaysUsePut_defaultsToFalse() throws Exception { assertFalse(stream.idempotent); } + @Test + public void channelBuilderReadBufferSize_defaultsTo4Kb() throws Exception { + CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine); + CronetTransportFactory transportFactory = + (CronetTransportFactory) builder.buildTransportFactory(); + CronetClientTransport transport = + (CronetClientTransport) + transportFactory.newClientTransport( + new InetSocketAddress("localhost", 443), + new ClientTransportOptions(), + channelLogger); + CronetClientStream stream = transport.newStream( + method, new Metadata(), CallOptions.DEFAULT, tracers); + + assertEquals(4 * 1024, stream.readBufferSize); + } + + @Test + public void channelBuilderReadBufferSize_changeReflected() throws Exception { + CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine); + CronetTransportFactory transportFactory = + (CronetTransportFactory) builder.buildTransportFactory(); + CronetClientTransport transport = + (CronetClientTransport) + transportFactory.newClientTransport( + new InetSocketAddress("localhost", 443), + new ClientTransportOptions(), + channelLogger); + CronetClientStream stream = transport.newStream( + method, new Metadata(), + CallOptions.DEFAULT.withOption(CRONET_READ_BUFFER_SIZE_KEY, 32 * 1024), tracers); + + assertEquals(32 * 1024, stream.readBufferSize); + } + @Test public void scheduledExecutorService_default() { CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);