
Commit

Merge branch 'master' into PRIS-259-airbyte
mauricioalarcon authored Jun 30, 2023
2 parents c02b17d + 688d199 commit 578953b
Showing 33 changed files with 552 additions and 372 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/publish_connectors.yml
@@ -9,11 +9,12 @@ on:
workflow_dispatch:
inputs:
connectors-options:
description: "Options to pass to the 'airbyte-ci connectors' command group"
description: "Options to pass to the 'airbyte-ci connectors' command group."
default: "--name=source-pokeapi"
publish-options:
description: "Options to pass to the 'airbyte-ci connectors publish' command"
description: "Options to pass to the 'airbyte-ci connectors publish' command. Use --pre-release or --main-release depending on whether you want to publish a dev image or not. "
default: "--pre-release"
required: true
jobs:
publish_connectors:
name: Publish connectors
@@ -47,7 +47,7 @@ public static DestinationConfig getInstance() {
public JsonNode getNodeValue(final String key) {
final JsonNode node = config.root.get(key);
if (node == null) {
LOGGER.warn("Cannot find node with key {} ", key);
LOGGER.debug("Cannot find node with key {} ", key);
}
return node;
}
@@ -56,7 +56,7 @@ public JsonNode getNodeValue(final String key) {
public String getTextValue(final String key) {
final JsonNode node = getNodeValue(key);
if (node == null || !node.isTextual()) {
LOGGER.warn("Cannot retrieve text value for node with key {}", key);
LOGGER.debug("Cannot retrieve text value for node with key {}", key);
return "";
}
return node.asText();
@@ -66,7 +66,7 @@ public String getTextValue(final String key) {
public Boolean getBooleanValue(final String key) {
final JsonNode node = getNodeValue(key);
if (node == null || !node.isBoolean()) {
LOGGER.warn("Cannot retrieve boolean value for node with key {}", key);
LOGGER.debug("Cannot retrieve boolean value for node with key {}", key);
return false;
}
return node.asBoolean();
@@ -10,8 +10,10 @@ data:
name: BigQuery (denormalized typed struct)
registries:
cloud:
dockerImageTag: 1.4.1
enabled: true
oss:
dockerImageTag: 1.4.1
enabled: true
releaseStage: beta
resourceRequirements:
4 changes: 0 additions & 4 deletions airbyte-integrations/connectors/source-postgres/build.gradle
@@ -56,7 +56,3 @@ jsonSchema2Pojo {
includeConstructors = false
includeSetters = true
}




@@ -28,21 +28,23 @@
*/
public class PostgresQueryUtils {

public record TableBlockSize(Long tableSize, Long blockSize) { }

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresQueryUtils.class);

public static final String NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY =
"""
SELECT
(EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
AND
(EXISTS (SELECT from \"%s\".\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
(EXISTS (SELECT from "%s"."%s" where "%s" IS NULL LIMIT 1)) AS %s
""";
public static final String NULL_CURSOR_VALUE_NO_SCHEMA_QUERY =
"""
SELECT
(EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
AND
(EXISTS (SELECT from \"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
(EXISTS (SELECT from "%s" where "%s" IS NULL LIMIT 1)) AS %s
""";

public static final String TABLE_ESTIMATE_QUERY =
@@ -79,6 +81,16 @@ SELECT pg_relation_filenode('%s')

public static final String TOTAL_BYTES_RESULT_COL = "totalbytes";

/**
* Query that returns the size the table's data takes on the DB server disk (not including any index or other metadata)
* and the size of each page used in the (page, tuple) ctid.
* This helps us evaluate how many pages we need to read to traverse the entire table.
*/
public static final String CTID_TABLE_BLOCK_SIZE =
"""
WITH block_sz AS (SELECT current_setting('block_size')::int), rel_sz AS (select pg_relation_size('%s')) SELECT * from block_sz, rel_sz
""";

/**
* Logs the current xmin status : 1. The number of wraparounds the source DB has undergone. (These
* are the epoch bits in the xmin snapshot). 2. The 32-bit xmin value associated with the xmin
@@ -126,7 +138,7 @@ public static long fileNodeForStreams(final JdbcDatabase database, final Airbyte
final long relationFilenode = jsonNodes.get(0).get("pg_relation_filenode").asLong();
LOGGER.info("Relation filenode is for stream {} is {}", fullTableName, relationFilenode);
return relationFilenode;
} catch (SQLException e) {
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}
@@ -150,12 +162,45 @@ public static List<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair>
jsonNodes.get(0).get("phase"));
streamsUnderVacuuming.add(io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(stream));
}
} catch (SQLException e) {
} catch (final SQLException e) {
// Assume it's safe to progress and skip relation node and vacuum validation
LOGGER.warn("Failed to fetch vacuum info for table {}. Going to move ahead with the sync assuming it's safe", fullTableName, e);
}
});
return streamsUnderVacuuming;
}

public static Map<AirbyteStreamNameNamespacePair, TableBlockSize> getTableBlockSizeForStream(final JdbcDatabase database,
final List<ConfiguredAirbyteStream> streams,
final String quoteString) {
final Map<AirbyteStreamNameNamespacePair, TableBlockSize> tableBlockSizes = new HashMap<>();
streams.forEach(stream -> {
final AirbyteStreamNameNamespacePair namespacePair =
new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace());
final TableBlockSize sz = getTableBlockSizeForStream(database, namespacePair, quoteString);
tableBlockSizes.put(namespacePair, sz);
});
return tableBlockSizes;
}

public static TableBlockSize getTableBlockSizeForStream(final JdbcDatabase database,
final AirbyteStreamNameNamespacePair stream,
final String quoteString) {
try {
final String streamName = stream.getName();
final String schemaName = stream.getNamespace();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(schemaName, streamName, quoteString);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
conn -> conn.prepareStatement(CTID_TABLE_BLOCK_SIZE.formatted(fullTableName)).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
final long relationSize = jsonNodes.get(0).get("pg_relation_size").asLong();
final long blockSize = jsonNodes.get(0).get("current_setting").asLong();
LOGGER.info("Stream {} relation size is {}. block size {}", fullTableName, relationSize, blockSize);
return new TableBlockSize(relationSize, blockSize);
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}
}
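A note on how the two numbers returned by CTID_TABLE_BLOCK_SIZE above are meant to be used: dividing the relation size by the block size (rounding up) gives the number of heap pages, which bounds the page component of a ctid and so gives a rough measure of how much of the table a ctid-based sync still has to scan. The sketch below is illustrative only; CtidPageEstimate is a hypothetical helper, not part of this commit.

public final class CtidPageEstimate {

  private CtidPageEstimate() {}

  // relationSize: pg_relation_size('<table>') in bytes (table data only, no indexes).
  // blockSize: current_setting('block_size') in bytes, typically 8192.
  public static long estimatePages(final long relationSize, final long blockSize) {
    if (blockSize <= 0) {
      throw new IllegalArgumentException("block size must be positive");
    }
    // Round up: a partially filled last page still has to be read.
    return (relationSize + blockSize - 1) / blockSize;
  }

  public static void main(final String[] args) {
    // A 1 GiB table with the default 8 KiB block size spans 131072 heap pages.
    System.out.println(estimatePages(1L << 30, 8192L));
  }
}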
@@ -58,6 +58,7 @@
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.integrations.source.postgres.PostgresQueryUtils.TableBlockSize;
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations;
import io.airbyte.integrations.source.postgres.ctid.CtidStateManager;
import io.airbyte.integrations.source.postgres.ctid.PostgresCtidHandler;
@@ -480,8 +481,14 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
finalListOfStreamsToBeSyncedViaCtid,
getQuoteString());
final CtidStateManager ctidStateManager = new CtidStateManager(streamsCategorised.ctidStreams().statesFromCtidSync(), fileNodes);
final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, TableBlockSize> tableBlockSizes =
PostgresQueryUtils.getTableBlockSizeForStream(
database,
finalListOfStreamsToBeSyncedViaCtid,
getQuoteString());

final PostgresCtidHandler ctidHandler = new PostgresCtidHandler(sourceConfig, database, new CtidPostgresSourceOperations(), getQuoteString(),
fileNodes, ctidStateManager,
fileNodes, tableBlockSizes, ctidStateManager,
namespacePair -> Jsons.jsonNode(xminStatus),
(namespacePair, jsonState) -> XminStateManager.getAirbyteStateMessage(namespacePair, Jsons.object(jsonState, XminStatus.class)));
ctidIterator.addAll(ctidHandler.getIncrementalIterators(
@@ -0,0 +1,69 @@
package io.airbyte.integrations.source.postgres.ctid;

import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Class represents a postgres ctid record in the form of "(number,number)".
* Used to simplify code dealing with ctid calculations.
*/
public class Ctid {

final Long page;
final Long tuple;

public static Ctid of(final long page, final long tuple) {
return new Ctid(page, tuple);
}

public static Ctid of(final String ctid) {
return new Ctid(ctid);
}

Ctid(final long page, final long tuple) {
this.page = page;
this.tuple = tuple;
}

Ctid(final String ctid) {
final Pattern p = Pattern.compile("\\d+");
final Matcher m = p.matcher(ctid);
if (!m.find()) {
throw new IllegalArgumentException("Invalid ctid format");
}
final String ctidPageStr = m.group();
this.page = Long.parseLong(ctidPageStr);

if (!m.find()) {
throw new IllegalArgumentException("Invalid ctid format");
}
final String ctidTupleStr = m.group();
this.tuple = Long.parseLong(ctidTupleStr);

Objects.requireNonNull(this.page);
Objects.requireNonNull(this.tuple);
}

@Override
public String toString() {
return "(%d,%d)".formatted(page, tuple);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Ctid ctid = (Ctid) o;
return Objects.equals(page, ctid.page) && Objects.equals(tuple, ctid.tuple);
}

@Override
public int hashCode() {
return Objects.hash(page, tuple);
}
}
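For reference, a minimal usage sketch of the Ctid class added above (CtidExample is a hypothetical driver class, not part of this commit): it parses the string form Postgres returns for the ctid system column, builds the same value from its components, and round-trips back to the "(page,tuple)" string.

import io.airbyte.integrations.source.postgres.ctid.Ctid;

public class CtidExample {

  public static void main(final String[] args) {
    // Parse the string form returned by e.g. "SELECT ctid FROM my_table".
    final Ctid parsed = Ctid.of("(23,42)");

    // Build the same value from its page and tuple components.
    final Ctid built = Ctid.of(23, 42);

    System.out.println(parsed);               // prints (23,42)
    System.out.println(parsed.equals(built)); // prints true
  }
}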