Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing vulnerabilities for source-salesforce - Premium support #27899

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/publish_connectors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ on:
workflow_dispatch:
inputs:
connectors-options:
description: "Options to pass to the 'airbyte-ci connectors' command group"
description: "Options to pass to the 'airbyte-ci connectors' command group."
default: "--name=source-pokeapi"
publish-options:
description: "Options to pass to the 'airbyte-ci connectors publish' command"
description: "Options to pass to the 'airbyte-ci connectors publish' command. Use --pre-release or --main-release depending on whether you want to publish a dev image or not. "
default: "--pre-release"
required: true
jobs:
publish_connectors:
name: Publish connectors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static DestinationConfig getInstance() {
public JsonNode getNodeValue(final String key) {
final JsonNode node = config.root.get(key);
if (node == null) {
LOGGER.warn("Cannot find node with key {} ", key);
LOGGER.debug("Cannot find node with key {} ", key);
}
return node;
}
Expand All @@ -56,7 +56,7 @@ public JsonNode getNodeValue(final String key) {
public String getTextValue(final String key) {
final JsonNode node = getNodeValue(key);
if (node == null || !node.isTextual()) {
LOGGER.warn("Cannot retrieve text value for node with key {}", key);
LOGGER.debug("Cannot retrieve text value for node with key {}", key);
return "";
}
return node.asText();
Expand All @@ -66,7 +66,7 @@ public String getTextValue(final String key) {
public Boolean getBooleanValue(final String key) {
final JsonNode node = getNodeValue(key);
if (node == null || !node.isBoolean()) {
LOGGER.warn("Cannot retrieve boolean value for node with key {}", key);
LOGGER.debug("Cannot retrieve boolean value for node with key {}", key);
return false;
}
return node.asBoolean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ data:
name: BigQuery (denormalized typed struct)
registries:
cloud:
dockerImageTag: 1.4.1
enabled: true
oss:
dockerImageTag: 1.4.1
enabled: true
releaseStage: beta
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,3 @@ jsonSchema2Pojo {
includeConstructors = false
includeSetters = true
}




Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,23 @@
*/
public class PostgresQueryUtils {

public record TableBlockSize(Long tableSize, Long blockSize) { }

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresQueryUtils.class);

public static final String NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY =
"""
SELECT
(EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
AND
(EXISTS (SELECT from \"%s\".\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
(EXISTS (SELECT from "%s"."%s" where "%s" IS NULL LIMIT 1)) AS %s
""";
public static final String NULL_CURSOR_VALUE_NO_SCHEMA_QUERY =
"""
SELECT
(EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
AND
(EXISTS (SELECT from \"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
(EXISTS (SELECT from "%s" where "%s" IS NULL LIMIT 1)) AS %s
""";

public static final String TABLE_ESTIMATE_QUERY =
Expand Down Expand Up @@ -79,6 +81,16 @@ SELECT pg_relation_filenode('%s')

public static final String TOTAL_BYTES_RESULT_COL = "totalbytes";

/**
* Query returns the size the table data takes on the DB server disk (not including any index or other metadata),
* and the size of each page used in the (page, tuple) ctid.
* This helps us evaluate how many pages we need to read to traverse the entire table.
*/
public static final String CTID_TABLE_BLOCK_SIZE =
"""
WITH block_sz AS (SELECT current_setting('block_size')::int), rel_sz AS (select pg_relation_size('%s')) SELECT * from block_sz, rel_sz
""";

/**
* Logs the current xmin status : 1. The number of wraparounds the source DB has undergone. (These
* are the epoch bits in the xmin snapshot). 2. The 32-bit xmin value associated with the xmin
Expand Down Expand Up @@ -126,7 +138,7 @@ public static long fileNodeForStreams(final JdbcDatabase database, final Airbyte
final long relationFilenode = jsonNodes.get(0).get("pg_relation_filenode").asLong();
LOGGER.info("Relation filenode is for stream {} is {}", fullTableName, relationFilenode);
return relationFilenode;
} catch (SQLException e) {
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}
Expand All @@ -150,12 +162,45 @@ public static List<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair>
jsonNodes.get(0).get("phase"));
streamsUnderVacuuming.add(io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(stream));
}
} catch (SQLException e) {
} catch (final SQLException e) {
// Assume it's safe to progress and skip relation-node and vacuum validation
LOGGER.warn("Failed to fetch vacuum for table {} info. Going to move ahead with the sync assuming it's safe", fullTableName, e);
}
});
return streamsUnderVacuuming;
}

/**
 * Computes, for every configured stream, the on-disk relation size and server page size
 * of its backing table (see the single-stream overload for the actual lookup).
 *
 * @param database the source database to query
 * @param streams the configured streams whose tables are measured
 * @param quoteString identifier-quoting character used when qualifying table names
 * @return a map from each stream's name/namespace pair to its table/block sizes
 */
public static Map<AirbyteStreamNameNamespacePair, TableBlockSize> getTableBlockSizeForStream(final JdbcDatabase database,
final List<ConfiguredAirbyteStream> streams,
final String quoteString) {
final Map<AirbyteStreamNameNamespacePair, TableBlockSize> sizesByStream = new HashMap<>();
for (final ConfiguredAirbyteStream configuredStream : streams) {
final AirbyteStreamNameNamespacePair pair =
new AirbyteStreamNameNamespacePair(configuredStream.getStream().getName(), configuredStream.getStream().getNamespace());
sizesByStream.put(pair, getTableBlockSizeForStream(database, pair, quoteString));
}
return sizesByStream;
}
/**
 * Fetches, for one stream, the table's on-disk relation size and the server block (page) size
 * by running {@code CTID_TABLE_BLOCK_SIZE} against the fully qualified table name. This helps
 * estimate how many pages a ctid-based scan must traverse.
 *
 * @param database the source database to query
 * @param stream the stream whose backing table is measured
 * @param quoteString identifier-quoting character used to build the qualified table name
 * @return the relation size and block size for the stream's table
 */
public static TableBlockSize getTableBlockSizeForStream(final JdbcDatabase database,
final AirbyteStreamNameNamespacePair stream,
final String quoteString) {
try {
final String streamName = stream.getName();
final String schemaName = stream.getNamespace();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(schemaName, streamName, quoteString);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
conn -> conn.prepareStatement(CTID_TABLE_BLOCK_SIZE.formatted(fullTableName)).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
// The query joins two single-row CTEs, so exactly one row is expected.
Preconditions.checkState(jsonNodes.size() == 1);
// Column names mirror the functions selected in CTID_TABLE_BLOCK_SIZE.
final long relationSize = jsonNodes.get(0).get("pg_relation_size").asLong();
final long blockSize = jsonNodes.get(0).get("current_setting").asLong();
LOGGER.info("Stream {} relation size is {}. block size {}", fullTableName, relationSize, blockSize);
return new TableBlockSize(relationSize, blockSize);
} catch (final SQLException e) {
// Wrap as unchecked: callers treat a failed size lookup as fatal for the sync.
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.integrations.source.postgres.PostgresQueryUtils.TableBlockSize;
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations;
import io.airbyte.integrations.source.postgres.ctid.CtidStateManager;
import io.airbyte.integrations.source.postgres.ctid.PostgresCtidHandler;
Expand Down Expand Up @@ -480,8 +481,14 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
finalListOfStreamsToBeSyncedViaCtid,
getQuoteString());
final CtidStateManager ctidStateManager = new CtidStateManager(streamsCategorised.ctidStreams().statesFromCtidSync(), fileNodes);
final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, TableBlockSize> tableBlockSizes =
PostgresQueryUtils.getTableBlockSizeForStream(
database,
finalListOfStreamsToBeSyncedViaCtid,
getQuoteString());

final PostgresCtidHandler ctidHandler = new PostgresCtidHandler(sourceConfig, database, new CtidPostgresSourceOperations(), getQuoteString(),
fileNodes, ctidStateManager,
fileNodes, tableBlockSizes, ctidStateManager,
namespacePair -> Jsons.jsonNode(xminStatus),
(namespacePair, jsonState) -> XminStateManager.getAirbyteStateMessage(namespacePair, Jsons.object(jsonState, XminStatus.class)));
ctidIterator.addAll(ctidHandler.getIncrementalIterators(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.airbyte.integrations.source.postgres.ctid;

import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Represents a postgres ctid record in the form "(page,tuple)".
 * Used to simplify code dealing with ctid calculations.
 */
public class Ctid {

  // Compiled once: matches each unsigned decimal number inside the "(page,tuple)" literal.
  private static final Pattern NUMBER = Pattern.compile("\\d+");

  final Long page;
  final Long tuple;

  /** Creates a ctid from its page and tuple components. */
  public static Ctid of(final long page, final long tuple) {
    return new Ctid(page, tuple);
  }

  /** Parses a ctid from its textual "(page,tuple)" form. */
  public static Ctid of(final String ctid) {
    return new Ctid(ctid);
  }

  Ctid(final long page, final long tuple) {
    this.page = page;
    this.tuple = tuple;
  }

  /**
   * Parses the "(page,tuple)" textual form by extracting the first two numbers.
   *
   * @throws IllegalArgumentException if fewer than two numbers are present
   */
  Ctid(final String ctid) {
    final Matcher m = NUMBER.matcher(ctid);
    if (!m.find()) {
      // Include the offending input to make failures diagnosable.
      throw new IllegalArgumentException("Invalid ctid format: " + ctid);
    }
    this.page = Long.parseLong(m.group());

    if (!m.find()) {
      throw new IllegalArgumentException("Invalid ctid format: " + ctid);
    }
    this.tuple = Long.parseLong(m.group());
    // Note: parseLong never returns null, so no additional null checks are needed here.
  }

  @Override
  public String toString() {
    return "(%d,%d)".formatted(page, tuple);
  }

  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final Ctid ctid = (Ctid) o;
    return Objects.equals(page, ctid.page) && Objects.equals(tuple, ctid.tuple);
  }

  @Override
  public int hashCode() {
    return Objects.hash(page, tuple);
  }

}
Loading