# -*- coding: utf-8 -*-
"""
Neural Transformation Learning-based Anomaly Detection
this script is partially adapted from https://github.com/boschresearch/NeuTraL-AD (AGPL-3.0 license)
@Author: Hongzuo Xu <hongzuoxu@126.com, xuhongzuo13@nudt.edu.cn>
"""
from deepod.core.base_model import BaseDeepAD
from deepod.core.networks.base_networks import MLPnet
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch
import numpy as np
class NeuTraL(BaseDeepAD):
    """
    Neural Transformation Learning-based Anomaly Detection (ICML'21)

    Parameters
    ----------
    epochs: int, default=100
        number of training epochs (forwarded to ``BaseDeepAD``)

    batch_size: int, default=64
        batch size used by both the training and the inference data loader

    lr: float, default=1e-3
        learning rate (forwarded to ``BaseDeepAD``)

    n_trans: int, default=11
        number of learnable transformation networks

    trans_type: str, default='residual'
        how a transformation output is combined with the input,
        one of 'forward', 'mul' or 'residual'

    temp: float, default=0.1
        temperature of the contrastive loss

    rep_dim: int, default=128
        dimensionality of the encoder output

    hidden_dims: list or str or int, default='100,50'
        hidden layer sizes of the encoder network

    trans_hidden_dims: list or str or int, default=50
        hidden layer sizes of each transformation network

    act: str, default='LeakyReLU'
        activation layer name

    bias: bool, default=False
        bias flag handed to the encoder and transformation networks

    epoch_steps: int, default=-1
        forwarded to ``BaseDeepAD``

    prt_steps: int, default=10
        forwarded to ``BaseDeepAD``

    device: str, default='cuda'
        device the networks and batches are moved to

    verbose: int, default=1
        verbosity level; the network is printed when it is >= 2

    random_state: int, default=42
        forwarded to ``BaseDeepAD``
    """

    def __init__(self, epochs=100, batch_size=64, lr=1e-3,
                 n_trans=11, trans_type='residual', temp=0.1,
                 rep_dim=128, hidden_dims='100,50', trans_hidden_dims=50,
                 act='LeakyReLU', bias=False,
                 epoch_steps=-1, prt_steps=10, device='cuda',
                 verbose=1, random_state=42):
        super(NeuTraL, self).__init__(
            model_name='NeuTraL', epochs=epochs, batch_size=batch_size, lr=lr,
            epoch_steps=epoch_steps, prt_steps=prt_steps, device=device,
            verbose=verbose, random_state=random_state
        )

        # settings of the transformation networks and the contrastive loss
        self.n_trans = n_trans
        self.trans_type = trans_type
        self.temp = temp
        self.trans_hidden_dims = trans_hidden_dims

        # settings of the encoder network
        self.enc_hidden_dims = hidden_dims
        self.rep_dim = rep_dim
        self.act = act
        self.bias = bias

    def training_prepare(self, X, y):
        """
        Build the training data loader, the network and the loss.

        Parameters
        ----------
        X:
            training data, wrapped directly in a shuffled ``DataLoader``

        y:
            not used by this method

        Returns
        -------
        train_loader: DataLoader
        net: TabNeutralADNet
        criterion: DCL
        """
        train_loader = DataLoader(X, batch_size=self.batch_size, shuffle=True)

        # NOTE(review): self.n_features is not assigned in this class;
        # presumably the base class sets it before this hook runs -- confirm.
        net = TabNeutralADNet(
            n_features=self.n_features,
            n_trans=self.n_trans,
            trans_type=self.trans_type,
            enc_hidden_dims=self.enc_hidden_dims,
            trans_hidden_dims=self.trans_hidden_dims,
            activation=self.act,
            bias=self.bias,
            rep_dim=self.rep_dim,
            device=self.device
        )

        criterion = DCL(temperature=self.temp)

        if self.verbose >= 2:
            print(net)

        return train_loader, net, criterion

    def inference_prepare(self, X):
        """
        Build the inference data loader and switch the loss to per-sample output.

        Parameters
        ----------
        X:
            data to be scored; order is kept and no sample is dropped

        Returns
        -------
        test_loader: DataLoader
        """
        test_loader = DataLoader(X, batch_size=self.batch_size, drop_last=False, shuffle=False)

        # one loss value per sample is needed as the anomaly score
        # NOTE(review): self.criterion is not assigned in this class;
        # presumably the base class stores it during fitting -- confirm.
        self.criterion.reduction = 'none'
        return test_loader

    def training_forward(self, batch_x, net, criterion):
        """
        Compute the training loss of one batch.

        Parameters
        ----------
        batch_x: torch.Tensor
            a batch of input data
        net: torch.nn.Module
            network returned by ``training_prepare``
        criterion: callable
            loss returned by ``training_prepare``

        Returns
        -------
        loss: torch.Tensor
        """
        batch_x = batch_x.float().to(self.device)
        z = net(batch_x)
        loss = criterion(z)
        return loss

    def inference_forward(self, batch_x, net, criterion):
        """
        Compute representations and loss values of one batch at inference time.

        Parameters
        ----------
        batch_x: torch.Tensor
            a batch of input data
        net: torch.nn.Module
            trained network
        criterion: callable
            loss function

        Returns
        -------
        batch_z: torch.Tensor
            network output of the batch
        s: torch.Tensor
            criterion output of the batch
        """
        batch_x = batch_x.float().to(self.device)
        batch_z = net(batch_x)
        s = criterion(batch_z)
        return batch_z, s
