【rl-agents代码学习】02——DQN算法

文章目录

  • Highway-env Intersection
  • rl-agents之DQN
    • *Implemented variants*:
    • *References*:
    • Query agent for actions sequence
      • 探索策略
      • 神经网络实现
      • 小结1
    • Record the experience
      • Replaybuffer
      • compute_bellman_residual
      • step_optimizer
      • update_target_network
      • 小结2
    • exploration_policy
    • 运行结果

Highway-env Intersection

本文将继续探索rl-agents中相关DQN算法的实现。下面的介绍将会以intersection这个环境为例,首先介绍一下Highway-env中的intersection-v1。Highway-env中相关文档——http://highway-env.farama.org/environments/intersection/。

highway-env中的环境可以通过配置文件进行修改, observations, actions, dynamics 以及rewards等信息都是以字典的形式存储在配置文件中。

PS:DQN、DuelingDQN算法原理可参考【强化学习】10 —— DQN算法【强化学习】11 —— Double DQN算法与Dueling DQN算法

import gymnasium as gym
import pprint
from matplotlib import pyplot as plt

env = gym.make("intersection-v1", render_mode='rgb_array')
pprint.pprint(env.unwrapped.config)

输出config,可以看到如下信息:

{'action': {'dynamical': True,
            'lateral': True,
            'longitudinal': True,
            'steering_range': [-1.0471975511965976, 1.0471975511965976],
            'type': 'ContinuousAction'},
 'arrived_reward': 1,
 'centering_position': [0.5, 0.6],
 'collision_reward': -5,
 'controlled_vehicles': 1,
 'destination': 'o1',
 'duration': 13,
 'high_speed_reward': 1,
 'initial_vehicle_count': 10,
 'manual_control': False,
 'normalize_reward': False,
 'observation': {'features': ['presence',
                              'x',
                              'y',
                              'vx',
                              'vy',
                              'long_off',
                              'lat_off',
                              'ang_off'],
                 'type': 'Kinematics',
                 'vehicles_count': 5},
 'offroad_terminal': False,
 'offscreen_rendering': False,
 'other_vehicles_type': 'highway_env.vehicle.behavior.IDMVehicle',
 'policy_frequency': 1,
 'real_time_rendering': False,
 'render_agent': True,
 'reward_speed_range': [7.0, 9.0],
 'scaling': 7.15,
 'screen_height': 600,
 'screen_width': 600,
 'show_trajectories': False,
 'simulation_frequency': 15,
 'spawn_probability': 0.6}

之后可以通过以下代码输出图像:

plt.imshow(env.render())
plt.show()

在这里插入图片描述
输出observation,可以看到是一个5*8的array,:

[[ 1.0000000e+00  9.9999998e-03  1.0000000e+00  0.0000000e+00
  -1.2500000e-01  6.3297665e+01  0.0000000e+00  0.0000000e+00]
 [ 1.0000000e+00  1.3849856e-01 -1.0000000e+00 -9.9416278e-02
   1.2500000e-01  8.1300293e+01  1.0361128e-15  0.0000000e+00]
 [ 1.0000000e+00 -2.0000000e-02 -1.0000000e+00  0.0000000e+00
   2.2993930e-01  6.5756187e+01  2.8473811e-15  0.0000000e+00]
 [ 0.0000000e+00  0.0000000e+00  0.0000000e+00  0.0000000e+00
   0.0000000e+00  0.0000000e+00  0.0000000e+00  0.0000000e+00]
 [ 0.0000000e+00  0.0000000e+00  0.0000000e+00  0.0000000e+00
   0.0000000e+00  0.0000000e+00  0.0000000e+00  0.0000000e+00]]

observation的解释如下,
在这里插入图片描述
通过以下代码,可以将action的类型变为离散的空间。

env.unwrapped.configure({
    "action": {
        'longitudinal': True,
        "type": "DiscreteMetaAction"
    }
})

rl-agents之DQN

A neural-network model is used to estimate the state-action value function and produce a greedy optimal policy.

Implemented variants:

  • Double DQN
  • Dueling architecture
  • N-step targets

References:

Playing Atari with Deep Reinforcement Learning, Mnih V. et al (2013).
Deep Reinforcement Learning with Double Q-learning, van Hasselt H. et al. (2015).
Dueling Network Architectures for Deep Reinforcement Learning, Wang Z. et al. (2015).

Query agent for actions sequence

由上一节所知,通过调用run_episodes函数,进行具体的agent训练。其中会调用step函数,并执行self.agent.plan(self.observation)。对于DQNAgent的实现,首先由AbstractAgent类实现plan,之后plan函数会调用act函数:

    def step(self):
        """
            Plan a sequence of actions according to the agent policy, and step the environment accordingly.
        """
        # Query agent for actions sequence
        actions = self.agent.plan(self.observation)
