blog-post

将 Kafka 与 Manticore Search 集成:实时数据处理的逐步指南

介绍

Kafka 是一种流行的消息代理,用于广泛的项目:从日志处理和任务队列管理到内容个性化和实时分析。例如,它可以用于索引维基百科中的更改或在线商店中搜索产品。Manticore Search 则支持与 Kafka 集成,能够实现自动数据导入,并用于全文搜索、分析、向量搜索等。

在将数据导入 Manticore 时,您可以灵活处理:

  • 移除不必要的字段,添加新字段或修改现有字段;
  • 计算地理位置之间的距离;
  • 在保存之前过滤数据;
  • 使用全文搜索生成摘要。

在本文中,我们将逐步构建一个小型应用程序,从 Kafka 获取数据并在 Manticore Search 中处理。我们将使用 Docker Compose 来设置环境。此指南适合初学者和有经验的开发人员。完整代码和演示可在 GitHub 上找到。


设置环境

让我们开始配置我们的开发环境。我们将使用 Docker Compose 来设置整个环境,这将包括 Kafka、Manticore Search 和一个用于数据流的 Kafkacat 服务。我们将首先分别查看每个服务的配置,然后提供完整的 docker-compose.yml 文件。

为了节省时间,您可以从我们的 GitHub 仓库下载 完整的 docker-compose.yml 文件 ,如果您更倾向于快速开始,可以跳到 运行设置 部分。

配置 Kafka

让我们开始设置 Kafka。我们将使用简化的配置,使用 KRaft(Kafka Raft)协议,它替代 ZooKeeper 以简化架构。这是我们 docker-compose.yml 文件中的 Kafka 服务部分:

kafka:
  image: docker.io/bitnami/kafka:3.7
  container_name: kafka
  networks:
    - app-network
  environment:
    # KRaft settings
    - KAFKA_CFG_NODE_ID=0
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
    # Listeners
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT

配置 Manticore

在 Kafka 设置好以处理我们的消息流之后,我们现在需要一个搜索和分析引擎来处理数据。让我们使用最小但功能齐全的配置来配置 Manticore Search:

manticore:
  image: manticoresearch/manticore:7.4.6
  container_name: manticore
  networks:
    - app-network

启动环境

使用以下命令启动基本容器(Kafka 和 Manticore):

docker compose up -d

这会启动 Kafka 和 Manticore 服务,但尚未启动 Kafkacat 服务(因为它使用手动配置)。服务运行后,在 Kafka 中创建一个主题。我们将设置分区数量为 4,以便多个消费者并行读取数据,从而提高性能:

docker compose exec kafka kafka-topics.sh \
  --create \
  --topic wikimedia \
  --partitions 4 \
  --bootstrap-server localhost:9092

准备数据源

现在我们的基础设施已经搭建完成,并且主题准备好接收消息,让我们设置一个数据流,将实时内容输入 Kafka。为了将数据发送到 Kafka,我们将使用 Wikimedia Stream。以下是带有 manual 配置文件的 Kafkacat 服务配置(这样我们可以在设置主题和 Manticore 后手动启动它):

kafkacat:
  profiles:
    - manual
  image: edenhill/kcat:1.7.1
  container_name: kcat
  tty: true
  entrypoint:
    - '/bin/sh'
    - '-c'
    - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
  networks:
    - app-network

在使用手动配置文件配置 Kafkacat 服务后,您可以启动它以开始将数据流向 Kafka:

docker compose --profile manual up -d

接收数据示例

一旦 Wikimedia 流开始流入 Kafka,您将开始接收 JSON 格式的消息。让我们检查一个典型消息,以了解我们将要处理的数据结构:

