Open AI Gym Box2D BipedalWalkerをColaboratoryで動かしてみる(6)
前回はDDPGをざくっと解説してみました。
今回はDDPGでBipedalWalkerを学習させてみます。
1.BipedalWalker-v2の報酬について
Open AI Gymのgithubサイトにあるソースリスト
gym-envs-box2d-bipedal_walker.pyの関数stepを見ると報酬は次のようになっています。
- ゲームオーバ(体が地面に接地、もしくは領域外まで後ろに下がる)
reward = -100 - 上記以外
shaping = 130*pos[0]/SCALE-5.0*abs(state[0])
reward = shaping - prev_shaping - 0.00035 * MOTORS_TORQUE * np.clip(np.abs(a), 0, 1)
進んだ距離:前に進みゴール位置で総報酬=300
姿勢 :水平で総報酬0、傾いている分だけ減点
モータ :モータトルクを使った分だけ報酬を減点
2.DDPGを試してみる
動かしてみる(4)で紹介したソースリストのコメントアウトを変更するとBipedalWalkerを動かせます。
(1)環境を変更
変更前
#env = gym.make('BipedalWalker-v2')
env = gym.make('MountainCarContinuous-v0')
変更後
env = gym.make('BipedalWalker-v2')
#env = gym.make('MountainCarContinuous-v0')
(2)エピソード回数、獲得報酬表示
変更前
# end while loop ------------------------------
print('episode,reward=',i_episode,reward_total)
変更後
# end while loop ------------------------------
if (steps_done % 10 == 0) or (i_episode == num_episodes -1 ):
# print('episode,reward=',i_episode,reward_total)
print('episode,reward=',steps_done,reward_total)
・学習回数が多いので毎回表示ではなく10回に1回表示に変更。
・学習の様子を見て追加で学習させた時の総エピソード数を知るため、実行のたびに0からカウントされるi_epsodeのかわりに、リセットされないsteps_doneを表示。
(3)パラメータ変更
#学習用リプレイメモリ生成
memory = ReplayMemory(1000000)
・
・
#BATCH_SIZE = 128
BATCH_SIZE = 128
#Qネット学習時の報酬の割引率
GAMMA = 0.98
#steps_doneに対するノイズの減少係数
SIGMA_START = 1.0 #最初
SIGMA_END = 0.3 #最後
SIGMA_DECAY = 800 #この回数で約30%まで減衰
#50 フレーム/sec グラフ描画用
FPS = 50
#学習数。num_epsode=200を変更する。
num_episodes = 500
(4)最大ステップ数削減
計算時間がかかるので最大ステップ数を1400→700に変更しました。
while not done and t < 700:
#指令値生成------------------------------------
(5)結果
学習を何回か繰り返しました。うまくいったケースでは学習数約1000回でぎこちなくスキップして進んでいきました。ですが、さらに学習を進め1500回では、すぐに転んでしまいました。
1000回
episode,reward= 1000 35.9231961772401
2000回
反対側の脚も使うようになってきてちょっと期待できそうです。
episode,reward= 2000 -51.45643730989602
2500回
ちょこちょこ走り
episode,reward= 2500 208.7962870610689
3000回、3500回
ちょこちょこ走りのままですが、すぐに転んでしまいました。
4000回
ちょこちょこ結構走りましたが、2500回の総報酬には届かず。
6000回
episode,reward= 6000 256.1380908660748
7000回
while文のループを増やしたらゴールまで行けました。6000回の時にみられたおっとっとの動きもなく、安定したちょこちょこ走りでした。
episode,reward= 7001 306.013638338901
(6)一応ソースリストを貼り付けておきます。
ライブラリのインストールはOpen AI Gym Box2D BipedalWalkerをColaboratoryで動かしてみる(4)を参照してください。
学習前の処理
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
from pyvirtualdisplay import Display
from scipy.interpolate import interp1d
display = Display(visible=0, size=(1024, 768))
display.start()
import os
#pyvirtualdisplayの仕様変更で次の文はエラーになるのでコメントアウトします(2021/6/5修正)
#os.environ["DISPLAY"] = ":" + str(display.display) + "." + str(display.screen)
#Bipedalwalkerのv2は無くなったのでv3に変更(2021/6/5)
env = gym.make('BipedalWalker-v3')
#env = gym.make('MountainCarContinuous-v0')
# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#リプレイメモリー
Transition = namedtuple('Transition',
('state', 'action', 'next_state', 'reward','done'))
class ReplayMemory(object):
def __init__(self, capacity):
self.capacity = capacity
self.memory = []
self.position = 0
def push(self, *args):
"""Saves a transition."""
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.position] = Transition(*args)
self.position = (self.position + 1) % self.capacity
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
def reset(self):
self.memory = []
self.position = 0
#Actor(Polycy) Network
class Policy_net(nn.Module):
def __init__(self, NumObs, NumAct,NumHidden):
super(Policy_net, self).__init__()
self.ln1 = nn.Linear(NumObs, NumHidden)
self.ln2 = nn.Linear(NumHidden, NumHidden)
self.ln3 = nn.Linear(NumHidden, NumHidden)
self.ln4 = nn.Linear(NumHidden, NumAct)
self.ln4.weight.data.uniform_(-3e-3, 3e-3)
def forward(self, x):
x = F.relu(self.ln1(x))
x = F.relu(self.ln2(x))
x = F.relu(self.ln3(x))
y = torch.tanh(self.ln4(x))
return y
#Critic(Q) Network Q値の計算
class Q_net(nn.Module):
def __init__(self, NumObs, NumAct,NumHidden1,NumHidden2):
super(Q_net, self).__init__()
self.ln1 = nn.Linear(NumObs, NumHidden1)
self.ln2 = nn.Linear(NumHidden1+NumAct, NumHidden2)
self.ln3 = nn.Linear(NumHidden2, NumHidden2)
self.ln4 = nn.Linear(NumHidden2, 1)
self.ln4.weight.data.uniform_(-3e-3, 3e-3)
self.bn1 = nn.BatchNorm1d(num_features=NumHidden2)
def forward(self, state, action):
x = F.leaky_relu(self.ln1(state))
x = torch.cat((x, action), dim=1)
# x = F.leaky_relu(self.bn1(self.ln2(x)))
x = F.leaky_relu(self.ln2(x))
x = F.leaky_relu(self.ln3(x))
y = self.ln4(x)
return y
def optimize_model():
if len(memory) < BATCH_SIZE:
return
transitions = memory.sample(BATCH_SIZE)
batch = Transition(*zip(*transitions))
#batchを一括して処理するためtensorのリストを2次元テンソルに変換
state_batch = torch.stack(batch.state,dim=0)
action_batch = torch.stack(batch.action,dim=0)
reward_batch = torch.stack(batch.reward,dim=0)
next_state_batch = torch.stack(batch.next_state,dim=0)
done_batch = torch.stack(batch.done,dim=0)
#Qネットのloss関数計算
next_actions = policy_target_net(next_state_batch)
next_Q_values =q_target_net(next_state_batch ,next_actions)
expected_Q_values = (next_Q_values * GAMMA)*(1.0-done_batch) + reward_batch
Q_values = q_net(state_batch ,action_batch)
#Qネットのloss関数
q_loss = F.mse_loss(Q_values,expected_Q_values)
#Qネットの学習
q_optimizer.zero_grad()
#誤差逆伝搬
q_loss.backward()
#重み更新
q_optimizer.step()
#policyネットのloss関数
actions = policy_net(state_batch)
p_loss = -q_net(state_batch,actions).mean()
#policyネットの学習
p_optimizer.zero_grad()
#誤差逆伝搬
p_loss.backward()
#重み更新
p_optimizer.step()
tau = 0.001
#targetネットのソフトアップデート
#学習の収束を安定させるためゆっくり学習するtarget netを作りloss関数の計算に使う。
#学習後のネット重み×tau分を反映させ、ゆっくり追従させる。
for target_param, local_param in zip(policy_target_net.parameters(), policy_net.parameters()):
target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)
for target_param, local_param in zip(q_target_net.parameters(), q_net.parameters()):
target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)
#action,observationの要素数取得
n_actions = env.action_space.shape[0]
n_observations = env.observation_space.shape[0]
print('num actions,observations',n_actions,n_observations )
#ネットワーク
#policyネットとそのtargetネット
policy_net = Policy_net(n_observations, n_actions,128).to(device)
policy_target_net = Policy_net(n_observations, n_actions,128).to(device)
#Qネットとそのtargetネット
q_net = Q_net(n_observations, n_actions, 64, 128).to(device)
q_target_net = Q_net(n_observations, n_actions, 64, 128).to(device)
#学習用optimizer生成
p_optimizer = optim.Adam(policy_net.parameters(),lr=1e-3)
q_optimizer = optim.Adam(q_net.parameters(),lr=3e-3, weight_decay=0.0001)
#学習用リプレイメモリ生成
memory = ReplayMemory(1000000)
memory.reset()
#ノイズの大きさ計算用、最初は大きくして学習が進んだら小さくするためのカウンタ
steps_done = 0
学習する。エピソード1000になっているので、何度か繰り返すか、値を変更して実行してください。
#BATCH_SIZE = 128
BATCH_SIZE = 128
#Qネット学習時の報酬の割引率
GAMMA = 0.98
#steps_doneに対するノイズの減少係数
SIGMA_START = 1.0 #最初
SIGMA_END = 0.3 #最後
SIGMA_DECAY = 800 #この回数で約30%まで減衰
#50 フレーム/sec グラフ描画用
FPS = 50
#学習数。num_epsode=200を変更する。
num_episodes = 1000
for i_episode in range(num_episodes):
#whileループ内初期化
observation = env.reset()
done = False
reward_total = 0.0
t = 0
#グラフ用データ保存領域初期化
frames = []
observations = []
actions = []
ts = []
for i in range(n_observations):
observations.append([])
for i in range(n_actions):
actions.append([])
noise =np.array([random.uniform(-0.5,0.5) for i in range(n_actions)],dtype = np.float)
theta = 0.08
sigma = SIGMA_END + (SIGMA_START - SIGMA_END) * math.exp(-1. * steps_done / SIGMA_DECAY)
steps_done += 1
while not done and t < 700:
#指令値生成------------------------------------
sample = random.random()
with torch.no_grad():
action = policy_net(torch.from_numpy(observation).float().to(device))
action = action.cpu().data.numpy()
#最後は純粋にネットワークのデータを取得するためノイズ無し-------------------
if (i_episode != num_episodes -1 ):
#OUNoise
noise = noise - theta * noise + sigma * np.array([random.uniform(-1.0,1.0) for i in range(len(noise))])
action += noise
action = np.clip(action, -1, 1)
#物理モデル1ステップ---------------------------
next_observation, reward, done, info = env.step(action)
reward_total = reward_total + reward
#学習用にメモリに保存--------------------------
tensor_observation = torch.tensor(observation,device=device,dtype=torch.float)
tensor_action = torch.tensor(action,device=device,dtype=torch.float)
tensor_next_observation = torch.tensor(next_observation,device=device,dtype=torch.float)
tensor_reward = torch.tensor([reward],device=device,dtype=torch.float)
tensor_done = torch.tensor([done],device=device,dtype=torch.float)
memory.push(tensor_observation, tensor_action, tensor_next_observation, tensor_reward,tensor_done)
#observation(state)更新------------------------
observation = next_observation
#学習してpolicy_net,target_neを更新
optimize_model()
#データ保存------------------------------------
if i_episode == num_episodes -1 :
#動画
frames.append(env.render(mode = 'rgb_array'))
#グラフ
for i in range(n_observations):
observations[i].append(observation[i])
for i in range(n_actions):
actions[i].append(action[i])
ts.append(t/FPS)
#時間を進める----------------------------------
t += 1
# end while loop ------------------------------
if (steps_done % 10 == 0) or (i_episode == num_episodes -1 ):
# print('episode,reward=',i_episode,reward_total)
print('episode,reward=',steps_done,reward_total)
# end for loop --------------------------------------
動画を表示する
import matplotlib.pyplot as plt
import matplotlib.animation
import numpy as np
from IPython.display import HTML
fig2 = plt.figure(figsize=(6, 6))
plt.axis('off')
images = []
for f in frames:
image = plt.imshow(f)
images.append([image])
ani = matplotlib.animation.ArtistAnimation(fig2, images, interval=30, repeat_delay=1)
HTML(ani.to_jshtml())
BipedalWalker、とりあえず(思ったよりも)何とか動きました。