|
MASA-Core
|
00001 /******************************************************************************* 00002 * 00003 * Copyright (c) 2010-2015 Edans Sandes 00004 * 00005 * This file is part of MASA-Core. 00006 * 00007 * MASA-Core is free software: you can redistribute it and/or modify 00008 * it under the terms of the GNU General Public License as published by 00009 * the Free Software Foundation, either version 3 of the License, or 00010 * (at your option) any later version. 00011 * 00012 * MASA-Core is distributed in the hope that it will be useful, 00013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 * GNU General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License 00018 * along with MASA-Core. If not, see <http://www.gnu.org/licenses/>. 00019 * 00020 ******************************************************************************/ 00021 00022 #include "MasaNet.hpp" 00023 #include "command/CmdJoin.hpp" 00024 #include "command/CmdDiscover.hpp" 00025 #include "command/CmdUndiscover.hpp" 00026 #include "command/CmdNotifyScore.hpp" 00027 #include "command/CmdStatusRequest.hpp" 00028 #include "command/CmdStatusResponse.hpp" 00029 #include "command/CmdPeerRequest.hpp" 00030 #include "command/CmdPeerResponse.hpp" 00031 #include "command/CmdCreateRing.hpp" 00032 #include "command/CmdTestRing.hpp" 00033 00034 #include <stdio.h> 00035 #include <stdlib.h> 00036 #include <string.h> 00037 00038 #include <sys/socket.h> /* for socket(), bind(), and connect() */ 00039 #include <arpa/inet.h> /* for sockaddr_in and inet_ntoa() */ 00040 #include <errno.h> 00041 #include <netdb.h> 00042 #include <unistd.h> 00043 #include <stdlib.h> 00044 00045 #include <string> 00046 #include <algorithm> 00047 using namespace std; 00048 00049 00050 map<int, cmd_handler_f> MasaNet::cmdHandlers; 00051 00052 #define DEFAULT_TCP_PORT (12777) 00053 00054 MasaNet::MasaNet(int nodeType, string nodeDescription) 00055 { 00056 this->nodeType = nodeType; 00057 this->nodeDescription = nodeDescription; 00058 this->leftPeer = NULL; 00059 this->leftPeerData = NULL; 00060 this->rightPeer = NULL; 00061 this->rightPeerData = NULL; 00062 timeval event; 00063 gettimeofday(&event, NULL); 00064 long long nsec = event.tv_usec%1000000; 00065 srand(time(NULL)+nsec); 00066 for (int i=0; i<12; i++) { 00067 int ic = (rand()%52); 00068 char c = (ic<26 ? ('A'+ic) : ('a'+ic-26)); 00069 myId += c; 00070 } 00071 fprintf(stderr, "MasaNet Name: %s\n", myId.c_str()); 00072 00073 registerCommand(CmdJoin::creator, NULL); 00074 registerCommand(CmdDiscover::creator, &MasaNet::cmd_discover); 00075 registerCommand(CmdUndiscover::creator, &MasaNet::cmd_undiscover); 00076 registerCommand(CmdNotifyScore::creator, NULL); 00077 registerCommand(CmdStatusRequest::creator, &MasaNet::cmd_status_request); 00078 registerCommand(CmdStatusResponse::creator, NULL); 00079 registerCommand(CmdPeerRequest::creator, &MasaNet::cmd_peer_request); 00080 registerCommand(CmdPeerResponse::creator, NULL); 00081 registerCommand(CmdCreateRing::creator, &MasaNet::cmd_create_ring); 00082 registerCommand(CmdTestRing::creator, &MasaNet::cmd_test_ring); 00083 00084 pthread_mutex_init(&mutex, NULL); 00085 00086 } 00087 00088 MasaNet::~MasaNet() { 00089 pthread_mutex_destroy(&mutex); 00090 } 00091 00092 void MasaNet::registerCommand(cmd_creator_f creator, cmd_handler_f handler) { 00093 Command* cmd = creator(); 00094 int id = cmd->getId(); 00095 delete cmd; 00096 00097 cmdHandlers[id] = handler; 00098 Peer::registerCommandCreator(id, creator); 00099 } 00100 00101 00102 00103 void MasaNet::startServer(int port) { 00104 int rc; 00105 struct sockaddr_in echoServAddr; /* Local address */ 00106 00107 if (port == 0) { 00108 serverPort = DEFAULT_TCP_PORT; 00109 } else { 00110 serverPort = port; 00111 } 00112 00113 00114 /* Create socket for incoming connections */ 00115 if ((serverSocket = socket(PF_INET, SOCK_STREAM, 0)) < 0) { 00116 fprintf(stderr, "ERROR creating server socket; return code from socket() is %d\n", serverSocket); 00117 exit(-1); 00118 } 00119 00120 int optval = 1; 00121 setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); 00122 00123 00124 00125 /* Construct local address structure */ 00126 memset(&echoServAddr, 0, sizeof(echoServAddr)); /* Zero out structure */ 00127 echoServAddr.sin_family = AF_INET; /* Internet address family */ 00128 echoServAddr.sin_addr.s_addr = htonl(INADDR_ANY); /* Any incoming interface */ 00129 echoServAddr.sin_port = htons(serverPort); /* Local port */ 00130 00131 /* Bind to the local address */ 00132 if ((rc = bind(serverSocket, (struct sockaddr *) &echoServAddr, sizeof(echoServAddr))) < 0) { 00133 fprintf(stderr, "ERROR; return code from bind() is %d\n", rc); 00134 exit(-1); 00135 } 00136 00137 /* Mark the socket so it will listen for incoming connections */ 00138 if ((rc=listen(serverSocket, 1)) < 0) { 00139 fprintf(stderr, "ERROR; return code from listen() is %d\n", rc); 00140 exit(-1); 00141 } 00142 00143 serverActive = true; 00144 fprintf(stderr, "Listening on port %d\n", serverPort); 00145 rc = pthread_create(&listeningThread, NULL, staticListeningThread, (void *)this); 00146 if (rc){ 00147 printf("ERROR; return code from pthread_create() is %d\n", rc); 00148 exit(-1); 00149 } 00150 00151 } 00152 00153 Peer* MasaNet::connectToPeer(string address, int connection_type) { 00154 struct sockaddr_in echoServAddr; /* Echo server address */ 00155 int max_retries = 10; 00156 int retries = 0; 00157 int ok = 0; 00158 int rc; 00159 int sock; /* Socket descriptor */ 00160 00161 char host[256]; 00162 int port = 0; 00163 if (sscanf(address.c_str(), "%[^:]:%d", host, &port) < 2) { 00164 //printf("Connecting to host %s\n", host); 00165 } else { 00166 //printf("Connecting to host %s:%d\n", host, port); 00167 } 00168 00169 if (port == 0) { 00170 port = DEFAULT_TCP_PORT; 00171 } 00172 00173 char ip[128]; 00174 hostname_to_ip(host, ip); 00175 00176 /* Construct the server address structure */ 00177 memset(&echoServAddr, 0, sizeof(echoServAddr)); /* Zero out structure */ 00178 echoServAddr.sin_family = AF_INET; /* Internet address family */ 00179 echoServAddr.sin_addr.s_addr = inet_addr(ip); /* Server IP address */ 00180 echoServAddr.sin_port = htons(port); /* Server port */ 00181 00182 /* Establish the connection to the echo server */ 00183 fprintf(stderr, "Connecting to server %s (%s) on port %d\n", host, ip, port); 00184 00185 /* Create a reliable, stream socket using TCP */ 00186 if ((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { 00187 fprintf(stderr, "ERROR creating socket: %s\n", strerror(errno)); 00188 sleep(1); 00189 } 00190 00191 while ((retries < max_retries) && !ok) { 00192 if ((rc=connect(sock, (struct sockaddr *) &echoServAddr, sizeof(sockaddr_in))) < 0) { 00193 retries++; 00194 fprintf(stderr, "ERROR connecting to Server [Retry %d/%d]. %s\n", 00195 retries, max_retries, strerror(errno)); 00196 sleep(1); 00197 } else { 00198 ok = 1; 00199 } 00200 } 00201 if (!ok) { 00202 fprintf(stderr, "ERROR connecting to Server.\n"); 00203 return NULL; 00204 } 00205 fprintf(stderr, "Connected to Server %s\n", inet_ntoa(echoServAddr.sin_addr)); 00206 00207 Peer* peersocket = new Peer(sock, myId, true, connection_type); 00208 00209 pthread_t handshakeThread; 00210 void** args = new void*[2]; 00211 args[0] = (void*)this; 00212 args[1] = (void*)peersocket; 00213 rc = pthread_create(&handshakeThread, NULL, staticPeerHandler, (void *)args); 00214 if (rc){ 00215 printf("ERROR; return code from pthread_create() is %d\n", rc); 00216 exit(-1); 00217 } 00218 return peersocket; 00219 } 00220 00221 00222 void* MasaNet::staticListeningThread(void* arg) { 00223 MasaNet* this_obj = (MasaNet*)arg; 00224 struct sockaddr_in echoClntAddr; /* Client address */ 00225 unsigned int clntLen; /* Length of client address data structure */ 00226 int clntSock; /* Socket descriptor for client */ 00227 00228 /* Set the size of the in-out parameter */ 00229 clntLen = sizeof(echoClntAddr); 00230 00231 while (this_obj->serverActive) { 00232 00233 /* Wait for a client to connect */ 00234 fprintf(stderr, "Waiting clients...\n"); 00235 if ((clntSock = accept(this_obj->serverSocket, (struct sockaddr *) &echoClntAddr, &clntLen)) < 0){ 00236 fprintf(stderr, "ERROR; return code from accept() is %d\n", clntSock); 00237 exit(-1); 00238 } 00239 00240 /* clntSock is connected to a client! */ 00241 00242 fprintf(stderr, "Handling client %s\n", inet_ntoa(echoClntAddr.sin_addr)); 00243 00244 Peer* peersocket = new Peer(clntSock, this_obj->myId, false); 00245 00246 pthread_t handshakeThread; 00247 void** args = new void*[2]; 00248 args[0] = (void*)this_obj; 00249 args[1] = (void*)peersocket; 00250 int rc = pthread_create(&handshakeThread, NULL, staticPeerHandler, (void*)args); 00251 if (rc){ 00252 printf("ERROR; return code from pthread_create() is %d\n", rc); 00253 exit(-1); 00254 } 00255 00256 } 00257 00258 close(this_obj->serverSocket); // TODO necessary? 00259 00260 return NULL; 00261 00262 } 00263 00264 void* MasaNet::staticPeerHandler(void* arg) { 00265 void** args = (void**)arg; 00266 MasaNet* this_obj = (MasaNet*)(args[0]); 00267 Peer* socket = (Peer*)(args[1]); 00268 00269 socket->setLocalType(this_obj->nodeType); 00270 00271 /* Publish the public address instead of the spurious client port */ 00272 if (this_obj->serverPort != 0) { 00273 char desc[256]; 00274 sprintf(desc, ":%d", this_obj->serverPort); 00275 socket->setLocalAddress(string(desc)); 00276 } 00277 00278 socket->setCallback(this_obj); 00279 if (!socket->handshake()) { 00280 /*for (int i=0; i<10 + (this_obj->serverPort%4)*2; i++) { 00281 fprintf(stderr, "Will disconnect %p\n", socket); 00282 sleep(1); 00283 }*/ 00284 delete socket; 00285 } 00286 00287 if (socket->getConnectionType() == CONNECTION_TYPE_DATA) { 00288 return NULL; 00289 } 00290 00291 int locked = false; 00292 try { 00293 while (socket->isConnected()) { 00294 fprintf(stderr, "Waiting Command... %p\n", socket); 00295 Command* cmd = socket->recvCommand(); 00296 if (cmd == NULL) { 00297 fprintf(stderr, "Interrupting Command listener\n"); 00298 break; 00299 } 00300 00301 cmd_handler_f handler = cmdHandlers[cmd->getId()]; 00302 fprintf(stderr, "ReceivedCommand: %d\n", cmd->getId()); 00303 if (handler != NULL) { 00304 fprintf(stderr, "Acquiring Lock\n"); 00305 pthread_mutex_lock(&this_obj->mutex); 00306 locked = true; 00307 fprintf(stderr, "Acquiring Lock: OK\n"); 00308 (this_obj->*handler)(cmd, socket); 00309 pthread_mutex_unlock(&this_obj->mutex); 00310 locked = false; 00311 fprintf(stderr, "Unlocked\n"); 00312 } 00313 00314 } 00315 } catch(IOException &e) { 00316 fprintf(stderr, "IOException: %s (%s) %p\n", e.what(), socket->getRemoteId().c_str(), socket); 00317 if (locked) { 00318 pthread_mutex_unlock(&this_obj->mutex); 00319 } 00320 } 00321 00322 00323 return NULL; 00324 } 00325 00326 void MasaNet::broadcastCommand(Command* command, Peer* excludeSocket) { 00327 vector<Peer*> allPeers = peers.getProcessingPeers(); 00328 for (vector<Peer*>::iterator it = allPeers.begin() ; it != allPeers.end(); ++it) { 00329 if ((*it) == excludeSocket) continue; 00330 if ((*it)->getRemoteType() == TYPE_PROCESSING_NODE) { // TODO retirar? 00331 (*it)->sendCommand(command); 00332 } 00333 } 00334 } 00335 00336 /* 00337 Get ip from domain name 00338 */ 00339 int MasaNet::hostname_to_ip(const char *hostname , char *ip) { 00340 // Source: http://www.binarytides.com/hostname-to-ip-address-c-sockets-linux/ 00341 int sockfd; 00342 struct addrinfo hints, *servinfo, *p; 00343 struct sockaddr_in *h; 00344 int rv; 00345 00346 memset(&hints, 0, sizeof hints); 00347 hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6 00348 hints.ai_socktype = SOCK_STREAM; 00349 00350 if ( (rv = getaddrinfo( hostname , "http" , &hints , &servinfo)) != 0) 00351 { 00352 fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); 00353 return 1; 00354 } 00355 00356 // loop through all the results and connect to the first we can 00357 for(p = servinfo; p != NULL; p = p->ai_next) 00358 { 00359 h = (struct sockaddr_in *) p->ai_addr; 00360 strcpy(ip , inet_ntoa( h->sin_addr ) ); 00361 } 00362 00363 freeaddrinfo(servinfo); // all done with this structure 00364 return 0; 00365 } 00366 00367 Peer* MasaNet::solveSimultaneousConnection(Peer* newPeer, Peer* oldPeer) { 00368 if ( (newPeer->getRemoteId() < myId) ^ (newPeer->isInitiator()) ) { 00369 newPeer->finalize(); 00370 return oldPeer; 00371 } else { 00372 oldPeer->finalize(); 00373 return newPeer; 00374 } 00375 } 00376 00377 bool MasaNet::onConnect(Peer* peer) { 00378 fprintf(stderr, "**** ON CONNECT\n"); 00379 00380 pthread_mutex_lock(&this->mutex); 00381 if (peers.get(peer->getRemoteId()) != NULL) { 00382 Peer* preferrablePeer = solveSimultaneousConnection(peer, peers.get(peer->getRemoteId())); 00383 if (preferrablePeer == peer) { 00384 printf("Peer already connected %s %p. Replacing\n", peer->getRemoteId().c_str(), peer); 00385 peers.erase(peer->getRemoteId()); 00386 } else { 00387 pthread_mutex_unlock(&this->mutex); 00388 printf("Peer already connected %s %p. Aborting\n", peer->getRemoteId().c_str(), peer); 00389 return false; 00390 } 00391 } 00392 00393 peers.add(peer); 00394 printf("node connected: %s\n", peer->getRemoteId().c_str()); 00395 00396 if (peer->getRemoteId() == this->leftPeerId) { 00397 this->leftPeer = peer; 00398 peer->ringType = RING_LEFT; 00399 } else if (peer->getRemoteId() == this->rightPeerId) { 00400 this->rightPeer = peer; 00401 peer->ringType = RING_RIGHT; 00402 } else { 00403 peer->ringType = RING_NONE; 00404 } 00405 printf("node connected: %s.. %s|%s %d\n", peer->getRemoteId().c_str(), this->leftPeerId.c_str(), this->rightPeerId.c_str(), peer->ringType); 00406 00407 00408 if (peer->getRemoteType() == TYPE_PROCESSING_NODE) { 00409 00410 if (this->discoveredPeers.get(peer->getRemoteId()) == NULL) { 00411 this->discoveredPeers.add(peer); 00412 printf("node discovered: %s\n", peer->getRemoteId().c_str()); 00413 00414 CmdDiscover cmd; 00415 cmd.addPeers(this->discoveredPeers.getProcessingPeers()); 00416 peer->sendCommand(&cmd); 00417 00418 vector<Peer*> announcedNode; 00419 announcedNode.push_back(peer); 00420 this->announce(announcedNode, peer); // broadcast 00421 } else { 00422 this->discoveredPeers.add(peer); 00423 printf("node updated: %s\n", peer->getRemoteId().c_str()); 00424 } 00425 } 00426 pthread_mutex_unlock(&this->mutex); 00427 00428 return true; 00429 } 00430 00431 bool MasaNet::onConnectData(Peer* peer) { 00432 fprintf(stderr, "**** ON CONNECT DATA **********\n"); 00433 bool successful = true; 00434 00435 pthread_mutex_lock(&this->mutex); 00436 00437 if (peer->getRemoteId() == this->leftPeerId) { 00438 if (leftPeerData == NULL) { 00439 this->leftPeerData = peer; 00440 } else { 00441 this->leftPeerData = solveSimultaneousConnection(peer, this->leftPeerData); 00442 if (this->leftPeerData == peer) { 00443 printf("Data peer already connected %s %p. Replacing\n", peer->getRemoteId().c_str(), peer); 00444 } else { 00445 printf("Data Peer already connected %s %p. Aborting\n", peer->getRemoteId().c_str(), peer); 00446 successful = false; 00447 } 00448 } 00449 } else if (peer->getRemoteId() == this->rightPeerId) { 00450 if (rightPeerData == NULL) { 00451 this->rightPeerData = peer; 00452 } else { 00453 this->rightPeerData = solveSimultaneousConnection(peer, this->rightPeerData); 00454 if (this->rightPeerData == peer) { 00455 printf("Data peer already connected %s %p. Replacing\n", peer->getRemoteId().c_str(), peer); 00456 } else { 00457 printf("Data Peer already connected %s %p. Aborting\n", peer->getRemoteId().c_str(), peer); 00458 successful = false; 00459 } 00460 } 00461 } 00462 pthread_mutex_unlock(&this->mutex); 00463 00464 return successful; 00465 } 00466 00467 00468 void MasaNet::onDisconnect(Peer* peer) { 00469 fprintf(stderr, "**** ON DISCONNECT !!!!\n"); 00470 00471 00472 bool sendDisanounce = false; 00473 pthread_mutex_lock(&this->mutex); 00474 if (this->peers.get(peer->getRemoteId()) == NULL && 00475 this->discoveredPeers.get(peer->getRemoteId()) != NULL) { 00476 this->peers.erase(peer->getRemoteId()); 00477 sendDisanounce = true; 00478 this->discoveredPeers.erase(peer->getRemoteId()); 00479 } 00480 pthread_mutex_unlock(&this->mutex); 00481 00482 //if (sendDisanounce) { 00483 //this->disanounce(peer, NULL); 00484 //} 00485 } 00486 00487 00488 const PeerList& MasaNet::getConnectedPeers() const { 00489 return peers; 00490 } 00491 00492 const PeerList& MasaNet::getDiscoveredPeers() const { 00493 return discoveredPeers; 00494 } 00495 00496 00497 00498 00499 00500 00501 void MasaNet::announce(vector<Peer*> announcedPeers, Peer* excludeSocket) { 00502 CmdDiscover cmd; 00503 if (announcedPeers.size() == 0) { 00504 return; 00505 } 00506 cmd.addPeers(announcedPeers); 00507 broadcastCommand(&cmd, excludeSocket); 00508 } 00509 00510 void MasaNet::disanounce(Peer* peer, Peer* excludeSocket) { 00511 CmdUndiscover cmd; 00512 if (peer == NULL) { 00513 return; 00514 } 00515 cmd.addPeer(peer); 00516 fprintf(stderr, "Disanouncing: %s\n", peer->getRemoteId().c_str()); 00517 broadcastCommand(&cmd, excludeSocket); 00518 } 00519 00520 00521 00522 MasaNetStatus* MasaNet::getPeerStatus() { 00523 vector<Peer*> allPeers = peers.getProcessingPeers(); 00524 for (vector<Peer*>::iterator it = allPeers.begin() ; it != allPeers.end(); ++it) { 00525 CmdStatusRequest request; 00526 CmdStatusResponse* response = (CmdStatusResponse*)(*it)->sendCommand(&request); 00527 if (response != NULL) { 00528 status = response->getStatus(); 00529 // FIXME Memory leak - response 00530 return &status; 00531 } 00532 } 00533 return NULL; 00534 } 00535 00536 Peer* MasaNet::getPeer(const string& peer) { 00537 Peer* socket; 00538 if (peer.length() == 0) { 00539 socket = peers.getProcessingPeers().front(); 00540 } else { 00541 socket = peers.get(peer); 00542 } 00543 return socket; 00544 } 00545 00546 const vector<Peer*>& MasaNet::getRemotePeers(string peer, int type) { 00547 vector<Peer*> remotePeers; 00548 CmdPeerRequest request; 00549 request.setType(type); 00550 Peer* socket = getPeer(peer); 00551 CmdPeerResponse* response = (CmdPeerResponse*)socket->sendCommand(&request); 00552 return response->getPeers(); 00553 } 00554 00555 void MasaNet::createRing() { 00556 CmdCreateRing cmd; 00557 broadcastCommand(&cmd); 00558 } 00559 00560 00561 const set<string>& MasaNet::testRing(string peer) { 00562 CmdTestRing cmd(myId); 00563 00564 Peer* socket = getPeer(peer); 00565 socket->addHook(cmd.getId()); 00566 socket->sendCommand(&cmd); 00567 CmdTestRing* async = (CmdTestRing*)socket->waitHook(); 00568 set<string> ret = async->getIds(); 00569 00570 return ret; 00571 } 00572 00573 00574 00575 void MasaNet::cmd_discover(Command* _cmd, Peer* socket) { 00576 fprintf(stderr, "Cmd Discover %p\n", _cmd); 00577 CmdDiscover* cmd = (CmdDiscover*)_cmd; 00578 vector<Peer*> receivedPeers = cmd->getPeers(); 00579 00580 vector<Peer*> newPeers; 00581 for (vector<Peer*>::iterator it = receivedPeers.begin() ; it != receivedPeers.end(); ++it) { 00582 if (discoveredPeers.get((*it)->getRemoteId()) == NULL) { 00583 newPeers.push_back(*it); 00584 discoveredPeers.add(*it); 00585 printf("New Node: %s\n", (*it)->getRemoteId().c_str()); 00586 } 00587 } 00588 00589 announce(newPeers, socket); 00590 00591 /*vector<peer_t> diff; 00592 for (map<string, peer_t>::iterator it = getConnectedPeers().begin() ; it != getConnectedPeers().end(); ++it) { 00593 sendCommand(it->second.socket, cmd); 00594 } 00595 std::set_difference(peers.begin(), peers.end(), this->discoveredPeers.begin(), this->discoveredPeers.end(), 00596 std::inserter(result, result.end())); 00597 00598 fprintf(stderr, "Command Discover: %d\n", peers.size()); 00599 int len0 = this->discoveredPeers.size(); 00600 this->discoveredPeers.insert(peers.begin(), peers.end()); 00601 int len1 = this->discoveredPeers.size(); 00602 for (set<string>::iterator it=peers.begin(); it!=peers.end(); ++it) { 00603 fprintf(stderr, "-> %s\n", it->c_str()); 00604 } 00605 if (true || len0 != len1) { 00606 for (map<string, peer_t>::iterator it = getConnectedPeers().begin() ; it != getConnectedPeers().end(); ++it) { 00607 sendCommand(it->second.socket, cmd); 00608 } 00609 }*/ 00610 } 00611 00612 void MasaNet::cmd_undiscover(Command* _cmd, Peer* socket) { 00613 fprintf(stderr, "Cmd Undiscover %p\n", _cmd); 00614 /*CmdUndiscover* cmd = (CmdUndiscover*)_cmd; 00615 vector<Peer*> receivedPeers = cmd->getPeers(); 00616 Peer* peer = receivedPeers.front(); 00617 00618 if (discoveredPeers.get(peer->getRemoteId()) != NULL) { 00619 if (peers.get(peer->getRemoteId()) == NULL) { 00620 printf("Deleting Node: %s\n", peer->getRemoteId().c_str()); 00621 discoveredPeers.erase(peer->getRemoteId()); 00622 //discoveredPeers.copy(peers.getProcessingPeers()); 00623 disanounce(peer, socket); 00624 //announce(peers.getProcessingPeers(), NULL); 00625 } else { 00626 announce(receivedPeers, NULL); 00627 } 00628 }*/ 00629 00630 } 00631 00632 void MasaNet::cmd_status_request(Command* _cmd, Peer* socket) { 00633 fprintf(stderr, "Cmd Status Request %p\n", _cmd); 00634 CmdStatusResponse response(&status); 00635 response.setSerial(_cmd->getSerial()); 00636 socket->sendCommand(&response); 00637 } 00638 00639 void MasaNet::cmd_peer_request(Command* _cmd, Peer* socket) { 00640 CmdPeerRequest* request = (CmdPeerRequest*)_cmd;; 00641 fprintf(stderr, "Cmd Peer Request %p (%d)\n", _cmd, request->getType()); 00642 CmdPeerResponse response; 00643 if (request->getType() == CMD_CONNECTED_PEERS) { 00644 response.addPeers(peers.getProcessingPeers()); 00645 } else if (request->getType() == CMD_DISCOVERED_PEERS) { 00646 response.addPeers(discoveredPeers.getProcessingPeers()); 00647 } else if (request->getType() == CMD_DATA_PEERS) { 00648 response.addPeer(leftPeerData); 00649 response.addPeer(rightPeerData); 00650 } 00651 response.setSerial(_cmd->getSerial()); 00652 socket->sendCommand(&response); 00653 } 00654 00655 Peer* MasaNet::getLeftPeer() const { 00656 return leftPeer; 00657 } 00658 00659 Peer* MasaNet::getRightPeer() const { 00660 return rightPeer; 00661 } 00662 00663 void MasaNet::cmd_create_ring(Command* _cmd, Peer* socket) { 00664 fprintf(stderr, "Cmd Create Ring %p\n", _cmd); 00665 00666 if (status.getProcessingState() != STATE_IDLE) { 00667 printf("Server is not idle: %d.\n", status.getProcessingState()); 00668 return; 00669 } 00670 status.setProcessingState(STATE_CREATING_RING); 00671 printf("Creating Ring.\n"); 00672 00673 createRing(); 00674 00675 Peer* prev = discoveredPeers.getPrev(myId); 00676 leftPeerId = prev->getRemoteId(); 00677 Peer* next = discoveredPeers.getNext(myId); 00678 rightPeerId = next->getRemoteId(); 00679 00680 printf("I must connect to nodes: %s and %s\n", leftPeerId.c_str(), rightPeerId.c_str()); 00681 00682 if (peers.get(rightPeerId) == NULL) { 00683 connectToPeer(next->getRemoteAddress(), CONNECTION_TYPE_CTRL); 00684 } else { 00685 rightPeer = next; 00686 rightPeer->ringType = RING_RIGHT; 00687 } 00688 00689 if (peers.get(leftPeerId) == NULL) { 00690 connectToPeer(prev->getRemoteAddress(), CONNECTION_TYPE_CTRL); 00691 } else { 00692 leftPeer = prev; 00693 leftPeer->ringType = RING_LEFT; 00694 } 00695 00696 if (rightPeerData == NULL) { 00697 connectToPeer(next->getRemoteAddress(), CONNECTION_TYPE_DATA); 00698 } 00699 if (leftPeerData == NULL) { 00700 connectToPeer(next->getRemoteAddress(), CONNECTION_TYPE_DATA); 00701 } 00702 00703 } 00704 00705 00706 void MasaNet::cmd_test_ring(Command* _cmd, Peer* socket) { 00707 fprintf(stderr, "Cmd Test Ring %p\n", _cmd); 00708 CmdTestRing* cmd = (CmdTestRing*)_cmd; 00709 00710 if (status.getProcessingState() == STATE_IDLE) { 00711 printf("Ring is not created.\n"); 00712 return; 00713 } 00714 if (cmd->containsId(myId)) { 00715 // Loop finished 00716 if (cmd->getOriginatorId() != myId) { 00717 cmd->addId(myId); 00718 peers.get(cmd->getOriginatorId())->sendCommand(cmd); 00719 } else { 00720 string firstId = *(cmd->getIds().begin()); 00721 if (firstId != myId) { 00722 fprintf(stderr, "Ring is not fully connected: %d peer(s)\n", cmd->getIds().size()); 00723 } else { 00724 fprintf(stderr, "Ring is fully connected with %d peer(s)\n", cmd->getIds().size()); 00725 } 00726 } 00727 } else { 00728 cmd->addId(myId); 00729 00730 Peer* next = getRightPeer(); 00731 next->sendCommand(cmd); 00732 } 00733 }
1.7.6.1