diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-08-24 16:58:01 -0400 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-08-24 16:58:01 -0400 |
| commit | 9ad0f9bb2bdfe47f144e2a0e8951dcfe6344a4ea (patch) | |
| tree | ba2e3e0d11ae0759ce6bf70989279fd545b0b569 /queue.go | |
| parent | c25a68894f3bce8a8e63289b46ecd6e29f5cc227 (diff) | |
| download | share-9ad0f9bb2bdfe47f144e2a0e8951dcfe6344a4ea.zip | |
Queuev0.2.0
Diffstat (limited to 'queue.go')
| -rw-r--r-- | queue.go | 49 |
1 files changed, 49 insertions, 0 deletions
diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..5e28412 --- /dev/null +++ b/queue.go @@ -0,0 +1,49 @@ +package share + +// Queue is a FIFO queue with an unlimited capacity. +// +// Closing the Enqueue channel closes the Queue. The Queue waits +// until all elements have been drained from the Dequeue channel +// before closing it. +type Queue[T any] struct { + // Sending to Enqueue adds an element to the back of the Queue + // and never blocks. + Enqueue chan<-T + + + // Receiving from Dequeue removes an element from the front + // of the queue or, if the queue is empty, blocks until an element + // is enqueued. + Dequeue <-chan T +} + +func NewQueue[T any]() Queue[T] { + in, out := make(chan T), make(chan T) + + go func() { + defer close(out) + + var queue []T + + for v := range in { + queue = append(queue, v) + + for len(queue) > 0 { + select { + case out <- queue[0]: + queue = queue[1:] + case v, ok := <-in: + if !ok { + for _, x := range queue { + out <- x + } + return + } + queue = append(queue, v) + } + } + } + }() + + return Queue[T]{Enqueue: in, Dequeue: out} +} |