diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index 6587444db59..e993242d80e 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -35,7 +35,6 @@ import io.grpc.gcp.observability.interceptors.LogHelper; import io.grpc.gcp.observability.logging.GcpLogSink; import io.grpc.gcp.observability.logging.Sink; -import io.grpc.internal.TimeProvider; import io.opencensus.common.Duration; import io.opencensus.contrib.grpc.metrics.RpcViewConstants; import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration; @@ -79,8 +78,8 @@ public static synchronized GcpObservability grpcInit() throws IOException { ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance(); Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(), globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(), - observabilityConfig.getFlushMessageCount(), SERVICES_TO_EXCLUDE); - LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); + SERVICES_TO_EXCLUDE); + LogHelper helper = new LogHelper(sink); ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig); instance = grpcInit(sink, observabilityConfig, new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper), diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java index 48dd480973b..d4da16637d2 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java @@ -36,9 +36,6 @@ public interface ObservabilityConfig { /** Get destination project ID - where logs will go. */ String getDestinationProjectId(); - /** Get message count threshold to flush - flush once message count is reached. */ - Long getFlushMessageCount(); - /** Get filters set for logging. */ List getLogFilters(); diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java index 1d0505e2818..073916180a6 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java @@ -45,7 +45,6 @@ final class ObservabilityConfigImpl implements ObservabilityConfig { private boolean enableCloudMonitoring = false; private boolean enableCloudTracing = false; private String destinationProjectId = null; - private Long flushMessageCount = null; private List logFilters; private List eventTypes; private Sampler sampler; @@ -87,7 +86,6 @@ private void parseConfig(Map config) { enableCloudTracing = value; } destinationProjectId = JsonUtil.getString(config, "destination_project_id"); - flushMessageCount = JsonUtil.getNumberAsLong(config, "flush_message_count"); List rawList = JsonUtil.getList(config, "log_filters"); if (rawList != null) { List> jsonLogFilters = JsonUtil.checkObjectList(rawList); @@ -102,7 +100,7 @@ private void parseConfig(Map config) { List jsonEventTypes = JsonUtil.checkStringList(rawList); ImmutableList.Builder eventTypesBuilder = new ImmutableList.Builder<>(); for (String jsonEventType : jsonEventTypes) { - eventTypesBuilder.add(convertEventType(jsonEventType)); + eventTypesBuilder.add(EventType.valueOf(jsonEventType)); } this.eventTypes = eventTypesBuilder.build(); } @@ -136,28 +134,6 @@ private void parseConfig(Map config) { } } - private EventType convertEventType(String val) { - switch (val) { - case "GRPC_CALL_UNKNOWN": - return EventType.GRPC_CALL_UNKNOWN; - case "GRPC_CALL_REQUEST_HEADER": - return EventType.GRPC_CALL_REQUEST_HEADER; - case "GRPC_CALL_RESPONSE_HEADER": - return EventType.GRPC_CALL_RESPONSE_HEADER; - case "GRPC_CALL_REQUEST_MESSAGE": - return EventType.GRPC_CALL_REQUEST_MESSAGE; - case "GRPC_CALL_RESPONSE_MESSAGE": - return EventType.GRPC_CALL_RESPONSE_MESSAGE; - case "GRPC_CALL_TRAILER": - return EventType.GRPC_CALL_TRAILER; - case "GRPC_CALL_HALF_CLOSE": - return EventType.GRPC_CALL_HALF_CLOSE; - case "GRPC_CALL_CANCEL": - return EventType.GRPC_CALL_CANCEL; - default: - throw new IllegalArgumentException("Unknown event type value:" + val); - } - } private LogFilter parseJsonLogFilter(Map logFilterMap) { return new LogFilter(JsonUtil.getString(logFilterMap, "pattern"), @@ -185,11 +161,6 @@ public String getDestinationProjectId() { return destinationProjectId; } - @Override - public Long getFlushMessageCount() { - return flushMessageCount; - } - @Override public List getLogFilters() { return logFilters; diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java index 81e0a9819af..e201827bce8 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java @@ -85,7 +85,7 @@ public ClientCall interceptCall(MethodDescriptor ClientCall interceptCall(MethodDescriptor responseListener, Metadata headers) { - // Event: EventType.GRPC_CALL_REQUEST_HEADER + // Event: EventType.CLIENT_HEADER // The timeout should reflect the time remaining when the call is started, so compute // remaining time here. final Duration timeout = deadline == null ? null : Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)); - if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) { + if (filterHelper.isEventToBeLogged(EventType.CLIENT_HEADER)) { try { - helper.logRequestHeader( + helper.logClientHeader( seq.getAndIncrement(), serviceName, methodName, @@ -121,8 +121,8 @@ public void start(Listener responseListener, Metadata headers) { timeout, headers, maxHeaderBytes, - EventLogger.LOGGER_CLIENT, - rpcId, + EventLogger.CLIENT, + callId, null); } catch (Exception e) { // Catching generic exceptions instead of specific ones for all the events. @@ -139,19 +139,20 @@ public void start(Listener responseListener, Metadata headers) { new SimpleForwardingClientCallListener(responseListener) { @Override public void onMessage(RespT message) { - // Event: EventType.GRPC_CALL_RESPONSE_MESSAGE - EventType responseMessageType = EventType.GRPC_CALL_RESPONSE_MESSAGE; + // Event: EventType.SERVER_MESSAGE + EventType responseMessageType = EventType.SERVER_MESSAGE; if (filterHelper.isEventToBeLogged(responseMessageType)) { try { helper.logRpcMessage( seq.getAndIncrement(), serviceName, methodName, + authority, responseMessageType, message, maxMessageBytes, - EventLogger.LOGGER_CLIENT, - rpcId); + EventLogger.CLIENT, + callId); } catch (Exception e) { logger.log(Level.SEVERE, "Unable to log response message", e); } @@ -161,17 +162,18 @@ public void onMessage(RespT message) { @Override public void onHeaders(Metadata headers) { - // Event: EventType.GRPC_CALL_RESPONSE_HEADER - if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) { + // Event: EventType.SERVER_HEADER + if (filterHelper.isEventToBeLogged(EventType.SERVER_HEADER)) { try { - helper.logResponseHeader( + helper.logServerHeader( seq.getAndIncrement(), serviceName, methodName, + authority, headers, maxHeaderBytes, - EventLogger.LOGGER_CLIENT, - rpcId, + EventLogger.CLIENT, + callId, LogHelper.getPeerAddress(getAttributes())); } catch (Exception e) { logger.log(Level.SEVERE, "Unable to log response header", e); @@ -182,18 +184,19 @@ public void onHeaders(Metadata headers) { @Override public void onClose(Status status, Metadata trailers) { - // Event: EventType.GRPC_CALL_TRAILER - if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) { + // Event: EventType.SERVER_TRAILER + if (filterHelper.isEventToBeLogged(EventType.SERVER_TRAILER)) { try { helper.logTrailer( seq.getAndIncrement(), serviceName, methodName, + authority, status, trailers, maxHeaderBytes, - EventLogger.LOGGER_CLIENT, - rpcId, + EventLogger.CLIENT, + callId, LogHelper.getPeerAddress(getAttributes())); } catch (Exception e) { logger.log(Level.SEVERE, "Unable to log trailer", e); @@ -207,19 +210,20 @@ public void onClose(Status status, Metadata trailers) { @Override public void sendMessage(ReqT message) { - // Event: EventType.GRPC_CALL_REQUEST_MESSAGE - EventType requestMessageType = EventType.GRPC_CALL_REQUEST_MESSAGE; + // Event: EventType.CLIENT_MESSAGE + EventType requestMessageType = EventType.CLIENT_MESSAGE; if (filterHelper.isEventToBeLogged(requestMessageType)) { try { helper.logRpcMessage( seq.getAndIncrement(), serviceName, methodName, + authority, requestMessageType, message, maxMessageBytes, - EventLogger.LOGGER_CLIENT, - rpcId); + EventLogger.CLIENT, + callId); } catch (Exception e) { logger.log(Level.SEVERE, "Unable to log request message", e); } @@ -229,15 +233,16 @@ public void sendMessage(ReqT message) { @Override public void halfClose() { - // Event: EventType.GRPC_CALL_HALF_CLOSE - if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) { + // Event: EventType.CLIENT_HALF_CLOSE + if (filterHelper.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)) { try { helper.logHalfClose( seq.getAndIncrement(), serviceName, methodName, - EventLogger.LOGGER_CLIENT, - rpcId); + authority, + EventLogger.CLIENT, + callId); } catch (Exception e) { logger.log(Level.SEVERE, "Unable to log half close", e); } @@ -247,15 +252,16 @@ public void halfClose() { @Override public void cancel(String message, Throwable cause) { - // Event: EventType.GRPC_CALL_CANCEL - if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) { + // Event: EventType.CANCEL + if (filterHelper.isEventToBeLogged(EventType.CANCEL)) { try { helper.logCancel( seq.getAndIncrement(), serviceName, methodName, - EventLogger.LOGGER_CLIENT, - rpcId); + authority, + EventLogger.CLIENT, + callId); } catch (Exception e) { logger.log(Level.SEVERE, "Unable to log cancel", e); } diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java index 112a1c067b1..217674f13ca 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java @@ -84,7 +84,7 @@ private InternalLoggingServerInterceptor(LogHelper helper, ConfigFilterHelper fi public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { final AtomicLong seq = new AtomicLong(1); - final String rpcId = UUID.randomUUID().toString(); + final String callId = UUID.randomUUID().toString(); final String authority = call.getAuthority(); final String serviceName = call.getMethodDescriptor().getServiceName(); final String methodName = call.getMethodDescriptor().getBareMethodName(); @@ -101,10 +101,10 @@ public ServerCall.Listener interceptCall(ServerCall ServerCall.Listener interceptCall(ServerCall ServerCall.Listener interceptCall(ServerCall(call) { @Override public void sendHeaders(Metadata headers) { - // Event: EventType.GRPC_CALL_RESPONSE_HEADER - if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) { + // Event: EventType.SERVER_HEADER + if (filterHelper.isEventToBeLogged(EventType.SERVER_HEADER)) { try { - helper.logResponseHeader( + helper.logServerHeader( seq.getAndIncrement(), serviceName, methodName, + authority, headers, maxHeaderBytes, - EventLogger.LOGGER_SERVER, - rpcId, + EventLogger.SERVER, + callId, null); } catch (Exception e) { logger.log(Level.SEVERE, "Unable to log response header", e); @@ -151,19 +152,20 @@ public void sendHeaders(Metadata headers) { @Override public void sendMessage(RespT message) { - // Event: EventType.GRPC_CALL_RESPONSE_MESSAGE - EventType responseMessageType = EventType.GRPC_CALL_RESPONSE_MESSAGE; + // Event: EventType.SERVER_MESSAGE + EventType responseMessageType = EventType.SERVER_MESSAGE; if (filterHelper.isEventToBeLogged(responseMessageType)) { try { helper.logRpcMessage( seq.getAndIncrement(), serviceName, methodName, + authority, responseMessageType, message, maxMessageBytes, - EventLogger.LOGGER_SERVER, - rpcId); + EventLogger.SERVER, + callId); } catch (Exception e) { logger.log(Level.SEVERE, "Unable to log response message", e); } @@ -173,18 +175,19 @@ public void sendMessage(RespT message) { @Override public void close(Status status, Metadata trailers) { - // Event: EventType.GRPC_CALL_TRAILER - if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) { + // Event: EventType.SERVER_TRAILER + if (filterHelper.isEventToBeLogged(EventType.SERVER_TRAILER)) { try { helper.logTrailer( seq.getAndIncrement(), serviceName, methodName, + authority, status, trailers, maxHeaderBytes, - EventLogger.LOGGER_SERVER, - rpcId, + EventLogger.SERVER, + callId, null); } catch (Exception e) { logger.log(Level.SEVERE, "Unable to log trailer", e); @@ -198,19 +201,20 @@ public void close(Status status, Metadata trailers) { return new SimpleForwardingServerCallListener(listener) { @Override public void onMessage(ReqT message) { - // Event: EventType.GRPC_CALL_REQUEST_MESSAGE - EventType requestMessageType = EventType.GRPC_CALL_REQUEST_MESSAGE; + // Event: EventType.CLIENT_MESSAGE + EventType requestMessageType = EventType.CLIENT_MESSAGE; if (filterHelper.isEventToBeLogged(requestMessageType)) { try { helper.logRpcMessage( seq.getAndIncrement(), serviceName, methodName, + authority, requestMessageType, message, maxMessageBytes, - EventLogger.LOGGER_SERVER, - rpcId); + EventLogger.SERVER, + callId); } catch (Exception e) { logger.log(Level.SEVERE, "Unable to log request message", e); } @@ -220,15 +224,16 @@ public void onMessage(ReqT message) { @Override public void onHalfClose() { - // Event: EventType.GRPC_CALL_HALF_CLOSE - if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) { + // Event: EventType.CLIENT_HALF_CLOSE + if (filterHelper.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)) { try { helper.logHalfClose( seq.getAndIncrement(), serviceName, methodName, - EventLogger.LOGGER_SERVER, - rpcId); + authority, + EventLogger.SERVER, + callId); } catch (Exception e) { logger.log(Level.SEVERE, "Unable to log half close", e); } @@ -238,15 +243,16 @@ public void onHalfClose() { @Override public void onCancel() { - // Event: EventType.GRPC_CALL_CANCEL - if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) { + // Event: EventType.CANCEL + if (filterHelper.isEventToBeLogged(EventType.CANCEL)) { try { helper.logCancel( seq.getAndIncrement(), serviceName, methodName, - EventLogger.LOGGER_SERVER, - rpcId); + authority, + EventLogger.SERVER, + callId); } catch (Exception e) { logger.log(Level.SEVERE, "Unable to log cancel", e); } diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/LogHelper.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/LogHelper.java index 46589f93845..9b46699efaf 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/LogHelper.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/LogHelper.java @@ -18,32 +18,32 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.InternalMetadata.BASE64_ENCODING_OMIT_PADDING; -import com.google.common.base.Charsets; +import com.google.common.base.Joiner; import com.google.protobuf.ByteString; import com.google.protobuf.Duration; -import com.google.protobuf.util.Timestamps; import io.grpc.Attributes; import io.grpc.Deadline; import io.grpc.Grpc; import io.grpc.Internal; -import io.grpc.InternalMetadata; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.gcp.observability.logging.Sink; -import io.grpc.internal.TimeProvider; +import io.grpc.observabilitylog.v1.Address; import io.grpc.observabilitylog.v1.GrpcLogRecord; -import io.grpc.observabilitylog.v1.GrpcLogRecord.Address; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; -import io.grpc.observabilitylog.v1.GrpcLogRecord.LogLevel; +import io.grpc.observabilitylog.v1.Payload; import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -63,23 +63,20 @@ public class LogHelper { Metadata.BINARY_BYTE_MARSHALLER); private final Sink sink; - private final TimeProvider timeProvider; /** * Creates a LogHelper instance. + * @param sink sink * - * @param sink sink - * @param timeProvider timeprovider */ - public LogHelper(Sink sink, TimeProvider timeProvider) { + public LogHelper(Sink sink) { this.sink = sink; - this.timeProvider = timeProvider; } /** * Logs the request header. Binary logging equivalent of logClientHeader. */ - void logRequestHeader( + void logClientHeader( long seqId, String serviceName, String methodName, @@ -88,35 +85,33 @@ void logRequestHeader( Metadata metadata, int maxHeaderBytes, GrpcLogRecord.EventLogger eventLogger, - String rpcId, + String callId, // null on client side @Nullable SocketAddress peerAddress) { checkNotNull(serviceName, "serviceName"); checkNotNull(methodName, "methodName"); - checkNotNull(rpcId, "rpcId"); + checkNotNull(authority, "authority"); + checkNotNull(callId, "callId"); checkArgument( - peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_SERVER, + peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.SERVER, "peerAddress can only be specified by server"); - - PayloadBuilder pair = + PayloadBuilderHelper pair = createMetadataProto(metadata, maxHeaderBytes); - GrpcLogRecord.Builder logEntryBuilder = createTimestamp() + if (timeout != null) { + pair.payloadBuilder.setTimeout(timeout); + } + GrpcLogRecord.Builder logEntryBuilder = GrpcLogRecord.newBuilder() .setSequenceId(seqId) .setServiceName(serviceName) .setMethodName(methodName) .setAuthority(authority) - .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) - .setEventLogger(eventLogger) - .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) - .setMetadata(pair.payload) - .setPayloadSize(pair.size) + .setType(EventType.CLIENT_HEADER) + .setLogger(eventLogger) + .setPayload(pair.payloadBuilder) .setPayloadTruncated(pair.truncated) - .setRpcId(rpcId); - if (timeout != null) { - logEntryBuilder.setTimeout(timeout); - } + .setCallId(callId); if (peerAddress != null) { - logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress)); + logEntryBuilder.setPeer(socketAddressToProto(peerAddress)); } sink.write(logEntryBuilder.build()); } @@ -124,39 +119,40 @@ void logRequestHeader( /** * Logs the response header. Binary logging equivalent of logServerHeader. */ - void logResponseHeader( + void logServerHeader( long seqId, String serviceName, String methodName, + String authority, Metadata metadata, int maxHeaderBytes, GrpcLogRecord.EventLogger eventLogger, - String rpcId, + String callId, @Nullable SocketAddress peerAddress) { checkNotNull(serviceName, "serviceName"); checkNotNull(methodName, "methodName"); - checkNotNull(rpcId, "rpcId"); + checkNotNull(authority, "authority"); + checkNotNull(callId, "callId"); // Logging peer address only on the first incoming event. On server side, peer address will // of logging request header checkArgument( - peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT, + peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.CLIENT, "peerAddress can only be specified for client"); - PayloadBuilder pair = + PayloadBuilderHelper pair = createMetadataProto(metadata, maxHeaderBytes); - GrpcLogRecord.Builder logEntryBuilder = createTimestamp() + GrpcLogRecord.Builder logEntryBuilder = GrpcLogRecord.newBuilder() .setSequenceId(seqId) .setServiceName(serviceName) .setMethodName(methodName) - .setEventType(EventType.GRPC_CALL_RESPONSE_HEADER) - .setEventLogger(eventLogger) - .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) - .setMetadata(pair.payload) - .setPayloadSize(pair.size) + .setAuthority(authority) + .setType(EventType.SERVER_HEADER) + .setLogger(eventLogger) + .setPayload(pair.payloadBuilder) .setPayloadTruncated(pair.truncated) - .setRpcId(rpcId); + .setCallId(callId); if (peerAddress != null) { - logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress)); + logEntryBuilder.setPeer(socketAddressToProto(peerAddress)); } sink.write(logEntryBuilder.build()); } @@ -168,44 +164,45 @@ void logTrailer( long seqId, String serviceName, String methodName, + String authority, Status status, Metadata metadata, int maxHeaderBytes, GrpcLogRecord.EventLogger eventLogger, - String rpcId, + String callId, @Nullable SocketAddress peerAddress) { checkNotNull(serviceName, "serviceName"); checkNotNull(methodName, "methodName"); + checkNotNull(authority, "authority"); checkNotNull(status, "status"); - checkNotNull(rpcId, "rpcId"); + checkNotNull(callId, "callId"); checkArgument( - peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT, + peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.CLIENT, "peerAddress can only be specified for client"); - PayloadBuilder pair = + PayloadBuilderHelper pair = createMetadataProto(metadata, maxHeaderBytes); - GrpcLogRecord.Builder logEntryBuilder = createTimestamp() - .setSequenceId(seqId) - .setServiceName(serviceName) - .setMethodName(methodName) - .setEventType(EventType.GRPC_CALL_TRAILER) - .setEventLogger(eventLogger) - .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) - .setMetadata(pair.payload) - .setPayloadSize(pair.size) - .setPayloadTruncated(pair.truncated) - .setStatusCode(status.getCode().value()) - .setRpcId(rpcId); + pair.payloadBuilder.setStatusCode(status.getCode().value()); String statusDescription = status.getDescription(); if (statusDescription != null) { - logEntryBuilder.setStatusMessage(statusDescription); + pair.payloadBuilder.setStatusMessage(statusDescription); } byte[] statusDetailBytes = metadata.get(STATUS_DETAILS_KEY); if (statusDetailBytes != null) { - logEntryBuilder.setStatusDetails(ByteString.copyFrom(statusDetailBytes)); + pair.payloadBuilder.setStatusDetails(ByteString.copyFrom(statusDetailBytes)); } + GrpcLogRecord.Builder logEntryBuilder = GrpcLogRecord.newBuilder() + .setSequenceId(seqId) + .setServiceName(serviceName) + .setMethodName(methodName) + .setAuthority(authority) + .setType(EventType.SERVER_TRAILER) + .setLogger(eventLogger) + .setPayload(pair.payloadBuilder) + .setPayloadTruncated(pair.truncated) + .setCallId(callId); if (peerAddress != null) { - logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress)); + logEntryBuilder.setPeer(socketAddressToProto(peerAddress)); } sink.write(logEntryBuilder.build()); } @@ -217,17 +214,19 @@ void logRpcMessage( long seqId, String serviceName, String methodName, + String authority, EventType eventType, T message, int maxMessageBytes, EventLogger eventLogger, - String rpcId) { + String callId) { checkNotNull(serviceName, "serviceName"); checkNotNull(methodName, "methodName"); - checkNotNull(rpcId, "rpcId"); + checkNotNull(authority, "authority"); + checkNotNull(callId, "callId"); checkArgument( - eventType == EventType.GRPC_CALL_REQUEST_MESSAGE - || eventType == EventType.GRPC_CALL_RESPONSE_MESSAGE, + eventType == EventType.CLIENT_MESSAGE + || eventType == EventType.SERVER_MESSAGE, "event type must correspond to client message or server message"); checkNotNull(message, "message"); @@ -241,27 +240,23 @@ void logRpcMessage( } else if (message instanceof byte[]) { messageBytesArray = (byte[]) message; } else { - logger.log(Level.WARNING, "message is of UNKNOWN type, message and payload_size fields" + logger.log(Level.WARNING, "message is of UNKNOWN type, message and payload_size fields " + "of GrpcLogRecord proto will not be logged"); } - PayloadBuilder pair = null; + PayloadBuilderHelper pair = null; if (messageBytesArray != null) { pair = createMessageProto(messageBytesArray, maxMessageBytes); } - - GrpcLogRecord.Builder logEntryBuilder = createTimestamp() + GrpcLogRecord.Builder logEntryBuilder = GrpcLogRecord.newBuilder() .setSequenceId(seqId) .setServiceName(serviceName) .setMethodName(methodName) - .setEventType(eventType) - .setEventLogger(eventLogger) - .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) - .setRpcId(rpcId); - if (pair != null && pair.size != 0) { - logEntryBuilder.setPayloadSize(pair.size); - } - if (pair != null && pair.payload != null) { - logEntryBuilder.setMessage(pair.payload) + .setAuthority(authority) + .setType(eventType) + .setLogger(eventLogger) + .setCallId(callId); + if (pair != null) { + logEntryBuilder.setPayload(pair.payloadBuilder) .setPayloadTruncated(pair.truncated); } sink.write(logEntryBuilder.build()); @@ -274,20 +269,22 @@ void logHalfClose( long seqId, String serviceName, String methodName, + String authority, GrpcLogRecord.EventLogger eventLogger, - String rpcId) { + String callId) { checkNotNull(serviceName, "serviceName"); checkNotNull(methodName, "methodName"); - checkNotNull(rpcId, "rpcId"); + checkNotNull(authority, "authority"); + checkNotNull(callId, "callId"); - GrpcLogRecord.Builder logEntryBuilder = createTimestamp() + GrpcLogRecord.Builder logEntryBuilder = GrpcLogRecord.newBuilder() .setSequenceId(seqId) .setServiceName(serviceName) .setMethodName(methodName) - .setEventType(EventType.GRPC_CALL_HALF_CLOSE) - .setEventLogger(eventLogger) - .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) - .setRpcId(rpcId); + .setAuthority(authority) + .setType(EventType.CLIENT_HALF_CLOSE) + .setLogger(eventLogger) + .setCallId(callId); sink.write(logEntryBuilder.build()); } @@ -298,28 +295,25 @@ void logCancel( long seqId, String serviceName, String methodName, + String authority, GrpcLogRecord.EventLogger eventLogger, - String rpcId) { + String callId) { checkNotNull(serviceName, "serviceName"); checkNotNull(methodName, "methodName"); - checkNotNull(rpcId, "rpcId"); + checkNotNull(authority, "authority"); + checkNotNull(callId, "callId"); - GrpcLogRecord.Builder logEntryBuilder = createTimestamp() + GrpcLogRecord.Builder logEntryBuilder = GrpcLogRecord.newBuilder() .setSequenceId(seqId) .setServiceName(serviceName) .setMethodName(methodName) - .setEventType(EventType.GRPC_CALL_CANCEL) - .setEventLogger(eventLogger) - .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) - .setRpcId(rpcId); + .setAuthority(authority) + .setType(EventType.CANCEL) + .setLogger(eventLogger) + .setCallId(callId); sink.write(logEntryBuilder.build()); } - GrpcLogRecord.Builder createTimestamp() { - long nanos = timeProvider.currentTimeNanos(); - return GrpcLogRecord.newBuilder().setTimestamp(Timestamps.fromNanos(nanos)); - } - // TODO(DNVindhya): Evaluate if we need following clause for metadata logging in GcpObservability // Leaving the implementation for now as is to have same behavior across Java and Go private static final Set NEVER_INCLUDED_METADATA = new HashSet<>( @@ -331,58 +325,65 @@ GrpcLogRecord.Builder createTimestamp() { Collections.singletonList( "grpc-trace-bin")); - static final class PayloadBuilder { - T payload; - int size; + static final class PayloadBuilderHelper { + T payloadBuilder; boolean truncated; - private PayloadBuilder(T payload, int size, boolean truncated) { - this.payload = payload; - this.size = size; + private PayloadBuilderHelper(T payload, boolean truncated) { + this.payloadBuilder = payload; this.truncated = truncated; } } - static PayloadBuilder createMetadataProto(Metadata metadata, + static PayloadBuilderHelper createMetadataProto(Metadata metadata, int maxHeaderBytes) { checkNotNull(metadata, "metadata"); checkArgument(maxHeaderBytes >= 0, "maxHeaderBytes must be non negative"); - GrpcLogRecord.Metadata.Builder metadataBuilder = GrpcLogRecord.Metadata.newBuilder(); - // This code is tightly coupled with io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata - // implementation - byte[][] serialized = InternalMetadata.serialize(metadata); + Joiner joiner = Joiner.on(",").skipNulls(); + Payload.Builder payloadBuilder = Payload.newBuilder(); boolean truncated = false; int totalMetadataBytes = 0; - if (serialized != null) { - // Calculate bytes for each GrpcLogRecord.Metadata.MetadataEntry - for (int i = 0; i < serialized.length; i += 2) { - String key = new String(serialized[i], Charsets.UTF_8); - byte[] value = serialized[i + 1]; - if (NEVER_INCLUDED_METADATA.contains(key)) { - continue; - } - boolean forceInclude = ALWAYS_INCLUDED_METADATA.contains(key); - int metadataBytesAfterAdd = totalMetadataBytes + key.length() + value.length; - if (!forceInclude && metadataBytesAfterAdd > maxHeaderBytes) { - truncated = true; - continue; - } - metadataBuilder.addEntryBuilder() - .setKey(key) - .setValue(ByteString.copyFrom(value)); - if (!forceInclude) { - // force included keys do not count towards the size limit - totalMetadataBytes = metadataBytesAfterAdd; - } + for (String key : metadata.keys()) { + if (NEVER_INCLUDED_METADATA.contains(key)) { + continue; + } + boolean forceInclude = ALWAYS_INCLUDED_METADATA.contains(key); + String metadataValue; + if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + Iterable metadataValues = + metadata.getAll(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER)); + List numList = new ArrayList(); + metadataValues.forEach( + (element) -> { + numList.add(BASE64_ENCODING_OMIT_PADDING.encode(element)); + }); + metadataValue = joiner.join(numList); + } else { + Iterable metadataValues = metadata.getAll( + Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)); + metadataValue = joiner.join(metadataValues); + } + + int metadataBytesAfterAdd = totalMetadataBytes + key.length() + metadataValue.length(); + if (!forceInclude && metadataBytesAfterAdd > maxHeaderBytes) { + truncated = true; + continue; + } + payloadBuilder.putMetadata(key, metadataValue); + if (!forceInclude) { + // force included keys do not count towards the size limit + totalMetadataBytes = metadataBytesAfterAdd; } } - return new PayloadBuilder<>(metadataBuilder, totalMetadataBytes, truncated); + return new PayloadBuilderHelper<>(payloadBuilder, truncated); } - static PayloadBuilder createMessageProto(byte[] message, int maxMessageBytes) { + static PayloadBuilderHelper createMessageProto( + byte[] message, int maxMessageBytes) { checkArgument(maxMessageBytes >= 0, "maxMessageBytes must be non negative"); + Payload.Builder payloadBuilder = Payload.newBuilder(); int desiredBytes = 0; int messageLength = message.length; if (maxMessageBytes > 0) { @@ -390,8 +391,10 @@ static PayloadBuilder createMessageProto(byte[] message, int maxMess } ByteString messageData = ByteString.copyFrom(message, 0, desiredBytes); + payloadBuilder.setMessage(messageData); + payloadBuilder.setMessageLength(messageLength); - return new PayloadBuilder<>(messageData, messageLength, + return new PayloadBuilderHelper<>(payloadBuilder, maxMessageBytes < message.length); } diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java index 0209677aae9..afcaaea8ed4 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java @@ -33,6 +33,7 @@ import io.grpc.internal.JsonParser; import io.grpc.observabilitylog.v1.GrpcLogRecord; import java.io.IOException; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -50,41 +51,36 @@ public class GcpLogSink implements Sink { private static final String DEFAULT_LOG_NAME = "microservices.googleapis.com%2Fobservability%2Fgrpc"; + private static final Severity DEFAULT_LOG_LEVEL = Severity.DEBUG; private static final String K8S_MONITORED_RESOURCE_TYPE = "k8s_container"; private static final Set kubernetesResourceLabelSet = ImmutableSet.of("project_id", "location", "cluster_name", "namespace_name", "pod_name", "container_name"); - private static final long FALLBACK_FLUSH_LIMIT = 100L; private final String projectId; private final Map customTags; private final MonitoredResource kubernetesResource; - private final Long flushLimit; /** Lazily initialize cloud logging client to avoid circular initialization. Because cloud * logging APIs also uses gRPC. */ private volatile Logging gcpLoggingClient; - private long flushCounter; private final Collection servicesToExclude; @VisibleForTesting GcpLogSink(Logging loggingClient, String destinationProjectId, Map locationTags, - Map customTags, Long flushLimit, Collection servicesToExclude) { - this(destinationProjectId, locationTags, customTags, flushLimit, servicesToExclude); + Map customTags, Collection servicesToExclude) { + this(destinationProjectId, locationTags, customTags, servicesToExclude); this.gcpLoggingClient = loggingClient; } /** * Retrieves a single instance of GcpLogSink. - * - * @param destinationProjectId cloud project id to write logs + * @param destinationProjectId cloud project id to write logs * @param servicesToExclude service names for which log entries should not be generated */ public GcpLogSink(String destinationProjectId, Map locationTags, - Map customTags, Long flushLimit, Collection servicesToExclude) { + Map customTags, Collection servicesToExclude) { this.projectId = destinationProjectId; this.customTags = getCustomTags(customTags, locationTags, destinationProjectId); this.kubernetesResource = getResource(locationTags); - this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT; - this.flushCounter = 0L; this.servicesToExclude = checkNotNull(servicesToExclude, "servicesToExclude"); } @@ -106,28 +102,24 @@ public void write(GrpcLogRecord logProto) { return; } try { - GrpcLogRecord.EventType event = logProto.getEventType(); - Severity logEntrySeverity = getCloudLoggingLevel(logProto.getLogLevel()); + GrpcLogRecord.EventType eventType = logProto.getType(); // TODO(DNVindhya): make sure all (int, long) values are not displayed as double // For now, every value is being converted as string because of JsonFormat.printer().print + Map logProtoMap = protoToMapConverter(logProto); LogEntry.Builder grpcLogEntryBuilder = - LogEntry.newBuilder(JsonPayload.of(protoToMapConverter(logProto))) - .setSeverity(logEntrySeverity) + LogEntry.newBuilder(JsonPayload.of(logProtoMap)) + .setSeverity(DEFAULT_LOG_LEVEL) .setLogName(DEFAULT_LOG_NAME) - .setResource(kubernetesResource); + .setResource(kubernetesResource) + .setTimestamp(Instant.now()); if (!customTags.isEmpty()) { grpcLogEntryBuilder.setLabels(customTags); } LogEntry grpcLogEntry = grpcLogEntryBuilder.build(); synchronized (this) { - logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", event); + logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", eventType); gcpLoggingClient.write(Collections.singleton(grpcLogEntry)); - flushCounter = ++flushCounter; - if (flushCounter >= flushLimit) { - gcpLoggingClient.flush(); - flushCounter = 0L; - } } } catch (Exception e) { logger.log(Level.SEVERE, "Caught exception while writing to Cloud Logging", e); @@ -175,29 +167,11 @@ static MonitoredResource getResource(Map resourceTags) { @SuppressWarnings("unchecked") private Map protoToMapConverter(GrpcLogRecord logProto) throws IOException { - JsonFormat.Printer printer = JsonFormat.printer().preservingProtoFieldNames(); + JsonFormat.Printer printer = JsonFormat.printer(); String recordJson = printer.print(logProto); return (Map) JsonParser.parse(recordJson); } - private Severity getCloudLoggingLevel(GrpcLogRecord.LogLevel recordLevel) { - switch (recordLevel.getNumber()) { - case 1: // GrpcLogRecord.LogLevel.LOG_LEVEL_TRACE - case 2: // GrpcLogRecord.LogLevel.LOG_LEVEL_DEBUG - return Severity.DEBUG; - case 3: // GrpcLogRecord.LogLevel.LOG_LEVEL_INFO - return Severity.INFO; - case 4: // GrpcLogRecord.LogLevel.LOG_LEVEL_WARN - return Severity.WARNING; - case 5: // GrpcLogRecord.LogLevel.LOG_LEVEL_ERROR - return Severity.ERROR; - case 6: // GrpcLogRecord.LogLevel.LOG_LEVEL_CRITICAL - return Severity.CRITICAL; - default: - return Severity.DEFAULT; - } - } - /** * Closes Cloud Logging Client. */ diff --git a/gcp-observability/src/main/proto/grpc/observabilitylog/v1/observabilitylog.proto b/gcp-observability/src/main/proto/grpc/observabilitylog/v1/observabilitylog.proto index a37ac6f43d0..85ef00ac2dd 100644 --- a/gcp-observability/src/main/proto/grpc/observabilitylog/v1/observabilitylog.proto +++ b/gcp-observability/src/main/proto/grpc/observabilitylog/v1/observabilitylog.proto @@ -28,151 +28,99 @@ option java_outer_classname = "ObservabilityLogProto"; message GrpcLogRecord { // List of event types enum EventType { - GRPC_CALL_UNKNOWN = 0; + EVENT_TYPE_UNKNOWN = 0; // Header sent from client to server - GRPC_CALL_REQUEST_HEADER = 1; + CLIENT_HEADER = 1; // Header sent from server to client - GRPC_CALL_RESPONSE_HEADER = 2; + SERVER_HEADER = 2; // Message sent from client to server - GRPC_CALL_REQUEST_MESSAGE = 3; + CLIENT_MESSAGE = 3; // Message sent from server to client - GRPC_CALL_RESPONSE_MESSAGE = 4; - // Trailer indicates the end of the gRPC call - GRPC_CALL_TRAILER = 5; + SERVER_MESSAGE = 4; // A signal that client is done sending - GRPC_CALL_HALF_CLOSE = 6; + CLIENT_HALF_CLOSE = 5; + // Trailer indicates the end of the gRPC call + SERVER_TRAILER = 6; // A signal that the rpc is canceled - GRPC_CALL_CANCEL = 7; + CANCEL = 7; } + // The entity that generates the log entry enum EventLogger { LOGGER_UNKNOWN = 0; - LOGGER_CLIENT = 1; - LOGGER_SERVER = 2; - } - // The log severity level of the log entry - enum LogLevel { - LOG_LEVEL_UNKNOWN = 0; - LOG_LEVEL_TRACE = 1; - LOG_LEVEL_DEBUG = 2; - LOG_LEVEL_INFO = 3; - LOG_LEVEL_WARN = 4; - LOG_LEVEL_ERROR = 5; - LOG_LEVEL_CRITICAL = 6; + CLIENT = 1; + SERVER = 2; } - // The timestamp of the log event - google.protobuf.Timestamp timestamp = 1; - - // Uniquely identifies a call. The value must not be 0 in order to disambiguate - // from an unset value. - // Each call may have several log entries. They will all have the same rpc_id. + // Uniquely identifies a call. + // Each call may have several log entries. They will all have the same call_id. // Nothing is guaranteed about their value other than they are unique across // different RPCs in the same gRPC process. - string rpc_id = 2; + string call_id = 2; - EventType event_type = 3; // one of the above EventType enum - EventLogger event_logger = 4; // one of the above EventLogger enum + // The entry sequence ID for this call. The first message has a value of 1, + // to disambiguate from an unset value. The purpose of this field is to + // detect missing entries in environments where durability or ordering is + // not guaranteed. + uint64 sequence_id = 3; - // the name of the service - string service_name = 5; - // the name of the RPC method - string method_name = 6; + EventType type = 4; // one of the above EventType enum + EventLogger logger = 5; // one of the above EventLogger enum - LogLevel log_level = 7; // one of the above LogLevel enum + // Payload for log entry. + // It can include a combination of {metadata, message, status based on type of + // the event event being logged and config options. + Payload payload = 6; + // true if message or metadata field is either truncated or omitted due + // to config options + bool payload_truncated = 7; // Peer address information. On client side, peer is logged on server // header event or trailer event (if trailer-only). On server side, peer // is always logged on the client header event. - Address peer_address = 8; - - // the RPC timeout value - google.protobuf.Duration timeout = 11; + Address peer = 8; // A single process may be used to run multiple virtual servers with // different identities. // The authority is the name of such a server identify. It is typically a // portion of the URI in the form of or :. - string authority = 12; - - // Size of the message or metadata, depending on the event type, - // regardless of whether the full message or metadata is being logged - // (i.e. could be truncated or omitted). - uint32 payload_size = 13; - - // true if message or metadata field is either truncated or omitted due - // to config options - bool payload_truncated = 14; - - // Used by header event or trailer event - Metadata metadata = 15; - - // The entry sequence ID for this call. The first message has a value of 1, - // to disambiguate from an unset value. The purpose of this field is to - // detect missing entries in environments where durability or ordering is - // not guaranteed. - uint64 sequence_id = 16; - - // Used by message event - bytes message = 17; + string authority = 10; + // the name of the service + string service_name = 11; + // the name of the RPC method + string method_name = 12; +} +message Payload { + // A list of metadata pairs + map metadata = 1; + // the RPC timeout value + google.protobuf.Duration timeout = 2; // The gRPC status code - uint32 status_code = 18; - + uint32 status_code = 3; // The gRPC status message - string status_message = 19; - + string status_message = 4; // The value of the grpc-status-details-bin metadata key, if any. // This is always an encoded google.rpc.Status message - bytes status_details = 20; - - // Attributes of the environment generating log record. The purpose of this - // field is to identify the source environment. - EnvironmentTags env_tags = 21; - - // A list of non-gRPC custom values specified by the application - repeated CustomTags custom_tags = 22; - - // A list of metadata pairs - message Metadata { - repeated MetadataEntry entry = 1; - } - - // One metadata key value pair - message MetadataEntry { - string key = 1; - bytes value = 2; - } - - // Address information - message Address { - enum Type { - TYPE_UNKNOWN = 0; - TYPE_IPV4 = 1; // in 1.2.3.4 form - TYPE_IPV6 = 2; // IPv6 canonical form (RFC5952 section 4) - TYPE_UNIX = 3; // UDS string - } - Type type = 1; - string address = 2; - // only for TYPE_IPV4 and TYPE_IPV6 - uint32 ip_port = 3; - } - - // Source Environment information - message EnvironmentTags { - string gcp_project_id = 1; - string gcp_numeric_project_id = 2; - string gce_instance_id = 3; - string gce_instance_hostname = 4; - string gce_instance_zone = 5; - string gke_cluster_uid = 6; - string gke_cluster_name = 7; - string gke_cluster_location = 8; - } + bytes status_details = 5; + // Size of the message or metadata, depending on the event type, + // regardless of whether the full message or metadata is being logged + // (i.e. could be truncated or omitted). + uint32 message_length = 6; + // Used by message event + bytes message = 7; +} - // Custom key value pair - message CustomTags { - string key = 1; - string value = 2; +// Address information +message Address { + enum Type { + TYPE_UNKNOWN = 0; + TYPE_IPV4 = 1; // in 1.2.3.4 form + TYPE_IPV6 = 2; // IPv6 canonical form (RFC5952 section 4) + TYPE_UNIX = 3; // UDS string } + Type type = 1; + string address = 2; + // only for TYPE_IPV4 and TYPE_IPV6 + uint32 ip_port = 3; } diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java index aa6c2d55d8b..52267cf03df 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java @@ -36,7 +36,6 @@ import io.grpc.gcp.observability.interceptors.LogHelper; import io.grpc.gcp.observability.logging.GcpLogSink; import io.grpc.gcp.observability.logging.Sink; -import io.grpc.internal.TimeProvider; import io.grpc.observabilitylog.v1.GrpcLogRecord; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; import io.grpc.testing.GrpcCleanupRule; @@ -67,7 +66,6 @@ public class LoggingTest { private static final ImmutableMap CUSTOM_TAGS = ImmutableMap.of( "KEY1", "Value1", "KEY2", "VALUE2"); - private static final long FLUSH_LIMIT = 100L; private final StaticTestingClassLoader classLoader = new StaticTestingClassLoader(getClass().getClassLoader(), Pattern.compile("io\\.grpc\\..*")); @@ -113,9 +111,9 @@ public static final class StaticTestingClassEndtoEndLogging implements Runnable public void run() { Sink sink = new GcpLogSink( - PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet()); + PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, Collections.emptySet()); ObservabilityConfig config = mock(ObservabilityConfig.class); - LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER)); + LogHelper spyLogHelper = spy(new LogHelper(sink)); ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); @@ -123,7 +121,7 @@ public void run() { new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); when(config.isEnableCloudLogging()).thenReturn(true); - FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0); + FilterParams logAlwaysFilterParams = FilterParams.create(true, 1024, 10); when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) .thenReturn(logAlwaysFilterParams); when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))).thenReturn(true); @@ -156,7 +154,7 @@ public static final class StaticTestingClassLogNever implements Runnable { public void run() { Sink mockSink = mock(GcpLogSink.class); ObservabilityConfig config = mock(ObservabilityConfig.class); - LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER)); + LogHelper spyLogHelper = spy(new LogHelper(mockSink)); ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); @@ -209,16 +207,16 @@ public void run() { FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0); when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class))) .thenReturn(logAlwaysFilterParams); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) + when(mockFilterHelper2.isEventToBeLogged(EventType.CLIENT_HEADER)) .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) + when(mockFilterHelper2.isEventToBeLogged(EventType.SERVER_HEADER)) .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)).thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)).thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)).thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE)) + when(mockFilterHelper2.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)).thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.SERVER_TRAILER)).thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.CANCEL)).thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.CLIENT_MESSAGE)) .thenReturn(false); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE)) + when(mockFilterHelper2.isEventToBeLogged(EventType.SERVER_MESSAGE)) .thenReturn(false); try (GcpObservability observability = diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java index 821dcd43ee4..b2541d6a64a 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java @@ -46,13 +46,12 @@ public class ObservabilityConfigImplTest { private static final String EVENT_TYPES = "{\n" + " \"enable_cloud_logging\": false,\n" + " \"event_types\": " - + "[\"GRPC_CALL_REQUEST_HEADER\", \"GRPC_CALL_HALF_CLOSE\", \"GRPC_CALL_TRAILER\"]\n" + + "[\"CLIENT_HEADER\", \"CLIENT_HALF_CLOSE\", \"SERVER_TRAILER\"]\n" + "}"; private static final String LOG_FILTERS = "{\n" + " \"enable_cloud_logging\": true,\n" + " \"destination_project_id\": \"grpc-testing\",\n" - + " \"flush_message_count\": 1000,\n" + " \"log_filters\": [{\n" + " \"pattern\": \"*/*\",\n" + " \"header_bytes\": 4096,\n" @@ -69,11 +68,6 @@ public class ObservabilityConfigImplTest { + " \"destination_project_id\": \"grpc-testing\"\n" + "}"; - private static final String FLUSH_MESSAGE_COUNT = "{\n" - + " \"enable_cloud_logging\": true,\n" - + " \"flush_message_count\": 500\n" - + "}"; - private static final String DISABLE_CLOUD_LOGGING = "{\n" + " \"enable_cloud_logging\": false\n" + "}"; @@ -146,7 +140,6 @@ public void emptyConfig() throws IOException { assertFalse(observabilityConfig.isEnableCloudMonitoring()); assertFalse(observabilityConfig.isEnableCloudTracing()); assertNull(observabilityConfig.getDestinationProjectId()); - assertNull(observabilityConfig.getFlushMessageCount()); assertNull(observabilityConfig.getLogFilters()); assertNull(observabilityConfig.getEventTypes()); } @@ -158,7 +151,6 @@ public void disableCloudLogging() throws IOException { assertFalse(observabilityConfig.isEnableCloudMonitoring()); assertFalse(observabilityConfig.isEnableCloudTracing()); assertNull(observabilityConfig.getDestinationProjectId()); - assertNull(observabilityConfig.getFlushMessageCount()); assertNull(observabilityConfig.getLogFilters()); assertNull(observabilityConfig.getEventTypes()); } @@ -170,19 +162,11 @@ public void destProjectId() throws IOException { assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing"); } - @Test - public void flushMessageCount() throws Exception { - observabilityConfig.parse(FLUSH_MESSAGE_COUNT); - assertTrue(observabilityConfig.isEnableCloudLogging()); - assertThat(observabilityConfig.getFlushMessageCount()).isEqualTo(500L); - } - @Test public void logFilters() throws IOException { observabilityConfig.parse(LOG_FILTERS); assertTrue(observabilityConfig.isEnableCloudLogging()); assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing"); - assertThat(observabilityConfig.getFlushMessageCount()).isEqualTo(1000L); List logFilters = observabilityConfig.getLogFilters(); assertThat(logFilters).hasSize(2); assertThat(logFilters.get(0).pattern).isEqualTo("*/*"); @@ -199,8 +183,8 @@ public void eventTypes() throws IOException { assertFalse(observabilityConfig.isEnableCloudLogging()); List eventTypes = observabilityConfig.getEventTypes(); assertThat(eventTypes).isEqualTo( - ImmutableList.of(EventType.GRPC_CALL_REQUEST_HEADER, EventType.GRPC_CALL_HALF_CLOSE, - EventType.GRPC_CALL_TRAILER)); + ImmutableList.of(EventType.CLIENT_HEADER, EventType.CLIENT_HALF_CLOSE, + EventType.SERVER_TRAILER)); } @Test @@ -265,7 +249,6 @@ public void configFileLogFilters() throws Exception { observabilityConfig.parseFile(configFile.getAbsolutePath()); assertTrue(observabilityConfig.isEnableCloudLogging()); assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing"); - assertThat(observabilityConfig.getFlushMessageCount()).isEqualTo(1000L); List logFilters = observabilityConfig.getLogFilters(); assertThat(logFilters).hasSize(2); assertThat(logFilters.get(0).pattern).isEqualTo("*/*"); diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/ConfigFilterHelperTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/ConfigFilterHelperTest.java index ba6e05e2dcd..2da869472ef 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/ConfigFilterHelperTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/ConfigFilterHelperTest.java @@ -48,9 +48,9 @@ public class ConfigFilterHelperTest { private static final ImmutableList configEventTypes = ImmutableList.of( - EventType.GRPC_CALL_REQUEST_HEADER, - EventType.GRPC_CALL_HALF_CLOSE, - EventType.GRPC_CALL_TRAILER); + EventType.CLIENT_HEADER, + EventType.CLIENT_HALF_CLOSE, + EventType.SERVER_TRAILER); private final MethodDescriptor.Builder builder = TestMethodDescriptors.voidMethod() .toBuilder(); @@ -171,13 +171,13 @@ public void checkMethodToBeLoggedConditional() { @Test public void checkEventToBeLogged_noFilter_defaultLogAllEventTypes() { List eventList = new ArrayList<>(); - eventList.add(EventType.GRPC_CALL_REQUEST_HEADER); - eventList.add(EventType.GRPC_CALL_RESPONSE_HEADER); - eventList.add(EventType.GRPC_CALL_REQUEST_MESSAGE); - eventList.add(EventType.GRPC_CALL_RESPONSE_MESSAGE); - eventList.add(EventType.GRPC_CALL_HALF_CLOSE); - eventList.add(EventType.GRPC_CALL_TRAILER); - eventList.add(EventType.GRPC_CALL_CANCEL); + eventList.add(EventType.CLIENT_HEADER); + eventList.add(EventType.SERVER_HEADER); + eventList.add(EventType.CLIENT_MESSAGE); + eventList.add(EventType.SERVER_MESSAGE); + eventList.add(EventType.CLIENT_HALF_CLOSE); + eventList.add(EventType.SERVER_TRAILER); + eventList.add(EventType.CANCEL); for (EventType event : eventList) { assertTrue(configFilterHelper.isEventToBeLogged(event)); @@ -191,13 +191,13 @@ public void checkEventToBeLogged_emptyFilter_doNotLogEventTypes() { configFilterHelper.setEventFilterSet(); List eventList = new ArrayList<>(); - eventList.add(EventType.GRPC_CALL_REQUEST_HEADER); - eventList.add(EventType.GRPC_CALL_RESPONSE_HEADER); - eventList.add(EventType.GRPC_CALL_REQUEST_MESSAGE); - eventList.add(EventType.GRPC_CALL_RESPONSE_MESSAGE); - eventList.add(EventType.GRPC_CALL_HALF_CLOSE); - eventList.add(EventType.GRPC_CALL_TRAILER); - eventList.add(EventType.GRPC_CALL_CANCEL); + eventList.add(EventType.CLIENT_HEADER); + eventList.add(EventType.SERVER_HEADER); + eventList.add(EventType.CLIENT_MESSAGE); + eventList.add(EventType.SERVER_MESSAGE); + eventList.add(EventType.CLIENT_HALF_CLOSE); + eventList.add(EventType.SERVER_TRAILER); + eventList.add(EventType.CANCEL); for (EventType event : eventList) { assertFalse(configFilterHelper.isEventToBeLogged(event)); @@ -209,10 +209,10 @@ public void checkEventToBeLogged_withEventTypesFromConfig() { when(mockConfig.getEventTypes()).thenReturn(configEventTypes); configFilterHelper.setEventFilterSet(); - EventType logEventType = EventType.GRPC_CALL_REQUEST_HEADER; + EventType logEventType = EventType.CLIENT_HEADER; assertTrue(configFilterHelper.isEventToBeLogged(logEventType)); - EventType doNotLogEventType = EventType.GRPC_CALL_RESPONSE_MESSAGE; + EventType doNotLogEventType = EventType.SERVER_MESSAGE; assertFalse(configFilterHelper.isEventToBeLogged(doNotLogEventType)); } } diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptorTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptorTest.java index 025c99e5b6a..082bca826c5 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptorTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptorTest.java @@ -185,7 +185,7 @@ public String authority() { clientInitial.put(keyA, dataA); clientInitial.put(keyB, dataB); interceptedLoggingCall.start(mockListener, clientInitial); - verify(mockLogHelper).logRequestHeader( + verify(mockLogHelper).logClientHeader( /*seq=*/ eq(1L), eq("service"), eq("method"), @@ -193,7 +193,7 @@ public String authority() { ArgumentMatchers.isNull(), same(clientInitial), eq(filterParams.headerBytes()), - eq(EventLogger.LOGGER_CLIENT), + eq(EventLogger.CLIENT), anyString(), ArgumentMatchers.isNull()); verifyNoMoreInteractions(mockLogHelper); @@ -207,13 +207,14 @@ public String authority() { { Metadata serverInitial = new Metadata(); interceptedListener.get().onHeaders(serverInitial); - verify(mockLogHelper).logResponseHeader( + verify(mockLogHelper).logServerHeader( /*seq=*/ eq(2L), eq("service"), eq("method"), + eq("the-authority"), same(serverInitial), eq(filterParams.headerBytes()), - eq(EventLogger.LOGGER_CLIENT), + eq(EventLogger.CLIENT), anyString(), same(peer)); verifyNoMoreInteractions(mockLogHelper); @@ -231,10 +232,11 @@ public String authority() { /*seq=*/ eq(3L), eq("service"), eq("method"), - eq(EventType.GRPC_CALL_REQUEST_MESSAGE), + eq("the-authority"), + eq(EventType.CLIENT_MESSAGE), same(request), eq(filterParams.messageBytes()), - eq(EventLogger.LOGGER_CLIENT), + eq(EventLogger.CLIENT), anyString()); verifyNoMoreInteractions(mockLogHelper); assertSame(request, actualRequest.get()); @@ -250,7 +252,8 @@ public String authority() { /*seq=*/ eq(4L), eq("service"), eq("method"), - eq(EventLogger.LOGGER_CLIENT), + eq("the-authority"), + eq(EventLogger.CLIENT), anyString()); halfCloseCalled.get(1, TimeUnit.MILLISECONDS); verifyNoMoreInteractions(mockLogHelper); @@ -267,10 +270,11 @@ public String authority() { /*seq=*/ eq(5L), eq("service"), eq("method"), - eq(EventType.GRPC_CALL_RESPONSE_MESSAGE), + eq("the-authority"), + eq(EventType.SERVER_MESSAGE), same(response), eq(filterParams.messageBytes()), - eq(EventLogger.LOGGER_CLIENT), + eq(EventLogger.CLIENT), anyString()); verifyNoMoreInteractions(mockLogHelper); verify(mockListener).onMessage(same(response)); @@ -288,10 +292,11 @@ public String authority() { /*seq=*/ eq(6L), eq("service"), eq("method"), + eq("the-authority"), same(status), same(trailers), eq(filterParams.headerBytes()), - eq(EventLogger.LOGGER_CLIENT), + eq(EventLogger.CLIENT), anyString(), same(peer)); verifyNoMoreInteractions(mockLogHelper); @@ -308,7 +313,8 @@ public String authority() { /*seq=*/ eq(7L), eq("service"), eq("method"), - eq(EventLogger.LOGGER_CLIENT), + eq("the-authority"), + eq(EventLogger.CLIENT), anyString()); cancelCalled.get(1, TimeUnit.MILLISECONDS); } @@ -349,7 +355,7 @@ public String authority() { interceptedLoggingCall.start(mockListener, new Metadata()); ArgumentCaptor callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class); verify(mockLogHelper, times(1)) - .logRequestHeader( + .logClientHeader( anyLong(), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), @@ -408,7 +414,7 @@ public String authority() { Objects.requireNonNull(callFuture.get()).start(mockListener, new Metadata()); ArgumentCaptor contextTimeoutCaptor = ArgumentCaptor.forClass(Duration.class); verify(mockLogHelper, times(1)) - .logRequestHeader( + .logClientHeader( anyLong(), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), @@ -470,7 +476,7 @@ public String authority() { Objects.requireNonNull(callFuture.get()).start(mockListener, new Metadata()); ArgumentCaptor timeoutCaptor = ArgumentCaptor.forClass(Duration.class); verify(mockLogHelper, times(1)) - .logRequestHeader( + .logClientHeader( anyLong(), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), @@ -633,8 +639,8 @@ public String authority() { @Test public void eventFilter_enabled() { - when(mockFilterHelper.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)).thenReturn(false); - when(mockFilterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)).thenReturn(false); + when(mockFilterHelper.isEventToBeLogged(EventType.CLIENT_HEADER)).thenReturn(false); + when(mockFilterHelper.isEventToBeLogged(EventType.SERVER_HEADER)).thenReturn(false); Channel channel = new Channel() { @Override @@ -697,7 +703,7 @@ public String authority() { { interceptedLoggingCall.start(mockListener, new Metadata()); - verify(mockLogHelper, never()).logRequestHeader( + verify(mockLogHelper, never()).logClientHeader( anyLong(), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), @@ -710,10 +716,11 @@ public String authority() { AdditionalMatchers.or(ArgumentMatchers.isNull(), ArgumentMatchers.any())); interceptedListener.get().onHeaders(new Metadata()); - verify(mockLogHelper, never()).logResponseHeader( + verify(mockLogHelper, never()).logServerHeader( anyLong(), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), any(Metadata.class), anyInt(), any(GrpcLogRecord.EventLogger.class), diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptorTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptorTest.java index a222da4c4d3..d8a3bafbdb5 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptorTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptorTest.java @@ -167,7 +167,7 @@ public String getAuthority() { }); // receive request header { - verify(mockLogHelper).logRequestHeader( + verify(mockLogHelper).logClientHeader( /*seq=*/ eq(1L), eq("service"), eq("method"), @@ -175,7 +175,7 @@ public String getAuthority() { ArgumentMatchers.isNull(), same(clientInitial), eq(filterParams.headerBytes()), - eq(EventLogger.LOGGER_SERVER), + eq(EventLogger.SERVER), anyString(), same(peer)); verifyNoMoreInteractions(mockLogHelper); @@ -188,13 +188,14 @@ public String getAuthority() { { Metadata serverInitial = new Metadata(); interceptedLoggingCall.get().sendHeaders(serverInitial); - verify(mockLogHelper).logResponseHeader( + verify(mockLogHelper).logServerHeader( /*seq=*/ eq(2L), eq("service"), eq("method"), + eq("the-authority"), same(serverInitial), eq(filterParams.headerBytes()), - eq(EventLogger.LOGGER_SERVER), + eq(EventLogger.SERVER), anyString(), ArgumentMatchers.isNull()); verifyNoMoreInteractions(mockLogHelper); @@ -212,10 +213,11 @@ public String getAuthority() { /*seq=*/ eq(3L), eq("service"), eq("method"), - eq(EventType.GRPC_CALL_REQUEST_MESSAGE), + eq("the-authority"), + eq(EventType.CLIENT_MESSAGE), same(request), eq(filterParams.messageBytes()), - eq(EventLogger.LOGGER_SERVER), + eq(EventLogger.SERVER), anyString()); verifyNoMoreInteractions(mockLogHelper); verify(mockListener).onMessage(same(request)); @@ -231,7 +233,8 @@ public String getAuthority() { /*seq=*/ eq(4L), eq("service"), eq("method"), - eq(EventLogger.LOGGER_SERVER), + eq("the-authority"), + eq(EventLogger.SERVER), anyString()); verifyNoMoreInteractions(mockLogHelper); verify(mockListener).onHalfClose(); @@ -248,10 +251,11 @@ public String getAuthority() { /*seq=*/ eq(5L), eq("service"), eq("method"), - eq(EventType.GRPC_CALL_RESPONSE_MESSAGE), + eq("the-authority"), + eq(EventType.SERVER_MESSAGE), same(response), eq(filterParams.messageBytes()), - eq(EventLogger.LOGGER_SERVER), + eq(EventLogger.SERVER), anyString()); verifyNoMoreInteractions(mockLogHelper); assertSame(response, actualResponse.get()); @@ -269,10 +273,11 @@ public String getAuthority() { /*seq=*/ eq(6L), eq("service"), eq("method"), + eq("the-authority"), same(status), same(trailers), eq(filterParams.headerBytes()), - eq(EventLogger.LOGGER_SERVER), + eq(EventLogger.SERVER), anyString(), ArgumentMatchers.isNull()); verifyNoMoreInteractions(mockLogHelper); @@ -290,7 +295,8 @@ public String getAuthority() { /*seq=*/ eq(7L), eq("service"), eq("method"), - eq(EventLogger.LOGGER_SERVER), + eq("the-authority"), + eq(EventLogger.SERVER), anyString()); verify(mockListener).onCancel(); } @@ -332,7 +338,7 @@ public String getAuthority() { }); ArgumentCaptor timeoutCaptor = ArgumentCaptor.forClass(Duration.class); verify(mockLogHelper, times(1)) - .logRequestHeader( + .logClientHeader( /*seq=*/ eq(1L), eq("service"), eq("method"), @@ -340,7 +346,7 @@ public String getAuthority() { timeoutCaptor.capture(), any(Metadata.class), eq(filterParams.headerBytes()), - eq(EventLogger.LOGGER_SERVER), + eq(EventLogger.SERVER), anyString(), ArgumentMatchers.isNull()); verifyNoMoreInteractions(mockLogHelper); @@ -480,7 +486,7 @@ public String getAuthority() { @Test public void eventFilter_enabled() { - when(mockFilterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)).thenReturn(false); + when(mockFilterHelper.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)).thenReturn(false); Metadata clientInitial = new Metadata(); final MethodDescriptor method = @@ -551,6 +557,7 @@ public String getAuthority() { anyLong(), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), any(GrpcLogRecord.EventLogger.class), anyString()); capturedListener.onCancel(); diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java index 209543595d6..73704eb4181 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java @@ -26,26 +26,22 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import com.google.common.collect.Iterables; import com.google.protobuf.ByteString; import com.google.protobuf.Duration; -import com.google.protobuf.Timestamp; import com.google.protobuf.util.Durations; import io.grpc.Attributes; import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.MethodDescriptor.Marshaller; import io.grpc.Status; -import io.grpc.gcp.observability.interceptors.LogHelper.PayloadBuilder; +import io.grpc.gcp.observability.interceptors.LogHelper.PayloadBuilderHelper; import io.grpc.gcp.observability.logging.GcpLogSink; import io.grpc.gcp.observability.logging.Sink; -import io.grpc.internal.TimeProvider; +import io.grpc.observabilitylog.v1.Address; import io.grpc.observabilitylog.v1.GrpcLogRecord; -import io.grpc.observabilitylog.v1.GrpcLogRecord.Address; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; -import io.grpc.observabilitylog.v1.GrpcLogRecord.LogLevel; -import io.grpc.observabilitylog.v1.GrpcLogRecord.MetadataEntry; +import io.grpc.observabilitylog.v1.Payload; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -55,8 +51,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; -import java.util.Objects; -import java.util.concurrent.TimeUnit; +import java.util.HashMap; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -77,33 +72,12 @@ public class LogHelperTest { Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER); private static final Metadata.Key KEY_C = Metadata.Key.of("c", Metadata.ASCII_STRING_MARSHALLER); - private static final MetadataEntry ENTRY_A = - MetadataEntry - .newBuilder() - .setKey(KEY_A.name()) - .setValue(ByteString.copyFrom(DATA_A.getBytes(StandardCharsets.US_ASCII))) - .build(); - private static final MetadataEntry ENTRY_B = - MetadataEntry - .newBuilder() - .setKey(KEY_B.name()) - .setValue(ByteString.copyFrom(DATA_B.getBytes(StandardCharsets.US_ASCII))) - .build(); - private static final MetadataEntry ENTRY_C = - MetadataEntry - .newBuilder() - .setKey(KEY_C.name()) - .setValue(ByteString.copyFrom(DATA_C.getBytes(StandardCharsets.US_ASCII))) - .build(); private static final int HEADER_LIMIT = 10; private static final int MESSAGE_LIMIT = Integer.MAX_VALUE; private final Metadata nonEmptyMetadata = new Metadata(); private final Sink sink = mock(GcpLogSink.class); - private final Timestamp timestamp - = Timestamp.newBuilder().setSeconds(9876).setNanos(54321).build(); - private final TimeProvider timeProvider = () -> TimeUnit.SECONDS.toNanos(9876) + 54321; - private final LogHelper logHelper = new LogHelper(sink, timeProvider); + private final LogHelper logHelper = new LogHelper(sink); @Before public void setUp() { @@ -159,29 +133,26 @@ public String toString() { @Test public void metadataToProto_empty() { assertThat(metadataToProtoTestHelper( - EventType.GRPC_CALL_REQUEST_HEADER, new Metadata(), Integer.MAX_VALUE)) + EventType.CLIENT_HEADER, new Metadata(), Integer.MAX_VALUE)) .isEqualTo(GrpcLogRecord.newBuilder() - .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) - .setMetadata( - GrpcLogRecord.Metadata.getDefaultInstance()) + .setType(EventType.CLIENT_HEADER) + .setPayload( + Payload.newBuilder().putAllMetadata(new HashMap<>())) .build()); } @Test public void metadataToProto() { - int nonEmptyMetadataSize = 30; + Payload.Builder payloadBuilder = Payload.newBuilder() + .putMetadata("a", DATA_A) + .putMetadata("b", DATA_B) + .putMetadata("c", DATA_C); + assertThat(metadataToProtoTestHelper( - EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata, Integer.MAX_VALUE)) + EventType.CLIENT_HEADER, nonEmptyMetadata, Integer.MAX_VALUE)) .isEqualTo(GrpcLogRecord.newBuilder() - .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) - .setMetadata( - GrpcLogRecord.Metadata - .newBuilder() - .addEntry(ENTRY_A) - .addEntry(ENTRY_B) - .addEntry(ENTRY_C) - .build()) - .setPayloadSize(nonEmptyMetadataSize) + .setType(EventType.CLIENT_HEADER) + .setPayload(payloadBuilder) .build()); } @@ -193,44 +164,45 @@ public void metadataToProto_setsTruncated() { @Test public void metadataToProto_truncated() { // 0 byte limit not enough for any metadata - assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 0).payload.build()) - .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance()); + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 0).payloadBuilder.build()) + .isEqualTo( + io.grpc.observabilitylog.v1.Payload.newBuilder() + .putAllMetadata(new HashMap<>()) + .build()); // not enough bytes for first key value - assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 9).payload.build()) - .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance()); + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 9).payloadBuilder.build()) + .isEqualTo( + io.grpc.observabilitylog.v1.Payload.newBuilder() + .putAllMetadata(new HashMap<>()) + .build()); // enough for first key value - assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 10).payload.build()) - .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata - .newBuilder() - .addEntry(ENTRY_A) - .build()); + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 10).payloadBuilder.build()) + .isEqualTo( + io.grpc.observabilitylog.v1.Payload.newBuilder().putMetadata("a", DATA_A).build()); // Test edge cases for >= 2 key values - assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 19).payload.build()) - .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata - .newBuilder() - .addEntry(ENTRY_A) - .build()); - assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 20).payload.build()) - .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata - .newBuilder() - .addEntry(ENTRY_A) - .addEntry(ENTRY_B) - .build()); - assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 29).payload.build()) - .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata - .newBuilder() - .addEntry(ENTRY_A) - .addEntry(ENTRY_B) - .build()); - + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 19).payloadBuilder.build()) + .isEqualTo( + io.grpc.observabilitylog.v1.Payload.newBuilder().putMetadata("a", DATA_A).build()); + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 20).payloadBuilder.build()) + .isEqualTo( + io.grpc.observabilitylog.v1.Payload.newBuilder() + .putMetadata("a", DATA_A) + .putMetadata("b", DATA_B) + .build()); + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 29).payloadBuilder.build()) + .isEqualTo( + io.grpc.observabilitylog.v1.Payload.newBuilder() + .putMetadata("a", DATA_A) + .putMetadata("b", DATA_B) + .build()); // not truncated: enough for all keys - assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 30).payload.build()) - .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata - .newBuilder() - .addEntry(ENTRY_A) - .addEntry(ENTRY_B) - .addEntry(ENTRY_C) - .build()); + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 30).payloadBuilder.build()) + .isEqualTo( + io.grpc.observabilitylog.v1.Payload.newBuilder() + .putMetadata("a", DATA_A) + .putMetadata("b", DATA_B) + .putMetadata("c", DATA_C) + .build()); } @Test @@ -240,8 +212,11 @@ public void messageToProto() { StandardCharsets.US_ASCII); assertThat(messageTestHelper(bytes, Integer.MAX_VALUE)) .isEqualTo(GrpcLogRecord.newBuilder() - .setMessage(ByteString.copyFrom(bytes)) - .setPayloadSize(bytes.length) + .setPayload( + Payload.newBuilder() + .setMessage( + ByteString.copyFrom(bytes)) + .setMessageLength(bytes.length)) .build()); } @@ -252,18 +227,25 @@ public void messageToProto_truncated() { StandardCharsets.US_ASCII); assertThat(messageTestHelper(bytes, 0)) .isEqualTo(GrpcLogRecord.newBuilder() - .setPayloadSize(bytes.length) + .setPayload( + Payload.newBuilder() + .setMessageLength(bytes.length)) .setPayloadTruncated(true) .build()); int limit = 10; String truncatedMessage = "this is a "; assertThat(messageTestHelper(bytes, limit)) - .isEqualTo(GrpcLogRecord.newBuilder() - .setMessage(ByteString.copyFrom(truncatedMessage.getBytes(StandardCharsets.US_ASCII))) - .setPayloadSize(bytes.length) - .setPayloadTruncated(true) - .build()); + .isEqualTo( + GrpcLogRecord.newBuilder() + .setPayload( + Payload.newBuilder() + .setMessage( + ByteString.copyFrom( + truncatedMessage.getBytes(StandardCharsets.US_ASCII))) + .setMessageLength(bytes.length)) + .setPayloadTruncated(true) + .build()); } @@ -274,30 +256,28 @@ public void logRequestHeader() throws Exception { String methodName = "method"; String authority = "authority"; Duration timeout = Durations.fromMillis(1234); - String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; + String callId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; InetAddress address = InetAddress.getByName("127.0.0.1"); int port = 12345; InetSocketAddress peerAddress = new InetSocketAddress(address, port); GrpcLogRecord.Builder builder = - metadataToProtoTestHelper(EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata, + metadataToProtoTestHelper(EventType.CLIENT_HEADER, nonEmptyMetadata, HEADER_LIMIT) .toBuilder() - .setTimestamp(timestamp) .setSequenceId(seqId) .setServiceName(serviceName) .setMethodName(methodName) - .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) - .setEventLogger(EventLogger.LOGGER_CLIENT) - .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) - .setRpcId(rpcId); - builder.setAuthority(authority) - .setTimeout(timeout); + .setType(EventType.CLIENT_HEADER) + .setLogger(EventLogger.CLIENT) + .setCallId(callId); + builder.setAuthority(authority); + builder.setPayload(builder.getPayload().toBuilder().setTimeout(timeout).build()); GrpcLogRecord base = builder.build(); // logged on client { - logHelper.logRequestHeader( + logHelper.logClientHeader( seqId, serviceName, methodName, @@ -305,15 +285,15 @@ public void logRequestHeader() throws Exception { timeout, nonEmptyMetadata, HEADER_LIMIT, - EventLogger.LOGGER_CLIENT, - rpcId, + EventLogger.CLIENT, + callId, null); verify(sink).write(base); } // logged on server { - logHelper.logRequestHeader( + logHelper.logClientHeader( seqId, serviceName, methodName, @@ -321,19 +301,19 @@ public void logRequestHeader() throws Exception { timeout, nonEmptyMetadata, HEADER_LIMIT, - EventLogger.LOGGER_SERVER, - rpcId, + EventLogger.SERVER, + callId, peerAddress); verify(sink).write( base.toBuilder() - .setPeerAddress(LogHelper.socketAddressToProto(peerAddress)) - .setEventLogger(EventLogger.LOGGER_SERVER) + .setPeer(LogHelper.socketAddressToProto(peerAddress)) + .setLogger(EventLogger.SERVER) .build()); } // timeout is null { - logHelper.logRequestHeader( + logHelper.logClientHeader( seqId, serviceName, methodName, @@ -341,18 +321,18 @@ public void logRequestHeader() throws Exception { null, nonEmptyMetadata, HEADER_LIMIT, - EventLogger.LOGGER_CLIENT, - rpcId, + EventLogger.CLIENT, + callId, null); verify(sink).write( base.toBuilder() - .clearTimeout() + .setPayload(base.getPayload().toBuilder().clearTimeout().build()) .build()); } // peerAddress is not null (error on client) try { - logHelper.logRequestHeader( + logHelper.logClientHeader( seqId, serviceName, methodName, @@ -360,8 +340,8 @@ public void logRequestHeader() throws Exception { timeout, nonEmptyMetadata, HEADER_LIMIT, - EventLogger.LOGGER_CLIENT, - rpcId, + EventLogger.CLIENT, + callId, peerAddress); fail(); } catch (IllegalArgumentException expected) { @@ -374,68 +354,71 @@ public void logResponseHeader() throws Exception { long seqId = 1; String serviceName = "service"; String methodName = "method"; - String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; + String authority = "authority"; + String callId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; InetAddress address = InetAddress.getByName("127.0.0.1"); int port = 12345; InetSocketAddress peerAddress = new InetSocketAddress(address, port); GrpcLogRecord.Builder builder = - metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata, + metadataToProtoTestHelper(EventType.SERVER_HEADER, nonEmptyMetadata, HEADER_LIMIT) .toBuilder() - .setTimestamp(timestamp) .setSequenceId(seqId) .setServiceName(serviceName) .setMethodName(methodName) - .setEventType(EventType.GRPC_CALL_RESPONSE_HEADER) - .setEventLogger(EventLogger.LOGGER_CLIENT) - .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) - .setRpcId(rpcId); - builder.setPeerAddress(LogHelper.socketAddressToProto(peerAddress)); + .setAuthority(authority) + .setType(EventType.SERVER_HEADER) + .setLogger(EventLogger.CLIENT) + .setCallId(callId); + builder.setPeer(LogHelper.socketAddressToProto(peerAddress)); GrpcLogRecord base = builder.build(); // logged on client { - logHelper.logResponseHeader( + logHelper.logServerHeader( seqId, serviceName, methodName, + authority, nonEmptyMetadata, HEADER_LIMIT, - EventLogger.LOGGER_CLIENT, - rpcId, + EventLogger.CLIENT, + callId, peerAddress); verify(sink).write(base); } // logged on server { - logHelper.logResponseHeader( + logHelper.logServerHeader( seqId, serviceName, methodName, + authority, nonEmptyMetadata, HEADER_LIMIT, - EventLogger.LOGGER_SERVER, - rpcId, + EventLogger.SERVER, + callId, null); verify(sink).write( base.toBuilder() - .setEventLogger(EventLogger.LOGGER_SERVER) - .clearPeerAddress() + .setLogger(EventLogger.SERVER) + .clearPeer() .build()); } // peerAddress is not null (error on server) try { - logHelper.logResponseHeader( + logHelper.logServerHeader( seqId, serviceName, methodName, + authority, nonEmptyMetadata, HEADER_LIMIT, - EventLogger.LOGGER_SERVER, - rpcId, + EventLogger.SERVER, + callId, peerAddress); fail(); @@ -450,27 +433,30 @@ public void logTrailer() throws Exception { long seqId = 1; String serviceName = "service"; String methodName = "method"; - String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; + String authority = "authority"; + String callId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; InetAddress address = InetAddress.getByName("127.0.0.1"); int port = 12345; - InetSocketAddress peerAddress = new InetSocketAddress(address, port); + InetSocketAddress peer = new InetSocketAddress(address, port); Status statusDescription = Status.INTERNAL.withDescription("test description"); GrpcLogRecord.Builder builder = - metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata, + metadataToProtoTestHelper(EventType.SERVER_HEADER, nonEmptyMetadata, HEADER_LIMIT) .toBuilder() - .setTimestamp(timestamp) .setSequenceId(seqId) .setServiceName(serviceName) .setMethodName(methodName) - .setEventType(EventType.GRPC_CALL_TRAILER) - .setEventLogger(EventLogger.LOGGER_CLIENT) - .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) + .setAuthority(authority) + .setType(EventType.SERVER_TRAILER) + .setLogger(EventLogger.CLIENT) + .setCallId(callId); + builder.setPeer(LogHelper.socketAddressToProto(peer)); + builder.setPayload( + builder.getPayload().toBuilder() .setStatusCode(Status.INTERNAL.getCode().value()) .setStatusMessage("test description") - .setRpcId(rpcId); - builder.setPeerAddress(LogHelper.socketAddressToProto(peerAddress)); + .build()); GrpcLogRecord base = builder.build(); // logged on client @@ -479,12 +465,13 @@ public void logTrailer() throws Exception { seqId, serviceName, methodName, + authority, statusDescription, nonEmptyMetadata, HEADER_LIMIT, - EventLogger.LOGGER_CLIENT, - rpcId, - peerAddress); + EventLogger.CLIENT, + callId, + peer); verify(sink).write(base); } @@ -494,16 +481,17 @@ public void logTrailer() throws Exception { seqId, serviceName, methodName, + authority, statusDescription, nonEmptyMetadata, HEADER_LIMIT, - EventLogger.LOGGER_SERVER, - rpcId, + EventLogger.SERVER, + callId, null); verify(sink).write( base.toBuilder() - .clearPeerAddress() - .setEventLogger(EventLogger.LOGGER_SERVER) + .clearPeer() + .setLogger(EventLogger.SERVER) .build()); } @@ -513,15 +501,16 @@ public void logTrailer() throws Exception { seqId, serviceName, methodName, + authority, statusDescription, nonEmptyMetadata, HEADER_LIMIT, - EventLogger.LOGGER_CLIENT, - rpcId, + EventLogger.CLIENT, + callId, null); verify(sink).write( base.toBuilder() - .clearPeerAddress() + .clearPeer() .build()); } @@ -531,15 +520,16 @@ public void logTrailer() throws Exception { seqId, serviceName, methodName, + authority, statusDescription.getCode().toStatus(), nonEmptyMetadata, HEADER_LIMIT, - EventLogger.LOGGER_CLIENT, - rpcId, - peerAddress); + EventLogger.CLIENT, + callId, + peer); verify(sink).write( base.toBuilder() - .clearStatusMessage() + .setPayload(base.getPayload().toBuilder().clearStatusMessage().build()) .build()); } } @@ -551,10 +541,9 @@ public void alwaysLoggedMetadata_grpcTraceBin() { Metadata metadata = new Metadata(); metadata.put(key, new byte[1]); int zeroHeaderBytes = 0; - PayloadBuilder pair = + PayloadBuilderHelper pair = LogHelper.createMetadataProto(metadata, zeroHeaderBytes); - assertThat(Objects.requireNonNull(Iterables.getOnlyElement(pair.payload.getEntryBuilderList())) - .getKey()).isEqualTo(key.name()); + assertThat(pair.payloadBuilder.build().getMetadataMap().containsKey(key.name())).isTrue(); assertFalse(pair.truncated); } @@ -565,9 +554,9 @@ public void neverLoggedMetadata_grpcStatusDetailsBin() { Metadata metadata = new Metadata(); metadata.put(key, new byte[1]); int unlimitedHeaderBytes = Integer.MAX_VALUE; - PayloadBuilder pair + PayloadBuilderHelper pair = LogHelper.createMetadataProto(metadata, unlimitedHeaderBytes); - assertThat(pair.payload.getEntryBuilderList()).isEmpty(); + assertThat(pair.payloadBuilder.getMetadataMap()).isEmpty(); assertFalse(pair.truncated); } @@ -576,19 +565,19 @@ public void logRpcMessage() { long seqId = 1; String serviceName = "service"; String methodName = "method"; - String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; + String authority = "authority"; + String callId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; byte[] message = new byte[100]; GrpcLogRecord.Builder builder = messageTestHelper(message, MESSAGE_LIMIT) .toBuilder() - .setTimestamp(timestamp) .setSequenceId(seqId) .setServiceName(serviceName) .setMethodName(methodName) - .setEventType(EventType.GRPC_CALL_REQUEST_MESSAGE) - .setEventLogger(EventLogger.LOGGER_CLIENT) - .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) - .setRpcId(rpcId); + .setAuthority(authority) + .setType(EventType.CLIENT_MESSAGE) + .setLogger(EventLogger.CLIENT) + .setCallId(callId); GrpcLogRecord base = builder.build(); // request message { @@ -596,11 +585,12 @@ public void logRpcMessage() { seqId, serviceName, methodName, - EventType.GRPC_CALL_REQUEST_MESSAGE, + authority, + EventType.CLIENT_MESSAGE, message, MESSAGE_LIMIT, - EventLogger.LOGGER_CLIENT, - rpcId); + EventLogger.CLIENT, + callId); verify(sink).write(base); } // response message, logged on client @@ -609,14 +599,15 @@ public void logRpcMessage() { seqId, serviceName, methodName, - EventType.GRPC_CALL_RESPONSE_MESSAGE, + authority, + EventType.SERVER_MESSAGE, message, MESSAGE_LIMIT, - EventLogger.LOGGER_CLIENT, - rpcId); + EventLogger.CLIENT, + callId); verify(sink).write( base.toBuilder() - .setEventType(EventType.GRPC_CALL_RESPONSE_MESSAGE) + .setType(EventType.SERVER_MESSAGE) .build()); } // request message, logged on server @@ -625,14 +616,15 @@ public void logRpcMessage() { seqId, serviceName, methodName, - EventType.GRPC_CALL_REQUEST_MESSAGE, + authority, + EventType.CLIENT_MESSAGE, message, MESSAGE_LIMIT, - EventLogger.LOGGER_SERVER, - rpcId); + EventLogger.SERVER, + callId); verify(sink).write( base.toBuilder() - .setEventLogger(EventLogger.LOGGER_SERVER) + .setLogger(EventLogger.SERVER) .build()); } // response message, logged on server @@ -641,15 +633,34 @@ public void logRpcMessage() { seqId, serviceName, methodName, - EventType.GRPC_CALL_RESPONSE_MESSAGE, + authority, + EventType.SERVER_MESSAGE, message, MESSAGE_LIMIT, - EventLogger.LOGGER_SERVER, - rpcId); + EventLogger.SERVER, + callId); + verify(sink).write( + base.toBuilder() + .setType(EventType.SERVER_MESSAGE) + .setLogger(EventLogger.SERVER) + .build()); + } + // message is not of type : com.google.protobuf.Message or byte[] + { + logHelper.logRpcMessage( + seqId, + serviceName, + methodName, + authority, + EventType.CLIENT_MESSAGE, + "message", + MESSAGE_LIMIT, + EventLogger.CLIENT, + callId); verify(sink).write( base.toBuilder() - .setEventType(EventType.GRPC_CALL_RESPONSE_MESSAGE) - .setEventLogger(EventLogger.LOGGER_SERVER) + .clearPayload() + .clearPayloadTruncated() .build()); } } @@ -667,21 +678,19 @@ public void getPeerAddressTest() throws Exception { private static GrpcLogRecord metadataToProtoTestHelper( EventType type, Metadata metadata, int maxHeaderBytes) { GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder(); - PayloadBuilder pair + PayloadBuilderHelper pair = LogHelper.createMetadataProto(metadata, maxHeaderBytes); - builder.setMetadata(pair.payload); - builder.setPayloadSize(pair.size); + builder.setPayload(pair.payloadBuilder); builder.setPayloadTruncated(pair.truncated); - builder.setEventType(type); + builder.setType(type); return builder.build(); } private static GrpcLogRecord messageTestHelper(byte[] message, int maxMessageBytes) { GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder(); - PayloadBuilder pair + PayloadBuilderHelper pair = LogHelper.createMessageProto(message, maxMessageBytes); - builder.setMessage(pair.payload); - builder.setPayloadSize(pair.size); + builder.setPayload(pair.payloadBuilder); builder.setPayloadTruncated(pair.truncated); return builder.build(); } diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java index cea081b9b55..912d6a08bdb 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java @@ -18,7 +18,6 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.anyIterable; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -67,7 +66,6 @@ public class GcpLogSinkTest { private static final ImmutableMap CUSTOM_TAGS = ImmutableMap.of("KEY1", "Value1", "KEY2", "VALUE2"); - private static final long FLUSH_LIMIT = 10L; // gRPC is expected to always use this log name when reporting to GCP cloud logging. private static final String EXPECTED_LOG_NAME = "microservices.googleapis.com%2Fobservability%2Fgrpc"; @@ -77,28 +75,33 @@ public class GcpLogSinkTest { private static final String METHOD_NAME = "method"; private static final String AUTHORITY = "authority"; private static final Duration TIMEOUT = Durations.fromMillis(1234); - private static final String RPC_ID = "d155e885-9587-4e77-81f7-3aa5a443d47f"; + private static final String CALL_ID = "d155e885-9587-4e77-81f7-3aa5a443d47f"; private static final GrpcLogRecord LOG_PROTO = GrpcLogRecord.newBuilder() .setSequenceId(SEQ_ID) .setServiceName(SERVICE_NAME) .setMethodName(METHOD_NAME) .setAuthority(AUTHORITY) - .setTimeout(TIMEOUT) - .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) - .setEventLogger(EventLogger.LOGGER_CLIENT) - .setRpcId(RPC_ID) + .setPayload(io.grpc.observabilitylog.v1.Payload.newBuilder().setTimeout(TIMEOUT)) + .setType(EventType.CLIENT_HEADER) + .setLogger(EventLogger.CLIENT) + .setCallId(CALL_ID) .build(); + // .putFields("timeout", Value.newBuilder().setStringValue("1.234s").build()) + private static final Struct struct = + Struct.newBuilder() + .putFields("timeout", Value.newBuilder().setStringValue("1.234s").build()) + .build(); private static final Struct EXPECTED_STRUCT_LOG_PROTO = Struct.newBuilder() - .putFields("sequence_id", Value.newBuilder().setStringValue(String.valueOf(SEQ_ID)).build()) - .putFields("service_name", Value.newBuilder().setStringValue(SERVICE_NAME).build()) - .putFields("method_name", Value.newBuilder().setStringValue(METHOD_NAME).build()) + .putFields("sequenceId", Value.newBuilder().setStringValue(String.valueOf(SEQ_ID)).build()) + .putFields("serviceName", Value.newBuilder().setStringValue(SERVICE_NAME).build()) + .putFields("methodName", Value.newBuilder().setStringValue(METHOD_NAME).build()) .putFields("authority", Value.newBuilder().setStringValue(AUTHORITY).build()) - .putFields("timeout", Value.newBuilder().setStringValue("1.234s").build()) - .putFields("event_type", Value.newBuilder().setStringValue( - String.valueOf(EventType.GRPC_CALL_REQUEST_HEADER)).build()) - .putFields("event_logger", Value.newBuilder().setStringValue( - String.valueOf(EventLogger.LOGGER_CLIENT)).build()) - .putFields("rpc_id", Value.newBuilder().setStringValue(RPC_ID).build()) + .putFields("payload", Value.newBuilder().setStructValue(struct).build()) + .putFields("type", Value.newBuilder().setStringValue( + String.valueOf(EventType.CLIENT_HEADER)).build()) + .putFields("logger", Value.newBuilder().setStringValue( + String.valueOf(EventLogger.CLIENT)).build()) + .putFields("callId", Value.newBuilder().setStringValue(CALL_ID).build()) .build(); @Mock private Logging mockLogging; @@ -107,7 +110,7 @@ public class GcpLogSinkTest { @SuppressWarnings("unchecked") public void verifyWrite() throws Exception { GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, - CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet()); + CUSTOM_TAGS, Collections.emptySet()); sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( @@ -125,7 +128,7 @@ public void verifyWrite() throws Exception { @SuppressWarnings("unchecked") public void verifyWriteWithTags() { GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, - CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet()); + CUSTOM_TAGS, Collections.emptySet()); MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(LOCATION_TAGS); sink.write(LOG_PROTO); @@ -149,7 +152,7 @@ public void emptyCustomTags_labelsNotSet() { Map emptyCustomTags = null; Map expectedEmptyLabels = new HashMap<>(); GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, - emptyCustomTags, FLUSH_LIMIT, Collections.emptySet()); + emptyCustomTags, Collections.emptySet()); sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( @@ -170,7 +173,7 @@ public void emptyCustomTags_setSourceProject() { Map expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, LOCATION_TAGS, destinationProjectId); GcpLogSink sink = new GcpLogSink(mockLogging, destinationProjectId, LOCATION_TAGS, - emptyCustomTags, FLUSH_LIMIT, Collections.emptySet()); + emptyCustomTags, Collections.emptySet()); sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( @@ -183,24 +186,10 @@ public void emptyCustomTags_setSourceProject() { } } - @Test - public void verifyFlush() { - long lowerFlushLimit = 2L; - GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, - CUSTOM_TAGS, lowerFlushLimit, Collections.emptySet()); - sink.write(LOG_PROTO); - verify(mockLogging, never()).flush(); - sink.write(LOG_PROTO); - verify(mockLogging, times(1)).flush(); - sink.write(LOG_PROTO); - sink.write(LOG_PROTO); - verify(mockLogging, times(2)).flush(); - } - @Test public void verifyClose() throws Exception { GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, - CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet()); + CUSTOM_TAGS, Collections.emptySet()); sink.write(LOG_PROTO); verify(mockLogging, times(1)).write(anyIterable()); sink.close(); @@ -211,7 +200,7 @@ public void verifyClose() throws Exception { @Test public void verifyExclude() throws Exception { Sink mockSink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, - CUSTOM_TAGS, FLUSH_LIMIT, Collections.singleton("service")); + CUSTOM_TAGS, Collections.singleton("service")); mockSink.write(LOG_PROTO); verifyNoInteractions(mockLogging); }