Skip to content

Commit

Permalink
feat: close connection when upstream unexpectedly closes
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-tkachenko committed Feb 1, 2024
1 parent d7450f5 commit d9b5eda
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 88 deletions.
166 changes: 91 additions & 75 deletions src/handlers/HttpProxyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,102 +81,118 @@ export class HttpProxyHandler {
const client = request(options);
let processed = false;

req.socket.once('close', () => {
const onResClose = () => {
if (processed) return;
processed = true;

client.destroy(new Error('Request closed by the client'));
});
client.destroy();
}

try {
await new Promise<void>((resolve, reject) => {
req.pipe(client);
const onUpstreamClose = () => {
if (processed) return;
processed = true;

client.once('error', (err) => {
this.log.error(context, `Proxy request failed`, err, {
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
});
res.destroy();
}

reject(err);
});
res.once('close', onResClose);

client.once('response', (response: IncomingMessage) => {
client.once('close', () => {
if (processed) return;
await new Promise<void>((resolve, reject) => {
req.pipe(client);

req.destroy(new Error('Request closed by the upstream server'));
});
client.once('error', (err) => {
this.log.error(context, `Proxy request failed`, err, {
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
});

this.log.debug(context, `Response received`,{
processed = true;
reject(err);
});

client.once('response', (response: IncomingMessage) => {
client.once('close', () => {
this.log.debug(context, `Client closed`, {
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
responseStatusCode: response.statusCode,
});

if (isKeepAliveRequest) {
client.setTimeout(0);
}
onUpstreamClose();
});

// map status code
res.statusCode = response.statusCode;

const headersToSet = RequestUtils.prepareProxyHeaders(
response.headers,
this.configuration.responseHeaders,
this.upstream.responseHeaders,
// istanbul ignore next
proxyConfiguration?.proxyResponseHeaders
);

const next = () => {
RequestUtils.updateResponseHeaders(res, headersToSet);

// istanbul ignore else
if (!res.writableEnded) {
response.once('end', () => {
this.log.debug(context, `Proxy request completed`,{
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
responseStatusCode: response.statusCode,
});
resolve();
this.log.debug(context, `Response received`,{
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
responseStatusCode: response.statusCode,
});

if (isKeepAliveRequest) {
client.setTimeout(0);
}

// map status code
res.statusCode = response.statusCode;

const headersToSet = RequestUtils.prepareProxyHeaders(
response.headers,
this.configuration.responseHeaders,
this.upstream.responseHeaders,
// istanbul ignore next
proxyConfiguration?.proxyResponseHeaders
);

const next = () => {
RequestUtils.updateResponseHeaders(res, headersToSet);

// istanbul ignore else
if (!res.writableEnded) {
response.once('end', () => {
this.log.debug(context, `Proxy request completed`,{
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
responseStatusCode: response.statusCode,
});

response.pipe(res);
} else {
processed = true;
resolve();
}
}
});

/* istanbul ignore else */
if (proxyConfiguration && proxyConfiguration.onBeforeResponse) {
callOptionalPromiseFunction(
() => proxyConfiguration.onBeforeResponse(res, headersToSet, context),
() => next(),
(err) => {
this.log.error(context, 'onBeforeResponse function failed', err, {
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
});

reject(err);
}
)
response.pipe(res);
} else {
next();
processed = true;
resolve();
}
});
}

/* istanbul ignore else */
if (proxyConfiguration && proxyConfiguration.onBeforeResponse) {
callOptionalPromiseFunction(
() => proxyConfiguration.onBeforeResponse(res, headersToSet, context),
() => next(),
(err) => {
this.log.error(context, 'onBeforeResponse function failed', err, {
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
});

processed = true;
reject(err);
}
)
} else {
next();
}
});
} finally {
processed = true;
}
});
}
}
17 changes: 10 additions & 7 deletions test/HttpProxy.success.suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,12 @@ abstract class BaseHttpProxySuccessSuite {
controller.abort();
res();
}, 20));
await promise;
await new Promise<void>(res => setTimeout(res, 20));
const resp = await promise;

strictEqual(error.message, 'This operation was aborted');
strictEqual(error, undefined);
ok(resp.ok);
strictEqual(resp.data, null);
}

@test()
Expand All @@ -91,11 +94,13 @@ abstract class BaseHttpProxySuccessSuite {
this.server = null;
res();
});
}, 50));
}, 20));
await new Promise<void>(res => setTimeout(res, 20));
const resp = await promise;
console.log('@@@', resp);

//strictEqual(error.message, 'This operation was aborted');
strictEqual(error, undefined);
ok(resp.ok);
strictEqual(resp.data, null);
}

@test()
Expand Down Expand Up @@ -325,13 +330,11 @@ abstract class BaseHttpProxySuccessSuite {
await this.initProxy({
on: {
upgrade: (req, socket, head, context) => {
console.log('-> upgrade');
c = context;
context.upgrade = true;
},

afterUpgrade: (req, socket, head, context) => {
console.log('-> afterUpgrade');
c = context;
context.afterUpgrade = true;
res();
Expand Down
14 changes: 9 additions & 5 deletions test/helpers/FetchHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,12 @@ export class FetchHelpers {
responseHeaders[header] = response.headers.get(header).toString();
}

const isSSE = response.headers.get('content-type') === 'text/event-stream';

return {
data: await response.json(),
ok: response.ok,
status: response.status,
data: isSSE ? null : await response.json(),
headers: responseHeaders,
};
}
Expand Down Expand Up @@ -240,12 +244,12 @@ export class FetchHelpers {
client.close();

try {
if (aborted && !data) {
return rej(new Error('This operation was aborted'));
}
const isSSE = responseHeaders['content-type'] === 'text/event-stream';

res({
data: data ? JSON.parse(data) : undefined,
ok: responseHeaders[constants.HTTP2_HEADER_STATUS] === '200',
status: responseHeaders[constants.HTTP2_HEADER_STATUS],
data: isSSE ? null : (data ? JSON.parse(data) : undefined),
headers: responseHeaders,
});
} catch (e) {
Expand Down
20 changes: 19 additions & 1 deletion test/helpers/TestServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,14 @@ export class TestServer {
}

if (req.url.indexOf(`${this.prefix}/hold`) === 0) {
return;
res.setHeader("Cache-Control", "no-store");
res.setHeader("Content-Type", "text/event-stream");
res.write([
'event: ping',
'data: {"time": "' + new Date().toISOString() + '"}',
].join('\n') + '\n\n');

return
}

console.log(`Unable to find handler for URL: ${req.url}`);
Expand Down Expand Up @@ -138,6 +145,17 @@ export class TestServer {
}

if (path.indexOf(`${this.prefix}/hold`) === 0) {
stream.respond({
'content-type': 'text/event-stream',
'Cache-Control': 'no-store',
[constants.HTTP2_HEADER_STATUS]: 200,
})

stream.write([
'event: ping',
'data: {"time": "' + new Date().toISOString() + '"}',
].join('\n') + '\n\n');

return;
}

Expand Down

0 comments on commit d9b5eda

Please sign in to comment.