Методы аппроксимации временных рядов

Методы аппроксимации призваны уловить закономерности за хаосом движений, а также представить сложные данные в виде простой модели, сохраняя при этом их суть. Между тем, качественная аппроксимация — это не просто подбор кривой, которая «красиво ложится» на исторические данные. Это искусство извлечения сигнала из шума, способность выделить структурные паттерны, которые сохранят свою предсказательную силу в будущем.

Проблема аппроксимации временных рядов кроется в самой природе финансовых данных. В отличие от физических процессов, где мы можем полагаться на фундаментальные законы, финансовые ряды представляют собой результат взаимодействия миллионов участников рынка, каждый из которых принимает решения на основе неполной информации. Это создает нелинейные зависимости, структурные сдвиги и режимные переключения, которые делают традиционные методы аппроксимации малоэффективными.

Теоретические основы аппроксимации временных рядов

Прежде чем погружаться в конкретные методы, важно понимать специфику данных, с которыми мы работаем. Финансовые временные ряды обладают рядом характерных свойств, которые делают их аппроксимацию нетривиальной задачей.

Первое и наиболее очевидное свойство — это нестационарность. Статистические свойства финансовых рядов изменяются во времени, что означает невозможность простого экстраполирования прошлых закономерностей на будущее.

Второе критически важное свойство — это гетероскедастичность, или изменчивость волатильности во времени. Периоды низкой волатильности сменяются периодами высокой волатильности, причем эти переходы часто происходят резко и непредсказуемо. Это создает дополнительные сложности для методов аппроксимации, которые должны учитывать не только изменения в средних значениях, но и в дисперсии процесса.

Третья особенность — наличие «толстых хвостов» в распределениях доходностей. Экстремальные события происходят гораздо чаще, чем предсказывает нормальное распределение, что требует использования более робастных методов аппроксимации, способных справляться с выбросами и структурными сдвигами.

Фундаментальные принципы эффективной аппроксимации

Основываясь на многолетнем опыте анализа финансовых данных, я хочу выделить несколько ключевых принципов, которые определяют успешность аппроксимации временных рядов:

  1. Баланс между сложностью модели и ее обобщающей способностью. Слишком простые модели не способны уловить сложные нелинейные зависимости в данных, в то время как излишне сложные модели склонны к переобучению и плохо работают на новых данных.
  2. Адаптивность модели. Эффективная аппроксимация должна быть способна адаптироваться к изменяющимся режимам рынка. Это может достигаться через использование скользящих окон, экспоненциального взвешивания более свежих данных, или через явное моделирование структурных сдвигов.
  3. Учет многомасштабной структуры данных. Финансовые временные ряды содержат паттерны на различных временных масштабах — от внутридневных колебаний до долгосрочных трендов. Эффективная аппроксимация должна иметь способность смоделировать эти различные компоненты.

Полиномиальная аппроксимация

Полиномиальная аппроксимация представляет собой один из наиболее интуитивно понятных подходов к моделированию временных рядов. Основная идея заключается в представлении исследуемого ряда в виде полинома степени n от времени.

Полиномиальную аппроксимацию аналитики любят за ее простоту, интепретируемость и красивые линии при сглаживании данных. Однако следует помнить, что этот метод имеет серьезные ограничения при работе с финансовыми данными.

Главная проблема полиномиальной аппроксимации заключается в ее глобальном характере. Изменение даже одной точки данных влияет на все коэффициенты полинома, что делает метод крайне чувствительным к выбросам. В контексте финансовых данных, где экстремальные события являются нормой, а не исключением, это создает серьезные проблемы.

Давайте рассмотрим эти ограничения через построение полиномов на Python:

import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
from sklearn.metrics import mean_squared_error

# Генерация синтетических данных: случайное блуждание + шум + высокочастотные изгибы
np.random.seed(42)
n_points = 500
time = np.linspace(0, 10, n_points)

# Случайные шаги с высоким шумом
steps = np.random.normal(loc=0, scale=2.5, size=n_points)
prices = 100 + np.cumsum(steps)

# Высокочастотные синусоидальные колебания
freq_oscillations = (
    5 * np.sin(8 * np.pi * time) +
    3 * np.sin(20 * np.pi * time) +
    2 * np.sin(35 * np.pi * time)
)
prices += freq_oscillations

# Дополнительный шум
noise = np.random.normal(0, 4.0, n_points)
synthetic_prices = prices + noise

# Рыночные шоки
shock_indices = [100, 250, 400]
for idx in shock_indices:
    shock = np.random.normal(loc=0, scale=20, size=min(5, n_points - idx))
    synthetic_prices[idx:idx+len(shock)] += shock

# Степени полиномов
degrees = [3, 6, 8, 12]
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
axes = axes.ravel()

for i, degree in enumerate(degrees):
    poly_pipeline = Pipeline([
        ('poly', PolynomialFeatures(degree=degree)),
        ('linear', LinearRegression())
    ])
    
    X = time.reshape(-1, 1)
    y = synthetic_prices
    poly_pipeline.fit(X, y)

    # Предсказание на тренировочных данных
    y_train_pred = poly_pipeline.predict(X)
    train_mse = mean_squared_error(y, y_train_pred)

    # Кросс-валидация
    cv_scores = cross_val_score(poly_pipeline, X, y, cv=5,
                                scoring='neg_mean_squared_error')
    cv_mse = -np.mean(cv_scores)

    # Переобучение: разница между ошибками
    overfit_gap = train_mse - cv_mse

    # Предсказание в пределах обучающего диапазона
    time_extended = np.linspace(0, 10, 500)
    X_extended = time_extended.reshape(-1, 1)
    y_pred = poly_pipeline.predict(X_extended)

    # Визуализация
    axes[i].plot(time, y, 'ko', markersize=2, alpha=0.6,
                 label='Исходные данные')
    axes[i].plot(time_extended, y_pred, 'b-', linewidth=2,
                 label=f'Полином степени {degree}')
    axes[i].set_title(
        f'Степень {degree}\nTrain MSE: {train_mse:.2f}, CV MSE: {cv_mse:.2f}, Gap: {overfit_gap:.2f}')
    axes[i].set_xlabel('Время')
    axes[i].set_ylabel('Цена')
    axes[i].legend()
    axes[i].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

Визуализация аппроксимации одного и того же временного ряда разными степенями полинома

Рис. 1: Визуализация аппроксимации одного и того же временного ряда разными степенями полинома

Представленный код демонстрирует ключевые проблемы полиномиальной аппроксимации при работе с финансовыми данными. Модели низкой степени (2-6) не способны адекватно описать сложную структуру данных, особенно в присутствии рыночных шоков. С другой стороны, полиномы высокой степени (7-15) демонстрируют классические признаки переобучения — они точно следуют обучающим данным, но дают неадекватные экстраполяции за пределами обучающей выборки.

Особенно показательным является анализ коэффициентов полинома высокой степени. Их большие значения указывают на численную неустойчивость модели, что делает ее крайне чувствительной к небольшим изменениям во входных данных. В контексте финансового моделирования это означает, что незначительные пересмотры исторических данных могут кардинально изменить прогнозы модели.

Кроме того, полиномиальная аппроксимация плохо справляется с локальными особенностями данных. Рыночные шоки, которые я включил в синтетические данные, приводят к глобальным изменениям формы аппроксимирующего полинома, что не соответствует реальной природе таких событий.

Аппроксимация сплайнами

Сплайн-аппроксимация представляет собой значительный шаг вперед по сравнению с полиномиальными методами. Основная идея заключается в разбиении временного ряда на отдельные сегменты и аппроксимации каждого сегмента полиномом невысокой степени с обеспечением гладкости в точках соединения (узлах).

Ключевое преимущество сплайнов заключается в их локальности — изменение данных в одном сегменте влияет только на соседние сегменты, что делает метод более робастным к выбросам. Кроме того, сплайны обеспечивают лучший баланс между гибкостью аппроксимации и устойчивостью к переобучению.

from scipy.interpolate import UnivariateSpline, BSpline, splrep
from scipy.optimize import minimize_scalar
import numpy as np

def adaptive_knot_selection(time_series, time_points, max_knots=20):
    """
    Адаптивный выбор узлов на основе локальной кривизны временного ряда
    """
    # Вычисляем первую и вторую производные
    dt = np.diff(time_points)
    dy = np.diff(time_series)
    first_deriv = dy / dt
    
    second_deriv = np.diff(first_deriv) / dt[:-1]
    
    # Находим точки с максимальной кривизной
    curvature = np.abs(second_deriv)
    
    # Нормализуем кривизну
    curvature_normalized = curvature / np.max(curvature)
    
    # Выбираем узлы на основе порогового значения кривизны
    threshold = np.percentile(curvature_normalized, 70)
    knot_candidates = time_points[1:-1][curvature_normalized > threshold]
    
    # Ограничиваем количество узлов
    if len(knot_candidates) > max_knots:
        # Выбираем узлы с максимальной кривизной
        top_indices = np.argsort(curvature_normalized)[-max_knots:]
        knot_candidates = time_points[1:-1][top_indices]
    
    return np.sort(knot_candidates)

# Генерация более сложного синтетического ряда
np.random.seed(123)
n_points = 300
time = np.linspace(0, 15, n_points)

# Создаем ряд с несколькими режимами и структурными сдвигами
regime1 = 100 + 2 * time[:100] + 0.5 * np.sin(time[:100] * 2)
regime2 = regime1[-1] + 0.5 * (time[100:200] - time[100]) + \
          2 * np.sin(time[100:200] * 0.5)
regime3 = regime2[-1] - 1.5 * (time[200:] - time[200]) + \
          np.sin(time[200:] * 3)

complex_series = np.concatenate([regime1, regime2, regime3])
complex_series += np.random.normal(0, 0.8, n_points)

# Применяем различные методы сплайн-аппроксимации
fig, axes = plt.subplots(2, 2, figsize=(16, 12))

# 1. Кубический сплайн с равномерными узлами
uniform_knots = np.linspace(time[5], time[-5], 15)
tck_uniform = splrep(time, complex_series, t=uniform_knots, k=3)
spline_uniform = BSpline(*tck_uniform)

# 2. Кубический сплайн с адаптивными узлами
adaptive_knots = adaptive_knot_selection(complex_series, time)
tck_adaptive = splrep(time, complex_series, t=adaptive_knots, k=3)
spline_adaptive = BSpline(*tck_adaptive)

# 3. Сглаживающий сплайн с оптимальным параметром сглаживания
def cross_validation_smoothing(time_data, series_data, s_values):
    """Кросс-валидация для выбора оптимального параметра сглаживания"""
    cv_errors = []
    n_folds = 5
    fold_size = len(series_data) // n_folds
    
    for s in s_values:
        fold_errors = []
        for fold in range(n_folds):
            start_idx = fold * fold_size
            end_idx = (fold + 1) * fold_size if fold < n_folds - 1 else len(series_data)
            
            # Обучающая и тестовая выборки
            train_mask = np.ones(len(series_data), dtype=bool)
            train_mask[start_idx:end_idx] = False
            
            time_train = time_data[train_mask]
            series_train = series_data[train_mask]
            time_test = time_data[~train_mask]
            series_test = series_data[~train_mask]
            
            # Обучение сплайна
            spline = UnivariateSpline(time_train, series_train, s=s)
            
            # Предсказание и ошибка
            pred_test = spline(time_test)
            mse = np.mean((series_test - pred_test)**2)
            fold_errors.append(mse)
        
        cv_errors.append(np.mean(fold_errors))
    
    return cv_errors

# Поиск оптимального параметра сглаживания
s_values = np.logspace(-2, 3, 50)
cv_errors = cross_validation_smoothing(time, complex_series, s_values)
optimal_s = s_values[np.argmin(cv_errors)]

