Skip to content

Commit

Permalink
Handle errors when processing csv
Browse files Browse the repository at this point in the history
Signed-off-by: aswinkarthik <[email protected]>
  • Loading branch information
aswinkarthik committed Feb 16, 2019
1 parent aca0154 commit 239708f
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 31 deletions.
9 changes: 8 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,16 @@ Most suitable for csv files created from database tables`,
baseConfig := digest.NewConfig(baseFile, config.GetPrimaryKeys(), config.GetValueColumns())
deltaConfig := digest.NewConfig(deltaFile, config.GetPrimaryKeys(), config.GetValueColumns())

diff := digest.Diff(baseConfig, deltaConfig)
diff, err := digest.Diff(baseConfig, deltaConfig)

if err != nil {
fmt.Fprintf(os.Stderr, "csvdiff failed: %v\n", err)
os.Exit(2)
}

config.Formatter().Format(diff, os.Stdout)

return
},
}

Expand Down
32 changes: 26 additions & 6 deletions pkg/digest/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package digest

import (
"encoding/csv"
"fmt"
"runtime"
"strings"
"sync"
Expand All @@ -27,16 +28,22 @@ type diffMessage struct {
}

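// Diff compares the delta config against the base config and reports the additions and modifications.
// It returns an error if the base file or the delta file cannot be processed.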
func Diff(baseConfig, deltaConfig *Config) Difference {
func Diff(baseConfig, deltaConfig *Config) (Difference, error) {
maxProcs := runtime.NumCPU()
base := Create(baseConfig)
base, err := Create(baseConfig)

if err != nil {
return Difference{}, fmt.Errorf("error in base file: %v", err)
}

additions := make([]string, 0, len(base))
modifications := make([]string, 0, len(base))

messageChan := make(chan []diffMessage, bufferSize*maxProcs)
errorChannel := make(chan error)
defer close(errorChannel)

go readAndCompare(base, deltaConfig, messageChan)
go readAndCompare(base, deltaConfig, messageChan, errorChannel)

for msgs := range messageChan {
for _, msg := range msgs {
Expand All @@ -48,14 +55,26 @@ func Diff(baseConfig, deltaConfig *Config) Difference {
}
}

return Difference{Additions: additions, Modifications: modifications}
if err := <-errorChannel; err != nil {
return Difference{}, fmt.Errorf("error in delta file: %v", err)
}

return Difference{Additions: additions, Modifications: modifications}, nil
}

func readAndCompare(base map[uint64]uint64, config *Config, msgChannel chan<- []diffMessage) {
func readAndCompare(base map[uint64]uint64, config *Config, msgChannel chan<- []diffMessage, errorChannel chan<- error) {
reader := csv.NewReader(config.Reader)
var wg sync.WaitGroup
for {
lines, eofReached := getNextNLines(reader)
lines, eofReached, err := getNextNLines(reader)

if err != nil {
wg.Wait()
close(msgChannel)
errorChannel <- err
return
}

wg.Add(1)
go compareDigestForNLines(base, lines, config, msgChannel, &wg)

Expand All @@ -65,6 +84,7 @@ func readAndCompare(base map[uint64]uint64, config *Config, msgChannel chan<- []
}
wg.Wait()
close(msgChannel)
errorChannel <- nil
}

func compareDigestForNLines(base map[uint64]uint64,
Expand Down
3 changes: 2 additions & 1 deletion pkg/digest/compare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ func TestDiff(t *testing.T) {
},
}

actual := digest.Diff(baseConfig, deltaConfig)
actual, err := digest.Diff(baseConfig, deltaConfig)

assert.NoError(t, err)
assert.ElementsMatch(t, expected.Modifications, actual.Modifications)
assert.ElementsMatch(t, expected.Additions, actual.Additions)
}
24 changes: 19 additions & 5 deletions pkg/digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,42 @@ const bufferSize = 512
// Create builds a digest of the CSV data read from config.Reader.
// It returns the digest as a map[uint64]uint64, keyed by each Digest's Key
// with the Digest's Value as the map value.
// It can also keep track of the Source line.
// If reading the CSV fails, Create returns a nil map together with the error
// reported by readAndProcess.
func Create(config *Config) map[uint64]uint64 {
func Create(config *Config) (map[uint64]uint64, error) {
maxProcs := runtime.NumCPU()
reader := csv.NewReader(config.Reader)

output := make(map[uint64]uint64)

// Buffered so that up to bufferSize*maxProcs batches can be queued before a send blocks.
digestChannel := make(chan []Digest, bufferSize*maxProcs)
// Unbuffered: carries the single final status (nil or an error) sent by readAndProcess.
errorChannel := make(chan error)
defer close(errorChannel)

// Read the CSV in the background; batches of digests arrive on digestChannel.
go readAndProcess(config, reader, digestChannel)
go readAndProcess(config, reader, digestChannel, errorChannel)

// Drain batches until readAndProcess closes digestChannel.
for digests := range digestChannel {
for _, digest := range digests {
output[digest.Key] = digest.Value
}
}

return output
// readAndProcess sends its status only after closing digestChannel,
// so this receive always follows the loop above.
if err := <-errorChannel; err != nil {
return nil, err
}

return output, nil
}

func readAndProcess(config *Config, reader *csv.Reader, digestChannel chan<- []Digest) {
func readAndProcess(config *Config, reader *csv.Reader, digestChannel chan<- []Digest, errorChannel chan<- error) {
var wg sync.WaitGroup
for {
lines, eofReached := getNextNLines(reader)
lines, eofReached, err := getNextNLines(reader)
if err != nil {
wg.Wait()
close(digestChannel)
errorChannel <- err
return
}

wg.Add(1)
go createDigestForNLines(lines, config, digestChannel, &wg)

Expand All @@ -83,6 +96,7 @@ func readAndProcess(config *Config, reader *csv.Reader, digestChannel chan<- []D
}
wg.Wait()
close(digestChannel)
errorChannel <- nil
}

func createDigestForNLines(lines [][]string,
Expand Down
54 changes: 39 additions & 15 deletions pkg/digest/digest_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package digest_test

import (
"encoding/csv"
"strings"
"testing"

Expand Down Expand Up @@ -32,25 +33,48 @@ func TestDigestForFile(t *testing.T) {
secondDigest := xxhash.Sum64String(secondLine)
saturdayDigest := xxhash.Sum64String("saturday")

testConfig := &digest.Config{
Reader: strings.NewReader(firstLine + "\n" + secondLine),
Key: []int{0},
}
t.Run("should create digest for given key and all values", func(t *testing.T) {
testConfig := &digest.Config{
Reader: strings.NewReader(firstLine + "\n" + secondLine),
Key: []int{0},
}

actualDigest := digest.Create(testConfig)
actualDigest, err := digest.Create(testConfig)

expectedDigest := map[uint64]uint64{firstKey: firstDigest, secondKey: secondDigest}
expectedDigest := map[uint64]uint64{firstKey: firstDigest, secondKey: secondDigest}

assert.Equal(t, expectedDigest, actualDigest)
assert.NoError(t, err)
assert.Equal(t, expectedDigest, actualDigest)
})

testConfig = &digest.Config{
Reader: strings.NewReader(firstLine + "\n" + secondLine),
Key: []int{0},
Value: []int{3},
}
t.Run("should create digest for given key and given values", func(t *testing.T) {
testConfig := &digest.Config{
Reader: strings.NewReader(firstLine + "\n" + secondLine),
Key: []int{0},
Value: []int{3},
}

actualDigest = digest.Create(testConfig)
expectedDigest = map[uint64]uint64{firstKey: fridayDigest, secondKey: saturdayDigest}
actualDigest, err := digest.Create(testConfig)
expectedDigest := map[uint64]uint64{firstKey: fridayDigest, secondKey: saturdayDigest}

assert.Equal(t, expectedDigest, actualDigest)
assert.NoError(t, err)
assert.Equal(t, expectedDigest, actualDigest)
})

t.Run("should return ParseError if csv reading fails", func(t *testing.T) {
testConfig := &digest.Config{
Reader: strings.NewReader(firstLine + "\n" + "some-random-line"),
Key: []int{0},
Value: []int{3},
}

actualDigest, err := digest.Create(testConfig)

assert.Error(t, err)

_, isParseError := err.(*csv.ParseError)

assert.True(t, isParseError)
assert.Nil(t, actualDigest)
})
}
7 changes: 4 additions & 3 deletions pkg/digest/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"io"
)

func getNextNLines(reader *csv.Reader) ([][]string, bool) {
func getNextNLines(reader *csv.Reader) ([][]string, bool, error) {
lines := make([][]string, bufferSize)

lineCount := 0
Expand All @@ -18,9 +18,10 @@ func getNextNLines(reader *csv.Reader) ([][]string, bool) {
eofReached = true
break
}
panic(err)

return nil, true, err
}
}

return lines[:lineCount], eofReached
return lines[:lineCount], eofReached, nil
}
57 changes: 57 additions & 0 deletions pkg/digest/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package digest

import (
"encoding/csv"
"fmt"
"strconv"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

// TestGetNextNLines checks that getNextNLines returns the CSV in batches of
// bufferSize records, reports when the end of input was reached, and returns
// an error for input that is not valid CSV.
func TestGetNextNLines(t *testing.T) {
	t.Run("should get given number of lines from csv", func(t *testing.T) {
		const totalLines = 1000

		// Generate totalLines records whose first column is the record index.
		var input strings.Builder
		for row := 0; row < totalLines; row++ {
			fmt.Fprintf(&input, "%d,random-col-1,random-col-2\n", row)
		}
		reader := csv.NewReader(strings.NewReader(input.String()))

		// First call: a full batch, with more input still pending.
		batch, done, err := getNextNLines(reader)

		assert.Len(t, batch, bufferSize)
		assert.False(t, done)
		assert.NoError(t, err)

		for idx := 0; idx < bufferSize; idx++ {
			want := []string{strconv.Itoa(idx), "random-col-1", "random-col-2"}
			assert.Equal(t, want, batch[idx])
		}

		// Second call: the remaining records, and the end of input is reported.
		batch, done, err = getNextNLines(reader)
		remaining := totalLines - bufferSize

		assert.Len(t, batch, remaining)
		assert.True(t, done)
		assert.NoError(t, err)

		for idx := 0; idx < remaining; idx++ {
			want := []string{strconv.Itoa(bufferSize + idx), "random-col-1", "random-col-2"}
			assert.Equal(t, want, batch[idx])
		}
	})

	t.Run("should throw error if not a valid csv", func(t *testing.T) {
		// The third record has a single field while the others have three.
		malformed := `1,2,3
4,5,6
random-stuff
7,8,9`
		reader := csv.NewReader(strings.NewReader(malformed))

		_, _, err := getNextNLines(reader)

		assert.Error(t, err)
	})
}

0 comments on commit 239708f

Please sign in to comment.