Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/cloud/improve post process #384

Merged
merged 15 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions frontend/build/asset-manifest.json
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
{
"files": {
"main.css": "/static/css/main.341bf264.css",
"main.js": "/static/js/main.7a24960b.js",
"main.js": "/static/js/main.9adb5116.js",
"static/js/47.145ba9bc.chunk.js": "/static/js/47.145ba9bc.chunk.js",
"static/media/logo.png": "/static/media/logo.e44efe3246aecc6d6219.png",
"index.html": "/index.html",
"main.341bf264.css.map": "/static/css/main.341bf264.css.map",
"main.7a24960b.js.map": "/static/js/main.7a24960b.js.map",
"main.9adb5116.js.map": "/static/js/main.9adb5116.js.map",
"47.145ba9bc.chunk.js.map": "/static/js/47.145ba9bc.chunk.js.map"
},
"entrypoints": [
"static/css/main.341bf264.css",
"static/js/main.7a24960b.js"
"static/js/main.9adb5116.js"
]
}
2 changes: 1 addition & 1 deletion frontend/build/index.html
Original file line number Diff line number Diff line change
@@ -1 +1 @@
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="/static/favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1"/><meta name="theme-color" content="#000000"/><meta name="description" content="Web site created using create-react-app"/><link rel="manifest" href="/static/manifest.json"/><title>OptiNiSt</title><script defer="defer" src="/static/js/main.7a24960b.js"></script><link href="/static/css/main.341bf264.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="/static/favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1"/><meta name="theme-color" content="#000000"/><meta name="description" content="Web site created using create-react-app"/><link rel="manifest" href="/static/manifest.json"/><title>OptiNiSt</title><script defer="defer" src="/static/js/main.9adb5116.js"></script><link href="/static/css/main.341bf264.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>

Large diffs are not rendered by default.

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions frontend/src/store/slice/Pipeline/PipelineType.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,10 @@ export const RUN_BTN_LABELS = {
[RUN_BTN_OPTIONS.RUN_NEW]: "RUN ALL",
[RUN_BTN_OPTIONS.RUN_ALREADY]: "RUN",
} as const

/**
 * Fixed id/name pairs for built-in (non-algorithm) processes.
 *
 * POST_PROCESS.id is used as the key, and POST_PROCESS.name as the name,
 * of the pending result entry added to the initial run result.
 *
 * NOTE(review): the id presumably has to match the post_process id used by
 * the backend - confirm before changing either value.
 */
export const PROCESS_TYPE = {
  POST_PROCESS: {
    id: "post_process_0",
    name: "post_process",
  },
} as const
9 changes: 9 additions & 0 deletions frontend/src/store/slice/Pipeline/PipelineUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
NodeResultError,
NodeResultSuccess,
NodeResult,
PROCESS_TYPE,
} from "store/slice/Pipeline/PipelineType"

export function isNodeResultPending(
Expand Down Expand Up @@ -52,6 +53,14 @@ export function getInitialRunResult(runPostData: RunPostData) {
name: data?.label ?? "",
}
})

// NOTE: Force the addition of a parameter here
// to confirm post_process processing.
initialResult[PROCESS_TYPE.POST_PROCESS.id] = {
status: NODE_RESULT_STATUS.PENDING,
name: PROCESS_TYPE.POST_PROCESS.name,
}

return initialResult
}

Expand Down
2 changes: 1 addition & 1 deletion studio/app/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ else:

# make rule of post_process
elif details["type"] in [
ProcessType.POST_PROCESS,
ProcessType.POST_PROCESS.type,
]:
rule:
name:
Expand Down
23 changes: 23 additions & 0 deletions studio/app/common/core/experiment/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,28 @@ class ExptConfig:
success: Optional[str]
hasNWB: bool
function: Dict[str, ExptFunction]
procs: Optional[Dict[str, ExptFunction]]
nwb: NWBParams
snakemake: SmkParam


