diff --git a/remora-kafka/src/main/java/com/jkoolcloud/remora/advices/KafkaConsumerAdvice.java b/remora-kafka/src/main/java/com/jkoolcloud/remora/advices/KafkaConsumerAdvice.java index 1aae409..ddff067 100644 --- a/remora-kafka/src/main/java/com/jkoolcloud/remora/advices/KafkaConsumerAdvice.java +++ b/remora-kafka/src/main/java/com/jkoolcloud/remora/advices/KafkaConsumerAdvice.java @@ -90,7 +90,7 @@ public static void before(// @Advice.This Object thiz, // ed.setName("consume"); ed.setEventType(EntryDefinition.EventType.RECEIVE); ed.addPropertyIfExist("TOPIC", topic); - ed.setResource(topic, EntryDefinition.ResourceType.QUEUE); + ed.setResource(topic, EntryDefinition.ResourceType.TOPIC); ed.addPropertyIfExist("PARTITION", partition); ed.addPropertyIfExist("OFFSET", offset); @@ -113,7 +113,6 @@ public static void before(// @Advice.This Object thiz, // /* startTime = */ fillDefaultValuesBefore(ed, stackThreadLocal, null, null, logging ? logger : null)// ); ed.setEventType(EntryDefinition.EventType.RECEIVE); - ed.setApplication("KAFKA"); } catch (Throwable t) { handleAdviceException(t, ADVICE_NAME + "start", logging ? logger : null); @@ -125,11 +124,8 @@ public static void before(// @Advice.This Object thiz, // * */ - // @Advice.OnMethodExit(onThrowable = Throwable.class) - public static void after(// @Advice.Origin Method method, // - // @Advice.AllArguments Object[] arguments, // - // @Advice.Return Object returnValue, // //TODO needs separate Advice capture for void type - ) { + @Advice.OnMethodExit + public static void after() { boolean doFinally = true; try { diff --git a/remora-kafka/src/main/java/com/jkoolcloud/remora/advices/KafkaConsumerClientAdvice.java b/remora-kafka/src/main/java/com/jkoolcloud/remora/advices/KafkaConsumerClientAdvice.java index ae4175c..ab9c59b 100644 --- a/remora-kafka/src/main/java/com/jkoolcloud/remora/advices/KafkaConsumerClientAdvice.java +++ b/remora-kafka/src/main/java/com/jkoolcloud/remora/advices/KafkaConsumerClientAdvice.java @@ -6,12 +6,14 @@ import java.lang.instrument.Instrumentation; import java.lang.reflect.Method; import java.text.MessageFormat; +import java.util.Stack; import org.apache.kafka.clients.consumer.Consumer; import org.tinylog.Logger; import org.tinylog.TaggedLogger; import com.jkoolcloud.remora.RemoraConfig; +import com.jkoolcloud.remora.core.CallStack; import com.jkoolcloud.remora.core.EntryDefinition; import net.bytebuddy.agent.builder.AgentBuilder; @@ -77,7 +79,14 @@ public static void before(@Advice.This Consumer thiz, // ed.setEventType(EntryDefinition.EventType.CALL); String clientId = getFieldValue(thiz, String.class, "clientId"); String groupId = getFieldValue(thiz, String.class, "groupId"); - ed.setApplication(MessageFormat.format("clientId={}, groupId={}", clientId, groupId)); + Stack entryDefinitions = stackThreadLocal.get(); + if (entryDefinitions != null) { + String application = MessageFormat.format("clientId={}, groupId={}", clientId, groupId); + ((CallStack) entryDefinitions).setApplication(application); + if (logging) { + logger.info(format("Setting the application", application)); + } + } } catch (Throwable t) { handleAdviceException(t, ADVICE_NAME, logging ? logger : null); } diff --git a/remora-kafka/src/main/java/com/jkoolcloud/remora/advices/KafkaProducerAdvice.java b/remora-kafka/src/main/java/com/jkoolcloud/remora/advices/KafkaProducerAdvice.java index 3929e6a..b6f3ba7 100644 --- a/remora-kafka/src/main/java/com/jkoolcloud/remora/advices/KafkaProducerAdvice.java +++ b/remora-kafka/src/main/java/com/jkoolcloud/remora/advices/KafkaProducerAdvice.java @@ -5,6 +5,7 @@ import java.lang.instrument.Instrumentation; import java.lang.reflect.Method; +import java.util.Stack; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -12,6 +13,7 @@ import org.tinylog.TaggedLogger; import com.jkoolcloud.remora.RemoraConfig; +import com.jkoolcloud.remora.core.CallStack; import com.jkoolcloud.remora.core.EntryDefinition; import com.jkoolcloud.remora.core.utils.ReflectionUtils; @@ -79,13 +81,21 @@ public static void before(@Advice.This KafkaProducer thiz, // ed.setEventType(EntryDefinition.EventType.SEND); String topic = record.topic(); - ed.setApplication(ReflectionUtils.getFieldValue(thiz, String.class, "clientId")); + Stack entryDefinitions = stackThreadLocal.get(); + if (entryDefinitions != null) { + String application = ReflectionUtils.getFieldValue(thiz, String.class, "clientId"); + ((CallStack) entryDefinitions).setApplication(application); + if (logging) { + logger.info(format("Setting the application", application)); + } + } + ed.addPropertyIfExist("TOPIC", topic); ed.addPropertyIfExist("TIMESTAMP", record.timestamp()); ed.addPropertyIfExist("PARTITION", record.partition()); ed.addPropertyIfExist("KEY", String.valueOf(record.key())); ed.addPropertyIfExist("VALUE", String.valueOf(record.value())); - ed.setResource(topic, EntryDefinition.ResourceType.QUEUE); + ed.setResource(topic, EntryDefinition.ResourceType.TOPIC); } catch (Throwable t) { handleAdviceException(t, ADVICE_NAME, logging ? logger : null); }