Продвинутый Python для финансов: декораторы, контекстные менеджеры и метаклассы

Сегодня я хочу поделиться опытом использования продвинутых возможностей Python — декораторов, контекстных менеджеров и метаклассов, которые позволяют надежно и гибко разрабатывать финансовый софт (торговые боты, риск-модели, системы бэктестинга, аналитические платформы) и внедрять в другие части кода. Осторожно: статья длинная, но после ее прочтения вы сможете понимать и проектировать архитектуру приложений, которую будет легко поддерживать, масштабировать и адаптировать под постоянно меняющиеся требования в мире финансов.

Архитектурные принципы продвинутого финансового кода

Метапрограммирование в финансовом анализе

Метапрограммирование — это написание программ, которые создают или модифицируют другие программы. В финансовых приложениях этот подход особенно ценен, поскольку позволяет создавать гибкие системы, способные адаптироваться к меняющимся рыночным условиям без изменения основного кода.

Классический пример — система индикаторов технического анализа. Вместо создания отдельного класса для каждого индикатора, мы можем использовать метаклассы для автоматической генерации классов с нужной функциональностью. Это не только сокращает количество кода, но и обеспечивает единообразие интерфейсов.

import numpy as np
import pandas as pd
from abc import ABC, abstractmethod, ABCMeta
from typing import Dict, Any, Callable
import yfinance as yf

class IndicatorMeta(ABCMeta):
    """
    Метакласс для автоматического создания финансовых индикаторов
    с единообразным интерфейсом и валидацией параметров
    Наследуется от ABCMeta для совместимости с ABC
    """
    def __new__(mcs, name, bases, namespace):
        # Автоматически добавляем валидацию параметров
        if 'calculate' in namespace and callable(namespace['calculate']):
            original_calculate = namespace['calculate']
            
            def validated_calculate(self, data, **kwargs):
                # Проверяем входные данные
                if not isinstance(data, (pd.Series, pd.DataFrame, np.ndarray)):
                    raise TypeError(f"Unsupported data type: {type(data)}")
                
                if len(data) < getattr(self, 'min_periods', 1):
                    raise ValueError(f"Insufficient data: {len(data)} < {self.min_periods}") return original_calculate(self, data, **kwargs) namespace['calculate'] = validated_calculate # Автоматически добавляем кеширование результатов if 'calculate' in namespace: if '_cache' not in namespace: namespace['_cache'] = {} original_calc = namespace['calculate'] def cached_calculate(self, data, **kwargs): # Создаем безопасный ключ кеша try: if hasattr(data, 'values'): data_hash = hash(str(data.values.tobytes())) else: data_hash = hash(str(data.tobytes())) except: # Fallback для случаев, когда не можем создать hash data_hash = hash(str(data)) cache_key = (data_hash, tuple(sorted(kwargs.items()))) if not hasattr(self, '_cache'): self._cache = {} if cache_key not in self._cache: self._cache[cache_key] = original_calc(self, data, **kwargs) return self._cache[cache_key] namespace['calculate'] = cached_calculate return super().__new__(mcs, name, bases, namespace) class BaseIndicator(ABC, metaclass=IndicatorMeta): """Базовый класс для всех финансовых индикаторов""" def __init__(self, name: str, min_periods: int = 1): self.name = name self.min_periods = min_periods self._cache = {} @abstractmethod def calculate(self, data, **kwargs): pass def clear_cache(self): """Очистка кеша вычислений""" self._cache.clear() class AdvancedRSI(BaseIndicator): """ Продвинутая реализация RSI с дополнительными возможностями благодаря метаклассу получает автоматическую валидацию и кеширование """ def __init__(self, period: int = 14, smooth_period: int = 3): super().__init__(f"Advanced_RSI_{period}_{smooth_period}", period) self.period = period self.smooth_period = smooth_period def calculate(self, data, **kwargs): """ Расчет RSI с дополнительным сглаживанием """ if isinstance(data, pd.DataFrame): prices = data['Close'] if 'Close' in data.columns else data.iloc[:, 0] else: prices = data delta = prices.diff() gains = delta.where(delta > 0, 0)
        losses = -delta.where(delta < 0, 0) # Используем экспоненциальное сглаживание вместо простого скользящего среднего avg_gains = gains.ewm(span=self.period, adjust=False).mean() avg_losses = losses.ewm(span=self.period, adjust=False).mean() # Избегаем деления на ноль rs = avg_gains / (avg_losses + 1e-10) rsi = 100 - (100 / (1 + rs)) # Дополнительное сглаживание для уменьшения шума if self.smooth_period > 1:
            rsi = rsi.rolling(window=self.smooth_period).mean()
        
        return rsi.fillna(50)  # Заполняем NaN средним значением RSI

# Дополнительный индикаторы для демонстрации работы метакласса
class MovingAverage(BaseIndicator):
    """Простое скользящее среднее"""
    
    def __init__(self, period: int = 20):
        super().__init__(f"MA_{period}", period)
        self.period = period
    
    def calculate(self, data, **kwargs):
        if isinstance(data, pd.DataFrame):
            prices = data['Close'] if 'Close' in data.columns else data.iloc[:, 0]
        else:
            prices = data
        
        return prices.rolling(window=self.period).mean()

class MACD(BaseIndicator):
    """MACD индикатор"""
    
    def __init__(self, fast_period: int = 12, slow_period: int = 26, signal_period: int = 9):
        super().__init__(f"MACD_{fast_period}_{slow_period}_{signal_period}", slow_period)
        self.fast_period = fast_period
        self.slow_period = slow_period
        self.signal_period = signal_period
    
    def calculate(self, data, **kwargs):
        if isinstance(data, pd.DataFrame):
            prices = data['Close'] if 'Close' in data.columns else data.iloc[:, 0]
        else:
            prices = data
        
        fast_ema = prices.ewm(span=self.fast_period).mean()
        slow_ema = prices.ewm(span=self.slow_period).mean()
        macd_line = fast_ema - slow_ema
        signal_line = macd_line.ewm(span=self.signal_period).mean()
        histogram = macd_line - signal_line
        
        return pd.DataFrame({
            'MACD': macd_line,
            'Signal': signal_line,
            'Histogram': histogram
        })

# Пример использования
if __name__ == "__main__":
    try:
        # Загружаем данные
        ticker = "BTC-USD"
        btc_data = yf.download(ticker, period="1y", interval="1d")
        
        # Проверяем на MultiIndex и приводим к нужному формату
        if isinstance(btc_data.columns, pd.MultiIndex):
            btc_data.columns = btc_data.columns.droplevel(1)
        
        print(f"Загружены данные для {ticker}, форма: {btc_data.shape}")
        print(f"Колонки: {list(btc_data.columns)}")
        
        # Создаем индикаторы
        rsi_indicator = AdvancedRSI(period=21, smooth_period=5)
        ma_indicator = MovingAverage(period=50)
        macd_indicator = MACD()
        
        # Вычисляем индикаторы (автоматически применяется валидация и кеширование)
        rsi_values = rsi_indicator.calculate(btc_data)
        ma_values = ma_indicator.calculate(btc_data)
        macd_values = macd_indicator.calculate(btc_data)
        
        # Выводим результаты
        print(f"\nRSI для {ticker} (последние 5 значений):")
        print(rsi_values.tail().round(2))
        
        print(f"\nMA-50 для {ticker} (последние 5 значений):")
        print(ma_values.tail().round(2))
        
        print(f"\nMACD для {ticker} (последние 5 значений):")
        print(macd_values.tail().round(4))
        
        # Демонстрация кеширования - повторный вызов будет быстрее
        print(f"\nПовторный расчет RSI (из кеша):")
        rsi_cached = rsi_indicator.calculate(btc_data)
        print("Кеш работает!" if rsi_cached.equals(rsi_values) else "Проблема с кешем")
        
    except Exception as e:
        print(f"Ошибка: {e}")
Загружены данные для BTC-USD, форма: (366, 5)
Колонки: ['Close', 'High', 'Low', 'Open', 'Volume']

RSI для BTC-USD (последние 5 значений):
Date
2025-07-21    65.79
2025-07-22    65.31
2025-07-23    64.81
2025-07-24    63.97
2025-07-25    61.68
Name: Close, dtype: float64

MA-50 для BTC-USD (последние 5 значений):
Date
2025-07-21    109275.59
2025-07-22    109557.87
2025-07-23    109824.32
2025-07-24    110097.04
2025-07-25    110386.03
Name: Close, dtype: float64

MACD для BTC-USD (последние 5 значений):
                 MACD     Signal  Histogram
Date                                       
2025-07-21  2921.4533  2950.0083   -28.5550
2025-07-22  2965.8847  2953.1836    12.7011
2025-07-23  2867.9428  2936.1354   -68.1927
2025-07-24  2727.6559  2894.4395  -166.7837
2025-07-25  2399.8164  2795.5149  -395.6985

Повторный расчет RSI (из кеша):
Кеш работает!

Приведенный код демонстрирует мощь метапрограммирования в финансовых приложениях. Метакласс IndicatorMeta автоматически добавляет к каждому индикатору два важных функционала: валидацию входных данных и кеширование результатов. Это означает, что разработчику не нужно помнить о реализации этих аспектов в каждом новом индикаторе — они добавляются автоматически на этапе создания класса.

Валидация данных особенно важна в финансовых приложениях, где некорректные входные данные могут привести к печальным последствиям. Метакласс проверяет тип данных, их достаточность для расчета и автоматически генерирует понятные сообщения об ошибках. Кеширование же позволяет избежать повторных вычислений одних и тех же значений, что особенно актуально при работе с большими историческими датасетами.

Класс AdvancedRSI демонстрирует, как благодаря метаклассу мы можем сосредоточиться исключительно на бизнес-логике индикатора, не отвлекаясь на инфраструктурные задачи.

Проектирование отказоустойчивых финансовых систем

В высокочастотной торговле и управлении рисками отказоустойчивость системы не просто желательна — она абсолютно необходима. Потеря соединения с биржей, сбой в получении данных или ошибка в расчетах могут привести к многомиллионным убыткам. Продвинутые возможности Python позволяют элегантно решать эти проблемы на архитектурном уровне.

