import copy
from collections import namedtuple
from typing import Any, List, Union

import gfootball
import gfootball.env as football_env
import numpy as np
from ding.envs import BaseEnv, BaseEnvTimestep, BaseEnvInfo
from ding.envs.common.env_element import EnvElement, EnvElementInfo
from ding.torch_utils import to_ndarray
from ding.utils import ENV_REGISTRY
from dizoo.gfootball.envs.obs.encoder import FeatureEncoder
from dizoo.gfootball.envs.obs.gfootball_obs import FullObs
from dizoo.gfootball.envs.action.gfootball_action import GfootballSpAction

# Registry key is an assumption; adjust it if your config expects a different name.
@ENV_REGISTRY.register('gfootball')
class GfootballEnv(BaseEnv):
    """
    DI-engine wrapper around Google Research Football (gfootball).

    In evaluator mode the controlled left team plays the built-in
    ``11_vs_11_hard_stochastic`` AI; otherwise both sides are controlled on
    ``11_vs_11_kaggle`` for self-play data collection.
    """

    timestep = namedtuple('GfootballTimestep', ['obs', 'reward', 'done', 'info'])
    info_template = namedtuple('GFootballEnvInfo', ['obs_space', 'act_space', 'rew_space'])

    def __init__(self, cfg: dict) -> None:
        self._cfg = cfg
        self.save_replay = self._cfg.save_replay
        # self.env_name = cfg.get("env_name", "11_vs_11_kaggle")
        self.gui = self._cfg.render
        self._obs_helper = FullObs(cfg)
        self._action_helper = GfootballSpAction(cfg)
        self._launch_env_flag = False
        self._encoder = FeatureEncoder()
        self.is_evaluator = self._cfg.get("is_evaluator", False)
        if self.is_evaluator:
            # Evaluation: the left team plays against the built-in hard AI.
            self.env_name = "11_vs_11_hard_stochastic"
            self.right_role_num = 0
        else:
            # Self-play collection: control one player on each side.
            self.env_name = "11_vs_11_kaggle"
            self.right_role_num = 1

    def _make_env(self):
        self._env = football_env.create_environment(
            env_name=self.env_name,
            representation='raw',
            stacked=False,
            logdir='/tmp/football',
            write_goal_dumps=False,
            write_full_episode_dumps=self.save_replay,
            write_video=self.save_replay,
            render=self.gui,
            number_of_right_players_agent_controls=self.right_role_num
        )
        if hasattr(self, '_seed') and hasattr(self, '_dynamic_seed') and self._dynamic_seed:
            np_seed = 100 * np.random.randint(1, 1000)
            self._env.seed(self._seed + np_seed)
        elif hasattr(self, '_seed'):
            self._env.seed(self._seed)
        self._launch_env_flag = True
        # Cumulative raw reward per controlled side (duplicated in evaluator mode).
        self._eval_episode_return = [0, 0]

    def reset(self) -> List[Any]:
        if not self._launch_env_flag:
            self._make_env()
            self._init_flag = True
        self._env.reset()
        obs = self._env.observation()
        if self.is_evaluator:
            self._prev_obs = obs[0]
            obs = self._encoder.encode(obs[0])
            return [obs, obs]
        else:
            self._prev_obs, self._prev_obs_opponent = obs
            obs_ = self._encoder.encode(obs[0])
            obs_opponent = self._encoder.encode(obs[1])
            return [obs_, obs_opponent]

    def close(self) -> None:
        if self._launch_env_flag:
            self._env.close()
        self._launch_env_flag = False

    def seed(self, seed: int, dynamic_seed: bool = None) -> None:
        self._seed = seed
        if dynamic_seed:
            self._dynamic_seed = dynamic_seed

    def step(self, action) -> BaseEnvTimestep:
        action = to_ndarray(action)
        # action = self.process_action(action)  # process
        raw_obs, raw_rew, done, info = self._env.step(action)
        if self.is_evaluator:
            raw_obs = raw_obs[0]
            rew = GfootballEnv.calc_reward(raw_rew, self._prev_obs, raw_obs)
            obs = to_ndarray(self._encoder.encode(raw_obs))
            # Track the latest raw observation so calc_reward compares consecutive steps.
            self._prev_obs = raw_obs
            rew = [rew, rew]
            obs = [obs, obs]
            self._eval_episode_return[0] += raw_rew
            self._eval_episode_return[1] += raw_rew
        else:
            rew = GfootballEnv.calc_reward(raw_rew[0], self._prev_obs, raw_obs[0])
            rew_oppo = GfootballEnv.calc_reward(raw_rew[1], self._prev_obs_opponent, raw_obs[1])
            # Track the latest raw observations so calc_reward compares consecutive steps.
            self._prev_obs, self._prev_obs_opponent = raw_obs[0], raw_obs[1]
            rew = [rew, rew_oppo]
            obs = [to_ndarray(self._encoder.encode(raw_obs[0])), to_ndarray(self._encoder.encode(raw_obs[1]))]
            self._eval_episode_return[0] += raw_rew[0]
            self._eval_episode_return[1] += raw_rew[1]
        if done:
            if self.is_evaluator:
                info['eval_episode_return'] = self._eval_episode_return
            else:
                info[0]['eval_episode_return'] = self._eval_episode_return[0]
                info[1]['eval_episode_return'] = self._eval_episode_return[1]
        return BaseEnvTimestep(obs, rew, done, info)

    def info(self) -> BaseEnvInfo:
        info_data = {
            'obs_space': self._obs_helper.info,
            'act_space': self._action_helper.info,
            'rew_space': EnvElementInfo(
                shape=1,
                value={
                    'min': np.float64("-inf"),
                    'max': np.float64("inf"),
                    'dtype': np.float32
                },
            ),
        }
        return GfootballEnv.info_template(**info_data)

    def __repr__(self) -> str:
        return "DI-engine Gfootball Env({})".format(self.env_name)

    @staticmethod
    def calc_reward(rew, prev_obs, obs):
        """
        Shaped reward: scaled score reward plus ball-position, yellow-card and win bonuses.
        Reward design referred to
        [football-paris](https://github.com/seungeunrho/football-paris/blob/main/rewarders/rewarder_basic.py)
        """
        ball_x, ball_y, ball_z = obs['ball']
        MIDDLE_X, PENALTY_X, END_X = 0.2, 0.64, 1.0
        PENALTY_Y, END_Y = 0.27, 0.42

        # Reward the ball being deep in the opponent's half, penalize it being deep in ours.
        if -END_X <= ball_x < -PENALTY_X and -PENALTY_Y < ball_y < PENALTY_Y:
            ball_position_r = -2.0
        elif -END_X <= ball_x < -MIDDLE_X and -END_Y < ball_y < END_Y:
            ball_position_r = -1.0
        elif -MIDDLE_X <= ball_x <= MIDDLE_X and -END_Y < ball_y < END_Y:
            ball_position_r = 0.0
        elif PENALTY_X < ball_x <= END_X and -PENALTY_Y < ball_y < PENALTY_Y:
            ball_position_r = 2.0
        elif MIDDLE_X < ball_x <= END_X and -END_Y < ball_y < END_Y:
            ball_position_r = 1.0
        else:
            ball_position_r = 0.0

        # Reward opponent yellow cards received this step, penalize our own.
        left_yellow = np.sum(obs["left_team_yellow_card"]) - np.sum(prev_obs["left_team_yellow_card"])
        right_yellow = np.sum(obs["right_team_yellow_card"]) - np.sum(prev_obs["right_team_yellow_card"])
        yellow_r = right_yellow - left_yellow

        # Bonus for leading on the final step of the match.
        win_reward = 0.0
        if obs['steps_left'] == 0:
            my_score, opponent_score = obs['score']
            if my_score > opponent_score:
                win_reward = 1.0

        reward = 5.0 * win_reward + 5.0 * rew + 0.003 * ball_position_r + yellow_r
        return reward

    @staticmethod
    def create_collector_env_cfg(cfg: dict) -> List[dict]:
        collector_cfg = copy.deepcopy(cfg)
        collector_env_num = collector_cfg.pop('collector_env_num', 1)
        collector_cfg.is_evaluator = False
        return [collector_cfg for _ in range(collector_env_num)]

    @staticmethod
    def create_evaluator_env_cfg(cfg: dict) -> List[dict]:
        evaluator_cfg = copy.deepcopy(cfg)
        evaluator_env_num = evaluator_cfg.pop('evaluator_env_num', 1)
        evaluator_cfg.is_evaluator = True
        return [evaluator_cfg for _ in range(evaluator_env_num)]
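

# Minimal smoke-test sketch, assuming gfootball and easydict are installed and the
# file is run as a script. The config keys below only mirror the attributes read in
# __init__ (save_replay, render, is_evaluator); they are illustrative guesses, not a
# verified DI-engine config.
if __name__ == '__main__':
    from easydict import EasyDict

    cfg = EasyDict(dict(
        save_replay=False,
        render=False,
        is_evaluator=True,  # left team vs. the built-in hard AI
    ))
    env = GfootballEnv(cfg)
    env.seed(0)
    obs = env.reset()  # evaluator mode returns the encoded left-team obs twice
    print(env, 'reset obs entries:', len(obs))
    env.close()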