Apache Kafka for Streaming Real-Time Data

By:   |   Updated: 2024-10-29   |   Comments   |   Related: > Cloud Strategy


Problem

We are in a world of fast and quick decision-making. Data gets generated and distributed in real-time, making decision-makers search for reliability and robust solutions to handle millions and billions of data generated daily. Retail industries and other business-to-customer enterprises have been at the forefront of implementing such technology for everyday use.

Solution

As billions of data get generated daily, it is important to have a solution that can seamlessly handle large volumes of data with multiple parallel processing power capabilities. This need led to the creation of Apache Kafka (The Backbone of Real-Time Data Processing) by LinkedIn, a popular social network for professionals to connect and share ideas.

Section 1: Kafka Fundamentals

This section aims to help understand Kafka fundamentals and its different components.

What is Apache Kafka?

Apache Kafka is a distributed open-source platform from the Apache Software Foundation for streaming that is also designed to handle real-time data feeds. It forms the basis for many current applications, particularly those that require some form of data processing and analysis in real-time.

Kafka Architecture

Kafka Architecture

Kafka Key Features

  • Real-Time Data Processing: Kafka makes it possible to process massive amounts of data in real-time, giving businesses the ability to obtain insights and make decisions based on data swiftly.
  • Decoupling of Microservices: Kafka facilitates the decoupling of microservices by offering an asynchronous messaging system for inter-service communication.
  • Scalable Data Pipelines: Kafka is perfect for creating scalable data pipelines that can manage growing volumes of data over time because of its fault-tolerant, high performance and scalable architecture.
  • Reliability and Durability: Its fault-tolerant design makes sure that data is preserved even in the case of network or device failures. It also stores data on disk, making it a dependable and long-lasting storage method.
  • Data Integration with Big Data Ecosystems: End-to-end data processing pipelines are made possible by Kafka's good integration with other big data technologies, including Apache Spark and Apache Hadoop.

Industry Use Cases of Apache Kafka

Apache Kafka is versatile, making it a popular choice in numerous industries.

  • Online Shopping/E-commerce
    • Real-time recommendations: Making tailored suggestions based on an analysis of consumer behavior. Using algorithms like cosine similarities in machine learning.
    • Order processing includes handling inventories, tracking shipments, and processing orders.
    • Fraud detection: Quickly spotting questionable activity. This is based on the trading/purchase pattern of customers.
  • Banking and Related Services
    • Processing market data involves managing large amounts of data in real time.
    • Risk management means keeping an eye on risk variables and setting off alarms.
    • Trade execution is the act of handling and verifying deals.
  • I.T.
    • IoT device data collection and analysis are known as sensor data processing.
    • Real-time monitoring: Tracking the functionality and health of the equipment.
    • Predicting equipment failures using sensor data is known as predictive maintenance.
  • Medical Care
    • Integration of electronic health records (EHR):Combining information from several medical systems.
    • Real-time patient monitoring: keeping an eye on the health of the patient and sending out alarms.
    • Data processing and analysis for clinical trials is known as clinical research.
  • Gaming
    • Games are played in real-time with numerous players: synchronizing the game state.
    • Analytics: Examining the preferences and actions of players.
    • Messages and chat: Managing communication within games.

Apache Kafka Component

Apache Kafka, being a distributed streaming platform, is made of several components that enable it to function as expected.

Kafka Broker. The Kafka Brokers are the fundamental building blocks of a Kafka cluster. They oversee keeping track of, duplicating, and providing customers with data. Since each broker is a separate server, adding or removing it from the cluster won't have an impact on the system.

Kafka Broker

Kafka Topics. Topics are the core building block for the Apache Kafka distributed streaming platform's data for organizations. They act as sensible routes for message publication and subscription. Think of them as virtual message boards where producers can publish messages and consumers can subscribe to receive those messages.

Kafka Topics

Kafka Partitions. This is an essential data component that makes up a Kafka topic. They function as well-organized message sequences, which provide the Kafka platform with its scalability, parallelism, and fault tolerance. This gives each partition the ability to be processed independently by a consumer, enabling parallel processing and improving performance.

Kafka Partition

Kafka Producers. The Producers are client applications that publish (write) data to Kafka topics. They can select which partition to send messages to, frequently based on a key to preserve order, and they can send messages to specific themes. To maximize performance, producers might additionally serialize and compress messages before transmitting them.

Kafka Producer

Kafka Consumer. Consumers are applications that subscribe to Kafka topics and process the streams of records produced. They oversee managing their offsets to keep track of which messages have been handled, and they can read from one or more topics. Customers in Kafka might behave both individually and collectively.

Kafka Consumers

Kafka Consumer Group. Consumer groups facilitate group reading of a certain topic by several customers. Because each consumer in a group reads from a distinct set of partitions, messages are processed concurrently and duplicate-free. This approach improves load balance and scalability.

Consumer Group