// rl_agents/agents/common/abstract.py
class AbstractAgent(Configurable, ABC):

    def __init__(self, config=None):
        super(AbstractAgent, self).__init__(config)
        self.writer = None  # Tensorboard writer
        self.directory = None  # Run directory
        
    @abstractmethod
    def act(self, state):
        """
            Pick an action

        :param state: s, the current state of the agent
        :return: a, the action to perform
        """
        raise NotImplementedError()

    def plan(self, state):
        """
            Plan an optimal trajectory from an initial state.

        :param state: s, the initial state of the agent
        :return: [a0, a1, a2...], a sequence of actions to perform
        """
        return [self.act(state)]

DQN抽象类AbstractDQNAgent继承自AbstractStochasticAgentAbstractStochasticAgent继承自AbstractAgent,在DQN抽象类AbstractDQNAgent中实现对act函数的重写:

    def act(self, state, step_exploration_time=True):
        """
            Act according to the state-action value model and an exploration policy
        :param state: current state
        :param step_exploration_time: step the exploration schedule
        :return: an action
        """
        self.previous_state = state
        if step_exploration_time:
            self.exploration_policy.step_time()
        # Handle multi-agent observations
        # TODO: it would be more efficient to forward a batch of states
        if isinstance(state, tuple):
            return tuple(self.act(agent_state, step_exploration_time=False) for agent_state in state)

        # Single-agent setting
        values = self.get_state_action_values(state)
        self.exploration_policy.update(values)
        return self.exploration_policy.sample()

探索策略

首先来看一下exploration_policy 的实现:

        self.exploration_policy = exploration_factory(self.config["exploration"], self.env.action_space)

探索策略加载的配置文件部分:

"exploration": {
 "method": "EpsilonGreedy",
    "tau": 15000,
    "temperature": 1.0,
    "final_temperature": 0.05
}

跳转到exploration_factory,可以看到主要实现了三类探索策略,具体的内容会在后面部分进行介绍:

  • Greedy
  • ϵ \epsilon ϵ-Greedy
  • Boltzmann
def exploration_factory(exploration_config, action_space):
    """
        Handles creation of exploration policies
    :param exploration_config: configuration dictionary of the policy, must contain a "method" key
    :param action_space: the environment action space
    :return: a new exploration policy
    """
    from rl_agents.agents.common.exploration.boltzmann import Boltzmann
    from rl_agents.agents.common.exploration.epsilon_greedy import EpsilonGreedy
    from rl_agents.agents.common.exploration.greedy import Greedy

    if exploration_config['method'] == 'Greedy':
        return Greedy(action_space, exploration_config)
    elif exploration_config['method'] == 'EpsilonGreedy':
        return EpsilonGreedy(action_space, exploration_config)
    elif exploration_config['method'] == 'Boltzmann':
        return Boltzmann(action_space, exploration_config)
    else:
        raise ValueError("Unknown exploration method")

神经网络实现

接着获取 Q ( s , a ) Q(s,a) Q(s,a)

    def get_state_action_values(self, state):
        """
        :param state: s, an environment state
        :return: [Q(a1,s), ..., Q(an,s)] the array of its action-values for each actions
        """
        return self.get_batch_state_action_values([state])[0]

调用了抽象方法get_batch_state_action_values

    @abstractmethod
    def get_batch_state_action_values(self, states):
        """
        Get the state-action values of several states
        :param states: [s1; ...; sN] an array of states
        :return: values:[[Q11, ..., Q1n]; ...] the array of all action values for each state
        """
        raise NotImplementedError

接着来看DQNAgent中的具体实现:

class DQNAgent(AbstractDQNAgent):
    def __init__(self, env, config=None):
        super(DQNAgent, self).__init__(env, config)
        size_model_config(self.env, self.config["model"])
        self.value_net = model_factory(self.config["model"])
        self.target_net = model_factory(self.config["model"])
        self.target_net.load_state_dict(self.value_net.state_dict())
        self.target_net.eval()
        logger.debug("Number of trainable parameters: {}".format(trainable_parameters(self.value_net)))
        self.device = choose_device(self.config["device"])
        self.value_net.to(self.device)
        self.target_net.to(self.device)
        self.loss_function = loss_function_factory(self.config["loss_function"])
        self.optimizer = optimizer_factory(self.config["optimizer"]["type"],
                                           self.value_net.parameters(),
                                           **self.config["optimizer"])
        self.steps = 0
        
    def get_batch_state_action_values(self, states):
        return self.value_net(torch.tensor(states, dtype=torch.float).to(self.device)).data.cpu().numpy()

