【深度强化学习】(5) DDPG 模型解析,附Pytorch完整代码

大家好,今天和各位分享一下深度确定性策略梯度算法 (Deterministic Policy Gradient,DDPG)。并基于 OpenAI 的 gym 环境完成一个小游戏。完整代码在我的 GitHub 中获得:

https://github.com/LiSir-HIT/Reinforcement-Learning/tree/main/Model


1. 基本原理

深度确定性策略梯度算法是结合确定性策略梯度算法的思想,对 DQN 的一种改进,是一种无模型的深度强化学习算法。

DDPG 算法使用演员-评论家(Actor-Critic)算法作为其基本框架,采用深度神经网络作为策略网络和动作值函数的近似,使用随机梯度法训练策略网络和价值网络模型中的参数。DDPG 算法的原理如下图所示。

DDPG 算法架构中使用双重神经网络架构,对于策略函数和价值函数均使用双重神经网络模型架构(即 Online 网络和 Target 网络),使得算法的学习过程更加稳定,收敛的速度加快。同时该算法引入经验回放机制,Actor 与环境交互生产生的经验数据样本存储到经验池中,抽取批量数据样本进行训练,即类似于 DQN 的经验回放机制,去除样本的相关性和依赖性,使得算法更加容易收敛。 


2. 公式推导

为了便于大家理解 DDPG 的推导过程,算法框架如下图所示:

DDPG 共包含 4 个神经网络,用于对 Q 值函数和策略的近似表示。Critic 目标网络用于近似估计下一时刻的状态-动作的 Q 值函数 Q_{w'}(S_{t+1},\pi _{\theta '}(S_{t+1})),其中,下一动作值是通过 Actor 目标网络近似估计得到的 \pi_{\theta' }(S_{t+1})。于是可以得到当前状态下 Q 值函数的目标值

y_i = r_i + \gamma Q_{w'}(S_{i+1}, \pi _{\theta '}(S_{i+1}))

Critic 训练网络输出当前时刻状态-动作的 Q 值函数 Q_w(S_t, a_t),用于对当前策略评价。为了增加智能体在环境中的探索,DDPG 在行为策略上添加了高斯噪声函数Critic 网络的目标定义为:

y_i - Q_w(S_i,a_i)

通过最小化损失值(均方误差损失)来更新 Critic 网络的参数,Critic 网络更新时的损失函数为:

loss =\frac{1}{N} \sum_i (y_i - Q_w(S_i,a_i))^2

其中,a_i = \pi _{\theta} (S_i) + \varepsilon\varepsilon 代表行为策略上的探索噪声。

Actor 目标网络用于提供下一个状态的策略Actor 训练网络则是提供当前状态的策略,结合 Critic 训练网络的 Q 值函数可以得到 Actor 在参数更新时的策略梯度

\bigtriangledown _ {\pi_\theta} J = \frac{1}{N}\sum_i \bigtriangledown _a Q_w(s,a)|_{s=s_i,a=\pi_\theta(s_i)} \bigtriangledown _{\theta} \pi_{\theta} (s)|_{s_i}

对于目标网络参数 w' 和 \theta ' 的更新,DDPG 通过软更新机制(每次 learn 的时候更新部分参数)保证参数可以缓慢更新,从而提高学习的稳定性:

w' \leftarrow \xi w + (1-\xi )w'

\theta ' =\leftarrow \xi \theta + (1- \xi ) \theta '

DDPG 中既有基于价值函数的方法特征,也有基于策略的方法特征,使深度强化学习可以处理连续动作,并且具有一定的探索能力。 

算法流程图如下:


3. 代码实现

DDPG 的伪代码如下:

模型代码如下:

import torch
from torch import nn
from torch.nn import functional as F
import numpy as np
import collections
import random

# ------------------------------------- #
# 经验回放池
# ------------------------------------- #

