Q learning算法
代码仓库:https://github.com/daiyizheng/DL/tree/master/09-rl
Q Learning是强化学习算法中的一个经典算法。在一个决策过程中,我们不知道完整的计算模型,所以需要我们去不停的尝试。
算法流程

整体流程如下:
- Q-table 初始化 第一步是创建 Q-table,作为跟踪每个状态下的每个动作和相关进度的地方
- Observation. 代理需要观察环境的当前状态
- Action.智能体选择在环境中行动。动作完成后,模型会观察该动作是否对环境有益。
- Update.采取行动后,用结果更新 Q-table
- Repeat 重复步骤 2-4,直到模型达到预期目标的终止状态。
数学公式
 
     
      
       
       
         Q 
        
       
         ( 
        
       
         s 
        
       
         , 
        
       
         a 
        
       
         ) 
        
       
         = 
        
       
         Q 
        
       
         ( 
        
       
         s 
        
       
         , 
        
       
         a 
        
       
         ) 
        
       
         + 
        
       
         α 
        
       
         ∗ 
        
       
         ( 
        
       
         r 
        
       
         + 
        
       
         γ 
        
       
         ∗ 
        
       
         m 
        
       
         a 
        
       
         x 
        
       
         ( 
        
       
         Q 
        
       
         ( 
        
       
         s 
        
       
         ’ 
        
       
         , 
        
       
         a 
        
       
         ’ 
        
       
         ) 
        
       
         ) 
        
       
         − 
        
       
         Q 
        
       
         ( 
        
       
         s 
        
       
         , 
        
       
         a 
        
       
         ) 
        
       
         ) 
        
       
      
        Q(s,a) = Q(s,a) + α * (r + γ * max(Q(s’,a’)) - Q(s,a)) 
       
      
    Q(s,a)=Q(s,a)+α∗(r+γ∗max(Q(s’,a’))−Q(s,a))
 该等式分解如下:
- Q(s, a) 表示在状态 s 中采取行动 a 的预期奖励。
- 该动作收到的实际奖励由 r 引用,而 s’ 指的是下一个状态。
- 学习率是 α,γ 是折扣因子。
- 状态 s’ 中所有可能的动作 a’ 的最高预期奖励由 max(Q(s’, a’)) 表示。
代码
基于表格的简单价值学习
- 构建环境
import gym
#定义环境
class MyWrapper(gym.Wrapper):
    def __init__(self):
        #is_slippery控制会不会滑
        env = gym.make('FrozenLake-v1',
                       render_mode='rgb_array',
                       is_slippery=False)
        super().__init__(env)
        self.env = env
    def reset(self):
        state, _ = self.env.reset()
        return state
    def step(self, action):
        state, reward, terminated, truncated, info = self.env.step(action)
        over = terminated or truncated
        #走一步扣一份,逼迫机器人尽快结束游戏
        if not over:
            reward = -1
        #掉坑扣100分
        if over and reward == 0:
            reward = -100
        return state, reward, over
    #打印游戏图像
    def show(self):
        from matplotlib import pyplot as plt
        plt.figure(figsize=(3, 3))
        plt.imshow(self.env.render())
        plt.show()
env = MyWrapper()
env.reset()
env.show()
- 构建Q 表
import numpy as np
#初始化Q表,定义了每个状态下每个动作的价值
Q = np.zeros((16, 4))
Q
- 记录数据
from IPython import display
import random
#玩一局游戏并记录数据
def play(show=False):
    data = []
    reward_sum = 0
    state = env.reset()
    over = False
    while not over:
        action = Q[state].argmax()
        if random.random() < 0.1:
            action = env.action_space.sample()
        next_state, reward, over = env.step(action)
        data.append((state, action, reward, next_state, over))
        reward_sum += reward
        state = next_state
        if show:
            display.clear_output(wait=True)
            env.show()
    return data, reward_sum
play()[-1]
#数据池
class Pool:
    def __init__(self):
        self.pool = []
    def __len__(self):
        return len(self.pool)
    def __getitem__(self, i):
        return self.pool[i]
    #更新动作池
    def update(self):
        #每次更新不少于N条新数据
        old_len = len(self.pool)
        while len(pool) - old_len < 200:
            self.pool.extend(play()[0])
        #只保留最新的N条数据
        self.pool = self.pool[-1_0000:]
    #获取一批数据样本
    def sample(self):
        return random.choice(self.pool)
pool = Pool()
pool.update()
len(pool), pool[0]
- 训练
#训练
'''
Brain of the agent 探索者的大脑!
agent will make desicion here 用于做决策
Q(s,a) <- Q(s,a) + Alpha * [r + gamma * max(Q(s', a')) - Q(s,a)]
下面是Q——table表: (状态:行,行为:列)
        up    down    left    right   
state1  
state2
  .
  .
  .     
'''
def train():
    #共更新N轮数据
    for epoch in range(1000):
        pool.update()
        #每次更新数据后,训练N次
        for i in range(200):
            #随机抽一条数据
            state, action, reward, next_state, over = pool.sample()
            #Q矩阵当前估计的state下action的价值
            value = Q[state, action]
            #实际玩了之后得到的reward+下一个状态的价值*0.9
            target = reward + Q[next_state].max() * 0.9
            #value和target应该是相等的,说明Q矩阵的评估准确
            #如果有误差,则应该以target为准更新Q表,修正它的偏差
            #这就是TD误差,指评估值之间的偏差,以实际成分高的评估为准进行修正
            update = (target - value) * 0.1
            #更新Q表
            Q[state, action] += update
        if epoch % 100 == 0:
            print(epoch, len(pool), play()[-1])
train()



















