import numpy as np
import torch
import skfuzzy as fuzz
from torch import nn
import torch.optim as optim
from sklearn.metrics import mean_squared_error as mse
def generate_random_params_antecedent():
    """Draw three uniform [0, 1) values for one generalized-bell MF.

    The triple is consumed by ``manfis`` in the order (b, c, a):
    slope, centre, width of the bell.
    """
    # One vectorized draw consumes the same three values from the
    # global RNG stream as three scalar rand() calls would.
    return list(np.random.rand(3))
def generate_random_params_consequent():
    """Rejection-sample a non-decreasing 4-tuple of ints in [1, 9].

    Used as the (a, b, c, d) corners of a trapezoidal consequent MF,
    which requires a <= b <= c <= d.  Rejection sampling keeps the
    result uniform over the ordered tuples (sorting a draw would not).
    """
    while True:
        draw = np.random.randint(1, 10, 4)
        if all(draw[i] <= draw[i + 1] for i in range(3)):
            return list(draw)
class manfis(nn.Module):
    """Single-input, two-rule ANFIS-style fuzzy regression model.

    Forward pipeline: fuzzification -> rule inference (normalised firing
    strengths) -> implication -> aggregation -> defuzzification.

    Fixes over the previous revision:
    * ``init`` renamed to ``__init__`` (and ``super().__init__()``) so the
      module is actually initialised on construction.
    * The smart-quoted default argument was a syntax error; replaced with
      a plain string literal.
    * Every stage previously converted parameters to Python floats via
      ``.item()`` and routed data through numpy/skfuzzy, then rebuilt the
      output with ``torch.tensor(...)`` — that detached the autograd
      graph, so ``loss.backward()`` could not update any parameter.  All
      stages now use torch ops and gradients flow to both premise and
      consequent parameters.
    """

    def __init__(self, membership_function="general_bell"):
        super().__init__()
        self.membership_function = membership_function
        if membership_function != "general_bell":
            # Previously an unsupported value silently left the parameter
            # lists undefined (AttributeError later); fail fast instead.
            raise ValueError(
                "unsupported membership function: %r" % (membership_function,))
        # Four generalized-bell antecedent MFs (A1, A2, B1, B2); each
        # parameter triple is stored in the order (b, c, a).
        self.premise_params = nn.ParameterList([
            nn.Parameter(torch.tensor(generate_random_params_antecedent(),
                                      dtype=torch.float))
            for _ in range(4)
        ])
        # Two trapezoidal consequent MFs with corners (a, b, c, d),
        # initialised with a <= b <= c <= d.  NOTE(review): gradient
        # updates may break that ordering during training — the clamped
        # trapezoid below stays finite but the shape degenerates; confirm
        # whether a constraint/reparameterisation is wanted.
        self.consequent_params = nn.ParameterList([
            nn.Parameter(torch.tensor(generate_random_params_consequent(),
                                      dtype=torch.float))
            for _ in range(2)
        ])

    @staticmethod
    def _gbell(x, params):
        """Generalized bell MF 1 / (1 + |(x - c)/a|^(2b)).

        ``params`` is ordered (b, c, a), matching how the original code
        passed premise parameters to ``skfuzzy.gbellmf``.
        """
        b, c, a = params[0], params[1], params[2]
        return 1.0 / (1.0 + torch.abs((x - c) / a) ** (2.0 * b))

    def _fuzzification(self, x):
        """Return membership degrees (mu_A1, mu_A2, mu_B1, mu_B2) of x."""
        mu_A1 = self._gbell(x, self.premise_params[0])
        mu_A2 = self._gbell(x, self.premise_params[1])
        mu_B1 = self._gbell(x, self.premise_params[2])
        mu_B2 = self._gbell(x, self.premise_params[3])
        return (mu_A1, mu_A2, mu_B1, mu_B2)

    def _inference(self, membership_degrees):
        """Rule firing strengths (product T-norm), normalised to sum to 1."""
        mu_A1, mu_A2, mu_B1, mu_B2 = membership_degrees
        w1 = mu_A1 * mu_B1  # rule 1: A1 AND B1
        w2 = mu_A2 * mu_B2  # rule 2: A2 AND B2
        total = w1 + w2
        return (w1 / total, w2 / total)

    def _implication(self, rule_firing_strengths):
        """Scale each normalised strength by its rule's first consequent corner."""
        output_1 = rule_firing_strengths[0] * self.consequent_params[0][0]
        output_2 = rule_firing_strengths[1] * self.consequent_params[1][0]
        return (output_1, output_2)

    def _aggregation(self, consequent_activations):
        """Sum the weighted rule outputs: w1*c1 + w2*c2."""
        return consequent_activations[0] + consequent_activations[1]

    @staticmethod
    def _trapmf(x, p):
        """Trapezoidal MF with corners (a, b, c, d); torch equivalent of
        ``skfuzzy.trapmf``.  Degenerate edges (a == b or c == d) are kept
        finite by clamping the edge width to a tiny epsilon.
        """
        a, b, c, d = p[0], p[1], p[2], p[3]
        eps = 1e-12
        rise = (x - a) / torch.clamp(b - a, min=eps)
        fall = (d - x) / torch.clamp(d - c, min=eps)
        return torch.clamp(torch.min(rise, fall), 0.0, 1.0)

    @staticmethod
    def _trapz(y, x):
        """Trapezoid-rule integral of y over grid x (torch equivalent of
        ``np.trapz``), version-independent."""
        dx = x[1:] - x[:-1]
        return torch.sum(0.5 * (y[1:] + y[:-1]) * dx)

    def _defuzzification(self, x, aggregated_output):
        """Area-weighted centroid of the two consequent trapezoids times
        the aggregated rule output.

        Assumes x is a 1-D ascending grid of at least 2 points — the same
        assumption the original np.trapz/centroid code made; TODO confirm
        against callers.
        """
        mu_1 = self._trapmf(x, self.consequent_params[0])
        mu_2 = self._trapmf(x, self.consequent_params[1])
        area_1 = self._trapz(mu_1, x)
        area_2 = self._trapz(mu_2, x)
        eps = 1e-12  # guards NaN when a trapezoid has no support on x
        centroid_1 = self._trapz(x * mu_1, x) / (area_1 + eps)
        centroid_2 = self._trapz(x * mu_2, x) / (area_2 + eps)
        centroid = ((centroid_1 * area_1) + (centroid_2 * area_2)) \
            / (area_1 + area_2 + eps)
        return aggregated_output * centroid

    def forward(self, x):
        """Run the full fuzzy pipeline; output has the same shape as x."""
        membership_degrees = self._fuzzification(x)
        rule_firing_strengths = self._inference(membership_degrees)
        consequent_activations = self._implication(rule_firing_strengths)
        aggregated_output = self._aggregation(consequent_activations)
        final_output = self._defuzzification(x, aggregated_output)
        return final_output
