diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2025-09-15 11:45:04 -0400 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2025-09-15 11:45:04 -0400 |
| commit | abdfc41f64938dc77a23e35db2768e46e19f6d18 (patch) | |
| tree | 651492ebaf9fd3d90834ca2f1f5499ec1d315480 | |
| parent | a26dcba10a0d02a2e74784d87d93f5f5e53c5c24 (diff) | |
| download | share-0.3.0.zip | |
| -rw-r--r-- | deque.go | 115 | ||||
| -rw-r--r-- | deque_test.go | 128 | ||||
| -rw-r--r-- | queue.go | 3 | ||||
| -rw-r--r-- | queue_test.go | 17 | ||||
| -rw-r--r-- | slice_test.go | 14 | ||||
| -rw-r--r-- | val_test.go | 24 |
6 files changed, 287 insertions, 14 deletions
diff --git a/deque.go b/deque.go new file mode 100644 index 0000000..0001f68 --- /dev/null +++ b/deque.go @@ -0,0 +1,115 @@ +package share + +// Deque is a double-ended queue with an unlimited capacity. +type Deque[T any] struct { + // Sending to PutTail adds an element to the back of the deque + // and never blocks. + PutTail chan<- T + + // Sending to PutHead adds an element to the front of the deque + // and never blocks. + PutHead chan<- T + + // Receiving from TakeHead removes an element from the front + // of the deque, or, if the queue is empty, blocks until an element + // is enqueued. + TakeHead <-chan T + + // Receiving from TakeTail removes an element from the back + // of the deque, or, if the deque is empty, blocks until an element + // is enqueued. + TakeTail <-chan T +} + +func NewDeque[T any]() Deque[T] { + putTail, putHead := make(chan T), make(chan T) + takeHead, takeTail := make(chan T), make(chan T) + + go run(putTail, putHead, takeHead, takeTail) + + return Deque[T]{ + PutTail: putTail, + PutHead: putHead, + TakeHead: takeHead, + TakeTail: takeTail, + } +} + +func run[T any](putTail, putHead <-chan T, takeHead, takeTail chan<- T) { + defer close(takeTail) + defer close(takeHead) + + var buf []T + + // While the Put channels are open, + for ok := true; ok; { + if len(buf) > 0 { + buf, ok = putOrTake(putTail, putHead, takeHead, takeTail, buf) + } else { + buf, ok = put(putTail, putHead, buf) + } + } + + flush(takeHead, takeTail, buf) +} + +func flush[T any](takeHead, takeTail chan<- T, buf []T) { + for len(buf) > 0 { + select { + case takeHead <- buf[0]: + buf = buf[1:] + case takeTail <- buf[len(buf)-1]: + buf = buf[:len(buf)-1] + } + } +} + +func putOrTake[T any](putTail, putHead <-chan T, takeHead, takeTail chan<- T, buf []T) ([]T, bool) { + select { + case takeHead <- buf[0]: + buf = buf[1:] + + case takeTail <- buf[len(buf)-1]: + buf = buf[:len(buf)-1] + + case v, ok := <-putTail: + if !ok { + return buf, false + } + buf = append(buf, v) + + case v, ok := <-putHead: + if !ok { + return buf, false + } + buf = append([]T{v}, buf...) + } + + return buf, true +} + +func put[T any](putTail, putHead <-chan T, buf []T) ([]T, bool) { + var v T + var ok bool + + select { + case v, ok = <-putTail: + if ok { + buf = append(buf, v) + } + case v, ok = <-putHead: + if ok { + buf = append([]T{v}, buf...) + } + } + + return buf, ok +} + +// Close the Put channels of the deque. +// The deque will wait until all elements have been drained through +// either of the Take channels before closing them. +func (dq Deque[T]) Close() { + close(dq.PutTail) + close(dq.PutHead) +} diff --git a/deque_test.go b/deque_test.go new file mode 100644 index 0000000..4f64585 --- /dev/null +++ b/deque_test.go @@ -0,0 +1,128 @@ +package share_test + +import ( + "fmt" + "slices" + "sync" + "testing" + + "github.com/sam-rba/share" +) + +func TestDequeFIFO(t *testing.T) { + dq := share.NewDeque[string]() + vals := []string{"foo", "bar", "baz", "xyz"} + testFIFO(t, dq.PutTail, dq.TakeHead, vals, dq.Close) +} + +func TestDequeReverseFIFO(t *testing.T) { + dq := share.NewDeque[string]() + vals := []string{"foo", "bar", "baz", "xyz"} + testFIFO(t, dq.PutHead, dq.TakeTail, vals, dq.Close) +} + +func testFIFO[T comparable](t *testing.T, put chan<- T, take <-chan T, vals []T, end func()) { + var wg sync.WaitGroup + wg.Add(2) + go func() { + produce(put, vals) + end() + wg.Done() + }() + go func() { + consume(t, take, vals) + wg.Done() + }() + wg.Wait() +} + +func TestDequeLIFO(t *testing.T) { + dq := share.NewDeque[string]() + vals := []string{"foo", "bar", "baz", "xyz"} + testLIFO(t, dq.PutTail, dq.TakeTail, vals, dq.Close) +} + +func TestDequeReverseLIFO(t *testing.T) { + dq := share.NewDeque[string]() + vals := []string{"foo", "bar", "baz", "xyz"} + testLIFO(t, dq.PutHead, dq.TakeHead, vals, dq.Close) +} + +func testLIFO[T comparable](t *testing.T, put chan<- T, take <-chan T, vals []T, end func()) { + var wg sync.WaitGroup + + wg.Add(1) + go func() { + produce(put, vals) + end() + wg.Done() + }() + + want := slices.Clone(vals) + slices.Reverse(want) + wg.Wait() + consume(t, take, want) +} + +func produce[T any](put chan<- T, vals []T) { + for _, v := range vals { + put <- v + } +} + +func consume[T comparable](t *testing.T, take <-chan T, want []T) { + i := 0 + for v := range take { + if i >= len(want) { + t.Fatal("received too many") + } + if v != want[i] { + t.Fatalf("Index %d: %v; want %v", i, v, want[i]) + } + i++ + } + if i < len(want) { + t.Fatalf("Only received %d; want %d", i, len(want)) + } +} + +func TestDequePutback(t *testing.T) { + dq := share.NewDeque[string]() + + dq.PutTail <- "foo" + dq.PutTail <- "bar" + dq.PutTail <- "baz" + dq.PutTail <- "xyz" + + <-dq.TakeHead // foo + <-dq.TakeHead // bar + <-dq.TakeHead // baz + + dq.PutHead <- "baz" + dq.PutHead <- "bar" + + dq.Close() + + consume(t, dq.TakeHead, []string{"bar", "baz", "xyz"}) +} + +func ExampleDeque() { + // Use as a FIFO queue + dq := share.NewDeque[string]() + + // Producer + go func() { + defer dq.Close() + for _, word := range []string{"foo", "bar", "baz"} { + dq.PutTail <- word + } + }() + + for word := range dq.TakeHead { + fmt.Println(word) + } + // Output: + // foo + // bar + // baz +} @@ -8,8 +8,7 @@ package share type Queue[T any] struct { // Sending to Enqueue adds an element to the back of the Queue // and never blocks. - Enqueue chan<-T - + Enqueue chan<- T // Receiving from Dequeue removes an element from the front // of the queue or, if the queue is empty, blocks until an element diff --git a/queue_test.go b/queue_test.go index 9e7d397..a994620 100644 --- a/queue_test.go +++ b/queue_test.go @@ -1,6 +1,7 @@ package share_test import ( + "sync" "testing" "github.com/sam-rba/share" @@ -8,21 +9,27 @@ import ( func TestQueue(t *testing.T) { q := share.NewQueue[string]() - vals := []string{"foo", "bar", "baz", "xyz"} - + var wg sync.WaitGroup + + // Producer + wg.Add(1) go func() { + defer wg.Done() for _, v := range vals { q.Enqueue <- v } close(q.Enqueue) }() + // Consumer + wg.Add(1) go func() { + defer wg.Done() i := 0 for front := range q.Dequeue { t.Log("received", front, "from queue") - if i > len(vals)-1 { + if i >= len(vals) { t.Fatal("received too many elements from queue") } if front != vals[i] { @@ -34,4 +41,6 @@ func TestQueue(t *testing.T) { t.Fatal("did not receive enough values from queue") } }() -}
\ No newline at end of file + + wg.Wait() +} diff --git a/slice_test.go b/slice_test.go index bdb6267..af44eaf 100644 --- a/slice_test.go +++ b/slice_test.go @@ -1,6 +1,7 @@ package share_test import ( + "sync" "testing" "github.com/sam-rba/share" @@ -8,12 +9,21 @@ import ( func TestConstSlice(t *testing.T) { orig := []string{"foo", "bar", "baz"} + shared := share.NewConstSlice(orig) - verifySameSlice(shared, orig, t) + defer shared.Close() + + var wg sync.WaitGroup + wg.Add(2) + go func() { + verifySameSlice(shared, orig, t) + wg.Done() + }() go func() { - defer shared.Close() verifySameSlice(shared, orig, t) + wg.Done() }() + wg.Wait() } func verifySameSlice[T comparable](cs share.ConstSlice[T], s []T, t *testing.T) { diff --git a/val_test.go b/val_test.go index 5c37d27..b2ac6d6 100644 --- a/val_test.go +++ b/val_test.go @@ -1,6 +1,7 @@ package share_test import ( + "sync" "testing" "github.com/sam-rba/share" @@ -9,29 +10,40 @@ import ( // Set value in local goroutine, verify in remote goroutine. func TestValSetLocal(t *testing.T) { val := "foo" + sharedVal := share.NewVal[string]() sharedVal.Set <- val - verifySameVal(sharedVal, val, t) + defer sharedVal.Close() + + var wg sync.WaitGroup + wg.Add(2) go func() { - defer sharedVal.Close() verifySameVal(sharedVal, val, t) + wg.Done() }() + go func() { + verifySameVal(sharedVal, val, t) + wg.Done() + }() + wg.Wait() } // Set value in remote goroutine, verify in local goroutine. func TestValSetRemote(t *testing.T) { val := "foo" + sharedVal := share.NewVal[string]() defer sharedVal.Close() - done := make(chan bool) - defer close(done) + + var wg sync.WaitGroup + wg.Add(1) go func() { sharedVal.Set <- val verifySameVal(sharedVal, val, t) - done <- true + wg.Done() }() verifySameVal(sharedVal, val, t) - <-done + wg.Wait() } // Val.TryGet() before Set should fail. |