Skip to content

Commit

Permalink
Fix remove tables trait to handle CDC tables
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamVyborny committed Sep 16, 2024
1 parent 2093f5c commit f8ab543
Showing 1 changed file with 49 additions and 11 deletions.
60 changes: 49 additions & 11 deletions tests/traits/RemoveAllTablesTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Keboola\DbExtractor\TraitTests;

use PDO;
use PDOException;

trait RemoveAllTablesTrait
{
Expand All @@ -16,23 +17,60 @@ protected function removeAllTables(): void
{
$this->removeAllFkConstraints();

// Delete all tables, except sys tables
$sql = 'SELECT * FROM information_schema.tables';
// Delete all tables, excluding system tables (sys, cdc)
$sql = "
SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE
FROM information_schema.tables
WHERE TABLE_SCHEMA NOT IN ('sys', 'cdc')
AND TABLE_NAME NOT LIKE 'sys%'
AND TABLE_TYPE = 'BASE TABLE'
";

/** @var \PDOStatement $stmt */
$stmt = $this->connection->query($sql);
/** @var array $tables */
$tables = $stmt->fetchAll(PDO::FETCH_ASSOC);

foreach ($tables as $table) {
$statement = 'DROP TABLE';
if ($table['TABLE_TYPE'] === 'VIEW') {
$statement = 'DROP VIEW';
$schema = $this->quoteIdentifier($table['TABLE_SCHEMA']);
$tableName = $this->quoteIdentifier($table['TABLE_NAME']);

// Check if CDC is enabled by querying the system CDC tables directly
$cdcEnabled = false;
$captureInstance = null;
try {
$cdcCheckSql = sprintf(
"SELECT capture_instance FROM cdc.change_tables WHERE source_object_id = OBJECT_ID('%s.%s')",
$table['TABLE_SCHEMA'],
$table['TABLE_NAME'],
);
$cdcCheckStmt = $this->connection->query($cdcCheckSql);
$cdcRow = $cdcCheckStmt->fetch(PDO::FETCH_ASSOC);
if (is_array($cdcRow) && isset($cdcRow['capture_instance'])) {
$cdcEnabled = true;
$captureInstance = $cdcRow['capture_instance'];
}
} catch (PDOException $e) {
// Ignore error if CDC is not enabled
if (!str_contains($e->getMessage(), 'Invalid object name')) {
throw $e;
}
}
$this->connection->query(sprintf(
'%s %s.%s',
$statement,
$this->quoteIdentifier($table['TABLE_SCHEMA']),
$this->quoteIdentifier($table['TABLE_NAME']),
));

// If CDC is enabled, disable it before dropping the table
if ($cdcEnabled && $captureInstance) {
$disableCdcSql = sprintf(
"EXEC sys.sp_cdc_disable_table @source_schema = '%s', @source_name = '%s', " .
"@capture_instance = '%s'",
$table['TABLE_SCHEMA'],
$table['TABLE_NAME'],
$captureInstance,
);
$this->connection->exec($disableCdcSql);
}

// Drop the table
$this->connection->query(sprintf('DROP TABLE %s.%s', $schema, $tableName));
}
}

Expand Down

0 comments on commit f8ab543

Please sign in to comment.