• 欢迎光临~

MAPPO学习笔记(1):从PPO算法开始

开发技术 开发技术 2022-06-19 次浏览

由于这段时间的学习内容涉及到MAPPO算法,并且我对MAPPO算法这种多智能体算法的信息交互机制不甚了解,于是写了这个系列的笔记,目的是巩固知识,并且进行一些粗浅又滑稽的总结。

 

1.PPO算法的介绍

1.1.PG算法

如果要分类的话,根据学习方法的不同,可以将强化学习分为Value-based方法、Policy-based方法,以及基于AC框架的Actor-Critic方法。下面做一个简单的介绍。

 

Value-based方法:典型代表为------Q-learning、Sarsa与DQN,之所以叫“Value-based”方法,是因为在这种类型的方法中,我们的所有决策都是基于一张“表格”采取的,这里的“表格”是一种较为抽象的说法,大致可以理解为:我们现在在一个什么样的状态下,在这个状态下,我们采取一个什么样的动作会获得多大的“收益”。然后我们会根据这样一个“收益”情况采取我们的动作。

 

Policy-based方法:这里的典型代表就是PPO算法,在Q-learning与DQN中,我们输出的会是一个代表状态当前动作收益的Q值,但Policy-based算法摒弃了这种Q值的思想,转而输出当前状态可以采取的所有动作的概率,这样的做法意思就是:此刻的你非常饿,但你不知道自己想吃什么,你只知道你是个肉食主义者,想多吃点肉肉。那么“想吃肉”就是你的策略(Policy),至于吃哪种肉,鸡肉鸭肉鱼肉猪肉牛肉羊肉......都看你心情。

 

 

Actor-Critic方法:这个方法最显著的特点就是AC框架,简单来说,最简单的AC框架由两个部分组成:Actor与Critic,如果我们将Critic看成老师,Actor就可以看成学生,老师会对学生在当前状态的动作进行评价,然后学生会用这个“评价”更新自己的参数。这就是一个最简单的AC框架的更新方式。

 

我们这里展开讲讲PG算法,我们不妨近似地将一个“策略”看成一个概率分布,也许每一个不同的情况都对应着一个概率分布,这时我们的目的就变成了使用带有θ参数的函数对策略进行近似,可以通过调整参数θ从而逼近最优策略。

MAPPO学习笔记(1):从PPO算法开始

 那接下来要做的就是去优化这个策略函数,并且让这个目标函数朝着最大值的方向去优化。同样以吃东西作为例子:你还是一个很饿的人,并且不知道该去吃啥,于是你去尝试了一家你从来没吃过的餐厅,突然发现这一家餐厅很好吃,那么你以后可能就会经常来。这又何尝不是一个策略更新的过程呢?

说了那么多,其实我们的目的只有一个:通过更新策略参数θ来使我们所获得的收益(Reward)最大化。当然这其中有一堆的公式可以进行推导,但我最不喜欢的就是推公式,所以就不从头推到尾了,直接说结论。就是我们可以通过一个更新公式对策略参数θ进行更新,从而达到优化策略的目的。

 

1.2. PPO算法

上面介绍了PG算法,看起来十分地理想且可行,但是PG算法有一个较大的缺点:每当我们更新了一次策略参数θ,那么我们之前的策略可以这样说,“直接作废”。那这样带来的后果是什么呢,已知我们的策略更新是通过旧的策略在环境中采样得到,一旦策略更新了之后,旧的数据——由旧策略采样得到的数据,就不再对现在的新策略的更新有任何的指导意义,于是我们就需要进行新一轮的采样。这就使得传统的PG方法需要花费大量的时间进行采样,极大地减少了数据的利用率。

 为了解决这一个问题,“异策略”这一概念就出现了。简单来说就是用另外一个策略去和环境进行互动,来完成“收集数据”这个任务。然后需要进行训练的策略就可以利用这些数据进行高效的更新。但是!就像上一段中所说的,传统的PG方法无法使用不同策略采集得到的数据进行自我更新,这就需要我们接着引入一些其它的技巧,使得这种“高效”的更新方式变为可能。

第一个技巧是重要性采样(importance sampling)。假设我们有两个策略,一个我们把它叫做$p$,另一个我们把它叫做$q$。已知我们这里有一个函数$f(x)$,且函数中的$x$就是由策略$p$采样得到。那么如果我们要得到函数$f(x)$的均值,就可以从分布$p$中尽量多地采集样本数据$x^i$,通过计算所有$f(x^i)$的均值,我们就可以近似地得到$f(x)$的期望值。就像下面这个公式所表达的那样:

$E_{x sim p}[f(x)] approx frac{1}{N} sum_{i=1}^{N} fleft(x^{i}right)$
但是注意!因为我们使用的是异策略方法,所以这里我们使用的是策略$q$进行采样,那么我们就没有办法直接使用上面的公式得到$f(x)$的近似值。这时候就需要对求期望的公式进行一些变换。
现在我们还是假设所有的$x$都是由策略$p$采样得到,那么我们可以将$f(x)$的期望值表示为下面这个公式:
$int f(x) p(x) d x$
对其进行变换,得到:
$int f(x) p(x) d x=int f(x) frac{p(x)}{q(x)} q(x) d x=E_{x sim q}left[f(x) frac{p(x)}{q(x)}right]$
整理可得:
$E_{x sim p}[f(x)]=E_{x sim q}left[f(x) frac{p(x)}{q(x)}right]$
通过这样的变换,我们就可以通过策略$q$进行采样,然后计算$f(x) frac{p(x)}{q(x)}$,最后求其平均值,同样也可以得到$f(x)$期望的近似值。
看!这样就实现了,在异策略前提下,通过$q$策略收集的数据,计算$p$策略下,函数$f(x)$的期望近似。
我们将这个方法简单地概括为:通过给策略$q$采集得到的每一个数据,乘以一个“重要性权重”:$frac{p(x)}{q(x)}$,就可以对两个策略之间的差异进行修正。有了这样一个修正手段之后,策略$p$与策略$q$之间的关系,就可以近似地看作“行动者”与“信息采集器”,行动者通过信息采集器获取的数据优化自身的策略。
 
第二个技巧就是KL散度,当我们引入上面的重要性采样后,看似所有的问题都迎刃而解,但是要注意的是:采样所使用的策略$q$与决策所使用的策略$p$之间的差距不能过大。因为即使两个分布之间如果均值与方差差异过大,那么就会是训练过程产生各种各样的问题。就好比我是一个肉食主义者,但我却从一个素食主义者朋友喜欢的餐厅列表中来决定今天吃什么。因此就需要引入KL散度来对训练过程进行约束,使得策略$p$与策略$q$之间的差异不能过大。
 
第三个技巧是优势函数。首先我们必须要明白的一点是,PPO生成的是一个随机策略,即在每一种情况下,行动者会采取某些行为的概率。但这样就会产生一个较为严重的问题:好的越来越好,差的越来越差,以至于我们在一些状态下会忽略一些更为合适的动作。通俗的来讲,我们生成的分布会更乐意选择那些在很多情况下表现更为优异的动作,并且以一个极大的概率去采取这样的动作,而以一个极小的概率采取一些表现较差的动作,甚至是“次优解”。这就使得我们“千辛万苦”得到的随机性优势不复存在。于是我们在计算状态-动作对得分时,要人为地引入一个“基线”,这个“基线”就是这个状态下的状态-动作对得分的均值。通过引入这样一个“基线”,我们就可以得到优势函数:即,我们在这个状态下采取的策略要比一般情况下好多少。
现在可以将优势函数与重要性采样结合起来(KL散度稍后再加),那么更新策略所使用的梯度就可以表示为:
$E_{left(s_{t}, a_{t}right) sim pi_{theta^{prime}}}left[frac{p_{theta}left(s_{t}, a_{t}right)}{p_{theta^{prime}}left(s_{t}, a_{t}right)} A^{theta}left(s_{t}, a_{t}right) nabla log p_{theta}left(a_{t}^{n} mid s_{t}^{n}right)right]$
其中$theta^{prime}$代表旧策略,并且也是我们用来采样的策略,而$θ$代表我们当前的策略,上面这个式子就是我们在使用策略梯度对$θ$进行更新时所用到的梯度。这里的$A$是优势函数。
那么我们之前还说过两个策略间的差距不能过大,于是我们将KL散度引入PPO算法,得到的优化目标如下:
$begin{aligned} J_{mathrm{PPO}}^{theta^{prime}}(theta) &=J^{theta^{prime}}(theta)-beta mathrm{KL}left(theta, theta^{prime}right) \ J^{theta^{prime}}(theta) &=E_{left(s_{t}, a_{t}right) sim pi_{theta^{prime}}}left[frac{p_{theta}left(a_{t} mid s_{t}right)}{p_{theta^{prime}}left(a_{t} mid s_{t}right)} A^{theta^{prime}}left(s_{t}, a_{t}right)right] end{aligned}$
在构建代码时,我们可以直接将loss设为负的$J(θ)$,这样就实现了PG中的梯度更新。
值得一提的是,一般情况下我们并不直接构造KL散度,而是采用近端优化惩罚(PPO-penalty)或者近端优化裁剪(PPO-clip)来优化、替代传统的KL散度。
关于这两种方法,可以参考这位大佬的文章:详解近端策略优化(ppo,干货满满) - 简书 (jianshu.com)  大佬写的干货满满!!!我这样的温斯顿都能够较为顺利地理解。
 

