欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

Hands on Reinforcement Learning Frontier Chapter

最编程 2024-07-29 15:05:18
...

15 模仿学习

15.1 简介

虽然强化学习不需要有监督学习中的标签数据,但它十分依赖奖励函数的设置。有时在奖励函数上做一些微小的改动,训练出来的策略就会有天差地别。在很多现实场景中,奖励函数并未给定,或者奖励信号极其稀疏,此时随机设计奖励函数将无法保证强化学习训练出来的策略满足实际需要。例如,对于无人驾驶车辆智能体的规控,其观测是当前的环境感知恢复的 3D 局部环境,动作是车辆接下来数秒的具体路径规划,那么奖励是什么?如果只是规定正常行驶而不发生碰撞的奖励为+1,发生碰撞为-100,那么智能体学习的结果则很可能是找个地方停滞不前。具体能帮助无人驾驶小车规控的奖励函数往往需要专家的精心设计和调试。

假设存在一个专家智能体,其策略可以看成最优策略,我们就可以直接模仿这个专家在环境中交互的状态动作数据来训练一个策略,并且不需要用到环境提供的奖励信号。模仿学习(imitation learning)研究的便是这一类问题,在模仿学习的框架下,专家能够提供一系列状态动作对

\{(s_t,a_t)\}

,表示专家在环境

s_t

下做出了的动作

a_t

,而模仿者的任务则是利用这些专家数据进行训练,无须奖励信号就可以达到一个接近专家的策略。目前学术界模仿学习的方法基本上可以分为 3 类:

  • 行为克隆(behavior cloning,BC)
  • 逆强化学习(inverse RL)
  • 生成式对抗模仿学习(generative adversarial imitation learning,GAIL)

在本章将主要介绍行为克隆方法和生成式对抗模仿学习方法。尽管逆强化学习有良好的学术贡献,但由于其计算复杂度较高,实际应用的价值较小。

15.2 行为克隆

行为克隆(BC)就是直接使用监督学习方法,将专家数据中

(s_t,a_t)

s_t

看作样本输入,

a_t

视为标签,学习的目标为

其中,

B

是专家的数据集,

\mathcal{L}

是对应监督学习框架下的损失函数。若动作是离散的,该损失函数可以是最大似然估计得到的。若动作是连续的,该损失函数可以是均方误差函数。

在训练数据量比较大的时候,BC 能够很快地学习到一个不错的策略。例如,围棋人工智能 AlphaGo 就是首先在 16 万盘棋局的 3000 万次落子数据中学习人类选手是如何下棋的,仅仅凭这个行为克隆方法,AlphaGo 的棋力就已经超过了很多业余围棋爱好者。由于 BC 的实现十分简单,因此在很多实际场景下它都可以作为策略预训练的方法。BC 能使得策略无须在较差时仍然低效地通过和环境交互来探索较好的动作,而是通过模仿专家智能体的行为数据来快速达到较高水平,为接下来的强化学习创造一个高起点。

BC 也存在很大的局限性,该局限在数据量比较小的时候犹为明显。具体来说,由于通过 BC 学习得到的策略只是拿小部分专家数据进行训练,因此 BC 只能在专家数据的状态分布下预测得比较准。然而,强化学习面对的是一个序贯决策问题,通过 BC 学习得到的策略在和环境交互过程中不可能完全学成最优,只要存在一点偏差,就有可能导致下一个遇到的状态是在专家数据中没有见过的。此时,由于没有在此状态(或者比较相近的状态)下训练过,策略可能就会随机选择一个动作,这会导致下一个状态进一步偏离专家策略遇到的的数据分布。最终,该策略在真实环境下不能得到比较好的效果,这被称为行为克隆的复合误差(compounding error)问题,如图 15-1 所示。

图15-1 行为克隆带来的复合误差问题

15.3 生成式对抗模仿学习

生成式对抗模仿学习(generative adversarial imitation learning,GAIL)是 2016 年由斯坦福大学研究团队提出的基于生成式对抗网络的模仿学习,它诠释了生成式对抗网络的本质其实就是模仿学习。GAIL 实质上是模仿了专家策略的占用度量

\rho_E(s,a)

,即尽量使得策略在环境中的所有状态动作对

(s,a)

的占用度量

\rho_\pi(s,a)

和专家策略的占用度量

\rho_E(s,a)

一致。为了达成这个目标,策略需要和环境进行交互,收集下一个状态的信息并进一步做出动作。这一点和 BC 不同,BC 完全不需要和环境交互。GAIL 算法中有一个判别器和一个策略,策略

\pi

就相当于是生成式对抗网络中的生成器(generator),给定一个状态,策略会输出这个状态下应该采取的动作,而判别器(discriminator)

D

将状态动作对

(s,a)

作为输入,输出一个

0

1

之间的实数,表示判别器认为该状态动作对

(s,a)

是来自智能体策略而非专家的概率。判别器

D

的目标是尽量将专家数据的输出靠近

0

,将模仿者策略的输出靠近

1

,这样就可以将两组数据分辨开来。于是,判别器

D

的损失函数为

\mathcal{L}(\phi) = -\mathbb{E}_{\rho_\pi} [\log D_\phi(s,a)] - \mathbb{E}_{\rho_E}[\log(1 - D_\phi(s,a))]

其中

\phi

是判别器

D

的参数。有了判别器

D

之后,模仿者策略的目标就是其交互产生的轨迹能被判别器误认为专家轨迹。于是,我们可以用判别器

D

的输出来作为奖励函数来训练模仿者策略。具体来说,若模仿者策略在环境中采样到状态

s

