Безошибочная работа с Kafka из Node js. Часть 3 Cтруктура сообщений, когда Kafka не нужна и теряет данные

Страницы:  1

Ответить
 

Professor Seleznov


В предыдущих частях рассматривались аспекты публикации сообщений, ребалансировки консьюмер групп и масштабирования чтения, а также проектирования консьюмеров. Эта часть посвящена базовой структуре сообщений, неподходящим сценариям использования Kafka и гарантиям записи.
Cтруктура сообщений
При проектировании коммуникаций через Kafka нужно заранее позаботиться о двух вещах. Во-первых, о «скелете» сообщения, который заранее позволит решить ряд проблем c трассировкой, логированием, дедупликацией, повторной обработкой и т.д. Во-вторых, о самой структуре передаваемых данных в сообщении и том, как они будут версионироваться. Kafka это statefull система и версионирование данных в ней сложная задача.
Пример «cкелета» сообщения:
await producer.send({
topic: '...',
messages: [
{
key: '...',
value: JSON.stringify({
message_id: randomUUID(),
timestamp: date,
sequence_id: number,
payload_v1: {
// data...
}
}),
headers: {
'x-service-name': string,
'x-request-id': string
}
}
]
});
  • x-service-name – имя сервиса, который опубликовал сообщение. Полезно для трассировки. Например, когда в один топик пишут несколько сервисов или сообщение может пройти путь через несколько топиков прежде, чем достигнуть консьюмера.
  • x-request-id – уникальный id запроса. Необходим для удобного логирования, например, отфильтровать логи, cвязанные c пачкой сообщений или одним конкретным.
  • message_id – уникальный индентификатор сообщения. Позволяет как определить дубли записей, так и защититься от повторной обработки на стороне консьюмера.
  • timestamp или sequence_id – поля, определяющие порядок создания сообщения на продьюсере. Kafka дает гарантии порядка для уже записанных в топик сообщений, но иногда они могут быть записаны в брокер в разном порядке. Например, пользователь дважды обновил свою фотографию в приложении в рамках малого промежутка времени. Первое обновление заняло больше времени (фото долго грузилось в S3), чем второе, как результат в топик записалось сначала второе событие, а потом первое. Пользователь увидит в приложении предпоследнее фото вместо последней загруженной фотографии. Для решения данной проблемы требуется передавать ключ порядка операций:
    • timestamp – время создания сообщения на продьюсере, а не время, когда его отправили в топик (producer.send)
    • sequence_id – уникальный монотонно возрастающий идентификатор. Можно использовать sequence_id, когда timestamp не подходит, например, когда не хочется полагаться на синхронизацию часов у всех инстансов приложения. По аналогии с timestamp приложение должно запрашивать sequence_id в самом начале бизнес операции, а не в момент отправки сообщения в топик (producer.send). Примером sequence_id, может служить autoincrement поле или функция nextval в БД.
  • payload_v* – передаваемые данные. Для того чтобы иметь возможность версионировать сообщения в ситуации, когда невозможно сохранить обратную совместимость структуры, используется постфикс версии. Почему версионирование нельзя реализовывать через создание нового топика, например, topic_v2? Версионирование через топик не позволяет консьюмеру обеспечить порядок обработки сообщений, ведь сообщения разных версий попадут в разные партиции и топики. Кроме того, в случае отката на предыдущую версию возникают проблемы с необработанными сообщениями в topic_v2: их надо переместить в топик v1 (Kafka не имеет встроенного инструмента в отличие от RabbitMQ) и трансформировать к предыдущей версии. В случае версионирования тела сообщения один и тот же консьюмер сможет по условию или фичи флагу обрабатывать разные версии сообщения c гарантией порядка обработки. А при переключении версии не будет возникать проблем с перезалитием сообщений из одного топика в другой.
