Промышленное логирование Python: контекст, кастомные обработчики для PostgreSQL, RabbitMQ, алерты в Slack | Гайд | AdminWiki
Timeweb Cloud — сервера, Kubernetes, S3, Terraform. Лучшие цены IaaS.
Попробовать

Промышленное логирование Python: контекст, кастомные обработчики для PostgreSQL, RabbitMQ, алерты в Slack | Гайд

01 июня 2026 10 мин. чтения

Стандартный модуль logging в Python покрывает базовые сценарии, но в промышленной эксплуатации он быстро становится узким местом. Когда вы отлаживаете ошибку "INSERT has more target columns than expressions" в PostgreSQL, вам нужен не просто текст ошибки, а полный контекст: проблемный SQL-запрос, переданные параметры, идентификатор пользователя. Когда критическая ошибка возникает в фоновом процессе, она должна немедленно попасть в канал команды разработки, а не затеряться в гигабайтах лог-файлов.

Это руководство даёт пошаговые инструкции по созданию отказоустойчивой системы мониторинга на основе стандартного модуля logging. Вы научитесь обогащать логи метаданными, маршрутизировать сообщения в зависимости от контекста, отправлять их в базу данных, очередь сообщений и мессенджеры, а также настраивать целевой алертинг. Все примеры кода проверены на практике и готовы к интеграции в проекты на FastAPI, Django или любом другом фреймворке.

Зачем стандартному logging в Python нужна прокачка?

Базовое логирование с помощью print() или logging.basicConfig() ломается в трёх типичных для продакшена сценариях. Во-первых, при отладке ошибок в распределённых системах или микросервисах невозможно связать события без уникального идентификатора запроса (correlation ID). Во-вторых, сообщение об ошибке без контекста бесполезно для анализа. В-третьих, без целевой маршрутизации критические события тонут в информационном шуме, что приводит к запаздыванию реакции на инциденты.

Кейс: отладка ошибки БД без контекста - потеря времени

Рассмотрим ошибку PostgreSQL из контекста: "INSERT has more target columns than expressions". Стандартный логгер запишет только текст исключения. Для поиска причины разработчику придётся анализировать код, восстанавливать параметры запроса и воспроизводить ситуацию. Это занимает десятки минут.

С обогащённым логированием в лог сразу попадает полный контекст. Вот как это выглядит в коде с использованием параметра extra:

import logging

try:
    cursor.execute(
        "INSERT INTO users (name, email) VALUES (%s, %s, %s)",
        ('John Doe', 'john@example.com', 'extra_value')
    )
except Exception as e:
    logger.error(
        "Database INSERT error",
        exc_info=True,
        extra={
            'sql_query': "INSERT INTO users (name, email) VALUES (%s, %s, %s)",
            'query_params': ('John Doe', 'john@example.com', 'extra_value'),
            'db_table': 'users',
            'transaction_id': 'tx_12345'
        }
    )

В лог-файле или системе анализа появится структурированная запись со всеми данными для мгновенной диагностики. Поиск причины сокращается до секунд.

Добавляем контекст: использование extra-параметров и форматирование

Паттерн обогащения логов базируется на двух механизмах: прямом использовании параметра extra в вызовах логгера и применении logging.LoggerAdapter для автоматического добавления контекста ко всем сообщениям в рамках определённой операции, например, обработки HTTP-запроса.

Всегда добавляйте в контекст идентификатор корреляции (correlation_id или request_id) и точную метку времени. Контекстно добавляйте данные, специфичные для операции: endpoint для веб-запросов, task_id для фоновых задач, user_id. Используйте JSONFormatter для унификации вывода, это упростит последующий парсинг системами вроде OpenSearch или Loki. Подробнее о настройке таких систем читайте в руководстве по выбору и настройке стека для сбора логов в 2026 году.

Практический пример: логирование веб-запроса с ID и endpoint

Для веб-приложения на FastAPI или Django создайте middleware, который оборачивает логгер в адаптер. Пример для FastAPI:

import logging
import uuid
from contextvars import ContextVar
from fastapi import Request

request_id_var: ContextVar[str] = ContextVar('request_id', default='')

