Skip to content

Commit

Permalink
feat(pipes-enrichments): add Step Functions enrichment eventbridge pi…
Browse files Browse the repository at this point in the history
…pes (#30495)

### Issue # (if applicable)

Closes #29385.

### Reason for this change
To use Step Functions state machine enrichment for eventbrige pipes



### Description of changes
Add `StepFunctionsEnrichment` class.



### Description of how you validated changes
Add unit test and integ tests.


### Checklist
- [x] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md)

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
mazyu36 committed Jun 21, 2024
1 parent f93c2ef commit 8b495f9
Show file tree
Hide file tree
Showing 16 changed files with 33,695 additions and 0 deletions.
23 changes: 23 additions & 0 deletions packages/@aws-cdk/aws-pipes-enrichments-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,26 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
target: new SomeTarget(targetQueue),
});
```

### Step Functions state machine

Step Functions state machine can be used to enrich events of a pipe.

**Note:** EventBridge Pipes only supports Express workflows invoked synchronously.

> Visit [Amazon EventBridge Pipes event enrichment](https://docs.aws.amazon.com/eventbridge/latest/userguide/pipes-enrichment.html) for more details.
```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

declare const enrichmentStateMachine: stepfunctions.StateMachine;

const enrichment = new enrichments.StepFunctionsEnrichment(enrichmentStateMachine);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
enrichment,
target: new SomeTarget(targetQueue),
});
```
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-enrichments-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './lambda';
export * from './stepfunctions';
46 changes: 46 additions & 0 deletions packages/@aws-cdk/aws-pipes-enrichments-alpha/lib/stepfunctions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { EnrichmentParametersConfig, IEnrichment, IPipe, InputTransformation } from '@aws-cdk/aws-pipes-alpha';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { IStateMachine, StateMachine, StateMachineType } from 'aws-cdk-lib/aws-stepfunctions';

/**
* Properties for a StepFunctionsEnrichment
*/
export interface StepFunctionsEnrichmentProps {
/**
* The input transformation for the enrichment
* @see https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-input-transformation.html
* @default - None
*/
readonly inputTransformation?: InputTransformation;
}

/**
* A StepFunctions enrichment for a pipe
*/
export class StepFunctionsEnrichment implements IEnrichment {
public readonly enrichmentArn: string;

private readonly inputTransformation?: InputTransformation;
constructor(private readonly stateMachine: IStateMachine, props?: StepFunctionsEnrichmentProps) {
if (stateMachine instanceof StateMachine
&& (stateMachine.stateMachineType !== StateMachineType.EXPRESS)
) {
throw new Error(`EventBridge pipes only support EXPRESS workflows as enrichment, got ${stateMachine.stateMachineType}`);
}
this.enrichmentArn = stateMachine.stateMachineArn;
this.inputTransformation = props?.inputTransformation;
}

bind(pipe: IPipe): EnrichmentParametersConfig {
return {
enrichmentParameters: {
inputTemplate: this.inputTransformation?.bind(pipe).inputTemplate,
},
};
}

grantInvoke(pipeRole: IRole): void {
this.stateMachine.grantStartSyncExecution(pipeRole);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import * as cdk from 'aws-cdk-lib';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as stepfunctions from 'aws-cdk-lib/aws-stepfunctions';
import { Construct } from 'constructs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';
import * as enrichments from '@aws-cdk/aws-pipes-enrichments-alpha';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`stepfunctions should grant pipe role invoke access 1`] = `
{
"EnrichmentStateMachineRoleDE810FCA": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": {
"Fn::FindInMap": [
"ServiceprincipalMap",
{
"Ref": "AWS::Region",
},
"states",
],
},
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
"MyPipeRoleCBC8E9AB": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`stepfunctions should grant pipe role invoke access 2`] = `
{
"MyPipeRoleDefaultPolicy31387C20": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "states:StartSyncExecution",
"Effect": "Allow",
"Resource": {
"Ref": "EnrichmentStateMachine8BED6C4E",
},
},
],
"Version": "2012-10-17",
},
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
"Roles": [
{
"Ref": "MyPipeRoleCBC8E9AB",
},
],
},
"Type": "AWS::IAM::Policy",
},
}
`;
Loading

0 comments on commit 8b495f9

Please sign in to comment.