PatchTST: Трансформер для прогнозирования временных рядов

За последние годы область прогнозирования временных рядов сделала ощутимый скачок вперед благодаря внедрению архитектур трансформеров. Если раньше в этой области полновластно доминировали бустинги, рекуррентные нейронные сети и классические статистические методы, то сегодня мы наблюдаем настоящий прорыв в точности и эффективности нейросетевых моделей.

Одной из наиболее значимых инноваций стала архитектура PatchTST, которая не просто адаптировала трансформеры для работы с временными рядами, но и внесла фундаментальные изменения в подход к обработке временных данных.

История создания и авторы PatchTST

PatchTST была представлена в 2023 году исследовательской группой под руководством Yuqi Nie, Nam H. Nguyen, Phanwadee Sinthong и Jayant Kalagnanam из IBM Research. Работа была опубликована в рамках конференции NeurIPS 2023 под названием «A Time Series is Worth 64 Words: Long-term Forecasting with Transformers». Это исследование стало результатом многолетней работы команды IBM Research по адаптации трансформеров для задач прогнозирования временных рядов.

Создание PatchTST было мотивировано несколькими ключевыми проблемами существующих подходов:

  1. Во-первых, классические трансформеры, такие как BERT или GPT, были разработаны для работы с дискретными токенами, в то время как временные ряды представляют собой непрерывные числовые последовательности;
  2. Во-вторых, большинство существующих решений для временных рядов не могли эффективно улавливать долгосрочные зависимости при сохранении вычислительной эффективности;
  3. В-третьих, многие модели страдали от проблемы «забывания» важной локальной информации при обработке длинных последовательностей.

Команда IBM Research подошла к решению этих проблем с нетривиальной стороны, заимствовав идеи из компьютерного зрения. Подобно тому, как Vision Transformer разбивает изображение на патчи (patches), PatchTST разделяет временной ряд на сегменты, каждый из которых рассматривается как отдельный «токен» для трансформера.

Архитектура и ключевые новшества PatchTST

Архитектура PatchTST основана на нескольких принципиальных инновациях, которые кардинально отличают ее от предыдущих подходов к прогнозированию временных рядов. Центральная идея заключается в разбиении временного ряда на патчи фиксированной длины, которые затем обрабатываются стандартной архитектурой трансформера.

(a) Обзор модели PatchTST, в которой каждый батч M временных рядов длиной L обрабатывается независимо (путем решейпинга в размер батча) с помощью Transformer backbone, а затем полученный батч решейпится обратно в M рядов с длиной предсказания T. Каждый одномерный ряд может быть обработан как supervised (b), где набор векторов с патчами используется для вывода полной длины предсказания, или как self-supervised (c), если предсказываются маскированные патчи.

Рис. 1: (a) Обзор модели PatchTST, в которой каждый батч M временных рядов длиной L обрабатывается независимо (путем решейпинга в размер батча) с помощью Transformer backbone, а затем полученный батч решейпится обратно в M рядов с длиной предсказания T. Каждый одномерный ряд может быть обработан как supervised (b), в этом случае набор векторов с патчами используется для вывода полной длины предсказания, либо как self-supervised (c), если предсказываются маскированные патчи. Источник: https://github.com/huggingface/blog/blob/main/patchtst.md

Процесс начинается с разделения исходного временного ряда длиной L на патчи размером M. Каждый патч представляет собой подпоследовательность временного ряда, которая через линейное преобразование конвертируется в эмбединг вектор размерности T. Это позволяет трансформеру работать с временными рядами так же естественно, как с текстовыми последовательностями или изображениями.

Одним из ключевых преимуществ такого подхода является существенное сокращение длины входной последовательности для трансформера. Вместо обработки L временных точек, модель работает с L/M патчами, что квадратично уменьшает вычислительную сложность механизма внимания с O(L²) до O((L/M)²). При типичных значениях M от 16 до 64, это дает выигрыш в производительности в десятки раз.

Второй важной инновацией стала стратегия channel independence. В отличие от большинства многомерных моделей временных рядов, которые пытаются явно моделировать взаимодействия между различными переменными, PatchTST обрабатывает каждую переменную (канал) независимо. Это решение, которое на первый взгляд может показаться упрощением, на практике показало значительно лучшие результаты.

Причина эффективности channel independence кроется в особенностях реальных данных. В большинстве практических задач корреляции между переменными нестабильны во времени и сильно зависят от контекста. Попытки явного моделирования этих взаимодействий часто приводят к переобучению и ухудшению обобщающей способности модели. PatchTST вместо этого полагается на способность трансформера неявно улавливать важные паттерны в данных каждого канала отдельно.

Визуализация патчей. Здесь у нас есть последовательность из 15 временных шагов с длиной патча 5 и шагом 5, в результате чего получается три патча

Рис. 2: Визуализация патчей. Здесь у нас есть последовательность из 15 рядов с длиной патча 5 и шагом 5, в результате чего получается три патча

Третьим новшеством стала специальная стратегия positional encoding для временных рядов. Классические трансформеры используют позиционное кодирование для информирования модели о порядке элементов в последовательности. В PatchTST позиционное кодирование адаптировано для работы с патчами и включает информацию как о позиции патча в последовательности, так и о временной позиции внутри каждого патча.

Сравнение с предшественниками и улучшение метрик

Для понимания революционности PatchTST важно сравнить ее с предшествующими подходами. До появления PatchTST доминировали несколько категорий моделей: классические статистические методы (ARIMA, ETS), градиентные бустинги (XGBoost, LightGBM), рекуррентные нейронные сети (LSTM, GRU), сверточные сети и ранние адаптации трансформеров.

Рекуррентные сети, долгое время считавшиеся золотым стандартом для временных рядов, страдали от проблемы исчезающих градиентов и не могли эффективно обрабатывать очень длинные последовательности. LSTM и GRU решили часть этих проблем, но оставались медленными в обучении из-за последовательной природы вычислений.

Градиентные бустинги, такие как XGBoost, LightGBM и CatBoost, тоже показали хорошую эффективность для прогнозирования временных рядов. Однако для достижения высокой точности для них требовалась тщательная генерация множества признаков, что предполагает глубокое понимание доменной области и особенностей анализируемого временного ряда. Кроме того, процесс подбора оптимальных гиперпараметров мог занять целую вечность.

Ранние попытки применения трансформеров к временным рядам, такие как Temporal Fusion Transformer (TFT) или Informer, показали многообещающие результаты, но сталкивались с проблемами масштабируемости и интерпретируемости. Они также требовали значительных вычислительных ресурсов и сложной инженерии признаков.

PatchTST продемонстрировал впечатляющие улучшения по всем ключевым метрикам. В бенчмарке на стандартных датасетах (ETTh1, ETTh2, ETTm1, Weather, Traffic) модель показала улучшение Mean Squared Error на 20-40% по сравнению с лучшими предшествующими решениями. Особенно впечатляющими были результаты на длинных горизонтах прогнозирования (720 временных точек), где улучшение достигало 50%.

Результаты многомерного долгосрочного прогнозирования с помощью Supervised PatchTST. Длительность предсказания составляет 24,36,48,60 для датасета ILI и 96,192,336,720 для остальных. Сравнение PatchTST с другими нейронными сетями. Лучшие результаты выделены жирным шрифтом, топ-2 - подчеркнуты

Рис. 3: Результаты многомерного долгосрочного прогнозирования с помощью Supervised PatchTST. Длительность предсказания составляет 24,36,48,60 для датасета ILI и 96,192,336,720 для остальных. Сравнение PatchTST с другими нейронными сетями. Лучшие результаты выделены жирным шрифтом, топ-2 — подчеркнуты

