Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private GrpcOpenTelemetry(Builder builder) {
this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels);
this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule(
STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins,
builder.targetFilter);
openTelemetrySdk.getPropagators(), builder.targetFilter);
this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk);
this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -100,24 +101,28 @@ final class OpenTelemetryMetricsModule {
private final boolean localityEnabled;
private final boolean backendServiceEnabled;
private final ImmutableList<OpenTelemetryPlugin> plugins;
private final ContextPropagators contextPropagators;
@Nullable
private final TargetFilter targetAttributeFilter;

OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
OpenTelemetryMetricsResource resource,
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins) {
this(stopwatchSupplier, resource, optionalLabels, plugins, null);
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins,
ContextPropagators contextPropagators) {
this(stopwatchSupplier, resource, optionalLabels, plugins, contextPropagators, null);
}

OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
OpenTelemetryMetricsResource resource,
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins,
@Nullable TargetFilter targetAttributeFilter) {
ContextPropagators contextPropagators,
@Nullable TargetFilter targetAttributeFilter) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Restore indentation.

this.resource = checkNotNull(resource, "resource");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
this.plugins = ImmutableList.copyOf(plugins);
this.contextPropagators = checkNotNull(contextPropagators, "contextPropagators");
this.targetAttributeFilter = targetAttributeFilter;
}

Expand Down Expand Up @@ -159,8 +164,7 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod)
return isGeneratedMethod ? fullMethodName : "other";
}

private static Context otelContextWithBaggage() {
Baggage baggage = BAGGAGE_KEY.get();
private static Context otelContextWithBaggage(Baggage baggage) {
if (baggage == null) {
return Context.current();
}
Expand Down Expand Up @@ -282,7 +286,7 @@ public void streamClosed(Status status) {
}

void recordFinishedAttempt() {
Context otelContext = otelContextWithBaggage();
Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no guaranteed Context as this point. I think this is only working because you're using in-process, which calls the StatsTraceContext on the calling thread. If the test used Netty instead, I suspect the baggage would no longer be found.

AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, fullMethodName)
.put(TARGET_KEY, target)
Expand Down Expand Up @@ -448,7 +452,7 @@ void callEnded(Status status) {
}

void recordFinishedCall() {
Context otelContext = otelContextWithBaggage();
Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get());
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = newClientTracer(null);
tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -553,13 +557,15 @@ private static final class ServerTracer extends ServerStreamTracer {
private final Stopwatch stopwatch;
private volatile long outboundWireSize;
private volatile long inboundWireSize;
private final Context otelContext;

ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins, Context otelContext) {
this.module = checkNotNull(module, "module");
this.fullMethodName = fullMethodName;
this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins");
this.stopwatch = module.stopwatchSupplier.get().start();
this.otelContext = checkNotNull(otelContext, "otelContext");
}

@Override
Expand All @@ -574,7 +580,7 @@ public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));

if (module.resource.serverCallCountCounter() != null) {
module.resource.serverCallCountCounter().add(1, attribute);
module.resource.serverCallCountCounter().add(1, attribute, otelContext);
}
}

Expand Down Expand Up @@ -606,7 +612,6 @@ public void inboundWireSize(long bytes) {
*/
@Override
public void streamClosed(Status status) {
Context otelContext = otelContextWithBaggage();
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
Expand All @@ -627,17 +632,23 @@ public void streamClosed(Status status) {
}
io.opentelemetry.api.common.Attributes attributes = builder.build();

Context ctxToRecord = otelContext;
Baggage currentBaggage = BAGGAGE_KEY.get();
if (currentBaggage != null && !currentBaggage.isEmpty()) {
ctxToRecord = ctxToRecord.with(currentBaggage);
}

if (module.resource.serverCallDurationCounter() != null) {
module.resource.serverCallDurationCounter()
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext);
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, ctxToRecord);
}
if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
module.resource.serverTotalSentCompressedMessageSizeCounter()
.record(outboundWireSize, attributes, otelContext);
.record(outboundWireSize, attributes, ctxToRecord);
}
if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
module.resource.serverTotalReceivedCompressedMessageSizeCounter()
.record(inboundWireSize, attributes, otelContext);
.record(inboundWireSize, attributes, ctxToRecord);
}
}
}
Expand All @@ -657,7 +668,10 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
}
streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
}
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
Context context = contextPropagators.getTextMapPropagator().extract(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is being done by the OpenTelemetryTracingModule and is being propagated via filterContext(), specifically so it can be used by the metrics module. If we want the full context, propagate that instead. I don't understand the goal here (and there's no description about why we're doing this in the PR description or a comment from what I can tell). And I'm doubly confused by using this context directly half the time and copying BAGGAGE_KEY into it other times.

Context.current(), headers, MetadataGetter.getInstance());
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins,
context);
}
}

Expand Down Expand Up @@ -717,3 +731,4 @@ public void onClose(Status status, Metadata trailers) {
}
}
}

Loading