Arhn - архитектура программирования

Spring Integration Kafka медленная обработка сообщений при использовании KafkaNativeOffsetManager

Мы успешно использовали SI Kafka для нового проекта. До недавнего переключения мы использовали KafkaTopicOffsetManager для управления смещением темы наших потребителей. Чтобы не создавать дополнительных тем для каждой пары потребитель / тема и использовать мониторинг Burrow или отставания, мы решили использовать последнюю версию KafkaNativeOffsetManager, которая использует встроенное управление смещением, предоставляемое Kafka. Однако после переключения мы заметили, что потребление сообщений из целевой темы постоянно отставало. Мы знаем, что этого не произошло с KafkaTopicOffsetManager, поскольку мы использовали его в течение нескольких месяцев до перехода. Мы также провели параллельные тесты и подтвердили, что потребление сообщений происходило почти в реальном времени с созданием сообщений при использовании KafkaTopicOffsetManager, а KafkaNativeOffsetManager всегда все больше отставал. Оба менеджера смещения используют конфигурацию по умолчанию и оба фиксируют смещения после обработки сообщения (автоматическое подтверждение).

Итак, у меня действительно есть два вопроса, первый не является основным в этой публикации SO.

Первый вопрос: почему собственное управление смещением работает медленнее, чем использование темы для управления смещением?

Второй вопрос: можем ли мы настроить SI kafka так, чтобы не фиксировать смещения при успешной обработке каждого сообщения, а предоставлять другую стратегию? Мы думали, что, возможно, нам не следует делать смещения так часто, а делать их либо как пакетное обновление. Например, зафиксируйте смещения после успешной обработки 25 сообщений или через 30 секунд.

Спасибо


Ответы:


1

При отключении автоматической фиксации и получении заголовка подтверждения единственное, что вам нужно сделать, это вызвать acknowledge() после обработки вашего сообщения. Это предполагает, что даже если вы обрабатываете сообщение в другом потоке, вы сохраните ссылку на экземпляр Acknowledgment либо как таковой, либо как часть исходного Message - или вы копируете заголовки, если выполняете преобразования. Но вызов должен выполняться вашим кодом.

