The environment used for DQN is largely the same as the one used for Q-learning earlier; only the representation of the state is changed.
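Concretely, the observation handed to the network is no longer a discrete cell index but a 2-dimensional feature vector: the robot's pixel offset from the treasure, normalized by the maze height. A minimal standalone sketch of that computation, using made-up coordinate values for illustration (the constants and the formula come from the environment code below):

import numpy as np

HEIGHT, UNIT = 3, 40                  # same constants as the environment below
robot_xy = np.array([5.0, 85.0])      # hypothetical canvas coords of the robot
treasure_xy = np.array([125.0, 5.0])  # hypothetical canvas coords of the treasure

# state/observation fed to the DQN: relative position, normalized by the maze height in pixels
state = (robot_xy - treasure_xy) / (HEIGHT * UNIT)
print(state)  # e.g. [-1.  0.66666667]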
Agent environment
import time
import sys
import numpy as np

if sys.version_info.major == 2:
    import Tkinter as tk
else:
    import tkinter as tk

WIDTH = 4   # maze width in cells
HEIGHT = 3  # maze height in cells
UNIT = 40   # pixel size of one cell
class Maze(tk.Tk, object):
    def __init__(self):
        super(Maze, self).__init__()
        self.action_space = ['u', 'd', 'l', 'r']  # only used to count the number of actions
        self.n_actions = len(self.action_space)
        self.n_features = 2  # feature nums of state/observation
        self.title('MAZE')
        self.geometry('{0}x{1}'.format(WIDTH * UNIT, HEIGHT * UNIT))
        self._build_maze()

    def _create_object(self, center_x, center_y, size, shape='oval', color='yellow'):
        """Create one object of the maze: the robot, a bomb or the treasure."""
        if shape.lower() == 'oval':
            obj = self.canvas.create_oval(
                center_x - size, center_y - size,
                center_x + size, center_y + size,
                fill=color
            )
        elif shape.lower() == 'rectangle':
            obj = self.canvas.create_rectangle(
                center_x - size, center_y - size,
                center_x + size, center_y + size,
                fill=color
            )
        return obj
    def _build_maze(self):
        """Draw the maze: the grid plus the robot, bombs and treasure."""
        self.canvas = tk.Canvas(self, bg='white', width=WIDTH * UNIT, height=HEIGHT * UNIT)
        # grid lines
        for c in range(0, WIDTH * UNIT, UNIT):
            x0, y0, x1, y1 = c, 0, c, HEIGHT * UNIT
            self.canvas.create_line(x0, y0, x1, y1)
        for r in range(0, HEIGHT * UNIT, UNIT):
            x0, y0, x1, y1 = 0, r, WIDTH * UNIT, r
            self.canvas.create_line(x0, y0, x1, y1)
        self.origin = np.array([20, 20])  # center of the top-left cell
        # robot (yellow oval), starts in the bottom-left cell
        self.robot_center = self.origin + np.array([0, UNIT * 2])
        self.robot_size = 15
        self.robot = self._create_object(
            self.robot_center[0], self.robot_center[1], self.robot_size,
            shape='oval', color='yellow'
        )
        # bombs (red rectangles)
        bomb1_center = self.origin + UNIT
        bomb_size = 15
        self.bomb1 = self._create_object(
            bomb1_center[0], bomb1_center[1], bomb_size,
            shape='rectangle', color='red'
        )
        bomb2_center = self.origin + np.array([UNIT * 3, UNIT])
        self.bomb2 = self._create_object(
            bomb2_center[0], bomb2_center[1], bomb_size,
            shape='rectangle', color='red'
        )
        # treasure (green rectangle), top-right cell
        treasure_center = self.origin + np.array([UNIT * 3, 0])
        treasure_size = 15
        self.treasure = self._create_object(
            treasure_center[0], treasure_center[1], treasure_size,
            shape='rectangle', color='green'
        )
        self.canvas.pack()
        # self.canvas.wait_window()  # preview maze
    def reset(self):
        """Reset the game: put the robot back to its start cell and return the initial state."""
        self.update()
        time.sleep(0.5)
        self.canvas.delete(self.robot)
        self.robot = self._create_object(
            self.robot_center[0], self.robot_center[1], self.robot_size,
            shape='oval', color='yellow'
        )
        # state = (robot position - treasure position), normalized by the maze height in pixels
        return (np.array(self.canvas.coords(self.robot)[:2]) -
                np.array(self.canvas.coords(self.treasure)[:2])) / (HEIGHT * UNIT)
    def step(self, action):
        """Move the robot one step; return the next state, the reward and whether the episode is done."""
        s = self.canvas.coords(self.robot)
        base_action = np.array([0, 0])
        if action == 0:
            if s[1] > UNIT:
                base_action[1] -= UNIT  # up
        elif action == 1:
            if s[1] < (HEIGHT - 1) * UNIT:
                base_action[1] += UNIT  # down
        elif action == 2:
            if s[0] < (WIDTH - 1) * UNIT:
                base_action[0] += UNIT  # right
        elif action == 3:
            if s[0] > UNIT:
                base_action[0] -= UNIT  # left
        self.canvas.move(self.robot, base_action[0], base_action[1])
        s_ = self.canvas.coords(self.robot)  # next coords
        if s_ == self.canvas.coords(self.treasure):
            reward = 1
            done = True
            print('Mission complete')
        elif s_ == self.canvas.coords(self.bomb1) or s_ == self.canvas.coords(self.bomb2):
            reward = -1
            done = True
            print('boom! failed!')
        else:
            reward = 0
            done = False
        # next state = (robot position - treasure position), normalized by the maze height in pixels
        s_ = (np.array(s_[:2]) - np.array(self.canvas.coords(self.treasure)[:2])) / (HEIGHT * UNIT)
        return s_, reward, done

    def render(self):
        time.sleep(0.1)
        self.update()
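Before wiring in the DQN, the environment can be exercised on its own. A minimal smoke test, assuming the class above is saved as dqn_env.py (the module name used by the training script further down):

import numpy as np
from dqn_env import Maze  # assumes the environment above is saved as dqn_env.py

def random_walk():
    # drive the maze with random actions just to check reset/step/render work
    for episode in range(3):
        state = env.reset()
        while True:
            env.render()
            state, reward, done = env.step(np.random.randint(0, env.n_actions))
            if done:
                break
    env.destroy()

if __name__ == '__main__':
    env = Maze()
    env.after(100, random_walk)  # schedule the loop once the Tk mainloop is running
    env.mainloop()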
DQN implementation
The data-flow graph can be visualized in TensorBoard: with output_graph=True the graph is written to the logs directory, and tensorboard --logdir=logs will display it.
Algorithm implementation:
import numpy as np
import tensorflow as tf

np.random.seed(1)
tf.set_random_seed(1)


class DeepQLearning:
    def __init__(self, n_actions,
                 n_features,
                 learning_rate=0.01,
                 discount=0.9,
                 e_greedy=0.1,
                 replace_target_iter=300,
                 memory_size=500,
                 batch_size=32,
                 output_graph=False):
        self.n_actions = n_actions
        self.n_features = n_features
        self.learning_rate = learning_rate
        self.gamma = discount
        self.epsilon = e_greedy  # probability of taking a random action
        self.replace_target_iter = replace_target_iter
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.output_graph = output_graph
        self.learning_steps = 0
        # replay memory, one row per transition: [s, a, r, s_]
        self.memory = np.zeros((self.memory_size, n_features * 2 + 2))
        self.construct_network()
        t_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Q_target_net')
        e_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Q_eval_net')
        # op that copies the Q_eval_net parameters into Q_target_net
        with tf.variable_scope('target_replacement'):
            self.target_replace_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)]
        self.sess = tf.Session()
        if self.output_graph:
            tf.summary.FileWriter("logs", self.sess.graph)
        self.sess.run(tf.global_variables_initializer())
    def construct_network(self):
        with tf.variable_scope('input'):
            self.s = tf.placeholder(tf.float32, [None, self.n_features], name='state')
            self.a = tf.placeholder(tf.int32, [None, ], name='actions')
            self.r = tf.placeholder(tf.float32, [None, ], name='reward')
            self.s_ = tf.placeholder(tf.float32, [None, self.n_features], name='state_')
        w_initializer, b_initializer = tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1)
        # evaluation network: Q(s, a), trained at every learning step
        with tf.variable_scope('Q_eval_net'):
            e1 = tf.layers.dense(self.s, 20, tf.nn.relu, kernel_initializer=w_initializer,
                                 bias_initializer=b_initializer, name='e1')
            self.q_eval = tf.layers.dense(e1, self.n_actions, kernel_initializer=w_initializer,
                                          bias_initializer=b_initializer, name='e2')
        # target network: Q(s_, a), its parameters are only updated by target_replace_op
        with tf.variable_scope('Q_target_net'):
            t1 = tf.layers.dense(self.s_, 20, tf.nn.relu, kernel_initializer=w_initializer,
                                 bias_initializer=b_initializer, name='t1')
            self.q_next = tf.layers.dense(t1, self.n_actions, kernel_initializer=w_initializer,
                                          bias_initializer=b_initializer, name='t2')
        with tf.variable_scope('Q_target'):
            # Bellman target: r + gamma * max_a' Q_target(s_, a'); no gradient flows through it
            q_target = self.r + self.gamma * tf.reduce_max(self.q_next, axis=1)
            self.q_target = tf.stop_gradient(q_target)
        with tf.variable_scope('Q_eval'):
            # pick, for each sample, the Q-value of the action that was actually taken
            a_indices = tf.stack([tf.range(tf.shape(self.a)[0], dtype=tf.int32), self.a], axis=1)
            self.q_eval_by_a = tf.gather_nd(params=self.q_eval, indices=a_indices)
        with tf.variable_scope('loss'):
            self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval_by_a, name='error'))
        with tf.variable_scope('train'):
            self.train_op = tf.train.RMSPropOptimizer(self.learning_rate).minimize(self.loss)
    def store_transition(self, s, a, r, s_):
        if not hasattr(self, 'memory_count'):
            self.memory_count = 0
        transition = np.hstack((s, [a, r], s_))
        # overwrite the oldest transitions once the memory is full
        index = self.memory_count % self.memory_size
        self.memory[index, :] = transition
        self.memory_count += 1

    def choose_action(self, state):
        state = state[np.newaxis, :]
        if np.random.uniform() < self.epsilon:
            # explore: random action with probability epsilon
            action = np.random.randint(0, self.n_actions)
        else:
            # exploit: greedy action w.r.t. the evaluation network
            action_value = self.sess.run(self.q_eval, feed_dict={self.s: state})
            action = np.argmax(action_value)
        return action

    def learn(self):
        # periodically copy the evaluation network into the target network
        if self.learning_steps % self.replace_target_iter == 0:
            self.sess.run(self.target_replace_op)
            print('\nreplace target net params')
        # sample a random batch of transitions from the replay memory
        if self.memory_count > self.memory_size:
            sample_index = np.random.choice(self.memory_size, size=self.batch_size)
        else:
            sample_index = np.random.choice(self.memory_count, size=self.batch_size)
        batch_memory = self.memory[sample_index, :]
        _, _ = self.sess.run(
            [self.train_op, self.loss],
            feed_dict={
                self.s: batch_memory[:, :self.n_features],
                self.a: batch_memory[:, self.n_features],
                self.r: batch_memory[:, self.n_features + 1],
                self.s_: batch_memory[:, -self.n_features:]
            }
        )
        self.learning_steps += 1
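The action-indexing step in the Q_eval scope is easy to misread, so here is a small NumPy analogue (illustrative values only) of what tf.gather_nd does there: for every sample in the batch it selects the predicted Q-value of the action that was actually taken.

import numpy as np

q_eval = np.array([[0.1, 0.5, 0.2, 0.0],   # Q-values of the 4 actions for sample 0
                   [0.3, 0.1, 0.4, 0.2]])  # ... and for sample 1 (made-up numbers)
actions = np.array([1, 2])                 # actions actually taken in the batch

# equivalent of tf.gather_nd(q_eval, tf.stack([tf.range(batch), actions], axis=1))
q_eval_by_a = q_eval[np.arange(len(actions)), actions]
print(q_eval_by_a)  # [0.5 0.4]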
Agent training
from dqn_env import Maze
from deep_q_learning import DeepQLearning


def update():
    for episode in range(100):
        state = env.reset()
        step_count = 0
        while True:
            env.render()
            action = dqn.choose_action(state)
            state_, reward, done = env.step(action)
            step_count += 1
            dqn.store_transition(state, action, reward, state_)
            # start learning after 200 warm-up steps, then learn every 5 steps
            if (step_count > 200) and (step_count % 5 == 0):
                dqn.learn()
            state = state_
            if done:
                print('Episode {0} finished after {1} steps'.format(episode, step_count))
                break
    env.destroy()


if __name__ == '__main__':
    env = Maze()
    dqn = DeepQLearning(
        env.n_actions,
        env.n_features,
        learning_rate=0.01,
        discount=0.9,
        e_greedy=0.1,
        replace_target_iter=200,
        memory_size=2000,
        output_graph=True
    )
    # pass the function itself to after(); calling it here would run it before mainloop starts
    env.after(100, update)
    env.mainloop()