summaryrefslogtreecommitdiffstats
path: root/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'queue.go')
-rw-r--r--queue.go49
1 files changed, 49 insertions, 0 deletions
diff --git a/queue.go b/queue.go
new file mode 100644
index 0000000..5e28412
--- /dev/null
+++ b/queue.go
@@ -0,0 +1,49 @@
+package share
+
+// Queue is a FIFO queue with an unlimited capacity.
+//
+// Closing the Enqueue channel closes the Queue. The Queue waits
+// until all elements have been drained from the Dequeue channel
+// before closing it.
+type Queue[T any] struct {
+ // Sending to Enqueue adds an element to the back of the Queue
+ // and never blocks.
+ Enqueue chan<-T
+
+
+ // Receiving from Dequeue removes an element from the front
+ // of the queue or, if the queue is empty, blocks until an element
+ // is enqueued.
+ Dequeue <-chan T
+}
+
+func NewQueue[T any]() Queue[T] {
+ in, out := make(chan T), make(chan T)
+
+ go func() {
+ defer close(out)
+
+ var queue []T
+
+ for v := range in {
+ queue = append(queue, v)
+
+ for len(queue) > 0 {
+ select {
+ case out <- queue[0]:
+ queue = queue[1:]
+ case v, ok := <-in:
+ if !ok {
+ for _, x := range queue {
+ out <- x
+ }
+ return
+ }
+ queue = append(queue, v)
+ }
+ }
+ }
+ }()
+
+ return Queue[T]{Enqueue: in, Dequeue: out}
+}