smoothing_spline = UnivariateSpline(time, complex_series, s=optimal_s)

# 4. B-сплайн с регуляризацией
from sklearn.linear_model import Ridge
from sklearn.preprocessing import SplineTransformer

spline_transformer = SplineTransformer(n_knots=20, degree=3)
X_spline = spline_transformer.fit_transform(time.reshape(-1, 1))

# Используем Ridge регрессию для регуляризации
ridge_alpha_values = [0.1, 1.0, 10.0, 100.0]
ridge_cv_scores = []

for alpha in ridge_alpha_values:
    ridge = Ridge(alpha=alpha)
    scores = cross_val_score(ridge, X_spline, complex_series, cv=5, 
                           scoring='neg_mean_squared_error')
    ridge_cv_scores.append(np.mean(scores))

optimal_alpha = ridge_alpha_values[np.argmax(ridge_cv_scores)]
ridge_spline = Ridge(alpha=optimal_alpha)
ridge_spline.fit(X_spline, complex_series)

# Визуализация результатов
time_dense = np.linspace(0, 15, 1000)
X_dense = spline_transformer.transform(time_dense.reshape(-1, 1))

axes[0, 0].plot(time, complex_series, 'ko', markersize=2, alpha=0.6, 
                label='Исходные данные')
axes[0, 0].plot(time_dense, spline_uniform(time_dense), 'k-', linewidth=2, 
                label='Равномерные узлы')
axes[0, 0].scatter(uniform_knots, spline_uniform(uniform_knots), 
                  color='red', s=30, label='Узлы')
axes[0, 0].set_title('Кубический сплайн с равномерными узлами')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

axes[0, 1].plot(time, complex_series, 'ko', markersize=2, alpha=0.6, 
                label='Исходные данные')
axes[0, 1].plot(time_dense, spline_adaptive(time_dense), 'k-', linewidth=2, 
                label='Адаптивные узлы')
axes[0, 1].scatter(adaptive_knots, spline_adaptive(adaptive_knots), 
                  color='red', s=30, label='Узлы')
axes[0, 1].set_title('Кубический сплайн с адаптивными узлами')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

axes[1, 0].plot(time, complex_series, 'ko', markersize=2, alpha=0.6, 
                label='Исходные данные')
axes[1, 0].plot(time_dense, smoothing_spline(time_dense), 'k-', linewidth=2, 
                label=f'Сглаживающий сплайн (s={optimal_s:.1e})')
axes[1, 0].set_title('Сглаживающий сплайн с оптимальным параметром')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

axes[1, 1].plot(time, complex_series, 'ko', markersize=2, alpha=0.6, 
                label='Исходные данные')
axes[1, 1].plot(time_dense, ridge_spline.predict(X_dense), 'k-', linewidth=2, 
                label=f'B-сплайн + Ridge (α={optimal_alpha})')
axes[1, 1].set_title('B-сплайн с Ridge регуляризацией')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Анализ качества аппроксимации
print("Сравнение методов сплайн-аппроксимации:")
print(f"MSE равномерные узлы: {np.mean((complex_series - spline_uniform(time))**2):.4f}")
print(f"MSE адаптивные узлы: {np.mean((complex_series - spline_adaptive(time))**2):.4f}")
print(f"MSE сглаживающий сплайн: {np.mean((complex_series - smoothing_spline(time))**2):.4f}")
print(f"MSE B-сплайн + Ridge: {np.mean((complex_series - ridge_spline.predict(X_spline))**2):.4f}")
print(f"Оптимальный параметр сглаживания: {optimal_s:.2e}")
print(f"Оптимальный параметр регуляризации: {optimal_alpha}")

Аппроксимация временного ряда кубическими сплайнами, сглаживающим сплайном и B-spline Ridge

Рис. 2: Аппроксимация временного ряда кубическими сплайнами, сглаживающим сплайном и B-spline Ridge

Сравнение методов сплайн-аппроксимации:
MSE равномерные узлы: 0.6926
MSE адаптивные узлы: 0.6664
MSE сглаживающий сплайн: 1.0297
MSE B-сплайн + Ridge: 0.6216
Оптимальный параметр сглаживания: 3.09e+02
Оптимальный параметр регуляризации: 0.1

Данный пример демонстрирует несколько продвинутых подходов к сплайн-аппроксимации, каждый из которых имеет свои преимущества в различных ситуациях.

Адаптивный выбор узлов на основе локальной кривизны позволяет сосредоточить вычислительные ресурсы на наиболее сложных участках временного ряда, где происходят резкие изменения или структурные сдвиги.

Сглаживающий сплайн с оптимизированным параметром сглаживания представляет собой компромисс между точностью аппроксимации и гладкостью результирующей кривой. Использование кросс-валидации для выбора параметра сглаживания помогает избежать переобучения и обеспечивает лучшую обобщающую способность модели.

B-сплайны с Ridge регуляризацией сочетают гибкость сплайн-представления с мощными методами регуляризации из машинного обучения. Это позволяет эффективно контролировать сложность модели и предотвращать переобучение даже при большом количестве базисных функций.

Важно отметить, что различные методы показывают разную эффективность на разных участках временного ряда. Адаптивные узлы лучше справляются с резкими переходами между режимами, в то время как сглаживающие сплайны дают более стабильные результаты на участках с высоким уровнем шума.

Гауссовские процессы для аппроксимации рядов

Гауссовские процессы представляют собой мощный непараметрический байесовский метод для аппроксимации и прогнозирования временных рядов. В отличие от параметрических моделей, которые предполагают конкретную функциональную форму, гауссовские процессы определяют распределение над функциями, что позволяет естественным образом моделировать неопределенность в прогнозах.

Основная привлекательность гауссовских процессов в контексте финансового моделирования заключается в их способности предоставлять не только точечные оценки, но и доверительные интервалы для прогнозов. Это критически важно в условиях высокой волатильности финансовых рынков, где оценка неопределенности часто не менее важна, чем сам прогноз.

import numpy as np
import matplotlib.pyplot as plt
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel, Matern, RationalQuadratic
from sklearn.model_selection import cross_val_score
from sklearn.metrics import mean_squared_error
import warnings
warnings.filterwarnings('ignore')

def generate_financial_data_with_regimes(n_points=200, noise_level=1.0):
    """
    Генерирует финансовые данные с несколькими режимами и структурными сдвигами
    """
    np.random.seed(42)
    t = np.linspace(0, 10, n_points)
    
    # Три различных режима
    regime1_end = n_points // 3
    regime2_end = 2 * n_points // 3
    
    # Режим 1: восходящий тренд с умеренной волатильностью
    regime1 = 100 + 2 * t[:regime1_end] + 0.5 * np.sin(t[:regime1_end] * 3)
    
    # Режим 2: боковое движение с высокой волатильностью
    regime2_base = regime1[-1]
    regime2 = regime2_base + 0.2 * (t[regime1_end:regime2_end] - t[regime1_end]) + \
              3 * np.sin(t[regime1_end:regime2_end] * 2)
    
    # Режим 3: нисходящий тренд
    regime3_base = regime2[-1]
    regime3 = regime3_base - 1.5 * (t[regime2_end:] - t[regime2_end]) + \
              0.8 * np.cos(t[regime2_end:] * 4)
    
    # Объединяем режимы
    base_series = np.concatenate([regime1, regime2, regime3])
    
    # Добавляем гетероскедастичный шум
    volatility = np.ones(n_points)
    volatility[regime1_end:regime2_end] *= 2.0  # Повышенная волатильность во втором режиме
    
    noise = np.random.normal(0, noise_level, n_points) * volatility
    
    return t, base_series + noise

class AdvancedGaussianProcessApproximator:
    """
    Продвинутый класс для аппроксимации временных рядов с использованием гауссовских процессов
    """
    
    def __init__(self):
        self.models = {}
        self.kernel_names = {}
        
    def create_kernels(self, length_scale_bounds=(1e-2, 1e2), noise_level_bounds=(1e-5, 1e1)):
        """
        Создаем набор ядер для различных типов данных
        """
        kernels = {
            'RBF': RBF(length_scale=1.0, length_scale_bounds=length_scale_bounds) + 
                   WhiteKernel(noise_level=0.1, noise_level_bounds=noise_level_bounds),
            
            'Matern_3/2': Matern(length_scale=1.0, length_scale_bounds=length_scale_bounds, nu=1.5) + 
                          WhiteKernel(noise_level=0.1, noise_level_bounds=noise_level_bounds),
            
            'Matern_5/2': Matern(length_scale=1.0, length_scale_bounds=length_scale_bounds, nu=2.5) + 
                          WhiteKernel(noise_level=0.1, noise_level_bounds=noise_level_bounds),
            
            'RationalQuadratic': RationalQuadratic(length_scale=1.0, alpha=1.0, 
                                                 length_scale_bounds=length_scale_bounds) + 
                               WhiteKernel(noise_level=0.1, noise_level_bounds=noise_level_bounds),
            
            'Composite': RBF(length_scale=1.0) * RBF(length_scale=0.1) + 
                        Matern(length_scale=2.0, nu=1.5) + 
                        WhiteKernel(noise_level=0.1)
        }
        
        return kernels
    
    def fit_and_compare_kernels(self, X_train, y_train, X_test=None):
        """
        Обучаем модели с различными ядрами и сравниваем их производительность
        """
        kernels = self.create_kernels()
        results = {}
        
        for kernel_name, kernel in kernels.items():
            print(f"Обучение модели с ядром {kernel_name}...")
            
            # Создаем и обучаем модель
            gp = GaussianProcessRegressor(
                kernel=kernel, 
                alpha=1e-6,  # Небольшая регуляризация для численной стабильности
                normalize_y=True,  # Нормализация для лучшей сходимости
                n_restarts_optimizer=5  # Множественные запуски оптимизации
            )
            
            # Обучение модели
            gp.fit(X_train, y_train)
            
            # Вычисляем log-likelihood и cross-validation score
            log_likelihood = gp.log_marginal_likelihood()
            cv_scores = cross_val_score(gp, X_train, y_train, cv=5, 
                                      scoring='neg_mean_squared_error')
            
            self.models[kernel_name] = gp
            self.kernel_names[kernel_name] = kernel_name
            
            results[kernel_name] = {
                'log_likelihood': log_likelihood,
                'cv_score': np.mean(cv_scores),
                'cv_std': np.std(cv_scores),
                'optimized_kernel': gp.kernel_
            }
            
            print(f"  Log-likelihood: {log_likelihood:.2f}")
            print(f"  CV Score: {np.mean(cv_scores):.4f} ± {np.std(cv_scores):.4f}")
        
        return results
    
    def predict_with_uncertainty(self, model_name, X_pred):
        """
        Делаем прогноз с оценкой неопределенности
        """
        if model_name not in self.models:
            raise ValueError(f"Модель {model_name} не найдена")
        
        gp = self.models[model_name]
        y_pred, y_std = gp.predict(X_pred, return_std=True)
        
        return y_pred, y_std

# Генерируем данные и применяем гауссовские процессы
time_points, financial_series = generate_financial_data_with_regimes(200, noise_level=0.8)

# Разделяем на обучающую и тестовую выборки
split_point = int(0.8 * len(financial_series))
X_train = time_points[:split_point].reshape(-1, 1)
y_train = financial_series[:split_point]
X_test = time_points[split_point:].reshape(-1, 1)
y_test = financial_series[split_point:]

# Создаем точки для прогнозирования (включая экстраполяцию)
X_pred = np.linspace(0, 12, 300).reshape(-1, 1)

