I have a custom gymnasium environment that works as expected, and I am trying to convert it into a TorchRL (PyTorch RL) environment. The idea is to wrap the existing gymnasium env, because it depends on other libraries and has a complicated file structure, so writing the TorchRL custom env from scratch is not desirable. For some reason I keep getting an error and cannot figure it out.
To keep things simple, I have swapped the custom gymnasium env for a minimal env below; the workflow is the same and the error still occurs. Can anyone guide me on how to fix the issue?
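For reference, torchrl also ships a GymWrapper that can build the TensorDict specs from a gym/gymnasium env automatically. The sketch below (with a built-in env, not my custom one) shows roughly how it is meant to be used, assuming it also accepts gymnasium environments; I would still like to get the manual EnvBase wrapper further down to work:

import gymnasium as gym
from torchrl.envs import GymWrapper

# Rough sketch: let torchrl derive the action/observation specs from an existing env.
# Shown with a built-in env; whether GymWrapper handles my Tuple observation space is unclear to me.
wrapped = GymWrapper(gym.make("CartPole-v1"))
print(wrapped.action_spec)
print(wrapped.observation_spec)
print(wrapped.reset())

Here is the code I am actually working with: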
import gymnasium as gym
from gymnasium import spaces
from tensordict import TensorDict, TensorDictBase
import torch
import torchrl
import numpy as np
from torchrl.data import BoundedTensorSpec, CompositeSpec, UnboundedContinuousTensorSpec
class SimpleEnv(gym.Env):
    def __init__(self, grid_size=(5, 5), goal_pos=(4, 4)):
        super(SimpleEnv, self).__init__()
        self.grid_size = grid_size
        self.goal_pos = goal_pos
        self.agent_pos = (0, 0)
        self.action_space = spaces.Discrete(4)  # 4 possible actions: up, down, left, right
        self.observation_space = spaces.Tuple((
            spaces.Discrete(grid_size[0]),  # Agent's x-position
            spaces.Discrete(grid_size[1])   # Agent's y-position
        ))

    def reset(self):
        self.agent_pos = (0, 0)  # Reset agent position to the starting point
        return self.agent_pos

    def step(self, action):
        x, y = self.agent_pos
        # Move the agent based on the action
        if action == 0:    # Up
            y = max(0, y - 1)
        elif action == 1:  # Down
            y = min(self.grid_size[1] - 1, y + 1)
        elif action == 2:  # Left
            x = max(0, x - 1)
        elif action == 3:  # Right
            x = min(self.grid_size[0] - 1, x + 1)
        # Update agent's position
        self.agent_pos = (x, y)
        # Calculate reward and done
        reward = 0
        done = False
        if self.agent_pos == self.goal_pos:
            reward = 1   # Give reward if agent reaches the goal
            done = True  # End the episode
        info = -55
        return self.agent_pos, reward, done, info

    def render(self):
        grid = np.zeros(self.grid_size)
        x, y = self.agent_pos
        grid[y][x] = 1  # Mark agent's position
        goal_x, goal_y = self.goal_pos
        grid[goal_y][goal_x] = 0.5  # Mark goal position
        # Print the grid
        for row in grid:
            print(' '.join(['A' if cell == 1 else 'G' if cell == 0.5 else '-' for cell in row]))
env = SimpleEnv()
obs = env.reset()
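# (Illustrative sanity check added here, not part of my original code: step the plain
# gymnasium env once with a random action to confirm the custom env itself behaves as expected.)
obs, reward, done, info = env.step(env.action_space.sample())
print(obs, reward, done, info)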
class envTorch(torchrl.envs.EnvBase):
    def __init__(self, gym_env, device="cpu", batch_size=12):
        self.env = gym_env
        self.observation_space = gym_env.observation_space
        self.action_space = gym_env.action_space
        self.device = device
        self.batch_size = batch_size
        self.action_spec = torchrl.data.DiscreteTensorSpec(1, shape=torch.Size([self.action_space.shape[0], self.action_space.shape[1]]))
        observation_spec = torchrl.data.UnboundedDiscreteTensorSpec(shape=torch.Size([self.observation_space.shape[0], self.observation_space.shape[1]]))
        self.observation_spec = CompositeSpec(observation=observation_spec)  # Has to be CompositeSpec (not sure why)
        self.reward_spec = UnboundedContinuousTensorSpec(shape=torch.Size([1]))  # Unlimited reward space (even though we could limit it to (-inf, 0] in this particular example)

    def _reset(self, tensordict):
        reset_result = self.env.reset()
        position = torch.tensor(reset_result[0], device=self.device)
        reward = torch.tensor(reset_result[1], device=self.device)
        out = TensorDict({
            "position": position,
            "reward": reward
        }, batch_size=self.batch_size)  # Set the batch size appropriately
        return out

    def _step(self, tensordict):
        u = tensordict["action"]
        obs, reward, done, info = self.env.step(u)  # Use self.env, and numpy for zeros
        obsT = torch.tensor(obs, device=self.device)
        rewardT = torch.tensor(reward, device=self.device)
        doneT = torch.zeros_like(torch.tensor(done), dtype=torch.bool, device=self.device)
        infoT = torch.tensor(info, device=self.device)
        out = TensorDict({
            "position": obsT,
            "reward": rewardT,
            "done": doneT,
        }, batch_size=self.batch_size)  # Set the batch size appropriately
        return out

    def _set_seed(self, seed):
        pass
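For reference, here is a minimal sketch of how I think the specs and the _reset/_step outputs are supposed to line up for this particular env, pieced together from the torchrl docs. The class name SimpleEnvTorchRL is made up, and it assumes a single unbatched env (empty batch_size), a torchrl version that still exposes the spec class names used above, and that reset() fills in the missing done keys; I am not sure it is correct:

import torch
from tensordict import TensorDict
from torchrl.data import CompositeSpec, DiscreteTensorSpec, UnboundedContinuousTensorSpec, UnboundedDiscreteTensorSpec
from torchrl.envs import EnvBase
from torchrl.envs.utils import check_env_specs

class SimpleEnvTorchRL(EnvBase):  # hypothetical name, not from my original code
    def __init__(self, gym_env, device="cpu"):
        # A single, unbatched env: batch_size is an empty torch.Size.
        super().__init__(device=device, batch_size=torch.Size([]))
        self.env = gym_env
        # Discrete(4) action space -> categorical spec with 4 possible values.
        self.action_spec = DiscreteTensorSpec(4)
        # Observation is the (x, y) grid position -> a 2-element integer tensor.
        self.observation_spec = CompositeSpec(
            observation=UnboundedDiscreteTensorSpec(shape=torch.Size([2]), dtype=torch.int64)
        )
        self.reward_spec = UnboundedContinuousTensorSpec(shape=torch.Size([1]))

    def _reset(self, tensordict=None):
        obs = self.env.reset()  # SimpleEnv.reset returns the (x, y) tuple directly
        return TensorDict(
            {"observation": torch.tensor(obs, dtype=torch.int64, device=self.device)},
            batch_size=torch.Size([]),
        )

    def _step(self, tensordict):
        action = int(tensordict["action"].item())
        obs, reward, done, info = self.env.step(action)  # info is dropped here
        return TensorDict(
            {
                "observation": torch.tensor(obs, dtype=torch.int64, device=self.device),
                "reward": torch.tensor([reward], dtype=torch.float32, device=self.device),
                "done": torch.tensor([done], dtype=torch.bool, device=self.device),
            },
            batch_size=torch.Size([]),
        )

    def _set_seed(self, seed):
        pass

# check_env_specs should confirm whether the reset/step outputs match the declared specs:
# check_env_specs(SimpleEnvTorchRL(SimpleEnv()))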