Стандартный модуль logging в Python покрывает базовые сценарии, но в промышленной эксплуатации он быстро становится узким местом. Когда вы отлаживаете ошибку "INSERT has more target columns than expressions" в PostgreSQL, вам нужен не просто текст ошибки, а полный контекст: проблемный SQL-запрос, переданные параметры, идентификатор пользователя. Когда критическая ошибка возникает в фоновом процессе, она должна немедленно попасть в канал команды разработки, а не затеряться в гигабайтах лог-файлов.
Это руководство даёт пошаговые инструкции по созданию отказоустойчивой системы мониторинга на основе стандартного модуля logging. Вы научитесь обогащать логи метаданными, маршрутизировать сообщения в зависимости от контекста, отправлять их в базу данных, очередь сообщений и мессенджеры, а также настраивать целевой алертинг. Все примеры кода проверены на практике и готовы к интеграции в проекты на FastAPI, Django или любом другом фреймворке.
Зачем стандартному logging в Python нужна прокачка?
Базовое логирование с помощью print() или logging.basicConfig() ломается в трёх типичных для продакшена сценариях. Во-первых, при отладке ошибок в распределённых системах или микросервисах невозможно связать события без уникального идентификатора запроса (correlation ID). Во-вторых, сообщение об ошибке без контекста бесполезно для анализа. В-третьих, без целевой маршрутизации критические события тонут в информационном шуме, что приводит к запаздыванию реакции на инциденты.
Кейс: отладка ошибки БД без контекста - потеря времени
Рассмотрим ошибку PostgreSQL из контекста: "INSERT has more target columns than expressions". Стандартный логгер запишет только текст исключения. Для поиска причины разработчику придётся анализировать код, восстанавливать параметры запроса и воспроизводить ситуацию. Это занимает десятки минут.
С обогащённым логированием в лог сразу попадает полный контекст. Вот как это выглядит в коде с использованием параметра extra:
import logging
try:
cursor.execute(
"INSERT INTO users (name, email) VALUES (%s, %s, %s)",
('John Doe', 'john@example.com', 'extra_value')
)
except Exception as e:
logger.error(
"Database INSERT error",
exc_info=True,
extra={
'sql_query': "INSERT INTO users (name, email) VALUES (%s, %s, %s)",
'query_params': ('John Doe', 'john@example.com', 'extra_value'),
'db_table': 'users',
'transaction_id': 'tx_12345'
}
)
В лог-файле или системе анализа появится структурированная запись со всеми данными для мгновенной диагностики. Поиск причины сокращается до секунд.
Добавляем контекст: использование extra-параметров и форматирование
Паттерн обогащения логов базируется на двух механизмах: прямом использовании параметра extra в вызовах логгера и применении logging.LoggerAdapter для автоматического добавления контекста ко всем сообщениям в рамках определённой операции, например, обработки HTTP-запроса.
Всегда добавляйте в контекст идентификатор корреляции (correlation_id или request_id) и точную метку времени. Контекстно добавляйте данные, специфичные для операции: endpoint для веб-запросов, task_id для фоновых задач, user_id. Используйте JSONFormatter для унификации вывода, это упростит последующий парсинг системами вроде OpenSearch или Loki. Подробнее о настройке таких систем читайте в руководстве по выбору и настройке стека для сбора логов в 2026 году.
Практический пример: логирование веб-запроса с ID и endpoint
Для веб-приложения на FastAPI или Django создайте middleware, который оборачивает логгер в адаптер. Пример для FastAPI:
import logging
import uuid
from contextvars import ContextVar
from fastapi import Request
request_id_var: ContextVar[str] = ContextVar('request_id', default='')
class RequestContextLogAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
kwargs["extra"] = kwargs.get("extra", {})
kwargs["extra"]["request_id"] = request_id_var.get()
return msg, kwargs
@app.middleware("http")
async def add_request_id(request: Request, call_next):
request_id = request.headers.get('X-Request-ID') or str(uuid.uuid4())
request_id_var.set(request_id)
logger = RequestContextLogAdapter(base_logger, {})
logger.info("Request started", extra={"endpoint": request.url.path})
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
Теперь каждый лог в рамках этого запроса будет содержать request_id и endpoint. Важно: никогда не логируйте чувствительные заголовки, такие как Authorization с Bearer token, целиком. Используйте фильтры для их маскировки.
Фильтрация на основе контекста: маршрутизация логов
Обогащённые метаданные позволяют интеллектуально маршрутизировать логи. Создайте кастомный класс logging.Filter, который проверяет поля из extra.
class ContextFilter(logging.Filter):
"""Фильтр, который пропускает логи только от определённого модуля с уровнем ERROR и выше."""
def __init__(self, module_name):
super().__init__()
self.module_name = module_name
def filter(self, record):
# Проверяем кастомный атрибут 'module', добавленный через extra
if hasattr(record, 'module'):
return record.module == self.module_name and record.levelno >= logging.ERROR
return False
# Создаём хэндлер для Slack и применяем фильтр
slack_handler = SlackHandler(webhook_url='...')
slack_handler.addFilter(ContextFilter(module_name='payment'))
logger.addHandler(slack_handler)
Этот код отправит в Slack только ошибки из модуля "payment". Такую фильтрацию можно комбинировать для построения сложных правил маршрутизации.
Создание кастомных обработчиков: отправка логов во внешние системы
Стандартные обработчики (FileHandler, StreamHandler) не подходят для интеграции с промышленными системами мониторинга. Реализация собственных обработчиков требует переопределения метода emit(record) и обеспечения отказоустойчивости: логирование не должно ломать основное приложение при недоступности внешнего сервиса.
Handler для PostgreSQL: структурированное хранение в БД
Сохранение логов в PostgreSQL позволяет выполнять сложные аналитические запросы с помощью SQL. Вот основа для такого обработчика с использованием asyncpg для асинхронных приложений.
import logging
import asyncpg
from datetime import datetime
class AsyncPostgresLogHandler(logging.Handler):
def __init__(self, dsn, table_name='app_logs', pool=None):
super().__init__()
self.dsn = dsn
self.table_name = table_name
self.pool = pool
self.create_table_query = f"""
CREATE TABLE IF NOT EXISTS {self.table_name} (
id BIGSERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL,
level VARCHAR(10) NOT NULL,
message TEXT NOT NULL,
module VARCHAR(255),
request_id UUID,
extra JSONB
);
CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON {self.table_name}(timestamp);
CREATE INDEX IF NOT EXISTS idx_logs_request_id ON {self.table_name}(request_id);
"""
async def _ensure_pool(self):
if self.pool is None:
self.pool = await asyncpg.create_pool(self.dsn, min_size=1, max_size=5)
async with self.pool.acquire() as conn:
await conn.execute(self.create_table_query)
def emit(self, record):
# Запускаем асинхронную вставку в фоне, не блокируя основной поток
import asyncio
asyncio.create_task(self._async_emit(record))
async def _async_emit(self, record):
try:
await self._ensure_pool()
async with self.pool.acquire() as conn:
extra_data = {k: v for k, v in record.__dict__.items()
if k not in logging.LogRecord('', 0, '', 0, '', (), None).__dict__}
await conn.execute(
f"""
INSERT INTO {self.table_name}
(timestamp, level, message, module, request_id, extra)
VALUES ($1, $2, $3, $4, $5, $6)
""",
datetime.utcfromtimestamp(record.created),
record.levelname,
record.getMessage(),
getattr(record, 'module', None),
getattr(record, 'request_id', None),
extra_data if extra_data else None
)
except Exception as e:
# Ошибка записи в лог не должна ломать приложение
# Можно задублировать в стандартный поток
print(f"Failed to write log to PostgreSQL: {e}")
Используйте пул соединений и реализуйте retry-логику при сетевых сбоях. Для синхронных приложений используйте библиотеку psycopg2 с аналогичным подходом.
Handler для RabbitMQ: неблокирующая отправка в очередь сообщений
Отправка логов в очередь сообщений развязывает скорость работы приложения и скорость обработки логов агрегатором. Это ключевой элемент отказоустойчивой архитектуры.
import logging
import json
import pika
from pika.adapters.blocking_connection import BlockingConnection
class RabbitMQHandler(logging.Handler):
def __init__(self, host='localhost', exchange='logs', exchange_type='topic', routing_key='app.log'):
super().__init__()
self.host = host
self.exchange = exchange
self.routing_key = routing_key
self.connection_params = pika.ConnectionParameters(host=host, heartbeat=600)
self._ensure_connection()
def _ensure_connection(self):
try:
self.connection = BlockingConnection(self.connection_params)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.exchange, exchange_type=exchange_type, durable=True)
except Exception:
self.connection = None
self.channel = None
def emit(self, record):
if self.channel is None:
self._ensure_connection()
if self.channel is None:
return # Не удалось подключиться
try:
log_entry = {
'timestamp': record.created,
'level': record.levelname,
'message': record.getMessage(),
'module': getattr(record, 'module', ''),
'request_id': getattr(record, 'request_id', '')
}
# Добавляем все extra-поля
for key, value in record.__dict__.items():
if key.startswith('extra_'):
log_entry[key[6:]] = value
self.channel.basic_publish(
exchange=self.exchange,
routing_key=self.routing_key,
body=json.dumps(log_entry),
properties=pika.BasicProperties(delivery_mode=2) # persistent
)
except Exception as e:
# При разрыве соединения пытаемся переподключиться один раз
try:
self._ensure_connection()
except Exception:
pass
def close(self):
if self.connection and self.connection.is_open:
self.connection.close()
super().close()
Потребителем этой очереди может быть Logstash, Fluentd или кастомный сервис-агрегатор, который перенаправляет логи в Elasticsearch или другую систему хранения. Для асинхронных приложений используйте библиотеку aio-pika.
Handler для Telegram: мгновенные уведомления о проблемах
Простой обработчик для отправки критических ошибок в Telegram-чат команды. Это первый шаг к созданию системы алертинга.
import logging
import requests
class TelegramLogHandler(logging.Handler):
def __init__(self, token, chat_id, level=logging.ERROR):
super().__init__(level=level) # Принимаем только сообщения с заданным уровнем и выше
self.token = token
self.chat_id = chat_id
self.api_url = f"https://api.telegram.org/bot{self.token}/sendMessage"
self.session = requests.Session()
def emit(self, record):
try:
message = self.format(record)
# Форматируем для удобства чтения в телефоне
formatted_msg = (
f"🚨 {record.levelname}\n"
f"{record.getMessage()}\n\n"
f"Module: {getattr(record, 'module', 'N/A')}\n"
f"Request ID: {getattr(record, 'request_id', 'N/A')}\n"
)
if record.exc_info:
formatted_msg += f"Traceback:\n{self.formatException(record.exc_info)}"
payload = {
'chat_id': self.chat_id,
'text': formatted_msg,
'parse_mode': 'HTML',
'disable_web_page_preview': True
}
# Неблокирующий вызов с таймаутом
self.session.post(self.api_url, json=payload, timeout=5)
except requests.exceptions.RequestException:
# Игнорируем сетевые ошибки, чтобы не ломать приложение
pass
except Exception:
self.handleError(record)
Добавьте rate limiting, чтобы не превысить лимиты Telegram API при лавине ошибок. Для более сложных сценариев алертинга, включая настройку порогов и дедупликацию, изучите руководство по наблюдаемости и алертингу для высоконагруженных систем.
Алертинг в Slack и Microsoft Teams: целевые уведомления для команд
Целевой алертинг означает, что разные команды получают уведомления, релевантные их зоне ответственности. Разработчикам нужен детальный stack trace, DevOps - краткое сообщение о сбое инфраструктуры, а DBA - информация об ошибках базы данных. Это достигается комбинацией фильтров и отдельных обработчиков.
Создайте обработчики SlackHandler и TeamsHandler, аналогичные TelegramHandler, но с использованием Incoming Webhooks соответствующих сервисов. Ключевой элемент - фильтрация.
Настройка фильтров для маршрутизации алертов
Создайте библиотеку фильтров для гибкой маршрутизации.
class ModuleFilter(logging.Filter):
def __init__(self, module_name):
self.module_name = module_name
def filter(self, record):
return getattr(record, 'module', '') == self.module_name
class CriticalOnlyFilter(logging.Filter):
def filter(self, record):
return record.levelno >= logging.CRITICAL
# Комбинированная настройка
slack_devops_handler = SlackHandler(webhook_url='...')
slack_devops_handler.addFilter(CriticalOnlyFilter())
slack_devops_handler.addFilter(ModuleFilter('infrastructure'))
slack_dev_handler = SlackHandler(webhook_url='...')
slack_dev_handler.setLevel(logging.ERROR) # ERROR и выше
logger.addHandler(slack_devops_handler) # В канал #sysadmin-alerts
logger.addHandler(slack_dev_handler) # В канал #dev-alerts
Таким образом, критическая ошибка в модуле "infrastructure" попадёт и разработчикам, и DevOps, а остальные ошибки уровня ERROR - только разработчикам. Для анализа таких распределённых потоков логов пригодится руководство по маршрутизации системных событий между разными инструментами.
Сборка промышленной системы: архитектура и best practices
Соберите компоненты в отказоустойчивую архитектуру. Основное приложение отправляет логи в локальный файл и в очередь RabbitMQ. Отдельный процесс-агрегатор читает из очереди и сохраняет логи в PostgreSQL и отправляет алерты. Это предотвращает блокировку основного приложения при сбоях внешних систем.
Как избежать утечки чувствительных данных в логи
Создайте фильтр, который автоматически маскирует чувствительные данные в значениях extra-полей.
class SecretsFilter(logging.Filter):
SENSITIVE_KEYS = {'password', 'api_key', 'token', 'secret', 'authorization'}
def filter(self, record):
for key in self.SENSITIVE_KEYS:
if hasattr(record, key):
setattr(record, key, '***MASKED***')
# Также ищем в тексте сообщения (упрощённо)
if key in record.getMessage().lower():
record.msg = record.msg.replace(getattr(record, key, ''), '***MASKED***')
return True
# Примените фильтр ко всем логгерам на уровне корня
logging.getLogger().addFilter(SecretsFilter())
Никогда не передавайте в extra объекты запросов или ответов целиком без предварительной очистки. Для глубокого анализа уже собранных логов, в том числе с помощью ИИ, используйте методы из статьи про практический анализ логов с помощью ИИ и LLM.
Адаптация под ваш проект: зависимости и конфигурация
Для работы примеров установите зависимости:
# requirements.txt
pika>=3.0.0 # для RabbitMQHandler
asyncpg>=0.28.0 # для AsyncPostgresLogHandler
psycopg2-binary>=2.9 # для синхронного PostgresLogHandler
requests>=2.31.0 # для TelegramHandler/SlackHandler
Настройте всю систему через logging.config.dictConfig для централизованного управления.
import logging.config
LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'json': {
'format': '{"time": "%(asctime)s", "level": "%(levelname)s", "msg": "%(message)s", "module": "%(module)s"}',
'datefmt': '%Y-%m-%dT%H:%M:%S%z'
},
},
'filters': {
'secrets_filter': {
'()': 'your_module.SecretsFilter',
}
},
'handlers': {
'rabbitmq': {
'class': 'your_module.RabbitMQHandler',
'host': 'localhost',
'level': 'INFO',
'formatter': 'json',
},
},
'loggers': {
'': { # root logger
'handlers': ['rabbitmq'],
'level': 'INFO',
'filters': ['secrets_filter']
}
}
}
logging.config.dictConfig(LOGGING_CONFIG)
Начинайте внедрение постепенно. Сначала добавьте контекст (extra) и один кастомный обработчик, например, для Telegram. Затем внедрите отправку в очередь для развязки и только после этого - сложную фильтрацию и маршрутизацию. Мониторьте здоровье самих обработчиков логов, чтобы система мониторинга не вышла из строя незаметно. Для автоматизации рутинного анализа структурированных логов, например, поиска медленных запросов или признаков атак, возьмите на вооружение практические команды grep и awk для анализа логов веб-серверов.
Интеграция с внешними ИИ-сервисами для обработки ошибок или генерации аналитических сводок также требует надёжного логирования. Для единого доступа к различным моделям через API можно рассмотреть сервис AiTunnel, который агрегирует более 200 моделей, включая GPT, Gemini и Claude, и позволяет управлять запросами и бюджетами.