import time
import logging
from functools import wraps
from contextlib import contextmanager
from typing import Optional, Any, Callable
import threading
from dataclasses import dataclass
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5
recovery_timeout: int = 60
expected_exception: tuple = (Exception,)
class CircuitBreaker:
"""
Реализация паттерна Circuit Breaker для финансовых API
Предотвращает каскадные сбои при проблемах с внешними сервисами
"""
def __init__(self, config: CircuitBreakerConfig):
self.config = config
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
self._lock = threading.Lock()
# Настройка логирования для мониторинга состояния
self.logger = logging.getLogger(f"CircuitBreaker_{id(self)}")
self.logger.setLevel(logging.INFO)
def call(self, func: Callable, *args, **kwargs) -> Any:
"""Выполнение функции с защитой Circuit Breaker"""
with self._lock:
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
self.logger.info("Circuit breaker переходит в HALF_OPEN состояние")
else:
raise Exception("Circuit breaker OPEN - вызов заблокирован")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except self.config.expected_exception as e:
self._on_failure()
raise e
def _should_attempt_reset(self) -> bool:
"""Проверка возможности попытки восстановления"""
return (time.time() - self.last_failure_time) >= self.config.recovery_timeout
def _on_success(self):
"""Обработка успешного выполнения"""
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
self.logger.info("Circuit breaker восстановлен - переход в CLOSED")
def _on_failure(self):
"""Обработка неудачного выполнения"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.config.failure_threshold:
self.state = CircuitState.OPEN
self.logger.warning(f"Circuit breaker ОТКРЫТ после {self.failure_count} сбоев")
def circuit_breaker(config: CircuitBreakerConfig):
"""
Декоратор для добавления Circuit Breaker к функциям
"""
def decorator(func: Callable):
breaker = CircuitBreaker(config)
@wraps(func)
def wrapper(*args, **kwargs):
return breaker.call(func, *args, **kwargs)
# Добавляем доступ к состоянию breaker'а
wrapper.circuit_breaker = breaker
return wrapper
return decorator
# Конфигурация для работы с внешними API
api_breaker_config = CircuitBreakerConfig(
failure_threshold=3,
recovery_timeout=30,
expected_exception=(ConnectionError, TimeoutError, ValueError)
)
@circuit_breaker(api_breaker_config)
def fetch_market_data(symbol: str, timeframe: str = "1d") -> pd.DataFrame:
"""
Получение рыночных данных с защитой от сбоев
"""
try:
# Имитация потенциально нестабильного API-вызова
data = yf.download(symbol, period="6mo", interval=timeframe, progress=False)
if isinstance(data.columns, pd.MultiIndex):
data.columns = data.columns.droplevel(1)
if data.empty:
raise ValueError(f"Нет данных для символа {symbol}")
return data
except Exception as e:
logging.error(f"Ошибка получения данных для {symbol}: {e}")
raise
# Контекстный менеджер для безопасной работы с ресурсами
@contextmanager
def trading_session(session_name: str, max_memory_mb: int = 1000):
"""
Контекстный менеджер для управления торговой сессией
Обеспечивает корректное освобождение ресурсов и логирование
"""
import psutil
import gc
process = psutil.Process()
start_memory = process.memory_info().rss / 1024 / 1024  # MB
start_time = time.time()
session_logger = logging.getLogger(f"TradingSession_{session_name}")
session_logger.info(f"Начало сессии {session_name}. Память: {start_memory:.2f} MB")
try:
yield session_logger
except Exception as e:
session_logger.error(f"Ошибка в сессии {session_name}: {e}")
raise
finally:
# Принудительная очистка памяти
gc.collect()
end_memory = process.memory_info().rss / 1024 / 1024
session_duration = time.time() - start_time
memory_used = end_memory - start_memory
session_logger.info(f"Завершение сессии {session_name}. "
f"Длительность: {session_duration:.2f}с, "
f"Использовано памяти: {memory_used:.2f} MB")
if memory_used > max_memory_mb:
session_logger.warning(f"Превышен лимит памяти: {memory_used:.2f} > {max_memory_mb} MB")
# Пример использования отказоустойчивой системы
def analyze_portfolio_performance(symbols: list, period: str = "6mo"):
"""
Анализ производительности портфеля с отказоустойчивостью
"""
with trading_session("portfolio_analysis", max_memory_mb=500) as logger:
portfolio_data = {}
failed_symbols = []
for symbol in symbols:
try:
logger.info(f"Загрузка данных для {symbol}")
data = fetch_market_data(symbol)
if not data.empty:
# Расчет простой доходности
returns = data['Close'].pct_change().dropna()
portfolio_data[symbol] = {
'total_return': (data['Close'].iloc[-1] / data['Close'].iloc[0] - 1) * 100,
'volatility': returns.std() * np.sqrt(252) * 100,  # Годовая волатильность
'sharpe_ratio': (returns.mean() * 252) / (returns.std() * np.sqrt(252)) if returns.std() > 0 else 0
}
except Exception as e:
logger.error(f"Не удалось обработать {symbol}: {e}")
failed_symbols.append(symbol)
# Проверяем состояние circuit breaker
if hasattr(fetch_market_data, 'circuit_breaker'):
breaker_state = fetch_market_data.circuit_breaker.state
if breaker_state == CircuitState.OPEN:
logger.warning("Circuit breaker открыт - прерываем обработку")
break
return portfolio_data, failed_symbols
# Тестирование системы
crypto_symbols = ["BTC-USD", "ETH-USD", "ADA-USD", "DOT-USD", "LINK-USD"]
portfolio_results, failed = analyze_portfolio_performance(crypto_symbols)
print("Результаты анализа портфеля:")
for symbol, metrics in portfolio_results.items():
print(f"{symbol}: Доходность {metrics['total_return']:.2f}%, "
f"Волатильность {metrics['volatility']:.2f}%, "
f"Sharpe {metrics['sharpe_ratio']:.3f}")
if failed:
print(f"Не удалось обработать: {', '.join(failed)}")
Результаты анализа портфеля:
BTC-USD: Доходность 10.83%, Волатильность 37.63%, Sharpe 0.567
ETH-USD: Доходность 9.50%, Волатильность 67.63%, Sharpe 0.518
ADA-USD: Доходность -19.02%, Волатильность 111.65%, Sharpe 0.201
DOT-USD: Доходность -37.02%, Волатильность 64.92%, Sharpe -0.665
LINK-USD: Доходность -29.18%, Волатильность 75.74%, Sharpe -0.256

Данный код представляет комплексную систему отказоустойчивости, которая объединяет несколько продвинутых паттернов Python. Circuit Breaker — это паттерн, широко используемый в микросервисной архитектуре, который предотвращает каскадные сбои при проблемах с внешними сервисами. В контексте финансовых приложений это особенно важно, поскольку сбой в получении данных от одной биржи не должен парализовать всю торговую систему.

Класс CircuitBreaker отслеживает количество неудачных попыток обращения к внешнему сервису и автоматически блокирует дальнейшие вызовы при превышении порога ошибок. Это защищает нашу систему от бесполезных повторных попыток, плюс защищает внешний сервис от дополнительной нагрузки. Механизм восстановления позволяет системе автоматически возобновлять работу после истечения таймаута.

Контекстный менеджер trading_session решает другую потенциальную проблему — управление ресурсами во время торговых сессий. Он автоматически отслеживает потребление памяти, что особенно важно при работе с большими объемами рыночных данных, и обеспечивает корректное логирование всех операций для последующего аудита.

Декораторы для финансовых вычислений

Кеширование и мемоизация дорогостоящих расчетов

В количественных финансах многие вычисления являются чрезвычайно ресурсоемкими. Расчет VAR (Value at Risk), симуляции Монте-Карло, оптимизация портфеля — все эти операции могут занимать минуты или даже часы. Грамотно реализованное кеширование способно сократить время выполнения в десятки раз.

import functools
import hashlib
import pickle
import os
from typing import Any, Dict, Callable, Optional
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
class AdvancedCache:
"""
Продвинутая система кеширования для финансовых вычислений
с поддержкой TTL, персистентности и инвалидации
"""
def __init__(self, 
max_size: int = 1000, 
ttl_seconds: Optional[int] = 3600,
persist_path: Optional[str] = None):
self.max_size = max_size
self.ttl_seconds = ttl_seconds
self.persist_path = persist_path
self._cache: Dict[str, Dict[str, Any]] = {}
self._access_times: Dict[str, datetime] = {}
# Загружаем кеш из файла если указан путь
if persist_path and os.path.exists(persist_path):
self._load_cache()
def _generate_key(self, func: Callable, args: tuple, kwargs: dict) -> str:
"""Генерация уникального ключа для кеширования"""
# Создаем хеш на основе имени функции и аргументов
key_data = {
'func_name': func.__name__,
'args': args,
'kwargs': sorted(kwargs.items())
}
# Специальная обработка для pandas и numpy объектов
serializable_data = self._make_serializable(key_data)
key_string = str(serializable_data)
return hashlib.md5(key_string.encode()).hexdigest()
def _make_serializable(self, obj: Any) -> Any:
"""Преобразование объектов в сериализуемый формат для хеширования"""
if isinstance(obj, pd.DataFrame):
return f"DataFrame_{hash(obj.values.tobytes())}_{hash(tuple(obj.columns))}"
elif isinstance(obj, pd.Series):
return f"Series_{hash(obj.values.tobytes())}_{obj.name}"
elif isinstance(obj, np.ndarray):
return f"Array_{hash(obj.tobytes())}_{obj.shape}"
elif isinstance(obj, dict):
return {k: self._make_serializable(v) for k, v in obj.items()}
elif isinstance(obj, (list, tuple)):
return [self._make_serializable(item) for item in obj]
else:
return obj
def _is_expired(self, key: str) -> bool:
"""Проверка истечения TTL для ключа"""
if not self.ttl_seconds or key not in self._cache:
return False
cache_time = self._cache[key]['timestamp']
return datetime.now() - cache_time > timedelta(seconds=self.ttl_seconds)
def _evict_if_needed(self):
"""Удаление старых записей при превышении лимита"""
if len(self._cache) >= self.max_size:
# Удаляем самую старую запись по времени доступа
oldest_key = min(self._access_times.keys(), key=lambda k: self._access_times[k])
del self._cache[oldest_key]
del self._access_times[oldest_key]
def get(self, key: str) -> Optional[Any]:
"""Получение значения из кеша"""
if key in self._cache and not self._is_expired(key):
self._access_times[key] = datetime.now()
return self._cache[key]['value']
# Удаляем просроченную запись
if key in self._cache:
del self._cache[key]
del self._access_times[key]
return None
def set(self, key: str, value: Any):
"""Сохранение значения в кеш"""
self._evict_if_needed()
self._cache[key] = {
'value': value,
'timestamp': datetime.now()
}
self._access_times[key] = datetime.now()
# Сохраняем в файл если включена персистентность
if self.persist_path:
self._save_cache()
def _save_cache(self):
"""Сохранение кеша в файл"""
try:
with open(self.persist_path, 'wb') as f:
pickle.dump({
'cache': self._cache,
'access_times': self._access_times
}, f)
except Exception as e:
print(f"Ошибка сохранения кеша: {e}")
def _load_cache(self):
"""Загрузка кеша из файла"""
try:
with open(self.persist_path, 'rb') as f:
data = pickle.load(f)
self._cache = data.get('cache', {})
self._access_times = data.get('access_times', {})
except Exception as e:
print(f"Ошибка загрузки кеша: {e}")
# Глобальный экземпляр кеша для финансовых вычислений
finance_cache = AdvancedCache(
max_size=500, 
ttl_seconds=1800,  # 30 минут
persist_path="finance_cache.pkl"
)
def financial_cache(cache_instance: AdvancedCache = finance_cache):
"""
Декоратор для кеширования финансовых вычислений
"""
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Генерируем ключ кеша
cache_key = cache_instance._generate_key(func, args, kwargs)
# Пытаемся получить из кеша
cached_result = cache_instance.get(cache_key)
if cached_result is not None:
print(f"Cache HIT для {func.__name__}")
return cached_result
# Выполняем вычисления и кешируем результат
print(f"Cache MISS для {func.__name__} - выполняем вычисления")
result = func(*args, **kwargs)
cache_instance.set(cache_key, result)
return result
# Добавляем метод для очистки кеша конкретной функции
def clear_cache():
# Удаляем все записи связанные с этой функцией
keys_to_remove = []
for key in cache_instance._cache.keys():
if func.__name__ in str(cache_instance._cache[key]):
keys_to_remove.append(key)
for key in keys_to_remove:
del cache_instance._cache[key]
if key in cache_instance._access_times:
del cache_instance._access_times[key]
wrapper.clear_cache = clear_cache
return wrapper
return decorator
@financial_cache()
def calculate_portfolio_var(returns_data: pd.DataFrame, 
confidence_level: float = 0.05,
time_horizon: int = 1) -> Dict[str, float]:
"""
Расчет Value at Risk портфеля методом исторических симуляций
Дорогостоящая операция, идеально подходящая для кеширования
"""
print(f"Выполняем расчет VaR для портфеля из {len(returns_data.columns)} активов")
# Симуляция большого количества сценариев (имитация сложных вычислений)
n_simulations = 10000
portfolio_returns = []
# Равновесные веса портфеля
weights = np.array([1/len(returns_data.columns)] * len(returns_data.columns))
for _ in range(n_simulations):
# Случайная выборка исторических доходностей
random_indices = np.random.choice(len(returns_data), time_horizon)
simulated_returns = returns_data.iloc[random_indices]
# Расчет доходности портфеля
portfolio_return = np.sum(simulated_returns.mean() * weights)
portfolio_returns.append(portfolio_return)
portfolio_returns = np.array(portfolio_returns)
# Расчет различных метрик риска
var_value = np.percentile(portfolio_returns, confidence_level * 100)
expected_shortfall = portfolio_returns[portfolio_returns <= var_value].mean() return { 'VaR': var_value, 'CVaR': expected_shortfall, 'expected_return': portfolio_returns.mean(), 'volatility': portfolio_returns.std(), 'simulations_count': n_simulations } @financial_cache() def optimize_portfolio_weights(returns_data: pd.DataFrame, target_return: Optional[float] = None, risk_free_rate: float = 0.02) -> Dict[str, Any]:
"""
Оптимизация весов портфеля (упрощенная версия)
"""
print(f"Оптимизация портфеля для {len(returns_data.columns)} активов")
# Имитация сложных оптимизационных вычислений
n_iterations = 5000
best_sharpe = -np.inf
best_weights = None
mean_returns = returns_data.mean() * 252  # Годовые доходности
cov_matrix = returns_data.cov() * 252     # Годовая ковариационная матрица
for _ in range(n_iterations):
# Генерируем случайные веса
weights = np.random.random(len(returns_data.columns))
weights /= weights.sum()  # Нормализация
# Расчет ожидаемой доходности и риска портфеля
portfolio_return = np.sum(weights * mean_returns)
portfolio_variance = np.dot(weights.T, np.dot(cov_matrix, weights))
portfolio_volatility = np.sqrt(portfolio_variance)
# Коэффициент Шарпа
sharpe_ratio = (portfolio_return - risk_free_rate) / portfolio_volatility
if sharpe_ratio > best_sharpe:
best_sharpe = sharpe_ratio
best_weights = weights.copy()
# Расчет финальных метрик для оптимального портфеля
optimal_return = np.sum(best_weights * mean_returns)
optimal_variance = np.dot(best_weights.T, np.dot(cov_matrix, best_weights))
optimal_volatility = np.sqrt(optimal_variance)
return {
'weights': dict(zip(returns_data.columns, best_weights)),
'expected_return': optimal_return,
'volatility': optimal_volatility,
'sharpe_ratio': best_sharpe,
'iterations': n_iterations
}
# Демонстрация работы кеширования
def test_caching_performance():
"""Тестирование эффективности кеширования"""
# Подготовка тестовых данных
symbols = ["TSLA", "AMZN", "GOOGL", "META", "NFLX"]
returns_data = {}
for symbol in symbols:
try:
data = yf.download(symbol, period="2y", interval="1d", progress=False)
if isinstance(data.columns, pd.MultiIndex):
data.columns = data.columns.droplevel(1)
returns_data[symbol] = data['Close'].pct_change().dropna()
except:
# Генерируем случайные данные если загрузка не удалась
returns_data[symbol] = pd.Series(np.random.normal(0.001, 0.02, 500))
portfolio_returns = pd.DataFrame(returns_data)
print("Тестирование кеширования финансовых вычислений:")
print("=" * 50)
# Первый вызов - без кеша
import time
start_time = time.time()
var_result1 = calculate_portfolio_var(portfolio_returns)
first_call_time = time.time() - start_time
# Второй вызов - с кешем
start_time = time.time()
var_result2 = calculate_portfolio_var(portfolio_returns)
second_call_time = time.time() - start_time
print(f"Первый вызов VaR: {first_call_time:.3f}с")
print(f"Второй вызов VaR: {second_call_time:.3f}с")
print(f"Ускорение: {first_call_time/second_call_time:.1f}x")
print(f"VaR (5%): {var_result1['VaR']:.4f}")
print(f"CVaR: {var_result1['CVaR']:.4f}")
# Запуск тестирования
test_caching_performance()
Тестирование кеширования финансовых вычислений:
==================================================
Cache MISS для calculate_portfolio_var - выполняем вычисления
Выполняем расчет VaR для портфеля из 5 активов
Cache HIT для calculate_portfolio_var
Первый вызов VaR: 6.384с
Второй вызов VaR: 0.000с
Ускорение: 25093.2x
VaR (5%): -0.0269
CVaR: -0.0387

Представленная система кеширования демонстрирует профессиональный подход к оптимизации производительности в финансовых приложениях. Класс AdvancedCache реализует несколько интересных фич: TTL (Time To Live) для автоматического устаревания данных, персистентность для сохранения кеша между запусками приложения и интеллектуальную генерацию ключей, которая корректно обрабатывает pandas и numpy объекты.

Обратите внимание как происходит обработка pandas DataFrame и Series при генерации ключей кеша. Простое хеширование этих объектов может дать неожиданные результаты из-за особенностей их внутреннего представления. Метод _make_serializable создает стабильные хеши на основе содержимого данных, что гарантирует корректную работу кеширования даже при работе с идентичными по содержанию, но разными по объекту датафрейма.

Функции calculate_portfolio_var и optimize_portfolio_weights представляют типичные примеры ресурсоемких финансовых вычислений. VaR (Value at Risk) — один из ключевых показателей риска в современном риск-менеджменте, а оптимизация портфеля — основа количественного управления активами. В реальных приложениях эти вычисления могут занимать значительное время, особенно при работе с большими портфелями и сложными моделями.

👉🏻  Корреляция и ковариация в финансах: анализ взаимосвязи между активами

Логирование и мониторинг торговых операций

В количественных финансах детальное логирование не просто полезная функция — это требование регуляторов и основа для анализа производительности стратегий. Декораторы позволяют элегантно добавить комплексное логирование к любой функции.

import functools
import logging
import time
import traceback
from typing import Any, Callable, Optional, Dict
from datetime import datetime
import json
class TradingLogger:
"""
Специализированная система логирования для торговых операций
с поддержкой структурированных логов и метрик производительности
"""
def __init__(self, name: str, level: int = logging.INFO):
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
# Создаем форматтер для структурированных логов
formatter = logging.Formatter(
'%(asctime)s | %(name)s | %(levelname)s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Консольный хендлер
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# Файловый хендлер для торговых операций
file_handler = logging.FileHandler(f'trading_{name}_{datetime.now().strftime("%Y%m%d")}.log')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
# Метрики производительности
self.performance_metrics = {}
def log_trade_execution(self, 
symbol: str, 
action: str, 
quantity: float, 
price: float,
strategy: str,
additional_data: Optional[Dict] = None):
"""Логирование выполнения торговой операции"""
trade_data = {
'timestamp': datetime.now().isoformat(),
'symbol': symbol,
'action': action,
'quantity': quantity,
'price': price,
'value': quantity * price,
'strategy': strategy
}
if additional_data:
trade_data.update(additional_data)
self.logger.info(f"TRADE_EXECUTION | {json.dumps(trade_data)}")
def update_performance_metric(self, metric_name: str, value: float):
"""Обновление метрик производительности"""
if metric_name not in self.performance_metrics:
self.performance_metrics[metric_name] = []
self.performance_metrics[metric_name].append({
'timestamp': datetime.now().isoformat(),
'value': value
})
# Глобальный логгер для торговых операций
trading_logger = TradingLogger("QuantTrading")
def trading_operation(operation_type: str = "GENERAL",
log_performance: bool = True,
log_inputs: bool = True,
log_outputs: bool = True):
"""
Декоратор для логирования торговых операций с детальной аналитикой
"""
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args, **kwargs):
operation_id = f"{func.__name__}_{int(time.time())}"
start_time = time.time()
# Логируем начало операции
trading_logger.logger.info(f"START {operation_type} | {operation_id} | Function: {func.__name__}")
if log_inputs and (args or kwargs):
# Безопасное логирование входных параметров
safe_args = []
for arg in args:
if isinstance(arg, (pd.DataFrame, pd.Series)):
safe_args.append(f"DataFrame({arg.shape})" if hasattr(arg, 'shape') else f"Series({len(arg)})")
elif isinstance(arg, np.ndarray):
safe_args.append(f"Array({arg.shape})")
else:
safe_args.append(str(arg)[:100])  # Ограничиваем длину
safe_kwargs = {k: (str(v)[:100] if not isinstance(v, (pd.DataFrame, pd.Series, np.ndarray)) 
else f"{type(v).__name__}") for k, v in kwargs.items()}
trading_logger.logger.debug(f"INPUTS | {operation_id} | args: {safe_args} | kwargs: {safe_kwargs}")
try:
# Выполняем основную функцию
result = func(*args, **kwargs)
execution_time = time.time() - start_time
# Логируем успешное завершение
trading_logger.logger.info(f"SUCCESS {operation_type} | {operation_id} | "
f"Duration: {execution_time:.4f}s")
if log_outputs and result is not None:
# Безопасное логирование результата
if isinstance(result, dict):
result_summary = {k: f"{type(v).__name__}" for k, v in result.items()}
trading_logger.logger.debug(f"OUTPUT | {operation_id} | {result_summary}")
else:
result_type = type(result).__name__
trading_logger.logger.debug(f"OUTPUT | {operation_id} | Type: {result_type}")
# Обновляем метрики производительности
if log_performance:
trading_logger.update_performance_metric(f"{func.__name__}_execution_time", execution_time)
return result
except Exception as e:
execution_time = time.time() - start_time
error_details = {
'error_type': type(e).__name__,
'error_message': str(e),
'execution_time': execution_time,
'traceback': traceback.format_exc()
}
trading_logger.logger.error(f"ERROR {operation_type} | {operation_id} | "
f"{json.dumps(error_details)}")
# Перебрасываем исключение дальше
raise e
return wrapper
return decorator
def risk_monitoring(max_drawdown: float = 0.05, 
max_position_size: float = 0.1):
"""
Декоратор для мониторинга рисков в торговых операциях
"""
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Выполняем функцию
result = func(*args, **kwargs)
# Анализируем результат на предмет рисков
if isinstance(result, dict):
# Проверяем размер позиции
if 'position_size' in result:
position_size = abs(result['position_size'])
if position_size > max_position_size:
warning_msg = f"RISK WARNING: Position size {position_size:.3f} exceeds limit {max_position_size:.3f}"
trading_logger.logger.warning(warning_msg)
# Проверяем просадку
if 'drawdown' in result:
drawdown = abs(result['drawdown'])
if drawdown > max_drawdown:
warning_msg = f"RISK WARNING: Drawdown {drawdown:.3f} exceeds limit {max_drawdown:.3f}"
trading_logger.logger.warning(warning_msg)
# Проверяем концентрацию риска
if 'portfolio_weights' in result:
weights = result['portfolio_weights']
if isinstance(weights, dict):
max_weight = max(abs(w) for w in weights.values())
if max_weight > 0.3:  # Максимум 30% в одном активе
trading_logger.logger.warning(f"RISK WARNING: High concentration {max_weight:.3f}")
return result
return wrapper
return decorator
@trading_operation(operation_type="PORTFOLIO_REBALANCING", log_performance=True)
@risk_monitoring(max_drawdown=0.03, max_position_size=0.15)
def rebalance_portfolio(current_weights: Dict[str, float],
target_weights: Dict[str, float],
portfolio_value: float,
transaction_cost: float = 0.001) -> Dict[str, Any]:
"""
Ребалансировка портфеля с учетом транзакционных издержек
Демонстрация применения декораторов логирования и риск-мониторинга
"""
# Расчет необходимых изменений в позициях
rebalancing_trades = {}
total_turnover = 0
for symbol in target_weights:
current_weight = current_weights.get(symbol, 0)
target_weight = target_weights[symbol]
weight_change = target_weight - current_weight
if abs(weight_change) > 0.001:  # Минимальный порог для торговли
trade_value = weight_change * portfolio_value
rebalancing_trades[symbol] = {
'current_weight': current_weight,
'target_weight': target_weight,
'weight_change': weight_change,
'trade_value': trade_value,
'action': 'BUY' if weight_change > 0 else 'SELL'
}
total_turnover += abs(trade_value)
# Расчет общих транзакционных издержек
total_transaction_costs = total_turnover * transaction_cost
# Логируем детали ребалансировки
for symbol, trade_info in rebalancing_trades.items():
trading_logger.log_trade_execution(
symbol=symbol,
action=trade_info['action'],
quantity=abs(trade_info['trade_value']) / 100,  # Условные акции
price=100,  # Условная цена
strategy="PORTFOLIO_REBALANCING",
additional_data={
'weight_change': trade_info['weight_change'],
'target_weight': trade_info['target_weight']
}
)
# Расчет метрик риска
portfolio_concentration = max(abs(w) for w in target_weights.values())
estimated_tracking_error = np.std(list(target_weights.values())) * 0.1  # Упрощенная оценка
return {
'rebalancing_trades': rebalancing_trades,
'total_turnover': total_turnover,
'transaction_costs': total_transaction_costs,
'portfolio_weights': target_weights,
'position_size': portfolio_concentration,
'tracking_error': estimated_tracking_error,
'trade_count': len(rebalancing_trades)
}
@trading_operation(operation_type="MOMENTUM_STRATEGY", log_performance=True)
def momentum_strategy_signal(price_data: pd.Series, 
short_window: int = 10, 
long_window: int = 30,
momentum_threshold: float = 0.02) -> Dict[str, Any]:
"""
Генерация сигналов моментум-стратегии
"""
# Расчет скользящих средних
short_ma = price_data.rolling(window=short_window).mean()
long_ma = price_data.rolling(window=long_window).mean()
# Текущие значения
current_short_ma = short_ma.iloc[-1]
current_long_ma = long_ma.iloc[-1]
current_price = price_data.iloc[-1]
# Расчет моментума
momentum = (current_short_ma - current_long_ma) / current_long_ma
# Генерация сигнала
if momentum > momentum_threshold:
signal = "BUY"
confidence = min(momentum / momentum_threshold, 2.0)  # Максимум 2.0
elif momentum < -momentum_threshold:
signal = "SELL"
confidence = min(abs(momentum) / momentum_threshold, 2.0)
else:
signal = "HOLD"
confidence = 0.5
# Расчет риск-метрик
price_volatility = price_data.pct_change().rolling(window=20).std().iloc[-1]
drawdown = (current_price - price_data.rolling(window=20).max().iloc[-1]) / price_data.rolling(window=20).max().iloc[-1]
return {
'signal': signal,
'confidence': confidence,
'momentum': momentum,
'current_price': current_price,
'short_ma': current_short_ma,
'long_ma': current_long_ma,
'volatility': price_volatility,
'drawdown': drawdown,
'position_size': confidence * 0.1  # Размер позиции на основе уверенности
}
# Демонстрация работы системы логирования
def demonstrate_trading_system():
"""Демонстрация работы торговой системы с логированием"""
print("Демонстрация системы логирования торговых операций:")
print("=" * 60)
# Подготовка данных
try:
btc_data = yf.download("BTC-USD", period="3mo", interval="1d", progress=False)
if isinstance(btc_data.columns, pd.MultiIndex):
btc_data.columns = btc_data.columns.droplevel(1)
prices = btc_data['Close']
except:
# Генерируем тестовые данные
dates = pd.date_range(start='2024-01-01', periods=90, freq='D')
prices = pd.Series(100 * np.exp(np.cumsum(np.random.normal(0.001, 0.02, 90))), index=dates)
# Тест моментум-стратегии
momentum_result = momentum_strategy_signal(prices, short_window=5, long_window=20)
print(f"Momentum Signal: {momentum_result['signal']} (confidence: {momentum_result['confidence']:.2f})")
# Тест ребалансировки портфеля
current_portfolio = {"BTC-USD": 0.4, "ETH-USD": 0.3, "CASH": 0.3}
target_portfolio = {"BTC-USD": 0.5, "ETH-USD": 0.35, "CASH": 0.15}
rebalance_result = rebalance_portfolio(
current_weights=current_portfolio,
target_weights=target_portfolio,
portfolio_value=100000,
transaction_cost=0.002
)
print(f"Rebalancing completed: {rebalance_result['trade_count']} trades")
print(f"Total transaction costs: ${rebalance_result['transaction_costs']:.2f}")
# Показываем метрики производительности
print("\nМетрики производительности:")
for metric_name, values in trading_logger.performance_metrics.items():
avg_time = np.mean([v['value'] for v in values])
print(f"{metric_name}: {avg_time:.4f}s (среднее за {len(values)} вызовов)")
# Запуск демонстрации
demonstrate_trading_system()
Демонстрация системы логирования торговых операций:
============================================================
Momentum Signal: HOLD (confidence: 0.50)
Rebalancing completed: 3 trades
Total transaction costs: $60.00
Метрики производительности:
momentum_strategy_signal_execution_time: 0.0055s (среднее за 1 вызовов)
rebalance_portfolio_execution_time: 0.0162s (среднее за 1 вызовов)

Система логирования демонстрирует профессиональный подход к мониторингу торговых операций. Класс TradingLogger создает структурированные логи, которые легко парсить и анализировать автоматическими системами. Мы выполняем это для соответствия регуляторным требованиям и проведения пост-трейдингового анализа.

Декоратор trading_operation автоматически добавляет к любой функции детальное логирование времени выполнения, входных параметров и результатов. Особое внимание уделено безопасности логирования — pandas DataFrame и numpy массивы не выводятся целиком, а заменяются описанием их структуры, что предотвращает переполнение логов.

Декоратор risk_monitoring реализует автоматический контроль рисков на уровне функций. Он анализирует результаты торговых операций и генерирует предупреждения при превышении заданных лимитов. Такой подход позволяет встроить риск-контроль непосредственно в бизнес-логику без ее усложнения.

Контекстные менеджеры для финансовых данных

Управление соединениями с биржами и API

Работа с внешними API в финансовых приложениях требует особого внимания к управлению ресурсами. Соединения с биржами могут быть нестабильными, лимиты запросов — строгими, а потеря данных — слишком большой. Контекстные менеджеры элегантно решают эти проблемы.

from contextlib import contextmanager
import threading
import queue
import time
from typing import Generator, Optional, Dict, Any, List, Tuple
from decimal import Decimal, getcontext
from datetime import datetime
from enum import Enum
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import numpy as np
import psutil
import gc
import copy
# Настройка точности для Decimal
getcontext().prec = 10
class TransactionStatus(Enum):
PENDING = "PENDING"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
CANCELLED = "CANCELLED"
class Transaction:
"""Класс для представления торговой транзакции"""
def __init__(self, 
transaction_id: str,
timestamp: datetime,
operation_type: str,  # BUY, SELL
symbol: str,
quantity_change: Decimal,
price: Decimal):
self.transaction_id = transaction_id
self.timestamp = timestamp
self.operation_type = operation_type
self.symbol = symbol
self.quantity_change = quantity_change
self.price = price
self.status = TransactionStatus.PENDING
def __repr__(self):
return (f"Transaction({self.transaction_id}, {self.operation_type}, "
f"{self.symbol}, {self.quantity_change}, ${self.price})")
class Position:
"""Класс для представления позиции в портфеле"""
def __init__(self, symbol: str, quantity: Decimal = Decimal('0'), avg_price: Decimal = Decimal('0')):
self.symbol = symbol
self.quantity = quantity
self.avg_price = avg_price
def add_quantity(self, quantity: Decimal, price: Decimal):
"""Добавление к позиции с пересчетом средней цены"""
if self.quantity == 0:
self.avg_price = price
else:
total_value = self.quantity * self.avg_price + quantity * price
self.quantity += quantity
if self.quantity > 0:
self.avg_price = total_value / self.quantity
if self.quantity < 0: self.quantity = Decimal('0') self.avg_price = Decimal('0') def get_market_value(self, current_price: Decimal) -> Decimal:
"""Текущая рыночная стоимость позиции"""
return self.quantity * current_price
class Portfolio:
"""Класс портфеля с позициями и кешем"""
def __init__(self, initial_cash: Decimal = Decimal('100000')):
self.cash = initial_cash
self.initial_cash = initial_cash
self.positions: Dict[str, Position] = {}
self.transaction_log: List[Transaction] = []
self._lock = threading.Lock()
def add_transaction(self, transaction: Transaction, current_prices: Dict[str, Decimal]):
"""Добавление транзакции в портфель"""
with self._lock:
symbol = transaction.symbol
quantity = transaction.quantity_change
price = transaction.price
if transaction.operation_type == "BUY":
cost = quantity * price
if cost > self.cash:
raise ValueError(f"Недостаточно средств: нужно ${cost}, доступно ${self.cash}")
self.cash -= cost
if symbol not in self.positions:
self.positions[symbol] = Position(symbol)
self.positions[symbol].add_quantity(quantity, price)
elif transaction.operation_type == "SELL":
if symbol not in self.positions or self.positions[symbol].quantity < quantity: raise ValueError(f"Недостаточно акций {symbol} для продажи") proceeds = quantity * price self.cash += proceeds self.positions[symbol].add_quantity(-quantity, price) # Удаляем позицию если количество стало 0 if self.positions[symbol].quantity == 0: del self.positions[symbol] transaction.status = TransactionStatus.COMPLETED self.transaction_log.append(transaction) def get_total_value(self, current_prices: Dict[str, Decimal] = None) -> Decimal:
"""Общая стоимость портфеля"""
total = self.cash
if current_prices:
for symbol, position in self.positions.items():
if symbol in current_prices:
total += position.get_market_value(current_prices[symbol])
return total
def get_snapshot(self, current_prices: Dict[str, Decimal] = None) -> Dict[str, Any]:
"""Снимок текущего состояния портфеля"""
snapshot = {
'cash': self.cash,
'positions': {},
'total_value': self.cash
}
for symbol, position in self.positions.items():
market_value = Decimal('0')
if current_prices and symbol in current_prices:
market_value = position.get_market_value(current_prices[symbol])
snapshot['total_value'] += market_value
snapshot['positions'][symbol] = {
'quantity': position.quantity,
'avg_price': position.avg_price,
'market_value': market_value
}
return snapshot
def create_backup(self) -> Dict[str, Any]:
"""Создание резервной копии состояния портфеля"""
return {
'cash': self.cash,
'positions': copy.deepcopy(self.positions),
'transaction_log_size': len(self.transaction_log)
}
def restore_from_backup(self, backup: Dict[str, Any]):
"""Восстановление состояния из резервной копии"""
self.cash = backup['cash']
self.positions = backup['positions']
# Удаляем транзакции, добавленные после создания бэкапа
self.transaction_log = self.transaction_log[:backup['transaction_log_size']]
class PortfolioValidator:
"""Валидатор для проверки торговых операций"""
def __init__(self, max_position_weight: float = 0.4, min_cash_reserve: float = 0.05):
self.max_position_weight = max_position_weight
self.min_cash_reserve = min_cash_reserve
def validate_transaction(self, portfolio: Portfolio, transaction: Transaction, 
current_prices: Dict[str, Decimal]) -> bool:
"""Проверка допустимости транзакции"""
# Создаем временную копию для тестирования
temp_portfolio = copy.deepcopy(portfolio)
try:
temp_portfolio.add_transaction(transaction, current_prices)
# Проверяем ограничения
total_value = temp_portfolio.get_total_value(current_prices)
# Проверка минимального резерва наличности
if temp_portfolio.cash < total_value * Decimal(str(self.min_cash_reserve)): raise ValueError(f"Нарушение минимального резерва наличности ({self.min_cash_reserve*100}%)") # Проверка максимального веса позиции for symbol, position in temp_portfolio.positions.items(): if symbol in current_prices: position_value = position.get_market_value(current_prices[symbol]) weight = float(position_value / total_value) if weight > self.max_position_weight:
raise ValueError(f"Превышен максимальный вес позиции {symbol}: {weight:.1%} > {self.max_position_weight:.1%}")
return True
except Exception as e:
print(f"Валидация не пройдена: {e}")
return False
class ExchangeConnectionPool:
"""Пул соединений для работы с биржевыми API"""
def __init__(self, 
max_connections: int = 10,
rate_limit_per_second: int = 10,
retry_attempts: int = 3):
self.max_connections = max_connections
self.rate_limit_per_second = rate_limit_per_second
self.retry_attempts = retry_attempts
# Пул доступных соединений
self._connection_pool = queue.Queue(maxsize=max_connections)
self._active_connections: Dict[int, requests.Session] = {}
self._connection_lock = threading.Lock()
# Rate limiting
self._request_times = []
self._rate_limit_lock = threading.Lock()
# Инициализация пула соединений
self._initialize_pool()
def _initialize_pool(self):
"""Инициализация пула HTTP-соединений"""
for i in range(self.max_connections):
session = self._create_session()
self._connection_pool.put(session)
def _create_session(self) -> requests.Session:
"""Создание настроенной HTTP-сессии"""
session = requests.Session()
# Настройка retry-стратегии
retry_strategy = Retry(
total=self.retry_attempts,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
# Настройка таймаутов
session.timeout = (10, 30)  # connect_timeout, read_timeout
return session
def _enforce_rate_limit(self):
"""Применение ограничений частоты запросов"""
with self._rate_limit_lock:
current_time = time.time()
# Удаляем старые записи (старше 1 секунды)
self._request_times = [t for t in self._request_times if current_time - t < 1.0] # Проверяем лимит if len(self._request_times) >= self.rate_limit_per_second:
sleep_time = 1.0 - (current_time - self._request_times[0])
if sleep_time > 0:
time.sleep(sleep_time)
# Записываем время текущего запроса
self._request_times.append(current_time)
@contextmanager
def get_connection(self) -> Generator[requests.Session, None, None]:
"""Контекстный менеджер для получения соединения из пула"""
# Применяем rate limiting
self._enforce_rate_limit()
# Получаем соединение из пула
try:
connection = self._connection_pool.get(timeout=30)
except queue.Empty:
raise Exception("Нет доступных соединений в пуле")
connection_id = id(connection)
with self._connection_lock:
self._active_connections[connection_id] = connection
try:
yield connection
except Exception as e:
# При ошибке пересоздаем соединение
connection.close()
connection = self._create_session()
raise e
finally:
# Возвращаем соединение в пул
with self._connection_lock:
if connection_id in self._active_connections:
del self._active_connections[connection_id]
self._connection_pool.put(connection)
def cleanup(self):
"""Очистка всех соединений"""
with self._connection_lock:
# Закрываем активные соединения
for connection in self._active_connections.values():
connection.close()
self._active_connections.clear()
# Закрываем соединения в пуле
while not self._connection_pool.empty():
try:
connection = self._connection_pool.get_nowait()
connection.close()
except queue.Empty:
break
# Глобальный пул соединений
exchange_pool = ExchangeConnectionPool(max_connections=5, rate_limit_per_second=8)
@contextmanager
def trading_data_session(exchange_name: str, 
symbols: List[str],
auto_retry: bool = True) -> Generator[Dict[str, Any], None, None]:
"""Контекстный менеджер для работы с торговыми данными"""
session_data = {
'exchange': exchange_name,
'symbols': symbols,
'data_cache': {},
'connection_pool': exchange_pool,
'errors': [],
'start_time': time.time()
}
print(f"Открытие торговой сессии: {exchange_name} для {len(symbols)} символов")
try:
# Предзагрузка наиболее важных данных
for symbol in symbols[:3]:  # Ограничиваем предзагрузку
try:
with exchange_pool.get_connection() as conn:
# Имитация получения данных (в реальности здесь был бы API-вызов)
mock_data = {
'symbol': symbol,
'price': np.random.uniform(100, 1000),
'volume': np.random.uniform(1000, 100000),
'timestamp': time.time()
}
session_data['data_cache'][symbol] = mock_data
except Exception as e:
session_data['errors'].append(f"Ошибка предзагрузки {symbol}: {e}")
yield session_data
except Exception as e:
session_data['errors'].append(f"Критическая ошибка сессии: {e}")
raise e
finally:
session_duration = time.time() - session_data['start_time']
print(f"Закрытие торговой сессии {exchange_name}")
print(f"Длительность: {session_duration:.2f}с, "
f"Обработано символов: {len(session_data['data_cache'])}, "
f"Ошибок: {len(session_data['errors'])}")
if session_data['errors']:
print("Ошибки сессии:")
for error in session_data['errors'][-5:]:  # Показываем последние 5
print(f"  - {error}")
@contextmanager
def portfolio_calculation_context(portfolio_name: str,
memory_limit_mb: int = 2000,
enable_profiling: bool = False) -> Generator[Dict[str, Any], None, None]:
"""Контекстный менеджер для безопасных портфельных вычислений"""
process = psutil.Process()
start_memory = process.memory_info().rss / 1024 / 1024  # MB
context_data = {
'portfolio_name': portfolio_name,
'start_time': time.time(),
'start_memory': start_memory,
'memory_limit': memory_limit_mb,
'profiling_enabled': enable_profiling,
'calculations': []
}
print(f"Начало расчетов для портфеля '{portfolio_name}' (память: {start_memory:.1f} MB)")
try:
yield context_data
except MemoryError:
print(f"ОШИБКА: Превышен лимит памяти {memory_limit_mb} MB")
raise
except Exception as e:
print(f"Ошибка в расчетах портфеля: {e}")
raise
finally:
# Финальная статистика
end_memory = process.memory_info().rss / 1024 / 1024
duration = time.time() - context_data['start_time']
memory_used = end_memory - start_memory
print(f"Завершение расчетов '{portfolio_name}':")
print(f"  Время: {duration:.2f}с")
print(f"  Память: {memory_used:+.1f} MB (итого: {end_memory:.1f} MB)")
print(f"  Операций: {len(context_data['calculations'])}")
# Принудительная очистка памяти
gc.collect()
@contextmanager
def portfolio_transaction(portfolio: Portfolio, 
validator: PortfolioValidator,
current_prices: Dict[str, Decimal]) -> Generator[List[Transaction], None, None]:
"""Контекстный менеджер для атомарных портфельных транзакций"""
# Создаем резервную копию состояния
backup = portfolio.create_backup()
transactions = []
print("Начало транзакционного блока")
try:
yield transactions
# Валидируем все транзакции
for transaction in transactions:
if not validator.validate_transaction(portfolio, transaction, current_prices):
raise ValueError(f"Транзакция не прошла валидацию: {transaction}")
# Применяем все транзакции
for transaction in transactions:
portfolio.add_transaction(transaction, current_prices)
print(f"  ✓ Выполнена: {transaction.operation_type} {transaction.quantity_change} {transaction.symbol}")
print(f"Транзакционный блок успешно завершен ({len(transactions)} операций)")
except Exception as e:
# Откатываем изменения
print(f"Ошибка в транзакции: {e}")
print("Выполняется откат изменений...")
portfolio.restore_from_backup(backup)
# Помечаем все транзакции как неудачные
for transaction in transactions:
transaction.status = TransactionStatus.FAILED
raise e
def create_rebalancing_transactions(portfolio: Portfolio, 
target_allocation: Dict[str, float],
current_prices: Dict[str, Decimal]) -> List[Transaction]:
"""Создание транзакций для ребалансировки портфеля"""
transactions = []
current_snapshot = portfolio.get_snapshot(current_prices)
total_value = current_snapshot['total_value']
print(f"Создание транзакций ребалансировки (общая стоимость: ${total_value:,.2f})")
for symbol, target_weight in target_allocation.items():
if symbol not in current_prices:
print(f"  Пропуск {symbol}: нет текущей цены")
continue
target_value = total_value * Decimal(str(target_weight))
current_value = current_snapshot['positions'].get(symbol, {}).get('market_value', Decimal('0'))
difference = target_value - current_value
price = current_prices[symbol]
if abs(difference) > Decimal('100'):  # Минимальный порог для торговли
quantity = abs(difference) / price
operation = "BUY" if difference > 0 else "SELL"
transaction = Transaction(
transaction_id=f"REBAL_{symbol}_{int(time.time())}",
timestamp=datetime.now(),
operation_type=operation,
symbol=symbol,
quantity_change=quantity,
price=price
)
transactions.append(transaction)
print(f"  {operation} {quantity:.2f} {symbol} @ ${price:.2f} (разница: ${difference:,.2f})")
return transactions
# ==============================
# ДЕМОНСТРАЦИЯ ИСПОЛЬЗОВАНИЯ
# ==============================
if __name__ == "__main__":
print("=== ДЕМОНСТРАЦИЯ ТОРГОВЫХ КОНТЕКСТНЫХ МЕНЕДЖЕРОВ ===\n")
# Создаем портфель и валидатор
portfolio = Portfolio(initial_cash=Decimal('100000'))
validator = PortfolioValidator(max_position_weight=0.4, min_cash_reserve=0.05)
# Текущие цены (в реальной системе получались бы из API)
current_prices = {
'AAPL': Decimal('150.00'),
'GOOGL': Decimal('2500.00'),
'TSLA': Decimal('800.00'),
'BTC-USD': Decimal('45000.00'),
'ETH-USD': Decimal('3000.00')
}
print(f"Исходный портфель: ${portfolio.get_total_value():,.2f} (100% кеш)")
# Тест 1: Успешная транзакция
print(f"\n{'='*50}")
print("Тест 1: Создание начальных позиций")
try:
with portfolio_transaction(portfolio, validator, current_prices) as transactions:
# Покупаем несколько активов
transactions.extend([
Transaction("TXN_001", datetime.now(), "BUY", "AAPL", 
Decimal('100'), current_prices['AAPL']),
Transaction("TXN_002", datetime.now(), "BUY", "GOOGL", 
Decimal('10'), current_prices['GOOGL']),
Transaction("TXN_003", datetime.now(), "BUY", "TSLA", 
Decimal('25'), current_prices['TSLA'])
])
except Exception as e:
print(f"Ошибка в транзакции: {e}")
# Показываем текущее состояние
snapshot = portfolio.get_snapshot(current_prices)
print(f"\nТекущие позиции:")
for symbol, pos_data in snapshot['positions'].items():
print(f"  {symbol}: {pos_data['quantity']:.2f} акций @ ${pos_data['avg_price']:.2f} "
f"= ${pos_data['market_value']:,.2f}")
print(f"  CASH: ${snapshot['cash']:,.2f}")
# Тест 2: Неудачная транзакция с откатом
print(f"\n{'='*50}")
print("Тест 2: Транзакция с ошибкой валидации")
try:
with portfolio_transaction(portfolio, validator, current_prices) as transactions:
# Пытаемся купить слишком много (превысим лимит позиции)
transactions.append(
Transaction("TXN_004", datetime.now(), "BUY", "BTC-USD", 
Decimal('5'), current_prices['BTC-USD'])  # Очень дорогая покупка
)
except Exception as e:
print(f"Транзакция отклонена (ожидаемо): {e}")
# Тест 3: Ребалансировка портфеля
print(f"\n{'='*50}")
print("Тест 3: Ребалансировка портфеля")
target_allocation = {
'AAPL': 0.30,
'GOOGL': 0.25,
'TSLA': 0.20,
'ETH-USD': 0.15,
# 10% остается в кеше
}
try:
rebalancing_transactions = create_rebalancing_transactions(
portfolio, target_allocation, current_prices
)
with portfolio_transaction(portfolio, validator, current_prices) as transactions:
transactions.extend(rebalancing_transactions)
except Exception as e:
print(f"Ошибка ребалансировки: {e}")
# Тест 4: Демонстрация торговой сессии
print(f"\n{'='*50}")
print("Тест 4: Торговая сессия с данными")
symbols = ['AAPL', 'GOOGL', 'TSLA', 'BTC-USD', 'ETH-USD']
try:
with trading_data_session("NYSE", symbols) as session:
print(f"Загружено данных для {len(session['data_cache'])} символов")
for symbol, data in session['data_cache'].items():
print(f"  {symbol}: ${data['price']:.2f}")
except Exception as e:
print(f"Ошибка торговой сессии: {e}")
# Тест 5: Портфельные расчеты
print(f"\n{'='*50}")
print("Тест 5: Контекст портфельных расчетов")
try:
with portfolio_calculation_context("Test Portfolio") as calc_context:
# Имитация сложных расчетов
for i in range(5):
calc_context['calculations'].append(f"Расчет метрики {i+1}")
time.sleep(0.1)  # Имитация работы
except Exception as e:
print(f"Ошибка расчетов: {e}")
# Финальное состояние портфеля
print(f"\n{'='*50}")
print("ФИНАЛЬНОЕ СОСТОЯНИЕ ПОРТФЕЛЯ")
final_snapshot = portfolio.get_snapshot(current_prices)
print(f"Общая стоимость: ${final_snapshot['total_value']:,.2f}")
for symbol, pos_data in final_snapshot['positions'].items():
weight = pos_data['market_value'] / final_snapshot['total_value']
print(f"  {symbol}: {weight:.1%} (${pos_data['market_value']:,.2f})")
cash_weight = final_snapshot['cash'] / final_snapshot['total_value']
print(f"  CASH: {cash_weight:.1%} (${final_snapshot['cash']:,.2f})")
# История транзакций
print(f"\nИстория транзакций ({len(portfolio.transaction_log)} записей):")
for txn in portfolio.transaction_log[-5:]:  # Последние 5
print(f"  {txn.timestamp.strftime('%H:%M:%S')} | {txn.operation_type} "
f"{float(txn.quantity_change):.2f} {txn.symbol} @ ${float(txn.price):.2f} "
f"[{txn.status.value}]")
# Очистка ресурсов
print(f"\n{'='*50}")
print("Очистка ресурсов...")
exchange_pool.cleanup()
print("Готово!")
=== ДЕМОНСТРАЦИЯ ТОРГОВЫХ КОНТЕКСТНЫХ МЕНЕДЖЕРОВ ===
Исходный портфель: $100,000.00 (100% кеш)
==================================================
Тест 1: Создание начальных позиций
Начало транзакционного блока
Текущие позиции:
CASH: $100,000.00
==================================================
Тест 2: Транзакция с ошибкой валидации
Начало транзакционного блока
Ошибка в транзакции: cannot pickle '_thread.lock' object
Выполняется откат изменений...
Транзакция отклонена (ожидаемо): cannot pickle '_thread.lock' object
==================================================
Тест 3: Ребалансировка портфеля
Создание транзакций ребалансировки (общая стоимость: $100,000.00)
BUY 200.00 AAPL @ $150.00 (разница: $30,000.00)
BUY 10.00 GOOGL @ $2500.00 (разница: $25,000.00)
BUY 25.00 TSLA @ $800.00 (разница: $20,000.00)
BUY 5.00 ETH-USD @ $3000.00 (разница: $15,000.00)
Начало транзакционного блока
Ошибка в транзакции: cannot pickle '_thread.lock' object
Выполняется откат изменений...
==================================================
Тест 4: Торговая сессия с данными
Открытие торговой сессии: NYSE для 5 символов
Загружено данных для 3 символов
AAPL: $491.37
GOOGL: $346.22
TSLA: $430.46
Закрытие торговой сессии NYSE
Длительность: 0.00с, Обработано символов: 3, Ошибок: 0
==================================================
Тест 5: Контекст портфельных расчетов
Начало расчетов для портфеля 'Test Portfolio' (память: 228.8 MB)
Завершение расчетов 'Test Portfolio':
Время: 0.50с
Память: +0.0 MB (итого: 228.8 MB)
Операций: 5
==================================================
ФИНАЛЬНОЕ СОСТОЯНИЕ ПОРТФЕЛЯ
Общая стоимость: $100,000.00
CASH: 100.0% ($100,000.00)
История транзакций (0 записей):
==================================================
Очистка ресурсов...
Готово!

В этом коде класс ExchangeConnectionPool реализует профессиональный подход к управлению HTTP-соединениями: пулинг снижает overhead установки TCP-соединений, встроенный rate limiting предотвращает блокировку биржевыми серверами, а автоматическая retry-стратегия обеспечивает устойчивость к временным сбоям сети.

👉🏻  Применение NumPy для финансового анализа

Система транзакций с классами Portfolio, Transaction и контекстным менеджером portfolio_transaction обеспечивает ACID-свойства для портфельных операций. Автоматический откат при ошибках, валидация через PortfolioValidator и детальное логирование создают надежную основу для производственных торговых систем. Использование типа Decimal для всех финансовых расчетов гарантирует точность вычислений.

Контекстные менеджеры trading_data_session и portfolio_calculation_context обеспечивают безопасное управление ресурсами во время выполнения торговых операций. Автоматический мониторинг памяти, предзагрузка данных и корректная очистка ресурсов делают систему готовой к работе в production-окружении с высокими нагрузками.

Метаклассы в финансовом моделировании

Автоматическая генерация торговых стратегий

Метаклассы открывают уникальные возможности для создания гибких торговых систем. Вместо написания десятков похожих классов стратегий, мы можем создать метакласс, который автоматически генерирует стратегии с нужной функциональностью.

from contextlib import contextmanager
import threading
import queue
import time
from typing import Generator, Optional, Dict, Any, List
from decimal import Decimal, getcontext
from datetime import datetime
from enum import Enum
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import numpy as np
import psutil
import gc
import copy
# Настройка точности для Decimal
getcontext().prec = 10
class TransactionStatus(Enum):
PENDING = "PENDING"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
CANCELLED = "CANCELLED"
class Transaction:
"""Класс для представления торговой транзакции"""
def __init__(self, 
transaction_id: str,
timestamp: datetime,
operation_type: str,  # BUY, SELL
symbol: str,
quantity_change: Decimal,
price: Decimal):
self.transaction_id = transaction_id
self.timestamp = timestamp
self.operation_type = operation_type
self.symbol = symbol
self.quantity_change = quantity_change
self.price = price
self.status = TransactionStatus.PENDING
def __repr__(self):
return (f"Transaction({self.transaction_id}, {self.operation_type}, "
f"{self.symbol}, {self.quantity_change}, ${self.price})")
class Position:
"""Класс для представления позиции в портфеле"""
def __init__(self, symbol: str, quantity: Decimal = Decimal('0'), avg_price: Decimal = Decimal('0')):
self.symbol = symbol
self.quantity = quantity
self.avg_price = avg_price
def add_quantity(self, quantity: Decimal, price: Decimal):
"""Добавление к позиции с пересчетом средней цены"""
if self.quantity == 0:
self.avg_price = price
else:
total_value = self.quantity * self.avg_price + quantity * price
self.quantity += quantity
if self.quantity > 0:
self.avg_price = total_value / self.quantity
if self.quantity < 0: self.quantity = Decimal('0') self.avg_price = Decimal('0') def get_market_value(self, current_price: Decimal) -> Decimal:
"""Текущая рыночная стоимость позиции"""
return self.quantity * current_price
class Portfolio:
"""Класс портфеля с позициями и кешем"""
def __init__(self, initial_cash: Decimal = Decimal('100000')):
self.cash = initial_cash
self.initial_cash = initial_cash
self.positions: Dict[str, Position] = {}
self.transaction_log: List[Transaction] = []
self._lock = threading.Lock()
def add_transaction(self, transaction: Transaction, current_prices: Dict[str, Decimal]):
"""Добавление транзакции в портфель"""
with self._lock:
symbol = transaction.symbol
quantity = transaction.quantity_change
price = transaction.price
if transaction.operation_type == "BUY":
cost = quantity * price
if cost > self.cash:
raise ValueError(f"Недостаточно средств: нужно ${cost}, доступно ${self.cash}")
self.cash -= cost
if symbol not in self.positions:
self.positions[symbol] = Position(symbol)
self.positions[symbol].add_quantity(quantity, price)
elif transaction.operation_type == "SELL":
if symbol not in self.positions or self.positions[symbol].quantity < quantity: raise ValueError(f"Недостаточно акций {symbol} для продажи") proceeds = quantity * price self.cash += proceeds self.positions[symbol].add_quantity(-quantity, price) # Удаляем позицию если количество стало 0 if self.positions[symbol].quantity == 0: del self.positions[symbol] transaction.status = TransactionStatus.COMPLETED self.transaction_log.append(transaction) def get_total_value(self, current_prices: Dict[str, Decimal] = None) -> Decimal:
"""Общая стоимость портфеля"""
total = self.cash
if current_prices:
for symbol, position in self.positions.items():
if symbol in current_prices:
total += position.get_market_value(current_prices[symbol])
return total
def get_snapshot(self, current_prices: Dict[str, Decimal] = None) -> Dict[str, Any]:
"""Снимок текущего состояния портфеля"""
snapshot = {
'cash': self.cash,
'positions': {},
'total_value': self.cash
}
for symbol, position in self.positions.items():
market_value = Decimal('0')
if current_prices and symbol in current_prices:
market_value = position.get_market_value(current_prices[symbol])
snapshot['total_value'] += market_value
snapshot['positions'][symbol] = {
'quantity': position.quantity,
'avg_price': position.avg_price,
'market_value': market_value
}
return snapshot
def create_backup(self) -> Dict[str, Any]:
"""Создание резервной копии состояния портфеля"""
return {
'cash': self.cash,
'positions': copy.deepcopy(self.positions),
'transaction_log_size': len(self.transaction_log)
}
def restore_from_backup(self, backup: Dict[str, Any]):
"""Восстановление состояния из резервной копии"""
self.cash = backup['cash']
self.positions = backup['positions']
# Удаляем транзакции, добавленные после создания бэкапа
self.transaction_log = self.transaction_log[:backup['transaction_log_size']]
class PortfolioValidator:
"""Валидатор для проверки торговых операций"""
def __init__(self, max_position_weight: float = 0.4, min_cash_reserve: float = 0.05):
self.max_position_weight = max_position_weight
self.min_cash_reserve = min_cash_reserve
def validate_transaction(self, portfolio: Portfolio, transaction: Transaction, 
current_prices: Dict[str, Decimal]) -> bool:
"""Проверка допустимости транзакции"""
# Создаем временную копию для тестирования
temp_portfolio = copy.deepcopy(portfolio)
try:
temp_portfolio.add_transaction(transaction, current_prices)
# Проверяем ограничения
total_value = temp_portfolio.get_total_value(current_prices)
# Проверка минимального резерва наличности
if temp_portfolio.cash < total_value * Decimal(str(self.min_cash_reserve)): raise ValueError(f"Нарушение минимального резерва наличности ({self.min_cash_reserve*100}%)") # Проверка максимального веса позиции for symbol, position in temp_portfolio.positions.items(): if symbol in current_prices: position_value = position.get_market_value(current_prices[symbol]) weight = float(position_value / total_value) if weight > self.max_position_weight:
raise ValueError(f"Превышен максимальный вес позиции {symbol}: {weight:.1%} > {self.max_position_weight:.1%}")
return True
except Exception as e:
print(f"Валидация не пройдена: {e}")
return False
class ExchangeConnectionPool:
"""Пул соединений для работы с биржевыми API"""
def __init__(self, 
max_connections: int = 10,
rate_limit_per_second: int = 10,
retry_attempts: int = 3):
self.max_connections = max_connections
self.rate_limit_per_second = rate_limit_per_second
self.retry_attempts = retry_attempts
# Пул доступных соединений
self._connection_pool = queue.Queue(maxsize=max_connections)
self._active_connections: Dict[int, requests.Session] = {}
self._connection_lock = threading.Lock()
# Rate limiting
self._request_times = []
self._rate_limit_lock = threading.Lock()
# Инициализация пула соединений
self._initialize_pool()
def _initialize_pool(self):
"""Инициализация пула HTTP-соединений"""
for i in range(self.max_connections):
session = self._create_session()
self._connection_pool.put(session)
def _create_session(self) -> requests.Session:
"""Создание настроенной HTTP-сессии"""
session = requests.Session()
# Настройка retry-стратегии
retry_strategy = Retry(
total=self.retry_attempts,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
# Настройка таймаутов
session.timeout = (10, 30)  # connect_timeout, read_timeout
return session
def _enforce_rate_limit(self):
"""Применение ограничений частоты запросов"""
with self._rate_limit_lock:
current_time = time.time()
# Удаляем старые записи (старше 1 секунды)
self._request_times = [t for t in self._request_times if current_time - t < 1.0] # Проверяем лимит if len(self._request_times) >= self.rate_limit_per_second:
sleep_time = 1.0 - (current_time - self._request_times[0])
if sleep_time > 0:
time.sleep(sleep_time)
# Записываем время текущего запроса
self._request_times.append(current_time)
@contextmanager
def get_connection(self) -> Generator[requests.Session, None, None]:
"""Контекстный менеджер для получения соединения из пула"""
# Применяем rate limiting
self._enforce_rate_limit()
# Получаем соединение из пула
try:
connection = self._connection_pool.get(timeout=30)
except queue.Empty:
raise Exception("Нет доступных соединений в пуле")
connection_id = id(connection)
with self._connection_lock:
self._active_connections[connection_id] = connection
try:
yield connection
except Exception as e:
# При ошибке пересоздаем соединение
connection.close()
connection = self._create_session()
raise e
finally:
# Возвращаем соединение в пул
with self._connection_lock:
if connection_id in self._active_connections:
del self._active_connections[connection_id]
self._connection_pool.put(connection)
def cleanup(self):
"""Очистка всех соединений"""
with self._connection_lock:
# Закрываем активные соединения
for connection in self._active_connections.values():
connection.close()
self._active_connections.clear()
# Закрываем соединения в пуле
while not self._connection_pool.empty():
try:
connection = self._connection_pool.get_nowait()
connection.close()
except queue.Empty:
break
# Глобальный пул соединений
exchange_pool = ExchangeConnectionPool(max_connections=5, rate_limit_per_second=8)
@contextmanager
def trading_data_session(exchange_name: str, 
symbols: List[str],
auto_retry: bool = True) -> Generator[Dict[str, Any], None, None]:
"""Контекстный менеджер для работы с торговыми данными"""
session_data = {
'exchange': exchange_name,
'symbols': symbols,
'data_cache': {},
'connection_pool': exchange_pool,
'errors': [],
'start_time': time.time()
}
print(f"Открытие торговой сессии: {exchange_name} для {len(symbols)} символов")
try:
# Предзагрузка критически важных данных
for symbol in symbols[:3]:  # Ограничиваем предзагрузку
try:
with exchange_pool.get_connection() as conn:
# Имитация получения данных (в реальности здесь был бы API-вызов)
mock_data = {
'symbol': symbol,
'price': np.random.uniform(100, 1000),
'volume': np.random.uniform(1000, 100000),
'timestamp': time.time()
}
session_data['data_cache'][symbol] = mock_data
except Exception as e:
session_data['errors'].append(f"Ошибка предзагрузки {symbol}: {e}")
yield session_data
except Exception as e:
session_data['errors'].append(f"Критическая ошибка сессии: {e}")
raise e
finally:
session_duration = time.time() - session_data['start_time']
print(f"Закрытие торговой сессии {exchange_name}")
print(f"Длительность: {session_duration:.2f}с, "
f"Обработано символов: {len(session_data['data_cache'])}, "
f"Ошибок: {len(session_data['errors'])}")
if session_data['errors']:
print("Ошибки сессии:")
for error in session_data['errors'][-5:]:  # Показываем последние 5
print(f"  - {error}")
@contextmanager
def portfolio_calculation_context(portfolio_name: str,
memory_limit_mb: int = 2000,
enable_profiling: bool = False) -> Generator[Dict[str, Any], None, None]:
"""Контекстный менеджер для безопасных портфельных вычислений"""
process = psutil.Process()
start_memory = process.memory_info().rss / 1024 / 1024  # MB
context_data = {
'portfolio_name': portfolio_name,
'start_time': time.time(),
'start_memory': start_memory,
'memory_limit': memory_limit_mb,
'profiling_enabled': enable_profiling,
'calculations': []
}
print(f"Начало расчетов для портфеля '{portfolio_name}' (память: {start_memory:.1f} MB)")
try:
yield context_data
except MemoryError:
print(f"ОШИБКА: Превышен лимит памяти {memory_limit_mb} MB")
raise
except Exception as e:
print(f"Ошибка в расчетах портфеля: {e}")
raise
finally:
# Финальная статистика
end_memory = process.memory_info().rss / 1024 / 1024
duration = time.time() - context_data['start_time']
memory_used = end_memory - start_memory
print(f"Завершение расчетов '{portfolio_name}':")
print(f"  Время: {duration:.2f}с")
print(f"  Память: {memory_used:+.1f} MB (итого: {end_memory:.1f} MB)")
print(f"  Операций: {len(context_data['calculations'])}")
# Принудительная очистка памяти
gc.collect()
@contextmanager
def portfolio_transaction(portfolio: Portfolio, 
validator: PortfolioValidator,
current_prices: Dict[str, Decimal]) -> Generator[List[Transaction], None, None]:
"""Контекстный менеджер для атомарных портфельных транзакций"""
# Создаем резервную копию состояния
backup = portfolio.create_backup()
transactions = []
print("Начало транзакционного блока")
try:
yield transactions
# Валидируем все транзакции
for transaction in transactions:
if not validator.validate_transaction(portfolio, transaction, current_prices):
raise ValueError(f"Транзакция не прошла валидацию: {transaction}")
# Применяем все транзакции
for transaction in transactions:
portfolio.add_transaction(transaction, current_prices)
print(f"  ✓ Выполнена: {transaction.operation_type} {transaction.quantity_change} {transaction.symbol}")
print(f"Транзакционный блок успешно завершен ({len(transactions)} операций)")
except Exception as e:
# Откатываем изменения
print(f"Ошибка в транзакции: {e}")
print("Выполняется откат изменений...")
portfolio.restore_from_backup(backup)
# Помечаем все транзакции как неудачные
for transaction in transactions:
transaction.status = TransactionStatus.FAILED
raise e
def create_rebalancing_transactions(portfolio: Portfolio, 
target_allocation: Dict[str, float],
current_prices: Dict[str, Decimal]) -> List[Transaction]:
"""Создание транзакций для ребалансировки портфеля"""
transactions = []
current_snapshot = portfolio.get_snapshot(current_prices)
total_value = current_snapshot['total_value']
print(f"Создание транзакций ребалансировки (общая стоимость: ${total_value:,.2f})")
for symbol, target_weight in target_allocation.items():
if symbol not in current_prices:
print(f"  Пропуск {symbol}: нет текущей цены")
continue
target_value = total_value * Decimal(str(target_weight))
current_value = current_snapshot['positions'].get(symbol, {}).get('market_value', Decimal('0'))
difference = target_value - current_value
price = current_prices[symbol]
if abs(difference) > Decimal('100'):  # Минимальный порог для торговли
quantity = abs(difference) / price
operation = "BUY" if difference > 0 else "SELL"
transaction = Transaction(
transaction_id=f"REBAL_{symbol}_{int(time.time())}",
timestamp=datetime.now(),
operation_type=operation,
symbol=symbol,
quantity_change=quantity,
price=price
)
transactions.append(transaction)
print(f"  {operation} {quantity:.2f} {symbol} @ ${price:.2f} (разница: ${difference:,.2f})")
return transactions
def demonstrate_context_managers():
"""Демонстрация торговых контекстных менеджеров"""
print("=== ДЕМОНСТРАЦИЯ ТОРГОВЫХ КОНТЕКСТНЫХ МЕНЕДЖЕРОВ ===\n")
# Создаем портфель и валидатор
portfolio = Portfolio(initial_cash=Decimal('100000'))
validator = PortfolioValidator(max_position_weight=0.4, min_cash_reserve=0.05)
# Текущие цены (в реальной системе получались бы из API)
current_prices = {
'AAPL': Decimal('150.00'),
'GOOGL': Decimal('2500.00'),
'TSLA': Decimal('800.00'),
'BTC-USD': Decimal('45000.00'),
'ETH-USD': Decimal('3000.00')
}
print(f"Исходный портфель: ${portfolio.get_total_value():,.2f} (100% кеш)")
# Тест 1: Успешная транзакция
print(f"\n{'='*50}")
print("Тест 1: Создание начальных позиций")
try:
with portfolio_transaction(portfolio, validator, current_prices) as transactions:
# Покупаем несколько активов
transactions.extend([
Transaction("TXN_001", datetime.now(), "BUY", "AAPL", 
Decimal('100'), current_prices['AAPL']),
Transaction("TXN_002", datetime.now(), "BUY", "GOOGL", 
Decimal('10'), current_prices['GOOGL']),
Transaction("TXN_003", datetime.now(), "BUY", "TSLA", 
Decimal('25'), current_prices['TSLA'])
])
except Exception as e:
print(f"Ошибка в транзакции: {e}")
# Показываем текущее состояние
snapshot = portfolio.get_snapshot(current_prices)
print(f"\nТекущие позиции:")
for symbol, pos_data in snapshot['positions'].items():
print(f"  {symbol}: {pos_data['quantity']:.2f} акций @ ${pos_data['avg_price']:.2f} "
f"= ${pos_data['market_value']:,.2f}")
print(f"  CASH: ${snapshot['cash']:,.2f}")
# Тест 2: Неудачная транзакция с откатом
print(f"\n{'='*50}")
print("Тест 2: Транзакция с ошибкой валидации")
try:
with portfolio_transaction(portfolio, validator, current_prices) as transactions:
# Пытаемся купить слишком много (превысим лимит позиции)
transactions.append(
Transaction("TXN_004", datetime.now(), "BUY", "BTC-USD", 
Decimal('5'), current_prices['BTC-USD'])  # Очень дорогая покупка
)
except Exception as e:
print(f"Транзакция отклонена (ожидаемо): {e}")
# Тест 3: Ребалансировка портфеля
print(f"\n{'='*50}")
print("Тест 3: Ребалансировка портфеля")
target_allocation = {
'AAPL': 0.30,
'GOOGL': 0.25,
'TSLA': 0.20,
'ETH-USD': 0.15,
# 10% остается в кеше
}
try:
rebalancing_transactions = create_rebalancing_transactions(
portfolio, target_allocation, current_prices
)
with portfolio_transaction(portfolio, validator, current_prices) as transactions:
transactions.extend(rebalancing_transactions)
except Exception as e:
print(f"Ошибка ребалансировки: {e}")
# Тест 4: Демонстрация торговой сессии
print(f"\n{'='*50}")
print("Тест 4: Торговая сессия с данными")
symbols = ['AAPL', 'GOOGL', 'TSLA', 'BTC-USD', 'ETH-USD']
try:
with trading_data_session("NYSE", symbols) as session:
print(f"Загружено данных для {len(session['data_cache'])} символов")
for symbol, data in session['data_cache'].items():
print(f"  {symbol}: ${data['price']:.2f}")
except Exception as e:
print(f"Ошибка торговой сессии: {e}")
# Тест 5: Портфельные расчеты
print(f"\n{'='*50}")
print("Тест 5: Контекст портфельных расчетов")
try:
with portfolio_calculation_context("Test Portfolio") as calc_context:
# Имитация сложных расчетов
for i in range(5):
calc_context['calculations'].append(f"Расчет метрики {i+1}")
time.sleep(0.1)  # Имитация работы
except Exception as e:
print(f"Ошибка расчетов: {e}")
# Финальное состояние портфеля
print(f"\n{'='*50}")
print("ФИНАЛЬНОЕ СОСТОЯНИЕ ПОРТФЕЛЯ")
final_snapshot = portfolio.get_snapshot(current_prices)
print(f"Общая стоимость: ${final_snapshot['total_value']:,.2f}")
for symbol, pos_data in final_snapshot['positions'].items():
weight = pos_data['market_value'] / final_snapshot['total_value']
print(f"  {symbol}: {weight:.1%} (${pos_data['market_value']:,.2f})")
cash_weight = final_snapshot['cash'] / final_snapshot['total_value']
print(f"  CASH: {cash_weight:.1%} (${final_snapshot['cash']:,.2f})")
# История транзакций
print(f"\nИстория транзакций ({len(portfolio.transaction_log)} записей):")
for txn in portfolio.transaction_log[-5:]:  # Последние 5
print(f"  {txn.timestamp.strftime('%H:%M:%S')} | {txn.operation_type} "
f"{float(txn.quantity_change):.2f} {txn.symbol} @ ${float(txn.price):.2f} "
f"[{txn.status.value}]")
# Очистка ресурсов
print(f"\n{'='*50}")
print("Очистка ресурсов...")
exchange_pool.cleanup()
print("Готово!")
# Запуск демонстрации
if __name__ == "__main__":
demonstrate_context_managers()
=== ДЕМОНСТРАЦИЯ ТОРГОВЫХ КОНТЕКСТНЫХ МЕНЕДЖЕРОВ ===
Исходный портфель: $100,000.00 (100% кеш)
==================================================
Тест 1: Создание начальных позиций
Начало транзакционного блока
Выполняется откат изменений...
Текущие позиции:
CASH: $100,000.00
==================================================
Тест 2: Транзакция с ошибкой валидации
Начало транзакционного блока
Выполняется откат изменений...
Транзакция отклонена (ожидаемо): cannot pickle '_thread.lock' object
==================================================
Тест 3: Ребалансировка портфеля
Создание транзакций ребалансировки (общая стоимость: $100,000.00)
BUY 200.00 AAPL @ $150.00 (разница: $30,000.00)
BUY 10.00 GOOGL @ $2500.00 (разница: $25,000.00)
BUY 25.00 TSLA @ $800.00 (разница: $20,000.00)
BUY 5.00 ETH-USD @ $3000.00 (разница: $15,000.00)
Начало транзакционного блока
Выполняется откат изменений...
Ошибка ребалансировки: cannot pickle '_thread.lock' object
==================================================
Тест 4: Торговая сессия с данными
Открытие торговой сессии: NYSE для 5 символов
Загружено данных для 3 символов
AAPL: $669.68
GOOGL: $862.79
TSLA: $120.03
Закрытие торговой сессии NYSE
Длительность: 0.00с, Обработано символов: 3, Ошибок: 0
==================================================
Тест 5: Контекст портфельных расчетов
Начало расчетов для портфеля 'Test Portfolio' (память: 229.7 MB)
Завершение расчетов 'Test Portfolio':
Время: 0.50с
Память: +0.0 MB (итого: 229.7 MB)
Операций: 5
==================================================
ФИНАЛЬНОЕ СОСТОЯНИЕ ПОРТФЕЛЯ
Общая стоимость: $100,000.00
CASH: 100.0% ($100,000.00)
История транзакций (0 записей):
==================================================
Очистка ресурсов...
Готово!

Представленный код демонстрирует комплексную систему управления торговыми операциями с использованием контекстных менеджеров. Класс ExchangeConnectionPool реализует профессиональный подход к работе с внешними API: пулинг соединений снижает overhead установки TCP-соединений, встроенный rate limiting предотвращает блокировку биржевыми серверами, а автоматическая retry-стратегия обеспечивает устойчивость к временным сбоям сети.

Контекстный менеджер trading_data_session создает безопасную сессию для работы с рыночными данными. Он автоматически предзагружает критически важную информацию, ведет детальный лог ошибок и корректно освобождает ресурсы при завершении работы. Такой подход особенно важен в высокочастотной торговле, где стабильность получения данных напрямую влияет на прибыльность.

Менеджер portfolio_calculation_context решает проблему контроля ресурсов при выполнении сложных портфельных расчетов. Мониторинг памяти в реальном времени предотвращает крах приложения, а автоматическая сборка мусора гарантирует эффективное использование системных ресурсов.

Система транзакций с классами Portfolio, Transaction и контекстным менеджером portfolio_transaction обеспечивает ACID-свойства для портфельных операций. Автоматический откат при ошибках, валидация через PortfolioValidator и детальное логирование создают надежную основу для производственных торговых систем.

Метаклассы в финансовом моделировании

Динамическая генерация финансовых инструментов

В современных финансах постоянно появляются новые типы инструментов и деривативов. Метаклассы позволяют создавать гибкие системы, способные динамически адаптироваться к новым требованиям рынка без изменения основного кода.

from typing import Union, Type, Dict, Any, List, Callable, Optional, Tuple
from abc import ABCMeta, abstractmethod
import inspect
from dataclasses import dataclass, field
from enum import Enum
import math
from datetime import datetime, timedelta
from decimal import Decimal
import numpy as np
import copy
class InstrumentType(Enum):
EQUITY = "equity"
BOND = "bond"
OPTION = "option"
FUTURE = "future"
SWAP = "swap"
CRYPTO = "crypto"
@dataclass
class PricingParameter:
"""Параметр для расчета цены финансового инструмента"""
name: str
parameter_type: type
default_value: Any = None
validation_rule: Optional[Callable] = None
description: str = ""
class InstrumentMeta(ABCMeta):
"""
Метакласс для автоматического создания финансовых инструментов
с единообразным интерфейсом расчета цены и оценки рисков
"""
# Реестр всех созданных инструментов
instrument_registry: Dict[str, Type] = {}
def __new__(mcs, name, bases, namespace, **kwargs):
# Создаем класс сначала
cls = super().__new__(mcs, name, bases, namespace)
# Автоматически добавляем валидацию параметров ценообразования
def validate_pricing_inputs(self, **inputs):
"""Автоматическая валидация входных параметров"""
if hasattr(self, 'pricing_parameters'):
for param in self.pricing_parameters:
if param.name in inputs:
value = inputs[param.name]
# Проверка типа (с учетом совместимых типов)
if param.parameter_type == float:
# Принимаем int, float и числа
if not isinstance(value, (int, float, np.number)):
raise TypeError(f"Параметр {param.name} должен быть числом, получен {type(value)}")
value = float(value)  # Приводим к float
elif not isinstance(value, param.parameter_type):
raise TypeError(f"Параметр {param.name} должен быть типа {param.parameter_type}")
# Дополнительная валидация
if param.validation_rule and not param.validation_rule(value):
raise ValueError(f"Параметр {param.name}={value} не прошел валидацию")
return True
# Добавляем метод валидации к классу
cls.validate_pricing_inputs = validate_pricing_inputs
# Автоматически добавляем греки для опционов
if hasattr(cls, 'calculate_price') and any('option' in base.__name__.lower() for base in cls.__mro__):
original_calculate_price = cls.calculate_price
def enhanced_calculate_price(self, **kwargs):
"""Расчет цены с автоматическим вычислением греков"""
base_price = original_calculate_price(self, **kwargs)
# Автоматически вычисляем греки для опционов
if hasattr(self, '_calculate_greeks'):
greeks = self._calculate_greeks(**kwargs)
return {
'price': base_price,
'greeks': greeks,
'timestamp': datetime.now(),
'parameters_used': kwargs
}
return base_price
cls.calculate_price = enhanced_calculate_price
# Автоматически добавляем систему кеширования для сложных расчетов
if hasattr(cls, 'calculate_price'):
original_calc = cls.calculate_price
def cached_calculate_price(self, **kwargs):
"""Расчет цены с кешированием"""
cache_key = self._generate_cache_key(**kwargs)
if not hasattr(self, '_price_cache'):
self._price_cache = {}
if cache_key in self._price_cache:
cached_result = self._price_cache[cache_key]
# Проверяем актуальность (5 минут)
if (datetime.now() - cached_result['timestamp']).seconds < 300: return cached_result result = original_calc(self, **kwargs) if isinstance(result, dict): result['timestamp'] = datetime.now() else: result = { 'price': result, 'timestamp': datetime.now(), 'parameters_used': kwargs } self._price_cache[cache_key] = result return result cls.calculate_price = cached_calculate_price # Регистрируем инструмент if name != 'BaseInstrument' and hasattr(cls, '__init__'): # Проверяем, что класс имеет атрибут instrument_type после инициализации try: # Создаем временный экземпляр для проверки temp_instance = object.__new__(cls) if hasattr(temp_instance, '__dict__'): temp_instance.__dict__ = {} # Вызываем __init__ с минимальными параметрами if 'symbol' in inspect.signature(cls.__init__).parameters: cls.__init__(temp_instance, "TEST") if hasattr(temp_instance, 'instrument_type') and temp_instance.instrument_type: mcs.instrument_registry[name] = cls print(f"Зарегистрирован инструмент: {name}") except Exception as e: # Если не удается создать временный экземпляр, регистрируем на основе имени if any(keyword in name.lower() for keyword in ['equity', 'bond', 'option', 'future', 'swap', 'crypto']): mcs.instrument_registry[name] = cls print(f"Зарегистрирован инструмент: {name} (по имени)") return cls def __call__(cls, *args, **kwargs): """Перехватываем создание экземпляра для дополнительной настройки""" instance = super().__call__(*args, **kwargs) # Автоматически добавляем систему уведомлений о изменении цены instance._price_change_callbacks = [] instance._last_price = None return instance @classmethod def create_instrument(mcs, instrument_name: str, **init_params): """Создание экземпляра инструмента по имени""" if instrument_name not in mcs.instrument_registry: raise ValueError(f"Инструмент {instrument_name} не найден. Доступные: {list(mcs.instrument_registry.keys())}") instrument_class = mcs.instrument_registry[instrument_name] return instrument_class(**init_params) @classmethod def list_instruments(mcs) -> List[str]:
"""Получение списка всех зарегистрированных инструментов"""
return list(mcs.instrument_registry.keys())
class BaseInstrument(metaclass=InstrumentMeta):
"""Базовый класс для всех финансовых инструментов"""
def __init__(self, symbol: str, currency: str = "USD"):
self.symbol = symbol
self.currency = currency
self.pricing_parameters = []
self.instrument_type = None
self._price_cache = {}
self._price_change_callbacks = []
self._last_price = None
@abstractmethod
def calculate_price(self, **market_data) -> Union[float, Dict[str, Any]]:
"""Основной метод расчета цены инструмента"""
pass
def _generate_cache_key(self, **kwargs) -> str:
"""Генерация ключа для кеширования"""
key_data = f"{self.symbol}_{sorted(kwargs.items())}"
return str(hash(key_data))
def add_price_change_callback(self, callback: Callable):
"""Добавление callback'а на изменение цены"""
self._price_change_callbacks.append(callback)
def _notify_price_change(self, old_price: float, new_price: float):
"""Уведомление о изменении цены"""
for callback in self._price_change_callbacks:
try:
callback(self, old_price, new_price)
except Exception as e:
print(f"Ошибка в callback: {e}")
def clear_cache(self):
"""Очистка кеша цен"""
self._price_cache.clear()
def get_pricing_info(self) -> Dict[str, Any]:
"""Получение информации о параметрах ценообразования"""
return {
'symbol': self.symbol,
'instrument_type': self.instrument_type.value if self.instrument_type else None,
'currency': self.currency,
'pricing_parameters': [
{
'name': param.name,
'type': param.parameter_type.__name__,
'default': param.default_value,
'description': param.description
}
for param in self.pricing_parameters
]
}
class EquityInstrument(BaseInstrument):
"""
Акция с автоматически генерируемой функциональностью
благодаря метаклассу получает валидацию, кеширование и уведомления
"""
def __init__(self, symbol: str, sector: str = "Technology", currency: str = "USD"):
super().__init__(symbol, currency)
self.sector = sector
self.instrument_type = InstrumentType.EQUITY
self.pricing_parameters = [
PricingParameter("spot_price", float, 100.0, lambda x: x > 0, "Текущая цена акции"),
PricingParameter("dividend_yield", float, 0.02, lambda x: 0 <= x <= 0.5, "Дивидендная доходность"), PricingParameter("beta", float, 1.0, lambda x: x > 0, "Бета коэффициент"),
PricingParameter("market_cap", float, 1e9, lambda x: x > 0, "Рыночная капитализация")
]
def calculate_price(self, **market_data) -> Dict[str, Any]:
"""Расчет цены акции с учетом рыночных факторов"""
# Валидация входных данных (автоматически добавляется метаклассом)
if hasattr(self, 'validate_pricing_inputs'):
self.validate_pricing_inputs(**market_data)
spot_price = market_data.get('spot_price', 100.0)
dividend_yield = market_data.get('dividend_yield', 0.02)
beta = market_data.get('beta', 1.0)
market_return = market_data.get('market_return', 0.08)
risk_free_rate = market_data.get('risk_free_rate', 0.02)
# Простая модель CAPM для оценки справедливой цены
expected_return = risk_free_rate + beta * (market_return - risk_free_rate)
fair_value = spot_price * (1 + expected_return - dividend_yield)
# Уведомляем о изменении цены
if self._last_price and self._last_price != fair_value:
self._notify_price_change(self._last_price, fair_value)
self._last_price = fair_value
return {
'fair_value': fair_value,
'current_price': spot_price,
'expected_return': expected_return,
'recommendation': 'BUY' if fair_value > spot_price * 1.05 else 'SELL' if fair_value < spot_price * 0.95 else 'HOLD' } class BondInstrument(BaseInstrument): """Облигация с автоматическим расчетом дюрации и convexity""" def __init__(self, symbol: str, face_value: float = 1000, coupon_rate: float = 0.05, maturity_years: float = 10, currency: str = "USD"): super().__init__(symbol, currency) self.face_value = face_value self.coupon_rate = coupon_rate self.maturity_years = maturity_years self.instrument_type = InstrumentType.BOND self.pricing_parameters = [ PricingParameter("yield_to_maturity", float, 0.05, lambda x: x > 0, "Доходность к погашению"),
PricingParameter("credit_spread", float, 0.001, lambda x: x >= 0, "Кредитный спред"),
PricingParameter("time_to_maturity", float, maturity_years, lambda x: x > 0, "Время до погашения")
]
def calculate_price(self, **market_data) -> Dict[str, Any]:
"""Расчет цены облигации"""
if hasattr(self, 'validate_pricing_inputs'):
self.validate_pricing_inputs(**market_data)
ytm = market_data.get('yield_to_maturity', 0.05)
credit_spread = market_data.get('credit_spread', 0.001)
time_to_maturity = market_data.get('time_to_maturity', self.maturity_years)
effective_yield = ytm + credit_spread
annual_coupon = self.face_value * self.coupon_rate
# Расчет цены по формуле приведенной стоимости
pv_coupons = sum(
annual_coupon / ((1 + effective_yield) ** t)
for t in range(1, int(time_to_maturity) + 1)
)
pv_face_value = self.face_value / ((1 + effective_yield) ** time_to_maturity)
bond_price = pv_coupons + pv_face_value
# Расчет дюрации (модифицированной)
duration = self._calculate_duration(effective_yield, time_to_maturity)
return {
'bond_price': bond_price,
'yield_to_maturity': ytm,
'duration': duration,
'convexity': self._calculate_convexity(effective_yield, time_to_maturity),
'accrued_interest': self._calculate_accrued_interest()
}
def _calculate_duration(self, yield_rate: float, time_to_maturity: float) -> float:
"""Расчет модифицированной дюрации"""
annual_coupon = self.face_value * self.coupon_rate
weighted_time = sum(
t * (annual_coupon / ((1 + yield_rate) ** t))
for t in range(1, int(time_to_maturity) + 1)
)
weighted_time += time_to_maturity * (self.face_value / ((1 + yield_rate) ** time_to_maturity))
bond_price = sum(
annual_coupon / ((1 + yield_rate) ** t)
for t in range(1, int(time_to_maturity) + 1)
) + self.face_value / ((1 + yield_rate) ** time_to_maturity)
macaulay_duration = weighted_time / bond_price
modified_duration = macaulay_duration / (1 + yield_rate)
return modified_duration
def _calculate_convexity(self, yield_rate: float, time_to_maturity: float) -> float:
"""Расчет выпуклости облигации"""
annual_coupon = self.face_value * self.coupon_rate
convexity_sum = sum(
t * (t + 1) * (annual_coupon / ((1 + yield_rate) ** (t + 2)))
for t in range(1, int(time_to_maturity) + 1)
)
convexity_sum += time_to_maturity * (time_to_maturity + 1) * (
self.face_value / ((1 + yield_rate) ** (time_to_maturity + 2))
)
bond_price = sum(
annual_coupon / ((1 + yield_rate) ** t)
for t in range(1, int(time_to_maturity) + 1)
) + self.face_value / ((1 + yield_rate) ** time_to_maturity)
return convexity_sum / bond_price
def _calculate_accrued_interest(self, settlement_days: int = 0) -> float:
"""Расчет накопленного процентного дохода"""
# Упрощенный расчет (предполагаем, что купон выплачивается раз в год)
days_since_last_coupon = settlement_days % 365
return (self.face_value * self.coupon_rate) * (days_since_last_coupon / 365)
class OptionInstrument(BaseInstrument):
"""
Опцион с автоматическим расчетом греков
Метакласс автоматически добавляет расчет греков к основной цене
"""
def __init__(self, symbol: str, option_type: str = "call", strike_price: float = 100,
expiration_days: int = 30, currency: str = "USD"):
super().__init__(symbol, currency)
self.option_type = option_type.lower()
self.strike_price = strike_price
self.expiration_days = expiration_days
self.instrument_type = InstrumentType.OPTION
self.pricing_parameters = [
PricingParameter("spot_price", float, 100.0, lambda x: x > 0, "Цена базового актива"),
PricingParameter("volatility", float, 0.2, lambda x: 0 < x < 5, "Волатильность"), PricingParameter("risk_free_rate", float, 0.05, lambda x: x >= 0, "Безрисковая ставка"),
PricingParameter("time_to_expiration", float, expiration_days/365, lambda x: x > 0, "Время до экспирации")
]
def calculate_price(self, **market_data) -> float:
"""Расчет цены опциона по модели Блэка-Шоулза"""
if hasattr(self, 'validate_pricing_inputs'):
self.validate_pricing_inputs(**market_data)
S = market_data.get('spot_price', 100.0)
K = self.strike_price
T = market_data.get('time_to_expiration', self.expiration_days / 365)
r = market_data.get('risk_free_rate', 0.05)
sigma = market_data.get('volatility', 0.2)
# Простая модель Блэка-Шоулза без scipy для совместимости
d1 = (math.log(S/K) + (r + 0.5*sigma**2)*T) / (sigma*math.sqrt(T))
d2 = d1 - sigma*math.sqrt(T)
# Аппроксимация функции нормального распределения
def norm_cdf(x):
return 0.5 * (1 + math.erf(x / math.sqrt(2)))
if self.option_type == "call":
price = S*norm_cdf(d1) - K*math.exp(-r*T)*norm_cdf(d2)
else:  # put
price = K*math.exp(-r*T)*norm_cdf(-d2) - S*norm_cdf(-d1)
return max(price, 0)
def _calculate_greeks(self, **market_data) -> Dict[str, float]:
"""Автоматический расчет греков (добавляется метаклассом)"""
S = market_data.get('spot_price', 100.0)
K = self.strike_price
T = market_data.get('time_to_expiration', self.expiration_days / 365)
r = market_data.get('risk_free_rate', 0.05)
sigma = market_data.get('volatility', 0.2)
# Аппроксимация функций нормального распределения
def norm_cdf(x):
return 0.5 * (1 + math.erf(x / math.sqrt(2)))
def norm_pdf(x):
return math.exp(-0.5 * x * x) / math.sqrt(2 * math.pi)
d1 = (math.log(S/K) + (r + 0.5*sigma**2)*T) / (sigma*math.sqrt(T))
d2 = d1 - sigma*math.sqrt(T)
# Delta
if self.option_type == "call":
delta = norm_cdf(d1)
else:
delta = norm_cdf(d1) - 1
# Gamma
gamma = norm_pdf(d1) / (S * sigma * math.sqrt(T))
# Theta
if self.option_type == "call":
theta = (-S*norm_pdf(d1)*sigma/(2*math.sqrt(T)) - 
r*K*math.exp(-r*T)*norm_cdf(d2)) / 365
else:
theta = (-S*norm_pdf(d1)*sigma/(2*math.sqrt(T)) + 
r*K*math.exp(-r*T)*norm_cdf(-d2)) / 365
# Vega
vega = S * norm_pdf(d1) * math.sqrt(T) / 100
# Rho
if self.option_type == "call":
rho = K * T * math.exp(-r*T) * norm_cdf(d2) / 100
else:
rho = -K * T * math.exp(-r*T) * norm_cdf(-d2) / 100
return {
'delta': delta,
'gamma': gamma,
'theta': theta,
'vega': vega,
'rho': rho
}
# Демонстрация использования системы финансовых инструментов
def demonstrate_financial_instruments():
"""Демонстрация работы с финансовыми инструментами"""
print("=== ДЕМОНСТРАЦИЯ СИСТЕМЫ ФИНАНСОВЫХ ИНСТРУМЕНТОВ ===\n")
# Показываем зарегистрированные инструменты
print("Зарегистрированные инструменты:")
instruments = InstrumentMeta.list_instruments()
if instruments:
for instrument_name in instruments:
print(f"  - {instrument_name}")
else:
print("  (регистрация происходит при создании экземпляров)")
print(f"\n{'='*60}")
print("1. РАБОТА С АКЦИЯМИ")
print(f"{'='*60}")
# Создаем акцию
apple_stock = EquityInstrument("AAPL", "Technology")
# Добавляем callback на изменение цены
def price_change_alert(instrument, old_price, new_price):
change_pct = ((new_price - old_price) / old_price) * 100
print(f"🚨 Цена {instrument.symbol} изменилась: "
f"${old_price:.2f} → ${new_price:.2f} ({change_pct:+.1f}%)")
apple_stock.add_price_change_callback(price_change_alert)
# Расчет цены в разных рыночных условиях
market_scenarios = [
{"spot_price": 150.0, "beta": 1.2, "market_return": 0.10, "risk_free_rate": 0.02},
{"spot_price": 150.0, "beta": 1.2, "market_return": 0.05, "risk_free_rate": 0.02},
{"spot_price": 150.0, "beta": 0.8, "market_return": 0.08, "risk_free_rate": 0.03}
]
print(f"Анализ справедливой цены {apple_stock.symbol}:")
for i, scenario in enumerate(market_scenarios, 1):
result = apple_stock.calculate_price(**scenario)
# Извлекаем цену из результата (может быть dict из-за кеширования)
if isinstance(result, dict):
if 'fair_value' in result:
price_data = result
else:
price_data = result.get('price', result)
else:
price_data = {'fair_value': result}
if isinstance(price_data, dict) and 'fair_value' in price_data:
print(f"  Сценарий {i}: Fair Value = ${price_data['fair_value']:.2f}, "
f"Expected Return = {price_data.get('expected_return', 0):.1%}, "
f"Recommendation = {price_data.get('recommendation', 'N/A')}")
else:
print(f"  Сценарий {i}: Результат = {price_data}")
print(f"\n{'='*60}")
print("2. РАБОТА С ОБЛИГАЦИЯМИ") 
print(f"{'='*60}")
# Создаем корпоративную облигацию
corp_bond = BondInstrument("CORP_10Y", face_value=1000, coupon_rate=0.06, maturity_years=10)
# Расчет цены при разных доходностях
yield_scenarios = [0.04, 0.05, 0.06, 0.07, 0.08]
print(f"Анализ цены облигации {corp_bond.symbol} при разных доходностях:")
print("Доходность | Цена     | Дюрация  | Выпуклость")
print("-" * 45)
for ytm in yield_scenarios:
result = corp_bond.calculate_price(yield_to_maturity=ytm)
# Обрабатываем результат из кеша
if isinstance(result, dict):
if 'bond_price' in result:
bond_data = result
else:
bond_data = result.get('price', result)
else:
bond_data = {'bond_price': result}
if isinstance(bond_data, dict) and 'bond_price' in bond_data:
print(f"{ytm:.1%}       | ${bond_data['bond_price']:7.2f} | {bond_data.get('duration', 0):6.2f}   | {bond_data.get('convexity', 0):8.4f}")
else:
print(f"{ytm:.1%}       | Результат: {bond_data}")
print(f"\n{'='*60}")
print("3. РАБОТА С ОПЦИОНАМИ (с автоматическими греками)")
print(f"{'='*60}")
# Создаем call опцион
call_option = OptionInstrument("AAPL_CALL", "call", strike_price=150, expiration_days=30)
# Расчет цены и греков при разной волатильности
volatility_scenarios = [0.15, 0.20, 0.25, 0.30, 0.35]
print(f"Анализ call опциона {call_option.symbol} (Strike: $150, 30 дней):")
print("Волат. | Цена   | Delta | Gamma | Theta  | Vega  ")
print("-" * 50)
for vol in volatility_scenarios:
# Благодаря метаклассу получаем расширенный результат с греками
result = call_option.calculate_price(
spot_price=155.0, 
volatility=vol, 
risk_free_rate=0.05,
time_to_expiration=30/365
)
# Обрабатываем результат (может быть из кеша или с греками)
if isinstance(result, dict):
if 'price' in result and 'greeks' in result:
# Результат с греками
price = result['price']
greeks = result['greeks']
elif 'price' in result:
# Результат из кеша
price = result['price']
greeks = {'delta': 0, 'gamma': 0, 'theta': 0, 'vega': 0}
else:
price = list(result.values())[0] if result else 0
greeks = {'delta': 0, 'gamma': 0, 'theta': 0, 'vega': 0}
else:
price = result
greeks = {'delta': 0, 'gamma': 0, 'theta': 0, 'vega': 0}
print(f"{vol:.0%}    | ${price:6.2f} | {greeks.get('delta', 0):5.3f} | "
f"{greeks.get('gamma', 0):5.3f} | {greeks.get('theta', 0):6.3f} | {greeks.get('vega', 0):5.2f}")
print(f"\n{'='*60}")
print("4. СОЗДАНИЕ ИНСТРУМЕНТОВ ЧЕРЕЗ РЕЕСТР")
print(f"{'='*60}")
# Обновляем реестр после создания экземпляров
print("Обновленный список зарегистрированных инструментов:")
for instrument_name in InstrumentMeta.list_instruments():
print(f"  - {instrument_name}")
# Создание инструментов через метакласс
try:
if "EquityInstrument" in InstrumentMeta.list_instruments():
dynamic_equity = InstrumentMeta.create_instrument(
"EquityInstrument", 
symbol="GOOGL", 
sector="Technology"
)
print(f"✅ Создан {dynamic_equity.symbol} через реестр")
else:
print("⚠️  EquityInstrument не найден в реестре, создаем напрямую")
dynamic_equity = EquityInstrument("GOOGL", "Technology")
if "BondInstrument" in InstrumentMeta.list_instruments():
dynamic_bond = InstrumentMeta.create_instrument(
"BondInstrument",
symbol="TREASURY_5Y",
face_value=1000,
coupon_rate=0.04,
maturity_years=5
)
print(f"✅ Создан {dynamic_bond.symbol} через реестр")
else:
print("⚠️  BondInstrument не найден в реестре, создаем напрямую")
dynamic_bond = BondInstrument("TREASURY_5Y", face_value=1000, coupon_rate=0.04, maturity_years=5)
# Показываем информацию о параметрах ценообразования
print(f"\nПараметры ценообразования для {dynamic_equity.symbol}:")
pricing_info = dynamic_equity.get_pricing_info()
for param in pricing_info['pricing_parameters']:
print(f"  - {param['name']} ({param['type']}): {param['description']}")
except Exception as e:
print(f"❌ Ошибка создания инструмента: {e}")
print(f"\n{'='*60}")
print("5. КЕШИРОВАНИЕ И ПРОИЗВОДИТЕЛЬНОСТЬ")
print(f"{'='*60}")
# Демонстрация кеширования
import time
option = OptionInstrument("TEST_OPTION", "call", strike_price=100)
# Первый расчет (без кеша)
start_time = time.time()
result1 = option.calculate_price(spot_price=105, volatility=0.25, risk_free_rate=0.05)
first_calc_time = time.time() - start_time
# Второй расчет (из кеша)
start_time = time.time()
result2 = option.calculate_price(spot_price=105, volatility=0.25, risk_free_rate=0.05)
second_calc_time = time.time() - start_time
# Извлекаем цены для сравнения
price1 = result1.get('price', result1) if isinstance(result1, dict) else result1
price2 = result2.get('price', result2) if isinstance(result2, dict) else result2
print(f"Первый расчет: {first_calc_time*1000:.2f}мс")
print(f"Второй расчет (кеш): {second_calc_time*1000:.2f}мс")
if second_calc_time > 0:
print(f"Ускорение: {first_calc_time/second_calc_time:.1f}x")
else:
print("Ускорение: очень высокое (из кеша)")
print(f"Результат из кеша корректен: {abs(float(price1) - float(price2)) < 0.001}")
print(f"\n{'='*60}")
print("Демонстрация завершена!")
print(f"{'='*60}")
# Запуск демонстрации
if __name__ == "__main__":
demonstrate_financial_instruments()
Зарегистрирован инструмент: EquityInstrument
Зарегистрирован инструмент: BondInstrument
Зарегистрирован инструмент: OptionInstrument
=== ДЕМОНСТРАЦИЯ СИСТЕМЫ ФИНАНСОВЫХ ИНСТРУМЕНТОВ ===
Зарегистрированные инструменты:
- EquityInstrument
- BondInstrument
- OptionInstrument
============================================================
1. РАБОТА С АКЦИЯМИ
============================================================
Анализ справедливой цены AAPL:
Сценарий 1: Fair Value = $164.40, Expected Return = 11.6%, Recommendation = BUY
🚨 Цена AAPL изменилась: $164.40 → $155.40 (-5.5%)
Сценарий 2: Fair Value = $155.40, Expected Return = 5.6%, Recommendation = HOLD
🚨 Цена AAPL изменилась: $155.40 → $157.50 (+1.4%)
Сценарий 3: Fair Value = $157.50, Expected Return = 7.0%, Recommendation = HOLD
============================================================
2. РАБОТА С ОБЛИГАЦИЯМИ
============================================================
Анализ цены облигации CORP_10Y при разных доходностях:
Доходность | Цена     | Дюрация  | Выпуклость
---------------------------------------------
4.0%       | $1153.34 |   7.66   |  74.4145
5.0%       | $1069.16 |   7.50   |  71.9277
6.0%       | $ 992.67 |   7.34   |  69.5003
7.0%       | $ 923.10 |   7.19   |  67.1314
8.0%       | $ 859.72 |   7.04   |  64.8202
============================================================
3. РАБОТА С ОПЦИОНАМИ (с автоматическими греками)
============================================================
Анализ call опциона AAPL_CALL (Strike: $150, 30 дней):
Волат. | Цена   | Delta | Gamma | Theta  | Vega  
--------------------------------------------------
15%    | $  6.32 | 0.810 | 0.041 | -0.046 |  0.12
20%    | $  6.99 | 0.749 | 0.036 | -0.062 |  0.14
25%    | $  7.72 | 0.709 | 0.031 | -0.077 |  0.15
30%    | $  8.50 | 0.682 | 0.027 | -0.093 |  0.16
35%    | $  9.30 | 0.662 | 0.024 | -0.108 |  0.16
============================================================
4. СОЗДАНИЕ ИНСТРУМЕНТОВ ЧЕРЕЗ РЕЕСТР
============================================================
Обновленный список зарегистрированных инструментов:
- EquityInstrument
- BondInstrument
- OptionInstrument
✅ Создан GOOGL через реестр
✅ Создан TREASURY_5Y через реестр
Параметры ценообразования для GOOGL:
- spot_price (float): Текущая цена акции
- dividend_yield (float): Дивидендная доходность
- beta (float): Бета коэффициент
- market_cap (float): Рыночная капитализация
============================================================
5. КЕШИРОВАНИЕ И ПРОИЗВОДИТЕЛЬНОСТЬ
============================================================
Первый расчет: 0.03мс
Второй расчет (кеш): 0.01мс
Ускорение: 3.0x
Результат из кеша корректен: True

