import os
from copy import (
deepcopy,
)
from pathlib import (
Path,
)
from typing import (
Any,
Dict,
List,
Optional,
Set,
Type,
Union,
)
from dflow import (
InputArtifact,
InputParameter,
Inputs,
OPTemplate,
OutputArtifact,
OutputParameter,
Outputs,
Step,
Steps,
if_expression,
)
from dflow.python import (
OP,
PythonOPTemplate,
)
from dpgen2.utils.step_config import init_executor
from pfd.op import ModelTestOP, EvalConv, StageScheduler
class ExplDistBlock(Steps):
    """A single exploration--distillation iteration as a dflow ``Steps`` template.

    Wired by :func:`_expl_dist_cl`, the block runs exploration with the
    teacher model, labels the sampled configurations by inference, collects
    the labeled data, trains the student (distillation) model, tests it,
    and evaluates convergence.

    Parameters
    ----------
    name : str
        Name of the Steps template.
    prep_run_explore_op : OPTemplate
        Super-OP that prepares and runs exploration tasks.
    prep_run_train_op : OPTemplate
        Super-OP that prepares and runs model training.
    collect_data_op : Type[OP]
        OP class that gathers (labeled) systems into multi-systems.
    inference_op : Type[OP]
        OP class that labels explored configurations with a model.
    inference_config : dict
        Step config (``template_config``/``executor``) for the inference step.
    model_test_config : dict
        Step config for the model-test step.
    collect_data_config : dict
        Step config for the collect-data steps.
    upload_python_packages : Optional[List[os.PathLike]]
        Additional python packages uploaded to the OP containers.
    """

    def __init__(
        self,
        name: str,
        prep_run_explore_op: OPTemplate,
        prep_run_train_op: OPTemplate,
        collect_data_op: Type[OP],
        inference_op: Type[OP],
        inference_config: dict,
        model_test_config: dict,
        collect_data_config: dict,
        upload_python_packages: Optional[List[os.PathLike]] = None,
    ):
        self._input_parameters = {
            "block_id": InputParameter(),
            "type_map": InputParameter(),
            "mass_map": InputParameter(),
            "expl_tasks": InputParameter(),
            "numb_models": InputParameter(type=int),
            "template_script": InputParameter(),
            "train_config": InputParameter(),
            "explore_config": InputParameter(),
            "inference_config": InputParameter(),
            "converge_config": InputParameter(value={}),
            "conf_filters_conv": InputParameter(),
            "collect_data_config": InputParameter(value={}),
            "type_map_train": InputParameter(),
        }
        self._input_artifacts = {
            "systems": InputArtifact(),
            "teacher_model": InputArtifact(),
            "init_data": InputArtifact(optional=True),
            "iter_data": InputArtifact(optional=True),  # empty list
            "validation_data": InputArtifact(optional=True),
        }
        self._output_parameters = {
            "converged": OutputParameter(),
            "report": OutputParameter(default=None),
        }
        self._output_artifacts = {
            "iter_data": OutputArtifact(),
            "dist_model": OutputArtifact(),
            "dp_test_report": OutputArtifact(),
        }
        super().__init__(
            name=name,
            inputs=Inputs(
                parameters=self._input_parameters, artifacts=self._input_artifacts
            ),
            outputs=Outputs(
                parameters=self._output_parameters,
                artifacts=self._output_artifacts,
            ),
        )
        # _expl_dist_cl mutates this Steps template in place; the previous
        # rebinding of the local name ``self`` to its return value had no
        # effect and has been dropped.
        _expl_dist_cl(
            self,
            name,
            prep_run_explore_op,
            prep_run_train_op,
            collect_data_op,
            inference_op,
            inference_config,
            model_test_config,
            collect_data_config,
            upload_python_packages,
        )

    @property
    def input_parameters(self):
        return self._input_parameters

    @property
    def input_artifacts(self):
        return self._input_artifacts

    @property
    def output_parameters(self):
        return self._output_parameters

    @property
    def output_artifacts(self):
        return self._output_artifacts
