基于DQN的自动驾驶小车绕圈任务

news2025/7/15 12:54:45

1.任务介绍

        任务来源: DQN: Deep Q Learning |自动驾驶入门(?) |算法与实现
        任务原始代码: self-driving car
        最终效果:
请添加图片描述
        以下所有内容,都是对上面DQN代码的改进,在与deepseek的不断提问交互中学习理解代码,并且对代码做出改进,让小车能够尽可能的跑的远
        这个DQN代码依然有很多可改进的地方,受限于个人精力,没有再继续探索下去,欢迎指正交流。

        后续会更新DDPG算法和PPO算法在此任务上的应用调试记录的代码。

2.调试记录

        问题:使用原始DQN代码,小车很难学习到正确的策略去绕圈,并且跑的距离也很短

    2.1 更改学习率(lr)和探索率(eps_end)

        记录:图片为最初设置的average_score,增大减小都没有显著效果;
在这里插入图片描述

    2.2 增加模型层数

        记录:模型采用三层,中间增加256*256的隐藏层;模型增加一层之后,效果显著提升,训练到400次的时候,average_score已经能够达到5.3w
        原因:模型太简单了,这样的结构可能不足以捕捉复杂的状态空间,导致训练的效果差;
        现阶段问题:横向是不太稳定的,小车跑起来方向频繁抖动,且整体流畅度不够

    2.3 横纵向action细化,同时更改max_mem_size=1000000

        记录:[5,0],[2,1],[2,-1],[0,1],[0,0],[0,-1],[-2,1],[-2,-1],[-5,0],一共9中组合;修改完之后流畅度有较大提升,avg_score也有很大提升;
在这里插入图片描述

    2.4 奖励函数优化

        记录:奖励函数去掉距离的概念,主要考虑居中性、速度的连续性、转角的连续性;可以看到avg_score已经可以达到非常大的值,变化趋势都是先持续增加,然后在迭代次数达到200-300次左右时震荡下降
在这里插入图片描述
在这里插入图片描述
        现阶段问题:avg_score变化趋势都是先持续增加,然后在迭代次数达到200-300次左右时震荡下降

    2.5 增加目标网络,且目标网络更新用软更新

        记录:原始DNQ代码中是没有目标网络的,训练震荡会比较大,增加目标网络可以稳定训练; 小车在行驶过程中频繁的小幅度左右摇头的情况变少了,不过感觉纵向的加减速比较频繁,目标网络用软更新之后明显感觉在两台电脑上训练起来性能提升都很快,70次左右的时候都达到了百万级别,前期分数异常高,后期分数抖动很大
在这里插入图片描述

    2.6 状态中增加最小距离,自车车速,自车转角,使得问题符合MDP

        记录:这一点其实早该考虑到的,原来的状态返回之后雷达距离,是缺少一些自车相关的信息
在这里插入图片描述
在这里插入图片描述
        现阶段问题:模型在290次左右的时候score达到峰值,后面越训练得分越低
        原因:核心原因应该还是之前的经验被遗忘导致的,memory_size由100W改为1000W就有很大改善了

    2.7 状态中雷达数据去掉强制int转换,防止丢失精度影响模型策略学习和泛化能力

        记录:最大score能到250W了,并且mem_cntr已经远远超过100W,证明有帮助,不过仍然存在越训练score越低的情况,大概在sp355次左右;在去掉雷达int的强制转换之后,训练效果有很大提升;
在这里插入图片描述在这里插入图片描述

    2.8 observation归一化

        记录:episode 145,单次score已经能够达到2.5亿左右,平均score达到500W
在这里插入图片描述

3.代码主要改进点

    3.1 DeepQNetwork

        1.网络增加一层;

class DeepQNetwork(nn.Module):

    def __init__(self, lr, input_dims, n_actions):
        ...
        # 网络增加一层
        self.fc1 = nn.Linear(*input_dims, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, n_actions)
        ...

        2.网络激活函数改变;

class DeepQNetwork(nn.Module):

    def forward(self, state):
        # 激活函数也做了一定的尝试变更
        x = F.relu(self.fc1(state))
        x = F.tanh(self.fc2(x))
        ...

        3.输出action做对应适配;

class DeepQNetwork(nn.Module):

    def forward(self, state):
        ...
        # 输出action也做对应适配
        actions = self.fc3(x)
        return actions

    3.2 Agent

        1.增加目标网络,用于稳定训练;

class Agent:
    def __init__(self, gamma, epsilson, lr, input_dims, batch_size, n_actions,
                 max_mem_size=100000, eps_end=0.01, eps_dec=5e-4):
        ...
        self.Q_eval = DeepQNetwork(lr, input_dims, n_actions)
        # 增加目标网络,用于稳定训练
        self.Q_target = DeepQNetwork(lr, input_dims, n_actions)
        ...

    def learn_dqp(self, n_game):
        ...
        # TODO: 确认下为什么要用[batch_index, action_batch]----其目的是为每个样本选择实际执行动作的Q值。
        Q_eval = self.Q_eval.forward(state_batch)[batch_index, action_batch]
        # 直接调self.Q_target(new_state_batch)和self.Q_eval.forward(new_state_batch)的效果是一样的
        q_next = self.Q_target(new_state_batch)
        ...

        2.增加学习率调度器(指数衰减);

class Agent:
    def __init__(self, gamma, epsilson, lr, input_dims, batch_size, n_actions,
                 max_mem_size=100000, eps_end=0.01, eps_dec=5e-4):
        ...
        self.lr_min = 1e-6
        self.loss_value = 0
        # 新增学习率调度器(指数衰减)
        self.lr_scheduler = optim.lr_scheduler.ExponentialLR(
            self.Q_eval.optimizer,
            gamma=0.995  # 每episode学习率衰减0.5%
        )
        self.action_memory = np.zeros(self.mem_size, dtype=np.int32)
        ...

    def learn_dqp(self, n_game):
        ...
        self.Q_eval.optimizer.step()
        # 学习率调整必须在参数更新之后
        if self.mem_cntr % 2000 == 0:
            if self.lr_scheduler.get_last_lr()[0] > self.lr_min:
                self.lr_scheduler.step()  # 调整学习率
                print("lr updated!, current lr = {}".format(self.lr_scheduler.get_last_lr()[0]))
        ...

        3.目标网络参数更新采用软更新;

