やってみた!

やってみた!

試したことを中心に、書評や興味のあること、思ったこととか

Open AI Gym Box2D BipedalWalkerをColaboratoryで動かしてみる(7)

 今回はおまけということで、DDPGに教師を追加してみました。記事の最初の方で作成したPD制御を教師として、DDPGの経験処理中に行動をアシストすると、学習に何か効果があるかを試しました。

 結果、最初は教師の影響を受けて大股で歩こうとしていたものの、最終的にはちょこちょこ走りになってしまいました。学習回数も教師無しより余計にかかってしまいました。これは教師自体がへたくそ(すぐつまづく)だったこともあると思いますが。。

 その他、Colaboratoryが動画保存中に異常終了することが多かったので、学習だけ実行してネットワークをGoogleドライブに保存し、後でネットワークを読み込んで動画を作るように等、少し改良しました。

 BipedalWalkerは本記事で最後なので、これまでの記事の繰り返しになりますが、やり方を一通り最初からまとめておきます。

1. Colaboratoryを立ち上げる

 ランタイム-ランタイムのタイプを変更でGPUにします。

2.ライブラリをインストールします。

!apt-get update
!pip3 install box2d-py
!pip3 install 'gym[Box2D]'
!pip3 install  'gym[classic_control]'
!apt-get -qq -y install xvfb freeglut3-dev ffmpeg
!pip3 -q install pyglet
!pip3 -q install pyopengl
!pip3 -q install pyvirtualdisplay
!apt-get install x11-utils

 3.Colaboratoryのランタイムを再起動します。

 インストールしたライブラリを有効にするためランタイムを再起動します。

 ランタイム-ランタイムの再起動

4.Googleドライブをマウントします。

 学習したモデルを保存するためGoogleドライブをマウントします。
 実行するとリンクが表示されるのでリンクをクリックして、アカウントを選択して表示されたコードをCtrl-Cでコピーして、Enter your authorization code:の四角のエリアにCtrl-Pでペーストして、Entキーを押します。

from google.colab import drive
drive.mount('/content/drive')

 5.学習前の処理

 ネットワーク、オプティマイザ、リプレイメモリ等を生成し初期化します。ちなみに、BatchNorm処理は試したのですが結局使っていません(コメントアウトしています)。

import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

from pyvirtualdisplay import Display
from scipy.interpolate import interp1d

display = Display(visible=0, size=(1024, 768))
display.start()
import os
#pyvirtualdisplayの仕様変更で次の文はエラーになるのでコメントアウトします(2021/6/5修正)
#os.environ["DISPLAY"] = ":" + str(display.display) + "." + str(display.screen)

#Bipedalwalkerのv2は無くなったのでv3に変更(2021/6/5)
env = gym.make('BipedalWalker-v3')
#env = gym.make('MountainCarContinuous-v0')

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#リプレイメモリー
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward','done'))

class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
   
    def reset(self):
        self.memory = []
        self.position = 0   

#Actor(Polycy) Network
class Policy_net(nn.Module):
    def __init__(self, NumObs, NumAct,NumHidden):
        super(Policy_net, self).__init__()
        self.ln1 = nn.Linear(NumObs, NumHidden)
        self.ln2 = nn.Linear(NumHidden, NumHidden)
        self.ln3 = nn.Linear(NumHidden, NumHidden)
        self.ln4 = nn.Linear(NumHidden, NumAct)
        self.ln4.weight.data.uniform_(-3e-3, 3e-3)

    def forward(self, x):
        x = F.relu(self.ln1(x))
        x = F.relu(self.ln2(x))
        x = F.relu(self.ln3(x))
        y = torch.tanh(self.ln4(x))
        return y

#Critic(Q) Network Q値の計算
class Q_net(nn.Module):
    def __init__(self, NumObs, NumAct,NumHidden1,NumHidden2):
        super(Q_net, self).__init__()
        self.ln1 = nn.Linear(NumObs, NumHidden1)
        self.ln2 = nn.Linear(NumHidden1+NumAct, NumHidden2)
        self.ln3 = nn.Linear(NumHidden2, NumHidden2)
        self.ln4 = nn.Linear(NumHidden2, 1)
        self.ln4.weight.data.uniform_(-3e-3, 3e-3)
        self.bn1 = nn.BatchNorm1d(num_features=NumHidden2)

    def forward(self, state, action):
        x = F.leaky_relu(self.ln1(state))
        x = torch.cat((x, action), dim=1)
#        x = F.leaky_relu(self.bn1(self.ln2(x)))
        x = F.leaky_relu(self.ln2(x))
        x = F.leaky_relu(self.ln3(x))
        y = self.ln4(x)
        return y

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))

    #batchを一括して処理するためtensorのリストを2次元テンソルに変換
    state_batch = torch.stack(batch.state,dim=0)
    action_batch = torch.stack(batch.action,dim=0)
    reward_batch = torch.stack(batch.reward,dim=0)
    next_state_batch = torch.stack(batch.next_state,dim=0)
    done_batch = torch.stack(batch.done,dim=0)

    #Qネットのloss関数計算
    next_actions = policy_target_net(next_state_batch)
    next_Q_values =q_target_net(next_state_batch ,next_actions)
    expected_Q_values = (next_Q_values * GAMMA)*(1.0-done_batch) + reward_batch

    Q_values =  q_net(state_batch ,action_batch)

    #Qネットのloss関数
    q_loss = F.mse_loss(Q_values,expected_Q_values)

    #Qネットの学習
    q_optimizer.zero_grad()
    #誤差逆伝搬
    q_loss.backward()
    #重み更新
    q_optimizer.step()

    #policyネットのloss関数
    actions = policy_net(state_batch)
    p_loss = -q_net(state_batch,actions).mean()

    #policyネットの学習
    p_optimizer.zero_grad()
    #誤差逆伝搬
    p_loss.backward()
    #重み更新
    p_optimizer.step()

    tau = 0.001
    #targetネットのソフトアップデート
    #学習の収束を安定させるためゆっくり学習するtarget netを作りloss関数の計算に使う。
    #学習後のネット重み×tau分を反映させ、ゆっくり追従させる。
    for target_param, local_param in zip(policy_target_net.parameters(), policy_net.parameters()):
      target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)

    for target_param, local_param in zip(q_target_net.parameters(), q_net.parameters()):
      target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)

#action,observationの要素数取得
n_actions = env.action_space.shape[0]
n_observations = env.observation_space.shape[0]
print('num actions,observations',n_actions,n_observations )

#ネットワーク
#policyネットとそのtargetネット
policy_net = Policy_net(n_observations, n_actions,128).to(device)
policy_target_net = Policy_net(n_observations, n_actions,128).to(device)
#Qネットとそのtargetネット
q_net = Q_net(n_observations, n_actions, 64, 128).to(device)
q_target_net = Q_net(n_observations, n_actions, 64, 128).to(device)
#学習用optimizer生成
p_optimizer = optim.Adam(policy_net.parameters(),lr=1e-3)
q_optimizer = optim.Adam(q_net.parameters(),lr=3e-3, weight_decay=0.0001)
#学習用リプレイメモリ生成
memory = ReplayMemory(1000000)
memory.reset()
#ノイズの大きさ計算用、最初は大きくして学習が進んだら小さくするためのカウンタ
steps_done = 0

6.学習

 学習します。1000エピソードごとにネットワークをGoogleドライブに保存します。ファイル名はmodelxxxxtで、xxxxはエピソード数です。

 教師の影響を徐々に減衰するようにしてあって、減衰の仕方はSIGMA_T_START、SIGMA_T_END、SIGMA_T_DECAYで調整します。行動actionは次の式で計算します。

action += sigma_t * (action1 - action)

 sigma_tがそのEPSODEにおける減衰率、action1は教師値、actionはpolicyネットワークの予想する最適行動です。教師とpolicyネットワークの予想値との差を、sigma_tの割合だけ、actionを教師側に近い値に修正しています。教師が手を取って介助しているイメージです。

 ノイズはそのままだと教師の影響が薄れてしまうため若干小さめにしました。

#角度制御ゲイン
 #P(比例)項
pgain = np.array([4,4,4,4], dtype = 'float')
 #D(微分)項
dgain = np.array([0.2,0.2,0.2,0.2], dtype = 'float')

#脚の指令角スケジュール
xsch = np.array([0,1,2,3,4,5,6,7,8,9,10],dtype = 'float')
yhip1  = np.array([ 0.021, -0.39, -0.58, -0.54, 0.45,    0.18, 0.88,  0.78, 0.60,  0.20, 0.021],dtype = 'float')
yknee1 = np.array([  0.17,  0.85,  0.95,  0.92, -0.38, -0.05,  -0.34,  -0.090,  0.074, 0.043,  0.17],dtype = 'float')
yhip2  = np.array([ 0.18,  0.88,  0.78,  0.60,  0.20, 0.021, -0.39, -0.58, -0.54, 0.45,    0.18],dtype = 'float')
yknee2 = np.array([ -0.05,-0.34,-0.090,  0.074,  0.043 ,  0.17,  0.85, 0.95,  0.92, -0.38, 0.05],dtype = 'float')
numx = xsch.shape[0]

hip1demmand = interp1d(xsch, yhip1)
knee1demmand = interp1d(xsch, yknee1)
hip2demmand = interp1d(xsch, yhip2)
knee2demmand = interp1d(xsch, yknee2)

legangle = np.array([0,0,0,0], dtype = 'float')
leganglespeed = np.array([0,0,0,0], dtype = 'float')
legangledemand = np.array([0,0,0,0], dtype = 'float')

speed = 4.2

#BATCH_SIZE = 128
BATCH_SIZE = 128
#Qネット学習時の報酬の割引率
GAMMA = 0.98
#steps_doneに対するノイズの減少係数
SIGMA_START = 0.5 #最初
SIGMA_END = 0.3 #最後
SIGMA_DECAY = 800 #この回数で約30%まで減衰

#steps_doneに対する教師の減少係数
SIGMA_T_START = 0.4 #最初
SIGMA_T_END = 0.02 #最後
SIGMA_T_DECAY = 3000 #この回数で約30%まで減衰

#50 フレーム/sec グラフ描画用
FPS = 50
#学習数。num_epsode=200を変更する。
num_episodes = 7000

