diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index c914c26149f..4d045a52b0e 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -50,7 +50,7 @@ * WRKR_IDLE: it's waiting for a command * WRKR_WORKING: it's been sent a command * WRKR_FINISHED: it's returned a result - * WRKR_TERMINATED: process ended + * WRKR_TERMINATED: process ended (or not started yet) * The FINISHED state indicates that the worker is idle, but we've not yet * dealt with the status code it returned from the prior command. * ReapWorkerStatus() extracts the unhandled command status value and sets @@ -381,7 +381,9 @@ ShutdownWorkersHard(ParallelState *pstate) /* * Close our write end of the sockets so that any workers waiting for - * commands know they can exit. + * commands know they can exit. (Note: some of the pipeWrite fields might + * still be zero, if we failed to initialize all the workers. Hence, just + * ignore errors here.) */ for (i = 0; i < pstate->numWorkers; i++) closesocket(pstate->parallelSlot[i].pipeWrite); @@ -455,7 +457,7 @@ WaitForTerminatingWorkers(ParallelState *pstate) for (j = 0; j < pstate->numWorkers; j++) { - if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED) + if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus)) { lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread; nrun++; @@ -890,6 +892,7 @@ ParallelBackupStart(ArchiveHandle *AH) if (AH->public.numWorkers == 1) return pstate; + /* Create status array, being sure to initialize all fields to 0 */ pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize); memset((void *) pstate->parallelSlot, 0, slotSize); @@ -931,17 +934,16 @@ ParallelBackupStart(ArchiveHandle *AH) int pipeMW[2], pipeWM[2]; + slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs)); + slot->args->AH = NULL; + slot->args->te = NULL; + /* Create communication pipes for this worker */ if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0) exit_horribly(modulename, "could not create communication channels: %s\n", strerror(errno)); - slot->workerStatus = WRKR_IDLE; - slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs)); - slot->args->AH = NULL; - slot->args->te = NULL; - /* master's ends of the pipes */ slot->pipeRead = pipeWM[PIPE_READ]; slot->pipeWrite = pipeMW[PIPE_WRITE]; @@ -959,6 +961,7 @@ ParallelBackupStart(ArchiveHandle *AH) handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32, wi, 0, &(slot->threadId)); slot->hThread = handle; + slot->workerStatus = WRKR_IDLE; #else /* !WIN32 */ pid = fork(); if (pid == 0) @@ -1003,6 +1006,7 @@ ParallelBackupStart(ArchiveHandle *AH) /* In Master after successful fork */ slot->pid = pid; + slot->workerStatus = WRKR_IDLE; /* close read end of Master -> Worker */ closesocket(pipeMW[PIPE_READ]); @@ -1119,7 +1123,7 @@ GetIdleWorker(ParallelState *pstate) } /* - * Return true iff every worker is in the WRKR_TERMINATED state. + * Return true iff no worker is running. */ static bool HasEveryWorkerTerminated(ParallelState *pstate) @@ -1128,7 +1132,7 @@ HasEveryWorkerTerminated(ParallelState *pstate) for (i = 0; i < pstate->numWorkers; i++) { - if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED) + if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) return false; } return true; @@ -1528,7 +1532,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) FD_ZERO(&workerset); for (i = 0; i < pstate->numWorkers; i++) { - if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED) + if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) continue; FD_SET(pstate->parallelSlot[i].pipeRead, &workerset); if (pstate->parallelSlot[i].pipeRead > maxFd) @@ -1553,6 +1557,8 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) { char *msg; + if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) + continue; if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset)) continue; diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h index 21739ca87c1..e7e6bd1be6a 100644 --- a/src/bin/pg_dump/parallel.h +++ b/src/bin/pg_dump/parallel.h @@ -29,6 +29,9 @@ typedef enum WRKR_FINISHED } T_WorkerStatus; +#define WORKER_IS_RUNNING(workerStatus) \ + ((workerStatus) != WRKR_TERMINATED) + /* Arguments needed for a worker process */ typedef struct ParallelArgs {