# Инициализируем и обучаем модели
gp_approximator = AdvancedGaussianProcessApproximator()
results = gp_approximator.fit_and_compare_kernels(X_train, y_train, X_test)

# Визуализируем результаты для разных ядер
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
axes = axes.ravel()

kernel_names = ['RBF', 'Matern_3/2', 'Matern_5/2', 'RationalQuadratic', 'Composite']

for i, kernel_name in enumerate(kernel_names):
    if i >= len(axes):
        break
    
    # Получаем прогнозы
    y_pred, y_std = gp_approximator.predict_with_uncertainty(kernel_name, X_pred)
    
    # Визуализация
    axes[i].plot(time_points, financial_series, 'ko', markersize=3, alpha=0.6, 
                label='Исходные данные')
    axes[i].plot(X_pred.flatten(), y_pred, 'k-', linewidth=2, 
                label='GP аппроксимация')
    
    # Доверительные интервалы
    axes[i].fill_between(X_pred.flatten(), 
                        y_pred - 1.96 * y_std, 
                        y_pred + 1.96 * y_std, 
                        alpha=0.2, color='gray', label='95% доверительный интервал')
    axes[i].fill_between(X_pred.flatten(), 
                        y_pred - y_std, 
                        y_pred + y_std, 
                        alpha=0.3, color='lightgray', label='68% доверительный интервал')
    
    # Отмечаем границу обучения
    axes[i].axvline(x=time_points[split_point], color='red', linestyle='--', 
                   alpha=0.7, label='Граница обучения')
    
    # Вычисляем MSE на тестовой выборке
    y_test_pred, _ = gp_approximator.predict_with_uncertainty(kernel_name, X_test)
    test_mse = mean_squared_error(y_test, y_test_pred)
    
    axes[i].set_title(f'{kernel_name}\nLog-likelihood: {results[kernel_name]["log_likelihood"]:.2f}, '
                     f'Test MSE: {test_mse:.2f}')
    axes[i].legend(fontsize=8)
    axes[i].grid(True, alpha=0.3)
    axes[i].set_xlabel('Время')
    axes[i].set_ylabel('Значение')

# Удаляем лишние подграфики
if len(kernel_names) < len(axes):
    for i in range(len(kernel_names), len(axes)):
        fig.delaxes(axes[i])

plt.tight_layout()
plt.show()

# Анализ параметров обученных ядер
print("\nАнализ оптимизированных параметров ядер:")
print("=" * 60)
for kernel_name, result in results.items():
    print(f"\n{kernel_name}:")
    print(f"  Оптимизированное ядро: {result['optimized_kernel']}")
    print(f"  Log-likelihood: {result['log_likelihood']:.4f}")
    print(f"  CV Score: {result['cv_score']:.4f} ± {result['cv_std']:.4f}")

# Сравнение качества экстраполяции
print("\nСравнение качества экстраполяции:")
print("-" * 40)
extrapolation_points = np.linspace(10, 12, 50).reshape(-1, 1)

for kernel_name in kernel_names:
    y_extrap, y_extrap_std = gp_approximator.predict_with_uncertainty(
        kernel_name, extrapolation_points)
    
    # Средняя неопределенность в области экстраполяции
    mean_uncertainty = np.mean(y_extrap_std)
    print(f"{kernel_name}: средняя неопределенность = {mean_uncertainty:.2f}")

Аппроксимации гауссовского процесса для разных ядер, показывающие прогнозы с доверительными интервалами и границей обучающей выборки

Рис. 3: Аппроксимации гауссовского процесса для разных ядер, показывающие прогнозы с доверительными интервалами и границей обучающей выборки

Анализ оптимизированных параметров ядер:
============================================================

RBF:
  Оптимизированное ядро: RBF(length_scale=0.906) + WhiteKernel(noise_level=0.169)
  Log-likelihood: -104.3451
  CV Score: -4.6035 ± 3.6325

Matern_3/2:
  Оптимизированное ядро: Matern(length_scale=1.63, nu=1.5) + WhiteKernel(noise_level=0.168)
  Log-likelihood: -105.7941
  CV Score: -4.7397 ± 3.4927

Matern_5/2:
  Оптимизированное ядро: Matern(length_scale=1.31, nu=2.5) + WhiteKernel(noise_level=0.169)
  Log-likelihood: -105.0289
  CV Score: -4.5696 ± 3.5394

RationalQuadratic:
  Оптимизированное ядро: RationalQuadratic(alpha=1e+05, length_scale=0.906) + WhiteKernel(noise_level=0.169)
  Log-likelihood: -104.3451
  CV Score: -4.9070 ± 3.7921

Composite:
  Оптимизированное ядро: RBF(length_scale=0.911) * RBF(length_scale=3.33e+03) + Matern(length_scale=5.92, nu=1.5) + WhiteKernel(noise_level=0.169)
  Log-likelihood: -104.6870
  CV Score: -4.1689 ± 3.1738

Сравнение качества экстраполяции:
----------------------------------------
RBF: средняя неопределенность = 3.07
Matern_3/2: средняя неопределенность = 3.01
Matern_5/2: средняя неопределенность = 3.05
RationalQuadratic: средняя неопределенность = 3.07
Composite: средняя неопределенность = 3.85

Применение гауссовских процессов демонстрирует несколько ключевых преимуществ при аппроксимации финансовых временных рядов. Во-первых, различные ядра показывают разную эффективность в зависимости от характеристик данных. RBF ядро хорошо подходит для гладких трендов, в то время как ядра Matern лучше справляются с менее гладкими функциями, что часто встречается в финансовых данных.

Читайте также:  Прогнозирование трафика и конверсий сайта с помощью XGBoost

Особенно важным является автоматическое моделирование неопределенности. Доверительные интервалы естественным образом расширяются в областях с меньшим количеством данных и при экстраполяции за пределы обучающей выборки. Это позволяет количественно оценить надежность прогнозов и принимать более обоснованные решения в условиях неопределенности.

Композитные ядра, объединяющие несколько базовых ядер, часто показывают лучшие результаты на сложных данных, поскольку они способны моделировать множественные паттерны одновременно. Однако их интерпретация становится более сложной, и возрастает риск переобучения.

Практические аспекты применения гауссовских процессов

В реальных применениях важно учитывать вычислительную сложность гауссовских процессов, которая растет как O(n³) от размера обучающей выборки. Для работы с большими временными рядами существует несколько стратегий оптимизации.

from sklearn.cluster import KMeans
from scipy.spatial.distance import cdist

def sparse_gp_approximation(X_train, y_train, n_inducing_points=50):
    """
    Разреженная аппроксимация гауссовского процесса с использованием индуцирующих точек
    """
    # Выбираем индуцирующие точки с помощью k-means кластеризации
    if len(X_train) > n_inducing_points:
        kmeans = KMeans(n_clusters=n_inducing_points, random_state=42, n_init=10)
        kmeans.fit(X_train)
        inducing_points = kmeans.cluster_centers_
        
        # Находим ближайшие точки из обучающей выборки
        distances = cdist(inducing_points, X_train)
        nearest_indices = np.argmin(distances, axis=1)
        
        X_sparse = X_train[nearest_indices]
        y_sparse = y_train[nearest_indices]
    else:
        X_sparse = X_train
        y_sparse = y_train
    
    return X_sparse, y_sparse

def local_gp_prediction(X_train, y_train, X_pred, local_window=0.3, kernel_type='RBF'):
    """
    Локальное предсказание с использованием скользящего окна
    """
    predictions = np.zeros(len(X_pred))
    uncertainties = np.zeros(len(X_pred))
    
    # Определяем размер локального окна
    data_range = np.max(X_train) - np.min(X_train)
    window_size = local_window * data_range
    
    for i, x_pred in enumerate(X_pred):
        # Находим локальную окрестность
        distances = np.abs(X_train.flatten() - x_pred.flatten())
        local_mask = distances <= window_size
        
        if np.sum(local_mask) < 5:  # Минимальное количество точек
            # Используем ближайшие точки
            sorted_indices = np.argsort(distances)
            local_mask = np.zeros_like(local_mask, dtype=bool)
            local_mask[sorted_indices[:10]] = True
        
        X_local = X_train[local_mask]
        y_local = y_train[local_mask]
        
        # Обучаем локальную модель
        if kernel_type == 'RBF':
            kernel = RBF(length_scale=window_size/3) + WhiteKernel(noise_level=0.01)
        elif kernel_type == 'Matern':
            kernel = Matern(length_scale=window_size/3, nu=1.5) + WhiteKernel(noise_level=0.01)
        
        local_gp = GaussianProcessRegressor(kernel=kernel, alpha=1e-6, normalize_y=True)
        local_gp.fit(X_local, y_local)
        
        # Предсказание
        pred_mean, pred_std = local_gp.predict(x_pred.reshape(1, -1), return_std=True)
        predictions[i] = pred_mean[0]
        uncertainties[i] = pred_std[0]
    
    return predictions, uncertainties

# Применяем методы оптимизации
print("Тестирование оптимизированных методов:")

# 1. Разреженная аппроксимация
X_sparse, y_sparse = sparse_gp_approximation(X_train, y_train, n_inducing_points=30)
print(f"Разреженная выборка: {len(X_sparse)} из {len(X_train)} точек")

# Обучаем на разреженной выборке
sparse_gp = GaussianProcessRegressor(
    kernel=RBF(length_scale=1.0) + WhiteKernel(noise_level=0.1),
    normalize_y=True
)
sparse_gp.fit(X_sparse, y_sparse)

# 2. Локальное предсказание
local_pred, local_std = local_gp_prediction(X_train, y_train, X_pred, local_window=0.25)

# 3. Полная модель для сравнения
full_gp = GaussianProcessRegressor(
    kernel=RBF(length_scale=1.0) + WhiteKernel(noise_level=0.1),
    normalize_y=True
)
full_gp.fit(X_train, y_train)

# Сравнение результатов
y_full_pred, y_full_std = full_gp.predict(X_pred, return_std=True)
y_sparse_pred, y_sparse_std = sparse_gp.predict(X_pred, return_std=True)

# Визуализация сравнения
plt.figure(figsize=(16, 10))

plt.subplot(2, 2, 1)
plt.plot(time_points, financial_series, 'ko', markersize=3, alpha=0.6, label='Данные')
plt.plot(X_pred.flatten(), y_full_pred, 'k-', linewidth=2, label='Полная модель')
plt.fill_between(X_pred.flatten(), y_full_pred - 1.96*y_full_std, 
                y_full_pred + 1.96*y_full_std, alpha=0.2, color='gray')
plt.axvline(x=time_points[split_point], color='red', linestyle='--', alpha=0.7)
plt.title('Полная GP модель')
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(2, 2, 2)
plt.plot(time_points, financial_series, 'ko', markersize=3, alpha=0.6, label='Данные')
plt.plot(X_sparse.flatten(), y_sparse, 'ro', markersize=5, alpha=0.8, label='Индуцирующие точки')
plt.plot(X_pred.flatten(), y_sparse_pred, 'r-', linewidth=2, label='Разреженная модель')
plt.fill_between(X_pred.flatten(), y_sparse_pred - 1.96*y_sparse_std, 
                y_sparse_pred + 1.96*y_sparse_std, alpha=0.2, color='red')
plt.axvline(x=time_points[split_point], color='red', linestyle='--', alpha=0.7)
plt.title('Разреженная GP модель')
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(2, 2, 3)
plt.plot(time_points, financial_series, 'ko', markersize=3, alpha=0.6, label='Данные')
plt.plot(X_pred.flatten(), local_pred, 'b-', linewidth=2, label='Локальная модель')
plt.fill_between(X_pred.flatten(), local_pred - 1.96*local_std, 
                local_pred + 1.96*local_std, alpha=0.2, color='blue')