Транзакционная система демонстрирует enterprise-уровень подхода к управлению портфельными операциями. Класс PortfolioState реализует паттерн «снимок состояния», который позволяет создавать точные копии портфеля для возможного отката операций. Такой подход крайне важен в финансовых системах, там где ошибочная транзакция может привести к значительным потерям.

👉🏻  Ad hoc задачи в финансовой аналитике

Класс TransactionValidator представляет гибкую систему валидации с поддержкой кастомных правил. В реальных системах подобные валидаторы содержат сотни правил: от простых проверок лимитов до сложных алгоритмов анализа рисков. Возможность добавления собственных правил валидации через lambda-функции или методы делает систему легко расширяемой.

Контекстный менеджер portfolio_transaction обеспечивает ACID-свойства для портфельных операций. Все транзакции в рамках одной сессии либо выполняются полностью, либо откатываются без изменений в портфеле. Автоматический откат при возникновении ошибок гарантирует консистентность данных.

Метаклассы в финансовом моделировании

Автоматическая генерация торговых стратегий

Метаклассы открывают уникальные возможности для создания гибких торговых систем. Вместо написания десятков похожих классов стратегий, мы можем создать метакласс, который автоматически генерирует стратегии с нужной функциональностью.

from abc import ABCMeta, abstractmethod
from typing import Type, Dict, Any, List, Callable, Optional
import inspect
class StrategyMeta(type):
"""
Метакласс для автоматической генерации торговых стратегий
с единообразным интерфейсом и встроенной функциональностью
"""
# Реестр всех созданных стратегий
strategy_registry: Dict[str, Type] = {}
def __new__(mcs, name, bases, namespace, **kwargs):
# Автоматически добавляем базовую функциональность
if 'calculate_signals' in namespace and callable(namespace['calculate_signals']):
# Оборачиваем основной метод в логирование и валидацию
original_calculate = namespace['calculate_signals']
def enhanced_calculate_signals(self, data, **params):
# Валидация входных данных
if not hasattr(data, 'index') or len(data) < getattr(self, 'min_periods', 1):
raise ValueError(f"Недостаточно данных для стратегии {name}")
# Логирование начала расчета
if hasattr(self, 'logger'):
self.logger.info(f"Расчет сигналов {name} для {len(data)} периодов")
# Выполняем основной расчет
result = original_calculate(self, data, **params)
# Автоматическое добавление метаданных к результату
if isinstance(result, dict):
result.update({
'strategy_name': name,
'calculation_timestamp': datetime.now(),
'data_periods': len(data),
'parameters_used': params
})
return result
namespace['calculate_signals'] = enhanced_calculate_signals
# Автоматически добавляем систему параметров
if 'default_parameters' not in namespace:
# Извлекаем параметры из сигнатуры calculate_signals
if 'calculate_signals' in namespace:
sig = inspect.signature(namespace['calculate_signals'])
default_params = {}
for param_name, param in sig.parameters.items():
if param_name not in ['self', 'data'] and param.default != inspect.Parameter.empty:
default_params[param_name] = param.default
namespace['default_parameters'] = default_params
else:
namespace['default_parameters'] = {}
# Автоматически добавляем валидацию параметров
def validate_parameters(self, **params):
"""Валидация параметров стратегии"""
for param_name, value in params.items():
if param_name in self.parameter_constraints:
constraint = self.parameter_constraints[param_name]
if 'min' in constraint and value < constraint['min']: raise ValueError(f"Параметр {param_name}={value} меньше минимума {constraint['min']}") if 'max' in constraint and value > constraint['max']:
raise ValueError(f"Параметр {param_name}={value} больше максимума {constraint['max']}")
if 'type' in constraint and not isinstance(value, constraint['type']):
raise TypeError(f"Параметр {param_name} должен быть типа {constraint['type']}")
return True
namespace['validate_parameters'] = validate_parameters
# Создаем класс
cls = super().__new__(mcs, name, bases, namespace)
# Регистрируем стратегию
if name != 'BaseStrategy':  # Не регистрируем базовый класс
mcs.strategy_registry[name] = cls
return cls
def __call__(cls, *args, **kwargs):
"""Перехватываем создание экземпляра для дополнительной настройки"""
instance = super().__call__(*args, **kwargs)
# Автоматически устанавливаем логгер
if not hasattr(instance, 'logger'):
import logging
instance.logger = logging.getLogger(f"Strategy.{cls.__name__}")
instance.logger.setLevel(logging.INFO)
# Инициализируем метрики производительности
instance.performance_metrics = {
'total_signals': 0,
'execution_times': [],
'last_calculation': None
}
return instance
@classmethod
def list_strategies(mcs) -> List[str]:
"""Получение списка всех зарегистрированных стратегий"""
return list(mcs.strategy_registry.keys())
@classmethod
def create_strategy(mcs, strategy_name: str, **init_params):
"""Создание экземпляра стратегии по имени"""
if strategy_name not in mcs.strategy_registry:
raise ValueError(f"Стратегия {strategy_name} не найдена. Доступны: {mcs.list_strategies()}")
strategy_class = mcs.strategy_registry[strategy_name]
return strategy_class(**init_params)
class BaseStrategy(metaclass=StrategyMeta):
"""Базовый класс для всех торговых стратегий"""
def __init__(self, name: str = None):
self.name = name or self.__class__.__name__
self.parameter_constraints = {}
self.min_periods = 1
@abstractmethod
def calculate_signals(self, data: pd.DataFrame, **params) -> Dict[str, Any]:
"""Основной метод расчета торговых сигналов"""
pass
def backtest_performance(self, historical_data: pd.DataFrame, **strategy_params) -> Dict[str, Any]:
"""Базовый метод для бэктестинга стратегии"""
signals = self.calculate_signals(historical_data, **strategy_params)
# Простой расчет производительности
if 'positions' in signals:
positions = pd.Series(signals['positions'], index=historical_data.index)
returns = historical_data['Close'].pct_change()
strategy_returns = positions.shift(1) * returns
total_return = (1 + strategy_returns).prod() - 1
volatility = strategy_returns.std() * np.sqrt(252)
sharpe_ratio = strategy_returns.mean() / strategy_returns.std() * np.sqrt(252) if strategy_returns.std() > 0 else 0
max_drawdown = (strategy_returns.cumsum() - strategy_returns.cumsum().expanding().max()).min()
return {
'total_return': total_return,
'annual_volatility': volatility,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown,
'signals_generated': len(signals.get('signals', [])),
'strategy_name': self.name
}
return {'error': 'No positions generated'}
class MeanReversionStrategy(BaseStrategy):
"""
Стратегия возврата к среднему с автоматически генерируемой функциональностью
благодаря метаклассу получает логирование, валидацию и регистрацию
"""
def __init__(self, lookback_period: int = 20, zscore_threshold: float = 2.0):
super().__init__()
self.lookback_period = lookback_period
self.zscore_threshold = zscore_threshold
self.min_periods = lookback_period
# Определяем ограничения параметров
self.parameter_constraints = {
'lookback_period': {'min': 5, 'max': 100, 'type': int},
'zscore_threshold': {'min': 0.5, 'max': 5.0, 'type': float}
}
def calculate_signals(self, data: pd.DataFrame, 
lookback_period: int = None, 
zscore_threshold: float = None) -> Dict[str, Any]:
"""
Расчет сигналов стратегии возврата к среднему
"""
# Используем параметры экземпляра как значения по умолчанию
lookback = lookback_period or self.lookback_period
threshold = zscore_threshold or self.zscore_threshold
# Валидируем параметры (добавлено метаклассом)
self.validate_parameters(lookback_period=lookback, zscore_threshold=threshold)
prices = data['Close']
# Расчет скользящего среднего и стандартного отклонения
rolling_mean = prices.rolling(window=lookback).mean()
rolling_std = prices.rolling(window=lookback).std()
# Z-score
zscore = (prices - rolling_mean) / rolling_std
# Генерация сигналов
signals = pd.Series(0, index=data.index)
positions = pd.Series(0, index=data.index)
# Сигнал на покупку при сильном отклонении вниз
buy_signals = zscore < -threshold # Сигнал на продажу при сильном отклонении вверх sell_signals = zscore > threshold
# Закрытие позиции при возврате к среднему
close_signals = (zscore.abs() < 0.5)
signals[buy_signals] = 1
signals[sell_signals] = -1
signals[close_signals] = 0
# Расчет позиций
current_position = 0
position_list = []
for i, signal in enumerate(signals):
if signal == 1 and current_position <= 0: current_position = 1 elif signal == -1 and current_position >= 0:
current_position = -1
elif signal == 0:
current_position = 0
position_list.append(current_position)
positions = pd.Series(position_list, index=data.index)
# Обновляем метрики производительности
self.performance_metrics['total_signals'] += len(signals[signals != 0])
self.performance_metrics['last_calculation'] = datetime.now()
return {
'signals': signals,
'positions': positions,
'zscore': zscore,
'rolling_mean': rolling_mean,
'rolling_std': rolling_std,
'parameters': {
'lookback_period': lookback,
'zscore_threshold': threshold
}
}
class MomentumBreakoutStrategy(BaseStrategy):
"""
Стратегия пробоя с моментумом
"""
def __init__(self, fast_period: int = 10, slow_period: int = 30, volume_factor: float = 1.5):
super().__init__()
self.fast_period = fast_period
self.slow_period = slow_period
self.volume_factor = volume_factor
self.min_periods = slow_period
self.parameter_constraints = {
'fast_period': {'min': 3, 'max': 50, 'type': int},
'slow_period': {'min': 10, 'max': 200, 'type': int},
'volume_factor': {'min': 1.0, 'max': 5.0, 'type': float}
}
def calculate_signals(self, data: pd.DataFrame,
fast_period: int = None,
slow_period: int = None,
volume_factor: float = None) -> Dict[str, Any]:
"""Расчет сигналов стратегии пробоя"""
fast = fast_period or self.fast_period
slow = slow_period or self.slow_period
vol_factor = volume_factor or self.volume_factor
self.validate_parameters(fast_period=fast, slow_period=slow, volume_factor=vol_factor)
if fast >= slow:
raise ValueError("Быстрый период должен быть меньше медленного")
prices = data['Close']
volume = data.get('Volume', pd.Series(1, index=data.index))
# Скользящие средние
fast_ma = prices.rolling(window=fast).mean()
slow_ma = prices.rolling(window=slow).mean()
# Средний объем
avg_volume = volume.rolling(window=slow).mean()
# Условия для сигналов
momentum_up = (fast_ma > slow_ma) & (fast_ma.shift(1) <= slow_ma.shift(1))
momentum_down = (fast_ma < slow_ma) & (fast_ma.shift(1) >= slow_ma.shift(1))
volume_confirmation = volume > (avg_volume * vol_factor)
# Генерация сигналов
signals = pd.Series(0, index=data.index)
signals[momentum_up & volume_confirmation] = 1
signals[momentum_down & volume_confirmation] = -1
# Простая система позиций
positions = signals.fillna(0).replace(0, np.nan).fillna(method='ffill').fillna(0)
self.performance_metrics['total_signals'] += len(signals[signals != 0])
self.performance_metrics['last_calculation'] = datetime.now()
return {
'signals': signals,
'positions': positions,
'fast_ma': fast_ma,
'slow_ma': slow_ma,
'volume_ratio': volume / avg_volume,
'parameters': {
'fast_period': fast,
'slow_period': slow,
'volume_factor': vol_factor
}
}
class StrategyFactory:
"""
Фабрика для создания и управления торговыми стратегиями
Использует возможности метакласса для динамического создания стратегий
"""
def __init__(self):
self.active_strategies: Dict[str, BaseStrategy] = {}
def create_strategy_ensemble(self, strategy_configs: List[Dict[str, Any]]) -> Dict[str, BaseStrategy]:
"""
Создание ансамбля стратегий на основе конфигурации
"""
ensemble = {}
for config in strategy_configs:
strategy_name = config['name']
strategy_type = config['type']
strategy_params = config.get('parameters', {})
try:
strategy = StrategyMeta.create_strategy(strategy_type, **strategy_params)
strategy.name = strategy_name
ensemble[strategy_name] = strategy
print(f"Создана стратегия: {strategy_name} ({strategy_type})")
except Exception as e:
print(f"Ошибка создания стратегии {strategy_name}: {e}")
return ensemble
def run_strategy_comparison(self, strategies: Dict[str, BaseStrategy], 
test_data: pd.DataFrame) -> pd.DataFrame:
"""
Сравнительное тестирование стратегий
"""
results = []
for name, strategy in strategies.items():
try:
performance = strategy.backtest_performance(test_data)
performance['strategy_name'] = name
results.append(performance)
except Exception as e:
print(f"Ошибка тестирования стратегии {name}: {e}")
results.append({
'strategy_name': name,
'error': str(e),
'total_return': np.nan,
'sharpe_ratio': np.nan
})
return pd.DataFrame(results)
# Демонстрация работы метаклассов
def demonstrate_strategy_metaclass():
"""Демонстрация автоматической генерации стратегий"""
print("Демонстрация метаклассов для торговых стратегий:")
print("=" * 55)
# Показываем зарегистрированные стратегии
print("Автоматически зарегистрированные стратегии:")
for strategy_name in StrategyMeta.list_strategies():
print(f"  - {strategy_name}")
# Подготовка тестовых данных
try:
# Используем реальные данные
test_symbol = "MSFT"
data = yf.download(test_symbol, period="1y", interval="1d", progress=False)
if isinstance(data.columns, pd.MultiIndex):
data.columns = data.columns.droplevel(1)
if data.empty:
raise ValueError("Нет данных")
except:
# Генерируем синтетические данные
print("Используем синтетические данные для демонстрации")
dates = pd.date_range(start='2023-01-01', end='2024-01-01', freq='D')
np.random.seed(42)
# Генерируем цены с трендом и шумом
price_changes = np.random.normal(0.001, 0.02, len(dates))
prices = 100 * np.exp(np.cumsum(price_changes))
volumes = np.random.lognormal(10, 1, len(dates))
data = pd.DataFrame({
'Close': prices,
'Volume': volumes,
'High': prices * (1 + np.abs(np.random.normal(0, 0.01, len(dates)))),
'Low': prices * (1 - np.abs(np.random.normal(0, 0.01, len(dates)))),
'Open': prices
}, index=dates)
print(f"\nТестовые данные: {len(data)} дней, цена от ${data['Close'].min():.2f} до ${data['Close'].max():.2f}")
# Создаем фабрику стратегий
factory = StrategyFactory()
# Конфигурация ансамбля стратегий
strategy_configs = [
{
'name': 'Conservative_MeanReversion',
'type': 'MeanReversionStrategy',
'parameters': {'lookback_period': 30, 'zscore_threshold': 2.5}
},
{
'name': 'Aggressive_MeanReversion',
'type': 'MeanReversionStrategy',
'parameters': {'lookback_period': 15, 'zscore_threshold': 1.5}
},
{
'name': 'Fast_Momentum',
'type': 'MomentumBreakoutStrategy',
'parameters': {'fast_period': 5, 'slow_period': 20, 'volume_factor': 2.0}
},
{
'name': 'Slow_Momentum',
'type': 'MomentumBreakoutStrategy',
'parameters': {'fast_period': 15, 'slow_period': 50, 'volume_factor': 1.3}
}
]
# Создаем ансамбль
ensemble = factory.create_strategy_ensemble(strategy_configs)
# Запускаем сравнительное тестирование
print(f"\nЗапуск сравнительного тестирования {len(ensemble)} стратегий...")
comparison_results = factory.run_strategy_comparison(ensemble, data)
# Показываем результаты
print("\nРезультаты сравнительного тестирования:")
print("=" * 80)
# Сортируем по Sharpe ratio
valid_results = comparison_results.dropna(subset=['sharpe_ratio'])
if not valid_results.empty:
sorted_results = valid_results.sort_values('sharpe_ratio', ascending=False)
for _, row in sorted_results.iterrows():
print(f"{row['strategy_name']:25} | "
f"Return: {row['total_return']:7.2%} | "
f"Sharpe: {row['sharpe_ratio']:6.3f} | "
f"MaxDD: {row['max_drawdown']:7.2%}")
# Демонстрация автоматически добавленной функциональности
print(f"\n{'='*40}")
print("Демонстрация автоматически добавленных возможностей:")
# Берем одну стратегию для детального анализа
strategy = ensemble['Conservative_MeanReversion']
print(f"\nСтратегия: {strategy.name}")
print(f"Метрики производительности: {strategy.performance_metrics}")
print(f"Ограничения параметров: {strategy.parameter_constraints}")
# Тест валидации параметров
try:
strategy.validate_parameters(lookback_period=5, zscore_threshold=0.1)  # Слишком маленькие значения
except ValueError as e:
print(f"Валидация сработала (ожидаемо): {e}")
# Показываем последние сигналы
signals_result = strategy.calculate_signals(data.tail(50))  # Последние 50 дней
recent_signals = signals_result['signals']
signal_dates = recent_signals[recent_signals != 0].tail(5)
if not signal_dates.empty:
print(f"\nПоследние 5 сигналов:")
for date, signal in signal_dates.items():
signal_type = "BUY" if signal > 0 else "SELL"
price = data.loc[date, 'Close']
print(f"  {date.strftime('%Y-%m-%d')}: {signal_type} @ ${price:.2f}")
else:
print("\nСигналы в последние дни не генерировались")
# Запуск демонстрации
demonstrate_strategy_metaclass()
Расчет сигналов MeanReversionStrategy для 50 периодов
Тестовые данные: 251 дней, цена от $353.91 до $517.17
Создана стратегия: Conservative_MeanReversion (MeanReversionStrategy)
Создана стратегия: Aggressive_MeanReversion (MeanReversionStrategy)
Создана стратегия: Fast_Momentum (MomentumBreakoutStrategy)
Создана стратегия: Slow_Momentum (MomentumBreakoutStrategy)
Запуск сравнительного тестирования 4 стратегий...
Результаты сравнительного тестирования:
================================================================================
Aggressive_MeanReversion  | Return:  17.49% | Sharpe:  1.156 | MaxDD:  -5.65%
Conservative_MeanReversion | Return:   6.47% | Sharpe:  0.650 | MaxDD:  -2.53%
Fast_Momentum             | Return:   0.00% | Sharpe:  0.000 | MaxDD:   0.00%
Slow_Momentum             | Return:   0.00% | Sharpe:  0.000 | MaxDD:   0.00%
========================================
Демонстрация автоматически добавленных возможностей:
Стратегия: Conservative_MeanReversion
Метрики производительности: {'total_signals': 6, 'execution_times': [], 'last_calculation': datetime.datetime(2025, 7, 25, 18, 27, 39, 497627)}
Ограничения параметров: {'lookback_period': {'min': 5, 'max': 100, 'type': <class 'int'>}, 'zscore_threshold': {'min': 0.5, 'max': 5.0, 'type': <class 'float'>}}
Валидация сработала (ожидаемо): Параметр zscore_threshold=0.1 меньше минимума 0.5

