apache camel_学习Apache Camel –实时索引推文
apache camel
在大多數軟件開發項目中,有一點需要使應用程序開始與其他應用程序或第三方組件通信。
無論是發送電子郵件通知,調用外部api,寫入文件還是將數據從一個地方遷移到另一個地方,您都可以推出自己的解決方案或利用現有框架。
對于Java生態系統中的現有框架,我們可以發現Tibco BusinessWorks和Mule ESB ,另一方面是Spring Integration和Apache Camel 。
在本教程中,我將通過一個示例應用程序向您介紹Apache Camel ,該示例應用程序從Twitter的示例提要中讀取推文,并使用Elastic Search實時對這些推文進行索引。
什么是Apache Camel?
將應用程序與生態系統中的內部或外部組件集成是軟件開發中最復雜的任務之一,如果操作不正確,則可能導致巨大的混亂,并導致長期維護的真正痛苦。
幸運的是,Camel是Apache托管的開源集成框架,它基于企業集成模式 ,這些模式可以幫助編寫更易讀和可維護的代碼。 與Lego相似,這些模式可以用作構建可靠軟件設計的基礎。
Apache Camel還支持各種各樣的連接器,以將您的應用程序與不同的框架和技術集成在一起。 順便說一下,它也可以與Spring很好地配合使用。
如果您不熟悉Spring,那么您可能會發現這篇文章很有幫助: 使用Spring Boot處理Twitter feed 。
在以下各節中,我們將介紹一個示例應用程序,其中Camel與Twitter示例提要和ElasticSearch集成在一起。
什么是ElasticSearch?
類似于Apache Solr的 ElasticSearch是基于Apache Lucene的高度可擴展的基于Java的開源全文搜索引擎。
在此示例應用程序中,我們將使用ElasticSearch實時索引推文,并在這些推文上提供全文本搜索功能。
其他使用過的技術
除了Apache Camel和ElasticSearch,我還在此應用程序中包括其他框架: Gradle作為構建工具, Spring Boot作為Web應用程序框架,以及Twitter4j,用于從Twitter示例提要中讀取推文。
入門
該項目的框架是在http://start.spring.io生成的,在該項目中,我檢查了Web依賴項選項,填寫了“項目元數據”部分,然后選擇“ Gradle Project”作為項目類型。
生成項目后,您可以下載并將其導入您喜歡的IDE。 我現在不打算在Gradle上詳細介紹,但是這是build.gradle文件中所有依賴項的列表:
def camelVersion = '2.15.2' dependencies {compile("org.springframework.boot:spring-boot-starter-web")compile("org.apache.camel:camel-core:${camelVersion}")compile("org.apache.camel:camel-spring-boot:${camelVersion}")compile("org.apache.camel:camel-twitter:${camelVersion}")compile("org.apache.camel:camel-elasticsearch:${camelVersion}")compile("org.apache.camel:camel-jackson:${camelVersion}")compile("joda-time:joda-time:2.8.2")testCompile("org.springframework.boot:spring-boot-starter-test") }使用駱駝路線進行整合
駱駝實現了面向消息的體系結構,它的主要構建模塊是描述消息流的路由 。
可以用XML(舊方式)或Java DSL(新方式)描述路由。 我們將僅在本文中討論Java DSL,因為這是首選且更優雅的選擇。
好吧,讓我們看一個簡單的Route:
from("file://orders").convertBodyTo(String.class).to("log:com.mycompany.order?level=DEBUG").to("jms:topic:OrdersTopic");這里有幾件事要注意:
- 消息在由URI表示并使用URI配置的端點之間流動
- 路由只能有一個消息生產者端點(在本例中為“ file:// orders”,它從orders文件夾中讀取文件)和多個消息消費者端點:
- “ log:com.mycompany.order?level = DEBUG”,它將文件內容記錄在com.mycompany.order日志記錄類別下的調試消息中,
- 在端點之間,可以更改消息,即:convertBodyTo(String.class)將消息正文轉換為String。
另請注意,相同的URI可以在一個路由中用于使用者端點,而在另一路由中用于生產者端點:
from("file://orders").convertBodyTo(String.class).to("direct:orders");from("direct:orders).to("log:com.mycompany.order?level=DEBUG").to("jms:topic:OrdersTopic");Direct端點是通用端點之一,它允許將消息從一條路由同步傳遞到另一條路由。
這有助于創建可讀代碼并在代碼的多個位置重用路由。
索引推文
現在,讓我們看一下代碼中的一些路由。 讓我們從簡單的事情開始:
private String ES_TWEET_INDEXER_ENDPOINT = "direct:tweet-indexer-ES";...from("twitter://streaming/sample?type=EVENT&consumerKey={{twitter4j.oauth.consumerKey}}&consumerSecret={{twitter4j.oauth.consumerSecret}}?cessToken={{twitter4j.oauth.accessToken}}?cessTokenSecret={{twitter4j.oauth.accessTokenSecret}}").to(ES_TWEET_INDEXER_ENDPOINT);這是如此簡單,對吧? 到現在為止,您可能已經知道,該路由會從Twitter示例提要中讀取推文,并將它們傳遞到“ direct:tweet-indexer-ES”端點。 請注意,consumerKey,consumerSecret等已配置并作為系統屬性傳遞(請參見http://twitter4j.org/en/configuration.html )。
現在,讓我們看一下一個稍微復雜的Route,它從“ direct:tweet-indexer-ES”端點讀取,并將Tweets批量插入到Elasticsearch中(有關每個步驟的詳細說明,請參見注釋):
@Value("${elasticsearch.tweet.uri}")private String elasticsearchTweetUri;...from(ES_TWEET_INDEXER_ENDPOINT)// groups tweets into separate indexes on a weekly basis to make it easier clean up old tweets:.process(new WeeklyIndexNameHeaderUpdater(ES_TWEET_INDEX_TYPE))// converts Twitter4j Tweet object into an elasticsearch document represented by a Map:.process(new ElasticSearchTweetConverter())// collects tweets into weekly batches based on index name:.aggregate(header("indexName"), new ListAggregationStrategy())// creates new batches every 2 seconds.completionInterval(2000)// makes sure the last batch will be processed before the application shuts down:.forceCompletionOnStop()// inserts a batch of tweets to elasticsearch: .to(elasticsearchTweetUri).log("Uploaded documents to ElasticSearch index ${headers.indexName}: ${body.size()}");關于此路線的注意事項:
- elasticsearchTweetUri是一個字段,其值由Spring從application.properties文件(elasticsearch.tweet.uri = elasticsearch:// tweet-indexer?operation = BULK_INDEX&ip = 127.0.0.1&port = 9300)中獲取并注入到該字段中
- 為了在Route中實現自定義處理邏輯,我們可以創建實現Processor接口的類。 參見WeeklyIndexNameHeaderUpdater和ElasticSearchTweetConverter
- 使用自定義ListAggregationStrategy策略聚合推文,該策略將消息聚合到ArrayList中,稍后每2秒(或在應用程序停止時)傳遞到下一個終結點
- Camel實現了一種表達語言 ,我們正在使用它來記錄批處理的大小(“ $ {body.size()}”)和插入消息的索引的名稱($ {headers.indexName})。
在Elasticsearch中搜索推文
現在我們已經在Elasticsearch中索引了推文,是時候對其進行一些搜索了。
首先,讓我們看一下接收搜索查詢的Route和限制搜索結果數量的maxSize參數:
public static final String TWEET_SEARCH_URI = "vm:tweetSearch";...from(TWEET_SEARCH_URI).setHeader("CamelFileName", simple("tweet-${body}-${header.maxSize}-${date:now:yyyyMMddHHmmss}.txt"))// calls the search() method of the esTweetService which returns an iterator// to process search result - better than keeping the whole resultset in memory:.split(method(esTweetService, "search"))// converts Elasticsearch doucment to Map object:.process(new ElasticSearchSearchHitConverter())// serializes the Map object to JSON:.marshal(new JacksonDataFormat())// appends new line at the end of every tweet.setBody(simple("${body}\n"))// write search results as json into a file under /tmp folder:.to("file:/tmp?fileExist=Append").end().log("Wrote search results to /tmp/${headers.CamelFileName}");當消息傳遞到“ vm:tweetSearch”端點(使用內存隊列異步處理消息)時,將觸發此路由。
SearchController類實現REST api,從而允許用戶通過使用Camel的ProducerTemplate類將消息發送到“ vm:tweetSearch”端點來運行tweet搜索:
@Autowiredprivate ProducerTemplate producerTemplate;@RequestMapping(value = "/tweet/search", method = { RequestMethod.GET, RequestMethod.POST },produces = MediaType.TEXT_PLAIN_VALUE)@ResponseBodypublic String tweetSearch(@RequestParam("q") String query,@RequestParam(value = "max") int maxSize) {LOG.info("Tweet search request received with query: {} and max: {}", query, maxSize);Map<String, Object> headers = new HashMap<String, Object>();// "content" is the field in the Elasticsearch index that we'll be querying:headers.put("queryField", "content");headers.put("maxSize", maxSize);producerTemplate.asyncRequestBodyAndHeaders(CamelRouter.TWEET_SEARCH_URI, query, headers);return "Request is queued";}這將觸發Elasticsearch的執行,但是結果不會在響應中返回,而是寫入/ tmp文件夾中的文件(如前所述)。
此路由使用ElasticSearchService類在ElasticSearch中搜索推文。 當執行此Route時,Camel調用search()方法并傳遞搜索查詢和maxSize作為輸入參數:
public SearchHitIterator search(@Body String query, @Header(value = "queryField") String queryField, @Header(value = "maxSize") int maxSize) {boolean scroll = maxSize > batchSize;LOG.info("Executing {} on index type: '{}' with query: '{}' and max: {}", scroll ? "scan & scroll" : "search", indexType, query, maxSize);QueryBuilder qb = termQuery(queryField, query);long startTime = System.currentTimeMillis();SearchResponse response = scroll ? prepareSearchForScroll(maxSize, qb) : prepareSearchForRegular(maxSize, qb);return new SearchHitIterator(client, response, scroll, maxSize, KEEP_ALIVE_MILLIS, startTime);}請注意,根據maxSize和batchSize,代碼將執行常規搜索以返回單頁結果,或者執行滾動請求以使我們能夠檢索大量結果。 在滾動的情況下, SearchHitIterator將隨后調用Elasticsearch以分批檢索結果。
安裝ElasticSearch
cluster.name:tweet-indexer
這些步驟將允許您以最少的配置運行獨立的Elasticsearch實例,但請記住,它們并非供生產使用。
運行
這是應用程序的入口點,可以從命令行運行。
package com.kaviddiss.twittercamel;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);} }要運行該應用程序,請從您喜歡的IDE運行Application.main()方法,或者從命令行執行以下代碼:
$GRADLE_HOME/bin/gradlew build && java -jar build/libs/twitter-camel-ingester-0.0.1-SNAPSHOT.jar一旦應用程序啟動,它將自動開始索引推文。 轉到http:// localhost:9200 / _plugin / bigdesk /#cluster可視化索引:
要搜索推文,請在瀏覽器中輸入與此類似的URL: http:// localhost:8080 / tweet / search?q = toronto&max = 100 。
使用BigDesk插件,我們可以監視Elasticsearch如何索引推文:
結論
在Apache Camel的簡介中,我們介紹了如何使用此集成框架與Twitter提要feed和Elasticsearch之類的外部組件進行通信,以實時索引和搜索推文。
- 示例應用程序的源代碼可從https://github.com/davidkiss/twitter-camel-ingester獲得 。
翻譯自: https://www.javacodegeeks.com/2015/09/learn-apache-camel-indexing-tweets-in-real-time.html
apache camel
總結
以上是生活随笔為你收集整理的apache camel_学习Apache Camel –实时索引推文的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 月球和地球哪个质量大 月球和地球的质量比
- 下一篇: dubbo单元测试调用_使用LocalT