Важно отметить, что улучшения коснулись не только точности, но и эффективности. PatchTST требует в 2-3 раза меньше времени на обучение по сравнению с другими transformed-based моделями, сохраняя при этом сопоставимое или лучшее качество прогнозов. Необходимая для обучения память также сократилась благодаря уменьшению длины последовательностей.

Эффективность прогнозирования (MSE) с различными окнами look-back на 3 больших датасетах: Электричество, Дорожный трафик и Погода. Значения окон look-back: 24,48,96,192,336,720, значения горизонтов прогнозирования: 97,720. Для этого эксперимента использовался Supervised PatchTST/42 и другие модели на основе трансформеров с открытым исходным кодом

Рис. 4: Эффективность прогнозирования (MSE) с различными окнами look-back на трех больших датасетах: Электричество, Дорожный трафик и Погода. Значения окон look-back: 24,48,96,192,336,720, значения горизонтов прогнозирования: 97,720. Для этого эксперимента использовался Supervised PatchTST/42 и другие модели на основе трансформеров с открытым исходным кодом

Пример использования PatchTST №1: Прогнозирование цен облигаций США

С теорией разобрались. Давайте теперь посмотрим как эта модель работает на практике.

Первая практическая модель — применение PatchTST для прогнозирования относительно стабильных финансовых инструментов. 10-летние облигации США представляют собой оптимальный случай для демонстрации возможностей модели на данных с выраженными трендами и циклическими паттернами, но без экстремальной волатильности.

Первый шаг — установка модели из репозитория и установка библиотеки трансформеров.

!pip install git+https://github.com/IBM/tsfm.git
!pip install transformers
#Импорт библиотек и загрузка данных
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from transformers import PatchTSTConfig, PatchTSTForPrediction, Trainer, TrainingArguments, EarlyStoppingCallback, set_seed
from tsfm_public.toolkit.dataset import ForecastDFDataset
from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
import torch
import warnings
warnings.filterwarnings("ignore", module="torch")

# Установка seed для воспроизводимости
set_seed(2025)

# Загрузка данных
ticker = 'ZN=F'
start_date = '2018-05-01'
end_date = '2025-05-01'
data = yf.download(ticker, start=start_date, end=end_date)

# Сброс индекса и обработка мультииндекса
data = data.reset_index()
if isinstance(data.columns, pd.MultiIndex):
    data.columns = [col[0] if col[0] != '' else col[1] for col in data.columns]
data['Date'] = pd.to_datetime(data['Date'])
print("Длина данных:", len(data))
print("Колонки после загрузки из yfinance:", data.columns.tolist())

# Расчёт Typical Close
data['Typical_Close'] = (data['High'] + data['Low'] + data['Close']) / 3
print("\nПроверка первых 5 строк данных:")
print(data[['Date', 'Open', 'High', 'Low', 'Close', 'Typical_Close']].head().to_string())
Длина данных: 1762
Колонки после загрузки из yfinance: ['Date', 'Close', 'High', 'Low', 'Open', 'Volume']

Проверка первых 5 строк данных:
        Date        Open        High         Low       Close  Typical_Close
0 2018-05-01  119.546875  119.578125  119.359375  119.375000     119.437500
1 2018-05-02  119.453125  119.562500  119.250000  119.515625     119.442708
2 2018-05-03  119.515625  119.828125  119.437500  119.687500     119.651042
3 2018-05-04  119.687500  119.984375  119.562500  119.703125     119.750000
4 2018-05-07  119.625000  119.765625  119.578125  119.687500     119.677083

После загрузки котировок далее мы указываем параметры обучения модели. Определим горизонт прогнозирования в 20 будущих временных точек, длину контекстного окна (то что будет запоминать модель при обучении) в 128 точек, а длину патча (отрезки для подачи в трансформер) — в 32 точки.

# Настройка PatchTST и запуск обучения модели
# Параметры
context_length = 128
forecast_horizon = 20
patch_length = 32
timestamp_column = 'Date'
id_columns = []
target_columns = ['Typical_Close']

# Разделение данных
N = len(data)
num_train = int(N * 0.7)
num_test = int(N * 0.2)
num_valid = N - num_train - num_test

border1s = [0, num_train - context_length, N - num_test - context_length]
border2s = [num_train, num_train + num_valid, N]

train_data = data.iloc[border1s[0]:border2s[0]].copy()
valid_data = data.iloc[border1s[1]:border2s[1]].copy()
test_data = data.iloc[border1s[2]:border2s[2]].copy()

# Вывод информации о разделении
print("\nРазделение данных:")
print(f"Train: {border1s[0]} до {border2s[0]} (длина: {len(train_data)})")
print(f"Valid: {border1s[1]} до {border2s[1]} (длина: {len(valid_data)})")
print(f"Test: {border1s[2]} до {border2s[2]} (длина: {len(test_data)})")

# Масштабирование
scaler = StandardScaler()
train_data['Typical_Close_scaled'] = scaler.fit_transform(train_data[['Typical_Close']])
valid_data['Typical_Close_scaled'] = scaler.transform(valid_data[['Typical_Close']])
test_data['Typical_Close_scaled'] = scaler.transform(test_data[['Typical_Close']])
target_columns_scaled = ['Typical_Close_scaled']

# Кросс-валидация
print("\nПроведение кросс-валидации с 10 фолдами...")
tscv = TimeSeriesSplit(n_splits=10, test_size=forecast_horizon, max_train_size=num_train)
cv_mape_scores = []

for fold, (train_idx, val_idx) in enumerate(tscv.split(train_data)):
    print(f"Обучение фолда {fold + 1}/10")
    cv_train_data = train_data.iloc[train_idx].copy()
    cv_val_data = train_data.iloc[val_idx].copy()

    if len(cv_train_data) < context_length + forecast_horizon:
        print(f"Пропуск фолда {fold + 1}: недостаточная длина данных ({len(cv_train_data)})")
        continue

    cv_train_data['Typical_Close_scaled'] = scaler.fit_transform(cv_train_data[['Typical_Close']])
    cv_val_data['Typical_Close_scaled'] = scaler.transform(cv_val_data[['Typical_Close']])

    cv_preprocessor = TimeSeriesPreprocessor(
        timestamp_column=timestamp_column,
        id_columns=id_columns,
        target_columns=target_columns_scaled,
        scaling=False
    )
    cv_preprocessor = cv_preprocessor.train(cv_train_data)

    cv_train_dataset = ForecastDFDataset(
        data=cv_preprocessor.preprocess(cv_train_data),
        id_columns=id_columns,
        timestamp_column=timestamp_column,
        target_columns=target_columns_scaled,
        context_length=context_length,
        prediction_length=forecast_horizon
    )
    cv_val_dataset = ForecastDFDataset(
        data=cv_preprocessor.preprocess(cv_val_data),
        id_columns=id_columns,
        timestamp_column=timestamp_column,
        target_columns=target_columns_scaled,
        context_length=context_length,
        prediction_length=forecast_horizon
    )

    cv_config = PatchTSTConfig(
        num_input_channels=1,
        context_length=context_length,
        patch_length=patch_length,
        patch_stride=patch_length,
        prediction_length=forecast_horizon,
        random_mask_ratio=0.1,
        d_model=128,
        num_attention_heads=16,
        num_hidden_layers=3,
        ffn_dim=256,
        dropout=0.05,
        head_dropout=0.05,
        scaling="std",
        loss="mse",
        pre_norm=True,
        norm_type="batchnorm"
    )
    cv_model = PatchTSTForPrediction(cv_config)

    cv_training_args = TrainingArguments(
        output_dir=f"./checkpoint/patchtst/ZN_F/cv_fold_{fold}/",
        overwrite_output_dir=True,
        num_train_epochs=30,
        do_eval=True,
        eval_strategy="epoch",
        per_device_train_batch_size=64,
        per_device_eval_batch_size=64,
        dataloader_num_workers=16,
        save_strategy="epoch",
        logging_strategy="epoch",
        save_total_limit=1,
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
        greater_is_better=False,
        label_names=["future_values"],
        learning_rate=1e-4
    )

    cv_trainer = Trainer(
        model=cv_model,
        args=cv_training_args,
        train_dataset=cv_train_dataset,
        eval_dataset=cv_val_dataset,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=10, early_stopping_threshold=0.0001)]
    )

    cv_trainer.train()

    cv_prediction_output = cv_trainer.predict(cv_val_dataset)
    cv_predictions = cv_prediction_output.predictions
    cv_labels = cv_prediction_output.label_ids

    if isinstance(cv_predictions, tuple):
        cv_predictions = cv_predictions[0]
    if isinstance(cv_labels, tuple):
        cv_labels = cv_labels[0]

    cv_predictions = np.array(cv_predictions) if isinstance(cv_predictions, torch.Tensor) else cv_predictions
    cv_labels = np.array(cv_labels) if isinstance(cv_labels, torch.Tensor) else cv_labels

    cv_predicted_values = scaler.inverse_transform(cv_predictions.reshape(-1, 1)).reshape(-1, forecast_horizon)
    cv_actual_values = scaler.inverse_transform(cv_labels.reshape(-1, 1)).reshape(-1, forecast_horizon)

    cv_mape = np.mean(np.abs((cv_actual_values.flatten() - cv_predicted_values.flatten()) / cv_actual_values.flatten())) * 100
    cv_mape_scores.append(cv_mape)
    print(f"Фолд {fold + 1} MAPE: {cv_mape:.2f}%")