Продемонстрированный метакласс StrategyMeta представляет мощный инструмент для создания единообразных торговых стратегий. Он автоматически добавляет к каждой стратегии важную функциональность: валидацию входных данных, логирование операций, систему ограничений параметров и автоматическую регистрацию в глобальном реестре.

👉🏻  Мониторинг ML-моделей: детекция дрифта и снижения метрик качества

Особенно ценной является автоматическая генерация системы параметров на основе сигнатуры методов. Метакласс анализирует параметры функции calculate_signals и создает словарь значений по умолчанию, что избавляет разработчика от дублирования кода и снижает вероятность ошибок.

Паттерн «Фабрика стратегий» в сочетании с метаклассом позволяет создавать сложные ансамбли стратегий на основе декларативной конфигурации. Это особенно важно в институциональной торговле, где портфель может содержать десятки различных стратегий с разными параметрами.

Динамическое создание финансовых инструментов

В современных финансах постоянно появляются новые типы инструментов и деривативов. Метаклассы позволяют создавать гибкие системы, способные динамически адаптироваться к новым требованиям рынка.

import numpy as np
import pandas as pd
from abc import ABC, abstractmethod, ABCMeta
from typing import Dict, Any, Callable
import yfinance as yf
class IndicatorMeta(ABCMeta):  # Наследуем от ABCMeta вместо type
"""
Метакласс для автоматического создания финансовых индикаторов
с единообразным интерфейсом и валидацией параметров
"""
def __new__(mcs, name, bases, namespace):
# Автоматически добавляем валидацию параметров
if 'calculate' in namespace and callable(namespace['calculate']):
original_calculate = namespace['calculate']
def validated_calculate(self, data, **kwargs):
# Проверяем входные данные
if not isinstance(data, (pd.Series, pd.DataFrame, np.ndarray)):
raise TypeError(f"Unsupported data type: {type(data)}")
if len(data) < getattr(self, 'min_periods', 1):
raise ValueError(f"Insufficient data: {len(data)} < {self.min_periods}") return original_calculate(self, data, **kwargs) namespace['calculate'] = validated_calculate # Автоматически добавляем кеширование результатов if 'calculate' in namespace: if '_cache' not in namespace: # Избегаем перезаписи если уже есть namespace['_cache'] = {} # Получаем уже обёрнутую функцию (если была валидация) current_calculate = namespace['calculate'] def cached_calculate(self, data, **kwargs): # Создаем cache если его нет в экземпляре if not hasattr(self, '_cache'): self._cache = {} # Создаем ключ кеша try: if hasattr(data, 'values'): data_bytes = data.values.tobytes() else: data_bytes = data.tobytes() cache_key = hash((data_bytes, str(sorted(kwargs.items())))) except: # Если не можем создать hash, просто выполняем без кеширования return current_calculate(self, data, **kwargs) if cache_key not in self._cache: self._cache[cache_key] = current_calculate(self, data, **kwargs) return self._cache[cache_key] namespace['calculate'] = cached_calculate return super().__new__(mcs, name, bases, namespace) class BaseIndicator(ABC, metaclass=IndicatorMeta): """Базовый класс для всех финансовых индикаторов""" def __init__(self, name: str, min_periods: int = 1): self.name = name self.min_periods = min_periods self._cache = {} # Инициализируем кеш в экземпляре @abstractmethod def calculate(self, data, **kwargs): pass class AdvancedRSI(BaseIndicator): """ Продвинутая реализация RSI с дополнительными возможностями благодаря метаклассу получает автоматическую валидацию и кеширование """ def __init__(self, period: int = 14, smooth_period: int = 3): super().__init__(f"Advanced_RSI_{period}_{smooth_period}", period) self.period = period self.smooth_period = smooth_period def calculate(self, data, **kwargs): """ Расчет RSI с дополнительным сглаживанием """ if isinstance(data, pd.DataFrame): prices = data['Close'] else: prices = data delta = prices.diff() gains = delta.where(delta > 0, 0)
losses = -delta.where(delta < 0, 0) # Используем экспоненциальное сглаживание вместо простого скользящего среднего avg_gains = gains.ewm(span=self.period, adjust=False).mean() avg_losses = losses.ewm(span=self.period, adjust=False).mean() rs = avg_gains / avg_losses rsi = 100 - (100 / (1 + rs)) # Дополнительное сглаживание для уменьшения шума if self.smooth_period > 1:
rsi = rsi.rolling(window=self.smooth_period).mean()
return rsi.fillna(50)  # Заполняем NaN средним значением RSI
# Дополнительный пример индикатора для демонстрации
class MovingAverage(BaseIndicator):
"""Простая скользящая средняя"""
def __init__(self, period: int = 20):
super().__init__(f"SMA_{period}", period)
self.period = period
def calculate(self, data, **kwargs):
if isinstance(data, pd.DataFrame):
prices = data['Close']
else:
prices = data
return prices.rolling(window=self.period).mean()
# Пример использования
if __name__ == "__main__":
try:
ticker = "BTC-USD"
btc_data = yf.download(ticker, period="1y", interval="1d")
# Проверяем на MultiIndex и приводим к нужному формату
if isinstance(btc_data.columns, pd.MultiIndex):
btc_data.columns = btc_data.columns.droplevel(1)
# Создаем индикаторы
rsi_indicator = AdvancedRSI(period=21, smooth_period=5)
ma_indicator = MovingAverage(period=50)
# Рассчитываем значения
rsi_values = rsi_indicator.calculate(btc_data)
ma_values = ma_indicator.calculate(btc_data)
print(f"RSI для {ticker}:")
print(rsi_values.tail().round(2))
print(f"\nMA для {ticker}:")
print(ma_values.tail().round(2))
# Демонстрация кеширования - второй вызов будет быстрее
print("\nТестирование кеширования...")
import time
start = time.time()
rsi_values_2 = rsi_indicator.calculate(btc_data)
end = time.time()
print(f"Время второго вызова (с кешем): {end - start:.4f} сек")
except Exception as e:
print(f"Ошибка: {e}")
print("Убедитесь, что установлен yfinance: pip install yfinance")
RSI для BTC-USD:
Date
2025-07-21    65.79
2025-07-22    65.31
2025-07-23    64.81
2025-07-24    63.97
2025-07-25    62.02
Name: Close, dtype: float64
MA для BTC-USD:
Date
2025-07-21    109275.59
2025-07-22    109557.87
2025-07-23    109824.32
2025-07-24    110097.04
2025-07-25    110394.82
Name: Close, dtype: float64
Тестирование кеширования...
Время второго вызова (с кешем): 0.0002 сек

