diff options
Diffstat (limited to 'broadcast.go')
| -rw-r--r-- | broadcast.go | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/broadcast.go b/broadcast.go new file mode 100644 index 0000000..7268971 --- /dev/null +++ b/broadcast.go @@ -0,0 +1,59 @@ +package main + +import "sync" + +// Broadcast sends data sent from source to all destination channels. +type Broadcast[T any] struct { + source chan T + destinations []chan<- T + + mu sync.Mutex + wg sync.WaitGroup +} + +// The caller is responsible for closing source. When source is closed, +// Broadcast will close all destinations. +func NewBroadcast[T any](source chan T) Broadcast[T] { + bc := Broadcast[T]{ + source, + make([]chan<- T, 0), + sync.Mutex{}, + sync.WaitGroup{}, + } + + go func(bc *Broadcast[T]) { + bc.wg.Add(1) + + for v := range bc.source { + bc.mu.Lock() + for _, dest := range bc.destinations { + dest <- v + } + bc.mu.Unlock() + } + + bc.mu.Lock() + for _, dest := range bc.destinations { + close(dest) + } + bc.mu.Unlock() + + bc.wg.Done() + }(&bc) + return bc +} + +func (bc *Broadcast[T]) AddDestination() <-chan T { + bc.mu.Lock() + defer bc.mu.Unlock() + + ch := make(chan T) + bc.destinations = append(bc.destinations, ch) + return ch +} + +// Wait for the Broadcast to see that source is closed and to close the +// destinations. +func (bc *Broadcast[T]) Wait() { + bc.wg.Wait() +} |