diff --git a/index.js b/index.js index f554740e..dfa07553 100644 --- a/index.js +++ b/index.js @@ -3,10 +3,9 @@ import bencode from 'bencode' import BitField from 'bitfield' import crypto from 'crypto' import Debug from 'debug' -import randombytes from 'randombytes' import RC4 from 'rc4' -import stream from 'readable-stream' -import { hash, concat, equal, hex2arr, arr2hex, text2arr, arr2text } from 'uint8-util' +import { Duplex } from 'streamx' +import { hash, concat, equal, hex2arr, arr2hex, text2arr, arr2text, randomBytes } from 'uint8-util' import throughput from 'throughput' import arrayRemove from 'unordered-array-remove' @@ -62,11 +61,11 @@ class HaveAllBitField { set (index) {} } -class Wire extends stream.Duplex { +class Wire extends Duplex { constructor (type = null, retries = 0, peEnabled = false) { super() - this._debugId = arr2hex(randombytes(4)) + this._debugId = arr2hex(randomBytes(4)) this._debug('new wire') this.peerId = null // remote peer id (hex string) @@ -117,7 +116,6 @@ class Wire extends stream.Duplex { this._timeoutMs = 0 this._timeoutExpiresAt = null - this.destroyed = false // was the wire ended by calling `destroy`? this._finished = false this._parserSize = 0 // number of needed bytes to parse next message from remote peer @@ -188,18 +186,17 @@ class Wire extends stream.Duplex { destroy () { if (this.destroyed) return - this.destroyed = true this._debug('destroy') - this.emit('close') this.end() return this } - end (...args) { + end (data) { + if (this.destroyed || this.destroying) return this._debug('end') this._onUninterested() this._onChoke() - return super.end(...args) + return super.end(data) } /** @@ -250,14 +247,14 @@ class Wire extends stream.Duplex { sendPe1 () { if (this._peEnabled) { const padALen = Math.floor(Math.random() * 513) - const padA = randombytes(padALen) + const padA = randomBytes(padALen) this._push(concat([hex2arr(this._myPubKey), padA])) } } sendPe2 () { const padBLen = Math.floor(Math.random() * 513) - const padB = randombytes(padBLen) + const padB = randomBytes(padBLen) this._push(concat([hex2arr(this._myPubKey), padB])) } @@ -270,8 +267,8 @@ class Wire extends stream.Duplex { const hash3Buffer = await hash(hex2arr(this._utfToHex('req3') + this._sharedSecret)) const hashesXorBuffer = xor(hash2Buffer, hash3Buffer) - const padCLen = new DataView(randombytes(2).buffer).getUint16(0) % 512 - const padCBuffer = randombytes(padCLen) + const padCLen = new DataView(randomBytes(2).buffer).getUint16(0) % 512 + const padCBuffer = randomBytes(padCLen) let vcAndProvideBuffer = new Uint8Array(8 + 4 + 2 + padCLen + 2) vcAndProvideBuffer.set(VC) @@ -290,8 +287,8 @@ class Wire extends stream.Duplex { async sendPe4 (infoHash) { await this.setEncrypt(this._sharedSecret, infoHash) - const padDLen = new DataView(randombytes(2).buffer).getUint16(0) % 512 - const padDBuffer = randombytes(padDLen) + const padDLen = new DataView(randomBytes(2).buffer).getUint16(0) % 512 + const padDBuffer = randomBytes(padDLen) let vcAndSelectBuffer = new Uint8Array(8 + 4 + 2 + padDLen) const view = new DataView(vcAndSelectBuffer.buffer) @@ -654,12 +651,6 @@ class Wire extends stream.Duplex { return true } - /** - * Duplex stream method. Called whenever the remote peer stream wants data. No-op - * since we'll just push data whenever we get it. - */ - _read () {} - /** * Send a message to the remote peer. */ @@ -974,10 +965,9 @@ class Wire extends stream.Duplex { * Once enough bytes have arrived to process the message, the callback function * (i.e. `this._parser`) gets called with the full buffer of data. * @param {Uint8Array} data - * @param {string} encoding * @param {function} cb */ - _write (data, encoding, cb) { + _write (data, cb) { if (this._encryptionMethod === 2 && this._cryptoHandshakeDone) { data = this._decrypt(data) } diff --git a/package.json b/package.json index b9305118..223d4c2f 100644 --- a/package.json +++ b/package.json @@ -15,9 +15,8 @@ "bencode": "^3.0.3", "bitfield": "^4.1.0", "debug": "^4.3.4", - "randombytes": "^2.1.0", "rc4": "^0.1.5", - "readable-stream": "^3.6.0", + "streamx": "^2.12.5", "throughput": "^1.0.1", "uint8-util": "^2.1.7", "unordered-array-remove": "^1.0.2" diff --git a/test/protocol.js b/test/protocol.js index fc968bcb..844a0895 100644 --- a/test/protocol.js +++ b/test/protocol.js @@ -55,9 +55,11 @@ test('Asynchronous handshake + extended handshake', t => { eventLog.push('w1 ex') t.ok(obj) - // Last step: ensure handshakes came before extension protocol - t.deepEqual(eventLog, ['w2 hs', 'w1 hs', 'w2 ex', 'w1 ex']) - t.end() + queueMicrotask(() => { + // Last step: ensure handshakes came before extension protocol + t.deepEqual(eventLog, ['w2 hs', 'w1 hs', 'w1 ex', 'w2 ex']) + t.end() + }) } }) @@ -68,7 +70,7 @@ test('Asynchronous handshake + extended handshake', t => { t.equal(extensions.extended, true) // Respond asynchronously - process.nextTick(() => { + queueMicrotask(() => { wire2.handshake(infoHash, peerId) }) }) @@ -169,11 +171,11 @@ test('No duplicate `have` events for same piece', t => { t.equal(haveEvents, 0) t.equal(!!wire.peerPieces.get(0), false) wire.have(0) - process.nextTick(() => { + queueMicrotask(() => { t.equal(haveEvents, 1, 'emitted event for new piece') t.equal(!!wire.peerPieces.get(0), true) wire.have(0) - process.nextTick(() => { + queueMicrotask(() => { t.equal(haveEvents, 1, 'not emitted event for preexisting piece') t.equal(!!wire.peerPieces.get(0), true) }) @@ -198,7 +200,7 @@ test('Fast Extension: handshake when unsupported', t => { wire2.on('handshake', (infoHash, peerId, extensions) => { t.equal(extensions.fast, true) // Respond asynchronously - process.nextTick(() => { + queueMicrotask(() => { wire2.handshake(infoHash, peerId, { fast: false }) // no support }) }) @@ -224,7 +226,7 @@ test('Fast Extension: handshake when supported', t => { wire2.on('handshake', (infoHash, peerId, extensions) => { t.equal(extensions.fast, true) // Respond asynchronously - process.nextTick(() => { + queueMicrotask(() => { wire2.handshake(infoHash, peerId, { fast: true }) }) })