Мониторинг ML-моделей: детекция дрифта и снижения метрик качества

Модель обучена, метрики на валидации отличные, деплой в продакшен прошел успешно. Через два месяца точность падает на 15%, а через полгода модель работает хуже бейзлайна. Деградация качества ML-моделей в продакшене — это, увы, довольно частое явление. Данные меняются, распределения сдвигаются, зависимости трансформируются.

Мониторинг ML-моделей позволяет обнаружить проблемы до того, как они повлияют на бизнес-метрики. Система детекции дрифта выявляет изменения в данных и поведении модели, метрики качества количественно оценивают деградацию, алертинг сигнализирует о необходимости переобучения.

Типы дрифта в ML-моделях

Дрифт — изменение статистических свойств данных или целевой переменной во времени. Различают три основных типа, каждый требует специфических методов детекции и стратегий реагирования.

Концептуальный дрифт (Concept Drift)

При концептуальном дрифте изменяется зависимость между признаками и целевой переменной, тогда как распределение входных данных остается неизменным. Модель получает привычные признаки, но их связь с таргетом уже трансформировалась.

Примеры концептуального дрифта:

  • Модель кредитного скоринга: уровень доходов клиентов остался прежним, но изменилась корреляция между доходом и вероятностью дефолта из-за экономического кризиса;
  • Рекомендательная система: пользователи с теми же демографическими характеристиками начали предпочитать другой контент после появления нового культурного тренда;
  • Медицинская диагностика: симптомы остались идентичными, но их связь с заболеванием изменилась при появлении новых штаммов.

Концептуальный дрифт сложнее всего обнаружить без доступа к истинным значениям (ground truth). Единственный надежный индикатор — снижение метрик качества на реальных данных.

Дрифт данных (Data Drift)

При дрифте данных изменяется распределение входных признаков, при этом целевая зависимость может оставаться стабильной. Модель сталкивается с данными из областей признакового пространства, которые слабо представлены в обучающей выборке.

Типичные сценарии:

  • Изменение демографии пользователей после расширения на новые рынки;
  • Сезонные паттерны в поведенческих данных;
  • Технические изменения в системах сбора данных;
  • Появление новых категорий в категориальных признаках.

Дрифт данных выявляется с помощью статистических тестов, оценивающих различия в распределениях. Его критичность определяется тем, насколько новые данные выходят за пределы распределения, использованного при обучении модели.

Дрифт предсказаний (Prediction Drift)

Распределение выходов модели меняется со временем. Это может происходить как следствие дрифта данных или концептуального дрифта, но иногда само по себе служит независимым индикатором потенциальных проблем.

Практическое значение prediction drift:

  • Ранний сигнал о проблемах до получения ground truth;
  • Детекция технических ошибок в пайплайне;
  • Мониторинг консистентности модели.

Дрифт предсказаний весьма коварен. Другие метрики могут быть стабильны, точность предсказаний тоже. Если модель классификации внезапно начала предсказывать положительный класс в 30% случаев вместо привычных 7%, это требует немедленного расследования независимо от стабильности других метрик.

Статистические методы детекции дрифта

Обнаружение дрифта строится на сравнении статистических распределений:

  • Референсное (эталонное) распределение (reference distribution) — данные обучения или стабильный период;
  • Текущее распределение (current distribution) — текущие данные в продакшене.

Выбор метода зависит от типа признаков и требований к чувствительности.

Тест Колмогорова-Смирнова

Kolmogorov-Smirnov-тест (KS-тест) оценивает максимальную разницу между кумулятивными функциями распределения двух выборок. И хотя метод работает только для непрерывных одномерных распределений, он дает достаточно хорошую оценку статистической значимости различий.

import numpy as np
from scipy.stats import ks_2samp
import pandas as pd

def detect_drift_ks(reference_data, current_data, features, threshold=0.05):
    """
    Детекция дрифта методом Kolmogorov-Smirnov
    
    Args:
        reference_data: базовое распределение (обучающие данные)
        current_data: текущее распределение (продакшен)
        features: список числовых признаков для анализа
        threshold: уровень значимости (alpha)
    
    Returns:
        dict с результатами для каждого признака
    """
    drift_report = {}
    
    for feature in features:
        ref_values = reference_data[feature].dropna()
        curr_values = current_data[feature].dropna()
        
        # KS-статистика и p-value
        ks_stat, p_value = ks_2samp(ref_values, curr_values)
        
        drift_report[feature] = {
            'ks_statistic': ks_stat,
            'p_value': p_value,
            'drift_detected': p_value < threshold,
            'reference_mean': ref_values.mean(),
            'current_mean': curr_values.mean(),
            'reference_std': ref_values.std(),
            'current_std': curr_values.std()
        }
    
    return drift_report

# Генерация примера с дрифтом
np.random.seed(42)
reference = pd.DataFrame({
    'feature_1': np.random.normal(0, 1, 1000),
    'feature_2': np.random.exponential(2, 1000),
    'feature_3': np.random.normal(5, 2, 1000)
})

# feature_1 без дрифта, feature_2 с дрифтом среднего, feature_3 с дрифтом дисперсии
current = pd.DataFrame({
    'feature_1': np.random.normal(0, 1, 500),
    'feature_2': np.random.exponential(3, 500),
    'feature_3': np.random.normal(5, 4, 500)
})

results = detect_drift_ks(reference, current, ['feature_1', 'feature_2', 'feature_3'])

for feature, metrics in results.items():
    print(f"\n{feature}:")
    print(f"  KS-статистика: {metrics['ks_statistic']:.4f}")
    print(f"  p-value: {metrics['p_value']:.4f}")
    print(f"  Дрифт: {'Да' if metrics['drift_detected'] else 'Нет'}")
    print(f"  Среднее: {metrics['reference_mean']:.2f} → {metrics['current_mean']:.2f}")
    print(f"  Std: {metrics['reference_std']:.2f} → {metrics['current_std']:.2f}")
feature_1:
  KS-статистика: 0.0380
  p-value: 0.7169
  Дрифт: Нет
  Среднее: 0.02 → -0.04
  Std: 0.98 → 1.03

feature_2:
  KS-статистика: 0.1650
  p-value: 0.0000
  Дрифт: Да
  Среднее: 2.02 → 2.92
  Std: 2.01 → 2.90

