Hi everybody, I am trying to implement my own DDPG agent to control an unstable system taken from MATLAB. In particular, my goal is to stabilize the plasma velocity while keeping the current in the control circuit low and using a limited control action (the voltage applied to that circuit).
As references, I used this tutorial for the agent: TorchRL objectives: Coding a DDPG loss — PyTorch Tutorials 2.0.1+cu117 documentation
And this one for the environment: Pendulum: Writing your environment and transforms with TorchRL — PyTorch Tutorials 2.4.0+cu121 documentation
The program runs correctly, but the agent doesn’t learn to solve the task. I have spent a few weeks debugging, but I still have no clue where the problem is. I’ll share the environment with you, since the DDPG part of my code (including training) is identical to the tutorial.
A few differences from the tutorials are that I don’t use the “get_env_stats()” method, since I already know the mean and variance of my observations, and that I’m not using multiple environments in parallel (env_per_collector = 1), because I get a batch-size error that I wasn’t able to figure out. Thanks in advance for any reply!
# Discrete-time state-space model exported from MATLAB
# (c2d with zero-order hold, sample time Ts = 0.0025).
# State ordering used by the environment step is (ivs3, zc_dot).
A = torch.tensor(
    [[0.9755, -168.6], [-6.01e-07, 1.01]],
    dtype=torch.float32,
)
B = torch.tensor([[2.037], [5.004e-05]], dtype=torch.float32)
# Identity output map and zero feed-through.
C = torch.eye(2, dtype=torch.float32)
D = torch.zeros(2, 1, dtype=torch.float32)
# Constants
# Use the first CUDA device only when one is available and the process was not
# started with the "fork" multiprocessing start method.
is_fork = torch.multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)
collector_device = "cpu"  # or "CUDA"

# Weights of the quadratic cost terms evaluated in the environment step:
# k1 -> normalised speed, k2 -> normalised current, k3 -> normalised voltage.
k1 = 1
k2 = 2
k3 = 1

MAX_SPEED = 10.0  # [m/s]
MAX_VOLTAGE = 2300.0  # [V]
MAX_CURRENT = 10_000.0  # [A] (max current for the vs circuit)

CURRENT_THRESHOLD = 500
SPEED_THRESHOLD = 0.5
def generate_params(batch_size=None) -> TensorDictBase:
    """Build the tensordict holding the physical limits of the system.

    Args:
        batch_size: Optional batch shape. When ``None`` or empty, the
            returned tensordict is unbatched; otherwise the parameters are
            expanded to this shape.

    Returns:
        A tensordict with a nested ``"params"`` entry containing
        ``max_speed``, ``max_current`` and ``max_voltage``.
    """
    if batch_size is None:
        batch_size = []
    # Both levels are created unbatched; batching happens via ``expand`` below.
    tens_dict = TensorDict(
        {
            "params": TensorDict(
                {
                    "max_speed": MAX_SPEED,
                    "max_current": MAX_CURRENT,
                    "max_voltage": MAX_VOLTAGE,
                },
                batch_size=[],
            )
        },
        batch_size=[],
    )
    if batch_size:
        tens_dict = tens_dict.expand(batch_size).contiguous()
    return tens_dict
def _reset(self, tensordict):
    """Start a new episode from a random state close to the origin.

    Args:
        tensordict: Tensordict carrying a ``"params"`` entry, or ``None`` /
            an empty tensordict to use the default parameters.

    Returns:
        A tensordict with ``ivs3`` drawn uniformly from [-100, 100),
        ``zc_dot`` drawn uniformly from [-0.1, 0.1), the ``params`` entry
        and a ``done`` flag set to ``False``.
    """
    if tensordict is None or tensordict.is_empty():
        # Fall back to the default parameters, batched like the environment.
        tensordict = self.generate_params(batch_size=self.batch_size)

    max_init_current = torch.tensor(100, device=self.device)
    max_init_speed = torch.tensor(0.1, device=self.device)
    min_init_current = -max_init_current
    min_init_speed = -max_init_speed

    # NOTE(review): ``self.rng`` is whatever ``_set_seed`` stored; confirm
    # that generator lives on ``self.device`` before running on CUDA.
    ivs3 = (
        torch.rand(tensordict.shape, generator=self.rng, device=self.device)
        * (max_init_current - min_init_current)
        + min_init_current
    )
    zc_dot = (
        torch.rand(tensordict.shape, generator=self.rng, device=self.device)
        * (max_init_speed - min_init_speed)
        + min_init_speed
    )

    # Restart the episode clock that ``VS_Env._step`` advances.
    VS_Env.current_time = 0

    # ``done`` carries a trailing singleton dimension and is created directly
    # on the environment device so it matches the sampled state.
    done = torch.zeros(
        (*tensordict.shape, 1), dtype=torch.bool, device=self.device
    )

    out = TensorDict(
        {
            "ivs3": ivs3,
            "zc_dot": zc_dot,
            "params": tensordict["params"],
            "done": done,
        },
        batch_size=tensordict.shape,
    )
    return out
#ENV SPEC:
def make_composite_from_td(td):
    """Convert a tensordict into a spec of the same structure.

    Nested tensordicts are converted recursively; every tensor leaf becomes
    an unbounded continuous spec with the leaf's dtype, device and shape.

    Args:
        td: The tensordict to mirror.

    Returns:
        A ``CompositeSpec`` with the same keys and shape as ``td``.
    """
    composite = CompositeSpec(
        {
            key: make_composite_from_td(tensor)
            if isinstance(tensor, TensorDictBase)
            else UnboundedContinuousTensorSpec(
                dtype=tensor.dtype, device=tensor.device, shape=tensor.shape
            )
            for key, tensor in td.items()
        },
        shape=td.shape,
    )
    return composite
def _make_spec(self, td_params):
    """Populate the observation, state, action and reward specs.

    Args:
        td_params: Tensordict with a ``"params"`` entry providing
            ``max_speed``, ``max_current`` and ``max_voltage``; these values
            are used as the symmetric bounds of the specs.
    """
    # STATE:
    self.observation_spec = CompositeSpec(
        zc_dot=BoundedTensorSpec(
            low=-td_params["params", "max_speed"],
            high=td_params["params", "max_speed"],
            shape=(),
            dtype=torch.float32,
        ),
        ivs3=BoundedTensorSpec(
            low=-td_params["params", "max_current"],
            high=td_params["params", "max_current"],
            shape=(),
            dtype=torch.float32,
        ),
        # We need to add the params to the observation specs, as we want
        # to pass it at each step during a rollout
        params=make_composite_from_td(td_params["params"]),
        shape=(),
    )
    # Since the environment is stateless, we expect the previous output as
    # input. For this, EnvBase expects some state_spec to be available
    self.state_spec = self.observation_spec.clone()
    # ACTION:
    self.action_spec = BoundedTensorSpec(
        low=-td_params["params", "max_voltage"],
        high=td_params["params", "max_voltage"],
        shape=(1,),
        dtype=torch.float32,
    )
    # REWARD:
    self.reward_spec = UnboundedContinuousTensorSpec(shape=(*td_params.shape, 1))
#SEEDING:
def _set_seed(self, seed: Optional[int]):
    """Seed PyTorch and remember the resulting generator on the instance.

    ``torch.manual_seed`` returns a ``torch.Generator``; it is stored as
    ``self.rng``.
    """
    self.rng = torch.manual_seed(seed)
class VS_Env(EnvBase):
    """Stateless TorchRL environment for the discretised two-state system.

    The state is ``(ivs3, zc_dot)`` and the action is the applied voltage.
    Each step advances the linear model given by the module-level ``A`` and
    ``B`` matrices and returns the negated quadratic cost as the reward.
    """

    metadata = {
        "render_modes": ["human", "rgb_array"],
        "render_fps": 30,
    }
    batch_locked = False

    # Episode clock, advanced by ``time_step`` on every call to ``_step`` and
    # zeroed by ``_reset``.
    # NOTE(review): these are class attributes, so the clock is shared by all
    # instances; a per-env step counter (e.g. a ``StepCounter`` transform)
    # would be needed before running several environments side by side.
    current_time = 0
    total_time = 2.5
    time_step = 0.0025

    def __init__(self, td_params=None, seed=None, device="cpu"):
        """Create the environment.

        Args:
            td_params: Parameter tensordict; defaults to ``generate_params()``.
            seed: Seed for the environment RNG; a random one is drawn when
                ``None``.
            device: Device passed to ``EnvBase``.
        """
        if td_params is None:
            td_params = generate_params()

        super().__init__(device=device, batch_size=[])
        self._make_spec(td_params)
        if seed is None:
            seed = torch.empty((), dtype=torch.int64).random_().item()
        self.set_seed(seed)

    # Helpers: _make_spec and generate_params
    generate_params = staticmethod(generate_params)
    _make_spec = _make_spec

    # Mandatory methods: _step (defined below), _reset and _set_seed
    _reset = _reset
    _set_seed = _set_seed

    @staticmethod
    def _step(tensordict):
        """Advance the system by one sample and compute the reward.

        Args:
            tensordict: Input with ``ivs3``, ``zc_dot``, ``action`` and the
                nested ``params`` (``max_speed``, ``max_current``,
                ``max_voltage``).

        Returns:
            A tensordict with the next ``ivs3`` and ``zc_dot``, the
            unchanged ``params``, the ``reward`` (negative cost) and the
            ``done`` flag; ``reward`` and ``done`` have a trailing
            singleton dimension.
        """
        batch_shape = tensordict.shape

        # State variables, brought to the batch shape so the arithmetic below
        # is purely element-wise.
        ivs3 = tensordict["ivs3"].reshape(batch_shape)
        zc_dot = tensordict["zc_dot"].reshape(batch_shape)
        max_speed = tensordict["params", "max_speed"]
        max_current = tensordict["params", "max_current"]
        max_voltage = tensordict["params", "max_voltage"]

        # Action: drop the trailing singleton dim and saturate at the limit.
        u = tensordict["action"].reshape(batch_shape)
        u = u.clamp(-max_voltage, max_voltage)

        # Quadratic cost on the normalised speed, current and control effort,
        # evaluated on the current state and the applied action.
        cost_function = (
            k1 * (zc_dot / max_speed).pow(2)
            + k2 * (ivs3 / max_current).pow(2)
            + k3 * (u / max_voltage).pow(2)
        )

        # x[k+1] = A x[k] + B u[k], written with scalar coefficients so it
        # broadcasts over any batch shape.
        new_ivs3 = A[0, 0] * ivs3 + A[0, 1] * zc_dot + B[0, 0] * u
        new_zc_dot = A[1, 0] * ivs3 + A[1, 1] * zc_dot + B[1, 0] * u

        # Disruption: either state left its admissible range. Computed
        # element-wise so batched inputs work as well as single samples.
        speed_exceeded = new_zc_dot.abs() > max_speed
        disrupted = (new_ivs3.abs() > max_current) | speed_exceeded
        cost_function = torch.where(
            disrupted, cost_function + 10, cost_function
        )
        if disrupted.any():
            print('disruption ->', new_ivs3)
            if speed_exceeded.any():
                print('disruption vel ->', new_zc_dot)

        # Earlier shaping experiments, currently disabled: a +5 penalty when
        # |ivs3| > 5000 or |zc_dot| > 5, and a -2 bonus when both states are
        # inside CURRENT_THRESHOLD / SPEED_THRESHOLD.

        done = disrupted.reshape(*batch_shape, 1)

        # Time limit: the episode also ends once the clock reaches total_time.
        # NOTE(review): a time-limit end is reported through the same ``done``
        # flag as a disruption; confirm the loss treats the two as intended.
        if VS_Env.current_time >= VS_Env.total_time:
            done = torch.ones_like(done)
            print('done')
        VS_Env.current_time = VS_Env.current_time + VS_Env.time_step

        reward = -cost_function.reshape(*batch_shape, 1).to(dtype=torch.float32)

        # Only reward and done/terminated must be torch.Size([1]); everything
        # else must be torch.Size([]).
        # Step output
        out = TensorDict(
            {
                "ivs3": new_ivs3,
                "zc_dot": new_zc_dot,
                "params": tensordict["params"],
                "reward": reward,
                "done": done,
            },
            tensordict.shape,
        )
        return out
def make_env():
    """Return a new ``VS_Env`` built with its default arguments."""
    return VS_Env()
#Transforming the env:
def env_transform(env):
    """Wrap ``env`` with the transforms used for training.

    The transforms are applied in this order: unsqueeze the two state
    entries, scale the reward, concatenate the state entries into
    ``"observation_vector"``, normalise that vector, cast doubles to floats
    and track trajectory starts.

    Args:
        env: The base environment to wrap.

    Returns:
        The ``TransformedEnv`` with all transforms appended.
    """
    reward_scale = 5
    state_keys = ["ivs3", "zc_dot"]

    # ``Unsqueeze`` the observations that will be concatenated.
    wrapped = TransformedEnv(
        env,
        UnsqueezeTransform(
            dim=-1,
            in_keys=list(state_keys),
            in_keys_inv=list(state_keys),
        ),
    )

    # No step-count limit is installed here.
    pipeline = (
        RewardScaling(loc=0.0, scale=reward_scale),
        CatTensors(
            in_keys=list(state_keys),
            dim=-1,
            out_key="observation_vector",
            del_keys=False,
        ),
        ObservationNorm(
            in_keys=["observation_vector"],
            loc=[-175, 0],
            scale=[1.11e+04, 2.8305],
            standard_normal=True,
        ),
        DoubleToFloat(),
        # Marks the start of trajectories, needed by the Ornstein-Uhlenbeck
        # (OU) exploration.
        InitTracker(),
    )
    for transform in pipeline:
        wrapped.append_transform(transform)
    return wrapped