# 将 Kafka 与 Manticore 搜索集成：实时数据处理的逐步指南

学习如何设置 Kafka 与 Manticore 搜索的集成以进行实时数据处理。逐步指南附有示例：从配置 Docker Compose 到创建物化视图和分析维基媒体流的数据。

## 介绍

Kafka 是一个广泛用于各种项目的流行消息代理：从日志处理和任务队列管理到内容个性化和实时分析。例如，它可以用于索引维基百科的更改或在线商店中的产品搜索。Manticore 搜索则支持与 Kafka 的集成，从而实现自动数据导入，并可用于全文搜索、分析、向量搜索等。

在将数据导入 Manticore 时，您可以灵活地处理数据：

- 删除不必要的字段，添加新字段或修改现有字段；
- 计算地理位置之间的距离；
- 在保存前过滤数据；
- 使用全文搜索生成片段。

在本文中，我们将逐步构建一个小型应用程序，从 Kafka 获取数据并在 Manticore 搜索中进行处理。我们将使用 Docker Compose 来设置环境。本指南适合初学者和有经验的开发人员。完整的代码和演示可在 [GitHub](http://github.com/manticoresoftware/kafka-demo) 上找到。

---

## 设置环境

让我们从配置开发环境开始。我们将使用 Docker Compose 来设置整个环境，其中包括 Kafka、Manticore 搜索和一个用于流式传输数据的 Kafkacat 服务。我们首先分别查看每个服务的配置，然后提供完整的 `docker-compose.yml` 文件。

为了节省时间，您可以从我们的 GitHub 仓库下载 [完整的 docker-compose.yml 文件](https://github.com/manticoresoftware/kafka-demo/blob/main/docker-compose.yml)，如果您希望快速入门，可以跳至 [运行设置](#running-the-setup) 部分。

### 配置 Kafka

让我们首先设置 Kafka。我们将使用简化配置，采用 KRaft（Kafka Raft）协议，该协议取代了 ZooKeeper 以简化架构。以下是 docker-compose.yml 文件中 Kafka 服务的部分配置：

```yaml
kafka:
  image: docker.io/bitnami/kafka:3.7
  container_name: kafka
  networks:
    - app-network
  environment:
    # KRaft settings
    - KAFKA_CFG_NODE_ID=0
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
    # Listeners
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
```

### 配置 Manticore

在 Kafka 设置好以处理我们的消息流后，我们现在需要一个搜索和分析引擎来处理数据。让我们使用一个最小但功能齐全的配置来设置 Manticore 搜索：

```yaml
manticore:
  image: manticoresearch/manticore:7.4.6
  container_name: manticore
  networks:
    - app-network
```

### 启动环境

使用以下命令启动基本容器（Kafka 和 Manticore）：

```shell
docker compose up -d
```

这将启动 Kafka 和 Manticore 服务，但尚未启动 Kafkacat 服务（因为它使用手动配置文件）。在服务运行后，创建一个 Kafka 主题。我们将分区数设置为 4，以便多个消费者并行读取数据，从而提高性能：

```shell
docker compose exec kafka kafka-topics.sh \
  --create \
  --topic wikimedia \
  --partitions 4 \
  --bootstrap-server localhost:9092
```

### 准备数据源

现在我们已经搭建好基础设施并准备好了接收消息的主题，让我们设置一个数据流，将实时内容馈送到 Kafka。为了将数据发送到 Kafka，我们将使用维基媒体流。以下是带有 `manual` 配置文件的 Kafkacat 服务配置（这样我们可以在设置主题和 Manticore 后手动启动它）：

```yaml
kafkacat:
  profiles:
    - manual
  image: edenhill/kcat:1.7.1
  container_name: kcat
  tty: true
  entrypoint:
    - '/bin/sh'
    - '-c'
    - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
  networks:
    - app-network
```

在使用手动配置文件配置 Kafkacat 服务后，您可以启动它以开始将数据流式传输到 Kafka：

```shell
docker compose --profile manual up -d
```

### 接收数据的示例

一旦维基媒体流开始流入 Kafka，您将开始以 JSON 格式接收消息。让我们检查一个典型消息以了解我们将处理的数据结构：

```json
{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
    "request_id": "66d1686b-500e-438c-8985-8c7a28295ae8",
    "id": "345ce42e-3cac-46b7-901e-2c3161f53436",
    "dt": "2024-12-10T16:30:32Z",
    "domain": "es.wikipedia.org",
    "stream": "mediawiki.recentchange",
    "topic": "codfw.mediawiki.recentchange",
    "partition": 0,
    "offset": 1313008266
  },
  "id": 323817817,
  "type": "edit",
  "namespace": 2,
  "title": "Usuario:Davicilio/Taller",
  "title_url": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
  "comment": "/* Uniforme titular */",
  "timestamp": 1733848232,
  "user": "Davicilio",
  "bot": false,
  "notify_url": "https://es.wikipedia.org/w/index.php?diff=164049521&oldid=164010074",
  "minor": false,
  "length": {
    "old": 29666,
    "new": 29691
  },
  "revision": {
    "old": 164010074,
    "new": 164049521
  },
  "server_url": "https://es.wikipedia.org",
  "server_name": "es.wikipedia.org",
  "wiki": "eswiki",
  "parsedcomment": "<span class=\"autocomment\"><a href=\"/wiki/Usuario:Davicilio/Taller#Uniforme_titular\">→<bdi>Uniforme titular</bdi></a></span>"
}
```

---

## 在 Manticore 中处理数据

现在 Kafka 已经从维基媒体流接收数据，让我们配置 Manticore 搜索以处理和索引这些数据，以便进行搜索和分析。

### 创建数据源

让我们创建一个 `SOURCE`，它将从 Kafka 读取数据。我们将只指定我们感兴趣的字段——其他字段将被忽略。如果某个字段在模式中但消息中缺失，它将根据数据类型设置为 `NULL` 或留空：

```shell
docker compose exec manticore mysql -e "
CREATE SOURCE wiki_source (
  id bigint,
  schema '\$schema' text,
  meta json,
  type text,
  namespace int,
  title text,
  title_url text,
  comment text,
  \`timestamp\` timestamp,
  user text,
  bot bool,
  minor bool,
  length json,
  server_url text,
  server_name text,
  wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"
```

解释：

- `CREATE SOURCE` - 创建数据源的命令。
- `(id bigint, schema '$schema' text, …)` - 从传入消息中提取的字段列表，映射到 Manticore 支持的数据类型 ([数据类型列表](https://manual.manticoresearch.com/Creating_a_table/Data_types#Data-types))。
    - **字段 `$schema`** - Manticore 不允许字段名称中包含特殊字符，因此我们使用主映射：
      ```
      new_name 'original_name' type
      ```
        - `new_name` — Manticore 兼容的字段名称。
        - `original_name` — 原始 JSON 键，可能包含特殊字符。如果需要，使用 `\'` 转义撇号。
- `type=kafka` - 指定 Kafka 作为数据源。
- `broker_list='kafka:9092'` — 消息代理列表，逗号分隔。
- `topic_list='wikimedia'` — 要读取的主题列表，逗号分隔。
- `consumer_group='ms_wikimedia'` — 消费者组名称。
- `num_consumers='1'` — 处理消息的进程数（通常与主题的分区数匹配或为其倍数）。
- `batch=200` — 批处理消息的大小，影响性能，需单独调整。

### 创建结果表

我们已经创建了一个从 Kafka 读取数据的数据源，但还需要一个存储这些数据的目标。让我们创建一个表来存储处理后的消息：

一个关键字段是消息 `ID`。在数据传输过程中，网络故障、Kafka 代理崩溃或 Manticore 停机等问题可能导致重复消息。为了避免重复，我们使用唯一的 `ID`：如果记录已经在表中存在，则跳过。

除了 `ID`，表还将包括字段如 `type`、`title`、`title_url`、`comment`、`timestamp`、`user`、`bot`、`minor`、`length`、`server_url`、`server_name`、`wiki` 和 `meta`。

```shell
docker compose exec manticore mysql -e "create table wiki_results (
  id bigint, 
  \`schema\` text, 
  metadata json, 
  type text, 
  namespace int, 
  title text, 
  title_url text, 
  comment text, 
  \`timestamp\` timestamp, 
  user string, 
  bot bool, 
  minor bool, 
  length_old int, 
  length_new int, 
  length_diff int, 
  server_url text, 
  server_name text, 
  wiki text, 
  received_at timestamp
)"
```

我们已将 `length` 字段拆分为 `length_old` 和 `length_new`，以演示映射功能。

### 创建物化视图

在源（Kafka）和目标（表）都准备就绪后，我们现在需要将它们连接起来，并定义数据在它们之间流动的方式。这就是物化视图的作用——它充当从 Kafka 到我们表的实时 ETL 过程，转换移动的数据：

| **输入 JSON 键** | **源键 / 函数**                     | **目标字段** |
|--------------------|-----------------------------------------------|-----------------------|
| id                 | id                                            | id                    |
| **$schema**        | **schema**                                    | **schema**            |
| meta               | meta                                          | metadata              |
| type               | type                                          | type                  |
| namespace          | namespace                                     | namespace             |
| title              | title                                         | title                 |
| title_url          | title_url                                     | title_url             |
| comment            | comment                                       | comment               |
| **timestamp**      | **timestamp**                                 | **timestamp**         |
| user               | user                                          | user                  |
| bot                | bot                                           | bot                   |
| minor              | minor                                         | minor                 |
| **length.old**     | **length.old**                                | **length_old**        |
| **length.new**     | **length.new**                                | **length_new**        |
| **-**              | **integer(length.old) - integer(length.new)** | **length_diff**       |
| server_url         | server_url                                    | server_url            |
| server_name        | server_name                                   | server_name           |
| wiki               | wiki                                          | wiki                  |
| **-**              | **UTC_TIMESTAMP()**                           | **received_at**       |

创建它的命令：

```shell
docker compose exec manticore mysql -e "
CREATE MATERIALIZED VIEW wiki_mva
TO wiki_results AS
SELECT
  id,
  \`schema\`,
  meta AS metadata,
  type,
  namespace,
  title,
  title_url,
  comment,
  \`timestamp\`,
  user,
  bot,
  minor,
  length.old as length_old,
  length.new as length_new,
  integer(length.old) - integer(length.new) as length_diff,
  server_url,
  server_name,
  wiki,
  UTC_TIMESTAMP() as received_at
FROM wiki_source"
```

本质上，这是一个标准的 `SELECT` 查询，对于使用 MySQL 或类似数据库的人来说非常熟悉：

- 源（`SOURCE`）和目标表中名称匹配的字段保持原样（`id`、`schema`、`type` 等）。
- 需要转换的字段（例如 `meta` 到 `metadata`）使用 `AS` 指定，格式为 `original_name AS new_name`。
- 保留字如 `schema` 和 `timestamp` 用反引号（`）括起来。
- 嵌套的 JSON 值通过点号和 `AS` 访问（例如 `length.new` 作为 `length_new`）。
- Manticore 支持广泛的数据处理函数，从计算到格式化。
- 如果需要，可以添加过滤和分组。为了保持示例简单，我们跳过了这一步，但你可以在 `FROM wiki_source` 后添加 `WHERE MATCH(@title, 'pizza')`。

## 完整的 Docker Compose 配置

现在我们了解了所有组件及其交互方式，让我们通过查看完整的 `docker-compose.yml` 文件来回顾一下。这个单一文件定义了我们的整个环境，包括所有三个服务（Kafka、Manticore 和 Kafkacat）以及网络配置。

你可以复制以下内容或直接从我们的 GitHub 仓库下载 [现成的 docker-compose.yml](https://github.com/manticoresoftware/kafka-demo/blob/main/docker-compose.yml)：

```yaml
services:
  kafka:
    image: docker.io/bitnami/kafka:3.7
    container_name: kafka
    networks:
      - app-network
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  manticore:
    image: manticoresearch/manticore:7.4.6
    container_name: manticore
    networks:
      - app-network
  kafkacat:
    profiles:
      - manual
    image: edenhill/kcat:1.7.1
    container_name: kcat
    tty: true
    entrypoint:
      - '/bin/sh'
      - '-c'
      - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
    networks:
      - app-network
networks:
  app-network:
    driver: bridge
```

## 运行设置

在我们的环境配置完成后，让我们检查数据如何通过系统流动。在将 `docker-compose.yml` 文件保存或下载到项目目录并按照前面所述启动服务后，你可以通过运行 SQL 查询来监控 Manticore 的数据摄入：

```sql
docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
```
```
+----------+
| count(*) |
+----------+
|     1200 |
+----------+
```

等待几秒钟后再次运行，你将得到更新的值：

```sql
docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
```

```
+----------+
| count(*) |
+----------+
|     1400 |
+----------+
```

一个简单的查询来查看数据：

```sql
docker compose exec manticore mysql -e "SELECT title, user, timestamp FROM wiki_results LIMIT 5"
```
```
+-----------------------------+------------------+------------+
| title                       | user             | timestamp  |
+-----------------------------+------------------+------------+
| Bobbi Gibb                  | BOT-Superzerocool| 1737470127 |
| Angela Alsobrooks           | Tomrtn           | 1737470214 |
| Category:Goldschmidt Stolper| MB-one           | 1737470211 |
| Oklahoma Sooners            | JohnDoe          | 1737470220 |
| File:Kluse - Phoenix.jpg    | WikiBot          | 1737470230 |
+-----------------------------+------------------+------------+
```

一个更复杂的带有分组的查询：

```sql
docker compose exec manticore mysql -e "SELECT 
    namespace, 
    COUNT(*) as count, 
    AVG(length_diff) as avg_length_change, 
    MAX(timestamp) as latest_edit, 
    title as sample_title 
FROM wiki_results 
WHERE MATCH('wiki') 
    GROUP BY namespace 
HAVING count > 5 
ORDER BY count DESC 
LIMIT 10 
OPTION ranker=sph04"
```
```
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| namespace | count | avg_length_change   | latest_edit | sample_title                                                        |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
|        14 |   998 |  116196508.99599199 |  1740684056 | Category:Wiki For Minorities in the Middle East 2025                |
|         0 |   634 | 3075575718.85488939 |  1740684057 | Oklahoma Sooners men's basketball                                   |
|         6 |   313 |   2758109067.434505 |  1740684056 | File:Kluse - Phoenix dactylifera 03 ies.jpg                         |
|         2 |    40 |   1825360728.625000 |  1740684053 | User:SD2125!                                                        |
|         4 |    21 | 3272355882.52380943 |  1740684051 | Commons:Wiki For Minorities in the Middle East                      |
|         3 |    16 |   3489659770.625000 |  1740684054 | Brugerdiskussion:Silas Nicolaisen                                   |
|         1 |    13 |   3634202801.230769 |  1740684045 | Diskussion:Nordische Skiweltmeisterschaften 2025                    |
|      1198 |    10 |   1288490146.500000 |  1740684053 | Translations:Commons:Wiki Loves Folklore 2025/Page display title/id |
|        10 |     8 |   3221223681.500000 |  1740684055 | Predefinição:Mana (série)                                           |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
```

## 修改数据源模式

如果你需要修改数据源模式（例如添加新字段、删除不必要的字段或更改数据类型），请按照以下步骤操作：

1. **暂停物化视图**  
   首先暂停物化视图以停止从 Kafka 到 `wiki_results` 表的数据流：
   ```sql
   docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=1"
   ```
2. **删除现有源**  
   删除当前数据源：
   ```sql
   docker compose exec manticore mysql -e "DROP SOURCE wiki_source"
   ```
3. **使用更新后的模式创建新源**  
   创建具有修改后模式的新源。例如，添加来自 `meta` JSON 对象的 `domain` 字段、字段 `parsedcomment` 并将 `namespace` 类型更改为 `bigint`：
   ```sql
   docker compose exec manticore mysql -e "CREATE SOURCE wiki_source (
     id bigint,
     schema '$schema' text,
     meta json,
     parsedcomment text,
     type text,
     namespace bigint,
     title text,
     title_url text,
     comment text,
     \`timestamp\` timestamp,
     user text,
     bot bool,
     minor bool,
     length json,
     server_url text,
     server_name text,
     wiki text
   )
   type='kafka'
   broker_list='kafka:9092'
   topic_list='wikimedia'
   consumer_group='ms_wikimedia'
   num_consumers='1'
   batch=200"
   ```
4. **更新表**（添加 `domain` 和 `parsedcomment` 列）：
   ```sql
   docker compose exec manticore mysql -e "ALTER TABLE wiki_results ADD COLUMN domain text;
   ALTER TABLE wiki_results ADD COLUMN parsedcomment text"
   ```
5. **删除物化视图**：
   ```sql
   docker compose exec manticore mysql -e "DROP MV wiki_mva"
   ```
6. **重新创建物化视图**：
   ```sql
   docker compose exec manticore mysql -e "CREATE MATERIALIZED VIEW wiki_mva
   TO wiki_results AS
   SELECT
     id,
     \`schema\`,
     meta AS metadata,
     meta.domain as domain,
     parsedcomment,
     type,
     namespace,
     title,
     title_url,
     comment,
     \`timestamp\`,
     user,
     bot,
     minor,
     length.old as length_old,
     length.new as length_new,
     integer(length.old) - integer(length.new) as length_diff,
     server_url,
     server_name,
     wiki,
     UTC_TIMESTAMP() as received_at
   FROM wiki_source"
   ```

*如果你仅重新创建了 `SOURCE` 而未修改 `MV`，请使用以下命令恢复数据读取：*

```sql
docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=0"
```

否则，物化视图应该已经恢复。

此过程让你完全控制模式更改，并允许灵活适应新需求。

### 结论

将Kafka与Manticore Search集成，为实时数据处理和分析提供了一种强大的解决方案。通过遵循本指南，您已使用Docker Compose搭建了一个健壮的环境，配置Kafka以处理消息流，并利用Manticore Search进行数据索引和查询。这种集成不仅增强了应用程序的功能，还简化了数据管理和分析。  
无论您是在进行日志分析、内容索引还是其他任何数据驱动的应用程序，此设置都提供了一个可扩展且高效的框架。Manticore Search的灵活性使您能够根据具体需求定制数据处理管道，确保能够快速适应变化的要求。  
我们鼓励您尝试此设置，探索Manticore Search的更多功能，并将示例调整为您的特定使用场景。完整的代码可在[GitHub](https://github.com/manticoresoftware/kafka-demo)上找到，Manticore社区始终准备帮助您解决可能遇到的任何问题或挑战。深入探索，释放Kafka和Manticore Search在实时数据处理中的全部潜力！
