Q-learning is a value-based reinforcement learning method. Below is a simple example based on the tutorial by Morvan Zhou (周莫烦).

Q-learning

A one-dimensional world: -o---T

o: agent
T: target
actions: {'left', 'right'}

Basic parameters

$\gamma$ is the discount applied to future rewards

$\alpha < 1$ is the learning rate

$\epsilon$ controls exploration: with probability $1-\epsilon$ a random action is chosen

import numpy as np
import pandas as pd
import time

N_STATES = 6   # width of the 1D world
ACTIONS = ['left', 'right']     # available actions for the explorer
EPSILON = 0.9   # greediness (probability of acting greedily)
ALPHA = 0.1     # learning rate
GAMMA = 0.9    # discount factor for future rewards
MAX_EPISODES = 13   # maximum number of episodes
FRESH_TIME = 0.3    # pause between moves when rendering

Q-table

Q-learning records the value of each state-action pair in a Q-table.
The Q-table is the agent's behavior guideline.

def build_q_table(n_states, actions):
    table = pd.DataFrame(
        np.zeros((n_states, len(actions))),     # initialize the q_table with all zeros
        columns=actions,    # the columns are the action names
    )
    return table

# q_table:
"""
   left  right
0   0.0    0.0
1   0.0    0.0
2   0.0    0.0
3   0.0    0.0
4   0.0    0.0
5   0.0    0.0
"""

Define the action-selection method

The action is chosen from the Q-table given the current state, using an $\epsilon$-greedy policy: act greedily most of the time, otherwise explore with a random action.

# Choose an action at the current state
def choose_action(state, q_table):
    state_actions = q_table.iloc[state, :]  # all the action values for this state
    if (np.random.uniform() > EPSILON) or ((state_actions == 0).all()):  # act non-greedily, or this state has not been explored yet
        action_name = np.random.choice(ACTIONS)
    else:
        action_name = state_actions.idxmax()    # greedy: the action with the highest Q value
    return action_name
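A quick sanity check (a hypothetical call, not part of the original tutorial): with a fresh all-zero table the unexplored-state branch fires, so the very first choice is always random.

q = build_q_table(N_STATES, ACTIONS)
print(choose_action(0, q))   # prints 'left' or 'right', chosen uniformly at random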

Define the environment feedback

Given the previous state and the action taken, the environment returns the next state and a reward.
In this example the reward is deterministic: only reaching T yields a reward of 1.

def get_env_feedback(S, A):
    # This is how the agent interacts with the environment
    if A == 'right':    # move right
        if S == N_STATES - 2:   # terminate
            S_ = 'terminal'
            R = 1
        else:
            S_ = S + 1
            R = 0
    else:   # move left
        R = 0
        if S == 0:
            S_ = S  # reach the wall
        else:
            S_ = S - 1
    return S_, R
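Two example transitions (hypothetical calls, consistent with the function above): moving left at the wall keeps the agent in place with no reward, while moving right from state N_STATES - 2 reaches the target and earns the only reward in this world.

print(get_env_feedback(0, 'left'))               # (0, 0): bumped into the left wall
print(get_env_feedback(N_STATES - 2, 'right'))   # ('terminal', 1): reached T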

Environment update

Render the environment: print the agent's current position in the 1D world, or print a summary line once the episode reaches the terminal state.

def update_env(S, episode, step_counter):
    # This is how the environment is rendered
    env_list = ['-']*(N_STATES-1) + ['T']   # '-----T' is our environment
    if S == 'terminal':
        interaction = 'Episode %s: total_steps = %s' % (episode+1, step_counter)
        print('\r{}'.format(interaction), end='')
        time.sleep(2)
        print('\r                                ', end='')
    else:
        env_list[S] = 'o'
        interaction = ''.join(env_list)
        print('\r{}'.format(interaction), end='')
        time.sleep(FRESH_TIME)

Main Q-learning loop

Initialize Q(s, a)

Repeat (for each episode):

    Initialize s
    Repeat (for each step of the episode):
        Choose a from s using a policy derived from Q (e.g., $\epsilon$-greedy)
        Take action a, observe reward r and next state s'

        $Q(s,a) = Q(s,a) + \alpha \left[ r + \gamma \max_{a'} Q(s',a') - Q(s,a) \right]$

        s = s'
    until s is terminal

s: state

a: action

r: reward
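To make the update rule concrete, here is one hand-computed step using the constants defined earlier (ALPHA = 0.1, GAMMA = 0.9). It assumes a fresh all-zero table and the agent stepping right from state 4 into the terminal state, the only rewarded transition in this world.

q_predict = 0.0                                      # current Q(4, 'right') in a fresh table
q_target = 1                                         # terminal transition: R = 1, no bootstrapped future value
q_new = q_predict + ALPHA * (q_target - q_predict)
print(q_new)                                         # 0.1: Q(4, 'right') after the first successful episode

On later visits, the update for state 3 bootstraps from this value through the $\gamma \max_{a'} Q(s',a')$ term, which is how the reward gradually propagates back toward the start of the corridor.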

def rl():
    q_table = build_q_table(N_STATES, ACTIONS)  # initial q_table
    for episode in range(MAX_EPISODES):     # episodes
        step_counter = 0
        S = 0   # starting position of the episode
        is_terminated = False   # whether the episode has ended
        update_env(S, episode, step_counter)    # render the environment
        while not is_terminated:

            A = choose_action(S, q_table)   # choose an action
            S_, R = get_env_feedback(S, A)  # take the action and get the environment's feedback
            q_predict = q_table.loc[S, A]    # estimated (state, action) value
            if S_ != 'terminal':
                q_target = R + GAMMA * q_table.iloc[S_, :].max()   # target (state, action) value (episode not over)
            else:
                q_target = R     # target (state, action) value (episode over)
                is_terminated = True    # terminate this episode

            q_table.loc[S, A] += ALPHA * (q_target - q_predict)  # update the q_table
            S = S_  # the explorer moves to the next state

            update_env(S, episode, step_counter+1)  # render the environment

            step_counter += 1
    return q_table
if __name__ == "__main__":
    q_table = rl()
    print('\r\nQ-table:\n')
    print(q_table)
---o-T

Q-table:

       left     right
0  0.000029  0.005061
1  0.000029  0.026896
2  0.000018  0.111724
3  0.000073  0.343331
4  0.000810  0.745813
5  0.000000  0.000000

Q-Learning Class

import numpy as np
import pandas as pd
import gym   # needed for the gym.spaces checks below

class QLearningTable:
    def __init__(self, env, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):
        self.lr = learning_rate        # learning rate
        self.gamma = reward_decay      # reward discount
        self.epsilon = e_greedy        # greediness
        if isinstance(env.observation_space, gym.spaces.discrete.Discrete):
            self.state_dim = env.observation_space.n
        else:
            self.state_dim = env.observation_space.shape[0]
        if isinstance(env.action_space, gym.spaces.discrete.Discrete):
            self.action_dim = env.action_space.n
        else:
            self.action_dim = env.action_space.shape[0]

        self.q_table = pd.DataFrame(columns=list(range(self.action_dim)), dtype=float)   # initial q_table
        
    def choose_action(self, observation):
        observation = str(observation)
        self.check_state_exist(observation)
        # choose an action
        if np.random.uniform() < self.epsilon:  # pick the action with the highest Q value
            state_action = self.q_table.loc[observation, :]
            state_action = state_action.reindex(np.random.permutation(state_action.index))  # shuffle to break ties randomly
            action = state_action.idxmax()   # idxmax returns the action label
        else:   # pick a random action
            action = np.random.choice(self.q_table.columns)
        return action
   
    def learn(self, state, action, reward, next_state, done):
        next_state = str(next_state)
        state = str(state)
        self.check_state_exist(next_state)  # make sure next_state already has a row in the q_table (see below)
        q_predict = self.q_table.loc[state, action]
        if not done:
            q_target = reward + self.gamma * self.q_table.loc[next_state, :].max()  # next state is not terminal
        else:
            q_target = reward   # next state is terminal
        self.q_table.loc[state, action] += self.lr * (q_target - q_predict)  # update the state-action value

    def check_state_exist(self, state):
        if state not in self.q_table.index:
            # append the new state to the q_table, initialised to 0.001 for every action
            # (DataFrame.append was removed in pandas 2.0, so pd.concat is used instead)
            new_row = pd.Series(
                [0.001] * self.action_dim,
                index=self.q_table.columns,
                name=state,
            )
            self.q_table = pd.concat([self.q_table, new_row.to_frame().T])
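The intended call pattern (a sketch; env stands for any gym environment using the old 4-tuple step API): pick an action, step the environment, then feed the transition back into learn.

Qt = QLearningTable(env)
obs = env.reset()
action = Qt.choose_action(obs)
next_obs, reward, done, _ = env.step(action)
Qt.learn(state=obs, action=action, reward=reward, next_state=next_obs, done=done)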

Q-Learning on gym

List the installed envs

from gym import envs
#print(envs.registry.all())

Create the environment and the Q-table

For CartPole, the returned observation is an array of four values: [x, x_dot, theta, theta_dot].

%pdb
import pandas as pd
import numpy as np
import gym

learn_episode = 1000
learn_step = 100
env = gym.make('CartPole-v0')
Qt = QLearningTable(env)
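As a quick check (an illustrative snippet, not in the original notebook), a single CartPole observation is a length-4 numpy array; the concrete values differ on every reset.

obs = env.reset()
print(obs.shape)   # (4,) -> [x, x_dot, theta, theta_dot]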

Learn the Q-table

%pdb on
for i in range(learn_episode):
    observation = env.reset()
    for step in range(learn_step):
        action = Qt.choose_action(observation)
        next_observation,reward,done,_ = env.step(action)
        Qt.learn(next_state=next_observation,state=observation,action=action,reward=reward,done=done)
        observation = next_observation
        if done:
            break
print(Qt.q_table)
[22974 rows x 2 columns]

Clearly, naively converting the continuous observation to a string gives poor results: almost every observation becomes a brand-new state (22974 rows here), so the learned values are rarely reused.
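One common workaround (not from the original tutorial; the bin edges below are hand-picked assumptions) is to discretize each observation dimension into a small number of bins before turning it into a Q-table key, so that nearby observations share a table row.

# hypothetical bin edges for [x, x_dot, theta, theta_dot]; not tuned
BINS = [
    np.linspace(-2.4, 2.4, 9),    # cart position
    np.linspace(-3.0, 3.0, 9),    # cart velocity
    np.linspace(-0.21, 0.21, 9),  # pole angle (rad)
    np.linspace(-3.0, 3.0, 9),    # pole angular velocity
]

def discretize(observation):
    # map the continuous observation to a tuple of bin indices,
    # which is then used as the Q-table key instead of str(observation)
    return tuple(int(np.digitize(x, edges)) for x, edges in zip(observation, BINS))

Calling Qt.choose_action(discretize(observation)) and Qt.learn(...) with these discretized states keeps the number of distinct rows bounded (at most 10^4 here) instead of tens of thousands of unique strings.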

Test

for i_episode in range(10):
    observation = env.reset()
    t = 0   # step counter for this episode
    while True:
        env.render()
        #print(observation)
        action = Qt.choose_action(observation)
        observation, reward, done, info = env.step(action)
        t += 1
        if done:
            print("Episode finished after {} timesteps".format(t))
            print("\t reward {}.".format(reward))
            break
Episode finished after 200 timesteps
	 reward 1.0.
Episode finished after 200 timesteps
	 reward 1.0....

Discrete state spaces

Judging from the CartPole results above, tabular Q-learning runs into trouble on continuous state spaces: the Q-table can grow to a very large size. Below we instead try a problem with a genuinely discrete state space.

Here we use gym's FrozenLake-v0. As before: define the environment, learn the Q-table, then test it.

import pandas as pd
import numpy as np
import gym

learn_episode = 1000
learn_step = 200
env = gym.make('FrozenLake-v0')
Qt = QLearningTable(env,learning_rate=0.1)
for i in range(learn_episode):
    observation = env.reset()
    for step in range(learn_step):
        action = Qt.choose_action(observation)
        next_observation,reward,done,_ = env.step(action)
        
        Qt.learn(next_state=next_observation,state=observation,action=action,reward=reward,done=done)
        observation = next_observation
        if done:
            break
print(Qt.q_table)



           0         1         2         3
0   0.064842  0.055531  0.060236  0.053204
1   0.018737  0.039729  0.025952  0.056370
5   0.001000  0.001000  0.001000  0.001000
4   0.086859  0.071700  0.048331  0.047095
8   0.075819  0.122097  0.088587  0.149676
12  0.001000  0.001000  0.001000  0.001000
9   0.119481  0.228482  0.133091  0.097725
2   0.072173  0.055193  0.038618  0.037949
6   0.054548  0.077508  0.080189  0.003905
3   0.000688  0.003873  0.001489  0.029156
7   0.001000  0.001000  0.001000  0.001000
13  0.130018  0.223768  0.276910  0.201894
10  0.290034  0.163259  0.211654  0.084711
11  0.001000  0.001000  0.001000  0.001000
14  0.232816  0.423925  0.433946  0.431733
15  0.001000  0.001000  0.001000  0.001000
a = 0   # count of successful episodes
for i_episode in range(100):
    observation = env.reset()
    for t in range(learn_step):
        #temp = env.render()
        #print(observation)
        action = Qt.choose_action(observation)
        observation, reward, done, info = env.step(action)
        if done and reward>0:
            print("Episode finished after {} timesteps".format(t+1))
            print("\t reward {}.".format(reward))
            a=a+1
            break
print(a)
Episode finished after 32 timesteps
	 reward 1.0.
Episode finished after 29 timesteps
	 reward 1.0.
Episode finished after 66 timesteps
	 reward 1.0.
...
Episode finished after 53 timesteps
	 reward 1.0.
Episode finished after 34 timesteps
	 reward 1.0.
41  # number of successful episodes

Improving the result

In the runs above, the final reward does not depend on the number of steps taken. If the reward decreased as the step count grew (for example, by subtracting a small penalty per step), the agent would be pushed toward the shortest path to the goal.
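A minimal sketch of that idea (the per-step penalty below is an assumed value; FrozenLake itself only returns a reward of 1 at the goal): subtract a small constant from the reward at every step before passing it to learn, so that shorter successful paths accumulate a higher return.

STEP_PENALTY = 0.01   # assumed value, not tuned

for i in range(learn_episode):
    observation = env.reset()
    for step in range(learn_step):
        action = Qt.choose_action(observation)
        next_observation, reward, done, _ = env.step(action)
        shaped_reward = reward - STEP_PENALTY   # reward now decreases as the step count grows
        Qt.learn(state=observation, action=action, reward=shaped_reward,
                 next_state=next_observation, done=done)
        observation = next_observation
        if done:
            break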

Problems with Q-learning

Tabular Q-learning only works when the state and action spaces are very small. Moreover, it cannot handle a state it has never visited: it makes no predictions for unseen states, i.e., it has no generalization ability.
The next step is therefore DQN, which builds on Q-learning but replaces the table with a function approximator.