I am trying to build dataset class for dataloader which I can use to train pytorch graph temporal model. Here is my class
import torch
import numpy as np
from networkx import from_numpy_array, from_numpy_matrix
from torch_geometric_temporal.signal import temporal_signal_split
from torch_geometric.utils import dense_to_sparse
from torch_geometric_temporal.signal import StaticHeteroGraphTemporalSignal
import networkx as nx
import pandas as pd
def adjacency_matrix_transition(node_state_type_size, node_action_type_size):
ns = node_state_type_size
na = node_action_type_size
A = np.zeros((ns+na,ns+na)).astype(np.float32)
for i in range(A.shape[0]):
for j in range(A.shape[1]):
if i!=j:
if i <j:
if i<ns and j<ns:
A[i,j]=A[j,i]=1
elif i<ns and j>=ns:
A[i,j]=A[j,i]=0.6
elif i>=ns and j>=ns:
A[i,j]=A[j,i]=0.9
return A
class GraphDataset(object):
def __init__(self,
input_name,
target_name,
observation_size,
action_size,
reward_size,
filepath=None,
max_length=90,
device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
):
super(GraphDataset, self).__init__()
self.action_size = action_size
self.observation_size = observation_size
self.reward_size = reward_size
self.input_filename = input_name
self.target_filename = target_name
self.filepath = filepath
self.max_length = max_length
self.device = device
self._read_data()
def _read_data(self):
#input shape(Batch, seq_length_input_size)
#
self.X = torch.load(os.path.join(self.filepath, self.input_filename))[:,:self.max_length,:]
self.Y = torch.load(os.path.join(self.filepath, self.target_filename))[:,:self.max_length,:]
self.X = self.X.permute(0,2,1).unsqueeze(-2)## (B, N, F, T)
self.Y = self.Y.permute(0,2,1)
print(f"input data {self.X.shape}")
print(f"target data {self.Y.shape}")
def _get_edges_and_weights(self):
self.edge_index_dicts=dict()
self.edge_weights_dicts=dict()
self.input_graph = torch.from_numpy(adjacency_matrix_transition(self.observation_size,self.action_size))
edge_indices, values = dense_to_sparse(self.input_graph)
edge_indices = edge_indices.numpy()
values = values.numpy()
self.edge_index_dicts["features"] = edge_indices
self.edge_weights_dicts["features"] = values
graph = from_numpy_array(self.input_graph.numpy())
edge_list = nx.to_pandas_edgelist(graph)
adj = nx.from_pandas_edgelist(edge_list, edge_attr=['weight'])
adj = pd.DataFrame(nx.adjacency_matrix(adj, weight='weight').todense())
adjacency = adj.copy()
D = np.diag(np.sum(adjacency,axis=1))
I = np.identity(adjacency.shape[0])
D_inv_sqrt = np.linalg.inv(np.sqrt(D))
L = I - np.dot(D_inv_sqrt, adjacency).dot(D_inv_sqrt)
self.normalized_laplacian_input = L.copy()
##
self.target_graph= torch.from_numpy(adjacency_matrix_transition(self.observation_size,self.reward_size))
edge_indices, values = dense_to_sparse(self.target_graph)
edge_indices = edge_indices.numpy()
values = values.numpy()
self.edge_index_dicts["targets"] = edge_indices
self.edge_weights_dicts["targets"] = values
graph = from_numpy_array(self.target_graph.numpy())
edge_list = nx.to_pandas_edgelist(graph)
adj = nx.from_pandas_edgelist(edge_list, edge_attr=['weight'])#,source=['source'], target=['target'])
adj = pd.DataFrame(nx.adjacency_matrix(adj, weight='weight').todense())
adjacency = adj.copy()
D = np.diag(np.sum(adjacency,axis=1))
I = np.identity(adjacency.shape[0])
D_inv_sqrt = np.linalg.inv(np.sqrt(D))
L = I - np.dot(D_inv_sqrt, adjacency).dot(D_inv_sqrt)
self.normalized_laplacian_output = L.copy()
def _get_targets_and_features(self, num_timesteps_in: int = 12, num_timesteps_out: int = 12):
indices=[i
for i in range(self.X.shape[-1] - (num_timesteps_in + num_timesteps_out) + 1)]
self.features,self.targets=[],[]
for i in indices:
self.features.append(self.X[:,:,:, i : i + num_timesteps_in].numpy())
self.targets.append(self.Y[:,:,i : i + num_timesteps_in].numpy())
def get_dataset(self, num_timesteps_in: int = 12, num_timesteps_out: int = 12) -> StaticHeteroGraphTemporalSignal:
"""Returns data iterator as an instance of the static graph temporal signal class."""
self._get_edges_and_weights()
self._get_targets_and_features(num_timesteps_in, num_timesteps_out)
dataset = StaticHeteroGraphTemporalSignal(
self.edge_index_dicts,
self.edge_weights_dicts,
self.features,
self.targets
)
return dataset
def create_dataloader(train_dataset, DEVICE, bs, shuffle):
train_input = np.array(train_dataset.features)
train_target = np.array(train_dataset.targets)
train_x_tensor = torch.from_numpy(train_input).type(torch.FloatTensor).to(DEVICE) # (B, N, F, T)
train_target_tensor = torch.from_numpy(train_target).type(torch.FloatTensor).to(DEVICE) # (B, N, T)
train_dataset_new = torch.utils.data.TensorDataset(train_x_tensor, train_target_tensor)
train_loader = torch.utils.data.DataLoader(train_dataset_new, batch_size=bs, shuffle=shuffle, drop_last=True)
return train_loader
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # cuda
print(DEVICE)
shuffle = False
obs_size, act_size, r_size= 11, 3, 1
batch_size = 16
d_e=7*2
d_d=7
times = 20
# Read data
input_name = "inputs_features.pt"
target_name = "outputs_features.pt"
filepath ="/home/"
loader = GraphDataset(input_name, target_name, ob_size, act_size, r_size, filepath)
dataset = loader.get_dataset(num_timesteps_in=d_e, num_timesteps_out=d_d)
print(next(iter(dataset))) # Show first sample
A_tilde = torch.from_numpy(loader.normalized_laplacian_input).to(DEVICE)
X_s = torch.from_numpy(loader.features).to(DEVICE)
I am getting this error running above code
input data torch.Size([4447, 14, 1, 90])
target data torch.Size([4447, 12, 90])
AttributeError Traceback (most recent call last)
/tmp/ipykernel_5880/605001366.py in <module>
154 loader = GraphDataset(input_name, target_name, obs_size, act_size, r_size, filepath)
155 dataset = loader.get_dataset(num_timesteps_in=d_e, num_timesteps_out=d_d)
--> 156 print(next(iter(dataset))) # Show first sample
157 A_tilde = torch.from_numpy(loader.normalized_laplacian_input).to(DEVICE)
158 X_s = torch.from_numpy(loader.features).to(DEVICE)
~/anaconda3/lib/python3.9/site-packages/torch_geometric_temporal/signal/static_hetero_graph_temporal_signal.py in __next__(self)
201 def __next__(self):
202 if self.t < len(self.feature_dicts):
--> 203 snapshot = self[self.t]
204 self.t = self.t + 1
205 return snapshot
~/anaconda3/lib/python3.9/site-packages/torch_geometric_temporal/signal/static_hetero_graph_temporal_signal.py in __getitem__(self, time_index)
173 )
174 else:
--> 175 x_dict = self._get_features(time_index)
176 edge_index_dict = self._get_edge_index()
177 edge_weight_dict = self._get_edge_weight()
~/anaconda3/lib/python3.9/site-packages/torch_geometric_temporal/signal/static_hetero_graph_temporal_signal.py in _get_features(self, time_index)
136 return self.feature_dicts[time_index]
137 else:
--> 138 return {key: torch.FloatTensor(value) for key, value in self.feature_dicts[time_index].items()
139 if value is not None}
140
AttributeError: 'numpy.ndarray' object has no attribute 'items'
any suggestion how I can fix this error?
I am also wondering whether my approach is correct for building tempporal graph where the input graph structure is different from the output graph ? Especially I am wondering whether StaticHeteroGraphTemporalSignal
is a correct choice or not?