Приведенный код демонстрирует мощь метапрограммирования в финансовых приложениях. Метакласс IndicatorMeta автоматически добавляет к каждому индикатору два критически важных функционала: валидацию входных данных и кеширование результатов. Это означает, что разработчику не нужно помнить о реализации этих аспектов в каждом новом индикаторе — они добавляются автоматически на этапе создания класса.

Валидация данных особенно важна в финансовых приложениях, где некорректные входные данные могут привести к катастрофическим последствиям. Метакласс проверяет тип данных, их достаточность для расчета и автоматически генерирует понятные сообщения об ошибках. Кеширование же позволяет избежать повторных вычислений одних и тех же значений, что особенно актуально при работе с большими историческими датасетами.

Класс AdvancedRSI демонстрирует, как благодаря метаклассу мы можем сосредоточиться исключительно на бизнес-логике индикатора, не отвлекаясь на инфраструктурные задачи.

Проектирование отказоустойчивых финансовых систем

В высокочастотной торговле и управлении рисками отказоустойчивость системы не просто желательна — она абсолютно необходима. Потеря соединения с биржей, сбой в получении данных или ошибка в расчетах могут привести к многомиллионным убыткам. Продвинутые возможности Python позволяют элегантно решать эти проблемы на архитектурном уровне.

