import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.autograd as autograd
import matplotlib.pyplot as plt
Enable anomaly detection for debugging
Parameters and Initialization
Set random seeds for reproducibility
Number of buses (reduced for testing)
N_BUSES = 100 # Reduced from 1000 to 100 for quicker testing
Time parameters
T = 10 # Total time in hours (reduced from 80)
dt = 1.0 # Time step in hours
TIME_STEPS = int(T / dt) # Number of time steps
eta = 0.8
sigma = 0.2
R = 1.0
Q = 1.0
Terminal cost function
gamma = lambda x: 2.0 / (1 - x)**2 # Encourage maximizing battery level
Initialize SOC for each bus: N(0.2, 0.2), replace negatives with absolute
initial_soc = np.random.normal(0.2, 0.2, N_BUSES)
initial_soc = np.abs(initial_soc)
initial_soc = np.clip(initial_soc, 0.0, 1.0) # Ensure SOC is within [0,1]
Convert initial SOC to torch tensor
initial_soc_tensor = torch.tensor(initial_soc, dtype=torch.float32).unsqueeze(1) # Shape: [N_BUSES, 1]
Define c(t) and z(t)
def generate_consumption_data():
# Set random seed for reproducibility
# Define time for 4 days, hourly intervals
hours_4days = np.linspace(0, 96, num=97)
# Create base sinusoidal consumption pattern
variable_consumption = 375 * np.sin(2 * np.pi * hours_4days / 24 + np.random.normal(0, 0.1, size=hours_4days.size)) + 375
# Apply peak usage adjustments
for i, hour in enumerate(hours_4days):
if 7 <= hour % 24 < 10: # Morning peak (weaker)
variable_consumption[i] *= (0.8 + 0.05 * np.random.randn())
elif 16 <= hour % 24 < 20: # Evening peak (stronger)
variable_consumption[i] *= (1.2 + 0.05 * np.random.randn())
elif 20 <= hour % 24 < 23: # Late evening (reduce slightly)
variable_consumption[i] *= (0.9 + 0.05 * np.random.randn())
else: # Overnight and midday standard with random fluctuations
variable_consumption[i] *= (1.0 + 0.02 * np.random.randn())
# Convert to MW
variable_consumption_mw = variable_consumption / 1000
# Adjust other appliance consumption to be slightly higher
increase_factor = 1.1
baseline_increase = 50
z_t = variable_consumption * increase_factor + baseline_increase
z_t_mw = z_t / 1000
return hours_4days, variable_consumption_mw, z_t_mw
Generate consumption data
hours_4days, c_t_mw_array, z_t_mw_array = generate_consumption_data()
Ensure that TIME_STEPS does not exceed the available data
assert TIME_STEPS + 1 <= len(hours_4days), “Not enough consumption data for the number of time steps.”
Convert c(t) and z(t) to torch tensors
Shape: [TIME_STEPS +1]
c_t_tensor = torch.tensor(c_t_mw_array[:TIME_STEPS +1], dtype=torch.float32)
z_t_tensor = torch.tensor(z_t_mw_array[:TIME_STEPS +1], dtype=torch.float32)
Neural Network Definitions
Actor Neural Network
class ActorNet(nn.Module):
def init(self, input_dim, hidden_dim, output_dim):
super(ActorNet, self).init() = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.Linear(hidden_dim, hidden_dim),
nn.Linear(hidden_dim, hidden_dim),
nn.Linear(hidden_dim, output_dim),
def forward(self, x, m):
# Concatenate xi and m
input_tensor =, m), dim=1)
Critic Neural Network
class CriticNet(nn.Module):
def init(self, input_dim, hidden_dim, output_dim):
super(CriticNet, self).init() = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.Tanh(), # Smooth activation
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(), # Smooth activation
nn.Linear(hidden_dim, output_dim)
def forward(self, x, m):
# Concatenate xi and m
input_tensor =, m), dim=1)
Mass Neural Network
class MassNet(nn.Module):
def init(self, input_dim, hidden_dim, output_dim):
super(MassNet, self).init() = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.Tanh(), # Smooth activation
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(), # Smooth activation
nn.Linear(hidden_dim, output_dim),
nn.Softplus() # Ensures m(x,t) is positive
def forward(self, x, t):
input_tensor =, t), dim=1)
Initialize neural networks for Actor, Critic, and Mass
input_dim_actor = 2 # xi and m
hidden_dim_actor = 32 # Reduced hidden dimensions for testing
output_dim_actor = 1
input_dim_critic = 2 # xi and m
hidden_dim_critic = 32 # Reduced hidden dimensions for testing
output_dim_critic = 1
input_dim_mass = 2 # xi and t
hidden_dim_mass = 32 # Reduced hidden dimensions for testing
output_dim_mass = 1 # m(x,t)
actor_net = ActorNet(input_dim_actor, hidden_dim_actor, output_dim_actor)
critic_net = CriticNet(input_dim_critic, hidden_dim_critic, output_dim_critic)
mass_net = MassNet(input_dim_mass, hidden_dim_mass, output_dim_mass)
Optimizers and Loss Functions
Optimizers (adjusted learning rate)# Reduced learning rate
actor_optimizer = optim.Adam(actor_net.parameters(), lr=0.00015)
critic_optimizer = optim.Adam(critic_net.parameters(), lr=0.00015)
mass_optimizer = optim.Adam(mass_net.parameters(), lr=0.00015)
Loss function
mse_loss = nn.MSELoss()
Storage for Vi, Bi, and p(m,z)
Initialize storage tensors
Vi_history = torch.zeros(TIME_STEPS + 1, N_BUSES, 1) # Shape: [TIME_STEPS+1, N_BUSES, 1]
Bi_history = torch.zeros(TIME_STEPS + 1, N_BUSES, 1) # Shape: [TIME_STEPS+1, N_BUSES, 1]
p_m_z_history = torch.zeros(TIME_STEPS + 1, 1) # Shape: [TIME_STEPS+1, 1]
Compute and store initial Vi and Bi
with torch.no_grad():
# Initialize time input as zero
t_initial = torch.zeros(N_BUSES, 1, dtype=torch.float32, device=initial_soc_tensor.device)
m_initial = mass_net(initial_soc_tensor, t_initial)
Vi_initial = critic_net(initial_soc_tensor, m_initial)
Bi_initial = actor_net(initial_soc_tensor, m_initial) * eta / (2 * R)
Vi_history[0] = Vi_initial
Bi_history[0] = Bi_initial
# Compute initial p(m,z)
sum_beta_initial = Bi_initial.sum(dim=0) # Sum over buses, resulting in shape [1]
# p(m,z) = sum(beta_i) + z(t)
# Get z(t=0)
z_t_initial = z_t_tensor[0].to(initial_soc_tensor.device)
p_m_z_initial = sum_beta_initial + z_t_initial
p_m_z_history[0] = p_m_z_initial
Main Training Loop
Initialize lists to store aggregated loss values for plotting
actor_losses_epoch =
critic_losses_epoch =
mass_losses_epoch =
total_losses_epoch =
num_epochs = 60 # Number of training epochs
for epoch in range(num_epochs):
print(f"Epoch {epoch+1}/{num_epochs}")
# Reset SOC for each epoch
xi = initial_soc_tensor.clone()
# Initialize accumulators for this epoch
epoch_actor_loss = 0.0
epoch_critic_loss = 0.0
epoch_mass_loss = 0.0
epoch_total_loss = 0.0
for t_step in range(TIME_STEPS):
current_time = t_step * dt
next_time = (t_step + 1) * dt
# Current SOC
xi_current = xi.clone().detach().requires_grad_(True) # Shape: [N_BUSES, 1]
# Get c(t) and z(t) for current time step
# c(t) and z(t) are precomputed and stored in c_t_tensor and z_t_tensor
# c(t) is in MW, ensure units are consistent with your model
c_t = c_t_tensor[t_step].to(xi.device) # Scalar tensor
z_t = z_t_tensor[t_step].to(xi.device) # Scalar tensor
# Expand c(t) to match [N_BUSES, 1] for operations
c_t_expanded = c_t.expand(N_BUSES, 1) # Shape: [N_BUSES, 1]
# Estimate mass m(x,t) using mass_net
t_input = torch.full((N_BUSES, 1), current_time, dtype=torch.float32, device=xi.device)
m_estimated = mass_net(xi_current, t_input) # Shape: [N_BUSES, 1]
# Critic: Estimate Vi(xi, m)
Vi = critic_net(xi_current, m_estimated) # Shape: [N_BUSES, 1]
# Actor: Estimate βi*(xi, m)
beta = actor_net(xi_current, m_estimated) # Shape: [N_BUSES, 1]
beta_scaled = beta * eta / (2 * R) # Scale βi* as per optimal control expression
# Store Vi and Bi
Vi_history[t_step + 1] = Vi.detach()
Bi_history[t_step + 1] = beta_scaled.detach()
# Compute p(m,z)
# p(m,z) = sum(beta_i) + z(t)
p_m_z = beta_scaled.sum(dim=0) + z_t # Sum over buses explicitly
p_m_z_history[t_step + 1] = p_m_z.detach()
# Compute gradients for Critic
# First derivative dVi/dxi with create_graph=True for higher-order derivatives
dVi_dxi = autograd.grad(
create_graph=True # Enabled to compute higher-order derivatives
)[0] # dVi/dxi
# Optimal provisioning rate as per βi* = (-eta / 2R) * dVi/dxi
beta_opt = (-eta / (2 * R)) * dVi_dxi # Shape: [N_BUSES, 1]
# Compute Actor loss
actor_loss = mse_loss(beta_scaled, beta_opt.detach())
# Compute second derivative d2Vi/dxi2
# Corrected grad_outputs to be scalar since outputs is scalar
d2Vi_dxi2 = autograd.grad(
grad_outputs=torch.tensor(1.0, device=xi_current.device), # Corrected to scalar
# Compute residual
# residual = -dVi/dt - (sigma**2 / 2) * d2Vi/dxi2 + R * beta^2 + Q * xi^2 + (-eta * beta + c(t) + 1) * dVi/dxi - p(m,z)
# Approximate dVi/dt using finite differences: (Vi(t) - Vi(t-1)) / dt
if t_step == 0:
dVi_dt = torch.zeros_like(Vi)
dVi_current = Vi_history[t_step + 1]
dVi_previous = Vi_history[t_step]
dVi_dt = (dVi_current - dVi_previous) / dt
residual = (
- (sigma**2 / 2) * d2Vi_dxi2
+ R * beta_scaled**2
+ Q * xi_current**2
+ (-eta * beta_scaled + c_t_expanded + 1) * dVi_dxi
- p_m_z
# Compute Critic loss
critic_loss = mse_loss(residual, torch.zeros_like(residual))
# Compute Mass network residual based on FPK equation
# FPK: dm/dt = -d/dx [m * (-eta / (2R)) * dVi/dxi] + (sigma**2 / 2) * d2m/dxi2
# Enable create_graph=True if higher-order gradients are needed
m_grad = autograd.grad(
grad_outputs=torch.tensor(1.0, device=xi_current.device), # Corrected to scalar
create_graph=True # Enabled for higher-order derivatives if needed
term1 = -torch.mean(m_grad * (-eta / (2 * R)) * dVi_dxi)
m_grad_xi = autograd.grad(
grad_outputs=torch.tensor(1.0, device=xi_current.device), # Corrected to scalar
term2 = (sigma**2 / 2) * torch.mean(m_grad_xi)
# Compute dm/dt as (m(t) - m(t-1)) / dt
if t_step == 0:
dm_dt = torch.zeros_like(m_estimated)
m_current = m_estimated.detach()
m_previous = m_history
dm_dt = (m_current - m_previous) / dt
# FPK residual
fpk_residual = term1 + term2 - dm_dt
# Compute Mass loss
mass_loss = mse_loss(fpk_residual, torch.zeros_like(fpk_residual))
# Combine all losses
total_loss = actor_loss + critic_loss + mass_loss
# Accumulate losses for this epoch
epoch_actor_loss += actor_loss.item()
epoch_critic_loss += critic_loss.item()
epoch_mass_loss += mass_loss.item()
epoch_total_loss += total_loss.item()
# Zero gradients for all optimizers
# Backward pass for all losses combined
# Gradient clipping to prevent exploding gradients
torch.nn.utils.clip_grad_norm_(actor_net.parameters(), max_norm=1.0)
torch.nn.utils.clip_grad_norm_(critic_net.parameters(), max_norm=1.0)
torch.nn.utils.clip_grad_norm_(mass_net.parameters(), max_norm=1.0)
# Update all optimizers
# Update SOC based on SDE: dxi = (-eta * beta + c(t)) * dt + sigma * dWi
# Approximate dWi as sqrt(dt) * normal
dWi = torch.randn(N_BUSES, 1, device=xi.device) * np.sqrt(dt)
xi = xi + (-eta * beta_scaled + c_t_expanded/N_BUSES) * dt + sigma * dWi
# Apply absolute value function to ensure all negative values become positive
xi = torch.abs(xi)
xi = torch.clamp(xi, 0.0, 1.0) # Ensure SOC remains within [0,1]
# Store previous SOC and mass for derivative computations
m_history = m_estimated.clone().detach()
# After all time steps in the epoch, compute average losses
epoch_actor_loss /= TIME_STEPS
epoch_critic_loss /= TIME_STEPS
epoch_mass_loss /= TIME_STEPS
epoch_total_loss /= TIME_STEPS
# Append averaged losses to epoch-wise loss lists
# End of epoch logging
print(f"Epoch {epoch+1}/{num_epochs} completed.")
print(f"Actor Loss: {epoch_actor_loss:.6f}, Critic Loss: {epoch_critic_loss:.6f}, Mass Loss: {epoch_mass_loss:.6f}, Total Loss: {epoch_total_loss:.6f}\n")
Plotting the Losses Over Epochs
plt.figure(figsize=(12, 6))
plt.plot(range(1, num_epochs + 1), total_losses_epoch, label=‘Total Loss’, color=‘purple’)
plt.plot(range(1, num_epochs + 1), actor_losses_epoch, label=‘Actor Loss’, color=‘blue’)
plt.plot(range(1, num_epochs + 1), critic_losses_epoch, label=‘Critic Loss’, color=‘green’)
plt.plot(range(1, num_epochs + 1), mass_losses_epoch, label=‘Mass Loss’, color=‘red’)
plt.title(‘Training Loss Over Epochs’)