plt.axvline(x=time_points[split_point], color='red', linestyle='--', alpha=0.7)
plt.title('Локальная GP модель')
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(2, 2, 4)
test_pred_full, _ = full_gp.predict(X_test, return_std=True)
test_pred_sparse, _ = sparse_gp.predict(X_test, return_std=True)
test_pred_local, _ = local_gp_prediction(X_train, y_train, X_test, local_window=0.25)

mse_full = mean_squared_error(y_test, test_pred_full)
mse_sparse = mean_squared_error(y_test, test_pred_sparse)
mse_local = mean_squared_error(y_test, test_pred_local)

methods = ['Полная', 'Разреженная', 'Локальная']
mse_values = [mse_full, mse_sparse, mse_local]

plt.bar(methods, mse_values, color=['black', 'red', 'blue'], alpha=0.7)
plt.ylabel('MSE на тестовой выборке')
plt.title('Сравнение качества моделей')
plt.grid(True, alpha=0.3)

for i, mse in enumerate(mse_values):
    plt.text(i, mse + 0.1, f'{mse:.2f}', ha='center', va='bottom')

plt.tight_layout()
plt.show()

print(f"\nСравнение вычислительной эффективности:")
print(f"Полная модель: MSE = {mse_full:.4f}")
print(f"Разреженная модель: MSE = {mse_sparse:.4f} (использует {len(X_sparse)} точек)")
print(f"Локальная модель: MSE = {mse_local:.4f}")

Аппроксимации временного ряда гауссовскими процессами: полная модель, разреженная с индуцирующими точками, локальная модель с движущимся окном, а также сравнительная столбчатая диаграмма MSE на тестовой выборке для всех 3-х подходов

Рис. 4: Аппроксимации временного ряда гауссовскими процессами: полная модель, разреженная с индуцирующими точками, локальная модель с движущимся окном, а также сравнительная столбчатая диаграмма MSE на тестовой выборке для всех 3-х подходов

Тестирование оптимизированных методов:
Разреженная выборка: 30 из 160 точек
Сравнение вычислительной эффективности:
Полная модель: MSE = 3.3036
Разреженная модель: MSE = 2.2097 (использует 30 точек)
Локальная модель: MSE = 7.5468

Оптимизированные методы демонстрируют практическую применимость гауссовских процессов к большим временным рядам. Разреженная аппроксимация позволяет значительно сократить вычислительные затраты при сохранении приемлемого качества аппроксимации. Локальные модели особенно эффективны для нестационарных данных, поскольку они автоматически адаптируются к локальным характеристикам временного ряда.

Важно отметить, что выбор между различными подходами зависит от конкретной задачи и характеристик данных. Для стационарных рядов с глобальными паттернами предпочтительны полные модели, в то время как для нестационарных данных с локальными особенностями лучше подходят локальные методы.

Support Vector Regression в аппроксимации временных рядов

Support Vector Regression (SVR) представляет собой адаптацию классического метода опорных векторов для задач регрессии. Основная идея SVR заключается в отображении исходных данных в многомерное пространство признаков с помощью ядерных функций и последующем построении линейной регрессии в этом пространстве. Это позволяет эффективно моделировать сложные нелинейные зависимости в временных рядах.

Ключевое преимущество SVR для финансовых приложений заключается в его робастности к выбросам благодаря использованию ε-insensitive loss функции. Это особенно важно при работе с финансовыми данными, где экстремальные события являются неотъемлемой частью процесса.

import numpy as np
import matplotlib.pyplot as plt
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV, TimeSeriesSplit
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error, mean_absolute_error
import warnings
warnings.filterwarnings('ignore')

class TimeSeriesSVRApproximator:

    def __init__(self):
        self.models = {}
        self.feature_names = None

    def generate_time_features(self, time_points, polynomial_degree=3):
        """
        Генерирует временные признаки для SVR с фиксированным набором признаков
        """
        features = []
        feature_names = []

        # Полиномиальные признаки времени
        for degree in range(1, polynomial_degree + 1):
            features.append((time_points ** degree).reshape(-1, 1))
            feature_names.append(f'time^{degree}')

        # Тригонометрические признаки для всех периодов без условия
        periods = [10, 20, 50, 100]
        for period in periods:
            freq = 2 * np.pi / period
            features.append(np.sin(freq * time_points).reshape(-1, 1))
            features.append(np.cos(freq * time_points).reshape(-1, 1))
            feature_names.extend([f'sin_{period}', f'cos_{period}'])

        # Разностные признаки
        if len(time_points) > 1:
            dt = np.diff(time_points)
            dt_feature = np.concatenate([[dt[0]], dt])
            features.append(dt_feature.reshape(-1, 1))
            feature_names.append('dt')
        else:
            # Если длина 1, добавим 0, чтобы сохранить размерность
            features.append(np.array([[0]]))
            feature_names.append('dt')

        X_features = np.hstack(features)
        if self.feature_names is None:
            self.feature_names = feature_names
        else:
            # Проверяем что имена признаков совпадают
            assert feature_names == self.feature_names, "Несоответствие признаков между генерациями"
        return X_features

    def optimize_svr_parameters(self, X, y, kernel_type='rbf'):
        """
        Оптимизация гиперпараметров SVR с временной кросс-валидацией
        """
        if kernel_type == 'rbf':
            param_grid = {
                'svr__C': [0.1, 1, 10, 100],
                'svr__gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1],
                'svr__epsilon': [0.01, 0.1, 0.2, 0.5]
            }
        elif kernel_type == 'poly':
            param_grid = {
                'svr__C': [0.1, 1, 10, 100],
                'svr__degree': [2, 3, 4],
                'svr__epsilon': [0.01, 0.1, 0.2, 0.5],
                'svr__coef0': [0, 1, 2]
            }
        elif kernel_type == 'linear':
            param_grid = {
                'svr__C': [0.1, 1, 10, 100],
                'svr__epsilon': [0.01, 0.1, 0.2, 0.5]
            }

        pipeline = Pipeline([
            ('scaler', StandardScaler()),
            ('svr', SVR(kernel=kernel_type))
        ])

        tscv = TimeSeriesSplit(n_splits=5)

        grid_search = GridSearchCV(
            pipeline, param_grid, cv=tscv,
            scoring='neg_mean_squared_error',
            n_jobs=-1, verbose=0
        )

        grid_search.fit(X, y)
        return grid_search.best_estimator_, grid_search.best_params_, grid_search.best_score_

    def fit_multiple_kernels(self, time_points, series):
        """
        Обучаем SVR с разными ядрами и сравниваем
        """
        X_features = self.generate_time_features(time_points)

        kernels = ['linear', 'rbf', 'poly']
        results = {}

        for kernel in kernels:
            print(f"Оптимизация параметров для ядра {kernel}...")
            best_model, best_params, best_score = self.optimize_svr_parameters(X_features, series, kernel_type=kernel)
            self.models[kernel] = best_model
            results[kernel] = {
                'model': best_model,
                'best_params': best_params,
                'cv_score': best_score
            }
            print(f"  Лучший CV score: {best_score:.4f}")
            print(f"  Лучшие параметры: {best_params}")

        return results

    def predict_with_features(self, kernel_type, time_points_pred):
        """
        Предсказание для новых точек
        """
        if kernel_type not in self.models:
            raise ValueError(f"Модель для ядра {kernel_type} не обучена")

        X_pred_features = self.generate_time_features(time_points_pred)
        y_pred = self.models[kernel_type].predict(X_pred_features)
        return y_pred


def create_complex_nonlinear_series(n_points=300):
    np.random.seed(123)
    t = np.linspace(0, 20, n_points)

    component1 = 5 * np.sin(0.5 * t) * np.exp(-0.1 * t)
    component2 = 3 * t / (1 + 0.1 * t ** 2)
    component3 = 2 * np.tanh(0.2 * (t - 10))
    component4 = np.where(t > 15, 2 * (t - 15) ** 0.5, 0)

    amplitude_modulation = 1 + 0.5 * np.sin(0.1 * t)
    periodic_component = amplitude_modulation * np.sin(2 * t + 0.3 * t ** 2)

    volatility = 0.5 + 0.3 * np.sin(0.3 * t) ** 2
    noise = np.random.normal(0, 1, n_points) * volatility

    series = 100 + component1 + component2 + component3 + component4 + periodic_component + noise
    return t, series


# Пример использования
time_complex, series_complex = create_complex_nonlinear_series(250)

split_idx = int(0.8 * len(series_complex))
time_train = time_complex[:split_idx]
series_train = series_complex[:split_idx]
time_test = time_complex[split_idx:]
series_test = series_complex[split_idx:]

svr_approximator = TimeSeriesSVRApproximator()
svr_results = svr_approximator.fit_multiple_kernels(time_train, series_train)

time_pred = np.linspace(0, 25, 400)

predictions = {}
for kernel in ['linear', 'rbf', 'poly']:
    pred = svr_approximator.predict_with_features(kernel, time_pred)
    predictions[kernel] = pred

# Визуализация результатов
fig, axes = plt.subplots(2, 2, figsize=(16, 12))

# 1. Сравнение всех моделей
axes[0, 0].plot(time_complex, series_complex, 'ko', markersize=3, alpha=0.6, label='Исходные данные')
colors = ['red', 'blue', 'green']
for i, (kernel, pred) in enumerate(predictions.items()):
    axes[0, 0].plot(time_pred, pred, color=colors[i], linewidth=2, label=f'SVR {kernel}')

axes[0, 0].axvline(x=time_complex[split_idx], color='gray', linestyle='--', alpha=0.7, label='Граница обучения')
axes[0, 0].set_title('Сравнение SVR с различными ядрами')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
axes[0, 0].set_xlabel('Время')
axes[0, 0].set_ylabel('Значение')

# 2. Детальный анализ RBF ядра: обучение и тест
rbf_pred_train = svr_approximator.predict_with_features('rbf', time_train)
rbf_pred_test = svr_approximator.predict_with_features('rbf', time_test)
axes[0, 1].plot(time_train, series_train, 'ko', markersize=3, alpha=0.6, label='Обучающие данные')
axes[0, 1].plot(time_test, series_test, 'ro', markersize=3, alpha=0.6, label='Тестовые данные')
axes[0, 1].plot(time_train, rbf_pred_train, 'b-', linewidth=2, label='Предсказание на обучении')
axes[0, 1].plot(time_test, rbf_pred_test, 'r-', linewidth=2, label='Предсказание на тесте')
train_mse = mean_squared_error(series_train, rbf_pred_train)
test_mse = mean_squared_error(series_test, rbf_pred_test)
axes[0, 1].set_title(f'RBF SVR: Train MSE={train_mse:.2f}, Test MSE={test_mse:.2f}')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# 3. Анализ остатков для RBF
residuals_rbf = series_train - rbf_pred_train
axes[1, 0].scatter(rbf_pred_train, residuals_rbf, alpha=0.6, color='blue')
axes[1, 0].axhline(y=0, color='red', linestyle='--')
axes[1, 0].set_xlabel('Предсказанные значения')
axes[1, 0].set_ylabel('Остатки')
axes[1, 0].set_title('Анализ остатков (RBF SVR)')
axes[1, 0].grid(True, alpha=0.3)

# 4. Сравнение метрик MSE по ядрам
kernels = ['linear', 'rbf', 'poly']
train_mses = []
test_mses = []

for kernel in kernels:
    pred_train = svr_approximator.predict_with_features(kernel, time_train)
    pred_test = svr_approximator.predict_with_features(kernel, time_test)

    train_mses.append(mean_squared_error(series_train, pred_train))
    test_mses.append(mean_squared_error(series_test, pred_test))