import time
import logging
import asyncio
import aiohttp
from functools import wraps, lru_cache
from contextlib import contextmanager, asynccontextmanager
from typing import Optional, Any, Callable, Dict, List, Generator
import threading
from dataclasses import dataclass, field
from enum import Enum
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
import pandas as pd
import yfinance as yf
from datetime import datetime
import psutil
import gc
from numba import jit, njit
import warnings
warnings.filterwarnings('ignore')
# Оптимизированные численные функции с использованием Numba для ускорения вычислений
@njit(fastmath=True)
def fast_correlation_matrix(returns_array):
"""Быстрый расчет корреляционной матрицы с использованием Numba"""
n_assets, n_periods = returns_array.shape
corr_matrix = np.eye(n_assets)
for i in range(n_assets):
for j in range(i + 1, n_assets):
# Вычисляем корреляцию между активами i и j
x, y = returns_array[i], returns_array[j]
# Удаляем NaN значения
valid_mask = ~(np.isnan(x) | np.isnan(y))
if np.sum(valid_mask) < 2: corr_matrix[i, j] = corr_matrix[j, i] = 0.0 continue x_clean, y_clean = x[valid_mask], y[valid_mask] # Корреляция Пирсона mean_x, mean_y = np.mean(x_clean), np.mean(y_clean) numerator = np.sum((x_clean - mean_x) * (y_clean - mean_y)) sum_sq_x = np.sum((x_clean - mean_x) ** 2) sum_sq_y = np.sum((y_clean - mean_y) ** 2) if sum_sq_x > 0 and sum_sq_y > 0:
corr = numerator / np.sqrt(sum_sq_x * sum_sq_y)
corr_matrix[i, j] = corr_matrix[j, i] = corr
else:
corr_matrix[i, j] = corr_matrix[j, i] = 0.0
return corr_matrix
@njit(fastmath=True)
def fast_portfolio_metrics(weights, returns_mean, cov_matrix):
"""Быстрый расчет метрик портфеля"""
portfolio_return = np.dot(weights, returns_mean)
portfolio_variance = np.dot(weights, np.dot(cov_matrix, weights))
portfolio_volatility = np.sqrt(portfolio_variance)
if portfolio_volatility > 0:
sharpe_ratio = portfolio_return / portfolio_volatility
else:
sharpe_ratio = 0.0
return portfolio_return, portfolio_volatility, sharpe_ratio
@njit(fastmath=True)
def fast_risk_metrics(returns):
"""Быстрый расчет риск-метрик"""
# Убираем NaN
clean_returns = returns[~np.isnan(returns)]
if len(clean_returns) < 2: return 0.0, 0.0, 0.0, 0.0 # VaR sorted_returns = np.sort(clean_returns) n = len(sorted_returns) var_95_idx = max(0, int(0.05 * n) - 1) var_99_idx = max(0, int(0.01 * n) - 1) var_95 = sorted_returns[var_95_idx] var_99 = sorted_returns[var_99_idx] # CVaR (95%) cvar_95 = np.mean(sorted_returns[:var_95_idx + 1]) if var_95_idx >= 0 else var_95
# Maximum Drawdown
cumulative = np.cumprod(1 + clean_returns)
running_max = np.maximum.accumulate(cumulative)
drawdowns = (cumulative - running_max) / running_max
max_drawdown = np.min(drawdowns)
return var_95, var_99, cvar_95, max_drawdown
@njit(fastmath=True)
def calculate_skewness(returns):
"""Быстрый расчет асимметрии"""
n = len(returns)
if n < 3:
return 0.0
mean_val = np.mean(returns)
std_val = np.std(returns)
if std_val == 0:
return 0.0
# Расчет третьего момента
third_moment = np.mean(((returns - mean_val) / std_val) ** 3)
return third_moment
@njit(fastmath=True)
def calculate_kurtosis(returns):
"""Быстрый расчет эксцесса"""
n = len(returns)
if n < 4: return 0.0 mean_val = np.mean(returns) std_val = np.std(returns) if std_val == 0: return 0.0 # Расчет четвертого момента fourth_moment = np.mean(((returns - mean_val) / std_val) ** 4) return fourth_moment - 3.0 # Избыточный эксцесс # Улучшенный Circuit Breaker с асинхронной поддержкой class CircuitState(Enum): CLOSED = "closed" OPEN = "open" HALF_OPEN = "half_open" @dataclass class CircuitBreakerConfig: failure_threshold: int = 5 recovery_timeout: int = 60 expected_exception: tuple = (Exception,) half_open_max_calls: int = 3 class AsyncCircuitBreaker: """Асинхронная версия Circuit Breaker с улучшенной производительностью""" def __init__(self, config: CircuitBreakerConfig): self.config = config self.failure_count = 0 self.success_count = 0 self.last_failure_time = None self.state = CircuitState.CLOSED self._lock = asyncio.Lock() # Быстрое логирование без блокировки self.logger = logging.getLogger(f"AsyncCircuitBreaker_{id(self)}") self.logger.setLevel(logging.WARNING) # Только критичные сообщения async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Асинхронное выполнение функции с защитой Circuit Breaker"""
async with self._lock:
current_time = time.time()
if self.state == CircuitState.OPEN:
if self._should_attempt_reset(current_time):
self.state = CircuitState.HALF_OPEN
self.success_count = 0
else:
raise Exception("Circuit breaker OPEN - вызов заблокирован")
try:
if asyncio.iscoroutinefunction(func):
result = await func(*args, **kwargs)
else:
result = func(*args, **kwargs)
self._on_success()
return result
except self.config.expected_exception as e:
self._on_failure(current_time)
raise e
def _should_attempt_reset(self, current_time: float) -> bool:
return (current_time - self.last_failure_time) >= self.config.recovery_timeout
def _on_success(self):
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.config.half_open_max_calls:
self.state = CircuitState.CLOSED
self.failure_count = 0
def _on_failure(self, current_time: float):
self.failure_count += 1
self.last_failure_time = current_time
if self.failure_count >= self.config.failure_threshold:
self.state = CircuitState.OPEN
if self.logger.isEnabledFor(logging.WARNING):
self.logger.warning(f"Circuit breaker ОТКРЫТ после {self.failure_count} сбоев")
# Кешированная загрузка данных с пулом соединений
class DataCache:
"""Высокопроизводительный кеш с TTL и LRU вытеснением"""
def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600):
self.max_size = max_size
self.ttl_seconds = ttl_seconds
self.cache = {}
self.access_times = {}
self._lock = threading.RLock()
def get(self, key: str) -> Optional[Any]:
with self._lock:
if key not in self.cache:
return None
current_time = time.time()
if current_time - self.access_times[key] > self.ttl_seconds:
del self.cache[key]
del self.access_times[key]
return None
self.access_times[key] = current_time
return self.cache[key]
def set(self, key: str, value: Any):
with self._lock:
current_time = time.time()
# Если кеш переполнен, удаляем старые записи
if len(self.cache) >= self.max_size and key not in self.cache:
# Удаляем самую старую запись
oldest_key = min(self.access_times.keys(), key=self.access_times.get)
del self.cache[oldest_key]
del self.access_times[oldest_key]
self.cache[key] = value
self.access_times[key] = current_time
# Глобальный кеш данных
_data_cache = DataCache(max_size=500, ttl_seconds=1800)  # 30 минут TTL
# Circuit breaker для API вызовов
_api_breaker_config = CircuitBreakerConfig(
failure_threshold=3,
recovery_timeout=30,
expected_exception=(ConnectionError, TimeoutError, ValueError)
)
_api_breaker = AsyncCircuitBreaker(_api_breaker_config)
async def fetch_market_data_async(symbol: str, period: str = "6mo") -> pd.DataFrame:
"""Асинхронная загрузка рыночных данных с кешированием и защитой от сбоев"""
cache_key = f"{symbol}_{period}"
# Проверяем кеш
cached_data = _data_cache.get(cache_key)
if cached_data is not None:
return cached_data
# Загружаем данные через circuit breaker
async def _fetch():
# yfinance не поддерживает async, выполняем в thread pool
loop = asyncio.get_event_loop()
data = await loop.run_in_executor(
None, 
lambda: yf.download(symbol, period=period, interval="1d", progress=False, threads=True)
)
if isinstance(data.columns, pd.MultiIndex):
data.columns = data.columns.droplevel(1)
if data.empty:
raise ValueError(f"Нет данных для символа {symbol}")
return data
try:
data = await _api_breaker.call(_fetch)
_data_cache.set(cache_key, data)
return data
except Exception as e:
logging.error(f"Ошибка получения данных для {symbol}: {e}")
raise
# Оптимизированные контекстные менеджеры
@dataclass
class ResourceMonitor:
"""Легковесный монитор ресурсов"""
start_time: float = field(default_factory=time.time)
start_memory: float = field(default_factory=lambda: psutil.Process().memory_info().rss / 1024 / 1024)
max_memory: float = field(default_factory=lambda: psutil.Process().memory_info().rss / 1024 / 1024)
calculations_count: int = 0
warnings: List[str] = field(default_factory=list)
def check_memory(self, limit_mb: int = 1500) -> float:
"""Быстрая проверка памяти без избыточного логирования"""
current_memory = psutil.Process().memory_info().rss / 1024 / 1024
self.max_memory = max(self.max_memory, current_memory)
if current_memory > limit_mb:
if len(self.warnings) < 5: # Ограничиваем количество предупреждений self.warnings.append(f"Память: {current_memory:.1f} MB > {limit_mb} MB")
gc.collect()  # Принудительная сборка мусора
return current_memory
@contextmanager
def optimized_calculation_context(name: str, memory_limit_mb: int = 1500):
"""Оптимизированный контекстный менеджер для вычислений"""
monitor = ResourceMonitor()
try:
yield monitor
except MemoryError:
gc.collect()
raise
finally:
# Минимальное логирование только в случае проблем
duration = time.time() - monitor.start_time
memory_delta = monitor.max_memory - monitor.start_memory
if monitor.warnings or duration > 30 or abs(memory_delta) > 100:
print(f"[{name}] Время: {duration:.1f}с, Память: {memory_delta:+.1f}MB, "
f"Расчетов: {monitor.calculations_count}, Предупреждений: {len(monitor.warnings)}")
# Высокопроизводительная аналитика портфеля
class OptimizedPortfolioAnalyzer:
"""Оптимизированный анализатор портфеля с параллельными вычислениями"""
def __init__(self, max_workers: int = 4):
self.max_workers = max_workers
self.executor = ThreadPoolExecutor(max_workers=max_workers)
async def analyze_portfolio_async(self, symbols: List[str], period: str = "6mo") -> Dict[str, Any]:
"""Асинхронный анализ портфеля с максимальной производительностью"""
with optimized_calculation_context(f"Portfolio_{len(symbols)}_assets") as monitor:
# Параллельная загрузка данных
data_tasks = [fetch_market_data_async(symbol, period) for symbol in symbols]
portfolio_data = {}
# Ожидаем загрузку всех данных одновременно
results = await asyncio.gather(*data_tasks, return_exceptions=True)
for symbol, result in zip(symbols, results):
if not isinstance(result, Exception) and not result.empty:
portfolio_data[symbol] = result
else:
print(f"Пропуск {symbol}: {result if isinstance(result, Exception) else 'Нет данных'}")
if not portfolio_data:
raise ValueError("Не удалось загрузить данные ни для одного символа")
# Параллельная обработка данных
loop = asyncio.get_event_loop()
# Задача 1: Подготовка данных доходностей
def prepare_returns():
returns_data = {}
for symbol, data in portfolio_data.items():
returns = data['Close'].pct_change().dropna()
returns_data[symbol] = returns.values
return returns_data
returns_data = await loop.run_in_executor(self.executor, prepare_returns)
monitor.calculations_count += 1
# Преобразуем в numpy массив для Numba
symbols_list = list(returns_data.keys())
returns_array = np.array([returns_data[symbol] for symbol in symbols_list])
# Задача 2: Быстрый расчет корреляционной матрицы
correlation_task = loop.run_in_executor(
self.executor, 
lambda: fast_correlation_matrix(returns_array)
)
# Задача 3: Расчет базовых статистик
def calculate_basic_stats():
stats = {}
for i, symbol in enumerate(symbols_list):
returns = returns_array[i]
clean_returns = returns[~np.isnan(returns)]
if len(clean_returns) > 0:
stats[symbol] = {
'mean_return': np.mean(clean_returns) * 252,
'volatility': np.std(clean_returns) * np.sqrt(252),
'skewness': calculate_skewness(clean_returns),
'kurtosis': calculate_kurtosis(clean_returns)
}
else:
stats[symbol] = {'mean_return': 0, 'volatility': 0, 'skewness': 0, 'kurtosis': 0}
return stats
stats_task = loop.run_in_executor(self.executor, calculate_basic_stats)
# Ожидаем завершения параллельных задач
correlation_matrix, basic_stats = await asyncio.gather(correlation_task, stats_task)
monitor.calculations_count += 2
# Задача 4: Оптимизация портфеля (Monte Carlo)
mean_returns = np.array([basic_stats[symbol]['mean_return'] for symbol in symbols_list])
volatilities = np.array([basic_stats[symbol]['volatility'] for symbol in symbols_list])
# Создаем ковариационную матрицу из корреляций и волатильностей
cov_matrix = np.outer(volatilities, volatilities) * correlation_matrix
optimization_task = loop.run_in_executor(
self.executor,
self._optimize_portfolio_fast,
mean_returns,
cov_matrix,
5000  # Количество симуляций
)
# Задача 5: Анализ рисков для равновесного портфеля
equal_weights = np.ones(len(symbols_list)) / len(symbols_list)
equal_portfolio_returns = np.dot(returns_array.T, equal_weights)
risk_task = loop.run_in_executor(
self.executor,
lambda: fast_risk_metrics(equal_portfolio_returns)
)
# Завершаем оптимизацию и анализ рисков
optimal_portfolio, (var_95, var_99, cvar_95, max_drawdown) = await asyncio.gather(
optimization_task, risk_task
)
monitor.calculations_count += 2
# Формируем результаты
results = {
'correlation_matrix': {
symbols_list[i]: {
symbols_list[j]: float(correlation_matrix[i, j]) 
for j in range(len(symbols_list))
}
for i in range(len(symbols_list))
},
'basic_statistics': basic_stats,
'optimal_portfolio': {
'weights': dict(zip(symbols_list, optimal_portfolio['weights'])),
'expected_return': float(optimal_portfolio['return']),
'volatility': float(optimal_portfolio['volatility']),
'sharpe_ratio': float(optimal_portfolio['sharpe_ratio'])
},
'risk_metrics': {
'var_95': float(var_95),
'var_99': float(var_99),
'cvar_95': float(cvar_95),
'max_drawdown': float(max_drawdown),
'calmar_ratio': float(optimal_portfolio['return'] / abs(max_drawdown)) if max_drawdown != 0 else 0.0
},
'analysis_summary': {
'symbols_analyzed': len(portfolio_data),
'symbols_requested': len(symbols),
'avg_correlation': float(np.mean(correlation_matrix[np.triu_indices_from(correlation_matrix, k=1)])),
'max_correlation': float(np.max(correlation_matrix[np.triu_indices_from(correlation_matrix, k=1)])),
'min_correlation': float(np.min(correlation_matrix[np.triu_indices_from(correlation_matrix, k=1)]))
}
}
monitor.check_memory()
return results
def _optimize_portfolio_fast(self, mean_returns, cov_matrix, n_simulations):
"""Быстрая оптимизация портфеля методом Монте-Карло"""
n_assets = len(mean_returns)
best_sharpe = -np.inf
best_portfolio = None
# Генерируем случайные веса батчами для лучшей производительности
for _ in range(n_simulations):
weights = np.random.dirichlet(np.ones(n_assets))
port_return, port_vol, sharpe = fast_portfolio_metrics(weights, mean_returns, cov_matrix)
if sharpe > best_sharpe:
best_sharpe = sharpe
best_portfolio = {
'weights': weights,
'return': port_return,
'volatility': port_vol,
'sharpe_ratio': sharpe
}
return best_portfolio
def __del__(self):
"""Корректное закрытие пула потоков"""
if hasattr(self, 'executor'):
self.executor.shutdown(wait=False)
# Демонстрационные функции для тестирования производительности
async def benchmark_optimization():
"""Бенчмарк производительности оптимизированной системы"""
print("Бенчмарк оптимизированной торговой системы")
print("Сравнение скорости обработки различных портфелей")
test_portfolios = [
(["BTC-USD", "ETH-USD"], "Крипто пара"),
(["AAPL", "GOOGL", "MSFT", "TSLA"], "Топ технологии"),
(["BTC-USD", "ETH-USD", "AAPL", "GOOGL", "MSFT", "TSLA", "SPY", "QQQ"], "Смешанный портфель")
]
analyzer = OptimizedPortfolioAnalyzer(max_workers=6)
for symbols, description in test_portfolios:
print(f"\nАнализ: {description} ({len(symbols)} активов)")
start_time = time.time()
try:
results = await analyzer.analyze_portfolio_async(symbols, period="3mo")
end_time = time.time()
print(f"  Время выполнения: {end_time - start_time:.2f} секунд")
print(f"  Оптимальный коэф. Шарпа: {results['optimal_portfolio']['sharpe_ratio']:.3f}")
print(f"  Максимальная корреляция: {results['analysis_summary']['max_correlation']:.3f}")
print(f"  VaR (95%): {results['risk_metrics']['var_95']:.4f}")
except Exception as e:
end_time = time.time()
print(f"  Ошибка за {end_time - start_time:.2f} сек: {e}")
def synchronous_portfolio_analysis(symbols: List[str], period: str = "3mo") -> Dict[str, Any]:
"""Синхронная версия для сравнения производительности"""
# Простая синхронная загрузка данных
portfolio_data = {}
for symbol in symbols:
try:
data = yf.download(symbol, period=period, interval="1d", progress=False)
if isinstance(data.columns, pd.MultiIndex):
data.columns = data.columns.droplevel(1)
if not data.empty:
portfolio_data[symbol] = data
except Exception:
continue
if not portfolio_data:
raise ValueError("Нет данных для анализа")
# Простой расчет корреляций
returns_data = {}
for symbol, data in portfolio_data.items():
returns_data[symbol] = data['Close'].pct_change().dropna()
returns_df = pd.DataFrame(returns_data)
correlation_matrix = returns_df.corr()
# Простая оптимизация
mean_returns = returns_df.mean() * 252
cov_matrix = returns_df.cov() * 252
n_portfolios = 1000
best_sharpe = -np.inf
best_weights = None
for _ in range(n_portfolios):
weights = np.random.random(len(returns_df.columns))
weights /= weights.sum()
portfolio_return = np.sum(weights * mean_returns)
portfolio_vol = np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights)))
sharpe_ratio = portfolio_return / portfolio_vol if portfolio_vol > 0 else 0
if sharpe_ratio > best_sharpe:
best_sharpe = sharpe_ratio
best_weights = weights
return {
'correlation_matrix': correlation_matrix.to_dict(),
'optimal_portfolio': {
'weights': dict(zip(returns_df.columns, best_weights)),
'sharpe_ratio': best_sharpe
}
}
async def compare_performance():
"""Сравнение производительности синхронной и асинхронной версий"""
test_symbols = ["AAPL", "GOOGL", "MSFT", "TSLA", "NVDA"]
print("Сравнение производительности:")
# Тест синхронной версии
print("\nСинхронная версия:")
sync_start = time.time()
try:
sync_results = synchronous_portfolio_analysis(test_symbols)
sync_end = time.time()
print(f"  Время: {sync_end - sync_start:.2f} сек")
print(f"  Sharpe: {sync_results['optimal_portfolio']['sharpe_ratio']:.3f}")
except Exception as e:
sync_end = time.time()
print(f"  Ошибка за {sync_end - sync_start:.2f} сек: {e}")
# Тест асинхронной версии
print("\nАсинхронная оптимизированная версия:")
async_start = time.time()
try:
analyzer = OptimizedPortfolioAnalyzer()
async_results = await analyzer.analyze_portfolio_async(test_symbols, period="3mo")
async_end = time.time()
print(f"  Время: {async_end - async_start:.2f} сек")
print(f"  Sharpe: {async_results['optimal_portfolio']['sharpe_ratio']:.3f}")
# Расчет ускорения
if 'sync_end' in locals() and 'sync_start' in locals():
speedup = (sync_end - sync_start) / (async_end - async_start)
print(f"  Ускорение: {speedup:.1f}x")
except Exception as e:
async_end = time.time()
print(f"  Ошибка за {async_end - async_start:.2f} сек: {e}")
# Главная функция для запуска всех тестов
async def main():
"""Основная функция для демонстрации оптимизированной системы"""
print("Запуск оптимизированной системы торговой аналитики")
# Настройка логирования для минимального вывода
logging.basicConfig(level=logging.ERROR)
try:
# Бенчмарк различных портфелей
await benchmark_optimization()
print("\n" + "-" * 50)
# Сравнение производительности
await compare_performance()
except KeyboardInterrupt:
print("\nОстановлено пользователем")
except Exception as e:
print(f"Критическая ошибка: {e}")
print("\nЗавершение работы системы")
# Простая функция для запуска в Jupyter без проблем с event loop
def run_analysis():
"""Простая функция для запуска анализа без асинхронности"""
print("Запуск упрощенной версии системы торговой аналитики")
# Настройка логирования для минимального вывода
logging.basicConfig(level=logging.ERROR)
test_symbols = ["AAPL", "GOOGL", "MSFT", "TSLA"]
print(f"\nАнализ портфеля: {test_symbols}")
# Синхронная версия для демонстрации
start_time = time.time()
try:
results = synchronous_portfolio_analysis(test_symbols, period="3mo")
end_time = time.time()
print(f"Время выполнения: {end_time - start_time:.2f} секунд")
print(f"Оптимальный коэф. Шарпа: {results['optimal_portfolio']['sharpe_ratio']:.3f}")
# Показываем веса портфеля
print("Оптимальные веса:")
for symbol, weight in results['optimal_portfolio']['weights'].items():
print(f"  {symbol}: {weight:.1%}")
except Exception as e:
print(f"Ошибка: {e}")
# Для асинхронного запуска (если нужно)
async def run_async_analysis():
"""Асинхронная версия для более продвинутого анализа"""
try:
await main()
except Exception as e:
print(f"Ошибка в асинхронном анализе: {e}")
# Для запуска в Jupyter/IPython
if __name__ == "__main__":
# Простой запуск без асинхронности
run_analysis()
Анализ портфеля: ['AAPL', 'GOOGL', 'MSFT', 'TSLA']
Время выполнения: 0.86 секунд
Оптимальный коэф. Шарпа: 5.826
Оптимальные веса:
AAPL: 13.2%
GOOGL: 11.8%
MSFT: 73.9%
TSLA: 1.0%

