介绍
Kafka 是一种流行的消息代理,用于广泛的项目:从日志处理和任务队列管理到内容个性化和实时分析。例如,它可以用于索引维基百科中的更改或在线商店中搜索产品。Manticore Search 则支持与 Kafka 集成,能够实现自动数据导入,并用于全文搜索、分析、向量搜索等。
在将数据导入 Manticore 时,您可以灵活处理:
- 移除不必要的字段,添加新字段或修改现有字段;
- 计算地理位置之间的距离;
- 在保存之前过滤数据;
- 使用全文搜索生成摘要。
在本文中,我们将逐步构建一个小型应用程序,从 Kafka 获取数据并在 Manticore Search 中处理。我们将使用 Docker Compose 来设置环境。此指南适合初学者和有经验的开发人员。完整代码和演示可在 GitHub 上找到。
设置环境
让我们开始配置我们的开发环境。我们将使用 Docker Compose 来设置整个环境,这将包括 Kafka、Manticore Search 和一个用于数据流的 Kafkacat 服务。我们将首先分别查看每个服务的配置,然后提供完整的 docker-compose.yml
文件。
为了节省时间,您可以从我们的 GitHub 仓库下载 完整的 docker-compose.yml 文件 ,如果您更倾向于快速开始,可以跳到 运行设置 部分。
配置 Kafka
让我们开始设置 Kafka。我们将使用简化的配置,使用 KRaft(Kafka Raft)协议,它替代 ZooKeeper 以简化架构。这是我们 docker-compose.yml 文件中的 Kafka 服务部分:
kafka:
image: docker.io/bitnami/kafka:3.7
container_name: kafka
networks:
- app-network
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
配置 Manticore
在 Kafka 设置好以处理我们的消息流之后,我们现在需要一个搜索和分析引擎来处理数据。让我们使用最小但功能齐全的配置来配置 Manticore Search:
manticore:
image: manticoresearch/manticore:7.4.6
container_name: manticore
networks:
- app-network
启动环境
使用以下命令启动基本容器(Kafka 和 Manticore):
docker compose up -d
这会启动 Kafka 和 Manticore 服务,但尚未启动 Kafkacat 服务(因为它使用手动配置)。服务运行后,在 Kafka 中创建一个主题。我们将设置分区数量为 4,以便多个消费者并行读取数据,从而提高性能:
docker compose exec kafka kafka-topics.sh \
--create \
--topic wikimedia \
--partitions 4 \
--bootstrap-server localhost:9092
准备数据源
现在我们的基础设施已经搭建完成,并且主题准备好接收消息,让我们设置一个数据流,将实时内容输入 Kafka。为了将数据发送到 Kafka,我们将使用 Wikimedia Stream。以下是带有 manual
配置文件的 Kafkacat 服务配置(这样我们可以在设置主题和 Manticore 后手动启动它):
kafkacat:
profiles:
- manual
image: edenhill/kcat:1.7.1
container_name: kcat
tty: true
entrypoint:
- '/bin/sh'
- '-c'
- "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
networks:
- app-network
在使用手动配置文件配置 Kafkacat 服务后,您可以启动它以开始将数据流向 Kafka:
docker compose --profile manual up -d
接收数据示例
一旦 Wikimedia 流开始流入 Kafka,您将开始接收 JSON 格式的消息。让我们检查一个典型消息,以了解我们将要处理的数据结构:
{
"$schema": "/mediawiki/recentchange/1.0.0",
"meta": {
"uri": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
"request_id": "66d1686b-500e-438c-8985-8c7a28295ae8",
"id": "345ce42e-3cac-46b7-901e-2c3161f53436",
"dt": "2024-12-10T16:30:32Z",
"domain": "es.wikipedia.org",
"stream": "mediawiki.recentchange",
"topic": "codfw.mediawiki.recentchange",
"partition": 0,
"offset": 1313008266
},
"id": 323817817,
"type": "edit",
"namespace": 2,
"title": "Usuario:Davicilio/Taller",
"title_url": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
"comment": "/* Uniforme titular */",
"timestamp": 1733848232,
"user": "Davicilio",
"bot": false,
"notify_url": "https://es.wikipedia.org/w/index.php?diff=164049521&oldid=164010074",
"minor": false,
"length": {
"old": 29666,
"new": 29691
},
"revision": {
"old": 164010074,
"new": 164049521
},
"server_url": "https://es.wikipedia.org",
"server_name": "es.wikipedia.org",
"wiki": "eswiki",
"parsedcomment": "<span class=\"autocomment\"><a href=\"/wiki/Usuario:Davicilio/Taller#Uniforme_titular\">→<bdi>Uniforme titular</bdi></a></span>"
}
在 Manticore 中处理数据
现在我们已经让 Kafka 接收来自 Wikimedia 流的数据,让我们配置 Manticore Search 来处理和索引这些数据,以便进行搜索和分析。
创建数据源
让我们创建一个 SOURCE
,从 Kafka 读取数据。我们将只指定我们感兴趣的字段——其他字段将被忽略。如果某个字段在 schema 中但在消息中缺失,它将被设置为 NULL
或留空,具体取决于数据类型:
docker compose exec manticore mysql -e "
CREATE SOURCE wiki_source (
id bigint,
schema '\$schema' text,
meta json,
type text,
namespace int,
title text,
title_url text,
comment text,
\`timestamp\` timestamp,
user text,
bot bool,
minor bool,
length json,
server_url text,
server_name text,
wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"
解释:
CREATE SOURCE
- 创建数据源的命令。(id bigint, schema '$schema' text, …)
- 来自传入消息的字段列表,映射到 Manticore 支持的数据类型( 数据类型列表 )。- 字段
$schema
- Manticore 不允许字段名中包含特殊字符,因此我们使用主映射:new_name 'original_name' type
new_name
— 与 Manticore 兼容的字段名。original_name
— 原始 JSON 键,可能包含特殊字符。如有需要,使用\'
转义撇号。
- 字段
type=kafka
- 指定 Kafka 作为数据源。broker_list='kafka:9092'
— 消息代理的列表,以逗号分隔。topic_list='wikimedia'
— 需要读取的主题列表,以逗号分隔。consumer_group='ms_wikimedia'
— 消费者组名称。num_consumers='1'
— 处理消息的进程数量(通常与主题的分区计数相匹配或为其倍数)。batch=200
— 处理消息的批大小,影响性能,需单独调整。
创建结果表
我们已经创建了一个从 Kafka 读取数据的数据源,但仍需要一个存储该数据的目的地。让我们创建一个表来存储处理后的消息:
一个关键字段是消息 ID
。在数据传输过程中,网络故障、Kafka 代理崩溃或 Manticore 停机等问题可能会导致消息重复。为了避免重复,我们使用唯一的 ID
:如果表中已存在记录,则跳过该记录。
除了 ID
之外,表还将包括 type
、title
、title_url
、comment
、timestamp
、user
、bot
、minor
、length
、server_url
、server_name
、wiki
和 meta
等字段。
docker compose exec manticore mysql -e "create table wiki_results (
id bigint,
\`schema\` text,
metadata json,
type text,
namespace int,
title text,
title_url text,
comment text,
\`timestamp\` timestamp,
user string,
bot bool,
minor bool,
length_old int,
length_new int,
length_diff int,
server_url text,
server_name text,
wiki text,
received_at timestamp
)"