Skip to content
This repository has been archived by the owner on Aug 30, 2024. It is now read-only.

Commit

Permalink
feat: Add Benchmarks to test WebRTC connections
Browse files Browse the repository at this point in the history
  • Loading branch information
kylecarbs committed Jun 15, 2021
1 parent 544f276 commit 5ff4902
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 4 deletions.
41 changes: 40 additions & 1 deletion wsnet/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,22 @@ import (
"fmt"
"net"
"net/url"
"sync"
"time"

"github.com/pion/datachannel"
"github.com/pion/webrtc/v3"
)

const (
httpScheme = "http"

bufferedAmountLowThreshold uint64 = 512 * 1024 // 512 KB
maxBufferedAmount uint64 = 1024 * 1024 // 1 MB
// For some reason messages larger just don't work...
// This shouldn't be a huge deal for real-world usage.
// See: https://github.com/pion/datachannel/issues/59
maxMessageLength = 32 * 1024 // 32 KB
)

// TURNEndpoint returns the TURN address for a Coder baseURL.
Expand Down Expand Up @@ -43,19 +52,49 @@ func ConnectEndpoint(baseURL *url.URL, workspace, token string) string {

type conn struct {
addr *net.UnixAddr
dc *webrtc.DataChannel
rw datachannel.ReadWriteCloser

sendMore chan struct{}
closedMutex sync.Mutex
closed bool
}

func (c *conn) init() {
c.sendMore = make(chan struct{})
c.dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold)
c.dc.OnBufferedAmountLow(func() {
c.closedMutex.Lock()
defer c.closedMutex.Unlock()
if c.closed {
return
}
c.sendMore <- struct{}{}
})
}

func (c *conn) Read(b []byte) (n int, err error) {
return c.rw.Read(b)
}

func (c *conn) Write(b []byte) (n int, err error) {
if len(b) > maxMessageLength {
return 0, fmt.Errorf("outbound packet larger than maximum message size: %d", maxMessageLength)
}
if c.dc.BufferedAmount()+uint64(len(b)) >= maxBufferedAmount {
<-c.sendMore
}
return c.rw.Write(b)
}

func (c *conn) Close() error {
return c.rw.Close()
c.closedMutex.Lock()
defer c.closedMutex.Unlock()
if !c.closed {
c.closed = true
close(c.sendMore)
}
return c.dc.Close()
}

func (c *conn) LocalAddr() net.Addr {
Expand Down
7 changes: 5 additions & 2 deletions wsnet/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,14 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
return nil, ctx.Err()
}

return &conn{
c := &conn{
addr: &net.UnixAddr{
Name: address,
Net: network,
},
dc: dc,
rw: rw,
}, nil
}
c.init()
return c, nil
}
68 changes: 68 additions & 0 deletions wsnet/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package wsnet
import (
"bytes"
"context"
"crypto/rand"
"errors"
"io"
"net"
"strconv"
"testing"

"github.com/pion/webrtc/v3"
Expand Down Expand Up @@ -160,3 +162,69 @@ func TestDial(t *testing.T) {
}
})
}

func BenchmarkThroughput(b *testing.B) {
sizes := []int64{
// 4,
// 16,
// 256,
// 1024,
// 4096,
// 16384,
32768,
}

listener, err := net.Listen("tcp", "0.0.0.0:0")
if err != nil {
b.Error(err)
return
}
go func() {
for {
conn, err := listener.Accept()
if err != nil {
b.Error(err)
return
}
go func() {
_, _ = io.Copy(io.Discard, conn)
}()
}
}()
connectAddr, listenAddr := createDumbBroker(b)
_, err = Listen(context.Background(), listenAddr)
if err != nil {
b.Error(err)
return
}

dialer, err := DialWebsocket(context.Background(), connectAddr, nil)
if err != nil {
b.Error(err)
return
}
for _, size := range sizes {
size := size
bytes := make([]byte, size)
_, _ = rand.Read(bytes)
b.Run("Rand"+strconv.Itoa(int(size)), func(b *testing.B) {
b.SetBytes(size)
b.ReportAllocs()

conn, err := dialer.DialContext(context.Background(), listener.Addr().Network(), listener.Addr().String())
if err != nil {
b.Error(err)
return
}
defer conn.Close()

for i := 0; i < b.N; i++ {
_, err := conn.Write(bytes)
if err != nil {
b.Error(err)
break
}
}
})
}
}
25 changes: 25 additions & 0 deletions wsnet/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,31 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
_, _ = io.Copy(rw, conn)
}()
_, _ = io.Copy(conn, rw)

// bufs := make(chan []byte, 32)
// go func() {
// defer close(bufs)

// for {
// buf := <-bufs
// _, _ = conn.Write(buf)
// }
// }()

// buf := make([]byte, maxMessageLength)
// for {
// nr, err := rw.Read(buf)
// if nr > 0 {
// select {
// case bufs <- buf[0:nr]:
// default:
// }

// }
// if err != nil {
// break
// }
// }
})
}
}
Expand Down
4 changes: 4 additions & 0 deletions wsnet/rtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/pion/dtls/v2"
"github.com/pion/ice/v2"
"github.com/pion/logging"
"github.com/pion/turn/v2"
"github.com/pion/webrtc/v3"
)
Expand Down Expand Up @@ -159,6 +160,9 @@ func newPeerConnection(servers []webrtc.ICEServer) (*webrtc.PeerConnection, erro
se.SetSrflxAcceptanceMinWait(0)
se.DetachDataChannels()
se.SetICETimeouts(time.Second*5, time.Second*5, time.Second*2)
lf := logging.NewDefaultLoggerFactory()
lf.DefaultLogLevel = logging.LogLevelDebug
se.LoggerFactory = lf

// If one server is provided and we know it's TURN, we can set the
// relay acceptable so the connection starts immediately.
Expand Down
2 changes: 1 addition & 1 deletion wsnet/wsnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

// createDumbBroker proxies sockets between /listen and /connect
// to emulate an authenticated WebSocket pair.
func createDumbBroker(t *testing.T) (connectAddr string, listenAddr string) {
func createDumbBroker(t testing.TB) (connectAddr string, listenAddr string) {
listener, err := net.Listen("tcp4", "127.0.0.1:0")
if err != nil {
t.Error(err)
Expand Down

0 comments on commit 5ff4902

Please sign in to comment.