读研之掉进故障检测(六.5)—基于分类的一般数据异常检测/GOAD-master代码学习记录

(77) 2024-06-24 10:01:01

读研之掉进故障检测(六.5)—代码学习记录


文章目录

  • 前言
  • 一、train_ad_tabular.py
  • 二、opt_tc_tabular.py
  • 三、fcnet.py
  • 四、总结

前言

-------本文是一个论文对应的开源代码笔记。
读研之掉进故障检测(六)—基于分类的一般数据异常检测
论文的代码在:https://github.com/lironber/GOAD
论文链接:https://arxiv.org/abs/2005.02359


一、train_ad_tabular.py

1.代码运行,主要是定义了一些参数,然后执行train_anomaly_detector()函数。

if __name__ == '__main__': parser = argparse.ArgumentParser() # 定义了一个参数解释器 # 使用 add_argument 方法向解析器添加了各种命令行参数及其默认值 parser.add_argument('--lr', default=0.001, type=float) parser.add_argument('--n_rots', default=32, type=int) parser.add_argument('--batch_size', default=64, type=int) parser.add_argument('--n_epoch', default=10, type=int) parser.add_argument('--d_out', default=4, type=int) parser.add_argument('--dataset', default='kdd', type=str) #更换数据在此处 parser.add_argument('--exp', default='affine', type=str) parser.add_argument('--c_pr', default=0, type=int) parser.add_argument('--true_label', default=1, type=int) parser.add_argument('--ndf', default=8, type=int) parser.add_argument('--m', default=1, type=float) parser.add_argument('--lmbda', default=0.1, type=float) parser.add_argument('--eps', default=0, type=float) parser.add_argument('--n_iters', default=500, type=int) args = parser.parse_args() print("Dataset: ", args.dataset) if args.dataset == 'thyroid' or args.dataset == 'arrhythmia': n_iters = args.n_iters f_scores = np.zeros(n_iters) for i in range(n_iters): f_scores[i] = train_anomaly_detector(args) #主要功能实现的函数 print("AVG f1_score", f_scores.mean()) else: train_anomaly_detector(args)#主要功能实现的函数 

2、数据下载与转换
这里可以说是论文中比较有意思的一个点,因为这里是通过转换实现了table dataset的处理。
核心的转换为:
------由train_real中每个样本与rots中的旋转矩阵进行点乘,然后通过np.stack函数用于将列表中所有元素沿着第二维度(轴索引为2)进行堆叠最终获取x_train。具体的来说,它将每个样本的结果按顺序排列,并成一个新的数组。

def load_trans_data(args): dl = Data_Loader() #根据args中的参数进行数据集的下载 train_real, val_real, val_fake = dl.get_dataset(args.dataset, args.c_pr) y_test_fscore = np.concatenate([np.zeros(len(val_real)), np.ones(len(val_fake))]) ratio = 100.0 * len(val_real) / (len(val_real) + len(val_fake)) #生成随机旋转矩阵 rots,其形状为 (args.n_rots, n_dims, args.d_out), #分别表示矩阵数目,训练集数据的维度,表示旋转后的特征维度 n_train, n_dims = train_real.shape rots = np.random.randn(args.n_rots, n_dims, args.d_out) #在KDD数据集中,rots为32/121/4 print('Calculating transforms') x_train = np.stack([train_real.dot(rot) for rot in rots], 2) val_real_xs = np.stack([val_real.dot(rot) for rot in rots], 2) val_fake_xs = np.stack([val_fake.dot(rot) for rot in rots], 2) x_test = np.concatenate([val_real_xs, val_fake_xs]) return x_train, x_test, y_test_fscore, ratio 

3.训练数据 ,关于函数tc_obj.fit_trans_classifier(x_train, x_test, y_test, ratio)的具体内容在opt_tc_tabular.py

def train_anomaly_detector(args): x_train, x_test, y_test, ratio = load_trans_data(args) tc_obj = tc.TransClassifierTabular(args) f_score = tc_obj.fit_trans_classifier(x_train, x_test, y_test, ratio) return f_score 

二、opt_tc_tabular.py

model.netC1()与model.netC5()模型在fcnet.py。

1、介绍TransClassifierTabular():类。
类中的核心函数为fit_trans_classifier()

class TransClassifierTabular(): def __init__(self, args): self.ds = args.dataset self.m = args.m self.lmbda = args.lmbda self.batch_size = args.batch_size self.ndf = args.ndf self.n_rots = args.n_rots self.d_out = args.d_out self.eps = args.eps self.n_epoch = args.n_epoch if args.dataset == "thyroid" or args.dataset == "arrhythmia": self.netC = model.netC1(self.d_out, self.ndf, self.n_rots).cuda() else: self.netC = model.netC5(self.d_out, self.ndf, self.n_rots).cuda() model.weights_init(self.netC) self.optimizerC = optim.Adam(self.netC.parameters(), lr=args.lr, betas=(0.5, 0.999)) 