class ExplDistLoop(Steps):
    """Recursive dflow ``Steps`` template that iterates exploration--distillation.

    Wired by :func:`_loop`: a stage scheduler decides whether the loop has
    converged; while it has not, an ``ExplDistBlock`` instance is run and
    the loop template re-enters itself with the updated state.

    Parameters
    ----------
    name : str
        Name of the Steps template.
    expl_dist_blk_op : OPTemplate
        The exploration--distillation block template run each iteration.
    scheduler_config : dict
        Step config (``template_config``/``executor``) for the scheduler step.
    upload_python_packages : Optional[List[os.PathLike]]
        Additional python packages uploaded to the OP containers.
    """

    def __init__(
        self,
        name: str,
        expl_dist_blk_op: OPTemplate,
        scheduler_config: dict,
        upload_python_packages: Optional[List[os.PathLike]] = None,
    ):
        self._input_parameters = {
            "block_id": InputParameter(value="000"),
            "type_map": InputParameter(),
            "mass_map": InputParameter(),
            "conf_selector": InputParameter(value={}),
            "numb_models": InputParameter(type=int),
            "template_script": InputParameter(),
            "train_config": InputParameter(),
            "explore_config": InputParameter(),
            "converge_config": InputParameter(value={}),
            "conf_filters_conv": InputParameter(),
            "inference_config": InputParameter(),
            "test_size": InputParameter(value=0.1),
            "type_map_train": InputParameter(),
            "scheduler": InputParameter(),
            "converged": InputParameter(value=False),
            "report": InputParameter(value=None),
        }
        self._input_artifacts = {
            "systems": InputArtifact(),  # starting systems for model deviation
            "teacher_model": InputArtifact(),  # model for exploration
            "init_data": InputArtifact(),  # initial data for model finetune
            "iter_data": InputArtifact(
                optional=True
            ),  # datas collected during previous exploration
            "current_model": InputArtifact(optional=True),
        }
        self._output_parameters = {"report": OutputParameter(default=None)}
        self._output_artifacts = {
            "dist_model": OutputArtifact(),
            "iter_data": OutputArtifact(),
        }
        super().__init__(
            name=name,
            inputs=Inputs(
                parameters=self._input_parameters, artifacts=self._input_artifacts
            ),
            outputs=Outputs(
                parameters=self._output_parameters,
                artifacts=self._output_artifacts,
            ),
        )
        # _loop mutates this Steps template in place; the previous rebinding
        # of the local name ``self`` to its return value had no effect and
        # has been dropped.
        _loop(
            self,
            name=name,
            expl_dist_blk_op=expl_dist_blk_op,
            scheduler_config=scheduler_config,
            upload_python_packages=upload_python_packages,
        )

    @property
    def input_parameters(self):
        return self._input_parameters

    @property
    def input_artifacts(self):
        return self._input_artifacts

    @property
    def output_parameters(self):
        return self._output_parameters

    @property
    def output_artifacts(self):
        return self._output_artifacts
