"""
TCN is adapted from https://github.com/locuslab/TCN
"""
import numpy as np
from deepod.core.base_model import BaseDeepAD
from deepod.core.networks.ts_network_tcn import TcnAE
from deepod.metrics import ts_metrics, point_adjustment
import time
import torch
from torch.utils.data import DataLoader
from ray import tune
from ray.air import session, Checkpoint
[docs]class TcnED(BaseDeepAD):
"""
An Evaluation of Anomaly Detection and Diagnosis in Multivariate Time Series (TNNLS'21)
Temporal Convolutional Network for Anomaly Detection in Multivariate Time Series.
Args:
seq_len (int):
The length of the input sequences for the network. Default is 100.
stride (int):
The stride of the convolutional operation. Default is 1.
epochs (int):
The number of training epochs. Default is 10.
batch_size (int):
The batch size used in training. Default is 32.
lr (float):
The learning rate for the optimizer. Default is 1e-4.
rep_dim (int):
The dimensionality of the latent representation (embedding) layer. Default is 32.
hidden_dims (int):
The number of hidden units in each layer. Default is 32.
kernel_size (int):
The size of the kernel in the convolutional layers. Default is 3.
act (str):
The activation function used in the network. Default is 'ReLU'.
bias (bool):
Whether to use bias in the convolutional layers. Default is True.
dropout (float):
The dropout rate used in the network. Default is 0.2.
epoch_steps (int):
The number of steps per epoch. Default is -1, indicating use of the full dataset.
prt_steps (int):
The interval of epochs at which to print training progress. Default is 1.
device (str):
The device on which to train the model, 'cuda' or 'cpu'. Default is 'cuda'.
verbose (int):
The verbosity level. Default is 2.
random_state (int):
The seed for random number generation. Default is 42.
"""
def __init__(self, seq_len=100, stride=1, epochs=10, batch_size=32, lr=1e-4,
rep_dim=32, hidden_dims=32, kernel_size=3, act='ReLU', bias=True, dropout=0.2,
epoch_steps=-1, prt_steps=1, device='cuda',
verbose=2, random_state=42):
"""
Initializes the TcnED model with specified parameters.
"""
super(TcnED, self).__init__(
model_name='TcnED', data_type='ts', epochs=epochs, batch_size=batch_size, lr=lr,
seq_len=seq_len, stride=stride,
epoch_steps=epoch_steps, prt_steps=prt_steps, device=device,
verbose=verbose, random_state=random_state
)
self.hidden_dims = hidden_dims
self.rep_dim = rep_dim
self.kernel_size = kernel_size
self.dropout = dropout
self.act = act
self.bias = bias
return
[docs] def training_prepare(self, X, y=None):
"""
Sets up the model for training including the data loader, network, and loss criterion.
Args:
X (numpy.ndarray):
The input features for training.
y (numpy.ndarray, optional):
The target values for training. Defaults to None.
Returns:
tuple:
A tuple containing the training data loader, network, and loss criterion.
"""
train_loader = DataLoader(X, batch_size=self.batch_size, shuffle=True)
net = TcnAE(
n_features=self.n_features,
n_hidden=self.hidden_dims,
n_emb=self.rep_dim,
activation=self.act,
bias=self.bias,
kernel_size=self.kernel_size,
dropout=self.dropout
).to(self.device)
criterion = torch.nn.MSELoss(reduction="mean")
if self.verbose >= 2:
print(net)
return train_loader, net, criterion
[docs] def inference_prepare(self, X):
"""
Prepares the model for inference, including setting up the data loader.
Args:
X (numpy.ndarray):
The input features for inference.
Returns:
DataLoader:
A data loader containing the test dataset.
"""
test_loader = DataLoader(X, batch_size=self.batch_size,
drop_last=False, shuffle=False)
self.criterion = torch.nn.MSELoss(reduction="none")
return test_loader
[docs] def training_forward(self, batch_x, net, criterion):
"""
Conducts a forward training pass with a batch of data.
Args:
batch_x (Tensor):
The batch of training data.
net (torch.nn.Module):
The network model.
criterion (callable):
The loss criterion.
Returns:
Tensor:
The loss for the training batch.
"""
ts_batch = batch_x.float().to(self.device)
output, _ = net(ts_batch)
loss = criterion(output[:, -1], ts_batch[:, -1])
return loss
[docs] def inference_forward(self, batch_x, net, criterion):
"""
Conducts a forward inference pass with a batch of data.
Args:
batch_x (Tensor):
The batch of inference data.
net (torch.nn.Module):
The network model.
criterion (callable):
The loss criterion used to compute the error.
Returns:
tuple:
A tuple containing the output and the error for the inference batch.
"""
batch_x = batch_x.float().to(self.device)
output, _ = net(batch_x)
error = torch.nn.L1Loss(reduction='none')(output[:, -1], batch_x[:, -1])
error = torch.sum(error, dim=1)
return output, error
def _training_ray(self, config, X_test, y_test):
"""
Internal method for training using Ray Tune for hyperparameter search.
Args:
config (dict):
The configuration dictionary for Ray Tune.
X_test (numpy.ndarray):
The test dataset features.
y_test (numpy.ndarray):
The test dataset labels.
"""
train_data = self.train_data[:int(0.8 * len(self.train_data))]
val_data = self.train_data[int(0.8 * len(self.train_data)):]
train_loader = DataLoader(train_data, batch_size=self.batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=self.batch_size, shuffle=True)
criterion = torch.nn.MSELoss(reduction="mean")
self.net = self.set_tuned_net(config)
optimizer = torch.optim.Adam(self.net.parameters(), lr=config['lr'], eps=1e-6)
self.net.train()
for i in range(config['epochs']):
t1 = time.time()
total_loss = 0
cnt = 0
for batch_x in train_loader:
loss = self.training_forward(batch_x, self.net, criterion)
self.net.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
cnt += 1
# terminate this epoch when reaching assigned maximum steps per epoch
if cnt > self.epoch_steps != -1:
break
# validation phase
val_loss = []
with torch.no_grad():
for batch_x in val_loader:
loss = self.training_forward(batch_x, self.net, criterion)
val_loss.append(loss)
val_loss = torch.mean(torch.stack(val_loss)).data.cpu().item()
test_metric = -1
if X_test is not None and y_test is not None:
scores = self.decision_function(X_test)
adj_eval_metrics = ts_metrics(y_test, point_adjustment(y_test, scores))
test_metric = adj_eval_metrics[2] # use adjusted Best-F1
t = time.time() - t1
if self.verbose >= 1 and (i == 0 or (i+1) % self.prt_steps == 0):
print(f'epoch{i+1:3d}, '
f'training loss: {total_loss/cnt:.6f}, '
f'validation loss: {val_loss:.6f}, '
f'test F1: {test_metric:.3f}, '
f'time: {t:.1f}s')
checkpoint_data = {
"epoch": i,
"net_state_dict": self.net.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
}
checkpoint = Checkpoint.from_dict(checkpoint_data)
session.report(
{"loss": val_loss, "metric": test_metric},
checkpoint=checkpoint,
)
[docs] def load_ray_checkpoint(self, best_config, best_checkpoint):
"""
Loads the best model checkpoint from Ray Tune.
Args:
best_config (dict):
The best configuration found by Ray Tune.
best_checkpoint (dict):
The checkpoint data to load.
"""
self.net = self.set_tuned_net(best_config)
self.net.load_state_dict(best_checkpoint['net_state_dict'])
return
[docs] def set_tuned_net(self, config):
"""
Sets up the network model with tuned hyperparameters.
Args:
config (dict):
The configuration dictionary containing the hyperparameters.
Returns:
TcnAE:
The initialized network model with the specified hyperparameters.
"""
net = TcnAE(
n_features=self.n_features,
n_hidden=config['hidden_dims'],
n_emb=config['rep_dim'],
activation=self.act,
bias=self.bias,
kernel_size=config['kernel_size'],
dropout=self.dropout
).to(self.device)
return net
[docs] @staticmethod
def set_tuned_params():
"""
Defines the grid of hyperparameters for tuning.
Returns:
dict:
A configuration dictionary for Ray Tune.
"""
config = {
'lr': tune.grid_search([1e-5, 1e-4, 1e-3, 1e-2]),
'epochs': tune.grid_search([20, 50, 100]),
'rep_dim': tune.choice([16, 64, 128, 512]),
'hidden_dims': tune.choice(['100,100', '100']),
'kernel_size': tune.choice([2, 3, 5])
}
return config