Changeset 5870 for source/ariba/overlay/BaseOverlay.cpp
- Timestamp:
- Aug 11, 2009, 4:11:02 PM (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/overlay/BaseOverlay.cpp
r5860 r5870 49 49 50 50 #include "ariba/overlay/LinkDescriptor.h" 51 51 52 #include "ariba/overlay/messages/OverlayMsg.h" 52 53 #include "ariba/overlay/messages/JoinRequest.h" 53 54 #include "ariba/overlay/messages/JoinReply.h" 54 #include "ariba/overlay/messages/LinkRequest.h"55 #include "ariba/overlay/messages/RelayMessage.h"56 55 57 56 #include "ariba/utility/misc/OvlVis.h" … … 59 58 namespace ariba { 60 59 namespace overlay { 60 61 /* ***************************************************************************** 62 * PREREQUESITES 63 * ****************************************************************************/ 64 65 CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) { 66 if( !communicationListeners.contains( service ) ) { 67 logging_error( "No listener found for service " << service.toString() ); 68 return NULL; 69 } 70 CommunicationListener* listener = communicationListeners.get( service ); 71 assert( listener != NULL ); 72 return listener; 73 } 74 75 // link descriptor handling ---------------------------------------------------- 61 76 62 77 LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) { … … 91 106 if ( desc == NULL ) { 92 107 desc = new LinkDescriptor(); 93 desc->overlayId = link;108 if (!link.isUnspecified()) desc->overlayId = link; 94 109 links.push_back(desc); 95 110 } … … 99 114 /// returns a auto-link descriptor 100 115 LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) { 116 // search for a descriptor that is already up 101 117 BOOST_FOREACH( LinkDescriptor* lp, links ) 102 118 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up) 103 119 return lp; 120 // if not found, search for one that is about to come up... 104 121 BOOST_FOREACH( LinkDescriptor* lp, links ) 105 122 if (lp->autolink && lp->remoteNode == node && lp->service == service ) … … 108 125 } 109 126 110 /// returns a direct local link relay descriptor for the given remote node 111 LinkDescriptor* BaseOverlay::getRelayDescriptor( const NodeID& id ) { 112 127 /// stabilizes link information 128 void BaseOverlay::stabilizeLinks() { 129 // send keep-alive messages over established links 130 BOOST_FOREACH( LinkDescriptor* ld, links ) { 131 if (!ld->up) continue; 132 OverlayMsg msg( OverlayMsg::typeLinkAlive, 133 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode ); 134 msg.setRelayed(true); 135 if (ld->relayed) msg.setRouteRecord(true); 136 send_link( &msg, ld->overlayId ); 137 } 138 139 <<<<<<< .working 113 140 // get used next hop towards node 114 141 LinkDescriptor* rld = NULL; … … 118 145 // get descriptor of first hop 119 146 rld = getDescriptor(rlid); 120 147 ======= 148 // iterate over all links and check for time boundaries 149 vector<LinkDescriptor*> oldlinks; 150 time_t now = time(NULL); 151 BOOST_FOREACH( LinkDescriptor* ld, links ) { 152 // remote used as relay flag 153 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10) 154 ld->relaying = false; 155 >>>>>>> .merge-rechts.r5869 156 157 <<<<<<< .working 121 158 // is first hop a relay path use local relay 122 159 if ( rld->relay ) relayNode = rld->localRelay; … … 125 162 else relayNode = rld->remoteNode; 126 163 } 127 128 // if first relay is unknown choose a arbitrary direct node as relay 129 if ( relayNode.isUnspecified() ) { 130 for (size_t i=0; i<links.size(); i++) 131 if (links[i]->up && 132 links[i]->communicationUp && 133 !links[i]->relay && 134 links[i]->keepAliveMissed <= 1 && 135 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) { 136 relayNode = links[i]->remoteNode; 137 break; 164 ======= 165 // keep alives and not up? yes-> link connection request is stale! 166 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) { 167 >>>>>>> .merge-rechts.r5869 168 169 // increase counter 170 ld->keepAliveMissed++; 171 172 // missed more than four keep-alive messages (10 sec)? -> drop link 173 if (ld->keepAliveMissed > 4) { 174 logging_info( "Link connection request is stale, closing: " << ld ); 175 ld->relaying = false; 176 oldlinks.push_back( ld ); 177 continue; 138 178 } 139 }140 141 // no local relay found-> damn!142 if (relayNode.isUnspecified()) return NULL; 143 179 } 180 181 if (!ld->up) continue; 182 183 <<<<<<< .working 144 184 // get descriptor 145 185 BOOST_FOREACH( LinkDescriptor* lp, links ) … … 149 189 lp->up) 150 190 return lp; 151 191 ======= 192 // drop links that are dropped and not used as relay 193 if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) { 194 oldlinks.push_back( ld ); 195 continue; 196 } 197 198 // auto-link time exceeded? 199 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) { 200 oldlinks.push_back( ld ); 201 continue; 202 >>>>>>> .merge-rechts.r5869 203 204 // keep alives missed? yes-> 205 if ( difftime( now, ld->keepAliveTime ) > 2 ) { 206 207 // increase counter 208 ld->keepAliveMissed++; 209 210 // missed more than four keep-alive messages (4 sec)? -> drop link 211 if (ld->keepAliveMissed >= 4) { 212 logging_info( "Link is stale, closing: " << ld ); 213 ld->relaying = false; 214 oldlinks.push_back( ld ); 215 continue; 216 } 217 } 218 } 219 220 // drop links 221 BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) { 222 /* 223 vector<LinkID>::iterator it = std::find( 224 bootstrapLinks.begin(), bootstrapLinks.end(), ld->communicationId); 225 226 if (!ld->communicationId.isUnspecified() && it != bootstrapLinks.end() ){ 227 logging_info( "Not dropping initiator link: " << ld ); 228 continue; 229 } 230 */ 231 logging_info( "Link timed out. Dropping " << ld ); 232 dropLink( ld->overlayId ); 233 } 234 235 // show link state 236 counter++; 237 if (counter>=4) showLinks(); 238 if (counter>=4 || counter<0) counter = 0; 239 } 240 241 void BaseOverlay::showLinks() { 242 int i=0; 243 logging_info("--- link state -------------------------------"); 244 BOOST_FOREACH( LinkDescriptor* ld, links ) { 245 logging_info("link " << i << ": " << ld); 246 i++; 247 } 248 logging_info("----------------------------------------------"); 249 } 250 251 // internal message delivery --------------------------------------------------- 252 253 /// routes a message to its destination node 254 void BaseOverlay::route( OverlayMsg* message, LinkDescriptor* incomingLink ) { 255 256 // exceeded time-to-live? yes-> drop message 257 if (message->getNumHops() > message->getTimeToLive()) { 258 logging_warn("Message exceeded TTL -> drop message!"); 259 return; 260 261 // no-> forward message 262 } else { 263 // destinastion myself? yes-> handle message 264 if (message->getDestinationNode() == nodeId) 265 handleMessage( message, incomingLink ); 266 else 267 // no->send message to next hop 268 send( message, message->getDestinationNode() ); 269 } 270 } 271 272 /// sends a message to another node, delivers it to the base overlay class 273 seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) { 274 LinkDescriptor* next_link = NULL; 275 276 // drop messages to unspecified destinations 277 if (destination.isUnspecified()) return -1; 278 279 // send messages to myself -> handle message and drop warning! 280 if (destination == nodeId) { 281 logging_warn("Sent message to myself. Handling message.") 282 Message msg; 283 msg.encapsulate(message); 284 handleMessage( &msg, NULL ); 285 return -1; 286 } 287 288 // relay path known? yes-> send over relay link 289 next_link = getRelayLinkTo( destination ); 290 if (next_link != NULL) { 291 next_link->setRelaying(); 292 return send(message, next_link); 293 } 294 295 // no-> relay path! route over overlay path 296 LinkID next_id = overlayInterface->getNextLinkId( destination ); 297 if (next_id.isUnspecified()) { 298 logging_error("Could not send message. No next hop found to " << destination ); 299 return -1; 300 } 301 302 // get link descriptor, up and running? yes-> send message 303 next_link = getDescriptor(next_id); 304 if (next_link != NULL && next_link->up) 305 return send(message, next_link); 306 307 // no-> error, dropping message 308 else { 309 logging_error("Could not send message. Link not known or up"); 310 return -1; 311 } 312 313 // not reached-> fail 314 return -1; 315 } 316 317 /// send a message using a link descriptor, delivers it to the base overlay class 318 seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ld, bool ignore_down ) { 319 assert(ld!=NULL); 320 321 // check if up 322 if (!ld->up && !ignore_down) { 323 logging_error("Can not send message. Link not up:" << ld ); 324 return -1; 325 } 326 327 // handle relayed link 328 if (ld->relayed) { 329 logging_debug("send(): Resolving direct link for relayed link to " 330 << ld->remoteNode); 331 ld = getRelayLinkTo( ld->remoteNode ); 332 if (ld==NULL) { 333 logging_error("Direct link not found."); 334 return -1; 335 } 336 message->setRelayed(); 337 ld->setRelaying(); 338 } 339 340 // handle direct link 341 if (ld->communicationUp) { 342 logging_debug("send(): Sending message over direct link."); 343 return bc->sendMessage( ld->communicationId, message ); 344 } else { 345 logging_error("send(): Could not send mesage. " 346 "Not a relayed link and direct link is not up."); 347 return -1; 348 } 349 return -1; 350 } 351 352 seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote, 353 const ServiceID& service) { 354 message->setSourceNode(nodeId); 355 message->setService(service); 356 message->setDestinationNode(remote); 357 send( message, remote ); 358 } 359 360 seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) { 361 LinkDescriptor* ld = getDescriptor(link); 362 if (ld==NULL) { 363 logging_error("Cannot find descriptor to link id=" << link.toString()); 364 return -1; 365 } 366 message->setSourceNode(nodeId); 367 message->setSourceLink(ld->overlayId); 368 message->setService(ld->service); 369 message->setDestinationNode(ld->remoteNode); 370 message->setDestinationLink(ld->remoteLink); 371 return send( message, ld, ignore_down ); 372 } 373 374 // relay route management ------------------------------------------------------ 375 376 /// stabilize relay information 377 void BaseOverlay::stabilizeRelays() { 378 vector<relay_route>::iterator i = relay_routes.begin(); 379 while (i!=relay_routes.end() ) { 380 relay_route& route = *i; 381 LinkDescriptor* ld = getDescriptor(route.link); 382 if (ld==NULL 383 || !ld->up 384 || difftime(route.used, time(NULL)) > 4) { 385 logging_info("Forgetting relay information to node " 386 << route.node.toString() ); 387 i = relay_routes.erase(i); 388 } else 389 i++; 390 } 391 } 392 393 void BaseOverlay::removeRelayLink( const LinkID& link ) { 394 vector<relay_route>::iterator i = relay_routes.begin(); 395 while (i!=relay_routes.end() ) { 396 relay_route& route = *i; 397 if (route.link == link ) i = relay_routes.erase(i); else i++; 398 } 399 } 400 401 void BaseOverlay::removeRelayNode( const NodeID& remote ) { 402 vector<relay_route>::iterator i = relay_routes.begin(); 403 while (i!=relay_routes.end() ) { 404 relay_route& route = *i; 405 if (route.node == remote ) i = relay_routes.erase(i); else i++; 406 } 407 } 408 409 /// refreshes relay information 410 void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) { 411 412 // handle relayed messages from real links only 413 if (ld == NULL 414 || ld->relayed 415 || !message->isRelayed() 416 || message->getSourceNode()==nodeId ) return; 417 418 // check wheter this node is already part of the routing table 419 LinkID next_link = overlayInterface->getNextLinkId(message->getSourceNode()); 420 if (next_link == ld->overlayId) return; 421 ld->setRelaying(); 422 423 // try to find source node 424 BOOST_FOREACH( relay_route& route, relay_routes ) { 425 426 // relay route found? yes-> 427 if ( route.node == message->getSourceNode() ) { 428 429 // refresh timer 430 route.used = time(NULL); 431 432 // route has a shorter hop count? yes-> replace 433 if (route.hops > message->getNumHops() ) { 434 logging_info("Updating relay information to node " 435 << route.node.toString() 436 << " reducing to " << message->getNumHops() << " hops."); 437 route.hops = message->getNumHops(); 438 route.link = ld->overlayId; 439 } 440 return; 441 } 442 } 443 444 // not found-> add new entry 445 relay_route route; 446 route.hops = message->getNumHops(); 447 route.link = ld->overlayId; 448 route.node = message->getSourceNode(); 449 route.used = time(NULL); 450 logging_info("Remembering relay information to node " << route.node.toString()); 451 relay_routes.push_back(route); 452 } 453 454 /// returns a known "vital" relay link which is up and running 455 LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) { 456 // try to find source node 457 BOOST_FOREACH( relay_route& route, relay_routes ) { 458 if (route.node == remote ) { 459 LinkDescriptor* ld = getDescriptor( route.link ); 460 if (ld==NULL || !ld->up) return NULL; else return ld; 461 } 462 } 152 463 return NULL; 153 464 } 154 465 155 /// returns the link descriptor that is actually used for sending a message over the overöay 156 LinkDescriptor* BaseOverlay::getSendDescriptor( const NodeID& nodeid, bool follow ) { 157 for (size_t i=0; i<links.size(); i++) 158 if ( !links[i]->relay && 159 links[i]->up && 160 links[i]->communicationUp && 161 links[i]->keepAliveMissed <= 1 && 162 links[i]->remoteNode == nodeid && 163 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) { 164 return links[i]; 165 } 166 LinkDescriptor* ld = getDescriptor(overlayInterface->getNextLinkId(nodeid)); 167 if (ld != NULL && ld->relay && follow) 168 return getSendDescriptor(ld->localRelay, false); 169 return NULL; 170 } 171 172 NodeID BaseOverlay::getRelayNode( const NodeID& remoteNode ) { 173 LinkDescriptor* rld = getRelayDescriptor(remoteNode); 174 return rld!=NULL ? rld->remoteNode : NodeID::UNSPECIFIED; 175 } 176 177 /// routes a message over the overlay or directly sends it when a link is open 178 seqnum_t BaseOverlay::sendOverlay( Message* message, const NodeID& nodeid, const NodeID& remoteRelay ) { 179 /// send message directly to a neighbor 180 for (size_t i=0; i<links.size(); i++) 181 if ( !links[i]->relay && 182 links[i]->up && 183 links[i]->communicationUp && 184 links[i]->keepAliveMissed <= 1 && 185 links[i]->remoteNode == nodeid && 186 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) { 187 188 // mark as relay and send message 189 links[i]->markAsRelay(); 190 return sendMessage( message, links[i] ); 191 } 192 193 /// send relayed message over the overlay 194 if (!remoteRelay.isUnspecified()) { 195 // create a information relay message to inform the relay about 196 OverlayMsg overlay_msg( 197 OverlayMsg::typeRelay, OverlayInterface::OVERLAY_SERVICE_ID, nodeId); 198 RelayMessage relayMsg( RelayMessage::typeInform, remoteRelay, nodeid, LinkID::UNSPECIFIED ); 199 relayMsg.encapsulate( message ); 200 overlay_msg.encapsulate( &relayMsg ); 201 202 // get local relay link 203 LinkDescriptor* rld = getRelayDescriptor(nodeid); 204 205 // local relay available? send to local relay! 206 if (rld!=NULL) { 207 rld->markAsRelay(); 208 sendMessage(&overlay_msg, rld); 209 } else 210 overlayInterface->routeMessage(remoteRelay, &overlay_msg); 211 212 // finished 213 return 0; 214 } 215 216 // common case: send message over the overlay 217 overlayInterface->routeMessage(nodeid, message); 218 return 0; 219 } 220 221 /// forwards a message over relays/directly using link descriptor 222 seqnum_t BaseOverlay::sendMessage( Message* message, const LinkDescriptor* ld ) { 223 224 // directly send message 225 if ( !ld->communicationId.isUnspecified() && ld->communicationUp ) { 226 logging_debug("Send: Sending message via Base Communication"); 227 return bc->sendMessage( ld->communicationId, message ); 228 } 229 230 // relay message 231 else if ( ld->relay ) { 232 233 // sending a relayed message 234 logging_debug("Send: Relaying message to node " 235 << ld->remoteNode.toString() 236 << " using relay " << ld->localRelay 237 ); 238 239 // create a information relay message to inform the relay about 240 OverlayMsg overlay_msg( OverlayMsg::typeRelay, ld->service, nodeId ); 241 RelayMessage relayMsg( RelayMessage::typeInform, ld->remoteRelay, ld->remoteNode, ld->remoteLinkId ); 242 relayMsg.encapsulate( message ); 243 overlay_msg.encapsulate( &relayMsg ); 244 245 // route message to relay node in order to inform it! 246 logging_debug("sendMessage: Sending message over relayed link with" << ld ); 247 sendOverlay( &overlay_msg, ld->localRelay ); 248 return 0; 249 } 250 251 // error 252 else { 253 logging_error( "Could not send message descriptor=" << ld ); 254 return -1; 255 } 256 return -1; 257 } 258 259 /// creates a link descriptor, apply relay semantics if possible 260 LinkDescriptor* BaseOverlay::createLinkDescriptor( 261 const NodeID remoteNode, const ServiceID service, const LinkID link_id ) { 262 263 // find listener 264 if( !communicationListeners.contains( service ) ) { 265 logging_error( "No listener found for service " << service.toString() ); 266 return NULL; 267 } 268 CommunicationListener* listener = communicationListeners.get( service ); 269 assert( listener != NULL ); 270 271 // copy link id 272 LinkID linkid = link_id; 273 274 // create link id if necessary 275 if ( linkid.isUnspecified() ) 276 linkid = LinkID::create(); 277 278 // create relay link descriptor 279 NodeID relayNode = getRelayNode(remoteNode); 280 281 // add descriptor 282 LinkDescriptor* ld = addDescriptor( linkid ); 283 ld->overlayId = linkid; 284 ld->service = service; 285 ld->listener = listener; 286 ld->remoteNode = remoteNode; 287 288 // set relay node if available 289 ld->relay = !relayNode.isUnspecified(); 290 ld->localRelay = relayNode; 291 292 if (!ld->relay) 293 logging_error("No relay found!"); 294 295 // debug output 296 logging_debug( "Created link descriptor: " << ld ); 297 298 return ld; 299 } 300 301 302 // ---------------------------------------------------------------------------- 466 /* ***************************************************************************** 467 * PUBLIC MEMBERS 468 * ****************************************************************************/ 303 469 304 470 use_logging_cpp(BaseOverlay); … … 472 638 // ---------------------------------------------------------------------------- 473 639 474 const LinkID BaseOverlay::establishLink( 475 const EndpointDescriptor& ep, const NodeID& nodeid, 476 const ServiceID& service, const NodeID& remoteRelay, const LinkID& linkid ) { 477 478 LinkID link_id = linkid; 640 const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp, 641 const NodeID& remoteId, const ServiceID& service ) { 479 642 480 643 // establish link via overlay 481 if (!nodeid.isUnspecified()) 482 link_id = establishLink( nodeid, service, remoteRelay, link_id ); 644 if (!remoteId.isUnspecified()) 645 return establishLink( remoteId, service ); 646 else 483 647 484 648 // establish link directly if only ep is known 485 if (nodeid.isUnspecified()) 486 establishDirectLink( ep, service, link_id ); 487 488 return link_id; 649 if (remoteId.isUnspecified()) 650 return establishDirectLink(remoteEp, service ); 651 489 652 } 490 653 491 654 /// call base communication's establish link and add link mapping 492 655 const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep, 493 const ServiceID& service, const LinkID& linkid ) { 494 495 // create a new link id if necessary 496 LinkID link_id = linkid; 497 if (link_id.isUnspecified()) link_id = LinkID::create(); 656 const ServiceID& service ) { 498 657 499 658 /// find a service listener … … 505 664 assert( listener != NULL ); 506 665 507 /// establish link and add mapping508 logging_info("Establishing direct link " << link_id.toString()509 << " using " << ep.toString());510 511 666 // create descriptor 512 LinkDescriptor* ld = addDescriptor( link_id ); 513 ld->overlayId = link_id; 514 ld->communicationId = link_id; 667 LinkDescriptor* ld = addDescriptor(); 668 ld->relayed = false; 515 669 ld->listener = listener; 516 670 ld->service = service; 517 bc->establishLink( ep, link_id ); 518 519 return link_id; 671 ld->communicationId = bc->establishLink( ep ); 672 673 /// establish link and add mapping 674 logging_info("Establishing direct link " << ld->communicationId.toString() 675 << " using " << ep.toString()); 676 677 return ld->communicationId; 520 678 } 521 679 522 680 /// establishes a link between two arbitrary nodes 523 const LinkID BaseOverlay::establishLink( const NodeID& node,524 const ServiceID& service, const NodeID& remoteRelay, const LinkID& link_id) {681 const LinkID BaseOverlay::establishLink( const NodeID& remote, 682 const ServiceID& service ) { 525 683 526 684 // do not establish a link to myself! 527 if ( node == nodeId) return LinkID::UNSPECIFIED;685 if (remote == nodeId) return LinkID::UNSPECIFIED; 528 686 529 687 // create a link descriptor 530 LinkDescriptor* ld = createLinkDescriptor( node, service, link_id ); 531 ld->remoteRelay = remoteRelay; 532 533 // create link request message with own link id 534 uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL)); 535 LinkRequest link_request_msg( 536 nonce, &bc->getEndpointDescriptor(), false, 537 ld->overlayId, ld->localRelay ); 538 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId ); 539 overlay_msg.encapsulate( &link_request_msg ); 540 pendingLinks.insert( make_pair(nonce, ld->overlayId) ); 688 LinkDescriptor* ld = addDescriptor(); 689 ld->relayed = true; 690 ld->remoteNode = remote; 691 ld->service = service; 692 ld->listener = getListener(ld->service); 693 694 // create link request message 695 OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote ); 696 msg.setSourceLink(ld->overlayId); 697 msg.setRelayed(true); 541 698 542 699 // debug message 543 logging_ debug(700 logging_info( 544 701 "Sending link request with" 545 << " link id=" << ld->overlayId 546 << " node id=" << ld->remoteNode.toString() 547 << " service id=" << ld->service.toString() 548 << " local relay id=" << ld->localRelay.toString() 549 << " nonce= " << nonce 702 << " link=" << ld->overlayId.toString() 703 << " node=" << ld->remoteNode.toString() 704 << " serv=" << ld->service.toString() 550 705 ); 551 706 552 // sending message t hrough new link553 send Message( &overlay_msg, ld);707 // sending message to node 708 send_node( &msg, ld->remoteNode, ld->service ); 554 709 555 710 return ld->overlayId; … … 558 713 /// drops an established link 559 714 void BaseOverlay::dropLink(const LinkID& link) { 560 logging_ debug( "Dropping link (initiated locally):" << link.toString() );715 logging_info( "Dropping link (initiated locally):" << link.toString() ); 561 716 562 717 // find the link item to drop … … 579 734 580 735 // do not drop relay links 581 if (!ld-> usedAsRelay) {736 if (!ld->relaying) { 582 737 // drop the link in base communication 583 738 if (ld->communicationUp) bc->dropLink( ld->communicationId ); … … 586 741 eraseDescriptor( ld->overlayId ); 587 742 } else 588 ld->drop WhenRelaysLeft= true;743 ld->dropAfterRelaying = true; 589 744 } 590 745 … … 605 760 // check if the link is up yet, if its an auto link queue message 606 761 if( !ld->up ) { 607 ld-> markAsUsed();762 ld->setAutoUsed(); 608 763 if( ld->autolink ) { 609 764 logging_info("Auto-link " << link.toString() << " not up, queue message"); … … 618 773 619 774 // compile overlay message (has service and node id) 620 OverlayMsg overmsg( OverlayMsg::typeData , ld->service, nodeId);775 OverlayMsg overmsg( OverlayMsg::typeData ); 621 776 overmsg.encapsulate( const_cast<Message*>(message) ); 622 777 623 778 // send message over relay/direct/overlay 624 return send Message( &overmsg, ld );779 return send_link( &overmsg, ld->overlayId ); 625 780 } 626 781 … … 640 795 ); 641 796 642 // this will call onlinkup on us, if everything worked we now have a mapping643 LinkID link = LinkID::create();644 645 797 // call base overlay to create a link 646 link = establishLink( node, service, NodeID::UNSPECIFIED, link);798 LinkID link = establishLink( node, service ); 647 799 ld = getDescriptor( link ); 648 800 if( ld == NULL ) { … … 658 810 659 811 // mark the link as used, as we now send a message through it 660 ld-> markAsUsed();812 ld->setAutoUsed(); 661 813 662 814 // send / queue message … … 794 946 795 947 // send join request message 796 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, nodeId ); 948 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, 949 OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); 797 950 JoinRequest joinRequest( spovnetId, nodeId ); 798 951 overlayMsg.encapsulate( &joinRequest ); … … 804 957 if (ld == NULL) { 805 958 ld = addDescriptor( id ); 806 logging_ debug( "onLinkUp (remote request) descriptor: " << ld );959 logging_info( "onLinkUp (remote request) descriptor: " << ld ); 807 960 808 961 // update descriptor … … 810 963 ld->communicationId = id; 811 964 ld->communicationUp = true; 812 ld->markAsUsed(); 965 ld->setAutoUsed(); 966 ld->setAlive(); 813 967 814 968 // in this case, do not inform listener, since service it unknown … … 817 971 // link mapping found? -> send update message with node-id and service id 818 972 } else { 819 logging_debug( "onLinkUp descriptor (initiated locally):" << ld ); 820 821 // note: necessary to validate the link on the remote side! 822 logging_debug( "Sending out update" << 823 " for service " << ld->service.toString() << 824 " with local node id " << nodeId.toString() << 825 " on link " << ld->overlayId.toString() ); 973 logging_info( "onLinkUp descriptor (initiated locally):" << ld ); 826 974 827 975 // update descriptor 828 ld->markAsUsed(); 976 ld->setAutoUsed(); 977 ld->setAlive(); 829 978 ld->communicationUp = true; 830 831 // if link is a relayed link ->convert to direct link 832 if (ld->relay && !ld->remoteLinkId.isUnspecified() ) { 979 ld->fromRemote = false; 980 981 // if link is a relayed link->convert to direct link 982 if (ld->relayed) { 833 983 logging_info( "Converting to direct link: " << ld ); 834 984 ld->up = true; 835 ld->relay = false; 836 ld->localRelay = NodeID::UNSPECIFIED; 837 OverlayMsg overMsg( OverlayMsg::typeDirectLink, ld->service, nodeId ); 838 overMsg.setRelayLink( ld->remoteLinkId ); 839 bc->sendMessage( ld->communicationId, &overMsg ); 840 } 841 842 // compile and send update message 843 OverlayMsg overlayMsg( OverlayMsg::typeUpdate, ld->service, nodeId ); 844 overlayMsg.setAutoLink( ld->autolink ); 845 bc->sendMessage( ld->communicationId, &overlayMsg ); 985 ld->relayed = false; 986 OverlayMsg overMsg( OverlayMsg::typeLinkDirect ); 987 overMsg.setSourceLink( ld->overlayId ); 988 overMsg.setDestinationLink( ld->remoteLink ); 989 send_link( &overMsg, ld->overlayId ); 990 } else { 991 // note: necessary to validate the link on the remote side! 992 logging_info( "Sending out update" << 993 " for service " << ld->service.toString() << 994 " with local node id " << nodeId.toString() << 995 " on link " << ld->overlayId.toString() ); 996 997 // compile and send update message 998 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate ); 999 overlayMsg.setSourceLink(ld->overlayId); 1000 overlayMsg.setAutoLink( ld->autolink ); 1001 send_link( &overlayMsg, ld->overlayId, true ); 1002 } 846 1003 } 847 1004 } … … 859 1016 logging_info( "onLinkDown descriptor: " << ld ); 860 1017 1018 // removing relay link information 1019 removeRelayLink(ld->overlayId); 1020 861 1021 // inform listeners about link down 862 1022 ld->communicationUp = false; 863 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode ); 864 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1023 if (!ld->service.isUnspecified()) { 1024 getListener(ld->service)->onLinkDown( ld->overlayId, ld->remoteNode ); 1025 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1026 } 865 1027 866 1028 // delete all queued messages (auto links) … … 889 1051 890 1052 // autolinks: refresh timestamp 891 ld-> markAsUsed();1053 ld->setAutoUsed(); 892 1054 } 893 1055 … … 908 1070 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode ); 909 1071 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId ); 910 911 // autolinks: refresh timestamp912 ld->markAsUsed();913 1072 } 914 1073 … … 921 1080 if ( ld == NULL ) return; // not found? ->ignore! 922 1081 logging_debug( "Link quality changed id=" << ld->overlayId.toString() ); 923 924 // autolinks: refresh timestamp925 ld->markAsUsed();926 1082 } 927 1083 … … 937 1093 // get descriptor for link 938 1094 LinkDescriptor* ld = getDescriptor( link, true ); 939 940 // link known? 941 if (ld == NULL) { // no-> handle with unspecified params 942 logging_debug("Received message from base communication, link descriptor unknown" ); 943 return handleMessage( message, LinkID::UNSPECIFIED, link, NodeID::UNSPECIFIED ); 944 } else { // yes -> handle with overlay link id 945 logging_debug("Received message from base communication, link id=" << ld->overlayId.toString() ); 946 return handleMessage( message, ld->overlayId, link, NodeID::UNSPECIFIED ); 947 } 1095 return handleMessage( message, ld, link ); 948 1096 } 949 1097 950 1098 // ---------------------------------------------------------------------------- 951 1099 952 /// handles a message from an overlay 953 void BaseOverlay::incomingRouteMessage( Message* msg, const LinkID& link, const NodeID& source ) { 954 logging_debug("Received message from overlay -- " 955 << " link id=" << link.toString() 956 << " node id=" << source.toString() ); 957 handleMessage( msg, link, LinkID::UNSPECIFIED, source ); 958 } 959 960 // ---------------------------------------------------------------------------- 961 962 /// handles an incoming message 963 bool BaseOverlay::handleMessage( const Message* message, 964 const LinkID& boLink, const LinkID& bcLink, const NodeID& remoteNode ) { 965 logging_debug( "Handling message: " << message->toString()); 966 967 bool ret = false; 968 969 // decapsulate overlay message 970 OverlayMsg* overlayMsg = 971 const_cast<Message*>(message)->decapsulate<OverlayMsg>(); 972 if( overlayMsg == NULL ) return false; 973 974 // mark the link as in action 975 LinkDescriptor* ld = getDescriptor(boLink); 976 if (ld == NULL) ld = getDescriptor(bcLink, true); 977 if (ld != NULL) { 978 ld->markAsUsed(); 979 ld->markAlive(); 980 } 981 982 switch ( overlayMsg->getType() ) { 983 // --------------------------------------------------------------------- 984 // Handle spovnet instance join requests 985 // --------------------------------------------------------------------- 986 case OverlayMsg::typeJoinRequest: { 987 988 // decapsulate message 989 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>(); 990 logging_info( "Received join request for spovnet " << 991 joinReq->getSpoVNetID().toString() ); 992 993 // check spovnet id 994 if( joinReq->getSpoVNetID() != spovnetId ) { 995 logging_error( 996 "Received join request for spovnet we don't handle " << 997 joinReq->getSpoVNetID().toString() ); 998 ret = false; 999 break; 1100 /// Handle spovnet instance join requests 1101 bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) { 1102 1103 // decapsulate message 1104 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>(); 1105 logging_info( "Received join request for spovnet " << 1106 joinReq->getSpoVNetID().toString() ); 1107 1108 // check spovnet id 1109 if( joinReq->getSpoVNetID() != spovnetId ) { 1110 logging_error( 1111 "Received join request for spovnet we don't handle " << 1112 joinReq->getSpoVNetID().toString() ); 1113 return false; 1114 } 1115 1116 // TODO: here you can implement mechanisms to deny joining of a node 1117 bool allow = true; 1118 logging_info( "Sending join reply for spovnet " << 1119 spovnetId.toString() << " to node " << 1120 overlayMsg->getSourceNode().toString() << 1121 ". Result: " << (allow ? "allowed" : "denied") ); 1122 joiningNodes.push_back( overlayMsg->getSourceNode() ); 1123 1124 // return overlay parameters 1125 assert( overlayInterface != NULL ); 1126 logging_debug( "Using bootstrap end-point " 1127 << getEndpointDescriptor().toString() ) 1128 OverlayParameterSet parameters = overlayInterface->getParameters(); 1129 OverlayMsg retmsg( OverlayMsg::typeJoinReply, 1130 OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); 1131 JoinReply replyMsg( spovnetId, parameters, 1132 allow, getEndpointDescriptor() ); 1133 retmsg.encapsulate(&replyMsg); 1134 bc->sendMessage( bcLink, &retmsg ); 1135 1136 return true; 1137 } 1138 1139 /// Handle replies to spovnet instance join requests 1140 bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) { 1141 // decapsulate message 1142 logging_debug("received join reply message"); 1143 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>(); 1144 1145 // correct spovnet? 1146 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail 1147 logging_error( "Received SpoVNet join reply for " << 1148 replyMsg->getSpoVNetID().toString() << 1149 " != " << spovnetId.toString() ); 1150 delete replyMsg; 1151 return false; 1152 } 1153 1154 // access granted? no -> fail 1155 if( !replyMsg->getJoinAllowed() ) { 1156 logging_error( "Our join request has been denied" ); 1157 1158 // drop initiator link 1159 if( !bcLink.isUnspecified() ){ 1160 bc->dropLink( bcLink ); 1161 1162 vector<LinkID>::iterator it = std::find( 1163 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink); 1164 if( it != bootstrapLinks.end() ) 1165 bootstrapLinks.erase(it); 1166 } 1167 1168 // inform all registered services of the event 1169 BOOST_FOREACH( NodeListener* i, nodeListeners ) 1170 i->onJoinFailed( spovnetId ); 1171 1172 delete replyMsg; 1173 return true; 1174 } 1175 1176 // access has been granted -> continue! 1177 logging_info("Join request has been accepted for spovnet " << 1178 spovnetId.toString() ); 1179 1180 logging_debug( "Using bootstrap end-point " 1181 << replyMsg->getBootstrapEndpoint().toString() ); 1182 1183 // create overlay structure from spovnet parameter set 1184 // if we have not boostrapped yet against some other node 1185 if( overlayInterface == NULL ){ 1186 1187 logging_debug("first-time bootstrapping"); 1188 1189 overlayInterface = OverlayFactory::create( 1190 *this, replyMsg->getParam(), nodeId, this ); 1191 1192 // overlay structure supported? no-> fail! 1193 if( overlayInterface == NULL ) { 1194 logging_error( "overlay structure not supported" ); 1195 1196 if( !bcLink.isUnspecified() ){ 1197 bc->dropLink( bcLink ); 1198 1199 vector<LinkID>::iterator it = std::find( 1200 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink); 1201 if( it != bootstrapLinks.end() ) 1202 bootstrapLinks.erase(it); 1000 1203 } 1001 1204 1002 // TODO: here you can implement mechanisms to deny joining of a node 1003 bool allow = true; 1004 logging_info( "Sending join reply for spovnet " << 1005 spovnetId.toString() << " to node " << 1006 overlayMsg->getSourceNode().toString() << 1007 ". Result: " << (allow ? "allowed" : "denied") ); 1008 joiningNodes.push_back( overlayMsg->getSourceNode() ); 1009 1010 // return overlay parameters 1011 assert( overlayInterface != NULL ); 1012 logging_debug( "Using bootstrap end-point " 1013 << getEndpointDescriptor().toString() ) 1014 OverlayParameterSet parameters = overlayInterface->getParameters(); 1015 OverlayMsg retmsg( OverlayMsg::typeJoinReply, nodeId ); 1016 JoinReply replyMsg( spovnetId, parameters, 1017 allow, getEndpointDescriptor() ); 1018 retmsg.encapsulate(&replyMsg); 1019 bc->sendMessage( bcLink, &retmsg ); 1020 ret = true; 1021 break; 1022 } 1023 1024 // --------------------------------------------------------------------- 1025 // handle replies to spovnet instance join requests 1026 // --------------------------------------------------------------------- 1027 case OverlayMsg::typeJoinReply: { 1028 1029 // decapsulate message 1030 logging_debug("received join reply message"); 1031 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>(); 1032 1033 // correct spovnet? 1034 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail 1035 logging_error( "Received SpoVNet join reply for " << 1036 replyMsg->getSpoVNetID().toString() << 1037 " != " << spovnetId.toString() ); 1038 ret = false; 1039 delete replyMsg; 1040 break; 1041 } 1042 1043 // access granted? no -> fail 1044 if( !replyMsg->getJoinAllowed() ) { 1045 logging_error( "Our join request has been denied" ); 1046 1047 // drop initiator link 1048 if(bcLink != LinkID::UNSPECIFIED){ 1049 bc->dropLink( bcLink ); 1050 1051 vector<LinkID>::iterator it = std::find( 1052 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink); 1053 if( it != bootstrapLinks.end() ) 1054 bootstrapLinks.erase(it); 1055 } 1056 1057 // inform all registered services of the event 1058 BOOST_FOREACH( NodeListener* i, nodeListeners ) 1059 i->onJoinFailed( spovnetId ); 1060 1061 ret = true; 1062 delete replyMsg; 1063 break; 1064 } 1065 1066 // access has been granted -> continue! 1067 logging_info("Join request has been accepted for spovnet " << 1068 spovnetId.toString() ); 1069 1070 logging_debug( "Using bootstrap end-point " 1071 << replyMsg->getBootstrapEndpoint().toString() ); 1072 1073 // 1074 // create overlay structure from spovnet parameter set 1075 // if we have not boostrapped yet against some other node 1076 // 1077 1078 if( overlayInterface == NULL ){ 1079 1080 logging_debug("first-time bootstrapping"); 1081 1082 overlayInterface = OverlayFactory::create( 1083 *this, replyMsg->getParam(), nodeId, this ); 1084 1085 // overlay structure supported? no-> fail! 1086 if( overlayInterface == NULL ) { 1087 logging_error( "overlay structure not supported" ); 1088 1089 if(bcLink != LinkID::UNSPECIFIED){ 1090 bc->dropLink( bcLink ); 1091 1092 vector<LinkID>::iterator it = std::find( 1093 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink); 1094 if( it != bootstrapLinks.end() ) 1095 bootstrapLinks.erase(it); 1096 } 1097 1205 // inform all registered services of the event 1206 BOOST_FOREACH( NodeListener* i, nodeListeners ) 1207 i->onJoinFailed( spovnetId ); 1208 1209 delete replyMsg; 1210 return true; 1211 } 1212 1213 // everything ok-> join the overlay! 1214 state = BaseOverlayStateCompleted; 1215 overlayInterface->createOverlay(); 1216 1217 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() ); 1218 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() ); 1219 1220 // update ovlvis 1221 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN); 1222 1223 // inform all registered services of the event 1224 BOOST_FOREACH( NodeListener* i, nodeListeners ) 1225 i->onJoinCompleted( spovnetId ); 1226 1227 delete replyMsg; 1228 1229 <<<<<<< .working 1098 1230 // inform all registered services of the event 1099 1231 BOOST_FOREACH( NodeListener* i, nodeListeners ) 1100 1232 i->onJoinFailed( spovnetId ); 1101 1102 delete replyMsg; 1103 ret = true; 1104 break; 1105 } 1106 1107 // everything ok-> join the overlay! 1108 state = BaseOverlayStateCompleted; 1109 overlayInterface->createOverlay(); 1110 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() ); 1111 1112 // update ovlvis 1113 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN); 1114 1115 // inform all registered services of the event 1116 BOOST_FOREACH( NodeListener* i, nodeListeners ) 1117 i->onJoinCompleted( spovnetId ); 1118 1119 } else { 1120 1121 // this is not the first bootstrap, just join the additional node 1122 logging_debug("not first-time bootstrapping"); 1123 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() ); 1124 1125 } // if( overlayInterface == NULL ) 1126 1233 ======= 1234 } else { 1235 >>>>>>> .merge-rechts.r5869 1236 1237 // this is not the first bootstrap, just join the additional node 1238 logging_debug("not first-time bootstrapping"); 1239 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() ); 1240 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() ); 1241 1242 delete replyMsg; 1243 } // if( overlayInterface == NULL ) 1244 1245 return true; 1246 } 1247 1248 1249 <<<<<<< .working 1250 ======= 1251 bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1252 // get service 1253 const ServiceID& service = overlayMsg->getService(); 1254 logging_debug( "Received data for service " << service.toString() 1255 << " on link " << overlayMsg->getDestinationLink().toString() ); 1256 1257 >>>>>>> .merge-rechts.r5869 1258 // delegate data message 1259 getListener(service)->onMessage( 1260 overlayMsg, 1261 overlayMsg->getSourceNode(), 1262 overlayMsg->getDestinationLink() 1263 ); 1264 1265 return true; 1266 } 1267 1268 <<<<<<< .working 1269 ======= 1270 1271 >>>>>>> .merge-rechts.r5869 1272 bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1273 1274 <<<<<<< .working 1127 1275 //record bootstrap ep as good endpoint to join 1128 1276 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() ); … … 1132 1280 break; 1133 1281 } 1134 1135 // --------------------------------------------------------------------- 1136 // handle data forward messages 1137 // --------------------------------------------------------------------- 1138 case OverlayMsg::typeData: { 1139 1140 // get service 1141 const ServiceID& service = overlayMsg->getService(); 1142 logging_debug( "received data for service " << service.toString() ); 1143 1144 // find listener 1145 CommunicationListener* listener = 1146 communicationListeners.get( service ); 1147 if( listener == NULL ) { 1148 ret = true; 1149 break; 1150 } 1151 1282 ======= 1283 if( ld == NULL ) { 1284 logging_warn( "received overlay update message for link for " 1285 << "which we have no mapping" ); 1286 return false; 1287 } 1288 logging_info("Received type update message on link " << ld ); 1289 >>>>>>> .merge-rechts.r5869 1290 1291 // update our link mapping information for this link 1292 bool changed = 1293 ( ld->remoteNode != overlayMsg->getSourceNode() ) 1294 || ( ld->service != overlayMsg->getService() ); 1295 1296 // set parameters 1297 ld->up = true; 1298 ld->remoteNode = overlayMsg->getSourceNode(); 1299 ld->remoteLink = overlayMsg->getSourceLink(); 1300 ld->service = overlayMsg->getService(); 1301 ld->autolink = overlayMsg->isAutoLink(); 1302 1303 // if our link information changed, we send out an update, too 1304 if( changed ) { 1305 overlayMsg->swapRoles(); 1306 overlayMsg->setSourceNode(nodeId); 1307 overlayMsg->setSourceLink(ld->overlayId); 1308 overlayMsg->setService(ld->service); 1309 send( overlayMsg, ld ); 1310 } 1311 1312 <<<<<<< .working 1152 1313 // delegate data message 1153 1314 listener->onMessage( overlayMsg, 1154 1315 overlayMsg->getSourceNode(), ld->overlayId ); 1155 1156 ret = true; 1157 break; 1158 } 1159 1160 // --------------------------------------------------------------------- 1161 // handle update messages for link establishment 1162 // --------------------------------------------------------------------- 1163 case OverlayMsg::typeUpdate: { 1164 // get info 1165 const NodeID& sourcenode = overlayMsg->getSourceNode(); 1166 const ServiceID& service = overlayMsg->getService(); 1167 1168 // no link descriptor available -> error! 1169 if( ld == NULL ) { 1170 logging_warn( "received overlay update message for link for " 1171 << "which we have no mapping" ); 1172 ret = false; 1173 break; 1174 } 1175 logging_debug("Received type update message on link " << ld ); 1176 1177 // update our link mapping information for this link 1178 bool changed = 1179 ( ld->remoteNode != sourcenode ) || ( ld->service != service ); 1180 ld->remoteNode = sourcenode; 1181 ld->service = service; 1182 ld->autolink = overlayMsg->isAutoLink(); 1183 1184 // if our link information changed, we send out an update, too 1185 if( changed ) { 1186 OverlayMsg overMsg( OverlayMsg::typeUpdate, ld->service, nodeId ); 1187 overMsg.setAutoLink(ld->autolink); 1188 bc->sendMessage( ld->communicationId, &overMsg ); 1189 } 1190 1191 // service registered? no-> error! 1192 if( !communicationListeners.contains( service ) ) { 1193 logging_warn( "Link up: event listener has not been registered" ); 1194 ret = false; 1195 break; 1196 } 1197 1198 // default or no service registered? 1199 CommunicationListener* listener = communicationListeners.get( service ); 1200 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) { 1201 logging_warn("Link up: event listener is default or null!" ); 1202 ret = true; 1203 break; 1204 } 1205 1206 // update descriptor 1207 ld->listener = listener; 1208 ld->markAsUsed(); 1209 ld->markAlive(); 1210 1211 // ask the service whether it wants to accept this link 1212 if( !listener->onLinkRequest(sourcenode) ) { 1213 1214 logging_debug("Link id=" << ld->overlayId.toString() << 1215 " has been denied by service " << service.toString() << ", dropping link"); 1216 1217 // prevent onLinkDown calls to the service 1218 ld->listener = &CommunicationListener::DEFAULT; 1219 1220 // drop the link 1221 dropLink( ld->overlayId ); 1222 ret = true; 1223 break; 1224 } 1225 1226 // set link up 1227 ld->up = true; 1228 logging_debug( 1229 "Link " << ld->overlayId.toString() 1230 << " has been accepted by service " << service.toString() 1231 << " and is now up" 1232 ); 1233 1234 // auto links: link has been accepted -> send queued messages 1235 if( ld->messageQueue.size() > 0 ) { 1236 logging_info( "sending out queued messages on link " << 1237 ld->overlayId.toString() ); 1238 BOOST_FOREACH( Message* msg, ld->messageQueue ) { 1239 sendMessage( msg, ld->overlayId ); 1240 delete msg; 1241 } 1242 ld->messageQueue.clear(); 1243 } 1244 1245 // call the notification functions 1246 listener->onLinkUp( ld->overlayId, sourcenode ); 1247 sideport->onLinkUp( ld->overlayId, nodeId, sourcenode, this->spovnetId ); 1248 1249 ret = true; 1250 break; 1251 } 1252 1253 // --------------------------------------------------------------------- 1254 // handle link request forwarded through the overlay 1255 // --------------------------------------------------------------------- 1256 case OverlayMsg::typeLinkRequest: { 1257 1258 logging_debug( "received link request on link" ); 1259 1260 // decapsulate message 1261 LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>(); 1262 const ServiceID& service = overlayMsg->getService(); 1263 1264 // is request reply? 1265 if ( linkReq->isReply() ) { 1266 1267 // find link 1268 PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() ); 1269 if ( i == pendingLinks.end() ) { 1270 logging_error( "Nonce not found in link request" ); 1271 ret = true; 1272 break; 1273 } 1274 1275 // debug message 1276 logging_debug( "Link request reply received. Establishing link " 1277 << i->second << " to " << (linkReq->getEndpoint()->toString()) 1278 << " for service " << service.toString() 1279 << " with nonce " << linkReq->getNonce() 1280 << " using relay " << linkReq->getRelay().toString() 1281 << " and remote link id=" << linkReq->getRemoteLinkId() 1282 ); 1283 1284 // get descriptor 1285 LinkDescriptor* ldn = getDescriptor(i->second); 1286 if (ldn==NULL) { 1287 delete linkReq; 1288 ret = true; 1289 break; 1290 } 1291 1292 // check if link request reply has a relay node ... 1293 if (!linkReq->getRelay().isUnspecified()) { // yes-> 1294 ldn->up = true; 1295 ldn->relay = true; 1296 if (ldn->localRelay.isUnspecified()) { 1297 logging_error("On LinkRequest reply: local relay is unspecifed on link " << ldn ); 1298 showLinkState(); 1299 } 1300 ldn->remoteRelay = linkReq->getRelay(); 1301 ldn->remoteLinkId = linkReq->getRemoteLinkId(); 1302 ldn->remoteNode = overlayMsg->getSourceNode(); 1303 1304 ldn->markAlive(); 1305 1306 // compile and send update message 1307 OverlayMsg _overlayMsg( OverlayMsg::typeUpdate, ldn->service, nodeId ); 1308 _overlayMsg.setAutoLink(ldn->autolink); 1309 sendMessage( &_overlayMsg, ldn ); 1310 1311 // auto links: link has been accepted -> send queued messages 1312 if( ldn->messageQueue.size() > 0 ) { 1313 logging_info( "Sending out queued messages on link " << 1314 ldn->overlayId.toString() ); 1315 BOOST_FOREACH( Message* msg, ldn->messageQueue ) { 1316 sendMessage( msg, ldn->overlayId ); 1317 delete msg; 1318 } 1319 ldn->messageQueue.clear(); 1320 } 1321 1322 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode ); 1323 1324 // try to establish a direct link 1325 ldn->communicationId = 1326 bc->establishLink( *linkReq->getEndpoint(), i->second ); 1327 } 1328 1329 // no relay node-> use overlay routing 1330 else { 1331 ldn->up = true; 1332 1333 // establish direct link 1334 ldn->communicationId = 1335 bc->establishLink( *linkReq->getEndpoint(), i->second ); 1336 } 1337 } else { 1338 logging_debug( "Link request received from node id=" 1339 << overlayMsg->getSourceNode() ); 1340 1341 // create link descriptor 1342 LinkDescriptor* ldn = 1343 createLinkDescriptor(overlayMsg->getSourceNode(), 1344 overlayMsg->getService(), LinkID::UNSPECIFIED ); 1345 assert(!ldn->overlayId.isUnspecified()); 1346 1347 // create reply message 1348 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId ); 1349 LinkRequest link_request_msg( 1350 linkReq->getNonce(), 1351 &bc->getEndpointDescriptor(), 1352 true, ldn->overlayId, ldn->localRelay 1353 ); 1354 overlay_msg.encapsulate( &link_request_msg ); 1355 1356 // debug message 1357 logging_debug( "Sending LinkRequest reply for link with nonce " << 1358 linkReq->getNonce() ); 1359 1360 // if this is a relay link-> update information & inform listeners 1361 if (!linkReq->getRelay().isUnspecified()) { 1362 // set flags 1363 ldn->up = true; 1364 ldn->relay = true; 1365 if (ldn->localRelay.isUnspecified()) { 1366 logging_error("On LinkRequest request: local relay is unspecifed on link " << ldn ); 1367 showLinkState(); 1368 } 1369 ldn->remoteRelay = linkReq->getRelay(); 1370 ldn->remoteNode = overlayMsg->getSourceNode(); 1371 ldn->remoteLinkId = linkReq->getRemoteLinkId(); 1372 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode ); 1373 } 1374 1375 // route message back over overlay 1376 sendMessage( &overlay_msg, ldn ); 1377 } 1378 delete linkReq; 1379 ret = true; 1380 break; 1381 } 1382 1383 // --------------------------------------------------------------------- 1384 // handle relay message to forward messages 1385 // --------------------------------------------------------------------- 1386 case OverlayMsg::typeRelay: { 1387 1388 logging_debug( "received relay request on link" ); 1389 1390 // decapsulate message 1391 RelayMessage* relayMsg = overlayMsg->decapsulate<RelayMessage>(); 1392 1393 // is relay message informative? 1394 switch (relayMsg->getType()) { 1395 1396 // handle relay notification 1397 case RelayMessage::typeInform: { 1398 logging_info("Received relay information message with" 1399 << " relay " << relayMsg->getRelayNode() 1400 << " destination " << relayMsg->getDestNode() ); 1401 1316 ======= 1317 // service registered? no-> error! 1318 if( !communicationListeners.contains( ld->service ) ) { 1319 logging_warn( "Link up: event listener has not been registered" ); 1320 return false; 1321 } 1322 >>>>>>> .merge-rechts.r5869 1323 1324 // default or no service registered? 1325 CommunicationListener* listener = communicationListeners.get( ld->service ); 1326 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) { 1327 logging_warn("Link up: event listener is default or null!" ); 1328 return true; 1329 } 1330 1331 // update descriptor 1332 ld->listener = listener; 1333 ld->setAutoUsed(); 1334 ld->setAlive(); 1335 1336 // ask the service whether it wants to accept this link 1337 if( !listener->onLinkRequest(ld->remoteNode) ) { 1338 1339 logging_debug("Link id=" << ld->overlayId.toString() << 1340 " has been denied by service " << ld->service.toString() << ", dropping link"); 1341 1342 // prevent onLinkDown calls to the service 1343 ld->listener = &CommunicationListener::DEFAULT; 1344 1345 // drop the link 1346 dropLink( ld->overlayId ); 1347 return true; 1348 } 1349 1350 // set link up 1351 ld->up = true; 1352 logging_info( "Link has been accepted by service and is up: " << ld ); 1353 1354 // auto links: link has been accepted -> send queued messages 1355 if( ld->messageQueue.size() > 0 ) { 1356 logging_info( "Sending out queued messages on link " << ld ); 1357 BOOST_FOREACH( Message* msg, ld->messageQueue ) { 1358 sendMessage( msg, ld->overlayId ); 1359 delete msg; 1360 } 1361 ld->messageQueue.clear(); 1362 } 1363 1364 // call the notification functions 1365 listener->onLinkUp( ld->overlayId, ld->remoteNode ); 1366 sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId ); 1367 1368 return true; 1369 } 1370 1371 /// handle a link request and reply 1372 bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1373 logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() ); 1374 1375 //TODO: Check if a request has already been sent using getSourceLink() ... 1376 1377 // create link descriptor 1378 LinkDescriptor* ldn = addDescriptor(); 1379 1380 // flags 1381 ldn->up = true; 1382 ldn->fromRemote = true; 1383 ldn->relayed = true; 1384 1385 // parameters 1386 ldn->service = overlayMsg->getService(); 1387 ldn->listener = getListener(ldn->service); 1388 ldn->remoteNode = overlayMsg->getSourceNode(); 1389 ldn->remoteLink = overlayMsg->getSourceLink(); 1390 1391 // update time-stamps 1392 ldn->setAlive(); 1393 ldn->setAutoUsed(); 1394 1395 // create reply message and send back! 1396 overlayMsg->swapRoles(); // swap source/destination 1397 overlayMsg->setType(OverlayMsg::typeLinkReply); 1398 overlayMsg->setSourceLink(ldn->overlayId); 1399 overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() ); 1400 overlayMsg->setRelayed(true); 1401 send( overlayMsg, ld ); // send back to link 1402 1403 // inform listener 1404 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode ); 1405 1406 return true; 1407 } 1408 1409 bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1410 1411 // find link request 1412 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink()); 1413 1414 // not found? yes-> drop with error! 1415 if (ldn == NULL) { 1416 logging_error( "No link request pending for " 1417 << overlayMsg->getDestinationLink().toString() ); 1418 return false; 1419 } 1420 logging_debug("Handling link reply for " << ldn ) 1421 1422 // check if already up 1423 if (ldn->up) { 1424 logging_warn( "Link already up: " << ldn ); 1425 return true; 1426 } 1427 1428 // debug message 1429 logging_debug( "Link request reply received. Establishing link" 1430 << " for service " << overlayMsg->getService().toString() 1431 << " with local id=" << overlayMsg->getDestinationLink() 1432 << " and remote link id=" << overlayMsg->getSourceLink() 1433 << " to " << overlayMsg->getSourceEndpoint().toString() 1434 ); 1435 1436 // set local link descriptor data 1437 ldn->up = true; 1438 ldn->relayed = true; 1439 ldn->service = overlayMsg->getService(); 1440 ldn->listener = getListener(ldn->service); 1441 ldn->remoteLink = overlayMsg->getSourceLink(); 1442 ldn->remoteNode = overlayMsg->getSourceNode(); 1443 1444 // update timestamps 1445 ldn->setAlive(); 1446 ldn->setAutoUsed(); 1447 1448 // auto links: link has been accepted -> send queued messages 1449 if( ldn->messageQueue.size() > 0 ) { 1450 logging_info( "Sending out queued messages on link " << 1451 ldn->overlayId.toString() ); 1452 BOOST_FOREACH( Message* msg, ldn->messageQueue ) { 1453 sendMessage( msg, ldn->overlayId ); 1454 delete msg; 1455 } 1456 ldn->messageQueue.clear(); 1457 } 1458 1459 // inform listeners about new link 1460 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode ); 1461 1462 // try to replace relay link with direct link 1463 ldn->communicationId = 1464 bc->establishLink( overlayMsg->getSourceEndpoint() ); 1465 1466 return true; 1467 } 1468 1469 <<<<<<< .working 1470 ======= 1471 /// handle a keep-alive message for a link 1472 bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1473 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink()); 1474 if ( rld != NULL ) { 1475 logging_debug("Keep-Alive for " << 1476 overlayMsg->getDestinationLink() ); 1477 if (overlayMsg->isRouteRecord()) 1478 rld->routeRecord = overlayMsg->getRouteRecord(); 1479 rld->setAlive(); 1480 return true; 1481 } else { 1482 logging_error("Keep-Alive for " 1483 << overlayMsg->getDestinationLink() << ": link unknown." ); 1484 return false; 1485 } 1486 } 1487 1488 /// handle a direct link message 1489 bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1490 logging_debug( "Received direct link replacement request" ); 1491 1492 >>>>>>> .merge-rechts.r5869 1493 /// get destination overlay link 1494 LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() ); 1495 if (rld == NULL || ld == NULL) { 1496 logging_error("Direct link replacement: Link " 1497 << overlayMsg->getDestinationLink() << "not found error." ); 1498 return false; 1499 } 1500 logging_info( "Received direct link convert notification for " << rld ); 1501 1502 // update information 1503 rld->communicationId = ld->communicationId; 1504 rld->communicationUp = true; 1505 rld->relayed = false; 1506 1507 <<<<<<< .working 1402 1508 // mark incoming link as relay 1403 1509 if (ld!=NULL) ld->markAsRelay(); … … 1417 1523 _relayMsg.setType( RelayMessage::typeRoute ); 1418 1524 _overMsg.encapsulate( &_relayMsg ); 1419 1420 // forward message 1421 if (relayMsg->getRelayNode() == nodeId || relayMsg->getRelayNode().isUnspecified()) { 1422 logging_info("Routing relay message to " << relayMsg->getDestNode().toString() ); 1423 sendOverlay( &_overMsg, relayMsg->getDestNode() ); 1424 } else { 1425 logging_info("Routing relay message to " << relayMsg->getRelayNode().toString() ); 1426 sendOverlay( &_overMsg, relayMsg->getRelayNode() ); 1427 } 1428 ret = true; 1429 break; 1430 } 1431 1432 // handle relay routing 1433 case RelayMessage::typeRoute: { 1434 logging_info("Received relay route message with" 1435 << " relay " << relayMsg->getRelayNode() 1436 << " destination " << relayMsg->getDestNode() ); 1437 1525 ======= 1526 // mark used and alive! 1527 rld->setAlive(); 1528 rld->setAutoUsed(); 1529 >>>>>>> .merge-rechts.r5869 1530 1531 // erase the original descriptor 1532 eraseDescriptor(ld->overlayId); 1533 } 1534 1535 /// handles an incoming message 1536 bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld, 1537 const LinkID bcLink ) { 1538 logging_debug( "Handling message: " << message->toString()); 1539 1540 <<<<<<< .working 1438 1541 // mark incoming link as relay 1439 1542 if (ld!=NULL) ld->markAsRelay(); … … 1454 1557 RelayMessage _relayMsg( *relayMsg ); 1455 1558 _overMsg.encapsulate(&_relayMsg); 1456 1457 /// this must be handled by using relay link! 1458 sendOverlay(&_overMsg, relayMsg->getDestNode()); 1459 ret = true; 1460 break; 1461 } 1462 1463 // error: I'm not a relay or destination! 1464 logging_error("This node is neither relay nor destination. Dropping Message!"); 1465 ret = true; 1466 break; 1467 } 1468 default: { 1469 logging_error("RelayMessage Unknown!"); 1470 ret = true; 1471 break; 1472 } 1473 } 1474 delete relayMsg; 1475 break; 1476 } 1477 1478 // --------------------------------------------------------------------- 1479 // handle keep-alive messages 1480 // --------------------------------------------------------------------- 1481 case OverlayMsg::typeKeepAlive: { 1482 logging_debug( "received keep-alive on link" ); 1483 if ( ld != NULL ) { 1484 logging_info("Keep-Alive for "<< ld->overlayId); 1485 ld->markAlive(); 1486 } 1487 break; 1488 } 1489 1490 // --------------------------------------------------------------------- 1491 // handle direct link replacement messages 1492 // --------------------------------------------------------------------- 1493 case OverlayMsg::typeDirectLink: { 1494 1495 logging_debug( "Received direct link replacement request" ); 1496 1497 LinkDescriptor* rld = getDescriptor( overlayMsg->getRelayLink() ); 1498 if (rld == NULL || ld == NULL) { 1499 logging_error("Direct link replacement: Link " 1500 << overlayMsg->getRelayLink() << "not found error." ); 1501 break; 1502 } 1503 logging_info( "Received direct link convert notification for " << rld ); 1504 1505 // set communcation link id and set it up 1506 rld->communicationId = ld->communicationId; 1507 1508 // this is neccessary since this link was a relay link before! 1509 rld->communicationUp = true; 1510 1511 // this is not a relay link anymore! 1512 rld->relay = false; 1513 rld->localRelay = NodeID::UNSPECIFIED; 1514 rld->remoteRelay = NodeID::UNSPECIFIED; 1515 1516 // mark used and alive! 1517 rld->markAsUsed(); 1518 rld->markAlive(); 1519 1520 // erase the original descriptor 1521 eraseDescriptor(ld->overlayId); 1522 break; 1523 } 1524 1525 // --------------------------------------------------------------------- 1559 ======= 1560 // decapsulate overlay message 1561 OverlayMsg* overlayMsg = 1562 const_cast<Message*>(message)->decapsulate<OverlayMsg>(); 1563 if( overlayMsg == NULL ) return false; 1564 >>>>>>> .merge-rechts.r5869 1565 1566 // refresh relay information 1567 refreshRelayInformation( overlayMsg, ld ); 1568 1569 // increase number of hops 1570 overlayMsg->increaseNumHops(); 1571 1572 // update route record 1573 overlayMsg->addRouteRecord(nodeId); 1574 1575 // handle signaling messages (do not route!) 1576 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart && 1577 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) { 1578 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED); 1579 delete overlayMsg; 1580 return true; 1581 } 1582 1583 // message for reached destination? no-> route message 1584 if (!overlayMsg->getDestinationNode().isUnspecified() && 1585 overlayMsg->getDestinationNode() != nodeId ) { 1586 logging_debug("Routing message " 1587 << " from " << overlayMsg->getSourceNode() 1588 << " to " << overlayMsg->getDestinationNode() 1589 ); 1590 1591 route( overlayMsg, ld ); 1592 delete overlayMsg; 1593 return true; 1594 } 1595 1596 // handle base overlay message 1597 bool ret = false; // return value 1598 switch ( overlayMsg->getType() ) { 1599 1600 // data transport messages 1601 case OverlayMsg::typeData: 1602 ret = handleData(overlayMsg, ld); break; 1603 1604 // overlay setup messages 1605 case OverlayMsg::typeJoinRequest: 1606 ret = handleJoinRequest(overlayMsg, bcLink ); break; 1607 case OverlayMsg::typeJoinReply: 1608 ret = handleJoinReply(overlayMsg, bcLink ); break; 1609 1610 // link specific messages 1611 case OverlayMsg::typeLinkRequest: 1612 ret = handleLinkRequest(overlayMsg, ld ); break; 1613 case OverlayMsg::typeLinkReply: 1614 ret = handleLinkReply(overlayMsg, ld ); break; 1615 case OverlayMsg::typeLinkUpdate: 1616 ret = handleLinkUpdate(overlayMsg, ld ); break; 1617 case OverlayMsg::typeLinkAlive: 1618 ret = handleLinkAlive(overlayMsg, ld ); break; 1619 case OverlayMsg::typeLinkDirect: 1620 ret = handleLinkDirect(overlayMsg, ld ); break; 1621 1526 1622 // handle unknown message type 1527 // ---------------------------------------------------------------------1528 1623 default: { 1529 1624 logging_error( "received message in invalid state! don't know " << 1530 "what to do with this message of type " << 1531 overlayMsg->getType() ); 1625 "what to do with this message of type " << overlayMsg->getType() ); 1532 1626 ret = false; 1533 1627 break; 1534 1628 } 1535 } /* switch */ 1536 1629 } 1630 1631 // free overlay message and return value 1537 1632 delete overlayMsg; 1538 1633 return ret; … … 1554 1649 } 1555 1650 1651 /// return the overlay neighbors 1556 1652 vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const { 1557 1653 1558 1654 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep); 1559 1560 1655 // the known nodes _can_ also include our node, so we remove ourself 1561 1656 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId ); 1562 1657 if( i != nodes.end() ) nodes.erase( i ); 1563 1564 1658 return nodes; 1565 1659 } … … 1594 1688 1595 1689 void BaseOverlay::eventFunction() { 1690 <<<<<<< .working 1596 1691 1597 1692 // send keep-alive messages over established links … … 1673 1768 if (counter>=4) showLinkState(); 1674 1769 if (counter>=4 || counter<0) counter = 0; 1675 } 1676 1677 void BaseOverlay::showLinkState() { 1678 int i=0; 1679 logging_info("--- link state -------------------------------"); 1680 BOOST_FOREACH( LinkDescriptor* ld, links ) { 1681 logging_info("link " << i << ": " << ld); 1682 i++; 1683 } 1684 logging_info("----------------------------------------------"); 1770 ======= 1771 stabilizeRelays(); 1772 stabilizeLinks(); 1773 >>>>>>> .merge-rechts.r5869 1685 1774 } 1686 1775
Note:
See TracChangeset
for help on using the changeset viewer.