if cv_mape_scores:
    print(f"\nРезультаты кросс-валидации:")
    print(f"Средний MAPE: {np.mean(cv_mape_scores):.2f}%")
    print(f"Стандартное отклонение MAPE: {np.std(cv_mape_scores):.2f}%")
else:
    print("\nНет валидных фолдов для кросс-валидации.")

# Подготовка данных для финальной модели
preprocessor = TimeSeriesPreprocessor(
    timestamp_column=timestamp_column,
    id_columns=id_columns,
    target_columns=target_columns_scaled,
    scaling=False
)
preprocessor = preprocessor.train(train_data)

train_dataset = ForecastDFDataset(
    data=preprocessor.preprocess(train_data),
    id_columns=id_columns,
    timestamp_column=timestamp_column,
    target_columns=target_columns_scaled,
    context_length=context_length,
    prediction_length=forecast_horizon
)

valid_dataset = ForecastDFDataset(
    data=preprocessor.preprocess(valid_data),
    id_columns=id_columns,
    timestamp_column=timestamp_column,
    target_columns=target_columns_scaled,
    context_length=context_length,
    prediction_length=forecast_horizon
)

test_dataset = ForecastDFDataset(
    data=preprocessor.preprocess(test_data),
    id_columns=id_columns,
    timestamp_column=timestamp_column,
    target_columns=target_columns_scaled,
    context_length=context_length,
    prediction_length=forecast_horizon
)

# Конфигурация модели
config = PatchTSTConfig(
    num_input_channels=1,
    context_length=context_length,
    patch_length=patch_length,
    patch_stride=patch_length,
    prediction_length=forecast_horizon,
    random_mask_ratio=0.1,
    d_model=128,
    num_attention_heads=16,
    num_hidden_layers=3,
    ffn_dim=256,
    dropout=0.05,
    head_dropout=0.05,
    scaling="std",
    loss="mse",
    pre_norm=True,
    norm_type="batchnorm"
)
model = PatchTSTForPrediction(config)

# Аргументы обучения
training_args = TrainingArguments(
    output_dir="./checkpoint/patchtst/ZN_F/output/",
    overwrite_output_dir=True,
    num_train_epochs=100,
    do_eval=True,
    eval_strategy="epoch",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    dataloader_num_workers=16,
    save_strategy="epoch",
    logging_strategy="epoch",
    save_total_limit=3,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
    label_names=["future_values"],
    learning_rate=1e-4
)

early_stopping = EarlyStoppingCallback(
    early_stopping_patience=10,
    early_stopping_threshold=0.0001
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=valid_dataset,
    callbacks=[early_stopping]
)

# Обучение модели
trainer.train()
Разделение данных:
Train: 0 до 1233 (длина: 1233)
Valid: 1105 до 1410 (длина: 305)
Test: 1282 до 1762 (длина: 480)

Проведение кросс-валидации с 10 фолдами...
Обучение фолда 10/10

Графики динамики Loss на Train и Val наборах данных

Рис. 5: Графики динамики Loss на Train и Val наборах данных

# Прогноз на тестовом наборе
prediction_output = trainer.predict(test_dataset)
predictions = prediction_output.predictions
labels = prediction_output.label_ids

if isinstance(predictions, tuple):
    print("Predictions — кортеж, выбираем первый элемент")
    predictions = predictions[0]
if isinstance(labels, tuple):
    print("Labels — кортеж, выбираем первый элемент")
    labels = labels[0]

predictions = np.array(predictions) if isinstance(predictions, torch.Tensor) else predictions
labels = np.array(labels) if isinstance(labels, torch.Tensor) else labels

predicted_values = scaler.inverse_transform(predictions.reshape(-1, 1)).reshape(-1, forecast_horizon)
actual_values = scaler.inverse_transform(labels.reshape(-1, 1)).reshape(-1, forecast_horizon)

overall_mape = np.mean(np.abs((actual_values.flatten() - predicted_values.flatten()) / actual_values.flatten())) * 100
print(f"Общий MAPE на тестовом наборе: {overall_mape:.2f}%")

# Прогноз на следующие 20 свечей
input_data = data.iloc[num_train - context_length:num_train].copy()
input_data['Typical_Close_scaled'] = scaler.transform(input_data[['Typical_Close']])
scaled_input = preprocessor.preprocess(input_data)[target_columns_scaled].values
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
past_values = torch.tensor(scaled_input[-context_length:].reshape(1, context_length, -1), dtype=torch.float32).to(device)
with torch.no_grad():
    prediction = model(past_values).prediction_outputs
predicted_tc = scaler.inverse_transform(prediction.cpu().numpy().reshape(-1, 1)).flatten()

# Реконструкция OHLC
train_data['high_dev'] = train_data['High'] - train_data['Typical_Close']
train_data['low_dev'] = train_data['Typical_Close'] - train_data['Low']
mean_high_dev = train_data['high_dev'].mean()
mean_low_dev = train_data['low_dev'].mean()

last_close = input_data['Close'].iloc[-1]
forecast_candles = []
previous_close = last_close
for tc_pred in predicted_tc:
    open_c = previous_close
    high_c = tc_pred + mean_high_dev
    low_c = tc_pred - mean_low_dev
    close_c = 3 * tc_pred - high_c - low_c
    forecast_candles.append({
        'open': open_c,
        'high': high_c,
        'low': low_c,
        'close': close_c,
        'predicted_typical_close': tc_pred
    })
    previous_close = close_c

forecast_df = pd.DataFrame(forecast_candles)
forecast_df['Date'] = data['Date'].iloc[num_train:min(num_train + 20, len(data))].values

# Корректное присвоение actual_typical_close
forecast_start = num_train
forecast_end = min(num_train + 20, len(data))
forecast_df['actual_typical_close'] = data['Typical_Close'].iloc[forecast_start:forecast_end].values

# Проверка фактических значений для периода прогноза
print("\nФактические значения для периода прогноза (индексы {}–{}):".format(forecast_start, forecast_end - 1))
actual_check = data[['Date', 'Open', 'High', 'Low', 'Close', 'Typical_Close']].iloc[forecast_start:forecast_end]
print(actual_check.to_string(index=False))

