【机器学习】强化学习(六)-DQN(Deep Q-Learning)训练月球着陆器示例

概述

45ceb3a59f72035983dbed6485a850b4.png

Deep Q-Learning(深度 Q 学习)是一种强化学习算法,用于解决决策问题,其中代理(agent)通过学习在不同环境中采取行动来最大化累积奖励。Lunar Lander 是一个经典的强化学习问题,其中代理的任务是控制一个着陆舱在月球表面着陆,最小化着陆过程中的燃料消耗。

以下是使用 Deep Q-Learning 解决 Lunar Lander 问题的基本步骤:

  1. 环境建模: 首先,需要对 Lunar Lander 环境进行建模。这包括定义状态空间、动作空间、奖励函数等。在 Lunar Lander 中,状态可以包括着陆舱的位置、速度、角度等信息,动作可以是推力引擎的火力等。

  2. 深度 Q 网络: 创建一个深度神经网络,该网络将输入状态,并输出每个可能动作的 Q 值。Q 值表示在给定状态下采取某个动作的累积奖励的估计。网络的目标是通过学习调整 Q 值以最大化累积奖励。

  3. 经验回放: 使用经验回放机制来改善学习稳定性。这涉及到存储代理先前的经验,并从中随机抽样进行训练。这有助于解决样本相关性问题,提高算法的收敛性。

  4. ε-贪婪策略: 引入 ε-贪婪策略,以平衡探索和利用。在一部分情况下,代理将以高概率选择当前认为最佳的动作(贪婪),而在其他情况下,它会以较高概率选择一个随机动作(探索)。

  5. 目标 Q 值计算: 使用目标 Q 值来更新网络参数。目标 Q 值通过将当前状态的即时奖励与下一个状态的最大 Q 值相结合得到。这有助于更有效地传播奖励信号。

  6. 训练: 通过与环境的交互,不断地更新深度 Q 网络的参数。代理通过学习来优化其行为,以最大化预期的累积奖励。

  7. 调优对算法的超参数进行调优,包括学习率、折扣因子、神经网络结构等。这有助于提高算法的性能和稳定性。

使用 Deep Q-Learning 解决 Lunar Lander 问题是一个复杂的任务,需要仔细调整和实验。算法的性能可能受到许多因素的影响,包括网络结构的选择、超参数的设置以及环境的建模等。

Lunar Lander环境

这个环境是 Box2D 环境的一部分,其中包含有关环境的一般信息。3d589ea4bd6f8f9071f664dd00f59080.png

描述 Description

该环境是一个经典的火箭轨迹优化问题。根据庞特里亚金最大值原理,最佳的方法是全速点火或关闭引擎。这就是为什么这个环境具有离散动作的原因:引擎开启或关闭。

有两个环境版本:离散或连续。着陆点始终位于坐标 (0,0)。坐标是状态向量中的前两个数字。在着陆点外着陆是可能的。燃料是无限的,因此代理可以学会飞行,然后在第一次尝试时着陆。

要查看启发式着陆,请运行:

python gymnasium/envs/box2d/lunar_lander.py

动作空间 Action Space

有四个可用的离散动作:

0:什么都不做

1:点火左定向引擎

2:点火主引擎

3:点火右定向引擎

观察空间 Observation Space

状态是一个8维向量:着陆器在 x 和 y 上的坐标,它在 x 和 y 上的线速度,它的角度,它的角速度,以及两个表示每条腿是否与地面接触的布尔值。

奖励 Rewards

每一步都会获得一个奖励。一个 episode(回合)总奖励是该 episode 中所有步骤的奖励之和。

对于每一步,奖励:

着陆器离着陆点越近/远,奖励增加/减少。

着陆器移动越慢/快,奖励增加/减少。

着陆器倾斜度越大,奖励减少(角度非水平)。

每个与地面接触的腿奖励增加10分。

每帧侧引擎点火,奖励减少0.03分。

每帧主引擎点火,奖励减少0.3分。

episode 因坠毁或安全着陆而额外获得-100或+100分的奖励。

如果一个 episode 得分至少为200分,则认为它是一个解决方案。

起始状态 Starting State

着陆器位于视口的顶部中心,对其质心施加随机初始力。

回合终止条件 Episode Termination

如果满足以下条件回合结束:

着陆器坠毁(着陆器本体与月球接触);

着陆器超出视口范围(x 坐标大于1);

着陆器处于非唤醒状态。根据 Box2D 文档,处于非唤醒状态的身体是不移动且不与任何其他身体碰撞的身体:

当 Box2D 确定一个身体(或一组身体)已经停止时,该身体进入了一种几乎没有 CPU 开销的休眠状态。如果一个身体醒着并与休眠中的身体发生碰撞,那么休眠中的身体会醒来。如果连接到它们的关节或接触被销毁,身体也会醒来。

参数 Arguments

要使用连续环境,您需要指定 continuous=True 参数,如下所示:

 
 
import gymnasium as gym
env = gym.make(
    "LunarLander-v2",
    continuous: bool = False,
    gravity: float = -10.0,
    enable_wind: bool = False,
    wind_power: float = 15.0,
    turbulence_power: float = 1.5,
)

