# mdz/pytorch/sac/1_scripts/0_infer.py
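"""Evaluate a trained stable-baselines3 agent (default: SAC on CarRacing-v2) and
print per-episode rewards, episode lengths and, where the env reports it, success rate."""
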
import argparse
import importlib
import os
import sys
sys.path.append(R"../0_sac")
import numpy as np
import torch as th
import yaml
from huggingface_sb3 import EnvironmentName
from stable_baselines3.common.callbacks import tqdm
from stable_baselines3.common.utils import set_random_seed
from rl_zoo3 import ALGOS, create_test_env, get_saved_hyperparams
from rl_zoo3.exp_manager import ExperimentManager
from rl_zoo3.utils import StoreDict, get_model_path


def enjoy() -> None:  # noqa: C901
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", help="environment ID", type=EnvironmentName, default="CarRacing-v2")
    parser.add_argument("-f", "--folder", help="Log folder", type=str, default="..\\weights")
    parser.add_argument("--algo", help="RL Algorithm", default="sac", type=str, required=False, choices=list(ALGOS.keys()))
    parser.add_argument("-n", "--n-timesteps", help="number of timesteps", default=1000, type=int)
    parser.add_argument("--num-threads", help="Number of threads for PyTorch (-1 to use default)", default=-1, type=int)
    parser.add_argument("--n-envs", help="number of environments", default=1, type=int)
    parser.add_argument("--exp-id", help="Experiment ID (default: 0: latest, -1: no exp folder)", default=0, type=int)
    parser.add_argument("--verbose", help="Verbose mode (0: no output, 1: INFO)", default=1, type=int)
    parser.add_argument(
        "--no-render", action="store_true", default=False, help="Do not render the environment (useful for tests)"
    )
    parser.add_argument("--deterministic", action="store_true", default=False, help="Use deterministic actions")
    parser.add_argument("--device", help="PyTorch device to be used (ex: cpu, cuda...)", default="auto", type=str)
    parser.add_argument(
        "--load-best", action="store_true", default=True, help="Load best model instead of last model if available"
    )
    parser.add_argument("--stochastic", action="store_true", default=False, help="Use stochastic actions")
    parser.add_argument(
        "--norm-reward", action="store_true", default=False, help="Normalize reward if applicable (trained with VecNormalize)"
    )
    parser.add_argument("--seed", help="Random generator seed", type=int, default=0)
    parser.add_argument("--reward-log", help="Where to log reward", default="", type=str)
    parser.add_argument(
        "--gym-packages",
        type=str,
        nargs="+",
        default=[],
        help="Additional external Gym environment package modules to import",
    )
    parser.add_argument(
        "--env-kwargs", type=str, nargs="+", action=StoreDict, help="Optional keyword argument to pass to the env constructor"
    )
    parser.add_argument(
        "--custom-objects", action="store_true", default=False, help="Use custom objects to solve loading issues"
    )
    parser.add_argument(
        "-P",
        "--progress",
        action="store_true",
        default=False,
        help="if toggled, display a progress bar using tqdm and rich",
    )
    args = parser.parse_args()
    # Import custom gym packages so that they register themselves in the global registry
    for env_module in args.gym_packages:
        importlib.import_module(env_module)
    env_name: EnvironmentName = args.env
    algo = args.algo
    folder = args.folder
    _, model_path, log_path = get_model_path(
        args.exp_id,
        folder,
        algo,
        env_name,
        args.load_best,
    )
    print(f"Loading {model_path}")
    # Off-policy algorithms only support one env for now
    off_policy_algos = ["qrdqn", "dqn", "ddpg", "sac", "her", "td3", "tqc"]
    set_random_seed(args.seed)
    if args.num_threads > 0:
        if args.verbose > 1:
            print(f"Setting torch.num_threads to {args.num_threads}")
        th.set_num_threads(args.num_threads)
    is_atari = ExperimentManager.is_atari(env_name.gym_id)
    is_minigrid = ExperimentManager.is_minigrid(env_name.gym_id)
    stats_path = os.path.join(log_path, env_name)
    hyperparams, maybe_stats_path = get_saved_hyperparams(stats_path, norm_reward=args.norm_reward, test_mode=True)
    # Load env_kwargs if existing
    env_kwargs = {}
    args_path = os.path.join(log_path, env_name, "args.yml")
    if os.path.isfile(args_path):
        with open(args_path) as f:
            loaded_args = yaml.load(f, Loader=yaml.UnsafeLoader)
            if loaded_args["env_kwargs"] is not None:
                env_kwargs = loaded_args["env_kwargs"]
    # Overwrite with command line arguments
    if args.env_kwargs is not None:
        env_kwargs.update(args.env_kwargs)
    log_dir = args.reward_log if args.reward_log != "" else None
    env = create_test_env(
        env_name.gym_id,
        n_envs=args.n_envs,
        stats_path=maybe_stats_path,
        seed=args.seed,
        log_dir=log_dir,
        should_render=not args.no_render,
        hyperparams=hyperparams,
        env_kwargs=env_kwargs,
    )
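    # Keyword arguments for `.load()`: a dummy replay buffer keeps off-policy
    # algorithms from allocating the full training buffer at inference time.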
    kwargs = dict(seed=args.seed)
    if algo in off_policy_algos:
        kwargs.update(dict(buffer_size=1))
        if "optimize_memory_usage" in hyperparams:
            kwargs.update(optimize_memory_usage=False)
    # Check if we are running python 3.8+
    # we need to patch saved model under python 3.6/3.7 to load them
    newer_python_version = sys.version_info.major == 3 and sys.version_info.minor >= 8
    custom_objects = {}
    if newer_python_version or args.custom_objects:
        custom_objects = {
            "learning_rate": 0.0,
            "lr_schedule": lambda _: 0.0,
            "clip_range": lambda _: 0.0,
        }
    model = ALGOS[algo].load(model_path, custom_objects=custom_objects, device=args.device, **kwargs)
    obs = env.reset()
    # Deterministic actions by default; the `--stochastic` flag samples from the policy instead
    deterministic = args.deterministic or not args.stochastic
    episode_reward = 0.0
    episode_rewards, episode_lengths = [], []
    ep_len = 0
    # For HER, monitor success rate
    successes = []
    lstm_states = None
    episode_start = np.ones((env.num_envs,), dtype=bool)
    generator = range(args.n_timesteps)
    if args.progress:
        if tqdm is None:
            raise ImportError("Please install tqdm and rich to use the progress bar")
        generator = tqdm(generator)
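    # Roll out the policy for `--n-timesteps` steps; Ctrl+C stops the loop early
    # and still prints the summary statistics below.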
    try:
        for _ in generator:
            action, lstm_states = model.predict(
                obs,  # type: ignore[arg-type]
                state=lstm_states,
                episode_start=episode_start,
                deterministic=deterministic,
            )
            obs, reward, done, infos = env.step(action)
            episode_start = done
            if not args.no_render:
                env.render("human")
            episode_reward += reward[0]
            ep_len += 1
            if args.n_envs == 1:
                if done and not is_atari and args.verbose > 0:
                    # NOTE: for env using VecNormalize, the mean reward
                    # is a normalized reward when `--norm_reward` flag is passed
                    print(f"Episode Reward: {episode_reward:.2f}")
                    print("Episode Length", ep_len)
                    episode_rewards.append(episode_reward)
                    episode_lengths.append(ep_len)
                    episode_reward = 0.0
                    ep_len = 0
                # Reset also when the goal is achieved when using HER
                if done and infos[0].get("is_success") is not None:
                    if args.verbose > 1:
                        print("Success?", infos[0].get("is_success", False))
                    if infos[0].get("is_success") is not None:
                        successes.append(infos[0].get("is_success", False))
                        episode_reward, ep_len = 0.0, 0
    except KeyboardInterrupt:
        pass
    if args.verbose > 0 and len(successes) > 0:
        print(f"Success rate: {100 * np.mean(successes):.2f}%")
    if args.verbose > 0 and len(episode_rewards) > 0:
        print(f"{len(episode_rewards)} Episodes")
        print(f"Mean reward: {np.mean(episode_rewards):.2f} +/- {np.std(episode_rewards):.2f}")
    if args.verbose > 0 and len(episode_lengths) > 0:
        print(f"Mean episode length: {np.mean(episode_lengths):.2f} +/- {np.std(episode_lengths):.2f}")
    env.close()


if __name__ == "__main__":
    enjoy()
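
# Example invocation (paths assume the defaults above, with trained weights under ../weights):
#   python 0_infer.py --env CarRacing-v2 --algo sac -f ../weights -n 1000 --progress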