之前我們理解了Kafka與Storm和Spark的整合。讓我們探索如何使用它來使用Twitter流API獲得帶有標簽的實時Twitter提要。
首先,您需要創建一個Twitter開發人員帳戶,以獲得Twitter應用程序證書。在您完成創建它之後,您將需要一個TwitterKafka生產者,可以幫助-
- 閱讀Twitter feed並處理它。
- 提取標簽。
- 向Kafka發送標簽。
Twitter流API
您可以使用任何編程語言訪問Twitter Streaming API。這個開源的、基於java的模塊被稱為“twitter4j”,它提供了一個基於監聽器的框架,可以輕鬆訪問推文和Twitter流API。對於這個過程,我們需要創建一個Twitter開發人員帳戶。
創建它之後,下載' twitter4j ' jar文件,並獲得這些OAuth身份驗證詳細信息—
- Customerkey
- CustomerSecret
- AccessToken
- AccessTokenSecret
這裏是完整的Twitter Kafka生產者編碼-
進口java.util.Arrays;
進口java.util.Properties;
進口java.util.concurrent.LinkedBlockingQueue;
進口twitter4j。*;
進口twitter4j.conf。*;
進口org.apache.kafka.clients.producer.Producer;
進口org.apache.kafka.clients.producer.KafkaProducer;
進口org.apache.kafka.clients.producer.ProducerRecord;
KafkaTwitterProducer
public static void main(String[] args)拋出異常{
LinkedBlockingQueue
如果(arg遊戲。長度< 5){
System.out.println (
"使用方法:KafkaTwitterProducer < twitter-consumer-key >
< twitter-consumer-secret > < twitter-access-token >
< twitter-access-token-secret >
<主題名稱> < twitter-search-keywords >”);
返回;
}
字符串consumerKey = args[0].toString();
字符串consumerSecret = args[1].toString();
字符串accessToken = args[2].toString();
字符串accessTokenSecret = args[3].toString();
字符串topicName = args[4].toString();
String[] arguments = args.clone();
字符串[]keyWords =數組。copyOfRange(參數5 arguments.length);
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(真正的)
.setOAuthConsumerKey (consumerKey)
.setOAuthConsumerSecret (consumerSecret)
.setOAuthAccessToken (accessToken)
.setOAuthAccessTokenSecret (accessTokenSecret);
TwitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
StatusListener監聽器=新StatusListener() {
@Override
公共無效onStatus(狀態狀態){
queue.offer(狀態);
/ / system . out。.getScreenName println(“@”+ status.getUser () ()
+ " - " + status.getText());
/ / system . out。.getScreen-Name println(“@”+ status.getUser () ());
/*for(URLEntity urle: status.getURLEntities()) {
System.out.println (urle.getDisplayURL ());
} * /
/*for(HashtagEntity hashtage: status.getHashtagEntities()) {
System.out.println (hashtage.getText ());
} * /
}
@Override
公共無效onDeletionNotice(StatusDeletionNotice statusdelete - notice) {
/ / system . out。"獲取狀態刪除通知id:"
+ statusDeletionNotice.getStatusId ());
}
@Override
onTrackLimitationNotice(int numberoflimitedstatus) {
/ / system . out。println("收到曲目限製通知:" +
num-berOfLimitedStatuses);
}
@Override
onScrubGeo(長userId,長upToStatusId) {
/ / system . out。println(" get scrub_geo事件userId:" + userId + . println(" get scrub_geo事件userId:" + userId +
“upToStatusId:“+ upToStatusId);
}
@Override
公共無效onStallWarning(StallWarning警告){
/ / system . out。println("Got stall warning:" + warning);
}
@Override
onException(Exception ex) {
ex.printStackTrace ();
}
};
twitterStream.addListener(聽眾);
FilterQuery().track(keyWords);
twitterStream.filter(查詢);
thread . sleep (5000);
//增加Kafka生產者配置設置
屬性道具= new Properties();
props.put(“引導。服務器”、“localhost: 9092”);
道具。把(“ack”、“所有”);
道具。put("重試",0);
props.put(“批處理。大小",16384);
props.put(“徘徊。女士",1);
props.put(“緩衝區。記憶”,33554432);
props.put(“key.serializer”,
“org.apache.kafka.common.serializa-tion.StringSerializer”);
props.put(“value.serializer”,
“org.apache.kafka.common.serializa-tion.StringSerializer”);
Producer
Int I = 0;
Int j = 0;
While (i < 10) {
狀態ret = queue.poll();
If (ret == null) {
thread . sleep (100);
我+ +;
其他}{
for(HashtagEntity hashtage: ret.getHashtagEntities()) {
System.out.println ("Hashtag: " + hashtage.getText());
生產商。發送(新ProducerRecord <字符串,字符串> (
top-icName Integer.toString (+ +), hashtage.getText ()));
}
}
}
producer.close ();
thread . sleep (5000);
twitterStream.shutdown ();
}
}
在創建Twitter Kafka Producer之後,您可以向Kafka發送tweet。隻需運行Twitter Kafka Producer -右鍵單擊KafkaTwitterProducer.java >運行配置>參數。
然後添加Twitter oAuth令牌和Kafka主題如下所示-
“java - cp /路徑/ /卡夫卡/ libs / *”:“/道路/ / twitter4j / lib / *”:
.KafkaTwitterProducer < twitter-consumer-key >
< twitter-consumer-secret >
< twitter-access-token >
< twitter-ac-cess-token-secret >
my-first-topic遊戲
輸出將取決於當時的Twitter提要和關鍵字。示例輸出顯示在這裏-
。
遊戲:1
遊戲:2
gameinsights: 1
。