,并且采取动作

a

,此时该状态动作对

(s,a)

会输入到判别器

D

中,输出

D(s,a)

的值,然后将奖励设置为

r(s,a)=-\log D(s,a)

。于是,我们可以用任意强化学习算法,使用这些数据继续训练模仿者策略。最后,在对抗过程不断进行后,模仿者策略生成的数据分布将接近真实的专家数据分布,达到模仿学习的目标。GAIL 的优化目标如图 15-2 所示。

图15-2 GAIL 的优化目标

第 3 章介绍过一个策略和给定 MDP 交互的占用度量呈一一对应的关系。因此,模仿学习的本质就是通过更新策略使其占用度量尽量靠近专家的占用度量,而这正是 GAIL 的训练目标。由于一旦策略改变,其占用度量就会改变,因此为了训练好最新的判别器,策略需要不断和环境做交互,采样出最新的状态动作对样本。

15.4 代码实践

15.4.1 生成专家数据

首先,我们需要有一定量的专家数据,为此,预先通过 PPO 算法训练出一个表现良好的专家模型,再利用专家模型生成专家数据。本次代码实践的环境是 CartPole-v0,以下是 PPO 代码内容。

import gym
import torch
import torch.nn.functional as F
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import random
import rl_utils


class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)


class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)


class PPO:
    ''' PPO算法,采用截断方式 '''
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs  ## 一条序列的数据用于训练轮数
        self.eps = eps  ## PPO中截断范围的参数
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to( self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)

        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = rl_utils.compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
        old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()

        for _ in range(self.epochs):
            log_probs = torch.log(self.actor(states).gather(1, actions))
            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage  ## 截断
            actor_loss = torch.mean(-torch.min(surr1, surr2))  ## PPO损失函数
            critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()


actor_lr = 1e-3
critic_lr = 1e-2
num_episodes = 250
hidden_dim = 128
gamma = 0.98
lmbda = 0.95
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'CartPole-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
ppo_agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, ppo_agent, num_episodes)
Iteration 0: 100%|██████████| 25/25 [00:00<00:00, 29.67it/s, episode=20, return=40.700]
Iteration 1: 100%|██████████| 25/25 [00:01<00:00, 15.19it/s, episode=45, return=182.800]
Iteration 2: 100%|██████████| 25/25 [00:01<00:00, 14.11it/s, episode=70, return=176.100]
Iteration 3: 100%|██████████| 25/25 [00:01<00:00, 14.44it/s, episode=95, return=191.500]
Iteration 4: 100%|██████████| 25/25 [00:01<00:00, 14.45it/s, episode=120, return=151.300]
Iteration 5: 100%|██████████| 25/25 [00:02<00:00, 12.15it/s, episode=145, return=200.000]
Iteration 6: 100%|██████████| 25/25 [00:01<00:00, 13.47it/s, episode=170, return=200.000]
Iteration 7: 100%|██████████| 25/25 [00:01<00:00, 13.20it/s, episode=195, return=200.000]
Iteration 8: 100%|██████████| 25/25 [00:01<00:00, 14.43it/s, episode=220, return=188.100]
Iteration 9: 100%|██████████| 25/25 [00:01<00:00, 13.13it/s, episode=245, return=200.000]

接下来开始生成专家数据。因为车杆环境比较简单,我们只生成一条轨迹,并且从中采样 30 个状态动作对(s,a)样本。我们只用这 30 个专家数据样本来训练模仿策略。

def sample_expert_data(n_episode):
    states = []
    actions = []
    for episode in range(n_episode):
        state = env.reset()
        done = False
        while not done:
            action = ppo_agent.take_action(state)
            states.append(state)
            actions.append(action)
            next_state, reward, done, _ = env.step(action)
            state = next_state
    return np.array(states), np.array(actions)


env.seed(0)
torch.manual_seed(0)
random.seed(0)
n_episode = 1
expert_s, expert_a = sample_expert_data(n_episode)

n_samples = 30  ## 采样30个数据
random_index = random.sample(range(expert_s.shape[0]), n_samples)
expert_s = expert_s[random_index]
expert_a = expert_a[random_index]
15.4.2 行为克隆的代码实践

在 BC 中,我们将专家数据中的(st,at)中的ata_tat​视为标签,BC 则转化成监督学习中经典的分类问题,采用最大似然估计的训练方法可得到分类结果。