class TabNeutralADNet(torch.nn.Module):
    """
    network class of NeuTraL for tabular data

    Parameters
    ----------
    n_features: int
        dimensionality of input data

    n_trans: int
        the number of transformation times

    trans_type: str, default='residual'
        transformation type, one of 'forward', 'mul' or 'residual'

    enc_hidden_dims: list or str or int
        the number of neural units of hidden layers in encoder net

    trans_hidden_dims: list or str or int
        the number of neural units of hidden layers in transformation net

    rep_dim: int
        representation dimensionality

    activation: str
        activation layer name

    bias: bool
        bias flag handed to the encoder and transformation nets

    device: str
        device

    Raises
    ------
    ValueError
        if ``trans_type`` is not a supported transformation type
    """

    # supported ways of combining a transformation output with the input
    _TRANS_TYPES = ('forward', 'mul', 'residual')

    def __init__(self, n_features, n_trans=11, trans_type='residual',
                 enc_hidden_dims='24,24,24,24', trans_hidden_dims=24,
                 rep_dim=24,
                 activation='ReLU',
                 bias=False,
                 device='cuda'):
        super(TabNeutralADNet, self).__init__()

        # fail early: an unknown type would otherwise only show up as
        # meaningless outputs during the forward pass
        if trans_type not in self._TRANS_TYPES:
            raise ValueError(
                "trans_type must be one of %s, got %r"
                % (self._TRANS_TYPES, trans_type)
            )

        # encoder shared by the original sample and all transformed views
        self.enc = MLPnet(
            n_features=n_features,
            n_hidden=enc_hidden_dims,
            n_output=rep_dim,
            activation=activation,
            bias=bias,
            batch_norm=False
        )

        # one transformation net per view; each maps the input space onto itself
        self.trans = torch.nn.ModuleList(
            [MLPnet(n_features=n_features,
                    n_hidden=trans_hidden_dims,
                    n_output=n_features,
                    activation=activation,
                    bias=bias,
                    batch_norm=False) for _ in range(n_trans)]
        )

        self.trans.to(device)
        self.enc.to(device)

        self.n_trans = n_trans
        self.trans_type = trans_type
        self.z_dim = rep_dim

    def forward(self, x):
        """
        Encode the input together with all of its transformed views.

        Parameters
        ----------
        x: torch.Tensor
            input batch; indexed as (n_samples, n_features)

        Returns
        -------
        zs: torch.Tensor
            representations of shape (n_samples, n_views, rep_dim), where
            view 0 is the untransformed input and views 1.. are the
            transformed ones

        Raises
        ------
        ValueError
            if ``self.trans_type`` is not a supported transformation type
        """
        # view 0 is the original sample, followed by one view per transformation
        views = [x]
        for trans_net in self.trans:
            mask = trans_net(x)
            if self.trans_type == 'forward':
                views.append(mask)
            elif self.trans_type == 'mul':
                views.append(torch.sigmoid(mask) * x)
            elif self.trans_type == 'residual':
                views.append(mask + x)
            else:
                # the attribute may have been changed after construction
                raise ValueError(
                    "trans_type must be one of %s, got %r"
                    % (self._TRANS_TYPES, self.trans_type)
                )

        x_cat = torch.stack(views, dim=1)

        # flatten samples and views so the encoder sees a plain 2-D batch
        zs = self.enc(x_cat.reshape(-1, x.shape[-1]))
        zs = zs.reshape(x.shape[0], len(views), self.z_dim)
        return zs
class DCL(torch.nn.Module):
    """
    Contrastive loss over the original sample and its transformed views.

    For every transformed view, the similarity to the original view is
    contrasted against the similarities to all other views of the same sample.

    Parameters
    ----------
    temperature: float, default=0.1
        divisor applied to the cosine similarities

    reduction: str, default='mean'
        'mean' or 'sum' aggregate the per-sample losses over the batch;
        'none' (and any other value) returns the per-sample losses
    """

    def __init__(self, temperature=0.1, reduction='mean'):
        super(DCL, self).__init__()
        self.temp = temperature
        self.reduction = reduction

    def forward(self, z):
        """
        Parameters
        ----------
        z: torch.Tensor
            representations of shape (n_samples, n_views, rep_dim);
            view 0 is the original sample, views 1.. are transformed views

        Returns
        -------
        loss: torch.Tensor
            a scalar for reduction 'mean' / 'sum', otherwise a tensor of
            shape (n_samples,)

        Raises
        ------
        ValueError
            if fewer than two transformed views are given, in which case the
            normalisation constant 1 / (K * log(K)) is undefined
        """
        batch_size, n_views, z_dim = z.shape

        K = n_views - 1  # number of transformed views
        if K < 2:
            raise ValueError(
                "DCL needs at least 2 transformed views (3 views in total), "
                "got %d view(s)" % n_views
            )

        z = F.normalize(z, p=2, dim=-1)

        # temperature-scaled cosine similarity between all views of a sample
        logits = torch.matmul(z, z.transpose(1, 2)) / self.temp  # n,k,k

        # Work in log space: exp() followed by log() overflows float32 for
        # small temperatures, whereas logsumexp stays finite.
        # A view is never contrasted with itself, so mask the diagonal.
        self_mask = torch.eye(n_views, dtype=torch.bool, device=z.device)
        logits = logits.masked_fill(self_mask, float('-inf'))

        # denominator: all other views, for each transformed view
        log_denominator = torch.logsumexp(logits[:, 1:], dim=-1)  # n,k-1
        # numerator: similarity of each transformed view to the original view
        log_pos_sim = logits[:, 1:, 0]  # n,k-1

        scale = 1.0 / float(np.abs(K * np.log(1.0 / K)))
        loss = (log_denominator - log_pos_sim) * scale
        loss = loss.sum(1)

        reduction = self.reduction
        if reduction == 'mean':
            return torch.mean(loss)
        elif reduction == 'sum':
            return torch.sum(loss)

        # 'none' and unrecognised values: per-sample loss
        return loss