⚠️ 此页面为自动翻译,翻译可能不完美。
blog-post

Integrating Kafka with Manticore Search: A Step-by-Step Guide to Real-Time Data Processing

引言

Kafka 是一个广泛用于各种项目的流行消息代理:从日志处理和任务队列管理到内容个性化和实时分析。例如,它可以用于索引维基百科的更改或在在线商店中搜索产品。Manticore Search 支持与 Kafka 的集成,从而实现自动数据导入,并可用于全文搜索、分析、向量搜索等。

在将数据导入 Manticore 时,您可以灵活地处理它:

  • 删除不必要的字段,添加新字段或修改现有字段;
  • 计算地理位置之间的距离;
  • 在保存前过滤数据;
  • 使用全文搜索生成片段。

在本文中,我们将逐步构建一个小型应用程序,从 Kafka 获取数据并在 Manticore Search 中进行处理。我们将使用 Docker Compose 来设置环境。本指南适合初学者和有经验的开发人员。完整的代码和演示可在 GitHub 上找到。


设置环境

让我们从配置开发环境开始。我们将使用 Docker Compose 来设置整个环境,其中包括 Kafka、Manticore Search 和一个用于流式传输数据的 Kafkacat 服务。我们首先分别查看每个服务的配置,然后提供完整的 docker-compose.yml 文件。

为了节省时间,您可以从我们的 GitHub 仓库下载 完整的 docker-compose.yml 文件 ,如果您希望快速入门,可以跳至 运行设置 部分。

配置 Kafka

让我们首先设置 Kafka。我们将使用简化配置,采用 KRaft(Kafka Raft)协议,该协议取代了 ZooKeeper 以简化架构。以下是 docker-compose.yml 文件中 Kafka 服务的部分配置:

kafka:
  image: docker.io/bitnami/kafka:3.7
  container_name: kafka
  networks:
    - app-network
  environment:
    # KRaft settings
    - KAFKA_CFG_NODE_ID=0
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
    # Listeners
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT

配置 Manticore

在 Kafka 设置好以处理我们的消息流后,我们现在需要一个搜索和分析引擎来处理数据。让我们使用一个最小但功能齐全的配置来设置 Manticore Search:

manticore:
  image: manticoresearch/manticore:7.4.6
  container_name: manticore
  networks:
    - app-network

启动环境

使用以下命令启动基本容器(Kafka 和 Manticore):

docker compose up -d

这将启动 Kafka 和 Manticore 服务,但尚未启动 Kafkacat 服务(因为它使用手动配置文件)。在服务运行后,创建一个 Kafka 主题。我们将分区数设置为 4,以便多个消费者并行读取数据,从而提高性能:

docker compose exec kafka kafka-topics.sh \
  --create \
  --topic wikimedia \
  --partitions 4 \
  --bootstrap-server localhost:9092

准备数据源

现在我们已经启动了基础设施并准备好了接收消息的主题,让我们设置一个数据流,将实时内容馈送到 Kafka。为了将数据发送到 Kafka,我们将使用维基媒体流。以下是带有 manual 配置文件的 Kafkacat 服务配置(这样我们可以在设置主题和 Manticore 后手动启动它):

kafkacat:
  profiles:
    - manual
  image: edenhill/kcat:1.7.1
  container_name: kcat
  tty: true
  entrypoint:
    - '/bin/sh'
    - '-c'
    - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
  networks:
    - app-network

在使用手动配置文件配置 Kafkacat 服务后,您可以启动它以开始将数据流式传输到 Kafka:

docker compose --profile manual up -d

接收数据示例

一旦维基媒体流开始流入 Kafka,您将开始以 JSON 格式接收消息。让我们检查一个典型消息以了解我们将处理的数据结构:

