⚠️ Эта страница автоматически переведена, и перевод может быть несовершенным.
blog-post

Интеграция Kafka с Manticore Search: Пошаговое руководство по обработке данных в реальном времени

Введение

Kafka — это популярный брокер сообщений, используемый в широком круге проектов: от обработки логов и управления очередями задач до персонализации контента и аналитики в реальном времени. Например, его можно использовать для индексации изменений в Википедии или поиска товаров в интернет-магазинах. Manticore Search, в свою очередь, поддерживает интеграцию с Kafka, что позволяет автоматически импортировать данные и использовать их для полнотекстового поиска, аналитики, векторного поиска и многого другого.

При импорте данных в Manticore вы можете гибко обрабатывать их:

  • Удалять ненужные поля, добавлять новые или изменять существующие;
  • Вычислять расстояния между геолокациями;
  • Фильтровать данные перед сохранением;
  • Генерировать фрагменты с использованием полнотекстового поиска.

В этой статье мы пройдем через создание небольшого приложения, которое извлекает данные из Kafka и обрабатывает их в Manticore Search. Мы будем использовать Docker Compose для настройки окружения. Это руководство подходит как для начинающих, так и для опытных разработчиков. Полный код и демонстрация доступны на GitHub .


Настройка окружения

Давайте начнем с настройки нашей среды разработки. Мы будем использовать Docker Compose для настройки всего нашего окружения, которое будет включать Kafka, Manticore Search и сервис Kafkacat для потоковой передачи данных. Сначала мы рассмотрим конфигурацию каждого сервиса отдельно, а затем предоставим полный файл docker-compose.yml.

Чтобы сэкономить время, вы можете скачать полный файл docker-compose.yml из нашего репозитория на GitHub и перейти к разделу Запуск настройки , если вы предпочитаете быстро начать.

Настройка Kafka

Давайте начнем с настройки Kafka. Мы будем использовать упрощенную конфигурацию с протоколом KRaft (Kafka Raft), который заменяет ZooKeeper для упрощения архитектуры. Вот часть сервиса Kafka из нашего файла docker-compose.yml:

kafka:
  image: docker.io/bitnami/kafka:3.7
  container_name: kafka
  networks:
    - app-network
  environment:
    # KRaft settings
    - KAFKA_CFG_NODE_ID=0
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
    # Listeners
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT

Настройка Manticore

С настроенной Kafka для обработки нашей потоковой передачи сообщений, нам теперь нужен движок поиска и аналитики для обработки данных. Давайте настроим Manticore Search, используя минимальную, но функциональную конфигурацию:

manticore:
  image: manticoresearch/manticore:7.4.6
  container_name: manticore
  networks:
    - app-network

Запуск окружения

Запустите базовые контейнеры (Kafka и Manticore) с помощью следующей команды:

docker compose up -d

Это запускает сервисы Kafka и Manticore, но пока не сервис Kafkacat (так как он использует ручной профиль). После запуска сервисов создайте тему в Kafka. Мы установим количество разделов на 4 для параллельного чтения данных несколькими потребителями, что улучшает производительность:

docker compose exec kafka kafka-topics.sh \
  --create \
  --topic wikimedia \
  --partitions 4 \
  --bootstrap-server localhost:9092

Подготовка источников данных

Теперь, когда наша инфраструктура запущена и тема готова к получению сообщений, давайте настроим поток данных, который будет передавать контент в реальном времени в Kafka. Для отправки данных в Kafka мы будем использовать поток Wikimedia. Вот конфигурация сервиса Kafkacat с manual профилем (чтобы мы могли запустить его вручную после настройки темы и Manticore):

kafkacat:
  profiles:
    - manual
  image: edenhill/kcat:1.7.1
  container_name: kcat
  tty: true
  entrypoint:
    - '/bin/sh'
    - '-c'
    - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
  networks:
    - app-network

После настройки сервиса Kafkacat с ручным профилем вы можете запустить его, чтобы начать потоковую передачу данных в Kafka:

docker compose --profile manual up -d

Пример полученных данных

Как только поток Wikimedia начнет поступать в Kafka, вы начнете получать сообщения в формате JSON. Давайте рассмотрим типичное сообщение, чтобы понять структуру данных, с которой мы будем работать:

{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
    "request_id": "66d1686b-500e-438c-8985-8c7a28295ae8",
    "id": "345ce42e-3cac-46b7-901e-2c3161f53436",
    "dt": "2024-12-10T16:30:32Z",
    "domain": "es.wikipedia.org",
    "stream": "mediawiki.recentchange",
    "topic": "codfw.mediawiki.recentchange",
    "partition": 0,
    "offset": 1313008266
  },
  "id": 323817817,
  "type": "edit",
  "namespace": 2,
  "title": "Usuario:Davicilio/Taller",
  "title_url": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
  "comment": "/* Uniforme titular */",
  "timestamp": 1733848232,
  "user": "Davicilio",
  "bot": false,
  "notify_url": "https://es.wikipedia.org/w/index.php?diff=164049521&oldid=164010074",
  "minor": false,
  "length": {
    "old": 29666,
    "new": 29691
  },
  "revision": {
    "old": 164010074,
    "new": 164049521
  },
  "server_url": "https://es.wikipedia.org",
  "server_name": "es.wikipedia.org",
  "wiki": "eswiki",
  "parsedcomment": "<span class=\"autocomment\"><a href=\"/wiki/Usuario:Davicilio/Taller#Uniforme_titular\">→<bdi>Uniforme titular</bdi></a></span>"
}

Работа с данными в Manticore

Теперь, когда у нас есть Kafka, получающая данные из потока Wikimedia, давайте настроим Manticore Search для обработки и индексации этих данных для поиска и анализа.

Создание источника данных

Давайте создадим SOURCE, который будет читать данные из Kafka. Мы укажем только те поля, которые нас интересуют — остальные будут игнорироваться. Если поле есть в схеме, но отсутствует в сообщении, оно будет установлено в NULL или оставлено пустым, в зависимости от типа данных:

docker compose exec manticore mysql -e "
CREATE SOURCE wiki_source (
  id bigint,
  schema '\$schema' text,
  meta json,
  type text,
  namespace int,
  title text,
  title_url text,
  comment text,
  \`timestamp\` timestamp,
  user text,
  bot bool,
  minor bool,
  length json,
  server_url text,
  server_name text,
  wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"

Объяснения:

  • CREATE SOURCE - Команда для создания источника данных.
  • (id bigint, schema '$schema' text, …) - Список полей из входящего сообщения, сопоставленных с поддерживаемыми типами данных Manticore ( список типов данных ).
    • Поле $schema - Manticore не позволяет использовать специальные символы в именах полей, поэтому мы используем первичное сопоставление:
      new_name 'original_name' type
      
      • new_name — имя поля, совместимое с Manticore.
      • original_name — оригинальный ключ JSON, который может содержать специальные символы. Используйте \' для экранирования апострофов, если это необходимо.
  • type=kafka - Указывает Kafka в качестве источника данных.
  • broker_list='kafka:9092' — Список брокеров сообщений, разделенных запятыми.
  • topic_list='wikimedia' — Список тем для чтения, разделенных запятыми.
  • consumer_group='ms_wikimedia' — Имя группы потребителей.
  • num_consumers='1' — Количество процессов, обрабатывающих сообщения (обычно соответствует или является кратным количеству разделов темы).
  • batch=200 — Размер пакета для обработки сообщений, влияющий на производительность и настраиваемый индивидуально.

Создание таблицы результатов

Мы создали источник данных для чтения из Kafka, но нам все еще нужно место назначения для этих данных. Давайте создадим таблицу для хранения обработанных сообщений:

Ключевым полем является ID сообщения. Во время передачи данных проблемы, такие как сбои сети, сбои брокера Kafka или простои Manticore, могут привести к дублированию сообщений. Чтобы избежать дубликатов, мы используем уникальный ID: если запись уже существует в таблице, она пропускается.

В дополнение к ID, таблица будет включать такие поля, как type, title, title_url, comment, timestamp, user, bot, minor, length, server_url, server_name, wiki и meta.

docker compose exec manticore mysql -e "create table wiki_results (
  id bigint, 
  \`schema\` text, 
  metadata json, 
  type text, 
  namespace int, 
  title text, 
  title_url text, 
  comment text, 
  \`timestamp\` timestamp, 
  user string, 
  bot bool, 
  minor bool, 
  length_old int, 
  length_new int, 
  length_diff int, 
  server_url text, 
  server_name text, 
  wiki text, 
  received_at timestamp
)"

We've split the length field into length_old and length_new to demonstrate mapping capabilities.

Создание материализованного представления

With both our source (Kafka) and destination (the table) in place, we now need to connect them and define how data should flow between them. This is where a materialized view comes in — it acts as a real-time ETL process that transforms data as it moves from Kafka to our table:

Input JSON keySource key / FunctionDestination field
ididid
$schemaschemaschema
metametametadata
typetypetype
namespacenamespacenamespace
titletitletitle
title_urltitle_urltitle_url
commentcommentcomment
timestamptimestamptimestamp
useruseruser
botbotbot
minorminorminor
length.oldlength.oldlength_old
length.newlength.newlength_new
-integer(length.old) - integer(length.new)length_diff
server_urlserver_urlserver_url
server_nameserver_nameserver_name
wikiwikiwiki
-UTC_TIMESTAMP()received_at

Command to create it:

docker compose exec manticore mysql -e "
CREATE MATERIALIZED VIEW wiki_mva
TO wiki_results AS
SELECT
  id,
  \`schema\`,
  meta AS metadata,
  type,
  namespace,
  title,
  title_url,
  comment,
  \`timestamp\`,
  user,
  bot,
  minor,
  length.old as length_old,
  length.new as length_new,
  integer(length.old) - integer(length.new) as length_diff,
  server_url,
  server_name,
  wiki,
  UTC_TIMESTAMP() as received_at
FROM wiki_source"

Essentially, this is a standard SELECT query, familiar to those who work with MySQL or similar databases:

  • Fields with matching names in the source (SOURCE) and target table are left as-is (id, schema, type, etc.).
  • Fields needing transformation (e.g., meta to metadata) are specified with AS in the format original_name AS new_name.
  • Reserved words like schema and timestamp are enclosed in backticks (`).
  • Nested JSON values are accessed with a dot and AS (e.g., length.new as length_new).
  • Manticore supports a wide range of functions for data processing, from calculations to formatting.
  • Filtering and grouping can be added if needed. We skipped this to keep the example simple, but you could add WHERE MATCH(@title, 'pizza') after FROM wiki_source.

