MASA-Core
MasaNet.cpp
Go to the documentation of this file.
00001 /*******************************************************************************
00002  *
00003  * Copyright (c) 2010-2015   Edans Sandes
00004  *
00005  * This file is part of MASA-Core.
00006  * 
00007  * MASA-Core is free software: you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation, either version 3 of the License, or
00010  * (at your option) any later version.
00011  * 
00012  * MASA-Core is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with MASA-Core.  If not, see <http://www.gnu.org/licenses/>.
00019  *
00020  ******************************************************************************/
00021 
00022 #include "MasaNet.hpp"
00023 #include "command/CmdJoin.hpp"
00024 #include "command/CmdDiscover.hpp"
00025 #include "command/CmdUndiscover.hpp"
00026 #include "command/CmdNotifyScore.hpp"
00027 #include "command/CmdStatusRequest.hpp"
00028 #include "command/CmdStatusResponse.hpp"
00029 #include "command/CmdPeerRequest.hpp"
00030 #include "command/CmdPeerResponse.hpp"
00031 #include "command/CmdCreateRing.hpp"
00032 #include "command/CmdTestRing.hpp"
00033 
00034 #include <stdio.h>
00035 #include <stdlib.h>
00036 #include <string.h>
00037 
00038 #include <sys/socket.h> /* for socket(), bind(), and connect() */
00039 #include <arpa/inet.h>  /* for sockaddr_in and inet_ntoa() */
00040 #include <errno.h>
00041 #include <netdb.h>
00042 #include <unistd.h>
00043 #include <stdlib.h>
00044 
00045 #include <string>
00046 #include <algorithm>
00047 using namespace std;
00048 
00049 
00050 map<int, cmd_handler_f> MasaNet::cmdHandlers;
00051 
00052 #define DEFAULT_TCP_PORT        (12777)
00053 
00054 MasaNet::MasaNet(int nodeType, string nodeDescription)
00055 {
00056         this->nodeType = nodeType;
00057         this->nodeDescription = nodeDescription;
00058         this->leftPeer = NULL;
00059         this->leftPeerData = NULL;
00060         this->rightPeer = NULL;
00061         this->rightPeerData = NULL;
00062     timeval event;
00063     gettimeofday(&event, NULL);
00064     long long nsec = event.tv_usec%1000000;
00065         srand(time(NULL)+nsec);
00066         for (int i=0; i<12; i++) {
00067                 int ic = (rand()%52);
00068                 char c = (ic<26 ? ('A'+ic) : ('a'+ic-26));
00069                 myId += c;
00070         }
00071         fprintf(stderr, "MasaNet Name: %s\n", myId.c_str());
00072 
00073         registerCommand(CmdJoin::creator, NULL);
00074         registerCommand(CmdDiscover::creator, &MasaNet::cmd_discover);
00075         registerCommand(CmdUndiscover::creator, &MasaNet::cmd_undiscover);
00076         registerCommand(CmdNotifyScore::creator, NULL);
00077         registerCommand(CmdStatusRequest::creator, &MasaNet::cmd_status_request);
00078         registerCommand(CmdStatusResponse::creator, NULL);
00079         registerCommand(CmdPeerRequest::creator, &MasaNet::cmd_peer_request);
00080         registerCommand(CmdPeerResponse::creator, NULL);
00081         registerCommand(CmdCreateRing::creator, &MasaNet::cmd_create_ring);
00082         registerCommand(CmdTestRing::creator, &MasaNet::cmd_test_ring);
00083 
00084     pthread_mutex_init(&mutex, NULL);
00085 
00086 }
00087 
00088 MasaNet::~MasaNet() {
00089     pthread_mutex_destroy(&mutex);
00090 }
00091 
00092 void MasaNet::registerCommand(cmd_creator_f creator, cmd_handler_f handler) {
00093         Command* cmd = creator();
00094         int id = cmd->getId();
00095         delete cmd;
00096 
00097         cmdHandlers[id] = handler;
00098         Peer::registerCommandCreator(id, creator);
00099 }
00100 
00101 
00102 
00103 void MasaNet::startServer(int port) {
00104     int rc;
00105     struct sockaddr_in echoServAddr; /* Local address */
00106 
00107     if (port == 0) {
00108         serverPort = DEFAULT_TCP_PORT;
00109     } else {
00110         serverPort = port;
00111     }
00112 
00113 
00114     /* Create socket for incoming connections */
00115     if ((serverSocket = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
00116         fprintf(stderr, "ERROR creating server socket; return code from socket() is %d\n", serverSocket);
00117         exit(-1);
00118     }
00119 
00120     int optval = 1;
00121     setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
00122 
00123 
00124 
00125     /* Construct local address structure */
00126     memset(&echoServAddr, 0, sizeof(echoServAddr));   /* Zero out structure */
00127     echoServAddr.sin_family = AF_INET;                /* Internet address family */
00128     echoServAddr.sin_addr.s_addr = htonl(INADDR_ANY); /* Any incoming interface */
00129     echoServAddr.sin_port = htons(serverPort);      /* Local port */
00130 
00131     /* Bind to the local address */
00132     if ((rc = bind(serverSocket, (struct sockaddr *) &echoServAddr, sizeof(echoServAddr))) < 0) {
00133         fprintf(stderr, "ERROR; return code from bind() is %d\n", rc);
00134         exit(-1);
00135     }
00136 
00137     /* Mark the socket so it will listen for incoming connections */
00138     if ((rc=listen(serverSocket, 1)) < 0) {
00139         fprintf(stderr, "ERROR; return code from listen() is %d\n", rc);
00140         exit(-1);
00141     }
00142 
00143     serverActive = true;
00144         fprintf(stderr, "Listening on port %d\n", serverPort);
00145     rc = pthread_create(&listeningThread, NULL, staticListeningThread, (void *)this);
00146     if (rc){
00147         printf("ERROR; return code from pthread_create() is %d\n", rc);
00148         exit(-1);
00149     }
00150 
00151 }
00152 
00153 Peer* MasaNet::connectToPeer(string address, int connection_type) {
00154     struct sockaddr_in echoServAddr; /* Echo server address */
00155     int max_retries = 10;
00156     int retries = 0;
00157     int ok = 0;
00158     int rc;
00159     int sock;                        /* Socket descriptor */
00160 
00161         char host[256];
00162         int port = 0;
00163         if (sscanf(address.c_str(), "%[^:]:%d", host, &port) < 2) {
00164                 //printf("Connecting to host %s\n", host);
00165         } else {
00166                 //printf("Connecting to host %s:%d\n", host, port);
00167         }
00168 
00169     if (port == 0) {
00170         port = DEFAULT_TCP_PORT;
00171     }
00172 
00173     char ip[128];
00174     hostname_to_ip(host, ip);
00175 
00176     /* Construct the server address structure */
00177     memset(&echoServAddr, 0, sizeof(echoServAddr));     /* Zero out structure */
00178     echoServAddr.sin_family      = AF_INET;             /* Internet address family */
00179     echoServAddr.sin_addr.s_addr = inet_addr(ip);   /* Server IP address */
00180     echoServAddr.sin_port        = htons(port); /* Server port */
00181 
00182     /* Establish the connection to the echo server */
00183         fprintf(stderr, "Connecting to server %s (%s) on port %d\n", host, ip, port);
00184 
00185     /* Create a reliable, stream socket using TCP */
00186         if ((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
00187                 fprintf(stderr, "ERROR creating socket: %s\n", strerror(errno));
00188                 sleep(1);
00189         }
00190 
00191     while ((retries < max_retries) && !ok) {
00192                 if ((rc=connect(sock, (struct sockaddr *) &echoServAddr, sizeof(sockaddr_in))) < 0) {
00193                         retries++;
00194                         fprintf(stderr, "ERROR connecting to Server [Retry %d/%d]. %s\n",
00195                                         retries, max_retries, strerror(errno));
00196                         sleep(1);
00197                 } else {
00198                         ok = 1;
00199                 }
00200         }
00201         if (!ok) {
00202                 fprintf(stderr, "ERROR connecting to Server.\n");
00203                 return NULL;
00204         }
00205     fprintf(stderr, "Connected to Server %s\n", inet_ntoa(echoServAddr.sin_addr));
00206 
00207         Peer* peersocket = new Peer(sock, myId, true, connection_type);
00208 
00209         pthread_t handshakeThread;
00210         void** args = new void*[2];
00211         args[0] = (void*)this;
00212         args[1] = (void*)peersocket;
00213     rc = pthread_create(&handshakeThread, NULL, staticPeerHandler, (void *)args);
00214     if (rc){
00215         printf("ERROR; return code from pthread_create() is %d\n", rc);
00216         exit(-1);
00217     }
00218     return peersocket;
00219 }
00220 
00221 
00222 void* MasaNet::staticListeningThread(void* arg) {
00223         MasaNet* this_obj = (MasaNet*)arg;
00224     struct sockaddr_in echoClntAddr; /* Client address */
00225     unsigned int clntLen;            /* Length of client address data structure */
00226     int clntSock;                    /* Socket descriptor for client */
00227 
00228     /* Set the size of the in-out parameter */
00229     clntLen = sizeof(echoClntAddr);
00230 
00231     while (this_obj->serverActive) {
00232 
00233                 /* Wait for a client to connect */
00234                 fprintf(stderr, "Waiting clients...\n");
00235                 if ((clntSock = accept(this_obj->serverSocket, (struct sockaddr *) &echoClntAddr, &clntLen)) < 0){
00236                         fprintf(stderr, "ERROR; return code from accept() is %d\n", clntSock);
00237                         exit(-1);
00238                 }
00239 
00240                 /* clntSock is connected to a client! */
00241 
00242                 fprintf(stderr, "Handling client %s\n", inet_ntoa(echoClntAddr.sin_addr));
00243 
00244                 Peer* peersocket = new Peer(clntSock, this_obj->myId, false);
00245 
00246                 pthread_t handshakeThread;
00247                 void** args = new void*[2];
00248                 args[0] = (void*)this_obj;
00249                 args[1] = (void*)peersocket;
00250                 int rc = pthread_create(&handshakeThread, NULL, staticPeerHandler, (void*)args);
00251             if (rc){
00252                 printf("ERROR; return code from pthread_create() is %d\n", rc);
00253                 exit(-1);
00254             }
00255 
00256     }
00257 
00258     close(this_obj->serverSocket); // TODO necessary?
00259 
00260     return NULL;
00261 
00262 }
00263 
00264 void* MasaNet::staticPeerHandler(void* arg) {
00265         void** args = (void**)arg;
00266         MasaNet* this_obj = (MasaNet*)(args[0]);
00267         Peer* socket = (Peer*)(args[1]);
00268 
00269         socket->setLocalType(this_obj->nodeType);
00270 
00271         /* Publish the public address instead of the spurious client port */
00272         if (this_obj->serverPort != 0) {
00273                 char desc[256];
00274                 sprintf(desc, ":%d", this_obj->serverPort);
00275                 socket->setLocalAddress(string(desc));
00276         }
00277 
00278         socket->setCallback(this_obj);
00279         if (!socket->handshake()) {
00280                 /*for (int i=0; i<10 + (this_obj->serverPort%4)*2; i++) {
00281                         fprintf(stderr, "Will disconnect %p\n", socket);
00282                         sleep(1);
00283                 }*/
00284                 delete socket;
00285         }
00286 
00287         if (socket->getConnectionType() == CONNECTION_TYPE_DATA) {
00288                 return NULL;
00289         }
00290 
00291         int locked = false;
00292         try {
00293                 while (socket->isConnected()) {
00294                         fprintf(stderr, "Waiting Command... %p\n", socket);
00295                         Command* cmd = socket->recvCommand();
00296                         if (cmd == NULL) {
00297                                 fprintf(stderr, "Interrupting Command listener\n");
00298                                 break;
00299                         }
00300 
00301                         cmd_handler_f handler = cmdHandlers[cmd->getId()];
00302                         fprintf(stderr, "ReceivedCommand: %d\n", cmd->getId());
00303                         if (handler != NULL) {
00304                                 fprintf(stderr, "Acquiring Lock\n");
00305                                 pthread_mutex_lock(&this_obj->mutex);
00306                                 locked = true;
00307                                 fprintf(stderr, "Acquiring Lock: OK\n");
00308                                 (this_obj->*handler)(cmd, socket);
00309                                 pthread_mutex_unlock(&this_obj->mutex);
00310                                 locked = false;
00311                                 fprintf(stderr, "Unlocked\n");
00312                         }
00313 
00314                 }
00315         } catch(IOException &e) {
00316                 fprintf(stderr, "IOException: %s (%s) %p\n", e.what(), socket->getRemoteId().c_str(), socket);
00317                 if (locked) {
00318                         pthread_mutex_unlock(&this_obj->mutex);
00319                 }
00320         }
00321 
00322 
00323         return NULL;
00324 }
00325 
00326 void MasaNet::broadcastCommand(Command* command, Peer* excludeSocket) {
00327         vector<Peer*> allPeers = peers.getProcessingPeers();
00328         for (vector<Peer*>::iterator it = allPeers.begin() ; it != allPeers.end(); ++it) {
00329                 if ((*it) == excludeSocket) continue;
00330                 if ((*it)->getRemoteType() == TYPE_PROCESSING_NODE) { // TODO retirar?
00331                         (*it)->sendCommand(command);
00332                 }
00333         }
00334 }
00335 
00336 /*
00337     Get ip from domain name
00338  */
00339 int MasaNet::hostname_to_ip(const char *hostname , char *ip) {
00340         // Source: http://www.binarytides.com/hostname-to-ip-address-c-sockets-linux/
00341     int sockfd;
00342     struct addrinfo hints, *servinfo, *p;
00343     struct sockaddr_in *h;
00344     int rv;
00345 
00346     memset(&hints, 0, sizeof hints);
00347     hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6
00348     hints.ai_socktype = SOCK_STREAM;
00349 
00350     if ( (rv = getaddrinfo( hostname , "http" , &hints , &servinfo)) != 0)
00351     {
00352         fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
00353         return 1;
00354     }
00355 
00356     // loop through all the results and connect to the first we can
00357     for(p = servinfo; p != NULL; p = p->ai_next)
00358     {
00359         h = (struct sockaddr_in *) p->ai_addr;
00360         strcpy(ip , inet_ntoa( h->sin_addr ) );
00361     }
00362 
00363     freeaddrinfo(servinfo); // all done with this structure
00364     return 0;
00365 }
00366 
00367 Peer* MasaNet::solveSimultaneousConnection(Peer* newPeer, Peer* oldPeer) {
00368         if ( (newPeer->getRemoteId() < myId) ^ (newPeer->isInitiator()) ) {
00369                 newPeer->finalize();
00370                 return oldPeer;
00371         } else {
00372                 oldPeer->finalize();
00373                 return newPeer;
00374         }
00375 }
00376 
00377 bool MasaNet::onConnect(Peer* peer) {
00378         fprintf(stderr, "**** ON CONNECT\n");
00379 
00380         pthread_mutex_lock(&this->mutex);
00381         if (peers.get(peer->getRemoteId()) != NULL) {
00382                 Peer* preferrablePeer = solveSimultaneousConnection(peer, peers.get(peer->getRemoteId()));
00383                 if (preferrablePeer == peer) {
00384                         printf("Peer already connected %s %p. Replacing\n", peer->getRemoteId().c_str(), peer);
00385                         peers.erase(peer->getRemoteId());
00386                 } else {
00387                         pthread_mutex_unlock(&this->mutex);
00388                         printf("Peer already connected %s %p. Aborting\n", peer->getRemoteId().c_str(), peer);
00389                         return false;
00390                 }
00391         }
00392 
00393         peers.add(peer);
00394         printf("node connected: %s\n", peer->getRemoteId().c_str());
00395 
00396         if (peer->getRemoteId() == this->leftPeerId) {
00397                 this->leftPeer = peer;
00398                 peer->ringType = RING_LEFT;
00399         } else if (peer->getRemoteId() == this->rightPeerId) {
00400                 this->rightPeer = peer;
00401                 peer->ringType = RING_RIGHT;
00402         } else {
00403                 peer->ringType = RING_NONE;
00404         }
00405         printf("node connected: %s.. %s|%s %d\n", peer->getRemoteId().c_str(), this->leftPeerId.c_str(), this->rightPeerId.c_str(), peer->ringType);
00406 
00407 
00408         if (peer->getRemoteType() == TYPE_PROCESSING_NODE) {
00409 
00410                 if (this->discoveredPeers.get(peer->getRemoteId()) == NULL) {
00411                         this->discoveredPeers.add(peer);
00412                         printf("node discovered: %s\n", peer->getRemoteId().c_str());
00413 
00414                         CmdDiscover cmd;
00415                         cmd.addPeers(this->discoveredPeers.getProcessingPeers());
00416                         peer->sendCommand(&cmd);
00417 
00418                         vector<Peer*> announcedNode;
00419                         announcedNode.push_back(peer);
00420                         this->announce(announcedNode, peer); // broadcast
00421                 } else {
00422                         this->discoveredPeers.add(peer);
00423                         printf("node updated: %s\n", peer->getRemoteId().c_str());
00424                 }
00425         }
00426         pthread_mutex_unlock(&this->mutex);
00427 
00428         return true;
00429 }
00430 
00431 bool MasaNet::onConnectData(Peer* peer) {
00432         fprintf(stderr, "**** ON CONNECT DATA **********\n");
00433         bool successful = true;
00434 
00435         pthread_mutex_lock(&this->mutex);
00436 
00437         if (peer->getRemoteId() == this->leftPeerId) {
00438                 if (leftPeerData == NULL) {
00439                         this->leftPeerData = peer;
00440                 } else {
00441                         this->leftPeerData = solveSimultaneousConnection(peer, this->leftPeerData);
00442                         if (this->leftPeerData == peer) {
00443                                 printf("Data peer already connected %s %p. Replacing\n", peer->getRemoteId().c_str(), peer);
00444                         } else {
00445                                 printf("Data Peer already connected %s %p. Aborting\n", peer->getRemoteId().c_str(), peer);
00446                                 successful = false;
00447                         }
00448                 }
00449         } else if (peer->getRemoteId() == this->rightPeerId) {
00450                 if (rightPeerData == NULL) {
00451                         this->rightPeerData = peer;
00452                 } else {
00453                         this->rightPeerData = solveSimultaneousConnection(peer, this->rightPeerData);
00454                         if (this->rightPeerData == peer) {
00455                                 printf("Data peer already connected %s %p. Replacing\n", peer->getRemoteId().c_str(), peer);
00456                         } else {
00457                                 printf("Data Peer already connected %s %p. Aborting\n", peer->getRemoteId().c_str(), peer);
00458                                 successful = false;
00459                         }
00460                 }
00461         }
00462         pthread_mutex_unlock(&this->mutex);
00463 
00464         return successful;
00465 }
00466 
00467 
00468 void MasaNet::onDisconnect(Peer* peer) {
00469         fprintf(stderr, "**** ON DISCONNECT !!!!\n");
00470 
00471 
00472         bool sendDisanounce = false;
00473         pthread_mutex_lock(&this->mutex);
00474         if (this->peers.get(peer->getRemoteId()) == NULL &&
00475                         this->discoveredPeers.get(peer->getRemoteId()) != NULL) {
00476                 this->peers.erase(peer->getRemoteId());
00477                 sendDisanounce = true;
00478                 this->discoveredPeers.erase(peer->getRemoteId());
00479         }
00480         pthread_mutex_unlock(&this->mutex);
00481 
00482         //if (sendDisanounce) {
00483                 //this->disanounce(peer, NULL);
00484         //}
00485 }
00486 
00487 
00488 const PeerList& MasaNet::getConnectedPeers() const {
00489         return peers;
00490 }
00491 
00492 const PeerList& MasaNet::getDiscoveredPeers() const {
00493         return discoveredPeers;
00494 }
00495 
00496 
00497 
00498 
00499 
00500 
00501 void MasaNet::announce(vector<Peer*> announcedPeers, Peer* excludeSocket) {
00502         CmdDiscover cmd;
00503         if (announcedPeers.size() == 0) {
00504                 return;
00505         }
00506         cmd.addPeers(announcedPeers);
00507         broadcastCommand(&cmd, excludeSocket);
00508 }
00509 
00510 void MasaNet::disanounce(Peer* peer, Peer* excludeSocket) {
00511         CmdUndiscover cmd;
00512         if (peer == NULL) {
00513                 return;
00514         }
00515         cmd.addPeer(peer);
00516         fprintf(stderr, "Disanouncing: %s\n", peer->getRemoteId().c_str());
00517         broadcastCommand(&cmd, excludeSocket);
00518 }
00519 
00520 
00521 
00522 MasaNetStatus* MasaNet::getPeerStatus() {
00523         vector<Peer*> allPeers = peers.getProcessingPeers();
00524         for (vector<Peer*>::iterator it = allPeers.begin() ; it != allPeers.end(); ++it) {
00525                 CmdStatusRequest request;
00526                 CmdStatusResponse* response = (CmdStatusResponse*)(*it)->sendCommand(&request);
00527                 if (response != NULL) {
00528                         status = response->getStatus();
00529                         // FIXME Memory leak - response
00530                         return &status;
00531                 }
00532         }
00533         return NULL;
00534 }
00535 
00536 Peer* MasaNet::getPeer(const string& peer) {
00537         Peer* socket;
00538         if (peer.length() == 0) {
00539                 socket = peers.getProcessingPeers().front();
00540         } else {
00541                 socket = peers.get(peer);
00542         }
00543         return socket;
00544 }
00545 
00546 const vector<Peer*>& MasaNet::getRemotePeers(string peer, int type) {
00547         vector<Peer*> remotePeers;
00548         CmdPeerRequest request;
00549         request.setType(type);
00550         Peer* socket = getPeer(peer);
00551         CmdPeerResponse* response = (CmdPeerResponse*)socket->sendCommand(&request);
00552         return response->getPeers();
00553 }
00554 
00555 void MasaNet::createRing() {
00556         CmdCreateRing cmd;
00557         broadcastCommand(&cmd);
00558 }
00559 
00560 
00561 const set<string>& MasaNet::testRing(string peer) {
00562         CmdTestRing cmd(myId);
00563 
00564         Peer* socket = getPeer(peer);
00565         socket->addHook(cmd.getId());
00566         socket->sendCommand(&cmd);
00567         CmdTestRing* async = (CmdTestRing*)socket->waitHook();
00568         set<string> ret = async->getIds();
00569 
00570         return ret;
00571 }
00572 
00573 
00574 
00575 void MasaNet::cmd_discover(Command* _cmd, Peer* socket) {
00576         fprintf(stderr, "Cmd Discover %p\n", _cmd);
00577         CmdDiscover* cmd = (CmdDiscover*)_cmd;
00578         vector<Peer*> receivedPeers = cmd->getPeers();
00579 
00580         vector<Peer*> newPeers;
00581         for (vector<Peer*>::iterator it = receivedPeers.begin() ; it != receivedPeers.end(); ++it) {
00582                 if (discoveredPeers.get((*it)->getRemoteId()) == NULL) {
00583                         newPeers.push_back(*it);
00584                         discoveredPeers.add(*it);
00585                         printf("New Node: %s\n", (*it)->getRemoteId().c_str());
00586                 }
00587         }
00588 
00589         announce(newPeers, socket);
00590 
00591         /*vector<peer_t> diff;
00592         for (map<string, peer_t>::iterator it = getConnectedPeers().begin() ; it != getConnectedPeers().end(); ++it) {
00593                 sendCommand(it->second.socket, cmd);
00594         }
00595         std::set_difference(peers.begin(), peers.end(), this->discoveredPeers.begin(), this->discoveredPeers.end(),
00596                                 std::inserter(result, result.end()));
00597 
00598         fprintf(stderr, "Command Discover: %d\n", peers.size());
00599         int len0 = this->discoveredPeers.size();
00600         this->discoveredPeers.insert(peers.begin(), peers.end());
00601         int len1 = this->discoveredPeers.size();
00602         for (set<string>::iterator it=peers.begin(); it!=peers.end(); ++it) {
00603                 fprintf(stderr, "-> %s\n", it->c_str());
00604         }
00605         if (true || len0 != len1) {
00606                 for (map<string, peer_t>::iterator it = getConnectedPeers().begin() ; it != getConnectedPeers().end(); ++it) {
00607                         sendCommand(it->second.socket, cmd);
00608                 }
00609         }*/
00610 }
00611 
00612 void MasaNet::cmd_undiscover(Command* _cmd, Peer* socket) {
00613         fprintf(stderr, "Cmd Undiscover %p\n", _cmd);
00614         /*CmdUndiscover* cmd = (CmdUndiscover*)_cmd;
00615         vector<Peer*> receivedPeers = cmd->getPeers();
00616         Peer* peer = receivedPeers.front();
00617 
00618         if (discoveredPeers.get(peer->getRemoteId()) != NULL) {
00619                 if (peers.get(peer->getRemoteId()) == NULL) {
00620                         printf("Deleting Node: %s\n", peer->getRemoteId().c_str());
00621                         discoveredPeers.erase(peer->getRemoteId());
00622                         //discoveredPeers.copy(peers.getProcessingPeers());
00623                         disanounce(peer, socket);
00624                         //announce(peers.getProcessingPeers(), NULL);
00625                 } else {
00626                         announce(receivedPeers, NULL);
00627                 }
00628         }*/
00629 
00630 }
00631 
00632 void MasaNet::cmd_status_request(Command* _cmd, Peer* socket) {
00633         fprintf(stderr, "Cmd Status Request %p\n", _cmd);
00634         CmdStatusResponse response(&status);
00635         response.setSerial(_cmd->getSerial());
00636         socket->sendCommand(&response);
00637 }
00638 
00639 void MasaNet::cmd_peer_request(Command* _cmd, Peer* socket) {
00640         CmdPeerRequest* request = (CmdPeerRequest*)_cmd;;
00641         fprintf(stderr, "Cmd Peer Request %p (%d)\n", _cmd, request->getType());
00642         CmdPeerResponse response;
00643         if (request->getType() == CMD_CONNECTED_PEERS) {
00644                 response.addPeers(peers.getProcessingPeers());
00645         } else if (request->getType() == CMD_DISCOVERED_PEERS) {
00646                 response.addPeers(discoveredPeers.getProcessingPeers());
00647         } else if (request->getType() == CMD_DATA_PEERS) {
00648                 response.addPeer(leftPeerData);
00649                 response.addPeer(rightPeerData);
00650         }
00651         response.setSerial(_cmd->getSerial());
00652         socket->sendCommand(&response);
00653 }
00654 
00655 Peer* MasaNet::getLeftPeer() const {
00656         return leftPeer;
00657 }
00658 
00659 Peer* MasaNet::getRightPeer() const {
00660         return rightPeer;
00661 }
00662 
00663 void MasaNet::cmd_create_ring(Command* _cmd, Peer* socket) {
00664         fprintf(stderr, "Cmd Create Ring %p\n", _cmd);
00665 
00666         if (status.getProcessingState() != STATE_IDLE) {
00667                 printf("Server is not idle: %d.\n", status.getProcessingState());
00668                 return;
00669         }
00670         status.setProcessingState(STATE_CREATING_RING);
00671         printf("Creating Ring.\n");
00672 
00673         createRing();
00674 
00675         Peer* prev = discoveredPeers.getPrev(myId);
00676         leftPeerId = prev->getRemoteId();
00677         Peer* next = discoveredPeers.getNext(myId);
00678         rightPeerId = next->getRemoteId();
00679 
00680         printf("I must connect to nodes: %s and %s\n", leftPeerId.c_str(), rightPeerId.c_str());
00681 
00682         if (peers.get(rightPeerId) == NULL) {
00683                 connectToPeer(next->getRemoteAddress(), CONNECTION_TYPE_CTRL);
00684         } else {
00685                 rightPeer = next;
00686                 rightPeer->ringType = RING_RIGHT;
00687         }
00688 
00689         if (peers.get(leftPeerId) == NULL) {
00690                 connectToPeer(prev->getRemoteAddress(), CONNECTION_TYPE_CTRL);
00691         } else {
00692                 leftPeer = prev;
00693                 leftPeer->ringType = RING_LEFT;
00694         }
00695 
00696         if (rightPeerData == NULL) {
00697                 connectToPeer(next->getRemoteAddress(), CONNECTION_TYPE_DATA);
00698         }
00699         if (leftPeerData == NULL) {
00700                 connectToPeer(next->getRemoteAddress(), CONNECTION_TYPE_DATA);
00701         }
00702 
00703 }
00704 
00705 
00706 void MasaNet::cmd_test_ring(Command* _cmd, Peer* socket) {
00707         fprintf(stderr, "Cmd Test Ring %p\n", _cmd);
00708         CmdTestRing* cmd = (CmdTestRing*)_cmd;
00709 
00710         if (status.getProcessingState() == STATE_IDLE) {
00711                 printf("Ring is not created.\n");
00712                 return;
00713         }
00714         if (cmd->containsId(myId)) {
00715                 // Loop finished
00716                 if (cmd->getOriginatorId() != myId) {
00717                         cmd->addId(myId);
00718                         peers.get(cmd->getOriginatorId())->sendCommand(cmd);
00719                 } else {
00720                         string firstId = *(cmd->getIds().begin());
00721                         if (firstId != myId) {
00722                                 fprintf(stderr, "Ring is not fully connected: %d peer(s)\n", cmd->getIds().size());
00723                         } else {
00724                                 fprintf(stderr, "Ring is fully connected with %d peer(s)\n", cmd->getIds().size());
00725                         }
00726                 }
00727         } else {
00728                 cmd->addId(myId);
00729 
00730                 Peer* next = getRightPeer();
00731                 next->sendCommand(cmd);
00732         }
00733 }