强化学习_06_pytorch-TD3实践(CarRacing-v2)

0、TD3算法原理简介

详见笔者前一篇实践强化学习_06_pytorch-TD3实践(BipedalWalkerHardcore-v3)

1、CarRacing环境观察及调整

Action SpaceBox([-1. 0. 0.], 1.0, (3,), float32)
Observation SpaceBox(0, 255, (96, 96, 3), uint8)

动作空间是[-1~1, 0~1, 0~1], 状态空间是 96 × 96 × 3 96\times96\times3 96×96×3 的图片。

1.1 图片裁剪及跳帧

环境初始的时候有40-50帧是没有意义的,可能还会影响模型训练。同时图片下面黑色部分也是没有太多意义,所以可以直接对图片截取s = s[:84, 6:90]
在这里插入图片描述

对环境进行简单观察会发现,一个step是一帧,一帧很难捕捉动作产生的影响(移动量,奖励等)。所以我们进行跳帧观察(1个action进行n个step,期间累计奖励),从红线看,每隔5帧已经可以看出小车在移动。
在这里插入图片描述

1.2 车驶离赛道判断 & reward调整

我们可以看出在gymnasiumCarRacing-V2连续的环境中没有驶出赛道终止的设定,所以我们可以基于像素进行判断是否驶离赛道。观察三个channel,我们可以看出在第二个channel中可以基于大约75行左右的一行像素进行是否行驶出去的判断
经过试验我们可以直接用s[75, 35:50, 1] 前2个和后2个像素点来判断是否行驶到赛道外。
在这里插入图片描述

    def judge_out_of_route(self, obs):
        s = obs[:84, 6:90, :]
        out_sum = (s[75, 35:48, 1][:2] > 200).sum() + (s[75, 35:48, 1][-2:] > 200).sum()
        return out_sum == 4

在加入了是否行驶到赛道外的判断后,如果判断出了赛道则reward=-10

1.4 对多个输出进行通道叠加FrameStack

进行跳帧可以看出车辆的移动,但是只有多张的连续输入,CNN才能感知连续的动作。所以我们这两将4次跳帧组成一个observe,即最终20个step返回一个observe和叠加reward
在这里插入图片描述

1.5 最终环境构建python code

import gymnasium as gym
import torch
import numpy as np
from torchvision import transforms
from gymnasium.spaces import Box
from gymnasium.wrappers import FrameStack

class CarV2SkipFrame(gym.Wrapper):
    def __init__(self, env, skip: int):
        """skip frame
        Args:
            env (_type_): _description_
            skip (int): skip frames
        """
        super().__init__(env)
        self._skip = skip
    
    def step(self, action):
        tt_reward_list = []
        done = False
        total_reward = 0
        for i in range(self._skip):
            obs, reward, done, info, _ = self.env.step(action)
            out_done = self.judge_out_of_route(obs)
            done_f = done or out_done
            reward = -10 if out_done else reward
            # reward = -100 if out_done else reward
            # reward = reward * 10 if reward > 0 else reward
            total_reward += reward
            tt_reward_list.append(reward)
            if done_f:
                break
        return obs[:84, 6:90, :], total_reward, done_f, info, _
    
    def judge_out_of_route(self, obs):
        s = obs[:84, 6:90, :]
        out_sum = (s[75, 35:48, 1][:2] > 200).sum() + (s[75, 35:48, 1][-2:] > 200).sum()
        return out_sum == 4

    def reset(self, seed=0, options=None):
        s, info = self.env.reset(seed=seed, options=options)
        # steering  gas  breaking
        a = np.array([0.0, 0.0, 0.0])
        for i in range(45):
            obs, reward, done, info, _ = self.env.step(a)

        return obs[:84, 6:90, :], info


class SkipFrame(gym.Wrapper):
    def __init__(self, env, skip: int):
        """skip frame
        Args:
            env (_type_): _description_
            skip (int): skip frames
        """
        super().__init__(env)
        self._skip = skip
    
    def step(self, action):
        total_reward = 0.0
        done = False
        for _ in range(self._skip):
            obs, reward, done, info, _ = self.env.step(action)
            total_reward += reward
            if done:
                break
        return obs, total_reward, done, info, _