{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
    "request_id": "66d1686b-500e-438c-8985-8c7a28295ae8",
    "id": "345ce42e-3cac-46b7-901e-2c3161f53436",
    "dt": "2024-12-10T16:30:32Z",
    "domain": "es.wikipedia.org",
    "stream": "mediawiki.recentchange",
    "topic": "codfw.mediawiki.recentchange",
    "partition": 0,
    "offset": 1313008266
  },
  "id": 323817817,
  "type": "edit",
  "namespace": 2,
  "title": "Usuario:Davicilio/Taller",
  "title_url": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
  "comment": "/* Uniforme titular */",
  "timestamp": 1733848232,
  "user": "Davicilio",
  "bot": false,
  "notify_url": "https://es.wikipedia.org/w/index.php?diff=164049521&oldid=164010074",
  "minor": false,
  "length": {
    "old": 29666,
    "new": 29691
  },
  "revision": {
    "old": 164010074,
    "new": 164049521
  },
  "server_url": "https://es.wikipedia.org",
  "server_name": "es.wikipedia.org",
  "wiki": "eswiki",
  "parsedcomment": "<span class=\"autocomment\"><a href=\"/wiki/Usuario:Davicilio/Taller#Uniforme_titular\">→<bdi>Uniforme titular</bdi></a></span>"
}

在 Manticore 中处理数据

现在 Kafka 已经从维基媒体流接收数据,让我们配置 Manticore Search 来处理和索引这些数据以进行搜索和分析。

创建数据源

让我们创建一个 SOURCE,它将从 Kafka 读取数据。我们将只指定我们感兴趣的字段——其他字段将被忽略。如果字段在模式中但消息中缺失,它将根据数据类型设置为 NULL 或留空:

docker compose exec manticore mysql -e "
CREATE SOURCE wiki_source (
  id bigint,
  schema '\$schema' text,
  meta json,
  type text,
  namespace int,
  title text,
  title_url text,
  comment text,
  \`timestamp\` timestamp,
  user text,
  bot bool,
  minor bool,
  length json,
  server_url text,
  server_name text,
  wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"

解释:

  • CREATE SOURCE - 创建数据源的命令。
  • (id bigint, schema '$schema' text, …) - 从传入消息中提取的字段列表,映射到 Manticore 支持的数据类型 ( 数据类型列表 )。
    • 字段 $schema - Manticore 不允许字段名中包含特殊字符,因此我们使用主映射:
      new_name 'original_name' type
      
      • new_name — Manticore 兼容的字段名。
      • original_name — 原始 JSON 键,可能包含特殊字符。如果需要,使用 \' 转义撇号。
  • type=kafka - 指定 Kafka 作为数据源。
  • broker_list='kafka:9092' — 消息代理列表,逗号分隔。
  • topic_list='wikimedia' — 要读取的主题列表,逗号分隔。
  • consumer_group='ms_wikimedia' — 消费者组名称。
  • num_consumers='1' — 处理消息的进程数(通常与主题的分区数匹配或为其倍数)。
  • batch=200 — 批处理消息的大小,影响性能并单独调整。

创建结果表

我们已经创建了一个从 Kafka 读取数据的数据源,但仍然需要一个存储这些数据的目的地。让我们创建一个表来存储处理后的消息:

一个关键字段是消息 ID。在数据传输过程中,网络故障、Kafka 代理崩溃或 Manticore 停机等问题可能导致重复消息。为了避免重复,我们使用唯一的 ID:如果记录已经在表中存在,则跳过。

除了 ID,表还将包括 typetitletitle_urlcommenttimestampuserbotminorlengthserver_urlserver_namewikimeta 等字段。

docker compose exec manticore mysql -e "create table wiki_results (
  id bigint, 
  \`schema\` text, 
  metadata json, 
  type text, 
  namespace int, 
  title text, 
  title_url text, 
  comment text, 
  \`timestamp\` timestamp, 
  user string, 
  bot bool, 
  minor bool, 
  length_old int, 
  length_new int, 
  length_diff int, 
  server_url text, 
  server_name text, 
  wiki text, 
  received_at timestamp
)"

我们已将 length 字段拆分为 length_oldlength_new,以演示映射功能。

创建物化视图

在源(Kafka)和目标(表)都准备就绪后,我们现在需要将它们连接起来,并定义数据在它们之间流动的方式。这就是物化视图的作用——它充当从 Kafka 到我们表的实时 ETL 过程,转换移动的数据:

输入 JSON 键源键 / 函数目标字段
ididid
$schemaschemaschema
metametametadata
typetypetype
namespacenamespacenamespace
titletitletitle
title_urltitle_urltitle_url
commentcommentcomment
timestamptimestamptimestamp
useruseruser
botbotbot
minorminorminor
length.oldlength.oldlength_old
length.newlength.newlength_new
-integer(length.old) - integer(length.new)length_diff
server_urlserver_urlserver_url
server_nameserver_nameserver_name
wikiwikiwiki
-UTC_TIMESTAMP()received_at

创建它的命令:

docker compose exec manticore mysql -e "
CREATE MATERIALIZED VIEW wiki_mva
TO wiki_results AS
SELECT
  id,
  \`schema\`,
  meta AS metadata,
  type,
  namespace,
  title,
  title_url,
  comment,
  \`timestamp\`,
  user,
  bot,
  minor,
  length.old as length_old,
  length.new as length_new,
  integer(length.old) - integer(length.new) as length_diff,
  server_url,
  server_name,
  wiki,
  UTC_TIMESTAMP() as received_at
FROM wiki_source"

本质上,这是一个标准的 SELECT 查询,对于使用 MySQL 或类似数据库的人来说非常熟悉:

  • 源(SOURCE)和目标表中名称匹配的字段保持原样(idschematype 等)。
  • 需要转换的字段(例如 metametadata)使用 AS 指定,格式为 original_name AS new_name
  • 保留字如 schematimestamp 用反引号(`)括起来。
  • 嵌套的 JSON 值通过点号和 AS 访问(例如 length.new 作为 length_new)。
  • Manticore 支持广泛的数据处理函数,从计算到格式化。
  • 如果需要,可以添加过滤和分组。为了保持示例简单,我们跳过了这一步,但你可以在 FROM wiki_source 后添加 WHERE MATCH(@title, 'pizza')

