From 9ad0f9bb2bdfe47f144e2a0e8951dcfe6344a4ea Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Sat, 24 Aug 2024 16:58:01 -0400 Subject: Queue --- queue.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 queue.go (limited to 'queue.go') diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..5e28412 --- /dev/null +++ b/queue.go @@ -0,0 +1,49 @@ +package share + +// Queue is a FIFO queue with an unlimited capacity. +// +// Closing the Enqueue channel closes the Queue. The Queue waits +// until all elements have been drained from the Dequeue channel +// before closing it. +type Queue[T any] struct { + // Sending to Enqueue adds an element to the back of the Queue + // and never blocks. + Enqueue chan<-T + + + // Receiving from Dequeue removes an element from the front + // of the queue or, if the queue is empty, blocks until an element + // is enqueued. + Dequeue <-chan T +} + +func NewQueue[T any]() Queue[T] { + in, out := make(chan T), make(chan T) + + go func() { + defer close(out) + + var queue []T + + for v := range in { + queue = append(queue, v) + + for len(queue) > 0 { + select { + case out <- queue[0]: + queue = queue[1:] + case v, ok := <-in: + if !ok { + for _, x := range queue { + out <- x + } + return + } + queue = append(queue, v) + } + } + } + }() + + return Queue[T]{Enqueue: in, Dequeue: out} +} -- cgit v1.2.3