blob: 407dcd75f98090243014f6b638ec328573cc82c9 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
package main
import (
"time"
)
type Record[T any] struct {
put chan<- T
get chan<- chan Entry[T]
getRecent chan<- chan Entry[T]
}
type Entry[T any] struct {
t time.Time
v T
}
// Create a record with the specified capacity.
// If the capacity is exceeded, old entires will be discarded and new ones kept.
func newRecord[T any](capacity int) Record[T] {
put := make(chan T)
get := make(chan chan Entry[T])
getRecent := make(chan chan Entry[T])
go func() {
entries := make([]Entry[T], 0, capacity)
for {
select {
case v, ok := <-put:
if !ok {
return
}
entries = append(entries, Entry[T]{time.Now(), v})
if len(entries) > capacity {
entries = entries[1:]
}
case c, ok := <-get:
if !ok {
return
}
for _, e := range entries {
c <- e
}
close(c)
case c, ok := <-getRecent:
if !ok {
return
}
if len(entries) > 0 {
c <- entries[len(entries)-1]
}
close(c)
}
}
}()
return Record[T]{put, get, getRecent}
}
func (l Record[T]) Close() {
close(l.put)
close(l.get)
close(l.getRecent)
}
|