Kafka Zookeeper. ZooKeeper is a centralized service to oversee and manage the Kafka brokers. It takes care of things like managing configuration data, monitoring broker status, and electing leaders for partitions. Although Kafka has been working to become less reliant on ZooKeeper, it is still an essential part of many deployments.

Zookeeper

Kafka Connect. A tool used for integrating Kafka with other systems is called Kafka Connect. It makes data streaming into and out of Kafka easier.

Two categories of connections exist:

  1. Source connectors: These insert information into Kafka topics from other systems (such as message queues or databases).
  2. Sink connectors: These write data to other systems (such as databases or data lakes) using information from Kafka topics.

Kafka Stream. A Java package called Kafka Streams makes it possible to process data streams in real time. It enables programmers to create Kafka-capable apps that can generate, process, and consume data. Filtering, aggregating, and joining streams are just a few of the activities that Kafka Streams can offer, making it an effective tool for stream processing.

Kafka Connect

Confluent: Kafka Stream

Kafka ksqlDB. ksqlDB, an event streaming database that builds on Kafka Streams, enables users to do stream processing with syntax to SQL. For users who are comfortable with SQL, it makes stream processing application development simpler and easier to implement real-time analytics and data transformations.

ksqlDB Architecture

ksqlDB Architecture

Kafka Offset. An offset in Apache Kafka is a distinct number that is linked to every message inside a particular topic partition. It allows users to monitor how far along they are in reading messages by indicating where a message is in the partition's log. Every message posted to a Kafka topic has a sequential offset, which is an integer value beginning at zero, added to the end of its partition.

Kafka Replication. One important component that improves the resilience and accessibility of data in a Kafka cluster is Kafka. It permits the system to continue operating and guarantees that messages are not lost in the event of broker failures. This image is a description of the elements of Kafka replication and its importance.

Kafka Replication

Confluent: Kafka Replication

Kafka Controller. An essential feature of an Apache Kafka cluster is the Kafka Controller, which oversees the status of replicas and partitions, supervises administrative duties, and guarantees the cluster's general health.

Kafka Controller

Confluent: Kafka Controller

Section 2: Installation of Kafka and Kafka Basic

In this section, we will install Kafka and Zookeeper using the Docker container. We should note that a new feature Kafka is being released that requires less need for Zookeeper.

Note: With Kafka version 2.8, the KRaft (Kafka Raft) consensus mechanism will replace ZooKeeper, enabling Kafka to handle metadata natively without the requirement for ZooKeeper.

Prerequisite

To successfully configure and install Kafka and Zookeeper, the following are needed:

  • Docker Container - for containerization
  • Basic Python knowledge
  • Basic understanding of command line prompt
  • Virtual Environment (We are using Windows Server, but any OS will work fine.)
  • Be open-minded and let's build something amazing.
  • Your favorite Code ID (I will be using VSCode.)

What is a Docker Container?

A Docker Container is a lightweight, standalone virtualization technology that executes packages, including everything needed to run a piece of software: the code, runtime, libraries, and system tools. Docker images are the foundation upon which containers are constructed.

Docker Container Architecture

Docker Architecture

Characteristics of Docker Containers

  • Isolation: Every Docker container operates independently in a separate setting. This indicates that a container's software doesn't affect other containers or the host system. Although they function separately, containers use the same operating system kernel.
  • Portability: Regardless of the underlying hardware or operating system, Docker containers can operate on any machine that has the Docker engine installed. Because of this mobility, developers may create apps that work reliably in a variety of settings, such as cloud servers and local development PCs.
  • Efficiency: Because containers do not need a whole operating system to function, they are lighter than conventional virtual machines (VMs). Rather, they share the host operating system kernel, which lowers resource usage and enables quicker startup times.
  • Scalability: Docker containers work well with microservices architectures, which divide large programs into smaller, more manageable services that can be independently created, deployed, and scaled.

Components of Docker Container

Docker containers are made up of a few essential parts that combine to give apps a portable, isolated, and lightweight environment to execute.

  • Docker Image. The application code, dependencies, libraries, and other files required to operate the application are all contained in a read-only template called a Docker image. Images are constructed using a Dockerfile, which contains the instructions for producing the image and is used to create containers.
  • Container Layer. A writable layer is added on top of the image layers when a Docker image is used to create a container. This layer of the container allows modifications to be made to files and new installations to be made during the runtime of the container. The application runs at the container layer.
  • Docker Daemon. A persistent process called Docker daemon (dockerd) oversees handling container objects and managing Docker containers. It runs commands to launch, control, and construct containers as it waits for requests from the Docker API.
  • Docker Client. Users can communicate with the Docker daemon via the command-line interface (CLI) provided by the Docker client, or Docker. It offers instructions for creating, launching, and maintaining networks, images, and Docker containers.
  • Docker Network. Virtual networks called Docker networks are used to link several containers. They make it possible for containers to talk to the host system and each other. A variety of network drivers, including bridge, host, and overlay, are offered by Docker to accommodate various networking needs.
  • Docker Volume. Data created by a container is stored in Docker volumes. They are kept out of the writable layer of the container and are not affected by the lifecycle of the container. Volumes can be used to store data that must remain after the container is removed or to exchange data between containers.
  • Docker Registry. A repository for sharing and keeping Docker images is called a Docker registry. Users can download (“pull”) and upload (“push”) images with it. Users can create private registries for their own images in addition to the public registry that is set up by default, Docker Hub.
  • Dockerfile. An instruction script for creating a Docker image is called a Dockerfile. It creates the environment variables, installs dependencies, copies the application code, specifies the base image, and defines the command to launch the application. A Dockerfile can be used to create an image using the docker build command.

