Skip to content

Commit

Permalink
Add write buffer pooling
Browse files Browse the repository at this point in the history
Add WriteBufferPool to Dialer and Upgrader. This field specifies a pool
to use for write operations on a connection.  Use of the pool can reduce
memory use when there is a modest write volume over a large number of
connections.

Use larger of hijacked buffer and buffer allocated for connection (if
any) as buffer for building handshake response. This decreases possible
allocations when building the handshake response.

Modify bufio reuse test to call Upgrade instead of the internal
newConnBRW. Move the test from conn_test.go to server_test.go because
it's a serer test.

Update newConn and newConnBRW:

- Move the bufio "hacks" from newConnBRW to separate functions and call
these functions directly from Upgrade.
- Rename newConn to newTestConn and move to conn_test.go. Shorten
argument list to common use case.
- Rename newConnBRW to newConn.
- Add pool code to newConn.
  • Loading branch information
Steven Scott authored and garyburd committed Aug 22, 2018
1 parent 5fb9417 commit b378cae
Show file tree
Hide file tree
Showing 9 changed files with 326 additions and 115 deletions.
13 changes: 12 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ type Dialer struct {
// do not limit the size of the messages that can be sent or received.
ReadBufferSize, WriteBufferSize int

// WriteBufferPool is a pool of buffers for write operations. If the value
// is not set, then write buffers are allocated to the connection for the
// lifetime of the connection.
//
// A pool is most useful when the application has a modest volume of writes
// across a large number of connections.
//
// Applications should use a single pool for each unique value of
// WriteBufferSize.
WriteBufferPool BufferPool

// Subprotocols specifies the client's requested subprotocols.
Subprotocols []string

Expand Down Expand Up @@ -277,7 +288,7 @@ func (d *Dialer) Dial(urlStr string, requestHeader http.Header) (*Conn, *http.Re
}
}

conn := newConn(netConn, false, d.ReadBufferSize, d.WriteBufferSize)
conn := newConn(netConn, false, d.ReadBufferSize, d.WriteBufferSize, d.WriteBufferPool, nil, nil)

