/
monitor.go
219 lines (198 loc) · 4.99 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package monitor
import (
"context"
"fmt"
"sort"
"sync"
"time"
)
// Monitor records events that have occurred in memory and can also periodically
// sample results.
type Monitor struct {
interval time.Duration
samplers []SamplerFunc
lock sync.Mutex
events []*Event
samples []*sample
}
// NewMonitor creates a monitor with the default sampling interval.
func NewMonitor() *Monitor {
return &Monitor{
interval: 15 * time.Second,
}
}
var _ Interface = &Monitor{}
// StartSampling starts sampling every interval until the provided context is done.
// A sample is captured when the context is closed.
func (m *Monitor) StartSampling(ctx context.Context) {
if m.interval == 0 {
return
}
go func() {
ticker := time.NewTicker(m.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-ctx.Done():
m.sample()
return
}
m.sample()
}
}()
}
// AddSampler adds a sampler function to the list of samplers to run every interval.
// Conditions discovered this way are recorded with a start and end time if they persist
// across multiple sampling intervals.
func (m *Monitor) AddSampler(fn SamplerFunc) {
m.lock.Lock()
defer m.lock.Unlock()
m.samplers = append(m.samplers, fn)
}
// Record captures one or more conditions at the current time. All conditions are recorded
// in monotonic order as Event objects.
func (m *Monitor) Record(conditions ...Condition) {
if len(conditions) == 0 {
return
}
m.lock.Lock()
defer m.lock.Unlock()
t := time.Now().UTC()
for _, condition := range conditions {
m.events = append(m.events, &Event{
At: t,
Condition: condition,
})
}
}
func (m *Monitor) sample() {
m.lock.Lock()
samplers := m.samplers
m.lock.Unlock()
now := time.Now().UTC()
var conditions []*Condition
for _, fn := range samplers {
conditions = append(conditions, fn(now)...)
}
if len(conditions) == 0 {
return
}
m.lock.Lock()
defer m.lock.Unlock()
t := time.Now().UTC()
m.samples = append(m.samples, &sample{
at: t,
conditions: conditions,
})
}
func (m *Monitor) snapshot() ([]*sample, []*Event) {
m.lock.Lock()
defer m.lock.Unlock()
return m.samples, m.events
}
// Conditions returns all conditions that were sampled in the interval
// between from and to. If that does not include a sample interval, no
// results will be returned. EventIntervals are returned in order of
// their first sampling. A condition that was only sampled once is
// returned with from == to. No duplicate conditions are returned
// unless a sampling interval did not report that value.
func (m *Monitor) Conditions(from, to time.Time) EventIntervals {
samples, _ := m.snapshot()
return filterSamples(samples, from, to)
}
// Events returns all events that occur between from and to, including
// any sampled conditions that were encountered during that period.
// EventIntervals are returned in order of their occurrence.
func (m *Monitor) Events(from, to time.Time) EventIntervals {
samples, events := m.snapshot()
intervals := filterSamples(samples, from, to)
events = filterEvents(events, from, to)
// merge the two sets of inputs
mustSort := len(intervals) > 0
for i := range events {
if i > 0 && events[i-1].At.After(events[i].At) {
fmt.Printf("ERROR: event %d out of order\n %#v\n %#v\n", i, events[i-1], events[i])
}
at := events[i].At
condition := &events[i].Condition
intervals = append(intervals, &EventInterval{
From: at,
To: at,
Condition: condition,
})
}
if mustSort {
sort.Sort(intervals)
}
return intervals
}
func filterSamples(samples []*sample, from, to time.Time) EventIntervals {
if len(samples) == 0 {
return nil
}
if !from.IsZero() {
first := sort.Search(len(samples), func(i int) bool {
return samples[i].at.After(from)
})
if first == -1 {
return nil
}
samples = samples[first:]
}
if !to.IsZero() {
for i, sample := range samples {
if sample.at.After(to) {
samples = samples[:i]
break
}
}
}
if len(samples) == 0 {
return nil
}
intervals := make(EventIntervals, 0, len(samples)*2)
last, next := make(map[Condition]*EventInterval), make(map[Condition]*EventInterval)
for _, sample := range samples {
for _, condition := range sample.conditions {
interval, ok := last[*condition]
if ok {
interval.To = sample.at
next[*condition] = interval
continue
}
interval = &EventInterval{
Condition: condition,
From: sample.at,
To: sample.at,
}
next[*condition] = interval
intervals = append(intervals, interval)
}
for k := range last {
delete(last, k)
}
last, next = next, last
}
return intervals
}
func filterEvents(events []*Event, from, to time.Time) []*Event {
if from.IsZero() && to.IsZero() {
return events
}
first := sort.Search(len(events), func(i int) bool {
return events[i].At.After(from)
})
if first == -1 {
return nil
}
if to.IsZero() {
return events[first:]
}
for i := first; i < len(events); i++ {
if events[i].At.After(to) {
return events[first:i]
}
}
return events[first:]
}