[OPIK-294] Use application events to fill last experiment created at on datasets table. #511
Conversation
The majority of comments are minor: conventions or things that would benefit from some low-cost polishing, but that are fine to go in as-is in order to favour velocity.
Regarding the instrumentation on the event bus, I'm wondering whether it was worth the effort, but I'm fine going with it.
My main concern is triggering many small updates on the deletion of experiments. I think it can quickly degrade performance even with very few users, so we likely want to double-check.
Having said that, I'm happy to move forward for the sake of velocity.
apps/opik-backend/src/main/java/com/comet/opik/api/DatasetLastExperimentCreated.java
apps/opik-backend/src/main/java/com/comet/opik/api/events/ExperimentCreated.java
```java
public ExperimentCreated(UUID experimentId, UUID datasetId, Instant createdAt, String workspaceId,
        String userName) {
    super(workspaceId, userName);
    this.experimentId = experimentId;
    this.datasetId = datasetId;
    this.createdAt = createdAt;
}
```
Minor: boilerplate constructor; better to define it with Lombok.
Not possible due to the super constructor: Lombok's constructor annotations can't pass arguments to it.
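For context, a minimal sketch of the limitation (the class and field names come from this PR; everything else is assumed): Lombok's `@AllArgsConstructor` and `@RequiredArgsConstructor` always emit an implicit no-arg `super()` call, so they cannot forward `workspaceId` and `userName` to `BaseEvent`, which only exposes a parameterized constructor.

```java
import java.time.Instant;
import java.util.UUID;

// Sketch only: BaseEvent exposes just a parameterized constructor.
abstract class BaseEvent {
    protected final String workspaceId;
    protected final String userName;

    protected BaseEvent(String workspaceId, String userName) {
        this.workspaceId = workspaceId;
        this.userName = userName;
    }
}

// Annotating this class with @AllArgsConstructor would not compile: the generated
// constructor calls super() with no arguments, and BaseEvent has no such constructor.
// Hence the explicit constructor (or @SuperBuilder, discussed further down).
class ExperimentCreated extends BaseEvent {
    private final UUID experimentId;
    private final UUID datasetId;
    private final Instant createdAt;

    ExperimentCreated(UUID experimentId, UUID datasetId, Instant createdAt,
            String workspaceId, String userName) {
        super(workspaceId, userName); // the call Lombok cannot generate
        this.experimentId = experimentId;
        this.datasetId = datasetId;
        this.createdAt = createdAt;
    }
}
```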
```java
public DatasetEventListener(EventBus eventBus, DatasetService datasetService, ExperimentService experimentService) {
    this.datasetService = datasetService;
    this.experimentService = experimentService;
    eventBus.register(this);
```
Minor: better to register with the EventBus somewhere else, for instance in a module or some DI hook.
I will try to find a better way in the next PR. The problem is that beans are lazy by default, and I think moving the registration into the module would require building the listener there, which may pull in domain dependencies. I would like to avoid that.
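For illustration, a hedged sketch of one possible DI hook (assuming Guice as the container; the module name is hypothetical and this is not what the PR does): a TypeListener that registers every injected bean declaring `@Subscribe` methods with the EventBus, so individual listeners no longer call `eventBus.register(this)` in their constructors.

```java
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.inject.AbstractModule;
import com.google.inject.TypeLiteral;
import com.google.inject.matcher.Matchers;
import com.google.inject.spi.InjectionListener;
import com.google.inject.spi.TypeEncounter;
import com.google.inject.spi.TypeListener;

import java.util.Arrays;

// Hypothetical module: auto-registers any injected bean that declares @Subscribe methods.
public class EventBusModule extends AbstractModule {

    private final EventBus eventBus = new EventBus();

    @Override
    protected void configure() {
        bind(EventBus.class).toInstance(eventBus);

        bindListener(Matchers.any(), new TypeListener() {
            @Override
            public <I> void hear(TypeLiteral<I> type, TypeEncounter<I> encounter) {
                boolean hasSubscriber = Arrays.stream(type.getRawType().getMethods())
                        .anyMatch(method -> method.isAnnotationPresent(Subscribe.class));
                if (hasSubscriber) {
                    // Register each instance right after Guice finishes injecting it.
                    encounter.register((InjectionListener<I>) eventBus::register);
                }
            }
        });
    }
}
```

The trade-off mentioned above still applies: with lazy singletons a listener is only registered once something injects it, so such a module may also need to bind listeners eagerly.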
.../opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/DatasetEventListener.java
```java
}

@WithSpan
public Mono<List<ExperimentDatasetId>> getExperimentsDatasetIds(Set<UUID> ids) {
```
Minor: most likely you want to return a Flux here.
If I do this, I would need to collect immediately because we want to publish a single event with all dataset IDs from the deleted experiments.
And that's fine. The DAO should be more neutral (take arguments, produce a neutral response). That type of aggregation is usually better placed in the service layer, as it's kind of business logic. Anyway, a minor comment.
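For illustration, a hedged sketch of the split being suggested (the `ExperimentDatasetId` shape mirrors the PR, but the DAO interface, service class, and method bodies are assumptions): the DAO streams rows as a Flux, and the service collects them exactly once, right before it would publish the single event.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Set;
import java.util.UUID;

// Hypothetical shapes, only to show the Flux vs. Mono<List<...>> split between layers.
record ExperimentDatasetId(UUID experimentId, UUID datasetId) {}

interface ExperimentDao {
    // DAO stays neutral: it streams rows and does not decide how they are aggregated.
    Flux<ExperimentDatasetId> getExperimentsDatasetIds(Set<UUID> ids);
}

class ExperimentDeletionService {

    private final ExperimentDao dao;

    ExperimentDeletionService(ExperimentDao dao) {
        this.dao = dao;
    }

    // The service layer aggregates once, because a single event with all dataset ids is wanted.
    Mono<List<UUID>> datasetIdsForDeletedExperiments(Set<UUID> experimentIds) {
        return dao.getExperimentsDatasetIds(experimentIds)
                .map(ExperimentDatasetId::datasetId)
                .distinct()
                .collectList();
    }
}
```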
```java
protected final String userName;

protected BaseEvent(String workspaceId, String userName) {
    this.traceId = Span.current().getSpanContext().getTraceId();
```
Minor: probably better to initialize it in the field declaration, so you can use a Lombok constructor. Anyway, it's strange to use OTEL stuff as a field of a POJO representing an event; telemetry shouldn't be invasive.
I added it because sometimes we see traces without their root endpoint, and this can help us find them. I just didn't want to call Span.current().getSpanContext().getTraceId() in all the places where we publish an event.
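For reference, a hedged sketch of the field-declaration suggestion (field names follow the PR; the Lombok annotations are assumptions): the trace id is still captured when the event object is built, i.e. at publish time, while the remaining fields can be covered by a generated constructor.

```java
import io.opentelemetry.api.trace.Span;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;

// Sketch only: traceId initialized at declaration instead of inside the constructor.
@Getter
@Accessors(fluent = true)
@RequiredArgsConstructor
public abstract class BaseEvent {

    // Captured when the event instance is created, i.e. at publish time.
    protected final String traceId = Span.current().getSpanContext().getTraceId();

    protected final String workspaceId;
    protected final String userName;
}
```

Subclasses would still need to call the generated two-argument super constructor explicitly, as discussed above.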
apps/opik-backend/src/main/java/com/comet/opik/infrastructure/events/BaseEvent.java
```java
import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.TRACER_NAME;

@Slf4j
class EventInterceptor implements MethodInterceptor {
```
Not sure why we need this telemetry part. If the service and DAO methods are well annotated with instrumentation, it might not be necessary to add telemetry to the events themselves. Also, maybe the Guava EventBus provides telemetry already, or it could simply be wrapped with it.
Anyway, I'm fine with this; I was just wondering if it was worth the effort.
I looked at the agent documentation: https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/docs/supported-libraries.md#libraries--frameworks
It doesn't mention the Guava EventBus. I added this so we could have a proper span for the event's execution, since it runs asynchronously.
This looks very nice!
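As an aside, a hedged sketch of what such an event-bus interceptor can look like (this is not the PR's actual EventInterceptor; the class name, tracer name, and span naming are assumptions): a method interceptor that opens a span around each asynchronous event-handler invocation.

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;

// Sketch only: wraps each @Subscribe handler call in its own span so the asynchronous
// EventBus dispatch shows up in traces and can be linked back to the publishing request.
class EventHandlerSpanInterceptor implements MethodInterceptor {

    private final Tracer tracer = GlobalOpenTelemetry.getTracer("event-bus"); // tracer name assumed

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        Span span = tracer.spanBuilder("event_handler " + invocation.getMethod().getName())
                .startSpan();
        try (Scope ignored = span.makeCurrent()) {
            return invocation.proceed();
        } catch (Throwable throwable) {
            span.recordException(throwable);
            throw throwable;
        } finally {
            span.end();
        }
    }
}
```

In a Guice setup it could be bound with bindInterceptor(Matchers.any(), Matchers.annotatedWith(Subscribe.class), new EventHandlerSpanInterceptor()) inside a module.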
This is ready to go from my side.
```java
@SqlQuery("SELECT * FROM datasets WHERE workspace_id = :workspace_id AND name = :name")
Optional<Dataset> findByName(@Bind("workspace_id") String workspaceId, @Bind("name") String name);

default int[] recordExperiments(Handle handle, String workspaceId,
```
Not a blocker: you're mixing the fluent and declarative interfaces of JDBI. I believe you can implement batches with the following annotation: https://jdbi.org/#_sqlbatch
I tried, but it seems this only works for inserts. I can try again later, but binding the collection didn't work: https://jdbi.org/#_prepared_batches
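For reference, a hedged sketch of the fluent prepared-batch route (https://jdbi.org/#_prepared_batches) that a default method holding a Handle can take; the helper class, table, and column names here are assumptions, not the PR's actual SQL:

```java
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.statement.PreparedBatch;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;

class DatasetBatchUpdate {

    // Hypothetical helper: update last_experiment_created_at for many datasets in one batch.
    static int[] recordExperiments(Handle handle, String workspaceId,
            Map<UUID, Instant> lastExperimentCreatedAtByDatasetId) {
        PreparedBatch batch = handle.prepareBatch(
                "UPDATE datasets SET last_experiment_created_at = :created_at "
                        + "WHERE workspace_id = :workspace_id AND id = :id");
        lastExperimentCreatedAtByDatasetId.forEach((datasetId, createdAt) -> batch
                .bind("workspace_id", workspaceId)
                .bind("id", datasetId)
                .bind("created_at", Timestamp.from(createdAt))
                .add());
        // One round trip; returns the per-statement update counts.
        return batch.execute();
    }
}
```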
```java
@Getter
@Accessors(fluent = true)
public abstract class BaseEvent {
```
It wouldn't be a bad idea. But this is ok too.
```java
.doOnSuccess(newExperiment -> eventBus.post(new ExperimentCreated(
        newExperiment.id(),
        newExperiment.datasetId(),
        newExperiment.createdAt(),
        workspaceId,
        userName)));
```
What about @SuperBuilder?
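For illustration, a hedged sketch of the @SuperBuilder suggestion (the PR keeps the explicit constructor; the annotations and the call site below are assumptions): Lombok generates builders across the inheritance hierarchy, which sidesteps the super-constructor limitation discussed earlier.

```java
import lombok.Getter;
import lombok.experimental.Accessors;
import lombok.experimental.SuperBuilder;

import java.time.Instant;
import java.util.UUID;

// Hypothetical @SuperBuilder variant of the event hierarchy.
@Getter
@Accessors(fluent = true)
@SuperBuilder
abstract class BaseEvent {
    protected final String workspaceId;
    protected final String userName;
}

@Getter
@Accessors(fluent = true)
@SuperBuilder
class ExperimentCreated extends BaseEvent {
    private final UUID experimentId;
    private final UUID datasetId;
    private final Instant createdAt;
}

// Publishing would then read as:
// eventBus.post(ExperimentCreated.builder()
//         .experimentId(newExperiment.id())
//         .datasetId(newExperiment.datasetId())
//         .createdAt(newExperiment.createdAt())
//         .workspaceId(workspaceId)
//         .userName(userName)
//         .build());
```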
Details
Use application events to fill the `last_experiment_created_at` field when experiments are created or deleted.

Issues
OPIK-294

Testing
- Updates the `last_experiment_created_at` field when an experiment is created
- Updates the `last_experiment_created_at` field when an experiment is deleted
- Sets the `last_experiment_created_at` field to null when an experiment is deleted and no other experiments exist