Learning Apache Kafka by Doing
These are notes and hands-on records based on KK Learn By Doing: Beginner's Guide to Apache Kafka - Foundation and Development.
The learning goal is to apply Kafka's architecture and essential components to service-logging requirements, though this 2.5-hour course is probably far from enough; people have written 30-article series on Kafka alone.
A distributed event streaming platform, widely used for building real-time data pipelines and applications.
01 introduction to Kafka
Kafka is an open-source, distributed event streaming platform developed under the Apache Software Foundation. It was originally built at LinkedIn and open-sourced in 2011.
Kafka is designed to handle real-time data and helps organizations build reliable, scalable, fault-tolerant data pipelines.
Core components
- Producer: an application that sends records to Kafka topics and chooses which partition of the topic each record goes to
- Consumer: an application that reads records from Kafka topics; consumers subscribe to topics and process messages in real time
- Broker: a server running Kafka. A broker receives messages from producers, stores them on disk, and serves them to consumers. A Kafka cluster consists of multiple brokers to ensure load balancing and fault tolerance
- Topic: a logical channel that producers send records to and consumers read records from. Topics are partitioned for parallelism and scalability
- Partition: each topic is split into partitions, and each partition is an ordered, immutable sequence of records. Partitions let Kafka scale horizontally while preserving record order within each partition
- ZooKeeper or KRaft: handles distributed coordination, configuration management, and leader election for Kafka brokers and topics
Kafka architecture
- Topics & Partitions: a topic is subdivided into partitions, the basic unit of parallelism and scalability. Each partition is an ordered, immutable sequence of records, and every record in a partition is assigned a unique offset. Partitions let Kafka scale by distributing data and load across multiple brokers.
- Producers & Consumers: Kafka supports the pub-sub model; multiple consumers can subscribe to the same topic and process the data independently.
- Brokers & Clusters: Kafka brokers store and serve data. A Kafka cluster consists of multiple brokers to ensure fault tolerance and high availability. Brokers are spread across different machines so that a hardware failure does not cause data loss.
- ZooKeeper/KRaft Coordination: ZooKeeper or KRaft manages configuration and coordination among Kafka brokers, helps with partition leader election, and tracks broker metadata. Since version 2.8, Kafka has been phasing out the ZooKeeper dependency in favor of KRaft mode.
Kafka's role as a distributed streaming platform
- Real-time data ingestion: Kafka can ingest real-time data from many sources (logs, sensors, user interactions) and can collect and store high-volume data with scalability and fault tolerance
- Stream processing: Kafka integrates seamlessly with stream processing frameworks such as Apache Flink, Apache Spark, and Kafka Streams. Together they enable real-time processing and analytics, fraud detection, recommendation engines, and monitoring
- Data integration: Kafka acts as a central hub for data integration, moving data across systems and applications; it supports connectors to various data sources and sinks, which makes building data pipelines easier
- Event sourcing: application state changes are recorded in Kafka as a sequence of events, providing a reliable, auditable way to track changes over time
- Message queueing: Kafka can also serve as a distributed message queue, enabling asynchronous communication between application components; it decouples producers from consumers and improves application scalability and resilience
- Log aggregation: Kafka is commonly used for log aggregation, collecting logs from many services for centralized processing, which helps with monitoring, debugging, and extracting event insights from log data
- Metrics collection and monitoring: Kafka can collect and aggregate metrics from many systems, providing real-time monitoring and alerting that helps keep applications and infrastructure healthy and performant
Kafka ecosystem
- Kafka Connect: streams data between Kafka topics and external systems
- Kafka Streams: a lightweight stream processing library
- Kafka REST Proxy: a RESTful interface to Kafka that lets applications produce and consume messages over HTTP
- Schema Registry: stores the schemas used to serialize and deserialize data, supporting versioning and backward/forward compatibility
- KSQL: simplifies stream processing and real-time analytics on streaming data with an intuitive, familiar query language
Apache Kafka use cases
- Real-time analytics: analyze data in real time to deliver insights and enable proactive decisions
- Event-driven architecture: services communicate through events, improving scalability and reducing coupling
- Microservices: enables asynchronous, reliable data exchange between microservices
- Log aggregation and monitoring: aggregates logs from different services for centralized management and alerting
- Data integration: a hub for integrating data and moving it across systems while ensuring consistency and reliability
Why use Kafka
- High throughput, low latency
  - Performance: processes high-throughput real-time data streams at low latency, thanks to an efficient disk storage mechanism and high-performance networking. Kafka's architecture can handle millions of messages per second, which makes it well suited to applications that need high throughput
- Scalability
  - Horizontal scaling: achieved by adding more brokers to the cluster. Every Kafka topic is partitioned, and those partitions can be spread across multiple brokers, so Kafka can absorb growing load without degrading performance
  - Elasticity: Kafka's partition-based architecture scales dynamically. As load grows, more partitions and brokers can be added without downtime, providing elastic scalability
- Durability and fault tolerance
  - Replication: Kafka replicates data across multiple brokers, ensuring durability and availability. Replication guarantees that data remains accessible even if one or more brokers fail
  - Log-based storage: an append-only log keeps data durable on disk, minimizing the chance of data corruption and enabling efficient recovery
- Flexibility and versatility
  - Diverse use cases: Kafka covers many scenarios, such as real-time analytics, event sourcing, log aggregation, metrics collection, and stream processing
  - Integration ecosystem: integrates seamlessly with Kafka Connect for data integration, Kafka Streams for stream processing, and external processing frameworks such as Apache Flink and Apache Spark
- Guaranteed message ordering
  - Message ordering: Kafka guarantees strict message ordering within a single partition, which is essential for applications that depend on the order of events
  - Delivery semantics: Kafka supports multiple delivery semantics, including at-most-once, at-least-once, and exactly-once delivery, letting developers choose the level of guarantee their requirements demand
- High availability
  - Leader-follower architecture: leader election keeps the system highly available. Each partition has one leader and multiple followers; when the leader goes down, a follower is promoted, so availability is maintained without manual intervention
- Cost efficiency
  - Efficient use of resources (both storage and compute): the log-structured storage mechanism minimizes disk I/O, and Kafka's distributed nature keeps load balanced across the cluster
  - Open source, with none of the licensing costs associated with proprietary messaging systems
  - An active community for support
  - Confluent offers enterprise-grade features
- Stream processing capabilities
  - Kafka Streams: Kafka's native stream processing library
  - KSQL: lets users query streams with a SQL-like language
Comparison with other tools
Kafka vs. RabbitMQ
- Throughput: Kafka offers higher throughput than RabbitMQ, making it better suited to high-volume data streaming
- Scalability: Kafka's partition-based architecture is easier and more efficient to scale than RabbitMQ's queue-based model
- Durability: Kafka's log-based storage and replication provide stronger durability and fault tolerance
Kafka vs. Apache Pulsar
- Architecture: Pulsar offers a layered architecture that separates the serving layer from the storage layer, which is an advantage in some scenarios. Kafka's integrated architecture is simpler and has proven performance
- Maturity & Ecosystem: Kafka has a more mature ecosystem with broader integrations and tooling. Pulsar is newer, and its community support and ecosystem are still catching up
Kafka vs. Amazon Kinesis
- Vendor lock-in: Kafka is open source and can run on-premises or in any cloud, whereas Kinesis carries vendor lock-in risk
- Feature set: Kafka's feature set, such as Kafka Connect and Kafka Streams, offers more flexibility and integration options than Kinesis
01-tasks
- Apache Kafka is primarily designed for real-time data feeds
- Which of the following best describes Kafka’s role in event sourcing? Tracking state changes in applications
- How does Kafka support real-time stream processing? By using Kafka Streams and integrating with processing frameworks like Apache Flink
- What use case does Kafka support by acting as a distributed message queue? Asynchronous communication between application components
- Why is Kafka favored over other providers for high-throughput applications? Kafka can process millions of messages per second with low latency
- How does Kafka achieve fault tolerance in a distributed environment? By replicating data across multiple brokers
- What feature makes Kafka scalable and suitable for elastic workloads? Kafka’s ability to dynamically adjust partitions and brokers without downtime
- Which of the following is NOT a common use case for Kafka? Image processing is NOT a common use case. The other three (real-time analytics, event-driven architecture, and log aggregation and monitoring) are.
- What is one key advantage of Kafka over RabbitMQ? Kafka has better scalability with its partition based architecture
- In comparison to Apache Pulsar, what is a key advantage of Kafka? Kafka’s integrated architecture is simpler and more proven compared to Pulsar which has different layers for storage and serving.
- What is a significant advantage of Kafka over AWS Kinesis? Kafka has built-in stream processing with Kafka Streams and KSQL.
- How does Kafka ensure strict message ordering within a partition? By assigning offsets to each message within a partition
02 components & architecture
This chapter continues with Kafka's ABCs (the basic elements): brokers, producers, consumers, topics, partitions, and the architecture that ties them together, with a focus on replication and fault tolerance.
2-1 Kafka Broker
2-1-1 What is a Kafka broker
A Kafka broker is a server in a Kafka cluster that stores data and serves clients (producers and consumers). Brokers handle all read and write operations on topics, and a Kafka cluster consists of one or more brokers to provide scalability and fault tolerance.
Hmm, Kafka topics feel vaguely reminiscent of RabbitMQ channels.
2-1-2 The broker's role in Kafka
A broker receives messages from producers, assigns offsets to them, and commits them to disk storage. It also serves consumers, responding to fetch requests for specific topics and partitions. In addition, brokers replicate messages to ensure fault tolerance.
2-1-3 Starting a Kafka broker
A broker is usually started with a shell script that takes a configuration file located at <some-path>/kafka/config/xxx.properties; a simplified example follows.
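A minimal sketch of the start command, assuming the install path used later in these notes (/root/kafka):

```bash
# start a broker with its server properties file (install path assumed)
/root/kafka/bin/kafka-server-start.sh /root/kafka/config/server.properties
```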
2-2 Kafka Producers and Consumers
2-2-1 Kafka Producers
Producer: an application that publishes/writes messages to Kafka topics and decides which partition of which topic each record is assigned to, as in the example below.
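A hedged example using the console producer CLI (topic name my-topic is assumed):

```bash
# publish messages interactively to a topic (topic name assumed)
/root/kafka/bin/kafka-console-producer.sh --topic my-topic \
  --bootstrap-server localhost:9092
> hello kafka
```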
2-2-2 Kafka Consumers
Consumer: an application that reads messages from, or subscribes to, Kafka topics; it can read from multiple brokers in parallel and consume messages, as in the example below.
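A hedged example using the console consumer CLI (same assumed topic name):

```bash
# read messages from the beginning of a topic (topic name assumed)
/root/kafka/bin/kafka-console-consumer.sh --topic my-topic \
  --bootstrap-server localhost:9092 --from-beginning
```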
2-3 Kafka Topics and Partitions
2-3-1 Topics
Topic: a category or feed name to which records are published. Kafka topics are always multi-subscriber; in plain terms, a topic can have zero, one, or many consumers subscribed to the data written to it.
2-3-2 Partitions
Partition: a slice of a topic. A topic can have multiple partitions to handle varying amounts of data, and partitions also spread a topic's data across multiple brokers so the topic can be processed in parallel.
Creating a topic with multiple partitions
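A hedged sketch mirroring the create command used in the chapter 4 tasks (topic name and counts assumed):

```bash
# create a topic with 3 partitions and replication factor 1 (values assumed)
/root/kafka/bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 3 \
  --topic my-first-topic
```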
2-4 Kafka Architecture
Kafka's architecture has three priorities: (1) high fault tolerance, (2) scalability, and (3) the ability to handle high-volume data. The key elements are:
2-4-1 Role of Brokers in Architecture
Brokers are the backbone of a Kafka cluster. Each broker can handle terabytes of data and millions of messages without hurting performance. Brokers work together to serve and balance client requests and data.
2-4-2 Leader and follower replicas
For every partition of every topic, one broker acts as the leader and the other brokers act as followers. The leader handles all read and write requests for that partition, while the followers replicate the leader to provide data redundancy and fault tolerance.
Describe a topic to inspect its leader and follower replicas
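A hedged example (the same describe command appears in the task below):

```bash
# show the leader, replicas, and ISR for each partition of a topic
/root/kafka/bin/kafka-topics.sh --describe \
  --topic my-topic \
  --bootstrap-server localhost:9092
```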
2-4-3 Fault tolerance through replication
Kafka replicates the partition log across the configured servers, so even if a server fails, the data can still be recovered from another broker that holds a replica of the failed partition.
By understanding the core Kafka components and architecture above, users can design more robust, scalable, fault-tolerant streaming apps.
2-5 Tasks
-
- What is the primary role of a Kafka broker? To store data and serve clients (producers and consumers)
- How does Kafka ensure fault tolerance? By replicating data across multiple brokers
- What command is used to start a Kafka broker? kafka-server-start.sh
- Which component is responsible for publishing messages to Kafka topics? Producer
- In the context of Kafka, what is a topic? A category or feed name to which records are published
- What does a Kafka consumer do? It subscribes to topics and processes messages
- What is a partition in Kafka? A subset of a topic's data
- Which broker is the leader for the partition of my-topic? Broker 0, as shown by the describe output:

```bash
$ /root/kafka/bin/kafka-topics.sh --describe \
    --topic my-topic \
    --bootstrap-server localhost:9092
Topic: my-topic TopicId: KInvVHdEQQ6jazgiRfjt8Q PartitionCount: 1 ReplicationFactor: 1 Configs:
    Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
```
03 producer & consumers
This section focuses on producers and consumers: their roles, how they interact with Kafka, and hands-on practice producing and consuming messages with the Kafka CLI.
3-1 Kafka Producers
A Kafka producer publishes records/messages to Kafka topics; a topic can be thought of as the category or feed name that records are published to. Producers push data to Kafka brokers, and the brokers make sure the data is stored and replicated for fault tolerance.
3-1-1 How producers work
A producer serializes data (converts it to bytes) and sends it over the network to the Kafka cluster. The cluster stores the data in the appropriate partition of the topic according to the configured partitioning strategy (for example round-robin or key-based partitioning).
Example: producing messages
Use the kafka-console-producer Kafka CLI command, as in the following example:
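A hedged example, assuming the topic name exampleTopic used in the description below:

```bash
# every line typed at the prompt is published to exampleTopic
/root/kafka/bin/kafka-console-producer.sh --topic exampleTopic \
  --bootstrap-server localhost:9092
```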
This command opens a prompt; every line you type after that is published to the topic named exampleTopic on the Kafka cluster running at localhost:9092.
3-1-2 Important Kafka producer configurations
- linger.ms: controls how long the producer waits before sending a batch of messages. A higher value can improve throughput by allowing more messages to be sent at once, but it may also increase latency.
- acks: determines how many acknowledgements the producer requires before considering a message "sent". acks=all means the leader waits for acknowledgements from all replicas; the upside is message durability, the downside is potentially higher latency.
- batch.size: controls the maximum size (in bytes) of a single batch the producer sends. Larger batches improve throughput but require more memory.
3-2 Kafka Consumers
A Kafka consumer reads records from topics. A consumer subscribes to one or more topics and reads records in the order they were produced.
3-2-1 How consumers work
Consumers use a pull model to fetch data from brokers and track which records they have consumed by managing offsets. An offset is essentially a pointer to the last record the consumer has read. Kafka stores these offsets, so a consumer can resume from where it left off even after a crash or restart.
Consumer Groups
Kafka consumers can be part of a consumer group. When multiple consumers belong to the same group, Kafka ensures that each partition is consumed by only one consumer in that group. This mechanism distributes data across multiple consumers, providing scalability and fault tolerance.
Example: consuming messages
Use the kafka-console-consumer Kafka CLI command to read/consume messages from a topic, as in the example below:
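A hedged example (same assumed topic exampleTopic):

```bash
# print all messages in exampleTopic, oldest first
/root/kafka/bin/kafka-console-consumer.sh --topic exampleTopic \
  --bootstrap-server localhost:9092 --from-beginning
```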
This command prints the messages from the exampleTopic topic in the order they were produced, oldest to newest.
3-2-2 Important Kafka consumer configurations
- from-beginning: adding this flag to the consumer command makes the client consume messages from the earliest offset available in the topic. Without it, the consumer reads only from the latest offset onward.
- group.id: defines which consumer group the consumer belongs to. Kafka ensures that within a consumer group only one consumer processes a given partition at a time, spreading the load across the consumers in the group.
- isolation.level: controls whether the consumer reads committed or uncommitted messages when working with transactional topics. Setting it to read_committed ensures the consumer only reads fully committed messages.
3-3 Practical considerations
Serialization/deserialization, partitioning, and offset management all deserve careful thought when implementing producers and consumers; these three aspects are critical to the efficiency and reliability of Kafka-based applications.
3-3-1 Serialization/deserialization
The producer serializes a message into bytes, sends it to Kafka, and the consumer deserializes the bytes back into the original data format. Kafka supports multiple serialization formats, including JSON, Avro, and Protobuf.
3-3-2 Partitioning
Proper partitioning ensures efficient data distribution across the Kafka cluster. A producer can assign a key to each message, and Kafka uses that key to decide which partition of the target topic the message is stored in.
3-3-3 Offset management
Consumers track their progress using offsets, so offsets need to be managed carefully to ensure the consumer application processes every message in the order it was produced, without consuming duplicates.
3-4 Tasks
-
Apache Kafka is a distributed streaming platform that allows for the building of real-time streaming data pipelines and applications. At its core, Kafka manages records in a fault-tolerant and scalable way. This lab focuses on two essential components of Kafka: producers and consumers
-
Kafka producers are responsible for publishing records to Kafka topics, while Kafka consumers read records from topics. Understanding how these components interact with Kafka is crucial for building efficient and reliable streaming applications.
-
Which of the following Kafka producer configurations controls the amount of time a producer waits before sending a batch of messages? linger.ms
-
Which Kafka producer configuration ensures that the producer waits for acknowledgments from all replicas before considering a message as successfully sent? acks=all
-
Which Kafka CLI command is used to produce messages to a topic? kafka-console-producer.sh
-
Use the
kafka-console-producer
command to sendHi, this is my first messsage
to themyFirstTopic
topic1 2 3 4 5 6
cd /root/kafka/bin/ ls cat kafka-console-producer.sh kafka-console-producer.sh --topic myFirstTopic \ --bootstrap-server localhost:9092 > Hi, this is my first messsage
-
Which Kafka CLI command is used to consume messages to a topic? kafka-console-consumer.sh
-
When using Kafka’s CLI consumer, what does the
--from-beginning
flag do? Consumes messages from the start of a topic -
In Kafka CLI consumer, which of the following command allows you to specify the consumer group that the client should join? –group
-
What is the purpose of the
--isolation-level
configuration in the Kafka consumer CLI? To determine whether the consumer reads committed or uncommitted messages -
Use the Kafka console consumer to read messages from the topic
myFirstTopic
and store them in/root/messages
1 2 3 4 5 6
/root/kafka/bin/kafka-console-consumer.sh --topic myFirstTopic \ --bootstrap-server localhost:9092 \ --from-beginning --timeout-ms 1000 Hi, this is my first message a29kZWtsb3VkCg== Processed a total of 2 messages
04 Kafka topics & partitions
This chapter demystifies topics and partitions. To repeat: Kafka is a distributed streaming platform that helps you process real-time data feeds with high throughput and fault tolerance. Whether you are brand new to Kafka or want to understand these two concepts thoroughly, topics and partitions are key to using Kafka effectively. This section covers how topics categorize messages and how partitions enable Kafka's scalability and parallel processing.
4-1 Kafka Topics
Think of a Kafka topic as a category or a folder where messages are stored. Topics are how Kafka producers and consumers communicate. Every message published to a Kafka cluster is assigned to a specific topic, so it can be read/consumed by any consumer group that subscribes to that topic.
4-1-1 Creating a Kafka topic
Use the kafka-topics CLI tool to create a new Kafka topic:
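A hedged sketch that mirrors the create command shown in the tasks later in this chapter:

```bash
/root/kafka/bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 3 \
  --topic my-first-topic
```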
This command creates a topic named my-first-topic with three partitions and a replication factor of 1.
4-1-2 Listing Kafka topics
To see the topics that currently exist in the Kafka cluster, use the following command:
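A hedged example using the same CLI as the tasks:

```bash
/root/kafka/bin/kafka-topics.sh --list \
  --bootstrap-server localhost:9092
```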
4-1-3 Topic characteristics
- Immutability: once a message is written to a topic, it cannot be changed. This is a key design feature of Kafka
- Retention policy: Kafka topics can be configured with a retention policy that determines how long messages are kept before being deleted (based on time or on size)
4-2 Kafka Partitions
Partitions are how Kafka ensures scalability and fault tolerance. Each topic is divided into multiple partitions, and each partition can live on a different Kafka broker in the cluster. This spreads a topic's messages across the cluster, enabling parallel processing and higher throughput.
4-2-1 Why partitions are needed
- Parallelism: partitions allow multiple consumers to read a topic's messages in parallel, significantly increasing the system's throughput
- Scalability: as message volume grows, more partitions can be added to spread the load across more brokers
4-2-2 Creating partitions
You can specify the number of partitions when creating a topic, and you can also increase the partition count of an existing topic:
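A hedged sketch using the alter command (also used in the tasks below):

```bash
/root/kafka/bin/kafka-topics.sh --alter \
  --bootstrap-server localhost:9092 \
  --topic my-first-topic \
  --partitions 6
```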
The command above increases the partition count of the existing my-first-topic to 6.
4-2-3 How partitions work
- Ordering: all messages within a partition are guaranteed to be stored in the order they were written, but ordering is not guaranteed across partitions
- Consumer groups: only one member of a consumer group consumes a given partition at a time, ensuring messages are processed in order
4-3 Hands-on: producing and consuming messages
4-3-1 Sending messages to a topic
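A hedged example with the console producer (topic name from this chapter):

```bash
/root/kafka/bin/kafka-console-producer.sh --topic my-first-topic \
  --bootstrap-server localhost:9092
> hello from the producer
```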
4-3-2 Consuming messages from a topic
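A hedged example with the console consumer:

```bash
/root/kafka/bin/kafka-console-consumer.sh --topic my-first-topic \
  --bootstrap-server localhost:9092 --from-beginning
```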
This command displays all messages from the topic named my-first-topic, starting from the earliest available offset.
4-4 Tasks
-
In this section, we will explore the fundamental concepts of Kafka topics and partitions. Topics serve as categories for messages, while partitions allow for scalability and parallel processing.
-
What is a Kafka topic? A category or a folder where messages are stored
-
Create a Kafka Topic
- Utilize the Kafka CLI tool to create a topic named
my-first-topic
with 3 partitions and a replication factor of 1 - Use the Kafka binary located at
/root/kafka/bin/
to interact with Kafka
1 2 3 4
/root/kafka/bin/kafka-topics.sh \ --create --bootstrap-server localhost:9092 \ --replication-factor 1 --partitions 3 \ --topic my-first-topic
- Utilize the Kafka CLI tool to create a topic named
-
How many topics are in this cluster?
1 2 3 4 5 6 7 8 9 10
/root/kafka/bin/kafka-topics.sh --list \ --bootstrap-server localhost:9092 customer-feedback inventory-updates my-first-topic order-processing-queue user-registration-events /root/kafka/bin/kafka-topics.sh --list \ --bootstrap-server localhost:9092 | wc -l 5
-
Partitions in Kafka play a crucial role in scalability and fault tolerance. By dividing each topic into multiple partitions, Kafka enables parallel processing, which significantly improves throughput and system performance.
-
Why are partitions important in Kafka? They allow for parallelism and scalability.
-
Increase the number of partitions for the topic
my-first-topic
to6
1 2 3 4 5 6 7 8 9
# alter /root/kafka/bin/kafka-topics.sh --alter \ --bootstrap-server localhost:9092 \ --topic my-first-topic \ --partitions 6 # verify root/kafka/bin/kafka-topics.sh --describe \ --bootstrap-server localhost:9092 \ --topic my-first-topic
-
What is the retention policy in Kafka? A policy to determine how long messages are kept before being deleted
-
Change the retention time for messages in the
my-second-topic
to 7 days1 2 3 4 5 6 7 8 9 10
# set config "retention.ms" to 7 days (=604800000 milliseconds) /root/kafka/bin/kafka-configs.sh --alter \ --bootstrap-server localhost:9092 \ --entity-type topics \ --entity-name my-second-topic \ --add-config retention.ms=604800000 # verfy /root/kafka/bin/kafka-configs.sh --describe \ --bootstrap-server localhost:9092 \ --topic my-second-topic
-
Produce messages to a Kafka topic
Use the Kafka-console-producer to send the message
{"user": "Alice", "action": "login", "timestamp": "2024-12-03T10:00:00Z"}
to the topicmy-first-topic
1 2 3 4
/root/kafka/bin/kafka-console-producer.sh \ --broker-list localhost:9092 \ --topic my-first-topic > {"user": "Alice", "action": "login", "timestamp": "2024-09-30T10:00:00Z"}
-
Consume messages from a Kafka topic
Use the Kafka console consumer to read messages from the topic
my-first-topic
and store them in/root/messages
1 2 3
/root/kafka/bin/kafka-console-consumer.sh --topic my-first-topic \ --bootstrap-server localhost:9092 \ --from-beginning --timeout-ms 1000 > /root/messages
-
Delete a Kafka topic
Use the Kafka CLI tool to delete the topic
my-first-topic
1 2 3
/root/kafka/bin/kafka-topics.sh --delete \ --topic my-first-topic \ --bootstrap-server localhost:9092
05 Kafka environment setup
This chapter walks through setting up a Kafka environment from scratch, using Kafka itself plus ZooKeeper for cluster management; it is suitable for beginners.
5-1 Prerequisites
Before setting up Kafka you should have:
- A Linux-based system (the commands here target Linux)
- Basic command-line knowledge
- curl installed, to download Kafka
- tar installed, to extract the Kafka archive
- Java installed, to run Kafka
- Root or sudo privileges, to create service files
5-2 Steps
Step 1: Download Kafka
You can download Apache Kafka from the official site, but for convenience we download it from the command line with curl. The command below fetches version 3.7.1; check the Kafka downloads page if you need a newer version.
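A hedged sketch using the same URL as the task below (target path assumed to be /home/bob):

```bash
curl -L https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz -o /home/bob/kafka.tgz
```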
Step 2: Extract Kafka
Once the download completes, extract it into the directory of your choice. The commands below create a new directory for Kafka in your home directory and extract the archive into it.
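A hedged sketch, assuming the archive was saved as /home/bob/kafka.tgz:

```bash
mkdir -p /home/bob/kafka
tar -xvzf /home/bob/kafka.tgz --strip 1 -C /home/bob/kafka
```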
Step 3: Create a ZooKeeper systemd service
Create a systemd service file for ZooKeeper with the following content.
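An abridged sketch of the unit file, mirroring the task solution below (paths assume Kafka lives in /home/bob/kafka):

```ini
# /etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=root
ExecStart=/home/bob/kafka/bin/zookeeper-server-start.sh /home/bob/kafka/config/zookeeper.properties
ExecStop=/home/bob/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
```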
Step 4: Configure the Kafka server
Like ZooKeeper, Kafka also needs a systemd service so it can be managed automatically. Create the Kafka service file as follows.
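An abridged sketch, mirroring the task solution below (same assumed paths):

```ini
# /etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=root
ExecStart=/bin/sh -c '/home/bob/kafka/bin/kafka-server-start.sh /home/bob/kafka/config/server.properties > /home/bob/kafka/kafka.log 2>&1'
ExecStop=/home/bob/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
```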
Step 5: Start the services
With both service files in place, enable and start ZooKeeper and Kafka.
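A hedged sketch, assuming the service names created above:

```bash
sudo systemctl enable zookeeper && sudo systemctl start zookeeper
sudo systemctl enable kafka && sudo systemctl start kafka
```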
5-3 Verifying the installation
Use the following commands to verify that Kafka and ZooKeeper were installed correctly. If everything is in order, both services should be in the active state.
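A hedged check, assuming the systemd units created above:

```bash
sudo systemctl status zookeeper
sudo systemctl status kafka
```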
5-4 Task
-
Apache Kafka is a powerful distributed streaming platform that enables you to build real-time streaming data pipelines and applications. Setting up a Kafka environment involves installing Kafka itself along with ZooKeeper, which Kafka uses for cluster management.
-
Pre-requisite per above
-
Install Java 17 as Kafka depends on this
1
sudo apt update && sudo apt install -y openjdk-17-jdk
-
What is the primary purpose of ZooKeeper in a Kafka environment? To manage cluster metadata and configurations
-
Download the latest version of Kafka and store it in the
/home/bob/
directory with the filenamekafka.tgz
1
curl -L https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz -o /home/bob/kafka.tgz
-
Extract the Kafka archive to the
/home/bob/kafka/
directory1
mkdir -p /home/bob/kafka && tar -xvzf /home/bob/kafka.tgz --strip 1 -C /home/bob/kafka
-
What is the correct path to the Kafka config file? /home/bob/kafka/config/server.properties
-
What is the correct path to the ZooKeeper config file? /home/bob/kafka/config/zookeeper.properties
-
Create a systemd service file to manage ZooKeeper as a service.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
sudo vi /etc/systemd/system/zookeeper.service # paste below [Unit] Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=root ExecStart=/home/bob/kafka/bin/zookeeper-server-start.sh /home/bob/kafka/config/zookeeper.properties ExecStop=/home/bob/kafka/bin/zookeeper-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target # save and exit # reload the systemd daemon to recognize the new service file sudo systemctl daemon-reload
-
Create a systemd service file to manage Kafka as a service.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
sudo vi /etc/systemd/system/kafka.service # paste [Unit] Requires=zookeeper.service After=zookeeper.service [Service] Type=simple User=root ExecStart=/bin/sh -c '/home/bob/kafka/bin/kafka-server-start.sh /home/bob/kafka/config/server.properties > /home/bob/kafka/kafka.log 2>&1' ExecStop=/home/bob/kafka/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target # save and exit # reload the systemd daemon to apply the changes sudo systemctl daemon-reload
-
Enable and start both the ZooKeeper and Kafka services to ensure they run at boot and are active immediately.
1 2 3 4 5 6 7 8
# enable ZooKeeper to start at boot sudo systemctl enable zookeeper # start the ZooKeeper service sudo systemctl start zookeeper # enable Kafka to start at boot sudo systemctl enable kafka # start the Kafka service sudo systemctl start kafka
-
Install Kafdrop as a UI for managing your Kafka cluster with a graphical interface. Create Kafkaui as a service and ensure it is enabled and started automatically on boot?
Solution
Follow these steps to install Kafdrop and set it up as a systemd service for your Kafka cluster: Step 1: Download Kafdrop
Use the following curl command to download the Kafdrop JAR file to the
/opt
directory:1
sudo curl -L https://github.com/obsidiandynamics/kafdrop/releases/download/4.0.2/kafdrop-4.0.2.jar -o /opt/kafdrop-4.0.2.jar
Step 2: Create a systemd Service for Kafdrop
- Create a systemd service file for Kafdrop. Open a new file using a text editor:
1
sudo vi /etc/systemd/system/kafkaui.service
- Add the following content to the service file:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
[Unit] Description=Web UI for administration of Kafka clusters Requires=kafka.service After=kafka.service [Service] User=root WorkingDirectory=/opt/ ExecStart=/usr/bin/java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -jar kafdrop-4.0.2.jar --kafka.brokerConnect=ubuntu-host:9092 StartLimitInterval=0 RestartSec=10 Restart=always [Install] WantedBy=multi-user.target
- Reload the systemd daemon to recognize the new service file:
1
sudo systemctl daemon-reload
- Enable and Start the Kafdrop Service:
1 2
sudo systemctl enable kafkaui.service sudo systemctl start kafkaui.service
06 hands-on with producers & consumers
6-1 Writing Kafka Producers
Producers are responsible for sending data to Kafka topics for consumers to consume. This chapter uses Java and Python to walk through the details of establishing a connection, sending messages, and handling errors:
6-1-1 Producers in short
Kafka producers are applications that send records to Kafka topics. They handle data serialization (data to byte streams) and decide which partitions of a topic records are assigned to.
A good Kafka producer should be designed to be efficient, scalable, and resilient.
6-1-2 Building a Kafka producer in Java
Create a Gradle project
-
Create a new directory for the project
1 2
mkdir kafka-producer cd kafka-producer
-
Initialize the Gradle project
1
gradle init --type java-application
-
Add Kafka dependencies
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
plugins { id 'application' } repositories { mavenCentral() } dependencies { implementation 'org.apache.kafka:kafka-clients:3.0.0' } application { mainClassName = 'com.example.SimpleProducer' }
4. Establish the connection
Create a Java class for the producer
Create src/main/<path-to-your-producer-package>/SimpleProducer.java with the basic configuration shown below.
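A minimal sketch of SimpleProducer.java with just the connection setup, consistent with the task solution later in this chapter (broker address assumed):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka cluster address (assumed)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // ... send records here, then close the producer
        producer.close();
    }
}
```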
解釋
bootstrap.servers
:要使用的 Kafka cluster 位址key.serializer
&value.serializer
:如何將 producer 要傳送的 key-value 物件轉成 bytes
5. 發送訊息
要發送訊息的話,則要建立 ProducerRecord
內容包含 topic name、key、value,再呼叫 send
方法。
|
|
解釋
-
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
這裡建立了一個 record,包含指定的 topic, key, value
-
producer.send(record);
將 record 發送至 Kafka cluster
6. 錯誤處理
使用 callbacks 以及 try-catch block 處裡錯誤,這裡修改一下 send
方法讓它包含 callback
|
|
除此之外也要用 finally
把 producer 給關閉以釋放資源
|
|
解釋
try-catch-finally
:確保 producer 在成功/有錯誤失敗的情況後,都正確關閉callback
:處理 send 操作的結果,印出成功/失敗錯誤訊息
6-1-3 以 Python 建立 Kafka Producer
建立一個 Python Project
-
create a new directory for the project
1 2
mkdir kafka-producer cd kafka-producer
-
create and activate a virtual environment
1 2
python3 -m venv venv source venv/bin/activate
-
install Kafka-Python
1
pip install kafka-python
建立連線
新增一個 file producer.py
撰寫以下這些程式碼
|
|
解釋
bootstrap.servers
:要使用的 Kafka cluster 位址key.serializer
&value.serializer
:如何將 producer 要傳送的 key-value 物件轉成 bytes
發送訊息
要發送訊息的話,則呼叫 send
方法,包含 topic name, key, value。serializers 確保 key value 都被加密成 bytes
|
|
解釋
-
producer.send(topic_name, key=key, value=value)
將 record/message 發送至 Kafka cluster,內容包含指定 topic, key, value
錯誤處理
使用 try-except
區塊處理錯誤,並調整 send
方法以納入錯誤處理
|
|
解釋
try-except-finally
:確保 producer 不管成功失敗都正確關閉KafkaError
:捕獲 Kafka operation 相關的錯誤future.get(timeout=10)
:等待發送操作完成,並取得 metadata 或拋出錯誤
6-1-4 Task
-
Introduction to Kafka Producers: Kafka producers are applications that send records to Kafka topics. A record is a key-value pair that can also have metadata such as a timestamp. Producers send these records to categories called topics.
-
What is the role of a Kafka producer? An application that sends records to Kafka topics
-
Which property specifies the address of your Kafka cluster in the producer configuration? bootstrap.servers
-
What does the
acks
property in Kafka producer configuration signify? The number of acknowledgements the producer requires from the cluster -
Add below in the build.gradle file
1 2 3 4 5
dependencies { implementation 'org.apache.kafka:kafka-clients:3.3.1' # add this implementation 'ch.qos.logback:logback-classic:1.2.10' testImplementation 'junit:junit:4.13.2' }
-
Which serializer is used for the key in the producer configuration?
org.apache.kafka.common.serialization.IntegerSerializer
org.apache.kafka.common.serialization.ByteArraySerializer
The key and value serializers need to convert the data into bytes.
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.LongSerializer
-
Configure the Kafka producer in
/root/kafka-producer/.../SimpleProducer
. Build the project after configuring SimpleProducer.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
vi src/main/java/com/example/SimpleProducer.java # paste package com.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); String topicName = "example-topic"; String message = "Hello, Kafka!"; producer.send(new ProducerRecord<>(topicName, "key1", message)); try { producer.send(new ProducerRecord<>(topicName, "key1", message)); } catch (Exception e) { System.out.println("Error: "+ e.getMessage()); } finally { producer.close(); } } } # save and exit cd /root/kafka-producer # ls and check that the gradlew is here # run gradle to build the project ./gradlew build
-
What does producer.send() do? It sends record to Kafka cluster.
-
Send the message
Hi, this message is from the Java producer
to thekodekloud
Kafka topic using the Java producer.Build the project after configuring SimpleProducer.java
To run the java code use
java -jar /root/kafka-producer/build/libs/kafka-producer.jar
command.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
# navigate to the project directory cd /root/kafka-producer # create the necessary directories for your Java source files if they do not already exist mkdir -p src/main/java/com/example # use the SimpleProducer created per previous task to modify package com.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); // Send the message to the "kodekloud" Kafka topic String topicName = "kodekloud"; String message = "Hi, this message is from the Java producer"; producer.send(new ProducerRecord<>(topicName, "messageKey", message)); System.out.println("Message sent: " + message); producer.close(); } } # save and exit # run the java code ./gradlew clean build
-
Which command is used to retrieve the message from the
kodekloud
topic on a Kafka broker running at localhost:9092?1 2 3
/root/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic kodekloud \ --from-beginning
-
What is the purpose of the callback mechanism in Kafka producer? Error handling
-
Implement error handling in the Kafka producer using a callback.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
# implemente error handling in the Kafka producer using a callback package com.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.Callback; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) { // Step 1: Set the properties for the Kafka producer Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // Step 2: Create a Kafka producer Producer<String, String> producer = new KafkaProducer<>(props); // Step 3: Define the topic and message to send String topicName = "kodekloud"; String message = "Hi, this message is from the Java producer"; // Step 4: Create a callback to handle success and error scenarios Callback callback = new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { // The message was sent successfully System.out.println("Message sent successfully to topic " + metadata.topic() + " partition " + metadata.partition() + " offset " + metadata.offset()); } else { // There was an error sending the message System.err.println("Error sending message: " + exception.getMessage()); } } }; // Step 5: Send the message with the callback producer.send(new ProducerRecord<>(topicName, "messageKey", message), callback); // Step 6: Close the producer producer.close(); } } # save and exit ./gradlew clean build
-
Implement a Kafka producer using Python. Your implementation should include setting up the producer to connect to a Kafka broker running on
localhost:9092
and define a topic namedpayment-transactions
. Use the kafka-python library to achieve this. Send the following message to the payment-transactions topic:Payment processed successfully
Create a file named
SimpleProducer.py
in the/root/kafka-producer
directory and write your script in this file.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
pip install kafka-python vi /root/kafka-producer/SimpleProducer.py from kafka import KafkaProducer def main(): # Kafka configuration kafka_config = { 'bootstrap_servers': 'localhost:9092', 'key_serializer': str.encode, 'value_serializer': str.encode } # Create Kafka producer producer = KafkaProducer(**kafka_config) # Send the message to the "payment-transactions" Kafka topic topic_name = "payment-transactions" message_key = "transactionKey" message_value = "Payment processed successfully" # Produce the message producer.send(topic_name, key=message_key, value=message_value) # Wait for all messages to be sent producer.flush() if __name__ == "__main__": main()
-
To implement error handling in the Kafka producer using a KafkaError. We can use the below code.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
from kafka import KafkaProducer from kafka.errors import KafkaError def on_send_success(record_metadata): print(f"Message sent to topic: {record_metadata.topic}, partition: {record_metadata.partition}, offset: {record_metadata.offset}") def on_send_error(excp): print(f"Error while producing message: {excp}") def main(): # Kafka configuration kafka_config = { 'bootstrap_servers': 'localhost:9092', 'key_serializer': str.encode, 'value_serializer': str.encode } # Create Kafka producer producer = KafkaProducer(**kafka_config) # Send the message to the "payment-transactions" Kafka topic topic_name = "payment-transactions" message_key = "transactionKey" message_value = "Payment processed successfully" # Produce the message with error handling producer.send(topic_name, key=message_key, value=message_value).add_callback(on_send_success).add_errback(on_send_error) # Wait for all messages to be sent producer.flush() if __name__ == "__main__": main()
6-2 Writing Kafka Consumers
This section explains how to write Kafka consumers with the Java API + Gradle and with Python, focusing on consumer configuration, reading data, and managing offsets. Consumers play a key role in real-time data processing and analytics, since they read the data from Kafka topics.
6-2-1 Kafka Consumers
A Kafka consumer reads data from the Kafka cluster. Consumers subscribe to one or more topics and process the incoming streams of records. In Kafka, the consumer is responsible for tracking which records it has already processed; this tracking is called offset management.
Why use Kafka consumers
- Scalability: consumers can scale horizontally to read topics in parallel
- Fault tolerance: automatic offset commits are supported, so no data is lost if a consumer fails
- Flexibility: consumers can start reading from a specific offset, enabling different processing strategies (for example reprocessing historical data)
6-2-2 Setting up the environment
Before writing a Kafka consumer, set up the development environment: install Kafka, create a Java project with Gradle, and prepare a suitable Python environment.
Installing Kafka
Download and extract Kafka from the official site, then start ZooKeeper and the Kafka server.
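A hedged sketch, assuming Kafka was extracted to the current directory:

```bash
# from the extracted Kafka directory (path assumed)
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
```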
6-2-3 Building a Kafka consumer in Java
Create a directory, then initialize a Gradle project.
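A hedged sketch (project name assumed; uses Gradle's java-application project type):

```bash
mkdir kafka-consumer
cd kafka-consumer
gradle init --type java-application
```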
Add the Kafka client dependency to the build.gradle file.
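An abridged sketch of build.gradle (the kafka-clients version is assumed to match the producer project earlier in this chapter):

```groovy
plugins {
    id 'application'
}
repositories {
    mavenCentral()
}
dependencies {
    // Kafka client library (version assumed)
    implementation 'org.apache.kafka:kafka-clients:3.0.0'
}
```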
Next, write a Kafka consumer that subscribes to topics and prints messages to the console.
Basic consumer configuration
Create src/<path-to-your-consumer-package>/SimpleConsumer.java and initialize it.
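A minimal sketch of the consumer configuration, consistent with the task solution later in this chapter (broker address and group id assumed):

```java
// assumes imports from org.apache.kafka.clients.consumer.* and
// org.apache.kafka.common.serialization.StringDeserializer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // broker address (assumed)
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // consumer group id (assumed)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");       // start from earliest if no committed offset
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```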
Reading messages
Inside a try block, add the logic that polls for new messages and prints them.
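A hedged sketch of the poll-and-print loop (topic name assumed):

```java
// assumes java.time.Duration, java.util.Collections, ConsumerRecord and ConsumerRecords imports
consumer.subscribe(Collections.singletonList("example-topic")); // topic name assumed
try {
    while (true) {
        // poll the broker for new records, waiting up to 1 second
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("key = %s, value = %s, offset = %d%n",
                    record.key(), record.value(), record.offset());
        }
    }
} finally {
    consumer.close(); // release resources when the loop ends
}
```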
That is a simple Java Kafka consumer.
6-2-4 Building a Kafka consumer in Python
Make sure Python is installed, then create a new virtual environment.
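A hedged sketch (directory name assumed):

```bash
mkdir kafka-consumer-py && cd kafka-consumer-py
python3 -m venv venv
source venv/bin/activate
```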
Install the Kafka Python package.
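A hedged sketch using the same library as the tasks:

```bash
pip install kafka-python
```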
Basic consumer configuration
Create a Python script simple_consumer.py with the following basic configuration:
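A minimal sketch consistent with the task solution later in this chapter (topic name and group id assumed):

```python
from kafka import KafkaConsumer

# basic consumer configuration (topic and group id assumed)
consumer = KafkaConsumer(
    'example-topic',
    group_id='my-group',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest'
)
```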
Reading messages
Add the logic that continuously reads messages and prints them.
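A hedged sketch of the read loop, assuming the consumer object created above:

```python
# continuously read messages and print them
for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")
    print(f"Key: {message.key}, Value: {message.value.decode('utf-8')}")
```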
That is a simple Python Kafka consumer.
6-2-5 Managing offsets
Kafka consumers use offsets to track which messages they have already consumed. Offsets are committed automatically by default, but they can also be managed manually for finer-grained control.
Automatic offset commits
上述兩個 consumers 都使用 automatic offset commiting,這個行為是由 enable.auto.commit
的配置所控制(預設是 true
)
手動 offset 提交
如果要 manual offset commiting 的話:
-
Java:將
enable.auto.commit=false
,在 message loop 裡面調用commitSync
方法1 2 3
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Inside the polling loop consumer.commitSync();
-
Python:在 message loop 裡面調用
commit
方法1 2 3
consumer = KafkaConsumer(enable_auto_commit=False) # Inside the message loop consumer.commit()
6-2-6 Task
-
What is the primary role of a Kafka consumer in a Kafka ecosystem? To read and process messages from Kafka topics
-
What is a Kafka Consumer Group used for? To allow multiple consumers to read messages from a topic in parallel without overlapping
-
What is the first step in creating a Kafka consumer in Java? Configuring consumer properties such as bootstrap server and group ID
-
Which of the following is a required configuration property when creating a Kafka consumer in Java? group.id
-
After configuring the properties, which class is used to instantiate a Kafka consumer in Java? KafkaConsumer
-
After instantiating a Kafka consumer, what method is used to subscribe to a Kafka topic? subscribe()
-
Once a Kafka consumer is subscribed to a topic, which method is typically used to pull messages from Kafka? poll()
-
After calling the poll() method, what data structure is returned to the consumer containing the records? ConsumerRecords
-
To commit the offsets of processed messages manually, which method should be used? commitSync()
-
Configure the Kafka Consumer in Java using the
/root/kafka-consumer
project. Create a Java classSimpleConsumer
inkafka-consumer/src/main/java/com/example
and set up the consumer.Build the project after configuring SimpleConsumer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
cd /root/kafka-consumer mkdir -p src/main/java/com/example vi src/main/java/com/example/SimpleConsumer.java package com.example; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Properties; public class SimpleConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Create the Kafka consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); } } # save and exit ./gradlew build
-
After configuring and subscribing the Kafka consumer to the topic order-event, write the Java code snippet that pulls messages from the topic.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
package com.example; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class SimpleConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Create the Kafka consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // Add below codes for task# 11 🌟 // Subscribe to the 'order-events' topic consumer.subscribe(Collections.singletonList("order-events")); // Poll for new messages and print them to the console try { while (true) { // Poll Kafka broker for new records (with a timeout of 1000 milliseconds) ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // Process each record and print to console for (ConsumerRecord<String, String> record : records) { // Print consumed message details to the console System.out.printf("Consumed message: key = %s, value = %s, partition = %d, offset = %d%n", record.key(), record.value(), record.partition(), record.offset()); } } } finally { // Ensure the consumer is closed properly consumer.close(); } } }
-
Which library provides Python support for working with Kafka in a more Kafka-native style, supporting both consumers and producers? kafka-python
-
Write the Python code snippet that pulls messages from the
payment-transactions
topic usingkafka-python
.Create a file named
SimpleConsumer.py
in the/root/
directory and write your script in this file.NOTE: Run the Python code to pull messages by executing:
python3 /root/SimpleConsumer.py
and do not stop or terminate the code.1 2 3 4
# install kafka-python library pip install kafka-python # create a python script vi /root/SimpleConsumer.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# At SimpleConsumer.py from kafka import KafkaConsumer # Set up the Kafka consumer consumer = KafkaConsumer( 'payment-transactions', group_id='my-group', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest' ) # Poll messages from the topic for message in consumer: print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}") print(f"Key: {message.key}, Value: {message.value.decode('utf-8')}")
1 2
# run the python code to pull messages python3 /root/SimpleConsumer.py
Closing thoughts
- Not sure whether producers should publish messages over an HTTP API; if so, I still need to study the Kafka REST Proxy
- Not sure whether Kafka Connect + MongoDB is the right direction for the consumer side; if so, I need to study the Kafka MongoDB Sink Connector, or look for a best-practice example
- Judging from the before/after diagrams on its official site, Zilla looks like it could directly replace both the Kafka REST Proxy and Kafka Connect.