From 1823c273a9a31b3773c3a292c6cd630af2d69f47 Mon Sep 17 00:00:00 2001 From: Maxime Graulich <maxime.graulich@gmail.com> Date: Tue, 7 Oct 2014 15:25:41 +0000 Subject: [PATCH] edit recv for onelab2 server --- contrib/onelab2/OnelabLocalNetworkClient.cpp | 15 ++++ contrib/onelab2/OnelabLocalNetworkClient.h | 2 + contrib/onelab2/OnelabProtocol.cpp | 80 ++++++++++++++++++++ contrib/onelab2/OnelabProtocol.h | 2 + contrib/onelab2/OnelabServer.cpp | 54 ++++++------- 5 files changed, 126 insertions(+), 27 deletions(-) diff --git a/contrib/onelab2/OnelabLocalNetworkClient.cpp b/contrib/onelab2/OnelabLocalNetworkClient.cpp index 2e7d4a6a37..94060708b6 100644 --- a/contrib/onelab2/OnelabLocalNetworkClient.cpp +++ b/contrib/onelab2/OnelabLocalNetworkClient.cpp @@ -38,6 +38,21 @@ int OnelabLocalNetworkClient::recvfrom(UInt8 *buff, unsigned int maxlen) // return ip4_socket_recv(_fds, buff, maxlen); //#endif } +int OnelabLocalNetworkClient::recvmsg(OnelabProtocol &msg) +{ + UInt8 header[8]; + UInt8 *buff = NULL; + int recvlen = 0; + // recv the header + recvlen = recvfrom(header, 4); + if(recvlen != 4) return recvlen; + int msglen = msg.parseHeader(header, recvlen); + // then recv the message + buff = (UInt8 *) malloc(sizeof(UInt8)*msglen); + recvlen = recvfrom(buff, msglen); // recvlen should be equals to msglen + msg.parseMessage(buff, recvlen); + return recvlen + 4; +} void OnelabLocalNetworkClient::updateParameter(onelab::parameter *p) { OnelabProtocol msg(OnelabProtocol::OnelabUpdate); diff --git a/contrib/onelab2/OnelabLocalNetworkClient.h b/contrib/onelab2/OnelabLocalNetworkClient.h index cc2fe354ac..ca21c66960 100644 --- a/contrib/onelab2/OnelabLocalNetworkClient.h +++ b/contrib/onelab2/OnelabLocalNetworkClient.h @@ -5,6 +5,7 @@ #include "onelab.h" //#ifdef HAVE_UDT #include "UdtUtils.h" +#include "OnelabProtocol.h" //#else //#include "NetworkUtils.h" //#endif @@ -25,6 +26,7 @@ public: OnelabLocalNetworkClient(std::string name, Socket fd, unsigned int ip, unsigned short port); void sendto(UInt8 *buff, unsigned int len); int recvfrom(UInt8 *buff, unsigned int maxlen); + int recvmsg(OnelabProtocol &msg); UDTSOCKET getSSocket() {return _fds;} UDTSOCKET getUSocket() {return _fdu;} virtual ~OnelabLocalNetworkClient(){} diff --git a/contrib/onelab2/OnelabProtocol.cpp b/contrib/onelab2/OnelabProtocol.cpp index 490ca67ea6..416d1d1af9 100644 --- a/contrib/onelab2/OnelabProtocol.cpp +++ b/contrib/onelab2/OnelabProtocol.cpp @@ -39,6 +39,86 @@ unsigned short OnelabProtocol::encodeMsg(UInt8 *buff, UInt32 len) encode(sizeptr, _size); return (unsigned short)(ptr-buff); } +int OnelabProtocol::parseHeader(UInt8 *buff, UInt32 len) +{ + this->clearAttrs(); + if(len < 4) throw ERROR_BUFFER_TOO_SMALL; + + UInt8 *ptr = buff; + UInt8 version = 0; + ptr = parse(ptr, version); + if(version != ONELAB_VERSION) throw ERROR_ONELAB_VERSION; + ptr = parse(ptr, _type); + ptr = parse(ptr, _size); + + return _size; +} +UInt32 OnelabProtocol::parseMessage(UInt8 *buff, UInt32 len) +{ + UInt8 *ptr = buff; + UInt8 *payload = ptr; + unsigned short parsed = 4; + unsigned short size = _size; + while(size >= 4) { + UInt16 attrType = 0; + UInt16 attrSize = 0; + ptr = parse(ptr, attrType); + ptr = parse(ptr, attrSize); + size -= 4; + std::cout << "Try to parse an attribute of type 0x" << std::hex << (UInt16)attrType << std::dec << " and size : " << attrSize << std::endl; + if(attrSize > size) throw ERROR_BUFFER_TOO_SMALL; + switch(attrType) { + case OnelabAttr::Message: + this->attrs.push_back(new OnelabAttrMessage()); + ((OnelabAttrMessage *)this->attrs.back())->parseAttribute(ptr, attrSize); + break; + case OnelabAttr::Number: + this->attrs.push_back(new onelab::number()); + ((onelab::number *)this->attrs.back())->parseAttribute(ptr, attrSize); + break; + case OnelabAttr::String: + this->attrs.push_back(new onelab::string()); + ((onelab::string *)this->attrs.back())->parseAttribute(ptr, attrSize); + break; + case OnelabAttr::Region: + this->attrs.push_back(new onelab::region()); + ((onelab::region *)this->attrs.back())->parseAttribute(ptr, attrSize); + break; + case OnelabAttr::Function: + this->attrs.push_back(new onelab::region()); + ((onelab::function *)this->attrs.back())->parseAttribute(ptr, attrSize); + break; + case OnelabAttr::Start: + this->attrs.push_back(new OnelabAttrStart()); + ((onelab::string *)this->attrs.back())->parseAttribute(ptr, attrSize); + break; + case OnelabAttr::Parameter: + this->attrs.push_back(new OnelabAttrParameterQuery()); + ((OnelabAttrParameterQuery *)this->attrs.back())->parseAttribute(ptr, attrSize); + break; + case 0x0b: + this->attrs.push_back(new OnelabAttrFileQuery()); + ((OnelabAttrFileQuery *)this->attrs.back())->parseAttribute(ptr, attrSize); + break; + case 0x0c: + this->attrs.push_back(new OnelabAttrFile()); + ((OnelabAttrFile *)this->attrs.back())->parseAttribute(ptr, attrSize); + break; + default: + // FIXME unknown attribute + //if(attrSize != 0) throw "Size of attr must be 0!"; + /*this->attrs.push_back(new OnelabAttr(attrType)); + this->attrs.back()->parseAttribute(ptr, &attrSize);*/ + break; + } + ptr += attrSize; + size -= attrSize; + parsed += attrSize+4; + } + if(parsed != len) {std::cout << "parse - size left:" << len-parsed << '-' << size << "(len is "<< len <<" and parsed is "<< parsed <<" )" << std::endl;} + + return len-parsed; +} UInt32 OnelabProtocol::parseMsg(UInt8 *buff, UInt32 len) { this->clearAttrs(); diff --git a/contrib/onelab2/OnelabProtocol.h b/contrib/onelab2/OnelabProtocol.h index c2b8497f7a..2179bfbe5c 100644 --- a/contrib/onelab2/OnelabProtocol.h +++ b/contrib/onelab2/OnelabProtocol.h @@ -21,6 +21,8 @@ public: void clearAttrs(); unsigned short encodeMsg(UInt8 *buff, UInt32 len); UInt32 parseMsg(UInt8 *buff, UInt32 len); + int parseHeader(UInt8 *buff, UInt32 len); + UInt32 parseMessage(UInt8 *buff, UInt32 len); void showMsg(); short msgType() {return _type;} diff --git a/contrib/onelab2/OnelabServer.cpp b/contrib/onelab2/OnelabServer.cpp index e17a658e22..26f04a4829 100644 --- a/contrib/onelab2/OnelabServer.cpp +++ b/contrib/onelab2/OnelabServer.cpp @@ -115,26 +115,24 @@ DWORD WINAPI listenOnClients(LPVOID param) std::set<Socket> fdss; int recvlen = 0; UInt8 prev[1024]; - UInt32 unreaded = 0; UInt8 buff[1024]; OnelabProtocol msg(-1), rep(-1); int eid = OnelabServer::instance()->getEID(); - while(UDT::ERROR != UDT::epoll_wait(eid, &fdus, NULL, -1, &fdss)) { - std::cout << "ok" << std::endl; + while(UDT::ERROR != UDT::epoll_wait(eid, &fdus, NULL, -1, &fdss, NULL)) { //for(std::set<UDTSOCKET>::iterator it = fdus.begin(); it != fdus.end(); ++it) { for(std::set<Socket>::iterator it = fdss.begin(); it != fdss.end(); ++it) { OnelabLocalNetworkClient *cli = OnelabServer::instance()->getClient(*it); - std::cout << "ok ->" << (void*)cli << std::endl; if(cli == NULL) { IPv4 ip; //recvlen = udt_socket_recv(*it, buff, 1024); - recvlen = ip4_socket_recv(*it, buff, 1024, ip); + recvlen = ip4_socket_recv(*it, buff, 1024, ip); // FIXME std::clog << "recv " << recvlen << std::endl; msg.parseMsg(buff, recvlen); msg.showMsg(); if(msg.msgType() == OnelabProtocol::OnelabStart && msg.attrs.size() > 0 && msg.attrs[0]->getAttributeType() == OnelabAttr::Start) { std::string name = std::string(((OnelabAttrStart *)msg.attrs[0])->name()); if(OnelabServer::instance()->getClient(name) != NULL) { + std::cout << "A client exist with this name !" << std::endl; rep.msgType(OnelabProtocol::OnelabMessage); rep.attrs.push_back(new OnelabAttrMessage("A client exist with this name !", OnelabAttrMessage::Fatal)); recvlen = rep.encodeMsg(buff, 1024); @@ -151,7 +149,7 @@ DWORD WINAPI listenOnClients(LPVOID param) recvlen = rep.encodeMsg(buff, 1024); cli = OnelabServer::instance()->getClient(*it); cli->sendto(buff, recvlen); - //FIXME OnelabServer::instance()->sendAllParameter(cli); + OnelabServer::instance()->sendAllParameter(cli); continue; } else { @@ -163,20 +161,24 @@ DWORD WINAPI listenOnClients(LPVOID param) } } else { + std::cout << "ok" << std::endl; try { - //recvlen = cli->recvfrom(buff, 1024); - std::cout << "i've " << unreaded << "bytes from previous packet" << std::endl; - memcpy((char*)buff, (char*)prev, unreaded); - recvlen = cli->recvfrom(buff+unreaded, 1024-unreaded); - recvlen += unreaded; + recvlen = cli->recvmsg(msg); } - catch(int &e) { // for UDT - if(UDT::getlasterror().getErrorCode() == 2001) { // ECONNLOST + catch(int &e) { + if(e == 50) { // Recv error (TCP) std::cout << "\033[0;31m" << "Connection with client \"" << cli->getName() << "\" was broken, removing the client." << "\033[0m" << std::endl; // DEBUG - UDT::epoll_remove_usock(eid, *it); + UDT::epoll_remove_ssock(eid, *it); OnelabServer::instance()->removeClient(cli); UDT::close(*it); } + // TODO for UDT + /*if(UDT::getlasterror().getErrorCode() == 2001 || e == 50) { // ECONNLOST + std::cout << "\033[0;31m" << "Connection with client \"" << cli->getName() << "\" was broken, removing the client." << "\033[0m" << std::endl; // DEBUG + UDT::epoll_remove_usock(eid, *it); + OnelabServer::instance()->removeClient(cli); + UDT::close(*it); + }*/ continue; } if(recvlen == 0) { // for TCP @@ -186,11 +188,8 @@ DWORD WINAPI listenOnClients(LPVOID param) UDT::close(*it); continue; } - //std::clog << "recv " << recvlen << " bytes on client " << cli->getName() << std::endl; - unreaded = msg.parseMsg(buff, recvlen); - if(unreaded > 0) { - memcpy((char*)prev, (char*)buff+(recvlen-unreaded), unreaded); - } + std::clog << "recv " << recvlen << " bytes on client " << cli->getName() << std::endl; + //unreaded = msg.parseMsg(buff, recvlen); switch (msg.msgType()) { case OnelabProtocol::OnelabStop: std::cout << "\033[0;31m" << "Client \"" << cli->getName() << "\" is going to stop" << "\033[0m" << std::endl; // DEBUG @@ -348,10 +347,10 @@ void OnelabServer::Run() udt_socket_listen(_fdu); ip4_socket_listen(_fds); - std::clog << "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(tcp)" << std::endl - << "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(udp/udt)" << std::endl; - //while(newcli = udt_socket_accept(_fdu, ip)) { // TODO accept ip4 - while(newcli = ip4_socket_accept(_fds, ip)) { // TODO accept ip4 + std::clog << "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(tcp)" << std::endl; + // << "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(udp/udt)" << std::endl; + //while(newcli = udt_socket_accept(_fdu, ip)) { // TODO accept udt and tcp ? + while(newcli = ip4_socket_accept(_fds, ip)) { std::clog << "\033[0;31m" << "accpet peer : " << ip4_inet_ntop(ip.address)<< ':' << ip.port << "\033[0m" << std::endl; //UDT::epoll_add_usock(_eid, newcli); UDT::epoll_add_ssock(_eid, newcli); @@ -359,14 +358,15 @@ void OnelabServer::Run() #ifndef WIN32 pthread_create(&listen_thread, NULL, listenOnClients, NULL); #else - listen_thread = CreateThread(NULL, 0, listenOnClients, NULL, 0, NULL); + listen_thread = CreateThread(NULL, 0, listenOnClients, NULL, 0, NULL); #endif - } + if(_clients.size() == 1) #ifndef WIN32 - pthread_join(listen_thread, NULL); + pthread_join(listen_thread, NULL); #else - WaitForSingleObject(listen_thread, INFINITE); + WaitForSingleObject(listen_thread, INFINITE); #endif + } udt_socket_close(_fdu); #else // TODO -- GitLab