From 7c6bc642addb3e6fee1b1fdc84f83a72ff11ca4a Mon Sep 17 00:00:00 2001 From: Yan Ivan Evdokimov Date: Wed, 31 Jan 2024 10:54:42 +0200 Subject: [PATCH] fix(stream): premature close when it is paused (#2416) * Fix premature close * Add test for premature close of the stream when it is paused * Restore package-lock.json --- lib/commands/query.js | 12 +++++++----- test/integration/connection/test-stream.js | 7 +++++++ 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/lib/commands/query.js b/lib/commands/query.js index 1b3725d73f..67386bbfba 100644 --- a/lib/commands/query.js +++ b/lib/commands/query.js @@ -30,7 +30,7 @@ class Query extends Command { this._receivedFieldsCount = 0; this._resultIndex = 0; this._localStream = null; - this._unpipeStream = function() {}; + this._unpipeStream = function () { }; this._streamFactory = options.infileStreamFactory; this._connection = null; } @@ -155,7 +155,7 @@ class Query extends Command { const onPause = () => { this._localStream.pause(); }; - const onData = function(data) { + const onData = function (data) { const dataWithHeader = Buffer.allocUnsafe(data.length + 4); data.copy(dataWithHeader, 4); connection.writePacket( @@ -227,7 +227,7 @@ class Query extends Command { } /* eslint no-unused-vars: ["error", { "argsIgnorePattern": "^_" }] */ - row(packet, _connection) { + row(packet, _connection) { if (packet.isEOF()) { const status = packet.eofStatusFlags(); const moreResults = status & ServerStatus.SERVER_MORE_RESULTS_EXISTS; @@ -279,11 +279,13 @@ class Query extends Command { }); this.on('end', () => { stream.push(null); // pushing null, indicating EOF - setImmediate(() => stream.emit('close')); // notify readers that query has completed }); this.on('fields', fields => { stream.emit('fields', fields); // replicate old emitter }); + stream.on('end', () => { + stream.emit('close'); + }); return stream; } @@ -302,7 +304,7 @@ class Query extends Command { Timers.clearTimeout(this.queryTimeout); this.queryTimeout = null; } - + const err = new Error('Query inactivity timeout'); err.errorno = 'PROTOCOL_SEQUENCE_TIMEOUT'; err.code = 'PROTOCOL_SEQUENCE_TIMEOUT'; diff --git a/test/integration/connection/test-stream.js b/test/integration/connection/test-stream.js index 57ac6dd5ba..3a2b32368f 100644 --- a/test/integration/connection/test-stream.js +++ b/test/integration/connection/test-stream.js @@ -8,6 +8,7 @@ let rows; const rows1 = []; const rows2 = []; const rows3 = []; +const rows4 = []; connection.query( [ @@ -65,6 +66,11 @@ connection.execute('SELECT * FROM announcements', async (err, _rows) => { for await (const row of s3) { rows3.push(row); } + const s4 = connection.query('SELECT * FROM announcements').stream(); + for await (const row of s4) { + await new Promise(resolve => setTimeout(resolve, 1000)); + rows4.push(row); + } }); process.on('exit', () => { @@ -72,4 +78,5 @@ process.on('exit', () => { assert.deepEqual(rows, rows1); assert.deepEqual(rows, rows2); assert.deepEqual(rows, rows3); + assert.deepEqual(rows, rows4); });