feature_3:
  KS-статистика: 0.1900
  p-value: 0.0000
  Дрифт: Да
  Среднее: 5.03 → 4.73
  Std: 1.94 → 3.95

Представленный выше код реализует классический воркфлоу детекции дрифта: независимое сравнение референсной и текущей выборок по каждому признаку. KS-тест возвращает статистику (максимальное расхождение CDF) и p-value для проверки нулевой гипотезы об идентичности распределений. При p-value < threshold отвергаем гипотезу и фиксируем дрифт. Дополнительно рассчитываются описательные статистики для интерпретации природы изменений.

👉🏻  Создание ML-модели прогноза действий пользователей интернет-магазина и рекомендательной системы

Ограничения метода:

  1. KS-тест чувствителен к любым различиям в распределении, включая сдвиги медианы, изменения дисперсии и трансформации формы распределения;
  2. Тест работает только для непрерывных признаков, требует достаточного объема данных (минимум 50-100 наблюдений в каждой выборке);
  3. Тест чувствителен к выбросам;
  4. Не годится для категориальных признаков, в таком случае используются альтернативные подходы.

Population Stability Index (PSI)

Индекс PSI измеряет сдвиг распределения через сравнение долей наблюдений в бинах. Метод универсален: работает для непрерывных и категориальных признаков, дает интерпретируемую числовую оценку степени дрифта.

Формула PSI:

PSI = Σ(Pₐ — Pₑ) × ln(Pₐ / Pₑ)

где:

  • Pₐ — доля наблюдений в бине для текущего распределения;
  • Pₑ — доля наблюдений в бине для референсного распределения;
  • ln — натуральный логарифм.

Интерпретация значений:

  • PSI < 0.1 — дрифт отсутствует,
  • 0.1 ≤ PSI < 0.25 — умеренный дрифт (мониторинг),
  • PSI ≥ 0.25 — критический дрифт (требуется переобучение).
import numpy as np
import pandas as pd

def calculate_psi(reference, current, bins=10, categorical=False):
    """
    Расчет Population Stability Index (PSI)
    
    Args:
        reference: референсное распределение
        current: текущее распределение
        bins: количество бинов для непрерывных признаков
        categorical: True для категориальных признаков
    
    Returns:
        float: значение PSI
    """
    if categorical:
        # Для категориальных: каждая категория = бин
        ref_counts = reference.value_counts()
        curr_counts = current.value_counts()
        
        # Объединение всех категорий
        all_categories = set(ref_counts.index) | set(curr_counts.index)
        ref_props = np.array([ref_counts.get(cat, 0) for cat in all_categories], dtype=float)
        curr_props = np.array([curr_counts.get(cat, 0) for cat in all_categories], dtype=float)
    else:
        # Для непрерывных: квантильные бины на reference
        breakpoints = np.quantile(reference, np.linspace(0, 1, bins + 1))
        breakpoints[0] = -np.inf
        breakpoints[-1] = np.inf
        
        ref_binned = pd.cut(reference, bins=breakpoints)
        curr_binned = pd.cut(current, bins=breakpoints)
        
        ref_counts = ref_binned.value_counts()
        curr_counts = curr_binned.value_counts()
        
        ref_props = ref_counts.values.astype(float)
        curr_props = curr_counts.values.astype(float)
    
    # Нормализация
    ref_props /= ref_props.sum()
    curr_props /= curr_props.sum()
    
    # Защита от нулевых долей
    ref_props = np.where(ref_props == 0, 1e-10, ref_props)
    curr_props = np.where(curr_props == 0, 1e-10, curr_props)
    
    # PSI формула
    psi_value = np.sum((curr_props - ref_props) * np.log(curr_props / ref_props))
    
    return psi_value

# Пример с разными уровнями дрифта
ref_normal = np.random.normal(0, 1, 2000)
curr_no_drift = np.random.normal(0, 1, 1000)
curr_moderate = np.random.normal(0.3, 1.1, 1000)
curr_severe = np.random.normal(1, 2, 1000)

print("PSI для разных сценариев:")
print(f"Без дрифта: {calculate_psi(ref_normal, curr_no_drift):.4f}")
print(f"Умеренный дрифт: {calculate_psi(ref_normal, curr_moderate):.4f}")
print(f"Критический дрифт: {calculate_psi(ref_normal, curr_severe):.4f}")

# Категориальный пример
ref_categorical = pd.Series(np.random.choice(['A', 'B', 'C', 'D'], 2000, p=[0.4, 0.3, 0.2, 0.1]))
curr_categorical = pd.Series(np.random.choice(['A', 'B', 'C', 'D'], 1000, p=[0.2, 0.3, 0.3, 0.2]))

print(f"\nPSI для категориального признака: {calculate_psi(ref_categorical, curr_categorical, categorical=True):.4f}")
PSI для разных сценариев:
Без дрифта: 0.0136
Умеренный дрифт: 0.1147
Критический дрифт: 0.8746

PSI для категориального признака: 0.2821

PSI обладает важным преимуществом: одна метрика для всех типов признаков. Для непрерывных переменных создаются квантильные бины на референсных данных, затем текущее распределение проецируется на эти границы. Для категориальных каждая уникальная категория становится отдельным бином.

Метод устойчив к выбросам, так как оперирует долями в бинах, а не сырыми значениями. Квантильное биннирование гарантирует равное количество наблюдений в каждом бине для референсного распределения, что повышает статистическую мощность детекции.

Выбор количества бинов влияет на чувствительность:

  • 10 бинов — стандарт для большинства задач;
  • 20 бинов — для больших выборок и детального анализа;
  • 5 бинов — для малых выборок.

PSI не дает p-value, но пороговые значения 0.1 и 0.25 подтверждены эмпирически в индустрии.

Jensen-Shannon Divergence

JS-дивергенция измеряет симметричное различие между двумя вероятностными распределениями. В отличие от других типов дивергенций, метод симметричен и ограничен диапазоном [0, 1], что упрощает его интерпретацию.

from scipy.spatial.distance import jensenshannon
from scipy.stats import entropy

