Skip to content

Commit

Permalink
Fix outcomes archive not being ready before response
Browse files Browse the repository at this point in the history
With larger outcomes archives now being packed due to more and larger
Zap reports, we now wait for the completion of the zipping process
before responding to CLI.
  • Loading branch information
binarymist committed Jan 11, 2022
1 parent ea17ef5 commit 3800046
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 17 deletions.
9 changes: 9 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# Copyright (C) 2017-2022 BinaryMist Limited. All rights reserved.

# Use of this software is governed by the Business Source License
# included in the file /licenses/bsl.md

# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

FROM node:16-alpine

ARG LOCAL_USER_ID
Expand Down
4 changes: 2 additions & 2 deletions compose/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Orchestrator and tester containers share the same group.
# They also read, write and delete outcomes files within this directory.
HOST_DIR=</mnt/your-spare-drive/Logs/purpleteam/outcomes>
# This directory needs group rwx and other r (for the Emissary to read) permissions.
# Both app-scanner and it's Emissary mount this directory. The app-scanner puts files here for the Emissary to consume.
# Both app-scanner and it's Emissary mount this directory.
# The app-scanner puts files here for the Emissary to consume. The Emissary puts reports here for the app-scanner to consume.
# This environment variable and the same value is also required by the App Emissary in the purpleteam-s2-containers project.
HOST_DIR_APP_SCANNER=</mnt/your-spare-drive/purpleteam-app-scanner>
38 changes: 24 additions & 14 deletions src/api/orchestration/models/orchestrate.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,16 @@ class Orchestrate {
this.#testerModels.forEach((tM) => tM.init(this.#testersConfig[tM.name]));
}

#archiveOutcomes() {
async #archiveOutcomes() {
// For a lib based and richer solution: https://github.com/archiverjs/node-archiver
const { compressionLvl, fileName, dir } = this.#outcomesConfig;
this.#log.info(`About to write outcomes file "${fileName}" to dir "${dir}"`, { tags: ['orchestrate'] });
exec(`zip -r ${compressionLvl} ${fileName} *`, { cwd: dir }, (error, stdout, stderr) => {
if (error) {
this.#log.error(`Error occurred archiving the outcomes: ${error}.`, { tags: ['orchestrate'] });
return;
}

!!stdout && this.#log.info(`Archiving the outcomes, stdout:\n${stdout}`, { tags: ['orchestrate'] });
!!stderr && this.#log.info(`Archiving the outcomes, stderr:\n${stderr}`, { tags: ['orchestrate'] });
return new Promise((resolve, reject) => {
exec(`zip -r ${compressionLvl} ${fileName} *`, { cwd: dir }, (error, stdout, stderr) => {
if (error) reject(new Error(`Error occurred archiving the outcomes. The error was: ${error}.`));
if (stderr) reject(new Error(`Archiving the outcomes, stderr:\n${stderr}`));
resolve(`Archiving the outcomes, stdout:\n${stdout}`);
});
});
}

Expand Down Expand Up @@ -118,21 +116,27 @@ class Orchestrate {
}, this.#coolDownTimeout);
}

#sseTesterWatcherCallback(chan, message, respToolkit) {
async #sseTesterWatcherCallback(chan, message, respToolkit) {
const response = respToolkit.response(message);
const update = Bourne.parse(response.source);
const { dataMap, allTestSessionsOfAllTestersFinished } = this.#processTesterFeedbackMessageForCli({ update, chan });
if (allTestSessionsOfAllTestersFinished) {
this.resetTesters({});
this.#archiveOutcomes();
await this.#archiveOutcomes()
.then((result) => {
this.#log.info(result, { tags: ['orchestrate'] });
})
.catch((err) => {
this.#log.error(err, { tags: ['orchestrate'] });
});
}
respToolkit.event({ id: update.timestamp, event: update.event, data: dataMap });
// Close event stream if all Testers finished. null makes stream emit it's `end` event.
allTestSessionsOfAllTestersFinished && setTimeout(() => { respToolkit.event(null); }, 10000);
// Now we just close from client side, so client doesn't keep trying to re-establish.
}

#lpTesterWatcherCallback(chan, message) {
async #lpTesterWatcherCallback(chan, message) {
// We use event 'testerMessage' when the Redis client returns a nil multi-bulk (The event type is arbitrary if there was no message) and 'testerMessage' is the easiest to handle in the CLI.
// This is what happens when we blpop (blocking lpop) and it times out waiting for a message to be available on the given list.
// So there is actually no message published from any Tester.
Expand All @@ -141,7 +145,13 @@ class Orchestrate {
const { dataMap, allTestSessionsOfAllTestersFinished } = this.#processTesterFeedbackMessageForCli({ update, chan });
if (allTestSessionsOfAllTestersFinished) {
this.resetTesters({});
this.#archiveOutcomes();
await this.#archiveOutcomes()
.then((result) => {
this.#log.info(result, { tags: ['orchestrate'] });
})
.catch((err) => {
this.#log.error(err, { tags: ['orchestrate'] });
});
}
update.data = dataMap;
return update;
Expand Down Expand Up @@ -265,7 +275,7 @@ class Orchestrate {
}

async #getTesterMessages(channel) {
const testerWatcherCallbackClosure = (chan, message) => this.#lpTesterWatcherCallback(chan, message);
const testerWatcherCallbackClosure = async (chan, message) => this.#lpTesterWatcherCallback(chan, message);
const testerMessageSet = await this.#testerWatcher.pollTesterMessages(channel, testerWatcherCallbackClosure);
return testerMessageSet;
}
Expand Down
6 changes: 5 additions & 1 deletion src/api/orchestration/subscribers/testerWatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ const pollTesterMessages = async (redisChannel, callback) => {
// then return the CLI ready set of messages.
const testerMessageSet = await getTesterMessages(redisList);

const cliAfiedTesterMessageSet = testerMessageSet.map((tM) => callback(redisChannel, tM));
const cliAfiedTesterMessageSet = await testerMessageSet.reduce(async (accum, tM) => {
const results = await accum;
return [...results, await callback(redisChannel, tM)];
}, []);

return cliAfiedTesterMessageSet;
};

Expand Down

0 comments on commit 3800046

Please sign in to comment.