戴尔集群监控与管理系统_监控与管理
戴爾集群監控與管理系統
本文是我們名為“ EAI的Spring集成 ”的學院課程的一部分。
在本課程中,向您介紹了企業應用程序集成模式以及Spring Integration如何解決它們。 接下來,您將深入研究Spring Integration的基礎知識,例如通道,轉換器和適配器。 在這里查看 !
目錄
1.簡介 2.發布和接收JMX通知1.簡介
在嘗試了Spring Integration提供的主要組件并了解了它如何與JMS隊列或Web服務之類的其他系統很好地集成之后,本章通過展示不同的監視或收集有關消息傳遞中發生的情況的機制來結束本課程。系統。
其中一些機制包括通過MBean管理或監視應用程序,MBean是JMX規范的一部分。 我們還將學習如何監視消息以查看消息傳遞流程中涉及哪些組件,以及如何為具有緩沖消息能力的組件保留消息。
本章討論的另一種機制是我們將如何使用元數據存儲實現EIP冪等接收器模式。
最后,描述的最后一種機制是控制總線。 這將使我們發送消息,這些消息將調用應用程序上下文中的組件上的操作。
2.發布和接收JMX通知
JMX規范定義了一種機制,該機制允許MBean發布將發送到其他MBean或管理應用程序的通知。 Oracle 文檔解釋了如何實現此機制。
Spring Integration通過提供能夠發布和接收JMX通知的通道適配器來支持此功能。 我們將看到一個使用兩個通道適配器的示例:
- 通知監聽通道適配器
- 通知發布通道適配器
發布JMX通知
在示例的第一部分中,消息傳遞系統通過其入口網關接收String消息(具有有效載荷類型為String的消息)。 然后,它使用服務激活器(通知處理程序)來構建javax.management.Notification并將其發送到通知發布通道適配器,該適配器將發布JMX通知。
第一部分的流程如下所示:
圖1
xml配置等同于上圖:
<context:component-scan base-package="xpadro.spring.integration.jmx.notification"/><context:mbean-export/> <context:mbean-server/><!-- Sending Notifications --> <int:gateway service-interface="xpadro.spring.integration.jmx.notification.JmxNotificationGateway" default-request-channel="entryChannel"/><int:channel id="entryChannel"/><int:service-activator input-channel="entryChannel" output-channel="sendNotificationChannel" ref="notificationHandler" method="buildNotification"/><int:channel id="sendNotificationChannel"/><int-jmx:notification-publishing-channel-adapter channel="sendNotificationChannel" object-name="xpadro.spring.integration.jmx.adapter:type=integrationMBean,name=integrationMbean"/>網關與前面的示例一樣簡單。 請記住,如果只有一種方法,則@Gateway注釋不是必需的:
public interface JmxNotificationGateway {public void send(String type); }Message將到達服務激活器,該服務激活器將使用JMX通知構建消息:
@Component("notificationHandler") public class NotificationHandler {private Logger logger = LoggerFactory.getLogger(this.getClass());private static final String NOTIFICATION_TYPE_HEADER = "jmx_notificationType";public void receive(Message<Notification> msg) {logger.info("Notification received: {}", msg.getPayload().getType());}public Message<Notification> buildNotification(Message<String> msg) {Notification notification = new Notification(msg.getPayload(), this, 0);return MessageBuilder.withPayload(notification).copyHeadersIfAbsent(msg.getHeaders()).setHeader(NOTIFICATION_TYPE_HEADER, "myJmxNotification").build();} }注意,我們已經設置了一個新的標題。 這對于提供通知類型是必需的,否則JMX適配器將拋出IllegalArgumentException并顯示消息“沒有可用的通知類型頭,并且沒有提供默認值”。
最后,我們只需要返回消息即可發送到發布適配器。 其余的由Spring Integration處理。
接收JMX通知
該流程的第二部分包含一個通知偵聽通道適配器,它將接收我們先前發布的通知。
圖2
xml配置:
<!-- Receiving Notifications --> <int-jmx:notification-listening-channel-adapter channel="receiveNotificationChannel" object-name="xpadro.spring.integration.jmx.adapter:type=integrationMBean,name=integrationMbean"/><int:channel id="receiveNotificationChannel"/><int:service-activator input-channel="receiveNotificationChannel" ref="notificationHandler" method="receive"/>我們將只收到通知并記錄它:
public void receive(Message<Notification> msg) {logger.info("Notification received: {}", msg.getPayload().getType()); }運行示例的應用程序:
public class NotificationApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/jmx/config/int-notification-config.xml");JmxNotificationGateway gateway = context.getBean(JmxNotificationGateway.class);gateway.send("gatewayNotification");Thread.sleep(1000);context.close();} }3.從MBean輪詢托管屬性
假設我們有一個正在監視某些功能的MBean 。 使用屬性輪詢通道適配器,您的應用程序將能夠輪詢MBean并接收更新的數據。
我實現了一個MBean ,每次詢問時都會生成一個隨機數。 這不是最重要的功能,但可以為我們提供示例:
@Component("pollingMbean") @ManagedResource public class JmxPollingMBean {@ManagedAttributepublic int getNumber() {Random rnd = new Random();int randomNum = rnd.nextInt(100);return randomNum;} }流程再簡單不過了; 我們需要一個屬性輪詢通道適配器,用于指定MBean的類型和名稱。 適配器將輪詢MBean并將結果放置在結果通道中。 輪詢的每個結果將通過流標準輸出通道適配器顯示在控制臺上:
<context:component-scan base-package="xpadro.spring.integration.jmx.polling"/><context:mbean-export/> <context:mbean-server/><!-- Polling --> <int-jmx:attribute-polling-channel-adapter channel="resultChannel"object-name="xpadro.spring.integration.jmx.polling:type=JmxPollingMBean,name=pollingMbean"attribute-name="Number"><int:poller max-messages-per-poll="1" fixed-delay="1000"/> </int-jmx:attribute-polling-channel-adapter><int:channel id="resultChannel"/><int-stream:stdout-channel-adapter channel="resultChannel" append-newline="true"/>運行示例的應用程序:
public class PollingApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/jmx/config/int-polling-config.xml");context.registerShutdownHook();Thread.sleep(5000);context.close();} }和控制臺輸出:
2014-04-16 16:23:43,867|AbstractEndpoint|started org.springframework.integration.config.ConsumerEndpointFactoryBean#0 82 72 20 47 21 2014-04-16 16:23:48,878|AbstractApplicationContext|Closing org.springframework.context.support.ClassPathXmlApplicationContext@72839224.調用MBean操作
下一個機制允許我們調用MBean的操作。 我們將實現另一個包含單個操作的bean,我們的老朋友hello world:
@Component("operationMbean") @ManagedResource public class JmxOperationMBean {@ManagedOperationpublic String hello(String name) {return "Hello " + name;} }現在,如果操作未返回結果,則可以使用通道適配器,或者如果網關不返回結果,則可以使用網關。 通過以下xml配置,我們導出MBean并使用網關來調用操作并等待結果:
<context:component-scan base-package="xpadro.spring.integration.jmx.operation"/><context:mbean-export/> <context:mbean-server/><int:gateway service-interface="xpadro.spring.integration.jmx.operation.JmxOperationGateway" default-request-channel="entryChannel"/><int-jmx:operation-invoking-outbound-gateway request-channel="entryChannel" reply-channel="replyChannel"object-name="xpadro.spring.integration.jmx.operation:type=JmxOperationMBean,name=operationMbean" operation-name="hello"/><int:channel id="replyChannel"/><int-stream:stdout-channel-adapter channel="replyChannel" append-newline="true"/>為了工作,我們必須指定MBean的類型和名稱以及我們要調用的操作。 結果將被發送到流通道適配器,以便在控制臺上顯示。
運行示例的應用程序:
public class OperationApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/jmx/config/int-operation-config.xml");JmxOperationGateway gateway = context.getBean(JmxOperationGateway.class);gateway.hello("World");Thread.sleep(1000);context.close();} }5.將組件導出為MBean
此組件用于將消息通道,消息處理程序和消息端點導出為MBean,以便您可以對其進行監視。
您需要將以下配置放入您的應用程序:
<int-jmx:mbean-export id="integrationMBeanExporter"default-domain="xpadro.integration.exporter" server="mbeanServer"/><bean id="mbeanServer" class="org.springframework.jmx.support.MBeanServerFactoryBean"><property name="locateExistingServerIfPossible" value="true"/> </bean>并按照Spring 文檔中的說明設置以下VM參數:
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=6969 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false運行該示例的應用程序將發送三則消息:
public class ExporterApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/jmx/config/int-exporter-config.xml");context.registerShutdownHook();JmxExporterGateway gateway = context.getBean(JmxExporterGateway.class);gateway.sendMessage("message 1");Thread.sleep(500);gateway.sendMessage("message 2");Thread.sleep(500);gateway.sendMessage("message 3");} }應用程序運行后,您可以查看有關組件的信息。 以下屏幕快照是在JConsole上制作的:
圖3
您會注意到,輸入通道的sendCount屬性值為3,因為在本示例中,我們已發送了3條消息。
6.跟蹤消息路徑
在消息傳遞系統中,組件是松散耦合的。 這意味著發送組件不需要知道誰將接收消息。 反之,接收者只對接收到的消息感興趣,而不是誰發送消息。 當我們需要調試應用程序時,這種好處可能不是很好。
消息歷史記錄包括在消息上附加消息傳遞的所有組件的列表。
以下應用程序將通過多個組件發送消息來測試此功能:
圖4
該配置的關鍵元素在上圖中不可見: message-history元素:
<context:component-scan base-package="xpadro.spring.integration.msg.history"/><int:message-history/><int:gateway id="historyGateway" service-interface="xpadro.spring.integration.msg.history.HistoryGateway" default-request-channel="entryChannel"/><int:channel id="entryChannel"/><int:transformer id="msgTransformer" input-channel="entryChannel" expression="payload + 'transformed'" output-channel="transformedChannel"/><int:channel id="transformedChannel"/><int:service-activator input-channel="transformedChannel" ref="historyActivator"/>使用此配置集,消息流結尾的服務激活器將能夠通過查看消息的標頭來檢索已訪問組件的列表:
@Component("historyActivator") public class HistoryActivator {private Logger logger = LoggerFactory.getLogger(this.getClass());public void handle(Message<String> msg) {MessageHistory msgHistory = msg.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class);if (msgHistory != null) {logger.info("Components visited: {}", msgHistory.toString());}} }運行此示例的應用程序:
public class MsgHistoryApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/msg/history/config/int-msg-history-config.xml");HistoryGateway gateway = context.getBean(HistoryGateway.class);gateway.send("myTest");Thread.sleep(1000);context.close();} }結果將顯示在控制臺上:
2014-04-16 17:34:52,551|HistoryActivator|Components visited: historyGateway,entryChannel,msgTransformer,transformedChannel7.保留緩沖的消息
Spring Integration中的某些組件可以緩沖消息。 例如,隊列通道將緩沖消息,直到使用者從中檢索消息為止。 另一個示例是聚合器端點; 如第二個教程中所見,此終結點將根據組的發布策略來收集消息,直到組完成為止。
這些集成模式意味著,如果發生故障,則緩沖的消息可能會丟失。 為了防止這種情況,我們可以保留這些消息,例如將它們存儲到數據庫中。 默認情況下,Spring Integration將這些消息存儲在內存中。 我們將使用消息存儲來更改此設置。
對于我們的示例,我們將這些消息存儲到MongoDB數據庫中。 為此,我們只需要以下配置:
<bean id="mongoDbFactory" class="org.springframework.data.mongodb.core.SimpleMongoDbFactory"><constructor-arg><bean class="com.mongodb.Mongo"/></constructor-arg><constructor-arg value="jcgdb"/> </bean><bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.ConfigurableMongoDbMessageStore"><constructor-arg ref="mongoDbFactory"/> </bean>現在,我們將創建一個應用程序來測試此功能。 我實現了一個通過網關接收帶有String有效負載的消息的流。 網關將該消息發送到隊列通道,該通道將緩沖消息,直到服務激活程序msgStoreActivator從隊列中檢索到該消息為止。 服務激活器將每五秒鐘輪詢一次消息:
<context:component-scan base-package="xpadro.spring.integration.msg.store"/><import resource="mongodb-config.xml"/><int:gateway id="storeGateway" service-interface="xpadro.spring.integration.msg.store.MsgStoreGateway" default-request-channel="entryChannel"/><int:channel id="entryChannel"><int:queue message-store="myMessageStore"/> </int:channel><int:service-activator input-channel="entryChannel" ref="msgStoreActivator"><int:poller fixed-rate="5000"/> </int:service-activator>也許您已經注意到myMessageStore bean。 為了查看持久消息機制的工作方式,我擴展了ConfigurableMongoDBMessageStore類以將日志放入其中并調試結果。 如果要嘗試此操作,可以刪除mongodb-config.xml的MongoDB messageStore bean,因為我們不再使用它。
我已經覆蓋了兩種方法:
@Component("myMessageStore") public class MyMessageStore extends ConfigurableMongoDbMessageStore {private Logger logger = LoggerFactory.getLogger(this.getClass());private static final String STORE_COLLECTION_NAME = "messageStoreCollection";@Autowiredpublic MyMessageStore(MongoDbFactory mongoDbFactory) {super(mongoDbFactory, STORE_COLLECTION_NAME);logger.info("Creating message store '{}'", STORE_COLLECTION_NAME);}@Overridepublic MessageGroup addMessageToGroup(Object groupId, Message<?> message) {logger.info("Adding message '{}' to group '{}'", message.getPayload(), groupId);return super.addMessageToGroup(groupId, message);}@Overridepublic Message<?> pollMessageFromGroup(Object groupId) {Message<?> msg = super.pollMessageFromGroup(groupId);if (msg != null) {logger.info("polling message '{}' from group '{}'", msg.getPayload(), groupId);}else {logger.info("Polling null message from group {}", groupId);}return msg;} }該機制的工作原理如下:
讓我們通過調試代碼來看看它是如何工作的。 我們將在輪詢消息以查看其在數據庫中的存儲方式之前停止:
圖5
現在,我們可以看一下數據庫:
圖6
恢復后,將從集合中輪詢消息:
圖7
運行示例的應用程序:
public class MsgStoreApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/msg/store/config/int-msg-store-config.xml");MsgStoreGateway gateway = context.getBean(MsgStoreGateway.class);gateway.send("myMessage");Thread.sleep(30000);context.close();} }8.實現冪等組件
如果我們的應用程序需要避免重復消息,Spring Integration通過實現冪等接收器模式來提供此機制。 負責檢測重復消息的是元數據存儲組件。 該組件包括存儲鍵值對。 該框架提供了接口MetadataStore兩種實現:
- SimpleMetadataStore :默認實現。 它使用內存映射來存儲信息。
- PropertiesPersistingMetadataStore :如果需要持久化數據,則很有用。 它使用屬性文件。 我們將在示例中使用此實現。
好的,讓我們從配置文件開始:
<context:component-scan base-package="xpadro.spring.integration.msg.metadata"/><bean id="metadataStore" class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/><int:gateway id="metadataGateway" service-interface="xpadro.spring.integration.msg.metadata.MetadataGateway"default-request-channel="entryChannel"/><int:channel id="entryChannel"/><int:filter input-channel="entryChannel" output-channel="processChannel"discard-channel="discardChannel" expression="@metadataStore.get(headers.messageId) == null"/><!-- Process message --> <int:publish-subscribe-channel id="processChannel"/><int:outbound-channel-adapter channel="processChannel" expression="@metadataStore.put(headers.messageId, '')"/><int:service-activator input-channel="processChannel" ref="metadataActivator" method="process"/><!-- Duplicated message - discard it --> <int:channel id="discardChannel"/><int:service-activator input-channel="discardChannel" ref="metadataActivator" method="discard"/>我們定義了一個“ metadataStore”,以便使用我們的屬性元數據存儲來代替默認的內存實現。
流程說明如下:
元數據存儲在文件系統中創建屬性文件。 如果您使用Windows,則會在“ C:\ Users \用戶名\ AppData \ Local \ Temp \ spring-integration”文件夾中看到一個meta-store.properties文件
該示例使用服務激活器來記錄是否已處理消息:
@Component("metadataActivator") public class MetadataActivator {private Logger logger = LoggerFactory.getLogger(this.getClass());public void process(Message<String> msg) {logger.info("Message processed: {}", msg.getPayload());}public void discard(Message<String> msg) {logger.info("Message discarded: {}", msg.getPayload());} }該應用程序將運行示例:
public class MetadataApp {private static final String MESSAGE_STORE_HEADER = "messageId";public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/msg/metadata/config/int-msg-metadata-config.xml");MetadataGateway gateway = context.getBean(MetadataGateway.class);Map<String,String> headers = new HashMap<>();headers.put(MESSAGE_STORE_HEADER, "msg1");Message<String> msg1 = MessageBuilder.withPayload("msg1").copyHeaders(headers).build();headers = new HashMap<>();headers.put(MESSAGE_STORE_HEADER, "msg2");Message<String> msg2 = MessageBuilder.withPayload("msg2").copyHeaders(headers).build();gateway.sendMessage(msg1);Thread.sleep(500);gateway.sendMessage(msg1);Thread.sleep(500);gateway.sendMessage(msg2);Thread.sleep(3000);context.close();} }第一次調用將在控制臺上產生以下輸出:
2014-04-17 13:00:08,223|MetadataActivator|Message processed: msg1 2014-04-17 13:00:08,726|MetadataActivator|Message discarded: msg1 2014-04-17 13:00:09,229|MetadataActivator|Message processed: msg2現在請記住,PropertiesPersistingMetadataStore將數據存儲在屬性文件中。 這意味著該數據將在ApplicationContext重新啟動后繼續存在。 因此,如果我們不刪除屬性文件,而是再次運行示例,結果將有所不同:
2014-04-17 13:02:27,117|MetadataActivator|Message discarded: msg1 2014-04-17 13:02:27,620|MetadataActivator|Message discarded: msg1 2014-04-17 13:02:28,123|MetadataActivator|Message discarded: msg29.發送操作調用請求
本教程討論的最后一種機制是控制總線 。 控制總線將使您以與應用程序相同的方式管理系統。 該消息將作為一種Spring Expression Language執行。 若要從控制總線執行,該方法需要使用@ManagedAttribute或@ManagedOperation批注。
本節的示例使用控制總線來調用Bean上的方法:
<context:component-scan base-package="xpadro.spring.integration.control.bus"/><int:channel id="entryChannel"/><int:control-bus input-channel="entryChannel" output-channel="resultChannel"/><int:channel id="resultChannel"/><int:service-activator input-channel="resultChannel" ref="controlbusActivator"/>將被調用的操作如下:
@Component("controlbusBean") public class ControlBusBean {@ManagedOperationpublic String greet(String name) {return "Hello " + name;} }運行該示例的應用程序發送一條消息,其中包含要執行的表達式:
public class ControlBusApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/control/bus/config/int-control-bus-config.xml");MessageChannel channel = context.getBean("entryChannel", MessageChannel.class);Message<String> msg = MessageBuilder.withPayload("@controlbusBean.greet('World!')").build();channel.send(msg);Thread.sleep(3000);context.close();} }結果顯示在控制臺上:
2014-04-17 13:21:42,910|ControlBusActivator|Message received: Hello World!翻譯自: https://www.javacodegeeks.com/2015/09/monitoring-and-management.html
戴爾集群監控與管理系統
總結
以上是生活随笔為你收集整理的戴尔集群监控与管理系统_监控与管理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: amber 口译_口译员设计模式示例
- 下一篇: 薛佳凝和胡歌怎么回事(胡歌车祸后为什么和