Installation of Docker Desktop (for Windows Server - Optional)

Before we start installing Docker on our Windows server, we need to set up some requirements.

Step 1: Install Windows Subsystem for Linux (WSL) Ubuntu for Windows

WSL is a feature introduced in Windows 10 that allows users to run Linux binaries directly within the Windows environment. It offers an effective and potent method of gaining access to Linux programs, tools, and CLI without requiring an additional Linux installation.

In your Windows Server, open your Command Prompt and select Run as administrator. This should open a command prompt for you.

Command Prompt

In the CMD, type the word wsl - -install. This will install the Ubuntu subnet on your Windows server. You will notice from the image below that Ubuntu is already installed on my Windows server. If you are installing it for the first time, it might take a couple of minutes to download.

Install WSL

Step 2: Download Docker Desktop for Windows

Docker has different versions for different operating systems, but we will be using Docker Desktop for Windows. Follow the official link and download the .exe file. Then, follow the installation step.

Download Docker

If successfully installed, you should see the image below.

Docker Desktop Interface

To perform further investigation using your command prompt, use the command below to list all containers, including those that are stopped or paused.

docker ps -a
List of Running Containers

The image above shows that no container is currently created, but Docker works as expected.

Install Kafka and Zookeeper in the Docker Container

The following steps should be taken to set up Apache Kafka and Zookeeper in the Docker container.

Step 1: Create a Project Virtual Environment

Creating virtual environments for Python projects is a best practice that helps to maintain project isolation, manage dependencies, ensure reproducibility, facilitate experimentation, and improve collaboration.

Let's start by navigating to the desired environment we want to do our project. Use the change directory (cd) command to go to your preferred directory on your Windows server.

cd desktop

On our desktop, create a folder to use for the entire project (mkdir mssqltips_kafka). After successfully creating the folder, change the directory to the folder.

In the folder, create the Python Virtual environment by using the command below.

python -m venv mssqltips_kafka_env

Lastly, activate the created virtual environment with this command:

.\mssqltips_kafka_env\Scripts\activate
Starting VSCode

Open VSCode with this command code.

VSCode

Step 2: Set Docker-compose YAML File

In the left pane of VSCode, let's create a docker-compose.yml file inside the folder directory. Docker Compose is a tool that allows you to define and run multi-container Docker applications. It makes use of a YAML file called docker-compose.yaml to specify the dependencies and services (containers) that comprise your application.

Create Yaml File

In the docker-compose.yml file, paste the command below.

version: '3.1'
 
services:
 
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

Explanation

This Docker Compose file is used to set up a Kafka cluster and a single instance of Zookeeper.

Service Section:

The services section defines the containers that will be created as part of this Docker Compose application. In this case, it includes one Zookeeper service and one Kafka service.

Zookeeper Service:

zookeeper:
   image: wurstmeister/zookeeper 
   container_name: zookeeper
   ports:
     - "2181:2181"
  • zookeeper: This is the term used to identify the defined service.
  • image: Defines the Docker image that this service will utilize. In this case, it makes use of the well-liked Docker image wurstmeister/zookeeper, which runs Zookeeper.
  • container_name: Assigns the container with a unique name that will facilitate future reference.
  • ports: Connects host port 2181 to container port 2181. External apps can connect to the Zookeeper instance over this port, which is used by Zookeeper for client connections.

Kafka Service:

kafka:
   image: wurstmeister/kafka
   container_name: kafka
   ports:
     - "9092:9092"
   environment:
     KAFKA_ADVERTISED_HOST_NAME: localhost 
     KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 
  • kafka: This designates the Kafka service that is being described.
  • image: For the Kafka broker, use the wurstmeister/kafka image.
  • container_name: Assigns the name “kafka” to the container.
  • ports: Connects host port 9092 to container port 9092. The Kafka broker can be accessed from the outside using this port.
  • environment: Describes environment variables needed to set up the Kafka broker.
    • KAFKA_ADVERTISED_HOST_NAME: This variable specifies how customers are advertised to by the Kafka broker. Since it is configured to localhost in this instance, clients establishing connections to Kafka will use localhost as the hostname.
    • KAFKA_ZOOKEEPER_CONNECT: The address of the Zookeeper service, to which the Kafka broker will establish a connection, is specified by the variable. As it is configured to zookeeper:2181, the Kafka broker should establish a connection with the Zookeeper instance that is operating on port 2181 within the zookeeper service.

Step 3: Start Docker Compose Services