完整的 Docker Compose 配置

现在我们了解了所有组件及其交互方式,让我们通过查看完整的 docker-compose.yml 文件来回顾一下。这个单一文件定义了我们的整个环境,包括所有三个服务(Kafka、Manticore 和 Kafkacat)以及网络配置。

你可以复制以下内容或直接从我们的 GitHub 仓库下载 现成的 docker-compose.yml

services:
  kafka:
    image: docker.io/bitnami/kafka:3.7
    container_name: kafka
    networks:
      - app-network
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  manticore:
    image: manticoresearch/manticore:7.4.6
    container_name: manticore
    networks:
      - app-network
  kafkacat:
    profiles:
      - manual
    image: edenhill/kcat:1.7.1
    container_name: kcat
    tty: true
    entrypoint:
      - '/bin/sh'
      - '-c'
      - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
    networks:
      - app-network
networks:
  app-network:
    driver: bridge

运行设置

在我们的环境配置完成后,让我们检查数据如何通过系统流动。在将 docker-compose.yml 文件保存或下载到项目目录并按照前面所述启动服务后,你可以通过运行 SQL 查询来监控 Manticore 的数据摄入:

docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
|     1200 |
+----------+

等待几秒钟后再次运行,你将得到更新的值:

docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
|     1400 |
+----------+

一个简单的查询来查看数据:

docker compose exec manticore mysql -e "SELECT title, user, timestamp FROM wiki_results LIMIT 5"
+-----------------------------+------------------+------------+
| title                       | user             | timestamp  |
+-----------------------------+------------------+------------+
| Bobbi Gibb                  | BOT-Superzerocool| 1737470127 |
| Angela Alsobrooks           | Tomrtn           | 1737470214 |
| Category:Goldschmidt Stolper| MB-one           | 1737470211 |
| Oklahoma Sooners            | JohnDoe          | 1737470220 |
| File:Kluse - Phoenix.jpg    | WikiBot          | 1737470230 |
+-----------------------------+------------------+------------+

一个更复杂的带有分组的查询:

docker compose exec manticore mysql -e "SELECT 
    namespace, 
    COUNT(*) as count, 
    AVG(length_diff) as avg_length_change, 
    MAX(timestamp) as latest_edit, 
    title as sample_title 
FROM wiki_results 
WHERE MATCH('wiki') 
    GROUP BY namespace 
