卡夫卡

.實時應用程序(Twitter)

.實時應用程序(Twitter)

之前我們理解了Kafka與Storm和Spark的整合。讓我們探索如何使用它來使用Twitter流API獲得帶有標簽的實時Twitter提要。

首先,您需要創建一個Twitter開發人員帳戶,以獲得Twitter應用程序證書。在您完成創建它之後,您將需要一個TwitterKafka生產者,可以幫助-

  • 閱讀Twitter feed並處理它。
  • 提取標簽。
  • 向Kafka發送標簽。

Twitter流API

您可以使用任何編程語言訪問Twitter Streaming API。這個開源的、基於java的模塊被稱為“twitter4j”,它提供了一個基於監聽器的框架,可以輕鬆訪問推文和Twitter流API。對於這個過程,我們需要創建一個Twitter開發人員帳戶。

創建它之後,下載' twitter4j ' jar文件,並獲得這些OAuth身份驗證詳細信息—

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTokenSecret



這裏是完整的Twitter Kafka生產者編碼-

進口java.util.Arrays;

進口java.util.Properties;

進口java.util.concurrent.LinkedBlockingQueue;

進口twitter4j。*;

進口twitter4j.conf。*;

進口org.apache.kafka.clients.producer.Producer;

進口org.apache.kafka.clients.producer.KafkaProducer;

進口org.apache.kafka.clients.producer.ProducerRecord;

KafkaTwitterProducer

public static void main(String[] args)拋出異常{

LinkedBlockingQueue queue = new LinkedBlockingQueue (1000);

如果(arg遊戲。長度< 5){

System.out.println (

"使用方法:KafkaTwitterProducer < twitter-consumer-key >

< twitter-consumer-secret > < twitter-access-token >

< twitter-access-token-secret >

<主題名稱> < twitter-search-keywords >”);

返回;

字符串consumerKey = args[0].toString();

字符串consumerSecret = args[1].toString();

字符串accessToken = args[2].toString();

字符串accessTokenSecret = args[3].toString();

字符串topicName = args[4].toString();

String[] arguments = args.clone();

字符串[]keyWords =數組。copyOfRange(參數5 arguments.length);

ConfigurationBuilder cb = new ConfigurationBuilder();

cb.setDebugEnabled(真正的)

.setOAuthConsumerKey (consumerKey)

.setOAuthConsumerSecret (consumerSecret)

.setOAuthAccessToken (accessToken)

.setOAuthAccessTokenSecret (accessTokenSecret);

TwitterStream = new TwitterStreamFactory(cb.build()).get-Instance();

StatusListener監聽器=新StatusListener() {

@Override

公共無效onStatus(狀態狀態){

queue.offer(狀態);

/ / system . out。.getScreenName println(“@”+ status.getUser () ()

+ " - " + status.getText());

/ / system . out。.getScreen-Name println(“@”+ status.getUser () ());

/*for(URLEntity urle: status.getURLEntities()) {

System.out.println (urle.getDisplayURL ());

} * /

/*for(HashtagEntity hashtage: status.getHashtagEntities()) {

System.out.println (hashtage.getText ());

} * /

@Override

公共無效onDeletionNotice(StatusDeletionNotice statusdelete - notice) {

/ / system . out。"獲取狀態刪除通知id:"

+ statusDeletionNotice.getStatusId ());

@Override

onTrackLimitationNotice(int numberoflimitedstatus) {

/ / system . out。println("收到曲目限製通知:" +

num-berOfLimitedStatuses);

@Override

onScrubGeo(長userId,長upToStatusId) {

/ / system . out。println(" get scrub_geo事件userId:" + userId + . println(" get scrub_geo事件userId:" + userId +

“upToStatusId:“+ upToStatusId);

@Override

公共無效onStallWarning(StallWarning警告){

/ / system . out。println("Got stall warning:" + warning);

@Override

onException(Exception ex) {

ex.printStackTrace ();

};

twitterStream.addListener(聽眾);

FilterQuery().track(keyWords);

twitterStream.filter(查詢);

thread . sleep (5000);

//增加Kafka生產者配置設置

屬性道具= new Properties();

props.put(“引導。服務器”、“localhost: 9092”);

道具。把(“ack”、“所有”);

道具。put("重試",0);

props.put(“批處理。大小",16384);

props.put(“徘徊。女士",1);

props.put(“緩衝區。記憶”,33554432);

props.put(“key.serializer”,

“org.apache.kafka.common.serializa-tion.StringSerializer”);

props.put(“value.serializer”,

“org.apache.kafka.common.serializa-tion.StringSerializer”);

Producer Producer = new KafkaProducer(props);

Int I = 0;

Int j = 0;

While (i < 10) {

狀態ret = queue.poll();

If (ret == null) {

thread . sleep (100);

我+ +;

其他}{

for(HashtagEntity hashtage: ret.getHashtagEntities()) {

System.out.println ("Hashtag: " + hashtage.getText());

生產商。發送(新ProducerRecord <字符串,字符串> (

top-icName Integer.toString (+ +), hashtage.getText ()));

producer.close ();

thread . sleep (5000);

twitterStream.shutdown ();

在創建Twitter Kafka Producer之後,您可以向Kafka發送tweet。隻需運行Twitter Kafka Producer -右鍵單擊KafkaTwitterProducer.java >運行配置>參數。

然後添加Twitter oAuth令牌和Kafka主題如下所示-

“java - cp /路徑/ /卡夫卡/ libs / *”:“/道路/ / twitter4j / lib / *”:

.KafkaTwitterProducer < twitter-consumer-key >

< twitter-consumer-secret >

< twitter-access-token >

< twitter-ac-cess-token-secret >

my-first-topic遊戲

輸出將取決於當時的Twitter提要和關鍵字。示例輸出顯示在這裏-

遊戲:1

遊戲:2

gameinsights: 1

DevOps概論
3小時
初學者
19.6 k +人注冊
4.56 (687)
介紹AngularJS
2小時
初學者
11.9 k +人注冊
4.55 (399)
JavaScript概論
3小時
初學者
34.9 k +人注冊
4.38 (1374)
中級Java數據結構與算法
4小時
中間
13.3 k +人注冊
4.49 (649)
使用Java製作遊戲
2小時
初學者
14.5 k +人注冊
4.28 (53)
算法用C
3小時
初學者
14.9 k +人注冊
4.39 (290)
Angular7初學者
3小時
初學者
9.8 k +人注冊
4.48 (315)
中級水平的Angular7
3小時
中間
4.7 k +人注冊
4.57 (95)
介紹Kubernetes
2小時
初學者
3.9 k +人注冊
4.13 (38)
用於高級水平的Angular7
3小時
先進的
5.1 k +人注冊
4.46 (101)
Baidu
map