#include "user_manager.h"

using namespace UserManagement;
using namespace Database;

// Storage for the class-wide statics declared in the header.
callbackVector UserHandler::functions;
UserManager* UserManager::mgr;

// Registers every protocol command with UserHandler, allocates the heartbeat
// monitor and calls Create() on the manager itself.
UserManager::UserManager()
{
    // register user handler commands
    UserHandler::RegisterCmd(UserHandler::login,"LOGIN","Login a new user");
    UserHandler::RegisterCmd(UserHandler::logout,"LOGOUT","Logout a user");
    UserHandler::RegisterCmd(UserHandler::src,"SRC","Request a src IP address");
    UserHandler::RegisterCmd(UserHandler::dst,"DST","Request a dst IP address");
    //UserHandler::RegisterCmd(UserHandler::port,"PORT","Specify 3 port numbers for tcp, udp and ping traffic");
    UserHandler::RegisterCmd(UserHandler::ping,"PING","Receive a heartbeat");
    UserHandler::RegisterCmd(UserHandler::reg,"REGISTER","Register a new client");
    UserHandler::RegisterCmd(UserHandler::stats,"STATS","Receive stats command");
    UserHandler::RegisterCmd(UserHandler::status,"GETSTATS","Request status from the server");

    // queue();
    // create heartbeat monitor
    // BeatMonitor *monitor = new BeatMonitor(this);
    // monitor->Run();
    monitor = new BeatMonitor(5000);
    Create();
}

UserManager::~UserManager()
{
    // destruct
    // NOTE(review): `monitor` is allocated in the constructor and not released
    // here - confirm whether BeatMonitor deletes itself.
}

// Returns the process-wide manager, creating it on first use.
// NOTE(review): the lazy creation is not guarded by a lock; presumably the
// first call happens before any handler thread exists - confirm.
UserManager* UserManager::getManager()
{
    if (!mgr)
        mgr = new UserManager();
    return mgr;
}

// Drains the event queue, oldest event first, and sends the matching
// command(s) to the affected clients:
//   CONNECT - tells data1 that data2 is its destination ("dst") and tells
//             data2 that data1 is its source ("src"); if both refer to the
//             same user that user is told to "wait" instead.
//   WAIT    - tells data1 to "wait".
// NOTE(review): `queue` is also modified by QueueEvent(); no lock around the
// queue is visible in this file - confirm how access is serialised.
void UserManager::ProcessEvents()
{
    while (queue.size() > 0)
    {
        // QueueEvent() inserts at the front, so the back holds the oldest
        // entry.  `evt` is a copy, so it is safe to pop before processing.
        UserEvent evt = queue.back();
        queue.pop_back();

        // Every branch below dereferences data1; refuse to crash on an event
        // that carries no user.
        if (evt.data1 == NULL)
        {
            wxLogError("Dropping %s event without a user",evt.sEventType.c_str());
            continue;
        }

        if (evt.sEventType.CompareTo("CONNECT")==0)
        {
            if (evt.data2 == NULL)
            {
                wxLogError("Dropping %s event without a user",evt.sEventType.c_str());
                continue;
            }

            if (evt.data1!=evt.data2)
            {
                // take both users and connect them to each other
                wxString cmd1;
                // data1 contains first in ring, send command to make
                // data2 the destination of data1, give them data2's udp port
                cmd1.Printf("dst %s@%s:%d\r\n",evt.data2->username.c_str(),evt.data2->ip.c_str(),evt.data2->udp_port);
                if (!Network::SendCommand(cmd1,evt.data1->ip,evt.data1->tcp_port))
                    wxLogMessage("COMMAND %s failed for %s",cmd1.c_str(),evt.data1->username.c_str());

                // now swap, data2 must receive from data1.
                cmd1.Printf("src %s@%s\r\n",evt.data1->username.c_str(),evt.data1->ip.c_str());
                if (!Network::SendCommand(cmd1,evt.data2->ip,evt.data2->tcp_port))
                    wxLogMessage("COMMAND %s failed for %s",cmd1.c_str(),evt.data2->username.c_str());
            }
            else
            {
                //wxLogMessage("Making %s wait",evt.data1->username.c_str());
                wxString cmd1 = "wait\r\n";
                Network::SendCommand(cmd1,evt.data1->ip,evt.data1->tcp_port);
            }
        }
        else if (evt.sEventType.CompareTo("WAIT")==0)
        {
            wxString cmd1 = "wait\r\n";
            Network::SendCommand(cmd1,evt.data1->ip,evt.data1->tcp_port);
        }
    }
}

