Kafka Cluster

type access

  • Operating System:

  • Terminal:

  • Shell:

  • Editor:

  • Package Manager:

  • Programming Language:

  • Database:

  • Utility:

  • Extension:

Apache Kafka is an event streaming platform that efficiently manages real-time data at scale. Kafka simplifies distributed logging, stream processing, data integration, and pub/sub messaging. For more comprehensive information, visit the Apache Kafka Documentation.

Core Services Overview

  1. Kafka Brokers

    • Kafka brokers are the core of the system. They store and serve messages for producers and consumers.

    • Kafka brokers use Apache Kafka Raft (KRaft) consensus protocol.

    • Learn more: Kafka Brokers, KRaft.

  2. Schema Registry

    • Provides a central repository for managing message schemas, ensuring consistency between producers and consumers.

    • Learn more: Schema Registry

  3. Kafka Connect

    • Facilitates integration between Kafka and external systems by enabling seamless data import and export.

    • Learn more: Kafka Connect

  4. REST Proxy

    • Offers an HTTP interface for producing and consuming messages, simplifying integration with web services.

    • Learn more: REST Proxy

  5. ksqlDB

    • A streaming SQL engine for processing data from Kafka topics using SQL-like syntax.

    • Learn more: ksqlDB

  6. Kafka UI

    • A web-based interface for monitoring and managing Kafka clusters.

    • Learn more: Kafka UI

Initialization

For details on using the Initialization parameter, refer to the Initialization - Bash script section of the documentation.

Kafka Cluster Organization

Kafka structures data and configuration in a logical manner, adapting to various cluster sizes. Below is an example of a single-node cluster layout:

./kafka-repo/
├── cluster_id
├── logs
│   └── 1
│       ├── connect-node1.log
│       ├── kafka-broker-node1.log
│       ├── kafka-ui.log
│       ├── kraft-node1
│       ├── ksql-node1.log
│       ├── rest-proxy-node1.log
│       └── schema-registry-node1.log
└── properties
    └── 1
        ├── connect-distributed-node1.properties
        ├── kafka-rest-node1.properties
        ├── kafka-ui-config.yml
        ├── ksql-server-node1.properties
        ├── schema-registry-node1.properties
        └── server-node1.properties

In a multi-node deployment, services are distributed across nodes to ensure data replication and high availability. Configuration property files can be customized to meet specific deployment requirements.

Kafka UI for Cluster Monitoring

Kafka UI provides an efficient way to monitor and manage Kafka clusters.

By default, the UI is accessible from the job progress view once the following message appears in the output logs on node1:

[INFO] All services started on node1.

From the UI, it is possible to create and manage topics, produce and visualize messages within these topics, initiate statistical analyses on selected topics, and execute KSQL queries, among other tasks.

Note

Alternatively, the Confluent Platform can be activated using the optional parameter Switch to Confluent Platform. However, using the Confluent Platform on a multi-node Kafka cluster requires a Confluent Enterprise subscription license.

Practical Workflow with Kafka

This section provides a step-by-step workflow for using Kafka, covering topic management, message production and consumption, and stream processing with ksqlDB.

The workflow can be reproduced by opening the integrated terminal of the app from the job progress view.

Topic management

Kafka topics serve as logical containers for messages. Creating topics ensures structured data organization.

New topics can be created by executing the following commands:

$ kafka-topics --create --topic products --partitions 1 --replication-factor 1 --bootstrap-server node1:9092
$ kafka-topics --create --topic orders --partitions 1 --replication-factor 1 --bootstrap-server node1:9092

They will appear in the Topics list within the Kafka UI side menu.

Message production

Kafka Producers publish data streams to topics, enabling real-time data ingestion.

As an example, run this command in the terminal:

$ kafka-console-producer --bootstrap-server node1:9092 --topic products --property parse.key=true --property key.separator=:

and add the following entries to the products topic:

101:{"make":"Apple","model":"iPhone 13","unit_price":799}
102:{"make":"Samsung","model":"Galaxy S21","unit_price":699}

The new entries can be viewed in the UI by selecting Messages from the products topic.

Message consumption

Kafka Consumers read data streams from topics, allowing real-time data retrieval:

$ kafka-console-consumer --topic products --from-beginning --bootstrap-server node1:9092

Tip

{"make":"Apple","model":"iPhone 13","unit_price":799} {"make":"Samsung","model":"Galaxy S21","unit_price":699}

Stream Processing with ksqlDB

Kafka’s ksqlDB enables real-time stream processing using SQL-like syntax.

Starting ksqlDB

To begin stream processing, start ksqlDB on a node where it is deployed, by opening the terminal in the job progress view:

$ ksql http://node1:8088

Tip