# Расчёт MAPE для прогноза
mape_specific = np.mean(np.abs((forecast_df['actual_typical_close'] - forecast_df['predicted_typical_close']) / forecast_df['actual_typical_close'])) * 100
print(f"MAPE для прогноза на 20 свечей: {mape_specific:.2f}%")

# Вывод таблицы прогноза
print("\nПрогноз на следующие 20 свечей:")
print(forecast_df[['open', 'high', 'low', 'close', 'predicted_typical_close', 'actual_typical_close']].to_string(index=False))

# Визуализации
# 1. Исторический прогноз
test_start_index = border1s[2]
num_samples = len(test_dataset)
pred_dates = data['Date'].iloc[test_start_index + context_length:test_start_index + context_length + num_samples]
predicted_1step = predicted_values[:, 0]

plt.figure(figsize=(15, 5))
plt.plot(data['Date'], data['Typical_Close'], color='gray', label='Фактический Typical Close')
plt.plot(pred_dates, predicted_1step, color='red', label='Предсказанный Typical Close (1 шаг)')
plt.legend()
plt.title('Реальные котировки (серый) + Исторический прогноз (красный)')
plt.xlabel('Дата')
plt.ylabel('Typical Close')
plt.savefig('historical_forecast.png')
plt.close()

# 2. Прогноз на 20 периодов
historical_window = 90
historical_dates = data['Date'].iloc[max(0, num_train - historical_window):num_train]
historical_values = data['Typical_Close'].iloc[max(0, num_train - historical_window):num_train]
forecast_dates = data['Date'].iloc[forecast_start:forecast_end]
actual_tc_forecast = data['Typical_Close'].iloc[forecast_start:forecast_end]

plt.figure(figsize=(15, 5))
plt.plot(historical_dates, historical_values, color='gray', label='Исторические данные (последние 90 баров)')
plt.plot(forecast_dates, actual_tc_forecast, color='gray', linestyle='--', label='Фактический период прогноза')
plt.plot(forecast_dates, forecast_df['predicted_typical_close'], color='red', label='Предсказанный')
plt.legend()
plt.title('Последние 90 баров (серый) + Прогноз на 20 периодов (красный)')
plt.xlabel('Дата')
plt.ylabel('Typical Close')
plt.savefig('20_period_forecast.png')
plt.close()

print("\nВизуализации сохранены как 'historical_forecast.png' и '20_period_forecast.png'")
Результаты кросс-валидации:
Средний MAPE: 12.92%
Стандартное отклонение MAPE: 1.56%

Общий MAPE на тестовом наборе: 0.87%
MAPE для прогноза на 20 свечей: 0.63%

Фактические значения для периода прогноза (индексы 1233–1242):
      Date       Open       High        Low      Close  Typical_Close
2023-03-24 115.890625 117.046875 115.671875 116.109375     116.276042
2023-03-27 116.171875 116.203125 114.875000 114.984375     115.354167
2023-03-28 114.984375 115.234375 114.531250 114.640625     114.802083
2023-03-29 114.578125 114.875000 114.218750 114.531250     114.541667
2023-03-30 114.468750 114.734375 114.218750 114.578125     114.510417
2023-03-31 114.531250 115.109375 114.281250 114.921875     114.770833
2023-04-03 114.906250 115.671875 114.562500 115.453125     115.229167
2023-04-04 115.578125 116.281250 115.078125 116.250000     115.869792
2023-04-05 116.171875 116.937500 115.906250 116.578125     116.473958
2023-04-06 116.390625 116.937500 116.343750 116.546875     116.609375

Сравнение динамики реальных котировок гос.облигаций США (серая линия) и предсказаний PatchTST с 2024 года

Рис. 6: Сравнение динамики реальных котировок гос.облигаций США (серая линия) и предсказаний PatchTST с 2024 года

График ближайшего краткосрочного прогноза PatchTST на 20 дней 

Рис. 7: График ближайшего краткосрочного прогноза PatchTST на 20 дней 

Данная реализация демонстрирует полный цикл работы с PatchTST для прогнозирования цен облигаций. Модель использует патчи размером 32 временных точек, что позволяет эффективно улавливать как краткосрочные колебания, так и долгосрочные тренды в данных облигаций. Особенно важна стратегия восстановления OHLC-данных из прогнозируемых значений Typical Close с использованием исторических отклонений.

Читайте также:  Прогнозирование трафика и конверсий сайта с помощью SVM, SVR (опорных векторов)

Давайте рассмотрим кратко что тут происходит в коде:

  1. Мы уходим от классического Close, потому что он не отражает размах цен внутри свечей и вычисляем Typical_Close как среднее (High + Low + Close) / 3 для каждой свечи, которая используется как целевая переменная;
  2. Далее разделяем данные на обучающую (70%), валидационную (10%) и тестовую (20%) выборки;
  3. Приводим Typical_Close к единому масштабу с помощью StandardScaler;
  4. Используем TimeSeriesPreprocessor и ForecastDFDataset из библиотеки tsfm_public для подготовки временных рядов с учетом контекста (context_length=128) и горизонта прогноза (forecast_horizon=20);
  5. Проводим кросс-валидацию с 10 фолдами, используя TimeSeriesSplit, чтобы оценить стабильность модели на разных отрезках обучающих данных;
  6. Для каждого фолда обучаем модель и вычисляем MAPE (Mean Absolute Percentage Error), выводя средний MAPE и его стандартное отклонение;
  7. Обучаем модель PatchTST с параметрами: context_length=128, patch_length=32, d_model=128, с 3 слоями и минимальной регуляризацией (dropout=0.05). Указываем как параметры 100 эпох с ранней остановкой (early_stopping_patience=10) для предотвращения переобучения, используя MSE как функцию потерь;
  8. Делаем прогноз на 20 свечей вперед после обучающей выборки, предсказывая Typical_Close. Реконструируем OHLC (Open, High, Low, Close) для прогнозных свечей, используя средние отклонения High и Low от Typical_Close на обучающих данных;
  9. Вычисляем MAPE для прогноза, сравнивая предсказанный и фактический Typical_Close.

Обратите внимание — для загрузки модели, ее весов, мы используем wandb.ai. Это платформа для отслеживания, визуализации и управления экспериментами в машинном обучении, которая помогает анализировать и оптимизировать модели в реальном времени. Если вы будете использовать этот код вам потребуется получить API ключ. Получить его бесплатно (за регистрацию) можно здесь: https://wandb.ai/authorize?ref=models.

Выбор 10-летних облигаций США в качестве первого примера не случаен. Этот инструмент характеризуется относительной стабильностью, четко выраженными циклическими паттернами, связанными с макроэкономическими циклами, и меньшей подверженностью краткосрочному рыночному шуму. PatchTST особенно эффективно работает с такими данными благодаря способности трансформера улавливать долгосрочные зависимости через механизм внимания.

Пример использования PatchTST №2: Прогнозирование роста или снижения Bitcoin

Второй пример демонстрирует применение PatchTST для гораздо более сложной задачи — прогнозирования направления движения Bitcoin. Криптовалютные рынки отличаются экстремальной волатильностью, нелинейными зависимостями и частыми структурными сдвигами, что делает их идеальным полигоном для тестирования продвинутых архитектур машинного обучения.

Учитывая, что ни одна из open-source моделей машинного обучения даже близко не приблизилась к прогнозированию котировок криптовалют с высокой точностью, мы немного упростим задачу для PatchTST и попросим посчитать для нас вероятности сигналов: роста (LONG) на 5%, снижения (SHORT) на 5% в течение 5 следующих периодов, либо отсутствие вероятности таких больших движений и соответственно сигнал NO TRADE. То есть попросим решить задачу supervised-classification, где мы укажем, когда должен быть тот или иной сигнал на train, и попробуем обучить модель предсказывать их на test.