// Thread body of the manager: listens on USER_PORT and, until `killsignal`
// is set, alternates between processing queued events and accepting one
// pending connection, which is handed to a new UserHandler.
wxThread::ExitCode UserManager::Entry()
{
    //monitor->Run();
    // create a listening TCP socket on the USER_PORT
    Network::TCP_socket server = Network::StartServer(killsignal,USER_PORT);

    while (!killsignal)
    {
        // process events....
        ProcessEvents();

        // accept a connection coming in on server_tcpsock
        Network::TCP_socket new_tcpsock;
        new_tcpsock=Network::AcceptSocket(server);
        if(new_tcpsock)
        {
            // communicate over new_tcpsock
            UserHandler *h = new UserHandler(new_tcpsock);
            if (h->Run() != wxTHREAD_NO_ERROR)
            {
                // The handler never started (for example its constructor
                // returned before Create()), so nothing else will ever free
                // it or close its socket.
                wxLogError(wxT("Failed to start a handler for a new connection"));
                delete h;
            }
        }
        wxThread::Sleep(100);
    }
    return 0;
}

// Adds `event` to the queue.  Events are inserted at the front and consumed
// from the back by ProcessEvents(), i.e. first in, first out.
void UserManager::QueueEvent(UserEvent event)
{
    // add event to the queue
    queue.insert(queue.begin(),1,event);
}

/*
    UserHandler class
*/

// Takes over `m_socket`, opens a UserTable for this handler and calls
// Create().  If the table cannot be opened the error is logged and Create()
// is NOT called.
UserHandler::UserHandler(TCP_socket m_socket)
{
    socket = m_socket;
    valid = false;
    // src/dst/port/stats read these before any LOGIN may have happened on
    // this connection, so they must start in a defined state.
    loggedIn = false;
    data = NULL;

    table = new UserTable();
    if (!table->Open())
    {
        // An error occurred opening (setting up) the table
        wxLogError(wxT("Thread %d: Failed to open usertable for insertion of data!"),(int)GetId());
        return;
    }
    // construct our socket
    Create();
}

// Closes the client socket and releases the table.
UserHandler::~UserHandler()
{
    Network::Close(socket);
    // destruct
    delete table;
}

// Thread body of a handler: reads a command from the socket, dispatches it
// to the matching registered callback and writes the callback's reply
// followed by "\r\n".  Unknown commands get no reply.  The loop ends when
// `killsignal` is set, TestDestroy() reports true or the read fails.
wxThread::ExitCode UserHandler::Entry()
{
    // start doing stuff
    wxLogMessage("Userhandler entering...");
    while (!killsignal)
    {
        // read a char from the stream and see what command we should do
        if (TestDestroy())
        {
            killsignal=true;
            break;
        }

        // FIXME: Should wait for socket read here...
        wxString data;
        if (!Network::ReadChars(socket,data))
            break;

        wxLogDebug("CMD : %s",data.c_str());
        wxString args;
        callback fun = MatchFunction( data , args );
        if ( fun != NULL )
        {
            wxString databack = fun( this,args );
            databack += "\r\n";
            WriteChars( socket,(void *)databack.c_str(),databack.Length() );
        }
        data.Empty();

        wxThread::Sleep(200);
    }
    wxLogMessage("Userhandler exiting.");
    return 0;
}

// Registers `func` as the handler for `sCommand`.  The command name is
// stored lower-cased so that MatchFunction() can match case-insensitively.
// Always returns true.
bool UserHandler::RegisterCmd( callback func, const wxString& sCommand, const wxString& sHelp)
{
    callBackFunction fu;
    fu.function = func;
    fu.sCommand = sCommand;
    fu.sHelp = sHelp;
    fu.sCommand.MakeLower();
    functions.push_back(fu); // add to the vector
    return true;
}

// Finds the callback registered for the command at the start of `sCommand`.
// The comparison ignores case; the command must be followed by a space or by
// the end of the input.  Anything after the separating character is stored
// in `sArgs` with its original case; `sArgs` is left untouched when there is
// nothing after the command.  Returns NULL if no command matches.
callback UserHandler::MatchFunction( const wxString& sCommand , wxString& sArgs )
{
    // Lower-case the input once rather than once per registered command.
    wxString sCopCommand = sCommand;
    sCopCommand.MakeLower();

    callbackVector::iterator pFun;
    for ( pFun = functions.begin() ; pFun != functions.end() ; ++pFun )
    {
        unsigned int iLenCommand = pFun->sCommand.Length();
        wxString givenCmd = sCopCommand.Left(iLenCommand);
        if (givenCmd.Cmp(pFun->sCommand)==0 &&
            (sCopCommand.Length()==iLenCommand || sCopCommand.GetChar(iLenCommand)==' '))
        {
            if ( iLenCommand < sCommand.Length() )
            {
                sArgs = sCommand.Mid( iLenCommand+1 );
            }
            return pFun->function;
        }
    }
    return NULL;
}

// REGISTER connectionspeed<SEPERATOR>username<SEPERATOR>email
//
// Creates a new user row.  The user name must be non-empty and unused and
// the email must be non-empty.  The client's IP is looked up through the
// GeoIP web service to fill in country code, city, latitude (stored shifted
// by +90) and longitude (stored shifted by +180); fields the service does
// not deliver stay empty / 0.
// Returns "OK Registered Successfully", or an "ERR ..." string.
wxString UserHandler::reg(UserHandler *handle,const wxString &args)
{
    wxString reg_speed = args.BeforeFirst(SEPERATOR);
    wxString tmp = args.AfterFirst(SEPERATOR);
    wxString reg_username = tmp.BeforeFirst(SEPERATOR);
    wxString reg_email = tmp.AfterFirst(SEPERATOR);

    // check database that user registration is ok
    // (cheap emptiness checks first so that invalid input never reaches the
    // database)
    if (reg_username=="" || reg_email=="" || !handle->table->IsNameFree(reg_username))
    {
        wxLogMessage(_T("Username duplicate or invalid."));
        return "ERR Duplicate/Invalid username";
    }

    // add data to dbase
    wxLogMessage(_T("User details accepted : U=\"%s\" E=\"%s\""),reg_username.c_str(),reg_email.c_str());
    handle->table->GetDb()->RollbackTrans();
    handle->table->ClearMemberVars();

    // insert details into dbase.....?
    // NOTE(review): these copies are unbounded and the capacity of the table
    // fields is not visible in this file - confirm the client input is
    // limited to the field sizes.
    wxStrcpy(handle->table->name,reg_username);
    wxStrcpy(handle->table->email,reg_email);
    wxStrcpy(handle->table->connection,reg_speed);

    // lookup longitude and latitude and country information
    wxString query;
    query.Printf(_T("http://maxmind.com:8010/b?l=msF4tcD0cjNk&i=%s"),Network::GetIP(handle->socket).c_str());
    wxURL url(query);
    //wxHTTP url;
    wxInputStream *in_stream;
    in_stream = url.GetInputStream();
    //wxLogMessage(_T("Thread %d: Querying : %s"),(int)GetId(),query.c_str());
    if (url.GetError()!=wxURL_NOERR)
        wxLogError(_T("URL Error : %d"),url.GetError());
    //wxLogMessage(_T("Thread %d: Size of stream... %d"),(int)GetId(),in_stream->GetSize());

    // The reply comes from a remote server, so never copy more than the
    // buffer can hold; one slot is reserved for the terminator.
    const int geoBufLen = 4096;
    wxChar *buf2 = new wxChar[geoBufLen];
    int i=0;
    if (in_stream == NULL || !in_stream->IsOk())
    {
        // GetInputStream() yields no usable stream when the request fails;
        // carry on with an empty reply.
        wxLogError(_T("Stream isn't ok!"));
    }
    else
    {
        char c = in_stream->GetC();
        while (in_stream->LastRead()!=0 && i < geoBufLen-1)
        {
            buf2[i++] = c;
            c = in_stream->GetC();
        }
    }
    buf2[i] = '\0';
    // The stream returned by wxURL::GetInputStream() belongs to the caller.
    delete in_stream;

    //SE,26,Stockholm,59.333302,18.049999
    wxString countryCode;
    wxString city;
    double longitude = 0;
    double latitude = 0;
    // parse above string into variables for database
    {
        wxStringTokenizer tkz(buf2, ",\n");
        if (tkz.HasMoreTokens())
            countryCode = tkz.GetNextToken();
        if (tkz.HasMoreTokens())
            tkz.GetNextToken();     // second field is not stored
        if (tkz.HasMoreTokens())
            city = tkz.GetNextToken();
        if (tkz.HasMoreTokens())
        {
            wxString tempLatitude = tkz.GetNextToken();
            tempLatitude.ToDouble(&latitude);
            latitude += 90;
        }
        if (tkz.HasMoreTokens())
        {
            wxString tempLongitude = tkz.GetNextToken();
            tempLongitude.ToDouble(&longitude);
            longitude += 180;
        }
    }
    wxStrcpy(handle->table->countryCode,countryCode);
    wxStrcpy(handle->table->city,city);
    handle->table->longitude = longitude;
    handle->table->latitude = latitude;

    // now debug print, and then store each of the variables we get
    //wxLogMessage(_T("Thread %d: GeoIP gave us: %s"),(int)GetId(), buf2);
    delete[] buf2;

    // table->isOnline = FALSE;
    int result = handle->table->Insert();
    if (result==DB_FAILURE || result == DB_ERR_INTEGRITY_CONSTRAINT_VIOL)
    {
        wxLogError(_T("User details failed to insert into database!"));
        handle->table->GetDb()->RollbackTrans();
        return "ERR Failed to create new user in database";
    }

    wxLogMessage(_T("User details added to database"));
    handle->table->GetDb()->CommitTrans();
    return "OK Registered Successfully";
}