Debug Options:

              ===========================================
              =       _              _ ____  ____       =
              =      | | _____  __ _| |  _ \| __ )      =
              =      | |/ / __|/ _` | | | | |  _ \      =
              =      |   <\__ \ (_| | | |_| | |_) |     =
              =      |_|\_\___/\__, |_|____/|____/      =
              =                   |_|                   =
              =        The Database purpose-built       =
              =        for stream processing apps       =
              ===========================================

Copyright 2017-2022 Confluent Inc.

CLI v7.9.0, Server v7.9.0 located at http://node1:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

Note

While Kafka brokers run on all nodes, ksqlDB is typically deployed only on specific nodes designated for stream processing.

Creating streams and tables

Streams handle continuous data events, while tables represent stateful data for quick lookups and aggregations.

-- Orders stream captures continuous order data.
CREATE STREAM orders_stream (order_id VARCHAR KEY, product_id VARCHAR, quantity INT)
WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='orders', PARTITIONS=1);

-- Products table stores persistent product information.
CREATE TABLE products_table (product_id VARCHAR PRIMARY KEY, make VARCHAR, model VARCHAR, unit_price DOUBLE)
WITH (KAFKA_TOPIC='products', VALUE_FORMAT='JSON', PARTITIONS=1);

Inserting orders into the stream

INSERT INTO orders_stream VALUES ('5001', '101', 2);
INSERT INTO orders_stream VALUES ('5002', '102', 1);

Querying data

SET 'auto.offset.reset' = 'earliest';
SELECT * FROM orders_stream EMIT CHANGES;

Data enrichment

ksqlDB combines streams and tables to enrich real-time data with additional context:

-- Enriched stream with detailed product info and total value calculation.
CREATE STREAM enriched_orders AS
SELECT o.order_id, o.product_id, p.make, p.model, o.quantity,
       (o.quantity * p.unit_price) AS total_order_value
FROM orders_stream o
JOIN products_table p
ON o.product_id = p.product_id
EMIT CHANGES;

Data verification

Verify enriched data easily in real-time:

SELECT * FROM enriched_orders EMIT CHANGES;

Kafka Connect: Deploying a Source Connector

Kafka Connect runs as a cluster on one or multiple nodes alongside Kafka. To deploy a connector, you must send requests to the correct Kafka Connect worker URL (http://<connect-node>:8083).

Deploying the DataGen connector

Kafka Connect’s REST API can be used to create a new DataGen connector. This connector generates test data and publishes it to a Kafka topic, simulating real-time streaming data.

To deploy the connector, run the following command, replacing <connect-node> with the hostname or IP of a node running a Kafka Connect worker:

curl -X POST -H "Content-Type: application/json" \
     --data '{
       "name": "datagen-users-connector",
       "config": {
         "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
         "quickstart": "users",
         "kafka.topic": "users",
         "key.converter": "org.apache.kafka.connect.storage.StringConverter",
         "value.converter": "org.apache.kafka.connect.json.JsonConverter",
         "value.converter.schemas.enable": "false",
         "max.interval": 1000,
         "iterations": 10000000,
         "tasks.max": 1
       }
     }' \
     http://<connect-node>:8083/connectors | jq .

Verifying data flow

To confirm that the connector is running and producing messages, check its status by querying the same Kafka Connect node:

$ curl http://<connect-node>:8083/connectors/datagen-users-connector/status | jq .

For example, in the case of node1, the expected output will be:

$ curl http://node1:8083/connectors/datagen-users-connector/status | jq .

Tip

{
  "name": "datagen-users-connector",
  "connector": { "state": "RUNNING", "worker_id": "node1:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "node1:8083" } ],
  "type": "source"
}

If the connector is active, messages should be produced to the users topic. These messages can be consumed using the Kafka console consumer:

$ kafka-console-consumer --bootstrap-server node1:9092 \
                         --topic users --from-beginning \
                         --max-messages 5

Tip

{"registertime":1516362565298,"userid":"User_8","regionid":"Region_9","gender":"FEMALE"}
{"registertime":1516493067583,"userid":"User_9","regionid":"Region_8","gender":"MALE"}
{"registertime":1493180624971,"userid":"User_6","regionid":"Region_7","gender":"MALE"}
{"registertime":1511654825833,"userid":"User_7","regionid":"Region_5","gender":"MALE"}
{"registertime":1490374150812,"userid":"User_4","regionid":"Region_1","gender":"FEMALE"}
Processed a total of 5 messages

Additionally, the generated messages can be verified via the Kafka UI. By navigating to the Topics section and selecting the users topic, real-time messages can be observed.

Stopping the connector

To remove the connector, use the DELETE method on the correct Kafka Connect node. For example, in the case of node1, the command output will be:

$ curl -X DELETE http://<connect-node>:8083/connectors/datagen-users-connector

This stops the connector but does not delete the messages in Kafka.