# -*- coding: utf-8 -*-
"""
Random distance prediction-based anomaly detection (RDP).
This script is partially adapted from https://github.com/billhhh/RDP
@Author: Hongzuo Xu <hongzuoxu@126.com, xuhongzuo13@nudt.edu.cn>
"""
from deepod.core.base_model import BaseDeepAD
from deepod.core.networks.base_networks import MLPnet
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch
import copy
class RDP(BaseDeepAD):
    """
    Unsupervised Representation Learning by Predicting Random Distances
    (IJCAI'20)

    A trainable MLP is optimised against a second, fixed MLP (the "random
    projection" network) through ``RDPLoss``: the trainable network has to
    reproduce both the target's representations and the target's pairwise
    similarities between randomly paired samples of a mini-batch.

    Parameters
    ----------
    epochs: int, optional (default=100)
        Number of training epochs

    batch_size: int, optional (default=64)
        Number of samples in a mini-batch

    lr: float, optional (default=1e-3)
        Learning rate

    rep_dim: int, optional (default=128)
        Dimensionality of the representation space

    hidden_dims: list, str or int, optional (default='100,50')
        Number of neural units in hidden layers
            - If list, each item is a layer
            - If str, neural units of hidden layers are split by comma
            - If int, number of neural units of single hidden layer

    act: str, optional (default='LeakyReLU')
        activation layer name
        choice = ['ReLU', 'LeakyReLU', 'Sigmoid', 'Tanh']

    bias: bool, optional (default=False)
        Additive bias in linear layer

    epoch_steps: int, optional (default=-1)
        Maximum steps in an epoch
            - If -1, all the batches will be processed

    prt_steps: int, optional (default=10)
        Number of epoch intervals per printing

    device: str, optional (default='cuda')
        torch device

    verbose: int, optional (default=2)
        Verbosity mode. With ``verbose >= 2`` the network architecture is
        printed when training is prepared.

    random_state: int, optional (default=42)
        the seed used by the random
    """

    def __init__(self, epochs=100, batch_size=64, lr=1e-3,
                 rep_dim=128, hidden_dims='100,50', act='LeakyReLU', bias=False,
                 epoch_steps=-1, prt_steps=10, device='cuda',
                 verbose=2, random_state=42):
        super(RDP, self).__init__(
            model_name='RDP', epochs=epochs, batch_size=batch_size, lr=lr,
            epoch_steps=epoch_steps, prt_steps=prt_steps, device=device,
            verbose=verbose, random_state=random_state
        )

        # Architecture settings consumed by ``training_prepare``.
        self.hidden_dims = hidden_dims
        self.rep_dim = rep_dim
        self.act = act
        self.bias = bias

    def training_prepare(self, X, y):
        """
        Build the training data loader, the trainable network and the loss.

        Parameters
        ----------
        X:
            Training data; handed to ``DataLoader`` as the dataset.

        y:
            Not used by this model.

        Returns
        -------
        train_loader: DataLoader
            Shuffled loader over ``X``.

        net: MLPnet
            The trainable representation network, on ``self.device``.

        criterion: RDPLoss
            Loss module holding the fixed random-projection target network.
        """
        train_loader = DataLoader(X, batch_size=self.batch_size, shuffle=True)

        # NOTE(review): ``self.n_features`` is not assigned in this class;
        # presumably the base class sets it before calling this hook.
        net_kwargs = dict(
            n_features=self.n_features,
            n_hidden=self.hidden_dims, n_output=self.rep_dim,
            activation=self.act, bias=self.bias,
            skip_connection=None,
        )
        net = MLPnet(**net_kwargs).to(self.device)

        # The target has to be an *independently* initialised network with
        # the same architecture. A deep copy of ``net`` would start with
        # identical weights, which makes both loss terms exactly zero (and
        # their gradients zero), so nothing could ever be learned.
        rp_net = MLPnet(**net_kwargs).to(self.device)

        # The random projection is a fixed target: never train it.
        for param in rp_net.parameters():
            param.requires_grad_(False)

        criterion = RDPLoss(rp_net)

        if self.verbose >= 2:
            print(net)

        return train_loader, net, criterion

    def inference_prepare(self, X):
        """
        Build the inference data loader and switch the loss to per-sample mode.

        Parameters
        ----------
        X:
            Data to be scored; handed to ``DataLoader`` as the dataset.

        Returns
        -------
        test_loader: DataLoader
            Loader over ``X`` that keeps the sample order and the last,
            possibly smaller, batch.
        """
        test_loader = DataLoader(X, batch_size=self.batch_size, drop_last=False, shuffle=False)

        # With any reduction other than 'mean'/'sum' RDPLoss returns one value
        # per sample instead of a scalar.
        # NOTE(review): ``self.criterion`` is not assigned in this class;
        # presumably the base class stores the criterion returned by
        # ``training_prepare``.
        self.criterion.reduction = 'none'
        return test_loader

    def training_forward(self, batch_x, net, criterion):
        """
        Compute the training loss of one mini-batch.

        Parameters
        ----------
        batch_x: torch.Tensor
            Mini-batch; the first dimension indexes samples.

        net: torch.nn.Module
            The trainable representation network.

        criterion: RDPLoss
            The loss module.

        Returns
        -------
        loss: torch.Tensor
            Value returned by ``criterion`` for this batch.
        """
        # Pair every sample with a randomly chosen sample of the same batch;
        # the loss compares similarities of these pairs.
        batch_x1 = batch_x[torch.randperm(batch_x.shape[0])]
        batch_x = batch_x.float().to(self.device)
        batch_x1 = batch_x1.float().to(self.device)

        z, z1 = net(batch_x), net(batch_x1)
        loss = criterion(z, z1, batch_x, batch_x1)
        return loss

    def inference_forward(self, batch_x, net, criterion):
        """
        Compute representations and per-sample loss values of one mini-batch.

        The partner of each sample is drawn at random from the same batch, so
        the pairwise term of the returned values is not deterministic across
        calls.

        Parameters
        ----------
        batch_x: torch.Tensor
            Mini-batch; the first dimension indexes samples.

        net: torch.nn.Module
            The trained representation network.

        criterion: RDPLoss
            The loss module (set to per-sample mode by ``inference_prepare``).

        Returns
        -------
        batch_z: torch.Tensor
            Representations of ``batch_x`` produced by ``net``.

        s: torch.Tensor
            Value returned by ``criterion`` for this batch.
        """
        batch_x = batch_x.float().to(self.device)
        batch_x1 = batch_x[torch.randperm(batch_x.shape[0])]

        batch_z, batch_z1 = net(batch_x), net(batch_x1)
        s = criterion(batch_z, batch_z1, batch_x, batch_x1)
        return batch_z, s
