momentum

Twitter: https://twitter.com/occ42

Autoencoderで異常検知

PythonニューラルネットワークライブラリKerasを使ってAutoencoderを実装し、クレジットカードの不正利用検知をしてみる。

データセット

kaggleのクレジットカード不正利用データを使う。 CSV形式でダウンロードできるので、Pandasで読み込む。

import pandas as pd

df = pd.read_csv('creditcard.csv')
df.head()
Time V1 V2 ... V27 V28 Amount Class
0 0.0 -1.359807 -0.072781 ... 0.133558 -0.021053 149.62 0
1 0.0 0.085102 -0.255425 ... -0.008983 0.014724 2.69 0
2 1.0 0.247676 -1.514654 ... -0.055353 -0.059752 378.66 0
3 1.0 0.377436 -1.387024 ... 0.062723 0.061458 123.50 0
4 2.0 -0.270533 0.817739 ... 0.219422 0.215153 69.99 0

5 rows × 31 columns

各行がそれぞれ1回分のカード決済に対応する。 Timeは0行目の決済からの経過秒数、Amountは決済額。 その他の情報は個人情報を含むため、PCAによって特徴量V1~V27に変換された形式で提供されている。 Classは0なら正常決済、1なら不正決済を表す。

データセットの読み込み

V1~V28, Amountを特徴量として、Classをラベルとして読み込む。特徴量は、scikit-learnのMinMaxScaler()関数を使用して、範囲0-1にスケールする。

from sklearn.preprocessing import MinMaxScaler

feature = df.iloc[:,1:-1].values        #  V1~V28, Amount
label = df['Class'].values              # 正常/不正ラベル
# 特徴量を範囲0-1にスケールする
scaler = MinMaxScaler()
feature = scaler.fit_transform(feature)

特徴量、ラベルを1/3ずつ、訓練セット、検証セット、テストセットに分割する。 scikit-learnのtrain_test_split()関数を使うと簡単に分割できる。

from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(feature, label, test_size=0.333, shuffle=False)
X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size=0.5, shuffle=False)

今回試すAutoencoderによる異常検知手法は、教師なし学習であるため、正常データのみを抽出しておく。

import numpy as np

X_train_regular = X_train[np.where(y_train==0)]
X_valid_regular = X_valid[np.where(y_valid==0)]

Autoencoderとは

ニューラルネットワークによる次元圧縮のアルゴリズム

今回実装するAutoencoderは入力層、隠れ層(中間層)、出力層の3層から成る。 学習データ、教師データ共に同じデータを与え学習させることで、入力をそのまま出力するように学習させる。このため、入力層、出力層の次元は同じ。隠れ層の次元を入出力層の次元より小さくすることで、情報圧縮できる。

異常検知への応用では、正常データのみで学習させる。正常データでは入力に近い出力が得られ、学習時に与えていない異常データでは、入力と乖離した出力となる。入力と出力がどれだけずれているかを、異常スコアとし、適当な閾値を超過したらワーニングを出すというような手法となる。

以下の通り、Kerasを利用して実装した。

from keras.models import Sequential
from keras.layers import Dense, LeakyReLU
from keras.callbacks import EarlyStopping

class Autoencoder:
    def __init__(self, input_dim, hidden_dim):
        self.model = Sequential()

        self.model.add(Dense(hidden_dim, input_shape=(input_dim,)))
        self.model.add(LeakyReLU())

        self.model.add(Dense(input_dim))
        self.model.compile(optimizer='adam', loss='mse')
    
    def fit(self, X_train, X_valid):
        es = EarlyStopping(monitor='val_loss', patience=3)
        self.model.fit(X_train, X_train, epochs=100, batch_size=10, validation_data=(X_valid, X_valid), callbacks=[es])
    
    def calc_score(self, X):
        score = np.array([np.linalg.norm(x-x_pred) for x, x_pred in zip(X, ae.model.predict(X))])
        return score

Autoencoderを学習させる。中間層のユニット数は入力の半数にしている。

