diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-02-08 01:37:07 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-02-08 01:37:07 -0500 |
| commit | a4ca32963cba9922ec92efe1efa165d880513752 (patch) | |
| tree | f887f1d40c0238e2c9977a130a3775078359c704 /broadcast.go | |
| parent | 7da4e714fee24ac3dca1a6e060edf60bb3954e09 (diff) | |
| download | volute-a4ca32963cba9922ec92efe1efa165d880513752.zip | |
refactor main
Diffstat (limited to 'broadcast.go')
| -rw-r--r-- | broadcast.go | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/broadcast.go b/broadcast.go new file mode 100644 index 0000000..7268971 --- /dev/null +++ b/broadcast.go @@ -0,0 +1,59 @@ +package main + +import "sync" + +// Broadcast sends data sent from source to all destination channels. +type Broadcast[T any] struct { + source chan T + destinations []chan<- T + + mu sync.Mutex + wg sync.WaitGroup +} + +// The caller is responsible for closing source. When source is closed, +// Broadcast will close all destinations. +func NewBroadcast[T any](source chan T) Broadcast[T] { + bc := Broadcast[T]{ + source, + make([]chan<- T, 0), + sync.Mutex{}, + sync.WaitGroup{}, + } + + go func(bc *Broadcast[T]) { + bc.wg.Add(1) + + for v := range bc.source { + bc.mu.Lock() + for _, dest := range bc.destinations { + dest <- v + } + bc.mu.Unlock() + } + + bc.mu.Lock() + for _, dest := range bc.destinations { + close(dest) + } + bc.mu.Unlock() + + bc.wg.Done() + }(&bc) + return bc +} + +func (bc *Broadcast[T]) AddDestination() <-chan T { + bc.mu.Lock() + defer bc.mu.Unlock() + + ch := make(chan T) + bc.destinations = append(bc.destinations, ch) + return ch +} + +// Wait for the Broadcast to see that source is closed and to close the +// destinations. +func (bc *Broadcast[T]) Wait() { + bc.wg.Wait() +} |