卡夫卡

如何與Spark集成?

如何與Spark集成?

在Kafka Spark Streaming Integration中,Kafka作為一個中心集線器來輸入被處理的數據流,然後Spark Streaming使用Spark引擎將結果以儀表盤、HDFS和數據庫的形式發布到另一個Kafka主題或存儲中。

IMG_256

這一過程中涉及的兩種方法是-

  • Receiver-based方法

使用Kafka消費者API,接收器被實現來接收數據,然後存儲在Spark執行器中。Spark流同步處理數據,確保數據零丟失。



  • 直接的方法

直接方法是沒有接收器的,通過在Kafka的指定分區中查詢每個主題的偏移量來工作,而不是使用接收器。這種方法使並行讀取數據變得更容易,並且通過消除預寫日誌的需要提供零數據丟失。

它在用於Python API的Spark 1.4和用於Java和Scala API的Spark 1.3中被引入。

這裏是Kafka-Spark api -

  • SparkConf API

它用於使用SparkConf類設置鍵值對的配置。

  • StreamingContext API

它表示到Spark集群的連接,可以用來在集群上創建廣播變量、累加器和rdd。它的簽名是-

public StreamingContext(String master, String appName, Duration)

字符串sparkHome, scala.collection。Seq <字符串>的壇子,

scala.collection。Map < String, String >環境)

其中app name表示你的作業名稱,master是要連接的集群URL, batchDuration是將數據流分成批所需的時間間隔。

  • KafkaUtils API

這個API使用下麵提到的createStream簽名將Kafka集群連接到Spark流

公共靜態ReceiverInputDStream < scala。Tuple2 <字符串,字符串> > createStream (

(2)查詢zkQuorum,查詢groupId

scala.collection.immutable。Map主題,StorageLevel StorageLevel)

其中,ssc是一個StreamingContext對象,zkQuorum是Zookeeper quorum, groupId是消費者的組id, topics表示要消費的主題映射,storageLevel表示用於存儲接收到的對象的級別。

Python入門基礎
4小時
初學者
3.1 l +人注冊
4.56 (10987)
前端開發- HTML
2小時
初學者
1.8 l +人注冊
4.5 (11324)
前端開發- CSS
2小時
初學者
66.8 k +人注冊
4.48 (3889)
區塊鏈基礎知識
3小時
初學者
36.7 k +人注冊
4.54 (1275)
C語言中的數據結構
2小時
初學者
82 k +人注冊
4.38 (2984)
Excel初學者
2小時
初學者
4.9 l +人注冊
4.48 (23068)
我的SQL基礎知識
2小時
初學者
1.5 l +人注冊
4.41 (6730)
Android應用程序開發
2小時
初學者
86.2 k +人注冊
4.38 (2623)
哦在Java中
2小時
初學者
48.8 k +人注冊
4.39 (2072)
使用JavaScript製作遊戲
2小時
初學者
15.8 k +人注冊
4.34 (180)
Baidu
map