Build a Streaming Data Architecture with Apache Kafka and Zookeeper

Updated: 2024-11-11


Problem

Billions of data records are generated on a daily basis. A system that captures this data in real time and moves it to its destination is therefore of high importance. This has led to heavy investment in different data infrastructures, either enterprise-based or modern data platforms. In this article, we look at how to use Apache Kafka and Zookeeper to process and load streaming data.

Solution

The introduction of Apache Kafka has made real-time data streaming possible for most organizations. As data is generated by different producers, it reaches its various destinations in real time, where business decisions can be made.

Project Architecture

This article is purely project-based. To fully grasp the different concepts covered here, I recommend reading the previous article titled Understanding Apache Kafka for Streaming, which covers the installation of Docker Desktop, the importance of Kafka, and how it is used in today's industry.

In this project, we will stream data in real time from a site called Random User to Apache Kafka, which is managed by Zookeeper. We will then create consumer scripts that deserialize the data from the Kafka topic and send it to both an Elasticsearch index and Azure Data Lake Gen 2. This is where the data team consumes the data and performs descriptive and predictive analyses.

Project Architecture

Prerequisite

To get a full grasp of the project, readers should meet the following requirements:

  • Basic Python knowledge.
  • Docker Desktop and WSL (Windows Users) installed.
  • Basic understanding of Docker.
  • Basic understanding of Azure.
  • Read our previous article on Apache Kafka.

Preparing Project Environment

Before we get started, we need to set up the environment to have a seamless process all the way through.

Step 1: Create a Python Virtual Environment

A virtual environment is an isolated setting that keeps each project's dependencies separate, so every project has its own libraries and versions.

The following steps should be followed in setting up the virtual environment.

Step 1.1: Create Necessary Directory. On your Windows machine, search for cmd to open the Command Prompt.

In your CMD, run the commands shown in the image below. The last command is for opening the project in your favorite code editor (I'm using VS Code).

Command Prompt to Create Folder

Step 1.2: Create Virtualenv. In your VS Code, open a new terminal and write in the following command to create your virtual environment: virtualenv architecture_env. After successfully creating a virtual environment, let's activate it with this command: architecture_env\Scripts\activate.

Create Virtual Environment

Step 2: Create Docker-Compose.yml File

Docker Compose makes it easier to manage Docker applications with multiple containers. It enables you to use a single configuration file (often called docker-compose.yml) to define and execute several containers simultaneously as a single application.

Step 2.1: Start Docker Desktop. Part of the prerequisites was the installation of Docker Desktop on your operating system. We are using a Windows server for this project. Docker Desktop must be running before we can create the images needed for this project.

Docker Container Desktop

Step 2.2: Create Docker Images. To create all the necessary images, we need to create a file in our project directory. Right-click inside the project folder and select a new file. Name the file: docker-compose.yml.

Create a New File - Docker-compose.yml

The docker-compose.yml will be used to set all necessary instructions for our docker images. Based on the above architecture, the following images are needed to execute successfully: Zookeeper, Kafka, Elasticsearch, and Kibana.

version: '3'
 
services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"
    volumes:
      - zookeeper_data:/data
 
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "mssqltips-topic:3:1"
    volumes:
      - kafka_data:/kafka
    depends_on:
      - zookeeper
 
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.9
    environment:
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
 
  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.9
    ports:
      - "5601:5601"
    environment:
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
    depends_on:
      - elasticsearch
 
volumes:
  zookeeper_data:
  kafka_data:
  elasticsearch_data:
 
networks:
  default:
    driver: bridge

Docker Breakdown

Services: This section defines the individual containers that will be created and managed.

Zookeeper

zookeeper:
  image: wurstmeister/zookeeper:latest
  ports:
    - "2181:2181"
  volumes:
    - zookeeper_data:/data
  • Image: Uses the most recent Wurstmeister Zookeeper image.
  • Ports: Allows external access by mapping port 2181 on the host to port 2181 on the container.
  • Volumes: Zookeeper data is preserved in a volume called zookeeper_data.

Kafka

kafka:
  image: wurstmeister/kafka:latest
  ports:
    - "9092:9092"
  environment:
    KAFKA_ADVERTISED_HOST_NAME: localhost
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_CREATE_TOPICS: "mssqltips-topic:3:1"
  volumes:
    - kafka_data:/kafka
  depends_on:
    - zookeeper
  • Image: Uses the most recent Wurstmeister Kafka image.
  • Ports: Maps port 9092 on the host to port 9092 on the container for Kafka communication.
  • Environment Variables:
    • KAFKA_ADVERTISED_HOST_NAME: Sets the hostname that the broker advertises to clients.
    • KAFKA_ZOOKEEPER_CONNECT: Indicates the connection string for Zookeeper.
    • KAFKA_CREATE_TOPICS: mssqltips-topic, a topic with three partitions and a replication factor of one, is automatically created.
  • Volumes: Kafka data is stored in a volume called kafka_data.
  • Depends_on: Starts the Zookeeper container before Kafka. It controls start order only and does not wait for Zookeeper to be ready.
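
The KAFKA_CREATE_TOPICS value packs the topic name, partition count, and replication factor into one colon-separated string. As a rough illustration of how to read it, the sketch below splits such a value into its parts. The names TopicSpec and parse_create_topics are hypothetical helpers written for this article; they are not part of the Docker image or of kafka-python, and the image does its own parsing at startup.

```python
from typing import List, NamedTuple


class TopicSpec(NamedTuple):
    """One topic definition: name, partition count, replication factor."""
    name: str
    partitions: int
    replication_factor: int


def parse_create_topics(value: str) -> List[TopicSpec]:
    """Split a 'name:partitions:replicas' string into TopicSpec entries.

    Several topics are assumed to be separated by commas.
    """
    specs = []
    for entry in value.split(","):
        # Only the first three fields are read in this sketch.
        name, partitions, replicas = entry.strip().split(":")[:3]
        specs.append(TopicSpec(name, int(partitions), int(replicas)))
    return specs


specs = parse_create_topics("mssqltips-topic:3:1")
print(specs)
```

Reading the value this way makes it easy to see that the compose file asks for three partitions and a single replica, which is the most a one-broker setup can hold.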

ElasticSearch

elasticsearch:
  image: docker.elastic.co/elasticsearch/elasticsearch:7.17.9
  environment:
    - discovery.type=single-node
  ports:
    - "9200:9200"
    - "9300:9300"
  volumes:
    - elasticsearch_data:/usr/share/elasticsearch/data
  • Image: Pins Elasticsearch to version 7.17.9.
  • Environment: Sets up Elasticsearch to operate as a cluster with just one node.
  • Ports: Maps HTTP and transport communication to ports 9200 and 9300, respectively.
  • Volumes: Elasticsearch data is stored in a volume called elasticsearch_data.

Kibana

kibana:
  image: docker.elastic.co/kibana/kibana:7.17.9
  ports:
    - "5601:5601"
  environment:
    ELASTICSEARCH_HOSTS: http://elasticsearch:9200
  depends_on:
    - elasticsearch
  • Image: Pins Kibana to version 7.17.9, matching the Elasticsearch version.
  • Ports: Maps port 5601 so the Kibana web interface can be accessed from the host.
  • Environment: Sets up Kibana to establish a connection with Elasticsearch.
  • Depends_on: Makes sure Kibana launches after Elasticsearch.

Volumes: To ensure that data is kept safe even if containers are stopped or removed, this section defines named volumes for persistent storage.

volumes:
  zookeeper_data:
  kafka_data:
  elasticsearch_data:

Networks: This describes the services' default network type. To facilitate communication between the containers, the bridge driver establishes a private internal network.

networks:
  default:
    driver: bridge

Step 2.3: Start Docker-Compose.yml File. Use the docker-compose up -d command to start all services defined in the Docker Compose file in detached mode.

The detached mode means that you can continue working in your terminal without being dependent on the output of the containers because the command will run the containers in the background and return right away.

Docker Compose up

Step 2.4: Confirm Images. After creating all the necessary containers from the docker-compose.yml file, head to Docker Desktop to confirm that they were provisioned as expected. Click on the container group; this shows all the services running inside it.

Docker Container Created

You will notice that the services set up in the docker-compose.yml file are all available.

Images in Container

Streaming, Processing, and Ingestion

In this section, we will be focused more on coding and creating the producer and consumer scripts.

Step 1: Create a Producer Script

For this project, we will be streaming data from an API called Random User. The API generates random, fictional user data, so no real or sensitive information is disclosed.

The following steps should be followed in creating this:

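Step 1.1: Install Necessary Python Libraries. Before we get started, let's create a file in our project folder called requirements.txt and put in the following libraries (requests is included because producer.py imports it). Install them with pip install -r requirements.txt.

requests
kafka-python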
elasticsearch
python-dotenv
azure-storage-file-datalake
azure-identity
Library Requirements

Step 1.2: Create Kafka Topic. Kafka topics are the essential components a distributed streaming platform uses to organize data. They function like channels or feeds: producers publish messages to a topic, and consumers subscribe to it to receive them.

To get started, let's enter the Windows Subsystem for Linux (WSL) by creating a new terminal and writing the command in the image below.

Activate Environment

Now that we are in the WSL environment, let's create a topic named mssqltipsrandomuser-topic with three partitions and a replication factor of one.

kafka-topics.sh --create --topic mssqltipsrandomuser-topic --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
Create Kafka Topic
  • --partitions 3: This specifies that the topic will have 3 partitions.
  • --replication-factor 1: This sets the replication factor for the topic to 1, meaning there will be one copy of each partition.

Step 1.3: Understand Topic Created.

List All Topics. By connecting to the broker running at localhost:9092, this command will show all the topics in your Kafka instance.

kafka-topics.sh --list --bootstrap-server localhost:9092
View Kafka Topic

Describe the Created Topic. This command gives comprehensive details about the given topic, such as the replication factor, number of partitions, and current leader of each partition.

kafka-topics.sh --describe --topic mssqltipsrandomuser-topic --bootstrap-server localhost:9092
Describe Kafka Topics

Step 1.4: Python Producer Script. In the project folder, create a new file called producer.py.

Let's start by viewing the Random User API site we want to pull data from. This gives a better understanding of the response structure and the fields we need to map.

site url - Randomusers

Write the following producer script in producer.py.

import requests
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
import time
import logging
 
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
# Function to fetch user data from the API
def fetch_user_data(api_url):
    try:
        response = requests.get(api_url)
        response.raise_for_status()  # Raises an HTTPError for bad responses
        data = response.json()
        user = data['results'][0]
        user_data = {
            "Name": f"{user['name']['title']} {user['name']['first']} {user['name']['last']}",
            "Gender": user['gender'],
            "Address": f"{user['location']['street']['number']} {user['location']['street']['name']}, {user['location']['city']}, {user['location']['state']} - {user['location']['country']} {user['location']['postcode']}",
            "Coordinates": f"Latitude {user['location']['coordinates']['latitude']}, Longitude {user['location']['coordinates']['longitude']}",
            "Timezone": f"{user['location']['timezone']['description']} (Offset {user['location']['timezone']['offset']})",
            "Email": user['email'],
            "Phone": user['phone'],
            "Cell": user['cell'],
            "Date of Birth": f"{user['dob']['date']} (Age: {user['dob']['age']})",
            "Registered": f"{user['registered']['date']} (Age: {user['registered']['age']})",
            "Picture": user['picture']['large']
        }
        return user_data
    except requests.RequestException as e:
        logging.error(f"Failed to fetch data: {str(e)}")
        return None
 
# Function to serialize data
def json_serializer(data):
    try:
        return json.dumps(data).encode('utf-8')
    except (TypeError, ValueError) as e:
        logging.error(f"JSON serialization error: {str(e)}")
        return None
 
# Kafka producer configuration
def create_producer():
    try:
        return KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=json_serializer
        )
    except KafkaError as e:
        logging.error(f"Failed to create Kafka producer: {str(e)}")
        return None
 
# Main function to fetch data and send to Kafka
def main():
    api_url = "https://randomuser.me/api/"
    producer = create_producer()
    if not producer:
        logging.error("Exiting due to Kafka producer creation failure")
        return
 
    try:
        while True:
            logging.info("Fetching and sending data to Kafka...")
            start_time = time.time()
            # Fetch and send data for 1 minute
            while time.time() - start_time < 60:
                user_data = fetch_user_data(api_url)
                if user_data:
                    logging.info(f"Sending user data to Kafka: {user_data}")
                    future = producer.send('mssqltipsrandomuser-topic', user_data)
                    try:
                        record_metadata = future.get(timeout=10)
                        logging.info(f"Message sent to topic {record_metadata.topic} partition {record_metadata.partition} offset {record_metadata.offset}")
                    except KafkaError as e:
                        logging.error(f"Failed to send message to Kafka: {str(e)}")
                time.sleep(5)  # Slight pause between requests within the minute
            logging.info("Pausing for 10 seconds...")
            time.sleep(10)  # Pause for 10 seconds
    except KeyboardInterrupt:
        logging.info("Process terminated by user. Exiting...")
    finally:
        if producer:
            producer.close()
 
if __name__ == "__main__":
    main()

Code Breakdown

Libraries Import

import requests
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
import time
import logging
  • requests: An HTTP request library to retrieve data from APIs.
  • json: A module that handles serialization and deserialization of JSON data.
  • KafkaProducer: A class from the kafka-python library that sends messages to a Kafka topic.
  • KafkaError: An exception class for handling Kafka-related failures.
  • time: A module that manages time-related operations, including measuring intervals of time or going to sleep.
  • logging: A message-logging module that aids with debugging and application flow tracking.

Logging Configuration

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

This configures logging so that each line contains the timestamp, log level, and message. Because the log level is set to INFO, messages at INFO and any higher severity (WARNING, ERROR, CRITICAL) are shown, while DEBUG messages are suppressed.
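
To see how the INFO threshold behaves, here is a minimal sketch that collects log lines in memory instead of printing them. The ListHandler class and the logger name level_demo are made up for this illustration; the producer script itself simply uses logging.basicConfig.

```python
import logging


class ListHandler(logging.Handler):
    """Collects formatted log lines in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


demo_logger = logging.getLogger("level_demo")  # hypothetical logger name
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False  # keep the demo separate from the root logger

handler = ListHandler()
handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))
demo_logger.addHandler(handler)

demo_logger.debug("dropped: below INFO")
demo_logger.info("kept: at INFO")
demo_logger.warning("kept: above INFO")
demo_logger.error("kept: above INFO")

print(handler.lines)
```

Only the DEBUG line is missing from the collected output, which is why the producer's error messages still appear even though the level is set to INFO.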

Function to Fetch Data

def fetch_user_data(api_url):
    try:
        response = requests.get(api_url)
        response.raise_for_status()  # Raises an HTTPError for bad responses
        data = response.json()
        user = data['results'][0]
        user_data = {
            "Name": f"{user['name']['title']} {user['name']['first']} {user['name']['last']}",
            ...
        }
        return user_data
    except requests.RequestException as e:
        logging.error(f"Failed to fetch data: {str(e)}")
        return None

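This function requests one random user from the given URL, flattens the nested JSON response into a single dictionary of readable fields, and returns it. If the request fails, the error is logged and None is returned.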
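
The flattening step can be tried without calling the API. The sketch below applies the same field mapping to a hand-written payload; sample_user is invented test data that only mimics the shape of one entry in the API's results list, and flatten_user is a hypothetical helper that covers a subset of the fields used in producer.py.

```python
def flatten_user(user: dict) -> dict:
    """Turn one nested 'results' entry into a flat record (subset of fields)."""
    location = user["location"]
    return {
        "Name": f"{user['name']['title']} {user['name']['first']} {user['name']['last']}",
        "Gender": user["gender"],
        "Email": user["email"],
        "Coordinates": (
            f"Latitude {location['coordinates']['latitude']}, "
            f"Longitude {location['coordinates']['longitude']}"
        ),
        "Date of Birth": f"{user['dob']['date']} (Age: {user['dob']['age']})",
    }


# Invented sample data, shaped like one element of data['results'].
sample_user = {
    "gender": "female",
    "name": {"title": "Ms", "first": "Ada", "last": "Example"},
    "location": {"coordinates": {"latitude": "12.3456", "longitude": "-65.4321"}},
    "email": "ada@example.com",
    "dob": {"date": "1990-01-15T08:30:00.000Z", "age": 34},
}

record = flatten_user(sample_user)
print(record)
```

Note that the date field ends up as a string with an age suffix, which matters later when the consumer has to turn it back into a date.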

Function to Serialize Data

def json_serializer(data):
    try:
        return json.dumps(data).encode('utf-8')
    except (TypeError, ValueError) as e:
        logging.error(f"JSON serialization error: {str(e)}")
        return None

The goal of this function is to serialize the user data to a JSON string and encode it as UTF-8 bytes.

  • Error Handling: If serialization fails, an error message is logged and None is returned.
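
A quick way to check the serializer is to run a record through it and decode it again, the way a consumer would. In this sketch, json_serializer is copied from the producer script, while json_deserializer is a small helper written for the illustration and the sample record is invented.

```python
import json
import logging


def json_serializer(data):
    """Serialize to JSON and encode as UTF-8 bytes; return None on failure."""
    try:
        return json.dumps(data).encode("utf-8")
    except (TypeError, ValueError) as e:
        logging.error(f"JSON serialization error: {str(e)}")
        return None


def json_deserializer(raw: bytes):
    """Reverse of json_serializer: decode UTF-8 bytes and parse the JSON."""
    return json.loads(raw.decode("utf-8"))


record = {"Name": "Ms Ada Example", "Gender": "female"}  # invented sample
payload = json_serializer(record)
restored = json_deserializer(payload)
print(type(payload).__name__, restored == record)

# A set is not JSON serializable, so the error branch runs and None comes back.
failed = json_serializer({"tags": {1, 2}})
print(failed)
```

Because the serializer returns None instead of raising, the calling code has to decide what to do with a failed record; the round trip above is the success path.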

Kafka Producer Configuration

def create_producer():
    try:
        return KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=json_serializer
        )
    except KafkaError as e:
        logging.error(f"Failed to create Kafka producer: {str(e)}")
        return None

The goal of this function is to create a producer connected to the Kafka broker at localhost:9092.

  • Value Serializer: Before delivering messages to Kafka, it serializes them using the json_serializer function.
  • Error Handling: If the producer cannot be created, an error is logged and None is returned.

Main Function

def main():
    api_url = "https://randomuser.me/api/"
    producer = create_producer()
    if not producer:
        logging.error("Exiting due to Kafka producer creation failure")
        return
 
    try:
        while True:
            logging.info("Fetching and sending data to Kafka...")
            start_time = time.time()
            while time.time() - start_time < 60:
                user_data = fetch_user_data(api_url)
                if user_data:
                    logging.info(f"Sending user data to Kafka: {user_data}")
                    future = producer.send('mssqltipsrandomuser-topic', user_data)
                    try:
                        record_metadata = future.get(timeout=10)
                        logging.info(f"Message sent to topic {record_metadata.topic} partition {record_metadata.partition} offset {record_metadata.offset}")
                    except KafkaError as e:
                        logging.error(f"Failed to send message to Kafka: {str(e)}")
                time.sleep(5)  # Slight pause between requests within the minute
            logging.info("Pausing for 10 seconds...")
            time.sleep(10)  # Pause for 10 seconds
    except KeyboardInterrupt:
        logging.info("Process terminated by user. Exiting...")
    finally:
        if producer:
            producer.close()
  • Goal: The primary function coordinates the user data gathering and sending to the Kafka topic.
  • API URL: Specifies the API endpoint from which user data is retrieved.
  • Producer Creation: If the Kafka producer cannot be created, the function exits.
  • Infinite Loop: The procedure can continue running indefinitely because of the outer while True loop.
  • Data Fetching: For 60 seconds during the loop, user data is fetched and sent to Kafka every five seconds.
  • Data Sending: It logs the metadata of the transmitted message and sends the fetched user data to the Kafka topic mssqltipsrandomuser-topic.
  • Error Handling: An error is logged if transmitting is unsuccessful.
  • Pausing: It takes a 10-second break after every 60-second fetch cycle.
  • Graceful Exit: The keyboard interrupt (Ctrl+C) can be used to end the script.
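
The timing of the inner loop can be checked without waiting a full minute. The sketch below reproduces the 60-second window with a 5-second pause, but takes the clock and the sleep function as parameters so a fake clock can stand in for real time. The names run_window and FakeClock are hypothetical and exist only for this illustration; producer.py calls time.time() and time.sleep() directly.

```python
def run_window(fetch, send, now, sleep, window_seconds=60, interval_seconds=5):
    """Fetch and send records until the window elapses; return how many were sent."""
    sent = 0
    start = now()
    while now() - start < window_seconds:
        record = fetch()
        if record:
            send(record)
            sent += 1
        sleep(interval_seconds)
    return sent


class FakeClock:
    """A clock that only advances when sleep() is called."""

    def __init__(self):
        self.t = 0.0

    def now(self):
        return self.t

    def sleep(self, seconds):
        self.t += seconds


clock = FakeClock()
outbox = []  # stands in for the Kafka topic
count = run_window(
    fetch=lambda: {"Name": "placeholder"},
    send=outbox.append,
    now=clock.now,
    sleep=clock.sleep,
)
print(count)
```

With an instant fetch, one window holds 12 sends. In a real run the API call and the wait on future.get() also take time, so the number of messages per window will usually be lower.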

Step 1.5: Test Producer Script. In your command prompt, use the command python producer.py to run the script.

From the image below, we can confirm that the Kafka topic mssqltipsrandomuser-topic is currently receiving messages.

Test Producer Script

Step 1.6: Confirm Message using Kafka CLI. To confirm that the messages were sent as expected, let's use the Kafka CLI to start a console consumer that listens to the Kafka topic.

In the WSL command prompt, run the command below, which consumes data directly from the Kafka topic:

kafka-console-consumer.sh --topic mssqltipsrandomuser-topic --bootstrap-server localhost:9092 --from-beginning

Breakdown

  • The Kafka console consumer is a command-line utility that lets you read data from a Kafka topic and output it to the standard output (console). The script that runs it is called kafka-console-consumer.sh.
  • --topic mssqltipsrandomuser-topic: This option names the Kafka topic the consumer reads messages from, mssqltipsrandomuser-topic in this instance.
  • --bootstrap-server localhost:9092: This parameter specifies the address of the Kafka broker to connect to, here a broker listening on port 9092 on localhost. The consumer needs a bootstrap server to connect to the Kafka cluster.
  • --from-beginning: This option tells the consumer to read from the start of the topic's message log instead of starting with the most recent message.

During this short period, a total of 54 messages were processed.

Total Number of messages

Create a Consumer Script and Push to ElasticSearch

Now that we have confirmed our producer is working as expected, we need to create a consumer script that picks up the data from the Kafka topic and sends it to an Elasticsearch index for storage.

Elasticsearch is an open-source search engine that is very scalable for real-time search, analytics, and log management. It is constructed on top of the robust text search engine library Apache Lucene.

Step 1: Create ElasticSearch Index

Firstly, we need to create an ElasticSearch Index, which is a logical container for collecting related documents.

To create an Elasticsearch index with the name randomuser_index with certain mappings and settings, use the supplied curl command.

Create Index
curl -X PUT "http://localhost:9200/randomuser_index" -H "Content-Type: application/json" -d'
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "Name": {"type": "text"},
      "Gender": {"type": "keyword"},
      "Address": {"type": "text"},
      "Coordinates": {"type": "text"},
      "Timezone": {"type": "text"},
      "Email": {"type": "keyword"},
      "Phone": {"type": "keyword"},
      "Cell": {"type": "keyword"},
      "Date of Birth": {"type": "date"},
      "Registered": {"type": "date"},
      "Picture": {"type": "keyword"}
    }
  }
}'

After successfully creating the Elasticsearch index, we can get more information about it by running the command below.

curl -X GET -x "" "localhost:9200/randomuser_index?pretty"
View Index Value

Step 2: Create Consumer Python Script

Create a new Python file and add the following consumer code to it.

import json
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import logging
from datetime import datetime
 
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
 
# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
KAFKA_TOPIC = 'mssqltipsrandomuser-topic'
KAFKA_GROUP_ID = 'python-consumer-group-randomuser'
 
# Elasticsearch Configuration
ES_HOSTS = ['http://localhost:9200']
ES_INDEX = 'randomuser_index'
 
def create_kafka_consumer():
    return KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id=KAFKA_GROUP_ID,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        api_version_auto_timeout_ms=60000,
        request_timeout_ms=65000
    )
 
def create_elasticsearch_client():
    return Elasticsearch(
        ES_HOSTS,
        retry_on_timeout=True,
        max_retries=10,
        timeout=30
    )
 
def parse_date(date_string):
    return datetime.strptime(date_string.split('(')[0].strip(), "%Y-%m-%dT%H:%M:%S.%fZ")
 
def generate_actions(messages):
    for message in messages:
        user_data = message.value
        user_data['Date of Birth'] = parse_date(user_data['Date of Birth'])
        user_data['Registered'] = parse_date(user_data['Registered'])
        yield {
            "_index": ES_INDEX,
            "_source": user_data
        }
 
def main():
    consumer = create_kafka_consumer()
    es_client = create_elasticsearch_client()
 
    logger.info(f"Consumer created with configuration: {consumer.config}")
 
    try:
        message_count = 0
        while True:
            message_batch = consumer.poll(timeout_ms=1000)
           
            if not message_batch:
                continue
 
            for tp, messages in message_batch.items():
                message_count += len(messages)
                logger.info(f"Received {len(messages)} messages. Total count: {message_count}")
               
                try:
                    actions = list(generate_actions(messages))
                    success, _ = bulk(
                        es_client,
                        actions,
                        raise_on_error=False,
                        raise_on_exception=False
                    )
                    logger.info(f"Indexed {success} documents successfully.")
                except Exception as e:
                    logger.error(f"Error during bulk indexing: {e}")
 
    except KeyboardInterrupt:
        logger.info("Consumer stopped by user.")
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
    finally:
        consumer.close()
        logger.info("Consumer closed.")
 
if __name__ == "__main__":
    main()
Consumer Script

Code Breakdown

Kafka Configuration

KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
KAFKA_TOPIC = 'mssqltipsrandomuser-topic'
KAFKA_GROUP_ID = 'python-consumer-group-randomuser'
  • KAFKA_BOOTSTRAP_SERVERS: Specifies the Kafka broker or brokers to connect to.
  • KAFKA_TOPIC: The name of the Kafka topic that messages will be consumed from.
  • KAFKA_GROUP_ID: The consumer group ID for this Kafka consumer, which allows several consumers to split the workload.

ElasticSearch Configuration

ES_HOSTS = ['http://localhost:9200']
ES_INDEX = 'randomuser_index'
  • ES_HOSTS: Specifies the Elasticsearch server or servers to connect to.
  • ES_INDEX: The index name in Elasticsearch that will hold the data.

Create Kafka Consumer Function

def create_kafka_consumer():
    return KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id=KAFKA_GROUP_ID,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        api_version_auto_timeout_ms=60000,
        request_timeout_ms=65000
    )

The goal of this function is to generate a Kafka consumer that is ready to read from the given topic and return it.

Setting parameters:

  • auto_offset_reset='earliest': This indicates that the consumer group will begin reading from the earliest message in the topic if no committed offsets exist.
  • enable_auto_commit=True: The offsets of consumed messages are committed automatically in the background.
  • value_deserializer: The JSON-formatted message value is deserialized using this lambda function.

Create Elasticsearch Client Function

def create_elasticsearch_client():
    return Elasticsearch(
        ES_HOSTS,
        retry_on_timeout=True,
        max_retries=10,
        timeout=30
    )

The goal of this function is to create an Elasticsearch client that is ready to connect to the given hosts.

Setting parameters:

  • retry_on_timeout=True: Retries the request in the event of a timeout.
  • max_retries=10: Sets the maximum number of times a request is retried.
  • timeout=30: Defines the timeout, in seconds, for every Elasticsearch request.

Parse Date Function

def parse_date(date_string):
    return datetime.strptime(date_string.split('(')[0].strip(), "%Y-%m-%dT%H:%M:%S.%fZ")

The goal of this function is to create a datetime object from an ISO 8601 date string. The producer appends a suffix such as (Age: 30) to each date, so the function first drops everything from the opening parenthesis onward and then parses what remains.
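
As a small check of the parsing step, the sketch below runs parse_date on a string in the shape the producer emits. The function is copied from the consumer script; the sample value is invented.

```python
from datetime import datetime


def parse_date(date_string):
    """Drop the '(Age: N)' suffix, then parse the remaining ISO timestamp."""
    return datetime.strptime(date_string.split("(")[0].strip(), "%Y-%m-%dT%H:%M:%S.%fZ")


sample = "1990-01-15T08:30:00.000Z (Age: 34)"  # invented sample value
parsed = parse_date(sample)
print(parsed.isoformat())
```

The result is a naive datetime: the trailing Z is matched as a literal character, so no timezone information is attached. That is worth keeping in mind if the dates are later compared with timezone-aware values.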

Generate Action Bulk Index Function

def generate_actions(messages):
    for message in messages:
        user_data = message.value
        user_data['Date of Birth'] = parse_date(user_data['Date of Birth'])
        user_data['Registered'] = parse_date(user_data['Registered'])
        yield {
            "_index": ES_INDEX,
            "_source": user_data
        }

The objective of this function is to produce the sequence of actions that the bulk helper indexes into Elasticsearch.

  • Processing: It retrieves the user information from each message, parses the date fields, and produces a dictionary with the index and document source information.
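
The shape of a bulk action is easier to see with a concrete message. This sketch runs the same generator over a stand-in message object; FakeMessage is a hypothetical substitute for the records a Kafka consumer returns (only the value attribute is modeled), and the field values are invented.

```python
from collections import namedtuple
from datetime import datetime

ES_INDEX = "randomuser_index"

# Stand-in for a Kafka record; only the .value attribute is used here.
FakeMessage = namedtuple("FakeMessage", ["value"])


def parse_date(date_string):
    return datetime.strptime(date_string.split("(")[0].strip(), "%Y-%m-%dT%H:%M:%S.%fZ")


def generate_actions(messages):
    """Yield one bulk action per message, with the date fields parsed."""
    for message in messages:
        user_data = message.value
        user_data["Date of Birth"] = parse_date(user_data["Date of Birth"])
        user_data["Registered"] = parse_date(user_data["Registered"])
        yield {"_index": ES_INDEX, "_source": user_data}


messages = [
    FakeMessage(value={
        "Name": "Ms Ada Example",
        "Date of Birth": "1990-01-15T08:30:00.000Z (Age: 34)",
        "Registered": "2015-06-01T12:00:00.500Z (Age: 9)",
    })
]

actions = list(generate_actions(messages))
print(actions[0]["_index"], actions[0]["_source"]["Date of Birth"])
```

Each action names the target index and carries the document under _source. Since no _id is supplied, Elasticsearch assigns one, so replaying the same message would create a second document rather than overwrite the first.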

Main Function

def main():
    consumer = create_kafka_consumer()
    es_client = create_elasticsearch_client()
 
    logger.info(f"Consumer created with configuration: {consumer.config}")
 
    try:
        message_count = 0
        while True:
            message_batch = consumer.poll(timeout_ms=1000)
            
            if not message_batch:
                continue
 
            for tp, messages in message_batch.items():
                message_count += len(messages)
                logger.info(f"Received {len(messages)} messages. Total count: {message_count}")
                
                try:
                    actions = list(generate_actions(messages))
                    success, _ = bulk(
                        es_client, 
                        actions,
                        raise_on_error=False,
                        raise_on_exception=False
                    )
                    logger.info(f"Indexed {success} documents successfully.")
                except Exception as e:
                    logger.error(f"Error during bulk indexing: {e}")
 
    except KeyboardInterrupt:
        logger.info("Consumer stopped by user.")
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")

    finally:
        consumer.close()
        logger.info("Consumer closed.")

The primary goal of this function is to coordinate the ingestion of Kafka messages and their subsequent indexing into Elasticsearch.

  • Creation of Consumer and Client: It creates the Kafka consumer and the Elasticsearch client, then logs the consumer configuration.
  • Message Polling: It starts an endless loop that polls Kafka for new messages, waiting up to one second per poll.
  • Counting: It logs the number of messages received and the running total.
  • Bulk Indexing: It builds the bulk actions and indexes them into Elasticsearch with the bulk helper function.
  • Error Handling: Any exception raised during indexing is logged.
  • Exit: Pressing CTRL + C ends the script gracefully by closing the consumer and logging a termination message.
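
One detail in main() is easy to miss: with raise_on_error=False, the bulk helper returns a pair of values, the number of successful actions and a list of per-document errors, and the script discards the second one by assigning it to _. The sketch below shows one possible way to report both. The function summarize_bulk_result is a hypothetical helper, and the sample error entry is invented; the exact contents of real error entries depend on the Elasticsearch response.

```python
import logging

logger = logging.getLogger("bulk_demo")  # hypothetical logger name


def summarize_bulk_result(result):
    """Report both halves of a (success_count, errors) pair from a bulk call."""
    success, errors = result
    for error in errors:
        # Log each failed document instead of silently dropping it.
        logger.warning("Document failed to index: %s", error)
    return {"indexed": success, "failed": len(errors)}


# Invented example of what a bulk call might hand back: 2 successes, 1 failure.
sample_result = (2, [{"index": {"status": 400, "error": "example failure"}}])
summary = summarize_bulk_result(sample_result)
print(summary)
```

In the consumer this would mean keeping the second return value, for example success, errors = bulk(...), and passing both to a helper like the one above, so that mapping problems such as an unparseable date show up in the logs.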

Step 3: Visualize Data in Kibana

Now that the consumer is working as expected, let's confirm our configuration in Kibana. In your Windows browser, open the URL http://localhost:5601/. To confirm the index was created, use the command prompt.

curl -X GET -x "" "localhost:9200/randomuser_index?pretty"
ElasticSearch View

Ensure both the Producer and Consumer Python scripts are running before performing the test.

Return 10 Documents from Index. Use the command below to return 10 records from the ElasticSearch index.

GET /randomuser_index/_search?pretty
{
  "query": {
    "match_all": {}
  },        
  "size": 10
}
Get 10 Values in the Index

Count Records. Using the count, we can get the total number of records currently available in our index. You will notice a total of 127 records are currently in the ElasticSearch index.

GET /randomuser_index/_count?pretty
Count the records in the Index

After a couple of minutes, you will notice the record count has increased to 153 in the Elasticsearch index.

Count values
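
The same count check can be scripted instead of run by hand. The sketch below is an illustration rather than part of the project code: it assumes Elasticsearch is reachable at http://localhost:9200 without authentication (as in the curl command earlier), and count_url, parse_count, and fetch_count are hypothetical helper names. The sample response body is hand-written to show the shape of a _count reply.

```python
import json
from urllib.request import urlopen


def count_url(index_name, host="http://localhost:9200"):
    """Build the URL of the _count endpoint for an index."""
    return f"{host}/{index_name}/_count"


def parse_count(body):
    """Extract the document count from a _count response body."""
    return json.loads(body)["count"]


def fetch_count(index_name, host="http://localhost:9200"):
    """Ask Elasticsearch how many documents the index currently holds."""
    with urlopen(count_url(index_name, host), timeout=5) as response:
        return parse_count(response.read())


# Hand-written sample body, parsed without contacting a server
sample_body = b'{"count": 127, "_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0}}'
print(count_url("randomuser_index"))  # http://localhost:9200/randomuser_index/_count
print(parse_count(sample_body))       # 127
```

With the stack running, calling fetch_count("randomuser_index") a few minutes apart should show the number growing while the producer and consumer are active.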

Create a Consumer Script and Push to Azure Data Lake Gen 2

We also need to store the messages from the Kafka topic in Azure Data Lake Gen 2 in JSON format. These files will be used by the team's data scientists and data analysts.

Step 1: Set Up App Registration and Service Principal

In Azure, an app registration and its service principal are fundamental concepts that give automated tools and apps secure access to resources.

In our previous article, Provisioning Azure Resources with Azure CLI, we covered provisioning and configuring an Azure app registration and service principal in depth. I advise going through that article, as it covers more of the concepts involved in setting up the necessary security.

Step 2: Set Security File

Create a new file in the project folder and name it .env, which is a configuration file that stores environment variables for your application. The .env file contains the following security information that will be used in our Python code.

.env credential files
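
As a rough guide to the layout of the file, the sketch below parses a .env-style text with placeholder values. The five key names are the ones the consumer script reads with os.getenv; the values are placeholders, and parse_env_text is a hypothetical helper written only for illustration (packages such as python-dotenv handle quoting and other edge cases that this sketch ignores).

```python
SAMPLE_ENV = """\
# Placeholder values; replace them with your own
account_name=<storage-account-name>
file_system_name=<container-name>
client_id=<app-registration-client-id>
client_secret=<client-secret-value>
tenant_id=<tenant-id>
"""


def parse_env_text(text):
    """Parse KEY=VALUE lines, skipping blank lines and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


settings = parse_env_text(SAMPLE_ENV)
print(sorted(settings))
```

Keep the .env file out of version control, since it holds the client secret.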

In the Azure Data Lake Gen 2 account, note the storage account name and the file system name (the container). These are the values for account_name and file_system_name in the .env file.

Storage Account

Step 3: Consumer ADLS Python Script

In VS Code, create a new Python script called consumeradls.py. Add the code below, which is a consumer that picks data from the Kafka topic, deserializes it, and loads it to a folder in Azure Data Lake Gen 2 in real time as JSON files.

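import os
import json
import logging
from dotenv import load_dotenv  # Assumption: the python-dotenv package is installed
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from azure.identity import ClientSecretCredential
from azure.storage.filedatalake import DataLakeServiceClient

# Load the variables defined in the .env file so os.getenv can read them
load_dotenv()

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')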
 
# Function to create a Kafka Consumer
def create_kafka_consumer(topic):
    try:
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',  # Adjust as per use case
            enable_auto_commit=True,
            group_id='my-group-id',  # Adjust to your specific group
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        logging.info("Kafka Consumer created successfully.")
        return consumer
    except KafkaError as e:
        logging.error(f"Failed to create Kafka consumer: {str(e)}")
        return None
 
# Function to upload the JSON file to Azure Data Lake Gen 2 using Service Principal
def upload_to_datalake_gen2(json_data, filename):
    try:
        # Service Principal credentials from environment variables
        account_name = os.getenv("account_name")
        file_system_name = os.getenv("file_system_name")  # Equivalent to the container name in Data Lake Gen 2
        client_id = os.getenv("client_id")
        client_secret = os.getenv("client_secret")
        tenant_id = os.getenv("tenant_id")
 
        # Authenticate using Service Principal
        credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret)
 
        # Create DataLakeServiceClient object
        service_client = DataLakeServiceClient(account_url=f"https://{account_name}.dfs.core.windows.net", credential=credential)
 
        # Get the file system (container) client
        file_system_client = service_client.get_file_system_client(file_system=file_system_name)
 
        # Create a new file in the specified directory
        file_path = f"randomuserjson/{filename}"
        file_client = file_system_client.get_file_client(file_path)
 
        # Upload the JSON data to the file
        file_contents = json.dumps(json_data)
        file_client.upload_data(file_contents, overwrite=True)
 
        logging.info(f"File {file_path} uploaded to Azure Data Lake Gen 2 successfully.")
    except Exception as e:
        logging.error(f"Failed to upload to Azure Data Lake Gen 2: {str(e)}")
       
# Main function to consume messages from Kafka and send them to Azure Data Lake
def main():
    topic = "mssqltipsrandomuser-topic"
    consumer = create_kafka_consumer(topic)
    if not consumer:
        logging.error("Exiting due to Kafka consumer creation failure.")
        return
 
    try:
        for message in consumer:
            user_data = message.value
            logging.info(f"Received user data: {user_data}")
 
            # Generate a unique filename using timestamp or UUID
            filename = f"user_data_{message.timestamp}.json"
 
            # Upload to Azure Data Lake Gen 2
            upload_to_datalake_gen2(user_data, filename)
 
    except KeyboardInterrupt:
        logging.info("Process terminated by user.")
    finally:
        consumer.close()
 
if __name__ == "__main__":
    main()

Code Breakdown

Kafka Consumer Creation Function

def create_kafka_consumer(topic):
    try:
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id='my-group-id',
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        logging.info("Kafka Consumer created successfully.")
        return consumer
    except KafkaError as e:
        logging.error(f"Failed to create Kafka consumer: {str(e)}")
        return None
  • For a given topic, this function builds a Kafka consumer.
  • It establishes a connection with the localhost:9092 Kafka server.
  • If the consumer group has no committed offset, it begins reading from the earliest message. Offsets are committed automatically.
  • The incoming message is converted from bytes to a JSON object via the value_deserializer.
  • It logs the success if it works, and the error if it doesn't.
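
To make the value_deserializer step concrete, the sketch below runs the same lambda on a hand-made payload. The sample record and the serialize helper are assumptions for illustration; the producer's actual serializer is not shown in this section.

```python
import json

# The same deserializer passed to KafkaConsumer above
deserialize = lambda x: json.loads(x.decode('utf-8'))


def serialize(record):
    """Hypothetical producer-side counterpart: dict -> UTF-8 JSON bytes."""
    return json.dumps(record).encode('utf-8')


raw = serialize({"first_name": "Ada", "country": "Norway"})
user = deserialize(raw)

print(type(raw).__name__)   # bytes
print(user["first_name"])   # Ada
```

Because the conversion happens inside the consumer, message.value is already a Python dict by the time the main loop sees it.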

Uploading to Azure Data Lake Gen 2

def upload_to_datalake_gen2(json_data, filename):
    try:
        # Service Principal credentials from environment variables
        account_name = os.getenv("account_name")
        file_system_name = os.getenv("file_system_name")
        client_id = os.getenv("client_id")
        client_secret = os.getenv("client_secret")
        tenant_id = os.getenv("tenant_id")
 
        # Authenticate using Service Principal
        credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret)
 
        # Create DataLakeServiceClient object
        service_client = DataLakeServiceClient(account_url=f"https://{account_name}.dfs.core.windows.net", credential=credential)
 
        # Get the file system (container) client
        file_system_client = service_client.get_file_system_client(file_system=file_system_name)
 
        # Create a new file in the specified directory
        file_path = f"randomuserjson/{filename}"
        file_client = file_system_client.get_file_client(file_path)
 
        # Upload the JSON data to the file
        file_contents = json.dumps(json_data)
        file_client.upload_data(file_contents, overwrite=True)
 
        logging.info(f"File {file_path} uploaded to Azure Data Lake Gen 2 successfully.")
    except Exception as e:
        logging.error(f"Failed to upload to Azure Data Lake Gen 2: {str(e)}")
  • This function uploads JSON data to Azure Data Lake Storage Gen 2.
  • It obtains the credentials required for authentication from environment variables.
  • To communicate with Azure storage, a DataLakeServiceClient is created.
  • It builds the file path under the randomuserjson directory, then uploads the JSON data, overwriting any existing file with the same name.
  • The upload's success or failure is logged accordingly.
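
One thing to watch for: os.getenv returns None when a variable is missing, and the failure then only shows up later as an authentication or URL error. Below is a minimal sketch of a fail-fast check, assuming the same five variable names; read_adls_settings is a hypothetical helper, not part of the original script.

```python
import os

REQUIRED_VARS = ("account_name", "file_system_name",
                 "client_id", "client_secret", "tenant_id")


def read_adls_settings(environ=os.environ):
    """Return the ADLS settings, or raise if any variable is missing or empty."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in REQUIRED_VARS}


# Example with an in-memory mapping instead of the real environment
example_env = {name: f"<{name}>" for name in REQUIRED_VARS}
settings = read_adls_settings(example_env)
print(settings["account_name"])  # <account_name>
```

Calling a check like this once at startup, before the consumer loop begins, surfaces configuration mistakes immediately instead of once per message.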

Main Function to Consume Messages and Upload Data

def main():
    topic = "mssqltipsrandomuser-topic"
    consumer = create_kafka_consumer(topic)
    if not consumer:
        logging.error("Exiting due to Kafka consumer creation failure.")
        return
 
    try:
        for message in consumer:
            user_data = message.value
            logging.info(f"Received user data: {user_data}")
 
            # Generate a unique filename using timestamp or UUID
            filename = f"user_data_{message.timestamp}.json"
 
            # Upload to Azure Data Lake Gen 2
            upload_to_datalake_gen2(user_data, filename)
 
    except KeyboardInterrupt:
        logging.info("Process terminated by user.")
    finally:
        consumer.close()
  • The main function initializes the Kafka consumer for the given topic.
  • It loops through incoming messages. For every message, it:
    • Logs the user data it receives.
    • Uses the message timestamp to generate the filename.
    • Calls the upload_to_datalake_gen2 function to upload the data as a JSON file.
  • It responds gracefully to keyboard interrupts and makes sure the consumer is closed correctly.
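
Note that message.timestamp is in milliseconds, so two messages that carry the same timestamp would map to the same filename, and overwrite=True would silently replace the first file. A possible alternative is sketched here, under the assumption that the topic, partition, and offset attributes of the consumed record are used instead (the combination of partition and offset is unique within a topic). build_filename is a hypothetical helper, and the stand-in records only mimic those attributes.

```python
from types import SimpleNamespace


def build_filename(message):
    """Name the file after the record's position in the topic."""
    return f"user_data_{message.topic}_{message.partition}_{message.offset}.json"


# Stand-ins for two consumed records that share the same timestamp
first = SimpleNamespace(topic="mssqltipsrandomuser-topic", partition=0,
                        offset=41, timestamp=1731312000000)
second = SimpleNamespace(topic="mssqltipsrandomuser-topic", partition=0,
                         offset=42, timestamp=1731312000000)

print(build_filename(first))   # user_data_mssqltipsrandomuser-topic_0_41.json
print(build_filename(second))  # user_data_mssqltipsrandomuser-topic_0_42.json
```

A side benefit of this naming is that reprocessing the same record after a restart writes to the same file rather than creating a duplicate.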

Step 4: Test Consumer

Now that we have completed the consumer script, let's run it and see if it works as expected.

You will notice this success message from the terminal.

adls consumer script test

Next, head to the Azure Storage Account (Data Lake Gen 2) and confirm that the data is being uploaded to the randomuserjson folder in the container. You will notice the JSON files arriving in the storage account.

View values

Let's view one of the JSON files to confirm. You will notice that the data is being captured as expected in the storage account.

View JSON file in adls

Connect Power BI to Azure Data Lake Gen 2

Azure Data Lake Storage (ADLS) Gen2 is a cloud-based storage service designed for big data analytics. It is built on Azure Blob Storage and incorporates capabilities from the earlier Azure Data Lake Storage Gen1, while improving performance and scalability.

The following steps should be followed to connect Power BI to Azure Data Lake Gen 2.

Step 1: Get Data Lake Endpoint

The ADLS endpoint is the URL through which you access the data kept in your Azure Data Lake Storage Gen2 account. It is the entry point that tools and programs use to interact with your data.

In your Azure storage account, search for endpoints. This should take you to another window.

Get Endpoint

In the new window, copy the endpoint for Data Lake Storage.

Copy Endpoint

Step 2: Get Access Key

The ADLS access keys are security credentials used to authenticate requests to your Azure Data Lake Storage Gen2 account. An access key grants full access to the storage account, so treat it like a password and share it only with trusted tools.

In your Azure storage account, search for Access keys, then copy the key to be used for connecting with Power BI.

Get Access Key

Step 3: Connect Fabric Dataflow Gen 2

In your Power BI portal, create a new Workspace that will be used to house all our resources. In your workspace, click on New item and select Lakehouse.

Create Lakehouse

In the new window, put in the Lakehouse name you want to use and click Create.

Lakehouse name

In your Lakehouse environment, select Get data and then click New Dataflow Gen2. This will open a new window where you are expected to select the type of data to use.

Create Data Flow Gen 2

In the Get data window of Dataflow Gen2, search for Azure Data Lake Storage.

Choose data source

Paste the Azure Data Lake endpoint into the URL field, then append the container and folder path to it.

URL to data sources
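
As a sketch of what the finished URL looks like, the snippet below joins the pieces together. It assumes the randomuserjson folder written by the consumer script and uses placeholder account and container names; adls_folder_url is a hypothetical helper.

```python
def adls_folder_url(account_name, file_system_name, folder):
    """Join the Data Lake endpoint, container, and folder into one URL."""
    endpoint = f"https://{account_name}.dfs.core.windows.net"
    return f"{endpoint}/{file_system_name}/{folder.strip('/')}"


# Placeholder names; substitute your own storage account and container
url = adls_folder_url("mystorageaccount", "mycontainer", "randomuserjson")
print(url)  # https://mystorageaccount.dfs.core.windows.net/mycontainer/randomuserjson
```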

Change the Authentication kind to Account key, paste the Access key, then click Next.

Authentication type

In the Power Query Preview for Data Flow Gen 2, click Combine. This will automatically append all the data in the ADLS folder.

Data Preview in Data Flow

In your Power Query Editor, you can perform the needed transformation and click Publish. This will publish the transformed data to the Lakehouse we created earlier.

Transform Data

You can also view the report in Power BI Desktop by connecting with the Dataflow created.

Data View Power BI Desktop

Conclusion

This is the first part of a two-part series. This article covered the full process of creating a real-time streaming architecture using Apache Kafka. We started by creating a producer that pulls randomly generated data from a website into a Kafka topic. We then created consumer scripts that move the data to Elasticsearch and Azure Data Lake Gen 2 for storage.

The data stored in ADLS was then used in Microsoft Fabric for analytics, with Dataflow Gen 2 performing the ETL in the cloud on the data in Azure storage. The Microsoft Fabric section is more of a batch process, since Dataflow Gen 2 pulls the data from ADLS. In a future article, I will explain how to stream data directly to Microsoft Fabric using Eventstream in Fabric.

Download the code for this article.

About the author
Temidayo Omoniyi is a Microsoft Certified Data Analyst, Microsoft Certified Trainer, Azure Data Engineer, Content Creator, and Technical writer with over 3 years of experience.

This author pledges the content of this article is based on professional experience and not AI generated.

Article Last Updated: 2024-11-11
