Введение
Kafka — это популярный брокер сообщений, который используется в самых разных проектах: от обработки логов и управления очередями задач до персонализации контента и аналитики в реальном времени. Например, с его помощью можно индексировать изменения в Wikipedia или искать товары в интернет-магазинах. Manticore Search, в свою очередь, поддерживает интеграцию с Kafka, что позволяет автоматически импортировать данные и применять их для полнотекстового поиска, аналитики, векторного поиска и других задач.
При импорте данных в Manticore можно гибко их обрабатывать:
- Удалять ненужные поля, добавлять новые или изменять существующие;
- Рассчитывать расстояние между геопозициями;
- Фильтровать данные перед сохранением;
- Создавать сниппеты с помощью полнотекстового поиска.
В этой статье мы разберём, как построить небольшое приложение, которое получает данные из Kafka и обрабатывает их в Manticore Search. Для настройки окружения будем использовать Docker Compose. Руководство подойдёт как новичкам, так и опытным разработчикам. Готовый код и демо доступны на GitHub .
Подготовка окружения
Давайте начнем с настройки нашего окружения разработки. Мы будем использовать Docker Compose для создания всей инфраструктуры, включающей Kafka, Manticore Search и сервис Kafkacat для потоковой передачи данных. Сначала рассмотрим конфигурацию каждого сервиса отдельно, а затем предоставим полный файл docker-compose.yml
.
Чтобы сэкономить время, вы можете скачать готовый файл docker-compose.yml из нашего репозитория на GitHub и сразу перейти к разделу Запуск окружения , если хотите начать быстрее.
Настройка Kafka
Начнём с развёртывания Kafka. Мы используем упрощённую конфигурацию с протоколом KRaft (Kafka Raft), который заменяет ZooKeeper для упрощения архитектуры. Вот часть файла docker-compose.yml
, относящаяся к сервису Kafka:
kafka:
image: docker.io/bitnami/kafka:3.7
container_name: kafka
networks:
- app-network
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
Настройка Manticore
Теперь, когда Kafka настроен для обработки потока сообщений, нам нужен движок поиска и аналитики для обработки данных. Настроим Manticore Search с минимальной, но функциональной конфигурацией:
manticore:
image: manticoresearch/manticore:7.4.6
container_name: manticore
networks:
- app-network
Запуск окружения
Запускаем базовые контейнеры (Kafka и Manticore) с помощью команды:
docker compose up -d
Это запустит сервисы Kafka и Manticore, но пока не запустит Kafkacat (так как он использует профиль manual
). После запуска сервисов создаём топик в Kafka. Количество партиций (в нашем случае 4) задаём для параллельного чтения данных несколькими потребителями, что повышает производительность:
docker compose exec kafka kafka-topics.sh \
--create \
--topic wikimedia \
--partitions 4 \
--bootstrap-server localhost:9092
Подготовка источников данных
Теперь, когда инфраструктура запущена и топик готов принимать сообщения, настроим поток данных, который будет отправлять контент в Kafka в реальном времени. Для отправки данных в Kafka используем поток Wikimedia Stream. Настроим Kafkacat с профилем manual
, чтобы запустить его вручную после настройки топика и Manticore:
kafkacat:
profiles:
- manual
image: edenhill/kcat:1.7.1
container_name: kcat
tty: true
entrypoint:
- '/bin/sh'
- '-c'
- "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
networks:
- app-network
После настройки сервиса Kafkacat с профилем manual
, вы можете запустить его для начала передачи данных в Kafka:
docker compose --profile manual up -d
Пример получаемых данных
Как только поток Wikimedia начнёт поступать в Kafka, вы начнёте получать сообщения в формате JSON. Давайте рассмотрим типичное сообщение, чтобы понять структуру данных, с которой мы будем работать:
{
"$schema": "/mediawiki/recentchange/1.0.0",
"meta": {
"uri": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
"request_id": "66d1686b-500e-438c-8985-8c7a28295ae8",
"id": "345ce42e-3cac-46b7-901e-2c3161f53436",
"dt": "2024-12-10T16:30:32Z",
"domain": "es.wikipedia.org",
"stream": "mediawiki.recentchange",
"topic": "codfw.mediawiki.recentchange",
"partition": 0,
"offset": 1313008266
},
"id": 323817817,
"type": "edit",
"namespace": 2,
"title": "Usuario:Davicilio/Taller",
"title_url": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
"comment": "/* Uniforme titular */",
"timestamp": 1733848232,
"user": "Davicilio",
"bot": false,
"notify_url": "https://es.wikipedia.org/w/index.php?diff=164049521&oldid=164010074",
"minor": false,
"length": {
"old": 29666,
"new": 29691
},
"revision": {
"old": 164010074,
"new": 164049521
},
"server_url": "https://es.wikipedia.org",
"server_name": "es.wikipedia.org",
"wiki": "eswiki",
"parsedcomment": "<span class=\"autocomment\"><a href=\"/wiki/Usuario:Davicilio/Taller#Uniforme_titular\">→<bdi>Uniforme titular</bdi></a></span>"
}
Работа с данными в Manticore
Теперь, когда Kafka получает данные из потока Wikimedia, настроим Manticore Search для обработки и индексации этих данных для поиска и анализа.
Создание источника данных
Создаём источник (SOURCE
), который будет читать данные из Kafka. Указываем только те поля, которые нас интересуют — остальные будут проигнорированы. Если поле есть в схеме, но отсутствует в сообщении, оно получит значение NULL
или останется пустым в зависимости от типа данных:
docker compose exec manticore mysql -e "
CREATE SOURCE wiki_source (
id bigint,
schema '\$schema' text,
meta json,
type text,
namespace int,
title text,
title_url text,
comment text,
\`timestamp\` timestamp,
user text,
bot bool,
minor bool,
length json,
server_url text,
server_name text,
wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"
Пояснения:
CREATE SOURCE
- команда создания источника данных (source)(id bigint, schema '$schema' text, …)
- перечисление полей входящего сообщения в соответствии с типами данных, поддерживаемыми Manticore ( список типов данных ).- Поле
$schema
— в Manticore нельзя использовать специальные символы в именах полей, поэтому применяется первичный маппинг:new_name 'original_name' type
new_name
— совместимое с Manticore имя поля.original_name
— исходный ключ входящего JSON, который может включать специальные символы. Если требуется использовать апостроф в имени, его следует экранировать\'
.
- Поле
type = kafka
- указывает, что источником данных является Kafka.broker_list='kafka:9092'
— список брокеров сообщений, разделённых запятой.topic_list='wikimedia'
— список топиков для чтения, также разделённых запятыми.consumer_group='ms_wikimedia'
— группа потребителей (consumer group).num_consumers='1'
— количество процессов, обрабатывающих сообщения. Обычно равно количеству партиций топика или кратно ему.batch=200
— размер пакета сообщений для обработки, который влияет на скорость обработки и настраивается индивидуально.
Создание таблицы результатов
Мы создали источник данных для чтения из Kafka, но нам всё ещё нужно место назначения для этих данных. Создадим таблицу, в которую будут попадать обработанные сообщения:
Одним из ключевых полей является ID
сообщения. В процессе передачи данных может произойти что угодно: сбой сети, падение брокера Kafka или самого Manticore. В таких случаях возможно получение дублирующихся сообщений. Чтобы избежать дубликатов, используется уникальный ID
: если запись уже существует в таблице, она просто пропускается.
Помимо ID
, в таблице будут присутствовать следующие поля: type
, title
, title_url
, comment
, timestamp
, user
, bot
, minor
, length
, server_url
, server_name
, wiki
и meta
.
docker compose exec manticore mysql -e "create table wiki_results (
id bigint,
\`schema\` text,
metadata json,
type text,
namespace int,
title text,
title_url text,
comment text,
\`timestamp\` timestamp,
user string,
bot bool,
minor bool,
length_old int,
length_new int,
length_diff int,
server_url text,
server_name text,
wiki text,
received_at timestamp
)"
Поле length
мы разделили на length_old
и length_new
для демонстрации возможностей маппинга.
Создание материализованного представления
Теперь у нас есть источник (Kafka) и место назначения (таблица). Свяжем их и определим, как данные будут перетекать из одного в другое. Материализованное представление связывает источник и таблицу, выполняя преобразования данных в реальном времени — это аналог ETL-процесса, встроенный в Manticore. Вот как это выглядит:
Входной ключ JSON | Ключ источника / Функция | Целевое поле |
---|---|---|
id | id | id |
$schema | schema | schema |
meta | meta | metadata |
type | type | type |
namespace | namespace | namespace |
title | title | title |
title_url | title_url | title_url |
comment | comment | comment |
timestamp | timestamp | timestamp |
user | user | user |
bot | bot | bot |
minor | minor | minor |
length.old | length.old | length_old |
length.new | length.new | length_new |
- | integer(length.old) - integer(length.new) | length_diff |
server_url | server_url | server_url |
server_name | server_name | server_name |
wiki | wiki | wiki |
- | UTC_TIMESTAMP() | received_at |
Команда для создания:
docker compose exec manticore mysql -e "
CREATE MATERIALIZED VIEW wiki_mva
TO wiki_results AS
SELECT
id,
\`schema\`,
meta AS metadata,
type,
namespace,
title,
title_url,
comment,
\`timestamp\`,
user,
bot,
minor,
length.old as length_old,
length.new as length_new,
integer(length.old) - integer(length.new) as length_diff,
server_url,
server_name,
wiki,
UTC_TIMESTAMP() as received_at
FROM wiki_source"
По сути, это обычный запрос SELECT
, знакомый тем, кто работает с MySQL или подобными базами данных:
- Поля, названия которых совпадают в источнике (
SOURCE
) и итоговой таблице, оставляем как есть (id
,schema
,type
и т.д.). - Поля, которые нужно преобразовать (например,
meta
вmetadata
), указываем через операторAS
в форматеисходное_имя AS новое_имя
. - Для зарезервированных слов вроде
schema
иtimestamp
используем обратные кавычки (`). - Дочерние значения JSON выбираем с помощью точки и
AS
(например,length.new
aslength_new
). - Manticore позволяет применять широкий набор функций для обработки данных — от вычислений до форматирования.
- При необходимости можно добавить фильтрацию и группировку. Мы этого не сделали, чтобы не усложнять пример, но, например, после
FROM wiki_source
можно вставитьWHERE MATCH(@title, 'pizza')
.
Полная конфигурация Docker Compose
Теперь, когда мы разобрались со всеми компонентами и их взаимодействием, давайте подведём итог, рассмотрев полный файл docker-compose.yml
. Этот единый файл определяет всё наше окружение с тремя сервисами (Kafka, Manticore и Kafkacat) и конфигурацией сети.
Вы можете скопировать следующее содержимое или скачать готовый файл docker-compose.yml напрямую из нашего репозитория на GitHub:
services:
kafka:
image: docker.io/bitnami/kafka:3.7
container_name: kafka
networks:
- app-network
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
manticore:
image: manticoresearch/manticore:7.4.6
container_name: manticore
networks:
- app-network
kafkacat:
profiles:
- manual
image: edenhill/kcat:1.7.1
container_name: kcat
tty: true
entrypoint:
- '/bin/sh'
- '-c'
- "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
networks:
- app-network
networks:
app-network:
driver: bridge
Запуск
Теперь, когда окружение настроено, давайте проверим, как данные проходят через систему. После сохранения или скачивания файла docker-compose.yml
в вашу рабочую директорию и запуска сервисов, как описано выше, вы можете отслеживать поступление данных, выполняя SQL-запросы к Manticore:
docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
| 1200 |
+----------+
Подождите несколько секунд и выполните запрос снова — вы увидите обновлённое значение:
docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
| 1400 |
+----------+
Пример простого запроса для просмотра данных:
docker compose exec manticore mysql -e "SELECT title, user, timestamp FROM wiki_results LIMIT 5"
+-----------------------------+------------------+------------+
| title | user | timestamp |
+-----------------------------+------------------+------------+
| Bobbi Gibb | BOT-Superzerocool| 1737470127 |
| Angela Alsobrooks | Tomrtn | 1737470214 |
| Category:Goldschmidt Stolper| MB-one | 1737470211 |
| Oklahoma Sooners | JohnDoe | 1737470220 |
| File:Kluse - Phoenix.jpg | WikiBot | 1737470230 |
+-----------------------------+------------------+------------+
Сложный запрос с группировкой:
docker compose exec manticore mysql -e "SELECT
namespace,
COUNT(*) as count,
AVG(length_diff) as avg_length_change,
MAX(timestamp) as latest_edit,
title as sample_title
FROM wiki_results
WHERE MATCH('wiki')
GROUP BY namespace
HAVING count > 5
ORDER BY count DESC
LIMIT 10
OPTION ranker=sph04"
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| namespace | count | avg_length_change | latest_edit | sample_title |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| 14 | 998 | 116196508.99599199 | 1740684056 | Category:Wiki For Minorities in the Middle East 2025 |
| 0 | 634 | 3075575718.85488939 | 1740684057 | Oklahoma Sooners men's basketball |
| 6 | 313 | 2758109067.434505 | 1740684056 | File:Kluse - Phoenix dactylifera 03 ies.jpg |
| 2 | 40 | 1825360728.625000 | 1740684053 | User:SD2125! |
| 4 | 21 | 3272355882.52380943 | 1740684051 | Commons:Wiki For Minorities in the Middle East |
| 3 | 16 | 3489659770.625000 | 1740684054 | Brugerdiskussion:Silas Nicolaisen |
| 1 | 13 | 3634202801.230769 | 1740684045 | Diskussion:Nordische Skiweltmeisterschaften 2025 |
| 1198 | 10 | 1288490146.500000 | 1740684053 | Translations:Commons:Wiki Loves Folklore 2025/Page display title/id |
| 10 | 8 | 3221223681.500000 | 1740684055 | Predefinição:Mana (série) |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
Изменение схемы источника данных
Если вам нужно изменить схему источника данных (например, добавить новые поля, удалить ненужные или изменить типы данных), это можно сделать следующим образом:
- Приостановка материализованного представления
Сначала нужно приостановить материализованное представление, чтобы остановить поток данных из Kafka в таблицуwiki_results
:docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=1"
- Удаление существующего источника
Удалите текущий источник данных:docker compose exec manticore mysql -e "DROP SOURCE wiki_source"
- Создание нового источника с обновленной схемой
Создаём новый источник с изменённой схемой. Например, если вы хотите добавить полеdomain
из JSON-объектаmeta
, полеparsedcomment
и изменить тип данных дляnamespace
наbigint
, команда будет выглядеть так:docker compose exec manticore mysql -e "CREATE SOURCE wiki_source ( id bigint, schema '$schema' text, meta json, parsedcomment text, type text, namespace bigint, title text, title_url text, comment text, \`timestamp\` timestamp, user text, bot bool, minor bool, length json, server_url text, server_name text, wiki text ) type='kafka' broker_list='kafka:9092' topic_list='wikimedia' consumer_group='ms_wikimedia' num_consumers='1' batch=200"
- Обновление таблицы (добавляем
domain
иparsedcomment
):docker compose exec manticore mysql -e "ALTER TABLE wiki_results ADD COLUMN domain text; ALTER TABLE wiki_results ADD COLUMN parsedcomment text"
- Удаление материализованного представления:
docker compose exec manticore mysql -e "DROP MV wiki_mva"
- Обновление представления:
docker compose exec manticore mysql -e "CREATE MATERIALIZED VIEW wiki_mva TO wiki_results AS SELECT id, \`schema\`, meta AS metadata, meta.domain as domain, parsedcomment, type, namespace, title, title_url, comment, \`timestamp\`, user, bot, minor, length.old as length_old, length.new as length_new, integer(length.old) - integer(length.new) as length_diff, server_url, server_name, wiki, UTC_TIMESTAMP() as received_at FROM wiki_source"
Если вы только пересоздали SOURCE
и не меняли MV
, вам требуется запустить чтение данных заново:
docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=0"
В противном случае материализованное представление уже будет возобновлено.
Таким образом, процесс изменения схемы полностью контролируем и позволяет гибко адаптировать систему под новые требования.
Заключение
Интеграция Kafka с Manticore Search предлагает мощное решение для обработки и анализа данных в реальном времени. Следуя этому руководству, вы настроили надёжное окружение с помощью Docker Compose, настроили Kafka для обработки потока сообщений и использовали Manticore Search для индексации и выполнения запросов к данным. Эта интеграция не только расширяет функциональность ваших приложений, но и упрощает управление данными и их анализ.
Независимо от того, работаете ли вы над анализом логов, индексацией контента или любым другим приложением, зависящим от данных, эта настройка предоставляет масштабируемую и эффективную основу. Гибкость Manticore Search позволяет адаптировать конвейер обработки данных под ваши конкретные потребности, гарантируя быструю адаптацию к изменяющимся требованиям.
Мы рекомендуем вам поэкспериментировать с этой настройкой, изучить дополнительные возможности Manticore Search и адаптировать пример под ваши уникальные задачи. Полный код доступен на GitHub , а сообщество Manticore всегда готово помочь с любыми вопросами или трудностями, с которыми вы можете столкнуться. Погрузитесь в работу и раскройте весь потенциал обработки данных в реальном времени с Kafka и Manticore Search!