Contents

Apache Kafka 做中學

這篇是根據 KK Learn By Doing: Beginner’s Guide to Apache Kafka - Foundation and Development 所做的筆記與實作紀錄。

學習目標是要能夠將 Kafka architecture 以及 essential components 套用在服務紀錄需求面,只是這 2.5 小時的課可能還是遠遠不夠,都有人把 Kafka 寫成 30 篇系列文章了。

A distributed event streaming platform, widely used for building real-time data pipelines and applications

01 introduction to Kafka

Kafka 是開源的分布式事件串流平台 (open-source, distributed event streaming platform),最初由 LinkedIn 開發,於 2011 年開源,現由 Apache Software Foundation 維護。

Kafka 是設計來處理即時資料,協助組織建構穩定、可擴充且容錯性高的資料管線。

https://i.imgur.com/lQwhpfd.png

Core components

  • Producer : 將 record 送往 Kafka topics 的 app,負責選擇要將 record 送到該 topic 中的哪一個 partition
  • Consumer : 從 Kafka topics 讀取 record 的 app,consumer 訂閱 topics 並將訊息作即時處理
  • Broker : 運行 Kafka 的伺服器,broker 從 producer 接收訊息,存在 disk,再提供給 consumers 讀取。一個 Kafka cluster 包含多個 brokers,用來確保負載平衡與容錯性
  • Topic : 是一個 logical channel,producer 用以發送 record,consumer 用以讀取 record。為了平行化與可擴展性,topic 會被分區 (partitioned)
  • Partition: 每個 topic 都拆分成 partitions,每個 partition 是有序且不可變的紀錄序列 (sequence of records),使 Kafka 能夠水平擴展,並在單一 partition 內維持紀錄順序
  • ZooKeeper or KRaft: 用來做分布式協調、配置管理,以及 broker 與 partition 的 leader 選舉

Kafka architecture

  • Topics & Partitions: Topic 再切分成 partitions,partition 是平行化跟可擴張性的基礎單位。每一個 partition 都是有序不可變的紀錄序列。partition 當中的每一個 record 都會分配到一個唯一的偏移量 (unique offset)。partitions 使得 Kafka 藉由分布式資料以及負載於多個 brokers 這兩種方式,達到可擴展性。
  • Producers & Consumers: Kafka 支援 pub-sub 模型,多個 conusmer 可以訂閱到相同一個 topic,並獨立處理 data。
  • Brokers & Clusters: Kafka broker 負責儲存與提供資料。Kafka cluster 包含多個 brokers 用來確保容錯性與高可用性。Brokers 分布在不同個 machines 以避免硬體故障造成資料遺失。
  • Zooker Coordination : Zookeeper 或 KRaft 管理配置檔以及 Kafka broker 之間合作,協助進行 partitions 主節點選舉,追蹤 broker metadata。但 Kafka 從 verion 2.8 開始移除 ZooKeeper 依賴,改用 KRaft mode。
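下面用 kafka-python 寫一個極簡的 pub-sub 草稿,把上述元件串起來(非課程內容,僅為示意;假設本機 localhost:9092 已有 broker 在跑,demo-topic 為假設的 topic 名稱):producer 送出帶 key 的 record,consumer 以 consumer group 訂閱並印出 partition 與 offset。

# 極簡示意(假設環境):pip install kafka-python,且 localhost:9092 有 broker 在跑
from kafka import KafkaProducer, KafkaConsumer

TOPIC = "demo-topic"  # 示意用 topic 名稱

# Producer:把 record(key/value 需為 bytes)送進 topic
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send(TOPIC, key=b"user-1", value=b"hello kafka")
producer.flush()
producer.close()

# Consumer:以 consumer group 訂閱同一個 topic,印出 partition 與 offset
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers="localhost:9092",
    group_id="demo-group",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # 一段時間讀不到新訊息就結束,方便示範
)
for record in consumer:
    print(f"partition={record.partition}, offset={record.offset}, value={record.value}")
consumer.close()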

Kafka 在分布式串流平台的角色

  • 即時資料擷取 (ingestion):Kafka 可以用於擷取不同來源 (logs, sensors, user interactions) 的即時資料,即使是大流量也能可靠地收集與儲存,具有擴充性與容錯性
  • 串流處理:Kafka 無縫整合串流處理框架,像是 Apache Flink, Apache Spark, Kafka Streams。結合這些能夠即時處理分析資料,也可以偵測詐騙,推薦引擎技術以及監控
  • 資料整合:Kafka 在資料整合扮演 central hub,使得資料能夠跨系統、跨應用程式移動,支援 connector 連向不同資料來源以及資料槽,方便建構 data pipeline
  • 事件溯源 (event sourcing):應用程式的狀態變更會被 Kafka 紀錄成事件序列,提供了一個可靠、可審計的方式來追蹤過去一段時間內的異動
  • 資料佇列:Kafka 也能作為分布式訊息佇列,在應用程式的不同元件之間能夠非同步溝通,提供了 producer 與 consumer 間的解耦,提高了應用程式的可擴展性與彈性
  • 日誌集成:Kafka 普遍用於日誌集成,從多個不同服務收集 logs 做集中處理,有利於監控、除錯、並從 log 資料獲取事件有關的洞察。
  • 指標集合與監控:Kafka 可以從多個不同系統中搜集並彙整指標,提供了即時監測與告警功能,有利維持應用程式與基礎建設的健康與效能

Kafka 生態系

  • Kafka Connect:透過 source/sink connectors,在 Kafka 與外部系統之間匯入/匯出資料
  • Kafka Streams:輕量化的串流處理 library
  • Kafka REST Proxy:提供與 kafka 互動的 RESTful 介面,讓 app 可以透過 HTTP 生產/消費訊息
  • Schema Registry:集中存放讀寫資料所需的 schema,供序列化/反序列化使用,並提供版本控管與前後相容性檢查
  • KSQL:用直觀且較為熟悉的 query language,簡化串流處理與即時串流資料分析的工作

Apache Kafka 使用案例

  • 即時分析:分析即時資料,提供洞察並支援主動決策
  • 事件驅動架構:不同服務透過事件溝通,提高可擴張性與低耦合度
  • 微服務:促進微服務之間非同步且可信賴的資料交互
  • 日誌集成與監控:從不同服務間集成日誌內容,可供集中化管理與告警
  • 資料整合:資料整合、跨系統搬移資料的中心,確保一致性與可靠度

為什麼使用 Kafka

  1. 高吞吐低延遲
    • performance:低延遲處理高吞吐量的即時資料串流,透過有效 disk storage 機制以及高效能 networking capabilities。Kafka 架構讓你能每秒處理上百萬則訊息,相當適合需要高吞吐的應用系統
  2. 可擴充性
    • 水平擴充:藉由往 cluster 增加更多 broker 來達成,而 Kafka 的每個 topic 都有做 partition,這些 partitions 可以遍佈在多個 brokers 各處,確保 Kafka 處理遞增負荷同時,不會降低效能
    • Elasticity:卡夫卡的 partition based 架構可以動態擴展。當負荷增加了,會加入更多 partition 與 broker 而不會造成 downtime,提供了彈性化的擴展性
  3. 耐久性與容錯
    • Replication:Kafka 在多個不同 broker 間複製資料,確保資料耐久性與可用度。複本機制保證即使其中一個或多個 broker 壞了,還是可以存取資料
    • Log-based storage:用 append-only 的方式,確保資料在 disk 的持久性,最小化資料毀損 (data corruption) 的可能性也提供了有效率的資料復原
  4. 彈性多元
    • 多樣的使用案例:Kafka 提供了像是 real-time analytics 即時分析、event sourcing 資料溯源、log aggregation 日誌集成、metric collection 指標收集、stream processing 串流處理等等使用案例,可以應付眾多情境
    • 整合生態系:可以無縫接軌,跟 Kafka Connect 做資料整合、Kafka Streams 做串流處理、串接外部處理框架,例如 Apache Flink 以及 Apache Spark
  5. 保證訊息順序
    • Message ordering:Kafka 確保了單一個 partition 內,嚴格的訊息順序,對於需要事件先後順序的應用程式而言至關重要
    • Delivery semantics:Kafka 支援多樣的傳送語意,包括 at-most-once、at-least-once、exactly-once delivery,可以讓開發者根據需求選擇合適等級的保證度(本清單之後附一段 producer 設定草稿作為示意)
  6. 高可用
    • Leader-follower architecture: 主節點選舉可以確保 HA,每個 partition 有一個 leader 多個 followers,當 leader 倒了會有一個 follower 被升上去,不用人為介入就能達成持續可用性
  7. 成本效率
    • 有效的資源利用率 (both storage and compute),log-structure storage 機制將 disk I/O 最小化,distributed nature 保證 cluster 其中的負載平衡
    • 開源,沒有與私人訊息系統相關的 licensing 成本
  8. 活躍的社群支援
    • Confluent 提供企業級專業功能
  9. 串流處理能力
    • Kafka Streams : 原生自己的 stream processing library
    • KSQL : 使 user 可以用 SQL-like language query
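針對上面第 5 點的 delivery semantics,以下是以 kafka-python 為例的 producer 設定草稿(僅為示意,參數名稱採 kafka-python 的寫法):acks=0 約略對應 at-most-once,acks='all' 搭配 retries 則偏向 at-least-once;exactly-once 還需要 idempotent producer 與 transactions,kafka-python 並未完整支援,實務上多改用 Java client 或 confluent-kafka。

# 示意:不同 acks 設定對應不同等級的傳送保證(假設 localhost:9092 有 broker)
from kafka import KafkaProducer

# 近似 at-most-once:不等 broker 回應,最快但可能掉訊息
fire_and_forget = KafkaProducer(bootstrap_servers="localhost:9092", acks=0)

# 偏向 at-least-once:等所有 in-sync replicas 確認,失敗時重試(可能重複)
at_least_once = KafkaProducer(
    bootstrap_servers="localhost:9092",
    acks="all",
    retries=5,
)

fire_and_forget.send("demo-topic", b"maybe delivered")
at_least_once.send("demo-topic", b"delivered at least once")
for p in (fire_and_forget, at_least_once):
    p.flush()
    p.close()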

與其它工具評比

Kafka v.s. RabbitMQ
  • Throughput : 相較於 RabbitMQ,Kafka 提供較高的吞吐量,更適合用於高流量資料串流
  • Scalability : 相較於 RabbitMQ 以訊息佇列為基礎的模型,Kafka 的 partition based 架構在擴展上更簡易有效率
  • Durability : Kafka 的 log-based 存儲與複製提供更好的耐久性與容錯度
Kafka v.s. Apache Pulsar
  • Architecture : Pulsar 採分層架構,將 serving layer 與 storage layer 拆分開來,在某些情境下是優勢;Kafka 的整合式架構則比較簡單,效能也經過長期驗證
  • Maturity & Ecosystem : Kafka 有比較成熟的生態系、更廣泛的整合與工具支援。Pulsar 比較新,在社群支援與生態系還有成長空間
Kafka v.s. Amazon Kinesis
  • Vendor lock-in: Kafka 是開源的,能夠在本地機房或任何雲端環境上運行;Kinesis 是 AWS 專屬服務,可能會有廠商鎖定問題
  • Feature Set: Kafka 提供的 feature set,例如 Kafka Connect 以及 Kafka Streams 提供了比 Kinesis 更好的彈性與整合選項

01-tasks

  1. Apache Kafka is primarily designed for real-time data feeds
  2. Which of below best describes Kafka’s role in event sourcing? Tracking state changes in applications
  3. How does Kafka support real-time stream processing? By using Kafka Streams and integrating with processing frameworks like Apache Flink
  4. What use case does Kafka support by acting as a distributed message queue? Asynchronous communication between application components
  5. Why is Kafka favored over other providers for high-throughput applications? Kafka can process millions of messages per second with low latency
  6. How does Kafka achieve fault tolerance in a distributed environment? By replicating data across multiple brokers
  7. What feature makes Kafka scalable and suitable for elastic workloads? Kafka’s ability to dynamically adjust partitions and brokers without downtime
  8. Which of the following is NOT a common use case for Kafka? Image processing is NOT a common use case. The other three (real-time analytics, event-driven architecture, and log aggregation and monitoring) are.
  9. What is one key advantage of Kafka over RabbitMQ? Kafka has better scalability with its partition based architecture
  10. In comparison to Apache Pulsar, what is a key advantage of Kafka? Kafka’s integrated architecture is simpler and more proven compared to Pulsar which has different layers for storage and serving.
  11. What is a significant advantage of Kafka over AWS Kinesis? Kafka has built-in stream processing with Kafka Streams and KSQL.
  12. How does Kafka ensure strict message ordering within a partition? By assigning offsets to each message within a partition

02 components & architecture

這章節繼續介紹 Kafka ABC(基本的元素),像是 broker、producer、consumer、topics、partitions, 以及把它們綁在一起的 architecture,著重於複本與容錯機制。

2-1 Kafka Broker

2-1-1 Kafka Broker 是啥

Kafka Broker 是 Kafka cluster 之中的一個 server,用來存放 data 並且提供 clients (producer & consumer) 服務。Broker 處理所有對於 topics 的讀寫操作,一個 Kafka cluster 包含一到多個 broker 來確保擴展性與容錯性。

嗯?Kafka 的 Topic 跟 RabbitMQ 的 channel 似曾相似

2-1-2 Broker 在 Kafka 的角色

Broker 從 producer 接收訊息,將偏移量 (offsets) 分派給該訊息,並將該訊息提交給 disk storage。另外也服務 consumer,回應要取得特定 topics 與特定 partitions 的請求。此外 Broker 還負責做訊息複本以確保容錯率。

2-1-3 啟動一個 kafka broker

通常是用 shell script 帶入 位於 <some-path>/kafka/config/xxx.properties 的配置檔案,以下是簡易版

kafka-server-start.sh config/server.properties

2-2 Kafka Producers and Consumers

2-2-1 Kafka Producers

Producer:將訊息發布/寫入到 Kafka topics 的應用程式,決定哪個 record 要被分派到哪一個 topic 的哪一個 partition,如下示例

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
producer.close();

2-2-2 Kafka Consumers

Consumer:從 Kafka topics 讀取訊息,或訂閱 topics 的應用程式,可以平行化讀取多個 brokers 並消費訊息,如下示例

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

2-3 Kafka Topics and Partitions

2-3-1 Topics

Topic:一個類別或訊息來源名稱 (category or feed name),是 records 要發布的標的。Kafka 的 topics 天生支援多個訂閱者;白話來說,一個 topic 可以有零個、一個或多個 consumer 訂閱並讀取寫入這個 topic 的資料。

2-3-2 Partitions

Partition:是一個 topic 的部分片段。Topics 可以有多個 partitions,以便處理任意數量的資料;partition 也會將 data 分佈在多個不同 brokers 上,使單一 topic 得以被平行化處理。

建立一個具有多重 partitions 的 topic
/root/kafka/bin/kafka-topics.sh \
--create --bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 3 \
--topic my-first-topic

2-4 Kafka Architecture

Kafka 架構重點有三:(1) 高容錯度 (2) 可擴展性 (3) 能夠處理高流量資料,以下是幾個關鍵元素:

2-4-1 Role of Brokers in Architecture

Brokers 是整個 Kafka cluster 的骨幹,每個 broker 都能處理數以百萬計、達 TB 等級的訊息量,而不影響效能。Brokers 彼此協作,共同服務 clients 請求並平衡資料負載。

2-4-2 主節點複本與從節點副本

每個 topic 的每一個 partition 都會有一個 broker 作為主節點 (leader),其它 broker 當作從屬節點 (followers)。主節點處理這個 partition 的所有讀跟寫的請求,從屬節點則複製 leader 的資料,以確保 data redundancy 以及 fault tolerance

描述 topic 來查看主從節點副本
/root/kafka/bin/kafka-topics.sh \
--describe --bootstrap-server localhost:9092 \
--topic my-topic

2-4-3 藉由副本達成容錯度

Kafka 會依照設定,在多個 servers 上為每個 partition 的 log 建立副本,這樣即便某台 server 故障,資料仍能從持有該 partition 副本的另一個 broker 還原。

透過瞭解上述的 Kafka 核心組件與架構,user 能設計更穩固、可擴展、且容錯度高的 streaming apps。

2-5 Tasks

  1. What is the primary role of a Kafka broker? To store data and serve clients (producers and consumers)

  2. How does Kafka ensure fault tolerance? By replicating data across multiple brokers

  3. What command is used to start a Kafka broker? kafka-server-start.sh

  4. Which component is responsible for publishing messages to Kafka topics? Producer

  5. In the context of Kafka, what is a topic? A category or feed name to which records are published

  6. What does a Kafka consumer do? It subscribes to topics and processes messages

  7. What is a partition in Kafka? A subset of a topic’s data

  8. Which broker is the leader for the partition of my-topic?

    
    $ /root/kafka/bin/kafka-topics.sh --describe \
    --topic my-topic \
    --bootstrap-server localhost:9092
    Topic: my-topic TopicId: KInvVHdEQQ6jazgiRfjt8Q PartitionCount: 1       ReplicationFactor: 1 Configs: 
            Topic: my-topic Partition: 0    Leader: 0       Replicas: 0     Isr: 0
    

03 producer & consumers

這一段落聚焦於 producers 以及 consumers,將探索這兩個 component 的角色、如何與 Kafka 互動,會使用 Kafka CLI 提供實際練習案例來生產/消費訊息。

3-1 Kafka Producers

Kafka producer 負責發布 records/messages 到 Kafka topics,可以把 topic 看成是 records 要發布的類型或訊息來源名稱。Producer 把資料送進 Kafka brokers,讓 broker 確保資料有儲存,也有副本容錯。

3-1-1 Producer 如何作用

Producer 將資料做序列化(把它轉成 bytes)再透過 network 傳送至 Kafka cluster。這個 cluster 根據所定義的 partition strategy(例如 round-robin, key-based partitioning),將資料存進該 topic 下適當的 partition。
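以下用 kafka-python 示意這個流程(非課程內容,僅為草稿;假設 localhost:9092 有 broker,topic 名稱 exampleTopic 沿用下方 CLI 範例):producer 先把 value 序列化成 bytes,send() 回傳一個 future,透過 get() 可以拿到訊息實際落在哪個 partition、哪個 offset。

# 示意:producer 序列化並送出訊息,再取回 broker 回傳的 metadata
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: v.encode("utf-8"),  # 把字串轉成 bytes
)

future = producer.send("exampleTopic", "hello from python producer")
metadata = future.get(timeout=10)  # 等 broker 確認,取得寫入位置
print(f"topic={metadata.topic}, partition={metadata.partition}, offset={metadata.offset}")

producer.flush()
producer.close()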

範例:產生 message

使用 Kafka CLI 命令 kafka-console-producer,如下範例:

/root/kafka/bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic exampleTopic

這個指令會開啟一個 prompt,在這之後你輸入的每一行都會被發佈到在 localhost:9092 運行的 Kafka cluster 當中,名為 exampleTopic 的 topic。

3-1-2 Kafka Producer 的幾個重要配置內容

  • linger.ms:控制了該 producer 在發送一個 batch of messages 之前要等待多久,設置比較高的數值可以透過允許一次發送比較多的訊息量,進而提高 throughput,但也可能提高 latency。
  • acks:決定 producer 在認定一則訊息『已發送』之前,要經過多少 acknowledgements
    • acks=all 表示主節點需要等待所有副本都提供 acknowledgement,優點:訊息持久性,缺點:可能提高 latency
  • batch.size:控制 producer 要發送的單一 batch 的最大容量 (in bytes),比較大包的 batch 可以提高 throughput 但是也需要比較多 memory
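這幾個設定在 kafka-python 中分別對應 linger_ms、acks、batch_size 參數,以下是一個假設性的組態草稿(數值僅為示意,實際需依吞吐量與延遲需求調整):

# 示意:對應 linger.ms / acks / batch.size 的 kafka-python producer 組態
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    linger_ms=20,          # 最多等 20ms 湊一批再送,換取較高 throughput
    acks="all",            # 等所有 in-sync replicas 確認,提高持久性但增加延遲
    batch_size=32 * 1024,  # 單一 batch 上限 32KB,較大的 batch 需要較多記憶體
)

for i in range(100):
    producer.send("exampleTopic", f"message-{i}".encode("utf-8"))

producer.flush()  # 把尚未滿批的訊息送出
producer.close()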

3-2 Kafka Consumers

Kafka consumer 從 topics 讀取 records,consumer 訂閱一到多個 topics,按照產生的順序讀取 records

3-2-1 Consumer 如何作用

Consumer 使用一個 pull model 來從 broker 取得資料,另外也透過管理 offsets(偏移量)來追蹤消費過的 records。Offsets 實質上是 pointers,指向這個 consumer 讀取到的上一則 record。Kafka 儲存這些 offsets,讓 consumer 即使在壞掉或重啟後,都能夠從上一次讀取過的位置繼續。

Consumer Groups

Kafka consumer 可以作為 consumer group 的一部分。當有多個 consumers 共同處於一個 group,Kafka 會確保每個 partition 只會被這個 consumer group 的其中一個 consumer 所消費。這個機制可以讓資料分散給多個 consumers 處理,達到可擴展性與容錯性。
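以下用 kafka-python 簡單示意 consumer group(僅為草稿;假設 localhost:9092 有 broker、topic 為 exampleTopic):同一個 group_id 的多個 consumer 程序各自執行這段程式時,Kafka 會把 partitions 分配給它們,每個 partition 同一時間只會被 group 內的一個 consumer 消費。

# 示意:加入 consumer group,觀察被分配到哪些 partitions
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "exampleTopic",
    bootstrap_servers="localhost:9092",
    group_id="example-group",     # 相同 group_id 的 consumers 共同分攤 partitions
    auto_offset_reset="earliest",
)

consumer.poll(timeout_ms=1000)    # 先 poll 一次以觸發 group 協調與 partition 分配
print("assigned partitions:", consumer.assignment())

for record in consumer:
    print(f"partition={record.partition}, offset={record.offset}, value={record.value}")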

範例:Consume messages

使用 kafka-console-consumer 的 Kafka CLI 指令可以從一個 topic 讀取/消費訊息,如下例:

/root/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic exampleTopic --from-beginning

這個指令會印出來自於 exampleTopic topic 的訊息,按照產生的順序,從最舊到最新顯示

3-2-2 Kafka Consumer 的幾個重要配置內容

  • from-beginning:在 consumer command 加入這個 flag,使 client 從該 topic 最早 (earliest) 的 offset 開始消費訊息;沒有加這個 flag 的話,consumer 只會從 latest offset 開始讀取新進的訊息
  • group.id:定義這個 consumer 要屬於哪一個 consumer group,Kafka 確保一個 consumer group 當中,一次只會有一個 consumer 處理一個 partition,將其它負載分發傳送到相同 group 裡面的多個 consumers。
  • isolation.level:這個設置控制該 consumer 在處理交易性 topics 時,是否要讀取已提交或者尚未提交的訊息。把他設定為 read_committed,會確保該 consumer 只會讀取完全提交的訊息。
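這些 CLI 選項在 kafka-python 裡大致對應 auto_offset_reset、group_id、isolation_level 等參數,以下是一個假設性的組態草稿(isolation_level 需要較新版本的 kafka-python 支援,topic 名稱亦為示意):

# 示意:--from-beginning / group.id / isolation.level 的程式端對應設定
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "exampleTopic",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",      # 沒有既有 offset 時從最早的訊息讀起(類似 --from-beginning)
    group_id="example-group",          # 指定所屬 consumer group
    isolation_level="read_committed",  # 交易性 topic 只讀取已提交的訊息
    consumer_timeout_ms=10000,         # 10 秒沒有新訊息就結束迴圈,方便示範
)

for record in consumer:
    print(record.offset, record.value)

consumer.close()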

3-3 實務上考量

serialization/deserialization、partition、offset 管理,在實作 producer consumer 時都需要仔細考量,這三個面向對於 kafka-based application 的效率性與可靠性都至關重要。

3-3-1 Serialization/deserialization

Producer 把 message 序列化成 bytes –> 送進 Kafka –> Consumer 將 bytes 反序列化回原始資料格式。Kafka 支援多種序列化格式,包含 JSON、Avro、Protobuf。
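以下是一個以 JSON 為例的序列化/反序列化草稿(kafka-python;Avro/Protobuf 通常會再搭配 Schema Registry,這裡不展開):producer 端用 value_serializer 轉成 bytes,consumer 端用 value_deserializer 轉回 Python dict。

# 示意:JSON 序列化(producer)與反序列化(consumer)
import json
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("exampleTopic", {"user": "alice", "action": "login"})
producer.flush()
producer.close()

consumer = KafkaConsumer(
    "exampleTopic",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
    consumer_timeout_ms=5000,
)
for record in consumer:
    print(record.value["user"], record.value["action"])
consumer.close()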

3-3-2 Partitioning

適當的 partitioning 確保在 Kafka cluster 裡面有效率的資料分派。Producer 可以給每個 message 指明一個 key,這個 key 會被 Kafka 用來決定該 message 會被存進指定 topic 當中的哪一個 partition。
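下面的草稿示範 key 與 partition 的對應(假設 topic exampleTopic 有多個 partitions):kafka-python 預設的 partitioner 會對 key 做雜湊,所以相同 key 的訊息會固定落在同一個 partition。

# 示意:相同 key 的訊息會被雜湊到同一個 partition
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: v.encode("utf-8"),
)

for i in range(6):
    key = f"user-{i % 2}"  # 只有 user-0 / user-1 兩種 key
    meta = producer.send("exampleTopic", key=key, value=f"event-{i}").get(timeout=10)
    print(f"key={key} -> partition={meta.partition}")

producer.close()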

3-3-3 Offset management

Consumer 使用 offset 追蹤他們的進度,因此 offsets 需要仔細管理,以確保 consumer app 對於所有訊息都有依照產生順序消費,不會有重複讀取/消費的問題。

3-4 Tasks

  1. Apache Kafka is a distributed streaming platform that allows for the building of real-time streaming data pipelines and applications. At its core, Kafka manages records in a fault-tolerant and scalable way. This lab focuses on two essential components of Kafka: producer and consumer

  2. Kafka producers are responsible for publishing records to Kafka topics, while Kafka consumers read records from topics. Understanding how these components interact with Kafka is crucial for building efficient and reliable streaming applications.

  3. Which of the following Kafka producer configurations controls the amount of time a producer waits before sending a batch of messages? linger.ms

  4. Which Kafka producer configuration ensures that the producer waits for acknowledgments from all replicas before considering a message as successfully sent? acks=all

  5. Which Kafka CLI command is used to produce message to a topic? kafka-console-producer.sh

  6. Use the kafka-console-producer command to send Hi, this is my first messsage to the myFirstTopic topic

    
    cd /root/kafka/bin/
    ls
    cat kafka-console-producer.sh
    kafka-console-producer.sh --topic myFirstTopic \
    --bootstrap-server localhost:9092
    > Hi, this is my first messsage
    
  7. Which Kafka CLI command is used to consume messages from a topic? kafka-console-consumer.sh

  8. When using Kafka’s CLI consumer, what does the --from-beginning flag do? Consumes messages from the start of a topic

  9. In Kafka CLI consumer, which of the following command allows you to specify the consumer group that the client should join? --group

  10. What is the purpose of the --isolation-level configuration in the Kafka consumer CLI? To determine whether the consumer reads committed or uncommitted messages

  11. Use the Kafka console consumer to read messages from the topic myFirstTopic and store them in /root/messages

    
    /root/kafka/bin/kafka-console-consumer.sh --topic myFirstTopic \
    --bootstrap-server localhost:9092 \
    --from-beginning --timeout-ms 1000
    Hi, this is my first message
    a29kZWtsb3VkCg==
    Processed a total of 2 messages
    

04 Kafka topics & partitions

這章節要解開 topics 跟 partitions 這兩個 Kafka 的迷霧。重複強調一下,Kafka 是 distributed streaming platform,可以協助您以高吞吐且具容錯性的方式處理即時資料串流。不管是 Kafka 小萌新,或者是打算將這兩個概念了解透徹,要有效率地使用 Kafka,了解 topic 與 partition 都是關鍵要素。現在要講如何用 topics 幫 message 分類,以及 partitions 如何賦予 Kafka 可擴展性與平行處理的能力。

4-1 Kafka Topics

將 Kafka topic 想像成一個類別 (category),或者一個儲存訊息的資料夾。Topics 是 Kafka producer 與 Kafka consumer 之間溝通的方式。每一則發佈到 Kafka cluster 的訊息都會被分派到一個特定的 topic,使得這個 message 能夠被訂閱這個 topic 的任何 consumer group 所讀取/消費。

4-1-1 建立一個 Kafka Topic

使用 kafka-topics CLI 指令工具建立一個新的 Kafka topic

/root/kafka/bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 3 \
--topic my-first-topic

這指令會建立一個名為 my-first-topic 的 topics,有三個 partition,replication factor 是 1

4-1-2 列出 Kafka Topics

要查看 Kafka cluster 現在有的 topic 清單,可以用以下指令

/root/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

4-1-3 了解 Topics characteristics

  • Immutability 不可變異性:一旦有訊息寫入一個 topic 就不能被異動。這是 kafka 設計的一個關鍵功能
  • Retention Policy 保留政策:Kafka topic 可以配置訊息保留政策,決定訊息在被刪除之前要保留多久(可以根據時間或者資料大小而定)
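保留政策是 topic 層級的設定(例如 retention.ms、retention.bytes)。除了後面任務會用到的 kafka-configs.sh,也可以用 kafka-python 的 admin API 調整,以下是一個假設性的草稿(不同版本的 admin API 介面可能略有差異):

# 示意:用 admin API 把某個 topic 的 retention.ms 設為 7 天
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

seven_days_ms = str(7 * 24 * 60 * 60 * 1000)  # 604800000
resource = ConfigResource(
    ConfigResourceType.TOPIC,
    "my-first-topic",  # 要調整的 topic(沿用本章範例)
    configs={"retention.ms": seven_days_ms},
)
admin.alter_configs([resource])
admin.close()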

4-2 Kafka Partitions

Partitions 是 Kafka 確保擴展性以及容錯的方式。每一個 topic 會被區分成多個 partitions,每一個 partition 可以放在 cluster 當中不同的 Kafka brokers 上。這使得一個 topic 的訊息能分佈在 cluster 各處,達到平行化處理並提高吞吐量。

4-2-1 為啥需要 Partitions

  • Parallelism 平行化:partition 允許多個 consumers 平行化讀取一個 topic 的訊息,顯著提升了系統的吞吐量
  • Scalability 可擴展性:隨著 message 流量增加,可以加入更多 partitions,將負載分派到更多的 brokers。

4-2-2 建立 Partitions

建立 topics 可以指定 partitions 數量,您也可以直接修改既有 topics 的 partitions 數量

/root/kafka/bin/kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic my-first-topic --partitions 6

上述指令會將既有的 my-first-topic 的 partitions 數量提升到 6 個

4-2-3 Partitions 如何運作

  • Ordering: 在一個 partition 內的所有訊息都保證會以他們寫入的先後順序儲存,但是跨 partitions 的 message 順序就不保證
  • Consumer Groups: 同一個 consumer group 中,一個 partition 一次只會由一名成員消費,以確保訊息有按照順序被處理

4-3 實際案例:生產/消費訊息

4-3-1 發送 message 進一個 topic

/root/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-first-topic
> Hello, Kafka!
> This is a message.

4-3-2 從一個 topic 消費 message

/root/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-first-topic --from-beginning

這指令會從名為 my-first-topic 的 topic,從最先前的地方開始顯示所有訊息

4-4 Tasks

  1. In this section, we will explore the fundamental concepts of Kafka topics and partitions. Topics serve as categories for messages, while partitions allow for scalability and parallel processing.

  2. What is a Kafka topic? A category or a folder where messages are stored

  3. Create a Kafka Topic

    • Utilize the Kafka CLI tool to create a topic named my-first-topic with 3 partitions and a replication factor of 1
    • Use the Kafka binary located at /root/kafka/bin/ to interact with Kafka
    
    /root/kafka/bin/kafka-topics.sh \
    --create --bootstrap-server localhost:9092 \
    --replication-factor 1 --partitions 3 \
    --topic my-first-topic
    
  4. How many topics are in this cluster?

    
    /root/kafka/bin/kafka-topics.sh --list \
    --bootstrap-server localhost:9092 
    customer-feedback
    inventory-updates
    my-first-topic
    order-processing-queue
    user-registration-events
    /root/kafka/bin/kafka-topics.sh --list \
    --bootstrap-server localhost:9092 | wc -l
    5
    
  5. Partitions in Kafka play a crucial role in scalability and fault tolerance. By dividing each topic into multiple partitions, Kafka enables parallel processing, which significantly improves throughput and system performance.

  6. Why are partitions important in Kafka? They allow for parallelism and scalability.

  7. Increase the number of partitions for the topic my-first-topic to 6

    
    # alter
    /root/kafka/bin/kafka-topics.sh --alter \
      --bootstrap-server localhost:9092 \
      --topic my-first-topic \
      --partitions 6
    # verify
    /root/kafka/bin/kafka-topics.sh --describe \
      --bootstrap-server localhost:9092 \
      --topic my-first-topic
    
  8. What is the retention policy in Kafka? A policy to determine how long messages are kept before being deleted

  9. Change the retention time for messages in the my-second-topic to 7 days

    
    # set config "retention.ms" to 7 days (=604800000 milliseconds)
    /root/kafka/bin/kafka-configs.sh --alter \
    --bootstrap-server localhost:9092 \
    --entity-type topics \
    --entity-name my-second-topic \
    --add-config retention.ms=604800000
    # verify
    /root/kafka/bin/kafka-configs.sh --describe \
    --bootstrap-server localhost:9092 \
    --entity-type topics --entity-name my-second-topic
    
  10. Produce messages to a Kafka topic

    Use the Kafka-console-producer to send the message {"user": "Alice", "action": "login", "timestamp": "2024-12-03T10:00:00Z"} to the topic my-first-topic

    
    /root/kafka/bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic my-first-topic
    > {"user": "Alice", "action": "login", "timestamp": "2024-12-03T10:00:00Z"}
    
  11. Consume messages from a Kafka topic

    Use the Kafka console consumer to read messages from the topic my-first-topic and store them in /root/messages

    
    /root/kafka/bin/kafka-console-consumer.sh --topic my-first-topic \
    --bootstrap-server localhost:9092 \
    --from-beginning --timeout-ms 1000 > /root/messages
    
  12. Delete a Kafka topic

    Use the Kafka CLI tool to delete the topic my-first-topic

    
    /root/kafka/bin/kafka-topics.sh --delete \
    --topic my-first-topic \
    --bootstrap-server localhost:9092
    

05 Kafka environment setup

這一章節目的要帶你走過整個從零開始建立 Kafka 環境的流程,使用到 Kafka 本身以及 ZooKeeper 用於 cluster 管理,程度適合 beginner。

5-1 先備知識

在建立 kafka setup 之前要有以下知識背景:

  • Linux-based 的系統(這裡提到的指令都用於 linux 環境)
  • Command-line 基本知識
  • 系統已安裝 curl ,用來下載 Kafka
  • tar 用於解壓縮 Kafka archive
  • 系統已安裝 Java,用來運行 Kafka
  • Root 或者 sudo 權限,用來建立 service files

5-2 實際步驟

Step 1: 下載 Kafka

可以從官網下載 Apache Kafka,但是為了便利性,這裡使用 curl 從 command line 直接下載 Kafka,以下指令下載的是 version 3.7.1 版本。需要新的版本的話,可以去 Kafka 下載頁面查看

curl -L https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz -o ~/Downloads/kafka.tgz

Step 2: 解壓縮 Kafka

一旦下載完成,需要解壓縮到你想要放的目錄裡。以下指令會在你的 home 目錄建立一個新的 directory 給 Kafka,並將 archive 解壓縮至此

mkdir ~/kafka && cd ~/kafka
tar -xvzf ~/Downloads/kafka.tgz --strip 1 -C ~/kafka

Step 3: 建立 ZooKeeper Systemd Service

使用以下指令建立 ZooKeeper 的 systemd service file

sudo vi /etc/systemd/system/zookeeper.service
# paste below config to the editor
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=root
ExecStart=/root/kafka/bin/zookeeper-server-start.sh /root/kafka/config/zookeeper.properties
ExecStop=/root/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
# save and close vi editor

Step 4: 設定 Kafka Server

類似 ZooKeeper,Kafka 也需要 system service 來做自動化管理,使用以下指令建立 Kafka service file

sudo vi /etc/systemd/system/kafka.service
# paste below config to the editor
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=root
ExecStart=/bin/sh -c '/root/kafka/bin/kafka-server-start.sh /root/kafka/config/server.properties > /root/kafka/kafka.log 2>&1'
ExecStop=/root/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
# save and close vi editor

Step 5: 啟動服務

兩個 services files 皆到位,就可以 enable and start ZooKeeper 跟 Kafka

sudo systemctl enable zookeeper
sudo systemctl start zookeeper
sudo systemctl enable kafka
sudo systemctl start kafka

5-3 驗證安裝

以下指令用來驗證 Kafka 以及 Zookeeper 皆已正確安裝。如果都正確無誤,那這兩個 services 應該都在 active 狀態

sudo systemctl status zookeeper
sudo systemctl status kafka

5-4 Task

  1. Apache Kafka is a powerful distributed streaming platform that enables you to build real-time streaming data pipelines and applications. Setting up a Kafka environment involves installing Kafka itself along with ZooKeeper, which Kafka uses for cluster management.

  2. Pre-requisite per above

  3. Install Java 17 as Kafka depends on this

    
    sudo apt update && sudo apt install -y openjdk-17-jdk
    
  4. What is the primary purpose of ZooKeeper in a Kafka environment? To manage cluster metadata and configurations

  5. Download the latest version of Kafka and store it in the /home/bob/ directory with the filename kafka.tgz

    
    curl -L https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz -o /home/bob/kafka.tgz
    
  6. Extract the Kafka archive to the /home/bob/kafka/ directory

    
    mkdir -p /home/bob/kafka && tar -xvzf /home/bob/kafka.tgz --strip 1 -C /home/bob/kafka
    
  7. What is the correct path to the Kafka config file? /home/bob/kafka/config/server.properties

  8. What is the correct path to the ZooKeeper config file? /home/bob/kafka/config/zookeeper.properties

  9. Create a systemd service file to manage ZooKeeper as a service.

    
    sudo vi /etc/systemd/system/zookeeper.service
    # paste below
    [Unit]
    Requires=network.target remote-fs.target
    After=network.target remote-fs.target
    
    [Service]
    Type=simple
    User=root
    ExecStart=/home/bob/kafka/bin/zookeeper-server-start.sh /home/bob/kafka/config/zookeeper.properties
    ExecStop=/home/bob/kafka/bin/zookeeper-server-stop.sh
    Restart=on-abnormal
    
    [Install]
    WantedBy=multi-user.target
    # save and exit
    # reload the systemd daemon to recognize the new service file
    sudo systemctl daemon-reload
    
  10. Create a systemd service file to manage Kafka as a service.

    
    sudo vi /etc/systemd/system/kafka.service
    # paste
    [Unit]
    Requires=zookeeper.service
    After=zookeeper.service
    
    [Service]
    Type=simple
    User=root
    ExecStart=/bin/sh -c '/home/bob/kafka/bin/kafka-server-start.sh /home/bob/kafka/config/server.properties > /home/bob/kafka/kafka.log 2>&1'
    ExecStop=/home/bob/kafka/bin/kafka-server-stop.sh
    Restart=on-abnormal
    
    [Install]
    WantedBy=multi-user.target
    # save and exit
    # reload the systemd daemon to apply the changes
    sudo systemctl daemon-reload
    
  11. Enable and start both the ZooKeeper and Kafka services to ensure they run at boot and are active immediately.

    
    # enable ZooKeeper to start at boot
    sudo systemctl enable zookeeper
    # start the ZooKeeper service
    sudo systemctl start zookeeper
    # enable Kafka to start at boot
    sudo systemctl enable kafka
    # start the Kafka service
    sudo systemctl start kafka
    
  12. Install Kafdrop as a UI for managing your Kafka cluster with a graphical interface. Create Kafkaui as a service and ensure it is enabled and started automatically on boot?

    Solution

    Follow these steps to install Kafdrop and set it up as a systemd service for your Kafka cluster: Step 1: Download Kafdrop

    Use the following curl command to download the Kafdrop JAR file to the /opt directory:

    
    sudo curl -L https://github.com/obsidiandynamics/kafdrop/releases/download/4.0.2/kafdrop-4.0.2.jar -o /opt/kafdrop-4.0.2.jar
    

    Step 2: Create a systemd Service for Kafdrop

    1. Create a systemd service file for Kafdrop. Open a new file using a text editor:
    
    sudo vi /etc/systemd/system/kafkaui.service
    
    2. Add the following content to the service file:
    
    [Unit]
    Description=Web UI for administration of Kafka clusters
    Requires=kafka.service
    After=kafka.service
    
    [Service]
    User=root
    WorkingDirectory=/opt/
    ExecStart=/usr/bin/java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -jar kafdrop-4.0.2.jar --kafka.brokerConnect=ubuntu-host:9092
    StartLimitInterval=0
    RestartSec=10
    Restart=always
    
    [Install]
    WantedBy=multi-user.target
    
    3. Reload the systemd daemon to recognize the new service file:
    
    sudo systemctl daemon-reload
    
    4. Enable and start the Kafdrop service:
    
    sudo systemctl enable kafkaui.service
    sudo systemctl start kafkaui.service
    

06 hands-on with producers & consumers

6-1 撰寫 Kafka Producers

Producer 負責將 data 送到 Kafka topics 以利 consumer 消費。這章節會分別使用 Java 與 Python 展示建立連線、傳送訊息、處理錯誤的細節:

6-1-1 Producer 長話短說

Kafka producers 是負責將 records 傳送進 Kafka topics 的應用程式,負責處理資料序列化 (data –> byte streams),並決定 records 要分派到 topic 的哪一個 partition。

一個好的 Kafka producer should be designed to be efficient, scalable, resilient。

6-1-2 以 Java 建立 Kafka Producer

建立一個 Gradle Project
  1. Create a new directory for the project

    
    mkdir kafka-producer
    cd kafka-producer
    
  2. Initialize the Gradle project

    
    gradle init --type java-application
    
  3. Add Kafka dependencies

    
       plugins {
           id 'application'
       }
    
       repositories {
           mavenCentral()
       }
    
       dependencies {
           implementation 'org.apache.kafka:kafka-clients:3.0.0'
       }
    
       application {
           mainClassName = 'com.example.SimpleProducer'
       }
    
4. 建立連線

Create a Java class for the producer

建立 src/main/<path-to-your-producer-package>/SimpleProducer.java

   package com.example;

   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.Producer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import java.util.Properties;

   public class SimpleProducer {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

           Producer<String, String> producer = new KafkaProducer<>(props);

           String topicName = "example-topic";
           String message = "Hello, Kafka!";

           producer.send(new ProducerRecord<>(topicName, "key1", message));
           producer.close();
       }
   }
解釋
  • bootstrap.servers:要使用的 Kafka cluster 位址
  • key.serializer & value.serializer:如何將 producer 要傳送的 key-value 物件轉成 bytes
5. 發送訊息

要發送訊息的話,則要建立 ProducerRecord 內容包含 topic name、key、value,再呼叫 send 方法。

String topicName = "example-topic";
String key = "key1";
String value = "Hello, Kafka!";

// Create a ProducerRecord
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);

// Send the record
producer.send(record);
解釋
  • ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);

    這裡建立了一個 record,包含指定的 topic, key, value

  • producer.send(record);

    將 record 發送至 Kafka cluster

6. 錯誤處理

使用 callbacks 以及 try-catch block 處理錯誤,這裡修改一下 send 方法讓它包含 callback

producer.send(new ProducerRecord<>(topicName, key, value), (metadata, exception) -> {
    if (exception == null) {
        log.info("Message sent successfully to topic {} partition {}, offset {}", metadata.topic(), metadata.partition(), metadata.offset());
    } else {
        log.error("Error sending message: {}", exception.getMessage());
    }
});

除此之外也要用 finally 把 producer 給關閉以釋放資源

try {
    producer.send(new ProducerRecord<>(topicName, key, value), (metadata, exception) -> {
        if (exception == null) {
            log.info("Message sent successfully to topic {}, partition {}, offset {}", metadata.topic(), metadata.partition(), metadata.offset());
        } else {
            log.error("Error sending message: {}", exception.getMessage());
        }
    });
} catch (Exception e) {
    log.error("Error: {}", e.getMessage());
} finally {
    producer.close();   // don't forget to release resources
}
解釋
  • try-catch-finally:確保 producer 在成功/有錯誤失敗的情況後,都正確關閉
  • callback:處理 send 操作的結果,印出成功/失敗錯誤訊息

6-1-3 以 Python 建立 Kafka Producer

建立一個 Python Project
  1. create a new directory for the project

    
    mkdir kafka-producer
    cd kafka-producer
    
  2. create and activate a virtual environment

    
    python3 -m venv venv
    source venv/bin/activate
    
  3. install Kafka-Python

    
    pip install kafka-python
    
建立連線

新增一個 file producer.py 撰寫以下這些程式碼

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         key_serializer=lambda k: k.encode('utf-8'),
                         value_serializer=lambda v: v.encode('utf-8'))

topic_name = 'example-topic'
message = 'Hello, Kafka!'

producer.send(topic_name, key='key1', value=message)
producer.close()
解釋
  • bootstrap.servers:要使用的 Kafka cluster 位址
  • key.serializer & value.serializer:如何將 producer 要傳送的 key-value 物件轉成 bytes
發送訊息

要發送訊息的話,則呼叫 send 方法,帶入 topic name、key、value。serializers 確保 key 與 value 都被編碼成 bytes

topic_name = 'example-topic'
key = 'key1'
value = 'Hello, Kafka!'

producer.send(topic_name, key=key, value=value)
解釋
  • producer.send(topic_name, key=key, value=value)

    將 record/message 發送至 Kafka cluster,內容包含指定 topic, key, value

錯誤處理

使用 try-except 區塊處理錯誤,並調整 send 方法以納入錯誤處理

from kafka.errors import KafkaError

try:
    future = producer.send(topic_name, key='key1', value=message)
    record_metadata = future.get(timeout=10)
    print(f'Message sent successfully to topic {record_metadata.topic}, partition {record_metadata.partition}, offset {record_metadata.offset}')
except KafkaError as e:
    print(f'Error sending message: {e}')
finally:
    producer.close()
解釋
  • try-except-finally:確保 producer 不管成功失敗都正確關閉
  • KafkaError:捕獲 Kafka operation 相關的錯誤
  • future.get(timeout=10):等待發送操作完成,並取得 metadata 或拋出錯誤

6-1-4 Task

  1. Introduction to Kafka Producers: Kafka producers are applications that send records to Kafka topics. A record is a key-value pair that can also have metadata such as a timestamp. Producers send these records to categories called topics.

  2. What is the role of a Kafka producer? An application that sends records to Kafka topics

  3. Which property specifies the address of your Kafka cluster in the producer configuration? bootstrap.servers

  4. What does the acks property in Kafka producer configuration signify? The number of acknowledgements the producer requires from the cluster

  5. Add below in the build.gradle file

    
    dependencies {
        implementation 'org.apache.kafka:kafka-clients:3.3.1'   // add this
        implementation 'ch.qos.logback:logback-classic:1.2.10'
        testImplementation 'junit:junit:4.13.2'
    }
    
  6. Which serializer is used for the key in the producer configuration? org.apache.kafka.common.serialization.StringSerializer (other options: IntegerSerializer, ByteArraySerializer, LongSerializer). The key and value serializers need to convert the data into bytes.

  7. Configure the Kafka producer in /root/kafka-producer/.../SimpleProducer. Build the project after configuring SimpleProducer.java

    
    vi src/main/java/com/example/SimpleProducer.java
    # paste
       package com.example;
    
       import org.apache.kafka.clients.producer.KafkaProducer;
       import org.apache.kafka.clients.producer.Producer;
       import org.apache.kafka.clients.producer.ProducerRecord;
       import java.util.Properties;
    
       public class SimpleProducer {
           public static void main(String[] args) {
               Properties props = new Properties();
               props.put("bootstrap.servers", "localhost:9092");
               props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
               props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
               Producer<String, String> producer = new KafkaProducer<>(props);
    
               String topicName = "example-topic";
               String message = "Hello, Kafka!";
    
               try {
                   producer.send(new ProducerRecord<>(topicName, "key1", message));
               } catch (Exception e) {
                   System.out.println("Error: "+ e.getMessage());
               } finally {
                   producer.close(); 
               }  
           }
       }
    # save and exit
    cd /root/kafka-producer
    # ls and check that the gradlew is here
    # run gradle to build the project
    ./gradlew build
    
  8. What does producer.send() do? It sends record to Kafka cluster.

  9. Send the message Hi, this message is from the Java producer to the kodekloud Kafka topic using the Java producer.

    Build the project after configuring SimpleProducer.java

    To run the java code use java -jar /root/kafka-producer/build/libs/kafka-producer.jar command.

    
    # navigate to the project directory
    cd /root/kafka-producer
    # create the necessary directories for your Java source files if they do not already exist
    mkdir -p src/main/java/com/example
    # use the SimpleProducer created per previous task to modify
    package com.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    
    public class SimpleProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
    
            // Send the message to the "kodekloud" Kafka topic
            String topicName = "kodekloud";
            String message = "Hi, this message is from the Java producer";
            producer.send(new ProducerRecord<>(topicName, "messageKey", message));
            System.out.println("Message sent: " + message);
    
            producer.close();
        }
    }
    # save and exit
    # build the project
    ./gradlew clean build
    # run the java code
    java -jar /root/kafka-producer/build/libs/kafka-producer.jar
    
  10. Which command is used to retrieve the message from the kodekloud topic on a Kafka broker running at localhost:9092?

    
    /root/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
      --topic kodekloud \
      --from-beginning
    
  11. What is the purpose of the callback mechanism in Kafka producer? Error handling

  12. Implement error handling in the Kafka producer using a callback.

    
    # implement error handling in the Kafka producer using a callback
    package com.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.clients.producer.Callback;
    
    import java.util.Properties;
    
    public class SimpleProducer {
        public static void main(String[] args) {
            // Step 1: Set the properties for the Kafka producer
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            // Step 2: Create a Kafka producer
            Producer<String, String> producer = new KafkaProducer<>(props);
    
            // Step 3: Define the topic and message to send
            String topicName = "kodekloud";
            String message = "Hi, this message is from the Java producer";
    
            // Step 4: Create a callback to handle success and error scenarios
            Callback callback = new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        // The message was sent successfully
                        System.out.println("Message sent successfully to topic " + metadata.topic() +
                                           " partition " + metadata.partition() +
                                           " offset " + metadata.offset());
                    } else {
                        // There was an error sending the message
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            };
    
            // Step 5: Send the message with the callback
            producer.send(new ProducerRecord<>(topicName, "messageKey", message), callback);
    
            // Step 6: Close the producer
            producer.close();
        }
    }
    # save and exit
    ./gradlew clean build
    
  13. Implement a Kafka producer using Python. Your implementation should include setting up the producer to connect to a Kafka broker running on localhost:9092 and define a topic named payment-transactions. Use the kafka-python library to achieve this. Send the following message to the payment-transactions topic: Payment processed successfully

    Create a file named SimpleProducer.py in the /root/kafka-producer directory and write your script in this file.

    
    
    pip install kafka-python
    
    vi /root/kafka-producer/SimpleProducer.py
    
    from kafka import KafkaProducer
    
    def main():
        # Kafka configuration
        kafka_config = {
            'bootstrap_servers': 'localhost:9092',
            'key_serializer': str.encode,
            'value_serializer': str.encode
        }
    
        # Create Kafka producer
        producer = KafkaProducer(**kafka_config)
    
        # Send the message to the "payment-transactions" Kafka topic
        topic_name = "payment-transactions"
        message_key = "transactionKey"
        message_value = "Payment processed successfully"
    
        # Produce the message
        producer.send(topic_name, key=message_key, value=message_value)
    
        # Wait for all messages to be sent
        producer.flush()
    
    if __name__ == "__main__":
        main()
    
  14. To implement error handling in the Kafka producer using KafkaError, we can use the code below.

    
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    
    def on_send_success(record_metadata):
        print(f"Message sent to topic: {record_metadata.topic}, partition: {record_metadata.partition}, offset: {record_metadata.offset}")
    
    def on_send_error(excp):
        print(f"Error while producing message: {excp}")
    
    def main():
        # Kafka configuration
        kafka_config = {
            'bootstrap_servers': 'localhost:9092',
            'key_serializer': str.encode,
            'value_serializer': str.encode
        }
    
        # Create Kafka producer
        producer = KafkaProducer(**kafka_config)
    
        # Send the message to the "payment-transactions" Kafka topic
        topic_name = "payment-transactions"
        message_key = "transactionKey"
        message_value = "Payment processed successfully"
    
        # Produce the message with error handling
        producer.send(topic_name, key=message_key, value=message_value).add_callback(on_send_success).add_errback(on_send_error)
    
        # Wait for all messages to be sent
        producer.flush()
    
    if __name__ == "__main__":
        main()
    

6-2 撰寫 Kafka Consumers

此章節目的在讓讀者了解如何使用 Java API + Gradle 以及 Python 撰寫 Kafka consumer,重點在於 consumer 配置、資料讀取、管理 offset。Consumer 在實現即時資料處理分析扮演很重要的角色(從 Kafka topics 讀取資料)。

6-2-1 Kafka Consumer

Kafka Consumer 從 Kafka cluster 讀取資料,consumers 會訂閱一到多個 topics,並處理收到的 streams of records。在 Kafka 中,consumer 負責追蹤它已經處理的 record,這個追蹤行為就稱為管理偏移量。

使用 Kafka Consumer 的原因
  • 可擴展性:consumer 能夠水平擴展,以平行化讀取 topics 內容
  • 容錯:支援 automatic offset commit 能力,確保 consumer 故障時也不會有資料遺失
  • 彈性:consumer 可以從某一個特定的 offset 開始讀取,允許多樣的 processing strategies(例如 reprocessing historical data)
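針對上面第三點「從特定 offset 開始讀取」,以下是一個 kafka-python 的小草稿(topic 與 offset 僅為示意):用 assign() 手動指定 partition,再用 seek() 跳到想要的位置,例如重新處理歷史資料。

# 示意:手動 assign partition 並 seek 到指定 offset,重新消費歷史資料
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",
    consumer_timeout_ms=5000,
)

tp = TopicPartition("test-topic", 0)  # 示意:topic test-topic 的 partition 0
consumer.assign([tp])                 # 手動指定,不走 group 的自動分配
consumer.seek(tp, 10)                 # 從 offset 10 開始重讀

for record in consumer:
    print(record.offset, record.value)

consumer.close()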

6-2-2 設定環境

在撰寫 Kafka consumer 之前,需要先設定開發環境,包括安裝 Kafka、用 gradle 建立 Java project、建立 Python 合適環境。

安裝 Kafka

從官網下載 / 解壓縮 Kafka,啟動 ZooKeeper 以及 Kafka server

# Start Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start Kafka Server
bin/kafka-server-start.sh config/server.properties

6-2-3 以 Java 建立 Kafka Consumer

建立一個目錄,然後初始化一個 Gradle 專案

mkdir kafka-consumer-java
cd kafka-consumer-java
gradle init --type java-application

將 Kafka client dependency 加入 build.gradle 檔案

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.0'
}

接下來要撰寫一個 Kafka consumer 用來訂閱 topics 並將 message 印到 console

基本的 consumer config

建立 src/<path-to-your-consumer-package>/SimpleConsumer.java 初始化

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test-topic"));
            // Consumer logic goes here
        }
    }
}
讀取訊息

在 try block 裡面加入 poll for new message 並印出來的邏輯

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

以上是簡單的 Java Kafka Consumer。

6-2-4 以 Python 建立 Kafka Consumer

確定已經安裝了 Python,然後建立一個新的 virtual environment

python3 -m venv kafka-consumer-python
source kafka-consumer-python/bin/activate

安裝 Kafka Python package

pip install kafka-python
基本的 consumer config

建立一個 python script simple_consumer.py,以下是基本的配置內容:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id='test-group',
    value_deserializer=lambda x: x.decode('utf-8')
)
讀取訊息

加入持續讀取訊息並印出來的邏輯

for message in consumer:
    print(f"offset = {message.offset}, key = {message.key}, value = {message.value}")

以上是簡單的 Python Kafka Consumer。

6-2-5 管理 Offsets

Kafka consumer 使用 offset 來追蹤已經消費過的訊息。預設會自動提交 offset,但是也能做手動管理以提供更精細的控制。

自動 offset 提交

上述兩個 consumers 都使用 automatic offset committing,這個行為是由 enable.auto.commit 的配置所控制(預設是 true)

手動 offset 提交

如果要 manual offset committing 的話:

  • Java:將 enable.auto.commit=false ,在 message loop 裡面調用 commitSync 方法

    
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // Inside the polling loop
    consumer.commitSync();
    
  • Python:在 message loop 裡面調用 commit 方法

    
    consumer = KafkaConsumer(enable_auto_commit=False)
    # Inside the message loop
    consumer.commit()
    
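把上面的 Python 片段補得完整一點,以下是一個手動提交 offset 的最小草稿(topic、group 名稱沿用本章示意):處理完一批訊息後才呼叫 commit(),失敗時不提交,重啟後就會從上次提交的位置重讀。

# 示意:關閉自動提交,處理完成後再手動 commit offset
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "test-topic",
    bootstrap_servers="localhost:9092",
    group_id="test-group",
    enable_auto_commit=False,       # 關閉自動提交
    auto_offset_reset="earliest",
)

try:
    while True:
        batch = consumer.poll(timeout_ms=1000)  # {TopicPartition: [records]}
        for tp, records in batch.items():
            for record in records:
                print(f"{tp.topic}[{tp.partition}] offset={record.offset} value={record.value}")
        if batch:
            consumer.commit()       # 這一批都處理完才提交 offset
finally:
    consumer.close()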

6-2-6 Task

  1. What is the primary role of a Kafka consumer in a Kafka ecosystem? To read and process messages from Kafka topics

  2. What is a Kafka Consumer Group used for? To allow multiple consumers to read messages from a topic in parallel without overlapping

  3. What is the first step in creating a Kafka consumer in Java? Configuring consumer properties such as bootstrap server and group ID

  4. Which of the following is a required configuration property when creating a Kafka consumer in Java? group.id

  5. After configuring the properties, which class is used to instantiate a Kafka consumer in Java? KafkaConsumer

  6. After instantiating a Kafka consumer, what method is used to subscribe to a Kafka topic? subscribe()

  7. Once a Kafka consumer is subscribed to a topic, which method is typically used to pull messages from Kafka? poll()

  8. After calling the poll() method, what data structure is returned to the consumer containing the records? ConsumerRecords

  9. To commit the offsets of processed messages manually, which method should be used? commitSync()

  10. Configure the Kafka Consumer in Java using the /root/kafka-consumer project. Create a Java class SimpleConsumer in kafka-consumer/src/main/java/com/example and set up the consumer.

    Build the project after configuring SimpleConsumer.java

    
    cd /root/kafka-consumer
    mkdir -p src/main/java/com/example
    vi src/main/java/com/example/SimpleConsumer.java
    package com.example;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Properties;
    
    public class  SimpleConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    
    
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
            // Create the Kafka consumer
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
        }
    }
    # save and exit
    ./gradlew build
    
  11. After configuring and subscribing the Kafka consumer to the topic order-event, write the Java code snippet that pulls messages from the topic.

    
    package com.example;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class  SimpleConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    
    
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
            // Create the Kafka consumer
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            // Add below codes for task# 11 🌟
            // Subscribe to the 'order-events' topic
            consumer.subscribe(Collections.singletonList("order-events"));
    
            // Poll for new messages and print them to the console
            try {
                while (true) {
                    // Poll Kafka broker for new records (with a timeout of 1000 milliseconds)
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    
                    // Process each record and print to console
                    for (ConsumerRecord<String, String> record : records) {
                        // Print consumed message details to the console
                        System.out.printf("Consumed message: key = %s, value = %s, partition = %d, offset = %d%n",
                                record.key(), record.value(), record.partition(), record.offset());
                    }
                }
            } finally {
                // Ensure the consumer is closed properly
                consumer.close();
            }
        }
    }
    
  12. Which library provides Python support for working with Kafka in a more Kafka-native style, supporting both consumers and producers? kafka-python

  13. Write the Python code snippet that pulls messages from the payment-transactions topic using kafka-python.

    Create a file named SimpleConsumer.py in the /root/ directory and write your script in this file.

    NOTE: Run the Python code to pull messages by executing: python3 /root/SimpleConsumer.py and do not stop or terminate the code.

    
    # install kafka-python library
    pip install kafka-python
    # create a python script
    vi /root/SimpleConsumer.py
    
    
    # At SimpleConsumer.py
    from kafka import KafkaConsumer
    
    # Set up the Kafka consumer
    consumer = KafkaConsumer(
        'payment-transactions',
        group_id='my-group',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest'
    )
    
    # Poll messages from the topic
    for message in consumer:
        print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")
        print(f"Key: {message.key}, Value: {message.value.decode('utf-8')}")
    
    
    # run the python code to pull messages
    python3 /root/SimpleConsumer.py
    

結語

  1. 不確定是否要用 HTTP API 進行 producer 發佈訊息的動作?如果是的話,可能需要再 study Kafka REST Proxy
  2. 不確定 consumer 這段用 Kafka Connect + MongoDB 方向是否正確?如果是的話,則要再 study Kafka MongoDB Sink Connector,或者可能有 best practice example 可查看
  3. Zilla 這個工具看他放在官網的 before / after 示意圖,看來能夠直接取代 Kafka REST Proxy 跟 Kafka Connect。