PyTorch Deep Learning in Action: Playing Pong with Deep Q-Learning
- 0. Preface
- 1. Deep Q-Learning Model with a Fixed Target Network
- 1.1 Model Input
- 1.2 Model Strategy
- 2. Implementing Deep Q-Learning for Pong
- Related Links
0. Preface
We have already learned how to use deep Q-learning to play the CartPole game in Gym. In this section, we turn to the more complex Pong game and see how to play it by combining deep Q-learning with a fixed target network, replacing the plain fully connected network with a model based on convolutional neural networks (CNNs).
1. Deep Q-Learning Model with a Fixed Target Network
1.1 Model Input
In this section, our goal is to build an agent that can play Pong against the computer and beat it; the agent is expected to reach 21 points. We adopt the following strategy to train an agent to play Pong:
Crop the irrelevant parts of the image to obtain the current frame (state) of the game: starting from the raw frame, we crop away the pixels at the top and the bottom of the image, which carry no information about the play itself.
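Concretely (a minimal sketch; the crop indices below are assumptions that simply mirror the preprocess_frame function defined in section 2), a raw 210 x 160 x 3 Atari frame is reduced to an 80 x 80 playing-field image by dropping the scoreboard rows at the top, a strip at the bottom, and every second row and column:
import numpy as np

frame = np.zeros((210, 160, 3), dtype=np.uint8)  # stand-in for a raw Pong frame from Gym
playing_field = frame[34:-16]                    # drop 34 rows at the top and 16 at the bottom
downsampled = playing_field[::2, ::2]            # keep every second row and every second column
print(playing_field.shape, downsampled.shape)    # (160, 160, 3) (80, 80, 3)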
1.2 Model Strategy
To build a deep Q-learning model with a fixed target network, we use the following strategy:
- Stack four consecutive frames, since the agent needs a sequence of states to tell whether the ball is moving towards it.
- In the initial phase, let the agent play by taking random actions, and store the current state, the next state, the action taken, and the reward in memory; keep only the information of the most recent 10,000 actions and discard any history beyond 10,000.
- Build a prediction network that takes a sample of states from memory and predicts the values of the possible actions.
- Define a target network, which is a copy of the prediction network.
- Update the target network once for every 1,000 updates of the prediction network; at the end of each such interval, the target network's weights are set equal to the prediction network's weights.
- Use the target network to compute the Q-value of the best action in the next state.
- For the action proposed by the prediction network, we expect it to predict the sum of the immediate reward and the Q-value of the best action in the next state (formalized right after this list).
- Minimize the MSE loss of the prediction network.
- Let the agent keep playing until the reward is maximized.
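In other words, the target that the prediction network is trained towards is the standard Bellman target computed with the frozen target network. A compact restatement in standard DQN notation (the symbols below are not used elsewhere in this article, but they correspond one-to-one to the learn method implemented in section 2):

$$Q_{\text{target}} = r + \gamma\,(1 - \text{done})\,\max_{a'} Q_{\theta^{-}}(s', a'), \qquad \mathcal{L} = \big(Q_{\theta}(s, a) - Q_{\text{target}}\big)^2$$

where $Q_{\theta}$ is the prediction network, $Q_{\theta^{-}}$ is the target network, $r$ is the immediate reward, $\gamma$ is the discount factor, and done indicates whether the episode has ended.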
2. Implementing Deep Q-Learning for Pong
Following the strategy of the previous subsection, we implement an agent in PyTorch that maximizes the reward in the Pong game.
(1) Import the required libraries and build the game environment:
import gym
import numpy as np
import cv2
import random
from collections import namedtuple, deque
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = gym.make('PongDeterministic-v0')
(2) Obtain the sizes of the state space and the action space:
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
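Note that state_size here is just the first dimension of the raw observation; the convolutional network defined later works on stacked 4 x 80 x 80 inputs instead. As a quick check (the values in the comments are what PongDeterministic-v0 typically reports), the raw observation is a 210 x 160 x 3 RGB frame and there are 6 discrete actions:
print(env.observation_space.shape)  # (210, 160, 3) -- raw RGB frame
print(state_size, action_size)      # 210 6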
(3) Define the preprocessing function, which removes the irrelevant top and bottom pixels:
def preprocess_frame(frame):
bkg_color = np.array([144, 72, 17])
img = np.mean(frame[34:-16:2,::2]-bkg_color, axis=-1)/255.
resized_image = img
return resized_image
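The preprocessing can be checked quickly by passing a raw frame through the function (a usage sketch, reusing the env created in step (1)):
state = env.reset()
processed = preprocess_frame(state)
print(state.shape, processed.shape)  # (210, 160, 3) (80, 80)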
(4) Define the function that stacks four consecutive game frames. The function takes stacked_frames, the current state state, and the flag is_new_episode as inputs:
def stack_frames(stacked_frames, state, is_new_episode):
# Preprocess frame
frame = preprocess_frame(state)
stack_size = 4
If this is a new episode, start over from the initial frame:
if is_new_episode:
# Clear stacked_frames
stacked_frames = deque([np.zeros((80,80), dtype=np.uint8) for i in range(stack_size)], maxlen=4)
# Because we're in a new episode, copy the same frame 4x
for i in range(stack_size):
stacked_frames.append(frame)
# Stack the frames
stacked_state = np.stack(stacked_frames, axis=2).transpose(2, 0, 1)
If it is not a new episode, remove the oldest frame from stacked_frames and append the latest frame:
else:
# Append frame to deque, automatically removes the oldest frame
stacked_frames.append(frame)
# Build the stacked state (first dimension specifies different frames)
stacked_state = np.stack(stacked_frames, axis=2).transpose(2, 0, 1)
return stacked_state, stacked_frames
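To verify the stacking (a usage sketch, again reusing the env created earlier), a new episode should yield a state of shape (4, 80, 80), i.e. four 80 x 80 frames stacked along the channel dimension:
stacked_frames = deque([np.zeros((80,80), dtype=np.uint8) for i in range(4)], maxlen=4)
state = env.reset()
stacked_state, stacked_frames = stack_frames(stacked_frames, state, True)
print(stacked_state.shape)  # (4, 80, 80)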
(5) Define the network architecture, DQNetwork:
class DQNetwork(nn.Module):
def __init__(self, states, action_size):
super(DQNetwork, self).__init__()
self.conv1 = nn.Conv2d(4, 32, (8, 8), stride=4)
self.conv2 = nn.Conv2d(32, 64, (4, 4), stride=2)
self.conv3 = nn.Conv2d(64, 64, (3, 3), stride=1)
self.flatten = nn.Flatten()
self.fc1 = nn.Linear(2304, 512)
self.fc2 = nn.Linear(512, action_size)
def forward(self, state):
x = F.relu(self.conv1(state))
x = F.relu(self.conv2(x))
x = F.relu(self.conv3(x))
x = self.flatten(x)
x = F.relu(self.fc1(x))
x = self.fc2(x)
return x
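A quick shape check explains the hard-coded 2304 in fc1 (a sketch with a dummy batch): an 80 x 80 input shrinks through the three convolutions as 80 → 19 → 8 → 6, so the flattened feature map has 64 * 6 * 6 = 2304 elements, and the output has one Q-value per action:
net = DQNetwork(state_size, action_size).to(device)
dummy = torch.zeros(1, 4, 80, 80).to(device)  # a batch containing one stacked state
print(net(dummy).shape)                       # torch.Size([1, 6]) with Pong's 6 actions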
(6) Define the Agent class.
First, define the __init__ method:
class Agent():
def __init__(self, state_size, action_size):
self.state_size = state_size
self.action_size = action_size
self.seed = random.seed(0)
## hyperparameters
self.buffer_size = 10000
self.batch_size = 32
self.gamma = 0.99
self.lr = 0.0001
self.update_every = 4
self.update_every_target = 1000
self.learn_every_target_counter = 0
# Q-Network
self.local = DQNetwork(state_size, action_size).to(device)
self.target = DQNetwork(state_size, action_size).to(device)
self.optimizer = optim.Adam(self.local.parameters(), lr=self.lr)
# Replay memory
self.memory = deque(maxlen=self.buffer_size)
self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
# Initialize time step (for updating every few steps)
self.t_step = 0
In the __init__ method, we add the target network along with its update frequency.
Next, define the weight-update method step:
def step(self, state, action, reward, next_state, done):
# Save experience in replay memory
self.memory.append(self.experience(state[None], action, reward, next_state[None], done))
# Learn every update_every time steps.
self.t_step = (self.t_step + 1) % self.update_every
if self.t_step == 0:
# If enough samples are available in memory, get random subset and learn
if len(self.memory) > self.batch_size:
experiences = self.sample_experiences()
self.learn(experiences, self.gamma)
Define the act method, which returns the action to take in a given state:
def act(self, state, eps=0.):
# Epsilon-greedy action selection
if random.random() > eps:
state = torch.from_numpy(state).float().unsqueeze(0).to(device)
self.local.eval()
with torch.no_grad():
action_values = self.local(state)
self.local.train()
return np.argmax(action_values.cpu().data.numpy())
else:
return random.choice(np.arange(self.action_size))
Define the learn method, which trains the prediction network:
def learn(self, experiences, gamma):
self.learn_every_target_counter+=1
states, actions, rewards, next_states, dones = experiences
# Get expected Q values from local model
Q_expected = self.local(states).gather(1, actions)
# Get max predicted Q values (for next states) from target model
Q_targets_next = self.target(next_states).detach().max(1)[0].unsqueeze(1)
# Compute Q targets for current state
Q_targets = rewards + (gamma * Q_targets_next * (1 - dones))
# Compute loss
loss = F.mse_loss(Q_expected, Q_targets)
# Minimize the loss
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# ------------------- update target network ------------------- #
if self.learn_every_target_counter%1000 ==0:
self.target_update()
In the above code, Q_targets_next is predicted using the target model rather than the prediction model, and the target network is updated after every 1,000 learning steps, where learn_every_target_counter is the counter used to decide whether the target model should be updated.
Define the target_update method, which updates the target model:
def target_update(self):
print('target updating')
self.target.load_state_dict(self.local.state_dict())
Define the sample_experiences method, which samples a batch of experiences from memory:
def sample_experiences(self):
experiences = random.sample(self.memory, k=self.batch_size)
states = torch.from_numpy(np.vstack([e.state for e in experiences if e is not None])).float().to(device)
actions = torch.from_numpy(np.vstack([e.action for e in experiences if e is not None])).long().to(device)
rewards = torch.from_numpy(np.vstack([e.reward for e in experiences if e is not None])).float().to(device)
next_states = torch.from_numpy(np.vstack([e.next_state for e in experiences if e is not None])).float().to(device)
dones = torch.from_numpy(np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)).float().to(device)
return (states, actions, rewards, next_states, dones)
(7) Instantiate the agent object:
agent = Agent(state_size, action_size)
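As a quick check (a usage sketch, not part of the training loop), the freshly created agent can already pick an action for a stacked state; with eps=1.0 the choice is purely random, while with eps=0.0 it is the greedy action under the still-untrained network:
s = np.zeros((4, 80, 80), dtype=np.float32)
print(agent.act(s, eps=1.0), agent.act(s, eps=0.0))  # e.g. 3 2 -- a random and a greedy action index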
(8) Define the parameters used to train the agent:
n_episodes=5000
max_t=5000
eps_start=1.0
eps_end=0.02
eps_decay=0.995
scores = [] # list containing scores from each episode
scores_window = deque(maxlen=100) # last 100 scores
eps = eps_start
stack_size = 4
stacked_frames = deque([np.zeros((80,80), dtype=np.uint8) for i in range(stack_size)], maxlen=stack_size)
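One useful check on these settings (a small sketch, not part of the original training loop): with eps_decay = 0.995 applied once per episode, epsilon decays from 1.0 down to its floor of 0.02 after roughly 780 episodes, so the agent still explores heavily during the first few hundred of the 5,000 training episodes:
e, n_to_floor = eps_start, 0
while e > eps_end:
    e = max(eps_end, eps_decay * e)
    n_to_floor += 1
print(n_to_floor)  # 781 episodes until epsilon reaches its floor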
(9) Train the agent:
for i_episode in range(1, n_episodes+1):
state = env.reset()
state, frames = stack_frames(stacked_frames, state, True)
score = 0
for i in range(max_t):
action = agent.act(state, eps)
next_state, reward, done, _ = env.step(action)
next_state, frames = stack_frames(frames, next_state, False)
agent.step(state, action, reward, next_state, done)
state = next_state
score += reward
if done:
break
scores_window.append(score) # save most recent score
scores.append(score) # save most recent score
eps = max(eps_end, eps_decay*eps) # decrease epsilon
print('\rEpisode {}\tReward {} \tAverage Score: {:.2f} \tEpsilon: {}'.format(i_episode,score,np.mean(scores_window), eps), end="")
if i_episode % 100 == 0:
print('\rEpisode {}\tAverage Score: {:.2f} \tEpsilon: {}'.format(i_episode, np.mean(scores_window), eps))
The change in score as the number of training episodes increases can be plotted as follows:
import matplotlib.pyplot as plt
plt.plot(scores)
plt.title('Scores over increasing episodes')
plt.show()
As the plot shows, the agent gradually learns to play Pong and is able to obtain increasingly high rewards.
Related Links
PyTorch Deep Learning in Action (1): Neural Networks and the Model Training Process
PyTorch Deep Learning in Action (3): Building a Neural Network with PyTorch
PyTorch Deep Learning in Action (11): Convolutional Neural Networks
PyTorch Deep Learning in Action (45): Reinforcement Learning
PyTorch Deep Learning in Action (46): Deep Q-Learning