def _expl_dist_cl(
    steps,
    name: str,
    prep_run_explore_op: OPTemplate,
    prep_run_train_op: OPTemplate,
    collect_data_op: Type[OP],
    inference_op: Type[OP],
    inference_config: dict,
    model_test_config: dict,
    collect_data_config: dict,
    upload_python_packages: Optional[List[os.PathLike]] = None,
):
    """Populate ``steps`` with the exploration--distillation pipeline.

    Adds, in order: prep/run exploration, trajectory collection, labeling of
    the collected configurations by inference with the teacher model,
    collection of the labeled training data, student-model training, model
    test and convergence evaluation, then wires the template outputs.

    Returns the same ``steps`` object (modified in place).
    """
    # each step config splits into a PythonOPTemplate config plus an executor
    inference_config = deepcopy(inference_config)
    inference_template_config = inference_config.pop("template_config")
    inference_executor = init_executor(inference_config.pop("executor"))

    model_test_config = deepcopy(model_test_config)
    model_test_template_config = model_test_config.pop("template_config")
    model_test_executor = init_executor(model_test_config.pop("executor"))

    collect_data_config = deepcopy(collect_data_config)
    collect_data_template_config = collect_data_config.pop("template_config")
    collect_data_executor = init_executor(collect_data_config.pop("executor"))

    block_id = steps.inputs.parameters["block_id"]

    def step_key(suffix):
        # key scheme shared by every step in this block: "<block_id>--<suffix>"
        return "--".join(["%s" % block_id, suffix])

    # run exploration with the teacher model
    explore = Step(
        name + "-prep-run-explore",
        template=prep_run_explore_op,
        parameters={
            "block_id": block_id,
            "explore_config": steps.inputs.parameters["explore_config"],
            "type_map": steps.inputs.parameters["type_map"],
            "expl_task_grp": steps.inputs.parameters["expl_tasks"],
        },
        artifacts={"models": steps.inputs.artifacts["teacher_model"]},
        key=step_key("prep-run-explore"),
    )
    steps.add(explore)

    # gather unlabeled configurations from the exploration trajectories
    collect_traj = Step(
        name + "-collect-data-traj",
        template=PythonOPTemplate(
            collect_data_op,
            python_packages=upload_python_packages,
            **collect_data_template_config,
        ),
        parameters={
            "type_map": steps.inputs.parameters["type_map"],
            "optional_parameters": {"labeled_data": False},
        },
        artifacts={
            "systems": explore.outputs.artifacts["trajs"],
        },
        key=step_key("collect-data-traj"),
        executor=collect_data_executor,
        **collect_data_config,
    )
    steps.add(collect_traj)

    # label the collected configurations by inference with the teacher model
    label = Step(
        name + "-inference",
        template=PythonOPTemplate(
            inference_op,
            python_packages=upload_python_packages,
            **inference_template_config,
        ),
        parameters={
            "inference_config": steps.inputs.parameters["inference_config"],
            "type_map": steps.inputs.parameters["type_map"],
        },
        artifacts={
            "systems": collect_traj.outputs.artifacts["systems"],
            "model": steps.inputs.artifacts["teacher_model"][0],
        },
        key=step_key("inference"),
        executor=inference_executor,
    )
    steps.add(label)

    # merge freshly labeled systems with data from earlier iterations
    collect_train = Step(
        name + "-collect-data",
        template=PythonOPTemplate(
            collect_data_op,
            python_packages=upload_python_packages,
            **collect_data_template_config,
        ),
        parameters={
            "type_map": steps.inputs.parameters["type_map"],
            "optional_parameters": steps.inputs.parameters["collect_data_config"],
        },
        artifacts={
            "systems": label.outputs.artifacts["labeled_systems"],
            "additional_multi_systems": steps.inputs.artifacts["iter_data"],
        },
        key=step_key("collect-data"),
        executor=collect_data_executor,
        **collect_data_config,
    )
    steps.add(collect_train)

    # train the student (distillation) models on the accumulated data
    train = Step(
        name + "-prep-run-train",
        template=prep_run_train_op,
        parameters={
            "block_id": block_id,
            "train_config": steps.inputs.parameters["train_config"],
            "numb_models": steps.inputs.parameters["numb_models"],
            "template_script": steps.inputs.parameters["template_script"],
        },
        artifacts={
            "init_data": steps.inputs.artifacts["init_data"],
            "iter_data": collect_train.outputs.artifacts["multi_systems"],
        },
        key=step_key("prep-run-train"),
    )
    steps.add(train)

    # evaluate the trained model on the held-out test split
    test = Step(
        name + "-test-model",
        template=PythonOPTemplate(
            ModelTestOP,
            python_packages=upload_python_packages,
            **model_test_template_config,
        ),
        parameters={
            "inference_config": {"model": "dp"},
            "type_map": steps.inputs.parameters["type_map"],
        },
        artifacts={
            "systems": collect_train.outputs.artifacts["test_systems"],
            "model": train.outputs.artifacts["models"][0],
        },
        key=step_key("test-model"),
        executor=model_test_executor,
    )
    steps.add(test)

    # decide convergence from the test statistics
    evaluate = Step(
        name + "-check-converge",
        template=PythonOPTemplate(
            EvalConv,
            python_packages=upload_python_packages,
            **collect_data_template_config,
        ),
        parameters={
            "config": steps.inputs.parameters["converge_config"],
            "test_res": test.outputs.parameters["test_res"],
        },
        artifacts={"systems": collect_train.outputs.artifacts["test_systems"]},
        key=step_key("check-converge"),
        **collect_data_config,
    )
    steps.add(evaluate)

    # wire the block outputs
    steps.outputs.artifacts["iter_data"]._from = collect_train.outputs.artifacts[
        "multi_systems"
    ]
    steps.outputs.artifacts["dist_model"]._from = train.outputs.artifacts["models"][0]
    steps.outputs.artifacts["dp_test_report"]._from = test.outputs.artifacts[
        "test_report"
    ]
    steps.outputs.parameters[
        "converged"
    ].value_from_parameter = evaluate.outputs.parameters["converged"]
    steps.outputs.parameters[
        "report"
    ].value_from_parameter = evaluate.outputs.parameters["report"]
    return steps
