# Source code for deepod.models.tabular.icl

# -*- coding: utf-8 -*-
"""
Anomaly Detection for Tabular Data with Internal Contrastive Learning
this script is partially adapted from the supplementary material in
https://openreview.net/forum?id=_hszZbt46bT
@Author: Hongzuo Xu <hongzuoxu@126.com, xuhongzuo13@nudt.edu.cn>
"""

from deepod.core.base_model import BaseDeepAD
from deepod.core.networks.base_networks import MLPnet
from torch.utils.data import DataLoader
import torch
import torch.nn.functional as F
import numpy as np


class ICL(BaseDeepAD):
    """
    Anomaly Detection for Tabular Data with Internal Contrastive Learning (ICLR'22)
    :cite:`shenkar2022internal`

    Parameters
    ----------
    epochs: int, optional (default=100)
        Number of training epochs

    batch_size: int, optional (default=64)
        Number of samples in a mini-batch

    lr: float, optional (default=1e-3)
        Learning rate

    n_ensemble: int or str, optional (default='auto')
        Ensemble size (make use of the bagging effect). The value is
        forwarded unchanged to ``BaseDeepAD``, which decides how 'auto'
        is resolved.

    rep_dim: int, optional (default=128)
        Dimensionality of the representation space

    hidden_dims: List, str or int, optional (default='100,50')
        Number of neural units in hidden layers
            - If List, each item is a layer
            - If str, neural units of hidden layers are split by comma
            - If int, number of neural units of single hidden layer

    act: str, optional (default='LeakyReLU')
        activation layer name
        choice = ['ReLU', 'LeakyReLU', 'Sigmoid', 'Tanh']

    bias: bool, optional (default=False)
        Additive bias in linear layer

    kernel_size: str or int, optional (default='auto')
        the length of sub-vectors.
        If 'auto', the length is derived from the number of features when
        training is prepared.

    temperature: float, optional (default=0.01)
        tau in the cross-entropy function

    max_negatives: int, optional (default=1000)
        Maximum number of negatives (unmatched sub-vectors)

    epoch_steps: int, optional (default=-1)
        Maximum steps in an epoch
            - If -1, all the batches will be processed

    prt_steps: int, optional (default=10)
        Number of epoch intervals per printing

    device: str, optional (default='cuda')
        torch device

    verbose: int, optional (default=2)
        Verbosity mode

    random_state: int, optional (default=42)
        the seed used by the random
    """

    def __init__(self, epochs=100, batch_size=64, lr=1e-3, n_ensemble='auto',
                 rep_dim=128, hidden_dims='100,50', act='LeakyReLU', bias=False,
                 kernel_size='auto', temperature=0.01, max_negatives=1000,
                 epoch_steps=-1, prt_steps=10, device='cuda',
                 verbose=2, random_state=42):
        super(ICL, self).__init__(
            model_name='ICL', epochs=epochs, batch_size=batch_size, lr=lr,
            n_ensemble=n_ensemble,
            epoch_steps=epoch_steps, prt_steps=prt_steps,
            device=device, verbose=verbose, random_state=random_state
        )

        self.hidden_dims = hidden_dims
        self.rep_dim = rep_dim
        self.act = act
        self.bias = bias

        self.kernel_size = kernel_size
        self.tau = temperature
        self.max_negatives = max_negatives

    @staticmethod
    def _auto_kernel_size(n_features):
        """Return the sub-vector length used for 'auto', chosen by feature-count band."""
        if n_features <= 40:
            return 2
        if n_features <= 160:
            return 10
        if n_features <= 240:
            return n_features - 150
        if n_features <= 480:
            return n_features - 200
        return n_features - 400

    def training_prepare(self, X, y):
        """
        Build the data loader, the network and the loss used for training.

        Parameters
        ----------
        X: array-like
            Training data handed to ``torch.utils.data.DataLoader``.

        y: ignored
            Present for interface compatibility; not used here.

        Returns
        -------
        train_loader: DataLoader
            Shuffled mini-batch loader over ``X``.

        net: ICLNet
            Network moved to ``self.device``.

        criterion: torch.nn.CrossEntropyLoss
            Loss applied to the contrastive logits.

        Raises
        ------
        ValueError
            If the data have fewer than three features.
        """
        # NOTE(review): self.n_features is not assigned in this class; it is
        # presumably set by BaseDeepAD before this hook runs -- confirm.
        # Validate first so that a failing call leaves the estimator untouched.
        if self.n_features < 3:
            raise ValueError('ICL model cannot handle the data that have less than three features.')

        train_loader = DataLoader(X, batch_size=self.batch_size, shuffle=True, pin_memory=True)

        # 'auto' is resolved once and written back to the instance, so later
        # calls reuse the resolved integer.
        if self.kernel_size == 'auto':
            self.kernel_size = self._auto_kernel_size(self.n_features)
            if self.verbose >= 1:
                print(f'kernel size: {self.kernel_size}')

        net = ICLNet(
            n_features=self.n_features,
            kernel_size=self.kernel_size,
            hidden_dims=self.hidden_dims,
            rep_dim=self.rep_dim,
            activation=self.act,
            bias=self.bias
        ).to(self.device)

        criterion = torch.nn.CrossEntropyLoss()

        if self.verbose >= 2:
            print(net)

        return train_loader, net, criterion

    def training_forward(self, batch_x, net, criterion):
        """
        Compute the contrastive loss of one mini-batch.

        Parameters
        ----------
        batch_x: torch.Tensor
            Mini-batch of input rows.

        net: torch.nn.Module
            Network returning ``(positives, query)`` for a batch.

        criterion: callable
            Loss taking ``(logit, target)``.

        Returns
        -------
        loss: torch.Tensor
            Output of ``criterion``; its shape depends on the criterion's
            reduction mode.
        """
        batch_x = batch_x.float().to(self.device)

        # positives are sub-vectors, query are their complements
        positives, query = net(batch_x)

        # [batch, n_pos, 1 + n_neg] -> [batch, 1 + n_neg, n_pos]: the class
        # axis must be dim 1 for the criterion.
        logit = self.cal_logit(query, positives).permute(0, 2, 1)

        # The matching pair is always stored at class index 0.
        correct_class = torch.zeros(
            (logit.shape[0], logit.shape[2]), dtype=torch.long, device=logit.device
        )
        loss = criterion(logit, correct_class)
        return loss

    def inference_prepare(self, X):
        """
        Build the loader used for scoring and switch the loss to per-element mode.

        Parameters
        ----------
        X: array-like
            Data to score.

        Returns
        -------
        test_loader: DataLoader
            Ordered (non-shuffled) loader that keeps the last partial batch.
        """
        test_loader = DataLoader(X, batch_size=self.batch_size,
                                 drop_last=False, shuffle=False)
        # NOTE(review): self.criterion is not assigned in this class; it is
        # presumably stored by BaseDeepAD after training_prepare -- confirm.
        # Per-element losses are needed so each sample gets its own score.
        self.criterion.reduction = 'none'
        return test_loader

    def inference_forward(self, batch_x, net, criterion):
        """
        Score one mini-batch.

        Parameters
        ----------
        batch_x: torch.Tensor
            Mini-batch of input rows.

        net: torch.nn.Module
            Trained network.

        criterion: callable
            Loss with ``reduction='none'`` (set by ``inference_prepare``), so
            that the loss keeps one entry per sample and sub-vector.

        Returns
        -------
        batch_z: torch.Tensor
            The unchanged input batch, returned for interface consistency.

        s: torch.Tensor
            Per-sample score: the loss averaged over dim 1.
        """
        loss = self.training_forward(batch_x, net, criterion)
        batch_z = batch_x  # for consistency
        s = loss.mean(dim=1)
        return batch_z, s

    def cal_logit(self, query, pos):
        """
        Build temperature-scaled logits with the positive pair in column 0.

        Parameters
        ----------
        query: torch.Tensor of shape [batch_size, n_pos, rep_dim]
            Representations of the complement vectors.

        pos: torch.Tensor of shape [batch_size, n_pos, rep_dim]
            Representations of the sub-vectors.

        Returns
        -------
        logit: torch.Tensor of shape [batch_size, n_pos, 1 + n_neg]
            Column 0 is the similarity of each query with its own sub-vector;
            the remaining columns are similarities with sampled sub-vectors,
            with the query's own sub-vector masked to ``-inf``.
        """
        n_pos = query.shape[1]

        # get negatives (sampled with numpy so the numpy seed controls them)
        negative_index = np.random.choice(
            np.arange(n_pos), min(self.max_negatives, n_pos), replace=False
        )
        neg_idx = torch.as_tensor(negative_index, dtype=torch.long, device=query.device)
        negative = pos.permute(0, 2, 1)[:, :, neg_idx]  # [batch_size, rep_dim, n_neg]

        pos_multiplication = (query * pos).sum(dim=2, keepdim=True)  # [batch_size, n_pos, 1]
        neg_multiplication = torch.matmul(query, negative)  # [batch_size, n_pos, n_neg]

        # Removal of the diagonals: a query must not use its own positive as a
        # negative. A single boolean mask is broadcast over the batch instead
        # of materialising one dense identity matrix per sample.
        self_mask = torch.eye(n_pos, dtype=torch.bool, device=query.device)[:, neg_idx]
        neg_multiplication = neg_multiplication.masked_fill(
            self_mask.unsqueeze(0), -float('inf')
        )

        logit = torch.cat((pos_multiplication, neg_multiplication), dim=2)
        logit = torch.div(logit, self.tau)
        return logit
