Skip to content

Commit

Permalink
feat: add remote SQS EventSourceMapping support
Browse files Browse the repository at this point in the history
feat: add SQS full support

fix: minor fixes

core: update dependencies
  • Loading branch information
Inqnuam committed Apr 28, 2024
1 parent ff0cec7 commit b26019a
Show file tree
Hide file tree
Showing 32 changed files with 2,690 additions and 2,188 deletions.
1 change: 1 addition & 0 deletions .npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ src
build.mjs
TODO.md
tsconfig.json
vitest.config.ts
resources/invokeError.png
resources/invokeSuccess.png
templates
3 changes: 2 additions & 1 deletion build.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const compileDeclarations = () => {
console.log(error.output?.[1]?.toString());
}
};
const external = ["esbuild", "archiver", "serve-static", "@smithy/eventstream-codec"];
const external = ["esbuild", "archiver", "serve-static", "@smithy/eventstream-codec", "local-aws-sqs", "@aws-sdk/client-sqs"];
const watchPlugin = {
name: "watch-plugin",
setup: (build) => {
Expand All @@ -30,6 +30,7 @@ const esBuildConfig = {
outdir: "dist",
format: "cjs",
plugins: [watchPlugin],
dropLabels: shouldWatch ? [] : ["DEV"],
drop: shouldWatch ? [] : ["debugger"],
external,
};
Expand Down
15 changes: 9 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "serverless-aws-lambda",
"version": "4.6.6",
"version": "4.7.0",
"description": "AWS Application Load Balancer and API Gateway - Lambda dev tool for Serverless. Allows Express synthax in handlers. Supports packaging, local invoking and offline ALB, APG, S3, SNS, SQS, DynamoDB Stream server mocking.",
"author": "Inqnuam",
"license": "MIT",
Expand Down Expand Up @@ -58,17 +58,20 @@
}
},
"dependencies": {
"@smithy/eventstream-codec": "^2.0.12",
"@types/serverless": "^3.12.17",
"@aws-sdk/client-sqs": ">=3",
"@smithy/eventstream-codec": "^2.2.0",
"@types/serverless": "^3.12.22",
"archiver": "^5.3.1",
"esbuild": "^0.19.5",
"esbuild": "0.20.2",
"local-aws-sqs": "^1.0.1",
"serve-static": "^1.15.0"
},
"devDependencies": {
"@types/archiver": "^5.3.2",
"@types/node": "^14.14.31",
"@types/serve-static": "^1.15.3",
"typescript": "^5.2.2"
"@types/serve-static": "^1.15.5",
"typescript": "^5.4.5",
"vitest": "^1.5.2"
},
"keywords": [
"aws",
Expand Down
6 changes: 6 additions & 0 deletions resources/defineConfig.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ export default defineConfig({
port: 9999,
},
plugins: [],
services: {
sqs: {
region: "us-east-1",
endpoint: "http://localhost:5433",
},
},
});
```

Expand Down
64 changes: 6 additions & 58 deletions resources/sqs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,72 +2,20 @@

### Description

Plugin for serverless-aws-lambda to trigger locally your sqs event defined lambdas automatically.
Local Queues are created based on your serverless.yml.
To define default and/or override Queue attributes see [Plugin configs](../src/plugins/sqs/types.ts).
Currently FIFO queues are considered as Standart.
Wrapper for [Local AWS SQS](https://github.com/Inqnuam/local-aws-sqs).
This plugin automatically creates all Queues declared in `serverless.yml` and enables SQS EventSourceMapping by setting AWS SQS Client config in [defineConfig](./defineConfig.md) `services` if it is not already defined.

### Installation
### Usage

Import the plugin inside your defineConfig.
Import the plugin inside your defineConfig.
To define default and/or override Queue attributes see [Plugin configs](../src/plugins/sqs/types.ts).

```js
// config.js
// config.ts
import { defineConfig } from "serverless-aws-lambda/defineConfig";
import { sqsPlugin } = from "serverless-aws-lambda/sqs";

export default defineConfig({
plugins: [sqsPlugin(config)],
});
```

### Supported Requests

supports both AWS SDK, CLI and raw low-level API requests.

✅ supported
🌕 planned
❌ not planned

- 🌕 AddPermission
- ✅ ChangeMessageVisibility
- ✅ ChangeMessageVisibilityBatch
- 🌕 CreateQueue
- ✅ DeleteMessage
- ✅ DeleteMessageBatch
- ✅ DeleteQueue
- 🌕 GetQueueAttributes
- ✅ GetQueueUrl
- 🌕 ListDeadLetterSourceQueues
- ✅ ListQueues
- ✅ ListQueueTags
- ✅ PurgeQueue
- ✅ ReceiveMessage
- 🌕 RemovePermission
- ✅ SendMessage
- ✅ SendMessageBatch
- 🌕 SetQueueAttributes
- ✅ TagQueue
- ✅ UntagQueue

### Examples

AWS CLI

```bash
aws sqs --region eu-west-1 --endpoint http://localhost:9999/@sqs list-queues
```

AWS SDK

```js
import { SQSClient, ListQueuesCommand } from "@aws-sdk/client-sqs";