class Agent:
    def __init__(self, gamma, epsilson, lr, input_dims, batch_size, n_actions,
                 max_mem_size=100000, eps_end=0.01, eps_dec=5e-4):
        ...
        self.Q_target = DeepQNetwork(lr, input_dims, n_actions)
        self.update_target_freq = 1000  # 每1000步同步一次
        self.tau = 0.05  # 软更新系数
        ...

    def soft_update_target(self):
        for target_param, param in zip(self.Q_target.parameters(),
                                       self.Q_eval.parameters()):
            target_param.data.copy_(
                self.tau * param.data + (1 - self.tau) * target_param.data
            )

    def learn_dqp(self, n_game):
        ...
        # 软更新目标网络
        self.soft_update_target()
        ...

        4.增加了double-DQN的实现方式;

class Agent:
    def learn_double_dqp(self, n_game):
        # TODO: 看一下这一块是不是论文中提到的每间几千步更新一下?
        # 这一块他设定了一个batch_size=64,即每64个为一批,然后通过模型更新一下参数
        if self.mem_cntr < self.batch_size:
            return
        self.Q_eval.optimizer.zero_grad()
        max_mem = min(self.mem_cntr, self.mem_size)
        batch = np.random.choice(max_mem, self.batch_size, replace=False)
        batch_index = np.arange(self.batch_size, dtype=np.int32)
        state_batch = torch.tensor(self.state_memory[batch]).to(self.Q_eval.device)
        new_state_batch = torch.tensor(self.new_state_memory[batch]).to(self.Q_eval.device)
        reward_batch = torch.tensor(self.reward_memory[batch]).to(self.Q_eval.device)
        terminal_batch = torch.tensor(self.terminal_memory[batch]).to(self.Q_eval.device)

        action_batch = self.action_memory[batch]
        # TODO: 确认下为什么要用[batch_index, action_batch]----其目的是为每个样本选择实际执行动作的Q值。
        Q_eval = self.Q_eval.forward(state_batch)[batch_index, action_batch]

        # ========== Double DQN修改部分 ==========
        q_eval_next = self.Q_eval(new_state_batch)
        max_actions = torch.argmax(q_eval_next, dim=1)
        # 输出为[batch_size,action_size],每一行对应随机选取的state,每一列对应当前state对应的每个action估计的Q值
        # print(q_eval_next)
        # 输出为每个state下能够得到最大Q值的action,dim为1*batch_size
        # print(max_actions)
        # 输出为每个state下能够得到最大Q值的action,不过将dim转换为了batch_size*1
        # print(max_actions.unsqueeze(1))
        # 输出为,之前得到的每个state下对应的最优action,在当前网络中对应的Q值,dim=batch_size*1
        # print(self.Q_target(new_state_batch).gather(1, max_actions.unsqueeze(1)))
        # 输出为,之前得到的每个state下对应的最优action,在当前网络中对应的Q值,不过将dim转换为了1*batch_size
        # print(self.Q_target(new_state_batch).gather(1, max_actions.unsqueeze(1)).squeeze(1))
        q_next = self.Q_target(new_state_batch).gather(1, max_actions.unsqueeze(1)).squeeze(1)
        q_next[terminal_batch] = 0.0
        q_target = reward_batch + self.gamma * q_next
        # =======================================

        # 估计Q值,因为Q值本来就是未来的预期奖励,我们通过估计未来的预期奖励和当前实际过得的预期奖励的差值,来更新
        # 模型,所以你最大化你的value function,就是在最优化你的policy本身
        loss = self.Q_eval.loss(q_target, Q_eval).to(self.Q_eval.device)
        self.loss_value = loss.item()
        loss.backward()
        # 添加梯度裁剪
        # torch.nn.utils.clip_grad_norm_(self.Q_eval.parameters(), max_norm=1.0)
        self.Q_eval.optimizer.step()
        # 学习率调整必须在参数更新之后
        if self.mem_cntr % 2000 == 0:
            if self.lr_scheduler.get_last_lr()[0] > self.lr_min:
                self.lr_scheduler.step()  # 调整学习率
                print("lr updated!, current lr = {}".format(self.lr_scheduler.get_last_lr()[0]))
        # 软更新目标网络
        self.soft_update_target()

        self.epsilon = self.epsilon - self.eps_dec if self.epsilon > self.eps_min else self.eps_min

    3.3 Agent

        1.小车返回状态优化;
                状态中增加最小距离,自车车速,自车转角,使得问题符合MDP;
                状态中雷达数据去掉强制int转换,防止丢失精度影响模型策略学习和泛化能力;
                observation归一化;

class Car:

    def get_data(self):
        # Get Distances To Border
        return_values = [0] * len(self.radars)
        self.current_lateral_min_dist = 60
        for i, radar in enumerate(self.radars):
            return_values[i] = radar[1] / 300.0
            if radar[1] < self.current_lateral_min_dist:
                self.current_lateral_min_dist = radar[1]

        angle_rad = np.deg2rad(self.angle)
        return_values = return_values + [self.current_lateral_min_dist / 30,
                                         np.clip(self.speed / 20.0, 0.0, 1.0),
                                         np.sin(angle_rad), np.cos(angle_rad)]
        return return_values

        2.奖励函数优化;
                主要去掉了距离的概念,引入居中性、速度连续性、转角连续性概念,这么设计是因为只要小车始终保持居中,并且有速度,小车就可以一直跑下去;

