Skip to content

Commit

Permalink
Merged PR 2918: Cancel Sends if they take too long
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy authored and Andrew Stanton-Nurse committed Nov 22, 2019
1 parent 147f950 commit 0757955
Show file tree
Hide file tree
Showing 23 changed files with 1,013 additions and 107 deletions.
2 changes: 2 additions & 0 deletions eng/Baseline.Designer.props
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@
<BaselinePackageReference Include="Microsoft.AspNetCore.Routing" Version="[2.1.1, )" />
<BaselinePackageReference Include="Microsoft.AspNetCore.WebSockets" Version="[2.1.1, )" />
<BaselinePackageReference Include="Newtonsoft.Json" Version="[11.0.2, )" />
<BaselinePackageReference Include="System.Net.WebSockets.WebSocketProtocol" Version="[4.5.3, )" />
</ItemGroup>
<ItemGroup Condition=" '$(PackageId)' == 'Microsoft.AspNetCore.Http.Connections' AND '$(TargetFramework)' == 'netstandard2.0' ">
<BaselinePackageReference Include="Microsoft.AspNetCore.Http.Connections.Common" Version="[1.0.4, )" />
Expand All @@ -472,6 +473,7 @@
<BaselinePackageReference Include="Microsoft.AspNetCore.Routing" Version="[2.1.1, )" />
<BaselinePackageReference Include="Microsoft.AspNetCore.WebSockets" Version="[2.1.1, )" />
<BaselinePackageReference Include="Newtonsoft.Json" Version="[11.0.2, )" />
<BaselinePackageReference Include="System.Net.WebSockets.WebSocketProtocol" Version="[4.5.3, )" />
</ItemGroup>
<!-- Package: Microsoft.AspNetCore.Http.Extensions-->
<PropertyGroup Condition=" '$(PackageId)' == 'Microsoft.AspNetCore.Http.Extensions' ">
Expand Down
2 changes: 2 additions & 0 deletions eng/PatchConfig.props
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ Later on, this will be checked using this condition:
</PropertyGroup>
<PropertyGroup Condition=" '$(VersionPrefix)' == '2.1.15' ">
<PackagesInPatch>
Microsoft.AspNetCore.Http.Connections;
Microsoft.AspNetCore.SignalR.Core;
</PackagesInPatch>
</PropertyGroup>
</Project>
23 changes: 22 additions & 1 deletion src/SignalR/clients/ts/FunctionalTests/selenium/run-tests.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
import { ChildProcess, spawn } from "child_process";
import * as fs from "fs";
import * as _fs from "fs";
import { EOL } from "os";
import * as path from "path";
import { promisify } from "util";
import { PassThrough, Readable } from "stream";

import { run } from "../../webdriver-tap-runner/lib";

import * as _debug from "debug";
const debug = _debug("signalr-functional-tests:run");

const ARTIFACTS_DIR = path.resolve(__dirname, "..", "..", "..", "..", "artifacts");
const LOGS_DIR = path.resolve(ARTIFACTS_DIR, "logs");

// Promisify things from fs we want to use.
const fs = {
createWriteStream: _fs.createWriteStream,
exists: promisify(_fs.exists),
mkdir: promisify(_fs.mkdir),
};