如果传递 continuous=True,将使用连续动作(对应于引擎的油门),并且动作空间将是 Box(-1, +1, (2,), dtype=np.float32)。动作的第一个坐标确定主引擎的油门,而第二个坐标指定侧推器的油门。给定一个动作 np.array([main, lateral]),如果 main < 0,则主引擎将完全关闭,并且油门在 0 <= main <= 1 时按比例从 50% 缩放到 100%(特别是,主引擎在功率低于 50% 时不起作用)。同样,如果 -0.5 < lateral < 0.5,则侧推器将不会点火。如果 lateral < -0.5,则左推进器将点火,如果 lateral > 0.5,则右推进器将点火。同样,油门在 -1 到 -0.5 之间(以及 0.5 到 1 之间)按比例从 50% 缩放到 100%。

gravity 确定了重力常数,它被限制在 0 和 -12 之间。

如果传递 enable_wind=True,则着陆器将受到风的影响。风是使用函数 tanh(sin(2 k (t+C)) + sin(pi k (t+C))) 生成的。k 设置为 0.01。C 在 -9999 到 9999 之间随机抽样。

wind_power 确定了施加在飞行器上的线性风的最大幅度。wind_power 的推荐值在 0.0 到 20.0 之间。turbulence_power 确定了施加在飞行器上的旋转风的最大幅度。turbulence_power 的推荐值在 0.0 到 2.0 之间。

版本历史 Version History

v2:计算能量消耗,并在 v0.24 版本中,添加了具有风力和 turbulence_power 参数的湍流

v1:在状态向量中添加了与地面接触的腿;与地面接触会奖励 +10 分,如果失去接触则减去 -10 分;奖励重新调整为 200 分;更难的初始随机推力。

v0:初始版本

备注 Notes

在环境实现中存在一些意外的错误。

着陆器身体上侧推器的位置会随着着陆器的方向而变化。这反过来导致对着陆器施加方向依赖性扭矩。

状态的单位不一致。即:

角速度以每秒 0.4 弧度为单位。为了转换为每秒弧度,需要将该值乘以 2.5 的因子。

对于 VIEWPORT_W、VIEWPORT_H、SCALE 和 FPS 的默认值,比例因子相等:'x': 10 'y': 6.666 'vx': 5 'vy': 7.5 'angle': 1 'angular velocity': 2.5

在进行更正后,状态的单位如下:'x':(单位) 'y':(单位) 'vx':(单位/秒) 'vy':(单位/秒) 'angle':(弧度) 'angular velocity':(弧度/秒)

示例代码

################ 完整代码
import gymnasium as gym
from gym.wrappers.monitoring.video_recorder import VideoRecorder
from IPython.display import HTML, display
import imageio
import base64
import io
import glob
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple


######## 创建网络架构
class Network(nn.Module):
    def __init__(self, state_size, action_size, seed=42):
        super(Network, self).__init__()
        self.seed = torch.manual_seed(seed)
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)


    def forward(self, state):
        x = self.fc1(state)
        x = F.relu(x)
        x = self.fc2(x)
        x = F.relu(x)
        return self.fc3(x)


######## 设置环境:使用Gymnasium创建了LunarLander环境
state_shape = env.observation_space.shape
state_size = env.observation_space.shape[0]
number_actions = env.action_space.n
print('State shape: ', state_shape)
print('State size: ', state_size)
print('Number of actions: ', number_actions)
######## 初始化超参数:定义了学习率、批处理大小、折扣因子等超参数。
learning_rate = 5e-4
minibatch_size = 100
discount_factor = 0.99
replay_buffer_size = int(1e5)
interpolation_parameter = 1e-3


####### 实现经验回放:实现了经验回放(Experience Replay)的类 ReplayMemory,用于存储和采样Agent的经验
class ReplayMemory(object):
    def __init__(self, capacity):
        self.device = torch.device(
            "cuda:0" if torch.cuda.is_available() else "cpu")
        self.capacity = capacity
        self.memory = []


    def push(self, event):
        self.memory.append(event)
        if len(self.memory) > self.capacity:
            del self.memory[0]


    def sample(self, batch_size):
        experiences = random.sample(self.memory, k=batch_size)
        states = torch.from_numpy(np.vstack(
            [e[0] for e in experiences if e is not None])).float().to(self.device)
        actions = torch.from_numpy(
            np.vstack([e[1] for e in experiences if e is not None])).long().to(self.device)
        rewards = torch.from_numpy(np.vstack(
            [e[2] for e in experiences if e is not None])).float().to(self.device)
        next_states = torch.from_numpy(np.vstack(
            [e[3] for e in experiences if e is not None])).float().to(self.device)
        dones = torch.from_numpy(np.vstack([e[4] for e in experiences if e is not None]).astype(
            np.uint8)).float().to(self.device)
        return states, next_states, actions, rewards, dones