def calculate_js_divergence(reference, current, bins=50):
    """
    Расчет Jensen-Shannon Divergence для непрерывных признаков
    
    Args:
        reference: референсное распределение
        current: текущее распределение
        bins: количество бинов для гистограммы
    
    Returns:
        float: JS дивергенция в диапазоне [0, 1]
    """
    # Определение общих границ бинов
    combined = np.concatenate([reference, current])
    bin_edges = np.histogram_bin_edges(combined, bins=bins)
    
    # Гистограммы с нормализацией
    ref_hist, _ = np.histogram(reference, bins=bin_edges, density=True)
    curr_hist, _ = np.histogram(current, bins=bin_edges, density=True)
    
    # Нормализация для получения вероятностей
    ref_probs = ref_hist / ref_hist.sum()
    curr_probs = curr_hist / curr_hist.sum()
    
    # Защита от нулей
    ref_probs = np.where(ref_probs == 0, 1e-10, ref_probs)
    curr_probs = np.where(curr_probs == 0, 1e-10, curr_probs)
    
    # JS дивергенция
    js_div = jensenshannon(ref_probs, curr_probs, base=2)
    
    return js_div

def monitor_multivariate_drift(reference_df, current_df, features, method='psi'):
    """
    Мониторинг дрифта по множеству признаков
    
    Returns:
        DataFrame с метриками дрифта для каждого признака
    """
    results = []
    
    for feature in features:
        ref_data = reference_df[feature].dropna()
        curr_data = current_df[feature].dropna()
        
        if method == 'psi':
            drift_score = calculate_psi(ref_data, curr_data)
            threshold_warn = 0.1
            threshold_critical = 0.25
        elif method == 'js':
            drift_score = calculate_js_divergence(ref_data, curr_data)
            threshold_warn = 0.1
            threshold_critical = 0.3
        
        status = 'OK'
        if drift_score >= threshold_critical:
            status = 'CRITICAL'
        elif drift_score >= threshold_warn:
            status = 'WARNING'
        
        results.append({
            'feature': feature,
            'drift_score': drift_score,
            'status': status,
            'ref_mean': ref_data.mean(),
            'curr_mean': curr_data.mean(),
            'mean_shift_%': ((curr_data.mean() - ref_data.mean()) / ref_data.mean() * 100) if ref_data.mean() != 0 else 0
        })
    
    return pd.DataFrame(results).sort_values('drift_score', ascending=False)

# Пример мониторинга датасета
np.random.seed(123)
reference_data = pd.DataFrame({
    'age': np.random.normal(35, 10, 5000),
    'income': np.random.lognormal(10, 1, 5000),
    'credit_score': np.random.normal(650, 80, 5000),
    'loan_amount': np.random.uniform(5000, 50000, 5000)
})

current_data = pd.DataFrame({
    'age': np.random.normal(37, 11, 2000),
    'income': np.random.lognormal(10.2, 1.1, 2000),
    'credit_score': np.random.normal(630, 90, 2000),
    'loan_amount': np.random.uniform(8000, 55000, 2000)
})

drift_report = monitor_multivariate_drift(
    reference_data, 
    current_data, 
    ['age', 'income', 'credit_score', 'loan_amount'],
    method='psi'
)

print(drift_report.to_string(index=False))
     feature  drift_score  status     ref_mean    curr_mean  mean_shift_%
 loan_amount     0.124800 WARNING 27623.168220 31505.408501     14.054290
credit_score     0.101958 WARNING   652.058734   627.260767     -3.803027
      income     0.058415      OK 36834.662862 47908.792175     30.064424
         age     0.023493      OK    35.210829    36.489371      3.631105

Код демонстрирует полноценный workflow мониторинга множественных признаков. JS-дивергенция вычисляется через гистограммы с общими границами бинов для обоих распределений. Функция monitor_multivariate_drift агрегирует результаты по всем признакам и ранжирует их по степени дрифта.

👉🏻  Автоматизация процессов анализа данных с помощью Python

Практическая интерпретация: признаки с наибольшим drift_score требуют приоритетного анализа. Процентное изменение среднего показывает направление дрифта. Статус CRITICAL сигнализирует о необходимости срочного переобучения или ревизии фичей.

JS-дивергенция обладает математическими преимуществами: симметричность (JS(P||Q) = JS(Q||P)), ограниченность квадратным корнем из 1, гладкость. Метод менее чувствителен к редким значениям по сравнению с KL-дивергенцией, что максимально ценно для признаков с длинными хвостами распределения, которые часто встречаются в финансовых временных рядах.

Метрики качества классификации в MLOps

Метрики классификации измеряют соответствие предсказаний модели реальным меткам классов. Выбор метрики определяется балансом классов, стоимостью ошибок и спецификой задачи.

Базовые метрики: Precision, Recall, F1-Score

Precision (точность) показывает долю корректных предсказаний положительного класса среди всех предсказаний положительного класса. Recall (полнота) измеряет долю найденных положительных объектов среди всех истинно положительных.

Precision = TP / (TP + FP)
Recall = TP / (TP + FN)

TP — true positives (истинно положительные)
FP — false positives (ложно положительные)
FN — false negatives (ложно отрицательные)

F1-score объединяет precision и recall через гармоническое среднее, обеспечивая баланс между двумя метриками.

from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
from sklearn.metrics import precision_recall_curve, roc_curve, roc_auc_score
import matplotlib.pyplot as plt

def calculate_classification_metrics(y_true, y_pred, y_pred_proba=None):
    """
    Расчет полного набора метрик классификации
    
    Args:
        y_true: истинные метки
        y_pred: предсказанные метки (0/1)
        y_pred_proba: вероятности положительного класса
    
    Returns:
        dict с метриками
    """
    metrics = {
        'precision': precision_score(y_true, y_pred),
        'recall': recall_score(y_true, y_pred),
        'f1': f1_score(y_true, y_pred),
        'accuracy': (y_pred == y_true).mean()
    }
    
    # Confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    metrics['true_negatives'] = cm[0, 0]
    metrics['false_positives'] = cm[0, 1]
    metrics['false_negatives'] = cm[1, 0]
    metrics['true_positives'] = cm[1, 1]
    
    # Дополнительные метрики для несбалансированных классов
    metrics['specificity'] = cm[0, 0] / (cm[0, 0] + cm[0, 1]) if (cm[0, 0] + cm[0, 1]) > 0 else 0
    
    if y_pred_proba is not None:
        metrics['roc_auc'] = roc_auc_score(y_true, y_pred_proba)
    
    return metrics