class ReplayBuffer:
    def __init__(self, capacity):  # 经验池的最大容量
        # 创建一个队列,先进先出
        self.buffer = collections.deque(maxlen=capacity)
    # 在队列中添加数据
    def add(self, state, action, reward, next_state, done):
        # 以list类型保存
        self.buffer.append((state, action, reward, next_state, done))
    # 在队列中随机取样batch_size组数据
    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        # 将数据集拆分开来
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done
    # 测量当前时刻的队列长度
    def size(self):
        return len(self.buffer)

# ------------------------------------- #
# 策略网络
# ------------------------------------- #

class PolicyNet(nn.Module):
    def __init__(self, n_states, n_hiddens, n_actions, action_bound):
        super(PolicyNet, self).__init__()
        # 环境可以接受的动作最大值
        self.action_bound = action_bound
        # 只包含一个隐含层
        self.fc1 = nn.Linear(n_states, n_hiddens)
        self.fc2 = nn.Linear(n_hiddens, n_actions)
    # 前向传播
    def forward(self, x):
        x = self.fc1(x)  # [b,n_states]-->[b,n_hiddens]
        x = F.relu(x)
        x = self.fc2(x)  # [b,n_hiddens]-->[b,n_actions]
        x= torch.tanh(x)  # 将数值调整到 [-1,1]
        x = x * self.action_bound  # 缩放到 [-action_bound, action_bound]
        return x

# ------------------------------------- #
# 价值网络
# ------------------------------------- #

class QValueNet(nn.Module):
    def __init__(self, n_states, n_hiddens, n_actions):
        super(QValueNet, self).__init__()
        # 
        self.fc1 = nn.Linear(n_states + n_actions, n_hiddens)
        self.fc2 = nn.Linear(n_hiddens, n_hiddens)
        self.fc3 = nn.Linear(n_hiddens, 1)
    # 前向传播
    def forward(self, x, a):
        # 拼接状态和动作
        cat = torch.cat([x, a], dim=1)  # [b, n_states + n_actions]
        x = self.fc1(cat)  # -->[b, n_hiddens]
        x = F.relu(x)
        x = self.fc2(x)  # -->[b, n_hiddens]
        x = F.relu(x)
        x = self.fc3(x)  # -->[b, 1]
        return x

# ------------------------------------- #
# 算法主体
# ------------------------------------- #

class DDPG:
    def __init__(self, n_states, n_hiddens, n_actions, action_bound,
                 sigma, actor_lr, critic_lr, tau, gamma, device):

        # 策略网络--训练
        self.actor = PolicyNet(n_states, n_hiddens, n_actions, action_bound).to(device)
        # 价值网络--训练
        self.critic = QValueNet(n_states, n_hiddens, n_actions).to(device)
        # 策略网络--目标
        self.target_actor = PolicyNet(n_states, n_hiddens, n_actions, action_bound).to(device)
        # 价值网络--目标
        self.target_critic = QValueNet(n_states, n_hiddens, n_actions).to(device
                                                                          )
        # 初始化价值网络的参数,两个价值网络的参数相同
        self.target_critic.load_state_dict(self.critic.state_dict())
        # 初始化策略网络的参数,两个策略网络的参数相同
        self.target_actor.load_state_dict(self.actor.state_dict())

        # 策略网络的优化器
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        # 价值网络的优化器
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)

        # 属性分配
        self.gamma = gamma  # 折扣因子
        self.sigma = sigma  # 高斯噪声的标准差,均值设为0
        self.tau = tau  # 目标网络的软更新参数
        self.n_actions = n_actions
        self.device = device

    # 动作选择
    def take_action(self, state):
        # 维度变换 list[n_states]-->tensor[1,n_states]-->gpu
        state = torch.tensor(state, dtype=torch.float).view(1,-1).to(self.device)
        # 策略网络计算出当前状态下的动作价值 [1,n_states]-->[1,1]-->int
        action = self.actor(state).item()
        # 给动作添加噪声,增加搜索
        action = action + self.sigma * np.random.randn(self.n_actions)
        return action
    
    # 软更新, 意思是每次learn的时候更新部分参数
    def soft_update(self, net, target_net):
        # 获取训练网络和目标网络需要更新的参数
        for param_target, param in zip(target_net.parameters(), net.parameters()):
            # 训练网络的参数更新要综合考虑目标网络和训练网络
            param_target.data.copy_(param_target.data*(1-self.tau) + param.data*self.tau)

    # 训练
    def update(self, transition_dict):
        # 从训练集中取出数据
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)  # [b,n_states]
        actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1,1).to(self.device)  # [b,1]
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1,1).to(self.device)  # [b,1]
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)  # [b,next_states]
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1,1).to(self.device)  # [b,1]
        
        # 价值目标网络获取下一时刻的动作[b,n_states]-->[b,n_actors]
        next_q_values = self.target_actor(next_states)
        # 策略目标网络获取下一时刻状态选出的动作价值 [b,n_states+n_actions]-->[b,1]
        next_q_values = self.target_critic(next_states, next_q_values)
        # 当前时刻的动作价值的目标值 [b,1]
        q_targets = rewards + self.gamma * next_q_values * (1-dones)
        
        # 当前时刻动作价值的预测值 [b,n_states+n_actions]-->[b,1]
        q_values = self.critic(states, actions)

        # 预测值和目标值之间的均方差损失
        critic_loss = torch.mean(F.mse_loss(q_values, q_targets))
        # 价值网络梯度
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # 当前状态的每个动作的价值 [b, n_actions]
        actor_q_values = self.actor(states)
        # 当前状态选出的动作价值 [b,1]
        score = self.critic(states, actor_q_values)
        # 计算损失
        actor_loss = -torch.mean(score)
        # 策略网络梯度
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # 软更新策略网络的参数  
        self.soft_update(self.actor, self.target_actor)
        # 软更新价值网络的参数
        self.soft_update(self.critic, self.target_critic)

4. 案例演示

基于 OpenAI 的 gym 环境完成一个推车游戏,目标是将小车推到山顶旗子处。动作维度为1,属于连续值;状态维度为 2,分别是 x 坐标和小车速度。

代码如下:

import numpy as np
import torch
import matplotlib.pyplot as plt
import gym
from parsers import args
from RL_brain import ReplayBuffer, DDPG
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# -------------------------------------- #
# 环境加载
# -------------------------------------- #

env_name = "MountainCarContinuous-v0"  # 连续型动作
env = gym.make(env_name, render_mode="human")
n_states = env.observation_space.shape[0]  # 状态数 2
n_actions = env.action_space.shape[0]  # 动作数 1
action_bound = env.action_space.high[0]  # 动作的最大值 1.0


# -------------------------------------- #
# 模型构建
# -------------------------------------- #

# 经验回放池实例化
replay_buffer = ReplayBuffer(capacity=args.buffer_size)
# 模型实例化
agent = DDPG(n_states = n_states,  # 状态数
             n_hiddens = args.n_hiddens,  # 隐含层数
             n_actions = n_actions,  # 动作数
             action_bound = action_bound,  # 动作最大值
             sigma = args.sigma,  # 高斯噪声
             actor_lr = args.actor_lr,  # 策略网络学习率
             critic_lr = args.critic_lr,  # 价值网络学习率
             tau = args.tau,  # 软更新系数
             gamma = args.gamma,  # 折扣因子
             device = device
            )

# -------------------------------------- #
# 模型训练
# -------------------------------------- #

return_list = []  # 记录每个回合的return
mean_return_list = []  # 记录每个回合的return均值

