|
|
|
Professor Seleznov
|
Ваш алерт срабатывает в 3 часа ночи: «Группа потребителей orders-processor отстает на 50 000 сообщений». Вы в тревоге бросаетесь к ноутбуку. Это отставание на 10 секунд, 10 минут или 10 часов? В алерте этого нет. Вы открываете дашборд – там всё те же 50 000 сообщений. Но что это на самом деле означает для ваших пользователей? Метрика, за которой следят все, – смещение лага – почти ничего не говорит о реальном состоянии потребителя. Отставание в 1 000 сообщений может означать задержку в 1 секунду или в 1 день – всё зависит от пропускной способности топика. И именно это число обычно крупно отображается на дашбордах. Метрика, которая действительно важна, – временной лаг, то есть на сколько секунд ваш потребитель фактически отстает. Но большинство инструментов мониторинга либо не показывают её вовсе, либо делают это неточно. Давайте разберемся, почему так происходит и что с этим можно сделать. Иллюзия offset lag Начнем с основ. Offset lag вычисляется так: offset_lag = latest_offset - committed_offset Где:
- latest_offset – позиция следующего сообщения, которое будет записано
- committed_offset– позиция, до которой потребитель уже обработал сообщения
Просто, правда? Два числа, вычли одно из другого – готово. Именно из-за этой простоты все инструменты мониторинга его и показывают. Kafka Admin API напрямую возвращает оба значения – не нужно читать сообщения, разбирать таймстемпы, только метаданные. Но именно здесь всё начинает ломаться. Offset lag полностью оторван от времени. Рассмотрим два сценария:
| Сценарий |
Лаг по offset |
Что это на самом деле означает |
Топик с высокой пропускной способностью (10 000 сообщений/сек) |
50 000 |
Отставание на 5 секунд |
Топик с низкой пропускной способностью (10 сообщений/час) |
50 |
Отставание на 5 часов |
Один и тот же тип метрики, но принципиально разная критичность. Тот самый ночной алерт на 50 000 сообщений? В нагруженном топике дежурный инженер может зря потерять сон. А в малонагруженном топике даже 50 сообщений могут означать, что ваш потребитель не работает уже несколько дней. Ловушка «простаивающего продюсера» Дальше – хуже. Что происходит, когда продюсер перестает отправлять сообщения? Ваш offset lag остается неизменным. Committed offset не сдвигается, потому что нечего обрабатывать. Latest offset тоже не меняется, потому что новые сообщения не появляются. Разница? Ноль. При этом реальная задержка продолжает расти. Если последнее сообщение было отправлено час назад, а ваш потребитель всё еще находится на этом смещении, вы отстаете на час в реальном времени – хотя по offset lag всё выглядит нормально. Я видел, как команды сталкивались с этим на практике. Топик с низким трафиком, который обрабатывает батч-задачи раз в сутки, показывал «стабильный лаг» целую неделю. Оказалось, что потребитель упал, но так как продюсер отправлял сообщения только раз в день, значение лага почти не менялось. Проблему заметили пользователи, когда перестали приходить ежедневные отчеты. Почему time lag сложно считать корректно Если временной лаг так важен, почему его не показывает каждый инструмент? Потому что его корректный расчет – задача дорогая и нетривиальная. Time lag определяется так: time_lag = current_time - timestamp_of_message_at_committed_offset Чтобы получить этот таймстемп, нужно реально прочитать сообщение по committed offset. Это означает:
- Создать потребителя
- Переместиться к нужному смещению
- Считать сообщение
- Извлечь его таймстемп
- Сделать это для каждой партиции с лагом
Для кластера с сотнями consumer group и тысячами партиций это очень быстро превращается в серьезную нагрузку. Поэтому большинство инструментов просто избегают этого подхода. Шорткат через интерполяцию (и почему он не работает) Некоторые инструменты пытаются действовать «умнее». Популярный kafka-lag-exporter (написан на Scala) строит таблицу соответствий пар (offset, timestamp) во времени. Когда нужно оценить таймстемп для конкретного смещения, он интерполирует между известными точками. Звучит разумно. Но вот где это ломается:
- Проблема холодного старта:таблица соответствий изначально пустая. Пока не накопится достаточное количество точек, метрика фактически не имеет значения.
- Граничный случай с простаивающим продюсером: когда производство сообщений останавливается, таблица «схлопывает» повторяющиеся смещения в плоский участок. Интерполяция внутри такого участка приводит к делению на ноль (dy = 0 в формуле), что дает NaN для time lag.
- Переменная пропускная способность:линейная интерполяция предполагает постоянную скорость поступления сообщений между точками. Если внутри одного интервала опроса (по умолчанию 30 секунд) происходят всплески, оценки времени становятся неточными, поскольку реальный нелинейный характер нагрузки не учитывается.
Когда интерполяция не срабатывает (холодный старт, недостаточно данных), система возвращает NaN для time lag. На дашбордах это выглядит как пропущенные значения, а не ноль. При этом метрика offset lag (которая не зависит от интерполяции) остается корректной даже тогда, когда time lag посчитать невозможно. Правильный подход: прямое считывание таймстемпов Точного результата без прямого доступа к данным не получить. Если вам нужен реальный time lag, нужно читать фактические таймстемпы сообщений. Вот подход, реализованный в klag-exporter: Для каждой партиции с lag > 0:
- Создать потребителя и перейти к committed offset
- Считать одно сообщение
- Извлечь его таймстемп
- Посчитать: time_lag = now - message_timestamp
- Закешировать результат, чтобы не перегружать Kafka
Ключевая идея в том, что таймстемпы нужно получать только для тех партиций, где действительно есть lag. Если потребитель догнал поток (lag = 0), таймстемп не нужен – и так понятно, что всё актуально. Для партиций с lag получение таймстемпа кэшируется с временем жизни (TTL). Если потребитель не сдвинулся (тот же committed offset), закешированное значение остается актуальным. Новый запрос выполняется только тогда, когда фиксируется новое смещение. Этот подход дает:
- Точный time lag независимо от пропускной способности топика
- Корректное поведение при простое продюсера (time lag продолжает расти, как и должен)
- Контролируемую нагрузку на Kafka за счет кэширования
А что насчет граничных случаев? Две ситуации могут снизить точность таймстемпов: Уплотнение лога: если топик использует cleanup.policy=compact, сообщение по вашему committed offset могло быть удалено при уплотнении. Kafka вернет следующее доступное сообщение, у которого таймстемп будет более поздним. В результате отображаемый лаг окажется немного ниже реального. Удаление по политике хранения: если ваш committed offset указывает на сообщение, удаленное по политике хранения, возникает та же проблема – Kafka возвращает более новое сообщение. В обоих случаях лаг будет занижен: система покажет меньшую задержку, чем есть на самом деле. Корректная реализация должна обнаруживать такие ситуации и помечать их, чтобы вы понимали: число может быть неточным. TL;DR:для расчета time lag нужно читать реальные сообщения. Интерполяция выглядит привлекательно, но ломается на простаивающих и низконагруженных топиках. Прямое считывание с кэшированием дает точные значения и не убивает брокеры.
| Если хотите понять, насколько уверенно вы разбираетесь в Kafka за пределами базового «producer отправляет, consumer читает»,пройдите вступительный тест. |
Метрики, которые вам действительно нужны Вот что должен включать мониторинг Kafka: По партициям – для отладки конкретных проблем
kafka_consumergroup_group_lag{group="orders", topic="orders", partition="0"} 1523 kafka_consumergroup_group_lag_seconds{group="orders", topic="orders", partition="0"} 12.5
По группе– для алертинга
kafka_consumergroup_group_max_lag{group="orders"} 2847 kafka_consumergroup_group_max_lag_seconds{group="orders"} 45.2
Offset lag (group_lag) всё еще полезен: он показывает, сколько сообщений нужно обработать. Но его нужно смотреть вместе с time lag (group_lag_seconds), чтобы понимать критичность ситуации. Для алертинга используйте max_lag_seconds по всем партициям. Так вы получите задержку в худшем случае для всей группы: Алерт, если любая партиция отстает больше чем на 60 секунд
kafka_consumergroup_group_max_lag_seconds{group="orders-processor"} > 60
Такой алерт срабатывает по фактической задержке, а не по произвольному количеству сообщений. SLA вида «обрабатывать заказы в течение 30 секунд» наконец-то можно нормально контролировать. Знакомьтесь: klag-exporter Мы создали klag-exporter, чтобы решить эти проблемы. Он написан на Rust ради производительности и использует прямое считывание таймстемпов вместо интерполяции.

