-
Notifications
You must be signed in to change notification settings - Fork 4k
otel: add tcp metrics #12652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
otel: add tcp metrics #12652
Changes from all commits
508c141
5ef3b1c
c4111e2
172f6c4
6996afd
42a834b
65487c6
2b44889
2990c28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| import io.grpc.ChannelCredentials; | ||
| import io.grpc.ChannelLogger; | ||
| import io.grpc.HttpConnectProxiedSocketAddress; | ||
| import io.grpc.MetricRecorder; | ||
| import java.io.Closeable; | ||
| import java.net.SocketAddress; | ||
| import java.util.Collection; | ||
|
|
@@ -91,6 +92,7 @@ final class ClientTransportOptions { | |
| private Attributes eagAttributes = Attributes.EMPTY; | ||
| @Nullable private String userAgent; | ||
| @Nullable private HttpConnectProxiedSocketAddress connectProxiedSocketAddr; | ||
| @Nullable private MetricRecorder metricRecorder; | ||
|
|
||
| public ChannelLogger getChannelLogger() { | ||
| return channelLogger; | ||
|
|
@@ -101,6 +103,16 @@ public ClientTransportOptions setChannelLogger(ChannelLogger channelLogger) { | |
| return this; | ||
| } | ||
|
|
||
| @Nullable | ||
| public MetricRecorder getMetricRecorder() { | ||
| return metricRecorder; | ||
| } | ||
|
Comment on lines
+106
to
+109
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If we need the optimization, we can have the MetricRecorder return whether a particular instrument is enabled. But I see no need to add an additional condition to usages for the metric recorder being missing entirely. |
||
|
|
||
| public ClientTransportOptions setMetricRecorder(@Nullable MetricRecorder metricRecorder) { | ||
| this.metricRecorder = metricRecorder; | ||
| return this; | ||
| } | ||
|
|
||
| public String getAuthority() { | ||
| return authority; | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
|
|
||
| import io.grpc.InternalChannelz.SocketStats; | ||
| import io.grpc.InternalInstrumented; | ||
| import io.grpc.MetricRecorder; | ||
| import java.io.IOException; | ||
| import java.net.SocketAddress; | ||
| import java.util.List; | ||
|
|
@@ -71,4 +72,9 @@ public interface InternalServer { | |
| */ | ||
| @Nullable List<InternalInstrumented<SocketStats>> getListenSocketStatsList(); | ||
|
|
||
| /** | ||
| * Sets the MetricRecorder for the server. This optional method allows setting | ||
| * the MetricRecorder after construction but before start(). | ||
| */ | ||
| default void setMetricRecorder(MetricRecorder metricRecorder) {} | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I want to avoid setters-before-start for this API, as they get wonky and prevent using |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ | |
| import io.grpc.InternalChannelz; | ||
| import io.grpc.InternalStatus; | ||
| import io.grpc.Metadata; | ||
| import io.grpc.MetricRecorder; | ||
| import io.grpc.Status; | ||
| import io.grpc.StatusException; | ||
| import io.grpc.internal.ClientStreamListener.RpcProgress; | ||
|
|
@@ -123,6 +124,7 @@ class NettyClientHandler extends AbstractNettyHandler { | |
| private final Supplier<Stopwatch> stopwatchFactory; | ||
| private final TransportTracer transportTracer; | ||
| private final Attributes eagAttributes; | ||
| private final TcpMetrics.Tracker tcpMetrics; | ||
| private final String authority; | ||
| private final InUseStateAggregator<Http2Stream> inUseState = | ||
| new InUseStateAggregator<Http2Stream>() { | ||
|
|
@@ -164,7 +166,8 @@ static NettyClientHandler newHandler( | |
| Attributes eagAttributes, | ||
| String authority, | ||
| ChannelLogger negotiationLogger, | ||
| Ticker ticker) { | ||
| Ticker ticker, | ||
| MetricRecorder metricRecorder) { | ||
| Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); | ||
| Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize); | ||
| Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder); | ||
|
|
@@ -194,7 +197,8 @@ static NettyClientHandler newHandler( | |
| eagAttributes, | ||
| authority, | ||
| negotiationLogger, | ||
| ticker); | ||
| ticker, | ||
| metricRecorder); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
|
|
@@ -214,7 +218,8 @@ static NettyClientHandler newHandler( | |
| Attributes eagAttributes, | ||
| String authority, | ||
| ChannelLogger negotiationLogger, | ||
| Ticker ticker) { | ||
| Ticker ticker, | ||
| MetricRecorder metricRecorder) { | ||
| Preconditions.checkNotNull(connection, "connection"); | ||
| Preconditions.checkNotNull(frameReader, "frameReader"); | ||
| Preconditions.checkNotNull(lifecycleManager, "lifecycleManager"); | ||
|
|
@@ -269,7 +274,8 @@ static NettyClientHandler newHandler( | |
| pingCounter, | ||
| ticker, | ||
| maxHeaderListSize, | ||
| softLimitHeaderListSize); | ||
| softLimitHeaderListSize, | ||
| metricRecorder); | ||
| } | ||
|
|
||
| private NettyClientHandler( | ||
|
|
@@ -288,7 +294,8 @@ private NettyClientHandler( | |
| PingLimiter pingLimiter, | ||
| Ticker ticker, | ||
| int maxHeaderListSize, | ||
| int softLimitHeaderListSize) { | ||
| int softLimitHeaderListSize, | ||
| MetricRecorder metricRecorder) { | ||
| super( | ||
| /* channelUnused= */ null, | ||
| decoder, | ||
|
|
@@ -350,6 +357,7 @@ public void onStreamClosed(Http2Stream stream) { | |
| } | ||
| } | ||
| }); | ||
| this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder, "client"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a TODO here to pass the target? |
||
| } | ||
|
|
||
| /** | ||
|
|
@@ -478,6 +486,7 @@ private void onRstStreamRead(int streamId, long errorCode) { | |
|
|
||
| @Override | ||
| public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { | ||
| tcpMetrics.recordTcpInfo(ctx.channel()); | ||
| logger.fine("Network channel being closed by the application."); | ||
| if (ctx.channel().isActive()) { // Ignore notification that the socket was closed | ||
| lifecycleManager.notifyShutdown( | ||
|
|
@@ -490,10 +499,17 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce | |
| /** | ||
| * Handler for the Channel shutting down. | ||
| */ | ||
| @Override | ||
| public void channelActive(ChannelHandlerContext ctx) throws Exception { | ||
| tcpMetrics.channelActive(ctx.channel()); | ||
| super.channelActive(ctx); | ||
| } | ||
|
|
||
| @Override | ||
| public void channelInactive(ChannelHandlerContext ctx) throws Exception { | ||
| try { | ||
| logger.fine("Network channel is closed"); | ||
| tcpMetrics.channelInactive(ctx.channel()); | ||
| Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason"); | ||
| lifecycleManager.notifyShutdown(status, SimpleDisconnectError.UNKNOWN); | ||
| final Status streamStatus; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import static io.netty.channel.ChannelOption.ALLOCATOR; | ||
| import static io.netty.channel.ChannelOption.SO_KEEPALIVE; | ||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.common.base.MoreObjects; | ||
| import com.google.common.base.Preconditions; | ||
| import com.google.common.util.concurrent.ListenableFuture; | ||
|
|
@@ -31,6 +32,7 @@ | |
| import io.grpc.InternalInstrumented; | ||
| import io.grpc.InternalLogId; | ||
| import io.grpc.InternalWithLogId; | ||
| import io.grpc.MetricRecorder; | ||
| import io.grpc.ServerStreamTracer; | ||
| import io.grpc.internal.InternalServer; | ||
| import io.grpc.internal.ObjectPool; | ||
|
|
@@ -67,6 +69,7 @@ | |
| import java.util.concurrent.Callable; | ||
| import java.util.logging.Level; | ||
| import java.util.logging.Logger; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** | ||
| * Netty-based server implementation. | ||
|
|
@@ -93,6 +96,7 @@ class NettyServer implements InternalServer, InternalWithLogId { | |
| private final int maxMessageSize; | ||
| private final int maxHeaderListSize; | ||
| private final int softLimitHeaderListSize; | ||
| private MetricRecorder metricRecorder; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How is this being set? I don't see a constructor or a setter that accesses this value? |
||
| private final long keepAliveTimeInNanos; | ||
| private final long keepAliveTimeoutInNanos; | ||
| private final long maxConnectionIdleInNanos; | ||
|
|
@@ -136,7 +140,8 @@ class NettyServer implements InternalServer, InternalWithLogId { | |
| long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, | ||
| boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, | ||
| int maxRstCount, long maxRstPeriodNanos, | ||
| Attributes eagAttributes, InternalChannelz channelz) { | ||
| Attributes eagAttributes, InternalChannelz channelz, | ||
| @Nullable MetricRecorder metricRecorder) { | ||
| this.addresses = checkNotNull(addresses, "addresses"); | ||
| this.channelFactory = checkNotNull(channelFactory, "channelFactory"); | ||
| checkNotNull(channelOptions, "channelOptions"); | ||
|
|
@@ -172,6 +177,13 @@ class NettyServer implements InternalServer, InternalWithLogId { | |
| this.channelz = Preconditions.checkNotNull(channelz); | ||
| this.logId = InternalLogId.allocate(getClass(), addresses.isEmpty() ? "No address" : | ||
| String.valueOf(addresses)); | ||
| this.metricRecorder = metricRecorder; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| @Nullable | ||
| MetricRecorder getMetricRecorder() { | ||
| return metricRecorder; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -272,7 +284,8 @@ public void initChannel(Channel ch) { | |
| permitKeepAliveTimeInNanos, | ||
| maxRstCount, | ||
| maxRstPeriodNanos, | ||
| eagAttributes); | ||
| eagAttributes, | ||
| metricRecorder); | ||
| ServerTransportListener transportListener; | ||
| // This is to order callbacks on the listener, not to guard access to channel. | ||
| synchronized (NettyServer.this) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you review go/java-practices/null from the overall PR's perspective in terms of nullable vs optional ? I am not sure if we have a local java convention to prefer or avoid null