Skip to content

Commit

Permalink
fix(pipeline): beam output path can contain a filesystem scheme (#3532)
Browse files Browse the repository at this point in the history
  • Loading branch information
robinp authored and schroederc committed Feb 21, 2019
1 parent 7ed4a5f commit eba6ff4
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
19 changes: 16 additions & 3 deletions kythe/go/serving/pipeline/beamio/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"path/filepath"
"reflect"
"sort"
"strings"
"time"

"github.com/apache/beam/sdks/go/pkg/beam"
Expand Down Expand Up @@ -119,7 +120,7 @@ func (w *writeManifest) ProcessElement(ctx context.Context, _ beam.T, e func(*ta
defer func(start time.Time) { log.Printf("Manifest written in %s", time.Since(start)) }(time.Now())

// Write the CURRENT manifest to the 0'th LevelDB file.
f, err := openWrite(ctx, filepath.Join(w.Path, manifestName))
f, err := openWrite(ctx, schemePreservingPathJoin(w.Path, manifestName))
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -173,7 +174,7 @@ func (w *writeManifest) ProcessElement(ctx context.Context, _ beam.T, e func(*ta
}

// Write the CURRENT pointer to the freshly written manifest file.
currentFile, err := openWrite(ctx, filepath.Join(w.Path, "CURRENT"))
currentFile, err := openWrite(ctx, schemePreservingPathJoin(w.Path, "CURRENT"))
if err != nil {
return 0, err
} else if _, err := io.WriteString(currentFile, manifestName+"\n"); err != nil {
Expand Down Expand Up @@ -226,6 +227,18 @@ var (
conflictingLevelDBValuesCounter = beam.NewCounter("kythe.beamio.leveldb", "conflicting-values")
)

const schemaSeparator = "://"

// schemePreservingPathJoin is like filepath.Join, but doesn't collapse
// the double-slash in the schema prefix, if any.
func schemePreservingPathJoin(p, f string) string {
parts := strings.SplitN(p, schemaSeparator, 2)
if len(parts) == 2 {
return parts[0] + schemaSeparator + filepath.Join(parts[1], f)
}
return filepath.Join(p, f)
}

// ProcessElement writes a set of KeyValues to the an SSTable per shard. Shards
// should be small enough to fit into memory so that they can be sorted.
// TODO(BEAM-4405): use SortValues extension to remove in-memory requirement
Expand Down Expand Up @@ -276,7 +289,7 @@ func (w *writeTable) ProcessElement(ctx context.Context, shard int, kvIter func(
md.Last = els[len(els)-1].Key

// Write each sorted key-value to an SSTable.
f, err := openWrite(ctx, filepath.Join(w.Path, fmt.Sprintf("%06d.ldb", md.Shard)))
f, err := openWrite(ctx, schemePreservingPathJoin(w.Path, fmt.Sprintf("%06d.ldb", md.Shard)))
if err != nil {
return err
}
Expand Down
20 changes: 20 additions & 0 deletions kythe/go/serving/pipeline/beamio/leveldb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,23 @@ func TestLevelDBSink(t *testing.T) {
}

func extendToKey(v beam.T) (beam.T, beam.T) { return v, v }

func TestSchemaPreservingPathJoin(t *testing.T) {
exp := "gs://foo/bar/baz"
res := schemePreservingPathJoin("gs://foo", "bar/baz")
if exp != res {
t.Fatalf("Expected [%s], got [%s]", exp, res)
}

exp = "foo/bar/baz"
res = schemePreservingPathJoin("foo//bar", "baz")
if exp != res {
t.Fatalf("Expected [%s], got [%s]", exp, res)
}

exp = "/foo/bar"
res = schemePreservingPathJoin("/foo/", "bar")
if exp != res {
t.Fatalf("Expected [%s], got [%s]", exp, res)
}
}

0 comments on commit eba6ff4

Please sign in to comment.