# -*- coding: utf-8 -*-
"""
One-class classification
@Author: Hongzuo Xu <hongzuoxu@126.com, xuhongzuo13@nudt.edu.cn>
"""
from deepod.core.base_model import BaseDeepAD
from deepod.core.networks.base_networks import get_network
from torch.utils.data import DataLoader
import torch
class DeepSVDDTS(BaseDeepAD):
    """
    Deep One-class Classification for Anomaly Detection (ICML'18)
    :cite:`ruff2018deepsvdd`

    Time-series variant: the model registers itself with the base class
    using ``data_type='ts'`` and learns a representation network whose
    outputs are pulled towards a fixed center ``c``; the squared distance
    to ``c`` is used as the anomaly score.

    Args:

        epochs (int, optional):
            Number of training epochs. Default is 100.

        batch_size (int, optional):
            Number of samples in a mini-batch. Default is 64.

        lr (float, optional):
            Learning rate. Default is 1e-5.

        network (str, optional):
            Network structure for different data structures.
            Default is 'Transformer'.

        seq_len (int, optional):
            Size of window used to create subsequences from the data.
            Default is 30.

        stride (int, optional):
            Number of time points the window moves between subsequences.
            Default is 10.

        rep_dim (int, optional):
            Dimensionality of the representation space. Default is 64.

        hidden_dims (Union[list, str, int], optional):
            Dimensions for hidden layers. Default is '512'.

            - If list, each item is a layer
            - If str, neural units of hidden layers are split by comma
            - If int, number of neural units of single hidden layer

        act (str, optional):
            Activation layer name, e.g. 'ReLU', 'LeakyReLU', 'Sigmoid',
            'Tanh' or 'GELU'. The set of accepted names is defined by the
            selected network implementation. Default is 'GELU'.

        bias (bool, optional):
            Whether to add a bias term in linear layers. Default is False.

        n_heads (int, optional):
            Number of heads in multi-head attention. Default is 8.

        d_model (int, optional):
            Number of dimensions in Transformer model. Default is 512.

        attn (str, optional):
            Type of attention mechanism. Default is 'self_attn'.

        pos_encoding (str, optional):
            Manner of positional encoding. Default is 'fixed'.

        norm (str, optional):
            Manner of normalization in Transformer. Default is 'LayerNorm'.

        epoch_steps (int, optional):
            Maximum steps in an epoch. Default is -1.

        prt_steps (int, optional):
            Number of epoch intervals per printing. Default is 10.

        device (str, optional):
            Torch device. Default is 'cuda'.

        verbose (int, optional):
            Verbosity mode. Default is 2.

        random_state (int, optional):
            Seed used by the random number generator. Default is 42.
    """

    def __init__(self, epochs=100, batch_size=64, lr=1e-5,
                 network='Transformer', seq_len=30, stride=10,
                 rep_dim=64, hidden_dims='512', act='GELU', bias=False,
                 n_heads=8, d_model=512, attn='self_attn', pos_encoding='fixed', norm='LayerNorm',
                 epoch_steps=-1, prt_steps=10, device='cuda',
                 verbose=2, random_state=42):
        """
        Initializes the DeepSVDDTS model with the specified parameters.

        Generic training options are forwarded to the base class; the
        network-specific options are stored on the instance and consumed
        later by ``training_prepare``.
        """
        super().__init__(
            model_name='DeepSVDD', data_type='ts', epochs=epochs, batch_size=batch_size, lr=lr,
            network=network, seq_len=seq_len, stride=stride,
            epoch_steps=epoch_steps, prt_steps=prt_steps, device=device,
            verbose=verbose, random_state=random_state
        )

        self.hidden_dims = hidden_dims
        self.rep_dim = rep_dim
        self.act = act
        self.bias = bias

        # parameters for Transformer
        self.n_heads = n_heads
        self.d_model = d_model
        self.attn = attn
        self.pos_encoding = pos_encoding
        self.norm = norm

        # Hypersphere center; computed from data in ``training_prepare``.
        self.c = None

    def training_prepare(self, X, y):
        """
        Prepares the training process by setting up data loaders and
        initializing the network and loss criterion.

        Args:

            X (torch.Tensor):
                Input tensor of the features.

            y (torch.Tensor):
                Input tensor of the labels. Not used by this method.

        Returns:

            train_loader (DataLoader):
                DataLoader for the training data.

            net (nn.Module):
                Initialized neural network model.

            criterion (DSVDDLoss):
                Loss function for DeepSVDD.
        """
        train_loader = DataLoader(X, batch_size=self.batch_size, shuffle=True)

        # NOTE(review): ``self.n_features`` is not assigned in this class;
        # presumably the base class sets it before calling this method.
        network_params = {
            'n_features': self.n_features,
            'n_hidden': self.hidden_dims,
            'n_output': self.rep_dim,
            'activation': self.act,
            'bias': self.bias
        }
        # Only these two architectures receive extra keyword arguments.
        if self.network == 'Transformer':
            network_params['n_heads'] = self.n_heads
            network_params['d_model'] = self.d_model
            network_params['pos_encoding'] = self.pos_encoding
            network_params['norm'] = self.norm
            network_params['attn'] = self.attn
            network_params['seq_len'] = self.seq_len
        elif self.network == 'ConvSeq':
            network_params['seq_len'] = self.seq_len

        network_class = get_network(self.network)
        net = network_class(**network_params).to(self.device)

        # The center is the mean representation produced by the freshly
        # initialized network over the training data.
        self.c = self._set_c(net, train_loader)
        criterion = DSVDDLoss(c=self.c)

        if self.verbose >= 2:
            print(net)

        return train_loader, net, criterion

    def inference_prepare(self, X):
        """
        Prepares the model for inference by setting up data loaders.

        Also switches ``self.criterion`` to ``reduction='none'`` so that it
        yields one score per sample instead of a scalar.

        Args:

            X (torch.Tensor):
                Input tensor of the features for inference.

        Returns:

            test_loader (DataLoader):
                DataLoader for inference.
        """
        test_loader = DataLoader(X, batch_size=self.batch_size,
                                 drop_last=False, shuffle=False)
        # NOTE(review): ``self.criterion`` is not assigned in this class;
        # presumably the base class stores the criterion returned by
        # ``training_prepare``.
        self.criterion.reduction = 'none'
        return test_loader

    def training_forward(self, batch_x, net, criterion):
        """
        Performs a forward pass during training.

        Args:

            batch_x (torch.Tensor):
                Batch of input data.

            net (nn.Module):
                The neural network model.

            criterion (DSVDDLoss):
                Loss function for DeepSVDD.

        Returns:

            loss (torch.Tensor):
                Computed loss for the batch.
        """
        batch_x = batch_x.float().to(self.device)
        z = net(batch_x)
        loss = criterion(z)
        return loss

    def inference_forward(self, batch_x, net, criterion):
        """
        Performs a forward pass during inference.

        Args:

            batch_x (torch.Tensor):
                Batch of input data.

            net (nn.Module):
                The neural network model.

            criterion (DSVDDLoss):
                Loss function for DeepSVDD to calculate anomaly score.

        Returns:

            batch_z (torch.Tensor):
                The encoded batch of data in the feature space.

            s (torch.Tensor):
                The anomaly scores for the batch.
        """
        batch_x = batch_x.float().to(self.device)
        batch_z = net(batch_x)
        s = criterion(batch_z)
        return batch_z, s

    def _set_c(self, net, dataloader, eps=0.1):
        """
        Initializes the center 'c' for the hypersphere in the representation space.

        The center is the per-dimension mean of the network outputs over
        all batches of ``dataloader``. Any coordinate whose magnitude is
        below ``eps`` (including an exact zero) is moved to ``-eps`` or
        ``+eps`` so that no coordinate of the center sits at or near zero.

        Args:

            net (nn.Module):
                The neural network model.

            dataloader (DataLoader):
                DataLoader for the data to compute the center from.

            eps (float, optional):
                Small value to ensure 'c' is away from zero. Default is 0.1.

        Returns:

            c (torch.Tensor):
                The initialized center of the hypersphere.
        """
        # Evaluate deterministically, but hand the network back in the
        # mode it arrived in so this helper does not leak eval mode.
        was_training = net.training
        net.eval()
        try:
            with torch.no_grad():
                reps = [net(x.float().to(self.device)) for x in dataloader]
        finally:
            net.train(was_training)

        c = torch.mean(torch.cat(reps), dim=0)

        # Push near-zero coordinates away from zero. ``>=`` (rather than
        # ``>``) makes sure an exactly-zero coordinate is moved as well.
        near_zero = torch.abs(c) < eps
        c[near_zero & (c < 0)] = -eps
        c[near_zero & (c >= 0)] = eps
        return c