for i_episode in range(num_episodes):
    #whileループ内初期化
    x = 0.0
    observation = env.reset()
    done = False
    reward_total = 0.0
    t = 0
    #グラフ用データ保存領域初期化
    frames = []
    observations = []
    actions = []
    ts = []
    for i in range(n_observations):
      observations.append([])
    for i in range(n_actions):
      actions.append([])

    #NOISE
    noise =np.array([random.uniform(-0.5,0.5) for i in range(n_actions)],dtype = np.float)
    theta = 0.08
    sigma = SIGMA_END + (SIGMA_START - SIGMA_END) * math.exp(-1. * steps_done / SIGMA_DECAY)
    #TEACHER
    sigma_t = SIGMA_T_END + (SIGMA_T_START - SIGMA_T_END) * math.exp(-1. * steps_done / SIGMA_T_DECAY)
    steps_done += 1
    display_on = 0
    
    while not done and t < 700:
        #指令値生成------------------------------------
        x += speed/FPS
        if x>10.0 :x-=10
        legangledemand[0] = hip1demmand(x)
        legangledemand[1] = knee1demmand(x)
        legangledemand[2] = hip2demmand(x)
        legangledemand[3] = knee2demmand(x)

        #脚の角度 PD制御
        action1 = pgain * (legangledemand - legangle) - dgain * leganglespeed
        action1 = np.clip(action1, -0.6, 0.6)

        #----------------------------------------------
        with torch.no_grad():
          action = policy_net(torch.from_numpy(observation).float().to(device))
          action = action.cpu().data.numpy()

        if (i_episode != num_episodes -1 ):
          action += sigma_t * (action1 - action)

        #最後は純粋にネットワークのデータを取得するためノイズ無し-------------------
        if (i_episode != num_episodes -1 ):
          #OUNoise
          noise = noise - theta * noise + sigma * np.array([random.uniform(-1.0,1.0) for i in range(len(noise))])
          action += noise

        action = np.clip(action, -1, 1)

        #物理モデル1ステップ---------------------------
        next_observation, reward, done, info = env.step(action)
        reward_total = reward_total + reward

        #脚の角度,角速度,胴体角取り出し----------------
        legangle[0] = observation[4] #HIP1
        leganglespeed[0] = observation[5]
        legangle[1] = observation[6] #KNEE1
        leganglespeed[1] = observation[7]
        legangle[2] = observation[9] #HIP2
        leganglespeed[2] = observation[10]
        legangle[3] = observation[11] #KNEE2
        leganglespeed[3] = observation[11]

        #学習用にメモリに保存--------------------------
        tensor_observation = torch.tensor(observation,device=device,dtype=torch.float)
        tensor_action = torch.tensor(action,device=device,dtype=torch.float)
        tensor_next_observation = torch.tensor(next_observation,device=device,dtype=torch.float)
        tensor_reward = torch.tensor([reward],device=device,dtype=torch.float)
        tensor_done =  torch.tensor([done],device=device,dtype=torch.float)
        memory.push(tensor_observation, tensor_action, tensor_next_observation, tensor_reward,tensor_done)

        #observation(state)更新------------------------
        observation = next_observation

        #学習してpolicy_net,target_neを更新
        optimize_model()
        #データ保存------------------------------------
        if (i_episode == num_episodes -1) and (display_on == 1) :
          #動画
          frames.append(env.render(mode = 'rgb_array'))
          #グラフ
          for i in range(n_observations):
            observations[i].append(observation[i])
          for i in range(n_actions):
            actions[i].append(action[i])
          ts.append(t/FPS)

        #時間を進める----------------------------------
        t += 1
        # end while loop ------------------------------
    if (steps_done % 10 == 0) or (i_episode == num_episodes -1 ):
#      print('episode,reward=',i_episode,reward_total)
      print('episode,reward=',steps_done,reward_total)
    if (steps_done % 1000 == 0):
      torch.save({
        'epoch':steps_done,
        'policy':policy_net.state_dict(),
        'policy_target':policy_target_net.state_dict(),
        'p_optimizer':p_optimizer.state_dict(),
        'q':q_net.state_dict(),
        'q_target':q_target_net.state_dict(),
        'q_optimizer':q_optimizer.state_dict(),
      },'/content/drive/My Drive/model' + str(steps_done) + 't' )
  # end for loop --------------------------------------

7.ネットワーク読み込み

  読み込みたいネットワークにあわせて赤字は都度変更してください。

checkpoint = torch.load('/content/drive/My Drive/model4000t')
steps_done = checkpoint['epoch']
policy_net.load_state_dict(checkpoint['policy'])
policy_target_net.load_state_dict(checkpoint['policy_target'])
p_optimizer.load_state_dict(checkpoint['p_optimizer'])
q_net.load_state_dict(checkpoint['q'])
q_target_net.load_state_dict(checkpoint['q_target'])
q_optimizer.load_state_dict(checkpoint['q_optimizer'])

8.学習したネットワークを動かして動画を保存

 6.学習のソースを一部変更して動かします。

①num_episodesを1にする。

 #学習数。num_epsode=200を変更する。
 num_episodes = 1

②画像保存スイッチを入れる。

 display_on = 1

 ③実行します。

9.画像を表示

import matplotlib.pyplot as plt
import matplotlib.animation
import numpy as np
from IPython.display import HTML

fig2 = plt.figure(figsize=(6, 6))
plt.axis('off')

images = []
for f in frames:
  image = plt.imshow(f)
  images.append([image])
ani = matplotlib.animation.ArtistAnimation(fig2, images, interval=30, repeat_delay=1)

HTML(ani.to_jshtml())

10.まとめ

 教師をつけて試しにDDPGで学習をしてみました。結果は次の通りです。

  • 学習したpolicyネットワークは最初は教師の影響を受けて、教師に近い動き(大股歩き)をしようとしました。
  • 教師の影響が強いと、もともと教師自体がすぐにつまづくためか、学習したpolicyもすぐにつまづいてしまいました(10000回も学習すると教師よりは多少ましでしたが・・)
  • SIGMA_T_ENDを0.2→0.02に変更して教師の影響を小さくすると、徐々にちょこちょこ走りになっていきました。
  • 結局、教師無しと比較して、学習回数が減ったとか、大股で歩けるようになったとかいう具体的なメリットは得られませんでした

 今回はあまりいい結果ではありませんでしたが、DDPG学習に教師の影響を受けさせられることを確かめられたので、どこかで使えることもあるかもしれません。ということで前向きに考えることにします!(大分試すのに時間をとられましたが・・・)。

おしまい

 BipedalWalkerの記事はこれでおしまいです!。今回の記事は長く続きましたが、最後までお付き合い頂きありがとうございました。

 強化学習 カテゴリーの記事一覧 - やってみた!

Open AI Gym Box2D BipedalWalkerをColaboratoryで動かしてみる(6)

 前回はDDPGをざくっと解説してみました。

 今回はDDPGでBipedalWalkerを学習させてみます。

1.BipedalWalker-v2の報酬について

 Open AI Gymのgithubサイトにあるソースリスト

 https://github.com/openai/gym

 gym-envs-box2d-bipedal_walker.pyの関数stepを見ると報酬は次のようになっています。

  • ゲームオーバ(体が地面に接地、もしくは領域外まで後ろに下がる)
    reward = -100
  • 上記以外
    shaping = 130*pos[0]/SCALE-5.0*abs(state[0])
    reward = shaping - prev_shaping - 0.00035 * MOTORS_TORQUE * np.clip(np.abs(a), 0, 1)

    進んだ距離:前に進みゴール位置で報酬=300
    姿勢   :水平で報酬0、傾いている分だけ減点
    モータ  :モータトルクを使った分だけ報酬を減点

2.DDPGを試してみる

 動かしてみる(4)で紹介したソースリストのコメントアウトを変更するとBipedalWalkerを動かせます。

(1)環境を変更 

変更前
#env = gym.make('BipedalWalker-v2')
env = gym.make('MountainCarContinuous-v0')

変更後
env = gym.make('BipedalWalker-v2')
#env = gym.make('MountainCarContinuous-v0')

(2)エピソード回数、獲得報酬表示

変更前

        # end while loop ------------------------------
    print('episode,reward=',i_episode,reward_total)

変更後

        # end while loop ------------------------------
    if (steps_done % 10 == 0) or (i_episode == num_episodes -1 ):
#      print('episode,reward=',i_episode,reward_total)
      print('episode,reward=',steps_done,reward_total)

・学習回数が多いので毎回表示ではなく10回に1回表示に変更。
・学習の様子を見て追加で学習させた時の総エピソード数を知るため、実行のたびに0からカウントされるi_epsodeのかわりに、リセットされないsteps_doneを表示。

(3)パラメータ変更

#学習用リプレイメモリ生成
memory = ReplayMemory(1000000)


#BATCH_SIZE = 128
BATCH_SIZE = 128
#Qネット学習時の報酬の割引率
GAMMA = 0.98
#steps_doneに対するノイズの減少係数
SIGMA_START = 1.0 #最初
SIGMA_END = 0.3 #最後
SIGMA_DECAY = 800 #この回数で約30%まで減衰

#50 フレーム/sec グラフ描画用
FPS = 50
#学習数。num_epsode=200を変更する。
num_episodes = 500

(4)最大ステップ数削減
 計算時間がかかるので最大ステップ数を1400→700に変更しました。

while not done and t < 700:
#指令値生成------------------------------------

(5)結果
 学習を何回か繰り返しました。うまくいったケースでは学習数約1000回でぎこちなくスキップして進んでいきました。ですが、さらに学習を進め1500回では、すぐに転んでしまいました。

1000回

f:id:akifukka:20191226153720g:plain

episode,reward= 1000 35.9231961772401

 

2000回
反対側の脚も使うようになってきてちょっと期待できそうです。

f:id:akifukka:20191226160713g:plain

episode,reward= 2000 -51.45643730989602

 

2500回
ちょこちょこ走り

f:id:akifukka:20191226163953g:plain

episode,reward= 2500 208.7962870610689

 3000回、3500回
ちょこちょこ走りのままですが、すぐに転んでしまいました。

4000回
ちょこちょこ結構走りましたが、2500回の総報酬には届かず。

6000回

f:id:akifukka:20191226214324g:plain

episode,reward= 6000 256.1380908660748

7000回
 while文のループを増やしたらゴールまで行けました。6000回の時にみられたおっとっとの動きもなく、安定したちょこちょこ走りでした。

f:id:akifukka:20191226215201p:plain
episode,reward= 7001 306.013638338901

(6)一応ソースリストを貼り付けておきます。

 ライブラリのインストールはOpen AI Gym Box2D BipedalWalkerをColaboratoryで動かしてみる(4)を参照してください。

学習前の処理

import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

from pyvirtualdisplay import Display
from scipy.interpolate import interp1d

display = Display(visible=0, size=(1024, 768))
display.start()
import os
#pyvirtualdisplayの仕様変更で次の文はエラーになるのでコメントアウトします(2021/6/5修正)
#os.environ["DISPLAY"] = ":" + str(display.display) + "." + str(display.screen)

