- """
- This file is copied/apdated from https://github.com/berkeleydeeprlcourse/homework/tree/master/hw3
- """
- import numpy as np
- from collections import deque
- import gym
- from gym import spaces
- from PIL import Image
- class NoopResetEnv(gym.Wrapper):
- def __init__(self, env=None, noop_max=30):
- """Sample initial states by taking random number of no-ops on reset.
- No-op is assumed to be action 0.
- """
- super(NoopResetEnv, self).__init__(env)
- self.noop_max = noop_max
- assert env.unwrapped.get_action_meanings()[0] == 'NOOP'
- def _reset(self):
- """ Do no-op action for a number of steps in [1, noop_max]."""
- self.env.reset()
- noops = np.random.randint(1, self.noop_max + 1)
- for _ in range(noops):
- obs, _, _, _ = self.env.step(0)
- return obs

class FireResetEnv(gym.Wrapper):
    def __init__(self, env=None):
        """Take action on reset for environments that are fixed until firing."""
        super(FireResetEnv, self).__init__(env)
        assert env.unwrapped.get_action_meanings()[1] == 'FIRE'
        assert len(env.unwrapped.get_action_meanings()) >= 3

    def _reset(self):
        self.env.reset()
        obs, _, _, _ = self.env.step(1)
        obs, _, _, _ = self.env.step(2)
        return obs

class EpisodicLifeEnv(gym.Wrapper):
    def __init__(self, env=None):
        """Make end-of-life == end-of-episode, but only reset on true game over.
        Done by DeepMind for the DQN and co. since it helps value estimation.
        """
        super(EpisodicLifeEnv, self).__init__(env)
        self.lives = 0
        self.was_real_done = True
        self.was_real_reset = False

    def _step(self, action):
        obs, reward, done, info = self.env.step(action)
        self.was_real_done = done
        # check current lives, make loss of life terminal,
        # then update lives to handle bonus lives
        lives = self.env.unwrapped.ale.lives()
        if lives < self.lives and lives > 0:
            # for Qbert we sometimes stay in the lives == 0 condition for a few
            # frames, so it is important to require lives > 0 here, so that we
            # only reset once the environment advertises done.
            done = True
        self.lives = lives
        return obs, reward, done, info

    def _reset(self):
        """Reset only when lives are exhausted.
        This way all states are still reachable even though lives are episodic,
        and the learner need not know about any of this behind-the-scenes.
        """
        if self.was_real_done:
            obs = self.env.reset()
            self.was_real_reset = True
        else:
            # no-op step to advance from terminal/lost life state
            obs, _, _, _ = self.env.step(0)
            self.was_real_reset = False
        self.lives = self.env.unwrapped.ale.lives()
        return obs

class MaxAndSkipEnv(gym.Wrapper):
    def __init__(self, env=None, skip=4):
        """Return only every `skip`-th frame."""
        super(MaxAndSkipEnv, self).__init__(env)
        # most recent raw observations (for max pooling across time steps)
        self._obs_buffer = deque(maxlen=2)
        self._skip = skip

    def _step(self, action):
        total_reward = 0.0
        done = None
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            self._obs_buffer.append(obs)
            total_reward += reward
            if done:
                break
        max_frame = np.max(np.stack(self._obs_buffer), axis=0)
        return max_frame, total_reward, done, info

    def _reset(self):
        """Clear past frame buffer and init. to first obs. from inner env."""
        self._obs_buffer.clear()
        obs = self.env.reset()
        self._obs_buffer.append(obs)
        return obs

def _process_frame84(frame):
    """Convert a raw 210x160x3 Atari frame to an 84x84x1 grayscale image."""
    img = np.reshape(frame, [210, 160, 3]).astype(np.float32)
    # luminance-weighted grayscale conversion
    img = img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114
    img = Image.fromarray(img)
    # downscale to 84x110 (width x height), then crop the play area to 84x84
    resized_screen = img.resize((84, 110), Image.BILINEAR)
    resized_screen = np.array(resized_screen)
    x_t = resized_screen[18:102, :]
    x_t = np.reshape(x_t, [84, 84, 1])
    return x_t.astype(np.uint8)


class ProcessFrame84(gym.Wrapper):
    def __init__(self, env=None):
        super(ProcessFrame84, self).__init__(env)
        self.observation_space = spaces.Box(low=0, high=255, shape=(84, 84, 1))

    def _step(self, action):
        obs, reward, done, info = self.env.step(action)
        return _process_frame84(obs), reward, done, info

    def _reset(self):
        return _process_frame84(self.env.reset())

class ClippedRewardsWrapper(gym.Wrapper):
    def _step(self, action):
        obs, reward, done, info = self.env.step(action)
        # clip rewards to {-1, 0, +1} by taking the sign
        return obs, np.sign(reward), done, info

def wrap_deepmind_ram(env):
    env = EpisodicLifeEnv(env)
    env = NoopResetEnv(env, noop_max=30)
    env = MaxAndSkipEnv(env, skip=4)
    if 'FIRE' in env.unwrapped.get_action_meanings():
        env = FireResetEnv(env)
    env = ClippedRewardsWrapper(env)
    return env


def wrap_deepmind(env):
    assert 'NoFrameskip' in env.spec.id
    env = EpisodicLifeEnv(env)
    env = NoopResetEnv(env, noop_max=30)
    env = MaxAndSkipEnv(env, skip=4)
    if 'FIRE' in env.unwrapped.get_action_meanings():
        env = FireResetEnv(env)
    env = ProcessFrame84(env)
    env = ClippedRewardsWrapper(env)
    return env
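

# Usage sketch (not part of the original file): how the full DeepMind-style
# preprocessing stack might be assembled. The environment id
# 'PongNoFrameskip-v4' is only an illustrative assumption; any Atari
# NoFrameskip id works. Note that these wrappers override _step/_reset, the
# convention of the older gym versions this homework targeted.
if __name__ == '__main__':
    env = gym.make('PongNoFrameskip-v4')
    env = wrap_deepmind(env)
    obs = env.reset()
    # obs is an 84x84x1 uint8 frame; rewards are clipped to {-1, 0, +1}
    obs, reward, done, info = env.step(env.action_space.sample())
    print(obs.shape, reward, done)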