def monitor_classification_drift(reference_metrics, current_metrics, threshold=0.05):
    """
    Детекция дрифта качества модели классификации
    
    Args:
        reference_metrics: метрики на reference периоде
        current_metrics: метрики на текущем периоде
        threshold: допустимое падение метрик
    
    Returns:
        dict с результатами мониторинга
    """
    drift_report = {}
    
    for metric_name in ['precision', 'recall', 'f1', 'roc_auc']:
        if metric_name in reference_metrics and metric_name in current_metrics:
            ref_value = reference_metrics[metric_name]
            curr_value = current_metrics[metric_name]
            
            degradation = ref_value - curr_value
            degradation_pct = (degradation / ref_value * 100) if ref_value > 0 else 0
            
            drift_report[metric_name] = {
                'reference': ref_value,
                'current': curr_value,
                'degradation': degradation,
                'degradation_%': degradation_pct,
                'alert': degradation > threshold
            }
    
    return drift_report

# Симуляция деградации модели
np.random.seed(42)

# Reference период: модель работает хорошо
y_true_ref = np.random.binomial(1, 0.3, 1000)
y_pred_proba_ref = np.clip(y_true_ref + np.random.normal(0, 0.3, 1000), 0, 1)
y_pred_ref = (y_pred_proba_ref > 0.5).astype(int)

# Current период: концептуальный дрифт, модель деградирует
y_true_curr = np.random.binomial(1, 0.3, 1000)
y_pred_proba_curr = np.clip(y_true_curr + np.random.normal(0, 0.5, 1000), 0, 1)
y_pred_curr = (y_pred_proba_curr > 0.5).astype(int)

ref_metrics = calculate_classification_metrics(y_true_ref, y_pred_ref, y_pred_proba_ref)
curr_metrics = calculate_classification_metrics(y_true_curr, y_pred_curr, y_pred_proba_curr)

drift_report = monitor_classification_drift(ref_metrics, curr_metrics, threshold=0.03)

print("Мониторинг деградации модели:\n")
for metric, data in drift_report.items():
    alert_marker = "⚠️ ALERT" if data['alert'] else "✓ OK"
    print(f"{metric.upper()}: {data['reference']:.3f} → {data['current']:.3f} "
          f"(падение {data['degradation_%']:.1f}%) {alert_marker}")
Мониторинг деградации модели:

PRECISION: 0.845 → 0.660 (падение 21.9%) ⚠️ ALERT
RECALL: 0.948 → 0.840 (падение 11.4%) ⚠️ ALERT
F1: 0.894 → 0.740 (падение 17.2%) ⚠️ ALERT
ROC_AUC: 0.986 → 0.910 (падение 7.7%) ⚠️ ALERT

Реализация включает расчет базовых метрик и детекцию их деградации во времени. Матрица несоответствий (Confusion matrix) разбивается на компоненты для анализа типов ошибок. Специфичность (Specificity — доля корректно классифицированных отрицательных примеров) дополняет полноту (Recall) для полной картины производительности.

👉🏻  Алгоритмы сбора биржевых данных: практическое руководство

Функция monitor_classification_drift сравнивает метрики текущего периода и референсного, вычисляя абсолютное и процентное падение. Threshold определяет допустимую деградацию: 3-5% — стандарт для production систем, 10% — критический уровень требующий немедленного переобучения.

Я рекомендую вести мониторинг сразу всех метрик, поскольку это позволяет определить характер деградации:

  • Падение Precision при стабильном Recall указывает на рост False positives;
  • Снижение Recall при сохранении Precision сигнализирует о пропуске положительных примеров;
  • Одновременное падение обеих метрик — признак концептуального дрифта.

ROC-AUC и PR-AUC

Метрика ROC-AUC (Area Under Receiver Operating Characteristic) измеряет способность модели ранжировать примеры: вероятность того, что случайно выбранный положительный пример получит более высокий скор, чем отрицательный.

Метрика PR-AUC (Area Under Precision-Recall Curve) фокусируется на производительности для положительного класса. Ее часто используют для несбалансированных задач, где доля положительного класса крайне мала и доминирует отрицательный класс.

from sklearn.metrics import average_precision_score

def plot_roc_pr_curves(y_true, y_pred_proba, title_prefix=""):
    """
    Визуализация ROC и PR кривых для анализа качества
    """
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # ROC Curve
    fpr, tpr, _ = roc_curve(y_true, y_pred_proba)
    roc_auc = roc_auc_score(y_true, y_pred_proba)
    
    axes[0].plot(fpr, tpr, color='#2C3E50', linewidth=2, label=f'ROC (AUC = {roc_auc:.3f})')
    axes[0].plot([0, 1], [0, 1], 'k--', linewidth=1, label='Random')
    axes[0].set_xlabel('False Positive Rate', fontsize=11)
    axes[0].set_ylabel('True Positive Rate', fontsize=11)
    axes[0].set_title(f'{title_prefix}ROC Curve', fontsize=12, fontweight='bold')
    axes[0].legend(loc='lower right')
    axes[0].grid(alpha=0.3)
    
    # PR Curve
    precision, recall, _ = precision_recall_curve(y_true, y_pred_proba)
    pr_auc = average_precision_score(y_true, y_pred_proba)
    baseline = y_true.mean()
    
    axes[1].plot(recall, precision, color='#2C3E50', linewidth=2, label=f'PR (AUC = {pr_auc:.3f})')
    axes[1].axhline(y=baseline, color='gray', linestyle='--', linewidth=1, label=f'Baseline ({baseline:.3f})')
    axes[1].set_xlabel('Recall', fontsize=11)
    axes[1].set_ylabel('Precision', fontsize=11)
    axes[1].set_title(f'{title_prefix}Precision-Recall Curve', fontsize=12, fontweight='bold')
    axes[1].legend(loc='upper right')
    axes[1].grid(alpha=0.3)
    
    plt.tight_layout()
    return fig

# Сравнение моделей с разной степенью дрифта
fig1 = plot_roc_pr_curves(y_true_ref, y_pred_proba_ref, "Reference Period: ")
fig2 = plot_roc_pr_curves(y_true_curr, y_pred_proba_curr, "Current Period (with Drift): ")