x_pos = np.arange(len(kernels))
width = 0.35

axes[1, 1].bar(x_pos - width/2, train_mses, width, label='Train MSE', color='lightblue', alpha=0.7)
axes[1, 1].bar(x_pos + width/2, test_mses, width, label='Test MSE', color='darkblue', alpha=0.7)

axes[1, 1].set_xlabel('Тип ядра')
axes[1, 1].set_ylabel('MSE')
axes[1, 1].set_title('Сравнение качества различных ядер')
axes[1, 1].set_xticks(x_pos)
axes[1, 1].set_xticklabels(kernels)
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)

for i, (train_mse_val, test_mse_val) in enumerate(zip(train_mses, test_mses)):
    axes[1, 1].text(i - width/2, train_mse_val + 0.1, f'{train_mse_val:.1f}', ha='center', va='bottom', fontsize=8)
    axes[1, 1].text(i + width/2, test_mse_val + 0.1, f'{test_mse_val:.1f}', ha='center', va='bottom', fontsize=8)

plt.tight_layout()
plt.show()

# Вывод результатов оптимизации
print("Детальный анализ SVR моделей:")
print("=" * 60)
for kernel, result in svr_results.items():
    print(f"\n{kernel.upper()} ядро:")
    print(f"  CV Score: {result['cv_score']:.4f}")
    print(f"  Лучшие параметры: {result['best_params']}")

Сравнение предсказаний SVR с разными ядрами, анализ предсказаний RBF-модели на обучающей и тестовой выборках, диаграмма остатков для RBF-модели и сравнительная гистограмма ошибок (MSE) для всех ядер

Рис. 5: Сравнение предсказаний SVR с разными ядрами, анализ предсказаний RBF-модели на обучающей и тестовой выборках, диаграмма остатков для RBF-модели и сравнительная гистограмма ошибок (MSE) для всех ядер

Оптимизация параметров для ядра linear...
  Лучший CV score: -4.7097
  Лучшие параметры: {'svr__C': 0.1, 'svr__epsilon': 0.01}
Оптимизация параметров для ядра rbf...
  Лучший CV score: -3.4647
  Лучшие параметры: {'svr__C': 1, 'svr__epsilon': 0.2, 'svr__gamma': 0.01}
Оптимизация параметров для ядра poly...
  Лучший CV score: -12.9413
  Лучшие параметры: {'svr__C': 0.1, 'svr__coef0': 2, 'svr__degree': 2, 'svr__epsilon': 0.01}

Детальный анализ SVR моделей:
============================================================

LINEAR ядро:
  CV Score: -4.7097
  Лучшие параметры: {'svr__C': 0.1, 'svr__epsilon': 0.01}

RBF ядро:
  CV Score: -3.4647
  Лучшие параметры: {'svr__C': 1, 'svr__epsilon': 0.2, 'svr__gamma': 0.01}

POLY ядро:
  CV Score: -12.9413
  Лучшие параметры: {'svr__C': 0.1, 'svr__coef0': 2, 'svr__degree': 2, 'svr__epsilon': 0.01}

Применение SVR к аппроксимации временных рядов демонстрирует несколько важных особенностей:

  1. Линейное ядро показывает хорошие результаты на данных с простой структурой и обеспечивает интерпретируемость через анализ весов признаков;
  2. RBF ядро эффективно для моделирования сложных нелинейных зависимостей, но требует тщательной настройки параметров гамма и C;
  3. Полиномиальное ядро может хорошо работать на данных с полиномиальными трендами, но склонно к переобучению при высоких степенях.
Читайте также:  Библиотека sktime для анализа временных рядов

Ключевым преимуществом SVR является его способность автоматически определять сложность модели через количество опорных векторов. Меньшее количество опорных векторов указывает на более простую и обобщаемую модель, что особенно важно для прогнозирования временных рядов.

Анализ остатков показывает, насколько хорошо модель захватывает структуру данных. Случайное распределение остатков указывает на адекватность модели, в то время как систематические паттерны в остатках свидетельствуют о недостаточной сложности модели или необходимости дополнительных признаков.

Модели пространства состояний (State Space Models)

State Space Models (модели пространства состояний) представляют собой мощный класс методов для моделирования временных рядов, которые явно разделяют наблюдаемые данные и скрытые состояния системы. Этот подход особенно эффективен для финансовых временных рядов, где многие важные факторы (например, рыночные настроения, ликвидность, структурные сдвиги) не наблюдаются напрямую, но влияют на динамику цен.

Основная идея состоит в том, что наблюдаемый временной ряд генерируется скрытым процессом состояний, который эволюционирует согласно определенным правилам. Такой подход позволяет моделировать сложные паттерны, такие как изменяющиеся во времени тренды, волатильность и структурные сдвиги.

import numpy as np
import matplotlib.pyplot as plt
from scipy import optimize
from scipy.linalg import solve
from sklearn.metrics import mean_squared_error
import warnings
warnings.filterwarnings('ignore')

class StateSpaceModel:
    """
    Базовый класс для моделей пространства состояний
    """
    
    def __init__(self, n_states=2):
        self.n_states = n_states
        self.states = None
        self.observations = None
        self.transition_matrix = None
        self.observation_matrix = None
        self.process_noise_cov = None
        self.observation_noise_var = None
        
    def kalman_filter(self, y, A, H, Q, R, x0, P0):
        """
        Реализация фильтра Калмана
        """
        n_obs = len(y)
        n_states = len(x0)
        
        # Массивы для хранения результатов
        x_pred = np.zeros((n_obs, n_states))  # Предсказанные состояния
        x_filt = np.zeros((n_obs, n_states))  # Отфильтрованные состояния
        P_pred = np.zeros((n_obs, n_states, n_states))  # Ковариации предсказания
        P_filt = np.zeros((n_obs, n_states, n_states))  # Ковариации фильтрации
        log_likelihood = 0.0
        
        # Инициализация
        x_curr = x0.copy()
        P_curr = P0.copy()
        
        for t in range(n_obs):
            # Этап предсказания
            if t == 0:
                x_pred[t] = x_curr
                P_pred[t] = P_curr
            else:
                x_pred[t] = A @ x_filt[t-1]
                P_pred[t] = A @ P_filt[t-1] @ A.T + Q
            
            # Этап обновления
            innovation = y[t] - H @ x_pred[t]
            innovation_cov = H @ P_pred[t] @ H.T + R
            
            # Калмановский коэффициент усиления
            K = P_pred[t] @ H.T / innovation_cov  # shape: (2, 1)

            # Обновленные оценки
            x_filt[t] = x_pred[t] + (K.flatten() * innovation) 
            P_filt[t] = P_pred[t] - K @ H @ P_pred[t]
            
            # Log-likelihood
            log_likelihood += -0.5 * (np.log(2 * np.pi * innovation_cov) + 
                                     innovation**2 / innovation_cov)
        
        return x_filt, P_filt, log_likelihood
    
    def kalman_smoother(self, x_filt, P_filt, A, Q):
        """
        Сглаживание Калмана (назад по времени)
        """
        n_obs, n_states = x_filt.shape
        x_smooth = np.zeros_like(x_filt)
        P_smooth = np.zeros_like(P_filt)
        
        # Инициализация с последнего момента времени
        x_smooth[-1] = x_filt[-1]
        P_smooth[-1] = P_filt[-1]
        
        # Обратный проход
        for t in range(n_obs - 2, -1, -1):
            # Предсказание на следующий шаг
            x_pred_next = A @ x_filt[t]
            P_pred_next = A @ P_filt[t] @ A.T + Q
            
            # Коэффициент сглаживания
            C = P_filt[t] @ A.T @ np.linalg.inv(P_pred_next)
            
            # Сглаженные оценки
            x_smooth[t] = x_filt[t] + C @ (x_smooth[t+1] - x_pred_next)
            P_smooth[t] = P_filt[t] + C @ (P_smooth[t+1] - P_pred_next) @ C.T
        
        return x_smooth, P_smooth

class LocalTrendModel(StateSpaceModel):
    """
    Модель локального тренда (случайное блуждание уровня и наклона)
    """
    
    def __init__(self):
        super().__init__(n_states=2)
    
    def _fit_mle(self, y):
        """
        MLE оценка для модели локального тренда
        """
        def negative_log_likelihood(params):
            if len(params) != 3 or any(p <= 0 for p in params):
                return 1e10
            
            sigma_level, sigma_slope, sigma_obs = params
            
            # Матрицы системы
            A = np.array([[1.0, 1.0],
                         [0.0, 1.0]])
            H = np.array([[1.0, 0.0]])
            Q = np.array([[sigma_level**2, 0.0],
                         [0.0, sigma_slope**2]])
            R = sigma_obs**2
            
            # Начальные условия
            x0 = np.array([y[0], 0.0])  # Уровень и наклон
            P0 = np.eye(2) * 1e6
            
            try:
                _, _, log_lik = self.kalman_filter(y, A, H, Q, R, x0, P0)
                return -log_lik
            except:
                return 1e10
        
        # Оптимизация
        y_diff = np.diff(y)
        initial_params = [np.std(y_diff), np.std(np.diff(y_diff)), np.std(y)]
        bounds = [(1e-6, 10*np.std(y)) for _ in range(3)]
        
        result = optimize.minimize(negative_log_likelihood, initial_params,
                                 bounds=bounds, method='L-BFGS-B')
        
        if result.success:
            sigma_level, sigma_slope, sigma_obs = result.x
            
            # Сохраняем параметры
            A = np.array([[1.0, 1.0], [0.0, 1.0]])
            H = np.array([[1.0, 0.0]])
            Q = np.array([[sigma_level**2, 0.0], [0.0, sigma_slope**2]])
            R = sigma_obs**2
            
            x0 = np.array([y[0], 0.0])
            P0 = np.eye(2) * 1e6
            
            x_filt, P_filt, log_lik = self.kalman_filter(y, A, H, Q, R, x0, P0)
            x_smooth, P_smooth = self.kalman_smoother(x_filt, P_filt, A, Q)
            
            self.states = x_smooth
            self.states_variance = P_smooth
            
            return {
                'sigma_level': sigma_level,
                'sigma_slope': sigma_slope,
                'sigma_obs': sigma_obs,
                'log_likelihood': log_lik,
                'aic': 2 * 3 - 2 * log_lik,
                'states': x_smooth,
                'states_variance': P_smooth
            }
        else:
            raise RuntimeError("Оптимизация не сошлась")
    
    def fit(self, y):
        return self._fit_mle(y)

# Генерируем тестовые данные с изменяющимся трендом
def generate_time_varying_trend_data(n_points=200):
    """
    Генерирует данные с изменяющимся во времени трендом
    """
    np.random.seed(42)
    
    # Истинные скрытые состояния
    level_noise = np.random.normal(0, 0.5, n_points)
    slope_noise = np.random.normal(0, 0.1, n_points)
    
    true_level = np.zeros(n_points)
    true_slope = np.zeros(n_points)
    
    # Начальные условия
    true_level[0] = 100
    true_slope[0] = 0.1
    
    # Эволюция состояний
    for t in range(1, n_points):
        true_level[t] = true_level[t-1] + true_slope[t-1] + level_noise[t]
        true_slope[t] = true_slope[t-1] + slope_noise[t]
    
    # Наблюдения
    observation_noise = np.random.normal(0, 1.0, n_points)
    observed_series = true_level + observation_noise
    
    return observed_series, true_level, true_slope

# Тестируем модель пространства состояний
observed_data, true_level, true_slope = generate_time_varying_trend_data(150)
time_points = np.arange(len(observed_data))

