# Integrating Kafka with Manticore Search: A Step-by-Step Guide to Real-Time Data Processing

Learn how to set up Kafka and Manticore Search integration for real-time data processing. A step-by-step guide with examples: from configuring Docker Compose to creating materialized views and analyzing data from the Wikimedia Stream.

## Введение

Kafka — популярный брокер сообщений, используемый в самых разных проектах: от обработки журналов и управления очередями задач до персонализации контента и аналитики в реальном времени. Например, его можно использовать для индексации изменений в Википедии или поиска товаров в онлайн‑магазинах. Manticore Search, в свою очередь, поддерживает интеграцию с Kafka, позволяя автоматически импортировать данные и использовать их для полнотекстового поиска, аналитики, векторного поиска и многого другого.

При импорте данных в Manticore вы можете гибко их обрабатывать:

- Удалять ненужные поля, добавлять новые или изменять существующие;
- Вычислять расстояния между геолокациями;
- Фильтровать данные перед сохранением;
- Генерировать сниппеты с помощью полнотекстового поиска.

В этой статье мы пошагово создадим небольшое приложение, которое получает данные из Kafka и обрабатывает их в Manticore Search. Мы используем Docker Compose для настройки окружения. Это руководство подходит как для начинающих, так и для опытных разработчиков. Полный код и демонстрация доступны на [GitHub](http://github.com/manticoresoftware/kafka-demo).

---

## Настройка окружения

Начнём с настройки нашей среды разработки. Мы будем использовать Docker Compose для создания полного окружения, которое будет включать Kafka, Manticore Search и сервис Kafkacat для потоковой передачи данных. Сначала рассмотрим конфигурацию каждого сервиса отдельно, а затем предоставим полный файл `docker-compose.yml`.

Чтобы сэкономить время, вы можете скачать [полный файл docker-compose.yml](https://github.com/manticoresoftware/kafka-demo/blob/main/docker-compose.yml) из нашего репозитория на GitHub и перейти к разделу [Запуск настройки](#running-the-setup), если хотите быстро приступить к работе.

### Настройка Kafka

Начнём с настройки Kafka. Мы используем упрощённую конфигурацию с протоколом KRaft (Kafka Raft), который заменяет ZooKeeper и упрощает архитектуру. Ниже приведена часть сервиса Kafka из нашего файла docker-compose.yml:

```yaml
kafka:
  image: docker.io/bitnami/kafka:3.7
  container_name: kafka
  networks:
    - app-network
  environment:
    # KRaft settings
    - KAFKA_CFG_NODE_ID=0
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
    # Listeners
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
```

### Настройка Manticore

После настройки Kafka для обработки потоков сообщений нам нужен движок поиска и аналитики для обработки данных. Давайте настроим Manticore Search, используя минимальную, но рабочую конфигурацию:

```yaml
manticore:
  image: manticoresearch/manticore:7.4.6
  container_name: manticore
  networks:
    - app-network
```

### Запуск окружения

Запустите базовые контейнеры (Kafka и Manticore) с помощью следующей команды:

```shell
docker compose up -d
```

Это запускает сервисы Kafka и Manticore, но пока не запускает сервис Kafkacat (поскольку он использует ручной профиль). После того как сервисы работают, создайте топик в Kafka. Мы установим количество разделов равным 4 для параллельного чтения данных несколькими потребителями, что повышает производительность:

```shell
docker compose exec kafka kafka-topics.sh \
  --create \
  --topic wikimedia \
  --partitions 4 \
  --bootstrap-server localhost:9092
```

### Подготовка источников данных

Теперь, когда наша инфраструктура запущена и готова принимать сообщения, настроим поток данных, который будет передавать контент в реальном времени в Kafka. Для отправки данных в Kafka мы будем использовать поток Wikimedia. Ниже приведена конфигурация сервиса Kafkacat с профилем `manual` (чтобы мы могли запустить его вручную после настройки топика и Manticore):

```yaml
kafkacat:
  profiles:
    - manual
  image: edenhill/kcat:1.7.1
  container_name: kcat
  tty: true
  entrypoint:
    - '/bin/sh'
    - '-c'
    - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
  networks:
    - app-network
```

После настройки сервиса Kafkacat с ручным профилем вы можете запустить его, чтобы начать потоковую передачу данных в Kafka:

```shell
docker compose --profile manual up -d
```

### Пример полученных данных

Как только поток Wikimedia начнёт поступать в Kafka, вы начнёте получать сообщения в формате JSON. Давайте рассмотрим типичное сообщение, чтобы понять структуру данных, с которой будем работать:

```json
{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
    "request_id": "66d1686b-500e-438c-8985-8c7a28295ae8",
    "id": "345ce42e-3cac-46b7-901e-2c3161f53436",
    "dt": "2024-12-10T16:30:32Z",
    "domain": "es.wikipedia.org",
    "stream": "mediawiki.recentchange",
    "topic": "codfw.mediawiki.recentchange",
    "partition": 0,
    "offset": 1313008266
  },
  "id": 323817817,
  "type": "edit",
  "namespace": 2,
  "title": "Usuario:Davicilio/Taller",
  "title_url": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
  "comment": "/* Uniforme titular */",
  "timestamp": 1733848232,
  "user": "Davicilio",
  "bot": false,
  "notify_url": "https://es.wikipedia.org/w/index.php?diff=164049521&oldid=164010074",
  "minor": false,
  "length": {
    "old": 29666,
    "new": 29691
  },
  "revision": {
    "old": 164010074,
    "new": 164049521
  },
  "server_url": "https://es.wikipedia.org",
  "server_name": "es.wikipedia.org",
  "wiki": "eswiki",
  "parsedcomment": "<span class=\"autocomment\"><a href=\"/wiki/Usuario:Davicilio/Taller#Uniforme_titular\">→<bdi>Uniforme titular</bdi></a></span>"
}
```

---

## Работа с данными в Manticore

Теперь, когда Kafka получает данные из потока Wikimedia, настроим Manticore Search для обработки и индексации этих данных для поиска и анализа.

### Создание источника данных

Создадим `SOURCE`, который будет считывать данные из Kafka. Мы укажем только те поля, которые нас интересуют — остальные будут игнорироваться. Если поле присутствует в схеме, но отсутствует в сообщении, оно будет установлено в `NULL` или оставлено пустым, в зависимости от типа данных:

```shell
docker compose exec manticore mysql -e "
CREATE SOURCE wiki_source (
  id bigint,
  schema '\$schema' text,
  meta json,
  type text,
  namespace int,
  title text,
  title_url text,
  comment text,
  \`timestamp\` timestamp,
  user text,
  bot bool,
  minor bool,
  length json,
  server_url text,
  server_name text,
  wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"
```

Пояснения:

- `CREATE SOURCE` — команда для создания источника данных.
- `(id bigint, schema '$schema' text, …)` — список полей из входящего сообщения, сопоставленных с поддерживаемыми Manticore типами данных ([список типов данных](https://manual.manticoresearch.com/Creating_a_table/Data_types#Data-types)).
    - **Поле `$schema`** — Manticore не допускает специальные символы в названиях полей, поэтому мы используем первичное сопоставление:
      ```
      new_name 'original_name' type
      ```
        - `new_name` — поле, совместимое с Manticore.
        - `original_name` — оригинальный ключ JSON, который может содержать специальные символы. При необходимости используйте `\'` для экранирования апострофов.
- `type=kafka` — указывает Kafka в качестве источника данных.
- `broker_list='kafka:9092'` — список брокеров сообщений, разделённых запятыми.
- `topic_list='wikimedia'` — список тем для чтения, разделённых запятыми.
- `consumer_group='ms_wikimedia'` — название группы потребителей.
- `num_consumers='1'` — количество процессов, обрабатывающих сообщения (обычно совпадает или является кратным числу разделов темы).
- `batch=200` — размер пакета для обработки сообщений, влияющий на производительность и настраиваемый индивидуально.

### Создание таблицы результатов

Мы создали источник данных для чтения из Kafka, но нам всё ещё нужна цель для этих данных. Создадим таблицу для хранения обработанных сообщений:

Ключевым полем является `ID` сообщения. Во время передачи данных могут возникать проблемы, такие как сбои сети, падения брокеров Kafka или недоступность Manticore, что приводит к дублированию сообщений. Чтобы избежать дубликатов, мы используем уникальный `ID`: если запись уже существует в таблице, она пропускается.

Помимо `ID`, в таблице будут присутствовать поля `type`, `title`, `title_url`, `comment`, `timestamp`, `user`, `bot`, `minor`, `length`, `server_url`, `server_name`, `wiki` и `meta`.

```shell
docker compose exec manticore mysql -e "create table wiki_results (
  id bigint, 
  \`schema\` text, 
  metadata json, 
  type text, 
  namespace int, 
  title text, 
  title_url text, 
  comment text, 
  \`timestamp\` timestamp, 
  user string, 
  bot bool, 
  minor bool, 
  length_old int, 
  length_new int, 
  length_diff int, 
  server_url text, 
  server_name text, 
  wiki text, 
  received_at timestamp
)"
```

Мы разделили поле `length` на `length_old` и `length_new`, чтобы продемонстрировать возможности сопоставления.

### Создание материализованного представления

Имея как источник (Kafka), так и назначение (таблицу), нам теперь нужно соединить их и определить, как данные должны перемещаться между ними. Здесь и пригодится материализованное представление — оно выступает в роли процесса ETL в реальном времени, преобразующего данные при их перемещении из Kafka в нашу таблицу:

| **Входящий JSON‑ключ** | **Ключ источника / Функция**                     | **Поле назначения** |
|--------------------|-----------------------------------------------|-----------------------|
| id                 | id                                            | id                    |
| **$schema**        | **schema**                                    | **schema**            |
| meta               | meta                                          | metadata              |
| type               | type                                          | type                  |
| namespace          | namespace                                     | namespace             |
| title              | title                                         | title                 |
| title_url          | title_url                                     | title_url             |
| comment            | comment                                       | comment               |
| **timestamp**      | **timestamp**                                 | **timestamp**         |
| user               | user                                          | user                  |
| bot                | bot                                           | bot                   |
| minor              | minor                                         | minor                 |
| **length.old**     | **length.old**                                | **length_old**        |
| **length.new**     | **length.new**                                | **length_new**        |
| **-**              | **integer(length.old) - integer(length.new)** | **length_diff**       |
| server_url         | server_url                                    | server_url            |
| server_name        | server_name                                   | server_name           |
| wiki               | wiki                                          | wiki                  |
| **-**              | **UTC_TIMESTAMP()**                           | **received_at**       |

Команда для её создания:

```shell
docker compose exec manticore mysql -e "
CREATE MATERIALIZED VIEW wiki_mva
TO wiki_results AS
SELECT
  id,
  \`schema\`,
  meta AS metadata,
  type,
  namespace,
  title,
  title_url,
  comment,
  \`timestamp\`,
  user,
  bot,
  minor,
  length.old as length_old,
  length.new as length_new,
  integer(length.old) - integer(length.new) as length_diff,
  server_url,
  server_name,
  wiki,
  UTC_TIMESTAMP() as received_at
FROM wiki_source"
```

По сути, это стандартный запрос `SELECT`, знакомый тем, кто работает с MySQL или аналогичными базами данных:

- Поля с совпадающими именами в источнике (`SOURCE`) и целевой таблице оставляются без изменений (`id`, `schema`, `type` и т.д.).
- Поля, требующие преобразования (например, `meta` в `metadata`), указываются с помощью `AS` в формате `original_name AS new_name`.
- Зарезервированные слова, такие как `schema` и `timestamp`, заключаются в обратные кавычки (`).
- Вложенные значения JSON доступны через точку и `AS` (например, `length.new` как `length_new`).
- Manticore поддерживает широкий набор функций для обработки данных, от вычислений до форматирования.
- При необходимости можно добавить фильтрацию и группировку. Мы опустили это, чтобы пример был простым, но вы могли бы добавить `WHERE MATCH(@title, 'pizza')` после `FROM wiki_source`.

## Полная конфигурация Docker Compose

Теперь, когда мы понимаем все компоненты и их взаимодействие, подытожим, посмотрев на полный файл `docker-compose.yml`. Этот единственный файл определяет всю нашу среду с тремя сервисами (Kafka, Manticore и Kafkacat) и конфигурацией сети.

Вы можете либо скопировать приведённое ниже содержимое, либо скачать [готовый docker-compose.yml](https://github.com/manticoresoftware/kafka-demo/blob/main/docker-compose.yml) напрямую из нашего репозитория на GitHub:

```yaml
services:
  kafka:
    image: docker.io/bitnami/kafka:3.7
    container_name: kafka
    networks:
      - app-network
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  manticore:
    image: manticoresearch/manticore:7.4.6
    container_name: manticore
    networks:
      - app-network
  kafkacat:
    profiles:
      - manual
    image: edenhill/kcat:1.7.1
    container_name: kcat
    tty: true
    entrypoint:
      - '/bin/sh'
      - '-c'
      - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
    networks:
      - app-network
networks:
  app-network:
    driver: bridge
```

## Запуск настройки

С настроенной средой проверим, как данные проходят через систему. После сохранения или загрузки файла `docker-compose.yml` в каталог проекта и запуска сервисов, как описано ранее, вы можете наблюдать процесс загрузки данных, выполняя SQL‑запросы к Manticore:

```sql
docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
```
```
+----------+
| count(*) |
+----------+
|     1200 |
+----------+
```

Подождите несколько секунд и запустите запрос снова — вы получите обновлённое значение:

```sql
docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
```

```
+----------+
| count(*) |
+----------+
|     1400 |
+----------+
```

Простой запрос для просмотра данных:

```sql
docker compose exec manticore mysql -e "SELECT title, user, timestamp FROM wiki_results LIMIT 5"
```
```
+-----------------------------+------------------+------------+
| title                       | user             | timestamp  |
+-----------------------------+------------------+------------+
| Bobbi Gibb                  | BOT-Superzerocool| 1737470127 |
| Angela Alsobrooks           | Tomrtn           | 1737470214 |
| Category:Goldschmidt Stolper| MB-one           | 1737470211 |
| Oklahoma Sooners            | JohnDoe          | 1737470220 |
| File:Kluse - Phoenix.jpg    | WikiBot          | 1737470230 |
+-----------------------------+------------------+------------+
```

Более сложный запрос с группировкой:

```sql
docker compose exec manticore mysql -e "SELECT 
    namespace, 
    COUNT(*) as count, 
    AVG(length_diff) as avg_length_change, 
    MAX(timestamp) as latest_edit, 
    title as sample_title 
FROM wiki_results 
WHERE MATCH('wiki') 
    GROUP BY namespace 
HAVING count > 5 
ORDER BY count DESC 
LIMIT 10 
OPTION ranker=sph04"
```
```
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| namespace | count | avg_length_change   | latest_edit | sample_title                                                        |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
|        14 |   998 |  116196508.99599199 |  1740684056 | Category:Wiki For Minorities in the Middle East 2025                |
|         0 |   634 | 3075575718.85488939 |  1740684057 | Oklahoma Sooners men's basketball                                   |
|         6 |   313 |   2758109067.434505 |  1740684056 | File:Kluse - Phoenix dactylifera 03 ies.jpg                         |
|         2 |    40 |   1825360728.625000 |  1740684053 | User:SD2125!                                                        |
|         4 |    21 | 3272355882.52380943 |  1740684051 | Commons:Wiki For Minorities in the Middle East                      |
|         3 |    16 |   3489659770.625000 |  1740684054 | Brugerdiskussion:Silas Nicolaisen                                   |
|         1 |    13 |   3634202801.230769 |  1740684045 | Diskussion:Nordische Skiweltmeisterschaften 2025                    |
|      1198 |    10 |   1288490146.500000 |  1740684053 | Translations:Commons:Wiki Loves Folklore 2025/Page display title/id |
|        10 |     8 |   3221223681.500000 |  1740684055 | Predefinição:Mana (série)                                           |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
```

## Изменение схемы источника данных

Если необходимо изменить схему источника данных (например, добавить новые поля, удалить ненужные или изменить типы данных), выполните следующие шаги:

1. **Приостановить материализованное представление**  
   Сначала приостановите материализованное представление, чтобы остановить поток данных из Kafka в таблицу `wiki_results`:
   ```sql
   docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=1"
   ```
2. **Удалить существующий источник**  
   Удалите текущий источник данных:
   ```sql
   docker compose exec manticore mysql -e "DROP SOURCE wiki_source"
   ```
3. **Создать новый источник с обновлённой схемой**  
   Создайте новый источник с изменённой схемой. Например, чтобы добавить поле `domain` из JSON‑объекта `meta`, поле `parsedcomment` и изменить тип `namespace` на `bigint`:
   ```sql
   docker compose exec manticore mysql -e "CREATE SOURCE wiki_source (
     id bigint,
     schema '$schema' text,
     meta json,
     parsedcomment text,
     type text,
     namespace bigint,
     title text,
     title_url text,
     comment text,
     \`timestamp\` timestamp,
     user text,
     bot bool,
     minor bool,
     length json,
     server_url text,
     server_name text,
     wiki text
   )
   type='kafka'
   broker_list='kafka:9092'
   topic_list='wikimedia'
   consumer_group='ms_wikimedia'
   num_consumers='1'
   batch=200"
   ```
4. **Обновить таблицу** (добавить столбцы `domain` и `parsedcomment`):
   ```sql
   docker compose exec manticore mysql -e "ALTER TABLE wiki_results ADD COLUMN domain text;
   ALTER TABLE wiki_results ADD COLUMN parsedcomment text"
   ```
5. **Удалить материализованное представление**:
   ```sql
   docker compose exec manticore mysql -e "DROP MV wiki_mva"
   ```
6. **Воссоздать материализованное представление**:
   ```sql
   docker compose exec manticore mysql -e "CREATE MATERIALIZED VIEW wiki_mva
   TO wiki_results AS
   SELECT
     id,
     \`schema\`,
     meta AS metadata,
     meta.domain as domain,
     parsedcomment,
     type,
     namespace,
     title,
     title_url,
     comment,
     \`timestamp\`,
     user,
     bot,
     minor,
     length.old as length_old,
     length.new as length_new,
     integer(length.old) - integer(length.new) as length_diff,
     server_url,
     server_name,
     wiki,
     UTC_TIMESTAMP() as received_at
   FROM wiki_source"
   ```

*Если вы только воссоздали `SOURCE`, не изменяя `MV`, возобновите чтение данных с помощью:*

```sql
docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=0"
```

В противном случае материализованное представление уже должно быть возобновлено.

Этот процесс даёт вам полный контроль над изменениями схемы и позволяет гибко адаптироваться к новым требованиям.

### Заключение

Интеграция Kafka с Manticore Search предлагает мощное решение для обработки данных в реальном времени и аналитики. Следуя этому руководству, вы создали надёжную среду с использованием Docker Compose, настроили Kafka для обработки потоков сообщений и использовали Manticore Search для индексации и запросов данных. Эта интеграция не только улучшает функциональность вашего приложения, но и упрощает управление данными и их анализ.
Независимо от того, работаете ли вы над анализом журналов, индексацией контента или любой другой приложением, основанным на данных, эта конфигурация предоставляет масштабируемую и эффективную основу. Гибкость Manticore Search позволяет адаптировать конвейер обработки данных под ваши конкретные потребности, обеспечивая быструю адаптацию к меняющимся требованиям.
Мы призываем вас экспериментировать с этой конфигурацией, изучать дополнительные возможности Manticore Search и адаптировать пример под ваши уникальные сценарии использования. Полный код доступен на [GitHub](https://github.com/manticoresoftware/kafka-demo), а сообщество Manticore всегда готово помочь с любыми вопросами или проблемами, с которыми вы можете столкнуться. Погрузитесь в процесс и раскройте весь потенциал обработки данных в реальном времени с Kafka и Manticore Search!