plt.show()

print(f"\nReference ROC-AUC: {roc_auc_score(y_true_ref, y_pred_proba_ref):.3f}")
print(f"Current ROC-AUC: {roc_auc_score(y_true_curr, y_pred_proba_curr):.3f}")
print(f"\nReference PR-AUC: {average_precision_score(y_true_ref, y_pred_proba_ref):.3f}")
print(f"Current PR-AUC: {average_precision_score(y_true_curr, y_pred_proba_curr):.3f}")

Рис. 1: Сравнение метрик для текущего (current) и референсного (reference) периодов: левая панель: ROC-кривые. Правая панель: PR-кривые. Деградация модели визуализируется смещением кривых относительно предыдущих уровней

Рис. 1: Сравнение метрик для текущего (current) и референсного (reference) периодов: левая панель: ROC-кривые. Правая панель: PR-кривые. Деградация модели визуализируется смещением кривых относительно предыдущих уровней

Reference ROC-AUC: 0.986
Current ROC-AUC: 0.910

Reference PR-AUC: 0.973
Current PR-AUC: 0.811

ROC-AUC устойчива к дисбалансу классов только в смысле ранжирования, но не отражает абсолютное качество предсказаний при малом Positive rate. PR-AUC решает эту проблему: метрика чувствительна к качеству именно положительного класса, что критично в задачах детекции аномалий, фрода, медицинской диагностики.

Я обычно использую ROC-AUC для оценки общей разделимости классов, PR-AUC — для оценки практической ценности модели при дисбалансе. Если положительный класс составляет менее 10% выборки, PR-AUC становится основной метрикой мониторинга.

Метрики качества регрессии в MLOps

Метрики регрессии оценивают точность предсказания непрерывных величин. Выбор зависит от масштаба целевой переменной, чувствительности к выбросам и интерпретируемости для бизнеса.

MAE, RMSE, MAPE

MAE (Mean Absolute Error) усредняет абсолютные ошибки предсказаний. Метрика линейна: все ошибки вносят пропорциональный вклад независимо от величины.

MAE = (1/n) × Σ|yᵢ — ŷᵢ|

где:

  • n — количество наблюдений;
  • yᵢ — истинное значение;
  • ŷᵢ — предсказание модели.

RMSE (Root Mean Squared Error) возводит ошибки в квадрат перед усреднением, затем извлекает корень. Метрика штрафует большие отклонения сильнее, чем малые.

RMSE = √((1/n) × Σ(yᵢ — ŷᵢ)²)

MAPE (Mean Absolute Percentage Error) нормализует ошибки относительно истинных значений, давая процентную оценку точности.

👉🏻  Топ-10 лучших инструментов MLOps: сравнение и выбор

MAPE = (100%/n) × Σ|((yᵢ — ŷᵢ) / yᵢ)|

from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

def calculate_regression_metrics(y_true, y_pred):
    """
    Расчет набора метрик для задач регрессии
    
    Returns:
        dict с метриками качества
    """
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    
    # MAPE с защитой от деления на ноль
    mask = y_true != 0
    mape = np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100
    
    # R-squared
    r2 = r2_score(y_true, y_pred)
    
    # Median Absolute Error (устойчив к выбросам)
    medae = np.median(np.abs(y_true - y_pred))
    
    # Symmetric MAPE (для случаев с нулевыми значениями)
    smape = np.mean(2 * np.abs(y_true - y_pred) / (np.abs(y_true) + np.abs(y_pred))) * 100
    
    return {
        'MAE': mae,
        'RMSE': rmse,
        'MAPE': mape,
        'R²': r2,
        'MedAE': medae,
        'sMAPE': smape
    }

def regression_drift_monitor(reference_y_true, reference_y_pred, 
                             current_y_true, current_y_pred):
    """
    Мониторинг деградации регрессионной модели
    
    Returns:
        DataFrame со сравнением метрик
    """
    ref_metrics = calculate_regression_metrics(reference_y_true, reference_y_pred)
    curr_metrics = calculate_regression_metrics(current_y_true, current_y_pred)
    
    comparison = []
    for metric_name in ref_metrics.keys():
        ref_val = ref_metrics[metric_name]
        curr_val = curr_metrics[metric_name]
        
        # Для R² большее значение лучше, для остальных — меньшее
        if metric_name == 'R²':
            degradation = ref_val - curr_val
            worse = curr_val < ref_val else: degradation = curr_val - ref_val worse = curr_val > ref_val
        
        degradation_pct = (degradation / abs(ref_val) * 100) if ref_val != 0 else 0
        
        comparison.append({
            'Metric': metric_name,
            'Reference': f"{ref_val:.4f}",
            'Current': f"{curr_val:.4f}",
            'Change_%': f"{degradation_pct:+.2f}%",
            'Status': '⚠️ WORSE' if worse else '✓ OK'
        })
    
    return pd.DataFrame(comparison)

# Симуляция деградации регрессионной модели
np.random.seed(42)

# Reference: модель точна
X_ref = np.linspace(0, 10, 500)
y_true_ref = 2 * X_ref + 5 + np.random.normal(0, 2, 500)
y_pred_ref = 2 * X_ref + 5 + np.random.normal(0, 1, 500)

# Current: концептуальный дрифт, зависимость изменилась
X_curr = np.linspace(0, 10, 500)
y_true_curr = 2.25 * X_curr + 5 + np.random.normal(0, 2, 500)
y_pred_curr = 2 * X_curr + 5 + np.random.normal(0, 1, 500)  # модель не адаптировалась

drift_report = regression_drift_monitor(y_true_ref, y_pred_ref, y_true_curr, y_pred_curr)
print(drift_report.to_string(index=False))

# Визуализация ошибок
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

axes[0].scatter(y_true_ref, y_pred_ref, alpha=0.5, s=20, color='#2C3E50')
axes[0].plot([y_true_ref.min(), y_true_ref.max()], 
             [y_true_ref.min(), y_true_ref.max()], 
             'r--', linewidth=2, label='Perfect Prediction')
axes[0].set_xlabel('True Values', fontsize=11)
axes[0].set_ylabel('Predicted Values', fontsize=11)
axes[0].set_title('Reference Period', fontsize=12, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)

