libraries/utils/async/ring_buffer.go: PR feedback. Return error on epoch mismatch.

This commit is contained in:
Aaron Son
2021-03-24 15:45:58 -07:00
parent b77f2bad8c
commit 9b763af5a6
2 changed files with 23 additions and 1 deletions
+6 -1
View File
@@ -15,6 +15,7 @@
package async
import (
"errors"
"io"
"os"
"sync"
@@ -35,6 +36,10 @@ type RingBuffer struct {
items [][]interface{}
}
// Returned from Push() when the supplied epoch does not match the
// buffer's current epoch.
var ErrWrongEpoch = errors.New("ring buffer: wrong epoch")
// NewRingBuffer creates a new RingBuffer instance
func NewRingBuffer(allocSize int) *RingBuffer {
itemBuffer := make([]interface{}, allocSize*2)
@@ -93,7 +98,7 @@ func (rb *RingBuffer) Push(item interface{}, epoch int) error {
return os.ErrClosed
}
if epoch != rb.epoch {
return nil
return ErrWrongEpoch
}
rb.items[rb.headSlice][rb.headPos] = item
@@ -161,3 +161,20 @@ func TestNProducersNConsumers(t *testing.T) {
})
}
}
func TestRingBufferEpoch(t *testing.T) {
rb := NewRingBuffer(1024)
epoch := rb.Reset()
err := rb.Push(1, epoch)
assert.NoError(t, err)
err = rb.Push(2, epoch+1)
assert.Error(t, err)
assert.Equal(t, ErrWrongEpoch, err)
v, ok := rb.TryPop()
assert.True(t, ok)
assert.Equal(t, 1, v)
_, ok = rb.TryPop()
assert.False(t, ok)
newEpoch := rb.Reset()
assert.NotEqual(t, epoch, newEpoch)
}