# Разделяем на обучающую и тестовую выборки
split_point = int(0.8 * len(observed_data))
train_data = observed_data[:split_point]
test_data = observed_data[split_point:]

# Обучаем модель локального тренда
print("Обучение модели пространства состояний...")
local_trend_model = LocalTrendModel()
lt_results = local_trend_model.fit(train_data)

print(f"Модель локального тренда:")
print(f"  Sigma_level: {lt_results['sigma_level']:.4f}")
print(f"  Sigma_slope: {lt_results['sigma_slope']:.4f}")
print(f"  Sigma_obs: {lt_results['sigma_obs']:.4f}")
print(f"  Log-likelihood: {float(lt_results['log_likelihood']):.2f}")
print(f"  AIC: {float(lt_results['aic']):.2f}")

# Прогнозирование за пределы обучающей выборки
def forecast_state_space(model_results, n_ahead=10):
    """
    Прогнозирование с использованием модели локального тренда
    """
    A = np.array([[1.0, 1.0], [0.0, 1.0]])
    Q = np.array([[model_results['sigma_level']**2, 0.0], 
                 [0.0, model_results['sigma_slope']**2]])
    
    last_state = model_results['states'][-1:, :]
    last_var = model_results['states_variance'][-1:, :, :]
    
    # Прогнозирование
    forecasts = []
    forecast_vars = []
    
    current_state = last_state[0]
    current_var = last_var[0]
    
    for step in range(n_ahead):
        # Прогноз состояния
        next_state = A @ current_state
        next_var = A @ current_var @ A.T + Q
        
        # Прогноз наблюдения (только уровень)
        forecast = next_state[0]
        forecast_var = next_var[0, 0] + model_results['sigma_obs']**2
        
        forecasts.append(forecast)
        forecast_vars.append(forecast_var)
        
        # Обновляем для следующей итерации
        current_state = next_state
        current_var = next_var
    
    return np.array(forecasts), np.array(forecast_vars)

# Делаем прогнозы
n_forecast = len(test_data)
lt_forecasts, lt_forecast_vars = forecast_state_space(lt_results, n_forecast)
test_mse = mean_squared_error(test_data, lt_forecasts)

# Визуализация результатов
fig, axes = plt.subplots(2, 2, figsize=(16, 12))

# График 1: Модель и доверительные интервалы
level_states = lt_results['states'][:, 0]
level_std = np.sqrt(lt_results['states_variance'][:, 0, 0])

axes[0, 0].plot(time_points[:split_point], train_data, 'ko', markersize=2, alpha=0.5, 
                label='Наблюдения')
axes[0, 0].plot(time_points[:split_point], level_states, 'g-', linewidth=2, 
                label='Оценка уровня')
axes[0, 0].fill_between(time_points[:split_point], 
                        level_states - 1.96*level_std, 
                        level_states + 1.96*level_std, 
                        alpha=0.2, color='green', label='95% доверительный интервал')
axes[0, 0].plot(time_points, true_level, 'k--', linewidth=2, alpha=0.7, 
                label='Истинный уровень')
axes[0, 0].set_title('Модель локального тренда с доверительными интервалами')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# График 2: Эволюция наклона
slope_states = lt_results['states'][:, 1]
slope_std = np.sqrt(lt_results['states_variance'][:, 1, 1])

axes[0, 1].plot(time_points[:split_point], slope_states, 'g-', linewidth=2, 
                label='Оценка наклона')
axes[0, 1].fill_between(time_points[:split_point], 
                        slope_states - 1.96*slope_std, 
                        slope_states + 1.96*slope_std, 
                        alpha=0.2, color='green')
axes[0, 1].plot(time_points, true_slope, 'k--', linewidth=2, alpha=0.7, 
                label='Истинный наклон')
axes[0, 1].set_title('Эволюция наклона во времени')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
axes[0, 1].set_xlabel('Время')
axes[0, 1].set_ylabel('Наклон')

# График 3: Прогнозирование
axes[1, 0].plot(time_points[:split_point], train_data, 'ko', markersize=3, alpha=0.6, 
                label='Обучающие данные')
axes[1, 0].plot(time_points[split_point:], test_data, 'ro', markersize=3, alpha=0.6, 
                label='Тестовые данные')
axes[1, 0].plot(time_points[:split_point], level_states, 'g-', linewidth=2, 
                label='Модель на обучении')

forecast_time = time_points[split_point:]
axes[1, 0].plot(forecast_time, lt_forecasts, 'g--', linewidth=2, 
                label=f'Прогноз (MSE={test_mse:.2f})')

lt_forecast_std = np.sqrt(lt_forecast_vars)
axes[1, 0].fill_between(forecast_time, 
                        lt_forecasts - 1.96*lt_forecast_std, 
                        lt_forecasts + 1.96*lt_forecast_std, 
                        alpha=0.2, color='green')

axes[1, 0].axvline(x=split_point, color='gray', linestyle='--', alpha=0.7, 
                   label='Граница прогнозирования')
axes[1, 0].set_title('Прогнозирование с моделью пространства состояний')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# График 4: Анализ остатков
residuals = train_data - level_states
axes[1, 1].hist(residuals, bins=20, alpha=0.7, color='green', density=True)
axes[1, 1].axvline(x=0, color='red', linestyle='--', alpha=0.7)
axes[1, 1].set_title(f'Распределение остатков (σ={np.std(residuals):.2f})')
axes[1, 1].set_xlabel('Остатки')
axes[1, 1].set_ylabel('Плотность')
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\nКачество прогнозирования: MSE = {test_mse:.4f}")
print(f"Среднее отклонение остатков: {np.std(residuals):.4f}")

Модель локального тренда с доверительными интервалами, эволюцией наклона во времени и прогнозными значениями

Рис. 6: Модель локального тренда с доверительными интервалами, эволюцией наклона во времени и прогнозными значениями

Обучение модели пространства состояний...
Модель локального тренда:
  Sigma_level: 0.5854
  Sigma_slope: 0.0833
  Sigma_obs: 0.8403
  Log-likelihood: -212.01
  AIC: 430.03

Качество прогнозирования: MSE = 51.6832
Среднее отклонение остатков: 0.6831

Модели пространства состояний демонстрируют выдающуюся способность к адаптивному моделированию временных рядов с изменяющимися характеристиками. Ключевое преимущество этого подхода заключается в явном разделении наблюдаемого процесса и скрытой динамики, что позволяет моделировать сложные паттерны, такие как изменяющиеся во времени тренды и структурные сдвиги.

Фильтр Калмана обеспечивает оптимальные (в смысле минимума среднеквадратической ошибки) оценки скрытых состояний при условии, что модель корректно специфицирована. Алгоритм сглаживания дополнительно улучшает оценки, используя всю доступную информацию, включая будущие наблюдения.

Важной особенностью моделей пространства состояний является их способность предоставлять не только точечные прогнозы, но и оценки неопределенности. Доверительные интервалы прогнозов естественным образом расширяются с увеличением горизонта прогнозирования, отражая нарастающую неопределенность.

Преобразование Фурье и спектральный анализ

Преобразование Фурье представляет собой классический инструмент для анализа частотного состава временных рядов. В контексте финансовых данных, спектральный анализ позволяет выявить периодические компоненты, такие как сезонные паттерны, деловые циклы и другие регулярные колебания, которые могут быть использованы для построения аппроксимирующих моделей.

Основная идея спектрального подхода к аппроксимации заключается в представлении временного ряда как суммы синусоидальных компонент различных частот. Это позволяет не только выделить доминирующие частоты в данных, но и построить аппроксимацию путем отбора наиболее значимых частотных компонент.

import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
from scipy.fft import fft, fftfreq
from sklearn.linear_model import LinearRegression