代码运行的逻辑为如下

在每次训练轮次中:

  1. 将网络模型self.netc设置为训练模式,通过调用train()
  2. 使用np.random.permutation(len(train_xs))生成训练样本的随机排列索引,并将结果存储在rp中。可以理解为在每个训练批次中随机选择样本。
  3. 初始化变量n_batch 记录当前批次的数量。
  4. 创建一个形状为(self.ndf, self.n_rots)的全0张量,并使用GPU。
  5. 数据批处理循环中:
    1. 将网络模型的梯度清零。
    2. 获取当前批次的实际大小 batch_range。
    3. 获取当前批次对应的训练标签 train_labels,如果是最后一批,则根据旋转矩阵数量创建新的标签,保证标签与样本一一对应。
    4. 根据当前批次的索引值,从训练数据集 train_xs 中获取对应的样本xs,并转换为张量类型,然后将其移动到 GPU 上(如果可用)。
    5. 将xs作为输入,放入模型中获取tc_zs, ce_zs
    6. 将 tc_zs 在第一个和第二个维度上交换位置,通过调用 permute() 方法,使得维度顺序变为 (batch_range, self.n_rots, self.ndf)。
    7. 计算交叉熵损失函数 loss_ce,通过调用 celoss() 方法,传入输出结果 ce_zs 和标签 train_labels。
    8. 计算总体损失函数 er,包括分类器损失和正则化项,通过调用 tc_loss() 方法计算平均的三元组损失 tc_loss(tc_zs, self.m),并乘以超参数 self.lmbda,然后加上交叉熵损失 loss_ce。
    9. 执行反向传播,执行一步优化器的更新。更新批次数量 n_batch,结束数据批处理循环。
  6. 计算 sum_zs 每列维度的均值,通过除以批次数量 n_batch,得到形状为 (self.n_rots, self.ndf) 的均值张量
  7. 将均值张量按列转置,并添加一个维度,得到形状为 (1, self.ndf, self.n_rots) 的新张量 means。
  8. 将网络模型 self.netC 设置为评估模式,通过调用 eval() 方法。
 def fit_trans_classifier(self, train_xs, x_test, y_test, ratio): labels = torch.arange(self.n_rots).unsqueeze(0).expand((self.batch_size, self.n_rots)).long().cuda() celoss = nn.CrossEntropyLoss() print('Training') for epoch in range(self.n_epoch): self.netC.train() # 使用该函数生成训练样本的随机排列索引,并将结果存储在变量rp rp = np.random.permutation(len(train_xs)) n_batch = 0 sum_zs = torch.zeros((self.ndf, self.n_rots)).cuda() for i in range(0, len(train_xs), self.batch_size): self.netC.zero_grad() batch_range = min(self.batch_size, len(train_xs) - i) train_labels = labels if batch_range == len(train_xs) - i: train_labels = torch.arange(self.n_rots).unsqueeze(0).expand((len(train_xs) - i, self.n_rots)).long().cuda() #新label也要满足维度 idx = np.arange(batch_range) + i xs = torch.from_numpy(train_xs[rp[idx]]).float().cuda() tc_zs, ce_zs = self.netC(xs) sum_zs = sum_zs + tc_zs.mean(0) tc_zs = tc_zs.permute(0, 2, 1) # 将tc_zs在第一个和第二个维度上交换位置,通过调用permute方法,使得顺序变为(batch_range,self.n_rots,self.ndf) loss_ce = celoss(ce_zs, train_labels) er = self.lmbda * tc_loss(tc_zs, self.m) + loss_ce er.backward() self.optimizerC.step() n_batch += 1 means = sum_zs.t() / n_batch means = means.unsqueeze(0) self.netC.eval() with torch.no_grad(): val_probs_rots = np.zeros((len(y_test), self.n_rots)) for i in range(0, len(x_test), self.batch_size): batch_range = min(self.batch_size, len(x_test) - i) idx = np.arange(batch_range) + i xs = torch.from_numpy(x_test[idx]).float().cuda() zs, fs = self.netC(xs) zs = zs.permute(0, 2, 1) diffs = ((zs.unsqueeze(2) - means) ** 2).sum(-1) diffs_eps = self.eps * torch.ones_like(diffs) diffs = torch.max(diffs, diffs_eps) logp_sz = torch.nn.functional.log_softmax(-diffs, dim=2) val_probs_rots[idx] = -torch.diagonal(logp_sz, 0, 1, 2).cpu().data.numpy() val_probs_rots = val_probs_rots.sum(1) f1_score = f_score(val_probs_rots, y_test, ratio) print("Epoch:", epoch, ", fscore: ", f1_score) return f1_score 

关于三元损失函数tc_loss的代码,以及分析。

