Hi, I am enjoying using the opacus package to apply differential privacy to the training process of my models, I am struggling to get it to work with my TVAE implementation though, could someone let me know why I get an Incompatible Module Exception, I am using similar modules to in all my other generative models. See my code below:
import numpy as np
import torch
from torch.nn import Linear, Module, Parameter, ReLU, Sequential
from torch.nn.functional import cross_entropy
from torch.optim import Adam
from torch.utils.data import DataLoader, TensorDataset
from models.CTGAN import DataTransformer, BaseSynthesiser
from .utils import GeneralTransformer, DPSynthesiser
import opacus
from opacus import autograd_grad_sample, PrivacyEngine, utils
import dill
class Encoder(Module):
def __init__(self, data_dim, compress_dims, embedding_dim):
super(Encoder, self).__init__()
dim = data_dim
seq = []
for item in list(compress_dims):
seq += [Linear(dim, item), ReLU()]
dim = item
self.seq = Sequential(*seq)
self.fc1 = Linear(dim, embedding_dim)
self.fc2 = Linear(dim, embedding_dim)
def forward(self, input):
feature = self.seq(input)
mu = self.fc1(feature)
logvar = self.fc2(feature)
std = torch.exp(0.5 * logvar)
return mu, std, logvar
class Decoder(Module):
def __init__(self, embedding_dim, decompress_dims, data_dim):
super(Decoder, self).__init__()
dim = embedding_dim
seq = []
for item in list(decompress_dims):
seq += [Linear(dim, item), ReLU()]
dim = item
seq.append(Linear(dim, data_dim))
self.seq = Sequential(*seq)
self.sigma = Parameter(torch.ones(data_dim) * 0.1)
def forward(self, input):
return self.seq(input), self.sigma
def loss_function(recon_x, x, sigmas, mu, logvar, output_info, factor):
st = 0
loss = []
for column_info in output_info:
for span_info in column_info:
if len(column_info) != 1 or span_info.activation_fn != "softmax":
ed = st + span_info.dim
std = sigmas[st]
loss.append(((x[:, st] - torch.tanh(recon_x[:, st])) ** 2 / 2 / (std ** 2)).sum())
loss.append(torch.log(std) * x.size()[0])
st = ed
else:
ed = st + span_info.dim
loss.append(cross_entropy(recon_x[:, st:ed], torch.argmax(x[:, st:ed], dim=-1), reduction="sum"))
st = ed
assert st == recon_x.size()[1]
KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
return sum(loss) * factor / x.size()[0], KLD / x.size()[0]
class DPTVAE(BaseSynthesiser):
def __init__(
self,
embedding_dim=128,
compress_dims=(128, 128),
decompress_dims=(128, 128),
l2scale=1e-5,
batch_size=500,
disabled_dp=False,
delta=1e-5,
noise_multiplier=3.5,
max_per_sample_grad_norm=1.0,
epsilon=1.0,
iterations=300,
verbose=True,
):
self.embedding_dim = embedding_dim
self.compress_dims = compress_dims
self.decompress_dims = decompress_dims
self.l2scale = l2scale
self.batch_size = batch_size
self.loss_factor = 2
self.iterations = iterations
self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# opacus parameters
self.noise_multiplier = noise_multiplier
self.disabled_dp = disabled_dp
self.delta = delta
self.max_per_sample_grad_norm = max_per_sample_grad_norm
self.epsilon = epsilon
self.epsilon_list = []
self.alpha_list = []
self.loss_list = []
self.verbose = verbose
def train(self, data, categorical_columns=None, ordinal_columns=None, update_epsilon=None, verbose=False):
self.transformer = DataTransformer()
self.transformer.fit(data, discrete_columns=categorical_columns)
data = self.transformer.transform(data)
dataset = TensorDataset(torch.from_numpy(data.astype("float32")).to(self.device))
loader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True, drop_last=True)
data_dim = self.transformer.output_dimensions
self.encoder = Encoder(data_dim, self.compress_dims, self.embedding_dim).to(self.device)
self.decoder = Decoder(self.embedding_dim, self.compress_dims, data_dim).to(self.device)
self.optimizerAE = Adam(list(self.encoder.parameters()) + list(self.decoder.parameters()), weight_decay=self.l2scale)
privacy_engine = opacus.PrivacyEngine(
self.decoder,
batch_size=self.batch_size,
sample_size=data.shape[0],
alphas=[1 + x / 10.0 for x in range(1, 100)] + list(range(12, 64)),
target_delta=self.delta,
noise_multiplier=self.noise_multiplier,
max_grad_norm=self.max_per_sample_grad_norm,
clip_per_layer=True,
)
if not self.disabled_dp:
privacy_engine.attach(self.optimizerAE)
if hasattr(self, "privacy_engine"):
epsilon, best_alpha = self.optimizerAE.privacy_engine.get_privacy_spent(self.delta)
else:
epsilon = 0
for i in range(self.iterations):
if not self.disabled_dp:
if self.epsilon < epsilon:
break
for id_, data_ in enumerate(loader):
self.optimizerAE.zero_grad()
real = data_[0].to(self.device)
mu, std, logvar = self.encoder(real)
eps = torch.randn_like(std)
emb = eps * std + mu
rec, sigmas = self.decoder(emb)
loss_1, loss_2 = loss_function(rec, real, sigmas, mu, logvar, self.transformer.output_info_list, self.loss_factor)
loss = loss_1 + loss_2
loss.backward()
self.optimizerAE.step()
self.decoder.sigma.data_.clamp_(0.01, 1.0)
if not self.disabled_dp:
for p in self.decoder.parameters():
if hasattr(p, "grad_sample"):
del p.grad_sample
epsilon, best_alpha = self.optimizerAE.privacy_engine.get_privacy_spent(self.delta)
self.epsilon_list.append(epsilon)
self.alpha_list.append(best_alpha)
if verbose:
print("eps: {:f} \t alpha: {:f} \t Loss: {:f}".format(epsilon, best_alpha, loss.detach().cpu()))
if not self.disabled_dp:
if self.epsilon < epsilon:
break
self.loss_list.append(loss)
privacy_engine.detach()
self.privacy_engine = privacy_engine
self.state_dict = self.optimizerAE.state_dict()
return self.loss_list, self.epsilon_list, self.alpha_list
def sample(self, samples):
self.decoder.eval()
steps = samples // self.batch_size + 1
data_ = []
for _ in range(steps):
mean = torch.zeros(self.batch_size, self.embedding_dim)
std = mean + 1
noise = torch.normal(mean=mean, std=std).to(self.device)
fake, sigmas = self.decoder(noise)
fake = torch.tanh(fake)
data_.append(fake.detach().cpu().numpy())
data_ = np.concatenate(data_, axis=0)
data_ = data_[:samples]
return self.transformer.inverse_transform(data_, sigmas.detach().cpu().numpy())
def set_device(self, device):
self.device = device
self.decoder.to(self.device)
def save(self, path):
assert hasattr(self, "data_sampler")
# always save a cpu model.
device_bak = self.device
self.device = torch.device("cpu")
self.encoder.to(self.device)
self.decoder.to(self.device)
torch.save(self, path, pickle_module=dill)
self.device = device_bak
self.encoder.to(self.device)
self.decoder.to(self.device)
@classmethod
def load(cls, path):
model = torch.load(path)
model.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.encoder.to(model.device)
model.decoder.to(model.device)
return model
def DPTVAE_runner(seed, run_name, epsilon, delta, n_s, df, batch_size, save, load, categorical_columns, ordinal_columns, iterations, noise_multiplier):
if not load:
gan = DPSynthesiser(
DPTVAE(
batch_size=batch_size,
iterations=iterations,
delta=delta,
noise_multiplier=noise_multiplier,
),
GeneralTransformer(),
epsilon=epsilon,
)
gan.fit(df, categorical_columns=categorical_columns, ordinal_columns=ordinal_columns, verbose=True, seed=seed)
else:
gan = DPTVAE.load(load)
df_synth = gan.sample(n_s)
if save:
gan.save(save)
return df_synth, None
Any help would be appreciated, thanks!