From 20ecdf35abe998c9a32ea1ed0f4c3b236d4f63cd Mon Sep 17 00:00:00 2001 From: Andrew Winterman Date: Tue, 18 Jul 2023 14:33:11 -0700 Subject: [PATCH 01/15] feat: port context-aware cond to gobox I've been using this for a few months in polaroid (non-generic version), so I figured it's reasonable at this point to port it to GoBox, --- pkg/async/cond.go | 94 ++++++++++++++++++++ pkg/async/cond_test.go | 194 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 288 insertions(+) create mode 100644 pkg/async/cond.go create mode 100644 pkg/async/cond_test.go diff --git a/pkg/async/cond.go b/pkg/async/cond.go new file mode 100644 index 00000000..a3486b41 --- /dev/null +++ b/pkg/async/cond.go @@ -0,0 +1,94 @@ +// Copyright 2023 Outreach Corporation. All Rights Reserved. + +// Description: Cond.go provides a context respecting sync condition + +package async + +import ( + "context" + "sync" + "sync/atomic" +) + +// Cond is a sync.Cond that respects context cancellation. +// It provides equivalent functionality to sync.Cond (excepting there is no `Signal` method), except that +// the Wait method exits with error if the context cancels. +type Cond struct { + pointer atomic.Pointer[chan struct{}] + mu sync.Mutex +} + +// ch returns the channel that Waiters are waiting on, possibly creating one if it doesn't exist. +func (c *Cond) ch() chan struct{} { + t := make(chan struct{}) + c.pointer.CompareAndSwap(nil, &t) + return *c.pointer.Load() +} + +// Wait waits for the state change Broadcast until context ends. +func (c *Cond) Wait(ctx context.Context) error { + select { + case <-c.ch(): + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// Broadcast signals the state change to all Waiters +func (c *Cond) Broadcast() { + ch := c.ch() + // now that we retrieved the channel, new calls to Wait should get a new channel + c.pointer.Store(nil) + close(ch) +} + +// WaitForCondition checks if the condition is true or the context is done, otherwise +// it waits for the state change Broadcast. +// +// if it returns without error, it also locks the provided locker and the caller must call the returned function +// to unlock it. Until they call unlock, the state should not be changed. +func (c *Cond) WaitForCondition(ctx context.Context, locker sync.Locker, condition func() bool) (func(), error) { + for { + err := c.lockWithContext(ctx, locker) + if err != nil { + return func() {}, err + } + + // we have the lock, we can safely check the condition + ok := condition() + + if !ok { + // condition not met + // but we acquired the lock. so unlock it... + locker.Unlock() + + // either way, wait for the next signal + waitErr := c.Wait(ctx) + if waitErr != nil { + return func() {}, waitErr + } + } else { + // condition met, return the unlock function and nil error + // client must call the unlock function to unlock the mutex + // client guaranteed the condition holds while mutex lock is held. + return locker.Unlock, nil + } + } +} + +// lockWithContext waits to either acquire the lock or for the context to end. +// It returns an error if context ends before it can acquire the lock +func (c *Cond) lockWithContext(ctx context.Context, locker sync.Locker) error { + lockAcquired := make(chan struct{}) + go func() { + locker.Lock() + close(lockAcquired) + }() + select { + case <-lockAcquired: + case <-ctx.Done(): + return ctx.Err() + } + return nil +} diff --git a/pkg/async/cond_test.go b/pkg/async/cond_test.go new file mode 100644 index 00000000..9cc9a8f3 --- /dev/null +++ b/pkg/async/cond_test.go @@ -0,0 +1,194 @@ +package async + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" +) + +func TestCond(t *testing.T) { + + t.Run("broadcast wakes up waiter", func(t *testing.T) { + cond := Cond{} + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + go func() { + time.Sleep(50 * time.Millisecond) // just a breath so the other goroutine goes first + cond.Broadcast() + }() + + err := cond.Wait(ctx) + assert.Nil(t, err) + }) + + t.Run("can safely interleave broadcasts", func(t *testing.T) { + cond := Cond{} + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) + defer cancel() + for j := 0; j < 10; j++ { + start := make(chan struct{}) + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + return cond.Wait(ctx) + }) + for i := 0; i < 10; i++ { + g.Go(func() error { + <-start + cond.Broadcast() + return nil + }) + } + g.Go(func() error { + time.Sleep(10 * time.Millisecond) // just a breath so the other goroutine goes first + close(start) + return nil + }) + err := g.Wait() + assert.Nil(t, err) + } + }) + + t.Run("broadcast wakes all waiters", func(t *testing.T) { + cond := Cond{} + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + g, ctx := errgroup.WithContext(ctx) + // start everyone waiting + for i := 0; i < 10; i++ { + g.Go(func() error { + return cond.Wait(ctx) + }) + } + + // wake em all up + g.Go(func() error { + time.Sleep(10 * time.Millisecond) // just a breath so the other goroutine goes first + cond.Broadcast() + return nil + }) + + err := g.Wait() + assert.Nil(t, err) + }) + + t.Run("waiter exits on context cancel", func(t *testing.T) { + cond := Cond{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + time.Sleep(50 * time.Millisecond) // just a breath so the other goroutine goes first + cancel() + }() + + err := cond.Wait(ctx) + assert.Equal(t, context.Canceled, err) + }) +} + +func TestCond_WaitForCondition(t *testing.T) { + cond := Cond{} + t.Run("returns immediately, without error if the lock is free and the condition is met", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mu := &sync.Mutex{} + unlock, err := cond.WaitForCondition(ctx, mu, func() bool { + return true + }) + assert.Nil(t, err) + assert.False(t, mu.TryLock()) // it was able to lock m + + // the returned function unlocks mu + unlock() + assert.True(t, mu.TryLock()) + }) + + t.Run("blocks until lock is free if condition is met", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mu := &sync.Mutex{} + mu.Lock() // lock it so the condition isn't evaluated until we unlock it + waitedForUnlock := false + go func() { + time.Sleep(100 * time.Millisecond) + mu.Unlock() + waitedForUnlock = true + }() + unlock, err := cond.WaitForCondition(ctx, mu, func() bool { + return true + }) + assert.True(t, waitedForUnlock) + + assert.Nil(t, err) + assert.False(t, mu.TryLock()) // it is locked + // the returned function unlocks mu + unlock() + assert.True(t, mu.TryLock()) + }) + + t.Run("blocks until condition is met", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mu := &sync.Mutex{} + + var i = 0 + unlock, err := cond.WaitForCondition(ctx, mu, func() bool { + go func() { + i++ + cond.Broadcast() + }() + return i > 5 + }) + + assert.Nil(t, err) + assert.False(t, mu.TryLock()) // it is locked + // the returned function unlocks mu + unlock() + assert.True(t, mu.TryLock()) + }) + + t.Run("respects context expiry; even if locked", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mu := &sync.Mutex{} + mu.Lock() + + go func() { + time.Sleep(50 * time.Millisecond) // just a breath so the other goroutine goes first + cancel() + }() + + fn, err := cond.WaitForCondition(ctx, mu, func() bool { + return true + }) + defer fn() + assert.Equal(t, context.Canceled, err) + }) + + t.Run("respects context expiry; if lock is free", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mu := &sync.Mutex{} + + go func() { + time.Sleep(50 * time.Millisecond) // just a breath so the other goroutine goes first + cancel() + }() + + fn, err := cond.WaitForCondition(ctx, mu, func() bool { + return false + }) + defer fn() + assert.Equal(t, context.Canceled, err) + }) +} From a01271be25fcd3fafcef9051150697fb06d47fee Mon Sep 17 00:00:00 2001 From: Andrew Winterman Date: Tue, 18 Jul 2023 14:40:59 -0700 Subject: [PATCH 02/15] feat: cleanup unused values --- pkg/async/cond.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/async/cond.go b/pkg/async/cond.go index a3486b41..6fc19b4a 100644 --- a/pkg/async/cond.go +++ b/pkg/async/cond.go @@ -15,7 +15,6 @@ import ( // the Wait method exits with error if the context cancels. type Cond struct { pointer atomic.Pointer[chan struct{}] - mu sync.Mutex } // ch returns the channel that Waiters are waiting on, possibly creating one if it doesn't exist. From b9e638658bff9520942ce47ef94b811d9ed54395 Mon Sep 17 00:00:00 2001 From: Andrew Winterman Date: Tue, 18 Jul 2023 14:43:23 -0700 Subject: [PATCH 03/15] fix: lint test --- pkg/async/cond_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/async/cond_test.go b/pkg/async/cond_test.go index 9cc9a8f3..f566098b 100644 --- a/pkg/async/cond_test.go +++ b/pkg/async/cond_test.go @@ -59,7 +59,7 @@ func TestCond(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - g, ctx := errgroup.WithContext(ctx) + g := errgroup.Group{} // start everyone waiting for i := 0; i < 10; i++ { g.Go(func() error { From cfe8e42ac9aa4ec9af6bd2b2a965d21b046f9030 Mon Sep 17 00:00:00 2001 From: Andrew Winterman Date: Tue, 18 Jul 2023 14:46:13 -0700 Subject: [PATCH 04/15] fix: another lint error --- pkg/async/cond_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/async/cond_test.go b/pkg/async/cond_test.go index f566098b..93951540 100644 --- a/pkg/async/cond_test.go +++ b/pkg/async/cond_test.go @@ -11,7 +11,6 @@ import ( ) func TestCond(t *testing.T) { - t.Run("broadcast wakes up waiter", func(t *testing.T) { cond := Cond{} From b8c877de9faf8870f85982a4f28cf086449c9b7e Mon Sep 17 00:00:00 2001 From: Andrew Winterman Date: Tue, 18 Jul 2023 14:49:38 -0700 Subject: [PATCH 05/15] fix: yet another lint error --- pkg/async/cond_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/async/cond_test.go b/pkg/async/cond_test.go index 93951540..834ac25c 100644 --- a/pkg/async/cond_test.go +++ b/pkg/async/cond_test.go @@ -32,7 +32,7 @@ func TestCond(t *testing.T) { defer cancel() for j := 0; j < 10; j++ { start := make(chan struct{}) - g, ctx := errgroup.WithContext(ctx) + g := errgroup.Group{} g.Go(func() error { return cond.Wait(ctx) }) From f1e7e8384f047f06a857c27aa2f7d4f7b10ac5bf Mon Sep 17 00:00:00 2001 From: Andrew Winterman Date: Tue, 18 Jul 2023 15:52:20 -0700 Subject: [PATCH 06/15] fix: data races in tests --- pkg/async/cond.go | 7 ++++--- pkg/async/cond_test.go | 13 +++++++------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/pkg/async/cond.go b/pkg/async/cond.go index 6fc19b4a..a240aadb 100644 --- a/pkg/async/cond.go +++ b/pkg/async/cond.go @@ -36,10 +36,11 @@ func (c *Cond) Wait(ctx context.Context) error { // Broadcast signals the state change to all Waiters func (c *Cond) Broadcast() { - ch := c.ch() // now that we retrieved the channel, new calls to Wait should get a new channel - c.pointer.Store(nil) - close(ch) + ch := c.pointer.Swap(nil) + if ch != nil { + close(*ch) + } } // WaitForCondition checks if the condition is true or the context is done, otherwise diff --git a/pkg/async/cond_test.go b/pkg/async/cond_test.go index 834ac25c..5ab3f15f 100644 --- a/pkg/async/cond_test.go +++ b/pkg/async/cond_test.go @@ -3,6 +3,7 @@ package async import ( "context" "sync" + "sync/atomic" "testing" "time" @@ -116,16 +117,16 @@ func TestCond_WaitForCondition(t *testing.T) { mu := &sync.Mutex{} mu.Lock() // lock it so the condition isn't evaluated until we unlock it - waitedForUnlock := false + waitedForUnlock := atomic.Bool{} go func() { time.Sleep(100 * time.Millisecond) mu.Unlock() - waitedForUnlock = true + waitedForUnlock.Store(true) }() unlock, err := cond.WaitForCondition(ctx, mu, func() bool { return true }) - assert.True(t, waitedForUnlock) + assert.True(t, waitedForUnlock.Load()) assert.Nil(t, err) assert.False(t, mu.TryLock()) // it is locked @@ -140,13 +141,13 @@ func TestCond_WaitForCondition(t *testing.T) { mu := &sync.Mutex{} - var i = 0 + var i = atomic.Int32{} unlock, err := cond.WaitForCondition(ctx, mu, func() bool { go func() { - i++ + i.Add(1) cond.Broadcast() }() - return i > 5 + return i.Load() > 5 }) assert.Nil(t, err) From 7715469cb861e7ec93489629ac45d99a9b55c9f5 Mon Sep 17 00:00:00 2001 From: Andrew Winterman Date: Wed, 19 Jul 2023 12:52:53 -0700 Subject: [PATCH 07/15] fix: Field Jesse's feedback, add example --- pkg/async/cond.go | 61 ++++++++-------- pkg/async/cond_test.go | 155 ++++++++++++++++++++++++++++++++++------- 2 files changed, 158 insertions(+), 58 deletions(-) diff --git a/pkg/async/cond.go b/pkg/async/cond.go index a240aadb..68bac30e 100644 --- a/pkg/async/cond.go +++ b/pkg/async/cond.go @@ -13,14 +13,22 @@ import ( // Cond is a sync.Cond that respects context cancellation. // It provides equivalent functionality to sync.Cond (excepting there is no `Signal` method), except that // the Wait method exits with error if the context cancels. +// +// It also provides WaitForCondition, which intends to encapsulate the common pattern of acquiring a lock, +// checking a condition, and releasing the lock before waiting for a state change if the condition is not met. type Cond struct { pointer atomic.Pointer[chan struct{}] + Mu sync.Mutex } // ch returns the channel that Waiters are waiting on, possibly creating one if it doesn't exist. func (c *Cond) ch() chan struct{} { - t := make(chan struct{}) - c.pointer.CompareAndSwap(nil, &t) + // non atomic check for nil channel + if c.pointer.Load() == nil { + t := make(chan struct{}) + // make the swap safely. + c.pointer.CompareAndSwap(nil, &t) + } return *c.pointer.Load() } @@ -43,25 +51,29 @@ func (c *Cond) Broadcast() { } } -// WaitForCondition checks if the condition is true or the context is done, otherwise -// it waits for the state change Broadcast. +// WaitForCondition acquires Cond's lock, then checks if the condition is true. If the condition is not true, +// or the lock was not available, it releases the locker and waits for the state change Broadcast. +// If the context ends during any of these operations, the context error is returned. +// +// WaitForCondition returns an unlock function that should always be called to unlock the locker. +// unlock is safe to call regardless of error. +// Error should only be returned if the context ends before the condition is met. // -// if it returns without error, it also locks the provided locker and the caller must call the returned function +// If it returns without error, it also locks the provided locker and the caller must call the returned function // to unlock it. Until they call unlock, the state should not be changed. -func (c *Cond) WaitForCondition(ctx context.Context, locker sync.Locker, condition func() bool) (func(), error) { +func (c *Cond) WaitForCondition(ctx context.Context, condition func() bool) (unlock func(), + err error) { for { - err := c.lockWithContext(ctx, locker) - if err != nil { - return func() {}, err - } - + locked := c.Mu.TryLock() // we have the lock, we can safely check the condition - ok := condition() + ok := locked && condition() if !ok { // condition not met - // but we acquired the lock. so unlock it... - locker.Unlock() + if locked { + // but we acquired the lock. so unlock it... + c.Mu.Unlock() + } // either way, wait for the next signal waitErr := c.Wait(ctx) @@ -72,23 +84,10 @@ func (c *Cond) WaitForCondition(ctx context.Context, locker sync.Locker, conditi // condition met, return the unlock function and nil error // client must call the unlock function to unlock the mutex // client guaranteed the condition holds while mutex lock is held. - return locker.Unlock, nil + return func() { + c.Mu.Unlock() + c.Broadcast() + }, nil } } } - -// lockWithContext waits to either acquire the lock or for the context to end. -// It returns an error if context ends before it can acquire the lock -func (c *Cond) lockWithContext(ctx context.Context, locker sync.Locker) error { - lockAcquired := make(chan struct{}) - go func() { - locker.Lock() - close(lockAcquired) - }() - select { - case <-lockAcquired: - case <-ctx.Done(): - return ctx.Err() - } - return nil -} diff --git a/pkg/async/cond_test.go b/pkg/async/cond_test.go index 5ab3f15f..a018e30f 100644 --- a/pkg/async/cond_test.go +++ b/pkg/async/cond_test.go @@ -2,7 +2,8 @@ package async import ( "context" - "sync" + "fmt" + "runtime/pprof" "sync/atomic" "testing" "time" @@ -11,6 +12,103 @@ import ( "golang.org/x/sync/errgroup" ) +// ExampleCond_WaitForCondition demonstrates how to use a condition variable to wait for a condition to be met. +// +// In this example, we have a queue of integers that we can obtain in specific batch sizes (5 in this case), and +// we want to wait until the queue has room for the entire batch before enqueuing. +// +// We use a condition variable to wait until the queue has room for the next batch, +// use the cond's broadcast method any time elements are pulled from the queue. +// +// It stops after the consumer has consumed 29 values. +func ExampleCond_WaitForCondition() { + // Create a context with a timeout + var ctx, cancel = context.WithTimeout(context.Background(), time.Hour) + defer cancel() + + // Create a new condition variable; the zero value is ready to use. + // Cond protects and synchronizes goroutines that need to respond to changes in the queue's state. + var cond Cond + + // state represents the external state we are synchronizing on + var queue = make([]int, 0, 10) + // counter is used to generate unique values for the queue + // it is also protected by cond + var counter int + // consumed is used to track how many values we have consumed + // it is also protected by cond + var consumed int + + // we're going to run multiple goroutines, Group will keep track of them for us. + var group errgroup.Group + + // this goroutine is the producer, it will enqueue values into the queue when there is capacity + group.Go(func() (err error) { + pprof.Do(ctx, pprof.Labels("cond", "produce"), func(ctx context.Context) { + for ctx.Err() == nil { + // the enqueing goroutine + unlock, waiterr := cond.WaitForCondition(ctx, func() bool { + // our condition is that the queue, has capacity for the entire next batch + return len(queue)+2 <= cap(queue) + }) + + if err != nil { + // this means the context timed out before the condition was met + unlock() // safe to call regardless of error. + err = waiterr + return + } + + // enqueue 5 values. this is threadsafe because we are protected by the condition's lock + for i := 0; i < 5 && ctx.Err() == nil; i++ { + counter += 1 + queue = append(queue, counter) + } + + unlock() // safe to call regardless of error. + } + err = ctx.Err() + }) + return err + }) + + // this goroutine is the consumer; it will dequeue values from the queue when it is full + group.Go(func() (err error) { + pprof.Do(ctx, pprof.Labels("cond", "consume"), func(ctx context.Context) { + for ctx.Err() == nil { + unlock, waiterr := cond.WaitForCondition(ctx, func() bool { + // our condition is that the queue has values to dequeue + return len(queue) > 0 + }) + if waiterr != nil { + err = waiterr + unlock() // safe to call regardless of error. + return + } + if consumed >= 29 { + cancel() + return + } + + consumed += 1 + queue = append(make([]int, 0, 10), + queue[1:]...) // we have to append/make because otherwise the cap decreases by 1 each time we do this. + unlock() + Sleep(ctx, 10*time.Millisecond) + } + err = ctx.Err() + }) + + return err + }) + + err := group.Wait() // wait for all goroutines to exit + fmt.Println(err, consumed) + + // Output: + // context canceled 29 +} + func TestCond(t *testing.T) { t.Run("broadcast wakes up waiter", func(t *testing.T) { cond := Cond{} @@ -95,80 +193,82 @@ func TestCond(t *testing.T) { } func TestCond_WaitForCondition(t *testing.T) { - cond := Cond{} t.Run("returns immediately, without error if the lock is free and the condition is met", func(t *testing.T) { + cond := Cond{} + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mu := &sync.Mutex{} - unlock, err := cond.WaitForCondition(ctx, mu, func() bool { + unlock, err := cond.WaitForCondition(ctx, func() bool { return true }) assert.Nil(t, err) - assert.False(t, mu.TryLock()) // it was able to lock m + assert.False(t, cond.Mu.TryLock()) // it was able to lock m - // the returned function unlocks mu + // the returned function unlocks Mu unlock() - assert.True(t, mu.TryLock()) + assert.True(t, cond.Mu.TryLock()) }) t.Run("blocks until lock is free if condition is met", func(t *testing.T) { + cond := Cond{} + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mu := &sync.Mutex{} - mu.Lock() // lock it so the condition isn't evaluated until we unlock it + cond.Mu.Lock() // lock it so the condition isn't evaluated until we unlock it waitedForUnlock := atomic.Bool{} go func() { time.Sleep(100 * time.Millisecond) - mu.Unlock() + cond.Mu.Unlock() waitedForUnlock.Store(true) + cond.Broadcast() }() - unlock, err := cond.WaitForCondition(ctx, mu, func() bool { + unlock, err := cond.WaitForCondition(ctx, func() bool { return true }) assert.True(t, waitedForUnlock.Load()) assert.Nil(t, err) - assert.False(t, mu.TryLock()) // it is locked - // the returned function unlocks mu + assert.False(t, cond.Mu.TryLock()) // it is locked + // the returned function unlocks Mu unlock() - assert.True(t, mu.TryLock()) + assert.True(t, cond.Mu.TryLock()) }) t.Run("blocks until condition is met", func(t *testing.T) { + cond := Cond{} + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mu := &sync.Mutex{} - var i = atomic.Int32{} - unlock, err := cond.WaitForCondition(ctx, mu, func() bool { + unlock, err := cond.WaitForCondition(ctx, func() bool { go func() { i.Add(1) - cond.Broadcast() + cond.Broadcast() // the condition has changed }() return i.Load() > 5 }) assert.Nil(t, err) - assert.False(t, mu.TryLock()) // it is locked - // the returned function unlocks mu + assert.False(t, cond.Mu.TryLock()) // it is locked + // the returned function unlocks Mu unlock() - assert.True(t, mu.TryLock()) + assert.True(t, cond.Mu.TryLock()) }) t.Run("respects context expiry; even if locked", func(t *testing.T) { + cond := Cond{} + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mu := &sync.Mutex{} - mu.Lock() - + cond.Mu.Lock() go func() { time.Sleep(50 * time.Millisecond) // just a breath so the other goroutine goes first cancel() }() - fn, err := cond.WaitForCondition(ctx, mu, func() bool { + fn, err := cond.WaitForCondition(ctx, func() bool { return true }) defer fn() @@ -176,16 +276,17 @@ func TestCond_WaitForCondition(t *testing.T) { }) t.Run("respects context expiry; if lock is free", func(t *testing.T) { + cond := Cond{} + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mu := &sync.Mutex{} go func() { time.Sleep(50 * time.Millisecond) // just a breath so the other goroutine goes first cancel() }() - fn, err := cond.WaitForCondition(ctx, mu, func() bool { + fn, err := cond.WaitForCondition(ctx, func() bool { return false }) defer fn() From 1a6a4888cc43c22c1fe77b10fa182a721214ee0e Mon Sep 17 00:00:00 2001 From: Andrew Winterman Date: Wed, 19 Jul 2023 14:09:06 -0700 Subject: [PATCH 08/15] fix: very occasional nil reference; add benchmarks because Jesse was worried about extra allocations, I aded a couple of benchmarks which also revealed a race that could result in an NPE: ``` BenchmarkCond BenchmarkCond/one_broadcasts;_one_wait BenchmarkCond/one_broadcasts;_one_wait-10 100 11171022 ns/op 1.170 ms_corrected/op 489 B/op 9 allocs/op BenchmarkCond/one_broadcasts;_many_waiters BenchmarkCond/one_broadcasts;_many_waiters-10 100 11239043 ns/op 1.230 ms_corrected/op 2023 B/op 37 allocs/op PASS ``` --- pkg/async/cond.go | 15 ++++++---- pkg/async/cond_test.go | 64 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/pkg/async/cond.go b/pkg/async/cond.go index 68bac30e..e96bf663 100644 --- a/pkg/async/cond.go +++ b/pkg/async/cond.go @@ -24,18 +24,20 @@ type Cond struct { // ch returns the channel that Waiters are waiting on, possibly creating one if it doesn't exist. func (c *Cond) ch() chan struct{} { // non atomic check for nil channel - if c.pointer.Load() == nil { - t := make(chan struct{}) - // make the swap safely. + load := c.pointer.Load() + if load == nil { + t := make(chan struct{}, 0) c.pointer.CompareAndSwap(nil, &t) + return t } - return *c.pointer.Load() + return *load } // Wait waits for the state change Broadcast until context ends. func (c *Cond) Wait(ctx context.Context) error { + ch := c.ch() select { - case <-c.ch(): + case <-ch: return nil case <-ctx.Done(): return ctx.Err() @@ -45,7 +47,8 @@ func (c *Cond) Wait(ctx context.Context) error { // Broadcast signals the state change to all Waiters func (c *Cond) Broadcast() { // now that we retrieved the channel, new calls to Wait should get a new channel - ch := c.pointer.Swap(nil) + c2 := make(chan struct{}, 0) + ch := c.pointer.Swap(&c2) if ch != nil { close(*ch) } diff --git a/pkg/async/cond_test.go b/pkg/async/cond_test.go index a018e30f..731d10af 100644 --- a/pkg/async/cond_test.go +++ b/pkg/async/cond_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "runtime/pprof" + "sync" "sync/atomic" "testing" "time" @@ -61,7 +62,7 @@ func ExampleCond_WaitForCondition() { // enqueue 5 values. this is threadsafe because we are protected by the condition's lock for i := 0; i < 5 && ctx.Err() == nil; i++ { - counter += 1 + counter++ queue = append(queue, counter) } @@ -90,7 +91,7 @@ func ExampleCond_WaitForCondition() { return } - consumed += 1 + consumed++ queue = append(make([]int, 0, 10), queue[1:]...) // we have to append/make because otherwise the cap decreases by 1 each time we do this. unlock() @@ -293,3 +294,62 @@ func TestCond_WaitForCondition(t *testing.T) { assert.Equal(t, context.Canceled, err) }) } + +func BenchmarkCond(b *testing.B) { + wait := time.Millisecond * 10 + b.Run("one broadcasts; one wait", func(b *testing.B) { + b.ReportAllocs() + var cond Cond + start := time.Now() + + for i := 0; i < b.N; i++ { + var g sync.WaitGroup + var ctx, cancel = context.WithCancel(context.Background()) + g.Add(2) + go func() { + time.Sleep(wait) // just a breath so the other goroutine goes first + cond.Broadcast() + g.Done() + }() + go func() { + err := cond.Wait(ctx) + cancel() + g.Done() + assert.Nil(b, err) + }() + g.Wait() + cancel() + } + + correctedDuration := time.Since(start) - wait*time.Duration(b.N) + b.ReportMetric(float64(correctedDuration.Milliseconds())/float64(b.N), "ms_corrected/op") + }) + + b.Run("one broadcasts; 10 waiters", func(b *testing.B) { + b.ReportAllocs() + start := time.Now() + var cond Cond + for i := 0; i < b.N; i++ { + g, ctx := errgroup.WithContext(context.Background()) + ctx, cancel := context.WithCancel(ctx) + + for i := 0; i < 10; i++ { + g.Go(func() error { + var err error + err = cond.Wait(ctx) + return err + }) + } + + go func() { + time.Sleep(wait) // just a breath so the other goroutine goes first + cond.Broadcast() + }() + err := g.Wait() + assert.Nil(b, err) + cancel() + } + correctedDuration := time.Since(start) - wait*time.Duration(b.N) + b.ReportMetric(float64(correctedDuration.Milliseconds())/float64(b.N), "ms_corrected/op") + }) +} From b6f91b4c7b24f01092e9cbb3939790825ff2fd50 Mon Sep 17 00:00:00 2001 From: Andrew Winterman Date: Wed, 19 Jul 2023 14:11:44 -0700 Subject: [PATCH 09/15] fix: satisfy linter --- pkg/async/cond.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/async/cond.go b/pkg/async/cond.go index e96bf663..7056d5ed 100644 --- a/pkg/async/cond.go +++ b/pkg/async/cond.go @@ -21,12 +21,13 @@ type Cond struct { Mu sync.Mutex } -// ch returns the channel that Waiters are waiting on, possibly creating one if it doesn't exist. +// ch returns the channel that Waiters are waiting on, +// possibly creating one if it hasn't been initialized func (c *Cond) ch() chan struct{} { // non atomic check for nil channel load := c.pointer.Load() if load == nil { - t := make(chan struct{}, 0) + t := make(chan struct{}) c.pointer.CompareAndSwap(nil, &t) return t } @@ -46,9 +47,9 @@ func (c *Cond) Wait(ctx context.Context) error { // Broadcast signals the state change to all Waiters func (c *Cond) Broadcast() { - // now that we retrieved the channel, new calls to Wait should get a new channel - c2 := make(chan struct{}, 0) - ch := c.pointer.Swap(&c2) + // swap in a new channel, close the old one + newChan := make(chan struct{}) + ch := c.pointer.Swap(&newChan) if ch != nil { close(*ch) } From 4bd0dcf8fc07ec4286dad893f7a2569df28f7402 Mon Sep 17 00:00:00 2001 From: Andrew Winterman Date: Wed, 19 Jul 2023 15:11:12 -0700 Subject: [PATCH 10/15] fix: satisfy linter again --- pkg/async/cond_test.go | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/pkg/async/cond_test.go b/pkg/async/cond_test.go index 731d10af..ac996a70 100644 --- a/pkg/async/cond_test.go +++ b/pkg/async/cond_test.go @@ -27,21 +27,21 @@ func ExampleCond_WaitForCondition() { var ctx, cancel = context.WithTimeout(context.Background(), time.Hour) defer cancel() - // Create a new condition variable; the zero value is ready to use. - // Cond protects and synchronizes goroutines that need to respond to changes in the queue's state. - var cond Cond - - // state represents the external state we are synchronizing on - var queue = make([]int, 0, 10) - // counter is used to generate unique values for the queue - // it is also protected by cond - var counter int - // consumed is used to track how many values we have consumed - // it is also protected by cond - var consumed int - - // we're going to run multiple goroutines, Group will keep track of them for us. - var group errgroup.Group + var ( + // Create a new condition variable; the zero value is ready to use. + // Cond protects and synchronizes goroutines that need to respond to changes in the queue's state. + cond Cond + // state represents the external state we are synchronizing on + queue = make([]int, 0, 10) + // counter is used to generate unique values for the queue + // it is also protected by cond + counter int + // consumed is used to track how many values we have consumed + // it is also protected by cond + consumed int + // we're going to run multiple goroutines, Group will keep track of them for us. + group errgroup.Group + ) // this goroutine is the producer, it will enqueue values into the queue when there is capacity group.Go(func() (err error) { @@ -335,8 +335,7 @@ func BenchmarkCond(b *testing.B) { for i := 0; i < 10; i++ { g.Go(func() error { - var err error - err = cond.Wait(ctx) + err := cond.Wait(ctx) return err }) } From 46056472f5196aa795ba50682908641b9ca65e09 Mon Sep 17 00:00:00 2001 From: Andrew Winterman Date: Fri, 21 Jul 2023 16:33:03 -0700 Subject: [PATCH 11/15] improve documentation --- pkg/async/cond.go | 41 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/pkg/async/cond.go b/pkg/async/cond.go index 7056d5ed..0d325004 100644 --- a/pkg/async/cond.go +++ b/pkg/async/cond.go @@ -10,11 +10,13 @@ import ( "sync/atomic" ) -// Cond is a sync.Cond that respects context cancellation. -// It provides equivalent functionality to sync.Cond (excepting there is no `Signal` method), except that -// the Wait method exits with error if the context cancels. +// Cond mimics sync.Cond in purpose, +// but respects context cancellation and wraps up the instructions on how to safely Wait for a condition in a +// specific method. // -// It also provides WaitForCondition, which intends to encapsulate the common pattern of acquiring a lock, +// It provides functionality similar sync.Cond (excepting there is no `Signal` method), except: +// - the Wait method exits with error if the context cancels. +// - it rovides WaitForCondition, which intends to encapsulate the common pattern of acquiring a lock, // checking a condition, and releasing the lock before waiting for a state change if the condition is not met. type Cond struct { pointer atomic.Pointer[chan struct{}] @@ -35,6 +37,7 @@ func (c *Cond) ch() chan struct{} { } // Wait waits for the state change Broadcast until context ends. +// If the returned error is non-nil, then the context ended before the state change Broadcast. func (c *Cond) Wait(ctx context.Context) error { ch := c.ch() select { @@ -65,6 +68,36 @@ func (c *Cond) Broadcast() { // // If it returns without error, it also locks the provided locker and the caller must call the returned function // to unlock it. Until they call unlock, the state should not be changed. +// +// This method encapsulates the instructions in sync.Cond.Wait: +// +// """ +// Because c.L is not locked while Wait is waiting, the caller +// typically cannot assume that the condition is true when +// Wait returns. Instead, the caller should Wait in a loop: +// +// c.L.Lock() +// for !condition() { +// c.Wait() +// } +// ... make use of condition ... +// c.L.Unlock() +// +// """ +// +// Instead, you can do the following: +// +// var c Cond +// +// unlock, err := c.WaitForCondition(ctx, func() bool { +// // check condition +// return true +// }) +// if err != nil { +// // context expired before condition was met +// } +// ... make use of condition ... +// unlock() func (c *Cond) WaitForCondition(ctx context.Context, condition func() bool) (unlock func(), err error) { for { From 5a99073f07c7d83ccfca1f6f96766c86bc06e54e Mon Sep 17 00:00:00 2001 From: Andrew Winterman Date: Fri, 21 Jul 2023 16:34:18 -0700 Subject: [PATCH 12/15] docs wordsmithing --- pkg/async/cond.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/async/cond.go b/pkg/async/cond.go index 0d325004..d1890eb2 100644 --- a/pkg/async/cond.go +++ b/pkg/async/cond.go @@ -10,13 +10,11 @@ import ( "sync/atomic" ) -// Cond mimics sync.Cond in purpose, -// but respects context cancellation and wraps up the instructions on how to safely Wait for a condition in a -// specific method. +// Cond mimics sync.Cond in purpose, with the added goals of easing usability slightly and respecting context expiry. // // It provides functionality similar sync.Cond (excepting there is no `Signal` method), except: // - the Wait method exits with error if the context cancels. -// - it rovides WaitForCondition, which intends to encapsulate the common pattern of acquiring a lock, +// - it provides WaitForCondition, which intends to encapsulate the common pattern of acquiring a lock, // checking a condition, and releasing the lock before waiting for a state change if the condition is not met. type Cond struct { pointer atomic.Pointer[chan struct{}] From e8bffc42306c481401e5559b50abb7afa532d5b8 Mon Sep 17 00:00:00 2001 From: Andrew Winterman Date: Fri, 21 Jul 2023 16:40:39 -0700 Subject: [PATCH 13/15] slight bit more wordsmithing --- pkg/async/cond.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/async/cond.go b/pkg/async/cond.go index d1890eb2..957b4d7f 100644 --- a/pkg/async/cond.go +++ b/pkg/async/cond.go @@ -12,10 +12,11 @@ import ( // Cond mimics sync.Cond in purpose, with the added goals of easing usability slightly and respecting context expiry. // -// It provides functionality similar sync.Cond (excepting there is no `Signal` method), except: -// - the Wait method exits with error if the context cancels. -// - it provides WaitForCondition, which intends to encapsulate the common pattern of acquiring a lock, -// checking a condition, and releasing the lock before waiting for a state change if the condition is not met. +// It provides functionality similar sync.Cond +// - there is no signal method for waking a single Waiter +// - the Wait method exits with error if the context cancels. +// - it provides WaitForCondition, which intends to encapsulate the common pattern of acquiring a lock, checking +// a condition, and releasing the lock before waiting for a state change if the condition is not met. type Cond struct { pointer atomic.Pointer[chan struct{}] Mu sync.Mutex From 3eabeffcd4cf6ca198a1959f70e851531d93c08c Mon Sep 17 00:00:00 2001 From: AndrewWinterman <113374170+AndrewWinterman@users.noreply.github.com> Date: Mon, 7 Aug 2023 14:31:15 -0700 Subject: [PATCH 14/15] Update pkg/async/cond.go Co-authored-by: Jesse Kinkead --- pkg/async/cond.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/async/cond.go b/pkg/async/cond.go index 957b4d7f..f7d31daf 100644 --- a/pkg/async/cond.go +++ b/pkg/async/cond.go @@ -47,7 +47,7 @@ func (c *Cond) Wait(ctx context.Context) error { } } -// Broadcast signals the state change to all Waiters +// Broadcast signals the state change to all Waiters. func (c *Cond) Broadcast() { // swap in a new channel, close the old one newChan := make(chan struct{}) From 47297a378f4c42a23327f121ad2803192bc442a3 Mon Sep 17 00:00:00 2001 From: AndrewWinterman <113374170+AndrewWinterman@users.noreply.github.com> Date: Mon, 7 Aug 2023 14:31:35 -0700 Subject: [PATCH 15/15] Update pkg/async/cond.go Co-authored-by: Jesse Kinkead --- pkg/async/cond.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/async/cond.go b/pkg/async/cond.go index f7d31daf..ba7d82f5 100644 --- a/pkg/async/cond.go +++ b/pkg/async/cond.go @@ -117,7 +117,7 @@ func (c *Cond) WaitForCondition(ctx context.Context, condition func() bool) (unl return func() {}, waitErr } } else { - // condition met, return the unlock function and nil error + // condition met, return the unlock function and nil error. // client must call the unlock function to unlock the mutex // client guaranteed the condition holds while mutex lock is held. return func() {