Skip to content

Commit

Permalink
Merge pull request #15 from fujiwara/discard-no-warning
Browse files Browse the repository at this point in the history
Do not raise warnings by discarded or test events.
  • Loading branch information
fujiwara committed Mar 30, 2021
2 parents e076044 + d8b7f22 commit dfe0938
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 15 deletions.
19 changes: 17 additions & 2 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ import (

func ParseEvent(b []byte) (Event, error) {
var e Event
err := json.Unmarshal(b, &e)
if err := json.Unmarshal(b, &e); err != nil {
return e, err
}
if len(e.Records) == 0 && e.IsTestEvent() {
return e, nil
}
for _, r := range e.Records {
if !strings.Contains(r.S3.Object.Key, "%") {
continue
Expand All @@ -18,14 +23,24 @@ func ParseEvent(b []byte) (Event, error) {
r.S3.Object.Key = _key
}
}
return e, err
return e, nil
}

type Event struct {
Records []*EventRecord `json:"Records"`
Event string
Bucket string
}

func (e Event) IsTestEvent() bool {
return e.Event == "s3:TestEvent"
}

func (e Event) String() string {
if e.IsTestEvent() {
return fmt.Sprintf("%s for %s", e.Event, e.Bucket)
}

s := make([]string, len(e.Records))
for i, r := range e.Records {
s[i] = r.String()
Expand Down
25 changes: 25 additions & 0 deletions event_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package rin_test

import (
"io/ioutil"
"os"
"testing"

rin "github.com/fujiwara/Rin"
Expand Down Expand Up @@ -50,6 +52,9 @@ func TestParseEvent(t *testing.T) {
if err != nil {
t.Error("json decode error", err)
}
if event.IsTestEvent() {
t.Error("must not be a test event")
}
r := event.Records[0]
if r.EventName != "ObjectCreated:Put" {
t.Error("unexpected EventName", r.EventName)
Expand All @@ -67,3 +72,23 @@ func TestParseEvent(t *testing.T) {
t.Error("unexpected key", r.S3.Object.Key)
}
}

func TestParseTestEvent(t *testing.T) {
f, err := os.Open("test/testevent.json")
if err != nil {
t.Error(err)
}
b, _ := ioutil.ReadAll(f)
f.Close()

event, err := rin.ParseEvent(b)
if err != nil {
t.Error("json decode error", err)
}
if !event.IsTestEvent() {
t.Errorf("not a test event %s", string(b))
}
if event.String() != "s3:TestEvent for example-bucket" {
t.Errorf("unexpected string %s", event.String())
}
}
9 changes: 5 additions & 4 deletions redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,28 @@ var (
)

func Import(event Event) (int, error) {
imported := 0
var processed int
for _, record := range event.Records {
TARGETS:
for _, target := range config.Targets {
if ok, cap := target.MatchEventRecord(record); ok {
if target.Discard {
processed++
break TARGETS
}
err := ImportRedshift(target, record, cap)
if err != nil {
return imported, err
return processed, err
} else {
imported++
processed++
}
if target.Break {
break TARGETS
}
}
}
}
return imported, nil
return processed, nil
}

func ConnectToRedshift(target *Target) (*sql.DB, error) {
Expand Down
22 changes: 13 additions & 9 deletions rin.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,20 @@ func handleMessage(ctx context.Context, svc *sqs.SQS, queueUrl *string) error {
log.Printf("[error] [%s] Can't parse event from Body. %s", msgId, err)
return err
}
log.Printf("[info] [%s] Importing event: %s", msgId, event)
n, err := Import(event)
if err != nil {
log.Printf("[error] [%s] Import failed. %s", msgId, err)
return err
}
if n == 0 {
log.Printf("[warn] [%s] All events were not matched for any targets. Ignored.", msgId)
if event.IsTestEvent() {
log.Printf("[info] [%s] Skipping %s", msgId, event.String())
} else {
log.Printf("[info] [%s] %d import action completed.", msgId, n)
log.Printf("[info] [%s] Importing event: %s", msgId, event)
n, err := Import(event)
if err != nil {
log.Printf("[error] [%s] Import failed. %s", msgId, err)
return err
}
if n == 0 {
log.Printf("[warn] [%s] All events were not matched for any targets. Ignored.", msgId)
} else {
log.Printf("[info] [%s] %d actions completed.", msgId, n)
}
}
_, err = svc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: queueUrl,
Expand Down
8 changes: 8 additions & 0 deletions test/testevent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"Service": "Amazon S3",
"Event": "s3:TestEvent",
"Time": "2021-03-30T05:58:18.184Z",
"Bucket": "example-bucket",
"RequestId": "C0CMH2QNHV3A1SGQ",
"HostId": "/9JSRrT+j26Hk56OjL/qeDZM76Otmaj3oY9hnTuD/kdyN+WN6vDli3q0LPJzUMRW5y/yVmTOlzQ="
}

0 comments on commit dfe0938

Please sign in to comment.