Когда Kafka НЕ нужна
Встречаются ситуации, когда Kafka применяют как «cеребрянную пулю», но она как и любой другой инструмент имеет свои ограничения. Рассмотрим паттерны использования, где Kafka мало применима или вообще не подходит:
  • Подключение 1000+ клиентов. Кластер Kafka плохо поддерживает подключение тысячи и более клиентов. Например, очень плохая идея подключать тысячи мобильных устройств в качестве продьюсеров или консьюмеров. Рекомендуется поставить proxy/gateway перед Kafka, к которому по HTTP или MQTT будут подключаться устройства для получения или публикации данных.
  • Плохая сеть. Kafka чувствительна к стабильности сетевого соединения между клиентами и брокерами. В условиях плохой сети у консьюмеров будут частые ребалансы, продьюсеры в процессе отправки могут терять сообщения, а брокер может терять данные или быть недоступен для записи. В сочетании с указанным выше пунктом это может стать cмертью для кластера. В условиях плохой сети рекомендуется использовать другие технологии: HTTP, MQTT, UDP и т.д.
  • Реалтайм. Kafka не подходит для систем, где требуется передача данных строго в реальном времени. Очевидно, что сама Kafka, как промежуточное звено добавляет лаг задержки. И к тому же в системах реального времени отдается предпочтение скорости доставки, чем надежности и упорядоченности событий. Например, в трейдинге или игровом шутере требуется получать текущее значение котировки/противников как можно быстрее и чаще, нежели иметь возможность упорядоченно обработать события или заново их перечитать. Для функионала, где требуется реалтайм работа стоит использовать: HTTP/2-3 (SSE или QUICK), WebSocket, HLS, WebRTC, RTMP, MQTT.
  • Репликация данных. Для периодической синхронизации данных между несколькими БД гораздо лучше подойдут снятие и применения дампов, чем передача данных через Kafka. Для перманентной синхронизации с минимальной задержкой следует использовать нативную репликацию самой БД. Обычно это отлаженный механизм, на который можно положиться и который не нуждается в переизобретении. Для синхронизации разных БД (MySQL –> MongoDB) предпочтительней использовать готовые ETL движки, полагающиеся на журнал БД: binlog, wal, oplog и т.д.
  • RPC или request/reply-to. Kafka не подходит в качестве транспорта для RPC взаимодейстия. Ее не проектировали для создания топиков и партиций «на лету» в отличие от того же RabbitMQ. При этом большое их количество отрицательно влияет на производительность всего кластера, ведь это создает нагрузку на ZooKeeper/KRaft. Для RPC взаимодействия следует отдать предпочтение другим инструментам: HTTP webhook, RPC (JSON-RPC, SOAP, gRPC), RabbitMQ, NATS.
    • База данных. ksqlDB дает возможность рассматривать Kafka в качестве БД. Данное решение под капотом использует Kafka Streamsи RocksDB в качестве хранилища данных. Но использование ksqlDB имеет ряд проблем.
      • Отсутствие индексов (вторичных), ведь RocksDB это key-value БД.
      • Сложности с join. ksqlDB стриминговая БД поэтому есть необходимость создавать window (сообщения должны попадать в определенной временной диапазон, например, последние 10 минут). Данные для join должны находиться в одной партиции, иначе их придется перезаливать в новый топик, что создает дополнительную сетевую нагрузку и увеличивает объем хранимых данных на диске.
      • При изменении запросов нужно перепрочитать все данные в топике. При создании нового запроса (CREATE TABLE AS SELECT) или редактировании уже существующего, ksqlDB прочитает весь топик с самого начала. Выполнение запроса на больших топиках может занимать много времени, создавать нагрузку на кластер и потреблять много RAM (ksqlDB читает сообщение целиком, а не отдельные поля).
      • Использование ksqlDBсильно увеличивает объем потребляемого пространства на диске. Во-первых, потребители данных в БД c большой вероятностью попросят увеличить retentionдля топиков. Для того чтобы иметь возможность посчитать актуальные данные для больших window в недавно созданных запросах и иметь возможность перепрочитать сообщения за длительный период. Во-вторых, ksqlDB имеет проблему дублирования при хранении данных. Это происходит из-за таких сущностей как stream, table, changelog topic на которых построена вся обработка данных в ksqlDB.
      Как итог, Kafka в качестве БД является плохой идей. Cтоит использовать реляционные БД такие MySQL/PostgreSQL (индексы и обычные join) или колоночные/timeseries БД (сжатие данных на диске и быстрый доступ к ним), например, ClickHouse/QuestDB.
  • Cложная топология потребителей. Существуют архитектурные сценарии, когда конcьюмерам надо читать лишь определенные типы сообщений. Например, консьюмер хочет получать все события платежей во всех магазинах Москва: moscow.*.payment.* (схема регион.id_магазина.функционал.имя_события). Cложную топологию в рамках Kafka можно организовать следующими способами:
    • Создание множества специализированных топиков под требования консьюмеров. Kafka позволяет консьюмерам подписаться на несколько топиков с помощью регулярного выражение. Однако такой подход создает нагрузку на кластер. ZooKeeper/KRaft испытывает проблемы с тем чтобы управлять тысячами топиков и партиций.
    • Использовать один топик для всех типов сообщений, а консьюмеры отфильтровывают целевые события. Недостаток такого подхода в том, что консьюмерам надо пропускать через себя большое количество ненужных данных.
    Исходя из выше сказанного, для сложной топологии вместо Kafka cтоит выбирать RabbitMQ или Apache Pulsar.
  • Передача больших данных. Kafka испытвает проблемы при передаче больших сообщений (> 1 Mb и тем более > 10Mb). Подробно об этом говорилось в предыдущей части. Данные следует дробить на части или использовать внешние системы для хранения, а в сообщение лишь передавать ссылку на них.
  • DLQ (Dead Letter Queue) и очереди с приоритетами.Подход DLQ можно реализовать в Kafka, но только в случае, когда не важен порядок обработки. В противном случае сообщения из основного топика могут быть обработаны раньше, чем сообщения, которые переместили из него в DLQ. Очереди c приоритетом можно организовать в Kafka, но это довольно трудозатратно и неудобно. И для DLQ и для очередей с приоритетом гораздо правильнее использовать RabbitMQ.
