Source code for deepod.models.tabular.slad

"""
scale learning-based deep anomaly detection
@Author: Hongzuo Xu <hongzuoxu@126.com, xuhongzuo13@nudt.edu.cn>
"""

from deepod.core.base_model import BaseDeepAD
from deepod.core.networks.base_networks import MLPnet, LinearBlock
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import torch.nn.functional as F
import torch


class SLAD(BaseDeepAD):
    """
    Fascinating Supervisory Signals and Where to Find Them:
    Deep Anomaly Detection with Scale Learning (ICML'23)

    Each input object is turned into a group of ``distribution_size``
    vectors: a random feature subspace is selected, projected to
    ``n_unified_features`` dimensions by a fixed (untrained) bias-free
    linear block, and labelled with a target derived from the feature
    weights of that subspace.  An MLP is trained to predict these targets,
    and the per-object loss averaged over the ensemble is the anomaly score.

    Parameters
    ----------
    epochs, batch_size, lr, epoch_steps, prt_steps, device, verbose, random_state
        Forwarded unchanged to ``BaseDeepAD.__init__``.
    hidden_dims
        Passed to ``MLPnet`` as ``n_hidden``.
    act
        Passed to ``MLPnet`` as ``activation``.
    distribution_size
        The member size in a group (c in the paper).
    n_slad_ensemble
        Number of ensemble members, i.e. number of groups of subspaces
        that are generated at fit time and re-used at scoring time.
    subspace_pool_size
        Number of distinct subspace lengths to draw.  Capped at the number
        of features; ``None`` means ``min(n_features, 256)``.
    magnify_factor
        Multiplier applied to the supervision targets.
    n_unified_features
        Dimensionality after transformation (h in the paper).
    """

    def __init__(self, epochs=100, batch_size=128, lr=1e-3,
                 hidden_dims=100, act='LeakyReLU',
                 distribution_size=10,  # the member size in a group, c in the paper
                 n_slad_ensemble=20,
                 subspace_pool_size=50,
                 magnify_factor=200,
                 n_unified_features=128,  # dimensionality after transformation, h in the paper
                 epoch_steps=-1, prt_steps=10, device='cuda',
                 verbose=2, random_state=42):
        super(SLAD, self).__init__(
            model_name='SLAD', epochs=epochs, batch_size=batch_size, lr=lr,
            epoch_steps=epoch_steps, prt_steps=prt_steps, device=device,
            verbose=verbose, random_state=random_state
        )

        self.hidden_dims = hidden_dims
        self.act = act

        self.distribution_size = distribution_size
        self.n_slad_ensemble = n_slad_ensemble

        self.max_subspace_len = None  # maximum length of subspace
        self.sampling_size = None  # number of objects per ensemble member
        self.subspace_pool_size = subspace_pool_size
        self.n_unified_features = n_unified_features
        self.magnify_factor = magnify_factor

        self.len_pool = None  # sorted pool of subspace lengths, set at fit time
        self.affine_network_lst = {}  # subspace length -> projection block
        self.subspace_indices_lst = []  # one list of subspaces per ensemble member
        self.f_weight = None  # per-feature weight, set at fit time

        self.net = None

    def training_prepare(self, X, y=None):
        """
        Build the training loader, the network and the loss.

        Parameters
        ----------
        X : numpy.ndarray
            Training data, indexed as ``X[rows, :]`` / ``X[:, cols]``.
        y : ignored
            Unused by this method.

        Returns
        -------
        train_loader : DataLoader
            Shuffled loader over the generated (x_new, y_new) pairs.
        net : MLPnet
            Network moved to ``self.device``.
        criterion : SLADLoss
            Loss with the default ``'mean'`` reduction.
        """
        self.adaptively_setting()

        if self.verbose >= 1:
            print(f'unified size: {self.n_unified_features}, '
                  f'subspace pool size: {self.subspace_pool_size}, '
                  f'ensemble size: {self.n_slad_ensemble}')

        # Start from a clean slate so that a second fit does not leave
        # projections / subspaces of an earlier fit behind; decision_function
        # reads subspace_indices_lst[i] and would otherwise pick up stale
        # subspaces that no longer match the re-initialised projections.
        self.affine_network_lst = {}
        self.subspace_indices_lst = []

        # randomly determines the pool of subspace sizes
        candidate_lens = np.arange(1, self.max_subspace_len + 1)
        self.len_pool = np.sort(
            np.random.choice(candidate_lens, self.subspace_pool_size, replace=False)
        )
        if self.verbose >= 1:
            print('len pool:', self.len_pool)

        # one bias-free, activation-free projection per subspace length
        for s in self.len_pool:
            self.affine_network_lst[s] = LinearBlock(
                in_channels=s, out_channels=self.n_unified_features,
                bias=False, activation=None
            )

        # calculate feature weight by Pearson correlation coefficient matrix
        self.f_weight = self._cal_f_weight(X)

        # get newly generated data with supervision signals
        x_new, y_new = self._transform_data_ensemble(X)
        train_set = TensorDataset(torch.from_numpy(x_new), torch.from_numpy(y_new))
        train_loader = DataLoader(train_set, batch_size=self.batch_size, shuffle=True)

        net = MLPnet(
            n_features=self.n_unified_features,
            n_hidden=self.hidden_dims,
            n_output=1,
            activation=self.act
        ).to(self.device)

        criterion = SLADLoss()

        return train_loader, net, criterion

    def training_forward(self, batch_x, net, criterion):
        """
        Compute the loss of one batch.

        ``batch_x`` is an (inputs, targets) pair as produced by the loaders
        built in this class.  The network output has its axis 2 squeezed
        before it is compared with the targets.
        """
        batch_x, batch_y = batch_x
        batch_x = batch_x.float().to(self.device)
        batch_y = batch_y.float().to(self.device)

        predict_y = net(batch_x).squeeze(dim=2)
        return criterion(predict_y, batch_y)

    def decision_function(self, X, return_rep=False):
        """
        Score every row of ``X``.

        For each ensemble member the subspaces stored at fit time are applied
        to all rows of ``X``; the un-reduced loss of the trained network is
        the member's score, and the scores are averaged over the members.

        ``return_rep`` is accepted but not used.

        Returns
        -------
        numpy.ndarray
            One score per row of ``X``.
        """
        criterion = SLADLoss(reduction='none')
        all_rows = np.arange(len(X))

        self.net.eval()
        all_score_lst = []
        for i in range(self.n_slad_ensemble):
            subspace_indices = self.subspace_indices_lst[i]
            x_new, y_new = self._transform_data(
                X, subspace_indices=subspace_indices, subset_idx=all_rows
            )
            test_loader = DataLoader(
                TensorDataset(torch.from_numpy(x_new), torch.from_numpy(y_new)),
                batch_size=self.batch_size, shuffle=False, drop_last=False
            )

            with torch.no_grad():
                score_lst = [
                    self.inference_forward(batch_x, self.net, criterion)
                    for batch_x in test_loader
                ]

            all_score_lst.append(torch.cat(score_lst).data.cpu().numpy())

        return np.average(np.array(all_score_lst), axis=0)

    def inference_forward(self, batch_x, net, criterion):
        """Same computation as :meth:`training_forward`."""
        return self.training_forward(batch_x, net, criterion)

    def inference_prepare(self, X):
        """No-op; :meth:`decision_function` builds its own loaders."""
        pass

    def _cal_f_weight(self, X):
        """
        Per-feature weights from the absolute Pearson correlation matrix.

        With fewer than 50 features, each feature's weight is its average
        absolute correlation with all features (NaN correlations count as 0),
        min-max normalised to [0, 1].  With 50 or more features, or when all
        averages coincide, every feature gets weight 1.
        """
        if self.n_features >= 50:
            return np.ones(self.n_features)

        # atleast_2d: np.corrcoef collapses to a scalar for a single feature
        pccs = np.abs(np.atleast_2d(np.corrcoef(X.T)))
        pccs[np.isnan(pccs)] = 0
        f_weight = np.average(pccs, axis=1)

        w_min = np.min(f_weight)
        w_range = np.max(f_weight) - w_min
        # A (numerically) zero range would make the normalisation 0/0 = NaN.
        # This always happens with two features, whose averages are equal.
        if w_range <= 1e-12:
            return np.ones(self.n_features)

        return (f_weight - w_min) / w_range

    def _transform_data_ensemble(self, X):
        """
        Generate the training data of all ensemble members.

        For every member, ``distribution_size`` subspaces are drawn (their
        lengths come from ``self.len_pool``) and stored in
        ``self.subspace_indices_lst``; ``self.sampling_size`` rows are drawn
        with replacement and transformed.  The random generator is seeded
        with ``self.random_state``.

        Returns
        -------
        x_new, y_new : numpy.ndarray
            Row-wise stack of the per-member outputs of ``_transform_data``.
        """
        # get newly generated data with supervisory signals
        x_new_lst = []
        y_new_lst = []
        rng = np.random.RandomState(seed=self.random_state)

        # with few features, allow a feature to repeat inside a subspace
        replace = self.n_features <= 10
        feature_ids = np.arange(self.n_features)
        row_ids = np.arange(X.shape[0])

        for _ in range(self.n_slad_ensemble):
            subspace_indices = [
                rng.choice(feature_ids, rng.choice(self.len_pool, 1), replace=replace)
                for _ in range(self.distribution_size)
            ]
            self.subspace_indices_lst.append(subspace_indices)

            subset_idx = rng.choice(row_ids, self.sampling_size, replace=True)

            # the size of newly generated data:
            # [sampling_size, distribution_size(c), n_unified_features(h)]
            x_new, y_new = self._transform_data(X, subspace_indices, subset_idx)

            x_new_lst.append(x_new)
            y_new_lst.append(y_new)

        return np.vstack(x_new_lst), np.vstack(y_new_lst)

    def _transform_data(self, X, subspace_indices, subset_idx):
        """
        generate new data sets with supervision according to subspace indices

        Returns
        -------
        x_new : numpy.ndarray, shape [len(subset_idx), distribution_size, n_unified_features]
            Projected subspace vectors.
        y_new : numpy.ndarray, shape [len(subset_idx), distribution_size]
            Supervision target of each subspace, repeated for every row.
        """
        n_rows = len(subset_idx)
        x_new = np.zeros([n_rows, self.distribution_size, self.n_unified_features])
        y_new = np.zeros([n_rows, self.distribution_size])

        # row selection does not depend on the subspace, so do it once
        x_rows = X[subset_idx, :]

        for ii, subspace_idx in enumerate(subspace_indices):
            n_f = len(subspace_idx)

            # select the columns in a separate step, otherwise the output
            # shape is 1-dim when the subspace has a single feature
            x_sub = x_rows[:, subspace_idx]

            # transform function: get transformed vectors
            x_new[:, ii, :] = self._transformation_function(x_sub, n_f)

            # use feature weight / number of features to set supervision
            # signals: the divisor is the original dimensionality, not the
            # width of the unified projection
            target = (np.sum(self.f_weight[subspace_idx]) / self.n_features) * self.magnify_factor
            y_new[:, ii] = target

        return x_new, y_new

    def _transformation_function(self, x, n_f):
        """
        transform phase to obtain a unified dimensionality

        Applies the projection stored for subspace length ``n_f`` in
        evaluation mode and without gradients; returns a numpy array.
        """
        # affine transform
        transform_net = self.affine_network_lst[n_f]
        transform_net.eval()
        with torch.no_grad():
            x_projected = transform_net(torch.from_numpy(x).float()).data.cpu().numpy()
        return x_projected

    def adaptively_setting(self):
        """
        Derive ``max_subspace_len``, ``subspace_pool_size`` and
        ``sampling_size`` from the data size.

        Reads ``self.n_samples`` and ``self.n_features``, which are not
        assigned in this class (presumably set by the base class before
        training -- confirm against ``BaseDeepAD``).
        """
        n_samples = self.n_samples
        n_features = self.n_features

        self.max_subspace_len = n_features

        if self.subspace_pool_size is None:
            self.subspace_pool_size = min(self.max_subspace_len, 256)
        else:
            self.subspace_pool_size = min(self.subspace_pool_size, self.max_subspace_len)

        # smaller data sets are over-sampled more heavily
        if n_samples < 500:
            factor = 50
        elif n_samples < 1000:
            factor = 20
        elif n_samples < 5000:
            factor = 5
        elif n_samples > 100000:
            factor = 1
        else:
            factor = 2

        per_member = int(n_samples / self.n_slad_ensemble) * factor
        self.sampling_size = max(per_member, 10)