class RequestContextLogAdapter(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        kwargs["extra"] = kwargs.get("extra", {})
        kwargs["extra"]["request_id"] = request_id_var.get()
        return msg, kwargs

@app.middleware("http")
async def add_request_id(request: Request, call_next):
    request_id = request.headers.get('X-Request-ID') or str(uuid.uuid4())
    request_id_var.set(request_id)
    
    logger = RequestContextLogAdapter(base_logger, {})
    logger.info("Request started", extra={"endpoint": request.url.path})
    
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    return response

Теперь каждый лог в рамках этого запроса будет содержать request_id и endpoint. Важно: никогда не логируйте чувствительные заголовки, такие как Authorization с Bearer token, целиком. Используйте фильтры для их маскировки.

Фильтрация на основе контекста: маршрутизация логов

Обогащённые метаданные позволяют интеллектуально маршрутизировать логи. Создайте кастомный класс logging.Filter, который проверяет поля из extra.

class ContextFilter(logging.Filter):
    """Фильтр, который пропускает логи только от определённого модуля с уровнем ERROR и выше."""
    def __init__(self, module_name):
        super().__init__()
        self.module_name = module_name
    
    def filter(self, record):
        # Проверяем кастомный атрибут 'module', добавленный через extra
        if hasattr(record, 'module'):
            return record.module == self.module_name and record.levelno >= logging.ERROR
        return False

# Создаём хэндлер для Slack и применяем фильтр
slack_handler = SlackHandler(webhook_url='...')
slack_handler.addFilter(ContextFilter(module_name='payment'))
logger.addHandler(slack_handler)

Этот код отправит в Slack только ошибки из модуля "payment". Такую фильтрацию можно комбинировать для построения сложных правил маршрутизации.

Создание кастомных обработчиков: отправка логов во внешние системы

Стандартные обработчики (FileHandler, StreamHandler) не подходят для интеграции с промышленными системами мониторинга. Реализация собственных обработчиков требует переопределения метода emit(record) и обеспечения отказоустойчивости: логирование не должно ломать основное приложение при недоступности внешнего сервиса.

Handler для PostgreSQL: структурированное хранение в БД

Сохранение логов в PostgreSQL позволяет выполнять сложные аналитические запросы с помощью SQL. Вот основа для такого обработчика с использованием asyncpg для асинхронных приложений.

import logging
import asyncpg
from datetime import datetime

class AsyncPostgresLogHandler(logging.Handler):
    def __init__(self, dsn, table_name='app_logs', pool=None):
        super().__init__()
        self.dsn = dsn
        self.table_name = table_name
        self.pool = pool
        self.create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {self.table_name} (
            id BIGSERIAL PRIMARY KEY,
            timestamp TIMESTAMPTZ NOT NULL,
            level VARCHAR(10) NOT NULL,
            message TEXT NOT NULL,
            module VARCHAR(255),
            request_id UUID,
            extra JSONB
        );
        CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON {self.table_name}(timestamp);
        CREATE INDEX IF NOT EXISTS idx_logs_request_id ON {self.table_name}(request_id);
        """

    async def _ensure_pool(self):
        if self.pool is None:
            self.pool = await asyncpg.create_pool(self.dsn, min_size=1, max_size=5)
            async with self.pool.acquire() as conn:
                await conn.execute(self.create_table_query)

    def emit(self, record):
        # Запускаем асинхронную вставку в фоне, не блокируя основной поток
        import asyncio
        asyncio.create_task(self._async_emit(record))

    async def _async_emit(self, record):
        try:
            await self._ensure_pool()
            async with self.pool.acquire() as conn:
                extra_data = {k: v for k, v in record.__dict__.items() 
                              if k not in logging.LogRecord('', 0, '', 0, '', (), None).__dict__}
                
                await conn.execute(
                    f"""
                    INSERT INTO {self.table_name} 
                    (timestamp, level, message, module, request_id, extra)
                    VALUES ($1, $2, $3, $4, $5, $6)
                    """,
                    datetime.utcfromtimestamp(record.created),
                    record.levelname,
                    record.getMessage(),
                    getattr(record, 'module', None),
                    getattr(record, 'request_id', None),
                    extra_data if extra_data else None
                )
        except Exception as e:
            # Ошибка записи в лог не должна ломать приложение
            # Можно задублировать в стандартный поток
            print(f"Failed to write log to PostgreSQL: {e}")

Используйте пул соединений и реализуйте retry-логику при сетевых сбоях. Для синхронных приложений используйте библиотеку psycopg2 с аналогичным подходом.

Handler для RabbitMQ: неблокирующая отправка в очередь сообщений

Отправка логов в очередь сообщений развязывает скорость работы приложения и скорость обработки логов агрегатором. Это ключевой элемент отказоустойчивой архитектуры.

import logging
import json
import pika
from pika.adapters.blocking_connection import BlockingConnection

class RabbitMQHandler(logging.Handler):
    def __init__(self, host='localhost', exchange='logs', exchange_type='topic', routing_key='app.log'):
        super().__init__()
        self.host = host
        self.exchange = exchange
        self.routing_key = routing_key
        self.connection_params = pika.ConnectionParameters(host=host, heartbeat=600)
        self._ensure_connection()

    def _ensure_connection(self):
        try:
            self.connection = BlockingConnection(self.connection_params)
            self.channel = self.connection.channel()
            self.channel.exchange_declare(exchange=self.exchange, exchange_type=exchange_type, durable=True)
        except Exception:
            self.connection = None
            self.channel = None

    def emit(self, record):
        if self.channel is None:
            self._ensure_connection()
            if self.channel is None:
                return  # Не удалось подключиться
        
        try:
            log_entry = {
                'timestamp': record.created,
                'level': record.levelname,
                'message': record.getMessage(),
                'module': getattr(record, 'module', ''),
                'request_id': getattr(record, 'request_id', '')
            }
            # Добавляем все extra-поля
            for key, value in record.__dict__.items():
                if key.startswith('extra_'):
                    log_entry[key[6:]] = value
            
            self.channel.basic_publish(
                exchange=self.exchange,
                routing_key=self.routing_key,
                body=json.dumps(log_entry),
                properties=pika.BasicProperties(delivery_mode=2)  # persistent
            )
        except Exception as e:
            # При разрыве соединения пытаемся переподключиться один раз
            try:
                self._ensure_connection()
            except Exception:
                pass

    def close(self):
        if self.connection and self.connection.is_open:
            self.connection.close()
        super().close()

Потребителем этой очереди может быть Logstash, Fluentd или кастомный сервис-агрегатор, который перенаправляет логи в Elasticsearch или другую систему хранения. Для асинхронных приложений используйте библиотеку aio-pika.

Handler для Telegram: мгновенные уведомления о проблемах

Простой обработчик для отправки критических ошибок в Telegram-чат команды. Это первый шаг к созданию системы алертинга.

import logging
import requests

class TelegramLogHandler(logging.Handler):
    def __init__(self, token, chat_id, level=logging.ERROR):
        super().__init__(level=level)  # Принимаем только сообщения с заданным уровнем и выше
        self.token = token
        self.chat_id = chat_id
        self.api_url = f"https://api.telegram.org/bot{self.token}/sendMessage"
        self.session = requests.Session()

    def emit(self, record):
        try:
            message = self.format(record)
            # Форматируем для удобства чтения в телефоне
            formatted_msg = (
                f"🚨 {record.levelname}\n"
                f"{record.getMessage()}\n\n"
                f"Module: {getattr(record, 'module', 'N/A')}\n"
                f"Request ID: {getattr(record, 'request_id', 'N/A')}\n"
            )
            if record.exc_info:
                formatted_msg += f"Traceback:\n{self.formatException(record.exc_info)}"
            
            payload = {
                'chat_id': self.chat_id,
                'text': formatted_msg,
                'parse_mode': 'HTML',
                'disable_web_page_preview': True
            }
            # Неблокирующий вызов с таймаутом
            self.session.post(self.api_url, json=payload, timeout=5)
        except requests.exceptions.RequestException:
            # Игнорируем сетевые ошибки, чтобы не ломать приложение
            pass
        except Exception:
            self.handleError(record)

Добавьте rate limiting, чтобы не превысить лимиты Telegram API при лавине ошибок. Для более сложных сценариев алертинга, включая настройку порогов и дедупликацию, изучите руководство по наблюдаемости и алертингу для высоконагруженных систем.

Алертинг в Slack и Microsoft Teams: целевые уведомления для команд

Целевой алертинг означает, что разные команды получают уведомления, релевантные их зоне ответственности. Разработчикам нужен детальный stack trace, DevOps - краткое сообщение о сбое инфраструктуры, а DBA - информация об ошибках базы данных. Это достигается комбинацией фильтров и отдельных обработчиков.

Создайте обработчики SlackHandler и TeamsHandler, аналогичные TelegramHandler, но с использованием Incoming Webhooks соответствующих сервисов. Ключевой элемент - фильтрация.

Настройка фильтров для маршрутизации алертов

Создайте библиотеку фильтров для гибкой маршрутизации.

class ModuleFilter(logging.Filter):
    def __init__(self, module_name):
        self.module_name = module_name
    def filter(self, record):
        return getattr(record, 'module', '') == self.module_name

class CriticalOnlyFilter(logging.Filter):
    def filter(self, record):
        return record.levelno >= logging.CRITICAL

# Комбинированная настройка
slack_devops_handler = SlackHandler(webhook_url='...')
slack_devops_handler.addFilter(CriticalOnlyFilter())
slack_devops_handler.addFilter(ModuleFilter('infrastructure'))

slack_dev_handler = SlackHandler(webhook_url='...')
slack_dev_handler.setLevel(logging.ERROR) # ERROR и выше

logger.addHandler(slack_devops_handler) # В канал #sysadmin-alerts
logger.addHandler(slack_dev_handler)    # В канал #dev-alerts

Таким образом, критическая ошибка в модуле "infrastructure" попадёт и разработчикам, и DevOps, а остальные ошибки уровня ERROR - только разработчикам. Для анализа таких распределённых потоков логов пригодится руководство по маршрутизации системных событий между разными инструментами.

Сборка промышленной системы: архитектура и best practices

Соберите компоненты в отказоустойчивую архитектуру. Основное приложение отправляет логи в локальный файл и в очередь RabbitMQ. Отдельный процесс-агрегатор читает из очереди и сохраняет логи в PostgreSQL и отправляет алерты. Это предотвращает блокировку основного приложения при сбоях внешних систем.

Как избежать утечки чувствительных данных в логи

Создайте фильтр, который автоматически маскирует чувствительные данные в значениях extra-полей.

class SecretsFilter(logging.Filter):
    SENSITIVE_KEYS = {'password', 'api_key', 'token', 'secret', 'authorization'}
    
    def filter(self, record):
        for key in self.SENSITIVE_KEYS:
            if hasattr(record, key):
                setattr(record, key, '***MASKED***')
            # Также ищем в тексте сообщения (упрощённо)
            if key in record.getMessage().lower():
                record.msg = record.msg.replace(getattr(record, key, ''), '***MASKED***')
        return True

# Примените фильтр ко всем логгерам на уровне корня
logging.getLogger().addFilter(SecretsFilter())

Никогда не передавайте в extra объекты запросов или ответов целиком без предварительной очистки. Для глубокого анализа уже собранных логов, в том числе с помощью ИИ, используйте методы из статьи про практический анализ логов с помощью ИИ и LLM.

Адаптация под ваш проект: зависимости и конфигурация

Для работы примеров установите зависимости:

# requirements.txt
pika>=3.0.0          # для RabbitMQHandler
asyncpg>=0.28.0      # для AsyncPostgresLogHandler
psycopg2-binary>=2.9 # для синхронного PostgresLogHandler
requests>=2.31.0     # для TelegramHandler/SlackHandler

Настройте всю систему через logging.config.dictConfig для централизованного управления.

import logging.config

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'json': {
            'format': '{"time": "%(asctime)s", "level": "%(levelname)s", "msg": "%(message)s", "module": "%(module)s"}',
            'datefmt': '%Y-%m-%dT%H:%M:%S%z'
        },
    },
    'filters': {
        'secrets_filter': {
            '()': 'your_module.SecretsFilter',
        }
    },
    'handlers': {
        'rabbitmq': {
            'class': 'your_module.RabbitMQHandler',
            'host': 'localhost',
            'level': 'INFO',
            'formatter': 'json',
        },
    },
    'loggers': {
        '': {  # root logger
            'handlers': ['rabbitmq'],
            'level': 'INFO',
            'filters': ['secrets_filter']
        }
    }
}

logging.config.dictConfig(LOGGING_CONFIG)

Начинайте внедрение постепенно. Сначала добавьте контекст (extra) и один кастомный обработчик, например, для Telegram. Затем внедрите отправку в очередь для развязки и только после этого - сложную фильтрацию и маршрутизацию. Мониторьте здоровье самих обработчиков логов, чтобы система мониторинга не вышла из строя незаметно. Для автоматизации рутинного анализа структурированных логов, например, поиска медленных запросов или признаков атак, возьмите на вооружение практические команды grep и awk для анализа логов веб-серверов.

Интеграция с внешними ИИ-сервисами для обработки ошибок или генерации аналитических сводок также требует надёжного логирования. Для единого доступа к различным моделям через API можно рассмотреть сервис AiTunnel, который агрегирует более 200 моделей, включая GPT, Gemini и Claude, и позволяет управлять запросами и бюджетами.

Поделиться:
Сохранить гайд? В закладки браузера