libwebsockets的学习
生活随笔
收集整理的這篇文章主要介紹了
libwebsockets的学习
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1 下載源碼
apt-get source libwebsockets-dev
sudo apt-get install libwebsockets-dev
2 編譯(查看 README)
cd libwebsockets
mkdir build
cd build
cmake ../ -D<選項>   (可用選項參考 README.build.md),例如:
cmake .. -DCMAKE_BUILD_TYPE=DEBUG
3 使用
3.1 Client
/*============================================================================Name : test-websocket.cAuthor : cjVersion :Copyright : Your copyright noticeDescription : Hello World in C, Ansi-style============================================================================*/#include <stdio.h> #include <stdlib.h> #include <libwebsockets.h> #include "clog.h" #include <pthread.h>struct session_data {char name[100]; };static int ws_service_callback(struct lws *wsi,enum lws_callback_reasons reason, void *user, void *in, size_t size) {int n;struct session_data *session = (struct session_data*) user;switch (reason) {case LWS_CALLBACK_GET_THREAD_ID:break;case LWS_CALLBACK_CLIENT_ESTABLISHED: {log_d("CLIENT_ESTABLISHED");char *str = "hello start..";int len = strlen(str);unsigned char *out = (unsigned char *) malloc(sizeof(unsigned char)* (LWS_SEND_BUFFER_PRE_PADDING + len+ LWS_SEND_BUFFER_POST_PADDING));memcpy(out + LWS_SEND_BUFFER_PRE_PADDING, str, len);n = lws_write(wsi, out + LWS_SEND_BUFFER_PRE_PADDING, len,LWS_WRITE_TEXT);//sprintf(session->name, "start....");//log_d("n=%d", n);}break;case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:log_d("CONNECTION_ERROR");return -1;break;case LWS_CALLBACK_CLOSED:log_d("LWS_CALLBACK_CLOSED");break;case LWS_CALLBACK_WSI_DESTROY:log_d("LWS_CALLBACK_WSI_DESTROY");break;case LWS_CALLBACK_CLIENT_RECEIVE:log_d("session:%s, recv(len=%d)>%s", session->name, size, (char * ) in);{char *str = "hello world. 
recv";int len = strlen(str);unsigned char *out = (unsigned char *) malloc(sizeof(unsigned char) * (LWS_PRE + len));memcpy(out + LWS_PRE, str, len);n = lws_write(wsi, out + LWS_PRE, len, LWS_WRITE_BINARY);//log_d("n=%d", n);n = lws_callback_on_writable(wsi);//log_d("n=%d", n);}break;case LWS_CALLBACK_CLIENT_WRITEABLE: {//log_d("LWS_CALLBACK_CLIENT_WRITEABLE");char *str = "write.....";int len = strlen(str);unsigned char *out = (unsigned char *) malloc(sizeof(unsigned char) * (LWS_PRE + len));memcpy(out + LWS_PRE, str, len);n = lws_write(wsi, out + LWS_PRE, len, LWS_WRITE_BINARY);n = lws_callback_on_writable(wsi);}break;default:break;}return 0; } #ifdef WCLIENT int main(int argc,char **argv) { #else int main_client() { #endifconst char *address = "127.0.0.1";struct lws_context *context = NULL;struct lws_context_creation_info info;struct lws *wsi = NULL;struct lws_protocols protocol; // struct sigaction act; // act.sa_handler = INT_HANDLER; // act.sa_flags = 0; // sigemptyset(&act.sa_mask); // sigaction( SIGINT, &act, 0);memset(&info, 0, sizeof info);info.port = CONTEXT_PORT_NO_LISTEN;info.iface = NULL;info.protocols = &protocol;info.ssl_cert_filepath = NULL;info.ssl_private_key_filepath = NULL;info.extensions = NULL;info.gid = -1;info.uid = -1;info.options = 0;protocol.name = "abc";protocol.callback = &ws_service_callback;protocol.per_session_data_size = sizeof(struct session_data);protocol.rx_buffer_size = 65535;protocol.id = 0;protocol.user = NULL;context = lws_create_context(&info);log_d("[Main] context created.");if (context == NULL) {log_d("[Main] context is NULL.");return -1;}#if 0wsi = lws_client_connect(context, address, 1883, 0, "/123","111:5000",NULL, protocol.name, -1); #elsestruct lws_client_connect_info i;memset(&i, 0, sizeof(i));i.port = 8080;i.address = address;i.path = "abc";i.context = context;i.ssl_connection = 0;i.host = i.address;i.origin = i.address;i.ietf_version_or_minus_one = -1;i.client_exts = NULL;i.protocol = "abc";wsi = 
lws_client_connect_via_info(&i); #endifif (wsi == NULL) {log_d("[Main] wsi create error.");return -1;}log_d("start....");int n = 0;while (n >= 0) {//當連接斷開時需要重新連接在這使用標志位//if conflag==0 lws_client_connect_via_infon = lws_service(context, 50);}lws_context_destroy(context);log_d("close....");return EXIT_SUCCESS; }| LWS_CALLBACK_CLIENT_ESTABLISHED | 第一次連接 這是客戶端 |
| LWS_CALLBACK_CLIENT_RECEIVE | 接收到數據 這是客戶端 |
| LWS_CALLBACK_CLIENT_WRITEABLE | 可以發送數據了 |
| LWS_CALLBACK_WSI_DESTROY | 正在銷毀,在這可以釋放ESTABLISHED時malloc的數據 |
3.2 Server
/*============================================================================Name : test-websocket.cAuthor : cjVersion :Copyright : Your copyright noticeDescription : Hello World in C, Ansi-style============================================================================*/#include <stdio.h> #include <stdlib.h> #include <libwebsockets.h> #include "clog.h" #include <pthread.h>struct session_data {char name[100]; };static int ws_service_callback(struct lws *wsi,enum lws_callback_reasons reason, void *user, void *in, size_t size) {int n;struct session_data *session = (struct session_data*) user;switch (reason) {case LWS_CALLBACK_GET_THREAD_ID:break;case LWS_CALLBACK_ESTABLISHED: {log_d("ESTABLISHED");char *str = "hello start..";int len = strlen(str);unsigned char *out = (unsigned char *) malloc(sizeof(unsigned char)* (LWS_SEND_BUFFER_PRE_PADDING + len+ LWS_SEND_BUFFER_POST_PADDING));memcpy(out + LWS_SEND_BUFFER_PRE_PADDING, str, len);n = lws_write(wsi, out + LWS_SEND_BUFFER_PRE_PADDING, len,LWS_WRITE_TEXT);sprintf(session->name, "start....");}break;case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:log_d("CONNECTION_ERROR");return -1;break;case LWS_CALLBACK_CLOSED:log_d("LWS_CALLBACK_CLOSED");break;case LWS_CALLBACK_WSI_DESTROY:log_d("LWS_CALLBACK_WSI_DESTROY");break;case LWS_CALLBACK_RECEIVE://log_d("session:%s, recv(len=%d)>%s", session->name, size, (char * ) in);{char *str = "hello world. 
recv";int len = strlen(str);unsigned char *out = (unsigned char *) malloc(sizeof(unsigned char) * (LWS_PRE + len));memcpy(out + LWS_PRE, str, len);n = lws_write(wsi, out + LWS_PRE, len, LWS_WRITE_BINARY);n = lws_callback_on_writable(wsi);}break;case LWS_CALLBACK_SERVER_WRITEABLE: {//log_d("LWS_CALLBACK_CLIENT_WRITEABLE");char *str = "write....ser.";int len = strlen(str);unsigned char *out = (unsigned char *) malloc(sizeof(unsigned char) * (LWS_PRE + len));memcpy(out + LWS_PRE, str, len);n = lws_write(wsi, out + LWS_PRE, len, LWS_WRITE_BINARY);n = lws_callback_on_writable(wsi);}break;default:break;}return 0; }int main(void) {struct lws_context *context = NULL;struct lws_context_creation_info info;struct lws_protocols protocol; // struct sigaction act; // act.sa_handler = INT_HANDLER; // act.sa_flags = 0; // sigemptyset(&act.sa_mask); // sigaction( SIGINT, &act, 0);memset(&info, 0, sizeof info);info.port = 8080;info.iface = NULL;info.protocols = &protocol;info.ssl_cert_filepath = NULL;info.ssl_private_key_filepath = NULL;info.extensions = NULL;info.gid = -1;info.uid = -1;info.options = 0;protocol.name = "abc";protocol.callback = &ws_service_callback;protocol.per_session_data_size = sizeof(struct session_data);protocol.rx_buffer_size = 65535;protocol.id = 0;protocol.user = NULL;context = lws_create_context(&info);log_d("[Main] context created.");if (context == NULL) {log_d("[Main] context is NULL.");return -1;}log_d("start....");int n = 0;while (n >= 0) {n = lws_service(context, 50);}lws_context_destroy(context);return EXIT_SUCCESS; }| LWS_CALLBACK_ESTABLISHED | 收到客戶端連接,在這里可以注冊user (session_data) 到一個線程安全Lock的鏈表上 user中可以添加一個發送數據隊列 |
| LWS_CALLBACK_RECEIVE | 收到客戶端數據,后續需要寫數據可以添加使用 lws_callback_on_writable 來處理寫事件 |
| LWS_CALLBACK_SERVER_WRITEABLE | 可以寫數據到客戶端,添加 lws_callback_on_writable, 這時可以把session中存儲的待發送數據發送出去 |
| LWS_CALLBACK_WSI_DESTROY | 開始釋放數據,這里請把 session 從線程安全的客戶端隊列里面取消注冊 |
| LWS_CALLBACK_CLOSED | 有時候不會到這來,最好在DESTROY中處理 |
對于libwebsockets高級應用方面,我參考了 WebRTC+libwebsockets+Janus的秒開實踐_一朵喇叭花壓海棠的博客-CSDN博客_lws_callback_on_writable
該文里面說的 libwebsockets IO 的優化。
在這里可以參考一下mqtt服務器mosquitto
C++ 處理封裝邏輯
/*獲取wsi的fd遠程IP, 這樣寫差不多是為libwebsockets增加功能 */ static int lws_get_connectip(struct lws* wsi,char localip[20], int* localport,char remip[20], int* remport) {lws_sockfd_type fd = lws_get_socket_fd(wsi);if (fd < 0){perror("lws_get_socket_fd failed ! \n");return -1;}struct sockaddr_in sockAddr;socklen_t addrLen = sizeof(sockAddr);getsockname(fd, (struct sockaddr*)&sockAddr, &addrLen);inet_ntop(AF_INET, &sockAddr.sin_addr, localip, 20);*localport = ntohs(sockAddr.sin_port);addrLen = sizeof(sockAddr);getpeername(fd, (struct sockaddr*)&sockAddr, &addrLen); //得到遠程IP地址和端口號inet_ntop(AF_INET, &sockAddr.sin_addr, remip, 20); //IPV4*remport = ntohs(sockAddr.sin_port); //對方的端口號return 0; }class BuffCache {char* data;int size;int pos; public:BuffCache(){data = NULL;size = 0;pos = 0;}~BuffCache(){if (data)free(data);}void addData(void* _data, int _size){if (pos + _size > size){size = pos + _size + 10;if (data){data = (char*)realloc(data, size);}else{data = (char*)calloc(1, size);}}memcpy(data + pos, _data, _size);pos += size;}char* getData(){return data;}int getSize(){return pos;}void reset(){pos = 0;}};class WSMsg {char* data;char* _ptr;size_t len; public:typedef std::shared_ptr<WSMsg> Ptr;WSMsg(){data = NULL;len = 0;}~WSMsg(){if (data)free(data);}void setData(const void* _data, int size){data = (char*)calloc(1, size + LWS_PRE + 20);if (data){memcpy(data + LWS_PRE, _data, size);_ptr = data + LWS_PRE;len = size;}}void* getData(){return data + LWS_PRE;}size_t getLen(){return len;}static WSMsg::Ptr build(const void* data, int len){WSMsg::Ptr msg = std::make_shared<WSMsg>();msg->setData(data, len);return msg;}static WSMsg::Ptr buildFromJSON(cJSON* json){char* buff = cJSON_Print(json);WSMsg::Ptr msg = WSMsg::build(buff, strlen(buff));free(buff);return msg;} };static int _websocket_callback_app(struct lws* wsi,enum lws_callback_reasons reason, void* user, void* in, size_t len);static struct lws_protocols protocols[] = {//{ "http", lws_callback_http_dummy, 0, 0 },{"LxAppProtocol", 
_websocket_callback_app, /100, /1024 * 1024}, //500k/{ NULL, NULL, 0, 0 } /* terminator */ };struct lws* wsi_connect_client(struct lws_context* context, const char* addr) {struct lws_client_connect_info i;memset(&i, 0, sizeof(i));i.path = "/test";i.address = addr;i.port = 12345; //默認端口i.context = context;i.ssl_connection = 0;i.host = i.address;i.origin = i.address;i.ietf_version_or_minus_one = -1;i.client_exts = NULL;i.protocol = "xxxx";return lws_client_connect_via_info(&i); }//ws 會話 class WSSession {struct lws* _wsi;int fd;char local_ip[20]; //本地int local_port;char rem_ip[20]; //遠程IPint peer_port; //遠程端口std::string uri;int conType;std::mutex _mtx;std::list<WSMsg::Ptr> _txMsgList; //發送消息列表bool _need_close;BuffCache bufCache; public:typedef std::shared_ptr<WSSession> Ptr;typedef enum{//連接類型 //鏈接模式MyServer = 0, //我是服務器,別人連接我MyClient, //連接到外部,} ConnectType;WSSession(){data_type = WS_DATA_NONE;fd = 0;_need_close = false;memset(&lxProtoCtx, 0, sizeof(lxProtoCtx));}void set_lws(struct lws* wsi){_wsi = wsi;fd = (int)lws_get_socket_fd(_wsi);//lws_get_protocol(wsi);lws_get_connectip(wsi, local_ip, &local_port, rem_ip, &peer_port);char buff[100];memset(buff, 0, sizeof(buff));lws_hdr_copy(wsi, buff, sizeof(buff), WSI_TOKEN_GET_URI);uri = buff;_need_close = false;}void setDeviceID(const char* id){strcpy(lxProtoCtx.device_id, id);}struct lws* get_lws(){return _wsi;}void setConnectType(WSSession::ConnectType t){conType = (int)t;}void debug(){std::cout << "fd=" << fd << std::endl;std::cout << "local_ip:" << local_ip << ":" << local_port << std::endl;std::cout << "rem_ip:" << rem_ip << ":" << peer_port << std::endl;std::cout << "uri:" << uri << std::endl;}//初始化處理void initSession();//接收消息void onRecv(void* in, int size, int is_final, int is_binary){bufCache.addData(in, size);if (is_final){char* buff = bufCache.getData();if (WS_DATA_TYPE_LXPROTO_JSON == data_type)onHandleLxProtoMsg(buff, bufCache.getSize());bufCache.reset();}}//處理lx協議消息int onHandleLxProtoMsg(const char* msg, int 
msglen);///void asyncClose(){_need_close = true;}bool isNeedClose(){return _need_close;}size_t sendMessage(WSMsg::Ptr& msg){std::lock_guard<std::mutex> lock(_mtx);_txMsgList.push_back(msg);return _txMsgList.size();}size_t sendMessage(const void* data, int len){std::lock_guard<std::mutex> lock(_mtx);WSMsg::Ptr msg = WSMsg::build(data, len);_txMsgList.push_back(msg);return _txMsgList.size();}size_t sendMessage(cJSON* json){std::lock_guard<std::mutex> lock(_mtx);WSMsg::Ptr msg = WSMsg::buildFromJSON(json);_txMsgList.push_back(msg);return _txMsgList.size();}size_t msgCount(){std::lock_guard<std::mutex> lock(_mtx);return _txMsgList.size();}WSMsg::Ptr popFirstMsg(){std::lock_guard<std::mutex> lock(_mtx);if (_txMsgList.size() == 0){return nullptr;}WSMsg::Ptr msg = _txMsgList.front();_txMsgList.pop_front();return msg;}bool checkMsg(){//判斷的心跳消息if (msgCount() > 0 || isNeedClose()){//lws_callback_all_protocollws_callback_on_writable(get_lws());return true;}return false;} };//Websocket Server class WSServer {std::atomic_bool _runflag;std::thread thread;int _listen_port;std::mutex _mtx;std::list<WSSession::Ptr> sessions;std::string _host_server;std::string _devid; //設備IDstruct lws* _host_wsi;struct lws_context* _context; public:WSServer(){_runflag = false;_listen_port = -1;_host_wsi = NULL;_context = NULL;}~WSServer(){if (_runflag){stop();std::cout << __FUNCTION__ << std::endl;}}void setListenPort(int port){_listen_port = port;}//是否運行中bool isRunning(){return _runflag;}//停止void stop(){if (_runflag){_runflag = false;thread.join();}}//啟動void start(){if (_runflag)throw std::runtime_error("is running...");_runflag = true;WSServer* serverPtr = this;thread = std::thread([serverPtr]{serverPtr->run();});}//添加需要連接的服務器void addConnect(const char* host, const char* devid){_host_server = host;_devid = devid;}std::string& getDevID(){return _devid;}//連接失敗void wsi_connect_error(struct lws* wsi){_host_wsi = NULL;}//連接斷開void wsi_distory(struct lws* wsi){if (wsi == _host_wsi){std::cout << "close 
_host_wsi" << std::endl;_host_wsi = NULL;}std::lock_guard<std::mutex> lock(_mtx);for (auto itr = sessions.begin(); itr != sessions.end();){WSSession::Ptr s = (*itr);if (s->get_lws() == wsi)itr = sessions.erase(itr);else++itr;}}//添加一個會話void addSession(WSSession::Ptr& session){std::lock_guard<std::mutex> lock(_mtx);sessions.push_back(session);LOGI("WSServer", "add session\n");}//廣播一個消息void broadcastMessage(const void* data, int len){std::lock_guard<std::mutex> lock(_mtx);for (auto itr = sessions.begin(); itr != sessions.end(); ++itr){(*itr)->sendMessage(data, len);}if (sessions.size() > 0 && _context){lws_cancel_service(_context);}}//檢測發送消息void checkMsg(){std::lock_guard<std::mutex> lock(_mtx);for (auto itr = sessions.begin(); itr != sessions.end(); ++itr){(*itr)->checkMsg();}}//如果this被釋放會有問題哦..void run(){struct lws_context_creation_info info;struct lws_context* context = NULL;//struct lws_client_connect_info i;int ret = 0;memset(&info, 0, sizeof(info));info.port = CONTEXT_PORT_NO_LISTEN;info.port = _listen_port;info.iface = NULL;info.protocols = protocols;info.ssl_cert_filepath = NULL;info.ssl_private_key_filepath = NULL;info.extensions = NULL;//lws_get_internal_extensions();info.gid = -1;info.uid = -1;info.options = 0;//info.count_threads = 1;info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;info.mounts = NULL;info.user = (void*)this;//是否啟用服務器功能呢?context = lws_create_context(&info);if (!context){perror("not create context");//lws_cancel_service(_lws_context_server);return;}_context = context;printf("user:%s\n", (char*)lws_context_user(context));while (!ret && isRunning()){//什么時候開始連接呢?if (_host_wsi == NULL && _host_server.length() > 0){_host_wsi = wsi_connect_client(context, _host_server.c_str());LOGI("WSServer", "connect >>>> %p\n", _host_wsi);}//checkMsg();ret = lws_service(context, 1000);}lws_context_destroy(context);_context = NULL;LOGI("WSServer", "run exit\n");} };static int _websocket_callback_app(struct lws* wsi,enum lws_callback_reasons reason, void* user, 
void* in, size_t len) {struct UserCtx{WSSession* session;};struct lws_context* ctx = lws_get_context(wsi);WSServer* wsServer = (WSServer*)lws_context_user(ctx);struct UserCtx* userCtx = (struct UserCtx*)user;switch ((int)reason){case LWS_CALLBACK_PROTOCOL_INIT:break;case LWS_CALLBACK_WSI_CREATE: //user=NULL//log_d("websocket", "LWS_CALLBACK_WSI_CREATE wsi_session=%p\n", wsi_session);break;case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:printf("服務器鏈接失敗\r\n");if (wsServer)wsServer->wsi_connect_error(wsi);break;case LWS_CALLBACK_CLIENT_ESTABLISHED: //鏈接ws服務器成功,client modecase LWS_CALLBACK_ESTABLISHED: //webserver server模式時{printf("連接成功 \r\n");memset(userCtx, 0, sizeof(struct UserCtx));WSSession::Ptr session = std::make_shared<WSSession>();session->set_lws(wsi);if (reason == LWS_CALLBACK_CLIENT_ESTABLISHED)session->setConnectType(WSSession::ConnectType::MyClient);if (reason == LWS_CALLBACK_ESTABLISHED)session->setConnectType(WSSession::ConnectType::MyServer);session->debug();if (wsServer)wsServer->addSession(session);userCtx->session = session.get();std::string devID = wsServer->getDevID();session->setDeviceID(devID.c_str());session->initSession();//不能在這里發消息...........//lws_callback_on_writable(wsi);break;}case LWS_CALLBACK_CLOSED:printf("LWS_CALLBACK_CLOSED:鏈接斷開:%p\n", wsi);break;case LWS_CALLBACK_WSI_DESTROY:{printf("LWS_CALLBACK_WSI_DESTROY:%p\n", wsi);if (wsServer)wsServer->wsi_distory(wsi);return -1;}break;/接收-發送/case LWS_CALLBACK_CLIENT_RECEIVE: //收到數據case LWS_CALLBACK_RECEIVE: //服務器模式接收{printf(">>>接收數據,%s isfinal:%d, 長度:%d\n",lws_frame_is_binary(wsi) ? 
"bin" : "txt",lws_is_final_fragment(wsi),len);userCtx->session->onRecv(in,len,lws_is_final_fragment(wsi),lws_frame_is_binary(wsi));}break;case LWS_CALLBACK_SERVER_WRITEABLE: //服務端寫入case LWS_CALLBACK_CLIENT_WRITEABLE: //客戶端寫入{//作為服務器來發送消息時,text會被加密,固不能使用buff緩存//如果發送隊列阻塞則不能發送消息.....WSMsg::Ptr msg = userCtx->session->popFirstMsg();if (msg){char* ptr = (char*)(msg->getData());printf("發送消息:[%s], len=%d\n", ptr, msg->getLen());int ret = lws_write(wsi, (unsigned char*)ptr, msg->getLen(), LWS_WRITE_BINARY);//LWS_WRITE_BINARY);printf("發送:ret=%d\n", ret);}if (userCtx->session->isNeedClose())return -1; #if 0{uint8_t ping[LWS_PRE + 125];int m, n = 0;n = lws_snprintf((char*)ping + LWS_PRE, 125,"ping body!");lwsl_user("Sending PING %d...\n", n);m = lws_write(wsi, ping + LWS_PRE, n, LWS_WRITE_PING);if (m < n) {lwsl_err("sending ping failed: %d\n", m);return -1;}} #endif}break;}return 0; }void WebSocketServerTest(){WSServer server1;WSServer server2;server1.setListenPort(11258);char id[50];strcpy(id, "112233445677");server1.start();server2.addConnect("192.168.0.189", id);server2.start();const char* msg = "hello xxxxxx,ccccc";for (int i = 0; i < 100; i++){Sleep(1*1000);server1.broadcastMessage(msg, strlen(msg));//server2.broadcastMessage(msg, strlen(msg));}server1.stop();server2.stop();LOGI("test", "stop\n");}總結
以上是生活随笔為你收集整理的libwebsockets的学习的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【水浒传】——宋江
- 下一篇: jdbc.properties 系统找不