class ICLNet(torch.nn.Module):
    """
    Two-encoder network that embeds sliding sub-vectors and their complements.

    Every window of ``kernel_size`` consecutive features is a sub-vector; the
    remaining ``n_features - kernel_size`` features are its complement.
    ``enc_g_net`` encodes the sub-vectors and ``enc_f_net`` encodes the
    complements.

    Parameters
    ----------
    n_features: int
        Number of input features.

    kernel_size: int
        Length of each sub-vector; must satisfy ``0 < kernel_size < n_features``.

    hidden_dims: List, str or int, optional (default='100,50')
        Hidden layer sizes of the complement encoder
            - If List, each item is a layer
            - If str, sizes are split by comma
            - If int, size of a single hidden layer
        The sub-vector encoder uses half of each size (rounded down).

    rep_dim: int, optional (default=64)
        Dimensionality of the representation space

    activation: str, optional (default='ReLU')
        Activation layer name passed to ``MLPnet``.

    bias: bool, optional (default=False)
        Additive bias in linear layer

    Raises
    ------
    ValueError
        If ``kernel_size`` is not in ``[1, n_features - 1]``.
    """

    def __init__(self, n_features, kernel_size, hidden_dims='100,50',
                 rep_dim=64, activation='ReLU', bias=False):
        super(ICLNet, self).__init__()

        self.n_features = n_features
        self.kernel_size = kernel_size

        # get consecutive subspace indices and the corresponding complement indices
        self.all_idx, self.all_idx_complement = self._build_subspace_indices(
            n_features, kernel_size
        )

        hidden_dims = self._parse_hidden_dims(hidden_dims)
        n_layers = len(hidden_dims)
        n_subvectors = len(self.all_idx)

        # One activation per hidden layer plus the output layer; the
        # complement encoder starts with Tanh.
        f_act = ['Tanh'] + [activation] * n_layers
        self.enc_f_net = MLPnet(
            n_features=n_features - kernel_size,
            n_hidden=hidden_dims,
            n_output=rep_dim,
            mid_channels=n_subvectors,
            batch_norm=True,
            activation=f_act,
            bias=bias,
        )

        hidden_dims2 = [int(0.5 * h) for h in hidden_dims]
        g_act = [activation] * (n_layers + 1)
        self.enc_g_net = MLPnet(
            n_features=kernel_size,
            n_hidden=hidden_dims2,
            n_output=rep_dim,
            mid_channels=n_subvectors,
            batch_norm=True,
            activation=g_act,
            bias=bias,
        )

    @staticmethod
    def _build_subspace_indices(n_features, kernel_size):
        """Return (window indices, complement indices) for every sliding window."""
        if not 0 < kernel_size < n_features:
            raise ValueError(
                f'kernel_size must be in [1, n_features - 1], '
                f'got kernel_size={kernel_size} with n_features={n_features}.'
            )
        # The window count is computed explicitly: slicing with
        # ``[: -kernel_size + 1]`` degenerates to ``[:0]`` when kernel_size is 1.
        n_windows = n_features - kernel_size + 1
        all_idx = np.arange(n_windows)[:, None] + np.arange(kernel_size)
        feature_ids = np.arange(n_features)
        all_idx_complement = np.array(
            [np.setdiff1d(feature_ids, row) for row in all_idx]
        )
        return all_idx, all_idx_complement

    @staticmethod
    def _parse_hidden_dims(hidden_dims):
        """Normalise a list / comma-separated str / int layer spec to a list of ints."""
        if isinstance(hidden_dims, str):
            hidden_dims = hidden_dims.split(',')
        elif isinstance(hidden_dims, (int, np.integer)):
            hidden_dims = [hidden_dims]
        return [int(a) for a in hidden_dims]

    def forward(self, x):
        """
        Encode the sub-vectors and complements of a batch.

        Parameters
        ----------
        x: torch.Tensor of shape (n_samples, n_features)
            The input data.

        Returns
        -------
        x1: torch.Tensor
            Normalised output of ``enc_g_net`` on the sub-vectors.

        x2: torch.Tensor
            Normalised output of ``enc_f_net`` on the complements.
        """
        x1, x2 = self.positive_matrix_builder(data=x)
        x1 = self.enc_g_net(x1)
        x2 = self.enc_f_net(x2)
        # NOTE(review): dim=1 is F.normalize's default and is kept to preserve
        # behaviour. If the encoders return [n_samples, n_subvectors, rep_dim],
        # this normalises across sub-vectors rather than across the
        # representation axis -- confirm this is intended.
        x1 = F.normalize(x1, dim=1)
        x2 = F.normalize(x2, dim=1)
        return x1, x2

    def positive_matrix_builder(self, data):
        """
        Generate matrix of sub-vectors and matrix of complement vectors (positive pairs)

        Parameters
        ----------
        data: torch.Tensor shape (n_samples, n_features), required
            The input data.

        Returns
        -------
        matrix: torch.Tensor of shape [n_samples, number of sub-vectors, kernel_size]
            Derived sub-vectors.

        complement_matrix: torch.Tensor of shape [n_samples, number of sub-vectors, n_features-kernel_size]
            Complement vector of derived sub-vectors.
        """
        # Gather feature columns directly with the 2-D index tables; this gives
        # the same values as tiling the batch to [size, dim, dim] and indexing
        # row-wise, without the extra copy.
        sub_idx = torch.as_tensor(self.all_idx, dtype=torch.long, device=data.device)
        comp_idx = torch.as_tensor(self.all_idx_complement, dtype=torch.long, device=data.device)
        matrix = data[:, sub_idx]
        complement_matrix = data[:, comp_idx]
        return matrix, complement_matrix