// LOGOUT user<SEPERATOR>email
//
// Logs `user` out if it is currently in the user list and the email matches
// the stored one: the user is unlinked from the ring, its former neighbours
// are reconnected through a CONNECT event, it is removed from the list and
// the database row is marked offline.
// Returns "OK", or an "ERR ..." string.
wxString UserHandler::logout(UserHandler *handle,const wxString &args)
{
    wxString user = args.BeforeLast(SEPERATOR);
    wxString pass = args.AfterLast(SEPERATOR);

    if (user=="")
        return "ERR Invalid Username";

    MutexLocker lock(&users_mutex);
    UserData *tmp = users->Find(user);
    if (!tmp)
        return "ERR Not logged in!";

    if (!handle->table->FetchByName(user))
        return "ERR Invalid Username";

    if (pass.CompareTo(handle->table->email)!=0)
        return "ERR Invalid Email";

    // valid username/email combination
    handle->valid = false;
    handle->loggedIn = false;

    // reconnect users
    // (the next pointer is dereferenced as well, so it has to be checked too)
    if (tmp->prev!=tmp && tmp->prev!=NULL && tmp->next!=NULL)
    {
        tmp->prev->next = tmp->next;
        tmp->next->prev = tmp->prev;
        UserEvent evt;
        evt.sEventType = "CONNECT";
        evt.data1 = tmp->prev;
        evt.data2 = tmp->next;
        UserManager::getManager()->QueueEvent(evt);
    }

    // Do not keep referring to the user that is about to leave the list.
    if (handle->data == tmp)
        handle->data = NULL;

    users->Remove(tmp);
    handle->table->isOnline = 0;
    handle->table->Update();
    handle->table->GetDb()->CommitTrans();
    return "OK";
}