@dataclass
class ExptOutputPathIds:
    """IDs embedded in an experiment output directory path.

    The last three path components of ``output_dir`` are interpreted as
    ``{workspace_id}/{unique_id}/{function_id}``; the expected format is
    ``{DIRPATH.OUTPUT_DIR}/{workspace_id}/{unique_id}/{function_id}``.

    When ``output_dir`` is empty or has fewer than three components, the
    ID fields keep whatever values were passed to the constructor.
    Otherwise they are overwritten with the values taken from the path.
    """

    output_dir: Optional[str] = None
    workspace_id: Optional[str] = None
    unique_id: Optional[str] = None
    function_id: Optional[str] = None

    def __post_init__(self):
        """Extract each ID from ``output_dir`` when one is given."""
        if not self.output_dir:
            return

        # Ignore empty components so that a leading, trailing or doubled "/"
        # cannot shift the IDs or produce an empty-string ID.
        components = [part for part in self.output_dir.split("/") if part]

        if len(components) >= 3:
            self.workspace_id, self.unique_id, self.function_id = components[-3:]
7 changes: 7 additions & 0 deletions studio/app/common/core/experiment/experiment_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def __init__(self):
self._success = None
self._hasNWB = False
self._function = {}
self._procs = {}
self._nwbfile = None
self._snakemake = None

Expand All @@ -23,6 +24,7 @@ def set_config(self, config: ExptConfig) -> "ExptConfigBuilder":
self._success = config.success
self._hasNWB = config.hasNWB
self._function = config.function
self._procs = config.procs
self._nwbfile = config.nwb
self._snakemake = config.snakemake
return self
Expand Down Expand Up @@ -55,6 +57,10 @@ def set_function(self, function) -> "ExptConfigBuilder":
self._function = function
return self

def set_procs(self, function) -> "ExptConfigBuilder":
    """Set the value that build() passes to ExptConfig as ``procs``.

    Args:
        function: stored as-is in ``self._procs``. Despite the parameter
            name, this is the procs mapping (ExptConfig.procs is declared
            as Optional[Dict[str, ExptFunction]]).

    Returns:
        This builder, so that calls can be chained.
    """
    self._procs = function
    return self

def set_nwbfile(self, nwbfile) -> "ExptConfigBuilder":
    """Set the value that build() passes to ExptConfig as ``nwb``.

    Args:
        nwbfile: stored as-is in ``self._nwbfile``.

    Returns:
        This builder, so that calls can be chained.
    """
    self._nwbfile = nwbfile
    return self
Expand All @@ -73,6 +79,7 @@ def build(self) -> ExptConfig:
success=self._success,
hasNWB=self._hasNWB,
function=self._function,
procs=self._procs,
nwb=self._nwbfile,
snakemake=self._snakemake,
)
6 changes: 6 additions & 0 deletions studio/app/common/core/experiment/experiment_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ def read(cls, filepath) -> ExptConfig:
success=config.get("success", "running"),
hasNWB=config["hasNWB"],
function=cls.read_function(config["function"]),
procs=cls.read_function(config.get("procs")),
nwb=config.get("nwb"),
snakemake=config.get("snakemake"),
)

@classmethod
def read_function(cls, config) -> Dict[str, ExptFunction]:
# Assuming the case where an empty value is specified, check here.
# (For backward compatibility with config yaml)
if not config:
return {}

