OpenDDS应用开发步骤
OpenDDS應用開發步驟
文章目錄
- OpenDDS應用開發步驟
- 1. 定義數據類型
- 2. 處理IDL
- 3. 編寫消息發布者
- 3.1 初始化參與者
- 3.2 注冊數據類型和參與主題
- 3.3 創建發布者
- 3.4 創建數據寫者和等待訂閱者
- 3.5 發布數據樣本
- 4. 實現訂閱者
- 4.1 初始化參與者
- 4.2 注冊數據類型和創建主題
- 4.3 創建訂閱者
- 4.4 創建DataReader和Listener
- 5. DataReaderListener實現
- 6. 在OpenDDS客戶端清理
- 7. 運行示例
- 參考資料
個人學習記錄,僅梳理主要流程,無實際操作參考價值。
1. 定義數據類型
通過利用IDL,對每個DDS所使用的數據類型進行定義。OpenDDS使用#pragma指令,識別DDS傳輸以及處理的數據類型。這些數據由TAO IDL編譯程序以及OpenDDS IDL編譯程序進行處理,用于生成需要的代碼,以便于利用OpenDDS傳輸這些類型的數據。示例如下:
module Supermarket {#pragma DCPS_DATA_TYPE "Supermarket::UpdateInf"#pragma DCPS_DATA_KEY "Supermarket::UpdateInf UpdateInfID"struct UpdateInf {long UpdateInfID;string UpdateInfTopic;string UpdateInfMessage;string PublisherName;string MTime;long ClockTime;};#pragma DCPS_DATA_TYPE "Supermarket::DefaultInf"#pragma DCPS_DATA_KEY "Supermarket::DefaultInf DefaultInfID"struct DefaultInf {long DefaultInfID;string DefaultInfTopic;string DefaultInfMessage;string PublisherName;string MTime;long ClockTime;};};DCPS_DATA_TYPE標記一個供OpenDDS使用的數據類型;DCPS_DATA_KEY標記識別DCPS數據類型的字段,該字段被用作針對于此類型的鍵。在上述示例中,把Supermarket::UpdateInf 的成員UpdateInfID看作一個鍵。利用不同的UpdateInfID值所發布的每個樣本,將會被定義為在相同主題內不同的實例。
2. 處理IDL
該步驟可省略,之后可以通過MPC(The Makefile, Project, And Workspace
Creator)自動進行編譯。
3. 編寫消息發布者
3.1 初始化參與者
//初始化域參數工廠 DDS::DomainParticipantFactory_var dpf =TheParticipantFactoryWithArgs(argc, argv); //創建域參數 DDS::DomainParticipant_var participant =dpf->create_participant(42, // domain IDPARTICIPANT_QOS_DEFAULT,0, // No listener requiredOpenDDS::DCPS::DEFAULT_STATUS_MASK); //域參數創建異常處理 if (!participant) {std::cerr << "create_participant failed." << std::endl;return 1; }3.2 注冊數據類型和參與主題
//注冊Supermarket::DefaultInf類型的類型支持類 Supermarket::DefaultInfTypeSupport_var ts = new Supermarket::DefaultInfTypeSupportImpl; //注冊異常處理 if (DDS::RETCODE_OK != ts->register_type(participant, "")) {std::cerr << "register_type failed." << std::endl;return 1; }//創建主題(DefaultInf Supermarket) CORBA::String_var default_name = ts->get_type_name(); DDS::Topic_var default_topic =participant->create_topic("DefaultInf Supermarket",default_name,TOPIC_QOS_DEFAULT,0, // No listener requiredOpenDDS::DCPS::DEFAULT_STATUS_MASK); //主題創建異常處理 if (!default_topic) {std::cerr << "create_topic failed." << std::endl;return 1; }3.3 創建發布者
//創建發布者 DDS::Publisher_var pub =participant->create_publisher(PUBLISHER_QOS_DEFAULT,0, // No listener requiredOpenDDS::DCPS::DEFAULT_STATUS_MASK); //發布者創建異常處理 if (!pub) {std::cerr << "create_publisher failed." << std::endl;return 1; }3.4 創建數據寫者和等待訂閱者
//創建數據寫者 DDS::DataWriter_var DefaultWriter =pub->create_datawriter(default_topic,DATAWRITER_QOS_DEFAULT,0, // No listener requiredOpenDDS::DCPS::DEFAULT_STATUS_MASK); //數據寫者創建異常 if (!DefaultWriter) {std::cerr << "create_dataDefaultWriter failed." << std::endl;return 1; }3.5 發布數據樣本
//寫入數據樣本 Supermarket::DefaultInf DefaultInf; Supermarket::DefaultInf DefaultInf2; DefaultInf.DefaultInfID= 0; DefaultInf.DefaultInfMessage = "您有新的餓了么外賣訂單,請及時處理!"; DefaultInf.DefaultInfTopic= "餓了么"; DefaultInf.PublisherName = "餓了么(我才不餓)的發布者"; DefaultInf2.DefaultInfID = 0; DefaultInf2.DefaultInfMessage = "您有新的美團外賣訂單,請及時處理!"; DefaultInf2.DefaultInfTopic = "美團"; DefaultInf2.PublisherName = "美團外賣送啥都快"; //開始發布了,發布default //開始發布了,發布update DDS::Duration_t timeout = { 30, 0 }; for (int i = 0; i < 1000; ++i) {time_t nowTime;time(&nowTime);struct timeb tb;ftime(&tb); double rand_num = rand() / (RAND_MAX + 1.0);if (rand_num>0.5){DefaultInf.ClockTime = clock();CORBA::String_var s_Time = CORBA::string_dup(ctime(&nowTime));DefaultInf.MTime = s_Time;DDS::ReturnCode_t error = DefaultInf_DefaultWriter->write(DefaultInf, DDS::HANDLE_NIL);++DefaultInf.DefaultInfID;if (error != DDS::RETCODE_OK) {std::cerr << "default write failed." << std::endl;return 1;} if (DefaultWriter->wait_for_acknowledgments(timeout) != DDS::RETCODE_OK){std::cerr << "wait_for_acknowledgments failed." << std::endl;return 1;}cout << "[S01]發送了一條餓了么主題消息,發送時間是" << ctime(&nowTime) << endl;}else{DefaultInf2.ClockTime = clock();CORBA::String_var s_Time = CORBA::string_dup(ctime(&nowTime));DefaultInf2.MTime = s_Time;DDS::ReturnCode_t error = UpdateInf_DefaultWriter->write(DefaultInf2, DDS::HANDLE_NIL);++DefaultInf2.DefaultInfID;if (error != DDS::RETCODE_OK) {std::cerr << "update write failed." << std::endl;return 1;}if (UpdateWriter->wait_for_acknowledgments(timeout) != DDS::RETCODE_OK){std::cerr << "wait_for_acknowledgments failed." << std::endl;return 1;}cout << "[S01]發送了一條美團主題消息,發送時間是" << ctime(&nowTime) << endl;}Sleep(2500); }4. 實現訂閱者
4.1 初始化參與者
//初始化域參數工廠 DDS::DomainParticipantFactory_var dpf =TheParticipantFactoryWithArgs(argc, argv); //創建域參數 DDS::DomainParticipant_var participant =dpf->create_participant(42, // Domain IDPARTICIPANT_QOS_DEFAULT,0, // No listener requiredOpenDDS::DCPS::DEFAULT_STATUS_MASK); //域參數創建異常處理 if (!participant) {std::cerr << "create_participant failed." << std::endl;return 1; }4.2 注冊數據類型和創建主題
//注冊類型支持類Supermarket::DefaultInfTypeSupport_var mts = new Supermarket::DefaultInfTypeSupportImpl;//支持類異常處理if (DDS::RETCODE_OK != mts->register_type(participant, "")) {std::cerr << "Failed to register the DefaultInfTypeSupport." << std::endl;return 1;}//創建主題2(同樣也是DefaultInf Supermarket)CORBA::String_var type_name = mts->get_type_name();DDS::Topic_var topic2 =participant->create_topic("DefaultInf Supermarket",type_name,TOPIC_QOS_DEFAULT,0, // No listener requiredOpenDDS::DCPS::DEFAULT_STATUS_MASK);//主題異常處理if (!topic2) {std::cerr << "Failed to create_topic." << std::endl;return 1;}//創建主題(同樣也是DefaultInf Supermarket)DDS::Topic_var topic =participant->create_topic("UpdateInf Supermarket",type_name,TOPIC_QOS_DEFAULT,0, // No listener requiredOpenDDS::DCPS::DEFAULT_STATUS_MASK);//主題異常處理if (!topic) {std::cerr << "Failed to create_topic." << std::endl;return 1;}4.3 創建訂閱者
//創建訂閱者DDS::Subscriber_var sub =participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,0, // No listener requiredOpenDDS::DCPS::DEFAULT_STATUS_MASK);//異常處理if (!sub) {std::cerr << "Failed to create_subscriber." << std::endl;return 1;}4.4 創建DataReader和Listener
利用所創建的Listener對象,就可以檢測數據什么時候可用。
// 創建監聽 DDS::DataReaderListener_var listener(new DataReaderListenerImpl); // 創建數據讀取者DDS::DataReader_var dr =sub->create_datareader(topic,DATAREADER_QOS_DEFAULT,listener,OpenDDS::DCPS::DEFAULT_STATUS_MASK);//讀者創建異常處理if (!dr) {std::cerr << "create_datareader failed." << std::endl;return 1;}Supermarket::DefaultInfDataReader_var reader_i = Supermarket::DefaultInfDataReader::_narrow(dr);if (!reader_i){std::cerr << "_narrow failed." << std::endl;}5. DataReaderListener實現
void DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader) {try {Supermarket::DefaultInfDataReader_var reader_i = Supermarket::DefaultInfDataReader::_narrow(reader);if (!reader_i) {std::cerr << "read: _narrow failed." << std::endl;return;}Supermarket::DefaultInf Supermarket_DefaultInf;DDS::SampleInfo si;DDS::ReturnCode_t status = reader_i->take_next_sample(Supermarket_DefaultInf, si);if (status == DDS::RETCODE_OK) {printf("\n");//std::cout << "【冷漠到不想吃飯小組開心提示您】請查看以下信息:" << std::endl;//std::cout << "SampleInfo.sample_rank = " << si.sample_rank << std::endl;//std::cout << "SampleInfo.instance_state = " << si.instance_state << std::endl;if (si.valid_data){time_t nowTime;time(&nowTime);struct timeb tb;ftime(&tb);std::cout << "[S01]發送者消息編號=" << Supermarket_DefaultInf.DefaultInfID << std::endl<< "[S01]發送者毫秒級計時:" << Supermarket_DefaultInf.ClockTime<< std::endl<< " 消息標題: " << Supermarket_DefaultInf.DefaultInfTopic << std::endl<< "====================================================================="<<std::endl << Supermarket_DefaultInf.DefaultInfMessage << std::endl<< "=====================================================================" << std::endl << "消息來源:"<< Supermarket_DefaultInf.PublisherName<< std::endl<< "[S01]發送者時間:" << Supermarket_DefaultInf.MTime<< "[S01]本地接收時間:" << ctime(&nowTime) << "[S01]本地接收毫秒級計時:" << clock() << std::endl;}else if (si.instance_state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE){std::cout << "當前實例句柄" << std::endl;}else if (si.instance_state == DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE){std::cout << "instance is unregistered" << std::endl;}else{std::cerr << "ERROR: received unknown instance state "<< si.instance_state << std::endl;}}else if (status == DDS::RETCODE_NO_DATA) {std::cerr << "ERROR: reader received DDS::RETCODE_NO_DATA!" << std::endl;}else {std::cerr << "ERROR: read DefaultInf: Error: " << status << std::endl;}}catch (CORBA::Exception& e) {std::cerr << "Exception caught in main.cpp:" << std::endl<< e << std::endl;ACE_OS::exit(1);}}void DataReaderListenerImpl::on_requested_deadline_missed(DDS::DataReader_ptr,const DDS::RequestedDeadlineMissedStatus &) {std::cerr << "DataReaderListenerImpl::on_requested_deadline_missed" << std::endl; }void DataReaderListenerImpl::on_requested_incompatible_qos(DDS::DataReader_ptr,const DDS::RequestedIncompatibleQosStatus &) {std::cerr << "DataReaderListenerImpl::on_requested_incompatible_qos" << std::endl; }void DataReaderListenerImpl::on_liveliness_changed(DDS::DataReader_ptr,const DDS::LivelinessChangedStatus &) {std::cout << "接收者生成或者死亡" << std::endl; }void DataReaderListenerImpl::on_subscription_matched(DDS::DataReader_ptr,const DDS::SubscriptionMatchedStatus &) {std::cout << "匹配成功或匹配結束" << std::endl; }void DataReaderListenerImpl::on_sample_rejected(DDS::DataReader_ptr,const DDS::SampleRejectedStatus&) {std::cerr << "DataReaderListenerImpl::on_sample_rejected" << std::endl; }void DataReaderListenerImpl::on_sample_lost(DDS::DataReader_ptr,const DDS::SampleLostStatus&) {std::cerr << "DataReaderListenerImpl::on_sample_lost" << std::endl; }on_data_available為監聽的事件。
6. 在OpenDDS客戶端清理
客戶端指的就是發布者和訂閱者,在publisher.cpp和subscriber.cpp文件內添加代碼,清理OpenDDS相關的對象。
//實例清理 participant->delete_contained_entities(); dpf->delete_participant(participant); TheServiceParticipant->shutdown();7. 運行示例
1. 啟動DCPSInfoRepo服務,用于集中式發現
%DDS_ROOT%/bin/DCPSInfoRepo -ORBEndpoint iiop://localhost:123452. 雙擊啟動subscriber.exe和publisher.exe
利用DCPSInfoRepo服務可以實現集中式發現,另外RTPS服務可以實現對等發現。當OpenDDS應用程序需要與DDS規范的非OpenDDS實現進行交互操作,后者如果不希望再OpenDDS的部署中使用集中式發現時,RTPS對等發現的方式就比較重要了。
參考資料
[1]. 《分布式系統實時發布訂閱數據分發技術》
[2]. https://opendds.org/
[3]. https://download.objectcomputing.com/OpenDDS/OpenDDS-latest.pdf
[4]. OpenDDS Java開發(2)——基于Windows10的OpenDDS測試
總結
以上是生活随笔為你收集整理的OpenDDS应用开发步骤的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 三星q90 回音壁最新固件1010.5升
- 下一篇: ASP+AJAX制作无刷新新闻评论系统0