Skip to content

Commit

Permalink
fix: reject on error and activation guards for Fast Extension (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
paullouisageneau committed Jan 20, 2022
1 parent 94f72e9 commit d59075b
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 7 deletions.
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,46 @@ wire.on('keep-alive', () => {
// peer sent a keep alive - just ignore it
})
```
### fast extension (BEP 6)

This module has built-in support for the
[BitTorrent Fast Extension (BEP 6)](http://www.bittorrent.org/beps/bep_0006.html).

The Fast Extension introduces several messages to make the protocol more efficient:
have-none, have-all, suggest, reject, and allowed-fast.

```js
wire.handshake(infoHash, peerId, { fast: true })

wire.hasFast // true if Fast Extension is available, required to call the following methods

wire.haveNone() // instead of wire.bitfield(buffer) with an all-zero buffer
wire.on('have-none', () => {
// instead of bitfield with an all-zero buffer
})

wire.haveAll() // instead of wire.bitfield(buffer) with an all-one buffer
wire.on('have-all', () => {
// instead of bitfield with an all-one buffer
})

wire.suggest(pieceIndex) // suggest requesting a piece to the peer
wire.on('suggest', (pieceIndex) => {
// peer suggests requesting piece
})

wire.on('allowed-fast', (pieceIndex) => {
// piece may be obtained from peer while choked
})

wire.peerAllowedFastSet // list of allowed-fast pieces

// Note rejection is handled automatically on choke or request error
wire.reject(pieceIndex, offset, length) // reject a request
wire.on('reject', (pieceIndex, offset, length) => {
// peer rejected a request
})
```

### extension protocol (BEP 10)

Expand Down
23 changes: 19 additions & 4 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -342,15 +342,15 @@ class Wire extends stream.Duplex {
if (this.extensions.dht) reserved[7] |= 0x01
if (this.extensions.fast) reserved[7] |= 0x04

this._push(Buffer.concat([MESSAGE_PROTOCOL, reserved, infoHashBuffer, peerIdBuffer]))
this._handshakeSent = true

// BEP6 Fast Extension: The extension is enabled only if both ends of the connection set this bit.
if (this.extensions.fast && this.peerExtensions.fast) {
this._debug('fast extension is enabled')
this.hasFast = true
}

this._push(Buffer.concat([MESSAGE_PROTOCOL, reserved, infoHashBuffer, peerIdBuffer]))
this._handshakeSent = true

if (this.peerExtensions.extended && !this._extendedHandshakeSent) {
// Peer's handshake indicated support already
// (incoming connection)
Expand Down Expand Up @@ -522,6 +522,7 @@ class Wire extends stream.Duplex {
* @param {number} index
*/
suggest (index) {
if (!this.hasFast) throw Error('fast extension is disabled')
this._debug('suggest %d', index)
this._message(0x0D, [index], null)
}
Expand Down Expand Up @@ -823,7 +824,11 @@ class Wire extends stream.Duplex {

const respond = (err, buffer) => {
if (request !== this._pull(this.peerRequests, index, offset, length)) return
if (err) return this._debug('error satisfying request index=%d offset=%d length=%d (%s)', index, offset, length, err.message)
if (err) {
this._debug('error satisfying request index=%d offset=%d length=%d (%s)', index, offset, length, err.message)
if (this.hasFast) this.reject(index, offset, length)
return
}
this.piece(index, offset, buffer)
}

Expand Down Expand Up @@ -857,6 +862,7 @@ class Wire extends stream.Duplex {
// BEP6: the peer MUST close the connection
this._debug('Error: got suggest whereas fast extension is disabled')
this.destroy()
return
}
this._debug('got suggest %d', index)
this.emit('suggest', index)
Expand All @@ -867,6 +873,7 @@ class Wire extends stream.Duplex {
// BEP6: the peer MUST close the connection
this._debug('Error: got have-all whereas fast extension is disabled')
this.destroy()
return
}
this._debug('got have-all')
this.peerPieces = new HaveAllBitField()
Expand All @@ -878,6 +885,7 @@ class Wire extends stream.Duplex {
// BEP6: the peer MUST close the connection
this._debug('Error: got have-none whereas fast extension is disabled')
this.destroy()
return
}
this._debug('got have-none')
this.emit('have-none')
Expand All @@ -888,6 +896,7 @@ class Wire extends stream.Duplex {
// BEP6: the peer MUST close the connection
this._debug('Error: got reject whereas fast extension is disabled')
this.destroy()
return
}
this._debug('got reject index=%d offset=%d length=%d', index, offset, length)
this._callback(
Expand All @@ -899,6 +908,12 @@ class Wire extends stream.Duplex {
}

_onAllowedFast (index) {
if (!this.hasFast) {
// BEP6: the peer MUST close the connection
this._debug('Error: got allowed-fast whereas fast extension is disabled')
this.destroy()
return
}
this._debug('got allowed-fast %d', index)
if (!this.peerAllowedFastSet.includes(index)) this.peerAllowedFastSet.push(index)
if (this.peerAllowedFastSet.length > ALLOWED_FAST_SET_MAX_LENGTH) this.peerAllowedFastSet.shift()
Expand Down
145 changes: 142 additions & 3 deletions test/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,58 @@ test('No duplicate `have` events for same piece', t => {
})
})

test('Fast Extension: handshake when unsupported', t => {
t.plan(4)

const wire1 = new Protocol()
const wire2 = new Protocol()
wire1.pipe(wire2).pipe(wire1)
wire1.on('error', err => { t.fail(err) })
wire2.on('error', err => { t.fail(err) })

wire1.on('handshake', (infoHash, peerId, extensions) => {
t.equal(extensions.fast, false)
t.equal(wire1.hasFast, false)
t.equal(wire2.hasFast, false)
})

wire2.on('handshake', (infoHash, peerId, extensions) => {
t.equal(extensions.fast, true)
// Respond asynchronously
process.nextTick(() => {
wire2.handshake(infoHash, peerId, { fast: false }) // no support
})
})

wire1.handshake(Buffer.from('01234567890123456789'), Buffer.from('12345678901234567890'), { fast: true })
})

test('Fast Extension: handshake when supported', t => {
t.plan(4)

const wire1 = new Protocol()
const wire2 = new Protocol()
wire1.pipe(wire2).pipe(wire1)
wire1.on('error', err => { t.fail(err) })
wire2.on('error', err => { t.fail(err) })

wire1.on('handshake', (infoHash, peerId, extensions) => {
t.equal(extensions.fast, true)
t.equal(wire1.hasFast, true)
t.equal(wire2.hasFast, true)
})

wire2.on('handshake', (infoHash, peerId, extensions) => {
t.equal(extensions.fast, true)
// Respond asynchronously
process.nextTick(() => {
wire2.handshake(infoHash, peerId, { fast: true })
})
})

wire1.handshake(Buffer.from('01234567890123456789'), Buffer.from('12345678901234567890'), { fast: true })
})

test('Fast Extension: have-all', t => {
t.plan(2)

Expand Down Expand Up @@ -260,7 +312,7 @@ test('Fast Extension: allowed-fast', t => {
wire.handshake(Buffer.from('01234567890123456789'), Buffer.from('12345678901234567890'), { fast: true })
})

test('Fast Extension: reject', t => {
test('Fast Extension: reject on choke', t => {
t.plan(14)

const wire = new Protocol()
Expand All @@ -284,7 +336,6 @@ test('Fast Extension: reject', t => {
})

wire.on('request', (i, offset, length, callback) => {
t.equal(wire.peerRequests.length, 1)
t.equal(wire.peerRequests.length, 1)
t.equal(i, 0)
t.equal(offset, 2)
Expand All @@ -298,9 +349,97 @@ test('Fast Extension: reject', t => {
t.equal(wire.requests.length, 1) // not implicitly cancelled
})

wire.on('rejected', () => {
wire.on('reject', () => {
t.equal(wire.requests.length, 0)
})

wire.handshake(Buffer.from('01234567890123456789'), Buffer.from('12345678901234567890'), { fast: true })
})

test('Fast Extension: reject on error', t => {
t.plan(12)

const wire = new Protocol()
wire.on('error', err => { t.fail(err) })
wire.pipe(wire)

wire.once('handshake', (infoHash, peerId, extensions) => {
t.equal(wire.extensions.fast, true)
t.equal(wire.peerExtensions.fast, true)
t.equal(wire.hasFast, true)
wire.unchoke()
})

wire.once('unchoke', () => {
t.equal(wire.requests.length, 0)
wire.request(6, 66, 666, (err, buffer) => {
t.ok(err)
})
t.equal(wire.requests.length, 1)
t.equal(wire.peerRequests.length, 0)
})

wire.on('request', (i, offset, length, callback) => {
t.equal(wire.peerRequests.length, 1)
t.equal(i, 6)
t.equal(offset, 66)
t.equal(length, 666)
callback(new Error('cannot satisfy'), null)
})

wire.on('reject', () => {
t.equal(wire.requests.length, 0)
})

wire.handshake(Buffer.from('01234567890123456789'), Buffer.from('12345678901234567890'), { fast: true })
})

test('Fast Extension disabled: have-all', t => {
t.plan(3)
const wire = new Protocol()
t.equal(wire.hasFast, false)
t.throws(() => wire.haveAll())
wire.on('have-all', () => { t.fail() })
wire.on('close', () => { t.pass('wire closed') })
wire._onHaveAll()
})

test('Fast Extension disabled: have-none', t => {
t.plan(3)
const wire = new Protocol()
t.equal(wire.hasFast, false)
t.throws(() => wire.haveNone())
wire.on('have-none', () => { t.fail() })
wire.on('close', () => { t.pass('wire closed') })
wire._onHaveNone()
})

test('Fast Extension disabled: suggest', t => {
t.plan(3)
const wire = new Protocol()
t.equal(wire.hasFast, false)
t.throws(() => wire.suggest(42))
wire.on('suggest', () => { t.fail() })
wire.on('close', () => { t.pass('wire closed') })
wire._onSuggest(42)
})

test('Fast Extension disabled: allowed-fast', t => {
t.plan(3)
const wire = new Protocol()
t.equal(wire.hasFast, false)
t.throws(() => wire.allowedFast(42))
wire.on('allowed-fast', () => { t.fail() })
wire.on('close', () => { t.pass('wire closed') })
wire._onAllowedFast(42)
})

test('Fast Extension disabled: reject', t => {
t.plan(3)
const wire = new Protocol()
t.equal(wire.hasFast, false)
t.throws(() => wire.reject(42, 0, 99))
wire.on('reject', () => { t.fail() })
wire.on('close', () => { t.pass('wire closed') })
wire._onReject(42, 0, 99)
})

0 comments on commit d59075b

Please sign in to comment.