Skip to content

Commit

Permalink
flink: add metrics support
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski committed Apr 24, 2024
1 parent d002a0a commit 9cf73e7
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.openlineage.flink.visitor.lifecycle;

import io.micrometer.core.instrument.MeterRegistry;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineage.JobFacets;
import io.openlineage.client.OpenLineage.JobFacetsBuilder;
Expand Down Expand Up @@ -57,6 +58,7 @@ public class FlinkExecutionContext implements ExecutionContext {
private final String processingType;
private final CircuitBreaker circuitBreaker;
private final FlinkOpenLineageConfig config;
@Getter private final MeterRegistry meterRegistry;

@Getter private final List<Transformation<?>> transformations;

Expand All @@ -70,7 +72,9 @@ public void onJobSubmitted() {
.run(new OpenLineage.RunBuilder().runId(runId).build())
.build();
log.debug("Posting event for onJobSubmitted {}: {}", jobId, runEvent);
meterRegistry.counter("openlineage.flink.event.submitted.start").increment();
eventEmitter.emit(runEvent);
meterRegistry.counter("openlineage.flink.event.submitted.end").increment();
return null;
});
}
Expand All @@ -90,7 +94,9 @@ public void onJobCheckpoint(CheckpointFacet facet) {
.build())
.build();
log.debug("Posting event for onJobCheckpoint {}: {}", jobId, runEvent);
meterRegistry.counter("openlineage.flink.event.checkpoint.start").increment();
eventEmitter.emit(runEvent);
meterRegistry.counter("openlineage.flink.event.checkpoint.end").increment();
return null;
});
}
Expand Down Expand Up @@ -119,11 +125,13 @@ public void onJobCompleted(JobExecutionResult jobExecutionResult) {
circuitBreaker.run(
() -> {
OpenLineage openLineage = openLineageContext.getOpenLineage();
meterRegistry.counter("openlineage.flink.event.completed.start").increment();
eventEmitter.emit(
commonEventBuilder()
.run(openLineage.newRun(runId, null))
.eventType(EventType.COMPLETE)
.build());
meterRegistry.counter("openlineage.flink.event.completed.end").increment();
return null;
});
}
Expand All @@ -133,6 +141,7 @@ public void onJobFailed(Throwable failed) {
circuitBreaker.run(
() -> {
OpenLineage openLineage = openLineageContext.getOpenLineage();
meterRegistry.counter("openlineage.flink.event.failed.start").increment();
eventEmitter.emit(
commonEventBuilder()
.run(
Expand All @@ -149,6 +158,7 @@ public void onJobFailed(Throwable failed) {
.eventType(EventType.FAIL)
.eventTime(ZonedDateTime.now())
.build());
meterRegistry.counter("openlineage.flink.event.failed.end").increment();
return null;
});
}
Expand Down Expand Up @@ -217,7 +227,14 @@ private List<OpenLineage.InputDataset> getInputDatasets(
inputDatasets.addAll(
inputVisitors.stream()
.filter(inputVisitor -> inputVisitor.isDefinedAt(transformation))
.map(inputVisitor -> inputVisitor.apply(transformation))
.map(
inputVisitor ->
meterRegistry
.timer(
"openlineage.flink.dataset.input.extraction.time",
"visitor",
inputVisitor.getClass().getName())
.record(() -> inputVisitor.apply(transformation)))
.flatMap(List::stream)
.collect(Collectors.toList()));
}
Expand All @@ -232,7 +249,14 @@ private List<OpenLineage.OutputDataset> getOutputDatasets(
log.debug("Getting output dataset from sink {}", sink.toString());
return outputVisitors.stream()
.filter(outputVisitor -> outputVisitor.isDefinedAt(sink))
.map(outputVisitor -> outputVisitor.apply(sink))
.map(
outputVisitor ->
meterRegistry
.timer(
"openlineage.flink.dataset.output.extraction.time",
"visitor",
outputVisitor.getClass().getName())
.record(() -> outputVisitor.apply(sink)))
.flatMap(List::stream)
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@

package io.openlineage.flink.visitor.lifecycle;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.circuitBreaker.CircuitBreakerFactory;
import io.openlineage.client.metrics.MicrometerProvider;
import io.openlineage.flink.api.OpenLineageContext;
import io.openlineage.flink.client.EventEmitter;
import io.openlineage.flink.client.FlinkConfigParser;
import io.openlineage.flink.client.FlinkOpenLineageConfig;
import io.openlineage.flink.client.Versions;
import java.util.List;
import java.util.UUID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.EnvironmentInformation;

public class FlinkExecutionContextFactory {

Expand All @@ -26,21 +33,64 @@ public static FlinkExecutionContext getContext(
JobID jobId,
String jobType,
List<Transformation<?>> transformations) {
FlinkOpenLineageConfig openLineageConfig = FlinkConfigParser.parse(configuration);
FlinkOpenLineageConfig config = FlinkConfigParser.parse(configuration);
return getContext(
config, jobNamespace, jobName, jobId, jobType, new EventEmitter(config), transformations);
}

/**
 * Builds a {@link FlinkExecutionContext} from an already-parsed config and a caller-supplied
 * {@link EventEmitter} (used by tests to inject a mock emitter).
 *
 * <p>Note: this span contained stale removed-side diff lines that duplicated the
 * {@code circuitBreaker}/{@code eventEmitter}/{@code config} builder calls against a
 * non-existent {@code openLineageConfig} variable; only the added-side calls are kept.
 *
 * @param config parsed OpenLineage Flink configuration
 * @param jobNamespace OpenLineage job namespace
 * @param jobName OpenLineage job name
 * @param jobId Flink job id used as the run correlation key
 * @param jobType processing type, e.g. "streaming"
 * @param eventEmitter transport used to deliver run events
 * @param transformations topology transformations used for dataset extraction
 * @return fully initialized execution context with a fresh random run id
 */
public static FlinkExecutionContext getContext(
    FlinkOpenLineageConfig config,
    String jobNamespace,
    String jobName,
    JobID jobId,
    String jobType,
    EventEmitter eventEmitter,
    List<Transformation<?>> transformations) {
  return new FlinkExecutionContext.FlinkExecutionContextBuilder()
      .jobId(jobId)
      .processingType(jobType)
      .jobName(jobName)
      .jobNamespace(jobNamespace)
      .transformations(transformations)
      .runId(UUID.randomUUID())
      .circuitBreaker(new CircuitBreakerFactory(config.getCircuitBreaker()).build())
      .openLineageContext(
          OpenLineageContext.builder()
              .openLineage(new OpenLineage(EventEmitter.OPEN_LINEAGE_CLIENT_URI))
              .build())
      .eventEmitter(eventEmitter)
      .config(config)
      .meterRegistry(initializeMetrics(config))
      .build();
}

/**
 * Creates the {@link MeterRegistry} used for OpenLineage Flink metrics and attaches common tags
 * (integration version, Flink version, disabled facets) to it.
 *
 * <p>The original code cast the provider result to {@link CompositeMeterRegistry}
 * unconditionally, which would throw {@link ClassCastException} for a non-composite registry;
 * the cast is now guarded. The tag set is built once instead of being duplicated.
 *
 * @param config OpenLineage Flink configuration; its metrics section selects the backend(s)
 * @return the configured registry
 */
private static MeterRegistry initializeMetrics(FlinkOpenLineageConfig config) {
  MeterRegistry meterRegistry =
      MicrometerProvider.addMeterRegistryFromConfig(config.getMetricsConfig());
  String disabledFacets;
  if (config.getFacetsConfig() != null && config.getFacetsConfig().getDisabledFacets() != null) {
    disabledFacets = String.join(";", config.getFacetsConfig().getDisabledFacets());
  } else {
    disabledFacets = "";
  }
  Tags commonTags =
      Tags.of(
          Tag.of("openlineage.flink.integration.version", Versions.getVersion()),
          Tag.of("openlineage.flink.version", EnvironmentInformation.getVersion()),
          Tag.of("openlineage.flink.disabled.facets", disabledFacets));
  meterRegistry.config().commonTags(commonTags);
  // The original also tagged each child registry explicitly — presumably so meters registered
  // directly on a child carry the same tags; preserved here. TODO confirm this is required.
  if (meterRegistry instanceof CompositeMeterRegistry) {
    ((CompositeMeterRegistry) meterRegistry)
        .getRegistries()
        .forEach(registry -> registry.config().commonTags(commonTags));
  }
  return meterRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.mockito.Mockito.when;

import com.github.tomakehurst.wiremock.WireMockServer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.openlineage.flink.client.CheckpointFacet;
import io.openlineage.flink.visitor.lifecycle.FlinkExecutionContext;
import java.time.Duration;
Expand All @@ -44,6 +46,7 @@ class OpenLineageContinousJobTrackerTest {
OpenLineageContinousJobTracker tracker =
new OpenLineageContinousJobTracker(config, Duration.ofMillis(100));
FlinkExecutionContext context = mock(FlinkExecutionContext.class);
MeterRegistry meterRegistry;
JobID jobID = new JobID(1, 2);
CheckpointFacet expectedCheckpointFacet = new CheckpointFacet(1, 5, 6, 7, 1);

Expand All @@ -62,7 +65,9 @@ class OpenLineageContinousJobTrackerTest {
public void setup() {
  // Boot the stub REST endpoint first, then point the WireMock client helpers at it.
  wireMockServer.start();
  configureFor("localhost", 18088);

  // Flink REST options direct the tracker at the local WireMock server.
  when(config.get(RestOptions.ADDRESS)).thenReturn("localhost");
  when(config.get(RestOptions.PORT)).thenReturn(18088);

  // The tracked execution context exposes a fixed job id and an in-memory meter registry.
  meterRegistry = new SimpleMeterRegistry();
  when(context.getJobId()).thenReturn(jobID);
  when(context.getMeterRegistry()).thenReturn(meterRegistry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,35 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.openlineage.client.OpenLineage.OwnershipJobFacetOwners;
import io.openlineage.client.OpenLineage.RunEvent;
import io.openlineage.client.OpenLineage.RunEvent.EventType;
import io.openlineage.client.metrics.MicrometerProvider;
import io.openlineage.flink.client.CheckpointFacet;
import io.openlineage.flink.client.EventEmitter;
import io.openlineage.flink.client.FlinkOpenLineageConfig;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

public class FlinkExecutionContextTest {

Configuration config = new Configuration();

@AfterEach
void cleanUp() {
  // MicrometerProvider holds a static registry shared across tests; reset it so counters
  // incremented by one test do not leak into the assertions of the next.
  MicrometerProvider.clear();
}

@Test
void testBuildEventForEventTypeWithJobOwnershipFacet() {
ConfigOption transportTypeOption =
Expand Down Expand Up @@ -76,4 +89,68 @@ void testBuildEventForEventTypeWithNoJobOwnersConfig() {
.getOwnership())
.isNull();
}

@Test
void testEmitCheckpointEventIncrementsMetrics() {
  // Emitting a checkpoint event must bump both the start and end counters.
  FlinkExecutionContext context = setupMetricsContext();

  context.onJobCheckpoint(new CheckpointFacet(1, 2, 3, 4, 5));

  double started =
      MicrometerProvider.getMeterRegistry()
          .counter("openlineage.flink.event.checkpoint.start")
          .count();
  double finished =
      MicrometerProvider.getMeterRegistry()
          .counter("openlineage.flink.event.checkpoint.end")
          .count();
  assertThat(started).isGreaterThanOrEqualTo(1.0);
  assertThat(finished).isGreaterThanOrEqualTo(1.0);
}

@Test
void testEmitSubmittedEventIncrementsMetrics() {
  // Emitting a submitted event must bump both the start and end counters.
  FlinkExecutionContext context = setupMetricsContext();

  context.onJobSubmitted();

  double started =
      MicrometerProvider.getMeterRegistry()
          .counter("openlineage.flink.event.submitted.start")
          .count();
  double finished =
      MicrometerProvider.getMeterRegistry()
          .counter("openlineage.flink.event.submitted.end")
          .count();
  assertThat(started).isGreaterThanOrEqualTo(1.0);
  assertThat(finished).isGreaterThanOrEqualTo(1.0);
}

@Test
void testEmitCompletedEventIncrementsMetrics() {
  // Emitting a completed event must bump both the start and end counters.
  FlinkExecutionContext context = setupMetricsContext();

  context.onJobCompleted(mock(JobExecutionResult.class));

  double started =
      MicrometerProvider.getMeterRegistry()
          .counter("openlineage.flink.event.completed.start")
          .count();
  double finished =
      MicrometerProvider.getMeterRegistry()
          .counter("openlineage.flink.event.completed.end")
          .count();
  assertThat(started).isGreaterThanOrEqualTo(1.0);
  assertThat(finished).isGreaterThanOrEqualTo(1.0);
}

/** Builds an execution context whose metrics land in a SimpleMeterRegistry; emitter is mocked. */
FlinkExecutionContext setupMetricsContext() {
  FlinkOpenLineageConfig openLineageConfig = mock(FlinkOpenLineageConfig.class);
  when(openLineageConfig.getMetricsConfig()).thenReturn(Map.of("type", "simple"));
  JobID jobId = mock(JobID.class);
  EventEmitter emitter = mock(EventEmitter.class);
  return FlinkExecutionContextFactory.getContext(
      openLineageConfig,
      "jobNamespace",
      "jobName",
      jobId,
      "streaming",
      emitter,
      Collections.emptyList());
}
}
3 changes: 3 additions & 0 deletions integration/flink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ shadowJar {
relocate 'org.yaml', 'io.openlineage.flink.shaded.org.yaml'
relocate 'org.apache.hc', 'io.openlineage.flink.shaded.org.apache.hc'
relocate 'org.apache.commons.codec', 'io.openlineage.flink.shaded.org.apache.commons.codec'
relocate 'org.apache.commons.lang3', 'io.openlineage.flink.shaded.org.apache.commons.lang3'
// Micrometer's timer histograms pull in HdrHistogram/LatencyUtils. They must be shaded under
// the *flink* namespace: the previous lines relocated them into io.openlineage.spark.shaded,
// which is the Spark integration's package (copy-paste error) and would leave this jar's
// classes pointing at packages it does not ship.
relocate 'org.hdrhistogram', 'io.openlineage.flink.shaded.org.hdrhistogram'
relocate 'org.latencyutils', 'io.openlineage.flink.shaded.org.latencyutils'
dependencies {
exclude(dependency('org.slf4j::'))
}
Expand Down
3 changes: 3 additions & 0 deletions integration/flink/shared/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ ext {
mockitoVersion = '5.2.0'
testcontainersVersion = '1.19.6'
isReleaseVersion = !version.endsWith('SNAPSHOT')
icebergVersion = "1.3.0"
micrometerVersion = "1.12.4"

versionsMap = [
"1.15": ["cassandra": "1.15.4", "kafka": flinkVersion, "jdbc": "1.15.4", "iceberg": "1.3.0", "alternativeShort": "1.15"],
Expand All @@ -84,6 +86,7 @@ ext {
dependencies {
api "io.openlineage:openlineage-java:${project.version}"
api "io.openlineage:openlineage-sql-java:${project.version}"
api "io.micrometer:micrometer-core:${micrometerVersion}"

compileOnly "org.projectlombok:lombok:${lombokVersion}"
compileOnly "org.apache.flink:flink-java:$flinkVersion"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.flink.client;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Properties;

/**
 * Resolves the OpenLineage Flink integration version and the producer URI embedded in emitted
 * events. The version is read from a {@code version.properties} resource generated at build
 * time; when the resource is unavailable, the branch label {@code "main"} is used.
 */
public class Versions {

  public static final URI OPEN_LINEAGE_PRODUCER_URI = getProducerUri();

  private static URI getProducerUri() {
    return URI.create(
        String.format(
            "https://github.com/OpenLineage/OpenLineage/tree/%s/integration/flink", getVersion()));
  }

  /**
   * Reads the integration version from {@code version.properties} next to this class.
   *
   * <p>Fixes two defects in the original: the stream returned by {@code getResourceAsStream}
   * was never closed (resource leak), and a missing resource produced a {@link
   * NullPointerException} from {@code Properties.load(null)} — which, because this runs inside
   * the static initializer of {@link #OPEN_LINEAGE_PRODUCER_URI}, would prevent the class from
   * loading at all instead of falling back to {@code "main"}.
   *
   * @return the build version, or {@code "main"} when it cannot be determined
   */
  @SuppressWarnings("PMD")
  public static String getVersion() {
    Properties properties = new Properties();
    try (InputStream is = Versions.class.getResourceAsStream("version.properties")) {
      if (is == null) {
        // Resource absent (e.g. IDE/dev build where it was not generated).
        return "main";
      }
      properties.load(is);
    } catch (IOException exception) {
      return "main";
    }
    // Guard against a properties file that exists but lacks the key.
    return properties.getProperty("version", "main");
  }
}
Loading

0 comments on commit 9cf73e7

Please sign in to comment.