Policy Reinforcement learning with Pytorch

I’m currently working on a code using pygame and pytorch to implement a policy-based agent.
The goal is very simple for the moment : a car is in a straight road and has to stay in it. It can do nothing (action 0) / go left (action 1) or go right (action 2)

I followed the tutorial https://github.com/pytorch/examples/blob/master/reinforcement_learning/reinforce.py to use it.

But it words on the cart pole game but not on my case and I don’t understand why. It seems that my network is not learning.

Here is my main code :

import numpy as np
from model import Agent
from car_game import CarGame
import pygame
import torch
import torch.optim as optim
from torch.autograd import Variable
from torch.distributions import Categorical
import matplotlib.pyplot as plt

GREEN = (20, 255, 140)
GREY = (210, 210, 210)
WHITE = (255, 255, 255)

SCREENWIDTH = 800
SCREENHEIGHT = 600

size = (SCREENWIDTH, SCREENHEIGHT)
screen = pygame.display.set_mode(size)
pygame.display.set_caption("Car Racing")


def select_action(state):
    state = torch.from_numpy(state).float().unsqueeze(0)
    probs = policy(Variable(state))
    m = Categorical(probs)
    action = m.sample()
    policy.saved_log_probs.append(m.log_prob(action))
    return action.data[0]


def finish_episode(show=False):
    R = 0
    policy_loss = []
    rewards = []
    for r in policy.rewards[::-1]:
        R = r + gamma * R
        rewards.insert(0, R)

    rewards = torch.Tensor(rewards)
    rewards = (rewards - rewards.mean()) / (rewards.std() + np.finfo(np.float32).eps)
    for log_prob, reward in zip(policy.saved_log_probs, rewards):
        policy_loss.append(-log_prob * reward)
    optimizer.zero_grad()
    policy_loss = torch.cat(policy_loss).sum()
    policy_loss.backward()
    optimizer.step()
    if show:
        print("Reward : ", R, ' Policy Loss', policy_loss.data[0])
    del policy.rewards[:]
    del policy.saved_log_probs[:]


def main():
        global nb_episodes_before_dying
        nb_episodes_before_dying = []
        for i_episode in range(0, 100):
            car_game = CarGame(speed=1, min_speed=0.5, screenheight=SCREENHEIGHT)
            state = [car_game.playerCar.rect.x]
            pygame.init()
            # print(i_episode)
            carryOn = True
            nb_episodes = 0
            while carryOn:
                nb_episodes += 1
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        carryOn = False
                action = select_action(np.array(state))
                # print(action)

                state, reward, done = car_game.play_one_step(action)

                policy.rewards.append(reward)
                if done or nb_episodes > 10000:
                    nb_episodes_before_dying.append(nb_episodes)
                    carryOn = False
                car_game.all_sprites_list.update()

                # Drawing on Screen
                screen.fill(GREEN)
                # Draw The Road
                pygame.draw.rect(screen, GREY, [300, 0, 200, SCREENHEIGHT])
                # Draw Line painting on the road
                pygame.draw.line(screen, WHITE, [400, 0], [400, SCREENHEIGHT], 5)
                # Draw Line painting on the road
                """pygame.draw.line(screen, WHITE, [240, 0], [240, SCREENHEIGHT], 5)
                # Draw Line painting on the road
                pygame.draw.line(screen, WHITE, [340, 0], [340, SCREENHEIGHT], 5)"""

                # Now let's draw all the sprites in one go. (For now we only have 1 sprite!)
                car_game.all_sprites_list.draw(screen)

                # Refresh Screen
                pygame.display.flip()

                # Number of frames per secong e.g. 60
                car_game.clock.tick(100)

            finish_episode(True)

if __name__ == "__main__":
    policy = Agent()
    optimizer = optim.Adam(policy.parameters(), lr=1e-2)
    gamma = 0.99
    main()
    plt.plot(nb_episodes_before_dying)
    plt.savefig('myfig.png')

Here is my model code (very basic) :

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


class Agent(nn.Module):
def __init__(self):
    super(Agent, self).__init__()
    self.layer1 = nn.Linear(1, 32)
    self.output = nn.Linear(32, 3)

    self.rewards = []
    self.saved_log_probs = []

def forward(self, x):
    x = F.relu(self.layer1(x))
    action_scores = self.output(x)
    # print(action_scores)
    return F.softmax(action_scores, dim=-1)

And this is my last code file which implement my car game and process the game steps :

class CarGame():

def __init__(self, speed, min_speed, screenheight):
    self.all_sprites_list = pygame.sprite.Group()
    self.SPEED = speed
    self.MIN_SPEED = min_speed
    self.screenheight = screenheight

    self.playerCar = Car(RED, 60, 80, 70)
    self.playerCar.rect.x = 430
    self.playerCar.rect.y = screenheight - 100

    self.car1 = Car(PURPLE, 60, 80, random.randint(50, 100))
    self.car1.rect.x = 310
    self.car1.rect.y = -100

    self.car2 = Car(YELLOW, 60, 80, random.randint(50, 100))
    self.car2.rect.x = 430
    self.car2.rect.y = -600

    # Add the car to the list of objects
    self.all_sprites_list.add(self.playerCar)
    self.all_sprites_list.add(self.car1)
    self.all_sprites_list.add(self.car2)

    self.all_coming_cars = pygame.sprite.Group()
    self.all_coming_cars.add(self.car1)
    self.all_coming_cars.add(self.car2)

    self.clock = pygame.time.Clock()

def play_one_step(self, action):
    """ Action is 0 (nothing), 1 (left) or 2 (right)
    Update the game and return :
        state, reward, done"""
    done = False
    if action == 0:
        pass
    if action == 1:
        self.playerCar.moveLeft(5)
    if action == 2:
        self.playerCar.moveRight(5)

    # Game Logic
    for car in self.all_coming_cars:
        car.moveForward(self.SPEED)
        if car.rect.y > self.screenheight:
            car.changeSpeed(random.randint(50, 100))
            car.repaint(random.choice(colorList))
            car.rect.y = -200

    # Detect if out of pistes
    if self.playerCar.rect.x < 310:
        print("Car out")
        done = True
    if self.playerCar.rect.x > 440:
        print("Car out")
        done = True
    return ([self.playerCar.rect.x], 1, done)

I don’t have any problems in my logs, but when I print the output of my network I have very quickly : 0, 1, 0 --> So my car does every time the same action and doesn’t train.
It’s done randomly, sometimes my car find directly the good action (do nothing because the road is straight …) but when left is at 1, it’s blocked and this forever.
Here is my log when my car go right since the begginning :

Car out
Reward :  2.9701  Policy Loss 0.0
Car out
Reward :  2.9701  Policy Loss 0.0
Car out
Reward :  2.9701  Policy Loss 0.0
Car out
Reward :  2.9701  Policy Loss 0.0
Car out
Reward :  2.9701  Policy Loss 0.0
Car out
Reward :  2.9701  Policy Loss 0.0
Car out
Reward :  2.9701  Policy Loss 0.0
Car out
Reward :  2.9701  Policy Loss 0.0

My reward and my policy loss don’t change …

Do you have any idea please ?

Thank you very much !