OpenDDS
OpenDDS簡介
Don Busch,首席軟件工程師兼合作伙伴
Object Computing,Inc.(OCI)
介紹
分布式實時應用程序有時以數據為中心而不是以服務為中心,這意味著分布式系統中參與者的主要目標是分發應用程序數據,而不是訪問共享服務。應用程序數據的提供者和/或使用者的集合在設計時可能是未知的,并且可能會在應用程序的整個生命周期內發生變化。通常,以發布/訂閱通信模型而不是請求/響應模型最有效地實現以數據為中心的范例。
用于實時系統的OMG 數據分發服務(DDS)解決了以數據為中心的分布式應用程序的性能要求和實時性要求。DDS增加了分布式實時系統開發人員可以使用的發布/訂閱選項的范圍。為了方便起見,使用OMG接口定義語言(IDL)定義DDS接口。但是,大多數細節留給實現,最重要的是如何在發布者和訂閱者之間進行數據傳輸。DDS實現者決定將底層數據通過TCP,UDP,UDP多播,共享內存等從發布者移動到訂閱者的基礎通信機制。使用CORBA或IIOP協議不需要DDS規范的實現。將數據從發布者傳輸到訂閱者。
OpenDDS是OMG數據分發服務規范的開源C ++實現。OpenDDS包含基于文件的配置機制。通過配置文件,OpenDDS用戶可以配置發布者或訂閱者的傳輸,調試輸出,內存分配,DCPSInfoRepo代理進程的位置以及許多其他設置。《OpenDDS開發人員指南》的“配置”一章中介紹了完整的配置設置集。
在本文中,我們涵蓋以下主題:
OMG DDS的OpenDDS實現
DDS架構
股票報價員示例
IDL類型
發行人
訂戶
訂戶的聽眾
建立發布者和訂閱者
配置股票報價器
通過TCP傳輸運行股票報價器
通過UDP傳輸運行股票報價器
摘要
參考資料
OMG DDS的OpenDDS實現
OpenDDS利用可插拔的傳輸體系結構,通過應用程序開發人員選擇的傳輸和封送實施實現數據傳輸。從概念上講,該體系結構借鑒了TAO的可插入協議框架。OpenDDS當前支持TCP和UDP點對點傳輸以及不可靠和可靠的多播,并使用高性能的封送處理實現。
這種可插拔的傳輸體系結構允許DDS用戶基于所需的傳輸以及應用程序部署的同質或異質性質來優化DDS安裝。可以做出這些選擇而不會影響應用程序代碼本身。
封送處理代碼由專用的OpenDDS IDL編譯器生成。單個單獨的DCPS信息存儲庫(DCPSInfoRepo)流程充當中央票據交換所,將發布者和訂閱者聯系在一起。在幕后,OpenDDS使用CORBA與DCPSInfoRepo流程進行通信, 以關聯發布者和訂閱者。發布者和訂閱者之間的數據傳輸直接在發布和訂閱過程之間進行。OpenDDS為RB和在發送或接收DDS數據時發生的非CORBA I / O創建自己的線程。
DDS架構
OMG數據分發服務規范將DDS分為兩個單獨的體系結構層。較低的層是以數據為中心的發布和訂閱(DCPS)層,其中包含到發布/訂閱通信機制的類型安全接口。上層是數據本地重構層(DLRL),它使應用程序開發人員可以在DCPS層之上構建本地對象模型,從而使應用程序免受DCPS知識的影響。每一層都有自己的概念和使用模式集,因此可以分別討論這兩層的概念和術語。
以數據為中心的發布和訂閱-DCPS
DCPS層負責有效地將數據從發布者分發到感興趣的訂閱者。它 在發送方使用發布者和數據寫入器 ,在接收方使用訂戶和數據讀取器的概念來實現。DCPS層由一個或多個數據域組成,每個數據域都包含一組通過DDS進行通信的參與者(發布者和訂閱者)。每個實體(即發布者或訂閱者)都屬于一個域。每個進程在其所屬的每個數據域中都有一個域參與者。
在任何數據域中,數據都是由主題標識的,該主題是特定于類型的域段,允許發布者和訂閱者明確地引用數據。在域中,主題將唯一的主題名稱,數據類型和一組服務質量(QoS)策略與數據本身相關聯。每個主題僅與一種數據類型相關聯,盡管許多不同的主題可以發布相同的數據類型。發布者的行為由與特定數據源的發布者,數據寫入者和主題元素相關聯的QoS策略確定。同樣,訂戶的行為由與訂戶,數據讀取器和特定數據接收器的主題元素相關聯的QoS策略確定。
有關DCPS術語的更多信息,請參見《OpenDDS開發人員指南》。
DDS規范定義了許多服務質量(QoS)策略,應用程序可使用這些策略來指定其可靠性,資源使用,容錯以及對服務的其他要求。參與者指定他們從服務中需要的行為;服務決定如何實現這些行為。這些策略可以應用于各種DCPS實體(主題,數據寫入器,數據讀取器,發布者,訂閱者和域參與者),盡管并非所有策略對所有類型的實體都有效。
訂閱者和發布者通過報價請求范例協作指定QoS策略。發布者向所有訂閱者提供一套QoS策略。訂戶請求其所需的QoS策略集。然后,DDS實現會嘗試將請求的策略與提供的策略進行匹配。如果策略一致,則發布和訂閱將匹配。
OpenDDS支持全套DCPS服務質量(QoS)策略,包括:
| 活潑 | 控制活動性檢查,以確保系統中預期的實體仍處于活動狀態 |
| 可靠性 | 確定是否允許該服務刪除樣本 |
| 歷史 | 控制實例的值發生變化,然后將其傳達給所有訂閱服務器,該實例發生了什么情況 |
| 資源限制 | 控制服務可用于滿足其他QoS要求的資源 |
有關服務質量策略的更完整列表和更詳細的服務質量定義,請參閱對象管理組的DDS白皮書簡介附錄A。
數據本地重建層-DLRL
數據本地重建層(DLRL)是DCPS之上的面向對象層。DLRL對象是具有一個或多個共享屬性的本機語言(即C ++)對象。每個DLRL類都映射到一個或多個DCPS主題。每個共享屬性值都映射到主題數據類型的字段,并且其值通過DCPS分布在整個應用程序中。DLRL參與者通過修改DLRL對象將數據與應用程序的其余部分進行通信,從而發布有關主題的數據樣本。DLRL共享屬性可以是簡單的值或結構,對另一個DLRL對象的引用或這些對象的集合(列表,映射)。DLRL支持復雜的對象圖和DLRL對象之間的復雜關系。
開發人員負責確定DCPS實體如何映射到DLRL對象。使用IDL值類型在OMG接口定義語言(IDL)中指定模型。映射在概念上類似于對象關系數據庫映射,后者將對象模型映射到關系數據庫表。我們認為每個DCPS主題都類似于關系數據庫表,每個樣本都作為該表中的一行。DDS規范具有從DCPS到DLRL的默認映射。或者,開發人員可以選擇通過XML映射文件指定自己的自定義映射。
OpenDDS當前未實現DLRL。
目錄
OpenDDS股票報價器示例
我們的示例說明了通過DDS DCPS層發布和訂閱數據樣本。該示例包含兩個DCPS主題,都與股市有關。
股票報價發布者將股票報價樣本發布給感興趣的訂閱者;每個報價均包含證券的股票代碼,其價值和時間戳。報價在整個交易日中定期發布,因為買賣交易會影響證券的基礎價值。另外,證券交易所事件發布者發布與證券交易所有關的重要事件,即,何時交易所開放,關閉,何時暫停交易或恢復交易。
我們的訂戶同時訂閱股票報價和股票交易所事件。訂戶打印其所看到的每個報價的代碼符號和值。當訂閱者收到表明當天股票交易所已經關閉的事件時,它將正常關閉。因此,“封閉”證券交易所事件的接收是訂戶停止預期股票報價樣本的信號。
我們將演示如何使用相同的發布者和訂閱者代碼通過TCP和UDP傳輸進行通信。傳輸配置隔離在一組配置文件中,使我們無需更改任何代碼即可切換傳輸。
目錄
IDL類型
首先,我們在IDL中定義已發布的DDS數據類型:
#include "orbsvcs/TimeBase.idl" module StockQuoter { #pragma DCPS_DATA_TYPE "StockQuoter::Quote" #pragma DCPS_DATA_KEY "StockQuoter::Quote ticker"struct Quote {string ticker;string exchange;string full_name;double value;TimeBase::TimeT timestamp;};#pragma DCPS_DATA_TYPE "StockQuoter::ExchangeEvent" #pragma DCPS_DATA_KEY "StockQuoter::ExchangeEvent exchange"enum ExchangeEventType { TRADING_OPENED,TRADING_CLOSED,TRADING_SUSPENDED,TRADING_RESUMED };struct ExchangeEvent {string exchange;ExchangeEventType event;TimeBase::TimeT timestamp;}; };我們發布兩種數據類型:每個股票報價的報價類型,以及用于指示何時打開,關閉證券交易所以及何時暫停或恢復交易的ExchangeEvent類型。該DCPS_DATA_TYPE編譯標記的類型與DDS使用。的 DCPS_DATA_KEY每種類型的定義是針對每個唯一標識符 的實例中的數據類型的。我們報價類型的關鍵是股票的股票代碼。一整天,我們希望為每個股票代號發布許多值或樣本。每個股票代號的已發布樣本集屬于同一實例。在我們的示例中,我們將發布兩個股票代號,并因此發布兩個實例:SPY(標準普爾存托憑證,即S&P 500)和MDY(S&P中盤存托憑證,即S&P中盤400)。
接下來,我們使用OpenDDS的opendds_idl編譯器編譯IDL 以生成 類型支持代碼。類型支持代碼包括生成的DCPS 數據寫入器和數據讀取器 C ++類以及其他IDL代碼。DDS使用類型安全的接口進行發布和訂閱。類型安全的接口具有幾個優點:首先,在編譯時更容易捕獲編程錯誤;第二,當在編譯時已知封送數據類型時,可以使生成的封送代碼非常高效。第三,我們可以避免any在數據傳輸中使用低效類型,例如CORBA 。
生成股票報價器的IDL類型的類型支持代碼的命令如下:
$ DDS_ROOT / bin / opendds_idl StockQuoter.idl此命令生成以下文件:
StockQuoterTypeSupport.idl StockQuoterTypeSupportImpl.h StockQuoterTypeSupportImpl.cpp但是,我們不需要opendds_idl手動運行編譯器。稍后,我們將使用Make Project Creator(MPC)項目為我們自動化構建步驟。
接下來,我們使用TAO的IDL編譯器來編譯所有三個IDL文件- StockQuoter.idl我們手動編寫的 文件,以及由生成的類型支持文件opendds_idl。
tao_idl -I $ DDS_ROOT -I $ TAO_ROOT / orbsvcs StockQuoter.idl tao_idl -I $ DDS_ROOT -I $ TAO_ROOT / orbsvcs StockQuoterTypeSupport.idl目錄
發行人
接下來,我們編寫一個發布者,以通過DDS發布股票報價和股票交易所事件。首先,我們包括由opendds_idl編譯器生成的兩個類型支持頭文件。
#include "StockQuoterTypeSupportImpl.h"我們還包括DCPS發布者,服務參與者和QoS標頭文件。
#include "dds/DCPS/Service_Participant.h" #include "dds/DCPS/Marked_Default_Qos.h" #include "dds/DCPS/PublisherImpl.h" #include "ace/streams.h" #include "orbsvcs/Time_Utilities.h"以下常量用于我們的域,類型名稱和主題名稱。每種類型均在單獨的主題上發布。訂戶必須為其域,類型名稱和主題名稱使用相同的值。
// constants for Stock Quoter domain Id, types, and topic DDS::DomainId_t QUOTER_DOMAIN_ID = 1066; const char* QUOTER_QUOTE_TYPE = "Quote Type"; const char* QUOTER_QUOTE_TOPIC = "Stock Quotes"; const char* QUOTER_EXCHANGE_EVENT_TYPE = "Exchange Event Type"; const char* QUOTER_EXCHANGE_EVENT_TOPIC = "Stock Exchange Events";在發布證券交易所事件(即打開,關閉,暫停或恢復)時,我們還將發布該事件適用的證券交易所的名稱。
const char* STOCK_EXCHANGE_NAME = "Test Stock Exchange";這是獲取當前日期和時間的簡單輔助方法。
TimeBase::TimeT get_timestamp() {TimeBase::TimeT retval;ACE_hrtime_t t = ACE_OS::gethrtime ();ORBSVCS_Time::hrtime_to_TimeT (retval, t);return retval; }發布者的源代碼文件的其余部分包含main()。我們輸入發布者的main()
int main (int argc, char *argv[]) {DDS::DomainParticipantFactory_var dpf =DDS::DomainParticipantFactory::_nil();DDS::DomainParticipant_var participant =DDS::DomainParticipant::_nil();try{首先,我們創建一個域參與者。DDS發布者可以在多個獨立域上發布,但是我們的示例僅在一個域上發布。我們使用TheDomainParticipantFactoryWithArgs宏將命令行參數傳遞到DCPS中,并獲得單例域參與者工廠。我們使用域參與者的默認服務質量策略為“ Quote”域創建一個域參與者。QUOTER_DOMAIN_ID傳遞給工廠的值在發布者和訂閱者中必須相同。
// Initialize, and create a DomainParticipant
dpf = TheParticipantFactoryWithArgs(argc, argv);
participant = dpf->create_participant(
QUOTER_DOMAIN_ID,
PARTICIPANT_QOS_DEFAULT,
DDS::DomainParticipantListener::_nil());
if (CORBA::is_nil (participant.in ()))
{
cerr << “create_participant failed.” << endl;
ACE_OS::exit(1);
}
然后,我們通過域參與者使用默認的服務質量值創建發布者。PublisherListener 當某些與發布相關的事件發生時,我們可以附加一個DCPS調用的。但是,我們不在乎那些事件,因此我們附加了一個nil偵聽器。
// Create a publisher for the two topics// (PUBLISHER_QOS_DEFAULT is defined in// Marked_Default_Qos.h)DDS::Publisher_var pub =participant->create_publisher(PUBLISHER_QOS_DEFAULT,DDS::PublisherListener::_nil());if (CORBA::is_nil (pub.in ())){cerr << "create_publisher failed." << endl;ACE_OS::exit(1);}通過DCPS進行發布涉及三個步驟。首先,我們為發布的數據樣本注冊每種類型。我們的示例發布了兩種IDL類型的示例Quote和ExchangeEvent。其次,我們創建一個或多個發布主題。每個主題只能綁定一種類型。因此,我們為兩種類型的每種類型創建一個主題。第三,我們為每個主題創建一個數據編寫器,并通過該數據編寫器發布示例。
我們首先向域參與者注冊IDL Quote類型,并為Quote類型傳遞生成的QuoteTypeSupportImpl類的實例。我們用于報價類型的名稱(存儲在常量值中QUOTER_QUOTE_TYPE)必須與訂閱服務器上使用的名稱匹配。創建主題時,我們指定此類型名稱,從而使DCPS能夠在以后為該主題創建適當類型的數據寫入器。
// Register the Quote typeStockQuoter::QuoteTypeSupport_var quote_servant= new StockQuoter::QuoteTypeSupportImpl();if (DDS::RETCODE_OK !=quote_servant->register_type(participant.in (),QUOTER_QUOTE_TYPE)){cerr << "register_type for " << QUOTER_QUOTE_TYPE<< " failed." << endl;ACE_OS::exit(1);}然后,我們使用生成的ExchangeEventTypeSupportImpl類以相同的方式向域參與者注冊IDL ExchangeEvent類型。我們的DCPS域參與者可以發布有關Quote或ExchangeEvent類型的主題。
// Register the ExchangeEvent typeStockQuoter::ExchangeEventTypeSupport_var exchange_evt_servant= new StockQuoter::ExchangeEventTypeSupportImpl();if (DDS::RETCODE_OK !=exchange_evt_servant->register_type(participant.in (),QUOTER_EXCHANGE_EVENT_TYPE)){cerr << "register_type for "<< QUOTER_EXCHANGE_EVENT_TYPE<< " failed." << endl;ACE_OS::exit(1);}我們為報價樣本創建一個主題,指示報價數據類型的主題名稱和注冊名稱,并使用默認的服務質量設置。同樣,引用主題和類型名稱必須在發布者和訂閱者上匹配。
// Get QoS to use for our two topics
// Could also use TOPIC_QOS_DEFAULT instead
DDS::TopicQos default_topic_qos;
participant->get_default_topic_qos(default_topic_qos);
// Create a topic for the Quote type…
DDS::Topic_var quote_topic =
participant->create_topic (QUOTER_QUOTE_TOPIC,
QUOTER_QUOTE_TYPE,
default_topic_qos,
DDS::TopicListener::_nil());
if (CORBA::is_nil (quote_topic.in ()))
{
cerr << “create_topic for "
<< QUOTER_QUOTE_TOPIC
<< " failed.” << endl;
ACE_OS::exit(1);
}
同樣,我們為ExchangeEvent示例創建一個主題,指示主題名稱和ExchangeEvent類型的注冊名稱,并使用默認的服務質量設置。同樣,證券交易所事件主題和類型名稱必須在發布者和訂閱者上匹配。
// .. and another topic for the Exchange Event typeDDS::Topic_var exchange_evt_topic =participant->create_topic (QUOTER_EXCHANGE_EVENT_TOPIC,QUOTER_EXCHANGE_EVENT_TYPE,default_topic_qos,DDS::TopicListener::_nil());if (CORBA::is_nil (exchange_evt_topic.in ())){cerr << "create_topic for "<< QUOTER_EXCHANGE_EVENT_TOPIC<< " failed."<< endl;ACE_OS::exit(1);}我們創建了兩個數據編寫器,每個主題一個。我們傳入上面創建的主題;該主題知道其類型。每個數據編寫者都與一個發布者正好相關聯,并就一個主題進行發布。后來,我們的發布者通過將數據樣本寫入每個數據寫入器來發布每個主題。以下代碼為“股票報價”主題創建數據編寫器。
// Get QoS to use for our two DataWriters// Could also use DATAWRITER_QOS_DEFAULTDDS::DataWriterQos dw_default_qos;pub->get_default_datawriter_qos (dw_default_qos);// Create a DataWriter for the Quote topicDDS::DataWriter_var quote_base_dw =pub->create_datawriter(quote_topic.in (),dw_default_qos,DDS::DataWriterListener::_nil());if (CORBA::is_nil (quote_base_dw.in ())){cerr << "create_datawriter for "<< QUOTER_QUOTE_TOPIC<< " failed." << endl;ACE_OS::exit(1);}StockQuoter::QuoteDataWriter_var quote_dw= StockQuoter::QuoteDataWriter::_narrow(quote_base_dw.in());if (CORBA::is_nil (quote_dw.in ())){cerr << "QuoteDataWriter could not be narrowed"<< endl;ACE_OS::exit(1);}然后,我們為“證券交易所事件”主題創建一個數據編寫器。同樣,我們傳入上面創建的主題,該主題知道其類型。
// Create a DataWriter for the Exchange Event topicDDS::DataWriter_var exchange_evt_base_dw =pub->create_datawriter(exchange_evt_topic.in (),dw_default_qos,DDS::DataWriterListener::_nil());if (CORBA::is_nil (exchange_evt_base_dw.in ())){cerr << "create_datawriter for "<< QUOTER_EXCHANGE_EVENT_TOPIC<< " failed." << endl;ACE_OS::exit(1);}StockQuoter::ExchangeEventDataWriter_var exchange_evt_dw =StockQuoter::ExchangeEventDataWriter::_narrow(exchange_evt_base_dw.in());if (CORBA::is_nil (exchange_evt_dw.in ())){cerr << "ExchangeEventDataWriter could not "<< "be narrowed"<< endl;ACE_OS::exit(1);}我們可以選擇注冊每個數據實例。注冊每個數據實例將在編寫該實例的樣本時稍微改善延遲。
發布者可以在每個數據實例上發布許多數據樣本。數據實例由唯一鍵標識。對于Quote類型,我們將ticker 其標識為IDL類型定義中的關鍵字段。具有相同鍵值的每個Quote數據樣本均被視為同一數據實例的一部分。換句話說,在股票代碼“ SPY”上發布的每個Quote示例都是同一實例的一部分。
我們有兩個Quote實例,分別是股票代碼“ SPY”(標準普爾存托憑證,即S&P 500)和“ MDY”(標準普爾中型存托憑證,即S&P Midcap 400),以及一個ExchangeEvent實例,用于“測試證券交易所”。 ”。我們向適當的數據寫入器注冊每個實例。實際調用了注冊方法,_cxx_register因為它register是C ++中的保留字。
// Register the Exchange Event and the two// Quoted securities (SPY and MDY) with the// appropriate data writerStockQuoter::Quote spy;spy.ticker = CORBA::string_dup("SPY");DDS::InstanceHandle_t spy_handle =quote_dw->_cxx_register(spy);StockQuoter::Quote mdy;mdy.ticker = CORBA::string_dup("MDY");DDS::InstanceHandle_t mdy_handle =quote_dw->_cxx_register(mdy);StockQuoter::ExchangeEvent ex_evt;ex_evt.exchange = STOCK_EXCHANGE_NAME;DDS::InstanceHandle_t exchange_handle =exchange_evt_dw->_cxx_register(ex_evt);最后,我們發布。首先,我們發布TRADING_OPENED有關“證券交易所事件”主題的事件。
// Publish...StockQuoter::ExchangeEvent opened;opened.exchange = STOCK_EXCHANGE_NAME;opened.event = StockQuoter::TRADING_OPENED;opened.timestamp = get_timestamp();cout << "Publishing TRADING_OPENED" << endl;DDS::ReturnCode_t ret =exchange_evt_dw->write(opened, exchange_handle);if (ret != DDS::RETCODE_OK){ACE_ERROR ((LM_ERROR,ACE_TEXT("(%P|%t)ERROR: OPEN write returned %d.\n"),ret));}然后,我們在“股票報價”主題上發布“ SPY”和“ MDY”實例的幾個股票報價數據樣本。我們簡單地循環,每次增加一點報價符號的報價,以模擬在真正美好的一天中的活躍交易。
ACE_Time_Value quarterSecond( 0, 250000 );
for ( int i = 0; i < 20; ++i )
{
//
// SPY
//
StockQuoter::Quote spy_quote;
spy_quote.exchange = STOCK_EXCHANGE_NAME;
spy_quote.ticker = CORBA::string_dup(“SPY”);
spy_quote.full_name =
CORBA::string_dup(“S&P Depository Receipts”);
spy_quote.value = 1200.0 + 10.0*i;
spy_quote.timestamp = get_timestamp();
cout << "Publishing SPY Quote: "
<< spy_quote.value << endl;
ret = quote_dw->write(spy_quote, spy_handle);
if (ret != DDS::RETCODE_OK)
{
ACE_ERROR ((
LM_ERROR,
ACE_TEXT("(%P|%t)ERROR: SPY write returned %d.\n"),
ret));
}
ACE_OS::sleep( quarterSecond );
//
// MDY
//
StockQuoter::Quote mdy_quote;
mdy_quote.exchange = STOCK_EXCHANGE_NAME;
mdy_quote.ticker = CORBA::string_dup(“MDY”);
mdy_quote.full_name =
CORBA::string_dup(“S&P Midcap Depository Receipts”);
mdy_quote.value = 1400.0 + 10.0*i;
mdy_quote.timestamp = get_timestamp();
cout << "Publishing MDY Quote: "
<< mdy_quote.value << endl;
ret = quote_dw->write(mdy_quote, mdy_handle);
if (ret != DDS::RETCODE_OK)
{
ACE_ERROR ((
LM_ERROR,
ACE_TEXT("(%P|%t)ERROR: MDY write returned %d.\n"),
ret));
}
ACE_OS::sleep( quarterSecond );
}
最后,我們TRADING_CLOSED在“證券交易所事件”主題上發布事件,以指示該證券交易所當天關閉。
```cppStockQuoter::ExchangeEvent closed;closed.exchange = STOCK_EXCHANGE_NAME;closed.event = StockQuoter::TRADING_CLOSED;closed.timestamp = get_timestamp();cout << "Publishing TRADING_CLOSED" << endl;ret = exchange_evt_dw->write(closed, exchange_handle);if (ret != DDS::RETCODE_OK){ACE_ERROR ((LM_ERROR,ACE_TEXT("(%P|%t)ERROR: CLOSED write returned %d.\n"),ret));}cout << "Exiting..." << endl;} catch (CORBA::Exception& e) {cerr << "Exception caught in main.cpp:" << endl<< e << endl;ACE_OS::exit(1);} 最后,我們在離開之前先清理自己。// Cleanuptry {if (!CORBA::is_nil (participant.in ())) {participant->delete_contained_entities();}if (!CORBA::is_nil (dpf.in ())) {dpf->delete_participant(participant.in ());}} catch (CORBA::Exception& e) {cerr << "Exception caught in cleanup."<< endl<< e << endl;ACE_OS::exit(1);}TheServiceParticipant->shutdown ();return 0; } 這就完成了發布者的C ++代碼。目錄## 訂戶我們的訂戶訂閱股票報價和證券交易所事件,從發布者那里接收數據樣本。我們使用發布者的`TRADING_CLOSED`事件來表示當天的交易已經結束,從而觸發了訂閱者的正常關閉。訂戶中的許多代碼與發布者中的代碼相似。我們以與發布者中相同的方式獲得域參與者,注冊類型等。主要區別在于訂閱者是被動的,等待接收樣本,而發布者是主動的。訂閱服務器使用偵聽器對象從發布服務器接收樣本。首先,我們包括由dcps_ts.pl 腳本生成的兩個類型支持頭文件。我們還將這些文件包含在發布者中。但是,我們還包括兩個偵聽器頭文件,每種發布類型一個。當在相關主題上發布數據樣本時,DDS會調用偵聽器。```cpp #include "StockQuoterTypeSupportImpl.h" #include "ExchangeEventDataReaderListenerImpl.h"我們還包括DCPS訂戶,服務參與者和QoS標頭文件。
#include "dds/DCPS/Service_Participant.h" #include "dds/DCPS/Marked_Default_Qos.h" #include "dds/DCPS/SubscriberImpl.h" #include "dds/DCPS/BuiltinTopicUtils.h" #include "ace/streams.h" #include "orbsvcs/Time_Utilities.h"以下常量用于我們的域,類型名稱和主題名稱。這些名稱必須與發布者使用的域,類型名稱和主題名稱匹配。
// constants for Stock Quoter domain Id, types, and topic // (same as publisher) DDS::DomainId_t QUOTER_DOMAIN_ID = 1066; const char* QUOTER_QUOTE_TYPE = "Quote Type"; const char* QUOTER_QUOTE_TOPIC = "Stock Quotes"; const char* QUOTER_EXCHANGE_EVENT_TYPE = "Exchange Event Type"; const char* QUOTER_EXCHANGE_EVENT_TOPIC = "Stock Exchange Events";訂戶的源代碼文件的其余部分包含main()。我們輸入訂戶的main()。
int main (int argc, char *argv[]) {DDS::DomainParticipantFactory_var dpf =DDS::DomainParticipantFactory::_nil();DDS::DomainParticipant_var participant =DDS::DomainParticipant::_nil();try {就像在發布者中一樣,我們創建域參與者。指定的域與發布者的域匹配。
// Initialize, and create a DomainParticipant// (same code as publisher)dpf = TheParticipantFactoryWithArgs(argc, argv);participant = dpf->create_participant(QUOTER_DOMAIN_ID,PARTICIPANT_QOS_DEFAULT,DDS::DomainParticipantListener::_nil());if (CORBA::is_nil (participant.in ())){cerr << "create_participant failed." << endl;ACE_OS::exit(1);}然后,我們通過域參與者使用默認的服務質量策略值創建一個訂戶。SubscriberListener 當某些與訂閱相關的事件發生時,我們可以附加一個DCPS調用的。但是,我們不在乎那些事件,因此我們將其設置為零。這幾乎與我們致電時在發布商中所做的相同create_publisher。
// Create a subscriber for the two topics
// (SUBSCRIBER_QOS_DEFAULT is defined
// in Marked_Default_Qos.h)
DDS::Subscriber_var sub =
participant->create_subscriber(
SUBSCRIBER_QOS_DEFAULT,
DDS::SubscriberListener::_nil());
if (CORBA::is_nil (sub.in ()))
{
cerr << “create_subscriber failed.” << endl;
ACE_OS::exit(1);
}
與發布者一樣,我們必須向域參與者注冊IDL Quote和ExchangeEvent類型,才能訂閱有關這些類型的主題。
// Register the Quote type
// (same code as publisher)
StockQuoter::QuoteTypeSupport_var quote_servant
= new StockQuoter::QuoteTypeSupportImpl();
if (DDS::RETCODE_OK !=
quote_servant->register_type(participant.in (),
QUOTER_QUOTE_TYPE))
{
cerr << “register_type for " << QUOTER_QUOTE_TYPE
<< " failed.” << endl;
ACE_OS::exit(1);
}
// Register the ExchangeEvent type
// (same code as publisher)
StockQuoter::ExchangeEventTypeSupport_var exchange_evt_servant
= new StockQuoter::ExchangeEventTypeSupportImpl();
if (DDS::RETCODE_OK !=
exchange_evt_servant->register_type(
participant.in (),
QUOTER_EXCHANGE_EVENT_TYPE))
{
cerr << “register_type for "
<< QUOTER_EXCHANGE_EVENT_TYPE
<< " failed.” << endl;
ACE_OS::exit(1);
}
與發布者一樣,我們為股票報價創建一個主題,指示主題名稱和報價類型的注冊名稱,并使用默認的服務質量設置。同樣,股票報價主題名稱必須在發布者和訂閱者上匹配。
// Get QoS to use for our two topics
// Could also use TOPIC_QOS_DEFAULT instead
// (same code as publisher)
DDS::TopicQos default_topic_qos;
participant->get_default_topic_qos(default_topic_qos);
// Create a topic for the Quote type…
// (same code as publisher)
DDS::Topic_var quote_topic =
participant->create_topic (QUOTER_QUOTE_TOPIC,
QUOTER_QUOTE_TYPE,
default_topic_qos,
DDS::TopicListener::_nil());
if (CORBA::is_nil (quote_topic.in ()))
{
cerr << “create_topic for "
<< QUOTER_QUOTE_TOPIC
<< " failed.” << endl;
ACE_OS::exit(1);
}
同樣,我們為ExchangeEvent示例創建一個主題,指示主題名稱和ExchangeEvent類型的注冊名稱,并使用默認的服務質量設置。同樣,證券交易所事件主題名稱必須在發布者和訂閱者上匹配。
// .. and another topic for the Exchange Event type// (same code as publisher)DDS::Topic_var exchange_evt_topic =participant->create_topic (QUOTER_EXCHANGE_EVENT_TOPIC,QUOTER_EXCHANGE_EVENT_TYPE,default_topic_qos,DDS::TopicListener::_nil());if (CORBA::is_nil (exchange_evt_topic.in ())){cerr << "create_topic for "<< QUOTER_EXCHANGE_EVENT_TOPIC<< " failed."<< endl;ACE_OS::exit(1);}在發布者上,我們創建了兩個數據編寫器,每個主題一個。在訂戶上,我們將創建兩個數據讀取器,每個主題一個。每個數據讀取器只有一個訂戶,并訂閱一個主題。我們還在每個數據讀取器上附加了一個偵聽器,以接收發布的數據樣本的通知。這是發布者和訂閱者代碼不同的地方。
以下代碼為“股票報價”主題創建一個偵聽器。偵聽器是一個本地CORBA對象,實現了DDS::DataReaderListenerIDL接口。我們使用OpenDDS的便捷servant_to_reference功能模板來獲取接口類型的引用。
// Create DataReaders and DataReaderListeners for the// Quote and ExchangeEvent// Create a Quote listenerQuoteDataReaderListenerImpl quote_listener_servant;DDS::DataReaderListener_var quote_listener =::OpenDDS::DCPS::servant_to_reference("e_listener_servant);if (CORBA::is_nil (quote_listener.in ())){cerr << "Quote listener is nil." << endl;ACE_OS::exit(1);}我們為“證券交易所事件”主題創建第二個偵聽器。
// Create an ExchangeEvent listener ExchangeEventDataReaderListenerImpl exchange_evt_listener_servant;DDS::DataReaderListener_var exchange_evt_listener =::OpenDDS::DCPS::servant_to_reference(&exchange_evt_listener_servant);if (CORBA::is_nil (exchange_evt_listener.in ())) {cerr << "ExchangeEvent listener is nil." << endl;ACE_OS::exit(1); }最后,我們為兩個主題中的每個主題創建一個數據讀取器。首先,我們為“股票報價”主題創建一個數據讀取器,并附加上面創建的相關偵聽器。
// Create the Quote DataReader// Get the default QoS// Could also use DATAREADER_QOS_DEFAULTDDS::DataReaderQos dr_default_qos;sub->get_default_datareader_qos (dr_default_qos);DDS::DataReader_var quote_dr =sub->create_datareader(quote_topic.in (),dr_default_qos,quote_listener.in ());然后,我們為“證券交易所事件”主題創建一個數據讀取器,并附加上面創建的另一個偵聽器。
// Create the ExchangeEvent DataReader
DDS::DataReader_var exchange_evt_dr =
sub->create_datareader(exchange_evt_topic.in (),
dr_default_qos,
exchange_evt_listener.in ());
OpenDDS產生它自己的線程來處理來自發布者的傳入事件。因此,訂戶中沒有事件循環代碼。但是,在準備關閉整個訂戶進程之前,我們必須確保不允許主線程退出。因此,我們循環處理股票報價和證券交易所事件,直到TRADING_CLOSED 在“證券交易所事件”主題上收到該事件為止。本質上,我們希望接收已發布的數據樣本,直到證券交易所告訴我們它已經關閉。該sleep調用使我們每秒檢查一次,以避免消耗過多的CPU。
// Wait for events from the Publisher; shut
// down when “close” received
cout << “Subscriber: waiting for events” << endl;
while ( ! exchange_evt_listener_servant.
is_exchange_closed_received() )
{
ACE_OS::sleep(1);
}
收到TRADING_CLOSED事件后,我們可以正常退出循環。
cout << "Received CLOSED event from publisher; "<< " exiting..."<< endl;} catch (CORBA::Exception& e) {cerr << "Exception caught in main.cpp:" << endl<< e << endl;ACE_OS::exit(1);}最后,我們在離開之前先清理自己。
// Cleanuptry {if (!CORBA::is_nil (participant.in ())) {participant->delete_contained_entities();}if (!CORBA::is_nil (dpf.in ())) {dpf->delete_participant(participant.in ());}} catch (CORBA::Exception& e) {cerr << "Exception caught in cleanup."<< endl<< e << endl;ACE_OS::exit(1);}TheServiceParticipant->shutdown ();return 0; }目錄
訂閱者的“股票報價”和“證券交易所事件”偵聽器
“股票行情”數據閱讀器和“股票交易所事件”數據閱讀器均附帶一個偵聽器。每當從發布者接收到數據樣本時,DDS框架就會調用這些偵聽器。我們已決定兩個數據讀取器中的每一個都應具有自己的偵聽器,盡管如果我們對偵聽器進行編碼以處理兩種數據類型,則可以為兩個數據讀取器使用單個偵聽器。
每個偵聽器都實現DDS::DataReaderListenerIDL接口。我們在上面的訂戶代碼中同時使用了a QuoteDataReaderListenerImpl和an ExchangeEventDataReaderListenerImpl,但尚未定義這些類。我們現在將這樣做。
首先,我們為Quote類型的數據讀取器編寫一個偵聽器。該偵聽器類實現DDS::DataReaderListener IDL接口,該接口重寫了七個純虛方法。它是IDL接口的CORBA本地對象實現,繼承自IDL接口的生成類。
偵聽器類必須重寫所有七個方法,包括偵聽器的實現為空的方法。但是,為簡單起見,我們將僅顯示該on_data_available 方法,當有新的Quote數據樣本可用時將調用該方法。其他六個方法具有空的實現。我們還將使用默認的構造函數和析構函數。
#include "StockQuoterTypeSupportC.h" #include "StockQuoterTypeSupportImpl.h" #include "dds/DCPS/Service_Participant.h" #include "dds/DdsDcpsSubscriptionS.h" #include "ace/streams.h"class QuoteDataReaderListenerImpl: public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> { public:// DDS calls on_data_available on the listener for each// received Quote sample.virtual void on_data_available(DDS::DataReader_ptr reader)throw (CORBA::SystemException){try{我們首先將數據讀取器參數的值縮小為Quote樣本的適當類型。
StockQuoter::QuoteDataReader_var quote_dr =StockQuoter::QuoteDataReader::_narrow(reader);if (CORBA::is_nil (quote_dr.in ())){cerr << "QuoteDataReaderListenerImpl:: "<< "on_data_available:"<< " _narrow failed." << endl;ACE_OS::exit(1);}然后,我們從數據讀取器中獲取下一個Quote示例。請注意QuoteDataReader接口的類型安全。
StockQuoter::Quote quote;DDS::SampleInfo si;DDS::ReturnCode_t status =quote_dr->take_next_sample(quote, si) ;收到報價樣本后,我們只需打印其內容即可。
if (status == DDS::RETCODE_OK) {cout << "Quote: ticker = " << quote.ticker.in()<< endl<< " exchange = " << quote.exchange.in()<< endl<< " full name = " << quote.full_name.in()<< endl<< " value = " << quote.value<< endl<< " timestamp = " << quote.timestamp<< endl;cout << "SampleInfo.sample_rank = "<< si.sample_rank << endl;}else if (status == DDS::RETCODE_NO_DATA){cerr << "ERROR: reader received DDS::RETCODE_NO_DATA!"<< endl;}else{cerr << "ERROR: read Quote: Error: "<< status << endl;}當報價樣本超出范圍時,堆棧將清除報價樣本的內存。
} catch (CORBA::Exception& e) {cerr << "Exception caught in read:"<< endl << e << endl;ACE_OS::exit(1);}}我們沒有在DDS::DataReaderListener接口中顯示其他方法的實現,但是即使它們的實現為空,我們也必須重寫它們。
// must also override:// on_requested_deadline_missed// on_requested_incompatible_qos// on_liveliness_changed// on_subscription_match// on_sample_rejected// on_sample_lost };接下來,我們為ExchangeEvent類型的數據讀取器編寫一個偵聽器。基本結構與相同QuoteDataReaderListenerImpl。
#include "ExchangeEventDataReaderListenerImpl.h" #include "StockQuoterTypeSupportImpl.h" #include "dds/DCPS/Service_Participant.h" #include "dds/DdsDcpsSubscriptionS.h" #include "ace/streams.h" #include "ace/Synch.h"class ExchangeEventDataReaderListenerImpl: public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> { public:我們將該is_exchange_closed_received方法添加到數據讀取器中,以便訂戶的主程序可以找出何時TRADING_CLOSED 收到證券交易所事件。此方法在互斥鎖的保護下檢查布爾值。收到股票交易事件on_data_available時,布爾值由偵聽器的方法設置TRADING_CLOSED。
// app-specificCORBA::Boolean is_exchange_closed_received(){ACE_Guard<ACE_Mutex> guard(this->lock_);return this->is_exchange_closed_received_;}DDS on_data_available在偵聽器上為每個接收到的ExchangeEvent示例進行調用。
virtual void on_data_available(DDS::DataReader_ptr reader)throw (CORBA::SystemException){try{與中的一樣QuoteDataReaderListenerImpl,我們首先將數據讀取器參數的值縮小為適當的類型,在這種情況下為ExchangeEventDataReader。
StockQuoter::ExchangeEventDataReader_var exchange_evt_dr =
StockQuoter::ExchangeEventDataReader::_narrow(reader);
if (CORBA::is_nil (exchange_evt_dr.in ())) {
cerr << “ExchangeEventDataReaderListenerImpl:: "
<< “on_data_available:”
<< " _narrow failed.”
<< endl;
ACE_OS::exit(1);
}
然后,我們從數據讀取器中獲取下一個ExchangeEvent示例。注意類型安全。
StockQuoter::ExchangeEvent exchange_evt;
DDS::SampleInfo si;
DDS::ReturnCode_t status =
exchange_evt_dr->take_next_sample(exchange_evt, si) ;
收到ExchangeEvent示例后,我們只需打印其內容即可。
if (status == DDS::RETCODE_OK) {
cout << "ExchangeEvent: exchange = "
<< exchange_evt.exchange.in() << endl;
收到TRADING_CLOSED事件后,我們將設置一個標志,指示當天該證券交易所已經關閉。
case StockQuoter::TRADING_CLOSED: {
cout << “TRADING_CLOSED” << endl;
}
else if (status == DDS::RETCODE_NO_DATA)
{
cerr << "ERROR: reader received "
<< “DDS::RETCODE_NO_DATA!”
<< endl;
}
else
{
cerr << "ERROR: read ExchangeEvent: Error: "
<< status
<< endl;
}
超出范圍時,堆棧會清除ExchangeEvent示例。
} catch (CORBA::Exception& e) {cerr << "Exception caught in read:" << endl<< e << endl;ACE_OS::exit(1);}}// must also override:// on_requested_deadline_missed// on_requested_incompatible_qos// on_liveliness_changed// on_subscription_match// on_sample_rejected// on_sample_lost我們添加了兩個私有類屬性,以跟蹤TRADING_CLOSED事件并使用鎖保護該值。
private:CORBA::Boolean is_exchange_closed_received_;ACE_Mutex lock_; };這樣就完成了訂戶的C ++代碼。
目錄
建立發布者和訂閱者
我們使用MPC,即Make Project Creator,為發布者和訂閱者生成構建文件。MPC提供了一種簡單的語法,并且能夠為GNU Make,Visual C ++和許多其他構建系統生成構建文件。有關MPC的更多信息,請參見OCI的MPC頁面,網址為http://www.objectcomputing.com/products/mpc。
我們創建兩個文件來構建我們的股票報價器,一個工作區文件和一個項目文件。我們的工作區文件只是告訴MPC在哪里可以找到MPC dcps和dcpsexe基礎項目文件,我們將在以后使用它們。
// // file StockQuoter.mwc //workspace {cmdline += -relative DDS_ROOT=$DDS_ROOT }接下來,我們創建一個包含三個項目的MPC文件-一個包含IDL和TypeSupport文件的Common項目,一個Publisher,一個Subscriber。這三個項目中的每一個都繼承自dcps或的dcpsexe基礎項目(位于)$DDS_ROOT。首先,我們創建一個名為StockQuoterCommon的庫來保存由TAO IDL和opendds_idl編譯器生成的代碼。
// // file StockQuoter.mpc //project(*Common) : dcps {sharedname = StockQuoterCommonlibout = .includes += $(TAO_ROOT)/orbsvcsidlflags += -I$(TAO_ROOT)/orbsvcsidlflags += -Wb,export_macro=StockQuoterCommon_Exportidlflags += -Wb,export_include=StockQuoterCommon_Export.hdcps_ts_flags += --export=StockQuoterCommon_Exportdynamicflags = STOCKQUOTERCOMMON_BUILD_DLL一個dcps項目有一個新的部分TypeSupport_Files。本部分執行opendds_idl 腳本以從我們的DDS數據類型生成TypeSupport文件。在這里,我們指示包含DDS數據類型的IDL文件,并指示從中生成的TypeSupport文件。
TypeSupport_Files {StockQuoter.idl}我們的IDL_Files部分包含原始IDL文件以及上一部分生成的TypeSupport IDL文件。
IDL_Files {StockQuoterTypeSupport.idlStockQuoter.idl}在Header_Files和Source_Files部分包含opendds_idl-生成TypeSupport實現文件。MPC會自動添加生成的IDL存根和框架,因此我們不需要手動添加它們。
Header_Files {StockQuoterTypeSupportImpl.h}Source_Files {StockQuoterTypeSupportImpl.cpp} }我們的發布者從上方使用StockQuoterCommon庫,并添加publisher.cpp包含發布者的源文件main()。
project(*Publisher) : dcpsexe, svc_utils {after += *Commonexename = publisherincludes += $(TAO_ROOT)/orbsvcslibs += StockQuoterCommondynamicflags = STOCKQUOTERCOMMON_HAS_DLLTypeSupport_Files {}IDL_Files {}Header_Files {}Source_Files {publisher.cpp}Documentation_Files {README.txtdomain_ids} }我們的訂戶還使用StockQuoterCommon庫,添加了一個subscriber.cpp包含訂戶的main()和兩個偵聽器的源文件。
project(*Subscriber) : dcpsexe {after += *Commonexename = subscriberincludes += $(TAO_ROOT)/orbsvcslibs += StockQuoterCommondynamicflags = STOCKQUOTERCOMMON_HAS_DLLTypeSupport_Files {}IDL_Files {}Header_Files {QuoteDataReaderListenerImpl.h}Source_Files {QuoteDataReaderListenerImpl.cppsubscriber.cpp}Documentation_Files {README.txtdomain_ids} }我們使用此MPC文件為我們的構建系統生成構建文件。例如,要生成GNU Makefile,我們執行
$ ACE_ROOT / bin / mwc.pl -type gnuace StockQuoter.mwc為了生成Visual C ++ 7.1解決方案文件,我們執行
perl%ACE_ROOT%/ bin / mwc.pl -type vc71 StockQuoter.mwc然后,我們構建項目。
目錄
配置股票報價器
OpenDDS包含基于文件的配置機制。有了它,OpenDDS用戶可以配置發布者或訂閱者的傳輸,DCPSInfoRepo過程的位置以及許多其他設置。配置文件的語法類似于Windows INI文件的語法。它包含幾個部分,而這些部分又包含類似屬性的條目。基本語法如下:
[section1-name] Attribute1=value1 Attribute2=value2[section2-name] Attribute1=value1 Attribute2=value2《OpenDDS開發人員指南》的“配置”一章中介紹了完整的配置設置集。
我們基于TCP的示例dds_tcp_conf.ini對發布者和訂閱者都使用一個配置文件:
dds_tcp_conf.ini # [common] # Debug Level DCPSDebugLevel=0 # IOR of DCPSInfoRepo process. DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo # Sets the global transport configuration (used by default in the # process to config1, defined below DCPSGlobalTransportConfig=config1 # Transport configuration named config1, contains a single transport # instance named tcp1 (defined below) [config/config1] transports=tcp1 # Transport instance named tcp1, of type "tcp". Uses defaults for # all configuration paramaters. [transport/tcp1] transport_type=tcp請注意,有三個部分,[common],[config/config1],和[transport/tcp1]。本[common] 節包含適用于整個過程的配置值。在此配置文件中,我們指定調試級別,該DCPSInfoRepo過程的對象引用以及全局傳輸配置。在這里,我們的DCPSInfoRepo 進程正在監聽回送(127.0.0.1)接口,這意味著我們已將其配置為僅對在同一主機上運行的DDS進程可用。要使其在網絡上可用,請使用IP地址或網絡主機名代替localhost。我們已經指定config1 作為我們的全局傳輸配置,這意味著具有該名稱的傳輸配置將被我們過程中所有未明確指定其他傳輸配置的讀取器和寫入器使用。
本[config/config1]節定義了名稱為的傳輸配置config1。該transports選項指定 tcp1為此配置中包括的唯一傳輸實例。
本[transport/tcp1]節定義了一個名為的傳輸實例, tcp1并將其傳輸類型指定為tcp。如OpenDDS文檔中所述,此部分還可以用于通過許多配置選項來配置傳輸。
目錄
通過TCP傳輸運行股票報價器
要運行該示例,我們必須啟動一個DCPSInfoRepo過程,并至少啟動一個發布者和一個訂閱者。要啟動DCPSInfoRepo,我們使用以下命令行:
$ DDS_ROOT / bin / DCPSInfoRepo -ORBListenEndpoints iiop:// localhost:12345我們的DCPSInfoRepo進程偵聽端口12345。該端口與我們在DCPSInfoRepo上面的傳輸配置文件中的對象引用中指定的端口匹配。此DCPSInfoRepo 進程正在監聽回送(127.0.0.1)接口,這意味著我們已將其配置為僅對在同一主機上運行的DDS進程可用。同樣,要使其在網絡上可用,請使用IP地址或網絡主機名代替localhost。
我們有兩個訂閱者和一個發布者:
訂戶-DCPSConfigFile dds_tcp_conf.ini訂戶-DCPSConfigFile dds_tcp_conf.ini發布者-DCPSConfigFile dds_tcp_conf.ini我們使用-DCPSConfigFile命令行參數來指示我們在上面創建的配置文件的名稱。請注意,每個訂閱者和發布者都使用相同的傳輸配置文件。
上面的命令行用于運行DCPSInfoRepo,帶有內置主題的發布者和訂閱者,默認情況下是內置主題。我們也可以關閉內置主題來運行這些過程。該-NOBITS 所使用的DCPSInfoRepo關閉內置的話題和"-DCPSBit 0" 所使用的其它應用程序DDS。命令行如下:
$ DDS_ROOT / bin / DCPSInfoRepo -NOBITS -ORBListenEndpoints iiop:// localhost:12345訂戶-DCPS位0 -DCPSConfigFile dds_tcp_conf.ini訂戶-DCPS位0 -DCPSConfigFile dds_tcp_conf.ini發布者-DCPSBit 0 -DCPSConfigFile dds_tcp_conf.ini發行者為SPY和MDY股票代號發布20個股票報價,每個訂閱者都收到它們。發布者完成后,它將發布“ TRADING_CLOSED”消息,這將導致訂閱者退出。
目錄
通過UDP傳輸運行股票報價器
我們可以使用相同的代碼庫,通過簡單地運行配置文件來運行UDP傳輸上的示例,該配置文件定義了一個指定UDP傳輸實例的全局傳輸配置。
這是dds_udp_conf.ini文件:
dds_udp_conf.ini # [common] # Debug Level DCPSDebugLevel=0 # IOR of DCPSInfoRepo process. DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo # Sets the global transport configuration (used by default in the # process to config1, defined below DCPSGlobalTransportConfig=config1 # Transport configuration named config1, contains a single transport # instance named udp1 (defined below) [config/config1] transports=udp1 # Transport instance named udp1, of type "udp". Uses defaults for # all configuration paramaters.[transport/udp1] transport_type=udp然后,我們DCPSInfoRepo像以前一樣開始該過程:
$ DDS_ROOT / bin / DCPSInfoRepo -ORBListenEndpoints iiop:// localhost:12345我們使用新的傳輸配置文件啟動兩個訂戶和發布者:
訂戶-DCPSConfigFile dds_udp_conf.ini訂戶-DCPSConfigFile dds_udp_conf.ini發布者-DCPSConfigFile dds_udp_conf.ini我們還可以在關閉內置主題的情況下運行每個流程。命令行如下:
$ DDS_ROOT / bin / DCPSInfoRepo -NOBITS -ORBListenEndpoints iiop:// localhost:12345訂戶-DCPS位0 -DCPSConfigFile dds_udp_conf.ini訂戶-DCPS位0 -DCPSConfigFile dds_udp_conf.ini發布者-DCPSBit 0 -DCPSConfigFile dds_udp_conf.ini和以前一樣,發布者為SPY和MDY報價器符號發布20個股票報價,每個訂閱者都收到它們。發布者完成后,它將再次發布“ TRADING_CLOSED”消息,這將導致訂閱者退出。唯一的區別是我們用UDP傳輸代替了TCP傳輸。更改運輸方式無需更改代碼。
目錄
摘要
用于實時系統 的OMG數據分發服務(DDS) 是針對高性能,類型安全,發布和訂閱通信中間件的規范。DDS解決了以數據為中心的應用程序,即那些對應用程序數據進行分發非常重要的應用程序。
OpenDDS是OMG數據分發服務規范的一個開源實現,為用戶提供了一個有效的發布和訂閱框架,并具有開源軟件開發模型的優點。
OpenDDS包含基于文件的配置機制。通過配置文件,OpenDDS用戶可以配置發布者或訂閱者的傳輸,調試輸出,內存分配,DCPSInfoRepo 代理進程的位置以及許多其他設置。在示例中我們已經顯示,可以在不進行任何代碼更改的情況下換出OpenDDS應用程序的基礎傳輸。
參考資料
示例代碼位于示例/ DCPS / IntroductionToOpenDDS的OpenDDS源代碼分發中
用于實時系統的OMG數據分發服務(DDS)(https://www.omg.org/spec/DDS/)
OMG DDS門戶(https://www.omg.org/spec/DDS/)
OpenDDS主頁(http://www.opendds.org)
TAO開發人員指南主頁(http://www.theaceorb.com/product/index.html
《 OpenDDS開發人員指南》(http://download.objectcomputing.com/OpenDDS/OpenDDS-latest.pdf)
MPC(https://github.com/objectcomputing/MPC)
OpenDDS官網文獻:https://opendds.org/about/articles/Article-Intro.html
總結
- 上一篇: 【转发】响应式Web设计?怎样进行?
- 下一篇: Android新闻案例clientser