|
MASA-Core
|
00001 /******************************************************************************* 00002 * 00003 * Copyright (c) 2010-2015 Edans Sandes 00004 * 00005 * This file is part of MASA-Core. 00006 * 00007 * MASA-Core is free software: you can redistribute it and/or modify 00008 * it under the terms of the GNU General Public License as published by 00009 * the Free Software Foundation, either version 3 of the License, or 00010 * (at your option) any later version. 00011 * 00012 * MASA-Core is distributed in the hope that it will be useful, 00013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 * GNU General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License 00018 * along with MASA-Core. If not, see <http://www.gnu.org/licenses/>. 00019 * 00020 ******************************************************************************/ 00021 00022 #include "Peer.hpp" 00023 00024 #include "command/Command.hpp" 00025 #include "MasaNet.hpp" 00026 00027 #include <stdio.h> 00028 #include <stdlib.h> 00029 #include <string.h> 00030 #include <string> 00031 using namespace std; 00032 00033 #include <sys/socket.h> /* for socket(), bind(), and connect() */ 00034 #include <arpa/inet.h> /* for sockaddr_in and inet_ntoa() */ 00035 #include <signal.h> 00036 #include <unistd.h> 00037 #include <errno.h> 00038 00039 #define MAGIC_STRING "MASA_NET" 00040 #define MAGIC_ANSWER "MASA_ACK" 00041 00042 #define MASA_NET_VERSION_MAJOR 0 00043 #define MASA_NET_VERSION_MINOR 1 00044 00045 #define FLAG_NONE 0x00000000 00046 00047 #define SEND_ERROR_MSG ("send: socket error") 00048 #define RECV_ERROR_MSG ("recv: socket error") 00049 00050 map<int, cmd_creator_f> Peer::cmdCreators; 00051 bool Peer::hasStaticEvent = false; 00052 timeval Peer::staticEvent; 00053 00054 Peer::Peer(int socket, const string& localId, bool initiator, int connectionType) { 00055 this->socket = socket; 00056 this->connectionType = connectionType; 00057 this->initiator = initiator; 00058 this->connected = false; 00059 this->serial = 0; 00060 this->localId = localId; 00061 this->remoteType = TYPE_UNKNOWN; 00062 this->localType = TYPE_UNKNOWN; 00063 this->ringType = RING_NONE; 00064 this->error = false; 00065 this->handshakeDone = false; 00066 00067 struct sockaddr_in sin; 00068 socklen_t len = sizeof(sin); 00069 if (getsockname(socket, (struct sockaddr *)&sin, &len) == -1) { 00070 perror("getsockname"); 00071 } 00072 00073 char desc[256]; 00074 sprintf(desc, "%s:%d", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); 00075 localAddress = string(desc); 00076 00077 if (getpeername(socket, (struct sockaddr *)&sin, &len) == -1) { 00078 perror("getpeername"); 00079 } 00080 sprintf(desc, "%s:%d", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); 00081 remoteAddress = string(desc); 00082 00083 fprintf(stderr, "Peer::Peer(%d, %s, %d): %p [%s]\n", socket, localId.c_str(), initiator, this, desc); 00084 00085 pthread_mutex_init(&mutex, NULL); 00086 pthread_cond_init(&responseCond, NULL); 00087 pthread_cond_init(&handshakeCond, NULL); 00088 00089 /* This code will prevent the SIGPIPE signal from being raised, but you 00090 * will get a read / write error when trying to use the socket, so you 00091 * will need to check for that. 00092 */ 00093 signal(SIGPIPE, SIG_IGN); 00094 } 00095 00096 Peer::Peer(string remoteId, string remoteAddress, 00097 const int remoteType, const int ringType, int connectionType) { 00098 this->remoteId = remoteId; 00099 this->remoteAddress = remoteAddress; 00100 this->remoteType = remoteType; 00101 this->ringType = ringType; 00102 this->localType = TYPE_UNKNOWN; 00103 this->connected = false; 00104 this->serial = 0; 00105 this->initiator = false; 00106 this->socket = 0; 00107 this->error = false; 00108 this->handshakeDone = false; 00109 this->connectionType = connectionType; 00110 00111 // TODO criar mutex, conds? 00112 } 00113 00114 Peer::~Peer() { 00115 this->connected = false; 00116 pthread_mutex_destroy(&mutex); 00117 pthread_cond_destroy(&responseCond); 00118 pthread_cond_destroy(&handshakeCond); 00119 } 00120 00121 bool Peer::handshake() { 00122 if (initiator) { 00123 connected = handshakeInitiator(); 00124 } else { 00125 connected = handshakeInitiated(); 00126 } 00127 pthread_mutex_lock(&mutex); 00128 handshakeDone = true; 00129 pthread_cond_broadcast(&handshakeCond); 00130 pthread_mutex_unlock(&mutex); 00131 00132 if (connected) { 00133 if (connectionType == CONNECTION_TYPE_DATA) { 00134 connected = callback->onConnectData(this); 00135 } else { 00136 connected = callback->onConnect(this); 00137 } 00138 } 00139 00140 if (!connected) { 00141 finalize(); 00142 } 00143 return connected; 00144 } 00145 00146 void Peer::finalize() { 00147 if (socket != 0) { 00148 fprintf(stderr, "Peer::finalize (%s) %d\n", remoteId.c_str(), socket); 00149 close(socket); 00150 socket = 0; 00151 } 00152 } 00153 00154 bool Peer::waitHandshake() { 00155 printf("Wait 1\n"); 00156 pthread_mutex_lock(&mutex); 00157 printf("Wait 2\n"); 00158 while (!handshakeDone) { 00159 printf("Wait 3\n"); 00160 pthread_cond_wait(&handshakeCond, &mutex); 00161 } 00162 printf("Wait 4\n"); 00163 pthread_mutex_unlock(&mutex); 00164 return connected; 00165 } 00166 00167 bool Peer::handshakeInitiator() { 00168 char magic[32]; 00169 recv_array(magic, strlen(MAGIC_STRING)); 00170 if (memcmp(magic, MAGIC_STRING, strlen(MAGIC_STRING)) != 0) { 00171 fprintf(stderr, "handshake: wrong magic string: %s/%s %d\n", MAGIC_STRING, magic, socket); 00172 return 0; 00173 } 00174 00175 send_array(MAGIC_ANSWER, strlen(MAGIC_ANSWER)); 00176 00177 int major = recv_int8(); 00178 int minor = recv_int8(); 00179 00180 if (major != MASA_NET_VERSION_MAJOR) { 00181 fprintf(stderr, "handshake: unsupported version: %d.%d\n", major, minor); 00182 return 0; 00183 } 00184 00185 int flags = FLAG_NONE; 00186 send_int32(flags); 00187 00188 send_int8(connectionType); 00189 00190 send_vls8(localId); 00191 remoteId = recv_vls8(); 00192 00193 send_int16(localType); 00194 remoteType = recv_int16(); 00195 00196 send_vls8(localAddress); 00197 00198 return 1; 00199 } 00200 00201 string Peer::toString() { 00202 stringstream msg; 00203 msg << remoteId << "["; 00204 switch (remoteType) { 00205 case TYPE_CLI: 00206 msg << "CL"; 00207 break; 00208 case TYPE_PROCESSING_NODE: 00209 msg << "PE"; 00210 break; 00211 case TYPE_GATEWAY: 00212 msg << "GW"; 00213 break; 00214 case TYPE_UNKNOWN: 00215 msg << "??"; 00216 break; 00217 } 00218 msg << "](" << remoteAddress << ")"; 00219 if (ringType == RING_RIGHT) { 00220 msg << " *R"; 00221 } else if (ringType == RING_LEFT) { 00222 msg << " *L"; 00223 } 00224 if (connectionType == CONNECTION_TYPE_CTRL) { 00225 msg << " CTRL"; 00226 } else if (connectionType == CONNECTION_TYPE_DATA) { 00227 msg << " DATA"; 00228 } else { 00229 msg << " ??"; 00230 } 00231 return msg.str(); 00232 } 00233 00234 void Peer::registerCommandCreator(int id, cmd_creator_f creator) { 00235 cmdCreators[id] = creator; 00236 } 00237 00238 bool Peer::handshakeInitiated() { 00239 send_array(MAGIC_STRING, strlen(MAGIC_STRING)); 00240 00241 char magic[32]; 00242 recv_array(magic, strlen(MAGIC_ANSWER)); 00243 if (memcmp(magic, MAGIC_ANSWER, strlen(MAGIC_ANSWER)) != 0) { 00244 fprintf(stderr, "handshake: wrong magic answer: %s/%s %d\n", MAGIC_ANSWER, magic, socket); 00245 return 0; 00246 } 00247 00248 send_int8(MASA_NET_VERSION_MAJOR); 00249 send_int8(MASA_NET_VERSION_MINOR); 00250 00251 int flags = recv_int32(); 00252 00253 connectionType = recv_int8(); 00254 00255 remoteId = recv_vls8(); 00256 send_vls8(localId); 00257 00258 remoteType = recv_int16(); 00259 send_int16(localType); 00260 00261 remoteAddress = recv_vls8(); 00262 00263 return 1; 00264 } 00265 00266 00267 bool Peer::isConnected() { 00268 return connected; 00269 } 00270 00271 void Peer::addHook(int id, int serial) { 00272 pthread_t tid = pthread_self(); 00273 00274 pthread_mutex_lock(&mutex); 00275 hookThreads[id].insert(tid); 00276 hookSerial[tid] = serial; 00277 hookCommand[tid] = NULL; 00278 pthread_mutex_unlock(&mutex); 00279 } 00280 00281 Command* Peer::waitHook() { 00282 pthread_t tid = pthread_self(); 00283 Command* ret = NULL; 00284 00285 pthread_mutex_lock(&mutex); 00286 while (hookCommand[tid] == NULL) { 00287 pthread_cond_wait (&responseCond, &mutex); 00288 // TODO colocar timeout? 00289 // FIXME tratar caso de desconexão 00290 } 00291 ret = hookCommand[tid]; 00292 hookCommand[tid] = NULL; 00293 hookThreads[ret->getId()].erase(tid); 00294 pthread_mutex_unlock(&mutex); 00295 return ret; 00296 } 00297 00298 void Peer::notifyHook(Command* cmd) { 00299 int id = cmd->getId(); 00300 pthread_mutex_lock(&mutex); 00301 printf("Hook Received %d. %d\n", id, hookThreads[id].size()); 00302 if (hookThreads[id].size() > 0) { 00303 bool wakeup = false; 00304 for (std::set<pthread_t>::iterator it=hookThreads[id].begin(); it!=hookThreads[id].end(); ++it) { 00305 printf("Hook Received: %d %d\n", id, hookSerial[*it]); 00306 if (hookSerial[*it] == -1 || hookSerial[*it] == cmd->getSerial()) { 00307 hookCommand[*it] = cmd; 00308 hookThreads[id].erase(*it); 00309 wakeup = true; 00310 } 00311 } 00312 if (wakeup) { 00313 pthread_cond_broadcast(&responseCond); 00314 } 00315 } 00316 pthread_mutex_unlock(&mutex); 00317 } 00318 00319 int Peer::getNextSerial() { 00320 return ++serial; 00321 } 00322 00323 00324 int Peer::getLocalType() const { 00325 return localType; 00326 } 00327 00328 void Peer::setLocalType(int localType) { 00329 this->localType = localType; 00330 } 00331 00332 int Peer::getRemoteType() const { 00333 return remoteType; 00334 } 00335 00336 const string& Peer::getLocalAddress() const { 00337 return localAddress; 00338 } 00339 00340 const string& Peer::getRemoteAddress() const { 00341 return remoteAddress; 00342 } 00343 00344 const string& Peer::getLocalId() const { 00345 return localId; 00346 } 00347 00348 const string& Peer::getRemoteId() const { 00349 return remoteId; 00350 } 00351 00352 void Peer::setLocalAddress(const string& publicAddress) { 00353 if (publicAddress[0] != ':') { 00354 this->localAddress = publicAddress; 00355 } else { 00356 int port; 00357 sscanf(publicAddress.c_str(), ":%d", &port); 00358 00359 struct sockaddr_in sin; 00360 socklen_t len = sizeof(sin); 00361 if (getsockname(socket, (struct sockaddr *)&sin, &len) == -1) { 00362 perror("getsockname"); 00363 } 00364 char desc[256]; 00365 sprintf(desc, "%s:%d", inet_ntoa(sin.sin_addr), port); 00366 this->localAddress = string(desc); 00367 } 00368 } 00369 00370 Command* Peer::sendCommand(Command* command) { 00371 fprintf(stderr, "Will Lock: %d \n", command->getId()); 00372 pthread_mutex_lock(&mutex); 00373 fprintf(stderr, " Locked 1\n"); 00374 int id = command->getId(); 00375 fprintf(stderr, " Locked 2\n"); 00376 send_int16(id); 00377 fprintf(stderr, " Locked 3\n"); 00378 if (id & REQUEST_COMMAND) { 00379 int serial = getNextSerial(); 00380 command->setSerial(serial); 00381 send_int32(serial); 00382 } else if (id & RESPONSE_COMMAND) { 00383 send_int32(command->getSerial()); 00384 } 00385 fprintf(stderr, " Locked 4\n"); 00386 command->send(this); 00387 fprintf(stderr, "sendCommand(%s->%s): %d.%d \n", localId.c_str(), remoteId.c_str(), id, command->getSerial()); 00388 00389 00390 00391 Command* ret = NULL; 00392 if (id & REQUEST_COMMAND) { 00393 pthread_t tid = pthread_self(); 00394 hookThreads[(id ^ REQUEST_COMMAND) | RESPONSE_COMMAND].insert(tid); 00395 hookSerial[tid] = serial; 00396 hookCommand[tid] = NULL; 00397 printf("hook: %d %d\n", (id ^ REQUEST_COMMAND) | RESPONSE_COMMAND, serial); 00398 while (hookCommand[tid] == NULL) { 00399 fprintf(stderr, "wait response\n"); 00400 pthread_cond_wait (&responseCond, &mutex); 00401 } 00402 ret = hookCommand[tid]; 00403 } 00404 00405 pthread_mutex_unlock(&mutex); 00406 00407 return ret; 00408 00409 } 00410 00411 00412 Command* Peer::recvCommand() { 00413 fprintf(stderr, "recvCommand(%s<-%s): Waiting... %p\n", localId.c_str(), remoteId.c_str(), this); 00414 int id = recv_int16(); 00415 int serial = 0; 00416 if ((id & REQUEST_COMMAND) || (id & RESPONSE_COMMAND)) { 00417 serial = recv_int32(); 00418 } 00419 cmd_creator_f creator = cmdCreators[id]; 00420 if (creator == NULL) { 00421 fprintf(stderr, "cmd: unsupported command [%d]\n", id); 00422 // TODO 00423 } 00424 Command* cmd = creator(); 00425 cmd->setSerial(serial); 00426 cmd->receive(this); 00427 fprintf(stderr, "recvCommand(%s<-%s): %d.%d \n", localId.c_str(), remoteId.c_str(), id, cmd->getSerial()); 00428 00429 /*if (id & RESPONSE_COMMAND) { 00430 pthread_mutex_lock(&mutex); 00431 response = cmd; 00432 pthread_cond_broadcast (&responseCond); 00433 while (response != NULL && waitingResponse > 0) { 00434 pthread_cond_wait (&responseCond, &mutex); 00435 // TODO colocar timeout? 00436 // FIXME tratar caso de disconexão 00437 } 00438 pthread_mutex_unlock(&mutex); 00439 }*/ 00440 00441 notifyHook(cmd); 00442 00443 return cmd; 00444 } 00445 00446 00447 00448 void Peer::handleSendError(int ret) { 00449 if (ret <= 0) { 00450 callback->onDisconnect(this); 00451 throw IOException(SEND_ERROR_MSG); 00452 } 00453 } 00454 00455 void Peer::handleRecvError(int ret) { 00456 if (ret <= 0) { 00457 perror("Error reading from server"); 00458 fprintf(stderr, "Ret: %d (%d)\n", ret, socket); 00459 00460 callback->onDisconnect(this); 00461 throw IOException(RECV_ERROR_MSG); 00462 } 00463 } 00464 00465 00466 void Peer::send_int32(int value) { 00467 int msg = htonl(value); 00468 int ret = send(socket, &msg, sizeof(msg), 0); 00469 handleSendError(ret); 00470 } 00471 00472 int Peer::recv_int32() { 00473 int msg; 00474 int ret = recv(socket, &msg, sizeof(msg), 0); 00475 handleRecvError(ret); 00476 00477 return ntohl(msg); 00478 } 00479 00480 void Peer::send_int16(short value) { 00481 short msg = htons(value); 00482 int ret = send(socket, &msg, sizeof(msg), 0); 00483 handleSendError(ret); 00484 } 00485 00486 short Peer::recv_int16() { 00487 short msg; 00488 int ret = recv(socket, &msg, sizeof(msg), 0); 00489 handleRecvError(ret); 00490 00491 return ntohs(msg); 00492 } 00493 00494 void Peer::send_int8(char value) { 00495 char msg = value; 00496 int ret = send(socket, &msg, sizeof(msg), 0); 00497 handleSendError(ret); 00498 } 00499 00500 char Peer::recv_int8() { 00501 char msg; 00502 int ret = recv(socket, &msg, sizeof(msg), 0); 00503 handleRecvError(ret); 00504 00505 return msg; 00506 } 00507 00508 void Peer::send_array(const char* value, int len) { 00509 int ret = send(socket, value, len, 0); 00510 handleSendError(ret); 00511 } 00512 00513 void Peer::recv_array(char* value, int len) { 00514 char msg; 00515 int ret = recv(socket, value, len, 0); 00516 handleRecvError(ret); 00517 00518 } 00519 00520 void Peer::send_vls8(const char* value) { 00521 int len = strlen(value); 00522 if (len > 255) { 00523 len = 255; 00524 } 00525 send_int8(len); 00526 send_array(value, len); 00527 } 00528 00529 void Peer::recv_vls8(char* value, int max) { 00530 unsigned char len = (unsigned char)recv_int8(); 00531 //fprintf(stderr, "recv_vls8: %d\n", len); 00532 if (max != -1 && len > max-1) { 00533 recv_array(value, max-1); 00534 value[max-1] = '\0'; 00535 recv_dummy(len-max+1); 00536 } else { 00537 recv_array(value, len); 00538 value[len] = '\0'; 00539 } 00540 } 00541 00542 void Peer::send_vls8(const string& value) { 00543 send_vls8(value.c_str()); 00544 } 00545 00546 string Peer::recv_vls8() { 00547 char str[256]; 00548 recv_vls8(str, sizeof(str)); 00549 return string(str); 00550 } 00551 00552 void Peer::recv_dummy(int len) { 00553 char dummy[len]; 00554 recv_array(dummy, len); 00555 } 00556 00557 float Peer::getGlobalTime() { 00558 if (!hasStaticEvent) { 00559 hasStaticEvent = true; 00560 gettimeofday(&staticEvent, NULL); 00561 } 00562 00563 timeval event; 00564 gettimeofday(&event, NULL); 00565 00566 float diff = getElapsedTime(&event, &staticEvent); 00567 return diff; 00568 } 00569 00570 void Peer::setCallback(MasaNetCallbacks* callback) { 00571 this->callback = callback; 00572 } 00573 00574 bool Peer::isInitiator() const { 00575 return initiator; 00576 } 00577 00578 int Peer::getSocket() const { 00579 return socket; 00580 } 00581 00582 int Peer::getConnectionType() const { 00583 return connectionType; 00584 } 00585 00586 float Peer::getElapsedTime(timeval *end_time, timeval *start_time) { 00587 timeval temp_diff; 00588 00589 temp_diff.tv_sec = end_time->tv_sec - start_time->tv_sec; 00590 temp_diff.tv_usec = end_time->tv_usec - start_time->tv_usec; 00591 00592 if (temp_diff.tv_usec < 0) { 00593 long long nsec = temp_diff.tv_usec/1000000; 00594 temp_diff.tv_usec += 1000000 * nsec; 00595 temp_diff.tv_sec -= nsec; 00596 } 00597 00598 return (1000000LL * temp_diff.tv_sec + temp_diff.tv_usec)/1000.0f; 00599 00600 }
1.7.6.1