在Kafka Spark Streaming Integration中,Kafka作為一個中心集線器來輸入被處理的數據流,然後Spark Streaming使用Spark引擎將結果以儀表盤、HDFS和數據庫的形式發布到另一個Kafka主題或存儲中。
這一過程中涉及的兩種方法是-
- Receiver-based方法
使用Kafka消費者API,接收器被實現來接收數據,然後存儲在Spark執行器中。Spark流同步處理數據,確保數據零丟失。
- 直接的方法
直接方法是沒有接收器的,通過在Kafka的指定分區中查詢每個主題的偏移量來工作,而不是使用接收器。這種方法使並行讀取數據變得更容易,並且通過消除預寫日誌的需要提供零數據丟失。
它在用於Python API的Spark 1.4和用於Java和Scala API的Spark 1.3中被引入。
這裏是Kafka-Spark api -
- SparkConf API
它用於使用SparkConf類設置鍵值對的配置。
- StreamingContext API
它表示到Spark集群的連接,可以用來在集群上創建廣播變量、累加器和rdd。它的簽名是-
public StreamingContext(String master, String appName, Duration)
字符串sparkHome, scala.collection。Seq <字符串>的壇子,
scala.collection。Map < String, String >環境)
其中app name表示你的作業名稱,master是要連接的集群URL, batchDuration是將數據流分成批所需的時間間隔。
- KafkaUtils API
這個API使用下麵提到的createStream簽名將Kafka集群連接到Spark流
公共靜態ReceiverInputDStream < scala。Tuple2 <字符串,字符串> > createStream (
(2)查詢zkQuorum,查詢groupId
scala.collection.immutable。Map
其中,ssc是一個StreamingContext對象,zkQuorum是Zookeeper quorum, groupId是消費者的組id, topics表示要消費的主題映射,storageLevel表示用於存儲接收到的對象的級別。