За последние годы область прогнозирования временных рядов сделала ощутимый скачок вперед благодаря внедрению архитектур трансформеров. Если раньше в этой области полновластно доминировали бустинги, рекуррентные нейронные сети и классические статистические методы, то сегодня мы наблюдаем настоящий прорыв в точности и эффективности нейросетевых моделей.
Одной из наиболее значимых инноваций стала архитектура PatchTST, которая не просто адаптировала трансформеры для работы с временными рядами, но и внесла фундаментальные изменения в подход к обработке временных данных.
История создания и авторы PatchTST
PatchTST была представлена в 2023 году исследовательской группой под руководством Yuqi Nie, Nam H. Nguyen, Phanwadee Sinthong и Jayant Kalagnanam из IBM Research. Работа была опубликована в рамках конференции NeurIPS 2023 под названием «A Time Series is Worth 64 Words: Long-term Forecasting with Transformers». Это исследование стало результатом многолетней работы команды IBM Research по адаптации трансформеров для задач прогнозирования временных рядов.
Создание PatchTST было мотивировано несколькими ключевыми проблемами существующих подходов:
- Во-первых, классические трансформеры, такие как BERT или GPT, были разработаны для работы с дискретными токенами, в то время как временные ряды представляют собой непрерывные числовые последовательности;
- Во-вторых, большинство существующих решений для временных рядов не могли эффективно улавливать долгосрочные зависимости при сохранении вычислительной эффективности;
- В-третьих, многие модели страдали от проблемы «забывания» важной локальной информации при обработке длинных последовательностей.
Команда IBM Research подошла к решению этих проблем с нетривиальной стороны, заимствовав идеи из компьютерного зрения. Подобно тому, как Vision Transformer разбивает изображение на патчи (patches), PatchTST разделяет временной ряд на сегменты, каждый из которых рассматривается как отдельный «токен» для трансформера.
Архитектура и ключевые новшества PatchTST
Архитектура PatchTST основана на нескольких принципиальных инновациях, которые кардинально отличают ее от предыдущих подходов к прогнозированию временных рядов. Центральная идея заключается в разбиении временного ряда на патчи фиксированной длины, которые затем обрабатываются стандартной архитектурой трансформера.
Рис. 1: (a) Обзор модели PatchTST, в которой каждый батч M временных рядов длиной L обрабатывается независимо (путем решейпинга в размер батча) с помощью Transformer backbone, а затем полученный батч решейпится обратно в M рядов с длиной предсказания T. Каждый одномерный ряд может быть обработан как supervised (b), в этом случае набор векторов с патчами используется для вывода полной длины предсказания, либо как self-supervised (c), если предсказываются маскированные патчи. Источник: https://github.com/huggingface/blog/blob/main/patchtst.md
Процесс начинается с разделения исходного временного ряда длиной L на патчи размером M. Каждый патч представляет собой подпоследовательность временного ряда, которая через линейное преобразование конвертируется в эмбединг вектор размерности T. Это позволяет трансформеру работать с временными рядами так же естественно, как с текстовыми последовательностями или изображениями.
Одним из ключевых преимуществ такого подхода является существенное сокращение длины входной последовательности для трансформера. Вместо обработки L временных точек, модель работает с L/M патчами, что квадратично уменьшает вычислительную сложность механизма внимания с O(L²) до O((L/M)²). При типичных значениях M от 16 до 64, это дает выигрыш в производительности в десятки раз.
Второй важной инновацией стала стратегия channel independence. В отличие от большинства многомерных моделей временных рядов, которые пытаются явно моделировать взаимодействия между различными переменными, PatchTST обрабатывает каждую переменную (канал) независимо. Это решение, которое на первый взгляд может показаться упрощением, на практике показало значительно лучшие результаты.
Причина эффективности channel independence кроется в особенностях реальных данных. В большинстве практических задач корреляции между переменными нестабильны во времени и сильно зависят от контекста. Попытки явного моделирования этих взаимодействий часто приводят к переобучению и ухудшению обобщающей способности модели. PatchTST вместо этого полагается на способность трансформера неявно улавливать важные паттерны в данных каждого канала отдельно.
Рис. 2: Визуализация патчей. Здесь у нас есть последовательность из 15 рядов с длиной патча 5 и шагом 5, в результате чего получается три патча
Третьим новшеством стала специальная стратегия positional encoding для временных рядов. Классические трансформеры используют позиционное кодирование для информирования модели о порядке элементов в последовательности. В PatchTST позиционное кодирование адаптировано для работы с патчами и включает информацию как о позиции патча в последовательности, так и о временной позиции внутри каждого патча.
Сравнение с предшественниками и улучшение метрик
Для понимания революционности PatchTST важно сравнить ее с предшествующими подходами. До появления PatchTST доминировали несколько категорий моделей: классические статистические методы (ARIMA, ETS), градиентные бустинги (XGBoost, LightGBM), рекуррентные нейронные сети (LSTM, GRU), сверточные сети и ранние адаптации трансформеров.
Рекуррентные сети, долгое время считавшиеся золотым стандартом для временных рядов, страдали от проблемы исчезающих градиентов и не могли эффективно обрабатывать очень длинные последовательности. LSTM и GRU решили часть этих проблем, но оставались медленными в обучении из-за последовательной природы вычислений.
Градиентные бустинги, такие как XGBoost, LightGBM и CatBoost, тоже показали хорошую эффективность для прогнозирования временных рядов. Однако для достижения высокой точности для них требовалась тщательная генерация множества признаков, что предполагает глубокое понимание доменной области и особенностей анализируемого временного ряда. Кроме того, процесс подбора оптимальных гиперпараметров мог занять целую вечность.
Ранние попытки применения трансформеров к временным рядам, такие как Temporal Fusion Transformer (TFT) или Informer, показали многообещающие результаты, но сталкивались с проблемами масштабируемости и интерпретируемости. Они также требовали значительных вычислительных ресурсов и сложной инженерии признаков.
PatchTST продемонстрировал впечатляющие улучшения по всем ключевым метрикам. В бенчмарке на стандартных датасетах (ETTh1, ETTh2, ETTm1, Weather, Traffic) модель показала улучшение Mean Squared Error на 20-40% по сравнению с лучшими предшествующими решениями. Особенно впечатляющими были результаты на длинных горизонтах прогнозирования (720 временных точек), где улучшение достигало 50%.
Рис. 3: Результаты многомерного долгосрочного прогнозирования с помощью Supervised PatchTST. Длительность предсказания составляет 24,36,48,60 для датасета ILI и 96,192,336,720 для остальных. Сравнение PatchTST с другими нейронными сетями. Лучшие результаты выделены жирным шрифтом, топ-2 — подчеркнуты
Важно отметить, что улучшения коснулись не только точности, но и эффективности. PatchTST требует в 2-3 раза меньше времени на обучение по сравнению с другими transformed-based моделями, сохраняя при этом сопоставимое или лучшее качество прогнозов. Необходимая для обучения память также сократилась благодаря уменьшению длины последовательностей.
Рис. 4: Эффективность прогнозирования (MSE) с различными окнами look-back на трех больших датасетах: Электричество, Дорожный трафик и Погода. Значения окон look-back: 24,48,96,192,336,720, значения горизонтов прогнозирования: 97,720. Для этого эксперимента использовался Supervised PatchTST/42 и другие модели на основе трансформеров с открытым исходным кодом
Пример использования PatchTST №1: Прогнозирование цен облигаций США
С теорией разобрались. Давайте теперь посмотрим как эта модель работает на практике.
Первая практическая модель — применение PatchTST для прогнозирования относительно стабильных финансовых инструментов. 10-летние облигации США представляют собой оптимальный случай для демонстрации возможностей модели на данных с выраженными трендами и циклическими паттернами, но без экстремальной волатильности.
Первый шаг — установка модели из репозитория и установка библиотеки трансформеров.
!pip install git+https://github.com/IBM/tsfm.git
!pip install transformers
#Импорт библиотек и загрузка данных
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from transformers import PatchTSTConfig, PatchTSTForPrediction, Trainer, TrainingArguments, EarlyStoppingCallback, set_seed
from tsfm_public.toolkit.dataset import ForecastDFDataset
from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
import torch
import warnings
warnings.filterwarnings("ignore", module="torch")
# Установка seed для воспроизводимости
set_seed(2025)
# Загрузка данных
ticker = 'ZN=F'
start_date = '2018-05-01'
end_date = '2025-05-01'
data = yf.download(ticker, start=start_date, end=end_date)
# Сброс индекса и обработка мультииндекса
data = data.reset_index()
if isinstance(data.columns, pd.MultiIndex):
data.columns = [col[0] if col[0] != '' else col[1] for col in data.columns]
data['Date'] = pd.to_datetime(data['Date'])
print("Длина данных:", len(data))
print("Колонки после загрузки из yfinance:", data.columns.tolist())
# Расчёт Typical Close
data['Typical_Close'] = (data['High'] + data['Low'] + data['Close']) / 3
print("\nПроверка первых 5 строк данных:")
print(data[['Date', 'Open', 'High', 'Low', 'Close', 'Typical_Close']].head().to_string())
Длина данных: 1762
Колонки после загрузки из yfinance: ['Date', 'Close', 'High', 'Low', 'Open', 'Volume']
Проверка первых 5 строк данных:
Date Open High Low Close Typical_Close
0 2018-05-01 119.546875 119.578125 119.359375 119.375000 119.437500
1 2018-05-02 119.453125 119.562500 119.250000 119.515625 119.442708
2 2018-05-03 119.515625 119.828125 119.437500 119.687500 119.651042
3 2018-05-04 119.687500 119.984375 119.562500 119.703125 119.750000
4 2018-05-07 119.625000 119.765625 119.578125 119.687500 119.677083
После загрузки котировок далее мы указываем параметры обучения модели. Определим горизонт прогнозирования в 20 будущих временных точек, длину контекстного окна (то что будет запоминать модель при обучении) в 128 точек, а длину патча (отрезки для подачи в трансформер) — в 32 точки.
# Настройка PatchTST и запуск обучения модели
# Параметры
context_length = 128
forecast_horizon = 20
patch_length = 32
timestamp_column = 'Date'
id_columns = []
target_columns = ['Typical_Close']
# Разделение данных
N = len(data)
num_train = int(N * 0.7)
num_test = int(N * 0.2)
num_valid = N - num_train - num_test
border1s = [0, num_train - context_length, N - num_test - context_length]
border2s = [num_train, num_train + num_valid, N]
train_data = data.iloc[border1s[0]:border2s[0]].copy()
valid_data = data.iloc[border1s[1]:border2s[1]].copy()
test_data = data.iloc[border1s[2]:border2s[2]].copy()
# Вывод информации о разделении
print("\nРазделение данных:")
print(f"Train: {border1s[0]} до {border2s[0]} (длина: {len(train_data)})")
print(f"Valid: {border1s[1]} до {border2s[1]} (длина: {len(valid_data)})")
print(f"Test: {border1s[2]} до {border2s[2]} (длина: {len(test_data)})")
# Масштабирование
scaler = StandardScaler()
train_data['Typical_Close_scaled'] = scaler.fit_transform(train_data[['Typical_Close']])
valid_data['Typical_Close_scaled'] = scaler.transform(valid_data[['Typical_Close']])
test_data['Typical_Close_scaled'] = scaler.transform(test_data[['Typical_Close']])
target_columns_scaled = ['Typical_Close_scaled']
# Кросс-валидация
print("\nПроведение кросс-валидации с 10 фолдами...")
tscv = TimeSeriesSplit(n_splits=10, test_size=forecast_horizon, max_train_size=num_train)
cv_mape_scores = []
for fold, (train_idx, val_idx) in enumerate(tscv.split(train_data)):
print(f"Обучение фолда {fold + 1}/10")
cv_train_data = train_data.iloc[train_idx].copy()
cv_val_data = train_data.iloc[val_idx].copy()
if len(cv_train_data) < context_length + forecast_horizon:
print(f"Пропуск фолда {fold + 1}: недостаточная длина данных ({len(cv_train_data)})")
continue
cv_train_data['Typical_Close_scaled'] = scaler.fit_transform(cv_train_data[['Typical_Close']])
cv_val_data['Typical_Close_scaled'] = scaler.transform(cv_val_data[['Typical_Close']])
cv_preprocessor = TimeSeriesPreprocessor(
timestamp_column=timestamp_column,
id_columns=id_columns,
target_columns=target_columns_scaled,
scaling=False
)
cv_preprocessor = cv_preprocessor.train(cv_train_data)
cv_train_dataset = ForecastDFDataset(
data=cv_preprocessor.preprocess(cv_train_data),
id_columns=id_columns,
timestamp_column=timestamp_column,
target_columns=target_columns_scaled,
context_length=context_length,
prediction_length=forecast_horizon
)
cv_val_dataset = ForecastDFDataset(
data=cv_preprocessor.preprocess(cv_val_data),
id_columns=id_columns,
timestamp_column=timestamp_column,
target_columns=target_columns_scaled,
context_length=context_length,
prediction_length=forecast_horizon
)
cv_config = PatchTSTConfig(
num_input_channels=1,
context_length=context_length,
patch_length=patch_length,
patch_stride=patch_length,
prediction_length=forecast_horizon,
random_mask_ratio=0.1,
d_model=128,
num_attention_heads=16,
num_hidden_layers=3,
ffn_dim=256,
dropout=0.05,
head_dropout=0.05,
scaling="std",
loss="mse",
pre_norm=True,
norm_type="batchnorm"
)
cv_model = PatchTSTForPrediction(cv_config)
cv_training_args = TrainingArguments(
output_dir=f"./checkpoint/patchtst/ZN_F/cv_fold_{fold}/",
overwrite_output_dir=True,
num_train_epochs=30,
do_eval=True,
eval_strategy="epoch",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
dataloader_num_workers=16,
save_strategy="epoch",
logging_strategy="epoch",
save_total_limit=1,
load_best_model_at_end=True,
metric_for_best_model="eval_loss",
greater_is_better=False,
label_names=["future_values"],
learning_rate=1e-4
)
cv_trainer = Trainer(
model=cv_model,
args=cv_training_args,
train_dataset=cv_train_dataset,
eval_dataset=cv_val_dataset,
callbacks=[EarlyStoppingCallback(early_stopping_patience=10, early_stopping_threshold=0.0001)]
)
cv_trainer.train()
cv_prediction_output = cv_trainer.predict(cv_val_dataset)
cv_predictions = cv_prediction_output.predictions
cv_labels = cv_prediction_output.label_ids
if isinstance(cv_predictions, tuple):
cv_predictions = cv_predictions[0]
if isinstance(cv_labels, tuple):
cv_labels = cv_labels[0]
cv_predictions = np.array(cv_predictions) if isinstance(cv_predictions, torch.Tensor) else cv_predictions
cv_labels = np.array(cv_labels) if isinstance(cv_labels, torch.Tensor) else cv_labels
cv_predicted_values = scaler.inverse_transform(cv_predictions.reshape(-1, 1)).reshape(-1, forecast_horizon)
cv_actual_values = scaler.inverse_transform(cv_labels.reshape(-1, 1)).reshape(-1, forecast_horizon)
cv_mape = np.mean(np.abs((cv_actual_values.flatten() - cv_predicted_values.flatten()) / cv_actual_values.flatten())) * 100
cv_mape_scores.append(cv_mape)
print(f"Фолд {fold + 1} MAPE: {cv_mape:.2f}%")
if cv_mape_scores:
print(f"\nРезультаты кросс-валидации:")
print(f"Средний MAPE: {np.mean(cv_mape_scores):.2f}%")
print(f"Стандартное отклонение MAPE: {np.std(cv_mape_scores):.2f}%")
else:
print("\nНет валидных фолдов для кросс-валидации.")
# Подготовка данных для финальной модели
preprocessor = TimeSeriesPreprocessor(
timestamp_column=timestamp_column,
id_columns=id_columns,
target_columns=target_columns_scaled,
scaling=False
)
preprocessor = preprocessor.train(train_data)
train_dataset = ForecastDFDataset(
data=preprocessor.preprocess(train_data),
id_columns=id_columns,
timestamp_column=timestamp_column,
target_columns=target_columns_scaled,
context_length=context_length,
prediction_length=forecast_horizon
)
valid_dataset = ForecastDFDataset(
data=preprocessor.preprocess(valid_data),
id_columns=id_columns,
timestamp_column=timestamp_column,
target_columns=target_columns_scaled,
context_length=context_length,
prediction_length=forecast_horizon
)
test_dataset = ForecastDFDataset(
data=preprocessor.preprocess(test_data),
id_columns=id_columns,
timestamp_column=timestamp_column,
target_columns=target_columns_scaled,
context_length=context_length,
prediction_length=forecast_horizon
)
# Конфигурация модели
config = PatchTSTConfig(
num_input_channels=1,
context_length=context_length,
patch_length=patch_length,
patch_stride=patch_length,
prediction_length=forecast_horizon,
random_mask_ratio=0.1,
d_model=128,
num_attention_heads=16,
num_hidden_layers=3,
ffn_dim=256,
dropout=0.05,
head_dropout=0.05,
scaling="std",
loss="mse",
pre_norm=True,
norm_type="batchnorm"
)
model = PatchTSTForPrediction(config)
# Аргументы обучения
training_args = TrainingArguments(
output_dir="./checkpoint/patchtst/ZN_F/output/",
overwrite_output_dir=True,
num_train_epochs=100,
do_eval=True,
eval_strategy="epoch",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
dataloader_num_workers=16,
save_strategy="epoch",
logging_strategy="epoch",
save_total_limit=3,
load_best_model_at_end=True,
metric_for_best_model="eval_loss",
greater_is_better=False,
label_names=["future_values"],
learning_rate=1e-4
)
early_stopping = EarlyStoppingCallback(
early_stopping_patience=10,
early_stopping_threshold=0.0001
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=valid_dataset,
callbacks=[early_stopping]
)
# Обучение модели
trainer.train()
Разделение данных:
Train: 0 до 1233 (длина: 1233)
Valid: 1105 до 1410 (длина: 305)
Test: 1282 до 1762 (длина: 480)
Проведение кросс-валидации с 10 фолдами...
Обучение фолда 10/10
Рис. 5: Графики динамики Loss на Train и Val наборах данных
# Прогноз на тестовом наборе
prediction_output = trainer.predict(test_dataset)
predictions = prediction_output.predictions
labels = prediction_output.label_ids
if isinstance(predictions, tuple):
print("Predictions — кортеж, выбираем первый элемент")
predictions = predictions[0]
if isinstance(labels, tuple):
print("Labels — кортеж, выбираем первый элемент")
labels = labels[0]
predictions = np.array(predictions) if isinstance(predictions, torch.Tensor) else predictions
labels = np.array(labels) if isinstance(labels, torch.Tensor) else labels
predicted_values = scaler.inverse_transform(predictions.reshape(-1, 1)).reshape(-1, forecast_horizon)
actual_values = scaler.inverse_transform(labels.reshape(-1, 1)).reshape(-1, forecast_horizon)
overall_mape = np.mean(np.abs((actual_values.flatten() - predicted_values.flatten()) / actual_values.flatten())) * 100
print(f"Общий MAPE на тестовом наборе: {overall_mape:.2f}%")
# Прогноз на следующие 20 свечей
input_data = data.iloc[num_train - context_length:num_train].copy()
input_data['Typical_Close_scaled'] = scaler.transform(input_data[['Typical_Close']])
scaled_input = preprocessor.preprocess(input_data)[target_columns_scaled].values
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
past_values = torch.tensor(scaled_input[-context_length:].reshape(1, context_length, -1), dtype=torch.float32).to(device)
with torch.no_grad():
prediction = model(past_values).prediction_outputs
predicted_tc = scaler.inverse_transform(prediction.cpu().numpy().reshape(-1, 1)).flatten()
# Реконструкция OHLC
train_data['high_dev'] = train_data['High'] - train_data['Typical_Close']
train_data['low_dev'] = train_data['Typical_Close'] - train_data['Low']
mean_high_dev = train_data['high_dev'].mean()
mean_low_dev = train_data['low_dev'].mean()
last_close = input_data['Close'].iloc[-1]
forecast_candles = []
previous_close = last_close
for tc_pred in predicted_tc:
open_c = previous_close
high_c = tc_pred + mean_high_dev
low_c = tc_pred - mean_low_dev
close_c = 3 * tc_pred - high_c - low_c
forecast_candles.append({
'open': open_c,
'high': high_c,
'low': low_c,
'close': close_c,
'predicted_typical_close': tc_pred
})
previous_close = close_c
forecast_df = pd.DataFrame(forecast_candles)
forecast_df['Date'] = data['Date'].iloc[num_train:min(num_train + 20, len(data))].values
# Корректное присвоение actual_typical_close
forecast_start = num_train
forecast_end = min(num_train + 20, len(data))
forecast_df['actual_typical_close'] = data['Typical_Close'].iloc[forecast_start:forecast_end].values
# Проверка фактических значений для периода прогноза
print("\nФактические значения для периода прогноза (индексы {}–{}):".format(forecast_start, forecast_end - 1))
actual_check = data[['Date', 'Open', 'High', 'Low', 'Close', 'Typical_Close']].iloc[forecast_start:forecast_end]
print(actual_check.to_string(index=False))
# Расчёт MAPE для прогноза
mape_specific = np.mean(np.abs((forecast_df['actual_typical_close'] - forecast_df['predicted_typical_close']) / forecast_df['actual_typical_close'])) * 100
print(f"MAPE для прогноза на 20 свечей: {mape_specific:.2f}%")
# Вывод таблицы прогноза
print("\nПрогноз на следующие 20 свечей:")
print(forecast_df[['open', 'high', 'low', 'close', 'predicted_typical_close', 'actual_typical_close']].to_string(index=False))
# Визуализации
# 1. Исторический прогноз
test_start_index = border1s[2]
num_samples = len(test_dataset)
pred_dates = data['Date'].iloc[test_start_index + context_length:test_start_index + context_length + num_samples]
predicted_1step = predicted_values[:, 0]
plt.figure(figsize=(15, 5))
plt.plot(data['Date'], data['Typical_Close'], color='gray', label='Фактический Typical Close')
plt.plot(pred_dates, predicted_1step, color='red', label='Предсказанный Typical Close (1 шаг)')
plt.legend()
plt.title('Реальные котировки (серый) + Исторический прогноз (красный)')
plt.xlabel('Дата')
plt.ylabel('Typical Close')
plt.savefig('historical_forecast.png')
plt.close()
# 2. Прогноз на 20 периодов
historical_window = 90
historical_dates = data['Date'].iloc[max(0, num_train - historical_window):num_train]
historical_values = data['Typical_Close'].iloc[max(0, num_train - historical_window):num_train]
forecast_dates = data['Date'].iloc[forecast_start:forecast_end]
actual_tc_forecast = data['Typical_Close'].iloc[forecast_start:forecast_end]
plt.figure(figsize=(15, 5))
plt.plot(historical_dates, historical_values, color='gray', label='Исторические данные (последние 90 баров)')
plt.plot(forecast_dates, actual_tc_forecast, color='gray', linestyle='--', label='Фактический период прогноза')
plt.plot(forecast_dates, forecast_df['predicted_typical_close'], color='red', label='Предсказанный')
plt.legend()
plt.title('Последние 90 баров (серый) + Прогноз на 20 периодов (красный)')
plt.xlabel('Дата')
plt.ylabel('Typical Close')
plt.savefig('20_period_forecast.png')
plt.close()
print("\nВизуализации сохранены как 'historical_forecast.png' и '20_period_forecast.png'")
Результаты кросс-валидации:
Средний MAPE: 12.92%
Стандартное отклонение MAPE: 1.56%
Общий MAPE на тестовом наборе: 0.87%
MAPE для прогноза на 20 свечей: 0.63%
Фактические значения для периода прогноза (индексы 1233–1242):
Date Open High Low Close Typical_Close
2023-03-24 115.890625 117.046875 115.671875 116.109375 116.276042
2023-03-27 116.171875 116.203125 114.875000 114.984375 115.354167
2023-03-28 114.984375 115.234375 114.531250 114.640625 114.802083
2023-03-29 114.578125 114.875000 114.218750 114.531250 114.541667
2023-03-30 114.468750 114.734375 114.218750 114.578125 114.510417
2023-03-31 114.531250 115.109375 114.281250 114.921875 114.770833
2023-04-03 114.906250 115.671875 114.562500 115.453125 115.229167
2023-04-04 115.578125 116.281250 115.078125 116.250000 115.869792
2023-04-05 116.171875 116.937500 115.906250 116.578125 116.473958
2023-04-06 116.390625 116.937500 116.343750 116.546875 116.609375
Рис. 6: Сравнение динамики реальных котировок гос.облигаций США (серая линия) и предсказаний PatchTST с 2024 года
Рис. 7: График ближайшего краткосрочного прогноза PatchTST на 20 дней
Данная реализация демонстрирует полный цикл работы с PatchTST для прогнозирования цен облигаций. Модель использует патчи размером 32 временных точек, что позволяет эффективно улавливать как краткосрочные колебания, так и долгосрочные тренды в данных облигаций. Особенно важна стратегия восстановления OHLC-данных из прогнозируемых значений Typical Close с использованием исторических отклонений.
Давайте рассмотрим кратко что тут происходит в коде:
- Мы уходим от классического Close, потому что он не отражает размах цен внутри свечей и вычисляем Typical_Close как среднее (High + Low + Close) / 3 для каждой свечи, которая используется как целевая переменная;
- Далее разделяем данные на обучающую (70%), валидационную (10%) и тестовую (20%) выборки;
- Приводим Typical_Close к единому масштабу с помощью StandardScaler;
- Используем TimeSeriesPreprocessor и ForecastDFDataset из библиотеки tsfm_public для подготовки временных рядов с учетом контекста (context_length=128) и горизонта прогноза (forecast_horizon=20);
- Проводим кросс-валидацию с 10 фолдами, используя TimeSeriesSplit, чтобы оценить стабильность модели на разных отрезках обучающих данных;
- Для каждого фолда обучаем модель и вычисляем MAPE (Mean Absolute Percentage Error), выводя средний MAPE и его стандартное отклонение;
- Обучаем модель PatchTST с параметрами: context_length=128, patch_length=32, d_model=128, с 3 слоями и минимальной регуляризацией (dropout=0.05). Указываем как параметры 100 эпох с ранней остановкой (early_stopping_patience=10) для предотвращения переобучения, используя MSE как функцию потерь;
- Делаем прогноз на 20 свечей вперед после обучающей выборки, предсказывая Typical_Close. Реконструируем OHLC (Open, High, Low, Close) для прогнозных свечей, используя средние отклонения High и Low от Typical_Close на обучающих данных;
- Вычисляем MAPE для прогноза, сравнивая предсказанный и фактический Typical_Close.
Обратите внимание — для загрузки модели, ее весов, мы используем wandb.ai. Это платформа для отслеживания, визуализации и управления экспериментами в машинном обучении, которая помогает анализировать и оптимизировать модели в реальном времени. Если вы будете использовать этот код вам потребуется получить API ключ. Получить его бесплатно (за регистрацию) можно здесь: https://wandb.ai/authorize?ref=models.
Выбор 10-летних облигаций США в качестве первого примера не случаен. Этот инструмент характеризуется относительной стабильностью, четко выраженными циклическими паттернами, связанными с макроэкономическими циклами, и меньшей подверженностью краткосрочному рыночному шуму. PatchTST особенно эффективно работает с такими данными благодаря способности трансформера улавливать долгосрочные зависимости через механизм внимания.
Пример использования PatchTST №2: Прогнозирование роста или снижения Bitcoin
Второй пример демонстрирует применение PatchTST для гораздо более сложной задачи — прогнозирования направления движения Bitcoin. Криптовалютные рынки отличаются экстремальной волатильностью, нелинейными зависимостями и частыми структурными сдвигами, что делает их идеальным полигоном для тестирования продвинутых архитектур машинного обучения.
Учитывая, что ни одна из open-source моделей машинного обучения даже близко не приблизилась к прогнозированию котировок криптовалют с высокой точностью, мы немного упростим задачу для PatchTST и попросим посчитать для нас вероятности сигналов: роста (LONG) на 5%, снижения (SHORT) на 5% в течение 5 следующих периодов, либо отсутствие вероятности таких больших движений и соответственно сигнал NO TRADE. То есть попросим решить задачу supervised-classification, где мы укажем, когда должен быть тот или иной сигнал на train, и попробуем обучить модель предсказывать их на test.
# Импорт библиотек и загрузка данных
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import PatchTSTConfig, PatchTSTForClassification, Trainer, TrainingArguments, EarlyStoppingCallback
from tsfm_public.toolkit.dataset import ForecastDFDataset
from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import precision_score, confusion_matrix
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from typing import Optional
import os
from tqdm import tqdm
import time
import warnings
warnings.filterwarnings('ignore')
# Установка seed для воспроизводимости
torch.manual_seed(2025)
np.random.seed(2025)
# Загрузка котировок Bitcoin
ticker = 'BTC-USD'
start_date = '2018-05-01'
end_date = '2025-05-01'
data = yf.download(ticker, start=start_date, end=end_date)
# Сброс индекса и обработка мультииндекса
data = data.reset_index()
if isinstance(data.columns, pd.MultiIndex):
data.columns = [col[0] if col[0] != '' else col[1] for col in data.columns]
data['Date'] = pd.to_datetime(data['Date'])
print("Длина данных:", len(data))
print("Колонки после загрузки из yfinance:", data.columns.tolist())
# Расчет Typical Close
data['Typical_Close'] = (data['High'] + data['Low'] + data['Close']) / 3
print("\nПроверка первых 5 строк данных:")
print(data[['Date', 'Open', 'High', 'Low', 'Close', 'Typical_Close']].head().to_string())
Колонки после загрузки из yfinance: ['Date', 'Close', 'High', 'Low', 'Open', 'Volume']
Проверка первых 5 строк данных:
Date Open High Low Close Typical_Close
0 2018-05-01 9251.469727 9255.879883 8891.049805 9119.009766 9088.646484
1 2018-05-02 9104.599609 9256.519531 9015.139648 9235.919922 9169.193034
2 2018-05-03 9233.969727 9798.330078 9188.150391 9743.860352 9576.780273
3 2018-05-04 9695.500000 9779.200195 9585.959961 9700.759766 9688.639974
4 2018-05-05 9700.280273 9964.500000 9695.120117 9858.150391 9839.256836
# Обогащаем датафрейм лаговыми столбцами в диапазоне 1-5 дней
for lag in range(1, 6):
data[f'Tclose_lag{lag}'] = data['Typical_Close'].shift(-lag)
# Добавляем forward fill для заполнения NaN значений
for lag in range(1, 6):
col_name = f'Tclose_lag{lag}'
data[col_name] = data[col_name].fillna(method='ffill')
# Список лаговых столбцов
lag_columns = [f'Tclose_lag{lag}' for lag in range(1, 6)]
# Создание булевых столбцов с True при изменении цены на 5% или более в будущем
data['Plus5change'] = (data[lag_columns] >= data['Typical_Close'].values[:, None] * 1.05).any(axis=1)
data['Minus5change'] = (data[lag_columns] <= data['Typical_Close'].values[:, None] * 0.95).any(axis=1)
data['Flat'] = ( (~data['Plus5change'] & ~data['Minus5change']) |
(data['Plus5change'] & data['Minus5change']) )
# Создание столбца с торговыми сигналами
conditions = [ (data['Plus5change'] == True) & (data['Minus5change'] == False), # LONG
(data['Minus5change'] == True) & (data['Plus5change'] == False) ] # SHORT
choices = ['LONG', 'SHORT']
data['Signal'] = np.select(conditions, choices, default='NO_TRADE')
# Финальная выборка колонок
data = data[['Date', 'Close', 'High', 'Low', 'Open', 'Typical_Close', 'Volume', 'Signal']]
data.head(5)
Рис. 8: Датафрейм с котировками Биткоина с сигналами LONG, SHORT, NO_TRADE, построенным по лаговым признакам на горизонте ближайших 1-5 дней
# Построение графика котировок с вертикальными линиями сигналов LONG и SHORT
def plot_signals(data, start_date, end_date):
# Фильтруем данные по диапазону дат
filtered_data = data[(data['Date'] >= start_date) & (data['Date'] <= end_date)]
fig, ax = plt.subplots(figsize=(14, 6))
# Рисуем серую горизонтальную линию Typical_Close
ax.plot(filtered_data['Date'], filtered_data['Typical_Close'], color='gray', label='Typical_Close')
# Находим индексы для сигналов LONG и SHORT
long_dates = filtered_data[filtered_data['Signal'] == 'LONG']['Date']
short_dates = filtered_data[filtered_data['Signal'] == 'SHORT']['Date']
# Добавляем вертикальные зеленые линии для LONG
for date in long_dates:
ax.axvline(x=date, color='lightgreen', linestyle='-', linewidth=1.5)
# Добавляем вертикальные красные линии для SHORT
for date in short_dates:
ax.axvline(x=date, color='coral', linestyle='-', linewidth=1.5)
# Оформление графика
ax.set_title('Typical_Close с сигналами LONG/SHORT')
ax.set_xlabel('Дата')
ax.set_ylabel('Цена')
ax.grid(True)
ax.legend(['Typical_Close', 'LONG сигналы', 'SHORT сигналы'])
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
plot_signals(data, start_date='2018-05-01', end_date='2019-05-01')
plot_signals(data, start_date='2019-05-01', end_date='2020-05-01')
plot_signals(data, start_date='2020-05-01', end_date='2021-05-01')
plot_signals(data, start_date='2021-05-01', end_date='2022-05-01')
plot_signals(data, start_date='2022-05-01', end_date='2023-05-01')
plot_signals(data, start_date='2023-05-01', end_date='2024-05-01')
plot_signals(data, start_date='2024-05-01', end_date='2025-05-01')
Рис. 9: График цен Bitcoin с сигналами покупки и продажи с мая 2018 по май 2019
Рис. 10: График с мая 2019 по май 2020
Рис. 11: График с мая 2020 по май 2021
Рис. 12: График с мая 2021 по май 2022
Рис. 13: График с мая 2022 по май 2023
Рис. 14: График с мая 2023 по май 2024
Рис. 15: График с мая 2024 по май 2025
На графиках выше мы видим зеленые зоны (там где Signal=LONG) и красные зоны (там где Signal=SHORT) на линейном чарте котировок Биткоина. И совершение сделок в этих зонах — наиболее благоприятное для трейдинга с точки зрения времени-доходности, так как в большинстве случаев цена в этих зонах уходит не просто на 5%, а существенно больше, и довольно стремительно.
Графики впечатляют, однако увы эти сигналы непригодны для торговли, так как построены на лагах на 1-5 периодов вперед, т. е. заглядывают в будущее. Но почему не протестировать сможет ли нейросеть PatchTST найти какие-то закономерности между этими сигналами и предыдущими котировками? Сможет ли предсказывать эти сигналы? Давайте посмотрим…
# Энкодинг. Преобразование категориальных меток в числа
label_map = {'LONG': 0, 'SHORT': 1, 'NO_TRADE': 2}
data['Signal'] = data['Signal'].map(label_map)
# Проверка таблицы после маппинга
print("Уникальные значения в столбце Signal после кодирования:", data['Signal'].unique())
print("Первые 5 строк данных с закодированным Signal:")
print(data[['Date', 'Close', 'High', 'Low', 'Open', 'Typical_Close', 'Volume', 'Signal']].head().to_string())
Уникальные значения в столбце Signal после кодирования: [0 2 1]
Первые 5 строк данных с закодированным Signal:
Date Close High Low Open Typical_Close Volume Signal
0 2018-05-01 9119.009766 9255.879883 8891.049805 9251.469727 9088.646484 7713019904 0
1 2018-05-02 9235.919922 9256.519531 9015.139648 9104.599609 9169.193034 7558159872 0
2 2018-05-03 9743.860352 9798.330078 9188.150391 9233.969727 9576.780273 10207299584 2
3 2018-05-04 9700.759766 9779.200195 9585.959961 9695.500000 9688.639974 8217829888 2
4 2018-05-05 9858.150391 9964.500000 9695.120117 9700.280273 9839.256836 7651939840 1
# Выбираем столбцы с признаками
feature_columns = ['Close', 'High', 'Low', 'Open', 'Typical_Close', 'Volume']
X = data[feature_columns].values
y = data['Signal'].values
# Масштабируем признаки
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Кросс-валидация на 5 фолдов
tscv = TimeSeriesSplit(n_splits=5)
for fold, (train_index, test_index) in enumerate(tscv.split(X_scaled)):
print(f"Фолд {fold + 1}: Размер тренировочной выборки: {len(train_index)}, Размер тестовой выборки: {len(test_index)}")
# Посмотрим на размеры последнего фолда
train_index, test_index = list(tscv.split(X_scaled))[-1]
X_train, X_test = X_scaled[train_index], X_scaled[test_index]
y_train, y_test = y[train_index], y[test_index]
print("Размер X_train:", X_train.shape)
print("Размер X_test:", X_test.shape)
print("Размер y_train:", y_train.shape)
print("Размер y_test:", y_test.shape)
Фолд 1: Размер тренировочной выборки: 427, Размер тестовой выборки: 426
Фолд 2: Размер тренировочной выборки: 853, Размер тестовой выборки: 426
Фолд 3: Размер тренировочной выборки: 1279, Размер тестовой выборки: 426
Фолд 4: Размер тренировочной выборки: 1705, Размер тестовой выборки: 426
Фолд 5: Размер тренировочной выборки: 2131, Размер тестовой выборки: 426
Размер X_train: (2131, 6)
Размер X_test: (426, 6)
Размер y_train: (2131,)
Размер y_test: (426,)
Мы выполнили энкодинг и кросс-валидацию. Она скользящая — со сдвигом вправо трейн-выборка растет, тестовая — свдигается вместе с окончанием первой.
# Создаем кастомный датасет для PatchTST
class TimeSeriesClassificationDataset(Dataset):
def __init__(self, X, y, context_length):
self.X = X # Нормализованные признаки (n_samples, n_features)
self.y = y # Закодированные метки
self.context_length = context_length
self.num_samples = len(X) - context_length + 1 # Число возможных последовательностей
def __len__(self):
return self.num_samples
def __getitem__(self, idx):
# Извлечение последовательности context_length
past_values = self.X[idx:idx + self.context_length]
target_values = self.y[idx + self.context_length - 1] # Метка в конце последовательности
return {
'past_values': torch.tensor(past_values, dtype=torch.float32),
'target_values': torch.tensor(target_values, dtype=torch.long)
}
# Параметры
context_length = 128 # Количество временных шагов в каждой последовательности
num_input_channels = len(feature_columns) # 6 признаков
try:
# Создаем train и test датасеты
train_dataset = TimeSeriesClassificationDataset(X_train, y_train, context_length)
test_dataset = TimeSeriesClassificationDataset(X_test, y_test, context_length)
# Проверка
sample_data = train_dataset[0] # 1й сэмпл
print("Размер past_values в первом тренировочном сэмпле:", sample_data['past_values'].shape)
print("Значение target_values в первом тренировочном сэмпле:", sample_data['target_values'].item())
print("Количество тренировочных сэмплов:", len(train_dataset))
print("Количество тестовых сэмплов:", len(test_dataset))
except Exception as e:
print("Ошибка при создании датасета:", str(e))
Размер past_values в первом тренировочном сэмпле: torch.Size([128, 6])
Значение target_values в первом тренировочном сэмпле: 1
Количество тренировочных сэмплов: 2004
Количество тестовых сэмплов: 299
# Конфигурация модели
config = PatchTSTConfig(
num_input_channels=num_input_channels, #6 признаков
num_targets=3, #3 класса: LONG (0), SHORT (1), NO_TRADE (2)
context_length=context_length, #длина контекста = 128 временных рядов
patch_length=16, #Длина каждого патча
stride=16, #Длина страйда (перекрытия) для патчей
use_cls_token=True, #Используем CLS токен для классификации
num_attention_heads=4, #Количество голов внимания
d_model=256, #Размер слоев модели
num_layers=4, #Количество слоев
)
model = PatchTSTForClassification(config=config)
device = torch.device("cuda") #Или cpu
model = model.to(device)
# Проверка
print("Количество входных каналов:", config.num_input_channels)
print("Количество классов:", config.num_targets)
print("Длина контекста:", config.context_length)
print("Размер модели (параметры):", sum(p.numel() for p in model.parameters() if p.requires_grad))
Количество входных каналов: 6
Количество классов: 3
Длина контекста: 128
Размер модели (параметры): 1590531
Мы сконфигурировали не слишком тяжелую модель, но и не примитивную — на 1,6 млн параметров.
Давайте посмотрим на распределение классов.
print("Class distribution in dataset:")
print(data['Signal'].value_counts(normalize=True))
Class distribution in dataset:
Signal
2 0.532655
0 0.263981
1 0.203363
Name: proportion, dtype: float64
Диспропорция классов есть. Чаще всего в нашем датасете был сигнал NO_TRADE (в 53.2% случаев), затем LONG (26.3%), и SHORT (в 20.3% случаев).
Чтобы модель не переобучилась на доминирующий класс, логично назначить классам различные веса.
# Рассчитываем веса классов
label_map_reverse = {0: 'LONG', 1: 'SHORT', 2: 'NO_TRADE'}
class_counts = pd.Series([label_map_reverse[label] for label in y]).value_counts()
total_samples = len(y)
weights = torch.tensor([total_samples / (3 * class_counts[label]) for label in ['LONG', 'SHORT', 'NO_TRADE']]).to(device)
print("Class weights:", weights.tolist())
Class weights: [1.262716049382716, 1.639102564102564, 0.6257953989231523]
# Выбираем устройство для обучения модели
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
print("Device used:", device)
# Определяем какая будет функция потерь и оптимизатор
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=0.01)
# Создаем даталоадеры
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)
# Функция для замера Precision индивидуально для каждого класса
def compute_precision(y_true, y_pred):
precision_per_class = precision_score(y_true, y_pred, average=None, labels=[0, 1, 2])
return {
'precision_LONG': precision_per_class[0],
'precision_SHORT': precision_per_class[1],
'precision_NO_TRADE': precision_per_class[2]
}
# Проверка DataLoader и входных данных модели
try:
# Берем один батч из train_dataloader
batch = next(iter(train_dataloader))
past_values = batch['past_values'].to(device)
target_values = batch['target_values'].to(device)
# Forward pass для проверки совместимости моделей
model.eval()
with torch.no_grad():
outputs = model(past_values=past_values)
logits = outputs.prediction_logits
predictions = torch.argmax(logits, dim=-1)
loss = criterion(logits, target_values)
print("Shape of past_values in batch:", past_values.shape)
print("Shape of target_values in batch:", target_values.shape)
print("Shape of model logits:", logits.shape)
print("Sample predictions:", predictions.cpu().numpy()[:5])
print("Sample loss:", loss.item())
print("Number of training batches:", len(train_dataloader))
print("Number of evaluation batches:", len(test_dataloader))
except Exception as e:
print("Ошибка при проверке DataLoader и модели:", str(e))
Device used: cuda
Shape of past_values in batch: torch.Size([16, 128, 6])
Shape of target_values in batch: torch.Size([16])
Shape of model logits: torch.Size([16, 3])
Sample predictions: [0 0 0 0 0]
Sample loss: 1.1202704906463623
Number of training batches: 126
Number of evaluation batches: 19
В качестве параметров обучения мы зададим 50 эпох обучения и early stopping с patience=5. Для ускорения обучения будем использовать CUDA, то есть видеокарту.
# Задаем параметры обучения
num_epochs = 50
patience = 5
best_precision = 0.0
epochs_no_improve = 0
best_model_path = "./best_model_weighted.pth"
# Запуск обучения и тайминга
start_time = time.time()
model.train()
for epoch in range(num_epochs):
train_loss = 0.0
train_predictions = []
train_labels = []
train_progress = tqdm(train_dataloader, desc=f"Epoch {epoch + 1}/{num_epochs} [Train]", leave=False)
for batch in train_progress:
past_values = batch['past_values'].to(device)
target_values = batch['target_values'].to(device)
optimizer.zero_grad()
outputs = model(past_values=past_values)
loss = criterion(outputs.prediction_logits, target_values)
loss.backward()
optimizer.step()
train_loss += loss.item()
predictions = torch.argmax(outputs.prediction_logits, dim=-1)
train_predictions.extend(predictions.cpu().numpy())
train_labels.extend(target_values.cpu().numpy())
train_progress.set_postfix({'loss': loss.item()})
train_loss /= len(train_dataloader)
train_precision = compute_precision(train_labels, train_predictions)
model.eval()
eval_loss = 0.0
eval_predictions = []
eval_labels = []
eval_progress = tqdm(test_dataloader, desc=f"Epoch {epoch + 1}/{num_epochs} [Eval]", leave=False)
with torch.no_grad():
for batch in eval_progress:
past_values = batch['past_values'].to(device)
target_values = batch['target_values'].to(device)
outputs = model(past_values=past_values)
loss = criterion(outputs.prediction_logits, target_values)
eval_loss += loss.item()
predictions = torch.argmax(outputs.prediction_logits, dim=-1)
eval_predictions.extend(predictions.cpu().numpy())
eval_labels.extend(target_values.cpu().numpy())
eval_progress.set_postfix({'loss': loss.item()})
eval_loss /= len(test_dataloader)
eval_precision = compute_precision(eval_labels, eval_predictions)
print(f"\nEpoch {epoch + 1}/{num_epochs}")
print(f"Train Loss: {train_loss:.4f}")
print(f"Train Precision - LONG: {train_precision['precision_LONG']:.4f}, "
f"SHORT: {train_precision['precision_SHORT']:.4f}, "
f"NO_TRADE: {train_precision['precision_NO_TRADE']:.4f}")
print(f"Eval Loss: {eval_loss:.4f}")
print(f"Eval Precision - LONG: {eval_precision['precision_LONG']:.4f}, "
f"SHORT: {eval_precision['precision_SHORT']:.4f}, "
f"NO_TRADE: {eval_precision['precision_NO_TRADE']:.4f}")
if eval_precision['precision_NO_TRADE'] > best_precision:
best_precision = eval_precision['precision_NO_TRADE']
epochs_no_improve = 0
torch.save(model.state_dict(), best_model_path)
print(f"Best model saved with precision_NO_TRADE: {best_precision:.4f}")
else:
epochs_no_improve += 1
if epochs_no_improve >= patience:
print(f"Early stopping triggered after {epoch + 1} epochs")
break
model.train()
# End timing
end_time = time.time()
duration = end_time - start_time
print(f"\nTraining completed in {duration:.2f} seconds.")
Epoch 15/50
Train Loss: 0.8640
Train Precision - LONG: 0.6368, SHORT: 0.5349, NO_TRADE: 0.6224
Eval Loss: 1.6981
Eval Precision - LONG: 0.3108, SHORT: 0.0000, NO_TRADE: 0.5960
Early stopping triggered after 15 epochs
Training completed in 111.28 seconds.
Early stopping остановил обучение на 15 эпохе, так как loss перестал снижаться.
Метрики Precision на тренировочной выборке неплохие:
- LONG = 0.63;
- SHORT = 0.53:
- NO_TRADE = 0.62.
Однако на валидационной выборке мы видим что модель слабая.
# Загрузка лучшей модели
model.load_state_dict(torch.load("./best_model_weighted.pth"))
model.eval()
print("Best model loaded from ./best_model.pth")
# Создаем полный датасет (train + test) для предсказаний
full_dataset = TimeSeriesClassificationDataset(X_scaled, y, context_length)
full_dataloader = DataLoader(full_dataset, batch_size=16, shuffle=False)
# Генерируем предсказания и вероятности
all_predictions = []
all_probabilities = []
all_labels = []
with torch.no_grad():
progress_bar = tqdm(full_dataloader, desc="Generating Predictions", leave=True)
for batch in progress_bar:
past_values = batch['past_values'].to(device)
target_values = batch['target_values'].to(device)
outputs = model(past_values=past_values)
logits = outputs.prediction_logits
predictions = torch.argmax(logits, dim=-1)
probabilities = torch.softmax(logits, dim=-1)
all_predictions.extend(predictions.cpu().numpy())
all_probabilities.extend(probabilities.cpu().numpy())
all_labels.extend(target_values.cpu().numpy())
progress_bar.set_postfix({'batch_size': len(past_values)})
# Преобразуем числа классов обратно в текст
label_map_reverse = {0: 'LONG', 1: 'SHORT', 2: 'NO_TRADE'}
signal_predict = [label_map_reverse[pred] for pred in all_predictions]
signal_true = [label_map_reverse[label] for label in all_labels]
# Создаем столбцы для вероятностей
probabilities = np.array(all_probabilities)
prob_LONG = probabilities[:, 0]
prob_SHORT = probabilities[:, 1]
prob_NO_TRADE = probabilities[:, 2]
# Создаем финальный датафрейм
data_final = data.iloc[context_length-1:].copy().reset_index(drop=True)
data_final['Signal'] = signal_true
data_final['Signal_Predict'] = signal_predict
data_final['prob_LONG'] = prob_LONG
data_final['prob_SHORT'] = prob_SHORT
data_final['prob_NO_TRADE'] = prob_NO_TRADE
# Проверка данных
print("Final DataFrame shape:", data_final.shape)
print("First 5 rows of final DataFrame:")
print(data_final[['Date', 'Signal', 'Signal_Predict', 'prob_LONG', 'prob_SHORT', 'prob_NO_TRADE']].head().to_string())
print("\nPrecision on full dataset:")
full_precision = compute_precision(all_labels, all_predictions)
print(f"Precision - LONG: {full_precision['precision_LONG']:.4f}, "
f"SHORT: {full_precision['precision_SHORT']:.4f}, "
f"NO_TRADE: {full_precision['precision_NO_TRADE']:.4f}")
# Строим и визуализируем матрицу несоответствий
labels = ['LONG', 'SHORT', 'NO_TRADE']
cm = confusion_matrix(signal_true, signal_predict, labels=labels)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.savefig('confusion_matrix.png')
print("\nConfusion matrix saved as 'confusion_matrix.png'")
# Расчет статистик по ошибочным прогнозам
total_samples = len(data_final)
# 1) Signal_Predict = LONG когда Signal != LONG
long_errors = len(data_final[(data_final['Signal_Predict'] == 'LONG') & (data_final['Signal'] != 'LONG')])
long_error_percent = (long_errors / total_samples) * 100
# 2) Signal_Predict = SHORT когда Signal != SHORT
short_errors = len(data_final[(data_final['Signal_Predict'] == 'SHORT') & (data_final['Signal'] != 'SHORT')])
short_error_percent = (short_errors / total_samples) * 100
# 3) Signal_Predict = NO_TRADE когда Signal != NO_TRADE
no_trade_errors = len(data_final[(data_final['Signal_Predict'] == 'NO_TRADE') & (data_final['Signal'] != 'NO_TRADE')])
no_trade_error_percent = (no_trade_errors / total_samples) * 100
# Вывод статистик ошибок
print("\nError Statistics:")
print(f"1) Signal_Predict = LONG when Signal != LONG: {long_errors} samples ({long_error_percent:.2f}%)")
print(f"2) Signal_Predict = SHORT when Signal != SHORT: {short_errors} samples ({short_error_percent:.2f}%)")
print(f"3) Signal_Predict = NO_TRADE when Signal != NO_TRADE: {no_trade_errors} samples ({no_trade_error_percent:.2f}%)")
Best model loaded from ./best_model.pth
Generating Predictions: 100%|██████████| 152/152 [00:04<00:00, 33.62it/s, batch_size=14]
Final DataFrame shape: (2430, 12)
First 5 rows of final DataFrame:
Date Signal Signal_Predict prob_LONG prob_SHORT prob_NO_TRADE
0 2018-09-05 SHORT LONG 0.558355 0.045573 0.396072
1 2018-09-06 NO_TRADE LONG 0.607106 0.039679 0.353215
2 2018-09-07 NO_TRADE LONG 0.678753 0.029567 0.291680
3 2018-09-08 NO_TRADE LONG 0.759127 0.019339 0.221534
4 2018-09-09 NO_TRADE LONG 0.813804 0.013379 0.172816
Precision on full dataset:
Precision - LONG: 0.2478, SHORT: 0.0000, NO_TRADE: 0.5217
Confusion matrix saved as 'confusion_matrix.png'
Error Statistics:
1) Signal_Predict = LONG when Signal != LONG: 944 samples (38.85%)
2) Signal_Predict = SHORT when Signal != SHORT: 0 samples (0.00%)
3) Signal_Predict = NO_TRADE when Signal != NO_TRADE: 562 samples (23.13%)
Рис. 16: Матрица несоответствий (confusion matrix) реальных классов и прогнозных
Увы, но судя по матрице несоответствий наша модель совсем не обучилась предсказывать SHORT — по нулям. Что касаемо LONG, то PatchTST спрогнозировал верное направление роста цены лишь в 311 случаях из 1255. Лучше всего модель смогла предсказать избегание торговли (NO_TRADE) — в 613 случаях из 1175 это было верным прогнозом.
А что с вероятностями? Давайте взглянем на них выборочно. Используем sample из функционала pandas чтобы посмотреть случайные строки по разным датам.
data_final[['Date', 'Signal_Predict', 'Signal', 'prob_LONG', 'prob_SHORT', 'prob_NO_TRADE']].sample(10)
Рис. 17: Таблица с датами, реальными и прогнозными сигналами и вероятностями
В целом тут без сюрпризов. Разве что строки 1314 и 311 демонстрируют, что будь вероятности следующего за доминирующим классом повыше, то сигналы были бы правильными.
Итак резюмируем:
Код выше демонстрирует применение PatchTST для прогнозирования направления движения цены Bitcoin. Задача заключалась в предсказании роста на 5% и более, падения на 5% и более или отсутствия значительных движений в течение следующих 5-ти периодов.
Модель показала умеренные результаты: точность предсказания класса NO_TRADE составила 52.2%, что близко к доле этого класса в данных (53.6%), однако предсказания для LONG (24.8%) и особенно SHORT (0%) оказались значительно более слабыми.
Основная сложность связана с экстремальной волатильностью Биткоина, где даже продвинутые архитектуры, такие как PatchTST, сталкиваются с проблемой нестабильных паттернов и шумных данных. Высокий процент ошибок для сигнала LONG (38.8% ложных срабатываний) указывает на склонность модели переоценивать вероятность роста, что может быть связано с дисбалансом классов и недостаточной настройкой гиперпараметров. Отсутствие точных предсказаний для SHORT, вероятно, обусловлено меньшей долей этого класса в данных (19.5%) и сложностью улавливания резких падений в краткосрочной перспективе.
Несмотря на это, PatchTST демонстрирует потенциал для задач классификации временных рядов, особенно при наличии более стабильных данных или более тщательной предобработки.
Что было интересного реализовано в этом коде:
- Определение сигналов: Созданы метки LONG (рост ≥5%), SHORT (падение ≥5%) и NO_TRADE (изменение <5%) на основе лаговых столбцов, после чего мы используем эти метки чтобы оценить способность модели предсказывать верное направление тренда.
- Множество признаков для обучения: Цена открытия (open), закрытия (close), максимум и минимум (max, min), typical_close как среднее (High + Low + Close) / 3 для учета размаха цен внутри свечи, плюс объем (volume).
- Разделение данных: Использован TimeSeriesSplit с 5 фолдами для кросс-валидации, с постепенным ростом train выборки.
- Настройка модели: PatchTSTForClassification сконфигурирована с 6 входными каналами, контекстом 128, патчами длиной 16 и 4 слоями трансформера.
- Обучение модели: Применена функция потерь CrossEntropyLoss с весами классов для учета их дисбаланса, обучение проводилось до 50 эпох с ранней остановкой.
- Оценка результатов: Вычислены метрики Precision для каждого класса, построена матрица ошибок и проанализированы ложные срабатывания. Почему Precision? Потому что конкретно в этой задаче нам точность важнее полноты и других метрик, так как потенциальный инвестор в каждой сделке рискует деньгами.
Заключение
Трансформер для временных рядов PatchTST представляет собой значительный шаг вперед в развитии архитектур для прогнозирования. Что интересно — эта модель заложила целый старт развития многих других трансформеров на основе патчей: TimesFM, TimesNet, Crossformer, PatchMixer и др.
Результаты исследования подтверждают три ключевых тезиса о современном состоянии области:
- Центральная инновация — разбиение временных рядов на патчи — решила проблему масштабируемости трансформеров, снизив вычислительную сложность с O(L²) до O((L/M)²). Это позволяет эффективно обрабатывать длинные последовательности без существенного роста требований к памяти и времени вычислений;
- Стратегия channel independence, вопреки интуитивным ожиданиям, показала превосходство над многомерными подходами. Отказ от явного моделирования межканальных зависимостей снижает риск переобучения и повышает обобщающую способность модели на реальных данных с нестабильными корреляциями;
- Улучшение метрик качества на 20-50% при сокращении времени обучения в 2-3 раза делает PatchTST практически применимой архитектурой для production систем.
Таким образом, PatchTST закладывает основу для нового поколения transformer-based решений в области временных рядов, где эффективность архитектуры сочетается с практической применимостью для широкого спектра аналитических задач.
Что касаемо областей применения. Эта нейронная сеть показывает высокую эффективность на структурированных финансовых данных с выраженными циклическими паттернами. Например, в задаче анализа государственных облигаций она продемонстрировала высокую точность долгосрочного прогнозирования. В высоковолатильных сегментах, таких как криптовалюты, модель показывает неплохой потенциал, хотя до высокой точности прогнозов тут еще далеко. Хотя, есть вероятность, что модели нужны более качественные классы, больше фичей для обучения, либо более сложная архитектура, либо интеграция с другими моделями.