def spectral_approximation_analysis(time_series, time_points, n_components=None):
    n = len(time_series)
    dt = time_points[1] - time_points[0]
    
    window = signal.windows.hann(n)
    windowed_series = time_series * window
    
    fft_values = fft(windowed_series)
    frequencies = fftfreq(n, dt)
    power_spectrum = np.abs(fft_values)**2
    
    positive_freq_mask = frequencies > 0
    positive_freqs = frequencies[positive_freq_mask]
    positive_power = power_spectrum[positive_freq_mask]
    
    sorted_indices = np.argsort(positive_power)[::-1]
    
    if n_components is None:
        threshold = 0.05 * np.max(positive_power)
        significant_indices = sorted_indices[positive_power[sorted_indices] > threshold]
        n_components = min(len(significant_indices), n // 4)
    else:
        significant_indices = sorted_indices[:n_components]
    
    selected_freqs = positive_freqs[significant_indices]
    selected_amplitudes = np.abs(fft_values[positive_freq_mask][significant_indices])
    selected_phases = np.angle(fft_values[positive_freq_mask][significant_indices])
    
    return selected_freqs, selected_amplitudes, selected_phases, power_spectrum, frequencies

def construct_fourier_approximation(time_points, frequencies, amplitudes, phases):
    approximation = np.zeros_like(time_points)
    for freq, amp, phase in zip(frequencies, amplitudes, phases):
        approximation += (2 * amp / len(time_points)) * np.cos(2 * np.pi * freq * time_points + phase)
    return approximation

# Генерация временного ряда
np.random.seed(123)
n_points = 500
time = np.linspace(0, 50, n_points)
dt = time[1] - time[0]

trend = 0.001 * (time - 25)**3 + 0.05 * (time - 25) + 100
component1 = 7 * np.sin(2 * np.pi * 0.1 * time)
component2 = 4 * np.cos(2 * np.pi * 0.05 * time + np.pi / 4)
component3 = 2 * np.sin(2 * np.pi * 0.02 * time)

# Высокочастотный шумовой компонент
high_freq_component = np.zeros_like(time)
for freq in [0.3, 0.45, 0.6, 0.75]:
    phase = np.random.uniform(0, 2 * np.pi)
    amp = np.random.uniform(0.3, 1.2)
    high_freq_component += amp * np.sin(2 * np.pi * freq * time + phase)

# Белый шум
noise = np.random.normal(0, 1.0, n_points)

# Финальный временной ряд
complex_series = trend + component1 + component2 + component3 + high_freq_component + noise

# Спектральный анализ
freqs, amps, phases, power_spec, all_freqs = spectral_approximation_analysis(
    complex_series, time, n_components=20)

# Аппроксимации с разным числом компонент
n_components_list = [3, 5, 8, 15]
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
axes = axes.ravel()

for i, n_comp in enumerate(n_components_list):
    selected_freqs = freqs[:n_comp]
    selected_amps = amps[:n_comp]
    selected_phases = phases[:n_comp]
    
    approximation = construct_fourier_approximation(time, selected_freqs, selected_amps, selected_phases)
    dc_component = np.mean(complex_series)
    approximation += dc_component
    mse = np.mean((complex_series - approximation)**2)
    
    axes[i].plot(time, complex_series, 'k-', alpha=0.6, linewidth=1, label='Исходный ряд')
    axes[i].plot(time, approximation, 'r-', linewidth=2, label=f'{n_comp} компонент')
    axes[i].set_title(f'{n_comp} компонент, MSE: {mse:.2f}')
    axes[i].legend()
    axes[i].grid(True, alpha=0.3)
    axes[i].set_xlabel('Время')
    axes[i].set_ylabel('Значение')

plt.tight_layout()
plt.show()

# Дополнительная визуализация спектра
plt.figure(figsize=(15, 8))

# Спектр мощности
plt.subplot(2, 2, 1)
positive_mask = all_freqs > 0
plt.semilogy(all_freqs[positive_mask], power_spec[positive_mask], 'k-', linewidth=1)
plt.scatter(freqs[:10], amps[:10]**2, color='red', s=50, zorder=5, label='Отобранные компоненты')
plt.xlabel('Частота')
plt.ylabel('Спектральная плотность мощности')
plt.title('Спектр мощности')
plt.legend()
plt.grid(True, alpha=0.3)

# Кумулятивная энергия
plt.subplot(2, 2, 2)
sorted_power = np.sort(power_spec[positive_mask])[::-1]
cumulative_energy = np.cumsum(sorted_power) / np.sum(sorted_power)
plt.plot(range(1, min(101, len(cumulative_energy)+1)), cumulative_energy[:min(100, len(cumulative_energy))], 'k-', linewidth=2)
plt.axhline(y=0.9, color='red', linestyle='--', alpha=0.7, label='90% энергии')
plt.axhline(y=0.95, color='orange', linestyle='--', alpha=0.7, label='95% энергии')
plt.xlabel('Количество компонент')
plt.ylabel('Доля объясненной энергии')
plt.title('Кумулятивная энергия спектра')
plt.legend()
plt.grid(True, alpha=0.3)

# Фазовый спектр
plt.subplot(2, 2, 3)
plt.scatter(freqs[:10], phases[:10], s=amps[:10]*2, c=range(10), cmap='viridis', alpha=0.7)
plt.colorbar(label='Порядок компоненты')
plt.xlabel('Частота')
plt.ylabel('Фаза (радианы)')
plt.title('Фазовый спектр значимых компонент')
plt.grid(True, alpha=0.3)

# Амплитуды компонент
plt.subplot(2, 2, 4)
plt.bar(range(len(freqs[:10])), amps[:10], color='darkgray', alpha=0.7)
plt.xlabel('Номер компоненты')
plt.ylabel('Амплитуда')
plt.title('Амплитуды отобранных компонент')

plt.tight_layout()
plt.show()

Аппроксимация временного ряда разным числом Фурье-компонент

Рис. 7: Аппроксимация временного ряда разным числом Фурье-компонент

Визуализация спектра мощности, кумулятивной энергии спектра, фазового спектра значимых компонент

Рис. 8: Визуализация спектра мощности, кумулятивной энергии спектра, фазового спектра значимых компонент

Спектр мощности — логарифмический график распределения мощности по частотам временного ряда с выделением наиболее значимых частот, которые дают основной вклад в сигнал.

Кумулятивная энергия спектра — показывает, какую долю общей энергии временного ряда объясняет выбранное число гармоник, с отметками 90% и 95% накопленной энергии для оценки адекватного количества компонент.

Фазовый спектр значимых компонент — отображает фазы отобранных гармоник в зависимости от частоты, при этом размер и цвет точек отражают амплитуды и порядок компонент, что помогает понять структуру колебаний.

Спектральный анализ демонстрирует важные особенности частотной декомпозиции временных рядов. Основное преимущество метода заключается в способности автоматически выявлять периодические паттерны в данных без предварительных предположений об их природе. Это особенно ценно при работе с финансовыми данными, где могут присутствовать скрытые циклические компоненты.

Ключевая особенность аппроксимаций Фурье заключается в том, что анализируются только несколько (например, 3, 5, 8, 15) доминирующих частот. По сути, это сжатое, сглаженное представление сигнала, очищенное от шума. Фурье-анализ лучше подходит для периодических или квазипериодических сигналов. Случайный шум, тренды и скачки не выражаются через чистые синусы и косинусы, поэтому аппроксимация не может полностью их повторить.

Читайте также:  Стационарность временных рядов. Как анализировать нестационарные данные?

Практическая значимость аппроксимации Фурье:

  1. Выделение основных циклов и паттернов. Позволяет понять, какие частоты доминируют и влияют на поведение ряда (например, сезонность, тренды с определенной периодичностью);
  2. Сжатие и фильтрация сигнала. Можно убрать шум, оставив только значимые компоненты, улучшая качество анализа и прогнозирования;
  3. Предобработка для моделей. Часто используется как шаг для уменьшения размерности и выделения признаков;
  4. Диагностика. Позволяет увидеть структуру временного ряда в частотной области, что помогает выявить скрытые закономерности.

Анализ кумулятивной энергии спектра показывает, сколько компонент необходимо для объяснения заданной доли вариации в данных. В большинстве практических случаев 90-95% энергии сигнала содержится в относительно небольшом количестве низкочастотных компонент, что позволяет строить эффективные аппроксимации с существенным сжатием данных.

Важным ограничением классического преобразования Фурье является предположение о стационарности анализируемого процесса. Для нестационарных финансовых рядов более подходящими могут быть методы кратковременного преобразования Фурье (STFT) или адаптивные спектральные методы.

Вейвлет-анализ в аппроксимации временных рядов

Вейвлет-анализ (Wavelet Analysis) представляет собой мощный инструмент для многомасштабной декомпозиции временных рядов, который особенно эффективен при работе с нестационарными данными. В отличие от традиционного преобразования Фурье, которое предоставляет информацию о частотном составе сигнала в целом, вейвлет-преобразование позволяет анализировать как частотные, так и временные характеристики локально.

Основная идея вейвлет-анализа заключается в разложении исходного сигнала на компоненты различных масштабов с использованием семейства функций, полученных из базового вейвлета путем сдвигов и масштабирования. Это делает метод идеально подходящим для анализа финансовых временных рядов, которые часто содержат паттерны на различных временных горизонтах.

import pywt
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal

def generate_complex_financial_series(n_points=560, dt=1.0):

    np.random.seed(42)
    t = np.arange(n_points) * dt

    segment = n_points // 3
    trend = np.zeros(n_points)
    trend[:segment] = 0.04 * np.arange(segment)                          
    trend[segment:2*segment] = trend[segment - 1]                       
    trend[2*segment:] = trend[2*segment - 1] - 0.04 * np.arange(1, n_points - 2*segment + 1)

    seasonal = 3 * np.sin(2 * np.pi * t / 250) + 1.2 * np.cos(2 * np.pi * t / 120)
    medium_cycles = 1.5 * np.sin(2 * np.pi * t / 400) + 0.8 * np.cos(2 * np.pi * t / 600)

    short_fluctuations = np.zeros(n_points)
    jumps = np.random.choice([0, 4, -4], size=n_points, p=[0.96, 0.02, 0.02])
    short_fluctuations[0] = np.random.normal(0, 0.5)
    for i in range(1, n_points):
        short_fluctuations[i] = 0.5 * short_fluctuations[i-1] + np.random.normal(0, 0.6) + jumps[i]

    volatility = np.ones(n_points)
    for i in range(1, n_points):
        volatility[i] = 0.07 + 0.7 * volatility[i-1] + 0.03 * short_fluctuations[i-1]**2

    noise = np.random.normal(0, 1, n_points) * np.sqrt(volatility)

    structural_breaks = np.zeros(n_points)
    break_points = [int(0.45 * n_points), int(0.65 * n_points)]
    break_magnitudes = [4, -3]
    for bp, mag in zip(break_points, break_magnitudes):
        structural_breaks[bp:] += mag

    series = 100 + trend + seasonal + medium_cycles + short_fluctuations + noise + structural_breaks
    return t, series

# Генерация временного ряда
time_points, financial_series = generate_complex_financial_series(560)

# Список вейвлетов для анализа
wavelets = ['db4', 'db8', 'coif4', 'bior4.4']
fig, axes = plt.subplots(len(wavelets), 2, figsize=(16, 12 * 1.3))

for i, wavelet_name in enumerate(wavelets):
    # Определяем максимально возможный уровень декомпозиции
    max_level = pywt.dwt_max_level(len(financial_series), wavelet_name)
    decomp_level = min(max_level, 6)
    
    # Прямое вейвлет-преобразование
    coeffs = pywt.wavedec(financial_series, wavelet_name, level=decomp_level)
    
    # Оценка уровня шума по самому детальному уровню (наименьший масштаб)
    sigma_est = np.median(np.abs(coeffs[-1])) / 0.6745
    
    # Вычисляем базовый порог по формуле Донхо (universal threshold)
    base_threshold = sigma_est * np.sqrt(2 * np.log(len(financial_series)))
    
    # Усиливаем порог для более агрессивного подавления шума
    threshold = base_threshold * 1.8 * 1.2

    # Применение soft thresholding ко всем детальным уровням
    coeffs_thresh = coeffs.copy()
    for j in range(1, len(coeffs)):
        coeffs_thresh[j] = pywt.threshold(coeffs[j], threshold * (2 ** (-j / 2)), 'soft')
    
    # Реконструкция сигнала после подавления шума
    reconstructed = pywt.waverec(coeffs_thresh, wavelet_name)
    
    # Оставляем только низкочастотные компоненты (тренд) — убираем детали
    coeffs_approx = coeffs.copy()
    for j in range(3, len(coeffs)):  # убираем детали от 3 уровня и выше
        coeffs_approx[j] = np.zeros_like(coeffs[j])
    approximation = pywt.waverec(coeffs_approx, wavelet_name)

    # Визуализация исходного сигнала и сигнала после подавления шума
    axes[i, 0].plot(time_points, financial_series, color='#666666', alpha=0.9, linewidth=1, label='Исходный ряд')
    axes[i, 0].plot(time_points[:len(reconstructed)], reconstructed, color='#CC6666', linewidth=2,
                    label='После шумоподавления')
    axes[i, 0].set_title(f'Вейвлет {wavelet_name}: шумоподавление')
    axes[i, 0].legend()
    axes[i, 0].grid(True, alpha=0.3)
    
    # Визуализация аппроксимации тренда (низкочастотная часть)
    axes[i, 1].plot(time_points, financial_series, color='#666666', alpha=0.9, linewidth=1, label='Исходный ряд')
    axes[i, 1].plot(time_points[:len(approximation)], approximation, color='#336699', linewidth=2,
                    label='Низкочастотная аппроксимация')
    axes[i, 1].set_title(f'Вейвлет {wavelet_name}: аппроксимация трендов')
    axes[i, 1].legend()
    axes[i, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Анализ распределения энергии по уровням для одного из вейвлетов
coeffs_analysis = pywt.wavedec(financial_series, 'db8', level=6)
energies = [np.sum(c**2) for c in coeffs_analysis]
total_energy = sum(energies)
energy_percentages = [e / total_energy * 100 for e in energies]

print("Распределение энергии по уровням декомпозиции (вейвлет db8):")
print(f"Аппроксимация (уровень 6): {energy_percentages[0]:.3f}%")
for i in range(1, len(energy_percentages)):
    print(f"Детали уровня {7 - i}: {energy_percentages[i]:.3f}%")

Графики показывают, как различные вейвлеты эффективно фильтруют шум и извлекают тренды из сложного финансового временного ряда, визуализируя как сглаженный сигнал после шумоподавления, так и его низкочастотную аппроксимацию

Рис. 9: Графики показывают, как различные вейвлеты эффективно фильтруют шум и извлекают тренды из сложного финансового временного ряда, визуализируя как сглаженный сигнал после шумоподавления, так и его низкочастотную аппроксимацию

Распределение энергии по уровням декомпозиции (вейвлет db8):
Аппроксимация (уровень 6): 99.988%
Детали уровня 6: 0.003%
Детали уровня 5: 0.001%
Детали уровня 4: 0.002%
Детали уровня 3: 0.002%
Детали уровня 2: 0.002%
Детали уровня 1: 0.002%

Данный пример демонстрирует мощь вейвлет-анализа в разложении сложного финансового временного ряда на компоненты различных масштабов. Лично я использую этот метод чаще других для удаления рыночного шума из графиков финансовых временных рядов.

Особенно важным является то, что различные вейвлеты показывают разную эффективность в зависимости от характеристик данных. Вейвлеты семейства Добеши (db4, db8) хорошо подходят для анализа данных с резкими переходами, в то время как биортогональные вейвлеты (bior4.4) лучше сохраняют симметрию сигнала.

Анализ распределения энергии показывает, какая часть информации содержится на каждом уровне декомпозиции. В типичных финансовых рядах большая часть энергии (60-80%) содержится в низкочастотных компонентах, что соответствует долгосрочным трендам и основным структурным изменениям. Высокочастотные компоненты содержат преимущественно шум и краткосрочные флуктуации.

Практические аспекты Wavelet-аппроксимации

Ключевое преимущество вейвлет-аппроксимации заключается в возможности адаптивного контроля гладкости результирующей кривой. Убирая детальные коэффициенты высших уровней, мы получаем сглаженную аппроксимацию, которая сохраняет основные структурные особенности данных, но устраняет шум и случайные флуктуации.

def multiresolution_wavelet_approximation(time_series, wavelet='db8', levels_to_keep=None):
    """
    Многоразрешающая вейвлет-аппроксимация с селективным восстановлением
    """
    # Выполняем полную декомпозицию
    max_level = pywt.dwt_max_level(len(time_series), wavelet)
    coeffs = pywt.wavedec(time_series, wavelet, level=max_level)
    
    if levels_to_keep is None:
        # Автоматический выбор уровней на основе энергетического критерия
        energies = [np.sum(c**2) for c in coeffs]
        total_energy = sum(energies)
        energy_ratios = [e/total_energy for e in energies]
        
        # Сохраняем уровни с энергией > 5% от общей
        levels_to_keep = [i for i, ratio in enumerate(energy_ratios) if ratio > 0.05]
    
    # Создаем различные аппроксимации
    approximations = {}
    
    # 1. Только аппроксимирующие коэффициенты
    coeffs_approx_only = [coeffs[0]] + [np.zeros_like(c) for c in coeffs[1:]]
    approximations['trend_only'] = pywt.waverec(coeffs_approx_only, wavelet)
    
    # 2. Аппроксимация + крупномасштабные детали
    coeffs_low_freq = coeffs.copy()
    for i in range(max_level//2, len(coeffs)):  # Убираем мелкомасштабные детали
        coeffs_low_freq[i] = np.zeros_like(coeffs[i])
    approximations['low_frequency'] = pywt.waverec(coeffs_low_freq, wavelet)
    
    # 3. Селективная реконструкция по энергетическому критерию
    coeffs_selective = [np.zeros_like(c) for c in coeffs]
    for level in levels_to_keep:
        coeffs_selective[level] = coeffs[level]
    approximations['energy_based'] = pywt.waverec(coeffs_selective, wavelet)
    
    # 4. Адаптивное пороговое значение
    coeffs_adaptive = coeffs.copy()
    for i in range(1, len(coeffs)):
        # Вычисляем порог для каждого уровня
        sigma_level = np.std(coeffs[i])
        threshold = sigma_level * np.sqrt(2 * np.log(len(coeffs[i])))
        coeffs_adaptive[i] = pywt.threshold(coeffs[i], threshold, 'soft')
    approximations['adaptive_threshold'] = pywt.waverec(coeffs_adaptive, wavelet)
    
    return approximations, coeffs, levels_to_keep

# Применяем различные стратегии вейвлет-аппроксимации
approximations, original_coeffs, kept_levels = multiresolution_wavelet_approximation(
    financial_series, 'db8')

# Визуализация результатов
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
axes = axes.ravel()

methods = ['trend_only', 'low_frequency', 'energy_based', 'adaptive_threshold']
titles = ['Только тренд', 'Низкочастотная', 'По энергии', 'Адаптивная']

for i, (method, title) in enumerate(zip(methods, titles)):
    approx = approximations[method]
    mse = np.mean((financial_series[:len(approx)] - approx)**2)
    
    axes[i].plot(time_points, financial_series, 'k-', alpha=0.6, linewidth=1, 
                label='Исходный ряд')
    axes[i].plot(time_points[:len(approx)], approx, 'r-', linewidth=2, 
                label=f'{title} (MSE: {mse:.2f})')
    axes[i].set_title(f'Вейвлет-аппроксимация: {title}')
    axes[i].legend()
    axes[i].grid(True, alpha=0.3)
    axes[i].set_xlabel('Время')
    axes[i].set_ylabel('Значение')

plt.tight_layout()
plt.show()

# Анализ компрессии данных
print("Анализ сжатия данных:")
print("=" * 40)

original_size = len(financial_series)
for method in methods:
    # Подсчитываем количество ненулевых коэффициентов
    if method == 'trend_only':
        non_zero_coeffs = len(original_coeffs[0])
    elif method == 'low_frequency':
        non_zero_coeffs = sum(len(original_coeffs[i]) for i in range(len(original_coeffs)//2))
    elif method == 'energy_based':
        non_zero_coeffs = sum(len(original_coeffs[i]) for i in kept_levels)
    else:  # adaptive_threshold
        coeffs_adaptive = original_coeffs.copy()
        non_zero_coeffs = 0
        for i in range(len(original_coeffs)):
            if i == 0:
                non_zero_coeffs += len(original_coeffs[i])
            else:
                sigma_level = np.std(original_coeffs[i])
                threshold = sigma_level * np.sqrt(2 * np.log(len(original_coeffs[i])))
                thresholded = pywt.threshold(original_coeffs[i], threshold, 'soft')
                non_zero_coeffs += np.count_nonzero(thresholded)
    
    compression_ratio = (1 - non_zero_coeffs / original_size) * 100
    mse = np.mean((financial_series[:len(approximations[method])] - approximations[method])**2)
    
    print(f"{titles[methods.index(method)]:15}: "
          f"Сжатие {compression_ratio:.1f}%, MSE {mse:.3f}")

print(f"\nСохраненные энергетические уровни: {kept_levels}")

Разные стратегии вейвлет-аппроксимации (только тренд, низкочастотная фильтрация, селективная по энергии и адаптивное пороговое сглаживание) по-разному извлекают ключевые структуры и подавляют шум в сложном временном ряду

Рис. 10: Разные стратегии вейвлет-аппроксимации (только тренд, низкочастотная фильтрация, селективная по энергии и адаптивное пороговое сглаживание) по-разному извлекают ключевые структуры и подавляют шум в сложном временном ряду

Анализ сжатия данных:
========================================
Только тренд   : Сжатие 94.3%, MSE 1.978
Низкочастотная : Сжатие 79.8%, MSE 1.709
По энергии     : Сжатие 94.3%, MSE 1.978
Адаптивная     : Сжатие 93.4%, MSE 1.938

Сохраненные энергетические уровни: [0]

Приведенный код демонстрирует гибкость вейвлет-методов в создании аппроксимаций различной степени детализации:

  • Тренд-аппроксимация извлекает только основную долгосрочную компоненту, что полезно для выявления общего направления движения;
  • Низкочастотная аппроксимация сохраняет среднесрочные циклы, важные для понимания деловых циклов и сезонности;
  • Энергетический критерий автоматически выбирает наиболее значимые компоненты, обеспечивая оптимальный баланс между точностью и сжатием данных;
  • Адаптивное пороговое значение эффективно подавляет шум, сохраняя при этом важные структурные особенности сигнала.

Важным практическим аспектом является выбор подходящего вейвлета. Для финансовых данных с резкими изменениями (например, рыночные крахи) хорошо подходят вейвлеты Добеши высших порядков. Для более гладких трендов эффективны биортогональные вейвлеты или вейвлеты Койфлета.

Что лучше выбрать?

В этой статье мы рассмотрели множество методов аппроксимации временных рядов и можем сделать выводы об их применимости в практических задачах финансового анализа. Внимательный читатель заметит, что мы не рассматривали такие популярные методы как ARMA, ARIMA, экспоненциальное сглаживание и прочие. Почему? Лично я считаю их неэффективными для анализа биржевых временных рядов.

Эти классические эконометрические модели основаны на фундаментальных предпосылках, которые не соответствуют микроструктуре современных финансовых рынков. ARIMA и подобные методы предполагают стационарность остатков, линейность зависимостей и гомоскедастичность.

Критическая проблема заключается в их неспособности эффективно разделять сигнал от шума в условиях высокой рыночной волатильности. Классические модели демонстрируют избыточную чувствительность к краткосрочным флуктуациям, интерпретируя случайные движения как значимые паттерны, что приводит к генерации ложных торговых сигналов. Одновременно они страдают от инерционности при идентификации истинных структурных изменений в данных.

Линейная природа ARIMA моделей обуславливает их запаздывающий характер — они способны реагировать на уже состоявшиеся изменения тренда, но не могут адаптироваться к формирующимся режимным переключениям в реальном времени. Экспоненциальное сглаживание, несмотря на свою адаптивность, демонстрирует аналогичную проблему: оптимальные параметры сглаживания для фильтрации шума оказываются слишком консервативными для своевременного обнаружения трендов, и наоборот.

Рассмотренные в данной статье методы предлагают принципиально иные решения этих фундаментальных проблем:

  • Вейвлет-анализ обеспечивает многомасштабную декомпозицию, позволяя одновременно фильтровать высокочастотный шум и сохранять чувствительность к структурным изменениям на различных временных горизонтах;
  • Сплайны с адаптивным выбором узлов демонстрируют локальную гибкость, концентрируя вычислительные ресурсы на участках с высокой кривизной — именно там, где происходят значимые изменения тренда;
  • State Space Models через механизм фильтра Калмана обеспечивают оптимальное разделение сигнала и шума в режиме реального времени, автоматически адаптируя свои параметры к изменяющейся волатильности;
  • Гауссовские процессы предоставляют непараметрическую гибкость в моделировании сложных нелинейных зависимостей, при этом естественным образом квантифицируя неопределенность прогнозов;
  • SVR с робастными ядрами эффективно справляется с выбросами и нелинейностями, характерными для финансовых данных;
  • Адаптивные Фурье-методы способны отслеживать эволюцию частотной структуры рынка во времени.

Каждый из этих подходов решает конкретные ограничения классических методов, предлагая инструментарий, действительно адекватный сложности современных финансовых рынков.

Сравнительная таблица методов аппроксимации временных рядов

Рис. 11: Сравнительная таблица методов аппроксимации временных рядов

Практические рекомендации по выбору методов

  • Для стационарных данных с четкими трендами: Полиномиальная аппроксимация или классическое преобразование Фурье обеспечивают хорошие результаты при минимальных вычислительных затратах;
  • Для нестационарных финансовых рядов: State Space Models и вейвлет-анализ показывают лучшую адаптивность к изменяющимся характеристикам данных. Модели пространства состояний особенно эффективны при наличии скрытых факторов;
  • При высоком уровне шума: SVR с робастными ядрами и вейвлет-методы с адаптивным пороговым значением демонстрируют превосходную устойчивость к выбросам;
  • Когда критична оценка неопределенности: Гауссовские процессы и State Space Models предоставляют надежные доверительные интервалы, что крайне важно для принятия решений в условиях риска;
  • Для задач реального времени: Сплайны и адаптивные спектральные методы обеспечивают оптимальный баланс между качеством и скоростью вычислений.

Выбор метода аппроксимации должен основываться на специфике данных, требованиях к точности и вычислительных ограничениях. В практических приложениях часто оказывается эффективным комбинирование различных подходов: использование вейвлет-анализа для предварительной обработки данных, State Space Models для моделирования основной динамики и гауссовских процессов для оценки неопределенности прогнозов.

Такой комплексный подход позволяет максимально учесть все особенности временного ряда, повысить качество аппроксимации и сделать модели более устойчивыми к шуму и изменчивости данных, что особенно важно в условиях реальной финансовой и технической аналитики.