Introduction
Kafka is a popular message broker used in a wide range of projects: from log processing and task queue management to content personalization and real-time analytics. For example, it can be used to stream Wikipedia changes for indexing or to feed product search in online stores. Manticore Search, in turn, supports integration with Kafka, enabling automatic data import and making that data available for full-text search, analytics, vector search, and more.
When importing data into Manticore, you can flexibly process it:
- Remove unnecessary fields, add new ones, or modify existing ones;
- Calculate distances between geolocations;
- Filter data before saving;
- Generate snippets using full-text search.
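We'll build exactly such a pipeline below. As a quick preview, here is a minimal, hypothetical sketch (the pizzerias table, pizzerias_source source, and the lat/lon/name fields are made up for illustration and are not part of this tutorial) of how a materialized view can drop fields, compute a distance, and filter records on the fly as they arrive from Kafka:
CREATE MATERIALIZED VIEW pizzerias_mv TO pizzerias AS
SELECT
  id,
  name,
  GEODIST(lat, lon, 40.7128, -74.0060, {in=degrees, out=km}) AS distance_to_nyc_km
FROM pizzerias_source
WHERE MATCH('@name pizza')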
In this article, we’ll walk through building a small application that retrieves data from Kafka and processes it in Manticore Search. We’ll use Docker Compose to set up the environment. This guide is suitable for both beginners and experienced developers. The complete code and demo are available on GitHub.
Setting Up the Environment
Let’s begin by configuring our development environment. We’ll use Docker Compose to set up our entire environment, which will include Kafka, Manticore Search, and a Kafkacat service for streaming data. We’ll first look at each service configuration separately, and then provide the complete docker-compose.yml file.
To save time, you can download the complete docker-compose.yml file from our GitHub repository and skip to the Running the Setup section if you prefer to get started quickly.
Configuring Kafka
Let’s start by setting up Kafka. We’ll use a simplified configuration with the KRaft (Kafka Raft) protocol, which replaces ZooKeeper to streamline the architecture. Here’s the Kafka service portion of our docker-compose.yml file:
kafka:
  image: docker.io/bitnami/kafka:3.7
  container_name: kafka
  networks:
    - app-network
  environment:
    # KRaft settings
    - KAFKA_CFG_NODE_ID=0
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
    # Listeners
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
Configuring Manticore
With Kafka set up to handle our message streaming, we now need a search and analytics engine to process the data. Let’s configure Manticore Search using a minimal but functional configuration:
manticore:
  image: manticoresearch/manticore:7.4.6
  container_name: manticore
  networks:
    - app-network
Starting the Environment
Launch the basic containers (Kafka and Manticore) with the following command:
docker compose up -d
This starts the Kafka and Manticore services, but not the Kafkacat service yet (since it uses a manual profile). After the services are running, create a topic in Kafka. We’ll set the number of partitions to 4 for parallel data reading by multiple consumers, which improves performance:
docker compose exec kafka kafka-topics.sh \
--create \
--topic wikimedia \
--partitions 4 \
--bootstrap-server localhost:9092
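To double-check that the topic exists and has the expected four partitions, you can describe it with the same tool:
docker compose exec kafka kafka-topics.sh \
--describe \
--topic wikimedia \
--bootstrap-server localhost:9092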
Preparing Data Sources
Now that we have our infrastructure up and running with a topic ready to receive messages, let’s set up a data stream that feeds real-time content into Kafka. To send data to Kafka, we’ll use the Wikimedia Stream. Here’s the Kafkacat service configuration with a manual profile (so we can start it manually after setting up the topic and Manticore):
kafkacat:
  profiles:
    - manual
  image: edenhill/kcat:1.7.1
  container_name: kcat
  tty: true
  entrypoint:
    - '/bin/sh'
    - '-c'
    - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
  networks:
    - app-network
After configuring the Kafkacat service with the manual profile, you can start it to begin streaming data to Kafka:
docker compose --profile manual up -d
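If you’d like to confirm that messages are actually flowing into the topic before connecting Manticore, one option is to read a few of them with the console consumer bundled in the Kafka image (your output will differ, since the stream is live):
docker compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic wikimedia \
--from-beginning \
--max-messages 3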
Example of Received Data
Once the Wikimedia stream starts flowing into Kafka, you’ll begin receiving messages in JSON format. Let’s examine a typical message to understand the data structure we’ll be working with:
{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
    "request_id": "66d1686b-500e-438c-8985-8c7a28295ae8",
    "id": "345ce42e-3cac-46b7-901e-2c3161f53436",
    "dt": "2024-12-10T16:30:32Z",
    "domain": "es.wikipedia.org",
    "stream": "mediawiki.recentchange",
    "topic": "codfw.mediawiki.recentchange",
    "partition": 0,
    "offset": 1313008266
  },
  "id": 323817817,
  "type": "edit",
  "namespace": 2,
  "title": "Usuario:Davicilio/Taller",
  "title_url": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
  "comment": "/* Uniforme titular */",
  "timestamp": 1733848232,
  "user": "Davicilio",
  "bot": false,
  "notify_url": "https://es.wikipedia.org/w/index.php?diff=164049521&oldid=164010074",
  "minor": false,
  "length": {
    "old": 29666,
    "new": 29691
  },
  "revision": {
    "old": 164010074,
    "new": 164049521
  },
  "server_url": "https://es.wikipedia.org",
  "server_name": "es.wikipedia.org",
  "wiki": "eswiki",
  "parsedcomment": "<span class=\"autocomment\"><a href=\"/wiki/Usuario:Davicilio/Taller#Uniforme_titular\">→<bdi>Uniforme titular</bdi></a></span>"
}
Working with Data in Manticore
Now that we have Kafka receiving data from the Wikimedia stream, let’s configure Manticore Search to process and index this data for searching and analysis.
Creating a Data Source
Let’s create a SOURCE that will read data from Kafka. We’ll specify only the fields we’re interested in—others will be ignored. If a field is in the schema but missing from a message, it will be set to NULL or left empty, depending on the data type:
docker compose exec manticore mysql -e "
CREATE SOURCE wiki_source (
id bigint,
schema '\$schema' text,
meta json,
type text,
namespace int,
title text,
title_url text,
comment text,
\`timestamp\` timestamp,
user text,
bot bool,
minor bool,
length json,
server_url text,
server_name text,
wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"
Explanations:
- CREATE SOURCE - the command to create a data source.
- (id bigint, schema '$schema' text, …) - the list of fields from the incoming message, mapped to Manticore-supported data types (see the list of data types in the Manticore documentation).
- schema '$schema' text - Manticore doesn’t allow special characters in field names, so the original key is mapped to a valid name using the form new_name 'original_name' type, where:
  - new_name - a Manticore-compatible field name;
  - original_name - the original JSON key, which may include special characters (use \' to escape apostrophes if needed).
- type='kafka' - specifies Kafka as the data source.
- broker_list='kafka:9092' - the list of message brokers, comma-separated.
- topic_list='wikimedia' - the list of topics to read, comma-separated.
- consumer_group='ms_wikimedia' - the consumer group name.
- num_consumers='1' - the number of processes handling messages (typically matches or is a multiple of the topic’s partition count).
- batch=200 - the batch size for processing messages; it affects performance and should be tuned individually.
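Recent Manticore versions also provide statements for inspecting Kafka sources. Assuming your release supports them (worth verifying against the documentation for your version), you can confirm the source was registered:
docker compose exec manticore mysql -e "SHOW SOURCES"
docker compose exec manticore mysql -e "SHOW SOURCE wiki_source"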
Creating a Results Table
We’ve created a data source to read from Kafka, but we still need a destination for that data. Let’s create a table to store the processed messages.

A key field is the message id. During data transmission, issues like network failures, Kafka broker crashes, or Manticore downtime can lead to duplicate messages. To avoid duplicates, we use a unique id: if a record with the same id already exists in the table, it’s skipped.

In addition to id, the table will include the fields type, title, title_url, comment, timestamp, user, bot, minor, length, server_url, server_name, wiki, and meta:
docker compose exec manticore mysql -e "create table wiki_results (
id bigint,
\`schema\` text,
metadata json,
type text,
namespace int,
title text,
title_url text,
comment text,
\`timestamp\` timestamp,
user string,
bot bool,
minor bool,
length_old int,
length_new int,
length_diff int,
server_url text,
server_name text,
wiki text,
received_at timestamp
)"
We’ve split the length field into length_old and length_new to demonstrate mapping capabilities.
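To verify that the table was created as expected, you can describe it:
docker compose exec manticore mysql -e "DESCRIBE wiki_results"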
Creating a Materialized View
With both our source (Kafka) and destination (the table) in place, we now need to connect them and define how data should flow between them. This is where a materialized view comes in — it acts as a real-time ETL process that transforms data as it moves from Kafka to our table:
| Input JSON key | Source key / Function | Destination field |
|---|---|---|
| id | id | id |
| $schema | schema | schema |
| meta | meta | metadata |
| type | type | type |
| namespace | namespace | namespace |
| title | title | title |
| title_url | title_url | title_url |
| comment | comment | comment |
| timestamp | timestamp | timestamp |
| user | user | user |
| bot | bot | bot |
| minor | minor | minor |
| length.old | length.old | length_old |
| length.new | length.new | length_new |
| - | integer(length.old) - integer(length.new) | length_diff |
| server_url | server_url | server_url |
| server_name | server_name | server_name |
| wiki | wiki | wiki |
| - | UTC_TIMESTAMP() | received_at |
Command to create it:
docker compose exec manticore mysql -e "
CREATE MATERIALIZED VIEW wiki_mva
TO wiki_results AS
SELECT
id,
\`schema\`,
meta AS metadata,
type,
namespace,
title,
title_url,
comment,
\`timestamp\`,
user,
bot,
minor,
length.old as length_old,
length.new as length_new,
integer(length.old) - integer(length.new) as length_diff,
server_url,
server_name,
wiki,
UTC_TIMESTAMP() as received_at
FROM wiki_source"
Essentially, this is a standard SELECT query, familiar to those who work with MySQL or similar databases:
- Fields with matching names in the source (SOURCE) and target table are left as-is (id, schema, type, etc.).
- Fields needing transformation (e.g., meta to metadata) are specified with AS in the format original_name AS new_name.
- Reserved words like schema and timestamp are enclosed in backticks (`).
- Nested JSON values are accessed with a dot and renamed with AS (e.g., length.new as length_new).
- Manticore supports a wide range of functions for data processing, from calculations to formatting.
- Filtering and grouping can be added if needed. We skipped this to keep the example simple, but you could add something like WHERE MATCH('@title pizza') after FROM wiki_source (see the sketch right after this list).
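To illustrate that last point, here is a hypothetical variant of the view (not part of this tutorial's setup) that would keep only non-bot edits whose titles mention a given keyword; everything else is identical to wiki_mva above:
docker compose exec manticore mysql -e "
CREATE MATERIALIZED VIEW wiki_mva_filtered
TO wiki_results AS
SELECT
id,
\`schema\`,
meta AS metadata,
type,
namespace,
title,
title_url,
comment,
\`timestamp\`,
user,
bot,
minor,
length.old as length_old,
length.new as length_new,
integer(length.old) - integer(length.new) as length_diff,
server_url,
server_name,
wiki,
UTC_TIMESTAMP() as received_at
FROM wiki_source
WHERE MATCH('@title pizza') AND bot = 0"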
Complete Docker Compose Configuration
Now that we understand all the components and how they interact, let’s recap by looking at the complete docker-compose.yml file. This single file defines our entire environment with all three services (Kafka, Manticore, and Kafkacat) plus the network configuration.
You can either copy the following content or download the ready-to-use docker-compose.yml directly from our GitHub repository:
services:
  kafka:
    image: docker.io/bitnami/kafka:3.7
    container_name: kafka
    networks:
      - app-network
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  manticore:
    image: manticoresearch/manticore:7.4.6
    container_name: manticore
    networks:
      - app-network
  kafkacat:
    profiles:
      - manual
    image: edenhill/kcat:1.7.1
    container_name: kcat
    tty: true
    entrypoint:
      - '/bin/sh'
      - '-c'
      - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
    networks:
      - app-network

networks:
  app-network:
    driver: bridge
Running the Setup
With our environment configured, let’s check how the data flows through the system. After saving or downloading the docker-compose.yml file to your project directory and starting the services as described earlier, you can monitor the data ingestion by running SQL queries against Manticore:
docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
| 1200 |
+----------+
Wait a few seconds, run it again, and you’ll see an updated value:
docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
| 1400 |
+----------+
A simple query to view the data:
docker compose exec manticore mysql -e "SELECT title, user, timestamp FROM wiki_results LIMIT 5"
+-----------------------------+------------------+------------+
| title | user | timestamp |
+-----------------------------+------------------+------------+
| Bobbi Gibb | BOT-Superzerocool| 1737470127 |
| Angela Alsobrooks | Tomrtn | 1737470214 |
| Category:Goldschmidt Stolper| MB-one | 1737470211 |
| Oklahoma Sooners | JohnDoe | 1737470220 |
| File:Kluse - Phoenix.jpg | WikiBot | 1737470230 |
+-----------------------------+------------------+------------+
A more complex query with grouping:
docker compose exec manticore mysql -e "SELECT
namespace,
COUNT(*) as count,
AVG(length_diff) as avg_length_change,
MAX(timestamp) as latest_edit,
title as sample_title
FROM wiki_results
WHERE MATCH('wiki')
GROUP BY namespace
HAVING count > 5
ORDER BY count DESC
LIMIT 10
OPTION ranker=sph04"
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| namespace | count | avg_length_change | latest_edit | sample_title |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| 14 | 998 | 116196508.99599199 | 1740684056 | Category:Wiki For Minorities in the Middle East 2025 |
| 0 | 634 | 3075575718.85488939 | 1740684057 | Oklahoma Sooners men's basketball |
| 6 | 313 | 2758109067.434505 | 1740684056 | File:Kluse - Phoenix dactylifera 03 ies.jpg |
| 2 | 40 | 1825360728.625000 | 1740684053 | User:SD2125! |
| 4 | 21 | 3272355882.52380943 | 1740684051 | Commons:Wiki For Minorities in the Middle East |
| 3 | 16 | 3489659770.625000 | 1740684054 | Brugerdiskussion:Silas Nicolaisen |
| 1 | 13 | 3634202801.230769 | 1740684045 | Diskussion:Nordische Skiweltmeisterschaften 2025 |
| 1198 | 10 | 1288490146.500000 | 1740684053 | Translations:Commons:Wiki Loves Folklore 2025/Page display title/id |
| 10 | 8 | 3221223681.500000 | 1740684055 | Predefinição:Mana (série) |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
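As another quick sanity check on the ingested data, you could, for example, compare how many of the tracked changes come from bots versus humans:
docker compose exec manticore mysql -e "SELECT bot, COUNT(*) FROM wiki_results GROUP BY bot"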
Modifying the Data Source Schema
If you need to modify the data source schema (e.g., add new fields, remove unnecessary ones, or change data types), follow these steps:
- Pause the materialized view to stop the data flow from Kafka to the wiki_results table:

docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=1"

- Drop the existing source:

docker compose exec manticore mysql -e "DROP SOURCE wiki_source"

- Create a new source with the updated schema. For example, to add the parsedcomment field and change the namespace type to bigint (the new domain value will be extracted from the already-mapped meta JSON object in the materialized view):

docker compose exec manticore mysql -e "CREATE SOURCE wiki_source (id bigint, schema '\$schema' text, meta json, parsedcomment text, type text, namespace bigint, title text, title_url text, comment text, \`timestamp\` timestamp, user text, bot bool, minor bool, length json, server_url text, server_name text, wiki text) type='kafka' broker_list='kafka:9092' topic_list='wikimedia' consumer_group='ms_wikimedia' num_consumers='1' batch=200"

- Update the table (add the domain and parsedcomment columns):

docker compose exec manticore mysql -e "ALTER TABLE wiki_results ADD COLUMN domain text; ALTER TABLE wiki_results ADD COLUMN parsedcomment text"

- Drop the materialized view:

docker compose exec manticore mysql -e "DROP MV wiki_mva"

- Recreate the materialized view:

docker compose exec manticore mysql -e "CREATE MATERIALIZED VIEW wiki_mva TO wiki_results AS SELECT id, \`schema\`, meta AS metadata, meta.domain as domain, parsedcomment, type, namespace, title, title_url, comment, \`timestamp\`, user, bot, minor, length.old as length_old, length.new as length_new, integer(length.old) - integer(length.new) as length_diff, server_url, server_name, wiki, UTC_TIMESTAMP() as received_at FROM wiki_source"

If you’ve only recreated the SOURCE without modifying the MV, resume data reading with:

docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=0"

Otherwise, the newly created materialized view starts reading data right away.
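Once new messages arrive through the updated pipeline, you can spot-check the new columns; rows ingested before the change will simply have empty values there:
docker compose exec manticore mysql -e "SELECT domain, parsedcomment, title FROM wiki_results ORDER BY received_at DESC LIMIT 3"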
This process gives you full control over schema changes and allows flexible adaptation to new requirements.
Conclusion
Integrating Kafka with Manticore Search offers a powerful solution for real-time data processing and analytics. By following this guide, you’ve set up a robust environment using Docker Compose, configured Kafka to handle message streaming, and leveraged Manticore Search for indexing and querying data. This integration not only enhances your application’s functionality but also simplifies data management and analysis.
Whether you’re working on log analysis, content indexing, or any other data-driven application, this setup provides a scalable and efficient framework. The flexibility of Manticore Search allows you to tailor the data processing pipeline to your specific needs, ensuring that you can adapt quickly to changing requirements.
We encourage you to experiment with this setup, explore additional features of Manticore Search, and adapt the example to your unique use cases. The complete code is available on GitHub, and the Manticore community is always ready to help with any questions or challenges you might encounter. Dive in and unlock the full potential of real-time data processing with Kafka and Manticore Search!