axes[1].scatter(y_true_curr, y_pred_curr, alpha=0.5, s=20, color='#E74C3C')
axes[1].plot([y_true_curr.min(), y_true_curr.max()], 
             [y_true_curr.min(), y_true_curr.max()], 
             'r--', linewidth=2, label='Perfect Prediction')
axes[1].set_xlabel('True Values', fontsize=11)
axes[1].set_ylabel('Predicted Values', fontsize=11)
axes[1].set_title('Current Period (with Drift)', fontsize=12, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)

plt.tight_layout()
plt.show()
Metric Reference Current Change_%   Status
   MAE    1.8116  2.2291  +23.05% ⚠️ WORSE
  RMSE    2.2558  2.7723  +22.90% ⚠️ WORSE
  MAPE   15.9465 15.7851   -1.01%     ✓ OK
    R²    0.8654  0.8346   +3.56% ⚠️ WORSE
 MedAE    1.5484  1.8659  +20.50% ⚠️ WORSE
 sMAPE   14.7467 16.0373   +8.75% ⚠️ WORSE

Графики рассеяния значений истинных против предсказанных для reference и current периодов. Левый график показывает хорошую калибровку модели с точками близко к диагонали. Правый график демонстрирует систематическое смещение предсказаний после концептуального дрифта

Рис. 2: Графики рассеяния значений истинных против предсказанных для reference и current периодов. Левый график показывает хорошую калибровку модели с точками близко к диагонали. Правый график демонстрирует систематическое смещение предсказаний после концептуального дрифта

Код демонстрирует комплексный мониторинг регрессии с расчетом 6 метрик и детекцией их изменений. Функция calculate_regression_metrics включает защиту от граничных случаев: MAPE исключает нулевые истинные значения, sMAPE (Symmetric MAPE) работает корректно при y_true близких к нулю.

MedAE (Median Absolute Error) устойчив к выбросам и ценен для задач, где редкие экстремальные ошибки не должны доминировать в оценке. R² показывает долю объясненной дисперсии: значение 0.8 означает, что модель объясняет 80% вариативности таргета.

Directional Accuracy для прогнозных моделей

Метрика Directional Accuracy (направленная верность) оценивает способность модели правильно предсказывать направление изменения целевой переменной. Она часто применяется в задачах прогнозирования временных рядов, где важнее определить знак изменения, чем его точную величину.

import numpy as np
import matplotlib.pyplot as plt

def calculate_directional_accuracy(y_true, y_pred, y_prev):
    true_direction = np.sign(y_true - y_prev)
    pred_direction = np.sign(y_pred - y_prev)
    mask = true_direction != 0
    return (true_direction[mask] == pred_direction[mask]).mean()

# Синтетический временной ряд для примера
np.random.seed(42)
n_points = 500
time = np.arange(n_points)

base_trend = 0.3 * time

# Инициализация амплитуды и фазы
amp_drift = np.ones(n_points)
phase_drift = np.zeros(n_points)

# Стабильность
amp_drift[:125] = 1
phase_drift[:125] = 0

# Начало расхождения
amp_drift[80:140] = np.linspace(1.0, 1.02, 60)
phase_drift[80:140] = np.linspace(0, 0.05, 60)

# Легкое расхождение
amp_drift[140:190] = np.linspace(1.02, 1.05, 50)
phase_drift[140:190] = np.linspace(0.05, 0.2, 50)

# Усиливающийся дрифт
amp_drift[190:240] = np.linspace(1.1, 1.1, 50)
phase_drift[190:240] = np.linspace(0.2, 0.8, 50)

# Ускоряющийся дрифт
amp_drift[240:] = np.linspace(1.5, 1.5, n_points - 240)
phase_drift[240:] = np.linspace(0.8, 2.5, n_points - 240)

# Истинный ряд с плавными изменениями
true_series = (
    100
    + base_trend
    + 10 * amp_drift * np.sin(time / 10 + phase_drift)
    + np.random.normal(0, 1, n_points)
)

# Модель застряла в старом паттерне
pred_series = 100 + base_trend + 10 * np.sin(time / 10) + np.random.normal(0, 1, n_points)

# Расчет Directional Accuracy
window = 25
acc_values = []
for i in range(window, n_points):
    y_true = true_series[i-window+1:i+1]
    y_prev = true_series[i-window:i]
    y_pred = pred_series[i-window+1:i+1]
    acc_values.append(calculate_directional_accuracy(y_true, y_pred, y_prev))

# Визуализация
fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)

# Истинный ряд и прогноз
axes[0].plot(time, true_series, label='Истинные значения', color='black', linewidth=2)
axes[0].plot(time, pred_series, label='Прогноз модели', color='red', alpha=0.7)
axes[0].set_title('Постепенное появление дрифта из-за изменения фазы и амплитуды временного ряда', fontsize=13, fontweight='bold')
axes[0].set_ylabel('Значение ряда')
axes[0].legend()
axes[0].grid(alpha=0.3)

# Динамика Directional Accuracy
axes[1].plot(time[window:], acc_values, color='#E67E22', linewidth=2)
initial_level = np.mean(acc_values[:int(0.25 * len(acc_values))])
axes[1].axhline(initial_level, color='green', linestyle='--', alpha=0.8, label='Начальный уровень')

axes[1].set_xlabel('Время')
axes[1].set_ylabel('Directional Accuracy')
axes[1].set_title('Динамика показателя Directional Accuracy', fontsize=12)
axes[1].legend()
axes[1].grid(alpha=0.3)

plt.tight_layout()
plt.show()

Постепенное снижение направленной верности (Directional Accuracy) ML-модели из-за дрифта, возникающего вследствие поэтапного изменения амплитуды и фазы временного ряда

Рис. 3: Постепенное снижение направленной верности (Directional Accuracy) ML-модели из-за дрифта, возникающего вследствие поэтапного изменения амплитуды и фазы временного ряда

Метрика Directional Accuracy фокусируется на правильности предсказания знака изменения, игнорируя величину отклонения. Метрика превосходит MAE/RMSE в задачах, где решение принимается на основе направления движения: прогнозирование спроса (увеличить или уменьшить заказ), предсказание оттока клиентов (вырастет или снизится), энергопотребление (больше или меньше вчерашнего).