########## 实现 DQN 代理:创建了一个Agent类,包含本地Q网络和目标Q网络,包含了采取动作、学习、软更新等方法。
class Agent():
    # 初始化函数,参数为状态大小和动作大小
    def __init__(self, state_size, action_size):
        self.device = torch.device(
            "cuda:0" if torch.cuda.is_available() else "cpu")
        self.state_size = state_size
        self.action_size = action_size
        self.local_qnetwork = Network(state_size, action_size).to(self.device)
        self.target_qnetwork = Network(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(
            self.local_qnetwork.parameters(), lr=learning_rate)
        self.memory = ReplayMemory(replay_buffer_size)
        self.t_step = 0
    # 定义一个函数,用于存储经验并决定何时从中学习
    def step(self, state, action, reward, next_state, done):
        self.memory.push((state, action, reward, next_state, done))
        self.t_step = (self.t_step + 1) % 4
        if self.t_step == 0:
            if len(self.memory.memory) > minibatch_size:
                experiences = self.memory.sample(100)
                self.learn(experiences, discount_factor)
    # 定义一个函数,根据给定的状态和epsilon值选择一个动作(epsilon贪婪动作选择策略)0.表示浮点数
    def act(self, state, epsilon=0.):
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        self.local_qnetwork.eval()
        with torch.no_grad():
            action_values = self.local_qnetwork(state)
        self.local_qnetwork.train()
        if random.random() > epsilon:
            return np.argmax(action_values.cpu().data.numpy())
        else:
            return random.choice(np.arange(self.action_size))
    # 定义一个函数,根据样本经验更新代理的q值,参数为经验和折扣因子
    def learn(self, experiences, discount_factor):
        states, next_states, actions, rewards, dones = experiences
        next_q_targets = self.target_qnetwork(
            next_states).detach().max(1)[0].unsqueeze(1)
        q_targets = rewards + discount_factor * next_q_targets * (1 - dones)
        q_expected = self.local_qnetwork(states).gather(1, actions)
        loss = F.mse_loss(q_expected, q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.soft_update(self.local_qnetwork,
                         self.target_qnetwork, interpolation_parameter)
    # 定义一个函数,用于软更新目标网络的参数,参数为本地模型,目标模型和插值参数
    def soft_update(self, local_model, target_model, interpolation_parameter):
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(interpolation_parameter * local_param.data + (
                1.0 - interpolation_parameter) * target_param.data)


####### 训练DQN代理
agent = Agent(state_size, number_actions)


number_episodes = 2000
maximum_number_timesteps_per_episode = 1000
epsilon_starting_value = 1.0
epsilon_ending_value = 0.01
epsilon_decay_value = 0.995
epsilon = epsilon_starting_value
scores_on_100_episodes = deque(maxlen=100)


for episode in range(1, number_episodes + 1):
    state, _ = env.reset()
    score = 0
    for t in range(maximum_number_timesteps_per_episode):
        action = agent.act(state, epsilon)
        next_state, reward, done, _, _ = env.step(action)
        agent.step(state, action, reward, next_state, done)
        state = next_state
        score += reward
        if done:
            break
    scores_on_100_episodes.append(score)
    epsilon = max(epsilon_ending_value, epsilon_decay_value * epsilon)
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(
        episode, np.mean(scores_on_100_episodes)), end="")
    if episode % 100 == 0:
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(
            episode, np.mean(scores_on_100_episodes)))
    if np.mean(scores_on_100_episodes) >= 200.0:
        print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(
            episode - 100, np.mean(scores_on_100_episodes)))
        torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')
        break


####### 可视化结果
def show_video_of_model(agent, env_name):
    env = gym.make(env_name, render_mode='rgb_array')
    state, _ = env.reset()
    done = False
    frames = []
    while not done:
        frame = env.render()
        frames.append(frame)
        action = agent.act(state)
        state, reward, done, _, _ = env.step(action.item())
    env.close()
    imageio.mimsave('video.mp4', frames, fps=30)




show_video_of_model(agent, 'LunarLander-v2')




def show_video():
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")


show_video()

终端输出:

State shape:  (8,)
State size:  8
Number of actions:  4
Episode 100     Average Score: -174.79
Episode 200     Average Score: -102.59
Episode 300     Average Score: -68.797
Episode 400     Average Score: -38.18
Episode 500     Average Score: 24.301
Episode 600     Average Score: 149.24
Episode 700     Average Score: 134.89
Episode 800     Average Score: 185.41
Episode 826     Average Score: 200.92
Environment solved in 726 episodes!     Average Score: 200.92
IMAGEIO FFMPEG_WRITER WARNING: input image is not divisible by macro_block_size=16, resizing from (600, 400) to (608, 400) to ensure video compatibility with most codecs and players. To prevent resizing, make your input image divisible by the macro_block_size or set the macro_block_size to 1 (risking incompatibility).
[swscaler @ 000001e276cb1f00] Warning: data is not aligned! This can lead to a speed loss
<IPython.core.display.HTML object>

总结:以上代码是使用深度 Q 学习(DQN)算法训练一个智能体在月球着陆环境中控制火箭的示例。代码分为以下几个部分:

• 导入所需的库和模块,包括 gymnasium(一个开源的强化学习环境库),torch(一个开源的深度学习框架),以及一些辅助的库和模块,如 imageio(用于处理图像和视频),base64(用于编码和解码数据),deque(用于实现双端队列),namedtuple(用于创建命名元组)等。

