This repository has been archived by the owner on Jun 1, 2023. It is now read-only.
/
stream_rule.go
136 lines (123 loc) · 3.8 KB
/
stream_rule.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package logic
import (
"fmt"
log "github.com/sirupsen/logrus"
"github.com/suzuki-shunsuke/go-graylog"
"github.com/suzuki-shunsuke/go-graylog/validator"
)
// HasStreamRule returns whether the stream sule exists.
func (lgc *Logic) HasStreamRule(streamID, streamRuleID string) (bool, error) {
return lgc.store.HasStreamRule(streamID, streamRuleID)
}
// AddStreamRule adds a stream rule to the Server.
func (lgc *Logic) AddStreamRule(rule *graylog.StreamRule) (int, error) {
if err := validator.CreateValidator.Struct(rule); err != nil {
return 400, err
}
s, sc, err := lgc.GetStream(rule.StreamID)
if err != nil {
lgc.Logger().WithFields(log.Fields{
"error": err, "id": rule.StreamID, "sc": sc,
}).Warn("failed to get a stream")
return sc, err
}
if s.IsDefault {
return 400, fmt.Errorf("cannot add stream rules to the default stream")
}
if err := lgc.store.AddStreamRule(rule); err != nil {
return 500, err
}
return 201, nil
}
// UpdateStreamRule updates a stream rule of the Server.
func (lgc *Logic) UpdateStreamRule(prms *graylog.StreamRuleUpdateParams) (int, error) {
// PUT /streams/{streamid}/rules/{streamRuleID} Update a stream rule
if err := validator.UpdateValidator.Struct(prms); err != nil {
return 400, err
}
ok, err := lgc.HasStreamRule(prms.StreamID, prms.ID)
if err != nil {
return 500, err
}
if !ok {
return 404, fmt.Errorf("no stream rule is not found: <%s>", prms.StreamID)
}
if err := lgc.store.UpdateStreamRule(prms); err != nil {
return 500, err
}
return 204, nil
}
// DeleteStreamRule deletes a stream rule from the Server.
func (lgc *Logic) DeleteStreamRule(streamID, streamRuleID string) (int, error) {
ok, err := lgc.HasStream(streamID)
if err != nil {
lgc.Logger().WithFields(log.Fields{
"error": err, "id": streamID,
}).Error("lgc.HasStream() is failure")
return 500, err
}
if !ok {
return 404, fmt.Errorf("no stream found with id <%s>", streamID)
}
ok, err = lgc.HasStreamRule(streamID, streamRuleID)
if err != nil {
lgc.Logger().WithFields(log.Fields{
"error": err, "streamID": streamID, "streamRuleID": streamRuleID,
}).Error("lgc.HasStreamRule() is failure")
return 500, err
}
if !ok {
return 404, fmt.Errorf("no stream rule found with id <%s>", streamRuleID)
}
if err := lgc.store.DeleteStreamRule(streamID, streamRuleID); err != nil {
return 500, err
}
return 200, nil
}
// GetStreamRules returns a list of all stream rules of a given stream.
func (lgc *Logic) GetStreamRules(streamID string) ([]graylog.StreamRule, int, int, error) {
if err := ValidateObjectID(streamID); err != nil {
// unfortunately graylog returns not 400 but 404.
return nil, 0, 404, err
}
ok, err := lgc.HasStream(streamID)
if err != nil {
return nil, 0, 500, err
}
if !ok {
return nil, 0, 404, fmt.Errorf("no stream is not found: <%s>", streamID)
}
rules, total, err := lgc.store.GetStreamRules(streamID)
if err != nil {
return nil, 0, 500, err
}
return rules, total, 200, nil
}
// GetStreamRule returns a stream rule.
func (lgc *Logic) GetStreamRule(streamID, streamRuleID string) (*graylog.StreamRule, int, error) {
ok, err := lgc.HasStream(streamID)
if err != nil {
lgc.Logger().WithFields(log.Fields{
"error": err, "id": streamID,
}).Error("lgc.HasStream() is failure")
return nil, 500, err
}
if !ok {
return nil, 404, fmt.Errorf("no stream found with id <%s>", streamID)
}
ok, err = lgc.HasStreamRule(streamID, streamRuleID)
if err != nil {
lgc.Logger().WithFields(log.Fields{
"error": err, "streamID": streamID, "streamRuleID": streamRuleID,
}).Error("lgc.HasStreamRule() is failure")
return nil, 500, err
}
if !ok {
return nil, 404, fmt.Errorf("no stream rule found with id <%s>", streamRuleID)
}
rule, err := lgc.store.GetStreamRule(streamID, streamRuleID)
if err != nil {
return rule, 500, err
}
return rule, 200, nil
}