Skip to content

Commit

Permalink
Add engine to create file level digest
Browse files Browse the repository at this point in the history
  • Loading branch information
aswinkarthik committed Feb 28, 2019
1 parent f185a04 commit 00e3b5b
Show file tree
Hide file tree
Showing 6 changed files with 352 additions and 37 deletions.
39 changes: 39 additions & 0 deletions pkg/digest/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package digest

import "io"

// Config bundles the settings used to build digests from one CSV input.
type Config struct {
	// Key holds the positions of the primary key columns.
	Key Positions
	// Value holds the positions of the columns that are compared for a diff.
	Value Positions
	// Include holds the positions to include in the output. NewConfig falls
	// back to Value when no Include positions are supplied.
	Include Positions
	// Reader is the source the CSV records are read from.
	Reader io.Reader
	// KeepSource requests that the source and target strings are returned
	// when a diff is computed.
	KeepSource bool
}

// NewConfig builds a Config from the given reader and column positions.
//
// When includeColumns is empty, the Include positions default to
// valueColumns.
func NewConfig(
	r io.Reader,
	primaryKey Positions,
	valueColumns Positions,
	includeColumns Positions,
	keepSource bool,
) *Config {
	cfg := &Config{
		Reader:     r,
		Key:        primaryKey,
		Value:      valueColumns,
		Include:    includeColumns,
		KeepSource: keepSource,
	}

	// No explicit include list: report the compared columns instead.
	if len(cfg.Include) == 0 {
		cfg.Include = cfg.Value
	}

	return cfg
}
37 changes: 0 additions & 37 deletions pkg/digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package digest

import (
"encoding/csv"
"io"
"runtime"
"sync"

Expand Down Expand Up @@ -38,42 +37,6 @@ func CreateDigestWithSource(csv []string, pKey Positions, pRow Positions) Digest
return Digest{Key: key, Value: digest, Source: csv}
}

// Config bundles the settings used to build digests from one CSV input.
type Config struct {
	// Key holds the positions of the primary key columns.
	Key Positions
	// Value holds the positions of the columns that are compared for a diff.
	Value Positions
	// Include holds the positions to include in the output. NewConfig falls
	// back to Value when no Include positions are supplied.
	Include Positions
	// Reader is the source the CSV records are read from.
	Reader io.Reader
	// KeepSource requests that the source and target strings are returned
	// when a diff is computed.
	KeepSource bool
}

// NewConfig builds a Config from the given reader and column positions.
//
// When includeColumns is empty, the Include positions default to
// valueColumns.
func NewConfig(
	r io.Reader,
	primaryKey Positions,
	valueColumns Positions,
	includeColumns Positions,
	keepSource bool,
) *Config {
	cfg := &Config{
		Reader:     r,
		Key:        primaryKey,
		Value:      valueColumns,
		Include:    includeColumns,
		KeepSource: keepSource,
	}

	// No explicit include list: report the compared columns instead.
	if len(cfg.Include) == 0 {
		cfg.Include = cfg.Value
	}

	return cfg
}

// bufferSize is the base sizing constant for digest processing. NewEngine
// multiplies it by the CPU count to size its buffered digest channel.
// NOTE(review): presumably also the number of lines read per batch by
// getNextNLines — confirm against that helper.
const bufferSize = 512

// Create can create a Digest using the Configurations passed.
Expand Down
109 changes: 109 additions & 0 deletions pkg/digest/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package digest

import (
"encoding/csv"
"runtime"
"sync"
)

// Engine turns the CSV input described by a Config into a FileDigest.
type Engine struct {
	// config is the engine's own copy of the caller's configuration.
	config Config
	// reader is not populated by NewEngine; the background producer builds
	// its own csv.Reader from config.Reader.
	reader *csv.Reader
	// lock serialises calls to GenerateFileDigest.
	lock *sync.Mutex
	// digestChannel carries batches of digests from the worker goroutines
	// to GenerateFileDigest.
	digestChannel chan []Digest
	// errorChannel carries the producer's final result (nil on success) and
	// is closed by Close.
	errorChannel chan error
}