class Car:

    def get_reward_optimized(self, game_map, alive_count_total):
        # 居中性
        lateral_reward = 1.0
        # print(self.current_lateral_min_dist)
        if self.current_lateral_min_dist / 60 > 0.5:
            lateral_reward = self.current_lateral_min_dist / 60
        elif self.current_lateral_min_dist / 60 < 0.4:
            lateral_reward = -0.5
        else:
            lateral_reward = 0.0

        # 速度基础
        speed_base_reward = self.speed / min(12 + alive_count_total / 100000, 20)

        # 速度连续性
        if len(self.speed_memory) >= 4:
            self.speed_memory = self.speed_memory[1:]
        self.speed_memory.append(self.speed)
        speed_up_discount = 1.0
        if self.speed_memory[-1] - self.speed_memory[0] >= 3 and lateral_reward > 0.0:
            speed_up_discount = -0.5
        elif self.speed_memory[-1] - self.speed_memory[0] >= 2 and lateral_reward > 0.0:
            speed_up_discount = 0.7

        # 转角连续性,写的随意了点,目的就是防止方向左右抖动
        angle_discount = 1.0
        if len(self.angle_memory) >= 5:
            self.angle_memory = self.angle_memory[1:]
        self.angle_memory.append(self.angle)
        aaa = [0] * 4
        if len(self.angle_memory) >= 5:
            for i in range(1, 5):
                aaa[i-1] = self.angle_memory[i] - self.angle_memory[i-1]
        bbb = [0] * 3
        for j in range(1, 4):
            bbb[j-1] = 1 if aaa[j-1] * aaa[j] < 0 else 0
        if sum(bbb) >= 3 and lateral_reward > 0.0:
            angle_discount = 0.8

        # print(lateral_reward, speed_up_discount, angle_discount, " ====== ", self.speed_memory)
        return 100 * lateral_reward * speed_up_discount * angle_discount

    3.4 train

        1.action优化;

def train():
    ...
    agent = Agent(gamma=0.99, epsilson=1.0, batch_size=256, n_actions=9,
                  eps_end=0.1, input_dims=[num_radar + 4], lr=0.005,
                  max_mem_size=1000000, eps_dec=1e-4)
    ...
        while not done:
            action = agent.choose_action(observation)
            # [5,0],[2,1],[2,-1],[0,1],[0,0],[0,-1],[-2,1],[-2,-1],[-5,0]
            if action == 0:
                car.angle += 5
                car.speed += 0
            elif action == 1:
                car.angle += 2
                if car.speed < 20:
                    car.speed += 1
            elif action == 2:
                car.angle += 2
                if car.speed - 1 >= 12:
                    car.speed -= 1
            elif action == 3:
                car.angle += 0
                if car.speed < 20:
                    car.speed += 1
            elif action == 4:
                car.angle += 0
                car.speed += 0
            elif action == 5:
                car.angle += 0
                if car.speed - 1 >= 12:
                    car.speed -= 1
            elif action == 6:
                car.angle -= 2
                if car.speed < 20:
                    car.speed += 1
            elif action == 7:
                car.angle -= 2
                if car.speed - 1 >= 12:
                    car.speed -= 1
            else:
                car.angle -= 5
                car.speed += 0
			...

4.任务思考

        1.DQN为什么不能处理连续控制性问题?
        DQN的核心思想是为每个可能的动作计算Q值,并选择Q值最大的动作。在连续动作空间中,动作是无限多的,连续动作空间离散化会造成维度灾难
        2.DQN总是过估计的原因?
在这里插入图片描述
        一方面总是取max操作;另一方面用自己的估计再去估计自己,如果当前已经出现高估,那么下一轮的TD-target更会高估,相当于高估会累计

        3.计算loss时,q_target对应的是每个状态下的最大Q值的动作,而Q_eval对应的却是从action_memory中随机选取的一批动作。这样是否会导致动作不匹配的问题,进而影响学习效果?
        在DQN中,q_target的计算遵循目标策略(最大化Q值),而Q_eval基于行为策略(实际执行的动作)。这种分离是Q-learning的核心设计,不会导致动作不匹配问题,反而是算法收敛的关键。
        Q_eval的作用:
                估实际动评作的价值:即使动作是随机探索的,也需要知道这些动作的历史价值。
        更新方向:
                通过loss = (q_target - Q_eval)^2,将实际动作的Q值向更优的目标值调整。
        q_target的作用:
                引导策略优化:通过最大化下一状态的Q值,逐步将策略向最优方向推进。

        4.DQN算法的网络中,使用MLP和使用CNN会有什么差异?
        总结:
                MLP:适合低维结构化数据,依赖显式特征工程,计算简单但高维易过拟合。
                CNN:适合高维空间数据(如图像),自动提取局部特征,参数共享提升效率。
        选择关键:根据输入数据的空间属性和维度,权衡特征提取需求与计算资源。
        在DQN中,合理选择网络结构是平衡性能与效率的核心。例如,Atari DQN的成功离不开CNN对图像特征的自动提取,而简单控制任务中MLP的轻量化优势则更为突出。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5.完整代码

        调试过程中的一些代码保留着,以提些许思考。

from typing import AsyncGenerator
import pygame
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import math
import os


class DeepQNetwork(nn.Module):

    def __init__(self, lr, input_dims, n_actions):
        super(DeepQNetwork, self).__init__()
        self.input_dims = input_dims
        self.n_actions = n_actions
        # 网络增加一层
        self.fc1 = nn.Linear(*input_dims, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, n_actions)
        self.fc = nn.Linear(*input_dims, n_actions)
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.MSELoss()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(self.device)

    def forward(self, state):
        # 激活函数也做了一定的尝试变更
        x = F.relu(self.fc1(state))
        x = F.tanh(self.fc2(x))
        # 输出action也做对应适配
        actions = self.fc3(x)
        return actions