import io  # 导入io模块,用于读写文件
import glob  # 导入glob模块,用于查找文件
import gymnasium as gym  # 导入环境库
import os  # os 是操作系统相关的库,用来处理文件和目录等。
import random  # random 是随机数生成和操作的库,用来实现 epsilon 贪心策略等。
import numpy as np  # numpy 是科学计算的库,用来处理多维数组和矩阵等。
import torch  # torch 是 PyTorch 框架的主要库,用来实现张量和神经网络等。
import torch.nn as nn  # torch.nn 是 PyTorch 框架的神经网络模块,用来定义神经网络的层和损失函数等。
import torch.optim as optim  # torch.optim 是 PyTorch 框架的优化器模块,用来定义优化算法和更新参数等。
# torch.nn.functional 是 PyTorch 框架的函数式接口,用来实现激活函数和池化等。
import torch.nn.functional as F
# torch.autograd 是 PyTorch 框架的自动微分模块,用来实现反向传播和梯度计算等。
import torch.autograd as autograd
# torch.autograd.Variable 是 PyTorch 框架的变量类,用来封装张量和梯度等。
from torch.autograd import Variable
# collections 是 Python 的内置模块,用来实现特殊的容器类型,如双端队列和命名元组等。
from collections import deque, namedtuple

• 定义 Network 类,继承自 torch.nn.Module 类,用来构建神经网络模型,包括三个全连接层和一个前向传播函数,输入是状态的维度,输出是动作的个数。

class Network(nn.Module):  # 继承nn模块中的Module类
    def __init__(self, state_size, action_size, seed=42):  # 初始化函数,参数为状态大小(8),动作大小(4),随机种子(42)
        super(Network, self).__init__()  # 调用父类的初始化函数
        self.seed = torch.manual_seed(seed)  # 设置随机种子
        # 创建第一个全连接层,输入为状态大小,输出为64个神经元,这里64是为了适应月球着陆的问题
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)  # 创建第二个全连接层,输入为64个神经元,输出为64个神经元
        self.fc3 = nn.Linear(64, action_size)  # 创建第三个全连接层,输入为64个神经元,输出为动作大小
        # 完成神经网络的构建


    def forward(self, state):  # 前向传播函数
        x = self.fc1(state)  # 从输入层接收状态
        x = F.relu(x)  # 使用激活函数
        x = self.fc2(x)  # 从第一个隐藏层接收输出
        x = F.relu(x)  # 使用激活函数
        return self.fc3(x)  # 返回最后一层的输出

• 创建月球着陆环境,使用 gym.make 函数,获取环境的状态空间和动作空间的属性,如状态的形状,状态的维度,动作的个数等。

# https://gymnasium.farama.org/environments/box2d/lunar_lander/
env = gym.make('LunarLander-v2')
# 导入月球着陆环境
state_shape = env.observation_space.shape  # 状态的形状,这里是8维向量
state_size = env.observation_space.shape[0]  # 状态的大小,这里是8个元素,包括坐标,速度等
number_actions = env.action_space.n  # 动作的数量,这里是4个
print('State shape: ', state_shape)
print('State size: ', state_size)
print('Number of actions: ', number_actions)

• 定义一些超参数,如学习率,批次大小,折扣因子,回放缓冲区的容量,软更新的插值参数等。

learning_rate = 5e-4  # 学习率,这里是0.00005
minibatch_size = 100  # 批次大小,用于更新参数
discount_factor = 0.99  # 折扣因子,用于计算未来奖励,越接近1越考虑未来,越接近0越考虑当前
replay_buffer_size = int(1e5)  # 重放缓冲区的大小,用于存储人工智能的经验,稳定和改善学习,这里是10万个经验
interpolation_parameter = 1e-3  # 插值参数,用于更新目标网络,这里是0.001
# 所有的参数都是通过实验得到的最优值

• 定义 ReplayMemory 类,用来实现经验回放机制,包括一个初始化函数,一个存储经验的函数,一个采样经验的函数,使用双端队列来存储经验元组,使用随机采样的方法来获取一批经验,将经验转换为 PyTorch 张量并发送到 cpu 或 gpu 上。

# 定义一个重放缓冲区的类
class ReplayMemory(object):
    def __init__(self, capacity):  # 初始化函数,参数为缓冲区的容量
        # 判断是否有cuda可用,如果有则使用cuda,否则使用cpu
        self.device = torch.device(
            "cuda:0" if torch.cuda.is_available() else "cpu")
        self.capacity = capacity  # 设置缓冲区的容量
        self.memory = []  # 创建一个列表,用于存储经验,每个经验包括状态,动作,奖励,下一个状态,是否结束等


    def push(self, event):  # 定义一个函数,用于向缓冲区中添加经验
        self.memory.append(event)  # 将经验添加到列表中
        if len(self.memory) > self.capacity:  # 如果列表的长度超过了容量
            del self.memory[0]  # 删除最旧的经验


    def sample(self, batch_size):  # 定义一个函数,用于从缓冲区中随机抽取一批经验
        experiences = random.sample(self.memory, k=batch_size)  # 从列表中随机抽取k个经验
        # 从经验中提取每个元素,并将它们堆叠在一起
        # 使用列表推导式,从每个经验中提取状态,使用np.vstack将它们垂直堆叠,然后转换为pytorch张量,使用.float()将它们转换为浮点数,使用.to(self.device)将它们发送到cpu或gpu
        states = torch.from_numpy(np.vstack(
            [e[0] for e in experiences if e is not None])).float().to(self.device)
        actions = torch.from_numpy(np.vstack([e[1] for e in experiences if e is not None])).long(
        ).to(self.device)  # 同理,从每个经验中提取动作,使用.long()将它们转换为整数
        rewards = torch.from_numpy(np.vstack([e[2] for e in experiences if e is not None])).float(
        ).to(self.device)  # 同理,从每个经验中提取奖励,使用.float()将它们转换为浮点数
        next_states = torch.from_numpy(np.vstack([e[3] for e in experiences if e is not None])).float(
        ).to(self.device)  # 同理,从每个经验中提取下一个状态,使用.float()将它们转换为浮点数
        dones = torch.from_numpy(np.vstack([e[4] for e in experiences if e is not None]).astype(np.uint8)).float(
        ).to(self.device)  # 同理,从每个经验中提取是否结束的标志,使用.astype(np.uint8)将它们转换为无符号整数,使用.float()将它们转换为浮点数
        return states, next_states, actions, rewards, dones  # 返回这些元素,注意顺序要一致

