Skip to content

Commit

Permalink
Add new compare logic to not load all lines to memory
Browse files Browse the repository at this point in the history
  • Loading branch information
aswinkarthik committed Apr 29, 2018
1 parent 967ee52 commit 9775c17
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 33 deletions.
113 changes: 113 additions & 0 deletions pkg/digest/compare.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package digest

import (
"encoding/csv"
"io"
"runtime"
"strings"
"sync"
)

// Compare compares two Digest maps and returns the additions and modification
// keys as arrays.
func Compare(baseDigest, newDigest map[uint64]uint64) (additions []uint64, modifications []uint64) {
Expand All @@ -24,3 +32,108 @@ func Compare(baseDigest, newDigest map[uint64]uint64) (additions []uint64, modif
}
return additions[:additionCounter], modifications[:modificationCounter]
}

// Difference represents the additions and modifications
// between the two Configs
type Difference struct {
Additions []string
Modifications []string
}

type messageType int

const (
addition messageType = iota
modification messageType = iota
)

type diffMessage struct {
_type messageType
value string
}

// Diff will differentiate between two given configs
func Diff(baseConfig, deltaConfig *Config) Difference {
maxProcs := runtime.NumCPU()
base, _, _ := Create(baseConfig)

additions := make([]string, 0, len(base))
modifications := make([]string, 0, len(base))

messageChan := make(chan []diffMessage, bufferSize*maxProcs)

go readAndCompare(base, deltaConfig, messageChan)

for msgs := range messageChan {
for _, msg := range msgs {
if msg._type == addition {
additions = append(additions, msg.value)
} else if msg._type == modification {
modifications = append(modifications, msg.value)
}
}
}

return Difference{Additions: additions, Modifications: modifications}
}

func readAndCompare(base map[uint64]uint64, config *Config, msgChannel chan<- []diffMessage) {
reader := csv.NewReader(config.Reader)
eofReached := false
var wg sync.WaitGroup
for !eofReached {
lines := make([][]string, bufferSize)

lineCount := 0
for ; lineCount < bufferSize; lineCount++ {
line, err := reader.Read()
lines[lineCount] = line
if err != nil {
if err == io.EOF {
eofReached = true
break
}
return
}
}

wg.Add(1)
go compareDigestForNLines(base, lines[:lineCount], config, msgChannel, &wg)
}
wg.Wait()
close(msgChannel)
}

func compareDigestForNLines(base map[uint64]uint64,
lines [][]string,
config *Config,
msgChannel chan<- []diffMessage,
wg *sync.WaitGroup,
) {
output := make([]diffMessage, len(lines))
diffCounter := 0
for _, line := range lines {
digest := CreateDigest(line, config.Key, config.Value)
if baseValue, present := base[digest.Key]; present {
// Present in both base and delta
if baseValue != digest.Value {
// Modification
output[diffCounter] = diffMessage{
value: strings.Join(line, Separator),
_type: modification,
}
diffCounter++
}
} else {
// Not present in base. So Addition.
output[diffCounter] = diffMessage{
value: strings.Join(line, Separator),
_type: addition,
}
diffCounter++
}
}

msgChannel <- output[:diffCounter]
wg.Done()
}
47 changes: 45 additions & 2 deletions pkg/digest/compare_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package digest
package digest_test

import (
"strings"
"testing"

"github.com/aswinkarthik93/csvdiff/pkg/digest"
"github.com/stretchr/testify/assert"
)

Expand All @@ -20,11 +22,52 @@ func TestCompare(t *testing.T) {
10049141081086325814: 12259600610026582000,
}

additions, modifications := Compare(baseDigest, newDigest)
additions, modifications := digest.Compare(baseDigest, newDigest)

expectedAdditions := []uint64{10049141081086325814}
expectedModifications := []uint64{10000305084889337335}

assert.Equal(t, expectedAdditions, additions)
assert.Equal(t, expectedModifications, modifications)
}