#Bipedalwalkerのv2は無くなったのでv3に変更(2021/6/5)
env = gym.make('BipedalWalker-v3')
#env = gym.make('MountainCarContinuous-v0')

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#リプレイメモリー
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward','done'))

class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
   
    def reset(self):
        self.memory = []
        self.position = 0   

#Actor(Polycy) Network
class Policy_net(nn.Module):
    def __init__(self, NumObs, NumAct,NumHidden):
        super(Policy_net, self).__init__()
        self.ln1 = nn.Linear(NumObs, NumHidden)
        self.ln2 = nn.Linear(NumHidden, NumHidden)
        self.ln3 = nn.Linear(NumHidden, NumHidden)
        self.ln4 = nn.Linear(NumHidden, NumAct)
        self.ln4.weight.data.uniform_(-3e-3, 3e-3)

    def forward(self, x):
        x = F.relu(self.ln1(x))
        x = F.relu(self.ln2(x))
        x = F.relu(self.ln3(x))
        y = torch.tanh(self.ln4(x))
        return y

#Critic(Q) Network Q値の計算
class Q_net(nn.Module):
    def __init__(self, NumObs, NumAct,NumHidden1,NumHidden2):
        super(Q_net, self).__init__()
        self.ln1 = nn.Linear(NumObs, NumHidden1)
        self.ln2 = nn.Linear(NumHidden1+NumAct, NumHidden2)
        self.ln3 = nn.Linear(NumHidden2, NumHidden2)
        self.ln4 = nn.Linear(NumHidden2, 1)
        self.ln4.weight.data.uniform_(-3e-3, 3e-3)
        self.bn1 = nn.BatchNorm1d(num_features=NumHidden2)

    def forward(self, state, action):
        x = F.leaky_relu(self.ln1(state))
        x = torch.cat((x, action), dim=1)
#        x = F.leaky_relu(self.bn1(self.ln2(x)))
        x = F.leaky_relu(self.ln2(x))
        x = F.leaky_relu(self.ln3(x))
        y = self.ln4(x)
        return y

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))

    #batchを一括して処理するためtensorのリストを2次元テンソルに変換
    state_batch = torch.stack(batch.state,dim=0)
    action_batch = torch.stack(batch.action,dim=0)
    reward_batch = torch.stack(batch.reward,dim=0)
    next_state_batch = torch.stack(batch.next_state,dim=0)
    done_batch = torch.stack(batch.done,dim=0)

    #Qネットのloss関数計算
    next_actions = policy_target_net(next_state_batch)
    next_Q_values =q_target_net(next_state_batch ,next_actions)
    expected_Q_values = (next_Q_values * GAMMA)*(1.0-done_batch) + reward_batch

    Q_values =  q_net(state_batch ,action_batch)

    #Qネットのloss関数
    q_loss = F.mse_loss(Q_values,expected_Q_values)

    #Qネットの学習
    q_optimizer.zero_grad()
    #誤差逆伝搬
    q_loss.backward()
    #重み更新
    q_optimizer.step()

    #policyネットのloss関数
    actions = policy_net(state_batch)
    p_loss = -q_net(state_batch,actions).mean()

    #policyネットの学習
    p_optimizer.zero_grad()
    #誤差逆伝搬
    p_loss.backward()
    #重み更新
    p_optimizer.step()

    tau = 0.001
    #targetネットのソフトアップデート
    #学習の収束を安定させるためゆっくり学習するtarget netを作りloss関数の計算に使う。
    #学習後のネット重み×tau分を反映させ、ゆっくり追従させる。
    for target_param, local_param in zip(policy_target_net.parameters(), policy_net.parameters()):
      target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)

    for target_param, local_param in zip(q_target_net.parameters(), q_net.parameters()):
      target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)

#action,observationの要素数取得
n_actions = env.action_space.shape[0]
n_observations = env.observation_space.shape[0]
print('num actions,observations',n_actions,n_observations )

#ネットワーク
#policyネットとそのtargetネット
policy_net = Policy_net(n_observations, n_actions,128).to(device)
policy_target_net = Policy_net(n_observations, n_actions,128).to(device)
#Qネットとそのtargetネット
q_net = Q_net(n_observations, n_actions, 64, 128).to(device)
q_target_net = Q_net(n_observations, n_actions, 64, 128).to(device)
#学習用optimizer生成
p_optimizer = optim.Adam(policy_net.parameters(),lr=1e-3)
q_optimizer = optim.Adam(q_net.parameters(),lr=3e-3, weight_decay=0.0001)
#学習用リプレイメモリ生成
memory = ReplayMemory(1000000)
memory.reset()
#ノイズの大きさ計算用、最初は大きくして学習が進んだら小さくするためのカウンタ
steps_done = 0

学習する。エピソード1000になっているので、何度か繰り返すか、値を変更して実行してください。

#BATCH_SIZE = 128
BATCH_SIZE = 128
#Qネット学習時の報酬の割引率
GAMMA = 0.98
#steps_doneに対するノイズの減少係数
SIGMA_START = 1.0 #最初
SIGMA_END = 0.3 #最後
SIGMA_DECAY = 800 #この回数で約30%まで減衰

#50 フレーム/sec グラフ描画用
FPS = 50
#学習数。num_epsode=200を変更する。
num_episodes = 1000

for i_episode in range(num_episodes):
    #whileループ内初期化
    observation = env.reset()
    done = False
    reward_total = 0.0
    t = 0
    #グラフ用データ保存領域初期化
    frames = []
    observations = []
    actions = []
    ts = []
    for i in range(n_observations):
      observations.append([])
    for i in range(n_actions):
      actions.append([])

    noise =np.array([random.uniform(-0.5,0.5) for i in range(n_actions)],dtype = np.float)
    theta = 0.08
    sigma = SIGMA_END + (SIGMA_START - SIGMA_END) * math.exp(-1. * steps_done / SIGMA_DECAY)
    steps_done += 1

    while not done and t < 700:
        #指令値生成------------------------------------
        sample = random.random()

        with torch.no_grad():
          action = policy_net(torch.from_numpy(observation).float().to(device))
          action = action.cpu().data.numpy()

        #最後は純粋にネットワークのデータを取得するためノイズ無し-------------------
        if (i_episode != num_episodes -1 ):
          #OUNoise
          noise = noise - theta * noise + sigma * np.array([random.uniform(-1.0,1.0) for i in range(len(noise))])
          action += noise

        action = np.clip(action, -1, 1)

        #物理モデル1ステップ---------------------------
        next_observation, reward, done, info = env.step(action)
        reward_total = reward_total + reward

        #学習用にメモリに保存--------------------------
        tensor_observation = torch.tensor(observation,device=device,dtype=torch.float)
        tensor_action = torch.tensor(action,device=device,dtype=torch.float)
        tensor_next_observation = torch.tensor(next_observation,device=device,dtype=torch.float)
        tensor_reward = torch.tensor([reward],device=device,dtype=torch.float)
        tensor_done =  torch.tensor([done],device=device,dtype=torch.float)
        memory.push(tensor_observation, tensor_action, tensor_next_observation, tensor_reward,tensor_done)

        #observation(state)更新------------------------
        observation = next_observation

        #学習してpolicy_net,target_neを更新
        optimize_model()
        #データ保存------------------------------------
        if i_episode == num_episodes -1 :
          #動画
          frames.append(env.render(mode = 'rgb_array'))
          #グラフ
          for i in range(n_observations):
            observations[i].append(observation[i])
          for i in range(n_actions):
            actions[i].append(action[i])
          ts.append(t/FPS)

        #時間を進める----------------------------------
        t += 1
        # end while loop ------------------------------
    if (steps_done % 10 == 0) or (i_episode == num_episodes -1 ):
#      print('episode,reward=',i_episode,reward_total)
      print('episode,reward=',steps_done,reward_total)
  # end for loop --------------------------------------

 動画を表示する

import matplotlib.pyplot as plt
import matplotlib.animation
import numpy as np
from IPython.display import HTML

fig2 = plt.figure(figsize=(6, 6))
plt.axis('off')

images = []
for f in frames:
  image = plt.imshow(f)
  images.append([image])
ani = matplotlib.animation.ArtistAnimation(fig2, images, interval=30, repeat_delay=1)

HTML(ani.to_jshtml())

 BipedalWalker、とりあえず(思ったよりも)何とか動きました。

つづく
 強化学習 カテゴリーの記事一覧 - やってみた!

Open AI Gym Box2D BipedalWalkerをColaboratoryで動かしてみる(5)

  前回はDDPG(Deep Deterministic Policy Gradient)でMountainCarContinuous挑戦し、無事学習して山登りに成功しました。(BipedalWalkerは手強いので後回しです・・・)

 今回は中身について、ざっくりですが解説してみます。

 1.DDPG(Deep Deterministic Policy Gradient)

 f:id:akifukka:20191223201154j:plain

 DQNでは、ある環境の時に行動a1,a2・・・を取った時の価値Q1,Q2・・(未来に得られる報酬の合計)を推定するQネットワークを学習します。ここで環境やQxは連続値を取ります。学習が完了したら、Q1,Q2・・・を比較して最も大きな価値の行動axを選択します。ネットワークの構造上、行動a1,a2・・・は離散的になります。点数を限りなく増やせば連続値を模擬できますが、あまり現実的ではありません。

 これに対しDDPGでは行動aもQ値を計算する入力にします。ここでの行動aは連続値です。ただ、このままだと学習後にどの行動aを選ぶかといった時に、aの値を変化させて価値Qが最大になるところを探し出す必要があります。

 じゃあということで、最初から価値Qが最大になる行動aを推定するpolicyネットワークを追加しちゃえばというのがDDPGです。

 policyネットワークはQネットワークの値が最大になる様、Qネットワークを損失関数に使って学習させます。

2.Qネットワーク(Q(s,a))の学習

 'Bellman equation'という考え方を使います。

f:id:akifukka:20191224201708j:plain

 価値関数Qを、これからもらえる報酬の総和で定義します。Qの引数は1項の図を見てわかるようにobservationの状態sと行動aです。

 ただし、そのまま報酬を未来永劫に渡って足し続けるとQが無限大に発散してしまうので、報酬に割引率を乗じて意味のある数字に収束するようにします(①)。すると、Qは次のステップQ(st+1,at+1)と割引率γを使って書くことができます(②)。

 次に左辺=0となるように変形します(③)。で、2乗して損失関数Lを定義して(④)、損失関数が最小になるように学習すればQを学習できることになります。

 ちなみに時間tでゴールした場合は、それ以降は報酬をもらえない、すなわちQt+1=0になるはずなので、その模擬も含めて損失関数は最終的に次のように書けます。

