Skip to content

Commit

Permalink
Introduce v2 version of finding differences
Browse files Browse the repository at this point in the history
  • Loading branch information
aswinkarthik committed Feb 28, 2019
1 parent 00e3b5b commit 8016dff
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 221 deletions.
40 changes: 34 additions & 6 deletions cmd/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"encoding/json"
"fmt"
"io"
"strings"

"github.com/aswinkarthik/csvdiff/pkg/digest"
)

// Formatter defines the interface through which differences
// can be formatted and displayed
type Formatter interface {
Format(digest.Difference) error
Format(digest.Differences) error
}

// RowMarkFormatter formats diff by marking each row as
Expand All @@ -22,20 +23,30 @@ type RowMarkFormatter struct {
}

// Format writes the addition/modification counts to f.Stderr and the marked rows to f.Stdout
func (f *RowMarkFormatter) Format(diff digest.Difference) error {
func (f *RowMarkFormatter) Format(diff digest.Differences) error {
fmt.Fprintf(f.Stderr, "Additions %d\n", len(diff.Additions))
fmt.Fprintf(f.Stderr, "Modifications %d\n", len(diff.Modifications))
fmt.Fprintf(f.Stderr, "Rows:\n")

additions := make([]string, 0, len(diff.Additions))
for _, addition := range diff.Additions {
additions = append(additions, strings.Join(addition, ","))
}

modifications := make([]string, 0, len(diff.Modifications))
for _, modification := range diff.Modifications {
modifications = append(modifications, strings.Join(modification.Current, ","))
}

for _, added := range diff.Additions {
for _, added := range additions {
_, err := fmt.Fprintf(f.Stdout, "%s,%s\n", added, "ADDED")

if err != nil {
return fmt.Errorf("error when formatting additions with RowMark formatter: %v", err)
}
}

for _, modified := range diff.Modifications {
for _, modified := range modifications {
_, err := fmt.Fprintf(f.Stdout, "%s,%s\n", modified, "MODIFIED")

if err != nil {
Expand All @@ -52,9 +63,26 @@ type JSONFormatter struct {
Stdout io.Writer
}

// JSONDifference mirrors the legacy JSON output layout: two flat lists of
// strings, one for added rows and one for modified rows.
type JSONDifference struct {
	// Both lists carry one string per reported row.
	Additions, Modifications []string
}

// Format prints the diff as JSON
func (f *JSONFormatter) Format(diff digest.Difference) error {
data, err := json.MarshalIndent(diff, "", " ")
func (f *JSONFormatter) Format(diff digest.Differences) error {
additions := make([]string, 0, len(diff.Additions))
for _, addition := range diff.Additions {
additions = append(additions, strings.Join(addition, ","))
}

modifications := make([]string, 0, len(diff.Modifications))
for _, modification := range diff.Modifications {
modifications = append(modifications, strings.Join(modification.Current, ","))
}

jsonDiff := JSONDifference{Additions: additions, Modifications: modifications}
data, err := json.MarshalIndent(jsonDiff, "", " ")

if err != nil {
return fmt.Errorf("error when serializing with JSON formatter: %v", err)
Expand Down
12 changes: 6 additions & 6 deletions cmd/formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (

func TestJSONFormat(t *testing.T) {
var formatter cmd.Formatter
diff := digest.Difference{
Additions: []string{"additions"},
Modifications: []string{"modification"},
diff := digest.Differences{
Additions: []digest.Addition{[]string{"additions"}},
Modifications: []digest.Modification{digest.Modification{Current: []string{"modification"}}},
}
expected := `{
"Additions": [
Expand All @@ -36,9 +36,9 @@ func TestJSONFormat(t *testing.T) {

func TestRowMarkFormatter(t *testing.T) {
var formatter cmd.Formatter
diff := digest.Difference{
Additions: []string{"additions"},
Modifications: []string{"modification"},
diff := digest.Differences{
Additions: []digest.Addition{[]string{"additions"}},
Modifications: []digest.Modification{digest.Modification{Current: []string{"modification"}}},
}
expectedStdout := `additions,ADDED
modification,MODIFIED
Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Most suitable for csv files created from database tables`,
false,
)

diff, err := digest.Diff(baseConfig, deltaConfig)
diff, err := digest.Diff(*baseConfig, *deltaConfig)

if err != nil {
fmt.Fprintf(os.Stderr, "csvdiff failed: %v\n", err)
Expand Down
153 changes: 66 additions & 87 deletions pkg/digest/diff.go
Original file line number Diff line number Diff line change
@@ -1,123 +1,102 @@
package digest

import (
"encoding/csv"
"fmt"
"runtime"
"sync"
)

// Difference represents the additions and modifications
// between the two Configs
type Difference struct {
Additions []string
Modifications []string
}

// messageType tags a diff message as either an addition or a modification.
type messageType int

// Kinds of diff message. iota numbers them in declaration order, so
// addition is 0 and modification is 1.
const (
	addition messageType = iota
	modification
)

type diffMessage struct {
_type messageType
value string
// Differences represents the differences
// between the contents of two CSV files.
type Differences struct {
// Additions lists the rows reported as additions.
Additions []Addition
// Modifications lists the rows reported as modifications.
Modifications []Modification
}

// Diff will differentiate between two given configs
func Diff(baseConfig, deltaConfig *Config) (Difference, error) {
maxProcs := runtime.NumCPU()
base, _, err := Create(baseConfig)

if err != nil {
return Difference{}, fmt.Errorf("error in base file: %v", err)
}
// Addition is a row appearing in delta but missing in base.
// It is stored as a slice of strings.
type Addition []string

additions := make([]string, 0, len(base))
modifications := make([]string, 0, len(base))
// Modification describes a row whose key exists in both base and delta
// but whose value differs in delta.
type Modification struct {
	// Original is the row as recorded for the base file and Current is the
	// row as read from the delta file.
	// NOTE(review): Original may be empty if the base side does not retain
	// its source rows; confirm against the engine configuration.
	Original, Current []string
}

messageChan := make(chan []diffMessage, bufferSize*maxProcs)
errorChannel := make(chan error)
defer close(errorChannel)
// message is the internal unit passed over the differences channel: one
// detected difference together with its kind.
type message struct {
	// original is the base-side row (set for modifications); current is the
	// delta-side row.
	original, current []string
	// _type says whether this message is an addition or a modification.
	_type messageType
}

go readAndCompare(base, deltaConfig, messageChan, errorChannel)
// Diff finds the Differences between baseConfig and deltaConfig
func Diff(baseConfig, deltaConfig Config) (Differences, error) {
baseEngine := NewEngine(baseConfig)
baseDigestChannel, baseErrorChannel := baseEngine.StreamDigests()

for msgs := range messageChan {
for _, msg := range msgs {
if msg._type == addition {
additions = append(additions, msg.value)
} else if msg._type == modification {
modifications = append(modifications, msg.value)
}
baseFileDigest := NewFileDigest()
for digests := range baseDigestChannel {
for _, d := range digests {
baseFileDigest.Append(d)
}
}

if err := <-errorChannel; err != nil {
return Difference{}, fmt.Errorf("error in delta file: %v", err)
if err := <-baseErrorChannel; err != nil {
return Differences{}, fmt.Errorf("error processing base file: %v", err)
}

return Difference{Additions: additions, Modifications: modifications}, nil
}

func readAndCompare(base map[uint64]uint64, config *Config, msgChannel chan<- []diffMessage, errorChannel chan<- error) {
reader := csv.NewReader(config.Reader)
var wg sync.WaitGroup
for {
lines, eofReached, err := getNextNLines(reader)

if err != nil {
wg.Wait()
close(msgChannel)
errorChannel <- err
return
}
deltaConfig.KeepSource = true
deltaEngine := NewEngine(deltaConfig)
deltaDigestChannel, deltaErrorChannel := deltaEngine.StreamDigests()

wg.Add(1)
go compareDigestForNLines(base, lines, config, msgChannel, &wg)
additions := make([]Addition, 0)
modifications := make([]Modification, 0)

if eofReached {
break
msgChannel := streamDifferences(baseFileDigest, deltaDigestChannel)
for msg := range msgChannel {
switch msg._type {
case addition:
additions = append(additions, msg.current)
case modification:
modifications = append(modifications, Modification{Original: msg.original, Current: msg.current})
}
}
wg.Wait()
close(msgChannel)
errorChannel <- nil

if err := <-deltaErrorChannel; err != nil {
return Differences{}, fmt.Errorf("error processing delta file: %v", err)
}

return Differences{Additions: additions, Modifications: modifications}, nil
}

func compareDigestForNLines(base map[uint64]uint64,
lines [][]string,
config *Config,
msgChannel chan<- []diffMessage,
wg *sync.WaitGroup,
) {
output := make([]diffMessage, len(lines))
diffCounter := 0
for _, line := range lines {
digest := CreateDigest(line, config.Key, config.Value)
if baseValue, present := base[digest.Key]; present {
// Present in both base and delta
if baseValue != digest.Value {
value := config.Include.MapToValue(line)
// Modification
output[diffCounter] = diffMessage{
value: value,
_type: modification,
func streamDifferences(baseFileDigest *FileDigest, digestChannel chan []Digest) chan message {
maxProcs := runtime.NumCPU()
msgChannel := make(chan message, maxProcs*bufferSize)

go func(base *FileDigest, digestChannel chan []Digest, msgChannel chan message) {
defer close(msgChannel)

for digests := range digestChannel {
for _, d := range digests {
if baseValue, present := base.Digests[d.Key]; present {
if baseValue != d.Value {
// Modification
msgChannel <- message{_type: modification, current: d.Source, original: base.SourceMap[d.Key]}
}
} else {
// Addition
msgChannel <- message{_type: addition, current: d.Source}
}
diffCounter++
}
} else {
value := config.Include.MapToValue(line)
// Not present in base. So Addition.
output[diffCounter] = diffMessage{
value: value,
_type: addition,
}
diffCounter++
}
}

msgChannel <- output[:diffCounter]
wg.Done()
}(baseFileDigest, digestChannel, msgChannel)

return msgChannel
}
Loading

0 comments on commit 8016dff

Please sign in to comment.