Skip to content

Commit

Permalink
drivers: Refactor package as a more generic and performant library (#…
Browse files Browse the repository at this point in the history
…2223)

* drivers: Make Size an optional property of files

The Filebase S3-compatible API does not return a
file size in the response for files greater than 255 bytes.

That caused a panic on s3.go:267 when reading recordings.

With this change, the problem won't happen anymore because
we don't even try to dereference the ContentLength, but
hold it as a nilable pointer that should be checked wherever
it is used (right now: nowhere).

* drivers: Receive Reader instead of buffer on save

We want to use this lib for saving large files to ObjectStore
as well, for the VOD tasks. Since they can be really large, we
don't want to have to buffer the whole files in memory for
saving. So changing this API to receive a Reader instead makes
more sense.

* core+server: Fix all usage of the drivers pkg

* core+server+verification: Fix tests

* drivers: Use S3 uploader for saving data

Avoid the need to buffer the whole input in memory before saving.
Will also be more performant for large files.

* drivers: Tune s3 uploader for better performance

Increase the concurrency and the part size so the uploads
are more performant in our production environment.

These are also more compatible with Storj's configuration
which saves files in 64MB chunks on the decentralized network,
so that part size also works well for them. But it still improves
upload performance to gcloud by around 10x as well.

* core: Use undeprecated session.NewSession

It returns an error, so a few signatures
had to change.

* drivers: Create consts for uploader configs

Also add comments with context about how those numbers were chosen.

* Update CHANGELOG_PENDING
  • Loading branch information
victorges committed May 19, 2022
1 parent 30b7330 commit 13165dd
Show file tree
Hide file tree
Showing 20 changed files with 192 additions and 111 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### General
- \#2383 Add E2E Tests for checking Livepeer on-chain interactions (@leszko)
- \#2223 Refactor `drivers` package as a reusable and more performant lib (@victorges)

