javascript
通过Spring Integration消费Twitter Streaming API
1.概述
眾所周知, Spring Integration具有用于與外部系統交互的大量連接器。 Twitter也不例外,而且很長一段時間以來,因為Spring Social一直是一個開箱即用的解決方案,Spring Integration利用該解決方案來連接到社交網絡。
1.1Spring社交EOL
不幸的是, Spring Social已經到了使用壽命 ,該項目現在處于維護模式。 Spring團隊決定不進一步開發Spring Social的原因是,使API綁定與社交網絡的API保持同步變得很繁瑣。
除此之外,Spring Framework 5發布后,開發人員希望利用其響應式編程模型,這將要求團隊在現有的響應式社交綁定旁邊重新實現一個響應式Spring Social綁定。
現在建議開發人員實現自己的綁定或使用專用庫之一連接社交網絡。
1.2 Spring Integration的Twitter模塊已移至擴展
Spring Social現在處于維護模式,這迫使Spring Integration團隊將Twitter支持模塊從主項目移至擴展。 由于Spring Social不會接收更新,因此它將基于早期的Spring Framework版本構建。 這將導致類路徑沖突,也將阻礙Spring Integration的開發。
因此, 從Spring Integration 5.1開始,Twitter模塊可作為擴展使用 。
1.3有哪些替代方案?
Twitter4J是Yamas Yusuke開發和維護的Twitter API的非官方Java庫。 官方的HBC庫(由Twitter構建)是一個Java HTTP Client,用于使用Twitter的Streaming API。 自2016年以來,后者從未見過重大更新,而Twitter4J正在定期更新。
也可以選擇實現自己的API綁定。 在使用RestTemplate的基于Spring的項目中,絕對是一個選擇,并且這是進行REST調用的簡便方法。
本指南以流模式使用Twitter4J,可以將其集成到Spring Integration消息流中。
1.4 Twitter流如何工作?
簡而言之, 您的應用打開了一個與Twitter API的單一連接,只要發生新匹配,就會通過該連接發送新結果 。 相反,另一種方法是通過向REST API重復發送請求來批量傳送數據。
流提供了一種低延遲的傳遞機制 ,該機制可以支持非常高的吞吐量,而不必處理速率限制。
2.示例項目
該示例項目展示了Twitter的Streaming API到Spring Integration消息流的集成,可在GitHub上找到 : https : //github.com/springuni/springuni-examples/tree/master/spring-integration/twitter-streaming 。
Maven依賴
由于Spring Social現在是EOL,因此我們不會在此基礎上繼續發展。 我們引入的只是spring-integration-core和twitter4j-stream 。
<dependencies><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId></dependency><dependency><groupId>org.twitter4j</groupId><artifactId>twitter4j-stream</artifactId><version>4.0.1</version></dependency></dependencies>該項目還使用了Lombok和Spring Boot測試支持,但是這些是可選的。
Spring Integration的可聽消息源
Spring Integration提供了對實現入站消息組件的支持。 它們分為輪詢和監聽行為 。
最初依賴于Inbound Twitter Channel Adapter建立在Spring Social之上,現在已移至擴展, 它是輪詢用戶 。 也就是說,您必須提供一個輪詢器配置才能使用它。 另一方面,Twitter實施速率限制,以管理應用程序獲取更新的頻率。 使用舊的Twitter Channel適配器時,您應該考慮速率限制,以便您配置的輪詢間隔符合Twitter策略。
另一方面, 偵聽入站組件更簡單,通常只需要實現MessageProducerSupport 。 這樣的偵聽組件看起來像這樣。
public class MyMessageProducer extends MessageProducerSupport {public MyMessageProducer(MessageChannel outputChannel) {// Defining an output channel is requiredsetOutputChannel(outputChannel);}@Overrideprotected void onInit() {super.onInit();// Custom initialization - if applicable - comes here}@Overridepublic void doStart() {// Lifecycle method for starting receiving messages}@Overridepublic void doStop() {// Lifecycle method for stopping receiving messages}private void receiveMessage() {// Receive data from upstream serviceSomeData data = ...;// Convert it to a message as appropriate and send it outthis.sendMessage(MessageBuilder.withPayload(data).build());}}只有兩個必需的元素:
- 必須定義輸出消息通道
- 每當組件收到消息時,都必須調用sendMessage
(可選)您可能希望控制組件的初始化并管理其生命周期。
由于Twitter的Streaming API本質上是消息驅動的,因此監聽行為自然很合適。 讓我們看看如何在這樣的上下文中合并Twitter4J。
使用Twitter4J連接到Twitter Streaming API
Twitter4J管理連接處理的細微差別,并從Twitter的Streaming API接收更新。 我們需要做的就是獲取一個TwitterStream實例,附加一個偵聽器并定義過濾。
實例化
Twitter4J網站上的流示例表明,應通過TwitterStreamFactory創建一個TwitterStream實例。 這完全有道理,但是在Spring應用程序上下文中,我們希望它成為托管bean。
Spring的FactoryBean工具是包含創建單例TwitterStream實例的詳細信息的簡單FactoryBean方法。
public class TwitterStreamFactory extends AbstractFactoryBean<TwitterStream> {@Overridepublic Class<?> getObjectType() {return TwitterStream.class;}@Overrideprotected TwitterStream createInstance() {return new twitter4j.TwitterStreamFactory().getInstance();}@Overrideprotected void destroyInstance(TwitterStream twitterStream) {twitterStream.shutdown();}}盡管我們也可以將其公開為普通的bean,而不用由FactoryBean創建FactoryBean ,但這不會適當地將其關閉。
附加偵聽器并定義過濾
這將是我們自定義MessageProducer實現的責任。
@Slf4j public class TwitterMessageProducer extends MessageProducerSupport {private final TwitterStream twitterStream;private List<Long> follows;private List<String> terms;private StatusListener statusListener;private FilterQuery filterQuery;public TwitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {this.twitterStream = twitterStream;setOutputChannel(outputChannel);}@Overrideprotected void onInit() {super.onInit();statusListener = new StatusListener();long[] followsArray = null;if (!CollectionUtils.isEmpty(follows)) {followsArray = new long[follows.size()];for (int i = 0; i < follows.size(); i++) {followsArray[i] = follows.get(i);}}String[] termsArray = null;if (!CollectionUtils.isEmpty(terms)) {termsArray = terms.toArray(new String[0]);}filterQuery = new FilterQuery(0, followsArray, termsArray);}@Overridepublic void doStart() {twitterStream.addListener(statusListener);twitterStream.filter(filterQuery);}@Overridepublic void doStop() {twitterStream.cleanUp();twitterStream.clearListeners();}public void setFollows(List<Long> follows) {this.follows = follows;}public void setTerms(List<String> terms) {this.terms = terms;}StatusListener getStatusListener() {return statusListener;}FilterQuery getFilterQuery() {return filterQuery;}class StatusListener extends StatusAdapter {@Overridepublic void onStatus(Status status) {sendMessage(MessageBuilder.withPayload(status).build());}@Overridepublic void onException(Exception ex) {log.error(ex.getMessage(), ex);}@Overridepublic void onStallWarning(StallWarning warning) {log.warn(warning.toString());}} }MessageProducerSupport和TwitterStream的管理界面提供的生命周期方法可以很好地配合使用。 這也將使我們能夠在需要時在運行時停止和啟動組件。
Java配置
盡管Spring可以自動裝配組件,但我還是更喜歡通過手動配置來控制依賴關系。
@Slf4j @Configuration public class TwitterConfig {@BeanTwitterStreamFactory twitterStreamFactory() {return new TwitterStreamFactory();}@BeanTwitterStream twitterStream(TwitterStreamFactory twitterStreamFactory) {return twitterStreamFactory.getInstance();}@BeanMessageChannel outputChannel() {return MessageChannels.direct().get();}@BeanTwitterMessageProducer twitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {TwitterMessageProducer twitterMessageProducer =new TwitterMessageProducer(twitterStream, outputChannel);twitterMessageProducer.setTerms(Arrays.asList("java", "microservices", "spring"));return twitterMessageProducer;}@BeanIntegrationFlow twitterFlow(MessageChannel outputChannel) {return IntegrationFlows.from(outputChannel).transform(Status::getText).handle(m -> log.info(m.getPayload().toString())).get();}}這里的重要部分是我們的自定義消息生成器如何與消息流集成。 基本上,除了在生產者的輸出通道中列出消息之外,我們不需要執行任何其他操作。
測試中
只有Chuck Norris在生產中測試代碼。 但是,像您和我這樣的普通凡人,我們確實會編寫測試用例。
@RunWith(SpringRunner.class) @ContextConfiguration(classes = TestConfig.class) public class TwitterMessageProducerTest {@MockBeanprivate TwitterStream twitterStream;@Autowiredprivate PollableChannel outputChannel;@Autowiredprivate TwitterMessageProducer twitterMessageProducer;@Testpublic void shouldBeInitialized() {StatusListener statusListener = twitterMessageProducer.getStatusListener();verify(twitterStream).addListener(statusListener);FilterQuery filterQuery = twitterMessageProducer.getFilterQuery();verify(twitterStream).filter(filterQuery);}@Testpublic void shouldReceiveStatus() {StatusListener statusListener = twitterMessageProducer.getStatusListener();Status status = mock(Status.class);statusListener.onStatus(status);Message<?> statusMessage = outputChannel.receive();assertSame(status, statusMessage.getPayload());}@Import(TwitterConfig.class)static class TestConfig {@BeanMessageChannel outputChannel() {return MessageChannels.queue(1).get();}}}我喜歡Twitter4J的設計,因為它利用了界面。 該庫的大多數重要部分都作為普通接口公開。 TwitterStream也不例外。 也就是說,在測試用例中可以輕松地將其嘲笑。
六,結論
- Spring Social現在已經停產了 -它不會收到新功能
- Spring Integration的Twitter模塊可作為擴展使用 -已從主項目中移出。
- Twitter入站通道適配器是一個輪詢用戶 –選擇輪詢間隔時必須處理速率限制
- Twitter的Streaming API符合入站通道適配器的監聽行為
翻譯自: https://www.javacodegeeks.com/2018/12/streaming-api-spring-integration.html
總結
以上是生活随笔為你收集整理的通过Spring Integration消费Twitter Streaming API的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑里打不出电脑里打不出来的字怎么办
- 下一篇: 组装一台电脑的流程自己组装电脑流程