import json
import logging
from pathlib import Path
from typing import List

from dflow.python import OP, OPIO, Artifact, BigParameter, OPIOSign, Parameter

from pfd.exploration.inference import EvalModel, TestReports

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
class ModelTestOP(OP):
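    """Evaluate a trained model on a set of test systems.

    For each input system, the OP runs the inference driver selected by
    ``inference_config`` and collects per-system test results, which are
    returned both as a ``TestReports`` object and as a JSON summary.
    """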
    @classmethod
    def get_output_sign(cls):
        return OPIOSign(
            {
                "test_report": Artifact(Path),
                "test_res": BigParameter(TestReports),
                "test_res_dir": Artifact(Path),
            }
        )
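
    # NOTE: ``get_input_sign`` is not shown in the excerpt above. The
    # signature below is a sketch reconstructed from the keys accessed in
    # ``execute`` ("systems", "model", "type_map", "inference_config");
    # the exact dflow types are assumptions and may differ in the source.
    @classmethod
    def get_input_sign(cls):
        return OPIOSign(
            {
                "systems": Artifact(List[Path]),  # assumed: test-system paths
                "model": Artifact(Path),  # assumed: trained model file
                "type_map": Parameter(List[str]),  # assumed: element type map
                "inference_config": BigParameter(dict),  # assumed: driver config
            }
        )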
    @OP.exec_sign_check
    def execute(
        self,
        ip: OPIO,
    ) -> OPIO:
        systems = ip["systems"]
        model_path = ip["model"]
        type_map = ip["type_map"]
        config = ip["inference_config"]
        # The "model" key of the inference config selects the driver type.
        model_type = config.pop("model")
        Eval = EvalModel.get_driver(model_type)
        report = {}
        res_dir = Path("result")
        res_dir.mkdir(exist_ok=True)
        evaluator = Eval(model=model_path, **config)
        logging.info("##### Number of systems: %d", len(systems))
        res_total = TestReports()
        for idx, sys in enumerate(systems):
            if sys is None:
                logging.warning("System at index %d is None, skipping...", idx)
                continue
            name = "sys_%03d_%s" % (idx, sys.name)
            logging.info("##### Testing: %s", name)
            evaluator.read_data(data=sys, type_map=type_map)
            res, rep = evaluator.evaluate(name, prefix=str(res_dir))
            res_total.add_report(res)
            logging.info("##### Testing ends: writing to report...")
            report[name] = rep
        # Dump the collected per-system reports to a JSON summary.
        with open("report.json", "w") as fp:
            json.dump(report, fp, indent=4)
        return OPIO(
            {
                "test_res": res_total,
                "test_report": Path("report.json"),
                "test_res_dir": res_dir,
            }
        )
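

# ----------------------------------------------------------------------
# Illustrative sketch (not part of the original module): one way to wire
# ModelTestOP into a dflow workflow via PythonOPTemplate. The image tag,
# file paths, type map, and the "dp" driver name are all placeholders.
#
#     from dflow import Step, Workflow, upload_artifact
#     from dflow.python import PythonOPTemplate
#
#     test_step = Step(
#         name="model-test",
#         template=PythonOPTemplate(ModelTestOP, image="python:3.10"),
#         artifacts={
#             "systems": upload_artifact(["tests/sys_000", "tests/sys_001"]),
#             "model": upload_artifact("model.pb"),
#         },
#         parameters={
#             "type_map": ["O", "H"],
#             "inference_config": {"model": "dp"},
#         },
#     )
#     wf = Workflow(name="model-test")
#     wf.add(test_step)
#     wf.submit()
# ----------------------------------------------------------------------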