Skip to content

Commit

Permalink
Kafka fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mjok committed Nov 26, 2019
1 parent 58770f5 commit 19c9e73
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static void before(// @Advice.This Object thiz, //
ed.setName("consume");
ed.setEventType(EntryDefinition.EventType.RECEIVE);
ed.addPropertyIfExist("TOPIC", topic);
ed.setResource(topic, EntryDefinition.ResourceType.QUEUE);
ed.setResource(topic, EntryDefinition.ResourceType.TOPIC);

ed.addPropertyIfExist("PARTITION", partition);
ed.addPropertyIfExist("OFFSET", offset);
Expand All @@ -113,7 +113,6 @@ public static void before(// @Advice.This Object thiz, //
/* startTime = */ fillDefaultValuesBefore(ed, stackThreadLocal, null, null, logging ? logger : null)//
);
ed.setEventType(EntryDefinition.EventType.RECEIVE);
ed.setApplication("KAFKA");

} catch (Throwable t) {
handleAdviceException(t, ADVICE_NAME + "start", logging ? logger : null);
Expand All @@ -125,11 +124,8 @@ public static void before(// @Advice.This Object thiz, //
*
*/

// @Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(// @Advice.Origin Method method, //
// @Advice.AllArguments Object[] arguments, //
// @Advice.Return Object returnValue, // //TODO needs separate Advice capture for void type
) {
@Advice.OnMethodExit
public static void after() {
boolean doFinally = true;

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import java.lang.instrument.Instrumentation;
import java.lang.reflect.Method;
import java.text.MessageFormat;
import java.util.Stack;

import org.apache.kafka.clients.consumer.Consumer;
import org.tinylog.Logger;
import org.tinylog.TaggedLogger;

import com.jkoolcloud.remora.RemoraConfig;
import com.jkoolcloud.remora.core.CallStack;
import com.jkoolcloud.remora.core.EntryDefinition;

import net.bytebuddy.agent.builder.AgentBuilder;
Expand Down Expand Up @@ -77,7 +79,14 @@ public static void before(@Advice.This Consumer thiz, //
ed.setEventType(EntryDefinition.EventType.CALL);
String clientId = getFieldValue(thiz, String.class, "clientId");
String groupId = getFieldValue(thiz, String.class, "groupId");
ed.setApplication(MessageFormat.format("clientId={}, groupId={}", clientId, groupId));
Stack<EntryDefinition> entryDefinitions = stackThreadLocal.get();
if (entryDefinitions != null) {
String application = MessageFormat.format("clientId={}, groupId={}", clientId, groupId);
((CallStack) entryDefinitions).setApplication(application);
if (logging) {
logger.info(format("Setting the application", application));
}
}
} catch (Throwable t) {
handleAdviceException(t, ADVICE_NAME, logging ? logger : null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

import java.lang.instrument.Instrumentation;
import java.lang.reflect.Method;
import java.util.Stack;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.tinylog.Logger;
import org.tinylog.TaggedLogger;

import com.jkoolcloud.remora.RemoraConfig;
import com.jkoolcloud.remora.core.CallStack;
import com.jkoolcloud.remora.core.EntryDefinition;
import com.jkoolcloud.remora.core.utils.ReflectionUtils;

Expand Down Expand Up @@ -79,13 +81,21 @@ public static void before(@Advice.This KafkaProducer thiz, //
ed.setEventType(EntryDefinition.EventType.SEND);
String topic = record.topic();

ed.setApplication(ReflectionUtils.getFieldValue(thiz, String.class, "clientId"));
Stack<EntryDefinition> entryDefinitions = stackThreadLocal.get();
if (entryDefinitions != null) {
String application = ReflectionUtils.getFieldValue(thiz, String.class, "clientId");
((CallStack) entryDefinitions).setApplication(application);
if (logging) {
logger.info(format("Setting the application", application));
}
}

ed.addPropertyIfExist("TOPIC", topic);
ed.addPropertyIfExist("TIMESTAMP", record.timestamp());
ed.addPropertyIfExist("PARTITION", record.partition());
ed.addPropertyIfExist("KEY", String.valueOf(record.key()));
ed.addPropertyIfExist("VALUE", String.valueOf(record.value()));
ed.setResource(topic, EntryDefinition.ResourceType.QUEUE);
ed.setResource(topic, EntryDefinition.ResourceType.TOPIC);
} catch (Throwable t) {
handleAdviceException(t, ADVICE_NAME, logging ? logger : null);
}
Expand Down

0 comments on commit 19c9e73

Please sign in to comment.