runtime: make bubbled timers more consistent with unbubbled
This CL makes two changes to reduce the predictability with which
bubbled timers fire.

When asynctimerchan=0 (the default), regular timers with an associated
channel are only added to a timer heap when some channel operation is
blocked on that channel. This allows us to garbage collect
unreferenced, unstopped timers. Timers in a synctest bubble, in
contrast, are always added to the bubble's timer heap.

This CL changes bubbled timers with a channel to be handled the same
as unbubbled ones, adding them to the bubble's timer heap only when
some channel operation is blocked on the timer's channel. This permits
unstopped bubbled timers to be garbage collected, but more importantly
it makes all timers past their deadline behave identically, regardless
of whether they are in a bubble.

This CL also changes timer scheduling to execute bubbled timers
immediately when possible rather than adding them to a heap. Timers in
a bubble's heap are executed when the bubble is idle. Executing timers
immediately avoids creating a predictable order of execution.

For #73850
Fixes #73934

Change-Id: If82e441546408f780f6af6fb7f6e416d3160295d
Reviewed-on: https://go-review.googlesource.com/c/go/+/678075
Auto-Submit: Damien Neil <dneil@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
commit 3432c68467 (parent 1aa3362093)
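To illustrate the user-visible effect described in the commit message, the sketch below shows a test that relies on the new behavior: inside a synctest bubble, a timer whose deadline has already been reached delivers on its channel without the bubble first having to become durably blocked. This is not part of the change itself; the package and test names are invented, and it assumes the public testing/synctest API (synctest.Test, Go 1.25; earlier toolchains expose synctest.Run behind GOEXPERIMENT=synctest).

package example_test

import (
	"testing"
	"testing/synctest"
	"time"
)

// A 0-duration timer fires immediately inside a bubble: its channel is
// readable right away, without waiting for the bubble's scheduler to run.
func TestExpiredTimerIsImmediatelyReadable(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		tm := time.NewTimer(0)
		select {
		case got := <-tm.C:
			// The delivered time is the bubble's fake clock at the moment of firing.
			if !got.Equal(time.Now()) {
				t.Errorf("timer fired at %v, want %v", got, time.Now())
			}
		default:
			t.Error("0-duration timer channel is not readable; want it to be")
		}
	})
}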
@@ -226,8 +226,8 @@ func TestTimerNondeterminism(t *testing.T) {
 		const iterations = 1000
 		var seen1, seen2 bool
 		for range iterations {
-			tm1 := time.NewTimer(0)
-			tm2 := time.NewTimer(0)
+			tm1 := time.NewTimer(1)
+			tm2 := time.NewTimer(1)
 			select {
 			case <-tm1.C:
 				seen1 = true
@@ -278,6 +278,57 @@ func TestSleepNondeterminism(t *testing.T) {
 	})
 }
 
+// TestTimerRunsImmediately verifies that a 0-duration timer sends on its channel
+// without waiting for the bubble to block.
+func TestTimerRunsImmediately(t *testing.T) {
+	synctest.Run(func() {
+		start := time.Now()
+		tm := time.NewTimer(0)
+		select {
+		case got := <-tm.C:
+			if !got.Equal(start) {
+				t.Errorf("<-tm.C = %v, want %v", got, start)
+			}
+		default:
+			t.Errorf("0-duration timer channel is not readable; want it to be")
+		}
+	})
+}
+
+// TestTimerRanInPast verifies that reading from a timer's channel receives the
+// time the timer fired, even when that time is in the past.
+func TestTimerRanInPast(t *testing.T) {
+	synctest.Run(func() {
+		delay := 1 * time.Second
+		want := time.Now().Add(delay)
+		tm := time.NewTimer(delay)
+		time.Sleep(2 * delay)
+		select {
+		case got := <-tm.C:
+			if !got.Equal(want) {
+				t.Errorf("<-tm.C = %v, want %v", got, want)
+			}
+		default:
+			t.Errorf("0-duration timer channel is not readable; want it to be")
+		}
+	})
+}
+
+// TestAfterFuncRunsImmediately verifies that a 0-duration AfterFunc is scheduled
+// without waiting for the bubble to block.
+func TestAfterFuncRunsImmediately(t *testing.T) {
+	synctest.Run(func() {
+		var b atomic.Bool
+		time.AfterFunc(0, func() {
+			b.Store(true)
+		})
+		for !b.Load() {
+			runtime.Gosched()
+		}
+	})
+}
+
 func TestChannelFromOutsideBubble(t *testing.T) {
 	choutside := make(chan struct{})
 	for _, test := range []struct {
@@ -497,7 +497,7 @@ func empty(c *hchan) bool {
 	// c.timer is also immutable (it is set after make(chan) but before any channel operations).
 	// All timer channels have dataqsiz > 0.
 	if c.timer != nil {
-		c.timer.maybeRunChan()
+		c.timer.maybeRunChan(c)
 	}
 	return atomic.Loaduint(&c.qcount) == 0
 }
@@ -542,7 +542,7 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
 	}
 
 	if c.timer != nil {
-		c.timer.maybeRunChan()
+		c.timer.maybeRunChan(c)
 	}
 
 	// Fast path: check for failed non-blocking operation without acquiring the lock.
@@ -821,7 +821,7 @@ func chanlen(c *hchan) int {
 	}
 	async := debug.asynctimerchan.Load() != 0
 	if c.timer != nil && async {
-		c.timer.maybeRunChan()
+		c.timer.maybeRunChan(c)
 	}
 	if c.timer != nil && !async {
 		// timer channels have a buffered implementation
@@ -3341,7 +3341,7 @@ top:
 	// which may steal timers. It's important that between now
 	// and then, nothing blocks, so these numbers remain mostly
 	// relevant.
-	now, pollUntil, _ := pp.timers.check(0)
+	now, pollUntil, _ := pp.timers.check(0, nil)
 
 	// Try to schedule the trace reader.
 	if traceEnabled() || traceShuttingDown() {
@@ -3780,7 +3780,7 @@ func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWo
 			// timerpMask tells us whether the P may have timers at all. If it
 			// can't, no need to check at all.
 			if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
-				tnow, w, ran := p2.timers.check(now)
+				tnow, w, ran := p2.timers.check(now, nil)
 				now = tnow
 				if w != 0 && (pollUntil == 0 || w < pollUntil) {
 					pollUntil = w
@@ -185,7 +185,7 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo
 		}
 
 		if cas.c.timer != nil {
-			cas.c.timer.maybeRunChan()
+			cas.c.timer.maybeRunChan(cas.c)
 		}
 
 		j := cheaprandn(uint32(norder + 1))
@@ -185,7 +185,6 @@ func synctestRun(f func()) {
 	}
 	const synctestBaseTime = 946684800000000000 // midnight UTC 2000-01-01
 	bubble.now = synctestBaseTime
-	bubble.timers.bubble = bubble
 	lockInit(&bubble.mu, lockRankSynctest)
 	lockInit(&bubble.timers.mu, lockRankTimers)
 
@@ -213,7 +212,7 @@ func synctestRun(f func()) {
 		// so timer goroutines inherit their child race context from g0.
 		curg := gp.m.curg
 		gp.m.curg = nil
-		gp.bubble.timers.check(gp.bubble.now)
+		gp.bubble.timers.check(bubble.now, bubble)
 		gp.m.curg = curg
 	})
 	gopark(synctestidle_c, nil, waitReasonSynctestRun, traceBlockSynctest, 0)
@@ -157,8 +157,6 @@ type timers struct {
 	// heap[i].when over timers with the timerModified bit set.
 	// If minWhenModified = 0, it means there are no timerModified timers in the heap.
 	minWhenModified atomic.Int64
-
-	bubble *synctestBubble
 }
 
 type timerWhen struct {
@@ -403,7 +401,7 @@ func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg
 			throw("invalid timer channel: no capacity")
 		}
 	}
-	if gr := getg().bubble; gr != nil {
+	if bubble := getg().bubble; bubble != nil {
 		t.isFake = true
 	}
 	t.modify(when, period, f, arg, 0)
@@ -485,7 +483,7 @@ func (t *timer) maybeRunAsync() {
 		// timer ourselves now is fine.)
 		if now := nanotime(); t.when <= now {
 			systemstack(func() {
-				t.unlockAndRun(now) // resets t.when
+				t.unlockAndRun(now, nil) // resets t.when
 			})
 			t.lock()
 		}
@@ -621,6 +619,29 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in
 
 	add := t.needsAdd()
 
+	if add && t.isFake {
+		// If this is a bubbled timer scheduled to fire immediately,
+		// run it now rather than waiting for the bubble's timer scheduler.
+		// This avoids deferring timer execution until after the bubble
+		// becomes durably blocked.
+		//
+		// Don't do this for non-bubbled timers: It isn't necessary,
+		// and there may be cases where the runtime executes timers with
+		// the expectation the timer func will not run in the current goroutine.
+		// Bubbled timers are always created by the time package, and are
+		// safe to run in the current goroutine.
+		bubble := getg().bubble
+		if bubble == nil {
+			throw("fake timer executing with no bubble")
+		}
+		if t.state&timerHeaped == 0 && when <= bubble.now {
+			systemstack(func() {
+				t.unlockAndRun(bubble.now, bubble)
+			})
+			return pending
+		}
+	}
+
 	if !async && t.isChan {
 		// Stop any future sends with stale values.
 		// See timer.unlockAndRun.
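The comment block in the hunk above explains the new fast path in (*timer).modify. As a rough illustration of that scheduling decision, here is a toy model (not the runtime's actual code; the types, fields, and function names below are invented for the sketch): a bubbled timer that is not already in a heap and whose deadline is at or before the bubble's fake clock runs synchronously on the calling goroutine, while anything else still goes through the heap, to be run when the bubble is idle.

package main

import "fmt"

// Toy model of the fast path added to (*timer).modify above.
// The runtime's real timer and synctestBubble types are more involved.
type toyBubble struct {
	now  int64       // the bubble's fake clock
	heap []*toyTimer // timers waiting for the bubble to go idle
}

type toyTimer struct {
	when   int64
	inHeap bool
	fire   func(now int64)
}

// schedule runs an already-due bubbled timer immediately on the calling
// goroutine; otherwise it defers the timer to the bubble's heap.
func (b *toyBubble) schedule(t *toyTimer) {
	if !t.inHeap && t.when <= b.now {
		t.fire(b.now) // deadline already reached on the fake clock: run now
		return
	}
	t.inHeap = true
	b.heap = append(b.heap, t)
}

func main() {
	b := &toyBubble{now: 100}
	b.schedule(&toyTimer{when: 100, fire: func(now int64) { fmt.Println("ran immediately at", now) }})
	b.schedule(&toyTimer{when: 200, fire: func(now int64) { fmt.Println("deferred") }})
	fmt.Println("timers left in heap:", len(b.heap))
}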
@@ -657,7 +678,7 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in
 // t must be locked.
 func (t *timer) needsAdd() bool {
 	assertLockHeld(&t.mu)
-	need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.isFake || t.blocked > 0)
+	need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.blocked > 0)
 	if need {
 		t.trace("needsAdd+")
 	} else {
@@ -982,7 +1003,7 @@ func (ts *timers) wakeTime() int64 {
 // We pass now in and out to avoid extra calls of nanotime.
 //
 //go:yeswritebarrierrec
-func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
+func (ts *timers) check(now int64, bubble *synctestBubble) (rnow, pollUntil int64, ran bool) {
 	ts.trace("check")
 	// If it's not yet time for the first timer, or the first adjusted
 	// timer, then there is nothing to do.
@@ -1015,7 +1036,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
 	ts.adjust(now, false)
 	for len(ts.heap) > 0 {
 		// Note that runtimer may temporarily unlock ts.
-		if tw := ts.run(now); tw != 0 {
+		if tw := ts.run(now, bubble); tw != 0 {
 			if tw > 0 {
 				pollUntil = tw
 			}
@@ -1047,7 +1068,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
 // If a timer is run, this will temporarily unlock ts.
 //
 //go:systemstack
-func (ts *timers) run(now int64) int64 {
+func (ts *timers) run(now int64, bubble *synctestBubble) int64 {
 	ts.trace("run")
 	assertLockHeld(&ts.mu)
 Redo:
@@ -1081,7 +1102,7 @@ Redo:
 		return t.when
 	}
 
-	t.unlockAndRun(now)
+	t.unlockAndRun(now, bubble)
 	assertLockHeld(&ts.mu) // t is unlocked now, but not ts
 	return 0
 }
@@ -1092,7 +1113,7 @@ Redo:
 // unlockAndRun returns with t unlocked and t.ts (re-)locked.
 //
 //go:systemstack
-func (t *timer) unlockAndRun(now int64) {
+func (t *timer) unlockAndRun(now int64, bubble *synctestBubble) {
 	t.trace("unlockAndRun")
 	assertLockHeld(&t.mu)
 	if t.ts != nil {
@@ -1104,10 +1125,10 @@ func (t *timer) unlockAndRun(now int64) {
 		// out from under us while this function executes.
 		gp := getg()
 		var tsLocal *timers
-		if t.ts == nil || t.ts.bubble == nil {
+		if bubble == nil {
 			tsLocal = &gp.m.p.ptr().timers
 		} else {
-			tsLocal = &t.ts.bubble.timers
+			tsLocal = &bubble.timers
 		}
 		if tsLocal.raceCtx == 0 {
 			tsLocal.raceCtx = racegostart(abi.FuncPCABIInternal((*timers).run) + sys.PCQuantum)
@@ -1160,10 +1181,10 @@ func (t *timer) unlockAndRun(now int64) {
 		if gp.racectx != 0 {
 			throw("unexpected racectx")
 		}
-		if ts == nil || ts.bubble == nil {
+		if bubble == nil {
 			gp.racectx = gp.m.p.ptr().timers.raceCtx
 		} else {
-			gp.racectx = ts.bubble.timers.raceCtx
+			gp.racectx = bubble.timers.raceCtx
 		}
 	}
 
@@ -1171,14 +1192,14 @@ func (t *timer) unlockAndRun(now int64) {
 		ts.unlock()
 	}
 
-	if ts != nil && ts.bubble != nil {
+	if bubble != nil {
 		// Temporarily use the timer's synctest group for the G running this timer.
 		gp := getg()
 		if gp.bubble != nil {
 			throw("unexpected syncgroup set")
 		}
-		gp.bubble = ts.bubble
-		ts.bubble.changegstatus(gp, _Gdead, _Grunning)
+		gp.bubble = bubble
+		bubble.changegstatus(gp, _Gdead, _Grunning)
 	}
 
 	if !async && t.isChan {
@@ -1222,13 +1243,13 @@ func (t *timer) unlockAndRun(now int64) {
 		unlock(&t.sendLock)
 	}
 
-	if ts != nil && ts.bubble != nil {
+	if bubble != nil {
 		gp := getg()
-		ts.bubble.changegstatus(gp, _Grunning, _Gdead)
+		bubble.changegstatus(gp, _Grunning, _Gdead)
 		if raceenabled {
 			// Establish a happens-before between this timer event and
 			// the next synctest.Wait call.
-			racereleasemergeg(gp, ts.bubble.raceaddr())
+			racereleasemergeg(gp, bubble.raceaddr())
 		}
 		gp.bubble = nil
 	}
@@ -1415,24 +1436,10 @@ func badTimer() {
 // maybeRunChan checks whether the timer needs to run
 // to send a value to its associated channel. If so, it does.
 // The timer must not be locked.
-func (t *timer) maybeRunChan() {
-	if t.isFake {
-		t.lock()
-		var timerBubble *synctestBubble
-		if t.ts != nil {
-			timerBubble = t.ts.bubble
-		}
-		t.unlock()
-		bubble := getg().bubble
-		if bubble == nil {
-			panic(plainError("synctest timer accessed from outside bubble"))
-		}
-		if timerBubble != nil && bubble != timerBubble {
-			panic(plainError("timer moved between synctest bubbles"))
-		}
-		// No need to do anything here.
-		// synctest.Run will run the timer when it advances its fake clock.
-		return
+func (t *timer) maybeRunChan(c *hchan) {
+	if t.isFake && getg().bubble != c.bubble {
+		// This should have been checked by the caller, but check just in case.
+		fatal("synctest timer accessed from outside bubble")
 	}
 	if t.astate.Load()&timerHeaped != 0 {
 		// If the timer is in the heap, the ordinary timer code
@@ -1442,6 +1449,9 @@ func (t *timer) maybeRunChan() {
 
 	t.lock()
 	now := nanotime()
+	if t.isFake {
+		now = getg().bubble.now
+	}
 	if t.state&timerHeaped != 0 || t.when == 0 || t.when > now {
 		t.trace("maybeRunChan-")
 		// Timer in the heap, or not running at all, or not triggered.
@@ -1450,7 +1460,7 @@ func (t *timer) maybeRunChan() {
 	}
 	t.trace("maybeRunChan+")
 	systemstack(func() {
-		t.unlockAndRun(now)
+		t.unlockAndRun(now, c.bubble)
 	})
 }
 
@@ -1460,9 +1470,11 @@ func (t *timer) maybeRunChan() {
 // adding it if needed.
 func blockTimerChan(c *hchan) {
 	t := c.timer
-	if t.isFake {
-		return
+	if t.isFake && c.bubble != getg().bubble {
+		// This should have been checked by the caller, but check just in case.
+		fatal("synctest timer accessed from outside bubble")
 	}
+
 	t.lock()
 	t.trace("blockTimerChan")
 	if !t.isChan {
@@ -1500,9 +1512,6 @@ func blockTimerChan(c *hchan) {
 // blocked on it anymore.
 func unblockTimerChan(c *hchan) {
 	t := c.timer
-	if t.isFake {
-		return
-	}
 	t.lock()
 	t.trace("unblockTimerChan")
 	if !t.isChan || t.blocked == 0 {