f:id:akifukka:20191223221032j:plain

 赤字はtarget関数です。これは価値関数Qをそのまま使うと、学習で自身の更新の影響を受け安定して収束しなくなるのを防ぐため、価値関数Qより少し遅れてゆっくり変化するようにしたものです。

 割引率は0.999等、1に近い値を使います。1に近ければ近いほど遠い将来の報酬の影響を受け、値が小さければ比較的短い時間範囲の報酬までしか影響を受けません。

 Qネットワークは上記の損失関数(loss関数)が最小になるように学習させます。
環境st、その時に取った行動at、その結果の環境st+1は実際に1ステップ動かした時の値(経験値)を使えます。1ステップの経験値からat+1の情報は得られないので、後で説明する行動を予測するpolicyネットワークを使って計算します。この時使用するpolicyネットワークも実際のpolicyネットワークに遅れて学習するtarget関数です。

 ソースリストのQネットワークの学習はdef optimize_model()中で行っています。該当する箇所を次に示します。

#Qネットのloss関数計算
next_actions = policy_target_net(next_state_batch)
next_Q_values =q_target_net(next_state_batch ,next_actions)
expected_Q_values = (next_Q_values * GAMMA)*(1.0-done_batch) + reward_batch

Q_values =  q_net(state_batch ,action_batch)

#Qネットのloss関数
q_loss = F.mse_loss(Q_values,expected_Q_values)

#Qネットの学習
q_optimizer.zero_grad()
#誤差逆伝搬
q_loss.backward()
#重み更新
q_optimizer.step()

 F.mse_loss()は差の2乗平均を算出する関数で、q_optimizer.zero_grad()~q_optimizer.step()で学習します。

 なお、複数の経験(ステップ)を一括して学習するため、環境s、行動a等は複数の経験分をstack(テンソルを複数まとめる)しています。

3. policyネットワーク(policy(s))の学習

 policyネットワークはQが最大となる行動を推測するよう学習させます。pytorchは損失関数が最小になるよう学習させるため、Q値に-1を乗じた損失関数を定義、値が最小になるよう学習させます。

  該当する箇所を次に示します。policyネットワーク(Target)はQネットの学習、経験の取得に使用するため、Qネットと交互に学習させ、並行して学習させます。

#policyネットのloss関数
actions = policy_net(state_batch)
p_loss = -q_net(state_batch,actions).mean()

#policyネットの学習
p_optimizer.zero_grad()
#誤差逆伝搬
p_loss.backward()
#重み更新
p_optimizer.step()

4.Target関数のソフトアップデート

 Target関数は前述の通り実際のQネット、policyネットにゆっくり追従するネットワークで、それぞれQネット、policyネットを学習した後、次のように更新します。

tau = 0.001
#targetネットのソフトアップデート
#学習の収束を安定させるためゆっくり学習するtarget netを作りloss関数の計算に使う。
#学習後のネット重み×tau分を反映させ、ゆっくり追従させる。
for target_param, local_param in zip(policy_target_net.parameters(), policy_net.parameters()):
  target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)

for target_param, local_param in zip(q_target_net.parameters(), q_net.parameters()):
  target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)

 ネットワークの重みをそのままコピーするのではなく、tau(この場合は1/1000)だけ学習後の値を使い、残りの割合分は現状の値をそのまま使うことで、学習の効果がゆっくり浸透するようにしています(1次遅れと同じイメージ)。

5.リプレイメモリ

 直前の経験だけを使って学習すると、直近のpolicyが出す行動に偏りQネットの学習もその領域に偏ってしまうため、過去の経験をメモリにためておき満遍なく学習するようにしています。リプレイメモリは有限の大きさにしておき、あまりに古くて使えない経験は新しい経験に上書きされます。

6.ノイズ

 経験を取得するのにpolicyの出力する行動を使うと、行動が偏ってしまい、学習もpolicyが出力する行動のごく狭い範囲に留まってしまいます。そのため極所最適解に陥り、他に良い行動があっても発見できまないまま終わってしまいます。

 DDPGでは、より良い行動を探すため、policyが出力する行動にノイズを混ぜて幅広く経験させる様、工夫されています。

7.ネットワーク定義

 今回はQネット、policyネット共に4層、隠れ要素128のネットワークにしてみました。層数、隠れ要素数ともに特にこの値にした理由があるわけではないので、変更して試してみるのもいいと思います。

8.まとめ

 DDPGのアルゴリズムの概要を纏めました。学習を効率良く行うには、報酬の与え方(今回はOpen AI Gymなので変更しない)、割引率、ノイズの大きさ、ネットワーク構造といったあたりが鍵になると思われます。

 次回は再びBipedalWalkerを動かします。

 強化学習 カテゴリーの記事一覧 - やってみた!

Open AI Gym Box2D BipedalWalkerをColaboratoryで動かしてみる(4)

改正2019.12.26

 ソースリスト中でsteps_done +=1の位置をwhileループ(各ステップ計算ループ)からepsodeのforループに移動しました(バグ)。このバグのためSIGMA_DECAYがほとんど効かず、すぐにノイズが小さくなっていました。あわせてSIGMA_DECAYの設定値も150→30に見直しました。

-----------------------------------

 前回は歩行パターンのスケジュールを作り、PD制御で脚の角度を制御し何とか2,3歩歩くことができました。

 ここからはpytorchで深層強化学習を実装していきます。が、BipedalWalkerはあまりに手強いので、手始めにMountainCarContinuousで試してみます。

 主に次のサイトを参考にさせてもらいました。

  実装はDDPG(Deep Deterministic Policy Gradient)です。元の論文はこちら。

 また、こちらにOPEN AIによる解説があります。

Deep Deterministic Policy Gradient — Spinning Up documentation

1.DDPG 

 DDPGは強化学習の手法のひとつDPG(Deterministic Policy Gradient)を発展させ、DeepLerningを適用したものです。元となったDPGの論文はこちら。

Deterministic Policy Gradient Algorithms,Silver et al., 2014

 DDPGの特徴は次の通りです。

  • action(制御の指令値)に連続値(discreteではなく)を扱うことができますDQN(Deep Q Network)の制御指令値はDiscrete値なので応用範囲が限定されますが、DDPGは連続値なので一般的な制御に応用することができます。
  • actor-critic アルゴリズムを採用します。actor(policy)は行動方策を推定し、critic(Q関数)は行動の価値を推定するものです。これらをディープネットワークで定義して同時に学習していきます。

2.動かしてみます

 DDPGの詳細は後回しにして、まずはどんなものか動かしてみます。

(1)Corabolatoryの準備

 Colaboratoryを立ち上げます。「ランタイム」ー「ランタイムのタイプを変更」でハードウェアアクセラレータ「GPU」を選択します。

(2)必要なライブラリのインストール

必要なライブラリをインストールします。

!apt-get update
!pip3 install box2d-py
!pip3 install 'gym[Box2D]'
!pip3 install  'gym[classic_control]'
!apt-get -qq -y install xvfb freeglut3-dev ffmpeg
!pip3 -q install pyglet
!pip3 -q install pyopengl
!pip3 -q install pyvirtualdisplay
!apt-get install x11-utils

(3)ランタイムを再起動します。

 インルトールした内容を反映させるため、一旦ランタイムを再起動します。
「ランタイム」-「ランタイムの再起動」

(4)学習前の処理(リプレイメモリ、ネットワークの定義、初期化)

 次のコードを実行します。

import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

from pyvirtualdisplay import Display
from scipy.interpolate import interp1d

display = Display(visible=0, size=(1024, 768))
display.start()
import os
#pyvirtualdisplayの仕様変更で次の文はエラーになるのでコメントアウトします(2021/6/5変更)
#os.environ["DISPLAY"] = ":" + str(display.display) + "." + str(display.screen)

#Bipedalwalkerのv2は無くなったのでv3に変更(2021/6/5)
#env = gym.make('BipedalWalker-v3')
env = gym.make('MountainCarContinuous-v0')

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#リプレイメモリー
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward','done'))

class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
   
    def reset(self):
        self.memory = []
        self.position = 0   

#Actor(Polycy) Network
class Policy_net(nn.Module):
    def __init__(self, NumObs, NumAct,NumHidden):
        super(Policy_net, self).__init__()
        self.ln1 = nn.Linear(NumObs, NumHidden)
        self.ln2 = nn.Linear(NumHidden, NumHidden)
        self.ln3 = nn.Linear(NumHidden, NumHidden)
        self.ln4 = nn.Linear(NumHidden, NumAct)
        self.ln4.weight.data.uniform_(-3e-3, 3e-3)

    def forward(self, x):
        x = F.relu(self.ln1(x))
        x = F.relu(self.ln2(x))
        x = F.relu(self.ln3(x))
        y = torch.tanh(self.ln4(x))
        return y

#Critic(Q) Network Q値の計算
class Q_net(nn.Module):
    def __init__(self, NumObs, NumAct,NumHidden1,NumHidden2):
        super(Q_net, self).__init__()
        self.ln1 = nn.Linear(NumObs, NumHidden1)
        self.ln2 = nn.Linear(NumHidden1+NumAct, NumHidden2)
        self.ln3 = nn.Linear(NumHidden2, NumHidden2)
        self.ln4 = nn.Linear(NumHidden2, 1)
        self.ln4.weight.data.uniform_(-3e-3, 3e-3)

    def forward(self, state, action):
        x = F.leaky_relu(self.ln1(state))
        x = torch.cat((x, action), dim=1)
        x = F.leaky_relu(self.ln2(x))
        x = F.leaky_relu(self.ln3(x))
        y = self.ln4(x)
        return y

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))

    #batchを一括して処理するためtensorのリストを2次元テンソルに変換
    state_batch = torch.stack(batch.state,dim=0)
    action_batch = torch.stack(batch.action,dim=0)
    reward_batch = torch.stack(batch.reward,dim=0)
    next_state_batch = torch.stack(batch.next_state,dim=0)
    done_batch = torch.stack(batch.done,dim=0)

    #Qネットのloss関数計算
    next_actions = policy_target_net(next_state_batch)
    next_Q_values =q_target_net(next_state_batch ,next_actions)
    expected_Q_values = (next_Q_values * GAMMA)*(1.0-done_batch) + reward_batch

    Q_values =  q_net(state_batch ,action_batch)

    #Qネットのloss関数
    q_loss = F.mse_loss(Q_values,expected_Q_values)

    #Qネットの学習
    q_optimizer.zero_grad()
    #誤差逆伝搬
    q_loss.backward()
    #重み更新
    q_optimizer.step()

    #policyネットのloss関数
    actions = policy_net(state_batch)
    p_loss = -q_net(state_batch,actions).mean()

    #policyネットの学習
    p_optimizer.zero_grad()
    #誤差逆伝搬
    p_loss.backward()
    #重み更新
    p_optimizer.step()

    tau = 0.001
    #targetネットのソフトアップデート
    #学習の収束を安定させるためゆっくり学習するtarget netを作りloss関数の計算に使う。
    #学習後のネット重み×tau分を反映させ、ゆっくり追従させる。
    for target_param, local_param in zip(policy_target_net.parameters(), policy_net.parameters()):
      target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)

    for target_param, local_param in zip(q_target_net.parameters(), q_net.parameters()):
      target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)