• 定义 Agent 类,用来实现深度 Q 学习的智能体,包括一个初始化函数,一个执行一步操作的函数,一个选择动作的函数,一个学习的函数,一个软更新的函数,使用两个神经网络模型,一个是本地 Q 网络,用来选择和评估动作,一个是目标 Q 网络,用来计算目标 Q 值,使用 Adam 优化器来更新本地 Q 网络的参数,使用 ReplayMemory 类的实例来存储和采样经验,使用时间步来控制何时学习,使用 epsilon 贪心策略来平衡探索和利用,使用均方误差损失函数来计算预期 Q 值和目标 Q 值之间的差异,使用软更新的方法来更新目标 Q 网络的参数。

# 定义一个代理类
class Agent():
    def __init__(self, state_size, action_size):  # 初始化函数,参数为状态大小和动作大小
        # 判断是否有cuda可用,如果有则使用cuda,否则使用cpu
        self.device = torch.device(
            "cuda:0" if torch.cuda.is_available() else "cpu")
        self.state_size = state_size  # 创建对象变量,存储状态大小
        self.action_size = action_size  # 创建对象变量,存储动作大小
        # Q学习 --
        self.local_qnetwork = Network(state_size, action_size).to(
            self.device)  # 创建一个本地网络,用于选择动作,将其发送到设备上,随机种子已经在之前提供
        self.target_qnetwork = Network(state_size, action_size).to(
            self.device)  # 创建一个目标网络,用于计算目标值,将其发送到设备上


        # 创建一个优化器,用于更新本地网络的参数,使用Adam算法,学习率为之前定义的值
        self.optimizer = optim.Adam(
            self.local_qnetwork.parameters(), lr=learning_rate)
        # 创建一个重放缓冲区,用于存储人工智能的经验,容量为之前定义的值
        self.memory = ReplayMemory(replay_buffer_size)
        self.t_step = 0  # 时间步数


    def step(self, state, action, reward, next_state, done):  # 定义一个函数,用于存储经验并决定何时从中学习
        self.memory.push((state, action, reward, next_state, done))  # 将经验推入缓冲区
        self.t_step = (self.t_step + 1) % 4  # 时间步数计数器(每4步学习一次)
        if self.t_step == 0:  # 如果时间步数为0
            # 如果缓冲区中的经验数量大于批次大小,self.memory是重放缓冲区的实例,memory属性是__init__中定义的列表
            if len(self.memory.memory) > minibatch_size:
                experiences = self.memory.sample(100)  # 从缓冲区中随机抽取100个经验
                self.learn(experiences, discount_factor)  # 从经验中学习,使用之前定义的折扣因子


    def act(self, state, epsilon=0.):  # 定义一个函数,根据给定的状态和epsilon值选择一个动作(epsilon贪婪动作选择策略)0.表示浮点数
        # 将状态转换为pytorch张量,增加一个维度,表示批次的维度,0表示批次维度的索引,将其放在最前面,将张量发送到设备上
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        self.local_qnetwork.eval()  # 将本地网络设置为评估模式,不更新梯度,本地网络是代理类的属性,继承自nn.Module类,有eval()方法
        with torch.no_grad():  # 不计算梯度,检查是否处于推理模式而不是训练模式
            # 使用本地网络对状态进行前向传播,得到每个动作的价值,这些价值将被epsilon贪婪策略选择(这里我们得到的不是最终的值,而是对应于状态的q值)
            action_values = self.local_qnetwork(state)
        self.local_qnetwork.train()  # 将本地网络设置为训练模式,更新梯度,本地网络是代理类的属性,继承自nn.Module类,有train()方法
        # Eplison贪婪动作选择策略 --(用于探索和利用的平衡)
        if random.random() > epsilon:  # 如果随机数大于epsilon,random.random()是random库的方法,返回一个0到1之间的随机数
            # 返回价值最大的动作,np.argmax是numpy库的方法,返回最大值的索引,action_values发送到cpu上,因为它是简单的,data.numpy()将格式转换为numpy的数据格式
            return np.argmax(action_values.cpu().data.numpy())
        else:  # 否则
            # 返回随机的动作,random.choice是random库的方法,从给定的列表中随机选择一个元素,np.arange是numpy库的方法,返回一个从0到动作大小的数组
            return random.choice(np.arange(self.action_size))


    def learn(self, experiences, discount_factor):  # 定义一个函数,根据样本经验更新代理的q值,参数为经验和折扣因子
        states, next_states, actions, rewards, dones = experiences  # 从经验中解包元素
        # 从目标网络中获取下一个状态的最大q值,self.target_qnetwork(next_states)返回每个动作的价值,.detach()将张量从计算图中分离,我们不会在反向传播中使用这些值,.max(1)沿着第一个维度取最大值,得到两个张量(最大值和索引),因此我们添加.max[1][0]只获取最大值,.unsqueeze(1)增加一个维度,表示批次的维度,但这次在第一个位置
        next_q_targets = self.target_qnetwork(
            next_states).detach().max(1)[0].unsqueeze(1) # 从目标Q网络的输出中,选择每个下一个状态对应的最大Q值,然后将其整理成一个列向量 next_q_targets。这个向量将用于计算Q-learning的目标值,以便更新本地Q网络
        q_targets = rewards + discount_factor * next_q_targets * \
            (1 - dones)  # 计算当前状态的目标值,使用公式:奖励加上折扣后的未来最大值,乘以1减去结束标志
        q_expected = self.local_qnetwork(states).gather(
            1, actions)  # 获取当前状态的预期值,.gather(1, actions)根据动作选择对应的价值
        loss = F.mse_loss(q_expected, q_targets)  # 计算预期值和目标值之间的均方误差
        self.optimizer.zero_grad()  # 清空优化器的梯度,zero_grad()是Adam的方法
        loss.backward()  # 反向传播误差
        self.optimizer.step()  # 更新本地网络的参数,step()是优化器的方法
        # 使用软更新的方法更新目标网络的参数,参数为本地网络,目标网络和插值参数
        self.soft_update(self.local_qnetwork,
                         self.target_qnetwork, interpolation_parameter)


    # 定义一个函数,用于软更新目标网络的参数,参数为本地模型,目标模型和插值参数
    def soft_update(self, local_model, target_model, interpolation_parameter):
        # 遍历目标模型和本地模型的参数,zip()是一个函数,用于将参数打包成元组,parameters()是nn.Module的方法,返回模型的参数
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            # 使用插值参数更新目标模型的参数,将本地模型的参数乘以插值参数,再加上目标模型的参数乘以1减去插值参数,.copy_()是一个方法,用于复制张量的数据
            target_param.data.copy_(interpolation_parameter * local_param.data + (
                1.0 - interpolation_parameter) * target_param.data)

