Changeset 12060 for source/ariba/communication/BaseCommunication.cpp
- Timestamp:
- Jun 19, 2013, 11:05:49 AM (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/communication/BaseCommunication.cpp
r10767 r12060 57 57 namespace communication { 58 58 59 using namespace ariba::addressing2; 60 59 61 using ariba::utility::PeerID; 60 62 using ariba::utility::SystemQueue; 61 63 62 64 use_logging_cpp(BaseCommunication); 63 64 /// adds an endpoint to the list65 void BaseCommunication::add_endpoint( const address_v* endpoint ) {66 if (endpoint==NULL) return;67 BOOST_FOREACH( endpoint_reference& ref, remote_endpoints ) {68 if (ref.endpoint->type_id() == endpoint->type_id() && *ref.endpoint == *endpoint) {69 ref.count++;70 return;71 }72 }73 endpoint_reference ref;74 ref.endpoint = endpoint->clone();75 ref.count = 1;76 remote_endpoints.push_back(ref);77 }78 79 /// removes an endpoint from the list80 void BaseCommunication::remove_endpoint( const address_v* endpoint ) {81 if (endpoint==NULL) return;82 for (vector<endpoint_reference>::iterator i = remote_endpoints.begin();83 i != remote_endpoints.end(); i++) {84 if ((*i->endpoint).type_id() == endpoint->type_id() && (*i->endpoint) == *endpoint) {85 i->count--;86 if (i->count==0) {87 logging_info("No more links to " << i->endpoint->to_string() << ": terminating transports!");88 transport->terminate(i->endpoint);89 delete i->endpoint;90 remote_endpoints.erase(i);91 }92 return;93 }94 }95 }96 65 97 66 … … 100 69 transport( NULL ), 101 70 messageReceiver( NULL ), 102 started( false ) 71 started( false ), 72 listenOn_endpoints(new addressing2::endpoint_set()) 103 73 { 104 74 } … … 109 79 110 80 111 void BaseCommunication::start() { 81 void BaseCommunication::start(EndpointSetPtr listen_on) { 82 assert ( ! started ); 83 84 listenOn_endpoints = listen_on; 85 logging_info("Setting local end-points: " << listenOn_endpoints->to_string()); 86 112 87 logging_info( "Starting up ..." ); 113 88 currentSeqnum = 0; 114 89 115 // set local peer id116 localDescriptor.getPeerId() = PeerID::random();117 logging_info( "Using PeerID: " << localDescriptor.getPeerId() );118 119 90 // creating transports 91 // ---> transport_peer holds the set of the active endpoints we're listening on 120 92 logging_info( "Creating transports ..." ); 121 122 #ifdef UNDERLAY_OMNET 123 AribaOmnetModule* module = StartupWrapper::getCurrentModule(); 124 module->setServerPort( listenport ); 125 126 transport = module; 127 network = new OmnetNetworkProtocol( module ); 128 #else 129 transport = new transport_peer( localDescriptor.getEndpoints() ); 130 #endif 93 transport = new transport_peer(); 94 active_listenOn_endpoints = transport->add_listenOn_endpoints(listenOn_endpoints); 95 logging_info( "XXX. Active endpoints = " << active_listenOn_endpoints->to_string() ); // XXX 131 96 132 97 logging_info( "Searching for local locators ..." ); 133 /** 134 * DONT DO THAT: if(localDescriptor.getEndpoints().to_string().length() == 0) 135 * since addresses are used to initialize transport addresses 136 */ 137 AddressDiscovery::discover_endpoints( localDescriptor.getEndpoints() ); 138 logging_info( "Done. Local endpoints = " << localDescriptor.toString() ); 139 98 local_endpoints = AddressDiscovery::discover_endpoints(active_listenOn_endpoints); 99 if ( local_endpoints->count() > 0 ) 100 { 101 logging_info( "Done. Discovered local endpoints: " << local_endpoints->to_string() ); 102 } 103 else 104 { 105 logging_warn("WARING!! No local endpoints found, NO COMMUNICATION POSSIBLE!!"); 106 107 // TODO notify application, so that it may react properly. throw exception..? 108 assert( false ); 109 } 110 111 112 // create local EndpointDescriptor 113 // ---> localDescriptor hold the set endpoints that can be used to reach us 114 localDescriptor.getPeerId() = PeerID::random(); 115 localDescriptor.replace_endpoint_set(local_endpoints); 116 logging_info( "Using PeerID: " << localDescriptor.getPeerId() ); 117 118 // start transport_peer 140 119 transport->register_listener( this ); 141 120 transport->start(); 142 121 143 #ifndef UNDERLAY_OMNET144 122 // bind to the network change detection 145 123 networkMonitor.registerNotification( this ); 146 #endif147 124 148 125 // base comm startup done … … 163 140 bool BaseCommunication::isStarted(){ 164 141 return started; 165 }166 167 /// Sets the endpoints168 void BaseCommunication::setEndpoints( string& _endpoints ) {169 localDescriptor.getEndpoints().assign(_endpoints);170 logging_info("Setting local end-points: "171 << localDescriptor.getEndpoints().to_string());172 142 } 173 143 … … 193 163 addLink( ld ); 194 164 195 // send a message to request new link to remote 196 logging_debug( "Send messages with request to open link to " << descriptor.toString() ); 197 AribaBaseMsg baseMsg( AribaBaseMsg::typeLinkRequest, linkid ); 198 baseMsg.getLocalDescriptor() = localDescriptor; 199 baseMsg.getRemoteDescriptor().getPeerId() = descriptor.getPeerId(); 200 201 // serialize and send message 202 send( &baseMsg, descriptor ); 165 166 /* send a message to request new link to remote */ 167 logging_debug( "Send messages with request to open link to " << descriptor.toString() ); 168 169 /* 170 * Create Link-Request Message: 171 * NOTE: - Their PeerID (in parent message) 172 * - Our LinkID 173 * - Our PeerID 174 * - Our EndpointDescriptor 175 */ 176 reboost::message_t linkmsg; 177 linkmsg.push_back(linkid.serialize()); 178 linkmsg.push_back(localDescriptor.getPeerId().serialize()); 179 linkmsg.push_back(localDescriptor.endpoints->serialize()); 180 181 // // XXX AKTUELL BUG FINDING... 182 // reboost::shared_buffer_t xxx = localDescriptor.endpoints->serialize(); 183 // EndpointSetPtr xxx_set = endpoint_set::create_EndpointSet(); 184 // xxx_set->deserialize(xxx); 185 // cout << "/// MARIO VORHER: " << localDescriptor.endpoints->to_string() << endl; 186 // cout << "/// MARIO NACHHER: " << xxx_set->to_string() << endl; 187 188 // send message 189 // TODO move enum to BaseComm 190 send_to_peer(AribaBaseMsg::typeLinkRequest, descriptor.getPeerId(), linkmsg, 191 descriptor, system_priority::OVERLAY); 203 192 204 193 return linkid; … … 217 206 218 207 // tell the registered listeners 219 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {208 foreach( CommunicationEvents* i, eventListener ) { 220 209 i->onLinkDown( link, ld.localLocator, ld.remoteLocator ); 221 210 } 222 211 223 // create message to drop the link 212 213 // * send message to drop the link * 224 214 logging_debug( "Sending out link close request. for us, the link is closed now" ); 225 AribaBaseMsg msg( AribaBaseMsg::typeLinkClose, ld.localLink, ld.remoteLink ); 226 227 // send message to drop the link 228 send( &msg, ld ); 215 reboost::message_t empty_message; 216 send_over_link( AribaBaseMsg::typeLinkClose, empty_message, ld, system_priority::OVERLAY ); 229 217 230 218 // remove from map … … 232 220 } 233 221 234 seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) { 235 236 logging_debug( "Sending out message to link " << lid.toString() ); 237 222 223 seqnum_t BaseCommunication::sendMessage( const LinkID& lid, 224 reboost::message_t message, 225 uint8_t priority, 226 bool bypass_overlay) throw(communication_message_not_sent) 227 { 228 // message type: direct data or (normal) data 229 AribaBaseMsg::type_ type; 230 if ( bypass_overlay ) 231 { 232 type = AribaBaseMsg::typeDirectData; 233 logging_debug( "Sending out direct-message to link " << lid.toString() ); 234 } 235 else 236 { 237 type = AribaBaseMsg::typeData; 238 logging_debug( "Sending out message to link " << lid.toString() ); 239 } 240 241 238 242 // query local link info 239 243 LinkDescriptor& ld = queryLocalLink(lid); 240 if( ld.isUnspecified() ){ 241 logging_error( "Don't know the link with id " << lid.toString() ); 242 return -1; 244 if( ld.isUnspecified() ) 245 { 246 throw communication_message_not_sent("Don't know the link with id " 247 + lid.toString()); 243 248 } 244 249 245 250 // link not up-> error 246 if( !ld.up ) {247 logging_error("Can not send on link " << lid.toString() << ": link not up");248 return -1;249 }250 251 // create message 252 AribaBaseMsg msg( AribaBaseMsg::typeData, ld.localLink, ld.remoteLink ); 253 254 // encapsulate the payload message255 msg.encapsulate( const_cast<Message*>(message) ); 256 257 // send message258 send( &msg, ld);259 260 // return sequence number251 if( !ld.up ) 252 { 253 throw communication_message_not_sent("Can not send on link " 254 + lid.toString() + ": link not up"); 255 } 256 257 258 // * send message * 259 bool okay = send_over_link( type, message, ld, priority ); 260 261 if ( ! okay ) 262 { 263 throw communication_message_not_sent("send_over_link failed!"); 264 } 265 261 266 return ++currentSeqnum; 262 267 } … … 268 273 LinkDescriptor& linkDesc = queryLocalLink(link); 269 274 if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED(); 270 return linkDesc.remote Endpoint;275 return linkDesc.remoteDescriptor; 271 276 } 272 277 } … … 283 288 } 284 289 285 SystemEventType TransportEvent("Transport"); 286 SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent ); 287 288 /// called when a system event is emitted by system queue 289 void BaseCommunication::handleSystemEvent(const SystemEvent& event) { 290 291 // dispatch received messages 292 if ( event.getType() == MessageDispatchEvent ){ 293 logging_debug( "Forwarding message receiver" ); 294 boost::function0<void>* handler = event.getData< boost::function0<void> >(); 295 (*handler)(); 296 delete handler; 297 } 298 } 299 300 /** 301 * called within the ASIO thread 302 * when a message is received from underlay transport 303 */ 290 291 292 /*------------------------------ 293 | ASIO thread --> SystemQueue | 294 ------------------------------*/ 295 296 /// ASIO thread 304 297 void BaseCommunication::receive_message(transport_connection::sptr connection, 305 reboost:: message_t msg) {298 reboost::shared_buffer_t msg) { 306 299 307 300 logging_debug( "Dispatching message" ); 308 301 309 boost::function0<void>* handler = new boost::function0<void>(302 SystemQueue::instance().scheduleCall( 310 303 boost::bind( 311 304 &BaseCommunication::receiveMessage, … … 313 306 connection, 314 307 msg) 315 ); 316 317 SystemQueue::instance().scheduleEvent( 318 SystemEvent(this, MessageDispatchEvent, handler) 319 ); 320 } 321 322 /** 323 * called within the ARIBA thread (System Queue) 324 * when a message is received from underlay transport 325 */ 308 ); 309 } 310 311 /// ASIO thread 312 void BaseCommunication::connection_terminated(transport_connection::sptr connection) 313 { 314 SystemQueue::instance().scheduleCall( 315 boost::bind( 316 &BaseCommunication::connectionTerminated, 317 this, 318 connection) 319 ); 320 } 321 322 /*-------------------------------- 323 | [ASIO thread --> SystemQueue] | 324 -------------------------------*/ 325 326 /// ARIBA thread (System Queue) 327 void BaseCommunication::connectionTerminated(transport_connection::sptr connection) 328 { 329 vector<LinkID*> links = connection->get_communication_links(); 330 331 logging_debug("[BaseCommunication] Connection terminated: " 332 << connection->getLocalEndpoint()->to_string() 333 << " <--> " << connection->getRemoteEndpoint()->to_string() 334 << " (" << links.size() << " links)"); 335 336 // remove all links that used the terminated connection 337 for ( vector<LinkID*>::iterator it = links.begin(); it != links.end(); ++it ) 338 { 339 LinkID& link_id = **it; 340 341 logging_debug(" ---> Removing link: " << link_id.toString()); 342 343 // searching for link, not found-> warn 344 LinkDescriptor& linkDesc = queryLocalLink( link_id ); 345 if (linkDesc.isUnspecified()) { 346 logging_warn("Failed to find local link " << link_id.toString()); 347 continue; 348 } 349 350 // inform listeners 351 foreach( CommunicationEvents* i, eventListener ){ 352 i->onLinkFail( linkDesc.localLink, 353 linkDesc.localLocator, linkDesc.remoteLocator ); 354 } 355 356 // remove the link descriptor 357 removeLink( link_id ); 358 } 359 } 360 361 /// ARIBA thread (System Queue) 326 362 void BaseCommunication::receiveMessage(transport_connection::sptr connection, 327 reboost:: message_t message)363 reboost::shared_buffer_t message) 328 364 { 329 330 //// Adapt to old message system //// 331 // Copy data 332 size_t bytes_len = message.size(); 333 uint8_t* bytes = new uint8_t[bytes_len]; 334 message.read(bytes, 0, bytes_len); 335 336 Data data(bytes, bytes_len * 8); 337 338 Message legacy_message; 339 legacy_message.setPayload(data); 340 341 342 343 /// decapsulate message 344 AribaBaseMsg* msg = legacy_message.decapsulate<AribaBaseMsg>(); 345 logging_debug( "Receiving message of type " << msg->getTypeString() ); 346 365 // XXX 366 logging_debug("/// [receiveMessage] buffersize: " << message.size()); 367 368 // get type 369 uint8_t type = message.data()[0]; 370 reboost::shared_buffer_t sub_buff = message(1); 371 372 // get link id 373 LinkID link_id; 374 if ( type != AribaBaseMsg::typeLinkRequest) 375 { 376 sub_buff = link_id.deserialize(sub_buff); 377 } 378 347 379 // handle message 348 switch ( msg->getType()) {349 380 switch ( type ) 381 { 350 382 // --------------------------------------------------------------------- 351 383 // data message 352 384 // --------------------------------------------------------------------- 353 case AribaBaseMsg::typeData: { 354 logging_debug( "Received data message, forwarding to overlay" ); 355 if( messageReceiver != NULL ) { 385 case AribaBaseMsg::typeData: 386 { 387 logging_debug( "Received data message, forwarding to overlay." ); 388 if( messageReceiver != NULL ) 389 { 356 390 messageReceiver->receiveMessage( 357 msg, msg->getRemoteLink(), NodeID::UNSPECIFIED391 sub_buff, link_id, NodeID::UNSPECIFIED, false 358 392 ); 359 393 } 394 360 395 break; 361 396 } 362 397 398 // --------------------------------------------------------------------- 399 // direct data message (bypass overlay-layer) 400 // --------------------------------------------------------------------- 401 case AribaBaseMsg::typeDirectData: 402 { 403 logging_debug( "Received direct data message, forwarding to application." ); 404 405 if( messageReceiver != NULL ) 406 { 407 messageReceiver->receiveMessage( 408 sub_buff, link_id, NodeID::UNSPECIFIED, true 409 ); 410 } 411 412 break; 413 } 414 415 416 363 417 // --------------------------------------------------------------------- 364 418 // handle link request from remote 365 419 // --------------------------------------------------------------------- 366 case AribaBaseMsg::typeLinkRequest: { 367 logging_debug( "Received link open request" ); 368 369 /// not the correct peer id-> skip request 370 if (!msg->getRemoteDescriptor().getPeerId().isUnspecified() 371 && msg->getRemoteDescriptor().getPeerId() != localDescriptor.getPeerId()) { 372 logging_info("Received link request for " 373 << msg->getRemoteDescriptor().getPeerId().toString() 374 << "but i'm " 375 << localDescriptor.getPeerId() 376 << ": Ignoring!"); 377 break; 378 } 379 420 case AribaBaseMsg::typeLinkRequest: 421 { 422 logging_debug( "Received link open request on " 423 << connection->getLocalEndpoint()->to_string() ); 424 425 /* 426 * Deserialize Peer Message 427 * - Our PeerID 428 */ 429 PeerID our_peer_id; 430 sub_buff = our_peer_id.deserialize(sub_buff); 431 432 /// not the correct peer id-> skip request 433 if ( our_peer_id != localDescriptor.getPeerId() && 434 ! our_peer_id.isUnspecified() /* overlay bootstrap */ ) 435 { 436 logging_info("Received link request for " 437 << our_peer_id.toString() 438 << "but i'm " 439 << localDescriptor.getPeerId() 440 << ": Ignoring!"); 441 442 // TODO terminate connection? 443 444 break; 445 } 446 447 448 /* 449 * Deserialize Link-Request Message: 450 * - Their LinkID 451 * - Their PeerID 452 * - Their EndpointDescriptor 453 */ 454 LinkID their_link_id; 455 PeerID their_peer_id; 456 EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet(); 457 sub_buff = their_link_id.deserialize(sub_buff); 458 sub_buff = their_peer_id.deserialize(sub_buff); 459 sub_buff = their_endpoints->deserialize(sub_buff); 460 /* [ Deserialize Link-Request Message ] */ 461 462 380 463 /// only answer the first request 381 if (!queryRemoteLink(msg->getLocalLink()).isUnspecified()) { 464 if (!queryRemoteLink(their_link_id).isUnspecified()) 465 { 466 467 // TODO aktuell: When will these connections be closed? 468 // ---> Close it now (if it services no links) ? 469 // (see also ! allowlink below) 470 471 // XXX AKTUELL TESTING !! This will cause race conditions. So this is test-code only! 472 if ( connection->get_communication_links().size() == 0 ) 473 { 474 connection->terminate(); 475 } 476 382 477 logging_debug("Link request already received. Ignore!"); 383 478 break; … … 386 481 /// create link ids 387 482 LinkID localLink = LinkID::create(); 388 LinkID remoteLink = msg->getLocalLink();483 LinkID remoteLink = their_link_id; // XXX intermediate variable is unnecessary 389 484 logging_debug( 390 485 "local=" << connection->getLocalEndpoint()->to_string() … … 394 489 // check if link creation is allowed by ALL listeners 395 490 bool allowlink = true; 396 BOOST_FOREACH( CommunicationEvents* i, eventListener ){491 foreach( CommunicationEvents* i, eventListener ){ 397 492 allowlink &= i->onLinkRequest( localLink, 398 493 connection->getLocalEndpoint(), … … 403 498 if( !allowlink ){ 404 499 logging_warn( "Overlay denied creation of link" ); 405 delete msg;406 500 return; 407 501 } … … 411 505 ld->localLink = localLink; 412 506 ld->remoteLink = remoteLink; 413 ld->localLocator = connection->getLocalEndpoint()->clone(); 414 ld->remoteLocator = connection->getRemoteEndpoint()->clone(); 415 ld->connection = connection; 416 ld->remoteEndpoint = msg->getLocalDescriptor(); 417 add_endpoint(ld->remoteLocator); 418 419 // add layer 1-3 addresses 420 ld->remoteEndpoint.getEndpoints().add( 421 ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 422 localDescriptor.getEndpoints().add( 423 connection->getLocalEndpoint(), 424 endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 507 ld->localLocator = connection->getLocalEndpoint(); 508 ld->remoteLocator = connection->getRemoteEndpoint(); 509 ld->remoteDescriptor = EndpointDescriptor(their_peer_id, their_endpoints); 510 ld->set_connection(connection); 511 512 513 // update endpoints (should only have any effect in case of NAT) 514 ld->remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint()); 515 // localDescriptor.endpoints->add_endpoint(connection->getLocalEndpoint()); // XXX 0.0.0.0:0 425 516 426 517 // link is now up-> add it … … 428 519 addLink(ld); 429 520 430 // link is up! 431 logging_debug( "Link (initiated from remote) is up with " 432 << "local(id=" << ld->localLink.toString() << "," 433 << "locator=" << ld->localLocator->to_string() << ") " 434 << "remote(id=" << ld->remoteLink.toString() << ", " 435 << "locator=" << ld->remoteLocator->to_string() << ")" 436 ); 437 438 // sending link request reply 439 logging_debug( "Sending link request reply with ids " 440 << "local=" << localLink.toString() << ", " 441 << "remote=" << remoteLink.toString() ); 442 AribaBaseMsg reply( AribaBaseMsg::typeLinkReply, localLink, remoteLink ); 443 reply.getLocalDescriptor() = localDescriptor; 444 reply.getRemoteDescriptor() = ld->remoteEndpoint; 445 446 send( &reply, *ld ); 521 522 523 /* sending link reply */ 524 logging_debug( "Sending link reply with ids " 525 << "local=" << localLink.toString() << ", " 526 << "remote=" << remoteLink.toString() ); 527 528 /* 529 * Create Link-Reply Message: 530 * - Our LinkID 531 * - Our Endpoint_Set (as update) 532 * - Their EndpointDescriptor (maybe they learn something about NAT) 533 */ 534 reboost::message_t linkmsg; 535 linkmsg.push_back(localLink.serialize()); 536 linkmsg.push_back(localDescriptor.endpoints->serialize()); 537 linkmsg.push_back(ld->remoteDescriptor.endpoints->serialize()); 538 539 // XXX 540 cout << "/// MARIO: " << ld->get_connection()->getRemoteEndpoint()->to_string() << endl; 541 542 // send message 543 bool sent = send_over_link( AribaBaseMsg::typeLinkReply, linkmsg, *ld, system_priority::OVERLAY ); 544 545 if ( ! sent ) 546 { 547 logging_error("ERROR: Could not send LinkReply to: " << ld->remoteLocator->to_string()); 548 549 // TODO remove link, close link, ..? 550 551 break; 552 } 553 554 555 // link is up! 556 logging_debug( "Link (initiated from remote) is up with " 557 << "local(id=" << ld->localLink.toString() << "," 558 << "locator=" << ld->localLocator->to_string() << ") " 559 << "remote(id=" << ld->remoteLink.toString() << ", " 560 << "locator=" << ld->remoteLocator->to_string() << ")" 561 ); 447 562 448 563 // inform listeners about new open link 449 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {564 foreach( CommunicationEvents* i, eventListener ) { 450 565 i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator); 451 566 } … … 458 573 // handle link request reply 459 574 // --------------------------------------------------------------------- 460 case AribaBaseMsg::typeLinkReply: { 575 case AribaBaseMsg::typeLinkReply: 576 { 461 577 logging_debug( "Received link open reply for a link we initiated" ); 462 578 579 /* 580 * Deserialize Link-Reply Message: 581 * - Their LinkID 582 * - Their Endpoint_Set (as update) 583 * - Our EndpointDescriptor (maybe we can learn something about NAT) 584 */ 585 LinkID their_link_id; 586 EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet(); 587 EndpointSetPtr our_endpoints = endpoint_set::create_EndpointSet(); 588 sub_buff = their_link_id.deserialize(sub_buff); 589 sub_buff = their_endpoints->deserialize(sub_buff); 590 sub_buff = our_endpoints->deserialize(sub_buff); 591 592 463 593 // this is a reply to a link open request, so we have already 464 594 // a link mapping and can now set the remote link to valid 465 LinkDescriptor& ld = queryLocalLink( msg->getRemoteLink());595 LinkDescriptor& ld = queryLocalLink( link_id ); 466 596 467 597 // no link found-> warn! 468 598 if (ld.isUnspecified()) { 469 logging_warn("Failed to find local link " << msg->getRemoteLink().toString()); 470 delete msg; 599 logging_warn("Failed to find local link " << link_id.toString()); 471 600 return; 472 601 } 602 603 if ( ld.up ) 604 { 605 logging_warn("Got link replay for already open link. Ignore. LinkID: " << link_id.toString()); 606 607 // TODO send LinkClose ? 608 return; 609 } 473 610 474 611 // store the connection 475 ld. connection = connection;612 ld.set_connection(connection); 476 613 477 614 // set remote locator and link id 478 ld.remoteLink = msg->getLocalLink(); 479 ld.remoteLocator = connection->getRemoteEndpoint()->clone(); 480 ld.remoteEndpoint.getEndpoints().add( 481 msg->getLocalDescriptor().getEndpoints(), 482 endpoint_set::Layer1_4 483 ); 484 485 localDescriptor.getEndpoints().add( 486 msg->getRemoteDescriptor().getEndpoints(), 487 endpoint_set::Layer1_3 488 ); 615 ld.remoteLink = their_link_id; 616 ld.remoteLocator = connection->getRemoteEndpoint(); 617 618 619 /* Update endpoints */ 620 // NOTE: we might loose some information here, but it's our only chance to get rid of outdated information. 621 ld.remoteDescriptor.replace_endpoint_set(their_endpoints); 622 623 // add actual remote endpoint to this set (should only have any effect in case of NAT) 624 ld.remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint()); 625 626 // TODO In case of NAT, we could learn something about our external IP. 627 // ---> But we must trust the remote peer about this information!! 628 // localDescriptor.endpoints->add_endpoints(our_endpoints); 629 630 631 632 489 633 ld.up = true; 490 add_endpoint(ld.remoteLocator);491 634 492 635 logging_debug( "Link is now up with local id " … … 496 639 497 640 // inform lisneters about link up event 498 BOOST_FOREACH( CommunicationEvents* i, eventListener ){641 foreach( CommunicationEvents* i, eventListener ){ 499 642 i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator ); 500 643 } … … 509 652 case AribaBaseMsg::typeLinkClose: { 510 653 // get remote link 511 const LinkID& localLink = msg->getRemoteLink();512 logging_debug( "Received link close request for link " << l ocalLink.toString() );654 // const LinkID& localLink = msg.getRemoteLink(); 655 logging_debug( "Received link close request for link " << link_id.toString() ); 513 656 514 657 // searching for link, not found-> warn 515 LinkDescriptor& linkDesc = queryLocalLink( l ocalLink);658 LinkDescriptor& linkDesc = queryLocalLink( link_id ); 516 659 if (linkDesc.isUnspecified()) { 517 logging_warn("Failed to find local link " << localLink.toString()); 518 delete msg; 660 logging_warn("Failed to find local link " << link_id.toString()); 519 661 return; 520 662 } 521 663 522 664 // inform listeners 523 BOOST_FOREACH( CommunicationEvents* i, eventListener ){665 foreach( CommunicationEvents* i, eventListener ){ 524 666 i->onLinkDown( linkDesc.localLink, 525 667 linkDesc.localLocator, linkDesc.remoteLocator ); … … 527 669 528 670 // remove the link descriptor 529 removeLink( l ocalLink);671 removeLink( link_id ); 530 672 531 673 // done … … 534 676 535 677 // --------------------------------------------------------------------- 536 // handle link locator changes 678 // handle link locator changes -- TODO is this ever called..? 537 679 // --------------------------------------------------------------------- 538 case AribaBaseMsg::typeLinkUpdate: { 539 const LinkID& localLink = msg->getRemoteLink(); 540 logging_debug( "Received link update for link " 541 << localLink.toString() ); 542 543 // find the link description 544 LinkDescriptor& linkDesc = queryLocalLink( localLink ); 545 if (linkDesc.isUnspecified()) { 546 logging_warn("Failed to update local link " 547 << localLink.toString()); 548 delete msg; 549 return; 550 } 551 552 // update the remote locator 553 const address_v* oldremote = linkDesc.remoteLocator; 554 linkDesc.remoteLocator = connection->getRemoteEndpoint()->clone(); 555 556 // inform the listeners (local link has _not_ changed!) 557 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 558 i->onLinkChanged( 559 linkDesc.localLink, // linkid 560 linkDesc.localLocator, // old local 561 linkDesc.localLocator, // new local 562 oldremote, // old remote 563 linkDesc.remoteLocator // new remote 564 ); 565 } 566 567 // done 568 break; 569 } 570 } 571 572 delete msg; 680 // case AribaBaseMsg::typeLinkUpdate: { 681 // const LinkID& localLink = msg.getRemoteLink(); 682 // logging_debug( "Received link update for link " 683 // << localLink.toString() ); 684 // 685 // // find the link description 686 // LinkDescriptor& linkDesc = queryLocalLink( localLink ); 687 // if (linkDesc.isUnspecified()) { 688 // logging_warn("Failed to update local link " 689 // << localLink.toString()); 690 // return; 691 // } 692 // 693 // // update the remote locator 694 // addressing2::EndpointPtr oldremote = linkDesc.remoteLocator; 695 // linkDesc.remoteLocator = connection->getRemoteEndpoint(); 696 // 697 // // TODO update linkDesc.connection ? 698 // 699 // // inform the listeners (local link has _not_ changed!) 700 // foreach( CommunicationEvents* i, eventListener ){ 701 // i->onLinkChanged( 702 // linkDesc.localLink, // linkid 703 // linkDesc.localLocator, // old local 704 // linkDesc.localLocator, // new local 705 // oldremote, // old remote 706 // linkDesc.remoteLocator // new remote 707 // ); 708 // } 709 // 710 // // done 711 // break; 712 // } 713 714 715 default: { 716 logging_warn( "Received unknown message type!" ); 717 break; 718 } 719 720 } 573 721 } 574 722 … … 582 730 for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){ 583 731 if( (*i)->localLink != localLink) continue; 584 remove_endpoint((*i)->remoteLocator); 732 // remove_endpoint((*i)->remoteLocator); // XXX 585 733 delete *i; 586 734 linkSet.erase( i ); … … 605 753 } 606 754 607 LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {608 LinkIDs ids;609 for (size_t i=0; i<linkSet.size(); i++){610 if( addr == NULL ){611 ids.push_back( linkSet[i]->localLink );612 } else {613 if ( *linkSet[i]->remoteLocator == *addr )614 ids.push_back( linkSet[i]->localLink );615 }616 }617 return ids;618 }755 //LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const { 756 // LinkIDs ids; 757 // for (size_t i=0; i<linkSet.size(); i++){ 758 // if( addr == NULL ){ 759 // ids.push_back( linkSet[i]->localLink ); 760 // } else { 761 // if ( *linkSet[i]->remoteLocator == *addr ) 762 // ids.push_back( linkSet[i]->localLink ); 763 // } 764 // } 765 // return ids; 766 //} 619 767 620 768 void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){ 621 622 #ifdef UNDERLAY_OMNET623 624 // we have no mobility support for simulations625 return626 627 #endif // UNDERLAY_OMNET628 769 629 770 /*- disabled! … … 762 903 } 763 904 764 /// sends a message to all end-points in the end-point descriptor 765 void BaseCommunication::send(Message* legacy_message, const EndpointDescriptor& endpoint) { 766 Data data = data_serialize(legacy_message, DEFAULT_V); 767 768 //// Adapt to new message system //// 769 // transfer data buffer ownership to the shared_buffer 770 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 771 772 reboost::message_t message; 773 message.push_back(buf); 774 775 transport->send(endpoint.getEndpoints(), message); 776 } 777 778 /// sends a message to the remote locator inside the link descriptor 779 void BaseCommunication::send(Message* legacy_message, const LinkDescriptor& desc) { 780 if (desc.remoteLocator==NULL) return; 781 782 Data data = data_serialize(legacy_message, DEFAULT_V); 783 784 //// Adapt to new message system //// 785 // transfer data buffer ownership to the shared_buffer 786 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 787 788 reboost::message_t message; 789 message.push_back(buf); 790 791 desc.connection->send(message); 792 } 905 906 addressing2::EndpointPtr BaseCommunication::get_local_endpoint_of_link( 907 const LinkID& linkid) 908 { 909 LinkDescriptor& ld = queryLocalLink(linkid); 910 911 return ld.get_connection()->getLocalEndpoint(); 912 } 913 914 addressing2::EndpointPtr BaseCommunication::get_remote_endpoint_of_link( 915 const LinkID& linkid) 916 { 917 LinkDescriptor& ld = queryLocalLink(linkid); 918 919 return ld.get_connection()->getRemoteEndpoint(); 920 } 921 922 923 924 bool BaseCommunication::send_over_link( 925 const uint8_t type, 926 reboost::message_t message, 927 const LinkDescriptor& desc, 928 const uint8_t priority) 929 { 930 /* 931 * Create Link Message: 932 * - Type 933 * - Their LinkID 934 */ 935 // link id 936 message.push_front(desc.remoteLink.serialize()); 937 // type 938 memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t)); 939 /* [ Create Link Message ] */ 940 941 942 /* send message */ 943 transport_connection::sptr conn = desc.get_connection(); 944 if ( ! conn ) 945 { 946 cout << "/// MARIO: No connection!!" << endl; // XXX debug 947 return false; 948 } 949 950 // * send over connection * 951 return conn->send(message, priority); 952 } 953 954 void BaseCommunication::send_to_peer( 955 const uint8_t type, 956 const PeerID& peer_id, 957 reboost::message_t message, 958 const EndpointDescriptor& endpoint, 959 const uint8_t priority ) 960 { 961 /* 962 * Create Peer Message: 963 * - Type 964 * - Their PeerID 965 */ 966 // peer id 967 message.push_front(peer_id.serialize()); 968 // type 969 memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t)); 970 971 972 /* send message */ 973 transport->send(endpoint.getEndpoints(), message, priority); 974 } 975 976 977 793 978 794 979 }} // namespace ariba, communication
Note:
See TracChangeset
for help on using the changeset viewer.