Use docker-compose up -d to start all the services defined in a docker-compose.yaml file in detached mode.

In your command terminal, put in the docker command to start all services in the docker-compose.yml file. This should take a couple of minutes to download and start, depending on your internet speed.

Docker-Compose up

Step 4: Test and Confirm Container Creation

One essential Docker command that lets users list and manage running containers is docker ps. It gives important details about the containers that are running on the Docker host currently. The docker ps command, its arguments, and how to understand its results are explained below.

Confirm Container

To get a visual confirmation, open your Docker Desktop and check the container tab. In the container tab, you will notice the container mssqltips_kafka running.

Confirm Container Creation

In the Images section, you will notice the two images created through the docker-compose.yml file.

Docker Images

Create a Simple Producer and Consumer from Terminal

The necessary setup is complete for Zookeeper and Kafka in our Docker container. Let's create a simple streaming process and test all connections.

Create Kafka Topics

Step 1: Activate Windows Subsystem for Linux.

A named log that records a stream of messages or events is called a Kafka topic. It acts as a conduit for data between the topic's publishers and subscribers, who process and subscribe to the data.

In your VSCode terminal, write the wsl command for Ubuntu.

wsl -d Ubuntu

The wsl -d command Ubuntu is used to start a particular Linux distribution using WSL.

Activate WSL from CMD

Step 2: Understand Kafka and Zookeeper Commands.

Let's go a step deeper by understanding the different Kafka and Zookeeper commands. Follow the numbered commands from the image below to log into the necessary folder path.

Kafka Commands

The commands listed in the red box above are the commands we will be using while working with Kafka from our terminal.

Step 3: Create Kafka Topic from Terminal

Using the terminal of VSCode or any terminal, let's create a topic by using the following command.

kafka-topics.sh --create --topic simplemssqltips-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Kafka Topic
  • kafka-topics.sh: This is the script provided by Apache Kafka for managing topics.
  • create: This option indicates that you want to create a new topic.
  • topic simplemssqltips-topic: This specifies the name of the topic you want to create. In this case, the topic will be called test simplemssqltips-topic.
  • server-bootstrap localhost:9092: The address of the Kafka broker to connect to is specified by this parameter. The broker is operating on the local computer and listening on port 9092, i.e., local host:9092. The first point of contact between the Kafka client and the Kafka cluster is the bootstrap server.
  • partitions 1: The number of partitions for the new topic is indicated by this option. In this instance, 1 denotes the presence of a single partition for the topic. With partitions, Kafka can grow and disperse data over several brokers. However, if there is only one partition, all messages will be kept there.
  • replication-factor 1: This option establishes the topic's replication factor. When the replication factor is 1, it indicates that the data will only be replicated once. Because it lacks fault tolerance, this is best suited for development or testing environments, but not recommended for production. Replication factors of two or three are typically required in a production environment to guarantee the durability of data.

Other commands that can be done on the topics created are:

  • List All Topics
kafka-topics.sh --list --bootstrap-server localhost:9092
  • Describe a Particular Topic
kafka-topics.sh --describe --topic simplemssqltips-topic --bootstrap-server localhost:9092
Kafka topic Description

Create Kafka Producer

The Kafka producer is an application or service that sends data to a Kafka topic. It acts as the data source in a Kafka system. We will be sending data to the topic we created earlier with the command below.

kafka-console-producer.sh --topic simplemssqltips-topic --bootstrap-server localhost:9092

After writing this on your command terminal, you will be prompted to send a message, which will act as a producer being sent to the topic.

Create Kafka producer in CLI

Create Kafka Consumer

To consume the information being sent to the topic by the producer, use the command below. You can either open a new shell terminal in VSCode or use your command prompt. This time I will use the command prompt from my Windows server.

kafka-console-consumer.sh --topic simplemssqltips-topic --bootstrap-server localhost:9092 --from-beginning
Kafka Consumer CLI

From the image above, you will notice the consumer was able to consume the message from the topic.

Test Process

Let's view the terminal side-by-side to confirm the process. In the image below, the above view is the command prompt terminal and below is the terminal from VSCode. You will notice the message is received from the topic to the consumer as it is being produced.

View both Producer and Consumer

Create a Simple Producer and Consumer Using Python Scripts

Creating Producer Script

The following steps will help us achieve this:

Step 1: Import All Necessary Libraries

Before starting with the Python script, we need to install some libraries that will help to work better throughout the entire process. Start by creating a requirements.txt file in the same folder path as your docker-compose.yml.

Create a Requirement File

Use the command below to install all the required libraries.

pip install -r requirements.txt
Pip install requirement

Step 2: Create Producer Script

In the same folder path, create a new Python file named simple_producer.py. We will use this script to send data to the Kafka topic.

from kafka import KafkaProducer
 
# Define the Kafka topic and bootstrap server
topic_name = 'simplemssqltips-topic'
bootstrap_servers = 'localhost:9092'  # 'kafka' is the service name defined in docker-compose
 