// NewEngine returns an Engine for the given configuration.
//
// The digest channel is buffered with bufferSize slots per CPU; the error
// channel is unbuffered.
func NewEngine(config Config) *Engine {
	engine := new(Engine)

	engine.config = config
	engine.lock = new(sync.Mutex)
	engine.digestChannel = make(chan []Digest, bufferSize*runtime.NumCPU())
	engine.errorChannel = make(chan error)

	return engine
}

// Close closes the engine's error channel. Call it once, after the engine is
// no longer needed.
func (e Engine) Close() {
	close(e.errorChannel)
}

// GenerateFileDigest reads the configured CSV input and collects the digest
// of every record into a FileDigest.
//
// Calls are serialised by the engine's mutex, so it may be invoked from
// several goroutines. When config.KeepSource is set, the source record is
// stored next to each digest; otherwise only the digests are kept.
//
// It returns the populated FileDigest, or nil and the error reported by the
// background reader if reading the input failed.
func (e Engine) GenerateFileDigest() (*FileDigest, error) {
	e.lock.Lock()
	defer e.lock.Unlock()

	// The background producer closes the digest channel when it finishes, so
	// a channel left over from an earlier call must not be reused: sending on
	// it or closing it again would panic. e is a copy of the engine (value
	// receiver), so the fresh channel installed here is seen only by this
	// call and by the goroutines started from it.
	e.digestChannel = make(chan []Digest, cap(e.digestChannel))

	fd := NewFileDigest()

	appendFunc := fd.AppendWithoutSource
	if e.config.KeepSource {
		appendFunc = fd.Append
	}

	go e.createDigestsInBackground()

	// Drain every batch; the loop ends once the producer closes the channel.
	for digests := range e.digestChannel {
		for _, digest := range digests {
			appendFunc(digest)
		}
	}

	// After closing the digest channel the producer sends exactly one value
	// on the error channel: nil on success, the read error otherwise.
	if err := <-e.errorChannel; err != nil {
		return nil, err
	}

	return fd, nil
}

// createDigestsInBackground reads the CSV input batch by batch and starts one
// digestForLines goroutine per batch.
//
// Whether reading ends at EOF or with an error, it waits for all started
// workers, closes the digest channel, and then sends the outcome on the
// error channel (nil when no read error occurred).
func (e Engine) createDigestsInBackground() {
	var (
		workers sync.WaitGroup
		readErr error
	)
	csvReader := csv.NewReader(e.config.Reader)

	for finished := false; !finished; {
		batch, eofReached, err := getNextNLines(csvReader)
		if err != nil {
			// A failed read produces no worker for this batch.
			readErr = err
			break
		}

		workers.Add(1)
		go e.digestForLines(batch, &workers)

		finished = eofReached
	}

	workers.Wait()
	close(e.digestChannel)
	e.errorChannel <- readErr
}

// digestForLines computes one Digest per record in lines, sends the whole
// batch on the digest channel, and then marks itself done on wg.
//
// Records are digested with CreateDigestWithSource when config.KeepSource is
// set and with CreateDigest otherwise.
func (e Engine) digestForLines(lines [][]string, wg *sync.WaitGroup) {
	cfg := e.config

	makeDigest := CreateDigest
	if cfg.KeepSource {
		makeDigest = CreateDigestWithSource
	}

	batch := make([]Digest, len(lines))
	for i := range lines {
		batch[i] = makeDigest(lines[i], cfg.Key, cfg.Value)
	}

	e.digestChannel <- batch
	wg.Done()
}
103 changes: 103 additions & 0 deletions pkg/digest/engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package digest_test

import (
"encoding/csv"
"strings"
"testing"

"github.com/aswinkarthik/csvdiff/pkg/digest"
"github.com/cespare/xxhash"
"github.com/stretchr/testify/assert"
)

