blog-post

Kafka और Manticore Search का एकीकृत करना: वास्तविक समय डेटा प्रसंस्करण के लिए कदम-दर-कदम मार्गदर्शिका

परिचय

Kafka एक लोकप्रिय संदेश ब्रोकर है जिसका उपयोग विभिन्न परियोजनाओं में किया जाता है: लॉग प्रसंस्करण और कार्य कतार प्रबंधन से लेकर सामग्री व्यक्तिगतकरण और वास्तविक समय विश्लेषण तक। उदाहरण के लिए, इसका उपयोग Wikipedia में परिवर्तनों को अनुक्रमित करने या ऑनलाइन स्टोर में उत्पादों की खोज करने के लिए किया जा सकता है। Manticore Search, बदले में, Kafka के साथ एकीकरण का समर्थन करता है, जिससे डेटा का स्वचालित आयात और इसका उपयोग पूर्ण-टेक्स्ट खोज, विश्लेषण, वेक्टर खोज और अधिक के लिए संभव होता है।

Manticore में डेटा आयात करते समय, आप इसे लचीले तरीके से संसाधित कर सकते हैं:

  • अनावश्यक फ़ील्ड को हटा दें, नए फ़ील्ड जोड़ें, या मौजूदा फ़ील्ड को संशोधित करें;
  • भू-स्थान के बीच की दूरी की गणना करें;
  • डेटा को सहेजने से पहले फ़िल्टर करें;
  • पूर्ण-टेक्स्ट खोज का उपयोग करके संक्षिप्त स्निपेट उत्पन्न करें।

इस लेख में, हम एक छोटे ऐप्लिकेशन का निर्माण करने पर ध्यान केंद्रित करेंगे जो Kafka से डेटा प्राप्त करता है और उसे Manticore Search में संसाधित करता है। हम पर्यावरण सेट करने के लिए Docker Compose का उपयोग करेंगे। यह मार्गदर्शिका शुरुआती और अनुभवी डेवलपर्स दोनों के लिए उपयुक्त है। संपूर्ण कोड और डेमो GitHub पर उपलब्ध है।


पर्यावरण सेट करना

आइए हम अपने विकास पर्यावरण को कॉन्फ़िगर करने के साथ शुरू करें। हम अपने पूरे पर्यावरण को सेट करने के लिए Docker Compose का उपयोग करेंगे, जिसमें Kafka, Manticore Search और डेटा स्ट्रीमिंग के लिए एक Kafkacat सेवा शामिल होगी। हम पहले प्रत्येक सेवा कॉन्फ़िगरेशन को अलग-अलग देखेंगे, और फिर पूर्ण docker-compose.yml फ़ाइल प्रदान करेंगे।

समय बचाने के लिए, आप हमारे GitHub रिपॉजिटरी से पूर्ण docker-compose.yml फ़ाइल डाउनलोड कर सकते हैं और यदि आप जल्दी शुरू करना चाहते हैं तो सेटअप चलाना अनुभाग पर जा सकते हैं।

Kafka कॉन्फ़िगर करना

आइए हम Kafka सेट करने से शुरू करें। हम KRaft (Kafka Raft) प्रोटोकॉल के साथ एक सरल कॉन्फ़िगरेशन का उपयोग करेंगे, जो आर्किटेक्चर को सरल बनाने के लिए ZooKeeper को प्रतिस्थापित करता है। यहाँ हमारा docker-compose.yml फ़ाइल का Kafka सेवा भाग है:

kafka:
  image: docker.io/bitnami/kafka:3.7
  container_name: kafka
  networks:
    - app-network
  environment:
    # KRaft settings
    - KAFKA_CFG_NODE_ID=0
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
    # Listeners
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT

Manticore कॉन्फ़िगर करना

Kafka को हमारे संदेश स्ट्रीमिंग को संभालने के लिए सेट करने के बाद, अब हमें डेटा संसाधित करने के लिए एक खोज और विश्लेषण इंजन की आवश्यकता है। चलिए एक न्यूनतम लेकिन कार्यात्मक कॉन्फ़िगरेशन का उपयोग करके Manticore Search को कॉन्फ़िगर करते हैं:

