pg_upgrade: Parallelize subscription check.

This commit makes use of the new task framework in pg_upgrade to
parallelize the part of check_old_cluster_subscription_state() that
verifies each of the subscribed tables is in the 'i' (initialize)
or 'r' (ready) state.  This check will now process multiple
databases concurrently when pg_upgrade's --jobs option is provided
a value greater than 1.

Reviewed-by: Daniel Gustafsson, Ilya Gladyshev
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
This commit is contained in:
Nathan Bossart 2024-09-16 16:10:33 -05:00
parent 6d3d2e8e54
commit 7baa36de58

View File

@ -1905,6 +1905,38 @@ check_old_cluster_for_valid_slots(void)
check_ok(); check_ok();
} }
/*
* Callback function for processing results of query for
* check_old_cluster_subscription_state()'s UpgradeTask. If the query returned
* any rows (i.e., the check failed), write the details to the report file.
*/
static void
process_old_sub_state_check(DbInfo *dbinfo, PGresult *res, void *arg)
{
UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
int ntup = PQntuples(res);
int i_srsubstate = PQfnumber(res, "srsubstate");
int i_subname = PQfnumber(res, "subname");
int i_nspname = PQfnumber(res, "nspname");
int i_relname = PQfnumber(res, "relname");
AssertVariableIsOfType(&process_old_sub_state_check, UpgradeTaskProcessCB);
for (int i = 0; i < ntup; i++)
{
if (report->file == NULL &&
(report->file = fopen_priv(report->path, "w")) == NULL)
pg_fatal("could not open file \"%s\": %m", report->path);
fprintf(report->file, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
PQgetvalue(res, i, i_srsubstate),
dbinfo->db_name,
PQgetvalue(res, i, i_subname),
PQgetvalue(res, i, i_nspname),
PQgetvalue(res, i, i_relname));
}
}
/* /*
* check_old_cluster_subscription_state() * check_old_cluster_subscription_state()
* *
@ -1915,115 +1947,99 @@ check_old_cluster_for_valid_slots(void)
static void static void
check_old_cluster_subscription_state(void) check_old_cluster_subscription_state(void)
{ {
FILE *script = NULL; UpgradeTask *task = upgrade_task_create();
char output_path[MAXPGPATH]; UpgradeTaskReport report;
const char *query;
PGresult *res;
PGconn *conn;
int ntup; int ntup;
prep_status("Checking for subscription state"); prep_status("Checking for subscription state");
snprintf(output_path, sizeof(output_path), "%s/%s", report.file = NULL;
snprintf(report.path, sizeof(report.path), "%s/%s",
log_opts.basedir, log_opts.basedir,
"subs_invalid.txt"); "subs_invalid.txt");
for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
/*
* Check that all the subscriptions have their respective replication
* origin. This check only needs to run once.
*/
conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name);
res = executeQueryOrDie(conn,
"SELECT d.datname, s.subname "
"FROM pg_catalog.pg_subscription s "
"LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
" ON o.roname = 'pg_' || s.oid "
"INNER JOIN pg_catalog.pg_database d "
" ON d.oid = s.subdbid "
"WHERE o.roname IS NULL;");
ntup = PQntuples(res);
for (int i = 0; i < ntup; i++)
{ {
PGresult *res; if (report.file == NULL &&
DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum]; (report.file = fopen_priv(report.path, "w")) == NULL)
PGconn *conn = connectToServer(&old_cluster, active_db->db_name); pg_fatal("could not open file \"%s\": %m", report.path);
fprintf(report.file, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
/* We need to check for pg_replication_origin only once. */ PQgetvalue(res, i, 0),
if (dbnum == 0) PQgetvalue(res, i, 1));
{
/*
* Check that all the subscriptions have their respective
* replication origin.
*/
res = executeQueryOrDie(conn,
"SELECT d.datname, s.subname "
"FROM pg_catalog.pg_subscription s "
"LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
" ON o.roname = 'pg_' || s.oid "
"INNER JOIN pg_catalog.pg_database d "
" ON d.oid = s.subdbid "
"WHERE o.roname IS NULL;");
ntup = PQntuples(res);
for (int i = 0; i < ntup; i++)
{
if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
pg_fatal("could not open file \"%s\": %m", output_path);
fprintf(script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
PQgetvalue(res, i, 0),
PQgetvalue(res, i, 1));
}
PQclear(res);
}
/*
* We don't allow upgrade if there is a risk of dangling slot or
* origin corresponding to initial sync after upgrade.
*
* A slot/origin not created yet refers to the 'i' (initialize) state,
* while 'r' (ready) state refers to a slot/origin created previously
* but already dropped. These states are supported for pg_upgrade. The
* other states listed below are not supported:
*
* a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state
* would retain a replication slot, which could not be dropped by the
* sync worker spawned after the upgrade because the subscription ID
* used for the slot name won't match anymore.
*
* b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state
* would retain the replication origin when there is a failure in
* tablesync worker immediately after dropping the replication slot in
* the publisher.
*
* c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on
* a relation upgraded while in this state would expect an origin ID
* with the OID of the subscription used before the upgrade, causing
* it to fail.
*
* d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
* SUBREL_STATE_UNKNOWN: These states are not stored in the catalog,
* so we need not allow these states.
*/
res = executeQueryOrDie(conn,
"SELECT r.srsubstate, s.subname, n.nspname, c.relname "
"FROM pg_catalog.pg_subscription_rel r "
"LEFT JOIN pg_catalog.pg_subscription s"
" ON r.srsubid = s.oid "
"LEFT JOIN pg_catalog.pg_class c"
" ON r.srrelid = c.oid "
"LEFT JOIN pg_catalog.pg_namespace n"
" ON c.relnamespace = n.oid "
"WHERE r.srsubstate NOT IN ('i', 'r') "
"ORDER BY s.subname");
ntup = PQntuples(res);
for (int i = 0; i < ntup; i++)
{
if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
pg_fatal("could not open file \"%s\": %m", output_path);
fprintf(script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
PQgetvalue(res, i, 0),
active_db->db_name,
PQgetvalue(res, i, 1),
PQgetvalue(res, i, 2),
PQgetvalue(res, i, 3));
}
PQclear(res);
PQfinish(conn);
} }
PQclear(res);
PQfinish(conn);
if (script) /*
* We don't allow upgrade if there is a risk of dangling slot or origin
* corresponding to initial sync after upgrade.
*
* A slot/origin not created yet refers to the 'i' (initialize) state,
* while 'r' (ready) state refers to a slot/origin created previously but
* already dropped. These states are supported for pg_upgrade. The other
* states listed below are not supported:
*
* a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would
* retain a replication slot, which could not be dropped by the sync
* worker spawned after the upgrade because the subscription ID used for
* the slot name won't match anymore.
*
* b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would
* retain the replication origin when there is a failure in tablesync
* worker immediately after dropping the replication slot in the
* publisher.
*
* c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a
* relation upgraded while in this state would expect an origin ID with
* the OID of the subscription used before the upgrade, causing it to
* fail.
*
* d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
* SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, so we
* need not allow these states.
*/
query = "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
"FROM pg_catalog.pg_subscription_rel r "
"LEFT JOIN pg_catalog.pg_subscription s"
" ON r.srsubid = s.oid "
"LEFT JOIN pg_catalog.pg_class c"
" ON r.srrelid = c.oid "
"LEFT JOIN pg_catalog.pg_namespace n"
" ON c.relnamespace = n.oid "
"WHERE r.srsubstate NOT IN ('i', 'r') "
"ORDER BY s.subname";
upgrade_task_add_step(task, query, process_old_sub_state_check,
true, &report);
upgrade_task_run(task, &old_cluster);
upgrade_task_free(task);
if (report.file)
{ {
fclose(script); fclose(report.file);
pg_log(PG_REPORT, "fatal"); pg_log(PG_REPORT, "fatal");
pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n" pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n"
"You can allow the initial sync to finish for all relations and then restart the upgrade.\n" "You can allow the initial sync to finish for all relations and then restart the upgrade.\n"
"A list of the problematic subscriptions is in the file:\n" "A list of the problematic subscriptions is in the file:\n"
" %s", output_path); " %s", report.path);
} }
else else
check_ok(); check_ok();