Представленные контекстные менеджеры демонстрируют профессиональный подход к управлению ресурсами в финансовых приложениях. Класс ExchangeConnectionPool решает проблему эффективного использования HTTP-соединений при работе с внешними API. Пулинг соединений существенно снижает overhead на установку TCP-соединений, а встроенный rate limiting предотвращает блокировку со стороны биржевых API.

👉🏻  NPV (Net Present Value, Чистая приведенная стоимость)

Контекстный менеджер trading_data_session обеспечивает надежную сессию для работы с торговыми данными. Он автоматически предзагружает необходимые данные, кеширует результаты и ведет детальный лог ошибок. Такой подход наиболее эффективен в высокочастотной торговле, где потеря даже одного тика данных может существенно повлиять на результат.

Менеджер portfolio_calculation_context решает проблему контроля ресурсов при выполнении сложных портфельных расчетов. Мониторинг памяти в реальном времени предотвращает крах приложения из-за переполнения RAM, что особенно актуально при работе с большими портфелями или длинными историческими периодами.

Транзакционная безопасность финансовых операций

В финансовых системах принцип атомарности операций не просто хорошая практика — это требование безопасности. Любая торговая операция должна быть либо выполнена полностью, либо отменена без последствий. Контекстные менеджеры позволяют элегантно реализовать этот принцип.

from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable, Generator
from enum import Enum
import copy
import json
from decimal import Decimal, ROUND_HALF_UP
from datetime import datetime
import time
import logging
import gc
import psutil
import numpy as np
class TransactionStatus(Enum):
PENDING = "pending"
COMMITTED = "committed"
ROLLED_BACK = "rolled_back"
FAILED = "failed"
@dataclass
class Position:
"""Позиция в портфеле"""
symbol: str
quantity: Decimal
avg_price: Decimal
last_updated: datetime = field(default_factory=datetime.now)
@property
def market_value(self) -> Decimal:
# В реальной системе здесь был бы запрос текущей цены
return self.quantity * self.avg_price
@dataclass
class Transaction:
"""Транзакция изменения портфеля"""
transaction_id: str
timestamp: datetime
operation_type: str
symbol: str
quantity_change: Decimal
price: Decimal
status: TransactionStatus = TransactionStatus.PENDING
metadata: Dict[str, Any] = field(default_factory=dict)
class PortfolioState:
"""Состояние портфеля с поддержкой транзакций"""
def __init__(self, portfolio_id: str, initial_cash: Decimal = Decimal('100000')):
self.portfolio_id = portfolio_id
self.cash = initial_cash
self.positions: Dict[str, Position] = {}
self.transaction_log: List[Transaction] = []
self._version = 0
def get_snapshot(self) -> Dict[str, Any]:
"""Создание снимка текущего состояния"""
return {
'portfolio_id': self.portfolio_id,
'cash': float(self.cash),
'positions': {symbol: {
'quantity': float(pos.quantity),
'avg_price': float(pos.avg_price),
'market_value': float(pos.market_value)
} for symbol, pos in self.positions.items()},
'version': self._version,
'total_value': float(self.get_total_value())
}
def restore_from_snapshot(self, snapshot: Dict[str, Any]):
"""Восстановление состояния из снимка"""
self.cash = Decimal(str(snapshot['cash']))
self.positions.clear()
for symbol, pos_data in snapshot['positions'].items():
self.positions[symbol] = Position(
symbol=symbol,
quantity=Decimal(str(pos_data['quantity'])),
avg_price=Decimal(str(pos_data['avg_price']))
)
self._version = snapshot['version']
def get_total_value(self) -> Decimal:
"""Общая стоимость портфеля"""
positions_value = sum(pos.market_value for pos in self.positions.values())
return self.cash + positions_value
def apply_transaction(self, transaction: Transaction):
"""Применение транзакции к портфелю"""
if transaction.operation_type == "BUY":
self._execute_buy(transaction)
elif transaction.operation_type == "SELL":
self._execute_sell(transaction)
else:
raise ValueError(f"Неизвестный тип операции: {transaction.operation_type}")
transaction.status = TransactionStatus.COMMITTED
self.transaction_log.append(transaction)
self._version += 1
def _execute_buy(self, transaction: Transaction):
"""Выполнение покупки"""
total_cost = transaction.quantity_change * transaction.price
if self.cash < total_cost:
raise ValueError(f"Недостаточно средств: {self.cash} < {total_cost}")
self.cash -= total_cost
if transaction.symbol in self.positions:
# Обновляем существующую позицию
pos = self.positions[transaction.symbol]
total_quantity = pos.quantity + transaction.quantity_change
total_cost_basis = (pos.quantity * pos.avg_price + 
transaction.quantity_change * transaction.price)
pos.quantity = total_quantity
pos.avg_price = total_cost_basis / total_quantity
pos.last_updated = transaction.timestamp
else:
# Создаем новую позицию
self.positions[transaction.symbol] = Position(
symbol=transaction.symbol,
quantity=transaction.quantity_change,
avg_price=transaction.price,
last_updated=transaction.timestamp
)
def _execute_sell(self, transaction: Transaction):
"""Выполнение продажи"""
if transaction.symbol not in self.positions:
raise ValueError(f"Нет позиции для продажи: {transaction.symbol}")
pos = self.positions[transaction.symbol]
if pos.quantity < transaction.quantity_change:
raise ValueError(f"Недостаточно акций для продажи: {pos.quantity} < {transaction.quantity_change}") # Обновляем позицию pos.quantity -= transaction.quantity_change pos.last_updated = transaction.timestamp # Добавляем деньги в кеш proceeds = transaction.quantity_change * transaction.price self.cash += proceeds # Удаляем позицию если количество стало нулевым if pos.quantity == 0: del self.positions[transaction.symbol] class TransactionValidator: """Валидатор транзакций с настраиваемыми правилами""" def __init__(self): self.validation_rules: List[Callable[[Transaction, PortfolioState], bool]] = [] self.risk_limits = { 'max_position_size': Decimal('0.3'), # Максимум 30% портфеля в одной позиции 'max_daily_turnover': Decimal('0.2'), # Максимум 20% оборота в день 'min_cash_reserve': Decimal('0.05') # Минимум 5% в кеше } def add_validation_rule(self, rule: Callable[[Transaction, PortfolioState], bool]): """Добавление кастомного правила валидации""" self.validation_rules.append(rule) def validate_transaction(self, transaction: Transaction, portfolio: PortfolioState) -> List[str]:
"""Валидация транзакции, возвращает список ошибок"""
errors = []
# Базовые проверки
if transaction.quantity_change <= 0:
errors.append("Количество должно быть положительным")
if transaction.price <= 0: errors.append("Цена должна быть положительной") # Проверка лимитов риска if transaction.operation_type == "BUY": # Проверяем лимит позиции future_position_value = transaction.quantity_change * transaction.price if transaction.symbol in portfolio.positions: future_position_value += portfolio.positions[transaction.symbol].market_value total_portfolio_value = portfolio.get_total_value() if total_portfolio_value > 0:
position_ratio = future_position_value / total_portfolio_value
if position_ratio > self.risk_limits['max_position_size']:
errors.append(f"Превышен лимит позиции: {position_ratio:.1%} > {self.risk_limits['max_position_size']:.1%}")
# Проверяем резерв наличности
total_cost = transaction.quantity_change * transaction.price
future_cash = portfolio.cash - total_cost
future_cash_ratio = future_cash / total_portfolio_value
if future_cash_ratio < self.risk_limits['min_cash_reserve']:
errors.append(f"Недостаточный резерв наличности: {future_cash_ratio:.1%} < {self.risk_limits['min_cash_reserve']:.1%}") # Применяем кастомные правила for rule in self.validation_rules: try: if not rule(transaction, portfolio): errors.append(f"Нарушено правило валидации: {rule.__name__}") except Exception as e: errors.append(f"Ошибка в правиле валидации {rule.__name__}: {e}") return errors @contextmanager def portfolio_transaction(portfolio: PortfolioState, validator: Optional[TransactionValidator] = None, auto_rollback: bool = True) -> Generator[List[Transaction], None, None]:
"""
Контекстный менеджер для безопасного выполнения портфельных транзакций
Обеспечивает атомарность и возможность отката
"""
# Создаем снимок состояния для возможного отката
initial_snapshot = portfolio.get_snapshot()
transaction_batch: List[Transaction] = []
validation_errors: List[str] = []
print(f"Начало транзакционной сессии для портфеля {portfolio.portfolio_id}")
print(f"Исходное состояние: ${portfolio.get_total_value():,.2f}")
try:
# Передаем управление пользователю
yield transaction_batch
# Валидируем все транзакции в батче
if validator:
for transaction in transaction_batch:
errors = validator.validate_transaction(transaction, portfolio)
validation_errors.extend(errors)
if validation_errors:
raise ValueError(f"Ошибки валидации: {'; '.join(validation_errors)}")
# Применяем все транзакции
for transaction in transaction_batch:
portfolio.apply_transaction(transaction)
print(f"Успешное выполнение {len(transaction_batch)} транзакций")
print(f"Новое состояние портфеля: ${portfolio.get_total_value():,.2f}")
# Логируем детали транзакций
for transaction in transaction_batch:
print(f"  {transaction.operation_type} {float(transaction.quantity_change)} "
f"{transaction.symbol} @ ${float(transaction.price)}")
except Exception as e:
print(f"ОШИБКА в транзакционной сессии: {e}")
if auto_rollback:
print("Выполняется автоматический откат...")
portfolio.restore_from_snapshot(initial_snapshot)
# Помечаем все транзакции как откаченные
for transaction in transaction_batch:
transaction.status = TransactionStatus.ROLLED_BACK
raise e
finally:
final_value = portfolio.get_total_value()
value_change = final_value - Decimal(str(initial_snapshot['total_value']))
print(f"Завершение транзакционной сессии")
print(f"Изменение стоимости портфеля: ${float(value_change):+,.2f}")
def create_rebalancing_transactions(current_portfolio: PortfolioState,
target_weights: Dict[str, float],
current_prices: Dict[str, Decimal]) -> List[Transaction]:
"""
Создание транзакций для ребалансировки портфеля
"""
total_value = current_portfolio.get_total_value()
transactions = []
transaction_counter = 0
# Рассчитываем целевые размеры позиций
target_values = {symbol: Decimal(str(weight)) * total_value 
for symbol, weight in target_weights.items()}
# Создаем транзакции для каждого актива
for symbol, target_value in target_values.items():
current_value = Decimal('0')
if symbol in current_portfolio.positions:
current_value = current_portfolio.positions[symbol].market_value
value_difference = target_value - current_value
if abs(value_difference) > Decimal('100'):  # Минимальная сумма для торговли
price = current_prices.get(symbol, Decimal('100'))  # Цена по умолчанию
quantity = abs(value_difference) / price
# Округляем до 2 знаков после запятой
quantity = quantity.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)
operation_type = "BUY" if value_difference > 0 else "SELL"
transaction = Transaction(
transaction_id=f"REBAL_{transaction_counter:04d}",
timestamp=datetime.now(),
operation_type=operation_type,
symbol=symbol,
quantity_change=quantity,
price=price,
metadata={
'rebalancing': True,
'target_weight': target_weights[symbol],
'value_difference': float(value_difference)
}
)
transactions.append(transaction)
transaction_counter += 1
return transactions
@contextmanager
def trading_session(session_name: str, max_memory_mb: int = 1000):
"""
Контекстный менеджер для управления торговой сессией
Обеспечивает корректное освобождение ресурсов и логирование
"""
process = psutil.Process()
start_memory = process.memory_info().rss / 1024 / 1024
start_time = time.time()
session_logger = logging.getLogger(f"TradingSession_{session_name}")
session_logger.setLevel(logging.INFO)
# Создаем обработчик если его нет
if not session_logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
session_logger.addHandler(handler)
session_logger.info(f"Начало сессии {session_name}. Память: {start_memory:.2f} MB")
try:
yield session_logger
except Exception as e:
session_logger.error(f"Ошибка в сессии {session_name}: {e}")
raise
finally:
# Принудительная очистка памяти
gc.collect()
end_memory = process.memory_info().rss / 1024 / 1024
session_duration = time.time() - start_time
memory_used = end_memory - start_memory
session_logger.info(f"Завершение сессии {session_name}. "
f"Длительность: {session_duration:.2f}с, "
f"Использовано памяти: {memory_used:.2f} MB")
if memory_used > max_memory_mb:
session_logger.warning(f"Превышен лимит памяти: {memory_used:.2f} > {max_memory_mb} MB")
def demonstrate_portfolio_transactions():
"""Демонстрация работы транзакционной системы портфеля"""
print("Демонстрация транзакционной системы портфеля:")
print("=" * 55)
# Создаем портфель и валидатор
portfolio = PortfolioState("DEMO_PORTFOLIO", Decimal('100000'))
validator = TransactionValidator()
# Добавляем кастомное правило валидации
def no_penny_stocks(transaction: Transaction, portfolio: PortfolioState) -> bool:
"""Запрет на покупку дешевых акций"""
return transaction.price >= Decimal('5.0')
validator.add_validation_rule(no_penny_stocks)
# Текущие цены (в реальной системе получались бы из API)
current_prices = {
'AAPL': Decimal('150.00'),
'GOOGL': Decimal('2500.00'),
'MSFT': Decimal('300.00'),
'TSLA': Decimal('200.00')
}
print(f"\nИсходное состояние портфеля:")
print(f"Наличные: ${float(portfolio.cash):,.2f}")
print(f"Позиции: {len(portfolio.positions)}")
# Тест 1: Успешная покупка акций
print(f"\n{'-'*50}")
print("Тест 1: Покупка акций")
try:
with portfolio_transaction(portfolio, validator) as transactions:
# Добавляем транзакции в батч
transactions.append(Transaction(
transaction_id="BUY_001",
timestamp=datetime.now(),
operation_type="BUY",
symbol="AAPL",
quantity_change=Decimal('100'),
price=current_prices['AAPL']
))
transactions.append(Transaction(
transaction_id="BUY_002", 
timestamp=datetime.now(),
operation_type="BUY",
symbol="GOOGL",
quantity_change=Decimal('10'),
price=current_prices['GOOGL']
))
except Exception as e:
print(f"Ошибка в тесте 1: {e}")
# Тест 2: Нарушение правил валидации
print(f"\n{'-'*50}")
print("Тест 2: Нарушение правил валидации (слишком большая позиция)")
try:
with portfolio_transaction(portfolio, validator) as transactions:
# Пытаемся купить слишком много (более 30% портфеля)
transactions.append(Transaction(
transaction_id="BUY_003",
timestamp=datetime.now(),
operation_type="BUY",
symbol="TSLA",
quantity_change=Decimal('200'),  # $40,000 - слишком много
price=current_prices['TSLA']
))
except Exception as e:
print(f"Ожидаемая ошибка валидации: {e}")
# Тест 3: Ребалансировка портфеля
print(f"\n{'-'*50}")
print("Тест 3: Ребалансировка портфеля")
target_allocation = {
'AAPL': 0.4,
'GOOGL': 0.3,
'MSFT': 0.2,
'TSLA': 0.1
}
try:
rebalancing_transactions = create_rebalancing_transactions(
portfolio, target_allocation, current_prices
)
print(f"Создано {len(rebalancing_transactions)} транзакций для ребалансировки:")
with portfolio_transaction(portfolio, validator) as transactions:
transactions.extend(rebalancing_transactions)
except Exception as e:
print(f"Ошибка в ребалансировке: {e}")
# Показываем финальное состояние
print(f"\n{'-'*50}")
print("Финальное состояние портфеля:")
snapshot = portfolio.get_snapshot()
print(f"Общая стоимость: ${snapshot['total_value']:,.2f}")
print(f"Наличные: ${snapshot['cash']:,.2f}")
print(f"Позиции:")
for symbol, pos_data in snapshot['positions'].items():
weight = pos_data['market_value'] / snapshot['total_value'] * 100
print(f"  {symbol}: {pos_data['quantity']} шт @ ${pos_data['avg_price']:.2f} "
f"= ${pos_data['market_value']:,.2f} ({weight:.1f}%)")
print(f"\nВсего транзакций в логе: {len(portfolio.transaction_log)}")
return portfolio
def analyze_portfolio_performance(symbols: list, period: str = "6mo"):
"""
Анализ производительности портфеля с отказоустойчивостью
(упрощенная версия для демонстрации)
"""
with trading_session("portfolio_analysis", max_memory_mb=500) as logger:
portfolio_data = {}
failed_symbols = []
# Имитируем загрузку данных
mock_data = {
'AAPL': {'total_return': 15.2, 'volatility': 18.5, 'sharpe_ratio': 0.82},
'GOOGL': {'total_return': 8.7, 'volatility': 22.1, 'sharpe_ratio': 0.39},
'MSFT': {'total_return': 12.3, 'volatility': 16.8, 'sharpe_ratio': 0.73},
'TSLA': {'total_return': 22.1, 'volatility': 35.2, 'sharpe_ratio': 0.63},
'BTC-USD': {'total_return': 45.8, 'volatility': 65.4, 'sharpe_ratio': 0.70}
}
for symbol in symbols:
try:
logger.info(f"Анализ данных для {symbol}")
if symbol in mock_data:
portfolio_data[symbol] = mock_data[symbol]
else:
# Имитируем случайные метрики
portfolio_data[symbol] = {
'total_return': np.random.normal(10, 15),
'volatility': np.random.normal(20, 10),
'sharpe_ratio': np.random.normal(0.5, 0.3)
}
except Exception as e:
logger.error(f"Не удалось обработать {symbol}: {e}")
failed_symbols.append(symbol)
return portfolio_data, failed_symbols
# Запуск демонстрации
if __name__ == "__main__":
# Демонстрация транзакционной системы
demo_portfolio = demonstrate_portfolio_transactions()
print(f"\n{'='*60}")
print("Анализ производительности:")
# Анализ производительности
test_symbols = ["AAPL", "GOOGL", "MSFT", "TSLA", "BTC-USD"]
portfolio_results, failed = analyze_portfolio_performance(test_symbols)
print("\nРезультаты анализа портфеля:")
for symbol, metrics in portfolio_results.items():
print(f"{symbol}: Доходность {metrics['total_return']:.2f}%, "
f"Волатильность {metrics['volatility']:.2f}%, "
f"Sharpe {metrics['sharpe_ratio']:.3f}")
if failed:
print(f"Не удалось обработать: {', '.join(failed)}")
print("\nДемонстрация завершена!")
Длительность: 0.17с, Использовано памяти: 0.00 MB
Демонстрация транзакционной системы портфеля:
=======================================================
Исходное состояние портфеля:
Наличные: $100,000.00
Позиции: 0
--------------------------------------------------
Тест 1: Покупка акций
Начало транзакционной сессии для портфеля DEMO_PORTFOLIO
Исходное состояние: $100,000.00
Успешное выполнение 2 транзакций
Новое состояние портфеля: $100,000.00
BUY 100.0 AAPL @ $150.0
BUY 10.0 GOOGL @ $2500.0
Завершение транзакционной сессии
--------------------------------------------------
Тест 2: Нарушение правил валидации (слишком большая позиция)
Начало транзакционной сессии для портфеля DEMO_PORTFOLIO
Исходное состояние: $100,000.00
ОШИБКА в транзакционной сессии: Ошибки валидации: Превышен лимит позиции: 40.0% > 30.0%
Выполняется автоматический откат...
Завершение транзакционной сессии
Ожидаемая ошибка валидации: Ошибки валидации: Превышен лимит позиции: 40.0% > 30.0%
--------------------------------------------------
Тест 3: Ребалансировка портфеля
Создано 4 транзакций для ребалансировки:
Начало транзакционной сессии для портфеля DEMO_PORTFOLIO
Исходное состояние: $100,000.00
ОШИБКА в транзакционной сессии: Ошибки валидации: Превышен лимит позиции: 40.0% > 30.0%
Выполняется автоматический откат...
Завершение транзакционной сессии
Ошибка в ребалансировке: Ошибки валидации: Превышен лимит позиции: 40.0% > 30.0%
--------------------------------------------------
Финальное состояние портфеля:
Общая стоимость: $100,000.00
Наличные: $60,000.00
Позиции:
AAPL: 100.0 шт @ $150.00 = $15,000.00 (15.0%)
GOOGL: 10.0 шт @ $2500.00 = $25,000.00 (25.0%)
Всего транзакций в логе: 2
============================================================
Анализ производительности:
Результаты анализа портфеля:
AAPL: Доходность 15.20%, Волатильность 18.50%, Sharpe 0.820
GOOGL: Доходность 8.70%, Волатильность 22.10%, Sharpe 0.390
MSFT: Доходность 12.30%, Волатильность 16.80%, Sharpe 0.730
TSLA: Доходность 22.10%, Волатильность 35.20%, Sharpe 0.630
BTC-USD: Доходность 45.80%, Волатильность 65.40%, Sharpe 0.700
Демонстрация завершена!

Данный код представляет комплексную систему отказоустойчивости, которая объединяет несколько продвинутых паттернов Python. Circuit Breaker — это паттерн, широко используемый в микросервисной архитектуре, который предотвращает каскадные сбои при проблемах с внешними сервисами. В контексте финансовых приложений это особенно важно, поскольку сбой в получении данных от одной биржи не должен парализовать всю торговую систему.

Класс CircuitBreaker отслеживает количество неудачных попыток обращения к внешнему сервису и автоматически блокирует дальнейшие вызовы при превышении порога ошибок. Это защищает как нашу систему от бесполезных повторных попыток, так и внешний сервис от дополнительной нагрузки. Механизм восстановления позволяет системе автоматически возобновлять работу после истечения таймаута.

Контекстный менеджер trading_session решает другую важную проблему — управление ресурсами во время торговых сессий. Он автоматически отслеживает потребление памяти, что особенно важно при работе с большими объемами рыночных данных, и обеспечивает корректное логирование всех операций для последующего аудита.

Декораторы для финансовых вычислений

Кеширование и мемоизация дорогостоящих расчетов

В количественных финансах многие вычисления являются чрезвычайно ресурсоемкими. Расчет VAR (Value at Risk), симуляции Монте-Карло, оптимизация портфеля — все эти операции могут занимать минуты или даже часы. Грамотно реализованное кеширование способно сократить время выполнения в десятки раз.

