Skip to content

Commit

Permalink
feat(plugin): added new SQS plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Inqnuam committed Mar 17, 2023
1 parent 392c3e0 commit b09701c
Show file tree
Hide file tree
Showing 28 changed files with 2,760 additions and 361 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ functions:
```

- `files`
include additionnal files into the package.
include additional files into the package.

```yaml
functions:
Expand Down Expand Up @@ -163,6 +163,7 @@ See [defineConfig](resources/defineConfig.md) for advanced configuration.

- [AWS Local S3](resources/s3.md)
- [AWS Local SNS](resources/sns.md)
- [AWS Local SQS](resources/sqs.md)
- [DynamoDB Local Streams](https://github.com/Inqnuam/serverless-aws-lambda-ddb-streams)
- [Jest](https://github.com/Inqnuam/serverless-aws-lambda-jest)
- [Vitest](https://github.com/Inqnuam/serverless-aws-lambda-vitest)
1 change: 1 addition & 0 deletions build.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const buildIndex = bundle.bind(null, {
"./src/lib/runtime/worker.ts",
"./src/lambda/router.ts",
"./src/plugins/sns/index.ts",
"./src/plugins/sqs/index.ts",
"./src/plugins/s3/index.ts",
"./src/lambda/body-parser.ts",
],
Expand Down
16 changes: 12 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "serverless-aws-lambda",
"version": "4.3.1",
"version": "4.4.0",
"description": "AWS Application Load Balancer and API Gateway - Lambda dev tool for Serverless. Allows Express synthax in handlers. Supports packaging, local invoking and offline real ALB and APG lambda server mocking.",
"author": "Inqnuam",
"license": "MIT",
Expand Down Expand Up @@ -38,21 +38,25 @@
"types": "./dist/plugins/sns/index.d.ts",
"require": "./dist/plugins/sns/index.js"
},
"./sqs": {
"types": "./dist/plugins/sqs/index.d.ts",
"require": "./dist/plugins/sqs/index.js"
},
"./s3": {
"types": "./dist/plugins/s3/index.d.ts",
"require": "./dist/plugins/s3/index.js"
}
},
"dependencies": {
"@types/serverless": "^3.12.10",
"@types/serverless": "^3.12.11",
"archiver": "^5.3.1",
"esbuild": "0.17.11",
"esbuild": "0.17.12",
"serve-static": "^1.15.0"
},
"devDependencies": {
"@types/archiver": "^5.3.1",
"@types/node": "^14.14.31",
"@types/serve-static": "^1.15.0",
"@types/serve-static": "^1.15.1",
"typescript": "^4.9.5"
},
"keywords": [
Expand All @@ -62,9 +66,13 @@
"local",
"apg",
"alb",
"elb",
"lambda",
"sns",
"sqs",
"s3",
"stream",
"dynamodb",
"invoke",
"bundle",
"esbuild",
Expand Down
72 changes: 72 additions & 0 deletions resources/sqs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
## AWS Local SQS

### Description

Plugin for serverless-aws-lambda to trigger locally your sqs event defined lambdas automatically.
Local Queues are created based on your serverless.yml.
To define default and/or override Queue attributes see [Plugin configs](../src/plugins/sqs/types.ts).

### Installation

Import the plugin inside your defineConfig.

```js
// config.js
const { defineConfig } = require("serverless-aws-lambda/defineConfig");
const { sqsPlugin } = require("serverless-aws-lambda/sqs");

module.exports = defineConfig({
plugins: [sqsPlugin(config)],
});
```

### Supported Requests

supports both AWS SDK and CLI requests.

✅ supported
🌕 planned
❌ not planned

- 🌕 AddPermission
- ✅ ChangeMessageVisibility
- ✅ ChangeMessageVisibilityBatch
- 🌕 CreateQueue
- ✅ DeleteMessage
- ✅ DeleteMessageBatch
- ✅ DeleteQueue
- 🌕 GetQueueAttributes
- 🌕 GetQueueUrl
- 🌕 ListDeadLetterSourceQueues
- ✅ ListQueues
- ✅ ListQueueTags
- ✅ PurgeQueue
- ✅ ReceiveMessage
- 🌕 RemovePermission
- ✅ SendMessage
- ✅ SendMessageBatch
- 🌕 SetQueueAttributes
- ✅ TagQueue
- ✅ UntagQueue

### Examples

AWS CLI

```bash
aws sqs --region eu-west-1 --endpoint http://localhost:9999/@sqs list-queues
```

AWS SDK

```js
import { SQSClient, ListQueuesCommand } from "@aws-sdk/client-sqs";

const client = new SQSClient({
region: "eu-west-1",
endpoint: `http://localhost:9999/@sqs`,
});

const queues = await client.send(new ListQueuesCommand({}));
console.log(queues);
```
11 changes: 9 additions & 2 deletions src/defineConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ export interface ClientConfigParams {
config: Config;
options: Options;
serverless: Serverless;
resources: {
ddb: {};
kinesis: {};
sns: {};
sqs: {};
};
}

export interface SlsAwsLambdaPlugin {
Expand All @@ -52,7 +58,7 @@ export interface SlsAwsLambdaPlugin {
*/
method?: HttpMethod | HttpMethod[];
filter: string | RegExp;
callback: (this: ClientConfigParams, req: IncomingMessage, res: ServerResponse) => Promise<void> | void;
callback: (this: ClientConfigParams, req: IncomingMessage, res: ServerResponse) => Promise<any | void> | any | void;
}[];
};
}
Expand Down Expand Up @@ -112,7 +118,7 @@ function defineConfig(options: Options) {
}
return async function config(
this: ClientConfigParams,
{ stop, lambdas, isDeploying, isPackaging, setEnv, stage, port, esbuild, serverless }: ClientConfigParams
{ stop, lambdas, isDeploying, isPackaging, setEnv, stage, port, esbuild, serverless, resources }: ClientConfigParams
): Promise<Omit<Config, "config" | "options">> {
let config: Config = {
esbuild: options.esbuild ?? {},
Expand All @@ -135,6 +141,7 @@ function defineConfig(options: Options) {
serverless,
options,
config,
resources,
};
if (options.plugins) {
config.offline!.onReady = async (port, ip) => {
Expand Down
21 changes: 16 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class ServerlessAwsLambda extends Daemon {
nodeVersion: number | boolean | string | undefined = false;
invokeName?: string;
afterDeployCallbacks: (() => void | Promise<void>)[] = [];
resources: {
ddb: {};
kinesis: {};
sns: {};
sqs: {};
};
constructor(serverless: any, options: any) {
super({ debug: process.env.SLS_DEBUG == "*" });

Expand Down Expand Up @@ -117,6 +123,8 @@ class ServerlessAwsLambda extends Daemon {
"before:invoke:local:invoke": this.invokeLocal.bind(this),
"after:aws:deploy:finalize:cleanup": this.afterDeploy.bind(this),
};

this.resources = getResources(this.serverless);
}

async invokeLocal() {
Expand Down Expand Up @@ -289,7 +297,7 @@ class ServerlessAwsLambda extends Daemon {
functionsNames = functionsNames.filter((x) => x == this.invokeName);
}
const defaultRuntime = this.serverless.service.provider.runtime;
const resources = getResources(this.serverless);

// @ts-ignore
const Outputs = this.serverless.service.resources?.Outputs;
const lambdas = functionsNames.reduce((accum: any[], funcName: string) => {
Expand Down Expand Up @@ -319,6 +327,7 @@ class ServerlessAwsLambda extends Daemon {
timeout: lambda.timeout ?? this.runtimeConfig.timeout ?? DEFAULT_LAMBDA_TIMEOUT,
endpoints: [],
sns: [],
sqs: [],
ddb: [],
s3: [],
kinesis: [],
Expand Down Expand Up @@ -350,7 +359,7 @@ class ServerlessAwsLambda extends Daemon {
};

// @ts-ignore
lambdaDef.onError = parseDestination(lambda.onError, Outputs, resources);
lambdaDef.onError = parseDestination(lambda.onError, Outputs, this.resources);

if (lambdaDef.onError?.kind == "lambda") {
log.YELLOW("Dead-Letter queue could only be a SNS or SQS service");
Expand All @@ -359,9 +368,9 @@ class ServerlessAwsLambda extends Daemon {
//@ts-ignore
if (lambda.destinations && typeof lambda.destinations == "object") {
//@ts-ignore
lambdaDef.onFailure = parseDestination(lambda.destinations.onFailure, Outputs, resources);
lambdaDef.onFailure = parseDestination(lambda.destinations.onFailure, Outputs, this.resources);
//@ts-ignore
lambdaDef.onSuccess = parseDestination(lambda.destinations.onSuccess, Outputs, resources);
lambdaDef.onSuccess = parseDestination(lambda.destinations.onSuccess, Outputs, this.resources);
}

lambdaDef.onInvoke = (callback: (event: any, info?: any) => void) => {
Expand All @@ -379,9 +388,10 @@ class ServerlessAwsLambda extends Daemon {
lambdaDef.environment.AWS_LAMBDA_FUNCTION_MEMORY_SIZE = lambdaDef.memorySize;

if (lambda.events.length) {
const { endpoints, sns, ddb, s3, kinesis } = parseEvents(lambda.events, Outputs, resources);
const { endpoints, sns, sqs, ddb, s3, kinesis } = parseEvents(lambda.events, Outputs, this.resources);
lambdaDef.endpoints = endpoints;
lambdaDef.sns = sns;
lambdaDef.sqs = sqs;
lambdaDef.ddb = ddb;
lambdaDef.s3 = s3;
lambdaDef.kinesis = kinesis;
Expand Down Expand Up @@ -479,6 +489,7 @@ class ServerlessAwsLambda extends Daemon {
stage: this.options.stage ?? this.serverless.service.provider.stage ?? "dev",
esbuild: esbuild,
serverless: this.serverless,
resources: this.resources,
};
let exportedObject: any = {};

Expand Down
6 changes: 1 addition & 5 deletions src/lib/parseEvents/ddbStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,7 @@ export const parseDdbStreamDefinitions = (Outputs: any, resources: any, event: a
parsedEvent.batchWindow = val.batchWindow;
}

if (
!isNaN(val.maximumRecordAgeInSeconds) &&
parsedEvent.maximumRecordAgeInSeconds >= StreamProps.minRecordAge &&
parsedEvent.maximumRecordAgeInSeconds <= StreamProps.maxRecordAge
) {
if (!isNaN(val.maximumRecordAgeInSeconds) && val.maximumRecordAgeInSeconds >= StreamProps.minRecordAge && val.maximumRecordAgeInSeconds <= StreamProps.maxRecordAge) {
parsedEvent.maximumRecordAgeInSeconds = val.maximumRecordAgeInSeconds;
}

Expand Down
60 changes: 58 additions & 2 deletions src/lib/parseEvents/getResources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,71 @@ export const getResources = (serverless: any) => {
StreamModeDetails,
};
} else if (value.Type == "AWS::SNS::Topic" && value.Properties) {
const { TopicName } = value.Properties;
const { TopicName, ContentBasedDeduplication, DisplayName, FifoTopic } = value.Properties;
accum.sns[key] = {
TopicName,
ContentBasedDeduplication,
DisplayName,
FifoTopic,
};
} else if (value.Type == "AWS::SQS::Queue" && value.Properties) {
const { QueueName } = value.Properties;
const {
QueueName,
ContentBasedDeduplication,
DeduplicationScope,
DelaySeconds,
FifoQueue,
FifoThroughputLimit,
KmsDataKeyReusePeriodSeconds,
KmsMasterKeyId,
MaximumMessageSize,
MessageRetentionPeriod,
ReceiveMessageWaitTimeSeconds,
RedriveAllowPolicy,
RedrivePolicy,
VisibilityTimeout,
Tags,
} = value.Properties;

accum.sqs[key] = {
QueueName,
ContentBasedDeduplication,
DeduplicationScope,
DelaySeconds,
FifoQueue,
FifoThroughputLimit,
KmsDataKeyReusePeriodSeconds,
KmsMasterKeyId,
MaximumMessageSize,
MessageRetentionPeriod,
ReceiveMessageWaitTimeSeconds,
VisibilityTimeout,
RedriveAllowPolicy,
Tags: {},
};

if (RedrivePolicy && typeof RedrivePolicy.deadLetterTargetArn == "string") {
if (RedrivePolicy.deadLetterTargetArn.startsWith("arn:aws:sqs:")) {
const components = RedrivePolicy.deadLetterTargetArn.split(":");
const name = components[components.length - 1];

accum.sqs[key].RedrivePolicy = {
name,
deadLetterTargetArn: RedrivePolicy.deadLetterTargetArn,
maxReceiveCount: !isNaN(RedrivePolicy.maxReceiveCount) ? Number(RedrivePolicy.maxReceiveCount) : 10,
};
} else {
console.log("deadLetterTargetArn must be a string like arn:aws:sqs:...");
}
}

if (Array.isArray(Tags)) {
Tags.forEach((x) => {
if (typeof x.Key == "string" && x.Value) {
accum.sqs[key].Tags[x.Key] = x.Value;
}
});
}
}

return accum;
Expand Down
9 changes: 8 additions & 1 deletion src/lib/parseEvents/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@ import { parseSns } from "./sns";
import { parseDdbStreamDefinitions } from "./ddbStream";
import { parseS3 } from "./s3";
import { parseKinesis } from "./kinesis";
import { parseSqs } from "./sqs";

const supportedServices: IDestination["kind"][] = ["lambda", "sns", "sqs"];
type arn = [string, string, IDestination["kind"], string, string, string, string];

export const parseEvents = (events: any[], Outputs: any, resources: any) => {
const endpoints: LambdaEndpoint[] = [];
const sns: any[] = [];
const sqs: any[] = [];
const ddb: any[] = [];
const s3: any[] = [];
const kinesis: any[] = [];
for (const event of events) {
const slsEvent = parseEndpoints(event);
const snsEvent = parseSns(Outputs, resources, event);
const sqsEvent = parseSqs(Outputs, resources, event);
const ddbStream = parseDdbStreamDefinitions(Outputs, resources, event);
const s3Event = parseS3(event);
const kinesisStream = parseKinesis(event, Outputs, resources);
Expand All @@ -28,6 +31,10 @@ export const parseEvents = (events: any[], Outputs: any, resources: any) => {
sns.push(snsEvent);
}

if (sqsEvent) {
sqs.push(sqsEvent);
}

if (ddbStream) {
ddb.push(ddbStream);
}
Expand All @@ -40,7 +47,7 @@ export const parseEvents = (events: any[], Outputs: any, resources: any) => {
}
}

return { ddb, endpoints, s3, sns, kinesis };
return { ddb, endpoints, s3, sns, sqs, kinesis };
};

export const parseDestination = (destination: any, Outputs: any, resources: any): IDestination | undefined => {
Expand Down
Loading

0 comments on commit b09701c

Please sign in to comment.