Migrate MirrorVulnerabilityProcessor from Kafka Streams to Parallel Consumer #553

Merged 1 commit on Mar 22, 2024
@@ -1,6 +1,7 @@
package org.dependencytrack.event.kafka.processor;

import alpine.common.logging.Logger;
import org.dependencytrack.event.kafka.KafkaTopics;
import org.dependencytrack.event.kafka.processor.api.ProcessorManager;

import javax.servlet.ServletContextEvent;
@@ -16,7 +17,8 @@ public class ProcessorInitializer implements ServletContextListener {
public void contextInitialized(final ServletContextEvent event) {
LOGGER.info("Initializing processors");

// TODO: Register processor here!
PROCESSOR_MANAGER.registerProcessor(VulnerabilityMirrorProcessor.PROCESSOR_NAME,
KafkaTopics.NEW_VULNERABILITY, new VulnerabilityMirrorProcessor());

PROCESSOR_MANAGER.startAll();
}
@@ -1,16 +1,14 @@
package org.dependencytrack.event.kafka.streams.processor;
package org.dependencytrack.event.kafka.processor;

import alpine.common.logging.Logger;
import alpine.common.metrics.Metrics;
import com.github.packageurl.MalformedPackageURLException;
import com.github.packageurl.PackageURL;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.cyclonedx.proto.v1_4.Bom;
import org.cyclonedx.proto.v1_4.Component;
import org.cyclonedx.proto.v1_4.VulnerabilityAffects;
import org.dependencytrack.event.kafka.processor.api.Processor;
import org.dependencytrack.model.Vulnerability;
import org.dependencytrack.model.VulnerableSoftware;
import org.dependencytrack.parser.dependencytrack.ModelConverterCdxToVuln;
@@ -27,19 +25,18 @@
import java.util.List;
import java.util.Optional;

/**
* A {@link Processor} that ingests vulnerability data from CycloneDX Bill of Vulnerabilities.
*/
public class VulnerabilityMirrorProcessor implements Processor<String, Bom> {

public class MirrorVulnerabilityProcessor implements Processor<String, Bom, Void, Void> {
static final String PROCESSOR_NAME = "vuln.mirror";

private static final Logger LOGGER = Logger.getLogger(MirrorVulnerabilityProcessor.class);
private static final Timer TIMER = Timer.builder("vuln_mirror_processing")
.description("Time taken to process mirrored vulnerabilities")
.register(Metrics.getRegistry());
private static final Logger LOGGER = Logger.getLogger(VulnerabilityMirrorProcessor.class);

@Override
public void process(final Record<String, Bom> record) {
final Timer.Sample timerSample = Timer.start();

try (QueryManager qm = new QueryManager().withL2CacheDisabled()) {
public void process(final ConsumerRecord<String, Bom> record) {
try (QueryManager qm = new QueryManager()) {
LOGGER.debug("Synchronizing Mirrored Vulnerability : " + record.key());
Bom bom = record.value();
String key = record.key();
@@ -112,11 +109,6 @@ public void process(final Record<String, Bom> record) {
synchronizedVulnerability.setVulnerableSoftware(reconciledVsList);
}
qm.persist(synchronizedVulnerability);
} catch (Exception e) {
// TODO: Send record to a dead letter topic.
LOGGER.error("Synchronizing vulnerability %s failed".formatted(record.key()), e);
} finally {
timerSample.stop(TIMER);
}
}

@@ -230,8 +222,8 @@ public VulnerableSoftware mapAffectedRangeToVulnerableSoftware(final QueryManage

for (final Constraint constraint : vers.constraints()) {
if (constraint.version() == null
|| constraint.version().equals("0")
|| constraint.version().equals("*")) {
|| constraint.version().equals("0")
|| constraint.version().equals("*")) {
// Semantically, ">=0" is equivalent to versionStartIncluding=null,
// and ">0" is equivalent to versionStartExcluding=null.
//
@@ -253,12 +245,12 @@
}

if (versionStartIncluding == null && versionStartExcluding == null
&& versionEndIncluding == null && versionEndExcluding == null) {
&& versionEndIncluding == null && versionEndExcluding == null) {
LOGGER.warn("Unable to assemble a version range from %s for %s".formatted(vers, vulnId));
return null;
}
if ((versionStartIncluding != null || versionStartExcluding != null)
&& (versionEndIncluding == null && versionEndExcluding == null)) {
&& (versionEndIncluding == null && versionEndExcluding == null)) {
LOGGER.warn("Skipping indefinite version range assembled from %s for %s".formatted(vers, vulnId));
return null;
}
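The shape change at the heart of this migration is visible in the class declaration above: the Kafka Streams Processor<String, Bom, Void, Void> consuming a Record is replaced by the project's own single-record Processor<String, Bom> consuming a plain ConsumerRecord, while the per-processor Timer and the catch-all error logging are dropped in favor of the retry handling configured further down. A minimal sketch of the assumed new contract follows; the real interface lives in org.dependencytrack.event.kafka.processor.api and may differ in details such as its exception signature.

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical sketch of the single-record contract implied by this diff; not copied from the repository.
public interface Processor<K, V> {

    // Invoked once per record fetched by the Parallel Consumer. Failures are assumed to be
    // handled by the surrounding ProcessorManager, which applies the retry settings
    // configured under alpine.kafka.processor.<name>.*.
    void process(ConsumerRecord<K, V> record) throws Exception;
}

Compared to the removed Kafka Streams variant there is no downstream forwarding and no Void output types; the processor is a plain per-record handler.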
@@ -23,7 +23,6 @@
import org.dependencytrack.event.ProjectPolicyEvaluationEvent;
import org.dependencytrack.event.kafka.KafkaTopics;
import org.dependencytrack.event.kafka.streams.processor.DelayedBomProcessedNotificationProcessor;
import org.dependencytrack.event.kafka.streams.processor.MirrorVulnerabilityProcessor;
import org.dependencytrack.event.kafka.streams.processor.RepositoryMetaResultProcessor;
import org.dependencytrack.event.kafka.streams.processor.VulnerabilityScanResultProcessor;
import org.dependencytrack.model.VulnerabilityScan;
@@ -224,12 +223,6 @@ Topology createTopology() {
.withName("consume_from_%s_topic".formatted(KafkaTopics.REPO_META_ANALYSIS_RESULT.name())))
.process(RepositoryMetaResultProcessor::new, Named.as("process_repo_meta_analysis_result"));

streamsBuilder
.stream(KafkaTopics.NEW_VULNERABILITY.name(),
Consumed.with(KafkaTopics.NEW_VULNERABILITY.keySerde(), KafkaTopics.NEW_VULNERABILITY.valueSerde())
.withName("consume_from_%s_topic".formatted(KafkaTopics.NEW_VULNERABILITY.name())))
.process(MirrorVulnerabilityProcessor::new, Named.as("process_mirror_vulnerability"));

return streamsBuilder.build(streamsProperties);
}

13 changes: 13 additions & 0 deletions src/main/resources/application.properties
@@ -491,6 +491,19 @@ kafka.streams.transient.processing.exception.threshold.interval=PT30M
# Refer to https://kafka.apache.org/documentation/#consumerconfigs for available options.
# alpine.kafka.processor.<name>.consumer.<consumer.config.name>=

# Required
# Configures the Kafka processor responsible for ingesting mirrored vulnerability
# data from the dtrack.vulnerability topic. The processor only occasionally receives
# records, such that high concurrency is usually not justified.
alpine.kafka.processor.vuln.mirror.max.concurrency=-1
alpine.kafka.processor.vuln.mirror.processing.order=partition
alpine.kafka.processor.vuln.mirror.retry.initial.delay.ms=3000
alpine.kafka.processor.vuln.mirror.retry.multiplier=2
alpine.kafka.processor.vuln.mirror.retry.randomization.factor=0.3
alpine.kafka.processor.vuln.mirror.retry.max.delay.ms=180000
alpine.kafka.processor.vuln.mirror.consumer.group.id=dtrack-apiserver-processor
alpine.kafka.processor.vuln.mirror.consumer.auto.offset.reset=earliest
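
The retry properties above describe an exponential backoff with jitter. The following snippet is purely illustrative and assumes the common formula delay = min(initialDelay * multiplier^attempt, maxDelay) with a ±randomizationFactor jitter band; the backoff implementation actually used by the processor framework is not shown in this change.

// Illustrative only: computes the retry delays implied by the
// alpine.kafka.processor.vuln.mirror.retry.* properties above, assuming a
// standard exponential backoff; the framework's real implementation may differ.
public class RetryDelayDemo {

    public static void main(String[] args) {
        final long initialDelayMs = 3_000;   // retry.initial.delay.ms
        final double multiplier = 2.0;       // retry.multiplier
        final double randomization = 0.3;    // retry.randomization.factor
        final long maxDelayMs = 180_000;     // retry.max.delay.ms

        for (int attempt = 0; attempt < 8; attempt++) {
            final double base = Math.min(initialDelayMs * Math.pow(multiplier, attempt), maxDelayMs);
            final double lower = base * (1 - randomization);
            final double upper = base * (1 + randomization);
            System.out.printf("attempt %d: %.0f ms (jittered between %.0f and %.0f ms)%n",
                    attempt + 1, base, lower, upper);
        }
        // Base delays: 3s, 6s, 12s, 24s, 48s, 96s, then capped at 180s for later attempts.
    }
}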

# Scheduling tasks after 3 minutes (3*60*1000) of starting application
task.scheduler.initial.delay=180000

@@ -1,55 +1,26 @@
package org.dependencytrack.event.kafka.streams.processor;
package org.dependencytrack.event.kafka.processor;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.cyclonedx.proto.v1_4.Bom;
import org.dependencytrack.PersistenceCapableTest;
import org.dependencytrack.event.kafka.serialization.KafkaProtobufSerde;
import org.dependencytrack.event.kafka.serialization.KafkaProtobufSerializer;
import org.dependencytrack.model.Severity;
import org.dependencytrack.model.Vulnerability;
import org.dependencytrack.persistence.CweImporter;
import org.dependencytrack.util.KafkaTestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.dependencytrack.util.KafkaTestUtil.generateBomFromJson;

public class MirrorVulnerabilityProcessorTest extends PersistenceCapableTest {

private TopologyTestDriver testDriver;
private TestInputTopic<String, Bom> inputTopic;
public class VulnerabilityMirrorProcessorTest extends AbstractProcessorTest {

@Before
public void setUp() throws Exception {
final var streamsBuilder = new StreamsBuilder();
streamsBuilder
.stream("input-topic", Consumed
.with(Serdes.String(), new KafkaProtobufSerde<>(Bom.parser())))
.process(MirrorVulnerabilityProcessor::new);

testDriver = new TopologyTestDriver(streamsBuilder.build());
inputTopic = testDriver.createInputTopic("input-topic",
new StringSerializer(), new KafkaProtobufSerializer<>());
public void before() throws Exception {
super.before();

new CweImporter().processCweDefinitions(); // Required for CWE mapping
}

@After
public void tearDown() {
if (testDriver != null) {
testDriver.close();
}
}
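
With the TopologyTestDriver gone, each test below instantiates the processor directly and feeds it a ConsumerRecord built via a helper inherited from AbstractProcessorTest. A rough sketch of what such a builder could look like, assuming it only needs to carry key, value, and placeholder topic/partition/offset metadata; the actual aConsumerRecord helper is not part of this diff and may expose more options (headers, timestamps, and so on).

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical stand-in for the aConsumerRecord(...) helper used in the tests below;
// the real builder in AbstractProcessorTest may look different.
final class ConsumerRecordBuilder<K, V> {

    private final K key;
    private final V value;

    private ConsumerRecordBuilder(final K key, final V value) {
        this.key = key;
        this.value = value;
    }

    static <K, V> ConsumerRecordBuilder<K, V> aConsumerRecord(final K key, final V value) {
        return new ConsumerRecordBuilder<>(key, value);
    }

    ConsumerRecord<K, V> build() {
        // Topic, partition, and offset are placeholders; the processor under test only reads key and value.
        return new ConsumerRecord<>("dtrack.vulnerability", 0, 0L, key, value);
    }
}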

@Test
public void testProcessNvdVuln() throws Exception {
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
@@ -91,7 +62,10 @@ public void testProcessNvdVuln() throws Exception {
{ "url": "https://github.com/thinkcmf/thinkcmf/issues/736" }
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
assertThat(vuln).isNotNull();
@@ -160,7 +134,7 @@ public void testProcessNvdVuln() throws Exception {

@Test
public void testProcessGitHubVuln() throws Exception {
inputTopic.pipeInput("GITHUB/GHSA-fxwm-579q-49qq", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
@@ -223,7 +197,10 @@ public void testProcessGitHubVuln() throws Exception {
{ "url": "https://github.com/advisories/GHSA-fxwm-579q-49qq" }
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("GITHUB/GHSA-fxwm-579q-49qq", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("GITHUB", "GHSA-fxwm-579q-49qq");
assertThat(vuln).isNotNull();
@@ -375,7 +352,7 @@ public void testProcessGitHubVuln() throws Exception {

@Test
public void testProcessOsvVuln() throws Exception {
inputTopic.pipeInput("OSV/GHSA-2cc5-23r7-vc4v", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
@@ -427,7 +404,10 @@ public void testProcessOsvVuln() throws Exception {
{ "url": "https://github.com/ratpack/ratpack/blob/29434f7ac6fd4b36a4495429b70f4c8163100332/ratpack-session/src/main/java/ratpack/session/clientside/ClientSideSessionConfig.java#L29" }
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("OSV/GHSA-2cc5-23r7-vc4v", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("GITHUB", "GHSA-2cc5-23r7-vc4v");
assertThat(vuln).isNotNull();
@@ -555,7 +535,7 @@ public void testProcessOsvVuln() throws Exception {

@Test
public void testProcessVulnWithoutAffects() throws Exception {
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
@@ -573,7 +553,10 @@ public void testProcessVulnWithoutAffects() throws Exception {
}
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
assertThat(vuln).isNotNull();
@@ -613,7 +596,7 @@ public void testProcessVulnWithoutAffects() throws Exception {

@Test
public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception {
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
@@ -639,7 +622,10 @@ public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception {
}
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
assertThat(vuln).isNotNull();
@@ -679,7 +665,7 @@ public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception {

@Test
public void testProcessVulnWithVersConstraints() throws Exception {
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
@@ -731,7 +717,10 @@ public void testProcessVulnWithVersConstraints() throws Exception {
}
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
assertThat(vuln).isNotNull();
@@ -935,7 +924,7 @@ public void testProcessVulnWithVersConstraints() throws Exception {

@Test
public void testProcessVulnWithInvalidCpeOrPurl() throws Exception {
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
final var bovJson = """
{
"components": [
{
@@ -997,7 +986,10 @@ public void testProcessVulnWithInvalidCpeOrPurl() throws Exception {
}
]
}
"""));
""";

final var processor = new VulnerabilityMirrorProcessor();
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());

final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
assertThat(vuln).isNotNull();
@@ -1035,4 +1027,4 @@ public void testProcessVulnWithInvalidCpeOrPurl() throws Exception {
assertThat(vuln.getVulnerableSoftware()).isEmpty();
}

}
}