Skip to content

Commit

Permalink
feat(entrystream): support aggregating entries into an EntrySet (#3363)
Browse files Browse the repository at this point in the history
  • Loading branch information
schroederc committed Jan 11, 2019
1 parent 3a6bffd commit e1b38f5
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 16 deletions.
1 change: 1 addition & 0 deletions kythe/go/platform/tools/entrystream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_binary(
srcs = ["entrystream.go"],
deps = [
"//kythe/go/platform/delimited",
"//kythe/go/storage/entryset",
"//kythe/go/storage/stream",
"//kythe/go/util/compare",
"//kythe/go/util/disksort",
Expand Down
48 changes: 32 additions & 16 deletions kythe/go/platform/tools/entrystream/entrystream.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"strings"

"kythe.io/kythe/go/platform/delimited"
"kythe.io/kythe/go/storage/entryset"
"kythe.io/kythe/go/storage/stream"
"kythe.io/kythe/go/util/compare"
"kythe.io/kythe/go/util/disksort"
Expand All @@ -51,7 +52,7 @@ import (
spb "kythe.io/kythe/proto/storage_go_proto"
)

type entrySet struct {
type entrySet struct { // TODO(schroederc): rename to avoid confusion with EntrySet proto
Source *spb.VName `json:"source"`
Target *spb.VName `json:"target,omitempty"`
EdgeKind string `json:"edge_kind,omitempty"`
Expand All @@ -75,16 +76,19 @@ var (

riegeliOptions = flag.String("riegeli_writer_options", "", "Riegeli writer options")

sortStream = flag.Bool("sort", false, "Sort entry stream into GraphStore order")
uniqEntries = flag.Bool("unique", false, "Print only unique entries (implies --sort)")
entrySets = flag.Bool("entrysets", false, "Print Entry protos as JSON EntrySets (implies --sort and --write_format=json)")
countOnly = flag.Bool("count", false, "Only print the count of protos streamed")
sortStream = flag.Bool("sort", false, "Sort entry stream into GraphStore order")
uniqEntries = flag.Bool("unique", false, "Print only unique entries (implies --sort)")

aggregateEntrySet = flag.Bool("aggregate_entryset", false, "Output a single aggregate EntrySet proto")
entrySets = flag.Bool("entrysets", false, "Print Entry protos as JSON EntrySets (implies --sort and --write_format=json)")
countOnly = flag.Bool("count", false, "Only print the count of protos streamed")

structuredFacts = flag.Bool("structured_facts", false, "Encode and/or decode the fact_value for marked source facts")
)

func init() {
flag.Usage = flagutil.SimpleUsage("Manipulate a stream of Entry messages",
"[--read_format=<format>] [--unique] ([--write_format=<format>] [--sort] | [--entrysets] | [--count])")
"[--read_format=<format>] [--unique] ([--write_format=<format>] [--sort] | [--entrysets] | [--count] | [--aggregate_entryset])")
}

func main() {
Expand Down Expand Up @@ -159,6 +163,26 @@ func main() {
return nil
}))
fmt.Println(count)
case *aggregateEntrySet:
es := entryset.New(nil)
failOnErr(rd(es.Add))
pb := es.Encode()
switch *writeFormat {
case jsonFormat:
encoder := json.NewEncoder(out)
failOnErr(encoder.Encode(pb))
case riegeliFormat:
opts, err := riegeli.ParseOptions(*riegeliOptions)
failOnErr(err)
wr := riegeli.NewWriter(out, opts)
failOnErr(wr.PutProto(pb))
failOnErr(wr.Flush())
case delimitedFormat:
wr := delimited.NewWriter(out)
failOnErr(wr.PutProto(pb))
default:
log.Fatalf("Unsupported --write_format=%s", *writeFormat)
}
case *entrySets:
encoder := json.NewEncoder(out)
var set entrySet
Expand Down Expand Up @@ -200,21 +224,13 @@ func main() {
failOnErr(err)
wr := riegeli.NewWriter(out, opts)
failOnErr(rd(func(entry *spb.Entry) error {
rec, err := proto.Marshal(entry)
if err != nil {
return err
}
return wr.Put(rec)
return wr.PutProto(entry)
}))
failOnErr(wr.Flush())
case delimitedFormat:
wr := delimited.NewWriter(out)
failOnErr(rd(func(entry *spb.Entry) error {
rec, err := proto.Marshal(entry)
if err != nil {
return err
}
return wr.Put(rec)
return wr.PutProto(entry)
}))
default:
log.Fatalf("Unsupported --write_format=%s", *writeFormat)
Expand Down

0 comments on commit e1b38f5

Please sign in to comment.