for i in range(10):  # 迭代10回合
    episode_return = 0  # 累计每条链上的reward
    state = env.reset()[0]  # 初始时的状态
    done = False  # 回合结束标记

    while not done:
        # 获取当前状态对应的动作
        action = agent.take_action(state)
        # 环境更新
        next_state, reward, done, _, _ = env.step(action)
        # 更新经验回放池
        replay_buffer.add(state, action, reward, next_state, done)
        # 状态更新
        state = next_state
        # 累计每一步的reward
        episode_return += reward

        # 如果经验池超过容量,开始训练
        if replay_buffer.size() > args.min_size:
            # 经验池随机采样batch_size组
            s, a, r, ns, d = replay_buffer.sample(args.batch_size)
            # 构造数据集
            transition_dict = {
                'states': s,
                'actions': a,
                'rewards': r,
                'next_states': ns,
                'dones': d,
            }
            # 模型训练
            agent.update(transition_dict)
    
    # 保存每一个回合的回报
    return_list.append(episode_return)
    mean_return_list.append(np.mean(return_list[-10:]))  # 平滑

    # 打印回合信息
    print(f'iter:{i}, return:{episode_return}, mean_return:{np.mean(return_list[-10:])}')

# 关闭动画窗格
env.close()

# -------------------------------------- #
# 绘图
# -------------------------------------- #

x_range = list(range(len(return_list)))

plt.subplot(121)
plt.plot(x_range, return_list)  # 每个回合return
plt.xlabel('episode')
plt.ylabel('return')
plt.subplot(122)
plt.plot(x_range, mean_return_list)  # 每回合return均值
plt.xlabel('episode')
plt.ylabel('mean_return')

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/3123.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【洛谷刷题】蓝桥杯专题突破-深度优先搜索-dfs(10)

目录 写在前面: 题目:P1019 [NOIP2000 提高组] 单词接龙 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 题目描述: 输入格式: 输出格式: 输入样例: 输出样例: 解题思路: 代…

【数据结构】顺序表和链表