class BehaviorClone:
    def __init__(self, state_dim, hidden_dim, action_dim, lr):
        self.policy = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)

    def learn(self, states, actions):
        states = torch.tensor(states, dtype=torch.float).to(device)
        actions = torch.tensor(actions).view(-1, 1).to(device)
        log_probs = torch.log(self.policy(states).gather(1, actions))
        bc_loss = torch.mean(-log_probs)  ## 最大似然估计

        self.optimizer.zero_grad()
        bc_loss.backward()
        self.optimizer.step()

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(device)
        probs = self.policy(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()


def test_agent(agent, env, n_episode):
    return_list = []
    for episode in range(n_episode):
        episode_return = 0
        state = env.reset()
        done = False
        while not done:
            action = agent.take_action(state)
            next_state, reward, done, _ = env.step(action)
            state = next_state
            episode_return += reward
        return_list.append(episode_return)
    return np.mean(return_list)


env.seed(0)
torch.manual_seed(0)
np.random.seed(0)

lr = 1e-3
bc_agent = BehaviorClone(state_dim, hidden_dim, action_dim, lr)
n_iterations = 1000
batch_size = 64
test_returns = []

with tqdm(total=n_iterations, desc="进度条") as pbar:
    for i in range(n_iterations):
        sample_indices = np.random.randint(low = 0, high = expert_s.shape[0], size=batch_size)
        bc_agent.learn(expert_s[sample_indices], expert_a[sample_indices])
        current_return = test_agent(bc_agent, env, 5)
        test_returns.append(current_return)
        if (i + 1) % 10 == 0:
            pbar.set_postfix({'return': '%.3f' % np.mean(test_returns[-10:])})
        pbar.update(1)
进度条: 100%|██████████| 1000/1000 [03:05<00:00,  5.40it/s, return=199.320]
iteration_list = list(range(len(test_returns)))
plt.plot(iteration_list, test_returns)
plt.xlabel('Iterations')
plt.ylabel('Returns')
plt.title('BC on {}'.format(env_name))
plt.show()

我们发现 BC 无法学习到最优策略(不同设备运行结果可能会有不同),这主要是因为在数据量比较少的情况下,学习容易发生过拟合。

15.4.3 生成式对抗模仿学习的代码实践

接下来我们实现 GAIL 的代码。

首先实现判别器模型,其模型架构为一个两层的全连接网络,模型输入为一个状态动作对,输出一个概率标量。

class Discriminator(nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(Discriminator, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x, a):
        cat = torch.cat([x, a], dim=1)
        x = F.relu(self.fc1(cat))
        return torch.sigmoid(self.fc2(x))

接下来正式实现 GAIL 的代码。每一轮迭代中,GAIL 中的策略和环境交互,采样新的状态动作对。基于专家数据和策略新采样的数据,首先训练判别器,然后将判别器的输出转换为策略的奖励信号,指导策略用 PPO 算法做训练。

class GAIL:
    def __init__(self, agent, state_dim, action_dim, hidden_dim, lr_d):
        self.discriminator = Discriminator(state_dim, hidden_dim, action_dim).to(device)
        self.discriminator_optimizer = torch.optim.Adam(self.discriminator.parameters(), lr_d)
        self.agent = agent

    def learn(self, expert_s, expert_a, agent_s, agent_a, next_s, dones):
        expert_states = torch.tensor(expert_s, dtype=torch.float).to(device)
        expert_actions = torch.tensor(expert_a).to(device)
        agent_states = torch.tensor(agent_s, dtype=torch.float).to(device)
        agent_actions = torch.tensor(agent_a).to(device)
        expert_actions = F.one_hot(expert_actions, num_classes=2).float()
        agent_actions = F.one_hot(agent_actions, num_classes=2).float()

        expert_prob = self.discriminator(expert_states, expert_actions)
        agent_prob = self.discriminator(agent_states, agent_actions)
        discriminator_loss = nn.BCELoss()(agent_prob, torch.ones_like(agent_prob)) 
                           + nn.BCELoss()(expert_prob, torch.zeros_like(expert_prob))
        self.discriminator_optimizer.zero_grad()
        discriminator_loss.backward()
        self.discriminator_optimizer.step()

        rewards = -torch.log(agent_prob).detach().cpu().numpy()
        transition_dict = {
            'states': agent_s,
            'actions': agent_a,
            'rewards': rewards,
            'next_states': next_s,
            'dones': dones
        }
        self.agent.update(transition_dict)


env.seed(0)
torch.manual_seed(0)
lr_d = 1e-3
agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device)
gail = GAIL(agent, state_dim, action_dim, hidden_dim, lr_d)
n_episode = 500
return_list = []

with tqdm(total=n_episode, desc="进度条") as pbar:
    for i in range(n_episode):
        episode_return = 0
        state = env.reset()
        done = False
        state_list = []
        action_list = []
        next_state_list = []
        done_list = []
        while not done:
            action = agent.take_action(state)
            next_state, reward, done, _ = env.step(action)
            state_list.append(state)
            action_list.append(action)
            next_state_list.append(next_state)
            done_list.append(done)
            state = next_state
            episode_return += reward
        return_list.append(episode_return)
        gail.learn(expert_s, expert_a, state_list, action_list, next_state_list, done_list)
        if (i + 1) % 10 == 0:
            pbar.set_postfix({'return': '%.3f' % np.mean(return_list[-10:])})
        pbar.update(1)
进度条: 100%|██████████| 500/500 [00:35<00:00, 14.20it/s, return=200.000]
iteration_list = list(range(len(return_list)))
plt.plot(iteration_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('GAIL on {}'.format(env_name))
plt.show()

通过上面两个实验的对比我们可以直观地感受到,在数据样本有限的情况下,BC 不能学习到最优策略,但是 GAIL 在相同的专家数据下可以取得非常好的结果。这一方面归因于 GAIL 的训练目标(拉近策略和专家的占用度量)十分贴合模仿学习任务的目标,避免了 BC 中的复合误差问题;另一方面得益于 GAIL 训练中,策略可以和环境交互出更多的数据,以此训练判别器,进而生成对基于策略“量身定做”的指导奖励信号。

15.5 总结

本章讲解了模仿学习的基础概念,即根据一些专家数据来学习一个策略,数据中不包含奖励,和环境交互也不能获得奖励。本章还介绍了模仿学习中的两类方法,分别是行为克隆(BC)和生成式对抗模仿学习(GAIL)。通过实验对比发现,在少量专家数据的情况下,GAIL 能获得更好的效果。

此外,逆向强化学习(IRL)也是模仿学习中的重要方法,它假设环境的奖励函数应该使得专家轨迹获得最高的奖励值,进而学习背后的奖励函数,最后基于该奖励函数做正向强化学习,从而得到模仿策略。感兴趣的读者可以查阅相关文献进行学习。

15.6 参考文献

[1] SYED U, BOWLING M, SCHAPIRE R E. Apprenticeship learning using linear programming [C]// Proceedings of the 25th international conference on Machine learning, 2008: 1032-1039.

[2] HO J, ERMON S. Generative adversarial imitation learning [J]. Advances in neural information processing systems 2016, 29: 4565-4573.

[3] ABBEEL P, NG A Y. Apprenticeship learning via inverse reinforcement learning [C] // Proceedings of the twenty-first international conference on machine learning, 2004.

16 模型预测控制

16.1 简介

之前几章介绍了基于值函数的方法 DQN、基于策略的方法 REINFORCE 以及两者结合的方法 Actor-Critic。它们都是无模型(model-free)的方法,即没有建立一个环境模型来帮助智能体决策。而在深度强化学习领域下,基于模型(model-based)的方法通常用神经网络学习一个环境模型,然后利用该环境模型来帮助智能体训练和决策。利用环境模型帮助智能体训练和决策的方法有很多种,例如可以用与之前的 Dyna 类似的思想生成一些数据来加入策略训练中。本章要介绍的模型预测控制(model predictive control,MPC)算法并不构建一个显式的策略,只根据环境模型来选择当前步要采取的动作。

16.2 打靶法

首先,让我们用一个形象的比喻来帮助理解模型预测控制方法。假设我们在下围棋,现在根据棋盘的布局,我们要选择现在落子的位置。一个优秀的棋手会根据目前局势来推演落子几步可能发生的局势,然后选择局势最好的一种情况来决定当前落子位置。

模型预测控制方法就是这样一种迭代的、基于模型的控制方法。值得注意的是,MPC 方法中不存在一个显式的策略。具体而言,MPC 方法在每次采取动作时,首先会生成一些候选动作序列,然后根据当前状态来确定每一条候选序列能得到多好的结果,最终选择结果最好的那条动作序列的第一个动作来执行。因此,在使用 MPC 方法时,主要在两个过程中迭代,一是根据历史数据学习环境模型P^(s,a),二是在和真实环境交互过程中用环境模型来选择动作。

首先,我们定义模型预测方法的目标。在第

k

步时,我们要想做的就是最大化智能体的累积奖励,具体来说就是:

其中

H

为推演的长度,

表示从所有动作序列中选取累积奖励最大的序列。我们每次取最优序列中的第一个动作

a_k

来与环境交互。MPC 方法中的一个关键是如何生成一些候选动作序列,候选动作生成的好坏将直接影响到 MPC 方法得到的动作。生成候选动作序列的过程我们称为打靶(shooting).

16.2.1 随机打靶法

随机打靶法(random shooting method)的做法便是随机生成N条动作序列,即在生成每条动作序列的每一个动作时,都是从动作空间中随机采样一个动作,最终组合成NNN条长度为K的动作序列。

对于一些简单的环境,这个方法不但十分简单,而且效果还不错。那么,能不能在随机的基础上,根据已有的结果做得更好一些呢?接下来,我们来介绍另外一种打靶法:交叉熵方法

16.2.2 交叉熵方法

交叉熵方法(cross entropy method,CEM)是一种进化策略方法,它的核心思想是维护一个带参数的分布,根据每次采样的结果来更新分布中的参数,使得分布中能获得较高累积奖励的动作序列的概率比较高。相比于随机打靶法,交叉熵方法能够利用之前采样到的比较好的结果,在一定程度上减少采样到一些较差动作的概率,从而使得算法更加高效。对于一个与连续动作交互的环境来说,每次交互时交叉熵方法的做法如下:

for 次数

e=1\rightarrow E

do

  • 从分布
P(\mathbf{A})

中选取

N

条动作序列

\mathbf{A}_1,\cdots,\mathbf{A}_N
  • 对于每条动作序列
\mathbf{A}_1,\cdots,\mathbf{A}_N

,用环境模型评估累积奖励

  • 根据评估结果保留
M

条最优的动作序列

\mathbf{A}_{i_1},\cdots,\mathbf{A}_{i_M}
  • 用这些动作序列
\mathbf{A}_{i_1},\cdots,\mathbf{A}_{i_M}

去更新分布

p(\mathbf{A})
  • end for
  • 计算所有最优动作序列的第一个动作的均值,作为当前时刻采取的动作

我们可以使用如下的代码来实现交叉熵方法,其中将采用截断正态分布。

import numpy as np
from scipy.stats import truncnorm
import gym
import itertools
import torch
import torch.nn as nn
import torch.nn.functional as F
import collections
import matplotlib.pyplot as plt

class CEM:
    def __init__(self, n_sequence, elite_ratio, fake_env, upper_bound, lower_bound):
        self.n_sequence = n_sequence
        self.elite_ratio = elite_ratio
        self.upper_bound = upper_bound
        self.lower_bound = lower_bound
        self.fake_env = fake_env

    def optimize(self, state, init_mean, init_var):
        mean, var = init_mean, init_var
        X = truncnorm(-2, 2, loc=np.zeros_like(mean), scale=np.ones_like(var))
        state = np.tile(state, (self.n_sequence, 1))

        for _ in range(5):
            lb_dist, ub_dist = mean - self.lower_bound, self.upper_bound - mean
            constrained_var = np.minimum(
                np.minimum(
                    np.square(lb_dist / 2),
                    np.square(ub_dist / 2)
                ),
                var
            )
            ## 生成动作序列
            action_sequences = [X.rvs() for _ in range(self.n_sequence)] * np.sqrt(constrained_var) + mean
            ## 计算每条动作序列的累积奖励
            returns = self.fake_env.propagate(state, action_sequences)[:, 0]
            ## 选取累积奖励最高的若干条动作序列
            elites = action_sequences[np.argsort(returns)][-int(self.elite_ratio * self.n_sequence):]
            new_mean = np.mean(elites, axis=0)
            new_var = np.var(elites, axis=0)
            ## 更新动作序列分布
            mean = 0.1 * mean + 0.9 * new_mean
            var = 0.1 * var + 0.9 * new_var
        return mean

16.3 PETS 算法

带有轨迹采样的概率集成(probabilistic ensembles with trajectory sampling,PETS)是一种使用 MPC 的基于模型的强化学习算法。在 PETS 中,环境模型采用了集成学习的方法,即会构建多个环境模型,然后用这多个环境模型来进行预测,最后使用 CEM 进行模型预测控制。接下来,我们来详细介绍模型构建与模型预测的方法。

在强化学习中,与智能体交互的环境是一个动态系统,所以拟合它的环境模型也通常是一个动态模型。我们通常认为一个系统中有两种不确定性,分别是偶然不确定性(aleatoric uncertainty)和认知不确定性(epistemic uncertainty)。偶然不确定性是由于系统中本身存在的随机性引起的,而认知不确定性是由“见”过的数据较少导致的自身认知的不足而引起的,如图 16-1 所示。

图16-1 偶然不确定性和认知不确定性

在 PET 算法中,环境模型的构建会同时考虑到这两种不确定性。首先,我们定义环境模型的输出为一个高斯分布,用来捕捉偶然不确定性。令环境模型为

\hat{P}

,其参数为

\theta

,那么基于当前状态动作对

(s_t,a_t)

,下一个状态

s_t

的分布可以写为

\hat{P}(s_t,a_t) = \mathcal{N}\Big( \mu_\theta(s_t,a_t), \Sigma_\theta(s_t,a_t) \Big)

这里我们可以采用神经网络来构建

\mu_\theta

\Sigma_\theta

。这样,神经网络的损失函数则为

\mathcal{L}(\theta) = \sum_{n=1}^N \Big[ \mu_\theta(s_n,a_n) - s_{n+1} \Big]^T \Sigma_\theta^{-1}(s_t,a_t) \Big[ \mu_\theta(s_n,a_n) - s_{n+1} \Big] + \log \det \Sigma_\theta(s_n,a_n)

这样我们就得到了一个由神经网络表示的环境模型。在此基础之上,我们选择用集成(ensemble)方法来捕捉认知不确定性。具体而言,我们构建BBB个网络框架一样的神经网络,它们的输入都是状态动作对,输出都是下一个状态的高斯分布的均值向量和协方差矩阵。但是它们的参数采用不同的随机初始化方式,并且当每次训练时,会从真实数据中随机采样不同的数据来训练。

有了环境模型的集成后,MPC 算法会用其来预测奖励和下一个状态。具体来说,每一次预测会从B个模型中挑选一个来进行预测,因此一条轨迹的采样会使用到多个环境模型,如图 16-2 所示。

图16-2 PETS 算法利用各个环境模型选取动作

16.4 PETS 算法实践

首先,为了搭建这样一个较为复杂的模型,我们定义模型中每一层的构造。在定义时就必须考虑每一层都是一个集成。

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


class Swish(nn.Module):
    ''' Swish激活函数 '''
    def __init__(self):
        super(Swish, self).__init__()

    def forward(self, x):
        return x * torch.sigmoid(x)


def init_weights(m):
    ''' 初始化模型权重 '''
    def truncated_normal_init(t, mean=0.0, std=0.01):
        torch.nn.init.normal_(t, mean=mean, std=std)
        while True:
            cond = (t < mean - 2 * std) | (t > mean + 2 * std)
            if not torch.sum(cond):
                break
            t = torch.where(
                cond,
                torch.nn.init.normal_(
                    torch.ones(t.shape, device=device),
                    mean=mean,
                    std=std
                ),
                t
            )
        return t

    if type(m) == nn.Linear or isinstance(m, FCLayer):
        truncated_normal_init(m.weight, std=1 / (2 * np.sqrt(m._input_dim)))
        m.bias.data.fill_(0.0)


class FCLayer(nn.Module):
    ''' 集成之后的全连接层 '''
    def __init__(self, input_dim, output_dim, ensemble_size, activation):
        super(FCLayer, self).__init__()
        self._input_dim, self._output_dim = input_dim, output_dim
        self.weight = nn.Parameter(torch.Tensor(ensemble_size, input_dim, output_dim).to(device))
        self._activation = activation
        self.bias = nn.Parameter(torch.Tensor(ensemble_size, output_dim).to(device))

    def forward(self, x):
        return self._activation(
            torch.add(torch.bmm(x, self.weight), self.bias[:, None, :]))

接着,使用高斯分布的概率模型来定义一个集成模型。

class EnsembleModel(nn.Module):
    ''' 环境模型集成 '''
    def __init__(
        self,
        state_dim,
        action_dim,
        ensemble_size=5,
        learning_rate=1e-3
    ):
        super(EnsembleModel, self).__init__()
        ## 输出包括均值和方差,因此是状态与奖励维度之和的两倍
        self._output_dim = (state_dim + 1) * 2
        self._max_logvar = nn.Parameter(
            (torch.ones((1, self._output_dim // 2)).float() / 2).to(device),
            requires_grad=False
        )
        self._min_logvar = nn.Parameter(
            (-torch.ones((1, self._output_dim // 2)).float() * 10).to(device),
            requires_grad=False
        )

        self.layer1 = FCLayer(state_dim + action_dim, 200, ensemble_size, Swish())
        self.layer2 = FCLayer(200, 200, ensemble_size, Swish())
        self.layer3 = FCLayer(200, 200, ensemble_size, Swish())
        self.layer4 = FCLayer(200, 200, ensemble_size, Swish())
        self.layer5 = FCLayer(200, self._output_dim, ensemble_size, nn.Identity())
        self.apply(init_weights)  ## 初始化环境模型中的参数
        self.optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, x, return_log_var=False):
        ret = self.layer5(self.layer4(self.layer3(self.layer2(self.layer1(x)))))
        mean = ret[:, :, :self._output_dim // 2]
        ## 在PETS算法中,将方差控制在最小值和最大值之间
        logvar = self._max_logvar - F.softplus(self._max_logvar - ret[:, :, self._output_dim // 2:])
        logvar = self._min_logvar + F.softplus(logvar - self._min_logvar)
        return mean, logvar if return_log_var else torch.exp(logvar)

    def loss(self, mean, logvar, labels, use_var_loss=True):
        inverse_var = torch.exp(-logvar)
        if use_var_loss:
            mse_loss = torch.mean(
                torch.mean(torch.pow(mean - labels, 2) * inverse_var, dim=-1),
                dim=-1
            )
            var_loss = torch.mean(torch.mean(logvar, dim=-1), dim=-1)
            total_loss = torch.sum(mse_loss) + torch.sum(var_loss)
        else:
            mse_loss = torch.mean(torch.pow(mean - labels, 2), dim=(1, 2))
            total_loss = torch.sum(mse_loss)
        return total_loss, mse_loss

    def train(self, loss):
        self.optimizer.zero_grad()
        loss += 0.01 * torch.sum(self._max_logvar) - 0.01 * torch.sum(self._min_logvar)
        loss.backward()
        self.optimizer.step()

接下来,我们定义一个EnsembleDynamicsModel的类,把模型集成的训练设计得更加精细化。具体而言,我们并不会选择模型训练的轮数,而是在每次训练的时候将一部分数据单独取出来,用于验证模型的表现,在 5 次没有获得表现提升时就结束训练。

class EnsembleDynamicsModel:
    ''' 环境模型集成,加入精细化的训练 '''
    def __init__(self, state_dim, action_dim, num_network=5):
        self._num_network = num_network
        self._state_dim, self._action_dim = state_dim, action_dim
        self.model = EnsembleModel(
            state_dim,
            action_dim,
            ensemble_size=num_network
        )
        self._epoch_since_last_update = 0

    def train(
        self,
        inputs,
        labels,
        batch_size=64,
        holdout_ratio=0.1,
        max_iter=20
    ):
        ## 设置训练集与验证集
        permutation = np.random.permutation(inputs.shape[0])
        inputs, labels = inputs[permutation], labels[permutation]
        num_holdout = int(inputs.shape[0] * holdout_ratio)
        train_inputs, train_labels = inputs[num_holdout:], labels[num_holdout:]
        holdout_inputs, holdout_labels = inputs[:num_holdout], labels[:num_holdout]
        holdout_inputs = torch.from_numpy(holdout_inputs).float().to(device)
        holdout_labels = torch.from_numpy(holdout_labels).float().to(device)
        holdout_inputs = holdout_inputs[None, :, :].repeat([self._num_network, 1, 1])
        holdout_labels = holdout_labels[None, :, :].repeat([self._num_network, 1, 1])

        ## 保留最好的结果
        self._snapshots = {i: (None, 1e10) for i in range(self._num_network)}

        for epoch in itertools.count():
            ## 定义每一个网络的训练数据
            train_index = np.vstack([
                np.random.permutation(train_inputs.shape[0])
                for _ in range(self._num_network)
            ])
            ## 所有真实数据都用来训练
            for batch_start_pos in range(0, train_inputs.shape[0], batch_size):
                batch_index = train_index[:, batch_start_pos:batch_start_pos + batch_size]
                train_input = torch.from_numpy(train_inputs[batch_index]).float().to(device)
                train_label = torch.from_numpy(train_labels[batch_index]).float().to(device)
                mean, logvar = self.model(train_input, return_log_var=True)
                loss, _ = self.model.loss(mean, logvar, train_label)
                self.model.train(loss)

            with torch.no_grad():
                mean, logvar = self.model(holdout_inputs, return_log_var=True)
                _, holdout_losses = self.model.loss(
                    mean,
                    logvar,
                    holdout_labels,
                    use_var_loss=False
                )
                holdout_losses = holdout_losses.cpu()
                break_condition = self._save_best(epoch, holdout_losses)
                if break_condition or epoch > max_iter:  ## 结束训练
                    break

    def _save_best(self, epoch, losses, threshold=0.1):
        updated = False
        for i in range(len(losses)):
            current = losses[i]
            _, best = self._snapshots[i]
            improvement = (best - current) / best
            if improvement > threshold:
                self._snapshots[i] = (epoch, current)
                updated = True
        self._epoch_since_last_update = 0 if updated else self._epoch_since_last_update + 1
        return self._epoch_since_last_update > 5

    def predict(self, inputs, batch_size=64):
        mean, var = [], []
        for i in range(0, inputs.shape[0], batch_size):
            input = torch.from_numpy(
                inputs[i:min(i + batch_size, inputs.shape[0])]
            ).float().to(device)
            cur_mean, cur_var = self.model(input[None, :, :].repeat([self._num_network, 1, 1]), return_log_var=False)
            mean.append(cur_mean.detach().cpu().numpy())
            var.append(cur_var.detach().cpu().numpy())
        return np.hstack(mean), np.hstack(var)

有了环境模型之后,我们就可以定义一个FakeEnv,主要用于实现给定状态和动作,用模型集成来进行预测。该功能会用在 MPC 算法中。

class FakeEnv:
    def __init__(self, model):
        self.model = model

    def step(self, obs, act):
        inputs = np.concatenate((obs, act), axis=-1)
        ensemble_model_means, ensemble_model_vars = self.model.predict(inputs)
        ensemble_model_means[:, :, 1:] += obs.numpy()
        ensemble_model_stds = np.sqrt(ensemble_model_vars)
        ensemble_samples = ensemble_model_means + np.random.normal(
            size=ensemble_model_means.shape
        ) * ensemble_model_stds

        num_models, batch_size, _ = ensemble_model_means.shape
        models_to_use = np.random.choice(
            [i for i in range(self.model._num_network)],
            size=batch_size
        )
        batch_inds = np.arange(0, batch_size)
        samples = ensemble_samples[models_to_use, batch_inds]
        rewards, next_obs = samples[:, :1], samples[:, 1:]
        return rewards, next_obs

    def propagate(self, obs, actions):
        with torch.no_grad():
            obs = np.copy(obs)
            total_reward = np.expand_dims(np.zeros(obs.shape[0]), axis=-1)
            obs, actions = torch.as_tensor(obs), torch.as_tensor(actions)
            for i in range(actions.shape[1]):
                action = torch.unsqueeze(actions[:, i], 1)
                rewards, next_obs = self.step(obs, action)
                total_reward += rewards
                obs = torch.as_tensor(next_obs)
            return total_reward

接下来定义经验回放池的类Replay Buffer。与之前的章节对比,此处经验回放缓冲区会额外实现一个返回所有数据的函数。

class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def size(self):
        return len(self.buffer)

    def return_all_samples(self):
        all_transitions = list(self.buffer)
        state, action, reward, next_state, done = zip(*all_transitions)
        return np.array(state), action, reward, np.array(next_state), done

接下来是 PETS 算法的主体部分。

class PETS:
    ''' PETS算法 '''
    def __init__(self, env, replay_buffer, n_sequence, elite_ratio,
                 plan_horizon, num_episodes):
        self._env = env
        self._env_pool = replay_buffer

        obs_dim = env.observation_space.shape[0]
        self._action_dim = env.action_space.shape[0]
        self._model = EnsembleDynamicsModel(obs_dim, self._action_dim)
        self._fake_env = FakeEnv(self._model)
        self.upper_bound = env.action_space.high[0]
        self.lower_bound = env.action_space.low[0]

        self._cem = CEM(n_sequence, elite_ratio, self._fake_env,
                        self.upper_bound, self.lower_bound)
        self.plan_horizon = plan_horizon
        self.num_episodes = num_episodes

    def train_model(self):
        env_samples = self._env_pool.return_all_samples()
        obs = env_samples[0]
        actions = np.array(env_samples[1])
        rewards = np.array(env_samples[2]).reshape(-1, 1)
        next_obs = env_samples[3]
        inputs = np.concatenate((obs, actions), axis=-1)
        labels = np.concatenate((rewards, next_obs - obs), axis=-1)
        self._model.train(inputs, labels)

    def mpc(self):
        mean = np.tile(
            (self.upper_bound + self.lower_bound) / 2.0,
            self.plan_horizon
        )
        var = np.tile(
            np.square(self.upper_bound - self.lower_bound) / 16,
            self.plan_horizon
        )
        obs, done, episode_return = self._env.reset(), False, 0
        while not done:
            actions = self._cem.optimize(obs, mean, var)
            action = actions[:self._action_dim]  ## 选取第一个动作
            next_obs, reward, done, _ = self._env.step(action)
            self._env_pool.add(obs, action, reward, next_obs, done)
            obs = next_obs
            episode_return += reward
            mean = np.concatenate([
                np.copy(actions)[self._action_dim:],
                np.zeros(self._action_dim)
            ])
        return episode_return

    def explore(self):
        obs, done, episode_return = self._env.reset(), False, 0
        while not done:
            action = self._env.action_space.sample()
            next_obs, reward, done, _ = self._env.step(action)
            self._env_pool.add(obs, action, reward, next_obs, done)
            obs = next_obs
            episode_return += reward
        return episode_return

    def train(self):
        return_list = []
        explore_return = self.explore()  ## 先进行随机策略的探索来收集一条序列的数据
        print('episode: 1, return: %d' % explore_return)
        return_list.append(explore_return)

        for i_episode in range(self.num_episodes - 1):
            self.train_model()
            episode_return = self.mpc()
            return_list.append(episode_return)
            print('episode: %d, return: %d' % (i_episode + 2, episode_return))
        return return_list

大功告成!让我们在倒立摆环境上试一下吧,以下代码需要一定的运行时间。

buffer_size = 100000
n_sequence = 50
elite_ratio = 0.2
plan_horizon = 25
num_episodes = 10
env_name = 'Pendulum-v0'
env = gym.make(env_name)

replay_buffer = ReplayBuffer(buffer_size)
pets = PETS(env, replay_buffer, n_sequence, elite_ratio, plan_horizon, num_episodes)
return_list = pets.train()

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PETS on {}'.format(env_name))
plt.show()
episode: 1, return: -985
episode: 2, return: -1384
episode: 3, return: -1006
episode: 4, return: -1853
episode: 5, return: -378
episode: 6, return: -123
episode: 7, return: -124
episode: 8, return: -122
episode: 9, return: -124
episode: 10, return: -125

可以看出,PETS 算法的效果非常好,但是由于每次选取动作都需要在环境模型上进行大量的模拟,因此运行速度非常慢。与 SAC 算法的结果进行对比可以看出,PETS 算法大大提高了样本效率,在比 SAC 算法的环境交互次数少得多的情况下就取得了差不多的效果。

16.5 总结

通过学习与实践,我们可以看出模型预测控制(MPC)方法有着其独特的优势,例如它不用构建和训练策略,可以更好地利用环境,可以进行更长步数的规划。但是 MPC 也有其局限性,例如模型在多步推演之后的准确性会大大降低,简单的控制策略对于复杂系统可能不够。MPC 还有一个更为严重的问题,即每次计算动作的复杂度太大,这使其在一些策略及时性要求较高的系统中应用就变得不太现实。

16.6 参考文献

[1] CHUA K, CALANDRA R, MCALLISTER R, et al. Deep reinforcement learning in a handful of trials using probabilistic dynamics models [J]. Advances in neural information processing systems, 2018: 31.

[2] LAKSHMINARAYANAN B, PRITZEL A, BLUNDELL C. Simple and scalable predictive uncertainty estimation using deep ensembles [J]. Advances in neural information processing systems, 2017: 30.

17 基于模型的策略优化

17.1 简介

第 16 章介绍的 PETS 算法是基于模型的强化学习算法中的一种,它没有显式构建一个策略(即一个从状态到动作的映射函数)。回顾一下之前介绍过的 Dyna-Q 算法,它也是一种基于模型的强化学习算法。但是 Dyna-Q 算法中的模型只存储之前遇到的数据,只适用于表格型环境。而在连续型状态和动作的环境中,我们需要像 PETS 算法一样学习一个用神经网络表示的环境模型,此时若继续利用 Dyna 的思想,可以在任意状态和动作下用环境模型来生成一些虚拟数据,这些虚拟数据可以帮助进行策略的学习。如此,通过和模型进行交互产生额外的虚拟数据,对真实环境中样本的需求量就会减少,因此通常会比无模型的强化学习方法具有更高的采样效率。本章将介绍这样一种算法——MBPO 算法。

17.2 MBPO 算法

基于模型的策略优化(model-based policy optimization,MBPO)算法是加州大学伯克利分校的研究员在 2019 年的 NeurIPS 会议中提出的。随即 MBPO 成为深度强化学习中最重要的基于模型的强化学习算法之一。

MBPO 算法基于以下两个关键的观察: (1) 随着环境模型的推演步数变长,模型累积的复合误差会快速增加,使得环境模型得出的结果变得很不可靠; (2) 必须要权衡推演步数增加后模型增加的误差带来的负面作用与步数增加后使得训练的策略更优的正面作用,二者的权衡决定了推演的步数。

MBPO 算法在这两个观察的基础之上,提出只使用模型来从之前访问过的真实状态开始进行较短步数的推演,而非从初始状态开始进行完整的推演。这就是 MBPO 中的分支推演(branched rollout)的概念,即在原来真实环境中采样的轨迹上面推演出新的“短分支”,如图 17-1 所示。这样做可以使模型的累积误差不至于过大,从而保证最后的采样效率和策略表现。

图17-1 分支推演示意图

MBPO 与第 6 章讲解的经典的 Dyna-Q 算法十分类似。Dyna-Q 采用的无模型强化学习部分是 Q-learning,而 MBPO 采用的是 SAC。此外,MBPO 算法中环境模型的构建和 PETS 算法中一致,都使用模型集成的方式,并且其中每一个环境模型的输出都是一个高斯分布。接下来,我们来看一下 MBPO 的具体算法框架。MBPO 算法会把真实环境样本作为分支推演的起点,使用模型进行一定步数的推演,并用推演得到的模型数据用来训练模型。

  • 初始化策略
\pi_\phi

、环境模型参数

p_\theta

、真实环境数据集

D_{\text{env}}

、模型数据集

D_{\text{model}}
  • for 轮数
n=1\rightarrow N

do

  • 通过环境数据来训练模型参数
p_\theta
  • for 时间步
t=1\rightarrow T

do

  • 根据策略
\pi_\phi

与环境交互,并将交互的轨迹添加到

D_{\text{env}}

  • for 模型推演次数
e=1\rightarrow E

do

D_{\text{env}}

中均匀随机采样一个状态

s_t
s_t

为初始状态,在模型中使用策略

\pi_\phi

进行

k

步的推演,并将生成的轨迹添加到

D_{\text{model}}

  • end for
  • for 梯度更新次数
g=1\rightarrow G

do

  • 基于模型数据
D_{\text{model}}

,使用 SAC 来更新策略参数

\pi_\phi
  • end for
  • end for
  • end for

分支推演的长度kkk是平衡样本效率和策略性能的重要超参数。接下来我们看看 MBPO 的代码,本章最后会给出关于 MBPO 的理论推导,可以指导参数kkk的选取。

17.3 MBPO 代码实践

首先,我们先导入一些必要的包。

import gym
from collections import namedtuple
import itertools
from itertools import count
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions.normal import Normal
import numpy as np
import collections
import random
import matplotlib.pyplot as plt

MBPO 算法使用 SAC 算法来训练策略。和 SAC 算法相比,MBPO 多用了一些模型推演得到的数据来训练策略。要想了解 SAC 方法的详细过程,读者可以阅读第 14 章对应的内容。我们将 SAC 代码直接复制到此处。

class PolicyNet