
Integrating Kafka with Manticore Search: A Step-by-Step Guide to Real-Time Data Processing

Introduction

Kafka is a popular message broker used in a wide range of projects: from log processing and task queue management to content personalization and real-time analytics. For example, it can be used to index changes in Wikipedia or search for products in online stores. Manticore Search, in turn, supports integration with Kafka, enabling automatic data import and its use for full-text search, analytics, vector search, and more.

When importing data into Manticore, you can flexibly process it:

  • Remove unnecessary fields, add new ones, or modify existing ones;
  • Calculate distances between geolocations;
  • Filter data before saving;
  • Generate snippets using full-text search.
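
For example, once a Kafka source and a destination table are defined (we build both step by step below), a materialized view can reshape each message on the fly. The following is purely an illustrative sketch: the source, table, and field names are made up:

CREATE MATERIALIZED VIEW example_view TO example_table AS
SELECT
  id,
  title,
  GEODIST(lat, lon, 48.8566, 2.3522, {in=degrees, out=km}) AS distance_km,
  UTC_TIMESTAMP() AS received_at
FROM example_kafka_source
WHERE MATCH('@title pizza')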

In this article, we’ll walk through building a small application that retrieves data from Kafka and processes it in Manticore Search. We’ll use Docker Compose to set up the environment. This guide is suitable for both beginners and experienced developers. The complete code and demo are available on GitHub.


Setting Up the Environment

Let’s begin by configuring our development environment. We’ll use Docker Compose to set up our entire environment, which will include Kafka, Manticore Search, and a Kafkacat service for streaming data. We’ll first look at each service configuration separately, and then provide the complete docker-compose.yml file.

To save time, you can download the complete docker-compose.yml file from our GitHub repository and skip to the Running the Setup section if you prefer to get started quickly.

Configuring Kafka

Let’s start by setting up Kafka. We’ll use a simplified configuration with the KRaft (Kafka Raft) protocol, which replaces ZooKeeper to streamline the architecture. Here’s the Kafka service portion of our docker-compose.yml file:

kafka:
  image: docker.io/bitnami/kafka:3.7
  container_name: kafka
  networks:
    - app-network
  environment:
    # KRaft settings
    - KAFKA_CFG_NODE_ID=0
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
    # Listeners
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT

Configuring Manticore

With Kafka set up to handle our message streaming, we now need a search and analytics engine to process the data. Let’s configure Manticore Search using a minimal but functional configuration:

manticore:
  image: manticoresearch/manticore:7.4.6
  container_name: manticore
  networks:
    - app-network
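
Manticore speaks the MySQL protocol, so once the containers are running (we start them in the next section) you can sanity-check it with the mysql client bundled in the image; at this point the list of tables should simply come back empty:

docker compose exec manticore mysql -e "SHOW TABLES"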

Starting the Environment

Launch the basic containers (Kafka and Manticore) with the following command:

docker compose up -d

This starts the Kafka and Manticore services, but not the Kafkacat service yet (since it uses a manual profile). After the services are running, create a topic in Kafka. We’ll set the number of partitions to 4 for parallel data reading by multiple consumers, which improves performance:

docker compose exec kafka kafka-topics.sh \
  --create \
  --topic wikimedia \
  --partitions 4 \
  --bootstrap-server localhost:9092
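
Optionally, confirm that the topic exists and has the expected number of partitions:

docker compose exec kafka kafka-topics.sh \
  --describe \
  --topic wikimedia \
  --bootstrap-server localhost:9092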

Preparing Data Sources

Now that we have our infrastructure up and running with a topic ready to receive messages, let’s set up a data stream that feeds real-time content into Kafka. To send data to Kafka, we’ll use the Wikimedia Stream. Here’s the Kafkacat service configuration with a manual profile (so we can start it manually after setting up the topic and Manticore):

kafkacat:
  profiles:
    - manual
  image: edenhill/kcat:1.7.1
  container_name: kcat
  tty: true
  entrypoint:
    - '/bin/sh'
    - '-c'
    - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
  networks:
    - app-network

The entrypoint installs curl, subscribes to the Wikimedia recent-changes stream, strips the "data: " prefix from each server-sent event with awk, and pipes the resulting JSON lines to kcat, which produces them to the wikimedia topic. After configuring the Kafkacat service with the manual profile, you can start it to begin streaming data to Kafka:

docker compose --profile manual up -d
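
If you want to confirm that events are actually reaching the topic, you can consume a few messages with kcat from the same container (this assumes the stream has already started producing):

docker compose exec kafkacat kcat -C -b kafka:9092 -t wikimedia -c 3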

Example of Received Data

Once the Wikimedia stream starts flowing into Kafka, you’ll begin receiving messages in JSON format. Let’s examine a typical message to understand the data structure we’ll be working with:

{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
    "request_id": "66d1686b-500e-438c-8985-8c7a28295ae8",
    "id": "345ce42e-3cac-46b7-901e-2c3161f53436",
    "dt": "2024-12-10T16:30:32Z",
    "domain": "es.wikipedia.org",
    "stream": "mediawiki.recentchange",
    "topic": "codfw.mediawiki.recentchange",
    "partition": 0,
    "offset": 1313008266
  },
  "id": 323817817,
  "type": "edit",
  "namespace": 2,
  "title": "Usuario:Davicilio/Taller",
  "title_url": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
  "comment": "/* Uniforme titular */",
  "timestamp": 1733848232,
  "user": "Davicilio",
  "bot": false,
  "notify_url": "https://es.wikipedia.org/w/index.php?diff=164049521&oldid=164010074",
  "minor": false,
  "length": {
    "old": 29666,
    "new": 29691
  },
  "revision": {
    "old": 164010074,
    "new": 164049521
  },
  "server_url": "https://es.wikipedia.org",
  "server_name": "es.wikipedia.org",
  "wiki": "eswiki",
  "parsedcomment": "<span class=\"autocomment\"><a href=\"/wiki/Usuario:Davicilio/Taller#Uniforme_titular\">→<bdi>Uniforme titular</bdi></a></span>"
}

Working with Data in Manticore

Now that we have Kafka receiving data from the Wikimedia stream, let’s configure Manticore Search to process and index this data for searching and analysis.

Creating a Data Source

Let’s create a SOURCE that will read data from Kafka. We’ll specify only the fields we’re interested in—others will be ignored. If a field is in the schema but missing from a message, it will be set to NULL or left empty, depending on the data type:

docker compose exec manticore mysql -e "
CREATE SOURCE wiki_source (
  id bigint,
  schema '\$schema' text,
  meta json,
  type text,
  namespace int,
  title text,
  title_url text,
  comment text,
  \`timestamp\` timestamp,
  user text,
  bot bool,
  minor bool,
  length json,
  server_url text,
  server_name text,
  wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"

Explanations:

  • CREATE SOURCE - Command to create a data source.
  • (id bigint, schema '$schema' text, …) - List of fields from the incoming message, mapped to Manticore-supported data types (see the list of supported data types in the Manticore documentation).
    • Field $schema - Manticore doesn’t allow special characters in field names, so we map it explicitly using the format:
      new_name 'original_name' type
      
      • new_name — Manticore-compatible field name.
      • original_name — Original JSON key, which may include special characters. Use \' to escape apostrophes if needed.
  • type='kafka' - Specifies Kafka as the data source type.
  • broker_list='kafka:9092' — List of message brokers, comma-separated.
  • topic_list='wikimedia' — List of topics to read, comma-separated.
  • consumer_group='ms_wikimedia' — Consumer group name.
  • num_consumers='1' — Number of processes handling messages (typically matches or is a multiple of the topic’s partition count).
  • batch=200 — Batch size for processing messages; it affects performance and can be tuned for your workload.
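
To verify that the source was registered, you can list the defined sources. The SHOW SOURCES statement ships with Manticore’s Kafka integration; if your version doesn’t recognize it, check the documentation for the equivalent command:

docker compose exec manticore mysql -e "SHOW SOURCES"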

Creating a Results Table

We’ve created a data source to read from Kafka, but we still need a destination for that data. Let’s create a table to store the processed messages:

A key field is the message ID. During data transmission, issues like network failures, Kafka broker crashes, or Manticore downtime can lead to duplicate messages. To avoid duplicates, we use a unique ID: if a record already exists in the table, it’s skipped.

In addition to ID, the table will include fields like type, title, title_url, comment, timestamp, user, bot, minor, length, server_url, server_name, wiki, and meta.

docker compose exec manticore mysql -e "create table wiki_results (
  id bigint, 
  \`schema\` text, 
  metadata json, 
  type text, 
  namespace int, 
  title text, 
  title_url text, 
  comment text, 
  \`timestamp\` timestamp, 
  user string, 
  bot bool, 
  minor bool, 
  length_old int, 
  length_new int, 
  length_diff int, 
  server_url text, 
  server_name text, 
  wiki text, 
  received_at timestamp
)"

We’ve split the length field into length_old and length_new to demonstrate mapping capabilities.
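
You can confirm the resulting table structure with DESCRIBE:

docker compose exec manticore mysql -e "DESCRIBE wiki_results"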

Creating a Materialized View

With both our source (Kafka) and destination (the table) in place, we now need to connect them and define how data should flow between them. This is where a materialized view comes in — it acts as a real-time ETL process that transforms data as it moves from Kafka to our table:

Input JSON key | Source key / Function                      | Destination field
---------------+--------------------------------------------+------------------
id             | id                                         | id
$schema        | schema                                     | schema
meta           | meta                                       | metadata
type           | type                                       | type
namespace      | namespace                                  | namespace
title          | title                                      | title
title_url      | title_url                                  | title_url
comment        | comment                                    | comment
timestamp      | timestamp                                  | timestamp
user           | user                                       | user
bot            | bot                                        | bot
minor          | minor                                      | minor
length.old     | length.old                                 | length_old
length.new     | length.new                                 | length_new
-              | integer(length.old) - integer(length.new)  | length_diff
server_url     | server_url                                 | server_url
server_name    | server_name                                | server_name
wiki           | wiki                                       | wiki
-              | UTC_TIMESTAMP()                            | received_at

Command to create it:

docker compose exec manticore mysql -e "
CREATE MATERIALIZED VIEW wiki_mva
TO wiki_results AS
SELECT
  id,
  \`schema\`,
  meta AS metadata,
  type,
  namespace,
  title,
  title_url,
  comment,
  \`timestamp\`,
  user,
  bot,
  minor,
  length.old as length_old,
  length.new as length_new,
  integer(length.old) - integer(length.new) as length_diff,
  server_url,
  server_name,
  wiki,
  UTC_TIMESTAMP() as received_at
FROM wiki_source"

Essentially, this is a standard SELECT query, familiar to those who work with MySQL or similar databases:

  • Fields with matching names in the source (SOURCE) and target table are left as-is (id, schema, type, etc.).
  • Fields needing transformation (e.g., meta to metadata) are specified with AS in the format original_name AS new_name.
  • Reserved words like schema and timestamp are enclosed in backticks (`).
  • Nested JSON values are accessed with a dot and AS (e.g., length.new as length_new).
  • Manticore supports a wide range of functions for data processing, from calculations to formatting.
  • Filtering and grouping can be added if needed. We skipped this to keep the example simple, but you could add WHERE MATCH('@title pizza') after FROM wiki_source, as sketched below.
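
For illustration, a filtered version of the view might look like the sketch below. The name wiki_mva_filtered is made up, and the statement is meant as a replacement for the unfiltered view above rather than something to run alongside it; check the Manticore documentation before attaching several views to one source:

docker compose exec manticore mysql -e "
CREATE MATERIALIZED VIEW wiki_mva_filtered
TO wiki_results AS
SELECT
  id,
  \`schema\`,
  meta AS metadata,
  type,
  namespace,
  title,
  title_url,
  comment,
  \`timestamp\`,
  user,
  bot,
  minor,
  length.old as length_old,
  length.new as length_new,
  integer(length.old) - integer(length.new) as length_diff,
  server_url,
  server_name,
  wiki,
  UTC_TIMESTAMP() as received_at
FROM wiki_source
WHERE MATCH('@title pizza')"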

Complete Docker Compose Configuration

Now that we understand all the components and how they interact, let’s recap by looking at the complete docker-compose.yml file. This single file defines our entire environment with all three services (Kafka, Manticore, and Kafkacat) plus the network configuration.

You can either copy the following content or download the ready-to-use docker-compose.yml directly from our GitHub repository:

services:
  kafka:
    image: docker.io/bitnami/kafka:3.7
    container_name: kafka
    networks:
      - app-network
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  manticore:
    image: manticoresearch/manticore:7.4.6
    container_name: manticore
    networks:
      - app-network
  kafkacat:
    profiles:
      - manual
    image: edenhill/kcat:1.7.1
    container_name: kcat
    tty: true
    entrypoint:
      - '/bin/sh'
      - '-c'
      - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
    networks:
      - app-network
networks:
  app-network:
    driver: bridge

Running the Setup

With our environment configured, let’s check how the data flows through the system. After saving or downloading the docker-compose.yml file to your project directory and starting the services as described earlier, you can monitor the data ingestion by running SQL queries against Manticore:

docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
|     1200 |
+----------+

Wait a few seconds, run it again, and you’ll see an updated count:

docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
|     1400 |
+----------+

A simple query to view the data:

docker compose exec manticore mysql -e "SELECT title, user, timestamp FROM wiki_results LIMIT 5"
+-----------------------------+------------------+------------+
| title                       | user             | timestamp  |
+-----------------------------+------------------+------------+
| Bobbi Gibb                  | BOT-Superzerocool| 1737470127 |
| Angela Alsobrooks           | Tomrtn           | 1737470214 |
| Category:Goldschmidt Stolper| MB-one           | 1737470211 |
| Oklahoma Sooners            | JohnDoe          | 1737470220 |
| File:Kluse - Phoenix.jpg    | WikiBot          | 1737470230 |
+-----------------------------+------------------+------------+

A more complex query with grouping:

docker compose exec manticore mysql -e "SELECT 
    namespace, 
    COUNT(*) as count, 
    AVG(length_diff) as avg_length_change, 
    MAX(timestamp) as latest_edit, 
    title as sample_title 
FROM wiki_results 
WHERE MATCH('wiki') 
    GROUP BY namespace 
HAVING count > 5 
ORDER BY count DESC 
LIMIT 10 
OPTION ranker=sph04"
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| namespace | count | avg_length_change   | latest_edit | sample_title                                                        |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
|        14 |   998 |  116196508.99599199 |  1740684056 | Category:Wiki For Minorities in the Middle East 2025                |
|         0 |   634 | 3075575718.85488939 |  1740684057 | Oklahoma Sooners men's basketball                                   |
|         6 |   313 |   2758109067.434505 |  1740684056 | File:Kluse - Phoenix dactylifera 03 ies.jpg                         |
|         2 |    40 |   1825360728.625000 |  1740684053 | User:SD2125!                                                        |
|         4 |    21 | 3272355882.52380943 |  1740684051 | Commons:Wiki For Minorities in the Middle East                      |
|         3 |    16 |   3489659770.625000 |  1740684054 | Brugerdiskussion:Silas Nicolaisen                                   |
|         1 |    13 |   3634202801.230769 |  1740684045 | Diskussion:Nordische Skiweltmeisterschaften 2025                    |
|      1198 |    10 |   1288490146.500000 |  1740684053 | Translations:Commons:Wiki Loves Folklore 2025/Page display title/id |
|        10 |     8 |   3221223681.500000 |  1740684055 | Predefinição:Mana (série)                                           |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+

Modifying the Data Source Schema

If you need to modify the data source schema (e.g., add new fields, remove unnecessary ones, or change data types), follow these steps:

  1. Pause the Materialized View
    First, pause the materialized view to stop the data flow from Kafka to the wiki_results table:
    docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=1"
    
  2. Drop the Existing Source
    Delete the current data source:
    docker compose exec manticore mysql -e "DROP SOURCE wiki_source"
    
  3. Create a New Source with the Updated Schema
    Create a new source with the modified schema. For example, to add the domain field from the meta JSON object and the parsedcomment field, and to change the namespace type to bigint:
    docker compose exec manticore mysql -e "CREATE SOURCE wiki_source (
      id bigint,
      schema '\$schema' text,
      meta json,
      parsedcomment text,
      type text,
      namespace bigint,
      title text,
      title_url text,
      comment text,
      \`timestamp\` timestamp,
      user text,
      bot bool,
      minor bool,
      length json,
      server_url text,
      server_name text,
      wiki text
    )
    type='kafka'
    broker_list='kafka:9092'
    topic_list='wikimedia'
    consumer_group='ms_wikimedia'
    num_consumers='1'
    batch=200"
    
  4. Update the Table (add the domain and parsedcomment columns):
    docker compose exec manticore mysql -e "ALTER TABLE wiki_results ADD COLUMN domain text;
    ALTER TABLE wiki_results ADD COLUMN parsedcomment text"
    
  5. Drop the Materialized View:
    docker compose exec manticore mysql -e "DROP MV wiki_mva"
    
  6. Recreate the Materialized View:
    docker compose exec manticore mysql -e "CREATE MATERIALIZED VIEW wiki_mva
    TO wiki_results AS
    SELECT
      id,
      \`schema\`,
      meta AS metadata,
      meta.domain as domain,
      parsedcomment,
      type,
      namespace,
      title,
      title_url,
      comment,
      \`timestamp\`,
      user,
      bot,
      minor,
      length.old as length_old,
      length.new as length_new,
      integer(length.old) - integer(length.new) as length_diff,
      server_url,
      server_name,
      wiki,
      UTC_TIMESTAMP() as received_at
    FROM wiki_source"
    

If you’ve only recreated the SOURCE without modifying the MV, resume data reading with:

docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=0"

Otherwise (if you dropped and recreated the materialized view as in steps 5 and 6), it is already active: a newly created materialized view starts reading right away.
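
Once data is flowing again, a quick check confirms that the new columns are being populated (your values will differ):

docker compose exec manticore mysql -e "SELECT domain, parsedcomment FROM wiki_results ORDER BY id DESC LIMIT 3"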

This process gives you full control over schema changes and allows flexible adaptation to new requirements.

Conclusion

Integrating Kafka with Manticore Search offers a powerful solution for real-time data processing and analytics. By following this guide, you’ve set up a robust environment using Docker Compose, configured Kafka to handle message streaming, and leveraged Manticore Search for indexing and querying data. This integration not only enhances your application’s functionality but also simplifies data management and analysis.

Whether you’re working on log analysis, content indexing, or any other data-driven application, this setup provides a scalable and efficient framework. The flexibility of Manticore Search allows you to tailor the data processing pipeline to your specific needs, ensuring that you can adapt quickly to changing requirements.

We encourage you to experiment with this setup, explore additional features of Manticore Search, and adapt the example to your unique use cases. The complete code is available on GitHub, and the Manticore community is always ready to help with any questions or challenges you might encounter. Dive in and unlock the full potential of real-time data processing with Kafka and Manticore Search!
