Стандартный модуль logging в Python ломается в асинхронной среде. При переходе с WSGI на ASGI или при использовании asyncio вы теряете контекст запроса: request_id, user_id или ID транзакции перестают передаваться через цепочки вызовов. Логи из разных запросов перемешиваются, отладка превращается в поиск иголки в стоге сена.
Решение лежит в отказе от потокозависимого хранилища (threading.local) и переходе на контекстные переменные (contextvars). Это руководство дает готовые конфигурации для FastAPI, Django Channels и чистого asyncio, которые сохраняют контекст через все await и асинхронные задачи. Вы получите код middleware, адаптеров и форматтеров, проверенный в production.
Почему стандартное логирование ломается в асинхронной среде
В синхронном веб-приложении (например, Django на WSGI) каждый HTTP-запрос обрабатывается в отдельном потоке. Для хранения данных запроса, таких как ID пользователя или уникальный идентификатор запроса, разработчики используют потокобезопасное хранилище threading.local(). Это работает, потому что контекст выполнения привязан к потоку и не меняется до завершения обработки.
В асинхронной модели (ASGI, asyncio) один поток обрабатывает множество запросов одновременно, переключаясь между ними при операциях ввода-вывода. threading.local() становится общим для всех запросов в этом потоке. Если вы установите request_id для первого запроса, а затем переключитесь на второй до завершения первого, второй запрос увидит и изменит этот же request_id. Контекст теряется.
Thread-local storage: почему это больше не работает
Рассмотрим пример. В синхронном приложении вы могли бы использовать декоратор для логирования:
import threading
import logging
request_local = threading.local()
def set_request_id(request_id):
request_local.request_id = request_id
def get_request_id():
return getattr(request_local, 'request_id', None)
logger = logging.getLogger(__name__)
async def handle_request(request_id):
set_request_id(request_id)
logger.info("Начало обработки")
# Эмуляция операции ввода-вывода
await asyncio.sleep(0.1)
# Здесь request_id уже может быть перезаписан другим запросом!
logger.info("Завершение обработки")
Если вы запустите две корутины handle_request одновременно с разными ID, в логах появится путаница. Второй запрос, выполнив await asyncio.sleep, может перезаписать request_local.request_id, и первый запрос завершится с неверным идентификатором.
Архитектура WSGI, где сервер (Gunicorn, uWSGI) создает отдельный поток или процесс для каждого запроса, гарантирует изоляцию. Архитектура ASGI, которую используют FastAPI, Quart и Django Channels, основана на цикле событий (event loop) в одном потоке, что ломает эту гарантию.
Как asyncio управляет контекстом: задачи, контекстные переменные и изоляция
Для решения проблемы в Python 3.7 появился модуль contextvars. Контекстная переменная (ContextVar) привязывает значение не к потоку, а к контексту выполнения, который копируется при создании каждой асинхронной задачи (asyncio.Task).
Когда вы создаете задачу с помощью asyncio.create_task(), текущий контекст копируется внутрь нее. Изменения контекста внутри задачи не видны в родительской задаче и в других задачах. Это обеспечивает ту же изоляцию, которую давали потоки в WSGI.
import asyncio
import contextvars
request_id_var = contextvars.ContextVar('request_id', default=None)
async def task_worker():
# Получаем значение контекстной переменной для этой задачи
id = request_id_var.get()
print(f"Задача выполняется с request_id={id}")
async def main():
# Устанавливаем контекст для "запроса"
request_id_var.set("req-123")
# Создаем задачу - контекст копируется
task = asyncio.create_task(task_worker())
await task
# В основном коде контекст не изменился
print(f"Основной контекст: {request_id_var.get()}") # Выведет 'req-123'
asyncio.run(main())
Ключевое ограничение: контекст наследуется только при явном создании задачи через asyncio.create_task(). Если вы передаете колбэк в loop.call_soon() или работаете с пулом потоков через run_in_executor, контекст может потеряться. Для этих случаев требуется дополнительная обвязка.
Базовый паттерн: передача контекста через contextvars и кастомный адаптер
Универсальное решение для любого asyncio-проекта состоит из трех частей: объявление контекстных переменных для метаданных (request_id, user_id), создание адаптера для логгера, который подставляет эти метаданные в записи, и утилит для управления жизненным циклом контекста.
Создаем контекстно-зависимый адаптер для logging.LoggerAdapter
Класс LoggerAdapter в стандартной библиотеке позволяет добавлять дополнительный контекст к каждому сообщению лога. Мы расширим его, чтобы он автоматически извлекал данные из contextvars.
import logging
import contextvars
from typing import Dict, Any
# Объявляем контекстные переменные
REQUEST_ID = contextvars.ContextVar("request_id", default=None)
USER_ID = contextvars.ContextVar("user_id", default=None)
class ContextLoggingAdapter(logging.LoggerAdapter):
"""Адаптер, который добавляет request_id и user_id из contextvars в логи."""
def process(self, msg: str, kwargs: Dict[str, Any]) -> tuple[str, Dict[str, Any]]:
# Извлекаем значения из контекстных переменных
extra = kwargs.get('extra', {})
if REQUEST_ID.get() is not None:
extra['request_id'] = REQUEST_ID.get()
if USER_ID.get() is not None:
extra['user_id'] = USER_ID.get()
kwargs['extra'] = extra
return msg, kwargs
def get_context_logger(name: str) -> logging.Logger:
"""Фабрика для получения логгера с адаптером."""
logger = logging.getLogger(name)
# Возвращаем адаптер, а не сам логгер
return ContextLoggingAdapter(logger, extra={})
Использование extra полей критически важно для структурированного логирования. Позже форматтер сможет включить эти поля в JSON-выход или текстовый шаблон. Для получения готовых конфигураций логирования для Django и Flask, включая методы маскирования чувствительных данных, смотрите руководство «Логирование Django и Flask: подробная настройка для production».
Утилиты для управления жизненным циклом контекста
Контекст нужно где-то установить. Для этого используют контекстные менеджеры или декораторы для асинхронных функций.
import uuid
import contextlib
from typing import Optional
@contextlib.asynccontextmanager
async def set_logging_context(request_id: Optional[str] = None, user_id: Optional[str] = None):
"""Асинхронный контекстный менеджер для установки контекста логирования."""
# Генерируем request_id, если не передан
token_req = REQUEST_ID.set(request_id if request_id else f"req-{uuid.uuid4().hex[:8]}")
token_user = None
if user_id:
token_user = USER_ID.set(user_id)
try:
yield
finally:
# Восстанавливаем предыдущие значения
REQUEST_ID.reset(token_req)
if token_user:
USER_ID.reset(token_user)
# Пример использования
async def process_order(user_id: str):
async with set_logging_context(user_id=user_id):
logger = get_context_logger(__name__)
logger.info("Обработка заказа начата")
# ... асинхронная логика
logger.info("Заказ обработан")
Для генерации request_id используют комбинацию временной метки и случайной части (uuid) или заимствуют ID из входящего HTTP-заголовка (например, X-Request-ID). В production системах этот ID передают через все микросервисы для сквозной трассировки.
Готовая конфигурация для FastAPI: middleware и интеграция
В FastAPI контекст логирования устанавливают через middleware, который работает до и после обработки запроса. Dependency инжектирует настроенный логгер в эндпоинты.
Middleware для инъекции request_id в контекст запроса
Этот middleware генерирует request_id для каждого запроса, устанавливает его в contextvars и добавляет в заголовки ответа для клиента.
from fastapi import FastAPI, Request, Response
import uuid
import contextvars
import time
app = FastAPI()
REQUEST_ID = contextvars.ContextVar("request_id", default=None)
@app.middleware("http")
async def logging_context_middleware(request: Request, call_next):
# Пытаемся получить request_id из заголовка, иначе генерируем
request_id = request.headers.get("X-Request-ID") or f"fastapi-{uuid.uuid4().hex[:8]}"
# Устанавливаем контекстную переменную
token = REQUEST_ID.set(request_id)
# Засекаем время выполнения для логов
start_time = time.time()
try:
response = await call_next(request)
# Добавляем request_id в заголовки ответа
response.headers["X-Request-ID"] = request_id
return response
finally:
# Восстанавливаем контекст
REQUEST_ID.reset(token)
# Логируем время выполнения
duration = time.time() - start_time
logger = get_context_logger(__name__)
logger.info(
"Запрос завершен",
extra={"duration_sec": round(duration, 3), "method": request.method, "path": request.url.path}
)
Middleware обрабатывает и фоновые задачи, запущенные через asyncio.create_task() внутри эндпоинта, потому что они наследуют контекст родительской задачи. Однако задачи, созданные вне запроса (например, в фоновом процессе), требуют ручной установки контекста.
Dependency для получения логгера с контекстом в эндпоинтах
Каноничный способ FastAPI для предоставления зависимостей – dependency injection.
from fastapi import Depends
def get_context_logger_dependency(logger_name: str = __name__):
"""Возвращает dependency-функцию для получения контекстного логгера."""
def _get_logger():
return get_context_logger(logger_name)
return _get_logger
# Создаем dependency
GetLogger = get_context_logger_dependency(__name__)
@app.get("/items/")
async def read_items(logger: logging.Logger = Depends(GetLogger)):
logger.info("Получение списка товаров")
# request_id уже будет в логах благодаря адаптеру
return {"items": ["item1", "item2"]}
Альтернатива – глобальная утилитная функция, но dependency обеспечивает лучшую тестируемость и явную передачу зависимостей.
Конфигурация dictConfig для структурированного JSON-логирования
Для production окружения логи должны быть машиночитаемыми. Формат JSON позволяет легко отправлять их в ELK Stack (Elasticsearch, Logstash, Kibana) или Grafana Loki.
import logging.config
from pythonjsonlogger import jsonlogger
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"json": {
"()": "pythonjsonlogger.jsonlogger.JsonFormatter",
"fmt": '%(asctime)s %(levelname)s %(name)s %(message)s %(request_id)s %(user_id)s',
"datefmt": "%Y-%m-%dT%H:%M:%SZ",
"rename_fields": {
"asctime": "timestamp",
"levelname": "level",
"name": "logger"
},
"static_fields": {
"service": "my-fastapi-app",
"environment": "production"
}
},
"detailed": {
"format": "%(asctime)s [%(levelname)s] %(name)s (request_id=%(request_id)s, user_id=%(user_id)s): %(message)s"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "json",
"stream": "ext://sys.stdout"
},
"file": {
"class": "concurrent.logging.QueueHandler", # Важно для асинхронности
"level": "DEBUG",
"formatter": "json",
"filename": "/var/log/app/app.log",
"maxBytes": 10485760, # 10 MB
"backupCount": 5
}
},
"loggers": {
"": { # Корневой логгер
"level": "INFO",
"handlers": ["console", "file"],
"propagate": False
},
"uvicorn": {
"level": "WARNING",
"propagate": False
}
}
}
logging.config.dictConfig(LOGGING_CONFIG)
Обратите внимание на использование QueueHandler для файлового обработчика. Это предотвращает блокировку event loop при записи на диск. Для глубокого погружения в создание кастомных обработчиков для PostgreSQL, RabbitMQ и настройки алертинга в Slack изучите руководство «Промышленное логирование в Python: контекст, кастомные обработчики для PostgreSQL, RabbitMQ, алерты в Slack».
Логирование в Django Channels: работа с потребителями и фоновыми задачами
Django Channels добавляет асинхронный слой к синхронному ядру Django. Основная единица – consumer, который обрабатывает WebSocket-соединения или задачи из фоновых очередей. Проблема в том, что данные из scope (аналог request в Django) доступны только внутри методов consumer.
Middleware для Django Channels: привязка scope к контексту выполнения
Middleware для Channels работает аналогично middleware HTTP, но для протоколов WebSocket, HTTP2 и других.
# myapp/middleware.py
import contextvars
from channels.middleware import BaseMiddleware
REQUEST_ID = contextvars.ContextVar('request_id', default=None)
USER_ID = contextvars.ContextVar('user_id', default=None)
class ContextVarsMiddleware(BaseMiddleware):
"""Middleware для Django Channels, сохраняющий данные scope в contextvars."""
async def __call__(self, scope, receive, send):
# Извлекаем данные из scope
request_id = scope.get("headers", {}).get(b"x-request-id", b"").decode()
user = scope.get("user")
user_id = str(user.pk) if user and user.is_authenticated else None
# Устанавливаем контекстные переменные
token_req = REQUEST_ID.set(request_id or f"channels-{uuid.uuid4().hex[:8]}")
token_user = USER_ID.set(user_id) if user_id else None
try:
return await super().__call__(scope, receive, send)
finally:
# Очистка контекста
REQUEST_ID.reset(token_req)
if token_user:
USER_ID.reset(token_user)
Зарегистрируйте middleware в asgi.py или routing.py:
# myproject/routing.py
from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import path
from myapp.middleware import ContextVarsMiddleware
application = ProtocolTypeRouter({
"websocket": ContextVarsMiddleware(
URLRouter([
path("ws/some_path/", MyConsumer.as_asgi()),
])
),
})
Теперь внутри consumer и любой асинхронной функции, вызванной из него, будут доступны REQUEST_ID.get() и USER_ID.get().
Базовый ContextAwareConsumer для наследования
Чтобы не создавать логгер вручную в каждом consumer, создайте базовый класс.
# myapp/consumers.py
import logging
from channels.generic.websocket import AsyncWebsocketConsumer
from .context_logging import get_context_logger
class ContextAwareAsyncConsumer(AsyncWebsocketConsumer):
"""Базовый consumer с поддержкой контекстного логирования."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._logger = None
@property
def logger(self) -> logging.Logger:
if self._logger is None:
self._logger = get_context_logger(self.__class__.__module__)
return self._logger
# Пример использования
class MyConsumer(ContextAwareAsyncConsumer):
async def connect(self):
self.logger.info("WebSocket соединение установлено", extra={"path": self.scope["path"]})
await self.accept()
async def receive(self, text_data):
self.logger.debug("Получено сообщение", extra={"data_length": len(text_data)})
# Обработка данных...
await self.send(text_data="OK")
Особенность Channels – вызов синхронного кода Django (например, ORM) через database_sync_to_async. Контекстные переменные не передаются автоматически в синхронный код, запущенный в отдельном потоке. Решение – явно передать нужные значения как аргументы или использовать contextvars.copy_context().
Для фоновых задач, созданных через asyncio.create_task() внутри consumer, контекст наследуется корректно. Но если задача запускается из внешнего планировщика (Celery, Huey), контекст нужно установить вручную при старте задачи.
Производительность и надежность: что проверить перед продакшеном
Неправильная настройка логирования в асинхронном приложении может привести к дедлокам, потере логов или значительному падению производительности. Проверьте эти пункты перед выкатыванием в production.
Избегаем блокирующих хендлеров и дедлоков
Стандартные FileHandler, SMTPHandler и SocketHandler выполняют синхронные операции ввода-вывода. В асинхронном приложении они блокируют event loop, что приводит к резкому падению RPS (запросов в секунду).
Решение – использовать concurrent.logging.QueueHandler вместе с QueueListener. Запись логов помещается в очередь, а отдельный поток-слушатель забирает их оттуда и передает реальным обработчикам.
import logging
import logging.handlers
from concurrent.logging import QueueHandler, QueueListener
import queue
# Создаем очередь
log_queue = queue.Queue(-1) # Без ограничения размера
# Настраиваем реальные обработчики (они будут работать в отдельном потоке)
file_handler = logging.handlers.RotatingFileHandler("app.log", maxBytes=10*1024*1024, backupCount=5)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.WARNING)
# Создаем слушателя очереди с обработчиками
listener = QueueListener(log_queue, file_handler, console_handler)
listener.start()
# Настраиваем корневой логгер на использование QueueHandler
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
queue_handler = QueueHandler(log_queue)
root_logger.addHandler(queue_handler)
Также безопасны WatchedFileHandler (для Docker, где нужно отслеживать rotation сигналами) и SysLogHandler (если syslog демон работает локально).
Структурированное логирование (JSON) для ELK Stack и Grafana Loki
Текстовые логи сложно анализировать автоматически. JSON формат превращает каждую запись в объект с предсказуемыми полями, которые легко индексировать и фильтровать.
Ключевые поля для добавления в каждый лог:
timestamp: время в формате ISO 8601.level: уровень логирования (INFO, ERROR).message: текстовое сообщение.request_id: сквозной идентификатор запроса.user_id: идентификатор пользователя (если есть).service_name: имя микросервиса или приложения.module: имя модуля Python.duration_sec: длительность операции (для middleware).
Библиотека python-json-logger позволяет легко добавить эти поля через конфигурацию dictConfig, как показано выше. Для отправки таких логов в Grafana Loki настройте прометей-совместимые метки (job, instance, level).
Подробные конфигурации Python-логгеров для ELK Stack и Loki, включая интеграцию с Docker и Kubernetes, собраны в руководстве «Структурированное логирование в Python для ELK Stack и Grafana Loki».
Накладные расходы на использование contextvars минимальны. Замеры показывают увеличение времени вызова на 0.5-2 микросекунды по сравнению с передачей контекста через аргументы функций. Это приемлемая цена за удобство и надежность.
Тестирование: убедитесь, что контекст не «протекает» между независимыми запросами. Напишите тест, который имитирует два параллельных запроса и проверяет, что их request_id не смешиваются в логах. Мониторинг: отслеживайте скорость логирования (сообщений в секунду) и время блокировки в очереди QueueHandler.
Чеклист внедрения и ответы на частые вопросы
Пошаговый план для внедрения контекстного логирования в существующий проект:
- Добавьте модуль с объявлением
contextvarsи классомContextLoggingAdapter. - Для FastAPI: установите middleware из раздела выше и dependency для логгера. Для Django Channels: добавьте
ContextVarsMiddlewareи базовыйContextAwareAsyncConsumer. - Замените вызовы
logging.getLogger(__name__)наget_context_logger(__name__)в асинхронных частях кода. - Настройте неблокирующие обработчики через
QueueHandlerвLOGGING_CONFIG. - Протестируйте цепочку запросов: убедитесь, что
request_idсохраняется от входящего HTTP-запроса через все асинхронные вызовы до фоновой задачи и обратно в ответ.
Вопрос: Что делать со сторонними асинхронными библиотеками (aiohttp, databases)? Они используют свой внутренний логгер.
Ответ: Настройте корневой логгер с QueueHandler и JSON-форматтером. Большинство библиотек наследуют настройки корневого логгера. Для aiohttp можно установить свой логгер через aiohttp.web.Application settings.
Вопрос: Как мигрировать постепенно, не ломая существующее логирование?
Ответ: Начните с middleware и адаптера. Заменяйте getLogger на get_context_logger по мере изменения кода. Старый код продолжит работать, но без контекстных полей. Используйте фильтр, который добавляет поля по умолчанию для логов без контекста.
Вопрос: Как быть с синхронными частями кода в основном теле Django (views, models)?
Ответ: В синхронном коде продолжайте использовать threading.local или передавайте контекст через аргументы. На границе синхронного и асинхронного кода (например, в sync_to_async) явно передавайте нужные значения.
Для автоматизации анализа таких структурированных логов с помощью ИИ, что экономит до 60% времени на расследовании инцидентов, изучите практическое руководство «Анализ логов с помощью ИИ и LLM в 2026: практические решения для DevOps».
Настройка надежного логирования ошибок и исключений, включая интеграцию с Sentry и DataDog, описана в отдельном руководстве «Логирование ошибок и исключений в Python: обработка traceback и интеграция с Sentry/DataDog».
Для работы с различными моделями ИИ через единый API, что упрощает интеграцию логирования с системами анализа, можно использовать сервис AiTunnel. Он предоставляет доступ к более чем 200 моделям, включая GPT, Gemini и Claude, с оплатой в рублях и без необходимости VPN.