process.on("unhandledRejection", (reason) => {
console.error(`Unhandled promise rejection: ${reason}`);
process.exit(1);
Expand Down Expand Up @@ -102,6 +113,13 @@ if (chromePath) {
try {
const serverPath = path.resolve(__dirname, "..", "bin", configuration, "netcoreapp2.1", "FunctionalTests.dll");

if (!await fs.exists(ARTIFACTS_DIR)) {
await fs.mkdir(ARTIFACTS_DIR);
}
if (!await fs.exists(LOGS_DIR)) {
await fs.mkdir(LOGS_DIR);
}

debug(`Launching Functional Test Server: ${serverPath}`);
const dotnet = spawn("dotnet", [serverPath], {
env: {
Expand All @@ -117,6 +135,9 @@ if (chromePath) {
}
}

const logStream = fs.createWriteStream(path.resolve(LOGS_DIR, "ts.functionaltests.dotnet.log"));
dotnet.stdout.pipe(logStream);

process.on("SIGINT", cleanup);
process.on("exit", cleanup);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class HttpConnectionContext : ConnectionContext,
IHttpTransportFeature,
IConnectionInherentKeepAliveFeature
{
private static long _tenSeconds = TimeSpan.FromSeconds(10).Ticks;

private readonly object _itemsLock = new object();
private readonly object _heartbeatLock = new object();
private List<(Action<object> handler, object state)> _heartbeatHandlers;
Expand All @@ -35,6 +37,13 @@ public class HttpConnectionContext : ConnectionContext,
private IDuplexPipe _application;
private IDictionary<object, object> _items;

private CancellationTokenSource _sendCts;
private bool _activeSend;
private long _startedSendTime;
private readonly object _sendingLock = new object();

internal CancellationToken SendingToken { get; private set; }

// This tcs exists so that multiple calls to DisposeAsync all wait asynchronously
// on the same task
private readonly TaskCompletionSource<object> _disposeTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -274,24 +283,45 @@ private async Task WaitOnTasks(Task applicationTask, Task transportTask, bool cl
// Cancel any pending flushes from back pressure
Application?.Output.CancelPendingFlush();

// Shutdown both sides and wait for nothing
Transport?.Output.Complete(applicationTask.Exception?.InnerException);
Application?.Output.Complete(transportTask.Exception?.InnerException);

// Normally it isn't safe to try and acquire this lock because the Send can hold onto it for a long time if there is backpressure
// It is safe to wait for this lock now because the Send will be in one of 4 states
// 1. In the middle of a write which is in the middle of being canceled by the CancelPendingFlush above, when it throws
// an OperationCanceledException it will complete the PipeWriter which will make any other Send waiting on the lock
// throw an InvalidOperationException if they call Write
// 2. About to write and see that there is a pending cancel from the CancelPendingFlush, go to 1 to see what happens
// 3. Enters the Send and sees the Dispose state from DisposeAndRemoveAsync and releases the lock
// 4. No Send in progress
await WriteLock.WaitAsync();
try
{
Log.WaitingForTransportAndApplication(_logger, TransportType);
// A poorly written application *could* in theory get stuck forever and it'll show up as a memory leak
await Task.WhenAll(applicationTask, transportTask);
// Complete the applications read loop
Application?.Output.Complete(transportTask.Exception?.InnerException);
}
finally
{
Log.TransportAndApplicationComplete(_logger, TransportType);

// Close the reading side after both sides run
Application?.Input.Complete();
Transport?.Input.Complete();
WriteLock.Release();
}

Application?.Input.CancelPendingRead();

await transportTask.NoThrow();
Application?.Input.Complete();

Log.WaitingForTransportAndApplication(_logger, TransportType);

// A poorly written application *could* in theory get stuck forever and it'll show up as a memory leak
// Wait for application so we can complete the writer safely
await applicationTask.NoThrow();
Log.TransportAndApplicationComplete(_logger, TransportType);

// Shutdown application side now that it's finished
Transport?.Output.Complete(applicationTask.Exception?.InnerException);

// Close the reading side after both sides run
Transport?.Input.Complete();

// Observe exceptions
await Task.WhenAll(transportTask, applicationTask);
}

// Notify all waiters that we're done disposing
Expand All @@ -311,6 +341,43 @@ private async Task WaitOnTasks(Task applicationTask, Task transportTask, bool cl
}
}

internal void StartSendCancellation()
{
lock (_sendingLock)
{
if (_sendCts == null || _sendCts.IsCancellationRequested)
{
_sendCts = new CancellationTokenSource();
SendingToken = _sendCts.Token;
}

_startedSendTime = DateTime.UtcNow.Ticks;
_activeSend = true;
}
}

internal void TryCancelSend(long currentTicks)
{
lock (_sendingLock)
{
if (_activeSend)
{
if (currentTicks - _startedSendTime > _tenSeconds)
{
_sendCts.Cancel();
}
}
}
}

internal void StopSendCancellation()
{
lock (_sendingLock)
{
_activeSend = false;
}
}

private static class Log
{
private static readonly Action<ILogger, string, Exception> _disposingConnection =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
connection.SupportedFormats = TransferFormat.Text;

// We only need to provide the Input channel since writing to the application is handled through /send.
var sse = new ServerSentEventsTransport(connection.Application.Input, connection.ConnectionId, _loggerFactory);
var sse = new ServerSentEventsTransport(connection.Application.Input, connection.ConnectionId, connection, _loggerFactory);

await DoPersistentConnection(connectionDelegate, sse, context, connection);
}
Expand Down Expand Up @@ -264,7 +264,7 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
context.Response.RegisterForDispose(timeoutSource);
context.Response.RegisterForDispose(tokenSource);

var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Input, _loggerFactory);
var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Input, _loggerFactory, connection);

// Start the transport
connection.TransportTask = longPolling.ProcessRequestAsync(context, tokenSource.Token);
Expand All @@ -291,7 +291,9 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
connection.Transport.Output.Complete(connection.ApplicationTask.Exception);

// Wait for the transport to run
await connection.TransportTask;
// Ignore exceptions, it has been logged if there is one and the application has finished
// So there is no one to give the exception to
await connection.TransportTask.NoThrow();

// If the status code is a 204 it means the connection is done
if (context.Response.StatusCode == StatusCodes.Status204NoContent)
Expand All @@ -307,6 +309,18 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
pollAgain = false;
}
}
else if (connection.TransportTask.IsFaulted || connection.TransportTask.IsCanceled)
{
// Cancel current request to release any waiting poll and let dispose aquire the lock
currentRequestTcs.TrySetCanceled();

// We should be able to safely dispose because there's no more data being written
// We don't need to wait for close here since we've already waited for both sides
await _manager.DisposeAndRemoveAsync(connection, closeGracefully: false);

// Don't poll again if we've removed the connection completely
pollAgain = false;
}
else if (context.Response.StatusCode == StatusCodes.Status204NoContent)
{
// Don't poll if the transport task was canceled
Expand Down Expand Up @@ -511,6 +525,14 @@ private async Task ProcessSend(HttpContext context, HttpConnectionDispatcherOpti

context.Response.StatusCode = StatusCodes.Status404NotFound;
context.Response.ContentType = "text/plain";

// There are no writes anymore (since this is the write "loop")
// So it is safe to complete the writer
// We complete the writer here because we already have the WriteLock acquired
// and it's unsafe to complete outside of the lock
// Other code isn't guaranteed to be able to acquire the lock before another write
// even if CancelPendingFlush is called, and the other write could hang if there is backpressure
connection.Application.Output.Complete();
return;
}

Expand Down Expand Up @@ -549,11 +571,8 @@ private async Task ProcessDeleteAsync(HttpContext context)

Log.TerminatingConection(_logger);

// Complete the receiving end of the pipe
connection.Application.Output.Complete();

// Dispose the connection gracefully, but don't wait for it. We assign it here so we can wait in tests
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: true);
// Dispose the connection, but don't wait for it. We assign it here so we can wait in tests
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: false);

context.Response.StatusCode = StatusCodes.Status202Accepted;
context.Response.ContentType = "text/plain";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public partial class HttpConnectionManager
private readonly TimerAwaitable _nextHeartbeat;
private readonly ILogger<HttpConnectionManager> _logger;
private readonly ILogger<HttpConnectionContext> _connectionLogger;
private readonly bool _useSendTimeout = true;

public HttpConnectionManager(ILoggerFactory loggerFactory, IApplicationLifetime appLifetime)
{
Expand All @@ -38,6 +39,11 @@ public HttpConnectionManager(ILoggerFactory loggerFactory, IApplicationLifetime
appLifetime.ApplicationStarted.Register(() => Start());
appLifetime.ApplicationStopping.Register(() => CloseConnections());
_nextHeartbeat = new TimerAwaitable(_heartbeatTickRate, _heartbeatTickRate);

if (AppContext.TryGetSwitch("Microsoft.AspNetCore.Http.Connections.DoNotUseSendTimeout", out var timeoutDisabled))
{
_useSendTimeout = !timeoutDisabled;
}
}

public void Start()
Expand Down Expand Up @@ -156,9 +162,10 @@ public async Task ScanAsync()
connection.StateLock.Release();
}

var utcNow = DateTimeOffset.UtcNow;
// Once the decision has been made to dispose we don't check the status again
// But don't clean up connections while the debugger is attached.
if (!Debugger.IsAttached && status == HttpConnectionStatus.Inactive && (DateTimeOffset.UtcNow - lastSeenUtc).TotalSeconds > 5)
if (!Debugger.IsAttached && status == HttpConnectionStatus.Inactive && (utcNow - lastSeenUtc).TotalSeconds > 5)
{
Log.ConnectionTimedOut(_logger, connection.ConnectionId);
HttpConnectionsEventSource.Log.ConnectionTimedOut(connection.ConnectionId);
Expand All @@ -170,6 +177,11 @@ public async Task ScanAsync()
}
else
{
if (!Debugger.IsAttached && _useSendTimeout)
{
connection.TryCancelSend(utcNow.Ticks);
}

// Tick the heartbeat, if the connection is still active
connection.TickHeartbeat();
}
Expand Down
27 changes: 27 additions & 0 deletions src/SignalR/common/Http.Connections/src/Internal/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Runtime.CompilerServices;

namespace System.Threading.Tasks
{
internal static class TaskExtensions
{
public static async Task NoThrow(this Task task)
{
await new NoThrowAwaiter(task);
}
}

internal readonly struct NoThrowAwaiter : ICriticalNotifyCompletion
{
private readonly Task _task;
public NoThrowAwaiter(Task task) { _task = task; }
public NoThrowAwaiter GetAwaiter() => this;
public bool IsCompleted => _task.IsCompleted;
// Observe exception
public void GetResult() { _ = _task.Exception; }
public void OnCompleted(Action continuation) => _task.GetAwaiter().OnCompleted(continuation);
public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation);
}
}
Loading

0 comments on commit 0757955

Please sign in to comment.