#### Broadcaster
- \#2392 Add LP_EXTEND_TIMEOUTS env variable to extend timeouts for Stream Tester (@leszko)
Expand Down
3 changes: 2 additions & 1 deletion core/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"context"
"io"
"sort"
"testing"
"time"
Expand Down Expand Up @@ -394,7 +395,7 @@ func (os *stubOS) GetInfo() *net.OSInfo {
return &net.OSInfo{StorageType: net.OSInfo_StorageType(os.storageType)}
}
func (os *stubOS) EndSession() {}
func (os *stubOS) SaveData(context.Context, string, []byte, map[string]string, time.Duration) (string, error) {
func (os *stubOS) SaveData(context.Context, string, io.Reader, map[string]string, time.Duration) (string, error) {
return "", nil
}
func (os *stubOS) IsExternal() bool { return false }
Expand Down
7 changes: 4 additions & 3 deletions core/livepeernode_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"bytes"
"context"
"fmt"
"math/big"
Expand Down Expand Up @@ -132,23 +133,23 @@ func TestServiceURIChange(t *testing.T) {

drivers.NodeStorage = drivers.NewMemoryDriver(n.GetServiceURI())
sesh := drivers.NodeStorage.NewSession("testpath")
savedUrl, err := sesh.SaveData(context.TODO(), "testdata1", []byte{0, 0, 0}, nil, 0)
savedUrl, err := sesh.SaveData(context.TODO(), "testdata1", bytes.NewReader([]byte{0, 0, 0}), nil, 0)
require.Nil(err)
assert.Equal("test://testurl.com/stream/testpath/testdata1", savedUrl)

glog.Infof("Setting service URL to newurl")
newUrl, err := url.Parse("test://newurl.com")
n.SetServiceURI(newUrl)
require.Nil(err)
furl, err := sesh.SaveData(context.TODO(), "testdata2", []byte{0, 0, 0}, nil, 0)
furl, err := sesh.SaveData(context.TODO(), "testdata2", bytes.NewReader([]byte{0, 0, 0}), nil, 0)
require.Nil(err)
assert.Equal("test://newurl.com/stream/testpath/testdata2", furl)

glog.Infof("Setting service URL to secondurl")
secondUrl, err := url.Parse("test://secondurl.com")
n.SetServiceURI(secondUrl)
require.Nil(err)
surl, err := sesh.SaveData(context.TODO(), "testdata3", []byte{0, 0, 0}, nil, 0)
surl, err := sesh.SaveData(context.TODO(), "testdata3", bytes.NewReader([]byte{0, 0, 0}), nil, 0)
require.Nil(err)
assert.Equal("test://secondurl.com/stream/testpath/testdata3", surl)
}
Expand Down
3 changes: 2 additions & 1 deletion core/orchestrator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
Expand Down Expand Up @@ -540,7 +541,7 @@ func (n *LivepeerNode) transcodeSeg(ctx context.Context, config transcodeConfig,
// Need to store segment in our local OS
var err error
name := fmt.Sprintf("%d.tempfile", seg.SeqNo)
url, err = config.LocalOS.SaveData(ctx, name, seg.Data, nil, 0)
url, err = config.LocalOS.SaveData(ctx, name, bytes.NewReader(seg.Data), nil, 0)
if err != nil {
return terr(err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/playlistmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func TestCleanup(t *testing.T) {
testData := []byte{1, 2, 3, 4}

c := NewBasicPlaylistManager(mid, osSession, nil)
uri, err := c.GetOSSession().SaveData(context.TODO(), "testName", testData, nil, 0)
uri, err := c.GetOSSession().SaveData(context.TODO(), "testName", bytes.NewReader(testData), nil, 0)
if err != nil {
t.Fatal(err)
}
Expand Down
11 changes: 6 additions & 5 deletions drivers/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package drivers

import (
"bytes"
"context"
"crypto/tls"
"fmt"
Expand Down Expand Up @@ -43,7 +44,7 @@ type FileInfo struct {
Name string
ETag string
LastModified time.Time
Size int64
Size *int64
}

type FileInfoReader struct {
Expand All @@ -62,7 +63,7 @@ type PageInfo interface {
type OSSession interface {
OS() OSDriver

SaveData(ctx context.Context, name string, data []byte, meta map[string]string, timeout time.Duration) (string, error)
SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error)
EndSession()

// Info in order to have this session used via RPC
Expand Down Expand Up @@ -137,7 +138,7 @@ func ParseOSURL(input string, useFullAPI bool) (OSDriver, error) {
return nil, fmt.Errorf("password is required with s3:// OS")
}
base := path.Base(u.Path)
return NewS3Driver(u.Host, base, u.User.Username(), pw, useFullAPI), nil
return NewS3Driver(u.Host, base, u.User.Username(), pw, useFullAPI)
}
// custom s3-compatible store
if u.Scheme == "s3+http" || u.Scheme == "s3+https" {
Expand All @@ -162,7 +163,7 @@ func ParseOSURL(input string, useFullAPI bool) (OSDriver, error) {
if !ok {
return nil, fmt.Errorf("password is required with s3:// OS")
}
return NewCustomS3Driver(hosturl.String(), bucket, region, u.User.Username(), pw, useFullAPI), nil
return NewCustomS3Driver(hosturl.String(), bucket, region, u.User.Username(), pw, useFullAPI)
}
if u.Scheme == "gs" {
file := u.User.Username()
Expand Down Expand Up @@ -192,7 +193,7 @@ func SaveRetried(ctx context.Context, sess OSSession, name string, data []byte,
var uri string
var err error
for i := 0; i < retryCount; i++ {
uri, err = sess.SaveData(ctx, name, data, meta, 0)
uri, err = sess.SaveData(ctx, name, bytes.NewReader(data), meta, 0)
if err == nil {
return uri, err
}
Expand Down
17 changes: 10 additions & 7 deletions drivers/gs.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package drivers

import (
"bytes"
"context"
"crypto"
"crypto/rand"
Expand Down Expand Up @@ -153,7 +152,7 @@ func (os *gsSession) createClient() error {
return nil
}

func (os *gsSession) SaveData(ctx context.Context, name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) {
func (os *gsSession) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
if os.useFullAPI {
if os.client == nil {
if err := os.createClient(); err != nil {
Expand All @@ -164,7 +163,7 @@ func (os *gsSession) SaveData(ctx context.Context, name string, data []byte, met
objh := os.client.Bucket(os.bucket).Object(keyname)
clog.V(common.VERBOSE).Infof(ctx, "Saving to GS %s/%s", os.bucket, keyname)
if timeout == 0 {
timeout = saveTimeout
timeout = defaultSaveTimeout
}
ctx, cancel := context.WithTimeout(clog.Clone(context.Background(), ctx), timeout)
defer cancel()
Expand All @@ -175,8 +174,12 @@ func (os *gsSession) SaveData(ctx context.Context, name string, data []byte, met
for k, v := range meta {
wr.Metadata[k] = v
}
wr.ContentType = os.getContentType(name, data)
_, err := io.Copy(wr, bytes.NewReader(data))
data, contentType, err := os.peekContentType(name, data)
if err != nil {
return "", err
}
wr.ContentType = contentType
_, err = io.Copy(wr, data)
err2 := wr.Close()
if err != nil {
return "", err
Expand Down Expand Up @@ -240,7 +243,7 @@ func (gspi *gsPageInfo) listFiles() error {
Name: attrs.Name,
ETag: attrs.Etag,
LastModified: attrs.Updated,
Size: attrs.Size,
Size: &attrs.Size,
}
gspi.files = append(gspi.files, fi)
}
Expand Down Expand Up @@ -303,7 +306,7 @@ func (os *gsSession) ReadData(ctx context.Context, name string) (*FileInfoReader
}
res := &FileInfoReader{}
res.Name = name
res.Size = attrs.Size
res.Size = &attrs.Size
res.ETag = attrs.Etag
res.LastModified = attrs.Updated
if len(attrs.Metadata) > 0 {
Expand Down
15 changes: 11 additions & 4 deletions drivers/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"net/url"
"path"
Expand Down Expand Up @@ -123,7 +124,8 @@ func (ostore *MemorySession) ListFiles(ctx context.Context, prefix, delim string
}
} else {
if pprefix == "" || strings.HasPrefix(it.name, pprefix) {
fi := FileInfo{Name: path.Join(cachePath, it.name), Size: int64(len(it.data))}
size := int64(len(it.data))
fi := FileInfo{Name: path.Join(cachePath, it.name), Size: &size}
pi.files = append(pi.files, fi)
}
}
Expand All @@ -139,10 +141,11 @@ func (ostore *MemorySession) ReadData(ctx context.Context, name string) (*FileIn
if data == nil {
return nil, errors.New("Not found")
}
size := int64(len(data))
res := &FileInfoReader{
FileInfo: FileInfo{
Name: name,
Size: int64(len(data)),
Size: &size,
},
Body: ioutil.NopCloser(bytes.NewReader(data)),
}
Expand Down Expand Up @@ -195,7 +198,7 @@ func (ostore *MemorySession) GetInfo() *net.OSInfo {
return nil
}

func (ostore *MemorySession) SaveData(ctx context.Context, name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) {
func (ostore *MemorySession) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
path, file := path.Split(ostore.getAbsolutePath(name))

ostore.dLock.Lock()
Expand All @@ -205,8 +208,12 @@ func (ostore *MemorySession) SaveData(ctx context.Context, name string, data []b
return "", fmt.Errorf("Session ended")
}

bytes, err := ioutil.ReadAll(data)
if err != nil {
return "", err
}
dc := ostore.getCacheForStream(path)
dc.Insert(file, data)
dc.Insert(file, bytes)

return ostore.getAbsoluteURI(name), nil
}
Expand Down
16 changes: 5 additions & 11 deletions drivers/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,13 @@ import (
"context"
"fmt"
"net/url"
"strings"
"testing"

"github.com/golang/glog"
"github.com/stretchr/testify/assert"
)

// copyBytes returns a freshly allocated byte slice holding the bytes of src.
// The result never aliases src and is non-nil even when src is empty.
func copyBytes(src string) []byte {
	// copy accepts a string source directly, so a single allocation suffices;
	// converting to []byte first would allocate and copy the data twice.
	dst := make([]byte, len(src))
	copy(dst, src)
	return dst
}

func TestLocalOS(t *testing.T) {
tempData1 := "dataitselftempdata1"
tempData2 := "dataitselftempdata2"
Expand All @@ -31,17 +25,17 @@ func TestLocalOS(t *testing.T) {
assert.NoError((err))
os := NewMemoryDriver(u)
sess := os.NewSession(("sesspath")).(*MemorySession)
path, err := sess.SaveData(context.TODO(), "name1/1.ts", copyBytes(tempData1), nil, 0)
path, err := sess.SaveData(context.TODO(), "name1/1.ts", strings.NewReader(tempData1), nil, 0)
glog.Info(path)
fmt.Println(path)
assert.Equal("fake.com/url/stream/sesspath/name1/1.ts", path)
data := sess.GetData("sesspath/name1/1.ts")
fmt.Printf("got Data: '%s'\n", data)
assert.Equal(tempData1, string(data))
path, err = sess.SaveData(context.TODO(), "name1/1.ts", copyBytes(tempData2), nil, 0)
path, err = sess.SaveData(context.TODO(), "name1/1.ts", strings.NewReader(tempData2), nil, 0)
data = sess.GetData("sesspath/name1/1.ts")
assert.Equal(tempData2, string(data))
path, err = sess.SaveData(context.TODO(), "name1/2.ts", copyBytes(tempData3), nil, 0)
path, err = sess.SaveData(context.TODO(), "name1/2.ts", strings.NewReader(tempData3), nil, 0)
data = sess.GetData("sesspath/name1/2.ts")
assert.Equal(tempData3, string(data))
// Test trim prefix when baseURI != nil
Expand All @@ -56,7 +50,7 @@ func TestLocalOS(t *testing.T) {
// Test trim prefix when baseURI = nil
os = NewMemoryDriver(nil)
sess = os.NewSession("sesspath").(*MemorySession)
path, err = sess.SaveData(context.TODO(), "name1/1.ts", copyBytes(tempData1), nil, 0)
path, err = sess.SaveData(context.TODO(), "name1/1.ts", strings.NewReader(tempData1), nil, 0)
assert.Nil(err)
assert.Equal("/stream/sesspath/name1/1.ts", path)

Expand Down
3 changes: 2 additions & 1 deletion drivers/overwrite_queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package drivers

import (
"bytes"
"context"
"time"

Expand Down Expand Up @@ -69,7 +70,7 @@ func (oq *OverwriteQueue) workerLoop() {
data = oq.getLastMessage(data)
glog.V(common.VERBOSE).Infof("Start saving %s name=%s bytes=%d try=%d", oq.desc, oq.name, len(data), try)
now := time.Now()
_, err = oq.session.SaveData(context.Background(), oq.name, data, nil, timeout)
_, err = oq.session.SaveData(context.Background(), oq.name, bytes.NewReader(data), nil, timeout)
took := time.Since(now)
if err == nil {
glog.V(common.VERBOSE).Infof("Saving %s name=%s bytes=%d took=%s try=%d", oq.desc, oq.name,
Expand Down
23 changes: 18 additions & 5 deletions drivers/overwrite_queue_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package drivers

import (
"bytes"
"errors"
"io"
"io/ioutil"
"testing"
"time"

"github.com/stretchr/testify/mock"
)

// waitForQueueToClear used in tests
Expand All @@ -30,7 +35,7 @@ func TestOverwriteQueueShouldCallSave(t *testing.T) {
data1 := []byte("data01")

var meta map[string]string
mos.On("SaveData", "f1", data1, meta, timeout).Return("not used", nil).Once()
mos.On("SaveData", "f1", dataReader(data1), meta, timeout).Return("not used", nil).Once()

oq.Save(data1)
oq.waitForQueueToClear(5 * time.Second)
Expand All @@ -47,9 +52,9 @@ func TestOverwriteQueueShouldRetry(t *testing.T) {
data1 := []byte("data01")

var meta map[string]string
mos.On("SaveData", "f1", data1, meta, timeout).Return("not used", errors.New("no1")).Once()
mos.On("SaveData", "f1", dataReader(data1), meta, timeout).Return("not used", errors.New("no1")).Once()
timeout = time.Duration(float64(timeout) * timeoutMultiplier)
mos.On("SaveData", "f1", data1, meta, timeout).Return("not used", nil).Once()
mos.On("SaveData", "f1", dataReader(data1), meta, timeout).Return("not used", nil).Once()

oq.Save(data1)
oq.waitForQueueToClear(5 * time.Second)
Expand All @@ -68,8 +73,8 @@ func TestOverwriteQueueShouldUseLastValue(t *testing.T) {
data3 := []byte("data03")

var meta map[string]string
mos.On("SaveData", "f1", dataw1, meta, timeout).Return("not used", nil).Once()
mos.On("SaveData", "f1", data3, meta, timeout).Return("not used", nil).Once()
mos.On("SaveData", "f1", dataReader(dataw1), meta, timeout).Return("not used", nil).Once()
mos.On("SaveData", "f1", dataReader(data3), meta, timeout).Return("not used", nil).Once()

mos.waitForCh = true
oq.Save(dataw1)
Expand All @@ -84,3 +89,11 @@ func TestOverwriteQueueShouldUseLastValue(t *testing.T) {
oq.StopAfter(0)
time.Sleep(10 * time.Millisecond)
}

// dataReader builds a mock argument matcher for io.Reader parameters.
//
// The returned matcher accepts a reader only if it is seekable and its full
// contents equal expected. After reading, the reader is rewound to the start
// so its contents can be read again after the match.
// Readers that cannot be rewound are rejected without being consumed,
// instead of panicking on a failed type assertion.
func dataReader(expected []byte) interface{} {
	return mock.MatchedBy(func(r io.Reader) bool {
		// Check seekability first: draining a reader we cannot rewind would
		// leave it empty for whoever reads it next.
		seeker, ok := r.(io.Seeker)
		if !ok {
			return false
		}
		data, err := ioutil.ReadAll(r)
		// Always attempt the rewind, even if the read failed.
		_, seekErr := seeker.Seek(0, io.SeekStart)
		return err == nil && seekErr == nil && bytes.Equal(data, expected)
	})
}
Loading

0 comments on commit 13165dd

Please sign in to comment.