Skip to content
Snippets Groups Projects
Commit 1823c273 authored by Maxime Graulich's avatar Maxime Graulich
Browse files

edit recv for onelab2 server

parent 707f0f21
Branches
Tags
No related merge requests found
...@@ -38,6 +38,21 @@ int OnelabLocalNetworkClient::recvfrom(UInt8 *buff, unsigned int maxlen) ...@@ -38,6 +38,21 @@ int OnelabLocalNetworkClient::recvfrom(UInt8 *buff, unsigned int maxlen)
// return ip4_socket_recv(_fds, buff, maxlen); // return ip4_socket_recv(_fds, buff, maxlen);
//#endif //#endif
} }
int OnelabLocalNetworkClient::recvmsg(OnelabProtocol &msg)
{
UInt8 header[8];
UInt8 *buff = NULL;
int recvlen = 0;
// recv the header
recvlen = recvfrom(header, 4);
if(recvlen != 4) return recvlen;
int msglen = msg.parseHeader(header, recvlen);
// then recv the message
buff = (UInt8 *) malloc(sizeof(UInt8)*msglen);
recvlen = recvfrom(buff, msglen); // recvlen should be equals to msglen
msg.parseMessage(buff, recvlen);
return recvlen + 4;
}
void OnelabLocalNetworkClient::updateParameter(onelab::parameter *p) void OnelabLocalNetworkClient::updateParameter(onelab::parameter *p)
{ {
OnelabProtocol msg(OnelabProtocol::OnelabUpdate); OnelabProtocol msg(OnelabProtocol::OnelabUpdate);
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include "onelab.h" #include "onelab.h"
//#ifdef HAVE_UDT //#ifdef HAVE_UDT
#include "UdtUtils.h" #include "UdtUtils.h"
#include "OnelabProtocol.h"
//#else //#else
//#include "NetworkUtils.h" //#include "NetworkUtils.h"
//#endif //#endif
...@@ -25,6 +26,7 @@ public: ...@@ -25,6 +26,7 @@ public:
OnelabLocalNetworkClient(std::string name, Socket fd, unsigned int ip, unsigned short port); OnelabLocalNetworkClient(std::string name, Socket fd, unsigned int ip, unsigned short port);
void sendto(UInt8 *buff, unsigned int len); void sendto(UInt8 *buff, unsigned int len);
int recvfrom(UInt8 *buff, unsigned int maxlen); int recvfrom(UInt8 *buff, unsigned int maxlen);
int recvmsg(OnelabProtocol &msg);
UDTSOCKET getSSocket() {return _fds;} UDTSOCKET getSSocket() {return _fds;}
UDTSOCKET getUSocket() {return _fdu;} UDTSOCKET getUSocket() {return _fdu;}
virtual ~OnelabLocalNetworkClient(){} virtual ~OnelabLocalNetworkClient(){}
......
...@@ -39,6 +39,86 @@ unsigned short OnelabProtocol::encodeMsg(UInt8 *buff, UInt32 len) ...@@ -39,6 +39,86 @@ unsigned short OnelabProtocol::encodeMsg(UInt8 *buff, UInt32 len)
encode(sizeptr, _size); encode(sizeptr, _size);
return (unsigned short)(ptr-buff); return (unsigned short)(ptr-buff);
} }
int OnelabProtocol::parseHeader(UInt8 *buff, UInt32 len)
{
this->clearAttrs();
if(len < 4) throw ERROR_BUFFER_TOO_SMALL;
UInt8 *ptr = buff;
UInt8 version = 0;
ptr = parse(ptr, version);
if(version != ONELAB_VERSION) throw ERROR_ONELAB_VERSION;
ptr = parse(ptr, _type);
ptr = parse(ptr, _size);
return _size;
}
UInt32 OnelabProtocol::parseMessage(UInt8 *buff, UInt32 len)
{
UInt8 *ptr = buff;
UInt8 *payload = ptr;
unsigned short parsed = 4;
unsigned short size = _size;
while(size >= 4) {
UInt16 attrType = 0;
UInt16 attrSize = 0;
ptr = parse(ptr, attrType);
ptr = parse(ptr, attrSize);
size -= 4;
std::cout << "Try to parse an attribute of type 0x" << std::hex << (UInt16)attrType << std::dec << " and size : " << attrSize << std::endl;
if(attrSize > size) throw ERROR_BUFFER_TOO_SMALL;
switch(attrType) {
case OnelabAttr::Message:
this->attrs.push_back(new OnelabAttrMessage());
((OnelabAttrMessage *)this->attrs.back())->parseAttribute(ptr, attrSize);
break;
case OnelabAttr::Number:
this->attrs.push_back(new onelab::number());
((onelab::number *)this->attrs.back())->parseAttribute(ptr, attrSize);
break;
case OnelabAttr::String:
this->attrs.push_back(new onelab::string());
((onelab::string *)this->attrs.back())->parseAttribute(ptr, attrSize);
break;
case OnelabAttr::Region:
this->attrs.push_back(new onelab::region());
((onelab::region *)this->attrs.back())->parseAttribute(ptr, attrSize);
break;
case OnelabAttr::Function:
this->attrs.push_back(new onelab::region());
((onelab::function *)this->attrs.back())->parseAttribute(ptr, attrSize);
break;
case OnelabAttr::Start:
this->attrs.push_back(new OnelabAttrStart());
((onelab::string *)this->attrs.back())->parseAttribute(ptr, attrSize);
break;
case OnelabAttr::Parameter:
this->attrs.push_back(new OnelabAttrParameterQuery());
((OnelabAttrParameterQuery *)this->attrs.back())->parseAttribute(ptr, attrSize);
break;
case 0x0b:
this->attrs.push_back(new OnelabAttrFileQuery());
((OnelabAttrFileQuery *)this->attrs.back())->parseAttribute(ptr, attrSize);
break;
case 0x0c:
this->attrs.push_back(new OnelabAttrFile());
((OnelabAttrFile *)this->attrs.back())->parseAttribute(ptr, attrSize);
break;
default:
// FIXME unknown attribute
//if(attrSize != 0) throw "Size of attr must be 0!";
/*this->attrs.push_back(new OnelabAttr(attrType));
this->attrs.back()->parseAttribute(ptr, &attrSize);*/
break;
}
ptr += attrSize;
size -= attrSize;
parsed += attrSize+4;
}
if(parsed != len) {std::cout << "parse - size left:" << len-parsed << '-' << size << "(len is "<< len <<" and parsed is "<< parsed <<" )" << std::endl;}
return len-parsed;
}
UInt32 OnelabProtocol::parseMsg(UInt8 *buff, UInt32 len) UInt32 OnelabProtocol::parseMsg(UInt8 *buff, UInt32 len)
{ {
this->clearAttrs(); this->clearAttrs();
......
...@@ -21,6 +21,8 @@ public: ...@@ -21,6 +21,8 @@ public:
void clearAttrs(); void clearAttrs();
unsigned short encodeMsg(UInt8 *buff, UInt32 len); unsigned short encodeMsg(UInt8 *buff, UInt32 len);
UInt32 parseMsg(UInt8 *buff, UInt32 len); UInt32 parseMsg(UInt8 *buff, UInt32 len);
int parseHeader(UInt8 *buff, UInt32 len);
UInt32 parseMessage(UInt8 *buff, UInt32 len);
void showMsg(); void showMsg();
short msgType() {return _type;} short msgType() {return _type;}
......
...@@ -115,26 +115,24 @@ DWORD WINAPI listenOnClients(LPVOID param) ...@@ -115,26 +115,24 @@ DWORD WINAPI listenOnClients(LPVOID param)
std::set<Socket> fdss; std::set<Socket> fdss;
int recvlen = 0; int recvlen = 0;
UInt8 prev[1024]; UInt8 prev[1024];
UInt32 unreaded = 0;
UInt8 buff[1024]; UInt8 buff[1024];
OnelabProtocol msg(-1), rep(-1); OnelabProtocol msg(-1), rep(-1);
int eid = OnelabServer::instance()->getEID(); int eid = OnelabServer::instance()->getEID();
while(UDT::ERROR != UDT::epoll_wait(eid, &fdus, NULL, -1, &fdss)) { while(UDT::ERROR != UDT::epoll_wait(eid, &fdus, NULL, -1, &fdss, NULL)) {
std::cout << "ok" << std::endl;
//for(std::set<UDTSOCKET>::iterator it = fdus.begin(); it != fdus.end(); ++it) { //for(std::set<UDTSOCKET>::iterator it = fdus.begin(); it != fdus.end(); ++it) {
for(std::set<Socket>::iterator it = fdss.begin(); it != fdss.end(); ++it) { for(std::set<Socket>::iterator it = fdss.begin(); it != fdss.end(); ++it) {
OnelabLocalNetworkClient *cli = OnelabServer::instance()->getClient(*it); OnelabLocalNetworkClient *cli = OnelabServer::instance()->getClient(*it);
std::cout << "ok ->" << (void*)cli << std::endl;
if(cli == NULL) { if(cli == NULL) {
IPv4 ip; IPv4 ip;
//recvlen = udt_socket_recv(*it, buff, 1024); //recvlen = udt_socket_recv(*it, buff, 1024);
recvlen = ip4_socket_recv(*it, buff, 1024, ip); recvlen = ip4_socket_recv(*it, buff, 1024, ip); // FIXME
std::clog << "recv " << recvlen << std::endl; std::clog << "recv " << recvlen << std::endl;
msg.parseMsg(buff, recvlen); msg.parseMsg(buff, recvlen);
msg.showMsg(); msg.showMsg();
if(msg.msgType() == OnelabProtocol::OnelabStart && msg.attrs.size() > 0 && msg.attrs[0]->getAttributeType() == OnelabAttr::Start) { if(msg.msgType() == OnelabProtocol::OnelabStart && msg.attrs.size() > 0 && msg.attrs[0]->getAttributeType() == OnelabAttr::Start) {
std::string name = std::string(((OnelabAttrStart *)msg.attrs[0])->name()); std::string name = std::string(((OnelabAttrStart *)msg.attrs[0])->name());
if(OnelabServer::instance()->getClient(name) != NULL) { if(OnelabServer::instance()->getClient(name) != NULL) {
std::cout << "A client exist with this name !" << std::endl;
rep.msgType(OnelabProtocol::OnelabMessage); rep.msgType(OnelabProtocol::OnelabMessage);
rep.attrs.push_back(new OnelabAttrMessage("A client exist with this name !", OnelabAttrMessage::Fatal)); rep.attrs.push_back(new OnelabAttrMessage("A client exist with this name !", OnelabAttrMessage::Fatal));
recvlen = rep.encodeMsg(buff, 1024); recvlen = rep.encodeMsg(buff, 1024);
...@@ -151,7 +149,7 @@ DWORD WINAPI listenOnClients(LPVOID param) ...@@ -151,7 +149,7 @@ DWORD WINAPI listenOnClients(LPVOID param)
recvlen = rep.encodeMsg(buff, 1024); recvlen = rep.encodeMsg(buff, 1024);
cli = OnelabServer::instance()->getClient(*it); cli = OnelabServer::instance()->getClient(*it);
cli->sendto(buff, recvlen); cli->sendto(buff, recvlen);
//FIXME OnelabServer::instance()->sendAllParameter(cli); OnelabServer::instance()->sendAllParameter(cli);
continue; continue;
} }
else { else {
...@@ -163,20 +161,24 @@ DWORD WINAPI listenOnClients(LPVOID param) ...@@ -163,20 +161,24 @@ DWORD WINAPI listenOnClients(LPVOID param)
} }
} }
else { else {
std::cout << "ok" << std::endl;
try { try {
//recvlen = cli->recvfrom(buff, 1024); recvlen = cli->recvmsg(msg);
std::cout << "i've " << unreaded << "bytes from previous packet" << std::endl; }
memcpy((char*)buff, (char*)prev, unreaded); catch(int &e) {
recvlen = cli->recvfrom(buff+unreaded, 1024-unreaded); if(e == 50) { // Recv error (TCP)
recvlen += unreaded;
}
catch(int &e) { // for UDT
if(UDT::getlasterror().getErrorCode() == 2001) { // ECONNLOST
std::cout << "\033[0;31m" << "Connection with client \"" << cli->getName() << "\" was broken, removing the client." << "\033[0m" << std::endl; // DEBUG std::cout << "\033[0;31m" << "Connection with client \"" << cli->getName() << "\" was broken, removing the client." << "\033[0m" << std::endl; // DEBUG
UDT::epoll_remove_usock(eid, *it); UDT::epoll_remove_ssock(eid, *it);
OnelabServer::instance()->removeClient(cli); OnelabServer::instance()->removeClient(cli);
UDT::close(*it); UDT::close(*it);
} }
// TODO for UDT
/*if(UDT::getlasterror().getErrorCode() == 2001 || e == 50) { // ECONNLOST
std::cout << "\033[0;31m" << "Connection with client \"" << cli->getName() << "\" was broken, removing the client." << "\033[0m" << std::endl; // DEBUG
UDT::epoll_remove_usock(eid, *it);
OnelabServer::instance()->removeClient(cli);
UDT::close(*it);
}*/
continue; continue;
} }
if(recvlen == 0) { // for TCP if(recvlen == 0) { // for TCP
...@@ -186,11 +188,8 @@ DWORD WINAPI listenOnClients(LPVOID param) ...@@ -186,11 +188,8 @@ DWORD WINAPI listenOnClients(LPVOID param)
UDT::close(*it); UDT::close(*it);
continue; continue;
} }
//std::clog << "recv " << recvlen << " bytes on client " << cli->getName() << std::endl; std::clog << "recv " << recvlen << " bytes on client " << cli->getName() << std::endl;
unreaded = msg.parseMsg(buff, recvlen); //unreaded = msg.parseMsg(buff, recvlen);
if(unreaded > 0) {
memcpy((char*)prev, (char*)buff+(recvlen-unreaded), unreaded);
}
switch (msg.msgType()) { switch (msg.msgType()) {
case OnelabProtocol::OnelabStop: case OnelabProtocol::OnelabStop:
std::cout << "\033[0;31m" << "Client \"" << cli->getName() << "\" is going to stop" << "\033[0m" << std::endl; // DEBUG std::cout << "\033[0;31m" << "Client \"" << cli->getName() << "\" is going to stop" << "\033[0m" << std::endl; // DEBUG
...@@ -348,10 +347,10 @@ void OnelabServer::Run() ...@@ -348,10 +347,10 @@ void OnelabServer::Run()
udt_socket_listen(_fdu); udt_socket_listen(_fdu);
ip4_socket_listen(_fds); ip4_socket_listen(_fds);
std::clog << "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(tcp)" << std::endl std::clog << "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(tcp)" << std::endl;
<< "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(udp/udt)" << std::endl; // << "listen on " << ip4_inet_ntop(_ip.address) << ":" << _ip.port << "(udp/udt)" << std::endl;
//while(newcli = udt_socket_accept(_fdu, ip)) { // TODO accept ip4 //while(newcli = udt_socket_accept(_fdu, ip)) { // TODO accept udt and tcp ?
while(newcli = ip4_socket_accept(_fds, ip)) { // TODO accept ip4 while(newcli = ip4_socket_accept(_fds, ip)) {
std::clog << "\033[0;31m" << "accpet peer : " << ip4_inet_ntop(ip.address)<< ':' << ip.port << "\033[0m" << std::endl; std::clog << "\033[0;31m" << "accpet peer : " << ip4_inet_ntop(ip.address)<< ':' << ip.port << "\033[0m" << std::endl;
//UDT::epoll_add_usock(_eid, newcli); //UDT::epoll_add_usock(_eid, newcli);
UDT::epoll_add_ssock(_eid, newcli); UDT::epoll_add_ssock(_eid, newcli);
...@@ -361,12 +360,13 @@ void OnelabServer::Run() ...@@ -361,12 +360,13 @@ void OnelabServer::Run()
#else #else
listen_thread = CreateThread(NULL, 0, listenOnClients, NULL, 0, NULL); listen_thread = CreateThread(NULL, 0, listenOnClients, NULL, 0, NULL);
#endif #endif
} if(_clients.size() == 1)
#ifndef WIN32 #ifndef WIN32
pthread_join(listen_thread, NULL); pthread_join(listen_thread, NULL);
#else #else
WaitForSingleObject(listen_thread, INFINITE); WaitForSingleObject(listen_thread, INFINITE);
#endif #endif
}
udt_socket_close(_fdu); udt_socket_close(_fdu);
#else #else
// TODO // TODO
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment