強化学習/Pythonで強化学習する のバックアップ差分(No.3)


  • 追加された行はこの色です。
  • 削除された行はこの色です。
*はじめに [#l524e399]

ここでは、Pythonで強化学習を行います。

例題として、Sutton & Barto の『エージェント・アプローチ人工知能』で用いられている [math]4 \times 3[/math] の迷路を使います。
#ref(./maze.png,nolink,25%)
Sのマスが初期状態、+1 と -1 のマスが終端状態です。

行動は東西南北の4種類です。

外側と塗られているマスには壁があり、その方向には進めません。
0.8の確率で選んだ方向に進むことができ、0.1の確率で選んだ方向に対して90度左のマス、0.1の確率でその反対側のマスに進みます。

数値が書かれているマスに到達するとその数値が報酬として得られます。
その他のマスに到達すると報酬 -0.02 が得られます。


強化学習アルゴリズムはQ学習、行動選択は[math]\epsilon[/math]-グリーディーです。

次の環境で確認しました。
-macOS Sierra 10.12.5
-Python 3.5.1


*準備 [#t9a5a07c]

NumPyを使いますので、インストールします。

まず、pip3をインストールし、最新版にします。
#geshi(sh){{
$ sudo easy_install pip
$ sudo pip3 install --upgrade pip
}}

次に、pip3でNumPyをインストールします。
#geshi(sh){{
$ sudo pip3 install numpy
}}



*クラス構造 [#we2a4a0c]

強化学習の枠組みでは、学習エージェントが環境から状態を観測し、それに基づいて行動を選択し、実行します。
すると、環境の状態が変化し、次の状態と報酬が観測できます。
これを繰り返します。
-環境を表す Environment
--迷路を表す Maze
-強化学習エージェントを表す ReinformentLearningAgent
--Q学習エージェントを表す QLearningAgent

そこで、強化学習エージェントを表す ReinforcementLearningAgent と環境を表す Environment クラスを用意します。

また、Q学習エージェントを表す QLearningAgent を用意し、ReinforcementLearningAgent クラスのの子クラスとします。
*環境 environment.py [#y715bf52]

同様に、迷路を表す Maze クラスを用意し、Environment クラスの子クラスとします。
まず初めに、環境を表すクラスがどのような変数とメソッドを持っているかを決めておきます。

変数としては、次のものを持つことにします。
-状態数 states
-行動数 actions

メソッドしては、次のものを持つことにします。
-状態を初期化して返す initState
-終端状態かどうかを調べる isTerminal
-報酬を返す getReward
-行動を実行して次の状態と報酬を返す takeAction
-状態を文字列で返す strState
-行動を文字列で返す strAction
-ログを出力する printLog

printLog 以外の実際の実装は、子クラスで行います。


#ref(./environment.py)
#geshi(python){{
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# environment.py
#
# Copyright 2017 Tohgoroh Matsui All Rights Reserved.
#
from abc import ABCMeta, abstractmethod


class Environment(metaclass=ABCMeta):
    """強化学習の環境。"""

    states = None   # 状態数
    actions = None  # 行動数

    @classmethod
    @abstractmethod
    def initState(self):
        """初期状態を返す。"""
        pass

    @abstractmethod
    def isTerminal(self):
        """終端状態ならTrue、そうでないならFalseを返す。"""
        pass

    @classmethod
    @abstractmethod
    def getReward(self, s):
        """報酬を返す。"""
        pass

    @classmethod
    @abstractmethod
    def takeAction(self, s, a):
        """行動を実行して状態を更新し、次の状態と報酬を返す。"""
        pass

    @classmethod
    @abstractmethod
    def strState(self, s):
        """状態を表す文字列を返す。"""
        pass

    @classmethod
    @abstractmethod
    def strAction(self, a):
        """行動を表す文字列を返す。"""
        pass

    @classmethod
    def printLog(self, steps, s, a, r, s_):
        """ログを出力する。"""
        print('%d: %s, %s, %f, %s' % (steps, self.strState(s), self.strAction(a), r, self.strState(s_)))
}}




*強化学習エージェント rlagent.py [#qdcfcadd]

強化学習エージェントを表す ReinforcementLearningAgent クラスには、主に行動選択のメソッドを実装します。

学習するためのメソッドは抽象クラスとし、子クラスである QLearningAgent に実装します。

#ref(./rlagent.py)
#geshi(python){{
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# rlagent.py
#
# Copyright 2017 Tohgoroh Matsui All Rights Reserved.
#
from abc import ABCMeta, abstractmethod
import environment as env
import numpy as np


class ReinforcementLearningAgent(metaclass=ABCMeta):
  @abstractmethod
  def learn():
    pass
    """強化学習エージェント。"""

    @abstractmethod
    def learn(self):
        """強化学習を用いて学習する。"""
        pass

  # 一様ランダム選択
  def uniformly(self):
    a = np.random.randint(self.env.actions)     # 0からaction-1までの整数をランダムに生成する
    return a                                    # 生成した行動を返す
    def evaluate(self):
        """学習した行動価値が最も高い行動を選択したときの平均ステップ数を返す。"""
        episodes = 0
        totalSteps = 0
        while episodes < 100:   # 100エピソード繰り返す:
            steps = 0   # このエピソードにおけるステップ数
            s = self.env.initState()    # 状態 s を初期化する
            while not self.env.isTerminal(s) and steps < 100:   # 状態 s が終端状態になるか、または、100ステップになるまで繰り返す:
                a = self.greedy(s)  # グリーディーに行動を選択する
                s_, r = self.env.takeAction(s, a)   # 選択した行動を実行する
                #self.env.printLog(steps, s, a, r, s_)  # ログを出力する
                s = s_
                steps += 1
            episodes += 1
            totalSteps += steps
        return totalSteps / 100

    def uniformly(self):
        """一様ランダムに行動を選択して返す。"""
        a = np.random.randint(self.env.actions) # 0からaction-1までの整数をランダムに生成する
        return a

  # グリーディー選択
  #   s:  状態
  def greedy(self, s):
    maxQ     = float('-inf')                    # 最大のQ値
    maxA     = -1                               # 最大のQ値を持つ行動
    for a in range(self.env.actions):           # すべての行動 a について繰り返し:
      if self.aQ[s, a] > maxQ:                    # Q値がこれまでの最大のQ値よりも大きいなら:
        maxQ     =  self.aQ[s, a]                 # 最大のQ値を更新する
        maxA     =  a                             # 最大のQ値を持つ行動を更新する
    # 最大のQ値を持つ行動が複数ある場合、その中から一様ランダムに選択する
    aMaxId = np.where(self.aQ[s] == maxQ)[0]    # Q値がmaxQである要素のインデックスの配列
    if len(aMaxId) > 1:                         # Q値がmaxQである要素が複数あるなら
      r = np.random.randint(len(aMaxId))          # 0から要素数-1までの整数をランダムに生成する
      maxA = aMaxId[r]                            # Q値がmaxQである要素のインデックスから行動を選択する
    return maxA                                 # 最大のQ値を持つ行動を返す
    def greedy(self, s):
        """グリーディーに行動を選択して返す。"""
        maxQ = self.aQ[s].max() # 最大のQ値
        aIds = np.where(self.aQ[s] == maxQ)[0]  # Q値がmaxQである要素のインデックスの配列
        r = np.random.randint(len(aIds))    # 0から要素数-1までの整数をランダムに生成する
        a = aIds[r] # Q値がmaxQである要素のインデックスから行動を選択する
        return a

    def epsilonGreedy(self, s):
        """ε-グリーディー選択を用いて行動を選択して返す。"""
        r = np.random.rand()    # 乱数を生成する
        a = self.uniformly() if  r < self.epsilon else self.greedy(s)   # 確率εで一様ランダムに、確率1-εでグリーディーに行動を選択する
        return a

  # ε-グリーディー選択
  #   aQ: Q値
  #   s:  状態
  def epsilonGreedy(self, s):
    r = np.random.rand()          # 乱数を生成する
    if r < self.epsilon:          # 確率εで:
      a = self.uniformly()          # 一様ランダムに行動を選択する
    else:                         # 確率1-εで:
      a = self.greedy(s)            # グリーディーに行動を選択する
    return a                      # 選択した行動を返す
    def chooseAction(self, s):
        """行動を選択して返す。"""
        a = self.epsilonGreedy(s)   # ε-グリーディー選択で行動を選択する
        return a

    def setEnvironment(self, env):
        """環境をセットする。"""
        self.env = env

  # 環境をセットする
  #   env: 環境
  def setEnvironment(self, env):
    self.env = env


  # コンストラクター
  def __init__(self, seed=0, discountRate=0.9, stepSize = 0.1, epsilon=0.1, episodes=10000):
    np.random.seed(seed)
    self.discountRate = discountRate    # 割引率 γ
    self.stepSize     = stepSize        # ステップ・サイズ α
    self.epsilon      = epsilon         # ε-グリーディー選択のε
    self.episodes     = episodes        # 最大エピソード数
    self.aQ           = None            # Q値
    def __init__(self, seed=0, discountRate=0.9, stepSize = 0.1, epsilon=0.1, episodes=10000):
        np.random.seed(seed)
        self.discountRate = discountRate    # 割引率 γ
        self.stepSize = stepSize    # ステップ・サイズ α
        self.epsilon = epsilon  # ε-グリーディー選択のε
        self.episodes = episodes    # 最大エピソード数
        self.env = None # 環境
        self.aQ = None  # Q値
}}



*Q学習エージェント qlagent.py [#m72d7cc6]

Q学習エージェントを表す QLearningAgent クラスには、Q学習のアルゴリズムだけを実装します。

Q学習のアルゴリズムは、次のようなものです。
+行動価値 [math]Q(s, a)[/math] を初期化
+エピソードごとに繰り返し:
++状態 [math]s[/math] を初期化
++[math]s[/math] が終端状態になるまで繰り返し:
+++[math]Q[/math]から導かれる行動選択確率に基づいて [math]s[/math] から行動 [math]a[/math] を選択
+++[math]a[/math] を実行し、次の状態 [math]s'[/math] と報酬 [math]r[/math] を観測する
+++[math]Q(s, a) \leftarrow Q(s, a) + \alpha \left[ r + \gamma * \max_{a'}Q(s', a') - Q(s, a)\right][/math]
+++[math]s \leftarrow s'[/math]

これを、Pythonで実装すると、次のようになります。

#ref(./qlagent.py)
#geshi(python){{
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# qlagent.py
#
# Copyright 2017 Tohgoroh Matsui All Rights Reserved.
#
import rlagent
import environment as env
import numpy as np


class QLearningAgent(rlagent.ReinforcementLearningAgent):
  # 学習する
  def learn(self):
    gamma = self.discountRate   # 割引率
    alpha = self.stepSize       # ステップ・サイズ
    """Q学習エージェント。"""

    episodes = 0
    self.aQ = np.zeros([self.env.states, self.env.actions])       # Q値をゼロに初期化する
    while episodes < self.episodes:
      steps = 0
      s = self.env.initState()                                                  # 状態 s を初期化する
      while not self.env.isTerminal(s):                                         # 状態 s が終端状態になるまで繰り返す:
        a = self.epsilonGreedy(s)                                                 # Q値と状態 s から行動 a を選択する
        s_, r = self.env.takeAction(s, a)                                         # 行動 a を実行し、次の状態 s_ と報酬 r を観測する
        # self.env.printLog(steps, s, a, r)                                       # ログを出力する
        a_ = self.greedy(s_)                                                      # 次の状態 s_ でQ値が最大の行動を調べる
        self.aQ[s, a] += alpha * (r + gamma * self.aQ[s_, a_] - self.aQ[s, a])    # Q値を更新する
        s = s_                                                                    # 状態を更新する
        steps += 1
      episodes += 1
    print(self.aQ)
    def learn(self):
        """Q学習を用いて学習する。"""
        gamma = self.discountRate   # 割引率
        alpha = self.stepSize   # ステップ・サイズ
        episodes = 0    # エピソード数
        self.aQ = np.zeros([self.env.states, self.env.actions]) # Q値をゼロに初期化する
        while episodes < self.episodes: # エピソード数が最大エピソード数になるまで繰り返し:
            steps = 0   # このエピソードにおけるステップ数
            s = self.env.initState()    # 状態 s を初期化する
            while not self.env.isTerminal(s):   # 状態 s が終端状態になるまで繰り返す:
                a = self.epsilonGreedy(s)   # Q値と状態 s から行動 a を選択する
                s_, r = self.env.takeAction(s, a)   # 行動 a を実行し、次の状態 s_ と報酬 r を観測する
                #self.env.printLog(steps, s, a, r, s)   # ログを出力する
                a_ = self.greedy(s_)    # 次の状態 s_ でQ値が最大の行動を調べる
                self.aQ[s, a] += alpha * (r + gamma * self.aQ[s_, a_] - self.aQ[s, a])  # Q値を更新する
                s = s_  # 状態を更新する
                steps += 1
            episodes += 1
            if episodes % np.power(10, np.floor(np.log10(episodes))) == 0:
                eval = self.evaluate()
                print('%d, %f' % (episodes, eval))
        print(self.aQ)


  # コンストラクター
  def __init__(self, seed=0, discountRate=0.9, stepSize=0.1, epsilon=0.1, episodes=10000):
    super().__init__(seed, discountRate, stepSize, epsilon, episodes)
    def __init__(self, seed=0, discountRate=0.9, stepSize=0.1, epsilon=0.1, episodes=10000):
        super().__init__(seed, discountRate, stepSize, epsilon, episodes)
}}




*環境 environment.py [#y715bf52]

環境を表す Environment クラスは、ほとんどのメソッドが実際の環境を表す子クラスが実装すべき抽象メソッドとしています。
*迷路 maze.py [#se58b5a5]

#ref(./maze.py)
#geshi(python){{
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# environment.py
#
# Copyright 2017 Tohgoroh Matsui All Rights Reserved.
#
from abc import ABCMeta, abstractmethod

class Environment(metaclass=ABCMeta):
  @abstractmethod
  def initState(self):
    pass

  @abstractmethod
  def isTerminal(self, s):
    pass

  @abstractmethod
  def getState(self):
    pass

  @abstractmethod
  def getReward(self, s):
    pass

  @abstractmethod
  def takeAction(self, s, a):
    pass

  @abstractmethod
  def strState(self, s):
    pass

  @abstractmethod
  def strAction(self, a):
    pass


  # ログを出力する
  def printLog(self, steps, s, a, r):
    print('%d: %s, %s, %f' % (steps, self.strState(s), self.strAction(a), r))
}}


*迷路 maze.py [#se58b5a5]
#geshi(python){{
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# maze.py
#
# Copyright 2017 Tohgoroh Matsui All Rights Reserved.
#
import environment as env
import numpy as np


class Maze(env.Environment):
  # 壁
  walls = np.array([[0, 0, 0, 0],
                    [0, 1, 0, 0],
                    [0, 0, 0, 0]]).T
    """Sutton & Barto 4x3迷路問題。"""

  # 終端状態
  terminals = np.array([[0, 0, 0, 1],
                        [0, 0, 0, 1],
                        [0, 0, 0, 0]]).T
    walls = np.array([[0, 0, 0, 0],
                      [0, 1, 0, 0],
                      [0, 0, 0, 0]]).T  # 壁
    terminals = np.array([[ 0, 0, 0, 1],
                          [ 0, 0, 0, 1],
                          [ 0, 0, 0, 0]]).T # 終端状態
    rewards = np.array([[-0.02, -0.02, -0.02,  1.],
                        [-0.02,  0.,   -0.02, -1.],
                        [-0.02, -0.02, -0.02, -0.02]]).T    # 報酬
    width = len(walls)  # 迷路の幅
    height = len(walls[0])   # 迷路の高さ
    states = width * height   # 状態の数
    actions = 4   # 行動の数

  # 報酬
  rewards = np.array([[-0.02, -0.02, -0.02,  1.  ],
                      [-0.02,     0, -0.02, -1.  ],
                      [-0.02, -0.02, -0.02, -0.02]]).T
    @classmethod
    def initState(cls):
        """初期状態を返す。"""
        state = cls.getState(0, 2)
        return state

    @classmethod
    def isTerminal(cls, state):
        """渡された状態が終端状態ならTrue, そうでないならFalseを返す。"""
        x, y = cls.getCoordinates(state)
        return True if cls.terminals[x, y] == 1 else False

  # 状態を初期化する
  def initState(self):
    s = self.coordinate2state(0, 2)
    self.state = s
    return s
    @classmethod
    def isWall(cls, x, y):
        """渡された座標が壁ならTrue, そうでないならFalseを返す。"""
        return True if x < 0 or x >= cls.width or y < 0 or y >= cls.height or cls.walls[x, y] == 1 else False

    @classmethod
    def north(cls, state):
        """北に壁がない場合は北隣の状態を、壁がある場合は現在の状態を返す。"""
        x, y = cls.getCoordinates(state)
        y_ = y - 1
        return cls.getState(x, y_) if not cls.isWall(x, y_) else state

  # 終端状態ならTrue、そうでないならFalseを返す
  #   s: 状態
  def isTerminal(self, s):
    x, y = self.state2coodinate(s)
    if self.terminals[x, y] == 1:
      return True
    else:
      return False
    @classmethod
    def east(cls, state):
        """東に壁がない場合は東隣の状態を、壁がある場合は現在の状態を返す。"""
        x, y = cls.getCoordinates(state)
        x_ = x + 1
        return cls.getState(x_, y) if not cls.isWall(x_, y) else state

    @classmethod
    def west(cls, state):
        """西に壁がない場合は西隣の状態を、壁がある場合は現在の状態を返す。"""
        x, y = cls.getCoordinates(state)
        x_ = x - 1
        return cls.getState(x_, y) if not cls.isWall(x_, y) else state

  # 北に進む
  #   s: 状態
  def north(self, s):
    x_, y_ = self.state2coodinate(s)
    if y_ > 0 and self.walls[x_, y_ - 1] == 0:
      y_ -= 1
    s_ = self.coordinate2state(x_, y_)
    return s_
    @classmethod
    def south(cls, state):
        """南に壁がない場合は南隣の状態を、壁がある場合は現在の状態を返す。"""
        x, y = cls.getCoordinates(state)
        y_ = y + 1
        return cls.getState(x, y_) if not cls.isWall(x, y_) else state

    @classmethod
    def getState(cls, x, y):
        """座標を状態に変換して返す"""
        state = y * cls.width + x
        return state

  # 東に進む
  #   s: 状態
  def east(self, s):
    x_, y_ = self.state2coodinate(s)
    if x_ < 3 and self.walls[x_ + 1, y_] == 0:
      x_ += 1
    s_ = self.coordinate2state(x_, y_)
    return s_
    @classmethod
    def getCoordinates(cls, state):
        """状態を座標に変換して返す。"""
        x = state % cls.width
        y = state // cls.width
        return x, y

    @classmethod
    def getReward(cls, state):
        """報酬を返す。"""
        x, y = cls.getCoordinates(state)
        reward = cls.rewards[x, y]
        return reward

  # 西に進む
  #   s: 状態
  def west(self, s):
    x_, y_ = self.state2coodinate(s)
    if x_ > 0 and self.walls[x_ - 1, y_] == 0:
      x_ -= 1
    s_ = self.coordinate2state(x_, y_)
    return s_
    @classmethod
    def takeAction(cls, state, action):
        """行動を実行して状態を更新し、次の状態と報酬を返す。"""
        if action == 0:
            forward = cls.north(state)
            left = cls.east(state)
            right = cls.west(state)
        elif action == 1:
            forward = cls.east(state)
            left = cls.south(state)
            right = cls.west(state)
        elif action == 2:
            forward = cls.west(state)
            left = cls.north(state)
            right = cls.south(state)
        else:
            forward = cls.south(state)
            left = cls.west(state)
            right = cls.east(state)
        r = np.random.random()
        if r < 0.8:
            state_ = forward
        elif r < 0.9:
            state_ = left
        else:
            state_ = right
        reward = cls.getReward(state_)  # 報酬を観測する
        return state_, reward # 次の状態と報酬を返す

    @classmethod
    def strState(cls, state):
        """状態を表す文字列を返す。"""
        x, y = cls.getCoordinates(state)
        return '(%d, %d)' % (x, y)

  # 南に進む
  #   s: 状態
  def south(self, s):
    x_, y_ = self.state2coodinate(s)
    if y_ < 2 and self.walls[x_, y_ + 1] == 0:
      y_ += 1
    s_ = self.coordinate2state(x_, y_)
    return s_
    @classmethod
    def strAction(cls, action):
        """行動を表す文字列を返す。"""
        if action == 0:
            str = 'north'
        elif action == 1:
            str = 'east'
        elif action == 2:
            str = 'west'
        else:
            str = 'south'
        return str


  # 座標から状態に変換する
  #   x: x座標
  #   y: y座標
  def coordinate2state(self, x, y):
    s = y * self.height + x
    return s


  # 状態から座標に変換する
  #   s: 状態
  def state2coodinate(self, s):
    x = s % self.height
    y = s // self.height
    return x, y


  # 状態を返す
  def getState():
    return self.state


  # 報酬を返す
  #   s: 状態
  def getReward(self, s):
    x, y = self.state2coodinate(s)
    r = self.rewards[x, y]
    return r


  # 行動する
  #   s: 状態
  #   a: 行動
  def takeAction(self, s, a):
    act = self.lActions[a]        # 行動
    s_ = act(s)                   # 行動を実行する
    r  = self.getReward(s_)       # 報酬を観測する
    return s_, r                  # 次の状態と報酬を返す


  # 状態を文字列で表して返す
  #   s: 状態
  def strState(self, s):
    x, y = self.state2coodinate(s)
    return '(%d, %d)' % (x, y)


  # 行動を文字列で表して返す
  #   a: 行動
  def strAction(self, a):
    return self.lActions[a].__name__


  # コンストラクター
  def __init__(self):
    super().__init__()
    self.width    = len(self.walls[0])                              # 迷路の幅
    self.height   = len(self.walls)                                 # 迷路の高さ
    self.states   = self.width * self.height                        # 状態の数
    self.lActions = [self.north, self.east, self.west, self.south]  # 行動のリスト
    self.actions  = len(self.lActions)                              # 行動の数
    self.initState()                                                # 状態を初期化する
    def __init__(self):
        super().__init__()
}}


*実行用ファイル mazeql.py [#ab065b92]

#ref(./mazeql.py)
#geshi(python){{
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# mazeql.py
#
# Copyright 2017 Tohgoroh Matsui All Rights Reserved.
#
import maze
import qlagent

# mainモジュール
if __name__ == '__main__':
  env   = maze.Maze()                   # 環境
  agent = qlagent.QLearningAgent()      # Q学習エージェント
  agent.setEnvironment(env)             # 環境をエージェントにセットする
  agent.learn()                         # 学習する
    maze  = maze.Maze() # 環境
    agent = qlagent.QLearningAgent()    # Q学習エージェント
    agent.setEnvironment(maze)  # 環境をエージェントにセットする
    agent.learn()   # 学習する
}}

トップ   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS