import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.autograd as autograd
import matplotlib.pyplot as plt
# Enable anomaly detection for debugging
torch.autograd.set_detect_anomaly(True)
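# Note (general PyTorch behavior, not specific to this model): anomaly detection adds
# noticeable overhead, so it is usually switched off once debugging is finished, e.g.
# torch.autograd.set_detect_anomaly(False).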
# ----------------------------
# Parameters and Initialization
# ----------------------------

# Set random seeds for reproducibility
np.random.seed(0)
torch.manual_seed(0)
# Number of buses (reduced for testing)
N_BUSES = 100 # Reduced from 1000 to 100 for quicker testing
# Time parameters
T = 10 # Total time in hours (reduced from 80)
dt = 1.0 # Time step in hours
TIME_STEPS = int(T / dt) # Number of time steps
# Constants
eta = 0.8
sigma = 0.2
R = 1.0
Q = 1.0
# Terminal cost function
gamma = lambda x: 2.0 / (1 - x)**2 # Encourage maximizing battery level
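# For reference, gamma grows steeply as x approaches 1:
# gamma(0.0) = 2.0, gamma(0.5) = 8.0, gamma(0.9) = 200.0.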
# Initialize SOC for each bus: N(0.2, 0.2); replace negative draws with their absolute values
initial_soc = np.random.normal(0.2, 0.2, N_BUSES)
initial_soc = np.abs(initial_soc)
initial_soc = np.clip(initial_soc, 0.0, 1.0) # Ensure SOC is within [0,1]
# Convert initial SOC to a torch tensor
initial_soc_tensor = torch.tensor(initial_soc, dtype=torch.float32).unsqueeze(1) # Shape: [N_BUSES, 1]
# ----------------------------
# Define c(t) and z(t)
# ----------------------------
def generate_consumption_data():
    # Set random seed for reproducibility
    np.random.seed(42)
    # Define time for 4 days at hourly intervals
    hours_4days = np.linspace(0, 96, num=97)
    # Create base sinusoidal consumption pattern
    variable_consumption = 375 * np.sin(2 * np.pi * hours_4days / 24 + np.random.normal(0, 0.1, size=hours_4days.size)) + 375
    # Apply peak usage adjustments
    for i, hour in enumerate(hours_4days):
        if 7 <= hour % 24 < 10:  # Morning peak (weaker)
            variable_consumption[i] *= (0.8 + 0.05 * np.random.randn())
        elif 16 <= hour % 24 < 20:  # Evening peak (stronger)
            variable_consumption[i] *= (1.2 + 0.05 * np.random.randn())
        elif 20 <= hour % 24 < 23:  # Late evening (reduce slightly)
            variable_consumption[i] *= (0.9 + 0.05 * np.random.randn())
        else:  # Overnight and midday baseline with small random fluctuations
            variable_consumption[i] *= (1.0 + 0.02 * np.random.randn())
    # Convert to MW
    variable_consumption_mw = variable_consumption / 1000
    # Adjust other appliance consumption to be slightly higher
    increase_factor = 1.1
    baseline_increase = 50
    z_t = variable_consumption * increase_factor + baseline_increase
    z_t_mw = z_t / 1000
    return hours_4days, variable_consumption_mw, z_t_mw
# Generate consumption data
hours_4days, c_t_mw_array, z_t_mw_array = generate_consumption_data()
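# Quick diagnostic (optional; a minimal sketch, not part of the model itself): print the
# range of the generated profiles so the kW-to-MW conversion above is easy to verify.
print(f"c(t) range [MW]: {c_t_mw_array.min():.3f} to {c_t_mw_array.max():.3f}")
print(f"z(t) range [MW]: {z_t_mw_array.min():.3f} to {z_t_mw_array.max():.3f}")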
# Ensure that TIME_STEPS does not exceed the available data
assert TIME_STEPS + 1 <= len(hours_4days), "Not enough consumption data for the number of time steps."
# Convert c(t) and z(t) to torch tensors
# Shape: [TIME_STEPS + 1]
c_t_tensor = torch.tensor(c_t_mw_array[:TIME_STEPS +1], dtype=torch.float32)
z_t_tensor = torch.tensor(z_t_mw_array[:TIME_STEPS +1], dtype=torch.float32)
# ----------------------------
# Neural Network Definitions
# ----------------------------

# Actor Neural Network
class ActorNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(ActorNet, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim),
            nn.Sigmoid()  # Keeps the raw control output in (0, 1)
        )

    def forward(self, x, m):
        # Concatenate xi and m along the feature dimension
        input_tensor = torch.cat((x, m), dim=1)
        return self.net(input_tensor)
# Critic Neural Network
class CriticNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(CriticNet, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.Tanh(),  # Smooth activation
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),  # Smooth activation
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x, m):
        # Concatenate xi and m along the feature dimension
        input_tensor = torch.cat((x, m), dim=1)
        return self.net(input_tensor)
# Mass Neural Network
class MassNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(MassNet, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.Tanh(),  # Smooth activation
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),  # Smooth activation
            nn.Linear(hidden_dim, output_dim),
            nn.Softplus()  # Ensures m(x, t) is positive
        )

    def forward(self, x, t):
        # Concatenate xi and t along the feature dimension
        input_tensor = torch.cat((x, t), dim=1)
        return self.net(input_tensor)
# Initialize neural networks for Actor, Critic, and Mass
input_dim_actor = 2 # xi and m
hidden_dim_actor = 32 # Reduced hidden dimensions for testing
output_dim_actor = 1
input_dim_critic = 2 # xi and m
hidden_dim_critic = 32 # Reduced hidden dimensions for testing
output_dim_critic = 1
input_dim_mass = 2 # xi and t
hidden_dim_mass = 32 # Reduced hidden dimensions for testing
output_dim_mass = 1 # m(x,t)
actor_net = ActorNet(input_dim_actor, hidden_dim_actor, output_dim_actor)
critic_net = CriticNet(input_dim_critic, hidden_dim_critic, output_dim_critic)
mass_net = MassNet(input_dim_mass, hidden_dim_mass, output_dim_mass)
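# Optional smoke test (a minimal sketch with illustrative dummy inputs, not training data):
# pass a small zero batch through each network to confirm the expected [batch, 1] shapes.
with torch.no_grad():
    _x_dummy = torch.zeros(4, 1)
    _t_dummy = torch.zeros(4, 1)
    _m_dummy = mass_net(_x_dummy, _t_dummy)    # [4, 1], positive via Softplus
    _v_dummy = critic_net(_x_dummy, _m_dummy)  # [4, 1]
    _b_dummy = actor_net(_x_dummy, _m_dummy)   # [4, 1], in (0, 1) via Sigmoid
    assert _m_dummy.shape == _v_dummy.shape == _b_dummy.shape == (4, 1)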
# ----------------------------
# Optimizers and Loss Functions
# ----------------------------

# Optimizers (reduced learning rate)
actor_optimizer = optim.Adam(actor_net.parameters(), lr=0.00015)
critic_optimizer = optim.Adam(critic_net.parameters(), lr=0.00015)
mass_optimizer = optim.Adam(mass_net.parameters(), lr=0.00015)
# Loss function
mse_loss = nn.MSELoss()
# ----------------------------
# Storage for Vi, Bi, and p(m, z)
# ----------------------------

# Initialize storage tensors
Vi_history = torch.zeros(TIME_STEPS + 1, N_BUSES, 1) # Shape: [TIME_STEPS+1, N_BUSES, 1]
Bi_history = torch.zeros(TIME_STEPS + 1, N_BUSES, 1) # Shape: [TIME_STEPS+1, N_BUSES, 1]
p_m_z_history = torch.zeros(TIME_STEPS + 1, 1) # Shape: [TIME_STEPS+1, 1]
# Compute and store initial Vi and Bi
with torch.no_grad():
    # Initialize time input as zero
    t_initial = torch.zeros(N_BUSES, 1, dtype=torch.float32, device=initial_soc_tensor.device)
    m_initial = mass_net(initial_soc_tensor, t_initial)
    Vi_initial = critic_net(initial_soc_tensor, m_initial)
    Bi_initial = actor_net(initial_soc_tensor, m_initial) * eta / (2 * R)
    Vi_history[0] = Vi_initial
    Bi_history[0] = Bi_initial
    # Compute initial p(m, z) = sum(beta_i) + z(t)
    sum_beta_initial = Bi_initial.sum(dim=0)  # Sum over buses, resulting in shape [1]
    # Get z(t=0)
    z_t_initial = z_t_tensor[0].to(initial_soc_tensor.device)
    p_m_z_initial = sum_beta_initial + z_t_initial
    p_m_z_history[0] = p_m_z_initial
# ----------------------------
# Main Training Loop
# ----------------------------

# Initialize lists to store aggregated loss values for plotting
actor_losses_epoch = []
critic_losses_epoch = []
mass_losses_epoch = []
total_losses_epoch = []
num_epochs = 60 # Number of training epochs
for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")
    # Reset SOC for each epoch
    xi = initial_soc_tensor.clone()
    # Initialize accumulators for this epoch
    epoch_actor_loss = 0.0
    epoch_critic_loss = 0.0
    epoch_mass_loss = 0.0
    epoch_total_loss = 0.0
    for t_step in range(TIME_STEPS):
        current_time = t_step * dt
        next_time = (t_step + 1) * dt
        # Current SOC
        xi_current = xi.clone().detach().requires_grad_(True)  # Shape: [N_BUSES, 1]
        # Get c(t) and z(t) for the current time step.
        # Both are precomputed in c_t_tensor and z_t_tensor; c(t) is in MW,
        # so make sure units are consistent with the rest of the model.
        c_t = c_t_tensor[t_step].to(xi.device)  # Scalar tensor
        z_t = z_t_tensor[t_step].to(xi.device)  # Scalar tensor
        # Expand c(t) to [N_BUSES, 1] for elementwise operations
        c_t_expanded = c_t.expand(N_BUSES, 1)  # Shape: [N_BUSES, 1]
        # Estimate mass m(x, t) using mass_net
        t_input = torch.full((N_BUSES, 1), current_time, dtype=torch.float32, device=xi.device)
        m_estimated = mass_net(xi_current, t_input)  # Shape: [N_BUSES, 1]
        # Critic: estimate Vi(xi, m)
        Vi = critic_net(xi_current, m_estimated)  # Shape: [N_BUSES, 1]
        # Actor: estimate beta_i*(xi, m)
        beta = actor_net(xi_current, m_estimated)  # Shape: [N_BUSES, 1]
        beta_scaled = beta * eta / (2 * R)  # Scale beta_i* as per the optimal control expression
        # Store Vi and Bi
        Vi_history[t_step + 1] = Vi.detach()
        Bi_history[t_step + 1] = beta_scaled.detach()
        # Compute p(m, z) = sum(beta_i) + z(t)
        p_m_z = beta_scaled.sum(dim=0) + z_t  # Sum over buses explicitly
        p_m_z_history[t_step + 1] = p_m_z.detach()
        # Gradients for the Critic: first derivative dVi/dxi,
        # with create_graph=True so higher-order derivatives are available
        dVi_dxi = autograd.grad(
            outputs=Vi.sum(),
            inputs=xi_current,
            retain_graph=True,
            create_graph=True
        )[0]  # dVi/dxi
        # Optimal provisioning rate: beta_i* = (-eta / (2R)) * dVi/dxi
        beta_opt = (-eta / (2 * R)) * dVi_dxi  # Shape: [N_BUSES, 1]
        # Actor loss
        actor_loss = mse_loss(beta_scaled, beta_opt.detach())
        # Second derivative d2Vi/dxi2
        # (grad_outputs is a scalar because the output dVi_dxi.sum() is scalar)
        d2Vi_dxi2 = autograd.grad(
            outputs=dVi_dxi.sum(),
            inputs=xi_current,
            grad_outputs=torch.tensor(1.0, device=xi_current.device),
            retain_graph=True,
            create_graph=False
        )[0]
        # HJB residual:
        # residual = -dVi/dt - (sigma**2 / 2) * d2Vi/dxi2 + R * beta^2 + Q * xi^2
        #            + (-eta * beta + c(t) + 1) * dVi/dxi - p(m, z)
        # Approximate dVi/dt with finite differences: (Vi(t) - Vi(t-1)) / dt
        if t_step == 0:
            dVi_dt = torch.zeros_like(Vi)
        else:
            Vi_now = Vi_history[t_step + 1]
            Vi_prev = Vi_history[t_step]
            dVi_dt = (Vi_now - Vi_prev) / dt
        residual = (
            -dVi_dt
            - (sigma**2 / 2) * d2Vi_dxi2
            + R * beta_scaled**2
            + Q * xi_current**2
            + (-eta * beta_scaled + c_t_expanded + 1) * dVi_dxi
            - p_m_z
        )
        # Critic loss
        critic_loss = mse_loss(residual, torch.zeros_like(residual))
        # Mass network residual based on the FPK equation:
        # dm/dt = -d/dx [m * (-eta / (2R)) * dVi/dxi] + (sigma**2 / 2) * d2m/dxi2
        m_grad = autograd.grad(
            outputs=m_estimated.sum(),
            inputs=xi_current,
            grad_outputs=torch.tensor(1.0, device=xi_current.device),
            retain_graph=True,
            create_graph=True  # Enabled in case higher-order derivatives are needed
        )[0]
        term1 = -torch.mean(m_grad * (-eta / (2 * R)) * dVi_dxi)
        m_grad_xi = autograd.grad(
            outputs=m_grad.sum(),
            inputs=xi_current,
            grad_outputs=torch.tensor(1.0, device=xi_current.device),
            retain_graph=True,
            create_graph=False
        )[0]
        term2 = (sigma**2 / 2) * torch.mean(m_grad_xi)
        # Approximate dm/dt with finite differences: (m(t) - m(t-1)) / dt
        if t_step == 0:
            dm_dt = torch.zeros_like(m_estimated)
        else:
            m_current = m_estimated.detach()
            m_previous = m_history
            dm_dt = (m_current - m_previous) / dt
        # FPK residual
        fpk_residual = term1 + term2 - dm_dt
        # Mass loss
        mass_loss = mse_loss(fpk_residual, torch.zeros_like(fpk_residual))
        # Combine all losses
        total_loss = actor_loss + critic_loss + mass_loss
        # Accumulate losses for this epoch
        epoch_actor_loss += actor_loss.item()
        epoch_critic_loss += critic_loss.item()
        epoch_mass_loss += mass_loss.item()
        epoch_total_loss += total_loss.item()
        # Zero gradients for all optimizers
        actor_optimizer.zero_grad()
        critic_optimizer.zero_grad()
        mass_optimizer.zero_grad()
        # Backward pass for the combined loss
        total_loss.backward()
        # Gradient clipping to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(actor_net.parameters(), max_norm=1.0)
        torch.nn.utils.clip_grad_norm_(critic_net.parameters(), max_norm=1.0)
        torch.nn.utils.clip_grad_norm_(mass_net.parameters(), max_norm=1.0)
        # Update all networks
        actor_optimizer.step()
        critic_optimizer.step()
        mass_optimizer.step()
        # Update SOC via the SDE: dxi = (-eta * beta + c(t)/N) * dt + sigma * dWi,
        # with the aggregate consumption c(t) split evenly across buses.
        # Approximate dWi as sqrt(dt) * standard normal
        dWi = torch.randn(N_BUSES, 1, device=xi.device) * np.sqrt(dt)
        xi = xi + (-eta * beta_scaled + c_t_expanded / N_BUSES) * dt + sigma * dWi
        # Reflect negative values to positive, then clamp SOC to [0, 1]
        xi = torch.abs(xi)
        xi = torch.clamp(xi, 0.0, 1.0)
        # Store the current mass estimate for the next step's finite-difference dm/dt
        m_history = m_estimated.clone().detach()
    # After all time steps in the epoch, compute average losses
    epoch_actor_loss /= TIME_STEPS
    epoch_critic_loss /= TIME_STEPS
    epoch_mass_loss /= TIME_STEPS
    epoch_total_loss /= TIME_STEPS
    # Append averaged losses to the epoch-wise loss lists
    actor_losses_epoch.append(epoch_actor_loss)
    critic_losses_epoch.append(epoch_critic_loss)
    mass_losses_epoch.append(epoch_mass_loss)
    total_losses_epoch.append(epoch_total_loss)
    # End-of-epoch logging
    print(f"Epoch {epoch+1}/{num_epochs} completed.")
    print(f"Actor Loss: {epoch_actor_loss:.6f}, Critic Loss: {epoch_critic_loss:.6f}, Mass Loss: {epoch_mass_loss:.6f}, Total Loss: {epoch_total_loss:.6f}\n")
# ----------------------------
# Plotting the Losses Over Epochs
# ----------------------------
plt.figure(figsize=(12, 6))
plt.plot(range(1, num_epochs + 1), total_losses_epoch, label='Total Loss', color='purple')
plt.plot(range(1, num_epochs + 1), actor_losses_epoch, label='Actor Loss', color='blue')
plt.plot(range(1, num_epochs + 1), critic_losses_epoch, label='Critic Loss', color='green')
plt.plot(range(1, num_epochs + 1), mass_losses_epoch, label='Mass Loss', color='red')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss Over Epochs')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
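# Optional: persist the trained networks for later evaluation. A minimal sketch;
# the file names below are illustrative, not prescribed by the model.
torch.save(actor_net.state_dict(), "actor_net.pt")
torch.save(critic_net.state_dict(), "critic_net.pt")
torch.save(mass_net.state_dict(), "mass_net.pt")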