强化学习Double DQN方法玩雅达利Breakout游戏完整实现代码与评估pytorch

1. 实验环境

1.1 硬件配置

  • 处理器:2*AMD EPYC 7773X 64-Core
  • 内存:1.5TB
  • 显卡:8*NVIDIA GeForce RTX 3090 24GB

1.2 工具环境

  • Python:3.10.12
  • Anaconda:23.7.4
  • 系统:Ubuntu 22.04.3 LTS (GNU/Linux 5.15.0-91-generic x86_64)
  • IDE:VS Code 1.85.1
  • gym:0.26.2
  • Pytorch:2.1.2

2. 实现

2.1 Breakout for Atari 2600

Breakout是一款经典的雅达利游戏,也就是我们所熟知的“打砖块”。玩家需要左右移动在屏幕下方的短平板子将一颗不断弹跳的小球反弹回屏幕上方,使其将一块块矩形砖块组成的六行砖块墙面打碎,并防止小球从屏幕底部掉落。在Atari 2600版本的Breakout中,玩家共有5次小球掉落机会,一旦用完就标志游戏结束,每打掉一块砖块得1分,全部打掉则游戏胜利结束。

图2-1 Breakout for Atari 2600游戏示意图

图2-1 Breakout for Atari 2600游戏示意图

在由OpenAI编写和维护的公开库Gym中,具备对Atari Breakout游戏的强化学习环境实现,无需自行编写。

2.2 Double Deep-Q Network

Deep-Q Network (DQN)方法是一种利用深度神经网络进行动作价值函数近似的Q-Learning强化学习方法。从价值函数学习的角度来说,在最朴素的Q-Learning方法中,对于状态空间和动作空间离散且简单的环境,可以使用Q table直接学习动作价值函数,从而使用贪心策略从Q table中选择动作价值最高的动作。然而更多情况下的动作价值函数并不能由Q table直接表示,因此衍生出了动作价值函数近似方法,可以引入多样的近似手段实现动作价值函数的近似逼近,这些手段包括简单的线性函数近似、更为强大的神经网络、决策树以及基于傅里叶或小波变换的方法等。而本节所实现的DQN,就是使用深度神经网络来近似动作价值函数。从经验采样学习的角度来说,对动作价值函数的近似可以在蒙特卡洛 (Monte Carlo)方法和时序差分 (Temporal Difference, TD)方法上进行实现,而其中蒙特卡洛方法需要采样完整的一幕后才能进行学习,而时序差分方法可以只采样部分步骤,特别是TD(0)时序差分方法,在每步都可以进行价值函数更新。而本节所实现的DQN就是一种TD(0)时序差分方法。
下面将具体介绍如何实现Double Deep-Q Network (DDQN)强化学习方法,用于在Breakout游戏上进行训练和评估。

2.2.1 基于卷积神经网络的Deep Q Network实现

Deep Q Network (DQN)的输入连续4帧游戏屏幕图像的4通道84*84图像,进入三个卷积层,在最后的全连接层展平输出动作概率,每层后均使用ReLU激活函数激活。详细的DQN神经网络结构参数如表2-1和图2-2所示。

表2-1 本节Deep Q Network结构参数
层类型\参数名称输入通道数/特征数输出通道数/特征数核尺寸步长
Conv143284
Conv2326442
Conv3646431
Dense7764512\\
Out5124\\

在这里插入图片描述

图2-2 本节Deep-Q Network卷积过程示意图

2.2.2 Double DQN agent

在普通的DQN agent中,只有一个Q-Network用于估计动作价值函数时,存在过估计问题,会导致学习到的策略不稳定。Hasselt等人2015年提出的Double Q-Learning很好缓解了过估计问题[ Deep Reinforcement Learning with Double Q-learning]。其中agent使用主网络和目标网络,主网络用于计算当前状态下每个动作的估计值,而目标网络则用于计算下一个状态的最大动作价值的估计值。
动作选择方面,采用的是典型的epsilon-greedy策略。每步在当前状态下以的概率选择随机动作,以 ( 1 − ϵ ) (1-\epsilon) (1ϵ)的概率通过贪心策略选择主网络给出的最大动作价值对应的动作。
遵循DQN agent的经典设置,本节也采用了经验回放缓冲区 (Experience Replay Buffer)方法,用于在训练时将采样的 ( s , a , r , s ′ ) (s,a,r,s') (s,a,r,s)轨迹缓存,并提供随机批量采样供给网络进行小批量学习,可以有效提高训练稳定性和效率[ Playing Atari with Deep Reinforcement Learning]。本节Double DQN方法中的经验缓冲区在缓存满前并不开始训练,具有冷启动特征。
对于主网络的学习,使用SmoothL1Loss,使用目标网络的价值估计结果作为监督,与主网络的价值估计结果计算loss,并对主网络进行梯度反向传播更新参数。

L i ( θ i ) = E ( s , a , r , s ′ ) ∼ D [ S m o o t h L 1 L o s s ( y i , Q ( s , a ; θ i ) ) ] L_i(\theta _i) = \mathbb{E}_{(s,a,r,s') \sim D} [SmoothL1Loss(y_i,Q(s,a;\theta_i))] Li(θi)=E(s,a,r,s)D[SmoothL1Loss(yi,Q(s,a;θi))]

y i = E ( s , a , r , s ′ ) ∼ D [ r + γ max ⁡ a ′ Q ( s ′ , a ′ ; θ c ∗ ⌊ i / c ⌋ ) ] y_i=\mathbb{E}_{(s,a,r,s') \sim D} [r+\gamma \max_{a'}Q(s',a';\theta_{c* \lfloor i/c \rfloor })] yi=E(s,a,r,s)D[r+γamaxQ(s,a;θci/c)]

S m o o t h L 1 L o s s ( x , y ) = 1 n ∑ j = 1 n { 0.5 ( x j − y j ) 2 i f ∣ x j − y j ∣ < δ δ ∗ ∣ x j − y j ∣ − 0.5 ∗ δ 2 o t h e r w i s e SmoothL1Loss(x,y)=\frac{1}{n} \sum^n_{j=1} \left\{ \begin{aligned} & 0.5(x_j-y_j)^2 & if |x_j-y_j|<\delta\\ & \delta * |x_j-y_j|-0.5*\delta^2 &otherwise \end{aligned} \right. SmoothL1Loss(x,y)=n1j=1n{0.5(xjyj)2δxjyj0.5δ2ifxjyj<δotherwise

其中 Q ( s , a ; θ i ) Q(s,a;\theta_i) Q(s,a;θi)是主网络在第 i i i次迭代时,在状态 s s s下采取动作 a a a所估计的动作价值, y i y_i yi是由奖励 r r r和目标网络计算组合得到的目标值。特别地,由于目标网络每隔 c c c次迭代与主网络参数同步一次,因此除了初始时相同,其余时候的目标网络参数为上次同步,即第 c ∗ ⌊ i / c ⌋ c* \lfloor i/c \rfloor ci/c次迭代的参数。

2.2.3 模型训练技巧

平衡探索与利用:这是强化学习方法中几乎无法避开的主题,探索和利用的平衡牵涉到模型性能和训练效率,因此需要设置合适地训练策略来使得agent平衡探索和利用。在本节的实现中,探索与利用的平衡主要通过训练时选择动作所使用的epsilon-greedy策略来实现,更具体而言是由参数epsilon控制。本节方法首先在前期一定步数内只进行完全随机动作选择来进行探索。此外还采用了指数衰减式的动态epsilon,使得训练前期可以进行最大化的探索,而在后期则最大化利用agent学习的策略。如下式,在第 t t t步时的定义为:
ϵ t = ϵ E N D + ( ϵ S T A R T − ϵ E N D ) ∗ e − t / ϵ D E C A Y \epsilon_t = \epsilon _{END} + (\epsilon _{START}-\epsilon _{END})*e^{-t/\epsilon _{DECAY}} ϵt=ϵEND+(ϵSTARTϵEND)et/ϵDECAY
其中 ϵ S T A R T \epsilon _{START} ϵSTART为epsilon在起步时的最大值,\epsilon _{END}为衰减结束时的最小值, ϵ D E C A Y \epsilon _{DECAY} ϵDECAY为衰减过程控制系数, ϵ D E C A Y \epsilon _{DECAY} ϵDECAY越大,衰减过程越长。当 t t t不断增大,$\epsilon_t $的值将不断逼近。
Gym环境配置:Gym提供了wrapper方式对环境进行修改配置,结合训练效率方面的考虑,需要对环境的可观测状态进行和每幕结束时机进行修改。
在Breakout-v5环境中,可观测状态主要是一幅三通道彩色图像,形状为(210, 160, 3),值为[0,255]的8bit无符号整数。由于彩色在此游戏中并没有特殊含义,去除后不影响游戏进行,因此首先将将图像的三通道彩色通过OpenCV库处理为单通道灰度图,形状为(210, 160)。进一步地,为了节省存储空间,加快训练速度,将该灰度图压缩变形为(84, 84)的方形图像。此外,在Breakout游戏中,玩家每次有5次机会,当小球第5次掉落时游戏结束,只能重置。为了使得agent学习到不让小球掉落的重要性,训练时将每次小球掉落作为一幕的结束,而本文若无特殊说明,则一幕仍指以第5次小球掉落为结束的一局游戏。

3. 分析评估

本节将分析介绍2.2和2.3小节实现的Double DQN方法在2.1小结介绍的Breakout游戏上的训练和评估表现。
设置超参数如表3-1所示,本节所有针对Double DQN的实验除特殊说明外所有超参数均相同。

表3-1 Double DQN训练评估超参数设置
超参数名称作用
BATCH_SIZE32批量大小
GAMMA0.99折扣系数
EPS_START1见2.3节描述
EPS_END0.02见2.3节描述
EPS_DECAY1e6见2.3节描述
EPS_RANDOM_COUNT5e4见2.3节描述
LR1e-4优化器学习率
INITIAL_MEMORY1e4Replay Buffer初始大小
MEMORY_SIZE1e5Replay Buffer存储上限大小
N_EPISODE1e5训练幕数(一次生命为一幕)
TARGET_UPDATE1e3目标网络更新频率

首先设置实验定量定性地探究Double DQN的Replay Buffer和fixed target对于模型性能的影响。因此利用单一变量原则,只设置MEMORY_SIZE,分别为2e4、1e5和5e5各进行一次训练;此外只设置TARGET_UPDATE,分别为2e2、1e5和5e3各进行一次训练。上述训练均以100幕(一次生命为一幕)为一个Epoch,得到如下表3-2的6组实验结果(TARGET_UPDATE=1e3的已经在MEMORY_SIZE=1e5实验训练过,因此两个实验的结果相同)。

表3-2 Double DQN agent在Breakout游戏上改变单一超参数训练的结果统计
DQN模型超参数100 Epochs
训练耗时(分钟)最高Epoch平均幕回报最高单幕回报与所在幕数
TARGET_UPDATE=2e2122418.8351 at 14495 ep
TARGET_UPDATE=1e3195030.4485 at 19715 ep
TARGET_UPDATE=5e3167743.44265 at 15186 ep
MEMORY_SIZE=2e4101422.3467 at 17609 ep
MEMORY_SIZE=1e5195030.4485 at 19715 ep

将上述训练过程的每个Epoch的平均幕奖励绘制为折线图,如图3-1所示。结合图表,可见TARGET_UPDATE越大,模型表现越好,且收敛时间与之并非正相关。当TARGET_UPDATE为5e3时模型收敛最好,收敛速度适中。而MEMORY_SIZE参数同样也是越大模型效果越好,但是其训练耗时同样存在很大的增长,相差5倍的MEMORY_SIZE在收敛时间上相差近1000分钟。综合模型收敛效果和速度,在合适的MEMORY_SIZE下适当增加TARGET_UPDATE可以获得最优的收敛性能。

在这里插入图片描述

图3-1 Double DQN在Breakout游戏上改变不同超参数训练的每Epoch平均幕奖励曲线

对Double DQN方法的不同超参数训练分支,均使用历史最优模型进行评估测试,即每个模型进行100幕游戏,并统计每个模型的最高、最低和平均单幕得分,同时绘制每幕得分的箱线图。

在这里插入图片描述

图3-3 Double DQN不同超参模型的100幕评估测试得分箱线图
表3-3 Double DQN不同超参模型的100幕评估测试得分统计
模型平均奖励最低幕奖励最高幕奖励
DDQN(TU=2e2)7.53.023.0
DDQN(TU=1e3)31.57.068.0
DDQN(TU=5e3)38.67.079.0
DDQN(MS=2e4)20.03.039.0
DDQN(MS=5e5)24.06.051.0

*DDQN指Double DQN; TU指TARGET_UPDATE; MS指MEMORY_SIZE;

图3-3描绘了Double DQN在不同超参数下训练的模型在100幕Breakout游戏上评估所得单幕回报的箱线图。从中可以发现Double DQN方法在目标网络更新频率为5e3时的评估效果总体最好,得分上限最高,但相对不稳定;而Double DQN在目标网络更新频率为2e2时的评估效果虽然稳定但总体最差。这一结论与根据图3-1得出的结论基本一致。

4.结论与展望

通过实验过程和结果的分析,我们可以得出以下结论:
Double DQN的超参数选择影响模型性能:在实验中,我们发现Double DQN方法在目标网络更新频率为5e3时表现最好,具有最高的平均幕回报。同时,实验结果还表明MEMORY_SIZE参数的增加可以提升模型性能,但同时也导致训练时间的显著增加。在选择超参数时,需要平衡模型性能和训练效率。
Double DQN在不同超参数下的评估结果有较大差异: 尽管在目标网络更新频率为5e3时Double DQN表现最好,但其评估结果相对不稳定,具有较大的方差。这提示我们在选择超参数时不仅要考虑性能指标的提升,还要关注模型的稳定性。
平衡探索与利用的重要性:在强化学习中,平衡探索和利用是一个重要的主题。在实验中,我们使用了epsilon-greedy策略在DQN中来平衡探索和利用。通过调整epsilon的衰减方式,我们可以在训练的不同阶段进行不同程度的探索和利用,从而提高模型的学习效率。

基于上述的结论和对本次整个实验过程的分析总结,在此对未来的工作和本文不足之处进行以下总结展望。
进一步优化超参数:未来的工作可以通过更系统地调整超参数,尤其是对于Double DQN方法中的其他超参数,来寻找更优的组合,以提高模型性能和训练效率。
尝试其他强化学习算法:除了DQN,还有许多其他强化学习算法可以尝试,例如PPO、DDPG等。对比不同算法在相同环境下的表现,有助于更全面地了解它们的优劣势。
探索更复杂的游戏环境:在本实验中,我们使用了Atari 2600版本的Breakout游戏作为测试环境。未来的工作可以尝试在更复杂的游戏环境中验证模型的性能,以更好地适应现实世界的复杂任务。
深入研究模型稳定性:对于Double DQN在不同超参数下评估结果不稳定的问题,未来的研究可以更深入地探讨如何提高模型的稳定性,以确保在不同训练阶段都能取得良好的性能。

5. 代码

import time
import random
import numpy as np
import ale_py
# import gymnasium as gym
import gym
import torch
import torch.nn as nn
from torch import optim
from torch.autograd import Variable
import torch.nn.functional as F
from collections import deque,namedtuple
from tqdm import tqdm
import matplotlib.pyplot as plt
import cv2
from itertools import count
import random, pickle, os.path, math, glob

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

Transition = namedtuple('Transion', ('state', 'action', 'next_state', 'reward'))

## 超参数
# epsilon = 0.9
BATCH_SIZE = 32
GAMMA = 0.99
EPS_START = 1
EPS_END = 0.02
EPS_DECAY = 1000000
EPS_RANDOM_COUNT = 50000 # 前50000步纯随机用于探索
TARGET_UPDATE = 1000 # steps
RENDER = False
lr = 1e-4
INITIAL_MEMORY = 10000
MEMORY_SIZE = 10 * INITIAL_MEMORY
n_episode = 100000#10000000


MODEL_STORE_PATH = './models'#+'DQN_pytorch_pong'
modelname = 'DQN_Breakout'
madel_path = MODEL_STORE_PATH + 'DQN_Breakout_episode60.pt'#+ '/' + 'model/'


## 超参数
# epsilon = 0.9
BATCH_SIZE = 32
GAMMA = 0.99
EPS_START = 1
EPS_END = 0.02
EPS_DECAY = 1000000
EPS_RANDOM_COUNT = 50000 # 前50000步纯随机用于探索
TARGET_UPDATE = 1000 # steps
RENDER = False
lr = 1e-4
INITIAL_MEMORY = 10000
MEMORY_SIZE = 10 * INITIAL_MEMORY
n_episode = 100000#10000000


MODEL_STORE_PATH = './models'#+'DQN_pytorch_pong'


class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity #移动指针,经验池满了之后从最开始的位置开始将最近的经验存进经验池

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)# 从经验池中随机采样

    def __len__(self):
        return len(self.memory)
        
class DQN(nn.Module):
    def __init__(self, in_channels=4, n_actions=14):
        """
        Initialize Deep Q Network
        Args:
            in_channels (int): number of input channels
            n_actions (int): number of outputs
        """
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=8, stride=4)
        # self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        # self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        # self.bn3 = nn.BatchNorm2d(64)
        self.fc4 = nn.Linear(7*7*64, 512)
        self.head = nn.Linear(512, n_actions)

    def forward(self, x):
        # print(x)
        # print(x.shape)
        # cv2.imwrite("test_x.png",x.cpu().numpy()[0][0])
        x = x.float() / 255
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = x.view(x.size(0), -1)  # 将卷积层的输出展平
        x = F.relu(self.fc4(x)) #.view(x.size(0), -1)
        out = self.head(x)
        # print(out)
        return out

class DQN_agent():
    def __init__(self,in_channels=4, action_space=[], learning_rate=1e-4, memory_size=10000, epsilon=1, trained_model_path=''):

        self.in_channels = in_channels
        self.action_space = action_space
        self.action_dim = self.action_space.n

        self.memory_buffer = ReplayMemory(memory_size)
        self.stepdone = 0
        self.DQN = DQN(self.in_channels, self.action_dim).to(device)
        self.target_DQN = DQN(self.in_channels, self.action_dim).to(device)
        # 加载之前训练好的模型,没有预训练好的模型时可以注释
        if(trained_model_path != ''):
            self.DQN.load_state_dict(torch.load(trained_model_path))

        self.target_DQN.load_state_dict(self.DQN.state_dict())
        self.optimizer = optim.RMSprop(self.DQN.parameters(),lr=learning_rate, eps=0.001, alpha=0.95)



    def select_action(self, state):

        self.stepdone += 1
        state = state.to(device)
        epsilon = EPS_END + (EPS_START - EPS_END)* \
            math.exp(-1. * self.stepdone / EPS_DECAY)            # 随机选择动作系数epsilon 衰减,也可以使用固定的epsilon
        # epsilon-greedy策略选择动作
        if self.stepdone<EPS_RANDOM_COUNT or random.random()<epsilon:
            action = torch.tensor([[random.randrange(self.action_dim)]], device=device, dtype=torch.long)
        else:
            action = self.DQN(state).detach().max(1)[1].view(1,1)  # 选择Q值最大的动作并view

        return action


    def learn(self):
        # 经验池小于BATCH_SIZE则直接返回
        if self.memory_buffer.__len__()<BATCH_SIZE:
            return
        # 从经验池中采样

        transitions = self.memory_buffer.sample(BATCH_SIZE)
        '''
        batch.state - tuple of all the states (each state is a tensor)                  (BATCH_SIZE * channel * h * w)
        batch.next_state - tuple of all the next states (each state is a tensor) (BATCH_SIZE * channel * h * w)
        batch.reward - tuple of all the rewards (each reward is a float)          (BATCH_SIZE * 1)
        batch.action - tuple of all the actions (each action is an int)               (BATCH_SIZE * 1)
        '''
        
        batch = Transition(*zip(*transitions))

        actions = tuple((map(lambda a: torch.tensor([[a]], device=device), batch.action)))
        rewards = tuple((map(lambda r: torch.tensor([r], device=device), batch.reward)))


        # 判断是不是在最后一个状态,最后一个状态的next设置为None
        non_final_mask = torch.tensor(
            tuple(map(lambda s: s is not None, batch.next_state)),
            device=device, dtype=torch.uint8).bool()

        non_final_next_states = torch.cat([s for s in batch.next_state
                                           if s is not None]).to(device)


        state_batch = torch.cat(batch.state).to(device)
        action_batch = torch.cat(actions)
        reward_batch = torch.cat(rewards)
        # 计算当前状态的Q值
        state_action_values = self.DQN(state_batch).gather(1, action_batch)

        next_state_values = torch.zeros(BATCH_SIZE, device=device)
        next_state_values[non_final_mask] = self.target_DQN(non_final_next_states).max(1)[0].detach()
        expected_state_action_values = (next_state_values * GAMMA) + reward_batch

        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
        self.optimizer.zero_grad()
        loss.backward()
        for param in self.DQN.parameters():
            param.grad.data.clamp_(-1, 1)

        self.optimizer.step()

class Trainer():
    def __init__(self, env, agent, n_episode):
        self.env = env
        self.n_episode = n_episode
        self.agent = agent
        # self.losslist = []
        self.rewardlist = []
        self.avg_rewardlist = []

    # 获取当前状态,将env返回的状态通过transpose调换轴后作为状态
    def get_state(self,obs):
        # print(obs.shape)
        state = np.array(obs)
        # state = state.transpose((1, 2, 0)) #将2轴放在0轴之前
        state = torch.from_numpy(state)
        return state.unsqueeze(0)    # 转化为四维的数据结构


 # 训练智能体
    def train(self):
        for episode in range(self.n_episode):

            obs = self.env.reset()

            # print('============obs = self.env.reset()============')
            # state = self.img_process(obs)
            state = np.stack((obs[0], obs[1], obs[2], obs[3]))
            # print(state.shape)
            state = self.get_state(state)
            # print(state.shape)
            episode_reward = 0.0

            # print('episode:',episode)
            for t in count():
                # print(state.shape)
                action = self.agent.select_action(state)
                if RENDER:
                    self.env.render()


                obs,reward,done,_,_ = self.env.step(action)
                episode_reward += reward

                if not done:
                    # next_state = self.get_state(obs)

                    # next_state = self.img_process(obs)
                    next_state = np.stack((obs[0], obs[1], obs[2], obs[3]))
                    next_state = self.get_state(next_state)
                else:
                    next_state = None
                # print(next_state.shape)
                reward = torch.tensor([reward], device=device)

                # 将四元组存到memory中
                '''
                state: batch_size channel h w    size: batch_size * 4
                action: size: batch_size * 1
                next_state: batch_size channel h w    size: batch_size * 4
                reward: size: batch_size * 1
                '''
                self.agent.memory_buffer.push(state, action.to('cpu'), next_state, reward.to('cpu')) # 里面的数据都是Tensor
                # print('========memory_buffer.push=========')
                # print(state.shape)
                # print(action.shape)
                # print(next_state.shape)
                # print(reward)
                # cv2.imwrite("test_1.png",state[0].numpy()[0])
                # cv2.imwrite("test_2.png",state[0].numpy()[1])
                # cv2.imwrite("test_3.png",state[0].numpy()[2])
                # cv2.imwrite("test_4.png",state[0].numpy()[3])
                # time.sleep(0.1)
                
                state = next_state
                # 经验池满了之后开始学习
                if self.agent.stepdone > INITIAL_MEMORY:
                    self.agent.learn()
                    if self.agent.stepdone % TARGET_UPDATE == 0:
                        print('======== target DQN updated =========')
                        self.agent.target_DQN.load_state_dict(self.agent.DQN.state_dict())

                if done:
                    break
            agent_epsilon = EPS_END + (EPS_START - EPS_END)* math.exp(-1. * self.agent.stepdone / EPS_DECAY) 
                 
            print('Total steps: {} \t Episode/steps: {}/{} \t Total reward: {} \t Avg reward: {} \t epsilon: {}'.format(
                self.agent.stepdone, episode, t, episode_reward, episode_reward/t, agent_epsilon))
            
            
            if episode % 20 == 0:
                torch.save(self.agent.DQN.state_dict(), MODEL_STORE_PATH + '/' + "{}_episode{}.pt".format(modelname, episode))
                # print('Total steps: {} \t Episode: {}/{} \t Total reward: {}'.format(self.agent.stepdone, episode, t, episode_reward))

            self.rewardlist.append(episode_reward)
            self.avg_rewardlist.append(episode_reward/t)
            


            self.env.close()
        return

    #绘制单幕总奖励曲线
    def plot_total_reward(self):
        plt.plot(self.rewardlist)
        plt.xlabel("Training epochs")
        plt.ylabel("Total reward per episode")
        plt.title('Total reward curve of DQN on Skiing')
        plt.savefig('DQN_train_total_reward.png')
        plt.show()

    #绘制单幕平均奖励曲线
    def plot_avg_reward(self):
        plt.plot(self.avg_rewardlist)
        plt.xlabel("Training epochs")
        plt.ylabel("Average reward per episode")
        plt.title('Average reward curve of DQN on Skiing')
        plt.savefig('DQN_train_avg_reward.png')
        plt.show()
# reward clip
class ClipRewardEnv(gym.RewardWrapper):
    def __init__(self, env):
        gym.RewardWrapper.__init__(self, env)

    def reward(self, reward):
        """Bin reward to {+1, 0, -1} by its sign."""
        return np.sign(reward)
    
# image frame process
class WarpFrame(gym.ObservationWrapper):
    def __init__(self, env, width=84, height=84, grayscale=True):
        """Warp frames to 84x84 as done in the Nature paper and later work."""
        gym.ObservationWrapper.__init__(self, env)
        self.width = width
        self.height = height
        self.grayscale = grayscale
        if self.grayscale:
            self.observation_space = gym.spaces.Box(low=0, high=255,
                shape=(self.height, self.width, 1), dtype=np.uint8)
        else:
            self.observation_space = gym.spaces.Box(low=0, high=255,
                shape=(self.height, self.width, 3), dtype=np.uint8)

    def observation(self, frame):
        if self.grayscale:
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        frame = cv2.resize(frame, (self.width, self.height), interpolation=cv2.INTER_AREA)
        if self.grayscale:
            frame = np.expand_dims(frame, -1)
        return frame
    
class ScaledFloatFrame(gym.ObservationWrapper):
    def __init__(self, env):
        gym.ObservationWrapper.__init__(self, env)
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=env.observation_space.shape, dtype=np.float32)

    def observation(self, observation):
        # careful! This undoes the memory optimization, use
        # with smaller replay buffers only.
        return np.array(observation).astype(np.float32) / 255.0
    
# Frame Stacking
class FrameStack(gym.Wrapper):
    def __init__(self, env, k):
        """
        Stack k last frames.
        Returns lazy array, which is much more memory efficient.
        See Also
        --------
        baselines.common.atari_wrappers.LazyFrames
        """
        gym.Wrapper.__init__(self, env)
        self.k = k
        self.frames = deque([], maxlen=k)
        shp = env.observation_space.shape
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(shp[:-1] + (shp[-1] * k,)), dtype=env.observation_space.dtype)

    def reset(self):
        ob = self.env.reset()
        if isinstance(ob, tuple): # obs is tuple in newer version of gym
            ob = ob[0]
        for _ in range(self.k):
            self.frames.append(ob)
        return self._get_ob()

    def step(self, action):
        ob, reward, done, truncated, info = self.env.step(action) 
        if isinstance(ob, tuple):
            ob = ob[0]
        self.frames.append(ob)
        return self._get_ob(), reward, done, truncated, info

    def _get_ob(self):
        assert len(self.frames) == self.k
        return LazyFrames(list(self.frames))
    
class EpisodicLifeEnv(gym.Wrapper):
    def __init__(self, env):
        """Make end-of-life == end-of-episode, but only reset on true game over.
        Done by DeepMind for the DQN and co. since it helps value estimation.
        """
        gym.Wrapper.__init__(self, env)
        self.lives = 0
        self.was_real_done  = True

    def step(self, action):
        obs, reward, done, truncated, info = self.env.step(action)
        self.was_real_done = done
        # check current lives, make loss of life terminal,
        # then update lives to handle bonus lives
        lives = self.env.unwrapped.ale.lives()

        if lives < self.lives and lives > 0:
            # for Qbert sometimes we stay in lives == 0 condition for a few frames
            # so it's important to keep lives > 0, so that we only reset once
            # the environment advertises done.
            done = True
        self.lives = lives
        return obs, reward, done, truncated, info

    def reset(self, **kwargs):
        """Reset only when lives are exhausted.
        This way all states are still reachable even though lives are episodic,
        and the learner need not know about any of this behind-the-scenes.
        """
        if self.was_real_done:
            obs, info = self.env.reset(**kwargs)
        else:
            # no-op step to advance from terminal/lost life state
            obs, _, _, _, info = self.env.step(0)
        self.lives = self.env.unwrapped.ale.lives()
        return obs, info
    
class LazyFrames(object):
    def __init__(self, frames):
        """
        This object ensures that common frames between the observations are only stored once.
        It exists purely to optimize memory usage which can be huge for DQN's 1M frames replay
        buffers.
        This object should only be converted to numpy array before being passed to the model.
        You'd not believe how complex the previous solution was.
        """
        self._frames = frames
        self._out = None

    def _force(self):
        if self._out is None:
            self._out = np.concatenate(self._frames, axis=-1)
            self._frames = None
        return self._out

    def __array__(self, dtype=None):
        out = self._force()
        if dtype is not None:
            out = out.astype(dtype)
        return out

    def __len__(self):
        return len(self._force())

    def __getitem__(self, i):
        return self._force()[..., i]
    
def env_wrap_deepmind(env, episode_life=True, clip_rewards=True, frame_stack=True, scale=True):
    """
    Configure environment for DeepMind-style Atari.
    """
    if episode_life:
        env = EpisodicLifeEnv(env)
    env = WarpFrame(env)
    if scale:
        env = ScaledFloatFrame(env) # scale the frame image
    if clip_rewards:
        env = ClipRewardEnv(env) # clip the reward into [-1,1]
    if frame_stack:
        env = FrameStack(env, 4) # stack 4 frame to replace the RGB 3 chanels image
    return env
    
# create environment and warp it into DeepMind style
env = env_wrap_deepmind(gym.make("ALE/Breakout-v5"), episode_life=True, clip_rewards=False, frame_stack=True, scale=False)
action_space = env.action_space

# use 4 stacked chanel
agent = DQN_agent(in_channels = 4, action_space = action_space, learning_rate = lr, memory_size=MEMORY_SIZE)

trainer = Trainer(env, agent, n_episode) # 这里应该用超参数里的n_episode
trainer.train()
trainer.plot_total_reward()
# trainer.plot_avg_reward()

# save total reward list
np.save('total_reward_list_breakout_1e5.npy',np.array(trainer.rewardlist))
np.save('avg_reward_list_breakout_1e5.npy',np.array(trainer.avg_rewardlist))

print('The training costs {} episodes'.format(len(trainer.rewardlist)))

print('The max episode reward is {}, at episode {}'.format(
    max(trainer.rewardlist),
    trainer.rewardlist.index(max(trainer.rewardlist))
    ))

# 合并5 episodes为1episode
assert(len(trainer.rewardlist)%5==0)
reshaped_reward_array = np.array(trainer.rewardlist).reshape((int(len(trainer.rewardlist)/5), 5))
# 沿着第二个维度求和
summed_rewawrd_array = reshaped_reward_array.sum(axis=1)


print('Now takes 5 episodes as 1, the training cost {} complete episodes'.format(len(summed_rewawrd_array)))

print('The max episode return is {}, at episode {}'.format(
    max(summed_rewawrd_array),
    np.where(summed_rewawrd_array == max(summed_rewawrd_array))
    ))

# 合并200 episodes为1episode
assert(len(summed_rewawrd_array)%200==0)
reshaped_reward_array_200 = summed_rewawrd_array.reshape((int(len(summed_rewawrd_array)/200), 200))
# 沿着第二个维度求和
summed_rewawrd_array_200 = reshaped_reward_array_200.sum(axis=1)
avg_rewawrd_array_200 = summed_rewawrd_array_200/200.0

np.save('avg_rewawrd_array_200_breakout_1e5.npy',avg_rewawrd_array_200)

print('The following graph takes 1000 games as 1 epoch where 5 games equals to 1 episode as stated before')

max_idx = np.argmax(avg_rewawrd_array_200)
max_y = max(avg_rewawrd_array_200)
print('The best average return per epoch is {}, at epoch {}'.format(max_idx,max_y))

plt.figure(figsize=(10,6))
plt.plot(avg_rewawrd_array_200,marker='o',markersize=4)
plt.xlabel("Training Epochs",fontsize=12)
plt.ylabel("Average Reward per Episode",fontsize=12)

plt.scatter(max_idx, max_y, color='red', s=60)
plt.annotate(f'max avg return: ({max_idx}, {max_y:.2f})', xy=(max_idx, max_y), xytext=(max_idx-40, max_y-1),
             arrowprops=dict(facecolor='red', shrink=0.05))
# plt.title('Average Reward of DQN on Breakout')
plt.savefig('DQN_train_total_reward_Breakout.svg')
plt.show()

Reference:
https://blog.csdn.net/libenfan/article/details/117395547
https://zhuanlan.zhihu.com/p/80185628

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/305637.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

H264/AVC的句法和语义

概述 码流的基本单位&#xff1a; 在编码器输出的码流中&#xff0c;数据的基本单位是句法元素&#xff0c;每个句法元素由若干比特组成&#xff0c;它表示某个特定的物理意义 &#xff0c;比如宏块类型、量化参数等。 句法&#xff1a;句法表征句法元素的组织结构。 语义&a…

Fluids —— Volume VOP

P&#xff0c;当前体素位置&#xff1b;density&#xff0c;此场的值&#xff1b;ix, iy, iz&#xff0c;体素索引&#xff08;0 ~ res-1&#xff09;&#xff1b;resx, resy, resz&#xff0c;当前volume的精度&#xff1b;center&#xff0c;当前volume的中心点&#xff1b;o…

MYSQL学习之buffer pool的理论学习

MYSQL学习之buffer pool的理论学习 by 小乌龟 文章目录 MYSQL学习之buffer pool的理论学习一、buffer pool是什么&#xff1f;二、buffer pool 的内存结构三、buffer pool 的初始化和配置初始化配置 四、buffer pool 空间管理LRU淘汰法冷热数据分离的LRU算法 一、buffer pool是…

大模型第三节课程笔记

大模型开发范式 优点&#xff1a;具有强大语言理解&#xff0c;指令跟随&#xff0c;和语言生成的能力&#xff0c;具有强大的知识储备和一定的逻辑推理能力&#xff0c;进而能作为基座模型&#xff0c;支持多元应用。 不足&#xff1a;大模型的知识时效性受限&#xff0c;大模…

OLED透明屏多少钱一平方,价格影响因素、计算方法与规格种类

OLED透明屏&#xff0c;以其独特的透明度和出色的画质&#xff0c;正逐渐成为高端显示市场的宠儿。但对于消费者来说&#xff0c;最关心的莫过于其价格。本文将详细解析OLED透明屏的价格&#xff0c;包括影响因素、计算方法以及规格种类。 一、影响因素 OLED透明屏的价格受到多…

Nessus漏洞扫描工具安装、使用技巧及注意事项

Nessus是一款功能强大的安全评估工具&#xff0c;它可以帮助安全团队快速发现网络中潜在的安全风险和漏洞&#xff0c;并对其进行评估和修复。对于渗透测试人员来说&#xff0c;Nessus更是必不可少的工具之一。 1. Nessus安装 获取安装包&#xff0c;官网地址&#xff1a;http…

【Java并发】聊聊concurrentHashMap的put核心流程

结构介绍 1.8中concurrentHashMap采用数组链表红黑树的方式存储&#xff0c;并且采用CASSYN的方式。在1.7中主要采用的是数组链表&#xff0c;segment分段锁reentrantlock。本篇主要在1.8基础上介绍下. 那么&#xff0c;我们的主要重点是分析什么呢&#xff0c;其实主要就是p…

业界首款PCIe 4.0/5.0多通道融合接口SSD技术解读

之前小编写过一篇文章劝大家不要碰PCIe 5.0 SSD&#xff0c;详细内容&#xff0c;可以再回顾下&#xff1a; 扩展阅读&#xff1a;当下最好不要入坑PCIe 5.0 SSD 如果想要进一步了解PCIe 6.0&#xff0c;欢迎点击阅读&#xff1a; 浅析PCIe 6.0功能更新与实现的挑战 PCIe 6.…

【强化学习的数学原理-赵世钰】课程笔记(五)蒙特卡洛方法

目录 一.内容概述 二.激励性实例&#xff08;Motivating examples&#xff09; 三.最简单的基于 MC 的 RL 算法&#xff1a;MC basic 1.将策略迭代转换为无模型迭代&#xff08;Convert policy iteration to be model-free&#xff09; 2.The MC Basic algorithm 3.例子 …

无人驾驶卡尔曼滤波

无人驾驶卡尔曼滤波&#xff08;行人检测&#xff09; x k a x k − 1 w k x_k ax_{k-1} w_k xk​axk−1​wk​ w k w_k wk​&#xff1a;过程噪声 状态估计 估计飞行器状态&#xff08;高度&#xff09; x k z k − v k x_k z_k - v_k xk​zk​−vk​ 卡尔曼滤波通…

vivado 导入工程、TCL创建工程命令、

导入外部项目 您可以使用导入在Vivado IDE外部创建的现有RTL级项目文件Synopsys Synplify。Vivado IDE检测项目中的源文件并自动添加文件到新项目。设置&#xff0c;如顶部模块、目标设备和VHDL库 分配是从现有项目导入的。 1.按照创建项目中的步骤进行操作。 2.在“项目类…

Linux学习(13)——系统安全及应用

一、账号安全基本措施 1、系统账号清理 将非登录用户的Shell设为/sbin/nologin,及将用户设置为无法登录 锁定长期不使用的账户 删除无用的账户 锁定账户密码 本质锁定 shell——/sbin/nologin却比较特殊&#xff0c;所谓“无法登陆”指的仅是这个用户无法使用bash或其他sh…

忆阻器芯片STELLAR权重更新算法(清华大学吴华强课题组)

参考文献&#xff08;清华大学吴华强课题组&#xff09; Zhang, Wenbin, et al. “Edge learning using a fully integrated neuro-inspired memristor chip.” Science 381.6663 (2023): 1205-1211. STELLAR更新算法原理 在权值更新阶段&#xff0c;只需根据输入、输出和误差…

Qt/QML编程学习之心得:一个蓝牙音乐播放器的实现(30)

蓝牙bluetooth作为一种短距离的通信方式应用也是越来越广,比如很多智能家居、蓝牙遥控器、蓝牙音箱、蓝牙耳机、蓝牙手表等,手机的蓝牙功能更是可以和各种设备进行互联,甚至可以连接到车机上去配合wifi提供投屏、音乐等。那么如何在中控IVI上使用Qt来实现一个蓝牙音乐播放器…

山东名岳轩印刷包装携专业包装袋盛装亮相2024济南生物发酵展

山东名岳轩印刷包装有限公司盛装亮相2024第12届国际生物发酵展&#xff0c;3月5-7日山东国际会展中心与您相约&#xff01; 展位号&#xff1a;1号馆F17 山东名岳轩印刷包装有限公司是一家拥有南北两个生产厂区&#xff0c;设计、制版、印刷&#xff0c;营销策划为一体的专业…

编译原理期末大题步骤——例题

一、预测分析方法步骤 提取左公因子&#xff0c;消除左递归判断文法是否为LL(1)文法若是&#xff0c;构造预测分析表&#xff1b;否则&#xff0c;不能进行分析。根据预测分析表对输入串进行分析 例子&#xff1a; 文法G[E]&#xff1a; …

C#:让不安全代码(unsafe)运行起来

C#:让不安全代码&#xff08;unsafe&#xff09;运行起来 当我们VS中编写unsafe代码时&#xff0c;显示不能运行 这时只需要按照这里的提示&#xff0c;修改我们的属性设置即可 这样就可以运行了

AQS应用之BlockingQueue详解

概要 AQS全称是 AbstractQueuedSynchronizer&#xff0c;中文译为抽象队列式同步器。BlockingQueue&#xff0c;是java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的最有用的类&#xff0c;它的特性是在任意时刻只有一个线程可以进行take或者put操作&#xff0…

近似点梯度法

最优化笔记——Proximal Gradient Method 最优化笔记&#xff0c;主要参考资料为《最优化&#xff1a;建模、算法与理论》 文章目录 最优化笔记——Proximal Gradient Method一、邻近算子&#xff08;1&#xff09;定义 二、近似点梯度法&#xff08;1&#xff09;迭代格式&…

4.4 媒资管理模块 - 分布式任务处理介绍、视频处理技术方案

媒资管理模块 - 视频处理 文章目录 媒资管理模块 - 视频处理一、视频转码1.1 视频转码介绍1.2 FFmpeg 基本使用1.2.1 下载安装配置1.2.2 转码测试 1.3 工具类1.3.1 VideoUtil1.3.2 Mp4VideoUtil1.3.3 测试工具类 二、分布式任务处理2.1 分布式任务调度2.2 XXL-JOB 配置执行器 中…