Skip to content

Commit

Permalink
Merge pull request #188 from kuzzleio/feat/paas-dump-es
Browse files Browse the repository at this point in the history
Add command for dumping an Elasticsearch instance in the PaaS
  • Loading branch information
rolljee committed Jan 8, 2024
2 parents e8e20b1 + 0c64151 commit a66bfe9
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 10 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ All other arguments and options will be passed as-is to the `sdk:query` method.
* [`kourou instance:logs`](#kourou-instancelogs)
* [`kourou instance:spawn`](#kourou-instancespawn)
* [`kourou paas:deploy ENVIRONMENT APPLICATIONID IMAGE`](#kourou-paasdeploy-environment-applicationid-image)
* [`kourou paas:elasticsearch:dump ENVIRONMENT APPLICATIONID DUMPDIRECTORY`](#kourou-paaselasticsearchdump-environment-applicationid-dumpdirectory)
* [`kourou paas:init PROJECT`](#kourou-paasinit-project)
* [`kourou paas:login`](#kourou-paaslogin)
* [`kourou paas:logs ENVIRONMENT APPLICATION`](#kourou-paaslogs-environment-application)
Expand Down Expand Up @@ -1064,6 +1065,27 @@ OPTIONS

_See code: [lib/commands/paas/deploy.js](lib/commands/paas/deploy.js)_

## `kourou paas:elasticsearch:dump ENVIRONMENT APPLICATIONID DUMPDIRECTORY`

Dump data from the Elasticsearch of a PaaS application

```
USAGE
$ kourou paas:elasticsearch:dump ENVIRONMENT APPLICATIONID DUMPDIRECTORY
ARGUMENTS
ENVIRONMENT Project environment name
APPLICATIONID Application Identifier
DUMPDIRECTORY Directory where to store dump files
OPTIONS
--batch-size=batch-size [default: 2000] Maximum batch size
--help show CLI help
--project=project Current PaaS project
```

_See code: [src/commands/paas/elasticsearch/dump.ts](src/commands/paas/elasticsearch/dump.ts)_

## `kourou paas:init PROJECT`

Initialize a PaaS project in current directory
Expand Down
36 changes: 27 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"@types/listr": "^0.14.2",
"@types/mocha": "^8.2.2",
"@types/ndjson": "^2.0.0",
"@types/node": "^14.14.41",
"@types/node": "^18.19.0",
"@types/node-emoji": "^1.8.1",
"@types/node-fetch": "^2.6.1",
"@types/tar": "^6.1.3",
Expand Down
226 changes: 226 additions & 0 deletions src/commands/paas/elasticsearch/dump.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
import path from "path";
import fs from "node:fs/promises";

import ndjson from "ndjson";
import { flags } from "@oclif/command";

import { PaasKommand } from "../../../support/PaasKommand";

/**
 * A single dumped hit. Only the field read by this command is declared.
 */
interface DocumentDumpHit {
  /** Sort values of the hit, used as the `searchAfter` cursor of the next batch. */
  sort: string[];
}

/**
 * Document count reported alongside a batch.
 */
interface DocumentDumpHitsTotal {
  value: number;
}

/**
 * Hits section of a dump batch: the reported total and the hits themselves.
 */
interface DocumentDumpHits {
  total: DocumentDumpHitsTotal;
  hits: DocumentDumpHit[];
}

/**
 * Results of the document dump action.
 */
interface DocumentDump {
  /** PIT identifier to send back with the next dump request. */
  pit_id: string;
  hits: DocumentDumpHits;
}

/**
 * Command dumping the Elasticsearch content of a PaaS application into a
 * local directory.
 *
 * Two files are written in the dump directory:
 * - `indexes.json`: result of the `getIndexes` action;
 * - `documents.jsonl`: one JSON-serialized hit per line.
 */
class PaasEsDump extends PaasKommand {
  public static description = "Dump data from the Elasticsearch of a PaaS application";

  public static flags = {
    help: flags.help(),
    project: flags.string({
      description: "Current PaaS project",
    }),
    "batch-size": flags.integer({
      description: "Maximum batch size",
      default: 2000,
    }),
  };

  public static args = [
    {
      name: "environment",
      description: "Project environment name",
      required: true,
    },
    {
      name: "applicationId",
      description: "Application Identifier",
      required: true,
    },
    {
      name: "dumpDirectory",
      description: "Directory where to store dump files",
      required: true,
    },
  ];

  /**
   * @description Validate the flags, log in to the PaaS, then dump the
   * indexes and the documents into the dump directory.
   */
  async runSafe() {
    // Check that the batch size is positive
    if (this.flags["batch-size"] <= 0) {
      this.logKo(`The batch size must be greater than zero. (Specified batch size: ${this.flags["batch-size"]})`);
      process.exit(1);
    }

    // Log in to the PaaS
    const apiKey = await this.getCredentials();

    await this.initPaasClient({ apiKey });

    const user = await this.paas.auth.getCurrentUser();
    this.logInfo(
      `Logged as "${user._id}" for project "${this.flags.project || this.getProject()}"`
    );

    // Create the dump directory
    await fs.mkdir(this.args.dumpDirectory, { recursive: true });

    // Dump the indexes
    this.logInfo("Dumping Elasticsearch indexes...");

    const indexesResult = await this.getAllIndexes();
    await fs.writeFile(path.join(this.args.dumpDirectory, "indexes.json"), JSON.stringify(indexesResult));

    this.logOk("Elasticsearch indexes dumped!");

    // Dump all the documents
    this.logInfo("Dumping Elasticsearch documents...");
    await this.dumpAllDocuments();

    this.logOk("Elasticsearch documents dumped!");
    this.logOk(`The dumped files are available under "${path.resolve(this.args.dumpDirectory)}"`);
  }

  /**
   * @description Send a request to the "application/storage" controller for
   * the targeted environment, project and application.
   * @param action Action of the storage controller to execute.
   * @param body Body of the request.
   * @returns The response of the PaaS API.
   */
  private queryStorage(action: string, body: Record<string, unknown>) {
    return this.paas.query({
      controller: "application/storage",
      action,
      environmentId: this.args.environment,
      projectId: this.flags.project || this.getProject(),
      applicationId: this.args.applicationId,
      body,
    });
  }

  /**
   * @description Get all indexes from the Elasticsearch of the PaaS application.
   * @returns The indexes.
   */
  private async getAllIndexes() {
    const { result }: any = await this.queryStorage("getIndexes", {});

    return result;
  }

  /**
   * @description Dump documents from the Elasticsearch of the PaaS application.
   * @param pitId ID of the PIT opened on Elasticsearch.
   * @param searchAfter Cursor for dumping documents after a certain one.
   * @returns The dumped documents.
   */
  private async dumpDocuments(pitId: string, searchAfter: string[]): Promise<DocumentDump> {
    const { result }: any = await this.queryStorage("dumpDocuments", {
      pitId,
      searchAfter: JSON.stringify(searchAfter),
      size: this.flags["batch-size"],
    });

    return result;
  }

  /**
   * @description Dump every document, batch after batch, into
   * "documents.jsonl" in the dump directory. Whatever the outcome, the dump
   * session is finished and the output file is flushed and closed before this
   * method returns or exits. On failure, the error is logged and the process
   * exits with status 1.
   */
  private async dumpAllDocuments() {
    // Prepare dumping all documents
    let pitId = "";
    let searchAfter: string[] = [];
    let dumpedDocuments = 0;

    const fd = await fs.open(path.join(this.args.dumpDirectory, "documents.jsonl"), "w");
    const writeStream = fd.createWriteStream();
    const ndjsonStream = ndjson.stringify();

    // Record write errors instead of throwing from the event listener, where
    // the exception could not be caught by the code below.
    const output: { error?: Error } = {};
    writeStream.on("error", (error) => {
      output.error = error;
    });

    // pipe() forwards the serialized lines with back-pressure and ends the
    // file stream once the serializer is ended.
    ndjsonStream.pipe(writeStream);

    const teardown = async () => {
      // Finish the dump session if a PIT ID is set
      if (pitId.length > 0) {
        await this.finishDump(pitId);
      }

      // Flush the pending lines, then wait for the file stream to be closed.
      // A stream already destroyed by an error will not emit "close" again.
      if (!writeStream.destroyed) {
        await new Promise<void>((resolve) => {
          writeStream.once("close", () => resolve());
          ndjsonStream.end();
        });
      }

      await fd.close();
    };

    const failures: unknown[] = [];

    try {
      for (;;) {
        const result = await this.dumpDocuments(pitId, searchAfter);

        // Keep the most recent PIT ID, even when the batch is empty, so that
        // the session can always be finished.
        if (result.pit_id) {
          pitId = result.pit_id;
        }

        const hits = result.hits.hits;
        if (hits.length === 0) {
          break;
        }

        // Update the cursor for the next dump
        searchAfter = hits[hits.length - 1].sort;

        // Save the documents
        for (const hit of hits) {
          ndjsonStream.write(hit);
        }

        if (output.error) {
          throw output.error;
        }

        dumpedDocuments += hits.length;
        const totalDocuments = result.hits.total.value;
        const progress = totalDocuments > 0 ? Math.floor(dumpedDocuments / totalDocuments * 100) : 100;
        this.logInfo(`Dumping Elasticsearch documents: ${progress}% (${dumpedDocuments}/${totalDocuments})`);
      }
    } catch (error: unknown) {
      failures.push(error);
    }

    // Always wait for the teardown: exiting earlier would abort the request
    // finishing the dump session and could truncate the output file.
    try {
      await teardown();
    } catch (error: unknown) {
      failures.push(error);
    }

    // A write error may also occur while flushing the last lines
    if (failures.length === 0 && output.error) {
      failures.push(output.error);
    }

    if (failures.length > 0) {
      this.logKo(`Error while dumping the documents: ${failures[0]}`);
      process.exit(1);
    }
  }

  /**
   * @description Finish the document dumping session. Failures are only
   * logged: they never interrupt the command.
   * @param pitId ID of the PIT opened on Elasticsearch.
   */
  private async finishDump(pitId: string) {
    try {
      await this.queryStorage("finishDumpDocuments", { pitId });
    } catch (error: any) {
      this.logInfo(`Unable to cleanly finish the dump session: ${error}`);
    }
  }
}

export default PaasEsDump;

0 comments on commit a66bfe9

Please sign in to comment.