HAVING count > 5 
ORDER BY count DESC 
LIMIT 10 
OPTION ranker=sph04"
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| namespace | count | avg_length_change   | latest_edit | sample_title                                                        |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
|        14 |   998 |  116196508.99599199 |  1740684056 | Category:Wiki For Minorities in the Middle East 2025                |
|         0 |   634 | 3075575718.85488939 |  1740684057 | Oklahoma Sooners men's basketball                                   |
|         6 |   313 |   2758109067.434505 |  1740684056 | File:Kluse - Phoenix dactylifera 03 ies.jpg                         |
|         2 |    40 |   1825360728.625000 |  1740684053 | User:SD2125!                                                        |
|         4 |    21 | 3272355882.52380943 |  1740684051 | Commons:Wiki For Minorities in the Middle East                      |
|         3 |    16 |   3489659770.625000 |  1740684054 | Brugerdiskussion:Silas Nicolaisen                                   |
|         1 |    13 |   3634202801.230769 |  1740684045 | Diskussion:Nordische Skiweltmeisterschaften 2025                    |
|      1198 |    10 |   1288490146.500000 |  1740684053 | Translations:Commons:Wiki Loves Folklore 2025/Page display title/id |
|        10 |     8 |   3221223681.500000 |  1740684055 | Predefinição:Mana (série)                                           |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+

修改数据源模式

如果你需要修改数据源模式(例如添加新字段、删除不必要的字段或更改数据类型),请按照以下步骤操作:

  1. 暂停物化视图
    首先暂停物化视图以停止从 Kafka 到 wiki_results 表的数据流:
    docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=1"
    
  2. 删除现有源
    删除当前数据源:
    docker compose exec manticore mysql -e "DROP SOURCE wiki_source"
    
  3. 使用更新后的模式创建新源
    创建具有修改后模式的新源。例如,添加来自 meta JSON 对象的 domain 字段、字段 parsedcomment 并将 namespace 类型更改为 bigint
    docker compose exec manticore mysql -e "CREATE SOURCE wiki_source (
      id bigint,
      schema '$schema' text,
      meta json,
      parsedcomment text,
      type text,
      namespace bigint,
      title text,
      title_url text,
      comment text,
      \`timestamp\` timestamp,
      user text,
      bot bool,
      minor bool,
      length json,
      server_url text,
      server_name text,
      wiki text
    )
    type='kafka'
    broker_list='kafka:9092'
    topic_list='wikimedia'
    consumer_group='ms_wikimedia'
    num_consumers='1'
    batch=200"
    
  4. 更新表(添加 domainparsedcomment 列):
    docker compose exec manticore mysql -e "ALTER TABLE wiki_results ADD COLUMN domain text;
    ALTER TABLE wiki_results ADD COLUMN parsedcomment text"
    
  5. 删除物化视图
    docker compose exec manticore mysql -e "DROP MV wiki_mva"
    
  6. 重新创建物化视图
    docker compose exec manticore mysql -e "CREATE MATERIALIZED VIEW wiki_mva
    TO wiki_results AS
    SELECT
      id,
      \`schema\`,
      meta AS metadata,
      meta.domain as domain,
      parsedcomment,
      type,
      namespace,
      title,
      title_url,
      comment,
      \`timestamp\`,
      user,
      bot,
      minor,
      length.old as length_old,
      length.new as length_new,
      integer(length.old) - integer(length.new) as length_diff,
      server_url,
      server_name,
      wiki,
      UTC_TIMESTAMP() as received_at
    FROM wiki_source"
    

如果你仅重新创建了 SOURCE 而未修改 MV,请使用以下命令恢复数据读取:

docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=0"

否则,物化视图应该已经恢复。

此过程让你完全控制模式更改,并允许灵活适应新需求。

结论

将Kafka与Manticore Search集成,为实时数据处理和分析提供了一种强大的解决方案。通过遵循本指南,您已使用Docker Compose搭建了一个健壮的环境,配置Kafka以处理消息流,并利用Manticore Search进行数据索引和查询。这种集成不仅增强了应用程序的功能,还简化了数据管理和分析。
无论您是在进行日志分析、内容索引还是其他任何数据驱动的应用程序,此设置都提供了一个可扩展且高效的框架。Manticore Search的灵活性使您能够根据具体需求定制数据处理管道,确保能够快速适应变化的要求。
我们鼓励您尝试此设置,探索Manticore Search的更多功能,并将示例调整为您的特定使用场景。完整的代码可在 GitHub 上找到,Manticore社区始终准备帮助您解决可能遇到的任何问题或挑战。深入探索,释放Kafka和Manticore Search在实时数据处理中的全部潜力!

安装Manticore Search

安装Manticore Search