// LOGIN user<SEPERATOR>email@udpport:tcpport
//
// Logs a user in.  A user that is already in the user list is simply
// attached to this handler.  Otherwise the email is checked against the
// database, a UserData record is created, linked into the ring next to the
// closest existing user (as reported by users->GetClosest) and CONNECT
// events are queued for the new neighbours; the first user gets a WAIT
// event instead.  The database row is then marked online.
// Returns an "OK ..." string, or an "ERR ..." string.
wxString UserHandler::login(UserHandler *handle,const wxString &args)
{
    // received LOGIN command
    // read args user,email@2000:3000
    wxString user = args.BeforeLast(SEPERATOR);
    wxString tmpS = args.AfterLast(SEPERATOR);
    wxString pass = tmpS.BeforeLast('@');
    wxString ports = tmpS.AfterLast('@');
    // Start from 0 so that a missing or malformed port never leaves an
    // indeterminate value behind.
    long udp = 0;
    long tcp = 0;
    ports.BeforeFirst(':').ToLong(&udp);
    ports.AfterFirst(':').ToLong(&tcp);

    // if (UserDatabase::IsValid(user,pass))
    MutexLocker lock(&users_mutex);
    if (user=="")
        return "ERR Invalid Username";

    UserData *tmp = users->Find(user);
    if (tmp)
    {
        // NOTE(review): this path does not compare `pass` with the stored
        // email - confirm that is intended.
        handle->loggedIn = true;
        handle->data = tmp;
        handle->valid = true;
        return "OK Previously logged in";
    }

    if (!handle->table->FetchByName(user))
        return "ERR Invalid username";

    if (pass.CompareTo(handle->table->email)!=0)
        return "ERR Invalid email";

    handle->valid = true;
    handle->loggedIn = true;
    handle->data = new UserData();
    handle->data->username.Printf("%s",handle->table->name);
    handle->data->longitude = handle->table->longitude;
    handle->data->latitude = handle->table->latitude;
    handle->data->ip = Network::GetIP(handle->socket);
    // ensure PORT command is set before server begins to send data to this client
    handle->data->udp_port = udp;
    handle->data->tcp_port = tcp;
    //handle->data->lastPing = now();

    // Find user a position, and place in array
    UserData *closest = users->GetClosest(handle->data);
    if (closest!=NULL && closest!=handle->data)
    {
        if (closest->next!=NULL && closest->prev!=NULL)
        {
            float d1 = users->GetDistance(closest->next,handle->data);
            float d2 = users->GetDistance(closest->prev,handle->data);
            if (d1 > d2)
            {
                // connect between closest->prev and closest
                closest->prev->next = handle->data;
                handle->data->next = closest;
                handle->data->prev = closest->prev;
                closest->prev = handle->data;
            }
            else
            {
                // connect between closest and closest->next
                handle->data->next = closest->next;
                closest->next->prev = handle->data;
                closest->next = handle->data;
                handle->data->prev = closest;
            }
        }
        else
        {
            // only one user
            closest->next = handle->data;
            closest->prev = handle->data;
            handle->data->next = closest;
            handle->data->prev = closest;
        }

        UserEvent evt;
        evt.sEventType = "CONNECT";
        evt.data1 = handle->data;
        evt.data2 = handle->data->next;
        UserManager::getManager()->QueueEvent(evt);

        if (handle->data->prev!=handle->data)
        {
            UserEvent evt2;
            evt2.sEventType = "CONNECT";
            evt2.data1 = handle->data->prev;
            evt2.data2 = handle->data;
            UserManager::getManager()->QueueEvent(evt2);
        }
    }
    else
    {
        // send wait
        UserEvent evt;
        evt.sEventType = "WAIT";
        evt.data1 = handle->data;
        evt.data2 = handle->data;
        UserManager::getManager()->QueueEvent(evt);
    }

    users->Add(handle->data);
    handle->table->isOnline = 1;
    handle->table->Update();
    handle->table->GetDb()->CommitTrans();
    return "OK Logged In";
}

// PORT udpport<SEPERATOR>tcpport
//
// Stores the UDP and TCP port of the logged-in user.  (The registration of
// this command in the UserManager constructor is commented out.)
// Returns "OK", or "ERR Not logged in!".
wxString UserHandler::port(UserHandler *handle, const wxString &args)
{
    // if (UserDatabase::IsValid(user,pass))
    if (handle->loggedIn)
    {
        // read in both port numbers, separated by SEPERATOR
        wxString udp = args.BeforeFirst(SEPERATOR);
        wxString tcp = args.AfterFirst(SEPERATOR);
        // 0 rather than an indeterminate value when a number does not parse
        unsigned long udpl = 0;
        unsigned long tcpl = 0;
        udp.ToULong(&udpl);
        tcp.ToULong(&tcpl);
        handle->data->tcp_port = tcpl;
        handle->data->udp_port = udpl;
        return "OK";
    }
    return "ERR Not logged in!";
}

