diff --git a/src/backend/storage/lmgr/spin.c b/src/backend/storage/lmgr/spin.c
index dfa47a723fa..7c381fe2ae1 100644
--- a/src/backend/storage/lmgr/spin.c
+++ b/src/backend/storage/lmgr/spin.c
@@ -31,8 +31,24 @@
 
 
 #ifndef HAVE_SPINLOCKS
+
+/*
+ * No TAS, so spinlocks are implemented as PGSemaphores.
+ */
+
+#ifndef HAVE_ATOMICS
+#define NUM_EMULATION_SEMAPHORES (NUM_SPINLOCK_SEMAPHORES + NUM_ATOMICS_SEMAPHORES)
+#else
+#define NUM_EMULATION_SEMAPHORES (NUM_SPINLOCK_SEMAPHORES)
+#endif /* HAVE_ATOMICS */
+
 PGSemaphore SpinlockSemaArray;
-#endif
+
+#else /* !HAVE_SPINLOCKS */
+
+#define NUM_EMULATION_SEMAPHORES 0
+
+#endif /* HAVE_SPINLOCKS */
 
 /*
  * Report the amount of shared memory needed to store semaphores for spinlock
@@ -41,34 +57,19 @@ PGSemaphore SpinlockSemaArray;
 Size
 SpinlockSemaSize(void)
 {
-    return SpinlockSemas() * sizeof(PGSemaphoreData);
+    return NUM_EMULATION_SEMAPHORES * sizeof(PGSemaphoreData);
 }
 
-#ifdef HAVE_SPINLOCKS
-
 /*
  * Report number of semaphores needed to support spinlocks.
  */
 int
 SpinlockSemas(void)
 {
-    return 0;
+    return NUM_EMULATION_SEMAPHORES;
 }
 
-#else /* !HAVE_SPINLOCKS */
-
-/*
- * No TAS, so spinlocks are implemented as PGSemaphores.
- */
-
-
-/*
- * Report number of semaphores needed to support spinlocks.
- */
-int
-SpinlockSemas(void)
-{
-    return NUM_SPINLOCK_SEMAPHORES + NUM_ATOMICS_SEMAPHORES;
-}
+#ifndef HAVE_SPINLOCKS
 
 /*
  * Initialize semaphores.
@@ -85,20 +86,68 @@ SpinlockSemaInit(PGSemaphore spinsemas)
 }
 
 /*
- * s_lock.h hardware-spinlock emulation
+ * s_lock.h hardware-spinlock emulation using semaphores
+ *
+ * We map all spinlocks onto NUM_EMULATION_SEMAPHORES semaphores.  It's okay
+ * to map multiple spinlocks onto one semaphore because no process should
+ * ever hold more than one at a time.  We just need enough semaphores that
+ * this doesn't add too much extra contention.
+ *
+ * There is one exception to the restriction of only holding one spinlock at
+ * a time: it's ok if emulated atomic operations are nested inside spinlocks.
+ * To avoid the danger of a spinlock and an atomic using the same sema, we
+ * make sure "normal" spinlocks and atomics backed by spinlocks use distinct
+ * semaphores (see the nested argument to s_init_lock_sema).
+ *
+ * slock_t is just an int for this implementation; it holds the spinlock
+ * number from 0..(NUM_EMULATION_SEMAPHORES - 1).
  */
 
+static inline void
+s_check_valid(int lockndx)
+{
+    if (lockndx < 0 || lockndx >= NUM_EMULATION_SEMAPHORES)
+        elog(ERROR, "invalid spinlock number: %d", lockndx);
+}
+
 void
 s_init_lock_sema(volatile slock_t *lock, bool nested)
 {
     static uint32 counter = 0;
+    uint32      offset;
+    uint32      sema_total;
+    uint32      idx;
 
-    *lock = (++counter) % NUM_SPINLOCK_SEMAPHORES;
+    if (nested)
+    {
+        /*
+         * To allow nesting atomics inside spinlocked sections, use a
+         * different range of semaphores.  See comment above.
+         */
+        offset = NUM_SPINLOCK_SEMAPHORES;
+        sema_total = NUM_ATOMICS_SEMAPHORES;
+    }
+    else
+    {
+        offset = 0;
+        sema_total = NUM_SPINLOCK_SEMAPHORES;
+    }
+
+    idx = (counter++ % sema_total) + offset;
+
+    /* double check we did things correctly */
+    s_check_valid(idx);
+
+    *lock = idx;
 }
 
 void
 s_unlock_sema(volatile slock_t *lock)
 {
+    int         lockndx = *lock;
+
+    s_check_valid(lockndx);
+
     PGSemaphoreUnlock(&SpinlockSemaArray[*lock]);
 }
 
@@ -113,8 +162,12 @@ s_lock_free_sema(volatile slock_t *lock)
 int
 tas_sema(volatile slock_t *lock)
 {
+    int         lockndx = *lock;
+
+    s_check_valid(lockndx);
+
     /* Note that TAS macros return 0 if *success* */
-    return !PGSemaphoreTryLock(&SpinlockSemaArray[*lock]);
+    return !PGSemaphoreTryLock(&SpinlockSemaArray[lockndx]);
 }
 
 #endif /* !HAVE_SPINLOCKS */
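Note for reviewers: the index arithmetic above is easy to misread, so here is a
standalone sketch of the mapping s_init_lock_sema() performs. It is not part of
the patch and will not apply with it; the semaphore counts (128 and 64) are the
defaults from pg_config_manual.h at the time of writing, and the helper name
assign_sema_index is made up for illustration.

/*
 * Standalone sketch (not PostgreSQL code) of the semaphore-index mapping:
 * regular spinlocks and emulated atomics draw from disjoint index ranges.
 */
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

#define NUM_SPINLOCK_SEMAPHORES 128     /* assumed default */
#define NUM_ATOMICS_SEMAPHORES  64      /* assumed default */
#define NUM_EMULATION_SEMAPHORES \
    (NUM_SPINLOCK_SEMAPHORES + NUM_ATOMICS_SEMAPHORES)

/* hypothetical helper mirroring s_init_lock_sema()'s arithmetic */
static int
assign_sema_index(bool nested)
{
    static unsigned counter = 0;
    unsigned    offset = nested ? NUM_SPINLOCK_SEMAPHORES : 0;
    unsigned    range = nested ? NUM_ATOMICS_SEMAPHORES : NUM_SPINLOCK_SEMAPHORES;
    int         idx = (int) (counter++ % range + offset);

    /* same validity check the patch performs in s_check_valid() */
    assert(idx >= 0 && idx < NUM_EMULATION_SEMAPHORES);
    return idx;
}

int
main(void)
{
    /* regular spinlocks land in [0, 128); emulated atomics in [128, 192) */
    for (int i = 0; i < 3; i++)
    {
        int         spin = assign_sema_index(false);
        int         atom = assign_sema_index(true);

        printf("spinlock -> %3d   atomic -> %3d\n", spin, atom);
    }
    return 0;
}

Because the two ranges never overlap, a backend holding any "normal" spinlock
can always acquire the semaphore backing an emulated atomic op.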
diff --git a/src/test/regress/regress.c b/src/test/regress/regress.c
index f6e337741bf..36e8bc72a1b 100644
--- a/src/test/regress/regress.c
+++ b/src/test/regress/regress.c
@@ -1159,6 +1159,51 @@ test_spinlock(void)
 #endif
 }
 
+/*
+ * Verify that performing atomic ops inside a spinlock isn't a
+ * problem.  Realistically that's only going to be a problem when both
+ * --disable-spinlocks and --disable-atomics are used, but it's cheap enough
+ * to just always test.
+ *
+ * The test works by holding a spinlock while manipulating more than
+ * NUM_SPINLOCK_SEMAPHORES atomics: if any of those atomics were mapped onto
+ * the same semaphore as the held spinlock, the atomic op would self-deadlock.
+ *
+ * NUM_TEST_ATOMICS doesn't really need to be more than
+ * NUM_SPINLOCK_SEMAPHORES, but it seems better to test a bit more
+ * extensively.
+ */
+static void
+test_atomic_spin_nest(void)
+{
+    slock_t     lock;
+#define NUM_TEST_ATOMICS (NUM_SPINLOCK_SEMAPHORES + NUM_ATOMICS_SEMAPHORES + 27)
+    pg_atomic_uint32 atomics32[NUM_TEST_ATOMICS];
+
+    SpinLockInit(&lock);
+
+    for (int i = 0; i < NUM_TEST_ATOMICS; i++)
+    {
+        pg_atomic_init_u32(&atomics32[i], 0);
+    }
+
+    /* just so it's not all zeroes */
+    for (int i = 0; i < NUM_TEST_ATOMICS; i++)
+    {
+        EXPECT_EQ_U32(pg_atomic_fetch_add_u32(&atomics32[i], i), 0);
+    }
+
+    /* test whether we can do atomic ops with the spinlock held */
+    SpinLockAcquire(&lock);
+    for (int i = 0; i < NUM_TEST_ATOMICS; i++)
+    {
+        EXPECT_EQ_U32(pg_atomic_fetch_sub_u32(&atomics32[i], i), i);
+        EXPECT_EQ_U32(pg_atomic_read_u32(&atomics32[i]), 0);
+    }
+    SpinLockRelease(&lock);
+}
+#undef NUM_TEST_ATOMICS
+
 PG_FUNCTION_INFO_V1(test_atomic_ops);
 Datum
 test_atomic_ops(PG_FUNCTION_ARGS)
@@ -1177,5 +1222,7 @@ test_atomic_ops(PG_FUNCTION_ARGS)
 	 */
 	test_spinlock();
 
+	test_atomic_spin_nest();
+
 	PG_RETURN_BOOL(true);
 }
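For context, the failure mode test_atomic_spin_nest() guards against is a
self-deadlock: with both --disable-spinlocks and --disable-atomics, an emulated
atomic op that happened to map onto the semaphore already held by the
surrounding spinlock could never acquire it. A minimal sketch of that hazard,
using POSIX unnamed semaphores instead of PGSemaphores purely for illustration
(not PostgreSQL code):

/*
 * A "spinlock" and an "atomic" that wrongly share one semaphore: the nested
 * acquisition can never succeed while the outer one is held.
 */
#include <semaphore.h>
#include <stdio.h>

int
main(void)
{
    sem_t       shared;

    sem_init(&shared, 0, 1);    /* one semaphore backing both primitives */

    sem_wait(&shared);          /* "SpinLockAcquire": semaphore now taken */

    /*
     * An emulated atomic op mapped onto the same semaphore: a blocking
     * sem_wait() here would hang forever; sem_trywait() shows the failure.
     */
    if (sem_trywait(&shared) != 0)
        printf("nested acquisition would self-deadlock\n");

    sem_post(&shared);          /* "SpinLockRelease" */
    sem_destroy(&shared);
    return 0;
}

With the disjoint index ranges introduced in spin.c above, the nested
acquisition always lands on a different semaphore, which the test exercises by
touching more than NUM_SPINLOCK_SEMAPHORES atomics under one held spinlock.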