class Agent:
    def __init__(self, gamma, epsilson, lr, input_dims, batch_size, n_actions,
                 max_mem_size=100000, eps_end=0.01, eps_dec=5e-4):
        self.gamma = gamma
        self.epsilon = epsilson
        self.eps_min = eps_end
        self.eps_dec = eps_dec
        self.lr = lr
        self.action_space = [i for i in range(n_actions)]
        self.mem_size = max_mem_size  # memory size
        self.batch_size = batch_size
        self.mem_cntr = 0  # memory current
        self.input_dims = input_dims
        self.n_actions = n_actions

        self.Q_eval = DeepQNetwork(lr, input_dims, n_actions)
        self.Q_target = DeepQNetwork(lr, input_dims, n_actions)
        self.update_target_freq = 1000  # 每1000步同步一次
        self.tau = 0.05  # 软更新系数
        self.lr_min = 1e-6
        self.state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)
        self.new_state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)
        self.loss_value = 0
        # 新增学习率调度器(指数衰减)
        self.lr_scheduler = optim.lr_scheduler.ExponentialLR(
            self.Q_eval.optimizer,
            gamma=0.995  # 每episode学习率衰减0.5%
        )
        self.action_memory = np.zeros(self.mem_size, dtype=np.int32)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
        self.terminal_memory = np.zeros(self.mem_size, dtype=bool)

    def store_transition(self, state_old, action, reward, state_new, done):
        index = self.mem_cntr % self.mem_size
        self.state_memory[index] = state_old
        self.new_state_memory[index] = state_new
        self.reward_memory[index] = reward
        self.action_memory[index] = action
        self.terminal_memory[index] = done

        self.mem_cntr += 1

    def choose_action(self, observation):
        if np.random.random() > self.epsilon:
            state = torch.tensor([observation], dtype=torch.float32).to(self.Q_eval.device)
            actions = self.Q_eval.forward(state)
            action = torch.argmax(actions).item()
        else:
            action = np.random.choice(self.action_space)
        return action

    def soft_update_target(self):
        for target_param, param in zip(self.Q_target.parameters(),
                                       self.Q_eval.parameters()):
            target_param.data.copy_(
                self.tau * param.data + (1 - self.tau) * target_param.data
            )

    def learn_dqp(self, n_game):
        # TODO: 看一下这一块是不是论文中提到的每间几千步更新一下?
        # 这一块他设定了一个batch_size=64,即每64个为一批,然后通过模型更新一下参数
        if self.mem_cntr < self.batch_size:
            return
        self.Q_eval.optimizer.zero_grad()
        max_mem = min(self.mem_cntr, self.mem_size)
        batch = np.random.choice(max_mem, self.batch_size, replace=False)
        batch_index = np.arange(self.batch_size, dtype=np.int32)
        state_batch = torch.tensor(self.state_memory[batch]).to(self.Q_eval.device)
        new_state_batch = torch.tensor(self.new_state_memory[batch]).to(self.Q_eval.device)
        reward_batch = torch.tensor(self.reward_memory[batch]).to(self.Q_eval.device)
        terminal_batch = torch.tensor(self.terminal_memory[batch]).to(self.Q_eval.device)

        action_batch = self.action_memory[batch]
        # TODO: 确认下为什么要用[batch_index, action_batch]----其目的是为每个样本选择实际执行动作的Q值。
        Q_eval = self.Q_eval.forward(state_batch)[batch_index, action_batch]
        # 直接调self.Q_target(new_state_batch)和self.Q_eval.forward(new_state_batch)的效果是一样的
        q_next = self.Q_target(new_state_batch)
        # 终止状态(Terminal State):如果当前状态转移后进入终止状态(例如游戏结束、任务完成或失败),
        # 则没有下一个状态s′,因此未来累积奖励的期望值为0。
        q_next[terminal_batch] = 0.0

        # 目标网络对每个state对应能够得到的最大Q值的action
        # print(torch.max(q_next, dim=1))
        # 目标网络对每个state对应能够得到的最大Q值
        # print(torch.max(q_next, dim=1)[0])
        q_target = reward_batch + self.gamma * torch.max(q_next, dim=1)[0]

        # 估计Q值,因为Q值本来就是未来的预期奖励,我们通过估计未来的预期奖励和当前实际过得的预期奖励的差值,来更新
        # 模型,所以你最大化你的value function,就是在最优化你的policy本身
        loss = self.Q_eval.loss(q_target, Q_eval).to(self.Q_eval.device)
        self.loss_value = loss.item()
        loss.backward()
        # 添加梯度裁剪
        # torch.nn.utils.clip_grad_norm_(self.Q_eval.parameters(), max_norm=1.0)
        self.Q_eval.optimizer.step()
        # 学习率调整必须在参数更新之后
        if self.mem_cntr % 2000 == 0:
            if self.lr_scheduler.get_last_lr()[0] > self.lr_min:
                self.lr_scheduler.step()  # 调整学习率
                print("lr updated!, current lr = {}".format(self.lr_scheduler.get_last_lr()[0]))
        # 软更新目标网络
        self.soft_update_target()

        self.epsilon = self.epsilon - self.eps_dec if self.epsilon > self.eps_min else self.eps_min

    def learn_double_dqp(self, n_game):
        # TODO: 看一下这一块是不是论文中提到的每间几千步更新一下?
        # 这一块他设定了一个batch_size=64,即每64个为一批,然后通过模型更新一下参数
        if self.mem_cntr < self.batch_size:
            return
        self.Q_eval.optimizer.zero_grad()
        max_mem = min(self.mem_cntr, self.mem_size)
        batch = np.random.choice(max_mem, self.batch_size, replace=False)
        batch_index = np.arange(self.batch_size, dtype=np.int32)
        state_batch = torch.tensor(self.state_memory[batch]).to(self.Q_eval.device)
        new_state_batch = torch.tensor(self.new_state_memory[batch]).to(self.Q_eval.device)
        reward_batch = torch.tensor(self.reward_memory[batch]).to(self.Q_eval.device)
        terminal_batch = torch.tensor(self.terminal_memory[batch]).to(self.Q_eval.device)

        action_batch = self.action_memory[batch]
        # TODO: 确认下为什么要用[batch_index, action_batch]----其目的是为每个样本选择实际执行动作的Q值。
        Q_eval = self.Q_eval.forward(state_batch)[batch_index, action_batch]

        # ========== Double DQN修改部分 ==========
        q_eval_next = self.Q_eval(new_state_batch)
        max_actions = torch.argmax(q_eval_next, dim=1)
        # 输出为[batch_size,action_size],每一行对应随机选取的state,每一列对应当前state对应的每个action估计的Q值
        # print(q_eval_next)
        # 输出为每个state下能够得到最大Q值的action,dim为1*batch_size
        # print(max_actions)
        # 输出为每个state下能够得到最大Q值的action,不过将dim转换为了batch_size*1
        # print(max_actions.unsqueeze(1))
        # 输出为,之前得到的每个state下对应的最优action,在当前网络中对应的Q值,dim=batch_size*1
        # print(self.Q_target(new_state_batch).gather(1, max_actions.unsqueeze(1)))
        # 输出为,之前得到的每个state下对应的最优action,在当前网络中对应的Q值,不过将dim转换为了1*batch_size
        # print(self.Q_target(new_state_batch).gather(1, max_actions.unsqueeze(1)).squeeze(1))
        q_next = self.Q_target(new_state_batch).gather(1, max_actions.unsqueeze(1)).squeeze(1)
        q_next[terminal_batch] = 0.0
        q_target = reward_batch + self.gamma * q_next
        # =======================================

        # 估计Q值,因为Q值本来就是未来的预期奖励,我们通过估计未来的预期奖励和当前实际过得的预期奖励的差值,来更新
        # 模型,所以你最大化你的value function,就是在最优化你的policy本身
        loss = self.Q_eval.loss(q_target, Q_eval).to(self.Q_eval.device)
        self.loss_value = loss.item()
        loss.backward()
        # 添加梯度裁剪
        # torch.nn.utils.clip_grad_norm_(self.Q_eval.parameters(), max_norm=1.0)
        self.Q_eval.optimizer.step()
        # 学习率调整必须在参数更新之后
        if self.mem_cntr % 2000 == 0:
            if self.lr_scheduler.get_last_lr()[0] > self.lr_min:
                self.lr_scheduler.step()  # 调整学习率
                print("lr updated!, current lr = {}".format(self.lr_scheduler.get_last_lr()[0]))
        # 软更新目标网络
        self.soft_update_target()

        self.epsilon = self.epsilon - self.eps_dec if self.epsilon > self.eps_min else self.eps_min


WIDTH = 1920
HEIGHT = 1080
CAR_SIZE_X = 60
CAR_SIZE_Y = 60
BORDER_COLOR = (255, 255, 255, 255)  # Color To Crash on Hit
current_generation = 0  # Generation counter


class Car:

    def __init__(self, boundary_x, boundary_y, num_radar):
        # Load Car Sprite and Rotate
        self.sprite = pygame.image.load('car.png').convert()  # Convert Speeds Up A Lot
        self.sprite = pygame.transform.scale(self.sprite, (CAR_SIZE_X, CAR_SIZE_Y))
        self.rotated_sprite = self.sprite

        # self.position = [690, 740] # Starting Position
        self.position = [830, 920]  # Starting Position
        self.angle = 0
        self.angle_memory = []
        self.speed = 0
        self.speed_memory = []

        self.speed_set = False  # Flag For Default Speed Later on

        self.center = [self.position[0] + CAR_SIZE_X / 2, self.position[1] + CAR_SIZE_Y / 2]  # Calculate Center

        self.radars = [[(0, 0), 60]] * num_radar  # List For Sensors / Radars
        self.drawing_radars = []  # Radars To Be Drawn
        self.current_lateral_min_dist = 60

        self.alive = True  # Boolean To Check If Car is Crashed

        self.distance = 0  # Distance Driven
        self.time = 0  # Time Passed

        self.width = 0
        self.height = 0

        self.boundary_x = boundary_x
        self.boundary_y = boundary_y

    def draw(self, screen):
        screen.blit(self.rotated_sprite, self.position)  # Draw Sprite
        self.draw_radar(screen)  # OPTIONAL FOR SENSORS

    def draw_radar(self, screen):
        # Optionally Draw All Sensors / Radars
        for radar in self.radars:
            position = radar[0]

            pygame.draw.line(screen, (0, 255, 0), self.center, position, 1)
            pygame.draw.circle(screen, (0, 255, 0), position, 5)

    def check_collision(self, game_map):
        self.alive = True
        for point in self.corners:
            # If Any Corner Touches Border Color -> Crash
            # Assumes Rectangle
            if game_map.get_at((int(point[0]), int(point[1]))) == BORDER_COLOR:
                self.alive = False
                break

    def check_radar(self, degree, game_map):
        length = 0
        x = int(self.center[0] + math.cos(math.radians(360 - (self.angle + degree))) * length)
        y = int(self.center[1] + math.sin(math.radians(360 - (self.angle + degree))) * length)

        # While We Don't Hit BORDER_COLOR AND length < 300 (just a max) -> go further and further
        while not game_map.get_at((x, y)) == BORDER_COLOR and length < 300:
            length = length + 1
            x = int(self.center[0] + math.cos(math.radians(360 - (self.angle + degree))) * length)
            y = int(self.center[1] + math.sin(math.radians(360 - (self.angle + degree))) * length)

        # Calculate Distance To Border And Append To Radars List TODO: update dist calculate
        dist = int(math.sqrt(math.pow(x - self.center[0], 2) + math.pow(y - self.center[1], 2)))
        self.radars.append([(x, y), dist])

    def update(self, game_map):
        # Set The Speed To 20 For The First Time
        # Only When Having 4 Output Nodes With Speed Up and Down
        if not self.speed_set:
            self.speed = 10
            self.speed_set = True

        self.width, self.height = game_map.get_size()

        # Get Rotated Sprite And Move Into The Right X-Direction
        # Don't Let The Car Go Closer Than 20px To The Edge
        self.rotated_sprite = self.rotate_center(self.sprite, self.angle)
        self.position[0] += math.cos(math.radians(360 - self.angle)) * self.speed
        self.position[0] = max(self.position[0], 20)
        self.position[0] = min(self.position[0], WIDTH - 120)

        # Increase Distance and Time
        self.distance += self.speed
        self.time += 1

        # Same For Y-Position
        self.position[1] += math.sin(math.radians(360 - self.angle)) * self.speed
        self.position[1] = max(self.position[1], 20)
        self.position[1] = min(self.position[1], WIDTH - 120)

        # Calculate New Center
        self.center = [int(self.position[0]) + CAR_SIZE_X / 2, int(self.position[1]) + CAR_SIZE_Y / 2]
        # print("center: {}".format(self.center))

        # Calculate Four Corners
        # Length Is Half The Side
        length = 0.5 * CAR_SIZE_X
        left_top = [self.center[0] + math.cos(math.radians(360 - (self.angle + 30))) * length,
                    self.center[1] + math.sin(math.radians(360 - (self.angle + 30))) * length]
        right_top = [self.center[0] + math.cos(math.radians(360 - (self.angle + 150))) * length,
                     self.center[1] + math.sin(math.radians(360 - (self.angle + 150))) * length]
        left_bottom = [self.center[0] + math.cos(math.radians(360 - (self.angle + 210))) * length,
                       self.center[1] + math.sin(math.radians(360 - (self.angle + 210))) * length]
        right_bottom = [self.center[0] + math.cos(math.radians(360 - (self.angle + 330))) * length,
                        self.center[1] + math.sin(math.radians(360 - (self.angle + 330))) * length]
        self.corners = [left_top, right_top, left_bottom, right_bottom]

        # Check Collisions And Clear Radars
        self.check_collision(game_map)
        self.radars.clear()

        # From -90 To 120 With Step-Size 45 Check Radar
        for d in range(-120, 126, 15):  # -90,-45,0,45,90z
            self.check_radar(d, game_map)

    def get_data(self):
        # Get Distances To Border
        return_values = [0] * len(self.radars)
        self.current_lateral_min_dist = 60
        for i, radar in enumerate(self.radars):
            return_values[i] = radar[1] / 300.0
            if radar[1] < self.current_lateral_min_dist:
                self.current_lateral_min_dist = radar[1]

        angle_rad = np.deg2rad(self.angle)
        return_values = return_values + [self.current_lateral_min_dist / 30,
                                         np.clip(self.speed / 20.0, 0.0, 1.0),
                                         np.sin(angle_rad), np.cos(angle_rad)]
        return return_values

    def is_alive(self):
        # Basic Alive Function
        return self.alive

    def get_reward_optimized(self, game_map, alive_count_total):
        # 居中性
        lateral_reward = 1.0
        # print(self.current_lateral_min_dist)
        if self.current_lateral_min_dist / 60 > 0.5:
            lateral_reward = self.current_lateral_min_dist / 60
        elif self.current_lateral_min_dist / 60 < 0.4:
            lateral_reward = -0.5
        else:
            lateral_reward = 0.0

        # 速度基础
        speed_base_reward = self.speed / min(12 + alive_count_total / 100000, 20)

        # 速度连续性
        if len(self.speed_memory) >= 4:
            self.speed_memory = self.speed_memory[1:]
        self.speed_memory.append(self.speed)
        speed_up_discount = 1.0
        if self.speed_memory[-1] - self.speed_memory[0] >= 3 and lateral_reward > 0.0:
            speed_up_discount = -0.5
        elif self.speed_memory[-1] - self.speed_memory[0] >= 2 and lateral_reward > 0.0:
            speed_up_discount = 0.7

        # 转角连续性
        angle_discount = 1.0
        if len(self.angle_memory) >= 5:
            self.angle_memory = self.angle_memory[1:]
        self.angle_memory.append(self.angle)
        aaa = [0] * 4
        if len(self.angle_memory) >= 5:
            for i in range(1, 5):
                aaa[i-1] = self.angle_memory[i] - self.angle_memory[i-1]
        bbb = [0] * 3
        for j in range(1, 4):
            bbb[j-1] = 1 if aaa[j-1] * aaa[j] < 0 else 0
        if sum(bbb) >= 3 and lateral_reward > 0.0:
            angle_discount = 0.8

        # print(lateral_reward, speed_up_discount, angle_discount, " ====== ", self.speed_memory)
        return 100 * lateral_reward * speed_up_discount * angle_discount

    def rotate_center(self, image, angle):
        # Rotate The Rectangle
        rectangle = image.get_rect()
        rotated_image = pygame.transform.rotate(image, angle)
        rotated_rectangle = rectangle.copy()
        rotated_rectangle.center = rotated_image.get_rect().center
        rotated_image = rotated_image.subsurface(rotated_rectangle).copy()
        return rotated_image