2.PPO算法代码

PPO算法的代码较为通俗易懂,这里不多加赘述,只是贴出来看一下,方便理解。但是要注意的是,这段代码中所使用的就是我们之前提到的近端优化裁剪方法(PPO-clip).

网络结构:
点击查看代码
class FeedForwardNN(nn.Module):

    def __init__(self, in_dim, out_dim):
        
        super(FeedForwardNN, self).__init__()

        self.layer1 = nn.Linear(in_dim, 64)
        self.layer2 = nn.Linear(64, 64)
        self.layer3 = nn.Linear(64, out_dim)

    def forward(self, obs):
        
        if isinstance(obs, np.ndarray):
            obs = torch.tensor(obs, dtype=torch.float)

        activation1 = F.relu(self.layer1(obs))
        activation2 = F.relu(self.layer2(activation1))
        output = self.layer3(activation2)

        return output

PPO算法类:

点击查看代码
class PPO:

    def __init__(self, policy_class, env, **hyperparameters):

        # PPO 初始化用于训练的超参数
        self._init_hyperparameters(hyperparameters)

        # 提取环境信息
        self.env = env
        self.obs_dim = env.observation_space.shape[0]
        self.act_dim = env.action_space.shape[0]
        
        # 初始化演员和评论家网络
        self.actor = policy_class(self.obs_dim, self.act_dim)                                                
        self.critic = policy_class(self.obs_dim, 1)

        # 为演员和评论家初始化优化器
        self.actor_optim = Adam(self.actor.parameters(), lr=self.lr)
        self.critic_optim = Adam(self.critic.parameters(), lr=self.lr)

        # 初始化协方差矩阵,用于查询actor网络的action
        self.cov_var = torch.full(size=(self.act_dim,), fill_value=0.5)
        self.cov_mat = torch.diag(self.cov_var)

        # 这个记录器将帮助我们打印出每个迭代的摘要
        self.logger = {
            'delta_t': time.time_ns(),
            't_so_far': 0,          # 到目前为止的时间步数
            'i_so_far': 0,          # 到目前为止的迭代次数
            'batch_lens': [],       # 批次中的episodic长度
            'batch_rews': [],       # 批次中的rews回报
            'actor_losses': [],     # 当前迭代中演员网络的损失
        }

    def learn(self, total_timesteps):

        print(f"Learning... Running {self.max_timesteps_per_episode} timesteps per episode, ", end='')
        print(f"{self.timesteps_per_batch} timesteps per batch for a total of {total_timesteps} timesteps")
        t_so_far = 0 # 到目前为止仿真的时间步数
        i_so_far = 0 # 到目前为止,已运行的迭代次数
        while t_so_far < total_timesteps:                                                                  
    
            # 收集批量实验数据
            batch_obs, batch_acts, batch_log_probs, batch_rtgs, batch_lens = self.rollout()                    

            # 计算收集这一批数据的时间步数
            t_so_far += np.sum(batch_lens)

            # 增加迭代次数
            i_so_far += 1

            # 记录到目前为止的时间步数和到目前为止的迭代次数
            self.logger['t_so_far'] = t_so_far
            self.logger['i_so_far'] = i_so_far

            # 计算第k次迭代的advantage
            V, _ = self.evaluate(batch_obs, batch_acts)
            A_k = batch_rtgs - V.detach()                                                                 

            # 将优势归一化 在理论上不是必须的,但在实践中,它减少了我们优势的方差,使收敛更加稳定和快速。
            # 添加这个是因为在没有这个的情况下,解决一些环境的问题太不稳定了。
            A_k = (A_k - A_k.mean()) / (A_k.std() + 1e-10)
            
            # 在其中更新我们的网络。
            for _ in range(self.n_updates_per_iteration):  
  
                V, curr_log_probs = self.evaluate(batch_obs, batch_acts)

                # 重要性采样的权重
                ratios = torch.exp(curr_log_probs - batch_log_probs)

                surr1 = ratios * A_k
                surr2 = torch.clamp(ratios, 1 - self.clip, 1 + self.clip) * A_k

                # 计算两个网络的损失。
                actor_loss = (-torch.min(surr1, surr2)).mean()
                critic_loss = nn.MSELoss()(V, batch_rtgs)

                # 计算梯度并对actor网络进行反向传播
                # 梯度清零
                self.actor_optim.zero_grad()
                # 反向传播,产生梯度
                actor_loss.backward(retain_graph=True)
                # 通过梯度下降进行优化
                self.actor_optim.step()

                # 计算梯度并对critic网络进行反向传播
                self.critic_optim.zero_grad()
                critic_loss.backward()
                self.critic_optim.step()

                self.logger['actor_losses'].append(actor_loss.detach())
                
            self._log_summary()

            if i_so_far % self.save_freq == 0:
                torch.save(self.actor.state_dict(), './ppo_actor.pth')
                torch.save(self.critic.state_dict(), './ppo_critic.pth')

    def rollout(self):
        """
            这就是我们从实验中收集一批数据的地方。由于这是一个on-policy的算法,我们需要在每次迭代行为者/批评者网络时收集一批新的数据。
        """
        batch_obs = []
        batch_acts = []
        batch_log_probs = []
        batch_rews = []
        batch_rtgs = []
        batch_lens = []

        # 一回合的数据。追踪每一回合的奖励,在回合结束的时候会被清空,开始新的回合。
        ep_rews = []

        # 追踪到目前为止这批程序我们已经运行了多少个时间段
        t = 0 

        # 继续实验,直到我们每批运行超过或等于指定的时间步数
        while t < self.timesteps_per_batch:
            ep_rews = []  每回合收集的奖励

            # 重置环境
            obs = self.env.reset()
            done = False
            
            # 运行一个回合的最大时间为max_timesteps_per_episode的时间步数
            for ep_t in range(self.max_timesteps_per_episode):
            
                if self.render and (self.logger['i_so_far'] % self.render_every_i == 0) and len(batch_lens) == 0:
                    self.env.render()

                # 递增时间步数,到目前为止已经运行了这批程序
                t += 1

                #  追踪本批中的观察结果
                batch_obs.append(obs)

                # 计算action,并在env中执行一次step。
                # 注意,rew是奖励的简称。
                action, log_prob = self.get_action(obs)
                obs, rew, done, _ = self.env.step(action)

                # 追踪最近的奖励、action和action的对数概率
                ep_rews.append(rew)
                batch_acts.append(action)
                batch_log_probs.append(log_prob)

                if done:
                    break
                    
            # 追踪本回合的长度和奖励
            batch_lens.append(ep_t + 1)
            batch_rews.append(ep_rews)

        # 将数据重塑为函数描述中指定形状的张量,然后返回
        batch_obs = torch.tensor(batch_obs, dtype=torch.float)
        batch_acts = torch.tensor(batch_acts, dtype=torch.float)
        batch_log_probs = torch.tensor(batch_log_probs, dtype=torch.float)
        batch_rtgs = self.compute_rtgs(batch_rews)                                                              

        # 在这批中记录回合的回报和回合的长度。
        self.logger['batch_rews'] = batch_rews
        self.logger['batch_lens'] = batch_lens

        return batch_obs, batch_acts, batch_log_probs, batch_rtgs, batch_lens

    def compute_rtgs(self, batch_rews):

        batch_rtgs = []
        
        # 遍历每一回合,一个回合有一批奖励
        for ep_rews in reversed(batch_rews):
            # 到目前为止的折扣奖励
            discounted_reward = 0

            # 遍历这一回合的所有奖励。我们向后退,以便更顺利地计算每一个折现的回报
            for rew in reversed(ep_rews):
                
                discounted_reward = rew + discounted_reward * self.gamma
                batch_rtgs.insert(0, discounted_reward)

        # 将每个回合的折扣奖励的数据转换成张量
        batch_rtgs = torch.tensor(batch_rtgs, dtype=torch.float)

        return batch_rtgs

    def get_action(self, obs):
    
        mean = self.actor(obs)

        # 用上述协方差矩阵中的平均行动和标准差创建一个分布。
        dist = MultivariateNormal(mean, self.cov_mat)
        action = dist.sample()
        log_prob = dist.log_prob(action)

        return action.detach().numpy(), log_prob.detach()

    def evaluate(self, batch_obs, batch_acts):
        """
            估算每个观察值,以及最近一批actor网络迭代中的每个action的对数prob。
        """
        
        # 为每个batch_obs查询critic网络的V值。V的形状应与batch_rtgs相同。
        V = self.critic(batch_obs).squeeze()

        # 使用最近的actor网络计算批量action的对数概率。
        mean = self.actor(batch_obs)
        dist = MultivariateNormal(mean, self.cov_mat)
        log_probs = dist.log_prob(batch_acts)

        # 返回批次中每个观察值的值向量V和批次中每个动作的对数概率log_probs
        return V, log_probs

程序员灯塔
转载请注明原文链接:MAPPO学习笔记(1):从PPO算法开始
喜欢 (0)