• 创建 Agent 类的实例,传入状态维度和动作个数。

agent = Agent(state_size, number_actions)  # 创建一个代理或人工智能,参数为状态大小和动作数量

• 定义训练过程,包括回合数,每个回合的最大时间步数,epsilon 值的初始值,终止值,衰减值,记录最近 100 个回合的分数,遍历每个回合,重置环境,初始化分数,遍历每个时间步,选择动作,执行动作,存储经验,更新状态,累加奖励,判断是否结束,更新 epsilon 值,打印当前回合和平均分数,判断是否达到目标分数,保存模型参数。

# ### 训练DQN代理
# %%
number_episodes = 2000  # 我们想要训练的次数
# 我们不想让一个回合卡住,所以设置一个最大的时间步数(在月球上着陆的尝试最多为1000个时间步)
maximum_number_timesteps_per_episode = 1000
epsilon_starting_value = 1.0  # epsilon的初始值
epsilon_ending_value = 0.01  # epsilon的结束值
epsilon_decay_value = 0.995  # epsilon的衰减率,按照(1*0.995 , 1*0.995*0.995,...)的方式递减
epsilon = epsilon_starting_value  # epsilon的变量
scores_on_100_episodes = deque(maxlen=100)  # 最近100个回合的分数(列表) 创建了一个双端队列,最大长度为100


for episode in range(1, number_episodes + 1):  # 循环直到2000次(上限固定)
    state, _ = env.reset()  # 重置环境(这里返回状态和观察值)
    score = 0  # 每个回合的累积分数
    for t in range(maximum_number_timesteps_per_episode):  # 循环每个时间步
        action = agent.act(state, epsilon)  # 根据当前状态和epsilon贪婪策略选择一个动作
        # 根据动作执行环境的步骤,返回下一个状态,奖励,是否结束等值,_是丢弃不需要的值
        next_state, reward, done, _, _ = env.step(action)
        agent.step(state, action, reward, next_state, done)  # 调用代理的学习方法
        state = next_state  # 更新状态
        score += reward  # 累加奖励
        if done:  # 如果回合结束
            break  # 跳出循环
    scores_on_100_episodes.append(score)  # 将最近的回合分数添加到列表中
    epsilon = max(epsilon_ending_value, epsilon_decay_value *
                  epsilon)  # 更新epsilon的值,使其按照衰减率递减,但不低于结束值
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, np.mean(
        scores_on_100_episodes)), end="")  # \r覆盖之前的输出,\t制表符,.2f保留两位小数,打印当前的回合数和平均分数,end表示不换行
    if episode % 100 == 0:  # 每100个回合
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(
            episode, np.mean(scores_on_100_episodes)))  # \r覆盖之前的输出,打印当前的回合数和平均分数,换行
    if np.mean(scores_on_100_episodes) >= 200.0:  # 如果达到胜利的条件
        print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(
            episode - 100, np.mean(scores_on_100_episodes)))  # \n换行,:d表示整数,打印环境在多少个回合内解决,以及平均分数
        torch.save(agent.local_qnetwork.state_dict(),
                   'checkpoint.pth')  # 将当前Agent的本地Q网络(agent.local_qnetwork)的参数保存到文件中
        break  # 跳出循环

