Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use multi-thread for cloud experiments and mutli-process for local ones #29

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion run_all_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import logging
import os
import sys
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import Pool

import run_one_experiment
Expand Down Expand Up @@ -219,11 +221,29 @@ def main():
result = run_experiments(*config)
experiment_results.append(result)
_print_experiment_result(result)
elif args.cloud_experiment_name:
# Use multi-threads for cloud experiments, because each thread only needs to
# wait for cloud build results or conduct simple I/O tasks.
with ThreadPoolExecutor(max_workers=NUM_EXP) as executor:
Copy link
Collaborator

@oliverchang oliverchang Jan 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way we can consolidate the two code paths more?

e.g. something like:

if ...:
  pool =  ThreadPool
else:
  pool = Pool

Not sure what the difference is exactly between ThreadPoolExecutor and ThreadPool

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.ThreadPool

I reckon the doc implies ThreadPoolExecutor is more modern and supports thread-level parallelism better?
I can definitely try to consolidate the two code parts to make them look better.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made an attempt to clean up the related code in run_all_experiments.py a bit, will do the same in run_one_experiment.py.
The code should be more readable and have less repetition, but I did not find a perfect way to consolidate them further without over-engineering this.

Please let me know if there are other options.

# Using list comprehension to submit tasks with arguments
futures = []
for config in experiment_configs:
futures.append(executor.submit(run_experiments, *config))
# Sleep to avoid having a peak CPU usage at the beginning because of
# creating too many threads.
# Approx. 30s is sufficient because these threads will soon become idle
# when waiting for cloud build results.
time.sleep(30)
for future in as_completed(futures):
result = future.result()
_print_experiment_result(result)
experiment_results.append(result)
else:
# Use multi-process for local experiments, because each process needs to
# built fuzz targets in local docker containers.
with Pool(NUM_EXP) as p:
for result in p.starmap(run_experiments, experiment_configs):
experiment_results.append(result)
_print_experiment_result(result)

_print_experiment_results(experiment_results)

Expand Down
34 changes: 26 additions & 8 deletions run_one_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import os
import shutil
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import pool
from typing import List, Optional

Expand Down Expand Up @@ -174,14 +175,31 @@ def check_targets(
evaluator = exp_evaluator.Evaluator(builder_runner, benchmark, work_dirs)

ai_target_pairs = [(ai_binary, target) for target in generated_targets]
with pool.ThreadPool(NUM_EVA) as p:
for i, target_stat in enumerate(
p.starmap(evaluator.check_target, ai_target_pairs)):
if target_stat is None:
print(f'Error evaluating target {generated_targets[i]}')
continue

target_stats.append((i, target_stat))
if cloud_experiment_name:
# Use multi-threads for cloud experiments, because each thread only needs to
# wait for cloud build results or conduct simple I/O tasks.
with ThreadPoolExecutor(max_workers=NUM_EVA) as executor:
future_to_index = {
executor.submit(evaluator.check_target, *pair): i
for i, pair in enumerate(ai_target_pairs)
}
for future in as_completed(future_to_index):
i = future_to_index[future]
target_stat = future.result()
if target_stat is None:
print(f'Error evaluating target {generated_targets[i]}')
continue

target_stats.append((i, target_stat))
else:
# Use multi-process for local experiments, because each process needs to
# built fuzz targets in local docker containers.
with pool.ThreadPool(NUM_EVA) as p:
for i, target_stat in enumerate(
p.starmap(evaluator.check_target, ai_target_pairs)):
if target_stat is None:
print(f'Error evaluating target {generated_targets[i]}')
continue

if len(target_stats) > 0:
return aggregate_results(target_stats, generated_targets)
Expand Down