From a4ca32963cba9922ec92efe1efa165d880513752 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Thu, 8 Feb 2024 01:37:07 -0500 Subject: refactor main --- broadcast.go | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 broadcast.go (limited to 'broadcast.go') diff --git a/broadcast.go b/broadcast.go new file mode 100644 index 0000000..7268971 --- /dev/null +++ b/broadcast.go @@ -0,0 +1,59 @@ +package main + +import "sync" + +// Broadcast sends data sent from source to all destination channels. +type Broadcast[T any] struct { + source chan T + destinations []chan<- T + + mu sync.Mutex + wg sync.WaitGroup +} + +// The caller is responsible for closing source. When source is closed, +// Broadcast will close all destinations. +func NewBroadcast[T any](source chan T) Broadcast[T] { + bc := Broadcast[T]{ + source, + make([]chan<- T, 0), + sync.Mutex{}, + sync.WaitGroup{}, + } + + go func(bc *Broadcast[T]) { + bc.wg.Add(1) + + for v := range bc.source { + bc.mu.Lock() + for _, dest := range bc.destinations { + dest <- v + } + bc.mu.Unlock() + } + + bc.mu.Lock() + for _, dest := range bc.destinations { + close(dest) + } + bc.mu.Unlock() + + bc.wg.Done() + }(&bc) + return bc +} + +func (bc *Broadcast[T]) AddDestination() <-chan T { + bc.mu.Lock() + defer bc.mu.Unlock() + + ch := make(chan T) + bc.destinations = append(bc.destinations, ch) + return ch +} + +// Wait for the Broadcast to see that source is closed and to close the +// destinations. +func (bc *Broadcast[T]) Wait() { + bc.wg.Wait() +} -- cgit v1.2.3