Введение
Kafka — это популярный брокер сообщений, используемый в широком круге проектов: от обработки логов и управления очередями задач до персонализации контента и аналитики в реальном времени. Например, его можно использовать для индексации изменений в Википедии или поиска товаров в интернет-магазинах. Manticore Search, в свою очередь, поддерживает интеграцию с Kafka, что позволяет автоматически импортировать данные и использовать их для полнотекстового поиска, аналитики, векторного поиска и многого другого.
При импорте данных в Manticore вы можете гибко обрабатывать их:
- Удалять ненужные поля, добавлять новые или изменять существующие;
- Вычислять расстояния между геолокациями;
- Фильтровать данные перед сохранением;
- Генерировать фрагменты с использованием полнотекстового поиска.
В этой статье мы пройдем через создание небольшого приложения, которое извлекает данные из Kafka и обрабатывает их в Manticore Search. Мы будем использовать Docker Compose для настройки окружения. Это руководство подходит как для начинающих, так и для опытных разработчиков. Полный код и демонстрация доступны на GitHub .
Настройка окружения
Давайте начнем с настройки нашей среды разработки. Мы будем использовать Docker Compose для настройки всего нашего окружения, которое будет включать Kafka, Manticore Search и сервис Kafkacat для потоковой передачи данных. Сначала мы рассмотрим конфигурацию каждого сервиса отдельно, а затем предоставим полный файл docker-compose.yml.
Чтобы сэкономить время, вы можете скачать полный файл docker-compose.yml из нашего репозитория на GitHub и перейти к разделу Запуск настройки , если вы предпочитаете быстро начать.
Настройка Kafka
Давайте начнем с настройки Kafka. Мы будем использовать упрощенную конфигурацию с протоколом KRaft (Kafka Raft), который заменяет ZooKeeper для упрощения архитектуры. Вот часть сервиса Kafka из нашего файла docker-compose.yml:
kafka:
image: docker.io/bitnami/kafka:3.7
container_name: kafka
networks:
- app-network
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
Настройка Manticore
С настроенной Kafka для обработки нашей потоковой передачи сообщений, нам теперь нужен движок поиска и аналитики для обработки данных. Давайте настроим Manticore Search, используя минимальную, но функциональную конфигурацию:
manticore:
image: manticoresearch/manticore:7.4.6
container_name: manticore
networks:
- app-network
Запуск окружения
Запустите базовые контейнеры (Kafka и Manticore) с помощью следующей команды:
docker compose up -d
Это запускает сервисы Kafka и Manticore, но пока не сервис Kafkacat (так как он использует ручной профиль). После запуска сервисов создайте тему в Kafka. Мы установим количество разделов на 4 для параллельного чтения данных несколькими потребителями, что улучшает производительность:
docker compose exec kafka kafka-topics.sh \
--create \
--topic wikimedia \
--partitions 4 \
--bootstrap-server localhost:9092
Подготовка источников данных
Теперь, когда наша инфраструктура запущена и тема готова к получению сообщений, давайте настроим поток данных, который будет передавать контент в реальном времени в Kafka. Для отправки данных в Kafka мы будем использовать поток Wikimedia. Вот конфигурация сервиса Kafkacat с manual профилем (чтобы мы могли запустить его вручную после настройки темы и Manticore):
kafkacat:
profiles:
- manual
image: edenhill/kcat:1.7.1
container_name: kcat
tty: true
entrypoint:
- '/bin/sh'
- '-c'
- "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
networks:
- app-network
После настройки сервиса Kafkacat с ручным профилем вы можете запустить его, чтобы начать потоковую передачу данных в Kafka:
docker compose --profile manual up -d
Пример полученных данных
Как только поток Wikimedia начнет поступать в Kafka, вы начнете получать сообщения в формате JSON. Давайте рассмотрим типичное сообщение, чтобы понять структуру данных, с которой мы будем работать:
{
"$schema": "/mediawiki/recentchange/1.0.0",
"meta": {
"uri": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
"request_id": "66d1686b-500e-438c-8985-8c7a28295ae8",
"id": "345ce42e-3cac-46b7-901e-2c3161f53436",
"dt": "2024-12-10T16:30:32Z",
"domain": "es.wikipedia.org",
"stream": "mediawiki.recentchange",
"topic": "codfw.mediawiki.recentchange",
"partition": 0,
"offset": 1313008266
},
"id": 323817817,
"type": "edit",
"namespace": 2,
"title": "Usuario:Davicilio/Taller",
"title_url": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
"comment": "/* Uniforme titular */",
"timestamp": 1733848232,
"user": "Davicilio",
"bot": false,
"notify_url": "https://es.wikipedia.org/w/index.php?diff=164049521&oldid=164010074",
"minor": false,
"length": {
"old": 29666,
"new": 29691
},
"revision": {
"old": 164010074,
"new": 164049521
},
"server_url": "https://es.wikipedia.org",
"server_name": "es.wikipedia.org",
"wiki": "eswiki",
"parsedcomment": "<span class=\"autocomment\"><a href=\"/wiki/Usuario:Davicilio/Taller#Uniforme_titular\">→<bdi>Uniforme titular</bdi></a></span>"
}
Работа с данными в Manticore
Теперь, когда у нас есть Kafka, получающая данные из потока Wikimedia, давайте настроим Manticore Search для обработки и индексации этих данных для поиска и анализа.
Создание источника данных
Давайте создадим SOURCE, который будет читать данные из Kafka. Мы укажем только те поля, которые нас интересуют — остальные будут игнорироваться. Если поле есть в схеме, но отсутствует в сообщении, оно будет установлено в NULL или оставлено пустым, в зависимости от типа данных:
docker compose exec manticore mysql -e "
CREATE SOURCE wiki_source (
id bigint,
schema '\$schema' text,
meta json,
type text,
namespace int,
title text,
title_url text,
comment text,
\`timestamp\` timestamp,
user text,
bot bool,
minor bool,
length json,
server_url text,
server_name text,
wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"
Объяснения:
CREATE SOURCE- Команда для создания источника данных.(id bigint, schema '$schema' text, …)- Список полей из входящего сообщения, сопоставленных с поддерживаемыми типами данных Manticore ( список типов данных ).- Поле
$schema- Manticore не позволяет использовать специальные символы в именах полей, поэтому мы используем первичное сопоставление:new_name 'original_name' typenew_name— имя поля, совместимое с Manticore.original_name— оригинальный ключ JSON, который может содержать специальные символы. Используйте\'для экранирования апострофов, если это необходимо.
- Поле
type=kafka- Указывает Kafka в качестве источника данных.broker_list='kafka:9092'— Список брокеров сообщений, разделенных запятыми.topic_list='wikimedia'— Список тем для чтения, разделенных запятыми.consumer_group='ms_wikimedia'— Имя группы потребителей.num_consumers='1'— Количество процессов, обрабатывающих сообщения (обычно соответствует или является кратным количеству разделов темы).batch=200— Размер пакета для обработки сообщений, влияющий на производительность и настраиваемый индивидуально.
Создание таблицы результатов
Мы создали источник данных для чтения из Kafka, но нам все еще нужно место назначения для этих данных. Давайте создадим таблицу для хранения обработанных сообщений:
Ключевым полем является ID сообщения. Во время передачи данных проблемы, такие как сбои сети, сбои брокера Kafka или простои Manticore, могут привести к дублированию сообщений. Чтобы избежать дубликатов, мы используем уникальный ID: если запись уже существует в таблице, она пропускается.
В дополнение к ID, таблица будет включать такие поля, как type, title, title_url, comment, timestamp, user, bot, minor, length, server_url, server_name, wiki и meta.
docker compose exec manticore mysql -e "create table wiki_results (
id bigint,
\`schema\` text,
metadata json,
type text,
namespace int,
title text,
title_url text,
comment text,
\`timestamp\` timestamp,
user string,
bot bool,
minor bool,
length_old int,
length_new int,
length_diff int,
server_url text,
server_name text,
wiki text,
received_at timestamp
)"
We've split the length field into length_old and length_new to demonstrate mapping capabilities.
Создание материализованного представления
With both our source (Kafka) and destination (the table) in place, we now need to connect them and define how data should flow between them. This is where a materialized view comes in — it acts as a real-time ETL process that transforms data as it moves from Kafka to our table:
| Input JSON key | Source key / Function | Destination field |
|---|---|---|
| id | id | id |
| $schema | schema | schema |
| meta | meta | metadata |
| type | type | type |
| namespace | namespace | namespace |
| title | title | title |
| title_url | title_url | title_url |
| comment | comment | comment |
| timestamp | timestamp | timestamp |
| user | user | user |
| bot | bot | bot |
| minor | minor | minor |
| length.old | length.old | length_old |
| length.new | length.new | length_new |
| - | integer(length.old) - integer(length.new) | length_diff |
| server_url | server_url | server_url |
| server_name | server_name | server_name |
| wiki | wiki | wiki |
| - | UTC_TIMESTAMP() | received_at |
Command to create it:
docker compose exec manticore mysql -e "
CREATE MATERIALIZED VIEW wiki_mva
TO wiki_results AS
SELECT
id,
\`schema\`,
meta AS metadata,
type,
namespace,
title,
title_url,
comment,
\`timestamp\`,
user,
bot,
minor,
length.old as length_old,
length.new as length_new,
integer(length.old) - integer(length.new) as length_diff,
server_url,
server_name,
wiki,
UTC_TIMESTAMP() as received_at
FROM wiki_source"
Essentially, this is a standard SELECT query, familiar to those who work with MySQL or similar databases:
- Fields with matching names in the source (
SOURCE) and target table are left as-is (id,schema,type, etc.). - Fields needing transformation (e.g.,
metatometadata) are specified withASin the formatoriginal_name AS new_name. - Reserved words like
schemaandtimestampare enclosed in backticks (`). - Nested JSON values are accessed with a dot and
AS(e.g.,length.newaslength_new). - Manticore supports a wide range of functions for data processing, from calculations to formatting.
- Filtering and grouping can be added if needed. We skipped this to keep the example simple, but you could add
WHERE MATCH(@title, 'pizza')afterFROM wiki_source.
Полная конфигурация Docker Compose
Now that we understand all the components and how they interact, let's recap by looking at the complete docker-compose.yml file. This single file defines our entire environment with all three services (Kafka, Manticore, and Kafkacat) plus the network configuration.
You can either copy the following content or download the готовый docker-compose.yml directly from our GitHub repository:
services:
kafka:
image: docker.io/bitnami/kafka:3.7
container_name: kafka
networks:
- app-network
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
manticore:
image: manticoresearch/manticore:7.4.6
container_name: manticore
networks:
- app-network
kafkacat:
profiles:
- manual
image: edenhill/kcat:1.7.1
container_name: kcat
tty: true
entrypoint:
- '/bin/sh'
- '-c'
- "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
networks:
- app-network
networks:
app-network:
driver: bridge
Запуск настройки
With our environment configured, let's check how the data flows through the system. After saving or downloading the docker-compose.yml file to your project directory and starting the services as described earlier, you can monitor the data ingestion by running SQL queries against Manticore:
docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
| 1200 |
+----------+
Wait a few seconds and run it again and you'll get an updated value:
docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
| 1400 |
+----------+
A simple query to view the data:
docker compose exec manticore mysql -e "SELECT title, user, timestamp FROM wiki_results LIMIT 5"
+-----------------------------+------------------+------------+
| title | user | timestamp |
+-----------------------------+------------------+------------+
| Bobbi Gibb | BOT-Superzerocool| 1737470127 |
| Angela Alsobrooks | Tomrtn | 1737470214 |
| Category:Goldschmidt Stolper| MB-one | 1737470211 |
| Oklahoma Sooners | JohnDoe | 1737470220 |
| File:Kluse - Phoenix.jpg | WikiBot | 1737470230 |
+-----------------------------+------------------+------------+
A more complex query with grouping:
docker compose exec manticore mysql -e "SELECT
namespace,
COUNT(*) as count,
AVG(length_diff) as avg_length_change,
MAX(timestamp) as latest_edit,
title as sample_title
FROM wiki_results
WHERE MATCH('wiki')
GROUP BY namespace
HAVING count > 5
ORDER BY count DESC
LIMIT 10
OPTION ranker=sph04"
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| namespace | count | avg_length_change | latest_edit | sample_title |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| 14 | 998 | 116196508.99599199 | 1740684056 | Category:Wiki For Minorities in the Middle East 2025 |
| 0 | 634 | 3075575718.85488939 | 1740684057 | Oklahoma Sooners men's basketball |
| 6 | 313 | 2758109067.434505 | 1740684056 | File:Kluse - Phoenix dactylifera 03 ies.jpg |
| 2 | 40 | 1825360728.625000 | 1740684053 | User:SD2125! |
| 4 | 21 | 3272355882.52380943 | 1740684051 | Commons:Wiki For Minorities in the Middle East |
| 3 | 16 | 3489659770.625000 | 1740684054 | Brugerdiskussion:Silas Nicolaisen |
| 1 | 13 | 3634202801.230769 | 1740684045 | Diskussion:Nordische Skiweltmeisterschaften 2025 |
| 1198 | 10 | 1288490146.500000 | 1740684053 | Translations:Commons:Wiki Loves Folklore 2025/Page display title/id |
| 10 | 8 | 3221223681.500000 | 1740684055 | Predefinição:Mana (série) |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
Изменение схемы источника данных
If you need to modify the data source schema (e.g., add new fields, remove unnecessary ones, or change data types), follow these steps:
- Приостановите материализованное представление
First, pause the materialized view to stop the data flow from Kafka to thewiki_resultstable:docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=1" - Удалите существующий источник
Delete the current data source:docker compose exec manticore mysql -e "DROP SOURCE wiki_source" - Создайте новый источник с обновленной схемой
Create a new source with the modified schema. For example, to add thedomainfield from themetaJSON object, fieldparsedcommentand change thenamespacetype tobigint:docker compose exec manticore mysql -e "CREATE SOURCE wiki_source ( id bigint, schema '$schema' text, meta json, parsedcomment text, type text, namespace bigint, title text, title_url text, comment text, \`timestamp\` timestamp, user text, bot bool, minor bool, length json, server_url text, server_name text, wiki text ) type='kafka' broker_list='kafka:9092' topic_list='wikimedia' consumer_group='ms_wikimedia' num_consumers='1' batch=200" - Обновите таблицу (add the
domainandparsedcommentcolumns):docker compose exec manticore mysql -e "ALTER TABLE wiki_results ADD COLUMN domain text; ALTER TABLE wiki_results ADD COLUMN parsedcomment text" - Удалите материализованное представление:
docker compose exec manticore mysql -e "DROP MV wiki_mva" - Воссоздайте материализованный вид:
docker compose exec manticore mysql -e "CREATE MATERIALIZED VIEW wiki_mva TO wiki_results AS SELECT id, \`schema\`, meta AS metadata, meta.domain as domain, parsedcomment, type, namespace, title, title_url, comment, \`timestamp\`, user, bot, minor, length.old as length_old, length.new as length_new, integer(length.old) - integer(length.new) as length_diff, server_url, server_name, wiki, UTC_TIMESTAMP() as received_at FROM wiki_source"
Если вы только что воссоздали SOURCE, не изменяя MV, продолжите чтение данных с:
docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=0"
В противном случае материализованный вид уже должен быть возобновлен.
Этот процесс дает вам полный контроль над изменениями схемы и позволяет гибко адаптироваться к новым требованиям.
Заключение
Интеграция Kafka с Manticore Search предлагает мощное решение для обработки данных в реальном времени и аналитики. Следуя этому руководству, вы настроили надежную среду с использованием Docker Compose, сконфигурировали Kafka для обработки потоковой передачи сообщений и использовали Manticore Search для индексации и запроса данных. Эта интеграция не только улучшает функциональность вашего приложения, но и упрощает управление данными и их анализ.
Независимо от того, работаете ли вы над анализом логов, индексацией контента или любым другим приложением, основанным на данных, эта настройка предоставляет масштабируемую и эффективную структуру. Гибкость Manticore Search позволяет вам адаптировать конвейер обработки данных под ваши конкретные нужды, обеспечивая быструю адаптацию к изменяющимся требованиям.
Мы призываем вас поэкспериментировать с этой настройкой, исследовать дополнительные функции Manticore Search и адаптировать пример под ваши уникальные случаи использования. Полный код доступен на
GitHub
, а сообщество Manticore всегда готово помочь с любыми вопросами или проблемами, с которыми вы можете столкнуться. Погружайтесь и раскрывайте весь потенциал обработки данных в реальном времени с Kafka и Manticore Search!