{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
    "request_id": "66d1686b-500e-438c-8985-8c7a28295ae8",
    "id": "345ce42e-3cac-46b7-901e-2c3161f53436",
    "dt": "2024-12-10T16:30:32Z",
    "domain": "es.wikipedia.org",
    "stream": "mediawiki.recentchange",
    "topic": "codfw.mediawiki.recentchange",
    "partition": 0,
    "offset": 1313008266
  },
  "id": 323817817,
  "type": "edit",
  "namespace": 2,
  "title": "Usuario:Davicilio/Taller",
  "title_url": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
  "comment": "/* Uniforme titular */",
  "timestamp": 1733848232,
  "user": "Davicilio",
  "bot": false,
  "notify_url": "https://es.wikipedia.org/w/index.php?diff=164049521&oldid=164010074",
  "minor": false,
  "length": {
    "old": 29666,
    "new": 29691
  },
  "revision": {
    "old": 164010074,
    "new": 164049521
  },
  "server_url": "https://es.wikipedia.org",
  "server_name": "es.wikipedia.org",
  "wiki": "eswiki",
  "parsedcomment": "<span class=\"autocomment\"><a href=\"/wiki/Usuario:Davicilio/Taller#Uniforme_titular\">→<bdi>Uniforme titular</bdi></a></span>"
}

在 Manticore 中处理数据

现在我们已经让 Kafka 接收来自 Wikimedia 流的数据,让我们配置 Manticore Search 来处理和索引这些数据,以便进行搜索和分析。

创建数据源

让我们创建一个 SOURCE,从 Kafka 读取数据。我们将只指定我们感兴趣的字段——其他字段将被忽略。如果某个字段在 schema 中但在消息中缺失,它将被设置为 NULL 或留空,具体取决于数据类型:

docker compose exec manticore mysql -e "
CREATE SOURCE wiki_source (
  id bigint,
  schema '\$schema' text,
  meta json,
  type text,
  namespace int,
  title text,
  title_url text,
  comment text,
  \`timestamp\` timestamp,
  user text,
  bot bool,
  minor bool,
  length json,
  server_url text,
  server_name text,
  wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"

解释:

  • CREATE SOURCE - 创建数据源的命令。
  • (id bigint, schema '$schema' text, …) - 来自传入消息的字段列表,映射到 Manticore 支持的数据类型( 数据类型列表 )。
    • 字段 $schema - Manticore 不允许字段名中包含特殊字符,因此我们使用主映射:
      new_name 'original_name' type
      
      • new_name — 与 Manticore 兼容的字段名。
      • original_name — 原始 JSON 键,可能包含特殊字符。如有需要,使用 \' 转义撇号。
  • type=kafka - 指定 Kafka 作为数据源。
  • broker_list='kafka:9092' — 消息代理的列表,以逗号分隔。
  • topic_list='wikimedia' — 需要读取的主题列表,以逗号分隔。
  • consumer_group='ms_wikimedia' — 消费者组名称。
  • num_consumers='1' — 处理消息的进程数量(通常与主题的分区计数相匹配或为其倍数)。
  • batch=200 — 处理消息的批大小,影响性能,需单独调整。

创建结果表

我们已经创建了一个从 Kafka 读取数据的数据源,但仍需要一个存储该数据的目的地。让我们创建一个表来存储处理后的消息:

一个关键字段是消息 ID。在数据传输过程中,网络故障、Kafka 代理崩溃或 Manticore 停机等问题可能会导致消息重复。为了避免重复,我们使用唯一的 ID:如果表中已存在记录,则跳过该记录。

除了 ID 之外,表还将包括 typetitletitle_urlcommenttimestampuserbotminorlengthserver_urlserver_namewikimeta 等字段。

docker compose exec manticore mysql -e "create table wiki_results (
  id bigint, 
  \`schema\` text, 
  metadata json, 
  type text, 
  namespace int, 
  title text, 
  title_url text, 
  comment text, 
  \`timestamp\` timestamp, 
  user string, 
  bot bool, 
  minor bool, 
  length_old int, 
  length_new int, 
  length_diff int, 
  server_url text, 
  server_name text, 
  wiki text, 
  received_at timestamp
)"

安装Manticore Search

安装Manticore Search