value_net的实现依赖于model_factory,其中的配置文件部分如下:

    "model": {
        "type": "MultiLayerPerceptron",
        "layers": [128, 128]
    },

再进入model_factory,主要实现了四类网络:

  • MultiLayerPerceptron
  • DuelingNetwork
  • ConvolutionalNetwork
  • EgoAttentionNetwork

这里我们暂且先分析多层感知机MultiLayerPerceptron(即普通DQN)。

// rl_agents/agents/common/models.py
def model_factory(config: dict) -> nn.Module:
    if config["type"] == "MultiLayerPerceptron":
        return MultiLayerPerceptron(config)
    elif config["type"] == "DuelingNetwork":
        return DuelingNetwork(config)
    elif config["type"] == "ConvolutionalNetwork":
        return ConvolutionalNetwork(config)
    elif config["type"] == "EgoAttentionNetwork":
        return EgoAttentionNetwork(config)
    else:
        raise ValueError("Unknown model type")

MultiLayerPerceptron类继承自BaseModuleBaseModule继承自torch.nn.Module。根据配置文件baseline.json,可以看到MultiLayerPerceptron类的sizes为[128, 128],激活函数为RELU。我们可以注意到,网络实现中有reshape操作,因为state的输入是5*8的矩阵,通过reshape,可以将其转换为一维的向量。最终网络结构类似于下图。

在这里插入图片描述

class MultiLayerPerceptron(BaseModule, Configurable):
    def __init__(self, config):
        super().__init__()
        Configurable.__init__(self, config)
        sizes = [self.config["in"]] + self.config["layers"] 
        self.activation = activation_factory(self.config["activation"])
        layers_list = [nn.Linear(sizes[i], sizes[i + 1]) for i in range(len(sizes) - 1)]
        self.layers = nn.ModuleList(layers_list)
        if self.config.get("out", None):
            self.predict = nn.Linear(sizes[-1], self.config["out"])

    @classmethod
    def default_config(cls):
        return {"in": None,
                "layers": [64, 64],
                "activation": "RELU",
                "reshape": "True",
                "out": None}

    def forward(self, x):
        if self.config["reshape"]:
            x = x.reshape(x.shape[0], -1)  # We expect a batch of vectors
        for layer in self.layers:
            x = self.activation(layer(x))
        if self.config.get("out", None):
            x = self.predict(x)
        return x

获取 Q Q Q之后,探索策略进行更新,并sample一个action。以 ϵ \epsilon ϵ-Greedy为例,因为 ϵ \epsilon ϵ-Greedy继承DiscreteDistribution,所以主要关注DiscreteDistribution中的相关实现。

    def act(self, state, step_exploration_time=True):
    ...
        self.exploration_policy.update(values)
        return self.exploration_policy.sample()
rl_agents/agents/common/exploration/epsilon_greedy.py
    def update(self, values):
        """
            Update the action distribution parameters
        :param values: the state-action values
        :param step_time: whether to update epsilon schedule
        """
        self.optimal_action = np.argmax(values)
        self.epsilon = self.config['final_temperature'] + \
            (self.config['temperature'] - self.config['final_temperature']) * \
            np.exp(- self.time / self.config['tau'])
        if self.writer:
            self.writer.add_scalar('exploration/epsilon', self.epsilon, self.time)
class DiscreteDistribution(Configurable, ABC):
    def __init__(self, config=None, **kwargs):
        super(DiscreteDistribution, self).__init__(config)
        self.np_random = None
        
    @abstractmethod
    def get_distribution(self):
        """
        :return: a distribution over actions {action:probability}
        """
        raise NotImplementedError()

    def sample(self):
        """
        :return: an action sampled from the distribution
        """
        distribution = self.get_distribution()
        return self.np_random.choice(list(distribution.keys()), 1, p=np.array(list(distribution.values())))[0]

可以看到首先需要获得action的一个分布,这部分在 ϵ \epsilon ϵ-Greedy中的实现为:

    def get_distribution(self):
        distribution = {action: self.epsilon / self.action_space.n for action in range(self.action_space.n)}
        distribution[self.optimal_action] += 1 - self.epsilon
        return distribution

get_distribution 函数返回一个动作的概率分布字典。字典的键是动作,字典的值是动作被选择的概率。概率分布的计算方式为:每个动作都有一个基础概率 self.epsilon / self.action_space.n,其中 self.action_space.n 是动作的总数,即每个动作被选择的概率相等,这是基于探索的角度。同时,最优动作 self.optimal_action 会额外获得一个概率增量 1 - self.epsilon,这是基于利用的角度,即利用已知的最优动作。

sample 函数根据 get_distribution 函数得到的动作概率分布进行采样,返回一个动作。具体地,使用 np_random.choice 函数,其参数包括动作列表和对应的动作概率分布列表,返回的是一个根据给定概率分布随机采样的动作。

小结1

到此,act函数返回一个待执行的action,此部分的框图如下所示:

在这里插入图片描述
之后这几步在上一讲已经讨论过http://t.csdnimg.cn/ddpVJ。

        # Forward the actions to the environment viewer
        try:
            self.env.unwrapped.viewer.set_agent_action_sequence(actions)
        except AttributeError:
            pass
            
        # Step the environment
        previous_observation, action = self.observation, actions[0]
        transition = self.wrapped_env.step(action)
        self.observation, reward, done, truncated, info = transition
        terminal = done or truncated

        # Call callback
        if self.step_callback_fn is not None:
            self.step_callback_fn(self.episode, self.wrapped_env, self.agent, transition, self.writer)

Record the experience

现在step函数中只剩下这一步,我们再来看这一步的实现。

        # Record the experience.
        try:
            self.agent.record(previous_observation, action, reward, self.observation, done, info)
        except NotImplementedError:
            pass

直接跳转到AbstractDQNAgent类中查看相关实现

    def record(self, state, action, reward, next_state, done, info):
        """
            Record a transition by performing a Deep Q-Network iteration

            - push the transition into memory
            - sample a minibatch
            - compute the bellman residual loss over the minibatch
            - perform one gradient descent step
            - slowly track the policy network with the target network
        :param state: a state
        :param action: an action
        :param reward: a reward
        :param next_state: a next state
        :param done: whether state is terminal
        """
        if not self.training:
            return
        if isinstance(state, tuple) and isinstance(action, tuple):  # Multi-agent setting
            [self.memory.push(agent_state, agent_action, reward, agent_next_state, done, info)
             for agent_state, agent_action, agent_next_state in zip(state, action, next_state)]
        else:  # Single-agent setting
            self.memory.push(state, action, reward, next_state, done, info)
        batch = self.sample_minibatch()
        if batch:
            loss, _, _ = self.compute_bellman_residual(batch)
            self.step_optimizer(loss)
            self.update_target_network()

Replaybuffer

self.memory是Replaybuffer的一个实现

  self.memory = ReplayMemory(self.config)
  • push函数的实现可以提升运算速率。
  • 在强化学习中,经常需要从经验回放缓存(这里就是self.memory)中抽样出一批数据来更新模型。而这里的n-step是一个常用的技巧,它表明在预测下一个状态时,不仅仅使用当前的状态和动作,还使用接下来的n-1个状态和动作。当n为1时,这就是常见的单步过渡;当n大于1时,这就是n步采样。
rl_agents/agents/common/memory.py
class ReplayMemory(Configurable):
    """
        Container that stores and samples transitions.
    """
    def __init__(self, config=None, transition_type=Transition):
        super(ReplayMemory, self).__init__(config)
        self.capacity = int(self.config['memory_capacity'])
        self.transition_type = transition_type
        self.memory = []
        self.position = 0

    @classmethod
    def default_config(cls):
        return dict(memory_capacity=10000,
                    n_steps=1,
                    gamma=0.99)

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
            self.position = len(self.memory) - 1
        elif len(self.memory) > self.capacity:
            self.memory = self.memory[:self.capacity]
        # Faster than append and pop
        self.memory[self.position] = self.transition_type(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size, collapsed=True):
        """
            Sample a batch of transitions.

            If n_steps is greater than one, the batch will be composed of lists of successive transitions.
        :param batch_size: size of the batch
        :param collapsed: whether successive transitions must be collapsed into one n-step transition.
        :return: the sampled batch
        """
        # TODO: use agent's np_random for seeding
        if self.config["n_steps"] == 1:
            # Directly sample transitions
            return random.sample(self.memory, batch_size)
        else:
            # Sample initial transition indexes
            indexes = random.sample(range(len(self.memory)), batch_size)
            # Get the batch of n-consecutive-transitions starting from sampled indexes
            all_transitions = [self.memory[i:i+self.config["n_steps"]] for i in indexes]
            # Collapse transitions
            return map(self.collapse_n_steps, all_transitions) if collapsed else all_transitions

    def collapse_n_steps(self, transitions):
        """
            Collapse n transitions <s,a,r,s',t> of a trajectory into one transition <s0, a0, Sum(r_i), sp, tp>.

            We start from the initial state, perform the first action, and then the return estimate is formed by
            accumulating the discounted rewards along the trajectory until a terminal state or the end of the
            trajectory is reached.
        :param transitions: A list of n successive transitions
        :return: The corresponding n-step transition
        """
        state, action, cumulated_reward, next_state, done, info = transitions[0]
        discount = 1
        for transition in transitions[1:]:
            if done:
                break
            else:
                _, _, reward, next_state, done, info = transition
                discount *= self.config['gamma']
                cumulated_reward += discount*reward
        return state, action, cumulated_reward, next_state, done, info

    def __len__(self):
        return len(self.memory)

    def is_full(self):
        return len(self.memory) == self.capacity

    def is_empty(self):
        return len(self.memory) == 0

