Skip to content

Commit

Permalink
feat(jdbc): refactor queue to use jooq
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 17, 2022
1 parent 06337f5 commit 746bcbd
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 132 deletions.
95 changes: 48 additions & 47 deletions jdbc-h2/src/main/java/io/kestra/runner/h2/H2Queue.java
Original file line number Diff line number Diff line change
@@ -1,74 +1,75 @@
package io.kestra.runner.h2;

import io.kestra.jdbc.repository.AbstractRepository;
import io.kestra.jdbc.runner.JdbcQueue;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.NonNull;
import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.*;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.jooq.util.h2.H2DataType;

import java.sql.Types;
import java.util.List;
import java.util.stream.Collectors;

public class H2Queue<T> extends JdbcQueue<T> {
public H2Queue(Class<T> cls, ApplicationContext applicationContext) {
super(cls, applicationContext);
}

protected Result<Record> receiveFetch(DSLContext ctx, @NonNull Integer offset) {
return ctx
.resultQuery(
"SELECT" + "\n" +
" \"value\"," + "\n" +
" \"offset\"" + "\n" +
"FROM " + table.getName() + "\n" +
"WHERE 1 = 1" + "\n" +
(offset != 0 ? "AND \"offset\" > ?" + "\n" : "") +
"AND \"type\" = ? " + "\n" +
"ORDER BY \"offset\" ASC" + "\n" +
"LIMIT 10" + "\n" +
"FOR UPDATE",
offset != 0 ? offset : this.cls.getName(),
this.cls.getName()
@Override
protected Result<Record> receiveFetch(DSLContext ctx, Integer offset) {
SelectConditionStep<Record2<Object, Object>> select = ctx
.select(
AbstractRepository.field("value"),
AbstractRepository.field("offset")
)
.fetch();
.from(this.table)
.where(AbstractRepository.field("type").eq(this.cls.getName()));

if (offset != 0) {
select = select.and(AbstractRepository.field("offset").gt(offset));
}

return select
.orderBy(AbstractRepository.field("offset").asc())
.limit(10)
.forUpdate()
.fetchMany()
.get(0);
}

protected Result<Record> receiveFetch(DSLContext ctx, String consumerGroup) {
return ctx
.resultQuery(
"SELECT" + "\n" +
" \"value\"," + "\n" +
" \"offset\"" + "\n" +
"FROM " + table.getName() + "\n" +
"WHERE (" +
" \"consumers\" IS NULL" + "\n" +
" OR NOT(ARRAY_CONTAINS(\"consumers\", ?))" + "\n" +
")" + "\n" +
"AND \"type\" = ?" + "\n" +
"ORDER BY \"offset\" ASC" + "\n" +
"LIMIT 10" + "\n" +
"FOR UPDATE",
consumerGroup,
this.cls.getName()
.select(
AbstractRepository.field("value"),
AbstractRepository.field("offset")
)
.fetch();
.from(this.table)
.where(AbstractRepository.field("type").eq(this.cls.getName()))
.and(DSL.or(List.of(
AbstractRepository.field("consumers").isNull(),
DSL.condition("NOT(ARRAY_CONTAINS(\"consumers\", ?))", consumerGroup)
)))
.orderBy(AbstractRepository.field("offset").asc())
.limit(10)
.forUpdate()
.fetchMany()
.get(0);
}

@Override
protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, List<Integer> offsets) {
ctx
.query(
"UPDATE " + table.getName() + "\n" +
"SET \"consumers\" = COALESCE(\"consumers\", ARRAY[]) || ARRAY['" + consumerGroup + "']\n" +
"WHERE \"offset\" IN (" +
offsets
.stream()
.map(Object::toString)
.collect(Collectors.joining(",")) +
")",
consumerGroup
.update(DSL.table(table.getName()))
.set(
AbstractRepository.field("consumers"),
DSL.field(
"ARRAY_APPEND(COALESCE(\"consumers\", ARRAY[]), ?)",
SQLDataType.VARCHAR(50).getArrayType(),
(Object) new String[]{consumerGroup}
)
)
.where(AbstractRepository.field("offset").in(offsets.toArray(Integer[]::new)))
.execute();
}
}
87 changes: 41 additions & 46 deletions jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueue.java
Original file line number Diff line number Diff line change
@@ -1,75 +1,70 @@
package io.kestra.runner.mysql;

import io.kestra.jdbc.repository.AbstractRepository;
import io.kestra.jdbc.runner.JdbcQueue;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.NonNull;
import lombok.SneakyThrows;
import org.jooq.*;
import org.jooq.impl.DSL;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MysqlQueue<T> extends JdbcQueue<T> {
public MysqlQueue(Class<T> cls, ApplicationContext applicationContext) {
super(cls, applicationContext);
}

protected Result<Record> receiveFetch(DSLContext ctx, @NonNull Integer offset) {
return ctx
.resultQuery(
"SELECT" + "\n" +
" value," + "\n" +
" offset" + "\n" +
"FROM " + table.getName() + "\n" +
"WHERE 1 = 1" + "\n" +
(offset != 0 ? "AND offset > ?" + "\n" : "") +
"AND type = ? " + "\n" +
"ORDER BY offset ASC" + "\n" +
"LIMIT 10" + "\n" +
"FOR UPDATE SKIP LOCKED",
offset != 0 ? offset : this.cls.getName(),
this.cls.getName()
@Override
protected Result<Record> receiveFetch(DSLContext ctx, Integer offset) {
SelectConditionStep<Record2<Object, Object>> select = ctx
.select(
AbstractRepository.field("value"),
AbstractRepository.field("offset")
)
.fetch();
.from(this.table)
.where(AbstractRepository.field("type").eq(this.cls.getName()));

if (offset != 0) {
select = select.and(AbstractRepository.field("offset").gt(offset));
}

return select
.orderBy(AbstractRepository.field("offset").asc())
.limit(10)
.forUpdate()
.skipLocked()
.fetchMany()
.get(0);
}

protected Result<Record> receiveFetch(DSLContext ctx, String consumerGroup) {
return ctx
.resultQuery(
"SELECT" + "\n" +
" value," + "\n" +
" offset" + "\n" +
"FROM " + table.getName() + "\n" +
"WHERE (" +
" consumers IS NULL" + "\n" +
" OR NOT(FIND_IN_SET(?, consumers) > 0)" + "\n" +
")" + "\n" +
"AND type = ?" + "\n" +
"ORDER BY offset ASC" + "\n" +
"LIMIT 10" + "\n" +
"FOR UPDATE SKIP LOCKED",
consumerGroup,
this.cls.getName()
.select(
AbstractRepository.field("value"),
AbstractRepository.field("offset")
)
.fetch();
.from(this.table)
.where(AbstractRepository.field("type").eq(this.cls.getName()))
.and(DSL.or(List.of(
AbstractRepository.field("consumers").isNull(),
DSL.condition("NOT(FIND_IN_SET(?, consumers) > 0)", consumerGroup)
)))
.orderBy(AbstractRepository.field("offset").asc())
.limit(10)
.forUpdate()
.skipLocked()
.fetchMany()
.get(0);
}

@Override
protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, List<Integer> offsets) {
ctx
.query(
"UPDATE " + table.getName() + "\n" +
"SET consumers = CONCAT_WS(',', consumers, '" + consumerGroup + "')\n" +
"WHERE offset IN (" +
offsets
.stream()
.map(Object::toString)
.collect(Collectors.joining(",")) +
")",
consumerGroup
.update(DSL.table(table.getName()))
.set(
AbstractRepository.field("consumers"),
DSL.field("CONCAT_WS(',', consumers, ?)", String.class, consumerGroup)
)
.where(AbstractRepository.field("offset").in(offsets.toArray(Integer[]::new)))
.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import lombok.SneakyThrows;
import org.jooq.*;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PostgresQueue<T> extends JdbcQueue<T> {
public PostgresQueue(Class<T> cls, ApplicationContext applicationContext) {
Expand All @@ -36,56 +36,60 @@ protected Map<Field<Object>, Object> produceFields(String key, T message) {
}

protected Result<Record> receiveFetch(DSLContext ctx, @NonNull Integer offset) {
return ctx
.resultQuery(
"SELECT" + "\n" +
" \"value\"," + "\n" +
" \"offset\"" + "\n" +
"FROM " + table.getName() + "\n" +
"WHERE 1 = 1" + "\n" +
(offset != 0 ? "AND \"offset\" > ?" + "\n" : "") +
"AND type = CAST(? AS queue_type)" + "\n" +
"ORDER BY \"offset\" ASC" + "\n" +
"LIMIT 10" + "\n" +
"FOR UPDATE SKIP LOCKED",
offset != 0 ? offset : this.cls.getName(),
this.cls.getName()
SelectConditionStep<Record2<Object, Object>> select = ctx
.select(
AbstractRepository.field("value"),
AbstractRepository.field("offset")
)
.fetch();
.from(this.table)
.where(DSL.condition("type = CAST(? AS queue_type)", this.cls.getName()));

if (offset != 0) {
select = select.and(AbstractRepository.field("offset").gt(offset));
}

return select
.orderBy(AbstractRepository.field("offset").asc())
.limit(10)
.forUpdate()
.skipLocked()
.fetchMany()
.get(0);
}

protected Result<Record> receiveFetch(DSLContext ctx, String consumerGroup) {
return ctx
.resultQuery(
"SELECT" + "\n" +
" \"value\"," + "\n" +
" \"offset\"" + "\n" +
"FROM " + table.getName() + "\n" +
"WHERE (" +
" \"consumers\" IS NULL" + "\n" +
" OR NOT(CAST(? AS queue_consumers) = ANY(\"consumers\"))" + "\n" +
")" + "\n" +
"AND type = CAST(? AS queue_type)" + "\n" +
"ORDER BY \"offset\" ASC" + "\n" +
"LIMIT 10" + "\n" +
"FOR UPDATE SKIP LOCKED",
consumerGroup,
this.cls.getName()
.select(
AbstractRepository.field("value"),
AbstractRepository.field("offset")
)
.fetch();
.from(this.table)
.where(DSL.condition("type = CAST(? AS queue_type)", this.cls.getName()))
.and(DSL.or(List.of(
AbstractRepository.field("consumers").isNull(),
DSL.condition("NOT(CAST(? AS queue_consumers) = ANY(\"consumers\"))", consumerGroup)
)))
.orderBy(AbstractRepository.field("offset").asc())
.limit(10)
.forUpdate()
.skipLocked()
.fetchMany()
.get(0);
}

@Override
protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, List<Integer> offsets) {
ctx
.query(
"UPDATE " + table.getName() + "\n" +
"SET \"consumers\" = \"consumers\" || '{" + consumerGroup + "}' \n" +
"WHERE \"offset\" IN (" + offsets.stream()
.map(Object::toString)
.collect(Collectors.joining(",")) + ")",
consumerGroup
.update(DSL.table(table.getName()))
.set(
AbstractRepository.field("consumers"),
DSL.field(
"\"consumers\" || ?",
SQLDataType.VARCHAR(50).getArrayType(),
(Object) new String[]{consumerGroup}
)
)
.where(AbstractRepository.field("offset").in(offsets.toArray(Integer[]::new)))
.execute();
}
}

0 comments on commit 746bcbd

Please sign in to comment.