Double-DQN_CartPole.py
import os
import datetime
import numpy as np
import tensorflow as tf
import gym
'''
Original paper: https://www.cs.toronto.edu/~vmnih/docs/dqn.pdf
- DQN model with Dense layers only
- Multiple states are concatenated before being given to the model
- Uses a target model for more stable training
- Using more states was shown to give better performance in the CartPole env
This is based on the Double-DQN script from https://github.com/VXU1230/Medium-Tutorials/blob/master/dqn/cart_pole.py, which has been modified to work with
the updated CartPole-v1 environment. There have also been some changes to how rewards are calculated upon truncation.
Early termination has been added to the training loop, based on the Gym Wiki's definition of when the task is considered solved.
Several other changes have been made to the code, such as easier adjustment of NN depth and width and conversion to f-strings.
'''
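# How to run (assuming gym>=0.26, which provides the (obs, info) reset and 5-tuple step API used below, and TensorFlow 2.x):
#   python Double-DQN_CartPole.py
# TensorBoard logs are written to logs/dqn/<timestamp> and can be inspected with:
#   tensorboard --logdir logs/dqn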
class MyModel(tf.keras.Model):
"""
In the call method, we define the forward pass of the model.
    The model can be given a width and depth, an input shape, an output shape and a learning rate.
Arguments:
----------
num_states: int
The number of states in the environment
hidden_units: int
The number of hidden units in the model
hidden_layers: int
The number of hidden layers in the model
num_actions: int
The number of actions in the environment
lr: float
The learning rate of the model
Returns:
--------
none
"""
def __init__(self, num_states, hidden_units, hidden_layers, num_actions, lr):
super(MyModel, self).__init__()
self.input_layer = tf.keras.layers.InputLayer(input_shape=(num_states,))
self.hidden_layers = []
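        # Build hidden_layers fully connected ReLU layers, each with hidden_units neurons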
for _ in range(hidden_layers):
self.hidden_layers.append(tf.keras.layers.Dense(
hidden_units, activation='relu', kernel_initializer='RandomNormal'))
self.output_layer = tf.keras.layers.Dense(
num_actions, activation='linear', kernel_initializer='RandomNormal')
@tf.function
def call(self, inputs):
"""
Simple forward pass of the model
Arguments:
----------
inputs: array
The input to the model
Returns:
--------
output: array
The output of the model
"""
z = self.input_layer(inputs)
for layer in self.hidden_layers:
z = layer(z)
output = self.output_layer(z)
return output
class DQN:
"""
DQN agent with target model and experience replay
The DQN agent that interacts with the environment. It has a memory buffer that stores the past experiences.
    It also has the neural network that is used to predict the Q-values of the states.
Arguments:
----------
num_states: int
The number of states in the environment
num_actions: int
The number of actions in the environment
hidden_units: int
The number of hidden units in the model
hidden_layers: int
The number of hidden layers in the model
gamma: float
The discount factor
max_experiences: int
The maximum number of experiences to store in the memory buffer
min_experiences: int
The minimum number of experiences to store in the memory buffer before training
batch_size: int
The number of experiences to sample from the memory buffer for training
lr: float
The learning rate of the model
max_steps: int
The maximum number of steps to run the environment for
decay_rate: float
The decay rate of the learning rate
Returns:
--------
none
"""
def __init__(self, num_states, num_actions, hidden_units, hidden_layers, gamma, max_experiences, min_experiences, batch_size, lr, max_steps, decay_rate):
self.num_actions = num_actions
self.batch_size = batch_size
self.optimizer = tf.optimizers.Adam(learning_rate=lr, decay=decay_rate)
self.gamma = gamma
self.model = MyModel(num_states, hidden_units, hidden_layers, num_actions, lr)
self.experience = {'s': [], 'a': [], 'r': [], 's2': [], 'done': []}
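        # Replay buffer: parallel lists of states, actions, rewards, next states and done flags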
self.max_experiences = max_experiences
self.min_experiences = min_experiences
self.max_steps = max_steps
def predict(self, inputs):
"""
Predicts the Q values of the states
Arguments:
----------
inputs: array
The states of the environment
Returns:
--------
output: array
The Q values of the states
"""
return self.model(np.atleast_2d(inputs.astype('float32')))
def train(self, TargetNet):
"""
Trains the model using the experiences in the memory buffer
Arguments:
----------
        TargetNet: DQN
            The target network used to compute the TD targets
Returns:
--------
loss: float
The loss of the model
"""
if len(self.experience['s']) < self.min_experiences:
return 0
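        # Sample a random minibatch of transitions from the replay buffer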
ids = np.random.randint(low=0, high=len(self.experience['s']), size=self.batch_size)
states = np.asarray([self.experience['s'][i] for i in ids])
actions = np.asarray([self.experience['a'][i] for i in ids])
rewards = np.asarray([self.experience['r'][i] for i in ids])
states_next = np.asarray([self.experience['s2'][i] for i in ids])
dones = np.asarray([self.experience['done'][i] for i in ids])
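        # TD target: the immediate reward for terminal transitions, otherwise reward + gamma * max Q-value of the next state as predicted by the target network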
value_next = np.max(TargetNet.predict(states_next), axis=1)
actual_values = np.where(dones, rewards, rewards+self.gamma*value_next)
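        # Select the Q-values of the taken actions with a one-hot mask and minimise the mean squared TD error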
with tf.GradientTape() as tape:
selected_action_values = tf.math.reduce_sum(
self.predict(states) * tf.one_hot(actions, self.num_actions), axis=1)
loss = tf.math.reduce_mean(tf.square(actual_values - selected_action_values))
variables = self.model.trainable_variables
gradients = tape.gradient(loss, variables)
self.optimizer.apply_gradients(zip(gradients, variables))
return loss
def get_action(self, states, epsilon):
"""
Gets the action to take based on the epsilon greedy policy
Arguments:
----------
states: array
The states of the environment
epsilon: float
The probability of taking a random action
Returns:
--------
action: int
The action to take
"""
if np.random.random() < epsilon:
return np.random.choice(self.num_actions)
else:
return np.argmax(self.predict(np.atleast_2d(states))[0])
def add_experience(self, exp):
"""
Adds the experience to the memory buffer
Arguments:
----------
exp: tuple
The experience to add to the memory buffer
Returns:
--------
none
"""
if len(self.experience['s']) >= self.max_experiences:
for key in self.experience.keys():
self.experience[key].pop(0)
for key, value in exp.items():
self.experience[key].append(value)
def copy_weights(self, TrainNet):
"""
        Copies the weights of the training network into this (target) model
        Arguments:
        ----------
        TrainNet: DQN
            The training network whose weights are copied into this model
Returns:
--------
none
"""
variables1 = self.model.trainable_variables
variables2 = TrainNet.model.trainable_variables
for v1, v2 in zip(variables1, variables2):
v1.assign(v2.numpy())
def play_game(env, TrainNet, TargetNet, epsilon, copy_step):
"""
Here the interaction between the agent and the environment takes place.
The agent takes an action, the environment returns the next state, reward and done.
The agent then adds the experience to the memory buffer and trains the model.
The target model is updated every copy_step.
Arguments:
----------
env: gym environment
The environment to interact with
TrainNet: keras model
The target model
epsilon: float
The probability of taking a random action
Returns:
--------
rewards: int
The total reward obtained from the episode
mean_loss: float
The mean loss of the model
mean_loss: float
The mean loss of the model
"""
rewards = 0
iter = 0
done = False
truncated = False
observations, _ = env.reset()
losses = list()
while not done and not truncated:
action = TrainNet.get_action(observations, epsilon)
prev_observations = observations
observations, reward, done, truncated, _ = env.step(action)
rewards += reward
if done or truncated:
env.reset()
if truncated:
rewards = 500
exp = {'s': prev_observations, 'a': action, 'r': reward, 's2': observations, 'done': done}
TrainNet.add_experience(exp)
loss = TrainNet.train(TargetNet)
if isinstance(loss, int):
losses.append(loss)
else:
losses.append(loss.numpy())
iter += 1
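        # Every copy_step steps, copy the training network's weights into the target network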
if iter % copy_step == 0:
TargetNet.copy_weights(TrainNet)
return rewards, np.mean(losses)
def test(env, TrainNet):
"""
Tests the model on the environment
Arguments:
----------
env: gym environment
The environment to interact with
TrainNet: keras model
The target model
Returns:
--------
rewards: int
The total reward obtained from the episode
"""
rewards = 0
steps = 0
done = False
truncated = False
observation, _ = env.reset()
while not done and not truncated:
action = TrainNet.get_action(observation, 0)
observation, reward, done, truncated, _= env.step(action)
steps += 1
rewards += reward
    rewards = 500 if truncated else rewards
return rewards
def main():
"""
Here we initialize the environment, agent and train the agent.
The first part is checking if there is a GPU available.
If you have GPU's, you're a lucky bitch, and can uncomment the GPU line.
First all hyperparameters that should be tuned for optimum performance are set.
Then the loop goes until max number of episodes are reached or the environment is solved
Last some metrics are printed
"""
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
try:
# Currently, memory growth needs to be the same across GPUs
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
logical_gpus = tf.config.experimental.list_logical_devices('GPU')
print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
except RuntimeError as e:
# Memory growth must be set before GPUs have been initialized
print(e)
    # os.environ["CUDA_VISIBLE_DEVICES"] = "0"  # use GPU with ID=0 (uncomment if a GPU is available)
value_dicts = []
env = gym.make('CartPole-v1')
max_steps = 500 # Environment max step
env._max_episode_steps = max_steps
gamma = 0.95
copy_step = 25
num_states = len(env.observation_space.sample())
num_actions = env.action_space.n
    hidden_units = 24
    hidden_layers = 2
max_experiences = 10000
min_experiences = 100
batch_size = 32
lr = 1e-3
decay_rate = 0.95
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = 'logs/dqn/' + current_time
summary_writer = tf.summary.create_file_writer(log_dir)
TrainNet = DQN(num_states, num_actions, hidden_units, hidden_layers, gamma, max_experiences, min_experiences, batch_size, lr, max_steps, decay_rate)
TargetNet = DQN(num_states, num_actions, hidden_units, hidden_layers, gamma, max_experiences, min_experiences, batch_size, lr, max_steps, decay_rate)
max_episodes = 2000
total_rewards = np.empty(max_episodes)
epsilon = 1
decay = 0.99
min_epsilon = 0.1
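    # Epsilon decays multiplicatively each episode, floored at min_epsilon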
for episode in range(max_episodes):
epsilon = max(min_epsilon, epsilon * decay)
total_reward, losses = play_game(env, TrainNet, TargetNet, epsilon, copy_step)
total_rewards[episode] = total_reward
avg_rewards = np.mean(total_rewards[max(0, episode - 100):(episode + 1)])
with summary_writer.as_default():
tf.summary.scalar('episode reward', total_reward, step=episode)
tf.summary.scalar('running avg reward(100)', avg_rewards, step=episode)
            tf.summary.scalar('average loss', losses, step=episode)
#if episode % 100 == 0 and episode != 0:
#print(f"episode: {episode}, episode reward: {total_reward}, eps: {epsilon}, avg reward (last 100): {avg_rewards}, episode loss: {losses}")
# Check if last 100 episodes have total_reward >= 195 to approve training
if episode >= 100 and all(total_rewards[max(0, episode - 100):(episode + 1)] >= 195):
final_episode = episode
print(f"You solved it in {final_episode} episodes!")
break
    # If final_episode doesn't exist, set it to max_episodes.
final_episode = final_episode if 'final_episode' in locals() else max_episodes
test_reward = test(env, TrainNet)
print(f"Test reward: {test_reward}, avgerage reward last 100 episodes: {avg_rewards}, num episodes: {final_episode}")
env.close()
if __name__ == "__main__":
main()