diff --git a/.changes/nextrelease/streaming_request_payloads.json b/.changes/nextrelease/streaming_request_payloads.json new file mode 100644 index 0000000000..b97b0cd89d --- /dev/null +++ b/.changes/nextrelease/streaming_request_payloads.json @@ -0,0 +1,7 @@ +[ + { + "type": "feature", + "category": "", + "description": "Adds support for 'requiresLength' trait, adding headers as necessary for streaming operations." + } +] diff --git a/src/AwsClient.php b/src/AwsClient.php index 206b036906..76ebc63f47 100644 --- a/src/AwsClient.php +++ b/src/AwsClient.php @@ -184,6 +184,7 @@ public function __construct(array $args) $this->addClientSideMonitoring($args); $this->addEndpointParameterMiddleware($args); $this->addEndpointDiscoveryMiddleware($config, $args); + $this->addStreamRequestPayload(); if (isset($args['with_resolved'])) { $args['with_resolved']($config); @@ -371,6 +372,18 @@ private function addClientSideMonitoring($args) ); } + private function addStreamRequestPayload() + { + $streamRequestPayloadMiddleware = StreamRequestPayloadMiddleware::wrap( + $this->api + ); + + $this->handlerList->prependSign( + $streamRequestPayloadMiddleware, + 'StreamRequestPayloadMiddleware' + ); + } + /** * Returns a service model and doc model with any necessary changes * applied. diff --git a/src/Exception/IncalculablePayloadException.php b/src/Exception/IncalculablePayloadException.php new file mode 100644 index 0000000000..a64e7428f6 --- /dev/null +++ b/src/Exception/IncalculablePayloadException.php @@ -0,0 +1,11 @@ +nextHandler = $nextHandler; + $this->service = $service; + } + + public function __invoke(CommandInterface $command, RequestInterface $request) + { + $nextHandler = $this->nextHandler; + + $operation = $this->service->getOperation($command->getName()); + $contentLength = $request->getHeader('content-length'); + $hasStreaming = false; + $requiresLength = false; + + // Check if any present input member is a stream and requires the + // content length + foreach ($operation->getInput()->getMembers() as $name => $member) { + if (!empty($member['streaming']) && isset($command[$name])) { + $hasStreaming = true; + if (!empty($member['requiresLength'])) { + $requiresLength = true; + } + } + } + + if ($hasStreaming) { + + // Add 'transfer-encoding' header if payload size not required to + // to be calculated and not already known + if (empty($requiresLength) + && empty($contentLength) + && isset($operation['authtype']) + && $operation['authtype'] == 'v4-unsigned-body' + ) { + $request = $request->withHeader('transfer-encoding', 'chunked'); + + // Otherwise, make sure 'content-length' header is added + } else { + if (empty($contentLength)) { + $size = $request->getBody()->getSize(); + if (is_null($size)) { + throw new IncalculablePayloadException('Payload' + . ' content length is required and can not be' + . ' calculated.'); + } + $request = $request->withHeader( + 'content-length', + $size + ); + } + } + } + + return $nextHandler($command, $request); + } +} diff --git a/tests/StreamRequestPayloadMiddlewareTest.php b/tests/StreamRequestPayloadMiddlewareTest.php new file mode 100644 index 0000000000..b6aa921ecc --- /dev/null +++ b/tests/StreamRequestPayloadMiddlewareTest.php @@ -0,0 +1,294 @@ +generateTestHandlerList(); + + $list->setHandler(function ( + CommandInterface $command, + RequestInterface $request + ) use ( + $expectedHeaders, + $expectedNonHeaders + ) { + $this->assertArraySubset($expectedHeaders, $request->getHeaders()); + foreach ($expectedNonHeaders as $header) { + $this->assertArrayNotHasKey($header, $request->getHeaders()); + } + return new Result([]); + }); + + $handler = $list->resolve(); + $handler($command, new Request('POST', 'https://foo.com')); + } + + public function generateTestCases() + { + $service = $this->generateTestService(); + $client = $this->generateTestClient($service); + $inputStream = Psr7\stream_for('test'); + + return [ + [ + $client->getCommand( + 'NonStreamingOp', + [ + 'InputString' => 'teststring', + ] + ), + [], + [ 'transfer-encoding', 'content-length' ], + ], + [ + $client->getCommand( + 'StreamingOp', + [ + 'InputStream' => $inputStream, + ] + ), + [ 'content-length' => [26] ], + [ 'transfer-encoding' ], + ], + [ + $client->getCommand( + 'StreamingLengthOp', + [ + 'InputStream' => $inputStream, + ] + ), + [ 'content-length' => [26] ], + [ 'transfer-encoding' ], + ], + [ + $client->getCommand( + 'StreamingUnsignedOp', + [ + 'InputStream' => $inputStream, + ] + ), + [ 'transfer-encoding' => ['chunked'] ], + [ 'content-length' ], + ], + [ + $client->getCommand( + 'StreamingLengthUnsignedOp', + [ + 'InputStream' => $inputStream, + ] + ), + [ 'content-length' => [26] ], + [ 'transfer-encoding' ], + ], + ]; + } + + /** + * @expectedException \Aws\Exception\IncalculablePayloadException + * @expectedExceptionMessage Payload content length is required and can not be calculated. + */ + public function testThrowsExceptionOnIncalculableSize() + { + $service = $this->generateTestService(); + $client = $this->generateTestClient($service); + $command = $client->getCommand( + 'StreamingOp', + [ + 'InputStream' => Psr7\stream_for('test'), + ] + ); + $middleware = StreamRequestPayloadMiddleware::wrap($service); + $invokable = $middleware(function($cmd, $req) {}); + + // Mock a request with a body whose size returns null + $streamMock = $this->getMockBuilder(\stdClass::class) + ->setMethods(['getSize']) + ->getMock(); + $streamMock->expects($this->any()) + ->method('getSize') + ->willReturn(null); + $requestMock = $this->getMockBuilder(Request::class) + ->setConstructorArgs(['POST', 'https://foo.com']) + ->setMethods(['getBody']) + ->getMock(); + $requestMock->expects($this->any()) + ->method('getBody') + ->willReturn($streamMock); + + $invokable($command, $requestMock); + } + + private function generateTestHandlerList() + { + $service = $this->generateTestService(); + $serializer = ClientResolver::_default_serializer([ + 'api' => $service, + 'endpoint' => '' + ]); + + $list = new HandlerList(); + $list->prependBuild(Middleware::requestBuilder($serializer), 'builder'); + $list->prependSign( + StreamRequestPayloadMiddleware::wrap($service), + 'StreamRequestPayloadMiddleware' + ); + + return $list; + } + + private function generateTestClient(Service $service, $args = []) + { + return new AwsClient( + array_merge( + [ + 'service' => 'foo', + 'api_provider' => function () use ($service) { + return $service->toArray(); + }, + 'region' => 'us-east-1', + 'version' => 'latest', + ], + $args + ) + ); + } + + private function generateTestService() + { + return new Service( + [ + 'metadata' => [ + "protocol" => "rest-json", + "apiVersion" => "2014-01-01" + ], + 'shapes' => [ + "BlobLengthStream" => [ + "type" => "blob", + "streaming" => true, + "requiresLength" => true, + ], + "BlobStream" => [ + "type" => "blob", + "streaming" => true, + ], + "NonStreamingInputShape" => [ + "type" => "structure", + "required" => [ + "InputString", + ], + "members" => [ + "InputString" => [ + "shape" => "StringType", + ], + ], + ], + "StreamingInputShape" => [ + "type" => "structure", + "required" => [ + "InputStream", + ], + "members" => [ + "InputStream" => [ + "shape" => "BlobStream", + ], + "InputString" => [ + "shape" => "StringType", + ], + ], + ], + "StreamingLengthInputShape" => [ + "type" => "structure", + "members" => [ + "InputStream" => [ + "shape" => "BlobLengthStream", + ], + ], + ], + "StringType"=> [ + "type" => "string", + ], + ], + 'operations' => [ + "NonStreamingOp" => [ + "name"=> "NonStreamingOp", + "http"=> [ + "method"=> "POST", + "requestUri"=> "/", + "responseCode"=> 200 + ], + "input"=> ["shape"=> "NonStreamingInputShape"], + ], + "StreamingOp" => [ + "name"=> "StreamingOp", + "http"=> [ + "method"=> "POST", + "requestUri"=> "/", + "responseCode"=> 200 + ], + "input"=> ["shape"=> "StreamingInputShape"], + ], + "StreamingLengthOp" => [ + "name"=> "StreamingLengthOp", + "http"=> [ + "method"=> "POST", + "requestUri"=> "/", + "responseCode"=> 200 + ], + "input"=> ["shape"=> "StreamingLengthInputShape"], + ], + "StreamingUnsignedOp" => [ + "name"=> "StreamingUnsignedOp", + "http"=> [ + "method"=> "POST", + "requestUri"=> "/", + "responseCode"=> 200 + ], + "input"=> ["shape"=> "StreamingInputShape"], + "authtype" => "v4-unsigned-body", + ], + "StreamingLengthUnsignedOp" => [ + "name"=> "StreamingLengthUnsignedOp", + "http"=> [ + "method"=> "POST", + "requestUri"=> "/", + "responseCode"=> 200 + ], + "input"=> ["shape"=> "StreamingLengthInputShape"], + "authtype" => "v4-unsigned-body", + ], + ], + ], + function () { return []; } + ); + } + +}