aboutsummaryrefslogtreecommitdiffstats
path: root/broadcast.go
diff options
context:
space:
mode:
Diffstat (limited to 'broadcast.go')
-rw-r--r--broadcast.go60
1 files changed, 60 insertions, 0 deletions
diff --git a/broadcast.go b/broadcast.go
new file mode 100644
index 0000000..dc0dc84
--- /dev/null
+++ b/broadcast.go
@@ -0,0 +1,60 @@
+package main
+
+import "sync"
+
+// Broadcast sends data sent from source to all destination channels.
+type Broadcast[T any] struct {
+ source chan T
+ destinations []chan<- T
+
+ mu sync.Mutex
+ wg sync.WaitGroup
+}
+
+// The caller is responsible for closing source. When source is closed,
+// Broadcast will close all destinations.
+func NewBroadcast[T any](source chan T) *Broadcast[T] {
+ bc := &Broadcast[T]{
+ source,
+ make([]chan<- T, 0),
+ sync.Mutex{},
+ sync.WaitGroup{},
+ }
+
+ go func() {
+ bc.wg.Add(1)
+ for v := range bc.source {
+ bc.broadcast(v)
+ }
+ bc.mu.Lock()
+ for _, dest := range bc.destinations {
+ close(dest)
+ }
+ bc.mu.Unlock()
+ bc.wg.Done()
+ }()
+ return bc
+}
+
+func (bc *Broadcast[T]) AddDestination() <-chan T {
+ bc.mu.Lock()
+ defer bc.mu.Unlock()
+
+ ch := make(chan T)
+ bc.destinations = append(bc.destinations, ch)
+ return ch
+}
+
+// Wait for the Broadcast to see that source is closed and to close the
+// destinations.
+func (bc *Broadcast[T]) Wait() {
+ bc.wg.Wait()
+}
+
+func (bc *Broadcast[T]) broadcast(v T) {
+ bc.mu.Lock()
+ defer bc.mu.Unlock()
+ for _, dest := range bc.destinations {
+ dest <- v
+ }
+}