ae = Autoencoder(X_train_regular.shape[1], X_train_regular.shape[1]//2)
ae.fit(X_train_regular, X_valid_regular)
Train on 94766 samples, validate on 94829 samples
Epoch 1/100
94766/94766 [==============================] - 45s 478us/step - loss: 0.0025 - val_loss: 3.0553e-04
Epoch 2/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.8962e-04 - val_loss: 1.8584e-04
Epoch 3/100
94766/94766 [==============================] - 45s 474us/step - loss: 1.3686e-04 - val_loss: 1.6639e-04
Epoch 4/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.2192e-04 - val_loss: 1.5606e-04
Epoch 5/100
94766/94766 [==============================] - 45s 474us/step - loss: 1.1415e-04 - val_loss: 1.4786e-04
Epoch 6/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0980e-04 - val_loss: 1.4219e-04
Epoch 7/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0710e-04 - val_loss: 1.4130e-04
Epoch 8/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0502e-04 - val_loss: 1.3713e-04
Epoch 9/100
94766/94766 [==============================] - 45s 476us/step - loss: 1.0370e-04 - val_loss: 1.3665e-04
Epoch 10/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0272e-04 - val_loss: 1.4210e-04
Epoch 11/100
94766/94766 [==============================] - 45s 476us/step - loss: 1.0195e-04 - val_loss: 1.3461e-04
Epoch 12/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0159e-04 - val_loss: 1.3375e-04
Epoch 13/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0121e-04 - val_loss: 1.3333e-04
Epoch 14/100
94766/94766 [==============================] - 45s 476us/step - loss: 1.0094e-04 - val_loss: 1.3681e-04
Epoch 15/100
94766/94766 [==============================] - 45s 475us/step - loss: 1.0083e-04 - val_loss: 1.3767e-04
Epoch 16/100
94766/94766 [==============================] - 45s 476us/step - loss: 1.0073e-04 - val_loss: 1.3884e-04

kerasのEarlyStoppingをcallbackとしているため、検証セットのlossが減少しなくなった16エポックで学習が終了終わっている。GPU(GTX1060)で処理させて12分ほどかかった。

検証セットで閾値決定

検証セットのスコアをプロットする。

%matplotlib inline
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# 検証セットのスコア計算
score = ae.calc_score(X_valid)
regular_idx = np.where(y_valid==0)[0]
anomaly_idx = np.where(y_valid==1)[0]
# プロット
plt.plot(regular_idx, score[regular_idx], linestyle='None', marker='.', color='gray', markerfacecolor='None', label='regular')
plt.plot(anomaly_idx, score[anomaly_idx], linestyle='None', marker='.', color='orange', markerfacecolor='None', label='fraud')
plt.legend()

f:id:core-dumped:20181114205202p:plain

グレーの点が正常決済、オレンジの点が不正決済。完全ではないが、やや分離てきている。 閾値を0.1~0.2あたりに設定するのが良さそうである。

閾値を0.0~0.2まで変化させて、評価指標として精度(予測が当たった割合)、偽陰性率(不正決済を正常決済と予測した割合)、偽陽性率(正常決済を不正決済と予測した割合)をプロットしてみる。

# 精度、偽陽性率、偽陰性率をプロット
threshold = np.linspace(0, 0.2, 100)
cms = [confusion_matrix(y_valid, np.where(score >= th, 1, 0)).ravel() for th in threshold]
accuracy = np.array([(tp+tn) / (tp+fp+fn+tn) for tn, fp, fn, tp in cms])
fn_rate = np.array([fn / (tp+fn) for tn, fp, fn, tp in cms])
fp_rate = np.array([fp / (tn+fp) for tn, fp, fn, tp in cms])
plt.plot(threshold, accuracy, label='accuracy', linestyle='dashed')
plt.plot(threshold, fn_rate, label='fn rate')
plt.plot(threshold, fp_rate, label='fp rate')
index = np.argmin(np.abs(fn_rate-fp_rate))     # 偽陽性率、偽陰性率が同程度になる閾値のインデックス
plt.vlines(threshold[index], 0, 1.0, linestyle='dashed')
plt.legend()

f:id:core-dumped:20181114205206p:plain

青が精度、緑が偽陽性率、オレンジが偽陰性率。今回は、偽陽性率、偽陰性率が同程度の水準になる値(約0.08)を使う。 この閾値での検証セットの偽陰性率、偽陽性率は約11%で、精度は89%となった。

print('threshold: ', threshold[index])
print('accuracy: ', accuracy[index])
print('fn_rate: ', fn_rate[index])
print('fp_rate: ', fp_rate[index])
threshold:  0.08282828282828283    # 閾値
accuracy:  0.8936546539907141    # 精度
fn_rate:  0.11038961038961038    # 偽陰性率
fp_rate:  0.10633877822185196    # 偽陽性率

テストセットで精度評価

テストセットのスコアをプロットする。

# テストセットのスコア計算
score = ae.calc_score(X_test)
regular_idx = np.where(y_test==0)[0]
anomaly_idx = np.where(y_test==1)[0]
# プロット
plt.plot(regular_idx, score[regular_idx], linestyle='None', marker='.', color='gray', markerfacecolor='None', label='regular')
plt.plot(anomaly_idx, score[anomaly_idx], linestyle='None', marker='.', color='orange', markerfacecolor='None', label='fraud')
plt.legend()

f:id:core-dumped:20181114205213p:plain

検証セットほどには分離できていないように見える。

混同行列を確認する。

cm = confusion_matrix(y_test, np.where(score >= threshold[index], 1, 0))
print(cm)
[[77799 16921]
 [    9   112]]

評価指標を確認する。

tn, fp, fn, tp = cm.ravel()
print('accuracy: ', (tp+tn) / (tp+fp+fn+tn))
print('fn_rate: ', fn / (tp+fn))
print('fp_rate: ', fp / (tn+fp))
accuracy:  0.8214907054965679    # 閾値
fn_rate:  0.0743801652892562    # 偽陰性率
fp_rate:  0.1786423141891892    # 偽陽性率

精度が82%、偽陰性率が7%、偽陽性率が18%。 不正決済の見逃しは少ないが、誤検知がやや多めとなった。 もう少し精度を高くしたいところ。