forked from flashcat/categraf
126 lines
2.9 KiB
Go
126 lines
2.9 KiB
Go
// Copyright 2020 Google Inc. All Rights Reserved.
|
|
// This file is available under the Apache license.
|
|
|
|
package waker
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"sync"
|
|
)
|
|
|
|
// A testWaker is used to manually signal to idle routines it's time to look for new work.
|
|
type testWaker struct {
|
|
Waker
|
|
|
|
ctx context.Context
|
|
|
|
n int
|
|
|
|
wakeeReady chan struct{}
|
|
wakeeDone chan struct{}
|
|
wait chan struct{}
|
|
|
|
mu sync.Mutex // protects following fields
|
|
wake chan struct{}
|
|
}
|
|
|
|
// WakeFunc describes a function used by tests to trigger a wakeup of blocked idle goroutines under test. It takes as first parameter the number of goroutines to await before returning to the caller.
|
|
type WakeFunc func(int)
|
|
|
|
// NewTest creates a new Waker to be used in tests, returning it and a function to trigger a wakeup. The constructor parameter says how many wakees are expected in the first pass.
|
|
func NewTest(ctx context.Context, n int) (Waker, WakeFunc) {
|
|
t := &testWaker{
|
|
ctx: ctx,
|
|
n: n,
|
|
wakeeReady: make(chan struct{}),
|
|
wakeeDone: make(chan struct{}),
|
|
wait: make(chan struct{}),
|
|
wake: make(chan struct{}),
|
|
}
|
|
initDone := make(chan struct{})
|
|
go func() {
|
|
defer close(initDone)
|
|
for i := 0; i < t.n; i++ {
|
|
<-t.wakeeDone
|
|
}
|
|
}()
|
|
wakeFunc := func(after int) {
|
|
<-initDone
|
|
log.Println(1, "TestWaker yielding to Wakee")
|
|
for i := 0; i < t.n; i++ {
|
|
t.wait <- struct{}{}
|
|
}
|
|
log.Printf("waiting for %d wakees to get the wake chan", t.n)
|
|
for i := 0; i < t.n; i++ {
|
|
<-t.wakeeReady
|
|
}
|
|
t.broadcastWakeAndReset()
|
|
// Now wakeFunc blocks here
|
|
log.Printf("waiting for %d wakees to return to Wake", after)
|
|
for i := 0; i < after; i++ {
|
|
<-t.wakeeDone
|
|
}
|
|
t.n = after
|
|
log.Println("Wakee yielding to TestWaker")
|
|
}
|
|
return t, wakeFunc
|
|
}
|
|
|
|
// Wake satisfies the Waker interface.
|
|
func (t *testWaker) Wake() (w <-chan struct{}) {
|
|
t.mu.Lock()
|
|
w = t.wake
|
|
t.mu.Unlock()
|
|
log.Println("waiting for wakeup on chan ", w)
|
|
// Background this so we can return the wake channel.
|
|
// The wakeFunc won't close the channel until this completes.
|
|
go func() {
|
|
// Signal we've reentered Wake. wakeFunc can't return until we do this.
|
|
select {
|
|
case <-t.ctx.Done():
|
|
return
|
|
case t.wakeeDone <- struct{}{}:
|
|
}
|
|
// Block wakees here until a subsequent wakeFunc is called.
|
|
select {
|
|
case <-t.ctx.Done():
|
|
return
|
|
case <-t.wait:
|
|
}
|
|
// Signal we've got the wake chan, telling wakeFunc it can now issue a broadcast.
|
|
select {
|
|
case <-t.ctx.Done():
|
|
return
|
|
case t.wakeeReady <- struct{}{}:
|
|
}
|
|
}()
|
|
return
|
|
}
|
|
|
|
func (t *testWaker) broadcastWakeAndReset() {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
log.Printf("broadcasting wake to chan %p", t.wake)
|
|
close(t.wake)
|
|
t.wake = make(chan struct{})
|
|
log.Printf("wake channel reset")
|
|
}
|
|
|
|
// alwaysWaker never blocks the wakee.
|
|
type alwaysWaker struct {
|
|
wake chan struct{}
|
|
}
|
|
|
|
func NewTestAlways() Waker {
|
|
w := &alwaysWaker{
|
|
wake: make(chan struct{}),
|
|
}
|
|
close(w.wake)
|
|
return w
|
|
}
|
|
|
|
func (w *alwaysWaker) Wake() <-chan struct{} {
|
|
return w.wake
|
|
}
|