javascript
streaming api_通过Spring Integration消费Twitter Streaming API
streaming api
1.概述
眾所周知, Spring Integration具有用于與外部系統(tǒng)交互的大量連接器。 Twitter也不例外,而且很長(zhǎng)一段時(shí)間以來,因?yàn)镾pring Social一直是一個(gè)開箱即用的解決方案,Spring Integration利用該解決方案來連接到社交網(wǎng)絡(luò)。
1.1Spring社交停產(chǎn)
不幸的是, Spring Social已經(jīng)到了使用壽命 ,該項(xiàng)目現(xiàn)在處于維護(hù)模式。 Spring團(tuán)隊(duì)決定不進(jìn)一步開發(fā)Spring Social的原因是,使API綁定與社交網(wǎng)絡(luò)的API保持同步變得很繁瑣。
除此之外,Spring Framework 5發(fā)布后,開發(fā)人員希望利用其響應(yīng)式編程模型,這將要求團(tuán)隊(duì)在現(xiàn)有的響應(yīng)式社交綁定旁邊重新實(shí)現(xiàn)一個(gè)響應(yīng)式Spring Social綁定。
現(xiàn)在建議開發(fā)人員要么實(shí)現(xiàn)自己的綁定,要么使用專用庫(kù)之一連接社交網(wǎng)絡(luò)。
1.2 Spring Integration的Twitter模塊已移至擴(kuò)展
Spring Social現(xiàn)在處于維護(hù)模式,這迫使Spring Integration團(tuán)隊(duì)將Twitter支持模塊從主項(xiàng)目移至擴(kuò)展。 由于Spring Social不會(huì)接收更新,因此它將基于較早的Spring Framework版本構(gòu)建。 這將導(dǎo)致類路徑?jīng)_突,也將阻礙Spring Integration的開發(fā)。
因此, 從Spring Integration 5.1開始,Twitter模塊可作為擴(kuò)展使用 。
1.3有哪些替代方案?
Twitter4J是Yamas Yusuke開發(fā)和維護(hù)的,用于Twitter API的非官方Java庫(kù)。 官方的HBC庫(kù)(由Twitter構(gòu)建)是一個(gè)Java HTTP Client,用于使用Twitter的Streaming API。 自2016年以來,后者從未見過重大更新,而Twitter4J正在定期更新。
也可以選擇實(shí)現(xiàn)自己的API綁定。 在使用RestTemplate的基于Spring的項(xiàng)目中,絕對(duì)是一種選擇,并且這是進(jìn)行REST調(diào)用的簡(jiǎn)便方法。
本指南以流模式使用Twitter4J,可以將其集成到Spring Integration消息流中。
1.4 Twitter流如何工作?
簡(jiǎn)而言之, 您的應(yīng)用打開了一個(gè)與Twitter API的單一連接,只要發(fā)生新匹配,就會(huì)通過該連接發(fā)送新結(jié)果 。 相反,另一種方法是通過向REST API重復(fù)發(fā)送請(qǐng)求來批量傳送數(shù)據(jù)。
流提供了一種低延遲的傳遞機(jī)制 ,該機(jī)制可以支持非常高的吞吐量,而不必處理速率限制。
2.示例項(xiàng)目
該示例項(xiàng)目展示了Twitter的Streaming API到Spring Integration消息流的集成,可在GitHub上找到 : https : //github.com/springuni/springuni-examples/tree/master/spring-integration/twitter-streaming 。
Maven依賴
由于Spring Social現(xiàn)在已經(jīng)停產(chǎn),因此我們不會(huì)在此基礎(chǔ)上繼續(xù)發(fā)展。 我們引入的只是spring-integration-core和twitter4j-stream 。
<dependencies><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId></dependency><dependency><groupId>org.twitter4j</groupId><artifactId>twitter4j-stream</artifactId><version>4.0.1</version></dependency></dependencies>該項(xiàng)目還使用了Lombok和Spring Boot測(cè)試支持,但是這些是可選的。
Spring Integration的可聽消息源
Spring Integration提供了對(duì)實(shí)現(xiàn)入站消息組件的支持。 它們分為輪詢和監(jiān)聽行為 。
最初依賴于Inbound Twitter Channel Adapter建立在Spring Social之上,現(xiàn)在已移至擴(kuò)展, 它是輪詢用戶 。 也就是說,您必須提供一個(gè)輪詢器配置才能使用它。 另一方面,Twitter實(shí)施速率限制,以管理應(yīng)用程序獲取更新的頻率。 使用舊的Twitter Channel適配器時(shí),您應(yīng)該考慮速率限制,以便您配置的輪詢間隔符合Twitter策略。
另一方面, 偵聽入站組件更簡(jiǎn)單,通常只需要實(shí)現(xiàn)MessageProducerSupport 。 這樣的偵聽組件看起來像這樣。
public class MyMessageProducer extends MessageProducerSupport {public MyMessageProducer(MessageChannel outputChannel) {// Defining an output channel is requiredsetOutputChannel(outputChannel);}@Overrideprotected void onInit() {super.onInit();// Custom initialization - if applicable - comes here}@Overridepublic void doStart() {// Lifecycle method for starting receiving messages}@Overridepublic void doStop() {// Lifecycle method for stopping receiving messages}private void receiveMessage() {// Receive data from upstream serviceSomeData data = ...;// Convert it to a message as appropriate and send it outthis.sendMessage(MessageBuilder.withPayload(data).build());}}只有兩個(gè)必需的元素:
- 必須定義輸出消息通道
- 每當(dāng)組件收到消息時(shí),都必須調(diào)用sendMessage
(可選)您可能希望控制組件的初始化并管理其生命周期。
由于Twitter的Streaming API本質(zhì)上是消息驅(qū)動(dòng)的,因此監(jiān)聽行為自然很合適。 讓我們看看如何在這樣的上下文中合并Twitter4J。
使用Twitter4J連接到Twitter Streaming API
Twitter4J管理連接處理的細(xì)微差別,并從Twitter的Streaming API接收更新。 我們需要做的就是獲取一個(gè)TwitterStream實(shí)例,附加一個(gè)偵聽器并定義過濾。
實(shí)例化
Twitter4J網(wǎng)站上的流示例表明,應(yīng)通過TwitterStreamFactory創(chuàng)建一個(gè)TwitterStream實(shí)例。 這完全有道理,但是在Spring應(yīng)用程序上下文中,我們希望它成為托管bean。
Spring的FactoryBean工具是包含創(chuàng)建單例TwitterStream實(shí)例的詳細(xì)信息的簡(jiǎn)單FactoryBean方法。
public class TwitterStreamFactory extends AbstractFactoryBean<TwitterStream> {@Overridepublic Class<?> getObjectType() {return TwitterStream.class;}@Overrideprotected TwitterStream createInstance() {return new twitter4j.TwitterStreamFactory().getInstance();}@Overrideprotected void destroyInstance(TwitterStream twitterStream) {twitterStream.shutdown();}}盡管我們也可以將其公開為普通的bean,而不用由FactoryBean創(chuàng)建FactoryBean ,但這并不會(huì)適當(dāng)?shù)貙⑵潢P(guān)閉。
附加偵聽器并定義過濾
這將是我們自定義MessageProducer實(shí)現(xiàn)的責(zé)任。
@Slf4j public class TwitterMessageProducer extends MessageProducerSupport {private final TwitterStream twitterStream;private List<Long> follows;private List<String> terms;private StatusListener statusListener;private FilterQuery filterQuery;public TwitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {this.twitterStream = twitterStream;setOutputChannel(outputChannel);}@Overrideprotected void onInit() {super.onInit();statusListener = new StatusListener();long[] followsArray = null;if (!CollectionUtils.isEmpty(follows)) {followsArray = new long[follows.size()];for (int i = 0; i < follows.size(); i++) {followsArray[i] = follows.get(i);}}String[] termsArray = null;if (!CollectionUtils.isEmpty(terms)) {termsArray = terms.toArray(new String[0]);}filterQuery = new FilterQuery(0, followsArray, termsArray);}@Overridepublic void doStart() {twitterStream.addListener(statusListener);twitterStream.filter(filterQuery);}@Overridepublic void doStop() {twitterStream.cleanUp();twitterStream.clearListeners();}public void setFollows(List<Long> follows) {this.follows = follows;}public void setTerms(List<String> terms) {this.terms = terms;}StatusListener getStatusListener() {return statusListener;}FilterQuery getFilterQuery() {return filterQuery;}class StatusListener extends StatusAdapter {@Overridepublic void onStatus(Status status) {sendMessage(MessageBuilder.withPayload(status).build());}@Overridepublic void onException(Exception ex) {log.error(ex.getMessage(), ex);}@Overridepublic void onStallWarning(StallWarning warning) {log.warn(warning.toString());}} }MessageProducerSupport和TwitterStream的管理界面提供的生命周期方法可以很好地配合使用。 這也將使我們能夠在需要時(shí)在運(yùn)行時(shí)停止和啟動(dòng)組件。
Java配置
盡管Spring可以自動(dòng)裝配組件,但我仍然更喜歡通過手動(dòng)配置來控制依賴關(guān)系。
@Slf4j @Configuration public class TwitterConfig {@BeanTwitterStreamFactory twitterStreamFactory() {return new TwitterStreamFactory();}@BeanTwitterStream twitterStream(TwitterStreamFactory twitterStreamFactory) {return twitterStreamFactory.getInstance();}@BeanMessageChannel outputChannel() {return MessageChannels.direct().get();}@BeanTwitterMessageProducer twitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {TwitterMessageProducer twitterMessageProducer =new TwitterMessageProducer(twitterStream, outputChannel);twitterMessageProducer.setTerms(Arrays.asList("java", "microservices", "spring"));return twitterMessageProducer;}@BeanIntegrationFlow twitterFlow(MessageChannel outputChannel) {return IntegrationFlows.from(outputChannel).transform(Status::getText).handle(m -> log.info(m.getPayload().toString())).get();}}這里的重要部分是我們的自定義消息生成器如何與消息流集成。 基本上,除了在生產(chǎn)者的輸出通道中列出消息之外,我們不需要執(zhí)行任何其他操作。
測(cè)試中
只有Chuck Norris在生產(chǎn)中測(cè)試代碼。 但是,像您和我這樣的普通凡人,我們確實(shí)會(huì)編寫測(cè)試用例。
@RunWith(SpringRunner.class) @ContextConfiguration(classes = TestConfig.class) public class TwitterMessageProducerTest {@MockBeanprivate TwitterStream twitterStream;@Autowiredprivate PollableChannel outputChannel;@Autowiredprivate TwitterMessageProducer twitterMessageProducer;@Testpublic void shouldBeInitialized() {StatusListener statusListener = twitterMessageProducer.getStatusListener();verify(twitterStream).addListener(statusListener);FilterQuery filterQuery = twitterMessageProducer.getFilterQuery();verify(twitterStream).filter(filterQuery);}@Testpublic void shouldReceiveStatus() {StatusListener statusListener = twitterMessageProducer.getStatusListener();Status status = mock(Status.class);statusListener.onStatus(status);Message<?> statusMessage = outputChannel.receive();assertSame(status, statusMessage.getPayload());}@Import(TwitterConfig.class)static class TestConfig {@BeanMessageChannel outputChannel() {return MessageChannels.queue(1).get();}}}我喜歡Twitter4J的設(shè)計(jì),因?yàn)樗昧私缑妗?/strong> 該庫(kù)的大多數(shù)重要部分都作為普通接口公開。 TwitterStream也不例外。 也就是說,在測(cè)試案例中可以輕松地將其嘲笑。
六,結(jié)論
- Spring Social現(xiàn)在已經(jīng)停產(chǎn)了 -它不會(huì)收到新功能
- Spring Integration的Twitter模塊可作為擴(kuò)展使用 -已從主項(xiàng)目中移出。
- Twitter入站通道適配器是一個(gè)輪詢用戶 -選擇輪詢間隔時(shí)必須處理速率限制
- Twitter的Streaming API符合入站通道適配器的監(jiān)聽行為
翻譯自: https://www.javacodegeeks.com/2018/12/streaming-api-spring-integration.html
streaming api
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的streaming api_通过Spring Integration消费Twitter Streaming API的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑版吉他(电脑电吉他软件)
- 下一篇: aws技术峰会2018_AWS re:I