# Initialize Kafka producer
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
 
# Message to send
message = b"Hello from Docker Kafka producer: I love Game of Thrones!"
 
# Send the message to the Kafka topic
producer.send(topic_name, message)
 
# Flush the producer to ensure all messages are sent
producer.flush()
 
# Close the producer connection
producer.close()
 
print("Message sent successfully!")

Code Breakdown:

Define Topic and Bootstrap Server

topic_name = 'simplemssqltips-topic'
bootstrap_servers = 'localhost:9092'
  • topic_name: Specifies the name of the Kafka topic to which messages will be sent.
  • bootstrap_servers: Defines the address of the Kafka broker (in this case, running on localhost at port 9092).

Initialize Kafka Producer and Define Message

producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
message = b"Hello from Docker Kafka producer: I love Game of Thrones"
  • Creates an instance of KafkaProducer, connecting it to the specified Kafka broker.
  • Defines the message to be sent to the Kafka topic. The message is a byte string (indicated by the b prefix).

Send Message to Kafka Topic

producer.send(topic_name, message)
  • Sends the defined message to the specified Kafka topic (simplemssqltips-topic).

Flush Producer and Close Producer Connection

producer.flush()
producer.close()
  • Ensures that all buffered messages are sent to the Kafka broker. This is important for guaranteeing that the message is transmitted before closing the producer.
  • Closes the connection to the Kafka broker, releasing any resources held by the producer.

Step 3: Run Python Script

After writing all necessary Python scripts, run the consumer script with the command below. You should get a success message if done correctly.

Run simple producer script

Creating Consumer Script

Using the same approach, we need to consume the message from the topic using a Python script.

Step 1: Create Python Consumer Script

Using the script below to read data from the kafka topic.
from kafka import KafkaConsumer
 
# Define the Kafka topic and bootstrap server
topic_name = 'simplemssqltips-topic'
bootstrap_servers = 'localhost:9092'  # Adjust as needed
 
# Initialize Kafka consumer
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest',  # Start reading from the beginning of the topic
    group_id='my-group'  # Consumer group ID (can be any string)
)
 
# Consume messages
print(f"Consuming messages from topic '{topic_name}'...")
for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')}")

Code Breakdown:

Initialize Kafka Consumer

consumer = KafkaConsumer(
   topic_name,
   bootstrap_servers=bootstrap_servers, 
   auto_offset_reset='earliest', # Start reading from the beginning of the topic
   group_id='my-group' # Consumer group ID (can be any string)
)
  • Creates an instance of KafkaConsumer configured to read from the specified topic.
  • Parameters:
    • auto_offset_reset='earliest': This option makes sure that the consumer group will read from the beginning of the subject (i.e., the earliest messages) if there are no previously committed offsets.
    • group_id='my-group': The customer group ID is specified by this argument. To ensure that only one member of the group processes each message, consumers who have the same group ID will divide up the reading load related to the topic.

Consume Message

print(f"Consuming messages from topic '{topic_name}'...")
for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')}")
  • To let the user know when they are beginning to read messages from the designated topic, a message is printed.
  • The consumer: loops for message function never stops searching the Kafka topic for fresh messages. The real content of a message is contained in message.value when it is received.
    • To produce a human-readable result, decode('utf-8') transforms the byte string message into a regular string using UTF-8 encoding.
  • Every communication that is received is shown on the terminal.

Step 2: Test Consumer Script

Using the script created above, run it in the VSCode terminal. You should get the message sent with the producer.

Simple Consumer Script

Section 3: Create a Mini Production Level Streaming Project with Apache Kafka

In this section, we are going to create a mini-streaming project using Apache Kafka.

Project Architecture

For this project, we will connect to Wikimedia Stream API, stream the data to Kafka Topic, and consume the data by storing it in ElasticSearch. ElasticSearch will serve as our storage layer where the stream of NoSQL data will be kept for faster query time. Lastly, we will use Kibana in ElasticSearch to visualize streamed data.

Project Architecture

Project Architecture

Update Docker-Compose YAML File

Let's update the docker-compose.yml file in our VSCode with the necessary command. This will make our Kafka cluster production ready.