if err := req.Write(netConn); err != nil {
return nil, nil, err
Expand Down
6 changes: 3 additions & 3 deletions compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func textMessages(num int) [][]byte {

func BenchmarkWriteNoCompression(b *testing.B) {
w := ioutil.Discard
c := newConn(fakeNetConn{Reader: nil, Writer: w}, false, 1024, 1024)
c := newTestConn(nil, w, false)
messages := textMessages(100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -54,7 +54,7 @@ func BenchmarkWriteNoCompression(b *testing.B) {

func BenchmarkWriteWithCompression(b *testing.B) {
w := ioutil.Discard
c := newConn(fakeNetConn{Reader: nil, Writer: w}, false, 1024, 1024)
c := newTestConn(nil, w, false)
messages := textMessages(100)
c.enableWriteCompression = true
c.newCompressionWriter = compressNoContextTakeover
Expand All @@ -66,7 +66,7 @@ func BenchmarkWriteWithCompression(b *testing.B) {
}

func TestValidCompressionLevel(t *testing.T) {
c := newConn(fakeNetConn{}, false, 1024, 1024)
c := newTestConn(nil, nil, false)
for _, level := range []int{minCompressionLevel - 1, maxCompressionLevel + 1} {
if err := c.SetCompressionLevel(level); err == nil {
t.Errorf("no error for level %d", level)
Expand Down
91 changes: 45 additions & 46 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,20 @@ func isValidReceivedCloseCode(code int) bool {
return validReceivedCloseCodes[code] || (code >= 3000 && code <= 4999)
}

// BufferPool represents a pool of buffers. The *sync.Pool type satisfies this
// interface. The type of the value stored in a pool is not specified.
type BufferPool interface {
// Get gets a value from the pool or returns nil if the pool is empty.
Get() interface{}
// Put adds a value to the pool.
Put(interface{})
}

// writePoolData is the type added to the write buffer pool. This wrapper is
// used to prevent applications from peeking at and depending on the values
// added to the pool.
type writePoolData struct{ buf []byte }

// The Conn type represents a WebSocket connection.
type Conn struct {
conn net.Conn
Expand All @@ -232,6 +246,8 @@ type Conn struct {
// Write fields
mu chan bool // used as mutex to protect write to conn
writeBuf []byte // frame is constructed in this buffer.
writePool BufferPool
writeBufSize int
writeDeadline time.Time
writer io.WriteCloser // the current writer returned to the application
isWriting bool // for best-effort concurrent write detection
Expand Down Expand Up @@ -263,71 +279,38 @@ type Conn struct {
newDecompressionReader func(io.Reader) io.ReadCloser
}

func newConn(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int) *Conn {
return newConnBRW(conn, isServer, readBufferSize, writeBufferSize, nil)
}

type writeHook struct {
p []byte
}

func (wh *writeHook) Write(p []byte) (int, error) {
wh.p = p
return len(p), nil
}

func newConnBRW(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int, brw *bufio.ReadWriter) *Conn {
mu := make(chan bool, 1)
mu <- true
func newConn(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int, writeBufferPool BufferPool, br *bufio.Reader, writeBuf []byte) *Conn {

var br *bufio.Reader
if readBufferSize == 0 && brw != nil && brw.Reader != nil {
// Reuse the supplied bufio.Reader if the buffer has a useful size.
// This code assumes that peek on a reader returns
// bufio.Reader.buf[:0].
brw.Reader.Reset(conn)
if p, err := brw.Reader.Peek(0); err == nil && cap(p) >= 256 {
br = brw.Reader
}
}
if br == nil {
if readBufferSize == 0 {
readBufferSize = defaultReadBufferSize
}
if readBufferSize < maxControlFramePayloadSize {
} else if readBufferSize < maxControlFramePayloadSize {
// must be large enough for control frame
readBufferSize = maxControlFramePayloadSize
}
br = bufio.NewReaderSize(conn, readBufferSize)
}

var writeBuf []byte
if writeBufferSize == 0 && brw != nil && brw.Writer != nil {
// Use the bufio.Writer's buffer if the buffer has a useful size. This
// code assumes that bufio.Writer.buf[:1] is passed to the
// bufio.Writer's underlying writer.
var wh writeHook
brw.Writer.Reset(&wh)
brw.Writer.WriteByte(0)
brw.Flush()
if cap(wh.p) >= maxFrameHeaderSize+256 {
writeBuf = wh.p[:cap(wh.p)]
}
if writeBufferSize <= 0 {
writeBufferSize = defaultWriteBufferSize
}
writeBufferSize += maxFrameHeaderSize

if writeBuf == nil {
if writeBufferSize == 0 {
writeBufferSize = defaultWriteBufferSize
}
writeBuf = make([]byte, writeBufferSize+maxFrameHeaderSize)
if writeBuf == nil && writeBufferPool == nil {
writeBuf = make([]byte, writeBufferSize)
}

mu := make(chan bool, 1)
mu <- true
c := &Conn{
isServer: isServer,
br: br,
conn: conn,
mu: mu,
readFinal: true,
writeBuf: writeBuf,
writePool: writeBufferPool,
writeBufSize: writeBufferSize,
enableWriteCompression: true,
compressionLevel: defaultCompressionLevel,
}
Expand Down Expand Up @@ -484,7 +467,19 @@ func (c *Conn) prepWrite(messageType int) error {
c.writeErrMu.Lock()
err := c.writeErr
c.writeErrMu.Unlock()
return err
if err != nil {
return err
}

if c.writeBuf == nil {
wpd, ok := c.writePool.Get().(writePoolData)
if ok {
c.writeBuf = wpd.buf
} else {
c.writeBuf = make([]byte, c.writeBufSize)
}
}
return nil
}

// NextWriter returns a writer for the next message to send. The writer's Close
Expand Down Expand Up @@ -610,6 +605,10 @@ func (w *messageWriter) flushFrame(final bool, extra []byte) error {

if final {
c.writer = nil
if c.writePool != nil {
c.writePool.Put(writePoolData{buf: c.writeBuf})
c.writeBuf = nil
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion conn_broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (b *broadcastBench) makeConns(numConns int) {
conns := make([]*broadcastConn, numConns)

for i := 0; i < numConns; i++ {
c := newConn(fakeNetConn{Reader: nil, Writer: b.w}, true, 1024, 1024)
c := newTestConn(nil, b.w, true)
if b.compression {
c.enableWriteCompression = true
c.newCompressionWriter = compressNoContextTakeover
Expand Down
Loading

0 comments on commit b378cae

Please sign in to comment.