diff --git a/CMakeLists.txt b/CMakeLists.txt index ff4521c23cbe464a9306a9858cb313a46cf4dccd..45ad0307577e9d0cea11747533130e49732d9740 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -521,17 +521,17 @@ endif(ENABLE_NATIVE_FILE_CHOOSER) if(ENABLE_ONELAB2) if(ENABLE_UDT) - set_config_option(HAVE_UDT "UDT") find_package(Threads REQUIRED) - find_path(UDT_INCLUDE_DIR NAMES udt.h PATH_SUFFIXES udt/) - find_library(UDT_LIBRARY NAMES udt) - if(UDT_INCLUDE_DIR AND UDT_LIBRARY AND CMAKE_THREAD_LIBS_INIT) + find_path(UDT_INCLUDE_DIR "udt.h" PATH_SUFFIXES include udt) + find_library(UDT_LIBRARY udt) + if(UDT_INCLUDE_DIR AND UDT_LIBRARY AND Threads_FOUND) + set_config_option(HAVE_UDT "UDT") list(APPEND EXTERNAL_LIBRARIES ${CMAKE_THREAD_LIBS_INIT}) list(APPEND EXTERNAL_LIBRARIES ${UDT_LIBRARY}) list(APPEND EXTERNAL_INCLUDES ${UDT_INCLUDE_DIR}) - else(UDT_INCLUDE_DIR AND UDT_LIBRARY AND CMAKE_THREAD_LIBS_INIT) + else(UDT_INCLUDE_DIR AND UDT_LIBRARY AND Threads_FOUND) message(FATAL_ERROR "Unable to find UDT library") - endif(UDT_INCLUDE_DIR AND UDT_LIBRARY AND CMAKE_THREAD_LIBS_INIT) + endif(UDT_INCLUDE_DIR AND UDT_LIBRARY AND Threads_FOUND) endif(ENABLE_UDT) set_config_option(HAVE_ONELAB2 "ONELAB2") set(GMSH_SRC ${GMSH_SRC};Fltk/onelab2Group.cpp;Fltk/onelab2Group.h) diff --git a/contrib/onelab2/CMakeLists.txt b/contrib/onelab2/CMakeLists.txt index 79f258c812db1cba64a064ca9848380932e8ba01..ef37305050f069b82fe6372e5a3db32a420ffadb 100644 --- a/contrib/onelab2/CMakeLists.txt +++ b/contrib/onelab2/CMakeLists.txt @@ -12,15 +12,15 @@ append_gmsh_src(contrib/onelab2 "${SRC};${HDR}") # build only the server if(HAVE_UDT) find_package(Threads REQUIRED) - find_path(UDT_INCLUDE_DIR NAMES udt.h PATH_SUFFIXES udt/) - find_library(UDT_LIBRARY NAMES udt) - if(UDT_INCLUDE_DIR AND UDT_LIBRARY AND CMAKE_THREAD_LIBS_INIT) + find_path(UDT_INCLUDE_DIR udt.h PATH_SUFFIXES include udt) + find_library(UDT_LIBRARY udt) + if(UDT_INCLUDE_DIR AND UDT_LIBRARY AND Threads_FOUND) list(APPEND EXTERNAL_INCLUDES ${UDT_INCLUDE_DIR}) list(APPEND EXTERNAL_LIBRARIES ${CMAKE_THREAD_LIBS_INIT}) list(APPEND EXTERNAL_LIBRARIES ${UDT_LIBRARY}) - else(UDT_INCLUDE_DIR AND UDT_LIBRARY AND CMAKE_THREAD_LIBS_INIT) + else(UDT_INCLUDE_DIR AND UDT_LIBRARY AND Threads_FOUND) message(FATAL_ERROR "Unable to find UDT library") - endif(UDT_INCLUDE_DIR AND UDT_LIBRARY AND CMAKE_THREAD_LIBS_INIT) + endif(UDT_INCLUDE_DIR AND UDT_LIBRARY AND Threads_FOUND) endif(HAVE_UDT) include_directories(${CMAKE_SOURCE_DIR}/Common/) # for onelab.h diff --git a/contrib/onelab2/OnelabDatabase.h b/contrib/onelab2/OnelabDatabase.h index c190d7216654bc97edc6e8f17f6154c529763417..906717f8cd6438e6292c6b09666e82789a5cc322 100644 --- a/contrib/onelab2/OnelabDatabase.h +++ b/contrib/onelab2/OnelabDatabase.h @@ -58,6 +58,7 @@ public: //OnelabServer::instance(0, 0); _localClient = new GmshLocalClient("localGUI", OnelabServer::instance()->getParameterSpace()); OnelabServer::instance()->addClient(_localClient); // TODO remove from server in _clear() + #ifndef WIN32 pthread_create(&_serverThread, NULL, OnelabDatabase_server, NULL); #else @@ -114,12 +115,12 @@ public: if(_client) return _client->get(ps, name); //if(_localClient) _localClient->get(ps, name); return OnelabServer::instance()->get(ps, name, client); - } + } bool fromFile(FILE *fp, const std::string &client="") { if(_client) return _client->fromFile(fp, client); return OnelabServer::instance()->fromFile(fp, client); - + } void onelab_cb(std::string action) { if(_client) return; // TODO send action to the server @@ -138,11 +139,11 @@ public: this->set(o); (*it).run(); } - + } while(action == "compute" && //TODO incrementLoops() && !false/*TODO onelab->stop*/); - + } }; @@ -158,7 +159,7 @@ DWORD WINAPI OnelabDatabase_listen(LPVOID arg) while(1) { recvlen = OnelabDatabase::instance()->listen(buff, 1024); if(recvlen == 1 && buff[0] == 'S') - break; + break; msg.parseMsg(buff, recvlen); msg.showMsg(); switch(msg.msgType()) { diff --git a/contrib/onelab2/OnelabNetworkClient.cpp b/contrib/onelab2/OnelabNetworkClient.cpp index 9e497c617568b5fea0636f452a18abf8ae02aab3..6db37ee4c459b02ea3e3a5066d3b989999154ce4 100644 --- a/contrib/onelab2/OnelabNetworkClient.cpp +++ b/contrib/onelab2/OnelabNetworkClient.cpp @@ -4,139 +4,147 @@ #ifdef HAVE_UDT OnelabNetworkClient::OnelabNetworkClient(std::string name, bool UDT) - : VirtualClient(name) + : VirtualClient(name) { _fds = 0; _fdu = 0; - _ip.address = 0; - _ip.port = 0; - _connected = false; + _ip.address = 0; + _ip.port = 0; + _connected = false; - IPv4 local; - local.address = 0;// FIXME ip4_default_iface(); - local.port = 0; - UDT::startup(); - if(UDT) _fdu = udt_socket(local, SOCK_STREAM); - else _fds = ip4_socket(local, SOCK_STREAM); + IPv4 local; + local.address = 0;// FIXME ip4_default_iface(); + local.port = 0; + UDT::startup(); + if(UDT) _fdu = udt_socket(local, SOCK_STREAM); + else _fds = ip4_socket(local, SOCK_STREAM); } OnelabNetworkClient::OnelabNetworkClient(std::string name, unsigned int ip, unsigned short port, bool UDT) - : VirtualClient(name) + : VirtualClient(name) { _fds = 0; _fdu = 0; - _ip.address = ip; - _ip.port = port; - _connected = false; + _ip.address = ip; + _ip.port = port; + _connected = false; - IPv4 local; - local.address = 0;//ip4_default_iface(); - local.port = 0; - UDT::startup(); - if(UDT) _fdu = udt_socket(local, SOCK_STREAM); - else _fds = ip4_socket(local, SOCK_STREAM); + IPv4 local; + local.address = 0;//ip4_default_iface(); + local.port = 0; + UDT::startup(); + if(UDT) _fdu = udt_socket(local, SOCK_STREAM); + else _fds = ip4_socket(local, SOCK_STREAM); } #else OnelabNetworkClient::OnelabNetworkClient(std::string name) - : VirtualClient(name) + : VirtualClient(name) { - _ip.address = 0; - _ip.port = 0; - _connected = false; + _ip.address = 0; + _ip.port = 0; + _connected = false; - IPv4 local; - local.address = 0;// FIXME ip4_default_iface(); - local.port = 0; - _fds = ip4_socket(local, SOCK_STREAM); + IPv4 local; + local.address = 0;// FIXME ip4_default_iface(); + local.port = 0; + _fds = ip4_socket(local, SOCK_STREAM); } OnelabNetworkClient::OnelabNetworkClient(std::string name, unsigned int ip, unsigned short port) - : VirtualClient(name) + : VirtualClient(name) { - _ip.address = ip; - _ip.port = port; - _connected = false; + _ip.address = ip; + _ip.port = port; + _connected = false; - IPv4 local; - local.address = 0;//ip4_default_iface(); - local.port = 0; - _fds = ip4_socket(local, SOCK_STREAM); + IPv4 local; + local.address = 0;//ip4_default_iface(); + local.port = 0; + _fds = ip4_socket(local, SOCK_STREAM); } #endif void OnelabNetworkClient::sendto(UInt8 *buff, UInt16 len) { #ifndef HAVE_UDT - ip4_socket_send(_fds, buff, len, _ip); + ip4_socket_send(_fds, buff, len, _ip); #else - if(_fds) ip4_socket_send(_fds, buff, len); - else udt_socket_send(_fdu, buff, len); + if(_fds) ip4_socket_send(_fds, buff, len); + else udt_socket_send(_fdu, buff, len); #endif } int OnelabNetworkClient::recvfrom(UInt8 *buff, UInt16 maxlen) { - IPv4 unused; + IPv4 unused; #ifndef HAVE_UDT - return ip4_socket_recv(_fds, buff, maxlen, unused); // FIXME check unused == _ip ? + return ip4_socket_recv(_fds, buff, maxlen, unused); // FIXME check unused == _ip ? #else - if(_fds) return ip4_socket_recv(_fds, buff, maxlen); // FIXME check unused == _ip ? - return udt_socket_recv(_fdu, buff, maxlen); + if(_fds) return ip4_socket_recv(_fds, buff, maxlen); // FIXME check unused == _ip ? + return udt_socket_recv(_fdu, buff, maxlen); #endif } void OnelabNetworkClient::recvfrom(OnelabProtocol &msg) { - UInt16 bufflen = 1024, recvlen = 0; - UInt8 buff[1024]; - recvlen = this->recvfrom(buff, bufflen); - msg.parseMsg(buff, recvlen); + UInt16 bufflen = 1024, recvlen = 0; + UInt8 buff[1024]; + recvlen = this->recvfrom(buff, bufflen); + msg.parseMsg(buff, recvlen); } bool OnelabNetworkClient::connect() { - UInt16 bufflen = 1024; - int recvlen = 0; - UInt8 buff[1024]; - OnelabProtocol msg(OnelabProtocol::OnelabStart); - if(_connected) return true; + UInt16 bufflen = 1024; + int recvlen = 0; + UInt8 buff[1024]; + OnelabProtocol msg(OnelabProtocol::OnelabStart); + if(_connected) return true; #ifdef HAVE_UDT - if(_fds) ip4_socket_connect(_fds, _ip); - else udt_socket_connect(_fdu, _ip); + if(_fds) { + ip4_socket_connect(_fds, _ip); + } + else{ + udt_socket_connect(_fdu, _ip); + } #else - ip4_socket_connect(_fds, _ip); + ip4_socket_connect(_fds, _ip); #endif - msg.attrs.push_back(new OnelabAttrStart(_name)); - recvlen = msg.encodeMsg(buff, bufflen); - sendto(buff, recvlen); + msg.attrs.push_back(new OnelabAttrStart(_name)); + recvlen = msg.encodeMsg(buff, bufflen); + sendto(buff, recvlen); #ifdef HAVE_UDT - udt_socket_timeout(_fdu, 3); + udt_socket_timeout(_fdu, 3); #endif - ip4_socket_timeout(_fds, 3); - recvlen = recvfrom(buff, bufflen); + //ip4_socket_timeout(_fds, 3); + recvlen = recvfrom(buff, bufflen); + #ifdef HAVE_UDT - udt_socket_timeout(_fdu, -1); + udt_socket_timeout(_fdu, -1); #endif - ip4_socket_timeout(_fds, 0); - if(recvlen <= 0) return false; - msg.parseMsg(buff, recvlen); - if(recvlen > 0 && msg.msgType() == OnelabProtocol::OnelabStart) _connected = true; - return _connected; + ip4_socket_timeout(_fds, 0); + if(recvlen <= 0) return false; + msg.parseMsg(buff, recvlen); + if(recvlen > 0 && msg.msgType() == OnelabProtocol::OnelabStart) _connected = true; + return _connected; } + void OnelabNetworkClient::disconnect() { - // Send a message to the server to say the client stop (the server have to reply) - UInt16 bufflen = 1024, recvlen = 0; - UInt8 buff[1024]; - OnelabProtocol msg(OnelabProtocol::OnelabStop); - if(!_connected) return; - recvlen = msg.encodeMsg(buff, bufflen); - this->sendto(buff, recvlen); - _connected = false; + // Send a message to the server to say the client stop (the server have to reply) + UInt16 bufflen = 1024, recvlen = 0; + UInt8 buff[1024]; + OnelabProtocol msg(OnelabProtocol::OnelabStop); + if(!_connected) return; + recvlen = msg.encodeMsg(buff, bufflen); + this->sendto(buff, recvlen); + _connected = false; } + void OnelabNetworkClient::request(OnelabProtocol &msg) { - UInt16 bufflen = 1024, recvlen = 0; - UInt8 buff[1024]; - recvlen = msg.encodeMsg(buff, bufflen); - this->sendto(buff, recvlen); + UInt16 bufflen = 1024, recvlen = 0; + UInt8 buff[1024]; + recvlen = msg.encodeMsg(buff, bufflen); + this->sendto(buff, recvlen); } + void OnelabNetworkClient::requestParameters() { - OnelabProtocol msg(OnelabProtocol::OnelabRequest); - this->request(msg); + OnelabProtocol msg(OnelabProtocol::OnelabRequest); + this->request(msg); } diff --git a/contrib/onelab2/OnelabServer.cpp b/contrib/onelab2/OnelabServer.cpp index 26f04a482975ba88a009d7ec5869c5a0e7fdd922..1c757b26da8696f387249dbec7a5ef845f56ddf0 100644 --- a/contrib/onelab2/OnelabServer.cpp +++ b/contrib/onelab2/OnelabServer.cpp @@ -17,47 +17,51 @@ static bool haveToStop = false; void signalHandler(int unused) { - haveToStop = true; + haveToStop = true; } OnelabServer::OnelabServer(UInt32 iface, UInt16 port) { - _ip.address = iface; - _ip.port = port; + _ip.address = iface; + _ip.port = port; #ifdef HAVE_UDT - UDT::startup(); - _fdu = udt_socket(_ip, SOCK_STREAM); + UDT::startup(); + _fdu = udt_socket(_ip, SOCK_STREAM); #endif - _fds = ip4_socket(_ip, SOCK_STREAM); + _fds = ip4_socket(_ip, SOCK_STREAM); ip4_socket_ip(_fds, _ip); } + OnelabServer::OnelabServer(UInt16 port) { - _ip.address = 0; - _ip.port = port; + _ip.address = 0; + _ip.port = port; #ifdef HAVE_UDT - UDT::startup(); - _fdu = udt_socket(_ip, SOCK_STREAM); + UDT::startup(); + _fdu = udt_socket(_ip, SOCK_STREAM); #endif - _fds = ip4_socket(_ip, SOCK_STREAM); + _fds = ip4_socket(_ip, SOCK_STREAM); ip4_socket_ip(_fds, _ip); } + #ifdef HAVE_UDT void OnelabServer::addClient(std::string name, UDTSOCKET fd, UInt32 ip, UInt16 port) { - this->_clients.push_back(OnelabLocalNetworkClient(name, fd, ip, port)); + this->_clients.push_back(OnelabLocalNetworkClient(name, fd, ip, port)); } + OnelabLocalNetworkClient *OnelabServer::getClient(UDTSOCKET fd) // UDTSOCKET Socket { - for(std::vector<OnelabLocalNetworkClient>::iterator it = _clients.begin(); it != _clients.end(); ++it) { - if(it->getUSocket() == fd) return &(*it); - if(it->getSSocket() == fd) return &(*it); - } - return NULL; + for(std::vector<OnelabLocalNetworkClient>::iterator it = _clients.begin(); it != _clients.end(); ++it) { + if(it->getUSocket() == fd) return &(*it); + if(it->getSSocket() == fd) return &(*it); + } + return NULL; } #endif + //void OnelabServer::addClient(std::string name, Socket fd, UInt32 ip, UInt16 port) //{ // if(ip == 0 || port == 0) throw "Unable to add client (invalid ip or port)"; @@ -72,304 +76,313 @@ OnelabLocalNetworkClient *OnelabServer::getClient(UDTSOCKET fd) // UDTSOCKET Soc //} void OnelabServer::sendto(std::string client, UInt8 *buff, UInt32 len) { - for(std::vector<OnelabLocalNetworkClient>::iterator it = this->_clients.begin() ; it != this->_clients.end(); ++it) { - if((*it).getName() != client) continue; - (*it).sendto(buff, len); - return; - } + for(std::vector<OnelabLocalNetworkClient>::iterator it = this->_clients.begin() ; it != this->_clients.end(); ++it) { + if((*it).getName() != client) continue; + (*it).sendto(buff, len); + return; + } } + OnelabLocalNetworkClient *OnelabServer::getClient(UInt32 ip, UInt16 port) { - for(std::vector<OnelabLocalNetworkClient>::iterator it = _clients.begin(); it != _clients.end(); ++it) { - if((*it).getIp() == ip && (*it).getPort() == port) - return &(*it); - } - return NULL; + for(std::vector<OnelabLocalNetworkClient>::iterator it = _clients.begin(); it != _clients.end(); ++it) { + if((*it).getIp() == ip && (*it).getPort() == port) + return &(*it); + } + return NULL; } + OnelabLocalNetworkClient *OnelabServer::getClient(std::string name) { - for(std::vector<OnelabLocalNetworkClient>::iterator it = _clients.begin(); it != _clients.end(); ++it) { - if((*it).getName() == name) - return &(*it); - } - return NULL; + for(std::vector<OnelabLocalNetworkClient>::iterator it = _clients.begin(); it != _clients.end(); ++it) { + if((*it).getName() == name) + return &(*it); + } + return NULL; } + void OnelabServer::removeClient(OnelabLocalNetworkClient *client) { - for(std::vector<OnelabLocalNetworkClient>::iterator it = _clients.begin(); it != _clients.end(); ++it) { - if(&(*it) == client) { - _clients.erase(it); - return; - } - } + for(std::vector<OnelabLocalNetworkClient>::iterator it = _clients.begin(); it != _clients.end(); ++it) { + if(&(*it) == client) { + _clients.erase(it); + return; + } + } } + #ifdef HAVE_UDT #ifndef WIN32 void *listenOnClients(void *param) #else -DWORD WINAPI listenOnClients(LPVOID param) + DWORD WINAPI listenOnClients(LPVOID param) #endif { IPv4 ip; - std::set<UDTSOCKET> fdus; - std::set<Socket> fdss; - int recvlen = 0; + std::set<UDTSOCKET> fdus; + std::set<Socket> fdss; + int recvlen = 0; UInt8 prev[1024]; - UInt8 buff[1024]; - OnelabProtocol msg(-1), rep(-1); - int eid = OnelabServer::instance()->getEID(); - while(UDT::ERROR != UDT::epoll_wait(eid, &fdus, NULL, -1, &fdss, NULL)) { - //for(std::set<UDTSOCKET>::iterator it = fdus.begin(); it != fdus.end(); ++it) { + UInt8 buff[1024]; + OnelabProtocol msg(-1), rep(-1); + int eid = OnelabServer::instance()->getEID(); + while(UDT::ERROR != UDT::epoll_wait(eid, &fdus, NULL, -1, &fdss, NULL)) { + //for(std::set<UDTSOCKET>::iterator it = fdus.begin(); it != fdus.end(); ++it) { + for(std::set<Socket>::iterator it = fdss.begin(); it != fdss.end(); ++it) { - OnelabLocalNetworkClient *cli = OnelabServer::instance()->getClient(*it); - if(cli == NULL) { - IPv4 ip; - //recvlen = udt_socket_recv(*it, buff, 1024); - recvlen = ip4_socket_recv(*it, buff, 1024, ip); // FIXME - std::clog << "recv " << recvlen << std::endl; - msg.parseMsg(buff, recvlen); + + OnelabLocalNetworkClient *cli = OnelabServer::instance()->getClient(*it); + if(cli == NULL) { + IPv4 ip; + //recvlen = udt_socket_recv(*it, buff, 1024); + recvlen = ip4_socket_recv(*it, buff, 1024, ip); // FIXME + std::clog << "recv " << recvlen << std::endl; + msg.parseMsg(buff, recvlen); msg.showMsg(); - if(msg.msgType() == OnelabProtocol::OnelabStart && msg.attrs.size() > 0 && msg.attrs[0]->getAttributeType() == OnelabAttr::Start) { - std::string name = std::string(((OnelabAttrStart *)msg.attrs[0])->name()); - if(OnelabServer::instance()->getClient(name) != NULL) { + if(msg.msgType() == OnelabProtocol::OnelabStart && msg.attrs.size() > 0 && msg.attrs[0]->getAttributeType() == OnelabAttr::Start) { + std::string name = std::string(((OnelabAttrStart *)msg.attrs[0])->name()); + if(OnelabServer::instance()->getClient(name) != NULL) { std::cout << "A client exist with this name !" << std::endl; - rep.msgType(OnelabProtocol::OnelabMessage); - rep.attrs.push_back(new OnelabAttrMessage("A client exist with this name !", OnelabAttrMessage::Fatal)); - recvlen = rep.encodeMsg(buff, 1024); - //udt_socket_send(*it, buff, recvlen); - //TODO ip4_socket_send(*it, buff, recvlen); - //UDT::epoll_remove_usock(eid, *it); - UDT::epoll_remove_ssock(eid, *it); - UDT::close(*it); - continue; - } - OnelabServer::instance()->addClient(name, *it, ip.address, ip.port); - std::clog << "\033[0;31m" << "Add a new client: " << name << "\033[0m" << std::endl; - OnelabProtocol rep(OnelabProtocol::OnelabStart); - recvlen = rep.encodeMsg(buff, 1024); - cli = OnelabServer::instance()->getClient(*it); - cli->sendto(buff, recvlen); - OnelabServer::instance()->sendAllParameter(cli); - continue; - } - else { - // cli shoud send a name first - //UDT::epoll_remove_usock(eid, *it); - UDT::epoll_remove_ssock(eid, *it); - UDT::close(*it); - continue; - } - } - else { + rep.msgType(OnelabProtocol::OnelabMessage); + rep.attrs.push_back(new OnelabAttrMessage("A client exist with this name !", OnelabAttrMessage::Fatal)); + recvlen = rep.encodeMsg(buff, 1024); + //udt_socket_send(*it, buff, recvlen); + //TODO ip4_socket_send(*it, buff, recvlen); + //UDT::epoll_remove_usock(eid, *it); + UDT::epoll_remove_ssock(eid, *it); + UDT::close(*it); + continue; + } + OnelabServer::instance()->addClient(name, *it, ip.address, ip.port); + std::clog << "\033[0;31m" << "Add a new client: " << name << "\033[0m" << std::endl; + OnelabProtocol rep(OnelabProtocol::OnelabStart); + recvlen = rep.encodeMsg(buff, 1024); + cli = OnelabServer::instance()->getClient(*it); + cli->sendto(buff, recvlen); + OnelabServer::instance()->sendAllParameter(cli); + continue; + } + else { + // cli shoud send a name first + //UDT::epoll_remove_usock(eid, *it); + UDT::epoll_remove_ssock(eid, *it); + UDT::close(*it); + continue; + } + } + else { std::cout << "ok" << std::endl; - try { + try { recvlen = cli->recvmsg(msg); - } - catch(int &e) { - if(e == 50) { // Recv error (TCP) - std::cout << "\033[0;31m" << "Connection with client \"" << cli->getName() << "\" was broken, removing the client." << "\033[0m" << std::endl; // DEBUG - UDT::epoll_remove_ssock(eid, *it); - OnelabServer::instance()->removeClient(cli); - UDT::close(*it); - } + } + catch(int &e) { + if(e == 50) { // Recv error (TCP) + std::cout << "\033[0;31m" << "Connection with client \"" << cli->getName() << "\" was broken, removing the client." << "\033[0m" << std::endl; // DEBUG + UDT::epoll_remove_ssock(eid, *it); + OnelabServer::instance()->removeClient(cli); + UDT::close(*it); + } // TODO for UDT - /*if(UDT::getlasterror().getErrorCode() == 2001 || e == 50) { // ECONNLOST - std::cout << "\033[0;31m" << "Connection with client \"" << cli->getName() << "\" was broken, removing the client." << "\033[0m" << std::endl; // DEBUG - UDT::epoll_remove_usock(eid, *it); - OnelabServer::instance()->removeClient(cli); - UDT::close(*it); - }*/ - continue; - } + /*if(UDT::getlasterror().getErrorCode() == 2001 || e == 50) { // ECONNLOST + std::cout << "\033[0;31m" << "Connection with client \"" << cli->getName() << "\" was broken, removing the client." << "\033[0m" << std::endl; // DEBUG + UDT::epoll_remove_usock(eid, *it); + OnelabServer::instance()->removeClient(cli); + UDT::close(*it); + }*/ + continue; + } if(recvlen == 0) { // for TCP - std::cout << "\033[0;31m" << "Connection with client \"" << cli->getName() << "\" was broken, removing the client." << "\033[0m" << std::endl; // DEBUG + std::cout << "\033[0;31m" << "Connection with client \"" << cli->getName() << "\" was broken, removing the client." << "\033[0m" << std::endl; // DEBUG UDT::epoll_remove_ssock(eid, *it); - OnelabServer::instance()->removeClient(cli); + OnelabServer::instance()->removeClient(cli); UDT::close(*it); continue; } - std::clog << "recv " << recvlen << " bytes on client " << cli->getName() << std::endl; - //unreaded = msg.parseMsg(buff, recvlen); - switch (msg.msgType()) { - case OnelabProtocol::OnelabStop: - std::cout << "\033[0;31m" << "Client \"" << cli->getName() << "\" is going to stop" << "\033[0m" << std::endl; // DEBUG - rep.msgType(OnelabProtocol::OnelabStop); - recvlen = rep.encodeMsg(buff, 1024); - cli->sendto(buff, recvlen); - //UDT::epoll_remove_usock(eid, *it); - UDT::epoll_remove_ssock(eid, *it); - OnelabServer::instance()->removeClient(cli); - UDT::close(*it); - break; - case OnelabProtocol::OnelabMessage: - // TODO - break; - case OnelabProtocol::OnelabRequest: - rep.msgType(OnelabProtocol::OnelabResponse); - if(msg.attrs.size() == 0) OnelabServer::instance()->sendAllParameter(cli); - else for(std::vector<OnelabAttr *>::iterator it = msg.attrs.begin() ; it != msg.attrs.end(); ++it) { - if((*it)->getAttributeType() == OnelabAttr::Parameter) { - OnelabAttrParameterQuery *attr = (OnelabAttrParameterQuery *)*it; - std::cout << "\033[0;31m" << "Client \"" << cli->getName() << " ask for parameter \"" << attr->getName() << "\"\033[0m" << std::endl; // DEBUG - onelab::parameter *p; - switch(attr->paramType()) { - case OnelabAttr::Number: - OnelabServer::instance()->getPtr((onelab::number **)&p, attr->getName(), cli->getName()); - break; - case OnelabAttr::String: - OnelabServer::instance()->getPtr((onelab::string **)&p, attr->getName(), cli->getName()); - break; - case OnelabAttr::Region: - OnelabServer::instance()->getPtr((onelab::region **)&p, attr->getName(), cli->getName()); - break; - case OnelabAttr::Function: - OnelabServer::instance()->getPtr((onelab::function **)&p, attr->getName(), cli->getName()); - break; - } - if(p != NULL) rep.attrs.push_back(p); - else rep.attrs.push_back(new OnelabAttrMessage("Unable to find the request parameter.", OnelabAttrMessage::Error)); - } - // else ?? - } - recvlen = rep.encodeMsg(buff, 1024); - cli->sendto(buff, recvlen); - break; - case OnelabProtocol::OnelabUpdate: - for(std::vector<OnelabAttr *>::iterator it = msg.attrs.begin() ; it != msg.attrs.end(); ++it) { - if((*it)->getAttributeType() >= OnelabAttr::Number && (*it)->getAttributeType() <= OnelabAttr::Function) { - onelab::parameter *attr = (onelab::parameter *)*it; - std::cout << "\033[0;31m" << "Client \"" << cli->getName() << " update parameter \"" << attr->getName() << "\"\033[0m" << std::endl; // DEBUG - onelab::parameter *parameter = NULL; - switch(attr->getAttributeType()) { - case OnelabAttr::Number: - OnelabServer::instance()->set(*(onelab::number *)attr, cli->getName()); - OnelabServer::instance()->getPtr((onelab::number **)¶meter, attr->getName(), cli->getName()); - break; - case OnelabAttr::String: - OnelabServer::instance()->set(*(onelab::string *)attr, cli->getName()); - OnelabServer::instance()->getPtr((onelab::string **)¶meter, attr->getName(), cli->getName()); - break; - case OnelabAttr::Region: - OnelabServer::instance()->set(*(onelab::region *)attr, cli->getName()); - OnelabServer::instance()->getPtr((onelab::region **)¶meter, attr->getName(), cli->getName()); - break; - case OnelabAttr::Function: - OnelabServer::instance()->set(*(onelab::function *)attr, cli->getName()); - OnelabServer::instance()->getPtr((onelab::function **)¶meter, attr->getName(), cli->getName()); - break; - } - // TODO - //rep.msgType(OnelabProtocol::OnelabUpdate); - //rep.attrs.push_back(parameter); - //recvlen = rep.encodeMsg(buff, 1024); - //std::map<std::string, bool> clients = attr->getClients(); - //for(std::map<std::string, bool>::const_iterator it = clients.begin(); it != clients.end(); it++) { - // OnelabLocalNetworkClient *tmp = OnelabServer::instance()->getClient(it->first); - // if(tmp == NULL || cli == tmp) continue; - // tmp->sendto(buff, recvlen); - //} - } - else - switch((*it)->getAttributeType()) { - case 0x0B: - { - // TODO check if file is on a specific client - const char *filename = ((OnelabAttrFileQuery *)*it)->getFilename(); - // FIXME path/filename ? - std::clog << "try to open " << filename << " to read" << std::endl; - FILE *fp = fopen(filename, "rb"); - if(fp != NULL){ - std::clog << "file open" << std::endl; - rep.msgType(OnelabProtocol::OnelabUpdate); - rep.attrs.push_back(new OnelabAttrFile(std::string(filename), fp)); - recvlen = rep.encodeMsg(buff, 1024); + std::clog << "recv " << recvlen << " bytes on client " << cli->getName() << std::endl; + //unreaded = msg.parseMsg(buff, recvlen); + switch (msg.msgType()) { + case OnelabProtocol::OnelabStop: + std::cout << "\033[0;31m" << "Client \"" << cli->getName() << "\" is going to stop" << "\033[0m" << std::endl; // DEBUG + rep.msgType(OnelabProtocol::OnelabStop); + recvlen = rep.encodeMsg(buff, 1024); + cli->sendto(buff, recvlen); + //UDT::epoll_remove_usock(eid, *it); + UDT::epoll_remove_ssock(eid, *it); + OnelabServer::instance()->removeClient(cli); + UDT::close(*it); + break; + case OnelabProtocol::OnelabMessage: + // TODO + break; + case OnelabProtocol::OnelabRequest: + rep.msgType(OnelabProtocol::OnelabResponse); + if(msg.attrs.size() == 0) OnelabServer::instance()->sendAllParameter(cli); + else for(std::vector<OnelabAttr *>::iterator it = msg.attrs.begin() ; it != msg.attrs.end(); ++it) { + if((*it)->getAttributeType() == OnelabAttr::Parameter) { + OnelabAttrParameterQuery *attr = (OnelabAttrParameterQuery *)*it; + std::cout << "\033[0;31m" << "Client \"" << cli->getName() << " ask for parameter \"" << attr->getName() << "\"\033[0m" << std::endl; // DEBUG + onelab::parameter *p; + switch(attr->paramType()) { + case OnelabAttr::Number: + OnelabServer::instance()->getPtr((onelab::number **)&p, attr->getName(), cli->getName()); + break; + case OnelabAttr::String: + OnelabServer::instance()->getPtr((onelab::string **)&p, attr->getName(), cli->getName()); + break; + case OnelabAttr::Region: + OnelabServer::instance()->getPtr((onelab::region **)&p, attr->getName(), cli->getName()); + break; + case OnelabAttr::Function: + OnelabServer::instance()->getPtr((onelab::function **)&p, attr->getName(), cli->getName()); + break; + } + if(p != NULL) rep.attrs.push_back(p); + else rep.attrs.push_back(new OnelabAttrMessage("Unable to find the request parameter.", OnelabAttrMessage::Error)); + } + // else ?? + } + recvlen = rep.encodeMsg(buff, 1024); + cli->sendto(buff, recvlen); + break; + case OnelabProtocol::OnelabUpdate: + for(std::vector<OnelabAttr *>::iterator it = msg.attrs.begin() ; it != msg.attrs.end(); ++it) { + if((*it)->getAttributeType() >= OnelabAttr::Number && (*it)->getAttributeType() <= OnelabAttr::Function) { + onelab::parameter *attr = (onelab::parameter *)*it; + std::cout << "\033[0;31m" << "Client \"" << cli->getName() << " update parameter \"" << attr->getName() << "\"\033[0m" << std::endl; // DEBUG + onelab::parameter *parameter = NULL; + switch(attr->getAttributeType()) { + case OnelabAttr::Number: + OnelabServer::instance()->set(*(onelab::number *)attr, cli->getName()); + OnelabServer::instance()->getPtr((onelab::number **)¶meter, attr->getName(), cli->getName()); + break; + case OnelabAttr::String: + OnelabServer::instance()->set(*(onelab::string *)attr, cli->getName()); + OnelabServer::instance()->getPtr((onelab::string **)¶meter, attr->getName(), cli->getName()); + break; + case OnelabAttr::Region: + OnelabServer::instance()->set(*(onelab::region *)attr, cli->getName()); + OnelabServer::instance()->getPtr((onelab::region **)¶meter, attr->getName(), cli->getName()); + break; + case OnelabAttr::Function: + OnelabServer::instance()->set(*(onelab::function *)attr, cli->getName()); + OnelabServer::instance()->getPtr((onelab::function **)¶meter, attr->getName(), cli->getName()); + break; + } + // TODO + //rep.msgType(OnelabProtocol::OnelabUpdate); + //rep.attrs.push_back(parameter); + //recvlen = rep.encodeMsg(buff, 1024); + //std::map<std::string, bool> clients = attr->getClients(); + //for(std::map<std::string, bool>::const_iterator it = clients.begin(); it != clients.end(); it++) { + // OnelabLocalNetworkClient *tmp = OnelabServer::instance()->getClient(it->first); + // if(tmp == NULL || cli == tmp) continue; + // tmp->sendto(buff, recvlen); + //} + } + else + switch((*it)->getAttributeType()) { + case 0x0B: + { + // TODO check if file is on a specific client + const char *filename = ((OnelabAttrFileQuery *)*it)->getFilename(); + // FIXME path/filename ? + std::clog << "try to open " << filename << " to read" << std::endl; + FILE *fp = fopen(filename, "rb"); + if(fp != NULL){ + std::clog << "file open" << std::endl; + rep.msgType(OnelabProtocol::OnelabUpdate); + rep.attrs.push_back(new OnelabAttrFile(std::string(filename), fp)); + recvlen = rep.encodeMsg(buff, 1024); + cli->sendto(buff, recvlen); + while((recvlen = fread(buff, 1, 1024, fp)) > 0){ cli->sendto(buff, recvlen); - while((recvlen = fread(buff, 1, 1024, fp)) > 0){ - cli->sendto(buff, recvlen); - } } - std::clog << "file ok" << std::endl; - break; } - case 0x0C: - const char *filename = ((OnelabAttrFile *)*it)->getFilename(); - std::clog << "try to open " << filename << " to write" << std::endl; - FILE *fp = fopen(filename, "wb"); - if(fp != NULL){ - std::clog << "file open" << std::endl; - int filesize = ((OnelabAttrFile *)*it)->getFileSize(); - int downloadsize = 0; - while(downloadsize < filesize) { - recvlen = cli->recvfrom(buff, 1024); - fwrite(buff, 1, recvlen, fp); - } - } - std::clog << "file ok" << std::endl; - break; - } - } - break; - } - } - } - } + std::clog << "file ok" << std::endl; + break; + } + case 0x0C: + const char *filename = ((OnelabAttrFile *)*it)->getFilename(); + std::clog << "try to open " << filename << " to write" << std::endl; + FILE *fp = fopen(filename, "wb"); + if(fp != NULL){ + std::clog << "file open" << std::endl; + int filesize = ((OnelabAttrFile *)*it)->getFileSize(); + int downloadsize = 0; + while(downloadsize < filesize) { + recvlen = cli->recvfrom(buff, 1024); + fwrite(buff, 1, recvlen, fp); + } + } + std::clog << "file ok" << std::endl; + break; + } + } + break; + } + } + } + } } #endif + void OnelabServer::sendAllParameter(OnelabLocalNetworkClient *cli) { - std::set<onelab::parameter*, onelab::parameterLessThan> ps; - OnelabProtocol msg = OnelabProtocol(OnelabProtocol::OnelabUpdate); - UInt32 bufflen = 1024, recvlen = 0; - UInt8 buff[1024]; - _parameterSpace.getAllParameters(ps); - for(std::set<onelab::parameter*, onelab::parameterLessThan>::iterator it = ps.begin(); it != ps.end(); it++) - if((*it)->hasClient(cli->getName())) msg.attrs.push_back(*it); - recvlen = msg.encodeMsg(buff, bufflen); - cli->sendto(buff, recvlen); + std::set<onelab::parameter*, onelab::parameterLessThan> ps; + OnelabProtocol msg = OnelabProtocol(OnelabProtocol::OnelabUpdate); + UInt32 bufflen = 1024, recvlen = 0; + UInt8 buff[1024]; + _parameterSpace.getAllParameters(ps); + for(std::set<onelab::parameter*, onelab::parameterLessThan>::iterator it = ps.begin(); it != ps.end(); it++) + if((*it)->hasClient(cli->getName())) msg.attrs.push_back(*it); + recvlen = msg.encodeMsg(buff, bufflen); + cli->sendto(buff, recvlen); } + void OnelabServer::Run() { - UInt32 bufflen = 1024, recvlen = 0; - UInt8 buff[1024]; - IPv4 ip; - OnelabProtocol msg(-1), rep(OnelabProtocol::OnelabResponse); - OnelabLocalNetworkClient *currentClient = NULL; + UInt32 bufflen = 1024, recvlen = 0; + UInt8 buff[1024]; + IPv4 ip; + OnelabProtocol msg(-1), rep(OnelabProtocol::OnelabResponse); + OnelabLocalNetworkClient *currentClient = NULL; #ifdef HAVE_UDT - UDTSOCKET newcli = 0; + UDTSOCKET newcli = 0; #ifndef WIN32 - pthread_t listen_thread; + pthread_t listen_thread; #else - HANDLER listen_thread; + HANDLER listen_thread; #endif - _eid = UDT::epoll_create(); + _eid = UDT::epoll_create(); + + udt_socket_listen(_fdu); + ip4_socket_listen(_fds); + std::clog << "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(tcp)" << std::endl; + // << "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(udp/udt)" << std::endl; + //while(newcli = udt_socket_accept(_fdu, ip)) { // TODO accept udt and tcp ? - udt_socket_listen(_fdu); - ip4_socket_listen(_fds); - std::clog << "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(tcp)" << std::endl; - // << "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(udp/udt)" << std::endl; - //while(newcli = udt_socket_accept(_fdu, ip)) { // TODO accept udt and tcp ? - while(newcli = ip4_socket_accept(_fds, ip)) { - std::clog << "\033[0;31m" << "accpet peer : " << ip4_inet_ntop(ip.address)<< ':' << ip.port << "\033[0m" << std::endl; - //UDT::epoll_add_usock(_eid, newcli); - UDT::epoll_add_ssock(_eid, newcli); - if(_clients.size() == 0) + while(newcli = ip4_socket_accept(_fds, ip)) { + std::clog << "\033[0;31m" << "accpet peer : " << ip4_inet_ntop(ip.address)<< ':' << ip.port << "\033[0m" << std::endl; + //UDT::epoll_add_usock(_eid, newcli); + UDT::epoll_add_ssock(_eid, newcli); + if(_clients.size() == 0) #ifndef WIN32 - pthread_create(&listen_thread, NULL, listenOnClients, NULL); + pthread_create(&listen_thread, NULL, listenOnClients, NULL); #else - listen_thread = CreateThread(NULL, 0, listenOnClients, NULL, 0, NULL); + listen_thread = CreateThread(NULL, 0, listenOnClients, NULL, 0, NULL); #endif - if(_clients.size() == 1) + if(_clients.size() == 1) #ifndef WIN32 - pthread_join(listen_thread, NULL); + pthread_join(listen_thread, NULL); #else - WaitForSingleObject(listen_thread, INFINITE); + WaitForSingleObject(listen_thread, INFINITE); #endif - } - udt_socket_close(_fdu); + } + udt_socket_close(_fdu); #else // TODO - ip4_socket_close(_fds); + ip4_socket_close(_fds); #endif }