def _loop(
    loop,  # the recursive loop Steps template, modified in place
    name: str,
    expl_dist_blk_op: OPTemplate,
    scheduler_config: dict,
    upload_python_packages: Optional[List[os.PathLike]] = None,
):
    """Wire one level of the recursive exploration--distillation loop.

    Adds a stage-scheduler step, one exploration--distillation block (run
    only while not converged), and a recursive ``next_step`` that re-enters
    ``loop`` itself.  Loop outputs are selected with ``if_expression``
    depending on convergence.

    Returns the same ``loop`` object (modified in place).
    """
    scheduler_config = deepcopy(scheduler_config)
    scheduler_template_config = scheduler_config.pop("template_config")
    scheduler_executor = init_executor(scheduler_config.pop("executor"))
    # the scheduler checks convergence and generates the exploration tasks
    # (and iteration counters) for the current stage
    stage_scheduler = Step(
        name="stage-scheduler",
        template=PythonOPTemplate(
            StageScheduler,
            python_packages=upload_python_packages,
            **scheduler_template_config,
        ),
        parameters={
            "converged": loop.inputs.parameters["converged"],
            "scheduler": loop.inputs.parameters["scheduler"],
            "report": loop.inputs.parameters["report"],
        },
        artifacts={
            "systems": loop.inputs.artifacts["systems"],
            "init_model": loop.inputs.artifacts["teacher_model"],
        },
        key="--".join(["iter-%s" % loop.inputs.parameters["block_id"], "scheduler"]),
        executor=scheduler_executor,
        **scheduler_config,
    )
    loop.add(stage_scheduler)
    # one exploration--distillation iteration; skipped once converged
    expl_dist_blk = Step(
        name=name + "-exploration-dist",
        template=expl_dist_blk_op,
        parameters={
            "block_id": "iter-%s" % stage_scheduler.outputs.parameters["iter_id"],
            "type_map": loop.inputs.parameters["type_map"],
            "mass_map": loop.inputs.parameters["mass_map"],
            "expl_tasks": stage_scheduler.outputs.parameters["task_grp"],
            "converge_config": loop.inputs.parameters["converge_config"],
            "conf_filters_conv": loop.inputs.parameters["conf_filters_conv"],
            "numb_models": loop.inputs.parameters["numb_models"],
            "template_script": loop.inputs.parameters["template_script"],
            "train_config": loop.inputs.parameters["train_config"],
            "explore_config": loop.inputs.parameters["explore_config"],
            "inference_config": loop.inputs.parameters["inference_config"],
            "type_map_train": loop.inputs.parameters["type_map_train"],
            "collect_data_config": {
                "labeled_data": True,
                "test_size": loop.inputs.parameters["test_size"],
                "multi_sys_name": stage_scheduler.outputs.parameters["iter_id"],
            },
        },
        artifacts={
            "systems": loop.inputs.artifacts[
                "systems"
            ],  # starting systems for model deviation
            "teacher_model": loop.inputs.artifacts[
                "teacher_model"
            ],  # model for exploration
            "init_data": loop.inputs.artifacts["init_data"],
            "iter_data": loop.inputs.artifacts["iter_data"],
        },
        key="--".join(
            ["iter-%s" % stage_scheduler.outputs.parameters["iter_id"], "explore-block"]
        ),
        when="%s == false" % stage_scheduler.outputs.parameters["converged"],
    )
    loop.add(expl_dist_blk)
    # parameters handed to the next recursion level.
    # BUGFIX: the original dict literal listed "converged" twice (once from
    # stage_scheduler, once from expl_dist_blk); only the later entry took
    # effect, so the duplicate has been removed and the effective value
    # (the block's convergence verdict) is kept.
    next_parameters = {
        "block_id": stage_scheduler.outputs.parameters["next_iter_id"],
        "type_map": loop.inputs.parameters["type_map"],
        "mass_map": loop.inputs.parameters["mass_map"],
        "converge_config": loop.inputs.parameters["converge_config"],
        "conf_filters_conv": loop.inputs.parameters["conf_filters_conv"],
        "numb_models": loop.inputs.parameters["numb_models"],
        "template_script": loop.inputs.parameters["template_script"],
        "train_config": loop.inputs.parameters["train_config"],
        "explore_config": loop.inputs.parameters["explore_config"],
        "inference_config": loop.inputs.parameters["inference_config"],
        "type_map_train": loop.inputs.parameters["type_map_train"],
        "scheduler": stage_scheduler.outputs.parameters["scheduler"],
        "converged": expl_dist_blk.outputs.parameters["converged"],
        "report": expl_dist_blk.outputs.parameters["report"],
        "test_size": loop.inputs.parameters["test_size"],
    }
    # recursive re-entry into this very template; also gated on convergence
    next_step = Step(
        name=name + "-exploration-finetune-next",
        template=loop,
        parameters=next_parameters,
        artifacts={
            "systems": loop.inputs.artifacts["systems"],
            "teacher_model": loop.inputs.artifacts["teacher_model"],
            "current_model": expl_dist_blk.outputs.artifacts["dist_model"],
            "iter_data": expl_dist_blk.outputs.artifacts["iter_data"],
            "init_data": loop.inputs.artifacts["init_data"],
        },
        when="%s == false" % stage_scheduler.outputs.parameters["converged"],
        key="--".join(
            [
                "iter-%s" % stage_scheduler.outputs.parameters["next_iter_id"],
                "expl-ft-loop",
            ]
        ),
    )
    loop.add(next_step)
    # if this level converged, pass through the incoming state; otherwise
    # take the results produced by the deeper recursion
    loop.outputs.parameters["report"].value_from_expression = if_expression(
        _if=stage_scheduler.outputs.parameters["converged"],
        _then=loop.inputs.parameters["report"],
        _else=next_step.outputs.parameters["report"],
    )
    loop.outputs.artifacts["dist_model"].from_expression = if_expression(
        _if=stage_scheduler.outputs.parameters["converged"],
        _then=loop.inputs.artifacts["current_model"],
        _else=next_step.outputs.artifacts["dist_model"],
    )
    loop.outputs.artifacts["iter_data"].from_expression = if_expression(
        _if=stage_scheduler.outputs.parameters["converged"],
        _then=loop.inputs.artifacts["iter_data"],
        _else=next_step.outputs.artifacts["iter_data"],
    )
    return loop