func TestDiff(t *testing.T) {
base := `1,col-1,col-2,col-3,one-value
2,col-1,col-2,col-3,two-value
3,col-1,col-2,col-3,three-value
100,col-1,col-2,col-3,hundred-value
`

delta := `1,col-1,col-2,col-3,one-value
2,col-1,col-2,col-3,two-value-modified
4,col-1,col-2,col-3,four-value-added
100,col-1,col-2,col-3,hundred-value-modified
5,col-1,col-2,col-3,five-value-added
`

baseConfig := &digest.Config{
Reader: strings.NewReader(base),
Key: []int{0},
}

deltaConfig := &digest.Config{
Reader: strings.NewReader(delta),
Key: []int{0},
}

expected := digest.Difference{
Additions: []string{
"4,col-1,col-2,col-3,four-value-added",
"5,col-1,col-2,col-3,five-value-added",
},
Modifications: []string{
"2,col-1,col-2,col-3,two-value-modified",
"100,col-1,col-2,col-3,hundred-value-modified",
},
}

actual := digest.Diff(baseConfig, deltaConfig)

assert.ElementsMatch(t, expected.Modifications, actual.Modifications)
assert.ElementsMatch(t, expected.Additions, actual.Additions)
}
37 changes: 6 additions & 31 deletions pkg/digest/digest_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package digest

import (
"bytes"
"os"
"strings"
"testing"
Expand All @@ -15,7 +14,7 @@ func TestCreateDigest(t *testing.T) {
firstKey := xxhash.Sum64String("1")
firstLineDigest := xxhash.Sum64String(firstLine)

expectedDigest := Digest{Key: firstKey, Value: firstLineDigest, Row: firstLine}
expectedDigest := Digest{Key: firstKey, Value: firstLineDigest}

actualDigest := CreateDigest(strings.Split(firstLine, Separator), []int{0}, []int{})

Expand All @@ -31,39 +30,17 @@ func TestDigestForFile(t *testing.T) {
secondKey := xxhash.Sum64String("2")
secondDigest := xxhash.Sum64String(secondLine)

var outputBuffer bytes.Buffer

testConfig := &Config{
Reader: strings.NewReader(firstLine + "\n" + secondLine),
Writer: &outputBuffer,
KeyPositions: []int{0},
Key: []int{0},
SourceMap: true,
Reader: strings.NewReader(firstLine + "\n" + secondLine),
Key: []int{0},
}

actualDigest, sourceMap, err := Create(testConfig)
actualDigest, _, err := Create(testConfig)

expectedDigest := map[uint64]uint64{firstKey: firstDigest, secondKey: secondDigest}
expectedSourceMap := map[uint64]string{firstKey: firstLine, secondKey: secondLine}

assert.Nil(t, err, "error at DigestForFile")
assert.Equal(t, expectedDigest, actualDigest)
assert.Equal(t, expectedSourceMap, sourceMap)

// No source map
testConfigWithoutSourceMap := &Config{
Reader: strings.NewReader(firstLine + "\n" + secondLine),
Writer: &outputBuffer,
KeyPositions: []int{0},
Key: []int{0},
SourceMap: false,
}

actualDigest, sourceMap, err = Create(testConfigWithoutSourceMap)

assert.Nil(t, err, "error at DigestForFile")
assert.Equal(t, expectedDigest, actualDigest)
assert.Equal(t, map[uint64]string{}, sourceMap)
}

func TestCreatePerformance(t *testing.T) {
Expand All @@ -72,10 +49,8 @@ func TestCreatePerformance(t *testing.T) {
assert.NoError(t, err)

config := &Config{
Reader: file,
KeyPositions: []int{0},
Key: []int{},
SourceMap: false,
Reader: file,
Key: []int{},
}

result, _, _ := Create(config)
Expand Down

0 comments on commit 9775c17

Please sign in to comment.