Skip to content

Commit

Permalink
[OPIK-130] Add anonymous usage information
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagohora committed Sep 26, 2024
1 parent 4fab35c commit 8a3afe6
Show file tree
Hide file tree
Showing 14 changed files with 486 additions and 6 deletions.
14 changes: 10 additions & 4 deletions apps/opik-backend/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,13 @@ server:
enabled: true

rateLimit:
enabled: ${RATE_LIMIT_ENABLED:-false}
generalLimit:
limit: ${RATE_LIMIT_GENERAL_EVENTS_LIMIT:-10000}
durationInSeconds: ${RATE_LIMIT_GENERAL_EVENTS_DURATION_IN_SEC:-60}
enabled: ${RATE_LIMIT_ENABLED:-false}
generalLimit:
limit: ${RATE_LIMIT_GENERAL_EVENTS_LIMIT:-10000}
durationInSeconds: ${RATE_LIMIT_GENERAL_EVENTS_DURATION_IN_SEC:-60}

metadata:
version: ${OPIK_VERSION:-latest}
usageReport:
enabled: ${OPIK_REPORTING_ENABLED:-true}
url: ${OPIK_REPORTING_URL:-}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.comet.opik.infrastructure.OpikConfiguration;
import com.comet.opik.infrastructure.auth.AuthModule;
import com.comet.opik.infrastructure.bi.ApplicationStartupListener;
import com.comet.opik.infrastructure.bundle.LiquibaseBundle;
import com.comet.opik.infrastructure.db.DatabaseAnalyticsModule;
import com.comet.opik.infrastructure.db.IdGeneratorModule;
Expand Down Expand Up @@ -62,6 +63,7 @@ public void initialize(Bootstrap<OpikConfiguration> bootstrap) {
.withPlugins(new SqlObjectPlugin(), new Jackson2Plugin()))
.modules(new DatabaseAnalyticsModule(), new IdGeneratorModule(), new AuthModule(), new RedisModule(),
new RateLimitModule(), new NameGeneratorModule())
.listen(new ApplicationStartupListener())
.enableAutoConfig()
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ public class OpikConfiguration extends Configuration {
@Valid
@NotNull @JsonProperty
private RateLimitConfig rateLimit = new RateLimitConfig();

@Valid
@NotNull @JsonProperty
private OpikMetadataConfig metadata = new OpikMetadataConfig();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.comet.opik.infrastructure;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.Data;

@Data
public class OpikMetadataConfig {

public record UsageReport(@Valid @JsonProperty boolean enabled, @Valid @JsonProperty String url) {}

@Valid
@JsonProperty
@NotNull
private String version;

@Valid
@NotNull
@JsonProperty
private UsageReport usageReport;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package com.comet.opik.infrastructure.bi;

import com.comet.opik.domain.IdGenerator;
import com.comet.opik.infrastructure.OpikConfiguration;
import com.comet.opik.infrastructure.lock.LockService;
import com.google.inject.Injector;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.Response;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import ru.vyarus.dropwizard.guice.module.context.SharedConfigurationState;
import ru.vyarus.dropwizard.guice.module.lifecycle.GuiceyLifecycle;
import ru.vyarus.dropwizard.guice.module.lifecycle.GuiceyLifecycleListener;
import ru.vyarus.dropwizard.guice.module.lifecycle.event.GuiceyLifecycleEvent;
import ru.vyarus.dropwizard.guice.module.lifecycle.event.InjectorPhaseEvent;
import ru.vyarus.dropwizard.guice.module.lifecycle.event.JerseyPhaseEvent;
import ru.vyarus.dropwizard.guice.module.lifecycle.event.run.InjectorCreationEvent;

import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static com.comet.opik.infrastructure.lock.LockService.Lock;

@Slf4j
@Singleton
@RequiredArgsConstructor
public class ApplicationStartupListener implements GuiceyLifecycleListener {

// This event cannot depend on authentication
private final Client client = ClientBuilder.newClient();
private final AtomicReference<Injector> injector = new AtomicReference<>();

@Override
public void onEvent(GuiceyLifecycleEvent event) {

if (event instanceof InjectorPhaseEvent injectorEvent) {
injector.set(injectorEvent.getInjector());
}

if (event.getType() == GuiceyLifecycle.ApplicationStarted) {

String eventType = GuiceyLifecycle.ApplicationStarted.name();

var config = (OpikConfiguration) event.getSharedState().getConfiguration().get();

if (!config.getMetadata().getUsageReport().enabled()) {
log.info("Usage report is disabled");
return;
}

if (StringUtils.isEmpty(config.getMetadata().getUsageReport().url())) {
log.warn("Usage report URL is not set");
return;
}

var lockService = injector.get().getInstance(LockService.class);
var generator = injector.get().getInstance(IdGenerator.class);
var usageReport = injector.get().getInstance(UsageReportDAO.class);

var lock = new Lock("opik-%s".formatted(eventType));

lockService.executeWithLock(lock, tryToReportStartupEvent(usageReport, generator, eventType, config))
.subscribeOn(Schedulers.boundedElastic())
.block();
}
}

private Mono<Mono<Object>> tryToReportStartupEvent(UsageReportDAO usageReport, IdGenerator generator, String eventType, OpikConfiguration config) {
return Mono.fromCallable(() -> {

Optional<String> anonymousId = getAnonymousId(usageReport, generator);

log.info("Anonymous ID: {}", anonymousId.orElse("not found"));

if (anonymousId.isEmpty()) {
log.warn("Anonymous ID not found, skipping event reporting");
return Mono.empty();
}

if (usageReport.isEventReported(eventType)) {
log.info("Event already reported");
return Mono.empty();
}

usageReport.addEvent(eventType);

reportEvent(anonymousId.get(), eventType, config, usageReport);

return Mono.empty();
});
}

private static Optional<String> getAnonymousId(UsageReportDAO usageReport, IdGenerator generator) {
var anonymousId = usageReport.getAnonymousId();

if (anonymousId.isEmpty()) {
log.info("Anonymous ID not found, generating a new one");
var newId = generator.generateId();
log.info("Generated new ID: {}", newId);

// Save the new ID
usageReport.saveAnonymousId(newId.toString());

anonymousId = Optional.of(newId.toString());
}

return anonymousId;
}

private void reportEvent(String anonymousId, String eventType, OpikConfiguration config, UsageReportDAO usageReport) {

var startupEvent = new OpikStartupEvent(
anonymousId,
eventType,
Map.of("opik_app_version", config.getMetadata().getVersion())
);

try (Response response = client.target(URI.create(config.getMetadata().getUsageReport().url()))
.request()
.post(Entity.json(startupEvent))) {

if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) {
log.info("Event reported successfully");
usageReport.markEventAsReported(eventType);
} else {
log.warn("Failed to report event: {}", response.getStatusInfo());
if (response.hasEntity()) {
log.warn("Response: {}", response.readEntity(String.class));
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.comet.opik.infrastructure.bi;

enum Metadata {
anonymous_id,
;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.comet.opik.infrastructure.bi;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;

import java.util.Map;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
record OpikStartupEvent(String anonymousId, String eventType, Map<String, String> eventProperties) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.comet.opik.infrastructure.bi;

import com.google.inject.ImplementedBy;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;

import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Optional;

@ImplementedBy(UsageReportDAOImpl.class)
interface UsageReportDAO {

Optional<String> getAnonymousId();

void saveAnonymousId(@NonNull String id);

boolean isEventReported(@NonNull String eventType);

void addEvent(@NonNull String eventType);

void markEventAsReported(@NonNull String eventType);
}

@Slf4j
@RequiredArgsConstructor(onConstructor_ = @Inject)
@Singleton
class UsageReportDAOImpl implements UsageReportDAO {

private final Jdbi jdbi;

public Optional<String> getAnonymousId() {
return jdbi.inTransaction(handle -> handle.createQuery("SELECT value FROM metadata WHERE `key` = :key")
.bind("key", Metadata.anonymous_id)
.mapTo(String.class)
.findFirst());
}

public void saveAnonymousId(@NonNull String id) {
jdbi.useHandle(handle -> handle.createUpdate("INSERT INTO metadata (`key`, value) VALUES (:key, :value)")
.bind("key", Metadata.anonymous_id)
.bind("value", id)
.execute());
}

public boolean isEventReported(@NonNull String eventType) {
return jdbi.inTransaction(handle -> handle.createQuery("SELECT COUNT(*) > 0 FROM usage_information WHERE event_type = :eventType AND reported_at IS NOT NULL")
.bind("eventType", eventType)
.mapTo(Boolean.class)
.one());
}

public void addEvent(@NonNull String eventType) {
try {
jdbi.useHandle(handle -> handle.createUpdate("INSERT INTO usage_information (event_type) VALUES (:eventType)")
.bind("eventType",eventType)
.execute());
} catch (UnableToExecuteStatementException e) {
if (e.getCause() instanceof SQLIntegrityConstraintViolationException) {
log.warn("Event type already exists: {}", eventType);
} else {
log.error("Failed to add event", e);
}
}
}

public void markEventAsReported(@NonNull String eventType) {
jdbi.useHandle(handle -> handle.createUpdate("UPDATE usage_information SET reported_at = current_timestamp(6) WHERE event_type = :eventType")
.bind("eventType", eventType)
.execute());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
--liquibase formatted sql
--changeset thiagohora:create_usage_usage_information_table

CREATE TABLE metadata (
`key` VARCHAR(255) NOT NULL,
value VARCHAR(255) NOT NULL,
last_updated_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
PRIMARY KEY `metadata_pk` (`key`)
);

CREATE TABLE usage_information (
event_type VARCHAR(255) NOT NULL,
last_updated_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
reported_at TIMESTAMP(6) DEFAULT NULL,
PRIMARY KEY `usage_information_pk` (event_type)
);

--rollback DROP TABLE IF EXISTS metadata;
--rollback DROP TABLE IF EXISTS usage_information;
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ public static void runDbMigration(Jdbi jdbi, Map<String, String> parameters) {
}
}

public static void runDbMigration(Connection connection, Map<String, String> parameters) {
runDbMigration(connection, MYSQL_CHANGELOG_FILE, parameters);
}

public static void runDbMigration(Connection connection, String changeLogFile, Map<String, String> parameters) {
try {
var database = DatabaseFactory.getInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
public class MySQLContainerUtils {

public static MySQLContainer<?> newMySQLContainer() {
return newMySQLContainer(true);
}

public static MySQLContainer<?> newMySQLContainer(boolean reusable) {
return new MySQLContainer<>(DockerImageName.parse("mysql"))
.withUrlParam("createDatabaseIfNotExist", "true")
.withUrlParam("rewriteBatchedStatements", "true")
.withDatabaseName("opik")
.withPassword("opik")
.withUsername("opik")
.withReuse(true);
.withReuse(reusable);
}

public static Map<String, String> migrationParameters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public record AppContextConfig(
List<Object> customBeans,
String jdbcUserName,
String jdbcDriverClass,
String awsJdbcDriverPlugins) {
String awsJdbcDriverPlugins,
boolean usageReportEnabled,
String usageReportUrl) {
}

public static TestDropwizardAppExtension newTestDropwizardAppExtension(String jdbcUrl,
Expand Down Expand Up @@ -151,6 +153,14 @@ public void run(GuiceyEnvironment environment) {
}
}

if (appContextConfig.usageReportEnabled()) {
list.add("metadata.usageReport.enabled: true");

if (appContextConfig.usageReportUrl() != null) {
list.add("metadata.usageReport.url: %s".formatted(appContextConfig.usageReportUrl()));
}
}

return TestDropwizardAppExtension.forApp(OpikApplication.class)
.config("src/test/resources/config-test.yml")
.configOverrides(list.toArray(new String[0]))
Expand Down
Loading

0 comments on commit 8a3afe6

Please sign in to comment.