DQN 实现(二)

DQN使用的环境与之前Q-learning的大体相同,修改了state的表达。

Agent环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import time
import sys
import numpy as np

if sys.version_info.major == 2:
    import Tkinter as tk
else:
    import tkinter as tk

WIDTH = 4
HEIGHT = 3
UNIT = 40


class Maze(tk.Tk, object):
    def __init__(self):
        super(Maze, self).__init__()
        self.action_space = ['u', 'd', 'l', 'r']
        self.n_actions = len(self.action_space)
        self.n_features = 2  # feature nums of state/observation
        self.title('MAZE')
        self.geometry('{0}x{1}'.format(WIDTH*UNIT, HEIGHT*UNIT))
        self._build_maze()

    def _create_object(self, center_x, center_y, size, shape='oval', color='yellow'):
        """create different object of maze including robot, bomb and treasure
        """
        if(shape.lower() == 'oval'):
            object = self.canvas.create_oval(
                center_x - size, center_y - size,
                center_x + size, center_y + size,
                fill=color
            )
        elif(shape.lower() == 'rectangle'):
            object = self.canvas.create_rectangle(
                center_x - size, center_y - size,
                center_x + size, center_y + size,
                fill=color
            )
        return object
    
    def _build_maze(self):
        """draw maze including the whole map and different objects
        """
        self.canvas = tk.Canvas(self, bg='white', width=WIDTH*UNIT, height=HEIGHT*UNIT)
    
        for c in range(0, WIDTH * UNIT, UNIT):
            x0, y0, x1, y1 = c, 0 ,c , HEIGHT * UNIT
            self.canvas.create_line(x0, y0, x1, y1)
        for r in range(0, HEIGHT * UNIT, UNIT):
            x0, y0, x1, y1 = 0, r, WIDTH * UNIT, r
            self.canvas.create_line(x0, y0, x1, y1)
    
        self.origin = np.array([20, 20]) # center
        self.robot_center = self.origin + np.array([0, UNIT*2])
        self.robot_size = 15
        self.robot = self._create_object(
            self.robot_center[0], self.robot_center[1], self.robot_size,
            shape='oval', color='yellow'
        )
    
        bomb1_center = self.origin + UNIT
        bomb_size = 15
        self.bomb1 = self._create_object(
            bomb1_center[0], bomb1_center[1], bomb_size,
            shape='rectangle', color='red'
        )
        bomb2_center = self.origin + np.array([UNIT * 3, UNIT])
        self.bomb2 = self._create_object(
            bomb2_center[0], bomb2_center[1], bomb_size,
            shape='rectangle', color='red'
        )
    
        treasure_center = self.origin + np.array([UNIT * 3, 0])
        treasure_size = 15
        self.treasure = self._create_object(
            treasure_center[0], treasure_center[1], treasure_size,
            shape='rectangle', color='green'
        )
        self.canvas.pack()
        # self.canvas.wait_window() # preview maze
    
    def reset(self):
        """reset the game, init the coords of robot
        """
        self.update()
        time.sleep(0.5)
        self.canvas.delete(self.robot)
        self.robot = self._create_object(
            self.robot_center[0], self.robot_center[1], self.robot_size,
            shape='oval', color='yellow'
        )
        return (np.array(self.canvas.coords(self.robot)[:2]) - np.array(self.canvas.coords(self.treasure)[:2])) / (
                    HEIGHT * UNIT)
    
    def step(self, action):
        """operation of the robots and return the coords of robo, reward and  final state
        """
        s = self.canvas.coords(self.robot)
        base_action = np.array([0, 0])
        if action == 0:
            if s[1] > UNIT:
                base_action[1] -= UNIT  # up
        elif action == 1:
            if s[1] < (HEIGHT - 1) * UNIT:
                base_action[1] += UNIT  # down
        elif action == 2:
            if s[0] < (WIDTH - 1) * UNIT:
                base_action[0] += UNIT  # right
        elif action == 3:
            if s[0] > UNIT:
                base_action[0] -= UNIT  # left
    
        self.canvas.move(self.robot, base_action[0], base_action[1])
        s_ = self.canvas.coords(self.robot)  # next coords
    
        if s_ == self.canvas.coords(self.treasure):
            reward = 1
            done = True
            s = 'terminal'
            print('Mission complete')
        elif s_ == self.canvas.coords(self.bomb1) or s == self.canvas.coords(self.bomb2):
            reward = -1
            done = True
            s = 'terminal'
            print('boom! failed!')
        else:
            reward = 0
            done = False
            
        s_ = (np.array(s_[:2]) - np.array(self.canvas.coords(self.treasure)[:2])) / (HEIGHT * UNIT)
    
        return s_, reward, done
    
    def render(self):
        time.sleep(0.1)
        self.update()

DQN实现

在Tensorboard中可视化数据流图如下:

算法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import numpy as np
import tensorflow as tf

np.random.seed(1)
tf.set_random_seed(1)

