Custom env from gymnasium

I have a custom gymnasium environment that works as expected. I am trying to convert it into a TorchRL environment. The idea is to wrap the custom gymnasium environment rather than rewrite it: the gymnasium env depends on other libraries and has a complicated file structure, so writing the TorchRL env from scratch is not desirable. For some reason I keep getting an error and cannot figure it out.

To keep things simple, I have swapped the custom gymnasium env for a minimal one below. The workflow is the same and so is the error. Can anyone guide me on how to fix the issue?

import gymnasium as gym
from gymnasium import spaces

from tensordict import TensorDict, TensorDictBase
import torch
import torchrl
import numpy as np
from torchrl.data import BoundedTensorSpec, CompositeSpec, UnboundedContinuousTensorSpec

class SimpleEnv(gym.Env):
    def __init__(self, grid_size=(5, 5), goal_pos=(4, 4)):
        super().__init__()
        self.grid_size = grid_size
        self.goal_pos = goal_pos
        self.agent_pos = (0, 0)

        self.action_space = spaces.Discrete(4)  # 4 possible actions: up, down, left, right
        self.observation_space = spaces.Tuple((
            spaces.Discrete(grid_size[0]),  # Agent's x-position
            spaces.Discrete(grid_size[1])   # Agent's y-position
        ))

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.agent_pos = (0, 0)  # Reset agent position to the starting point
        return self.agent_pos, {}  # gymnasium reset returns (observation, info)

    def step(self, action):
        x, y = self.agent_pos

        # Move the agent based on the action
        if action == 0:  # Up
            y = max(0, y - 1)
        elif action == 1:  # Down
            y = min(self.grid_size[1] - 1, y + 1)
        elif action == 2:  # Left
            x = max(0, x - 1)
        elif action == 3:  # Right
            x = min(self.grid_size[0] - 1, x + 1)

        # Update agent's position
        self.agent_pos = (x, y)

        # Calculate reward and termination
        reward = 0
        terminated = False
        if self.agent_pos == self.goal_pos:
            reward = 1         # Give reward if the agent reaches the goal
            terminated = True  # End the episode
        truncated = False  # No time limit in this simple env

        info = {}  # gymnasium expects info to be a dict

        return self.agent_pos, reward, terminated, truncated, info

    def render(self):
        grid = np.zeros(self.grid_size)
        x, y = self.agent_pos
        grid[y][x] = 1  # Mark agent's position

        goal_x, goal_y = self.goal_pos
        grid[goal_y][goal_x] = 0.5  # Mark goal position

        # Print the grid
        for row in grid:
            print(' '.join(['A' if cell == 1 else 'G' if cell == 0.5 else '-' for cell in row]))


env = SimpleEnv()
obs, info = env.reset()
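
# For reference, the plain env can be exercised with a standard random-action
# loop (nothing here beyond the usual gymnasium API) and behaves as expected:
obs, info = env.reset()
for _ in range(10):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        obs, info = env.reset()
env.render()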



class envTorch(torchrl.envs.EnvBase):
    def __init__(self, gym_env, device="cpu", batch_size=12):
        self.env = gym_env
        self.observation_space = gym_env.observation_space
        self.action_space = gym_env.action_space
        self.device = device
        self.batch_size = batch_size

        self.action_spec = torchrl.data.DiscreteTensorSpec(1, shape=torch.Size([self.action_space.shape[0], self.action_space.shape[1]]))
        observation_spec = torchrl.data.UnboundedDiscreteTensorSpec(shape=torch.Size([self.observation_space.shape[0], self.observation_space.shape[1]])) 
        self.observation_spec = CompositeSpec(observation=observation_spec) # Has to be CompositeSpec(not sure why)
        self.reward_spec = UnboundedContinuousTensorSpec(shape=torch.Size([1])) # Unlimited reward space (even though we could limit it to (-inf, 0] in this particular example)


    def _reset(self, tensordict):
        obs, info = self.env.reset()
        position = torch.tensor(obs, device=self.device)

        out = TensorDict({
            "position": position,
        }, batch_size=self.batch_size)  # Set the batch size appropriately

        return out

    def _step(self, tensordict):
        u = tensordict["action"]
        obs, reward, done, info = self.env.step( u ) # Use self.env, and numpy for zeros

        obsT = torch.tensor(obs, device=self.device) #
        rewardT = torch.tensor(reward, device=self.device) #
        doneT = torch.zeros_like(torch.tensor(done), dtype=torch.bool, device=self.device) #
        infoT = torch.tensor(info, device=self.device) #

        out = TensorDict({
            "position": obsT,
            "reward": rewardT,
            "done": doneT,
        }, batch_size=self.batch_size ) # Set the batch size appropriately

        return out
    
    def _set_seed(self, seed):
        pass
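
# Constructing the wrapper is already enough to trigger an error for me,
# before _reset or _step are ever called:
env_torch = envTorch(SimpleEnv())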

If your env is already a gym/gymnasium env, you can just do torchrl.envs.GymWrapper(SimpleEnv()); it should do what you want.

https://pytorch.org/rl/stable/reference/generated/torchrl.envs.GymWrapper.html?highlight=gymwrapper#torchrl.envs.GymWrapper
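
A minimal sketch of that, assuming SimpleEnv follows the standard gymnasium reset/step API as above (check_env_specs is just a convenience check from torchrl.envs.utils):

from torchrl.envs import GymWrapper
from torchrl.envs.utils import check_env_specs

wrapped = GymWrapper(SimpleEnv())  # builds action/observation/reward specs from the gym spaces
check_env_specs(wrapped)           # verifies that reset/step outputs match the inferred specs

td = wrapped.reset()               # TensorDict holding the observation entries
td = wrapped.rand_step(td)         # random action; step results land under td["next"]
rollout = wrapped.rollout(max_steps=10)
print(rollout)

If the Tuple observation space gives the wrapper trouble, switching SimpleEnv to a single MultiDiscrete (or Box) observation of shape (2,) is a small change that sidesteps it. And if you do want to subclass EnvBase by hand: the posted class never calls super().__init__(), passes an int where batch_size should be a torch.Size (torch.Size([]) for a single env), indexes .shape on Discrete/Tuple gym spaces (whose shapes are () and None, so those lines raise immediately), and writes "position" where the observation_spec expects "observation".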