回到record代码中,首先将采样到的数据放入Replaybuffer,当采样数据量大于batch_size时,从Replaybuffer中采样。

    def sample_minibatch(self):
        if len(self.memory) < self.config["batch_size"]:
            return None
        transitions = self.memory.sample(self.config["batch_size"])
        return Transition(*zip(*transitions))

compute_bellman_residual

之后便利用bellman方程进行更新:

loss, _, _ = self.compute_bellman_residual(batch)
    def compute_bellman_residual(self, batch, target_state_action_value=None):
        # Compute concatenate the batch elements
        if not isinstance(batch.state, torch.Tensor):
            # logger.info("Casting the batch to torch.tensor")
            state = torch.cat(tuple(torch.tensor([batch.state], dtype=torch.float))).to(self.device)
            action = torch.tensor(batch.action, dtype=torch.long).to(self.device)
            reward = torch.tensor(batch.reward, dtype=torch.float).to(self.device)
            next_state = torch.cat(tuple(torch.tensor([batch.next_state], dtype=torch.float))).to(self.device)
            terminal = torch.tensor(batch.terminal, dtype=torch.bool).to(self.device)
            batch = Transition(state, action, reward, next_state, terminal, batch.info)

        # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
        # columns of actions taken
        state_action_values = self.value_net(batch.state)
        state_action_values = state_action_values.gather(1, batch.action.unsqueeze(1)).squeeze(1)

        if target_state_action_value is None:
            with torch.no_grad():
                # Compute V(s_{t+1}) for all next states.
                next_state_values = torch.zeros(batch.reward.shape).to(self.device)
                if self.config["double"]:
                    # Double Q-learning: pick best actions from policy network
                    _, best_actions = self.value_net(batch.next_state).max(1)
                    # Double Q-learning: estimate action values from target network
                    best_values = self.target_net(batch.next_state).gather(1, best_actions.unsqueeze(1)).squeeze(1)
                else:
                    best_values, _ = self.target_net(batch.next_state).max(1)
                next_state_values[~batch.terminal] = best_values[~batch.terminal]
                # Compute the expected Q values
                target_state_action_value = batch.reward + self.config["gamma"] * next_state_values

        # Compute loss
        loss = self.loss_function(state_action_values, target_state_action_value)
        return loss, target_state_action_value, batch
  • with torch.no_grad():用于禁止在其作用域内进行梯度计算
  • 实现了DoubleDQN
  • self.loss_function = loss_function_factory(self.config["loss_function"])loss函数包括以下几种:
def loss_function_factory(loss_function):
    if loss_function == "l2":
        return F.mse_loss
    elif loss_function == "l1":
        return F.l1_loss
    elif loss_function == "smooth_l1":
        return F.smooth_l1_loss
    elif loss_function == "bce":
        return F.binary_cross_entropy
    else:
        raise ValueError("Unknown loss function : {}".format(loss_function))

step_optimizer

对梯度进行了截断

    def step_optimizer(self, loss):
        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        for param in self.value_net.parameters():
            param.grad.data.clamp_(-1, 1)
        self.optimizer.step()

update_target_network

更新目标网络

    def update_target_network(self):
        self.steps += 1
        if self.steps % self.config["target_update"] == 0:
            self.target_net.load_state_dict(self.value_net.state_dict())

小结2

到此,整个DQN算法实现完毕,record部分的框图如下:

在这里插入图片描述

exploration_policy

这部分主要实现了三种策略:

  • Greedy
  • ϵ \epsilon ϵ-Greedy
  • Boltzmann

此部分可以参考:【强化学习】02—— 探索与利用

Greedy

Greedy贪婪策略即选择最优的策略 a t = arg max ⁡ a ∈ A Q ( s , a ) a_t=\argmax_{a\in\mathcal{A}}Q(s,a) at=argmaxaAQ(s,a)

