from typing import (
List,
)
import dargs
from dargs import (
Argument,
Variant,
)
from pfd.exploration.converge import CheckConv, ConfFilterConv
from pfd.exploration.selector import conf_filter_styles
from pfd.exploration.inference import EvalModel
from dpgen2.fp import (
fp_styles,
)
from pfd.train import train_styles
from dpgen2.op.run_lmp import (
RunLmp,
)
from pfd.utils import (
normalize_step_dict,
step_conf_args,
)
def make_link(content, ref_key):
raw_anchor = dargs.dargs.RAW_ANCHOR
return (
f"`{content} <{ref_key}_>`_" if not raw_anchor else f"`{content} <#{ref_key}>`_"
)
def task_finetune():
doc_init_train = "Training before exploration"
    doc_skip_aimd = "Skip AIMD exploration"
doc_rec_ft = "Start training from the output model of the last iteration"
return [
Argument(
"init_training", bool, optional=True, default=False, doc=doc_init_train
),
Argument("skip_aimd", bool, optional=True, default=True, doc=doc_skip_aimd),
Argument("recursive", bool, optional=True, default=False, doc=doc_rec_ft),
]
def variant_task():
return Variant(
"type",
[
Argument("finetune", dict, task_finetune(), alias=["ft"]),
Argument("dist", dict, [], alias=["distillation"]),
Argument("data_gen", dict, [], alias=["data generation"]),
],
)
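# The Variant above dispatches the "task" section on its "type" key; a
# minimal fine-tuning task section (values chosen for illustration) would
# look like:
#
#     "task": {"type": "finetune", "init_training": True, "skip_aimd": False}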
def conf_args():
doc_fmt = "Format of input structure files"
return [
Argument("prefix", str, optional=True, default=None),
Argument("fmt", str, optional=True, default="vasp/poscar", doc=doc_fmt),
Argument("confs_paths", [str, List[str]], optional=True, alias=["files"]),
Argument("confs_uri", [str, List[str]], optional=True, default=None),
]
def pert_gen():
    doc_atom_pert_distance = "Perturbation distance for atoms, in Angstrom."
    doc_orig = "Include unperturbed structures."
    doc_pert_num = "Number of perturbed structures"
    doc_cell_pert = "The amount of lattice contraction or extension, relative to the original lattice constant."
doc_replicate = (
"Generate supercell by lattice replication. Either an integer number or a list of three integers. "
"If an integer is given, the lattice is replicated uniformly in all three directions."
)
return [
Argument("conf_idx", [str, List[int]], optional=True, default="default"),
Argument(
"atom_pert_distance",
float,
optional=True,
default=0.0,
doc=doc_atom_pert_distance,
),
Argument("orig", bool, optional=True, default=False, doc=doc_orig),
Argument(
"cell_pert_fraction", float, optional=True, default=0.0, doc=doc_cell_pert
),
Argument("pert_num", int, optional=True, default=1, doc=doc_pert_num),
Argument(
"replicate", [int, List[int]], optional=True, default=1, doc=doc_replicate
),
]
def normalize_pert_gen(data):
defs = pert_gen()
base = Argument("base", dict, defs)
data = base.normalize_value(data, trim_pattern="_*")
    base.check_value(data, strict=True)
return data
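# Illustrative usage (input values are hypothetical): normalization fills in
# the defaults declared in pert_gen().
#
#     normalize_pert_gen({"atom_pert_distance": 0.1, "pert_num": 5})
#     # -> {"conf_idx": "default", "atom_pert_distance": 0.1, "orig": False,
#     #     "cell_pert_fraction": 0.0, "pert_num": 5, "replicate": 1}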
def conf_gen_args():
    doc_init_conf = "The initial configurations for the PFD workflow"
    doc_pert_gen = "Structure perturbation settings. A list of perturbation settings can also be supplied if necessary."
return [
Argument(
"init_confs",
dict,
conf_args(),
alias=["confs", "init_configurations"],
doc=doc_init_conf,
),
Argument(
"pert_generation",
[dict, List[dict]],
pert_gen(),
optional=True,
default={},
doc=doc_pert_gen,
),
]
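# A minimal "conf_generation" section consistent with the schema above
# (the file name is hypothetical):
#
#     "conf_generation": {
#         "init_confs": {"fmt": "vasp/poscar", "confs_paths": ["POSCAR"]},
#         "pert_generation": {"pert_num": 10, "atom_pert_distance": 0.1},
#     }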
def train_args(run_train):
doc_numb_models = "Number of models trained for evaluating the model deviation"
doc_config = "Configuration of training"
    doc_template_script = "File names of the template training script. It can be a `List[str]` whose length equals `numb_models`, in which case each template script in the list is used to train one model, or a `str`, in which case all models share the same template training script."
    doc_init_models_paths = "The paths to the initial models"
doc_init_models_uri = "The URI of initial models"
doc_optional_files = "Optional files for training"
return [
Argument(
"config",
dict,
run_train.training_args(),
optional=True,
default=run_train.normalize_config({}),
doc=doc_config,
),
Argument("numb_models", int, optional=True, default=1, doc=doc_numb_models),
Argument(
"template_script",
[List[str], str, dict],
optional=True,
default={},
doc=doc_template_script,
),
Argument(
"init_models_paths",
List[str],
optional=True,
default=None,
doc=doc_init_models_paths,
alias=["training_iter0_model_path"],
),
Argument(
"init_models_uri",
str,
optional=True,
default=None,
doc=doc_init_models_uri,
),
Argument(
"optional_files",
list,
optional=True,
default=None,
doc=doc_optional_files,
),
]
def variant_train():
doc = "the type of the training model"
train_list = []
for kk in train_styles.keys():
train_list.append(Argument(kk, dict, train_args(train_styles[kk]["run"])))
return Variant(
"type",
train_list,
doc=doc,
)
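# The "train" section selects a trainer via its "type" key, which must be one
# of the registered train_styles; assuming a style named "dp" is registered,
# a sketch (with a hypothetical template file name) would be:
#
#     "train": {"type": "dp", "numb_models": 1, "template_script": "train.json"}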
def variant_conv():
doc = "the type of the condidate selection and convergence check method."
var_list = []
for kk, vv in CheckConv.get_checkers().items():
var_list.append(Argument(kk, dict, vv.args(), doc=vv.doc()))
return Variant(
"type",
var_list,
doc=doc,
)
def variant_frame_selector():
doc = "the type of the frame selector"
var_list = []
for kk, vv in conf_filter_styles.items():
var_list.append(Argument(kk, dict, vv.args(), doc=vv.doc()))
return Variant("type", var_list, doc=doc)
def variant_conv_filter():
doc = "frame filters based on model test"
var_list = []
for kk, vv in ConfFilterConv.get_filters().items():
var_list.append(Argument(kk, dict, vv.args(), doc=vv.doc()))
return Variant("type", var_list, doc=doc)
def lmp_args():
doc_config = "Configuration of lmp exploration"
doc_max_numb_iter = "Maximum number of iterations per stage"
doc_fatal_at_max = (
"Fatal when the number of iteration per stage reaches the `max_numb_iter`"
)
doc_output_nopbc = "Remove pbc of the output configurations"
doc_convergence = "The method of convergence check."
doc_stages = (
"The definition of exploration stages of type `List[List[ExplorationTaskGroup]`. "
"The outer list provides the enumeration of the exploration stages. "
"Then each stage is defined by a list of exploration task groups. "
"Each task group is described in :ref:`the task group definition<task_group_sec>` "
)
doc_filter = "Filter configuration for DFT calculation"
doc_conf_filter = (
"Filtering configurations with too larger or too small prediction error"
)
return [
Argument(
"config",
dict,
RunLmp.lmp_args(),
optional=True,
default=RunLmp.normalize_config({}),
doc=doc_config,
),
Argument(
"max_numb_iter",
int,
optional=True,
default=10,
doc=doc_max_numb_iter,
alias=["max_iter"],
),
Argument(
"fatal_at_max", bool, optional=True, default=True, doc=doc_fatal_at_max
),
Argument(
"output_nopbc", bool, optional=True, default=False, doc=doc_output_nopbc
),
Argument(
"convergence",
dict,
[
Argument(
"conf_filter",
List[dict],
[],
[variant_conv_filter()],
optional=True,
default=[],
doc=doc_conf_filter,
)
],
[variant_conv()],
optional=False,
doc=doc_convergence,
alias=["converge_config"],
),
Argument(
"filter",
List[dict],
[],
[variant_frame_selector()],
optional=True,
default=[{"type": "distance"}],
doc=doc_filter,
),
Argument("stages", List[List[dict]], optional=False, doc=doc_stages),
]
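# A sketch of an "exploration" section of type "lmp"; the convergence checker
# name "force_conv" is hypothetical (valid names come from
# CheckConv.get_checkers()), and the stage task-group contents are elided:
#
#     "exploration": {
#         "type": "lmp",
#         "convergence": {"type": "force_conv", "conf_filter": []},
#         "stages": [[{...}]],  # one stage with one task group
#     }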
def run_expl_caly_conf_args():
    doc_caly_model_devi_group_size = "Group size for model deviation."
    doc_run_calypso_command = "Command for running CALYPSO."
    doc_caly_run_dp_opt_command = "Command for running structure optimization with DP."
return [
Argument(
"model_devi_group_size",
int,
optional=True,
doc=doc_caly_model_devi_group_size,
),
Argument(
"run_calypso_command",
str,
optional=True,
default="calypso.x",
doc=doc_run_calypso_command,
),
Argument(
"run_opt_command",
str,
optional=True,
doc=doc_caly_run_dp_opt_command,
),
]
def caly_args():
doc_config = "Configuration of calypso exploration"
doc_max_numb_iter = "Maximum number of iterations per stage"
doc_fatal_at_max = (
"Fatal when the number of iteration per stage reaches the `max_numb_iter`"
)
doc_output_nopbc = "Remove pbc of the output configurations"
doc_convergence = "The method of convergence check."
doc_configuration = "A list of initial configurations."
doc_stages = (
"The definition of exploration stages of type `List[List[ExplorationTaskGroup]`. "
"The outer list provides the enumeration of the exploration stages. "
"Then each stage is defined by a list of exploration task groups. "
"Each task group is described in :ref:`the task group definition<task_group_sec>` "
)
doc_filters = "A list of configuration filters"
return [
Argument(
"config",
dict,
run_expl_caly_conf_args(),
optional=True,
default=RunLmp.normalize_config({}),
doc=doc_config,
),
Argument(
"max_numb_iter",
int,
optional=True,
default=5,
doc=doc_max_numb_iter,
alias=["max_iter"],
),
Argument(
"fatal_at_max", bool, optional=True, default=True, doc=doc_fatal_at_max
),
Argument(
"output_nopbc", bool, optional=True, default=False, doc=doc_output_nopbc
),
Argument(
"convergence",
dict,
[],
[variant_conv()],
optional=False,
doc=doc_convergence,
),
Argument("stages", List[List[dict]], optional=False, doc=doc_stages),
Argument(
"filters",
list,
[],
[variant_frame_selector()],
optional=True,
default=[],
doc=doc_filters,
),
]
def variant_explore():
doc = "The type of the exploration"
doc_lmp = "The exploration by LAMMPS simulations"
doc_calypso = "The exploration by CALYPSO structure prediction"
return Variant(
"type",
[
Argument("lmp", dict, lmp_args(), doc=doc_lmp),
Argument("calypso", dict, caly_args(), doc=doc_calypso),
Argument("calypso:default", dict, caly_args(), doc=doc_calypso),
Argument("calypso:merge", dict, caly_args(), doc=doc_calypso),
],
doc=doc,
)
def fp_args(inputs, run):
    doc_inputs_config = "Configuration for preparing fp inputs"
    doc_run_config = "Configuration for running fp tasks"
    doc_task_max = "Maximum number of fp tasks for each iteration"
doc_extra_output_files = "Extra output file names, support wildcards"
return [
Argument(
"inputs_config",
dict,
inputs.args(),
optional=False,
doc=doc_inputs_config,
),
Argument(
"run_config",
dict,
run.args(),
optional=False,
doc=doc_run_config,
),
Argument("task_max", int, optional=True, default=100, doc=doc_task_max),
Argument(
"extra_output_files",
List,
optional=True,
default=[],
doc=doc_extra_output_files,
),
]
def variant_fp():
doc = "the type of the fp"
fp_list = []
for kk in fp_styles.keys():
fp_list.append(
Argument(
kk,
dict,
fp_args(fp_styles[kk]["inputs"], fp_styles[kk]["run"]),
)
)
return Variant("type", fp_list, doc=doc)
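# fp_styles is imported from dpgen2, which registers styles such as "vasp";
# a sketch of an "fp" section (inputs_config and run_config contents depend
# on the chosen style and are elided here):
#
#     "fp": {
#         "type": "vasp",
#         "task_max": 100,
#         "inputs_config": {...},
#         "run_config": {...},
#     }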
def aimd_args():
doc_conf = "The systems selected for initial fp calculation"
doc_n_sample = (
"The number of configurations selected for fp calculation within each system"
)
return [
Argument("confs", List[int], optional=True, default=None, doc=doc_conf),
Argument("n_sample", int, optional=True, default=1, doc=doc_n_sample),
]
def infer_args():
    doc_max_force = "The maximum allowed value of atomic forces"
return [
Argument("max_force", float, optional=True, default=None, doc=doc_max_force),
]
def variant_infer():
doc_model_type = "The supported model type for inference"
var = []
for kk, vv in EvalModel.get_drivers().items():
var.append(Argument(kk, dict, vv.args(), doc=vv.doc()))
return Variant("model", var, doc=doc_model_type)
def normalize_infer_args(data):
base = Argument("base", dict, infer_args(), [variant_infer()])
data = base.normalize_value(data, trim_pattern="_*")
base.check_value(data)
return data
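# Illustrative usage: the "model" key selects an inference driver registered
# in EvalModel.get_drivers(); the driver name "dp" used here is an assumption.
#
#     normalize_infer_args({"model": "dp", "max_force": 10.0})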
def dflow_conf_args():
doc_dflow_config = "The configuration passed to dflow"
doc_dflow_s3_config = "The S3 configuration passed to dflow"
return [
Argument(
"dflow_config", dict, optional=True, default=None, doc=doc_dflow_config
),
Argument(
"dflow_s3_config",
dict,
optional=True,
default=None,
doc=doc_dflow_s3_config,
),
]
def bohrium_conf_args():
doc_username = "The username of the Bohrium platform"
doc_password = "The password of the Bohrium platform"
doc_project_id = "The project ID of the Bohrium platform"
doc_host = (
"The host name of the Bohrium platform. Will overwrite `dflow_config['host']`"
)
doc_k8s_api_server = "The k8s server of the Bohrium platform. Will overwrite `dflow_config['k8s_api_server']`"
doc_repo_key = "The repo key of the Bohrium platform. Will overwrite `dflow_s3_config['repo_key']`"
doc_storage_client = "The storage client of the Bohrium platform. Will overwrite `dflow_s3_config['storage_client']`"
return [
Argument("username", str, optional=False, doc=doc_username),
Argument("password", str, optional=True, doc=doc_password),
Argument("project_id", int, optional=False, doc=doc_project_id),
Argument("ticket", str, optional=True),
Argument(
"host",
str,
optional=True,
default="https://workflows.deepmodeling.com",
doc=doc_host,
),
Argument(
"k8s_api_server",
str,
optional=True,
default="https://workflows.deepmodeling.com",
doc=doc_k8s_api_server,
),
Argument(
"repo_key", str, optional=True, default="oss-bohrium", doc=doc_repo_key
),
Argument(
"storage_client",
str,
optional=True,
default="dflow.plugins.bohrium.TiefblueClient",
doc=doc_storage_client,
),
]
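# A minimal "bohrium_config" consistent with the schema above; only
# "username" and "project_id" are strictly required, and the credentials
# shown are placeholders:
#
#     "bohrium_config": {
#         "username": "user@example.com",
#         "password": "********",
#         "project_id": 12345,
#     }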
def default_step_config_args():
doc_default_step_config = "The default step configuration."
return [
Argument(
"default_step_config",
dict,
step_conf_args(),
optional=True,
default={},
doc=doc_default_step_config,
),
]
def pfd_step_config_args(default_config):
    doc_prep_train_config = "Configuration for the prepare-train step"
    doc_run_train_config = "Configuration for the run-train step"
    doc_prep_explore_config = "Configuration for the prepare-exploration step"
    doc_run_explore_config = "Configuration for the run-exploration step"
    doc_prep_fp_config = "Configuration for the prepare-fp step"
    doc_run_fp_config = "Configuration for the run-fp step"
    doc_select_confs_config = "Configuration for the select-confs step"
    doc_collect_data_config = "Configuration for the collect-data step"
return [
Argument(
"prep_train_config",
dict,
step_conf_args(),
optional=True,
default=default_config,
doc=doc_prep_train_config,
),
Argument(
"run_train_config",
dict,
step_conf_args(),
optional=True,
default=default_config,
doc=doc_run_train_config,
),
Argument(
"prep_explore_config",
dict,
step_conf_args(),
optional=True,
default=default_config,
doc=doc_prep_explore_config,
),
Argument(
"run_explore_config",
dict,
step_conf_args(),
optional=True,
default=default_config,
doc=doc_run_explore_config,
),
Argument(
"prep_fp_config",
dict,
step_conf_args(),
optional=True,
default=default_config,
doc=doc_prep_fp_config,
),
Argument(
"run_fp_config",
dict,
step_conf_args(),
optional=True,
default=default_config,
doc=doc_run_fp_config,
),
Argument(
"select_confs_config",
dict,
step_conf_args(),
optional=True,
default=default_config,
doc=doc_select_confs_config,
),
Argument(
"collect_data_config",
dict,
step_conf_args(),
optional=True,
default=default_config,
doc=doc_collect_data_config,
),
]
def wf_args(default_step_config=normalize_step_dict({})):
    doc_name = "The workflow name, 'pfd' by default"
doc_bohrium_config = "Configurations for the Bohrium platform."
doc_step_configs = "Configurations for executing dflow steps"
    doc_upload_python_packages = "Upload python packages, for debugging purposes"
doc_parallelism = "The parallelism for the workflow. Accept an int that stands for the maximum number of running pods for the workflow. None for default"
return (
[Argument("name", str, optional=True, default="pfd", doc=doc_name)]
+ dflow_conf_args()
+ default_step_config_args()
+ [
Argument(
"parallelism", int, optional=True, default=None, doc=doc_parallelism
),
Argument(
"bohrium_config",
dict,
bohrium_conf_args(),
optional=True,
default=None,
doc=doc_bohrium_config,
),
Argument(
"step_configs",
dict,
pfd_step_config_args(default_step_config),
optional=True,
default={},
doc=doc_step_configs,
),
Argument(
"upload_python_packages",
[List[str], str],
optional=True,
default=None,
doc=doc_upload_python_packages,
alias=["upload_python_package"],
),
]
)
def task_args():
    doc_task = "Task type: `finetune`, `dist` or `data_gen`"
    doc_inputs = "The input parameters and artifacts for pfd"
return [
Argument("task", dict, [], [variant_task()], optional=False, doc=doc_task),
Argument("inputs", dict, input_args(), optional=False, doc=doc_inputs),
]
def conf_generation_args():
    doc_conf_gen = "The input parameters and artifacts for configuration generation"
return [
Argument(
"conf_generation",
dict,
conf_gen_args(),
optional=False,
alias=["configurations"],
doc=doc_conf_gen,
),
]
def training_args():
doc_train = "The configuration for training"
return [
Argument("train", dict, [], [variant_train()], optional=False, doc=doc_train),
]
def label_args():
doc_fp = "The configuration for FP"
    doc_aimd = "The parameters for the initial fp calculation"
doc_infer = "The parameters for inference settings"
return [
Argument("fp", dict, [], [variant_fp()], optional=True, doc=doc_fp),
Argument("aimd", dict, aimd_args(), optional=True, doc=doc_aimd),
Argument(
"inference",
dict,
infer_args(),
optional=True,
default={},
doc=doc_infer,
),
]
def explore_args():
    doc_test_set = "Set the proportion of the test set. Only available for `dist`"
doc_explore = "The configuration for exploration"
return [
Argument(
"exploration",
dict,
[
Argument(
"test_set_config",
dict,
optional=True,
default={"test_size": 0.1},
alias=["test_set"],
doc=doc_test_set,
)
],
[variant_explore()],
optional=False,
doc=doc_explore,
alias=["explore"],
),
]
def submit_args(default_step_config=normalize_step_dict({})):
return (
wf_args(default_step_config)
+ task_args()
+ conf_generation_args()
+ training_args()
+ label_args()
+ explore_args()
)
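# Taken together, submit_args defines the top-level input schema. A skeleton
# of a full input dict, with section contents elided and optional sections
# such as "fp", "aimd" and "inference" omitted:
#
#     {
#         "name": "pfd",
#         "task": {"type": "finetune"},
#         "inputs": {...},
#         "conf_generation": {...},
#         "train": {...},
#         "exploration": {...},
#     }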
def normalize(data):
default_step_config = normalize_step_dict(data.get("default_step_config", {}))
defs = submit_args(default_step_config)
base = Argument("base", dict, defs)
data = base.normalize_value(data, trim_pattern="_*")
# not possible to strictly check arguments, dirty hack!
base.check_value(data, strict=False)
return data
def gen_doc(*, make_anchor=True, make_link=True, **kwargs):
if make_link:
make_anchor = True
sca = submit_args()
base = Argument("submit", dict, sca)
ptr = []
ptr.append(base.gen_doc(make_anchor=make_anchor, make_link=make_link, **kwargs))
    return "\n\n".join(ptr)
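# Typical use: render the full argument reference as RST, e.g. when building
# the documentation:
#
#     print(gen_doc(make_anchor=True, make_link=True))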