Camel中的几个重要概念之 Components
http://jnn.iteye.com/blog/320094
?
Components
Component 是一個容易混淆的名詞,可能使用EndpointFactory會更合適,因為Component是創建Endpoint實力的工廠類。例如如果一個Camel應用使用了幾個JMS 隊列,那么這個應用首先需要創建一個叫JmsComponent(實現了Component接口)的實例,然后應用會調用這個JMSComponent對象的createEndpoint()方法來創建一個JmsEndpoint對象(這個對象實現了Endpoint接口)。事實上,應用代碼并沒直接調用Component.createEndoint() 方法,而是CamelContext對象通過找到對應的Component對象(我馬上會在后續的文章中介紹),并調用createEndpoint() 方法來實現的。
myCamelContext.getEndpoint("pop3://john.smith@mailserv.example.com?password=myPassword");
在getEndpoint()中使用的參數就是URI。這個URI的前綴(: 之前的那部分內容)描述了一個組件的名字,CamelContext對象內部維護著一個組件名字與Component對象的映射表。對于上面給定的URI例子來說,CamelContext對象會根據pop3前綴找到MailComponent類,然后CamelContext對會調用MailComponent的createEndpoint("pop3://john.smith@mailserv.example.com?password=myPassword") 方法。在createEndpoint()方法中, 將把URI分割成一段段小的參數,這些小參數將被用來設置生成的Endpoint對象。
在上一段中, 我提到的CamelContext對象維護了一個組件名到Component對象的映射表。但這個映射表是如何產生的呢?這里可以在通過代碼調用CamelContext.addComponent(String componentName, Component component)來實現。 下面的例子就是展示了如何給一個MailComponent對象注冊上三個不同的名字。
Component mailComponent = new org.apache.camel.component.mail.MailComponent();
myCamelContext.addComponent("pop3", mailComponent);
myCamelContext.addComponent("imap", mailComponent);
myCamelContext.addComponent("smtp", mailComponent);
第二個方法也是最常用的方法,就是通過CamelContext對象來實現一個懶初始化。這個方法依賴于一套Camel內部的定義Component發現規則, 開發者只要在實現Component接口的時候按照這一規則設置,就可以保證CamelContext能夠正常發現這一Component。這里我們假設你所寫的Class名字為 com.example.myproject.FooComponent, 并且你想讓Camel自動將這個component和"foo”這個名字相對應。為了做到這一點,你需要先寫一個叫做"META-INF/services/org/apache/camel/component/foo" 屬性文件, 注意這個文件沒有".properties"作為后綴名,在這個屬性文件中只有一個class的條目,而這個條目的只就是你所寫的類的全名。如下所示
META-INF/services/org/apache/camel/component/foo
class=com.example.myproject.FooComponent
如果你還想讓Camel將上面的類和”bar” 這個名字聯系起來,那你需要在同樣的目錄下在創建一個相同內容叫bar的文件。一旦完成了這些配置, 你可以把 com.example.myproject.FooComponent class和這些配置文件一同打成一個jar 包,然后把這個jar包放你的CLASSPATH中。這樣Camel就會通過分析這些屬性文件的class 項目,通過使用reflectionAPI創建這個指定的類的實例。
正如我在Endpoint中說描述的, Camel提供了對多種通信協議一個開箱即用的支持。這種支持是建立在實現了Component接口的類以及讓CamelContext對象自動建立映射關系的配置文件基礎之上的。
在這一節的開始, 我使用的這個例子來調用CamelContext.getEndpoint()。
myCamelContext.getEndpoint("pop3://john.smith@mailserv.example.com?password=myPassword");
在最開始舉這個例子的時候,我說這個getEndpoint()方法的參數是一個URI。我這么說是因為Camel的在線問答以及Camel的源代碼就把這個參數聲明為一個URI。在現實生活中,這個參數是按照URL來定義的。這是因為Camel會從參數中通過一個簡單的算法查找第一:來分析出組件名。為了了解其中的奧妙,大家可以回想一下我在前面 URL,URI,URN和IRI是什么中談到的 一個URI可以是URL或者URN。 現在讓我們來看一下下面的getEndpoint()調用。
myCamelContext.getEndpoint("pop3:...");
myCamelContext.getEndpoint("jms:...");
myCamelContext.getEndpoint("urn:foo:...");
myCamelContext.getEndpoint("urn:bar:...");
Camel會先找出這些component的標識,例如 "pop3", "jms", "urn" 和 "urn"。如果"urn:foo" 和"urn:bar" 能夠別用來識別component,或者是使用"foo" 和"bar" (這一可以跳過這個"urn:"前綴)。所以在實際的編程中,大家更喜歡使用URL來制定一個Endpoint(使用":..."來描述的字符串)而不是用一個URN( 使用"urn::..."來描述的字符串)。正因為我們沒有安全按照URN的規定的參數來調用getEndpoint() 方法, 所以這個方法的參數更像一個URL而不是一個URI。
?
===========================
1.component.createEndpoint()
2.if it's in from(...) clause, then call endpoint.createConsumer()
? 2.1 consumer will call doStart(),
? 2.2 create message and exchange, set exchange.in,then call processor to process this exchange.
??you can create several exchange, each exchange mean one process.
?3.if it's in to(...), then call endpoint.createProducer()
? 3.1producer call process(), then you can process this exchange, set this exchange.out,?then it will be processed
?
Component
public class MemoryComponent extends DefaultComponent{
?@Override
?//memory:myMemory?param=value
?//remaining = myMemory
?protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
??System.out.println("createEndpoint(String uri, String remaining, Map<String, Object> parameters)");
??MemoryEndpoint endpoint = new MemoryEndpoint(uri, this);
??return endpoint;
?}
}
Endpoint
public class MemoryEndpoint extends DefaultEndpoint{
?public MemoryEndpoint() {
??super();
??System.out.println("MemoryEndpoint()");
?}
?public MemoryEndpoint(String endpointUri, Component component) {
??super(endpointUri, component);
??System.out.println("MemoryEndpoint(String endpointUri, Component component)");
?}
?@Override
?public Consumer createConsumer(Processor processor) throws Exception {
??System.out.println("createConsumer(Processor processor)");
??Consumer memConsumer = new MemoryConsumer(this,processor);
??return memConsumer;
?}
?@Override
?public Producer createProducer() throws Exception {
??System.out.println("createProducer()");
??Producer memProducer = new MemoryProducer(this);
??return memProducer;
?}
?@Override
?public boolean isSingleton() {
??System.out.println("isSingleton()");
??return false;
?}
}
Producer
public class MemoryProducer extends DefaultProducer{
?public MemoryProducer(Endpoint endpoint) {
??super(endpoint);
??System.out.println("MemoryProducer(Endpoint endpoint)");
?}
?@Override
?public void process(Exchange exchange) throws Exception {
??System.out.println("process(Exchange exchange)");
??Object body = exchange.getIn().getBody();
??System.out.println("add to queue:" + (String)body);
??exchange.getOut().setBody("producer-" + (String)body);
??StaticQueue.producerQueue.add("producer-" + (String)body);
?}
}
?
Consumer
public class MemoryConsumer extends DefaultConsumer {
?public MemoryConsumer(Endpoint endpoint, Processor processor) {
??super(endpoint, processor);
??System.out.println("MemoryConsumer(Endpoint endpoint, Processor processor)");
?}
?@Override
?protected void doStart() throws Exception {
??super.doStart();
??
??System.out.println("doStart()");
??for (String str : StaticQueue.consumerQueue) {
???Exchange ex = this.getEndpoint().createExchange();
???Message msg = new DefaultMessage();
???msg.setBody(str);
???ex.setIn(msg);
???getProcessor().process(ex);
??}
?}
?@Override
?protected void doStop() throws Exception {
??super.doStop();
??System.out.println("doStop()");
??
?}
}
?
Processor
public class MemoryProcessor implements Processor {
?@Override
?public void process(Exchange exchange) throws Exception {
??System.out.println("MemoryProcessor-process(Exchange exchange)");
??Message in = exchange.getIn();
??System.out.println("in:" +in.getBody());
??Message out = exchange.getOut();
??System.out.println("out:" +out.getBody());
??
??//self defined processor need to transmit the msg from in to out,?
//to("memory:mymemory").process(processor)
??out.setBody(in.getBody() + " out processed");
??
??//in.setBody(in.getBody() + " in processed");
??
?}
}
?
?
Test
public class Test {
?/**
? * @param args
? * @throws Exception
? */
?public static void main(String[] args) throws Exception {
??if ( System.getProperty("log4j.configuration") != null )
???PropertyConfigurator.configure(System.getProperty("log4j.configuration"));
??else
???BasicConfigurator.configure();
??
??// TODO Auto-generated method stub
??Component memoryCom = new MemoryComponent();
??CamelContext context = new DefaultCamelContext();
??context.addComponent("memory", memoryCom);
??
??
??Component streamCom = new StreamComponent();
??context.addComponent("in", streamCom);
??context.addComponent("out", streamCom);
??
??context.addRoutes(new MemoryRouteBuilder());
??context.start();
??
??Runtime.getRuntime().addShutdownHook(new Thread(){
???@Override
???public void run() {
????System.out.println("queues:");
????for (String str : StaticQueue.producerQueue) {
?????System.out.println("queue:"+str);
????}
???}
???
??});
?}
}
class MemoryRouteBuilder extends RouteBuilder {
?/** * A main() so we can easily run these routing rules in our IDE */
?
?/** * Lets configure the Camel routing rules using Java code... */
?public void configure() {
??//producer
//??from("timer://myTimer?period=2000").setBody().simple("myTimertest1").
//??to("memory:mymemory");
??
??//consumer
??MemoryProcessor processor = new MemoryProcessor();
??//from("memory:mymemory").to("file://C://test");
??//from("memory:mymemory").to("memory:mymemory").to("file://C://test");
??from("memory:mymemory").to("memory:mymemory").process(processor).to("file://test");
????//to("log:out");
??//??to("stream:out");
??
??//from("stream:in?promptMessage=Enter&promptDelay=5000").to("stream:out");
?}
}
class StaticQueue? {
?public static Queue<String> consumerQueue = new LinkedBlockingQueue<String>();
?public static Queue<String> producerQueue = new LinkedBlockingQueue<String>();
?static {
??consumerQueue.add("consumer-1");
??consumerQueue.add("consumer-2");
?}
?
}
?
總結
以上是生活随笔為你收集整理的Camel中的几个重要概念之 Components的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Maven2 的常用命令
- 下一篇: Camel中的几个重要概念之Routes