version: '3.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  
  kafka1:
    image: wurstmeister/kafka
    container_name: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka1:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 1
 
  kafka2:
    image: wurstmeister/kafka
    container_name: kafka2
    ports:
      - "9094:9094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka2:9095,OUTSIDE://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9095,OUTSIDE://0.0.0.0:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 2
 
  kafka3:
    image: wurstmeister/kafka
    container_name: kafka3
    ports:
      - "9096:9096"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka3:9097,OUTSIDE://localhost:9096
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9097,OUTSIDE://0.0.0.0:9096
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 3
  es01:
    image: "docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2"
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      node.name: es01
      discovery.seed_hosts: es01,es02,es03
      cluster.initial_master_nodes: es01,es02,es03
      cluster.name: mycluster
      bootstrap.memory_lock: "true"
      ES_JAVA_OPTS: -Xms256m -Xmx256m
    volumes:
      - "es-data-es01:/usr/share/elasticsearch/data"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test: ["CMD-SHELL", "curl http://localhost:9200"]
      interval: 10s
      timeout: 10s
      retries: 120
  es02:
    image: "docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2"
    ports:
      - "9201:9200"
      - "9301:9300"
    environment:
      node.name: es02
      discovery.seed_hosts: es01,es02,es03
      cluster.initial_master_nodes: es01,es02,es03
      cluster.name: mycluster
      bootstrap.memory_lock: "true"
      ES_JAVA_OPTS: -Xms256m -Xmx256m
    volumes:
      - "es-data-es02:/usr/share/elasticsearch/data"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test: ["CMD-SHELL", "curl http://localhost:9200"]
      interval: 10s
      timeout: 10s
      retries: 120
  es03:
    image: "docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2"
    ports:
      - "9202:9200"
      - "9303:9300"
    environment:
      node.name: es03
      discovery.seed_hosts: es01,es02,es03
      cluster.initial_master_nodes: es01,es02,es03
      cluster.name: mycluster
      bootstrap.memory_lock: "true"
      ES_JAVA_OPTS: -Xms256m -Xmx256m
    volumes:
      - "es-data-es03:/usr/share/elasticsearch/data"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test: ["CMD-SHELL", "curl http://localhost:9200"]
      interval: 10s
      timeout: 10s
      retries: 120
  kibana:
    image: docker.elastic.co/kibana/kibana-oss:7.10.2
    depends_on:
      es01:
        condition: service_healthy
      es02:
        condition: service_healthy
      es03:
        condition: service_healthy
    ports:
      - "5601:5601"
    environment:
      - 'ELASTICSEARCH_HOSTS=["http://es01:9200","http://es02:9200","http://es03:9200"]'
volumes:
  es-data-es01:
  es-data-es02:
  es-data-es03:

Code Breakdown:

  • Zookeeper: The wurstmeister/zookeeper image is used to construct a single Zookeeper container. It uses port 2181 to listen.
  • Kafka: The wurstmeister/kafka image is used to build three Kafka containers. Every Kafka container listens on a distinct port and is assigned a unique KAFKA_BROKER_ID:
    • Ports 9092 (outside) and 9093 (inside) are where Kafka1 listens.
    • Ports 9094 (outside) and 9095 (inside) are where Kafka2 listens.
    • Ports 9096 (outside) and 9097 (inside) are where Kafka3 listens.
  • Zookeepers can be accessed by all Kafka containers via zookeeper:2181.
  • Elasticsearch: The docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2 image is used to build three Elasticsearch containers. They use ports 9200 (HTTP), 9300 (transport), and 9201/9202 (HTTP for es02 and es03) and construct a cluster called mycluster. Memory limitations and health checks are configured into the containers.
  • Kibana: Using the docker.elastic.co/kibana/kibana-oss:7.10.2 image, a single Kibana container is created. It is set up to connect to the Elasticsearch cluster using the ELASTICSEARCH_HOSTS environment variable and listens on port 5601.
  • Volumes: To store Elasticsearch data for every node, three named volumes (es-data-es01, es-data-es02, and es-data-es03) are defined.

Rebuild Docker

We need to rebuild the entire docker-compose.yml file. Let's start by stopping the entire docker running in the container.

Stop All Running Docker Containers.

docker stop $(docker ps -aq)

Remove Unused Docker Resources, Including Images, Containers, Networks, and Volumes.

docker system prune -a
Understanding Docker Commands

Rebuild Docker Compose YAML File. This will take a couple of minutes depending on your internet speed.

docker-compose up -d
Docker Compose up

Check Running Docker Containers. This is to check all running containers in docker.

docker ps
List running docker

Check All Images.

docker images
View Docker Images

Test Kibana. Let's test the Kibana image that will be used for data visualization and querying of ElasticSearch. Open your browser and put in the link http://localhost:5601. The image indicates that Kibana works as expected in our local host.

Kibana and ElasticSearch View

Create Kafka Topic for Production

Option 1. Use the host machine's localhost address: Instead of using the internal Docker network hostnames, you can use the exposed ports on localhost:

kafka-topics.sh --create --topic wikimedia-mssqltips-topic --bootstrap-server localhost:9092,localhost:9094,localhost:9096 --partitions 3 --replication-factor 2
View Kafka Topics

Option 2. Run the command from inside one of the Kafka containers. This way, you'll be inside the Docker network and can use the internal hostnames:

docker exec -it kafka1 kafka-topics.sh --create --topic wikimedia-mssqltips-topic --bootstrap-server kafka1:9093,kafka2:9095,kafka3:9097 --partitions 3 --replication-factor 2
Create Kafka Topic

Describe Kafka Topics. By using the command below, you can describe the Kafka topic to get more information.

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic wikimedia-mssqltips-topic
Kafka Topics Description

Delete Topics. You can delete unwanted topics in Kafka by using the following command:

