Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter;
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -159,14 +158,6 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod)
return isGeneratedMethod ? fullMethodName : "other";
}

private static Context otelContextWithBaggage() {
Baggage baggage = BAGGAGE_KEY.get();
if (baggage == null) {
return Context.current();
}
return Context.current().with(baggage);
}

private static final class ClientTracer extends ClientStreamTracer {
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
Expand Down Expand Up @@ -282,7 +273,6 @@ public void streamClosed(Status status) {
}

void recordFinishedAttempt() {
Context otelContext = otelContextWithBaggage();
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, fullMethodName)
.put(TARGET_KEY, target)
Expand All @@ -308,15 +298,15 @@ void recordFinishedAttempt() {

if (module.resource.clientAttemptDurationCounter() != null ) {
module.resource.clientAttemptDurationCounter()
.record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext);
.record(attemptNanos * SECONDS_PER_NANO, attribute);
}
if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
module.resource.clientTotalSentCompressedMessageSizeCounter()
.record(outboundWireSize, attribute, otelContext);
.record(outboundWireSize, attribute);
}
if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
module.resource.clientTotalReceivedCompressedMessageSizeCounter()
.record(inboundWireSize, attribute, otelContext);
.record(inboundWireSize, attribute);
}
}
}
Expand Down Expand Up @@ -448,7 +438,6 @@ void callEnded(Status status) {
}

void recordFinishedCall() {
Context otelContext = otelContextWithBaggage();
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = newClientTracer(null);
tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
Expand All @@ -470,8 +459,7 @@ void recordFinishedCall() {
callLatencyNanos * SECONDS_PER_NANO,
baseAttributes.toBuilder()
.put(STATUS_KEY, status.getCode().toString())
.build(),
otelContext
.build()
);
}

Expand All @@ -480,7 +468,7 @@ void recordFinishedCall() {
long retriesPerCall = Math.max(attemptsPerCall.get() - 1, 0);
if (retriesPerCall > 0) {
module.resource.clientCallRetriesCounter()
.record(retriesPerCall, baseAttributes, otelContext);
.record(retriesPerCall, baseAttributes);
}
}

Expand All @@ -489,7 +477,7 @@ void recordFinishedCall() {
long hedges = hedgedAttemptsPerCall.get();
if (hedges > 0) {
module.resource.clientCallHedgesCounter()
.record(hedges, baseAttributes, otelContext);
.record(hedges, baseAttributes);
}
}

Expand All @@ -498,16 +486,15 @@ void recordFinishedCall() {
long transparentRetries = transparentRetriesPerCall.get();
if (transparentRetries > 0) {
module.resource.clientCallTransparentRetriesCounter()
.record(transparentRetries, baseAttributes, otelContext);
.record(transparentRetries, baseAttributes);
}
}

// Retry delay
if (module.resource.clientCallRetryDelayCounter() != null) {
module.resource.clientCallRetryDelayCounter().record(
retryDelayNanos * SECONDS_PER_NANO,
baseAttributes,
otelContext
baseAttributes
);
}
}
Expand Down Expand Up @@ -553,6 +540,7 @@ private static final class ServerTracer extends ServerStreamTracer {
private final Stopwatch stopwatch;
private volatile long outboundWireSize;
private volatile long inboundWireSize;
private volatile Baggage baggage;

ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
Expand All @@ -568,13 +556,23 @@ public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
// which is true for all generated methods. Otherwise, programmatically
// created methods result in high cardinality metrics.
boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
baggage = BAGGAGE_KEY.get(io.grpc.Context.current());
isGeneratedMethod = isSampledToLocalTracing;
io.opentelemetry.api.common.Attributes attribute =
io.opentelemetry.api.common.Attributes.of(
METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));

AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));

if (baggage != null) {
for (java.util.Map.Entry<String, io.opentelemetry.api.baggage.BaggageEntry> entry :
baggage.asMap().entrySet()) {
builder.put(entry.getKey(), entry.getValue().getValue());
}
}

io.opentelemetry.api.common.Attributes attributes = builder.build();

if (module.resource.serverCallCountCounter() != null) {
module.resource.serverCallCountCounter().add(1, attribute);
module.resource.serverCallCountCounter().add(1, attributes);
}
}

Expand Down Expand Up @@ -606,7 +604,6 @@ public void inboundWireSize(long bytes) {
*/
@Override
public void streamClosed(Status status) {
Context otelContext = otelContextWithBaggage();
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
Expand All @@ -622,22 +619,30 @@ public void streamClosed(Status status) {
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod))
.put(STATUS_KEY, status.getCode().toString());

if (baggage != null) {
for (java.util.Map.Entry<String, io.opentelemetry.api.baggage.BaggageEntry> entry :
baggage.asMap().entrySet()) {
builder.put(entry.getKey(), entry.getValue().getValue());
}
}

for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) {
plugin.addLabels(builder);
}
io.opentelemetry.api.common.Attributes attributes = builder.build();

if (module.resource.serverCallDurationCounter() != null) {
module.resource.serverCallDurationCounter()
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext);
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes);
}
if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
module.resource.serverTotalSentCompressedMessageSizeCounter()
.record(outboundWireSize, attributes, otelContext);
.record(outboundWireSize, attributes);
}
if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
module.resource.serverTotalReceivedCompressedMessageSizeCounter()
.record(inboundWireSize, attributes, otelContext);
.record(inboundWireSize, attributes);
}
}
}
Expand All @@ -657,7 +662,8 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
}
streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
}
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName,
streamPlugins);
}
}

Expand Down Expand Up @@ -717,3 +723,4 @@ public void onClose(Status status, Metadata trailers) {
}
}
}

Loading