👉🏻  Базы данных для хранения торговых данных: PostgreSQL, Redis, TimescaleDB

Дополнительные метрики Accuracy_Up и Accuracy_Down выявляют асимметрию в качестве предсказаний. Это тоже полезные метрики, так как обученная модель может хорошо предсказывать рост, но плохо — падение. Различие между этими метриками более 15% сигнализирует о необходимости ребалансировки обучающих данных или корректировки функции потерь.

Системы непрерывного мониторинга

Мониторинг ML-моделей в продакшене требует автоматизированной инфраструктуры для сбора метрик, хранения временных рядов, визуализации трендов и настройки оповещений при обнаружении деградации. Система мониторинга интегрируется в конвейер инференса и функционирует в режиме реального времени.

Архитектура мониторинга

Если это ML-модели для финансовых рядов, то традиционно архитектура системы мониторинга включает четыре основных компонента:

  1. Сборщик метрик;
  2. Базу данных временных рядов (time-series database);
  3. Систему визуализации;
  4. Модуль оповещений (алертинга).

Сборщик метрик функционирует как промежуточный слой (middleware) в конвейере инференса, логируя при каждом запросе входные признаки, предсказания и метаданные (временную метку, версию модели, идентификатор запроса), а также периодически (ежечасно или ежедневно) рассчитывая агрегированные метрики дрифта и качества.

База данных временных рядов хранит метрики с привязкой ко времени. Стандартами индустрии являются InfluxDB и Prometheus, при этом InfluxDB оптимизирована для высокочастотной записи и аналитических запросов и поддерживает политики хранения (retention policies) для автоматического удаления устаревших данных.

Визуализация реализуется с помощью Grafana: дашборды отображают тренды метрик, распределения признаков и матрицы ошибок во времени, при этом критически важные метрики выводятся на главный дашборд для постоянного мониторинга.

Модуль алертинга формирует уведомления при превышении пороговых значений. При этом правила учитывают не только абсолютные значения метрик, но и их динамику: резкое падение за короткий период рассматривается как более критическое событие по сравнению с постепенной деградацией.

Алертинг и пороговые значения

Эффективная система алертинга обеспечивает баланс между ложными срабатываниями (false positives) и пропуском реальных проблем (false negatives), при этом многоуровневая настройка порогов для уровней WARNING и CRITICAL позволяет снизить эффект усталости от оповещений (alert fatigue).

from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Callable
from datetime import datetime
import random
import smtplib
from email.mime.text import MIMEText

class AlertSeverity(Enum):
    WARNING = "WARNING"
    CRITICAL = "CRITICAL"

@dataclass
class AlertRule:
    """Правило алертинга для метрики"""
    metric_name: str
    warning_threshold: float
    critical_threshold: float
    comparison: str  # 'less_than', 'greater_than'
    window_size: int  # количество последовательных нарушений для триггера
    
@dataclass
class Alert:
    """Объект алерта"""
    severity: AlertSeverity
    metric_name: str
    current_value: float
    threshold: float
    message: str
    timestamp: datetime

class AlertingSystem:
    """
    Система алертинга для ML-моделей
    """
    
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.alert_rules: Dict[str, AlertRule] = {}
        self.violation_counters: Dict[str, int] = {}
        self.alert_handlers: List[Callable] = []
    
    def add_rule(self, rule: AlertRule):
        """Добавление правила алертинга"""
        self.alert_rules[rule.metric_name] = rule
        self.violation_counters[rule.metric_name] = 0
    
    def add_handler(self, handler: Callable[[Alert], None]):
        """Добавление обработчика алертов (email, Slack, PagerDuty)"""
        self.alert_handlers.append(handler)
    
    def check_metrics(self, metrics: Dict[str, float]) -> List[Alert]:
        """
        Проверка метрик на превышение порогов
        
        Returns:
            List[Alert]: список сгенерированных алертов
        """
        alerts = []
        
        for metric_name, metric_value in metrics.items():
            if metric_name not in self.alert_rules:
                continue
            
            rule = self.alert_rules[metric_name]
            violated = self._check_threshold(metric_value, rule)
            
            if violated:
                self.violation_counters[metric_name] += 1
                
                # Триггер алерта только после window_size последовательных нарушений
                if self.violation_counters[metric_name] >= rule.window_size:
                    alert = self._create_alert(metric_name, metric_value, rule)
                    alerts.append(alert)
                    
                    # Выполнение обработчиков
                    for handler in self.alert_handlers:
                        handler(alert)
                    
                    # Сброс счетчика после алерта
                    self.violation_counters[metric_name] = 0
            else:
                # Сброс счетчика при восстановлении метрики
                self.violation_counters[metric_name] = 0
        
        return alerts
    
    def _check_threshold(self, value: float, rule: AlertRule) -> bool:
        """Проверка нарушения порогов"""
        if rule.comparison == 'less_than':
            return value < rule.critical_threshold or value < rule.warning_threshold elif rule.comparison == 'greater_than': return value > rule.critical_threshold or value > rule.warning_threshold
        return False
    
    def _create_alert(self, metric_name: str, value: float, rule: AlertRule) -> Alert:
        """Создание объекта алерта"""
        if rule.comparison == 'less_than':
            if value < rule.critical_threshold: severity = AlertSeverity.CRITICAL threshold = rule.critical_threshold else: severity = AlertSeverity.WARNING threshold = rule.warning_threshold else: if value > rule.critical_threshold:
                severity = AlertSeverity.CRITICAL
                threshold = rule.critical_threshold
            else:
                severity = AlertSeverity.WARNING
                threshold = rule.warning_threshold
        
        message = (f"[{severity.value}] Model '{self.model_name}': "
                  f"{metric_name} = {value:.4f} "
                  f"({'<' if rule.comparison == 'less_than' else '>'} "
                  f"threshold {threshold:.4f})")
        
        return Alert(
            severity=severity,
            metric_name=metric_name,
            current_value=value,
            threshold=threshold,
            message=message,
            timestamp=datetime.now()
        )

# Обработчики алертов
def console_alert_handler(alert: Alert):
    """Вывод алерта в консоль"""
    emoji = "🚨" if alert.severity == AlertSeverity.CRITICAL else "⚠️"
    print(f"{emoji} {alert.message}")

