localbuf: Introduce StartLocalBufferIO()

To initiate IO on a shared buffer we have StartBufferIO(). For temporary table
buffers no similar function exists - likely because the code for that
currently is very simple due to the lack of concurrency.

However, the upcoming AIO support will make it possible to re-encounter a
local buffer, while the buffer already is the target of IO. In that case we
need to wait for already in-progress IO to complete. This commit makes it
easier to add the necessary code, by introducing StartLocalBufferIO().

Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/CAAKRu_b9anbWzEs5AAF9WCvcEVmgz-1AkHSQ-CLLy-p7WHzvFw@mail.gmail.com
This commit is contained in:
Andres Freund 2025-03-15 12:30:07 -04:00
parent 4b4d33b9ea
commit 771ba90298
3 changed files with 39 additions and 6 deletions

View File

@ -1038,7 +1038,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
{ {
/* Simple case for non-shared buffers. */ /* Simple case for non-shared buffers. */
bufHdr = GetLocalBufferDescriptor(-buffer - 1); bufHdr = GetLocalBufferDescriptor(-buffer - 1);
need_to_zero = (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0; need_to_zero = StartLocalBufferIO(bufHdr, true);
} }
else else
{ {
@ -1388,11 +1388,7 @@ static inline bool
WaitReadBuffersCanStartIO(Buffer buffer, bool nowait) WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
{ {
if (BufferIsLocal(buffer)) if (BufferIsLocal(buffer))
{ return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), true);
BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
}
else else
return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
} }

View File

@ -183,6 +183,13 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
instr_time io_start; instr_time io_start;
Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); Page localpage = (char *) LocalBufHdrGetBlock(bufHdr);
/*
* Try to start an I/O operation. There currently are no reasons for
* StartLocalBufferIO to return false, so we raise an error in that case.
*/
if (!StartLocalBufferIO(bufHdr, false))
elog(ERROR, "failed to start write IO on local buffer");
/* Find smgr relation for buffer */ /* Find smgr relation for buffer */
if (reln == NULL) if (reln == NULL)
reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag), reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
@ -406,11 +413,17 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
PinLocalBuffer(existing_hdr, false); PinLocalBuffer(existing_hdr, false);
buffers[i] = BufferDescriptorGetBuffer(existing_hdr); buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
/*
* Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
*/
buf_state = pg_atomic_read_u32(&existing_hdr->state); buf_state = pg_atomic_read_u32(&existing_hdr->state);
Assert(buf_state & BM_TAG_VALID); Assert(buf_state & BM_TAG_VALID);
Assert(!(buf_state & BM_DIRTY)); Assert(!(buf_state & BM_DIRTY));
buf_state &= ~BM_VALID; buf_state &= ~BM_VALID;
pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state); pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
/* no need to loop for local buffers */
StartLocalBufferIO(existing_hdr, true);
} }
else else
{ {
@ -425,6 +438,8 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state); pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
hresult->id = victim_buf_id; hresult->id = victim_buf_id;
StartLocalBufferIO(victim_buf_hdr, true);
} }
} }
@ -489,6 +504,27 @@ MarkLocalBufferDirty(Buffer buffer)
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
} }
/*
* Like StartBufferIO, but for local buffers
*/
bool
StartLocalBufferIO(BufferDesc *bufHdr, bool forInput)
{
uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
{
/* someone else already did the I/O */
return false;
}
/* BM_IO_IN_PROGRESS isn't currently used for local buffers */
/* local buffers don't track IO using resowners */
return true;
}
/* /*
* Like TerminateBufferIO, but for local buffers * Like TerminateBufferIO, but for local buffers
*/ */

View File

@ -473,6 +473,7 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr,
extern void MarkLocalBufferDirty(Buffer buffer); extern void MarkLocalBufferDirty(Buffer buffer);
extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty,
uint32 set_flag_bits); uint32 set_flag_bits);
extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput);
extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln); extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln);
extern void DropRelationLocalBuffers(RelFileLocator rlocator, extern void DropRelationLocalBuffers(RelFileLocator rlocator,
ForkNumber forkNum, ForkNumber forkNum,