diff --git a/contrib/onelab2/GmshLocalClient.cpp b/contrib/onelab2/GmshLocalClient.cpp index db3d031bd01424a948084e69bfe64a2c9e6e2b39..8554f043348b9cff92dd5d1a28fa06795a37bb1b 100644 --- a/contrib/onelab2/GmshLocalClient.cpp +++ b/contrib/onelab2/GmshLocalClient.cpp @@ -52,6 +52,13 @@ void GmshLocalClient::onMessage(const std::string & name, const std::string &mes FlGui::instance()->unlock(); Fl::awake((void *)NULL); } +void GmshLocalClient::onStop() +{ + FlGui::instance()->lock(); + _cb_obj->setButtonMode("check", "compute"); + FlGui::instance()->unlock(); + Fl::awake((void *)NULL); +} void GmshLocalClient::refresh() { Fl::awake(onelab_cb, (void*)"refresh"); @@ -64,7 +71,6 @@ void GmshLocalClient::mergeFile(const std::string &filename) FlGui::instance()->unlock(); Fl::awake((void *)NULL); } -#endif void GmshLocalClient::run(std::string action) { if(getName() == "Gmsh") { @@ -74,3 +80,4 @@ void GmshLocalClient::run(std::string action) { Fl::awake((void *)NULL); } } +#endif diff --git a/contrib/onelab2/GmshLocalClient.h b/contrib/onelab2/GmshLocalClient.h index 9fca4a174b53d85f63392078ee62232926ec0275..6701064a698bf0d64c6e6a03ca7d04c9925fcab9 100644 --- a/contrib/onelab2/GmshLocalClient.h +++ b/contrib/onelab2/GmshLocalClient.h @@ -25,6 +25,7 @@ public: void onUpdateParameter(onelab::parameter *p); void onRemoveParameter(onelab::parameter *p); void onMessage(const std::string &name, const std::string &message, int level); + void onStop(); void refresh(); void mergeFile(const std::string &filename); #else diff --git a/contrib/onelab2/NetworkUtils.cpp b/contrib/onelab2/NetworkUtils.cpp index 12bfa2497bed629e2194202b9fbf508f9a58ef31..4f0f6001bd42b5d70e643ba02af573e7264f2054 100644 --- a/contrib/onelab2/NetworkUtils.cpp +++ b/contrib/onelab2/NetworkUtils.cpp @@ -1,10 +1,12 @@ #include <sys/socket.h> +#include <sys/un.h> #include <netdb.h> #include <ifaddrs.h> #include <iostream> #include <unistd.h> #include "NetworkUtils.h" +#include "OnelabException.h" UInt32 ip4_inet_pton(const char *ip) { @@ -69,9 +71,9 @@ Socket ip4_socket(IPv4 ip, int socketType) addr.sin_addr.s_addr = hton32((ip.address==0)? INADDR_ANY : ip.address); addr.sin_port = hton16(ip.port); - if((fd = socket(AF_INET, socketType, 0)) < 0) throw ERROR_SOCKET_CREATE; + if((fd = socket(AF_INET, socketType, 0)) < 0) throw NetworkException(NetworkException::Create); - if(bind(fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) throw ERROR_SOCKET_BIND; + if(bind(fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) throw NetworkException(NetworkException::Bind); return fd; } @@ -144,3 +146,44 @@ bool ip4_socket_get_local_address(Socket fd, IPv4 &ip) } return false; } + +Socket unix_socket(int socketType) +{ + Socket fd; + + if((fd = socket(PF_UNIX, socketType, 0)) < 0) throw NetworkException(NetworkException::Create); + + return fd; +} +void unix_socket_listen(Socket fd, const char *sockname, int maxconnection) +{ + struct sockaddr_un addr; + memset(&addr, 0, sizeof(struct sockaddr_un)); + addr.sun_family = AF_UNIX; + strcpy(addr.sun_path, sockname); + + if(bind(fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) throw NetworkException(NetworkException::Bind); + if(-1 == listen(fd, maxconnection)) throw NetworkException(NetworkException::Listen); +} +Socket unix_socket_accept(Socket fd) +{ + struct sockaddr_un addr; + unsigned int addrl = sizeof(addr); + + memset(&addr, 0, addrl); + addr.sun_family = AF_UNIX; + + Socket cli = accept(fd, (struct sockaddr*)&addr, &addrl); + return cli; +} + +Socket unix_socket_connect(Socket fd, const char* sockname) +{ + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + + addr.sun_family = AF_UNIX; + strcpy(addr.sun_path, sockname); + + return connect(fd, (struct sockaddr *)&addr, sizeof(addr)); +} diff --git a/contrib/onelab2/OnelabDatabase.cpp b/contrib/onelab2/OnelabDatabase.cpp index 817a671135a8d1e326cd15f53acbabf10defc9d9..165440c39073ec91473d53b4a4f95bc358f070a0 100644 --- a/contrib/onelab2/OnelabDatabase.cpp +++ b/contrib/onelab2/OnelabDatabase.cpp @@ -87,14 +87,6 @@ DWORD WINAPI OnelabDatabase_listen(LPVOID arg) } } } -#ifndef WIN32 -void *OnelabDatabase_server(void *arg) -#else -DWORD WINAPI OnelabDatabase_server(LPVOID arg) -#endif -{ - OnelabServer::instance()->Run(); -} void OnelabDatabase::finalize() { @@ -103,6 +95,24 @@ void OnelabDatabase::finalize() pthread_join(_listenThread, NULL); } else { - //TODO pthread_join(_serverThread, NULL); + OnelabServer::instance()->finalize(); } } + +void solver_batch_cb(void *data) +{ + int num = (intptr_t)data; + if(num < 0) return; + std::string name = opt_solver_name(num, GMSH_GET, ""); + std::string exe = opt_solver_executable(num, GMSH_GET, ""); + std::string host = opt_solver_remote_login(num, GMSH_GET, ""); + if(exe.empty()){ + Msg::Error("Solver executable name not provided"); + return; + } + + onelab::number n("0Metamodel/Batch", CTX::instance()->batch); + n.setVisible(false); + //onelab::server::instance()->set(n); + // TODO +} diff --git a/contrib/onelab2/OnelabDatabase.h b/contrib/onelab2/OnelabDatabase.h index 9b5358a967f9a936f5389b1f06e0a40d8b4211d4..d263358bf8d834d93a72d966d4bcb8a488b04cac 100644 --- a/contrib/onelab2/OnelabDatabase.h +++ b/contrib/onelab2/OnelabDatabase.h @@ -13,16 +13,18 @@ #include "GmshNetworkClient.h" #include "GmshLocalClient.h" +#include "OnelabException.h" + +void solver_batch_cb(void *data); #ifndef WIN32 void *OnelabDatabase_listen(void *arg); -void *OnelabDatabase_server(void *arg); #else DWORD WINAPI OnelabDatabase_listen(LPVOID arg); -DWORD WINAPI OnelabDatabase_server(void *arg); #endif static void initializeLoops(); static bool incrementLoops(); + // OnelabDatabase is a singleton that get/set/... parameters from server/clients class OnelabDatabase { private: @@ -36,10 +38,14 @@ private: GmshLocalClient *_localGUI, *_localGmsh; void _clear() { #ifndef WIN32 - // TODO send message to thread to exit the thread - // pthread_cancel - //pthread_kill(_serverThread, 9); - //if(_client) pthread_kill(_listenThread, 9); + if(_client) pthread_cancel(_listenThread); + else { + OnelabServer::instance()->stopTcp(); + OnelabServer::instance()->stopUnix(); +#ifdef HAVE_UDT + OnelabServer::instance()->stopUdt(); +#endif + } #else // TODO #endif @@ -64,18 +70,46 @@ public: // the server is in the same memory space _clear(); - OnelabServer::instance(0x7F000001, 0); - //OnelabServer::instance(0, 0); + CTX::instance()->onelab.unixConnected = CTX::instance()->onelab.tcpConnected = CTX::instance()->onelab.udtConnected = false; + + try { + if(CTX::instance()->onelab.unixSock.size() > 0) { // UNIX + std::ostringstream tmp; + tmp << CTX::instance()->homeDir << CTX::instance()->onelab.unixSock; + OnelabServer::instance()->listenOnUnix(tmp.str().c_str()); + CTX::instance()->onelab.unixConnected = true; + } + if(CTX::instance()->onelab.tcpSock.size() > 0) { + std::size_t colon = CTX::instance()->onelab.tcpSock.find(":"); + OnelabServer::instance()->listenOnTcp( + ip4_inet_pton(CTX::instance()->onelab.tcpSock.substr(0, colon).c_str()), + atoi(CTX::instance()->onelab.tcpSock.substr(colon+1, CTX::instance()->onelab.tcpSock.size()-colon-1).c_str())); + CTX::instance()->onelab.tcpConnected = true; + } +#ifdef HAVE_UDT + if(CTX::instance()->onelab.udtSock.size() > 0) { + std::size_t colon = CTX::instance()->onelab.tcpSock.find(":"); + OnelabServer::instance()->listenOnUdt( + ip4_inet_pton(CTX::instance()->onelab.tcpSock.substr(0, colon).c_str()), + atoi(CTX::instance()->onelab.tcpSock.substr(colon+1, CTX::instance()->onelab.tcpSock.size()-colon-1).c_str())); + CTX::instance()->onelab.udtConnected = true; + } +#endif + if(CTX::instance()->onelab.unixSock.size() == 0 && CTX::instance()->onelab.tcpSock.size() == 0 && CTX::instance()->onelab.udtSock.size() == 0) { + OnelabServer::instance()->listenOnTcp(0x7F000001, 0); + CTX::instance()->onelab.tcpConnected = true; + } + } catch(NetworkException e) { + std::cout << e.what() << std::endl; + OnelabServer::instance()->listenOnTcp(0x7F000001, 0); + CTX::instance()->onelab.tcpSock = "127.0.0.1:0"; + CTX::instance()->onelab.tcpConnected = true; + } _localGUI = new GmshLocalClient("localGUI", OnelabServer::instance()->getParameterSpace()); + OnelabServer::instance()->addClient(_localGUI); _localGmsh = new GmshLocalClient("Gmsh", OnelabServer::instance()->getParameterSpace()); - OnelabServer::instance()->addClient(_localGUI); // TODO remove from server in _clear() - OnelabServer::instance()->addClient(_localGmsh); // TODO remove from server in _clear() + OnelabServer::instance()->addClient(_localGmsh); -#ifndef WIN32 - pthread_create(&_serverThread, NULL, OnelabDatabase_server, &(((OnelabLocalClient *)_localGmsh)->getName())); -#else - _serverThread = CreateThread(NULL, 0, NULL, OnelabDatabase_server, 0, NULL); -#endif return _localGUI; } GmshNetworkClient *useAsNetworkClient(UInt32 address, UInt16 port, std::string cli="GUI"){ @@ -108,6 +142,7 @@ public: } return NULL; } + bool isNetworkClient() {return _client != NULL;} GmshNetworkClient *getNetworkClient(){return _client;} void finalize(); int listen(OnelabProtocol &msg) { @@ -125,13 +160,9 @@ public: // use this as a network server (take interface/port to listen to) _clear(); - OnelabServer::instance(address, port); + // FIXME TCP / UDT ? + OnelabServer::instance()->listenOnTcp(address, port); _localGUI = new GmshLocalClient("localGUI", OnelabServer::instance()->getParameterSpace()); -#ifndef WIN32 - pthread_create(&_serverThread, NULL, OnelabDatabase_server, NULL); -#else - _serverThread = CreateThread(NULL, 0, NULL, OnelabDatabase_server, 0, NULL); -#endif return _localGUI; } template <class T> bool set(const T &p, const std::string &client) { diff --git a/contrib/onelab2/OnelabException.h b/contrib/onelab2/OnelabException.h new file mode 100644 index 0000000000000000000000000000000000000000..43fa0ffbd66ff6e8818ee97d4ee502818fa2ff9d --- /dev/null +++ b/contrib/onelab2/OnelabException.h @@ -0,0 +1,48 @@ +#ifndef _ONELABEXCEPTION_H_ +#define _ONELABEXCEPTION_H_ +#include <exception> + +//class OnelabException : public std::exception + + +class NetworkException : public std::exception +{ + private: + int _code; + + public: + NetworkException(int code) : _code(code) {} + + typedef enum { + Create, + Bind, + Connect, + Listen, + Send, + Recv, + nUnix + } NetworkExceptionCode; + + virtual const char* what(){ + switch(_code) { + case Create: + return "Unable to create the socket"; + case Bind: + return "Unable to bind the socket"; + case Connect: + return "Unable to connect the socket"; + case Listen: + return "Unable to listen on the socket"; + case Send: + return "Unable to send on the socket"; + case Recv: + return "Unable to recv on the socket"; + case nUnix: + return "UNIX sockets are unavailable"; + default: + return "Unkonown error with the socket"; + } + } +}; + +#endif diff --git a/contrib/onelab2/OnelabLocalClient.h b/contrib/onelab2/OnelabLocalClient.h index b227c5b19bb89e0276c2e9b8ccac08ab8bc4d94f..96c8d6977d2dd1454ea999da3ae352162cf03abc 100644 --- a/contrib/onelab2/OnelabLocalClient.h +++ b/contrib/onelab2/OnelabLocalClient.h @@ -20,11 +20,11 @@ public: virtual void onUpdateParameter(onelab::parameter *p){} virtual void onRemoveParameter(onelab::parameter *p){} virtual void onMessage(const std::string &name, const std::string &message, int level){} + virtual void onStop() {} virtual void refresh(){} virtual void mergeFile(const std::string &filename){} virtual void run(std::string action) {} - void stop() {} }; #endif diff --git a/contrib/onelab2/OnelabLocalNetworkClient.cpp b/contrib/onelab2/OnelabLocalNetworkClient.cpp index 7e40893ffb57255e70fbededdda8096d563cacdf..32cb4a5b9b11e420ed0e19922bc6926b5fba6ae5 100644 --- a/contrib/onelab2/OnelabLocalNetworkClient.cpp +++ b/contrib/onelab2/OnelabLocalNetworkClient.cpp @@ -1,7 +1,7 @@ #include "OnelabLocalNetworkClient.h" #include "OnelabProtocol.h" -//#ifdef HAVE_UDT +#ifdef HAVE_UDT OnelabLocalNetworkClient::OnelabLocalNetworkClient(std::string name, UDTSOCKET fd, unsigned int ip, unsigned short port, bool UDT) { _name = name; @@ -10,33 +10,32 @@ OnelabLocalNetworkClient::OnelabLocalNetworkClient(std::string name, UDTSOCKET f _ip.address = ip; _ip.port = port; } -//#endif +#endif OnelabLocalNetworkClient::OnelabLocalNetworkClient(std::string name, Socket fd, unsigned int ip, unsigned short port) { _name = name; - _fdu = 0; _fds = fd; _ip.address = ip; _ip.port = port; } void OnelabLocalNetworkClient::sendto(UInt8 *buff, unsigned int len) { -//#ifdef HAVE_UDT - if(_fds) ip4_socket_send(_fds, buff, len, _ip); +#ifdef HAVE_UDT + if(_fds) ip4_socket_send(_fds, buff, len); else udt_socket_send(_fdu, buff, len); -//#else -// return ip4_socket_send(_fds, buff, maxlen); -//#endif +#else + ip4_socket_send(_fds, buff, len); +#endif } int OnelabLocalNetworkClient::recvfrom(UInt8 *buff, unsigned int maxlen) { IPv4 unused; -//#ifdef HAVE_UDT +#ifdef HAVE_UDT if(_fds) return ip4_socket_recv(_fds, buff, maxlen, unused); return udt_socket_recv(_fdu, buff, maxlen); -//#else -// return ip4_socket_recv(_fds, buff, maxlen); -//#endif +#else + return ip4_socket_recv(_fds, buff, maxlen); +#endif } int OnelabLocalNetworkClient::recvmsg(OnelabProtocol &msg) { diff --git a/contrib/onelab2/OnelabLocalNetworkClient.h b/contrib/onelab2/OnelabLocalNetworkClient.h index 5349fbc4b91c88e413d0bd4c37e61040a776c837..68d499a5b8d6acbd19d103ac91c53997ba964ecb 100644 --- a/contrib/onelab2/OnelabLocalNetworkClient.h +++ b/contrib/onelab2/OnelabLocalNetworkClient.h @@ -3,34 +3,33 @@ #include <string> #include "onelab.h" -//#ifdef HAVE_UDT +#ifdef HAVE_UDT #include "UdtUtils.h" +#endif +#include "NetworkUtils.h" #include "OnelabProtocol.h" -//#else -//#include "NetworkUtils.h" -//#endif class OnelabLocalNetworkClient { private: Socket _fds; -//#ifdef HAVE_UDT +#ifdef HAVE_UDT UDTSOCKET _fdu; -//#endif +#endif IPv4 _ip; std::string _name; pthread_mutex_t _mutex_wait = PTHREAD_MUTEX_INITIALIZER; public: -//#ifdef HAVE_UDT +#ifdef HAVE_UDT OnelabLocalNetworkClient(std::string name, UDTSOCKET fd, unsigned int ip, unsigned short port, bool UDT); -//#endif + UDTSOCKET getUSocket() {return _fdu;} +#endif OnelabLocalNetworkClient(std::string name, Socket fd, unsigned int ip, unsigned short port); virtual ~OnelabLocalNetworkClient(){} void sendto(UInt8 *buff, unsigned int len); int recvfrom(UInt8 *buff, unsigned int maxlen); int recvmsg(OnelabProtocol &msg); - UDTSOCKET getSSocket() {return _fds;} - UDTSOCKET getUSocket() {return _fdu;} + Socket getSSocket() {return _fds;} std::string getName() {return _name;} void updateParameter(onelab::parameter *); unsigned int getIp() {return _ip.address;} diff --git a/contrib/onelab2/OnelabNetworkClient.cpp b/contrib/onelab2/OnelabNetworkClient.cpp index d3d0889556ef127285c5b933043e9d40e84fe528..5c2fd2fe702e19ff6788f8c0323c2eb729534c75 100644 --- a/contrib/onelab2/OnelabNetworkClient.cpp +++ b/contrib/onelab2/OnelabNetworkClient.cpp @@ -36,18 +36,6 @@ OnelabNetworkClient::OnelabNetworkClient(std::string name, unsigned int ip, unsi else _fds = ip4_socket(local, SOCK_STREAM); } #else -OnelabNetworkClient::OnelabNetworkClient(std::string name) - : VirtualClient(name) -{ - _ip.address = 0; - _ip.port = 0; - _connected = false; - - IPv4 local; - local.address = 0;// FIXME ip4_default_iface(); - local.port = 0; - _fds = ip4_socket(local, SOCK_STREAM); -} OnelabNetworkClient::OnelabNetworkClient(std::string name, unsigned int ip, unsigned short port) : VirtualClient(name) { @@ -61,10 +49,21 @@ OnelabNetworkClient::OnelabNetworkClient(std::string name, unsigned int ip, unsi _fds = ip4_socket(local, SOCK_STREAM); } #endif +OnelabNetworkClient::OnelabNetworkClient(std::string name, const char *sockname) + : VirtualClient(name) +{ + _ip.address = 0; + _ip.port = 0; + _connected = false; + + _sockname = std::string(sockname); + + _fds = unix_socket(SOCK_STREAM); +} void OnelabNetworkClient::sendto(UInt8 *buff, UInt16 len) { #ifndef HAVE_UDT - ip4_socket_send(_fds, buff, len, _ip); + ip4_socket_send(_fds, buff, len); #else if(_fds) ip4_socket_send(_fds, buff, len); else udt_socket_send(_fdu, buff, len); @@ -145,13 +144,18 @@ bool OnelabNetworkClient::connect() OnelabProtocol msg(OnelabProtocol::OnelabStart); #ifdef HAVE_UDT if(_fds) { - ip4_socket_connect(_fds, _ip); + if(_sockname.size()) + unix_socket_connect(_fds, _sockname.c_str()); + else + ip4_socket_connect(_fds, _ip); } - else{ + else udt_socket_connect(_fdu, _ip); - } #else - ip4_socket_connect(_fds, _ip); + if(_sockname.size()) + unix_socket_connect(_fds, _sockname.c_str()); + else + ip4_socket_connect(_fds, _ip); #endif msg.attrs.push_back(new OnelabAttrStart(_name)); recvlen = msg.encodeMsg(buff, bufflen); diff --git a/contrib/onelab2/OnelabNetworkClient.h b/contrib/onelab2/OnelabNetworkClient.h index cb7d0f87cdc0334afabfffe19ddbb1f8e1800b87..50c1a51af466ff0d4319735f9fb5e92ae4288497 100644 --- a/contrib/onelab2/OnelabNetworkClient.h +++ b/contrib/onelab2/OnelabNetworkClient.h @@ -18,6 +18,7 @@ private: #ifdef HAVE_UDT UDTSOCKET _fdu; #endif + std::string _sockname; Socket _fds; bool _connected; IPv4 _ip; @@ -31,6 +32,7 @@ private: } void requestParameters(); // request all parameter for this client public: + OnelabNetworkClient(std::string name, const char *sockname); #ifdef HAVE_UDT OnelabNetworkClient(std::string name, bool UDT=false); OnelabNetworkClient(std::string name, unsigned int ip, unsigned short port, bool UDT=false); diff --git a/contrib/onelab2/OnelabServer.cpp b/contrib/onelab2/OnelabServer.cpp index 21a9dd5256c456a5521f9871b72cc6684c522418..40b72c48d484a19856826342a3c7b83766f533d1 100644 --- a/contrib/onelab2/OnelabServer.cpp +++ b/contrib/onelab2/OnelabServer.cpp @@ -13,8 +13,9 @@ #include "OnelabAttributes.h" #include "onelab.h" #include "onelabUtils.h" +#include "OnelabException.h" -// FIXME no Gmsh specific header (used in launchClient) +// FIXME no Gmsh specific header ? #include "StringUtils.h" #include "GmshMessage.h" #include "OS.h" @@ -23,54 +24,159 @@ #ifndef WIN32 void *OnelabServer_run(void *param); +void *listenOnClients(void *param); +void *acceptTcpClient(void *param); +void *acceptUnixClient(void *param); +void *acceptUdtClient(void *param); #else DWORD WINAPI OnelabServer_run(LPVOID param); #endif -OnelabServer::OnelabServer(UInt32 iface, UInt16 port) +OnelabServer::OnelabServer() { _running = false; - _ip.address = iface; - _ip.port = port; -#ifdef HAVE_UDT + _udtServer = false; + _tcpServer = false; + _unixServer = false; + _ipu.address = _ip.address = 0; + _ipu.port = _ip.port = 0; UDT::startup(); - _fdu = udt_socket(_ip, SOCK_STREAM); -#endif - _fds = ip4_socket(_ip, SOCK_STREAM); + + _eid = UDT::epoll_create(); +} + +void OnelabServer::listenOnTcp(unsigned int iface, unsigned short port) +{ + if(_tcpServer) return; + + IPv4 ip = {iface, port}; + _fds = ip4_socket(ip, SOCK_STREAM); ip4_socket_ip(_fds, _ip); + ip4_socket_listen(_fds); + + std::clog << "\033[0;31m" << "listen on TCP - " << ip4_inet_ntop(iface) << ":" << port << "\033[0m" << std::endl; + + pthread_create(&_tcpThread, NULL, acceptTcpClient, NULL); + _tcpServer = true; } -OnelabServer::OnelabServer(UInt16 port) +void OnelabServer::listenOnUnix(const char *sockname) { - _running = false; - _ip.address = 0; - _ip.port = port; +#if !defined(WIN32) || defined(__CYGWIN__) + if(_unixServer) return; -#ifdef HAVE_UDT - UDT::startup(); - _fdu = udt_socket(_ip, SOCK_STREAM); + _fdx = unix_socket(SOCK_STREAM); + unix_socket_listen(_fdx, sockname); + + std::clog << "\033[0;31m" << "listen on UNIX - " << sockname << "\033[0m" << std::endl; + _sockname = std::string(sockname); + + pthread_create(&_unixThread, NULL, acceptUnixClient, NULL); + _unixServer = true; +#else + throw NetworkException(NetworkException::nUnix); #endif - _fds = ip4_socket(_ip, SOCK_STREAM); - ip4_socket_ip(_fds, _ip); } +void OnelabServer::acceptTcp() +{ + IPv4 ip; + while(Socket newcli = ip4_socket_accept(_fds, ip)) { + std::clog << "\033[0;31m" << "accept TCP peer : " << ip4_inet_ntop(ip.address)<< ':' << ip.port << "\033[0m" << std::endl; + UDT::epoll_add_ssock(_eid, newcli); + if(_clients.size() == 0) +#ifndef WIN32 + pthread_create(&_listenThread, NULL, listenOnClients, NULL); +#else + listenThread = CreateThread(NULL, 0, listenOnClients, NULL, 0, NULL); +#endif + } +} +void OnelabServer::acceptUnix() +{ + while(Socket newcli = unix_socket_accept(_fdx)) { + std::clog << "\033[0;31m" << "accept peer on UNIX socket : " << _sockname << "\033[0m" << std::endl; + UDT::epoll_add_ssock(_eid, newcli); + if(_clients.size() == 0) +#ifndef WIN32 + pthread_create(&_listenThread, NULL, listenOnClients, NULL); +#else + listenThread = CreateThread(NULL, 0, listenOnClients, NULL, 0, NULL); +#endif + } +} +void OnelabServer::stopTcp() +{ + if(!_tcpServer) return; + + pthread_cancel(_tcpThread); + close(_fds); + + _tcpServer = false; +} +void OnelabServer::stopUnix() +{ + if(!_unixServer) return; + + pthread_cancel(_unixThread); + unlink(_sockname.c_str()); + _unixServer = false; +} #ifdef HAVE_UDT -void OnelabServer::addClient(std::string name, UDTSOCKET fd, UInt32 ip, UInt16 port) +void OnelabServer::listenOnUdt(unsigned int iface, unsigned short port) { - this->_clients.push_back(OnelabLocalNetworkClient(name, fd, ip, port)); + if(_udtServer) return; + + IPv4 ip = {iface, port}; + _fdu = udt_socket(ip, SOCK_STREAM); + udt_socket_listen(_fdu); + + std::clog << "\033[0;31m" << "listen on UDT - " << ip4_inet_ntop(iface) << ":" << port << "\033[0m" << std::endl; + + pthread_create(&_udtThread, NULL, acceptUdtClient, NULL); + _udtServer = true; +} +void OnelabServer::acceptUdt() +{ + IPv4 ip; + while(Socket newcli = udt_socket_accept(_fdu, ip)) { + std::clog << "\033[0;31m" << "accept peer on UNIX socket\033[0m" << std::endl; + UDT::epoll_add_usock(_eid, newcli); + if(_clients.size() == 0) +#ifndef WIN32 + pthread_create(&_listenThread, NULL, listenOnClients, NULL); +#else + listenThread = CreateThread(NULL, 0, listenOnClients, NULL, 0, NULL); +#endif + } +} +void OnelabServer::stopUdt() +{ + if(!_udtServer) return; + + pthread_cancel(_udtThread); + + _udtServer = false; } +#endif -OnelabLocalNetworkClient *OnelabServer::getClient(UDTSOCKET fd) // UDTSOCKET Socket +OnelabLocalNetworkClient *OnelabServer::getClient(Socket fd) { for(std::vector<OnelabLocalNetworkClient>::iterator it = _clients.begin(); it != _clients.end(); ++it) { +#ifdef HAVE_UDT if(it->getUSocket() == fd) return &(*it); +#endif if(it->getSSocket() == fd) return &(*it); } return NULL; } -#endif -int OnelabServer::launchClient(const std::string &client, bool blocking) // FIXME OnelabDatabase instead of OnelabServer ? +void OnelabServer::addClient(std::string name, Socket fd, UInt32 ip, UInt16 port) +{ + this->_clients.push_back(OnelabLocalNetworkClient(name, fd, ip, port)); +} + +int OnelabServer::launchClient(const std::string &client, bool blocking) { // launch a new client with a system call std::string command = ""; @@ -110,12 +216,25 @@ int OnelabServer::launchClient(const std::string &client, bool blocking) // FIXM char cmd[1024]; // UNIX socket - //TODO sprintf(cmd, command, _sockname.c_str()); - + if(_sockname.size()) + sprintf(cmd, command.c_str(), _sockname.c_str()); // TCP socket - sprintf(cmd, command.c_str(), " %s:%d"); - command.assign(cmd); - sprintf(cmd, command.c_str(), (_ip.address==0)?"127.0.0.1":ip4_inet_ntop(_ip.address).c_str(), _ip.port); + else if(_ip.port > 0) { + sprintf(cmd, command.c_str(), " %s:%d"); + command.assign(cmd); + if(_ip.address > 0) sprintf(cmd, command.c_str(), ip4_inet_ntop(_ip.address).c_str(), _ip.port); + else sprintf(cmd, command.c_str(), "127.0.0.1", _ip.port); + } + // UDP (UDT) socket + else if(_ipu.port > 0){ + sprintf(cmd, command.c_str(), " u%s:%d"); + command.assign(cmd); + sprintf(cmd, command.c_str(), ip4_inet_ntop(_ipu.address).c_str(), _ipu.port); + } + else { + // unknown.... + return 1; + } std::cout << "launch " << client << " with command: " << cmd << std::endl; SystemCall(cmd, blocking); @@ -123,18 +242,6 @@ int OnelabServer::launchClient(const std::string &client, bool blocking) // FIXM return 0; } -//void OnelabServer::addClient(std::string name, Socket fd, UInt32 ip, UInt16 port) -//{ -// if(ip == 0 || port == 0) throw "Unable to add client (invalid ip or port)"; -// this->_clients.push_back(OnelabLocalNetworkClient(name, fd, ip, port)); -//} -//OnelabLocalNetworkClient *OnelabServer::getClient(Socket fd) -//{ -// for(std::vector<OnelabLocalNetworkClient>::iterator it = _clients.begin(); it != _clients.end(); ++it) { -// if(it->getSSocket() == fd) return &(*it); -// } -// return NULL; -//} void OnelabServer::sendto(std::string client, UInt8 *buff, UInt32 len) { for(std::vector<OnelabLocalNetworkClient>::iterator it = this->_clients.begin() ; it != this->_clients.end(); ++it) { @@ -181,14 +288,58 @@ void OnelabServer::removeClient(OnelabLocalNetworkClient *client) } } +void OnelabServer::stopClient(OnelabLocalNetworkClient *cli) +{ + // FIXME check if listenOnClients does not perform action on this client + UInt8 buff[16]; +#ifdef HAVE_UDT + bool isUDT = cli->getSSocket() == 0; + Socket fd = (!isUDT)? cli->getSSocket() : cli->getUSocket(); +#else + Socket fd = cli->getSSocket(); +#endif + OnelabProtocol msg(OnelabProtocol::OnelabStop); + int recvlen = msg.encodeMsg(buff, 16); + onelab::string o(cli->getName() + "/Action", "stop"); + o.setVisible(false); + o.setNeverChanged(true); + set(o); + cli->sendto(buff, recvlen); +#ifdef HAVE_UDT + if(isUDT) { + UDT::epoll_remove_usock(_eid, fd); + UDT::close(fd); + } + else { + UDT::epoll_remove_ssock(_eid, fd); + close(fd); + } +#else + UDT::epoll_remove_ssock(_eid, fd); + close(fd); +#endif + removeClient(cli); +} + +void OnelabServer::stopClients() +{ + std::cout << _clients.size() << std::endl; + for(int i=0; i < _clients.size(); i++) { + stopClient(&_clients[i]); + } +} + +void OnelabServer::waitOnClients() +{ + if(_running) pthread_join(_runningThread, NULL); +} + bool OnelabServer::performNextAction() { - pthread_mutex_lock(&_mutex_todo); - if(_todoClient.empty() || _todoAction.empty()) return false; + if(_todoClient.size() == 0 || _todoAction.size() == 0) return false; std::string client = _todoClient.front(), action = _todoAction.front(); - pthread_mutex_unlock(&_mutex_todo); performAction(action, client, true); pthread_mutex_lock(&_mutex_todo); _todoAction.pop(); @@ -197,7 +348,7 @@ bool OnelabServer::performNextAction() return true; } -void OnelabServer::performAction(const std::string action, const std::string client, bool blocking) +void OnelabServer::performAction(const std::string &action, const std::string &client, bool blocking) { if(blocking) { if(client.size()) { @@ -213,11 +364,11 @@ void OnelabServer::performAction(const std::string action, const std::string cli } if(cli != NULL){ // Gmsh is used as a server and the client is remote std::cout << action << " on " << client << "(client is remote)" << std::endl; - cli->run(action); // block ,use another thread ? + cli->run(action); } else if(localcli != NULL){ // client is local (in the same memory space than this server) std::cout << action << " on " << client << "(client is local)" << std::endl; - localcli->run(action); // block, use another thread ? + localcli->run(action); } else { // client does not exist (Gmsh is used as a server), launch the client std::cout << action << " on " << client << "(launch a new remote client)" << std::endl; @@ -225,25 +376,25 @@ void OnelabServer::performAction(const std::string action, const std::string cli } } else { - // run all non Gmsh clients TODO; exclude GUI ? + // run all non Gmsh clients for(std::vector<OnelabLocalNetworkClient>::iterator it = _clients.begin(); it != _clients.end(); ++it) { - if((*it).getName() == "Gmsh") continue; + if((*it).getName() == "Gmsh" || (*it).getName() == "GUI") continue; std::cout << action << " on " << (*it).getName() << "(remote)" << std::endl; onelab::string o((*it).getName() + "/Action", action); o.setVisible(false); o.setNeverChanged(true); set(o, (*it).getName()); - (*it).run(action); + (*it).run(action); // FIXME Block } - for(std::vector<OnelabLocalClient *>::iterator it = _localClients.begin(); it != _localClients.end(); ++it) { - if((*it)->getName() == "Gmsh") continue; - onelab::string o((*it)->getName() + "/Action", action); - o.setVisible(false); - o.setNeverChanged(true); - set(o); - std::cout << action << " on " << (*it)->getName() << "(local)" << std::endl; - (*it)->run(action); + for(std::vector<OnelabLocalClient *>::iterator it = _localClients.begin(); it != _localClients.end(); ++it) { + if((*it)->getName() == "Gmsh") continue; + onelab::string o((*it)->getName() + "/Action", action); + o.setVisible(false); + o.setNeverChanged(true); + set(o); + std::cout << action << " on " << (*it)->getName() << "(local)" << std::endl; + (*it)->run(action); } } } @@ -255,14 +406,30 @@ void OnelabServer::performAction(const std::string action, const std::string cli pthread_mutex_unlock(&_mutex_todo); } else { + _running = true; _mutex_todo = PTHREAD_MUTEX_INITIALIZER; - _todoClient.push(client); - _todoAction.push(action); + _todoClient.push(std::string(client)); + _todoAction.push(std::string(action)); pthread_create(&_runningThread, NULL, OnelabServer_run, NULL); } } } +void *acceptTcpClient(void *param) +{ + OnelabServer::instance()->acceptTcp(); +} +void *acceptUnixClient(void *param) +{ + OnelabServer::instance()->acceptUnix(); +} +void *acceptUdtClient(void *param) +{ +#ifdef HAVE_UDT + OnelabServer::instance()->acceptUdt(); +#endif +} + #ifndef WIN32 void *OnelabServer_run(void *param) #else @@ -276,7 +443,6 @@ DWORD WINAPI OnelabServer_run(LPVOID param) OnelabServer::instance()->running(false); } -#ifdef HAVE_UDT #ifndef WIN32 void *listenOnClients(void *param) #else @@ -284,14 +450,16 @@ void *listenOnClients(void *param) #endif { IPv4 ip; - std::set<UDTSOCKET> fdus; - std::set<Socket> fdss; int recvlen = 0; UInt8 buff[1024]; OnelabProtocol msg(-1), rep(-1); int eid = OnelabServer::instance()->getEID(); + std::set<Socket> fdss; +#ifdef HAVE_UDT + std::set<UDTSOCKET> fdus; while(UDT::ERROR != UDT::epoll_wait(eid, &fdus, NULL, -1, &fdss, NULL)) { - /*for(std::set<UDTSOCKET>::iterator it = fdus.begin(); it != fdus.end(); ++it) { + /* + for(std::set<UDTSOCKET>::iterator it = fdus.begin(); it != fdus.end(); ++it) { OnelabLocalNetworkClient *cli = OnelabServer::instance()->getClient(*it); if(cli == NULL) { // Client is not in the list (it must be a Start message) IPv4 ip; @@ -464,30 +632,32 @@ void *listenOnClients(void *param) break; } } - }*/ - - for(std::set<Socket>::iterator it = fdss.begin(); it != fdss.end(); ++it) { - + } + */ +#else + while(UDT::ERROR != UDT::epoll_wait(eid, NULL, NULL, -1, &fdss, NULL)) { +#endif + for(std::set<Socket>::iterator it = fdss.begin(); it != fdss.end(); ++it) { // For TCP and UNIX OnelabLocalNetworkClient *cli = OnelabServer::instance()->getClient(*it); if(cli == NULL) { // Client is not in the list (we should get a Start message) IPv4 ip; // recv the header - recvlen = ip4_socket_recv(*it, buff, 4, ip); + recvlen = ip4_socket_recv(*it, buff, 4); if(recvlen != 4) { // invalid message header UDT::epoll_remove_ssock(eid, *it); - UDT::close(*it); + close(*it); continue; } int msglen = msg.parseHeader(buff, recvlen); if(msglen > 1024) { // FIXME? buffer is too small UDT::epoll_remove_ssock(eid, *it); - UDT::close(*it); + close(*it); continue; } // then recv the message - recvlen = ip4_socket_recv(*it, buff, msglen, ip); + recvlen = ip4_socket_recv(*it, buff, msglen); msg.parseMessage(buff, recvlen); if(msg.msgType() == OnelabProtocol::OnelabStart && msg.attrs.size() > 0 && msg.attrs[0]->getAttributeType() == OnelabAttr::Start) { std::string name = std::string(((OnelabAttrStart *)msg.attrs[0])->name()); @@ -498,7 +668,7 @@ void *listenOnClients(void *param) recvlen = rep.encodeMsg(buff, 1024); ip4_socket_send(*it, buff, recvlen); UDT::epoll_remove_ssock(eid, *it); - UDT::close(*it); + close(*it); continue; } // Add a new remote client @@ -514,7 +684,7 @@ void *listenOnClients(void *param) else { // cli shoud send a name first UDT::epoll_remove_ssock(eid, *it); - UDT::close(*it); + close(*it); continue; } } @@ -527,7 +697,7 @@ void *listenOnClients(void *param) std::cout << "\033[0;31m" << "Connection with client \"" << cli->getName() << "\" was broken, removing the client." << "\033[0m" << std::endl; // DEBUG UDT::epoll_remove_ssock(eid, *it); OnelabServer::instance()->removeClient(cli); - UDT::close(*it); + close(*it); } std::cerr << "Error while recv message." << std::endl; continue; @@ -536,20 +706,13 @@ void *listenOnClients(void *param) std::cout << "\033[0;31m" << "Connection with client \"" << cli->getName() << "\" was broken, removing the client." << "\033[0m" << std::endl; // DEBUG UDT::epoll_remove_ssock(eid, *it); OnelabServer::instance()->removeClient(cli); - UDT::close(*it); + close(*it); continue; } switch (msg.msgType()) { case OnelabProtocol::OnelabStop: std::cout << "\033[0;31m" << "Client \"" << cli->getName() << "\" is going to stop" << "\033[0m" << std::endl; // DEBUG - rep.msgType(OnelabProtocol::OnelabStop); - recvlen = rep.encodeMsg(buff, 1024); - if(ip4_socket_connected(cli->getSSocket())) // FIXME cli can close socket before send - cli->sendto(buff, recvlen); - //UDT::epoll_remove_usock(eid, *it); - UDT::epoll_remove_ssock(eid, *it); - OnelabServer::instance()->removeClient(cli); - UDT::close(*it); + OnelabServer::instance()->stopClient(cli); break; case OnelabProtocol::OnelabMessage: if(msg.attrs.size()==1 && msg.attrs[0]->getAttributeType() == OnelabAttrMessage::attributeType()) { @@ -682,7 +845,6 @@ void *listenOnClients(void *param) } } } -#endif void OnelabServer::sendAllParameter(OnelabLocalNetworkClient *cli) { @@ -700,42 +862,22 @@ void OnelabServer::sendAllParameter(OnelabLocalNetworkClient *cli) } } -void OnelabServer::Run() +void OnelabServer::finalize() { - UInt32 bufflen = 1024, recvlen = 0; - UInt8 buff[1024]; - IPv4 ip; - OnelabProtocol msg(-1), rep(OnelabProtocol::OnelabResponse); - OnelabLocalNetworkClient *currentClient = NULL; - -#ifdef HAVE_UDT - UDTSOCKET newcli = 0; -#ifndef WIN32 - pthread_t listen_thread; -#else - HANDLER listen_thread; -#endif - - _eid = UDT::epoll_create(); - - udt_socket_listen(_fdu); - ip4_socket_listen(_fds); - std::clog << "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(tcp)" << std::endl; - // << "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(udp/udt)" << std::endl; - //while(newcli = udt_socket_accept(_fdu, ip)) { // TODO accept udt and tcp ? - - while(newcli = ip4_socket_accept(_fds, ip)) { - std::clog << "\033[0;31m" << "accpet peer : " << ip4_inet_ntop(ip.address)<< ':' << ip.port << "\033[0m" << std::endl; - //UDT::epoll_add_usock(_eid, newcli); - UDT::epoll_add_ssock(_eid, newcli); - if(_clients.size() == 0) -#ifndef WIN32 - pthread_create(&listen_thread, NULL, listenOnClients, NULL); -#else - listen_thread = CreateThread(NULL, 0, listenOnClients, NULL, 0, NULL); -#endif + stopClients(); +#if !defined(WIN32) || defined(__CYGWIN__) + if(_sockname.size()) { + unlink(_sockname.c_str()); } - udt_socket_close(_fdu); #endif - ip4_socket_close(_fds); +} +void OnelabServer::running(bool running) +{ + _running = running; + if(!_running){ + OnelabLocalClient *localgui = OnelabServer::instance()->getLocalClient("localGUI"); + OnelabLocalNetworkClient *gui = OnelabServer::instance()->getClient("GUI"); + if(localgui) localgui->onStop(); + if(gui) ;// TODO + } } diff --git a/contrib/onelab2/OnelabServer.h b/contrib/onelab2/OnelabServer.h index 8682d91a1e39c7a49c48ef629ae9bdd54cff3da5..a1789672b8a1b28447f54402c13679390f51caed 100644 --- a/contrib/onelab2/OnelabServer.h +++ b/contrib/onelab2/OnelabServer.h @@ -20,58 +20,69 @@ class OnelabServer { private: static OnelabServer *_server; - IPv4 _ip; + std::string _sockname; + IPv4 _ip, // TCP + _ipu; // UDT std::vector<OnelabLocalNetworkClient> _clients; std::vector<OnelabLocalClient *> _localClients; #ifndef WIN32 - pthread_t _runningThread; + pthread_t _runningThread, + _listenThread, // thread to listen on connected clients + _tcpThread, _udtThread, _unixThread; // thread to accept new clients pthread_mutex_t _mutex_todo; #else - HANDLER _runningThread; + HANDLER _runningThread, + _listenThread, + _tcpThread, _udtThread, _unixThread; #endif - bool _running; + bool _running, + _tcpServer, _udtServer, _unixServer; std::queue<std::string> _todoClient; std::queue<std::string> _todoAction; onelab::parameterSpace _parameterSpace; - Socket _fds; + Socket _fds, _fdx; // system socket (TCP and UNIX) + int _eid; #ifdef HAVE_UDT UDTSOCKET _fdu; - int _eid; - void sendto(std::string client, UInt8 *buff, UInt32 len); #endif + void sendto(std::string client, UInt8 *buff, UInt32 len); + public: - OnelabServer(UInt16 port); - OnelabServer(UInt32 iface, UInt16 port); - static OnelabServer *instance(const UInt32 iface=0, const UInt16 port=0) { - if(!_server) _server = new OnelabServer(iface, port); - else if(iface != 0 || port != 0) { - delete _server; - _server = new OnelabServer(iface, port); - } + OnelabServer(); + static OnelabServer *instance() { + if(!_server) _server = new OnelabServer(); return _server; } static void setInstance(OnelabServer *s) { _server = s; } + void listenOnTcp(unsigned int iface=0, unsigned short port=0); + void listenOnUnix(const char *sockname); + void acceptTcp(); + void acceptUnix(); + void stopTcp(); + void stopUnix(); onelab::parameterSpace *getParameterSpace() {return &_parameterSpace;} UInt16 getPort() { return _ip.port;} #ifdef HAVE_UDT ~OnelabServer(){UDT::cleanup();} + void listenOnUdt(unsigned int iface=0, unsigned short port=0); + void acceptUdt(); + void stopUdt(); #else ~OnelabServer(){} #endif - void Run(); + void finalize(); bool isRunning() const {return _running;} - void running(bool running) {_running = running;} + void running(bool running); // Client methods -#ifdef HAVE_UDT inline int getEID() const {return _eid;} - void addClient(std::string name, UDTSOCKET fd, UInt32 ip, UInt16 port); - OnelabLocalNetworkClient *getClient(UDTSOCKET fd); -#else + OnelabLocalNetworkClient *getClient(Socket fd); void addClient(std::string name, UInt32 ip, UInt16 port); -#endif void addClient(OnelabLocalClient *cli) {_localClients.push_back(cli);} + void addClient(std::string name, Socket fd, UInt32 ip, UInt16 port); int launchClient(const std::string &, bool blocking=false); void removeClient(OnelabLocalNetworkClient *client); + void stopClient(OnelabLocalNetworkClient *cli); + void stopClients(); std::vector<OnelabLocalNetworkClient> &getClients() {return _clients;} std::vector<OnelabLocalClient *> &getLocalClients() {return _localClients;} OnelabLocalNetworkClient *getClient(const UInt32 ip, const UInt16 port); @@ -153,7 +164,8 @@ public: void setChanged(bool changed, const std::string &client="") { _parameterSpace.setChanged(changed, client); } - void performAction(const std::string action, const std::string client="", bool blocking=false); + void performAction(const std::string &action, const std::string &client="", bool blocking=false); bool performNextAction(); + void waitOnClients(); }; #endif diff --git a/contrib/onelab2/UdtUtils.h b/contrib/onelab2/UdtUtils.h index 26e12a9d5c020b18fd8b67557cfdfa0d012efd9f..b6f67e842ee2b0f70ea5577715c8fe57d5f425bc 100644 --- a/contrib/onelab2/UdtUtils.h +++ b/contrib/onelab2/UdtUtils.h @@ -1,8 +1,8 @@ #ifndef _UDTUTILS_H_ #define _UDTUTILS_H_ -#include <iostream>// FIXME debug only -#include <udt.h> +#include <iostream> #include "NetworkUtils.h" +#include <udt.h> inline UDTSOCKET udt_socket(IPv4 ip, int socketType) {