Kafka теряет сообщения
Зачастую считается, что внедрение Kafka в качестве транспорта для межсервисной коммуникации обеспечивает гарантии доставки. Но это не верно, сообщения могут теряться как самим брокером, так и при записи в него самого. И причина тут не в Kafka, ведь вопрос с гарантией доставки есть в любой распределенной системе.
Продьюсер
Сообщения могут быть не получены брокером от продьюсера по следующим причинам:
  • Продьюсер не успел отправить накопленный буффер сообщений по причине перезапуска его или ошибки.
  • Продьюсер не успел вызвать метод .send по аналогичным причинам.
  • Продьюсер не смог отправить сообщения (с учетом retry) в брокер из-за проблем с сетью или недоступности лидер ноды партиции.
Данные проблемы можно решить с помощью transactional outbox и реконсиляцией. Реконсиляция – процесс cинхронизации данных в нескольких сервисах. Например, сервис обработки заказа не просто получает сообщения по Kafka о состоянии платежа, но и периодически опрашивает платежный сервис (например, по HTTP) с целью получения актуального состояния платежей, которые еще находятся в процессе обработки.
Брокер
Сам брокер тоже может терять сообщения. Kafka считает реплики ISR (In-Sync Replicas), если их лаг репликации не больше replica.lag.time.max.ms (по умолчанию, 10 cекунд). Допустим, имеется продьюсер (acks=-1), который пишет в кластер (min.insync.replicas = 2), который состоит из node-1 (лидер), node-2(SR реплика с лагом 3 секунды), node-3 (реплика с лагом в 4 секунды):
pic
Сразу после записи происходит падение node-1(лидер) и node-2 (ISR реплики). Например, из-за перезагрузки хост машин, на которых были запущены ноды. Если затем node-3 будет выбрана в качестве лидера (unclean.leader.election.enable=true), то сообщения, которые были записаны на node-1/2 будут потеряны. Ведьnode-3 не входила в ISR, и часть данных могла не отреплицироваться. Это может происходить как по причине человеческой ошибки, так и из-за срочной необходимости вернуть кластеру работоспособность. И тут все логично, ведь данная реплика не являлась ISR и значит не обеспечивала гарантии записи.
Но даже в случае успешного восстановления node-1/2, последние записи на них все равно могут быть утрачены.
pic
Как это возможно? Это связано с тем как реализована запись данных в Kafka. В начале сообщения записываются в кэш файловой системы (os page cache) на все ISR реплики. После чего продьюсеру отправляется подтверждение записи. Затем через определенный интервал времени брокер делает fsync для кэша на диск. Данная стратегия записи обеспечивает высокую пропускную способность, но ценой потери надежности. Предполагается, что все min.insync.replicas одномоментно не выйдут из строя. Да, можно заставить Kafka писать каждое сообщение на диск (log.flush.interval.messages=1 || log.flush.interval.ms=0), но это крайне плохо скажется на пропускной способности. На основании вышеописанного сценария можно сделать вывод, что хосты машин лидера и ISR реплик должны иметь разные жесткие диски и раздельное электропитание. Иначе, в случае перезагрузки по питанию или выхода из строя серверной стойки произойдет падение сразу всех ключевых нод кластера. И получается, что acks=-1 эфемерен, ведь на уровне железа гарантии записи всего лишь acks=1 (подтверждение записи от одной ноды). Защититься от потери сообщений на уровне брокера можно с помощью реконсиляции. В данной ситуации transactional outbox бесполезен, так как на стороне продьюсера проблемы не будут наблюдаться.-На этом цикл завершен. Благодарю за прочтение. Желаю по меньше обкафкаться в продакшене 😅 при работе с Kafka.
Предыдущие части: Ставь cтрелку вверх, если нашел что-то полезное для себя 🤓-Источник
 
Loading...
Error