• 定义展示模型表现的函数,使用 imageio 库将每一帧的图像保存为视频文件,使用 HTML 标签和 base64 编码在网页中显示视频。

• 调用展示模型表现的函数,传入智能体和环境的名称。

from gym.wrappers.monitoring.video_recorder import VideoRecorder  # 导入gym模块,用于录制视频
from IPython.display import HTML, display  # 导入IPython模块,用于显示HTML和视频
import imageio  # 导入imageio模块,用于处理图像
import base64  # 导入base64模块,用于编码和解码
import io  # 导入io模块,用于读写文件
import glob  # 导入glob模块,用于查找文件
import gymnasium as gym  # 导入环境库
# %%




def show_video_of_model(agent, env_name):  # 定义一个函数,用于展示代理在环境中的表现
    env = gym.make(env_name, render_mode='rgb_array')  # 创建一个环境,渲染模式为rgb数组
    state, _ = env.reset()  # 重置环境,获取初始状态
    done = False  # 设置结束标志为False
    frames = []  # 创建一个列表,用于存储每一帧的图像
    while not done:  # 循环直到结束
        frame = env.render()  # 渲染环境,获取当前帧的图像
        frames.append(frame)  # 将图像添加到列表中
        action = agent.act(state)  # 根据当前状态选择一个动作
        state, reward, done, _, _ = env.step(
            action.item())  # 根据动作执行环境的步骤,获取下一个状态,奖励,是否结束等值
    env.close()  # 关闭环境
    # 使用imageio模块,将列表中的图像保存为视频,帧率为30
    imageio.mimsave('video.mp4', frames, fps=30)




show_video_of_model(agent, 'LunarLander-v2')  # 调用函数,展示代理在月球着陆环境中的表现




def show_video():  # 定义一个函数,用于显示视频
    mp4list = glob.glob('*.mp4')  # 使用glob模块,查找当前目录下的所有mp4文件
    if len(mp4list) > 0:  # 如果找到了
        mp4 = mp4list[0]  # 取第一个文件
        video = io.open(mp4, 'r+b').read()  # 使用io模块,以二进制模式读取文件
        encoded = base64.b64encode(video)  # 使用base64模块,对文件进行编码
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))  # 使用IPython模块,显示HTML格式的视频,使用base64编码的数据作为源
    else:  # 如果没有找到
        print("Could not find video")  # 打印提示信息
show_video()  # 调用函数,显示视频

笔记

e77e7b7239a8ac237536d183a2ada5b6.png

cfb8768c3a4698b1d5ff61849c12ac03.png

da4840839d72ae58338cd2f6860fc4b7.png

参考网址:

https://gymnasium.farama.org/environments/box2d/lunar_lander/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/354727.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

教学质量常态监控与评价平台

教学质量常态监控与评价平台&#xff0c;以提高教学质量为目标导向&#xff0c;利用Al、大数据等新型技术手段作为技术支撑&#xff0c;服务于教学质量科、督导、教师、学生等角色&#xff0c;基于教学过程数据&#xff0c;关注教师的教学内涵&#xff0c;覆盖了对教师、课堂、…

【刷题】 leetcode 面试题 08.05.递归乘法

递归乘法 1 题目描述2 思路一&#xff08;返璞归真版&#xff09;3 思路二&#xff08;二进制乘法器版&#xff09;4 思路三&#xff08;变态版&#xff09;Thanks♪(&#xff65;ω&#xff65;)&#xff89;谢谢阅读下一篇文章见&#xff01;&#xff01;&#xff01; 1 题目…

数据结构.双链表循环链表

一、1.双链表的初始化 void InitLNode(LinkList& L)//双链表的初始化 {L (LNode*)malloc(sizeof(LNode));L->prior NULL;L->next NULL;} 2.双链表的插入 void DInsert(LNode* p,LNode*s)//在p结点后面插入s结点 {s->next p->next;s->next->prior s;…

使用dockers-compose搭建开源监控和可视化工具

简介 Prometheus 和 Grafana 是两个常用的开源监控和可视化工具。 Prometheus 是一个用于存储和查询时间序列数据的系统。它提供了用于监控和报警的数据收集、存储、查询和图形化展示能力。Prometheus 使用拉模型&#xff08;pull model&#xff09;&#xff0c;通过 HTTP 协议…

微信小程序(十九)组件通信(子传父)

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a; 1.定义触发事件向父组件传输数据 2.父组件绑定绑定触发事件并获取数据 源码&#xff1a; myNav.wxml <view class"navigationBar custom-class" style"padding-top: {{test}}px;">&l…

【GitHub项目推荐--不错的 React 开源项目】【转载】

用 React Flow 连接你的想法 用 React Flow 连接你的想法&#xff0c;这是一个高度可定制的库&#xff0c;基于 React 用于构建基于节点的 交互式 UI、编辑器、流程图和图表。 开源地址&#xff1a;https://github.com/wbkd/react-flow Bulletproof React 一个简单、可扩展且…