manticore:
  image: manticoresearch/manticore:7.4.6
  container_name: manticore
  networks:
    - app-network

पर्यावरण शुरू करना

निम्नलिखित आदेश के साथ बुनियादी कंटेनर (Kafka और Manticore) लॉन्च करें:

docker compose up -d

यह Kafka और Manticore सेवाएं शुरू करता है, लेकिन अभी Kafkacat सेवा को नहीं (चूंकि यह एक मैनुअल प्रोफ़ाइल का उपयोग करता है)। सेवाएँ चालू होने के बाद, Kafka में एक टॉपिक बनाएं। हम कई उपभोक्ताओं द्वारा समानांतर डेटा पढ़ने के लिए भागों की संख्या 4 सेट करेंगे, जिससे प्रदर्शन में सुधार होता है:

docker compose exec kafka kafka-topics.sh \
  --create \
  --topic wikimedia \
  --partitions 4 \
  --bootstrap-server localhost:9092

डेटा स्रोतों की तैयारी

अब जब हमारी बुनियादी ढाँचा चालू है और संदेश प्राप्त करने के लिए एक टॉपिक तैयार है, चलिए एक डेटा स्ट्रीम सेट करते हैं जो वास्तविक समय की सामग्री को Kafka में फीड करता है। Kafka में डेटा भेजने के लिए, हम Wikimedia स्ट्रीम का उपयोग करेंगे। यहाँ Kafkacat सेवा कॉन्फ़िगरेशन है जिसमें एक manual प्रोफ़ाइल है (ताकि हम टॉपिक और Manticore सेटअप के बाद इसे मैन्युअल रूप से शुरू कर सकें):

kafkacat:
  profiles:
    - manual
  image: edenhill/kcat:1.7.1
  container_name: kcat
  tty: true
  entrypoint:
    - '/bin/sh'
    - '-c'
    - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
  networks:
    - app-network

मैन्युअल प्रोफ़ाइल के साथ Kafkacat सेवा को कॉन्फ़िगर करने के बाद, आप इसे Kafka में डेटा स्ट्रीमिंग आरंभ करने के लिए शुरू कर सकते हैं:

docker compose --profile manual up -d

प्राप्त डेटा का उदाहरण

एक बार जब Wikimedia स्ट्रीम Kafka में बहने लगती है, तो आप JSON फ़ॉर्मेट में संदेश प्राप्त करने लगेंगे। चलिए एक सामान्य संदेश का अवलोकन करते हैं ताकि हम उस डेटा संरचना को समझ सकें जिस पर हम कार्य कर रहे होंगे:

{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
    "request_id": "66d1686b-500e-438c-8985-8c7a28295ae8",
    "id": "345ce42e-3cac-46b7-901e-2c3161f53436",
    "dt": "2024-12-10T16:30:32Z",
    "domain": "es.wikipedia.org",
    "stream": "mediawiki.recentchange",
    "topic": "codfw.mediawiki.recentchange",
    "partition": 0,
    "offset": 1313008266
  },
  "id": 323817817,
  "type": "edit",
  "namespace": 2,
  "title": "Usuario:Davicilio/Taller",
  "title_url": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
  "comment": "/* Uniforme titular */",
  "timestamp": 1733848232,
  "user": "Davicilio",
  "bot": false,
  "notify_url": "https://es.wikipedia.org/w/index.php?diff=164049521&oldid=164010074",
  "minor": false,
  "length": {
    "old": 29666,
    "new": 29691
  },
  "revision": {
    "old": 164010074,
    "new": 164049521
  },
  "server_url": "https://es.wikipedia.org",
  "server_name": "es.wikipedia.org",
  "wiki": "eswiki",
  "parsedcomment": "<span class=\"autocomment\"><a href=\"/wiki/Usuario:Davicilio/Taller#Uniforme_titular\">→<bdi>Uniforme titular</bdi></a></span>"
}

Manticore में डेटा के साथ काम करना

अब जब हमारे पास Kafka में Wikimedia स्ट्रीम से डेटा प्राप्त हो रहा है, चलिए Manticore Search को इस डेटा को संसाधित और अनुक्रमित करने के लिए कॉन्फ़िगर करते हैं।

एक डेटा स्रोत बनाना

आइए हम एक SOURCE बनाएं जो Kafka से डेटा पढ़ेगा। हम केवल उन फ़ील्ड्स को निर्दिष्ट करेंगे जिनमें हम रुचि रखते हैं—अन्य फ़ील्ड्स को अनदेखा किया जाएगा। यदि कोई फ़ील्ड स्कीमा में है लेकिन संदेश से गायब है, तो इसे NULL पर सेट किया जाएगा या खाली छोड़ा जाएगा, डेटा प्रकार के आधार पर:

docker compose exec manticore mysql -e "
CREATE SOURCE wiki_source (
  id bigint,
  schema '\$schema' text,
  meta json,
  type text,
  namespace int,
  title text,
  title_url text,
  comment text,
  \`timestamp\` timestamp,
  user text,
  bot bool,
  minor bool,
  length json,
  server_url text,
  server_name text,
  wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"

व्याख्याएँ:

  • CREATE SOURCE - डेटा स्रोत बनाने के लिए आदेश।
  • (id bigint, schema '$schema' text, …) - आने वाले संदेश से फ़ील्ड्स की सूची, जो Manticore द्वारा समर्थित डेटा प्रकारों से मैप की गई है ( डेटा प्रकारों की सूची )।
    • फ़ील्ड $schema - Manticore फ़ील्ड नामों में विशेष वर्णों की अनुमति नहीं देता है, इसलिए हम प्राथमिक मैपिंग का उपयोग करते हैं:
      new_name 'original_name' type
      
      • new_name — Manticore-संगत फ़ील्ड नाम।
      • original_name — मूल JSON कुंजी, जिसमें विशेष वर्ण शामिल हो सकते हैं। यदि आवश्यक हो, एकल उद्धरण के लिए \' का उपयोग करें।
  • type=kafka - Kafka को डेटा स्रोत के रूप में निर्दिष्ट करता है।
  • broker_list='kafka:9092' — संदेश ब्रोकरों की सूची, कॉमा से अलग।
  • topic_list='wikimedia' — पढ़ने के लिए टॉपिक्स की सूची, कॉमा से अलग।
  • consumer_group='ms_wikimedia' — उपभोक्ता समूह का नाम।
  • num_consumers='1' — संदेशों को संभालने वाली प्रक्रियाओं की संख्या (आमतौर पर विषय के भागों की संख्या के बराबर या उसका पूर्णांक होगा)।
  • batch=200 — संदेशों के प्रसंस्करण के लिए बैच आकार, प्रदर्शन को प्रभावित करता है और व्यक्तिगत रूप से समायोजित किया जाता है।

परिणाम तालिका बनाना

हमने Kafka से पढ़ने के लिए एक डेटा स्रोत बनाया है, लेकिन हमें उस डेटा के लिए एक गंतव्य की आवश्यकता है। चलिए संसाधित संदेशों को संग्रहीत करने के लिए एक तालिका बनाते हैं:

एक कुंजी फ़ील्ड संदेश ID है। डेटा संचरण के दौरान, नेटवर्क विफलताओं, Kafka ब्रोकर क्रैश, या Manticore डाउनटाइम जैसी समस्याएं डुप्लीकेट संदेश उत्पन्न कर सकती हैं। डुप्लिकेट से बचने के लिए, हम एक अद्वितीय ID का उपयोग करते हैं: यदि तालिका में कोई रिकॉर्ड पहले से मौजूद है, तो इसे छोड़ दिया जाता है।

ID के अलावा, तालिका में type, title, title_url, comment, timestamp, user, bot, minor, length, server_url, server_name, wiki, और meta जैसे फ़ील्ड शामिल होंगे।

docker compose exec manticore mysql -e "create table wiki_results (
  id bigint, 
  \`schema\` text, 
  metadata json, 
  type text, 
  namespace int, 
  title text, 
  title_url text, 
  comment text, 
  \`timestamp\` timestamp, 
  user string, 
  bot bool, 
  minor bool, 
  length_old int, 
  length_new int, 
  length_diff int, 
  server_url text, 
  server_name text, 
  wiki text, 
  received_at timestamp
)"

मैंटीकोर सर्च इंस्टॉल करें

मैंटीकोर सर्च इंस्टॉल करें