class DSVDDLoss(torch.nn.Module):
    """
    Custom loss function for Deep Support Vector Data Description (Deep SVDD).

    This loss function computes the squared Euclidean distance between each
    data point in the representation space and the center of the hypersphere
    and aims to minimize this distance for normal data points.

    Args:

        c (torch.Tensor):
            The center of the hypersphere in the representation space.

        reduction (str, optional):
            Specifies the reduction to apply to the output.
            Choices are 'none', 'mean', 'sum'. Default is 'mean'.

            - If ``'none'``: no reduction will be applied;
            - If ``'mean'``: the sum of the output will be divided by the number of elements in the output;
            - If ``'sum'``: the output will be summed
    """

    # Reductions understood by ``forward``.
    _REDUCTIONS = ('none', 'mean', 'sum')

    def __init__(self, c, reduction='mean'):
        """
        Initializes the DSVDDLoss with the hypersphere center and reduction method.
        """
        super().__init__()
        self.c = c
        self.reduction = reduction

    def forward(self, rep, reduction=None):
        """
        Calculates the Deep SVDD loss for a batch of representations.

        Args:

            rep (torch.Tensor):
                The representation of the batch of data. The squared
                differences to 'c' are summed over dimension 1.

            reduction (str, optional):
                The reduction method to apply. If None, will use the
                specified 'reduction' attribute. Default is None.

        Returns:

            loss (torch.Tensor):
                The calculated loss based on the representations and the
                center 'c': one value per row of ``rep`` for ``'none'``,
                otherwise a scalar.

        Raises:

            ValueError:
                If the effective reduction is not one of
                'none', 'mean', 'sum'.
        """
        if reduction is None:
            reduction = self.reduction
        # The attribute can be reassigned after construction, so the
        # effective value is validated here instead of returning None.
        if reduction not in self._REDUCTIONS:
            raise ValueError(
                f"Unsupported reduction {reduction!r}; "
                f"expected one of {self._REDUCTIONS}."
            )

        loss = torch.sum((rep - self.c) ** 2, dim=1)

        if reduction == 'mean':
            return torch.mean(loss)
        if reduction == 'sum':
            return torch.sum(loss)
        return loss