<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://camel.apache.org/schema/springhttp://camel.apache.org/schema/spring/camel-spring.xsd "
><camelContext xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="myDLQ"><!-- 2 redeliveries max before failed message is placed into a DLQ --><errorHandler id="myDLQ" type="DeadLetterChannel" deadLetterUri="activemq:queue:errors" useOriginalMessage="true"><redeliveryPolicy maximumRedeliveries="2"/></errorHandler><!-- The polling of a specific folder every 30 sec --><route id="route1"><from uri="file:///Users/bli/folderToPoll?delay=30000&delete=true"/><unmarshal><csv/></unmarshal><split><simple>${body}</simple><setHeader headerName="customerId"><simple>${body[1]}</simple></setHeader><setHeader headerName="carModelId"><simple>${body[2]}</simple></setHeader><setBody><simple>${body[0]}</simple></setBody><to uri="activemq:queue:individualRequests?disableReplyTo=true"/></split></route><!-- The consumption of individual (jms) mailing requests --><route id="route2"><from uri="activemq:queue:individualRequests?maxConcurrentConsumers=5"/><pipeline><to uri="direct:getCustomerEmail"/><to uri="direct:sendMail"/></pipeline></route><!-- Obtain customer email by parsing the XML response of a REST web service --><route id="route3"><from uri="direct:getCustomerEmail"/><setBody><constant/></setBody><loadBalance><roundRobin/><to uri="http://backend1.mycompany.com/ws/customers?id={customerId}&authMethod=Basic&authUsername=geek&authPassword=secret"/><to uri="http://backend2.mycompany.com/ws/customers?id={customerId}&authMethod=Basic&authUsername=geek&authPassword=secret"/></loadBalance><setBody><xpath resultType="java.lang.String">/customer/general/email</xpath></setBody></route><!-- Group individual sendings by car model --><route id="route4"><from uri="direct:sendMail"/><aggregate strategyRef="myAggregator" completionSize="10"><correlationExpression><simple>header.carModelId</simple></correlationExpression><completionTimeout><constant>60000</constant></completionTimeout><setHeader headerName="recipients"><simple>${body}</simple></setHeader><pipeline><to uri="direct:prepareMail"/><to uri="direct:sendMailToMany"/></pipeline></aggregate></route><!-- Prepare the mail content --><route id="route5"><from uri="direct:prepareMail"/><setBody><simple>header.carModelId</simple></setBody><pipeline><to uri="sql:SELECT xml_text FROM template WHERE template_id =# ?dataSourceRef=myDS"/><to uri="xslt:META-INF/xsl/email-formatter.xsl"/></pipeline></route><!-- Send a mail to multiple recipients --><route id="route6"><from uri="direct:sendMailToMany"/><to uri="smtp://mail.mycompany.com:25?username=geek&password=secret&from=no-reply@mycompany.com&to={recipients}&subject=Your request&contentType=text/html"/><log message="Mail ${body} successfully sent to ${headers.recipients}"/></route></camelContext><!-- Pure Spring beans referenced in the various Camel routes --><!-- The ActiveMQ broker --><bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"><property name="brokerURL" value="tcp://localhost:61616"/></bean><!-- A datasource to our database --><bean id="myDS" class="org.apache.commons.dbcp.BasicDataSource"><property name="driverClassName" value="org.h2.Driver"/><property name="url" value="jdbc:h2:file:/Users/bli/db/MyDatabase;AUTO_SERVER=TRUE;TRACE_LEVEL_FILE=0"/><property name="username" value="sa"/><property name="password" value="sa"/></bean><!-- An aggregator implementation --><bean id="myAggregator" class="com.mycompany.camel.ConcatBody"/></beans>
和(僅!)Java類的代碼:
public class ConcatBody implements AggregationStrategy {public static final String SEPARATOR = ", ";public Exchange aggregate(Exchange aggregate, Exchange newExchange) {if (aggregate == null) {// The aggregation for the very exchange item is the exchange itselfreturn newExchange;} else {// Otherwise, we augment the body of current aggregate with new incoming exchangeString originalBody = aggregate.getIn().getBody(String.class);String bodyToAdd = newExchange.getIn().getBody(String.class);aggregate.getIn().setBody(originalBody + SEPARATOR + bodyToAdd);return aggregate;} }}