Skip to content

Commit

Permalink
add more
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Aug 5, 2022
1 parent d3355c0 commit 141cc96
Show file tree
Hide file tree
Showing 26 changed files with 304 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

@SuppressWarnings("PMD.JUnitTestsShouldIncludeAssert")
class JsonSchemasTest {

private static final String UNCHECKED = "unchecked";
private static final String NAME = "name";
private static final String PROPERTIES = "properties";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.junit.jupiter.api.Test;

class JsonsTest {

private static final String SERIALIZED_JSON = "{\"str\":\"abc\",\"num\":999,\"numLong\":888}";
private static final String SERIALIZED_JSON2 = "{\"str\":\"abc\"}";
private static final String ABC = "abc";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.junit.jupiter.api.Test;

class MoreResourcesTest {

private static final String CONTENT_1 = "content1\n";
private static final String CONTENT_2 = "content2\n";
private static final String RESOURCE_TEST = "resource_test";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.junit.jupiter.api.Test;

class AirbyteVersionTest {

private static final String VERSION_678 = "6.7.8";
private static final String VERSION_678_OMEGA = "6.7.8-omega";
private static final String VERSION_678_ALPHA = "6.7.8-alpha";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.junit.jupiter.api.Test;

class YamlsTest {

private static final String LINE_BREAK = "---\n";
private static final String STR_ABC = "str: \"abc\"\n";
private static final String ABC = "abc";
Expand Down Expand Up @@ -104,14 +105,14 @@ void testDeserializeToJsonNode() {
assertEquals(
"{\"str\":\"abc\"}",
Yamls.deserialize(
LINE_BREAK
LINE_BREAK
+ STR_ABC)
.toString());

assertEquals(
"[{\"str\":\"abc\"},{\"str\":\"abc\"}]",
Yamls.deserialize(
LINE_BREAK
LINE_BREAK
+ "- str: \"abc\"\n"
+ "- str: \"abc\"\n")
.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
public class V0_35_26_001__PersistDiscoveredCatalog extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_35_26_001__PersistDiscoveredCatalog.class);
private static final String ACTOR_CATALOG = "actor_catalog";

@Override
public void migrate(final Context context) throws Exception {
Expand All @@ -48,15 +49,15 @@ private static void createActorCatalog(final DSLContext ctx) {
final Field<JSONB> catalog = DSL.field("catalog", SQLDataType.JSONB.nullable(false));
final Field<String> catalogHash = DSL.field("catalog_hash", SQLDataType.VARCHAR(32).nullable(false));
final Field<OffsetDateTime> createdAt = DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false));
ctx.createTableIfNotExists("actor_catalog")
ctx.createTableIfNotExists(ACTOR_CATALOG)
.columns(id,
catalog,
catalogHash,
createdAt)
.constraints(primaryKey(id))
.execute();
LOGGER.info("actor_catalog table created");
ctx.createIndexIfNotExists("actor_catalog_catalog_hash_id_idx").on("actor_catalog", "catalog_hash").execute();
ctx.createIndexIfNotExists("actor_catalog_catalog_hash_id_idx").on(ACTOR_CATALOG, "catalog_hash").execute();
}

private static void createCatalogFetchEvent(final DSLContext ctx) {
Expand All @@ -73,7 +74,7 @@ private static void createCatalogFetchEvent(final DSLContext ctx) {
configHash,
actorVersion)
.constraints(primaryKey(id),
foreignKey(actorCatalogId).references("actor_catalog", "id").onDeleteCascade(),
foreignKey(actorCatalogId).references(ACTOR_CATALOG, "id").onDeleteCascade(),
foreignKey(actorId).references("actor", "id").onDeleteCascade())
.execute();
LOGGER.info("actor_catalog_fetch_event table created");
Expand All @@ -89,7 +90,7 @@ private static void addConnectionTableForeignKey(final DSLContext ctx) {
.dropConstraintIfExists("connection_actor_catalog_id_fk");
ctx.alterTable("connection")
.add(constraint("connection_actor_catalog_id_fk").foreignKey(sourceCatalogId)
.references("actor_catalog", "id").onDeleteCascade())
.references(ACTOR_CATALOG, "id").onDeleteCascade())
.execute();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
public class V0_35_54_001__ChangeDefaultConnectionName extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_35_54_001__ChangeDefaultConnectionName.class);
private static final String NAME = "name";

public static void defaultConnectionName(final DSLContext ctx) {
LOGGER.info("Updating connection name column");
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
final Field<String> name = DSL.field("name", SQLDataType.VARCHAR(256).nullable(false));
List<Connection> connections = getConnections(ctx);
final Field<String> name = DSL.field(NAME, SQLDataType.VARCHAR(256).nullable(false));
final List<Connection> connections = getConnections(ctx);

for (final Connection connection : connections) {
final Actor sourceActor = getActor(connection.getSourceId(), ctx);
Expand All @@ -45,12 +46,12 @@ public static void defaultConnectionName(final DSLContext ctx) {

static <T> List<Connection> getConnections(final DSLContext ctx) {
LOGGER.info("Get connections having name default");
final Field<String> name = DSL.field("name", SQLDataType.VARCHAR(36).nullable(false));
final Field<String> name = DSL.field(NAME, SQLDataType.VARCHAR(36).nullable(false));
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
final Field<UUID> sourceId = DSL.field("source_id", SQLDataType.UUID.nullable(false));
final Field<UUID> destinationId = DSL.field("destination_id", SQLDataType.UUID.nullable(false));

final Field<String> connectionName = DSL.field("name", SQLDataType.VARCHAR(256).nullable(false));
final Field<String> connectionName = DSL.field(NAME, SQLDataType.VARCHAR(256).nullable(false));
final Result<Record> results = ctx.select(asterisk()).from(table("connection")).where(connectionName.eq("default")).fetch();

return results.stream().map(record -> new Connection(
Expand All @@ -62,7 +63,7 @@ static <T> List<Connection> getConnections(final DSLContext ctx) {
}

static <T> Actor getActor(final UUID actorDefinitionId, final DSLContext ctx) {
final Field<String> name = DSL.field("name", SQLDataType.VARCHAR(36).nullable(false));
final Field<String> name = DSL.field(NAME, SQLDataType.VARCHAR(36).nullable(false));
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));

final Result<Record> results = ctx.select(asterisk()).from(table("actor")).where(id.eq(actorDefinitionId)).fetch();
Expand All @@ -83,7 +84,7 @@ public static class Actor {

private final String name;

public <T> Actor(String name) {
public <T> Actor(final String name) {
this.name = name;
}

Expand All @@ -100,7 +101,7 @@ public static class Connection {
private final UUID sourceId;
private final UUID destinationId;

public <T> Connection(String name, UUID id, UUID sourceId, UUID destinationId) {
public <T> Connection(final String name, final UUID id, final UUID sourceId, final UUID destinationId) {
this.name = name;
this.connectionId = id;
this.sourceId = sourceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public class V0_35_40_001__MigrateFailureReasonEnumValues extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_35_40_001__MigrateFailureReasonEnumValues.class);
private static final String NULL = "<null>";

@VisibleForTesting
static String OLD_MANUAL_CANCELLATION = "manualCancellation";
Expand Down Expand Up @@ -179,11 +180,11 @@ public String getInternalMessage() {
return internalMessage;
}

public void setInternalMessage(String internalMessage) {
public void setInternalMessage(final String internalMessage) {
this.internalMessage = internalMessage;
}

public FailureReasonForMigration withInternalMessage(String internalMessage) {
public FailureReasonForMigration withInternalMessage(final String internalMessage) {
this.internalMessage = internalMessage;
return this;
}
Expand All @@ -192,11 +193,11 @@ public String getExternalMessage() {
return externalMessage;
}

public void setExternalMessage(String externalMessage) {
public void setExternalMessage(final String externalMessage) {
this.externalMessage = externalMessage;
}

public FailureReasonForMigration withExternalMessage(String externalMessage) {
public FailureReasonForMigration withExternalMessage(final String externalMessage) {
this.externalMessage = externalMessage;
return this;
}
Expand All @@ -205,11 +206,11 @@ public Metadata getMetadata() {
return metadata;
}

public void setMetadata(Metadata metadata) {
public void setMetadata(final Metadata metadata) {
this.metadata = metadata;
}

public FailureReasonForMigration withMetadata(Metadata metadata) {
public FailureReasonForMigration withMetadata(final Metadata metadata) {
this.metadata = metadata;
return this;
}
Expand All @@ -218,11 +219,11 @@ public String getStacktrace() {
return stacktrace;
}

public void setStacktrace(String stacktrace) {
public void setStacktrace(final String stacktrace) {
this.stacktrace = stacktrace;
}

public FailureReasonForMigration withStacktrace(String stacktrace) {
public FailureReasonForMigration withStacktrace(final String stacktrace) {
this.stacktrace = stacktrace;
return this;
}
Expand All @@ -231,11 +232,11 @@ public Boolean getRetryable() {
return retryable;
}

public void setRetryable(Boolean retryable) {
public void setRetryable(final Boolean retryable) {
this.retryable = retryable;
}

public FailureReasonForMigration withRetryable(Boolean retryable) {
public FailureReasonForMigration withRetryable(final Boolean retryable) {
this.retryable = retryable;
return this;
}
Expand All @@ -244,11 +245,11 @@ public Long getTimestamp() {
return timestamp;
}

public void setTimestamp(Long timestamp) {
public void setTimestamp(final Long timestamp) {
this.timestamp = timestamp;
}

public FailureReasonForMigration withTimestamp(Long timestamp) {
public FailureReasonForMigration withTimestamp(final Long timestamp) {
this.timestamp = timestamp;
return this;
}
Expand All @@ -259,35 +260,35 @@ public String toString() {
sb.append(FailureReasonForMigration.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('[');
sb.append("failureOrigin");
sb.append('=');
sb.append(((this.failureOrigin == null) ? "<null>" : this.failureOrigin));
sb.append(((this.failureOrigin == null) ? NULL : this.failureOrigin));
sb.append(',');
sb.append("failureType");
sb.append('=');
sb.append(((this.failureType == null) ? "<null>" : this.failureType));
sb.append(((this.failureType == null) ? NULL : this.failureType));
sb.append(',');
sb.append("internalMessage");
sb.append('=');
sb.append(((this.internalMessage == null) ? "<null>" : this.internalMessage));
sb.append(((this.internalMessage == null) ? NULL : this.internalMessage));
sb.append(',');
sb.append("externalMessage");
sb.append('=');
sb.append(((this.externalMessage == null) ? "<null>" : this.externalMessage));
sb.append(((this.externalMessage == null) ? NULL : this.externalMessage));
sb.append(',');
sb.append("metadata");
sb.append('=');
sb.append(((this.metadata == null) ? "<null>" : this.metadata));
sb.append(((this.metadata == null) ? NULL : this.metadata));
sb.append(',');
sb.append("stacktrace");
sb.append('=');
sb.append(((this.stacktrace == null) ? "<null>" : this.stacktrace));
sb.append(((this.stacktrace == null) ? NULL : this.stacktrace));
sb.append(',');
sb.append("retryable");
sb.append('=');
sb.append(((this.retryable == null) ? "<null>" : this.retryable));
sb.append(((this.retryable == null) ? NULL : this.retryable));
sb.append(',');
sb.append("timestamp");
sb.append('=');
sb.append(((this.timestamp == null) ? "<null>" : this.timestamp));
sb.append(((this.timestamp == null) ? NULL : this.timestamp));
sb.append(',');
if (sb.charAt((sb.length() - 1)) == ',') {
sb.setCharAt((sb.length() - 1), ']');
Expand Down Expand Up @@ -355,11 +356,11 @@ public Boolean getPartialSuccess() {
return partialSuccess;
}

public void setPartialSuccess(Boolean partialSuccess) {
public void setPartialSuccess(final Boolean partialSuccess) {
this.partialSuccess = partialSuccess;
}

public AttemptFailureSummaryForMigration withPartialSuccess(Boolean partialSuccess) {
public AttemptFailureSummaryForMigration withPartialSuccess(final Boolean partialSuccess) {
this.partialSuccess = partialSuccess;
return this;
}
Expand All @@ -370,11 +371,11 @@ public String toString() {
sb.append(AttemptFailureSummaryForMigration.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('[');
sb.append("failures");
sb.append('=');
sb.append(((this.failures == null) ? "<null>" : this.failures));
sb.append(((this.failures == null) ? NULL : this.failures));
sb.append(',');
sb.append("partialSuccess");
sb.append('=');
sb.append(((this.partialSuccess == null) ? "<null>" : this.partialSuccess));
sb.append(((this.partialSuccess == null) ? NULL : this.partialSuccess));
sb.append(',');
if (sb.charAt((sb.length() - 1)) == ',') {
sb.setCharAt((sb.length() - 1), ']');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class MongoUtils {
private static final String NULL_TYPE = "null";
private static final String AIRBYTE_SUFFIX = "_aibyte_transform";
private static final int DISCOVER_LIMIT = 10000;
private static final String ID = "_id";

public static JsonSchemaType getType(final BsonType dataType) {
return switch (dataType) {
Expand Down Expand Up @@ -248,7 +249,7 @@ private static List<String> getFieldsName(final MongoCollection<Document> collec
new Document("$limit", DISCOVER_LIMIT),
new Document("$project", new Document("arrayofkeyvalue", new Document("$objectToArray", "$" + fieldName))),
new Document("$unwind", "$arrayofkeyvalue"),
new Document("$group", new Document("_id", null).append("allkeys", new Document("$addToSet", "$arrayofkeyvalue.k")))));
new Document("$group", new Document(ID, null).append("allkeys", new Document("$addToSet", "$arrayofkeyvalue.k")))));
if (output.cursor().hasNext()) {
return (List) output.cursor().next().get("allkeys");
} else {
Expand All @@ -260,13 +261,13 @@ private static List<String> getTypes(final MongoCollection<Document> collection,
final var fieldName = "$" + name;
final AggregateIterable<Document> output = collection.aggregate(Arrays.asList(
new Document("$limit", DISCOVER_LIMIT),
new Document("$project", new Document("_id", 0).append("fieldType", new Document("$type", fieldName))),
new Document("$group", new Document("_id", new Document("fieldType", "$fieldType"))
new Document("$project", new Document(ID, 0).append("fieldType", new Document("$type", fieldName))),
new Document("$group", new Document(ID, new Document("fieldType", "$fieldType"))
.append("count", new Document("$sum", 1)))));
final var listOfTypes = new ArrayList<String>();
final var cursor = output.cursor();
while (cursor.hasNext()) {
final var type = ((Document) cursor.next().get("_id")).get("fieldType").toString();
final var type = ((Document) cursor.next().get(ID)).get("fieldType").toString();
if (!MISSING_TYPE.equals(type) && !NULL_TYPE.equals(type)) {
listOfTypes.add(type);
}
Expand Down
Loading

0 comments on commit 141cc96

Please sign in to comment.