什么时跨域问题和如何解决跨域问题

什么是跨域问题&#xff1f;解决跨域的方案都有哪些&#xff1f;日常工作中会使用哪种解决方案&#xff1f; 跨域问题指的是不同站点之间&#xff0c;使用ajax无法互相调用的问题。跨域问题本质是浏览器的一种保护机制&#xff0c;它的初衷是为了保证用户的安全&#xff0c;防…

一道CTF签到题

点击题目的签到&#xff0c;提示&#xff1a; 看来需要修改请求的源地址&#xff1a; 上来我先尝试了我最常用的xff&#xff0c;结果不行&#xff0c;于是尝试了其他的几个常用请求头&#xff1a; 1.host头 如果后端从host取值来判断是否是本地就可以通过此方法进行绕过&…

【语录】岁月

中年 写中年&#xff0c;应该是年少励志三千里 踌躇百步无寸功&#xff0c;转眼高堂已白发 儿女蹒跚学堂中&#xff0c;不如意事常八九&#xff0c;可与人言无二三 可是诸位&#xff0c;不用悲伤&#xff0c;稻盛和夫说&#xff0c; 人生并不是一场物质的盛宴&#xff0c;而是…

面经基础版案例(路由,请求渲染,传参,组件缓存)

文章目录 1.案例效果分析2.配置一级路由&#xff08;首页&#xff0c;详情&#xff09;3.配置二级路由4.导航高亮效果5.首页的请求渲染6.传参&#xff08;查询参数 $ 动态路由&#xff09;7.详情页渲染8.组件缓存kepp-alive9.总结 1.案例效果分析 2.配置一级路由&#xff08;首…

MGRE综合实验

一&#xff1a;实验要求 二&#xff1a;实验过程 1、配置IP [r1]int g0/0/0 [r1-GigabitEthernet0/0/0]ip add 192.168.1.1 24 [r1-GigabitEthernet0/0/0]q [r1]int s4/0/0 [r1-Serial4/0/0]ip add 15.0.0.1 8 [r2]int g0/0/0 [r2-GigabitEthernet0/0/0]ip add 192.168.2.1 2…

【高效开发工具系列】Java读取Html

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

STM32 SDIO接口传输中的错误处理和中断优化技巧

在 STM32 的 SDIO 接口传输中&#xff0c;错误处理和中断优化是确保传输稳定和可靠性的重要方面。下面将介绍一些常用的错误处理和中断优化技巧&#xff0c;并给出相应的代码示例。 ✅作者简介&#xff1a;热爱科研的嵌入式开发者&#xff0c;修心和技术同步精进 ❤欢迎关注我的…

Redis 持久化详解:RDB 与 AOF 的配置、触发机制和实际测试

什么是持久化&#xff1f; 就是 Redis 将内存数据持久化到硬盘&#xff0c;避免从数据库恢复数据。之所以避免从数据库恢复数据是因为后端数据通常有性能瓶颈&#xff0c;大量数据从数据库恢复可能会给数据库造成巨大压力。 Redis 持久化通常有 RDB 和 AOF 两种方式&#xff…

VMware虚拟机部署Linux Ubuntu系统

本文介绍基于VMware Workstation Pro虚拟机软件&#xff0c;配置Linux Ubuntu操作系统环境的方法。 首先&#xff0c;我们需要进行VMware Workstation Pro虚拟机软件的下载与安装。需要注意的是&#xff0c;VMware Workstation Pro软件是一个收费软件&#xff0c;而互联网中有很…

一道CTFsql注入的问题

打开页面一个登录框&#xff0c;我甚至都没先弱口令登录&#xff0c;直接开始sql测试&#xff0c;结果测到最后发现没有&#xff0c;直接admin/admin就就去了&#xff0c;下次遇到登录框请先测试弱口令&#xff0c;ok&#xff1f; 登录成功后&#xff1a; 这句话提示中提到了…

linux(进程概念)

目录 前言&#xff1a; 正文 冯诺依曼体系结构 操作系统 &#xff08;Operator System&#xff09; 概念 目的 定位 如何理解“管理” 进程组织 基本概念 内核数据结构 代码和数据 查看进程 ps指令 top指令 父子进程 fork创建进程 小结&#xff1a; 前…

【Web】云服务器的购买与使用

目录 一、云服务器的选择 二、配置 一、云服务器的选择 腾讯云、阿里云都可以 有首次试用、学生打折啥的。 租借时间、配置都可以自行选择 二、配置 先进行重置密码、拍摄快照

HCS 华为云Stack产品组件

HCS 华为云Stack产品组件 Cloud Provisioning Service(CPS) 负责laas的云平台层的部署和升级是laas层中真正面向硬件设备&#xff0c;并将其池化软件化的部件。 Service OM 资源池(计算/存储/网络)以及基础云服务(ECS/EVS/PC)的管理工具。 ManageOne ManageOne包括服务中心…

【服务器Midjourney】创建部署Midjourney网站

目录 🌺【前言】 🌺【准备】 🌺【宝塔搭建MJ】 🌼1. 给服务器添加端口 🌼2. 使用Xshell连接服务器 🌼3. 安装docker 🌼4. 安装Midjourney程序 🌼5. 绑定域名+申请SSL证书 🌼6. 更新网站