return {
key: ExptFunction(
unique_id=value["unique_id"],
Expand Down
18 changes: 16 additions & 2 deletions studio/app/common/core/experiment/experiment_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from studio.app.common.core.experiment.experiment_reader import ExptConfigReader
from studio.app.common.core.utils.config_handler import ConfigWriter
from studio.app.common.core.utils.filepath_creater import join_filepath
from studio.app.common.core.workflow.workflow import ProcessType
from studio.app.common.core.workflow.workflow_reader import WorkflowConfigReader
from studio.app.const import DATE_FORMAT
from studio.app.dir_path import DIRPATH
Expand Down Expand Up @@ -48,7 +49,8 @@ def write(self) -> None:
else:
self.create_config()

self.function_from_nodeDict()
self.build_function_from_nodeDict()
self.build_procs()

ConfigWriter.write(
dirname=join_filepath(
Expand Down Expand Up @@ -77,7 +79,7 @@ def add_run_info(self) -> ExptConfig:
.build()
)

def function_from_nodeDict(self) -> ExptConfig:
def build_function_from_nodeDict(self) -> ExptConfig:
func_dict: Dict[str, ExptFunction] = {}
node_dict = WorkflowConfigReader.read(
join_filepath(
Expand All @@ -102,6 +104,17 @@ def function_from_nodeDict(self) -> ExptConfig:

return self.builder.set_function(func_dict).build()

def build_procs(self) -> ExptConfig:
    """Register the fixed procs in the builder and return the built config.

    Every target proc (currently only ProcessType.POST_PROCESS) gets an
    ExptFunction entry keyed by the proc id, created with hasNWB=False and
    success="running".
    """
    procs: Dict[str, ExptFunction] = {
        target.id: ExptFunction(
            unique_id=target.id,
            name=target.label,
            hasNWB=False,
            success="running",
        )
        for target in (ProcessType.POST_PROCESS,)
    }

    return self.builder.set_procs(procs).build()


class ExptDataWriter:
def __init__(
Expand Down Expand Up @@ -143,6 +156,7 @@ def rename(self, new_name: str) -> ExptConfig:
success=config.get("success", "running"),
hasNWB=config["hasNWB"],
function=ExptConfigReader.read_function(config["function"]),
procs=ExptConfigReader.read_function(config.get("procs")),
nwb=config.get("nwb"),
snakemake=config.get("snakemake"),
)
82 changes: 57 additions & 25 deletions studio/app/common/core/rules/post_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@
import json
import os
import sys
import traceback
from os.path import abspath, dirname
from pathlib import Path

ROOT_DIRPATH = dirname(dirname(dirname(dirname(dirname(dirname(abspath(__file__)))))))

sys.path.append(ROOT_DIRPATH)

from studio.app.common.core.experiment.experiment import ExptOutputPathIds
from studio.app.common.core.logger import AppLogger
from studio.app.common.core.rules.runner import Runner
from studio.app.common.core.snakemake.smk import Rule
from studio.app.common.core.snakemake.snakemake_reader import RuleConfigReader
from studio.app.common.core.storage.remote_storage_controller import (
RemoteStorageController,
Expand All @@ -26,35 +29,64 @@

logger = AppLogger.get_logger()


class PostProcessRunner:
    """Runner for the post_process rule executed after the workflow nodes."""

    @classmethod
    def run(cls, __rule: Rule):
        """Check the rule inputs, upload results if enabled, write the output.

        On success ``{"success": True}`` is pickled to ``__rule.output``.
        On any exception the formatted traceback lines are logged and pickled
        to ``__rule.output`` instead; the exception is not re-raised.

        NOTE: ``__rule`` is name-mangled inside the class body, so callers
        can only pass it positionally.
        """
        try:
            logger.info("start post_process runner")

            # Read the input data of the rule purely as a validation step:
            # according to the original note, an AssertionError is raised
            # here if any node ended in error. The return value is unused.
            Runner.read_input_info(__rule.input)

            if not RemoteStorageController.use_remote_storage():
                logger.debug("remote storage is unused in post_process.")
            else:
                # workspace_id / unique_id are taken from the output file path
                path_ids = ExptOutputPathIds(dirname(__rule.output))

                # Transfer the processing result data to external storage.
                # TODO: add a check for whether the transfer should be
                #   performed (expected to be driven by an environment variable)
                RemoteStorageController().upload_experiment(
                    path_ids.workspace_id, path_ids.unique_id
                )

            # Save the processing result file.
            # TODO: planned to be changed to record more appropriate information
            PickleWriter.write(__rule.output, {"success": True})

        except Exception as e:
            # Note: The code here is the same as in the except section of
            # Runner.run
            trace_lines = list(traceback.TracebackException.from_exception(e).format())

            # show full trace to console
            logger.error("\n".join(trace_lines))

            # save msg for GUI
            PickleWriter.write(__rule.output, trace_lines)


if __name__ == "__main__":
# TODO: debug log.
logger.debug(
">>>>>>>>> Post process logging\n"
">>>>>>>>>>>>>>>>>>>> Post process logging\n"
f"[snakemake.input: {snakemake.input}]\n"
f"[snakemake.output: {snakemake.output}]\n"
)

# Get input data for a rule.
# Note: Check if all node data can be successfully retrieved.
# If there is an error in any node, an AssertionError is generated here.
input_info = Runner.read_input_info(snakemake.input)
del input_info

# Operate remote storage.
if RemoteStorageController.use_remote_storage():
# Get workspace_id, unique_id
# TODO: 関数(Utility)化
rule_config = RuleConfigReader.read(snakemake.params.name)
workspace_id, unique_id = rule_config.output.split("/")[0:2]

# 処理結果データの外部ストレージへのデータ転送
# TODO: データ転送の実施要否の判定追加(環境変数想定)
remote_storage_controller = RemoteStorageController()
remote_storage_controller.upload_experiment(workspace_id, unique_id)
else:
logger.debug("remote storage is unused in post-process.")

# 処理結果ファイルを保存
output_path = str(snakemake.output)
output_info = {"success": True} # TODO: より適切な情報の記録に要修正
PickleWriter.write(output_path, output_info)
rule_config = RuleConfigReader.read(snakemake.params.name)

rule_config.input = snakemake.input
rule_config.output = snakemake.output[0]

PostProcessRunner.run(rule_config)
6 changes: 6 additions & 0 deletions studio/app/common/core/snakemake/smk.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ class FlowConfig:
last_output: list


# Pydantic model describing a process by three string fields.
# NOTE(review): presumably the shape of ProcessType entries such as
# ProcessType.POST_PROCESS, whose .id / .type / .label are read by
# callers - confirm against the ProcessType definition.
class NormalRun(BaseModel):
    id: str
    type: str
    label: str


class ForceRun(BaseModel):
nodeId: str
name: str
Expand Down
2 changes: 1 addition & 1 deletion studio/app/common/core/snakemake/smk_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def conda(cls, details):
return None
# skip conda for process-event
elif details["type"] in [
ProcessType.POST_PROCESS,
ProcessType.POST_PROCESS.type,
]:
return None

Expand Down
31 changes: 29 additions & 2 deletions studio/app/common/core/snakemake/snakemake_executor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
from collections import deque
from typing import Dict
from typing import Dict, List

from snakemake import snakemake

from studio.app.common.core.snakemake.smk import SmkParam
from studio.app.common.core.snakemake.smk import ForceRun, SmkParam
from studio.app.common.core.snakemake.smk_status_logger import SmkStatusLogger
from studio.app.common.core.utils.filepath_creater import get_pickle_file, join_filepath
from studio.app.common.core.workflow.workflow import Edge, Node
Expand Down Expand Up @@ -75,3 +75,30 @@ def delete_dependencies(
for edge in edgeDict.values():
if node_id == edge.source:
queue.append(edge.target)


def delete_procs_dependencies(
    workspace_id: str,
    unique_id: str,
    forceRunList: List[ForceRun],
):
    """
    Delete the pickle file of every proc (ExptConfig.procs) in forceRunList.

    The pickle path is resolved under DIRPATH.OUTPUT_DIR from the
    workspace_id, unique_id and each entry's nodeId / name.
    Entries whose pickle file does not exist are skipped.
    """

    for force_run in forceRunList:
        output_root = DIRPATH.OUTPUT_DIR
        pickle_file = get_pickle_file(
            workspace_id=workspace_id,
            unique_id=unique_id,
            node_id=force_run.nodeId,
            algo_name=force_run.name,
        )
        target_path = join_filepath([output_root, pickle_file])

        # nothing to delete for this proc
        if not os.path.exists(target_path):
            continue

        os.remove(target_path)
4 changes: 3 additions & 1 deletion studio/app/common/core/snakemake/snakemake_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ def post_process(self) -> Rule:
pp_input = self._node.data.path.copy() # do explicit copy

return (
self.builder.set_input(pp_input).set_type(ProcessType.POST_PROCESS).build()
self.builder.set_input(pp_input)
.set_type(ProcessType.POST_PROCESS.type)
.build()
)

def get_return_name(self) -> str or None:
Expand Down
Loading
Loading