1. Deep Q-Network (DQN) Recap
DQN approximates the state-action value function (Q-function) with a neural network and stabilizes training with Experience Replay and a fixed target network. The DQN update target is

$$y_t = r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a'; \theta^-),$$

and the online parameters $\theta$ are trained by minimizing the squared TD error $\big(y_t - Q(s_t, a_t; \theta)\big)^2$, where $\theta^-$ denotes the parameters of the periodically synchronized target network.
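To make this target concrete, here is a minimal NumPy sketch (not from the original text) that forms the TD target for a single transition; q_next stands in for the target network's output $Q(s_{t+1}, \cdot; \theta^-)$ and its values are made up.

import numpy as np

# Hypothetical target-network outputs Q(s', a'; theta^-) for a 2-action problem
q_next = np.array([0.8, 1.2])
reward, gamma, done = 1.0, 0.95, False

# DQN TD target: r + gamma * max_a' Q(s', a'; theta^-), or just r at a terminal state
td_target = reward if done else reward + gamma * np.max(q_next)
print(td_target)  # 1.0 + 0.95 * 1.2 = 2.14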
2. The Double DQN Algorithm
Principle
DQN has a well-known weakness: when forming the update target, the same max operator both selects and evaluates the action, which tends to overestimate Q-values. Double DQN (Double Deep Q-Network, DDQN) mitigates this by decoupling the two roles across two networks: the online network selects the action and the target network evaluates it.
Formula Derivation
The Double DQN update target is

$$y_t^{\text{DoubleDQN}} = r_{t+1} + \gamma\, Q\big(s_{t+1}, \arg\max_{a} Q(s_{t+1}, a; \theta);\, \theta^-\big)$$

where:
- $\theta$ are the parameters of the current (online) Q-network, used to select the action.
- $\theta^-$ are the parameters of the target Q-network, used to evaluate the selected action.

A short numeric sketch of the difference between the two targets follows.
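The sketch below (illustrative only, with made-up Q-values q_online and q_target for the next state) contrasts the vanilla DQN target with the Double DQN target:

import numpy as np

# Hypothetical Q-values for the next state s_{t+1}
q_online = np.array([1.0, 2.5, 2.4])   # Q(s', ., theta): online network
q_target = np.array([0.9, 1.8, 2.2])   # Q(s', ., theta^-): target network
reward, gamma = 1.0, 0.95

# Vanilla DQN: the target network both selects and evaluates (max over q_target)
dqn_target = reward + gamma * np.max(q_target)              # 1 + 0.95 * 2.2 = 3.09
# Double DQN: the online network selects, the target network evaluates
best_action = np.argmax(q_online)                           # action 1
double_dqn_target = reward + gamma * q_target[best_action]  # 1 + 0.95 * 1.8 = 2.71
print(dqn_target, double_dqn_target)

Because the evaluating network does not get to pick its own largest (and possibly overestimated) entry, the Double DQN target is typically lower and less biased.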
Code Implementation
We use the classic CartPole environment from OpenAI Gym to demonstrate a Double DQN implementation. (The code below assumes the pre-0.26 Gym API, in which env.reset() returns only the observation and env.step() returns four values.)
import gym
import numpy as np
import random
import tensorflow as tf
from tensorflow.keras import models, layers, optimizers
class DoubleDQNAgent:
def __init__(self, state_size, action_size):
self.state_size = state_size
self.action_size = action_size
self.memory = []
self.gamma = 0.95
self.epsilon = 1.0
self.epsilon_decay = 0.995
self.epsilon_min = 0.01
self.learning_rate = 0.001
self.model = self._build_model()
self.target_model = self._build_model()
self.update_target_model()
def _build_model(self):
model = models.Sequential()
model.add(layers.Dense(24, input_dim=self.state_size, activation='relu'))
model.add(layers.Dense(24, activation='relu'))
model.add(layers.Dense(self.action_size, activation='linear'))
model.compile(loss='mse', optimizer=optimizers.Adam(learning_rate=self.learning_rate))
return model
def update_target_model(self):
self.target_model.set_weights(self.model.get_weights())
def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))
def act(self, state):
if np.random.rand() <= self.epsilon:
return np.random.choice(self.action_size)
        q_values = self.model.predict(state, verbose=0)
return np.argmax(q_values[0])
    def replay(self, batch_size):
        # Sample a uniform minibatch from the replay memory
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = self.model.predict(state, verbose=0)
            if done:
                target[0][action] = reward
            else:
                # Double DQN target: the online network selects the action,
                # the target network evaluates it
                q_online = self.model.predict(next_state, verbose=0)
                q_target = self.target_model.predict(next_state, verbose=0)
                best_action = np.argmax(q_online[0])
                target[0][action] = reward + self.gamma * q_target[0][best_action]
            self.model.fit(state, target, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DoubleDQNAgent(state_size, action_size)
episodes = 1000
for e in range(episodes):
state = env.reset()
state = np.reshape(state, [1, state_size])
done = False
time = 0
while not done:
action = agent.act(state)
next_state, reward, done, _ = env.step(action)
next_state = np.reshape(next_state, [1, state_size])
reward = reward if not done else -10
agent.remember(state, action, reward, next_state, done)
state = next_state
time += 1
if done:
agent.update_target_model()
print(f"Episode: {e}/{episodes}, Score: {time}, Epsilon: {agent.epsilon:.2}")
if len(agent.memory) > 32:
agent.replay(32)
env.close()
print("Double DQN训练完成")
3. The Dueling DQN Algorithm
Principle
Dueling DQN splits the Q-function into a state value (Value) stream and an advantage (Advantage) stream: the value stream estimates how good a state is overall, while the advantage stream estimates how much better each action is relative to the others in that state. This lets the network assess the value of a state without having to learn the effect of every action separately, which improves performance.
Formula Derivation
The Dueling DQN Q-function is defined as

$$Q(s, a; \theta, \alpha, \beta) = V(s; \theta, \beta) + \Big(A(s, a; \theta, \alpha) - \frac{1}{|\mathcal{A}|}\sum_{a'} A(s, a'; \theta, \alpha)\Big)$$

where:
- $V(s; \theta, \beta)$ is the state value function.
- $A(s, a; \theta, \alpha)$ is the advantage function; subtracting its mean over actions keeps the decomposition identifiable.

A small numeric example of this aggregation follows.
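As a quick illustration (with made-up numbers for the value and advantage outputs of a single state with three actions):

import numpy as np

# Hypothetical outputs of the value and advantage streams for one state
value = 2.0
advantage = np.array([0.5, -0.1, -0.4])

# Q(s, a) = V(s) + (A(s, a) - mean_a' A(s, a'))
q_values = value + (advantage - advantage.mean())
print(q_values)  # [2.5  1.9  1.6]

This is exactly the combination implemented by the Lambda layer in the model below.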
Code Implementation
Again using the CartPole environment, here is a Dueling DQN implementation.
import gym
import numpy as np
import random
import tensorflow as tf
from tensorflow.keras import models, layers, optimizers
class DuelingDQNAgent:
def __init__(self, state_size, action_size):
self.state_size = state_size
self.action_size = action_size
self.memory = []
self.gamma = 0.95
self.epsilon = 1.0
self.epsilon_decay = 0.995
self.epsilon_min = 0.01
self.learning_rate = 0.001
self.model = self._build_model()
self.target_model = self._build_model()
self.update_target_model()
    def _build_model(self):
        # Shared feature layers
        inputs = layers.Input(shape=(self.state_size,))
        dense1 = layers.Dense(24, activation='relu')(inputs)
        dense2 = layers.Dense(24, activation='relu')(dense1)
        # Value stream: estimates V(s)
        value_fc = layers.Dense(24, activation='relu')(dense2)
        value = layers.Dense(1, activation='linear')(value_fc)
        # Advantage stream: estimates A(s, a)
        advantage_fc = layers.Dense(24, activation='relu')(dense2)
        advantage = layers.Dense(self.action_size, activation='linear')(advantage_fc)
        # Aggregate the streams: Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a))
        q_values = layers.Lambda(
            lambda x: x[0] + (x[1] - tf.reduce_mean(x[1], axis=1, keepdims=True))
        )([value, advantage])
        model = models.Model(inputs=inputs, outputs=q_values)
        model.compile(loss='mse', optimizer=optimizers.Adam(learning_rate=self.learning_rate))
        return model
def update_target_model(self):
self.target_model.set_weights(self.model.get_weights())
def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))
def act(self, state):
if np.random.rand() <= self.epsilon:
return np.random.choice(self.action_size)
        q_values = self.model.predict(state, verbose=0)
return np.argmax(q_values[0])
    def replay(self, batch_size):
        # Sample a uniform minibatch from the replay memory
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = self.model.predict(state, verbose=0)
            if done:
                target[0][action] = reward
            else:
                # Standard DQN target; Dueling DQN only changes the network architecture
                t = self.target_model.predict(next_state, verbose=0)
                target[0][action] = reward + self.gamma * np.amax(t[0])
            self.model.fit(state, target, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DuelingDQNAgent(state_size, action_size)
episodes = 1000
for e in range(episodes):
state = env.reset()
state = np.reshape(state, [1, state_size])
done = False
time = 0
while not done:
action = agent.act(state)
next_state, reward, done, _ = env.step(action)
next_state = np.reshape(next_state, [1, state_size])
reward = reward if not done else -10
agent.remember(state, action, reward, next_state, done)
state = next_state
time += 1
if done:
agent.update_target_model()
print(f"Episode: {e}/{episodes}, Score: {time}, Epsilon: {agent.epsilon:.2}")
if len(agent.memory) > 32:
agent.replay(32)
env.close()
print("Dueling DQN训练完成")
4. Prioritized Experience Replay DQN (PER DQN)
Principle
Prioritized Experience Replay (PER) strengthens the replay mechanism by assigning each stored transition a priority. High-priority transitions are more likely to be sampled again, so the agent concentrates on the experiences it can learn the most from, which speeds up learning.
Formula Derivation
PER derives each transition's priority from its TD error:

$$p_i = |\delta_i| + \epsilon$$

where:
- $\delta_i$ is the TD error of transition $i$.
- $\epsilon$ is a small positive constant that prevents the priority from becoming zero.

Transitions are then sampled with probability

$$P(i) = \frac{p_i^{\alpha}}{\sum_k p_k^{\alpha}},$$

and the bias introduced by this non-uniform sampling is corrected with importance-sampling weights

$$w_i = \Big(\frac{1}{N} \cdot \frac{1}{P(i)}\Big)^{\beta},$$

normalized by $\max_i w_i$, which scale each sample's gradient update. The walkthrough below puts numbers to these formulas.
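A small sketch (illustrative only, with made-up TD errors and the same alpha/beta values as the agent below) that computes priorities, sampling probabilities, and importance-sampling weights for a toy buffer of four transitions:

import numpy as np

td_errors = np.array([0.5, 0.1, 2.0, 0.05])   # hypothetical TD errors
alpha, beta, eps = 0.6, 0.4, 1e-6

priorities = np.abs(td_errors) + eps           # p_i = |delta_i| + epsilon
probs = priorities ** alpha
probs /= probs.sum()                           # P(i) = p_i^alpha / sum_k p_k^alpha

N = len(td_errors)
weights = (N * probs) ** (-beta)               # w_i = (1 / (N * P(i)))^beta
weights /= weights.max()                       # normalize by the largest weight
print(probs, weights)

The transition with the largest TD error gets the highest sampling probability but the smallest weight, so its frequent replay does not dominate the gradient.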
Code Implementation
Again using the CartPole environment, here is a PER DQN implementation.
import gym
import numpy as np
import tensorflow as tf
from tensorflow.keras import models, layers, optimizers
import random
import collections
class PERDQNAgent:
def __init__(self, state_size, action_size):
self.state_size = state_size
self.action_size = action_size
self.memory = collections.deque(maxlen=2000)
self.gamma = 0.95
self.epsilon = 1.0
self.epsilon_decay = 0.995
self.epsilon_min = 0.01
self.learning_rate = 0.001
self.model = self._build_model()
self.target_model = self._build_model()
self.update_target_model()
        # Priorities are kept in a deque with the same maxlen as the replay memory
        # so that priority indices stay aligned with stored transitions
        self.priority = collections.deque(maxlen=2000)
        self.alpha = 0.6   # how strongly priority influences the sampling probability
        self.beta = 0.4    # initial strength of the importance-sampling correction
        self.beta_increment_per_sampling = 0.001
def _build_model(self):
model = models.Sequential()
model.add(layers.Dense(24, input_dim=self.state_size, activation='relu'))
model.add(layers.Dense(24, activation='relu'))
model.add(layers.Dense(self.action_size, activation='linear'))
model.compile(loss='mse', optimizer=optimizers.Adam(learning_rate=self.learning_rate))
return model
def update_target_model(self):
self.target_model.set_weights(self.model.get_weights())
def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))
        # New transitions get the current maximum priority so they are sampled at least once
        self.priority.append(max(self.priority, default=1))
def act(self, state):
if np.random.rand() <= self.epsilon:
return np.random.choice(self.action_size)
        q_values = self.model.predict(state, verbose=0)
return np.argmax(q_values[0])
    def replay(self, batch_size):
        if len(self.memory) < batch_size:
            return
        # Sample transitions with probability proportional to priority^alpha
        priorities = np.array(self.priority)
        sampling_probabilities = priorities ** self.alpha
        sampling_probabilities /= sampling_probabilities.sum()
        indices = np.random.choice(len(self.memory), batch_size, p=sampling_probabilities)
        minibatch = [self.memory[i] for i in indices]
        # Importance-sampling weights correct the bias of non-uniform sampling
        importance_sampling_weights = (len(self.memory) * sampling_probabilities[indices]) ** (-self.beta)
        importance_sampling_weights /= importance_sampling_weights.max()
        for i, (state, action, reward, next_state, done) in enumerate(minibatch):
            target = self.model.predict(state, verbose=0)
            old_q = target[0][action]
            if done:
                target[0][action] = reward
            else:
                t = self.target_model.predict(next_state, verbose=0)
                target[0][action] = reward + self.gamma * np.amax(t[0])
            # Refresh this transition's priority with its absolute TD error
            self.priority[indices[i]] = abs(target[0][action] - old_q) + 1e-6
            # Weight the update by the importance-sampling weight (Keras expects an array)
            self.model.fit(state, target, epochs=1, verbose=0,
                           sample_weight=np.array([importance_sampling_weights[i]]))
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        # Anneal beta towards 1 so the bias correction becomes exact over time
        self.beta = min(1.0, self.beta + self.beta_increment_per_sampling)
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = PERDQNAgent(state_size, action_size)
episodes = 1000
for e in range(episodes):
state = env.reset()
state = np.reshape(state, [1, state_size])
done = False
time = 0
while not done:
action = agent.act(state)
next_state, reward, done, _ = env.step(action)
next_state = np.reshape(next_state, [1, state_size])
reward = reward if not done else -10
agent.remember(state, action, reward, next_state, done)
state = next_state
time += 1
if done:
agent.update_target_model()
print(f"Episode: {e}/{episodes}, Score: {time}, Epsilon: {agent.epsilon:.2}")
if len(agent.memory) > 32:
agent.replay(32)
env.close()
print("PER DQN训练完成")
5. Summary
Double DQN, Dueling DQN, and Prioritized Experience Replay DQN (PER DQN) are all improvements over the original DQN, each with its own strengths and use cases. Double DQN improves stability by reducing overestimation; Dueling DQN evaluates states more effectively by separating the state value from the action advantages; PER DQN speeds up learning by preferentially replaying important experiences. Depending on the application, the appropriate variant can be chosen to improve reinforcement learning performance.