from copy import deepcopy
import os
import copy
from typing import Dict, List, Optional, Union
from pathlib import Path
import json
import dpdata
import dflow
import dpgen2
import pfd
import re
import fpop
import ase
import time
from dflow import ArgoStep, Step, Steps, Workflow, upload_artifact, download_artifact
from pfd.entrypoint.args import (
normalize_infer_args,
normalize_pert_gen,
normalize as normalize_args,
)
from pfd.entrypoint.common import global_config_workflow, expand_idx
from dpgen2.fp import fp_styles
from pfd.train import train_styles
from pfd.exploration import explore_styles
from dpgen2.superop import PrepRunDPTrain, PrepRunFp
from pfd.superop.caly_evo_step import (
CalyEvoStep,
)
from pfd.op import (
PertGen,
CollectData,
InferenceOP,
SelectConfs,
ModelTestOP,
CollRunCaly,
PrepCalyDPOptim,
PrepCalyInput,
CalyEvoStepMerge,
RunCalyDPOptim,
)
from pfd.superop import (
ExplFinetuneLoop,
ExplFinetuneBlock,
ExplDistBlock,
ExplDistLoop,
PrepRunCaly,
)
from pfd.exploration.selector import (
ConfFilters,
ConfSelectorFrames,
conf_filter_styles,
)
from pfd.exploration.converge import ConfFiltersConv, ConfFilterConv
from pfd.exploration.render import TrajRender, TrajRenderLammps
from pfd.exploration.scheduler import Scheduler
from pfd.flow import Distillation, FineTune, DataGen
from pfd.constants import default_image
from pfd.utils.step_config import normalize as normalize_step_dict
from pfd.utils import (
upload_artifact_and_print_uri,
get_artifact_from_uri,
matched_step_key,
sort_slice_ops,
print_keys_in_nice_format,
)
from periodictable import elements
import logging
default_config = normalize_step_dict({"template_config": {"image": default_image}})
def get_conf_filters(config):
conf_filters = None
if len(config) > 0:
conf_filters = ConfFilters()
for c in config:
c = deepcopy(c)
conf_filter = conf_filter_styles[c.pop("type")](**c)
conf_filters.add(conf_filter)
return conf_filters
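
# Illustrative usage sketch (the "distance" filter type below is hypothetical;
# valid types are whatever keys are registered in conf_filter_styles):
#
#     filters = get_conf_filters([{"type": "distance"}])
#     # An empty config yields no filter container at all:
#     assert get_conf_filters([]) is None
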
def get_conf_filters_conv(config):
conf_filters_conv = ConfFiltersConv()
if len(config) > 0:
for c in config:
c = deepcopy(c)
conf_filter_conv = ConfFilterConv.get_filter(c.pop("type"))(**c)
conf_filters_conv.add(conf_filter_conv)
return conf_filters_conv
def make_dist_op(
teacher_model_style: str = "dp",
model_style: str = "dp",
explore_style: str = "lmp",
prep_lmp_config: dict = default_config,
run_lmp_config: dict = default_config,
prep_train_config: dict = default_config,
run_train_config: dict = default_config,
scheduler_config: dict = default_config,
collect_data_config: dict = default_config,
pert_gen_config: dict = default_config,
inference_config: dict = default_config,
model_test_config: dict = default_config,
upload_python_packages: Optional[List[os.PathLike]] = None,
):
"""
    Make a super OP template for the distillation process.
"""
# build lmp op
if teacher_model_style in explore_styles.keys():
if explore_style in explore_styles[teacher_model_style].keys():
prep_run_lmp_op = explore_styles[teacher_model_style][explore_style][
"preprun"
](
"prep-run-explore-step",
explore_styles[teacher_model_style][explore_style]["prep"],
explore_styles[teacher_model_style][explore_style]["run"],
prep_config=prep_lmp_config,
run_config=run_lmp_config,
upload_python_packages=upload_python_packages,
)
else:
raise NotImplementedError(
f"Explore style {explore_style} has not been implemented!"
)
else:
raise NotImplementedError(
f"Model style {teacher_model_style} has not been implemented!"
)
## initiate DP train op
if model_style in train_styles.keys():
prep_run_train_op = PrepRunDPTrain(
"finetune",
train_styles[model_style]["prep"],
train_styles[model_style]["run"],
prep_config=prep_train_config,
run_config=run_train_config,
upload_python_packages=upload_python_packages,
valid_data=None,
)
else:
raise NotImplementedError(
f"Training style {model_style} has not been implemented!"
)
expl_dist_blk_op = ExplDistBlock(
"blk",
prep_run_explore_op=prep_run_lmp_op,
prep_run_train_op=prep_run_train_op,
collect_data_op=CollectData,
inference_op=InferenceOP,
inference_config=inference_config,
model_test_config=model_test_config,
collect_data_config=collect_data_config,
upload_python_packages=upload_python_packages,
)
expl_dist_loop_op = ExplDistLoop(
"loop", expl_dist_blk_op, scheduler_config, upload_python_packages
)
dist_op = Distillation(
"distillation",
PertGen,
expl_dist_loop_op,
pert_gen_config,
upload_python_packages=upload_python_packages,
)
return dist_op
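
# Minimal usage sketch, assuming "dp" teacher/student styles and a "lmp"
# explorer are registered in explore_styles/train_styles (all step configs
# fall back to default_config):
#
#     dist_op = make_dist_op(
#         teacher_model_style="dp",
#         model_style="dp",
#         explore_style="lmp",
#     )
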
def make_data_gen_op(
fp_style: str = "vasp",
prep_fp_config: dict = default_config,
run_fp_config: dict = default_config,
pert_gen_config: dict = default_config,
collect_data_config: dict = default_config,
upload_python_packages: Optional[List[os.PathLike]] = None,
):
"""
    Create a DataGen super OP template.

    Args:
        fp_style (str): The style of the first-principles calculation (default is "vasp").
        prep_fp_config (dict): Step configuration for preparing the first-principles calculation.
        run_fp_config (dict): Step configuration for running the first-principles calculation.
        pert_gen_config (dict): Step configuration for perturbation generation.
        collect_data_config (dict): Step configuration for data collection.
        upload_python_packages (Optional[List[os.PathLike]]): List of Python packages to upload.

    Returns:
        DataGen: An instance of the DataGen class.
"""
## initiate fp op
if fp_style in fp_styles.keys():
prep_run_fp_op = PrepRunFp(
"prep-run-fp",
fp_styles[fp_style]["prep"],
fp_styles[fp_style]["run"],
prep_config=prep_fp_config,
run_config=run_fp_config,
upload_python_packages=upload_python_packages,
)
else:
raise RuntimeError(f"unknown fp_style {fp_style}")
return DataGen(
"data-gen",
PertGen,
prep_run_fp_op,
CollectData,
pert_gen_config,
collect_data_config,
upload_python_packages,
)
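
# Usage sketch, assuming "vasp" is registered in fp_styles; all step configs
# fall back to default_config:
#
#     data_gen_op = make_data_gen_op(fp_style="vasp")
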
def make_ft_op(
fp_style: str = "vasp",
train_style: str = "dp",
explore_style: str = "lmp",
pert_gen_step_config: dict = default_config,
prep_fp_config: dict = default_config,
run_fp_config: dict = default_config,
prep_train_config: dict = default_config,
run_train_config: dict = default_config,
prep_explore_config: dict = default_config,
run_explore_config: dict = default_config,
scheduler_config: dict = default_config,
collect_data_step_config: dict = default_config,
select_confs_step_config: dict = default_config,
inference_step_config: dict = default_config,
upload_python_packages: Optional[List[os.PathLike]] = None,
init_training: bool = True,
skip_aimd: bool = True,
):
## initiate fp op
if fp_style in fp_styles.keys():
prep_run_fp_op = PrepRunFp(
"prep-run-fp",
fp_styles[fp_style]["prep"],
fp_styles[fp_style]["run"],
prep_config=prep_fp_config,
run_config=run_fp_config,
upload_python_packages=upload_python_packages,
)
else:
raise RuntimeError(f"unknown fp_style {fp_style}")
## initiate DP train op
if train_style in train_styles.keys():
prep_run_train_op = PrepRunDPTrain(
"finetune",
train_styles[train_style]["prep"],
train_styles[train_style]["run"],
prep_config=prep_train_config,
run_config=run_train_config,
upload_python_packages=upload_python_packages,
valid_data=None,
)
else:
raise NotImplementedError(
f"Training style {train_style} has not been implemented!"
)
if explore_style == "lmp":
prep_run_explore_op = explore_styles[train_style][explore_style]["preprun"](
"prep-run-explore-step",
explore_styles[train_style][explore_style]["prep"],
explore_styles[train_style][explore_style]["run"],
prep_config=prep_explore_config,
run_config=run_explore_config,
upload_python_packages=upload_python_packages,
)
elif "calypso" in explore_style:
expl_mode = explore_style.split(":")[-1] if ":" in explore_style else "default"
# the evolution process is running locally
if expl_mode == "merge":
caly_evo_step_op = CalyEvoStepMerge(
name="caly-evo-step",
collect_run_caly=CollRunCaly,
prep_dp_optim=PrepCalyDPOptim,
run_dp_optim=RunCalyDPOptim,
expl_mode=expl_mode,
prep_config=prep_explore_config,
run_config=run_explore_config,
upload_python_packages=None,
)
elif expl_mode == "default":
caly_evo_step_op = CalyEvoStep(
name="caly-evo-step",
collect_run_caly=CollRunCaly,
prep_dp_optim=PrepCalyDPOptim,
run_dp_optim=RunCalyDPOptim,
expl_mode=expl_mode,
prep_config=prep_explore_config,
run_config=run_explore_config,
upload_python_packages=upload_python_packages,
)
else:
            raise KeyError(
                f"Unknown explore mode: {explore_style}; supported modes are `calypso:default` and `calypso:merge`."
            )
prep_run_explore_op = PrepRunCaly(
"prep-run-calypso",
prep_caly_input_op=PrepCalyInput,
caly_evo_step_op=caly_evo_step_op,
# prep_caly_model_devi_op=PrepCalyModelDevi,
# run_caly_model_devi_op=RunCalyModelDevi,
expl_mode=expl_mode,
prep_config=prep_explore_config,
run_config=run_explore_config,
upload_python_packages=upload_python_packages,
)
else:
raise ValueError(f"Explore style {explore_style} has not been implemented!")
expl_ft_blk_op = ExplFinetuneBlock(
name="blk",
prep_run_explore_op=prep_run_explore_op,
prep_run_fp_op=prep_run_fp_op,
collect_data_op=CollectData,
select_confs_op=SelectConfs,
prep_run_train_op=prep_run_train_op,
inference_op=ModelTestOP,
collect_data_step_config=collect_data_step_config,
inference_step_config=inference_step_config,
select_confs_step_config=select_confs_step_config,
upload_python_packages=upload_python_packages,
)
expl_finetune_loop_op = ExplFinetuneLoop(
name="loop",
expl_ft_blk_op=expl_ft_blk_op,
scheduler_config=scheduler_config,
upload_python_packages=upload_python_packages,
)
ft_op = FineTune(
name="fine-tune",
pert_gen_op=PertGen,
prep_run_fp_op=prep_run_fp_op,
collect_data_op=CollectData,
prep_run_dp_train_op=prep_run_train_op,
expl_finetune_loop_op=expl_finetune_loop_op,
pert_gen_step_config=pert_gen_step_config,
collect_data_step_config=collect_data_step_config,
upload_python_packages=upload_python_packages,
init_training=init_training,
skip_aimd=skip_aimd,
)
return ft_op
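
# Usage sketch for the fine-tune template, assuming "vasp", "dp" and "lmp" are
# registered in fp_styles, train_styles and explore_styles respectively:
#
#     ft_op = make_ft_op(
#         fp_style="vasp",
#         train_style="dp",
#         explore_style="lmp",
#         init_training=True,
#         skip_aimd=True,
#     )
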
def get_systems_from_data(data, data_prefix=None):
data = [data] if isinstance(data, str) else data
assert isinstance(data, list)
if data_prefix is not None:
data = [os.path.join(data_prefix, ii) for ii in data]
return data
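
# Examples:
#
#     get_systems_from_data(["sys-a", "sys-b"], data_prefix="/data")
#     # -> ["/data/sys-a", "/data/sys-b"]
#     get_systems_from_data("sys-a")
#     # -> ["sys-a"]
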
class FlowGen:
def __init__(
self,
config: Dict,
debug: bool = False,
download_path: Union[Path, str] = Path("./"),
):
self._download_path = download_path
if debug is True:
os.environ["DFLOW_DEBUG"] = "1"
elif os.environ.get("DFLOW_DEBUG"):
del os.environ["DFLOW_DEBUG"]
self._config = normalize_args(config)
global_config_workflow(self._config)
print("dflow mode: %s" % dflow.config["mode"])
self.workflow = Workflow(name=self._config["name"])
self._wf_type = config["task"].get("type")
        if self.wf_type == "dist":
            self._set_dist_wf(self._config)
        elif self.wf_type == "finetune":
            self._set_ft_wf(self._config)
        elif self.wf_type == "data_gen":
            self._set_data_gen_wf(self._config)
        else:
            raise NotImplementedError(f"Unknown task type: {self.wf_type}")
@property
def wf_type(self):
return self._wf_type
@property
def download_path(self):
if isinstance(self._download_path, str):
return Path(self._download_path)
else:
return self._download_path
def _set_data_gen_wf(self, config):
default_config = config["default_step_config"]
prep_fp_config = config["step_configs"].get("perp_fp_config", default_config)
run_fp_config = config["step_configs"].get("run_fp_config", default_config)
run_collect_data_config = config["step_configs"].get(
"collect_data_config", default_config
)
upload_python_packages = []
if custom_packages := config.get("upload_python_packages"):
upload_python_packages.extend(custom_packages)
upload_python_packages.extend(list(dpdata.__path__))
upload_python_packages.extend(list(dflow.__path__))
upload_python_packages.extend(list(pfd.__path__))
upload_python_packages.extend(list(dpgen2.__path__))
upload_python_packages.extend(list(fpop.__path__))
pert_config = config["conf_generation"]
pert_gen_tasks = pert_config.pop("pert_generation")
if isinstance(pert_gen_tasks, dict):
pert_gen_tasks = [pert_gen_tasks]
pert_config["pert_generation"] = pert_gen_tasks
for pert_task in pert_gen_tasks:
pert_task.update(normalize_pert_gen(pert_task))
if config["conf_generation"]["init_confs"]["confs_uri"] is not None:
init_confs = get_artifact_from_uri(
config["conf_generation"]["init_confs"]["confs_uri"]
)
elif config["conf_generation"]["init_confs"]["confs_paths"] is not None:
init_confs_prefix = config["conf_generation"]["init_confs"]["prefix"]
init_confs = config["conf_generation"]["init_confs"]["confs_paths"]
init_confs = get_systems_from_data(init_confs, init_confs_prefix)
init_confs = upload_artifact_and_print_uri(init_confs, "init_confs")
else:
raise RuntimeError("init_confs must be provided")
fp_config = {}
fp_inputs_config = config["fp"]["inputs_config"]
fp_type = config["fp"]["type"]
fp_inputs = fp_styles[fp_type]["inputs"](**fp_inputs_config)
fp_config["inputs"] = fp_inputs
fp_config["run"] = config["fp"]["run_config"]
fp_config["extra_output_files"] = config["fp"].get("extra_output_files")
data_gen_op = make_data_gen_op(
fp_style=fp_type,
prep_fp_config=prep_fp_config,
run_fp_config=run_fp_config,
collect_data_config=run_collect_data_config,
upload_python_packages=upload_python_packages,
)
# type_map = config["inputs"]["type_map"]
data_gen_step = Step(
"data-gen",
template=data_gen_op,
parameters={"pert_config": pert_config, "fp_config": fp_config},
artifacts={
"init_confs": init_confs,
},
)
self.workflow.add(data_gen_step)
def _set_dist_wf(self, config):
"""
        Build a workflow from the distillation OP templates.
"""
## get input config
default_step_config = config["default_step_config"]
prep_train_step_config = config["step_configs"].get(
"prep_train_config", default_step_config
)
run_train_step_config = config["step_configs"].get(
"run_train_config", default_step_config
)
prep_lmp_step_config = config["step_configs"].get(
"prep_explore_config", default_step_config
)
run_lmp_step_config = config["step_configs"].get(
"run_explore_config", default_step_config
)
collect_data_step_config = config["step_configs"].get(
"collect_data_config", default_step_config
)
pert_gen_step_config = config["step_configs"].get(
"pert_gen_config", default_step_config
)
inference_step_config = config["step_configs"].get(
"inference_config", copy.deepcopy(run_train_step_config)
)
model_test_config = copy.deepcopy(inference_step_config)
# uploaded python packages
upload_python_packages = []
if custom_packages := config.get("upload_python_packages"):
upload_python_packages.extend(custom_packages)
upload_python_packages.extend(list(dpdata.__path__))
upload_python_packages.extend(list(dflow.__path__))
upload_python_packages.extend(list(pfd.__path__))
upload_python_packages.extend(list(dpgen2.__path__))
## task configs
type_map = config["inputs"]["type_map"]
if config["inputs"].get("mass_map") is not None:
mass_map = config["inputs"]["mass_map"]
else:
mass_map = [getattr(elements, ii).mass for ii in type_map]
pert_config = config["conf_generation"]
pert_gen_tasks = pert_config.pop("pert_generation")
if isinstance(pert_gen_tasks, dict):
pert_gen_tasks = [pert_gen_tasks]
pert_config["pert_generation"] = pert_gen_tasks
for pert_task in pert_gen_tasks:
pert_task.update(normalize_pert_gen(pert_task))
# exploration
explore_style = config["exploration"]["type"]
explore_config = config["exploration"]["config"]
expl_stages = config["exploration"]["stages"]
converge_config = config["exploration"]["convergence"]
max_iter = config["exploration"]["max_numb_iter"]
        if max_iter < 1:
            raise RuntimeError(
                "The maximum number of iterations must be at least 1!"
            )
        if max_iter > 1:
            logging.warning(
                "In most cases, a single training iteration is sufficient for knowledge distillation!"
            )
            if input("Continue with %d max iterations? (Y/n)" % max_iter) not in [
                "Y",
                "y",
            ]:
                logging.info("Submission ending...")
                return
conf_filters_conv = get_conf_filters_conv(converge_config.pop("conf_filter"))
# train (student model) style
train_style = config["train"]["type"]
train_config = config["train"]["config"]
train_config.update(
{
# "init_model_policy": "no",
}
)
# others
collect_data_config = config["exploration"].get("test_set_config", {})
collect_data_config["labeled_data"] = collect_data_config.get(
"labeled_data", False
)
collect_data_config["test_size"] = collect_data_config.get("test_size", 0.1)
## prepare artifacts
# read training template
if isinstance(config["train"]["template_script"], str):
with open(config["train"]["template_script"], "r") as fp:
template_script = json.load(fp)
elif isinstance(config["train"]["template_script"], dict):
template_script = config["train"]["template_script"]
else:
template_script = {}
# init_confs
if config["conf_generation"]["init_confs"]["confs_uri"] is not None:
init_confs = get_artifact_from_uri(
config["conf_generation"]["init_confs"]["confs_uri"]
)
elif config["conf_generation"]["init_confs"]["confs_paths"] is not None:
init_confs_prefix = config["conf_generation"]["init_confs"]["prefix"]
init_confs = config["conf_generation"]["init_confs"]["confs_paths"]
init_confs = get_systems_from_data(init_confs, init_confs_prefix)
init_confs = upload_artifact_and_print_uri(init_confs, "init_confs")
else:
raise RuntimeError("init_confs must be provided")
# teacher models
teacher_model_style = config["inputs"]["base_model_style"]
print("teacher model style: %s" % teacher_model_style)
teacher_models_paths = config["inputs"]["base_model_path"]
if config["inputs"]["base_model_uri"] is not None:
print("Using uploaded model at: ", config["inputs"]["base_model_uri"])
teacher_models = get_artifact_from_uri(config["inputs"]["base_model_uri"])
elif teacher_models_paths is not None:
teacher_models = upload_artifact_and_print_uri(
teacher_models_paths, "teacher_models"
)
else:
            raise FileNotFoundError("A teacher model must be provided!")
inference_config = config["inference"]
inference_config.update({"model": teacher_model_style})
inference_config = normalize_infer_args(inference_config)
expl_args = explore_styles[teacher_model_style][explore_style]["task_args"]
for stg in expl_stages:
for task_grp in stg:
args = expl_args(task_grp)
task_grp.clear()
task_grp.update(args)
scheduler = Scheduler(
model_style=teacher_model_style,
explore_style=explore_style,
explore_stages=expl_stages,
mass_map=mass_map,
type_map=type_map,
max_iter=max_iter,
)
# init_data
if config["inputs"]["init_data_uri"] is not None:
init_data = get_artifact_from_uri(config["inputs"]["init_data_uri"])
elif config["inputs"]["init_data_sys"] is not None:
init_data_prefix = config["inputs"]["init_data_prefix"]
init_data = config["inputs"]["init_data_sys"]
init_data = get_systems_from_data(init_data, init_data_prefix)
init_data = upload_artifact_and_print_uri(init_data, "init_data")
else:
init_data = upload_artifact([])
print("student model style: %s" % train_style)
# make distillation op
dist_op = make_dist_op(
teacher_model_style=teacher_model_style,
model_style=train_style,
explore_style=explore_style,
prep_lmp_config=prep_lmp_step_config,
run_lmp_config=run_lmp_step_config,
prep_train_config=prep_train_step_config,
run_train_config=run_train_step_config,
collect_data_config=collect_data_step_config,
pert_gen_config=pert_gen_step_config,
inference_config=inference_step_config,
model_test_config=model_test_config,
upload_python_packages=upload_python_packages,
)
# make distillation steps
dist_step = Step(
"distillation",
template=dist_op,
parameters={
"block_id": "dist",
"type_map": type_map,
"mass_map": mass_map,
"pert_config": pert_config,
"numb_models": 1,
"explore_config": explore_config,
"converge_config": converge_config,
"conf_filters_conv": conf_filters_conv,
"template_script": template_script,
"train_config": train_config,
"inference_config": inference_config,
"test_size": collect_data_config["test_size"],
"type_map_train": [],
"scheduler": scheduler,
},
artifacts={
"init_confs": init_confs,
"teacher_model": teacher_models,
"init_data": init_data,
"iter_data": upload_artifact([]),
},
)
self.workflow.add(dist_step)
def _set_ft_wf(self, config: Dict):
"""
        Build a workflow from the fine-tuning OP templates.
"""
default_config = config["default_step_config"]
prep_train_config = config["step_configs"].get(
"prep_train_config", default_config
)
run_train_config = config["step_configs"].get(
"run_train_config", default_config
)
prep_fp_config = config["step_configs"].get("perp_fp_config", default_config)
run_fp_config = config["step_configs"].get("run_fp_config", default_config)
run_collect_data_config = config["step_configs"].get(
"collect_data_config", default_config
)
run_select_confs_config = config["step_configs"].get(
"select_confs_config", default_config
)
run_pert_gen_config = config["step_configs"].get(
"pert_gen_config", default_config
)
run_inference_config = copy.deepcopy(run_train_config)
prep_lmp_config = config["step_configs"].get(
"prep_explore_config", default_config
)
run_lmp_config = config["step_configs"].get(
"run_explore_config", default_config
)
scheduler_config = default_config
# uploaded python packages
upload_python_packages = []
if custom_packages := config.get("upload_python_packages"):
upload_python_packages.extend(custom_packages)
upload_python_packages.extend(list(dpdata.__path__))
upload_python_packages.extend(list(dflow.__path__))
upload_python_packages.extend(list(pfd.__path__))
upload_python_packages.extend(list(dpgen2.__path__))
upload_python_packages.extend(list(fpop.__path__))
upload_python_packages.extend(list(ase.__path__))
## task configs
type_map = config["inputs"]["type_map"]
if config["inputs"].get("mass_map") is not None:
mass_map = config["inputs"]["mass_map"]
else:
mass_map = [getattr(elements, ii).mass for ii in type_map]
train_style = config["train"]["type"]
train_config = config["train"]["config"]
numb_models = 1
pert_config = config["conf_generation"]
pert_gen_tasks = pert_config.pop("pert_generation")
if isinstance(pert_gen_tasks, dict):
pert_gen_tasks = [pert_gen_tasks]
pert_config["pert_generation"] = pert_gen_tasks
for pert_task in pert_gen_tasks:
pert_task.update(normalize_pert_gen(pert_task))
explore_style = config["exploration"]["type"]
expl_stages = config["exploration"]["stages"]
explore_config = config["exploration"]["config"]
max_iter = config["exploration"]["max_numb_iter"]
converge_config = config["exploration"]["convergence"]
# conf selectors
conf_filters = get_conf_filters(config["exploration"]["filter"])
render = TrajRender.get_driver(explore_style)()
conf_selector = ConfSelectorFrames(
render, config["fp"]["task_max"], conf_filters
)
conf_filters_conv = get_conf_filters_conv(converge_config.pop("conf_filter"))
# task
init_training = config["task"]["init_training"]
if init_training is False:
print("No initial training before exploration")
skip_aimd = config["task"]["skip_aimd"]
if skip_aimd is True:
print("AIMD exploration is skipped!")
recursive_finetune = config["task"]["recursive"]
if recursive_finetune is True:
if init_training is False:
raise RuntimeError(
"Initial training is required for recursive finetune!"
)
print("Recursive fine-tune, using init_training policy!")
        # do not use the init-model policy in initial training or naive finetune mode
train_config.update({"init_model_policy": "no"})
collect_data_config = {}
collect_data_config["test_size"] = config["conf_generation"].get(
"test_data", 0.05
)
collect_data_config["system_partition"] = config["conf_generation"].get(
"system_partition", False
)
collect_data_config["labeled_data"] = True
## prepare artifacts
# read training template
with open(config["train"]["template_script"], "r") as fp:
template_script = json.load(fp)
# init_confs
if config["conf_generation"]["init_confs"]["confs_uri"] is not None:
init_confs = get_artifact_from_uri(
config["conf_generation"]["init_confs"]["confs_uri"]
)
elif config["conf_generation"]["init_confs"]["confs_paths"] is not None:
init_confs_prefix = config["conf_generation"]["init_confs"]["prefix"]
init_confs = config["conf_generation"]["init_confs"]["confs_paths"]
init_confs = get_systems_from_data(init_confs, init_confs_prefix)
init_confs = upload_artifact_and_print_uri(init_confs, "init_confs")
else:
raise RuntimeError("init_confs must be provided")
expl_args = explore_styles[train_style][explore_style.split(":")[0]][
"task_args"
]
for stg in expl_stages:
for task_grp in stg:
args = expl_args(task_grp)
task_grp.clear()
task_grp.update(args)
scheduler = Scheduler(
model_style=train_style,
explore_style=explore_style.split(":")[0],
explore_stages=expl_stages,
mass_map=mass_map,
type_map=type_map,
max_iter=max_iter,
train_config=train_config,
recursive_finetune=recursive_finetune,
)
# init_data
if config["inputs"]["init_data_uri"] is not None:
init_data = get_artifact_from_uri(config["inputs"]["init_data_uri"])
elif config["inputs"]["init_data_sys"] is not None:
init_data_prefix = config["inputs"]["init_data_prefix"]
init_data = config["inputs"]["init_data_sys"]
init_data = get_systems_from_data(init_data, init_data_prefix)
init_data = upload_artifact_and_print_uri(init_data, "init_data")
else:
init_data = upload_artifact([])
iter_data = upload_artifact([])
init_models_paths = config["inputs"]["base_model_path"]
if config["inputs"]["base_model_uri"] is not None:
print("Using uploaded model at: ", config["inputs"]["base_model_uri"])
init_models = get_artifact_from_uri(config["inputs"]["base_model_uri"])
elif init_models_paths:
init_models = upload_artifact_and_print_uri(init_models_paths, "base_model")
else:
            raise FileNotFoundError("A pre-trained model must be provided!")
fp_config = {}
fp_inputs_config = config["fp"]["inputs_config"]
fp_type = config["fp"]["type"]
fp_inputs = fp_styles[fp_type]["inputs"](**fp_inputs_config)
fp_config["inputs"] = fp_inputs
fp_config["run"] = config["fp"]["run_config"]
fp_config["extra_output_files"] = config["fp"].get("extra_output_files", [])
# aimd exploration
aimd_config = {}
if skip_aimd is not True:
aimd_config.update(fp_config)
aimd_inputs_config = copy.deepcopy(fp_inputs_config)
aimd_inputs_config.update(config["aimd"]["inputs_config"])
aimd_inputs = fp_styles[fp_type]["inputs"](**aimd_inputs_config)
aimd_config["inputs"] = aimd_inputs
aimd_sample_conf = {"labeled_data": False, "multi_sys_name": "init"}
if config["aimd"]["confs"]:
aimd_sample_conf.update(
{
"sample_conf": {
"confs": config["aimd"]["confs"],
"n_sample": config["aimd"]["n_sample"],
}
}
)
inference_config = config["inference"]
inference_config.update({"model": train_style})
inference_config = normalize_infer_args(inference_config)
        # drop max_force, which is irrelevant for fine-tuning inference
        inference_config.pop("max_force")
        # make the fine-tune op
ft_op = make_ft_op(
fp_style=fp_type,
train_style=train_style,
explore_style=explore_style,
pert_gen_step_config=run_pert_gen_config,
prep_fp_config=prep_fp_config,
run_fp_config=run_fp_config,
prep_train_config=prep_train_config,
run_train_config=run_train_config,
prep_explore_config=prep_lmp_config,
run_explore_config=run_lmp_config,
scheduler_config=scheduler_config,
collect_data_step_config=run_collect_data_config,
select_confs_step_config=run_select_confs_config,
inference_step_config=run_inference_config,
upload_python_packages=upload_python_packages,
init_training=init_training,
skip_aimd=skip_aimd,
)
ft_step = Step(
"finetune",
template=ft_op,
parameters={
"block_id": "finetune",
"type_map": type_map,
"mass_map": mass_map,
"pert_config": pert_config, # Total input parameter file: to be changed in the future
"numb_models": numb_models,
"conf_selector": conf_selector,
"conf_filters_conv": conf_filters_conv,
"converge_config": converge_config,
"scheduler": scheduler,
"explore_config": explore_config,
"template_script": template_script,
"train_config": train_config,
"fp_config": fp_config,
"aimd_config": aimd_config,
"aimd_sample_conf": aimd_sample_conf,
"collect_data_config": collect_data_config,
"inference_config": inference_config,
},
artifacts={
"init_models": init_models,
"init_confs": init_confs,
"expl_models": init_models,
"init_data": init_data,
"iter_data": iter_data,
},
)
self.workflow.add(ft_step)
def _moniter_dist(self):
print("Running...")
while True:
time.sleep(4)
step_info = self.workflow.query()
wf_status = self.workflow.query_status()
if wf_status == "Failed":
raise RuntimeError(
f"Workflow failed (ID: {self.workflow.id}, UID: {self.workflow.uid})"
)
try:
dist_post = step_info.get_step(name="distillation")[0]
except IndexError:
continue
if dist_post["phase"] == "Succeeded":
print(
f"Distillation finished (ID: {self.workflow.id}, UID: {self.workflow.uid})"
)
print("Retrieving completed tasks to local...")
download_artifact(
artifact=dist_post.outputs.artifacts["dist_model"],
path=self.download_path,
)
break
def _moniter_ft(self):
while True:
time.sleep(4)
step_info = self.workflow.query()
wf_status = self.workflow.query_status()
if wf_status == "Failed":
raise RuntimeError(
f"Workflow failed (ID: {self.workflow.id}, UID: {self.workflow.uid})"
)
try:
dist_post = step_info.get_step(name="finetune")[0]
except IndexError:
continue
            if dist_post["phase"] == "Succeeded":
                print(
                    f"Fine-tuning finished (ID: {self.workflow.id}, UID: {self.workflow.uid})"
                )
print("Retrieving completed tasks to local...")
download_artifact(
artifact=dist_post.outputs.artifacts["fine_tuned_model"],
path=self.download_path,
)
break
def submit(
self,
reuse_step: Optional[List[ArgoStep]] = None,
no_submission: bool = False,
only_submit: bool = True,
):
if not no_submission:
self.workflow.submit(reuse_step=reuse_step)
# return self.workflow
else:
return self.workflow
if not only_submit:
if self.wf_type == "dist":
self._moniter_dist()
elif self.wf_type == "finetune":
self._moniter_ft()
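
# Driver sketch (illustrative; the config file name and import path are
# assumptions, not part of this module):
#
#     import json
#     # from pfd.flow import FlowGen  # hypothetical import path
#
#     with open("finetune.json") as f:
#         cfg = json.load(f)
#     FlowGen(cfg, debug=False, download_path="./results").submit(only_submit=False)
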
def successful_step_keys(wf, unsuccessful_step_keys: bool = False):
    """Return the keys of succeeded steps; include non-succeeded steps as well when ``unsuccessful_step_keys`` is True."""
    all_step_keys = []
steps = wf.query_step()
# For reused steps whose startedAt are identical, sort them by key
steps.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key))
for step in steps:
if not unsuccessful_step_keys:
if step.key is not None and step.phase == "Succeeded":
all_step_keys.append(step.key)
else:
if step.key is not None:
all_step_keys.append(step.key)
return all_step_keys
def get_superop(key):
if "prep-train" in key:
return key.replace("prep-train", "prep-run-train")
elif "run-train-" in key:
return re.sub("run-train-[0-9]*", "prep-run-train", key)
elif "prep-lmp" in key:
return key.replace("prep-lmp", "prep-run-explore")
elif "run-lmp-" in key:
return re.sub("run-lmp-[0-9]*", "prep-run-explore", key)
elif "prep-fp" in key:
return key.replace("prep-fp", "prep-run-fp")
elif "run-fp-" in key:
return re.sub("run-fp-[0-9]*", "prep-run-fp", key)
elif "prep-caly-input" in key:
return key.replace("prep-caly-input", "prep-run-explore")
elif "collect-run-calypso-" in key:
return re.sub("collect-run-calypso-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "prep-dp-optim-" in key:
return re.sub("prep-dp-optim-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "run-dp-optim-" in key:
return re.sub("run-dp-optim-[0-9]*-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "prep-caly-model-devi" in key:
return key.replace("prep-caly-model-devi", "prep-run-explore")
elif "run-caly-model-devi" in key:
return re.sub("run-caly-model-devi-[0-9]*", "prep-run-explore", key)
elif "caly-evo-step" in key:
return re.sub("caly-evo-step-[0-9]*", "prep-run-explore", key)
return None
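
# Example mappings (the step-key layout shown here is illustrative):
#
#     get_superop("iter-000--run-lmp-0001")
#     # -> "iter-000--prep-run-explore"
#     get_superop("iter-000--collect-data")
#     # -> None (not a slice of any super OP)
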
def fold_keys(all_step_keys):
folded_keys = {}
for key in all_step_keys:
is_superop = False
for superop in ["prep-run-train", "prep-run-explore", "prep-run-fp"]:
if superop in key:
if key not in folded_keys:
folded_keys[key] = []
is_superop = True
break
if is_superop:
continue
superop = get_superop(key)
        # if its super OP succeeded, fold this step into that super OP
if superop is not None and superop in all_step_keys:
if superop not in folded_keys:
folded_keys[superop] = []
folded_keys[superop].append(key)
else:
folded_keys[key] = [key]
for k, v in folded_keys.items():
if v == []:
folded_keys[k] = [k]
return folded_keys
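
# Example (illustrative keys): slice steps are grouped under their super OP
# when that super OP key is itself present, while standalone steps map to
# themselves:
#
#     fold_keys([
#         "iter-000--prep-run-explore",
#         "iter-000--run-lmp-0000",
#         "iter-000--collect-data",
#     ])
#     # -> {"iter-000--prep-run-explore": ["iter-000--run-lmp-0000"],
#     #     "iter-000--collect-data": ["iter-000--collect-data"]}
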
def get_resubmit_keys(wf, unsuccessful_step_keys: bool = False):
all_step_keys = successful_step_keys(wf, unsuccessful_step_keys)
step_keys = [
"pert-gen",
"sample-aimd",
"prep-run-train",
"prep-train",
"run-train",
"modify-train-script",
"prep-caly-input",
"prep-caly-model-devi",
"run-caly-model-devi",
"prep-run-explore",
"prep-lmp",
"run-lmp",
"select-confs",
"prep-run-fp",
"prep-fp",
"run-fp",
"collect-data",
"test-model",
# "scheduler",
"check-converge",
"inference",
]
all_step_keys = matched_step_key(
all_step_keys,
step_keys,
)
all_step_keys = sort_slice_ops(
all_step_keys,
["run-train", "run-lmp", "run-fp"],
)
folded_keys = fold_keys(all_step_keys)
return folded_keys
def resubmit_workflow(
wf_config,
wfid,
list_steps=False,
reuse=None,
fold=False,
unsuccessful_step_keys: bool = False,
**kwargs,
):
global_config_workflow(normalize_args(wf_config))
old_wf = Workflow(id=wfid)
folded_keys = get_resubmit_keys(old_wf, unsuccessful_step_keys)
all_step_keys = sum(folded_keys.values(), [])
if list_steps:
prt_str = print_keys_in_nice_format(
all_step_keys, ["run-train", "run-lmp", "run-fp"]
)
print(prt_str)
return
if reuse is None:
return
reuse_idx = expand_idx(reuse)
reused_keys = [all_step_keys[ii] for ii in reuse_idx]
if fold:
reused_folded_keys = {}
for key in reused_keys:
superop = get_superop(key)
if superop is not None:
if superop not in reused_folded_keys:
reused_folded_keys[superop] = []
reused_folded_keys[superop].append(key)
else:
reused_folded_keys[key] = [key]
for k, v in reused_folded_keys.items():
            # reuse the super OP only if all of its sub-steps are reused
if v != [k] and k in folded_keys and set(v) == set(folded_keys[k]):
reused_folded_keys[k] = [k]
reused_keys = sum(reused_folded_keys.values(), [])
print(reused_keys)
reuse_step = old_wf.query_step(key=reused_keys)
# For reused steps whose startedAt are identical, sort them by key
reuse_step.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key))
wf = FlowGen(wf_config)
wf.submit(reuse_step=reuse_step, **kwargs)
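
# Usage sketch (workflow ID and reuse ranges are illustrative):
#
#     cfg = json.load(open("finetune.json"))  # same config used for the original submission
#     # list the reusable step keys of an existing workflow
#     resubmit_workflow(wf_config=cfg, wfid="pfd-abcde", list_steps=True)
#     # resubmit, reusing steps 0-2 and folding complete super OPs
#     resubmit_workflow(wf_config=cfg, wfid="pfd-abcde", reuse=["0-2"], fold=True)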