Introduction
Kafka is a popular message broker widely used in all kinds of projects: from log processing and task queue management to content personalization and real-time analytics. For example, it can be used to index Wikipedia changes or to power product search in an online store. Manticore Search, in turn, supports integration with Kafka, enabling automatic data import for full-text search, analytics, vector search, and more.
When importing data into Manticore, you can process it flexibly:
- remove unnecessary fields, add new ones, or modify existing ones;
- calculate distances between geographic locations;
- filter data before saving it;
- generate snippets using full-text search.
In this article, we'll build a small application step by step that retrieves data from Kafka and processes it in Manticore Search. We'll use Docker Compose to set up the environment. This guide is suitable for both beginners and experienced developers. The complete code and demo are available on GitHub.
Setting Up the Environment
Let's begin by configuring our development environment. We'll use Docker Compose to set up the entire environment, which will include Kafka, Manticore Search, and a Kafkacat service for streaming data. We'll first look at each service's configuration separately and then provide the complete docker-compose.yml file.
To save time, you can download the complete docker-compose.yml file from our GitHub repository and, if you'd like to get started quickly, skip ahead to the Running the Setup section.
Configuring Kafka
Let's start with Kafka. We'll use a simplified configuration with the KRaft (Kafka Raft) protocol, which replaces ZooKeeper to simplify the architecture. Here's the Kafka service section of our docker-compose.yml file:
kafka:
  image: docker.io/bitnami/kafka:3.7
  container_name: kafka
  networks:
    - app-network
  environment:
    # KRaft settings
    - KAFKA_CFG_NODE_ID=0
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
    # Listeners
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
Configuring Manticore
With Kafka set up to handle our message stream, we now need a search and analytics engine to process the data. Let's configure Manticore Search with a minimal but fully functional configuration:
manticore:
  image: manticoresearch/manticore:7.4.6
  container_name: manticore
  networks:
    - app-network
Starting the Environment
Start the basic containers (Kafka and Manticore) with the following command:
docker compose up -d
This starts the Kafka and Manticore services, but not the Kafkacat service yet (since it uses the manual profile). Once the services are running, create a topic in Kafka. We'll set the number of partitions to 4 so that multiple consumers can read data in parallel, improving performance:
docker compose exec kafka kafka-topics.sh \
--create \
--topic wikimedia \
--partitions 4 \
--bootstrap-server localhost:9092
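If you want to make sure the topic was created with the expected number of partitions, you can optionally describe it using the standard Kafka tooling already available inside the container:
docker compose exec kafka kafka-topics.sh \
--describe \
--topic wikimedia \
--bootstrap-server localhost:9092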
Preparing the Data Source
Now that the infrastructure is in place and a topic is ready to receive messages, let's set up a data stream that feeds real-time content into Kafka. To send data to Kafka we'll use the Wikimedia Stream. Here's the configuration of the Kafkacat service, which uses the manual profile (so it can be started by hand once the topic and Manticore are set up):
kafkacat:
  profiles:
    - manual
  image: edenhill/kcat:1.7.1
  container_name: kcat
  tty: true
  entrypoint:
    - '/bin/sh'
    - '-c'
    - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
  networks:
    - app-network
With the Kafkacat service configured under the manual profile, you can start it to begin streaming data into Kafka:
docker compose --profile manual up -d
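To verify that messages are actually reaching the topic, you can optionally read a single message with the console consumer that ships with the Kafka image (just a sanity check, not required for the setup):
docker compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic wikimedia \
--from-beginning \
--max-messages 1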
Example of Received Data
Once the Wikimedia stream starts flowing into Kafka, you'll begin receiving messages in JSON format. Let's examine a typical message to understand the data structure we'll be working with:
{
"$schema": "/mediawiki/recentchange/1.0.0",
"meta": {
"uri": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
"request_id": "66d1686b-500e-438c-8985-8c7a28295ae8",
"id": "345ce42e-3cac-46b7-901e-2c3161f53436",
"dt": "2024-12-10T16:30:32Z",
"domain": "es.wikipedia.org",
"stream": "mediawiki.recentchange",
"topic": "codfw.mediawiki.recentchange",
"partition": 0,
"offset": 1313008266
},
"id": 323817817,
"type": "edit",
"namespace": 2,
"title": "Usuario:Davicilio/Taller",
"title_url": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
"comment": "/* Uniforme titular */",
"timestamp": 1733848232,
"user": "Davicilio",
"bot": false,
"notify_url": "https://es.wikipedia.org/w/index.php?diff=164049521&oldid=164010074",
"minor": false,
"length": {
"old": 29666,
"new": 29691
},
"revision": {
"old": 164010074,
"new": 164049521
},
"server_url": "https://es.wikipedia.org",
"server_name": "es.wikipedia.org",
"wiki": "eswiki",
"parsedcomment": "<span class=\"autocomment\"><a href=\"/wiki/Usuario:Davicilio/Taller#Uniforme_titular\">→<bdi>Uniforme titular</bdi></a></span>"
}
Processing Data in Manticore
Now that Kafka is receiving data from the Wikimedia stream, let's configure Manticore Search to process and index that data so it can be searched and analyzed.
Creating a Data Source
Let's create a SOURCE that reads data from Kafka. We'll specify only the fields we're interested in; any other fields will be ignored. If a field is present in the schema but missing from a message, it will be set to NULL or left empty, depending on the data type:
docker compose exec manticore mysql -e "
CREATE SOURCE wiki_source (
id bigint,
schema '\$schema' text,
meta json,
type text,
namespace int,
title text,
title_url text,
comment text,
\`timestamp\` timestamp,
user text,
bot bool,
minor bool,
length json,
server_url text,
server_name text,
wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"
Explanation:
- CREATE SOURCE - the command that creates a data source.
- (id bigint, schema '$schema' text, …) - the list of fields from the incoming message, mapped to data types supported by Manticore (see the list of data types).
- Field $schema - Manticore doesn't allow special characters in field names, so we use a mapping: new_name 'original_name' type, where new_name is a Manticore-compatible field name and original_name is the original JSON key, which may contain special characters. Escape apostrophes with \' if needed.
- type='kafka' - specifies Kafka as the data source.
- broker_list='kafka:9092' - a comma-separated list of message brokers.
- topic_list='wikimedia' - a comma-separated list of topics to read.
- consumer_group='ms_wikimedia' - the consumer group name.
- num_consumers='1' - the number of processes handling messages (usually equal to, or a multiple of, the number of partitions in the topic).
- batch=200 - the batch size for processing messages; it affects performance and is tuned individually.
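At this point you can optionally check that the source has been registered. Recent Manticore versions with Kafka integration provide a SHOW SOURCES command for this; treat the exact output as version-dependent:
docker compose exec manticore mysql -e "SHOW SOURCES"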
Creating the Results Table
We've created a data source to read data from Kafka, but we still need a destination for that data. Let's create a table to store the processed messages.
A key field is the message ID. During data transfer, issues such as network failures, Kafka broker crashes, or Manticore downtime can lead to duplicate messages. To avoid duplicates we use a unique ID: if a record already exists in the table, it is skipped.
Besides the ID, the table will include the fields type, title, title_url, comment, timestamp, user, bot, minor, length, server_url, server_name, wiki, and meta.
docker compose exec manticore mysql -e "create table wiki_results (
id bigint,
\`schema\` text,
metadata json,
type text,
namespace int,
title text,
title_url text,
comment text,
\`timestamp\` timestamp,
user string,
bot bool,
minor bool,
length_old int,
length_new int,
length_diff int,
server_url text,
server_name text,
wiki text,
received_at timestamp
)"
We've split the length field into length_old and length_new to demonstrate mapping capabilities.
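If you'd like to double-check the table structure before connecting it to the source, you can run DESCRIBE; the column list should match the CREATE TABLE statement above:
docker compose exec manticore mysql -e "DESCRIBE wiki_results"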
Creating a Materialized View
With both our source (Kafka) and destination (the table) in place, we now need to connect them and define how data should flow between them. This is where a materialized view comes in — it acts as a real-time ETL process that transforms data as it moves from Kafka to our table:
| Incoming JSON key | Source key / function | Target field |
|---|---|---|
| id | id | id |
| $schema | schema | schema |
| meta | meta | metadata |
| type | type | type |
| namespace | namespace | namespace |
| title | title | title |
| title_url | title_url | title_url |
| comment | comment | comment |
| timestamp | timestamp | timestamp |
| user | user | user |
| bot | bot | bot |
| minor | minor | minor |
| length.old | length.old | length_old |
| length.new | length.new | length_new |
| - | integer(length.old) - integer(length.new) | length_diff |
| server_url | server_url | server_url |
| server_name | server_name | server_name |
| wiki | wiki | wiki |
| - | UTC_TIMESTAMP() | received_at |
Command to create it:
docker compose exec manticore mysql -e "
CREATE MATERIALIZED VIEW wiki_mva
TO wiki_results AS
SELECT
id,
\`schema\`,
meta AS metadata,
type,
namespace,
title,
title_url,
comment,
\`timestamp\`,
user,
bot,
minor,
length.old as length_old,
length.new as length_new,
integer(length.old) - integer(length.new) as length_diff,
server_url,
server_name,
wiki,
UTC_TIMESTAMP() as received_at
FROM wiki_source"
Essentially, this is a standard SELECT query, familiar to those who work with MySQL or similar databases:
- Fields with matching names in the source (SOURCE) and target table are left as-is (id, schema, type, etc.).
- Fields needing transformation (e.g., meta to metadata) are specified with AS in the format original_name AS new_name.
- Reserved words like schema and timestamp are enclosed in backticks (`).
- Nested JSON values are accessed with a dot and AS (e.g., length.new as length_new).
- Manticore supports a wide range of functions for data processing, from calculations to formatting.
- Filtering and grouping can be added if needed. We skipped this to keep the example simple, but you could add WHERE MATCH('@title pizza') after FROM wiki_source, as sketched below.
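As a rough sketch of that last point, the same view could be created with a filter so that only edits whose title mentions pizza reach the table (an illustrative variant, not part of this setup; the view name wiki_pizza_mva is made up):
docker compose exec manticore mysql -e "
CREATE MATERIALIZED VIEW wiki_pizza_mva
TO wiki_results AS
SELECT
id,
\`schema\`,
meta AS metadata,
type,
namespace,
title,
title_url,
comment,
\`timestamp\`,
user,
bot,
minor,
length.old as length_old,
length.new as length_new,
integer(length.old) - integer(length.new) as length_diff,
server_url,
server_name,
wiki,
UTC_TIMESTAMP() as received_at
FROM wiki_source
WHERE MATCH('@title pizza')"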
Complete Docker Compose Configuration
Now that we understand all the components and how they interact, let's recap by looking at the complete docker-compose.yml file. This single file defines our entire environment with all three services (Kafka, Manticore, and Kafkacat) plus the network configuration.
You can either copy the following content or download the ready-to-use docker-compose.yml directly from our GitHub repository:
services:
  kafka:
    image: docker.io/bitnami/kafka:3.7
    container_name: kafka
    networks:
      - app-network
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  manticore:
    image: manticoresearch/manticore:7.4.6
    container_name: manticore
    networks:
      - app-network
  kafkacat:
    profiles:
      - manual
    image: edenhill/kcat:1.7.1
    container_name: kcat
    tty: true
    entrypoint:
      - '/bin/sh'
      - '-c'
      - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
    networks:
      - app-network
networks:
  app-network:
    driver: bridge
Running the Setup
With our environment configured, let's check how the data flows through the system. After saving or downloading the docker-compose.yml file to your project directory and starting the services as described earlier, you can monitor the data ingestion by running SQL queries against Manticore:
docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
| 1200 |
+----------+
Wait a few seconds and run it again and you'll get an updated value:
docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
| 1400 |
+----------+
A simple query to view the data:
docker compose exec manticore mysql -e "SELECT title, user, timestamp FROM wiki_results LIMIT 5"
+-----------------------------+------------------+------------+
| title | user | timestamp |
+-----------------------------+------------------+------------+
| Bobbi Gibb | BOT-Superzerocool| 1737470127 |
| Angela Alsobrooks | Tomrtn | 1737470214 |
| Category:Goldschmidt Stolper| MB-one | 1737470211 |
| Oklahoma Sooners | JohnDoe | 1737470220 |
| File:Kluse - Phoenix.jpg | WikiBot | 1737470230 |
+-----------------------------+------------------+------------+
A more complex query with grouping:
docker compose exec manticore mysql -e "SELECT
namespace,
COUNT(*) as count,
AVG(length_diff) as avg_length_change,
MAX(timestamp) as latest_edit,
title as sample_title
FROM wiki_results
WHERE MATCH('wiki')
GROUP BY namespace
HAVING count > 5
ORDER BY count DESC
LIMIT 10
OPTION ranker=sph04"
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| namespace | count | avg_length_change | latest_edit | sample_title |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| 14 | 998 | 116196508.99599199 | 1740684056 | Category:Wiki For Minorities in the Middle East 2025 |
| 0 | 634 | 3075575718.85488939 | 1740684057 | Oklahoma Sooners men's basketball |
| 6 | 313 | 2758109067.434505 | 1740684056 | File:Kluse - Phoenix dactylifera 03 ies.jpg |
| 2 | 40 | 1825360728.625000 | 1740684053 | User:SD2125! |
| 4 | 21 | 3272355882.52380943 | 1740684051 | Commons:Wiki For Minorities in the Middle East |
| 3 | 16 | 3489659770.625000 | 1740684054 | Brugerdiskussion:Silas Nicolaisen |
| 1 | 13 | 3634202801.230769 | 1740684045 | Diskussion:Nordische Skiweltmeisterschaften 2025 |
| 1198 | 10 | 1288490146.500000 | 1740684053 | Translations:Commons:Wiki Loves Folklore 2025/Page display title/id |
| 10 | 8 | 3221223681.500000 | 1740684055 | Predefinição:Mana (série) |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
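As one more illustrative query (output omitted here, since it depends entirely on what your stream has delivered so far), you can check how many of the captured edits were made by bots:
docker compose exec manticore mysql -e "SELECT bot, COUNT(*) AS edits FROM wiki_results GROUP BY bot"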
Modifying the Data Source Schema
If you need to modify the data source schema (e.g., add new fields, remove unnecessary ones, or change data types), follow these steps:
1. Suspend the materialized view
First, pause the materialized view to stop the data flow from Kafka to the wiki_results table:
docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=1"
2. Drop the existing source
Delete the current data source:
docker compose exec manticore mysql -e "DROP SOURCE wiki_source"
3. Create a new source with the updated schema
For example, to add the domain field from the meta JSON object and the parsedcomment field, and to change the namespace type to bigint:
docker compose exec manticore mysql -e "CREATE SOURCE wiki_source (id bigint, schema '\$schema' text, meta json, parsedcomment text, type text, namespace bigint, title text, title_url text, comment text, \`timestamp\` timestamp, user text, bot bool, minor bool, length json, server_url text, server_name text, wiki text) type='kafka' broker_list='kafka:9092' topic_list='wikimedia' consumer_group='ms_wikimedia' num_consumers='1' batch=200"
4. Update the table (add the domain and parsedcomment columns):
docker compose exec manticore mysql -e "ALTER TABLE wiki_results ADD COLUMN domain text; ALTER TABLE wiki_results ADD COLUMN parsedcomment text"
5. Drop the materialized view:
docker compose exec manticore mysql -e "DROP MV wiki_mva"
6. Recreate the materialized view:
docker compose exec manticore mysql -e "CREATE MATERIALIZED VIEW wiki_mva TO wiki_results AS SELECT id, \`schema\`, meta AS metadata, meta.domain as domain, parsedcomment, type, namespace, title, title_url, comment, \`timestamp\`, user, bot, minor, length.old as length_old, length.new as length_new, integer(length.old) - integer(length.new) as length_diff, server_url, server_name, wiki, UTC_TIMESTAMP() as received_at FROM wiki_source"
If you only recreated the SOURCE and did not modify the MV, resume reading data with the following command:
docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=0"
Otherwise, the materialized view will already have resumed on its own.
This process gives you full control over schema changes and lets you adapt flexibly to new requirements.
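After a change like the one above, you may want to sanity-check that the new fields are actually being populated. An illustrative query (rows will differ on your stream):
docker compose exec manticore mysql -e "SELECT id, domain, parsedcomment FROM wiki_results ORDER BY id DESC LIMIT 3"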
Conclusion
Integrating Kafka with Manticore Search provides a powerful solution for real-time data processing and analytics. By following this guide, you've set up a solid environment with Docker Compose, configured Kafka to handle a message stream, and used Manticore Search to index and query the data. This integration not only extends your application's capabilities but also simplifies data management and analysis.
Whether you're working on log analysis, content indexing, or any other data-driven application, this setup provides a scalable and efficient framework. The flexibility of Manticore Search lets you tailor the data processing pipeline to your specific needs and adapt quickly to changing requirements.
We encourage you to try this setup, explore the other features of Manticore Search, and adapt the example to your own use case. The complete code is available on GitHub, and the Manticore community is always ready to help with any questions or challenges. Dive in and unlock the full potential of real-time data processing with Kafka and Manticore Search!
