Skip to content

Commit

Permalink
Add StreamRequestPayloadMiddleware
Browse files Browse the repository at this point in the history
  • Loading branch information
howardlopez committed May 22, 2019
1 parent 214c369 commit f5bc7e1
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/AwsClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public function __construct(array $args)
$this->addClientSideMonitoring($args);
$this->addEndpointParameterMiddleware($args);
$this->addEndpointDiscoveryMiddleware($config, $args);
$this->addStreamRequestPayload();

if (isset($args['with_resolved'])) {
$args['with_resolved']($config);
Expand Down Expand Up @@ -371,6 +372,18 @@ private function addClientSideMonitoring($args)
);
}

private function addStreamRequestPayload()
{
$streamRequestPayloadMiddleware = StreamRequestPayloadMiddleware::wrap(
$this->api
);

$this->handlerList->prependSign(
$streamRequestPayloadMiddleware,
'StreamRequestPayloadMiddleware'
);
}

/**
* Returns a service model and doc model with any necessary changes
* applied.
Expand Down
11 changes: 11 additions & 0 deletions src/Exception/IncalculablePayloadException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php
namespace Aws\Exception;

use Aws\HasMonitoringEventsTrait;
use Aws\MonitoringEventsInterface;

class IncalculablePayloadException extends \RuntimeException implements
MonitoringEventsInterface
{
use HasMonitoringEventsTrait;
}
84 changes: 84 additions & 0 deletions src/StreamRequestPayloadMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?php
namespace Aws;

use Aws\Api\Service;
use Aws\Exception\IncalculablePayloadException;
use Psr\Http\Message\RequestInterface;

/**
* @internal
*/
class StreamRequestPayloadMiddleware
{
private $nextHandler;
private $service;

/**
* Create a middleware wrapper function
*
* @param Service $service
* @return \Closure
*/
public static function wrap(Service $service)
{
return function (callable $handler) use ($service) {
return new self($handler, $service);
};
}

public function __construct(callable $nextHandler, Service $service)
{
$this->nextHandler = $nextHandler;
$this->service = $service;
}

public function __invoke(CommandInterface $command, RequestInterface $request)
{
$nextHandler = $this->nextHandler;

$operation = $this->service->getOperation($command->getName());
$contentLength = $request->getHeader('content-length');
$hasStreaming = false;
$requiresLength = false;

// Check if any present input member is a stream and requires the
// content length
foreach ($operation->getInput()->getMembers() as $name => $member) {
if (!empty($member['streaming']) && isset($command[$name])) {
$hasStreaming = true;
if (!empty($member['requiresLength'])) {
$requiresLength = true;
}
}
}

if ($hasStreaming) {

// Add 'transfer-encoding' header if payload size not required to
// to be calculated and not already known
if (empty($requiresLength)
&& empty($contentLength)
&& !empty($operation['v4-unsigned-body'])
) {
$request = $request->withHeader('transfer-encoding', 'chunked');

// Otherwise, make sure 'content-length' header is added
} else {
if (empty($contentLength)) {
$size = $request->getBody()->getSize();
if (is_null($size)) {
throw new IncalculablePayloadException('Payload'
. 'content lenggth is required and can not be'
. 'calculated.');
}
$request = $request->withHeader(
'content-length',
$size
);
}
}
}

return $nextHandler($command, $request);
}
}
150 changes: 150 additions & 0 deletions tests/StreamRequestPayloadMiddlewareTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
<?php
namespace Aws\Test;

use Aws\Api\Service;
use Aws\AwsClient;
use Aws\HandlerList;
use Aws\Result;
use Aws\StreamRequestPayloadMiddleware;
use GuzzleHttp\Psr7;
use GuzzleHttp\Psr7\Request;
use PHPUnit\Framework\TestCase;

/**
* @covers \Aws\StreamRequestPayloadMiddleware
*/
class StreamRequestPayloadMiddlewareTest extends TestCase
{

public function testAddsProperHeaders()
{
$service = $this->generateTestService();
$client = $this->generateTestClient($service);
$command = $client->getCommand(
'StreamingOp',
[
'InputStream' => Psr7\stream_for('test'),
// 'InputString' => 'some_string'
]
);

$list = $command->getHandlerList();
// $list = new HandlerList();
$list->setHandler(function ($command, $request) {
// var_dump($request);
return new Result([]);
});
// $list->appendSign(StreamRequestPayloadMiddleware::wrap($service));

$handler = $list->resolve();

$result = $handler($command, new Request('POST', 'https://foo.com'));
}

private function generateTestClient(Service $service, $args = [])
{
return new AwsClient(
array_merge(
[
'service' => 'foo',
'api_provider' => function () use ($service) {
return $service->toArray();
},
'region' => 'us-east-1',
'version' => 'latest',
],
$args
)
);
}

private function generateTestService()
{
return new Service(
[
'metadata' => [
"protocol" => "rest-json",
"apiVersion" => "2014-01-01"
],
'shapes' => [
"BlobLengthStream" => [
"type" => "blob",
"streaming" => true,
"requiresLength" => true,
],
"BlobStream" => [
"type" => "blob",
"streaming" => true,
],
"StreamingInputShape" => [
"type" => "structure",
"required" => [
"InputStream",
],
"members" => [
"InputStream" => [
"shape" => "BlobStream",
],
"InputString" => [
"shape" => "StringType",
],
],
],
"StreamingLengthInputShape" => [
"type" => "structure",
"members" => [
"InputStream" => [
"shape" => "BlobLengthStream",
],
],
],
"StringType"=> [
"type" => "string",
],
],
'operations' => [
"StreamingOp" => [
"name"=> "StreamingOp",
"http"=> [
"method"=> "POST",
"requestUri"=> "/",
"responseCode"=> 200
],
"input"=> ["shape"=> "StreamingInputShape"],
],
"StreamingLengthOp" => [
"name"=> "StreamingLengthOp",
"http"=> [
"method"=> "POST",
"requestUri"=> "/",
"responseCode"=> 200
],
"input"=> ["shape"=> "StreamingLengthInputShape"],
],
"StreamingUnsignedOp" => [
"name"=> "StreamingUnsignedOp",
"http"=> [
"method"=> "POST",
"requestUri"=> "/",
"responseCode"=> 200
],
"input"=> ["shape"=> "StreamingInputShape"],
"authtype" => "v4-unsigned-body",
],
"StreamingLengthUnsignedOp" => [
"name"=> "StreamingLengthUnsignedOp",
"http"=> [
"method"=> "POST",
"requestUri"=> "/",
"responseCode"=> 200
],
"input"=> ["shape"=> "StreamingLengthInputShape"],
"authtype" => "v4-unsigned-body",
],
],
],
function () { return []; }
);
}

}

0 comments on commit f5bc7e1

Please sign in to comment.