def email_alert_handler(alert: Alert, recipients: List[str], smtp_config: Dict):
    """Отправка алерта по email (упрощенная версия)"""
    # В продакшене используйте надежную email библиотеку
    subject = f"ML Model Alert: {alert.severity.value}"
    body = f"""
    Model Alert Notification
    
    Severity: {alert.severity.value}
    Metric: {alert.metric_name}
    Current Value: {alert.current_value:.4f}
    Threshold: {alert.threshold:.4f}
    Timestamp: {alert.timestamp}
    
    Action Required: Investigate model performance degradation.
    """
    
    print(f"📧 Email alert sent to {recipients}: {subject}")
    # В реальности: отправка через smtplib

# Пример настройки системы алертинга
def setup_alerting_system():
    alerting = AlertingSystem(model_name="fraud_detector")
    
    # Правила для метрик качества (качество падает - bad)
    alerting.add_rule(AlertRule(
        metric_name='precision',
        warning_threshold=0.80,
        critical_threshold=0.75,
        comparison='less_than',
        window_size=3
    ))
    
    alerting.add_rule(AlertRule(
        metric_name='recall',
        warning_threshold=0.75,
        critical_threshold=0.70,
        comparison='less_than',
        window_size=3
    ))
    
    # Правило для дрифта (дрифт растет - bad)
    alerting.add_rule(AlertRule(
        metric_name='feature_psi',
        warning_threshold=0.15,
        critical_threshold=0.25,
        comparison='greater_than',
        window_size=2
    ))
    
    # Добавление обработчиков
    alerting.add_handler(console_alert_handler)
    # alerting.add_handler(lambda alert: email_alert_handler(
    #     alert, ['ml-team@company.com'], smtp_config
    # ))
    
    return alerting

# Симуляция мониторинга с алертами
alerting_system = setup_alerting_system()

print("Симуляция мониторинга модели с алертингом:\n")

for hour in range(10):
    # Симуляция деградации метрик
    metrics = {
        'precision': 0.85 - hour * 0.015 + random.uniform(-0.01, 0.01),
        'recall': 0.80 - hour * 0.012 + random.uniform(-0.01, 0.01),
        'feature_psi': 0.05 + hour * 0.025 + random.uniform(0, 0.02)
    }
    
    print(f"Hour {hour+1}: Precision={metrics['precision']:.3f}, "
          f"Recall={metrics['recall']:.3f}, PSI={metrics['feature_psi']:.3f}")
    
    alerts = alerting_system.check_metrics(metrics)
    
    if not alerts:
        print("  ✓ All metrics within thresholds\n")
    else:
        print()
Симуляция мониторинга модели с алертингом:

Hour 1: Precision=0.855, Recall=0.793, PSI=0.064
  ✓ All metrics within thresholds

Hour 2: Precision=0.837, Recall=0.782, PSI=0.079
  ✓ All metrics within thresholds

Hour 3: Precision=0.813, Recall=0.769, PSI=0.104
  ✓ All metrics within thresholds

Hour 4: Precision=0.801, Recall=0.762, PSI=0.125
  ✓ All metrics within thresholds

Hour 5: Precision=0.785, Recall=0.759, PSI=0.167
  ✓ All metrics within thresholds

Hour 6: Precision=0.770, Recall=0.731, PSI=0.184
⚠️ [WARNING] Model 'fraud_detector': feature_psi = 0.1841 (> threshold 0.1500)

Hour 7: Precision=0.763, Recall=0.719, PSI=0.218
⚠️ [WARNING] Model 'fraud_detector': precision = 0.7632 (< threshold 0.8000)

Hour 8: Precision=0.754, Recall=0.719, PSI=0.229
⚠️ [WARNING] Model 'fraud_detector': recall = 0.7187 (< threshold 0.7500) ⚠️ [WARNING] Model 'fraud_detector': feature_psi = 0.2293 (> threshold 0.1500)

Hour 9: Precision=0.721, Recall=0.696, PSI=0.253
  ✓ All metrics within thresholds

Hour 10: Precision=0.709, Recall=0.692, PSI=0.277
🚨 [CRITICAL] Model 'fraud_detector': precision = 0.7093 (< threshold 0.7500) 🚨 [CRITICAL] Model 'fraud_detector': feature_psi = 0.2766 (> threshold 0.2500)

Система AlertingSystem реализует подход на основе скользящего окна (window-based), при котором алерт срабатывает только после window_size последовательных нарушений порога, что позволяет уменьшить количество ложных срабатываний на единичные аномалии или шум в метриках.

👉🏻  Библиотека typing в Python для решения задач типизации

Двухуровневые пороги (WARNING/CRITICAL) позволяют ранжировать реакцию: WARNING требует мониторинга и расследования, CRITICAL — немедленного действия (остановка модели, откат на предыдущую версию, экстренное переобучение).

Обработчики алертов (alert handlers) разделяют логику детекции и уведомлений, обеспечивая поддержку множества каналов доставки: консоль для разработки, email для команды, Slack для оперативного реагирования и PagerDuty для on-call инженеров.

Декомпозиция логики детекции и уведомлений через обработчики алертов обеспечивает гибкость и масштабируемость системы. Разделение ответственности позволяет легко добавлять новые каналы оповещений или интегрировать алертинг с существующими DevOps-процессами, минимизируя риск пропуска критических событий и ускоряя реакцию команды на снижение качества модели.

Заключение

Дрифт данных и концептуальный дрифт — главные источники незаметной деградации моделей. Даже небольшие изменения распределения признаков или связи с таргетом могут со временем снизить точность прогнозов.

Детекция дрифта с помощью PSI, KS-теста и JS-дивергенции позволяет выявлять отклонения в распределении признаков до критического падения качества. Метрики классификации и регрессии дают количественную оценку производительности, а метрика Directional accuracy обеспечивает контроль специфичных для прогнозирования временных рядов направлений предсказаний.

Инфраструктура непрерывного мониторинга, интегрированная с базами данных, дашбордами и настроенным алертингом превращает пассивный подход в проактивный: модель перестает ломаться внезапно, а система заранее предупреждает о деградации. Правильно настроенный мониторинг экономит недели инженерного времени на отладку и защищает бизнес-метрики от незаметного падения качества прогнозов.