带有Java DSL的Spring Integration MongoDB适配器
1引言
這篇文章解釋了如何使用Spring Integration從MongoDB數(shù)據(jù)庫中保存和檢索實體。 為了實現(xiàn)這一點,我們將使用Java DSL配置擴展來配置入站和出站MongoDB通道適配器。 例如,我們將構(gòu)建一個應(yīng)用程序,使您可以將訂單寫入MongoDB存儲,然后檢索它們進行處理。
應(yīng)用程序流程可以分為兩部分:
- 新訂單將發(fā)送到消息傳遞系統(tǒng),在該系統(tǒng)中它們將被轉(zhuǎn)換為實際產(chǎn)品,然后存儲到MongoDB。
- 另一方面,另一個組件正在連續(xù)輪詢數(shù)據(jù)庫并處理它找到的任何新產(chǎn)品。
可以在我的Spring Integration存儲庫中找到源代碼。
2 MessagingGateway –進入消息傳遞系統(tǒng)
我們的應(yīng)用程序?qū)ο鬟f系統(tǒng)一無所知。 實際上,它只會創(chuàng)建新訂單并將其發(fā)送到接口(OrderService):
@SpringBootApplication @EnableIntegration public class MongodbBasicApplication {public static void main(String[] args) {ConfigurableApplicationContext context = SpringApplication.run(MongodbBasicApplication.class, args);new MongodbBasicApplication().start(context);}public void start(ConfigurableApplicationContext context) {resetDatabase(context);Order order1 = new Order("1", true);Order order2 = new Order("2", false);Order order3 = new Order("3", true);InfrastructureConfiguration.OrderService orderService = context.getBean(InfrastructureConfiguration.OrderService.class);orderService.order(order1);orderService.order(order2);orderService.order(order3);}private void resetDatabase(ConfigurableApplicationContext context) {ProductRepository productRepository = context.getBean(ProductRepository.class);productRepository.deleteAll();} }首先看一下配置,我們可以看到OrderService實際上是一個消息傳遞網(wǎng)關(guān)。
@Configuration @ComponentScan("xpadro.spring.integration.endpoint") @IntegrationComponentScan("xpadro.spring.integration.mongodb") public class InfrastructureConfiguration {@MessagingGatewaypublic interface OrderService {@Gateway(requestChannel = "sendOrder.input")void order(Order order);}... }發(fā)送到order方法的任何訂單都將通過“ sendOrder.input”直接通道作為Message <Order>引入消息系統(tǒng)。
3第一部分-處理訂單
Spring Integration消息流的第一部分由以下組件組成:
我們使用lambda創(chuàng)建一個IntegrationFlow定義,該定義將DirectChannel注冊為其輸入通道。 輸入通道的名稱解析為'beanName + .input'。 因此,該名稱就是我們在網(wǎng)關(guān)中指定的名稱:“ sendOrder.input”
@Bean @Autowired public IntegrationFlow sendOrder(MongoDbFactory mongo) {return f -> f.transform(Transformers.converter(orderToProductConverter())).handle(mongoOutboundAdapter(mongo)); }流程在收到新訂單時要做的第一件事是使用變壓器將訂單轉(zhuǎn)換為產(chǎn)品。 要注冊一個變壓器,我們可以使用DSL API提供的Transformers工廠。 在這里,我們有不同的可能性。 我選擇的是使用PayloadTypeConvertingTransformer ,它將有效負載轉(zhuǎn)換為對象的委托給轉(zhuǎn)換器。
public class OrderToProductConverter implements Converter<Order, Product> {@Overridepublic Product convert(Order order) {return new Product(order.getId(), order.isPremium());} }訂單流程的下一步是將新創(chuàng)建的產(chǎn)品存儲到數(shù)據(jù)庫中。 在這里,我們使用MongoDB出站適配器:
@Bean @Autowired public MessageHandler mongoOutboundAdapter(MongoDbFactory mongo) {MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongo);mongoHandler.setCollectionNameExpression(new LiteralExpression("product"));return mongoHandler; }如果您想知道消息處理程序在內(nèi)部實際上在做什么,它將使用mongoTemplate保存該實體:
@Override protected void handleMessageInternal(Message<?> message) throws Exception {String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, message, String.class);Object payload = message.getPayload();this.mongoTemplate.save(payload, collectionName); }4第二部分–加工產(chǎn)品
在第二部分中,我們還有另一個用于處理產(chǎn)品的集成流程:
為了檢索以前創(chuàng)建的產(chǎn)品,我們定義了一個入站通道適配器,它將繼續(xù)輪詢MongoDB數(shù)據(jù)庫:
@Bean @Autowired public IntegrationFlow processProduct(MongoDbFactory mongo) {return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(Pollers.fixedDelay(3, TimeUnit.SECONDS))).route(Product::isPremium, this::routeProducts).handle(mongoOutboundAdapter(mongo)).get(); }MongoDB入站通道適配器是負責從數(shù)據(jù)庫輪詢產(chǎn)品的適配器。 我們在構(gòu)造函數(shù)中指定查詢。 在這種情況下,我們每次都會輪詢一種未加工的產(chǎn)品:
@Bean @Autowired public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{'processed' : false}"));messageSource.setExpectSingleResult(true);messageSource.setEntityClass(Product.class);messageSource.setCollectionNameExpression(new LiteralExpression("product"));return messageSource; }路由器定義顯示了如何根據(jù)“溢價”字段將產(chǎn)品發(fā)送到其他服務(wù)激活器方法:
private RouterSpec<Boolean, MethodInvokingRouter> routeProducts(RouterSpec<Boolean, MethodInvokingRouter> mapping) {return mapping.subFlowMapping(true, sf -> sf.handle(productProcessor(), "fastProcess")).subFlowMapping(false, sf -> sf.handle(productProcessor(), "process")); }作為服務(wù)激活器,我們有一個簡單的bean,它記錄一條消息并將產(chǎn)品設(shè)置為已處理。 然后,它將返回產(chǎn)品,以便流程中的下一個端點可以處理它。
public class ProductProcessor {public Product process(Product product) {return doProcess(product, String.format("Processing product %s", product.getId()));}public Product fastProcess(Product product) {return doProcess(product, String.format("Fast processing product %s", product.getId()));}private Product doProcess(Product product, String message) {System.out.println(message);product.setProcessed(true);return product;} }將產(chǎn)品設(shè)置為已處理的原因是因為下一步是更新其在數(shù)據(jù)庫中的狀態(tài),以便不再對其進行輪詢。 我們通過將流再次重定向到mongoDb出站通道適配器來保存它。
5結(jié)論
您已經(jīng)了解了必須使用哪些端點才能使用Spring Integration與MongoDB數(shù)據(jù)庫進行交互。 出站通道適配器將產(chǎn)品被動保存到數(shù)據(jù)庫中,而入站通道適配器則主動輪詢數(shù)據(jù)庫以檢索新產(chǎn)品。
如果您發(fā)現(xiàn)此帖子有用,請分享或給我的存儲庫加注星標。 我很感激 :)
我正在Google Plus和Twitter上發(fā)布我的新帖子。 如果您要更新新內(nèi)容,請關(guān)注我。
翻譯自: https://www.javacodegeeks.com/2016/11/spring-integration-mongodb-adapters-java-dsl.html
總結(jié)
以上是生活随笔為你收集整理的带有Java DSL的Spring Integration MongoDB适配器的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怎么进入路由器设置界面路由器设置界面如何
- 下一篇: 电脑系统采用几进制编码(计算机的数制和编