目录.顺序表.链表比较.顺序表 线性表的顺序存储结构,使用一段物理地址连续的存储单元以此存储数据单元的线性结构(从头开始连续存储) 静态顺序表:使用定长数组存储动态顺序表:使用动态开辟的数组存储(常用…

第十三届蓝桥杯省赛 python B组复盘

文章目录前言主要内容🦞试题 A:排列字母思路代码🦞试题 B:寻找整数思路代码🦞试题 C:纸张尺寸思路代码🦞试题 D:数位排序思路代码🦞试题 E:蜂巢思路代码&…

打印菱形、三角形-课后程序(JavaScript前端开发案例教程-黑马程序员编著-第2章-课后作业)

【案例2-10】打印菱形、三角形 一、案例描述 考核知识点 for双重循环 练习目标 掌握for循环应用。打印出菱形打印出三角形。 需求分析 在本案例中我们将用JavaScript代码在页面中用“*”打印出菱形和三角形。 案例分析 菱形效果如图2-16所示。输入菱形行数6打印菱形 三角形…

计及光伏波动性的主动配电网有功无功协调优化(Matlab代码实现)

👨‍🎓个人主页:研学社的博客💥💥💞💞欢迎来到本博客❤️❤️💥💥🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密…

JVM知识整理

JVM知识整理 JVM的主要组成部分 JVM包含两个两个子系统(类加载子系统和执行引擎)和两个组件(运行时数据区与和本地库接口) 类加载子系统:根据给定的全限定类名来加载class文件到运行时数据区域中的方法区。执行引擎&a…

学大数据算跟风吗?

随着互联网、物联网和人工智能等技术的不断发展,大数据技术逐渐进入人们的视野,成为一个备受关注的热点话题。那么,大数据专业好学吗?前景如何?下面我们来一起探讨一下。 一、大数据专业的学习难度 大数据技术是一种综…

将 XLS 转换为 EXE:xlCompiler Crack

只需单击几下即可将Excel文件转换为应用程序 xl编译器无需编程即可将您的Excel电子表格转换为软件应用程序 将 XLS 转换为 EXE 将Excel文件转换为具有保护选项的应用程序。Excel 到 EXE 转换器为您提供了分发 Excel 模型的竞争优势和灵活性。将 Excel 的功能丰富的环境保存在应…

一文了解Gralde

🏠个人主页:shark-Gao 🧑个人简介:大家好,我是shark-Gao,一个想要与大家共同进步的男人😉😉 🎉目前状况:23届毕业生,目前在某公司实习&#x1f…

蓝桥杯·3月份刷题集训Day02

本篇博客旨在记录自已打卡蓝桥杯3月份刷题集训,同时会有自己的思路及代码解答希望可以给小伙伴一些帮助。本人也是算法小白,水平有限,如果文章中有什么错误之处,希望小伙伴们可以在评论区指出来,共勉💪。 文…

第14届蓝桥杯STEMA测评真题剖析-2023年3月12日Scratch编程初中级组

[导读]:超平老师的《Scratch蓝桥杯真题解析100讲》已经全部完成,后续会不定期解读蓝桥杯真题,这是Scratch蓝桥杯真题解析第113讲。 蓝桥杯选拔赛现已更名为STEMA,即STEM 能力测试,是蓝桥杯大赛组委会与美国普林斯顿多…

JavaScript 应用

目录 1、编程实现“计算任意区间内连续自然数的累加和”页面。 代码实现 2、应用 appendChild()方法和 getElementById()方法实现年月日的联动功能。 代码 1、编程实现“计算任意区间内连续自然数的累加和”页面。 (1)文档结构的创建 启动程序&#…

若依框架---权限管理设计

前言 若依权限管理包含两个部分:菜单权限 和 数据权限。菜单权限控制着我们可以执行哪些操作。数据权限控制着我们可以看到哪些数据。 菜单是一个概括性名称,可以细分为目录、菜单和按钮,以若依自身为例: 目录,就是页…

acm省赛:高桥和低桥(三种做法:区间计数、树状数组、线段树)

题目描述 有个脑筋急转弯是这样的:有距离很近的一高一低两座桥,两次洪水之后高桥被淹了两次,低桥却只被淹了一次,为什么?答案是:因为低桥太低了,第一次洪水退去之后水位依然在低桥之上&#xff…

Linux内核IO基础知识与概念

什么是 IO在计算机操作系统中,所谓的I/O就是 输入(Input)和输出(Output),也可以理解为读(Read)和写(Write),针对不同的对象,I/O模式可以划分为磁盘…

<Linux>进程控制

进程控制 文章目录进程控制一、进程创建1.fork函数认识2.写时拷贝3.fork常规用法4.fork调用失败的原因二、进程终止1.进程退出场景2.进程退出码3.进程退出的方式三、进程等待1.进程等待是什么?2.进程等待的必要性3.进程等待的方法3.1.wait函数3.2.waitpid函数4.如何…

为什么 ChatGPT 输出时经常会中断,需要输入“继续” 才可以继续输出?

作者:明明如月学长, CSDN 博客专家,蚂蚁集团高级 Java 工程师,《性能优化方法论》作者、《解锁大厂思维:剖析《阿里巴巴Java开发手册》》、《再学经典:《EffectiveJava》独家解析》专栏作者。 热门文章推荐…

树莓派Pico开发板I2C OLED显示模块接口与MicroPython编程

首先简要介绍I2C接口及I2C接口OLED显示模块,然后讲述Pico开发板I2C总线引脚及其与I2C总线OLED SSD1306显示模块的接口原理,最后给出Pico开发板控制OLED屏显示文字/图形的MicroPython程序实例。 一、I2C接口简介 I2C/IIC/I2C(Inter-Integrated…

Linux内核Socket通信原理和实例讲解

关于对 Socket 的认识,大致分为下面几个主题,Socket 是什么,Socket 是如何创建的,Socket 是如何连接并收发数据的,Socket 套接字的删除等。Socket 是什么以及创建过程一个数据包经由应用程序产生,进入到协议…

平板触控笔哪些品牌好?ipad触控笔推荐平价

苹果电容笔与平替电容笔两者需要根据我们的预算以及需求去选择,要是日常多用于用于绘画,建议可以用Apple Pencil,而对于日常仅仅用于学习与记笔记,可以用平替电容笔,由于平替电容笔的品质与表现都非常优秀。小编整理了…