diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index 1fe15ee8899..c353dba9c70 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -114,6 +114,51 @@ confirm_query_canceled_impl(int line, PGconn *conn) PQconsumeInput(conn); } +/* + * Using monitorConn, query pg_stat_activity to see that the connection with + * the given PID is in the given state. We never stop until it does. + */ +static void +wait_for_connection_state(int line, PGconn *monitorConn, int procpid, char *state) +{ + const Oid paramTypes[] = {INT4OID, TEXTOID}; + const char *paramValues[2]; + char *pidstr = psprintf("%d", procpid); + + paramValues[0] = pidstr; + paramValues[1] = state; + + while (true) + { + PGresult *res; + char *value; + + res = PQexecParams(monitorConn, + "SELECT count(*) FROM pg_stat_activity WHERE " + "pid = $1 AND state = $2", + 2, paramTypes, paramValues, NULL, NULL, 1); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn)); + if (PQntuples(res) != 1) + pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res)); + if (PQnfields(res) != 1) + pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res)); + value = PQgetvalue(res, 0, 0); + if (value[0] != '0') + { + PQclear(res); + break; + } + PQclear(res); + + /* wait 10ms before polling again */ + pg_usleep(10000); + } + + pfree(pidstr); +} + #define send_cancellable_query(conn, monitorConn) \ send_cancellable_query_impl(__LINE__, conn, monitorConn) static void @@ -121,7 +166,13 @@ send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn) { const char *env_wait; const Oid paramTypes[1] = {INT4OID}; - int procpid = PQbackendPID(conn); + + /* + * Wait for the connection to be idle, so that our check for an active + * connection below is reliable, instead of possibly seeing an outdated + * state. + */ + wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle"); env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT"); if (env_wait == NULL) @@ -132,41 +183,10 @@ send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn) pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn)); /* - * Wait until the query is actually running. Otherwise sending a - * cancellation request might not cancel the query due to race conditions. + * Wait for the query to start, because if the query is not running yet + * the cancel request that we send won't have any effect. */ - while (true) - { - char *value; - PGresult *res; - const char *paramValues[1]; - char pidval[16]; - - snprintf(pidval, 16, "%d", procpid); - paramValues[0] = pidval; - - res = PQexecParams(monitorConn, - "SELECT count(*) FROM pg_stat_activity WHERE " - "pid = $1 AND state = 'active'", - 1, NULL, paramValues, NULL, NULL, 1); - - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pg_fatal("could not query pg_stat_activity: %s", PQerrorMessage(monitorConn)); - if (PQntuples(res) != 1) - pg_fatal("unexpected number of rows received: %d", PQntuples(res)); - if (PQnfields(res) != 1) - pg_fatal("unexpected number of columns received: %d", PQnfields(res)); - value = PQgetvalue(res, 0, 0); - if (*value != '0') - { - PQclear(res); - break; - } - PQclear(res); - - /* wait 10ms before polling again */ - pg_usleep(10000); - } + wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "active"); } /*