kafka-topics.sh --delete --topic wikimedia-mssqltips2-topic --bootstrap-server localhost:9092
Topic List

Create Producer Script

Now that we have our Kafka script up and running, we need to create a Python script that will be able to stream data from the WikimediaStream site and send it to the Kafka Cluster Topic.

import requests
import time
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
 
# Wikimedia Recent Changes Stream URL
url = 'https://stream.wikimedia.org/v2/stream/recentchange'
 
# Updated bootstrap servers to match the new Docker Compose configuration
# bootstrap_servers = ['kafka1:9093', 'kafka2:9095', 'kafka3:9097']
bootstrap_servers = ['localhost:9092', 'localhost:9094', 'localhost:9096']
 
topic_name = 'wikimedia-mssqltips-topic'
 
# Set up Kafka producer with JSON serialization and reliability features
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',  # Wait for all in-sync replicas to acknowledge the write
    retries=10,  # Number of retries if the initial produce request fails
    retry_backoff_ms=1000  # Wait 1 second between retries
)
 
def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
 
def consume_stream(url, run_duration, pause_duration):
    try:
        while True:
            start_time = time.time()
            print(f"Starting stream consumption for {run_duration} seconds...")
            
            with requests.get(url, stream=True) as response:
                if response.status_code == 200:
                    print("Connected to the Wikimedia stream...")
                    for line in response.iter_lines():
                        # Stop after the run duration
                        if time.time() - start_time > run_duration:
                            break
                        if line:
                            try:
                                # Decode the line from byte to string
                                decoded_line = line.decode('utf-8')
                                
                                # Process only 'data' lines that contain the JSON payload
                                if decoded_line.startswith("data:"):
                                    # Remove the leading 'data: ' and strip whitespace
                                    json_data = json.loads(decoded_line[5:].strip())
                                    
                                    # Print the JSON data for verification
                                    print(json_data)
                                    
                                    # Send JSON data to Kafka topic
                                    future = producer.send(topic_name, json_data)
                                    future.add_callback(delivery_report)
                            
                            except json.JSONDecodeError as e:
                                print(f"Error decoding JSON: {e}")
                            except Exception as e:
                                print(f"Error processing line: {e}")
                else:
                    print(f"Failed to connect: {response.status_code}")
            
            print(f"Pausing for {pause_duration} seconds...")
            time.sleep(pause_duration)  # Pause for the specified duration
    
    except KeyboardInterrupt:
        print("Process terminated.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        print("Exiting...")
        producer.flush()  # Ensure any buffered messages are sent
        producer.close()
 
# Start consuming the stream
consume_stream(url, run_duration=60, pause_duration=30)  # Run for 1 minute (60 seconds) and pause for 30 seconds

Code Breakdown

Kafka Producer Setup:

producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',
    retries=10,
    retry_backoff_ms=1000
)

Sets up a Kafka producer initially with the following settings:

  • bootstrap_servers: Ties in with the designated Kafka brokers.
  • value_serializer: Encodes the message value into UTF-8 bytes and serializes it to JSON format.
  • acks='all': This makes sure that the producer doesn't consider a message successfully transmitted until it has been acknowledged by every in-sync replica.
  • Retries=10: Indicates how many times to try again in case the first send doesn't go through.
  • Retry_backoff_ms=1000: Creates a one-second interval before trying again.

Delivery Report Callback Function:

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

Following the sending of a message to Kafka, this function is called. It prints the delivery status and does an error check.

Start Consuming:

consume_stream(url, run_duration=60, pause_duration=30)

The consume_stream function is called with the parameters to run for 60 seconds, pause for 30 seconds, and repeat.

Run Producer Script. From the image, you will notice the data are being streamed in real-time and sent to the Kafka topic.

Create Kafka Producer Wikistream

Confirm Streamed Data by Using CLI. Let's consume the data from the Kafka topic by using our CLI in our Windows server. You will notice from the image the total number of messages streamed was 2,401.

kafka-console-consumer.sh --topic wikimedia-mssqltips-topic --from-beginning --bootstrap-server localhost:9092,localhost:9094,localhost:9096
Producer Output

Create Consumer Script

This is the final step. We are going to create a consumer script and push the data to ElasticSearch for storage.

Create ElasticSearch Index. Creating an index in ElasticSearch is essential for efficient data management, search performance, and customization.

We created an index for ElasticSearch with the necessary mappings for data quality and checks.

curl -X PUT -x "" "http://localhost:9200/eventstream_mssqltips_kibana" -H "Content-Type: application/json" -d'
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "id": {"type": "keyword"},
      "type": {"type": "keyword"},
      "namespace": {"type": "integer"},
      "title": {"type": "text"},
      "title_url": {"type": "keyword"},
      "comment": {"type": "text"},
      "timestamp": {"type": "date"},
      "user": {"type": "keyword"},
      "bot": {"type": "boolean"},
      "notify_url": {"type": "keyword"}
    }
  }
}'
Create ElasticSearch Index

Install Necessary Library. We need to install the ElasticSearch library before running the consumer script. See the numbered steps in the image below.

Requirements Update

Consumer Script. Create a new Python file with the script below. The script is meant to filter out some necessary parts (mapping) and push them to the ElasticSearch Index.

from kafka import KafkaConsumer
import json
import time
import logging
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
 
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
# Print versions for debugging
import elasticsearch
import kafka
logger.info(f"Elasticsearch version: {elasticsearch.__version__}")
logger.info(f"Kafka-python version: {kafka.__version__}")
 
bootstrap_servers = ['localhost:9092', 'localhost:9094', 'localhost:9096']
 
consumer = KafkaConsumer(
    'wikimedia-mssqltips-topic',
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='python-consumer-group-1',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    api_version_auto_timeout_ms=60000,
    request_timeout_ms=65000
)
 
# Updated Elasticsearch client configuration
es = Elasticsearch(
    ['http://localhost:9200', 'http://localhost:9201', 'http://localhost:9202'],
    retry_on_timeout=True,
    max_retries=10,
    timeout=30
)
 
logger.info(f"Consumer created with configuration: {consumer.config}")
 
assignment_start_time = time.time()
assignment_timeout = 60
 
while not consumer.assignment():
    if time.time() - assignment_start_time > assignment_timeout:
        logger.error("Timed out waiting for partition assignment.")
        consumer.close()
        exit(1)
    time.sleep(1)
    consumer.poll(timeout_ms=1000)
    logger.info("Waiting for partition assignment...")
 
logger.info(f"Consumer assigned partitions: {consumer.assignment()}")
 
def generate_actions(messages):
    for message in messages:
        event_data = message.value
        filtered_data = {
            'id': event_data.get('id'),
            'type': event_data.get('type'),
            'namespace': event_data.get('namespace'),
            'title': event_data.get('title'),
            'title_url': event_data.get('title_url'),
            'comment': event_data.get('comment'),
            'timestamp': event_data.get('timestamp'),
            'user': event_data.get('user'),
            'bot': event_data.get('bot'),
            'notify_url': event_data.get('notify_url')
        }
        yield {
            "_index": "eventstream_mssqltips_kibana",
            "_source": filtered_data
        }
 
try:
    message_count = 0
    start_time = time.time()
    while True:
        message_batch = consumer.poll(timeout_ms=1000)
        
        if not message_batch:
            if time.time() - start_time > 60:
                logger.info("No messages received in the last 60 seconds. Still waiting...")
                start_time = time.time()
            continue
 
        for tp, messages in message_batch.items():
            message_count += len(messages)
            logger.info(f"Received {len(messages)} messages. Total count: {message_count}")
            
            try:
                actions = list(generate_actions(messages))
                success, _ = bulk(
                    es, 
                    actions,
                    raise_on_error=False,
                    raise_on_exception=False
                )
                logger.info(f"Indexed {success} documents successfully.")
            except Exception as e:
                logger.error(f"Error during bulk indexing: {e}")
 
except KeyboardInterrupt:
    logger.info("Consumer stopped by user.")
except Exception as e:
    logger.error(f"An unexpected error occurred: {e}")
finally:
    consumer.close()
    logger.info("Consumer closed.")

Test Consumer Script. We have all the requirements done. Let's run the Python script and confirm if it is sending to ElasticSearch Index.

python WikimediaStream_consumer_mssqltips.py
Consumer WikiStream

From the image above, we can confirm the producer successfully got the data from the Kafka topic and sent it to the ElasticSearch Index.

ElasticSearch Command

In your ElasticSearch http://localhost:5601, let's write the following command and explore the stored data.

Get Index Information. You can either use the VSCode terminal or navigate to the ElasticSearch URL.

curl -X GET -x "" "localhost:9200/eventstream_mssqltips_kibana?pretty"
View Index
Kibana Index View

Get 10 Records.

curl -X GET "http://localhost:9200/eventstream_mssqltips_kibana/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match_all": {}
  },
  "size": 10
}'
Index Filter
Index Filter Kibana

Conclusion

In this article, we have learned all about Apache Kafka, including setting up Apache Kafka and Zookeeper in a Docker container. We covered different concepts varying from basic to intermediate.

A mini project was completed to pull live data from a streaming platform, like Wikimedia Stream, taking the data through Kafka Cluster to ElasticSearch for storage and querying. This article is the first part of a three-part series, and we will be continuing onto other streaming projects in our next article. All code and resources used for this project will be made available via this Zip file.

Next Steps


sql server categories

sql server webinars

subscribe to mssqltips

sql server tutorials

sql server white papers

next tip



About the author
MSSQLTips author Temidayo Omoniyi Temidayo Omoniyi is a Microsoft Certified Data Analyst, Microsoft Certified Trainer, Azure Data Engineer, Content Creator, and Technical writer with over 3 years of experience.

This author pledges the content of this article is based on professional experience and not AI generated.

View all my tips


Article Last Updated: 2024-10-29

Comments For This Article

















get free sql tips
agree to terms