Полная конфигурация Docker Compose

Now that we understand all the components and how they interact, let's recap by looking at the complete docker-compose.yml file. This single file defines our entire environment with all three services (Kafka, Manticore, and Kafkacat) plus the network configuration.

You can either copy the following content or download the готовый docker-compose.yml directly from our GitHub repository:

services:
  kafka:
    image: docker.io/bitnami/kafka:3.7
    container_name: kafka
    networks:
      - app-network
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  manticore:
    image: manticoresearch/manticore:7.4.6
    container_name: manticore
    networks:
      - app-network
  kafkacat:
    profiles:
      - manual
    image: edenhill/kcat:1.7.1
    container_name: kcat
    tty: true
    entrypoint:
      - '/bin/sh'
      - '-c'
      - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
    networks:
      - app-network
networks:
  app-network:
    driver: bridge

Запуск настройки

With our environment configured, let's check how the data flows through the system. After saving or downloading the docker-compose.yml file to your project directory and starting the services as described earlier, you can monitor the data ingestion by running SQL queries against Manticore:

docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
|     1200 |
+----------+

Wait a few seconds and run it again and you'll get an updated value:

docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
|     1400 |
+----------+

A simple query to view the data:

docker compose exec manticore mysql -e "SELECT title, user, timestamp FROM wiki_results LIMIT 5"
+-----------------------------+------------------+------------+
| title                       | user             | timestamp  |
+-----------------------------+------------------+------------+
| Bobbi Gibb                  | BOT-Superzerocool| 1737470127 |
| Angela Alsobrooks           | Tomrtn           | 1737470214 |
| Category:Goldschmidt Stolper| MB-one           | 1737470211 |
| Oklahoma Sooners            | JohnDoe          | 1737470220 |
| File:Kluse - Phoenix.jpg    | WikiBot          | 1737470230 |
+-----------------------------+------------------+------------+

A more complex query with grouping:

docker compose exec manticore mysql -e "SELECT 
    namespace, 
    COUNT(*) as count, 
    AVG(length_diff) as avg_length_change, 
    MAX(timestamp) as latest_edit, 
    title as sample_title 
FROM wiki_results 
WHERE MATCH('wiki') 
    GROUP BY namespace 
