Source code for deepod.models.time_series.dif

# -*- coding: utf-8 -*-
"""
Deep isolation forest for anomaly detection
@Author: Hongzuo Xu <hongzuoxu@126.com, xuhongzuo13@nudt.edu.cn>
"""

from sklearn.utils import check_array
from sklearn.ensemble import IsolationForest
from deepod.core.base_model import BaseDeepAD
from deepod.core.networks.ts_network_dilated_conv import DilatedConvEncoder
from deepod.utils.utility import get_sub_seqs
from deepod.models.tabular.dif import cal_score
from tqdm import tqdm
import torch
from torch.utils.data import DataLoader
import numpy as np


[docs]class DeepIsolationForestTS(BaseDeepAD): """ Deep isolation forest for anomaly detection (TKDE'23) Implementation of a Deep Isolation Forest model for time-series anomaly detection, as described in TKDE'23. This model combines deep learning methods for dimensionality reduction with the traditional Isolation Forest algorithm to detect anomalies in time-series data. Args: epochs (int, optional): Number of training epochs. Default is 100. batch_size (int, optional): Batch size for training. Default is 1000. lr (float, optional): Learning rate for the optimizer. Default is 1e-3. seq_len (int, optional): Length of the input sequences. Default is 100. stride (int, optional): Stride of the sliding window over the time series. Default is 1. hidden_dims (str, optional): String representation of the hidden layer dimensions, separated by commas. bias (bool, optional): If True, adds a bias term to the layers of the neural network. Default is False. n_ensemble (int, optional): Number of ensemble models to train. n_estimators (int, optional): Number of base estimators in the Isolation Forest. Default is 6. max_samples (int, optional): Maximum number of samples to draw to train each base estimator. Default is 256. n_jobs (int, optional): Number of jobs to run in parallel for Isolation Forest training. Default is 1. epoch_steps (int, optional): Number of steps per epoch. If -1, all batches will be processed. prt_steps (int, optional): Interval of epochs at which to print progress updates. device (str, optional): Device to use for training ('cuda' or 'cpu'). Default is 'cuda'. verbose (int, optional): Verbosity mode. 0 = silent, 1 = progress bar, 2 = one line per epoch. random_state (int, optional): Seed for random number generation for reproducibility. Default is 42. """ def __init__(self, epochs=100, batch_size=1000, lr=1e-3, seq_len=100, stride=1, rep_dim=128, hidden_dims=100, bias=False, n_ensemble=50, n_estimators=6, max_samples=256, n_jobs=1, epoch_steps=-1, prt_steps=10, device='cuda', verbose=2, random_state=42): """ Initializes the Deep Isolation Forest Time-Series model with the specified hyperparameters. """ super(DeepIsolationForestTS, self).__init__( model_name='DIF', data_type='ts', network='DilatedConv', epochs=epochs, batch_size=batch_size, lr=lr, seq_len=seq_len, stride=stride, epoch_steps=epoch_steps, prt_steps=prt_steps, device=device, verbose=verbose, random_state=random_state ) self.hidden_dims = hidden_dims self.rep_dim = rep_dim self.bias = bias self.n_ensemble = n_ensemble self.n_estimators = n_estimators self.max_samples = max_samples self.n_jobs = n_jobs self.net_lst = [] self.iForest_lst = [] self.x_reduced_lst = [] self.score_lst = [] return
[docs] def fit(self, X, y=None): """ Fits the Deep Isolation Forest model on the provided time-series data. Args: X (np.ndarray): The input samples of shape (n_samples, n_features). y (np.ndarray, optional): Target values of shape (n_samples, ) (ignored in unsupervised training). Returns: self: The fitted estimator. """ X_seqs = get_sub_seqs(X, seq_len=self.seq_len, stride=self.stride) y_seqs = get_sub_seqs(y, seq_len=self.seq_len, stride=self.stride) if y is not None else None self.train_data = X_seqs self.train_label = y_seqs self.n_samples, self.n_features = X_seqs.shape[0], X_seqs.shape[2] if self.verbose >= 1: print('Start Training...') network_params = { 'n_features': self.n_features, 'n_hidden': self.hidden_dims, 'n_output': self.rep_dim, 'bias': self.bias } if self.verbose >= 2: iteration = tqdm(range(self.n_ensemble)) else: iteration = range(self.n_ensemble) ensemble_seeds = np.random.randint(0, 100000, self.n_ensemble) for i in iteration: net = DilatedConvEncoder(**network_params) net = net.to(self.device) torch.manual_seed(ensemble_seeds[i]) for name, param in net.named_parameters(): torch.nn.init.normal_(param, mean=0., std=1.) x_reduced = self._deep_transfer(self.train_data, net, self.batch_size, self.device) self.x_reduced_lst.append(x_reduced) self.net_lst.append(net) self.iForest_lst.append(IsolationForest(n_estimators=self.n_estimators, max_samples=self.max_samples, n_jobs=self.n_jobs, random_state=ensemble_seeds[i])) self.iForest_lst[i].fit(x_reduced) if self.verbose >= 1: print('Start Inference on the training data...') self.decision_scores_ = self.decision_function(X) self.labels_ = self._process_decision_scores() return self
[docs] def decision_function(self, X): """ Predict raw anomaly scores of X using the fitted detector. The anomaly score of an input sample is computed based on the fitted detector. For consistency, outliers are assigned with higher anomaly scores. Args: X (np.ndarray): The input samples of shape (n_samples, n_features). Sparse matrices are accepted only if they are supported by the base estimator. Returns: anomaly_scores (np.ndarray): The anomaly score of the input samples with the shape of (n_samples,). """ if self.verbose >= 1: print('Start Inference...') testing_n_samples = X.shape[0] X = get_sub_seqs(X, seq_len=self.seq_len, stride=1) self.score_lst = np.zeros([self.n_ensemble, testing_n_samples]) if self.verbose >= 2: iteration = tqdm(range(self.n_ensemble)) else: iteration = range(self.n_ensemble) for i in iteration: x_reduced = self._deep_transfer(X, self.net_lst[i], self.batch_size, self.device) scores = cal_score(x_reduced, self.iForest_lst[i]) padding = np.zeros(self.seq_len-1) scores = np.hstack((padding, scores)) self.score_lst[i] = scores final_scores = np.average(self.score_lst, axis=0) return final_scores
@staticmethod def _deep_transfer(X, net, batch_size, device): """ Transfers the input data through the network to obtain reduced representations. Args: X (np.ndarray): The input samples to be reduced. net (nn.Module): The neural network model for dimensionality reduction. batch_size (int): Batch size for processing. device (str): The device on which to perform computations. Returns: x_reduced (np.ndarray): The reduced representation of the input samples. """ x_reduced = [] loader = DataLoader(dataset=X, batch_size=batch_size, drop_last=False, pin_memory=True, shuffle=False) for batch_x in loader: batch_x = batch_x.float().to(device) batch_x_reduced = net(batch_x).data.cpu().numpy() x_reduced.extend(batch_x_reduced) x_reduced = np.array(x_reduced) return x_reduced
[docs] def training_prepare(self, X, y): pass
[docs] def training_forward(self, batch_x, net, criterion): pass
[docs] def inference_prepare(self, X): pass
[docs] def inference_forward(self, batch_x, net, criterion): pass