/
alerts.go
323 lines (281 loc) · 7.73 KB
/
alerts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
//go:generate mockgen -package=mockalerts -destination=mock/alerts.mock.go github.com/libopenstorage/openstorage/alerts FilterDeleter
package alerts
import (
"encoding/json"
"fmt"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/libopenstorage/openstorage/api"
"github.com/portworx/kvdb"
"github.com/sirupsen/logrus"
)
// Error is a string-backed error type. Being a string, it allows package
// errors to be declared as constants.
type Error string

// Error implements the standard library error interface by returning the
// underlying string.
func (e Error) Error() string {
	msg := string(e)
	return msg
}

// Tag returns a new Error made of tag, a colon and the receiver, in that
// order. It is helpful for attaching context to an error.
func (e Error) Tag(tag Error) Error {
	return tag + ":" + e
}
// kvdbKey is the first path element of every key built by getKey.
const kvdbKey = "alerts"

// Error constants used by this package.
const (
	typeAssertionError   = Error("type assertion error")
	invalidFilterType    = Error("invalid filter type")
	invalidOptionType    = Error("invalid option type")
	incorrectFilterValue = Error("incorrectly set filter value")
)
// Time spans expressed in seconds. HalfDay is the default ttl assigned by
// newManager.
const (
	// Day is the number of seconds in 24 hours.
	Day = 24 * 60 * 60
	// HalfDay is the number of seconds in 12 hours.
	HalfDay = Day / 2
	// FiveDays is the number of seconds in five days.
	FiveDays = 5 * Day
)
// Manager manages alerts: it raises them, lists, filters and deletes them,
// and maintains the rules that are run on alert events.
type Manager interface {
// FilterDeleter provides the Enumerate, Filter and Delete operations on alerts.
FilterDeleter
// Raise raises an alert.
Raise(alert *api.Alert) error
// SetRules sets a set of rules to be performed on alert events.
SetRules(rules ...Rule)
// DeleteRules deletes the given rules.
DeleteRules(rules ...Rule)
}
// FilterDeleter defines a list, filter and delete interface on alerts.
// This interface is used in SDK.
type FilterDeleter interface {
// Enumerate lists all alerts filtered by a variadic list of filters.
// It fetches a superset: every returned alert is matched by at least one
// of the filters.
Enumerate(filters ...Filter) ([]*api.Alert, error)
// Filter passes the given list of alerts through each filter in turn, so a
// returned alert is one that was matched by every filter.
Filter(alerts []*api.Alert, filters ...Filter) ([]*api.Alert, error)
// Delete deletes the alerts selected by the given filters.
Delete(filters ...Filter) error
}
// newManager builds a manager with an empty rule set and a default ttl of
// HalfDay, then applies the supplied options.
//
// Only options of type ttlOption are acted upon: the option value must be a
// uint64, otherwise typeAssertionError is returned. Options of any other type
// are ignored.
func newManager(options ...Option) (*manager, error) {
	mgr := &manager{
		rules: make(map[string]Rule),
		ttl:   HalfDay,
	}

	for _, opt := range options {
		if opt.GetType() != ttlOption {
			continue
		}
		ttl, isUint64 := opt.GetValue().(uint64)
		if !isUint64 {
			return nil, typeAssertionError
		}
		mgr.ttl = ttl
	}

	return mgr, nil
}
// manager implements Manager interface.
type manager struct {
// rules holds the registered rules, keyed by rule name.
rules map[string]Rule
// ttl is the time to live, in seconds, used when a cleared alert is stored.
ttl uint64
// Mutex is held by SetRules and DeleteRules while they modify rules.
sync.Mutex
}
// getKey is a util func that constructs the kvdb key of an alert.
// The kvdb tree structure is set up as follows:
// <baseKey>/<resourceType>/<dec2hex(alertType)>/<resourceID>/<alertObject>
//
// resourceType and resourceID are used verbatim and alertType is rendered in
// base 16. The joined path is cleaned by filepath.Join, so empty elements are
// dropped. The result is normalized to forward slashes because kvdb keys are
// slash-separated irrespective of the host operating system's path separator.
func getKey(resourceType string, alertType int64, resourceID string) string {
	hexType := strconv.FormatInt(alertType, 16)
	return filepath.ToSlash(filepath.Join(kvdbKey, resourceType, hexType, resourceID))
}
func (m *manager) Raise(alert *api.Alert) error {
for _, rule := range m.rules {
if rule.GetEvent() == raiseEvent {
match, err := rule.GetFilter().Match(alert)
if err != nil {
return err
}
if match {
if err := rule.GetAction().Run(m); err != nil {
return err
}
}
}
}
if alert.Timestamp == nil {
alert.Timestamp = ×tamp.Timestamp{Seconds: time.Now().Unix()}
}
key := getKey(alert.Resource.String(), alert.GetAlertType(), alert.ResourceId)
if kvdb.Instance() == nil {
return fmt.Errorf("kvdb instance is not set")
}
if _, err := kvdb.Instance().Delete(key); err != nil && err != kvdb.ErrNotFound {
logrus.WithField("pkg", "openstorage/alerts").WithField("func", "Raise").Error(err)
}
// ttl is time to live. it indicates how long (in seconds) the object should live inside kvdb backend.
// kvdb will delete the object once ttl elapses.
if alert.Cleared {
// if the alert is marked Cleared, it is pushed to kvdb with a ttlOption of half day
_, err := kvdb.Instance().Put(key, alert, m.ttl)
return err
} else {
// otherwise use the ttl value embedded in the alert object
_, err := kvdb.Instance().Put(key, alert, alert.Ttl)
return err
}
}
// Enumerate returns the alerts stored in kvdb that are matched by at least
// one of the given filters, or every fetched alert when no filter is given.
//
// The filters are first reduced to a set of unique kvdb keys by
// getUniqueKeysFromFilters. Each key is then enumerated, every value found is
// decoded from JSON into an alert, and the alert is kept as soon as one
// filter matches it. The first error encountered aborts the call.
func (m *manager) Enumerate(filters ...Filter) ([]*api.Alert, error) {
	result := []*api.Alert{}

	prefixes, err := getUniqueKeysFromFilters(filters...)
	if err != nil {
		return nil, err
	}

	if kvdb.Instance() == nil {
		return nil, fmt.Errorf("kvdb instance is not set")
	}

	// accepted reports whether an alert belongs in the result: always when
	// there are no filters, otherwise when any single filter matches. It stops
	// at the first match to avoid further checks.
	accepted := func(candidate *api.Alert) (bool, error) {
		if len(filters) == 0 {
			return true, nil
		}
		for _, f := range filters {
			ok, err := f.Match(candidate)
			if err != nil {
				return false, err
			}
			if ok {
				return true, nil
			}
		}
		return false, nil
	}

	// enumerate for unique keys
	for prefix := range prefixes {
		pairs, err := enumerate(kvdb.Instance(), prefix)
		if err != nil {
			return nil, err
		}
		for _, pair := range pairs {
			decoded := new(api.Alert)
			if err := json.Unmarshal(pair.Value, decoded); err != nil {
				return nil, err
			}
			keep, err := accepted(decoded)
			if err != nil {
				return nil, err
			}
			if keep {
				result = append(result, decoded)
			}
		}
	}

	return result, nil
}
// enumerate recursively fetches the kv pairs found under key.
// The recursive call is required since, unlike mem kv, an etcd or consul based
// kv will not return leaf objects if the key is a prefix referencing only
// higher level paths. For instance, if the kvdb structure is as follows:
// a/b/c/<data>
// a/B/C/<data>
// then enumerating for keys using "a" will only return "b" and "B".
//
// A returned pair with an empty value is treated as such a higher level path
// and is enumerated in turn; pairs with a value are collected. Pairs found
// directly under key come first in the result, followed by the pairs of each
// nested path.
func enumerate(kv kvdb.Kvdb, key string) (kvdb.KVPairs, error) {
	children, err := kv.Enumerate(key)
	if err != nil {
		return nil, err
	}

	var (
		leaves   kvdb.KVPairs
		subtrees []string
	)
	for _, child := range children {
		if len(child.Value) != 0 {
			leaves = append(leaves, child)
		} else {
			subtrees = append(subtrees, child.Key)
		}
	}

	for _, subtree := range subtrees {
		nested, err := enumerate(kv, subtree)
		if err != nil {
			return nil, err
		}
		leaves = append(leaves, nested...)
	}

	return leaves, nil
}
// Filter passes alerts successively through each filter and returns the
// alerts that were matched by every one of them, in their original order.
//
// With no filters, or with no alerts, the input slice is returned as is.
// Otherwise filtering is done on a private copy, so the caller's slice is
// never modified. The first error returned by a filter aborts the call and
// is returned together with a nil slice.
func (m *manager) Filter(alerts []*api.Alert, filters ...Filter) ([]*api.Alert, error) {
	if len(filters) == 0 || len(alerts) == 0 {
		return alerts, nil
	}

	// Compacting the caller's slice in place would overwrite elements of its
	// backing array, so compact a copy instead.
	kept := make([]*api.Alert, len(alerts))
	copy(kept, alerts)

	for _, filter := range filters {
		n := 0
		for _, alert := range kept {
			match, err := filter.Match(alert)
			if err != nil {
				return nil, err
			}
			if match {
				kept[n] = alert
				n++
			}
		}
		// shrink the list
		kept = kept[:n]
		if len(kept) == 0 {
			// nothing left for the remaining filters to examine
			break
		}
	}

	return kept, nil
}
// Delete removes from kvdb the alerts selected by filters.
//
// The action of every rule registered for deleteEvent is run first, with the
// manager as argument; the first action error aborts the call.
//
// If none of the filters is of a type listed in the switch below, the filters
// are treated as index based: they are reduced to unique kvdb keys by
// getUniqueKeysFromFilters and each key is removed with DeleteTree. Otherwise
// the matching alerts are fetched with Enumerate and deleted one by one.
//
// An error is returned when a rule action fails, when the kvdb instance is
// not set, or when key resolution, enumeration or a kvdb delete fails.
func (m *manager) Delete(filters ...Filter) error {
	// SetRules and DeleteRules modify m.rules under the lock, so take a
	// snapshot under the same lock instead of ranging over the live map. The
	// lock is released before actions run because they receive the manager and
	// may call back into it.
	m.Lock()
	rules := make([]Rule, 0, len(m.rules))
	for _, rule := range m.rules {
		rules = append(rules, rule)
	}
	m.Unlock()

	for _, rule := range rules {
		if rule.GetEvent() != deleteEvent {
			continue
		}
		if err := rule.GetAction().Run(m); err != nil {
			return err
		}
	}

	allFiltersIndexBased := true
	for _, filter := range filters {
		switch filter.GetFilterType() {
		case CustomFilter,
			timeSpanFilter,
			alertTypeFilter,
			countSpanFilter,
			minSeverityFilter,
			flagCheckFilter,
			matchAlertTypeFilter,
			resourceIDFilter,
			matchResourceIDFilter:
			allFiltersIndexBased = false
		}
		if !allFiltersIndexBased {
			// one such filter is enough to rule out the index based path
			break
		}
	}

	kv := kvdb.Instance()
	if kv == nil {
		return fmt.Errorf("kvdb instance is not set")
	}

	if allFiltersIndexBased {
		keys, err := getUniqueKeysFromFilters(filters...)
		if err != nil {
			return err
		}
		for key := range keys {
			if err := kv.DeleteTree(key); err != nil {
				return err
			}
		}
		return nil
	}

	myAlerts, err := m.Enumerate(filters...)
	if err != nil {
		return err
	}
	for _, alert := range myAlerts {
		key := getKey(alert.Resource.String(), alert.GetAlertType(), alert.ResourceId)
		// An alert can disappear between Enumerate and this call (for example
		// when its ttl elapses). It is then already gone, which is the desired
		// outcome, so kvdb.ErrNotFound is not treated as a failure; Raise
		// handles the same error the same way.
		if _, err := kv.Delete(key); err != nil && err != kvdb.ErrNotFound {
			return err
		}
	}
	return nil
}
// SetRules registers the given rules under their names while holding the
// manager lock. A rule whose name is already registered replaces the
// existing entry.
func (m *manager) SetRules(rules ...Rule) {
	m.Lock()
	defer m.Unlock()

	for i := range rules {
		r := rules[i]
		m.rules[r.GetName()] = r
	}
}
// DeleteRules removes the given rules, looked up by name, while holding the
// manager lock. Names that are not registered are ignored.
func (m *manager) DeleteRules(rules ...Rule) {
	m.Lock()
	defer m.Unlock()

	for i := range rules {
		name := rules[i].GetName()
		delete(m.rules, name)
	}
}