Skip to content

Commit

Permalink
feat: close connection when upstream unexpectedly closes
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-tkachenko committed Jan 31, 2024
1 parent 015433b commit d7450f5
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 21 deletions.
8 changes: 8 additions & 0 deletions src/Prxi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import {
ServerOptions,
} from "node:http2";
import { Hooks } from "./Hooks";
import { randomUUID } from "crypto";
import { Http2Stream } from "http2";

interface Proxy {
upstream: UpstreamConfiguration,
Expand Down Expand Up @@ -245,6 +247,10 @@ export class Prxi {
server.on('session', (session) => {
const sessionContext: Record<string, any> = {};
(<any> session)._context = sessionContext;
(<any> session)._uuid = randomUUID();
(<any> session)._streams = new Set<Http2Stream>();

// register hooks
this.hooks.onBeforeHTTP2Session(session, sessionContext);
session.once('close', () => {
this.hooks.onAfterHTTP2Session(session, sessionContext);
Expand All @@ -265,6 +271,7 @@ export class Prxi {
const context = {
...sessionContext,
};
(<any> session)._streams.add(stream);

/* istanbul ignore next */
stream.on('error', (err) => {
Expand All @@ -279,6 +286,7 @@ export class Prxi {

this.hooks.onBeforeHTTP2Request(method, path, stream, headers, context);
stream.once('close', () => {
(<any> session)._streams.delete(stream);
this.hooks.onAfterHTTP2Request(method, path, stream, headers, context);
});

Expand Down
66 changes: 47 additions & 19 deletions src/handlers/Http2ProxyHandler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ClientHttp2Session, Http2Session, OutgoingHttpHeaders, ServerHttp2Stream, connect, constants } from 'node:http2';
import { ClientHttp2Session, Http2Session, Http2Stream, OutgoingHttpHeaders, ServerHttp2Stream, connect, constants } from 'node:http2';

import { Configuration, LogConfiguration, ProxyRequestConfiguration } from "../interfaces";
import { UpstreamConfiguration } from "../interfaces/UpstreamConfiguration";
Expand All @@ -8,15 +8,14 @@ const emptyObj = {};

export class Http2ProxyHandler {
static LOG_CLASS = 'prxi/http2';
private connections: Map<Http2Session, ClientHttp2Session>;
private connections: Record<string, ClientHttp2Session> = {};
private sessions: Record<string, Http2Session> = {};

constructor(
private log: LogConfiguration,
private configuration: Configuration,
private upstream: UpstreamConfiguration,
) {
this.connections = new Map();
}
) {}

/**
* Get existing connection or create a new one
Expand All @@ -25,45 +24,74 @@ export class Http2ProxyHandler {
* @returns
*/
private getOrCreateConnection(session: Http2Session, target: string): ClientHttp2Session {
let connection = this.connections.get(session);
const uuid = Http2ProxyHandler.getSessionUUID(session);

if (!this.sessions[uuid]) {
this.sessions[uuid] = session;
}

let connection = this.connections[uuid];
if (connection && !connection.closed) {
return connection;
}

connection = connect(target);
this.connections.set(session, connection);
this.connections[uuid] = connection;

connection.once('close', () => {
this.closeConnection(session, connection);
connection.once('close', (e) => {
this.closeConnection(session, connection, true);
});

session.once('close', () => {
this.closeConnection(session, connection);
this.closeConnection(session, connection, false);
});

return connection;
}

/**
* Get session UUID
*/
private static getSessionUUID(session: Http2Session): string {
return (<any> session)._uuid;
}

/**
* Close connection
* @param session
* @param connection
* @param closeSession
*/
private closeConnection(session: Http2Session, connection: ClientHttp2Session): void {
if (this.connections.has(session)) {
this.connections.delete(session);
private closeConnection(session: Http2Session, connection: ClientHttp2Session, closeSession: boolean): void {
const uuid = Http2ProxyHandler.getSessionUUID(session);

if (this.connections[uuid]) {
delete this.connections[uuid];
connection.close();

if (closeSession) {
(<any> session)._streams.forEach((stream: Http2Stream) => {
stream.close();
});

session.close();
}

delete this.sessions[uuid];
}
}

/**
* Close all connections
*/
public closeAllConnections(): void {
this.connections.forEach((connection) => {
connection.close();
});
this.connections = new Map();
for (const uuid of Object.keys(this.connections)) {
this.connections[uuid].close();
this.sessions[uuid].close();
}

this.connections = {};
this.sessions = {};
}

/**
Expand Down Expand Up @@ -218,7 +246,7 @@ export class Http2ProxyHandler {
/* istanbul ignore else */
if (this.configuration.proxyRequestTimeout) {
client.setTimeout(this.configuration.proxyRequestTimeout, () => {
this.closeConnection(session, client);
this.closeConnection(session, client, false);
this.log.error(context, 'Request timeout', null, {
class: Http2ProxyHandler.LOG_CLASS,
method: headers[constants.HTTP2_HEADER_METHOD],
Expand All @@ -233,7 +261,7 @@ export class Http2ProxyHandler {
});

client.once('error', (err) => {
this.closeConnection(session, client);
this.closeConnection(session, client, false);
this.log.error(context, 'Proxy request failed', err, {
class: Http2ProxyHandler.LOG_CLASS,
method: headers[constants.HTTP2_HEADER_METHOD],
Expand Down
6 changes: 6 additions & 0 deletions src/handlers/HttpProxyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ export class HttpProxyHandler {
});

client.once('response', (response: IncomingMessage) => {
client.once('close', () => {
if (processed) return;

req.destroy(new Error('Request closed by the upstream server'));
});

this.log.debug(context, `Response received`,{
class: HttpProxyHandler.LOG_CLASS,
method,
Expand Down
25 changes: 23 additions & 2 deletions test/HttpProxy.success.suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ abstract class BaseHttpProxySuccessSuite {
*/
async after(): Promise<void> {
await this.proxy?.stop();
await this.server.stop();
await this.server?.stop();
this.proxy = null;
this.server = null;
Console.printDoubleBox(`[TEST] [${this.mode}]${this.secure ? ' [secure]' : ''} ${this[context].test.title}`);
Expand All @@ -59,7 +59,7 @@ abstract class BaseHttpProxySuccessSuite {
}

@test()
async closeOpenProxyRequest(): Promise<void> {
async closeOpenProxyRequestByClient(): Promise<void> {
await this.initProxy();

const controller = new AbortController();
Expand All @@ -77,6 +77,27 @@ abstract class BaseHttpProxySuccessSuite {
strictEqual(error.message, 'This operation was aborted');
}

@test()
async closeOpenProxyRequestByUpstreamServer(): Promise<void> {
await this.initProxy();

let error: Error;
const promise = new FetchHelpers(this.mode, this.secure).get(`${this.proxyUrl}/hold`).catch(
err => error = err
)

await new Promise<void>((res) => setTimeout(() => {
this.server.stop().then(() => {
this.server = null;
res();
});
}, 50));
const resp = await promise;
console.log('@@@', resp);

//strictEqual(error.message, 'This operation was aborted');
}

@test()
async echoRequest(): Promise<void> {
let c: Record<string, any>;
Expand Down
2 changes: 2 additions & 0 deletions test/helpers/FetchHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ export class FetchHelpers {
});

req.once('end', () => {
console.log('FetchHelper -> req ended');
if (count + 1 === this.repeat) {
client.close();

Expand Down Expand Up @@ -266,6 +267,7 @@ export class FetchHelpers {

makeRequest();
} catch (e) {
console.error('FetchHelper -> req rejected', e);
rej(e);
}
});
Expand Down

0 comments on commit d7450f5

Please sign in to comment.