Skip to content

Commit

Permalink
Addressing PR reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagohora committed Oct 30, 2024
1 parent e0d3628 commit 73bc106
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 43 deletions.
5 changes: 0 additions & 5 deletions apps/opik-backend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,6 @@
<artifactId>java-uuid-generator</artifactId>
<version>${uuid.java.generator.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.3.1-jre</version>
</dependency>

<!-- Test -->

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.comet.opik.api;

import lombok.NonNull;

import java.time.Instant;
import java.util.UUID;

public record DatasetLastExperimentCreated(UUID datasetId, Instant experimentCreatedAt) {
/**
 * Pairs a dataset id with the creation time of its most recent experiment.
 * <p>
 * {@code experimentCreatedAt} is intentionally nullable: callers pass {@code null}
 * to clear the timestamp for datasets that no longer have any experiments.
 */
public record DatasetLastExperimentCreated(@NonNull UUID datasetId, Instant experimentCreatedAt) {
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.comet.opik.api.events;

import com.comet.opik.infrastructure.events.BaseEvent;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.experimental.Accessors;
import lombok.experimental.SuperBuilder;

import java.time.Instant;
import java.util.UUID;
Expand All @@ -11,12 +14,13 @@
@Accessors(fluent = true)
public class ExperimentCreated extends BaseEvent {

private final UUID experimentId;
private final UUID datasetId;
private final Instant createdAt;
private final @NonNull UUID experimentId;
private final @NonNull UUID datasetId;
private final @NonNull Instant createdAt;

public ExperimentCreated(UUID experimentId, UUID datasetId, Instant createdAt, String workspaceId,
String userName) {
public ExperimentCreated(@NonNull UUID experimentId, @NonNull UUID datasetId, @NonNull Instant createdAt,
@NonNull String workspaceId,
@NonNull String userName) {
super(workspaceId, userName);
this.experimentId = experimentId;
this.datasetId = datasetId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@

import com.comet.opik.infrastructure.events.BaseEvent;
import lombok.Getter;
import lombok.NonNull;
import lombok.experimental.Accessors;

import java.util.Set;
import java.util.UUID;

@Getter
@Accessors(fluent = true)
public class ExperimentsDeleted extends BaseEvent {
public final class ExperimentsDeleted extends BaseEvent {

private final Set<UUID> datasetIds;
private final @NonNull Set<UUID> datasetIds;

public ExperimentsDeleted(Set<UUID> datasetIds, String workspaceId, String userName) {
/**
 * Creates the event published after experiments are deleted.
 *
 * @param datasetIds  ids of the datasets whose experiments were removed
 * @param workspaceId workspace context, forwarded to {@link BaseEvent}
 * @param userName    acting user, forwarded to {@link BaseEvent}
 */
public ExperimentsDeleted(@NonNull Set<UUID> datasetIds, @NonNull String workspaceId, @NonNull String userName) {
    super(workspaceId, userName);
    this.datasetIds = datasetIds;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.comet.opik.api.resources.v1.events;

import com.comet.opik.api.DatasetLastExperimentCreated;
import com.comet.opik.api.events.ExperimentCreated;
import com.comet.opik.api.events.ExperimentsDeleted;
import com.comet.opik.domain.DatasetService;
Expand All @@ -17,6 +18,7 @@
import reactor.util.context.Context;
import ru.vyarus.dropwizard.guice.module.installer.feature.eager.EagerSingleton;

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
Expand All @@ -39,7 +41,7 @@ public DatasetEventListener(EventBus eventBus, DatasetService datasetService, Ex
public void onExperimentCreated(ExperimentCreated event) {
log.info("Recording experiment for dataset '{}'", event.datasetId());

datasetService.recordExperiment(event.datasetId(), event.createdAt())
datasetService.recordExperiments(Set.of(new DatasetLastExperimentCreated(event.datasetId(), event.createdAt())))
.contextWrite(ctx -> setContext(event, ctx))
.block();

Expand All @@ -66,43 +68,46 @@ public void onExperimentsDeleted(ExperimentsDeleted event) {

private Set<UUID> updateAndGetDatasetsWithExperiments(ExperimentsDeleted event) {
return experimentService.getMostRecentCreatedExperimentFromDatasets(event.datasetIds())
.flatMap(dto -> {
log.info("Updating dataset '{}' with last experiment created time", dto.datasetId());
.collect(Collectors.toSet())
.flatMap(datasets -> {
log.info("Updating datasets '{}' with last experiment created time", datasets);

if (datasets.isEmpty()) {
return Mono.just(new HashSet<UUID>());
}

return datasetService.recordExperiment(dto.datasetId(), dto.experimentCreatedAt())
return datasetService.recordExperiments(datasets)
.doFinally(signalType -> {
if (signalType == SignalType.ON_ERROR) {
log.error("Failed to update dataset '{}' with last experiment created time",
dto.datasetId());
log.error("Failed to update datasets '{}' with last experiment created time", datasets);
} else {
log.info("Updated dataset '{}' with last experiment created time", dto.datasetId());
log.info("Updated datasets '{}' with last experiment created time", datasets);
}
})
.then(Mono.just(dto.datasetId()));
.then(Mono.just(datasets.stream().map(DatasetLastExperimentCreated::datasetId).collect(Collectors.toSet())));
})
.contextWrite(ctx -> setContext(event, ctx))
.collect(Collectors.toSet())
.block();
}

private void updateDatasetsWithoutExperiments(ExperimentsDeleted event, Set<UUID> updatedDatasets) {
Flux.fromIterable(SetUtils.difference(event.datasetIds(), updatedDatasets))
.flatMap(datasetId -> {
log.info("Updating dataset '{}' with last experiment created time", datasetId);
.map(datasetId -> new DatasetLastExperimentCreated(datasetId, null))
.collect(Collectors.toSet())
.flatMap(datasets-> {
log.info("Updating datasets '{}' with last experiment created time null", datasets);

return datasetService.recordExperiment(datasetId, null)
return datasetService.recordExperiments(datasets)
.doFinally(signalType -> {
if (signalType == SignalType.ON_ERROR) {
log.error("Failed to update dataset '{}' with last experiment created time",
datasetId);
log.error("Failed to update dataset '{}' with last experiment created time null", datasets);
} else {
log.info("Updated dataset '{}' with last experiment created time", datasetId);
log.info("Updated dataset '{}' with last experiment created time", datasets);
}
});

})
.contextWrite(ctx -> setContext(event, ctx))
.collectList()
.block();
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package com.comet.opik.domain;

import com.comet.opik.api.Dataset;
import com.comet.opik.api.DatasetLastExperimentCreated;
import com.comet.opik.api.DatasetUpdate;
import com.comet.opik.infrastructure.db.UUIDArgumentFactory;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.statement.PreparedBatch;
import org.jdbi.v3.sqlobject.config.RegisterArgumentFactory;
import org.jdbi.v3.sqlobject.config.RegisterConstructorMapper;
import org.jdbi.v3.sqlobject.customizer.AllowUnusedBindings;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.customizer.BindBean;
import org.jdbi.v3.sqlobject.customizer.BindList;
import org.jdbi.v3.sqlobject.customizer.BindMethods;
import org.jdbi.v3.sqlobject.customizer.Define;
import org.jdbi.v3.sqlobject.statement.SqlBatch;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.jdbi.v3.stringtemplate4.UseStringTemplateEngine;

import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -74,7 +80,17 @@ List<Dataset> find(@Bind("limit") int limit,
@SqlQuery("SELECT * FROM datasets WHERE workspace_id = :workspace_id AND name = :name")
Optional<Dataset> findByName(@Bind("workspace_id") String workspaceId, @Bind("name") String name);

@SqlUpdate("UPDATE datasets SET last_created_experiment_at = :time WHERE id = :id AND workspace_id = :workspace_id")
int recordExperiment(@Bind("workspace_id") String workspaceId, @Bind("id") UUID datasetId,
@Bind("time") Instant time);

/**
 * Batch-updates {@code datasets.last_created_experiment_at} for every entry in
 * {@code datasets}, scoped to the given workspace.
 *
 * @param handle      open JDBI handle; the caller owns its transaction/lifecycle
 * @param workspaceId workspace the datasets must belong to
 * @param datasets    dataset-id / timestamp pairs; a {@code null} timestamp clears the column
 * @return per-statement affected-row counts, in the iteration order of {@code datasets}
 */
default int[] recordExperiments(Handle handle, String workspaceId,
        Collection<DatasetLastExperimentCreated> datasets) {
    PreparedBatch batch = handle.prepareBatch(
            "UPDATE datasets SET last_created_experiment_at = :experimentCreatedAt "
                    + "WHERE id = :datasetId AND workspace_id = :workspace_id");

    for (DatasetLastExperimentCreated dataset : datasets) {
        batch.bind("workspace_id", workspaceId)
                .bind("datasetId", dataset.datasetId())
                .bind("experimentCreatedAt", dataset.experimentCreatedAt());

        batch.add();
    }

    return batch.execute();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import com.comet.opik.api.Dataset;
import com.comet.opik.api.DatasetCriteria;
import com.comet.opik.api.DatasetIdentifier;
import com.comet.opik.api.DatasetLastExperimentCreated;
import com.comet.opik.api.DatasetUpdate;
import com.comet.opik.api.error.EntityAlreadyExistsException;
import com.comet.opik.api.error.ErrorMessage;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.utils.AsyncUtils;
import com.google.common.base.Preconditions;
import com.google.inject.ImplementedBy;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import jakarta.inject.Inject;
Expand All @@ -18,13 +20,13 @@
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import ru.vyarus.guicey.jdbi3.tx.TransactionTemplate;

import java.sql.SQLIntegrityConstraintViolationException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -60,7 +62,7 @@ public interface DatasetService {

DatasetPage find(int page, int size, DatasetCriteria criteria);

Mono<Void> recordExperiment(UUID datasetId, Instant instant);
Mono<Void> recordExperiments(Set<DatasetLastExperimentCreated> datasetsLastExperimentCreated);
}

@Singleton
Expand Down Expand Up @@ -287,18 +289,26 @@ public DatasetPage find(int page, int size, DatasetCriteria criteria) {

@Override
@WithSpan
public Mono<Void> recordExperiment(UUID datasetId, Instant instant) {
public Mono<Void> recordExperiments(Set<DatasetLastExperimentCreated> datasetsLastExperimentCreated) {
Preconditions.checkArgument(CollectionUtils.isNotEmpty(datasetsLastExperimentCreated), "Argument 'datasetsLastExperimentCreated' must not be empty");

return Mono.deferContextual(ctx -> {
String workspaceId = ctx.get(RequestContext.WORKSPACE_ID);

return Mono.fromRunnable(() -> template.inTransaction(WRITE, handle -> {

var dao = handle.attach(DatasetDAO.class);

if (dao.recordExperiment(workspaceId, datasetId, instant) > 0) {
log.info("Recorded experiment for dataset '{}'", datasetId);
} else {
log.warn("Record discarded for dataset '{}'", datasetId);
int[] results = dao.recordExperiments(handle, workspaceId, datasetsLastExperimentCreated);

// Same Set instance is iterated here and inside the DAO batch, so positions line up.
List<DatasetLastExperimentCreated> datasets = List.copyOf(datasetsLastExperimentCreated);

for (int i = 0; i < results.length; i++) {
    // A single-row UPDATE reports exactly 1 affected row, so success is > 0;
    // the previous "> 1" could never match and mislabelled every update as discarded.
    if (results[i] > 0) {
        log.info("Recorded experiment for dataset '{}'", datasets.get(i).datasetId());
    } else {
        log.warn("Record discarded for dataset '{}'", datasets.get(i).datasetId());
    }
}

return Mono.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.google.inject.matcher.Matchers;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.Executors;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import com.comet.opik.api.resources.utils.WireMockUtils;
import com.comet.opik.infrastructure.DatabaseAnalyticsFactory;
import com.comet.opik.podam.PodamFactoryUtils;
import com.fasterxml.uuid.Generators;
import com.fasterxml.uuid.impl.TimeBasedEpochGenerator;
import com.redis.testcontainers.RedisContainer;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.HttpHeaders;
Expand Down

0 comments on commit 73bc106

Please sign in to comment.