class GrayScaleObservation(gym.ObservationWrapper):
    def __init__(self, env):
        """RGP -> Gray
        (high, width, channel) -> (1, high, width) 
        """
        super().__init__(env)
        self.observation_space = Box(
            low=0, high=255, shape=self.observation_space.shape[:2], dtype=np.uint8
        )
    
    def observation(self, observation):
        tf = transforms.Grayscale()
        # channel first
        return tf(torch.tensor(np.transpose(observation, (2, 0, 1)).copy(), dtype=torch.float))


class ResizeObservation(gym.ObservationWrapper):
    def __init__(self, env, shape: int):
        """reshape observe
        Args:
            env (_type_): _description_
            shape (int): reshape size
        """
        super().__init__(env)
        self.shape = (shape, shape)
        obs_shape = self.shape + self.observation_space.shape[2:]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

    def observation(self, observation):
        #  Normalize -> input[channel] - mean[channel]) / std[channel]
        transformations = transforms.Compose([transforms.Resize(self.shape), transforms.Normalize(0, 255)])
        return transformations(observation).squeeze(0)



env_name = 'CarRacing-v2'
env = gym.make(env_name)
SKIP_N = 5
STACK_N = 4
env_ = FrameStack(
    ResizeObservation(
        GrayScaleObservation(CarV2SkipFrame(env, skip=SKIP_N)), 
        shape=84
    ), 
    num_stack=STACK_N
)

二、智能体构建

因为是用的CNN,所以需要注意梯度消失的问题。

2.1 actor

主要架构就是CNN + MLP + maxMinScale

  • CNN: 因为环境比较简单第一层用MaxPool2d采样,第二层进行AvgPool2d平滑
    nn.Sequential(
        nn.Conv2d(in_channels=4, out_channels=16, kernel_size=4, stride=2),
        nn.ReLU(),
        nn.MaxPool2d(2, 2, 0),
        nn.Conv2d(in_channels=16, out_channels=32, kernel_size=4, stride=2),
        nn.ReLU(),
        nn.AvgPool2d(2, 2, 0),
        nn.Flatten()
    )
    
  • MLP
    • 对cnn提取的特征进行 LayerNorm (一定程度干预梯度消失)
    • 对最后层全连接层的输出进行 LayerNorm (一定程度干预梯度消失)
  • maxMinScale
    • 最后通过tanh激活层action全部归一化到[-1,1]之间
    • 基于环境的动作上线限,用maxMinScale方式将最终的输出映射到[动作下限,动作上限]

actor 网络