import functools
import hashlib
import pickle
import os
from typing import Any, Dict, Callable, Optional
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
class AdvancedCache:
"""
Продвинутая система кеширования для финансовых вычислений
с поддержкой TTL, персистентности и инвалидации
"""
def __init__(self, 
max_size: int = 1000, 
ttl_seconds: Optional[int] = 3600,
persist_path: Optional[str] = None):
self.max_size = max_size
self.ttl_seconds = ttl_seconds
self.persist_path = persist_path
self._cache: Dict[str, Dict[str, Any]] = {}
self._access_times: Dict[str, datetime] = {}
# Загружаем кеш из файла если указан путь
if persist_path and os.path.exists(persist_path):
self._load_cache()
def _generate_key(self, func: Callable, args: tuple, kwargs: dict) -> str:
"""Генерация уникального ключа для кеширования"""
# Создаем хеш на основе имени функции и аргументов
key_data = {
'func_name': func.__name__,
'args': args,
'kwargs': sorted(kwargs.items())
}
# Специальная обработка для pandas и numpy объектов
serializable_data = self._make_serializable(key_data)
key_string = str(serializable_data)
return hashlib.md5(key_string.encode()).hexdigest()
def _make_serializable(self, obj: Any) -> Any:
"""Преобразование объектов в сериализуемый формат для хеширования"""
if isinstance(obj, pd.DataFrame):
return f"DataFrame_{hash(obj.values.tobytes())}_{hash(tuple(obj.columns))}"
elif isinstance(obj, pd.Series):
return f"Series_{hash(obj.values.tobytes())}_{obj.name}"
elif isinstance(obj, np.ndarray):
return f"Array_{hash(obj.tobytes())}_{obj.shape}"
elif isinstance(obj, dict):
return {k: self._make_serializable(v) for k, v in obj.items()}
elif isinstance(obj, (list, tuple)):
return [self._make_serializable(item) for item in obj]
else:
return obj
def _is_expired(self, key: str) -> bool:
"""Проверка истечения TTL для ключа"""
if not self.ttl_seconds or key not in self._cache:
return False
cache_time = self._cache[key]['timestamp']
return datetime.now() - cache_time > timedelta(seconds=self.ttl_seconds)
def _evict_if_needed(self):
"""Удаление старых записей при превышении лимита"""
if len(self._cache) >= self.max_size:
# Удаляем самую старую запись по времени доступа
oldest_key = min(self._access_times.keys(), key=lambda k: self._access_times[k])
del self._cache[oldest_key]
del self._access_times[oldest_key]
def get(self, key: str) -> Optional[Any]:
"""Получение значения из кеша"""
if key in self._cache and not self._is_expired(key):
self._access_times[key] = datetime.now()
return self._cache[key]['value']
# Удаляем просроченную запись
if key in self._cache:
del self._cache[key]
del self._access_times[key]
return None
def set(self, key: str, value: Any):
"""Сохранение значения в кеш"""
self._evict_if_needed()
self._cache[key] = {
'value': value,
'timestamp': datetime.now()
}
self._access_times[key] = datetime.now()
# Сохраняем в файл если включена персистентность
if self.persist_path:
self._save_cache()
def _save_cache(self):
"""Сохранение кеша в файл"""
try:
with open(self.persist_path, 'wb') as f:
pickle.dump({
'cache': self._cache,
'access_times': self._access_times
}, f)
except Exception as e:
print(f"Ошибка сохранения кеша: {e}")
def _load_cache(self):
"""Загрузка кеша из файла"""
try:
with open(self.persist_path, 'rb') as f:
data = pickle.load(f)
self._cache = data.get('cache', {})
self._access_times = data.get('access_times', {})
except Exception as e:
print(f"Ошибка загрузки кеша: {e}")
# Глобальный экземпляр кеша для финансовых вычислений
finance_cache = AdvancedCache(
max_size=500, 
ttl_seconds=1800,  # 30 минут
persist_path="finance_cache.pkl"
)
def financial_cache(cache_instance: AdvancedCache = finance_cache):
"""
Декоратор для кеширования финансовых вычислений
"""
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Генерируем ключ кеша
cache_key = cache_instance._generate_key(func, args, kwargs)
# Пытаемся получить из кеша
cached_result = cache_instance.get(cache_key)
if cached_result is not None:
print(f"Cache HIT для {func.__name__}")
return cached_result
# Выполняем вычисления и кешируем результат
print(f"Cache MISS для {func.__name__} - выполняем вычисления")
result = func(*args, **kwargs)
cache_instance.set(cache_key, result)
return result
# Добавляем метод для очистки кеша конкретной функции
def clear_cache():
# Удаляем все записи связанные с этой функцией
keys_to_remove = []
for key in cache_instance._cache.keys():
if func.__name__ in str(cache_instance._cache[key]):
keys_to_remove.append(key)
for key in keys_to_remove:
del cache_instance._cache[key]
if key in cache_instance._access_times:
del cache_instance._access_times[key]
wrapper.clear_cache = clear_cache
return wrapper
return decorator
@financial_cache()
def calculate_portfolio_var(returns_data: pd.DataFrame, 
confidence_level: float = 0.05,
time_horizon: int = 1) -> Dict[str, float]:
"""
Расчет Value at Risk портфеля методом исторических симуляций
Дорогостоящая операция, идеально подходящая для кеширования
"""
print(f"Выполняем расчет VaR для портфеля из {len(returns_data.columns)} активов")
# Симуляция большого количества сценариев (имитация сложных вычислений)
n_simulations = 10000
portfolio_returns = []
# Равновесные веса портфеля
weights = np.array([1/len(returns_data.columns)] * len(returns_data.columns))
for _ in range(n_simulations):
# Случайная выборка исторических доходностей
random_indices = np.random.choice(len(returns_data), time_horizon)
simulated_returns = returns_data.iloc[random_indices]
# Расчет доходности портфеля
portfolio_return = np.sum(simulated_returns.mean() * weights)
portfolio_returns.append(portfolio_return)
portfolio_returns = np.array(portfolio_returns)
# Расчет различных метрик риска
var_value = np.percentile(portfolio_returns, confidence_level * 100)
expected_shortfall = portfolio_returns[portfolio_returns <= var_value].mean() return { 'VaR': var_value, 'CVaR': expected_shortfall, 'expected_return': portfolio_returns.mean(), 'volatility': portfolio_returns.std(), 'simulations_count': n_simulations } @financial_cache() def optimize_portfolio_weights(returns_data: pd.DataFrame, target_return: Optional[float] = None, risk_free_rate: float = 0.02) -> Dict[str, Any]:
"""
Оптимизация весов портфеля (упрощенная версия)
Еще один пример ресурсоемких вычислений
"""
print(f"Оптимизация портфеля для {len(returns_data.columns)} активов")
# Имитация сложных оптимизационных вычислений
n_iterations = 5000
best_sharpe = -np.inf
best_weights = None
mean_returns = returns_data.mean() * 252  # Годовые доходности
cov_matrix = returns_data.cov() * 252     # Годовая ковариационная матрица
for _ in range(n_iterations):
# Генерируем случайные веса
weights = np.random.random(len(returns_data.columns))
weights /= weights.sum()  # Нормализация
# Расчет ожидаемой доходности и риска портфеля
portfolio_return = np.sum(weights * mean_returns)
portfolio_variance = np.dot(weights.T, np.dot(cov_matrix, weights))
portfolio_volatility = np.sqrt(portfolio_variance)
# Коэффициент Шарпа
sharpe_ratio = (portfolio_return - risk_free_rate) / portfolio_volatility
if sharpe_ratio > best_sharpe:
best_sharpe = sharpe_ratio
best_weights = weights.copy()
# Расчет финальных метрик для оптимального портфеля
optimal_return = np.sum(best_weights * mean_returns)
optimal_variance = np.dot(best_weights.T, np.dot(cov_matrix, best_weights))
optimal_volatility = np.sqrt(optimal_variance)
return {
'weights': dict(zip(returns_data.columns, best_weights)),
'expected_return': optimal_return,
'volatility': optimal_volatility,
'sharpe_ratio': best_sharpe,
'iterations': n_iterations
}
# Демонстрация работы кеширования
def test_caching_performance():
"""Тестирование эффективности кеширования"""
# Подготовка тестовых данных
symbols = ["TSLA", "AMZN", "GOOGL", "META", "NFLX"]
returns_data = {}
for symbol in symbols:
try:
data = yf.download(symbol, period="2y", interval="1d", progress=False)
if isinstance(data.columns, pd.MultiIndex):
data.columns = data.columns.droplevel(1)
returns_data[symbol] = data['Close'].pct_change().dropna()
except:
# Генерируем случайные данные если загрузка не удалась
returns_data[symbol] = pd.Series(np.random.normal(0.001, 0.02, 500))
portfolio_returns = pd.DataFrame(returns_data)
print("Тестирование кеширования финансовых вычислений:")
print("=" * 50)
# Первый вызов - без кеша
import time
start_time = time.time()
var_result1 = calculate_portfolio_var(portfolio_returns)
first_call_time = time.time() - start_time
# Второй вызов - с кешем
start_time = time.time()
var_result2 = calculate_portfolio_var(portfolio_returns)
second_call_time = time.time() - start_time
print(f"Первый вызов VaR: {first_call_time:.3f}с")
print(f"Второй вызов VaR: {second_call_time:.3f}с")
print(f"Ускорение: {first_call_time/second_call_time:.1f}x")
print(f"VaR (5%): {var_result1['VaR']:.4f}")
print(f"CVaR: {var_result1['CVaR']:.4f}")
# Запуск тестирования
test_caching_performance()
Тестирование кеширования финансовых вычислений:
==================================================
Cache MISS для calculate_portfolio_var - выполняем вычисления
Выполняем расчет VaR для портфеля из 5 активов
Cache HIT для calculate_portfolio_var
Первый вызов VaR: 6.002с
Второй вызов VaR: 0.000с
Ускорение: 23974.7x
VaR (5%): -0.0269
CVaR: -0.0394

Представленная система кеширования демонстрирует профессиональный подход к оптимизации производительности в финансовых приложениях. Класс AdvancedCache реализует несколько ключевых особенностей: TTL (Time To Live) для автоматического устаревания данных, персистентность для сохранения кеша между запусками приложения и интеллектуальную генерацию ключей, которая корректно обрабатывает pandas и numpy объекты.

👉🏻  Автокорреляция (ACF) и частичная автокорреляция (PACF) в биржевом анализе

Хочу обратить внимание на обработку pandas DataFrame и Series при генерации ключей кеша. Простое хеширование этих объектов может дать неожиданные результаты из-за особенностей их внутреннего представления. Метод _make_serializable создает стабильные хеши на основе содержимого данных, что гарантирует корректную работу кеширования даже при работе с идентичными по содержанию, но разными по объекту DataFrame.

Функции calculate_portfolio_var и optimize_portfolio_weights представляют типичные примеры ресурсоемких финансовых вычислений. VaR (Value at Risk) — один из ключевых показателей риска в современном риск-менеджменте, а оптимизация портфеля — основа количественного управления активами. В реальных приложениях эти вычисления могут занимать значительное время, особенно при работе с большими портфелями и сложными моделями.

Логирование и мониторинг торговых операций

В количественных финансах детальное логирование — это не просто полезная функция. Зачастую это требование регуляторов и основа для анализа производительности стратегий. Декораторы позволяют элегантно добавить комплексное логирование к любой функции.

import functools
import logging
import time
import traceback
from typing import Any, Callable, Optional, Dict
from datetime import datetime
import json
class TradingLogger:
"""
Специализированная система логирования для торговых операций
с поддержкой структурированных логов и метрик производительности
"""
def __init__(self, name: str, level: int = logging.INFO):
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
# Создаем форматтер для структурированных логов
formatter = logging.Formatter(
'%(asctime)s | %(name)s | %(levelname)s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Консольный хендлер
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# Файловый хендлер для торговых операций
file_handler = logging.FileHandler(f'trading_{name}_{datetime.now().strftime("%Y%m%d")}.log')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
# Метрики производительности
self.performance_metrics = {}
def log_trade_execution(self, 
symbol: str, 
action: str, 
quantity: float, 
price: float,
strategy: str,
additional_data: Optional[Dict] = None):
"""Логирование выполнения торговой операции"""
trade_data = {
'timestamp': datetime.now().isoformat(),
'symbol': symbol,
'action': action,
'quantity': quantity,
'price': price,
'value': quantity * price,
'strategy': strategy
}
if additional_data:
trade_data.update(additional_data)
self.logger.info(f"TRADE_EXECUTION | {json.dumps(trade_data)}")
def update_performance_metric(self, metric_name: str, value: float):
"""Обновление метрик производительности"""
if metric_name not in self.performance_metrics:
self.performance_metrics[metric_name] = []
self.performance_metrics[metric_name].append({
'timestamp': datetime.now().isoformat(),
'value': value
})
# Глобальный логгер для торговых операций
trading_logger = TradingLogger("QuantTrading")
def trading_operation(operation_type: str = "GENERAL",
log_performance: bool = True,
log_inputs: bool = True,
log_outputs: bool = True):
"""
Декоратор для логирования торговых операций с детальной аналитикой
"""
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args, **kwargs):
operation_id = f"{func.__name__}_{int(time.time())}"
start_time = time.time()
# Логируем начало операции
trading_logger.logger.info(f"START {operation_type} | {operation_id} | Function: {func.__name__}")
if log_inputs and (args or kwargs):
# Безопасное логирование входных параметров
safe_args = []
for arg in args:
if isinstance(arg, (pd.DataFrame, pd.Series)):
safe_args.append(f"DataFrame({arg.shape})" if hasattr(arg, 'shape') else f"Series({len(arg)})")
elif isinstance(arg, np.ndarray):
safe_args.append(f"Array({arg.shape})")
else:
safe_args.append(str(arg)[:100])  # Ограничиваем длину
safe_kwargs = {k: (str(v)[:100] if not isinstance(v, (pd.DataFrame, pd.Series, np.ndarray)) 
else f"{type(v).__name__}") for k, v in kwargs.items()}
trading_logger.logger.debug(f"INPUTS | {operation_id} | args: {safe_args} | kwargs: {safe_kwargs}")
try:
# Выполняем основную функцию
result = func(*args, **kwargs)
execution_time = time.time() - start_time
# Логируем успешное завершение
trading_logger.logger.info(f"SUCCESS {operation_type} | {operation_id} | "
f"Duration: {execution_time:.4f}s")
if log_outputs and result is not None:
# Безопасное логирование результата
if isinstance(result, dict):
result_summary = {k: f"{type(v).__name__}" for k, v in result.items()}
trading_logger.logger.debug(f"OUTPUT | {operation_id} | {result_summary}")
else:
result_type = type(result).__name__
trading_logger.logger.debug(f"OUTPUT | {operation_id} | Type: {result_type}")
# Обновляем метрики производительности
if log_performance:
trading_logger.update_performance_metric(f"{func.__name__}_execution_time", execution_time)
return result
except Exception as e:
execution_time = time.time() - start_time
error_details = {
'error_type': type(e).__name__,
'error_message': str(e),
'execution_time': execution_time,
'traceback': traceback.format_exc()
}
trading_logger.logger.error(f"ERROR {operation_type} | {operation_id} | "
f"{json.dumps(error_details)}")
# Перебрасываем исключение дальше
raise e
return wrapper
return decorator
def risk_monitoring(max_drawdown: float = 0.05, 
max_position_size: float = 0.1):
"""
Декоратор для мониторинга рисков в торговых операциях
"""
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Выполняем функцию
result = func(*args, **kwargs)
# Анализируем результат на предмет рисков
if isinstance(result, dict):
# Проверяем размер позиции
if 'position_size' in result:
position_size = abs(result['position_size'])
if position_size > max_position_size:
warning_msg = f"RISK WARNING: Position size {position_size:.3f} exceeds limit {max_position_size:.3f}"
trading_logger.logger.warning(warning_msg)
# Проверяем просадку
if 'drawdown' in result:
drawdown = abs(result['drawdown'])
if drawdown > max_drawdown:
warning_msg = f"RISK WARNING: Drawdown {drawdown:.3f} exceeds limit {max_drawdown:.3f}"
trading_logger.logger.warning(warning_msg)
# Проверяем концентрацию риска
if 'portfolio_weights' in result:
weights = result['portfolio_weights']
if isinstance(weights, dict):
max_weight = max(abs(w) for w in weights.values())
if max_weight > 0.3:  # Максимум 30% в одном активе
trading_logger.logger.warning(f"RISK WARNING: High concentration {max_weight:.3f}")
return result
return wrapper
return decorator
@trading_operation(operation_type="PORTFOLIO_REBALANCING", log_performance=True)
@risk_monitoring(max_drawdown=0.03, max_position_size=0.15)
def rebalance_portfolio(current_weights: Dict[str, float],
target_weights: Dict[str, float],
portfolio_value: float,
transaction_cost: float = 0.001) -> Dict[str, Any]:
"""
Ребалансировка портфеля с учетом транзакционных издержек
Демонстрация применения декораторов логирования и риск-мониторинга
"""
# Расчет необходимых изменений в позициях
rebalancing_trades = {}
total_turnover = 0
for symbol in target_weights:
current_weight = current_weights.get(symbol, 0)
target_weight = target_weights[symbol]
weight_change = target_weight - current_weight
if abs(weight_change) > 0.001:  # Минимальный порог для торговли
trade_value = weight_change * portfolio_value
rebalancing_trades[symbol] = {
'current_weight': current_weight,
'target_weight': target_weight,
'weight_change': weight_change,
'trade_value': trade_value,
'action': 'BUY' if weight_change > 0 else 'SELL'
}
total_turnover += abs(trade_value)
# Расчет общих транзакционных издержек
total_transaction_costs = total_turnover * transaction_cost
# Логируем детали ребалансировки
for symbol, trade_info in rebalancing_trades.items():
trading_logger.log_trade_execution(
symbol=symbol,
action=trade_info['action'],
quantity=abs(trade_info['trade_value']) / 100,  # Условные акции
price=100,  # Условная цена
strategy="PORTFOLIO_REBALANCING",
additional_data={
'weight_change': trade_info['weight_change'],
'target_weight': trade_info['target_weight']
}
)
# Расчет метрик риска
portfolio_concentration = max(abs(w) for w in target_weights.values())
estimated_tracking_error = np.std(list(target_weights.values())) * 0.1  # Упрощенная оценка
return {
'rebalancing_trades': rebalancing_trades,
'total_turnover': total_turnover,
'transaction_costs': total_transaction_costs,
'portfolio_weights': target_weights,
'position_size': portfolio_concentration,
'tracking_error': estimated_tracking_error,
'trade_count': len(rebalancing_trades)
}
@trading_operation(operation_type="MOMENTUM_STRATEGY", log_performance=True)
def momentum_strategy_signal(price_data: pd.Series, 
short_window: int = 10, 
long_window: int = 30,
momentum_threshold: float = 0.02) -> Dict[str, Any]:
"""
Генерация сигналов моментум-стратегии
"""
# Расчет скользящих средних
short_ma = price_data.rolling(window=short_window).mean()
long_ma = price_data.rolling(window=long_window).mean()
# Текущие значения
current_short_ma = short_ma.iloc[-1]
current_long_ma = long_ma.iloc[-1]
current_price = price_data.iloc[-1]
# Расчет моментума
momentum = (current_short_ma - current_long_ma) / current_long_ma
# Генерация сигнала
if momentum > momentum_threshold:
signal = "BUY"
confidence = min(momentum / momentum_threshold, 2.0)  # Максимум 2.0
elif momentum < -momentum_threshold:
signal = "SELL"
confidence = min(abs(momentum) / momentum_threshold, 2.0)
else:
signal = "HOLD"
confidence = 0.5
# Расчет риск-метрик
price_volatility = price_data.pct_change().rolling(window=20).std().iloc[-1]
drawdown = (current_price - price_data.rolling(window=20).max().iloc[-1]) / price_data.rolling(window=20).max().iloc[-1]
return {
'signal': signal,
'confidence': confidence,
'momentum': momentum,
'current_price': current_price,
'short_ma': current_short_ma,
'long_ma': current_long_ma,
'volatility': price_volatility,
'drawdown': drawdown,
'position_size': confidence * 0.1  # Размер позиции на основе уверенности
}
# Демонстрация работы системы логирования
def demonstrate_trading_system():
"""Демонстрация работы торговой системы с логированием"""
print("Демонстрация системы логирования торговых операций:")
print("=" * 60)
# Подготовка данных
try:
btc_data = yf.download("BTC-USD", period="3mo", interval="1d", progress=False)
if isinstance(btc_data.columns, pd.MultiIndex):
btc_data.columns = btc_data.columns.droplevel(1)
prices = btc_data['Close']
except:
# Генерируем тестовые данные
dates = pd.date_range(start='2024-01-01', periods=90, freq='D')
prices = pd.Series(100 * np.exp(np.cumsum(np.random.normal(0.001, 0.02, 90))), index=dates)
# Тест моментум-стратегии
momentum_result = momentum_strategy_signal(prices, short_window=5, long_window=20)
print(f"Momentum Signal: {momentum_result['signal']} (confidence: {momentum_result['confidence']:.2f})")
# Тест ребалансировки портфеля
current_portfolio = {"BTC-USD": 0.4, "ETH-USD": 0.3, "CASH": 0.3}
target_portfolio = {"BTC-USD": 0.5, "ETH-USD": 0.35, "CASH": 0.15}
rebalance_result = rebalance_portfolio(
current_weights=current_portfolio,
target_weights=target_portfolio,
portfolio_value=100000,
transaction_cost=0.002
)
print(f"Rebalancing completed: {rebalance_result['trade_count']} trades")
print(f"Total transaction costs: ${rebalance_result['transaction_costs']:.2f}")
# Показываем метрики производительности
print("\nМетрики производительности:")
for metric_name, values in trading_logger.performance_metrics.items():
avg_time = np.mean([v['value'] for v in values])
print(f"{metric_name}: {avg_time:.4f}s (среднее за {len(values)} вызовов)")
# Запуск демонстрации
demonstrate_trading_system()
INFO:QuantTrading:SUCCESS PORTFOLIO_REBALANCING | rebalance_portfolio_1753469681 | Duration: 0.0178s
Демонстрация системы логирования торговых операций:
============================================================
Momentum Signal: HOLD (confidence: 0.50)
Rebalancing completed: 3 trades
Total transaction costs: $60.00
Метрики производительности:
momentum_strategy_signal_execution_time: 0.0072s (среднее за 1 вызовов)
rebalance_portfolio_execution_time: 0.0178s (среднее за 1 вызовов)

Система логирования демонстрирует профессиональный подход к мониторингу торговых операций. Класс TradingLogger создает структурированные логи, которые легко парсить и анализировать автоматическими системами.

Декоратор trading_operation автоматически добавляет к любой функции детальное логирование времени выполнения, входных параметров и результатов. Особое внимание уделено безопасности логирования — pandas DataFrame и numpy массивы не выводятся целиком, а заменяются описанием их структуры, что предотвращает переполнение логов.

Декоратор risk_monitoring реализует автоматический контроль рисков на уровне функций. Он анализирует результаты торговых операций и генерирует предупреждения при превышении заданных лимитов. Такой подход позволяет встроить риск-контроль непосредственно в бизнес-логику без ее усложнения.

Синтез подходов: создание надежного и эффективного финансового ядра

Рассмотренные нами отдельно декораторы, контекстные менеджеры и метаклассы – это мощные инструменты, но их истинная ценность раскрывается при комплексном применении. Современные финансовые приложения требуют не просто корректности расчетов, но и высокой производительности, отказоустойчивости и строгого соблюдения регуляторных требований. Давайте рассмотрим, как можно объединить эти концепции для создания надежного ядра финансовой системы.

Представим себе модуль расчета риска портфеля. Он должен быть быстрым (кеширование), безопасным (транзакционность), надежным (отказоустойчивость) и детально логируемым (аудит). Ниже приведен пример, демонстрирующий синтез подходов:

import functools
import logging
import time
import traceback
from contextlib import contextmanager
from typing import Any, Callable, Optional, Dict, List
from decimal import Decimal
import numpy as np
import pandas as pd
# --- Декораторы ---
def risk_cache(ttl_seconds: int = 3600):
"""Декоратор для кеширования результатов расчетов риска с TTL."""
def decorator(func: Callable) -> Callable:
cache = {}
timestamps = {}
@functools.wraps(func)
def wrapper(*args, **kwargs):
key = str(args) + str(sorted(kwargs.items()))
current_time = time.time()
if key in cache and (current_time - timestamps[key]) < ttl_seconds: logging.debug(f"[КЕШ] Использован кеш для {func.__name__}") return cache[key] else: result = func(*args, **kwargs) cache[key] = result timestamps[key] = current_time logging.debug(f"[КЕШ] Обновлен кеш для {func.__name__}") return result return wrapper return decorator def risk_logger(func: Callable) -> Callable:
"""Декоратор для логирования входов и выходов функций расчета риска."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
logging.info(f"[ЛОГ] Вызов {func.__name__} с args={args[:2]}...")
try:
result = func(*args, **kwargs)
end_time = time.time()
logging.info(f"[ЛОГ] {func.__name__} успешно завершена за {end_time - start_time:.4f}с. Результат: {type(result).__name__}")
return result
except Exception as e:
end_time = time.time()
logging.error(f"[ЛОГ] Ошибка в {func.__name__} ({end_time - start_time:.4f}с): {e}")
raise # Перебрасываем исключение
return wrapper
# --- Контекстные менеджеры ---
@contextmanager
def risk_calculation_context(portfolio_id: str, timeout_seconds: int = 30):
"""Контекстный менеджер для безопасного выполнения расчетов риска."""
import signal
import threading
print(f"--- Начало расчета риска для портфеля {portfolio_id} ---")
start_time = time.time()
# Установка таймаута (упрощенная реализация)
timer = threading.Timer(timeout_seconds, lambda: print(f"[ПРЕДУПРЕЖДЕНИЕ] Расчет риска для {portfolio_id} превышает {timeout_seconds} секунд!"))
timer.start()
try:
yield # Передаем управление блоку with
print(f"--- Расчет риска для {portfolio_id} завершен успешно за {time.time() - start_time:.2f}с ---")
except Exception as e:
print(f"--- Расчет риска для {portfolio_id} прерван ошибкой: {e} ---")
raise
finally:
timer.cancel() # Отменяем таймер
# --- Базовый класс с метаклассом ---
from abc import ABCMeta, abstractmethod
class RiskModelMeta(ABCMeta):
"""Метакласс для автоматической регистрации и валидации моделей риска."""
models = {}  # Словарь для хранения зарегистрированных моделей
def __new__(mcs, name, bases, namespace, **kwargs):
cls = super().__new__(mcs, name, bases, namespace, **kwargs)
if name != "BaseRiskModel":  # Исключаем абстрактный базовый класс
mcs.models[name] = cls
print(f"[РЕГИСТРАЦИЯ] Модель риска '{name}' зарегистрирована.")
return cls
@classmethod
def list_models(mcs):
"""Возвращает список зарегистрированных моделей."""
return list(mcs.models.keys())
class BaseRiskModel(ABC, metaclass=RiskModelMeta):
"""Базовый абстрактный класс для всех моделей риска."""
def __init__(self, model_id: str):
self.model_id = model_id
self._last_calculation_time = None
self._last_result = None
@abstractmethod
def calculate(self, portfolio_data: pd.DataFrame, **params) -> Dict[str, float]:
"""Абстрактный метод для расчета риска."""
pass
# --- Конкретная реализация модели ---
class AdvancedVaRModel(BaseRiskModel):
"""Продвинутая модель Value at Risk, использующая симуляцию Монте-Карло."""
@risk_logger # Логируем вызовы
@risk_cache(ttl_seconds=1800) # Кешируем результаты на 30 минут
def calculate(self, portfolio_data: pd.DataFrame, confidence_level: float = 0.95, simulations: int = 10000) -> Dict[str, float]:
"""Рассчитывает VaR и CVaR для портфеля."""
# Имитация сложных вычислений
time.sleep(0.1) # Симуляция нагрузки
returns = portfolio_data.pct_change().dropna()
# Симуляция Монте-Карло (упрощенная)
simulated_returns = np.random.choice(returns.values.flatten(), size=simulations)
var = np.percentile(simulated_returns, (1 - confidence_level) * 100)
cvar = simulated_returns[simulated_returns <= var].mean()
return {"VaR": float(var), "CVaR": float(cvar)}
# --- Демонстрация ---
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 1. Создаем данные
dates = pd.date_range('2023-01-01', periods=252)
prices = 100 * np.cumprod(1 + np.random.normal(0, 0.01, 252))
portfolio_df = pd.DataFrame({'price': prices}, index=dates)
# 2. Создаем модель
var_model = AdvancedVaRModel("VAR_MODEL_001")
# 3. Выполняем расчет в безопасном контексте
try:
with risk_calculation_context("PORTFOLIO_123", timeout_seconds=10):
risk_metrics = var_model.calculate(portfolio_df, confidence_level=0.99, simulations=5000)
print(f"Рассчитанные метрики риска: {risk_metrics}")
except Exception as e:
print(f"Ошибка при расчете риска: {e}")
# 4. Повторный вызов - будет использован кеш
try:
with risk_calculation_context("PORTFOLIO_123"):
risk_metrics_cached = var_model.calculate(portfolio_df, confidence_level=0.99, simulations=5000)
print(f"Рассчитанные метрики риска (из кеша): {risk_metrics_cached}")
except Exception as e:
print(f"Ошибка при повторном расчете риска: {e}")
# 5. Просмотр зарегистрированных моделей (благодаря метаклассу)
print(f"\nЗарегистрированные модели риска: {RiskModelMeta.list_models()}")
Модель риска 'AdvancedVaRModel' зарегистрирована.
--- Начало расчета риска для портфеля PORTFOLIO_123 ---
Рассчитанные метрики риска: {'VaR': -0.026722172208942796, 'CVaR': -0.027575677818379902}
--- Расчет риска для PORTFOLIO_123 завершен успешно за 0.11с ---
--- Начало расчета риска для портфеля PORTFOLIO_123 ---
Рассчитанные метрики риска (из кеша): {'VaR': -0.026722172208942796, 'CVaR': -0.027575677818379902}
--- Расчет риска для PORTFOLIO_123 завершен успешно за 0.00с ---
Зарегистрированные модели риска: ['AdvancedVaRModel']

Этот пример иллюстрирует, как различные концепции работают сообща:

  1. Метакласс RiskModelMeta автоматически регистрирует все созданные модели риска, обеспечивая централизованное управление ими. Это упрощает расширение системы новыми моделями;
  2. Декоратор @risk_cache значительно ускоряет повторные расчеты, что критично для интерактивных систем управления рисками;
  3. Декоратор @risk_logger обеспечивает прозрачное логирование всех операций без загрязнения основной логики модели, что необходимо для аудита и отладки;
  4. Контекстный менеджер risk_calculation_context предоставляет безопасную среду выполнения с таймаутом и структурированным логированием начала/окончания процесса.

Такой подход делает код не только более надежным и производительным, но и значительно более читаемым и поддерживаемым. Каждый аспект (кеширование, логирование, безопасность) инкапсулирован в отдельных конструкциях, что упрощает их повторное использование и тестирование.

Заключение

Использование продвинутых возможностей Python – декораторов, контекстных менеджеров и метаклассов – в финансовых приложениях выходит далеко за рамки академического интереса. Это практический инструмент для решения реальных задач, стоящих перед разработчиками в этой области.

Освоение этих концепций – важный шаг на пути к становлению профессиональным разработчиком финансового софта. Хотя путь может показаться сложным, инвестиции в понимание этих инструментов окупятся сторицей в виде более качественного, надежного и эффективного кода. Не стоит бояться экспериментировать с ними, ведь именно на стыке мощи языка Python и специфики финансовых задач рождаются по-настоящему элегантные и эффективные решения.