强化学习(三) - Gym库介绍和使用,Markov决策程序实例,动态规划决策实例
強化學習(三) - Gym庫介紹和使用,Markov決策程序實例,動態規劃決策實例
1. 引言
在這個部分補充之前馬爾科夫決策和動態規劃部分的代碼。在以后的內容我會把相關代碼都附到相關內容的后面。本部分代碼和將來的代碼會參考《深度強化學習原理與python實現》與Udacity的課程《Reinforcement Learning》。
2. Gym庫
Gym庫(http://gym.openai.com/)是OpenAI推出的強化學習實驗環境庫,它用python語言實現了離散時間智能體/環境接口中的環境部分。除了依賴少量的商業庫外,整個項目時開源免費的。
Gym庫內置上百種實驗環境,包括以下幾類。
- 算法環境:包括一些字符串處理等傳統計算機方法的實驗環境。
- 簡單文本環境:包括幾個用文本表示的簡單游戲。
- 經典控制環境:包括一些簡單幾何體運動,常用于經典強化學習算法的研究。
- Atari游戲環境:包括數十個Atari 2600游戲,具有像素化的圖形界面,希望玩家盡可能奪得高分。
- MuJoCo環境:利用收費的MuJoCo運動引擎進行連續性的控制任務。
- 機械控制環境:關于機械臂的抓取和控制等。
2.1 Gym庫安裝
Python版本要求為 3.5+(在以后的內容中我將使用python 3.6)。然后就可以是使用pip安裝Gym庫
首先升級pip,
pip install --upgrade pip
以下命令為最小安裝Gym環境,
pip install gym
利用源代碼構建
如果愿意,還可以直接克隆gym Git存儲庫。若想要修改Gym本身或添加環境時,這是特別有用的。下載和安裝使用:
git clone https://github.com/openai/gym
cd gym
pip install -e .
之后可以運行pip install -e .[all]來執行包含所有環境的完整安裝。這需要安裝多個涉及的依賴項,包括cmake和一個最近的pip版本。
2.2 Gym庫使用
2.2.1 環境(enviroment)
這里有一個運行程序的最簡單的例子。這將為1000個時間步驟運行CartPole-v0環境的一個實例,并在每個步驟中呈現環境。你應該看到一個窗口彈出渲染經典的車桿問題:
import gym
env = gym.make('CartPole-v0')
env.reset()
for _ in range(1000):env.render()env.step(env.action_space.sample()) # 采取隨機行動
env.close()
輸出如下:
通常,我們將結束模擬之前,車桿被允許離開屏幕。現在,忽略關于調用step()的警告,即使這個環境已經返回done = True。
如果希望運行其他一些環境,可以嘗試用類似MountainCar-v0、MsPacman-v0(需要Atari依賴項)或Hopper-v1(需要MuJoCo依賴項)來替換上面的cartpole-v0。環境都來自于Env基類。
若丟失了任何依賴項,應該會得到錯誤消息來提示丟失了什么。安裝缺少的依賴項通常非常簡單。對于Hopper-v1需要一個的MuJoCo許可證。
代碼解釋
導入Gym庫:
import gym
在導入Gym庫后,可以通過make()函數來得到環境對象。每一個環境都有一個ID,它是形如“Xxxxx-v0”的字符串,如“CartPloe-v0”等。環境名稱的最后的部分表示版本號,不同版本的環境可能有不同的行為。使用取出環境“CartPloe-v0”的代碼為:
env = gym.make('CartPole-v0')
想要查看當前的Gym庫已經注冊了哪些環境,可以使用以下代碼:
from gym import envs
env_specs = envs.registry.all()
env_ids = [env_spec.id for env_spec in env_specs]
env_ids
每個環境都定義了自己的觀測空間和動作空間。環境env的觀測空間用env.observation_space表示,動作空間用env.action_space表示。觀測空間和動作空間既可以是離散空間(即取值是有限個離散的值),也可以使連續的空間(即取值是連續的)。在Gym庫中,離散空間一般用gym.spaces.Discrete類表示,連續空間用gym.spaces.Box類表示。例如,環境‘MountainCar-v0’的動作空間是Box(2,)表示觀測可以用2個float值表示;環境‘MountainCar-v0’的動作空間是Dicrete(3),表示動作取自{0,1,2}。對于離散空間gym.spaces.Discrete類實例成員n表示有幾個可能的取值;對于連續空間,Box類實例的成員low和high表示每個浮點數的取值范圍。
接下來使用環境對象env。首先初始化環境對象env,
env.reset()
該調用能返回智能體的初始觀測,是np.array對象。
環境初始化之后就可以使用了。使用環境的核心是使用環境對象的step()方法,具體內容會在下面介紹。
env.step()的參數需要取自動作空間。可以使用以下語句從動作空間中隨機選取一個動作。
action = env.action_space.sample()
每次調用env.step()只會讓環境前進一步。所以,env.step()往往放在循環結構里,通過循環調用來完成整個回合。
在env.reset()或env.step()后,可以使用以下語句以圖形化的方法顯示當前的環境。
env.render()
使用完環境后,可以使用下列語句關閉環境。
env.close()
如果繪制和實驗的圖形界面窗口,那么關閉該窗口的最佳方式是調用env.close()。試圖直接關閉圖形界面可能會導致內存不能釋放,甚至會導致死機。
測試智能體在Gym庫中某個任務的性能時,學術界一般最關心100個回合的平均回合獎勵。對于有些環境,還會制定一個參考的回合獎勵值,當連續100個回合的獎勵大于指定的值時,就認為這個任務被解決了。但是,并不是所有的任務都指定了這樣的值。對于沒有指定值的任務,就無所謂任務被解決了或者沒有被解決。
2.2.2 觀測(observation)
如果我們想得到一個更好的結果,那我們就不能在每一步都采取隨機的行動。我們需要知道我們的行動對環境做了什么可能會更好。
環境的step函數返回的正是我們需要的東西。事實上,step函數會返回四個值。這四個值是
- observation (object): 一個特定環境的對象,代表智能體對環境的觀察。例如,來自攝像頭的像素數據,機器人的關節角度和關節速度,或者棋盤游戲中的棋盤狀態,和env.reset()返回值的意義相同。。
- reward(float):前一個動作所獲得的獎勵量。不同環境下的比例不同,但目標始終是增加你的總獎勵。
- done (boolean):是否要再次重置環境。大多數(但不是全部)任務都被劃分為定義明確的事件,done為True表示該事件已經終止。(例如,可能是桿子翻得太厲害,或者失去了最后的生命。)
- info(dict):對調試有用的診斷信息。它有時會對學習有用(例如,它可能包含環境最后一次狀態變化背后的原始概率)。然而,你的智能體的評估是不允許使用它來學習的。
這只是經典的 "智能體-環境循環 "的一個實現。每一個時間步,智能體選擇一個動作(action),環境返回一個觀察(observation)和一個獎勵(reward)。
import gym
env = gym.make('CartPole-v0')
for i_episode in range(20):observation = env.reset()for t in range(100):env.render()print(observation)action = env.action_space.sample()observation, reward, done, info = env.step(action)if done:print("Episode finished after {} timesteps".format(t+1))break
env.close()
輸出如下:
[-0.061586 -0.75893141 0.05793238 1.15547541]
[-0.07676463 -0.95475889 0.08104189 1.46574644]
[-0.0958598 -1.15077434 0.11035682 1.78260485]
[-0.11887529 -0.95705275 0.14600892 1.5261692 ]
[-0.13801635 -0.7639636 0.1765323 1.28239155]
[-0.15329562 -0.57147373 0.20218013 1.04977545]
Episode finished after 14 timesteps
[-0.02786724 0.00361763 -0.03938967 -0.01611184]
[-0.02779488 -0.19091794 -0.03971191 0.26388759]
[-0.03161324 0.00474768 -0.03443415 -0.04105167]
2.2.3 空間(spaces)
在上面的示例中,我們從環境的操作空間中隨機取樣操作。但這些行為到底是什么呢?每個環境都帶有一個action_space和一個observation_space。這些屬性類型為Space,它們描述了有效動作和觀察的格式:
import gym
env = gym.make('CartPole-v0')
print(env.action_space)
#> Discrete(2)
print(env.observation_space)
#> Box(4,)
離散(Discrete)空間允許一個固定的非負數范圍,因此在本例中有效的動作(action)要么是0要么是1。Box空間表示一個n維的盒子,因此有效的觀察結果將是一個4個數字的數組。我們也可以檢查Box的邊界:
print(env.observation_space.high)
#> array([ 2.4 , inf, 0.20943951, inf])
print(env.observation_space.low)
#> array([-2.4 , -inf, -0.20943951, -inf])
這種內省有助于編寫適用于許多不同環境的泛型代碼。Box和離散空間是最常見的空間(space)。我們可以從一個空間取樣或檢查某物是否屬于它:
from gym import spaces
space = spaces.Discrete(8) # Set with 8 elements {0, 1, 2, ..., 7}
x = space.sample()
assert space.contains(x)
assert space.n == 8
2.2.4 程序實例
這里我們通過一個完整的例子來學習如何與Gym庫中的環境交互,選用經典控制任務,小車上山(MountainCar-v0)。這里主要介紹交互的代碼,而不詳細說明這個控制任務及其求解。
首先我們看一下這個任務的觀測空間的動作空間
import gym
env = gym.make('MountainCar-v0')
print('觀測空間 = {}'.format(env.observation_space))
print('動作空間 = {}'.format(env.action_space))
print('觀測范圍 = {} ~ {}'.format(env.observation_space.low, env.observation_space.high))
print('動作數 = {}'.format(env.action_space.n))
輸出如下,
觀測空間 = Box(2,)
動作空間 = Discrete(3)
觀測范圍 = [-1.2 -0.07] ~ [0.6 0.07]
動作數 = 3
運行結果告訴我們,觀測空間是形狀為(2,)的浮點型np.array,而動作空間是取{0, 1, 2}的int型數值。
接下來考慮智能體。智能體往往是我們自己實現的。下面代碼給出了一個智能體類——BespokeAgent類。智能體的decide()方法實現了決策功能,而learn()方法實現了學習功能。BespokeAgent類是一個比較簡單的類,它只能根據給定的數學表達式進行決策,并且不能有效學習。所以它并不是一個真正意義上的強化學習智能體類。但是,用于演示智能體和環境的交互已經足夠了。
class BespokeAgent:def __init__(self, env):passdef decide(self, observation): # 決策position, velocity = observationlb = min(-0.09 * (position + 0.25) ** 2 + 0.03, 0.3 * (position + 0.9) ** 4 - 0.008)ub = -0.07 * (position + 0.38) ** 2 + 0.06if lb < velocity < ub:action = 2else:action = 0return action # 返回動作def learn(self, *args): # 學習passagent = BespokeAgent(env)
接下來我們試圖讓智能體與環境交互。play_once()函數可以讓智能體和環境交互一個回合。這個函數有4個參數。
- 參數env是環境類。
- 參數agent是智能體類。
- 參數render是bool類型變量,指示在運行過程中是否要圖形化顯示。如果函數參數render為True,那么在交互過程中會調用env.render()以顯示圖形化界面,而這個界面可以通過調用env.close()關閉。
- 參數train是bool類型的變量,指示在運行過程中是否訓練智能體。在訓練過程中應當設置為True,以調用agent.learn()函數;在測試過程中應當設置為False,使得智能體不變。
這個函數有一個返回值episode_reward,是float類型的數值,表示智能體與環境交互一個回合的回合總獎勵。
def play_montecarlo(env, agent, render=False, train=False):episode_reward = 0. # 記錄回合總獎勵,初始化為0observation = env.reset() # 重置游戲環境,開始新回合while True: # 不斷循環,直到回合結束if render: # 判斷是否顯示env.render() # 顯示圖形界面,圖形界面可以用env.close()語句關閉action = agent.decide(observation)next_observation, reward, done, _ = env.step(action) # 執行動作episode_reward += reward # 收集回合獎勵if train: # 判斷是否訓練智能體agent.learn(observation, action, reward, done) # 學習if done: # 回合結束,跳出循環breakobservation = next_observationreturn episode_reward # 返回回合總獎勵
我們可以用下列代碼讓智能體和環境交互一個回合,并在交互過程中圖形化顯示。交互完畢后,可用env.close()語句關閉圖形化界面。
env.seed(0) # 設置隨機數種子,只是為了讓結果可以精確復現,一般情況下可刪去
episode_reward = play_montecarlo(env, agent, render=True)
print('回合獎勵 ={}'.format(episode_reward))
env.close() # 此語句可關閉圖形界面
為了系統評估智能體的性能,下列代碼求出了連續交互100回合的平均回合獎勵。小車上山環境有一個參考的回合獎勵值-110,如果當連續100個回合的平均回合獎勵大于-110,則認為這個任務被解決了。BespokeAgent類對應的策略的平均回合獎勵大概就在-110左右。
episode_rewards = [play_montecarlo(env, agent) for _ in range(100)]
print('平均回合獎勵 ={}'.format(np.mean(episode_rewards)))
3.Markov 決策過程
3.1 實例:懸崖尋路
本節考慮Gym庫中懸崖尋路問題(CliffWalking-v0)。懸崖尋路問題是這樣一種回合制問題:在一個4×124 \times124×12的網格中,智能體最開始在左下角的網格,希望移動到右下角的網格。智能體每次可以在上下左右這四個方向中移動一步,每移動一步會懲罰一個單位的獎勵。但是,移動有以下的限制。
- 智能體不能移出網格。如果智能體想執行某個動作移出網格,那么就讓本步不移動。但是這個操作仍然會懲罰一個單位的獎勵。
- 如果智能體將要到達最下一排網格(幾開始網格和目標網格之間的10個網格),智能體會立刻回到開始網格,并懲罰100個單位的獎勵。這10個網格可被視為“懸崖”。
當智能體移動到終點時,回合結束,回合獎勵為各步獎勵相加。
3.1.1 實驗環境使用
以下代碼演示了如何倒入這個環境并查看這個環境的基本信息。
import gym
env = gym.make('CliffWalking-v0')
print('觀察空間 = {}'.format(env.observation_space))
print('動作空間 = {}'.format(env.action_space))
print('狀態數量 = {}, 動作數量= {}'.format(env.nS, env.nA))
print('地圖大小 = {}'.format(env.shape))
輸出為,
觀察空間 = Discrete(48)
動作空間 = Discrete(4)
狀態數量 = 48, = 4
地圖大小 = (4, 12)
這個環境是一個離散的Markov決策過程。在這個Markov決策過程中,每個狀態是取自S={0,1,...,46}\mathcal{S}= \{0,1,...,46\}S={0,1,...,46}的int型數值(加上終止狀態則為S+=0,1,...,46,47\mathcal{S}^+={0,1,...,46,47}S+=0,1,...,46,47):0表示向上,1表示向右,2表示向下,3表示向左。獎勵取自{?1,?100}\{-1,-100\}{?1,?100},遇到懸崖為-100,否則為-1。
下面代碼給出了用給出的策略運行一個回合的代碼。函數play_once()有兩個參數, 一個是環境對象,另外一個是策略policy,它是np.array類型的實例。
import numpy as np
def play_once(env, policy):total_reward = 0state = env.reset()loc = np.unravel_index(state, env.shape)print('狀態 = {} , 位置={} '.format(state, loc))while True:action = np.random.choice(env.nA, p=policy[state])# step函數,詳情見2.2.2 觀測next_state, reward, done, _ = env.step(action)print('狀態 = {}, 位置 ={}, 獎勵 ={}'.format(state, loc, reward))total_reward += rewardif done:breakstate = next_statereturn total_reward
以下代碼給出了一個最優策略。optimal_policy最優策略是在開始處向上,接著一 路向右,然后到最右邊時向下。
actions = np.ones(env.shape, dtype=int)
actions[-1, :] = 0
actions[:, -1] = 2
optimal_policy = np.eye(4)[actions.reshape(-1)]
下面的代碼用最優策略運行一個回合。采用最優策略,從開始網格到目標網格一共要 移動13步,回合總獎勵為-13。
total_reward = play_once(env, optimal_policy)
print('總獎勵 ={}'.format(total_reward))
輸出為,
狀態 = 36, 位置 =(3, 0), 獎勵 =-1
狀態 = 24, 位置 =(2, 0), 獎勵 =-1
狀態 = 25, 位置 =(2, 1), 獎勵 =-1
狀態 = 26, 位置 =(2, 2), 獎勵 =-1
狀態 = 27, 位置 =(2, 3), 獎勵 =-1
狀態 = 28, 位置 =(2, 4), 獎勵 =-1
狀態 = 29, 位置 =(2, 5), 獎勵 =-1
狀態 = 30, 位置 =(2, 6), 獎勵 =-1
狀態 = 31, 位置 =(2, 7), 獎勵 =-1
狀態 = 32, 位置 =(2, 8), 獎勵 =-1
狀態 = 33, 位置 =(2, 9), 獎勵 =-1
狀態 = 34, 位置 =(2, 10), 獎勵 =-1
狀態 = 35, 位置 =(2, 11), 獎勵 =-1
總獎勵 =-13
3.1.2 求解Bellman期望方程
接下來考慮策略評估。我們用Bellman期望方程求解給定策略的狀態價值和動作價值。 首先來看狀態價值。用狀態價值表示狀態價值的Bellmn期望方程為:
vπ(s)=∑aπ(a∣s)[∑s′,rp(s′,r∣s,a)[r+γvπ(s′)]],s∈Sv_\pi(s) = \sum_a \pi(a|s)[\sum_{s',r}p(s',r|s,a)[r+\gamma v_\pi(s')]],s \in \mathcal{S}vπ?(s)=a∑?π(a∣s)[s′,r∑?p(s′,r∣s,a)[r+γvπ?(s′)]],s∈S
這是一個線性方程組,其標準形式為
vπ(s)?γ∑a∑s′π(a∣s)p(s′∣s,a)vπ(s′)=∑aπ(a∣s)∑s′,rrp(s′,r∣s,a),s∈Sv_\pi(s)-\gamma\sum_{a}\sum_{s'}\pi(a|s)p(s'|s,a)v_\pi(s') =\sum_{a}\pi(a|s)\sum_{s',r}rp(s',r|s,a),s \in \mathcal{S}vπ?(s)?γa∑?s′∑?π(a∣s)p(s′∣s,a)vπ?(s′)=a∑?π(a∣s)s′,r∑?rp(s′,r∣s,a),s∈S
得到標準形式后就可以調用相關函數直接求解。得到狀態價值函數后,可以用狀態價值表示動作價值的Bellan方程,
qπ(s,a)=∑s′,rp(s′,r∣s,a)[r+γvπ(s′)],s∈S,a∈Aq_\pi(s,a) =\sum_{s',r}p(s',r|s,a)[r+\gamma v_\pi(s')],s \in \mathcal{S},a \in \mathcal{A}qπ?(s,a)=s′,r∑?p(s′,r∣s,a)[r+γvπ?(s′)],s∈S,a∈A
來求動作價值函數。
函數evaluate_bellman ()實現了上述功能。狀態價值求解部分用np.linalg.solve()函數求解標準形式的線性方程組。得到狀態價值后,直接計算得到動作價值.
環境的動力系統儲存在env.P里,它是一個元組列表,每個元組包括概率、下一狀態、獎勵\回合結束指示這4個部分。
# 用Bellman方程求解狀態價值和動作價值
def evaluate_bellman(env, policy, gamma=1.):a, b = np.eye(env.nS), np.zeros((env.nS))for state in range(env.nS - 1):for action in range(env.nA):pi = policy[state][action]# print('{}{}:pi:{}'.format(state, action, pi))# env.P中存儲環境的動力for p, next_state, reward, done in env.P[state][action]:a[state, next_state] -= (pi * gamma)b[state] += (pi * reward * p)# np.linalg.solve()函數求解標準形式的線性方程組v = np.linalg.solve(a, b)q = np.zeros((env.nS, env.nA))# 利用狀態價值函數求解動作價值函數for state in range(env.nS - 1):for action in range(env.nA):for p, next_state, reward, done in env.P[state][action]:q[state][action] += ((reward + gamma * v[next_state]) * p)return v, q
接下來使用evaluate_bellman()函數評估給出的策略。以下代碼分別評估了一個隨機策略和最優確定性策略,并輸出了狀態價值函數和動作價值函數。
評估隨機策略,
policy = np.random.uniform(size=(env.nS, env.nA))
policy = policy / np.sum(policy, axis=1)[:, np.newaxis]
state_values, action_values = evaluate_bellman(env, policy)
print('狀態價值 ={}'.format(state_values))
print('動作價值 ={}'.format(action_values))
狀態價值 =[-134140.09187241 -134085.47213402 -133971.92139818 -133616.23666259-133194.20403738 -132781.56536103 -132435.88940662 -128380.91096168...-133680.76568151 -130121.16858665 -115062.2396112 0. ]
動作價值 =[[-1.34141092e+05 -1.34086472e+05 -1.34152621e+05 -1.34141092e+05][-1.34086472e+05 -1.33972921e+05 -1.34152699e+05 -1.34141092e+05]...[ 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]]
評估最優策略,
optimal_state_values, optimal_action_values = evaluate_bellman(env, optimal_policy)
print('最優狀態價值 = {}'.format(optimal_state_values))
print('最優動作價值 = {}'.format(optimal_action_values))
最優狀態價值 = [[-14. -13. -12. -11. -10. -9. -8. -7. -6. -5. -4. -3.][-13. -12. -11. -10. -9. -8. -7. -6. -5. -4. -3. -2.][-12. -11. -10. -9. -8. -7. -6. -5. -4. -3. -2. -1.][-13. -12. -11. -10. -9. -8. -7. -6. -5. -4. -3. 0.]]
最優動作價值 = [[ -15. -14. -14. -15.][ -14. -13. -13. -15.][ -13. -12. -12. -14.]...[ 0. 0. 0. 0.]]
3.1.3 求解Bellman最優方程
對于懸崖尋路這樣的數值環境,可以使用線性規劃求解。線性規劃問題的標準形式為:
minimise????????∑s∈Sv(s)over????????v(s),s∈Ss.t.???????v(s)?γ∑s′,rp(s′,r∣s,a)v?(s′)≥∑s′,rrp(s′,r∣s,a),s∈S\begin{aligned}\text{minimise} \ \ \ \ \ \ \ \ & \sum_{s \in \mathcal{S}}v(s) \\ \text{over} \ \ \ \ \ \ \ \ & v(s),s \in \mathcal{S}\\ \text{s.t.} \ \ \ \ \ \ \ &v(s)-\gamma\sum_{s',r}p(s',r|s,a)v_*(s')\geq\sum_{s',r}rp(s',r|s,a), s \in \mathcal{S} \end{aligned}minimise????????over????????s.t.????????s∈S∑?v(s)v(s),s∈Sv(s)?γs′,r∑?p(s′,r∣s,a)v??(s′)≥s′,r∑?rp(s′,r∣s,a),s∈S?
其中目標函數中狀態價值的系數已經被固定為1。也可以選擇其他正實數作為系數。
以下代碼使用scipy.optimize.linprog()函數來計算這個動態規劃問題。
scipy.optimize.linprog(c,A_ub = None,b_ub = None,A_eq = None,b_eq = None,bounds = None,method = 'interior-point',callback = None,options = None,x0 = None)
min?xcTxsuch?thatAubx≤bubAeqx=beql≤x≤u\begin{aligned}\min_{x} \ & c^T x \\\text{such that} & \ A_{ub}x \leq b_{ub} \\ & \ A_{eq}x = b_{eq} \\& \ l \leq x \leq u \end{aligned}xmin??such?that?cTx?Aub?x≤bub??Aeq?x=beq??l≤x≤u?
其中,c是價值向量;A_ub和b_ub對應線性不等式約束;A_eq和b_eq對應線性等式約束;bounds對應公式中的lb和ub,決策向量的下界和上界;method是求解器的類型,如單純形法.
官方幫助文檔可查看此處.
這個函數的 第0個參數是目標函數中各決策變量在目標函數中的系數,本例中都取1;第1個和第2個 參數是形如“Ax≤bAx≤bAx≤b”這樣的不等式約束的A和b的值。函數optimal_bellman ()剛開始就 計算得到這些值。scipy.optimize.linprog()還有關鍵字參數bounds,指定決策變量是否為有界的。本例中,決策變量都是無界的。無界也要顯式指定,不可以忽略。還有關鍵字參數 method確定優化方法。默認的優化方法不能處理不等式約束,這里選擇了能夠處理不等式約束的內點法(interior-point method)。
import scipy
def optimal_bellman(env, gamma=1.):p = np.zeros((env.nS, env.nA, env.nS))r = np.zeros((env.nS, env.nA))for state in range(env.nS - 1):for action in range(env.nA):for prob, next_state, reward, done in env.P[state][action]:p[state, action, next_state] += probr[state, action] += (reward * prob)c = np.ones(env.nS)a_ub = gamma * p.reshape(-1, env.nS) - np.repeat(np.eye(env.nS), env.nA, axis=0)b_ub = -r.reshape(-1)a_eq = np.zeros((0, env.nS))b_eq = np.zeros(0)bounds = [(None, None), ] * env.nSres = scipy.optimize.linprog(c, a_ub, b_ub, bounds=bounds, method='interior-point')v = res.xq = r + gamma * np.dot(p, v)return v, qoptimal_state_values, optimal_action_values = optimal_bellman(env)
print('最優狀態價值 ={}'.format(optimal_state_values))
print('最優動作價值 ={}'.format(optimal_action_values))
求得最優動作價值后,可以用argmax計算出最優確定策略。以下代碼給出了最優策略,
optimal_actions = optimal_action_values.argmax(axis=1)
print('最優策略 ={}'.format(optimal_actions))
在pycharm IDE下的總程序為
import gym
import numpy as np
import scipy# 單回合運行函數
def play_once(env, policy):total_reward = 0state = env.reset()while True:action = np.random.choice(env.nA, p=policy[state])next_state, reward, done, _ = env.step(action)# np.unravel_index函數的作用是獲取一個/組int類型的索引值在一個多維數組中的位置。loc = np.unravel_index(state, env.shape)print('狀態 = {}, 位置 ={}, 獎勵 ={}'.format(state, loc, reward))total_reward += rewardif done:breakstate = next_statereturn total_reward# 用Bellman方程求解狀態價值和動作價值
def evaluate_bellman(env, policy, gamma=1.):a, b = np.eye(env.nS), np.zeros((env.nS))for state in range(env.nS - 1):for action in range(env.nA):pi = policy[state][action]# print('{}{}:pi:{}'.format(state, action, pi))# env.P中存儲環境的動力for p, next_state, reward, done in env.P[state][action]:a[state, next_state] -= (pi * gamma)b[state] += (pi * reward * p)# np.linalg.solve()函數求解標準形式的線性方程組v = np.linalg.solve(a, b)q = np.zeros((env.nS, env.nA))# 利用狀態價值函數求解動作價值函數for state in range(env.nS - 1):for action in range(env.nA):for p, next_state, reward, done in env.P[state][action]:q[state][action] += ((reward + gamma * v[next_state]) * p)return v, q# 使用動態規劃求解最優策略
def optimal_bellman(env, gamma=1.):p = np.zeros((env.nS, env.nA, env.nS))r = np.zeros((env.nS, env.nA))for state in range(env.nS - 1):for action in range(env.nA):for prob, next_state, reward, done in env.P[state][action]:p[state, action, next_state] += probr[state, action] += (reward * prob)c = np.ones(env.nS)a_ub = gamma * p.reshape(-1, env.nS) - np.repeat(np.eye(env.nS), env.nA, axis=0)b_ub = -r.reshape(-1)a_eq = np.zeros((0, env.nS))b_eq = np.zeros(0)bounds = [(None, None), ] * env.nSres = scipy.optimize.linprog(c, a_ub, b_ub, bounds=bounds, method='interior-point')v = res.xq = r + gamma * np.dot(p, v)return v, q# 主函數
if __name__ == '__main__':env = gym.make('CliffWalking-v0')print('觀察空間 = {}'.format(env.observation_space))print('動作空間 = {}'.format(env.action_space))print('狀態數量 = {}, 動作數量= {}'.format(env.nS, env.nA))print('地圖大小 = {}'.format(env.shape))''' # 定義最優策略actions = np.ones(env.shape, dtype=int)actions[-1, :] = 0actions[:, -1] = 2optimal_policy = np.eye(4)[actions.reshape(-1)]total_reward = play_once(env, optimal_policy)print('總獎勵 ={}'.format(total_reward))optimal_state_values, optimal_action_values = evaluate_bellman(env, optimal_policy)print('最優狀態價值 = {}'.format(optimal_state_values.reshape(4, -1)))print('最優動作價值 = {}'.format(optimal_action_values))''''''# 定義隨機策略policy = np.random.uniform(size=(env.nS, env.nA))# 歸一化policy = policy / np.sum(policy, axis=1)[:, np.newaxis]state_values, action_values = evaluate_bellman(env, policy)print('狀態價值 ={}'.format(state_values))print('動作價值 ={}'.format(action_values))'''# 求解最優策略optimal_state_values, optimal_action_values = optimal_bellman(env)print('最優狀態價值 ={}'.format(optimal_state_values))print('最優動作價值 ={}'.format(optimal_action_values))optimal_actions = optimal_action_values.argmax(axis=1)print('最優策略 ={}'.format(optimal_actions))
輸出為,
觀察空間 = Discrete(48)
動作空間 = Discrete(4)
狀態數量 = 48, 動作數量= 4
地圖大小 = (4, 12)
最優狀態價值 =[-1.40000000e+01 -1.30000000e+01 -1.20000000e+01 -1.10000000e+01-1.00000000e+01 -9.00000000e+00 -8.00000000e+00 -7.00000000e+00-6.00000000e+00 -5.00000000e+00 -4.00000000e+00 -3.00000000e+00-1.30000000e+01 -1.20000000e+01 -1.10000000e+01 -1.00000000e+01-9.00000000e+00 -8.00000000e+00 -7.00000000e+00 -6.00000000e+00-5.00000000e+00 -4.00000000e+00 -3.00000000e+00 -2.00000000e+00-1.20000000e+01 -1.10000000e+01 -1.00000000e+01 -9.00000000e+00-8.00000000e+00 -7.00000000e+00 -6.00000000e+00 -5.00000000e+00-4.00000000e+00 -3.00000000e+00 -2.00000000e+00 -1.00000000e+00-1.30000000e+01 -1.20000000e+01 -1.10000000e+01 -1.00000000e+01-9.00000000e+00 -8.00000000e+00 -7.00000000e+00 -6.00000000e+00-5.00000000e+00 -4.00000000e+00 -9.99999999e-01 1.82291993e-11]
最優動作價值 =[[ -14.99999999 -13.99999999 -13.99999999 -14.99999999][ -13.99999999 -13. -13. -14.99999999]...[ 0. 0. 0. 0. ]]
最優策略 =[2 1 1 1 1 1 1 1 1 1 1 2
1 1 1 1 1 1 1 1 1 1 1 2
1 1 1 1 1 1 1 1 1 1 1 2
0 0 0 0 0 0 0 0 0 0 1 0]
4. 動態規劃
4.1 案例:冰面滑行
冰面滑行問題(FrozenLake-v0)是擴展庫Gym里內置的一個文本環境任務。該問題的背景如下:在一個大小為4x4的湖面上,有些地方結冰了,有些地方沒有結冰。我們可以用一個4x4的字符矩陣來表示湖面的情況,
SFFF
FHFH
FFFH
HFFG
其中字母“F”(Frozen)表示結冰的區域,字母“H”(Hole)表示未結冰的冰窟窿,字母“S”(Start)和字母“G”(Goal)分別表示移動任務的起點和目標。在這個湖面上要執行以下的移動任務:要從“S”處移動到“G”處。每一次移動,可以選擇“左”、“下”、“右”、“上”4個方向之一進行移動,每次移動一格。如果移動到G處,則回合結束,獲得1個單位的獎勵;如果移動到“H”處,則回合結束,沒有獲得獎勵;如果移動到其它字母,暫不獲得獎勵,可以繼續。由于冰面滑,所以實際移動的方向和想要移動的方向并不一定完全一致。例如,如果在某個地方想要左移,但是由于冰面滑,實際可能下移、右移或者上移。任務的目標是盡可能達到“G”處,以獲得獎勵。
4.1.1 實驗環境使用
首先我們來看如何使用擴展庫Gym中的環境。
用下列語句引入環境對象:
import gym
env = gym.make('FrozenLake-v0')
env = env.unwrapped
這個環境的狀態空間有16個不同的狀態{s1,s2,...,s15}\{s_1, s_2,...,s_{15}\}{s1?,s2?,...,s15?},表示當前處在哪一個位置;動作空間有4個不同的動作{a0,a1,a2,a3}\{a_0, a_1, a_2, a_3\}{a0?,a1?,a2?,a3?},分別表示上、下、左、右四個方向。在擴展庫Gym中,直接用int型數值來表示這些狀態和動作。下列代碼可以查看環境的狀態空間和動作空間,
print(env.observation_space)
print(env.action_space)
這個環境的動力系統儲存在env.P里。可以用下列方法查看某個狀態(如狀態14)某個動作(如右移)情況下的動力,
env.unwrapped.P[14][2]
它是一個元組列表,每個元組包括概率、下一狀態、獎勵和回合結束指示這4個部分。例如,env.P[14][2]返回元組列表[(0.333333333333333,14, 0.0, False),(0.333333333333333, 15, 1.0 True), (0.333333333333333333, 10, 0.0, False)]這表明:
p(s14,0∣s14,a2)=13p(s_{14}, 0|s_{14},a_2) = \frac{1}{3}p(s14?,0∣s14?,a2?)=31?
p(s15,1∣s14,a2)=13p(s_{15}, 1|s_{14},a_2) = \frac{1}{3}p(s15?,1∣s14?,a2?)=31?
p(s10,0∣s14,a2)=13p(s_{10}, 0|s_{14},a_2) = \frac{1}{3}p(s10?,0∣s14?,a2?)=31?
接下來我們來看怎么使用環境。像之前介紹的,要使用env.reset()和env.step來執行。執行一個回合的代碼如下所示,其中的play_policy()函數接受參數policy,這是一個16x4的np.array對象,表示策略π\piπ。
play_policy()函數返回一個浮點數,表示本回合的獎勵。
# 使用策略執行一個回合
def play_policy(env, policy, render=False):total_reward = 0.observation = env.reset()while True:if render:env.render() # 此行可顯示action = np.random.choice(env.action_space.n, p=policy[observation])observation, reward, done, _ = env.step(action)total_reward += reward # 統計回合獎勵if done: # 游戲結束breakreturn total_reward
接下來用剛剛定義的play_policy()函數來看看隨機策略的性能。下面代碼夠早了隨機策略random_policy,它對于任意的s∈S,a∈As \in \mathcal{S}, a \in \mathcal{A}s∈S,a∈A均有π(s,a)=1∣A∣\pi(s, a) = \frac{1}{|\mathcal{A}|}π(s,a)=∣A∣1?.運行下列代碼,可以求得隨機策略獲得獎勵的期望值。一般情況下的結果基本為0,這意味著隨機策略幾乎不可能成功到達目的地。
# 隨機策略random_policy = np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.unwrapped.nAepisode_rewards = [play_policy(env, random_policy) for _ in range(100)]print("隨機策略 平均獎勵 :{}".format(np.mean(episode_rewards)))
4.1.2 有模型策略迭代求解
這個部分實現策略評估、策略提升和策略迭代。
首先來看策略評估。以下代碼給出了策略評估的代碼。它首先定義了函數v2q(),這個函數可以根據狀態價值函數計算含有某個狀態的動作價值函數。利用者函數evaluate_policy()函數迭代結算了給定策略policy的狀態價值。這個函數使用theta作為精度控制的參數。之后的代碼測試了evaluate_policy()函數。它首先求得了隨機策略的狀態價值函數,然后利用v2q求得動作價值函數。
# 策略評估的實現
def v2q(env, v, s=None, gamma=1.):if s is not None:q = np.zeros(env.unwrapped.nA)for a in range(env.unwrapped.nA):for prob, next_state, reward, done in env.unwrapped.P[s][a]:q[a] += prob * (reward + gamma * v[next_state] * (1. - done))else:q = np.zeros((env.unwrapped.nS, env.unwrapped.nA))for s in range(env.unwrapped.nS):q[s] = v2q(env, v, s, gamma)return q# 定義策略評估函數
def evaluate_policy(env, policy, gamma=1., tolerant=1e-6):v = np.zeros(env.unwrapped.nS)while True:delta = 0for s in range(env.unwrapped.nS):vs = sum(policy[s] * v2q(env, v, s, gamma))delta = max(delta, abs(v[s]-vs))v[s] = vsif delta < tolerant:breakreturn v
# 對隨機策略進行策略評估
print('狀態價值函數:')
v_random = evaluate_policy(env, random_policy)
print(v_random.reshape(4, 4))print('動作價值函數:')
q_random = v2q(env, v_random)
print(q_random)
接下來看看策略改進。improve_policy()函數實現了策略改進算法。輸入的策略是policy,改進后的策略直接覆蓋原有的policy。該函數返回一個bool類型的值,表示輸入的策略是不是最優策略。之后的代碼測試了improve_policy()函數,它對隨機策略進行改進,得到了一個確定性策略。
# 策略改進的實現
def improve_policy(env, v, policy, gamma=1.):optimal = Truefor s in range(env.unwrappped.nS):q = v2q(env, v, s, gamma)a = np.argmax(q)if policy[s][a] != 1:optimal = Falsepolicy[s] = 0.policy[s][a] = 1.return optimal
# 對隨機策略進行策略改進
policy = random_policy.copy()
optimal = improve_policy(env, v_random, policy)
if optimal:print('無更新, 最優策略為:')
else:print('有更新,最優策略為:')
print(policy)
實現了策略評估和策略改進后,我們就可以實現策略迭代。iterate_policy()函數實現了策略迭代算法。之后的代碼對iterate_policy()進行測試。針對冰面滑性問題,該代碼求得了最優策略,并進行了測試。
# 策略迭代的實現
def iterate_policy(env, gamma=1., tolerant=1e-6:policy = np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.wrapped.nAwhile True:v = evaluate_policy(env, policy, gamma, tolerant)if improve_policy(env, v, policy):breakreturn policy, v
# 利用策略迭代求解最優策略
policy_pi, v_pi = iterate_policy(env)
print('狀態價值函數 = ')
print(v_pi.reshape(4, 4)
print('最優策略 = ')
print(np.argmax(policy_pi, axis=1).reshape(4, 4))
4.1.3 有模型價值迭代求解
現在我們用價值迭代算法求解冰面滑行問題的最優策略。以下代碼中iterate_value()函數實現了價值迭代算法。這個函數使用參數tolerant來控制價值迭代的精度。之后的代碼在冰面滑行問題上測試了iterate_value()函數。
# 價值迭代的實現
def iterate_value(env, gamma=1, tolerant=1e-6):v = np.zeros(env.unwrapped.nS)while True:delta = 0for s in range(env.unwrapped.nS):vmax = max(v2q(env, v, s, gamma))delta = max(delta, abs(v[s]-vmax))v[s] = vmaxif delta < tolerant:breakpolicy = np.zeros((env.unwrapped.nS, env.unwrapped.nA))for s in range(env.unwrapped.nS):a = np.argmax(v2q(env, v, s, gamma))policy[s][a] = 1.return policy, v
# 利用策略迭代求解最優策略policy_pi, v_pi = iterate_policy(env)print('狀態價值函數 = ')print(v_pi.reshape(4, 4))print('最優策略 = ')print(np.argmax(policy_pi, axis=1).reshape(4, 4))print('價值迭代 平均獎勵:{}'.format(play_policy(env, policy_pi)))
策略迭代和價值迭代得到的最優價值函數和最優策略應該是一致的。最優狀態價值函數為:
[0.82350.82350.82350.82350.823500.529400.82350.82350.7647000.88240.94120]\begin{bmatrix}0.8235 &0.8235 &0.8235& 0.8235\\ 0.8235&0&0.5294&0\\ 0.8235&0.8235&0.7647& 0\\ 0&0.8824& 0.9412&0\end{bmatrix}?????0.82350.82350.82350?0.8235 00.82350.8824?0.82350.52940.76470.9412?0.8235000??????
最優策略為:
[0333000031000210]\begin{bmatrix}0&3&3&3\\ 0&0&0&0\\ 3&1&0&0\\ 0&2&1&0\end{bmatrix}?????0030?3012?3001?3000??????
在pycharm中可以運行的總程序如下,
import gym
import numpy as np# 使用策略執行一個回合
def play_policy(env, policy, render=False):total_reward = 0.observation = env.reset()while True:if render:env.render() # 此行可顯示action = np.random.choice(env.action_space.n, p=policy[observation])observation, reward, done, _ = env.step(action)total_reward += reward # 統計回合獎勵if done: # 游戲結束breakreturn total_reward# 策略評估的實現
def v2q(env, v, s=None, gamma=1.):if s is not None:q = np.zeros(env.unwrapped.nA)for a in range(env.unwrapped.nA):for prob, next_state, reward, done in env.unwrapped.P[s][a]:q[a] += prob * (reward + gamma * v[next_state] * (1. - done))else:q = np.zeros((env.unwrapped.nS, env.unwrapped.nA))for s in range(env.unwrapped.nS):q[s] = v2q(env, v, s, gamma)return q# 定義策略評估函數
def evaluate_policy(env, policy, gamma=1., tolerant=1e-6):v = np.zeros(env.unwrapped.nS)while True:delta = 0for s in range(env.unwrapped.nS):vs = sum(policy[s] * v2q(env, v, s, gamma))delta = max(delta, abs(v[s]-vs))v[s] = vsif delta < tolerant:breakreturn v# 策略改進
def improve_policy(env, v, policy, gamma=1.):optimal = Truefor s in range(env.unwrapped.nS):q = v2q(env, v, s, gamma)a = np.argmax(q)if policy[s][a] != 1:optimal = Falsepolicy[s] = 0.policy[s][a] = 1.return optimal# 策略迭代的實現
def iterate_policy(env, gamma=1., tolerant=1e-6):policy = np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.unwrapped.nAwhile True:v = evaluate_policy(env, policy, gamma, tolerant)if improve_policy(env, v, policy):breakreturn policy, v# 價值迭代的實現
def iterate_value(env, gamma=1, tolerant=1e-6):v = np.zeros(env.unwrapped.nS)while True:delta = 0for s in range(env.unwrapped.nS):vmax = max(v2q(env, v, s, gamma))delta = max(delta, abs(v[s]-vmax))v[s] = vmaxif delta < tolerant:breakpolicy = np.zeros((env.unwrapped.nS, env.unwrapped.nA))for s in range(env.unwrapped.nS):a = np.argmax(v2q(env, v, s, gamma))policy[s][a] = 1.return policy, vif __name__ == '__main__':env = gym.make('FrozenLake-v0')env = env.unwrappedrandom_policy = np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.unwrapped.nAepisode_rewards = [play_policy(env, random_policy) for _ in range(100)]print("隨機策略 平均獎勵 :{}".format(np.mean(episode_rewards)))print('狀態價值函數:')v_random = evaluate_policy(env, random_policy)print(v_random.reshape(4, 4))print('動作價值函數:')q_random = v2q(env, v_random)print(q_random)policy = random_policy.copy()optimal = improve_policy(env, v_random, policy)if optimal:print('無更新, 最優策略為:')else:print('有更新,最優策略為:')print(policy)# 利用策略迭代求解最優策略policy_pi, v_pi = iterate_policy(env)print('狀態價值函數 = ')print(v_pi.reshape(4, 4))print('最優策略 = ')print(np.argmax(policy_pi, axis=1).reshape(4, 4))print('價值迭代 平均獎勵:{}'.format(play_policy(env, policy_pi)))
隨機策略 平均獎勵 :0.04
狀態價值函數:
[[0.0139372 0.01162942 0.02095187 0.01047569][0.01624741 0. 0.04075119 0. ][0.03480561 0.08816967 0.14205297 0. ][0. 0.17582021 0.43929104 0. ]]
動作價值函數:
[[0.01470727 0.01393801 0.01393801 0.01316794][0.00852221 0.01162969 0.01086043 0.01550616][0.02444416 0.0209521 0.02405958 0.01435233][0.01047585 0.01047585 0.00698379 0.01396775][0.02166341 0.01701767 0.0162476 0.01006154][0. 0. 0. 0. ][0.05433495 0.04735099 0.05433495 0.00698396][0. 0. 0. 0. ][0.01701767 0.04099176 0.03480569 0.04640756][0.0702086 0.11755959 0.10595772 0.05895286][0.18940397 0.17582024 0.16001408 0.04297362][0. 0. 0. 0. ][0. 0. 0. 0. ][0.08799662 0.20503708 0.23442697 0.17582024][0.25238807 0.53837042 0.52711467 0.43929106][0. 0. 0. 0. ]]
有更新,最優策略為:
[[1. 0. 0. 0.][0. 0. 0. 1.][1. 0. 0. 0.][0. 0. 0. 1.][1. 0. 0. 0.][1. 0. 0. 0.][1. 0. 0. 0.][1. 0. 0. 0.][0. 0. 0. 1.][0. 1. 0. 0.][1. 0. 0. 0.][1. 0. 0. 0.][1. 0. 0. 0.][0. 0. 1. 0.][0. 1. 0. 0.][1. 0. 0. 0.]]
狀態價值函數 =
[[0.82351246 0.82350689 0.82350303 0.82350106][0.82351416 0. 0.5294002 0. ][0.82351683 0.82352026 0.76469786 0. ][0. 0.88234658 0.94117323 0. ]]
最優策略 =
[[0 3 3 3][0 0 0 0][3 1 0 0][0 2 1 0]]
價值迭代 平均獎勵:1.0
總結
以上是生活随笔為你收集整理的强化学习(三) - Gym库介绍和使用,Markov决策程序实例,动态规划决策实例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [编程]UML语言:理论之光与实践之惑
- 下一篇: 玩转开源 |Hugo 的使用实践