class modeltrain():
    """Trainer for ``manfis``: full-batch Adam on MSE loss.

    Fix over the previous revision: ``init`` renamed to ``__init__`` —
    as written, ``modeltrain(x, y)`` raised TypeError because the class
    took no constructor arguments.
    """

    def __init__(self, xMapping, yMapping):
        # xMapping / yMapping: input grid and targets as torch tensors
        # (same tensors are used for training and the reported RMSE).
        self.xMapping = xMapping
        self.yMapping = yMapping

    def _fit(self, epochs):
        """Train a fresh manfis model for ``epochs`` full-batch steps and
        return it, printing loss and RMSE each epoch."""
        model = manfis()
        criteria = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        for e in range(epochs):
            optimizer.zero_grad()
            out = model(self.xMapping)
            loss = criteria(out, self.yMapping)
            loss.backward()
            optimizer.step()
            with torch.no_grad():
                pred = model(self.xMapping).cpu().numpy().ravel()
                real = self.yMapping.cpu().numpy().ravel()
                # Plain numpy RMSE; ravel() both sides so no accidental
                # (n, n) broadcasting can occur.  Equivalent to
                # sklearn mean_squared_error(...) ** 0.5.
                test_rmse = float(np.sqrt(np.mean((pred - real) ** 2)))
            print('[Epoch: {}/{}] [Train Loss: {}] [Test RMSE: {}]'.format(
                e+1, epochs, str(loss.item())[:6], str(test_rmse)[:6]))
        return model