Skip to content

Commit

Permalink
Run digests of two files and the comparator in background
Browse files Browse the repository at this point in the history
  • Loading branch information
aswinkarthik committed Apr 15, 2018
1 parent 09358c7 commit 8a9b106
Showing 1 changed file with 77 additions and 45 deletions.
122 changes: 77 additions & 45 deletions cmd/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package cmd
import (
"encoding/json"
"fmt"
"io"
"log"
"os"
"strings"
"sync"

"github.com/aswinkarthik93/csv-digest/pkg/digest"
"github.com/spf13/cobra"
Expand All @@ -37,6 +39,35 @@ It can also serialize the output as a binary file for any other go program to co
},
}

var (
	// debug is bound to the --debug flag of the digest command.
	debug bool

	// newLine holds the single byte '\n'; it is assigned in init.
	newLine []byte
)

// init attaches the digest command to the root command, prepares the newLine
// byte slice, and declares the command's local flags.
func init() {
	rootCmd.AddCommand(digestCmd)
	newLine = []byte{'\n'}

	// All flags below are local flags: they are registered on digestCmd
	// itself rather than as persistent flags.
	flags := digestCmd.Flags()

	flags.StringVarP(&config.Base, "base", "b", "", "Input csv to be used as base")
	flags.StringVarP(&config.Input, "input", "i", "", "The new csv file on which diff should be done")

	encoderHelp := "Encoder to use to output the digest. Available Encoders: " + strings.Join(GetEncoders(), ",")
	flags.StringVarP(&config.Encoder, "encoder", "e", "json", encoderHelp)

	flags.IntSliceVarP(&config.KeyPositions, "key-positions", "k", []int{0}, "Primary key positions of the Input CSV as comma separated values Eg: 1,2")
	flags.BoolVarP(&debug, "debug", "", false, "Debug mode")
	flags.StringVarP(&config.Additions, "additions", "a", "STDOUT", "Output stream for the additions in delta file")
	flags.StringVarP(&config.Modifications, "modifications", "m", "STDOUT", "Output stream for the modifications in delta file")

	// Both CSV paths must be supplied on the command line.
	for _, required := range []string{"base", "input"} {
		digestCmd.MarkFlagRequired(required)
	}
}

func runDigest() {
if str, err := json.Marshal(config); err == nil && debug {
fmt.Println(string(str))
Expand All @@ -59,62 +90,63 @@ func runDigest() {
SourceMap: true,
}

base, _, err := digest.Create(baseConfig)
if err != nil {
log.Fatal(err)
}
log.Println("Generated Base Digest")
var wg sync.WaitGroup
baseChannel := make(chan message)
deltaChannel := make(chan message)

change, sourceMap, err := digest.Create(inputConfig)
if err != nil {
log.Fatal(err)
}
log.Println("Generated Input file Digest")
wg.Add(1)
go generateInBackground("base", baseConfig, &wg, baseChannel)

additions, modifications := digest.Compare(base, change)
wg.Add(1)
go generateInBackground("delta", inputConfig, &wg, deltaChannel)

log.Println(fmt.Sprintf("Additions Count: %d", len(additions)))
aWriter := config.AdditionsWriter()
defer aWriter.Close()
newLine := []byte{'\n'}
for _, addition := range additions {
aWriter.Write([]byte(sourceMap[addition]))
aWriter.Write(newLine)
}
wg.Add(1)
go compareInBackgroud(baseChannel, deltaChannel, &wg)

fmt.Println("")
wg.Wait()
}

log.Println(fmt.Sprintf("Modifications Count: %d", len(modifications)))
mWriter := config.ModificationsWriter()
defer mWriter.Close()
for _, modification := range modifications {
mWriter.Write([]byte(sourceMap[modification]))
mWriter.Write(newLine)
}
// message is the payload a digest generator goroutine sends to the
// comparator: the two maps returned by a successful digest.Create call.
type message struct {
	// digestMap is the digest itself; the comparator passes it to digest.Compare.
	digestMap map[uint64]uint64

	// sourceMap is indexed with the positions reported by digest.Compare to
	// obtain the text that is written to the output streams.
	sourceMap map[uint64]string
}

var debug bool
func generateInBackground(name string, config digest.DigestConfig, wg *sync.WaitGroup, channel chan<- message) {
digest, sourceMap, err := digest.Create(config)
if err != nil {
panic(err)
}

func init() {
rootCmd.AddCommand(digestCmd)
log.Println("Generated Digest for " + name)
channel <- message{digestMap: digest, sourceMap: sourceMap}
close(channel)
wg.Done()
}

// Here you will define your flags and configuration settings.
// compareInBackgroud waits for one message on baseChannel and one on
// deltaChannel, compares the two digests with digest.Compare, and writes the
// resulting additions and modifications to the writers supplied by config.
// Records for both outputs are looked up in the delta message's sourceMap.
//
// It is meant to be started with `go`; wg is released only after both writers
// have been closed.
func compareInBackgroud(baseChannel, deltaChannel <-chan message, wg *sync.WaitGroup) {
	// Registered first so that it runs last: deferred calls execute in LIFO
	// order, which guarantees the writers below are closed before a goroutine
	// blocked in wg.Wait() is allowed to continue (and possibly exit).
	defer wg.Done()

	// Base is received first; each receive blocks until that digest is ready.
	baseMessage := <-baseChannel
	deltaMessage := <-deltaChannel

	additions, modifications := digest.Compare(baseMessage.digestMap, deltaMessage.digestMap)

	aWriter := config.AdditionsWriter()
	defer aWriter.Close()
	mWriter := config.ModificationsWriter()
	defer mWriter.Close()

	// Blank lines on stdout separate the sections of output.
	fmt.Println()
	print("Additions", aWriter, additions, deltaMessage.sourceMap)
	fmt.Println()
	print("Modifications", mWriter, modifications, deltaMessage.sourceMap)
	fmt.Println()
}

digestCmd.MarkFlagRequired("base")
digestCmd.MarkFlagRequired("input")
// print logs how many records of recordType there are and then writes, for
// each entry of positions in order, content[position] followed by a newline
// to w.
//
// A position that is absent from content yields an empty line. If a write
// fails, the error is logged and the remaining records are skipped instead of
// being silently dropped one by one.
func print(recordType string, w io.Writer, positions []uint64, content map[uint64]string) {
	log.Printf("%s Count: %d", recordType, len(positions))

	for _, position := range positions {
		// Fprintln emits the record and its trailing '\n' in a single Write.
		if _, err := fmt.Fprintln(w, content[position]); err != nil {
			log.Printf("writing %s: %v", recordType, err)
			return
		}
	}
}

0 comments on commit 8a9b106

Please sign in to comment.