def train():
    pygame.init()
    screen = pygame.display.set_mode((WIDTH, HEIGHT))
    game_map = pygame.image.load('map.png').convert()  # Convert Speeds Up A Lot
    clock = pygame.time.Clock()
    num_radar = 17

    agent = Agent(gamma=0.99, epsilson=1.0, batch_size=256, n_actions=9,
                  eps_end=0.1, input_dims=[num_radar + 4], lr=0.005,
                  max_mem_size=1000000, eps_dec=1e-4)
    scores, eps_history = [], []
    average_scores = []
    distance = []
    average_distance = []
    alive_counts = []
    average_alive_counts = []
    n_games = 500
    optimal_score_and_episode = [0, 0]

    for i in range(n_games):
        car = Car([], [], num_radar)
        done = False
        score = 0
        observation = car.get_data()
        alive_count = 0
        while not done:
            action = agent.choose_action(observation)
            # [5,0],[2,1],[2,-1],[0,1],[0,0],[0,-1],[-2,1],[-2,-1],[-5,0]
            if action == 0:
                car.angle += 5
                car.speed += 0
            elif action == 1:
                car.angle += 2
                if car.speed < 20:
                    car.speed += 1
            elif action == 2:
                car.angle += 2
                if car.speed - 1 >= 12:
                    car.speed -= 1
            elif action == 3:
                car.angle += 0
                if car.speed < 20:
                    car.speed += 1
            elif action == 4:
                car.angle += 0
                car.speed += 0
            elif action == 5:
                car.angle += 0
                if car.speed - 1 >= 12:
                    car.speed -= 1
            elif action == 6:
                car.angle -= 2
                if car.speed < 20:
                    car.speed += 1
            elif action == 7:
                car.angle -= 2
                if car.speed - 1 >= 12:
                    car.speed -= 1
            else:
                car.angle -= 5
                car.speed += 0

            screen.blit(game_map, (0, 0))
            car.update(game_map)
            car.draw(screen)
            pygame.display.flip()
            clock.tick(60)

            observation_, reward, done = car.get_data(), car.get_reward_optimized(game_map,
                                                                                   agent.mem_cntr), not car.is_alive()
            score += reward
            agent.store_transition(observation, action, reward, observation_, done)
            agent.learn_double_dqp(i)
            observation = observation_
            alive_count += 1

        if score > optimal_score_and_episode[0]:
            print("-----optimal_score_and_episode updated form {} to {}.".format(optimal_score_and_episode,
                                                                                 [score, i]))
            optimal_score_and_episode = [score, i]
            state = {
                'eval_state': agent.Q_eval.state_dict(),
                'target_state': agent.Q_target.state_dict(),
                'eval_optimizer_state': agent.Q_eval.optimizer.state_dict(),
                'target_optimizer_state': agent.Q_target.optimizer.state_dict(),
                'optimal_score_and_episode': optimal_score_and_episode
            }
            torch.save(state, f'./dqn_eval.pth')
            # TODO:待改进,加载模型创建临时环境让小车随即初始化位置跑若干次,并且跑的时候保留最低的探索率,
            #      取score平均值,平均值大的模型保存下来

        scores.append(score)
        eps_history.append(agent.epsilon)
        avg_score = np.mean(scores[-100:])
        average_scores.append(avg_score)
        distance.append(car.distance)
        avg_distance = np.mean(distance[-100:])
        average_distance.append(avg_distance)
        alive_counts.append(alive_count)
        avg_alive_count = np.mean(alive_counts[-100:])
        average_alive_counts.append(avg_alive_count)
        # 打印当前学习率(调试用)
        current_lr = agent.lr_scheduler.get_last_lr()[0]

        print(
            f'episode: {i}, current_lr: {current_lr}, score= {round(score, 2)}, epsilon= {round(agent.epsilon, 3)},'
            f' avg_score= {round(avg_score, 2)}, distance= {round(car.distance)}, loss= {round(agent.loss_value)}, '
            f'alive_count= {round(alive_count)}, mem_cntr= {agent.mem_cntr}')

    plt.subplot(1, 2, 1)
    plt.plot([i for i in range(0, n_games)], average_scores)
    plt.title("average_scores")
    plt.subplot(1, 2, 2)
    plt.plot([i for i in range(0, n_games)], average_distance)
    plt.title("average_distance")
    plt.show()