class Greedy(DiscreteDistribution):
    """
        Always use the optimal action
    """

    def __init__(self, action_space, config=None):
        super(Greedy, self).__init__(config)
        self.action_space = action_space
        if isinstance(self.action_space, spaces.Tuple):
            self.action_space = self.action_space.spaces[0]
        if not isinstance(self.action_space, spaces.Discrete):
            raise TypeError("The action space should be discrete")
        self.values = None
        self.seed()

    def get_distribution(self):
        optimal_action = np.argmax(self.values)
        return {action: 1 if action == optimal_action else 0 for action in range(self.action_space.n)}

    def update(self, values):
        self.values = values

ϵ \epsilon ϵ-Greedy

ϵ \epsilon ϵ-Greedy公式如下:
a t = { arg ⁡ max ⁡ a ∈ A Q ^ ( a ) , 采样概率:1- ϵ 从  A  中随机选择 , 采样概率:  ϵ a_t=\begin{cases}\arg\max_{a\in\mathcal{A}}\hat{Q}(a),&\text{采样概率:1-}\epsilon\\\text{从 }\mathcal{A}\text{ 中随机选择},&\text{采样概率: }\epsilon&\end{cases} at={argmaxaAQ^(a), A 中随机选择,采样概率:1-ϵ采样概率ϵ
这里实现的其实是衰减贪心策略,衰减曲线如下图所示。
ϵ = final-temperature + ( temperature − final-temperature ) ∗ e − t τ \begin{aligned}\epsilon &= \text{final-temperature}+(\text{temperature}-\text{final-temperature})*e^{\frac{-t}{\tau}}\end{aligned} ϵ=final-temperature+(temperaturefinal-temperature)eτt
在这里插入图片描述

class EpsilonGreedy(DiscreteDistribution):
    """
        Uniform distribution with probability epsilon, and optimal action with probability 1-epsilon
    """

    def __init__(self, action_space, config=None):
        super(EpsilonGreedy, self).__init__(config)
        self.action_space = action_space
        if isinstance(self.action_space, spaces.Tuple):
            self.action_space = self.action_space.spaces[0]
        if not isinstance(self.action_space, spaces.Discrete):
            raise TypeError("The action space should be discrete")
        self.config['final_temperature'] = min(self.config['temperature'], self.config['final_temperature'])
        self.optimal_action = None
        self.epsilon = 0
        self.time = 0
        self.writer = None
        self.seed()

    @classmethod
    def default_config(cls):
        return dict(temperature=1.0,
                    final_temperature=0.1,
                    tau=5000)

    def get_distribution(self):
        distribution = {action: self.epsilon / self.action_space.n for action in range(self.action_space.n)}
        distribution[self.optimal_action] += 1 - self.epsilon
        return distribution

    def update(self, values):
        """
            Update the action distribution parameters
        :param values: the state-action values
        :param step_time: whether to update epsilon schedule
        """
        self.optimal_action = np.argmax(values)
        self.epsilon = self.config['final_temperature'] + \
            (self.config['temperature'] - self.config['final_temperature']) * \
            np.exp(- self.time / self.config['tau'])
        if self.writer:
            self.writer.add_scalar('exploration/epsilon', self.epsilon, self.time)

    def step_time(self):
        self.time += 1

    def set_time(self, time):
        self.time = time

    def set_writer(self, writer):
        self.writer = writer

Boltzmann

玻尔兹曼分布(Boltzmann Distribution)是描述分子在热力学平衡时分布的概率分布函数。它表明在给定的能量状态下,不同的微观状态出现的概率是不同的,且符合一个指数函数形式。

在热力学中,任何物质在一定温度下都会具有一定的热运动,这些热运动状态可以用分子内能或动能来描述。而玻尔兹曼分布表明了在相同温度下,分子在所有可能状态之间的分布概率。其表达式为:

P ( E i ) = e − E i / k T ∑ j e − E j / k T P(E_i) = \frac{e^{-E_i/kT}}{\sum_{j} e^{-E_j/kT}} P(Ei)=jeEj/kTeEi/kT

其中, P ( E i ) P(E_i) P(Ei)为分子处于能量状态 E i E_i Ei的概率, k k k为玻尔兹曼常数, T T T为温度, E j E_j Ej为所有可以达到的能量状态。

可以看到,玻尔兹曼分布中每个能量状态的出现概率与其能量成负指数关系,因此能量较小的状态出现的概率更大。这符合熵增加的趋势,即越有序的状态出现的概率越小。

class Boltzmann(DiscreteDistribution):
    """
        Uniform distribution with probability epsilon, and optimal action with probability 1-epsilon
    """

    def __init__(self, action_space, config=None):
        super(Boltzmann, self).__init__(config)
        self.action_space = action_space
        if not isinstance(self.action_space, spaces.Discrete):
            raise TypeError("The action space should be discrete")
        self.values = None
        self.seed()

    @classmethod
    def default_config(cls):
        return dict(temperature=0.5)

    def get_distribution(self):
        actions = range(self.action_space.n)
        if self.config['temperature'] > 0:
            weights = np.exp(self.values / self.config['temperature'])
        else:
            weights = np.zeros((len(actions),))
            weights[np.argmax(self.values)] = 1
        return {action: weights[action] / np.sum(weights) for action in actions}

    def update(self, values):
        self.values = values

运行结果

运行命令与方法在上一讲已经介绍【rl-agents代码学习】01——总体框架。

超参数设置采用默认设置,使用DQN算法分别运行4000steps和20000steps。使用Tensorboard查看结果:

 tensorboard --logdir C:\Users\16413\Desktop\rl-agents-master\scripts\out\IntersectionEnv\DQNAgent\baseline_20231113-123234_7944\

4000steps
在这里插入图片描述
在这里插入图片描述
可以看到最后的episode reward大致在3左右。

20000steps
在这里插入图片描述
可以看到最后的episode reward大致在3左右。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/147376.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

(.htaccess文件特性)[MRCTF2020]你传你呢 1

题目环境&#xff1a; 不难看出是一道文件上传漏洞 上传一句话木马文件burpsuite进行抓包<?php eval($_POST[shell]);?> 命名为PHP文件格式 Repeater进行重放 尝试了其它后缀进行绕过都没有成功 通过 application/x-php内容类型&#xff0c;可以看出被识别出是PHP文件&…

模拟接口数据之使用Mock方法实现(vite)

文章目录 前言一、安装依赖mockjs 安装vite-plugin-mock 安装新增mock脚本 二、vite插件配置vite-plugin-mockvite.config.ts 引入vite-plugin-mock 三、新建mock数据新建mock目录env目录新建.env.mock文件 四、使用mock数据定义接口调用接口 如有启发&#xff0c;可点赞收藏哟…

μC/OS-II---互斥信号量管理1(os_mutex.c)

目录 背景&#xff1a;优先级反转问题互斥信号量管理互斥信号量创建互斥信号量删除互斥信号量获取/等待 背景&#xff1a;优先级反转问题 在高优先级任务等待低优先级任务释放资源时&#xff0c;第三个中等优先级任务抢占了低优先级任务。阻塞时间是无法预测的&#xff0c;可能…

【ROS导航Navigation】二 | 坐标系 | 定位 | 导航约束

目录 致谢&#xff1a;ROS赵虚左老师 一、通过里程计定位 二、通过传感器定位 三、坐标系变换 四、导航约束条件 1.硬件 2.软件 致谢&#xff1a;ROS赵虚左老师 Introduction Autolabor-ROS机器人入门课程《ROS理论与实践》零基础教程 参考赵虚左老师的实战教程 一、…

聚观早报 |英伟达发布H200;夸克发布自研大模型

【聚观365】11月15日消息 英伟达发布H200 夸克发布自研大模型 iQOO 12系列开启销售 红魔9 Pro配置细节 禾赛科技第三季度营收4.5亿元 英伟达发布H200 全球市值最高的芯片制造商英伟达公司&#xff0c;正在升级其H100人工智能处理器&#xff0c;为这款产品增加更多功能&am…

《网络协议》07. 其他协议

title: 《网络协议》07. 其他协议 date: 2022-10-07 18:24:02 updated: 2023-11-15 08:00:52 categories: 学习记录&#xff1a;网络协议 excerpt: IPv6、WebSocket、WebService&#xff08;SOAP&#xff0c;WSDL&#xff09;、HTTPDNS、FTP、邮件&#xff08;SMTP&#xff0c;…

PostgreSQL基本操作

目录 1.源码安装PostgreSQL 1.1.前置条件&#xff08;root下操作&#xff09; 1.1.1.卸载yum安装的postgresql 1.1.2.创建postgres用户 1.1.3.安装部分依赖 1.1.4.源码安装uuid 1.2.安装PostgreSQL 1.2.1.使用postgres用户管理PostgreSQL 1.2.2.下载解压postgres12源码…

SUMO道路封闭车辆绕行仿真实验【TraCI】

本文将介绍如何在 SUMO 交通模拟中动态选择车辆绕行指定道路。 绕道是城市驾驶中的常见现象&#xff0c;造成原因有很多&#xff0c;包括建筑和交通事故等。 无论出于何种原因&#xff0c;并非所有车辆都会选择避开这些道路&#xff1b; 有些人可能会毫不犹豫地直接开车过去&a…

Linux信号量以及基于环形队列的生产者消费者模型

文章目录 信号量信号量的接口初始化销毁等待信号量发布信号量 环形队列结合信号量设计模型 实现基于环形队列的生产者消费者模型Task.hppRingQueue.hppmain.cc效果对于多生产多消费的情况 信号量 信号量的本质是一个计数器 首先一份公共资源在实际情况中可能存在不同的线程可…

spring 整合 JUnit

大家好&#xff0c;本篇博客我们通过spring来整合JUnitt单元测试框架。 在之前篇章的测试方法中&#xff0c;几乎都能看到以下的两行代码&#xff1a; ApplicationContext context new ClassPathXmlApplicationContext("xxx.xml"); Xxxx xxx context.getBean(Xxx…

ctyunos 与 openeuler

ctyunos-2.0.1-220311-aarch64-dvd ctyunos-2.0.1-220329-everything-aarch64-dvd glibc python3 对应openEuler 20.03 LTS SP1

μC/OS-II---互斥信号量管理2(os_mutex.c)

目录 背景&#xff1a;优先级反转问题互斥信号量管理互斥信号量发出&#xff08;释放&#xff09;互斥信号量获取/无等待互斥信号量状态查询 背景&#xff1a;优先级反转问题 在高优先级任务等待低优先级任务释放资源时&#xff0c;第三个中等优先级任务抢占了低优先级任务。阻…

【京东API】商品详情+搜索商品列表接口

利用电商API获取数据的步骤 1.申请API接口&#xff1a;首先要在相应电商平台上注册账号并申请API接口。 2.获取授权&#xff1a;在账号注册成功后&#xff0c;需要获取相应的授权才能访问电商API。 3.调用API&#xff1a;根据电商API提供的请求格式&#xff0c;通过编程实现…

8.GC基本原理

目录 概述垃圾回收引用计数法 (Reference Counting)根可达分析算法 (GCRooting Tracing)对象引用类型强引用软引用弱引用 清除垃圾1.标记-清除算法 (Mark-Sweep)2.复制算法 (Copying)3.标记-整理算法 (Mark-Compact)分代回收 (Generational Collection) 垃圾回收器GC-串行收集器…

力扣每日一题-K个元素的最大和-2023.11.15

力扣每日一题&#xff1a;K个元素的最大和 题目链接:2656.K个元素的最大和 题目描述 代码思路 题目看完直接笑嘻了&#xff0c;还有这么容易的题。由题可知&#xff0c;第一次要找出最大值m&#xff0c;那由于把m1放回去&#xff0c;那第二次找的就是m1&#xff0c;以此类推…

DGL如何表征一张图

有关于DGL中图的构建 DGL 将有向图表示为一个 DGL 图对象。图中的节点编号连续&#xff0c;从0开始。我们一般通过指定图中的节点数&#xff0c;以及源节点和目标节点的列表&#xff0c;来构建这么一个图。 下面的代码构造了一个图&#xff0c;这个图有五个叶子节点。中心节点…

python 多线程池 CPU拉满

前言&#xff1a; 关于多线程的博客先前的博客如下&#xff1a; python线程join方法_python 线程join-CSDN博客 【精选】Python GIL锁_python gil 锁-CSDN博客 python函数运行加速_python os.listdir速度慢_两只蜡笔的小新的博客-CSDN博客 只需下面的模版即可: from multi…

CNCC 2023收官,Milvus Cloud与行业大咖共话向量数据库系统

近期,CNCC 2023 在沈阳圆满结束,紧凑、前沿的 129 场技术论坛让人印象深刻。据悉,这 129 场技术论坛涵盖人工智能、安全、计算+、软件工程、教育、网络、芯片、云计算等 30 余个方向。Zilliz 受邀参与【智能时代的大数据系统】技术论坛。 智能时代的到来,无疑给社会经济和日…

前端 vue 面试题 (一)

文章目录 v-if,v-show差别v-for和v-if虚拟dom解决什么问题vue的data为什么返回函数不返回对象比较vue&#xff0c;reactvue双向绑定原理vue虚拟dom 的diff算法vue 虚拟dom的diff算法的时间复杂度vue2与vue3的区别vue数据缓存&#xff0c;避免重复计算单页应用怎么跨页面传参vue…

基于springboot实现学生选课平台管理系统项目【项目源码】

系统开发平台 在该地方废物回收机构管理系统中&#xff0c;Eclipse能给用户提供更多的方便&#xff0c;其特点一是方便学习&#xff0c;方便快捷&#xff1b;二是有非常大的信息储存量&#xff0c;主要功能是用在对数据库中查询和编程。其功能有比较灵活的数据应用&#xff0c…