# Импорт библиотек и загрузка данных
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import PatchTSTConfig, PatchTSTForClassification, Trainer, TrainingArguments, EarlyStoppingCallback
from tsfm_public.toolkit.dataset import ForecastDFDataset
from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import precision_score, confusion_matrix
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from typing import Optional
import os
from tqdm import tqdm
import time
import warnings
warnings.filterwarnings('ignore')

# Установка seed для воспроизводимости
torch.manual_seed(2025)
np.random.seed(2025)

# Загрузка котировок Bitcoin
ticker = 'BTC-USD'
start_date = '2018-05-01'
end_date = '2025-05-01'
data = yf.download(ticker, start=start_date, end=end_date)

# Сброс индекса и обработка мультииндекса
data = data.reset_index()
if isinstance(data.columns, pd.MultiIndex):
    data.columns = [col[0] if col[0] != '' else col[1] for col in data.columns]
data['Date'] = pd.to_datetime(data['Date'])
print("Длина данных:", len(data))
print("Колонки после загрузки из yfinance:", data.columns.tolist())

# Расчет Typical Close
data['Typical_Close'] = (data['High'] + data['Low'] + data['Close']) / 3
print("\nПроверка первых 5 строк данных:")
print(data[['Date', 'Open', 'High', 'Low', 'Close', 'Typical_Close']].head().to_string())
Колонки после загрузки из yfinance: ['Date', 'Close', 'High', 'Low', 'Open', 'Volume']

Проверка первых 5 строк данных:
        Date         Open         High          Low        Close  Typical_Close
0 2018-05-01  9251.469727  9255.879883  8891.049805  9119.009766    9088.646484
1 2018-05-02  9104.599609  9256.519531  9015.139648  9235.919922    9169.193034
2 2018-05-03  9233.969727  9798.330078  9188.150391  9743.860352    9576.780273
3 2018-05-04  9695.500000  9779.200195  9585.959961  9700.759766    9688.639974
4 2018-05-05  9700.280273  9964.500000  9695.120117  9858.150391    9839.256836
# Обогащаем датафрейм лаговыми столбцами в диапазоне 1-5 дней
for lag in range(1, 6):
    data[f'Tclose_lag{lag}'] = data['Typical_Close'].shift(-lag)

# Добавляем forward fill для заполнения NaN значений
for lag in range(1, 6):
    col_name = f'Tclose_lag{lag}'
    data[col_name] = data[col_name].fillna(method='ffill')

# Список лаговых столбцов
lag_columns = [f'Tclose_lag{lag}' for lag in range(1, 6)]

# Создание булевых столбцов с True при изменении цены на 5% или более в будущем
data['Plus5change'] = (data[lag_columns] >= data['Typical_Close'].values[:, None] * 1.05).any(axis=1)
data['Minus5change'] = (data[lag_columns] <= data['Typical_Close'].values[:, None] * 0.95).any(axis=1)
data['Flat'] = ( (~data['Plus5change'] & ~data['Minus5change']) |
                 (data['Plus5change'] & data['Minus5change']) )

# Создание столбца с торговыми сигналами
conditions = [ (data['Plus5change'] == True) & (data['Minus5change'] == False),  # LONG
               (data['Minus5change'] == True) & (data['Plus5change'] == False) ]  # SHORT
choices = ['LONG', 'SHORT']
data['Signal'] = np.select(conditions, choices, default='NO_TRADE')

# Финальная выборка колонок
data = data[['Date', 'Close', 'High', 'Low', 'Open', 'Typical_Close', 'Volume', 'Signal']]
data.head(5)

Датафрейм с котировками Биткоина с сигналами LONG, SHORT, NO_TRADE, построенным по лаговым признакам на горизонте ближайших 1-5 дней

Рис. 8: Датафрейм с котировками Биткоина с сигналами LONG, SHORT, NO_TRADE, построенным по лаговым признакам на горизонте ближайших 1-5 дней

# Построение графика котировок с вертикальными линиями сигналов LONG и SHORT
def plot_signals(data, start_date, end_date):
    # Фильтруем данные по диапазону дат
    filtered_data = data[(data['Date'] >= start_date) & (data['Date'] <= end_date)]

    fig, ax = plt.subplots(figsize=(14, 6))

    # Рисуем серую горизонтальную линию Typical_Close
    ax.plot(filtered_data['Date'], filtered_data['Typical_Close'], color='gray', label='Typical_Close')

    # Находим индексы для сигналов LONG и SHORT
    long_dates = filtered_data[filtered_data['Signal'] == 'LONG']['Date']
    short_dates = filtered_data[filtered_data['Signal'] == 'SHORT']['Date']

    # Добавляем вертикальные зеленые линии для LONG
    for date in long_dates:
        ax.axvline(x=date, color='lightgreen', linestyle='-', linewidth=1.5)

    # Добавляем вертикальные красные линии для SHORT
    for date in short_dates:
        ax.axvline(x=date, color='coral', linestyle='-', linewidth=1.5)

    # Оформление графика
    ax.set_title('Typical_Close с сигналами LONG/SHORT')
    ax.set_xlabel('Дата')
    ax.set_ylabel('Цена')
    ax.grid(True)
    ax.legend(['Typical_Close', 'LONG сигналы', 'SHORT сигналы'])

    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

plot_signals(data, start_date='2018-05-01', end_date='2019-05-01')
plot_signals(data, start_date='2019-05-01', end_date='2020-05-01')
plot_signals(data, start_date='2020-05-01', end_date='2021-05-01')
plot_signals(data, start_date='2021-05-01', end_date='2022-05-01')
plot_signals(data, start_date='2022-05-01', end_date='2023-05-01')
plot_signals(data, start_date='2023-05-01', end_date='2024-05-01')
plot_signals(data, start_date='2024-05-01', end_date='2025-05-01')

График цен Bitcoin с сигналами покупки и продажи с мая 2018 по май 2019

Рис. 9: График цен Bitcoin с сигналами покупки и продажи с мая 2018 по май 2019

График с мая 2019 по май 2020

Рис. 10: График с мая 2019 по май 2020

График с мая 2020 по май 2021

Рис. 11: График с мая 2020 по май 2021

График с мая 2021 по май 2022

Рис. 12: График с мая 2021 по май 2022

График с мая 2022 по май 2023

Рис. 13: График с мая 2022 по май 2023

График с мая 2023 по май 2024

Рис. 14: График с мая 2023 по май 2024

График с мая 2024 по май 2025

Рис. 15: График с мая 2024 по май 2025

На графиках выше мы видим зеленые зоны (там где Signal=LONG) и красные зоны (там где Signal=SHORT) на линейном чарте котировок Биткоина. И совершение сделок в этих зонах — наиболее благоприятное для трейдинга с точки зрения времени-доходности, так как в большинстве случаев цена в этих зонах уходит не просто на 5%, а существенно больше, и довольно стремительно.

Читайте также:  Автокорреляция (ACF) и частичная автокорреляция (PACF) в биржевом анализе

Графики впечатляют, однако увы эти сигналы непригодны для торговли, так как построены на лагах на 1-5 периодов вперед, т. е. заглядывают в будущее. Но почему не протестировать сможет ли нейросеть PatchTST найти какие-то закономерности между этими сигналами и предыдущими котировками? Сможет ли предсказывать эти сигналы? Давайте посмотрим…

# Энкодинг. Преобразование категориальных меток в числа
label_map = {'LONG': 0, 'SHORT': 1, 'NO_TRADE': 2}
data['Signal'] = data['Signal'].map(label_map)

# Проверка таблицы после маппинга
print("Уникальные значения в столбце Signal после кодирования:", data['Signal'].unique())
print("Первые 5 строк данных с закодированным Signal:")
print(data[['Date', 'Close', 'High', 'Low', 'Open', 'Typical_Close', 'Volume', 'Signal']].head().to_string())
Уникальные значения в столбце Signal после кодирования: [0 2 1]
Первые 5 строк данных с закодированным Signal:
        Date        Close         High          Low         Open  Typical_Close       Volume  Signal