klag%20exporter Ключевые отличия от существующих инструментов:
| Характеристика |
klag-exporter |
kafka-lag-exporter |
KMinion |
| Метод расчёта временного лага |
Прямое сэмплирование |
Интерполяция |
Н/Д — только offset |
| Обработка простаивающего продюсера |
Точная |
Некорректно показывает 0 |
Н/Д |
| Использование памяти |
~30 МБ |
~300 МБ (JVM) |
~80 МБ |
| Время запуска |
< 1 секунды |
5–15 секунд |
< 1 секунды |
Экспортер также обнаруживает граничные случаи с уплотнением и политикой хранения, помечая затронутые партиции, чтобы вы понимали, когда к time lag стоит относиться с поправкой. Быстрый старт с Docker Начнем с самой простой возможной настройки и будем двигаться дальше. Минимальная настройка
- Создайте базовую конфигурацию
# Создайте базовый конфиг cat > config.toml << 'EOF' [exporter] poll_interval = "30s" http_port = 8000 [[clusters]] name = "my-cluster" bootstrap_servers = "kafka:9092" EOF
2. Запустите экспортер
docker run -d \ --name klag-exporter \ -p 8000:8000 \ -v $(pwd)/config.toml:/etc/klag-exporter/config.toml \ --network your-kafka-network \ ghcr.io/your-org/klag-exporter:latest
Замените your-kafka-network на Docker-сеть, в которой работает ваша Kafka, и обновите bootstrap_servers, чтобы он указывал на ваш брокер или брокеры. 3. Проверьте, работает ли всё
# Проверить состояние сервиса curl http://localhost:8000/health # Проверить готовность сервиса curl http://localhost:8000/ready # Посмотреть метрики curl http://localhost:8000/metrics | grep kafka_consumergroup
Вы должны увидеть примерно такие метрики:
kafka_consumergroup_group_lag{cluster_name="my-cluster",group="my-group",topic="my-topic",partition="0"} 1523 kafka_consumergroup_group_lag_seconds{cluster_name="my-cluster",consumer_group="my-group",topic="my-topic",partition="0"} 12.5
Полный демонстрационный стек Хотите увидеть, как всё работает вместе: Kafka, продюсеры, потребители, Prometheus и Grafana – с готовыми дашбордами? В репозитории есть готовый к запуску демостенд:
git clone https://github.com/softwaremill/klag-exporter cd klag-exporter/test-stack docker-compose up -d
Демостенд включает продюсеров и потребителей, которые генерируют реалистичные паттерны лага, поэтому вы сразу сможете увидеть метрики time lag и offset lag в действии. Подробный разбор конфигурации
[exporter] poll_interval = "30s" # Как часто собирать метрики http_port = 8000 # Порт для endpoint с метриками http_host = "0.0.0.0" # Адрес прослушивания granularity = "partition" # "topic" или "partition" log_level = "info" # debug, info, warn, error [exporter.timestamp_sampling] enabled = true # Рассчитывать временной лаг, рекомендуется cache_ttl = "60s" # Как долго кэшировать timestamp max_concurrent_fetches = 10 # Параллельные запросы timestamp
Компромиссы детализации:
| Гранулярность |
Объём метрик |
Когда использовать |
| partition / партиция |
Высокая кардинальность |
Отладка, малые и средние кластеры |
| topic / топик |
Более низкая кардинальность |
Большие кластеры, когда важно контролировать затраты |
Для большинства конфигураций начните с партиции. При необходимости вы всегда сможете агрегировать данные в Prometheus. 2. Конфигурация кластера
[[clusters]] name = "production" bootstrap_servers = "kafka1:9092,kafka2:9092,kafka3:9092" # Фильтровать группы и топики для мониторинга group_whitelist = ["^prod-.*"] # Regex: только группы, начинающиеся с "prod-" group_blacklist = ["^test-.*", "^__.*"] # Исключить тестовые и внутренние группы topic_whitelist = [".*"] # Все топики topic_blacklist = ["^__.*"] # Исключить внутренние топики # Добавить пользовательские labels к метрикам [clusters.labels] environment = "production" datacenter = "us-east-1"
Паттерны whitelist/blacklist – это регулярные выражения. Они применяются по порядку: сначала whitelist, затем blacklist удаляет совпадения. Способы аутентификации klag-exporter поддерживает все стандартные механизмы аутентификации Kafka через consumer_properties. SASL/PLAIN (имя пользователя/пароль):
[clusters.consumer_properties] "security.protocol" = "SASL_PLAINTEXT" "sasl.mechanism" = "PLAIN" "sasl.username" = "${KAFKA_USER}" "sasl.password" = "${KAFKA_PASSWORD}"
SASL/SCRAM (с TLS):
[clusters.consumer_properties] "security.protocol" = "SASL_SSL" "sasl.mechanism" = "SCRAM-SHA-512" "sasl.username" = "${KAFKA_USER}" "sasl.password" = "${KAFKA_PASSWORD}" "ssl.ca.location" = "/certs/ca.crt"
mTLS (взаимный TLS, без имени пользователя):
[clusters.consumer_properties] "security.protocol" = "SSL" "ssl.ca.location" = "/certs/ca.crt" "ssl.certificate.location" = "/certs/client.crt" "ssl.key.location" = "/certs/client.key"
AWS MSK с IAM:
[clusters.consumer_properties] "security.protocol" = "SASL_SSL" "sasl.mechanism" = "AWS_MSK_IAM" "sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;"
Подстановка переменных окружения Все значения конфигурации поддерживают синтаксис ${VAR_NAME} – не нужно зашивать секреты в конфигурационный файл.
bootstrap_servers = "${KAFKA_BROKERS}" "sasl.password" = "${KAFKA_PASSWORD}"
Развертывание в Kubernetes В продакшене klag-exporter обычно запускают в Kubernetes. В нашем репозитории на GitHub есть готовые Helm-чарты, с помощью которых можно легко развернуть klag-exporter в вашем кластере. Дашборд Grafana Импорт дашборда Вариант A: скопировать JSON-файл Из репозитория
# Из репозитория cp test-stack/grafana/provisioning/dashboards/kafka-lag.json \ /your/grafana/dashboards/
Вариант B: импортировать через интерфейс Grafana
- Перейдите в Dashboards → Import
- Загрузите kafka-lag.json или вставьте его содержимое
- Выберите ваш источник данных Prometheus
- Нажмите Import
Ключевые панели Включенный дашборд состоит из нескольких разделов: Обзорные показатели:
- Total Consumer Lag – общий лаг потребителей, сумма по всем группам
- Max Time Lag – максимальный временной лаг, задержка в худшем случае
- Exporter Status – статус экспортера, UP/DOWN
- Scrape Duration – длительность сбора метрик, показатель производительности сбора
- Consumer Groups – количество отслеживаемых групп потребителей
Временные ряды:
- Consumer Group Total Lag – динамика общего лага группы потребителей во времени
- Time Lag in Seconds – временной лаг в секундах, главная метрика
- Per-Partition Lag – лаг по партициям для отладки конкретных проблем
Пропускная способность и распределение:
- Partition Latest Offsets – распределение последних смещений по партициям
- Topic Message Rate – скорость записи сообщений в топик во времени
Переменные дашборда В дашборде есть переменные для фильтрации:
| Переменная |
Назначение |
| $cluster |
Фильтр по Kafka-кластеру |
| $consumer_group |
Фильтр по consumer group / группе потребителей |
| $topic |
Фильтр по topic / топику |
Используйте их, чтобы проваливаться глубже при расследовании проблем. Настройка для нескольких кластеров Klag-exporter может отслеживать несколько кластеров Kafka из одного экземпляра. Просто добавьте дополнительные блоки [[clusters]]:
[[clusters]] name = "production-us" bootstrap_servers = "kafka-us.example.com:9092" [clusters.labels] region = "us-east-1" environment = "production" [[clusters]] name = "production-eu" bootstrap_servers = "kafka-eu.example.com:9092" [clusters.labels] region = "eu-west-1" environment = "production" [[clusters]] name = "staging" bootstrap_servers = "kafka-staging.internal:9092" [clusters.labels] environment = "staging"
Каждый кластер получает собственную метку cluster_name в метриках, поэтому в Grafana и алертах можно фильтровать данные по кластеру. Когда использовать один экспортер, а когда несколько:
| Сценарий |
Отставание по offset |
Что это на самом деле означает |
| Топик с высокой пропускной способностью (10 000 сообщений/сек) |
50 000 |
Отставание на 5 секунд |
| Топик с низкой пропускной способностью (10 сообщений/час) |
50 |
Отставание на 5 часов |
Интеграция с OpenTelemetry Klag-exporter также может отправлять метрики через OTEL дополнительно к конечной точке Prometheus или вместо него.
[exporter.otlp] enabled = true endpoint = "http://otel-collector:4317" push_interval = "15s"
Это полезно, если вы используете Datadog, New Relic, Honeycomb или другие системы, которые поддерживают прием данных через OTEL. Тестовый стек включает OpenTelemetry Collector, который принимает эти метрики и экспортирует их одновременно в отладочный вывод, то есть в логи, и в Prometheus endpoint. Устранение неполадок Экспортер не показывает метрики
- Проверьте подключение к Kafka:
# Из контейнера/pod экспортера nc -zv kafka-broker 9092
2. Проверьте логи экспортера:
docker logs klag-exporter # или kubectl logs deployment/klag-exporter
3. Проверьте аутентификацию: Ищите ошибки SASL или SSL в логах. Еще раз проверьте учетные данные и сертификаты. Метрики есть, но групп потребителей нет
- Проверьте паттерны whitelist/blacklist: если у вас строгие паттерны, группы могут быть отфильтрованы.
- Проверьте, что группы потребителей существуют: kafka-consumer-groups --bootstrap-server kafka:9092 --list
- Проверьте, есть ли у потребителей зафиксированные смещения: klag-exporter показывает только группы, у которых есть committed offsets.
Time lag всегда равен 0
- Проверьте, включено ли считывание таймстемпов:
[exporter.timestamp_sampling] enabled = true
2. Проверьте, есть ли лаг у потребителей: если group_lag равен 0, то group_lag_secondsтоже будет 0, потому что потребитель догнал поток и временного лага нет. Grafana показывает «No data»
- Проверьте targets в Prometheus: http://localhost:9090/targets – klag-exporter в статусе UP?
- Проверьте источник данных: Grafana → Configuration → Data Sources → Test
- Проверьте названия метрик: дашборд ожидает конкретные имена метрик. Если вы их изменили, обновите запросы.
Что у вас должно быть к этому моменту Если вы прошлись по этому руководству, у вас должны быть:
- klag-exporter, который собирает метрики offset lag и time lag
- Prometheus, который забирает и хранит эти метрики
- дашборды Grafana, показывающие динамику и распределение лага
- алерты, настроенные под ваши SLA
- понимание граничных случаев: обнаружение compaction и retention
Теперь дежурные инженеры могут реагировать на «потребитель X отстает на 45 секунд», а не на «у потребителя X lag в 50 000 сообщений». Такой алерт гораздо проще превратить в действие. Ключевые выводы
- Offset lag показывает, на сколько сообщений вы отстали; time lag показывает, насколько вы отстали в реальном времени.Важны обе метрики, но ваши SLA измеряются именно во временном лаге.
- Инструменты на основе интерполяции ломаются на простаивающих или низконагруженных топиках.Они могут показывать нулевой лаг, когда реальное отставание уже измеряется часами.
- Прямое считывание таймстемпов точнее, но требует кэширования. Получайте данные только для партиций с лагом и кэшируйте результаты с TTL.
- Настраивайте алерты по time lag, а не по offset lag. Дежурные инженеры скажут спасибо, когда смогут сразу понять критичность ситуации.
Если сегодня вы отслеживаете только offset lag, вы видите картину лишь частично. Добавление time lag на дашборды – самое полезное улучшение, которое можно сделать для мониторинга Kafka. Сообщения никуда не денутся. Но ваши пользователи ждут – и они замечают задержку.
 Если после настройки лагов хочется глубже разобраться не только в мониторинге Kafka, но и в самой потоковой обработке, 13 мая в 20:00 пройдет бесплатный урок «Kafka Streams DSL». На занятии разберут, как строить потоковые приложения на Kafka Streams DSL: читать, фильтровать и трансформировать сообщения, работать с состоянием, роутингом и объединением потоков. Это демо-урок курса «Apache Kafka», так что заодно можно будет посмотреть на формат обучения, познакомиться с преподавателем и задать вопросы по практическим сценариям работы с Kafka. Записаться на урок
-Источник
|
|
|
|