const client = new SQSClient({
region: "eu-west-1",
endpoint: `http://localhost:9999/@sqs`,
});

const queues = await client.send(new ListQueuesCommand({}));
console.log(queues);
```
19 changes: 18 additions & 1 deletion src/defineConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { IncomingMessage, ServerResponse } from "http";
import type Serverless from "serverless";
import { log } from "./lib/utils/colorize";
import { exitEvents } from "./server";
import type { SQSClientConfig, SQSClient } from "@aws-sdk/client-sqs";

export type ILambda = {
/**
Expand All @@ -31,6 +32,10 @@ export type ILambda = {
onInvokeSuccess: (callback: (input: any, output: any, info?: any) => void) => void;
} & Omit<ILambdaMock, "invokeSub" | "invokeSuccessSub" | "invokeErrorSub" | "runner">;

interface IServicesConfig {
sqs?: SQSClientConfig;
}

export interface ClientConfigParams {
stop: (err?: any) => Promise<void>;
lambdas: ILambda[];
Expand All @@ -52,6 +57,8 @@ export interface ClientConfigParams {
sns: {};
sqs: {};
};
getServices(): { sqs?: SQSClient };
setServices({ sqs }: IServicesConfig): Promise<void>;
}

export interface OfflineRequest {
Expand Down Expand Up @@ -102,6 +109,10 @@ export interface Options {
* This allows conditionnally ( true ?? customPlugin) plugin import.
*/
plugins?: (SlsAwsLambdaPlugin | null | undefined | boolean)[];
/**
* AWS clients configs used by EventSourceMapping, Lambda error/success destination.
*/
services?: IServicesConfig;
}

let exiting = false;
Expand Down Expand Up @@ -151,7 +162,7 @@ function defineConfig(options: Options) {
}
return async function config(
this: ClientConfigParams,
{ stop, lambdas, isDeploying, isPackaging, setEnv, stage, region, esbuild, serverless, resources }: ClientConfigParams
{ stop, lambdas, isDeploying, isPackaging, setEnv, stage, region, esbuild, serverless, resources, getServices, setServices }: ClientConfigParams
): Promise<Omit<Config, "config" | "options">> {
let config: Config = {
esbuild: options.esbuild ?? {},
Expand All @@ -162,6 +173,10 @@ function defineConfig(options: Options) {
afterDeployCallbacks: [],
};

if (options.services) {
await setServices(options.services);
}

const self = {
stop,
lambdas,
Expand All @@ -175,6 +190,8 @@ function defineConfig(options: Options) {
options,
config,
resources,
getServices,
setServices,
};
if (options.plugins) {
config.offline!.onReady = async (port, ip) => {
Expand Down
13 changes: 13 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import { parseFuncUrl } from "./lib/parseEvents/funcUrl";
import { LambdaRequests } from "./plugins/lambda/index";
import { readDefineConfig } from "./lib/utils/readDefineConfig";
import { patchSchema } from "./lib/utils/schema";
import { AwsServices } from "./lib/services";
import type { SQSClientConfig } from "@aws-sdk/client-sqs";

const cwd = process.cwd();
const DEFAULT_LAMBDA_TIMEOUT = 6;
Expand Down Expand Up @@ -54,6 +56,7 @@ class ServerlessAwsLambda extends Daemon {
invokeName?: string;
afterDeployCallbacks: (() => void | Promise<void>)[] = [];
resources: ReturnType<typeof getResources> = { ddb: {}, kinesis: {}, sns: {}, sqs: {} };
static tags: string[] = ["build"];
constructor(serverless: any, options: any, pluginUtils: PluginUtils) {
super({ debug: process.env.SLS_DEBUG == "*" });

Expand Down Expand Up @@ -534,6 +537,16 @@ class ServerlessAwsLambda extends Daemon {
esbuild: esbuild,
serverless: this.serverless,
resources: this.resources,
getServices() {
return {
sqs: AwsServices.sqs,
};
},
setServices: async ({ sqs }: { sqs?: SQSClientConfig }) => {
if (sqs) {
await AwsServices.setSqsClient(sqs);
}
},
};

const customOfflineRequests = LambdaRequests.map((x) => {
Expand Down
14 changes: 11 additions & 3 deletions src/lib/parseEvents/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ export interface ISqs {
batchSize?: number;
maximumBatchingWindow?: number;
filterPatterns?: any;
enabled?: boolean;
functionResponseType?: ["ReportBatchItemFailures"];
}
const parseQueueNameFromObject = (resources: any, Outputs: any, obj: any) => {
const [key, value] = Object.entries(obj)?.[0];
Expand Down Expand Up @@ -49,7 +51,7 @@ export const parseSqs = (Outputs: any, resources: any, event: any): ISqs | undef
sqs.name = event.sqs;
}
} else {
const { arn, filterPatterns, batchSize, maximumBatchingWindow, functionResponseType } = event.sqs;
const { arn, filterPatterns, batchSize, maximumBatchingWindow, functionResponseType, enabled } = event.sqs;

if (typeof arn == "string") {
const arnComponents = arn.split(":");
Expand All @@ -72,8 +74,14 @@ export const parseSqs = (Outputs: any, resources: any, event: any): ISqs | undef
if (maximumBatchingWindow) {
sqs.maximumBatchingWindow = maximumBatchingWindow;
}
if (functionResponseType) {
sqs.functionResponseType = functionResponseType;
if (functionResponseType == "ReportBatchItemFailures" || (Array.isArray(functionResponseType) && functionResponseType[0] == "ReportBatchItemFailures")) {
sqs.functionResponseType = ["ReportBatchItemFailures"];
}

if (typeof enabled == "boolean") {
sqs.enabled = enabled;
} else {
sqs.enabled = true;
}
}
if (sqs.name) {
Expand Down
94 changes: 94 additions & 0 deletions src/lib/runtime/eventSourceMapping/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { randomUUID } from "crypto";
import type { ILambdaMock } from "../rapidApi";
import { filterObject } from "./filter";

export interface IEventSourceMappingConfig {
AmazonManagedKafkaEventSourceConfig?: {
ConsumerGroupId: string;
};
BatchSize: number;
BisectBatchOnFunctionError?: boolean;
DestinationConfig?: {
OnFailure: {
Destination: string;
};
OnSuccess: {
Destination: string;
};
};
DocumentDBEventSourceConfig?: {
CollectionName: string;
DatabaseName: string;
FullDocument: "UpdateLookup" | "Default";
};
Enabled: boolean;
EventSourceArn: string;
FilterCriteria?: {
Filters: { Pattern: string }[];
};
FunctionName: string;
FunctionResponseTypes?: ["ReportBatchItemFailures"];
MaximumBatchingWindowInSeconds?: number;
MaximumRecordAgeInSeconds?: number;
MaximumRetryAttempts?: number;
ParallelizationFactor?: number;
Queues?: string[];
ScalingConfig?: {
MaximumConcurrency?: number;
};
SelfManagedEventSource?: {
Endpoints?: string[];
};
SelfManagedKafkaEventSourceConfig?: {
ConsumerGroupId?: string;
};
SourceAccessConfigurations?: {
Type?:
| "BASIC_AUTH"
| "VPC_SUBNET"
| "VPC_SECURITY_GROUP"
| "SASL_SCRAM_512_AUTH"
| "SASL_SCRAM_256_AUTH"
| "VIRTUAL_HOST"
| "CLIENT_CERTIFICATE_TLS_AUTH"
| "SERVER_ROOT_CA_CERTIFICATE";
URI?: string;
}[];

StartingPosition?: "TRIM_HORIZON" | "LATEST" | "AT_TIMESTAMP";
StartingPositionTimestamp?: number;
Topics?: string[];
TumblingWindowInSeconds?: number;
}

export abstract class EventSourceMapping {
static SOURCES: EventSourceMapping[] = [];
UUID = randomUUID();
LastModified = Date.now();
LastProcessingResult: "No records processed" = "No records processed";
State: "Creating" | "Enabling" | "Enabled" | "Disabling" | "Disabled" = "Creating";
StateTransitionReason: "USER_INITIATED" | "User action" = "User action";

constructor(public config: IEventSourceMappingConfig, public lambda: ILambdaMock, public legacyDefinition: any) {}

filterRecords(records: any[]) {
const pass: any[] = [];
const failed: any[] = [];

if (!Array.isArray(this.config.FilterCriteria?.Filters)) {
return [records, []];
}

for (const record of records) {
const filterResult = this.config.FilterCriteria!.Filters.map((p) => filterObject(JSON.parse(p.Pattern), record));
const hasPassedFilters = filterResult.find((x) => x === true);
if (hasPassedFilters) {
pass.push(record);
} else {
failed.push(record);
}
}

return [pass, failed];
}
}
Loading

0 comments on commit b26019a

Please sign in to comment.