class TD3CNNPolicyNet(nn.Module):
    """
    输入state, 输出action
    """
    def __init__(self, 
                state_dim: int, 
                hidden_layers_dim: typ.List, 
                action_dim: int, 
                action_bound: typ.Union[float, gym.Env]=1.0, 
                state_feature_share: bool=False
                ):
        super(TD3CNNPolicyNet, self).__init__()
        self.state_feature_share = state_feature_share
        self.low_high_flag = hasattr(action_bound, "action_space")
        print('action_bound=',action_bound)
        self.action_bound = action_bound
        if self.low_high_flag:
            self.action_high = torch.FloatTensor(action_bound.action_space.low)
            self.action_low = torch.FloatTensor(action_bound.action_space.high)

        self.cnn_feature = nn.Sequential(
            nn.Conv2d(in_channels=4, out_channels=16, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.AvgPool2d(2, 2, 0),
            nn.Flatten()
        )
        self.cnn_out_ln = nn.LayerNorm([512])
        self.features = nn.ModuleList()
        for idx, h in enumerate(hidden_layers_dim):
            self.features.append(nn.ModuleDict({
                'linear': nn.Linear(hidden_layers_dim[idx-1] if idx else 512, h),
                'linear_action': nn.ReLU()
            }))
        
        self.fc_out = nn.Linear(hidden_layers_dim[-1], action_dim)
        self.final_ln = nn.LayerNorm([action_dim])

    def max_min_scale(self, act):
        """
        X_std = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))
        X_scaled = X_std * (max - min) + min
        """
        # print("max_min_scale(", act, ")")
        device_ = act.device
        action_range = self.action_high.to(device_) - self.action_low.to(device_)
        act_std = (act - -1.0) / 2.0
        return act_std * action_range.to(device_) + self.action_low.to(device_)

    def forward(self, state):
        if len(state.shape) == 3:
            state = state.unsqueeze(0)
        try:
            x = self.cnn_feature(state)
        except Exception as e:
            print(state.shape)
            state = state.permute(0, 3, 1, 2)
            x = self.cnn_feature(state)

        x = self.cnn_out_ln(x)
        for layer in self.features:
            x = layer['linear_action'](layer['linear'](x))

        device_ = x.device
        if self.low_high_flag:
            return self.max_min_scale(torch.tanh(self.final_ln(self.fc_out(x))))
        return torch.tanh(self.final_ln(self.fc_out(x)).clip(-6.0, 6.0)) * self.action_bound

2.2 critic

  • CNN: 设计同Actor
  • concat状态和action
    • 进行observe和action concat 之前对action进行线性变换(一定程度解决梯度消失 及 原地转圈)
class TD3CNNValueNet(nn.Module):
    """
    输入[state, cation], 输出value
    """
    def __init__(self, state_dim: int, action_dim: int, hidden_layers_dim: typ.List, 
                 state_feature_share=False
                ):
        super(TD3CNNValueNet, self).__init__()
        self.state_feature_share = state_feature_share
        self.q1_cnn_feature = nn.Sequential(
            nn.Conv2d(in_channels=4, out_channels=16, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.AvgPool2d(2, 2, 0),
            nn.Flatten()
        )
        self.q2_cnn_feature = nn.Sequential(
            nn.Conv2d(in_channels=4, out_channels=16, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.AvgPool2d(2, 2, 0),
            nn.Flatten()
        )
        self.features_q1 = nn.ModuleList()
        self.features_q2 = nn.ModuleList()
        for idx, h in enumerate(hidden_layers_dim + [action_dim]):
            self.features_q1.append(nn.ModuleDict({
                'linear': nn.Linear(hidden_layers_dim[idx-1] if idx else 512, h),
                'linear_activation': nn.ReLU()
            }))
            self.features_q2.append(nn.ModuleDict({
                'linear': nn.Linear(hidden_layers_dim[idx-1] if idx else 512, h),
                'linear_activation': nn.ReLU()
            }))

        self.act_q1_fc = nn.Linear(action_dim, action_dim)
        self.act_q2_fc = nn.Linear(action_dim, action_dim)

        self.head_q1_bf = nn.Linear(action_dim * 2, action_dim)
        self.head_q2_bf = nn.Linear(action_dim * 2, action_dim)
        
        self.head_q1 = nn.Linear(action_dim, 1)
        self.head_q2 = nn.Linear(action_dim, 1)
        
    def forward(self, state, action):
        if len(state.shape) == 3:
            state = state.unsqueeze(0)
        try:
            x1 = self.q1_cnn_feature(state)
            x2 = self.q2_cnn_feature(state)
        except Exception as e:
            state = state.permute(0, 3, 1, 2)
            x1 = self.q1_cnn_feature(state)
            x2 = self.q2_cnn_feature(state)
            
        for layer1, layer2 in zip(self.features_q1, self.features_q2):
            x1 = layer1['linear_activation'](layer1['linear'](x1))
            x2 = layer2['linear_activation'](layer2['linear'](x2))

        # 拼接状态和动作
        act1 = torch.relu(self.act_q1_fc(action.float()))
        act2 = torch.relu(self.act_q2_fc(action.float()))
        x1 = torch.relu( self.head_q1_bf(torch.cat([x1, act1], dim=-1).float()))
        # print("torch.cat([x1, action], dim=-1)=", torch.cat([x1, act1], dim=-1)[:5, :])
        x2 = torch.relu( self.head_q2_bf(torch.cat([x2, act2], dim=-1).float()))
        return self.head_q1(x1), self.head_q2(x2)

    def Q1(self, state, action):
        if len(state.shape) == 3:
            state = state.unsqueeze(0)
        try:
            x = self.q1_cnn_feature(state)
        except Exception as e:
            state = state.permute(0, 3, 1, 2)
            x = self.q1_cnn_feature(state)

        for layer in self.features_q1:
            x = layer['linear_activation'](layer['linear'](x))

        # 拼接状态和动作
        act1 = torch.relu(self.act_q1_fc(action.float()))
        x = torch.relu( self.head_q1_bf(torch.cat([x, act1], dim=-1).float()))
        return self.head_q1(x) 

2.3 TD3算法简单调整

  1. policy_noise: 分布调整为(mean=0, std=每个维度动作范围) * self.policy_noise
  2. expl_noise: 分布调整为(mean=0, std=每个维度动作范围) * self.train_noise

3、训练

整体训练脚本可以看笔者的github test_TD3.py : CarRacing_TD3_test()

  1. 对训练做了一些调整: 在训练的过程中增加测试阶段:每隔test_ep_freq进行测试
  2. 基于多次测试的奖励均值进行最佳模型参数保存
def CarRacing_TD3_test():
    env_name = 'CarRacing-v2'
    gym_env_desc(env_name)
    env = gym.make(env_name)
    env = FrameStack(
        ResizeObservation(
            GrayScaleObservation(CarV2SkipFrame(env, skip=5)), 
            shape=84
        ), 
        num_stack=4
    )
    print("gym.__version__ = ", gym.__version__ )
    path_ = os.path.dirname(__file__)
    cfg = Config(
        env, 
        # 环境参数
        save_path=os.path.join(path_, "test_models" ,'TD3_CarRacing-v2_test2-3'), 
        seed=42,
        # 网络参数
        actor_hidden_layers_dim=[128], # 256
        critic_hidden_layers_dim=[128],
        # agent参数
        actor_lr=2.5e-4, #5.5e-5,
        critic_lr=1e-3, #7.5e-4,  
        gamma=0.99,
        # 训练参数
        num_episode=15000,
        sample_size=128,
        # 环境复杂多变,需要保存多一些buffer
        off_buffer_size=1024*100,  
        off_minimal_size=256,
        max_episode_rewards=50000,
        max_episode_steps=1200, # 200
        # agent 其他参数
        TD3_kwargs={
            'CNN_env_flag': 1,
            'pic_shape': env.observation_space.shape,
            "env": env,
            'action_low': env.action_space.low,
            'action_high': env.action_space.high,
            # soft update parameters
            'tau': 0.05, 
            # trick2: Delayed Policy Update
            'delay_freq': 1,
            # trick3: Target Policy Smoothing
            'policy_noise': 0.2,
            'policy_noise_clip': 0.5,
            # exploration noise
            'expl_noise': 0.5,
            # 探索的 noise 指数系数率减少 noise = expl_noise * expl_noise_exp_reduce_factor^t
            'expl_noise_exp_reduce_factor':  1 - 1e-4
        }
    )
    agent = TD3(
        state_dim=cfg.state_dim,
        actor_hidden_layers_dim=cfg.actor_hidden_layers_dim,
        critic_hidden_layers_dim=cfg.critic_hidden_layers_dim,
        action_dim=cfg.action_dim,
        actor_lr=cfg.actor_lr,
        critic_lr=cfg.critic_lr,
        gamma=cfg.gamma,
        TD3_kwargs=cfg.TD3_kwargs,
        device=cfg.device
    )
    agent.train()
    train_off_policy(env, agent, cfg, done_add=False, train_without_seed=True, wandb_flag=False, test_ep_freq=100)
    agent.load_model(cfg.save_path)
    agent.eval()
    env = gym.make(env_name, render_mode='human') # 
    env = FrameStack(
        ResizeObservation(
            GrayScaleObservation(CarV2SkipFrame(env, skip=5)), 
            shape=84
        ), 
        num_stack=4
    )
    play(env, agent, cfg, episode_count=2)

4、训练结果观察及后续工作

由于上传大小限制5MB, 所以对较多直线部分进行了裁剪

最终训练的时候发现会突然陷入低分状态,可以考虑间隔n(可以设置较大比如2000)个episode和最佳的reward比较,分数低于x%个百分点,就重新载入最佳参数,以继续训练。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/267688.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

10 NAT网络地址转换

广域网技术 上面聊的内容都是内网的一些配置,但内网终将要访问外网的,我们需要怎么处理呢?一般使用HDLC(高级数据链路控制协议)或者PPP(点对点协议)。 使用PPP安全接入Internet PPP&#xff0…

Podman配置mongodb

文章目录 查询镜像拉取镜像查看镜像运行容器创建root用户 查询镜像 podman search mongo拉取镜像 podman pull docker.io/library/mongo查看镜像 podman images运行容器 podman run -d -p 27017:27017 --namemongodb-test docker.io/library/mongo创建root用户 podman exe…

详解现实世界资产(RWAs)

区块链中的现实世界资产(RWAs)是代表实际和传统金融资产的数字通证,如货币、大宗商品、股票和债券。 实际世界资产(RWA)的通证化是区块链行业中最大的市场机会之一,潜在市场规模可达数万万亿美元。理论上&…

12章总结

一.集合类概述 java.util包中提供了一些集合类,这些集合类又被称为容器。 集合类与数组的不同之处: 数组的长度是固定的,集合的长度是可变的:数组用来存放基本类型的数据,集合用来存放对象的引用。 常…

windows下使用vccode+cmake编译cuda程序

1、在vscode中安装Nsight Visual Studio Code Edition 在vscode中安装插件能够对cuda的代码进行语法检查 2、编写cuda程序 #include <iostream>__global__ void mykernelfunc(){}; int main() {mykernelfunc<<<1,1>>>();std::cout << "hel…

C++ 比 C语言增加的新特性 2

1.C新增了带默认值参数的函数 1.1 格式 格式&#xff1a;返回值 函数名&#xff08;参数1初始值1&#xff0c;..........&#xff09;{} 例如&#xff1a;void function&#xff08;int a10&#xff09;{} 调用&#xff1a;不需要更改参数的值&#xff1a;function&#x…

Kubernetes 学习总结(40)—— Kubernetes 之 自动伸缩 HPA、VPA、CA和CPA详解

前言 Kubernetes 提供了多种自动伸缩机制&#xff0c;例如 HPA&#xff08;Horizontal Pod Autoscaling&#xff09;&#xff0c;可以根据不同情况动态调整 Pod 副本数量。此功能使 Pod 能够有效地处理当前流量&#xff0c;而无需管理员不断干预来调整副本数量。除了 HPA 之外…

每日一题——LeetCode160.相交链表

个人主页&#xff1a;白日依山璟 专栏&#xff1a;Java|数据结构与算法|每日一题 文章目录 1. 题目描述示例1&#xff1a;示例2&#xff1a;提示&#xff1a; 2. 思路3. 代码 1. 题目描述 给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的…

HarmonyOs4.0基础(一)

目录 一、HarmonyOs系统定义 1.1系统的技术特性(三大特征) 1.1.1、硬件互助、资源共享 1.1.2、一次开发、多端部署(面向开发者) 1.1.3、统一OS&#xff0c;弹性部署(支持多种API&#xff1a;ArkTs、JS、C/C、Java) 1.2、系统的技术架构 二、Harmony OS项目搭建 2.1、(D…

Github 2023-12-24 开源项目日报 Top10

根据Github Trendings的统计&#xff0c;今日(2023-12-24统计)共有10个项目上榜。根据开发语言中项目的数量&#xff0c;汇总情况如下&#xff1a; 开发语言项目数量Python项目5Jupyter Notebook项目2C项目1C项目1Go项目1Java项目1JavaScript项目1Ruby项目1 Serverless Frame…

【零基础入门Docker】如何构建Web服务Dockerfile?

✍面向读者&#xff1a;所有人 ✍所属专栏&#xff1a;零基础入门Docker专栏https://blog.csdn.net/arthas777/category_12455882.html 目录 步骤1&#xff1a;第一步是构建我们的Docker文件&#xff0c;您可以使用vim编辑器。 步骤2&#xff1a;下一步是使用docker build命令…

【Python机器学习系列】一文搞懂机器学习中的转换器和估计器(附案例)

一、引言 表格数据一套完整的机器学习建模流程如下&#xff1a; 在机器学习中&#xff0c;转换器&#xff08;Transformer&#xff09;和估计器&#xff08;Estimator&#xff09;是两个重要的概念&#xff0c;转换器和估计器在机器学习中扮演不同的角色&#xff0c;但它们通常…

【论文解读】CNN-Based Fast HEVC Quantization Parameter Mode Decision

时间&#xff1a;2019 年 级别&#xff1a;SCI 机构&#xff1a;南京信息工程大学 摘要 随着多媒体呈现技术、图像采集技术和互联网行业的发展&#xff0c;远程通信的方式已经从以前的书信、音频转变为现在的音频/视频。和 视频在工作、学习和娱乐中的比例不断提高&#xff0…

Python如何将图片转换成字符

PIL(Python Image Library)库是Python平台上一个功能强大的图像处理标准库&#xff0c;支持图像的存储、显示和处理&#xff0c;几乎可以处理所有图片格式&#xff0c;如图像的压缩、裁剪、叠加、添加文字等等。 安装PIL库:pip install pillow from PIL import Image ascii_cha…

35c3 krautflare

参考这篇文章可以彻底了解本题的漏洞所在 https://xz.aliyun.com/t/6527 由于Math.expm1经过patch以后的返回值不可能是-0&#xff0c;但是patch的地方是在typer优化中&#xff0c;所以实际上如果没有优化的话是可以返回-0的&#xff0c;这就意味着如果我们先不停地Math.expm1…

手机技巧:安卓微信8.0.45测试版功能来了

目录 一、更新介绍 二、本次功能更新介绍 2.1 小程序界面优化 2.2 小程序个性化推荐支持关闭 三、其他实用的微信使用长按小技巧 3.1、长按对话框 3.2、长按搜索 3.3、长按相册 3.4、长按视频 3.5、长按表情包&#xff08;能开启2个技巧&#xff09; 3.6、长按音频文…

Linux——环境变量与本地变量

环境变量与本地变量 文章目录 环境变量与本地变量1. 环境变量1.1 命令行参数1.2 环境变量PATH1.3 环境变量的概念和相关操作1.3.1 用命令查看环境变量1.3.2 用命令添加环境变量&#xff1a;1.3.2 用命令删除环境变量1.3.3 利用代码查看环境变量1.3.4 利用代码修改或添加环境变量…

运行时和编译时使用的so库不同是否影响可执行文件执行

引子 近日遇到如下问题: 1.如果可执行文件依赖的so库在编译和执行阶段使用的名字一样&#xff0c;但是内容不一样&#xff0c;比如运行时相比于编译时在so库里增加了几个api定义&#xff0c;so库还可以正常使用吗&#xff1f; 2.如果可执行文件依赖的so库在编译和执行阶段使用的…

一文带你认识JVM

&#x1f697;&#x1f697;&#x1f697;今天给大家分享的关于JVM的一些基本认识。 清风的CSDN博客 &#x1f6e9;️&#x1f6e9;️&#x1f6e9;️希望我的文章能对你有所帮助&#xff0c;有不足的地方还请各位看官多多指教&#xff0c;大家一起学习交流&#xff01; ✈️✈…

Lambda表达式超详解

目录 背景 Lambda表达式的用法 函数式接口 Lambda表达式的基本使用 语法精简 变量捕获 匿名内部类 匿名内部类中的变量捕获 Lambda的变量捕获 Lambda表达式在类集中的使用 Collection接口 List接口 Map接口 总结 背景 Lambda表达式是Java SE 8中的一个重要的新特性.…