0 2018-05-01  9119.009766  9255.879883  8891.049805  9251.469727    9088.646484   7713019904       0
1 2018-05-02  9235.919922  9256.519531  9015.139648  9104.599609    9169.193034   7558159872       0
2 2018-05-03  9743.860352  9798.330078  9188.150391  9233.969727    9576.780273  10207299584       2
3 2018-05-04  9700.759766  9779.200195  9585.959961  9695.500000    9688.639974   8217829888       2
4 2018-05-05  9858.150391  9964.500000  9695.120117  9700.280273    9839.256836   7651939840       1
# Выбираем столбцы с признаками
feature_columns = ['Close', 'High', 'Low', 'Open', 'Typical_Close', 'Volume']
X = data[feature_columns].values
y = data['Signal'].values

# Масштабируем признаки
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Кросс-валидация на 5 фолдов
tscv = TimeSeriesSplit(n_splits=5)
for fold, (train_index, test_index) in enumerate(tscv.split(X_scaled)):
    print(f"Фолд {fold + 1}: Размер тренировочной выборки: {len(train_index)}, Размер тестовой выборки: {len(test_index)}")

# Посмотрим на размеры последнего фолда
train_index, test_index = list(tscv.split(X_scaled))[-1]
X_train, X_test = X_scaled[train_index], X_scaled[test_index]
y_train, y_test = y[train_index], y[test_index]
print("Размер X_train:", X_train.shape)
print("Размер X_test:", X_test.shape)
print("Размер y_train:", y_train.shape)
print("Размер y_test:", y_test.shape)
Фолд 1: Размер тренировочной выборки: 427, Размер тестовой выборки: 426
Фолд 2: Размер тренировочной выборки: 853, Размер тестовой выборки: 426
Фолд 3: Размер тренировочной выборки: 1279, Размер тестовой выборки: 426
Фолд 4: Размер тренировочной выборки: 1705, Размер тестовой выборки: 426
Фолд 5: Размер тренировочной выборки: 2131, Размер тестовой выборки: 426
Размер X_train: (2131, 6)
Размер X_test: (426, 6)
Размер y_train: (2131,)
Размер y_test: (426,)

Мы выполнили энкодинг и кросс-валидацию. Она скользящая — со сдвигом вправо трейн-выборка растет, тестовая — свдигается вместе с окончанием первой.

# Создаем кастомный датасет для PatchTST
class TimeSeriesClassificationDataset(Dataset):
    def __init__(self, X, y, context_length):
        self.X = X  # Нормализованные признаки (n_samples, n_features)
        self.y = y  # Закодированные метки
        self.context_length = context_length
        self.num_samples = len(X) - context_length + 1  # Число возможных последовательностей

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        # Извлечение последовательности context_length
        past_values = self.X[idx:idx + self.context_length]
        target_values = self.y[idx + self.context_length - 1]  # Метка в конце последовательности
        return {
            'past_values': torch.tensor(past_values, dtype=torch.float32),
            'target_values': torch.tensor(target_values, dtype=torch.long)
        }

# Параметры
context_length = 128  # Количество временных шагов в каждой последовательности
num_input_channels = len(feature_columns)  # 6 признаков

try:
    # Создаем train и test датасеты
    train_dataset = TimeSeriesClassificationDataset(X_train, y_train, context_length)
    test_dataset = TimeSeriesClassificationDataset(X_test, y_test, context_length)

    # Проверка
    sample_data = train_dataset[0]  # 1й сэмпл
    print("Размер past_values в первом тренировочном сэмпле:", sample_data['past_values'].shape)
    print("Значение target_values в первом тренировочном сэмпле:", sample_data['target_values'].item())
    print("Количество тренировочных сэмплов:", len(train_dataset))
    print("Количество тестовых сэмплов:", len(test_dataset))

except Exception as e:
    print("Ошибка при создании датасета:", str(e))
Размер past_values в первом тренировочном сэмпле: torch.Size([128, 6])
Значение target_values в первом тренировочном сэмпле: 1
Количество тренировочных сэмплов: 2004
Количество тестовых сэмплов: 299
# Конфигурация модели
config = PatchTSTConfig(
    num_input_channels=num_input_channels, #6 признаков
    num_targets=3,  #3 класса: LONG (0), SHORT (1), NO_TRADE (2)
    context_length=context_length, #длина контекста = 128 временных рядов
    patch_length=16,   #Длина каждого патча
    stride=16,         #Длина страйда (перекрытия) для патчей
    use_cls_token=True, #Используем CLS токен для классификации
    num_attention_heads=4,  #Количество голов внимания
    d_model=256,       #Размер слоев модели
    num_layers=4,      #Количество слоев
)
model = PatchTSTForClassification(config=config)
device = torch.device("cuda")  #Или cpu
model = model.to(device)

# Проверка
print("Количество входных каналов:", config.num_input_channels)
print("Количество классов:", config.num_targets)
print("Длина контекста:", config.context_length)
print("Размер модели (параметры):", sum(p.numel() for p in model.parameters() if p.requires_grad))
Количество входных каналов: 6
Количество классов: 3
Длина контекста: 128
Размер модели (параметры): 1590531

Мы сконфигурировали не слишком тяжелую модель, но и не примитивную — на 1,6 млн параметров.

Давайте посмотрим на распределение классов.

print("Class distribution in dataset:")
print(data['Signal'].value_counts(normalize=True))
Class distribution in dataset:
Signal
2    0.532655
0    0.263981
1    0.203363
Name: proportion, dtype: float64

Диспропорция классов есть. Чаще всего в нашем датасете был сигнал NO_TRADE (в 53.2% случаев), затем LONG (26.3%), и SHORT (в 20.3% случаев).

Чтобы модель не переобучилась на доминирующий класс, логично назначить классам различные веса.

# Рассчитываем веса классов
label_map_reverse = {0: 'LONG', 1: 'SHORT', 2: 'NO_TRADE'}
class_counts = pd.Series([label_map_reverse[label] for label in y]).value_counts()
total_samples = len(y)
weights = torch.tensor([total_samples / (3 * class_counts[label]) for label in ['LONG', 'SHORT', 'NO_TRADE']]).to(device)
print("Class weights:", weights.tolist())
Class weights: [1.262716049382716, 1.639102564102564, 0.6257953989231523]
# Выбираем устройство для обучения модели
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
print("Device used:", device)

# Определяем какая будет функция потерь и оптимизатор
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=0.01)

# Создаем даталоадеры
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Функция для замера Precision индивидуально для каждого класса
def compute_precision(y_true, y_pred):
    precision_per_class = precision_score(y_true, y_pred, average=None, labels=[0, 1, 2])
    return {
        'precision_LONG': precision_per_class[0],
        'precision_SHORT': precision_per_class[1],
        'precision_NO_TRADE': precision_per_class[2]
    }

# Проверка DataLoader и входных данных модели
try:
    # Берем один батч из train_dataloader
    batch = next(iter(train_dataloader))
    past_values = batch['past_values'].to(device)
    target_values = batch['target_values'].to(device)

    # Forward pass для проверки совместимости моделей
    model.eval()
    with torch.no_grad():
        outputs = model(past_values=past_values)
        logits = outputs.prediction_logits
        predictions = torch.argmax(logits, dim=-1)
        loss = criterion(logits, target_values)

    print("Shape of past_values in batch:", past_values.shape)
    print("Shape of target_values in batch:", target_values.shape)
    print("Shape of model logits:", logits.shape)
    print("Sample predictions:", predictions.cpu().numpy()[:5])
    print("Sample loss:", loss.item())
    print("Number of training batches:", len(train_dataloader))
    print("Number of evaluation batches:", len(test_dataloader))

