首先,写下这篇博客有两方面原因,一方面是为了自己复习(一个月前明明理顺代码了,现在再看又忘了),另一方面帮助和我一样的初学者快速理解DQN的代码吧。

之前的DQN算法的博客,包含基础理论和代码:https://blog.csdn/qq_47997583/article/details/124457346
建议边看上面博客的代码边看本文。

1.代码整体

整体上来说,我们需要定义三个类ReplayBufferQnetDQN
首先从主函数部分开始看:
我们定义的num_episodes为500,通过两个for循环,将整体分为10个iteration,每个50个episode,然后通过进度条显示训练过程。每个episode首先初始化回合奖励为0,得到初始state和初始化done为False。之后执行训练直到当前回合的done是false,循环中首先通过将DQN实例化的agent中的take_action函数采样一个动作,将动作通过step函数得到返回的四元组,然后将这个四元组添加到经验回放池,之后更新state和回合奖励,当经验回放池中数据数量超过500开始进行训练。在训练的时候从经验回放池抽取64个数据传入agent的update函数进行参数更新。

for i in range(10):  #我们定义的num_episodes为500,这里分为10个iteration,每个50个episode,然后通过进度条显示训练过程
    with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
        for i_episode in range(int(num_episodes / 10)):
            episode_return = 0
            state = env.reset()
            done = False
            while not done:
                action = agent.take_action(state)
                next_state, reward, done, _ = env.step(action)
                replay_buffer.add(state, action, reward, next_state, done)
                state = next_state
                episode_return += reward
                # 当buffer数据的数量超过一定值后,才进行Q网络训练
                if replay_buffer.size() > minimal_size:
                    b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size)
                    transition_dict = {
                        'states': b_s,
                        'actions': b_a,
                        'next_states': b_ns,
                        'rewards': b_r,
                        'dones': b_d
                    }
                    agent.update(transition_dict)
            return_list.append(episode_return)
            if (i_episode + 1) % 10 == 0:
                pbar.set_postfix({
                    'episode':
                    '%d' % (num_episodes / 10 * i + i_episode + 1),
                    'return':
                    '%.3f' % np.mean(return_list[-10:])
                })
            pbar.update(1)

2.三个类的实现

1.ReplayBuffer类的实现
首先初始化定义时需要用collections包中的deque函数定义一个buffer。之后需要实现add,sample,size三个方法,其中add需要传入五元组(s,a,r,s,d)并且以元组的形式存入buffer中;sample函数传入batch_size的大小,通过random.sample函数抽样批量大小的数据到transition中,之后通过zip(*)函数解包(注:解包后是多个元组)得到的批量大小的五元组,最后返回五元组并将state转为numpy类型;size就是查看buffer大小。

class ReplayBuffer:
    ''' 经验回放池 '''
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)  # 队列,先进先出

    def add(self, state, action, reward, next_state, done):  # 将数据加入buffer
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):  # 从buffer中采样数据,数量为batch_size
        transitions = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done

    def size(self):  # 目前buffer中数据的数量
        return len(self.buffer)

2.Qnet实现

class Qnet(torch.nn.Module):
    ''' 只有一层隐藏层的Q网络 '''
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(Qnet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))  # 隐藏层使用ReLU激活函数
        return self.fc2(x)

3.DQN实现
DQN类的实现是核心的部分。其中需要实现take_action和update方法。

class DQN:
    ''' DQN算法 '''
    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma,
                 epsilon, target_update, device):
        self.action_dim = action_dim
        self.q_net = Qnet(state_dim, hidden_dim,
                          self.action_dim).to(device)  # Q网络
        # 目标网络
        self.target_q_net = Qnet(state_dim, hidden_dim,
                                 self.action_dim).to(device)
        # 使用Adam优化器
        self.optimizer = torch.optim.Adam(self.q_net.parameters(),
                                          lr=learning_rate)
        self.gamma = gamma  # 折扣因子
        self.epsilon = epsilon  # epsilon-贪婪策略
        self.target_update = target_update  # 目标网络更新频率
        self.count = 0  # 计数器,记录更新次数
        self.device = device

    def take_action(self, state):  # epsilon-贪婪策略采取动作
        if np.random.random() < self.epsilon:
            action = np.random.randint(self.action_dim)
        else:
            state = torch.tensor([state], dtype=torch.float).to(self.device)
            action = self.q_net(state).argmax().item()
        return action

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(
            self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)

        q_values = self.q_net(states).gather(1, actions)  # Q值
        # 下个状态的最大Q值
        max_next_q_values = self.target_q_net(next_states).max(1)[0].view(
            -1, 1)
        q_targets = rewards + self.gamma * max_next_q_values * (1 - dones
                                                                )  # TD误差目标
        dqn_loss = torch.mean(F.mse_loss(q_values, q_targets))  # 均方误差损失函数
        self.optimizer.zero_grad()  # PyTorch中默认梯度会累积,这里需要显式将梯度置为0
        dqn_loss.backward()  # 反向传播更新参数
        self.optimizer.step()

        if self.count % self.target_update == 0:
            self.target_q_net.load_state_dict(
                self.q_net.state_dict())  # 更新目标网络
        self.count += 1

take_action
其中take_action方法中传入的state的形式如下:

经过这行代码处理后

state = torch.tensor([state], dtype=torch.float).to(self.device)

变为

将state传入神经网络然后输出1*2的向量,通过.argmax()函数取到较大Q值的索引值,.item()函数从tensor中取到实数。

action = self.q_net(state).argmax().item()

update
这是核心中的核心,传入的transition_dict如下图形式:

首先需要这个字典中的五个变量转为tensor并传到cuda中。

q_values是从t时刻估计动作价值函数Q,max_next_q_values从t+1时刻估计动作价值函数Q’。
看这行代码:

q_values = self.q_net(states).gather(1, actions)

gather(1, actions) # 按列索引,索引值为actions



在看这行代码

 max_next_q_values = self.target_q_net(next_states).max(1)[0].view(-1, 1) # max(1)代表按列值索引



q_targets = rewards + self.gamma * max_next_q_values * (1 - dones)
 # 如果done了那么对应的Q值为0
dqn_loss = torch.mean(F.mse_loss(q_values, q_targets))
# 定义为均方损失函数
self.optimizer.zero_grad()  
# PyTorch中默认梯度会累积,这里需要显式将梯度置为0
dqn_loss.backward()  
# 反向传播更新参数
self.optimizer.step()

更多推荐

DQN代码逐行详解