// TestEngine_GenerateFileDigest checks the digests produced by
// Engine.GenerateFileDigest for whole-row and selected-column values, with
// and without retained sources, and the error returned for malformed CSV.
func TestEngine_GenerateFileDigest(t *testing.T) {
	const (
		firstLine  = "1,first-line,some-columne,friday"
		secondLine = "2,second-line,nobody-needs-this,saturday"
	)

	firstKey, secondKey := xxhash.Sum64String("1"), xxhash.Sum64String("2")
	firstDigest, secondDigest := xxhash.Sum64String(firstLine), xxhash.Sum64String(secondLine)
	fridayDigest, saturdayDigest := xxhash.Sum64String("friday"), xxhash.Sum64String("saturday")

	t.Run("should create digest for given key and all values", func(t *testing.T) {
		engine := digest.NewEngine(digest.Config{
			Reader: strings.NewReader(firstLine + "\n" + secondLine),
			Key:    []int{0},
		})
		defer engine.Close()

		fd, err := engine.GenerateFileDigest()
		assert.NoError(t, err)

		wantDigests := map[uint64]uint64{firstKey: firstDigest, secondKey: secondDigest}
		assert.Equal(t, wantDigests, fd.Digests)
	})

	t.Run("should create digest skeeping source", func(t *testing.T) {
		engine := digest.NewEngine(digest.Config{
			Reader:     strings.NewReader(firstLine + "\n" + secondLine),
			Key:        []int{0},
			KeepSource: true,
		})
		defer engine.Close()

		fd, err := engine.GenerateFileDigest()
		assert.NoError(t, err)

		wantDigests := map[uint64]uint64{firstKey: firstDigest, secondKey: secondDigest}
		wantSources := map[uint64][]string{
			firstKey:  strings.Split(firstLine, ","),
			secondKey: strings.Split(secondLine, ","),
		}

		assert.Equal(t, wantDigests, fd.Digests)
		assert.Equal(t, wantSources, fd.SourceMap)
	})

	t.Run("should create digest for given key and given values", func(t *testing.T) {
		engine := digest.NewEngine(digest.Config{
			Reader: strings.NewReader(firstLine + "\n" + secondLine),
			Key:    []int{0},
			Value:  []int{3},
		})
		defer engine.Close()

		fd, err := engine.GenerateFileDigest()
		wantDigests := map[uint64]uint64{firstKey: fridayDigest, secondKey: saturdayDigest}

		assert.NoError(t, err)
		assert.Equal(t, wantDigests, fd.Digests)
	})

	t.Run("should return ParseError if csv reading fails", func(t *testing.T) {
		// The second record has a different field count than the first.
		engine := digest.NewEngine(digest.Config{
			Reader: strings.NewReader(firstLine + "\n" + "some-random-line"),
			Key:    []int{0},
			Value:  []int{3},
		})
		defer engine.Close()

		fd, err := engine.GenerateFileDigest()
		assert.Error(t, err)

		_, ok := err.(*csv.ParseError)
		assert.True(t, ok)
		assert.Nil(t, fd)
	})
}
44 changes: 44 additions & 0 deletions pkg/digest/file_digest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package digest

import (
"sync"
)

// FileDigest holds the digests created from a single file.
type FileDigest struct {
	// Digests maps a digest's Key to its Value.
	Digests map[uint64]uint64
	// SourceMap maps a digest's Key to its Source record.
	SourceMap map[uint64][]string
	// lock guards both maps in SafeAppend.
	lock *sync.Mutex
}

// NewFileDigest returns an empty FileDigest with both maps and the mutex
// initialised.
func NewFileDigest() *FileDigest {
	fd := new(FileDigest)

	fd.Digests = map[uint64]uint64{}
	fd.SourceMap = map[uint64][]string{}
	fd.lock = new(sync.Mutex)

	return fd
}

// Append records both the value and the source of d under d.Key.
// It does not take the lock, so it is not safe for concurrent use.
func (f *FileDigest) Append(d Digest) {
	key := d.Key

	f.Digests[key] = d.Value
	f.SourceMap[key] = d.Source
}

// AppendWithoutSource records only the value of d under d.Key; SourceMap is
// left untouched. It does not take the lock, so it is not safe for
// concurrent use.
func (f *FileDigest) AppendWithoutSource(d Digest) {
	key, value := d.Key, d.Value
	f.Digests[key] = value
}

// SafeAppend records both the value and the source of d under d.Key while
// holding the FileDigest's lock, making it safe for concurrent use.
func (f *FileDigest) SafeAppend(d Digest) {
	f.lock.Lock()
	defer f.lock.Unlock()

	key := d.Key
	f.Digests[key] = d.Value
	f.SourceMap[key] = d.Source
}
Loading

0 comments on commit 00e3b5b

Please sign in to comment.