#action,observationの要素数取得
n_actions = env.action_space.shape[0]
n_observations = env.observation_space.shape[0]
print('num actions,observations',n_actions,n_observations )

#ネットワーク
#policyネットとそのtargetネット
policy_net = Policy_net(n_observations, n_actions,128).to(device)
policy_target_net = Policy_net(n_observations, n_actions,128).to(device)
#Qネットとそのtargetネット
q_net = Q_net(n_observations, n_actions, 64, 128).to(device)
q_target_net = Q_net(n_observations, n_actions, 64, 128).to(device)
#学習用optimizer生成
p_optimizer = optim.Adam(policy_net.parameters(),lr=1e-3)
q_optimizer = optim.Adam(q_net.parameters(),lr=3e-3, weight_decay=0.0001)
#学習用リプレイメモリ生成
memory = ReplayMemory(10000)
memory.reset()
#ノイズの大きさ計算用、最初は大きくして学習が進んだら小さくするためのカウンタ
steps_done = 0

(5)学習します

 所要時間は10分くらいでしょうか。表示されるrewardは報酬の合計値で、山登り成功の場合は80~95点くらいになります。学習中は制御指令値にノイズを加算しているので完全な実力値とは言えません。epsodeの最後の回はノイズ無しなので、ほぼ実力値になります。

 学習epsodeが200回の場合、0から始めて199回目のrewardが実力相当の値です。学習数は必要に応じて変更してください。

#BATCH_SIZE = 128
BATCH_SIZE = 128
#Qネット学習時の報酬の割引率
GAMMA = 0.999
#step_doneに対するノイズの減少係数
SIGMA_START = 1.0 #最初
SIGMA_END = 0.3 #最後
SIGMA_DECAY = 30 #この回数で約30%まで減衰

#50 フレーム/sec グラフ描画用
FPS = 50
#学習数。num_epsode=200を変更する。
num_episodes = 200

for i_episode in range(num_episodes):
    #whileループ内初期化
    observation = env.reset()
    done = False
    reward_total = 0.0
    t = 0
    #グラフ用データ保存領域初期化
    frames = []
    observations = []
    actions = []
    ts = []
    for i in range(n_observations):
      observations.append([])
    for i in range(n_actions):
      actions.append([])

    noise =np.array([random.uniform(-0.5,0.5) for i in range(n_actions)],dtype = np.float)
    theta = 0.08
    sigma = SIGMA_END + (SIGMA_START - SIGMA_END) * math.exp(-1. * steps_done / SIGMA_DECAY)
    steps_done += 1
while not done and t < 1400: #指令値生成------------------------------------ sample = random.random() with torch.no_grad(): action = policy_net(torch.from_numpy(observation).float().to(device)) action = action.cpu().data.numpy() #最後は純粋にネットワークのデータを取得するためノイズ無し------------------- if (i_episode != num_episodes -1 ): #OUNoise noise = noise - theta * noise + sigma * np.array([random.uniform(-1.0,1.0) for i in range(len(noise))]) action += noise action = np.clip(action, -1, 1) #物理モデル1ステップ--------------------------- next_observation, reward, done, info = env.step(action) reward_total = reward_total + reward #学習用にメモリに保存-------------------------- tensor_observation = torch.tensor(observation,device=device,dtype=torch.float) tensor_action = torch.tensor(action,device=device,dtype=torch.float) tensor_next_observation = torch.tensor(next_observation,device=device,dtype=torch.float) tensor_reward = torch.tensor([reward],device=device,dtype=torch.float) tensor_done = torch.tensor([done],device=device,dtype=torch.float) memory.push(tensor_observation, tensor_action, tensor_next_observation, tensor_reward,tensor_done) #observation(state)更新------------------------ observation = next_observation #学習してpolicy_net,target_neを更新 optimize_model() #データ保存------------------------------------ if i_episode == num_episodes -1 : #動画 frames.append(env.render(mode = 'rgb_array')) #グラフ for i in range(n_observations): observations[i].append(observation[i]) for i in range(n_actions): actions[i].append(action[i]) ts.append(t/FPS) #時間を進める---------------------------------- t += 1 # end while loop ------------------------------ print('episode,reward=',i_episode,reward_total) # end for loop --------------------------------------

 (6)学習結果を見てみる

次を実行させると動画を見ることができます。

import matplotlib.pyplot as plt
import matplotlib.animation
import numpy as np
from IPython.display import HTML

fig2 = plt.figure(figsize=(6, 6))
plt.axis('off')

images = []
for f in frames:
  image = plt.imshow(f)
  images.append([image])
ani = matplotlib.animation.ArtistAnimation(fig2, images, interval=30, repeat_delay=1)

HTML(ani.to_jshtml()) 

f:id:akifukka:20191222113715j:plain


(7)ネットワークを可視化してみる

 デバッグのためQネット(critic)、policyネット(actor)を可視化してみました。少し謎のところもありますが・・・。

import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import numpy as np

X, Y = np.mgrid[-1:1.1:0.1, -1:1.1:0.1]

#Q map
Z = np.zeros((21,21,21),dtype = float)
Za = np.zeros((21,21),dtype = float)
position = -1.0
for i in range(21):#position loop
  speed = -1.0
  for j in range(21):#speed loop
    a = -1.0
    a_qmax = -1.0
    qmax = -10000.0
    for k in range(21):#a loop
      q = q_net( torch.tensor([[position,speed]]).float().to(device), torch.tensor([[a]]).float().to(device)).to("cpu").detach().numpy()[0][0]
      Z[i][j][k] = q
      if (qmax < q) :
        qmax = q 
        a_qmax = a
      a = a + 0.1
    Za[j][i] = a_qmax
    speed = speed + 0.1
  position = position + 0.1

#Qmap
fig = plt.figure(figsize=(18, 6), facecolor="w")
ax = fig.add_subplot(121, projection="3d")
surf = ax.plot_surface(X, Y, Z[10])
ax.set_xlabel("speed",fontsize = 14)
ax.set_ylabel("a",fontsize = 14)
ax.set_zlabel("Q",fontsize = 14)
ax.set_title("position 0.0")

ax = fig.add_subplot(122, projection="3d")
surf = ax.plot_surface(X, Y, Za)
ax.set_ylabel("position",fontsize = 14)
ax.set_xlabel("speed",fontsize = 14)
ax.set_zlabel("a",fontsize = 14)
ax.set_title("calc from Qmap")

plt.show()

#policy map
Zpolicy = np.zeros((21,21),dtype = float)
speed = -1.0
for i in range(21):
  position = -1.0
  for j in range(21):
    Zpolicy[i][j] = policy_net(torch.tensor([[position,speed]]).float().to(device)).to("cpu").detach().numpy()[0][0]
    position = position + 0.1
  speed = speed + 0.1

fig = plt.figure(figsize=(9, 6), facecolor="w")
ax = fig.add_subplot(111, projection="3d")
ax.set_xlabel("speed",fontsize = 14)
ax.set_ylabel("position",fontsize = 14)
ax.set_zlabel("a",fontsize = 14)
surf = ax.plot_surface(X, Y, Zpolicy,color="green")
ax.set_title("policy")
plt.show()

 

f:id:akifukka:20191222114605j:plain

 左上の図はposition=0.0(図の真ん中なので谷より若干右より?)における価値関数Qネットの計算値です。

 Q = Q_net(observation, action)

 observation=[position,speed]

 a=action (+で右に押す、-で左に押す)

 aが変わっても以外にQの値は大きくは変わりませんでした。

 右上の図は行動aを変化させながらQを計算して、最もQが大きくなるaを描画したものです。

 左下の図は行動aを決めるpolicyネットの計算値です。この図から次のことがわかります。

・速度が-の時(台車が左に動いている)は-に押す(左に押す)

・速度が+の時(台車が右に動いている)は+に押す(右に押す)

 右上のQネットから算出した行動予測の図とpolicyネットを計算した左下の図はほぼ一致すると思っていたのですが、どうも一致しないようです。学習の初期ではほぼ一致してますが、学習が進むにつれ違いが出てきます。

 今のところ、この現象は理解できていません・・・。が、学習自体は上手くいっているようなので・・。

ーーーーーーーーーーーーーーーーーーーーーーーー

つづく!

 次回はDDPGの概要を理解できる様、ざっくり解説をしてみます。BipedalWalkerはその後に挑戦ということで(なんせ先に中身を理解しておかないと、太刀打ちできなさそうなので)。

 強化学習 カテゴリーの記事一覧 - やってみた!

Open AI Gym Box2D BipedalWalkerをColaboratoryで動かしてみる(3)

 前回の続きです。歩行のスケジュールをチューニングした結果、2,3歩歩けるようになりました。これ以上は胴体の傾きをつかって制御するなどもう一工夫必要そうです。 

  スケジュール制御での歩行は一旦このくらいにしておきます。とりあえず現状の歩行を以下に報告します。

(2)歩行スケジュールのチューニング

 チューニング結果の歩行スケジュールを次に示します。

xsch = np.array([0,1,2,3,4,5,6,7,8,9,10],dtype = 'float')
yhip1  = np.array([ 0.021, -0.39, -0.58, -0.54, 0.45,    0.18, 0.88,  0.78, 0.60,  0.20, 0.021],dtype = 'float')
yknee1 = np.array([  0.17,  0.85,  0.95,  0.92, -0.38, -0.05,  -0.34,  -0.090,  0.074, 0.043,  0.17],dtype = 'float')
yhip2  = np.array([ 0.18,  0.88,  0.78,  0.60,  0.20, 0.021, -0.39, -0.58, -0.54, 0.45,    0.18],dtype = 'float')
yknee2 = np.array([ -0.05,-0.34,-0.090,  0.074,  0.043 ,  0.17,  0.85, 0.95,  0.92, -0.38, 0.05],dtype = 'float')

 前回のソースリストで上記スケジュールを差替え、次の赤文字のspeedを5→4.2に変更します。

for i in range(1):
    ・
    ・
    speed = 5

 歩き方はこんな感じ。スキップのような歩き方になりました。実行するたび若干変わりますが、たぶん地面のでこぼこが毎回ちょっとずつ違うんじゃないかと思います。

f:id:akifukka:20191212220343j:plain

 さすがに完全手動でチューニングするのは面倒なので、乱数でスケジュールをちょっとずつ変更してはBipedalWalkerの報酬(reward)が多ければ採用、小さければ不採用として、ある程度自動でチューニングさせました。ただし、報酬が安定しない(地面のでこぼこの違い)ので、最終的にどの数値を採用するかはセンス?適当に選びました・・・。

 つぎはぎで、あまり参考になりませんが乱数でチューニングするのに使ったソースリストを残しておきます。

import torch
import numpy as np
import gym
import random
from pyvirtualdisplay import Display
from scipy.interpolate import interp1d

display = Display(visible=0, size=(1024, 768))
display.start()
import os
#pyvirtualdisplayの仕様変更で次の文はエラーになるのでコメントアウトします(2021/6/5修正)
#os.environ["DISPLAY"] = ":" + str(display.display) + "." + str(display.screen)

#Bipedalwalkerのv2は無くなったのでv3に変更(2021/6/5)
#env = gym.make('BipedalWalker-v3')

#角度制御ゲイン
 #P(比例)項
pgain = np.array([4,4,4,4], dtype = 'float')
 #D(微分)項
dgain = np.array([0.2,0.2,0.2,0.2], dtype = 'float')

#脚の指令角スケジュール
xsch = np.array([0,1,2,3,4,5,6,7,8,9,10],dtype = 'float')
yhip1  = np.array([ 0.021, -0.39, -0.58, -0.54, 0.45,    0.18, 0.88,  0.78, 0.60,  0.20, 0.021],dtype = 'float')
yknee1 = np.array([  0.17,  0.85,  0.95,  0.92, -0.38, -0.05,  -0.34,  -0.090,  0.074, 0.043,  0.17],dtype = 'float')
yhip2  = np.array([ 0.18,  0.88,  0.78,  0.60,  0.20, 0.021, -0.39, -0.58, -0.54, 0.45,    0.18],dtype = 'float')
yknee2 = np.array([ -0.05,-0.34,-0.090,  0.074,  0.043 ,  0.17,  0.85, 0.95,  0.92, -0.38, 0.05],dtype = 'float')
numx = xsch.shape[0]

hip1demmand = interp1d(xsch, yhip1)
knee1demmand = interp1d(xsch, yknee1)
hip2demmand = interp1d(xsch, yhip2)
knee2demmand = interp1d(xsch, yknee2)

graphschys =[]
graphschys.append([hip1demmand(0)])
graphschys.append([knee1demmand(0)])
graphschys.append([hip2demmand(0)])
graphschys.append([knee2demmand(0)])
graphschxs =[0.0]

for graphschx in np.arange(0.1,10.0,0.1):
  graphschxs.append(graphschx)
  graphschys[0].append(hip1demmand(graphschx))
  graphschys[1].append(knee1demmand(graphschx))
  graphschys[2].append(hip2demmand(graphschx))
  graphschys[3].append(knee2demmand(graphschx))

#モデルタイムステップ(1秒間に50フレーム = 0.02s)
FPS = 50.0

speed = 4.2
bestspeed = 4.2
bestreward = -9999.0

prevyhip1 = yhip1.copy()
prevyknee1 = yknee1.copy()
prevyhip2 = yhip2.copy()
prevyknee2 = yknee2.copy()

for i in range(30):
    obs = env.reset()
    done = False
    frames = []
    observations = []
    actions = []
    ts = []
    legangle = np.array([0,0,0,0], dtype = 'float')
    leganglespeed = np.array([0,0,0,0], dtype = 'float')
    legangledemand = np.array([0,0,0,0], dtype = 'float')

    x = 0.0
    t = 0
    leg1gcount = 0
    leg2gcount = 0
    leg1touchground =0
    leg2touchground =0
    reward = 0.0

    while not done and t < 1000:
        #動画用画面保存-------------------------------
        frames.append(env.render(mode = 'rgb_array'))

        #指令値生成-----------------------------------
        x += speed/FPS
        if x>10.0 :x-=10

        #接地でスケジュール強制的に進める
#        if x>0.5 and x<2.5 and leg2touchground == 1:x = 4.5 - x
#        if x>5.5 and x<7.5 and leg1touchground == 1:x = 9.5 - (x - 5)

        legangledemand[0] = hip1demmand(x)
        legangledemand[1] = knee1demmand(x)
        legangledemand[2] = hip2demmand(x)
        legangledemand[3] = knee2demmand(x)

        #角度指令値上書き。スケジュールで動かす時はコメントアウトすること
#        legangledemand = np.array([1,-1,-1,-1], dtype = 'float')

        #脚の角度 PD制御
        action = pgain * (legangledemand - legangle) - dgain * leganglespeed

        #2足歩行物理モデル1ステップ(0.02s)-------------
        observation, r, done, info = env.step(action)
        reward = reward + r

        #脚の角度,角速度,胴体角取り出し----------------
        legangle[0] = observation[4] #HIP1
        leganglespeed[0] = observation[5]
        legangle[1] = observation[6] #KNEE1
        leganglespeed[1] = observation[7]
        legangle[2] = observation[9] #HIP2
        leganglespeed[2] = observation[10]
        legangle[3] = observation[11] #KNEE2
        leganglespeed[3] = observation[11]

        #leg1 接地判定 チャタリング防止
        if observation[8] == 1:
          if leg1gcount <3 :leg1gcount += 1
        else:
          leg1gcount = 0
        
        if leg1gcount == 3:
          leg1touchground = 1
        else:
          leg1touchground = 0

        #leg2 接地判定 チャタリング防止
        if observation[13] == 1:
          if leg2gcount <3 :leg2gcount += 1
        else:
          leg2gcount = 0;

        if leg2gcount == 3:
          leg2touchground = 1
        else:
          leg2touchground = 0

        #グラフ用データ保存----------------------------
        if t == 0 :
          for i in range(24):
            observations.append([observation[i]])
          for i in range(4):
            actions.append([action[i]])
        else :
          for i in range(24):
            observations[i].append(observation[i])
          for i in range(4):
            actions[i].append(action[i])
        ts.append(t/FPS)

        #時間を進める----------------------------------
        t += 1
  
    if reward > bestreward:
#      bestspeed = speed
      bestreward = reward.copy()
      print(yhip1)
      print(yknee1)
      print(yhip2)
      print(yknee2)
      print(i,bestreward)
    else:
      #元に戻す
      yhip1 = prevyhip1.copy()
      yknee1 = prevyknee1.copy()
      yhip2 = prevyhip2.copy()
      yknee2 = prevyknee2.copy()

    #一時退避
    prevyhip1 = yhip1.copy()
    prevyknee1 = yknee1.copy()
    prevyhip2 = yhip2.copy()
    prevyknee2 = yknee2.copy()

    prev1 = random.randint(0,1)
    prev21 = random.randint(0,9)
    #両足同じスケジュールにする
    prev22 = prev21 + 5
    if prev22 >9:
      prev22 = prev22 - 10 

    if prev1 == 0:
      yhip1[prev21] += random.uniform(-0.1,0.1)
      yhip2[prev22] = yhip1[prev21]
      #端点は同じ値を維持
      yhip1[10] = yhip1[0]
      yhip2[10] = yhip2[0]
    elif prev1 == 1:
      yknee1[prev21] += random.uniform(-0.1,0.1)
      yknee2[prev22] = yknee1[prev21]
      #端点は同じ値を維持
      yknee1[10] = yknee1[0]
      yknee2[10] = yknee2[0]

#    speed = bestspeed + random.uniform(-1, 1)
    hip1demmand = interp1d(xsch, yhip1)
    knee1demmand = interp1d(xsch, yknee1)
    hip2demmand = interp1d(xsch, yhip2)
    knee2demmand = interp1d(xsch, yknee2)

#end for loop -----------------------------------------

#元に戻す
yhip1 = prevyhip1
knee1 = prevyknee1
yhip2 = prevyhip2
knee2 = prevyknee2

 ちなみにspeedも乱数で変更して報酬が多くなるよう選びました。

 pythonは慣れないこともあり変数への '=' が代入では無く、新たに作られるオブジェクトへの参照だと知らなくて少し手間取りました。最初、スケジュールを退避したはずが、退避できずにおかしな動きをしていました。copy()を追加、新らたなオブジェクトを作ることで正しく動くようになりましたが、今後も変数の使い方は注意しないと。

 a+=1、a-=1といった演算も新たにオブジェクトを作ってくれないということで、pythonに慣れるまで気を付けないといけなさそうです。(a=a+1の場合は右辺で新たなオブジェクトを生成してくれるとのこと)。

 さて、スケジュール制御はここまでとして、次回からニューラルネットワークを使っていく予定です。今日は短いけどおしまい。

つづく

 強化学習 カテゴリーの記事一覧 - やってみた!

Open AI Gym Box2D BipedalWalkerをColaboratoryで動かしてみる(2)

 前回の続きです。ColaboratoryでBipedicalWalkerを動かすセッティング等は前回の記事を参照ください。

 まずは、ニューラルネットワークを使わずにとにかく動かして、BipedicalWalkerがどのようなものなのか試してみます。歩くとこまで行くかどうか(現在も試行錯誤中)。

7.とにかく動かしてみる

(1)角度制御を追加してとにかく動かす

 ソースリストを示します。計算部分と表示部分の2つにわけてありますので、Colaboratoryで動かす際は順番に動かしてください。

・計算部分

import torch
import numpy as np
import gym
from pyvirtualdisplay import Display
from scipy.interpolate import interp1d

display = Display(visible=0, size=(1024, 768))
display.start()
import os
#pyvirtualdisplayの仕様変更で次の文はエラーになるのでコメントアウトします(2021/6/5修正)
#os.environ["DISPLAY"] = ":" + str(display.display) + "." + str(display.screen)

#Bipedalwalkerのv2は無くなったのでv3に変更(2021/6/5)
env = gym.make('BipedalWalker-v3')

#角度制御ゲイン
 #P(比例)項
pgain = np.array([4,4,4,4], dtype = 'float')
 #D(微分)項
dgain = np.array([0.2,0.2,0.2,0.2], dtype = 'float')

#脚の指令角スケジュール
xsch = np.array([0,1,2,3,4,5,6,7,8,9,10],dtype = 'float')
yhip1  = np.array([    0, -0.3, -0.6, -0.6, -0.3,    0, 0.85,  0.7,  0.5,  0.3,    0],dtype = 'float')
yknee1 = np.array([  0.2,  0.9,  0.8,  0.7, -0.4,    0,  0.0,  0.0,  0.0,  0.0,  0.0],dtype = 'float')
yhip2  = np.array([ 0.85,  0.7,  0.5,  0.3,    0, -0.3, -0.6, -0.6, -0.3,    0, 0.85],dtype = 'float')
yknee2 = np.array([ -0.4,  0.0,  0.0,  0.0,  0.2,  0.9,  0.8,  0.7, -0.4,    0,    0],dtype = 'float')
numx = xsch.shape[0]

hip1demmand = interp1d(xsch, yhip1)
knee1demmand = interp1d(xsch, yknee1)
hip2demmand = interp1d(xsch, yhip2)
knee2demmand = interp1d(xsch, yknee2)

graphschys =[]
graphschys.append([hip1demmand(0)])
graphschys.append([knee1demmand(0)])
graphschys.append([hip2demmand(0)])
graphschys.append([knee2demmand(0)])
graphschxs =[0.0]

for graphschx in np.arange(0.1,10.0,0.1):
  graphschxs.append(graphschx)
  graphschys[0].append(hip1demmand(graphschx))
  graphschys[1].append(knee1demmand(graphschx))
  graphschys[2].append(hip2demmand(graphschx))
  graphschys[3].append(knee2demmand(graphschx))

#モデルタイムステップ(1秒間に50フレーム = 0.02s)
FPS = 50.0

for i in range(1):
    obs = env.reset()
    done = False
    frames = []
    observations = []
    actions = []
    ts = []
    legangle = np.array([0,0,0,0], dtype = 'float')
    leganglespeed = np.array([0,0,0,0], dtype = 'float')
    legangledemand = np.array([0,0,0,0], dtype = 'float')
    speed = 5
    x = 0.0
    t = 0
    leg1gcount = 0
    leg2gcount = 0
    leg1touchground =0
    leg2touchground =0

    while not done and t < 400:
        #動画用画面保存-------------------------------
        frames.append(env.render(mode = 'rgb_array'))

        #指令値生成-----------------------------------
        x += speed/FPS
        if x>10.0 :x-=10

        #接地でスケジュール強制的に進める
#        if x>0.5 and x<2.5 and leg2touchground == 1:x = 4.5 - x
#        if x>5.5 and x<7.5 and leg1touchground == 1:x = 9.5 - (x - 5)

        legangledemand[0] = hip1demmand(x)
        legangledemand[1] = knee1demmand(x)
        legangledemand[2] = hip2demmand(x)
        legangledemand[3] = knee2demmand(x)

        #角度指令値上書き。スケジュールで動かす時はコメントアウトすること
        legangledemand = np.array([1,1,1,1], dtype = 'float')

        #脚の角度 PD制御
        action = pgain * (legangledemand - legangle) - dgain * leganglespeed

        #2足歩行物理モデル1ステップ(0.02s)-------------
        observation, reward, done, info = env.step(action)

        #脚の角度,角速度,胴体角取り出し----------------
        legangle[0] = observation[4] #HIP1
        leganglespeed[0] = observation[5]
        legangle[1] = observation[6] #KNEE1
        leganglespeed[1] = observation[7]
        legangle[2] = observation[9] #HIP2
        leganglespeed[2] = observation[10]
        legangle[3] = observation[11] #KNEE2
        leganglespeed[3] = observation[11]

        #leg1 接地判定 チャタリング防止
        if observation[8] == 1:
          if leg1gcount <3 :leg1gcount += 1
        else:
          leg1gcount = 0
        
        if leg1gcount == 3:
          leg1touchground = 1
        else:
          leg1touchground = 0

        #leg2 接地判定 チャタリング防止
        if observation[13] == 1:
          if leg2gcount <3 :leg2gcount += 1
        else:
          leg2gcount = 0;

        if leg2gcount == 3:
          leg2touchground = 1
        else:
          leg2touchground = 0

        #グラフ用データ保存----------------------------
        if t == 0 :
          for i in range(24):
            observations.append([observation[i]])
          for i in range(4):
            actions.append([action[i]])
        else :
          for i in range(24):
            observations[i].append(observation[i])
          for i in range(4):
            actions[i].append(action[i])
        ts.append(t/FPS)

        #時間を進める----------------------------------
        t += 1

・表示部分

import matplotlib.pyplot as plt
import matplotlib.animation
import numpy as np
from IPython.display import HTML

fig1 = plt.figure(figsize=(12, 16))
sub1 = fig1.add_subplot(621)
sub1.plot(ts,observations[0],label="hull angle")
sub1.plot(ts,observations[1],label="hull angle Velocity")
sub1.legend()

sub2 = fig1.add_subplot(622)
sub2.plot(ts,observations[2],label="Velocity x")
sub2.plot(ts,observations[3],label="Velocity y")
sub2.legend()

sub3 = fig1.add_subplot(623)
sub3.plot(ts,observations[4],label="Hip1 Angle")
sub3.plot(ts,observations[9],label="Hip2 Angle")
sub3.legend()

sub4 = fig1.add_subplot(624)
sub4.plot(ts,observations[5],label="Hip1 Angle Velocity")
sub4.plot(ts,observations[10],label="Hip2 Angle Velocity")
sub4.legend()

sub5 = fig1.add_subplot(625)
sub5.plot(ts,observations[6],label="Knee1 Angle")
sub5.plot(ts,observations[11],label="Knee2 Angle")
sub5.legend()

sub6 = fig1.add_subplot(626)
sub6.plot(ts,observations[7],label="Knee1 Angle Velocity")
sub6.plot(ts,observations[12],label="Knee2 Angle Velocity")
sub6.legend()

sub7 = fig1.add_subplot(627)
sub7.plot(ts,observations[8],label="Leg1 Touch Ground")
sub7.plot(ts,observations[13],label="Leg2 Touch Ground")
sub7.legend()

sub8 = fig1.add_subplot(326)
sub8.plot(ts,actions[0],label="HIP1 Angle Velocity Demand")
sub8.plot(ts,actions[1],label="KNEE1 Angle Velocity Demand")
sub8.plot(ts,actions[2],label="HIP2 Angle Velocity Demand")
sub8.plot(ts,actions[3],label="KNEE2 Angle Velocity Demand")
sub8.set_ylim(-2.0,2.0)
sub8.legend()

sub9 = fig1.add_subplot(325)
sub9.plot(graphschxs,graphschys[0],label="HIP1 Angle Sch")
sub9.plot(graphschxs,graphschys[1],label="KNEE1 Angle Sch")
sub9.plot(graphschxs,graphschys[2],label="HIP2 Angle Sch")
sub9.plot(graphschxs,graphschys[3],label="KNEE2 Angle Sch")
sub8.set_ylim(-1.0,1.0)
sub9.legend()

plt.show()

fig2 = plt.figure(figsize=(6, 6))
plt.axis('off')

images = []
for f in frames:
  image = plt.imshow(f)
  images.append([image])
ani = matplotlib.animation.ArtistAnimation(fig2, images, interval=30, repeat_delay=1)

HTML(ani.to_jshtml())

 

 モデルの入力は脚の関節の角速度です。これでは歩行させるためのスケジュールを決めずらいので、まずは決まった角度に制御するよう制御回路を追加します。制御回路の構造を図に示します。

f:id:akifukka:20191208113348j:plain

 上記じゃなきゃダメというわけでは無く、他にも様々な構造を考えることができます。制御構造は何を重視するか(目標角度と実際の角度の差、偏差を小さくするのを重視したいのか、差はちょっとくらいはあっても、即応性を重視したいのかなど)によって変わってくるのですが、今回はバランスの良い(というか、だいたい何でもそこそこ対応できる万能型)PID制御から、即応性を重視してP(比例)、D(微分)を使うPD制御としました(I(積分)項は使わないことにしました)。

 図を見るとわかるように、P項、D項は次のように動きます。

  • P項
    目標と今の位置が大きくずれていたら、指令値もそれに応じて大きくし早く動かす。
  • D項
    目標へ向かう速度が早すぎ、最後に止まれず行き過ぎてしまう(オーバーシュート)。オーバシュート量を減らすため、速度が出たら指令値を少し小さくし、速度を加減するよう動く。

 P項、D項をどのくらい効かせるかはPゲイン、Dゲインの値で決めますが、今回は何回か動かして適当(何となくうまく動く程度)に次の値に決めました(ソースリストの上の方)

 

#角度制御ゲイン
#P(比例)項
 pgain = np.array([4,4,4,4], dtype = 'float')
#D(微分)項
 dgain = np.array([0.2,0.2,0.2,0.2], dtype = 'float')

 それぞれ、脚1の腿、脚1の膝、脚2の腿、脚2の膝のゲインです。

 

 PD制御の計算部分はソースリストのforループの中です。それぞれの値はnumpyのテンソル(np.array)にして1行で書いてます。

 #脚の角度 PD制御
 action = pgain * (legangledemand - legangle) - dgain * leganglespeed

 現在の脚の角度(legangle)、角速度(leganglespeed)はソースリストのforループの中で次のようにobservationから取り出します。

#脚の角度,角速度,胴体角取り出し----------------
 legangle[0] = observation[4] #HIP1
 leganglespeed[0] = observation[5]
 legangle[1] = observation[6] #KNEE1
 leganglespeed[1] = observation[7]
 legangle[2] = observation[9] #HIP2
 leganglespeed[2] = observation[10]
 legangle[3] = observation[11] #KNEE2
 leganglespeed[3] = observation[11]

では、ソースリストの次の文の角度指令値を色々変更して動かしてみます。

#角度指令値上書き。スケジュールで動かす時はコメントアウトすること
 legangledemand = np.array([1,1,1,1], dtype = 'float')

結果

f:id:akifukka:20191208123737j:plain

 指令値(legangledemand)は脚1の腿の角度、脚1の膝の角度、脚2の腿の角度、脚2の膝の角度です。

 この結果から、次のことがわかります。

  1. 脚1は薄い紫、脚2は濃い紫
  2. 腿の角度は1 ~ -1の範囲で動く。前が正、後ろが負の値。
  3. 膝の角度は1 ~ -0.6の範囲で動く。1が伸ばした状態、-0.6がもっとも曲げた状態。

さてさて、ソースリストの補足です

 ソースリスト中で指令値を与えている行をコメントアウトすると、ソースリストの上の方で"#脚の指令角スケジュール”として与えているスケジュールに従い、脚を動かして歩こうとします(何歩か歩けよォ。心の中の叫び)。

 が、結構難しくてうまく歩いてくれません・・・・。スケジュールの値、speed値(1秒間にスケジュールをどのくらい進めるか)を変えて試しているんですが・・。

 スケジュールは0~10のステップを1巡(脚1で1歩、脚2で1歩、計2歩で元に戻る)として、繰り返します。speed=5だと、1秒でスケジュール半分相当なので、1秒1歩です。

 このあたりは、うまく歩けてないので大幅に修正するかも知れませんが、現状でも色々試せると思います。

つづく
 どうやったらもう少し歩けるか思案して、人力でのチューニングはあきらめ乱数を使って半自動でチューニングしました。なんとか2,3歩ですが、歩けるようになりました。歩き方が若干変ですが・・・(何かぎこちないスキップ?)。

 強化学習 カテゴリーの記事一覧 - やってみた!

Open AI Gym Box2D BipedalWalkerをColaboratoryで動かしてみる(1)

 Open AI Gymをご存知の方も多いと思いますが、強化学習の開発用にOpen AIがフリーで提供しているテスト環境です。pythonで書かれています。

https://gym.openai.com/

 この中にはブロック崩し、インベーダゲームを始めとするAtariの様々なゲームや、2次元の月着陸、2足歩行(この記事)、3次元の人型ロボット等の様々な課題があります。当初、3次元はMujocoという有料の物理エンジンが使われていましたが、現在はPybulletというフリーのものが使えるようです。

 この記事ではOpen AI Gymの中でBox2D(2次元の物理エンジン)を使った2足歩行のBipedalWalker-v2を試してみたいと思います。 

1.Google Colaboratoryの準備

 GoogleアカウントでログインしてColaboratoryを起動、ファイルーPython3の新しいノートブックを開きます。

 

2.Box2Dのインストール

 次の2行でBox2D自身と、Box2D対応gymをインストールします。

!pip3 install box2d-py
!pip3 install 'gym[Box2D]'
 上記を実行したら必ず「ランタイムを再起動」を実行しましょう。実行しないとインストールしてもBox2Dを認識せずに、importしようとするとエラー(BipedalWalker-v2が無いと怒られます)が起きます。毎度ながらランタイムを再起動していないのが原因だと気づくまで1時間近くかかってしまいました。

3.画像表示の準備

 表示に必要なライブラリ類をインストールします。

!apt-get update
!apt-get -qq -y install xvfb freeglut3-dev ffmpeg
!pip3 -q install pyglet
!pip3 -q install pyopengl
!pip3 -q install pyvirtualdisplay

4.動かしてみます

  次のコードで動かします。この時点では絵は出ません。2足歩行BipedalWalker-v2を動かして、後で可視化するために次のデータを保存します。

・画像をリストframesに保存します。

・状態(observation)をリストobservationsに保存します。

・指令は action=[1,-1,1,-1]を与えます。
 両足のももを速度+1で動かし、両足の膝を速度ー1で動かします。#でコメントアウトしているのはサンプルで乱数で指令を与える関数です。

 BipedalWalker-v2はソースを見ると50FPS(0.02秒ステップで計算)しているので、グラフ描画用のリストts(横軸に使う)を1/50秒ステップで作っています。

import torch
import numpy as np
import gym
from pyvirtualdisplay import Display
display = Display(visible=0, size=(1024, 768))
display.start()
import os
#pyvirtualdisplayの仕様変更で次の文はエラーになるのでコメントアウトします(2021/6/5修正)
#os.environ["DISPLAY"] = ":" + str(display.display) + "." + str(display.screen)

#Bipedalwalkerのv2は無くなったのでv3に変更(2021/6/5)
env = gym.make('BipedalWalker-v3')

for i in range(1):
    obs = env.reset()
    done = False
    frames = []
    observations = []
    ts = []
    t = 0
    while not done and t < 20:
        frames.append(env.render(mode = 'rgb_array'))
#        action = env.action_space.sample()
        action = np.array([1,-1,1,-1])
        observation, reward, done, info = env.step(action)
        if t == 0 :
          for i in range(24):
            observations.append([observation[i]])
        else :
          for i in range(24):
            observations[i].append(observation[i])
        ts.append(t/50)
        t += 1

5.表示します

 次で4.項で計算した結果を表示します。

(1)observationsを使ってグラフを書きます。

 手直しできるよう簡単に使っている命令を解説します。
 fig1=plt.figure(figsize=(10,8)) 縦10、横8の図を生成
 sub1=fig1.add_subplot(421) サブプロット縦4、横2分割の場所1に追加。
 sub1.plot(ts,observations[0],label="hull angle") 横ts、縦observations[0]の折れ線追加
                     ラベル名はhull angle

 sub1.legend()       ラベルを表示

 後は上記の繰り返しです。

(2)frames内のimageを使って動画を作成します。

  fig2 = plt.figure(figsize=(6, 6))以降で図を生成して動画を作っています。

import matplotlib.pyplot as plt
import matplotlib.animation
import numpy as np
from IPython.display import HTML

fig1 = plt.figure(figsize=(10, 8))
sub1 = fig1.add_subplot(421)
sub1.plot(ts,observations[0],label="hull angle")
sub1.plot(ts,observations[1],label="hull angle Velocity")
sub1.legend()

sub2 = fig1.add_subplot(422)
sub2.plot(ts,observations[2],label="Velocity x")
sub2.plot(ts,observations[3],label="Velocity y")
sub2.legend()

sub3 = fig1.add_subplot(423)
sub3.plot(ts,observations[4],label="Hip1 Angle")
sub3.plot(ts,observations[9],label="Hip2 Angle")
sub3.legend()

sub4 = fig1.add_subplot(424)
sub4.plot(ts,observations[5],label="Hip1 Angle Velocity")
sub4.plot(ts,observations[10],label="Hip2 Angle Velocity")
sub4.legend()

sub5 = fig1.add_subplot(425)
sub5.plot(ts,observations[6],label="Knee1 Angle")
sub5.plot(ts,observations[11],label="Knee2 Angle")
sub5.legend()

sub6 = fig1.add_subplot(426)
sub6.plot(ts,observations[7],label="Knee1 Angle Velocity")
sub6.plot(ts,observations[12],label="Knee2 Angle Velocity")
sub6.legend()

sub7 = fig1.add_subplot(427)
sub7.plot(ts,observations[8],label="Leg1 Touch Ground")
sub7.plot(ts,observations[13],label="Leg2 Touch Ground")
sub7.legend()

plt.show()

fig2 = plt.figure(figsize=(6, 6))
plt.axis('off')

images = []
for f in frames:
  image = plt.imshow(f)
  images.append([image])
ani = matplotlib.animation.ArtistAnimation(fig2, images, interval=30, repeat_delay=1)

HTML(ani.to_jshtml())

 こんな感じでしゃがみます。

f:id:akifukka:20191206232803j:plain

f:id:akifukka:20191207125142j:plain

 

 足の指令値として同じ値を与えていても、地面からの反力等の影響で必ずしも同じにはなりません。動かしてみて、足の指令(角度の速度指令)は次の方向だとわかりました。

腿(HIP)   前に出す方が +

膝(KNEE) しゃがむ方向が -

6.observation

githubでgym/envs/box2d/bipedal_walker.pyを見てみます。

    def step(self, action):
        #self.hull.ApplyForceToCenter((0, 20), True) -- Uncomment this to receive a bit of stability help
        control_speed = False  # Should be easier as well
        if control_speed:
            self.joints[0].motorSpeed = float(SPEED_HIP  * np.clip(action[0], -1, 1))
            self.joints[1].motorSpeed = float(SPEED_KNEE * np.clip(action[1], -1, 1))
            self.joints[2].motorSpeed = float(SPEED_HIP  * np.clip(action[2], -1, 1))
            self.joints[3].motorSpeed = float(SPEED_KNEE * np.clip(action[3], -1, 1))
        else:
            self.joints[0].motorSpeed     = float(SPEED_HIP     * np.sign(action[0]))
            self.joints[0].maxMotorTorque = float(MOTORS_TORQUE * np.clip(np.abs(action[0]), 0, 1))
            self.joints[1].motorSpeed     = float(SPEED_KNEE    * np.sign(action[1]))
            self.joints[1].maxMotorTorque = float(MOTORS_TORQUE * np.clip(np.abs(action[1]), 0, 1))
            self.joints[2].motorSpeed     = float(SPEED_HIP     * np.sign(action[2]))
            self.joints[2].maxMotorTorque = float(MOTORS_TORQUE * np.clip(np.abs(action[2]), 0, 1))
            self.joints[3].motorSpeed     = float(SPEED_KNEE    * np.sign(action[3]))
            self.joints[3].maxMotorTorque = float(MOTORS_TORQUE * np.clip(np.abs(action[3]), 0, 1))

        self.world.Step(1.0/FPS, 6*30, 2*30)

        pos = self.hull.position
        vel = self.hull.linearVelocity

        for i in range(10):
            self.lidar[i].fraction = 1.0
            self.lidar[i].p1 = pos
            self.lidar[i].p2 = (
                pos[0] + math.sin(1.5*i/10.0)*LIDAR_RANGE,
                pos[1] - math.cos(1.5*i/10.0)*LIDAR_RANGE)
            self.world.RayCast(self.lidar[i], self.lidar[i].p1, self.lidar[i].p2)

        state = [
            self.hull.angle,        # Normal angles up to 0.5 here, but sure more is possible.
            2.0*self.hull.angularVelocity/FPS,
            0.3*vel.x*(VIEWPORT_W/SCALE)/FPS,  # Normalized to get -1..1 range
            0.3*vel.y*(VIEWPORT_H/SCALE)/FPS,
            self.joints[0].angle,   # This will give 1.1 on high up, but it's still OK (and there should be spikes on hiting the ground, that's normal too)
            self.joints[0].speed / SPEED_HIP,
            self.joints[1].angle + 1.0,
            self.joints[1].speed / SPEED_KNEE,
            1.0 if self.legs[1].ground_contact else 0.0,
            self.joints[2].angle,
            self.joints[2].speed / SPEED_HIP,
            self.joints[3].angle + 1.0,
            self.joints[3].speed / SPEED_KNEE,
            1.0 if self.legs[3].ground_contact else 0.0
            ]
        state += [l.fraction for l in self.lidar]
        assert len(state)==24
        .
        .
        .return np.array(state), reward, done, {}

 最後の方にdef stepという記述があり、この関数が実際のモデルの中枢です。上の方で指令値actionを実際のモデルに適用して、最後の方でstate(observationに相当)を作っています。

 上記ソースコードからobservationの中身は次の通りです。

  0:胴体角度
 1:胴体角速度
 2:x方向移動速度
 3:y方向移動速度
 4:足1のHIP(腿)角度
 5:足1のHIP(腿)角速度
 6:足1のKNEE(膝)角度
 7:足1のKNEE(膝)角速度
 8:足1接地で1
 9:足2のHIP(腿)角度
10:足2のHIP(腿)角速度
11:足2のKNEE(膝)角度
12:足2のKNEE(膝)角速度
13:足2接地で1
14~23:Lidar(レーザレーダ、障害物検知用)、ここでは省略します。

actionの値を変更する等、試してみるといいと思います。

つづく

  強化学習 カテゴリーの記事一覧 - やってみた!