Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cloning with extensions in non default schema #717

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 73 additions & 13 deletions src/bin/pgcopydb/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ static char *sourceDBcreateDDLs[] = {

"create index s_d_p_oid on s_database_property(datname)",

"create table s_namespace("
" nspname text primary key, restore_list_name text"
")",

"create index s_n_rlname on s_namespace(restore_list_name)",

"create table s_table("
" oid integer primary key, "
" datname text, qname text, nspname text, relname text, amname text, "
Expand Down Expand Up @@ -1674,6 +1680,11 @@ CopyDataSectionToString(CopyDataSection section)
return "all";
}

case DATA_SECTION_NAMESPACES:
{
return "namespaces";
}

case DATA_SECTION_NONE:
default:
{
Expand Down Expand Up @@ -4836,11 +4847,6 @@ catalog_prepare_filter(DatabaseCatalog *catalog,
char *sql =
"insert into filter(oid, restore_list_name, kind) "

" select oid, restore_list_name, 's_namespace' "
" from s_namespace "

" union all "

" select oid, restore_list_name, 'table' "
" from s_table "

Expand Down Expand Up @@ -5952,25 +5958,25 @@ catalog_add_s_namespace(DatabaseCatalog *catalog, SourceSchema *namespace)


/*
* catalog_lookup_s_namespace_by_rlname fetches a entry from our catalogs.
* catalog_lookup_s_namespace_by_nspname fetches a s_namespace entry from our catalogs.
*/
bool
catalog_lookup_s_namespace_by_rlname(DatabaseCatalog *catalog,
const char *restoreListName,
SourceSchema *result)
catalog_lookup_s_namespace_by_nspname(DatabaseCatalog *catalog,
const char *nspname,
SourceSchema *result)
{
sqlite3 *db = catalog->db;

if (db == NULL)
{
log_error("BUG: catalog_lookup_s_namespace_by_rlname: db is NULL");
log_error("BUG: catalog_lookup_s_namespace_by_nspname: db is NULL");
return false;
}

char *sql =
" select oid, nspname, restore_list_name "
" from s_namespace "
" where restore_list_name = $1 ";
" where nspname = $1 ";

SQLiteQuery query = {
.context = result,
Expand All @@ -5985,8 +5991,62 @@ catalog_lookup_s_namespace_by_rlname(DatabaseCatalog *catalog,

/* bind our parameters now */
BindParam params[1] = {
{ BIND_PARAMETER_TYPE_TEXT, "restore_list_name", 0,
(char *) restoreListName },
{ BIND_PARAMETER_TYPE_TEXT, "nspname", 0,
(char *) nspname },
};

if (!catalog_sql_bind(&query, params, 1))
{
/* errors have already been logged */
return false;
}

/* now execute the query, which return exactly one row */
if (!catalog_sql_execute_once(&query))
{
/* errors have already been logged */
return false;
}

return true;
}


/*
* catalog_lookup_s_namespace_by_oid fetches a s_namespace entry from our catalogs using the oid.
*/
bool
catalog_lookup_s_namespace_by_oid(DatabaseCatalog *catalog,
uint32_t oid,
SourceSchema *result)
{
sqlite3 *db = catalog->db;

if (db == NULL)
{
log_error("BUG: catalog_lookup_s_namespace_by_oid: db is NULL");
return false;
}

char *sql =
" select oid, nspname, restore_list_name "
" from s_namespace "
" where oid = $1 ";

SQLiteQuery query = {
.context = result,
.fetchFunction = &catalog_s_namespace_fetch
};

if (!catalog_sql_prepare(db, sql, &query))
{
/* errors have already been logged */
return false;
}

/* bind our parameters now */
BindParam params[1] = {
{ BIND_PARAMETER_TYPE_INT64, "oid", oid, NULL },
};

if (!catalog_sql_bind(&query, params, 1))
Expand Down
10 changes: 6 additions & 4 deletions src/bin/pgcopydb/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,10 +452,12 @@ bool catalog_s_database_guc_fetch(SQLiteQuery *query);
* Namespaces
*/
bool catalog_add_s_namespace(DatabaseCatalog * catalog, SourceSchema *namespace);

bool catalog_lookup_s_namespace_by_rlname(DatabaseCatalog *catalog,
const char *restoreListName,
SourceSchema *result);
bool catalog_lookup_s_namespace_by_oid(DatabaseCatalog *catalog,
uint32_t oid,
SourceSchema *result);
bool catalog_lookup_s_namespace_by_nspname(DatabaseCatalog *catalog,
const char *nspname,
SourceSchema *result);

bool catalog_s_namespace_fetch(SQLiteQuery *query);

Expand Down
3 changes: 2 additions & 1 deletion src/bin/pgcopydb/copydb.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,12 @@ bool copydb_objectid_is_filtered_out(CopyDataSpec *specs,

bool copydb_prepare_table_specs(CopyDataSpec *specs, PGSQL *pgsql);
bool copydb_prepare_index_specs(CopyDataSpec *specs, PGSQL *pgsql);
bool copydb_prepare_namespace_specs(CopyDataSpec *specs, PGSQL *pgsql);
bool copydb_fetch_filtered_oids(CopyDataSpec *specs, PGSQL *pgsql);

bool copydb_prepare_target_catalog(CopyDataSpec *specs);
bool copydb_schema_already_exists(CopyDataSpec *specs,
const char *restoreListName,
uint32_t sourceOid,
bool *exists);

/* table-data.c */
Expand Down
76 changes: 70 additions & 6 deletions src/bin/pgcopydb/copydb_schema.c
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ copydb_fetch_source_catalog_setup(CopyDataSpec *specs)

/* compute "allDone" in the context of a sourceDB */
if (s->section == DATA_SECTION_DATABASE_PROPERTIES ||
s->section == DATA_SECTION_NAMESPACES ||
s->section == DATA_SECTION_TABLE_DATA ||
s->section == DATA_SECTION_SET_SEQUENCES ||
s->section == DATA_SECTION_INDEXES ||
Expand Down Expand Up @@ -408,6 +409,40 @@ copydb_fetch_previous_run_state(CopyDataSpec *specs)
}


/*
* Retrieve a list of namespaces from the source databases and store them
* in the source catalog and register the section as fetched.
*/
bool
copydb_prepare_namespace_specs(CopyDataSpec *specs, PGSQL *pgsql)
{
DatabaseCatalog *sourceDB = &(specs->catalogs.source);

TopLevelTiming timing = {
.label = CopyDataSectionToString(DATA_SECTION_NAMESPACES)
};

(void) catalog_start_timing(&timing);

if (!schema_list_schemas(pgsql, sourceDB))
{
log_error("Failed to prepare namespace specs in our catalogs, "
"see above for details");
return false;
}

(void) catalog_stop_timing(&timing);

if (!catalog_register_section(sourceDB, &timing))
{
/* errors have already been logged */
return false;
}

return true;
}


/*
* copydb_fetch_source_schema is a utility function for the previous definition
* copydb_fetch_schema_and_prepare_specs.
Expand Down Expand Up @@ -548,6 +583,17 @@ copydb_fetch_source_schema(CopyDataSpec *specs, PGSQL *src)
}
}

if ((specs->section == DATA_SECTION_ALL ||
specs->section == DATA_SECTION_NAMESPACES) &&
!sourceDB->sections[DATA_SECTION_NAMESPACES].fetched)
{
if (!copydb_prepare_namespace_specs(specs, src))
{
/* errors have already been logged */
return false;
}
}

/* now update --split-tables-larger-than and target pguri */
if (!catalog_update_setup(specs))
{
Expand Down Expand Up @@ -1332,21 +1378,39 @@ copydb_prepare_target_catalog(CopyDataSpec *specs)
*/
bool
copydb_schema_already_exists(CopyDataSpec *specs,
const char *restoreListName,
uint32_t sourceOid,
bool *exists)
{
DatabaseCatalog *sourceDB = &(specs->catalogs.source);
SourceSchema sourceResults = { 0 };

if (!catalog_lookup_s_namespace_by_oid(sourceDB,
sourceOid,
&sourceResults))
{
/* errors have already been logged */
return false;
}

if (sourceResults.oid == 0)
{
log_error("Failed to find schema with oid %d in source database",
sourceOid);
return false;
}

DatabaseCatalog *targetDB = &(specs->catalogs.target);
SourceSchema schema = { 0 };
SourceSchema targetResults = { 0 };

if (!catalog_lookup_s_namespace_by_rlname(targetDB,
restoreListName,
&schema))
if (!catalog_lookup_s_namespace_by_nspname(targetDB,
sourceResults.nspname,
&targetResults))
{
/* errors have already been logged */
return false;
}

*exists = (schema.oid != 0);
*exists = (targetResults.oid != 0);

return true;
}
2 changes: 1 addition & 1 deletion src/bin/pgcopydb/dump_restore.c
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ copydb_write_restore_list(CopyDataSpec *specs, PostgresDumpSection section)
{
bool exists = false;

if (!copydb_schema_already_exists(specs, name, &exists))
if (!copydb_schema_already_exists(specs, oid, &exists))
{
log_error("Failed to check if restore name \"%s\" "
"already exists",
Expand Down
44 changes: 0 additions & 44 deletions src/bin/pgcopydb/pgcmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -451,50 +451,6 @@ pg_dump_db(PostgresPaths *pgPaths,
char *extNamespaces[PG_CMD_MAX_ARG];
int extNamespaceCount = 0;

/* now --exclude-schema for extension's own schemas */
DumpExtensionNamespaceContext context = {
.extNamespaces = extNamespaces,
.extNamespaceCount = &extNamespaceCount,
};

if (!catalog_begin(filtersDB, false))
{
/* errors have already been logged */
return false;
}

if (!catalog_iter_s_extension(filtersDB,
&context,
&pg_dump_db_extension_namespace_hook))
{
log_error("Failed to prepare pg_dump command line arguments, "
"see above for details");
return false;
}

if (!catalog_commit(filtersDB))
{
/* errors have already been logged */
return false;
}

for (int i = 0; i < extNamespaceCount; i++)
{
/* check that we still have room for --exclude-schema args */
if (PG_CMD_MAX_ARG < (argsIndex + 2))
{
log_error("Failed to call pg_dump, "
"too many schema are excluded: "
"argsIndex %d > %d",
argsIndex + 2,
PG_CMD_MAX_ARG);
return false;
}

args[argsIndex++] = "--exclude-schema";
args[argsIndex++] = extNamespaces[i];
}

args[argsIndex++] = "--file";
args[argsIndex++] = (char *) filename;
args[argsIndex++] = (char *) connStrings->safeSourcePGURI.pguri;
Expand Down
3 changes: 2 additions & 1 deletion src/bin/pgcopydb/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ typedef enum
DATA_SECTION_FILTERS,
DATA_SECTION_BLOBS,
DATA_SECTION_VACUUM,
DATA_SECTION_ALL
DATA_SECTION_NAMESPACES,
DATA_SECTION_ALL,
} CopyDataSection;

#define DATA_SECTION_COUNT (DATA_SECTION_ALL + 1)
Expand Down
17 changes: 17 additions & 0 deletions tests/pagila/copydb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,26 @@ EOF
PAGILA_SOURCE_PGURI="postgres://pagila:0wn3d@source/pagila"
PAGILA_TARGET_PGURI="postgres://pagila:0wn3d@target/pagila"

# Create extensions in the default and alternative schemas to always
# ensure these copy as expected.
psql -d ${PAGILA_SOURCE_PGURI} <<EOF
create extension ltree;
create extension hstore;
create schema foo;
create extension intarray schema foo cascade;
create schema bar;
create extension pg_trgm schema bar cascade;
create schema "Foo Bar";
create extension pgcrypto schema "Foo Bar" cascade;
create schema "With a space";
create extension seg schema "With a space" cascade;
EOF

# Create a couple of schemas in the target database
# to ensure they get skipped during the clone
psql -d ${PAGILA_TARGET_PGURI} <<EOF
create schema bar;
create schema "With a space";
EOF

grep -v "OWNER TO postgres" /usr/src/pagila/pagila-schema.sql > /tmp/pagila-schema.sql
Expand Down
Loading