except Exception as e:
    print("Ошибка при проверке DataLoader и модели:", str(e))
Device used: cuda
Shape of past_values in batch: torch.Size([16, 128, 6])
Shape of target_values in batch: torch.Size([16])
Shape of model logits: torch.Size([16, 3])
Sample predictions: [0 0 0 0 0]
Sample loss: 1.1202704906463623
Number of training batches: 126
Number of evaluation batches: 19

В качестве параметров обучения мы зададим 50 эпох обучения и early stopping с patience=5. Для ускорения обучения будем использовать CUDA, то есть видеокарту.

# Задаем параметры обучения
num_epochs = 50  
patience = 5     
best_precision = 0.0
epochs_no_improve = 0
best_model_path = "./best_model_weighted.pth"

# Запуск обучения и тайминга
start_time = time.time()

model.train()
for epoch in range(num_epochs):
    train_loss = 0.0
    train_predictions = []
    train_labels = []
    
    train_progress = tqdm(train_dataloader, desc=f"Epoch {epoch + 1}/{num_epochs} [Train]", leave=False)
    for batch in train_progress:
        past_values = batch['past_values'].to(device)
        target_values = batch['target_values'].to(device)
        
        optimizer.zero_grad()
        outputs = model(past_values=past_values)
        loss = criterion(outputs.prediction_logits, target_values)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        predictions = torch.argmax(outputs.prediction_logits, dim=-1)
        train_predictions.extend(predictions.cpu().numpy())
        train_labels.extend(target_values.cpu().numpy())
        
        train_progress.set_postfix({'loss': loss.item()})
    
    train_loss /= len(train_dataloader)
    train_precision = compute_precision(train_labels, train_predictions)
    
    model.eval()
    eval_loss = 0.0
    eval_predictions = []
    eval_labels = []
    
    eval_progress = tqdm(test_dataloader, desc=f"Epoch {epoch + 1}/{num_epochs} [Eval]", leave=False)
    with torch.no_grad():
        for batch in eval_progress:
            past_values = batch['past_values'].to(device)
            target_values = batch['target_values'].to(device)
            outputs = model(past_values=past_values)
            loss = criterion(outputs.prediction_logits, target_values)
            
            eval_loss += loss.item()
            predictions = torch.argmax(outputs.prediction_logits, dim=-1)
            eval_predictions.extend(predictions.cpu().numpy())
            eval_labels.extend(target_values.cpu().numpy())
            
            eval_progress.set_postfix({'loss': loss.item()})
    
    eval_loss /= len(test_dataloader)
    eval_precision = compute_precision(eval_labels, eval_predictions)
    
    print(f"\nEpoch {epoch + 1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f}")
    print(f"Train Precision - LONG: {train_precision['precision_LONG']:.4f}, "
          f"SHORT: {train_precision['precision_SHORT']:.4f}, "
          f"NO_TRADE: {train_precision['precision_NO_TRADE']:.4f}")
    print(f"Eval Loss: {eval_loss:.4f}")
    print(f"Eval Precision - LONG: {eval_precision['precision_LONG']:.4f}, "
          f"SHORT: {eval_precision['precision_SHORT']:.4f}, "
          f"NO_TRADE: {eval_precision['precision_NO_TRADE']:.4f}")
    
    if eval_precision['precision_NO_TRADE'] > best_precision:
        best_precision = eval_precision['precision_NO_TRADE']
        epochs_no_improve = 0
        torch.save(model.state_dict(), best_model_path)
        print(f"Best model saved with precision_NO_TRADE: {best_precision:.4f}")
    else:
        epochs_no_improve += 1
    
    if epochs_no_improve >= patience:
        print(f"Early stopping triggered after {epoch + 1} epochs")
        break
    
    model.train()

# End timing
end_time = time.time()
duration = end_time - start_time
print(f"\nTraining completed in {duration:.2f} seconds.")
Epoch 15/50
Train Loss: 0.8640
Train Precision - LONG: 0.6368, SHORT: 0.5349, NO_TRADE: 0.6224
Eval Loss: 1.6981
Eval Precision - LONG: 0.3108, SHORT: 0.0000, NO_TRADE: 0.5960
Early stopping triggered after 15 epochs

Training completed in 111.28 seconds.

Early stopping остановил обучение на 15 эпохе, так как loss перестал снижаться.

Читайте также:  Поиск аномалий в данных с Python

Метрики Precision на тренировочной выборке неплохие:

  • LONG = 0.63;
  • SHORT = 0.53:
  • NO_TRADE = 0.62.

Однако на валидационной выборке мы видим что модель слабая.

# Загрузка лучшей модели
model.load_state_dict(torch.load("./best_model_weighted.pth"))
model.eval()
print("Best model loaded from ./best_model.pth")

# Создаем полный датасет (train + test) для предсказаний
full_dataset = TimeSeriesClassificationDataset(X_scaled, y, context_length)
full_dataloader = DataLoader(full_dataset, batch_size=16, shuffle=False)

# Генерируем предсказания и вероятности
all_predictions = []
all_probabilities = []
all_labels = []

with torch.no_grad():
    progress_bar = tqdm(full_dataloader, desc="Generating Predictions", leave=True)
    for batch in progress_bar:
        past_values = batch['past_values'].to(device)
        target_values = batch['target_values'].to(device)
        
        outputs = model(past_values=past_values)
        logits = outputs.prediction_logits
        predictions = torch.argmax(logits, dim=-1)     
        probabilities = torch.softmax(logits, dim=-1)  
        
        all_predictions.extend(predictions.cpu().numpy())
        all_probabilities.extend(probabilities.cpu().numpy())
        all_labels.extend(target_values.cpu().numpy())
        
        progress_bar.set_postfix({'batch_size': len(past_values)})

# Преобразуем числа классов обратно в текст
label_map_reverse = {0: 'LONG', 1: 'SHORT', 2: 'NO_TRADE'}
signal_predict = [label_map_reverse[pred] for pred in all_predictions]
signal_true = [label_map_reverse[label] for label in all_labels]

# Создаем столбцы для вероятностей
probabilities = np.array(all_probabilities)
prob_LONG = probabilities[:, 0]
prob_SHORT = probabilities[:, 1]
prob_NO_TRADE = probabilities[:, 2]

# Создаем финальный датафрейм
data_final = data.iloc[context_length-1:].copy().reset_index(drop=True)
data_final['Signal'] = signal_true
data_final['Signal_Predict'] = signal_predict
data_final['prob_LONG'] = prob_LONG
data_final['prob_SHORT'] = prob_SHORT
data_final['prob_NO_TRADE'] = prob_NO_TRADE

# Проверка данных
print("Final DataFrame shape:", data_final.shape)
print("First 5 rows of final DataFrame:")
print(data_final[['Date', 'Signal', 'Signal_Predict', 'prob_LONG', 'prob_SHORT', 'prob_NO_TRADE']].head().to_string())
print("\nPrecision on full dataset:")
full_precision = compute_precision(all_labels, all_predictions)
print(f"Precision - LONG: {full_precision['precision_LONG']:.4f}, "
      f"SHORT: {full_precision['precision_SHORT']:.4f}, "
      f"NO_TRADE: {full_precision['precision_NO_TRADE']:.4f}")

# Строим и визуализируем матрицу несоответствий
labels = ['LONG', 'SHORT', 'NO_TRADE']
cm = confusion_matrix(signal_true, signal_predict, labels=labels)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.savefig('confusion_matrix.png')
print("\nConfusion matrix saved as 'confusion_matrix.png'")

# Расчет статистик по ошибочным прогнозам
total_samples = len(data_final)

# 1) Signal_Predict = LONG когда Signal != LONG
long_errors = len(data_final[(data_final['Signal_Predict'] == 'LONG') & (data_final['Signal'] != 'LONG')])
long_error_percent = (long_errors / total_samples) * 100