// SRC
//
// Reports the previous user in the ring as "SRC name@ip:udpport".
// Returns "WAIT" when there is no distinct previous user and "LOGIN" when
// this handler is not logged in.
wxString UserHandler::src(UserHandler *handle, const wxString &args)
{
    MutexLocker lock(&users_mutex);
    if (!handle->loggedIn)
        return "LOGIN";

    // logged in so lets give them the IP
    if (handle->data->prev!=NULL && handle->data->prev!=handle->data)
    {
        wxString output;
        output.Printf("SRC %s@%s:%d",handle->data->prev->username.c_str(),handle->data->prev->ip.c_str(),handle->data->prev->udp_port);
        return output;
    }
    return "WAIT";
}

// DST
//
// Reports the next user in the ring as "DST name@ip:udpport".
// Returns "WAIT" when there is no distinct next user and "LOGIN" when this
// handler is not logged in.
wxString UserHandler::dst(UserHandler *handle, const wxString &args)
{
    MutexLocker lock(&users_mutex);
    if (!handle->loggedIn)
        return "LOGIN";

    // logged in so lets give them the IP
    if (handle->data->next!=NULL && handle->data->next!=handle->data)
    {
        wxString output;
        output.Printf("DST %s@%s:%d",handle->data->next->username.c_str(),handle->data->next->ip.c_str(),handle->data->next->udp_port);
        return output;
    }
    return "WAIT";
}

// PING username
//
// Heartbeat: records the current time as the user's last ping and attaches
// that user to this handler.
// Returns "OK", or an "ERR ..." string.
wxString UserHandler::ping(UserHandler *handle,const wxString &args)
{
    MutexLocker lock(&users_mutex);
    if (args=="")
        return "ERR Invalid Username";

    UserData *tmp = users->Find(args);
    if (tmp)
    {
        handle->data = tmp;
        handle->data->lastPing = time((time_t *) NULL);
        return "OK";
    }
    return "ERR Not logged in!";
}

// GETSTATS username
//
// Reports the number of users in the user list and the sum of all users'
// cycles as "OK usercount,cycles".  The requesting user must be in the user
// list.  Returns an "ERR ..." string otherwise.
wxString UserHandler::status(UserHandler *handle, const wxString &args)
{
    MutexLocker lock(&users_mutex);
    if (args=="")
        return "ERR Invalid Username";

    UserData *tmp = users->Find(args);
    wxLogMessage("Status report requested.");
    wxString res = "ERR User not logged in.";
    if (tmp)
    {
        // report 0 instead of an indeterminate value if the query fails
        int totalCycles = 0;
        Database::SelectInt(totalCycles,"SELECT sum(cycles) FROM users");
        // return usercount,cycles
        wxLogDebug("Total users:%d TotalCycles:%d",users->GetCount(),totalCycles);
        res.Printf("OK %d,%d",users->GetCount(),totalCycles);
    }
    return res;
}

// STATS username<SEPERATOR>corruptedpackets
//
// Stores a statistics report: the corrupted-packet count is written to the
// table, the time since the user's previous report is added to the stored
// time and the cycle counter is incremented.
// Returns "usercount,cycles", or an "ERR ..." string.
// NOTE(review): the update is written through this handler's table record;
// presumably that record was loaded by LOGIN on the same connection -
// confirm.
wxString UserHandler::stats(UserHandler *handle, const wxString &args)
{
    // receive and store stats from user
    MutexLocker lock(&users_mutex);
    wxString username = args.BeforeLast(SEPERATOR);
    if (username=="")
        return "ERR Invalid Username";

    UserData *tmp = users->Find(username);
    wxString res = "ERR User not logged in.";
    if (tmp)
    {
        // user ok, lets grab their stats
        wxString corrupted = args.AfterLast(SEPERATOR);
        long corruptedPackets = 0;  // stays 0 if the field is not a number
        corrupted.ToLong(&corruptedPackets);

        // store this data?
        handle->table->corruption = corruptedPackets;
        // Use the record that was just looked up: handle->data is only set
        // once LOGIN or PING has run on this connection.
        const time_t now = time((time_t *) NULL);
        time_t diff = now - tmp->lastUpdate;
        tmp->lastUpdate = now;
        handle->table->time += diff;
        handle->table->cycles++;
        handle->table->Update();
        handle->table->GetDb()->CommitTrans();

        // report 0 instead of an indeterminate value if the query fails
        int totalCycles = 0;
        Database::SelectInt(totalCycles,"SELECT sum(cycles) FROM users");
        // return usercount,cycles
        res.Printf("%d,%d",users->GetCount(),totalCycles);
    }
    return res;
}