class DeepQLearning:
    def __init__(self, n_actions,
                 n_features,
                 learning_rate=0.01,
                 discount=0.9,
                 e_greedy=0.1,
                 replace_target_iter=300,
                 memory_size=500,
                 batch_size=32,
                 output_graph=False):

        self.n_actions = n_actions
        self.n_features = n_features
        self.learning_rate = learning_rate
        self.gamma = discount
        self.epsilon = e_greedy
        self.replace_target_iter = replace_target_iter
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.output_graph = output_graph
    
        self.learning_steps = 0
        self.memory = np.zeros((self.memory_size, n_features * 2 + 2))  # [s, a ,r ,s_]
    
        self.construct_network()
        t_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Q_target_net')
        e_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Q_eval_net')
    
        # Q_eval_net -> Q_target_net
        with tf.variable_scope('target_replacement'):
            self.target_replace_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)]
    
        self.sess = tf.Session()
    
        if self.output_graph:
            tf.summary.FileWriter("logs", self.sess.graph)
    
        self.sess.run(tf.global_variables_initializer())
    
    def construct_network(self):
        with tf.variable_scope('input'):
            self.s = tf.placeholder(tf.float32, [None, self.n_features], name='state')
            self.a = tf.placeholder(tf.int32, [None, ], name='actions')
            self.r = tf.placeholder(tf.float32, [None, ], name='reward')
            self.s_ = tf.placeholder(tf.float32, [None, self.n_features], name='state_')
            w_initializer, b_initializer = tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1)
    
        with tf.variable_scope('Q_eval_net'):
            e1 = tf.layers.dense(self.s, 20, tf.nn.relu, kernel_initializer=w_initializer, bias_initializer=b_initializer, name='e1')
            self.q_eval = tf.layers.dense(e1, self.n_actions, kernel_initializer=w_initializer, bias_initializer=b_initializer, name='e2')
    
        with tf.variable_scope('Q_target_net'):
            t1 = tf.layers.dense(self.s_, 20, tf.nn.relu, kernel_initializer=w_initializer, bias_initializer=b_initializer, name ='t1')
            self.q_next = tf.layers.dense(t1, self.n_actions, kernel_initializer=w_initializer, bias_initializer=b_initializer, name='t2')
    
        with tf.variable_scope('Q_target'):
            q_target = self.r + self.gamma * tf.reduce_max(self.q_next, axis=1)
            self.q_target = tf.stop_gradient(q_target)
    
        with tf.variable_scope('Q_eval'):
            a_indices = tf.stack([tf.range(tf.shape(self.a)[0], dtype=tf.int32), self.a], axis=1)
            self.q_eval_by_a = tf.gather_nd(params=self.q_eval, indices=a_indices)
    
        with tf.variable_scope('loss'):
            self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval_by_a, name='error'))
    
        with tf.variable_scope('train'):
            self.train_op = tf.train.RMSPropOptimizer(self.learning_rate).minimize(self.loss)
    
    def store_transition(self, s, a, r,s_):
        if not hasattr(self, 'memory_count'):
            self.memory_count = 0
    
        transition = np.hstack((s, [a, r], s_))
        index = self.memory_count % self.memory_size
        self.memory[index, :] = transition
        self.memory_count += 1
    
    def choose_action(self, state):
        state = state[np.newaxis, :]
        if np.random.uniform() < self.epsilon:
            action = np.random.randint(0, self.n_actions)
        else:
            action_value = self.sess.run(self.q_eval, feed_dict={self.s: state})
            action = np.argmax(action_value)
        return action
    
    def learn(self):
        if self.learning_steps % self.replace_target_iter == 0:
            self.sess.run(self.target_replace_op)
            print('\nreplace tartget net params')
        if self.memory_count > self.memory_size:
            sample_index = np.random.choice(self.memory_size, size=self.batch_size)
        else:
            sample_index = np.random.choice(self.memory_count, size=self.batch_size)
        batch_memory = self.memory[sample_index, :]
        _, _ = self.sess.run(
            [self.train_op, self.loss],
            feed_dict={
                self.s: batch_memory[:, :self.n_features],
                self.a: batch_memory[:, self.n_features],
                self.r: batch_memory[:, self.n_features + 1],
                self.s_: batch_memory[:, -self.n_features:]
            }
        )
    
        self.learning_steps += 1

Agent训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from dqn_env import Maze
from deep_q_learning import DeepQLearning

def update():
    for episode in range(100):
        state = env.reset()
        step_count = 0
        while True:
            env.render()
            action = dqn.choose_action(state)
            state_, reward, done = env.step(action)
            step_count += 1
            dqn.store_transition(state, action, reward, state_)

            if (step_count > 200) and (step_count % 5 == 0):
                state = state_
    
            if done:
                print(' Round over at: {0} round, Total steps: {1} steps'.format(episode, step_count))
                break
    env.distroy()

if __name__ == '__main__':
    env = Maze()
    dqn = DeepQLearning(
        env.n_actions,
        env.n_features,
        learning_rate=0.01,
        discount=0.9,
        e_greedy=0.1,
        replace_target_iter=200,
        memory_size=2000,
        output_graph=True
    )

    env.after(100, update())
    env.mainloop()