00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "BaseOverlay.h"
00040
00041 #include <sstream>
00042 #include <iostream>
00043 #include <string>
00044 #include <boost/foreach.hpp>
00045
00046 #include "ariba/NodeListener.h"
00047 #include "ariba/CommunicationListener.h"
00048 #include "ariba/SideportListener.h"
00049
00050 #include "ariba/overlay/LinkDescriptor.h"
00051
00052 #include "ariba/overlay/messages/OverlayMsg.h"
00053 #include "ariba/overlay/messages/DHTMessage.h"
00054 #include "ariba/overlay/messages/JoinRequest.h"
00055 #include "ariba/overlay/messages/JoinReply.h"
00056
00057 #include "ariba/utility/visual/OvlVis.h"
00058 #include "ariba/utility/visual/DddVis.h"
00059 #include "ariba/utility/visual/ServerVis.h"
00060
00061 namespace ariba {
00062 namespace overlay {
00063
00064 #define visual ariba::utility::DddVis::instance()
00065 #define visualIdOverlay ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY
00066 #define visualIdBase ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION
00067
00068 class ValueEntry {
00069 public:
00070 ValueEntry( const Data& value ) : ttl(0), last_update(time(NULL)),
00071 last_change(time(NULL)), value(value.clone()) {
00072 }
00073
00074 ValueEntry( const ValueEntry& value ) :
00075 ttl(value.ttl), last_update(value.last_update),
00076 last_change(value.last_change), value(value.value.clone()) {
00077
00078 }
00079
00080 ~ValueEntry() {
00081 value.release();
00082 }
00083
00084 void refresh() {
00085 last_update = time(NULL);
00086 }
00087
00088 void set_value( const Data& value ) {
00089 this->value.release();
00090 this->value = value.clone();
00091 this->last_change = time(NULL);
00092 this->last_update = time(NULL);
00093 }
00094
00095 Data get_value() const {
00096 return value;
00097 }
00098
00099 uint16_t get_ttl() const {
00100 return ttl;
00101 }
00102
00103 void set_ttl( uint16_t ttl ) {
00104 this->ttl = ttl;
00105 }
00106
00107 bool is_ttl_elapsed() const {
00108
00109 if (ttl==0) return false;
00110
00111
00112 return ( difftime( time(NULL), this->last_update ) >= ttl );
00113 }
00114
00115 private:
00116 uint16_t ttl;
00117 time_t last_update;
00118 time_t last_change;
00119 Data value;
00120 };
00121
00122 class DHTEntry {
00123 public:
00124 Data key;
00125 vector<ValueEntry> values;
00126
00127 vector<Data> get_values() {
00128 vector<Data> vect;
00129 BOOST_FOREACH( ValueEntry& e, values )
00130 vect.push_back( e.get_value() );
00131 return vect;
00132 }
00133
00134 void erase_expired_entries() {
00135 for (vector<ValueEntry>::iterator i = values.begin();
00136 i != values.end(); i++ )
00137 if (i->is_ttl_elapsed()) i = values.erase(i)-1;
00138 }
00139 };
00140
00141 class DHT {
00142 public:
00143 typedef vector<DHTEntry> Entries;
00144 typedef vector<ValueEntry> Values;
00145 Entries entries;
00146 static const bool verbose = false;
00147
00148 static bool equals( const Data& lhs, const Data& rhs ) {
00149 if (rhs.getLength()!=lhs.getLength()) return false;
00150 for (size_t i=0; i<lhs.getLength()/8; i++)
00151 if (lhs.getBuffer()[i] != rhs.getBuffer()[i]) return false;
00152 return true;
00153 }
00154
00155 void put( const Data& key, const Data& value, uint16_t ttl = 0 ) {
00156
00157
00158 for (size_t i=0; i<entries.size(); i++) {
00159 DHTEntry& entry = entries.at(i);
00160
00161
00162 if ( equals(entry.key, key) ) {
00163
00164
00165 for (size_t j=0; j<entry.values.size(); j++) {
00166
00167 if ( equals(entry.values[j].get_value(), value) ) {
00168 entry.values[j].refresh();
00169 if (verbose)
00170 std::cout << "DHT: Republished value. Refreshing value timestamp."
00171 << std::endl;
00172 return;
00173 }
00174 }
00175
00176
00177 if (verbose)
00178 std::cout << "DHT: Added value to "
00179 << " key=" << key << " with value=" << value << std::endl;
00180 entry.values.push_back( ValueEntry( value ) );
00181 entry.values.back().set_ttl(ttl);
00182 return;
00183 }
00184 }
00185
00186
00187 if (verbose)
00188 std::cout << "DHT: New key value pair "
00189 << " key=" << key << " with value=" << value << std::endl;
00190
00191
00192 entries.push_back( DHTEntry() );
00193 DHTEntry& entry = entries.back();
00194 entry.key = key.clone();
00195 entry.values.push_back( ValueEntry(value) );
00196 entry.values.back().set_ttl(ttl);
00197 }
00198
00199 vector<Data> get( const Data& key ) {
00200
00201 for (size_t i=0; i<entries.size(); i++) {
00202 DHTEntry& entry = entries.at(i);
00203 if ( equals(entry.key,key) )
00204 return entry.get_values();
00205 }
00206 return vector<Data>();
00207 }
00208
00209 bool remove( const Data& key ) {
00210
00211 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
00212 DHTEntry& entry = *i;
00213
00214
00215 if ( equals(entry.key, key) ) {
00216 i = entries.erase(i)-1;
00217 return true;
00218 }
00219 }
00220 return false;
00221 }
00222
00223 bool remove( const Data& key, const Data& value ) {
00224
00225 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
00226 DHTEntry& entry = *i;
00227
00228
00229 if ( equals(entry.key, key) ) {
00230 for (Values::iterator j = entry.values.begin();
00231 j != entry.values.end(); j++) {
00232
00233
00234 if (equals(j->get_value(), value)) {
00235 j = entry.values.erase(j)-1;
00236 return true;
00237 }
00238 }
00239 }
00240 }
00241 return false;
00242 }
00243
00244 void cleanup() {
00245
00246 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
00247 DHTEntry& entry = *i;
00248
00249 for (Values::iterator j = entry.values.begin();
00250 j != entry.values.end(); j++) {
00251
00252
00253 if (j->is_ttl_elapsed())
00254 j = entry.values.erase(j)-1;
00255 }
00256
00257 if (entry.values.size()==0) i = entries.erase(i)-1;
00258 }
00259 }
00260 };
00261
00262
00263
00264
00265
00266
00267
00268 CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
00269 if( !communicationListeners.contains( service ) ) {
00270 logging_info( "No listener found for service " << service.toString() );
00271 return NULL;
00272 }
00273 CommunicationListener* listener = communicationListeners.get( service );
00274 assert( listener != NULL );
00275 return listener;
00276 }
00277
00278
00279
00280 LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
00281 BOOST_FOREACH( LinkDescriptor* lp, links )
00282 if ((communication ? lp->communicationId : lp->overlayId) == link)
00283 return lp;
00284 return NULL;
00285 }
00286
00287 const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
00288 BOOST_FOREACH( const LinkDescriptor* lp, links )
00289 if ((communication ? lp->communicationId : lp->overlayId) == link)
00290 return lp;
00291 return NULL;
00292 }
00293
00295 void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
00296 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
00297 LinkDescriptor* ld = *i;
00298 if ((communication ? ld->communicationId : ld->overlayId) == link) {
00299 delete ld;
00300 links.erase(i);
00301 break;
00302 }
00303 }
00304 }
00305
00307 LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
00308 LinkDescriptor* desc = getDescriptor( link );
00309 if ( desc == NULL ) {
00310 desc = new LinkDescriptor();
00311 if (!link.isUnspecified()) desc->overlayId = link;
00312 links.push_back(desc);
00313 }
00314 return desc;
00315 }
00316
00318 LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
00319
00320 BOOST_FOREACH( LinkDescriptor* lp, links )
00321 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
00322 return lp;
00323
00324 BOOST_FOREACH( LinkDescriptor* lp, links )
00325 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
00326 return lp;
00327 return NULL;
00328 }
00329
00331 void BaseOverlay::stabilizeLinks() {
00332
00333 BOOST_FOREACH( LinkDescriptor* ld, links ) {
00334 if (!ld->up) continue;
00335 OverlayMsg msg( OverlayMsg::typeLinkAlive,
00336 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
00337 if (ld->relayed) msg.setRouteRecord(true);
00338 send_link( &msg, ld->overlayId );
00339 }
00340
00341
00342 vector<LinkDescriptor*> oldlinks;
00343 time_t now = time(NULL);
00344 BOOST_FOREACH( LinkDescriptor* ld, links ) {
00345
00346
00347 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
00348
00349
00350 ld->keepAliveMissed++;
00351
00352
00353 if (ld->keepAliveMissed > 4) {
00354 logging_info( "Link connection request is stale, closing: " << ld );
00355 oldlinks.push_back( ld );
00356 continue;
00357 }
00358 }
00359
00360 if (!ld->up) continue;
00361
00362
00363 if ( ld->relayed && !ld->communicationUp && ld->retryCounter > 0) {
00364 ld->retryCounter--;
00365 ld->communicationId = bc->establishLink( ld->endpoint );
00366 }
00367
00368
00369 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
00370 ld->relaying = false;
00371
00372
00373 if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
00374 oldlinks.push_back( ld );
00375 continue;
00376 }
00377
00378
00379 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
00380 oldlinks.push_back( ld );
00381 continue;
00382 }
00383
00384
00385 if ( difftime( now, ld->keepAliveTime ) > 4 ) {
00386
00387
00388 ld->keepAliveMissed++;
00389
00390
00391 if (ld->keepAliveMissed >= 2) {
00392 logging_info( "Link is stale, closing: " << ld );
00393 oldlinks.push_back( ld );
00394 continue;
00395 }
00396 }
00397 }
00398
00399
00400 BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
00401 logging_info( "Link timed out. Dropping " << ld );
00402 ld->relaying = false;
00403 dropLink( ld->overlayId );
00404 }
00405
00406
00407 counter++;
00408 if (counter>=4) showLinks();
00409 if (counter>=4 || counter<0) counter = 0;
00410 }
00411
00412
00413 std::string BaseOverlay::getLinkHTMLInfo() {
00414 std::ostringstream s;
00415 vector<NodeID> nodes;
00416 if (links.size()==0) {
00417 s << "<h2 style=\"color=#606060\">No links established!</h2>";
00418 } else {
00419 s << "<h2 style=\"color=#606060\">Links</h2>";
00420 s << "<table width=\"100%\" cellpadding=\"0\" border=\"0\" cellspacing=\"0\">";
00421 s << "<tr style=\"background-color=#ffe0e0\">";
00422 s << "<td><b>Link ID</b></td><td><b>Remote ID</b></td><td><b>Relay path</b></td>";
00423 s << "</tr>";
00424
00425 int i=0;
00426 BOOST_FOREACH( LinkDescriptor* ld, links ) {
00427 if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
00428 bool found = false;
00429 BOOST_FOREACH(NodeID& id, nodes)
00430 if (id == ld->remoteNode) found = true;
00431 if (found) continue;
00432 i++;
00433 nodes.push_back(ld->remoteNode);
00434 if ((i%1) == 1) s << "<tr style=\"background-color=#f0f0f0;\">";
00435 else s << "<tr>";
00436 s << "<td>" << ld->overlayId.toString().substr(0,4) << "..</td>";
00437 s << "<td>" << ld->remoteNode.toString().substr(0,4) << "..</td>";
00438 s << "<td>";
00439 if (ld->routeRecord.size()>1 && ld->relayed) {
00440 for (size_t i=1; i<ld->routeRecord.size(); i++)
00441 s << ld->routeRecord[ld->routeRecord.size()-i-1].toString().substr(0,4) << ".. ";
00442 } else {
00443 s << "Direct";
00444 }
00445 s << "</td>";
00446 s << "</tr>";
00447 }
00448 s << "</table>";
00449 }
00450 return s.str();
00451 }
00452
00454 void BaseOverlay::showLinks() {
00455 int i=0;
00456 logging_info("--- link state -------------------------------");
00457 BOOST_FOREACH( LinkDescriptor* ld, links ) {
00458 string epd = "";
00459 if (ld->isDirectVital())
00460 epd = getEndpointDescriptor(ld->remoteNode).toString();
00461
00462 logging_info("LINK_STATE: " << i << ": " << ld << " " << epd);
00463 i++;
00464 }
00465 logging_info("----------------------------------------------");
00466 }
00467
00469 int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
00470 LinkDescriptor* lhsld = getDescriptor(lhs);
00471 LinkDescriptor* rhsld = getDescriptor(rhs);
00472 if (lhsld==NULL || rhsld==NULL
00473 || !lhsld->up || !rhsld->up
00474 || lhsld->remoteNode != rhsld->remoteNode) return -1;
00475
00476 if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId) )
00477 return -1;
00478
00479 return 1;
00480 }
00481
00482
00483
00484
00486 void BaseOverlay::route( OverlayMsg* message ) {
00487
00488
00489 if (message->getNumHops() > message->getTimeToLive()) {
00490 logging_warn("Message exceeded TTL. Dropping message and relay routes"
00491 "for recovery.");
00492 removeRelayNode(message->getDestinationNode());
00493 return;
00494 }
00495
00496
00497 else {
00498
00499 if (message->getDestinationNode() == nodeId) {
00500 logging_warn("Usually I should not route messages to myself!");
00501 Message msg;
00502 msg.encapsulate(message);
00503 handleMessage( &msg, NULL );
00504 } else {
00505
00506 send( message, message->getDestinationNode() );
00507 }
00508 }
00509 }
00510
00512 seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
00513 LinkDescriptor* next_link = NULL;
00514
00515
00516 if (destination.isUnspecified()) return -1;
00517
00518
00519 if (destination == nodeId) {
00520 logging_warn("Sent message to myself. Handling message.")
00521 Message msg;
00522 msg.encapsulate(message);
00523 handleMessage( &msg, NULL );
00524 return -1;
00525 }
00526
00527
00528 if (message->isRelayed()) {
00529 next_link = getRelayLinkTo( destination );
00530 if (next_link != NULL) {
00531 next_link->setRelaying();
00532 return bc->sendMessage(next_link->communicationId, message);
00533 } else {
00534 logging_warn("Could not send message. No relay hop found to "
00535 << destination << " -- trying to route over overlay paths ...")
00536
00537
00538 }
00539 }
00540
00541
00542 LinkID next_id = overlayInterface->getNextLinkId( destination );
00543 if (next_id.isUnspecified()) {
00544 logging_warn("Could not send message. No next hop found to " <<
00545 destination );
00546 logging_error("ERROR: " << debugInformation() );
00547 return -1;
00548 }
00549
00550
00551 next_link = getDescriptor(next_id);
00552 if (next_link != NULL && next_link->up) {
00553
00554 return send(message, next_link);
00555 }
00556
00557
00558 else {
00559 logging_warn("Could not send message. Link not known or up");
00560 logging_error("ERROR: " << debugInformation() );
00561 return -1;
00562 }
00563
00564
00565 return -1;
00566 }
00567
00569 seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) {
00570
00571 if (ldr == NULL) {
00572 logging_error("Can not send message to " << message->getDestinationAddress());
00573 return -1;
00574 }
00575
00576
00577 if (!ldr->up && !ignore_down) {
00578 logging_error("Can not send message. Link not up:" << ldr );
00579 logging_error("DEBUG_INFO: " << debugInformation() );
00580 return -1;
00581 }
00582 LinkDescriptor* ld = NULL;
00583
00584
00585 if (ldr->relayed) {
00586 logging_debug("Resolving direct link for relayed link to "
00587 << ldr->remoteNode);
00588 ld = getRelayLinkTo( ldr->remoteNode );
00589 if (ld==NULL) {
00590 logging_error("No relay path found to link " << ldr );
00591 logging_error("DEBUG_INFO: " << debugInformation() );
00592 return -1;
00593 }
00594 ld->setRelaying();
00595 message->setRelayed(true);
00596 } else
00597 ld = ldr;
00598
00599
00600 if (ld->communicationUp) {
00601 logging_debug("send(): Sending message over direct link.");
00602 return bc->sendMessage( ld->communicationId, message );
00603 } else {
00604 logging_error("send(): Could not send message. "
00605 "Not a relayed link and direct link is not up.");
00606 return -1;
00607 }
00608 return -1;
00609 }
00610
00611 seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
00612 const ServiceID& service) {
00613 message->setSourceNode(nodeId);
00614 message->setDestinationNode(remote);
00615 message->setService(service);
00616 return send( message, remote );
00617 }
00618
00619 seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
00620 LinkDescriptor* ld = getDescriptor(link);
00621 if (ld==NULL) {
00622 logging_error("Cannot find descriptor to link id=" << link.toString());
00623 return -1;
00624 }
00625 message->setSourceNode(nodeId);
00626 message->setDestinationNode(ld->remoteNode);
00627
00628 message->setSourceLink(ld->overlayId);
00629 message->setDestinationLink(ld->remoteLink);
00630
00631 message->setService(ld->service);
00632 message->setRelayed(ld->relayed);
00633 return send( message, ld, ignore_down );
00634 }
00635
00636
00637
00639 void BaseOverlay::stabilizeRelays() {
00640 vector<relay_route>::iterator i = relay_routes.begin();
00641 while (i!=relay_routes.end() ) {
00642 relay_route& route = *i;
00643 LinkDescriptor* ld = getDescriptor(route.link);
00644
00645
00646 if (ld==NULL
00647 || !ld->isDirectVital()
00648 || difftime(route.used, time(NULL)) > 8) {
00649 logging_info("Forgetting relay information to node "
00650 << route.node.toString() );
00651 i = relay_routes.erase(i);
00652 } else
00653 i++;
00654 }
00655 }
00656
00657 void BaseOverlay::removeRelayLink( const LinkID& link ) {
00658 vector<relay_route>::iterator i = relay_routes.begin();
00659 while (i!=relay_routes.end() ) {
00660 relay_route& route = *i;
00661 if (route.link == link ) i = relay_routes.erase(i); else i++;
00662 }
00663 }
00664
00665 void BaseOverlay::removeRelayNode( const NodeID& remote ) {
00666 vector<relay_route>::iterator i = relay_routes.begin();
00667 while (i!=relay_routes.end() ) {
00668 relay_route& route = *i;
00669 if (route.node == remote ) i = relay_routes.erase(i); else i++;
00670 }
00671 }
00672
00674 void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
00675
00676
00677 if (ld == NULL
00678 || ld->relayed
00679 || message->getSourceNode()==nodeId ) return;
00680
00681
00682 if (message->isRelayed()) {
00683
00684 BOOST_FOREACH( relay_route& route, relay_routes ) {
00685
00686 if ( route.node == message->getDestinationNode() ) {
00687 ld->setRelaying();
00688 route.used = time(NULL);
00689 }
00690 }
00691
00692 }
00693
00694
00695 if (message->isRegisterRelay()) {
00696
00697 ld->setRelaying();
00698
00699
00700 BOOST_FOREACH( relay_route& route, relay_routes ) {
00701
00702
00703 if ( route.node == message->getSourceNode() ) {
00704
00705
00706 route.used = time(NULL);
00707 LinkDescriptor* rld = getDescriptor(route.link);
00708
00709
00710 if (route.hops > message->getNumHops()
00711 || rld == NULL
00712 || !rld->isDirectVital()) {
00713 logging_info("Updating relay information to node "
00714 << route.node.toString()
00715 << " reducing to " << message->getNumHops() << " hops.");
00716 route.hops = message->getNumHops();
00717 route.link = ld->overlayId;
00718 }
00719 return;
00720 }
00721 }
00722
00723
00724 relay_route route;
00725 route.hops = message->getNumHops();
00726 route.link = ld->overlayId;
00727 route.node = message->getSourceNode();
00728 route.used = time(NULL);
00729 logging_info("Remembering relay information to node "
00730 << route.node.toString());
00731 relay_routes.push_back(route);
00732 }
00733 }
00734
00736 LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
00737
00738 BOOST_FOREACH( relay_route& route, relay_routes ) {
00739 if (route.node == remote ) {
00740 LinkDescriptor* ld = getDescriptor( route.link );
00741 if (ld==NULL || !ld->isDirectVital()) return NULL; else {
00742 route.used = time(NULL);
00743 return ld;
00744 }
00745 }
00746 }
00747 return NULL;
00748 }
00749
00750
00751
00752
00753
00754 use_logging_cpp(BaseOverlay);
00755
00756
00757
00758 BaseOverlay::BaseOverlay() :
00759 started(false),state(BaseOverlayStateInvalid),
00760 bc(NULL),
00761 nodeId(NodeID::UNSPECIFIED), spovnetId(SpoVNetID::UNSPECIFIED),
00762 sideport(&SideportListener::DEFAULT), overlayInterface(NULL),
00763 counter(0) {
00764 initDHT();
00765 }
00766
00767 BaseOverlay::~BaseOverlay() {
00768 destroyDHT();
00769 }
00770
00771
00772
00773 void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
00774 logging_info("Starting...");
00775
00776
00777 bc = &_basecomm;
00778 nodeId = _nodeid;
00779
00780
00781 bc->registerMessageReceiver( this );
00782 bc->registerEventListener( this );
00783
00784
00785 Timer::setInterval( 1000 );
00786 Timer::start();
00787
00788 started = true;
00789 state = BaseOverlayStateInvalid;
00790 }
00791
00792 void BaseOverlay::stop() {
00793 logging_info("Stopping...");
00794
00795
00796 Timer::stop();
00797
00798
00799 if(overlayInterface != NULL) {
00800 delete overlayInterface;
00801 overlayInterface = NULL;
00802 }
00803
00804
00805 bc->unregisterMessageReceiver( this );
00806 bc->unregisterEventListener( this );
00807
00808 started = false;
00809 state = BaseOverlayStateInvalid;
00810 }
00811
00812 bool BaseOverlay::isStarted(){
00813 return started;
00814 }
00815
00816
00817
00818 void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
00819 const EndpointDescriptor& bootstrapEp) {
00820
00821 if(id != spovnetId){
00822 logging_error("attempt to join against invalid spovnet, call initiate first");
00823 return;
00824 }
00825
00826
00827 logging_info( "Starting to join spovnet " << id.toString() <<
00828 " with nodeid " << nodeId.toString());
00829
00830 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
00831
00832
00833
00834
00835 logging_info("joining spovnet locally");
00836
00837 overlayInterface->joinOverlay();
00838 state = BaseOverlayStateCompleted;
00839 BOOST_FOREACH( NodeListener* i, nodeListeners )
00840 i->onJoinCompleted( spovnetId );
00841
00842
00843
00844
00845 } else {
00846
00847
00848
00849
00850 logging_info("joining spovnet remotely against " << bootstrapEp.toString());
00851
00852 const LinkID& lnk = bc->establishLink( bootstrapEp );
00853 bootstrapLinks.push_back(lnk);
00854 logging_info("join process initiated for " << id.toString() << "...");
00855 }
00856 }
00857
00858
00859 void BaseOverlay::startBootstrapModules(vector<pair<BootstrapManager::BootstrapType,string> > modules){
00860 logging_debug("starting overlay bootstrap module");
00861 overlayBootstrap.start(this, spovnetId, nodeId, modules);
00862 overlayBootstrap.publish(bc->getEndpointDescriptor());
00863 }
00864
00865 void BaseOverlay::stopBootstrapModules(){
00866 logging_debug("stopping overlay bootstrap module");
00867 overlayBootstrap.stop();
00868 overlayBootstrap.revoke();
00869 }
00870
00871 void BaseOverlay::leaveSpoVNet() {
00872
00873 logging_info( "Leaving spovnet " << spovnetId );
00874 bool ret = ( state != this->BaseOverlayStateInvalid );
00875
00876 logging_debug( "Dropping all auto-links" );
00877
00878
00879 vector<LinkID> servicelinks;
00880 BOOST_FOREACH( LinkDescriptor* ld, links ) {
00881 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
00882 servicelinks.push_back( ld->overlayId );
00883 }
00884
00885
00886 BOOST_FOREACH( LinkID lnk, servicelinks )
00887 dropLink( lnk );
00888
00889
00890 logging_debug( "Leaving overlay" );
00891 if( overlayInterface != NULL )
00892 overlayInterface->leaveOverlay();
00893
00894
00895 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
00896 bc->dropLink( lnk );
00897
00898
00899 state = BaseOverlayStateInvalid;
00900
00901
00902 visual.visShutdown(visualIdOverlay, nodeId, "");
00903 visual.visShutdown(visualIdBase, nodeId, "");
00904
00905
00906 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
00907 if( ret ) i->onLeaveCompleted( spovnetId );
00908 else i->onLeaveFailed( spovnetId );
00909 }
00910 }
00911
00912 void BaseOverlay::createSpoVNet(const SpoVNetID& id,
00913 const OverlayParameterSet& param,
00914 const SecurityParameterSet& sec,
00915 const QoSParameterSet& qos) {
00916
00917
00918
00919 logging_info( "creating spovnet " + id.toString() <<
00920 " with nodeid " << nodeId.toString() );
00921
00922 spovnetId = id;
00923
00924 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
00925 if( overlayInterface == NULL ) {
00926 logging_fatal( "overlay structure not supported" );
00927 state = BaseOverlayStateInvalid;
00928
00929 BOOST_FOREACH( NodeListener* i, nodeListeners )
00930 i->onJoinFailed( spovnetId );
00931
00932 return;
00933 }
00934
00935 visual.visCreate(visualIdBase, nodeId, "", "");
00936 visual.visCreate(visualIdOverlay, nodeId, "", "");
00937 }
00938
00939
00940
00941 const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
00942 const NodeID& remoteId, const ServiceID& service ) {
00943
00944
00945 if (!remoteId.isUnspecified())
00946 return establishLink( remoteId, service );
00947 else
00948 return establishDirectLink(remoteEp, service );
00949 }
00950
00952 const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
00953 const ServiceID& service ) {
00954
00956 if( !communicationListeners.contains( service ) ) {
00957 logging_error( "No listener registered for service id=" << service.toString() );
00958 return LinkID::UNSPECIFIED;
00959 }
00960 CommunicationListener* listener = communicationListeners.get( service );
00961 assert( listener != NULL );
00962
00963
00964 LinkDescriptor* ld = addDescriptor();
00965 ld->relayed = false;
00966 ld->listener = listener;
00967 ld->service = service;
00968 ld->communicationId = bc->establishLink( ep );
00969
00971 logging_info("Establishing direct link " << ld->communicationId.toString()
00972 << " using " << ep.toString());
00973
00974 return ld->communicationId;
00975 }
00976
00978 const LinkID BaseOverlay::establishLink( const NodeID& remote,
00979 const ServiceID& service ) {
00980
00981
00982 if (remote == nodeId) return LinkID::UNSPECIFIED;
00983
00984
00985 LinkDescriptor* ld = addDescriptor();
00986 ld->relayed = true;
00987 ld->remoteNode = remote;
00988 ld->service = service;
00989 ld->listener = getListener(ld->service);
00990
00991
00992 OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
00993 msg.setSourceLink(ld->overlayId);
00994
00995
00996 msg.setRelayed(true);
00997 msg.setRegisterRelay(true);
00998
00999
01000 logging_info(
01001 "Sending link request with"
01002 << " link=" << ld->overlayId.toString()
01003 << " node=" << ld->remoteNode.toString()
01004 << " serv=" << ld->service.toString()
01005 );
01006
01007
01008 send_node( &msg, ld->remoteNode, ld->service );
01009
01010 return ld->overlayId;
01011 }
01012
01014 void BaseOverlay::dropLink(const LinkID& link) {
01015 logging_info( "Dropping link (initiated locally):" << link.toString() );
01016
01017
01018 LinkDescriptor* ld = getDescriptor(link);
01019 if( ld == NULL ) {
01020 logging_warn( "Can't drop link, link is unknown!");
01021 return;
01022 }
01023
01024
01025 if( ld->messageQueue.size() > 0 ) {
01026 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
01027 << ld->messageQueue.size() << " waiting messages" );
01028 ld->flushQueue();
01029 }
01030
01031
01032 if(ld->listener != NULL)
01033 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
01034 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
01035
01036
01037 if (!ld->relaying) {
01038
01039 if (ld->communicationUp) bc->dropLink( ld->communicationId );
01040
01041
01042 eraseDescriptor( ld->overlayId );
01043 } else {
01044 ld->dropAfterRelaying = true;
01045 }
01046 }
01047
01048
01049
01051 seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
01052 logging_debug( "Sending data message on link " << link.toString() );
01053
01054
01055 LinkDescriptor* ld = getDescriptor(link);
01056 if( ld == NULL ) {
01057 logging_error("Could not send message. "
01058 << "Link not found id=" << link.toString());
01059 return -1;
01060 }
01061
01062
01063 if( !ld->up ) {
01064 ld->setAutoUsed();
01065 if( ld->autolink ) {
01066 logging_info("Auto-link " << link.toString() << " not up, queue message");
01067 Data data = data_serialize( message );
01068 const_cast<Message*>(message)->dropPayload();
01069 ld->messageQueue.push_back( new Message(data) );
01070 } else {
01071 logging_error("Link " << link.toString() << " not up, drop message");
01072 }
01073 return -1;
01074 }
01075
01076
01077 OverlayMsg overmsg( OverlayMsg::typeData );
01078 overmsg.encapsulate( const_cast<Message*>(message) );
01079
01080
01081 return send_link( &overmsg, ld->overlayId );
01082 }
01083
01084 seqnum_t BaseOverlay::sendMessage(const Message* message,
01085 const NodeID& node, const ServiceID& service) {
01086
01087
01088 LinkDescriptor* ld = getAutoDescriptor( node, service );
01089
01090
01091 if( ld == NULL ) {
01092
01093
01094 logging_info( "No link to send message to node "
01095 << node.toString() << " found for service "
01096 << service.toString() << ". Creating auto link ..."
01097 );
01098
01099
01100 LinkID link = establishLink( node, service );
01101 ld = getDescriptor( link );
01102 if( ld == NULL ) {
01103 logging_error( "Failed to establish auto-link.");
01104 return -1;
01105 }
01106 ld->autolink = true;
01107
01108 logging_debug( "Auto-link establishment in progress to node "
01109 << node.toString() << " with link id=" << link.toString() );
01110 }
01111 assert(ld != NULL);
01112
01113
01114 ld->setAutoUsed();
01115
01116
01117 return sendMessage( message, ld->overlayId );
01118 }
01119
01120
01121
01122 const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
01123 const LinkID& link) const {
01124
01125
01126 if( link.isUnspecified() )
01127 return bc->getEndpointDescriptor();
01128
01129
01130 const LinkDescriptor* ld = getDescriptor(link);
01131 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
01132
01133
01134 return bc->getEndpointDescriptor( ld->communicationId );
01135 }
01136
01137 const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
01138 const NodeID& node) const {
01139
01140
01141 if( node == nodeId || node.isUnspecified() ) {
01142
01143 return bc->getEndpointDescriptor();
01144 }
01145
01146
01147 if( overlayInterface == NULL ) {
01148 logging_error( "Overlay interface not set, cannot resolve end-point." );
01149 return EndpointDescriptor::UNSPECIFIED();
01150 }
01151
01152
01153
01154
01155
01156
01157 BOOST_FOREACH(const LinkDescriptor* ld, links){
01158 if(ld->remoteNode != node) continue;
01159 if(!ld->communicationUp) continue;
01160 const EndpointDescriptor& ep =
01161 bc->getEndpointDescriptor(ld->communicationId);
01162 if(ep != EndpointDescriptor::UNSPECIFIED()) {
01163
01164 return ep;
01165 }
01166 }
01167
01168 logging_warn( "No EndpointDescriptor found for node " << node );
01169 logging_warn( const_cast<BaseOverlay*>(this)->debugInformation() );
01170
01171 return EndpointDescriptor::UNSPECIFIED();
01172 }
01173
01174
01175
01176 bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
01177 sideport = _sideport;
01178 _sideport->configure( this );
01179 return true;
01180 }
01181
01182 bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
01183 sideport = &SideportListener::DEFAULT;
01184 return true;
01185 }
01186
01187
01188
01189 bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
01190 logging_debug( "binding communication listener " << listener
01191 << " on serviceid " << sid.toString() );
01192
01193 if( communicationListeners.contains( sid ) ) {
01194 logging_error( "some listener already registered for service id "
01195 << sid.toString() );
01196 return false;
01197 }
01198
01199 communicationListeners.registerItem( listener, sid );
01200 return true;
01201 }
01202
01203
01204 bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
01205 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
01206
01207 if( !communicationListeners.contains( sid ) ) {
01208 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
01209 return false;
01210 }
01211
01212 if( communicationListeners.get(sid) != listener ) {
01213 logging_warn( "listener bound to service id " << sid.toString()
01214 << " is different than listener trying to unbind" );
01215 return false;
01216 }
01217
01218 communicationListeners.unregisterItem( sid );
01219 return true;
01220 }
01221
01222
01223
01224 bool BaseOverlay::bind(NodeListener* listener) {
01225 logging_debug( "Binding node listener " << listener );
01226
01227
01228 NodeListenerVector::iterator i =
01229 find( nodeListeners.begin(), nodeListeners.end(), listener );
01230 if( i != nodeListeners.end() ) {
01231 logging_warn("Node listener " << listener << " is already bound!" );
01232 return false;
01233 }
01234
01235
01236 nodeListeners.push_back( listener );
01237 return true;
01238 }
01239
01240 bool BaseOverlay::unbind(NodeListener* listener) {
01241 logging_debug( "Unbinding node listener " << listener );
01242
01243
01244 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
01245 if( i == nodeListeners.end() ) {
01246 logging_warn( "Node listener " << listener << " is not bound!" );
01247 return false;
01248 }
01249
01250
01251 nodeListeners.erase( i );
01252 return true;
01253 }
01254
01255
01256
01257 void BaseOverlay::onLinkUp(const LinkID& id,
01258 const address_v* local, const address_v* remote) {
01259 logging_debug( "Link up with base communication link id=" << id );
01260
01261
01262 LinkDescriptor* ld = getDescriptor(id, true);
01263
01264
01265 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
01266 logging_info(
01267 "Join has been initiated by me and the link is now up. " <<
01268 "Sending out join request for SpoVNet " << spovnetId.toString()
01269 );
01270
01271
01272 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
01273 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
01274 JoinRequest joinRequest( spovnetId, nodeId );
01275 overlayMsg.encapsulate( &joinRequest );
01276 bc->sendMessage( id, &overlayMsg );
01277 return;
01278 }
01279
01280
01281 if (ld == NULL) {
01282 ld = addDescriptor( id );
01283 logging_info( "onLinkUp (remote request) descriptor: " << ld );
01284
01285
01286 ld->fromRemote = true;
01287 ld->communicationId = id;
01288 ld->communicationUp = true;
01289 ld->setAutoUsed();
01290 ld->setAlive();
01291
01292
01293
01294
01295
01296 } else {
01297 logging_info( "onLinkUp descriptor (initiated locally):" << ld );
01298
01299
01300 ld->setAutoUsed();
01301 ld->setAlive();
01302 ld->communicationUp = true;
01303 ld->fromRemote = false;
01304
01305
01306 if (ld->relayed) {
01307 logging_info( "Converting to direct link: " << ld );
01308 ld->up = true;
01309 ld->relayed = false;
01310 OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
01311 overMsg.setSourceLink( ld->overlayId );
01312 overMsg.setDestinationLink( ld->remoteLink );
01313 send_link( &overMsg, ld->overlayId );
01314 } else {
01315
01316 logging_info( "Sending out update" <<
01317 " for service " << ld->service.toString() <<
01318 " with local node id " << nodeId.toString() <<
01319 " on link " << ld->overlayId.toString() );
01320
01321
01322 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
01323 overlayMsg.setSourceLink(ld->overlayId);
01324 overlayMsg.setAutoLink( ld->autolink );
01325 send_link( &overlayMsg, ld->overlayId, true );
01326 }
01327 }
01328 }
01329
01330 void BaseOverlay::onLinkDown(const LinkID& id,
01331 const address_v* local, const address_v* remote) {
01332
01333
01334 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
01335 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
01336
01337
01338 LinkDescriptor* ld = getDescriptor(id, true);
01339 if ( ld == NULL ) return;
01340 logging_info( "onLinkDown descriptor: " << ld );
01341
01342
01343 removeRelayLink(ld->overlayId);
01344
01345
01346 ld->communicationUp = false;
01347 if (!ld->service.isUnspecified()) {
01348 CommunicationListener* lst = getListener(ld->service);
01349 if(lst != NULL) lst->onLinkDown( ld->overlayId, ld->remoteNode );
01350 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
01351 }
01352
01353
01354 if( ld->messageQueue.size() > 0 ) {
01355 logging_warn( "Dropping link " << id.toString() << " that has "
01356 << ld->messageQueue.size() << " waiting messages" );
01357 ld->flushQueue();
01358 }
01359
01360
01361 eraseDescriptor(ld->overlayId);
01362 }
01363
01364 void BaseOverlay::onLinkChanged(const LinkID& id,
01365 const address_v* oldlocal, const address_v* newlocal,
01366 const address_v* oldremote, const address_v* newremote) {
01367
01368
01369 LinkDescriptor* ld = getDescriptor(id, true);
01370 if ( ld == NULL ) return;
01371 logging_debug( "onLinkChanged descriptor: " << ld );
01372
01373
01374 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
01375 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
01376
01377
01378 ld->setAutoUsed();
01379 }
01380
01381 void BaseOverlay::onLinkFail(const LinkID& id,
01382 const address_v* local, const address_v* remote) {
01383 logging_debug( "Link fail with base communication link id=" << id );
01384
01385
01386 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
01387 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
01388
01389
01390 LinkDescriptor* ld = getDescriptor(id, true);
01391 if ( ld == NULL ) return;
01392 logging_debug( "Link failed id=" << ld->overlayId.toString() );
01393
01394
01395 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
01396 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
01397 }
01398
01399 void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
01400 const address_v* remote, const QoSParameterSet& qos) {
01401 logging_debug( "Link quality changed with base communication link id=" << id );
01402
01403
01404 LinkDescriptor* ld = getDescriptor(id, true);
01405 if ( ld == NULL ) return;
01406 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
01407 }
01408
01409 bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
01410 const address_v* remote ) {
01411 logging_debug("Accepting link request from " << remote->to_string() );
01412 return true;
01413 }
01414
01416 bool BaseOverlay::receiveMessage(const Message* message,
01417 const LinkID& link, const NodeID& ) {
01418
01419 LinkDescriptor* ld = getDescriptor( link, true );
01420 return handleMessage( message, ld, link );
01421 }
01422
01423
01424
01426 bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
01427
01428
01429 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
01430 logging_info( "Received join request for spovnet " <<
01431 joinReq->getSpoVNetID().toString() );
01432
01433
01434 if( joinReq->getSpoVNetID() != spovnetId ) {
01435 logging_error(
01436 "Received join request for spovnet we don't handle " <<
01437 joinReq->getSpoVNetID().toString() );
01438 return false;
01439 }
01440
01441
01442 bool allow = true;
01443 logging_info( "Sending join reply for spovnet " <<
01444 spovnetId.toString() << " to node " <<
01445 overlayMsg->getSourceNode().toString() <<
01446 ". Result: " << (allow ? "allowed" : "denied") );
01447 joiningNodes.push_back( overlayMsg->getSourceNode() );
01448
01449
01450 assert( overlayInterface != NULL );
01451 logging_debug( "Using bootstrap end-point "
01452 << getEndpointDescriptor().toString() )
01453 OverlayParameterSet parameters = overlayInterface->getParameters();
01454 OverlayMsg retmsg( OverlayMsg::typeJoinReply,
01455 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
01456 JoinReply replyMsg( spovnetId, parameters,
01457 allow, getEndpointDescriptor() );
01458 retmsg.encapsulate(&replyMsg);
01459 bc->sendMessage( bcLink, &retmsg );
01460
01461 return true;
01462 }
01463
01465 bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
01466
01467 logging_debug("received join reply message");
01468 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
01469
01470
01471 if( replyMsg->getSpoVNetID() != spovnetId ) {
01472 logging_error( "Received SpoVNet join reply for " <<
01473 replyMsg->getSpoVNetID().toString() <<
01474 " != " << spovnetId.toString() );
01475 delete replyMsg;
01476 return false;
01477 }
01478
01479
01480 if( !replyMsg->getJoinAllowed() ) {
01481 logging_error( "Our join request has been denied" );
01482
01483
01484 if( !bcLink.isUnspecified() ){
01485 bc->dropLink( bcLink );
01486
01487 vector<LinkID>::iterator it = std::find(
01488 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
01489 if( it != bootstrapLinks.end() )
01490 bootstrapLinks.erase(it);
01491 }
01492
01493
01494 BOOST_FOREACH( NodeListener* i, nodeListeners )
01495 i->onJoinFailed( spovnetId );
01496
01497 delete replyMsg;
01498 return true;
01499 }
01500
01501
01502 logging_info("Join request has been accepted for spovnet " <<
01503 spovnetId.toString() );
01504
01505 logging_debug( "Using bootstrap end-point "
01506 << replyMsg->getBootstrapEndpoint().toString() );
01507
01508
01509
01510 if( overlayInterface == NULL ){
01511
01512 logging_debug("first-time bootstrapping");
01513
01514 overlayInterface = OverlayFactory::create(
01515 *this, replyMsg->getParam(), nodeId, this );
01516
01517
01518 if( overlayInterface == NULL ) {
01519 logging_error( "overlay structure not supported" );
01520
01521 if( !bcLink.isUnspecified() ){
01522 bc->dropLink( bcLink );
01523
01524 vector<LinkID>::iterator it = std::find(
01525 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
01526 if( it != bootstrapLinks.end() )
01527 bootstrapLinks.erase(it);
01528 }
01529
01530
01531 BOOST_FOREACH( NodeListener* i, nodeListeners )
01532 i->onJoinFailed( spovnetId );
01533
01534 delete replyMsg;
01535 return true;
01536 }
01537
01538
01539 state = BaseOverlayStateCompleted;
01540 overlayInterface->createOverlay();
01541
01542 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
01543 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
01544
01545
01546
01547
01548
01549 BOOST_FOREACH( NodeListener* i, nodeListeners )
01550 i->onJoinCompleted( spovnetId );
01551
01552 delete replyMsg;
01553
01554 } else {
01555
01556
01557 logging_debug("not first-time bootstrapping");
01558 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
01559 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
01560
01561 delete replyMsg;
01562
01563 }
01564
01565 return true;
01566 }
01567
01568
01569 bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
01570
01571 const ServiceID& service = overlayMsg->getService();
01572 logging_debug( "Received data for service " << service.toString()
01573 << " on link " << overlayMsg->getDestinationLink().toString() );
01574
01575
01576 CommunicationListener* lst = getListener(service);
01577 if(lst != NULL){
01578 lst->onMessage(
01579 overlayMsg,
01580 overlayMsg->getSourceNode(),
01581 overlayMsg->getDestinationLink()
01582 );
01583 }
01584
01585 return true;
01586 }
01587
01588
01589 bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
01590
01591 if( ld == NULL ) {
01592 logging_warn( "received overlay update message for link for "
01593 << "which we have no mapping" );
01594 return false;
01595 }
01596 logging_info("Received type update message on link " << ld );
01597
01598
01599 bool changed =
01600 ( ld->remoteNode != overlayMsg->getSourceNode() )
01601 || ( ld->service != overlayMsg->getService() );
01602
01603
01604 ld->up = true;
01605 ld->remoteNode = overlayMsg->getSourceNode();
01606 ld->remoteLink = overlayMsg->getSourceLink();
01607 ld->service = overlayMsg->getService();
01608 ld->autolink = overlayMsg->isAutoLink();
01609
01610
01611 if( changed ) {
01612 overlayMsg->swapRoles();
01613 overlayMsg->setSourceNode(nodeId);
01614 overlayMsg->setSourceLink(ld->overlayId);
01615 overlayMsg->setService(ld->service);
01616 send( overlayMsg, ld );
01617 }
01618
01619
01620 if( !communicationListeners.contains( ld->service ) ) {
01621 logging_warn( "Link up: event listener has not been registered" );
01622 return false;
01623 }
01624
01625
01626 CommunicationListener* listener = communicationListeners.get( ld->service );
01627 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
01628 logging_warn("Link up: event listener is default or null!" );
01629 return true;
01630 }
01631
01632
01633 ld->listener = listener;
01634 ld->setAutoUsed();
01635 ld->setAlive();
01636
01637
01638 if( !listener->onLinkRequest(ld->remoteNode) ) {
01639
01640 logging_debug("Link id=" << ld->overlayId.toString() <<
01641 " has been denied by service " << ld->service.toString() << ", dropping link");
01642
01643
01644 ld->listener = &CommunicationListener::DEFAULT;
01645
01646
01647 dropLink( ld->overlayId );
01648 return true;
01649 }
01650
01651
01652 ld->up = true;
01653 logging_info( "Link has been accepted by service and is up: " << ld );
01654
01655
01656 if( ld->messageQueue.size() > 0 ) {
01657 logging_info( "Sending out queued messages on link " << ld );
01658 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
01659 sendMessage( msg, ld->overlayId );
01660 delete msg;
01661 }
01662 ld->messageQueue.clear();
01663 }
01664
01665
01666 listener->onLinkUp( ld->overlayId, ld->remoteNode );
01667 sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
01668
01669 return true;
01670 }
01671
01673 bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
01674 logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
01675
01676
01677
01678
01679 LinkDescriptor* ldn = addDescriptor();
01680
01681
01682 ldn->up = true;
01683 ldn->fromRemote = true;
01684 ldn->relayed = true;
01685
01686
01687 ldn->service = overlayMsg->getService();
01688 ldn->listener = getListener(ldn->service);
01689 ldn->remoteNode = overlayMsg->getSourceNode();
01690 ldn->remoteLink = overlayMsg->getSourceLink();
01691
01692
01693 ldn->setAlive();
01694 ldn->setAutoUsed();
01695
01696
01697 overlayMsg->swapRoles();
01698 overlayMsg->setType(OverlayMsg::typeLinkReply);
01699 overlayMsg->setSourceLink(ldn->overlayId);
01700 overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
01701 overlayMsg->setRelayed(true);
01702 send( overlayMsg, ld );
01703
01704
01705 if(ldn != NULL && ldn->listener != NULL)
01706 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
01707
01708 return true;
01709 }
01710
01711 bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
01712
01713
01714 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
01715
01716
01717 if (ldn == NULL) {
01718 logging_error( "No link request pending for "
01719 << overlayMsg->getDestinationLink().toString() );
01720 return false;
01721 }
01722 logging_debug("Handling link reply for " << ldn )
01723
01724
01725 if (ldn->up) {
01726 logging_warn( "Link already up: " << ldn );
01727 return true;
01728 }
01729
01730
01731 logging_debug( "Link request reply received. Establishing link"
01732 << " for service " << overlayMsg->getService().toString()
01733 << " with local id=" << overlayMsg->getDestinationLink()
01734 << " and remote link id=" << overlayMsg->getSourceLink()
01735 << " to " << overlayMsg->getSourceEndpoint().toString()
01736 );
01737
01738
01739 ldn->up = true;
01740 ldn->relayed = true;
01741 ldn->service = overlayMsg->getService();
01742 ldn->listener = getListener(ldn->service);
01743 ldn->remoteLink = overlayMsg->getSourceLink();
01744 ldn->remoteNode = overlayMsg->getSourceNode();
01745
01746
01747 ldn->setAlive();
01748 ldn->setAutoUsed();
01749
01750
01751 if( ldn->messageQueue.size() > 0 ) {
01752 logging_info( "Sending out queued messages on link " <<
01753 ldn->overlayId.toString() );
01754 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
01755 sendMessage( msg, ldn->overlayId );
01756 delete msg;
01757 }
01758 ldn->messageQueue.clear();
01759 }
01760
01761
01762 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
01763
01764
01765 ldn->retryCounter = 3;
01766 ldn->endpoint = overlayMsg->getSourceEndpoint();
01767 ldn->communicationId = bc->establishLink( ldn->endpoint );
01768
01769 return true;
01770 }
01771
01773 bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
01774 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
01775 if ( rld != NULL ) {
01776 logging_debug("Keep-Alive for " <<
01777 overlayMsg->getDestinationLink() );
01778 if (overlayMsg->isRouteRecord())
01779 rld->routeRecord = overlayMsg->getRouteRecord();
01780 rld->setAlive();
01781 return true;
01782 } else {
01783 logging_error("Keep-Alive for "
01784 << overlayMsg->getDestinationLink() << ": link unknown." );
01785 return false;
01786 }
01787 }
01788
01790 bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
01791 logging_debug( "Received direct link replacement request" );
01792
01794 LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
01795 if (rld == NULL || ld == NULL) {
01796 logging_error("Direct link replacement: Link "
01797 << overlayMsg->getDestinationLink() << "not found error." );
01798 return false;
01799 }
01800 logging_info( "Received direct link convert notification for " << rld );
01801
01802
01803 rld->communicationId = ld->communicationId;
01804 rld->communicationUp = true;
01805 rld->relayed = false;
01806
01807
01808 rld->setAlive();
01809 rld->setAutoUsed();
01810
01811
01812 eraseDescriptor(ld->overlayId);
01813 return true;
01814 }
01815
01817 bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
01818 const LinkID bcLink ) {
01819 logging_debug( "Handling message: " << message->toString());
01820
01821
01822 OverlayMsg* overlayMsg =
01823 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
01824 if( overlayMsg == NULL ) return false;
01825
01826
01827 overlayMsg->increaseNumHops();
01828
01829
01830 refreshRelayInformation( overlayMsg, ld );
01831
01832
01833 overlayMsg->addRouteRecord(nodeId);
01834
01835
01836 if (overlayMsg->isDHTMessage())
01837 return handleDHTMessage(overlayMsg);
01838
01839
01840 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
01841 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
01842 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
01843 delete overlayMsg;
01844 return true;
01845 }
01846
01847
01848 if (!overlayMsg->getDestinationNode().isUnspecified() &&
01849 overlayMsg->getDestinationNode() != nodeId ) {
01850 logging_debug("Routing message "
01851 << " from " << overlayMsg->getSourceNode()
01852 << " to " << overlayMsg->getDestinationNode()
01853 );
01854 route( overlayMsg );
01855 delete overlayMsg;
01856 return true;
01857 }
01858
01859
01860 if (overlayMsg->hasTypeMask( OverlayMsg::maskDHTResponse )) {
01861 bool ret = handleDHTMessage(overlayMsg);
01862 delete overlayMsg;
01863 return ret;
01864 }
01865
01866
01867 bool ret = false;
01868 switch ( overlayMsg->getType() ) {
01869
01870
01871 case OverlayMsg::typeData:
01872 ret = handleData(overlayMsg, ld); break;
01873
01874
01875 case OverlayMsg::typeJoinRequest:
01876 ret = handleJoinRequest(overlayMsg, bcLink ); break;
01877 case OverlayMsg::typeJoinReply:
01878 ret = handleJoinReply(overlayMsg, bcLink ); break;
01879
01880
01881 case OverlayMsg::typeLinkRequest:
01882 ret = handleLinkRequest(overlayMsg, ld ); break;
01883 case OverlayMsg::typeLinkReply:
01884 ret = handleLinkReply(overlayMsg, ld ); break;
01885 case OverlayMsg::typeLinkUpdate:
01886 ret = handleLinkUpdate(overlayMsg, ld ); break;
01887 case OverlayMsg::typeLinkAlive:
01888 ret = handleLinkAlive(overlayMsg, ld ); break;
01889 case OverlayMsg::typeLinkDirect:
01890 ret = handleLinkDirect(overlayMsg, ld ); break;
01891
01892
01893 default: {
01894 logging_error( "received message in invalid state! don't know " <<
01895 "what to do with this message of type " << overlayMsg->getType() );
01896 ret = false;
01897 break;
01898 }
01899 }
01900
01901
01902 delete overlayMsg;
01903 return ret;
01904 }
01905
01906
01907
01908 void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
01909
01910 logging_debug( "broadcasting message to all known nodes " <<
01911 "in the overlay from service " + service.toString() );
01912
01913 if(message == NULL) return;
01914 message->setReleasePayload(false);
01915
01916 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
01917 for(size_t i=0; i<nodes.size(); i++){
01918 NodeID& id = nodes.at(i);
01919 if(id == this->nodeId) continue;
01920 if(i+1 == nodes.size()) message->setReleasePayload(true);
01921 sendMessage( message, id, service );
01922 }
01923 }
01924
01926 vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
01927
01928 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
01929 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
01930 if( i != nodes.end() ) nodes.erase( i );
01931 return nodes;
01932 }
01933
01934 const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
01935 if( lid == LinkID::UNSPECIFIED ) return nodeId;
01936 const LinkDescriptor* ld = getDescriptor(lid);
01937 if( ld == NULL ) return NodeID::UNSPECIFIED;
01938 else return ld->remoteNode;
01939 }
01940
01941 vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
01942 vector<LinkID> linkvector;
01943 BOOST_FOREACH( LinkDescriptor* ld, links ) {
01944 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
01945 linkvector.push_back( ld->overlayId );
01946 }
01947 }
01948 return linkvector;
01949 }
01950
01951
01952 void BaseOverlay::onNodeJoin(const NodeID& node) {
01953 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
01954 if( i == joiningNodes.end() ) return;
01955
01956 logging_info( "node has successfully joined baseoverlay and overlay structure "
01957 << node.toString() );
01958
01959 joiningNodes.erase( i );
01960 }
01961
01962 void BaseOverlay::eventFunction() {
01963 stabilizeRelays();
01964 stabilizeLinks();
01965 stabilizeDHT();
01966 updateVisual();
01967 }
01968
01969 void BaseOverlay::updateVisual(){
01970
01971
01972
01973
01974
01975 static NodeID pre = NodeID::UNSPECIFIED;
01976 static NodeID suc = NodeID::UNSPECIFIED;
01977
01978 vector<NodeID> nodes = this->getOverlayNeighbors(false);
01979
01980 if(nodes.size() == 0){
01981
01982 if(pre != NodeID::UNSPECIFIED){
01983 visual.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
01984 pre = NodeID::UNSPECIFIED;
01985 }
01986 if(suc != NodeID::UNSPECIFIED){
01987 visual.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
01988 suc = NodeID::UNSPECIFIED;
01989 }
01990
01991 }
01992
01993 if(nodes.size() == 1){
01994
01995
01996
01997
01998 if(pre != nodes.at(0)){
01999 pre = nodes.at(0);
02000 if(pre != NodeID::UNSPECIFIED)
02001 visual.visConnect(visualIdOverlay, this->nodeId, pre, "");
02002 }
02003 }
02004
02005 if(nodes.size() == 2){
02006
02007
02008 if(nodes.at(0) != pre){
02009 if(pre != NodeID::UNSPECIFIED)
02010 visual.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
02011 pre = NodeID::UNSPECIFIED;
02012 }
02013 if(nodes.at(1) != suc){
02014 if(suc != NodeID::UNSPECIFIED)
02015 visual.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
02016 suc = NodeID::UNSPECIFIED;
02017 }
02018
02019
02020 if(pre == NodeID::UNSPECIFIED){
02021 pre = nodes.at(0);
02022 if(pre != NodeID::UNSPECIFIED)
02023 visual.visConnect(visualIdOverlay, this->nodeId, pre, "");
02024 }
02025 if(suc == NodeID::UNSPECIFIED){
02026 suc = nodes.at(1);
02027 if(suc != NodeID::UNSPECIFIED)
02028 visual.visConnect(visualIdOverlay, this->nodeId, suc, "");
02029 }
02030
02031 }
02032
02033
02034
02035
02036
02037
02038
02039
02040
02041
02042
02043
02044
02045
02046
02047
02048
02049
02050
02051
02052
02053
02054
02055
02056
02057
02058
02059
02060
02061
02062
02063
02064
02065 static set<NodeID> linkset;
02066 set<NodeID> remotenodes;
02067 BOOST_FOREACH( LinkDescriptor* ld, links ) {
02068 if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)
02069 continue;
02070
02071 if (ld->routeRecord.size()>1 && ld->relayed) {
02072 for (size_t i=1; i<ld->routeRecord.size(); i++)
02073 remotenodes.insert( ld->routeRecord[ld->routeRecord.size()-i-1] );
02074 } else {
02075 remotenodes.insert(ld->remoteNode);
02076 }
02077 }
02078
02079
02080 bool changed = false;
02081
02082 do{
02083 changed = false;
02084 BOOST_FOREACH(NodeID n, linkset){
02085 if(remotenodes.find(n) == remotenodes.end()){
02086 visual.visDisconnect(visualIdBase, this->nodeId, n, "");
02087 linkset.erase(n);
02088 changed = true;
02089 break;
02090 }
02091 }
02092 }while(changed);
02093
02094
02095 do{
02096 changed = false;
02097 BOOST_FOREACH(NodeID n, remotenodes){
02098 if(linkset.find(n) == linkset.end()){
02099 visual.visConnect(visualIdBase, this->nodeId, n, "");
02100 linkset.insert(n);
02101 changed = true;
02102 break;
02103 }
02104 }
02105 }while(changed);
02106
02107 }
02108
02109
02110
02111 void BaseOverlay::initDHT() {
02112 dht = new DHT();
02113 localDHT = new DHT();
02114 republishCounter = 0;
02115 }
02116
02117 void BaseOverlay::destroyDHT() {
02118 delete dht;
02119 delete localDHT;
02120 }
02121
02123 void BaseOverlay::stabilizeDHT() {
02124
02125
02126 if (republishCounter < 2) {
02127 republishCounter++;
02128 return;
02129 }
02130 republishCounter = 0;
02131
02132
02133 BOOST_FOREACH( DHTEntry& entry, dht->entries ) {
02134
02135 entry.erase_expired_entries();
02136 }
02137
02138
02139 BOOST_FOREACH( DHTEntry& entry, localDHT->entries ) {
02140 BOOST_FOREACH( ValueEntry& value, entry.values )
02141 dhtPut(entry.key, value.get_value(), value.get_ttl(), false, true );
02142 }
02143 }
02144
02145
02146 bool BaseOverlay::handleDHTMessage( OverlayMsg* msg ) {
02147
02148
02149 logging_debug("Received DHT message");
02150 DHTMessage* dhtMsg = msg->decapsulate<DHTMessage>();
02151
02152
02153 if (msg->getType()==OverlayMsg::typeDHTData) {
02154 const ServiceID& service = msg->getService();
02155 logging_info( "Received DHT data for service " << service.toString() );
02156
02157
02158 CommunicationListener* lst = getListener(service);
02159 if(lst != NULL) lst->onKeyValue(dhtMsg->getKey(), dhtMsg->getValues() );
02160 return true;
02161 }
02162
02163
02164 if (!overlayInterface->isClosestNodeTo(msg->getDestinationNode())) {
02165 logging_debug("Routing DHT message to closest node "
02166 << " from " << msg->getSourceNode()
02167 << " to " << msg->getDestinationNode()
02168 );
02169 route( msg );
02170 delete msg;
02171 return true;
02172 }
02173
02174
02175 switch (msg->getType()) {
02176
02177
02178 case OverlayMsg::typeDHTPut: {
02179 logging_debug("DHT-Put: Attempt to store values for key "
02180 << dhtMsg->getKey());
02181 if (dhtMsg->doReplace()) {
02182 logging_debug("DHT-Put: Attempt to replace key: remove old values first!");
02183 dht->remove(dhtMsg->getKey());
02184 }
02185 BOOST_FOREACH( Data value, dhtMsg->getValues() ) {
02186 logging_debug("DHT-Put: Stored value: " << value );
02187 dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() );
02188 }
02189 break;
02190 }
02191
02192
02193 case OverlayMsg::typeDHTGet: {
02194 logging_info("DHT-Get: key=" << dhtMsg->getKey() );
02195 vector<Data> vect = dht->get(dhtMsg->getKey());
02196 BOOST_FOREACH(const Data& d, vect)
02197 logging_info("DHT-Get: value=" << d);
02198 OverlayMsg omsg(*msg);
02199 omsg.swapRoles();
02200 omsg.setType(OverlayMsg::typeDHTData);
02201 DHTMessage dhtmsg(dhtMsg->getKey(), vect);
02202 omsg.encapsulate(&dhtmsg);
02203 dhtSend(&omsg, omsg.getDestinationNode());
02204 break;
02205 }
02206
02207
02208 case OverlayMsg::typeDHTRemove: {
02209 if (dhtMsg->hasValues()) {
02210 BOOST_FOREACH( Data value, dhtMsg->getValues() )
02211 dht->remove(dhtMsg->getKey(), value );
02212 } else
02213 dht->remove( dhtMsg->getKey() );
02214 break;
02215 }
02216
02217
02218 default:
02219 logging_error("DHT Message type unknown.");
02220 return false;
02221 }
02222 delete msg;
02223 return true;
02224 }
02225
02227 void BaseOverlay::dhtPut( const Data& key, const Data& value, int ttl, bool replace, bool no_local_refresh ) {
02228
02229
02230 logging_info("DHT-Put:"
02231 << " key=" << key << " value=" << value
02232 << " ttl=" << ttl << " replace=" << replace
02233 );
02234
02235 if (!no_local_refresh) {
02236
02237
02238 if (replace) localDHT->remove(key);
02239 localDHT->put(key, value, ttl);
02240 }
02241
02242
02243 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
02244 DHTMessage dhtmsg( key, value );
02245 dhtmsg.setReplace( replace );
02246 dhtmsg.setTTL(ttl);
02247
02248 OverlayMsg msg( OverlayMsg::typeDHTPut );
02249 msg.encapsulate( &dhtmsg );
02250 dhtSend(&msg, dest);
02251 }
02252
02254 void BaseOverlay::dhtRemove( const Data& key, const Data& value ) {
02255
02256 localDHT->remove(key,value);
02257
02258
02259 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
02260 DHTMessage dhtmsg(key,value);
02261
02262
02263 OverlayMsg msg(OverlayMsg::typeDHTRemove);
02264 msg.encapsulate( &dhtmsg );
02265 dhtSend(&msg, dest);
02266 }
02267
02269 void BaseOverlay::dhtRemove( const Data& key ) {
02270
02271 logging_info("DHT-Remove: Removing key=" << key );
02272
02273
02274 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
02275 DHTMessage dhtmsg(key);
02276
02277
02278 OverlayMsg msg(OverlayMsg::typeDHTRemove);
02279 msg.encapsulate( &dhtmsg );
02280 dhtSend(&msg, dest);
02281 }
02282
02284 void BaseOverlay::dhtGet( const Data& key, const ServiceID& service ) {
02285
02286 logging_info("DHT-Get: Trying to resolve key=" <<
02287 key << " for service=" << service.toString() );
02288
02289
02290 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
02291 DHTMessage dhtmsg(key);
02292
02293
02294 OverlayMsg msg(OverlayMsg::typeDHTGet);
02295 msg.setService(service);
02296 msg.encapsulate( &dhtmsg );
02297 dhtSend(&msg, dest);
02298 }
02299
02300 void BaseOverlay::dhtSend( OverlayMsg* msg, const NodeID& dest ) {
02301
02302 logging_info("DHT-Send: Sending message with key=" << dest.toString() );
02303
02305 msg->setSourceNode(this->nodeId);
02306 msg->setDestinationNode(dest);
02307
02308
02309 if (overlayInterface->isClosestNodeTo(msg->getDestinationNode())) {
02310 Data d = data_serialize(msg);
02311 Message* m2 = new Message(d);
02312 OverlayMsg* m3 = m2->decapsulate<OverlayMsg>();
02313 handleDHTMessage(m3);
02314 delete m2;
02315 return;
02316 }
02317
02318
02319 send( msg, dest );
02320 }
02321
02322 std::string BaseOverlay::debugInformation() {
02323 std::stringstream s;
02324 int i=0;
02325
02326
02327 s << "Long debug info ... [see below]" << endl << endl;
02328 s << "--- overlay information ----------------------" << endl;
02329 s << overlayInterface->debugInformation() << endl;
02330
02331
02332 s << "--- link state -------------------------------" << endl;
02333 BOOST_FOREACH( LinkDescriptor* ld, links ) {
02334 s << "link " << i << ": " << ld << endl;
02335 i++;
02336 }
02337 s << endl << endl;
02338
02339 return s.str();
02340 }
02341
02342 }}