class SLADLoss(torch.nn.Module):
    """
    Symmetric divergence between predicted and target score distributions.

    Both inputs are turned into distributions with a softmax over axis 1.
    With ``M`` the average of the two distributions, the per-row value is
    the sum over axis 1 of ``kl_div(log P, M) + kl_div(log Q, M)``.

    Parameters
    ----------
    reduction : {'mean', 'sum', 'none'}, default='mean'
        How the per-row values are reduced.
    """

    def __init__(self, reduction='mean'):
        super(SLADLoss, self).__init__()
        assert reduction in ['mean', 'none', 'sum'], 'unsupported reduction operation'

        self.reduction = reduction
        # not used by forward(); kept so existing attribute access keeps working
        self.kl = torch.nn.KLDivLoss(reduction='none')

    def forward(self, y_pred, y_true):
        """
        forward function

        Parameters:
            y_pred: torch.Tensor, shape = [batch_size, distribution_size]
                output of the network

            y_true: torch.Tensor, shape = [batch_size, distribution_size]
                ground truth labels

        Return:
            loss value: torch.Tensor
                scalar for 'mean' / 'sum'; shape [batch_size] for 'none'
        """
        preds_smax = F.softmax(y_pred, dim=1)
        true_smax = F.softmax(y_true, dim=1)
        total_m = 0.5 * (preds_smax + true_smax)

        # F.kl_div expects log-probabilities as its first argument.  Take
        # log_softmax of the raw inputs: applying it to the already
        # soft-maxed tensors would normalise twice and compare against the
        # wrong distributions (non-zero loss even for identical inputs).
        js1 = F.kl_div(F.log_softmax(y_pred, dim=1), total_m, reduction='none')
        js2 = F.kl_div(F.log_softmax(y_true, dim=1), total_m, reduction='none')
        js = torch.sum(js1 + js2, dim=1)

        if self.reduction == 'mean':
            return torch.mean(js)
        if self.reduction == 'sum':
            return torch.sum(js)
        # 'none': one value per row
        return js