class RDPLoss(torch.nn.Module):
    """
    Loss of random distance prediction.

    The loss is the sum of two terms, both measured against the output of a
    fixed target network ``rp_net``:

    - gap loss: squared error between the learned representation ``rep``
      and the target representation ``rp_net(x)``;
    - rdp loss: squared error between the inner product of the
      L1-normalised pair ``(rep, rep1)`` and the inner product of the
      L1-normalised target pair ``(rp_net(x), rp_net(x1))``.

    Parameters
    ----------
    random_projection_net: torch.nn.Module
        Target network applied to the raw inputs.

    reduction: str, optional (default='mean')
        - 'mean' or 'sum': both terms are reduced to a scalar
        - any other value: one loss value per sample is returned
        The ``reduction`` attribute may be reassigned after construction;
        ``forward`` always uses its current value.
    """

    def __init__(self, random_projection_net, reduction='mean'):
        super(RDPLoss, self).__init__()
        self.rp_net = random_projection_net
        # Kept for backward compatibility only. ``forward`` calls
        # ``F.mse_loss`` directly so that a later change of ``self.reduction``
        # is honoured (this module is bound to the constructor value).
        self.mse = torch.nn.MSELoss(reduction=reduction)
        self.reduction = reduction

    def forward(self, rep, rep1, x, x1):
        """
        Parameters
        ----------
        rep, rep1: torch.Tensor
            Learned representations of ``x`` and ``x1``; 2-D, samples along
            the first dimension (normalisation and reduction use ``dim=1``).

        x, x1: torch.Tensor
            Raw inputs fed to the target network.

        Returns
        -------
        loss: torch.Tensor
            Scalar for reduction 'mean'/'sum', otherwise one value per sample.
        """
        # The target is a fixed projection: no autograd graph is needed, and
        # no gradients should accumulate on the target's parameters.
        with torch.no_grad():
            rep_target = self.rp_net(x)
            rep1_target = self.rp_net(x1)
            d_target = torch.sum(F.normalize(rep_target, p=1, dim=1) *
                                 F.normalize(rep1_target, p=1, dim=1), dim=1)

        d_pred = torch.sum(F.normalize(rep, p=1, dim=1) *
                           F.normalize(rep1, p=1, dim=1), dim=1)

        if self.reduction in ('mean', 'sum'):
            gap_loss = F.mse_loss(rep, rep_target, reduction=self.reduction)
            rdp_loss = F.mse_loss(d_pred, d_target, reduction=self.reduction)
        else:
            # Per-sample values: average the element-wise gap over features so
            # that it has the same shape as the per-pair rdp term.
            gap_loss = torch.mean(F.mse_loss(rep, rep_target, reduction='none'), dim=1)
            rdp_loss = F.mse_loss(d_pred, d_target, reduction='none')

        return gap_loss + rdp_loss