aboutsummaryrefslogtreecommitdiffstats
path: root/broadcast.go
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-02-08 01:37:07 -0500
committerSam Anthony <sam@samanthony.xyz>2024-02-08 01:37:07 -0500
commita4ca32963cba9922ec92efe1efa165d880513752 (patch)
treef887f1d40c0238e2c9977a130a3775078359c704 /broadcast.go
parent7da4e714fee24ac3dca1a6e060edf60bb3954e09 (diff)
downloadvolute-a4ca32963cba9922ec92efe1efa165d880513752.zip
refactor main
Diffstat (limited to 'broadcast.go')
-rw-r--r--broadcast.go59
1 files changed, 59 insertions, 0 deletions
diff --git a/broadcast.go b/broadcast.go
new file mode 100644
index 0000000..7268971
--- /dev/null
+++ b/broadcast.go
@@ -0,0 +1,59 @@
+package main
+
+import "sync"
+
+// Broadcast sends data sent from source to all destination channels.
+type Broadcast[T any] struct {
+ source chan T
+ destinations []chan<- T
+
+ mu sync.Mutex
+ wg sync.WaitGroup
+}
+
+// The caller is responsible for closing source. When source is closed,
+// Broadcast will close all destinations.
+func NewBroadcast[T any](source chan T) Broadcast[T] {
+ bc := Broadcast[T]{
+ source,
+ make([]chan<- T, 0),
+ sync.Mutex{},
+ sync.WaitGroup{},
+ }
+
+ go func(bc *Broadcast[T]) {
+ bc.wg.Add(1)
+
+ for v := range bc.source {
+ bc.mu.Lock()
+ for _, dest := range bc.destinations {
+ dest <- v
+ }
+ bc.mu.Unlock()
+ }
+
+ bc.mu.Lock()
+ for _, dest := range bc.destinations {
+ close(dest)
+ }
+ bc.mu.Unlock()
+
+ bc.wg.Done()
+ }(&bc)
+ return bc
+}
+
+func (bc *Broadcast[T]) AddDestination() <-chan T {
+ bc.mu.Lock()
+ defer bc.mu.Unlock()
+
+ ch := make(chan T)
+ bc.destinations = append(bc.destinations, ch)
+ return ch
+}
+
+// Wait for the Broadcast to see that source is closed and to close the
+// destinations.
+func (bc *Broadcast[T]) Wait() {
+ bc.wg.Wait()
+}