if __name__ == '__main__':
    train()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2371867.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】Linux工具(1)

3.Linux工具&#xff08;1&#xff09; 文章目录 3.Linux工具&#xff08;1&#xff09;Linux 软件包管理器 yum什么是软件包关于 rzsz查看软件包——yum list命令如何安装软件如何卸载软件补充——yum如何找到要安装软件的下载地址 Linux开发工具Linux编辑器-vim使用1.vim的基…

基于 Spring Boot 瑞吉外卖系统开发(十一)

基于 Spring Boot 瑞吉外卖系统开发&#xff08;十一&#xff09; 菜品启售和停售 “批量启售”、“批量停售”、操作列的售卖状态绑定单击事件&#xff0c;触发单击事件时&#xff0c;最终携带需要修改售卖状态的菜品id以post请求方式向“/dish/status/{params.status}”发送…

深入理解负载均衡:传输层与应用层的原理与实战

目录 前言1. 传输层&#xff08;Layer 4&#xff09;负载均衡1.1 工作层级与核心机制1.2 实现方式详解1.3 优缺点分析1.4 典型实现工具 2. 应用层&#xff08;Layer 7&#xff09;负载均衡2.1 工作层级与核心机制2.2 实现方式解析2.3 优缺点分析2.4 常用实现工具 3. Layer 4 与…

WPF之Slider控件详解

文章目录 1. 概述2. 基本属性2.1 值范围属性2.2 滑动步长属性2.3 刻度显示属性2.4 方向属性2.5 选择范围属性 3. 事件处理3.1 值变化事件3.2 滑块拖动事件 4. 样式和模板自定义4.1 基本样式设置4.2 控件模板自定义 5. 数据绑定5.1 绑定到ViewModel5.2 同步多个控件 6. 实际应用…

企业微信自建消息推送应用

企业微信自建应用来推送消息 前言 最近有个给特定部门推送消息的需求&#xff0c;所以配置一个应用专门用来推送消息。实现过程大致为&#xff1a;服务器生成每天的报告&#xff0c;通过调用API来发送消息。以前一直都是发邮件&#xff0c;整个邮箱里全是报告文件&#xff0c…

日志之ClickHouse部署及替换ELK中的Elasticsearch

