Skip to content

Commit

Permalink
fix: migrate to streamx (#96)
Browse files Browse the repository at this point in the history
* fix: migrate to streamx

* fix: drop randombytes
  • Loading branch information
ThaUnknown committed May 30, 2023
1 parent f17c7aa commit 9b77f6c
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 34 deletions.
38 changes: 14 additions & 24 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ import bencode from 'bencode'
import BitField from 'bitfield'
import crypto from 'crypto'
import Debug from 'debug'
import randombytes from 'randombytes'
import RC4 from 'rc4'
import stream from 'readable-stream'
import { hash, concat, equal, hex2arr, arr2hex, text2arr, arr2text } from 'uint8-util'
import { Duplex } from 'streamx'
import { hash, concat, equal, hex2arr, arr2hex, text2arr, arr2text, randomBytes } from 'uint8-util'
import throughput from 'throughput'
import arrayRemove from 'unordered-array-remove'

Expand Down Expand Up @@ -62,11 +61,11 @@ class HaveAllBitField {
set (index) {}
}

class Wire extends stream.Duplex {
class Wire extends Duplex {
constructor (type = null, retries = 0, peEnabled = false) {
super()

this._debugId = arr2hex(randombytes(4))
this._debugId = arr2hex(randomBytes(4))
this._debug('new wire')

this.peerId = null // remote peer id (hex string)
Expand Down Expand Up @@ -117,7 +116,6 @@ class Wire extends stream.Duplex {
this._timeoutMs = 0
this._timeoutExpiresAt = null

this.destroyed = false // was the wire ended by calling `destroy`?
this._finished = false

this._parserSize = 0 // number of needed bytes to parse next message from remote peer
Expand Down Expand Up @@ -188,18 +186,17 @@ class Wire extends stream.Duplex {

destroy () {
if (this.destroyed) return
this.destroyed = true
this._debug('destroy')
this.emit('close')
this.end()
return this
}

end (...args) {
end (data) {
if (this.destroyed || this.destroying) return
this._debug('end')
this._onUninterested()
this._onChoke()
return super.end(...args)
return super.end(data)
}

/**
Expand Down Expand Up @@ -250,14 +247,14 @@ class Wire extends stream.Duplex {
sendPe1 () {
if (this._peEnabled) {
const padALen = Math.floor(Math.random() * 513)
const padA = randombytes(padALen)
const padA = randomBytes(padALen)
this._push(concat([hex2arr(this._myPubKey), padA]))
}
}

sendPe2 () {
const padBLen = Math.floor(Math.random() * 513)
const padB = randombytes(padBLen)
const padB = randomBytes(padBLen)
this._push(concat([hex2arr(this._myPubKey), padB]))
}

Expand All @@ -270,8 +267,8 @@ class Wire extends stream.Duplex {
const hash3Buffer = await hash(hex2arr(this._utfToHex('req3') + this._sharedSecret))
const hashesXorBuffer = xor(hash2Buffer, hash3Buffer)

const padCLen = new DataView(randombytes(2).buffer).getUint16(0) % 512
const padCBuffer = randombytes(padCLen)
const padCLen = new DataView(randomBytes(2).buffer).getUint16(0) % 512
const padCBuffer = randomBytes(padCLen)

let vcAndProvideBuffer = new Uint8Array(8 + 4 + 2 + padCLen + 2)
vcAndProvideBuffer.set(VC)
Expand All @@ -290,8 +287,8 @@ class Wire extends stream.Duplex {
async sendPe4 (infoHash) {
await this.setEncrypt(this._sharedSecret, infoHash)

const padDLen = new DataView(randombytes(2).buffer).getUint16(0) % 512
const padDBuffer = randombytes(padDLen)
const padDLen = new DataView(randomBytes(2).buffer).getUint16(0) % 512
const padDBuffer = randomBytes(padDLen)
let vcAndSelectBuffer = new Uint8Array(8 + 4 + 2 + padDLen)
const view = new DataView(vcAndSelectBuffer.buffer)

Expand Down Expand Up @@ -654,12 +651,6 @@ class Wire extends stream.Duplex {
return true
}

/**
* Duplex stream method. Called whenever the remote peer stream wants data. No-op
* since we'll just push data whenever we get it.
*/
_read () {}

/**
* Send a message to the remote peer.
*/
Expand Down Expand Up @@ -974,10 +965,9 @@ class Wire extends stream.Duplex {
* Once enough bytes have arrived to process the message, the callback function
* (i.e. `this._parser`) gets called with the full buffer of data.
* @param {Uint8Array} data
* @param {string} encoding
* @param {function} cb
*/
_write (data, encoding, cb) {
_write (data, cb) {
if (this._encryptionMethod === 2 && this._cryptoHandshakeDone) {
data = this._decrypt(data)
}
Expand Down
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
"bencode": "^3.0.3",
"bitfield": "^4.1.0",
"debug": "^4.3.4",
"randombytes": "^2.1.0",
"rc4": "^0.1.5",
"readable-stream": "^3.6.0",
"streamx": "^2.12.5",
"throughput": "^1.0.1",
"uint8-util": "^2.1.7",
"unordered-array-remove": "^1.0.2"
Expand Down
18 changes: 10 additions & 8 deletions test/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ test('Asynchronous handshake + extended handshake', t => {
eventLog.push('w1 ex')
t.ok(obj)

// Last step: ensure handshakes came before extension protocol
t.deepEqual(eventLog, ['w2 hs', 'w1 hs', 'w2 ex', 'w1 ex'])
t.end()
queueMicrotask(() => {
// Last step: ensure handshakes came before extension protocol
t.deepEqual(eventLog, ['w2 hs', 'w1 hs', 'w1 ex', 'w2 ex'])
t.end()
})
}
})

Expand All @@ -68,7 +70,7 @@ test('Asynchronous handshake + extended handshake', t => {
t.equal(extensions.extended, true)

// Respond asynchronously
process.nextTick(() => {
queueMicrotask(() => {
wire2.handshake(infoHash, peerId)
})
})
Expand Down Expand Up @@ -169,11 +171,11 @@ test('No duplicate `have` events for same piece', t => {
t.equal(haveEvents, 0)
t.equal(!!wire.peerPieces.get(0), false)
wire.have(0)
process.nextTick(() => {
queueMicrotask(() => {
t.equal(haveEvents, 1, 'emitted event for new piece')
t.equal(!!wire.peerPieces.get(0), true)
wire.have(0)
process.nextTick(() => {
queueMicrotask(() => {
t.equal(haveEvents, 1, 'not emitted event for preexisting piece')
t.equal(!!wire.peerPieces.get(0), true)
})
Expand All @@ -198,7 +200,7 @@ test('Fast Extension: handshake when unsupported', t => {
wire2.on('handshake', (infoHash, peerId, extensions) => {
t.equal(extensions.fast, true)
// Respond asynchronously
process.nextTick(() => {
queueMicrotask(() => {
wire2.handshake(infoHash, peerId, { fast: false }) // no support
})
})
Expand All @@ -224,7 +226,7 @@ test('Fast Extension: handshake when supported', t => {
wire2.on('handshake', (infoHash, peerId, extensions) => {
t.equal(extensions.fast, true)
// Respond asynchronously
process.nextTick(() => {
queueMicrotask(() => {
wire2.handshake(infoHash, peerId, { fast: true })
})
})
Expand Down

0 comments on commit 9b77f6c

Please sign in to comment.