def tc_loss(zs, m): means = zs.mean(0).unsqueeze(0) # 计算zs的平均值,同时使用unsqueeze(0)来增加维度,得到形状为 (1, n_rots, ndf) 的 means 张量 res = ((zs.unsqueeze(2) - means.unsqueeze(1)) ** 2).sum(-1) # 计算每个特征向量与均值之间的欧氏距离平方,得到形状为 (batch_size, n_rots) 的 res 张量 pos = torch.diagonal(res, dim1=1, dim2=2) # 提取 res 张量的对角线元素,即正样本对应的距离 offset = torch.diagflat(torch.ones(zs.size(1))).unsqueeze(0).cuda() * 1e6 # 创建一个形状与 res 相同的对角矩阵,并在 GPU 上进行计算(如果可用) neg = (res + offset).min(-1)[0] # 将 res 和 offset 相加,并沿着最后一个维度找到每个样本的最小值。这相当于将负样本之间的距离增加了一个固定的偏移量 loss = torch.clamp(pos + m - neg, min=0).mean() # 计算三元损失函数的值。首先,计算 pos + m - neg,然后通过 torch.clamp 函数将其限制在非负范围内。最后,取平均值作为最终的损失值,并返回该值 return loss 

三、fcnet.py

import torch.nn as nn import torch.nn.init as init import numpy as np def weights_init(m): classname = m.__class__.__name__ if isinstance(m, nn.Linear): init.xavier_normal_(m.weight, gain=np.sqrt(2.0)) elif classname.find('Conv') != -1: init.xavier_normal_(m.weight, gain=np.sqrt(2.0)) elif classname.find('Linear') != -1: init.eye_(m.weight) elif classname.find('Emb') != -1: init.normal(m.weight, mean=0, std=0.01) class netC5(nn.Module): def __init__(self, d, ndf, nc): super(netC5, self).__init__() self.trunk = nn.Sequential( nn.Conv1d(d, ndf, kernel_size=1, bias=False), nn.LeakyReLU(0.2, inplace=True), nn.Conv1d(ndf, ndf, kernel_size=1, bias=False), nn.LeakyReLU(0.2, inplace=True), nn.Conv1d(ndf, ndf, kernel_size=1, bias=False), nn.LeakyReLU(0.2, inplace=True), nn.Conv1d(ndf, ndf, kernel_size=1, bias=False), nn.LeakyReLU(0.2, inplace=True), nn.Conv1d(ndf, ndf, kernel_size=1, bias=False), ) self.head = nn.Sequential( nn.LeakyReLU(0.2, inplace=True), nn.Conv1d(ndf, nc, kernel_size=1, bias=True), ) def forward(self, input): tc = self.trunk(input) ce = self.head(tc) return tc, ce class netC1(nn.Module): def __init__(self, d, ndf, nc): super(netC1, self).__init__() self.trunk = nn.Sequential( nn.Conv1d(d, ndf, kernel_size=1, bias=False), ) self.head = nn.Sequential( nn.LeakyReLU(0.2, inplace=True), nn.Conv1d(ndf, nc, kernel_size=1, bias=True), ) def forward(self, input): tc = self.trunk(input) ce = self.head(tc) return tc, ce 

四、总结

那么关于原论文中的GOAD算法中的代码实现部分在哪儿?
读研之掉进故障检测(六.5)—基于分类的一般数据异常检测/GOAD-master代码学习记录 (https://mushiming.com/)  第1张
关于训练算法中的公式3。依靠函数tc_loss()进行实现。
读研之掉进故障检测(六.5)—基于分类的一般数据异常检测/GOAD-master代码学习记录 (https://mushiming.com/)  第2张
关于评估算法中的公式4与公式5.
读研之掉进故障检测(六.5)—基于分类的一般数据异常检测/GOAD-master代码学习记录 (https://mushiming.com/)  第3张
读研之掉进故障检测(六.5)—基于分类的一般数据异常检测/GOAD-master代码学习记录 (https://mushiming.com/)  第4张

 with torch.no_grad(): val_probs_rots = np.zeros((len(y_test), self.n_rots)) for i in range(0, len(x_test), self.batch_size): batch_range = min(self.batch_size, len(x_test) - i) idx = np.arange(batch_range) + i xs = torch.from_numpy(x_test[idx]).float().cuda() zs, fs = self.netC(xs) zs = zs.permute(0, 2, 1) diffs = ((zs.unsqueeze(2) - means) ** 2).sum(-1) diffs_eps = self.eps * torch.ones_like(diffs) diffs = torch.max(diffs, diffs_eps) logp_sz = torch.nn.functional.log_softmax(-diffs, dim=2) val_probs_rots[idx] = -torch.diagonal(logp_sz, 0, 1, 2).cpu().data.numpy() val_probs_rots = val_probs_rots.sum(1) f1_score = f_score(val_probs_rots, y_test, ratio) print("Epoch:", epoch, ", fscore: ", f1_score) 
THE END

发表回复