Во-вторых, проблема с производительностью - это вызвано тем, что реализация KafkaNativeOffsetManager делает блокирующий, относительно более дорогой вызов брокерам (по сравнению с простой отправкой сообщения в сжатую тему, как это делает KafkaTopicOffsetManager. В общем, выполнение обновлений после каждое сообщение стоит дорого, и в Spring XD мы смягчаем это, используя https://github.com/spring-projects/spring-xd/blob/master/extensions/spring-xd-extension-kafka/src/main/java/org/springframework/integration/x/kafka/WindowingOffsetManager.java, что снижает количество эффективных операций записи. Полагаю, мы могли бы сделать что-то подобное для Spring Integration.

Для сравнения: 100000 обновлений завершаются за 9,8 с с KafkaNativeOffsetManager и за 0,382 с с KafkaTopicOffsetManager, как показано https://github.com/mbogoevici/spring-integration-kafka/blob/perftest/src/test/java/org/springframework/integration/kafka/performance/OffsetManagerPerformanceTests.java (результаты собраны на моем компьютере). Результаты могут быть как-то искажены, но все же указывают на большую разницу. Трассировка в YourKit подтверждает результат.

09.02.2016
  • Спасибо. см. мои комментарии к другому ответу. Мы закончили тем, что отключили автоматическую фиксацию смещений и обработали смещение вниз по потоку интеграции, фиксируя только каждое 10-е смещение или последнее смещение после 1 секунды бездействия. однако здесь есть свои проблемы. например, если у вас есть поток событий, и ваш поток обрабатывает только 10% сообщений и отбрасывает остальные, тогда компонент нисходящего потока не получает подтверждение, и тогда смещение не фиксируется. Я уверен, что это можно решить с помощью дополнительной симмантики потока, но мы можем просто снова включить автоматическую фиксацию и фиксировать только каждое 10-е смещение. 30.03.2016
  • Я посмотрю на WindowingOffsetManager и посмотрю, соответствует ли это нашим потребностям. Спасибо. 30.03.2016
  • отметив это как ответ на запись, потому что он говорит о различиях в производительности, способах фиксации смещений позже, а также о стратегии работы с окнами, которая по концепции аналогична той, которую мы в конечном итоге выбрали. Скорее всего, мы перейдем к использованию диспетчера смещения окон или чего-то подобного. было бы хорошо, если бы это было доступно прямо в SI Kafka 30.03.2016

  • 2

    Не уверен, в чем проблема с KafkaNativeOffsetManager, было бы здорово, если бы вы поделились некоторым исследованием этого вопроса, узким местом в нашем коде в JIRA.

    Для фиксации отложенного смещения я могу предложить autoCommitOffset = false на KafkaMessageDrivenChannelAdapter. После этого отправленное в channel сообщение будет дополнено заголовком KafkaHeaders.ACKNOWLEDGMENT перед DefaultAcknowledgment. Он действительно отвечает на ваш запрос:

    /**
     * Invoked when the message for which the acknowledgment has been created has been processed.
     * Calling this method implies that all the previous messages in the partition have been processed already.
     */
    void acknowledge();
    
    30.12.2015
  • сначала мы думали о создании теста с двумя потоками, один с использованием KafkaTopicOffsetManager, а другой - KafkaNativeOffsetManager, который будет пытаться записывать смещения как можно быстрее примерно для 500000 итераций и посмотреть, какой из них будет лучше. Мы думаем, что это не обязательно с кодом Spring, поэтому, возможно, стоит попробовать и собственный клиент Kafka. Вопрос был больше, если такое поведение было замечено и объяснено. 31.12.2015
  • во-вторых, мы знаем об отключении автоматической фиксации, но что мне не ясно, так это то, как смещение затем фиксируется. Должно ли быть что-то в потоке сообщений, которое должно получить сообщение и определить, следует ли зафиксировать смещение (например, kafka | handler | handler | ... | handler (commit kafka offset) | output handler)? 31.12.2015
  • поэтому мы в конечном итоге протестировали это и убедились, что нативный был намного медленнее, чем основанный на теме. Покопавшись, мы подумали, что двух офсетных менеджеров нельзя сравнивать. Причина в том, что менеджер смещения темы смещает асинхронно и не ждет ответа перед возвратом управления. возможно, что пул потоков выполняет резервное копирование асинхронной записи. тогда это могло бы выглядеть так, как будто один выполнял другой, но на самом деле возможно, что смещения записывались с одинаковой скоростью. 30.03.2016
  • Это наводит нас на мысль, что использование тематического управления смещением небезопасно. поскольку обработка сообщений не была взаимно однозначной с фиксацией смещения этих сообщений. Это могло быть неправильно, но мы все равно пошли другим путем. Мысли? 30.03.2016
  • Вот в чем суть: не фиксируйте смещения для каждого сообщения, а делайте это периодически для некоторого диапазона. В новом проекте Spring Kafka мы полностью основаны на собственном коммите смещения и предоставляем такие стратегии, как: docs.spring.io/spring-kafka/docs/1.0.0.M1/reference/htmlsingle/. Где BATCH-осведомленные коммиты - лучший выбор. С Kafka вы все равно не теряете сообщения. Вы можете думать только об идемпотентном приемнике: docs.spring.io/spring-integration/reference/html/ не обрабатывайте одно и то же сообщение дважды. 30.03.2016
  • есть ли относительная целевая дата выпуска для новых spring-kafka и spring-integration-kafka? 19.04.2016
  • См. Это: spring.io/blog/2016/04/11/. Думаю где-то в середине мая все мы увидим GA! 19.04.2016
  • Новые материалы

    Коллекции публикаций по глубокому обучению
    Последние пару месяцев я создавал коллекции последних академических публикаций по различным подполям глубокого обучения в моем блоге https://amundtveit.com - эта публикация дает обзор 25..

    Представляем: Pepita
    Фреймворк JavaScript с открытым исходным кодом Я знаю, что недостатка в фреймворках JavaScript нет. Но я просто не мог остановиться. Я хотел написать что-то сам, со своими собственными..

    Советы по коду Laravel #2
    1-) Найти // You can specify the columns you need // in when you use the find method on a model User::find(‘id’, [‘email’,’name’]); // You can increment or decrement // a field in..

    Работа с временными рядами спутниковых изображений, часть 3 (аналитика данных)
    Анализ временных рядов спутниковых изображений для данных наблюдений за большой Землей (arXiv) Автор: Рольф Симоэс , Жильберто Камара , Жильберто Кейрос , Фелипе Соуза , Педро Р. Андраде ,..

    3 способа решить квадратное уравнение (3-й мой любимый) -
    1. Методом факторизации — 2. Используя квадратичную формулу — 3. Заполнив квадрат — Давайте поймем это, решив это простое уравнение: Мы пытаемся сделать LHS,..

    Создание VR-миров с A-Frame
    Виртуальная реальность (и дополненная реальность) стали главными модными терминами в образовательных технологиях. С недорогими VR-гарнитурами, такими как Google Cardboard , и использованием..

    Демистификация рекурсии
    КОДЕКС Демистификация рекурсии Упрощенная концепция ошеломляющей О чем весь этот шум? Рекурсия, кажется, единственная тема, от которой у каждого начинающего студента-информатика..