Autoencoderで異常検知
PythonのニューラルネットワークライブラリKerasを使ってAutoencoderを実装し、クレジットカードの不正利用検知をしてみる。
データセット
kaggleのクレジットカード不正利用データを使う。 CSV形式でダウンロードできるので、Pandasで読み込む。
import pandas as pd df = pd.read_csv('creditcard.csv') df.head()
Time | V1 | V2 | ... | V27 | V28 | Amount | Class | |
---|---|---|---|---|---|---|---|---|
0 | 0.0 | -1.359807 | -0.072781 | ... | 0.133558 | -0.021053 | 149.62 | 0 |
1 | 0.0 | 0.085102 | -0.255425 | ... | -0.008983 | 0.014724 | 2.69 | 0 |
2 | 1.0 | 0.247676 | -1.514654 | ... | -0.055353 | -0.059752 | 378.66 | 0 |
3 | 1.0 | 0.377436 | -1.387024 | ... | 0.062723 | 0.061458 | 123.50 | 0 |
4 | 2.0 | -0.270533 | 0.817739 | ... | 0.219422 | 0.215153 | 69.99 | 0 |
5 rows × 31 columns
各行がそれぞれ1回分のカード決済に対応する。 Timeは0行目の決済からの経過秒数、Amountは決済額。 その他の情報は個人情報を含むため、PCAによって特徴量V1~V27に変換された形式で提供されている。 Classは0なら正常決済、1なら不正決済を表す。
データセットの読み込み
V1~V28, Amountを特徴量として、Classをラベルとして読み込む。特徴量は、scikit-learnのMinMaxScaler()関数を使用して、範囲0-1にスケールする。
from sklearn.preprocessing import MinMaxScaler feature = df.iloc[:,1:-1].values # V1~V28, Amount label = df['Class'].values # 正常/不正ラベル # 特徴量を範囲0-1にスケールする scaler = MinMaxScaler() feature = scaler.fit_transform(feature)
特徴量、ラベルを1/3ずつ、訓練セット、検証セット、テストセットに分割する。 scikit-learnのtrain_test_split()関数を使うと簡単に分割できる。
from sklearn.model_selection import train_test_split X_train, X_test, y_train, y_test = train_test_split(feature, label, test_size=0.333, shuffle=False) X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size=0.5, shuffle=False)
今回試すAutoencoderによる異常検知手法は、教師なし学習であるため、正常データのみを抽出しておく。
import numpy as np X_train_regular = X_train[np.where(y_train==0)] X_valid_regular = X_valid[np.where(y_valid==0)]
Autoencoderとは
ニューラルネットワークによる次元圧縮のアルゴリズム。
今回実装するAutoencoderは入力層、隠れ層(中間層)、出力層の3層から成る。 学習データ、教師データ共に同じデータを与え学習させることで、入力をそのまま出力するように学習させる。このため、入力層、出力層の次元は同じ。隠れ層の次元を入出力層の次元より小さくすることで、情報圧縮できる。
異常検知への応用では、正常データのみで学習させる。正常データでは入力に近い出力が得られ、学習時に与えていない異常データでは、入力と乖離した出力となる。入力と出力がどれだけずれているかを、異常スコアとし、適当な閾値を超過したらワーニングを出すというような手法となる。
以下の通り、Kerasを利用して実装した。
from keras.models import Sequential from keras.layers import Dense, LeakyReLU from keras.callbacks import EarlyStopping class Autoencoder: def __init__(self, input_dim, hidden_dim): self.model = Sequential() self.model.add(Dense(hidden_dim, input_shape=(input_dim,))) self.model.add(LeakyReLU()) self.model.add(Dense(input_dim)) self.model.compile(optimizer='adam', loss='mse') def fit(self, X_train, X_valid): es = EarlyStopping(monitor='val_loss', patience=3) self.model.fit(X_train, X_train, epochs=100, batch_size=10, validation_data=(X_valid, X_valid), callbacks=[es]) def calc_score(self, X): score = np.array([np.linalg.norm(x-x_pred) for x, x_pred in zip(X, ae.model.predict(X))]) return score
Autoencoderを学習させる。中間層のユニット数は入力の半数にしている。
ae = Autoencoder(X_train_regular.shape[1], X_train_regular.shape[1]//2) ae.fit(X_train_regular, X_valid_regular)
Train on 94766 samples, validate on 94829 samples
Epoch 1/100
94766/94766 [==============================] - 45s 478us/step - loss: 0.0025 - val_loss: 3.0553e-04
Epoch 2/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.8962e-04 - val_loss: 1.8584e-04
Epoch 3/100
94766/94766 [==============================] - 45s 474us/step - loss: 1.3686e-04 - val_loss: 1.6639e-04
Epoch 4/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.2192e-04 - val_loss: 1.5606e-04
Epoch 5/100
94766/94766 [==============================] - 45s 474us/step - loss: 1.1415e-04 - val_loss: 1.4786e-04
Epoch 6/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0980e-04 - val_loss: 1.4219e-04
Epoch 7/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0710e-04 - val_loss: 1.4130e-04
Epoch 8/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0502e-04 - val_loss: 1.3713e-04
Epoch 9/100
94766/94766 [==============================] - 45s 476us/step - loss: 1.0370e-04 - val_loss: 1.3665e-04
Epoch 10/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0272e-04 - val_loss: 1.4210e-04
Epoch 11/100
94766/94766 [==============================] - 45s 476us/step - loss: 1.0195e-04 - val_loss: 1.3461e-04
Epoch 12/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0159e-04 - val_loss: 1.3375e-04
Epoch 13/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0121e-04 - val_loss: 1.3333e-04
Epoch 14/100
94766/94766 [==============================] - 45s 476us/step - loss: 1.0094e-04 - val_loss: 1.3681e-04
Epoch 15/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0083e-04 - val_loss: 1.3767e-04
Epoch 16/100
94766/94766 [==============================] - 45s 476us/step - loss: 1.0073e-04 - val_loss: 1.3884e-04
kerasのEarlyStoppingをcallbackとしているため、検証セットのlossが減少しなくなった16エポックで学習が終了終わっている。GPU(GTX1060)で処理させて12分ほどかかった。
検証セットで閾値決定
検証セットのスコアをプロットする。
%matplotlib inline import matplotlib.pyplot as plt from sklearn.metrics import confusion_matrix # 検証セットのスコア計算 score = ae.calc_score(X_valid) regular_idx = np.where(y_valid==0)[0] anomaly_idx = np.where(y_valid==1)[0] # プロット plt.plot(regular_idx, score[regular_idx], linestyle='None', marker='.', color='gray', markerfacecolor='None', label='regular') plt.plot(anomaly_idx, score[anomaly_idx], linestyle='None', marker='.', color='orange', markerfacecolor='None', label='fraud') plt.legend()
グレーの点が正常決済、オレンジの点が不正決済。完全ではないが、やや分離てきている。 閾値を0.1~0.2あたりに設定するのが良さそうである。
閾値を0.0~0.2まで変化させて、評価指標として精度(予測が当たった割合)、偽陰性率(不正決済を正常決済と予測した割合)、偽陽性率(正常決済を不正決済と予測した割合)をプロットしてみる。
# 精度、偽陽性率、偽陰性率をプロット threshold = np.linspace(0, 0.2, 100) cms = [confusion_matrix(y_valid, np.where(score >= th, 1, 0)).ravel() for th in threshold] accuracy = np.array([(tp+tn) / (tp+fp+fn+tn) for tn, fp, fn, tp in cms]) fn_rate = np.array([fn / (tp+fn) for tn, fp, fn, tp in cms]) fp_rate = np.array([fp / (tn+fp) for tn, fp, fn, tp in cms]) plt.plot(threshold, accuracy, label='accuracy', linestyle='dashed') plt.plot(threshold, fn_rate, label='fn rate') plt.plot(threshold, fp_rate, label='fp rate') index = np.argmin(np.abs(fn_rate-fp_rate)) # 偽陽性率、偽陰性率が同程度になる閾値のインデックス plt.vlines(threshold[index], 0, 1.0, linestyle='dashed') plt.legend()
青が精度、緑が偽陽性率、オレンジが偽陰性率。今回は、偽陽性率、偽陰性率が同程度の水準になる値(約0.08)を使う。 この閾値での検証セットの偽陰性率、偽陽性率は約11%で、精度は89%となった。
print('threshold: ', threshold[index]) print('accuracy: ', accuracy[index]) print('fn_rate: ', fn_rate[index]) print('fp_rate: ', fp_rate[index])
threshold: 0.08282828282828283 # 閾値
accuracy: 0.8936546539907141 # 精度
fn_rate: 0.11038961038961038 # 偽陰性率
fp_rate: 0.10633877822185196 # 偽陽性率
テストセットで精度評価
テストセットのスコアをプロットする。
# テストセットのスコア計算 score = ae.calc_score(X_test) regular_idx = np.where(y_test==0)[0] anomaly_idx = np.where(y_test==1)[0] # プロット plt.plot(regular_idx, score[regular_idx], linestyle='None', marker='.', color='gray', markerfacecolor='None', label='regular') plt.plot(anomaly_idx, score[anomaly_idx], linestyle='None', marker='.', color='orange', markerfacecolor='None', label='fraud') plt.legend()
検証セットほどには分離できていないように見える。
混同行列を確認する。
cm = confusion_matrix(y_test, np.where(score >= threshold[index], 1, 0)) print(cm)
[[77799 16921]
[ 9 112]]
評価指標を確認する。
tn, fp, fn, tp = cm.ravel() print('accuracy: ', (tp+tn) / (tp+fp+fn+tn)) print('fn_rate: ', fn / (tp+fn)) print('fp_rate: ', fp / (tn+fp))
accuracy: 0.8214907054965679 # 閾値
fn_rate: 0.0743801652892562 # 偽陰性率
fp_rate: 0.1786423141891892 # 偽陽性率
精度が82%、偽陰性率が7%、偽陽性率が18%。 不正決済の見逃しは少ないが、誤検知がやや多めとなった。 もう少し精度を高くしたいところ。