# 2) Signal_Predict = SHORT когда Signal != SHORT
short_errors = len(data_final[(data_final['Signal_Predict'] == 'SHORT') & (data_final['Signal'] != 'SHORT')])
short_error_percent = (short_errors / total_samples) * 100

# 3) Signal_Predict = NO_TRADE когда Signal != NO_TRADE
no_trade_errors = len(data_final[(data_final['Signal_Predict'] == 'NO_TRADE') & (data_final['Signal'] != 'NO_TRADE')])
no_trade_error_percent = (no_trade_errors / total_samples) * 100

# Вывод статистик ошибок
print("\nError Statistics:")
print(f"1) Signal_Predict = LONG when Signal != LONG: {long_errors} samples ({long_error_percent:.2f}%)")
print(f"2) Signal_Predict = SHORT when Signal != SHORT: {short_errors} samples ({short_error_percent:.2f}%)")
print(f"3) Signal_Predict = NO_TRADE when Signal != NO_TRADE: {no_trade_errors} samples ({no_trade_error_percent:.2f}%)")
Best model loaded from ./best_model.pth
Generating Predictions: 100%|██████████| 152/152 [00:04<00:00, 33.62it/s, batch_size=14]
Final DataFrame shape: (2430, 12)
First 5 rows of final DataFrame:
        Date    Signal Signal_Predict  prob_LONG  prob_SHORT  prob_NO_TRADE
0 2018-09-05     SHORT           LONG   0.558355    0.045573       0.396072
1 2018-09-06  NO_TRADE           LONG   0.607106    0.039679       0.353215
2 2018-09-07  NO_TRADE           LONG   0.678753    0.029567       0.291680
3 2018-09-08  NO_TRADE           LONG   0.759127    0.019339       0.221534
4 2018-09-09  NO_TRADE           LONG   0.813804    0.013379       0.172816

Precision on full dataset:
Precision - LONG: 0.2478, SHORT: 0.0000, NO_TRADE: 0.5217

Confusion matrix saved as 'confusion_matrix.png'

Error Statistics:
1) Signal_Predict = LONG when Signal != LONG: 944 samples (38.85%)
2) Signal_Predict = SHORT when Signal != SHORT: 0 samples (0.00%)
3) Signal_Predict = NO_TRADE when Signal != NO_TRADE: 562 samples (23.13%)

Матрица несоответствий (confusion matrix) реальных классов и прогнозных

Рис. 16: Матрица несоответствий (confusion matrix) реальных классов и прогнозных

Увы, но судя по матрице несоответствий наша модель совсем не обучилась предсказывать SHORT — по нулям. Что касаемо LONG, то PatchTST спрогнозировал верное направление роста цены лишь в 311 случаях из 1255. Лучше всего модель смогла предсказать избегание торговли (NO_TRADE) — в 613 случаях из 1175 это было верным прогнозом.

А что с вероятностями? Давайте взглянем на них выборочно. Используем sample из функционала pandas чтобы посмотреть случайные строки по разным датам.

data_final[['Date', 'Signal_Predict', 'Signal', 'prob_LONG', 'prob_SHORT', 'prob_NO_TRADE']].sample(10)

Таблица с датами, реальными и прогнозными сигналами и вероятностями

Рис. 17: Таблица с датами, реальными и прогнозными сигналами и вероятностями

В целом тут без сюрпризов. Разве что строки 1314 и 311 демонстрируют, что будь вероятности следующего за доминирующим классом повыше, то сигналы были бы правильными.

Итак резюмируем:

Код выше демонстрирует применение PatchTST для прогнозирования направления движения цены Bitcoin. Задача заключалась в предсказании роста на 5% и более, падения на 5% и более или отсутствия значительных движений в течение следующих 5-ти периодов.

Модель показала умеренные результаты: точность предсказания класса NO_TRADE составила 52.2%, что близко к доле этого класса в данных (53.6%), однако предсказания для LONG (24.8%) и особенно SHORT (0%) оказались значительно более слабыми.

Основная сложность связана с экстремальной волатильностью Биткоина, где даже продвинутые архитектуры, такие как PatchTST, сталкиваются с проблемой нестабильных паттернов и шумных данных. Высокий процент ошибок для сигнала LONG (38.8% ложных срабатываний) указывает на склонность модели переоценивать вероятность роста, что может быть связано с дисбалансом классов и недостаточной настройкой гиперпараметров. Отсутствие точных предсказаний для SHORT, вероятно, обусловлено меньшей долей этого класса в данных (19.5%) и сложностью улавливания резких падений в краткосрочной перспективе.

Несмотря на это, PatchTST демонстрирует потенциал для задач классификации временных рядов, особенно при наличии более стабильных данных или более тщательной предобработки.

Что было интересного реализовано в этом коде:

  1. Определение сигналов: Созданы метки LONG (рост ≥5%), SHORT (падение ≥5%) и NO_TRADE (изменение <5%) на основе лаговых столбцов, после чего мы используем эти метки чтобы оценить способность модели предсказывать верное направление тренда.
  2. Множество признаков для обучения: Цена открытия (open), закрытия (close), максимум и минимум (max, min), typical_close как среднее (High + Low + Close) / 3 для учета размаха цен внутри свечи, плюс объем (volume).
  3. Разделение данных: Использован TimeSeriesSplit с 5 фолдами для кросс-валидации, с постепенным ростом train выборки.
  4. Настройка модели: PatchTSTForClassification сконфигурирована с 6 входными каналами, контекстом 128, патчами длиной 16 и 4 слоями трансформера.
  5. Обучение модели: Применена функция потерь CrossEntropyLoss с весами классов для учета их дисбаланса, обучение проводилось до 50 эпох с ранней остановкой.
  6. Оценка результатов: Вычислены метрики Precision для каждого класса, построена матрица ошибок и проанализированы ложные срабатывания. Почему Precision? Потому что конкретно в этой задаче нам точность важнее полноты и других метрик, так как потенциальный инвестор в каждой сделке рискует деньгами.

Заключение

Трансформер для временных рядов PatchTST представляет собой значительный шаг вперед в развитии архитектур для прогнозирования. Что интересно — эта модель заложила целый старт развития многих других трансформеров на основе патчей: TimesFM, TimesNet, Crossformer, PatchMixer и др.

Результаты исследования подтверждают три ключевых тезиса о современном состоянии области:

  1. Центральная инновация — разбиение временных рядов на патчи — решила проблему масштабируемости трансформеров, снизив вычислительную сложность с O(L²) до O((L/M)²). Это позволяет эффективно обрабатывать длинные последовательности без существенного роста требований к памяти и времени вычислений;
  2. Стратегия channel independence, вопреки интуитивным ожиданиям, показала превосходство над многомерными подходами. Отказ от явного моделирования межканальных зависимостей снижает риск переобучения и повышает обобщающую способность модели на реальных данных с нестабильными корреляциями;
  3. Улучшение метрик качества на 20-50% при сокращении времени обучения в 2-3 раза делает PatchTST практически применимой архитектурой для production систем.

Таким образом, PatchTST закладывает основу для нового поколения transformer-based решений в области временных рядов, где эффективность архитектуры сочетается с практической применимостью для широкого спектра аналитических задач.

Что касаемо областей применения. Эта нейронная сеть показывает высокую эффективность на структурированных финансовых данных с выраженными циклическими паттернами. Например, в задаче анализа государственных облигаций она продемонстрировала высокую точность долгосрочного прогнозирования. В высоковолатильных сегментах, таких как криптовалюты, модель показывает неплохой потенциал, хотя до высокой точности прогнозов тут еще далеко. Хотя, есть вероятность, что модели нужны более качественные классы, больше фичей для обучения, либо более сложная архитектура, либо интеграция с другими моделями.