summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--deque.go115
-rw-r--r--deque_test.go128
-rw-r--r--queue.go3
-rw-r--r--queue_test.go17
-rw-r--r--slice_test.go14
-rw-r--r--val_test.go24
6 files changed, 287 insertions, 14 deletions
diff --git a/deque.go b/deque.go
new file mode 100644
index 0000000..0001f68
--- /dev/null
+++ b/deque.go
@@ -0,0 +1,115 @@
+package share
+
+// Deque is a double-ended queue with an unlimited capacity.
+type Deque[T any] struct {
+ // Sending to PutTail adds an element to the back of the deque
+ // and never blocks.
+ PutTail chan<- T
+
+ // Sending to PutHead adds an element to the front of the deque
+ // and never blocks.
+ PutHead chan<- T
+
+ // Receiving from TakeHead removes an element from the front
+ // of the deque, or, if the queue is empty, blocks until an element
+ // is enqueued.
+ TakeHead <-chan T
+
+ // Receiving from TakeTail removes an element from the back
+ // of the deque, or, if the deque is empty, blocks until an element
+ // is enqueued.
+ TakeTail <-chan T
+}
+
+func NewDeque[T any]() Deque[T] {
+ putTail, putHead := make(chan T), make(chan T)
+ takeHead, takeTail := make(chan T), make(chan T)
+
+ go run(putTail, putHead, takeHead, takeTail)
+
+ return Deque[T]{
+ PutTail: putTail,
+ PutHead: putHead,
+ TakeHead: takeHead,
+ TakeTail: takeTail,
+ }
+}
+
+func run[T any](putTail, putHead <-chan T, takeHead, takeTail chan<- T) {
+ defer close(takeTail)
+ defer close(takeHead)
+
+ var buf []T
+
+ // While the Put channels are open,
+ for ok := true; ok; {
+ if len(buf) > 0 {
+ buf, ok = putOrTake(putTail, putHead, takeHead, takeTail, buf)
+ } else {
+ buf, ok = put(putTail, putHead, buf)
+ }
+ }
+
+ flush(takeHead, takeTail, buf)
+}
+
+func flush[T any](takeHead, takeTail chan<- T, buf []T) {
+ for len(buf) > 0 {
+ select {
+ case takeHead <- buf[0]:
+ buf = buf[1:]
+ case takeTail <- buf[len(buf)-1]:
+ buf = buf[:len(buf)-1]
+ }
+ }
+}
+
+func putOrTake[T any](putTail, putHead <-chan T, takeHead, takeTail chan<- T, buf []T) ([]T, bool) {
+ select {
+ case takeHead <- buf[0]:
+ buf = buf[1:]
+
+ case takeTail <- buf[len(buf)-1]:
+ buf = buf[:len(buf)-1]
+
+ case v, ok := <-putTail:
+ if !ok {
+ return buf, false
+ }
+ buf = append(buf, v)
+
+ case v, ok := <-putHead:
+ if !ok {
+ return buf, false
+ }
+ buf = append([]T{v}, buf...)
+ }
+
+ return buf, true
+}
+
+func put[T any](putTail, putHead <-chan T, buf []T) ([]T, bool) {
+ var v T
+ var ok bool
+
+ select {
+ case v, ok = <-putTail:
+ if ok {
+ buf = append(buf, v)
+ }
+ case v, ok = <-putHead:
+ if ok {
+ buf = append([]T{v}, buf...)
+ }
+ }
+
+ return buf, ok
+}
+
+// Close the Put channels of the deque.
+// The deque will wait until all elements have been drained through
+// either of the Take channels before closing them.
+func (dq Deque[T]) Close() {
+ close(dq.PutTail)
+ close(dq.PutHead)
+}
diff --git a/deque_test.go b/deque_test.go
new file mode 100644
index 0000000..4f64585
--- /dev/null
+++ b/deque_test.go
@@ -0,0 +1,128 @@
+package share_test
+
+import (
+ "fmt"
+ "slices"
+ "sync"
+ "testing"
+
+ "github.com/sam-rba/share"
+)
+
+func TestDequeFIFO(t *testing.T) {
+ dq := share.NewDeque[string]()
+ vals := []string{"foo", "bar", "baz", "xyz"}
+ testFIFO(t, dq.PutTail, dq.TakeHead, vals, dq.Close)
+}
+
+func TestDequeReverseFIFO(t *testing.T) {
+ dq := share.NewDeque[string]()
+ vals := []string{"foo", "bar", "baz", "xyz"}
+ testFIFO(t, dq.PutHead, dq.TakeTail, vals, dq.Close)
+}
+
+func testFIFO[T comparable](t *testing.T, put chan<- T, take <-chan T, vals []T, end func()) {
+ var wg sync.WaitGroup
+ wg.Add(2)
+ go func() {
+ produce(put, vals)
+ end()
+ wg.Done()
+ }()
+ go func() {
+ consume(t, take, vals)
+ wg.Done()
+ }()
+ wg.Wait()
+}
+
+func TestDequeLIFO(t *testing.T) {
+ dq := share.NewDeque[string]()
+ vals := []string{"foo", "bar", "baz", "xyz"}
+ testLIFO(t, dq.PutTail, dq.TakeTail, vals, dq.Close)
+}
+
+func TestDequeReverseLIFO(t *testing.T) {
+ dq := share.NewDeque[string]()
+ vals := []string{"foo", "bar", "baz", "xyz"}
+ testLIFO(t, dq.PutHead, dq.TakeHead, vals, dq.Close)
+}
+
+func testLIFO[T comparable](t *testing.T, put chan<- T, take <-chan T, vals []T, end func()) {
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() {
+ produce(put, vals)
+ end()
+ wg.Done()
+ }()
+
+ want := slices.Clone(vals)
+ slices.Reverse(want)
+ wg.Wait()
+ consume(t, take, want)
+}
+
+func produce[T any](put chan<- T, vals []T) {
+ for _, v := range vals {
+ put <- v
+ }
+}
+
+func consume[T comparable](t *testing.T, take <-chan T, want []T) {
+ i := 0
+ for v := range take {
+ if i >= len(want) {
+ t.Fatal("received too many")
+ }
+ if v != want[i] {
+ t.Fatalf("Index %d: %v; want %v", i, v, want[i])
+ }
+ i++
+ }
+ if i < len(want) {
+ t.Fatalf("Only received %d; want %d", i, len(want))
+ }
+}
+
+func TestDequePutback(t *testing.T) {
+ dq := share.NewDeque[string]()
+
+ dq.PutTail <- "foo"
+ dq.PutTail <- "bar"
+ dq.PutTail <- "baz"
+ dq.PutTail <- "xyz"
+
+ <-dq.TakeHead // foo
+ <-dq.TakeHead // bar
+ <-dq.TakeHead // baz
+
+ dq.PutHead <- "baz"
+ dq.PutHead <- "bar"
+
+ dq.Close()
+
+ consume(t, dq.TakeHead, []string{"bar", "baz", "xyz"})
+}
+
+func ExampleDeque() {
+ // Use as a FIFO queue
+ dq := share.NewDeque[string]()
+
+ // Producer
+ go func() {
+ defer dq.Close()
+ for _, word := range []string{"foo", "bar", "baz"} {
+ dq.PutTail <- word
+ }
+ }()
+
+ for word := range dq.TakeHead {
+ fmt.Println(word)
+ }
+ // Output:
+ // foo
+ // bar
+ // baz
+}
diff --git a/queue.go b/queue.go
index 5e28412..cd99994 100644
--- a/queue.go
+++ b/queue.go
@@ -8,8 +8,7 @@ package share
type Queue[T any] struct {
// Sending to Enqueue adds an element to the back of the Queue
// and never blocks.
- Enqueue chan<-T
-
+ Enqueue chan<- T
// Receiving from Dequeue removes an element from the front
// of the queue or, if the queue is empty, blocks until an element
diff --git a/queue_test.go b/queue_test.go
index 9e7d397..a994620 100644
--- a/queue_test.go
+++ b/queue_test.go
@@ -1,6 +1,7 @@
package share_test
import (
+ "sync"
"testing"
"github.com/sam-rba/share"
@@ -8,21 +9,27 @@ import (
func TestQueue(t *testing.T) {
q := share.NewQueue[string]()
-
vals := []string{"foo", "bar", "baz", "xyz"}
-
+ var wg sync.WaitGroup
+
+ // Producer
+ wg.Add(1)
go func() {
+ defer wg.Done()
for _, v := range vals {
q.Enqueue <- v
}
close(q.Enqueue)
}()
+ // Consumer
+ wg.Add(1)
go func() {
+ defer wg.Done()
i := 0
for front := range q.Dequeue {
t.Log("received", front, "from queue")
- if i > len(vals)-1 {
+ if i >= len(vals) {
t.Fatal("received too many elements from queue")
}
if front != vals[i] {
@@ -34,4 +41,6 @@ func TestQueue(t *testing.T) {
t.Fatal("did not receive enough values from queue")
}
}()
-} \ No newline at end of file
+
+ wg.Wait()
+}
diff --git a/slice_test.go b/slice_test.go
index bdb6267..af44eaf 100644
--- a/slice_test.go
+++ b/slice_test.go
@@ -1,6 +1,7 @@
package share_test
import (
+ "sync"
"testing"
"github.com/sam-rba/share"
@@ -8,12 +9,21 @@ import (
func TestConstSlice(t *testing.T) {
orig := []string{"foo", "bar", "baz"}
+
shared := share.NewConstSlice(orig)
- verifySameSlice(shared, orig, t)
+ defer shared.Close()
+
+ var wg sync.WaitGroup
+ wg.Add(2)
+ go func() {
+ verifySameSlice(shared, orig, t)
+ wg.Done()
+ }()
go func() {
- defer shared.Close()
verifySameSlice(shared, orig, t)
+ wg.Done()
}()
+ wg.Wait()
}
func verifySameSlice[T comparable](cs share.ConstSlice[T], s []T, t *testing.T) {
diff --git a/val_test.go b/val_test.go
index 5c37d27..b2ac6d6 100644
--- a/val_test.go
+++ b/val_test.go
@@ -1,6 +1,7 @@
package share_test
import (
+ "sync"
"testing"
"github.com/sam-rba/share"
@@ -9,29 +10,40 @@ import (
// Set value in local goroutine, verify in remote goroutine.
func TestValSetLocal(t *testing.T) {
val := "foo"
+
sharedVal := share.NewVal[string]()
sharedVal.Set <- val
- verifySameVal(sharedVal, val, t)
+ defer sharedVal.Close()
+
+ var wg sync.WaitGroup
+ wg.Add(2)
go func() {
- defer sharedVal.Close()
verifySameVal(sharedVal, val, t)
+ wg.Done()
}()
+ go func() {
+ verifySameVal(sharedVal, val, t)
+ wg.Done()
+ }()
+ wg.Wait()
}
// Set value in remote goroutine, verify in local goroutine.
func TestValSetRemote(t *testing.T) {
val := "foo"
+
sharedVal := share.NewVal[string]()
defer sharedVal.Close()
- done := make(chan bool)
- defer close(done)
+
+ var wg sync.WaitGroup
+ wg.Add(1)
go func() {
sharedVal.Set <- val
verifySameVal(sharedVal, val, t)
- done <- true
+ wg.Done()
}()
verifySameVal(sharedVal, val, t)
- <-done
+ wg.Wait()
}
// Val.TryGet() before Set should fail.