文章目录 1 ELK替换1.1 Elasticsearch vs ClickHouse1.2 环境部署1.2.1 zookeeper 集群部署1.2.2 Kafka 集群部署1.2.3 FileBeat 部署1.2.4 clickhouse 部署1.2.4.1 准备步骤1.2.4.2 添加官方存储库1.2.4.3 部署&启动&连接1.2.4.5 基本配置服务1.2.4.6 测试创建数据库和…

解构与重构:自动化测试框架的进阶认知之旅

目录 一、自动化测试的介绍 &#xff08;一&#xff09;自动化测试的起源与发展 &#xff08;二&#xff09;自动化测试的定义与目标 &#xff08;三&#xff09;自动化测试的适用场景 二、什么是自动化测试框架 &#xff08;一&#xff09;自动化测试框架的定义 &#x…

DockerDesktop替换方案

背景 由于DockerDesktop并非开源软件&#xff0c;如果在公司使用&#xff0c;可能就有一些限制&#xff0c;那是不是除了使用DockerDesktop外&#xff0c;就没其它办法了呢&#xff0c;现在咱们来说说替换方案。 WSL WSL是什么&#xff0c;可自行百度&#xff0c;这里引用WS…

力扣热题100之搜索二维矩阵 II

题目 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。 每列的元素从上到下升序排列。 代码 方法一&#xff1a;直接全体遍历 这个方法很直接&#xff0c;但是居然没有超时&#xff0c…

docker操作镜像-以mysql为例

Docker安装使用-CSDN博客 docker操作镜像-以mysql为例 当安装一个新的镜像时可以登录https://hub.docker.com/直接搜索想要安装的镜像&#xff0c;查看文档 1&#xff09;拉取镜像 docker pull mysql 或者 docker pull mysql:版本号 然后直接跳到第4&#xff09;步即可 2…

使用OpenCV 和 Dlib 进行卷积神经网络人脸检测

文章目录 引言1.准备工作2.代码解析2.1 导入必要的库2.2 加载CNN人脸检测模型2.3 加载并预处理图像2.4 进行人脸检测2.5 绘制检测结果2.6 显示结果 3.完整代码4.性能考虑5.总结 引言 人脸检测是计算机视觉中最基础也最重要的任务之一。今天我将分享如何使用dlib库中的CNN人脸检…

React 实现 JWT 登录验证的最小可运行示例

下面是一个用 React 实现 JWT 登录验证的最小可运行示例&#xff0c;包含&#xff1a; React 前端&#xff1a;登录、保存 Token、获取用户数据。模拟后端&#xff1a;用 mock API&#xff08;你也可以接真后端&#xff09;。 &#x1f9f1; 技术栈 React&#xff08;使用 Vi…

Power Query精通指南1:查询结构设计、数据类型、数据导入与迁移(平面文件、Excel、Web)

文章目录 零、Power Query简介0.1 Power Query 主要功能0.2 Power Query 的优势0.3 Power Query 组件 一、Power Query数据处理基本流程1.1 前期准备1.2 提取1.3 转换1.3.1 Power Query 编辑器界面1.3.2 默认转换1.3.3 自定义转换 1.4 加载1.4.1 自动检测数据类型1.4.2 重命名查…

vue2开发者sass预处理注意

vue2开发者sass预处理注意 sass的预处理器&#xff0c;早年使用node-sass&#xff0c;也就是vue2最初默认的编译器。 sass官方推出了dart-sass来替代。 node-sass已经停维很久了。 vue3默认使用的是dart-sass。 Uniapp的官方文档截图 从 HBuilderX 4.56 &#xff0c;vue2 …

使用 Selenium 爬取动态网页数据 —— 实战与坑点详解

本文记录了笔者在爬取网页数据过程中遇到的各种技术挑战&#xff0c;包括页面动态渲染、JavaScript 注入等问题&#xff0c;并最终给出一个可运行的完整方案。 文章目录 网页获取不到数据&#x1f680; 尝试用 Selenium 渲染页面 网页获取不到数据 某网页数据依赖大量 JavaSc…

守护数字家园:个人博客安全防护指南

前言 在之前的文章《WordPress个人博客搭建&#xff08;一&#xff09;》《WordPress个人博客搭建&#xff08;二&#xff09;》《WordPress个人博客搭建&#xff08;三&#xff09;》中&#xff0c;我们已经在非凡云云服务器上&#xff0c;借助1Panel搭建起属于自己的数字庭院…

【网络编程】三、TCP网络套接字编程

文章目录 TCP通信流程Ⅰ. 服务器日志类实现Ⅱ. TCP服务端1、服务器创建流程2、创建套接字 -- socket3、绑定服务器 -- bind&#x1f38f;4、服务器监听 -- listen&#x1f38f;5、获取客户端连接请求 -- acceptaccept函数返回的套接字描述符是什么&#xff0c;不是已经有一个了…

trae ai编程工具

Trae&#xff0c;致力于成为真正的 AI 工程师&#xff08;The Real Al Engineer&#xff09;。Trae 旗下的 AI IDE 产品&#xff0c;以智能生产力为核心&#xff0c;无缝融入你的开发流程&#xff0c;与你默契配合&#xff0c;更高质量、高效率完成每一个任务。 版本差异 国内…

神经网络发展历程——积跬步至千里

神经网络类型层线性or非线性创新问题备注感知器单层线性模型&#xff0c;输出 1 1 1&#xff0c; − 1 -1 −1误差反馈学习阈值函数不可导&#xff0c;构造学习规则与感知器准则等价线性神经元单层线性模型梯度下降法训练参数线性函数&#xff0c;多层仍是线性变换本质上是最小…

荣耀A8互动娱乐组件部署实录(第2部分:界面逻辑与资源加载机制)

作者&#xff1a;从 Spine 骨骼动画里抠图三小时没睡的美术兼前端苦工 一、界面整体架构拆解 荣耀A8组件采用的是典型的分模块 UI 架构&#xff0c;即&#xff1a;主界面为入口容器&#xff0c;不同子页面&#xff08;如商城、银行、客服、游戏入口&#xff09;以逻辑功能划分…