HAVING count > 5 
ORDER BY count DESC 
LIMIT 10 
OPTION ranker=sph04"
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| namespace | count | avg_length_change   | latest_edit | sample_title                                                        |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
|        14 |   998 |  116196508.99599199 |  1740684056 | Category:Wiki For Minorities in the Middle East 2025                |
|         0 |   634 | 3075575718.85488939 |  1740684057 | Oklahoma Sooners men's basketball                                   |
|         6 |   313 |   2758109067.434505 |  1740684056 | File:Kluse - Phoenix dactylifera 03 ies.jpg                         |
|         2 |    40 |   1825360728.625000 |  1740684053 | User:SD2125!                                                        |
|         4 |    21 | 3272355882.52380943 |  1740684051 | Commons:Wiki For Minorities in the Middle East                      |
|         3 |    16 |   3489659770.625000 |  1740684054 | Brugerdiskussion:Silas Nicolaisen                                   |
|         1 |    13 |   3634202801.230769 |  1740684045 | Diskussion:Nordische Skiweltmeisterschaften 2025                    |
|      1198 |    10 |   1288490146.500000 |  1740684053 | Translations:Commons:Wiki Loves Folklore 2025/Page display title/id |
|        10 |     8 |   3221223681.500000 |  1740684055 | Predefinição:Mana (série)                                           |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+

Изменение схемы источника данных

If you need to modify the data source schema (e.g., add new fields, remove unnecessary ones, or change data types), follow these steps:

  1. Приостановите материализованное представление
    First, pause the materialized view to stop the data flow from Kafka to the wiki_results table:
    docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=1"
    
  2. Удалите существующий источник
    Delete the current data source:
    docker compose exec manticore mysql -e "DROP SOURCE wiki_source"
    
  3. Создайте новый источник с обновленной схемой
    Create a new source with the modified schema. For example, to add the domain field from the meta JSON object, field parsedcomment and change the namespace type to bigint:
    docker compose exec manticore mysql -e "CREATE SOURCE wiki_source (
      id bigint,
      schema '$schema' text,
      meta json,
      parsedcomment text,
      type text,
      namespace bigint,
      title text,
      title_url text,
      comment text,
      \`timestamp\` timestamp,
      user text,
      bot bool,
      minor bool,
      length json,
      server_url text,
      server_name text,
      wiki text
    )
    type='kafka'
    broker_list='kafka:9092'
    topic_list='wikimedia'
    consumer_group='ms_wikimedia'
    num_consumers='1'
    batch=200"
    
  4. Обновите таблицу (add the domain and parsedcomment columns):
    docker compose exec manticore mysql -e "ALTER TABLE wiki_results ADD COLUMN domain text;
    ALTER TABLE wiki_results ADD COLUMN parsedcomment text"
    
  5. Удалите материализованное представление:
    docker compose exec manticore mysql -e "DROP MV wiki_mva"
    
  6. Воссоздайте материализованный вид:
    docker compose exec manticore mysql -e "CREATE MATERIALIZED VIEW wiki_mva
    TO wiki_results AS
    SELECT
      id,
      \`schema\`,
      meta AS metadata,
      meta.domain as domain,
      parsedcomment,
      type,
      namespace,
      title,
      title_url,
      comment,
      \`timestamp\`,
      user,
      bot,
      minor,
      length.old as length_old,
      length.new as length_new,
      integer(length.old) - integer(length.new) as length_diff,
      server_url,
      server_name,
      wiki,
      UTC_TIMESTAMP() as received_at
    FROM wiki_source"
    

Если вы только что воссоздали SOURCE, не изменяя MV, продолжите чтение данных с:

docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=0"

В противном случае материализованный вид уже должен быть возобновлен.

Этот процесс дает вам полный контроль над изменениями схемы и позволяет гибко адаптироваться к новым требованиям.

Заключение

Интеграция Kafka с Manticore Search предлагает мощное решение для обработки данных в реальном времени и аналитики. Следуя этому руководству, вы настроили надежную среду с использованием Docker Compose, сконфигурировали Kafka для обработки потоковой передачи сообщений и использовали Manticore Search для индексации и запроса данных. Эта интеграция не только улучшает функциональность вашего приложения, но и упрощает управление данными и их анализ.
Независимо от того, работаете ли вы над анализом логов, индексацией контента или любым другим приложением, основанным на данных, эта настройка предоставляет масштабируемую и эффективную структуру. Гибкость Manticore Search позволяет вам адаптировать конвейер обработки данных под ваши конкретные нужды, обеспечивая быструю адаптацию к изменяющимся требованиям.
Мы призываем вас поэкспериментировать с этой настройкой, исследовать дополнительные функции Manticore Search и адаптировать пример под ваши уникальные случаи использования. Полный код доступен на GitHub , а сообщество Manticore всегда готово помочь с любыми вопросами или проблемами, с которыми вы можете столкнуться. Погружайтесь и раскрывайте весь потенциал обработки данных в реальном времени с Kafka и Manticore Search!

Установить Manticore Search

Установить Manticore Search