Changeset 5284 for source/ariba/communication/BaseCommunication.cpp
- Timestamp:
- Jul 24, 2009, 3:23:11 PM (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/communication/BaseCommunication.cpp
r5151 r5284 52 52 namespace communication { 53 53 54 #include "networkinfo/AddressDiscovery.hpp" 55 54 56 use_logging_cpp(BaseCommunication); 55 const BaseCommunication::LinkDescriptor BaseCommunication::LinkDescriptor::UNSPECIFIED; 56 57 BaseCommunication::BaseCommunication() 58 : messageReceiver(NULL), network(NULL), transport(NULL), basecommStarted(false){ 57 const BaseCommunication::LinkDescriptor 58 BaseCommunication::LinkDescriptor::UNSPECIFIED; 59 60 BaseCommunication::BaseCommunication() { 61 this->transport = NULL; 62 this->started = false; 59 63 } 60 64 … … 62 66 } 63 67 64 void BaseCommunication::start( const NetworkLocator* _locallocator, const uint16_t _listenport){65 68 void BaseCommunication::start() { 69 logging_info( "Starting up ..." ); 66 70 currentSeqnum = 0; 67 listenport = _listenport; 68 69 logging_info( "starting up base communication and creating transports ..." ); 70 logging_info( "using port " << listenport ); 71 72 // creating transports 73 logging_info( "Creating transports ..." ); 71 74 72 75 #ifdef UNDERLAY_OMNET … … 77 80 network = new OmnetNetworkProtocol( module ); 78 81 #else 79 transport = new TCPTransport( listenport ); 80 network = new IPv4NetworkProtocol(); 82 transport = new transport_peer( localDescriptor.getEndpoints() ); 81 83 #endif 82 84 83 logging_debug( "searching for local locators ..." ); 84 85 NetworkProtocol::NetworkLocatorSet locators = network->getAddresses(); 86 NetworkProtocol::NetworkLocatorSet::iterator i = locators.begin(); 87 NetworkProtocol::NetworkLocatorSet::iterator iend = locators.end(); 88 89 // 90 // choose the first locator that is not localhost 91 // 92 93 bool foundLocator = false; 94 95 for( ; i != iend; i++){ 96 logging_debug( "local locator found " << (*i)->toString() ); 97 IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i); 98 99 // TODO: which locators are find to bind to? 100 // localhost is not too bad, works when testing locally 101 // with several instances. the manual override currently 102 // enables to use an arbitrary address, guess this is fine. 103 // so the manual override also can use ANY, LOCALHOST, BROADCAST 104 105 if( *ipv4locator != IPv4Locator::LOCALHOST && 106 *ipv4locator != IPv4Locator::ANY && 107 *ipv4locator != IPv4Locator::BROADCAST ){ 108 109 ipv4locator->setPort(listenport); 110 localDescriptor.locator = ipv4locator; 111 localDescriptor.isUnspec = false; 112 logging_info( "binding to addr = " << ipv4locator->toString() ); 113 foundLocator = true; 114 break; 115 } 116 } // for( ; i != iend; i++) 117 118 119 if( _locallocator != NULL ) { 120 if( localDescriptor.locator != NULL) delete localDescriptor.locator; 121 localDescriptor.locator = new IPv4Locator( IPv4Locator::fromString( _locallocator->toString()) ); 122 localDescriptor.isUnspec = false; 123 logging_debug( "manual locator override, using locator=" << 124 localDescriptor.locator->toString() ); 125 foundLocator = true; 126 } 127 128 // if we found no local locator, exit using logging fatal 129 if( !foundLocator ) 130 logging_fatal( "did not find a useable local locator!" ); 131 132 transport->addMessageReceiver( this ); 85 logging_info( "Searching for local locators ..." ); 86 discoverEndpoints(localDescriptor.getEndpoints()); 87 logging_info( "Done. Local endpoints = " << localDescriptor.toString() ); 88 89 transport->register_listener( this ); 133 90 transport->start(); 134 91 135 92 #ifndef UNDERLAY_OMNET 136 //137 93 // bind to the network change detection 138 //139 140 94 networkMonitor.registerNotification( this ); 141 95 #endif 142 96 143 //144 97 // base comm startup done 145 // 146 147 basecommStarted = true; 148 logging_info( "base communication started up" ); 98 started = true; 99 logging_info( "Started up." ); 149 100 } 150 101 151 102 void BaseCommunication::stop() { 152 153 logging_info( "stopping base communication and transport ..." ); 103 logging_info( "Stopping transports ..." ); 154 104 155 105 transport->stop(); 156 106 delete transport; 157 delete network; 158 159 basecommStarted = false; 160 logging_info( "base communication stopped" ); 107 started = false; 108 109 logging_info( "Stopped." ); 161 110 } 162 111 163 112 bool BaseCommunication::isStarted(){ 164 return basecommStarted; 113 return started; 114 } 115 116 /// Sets the endpoints 117 void BaseCommunication::setEndpoints( string& _endpoints ) { 118 localDescriptor.getEndpoints().assign(_endpoints); 119 logging_info("Setting local end-points: " 120 << localDescriptor.getEndpoints().to_string()); 165 121 } 166 122 … … 175 131 176 132 // debug 177 logging_debug( "request to establish link" ); 178 179 // 180 // just use the first locator in the endp descriptors 181 // 182 if( descriptor.locator == NULL ){ 183 logging_error( "invalid destination endpoint" ); 184 return LinkID::UNSPECIFIED; 185 } 186 187 if( localDescriptor.locator == NULL ){ 188 logging_error( "invalid local endpoint" ); 189 return LinkID::UNSPECIFIED; 190 } 191 192 const NetworkLocator* remote = descriptor.locator; 193 const NetworkLocator* local = localDescriptor.locator; 194 195 // create link identifier and link descriptor 196 if (linkid.isUnspecified()){ 197 linkid = LinkID::create(); 198 assert(!linkid.isUnspecified()); 199 } 200 201 logging_debug( "creating new local descriptor entry with local link id " << linkid.toString() ); 202 LinkDescriptor linkDescriptor( linkid, local, LinkID::UNSPECIFIED, remote, descriptor, false ); 203 addLink( linkDescriptor ); 204 205 // 206 // create a base msg with our link id and 207 // a request to open a link on the other side 208 // 209 210 logging_debug( "sending out base messages with request to open link to " << remote->toString() ); 211 AribaBaseMsg baseMsg( 212 remote, 213 AribaBaseMsg::LINK_STATE_OPEN_REQUEST, 214 linkid, 215 LinkID::UNSPECIFIED ); 216 217 transport->sendMessage(&baseMsg); 133 logging_debug( "Request to establish link" ); 134 135 // create link identifier 136 if (linkid.isUnspecified()) linkid = LinkID::create(); 137 138 // create link descriptor 139 logging_debug( "Creating new descriptor entry with local link id=" << linkid.toString() ); 140 LinkDescriptor* ld = new LinkDescriptor(); 141 ld->localLink = linkid; 142 addLink( ld ); 143 144 // send a message to request new link to remote 145 logging_debug( "Send messages with request to open link to " << descriptor.toString() ); 146 AribaBaseMsg baseMsg( AribaBaseMsg::typeLinkRequest, linkid ); 147 baseMsg.getLocalDescriptor() = localDescriptor; 148 149 // serialize and send message 150 send( &baseMsg, descriptor ); 151 218 152 return linkid; 219 153 } … … 221 155 void BaseCommunication::dropLink(const LinkID link) { 222 156 223 logging_debug( " starting to drop link " + link.toString() );157 logging_debug( "Starting to drop link " + link.toString() ); 224 158 225 159 // see if we have the link 226 LinkDescriptor& descriptor= queryLocalLink( link );227 if( descriptor.isUnspecified() ){228 logging_error( " don't know the link you want to drop "+ link.toString() );160 LinkDescriptor& ld = queryLocalLink( link ); 161 if( ld.isUnspecified() ) { 162 logging_error( "Don't know the link you want to drop "+ link.toString() ); 229 163 return; 230 164 } 231 165 166 // tell the registered listeners 167 BOOST_FOREACH( CommunicationEvents* i, eventListener ) { 168 i->onLinkDown( link, ld.localLocator, ld.remoteLocator ); 169 } 170 232 171 // create message to drop the link 233 logging_debug( "sending out link close request. for us, the link is closed now" ); 234 AribaBaseMsg msg( 235 descriptor.remoteLocator, 236 AribaBaseMsg::LINK_STATE_CLOSE_REQUEST, 237 descriptor.localLink, 238 descriptor.remoteLink 239 ); 172 logging_debug( "Sending out link close request. for us, the link is closed now" ); 173 AribaBaseMsg msg( AribaBaseMsg::typeLinkClose, ld.localLink, ld.remoteLink ); 240 174 241 175 // send message to drop the link 242 transport->sendMessage( &msg ); 243 244 // tell the registered listeners 245 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 246 i->onLinkDown( link, descriptor.localLocator, descriptor.remoteLocator ); 247 } 176 send( &msg, ld ); 248 177 249 178 // remove from map … … 253 182 seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) { 254 183 255 logging_debug( " sending out message to link " << lid.toString() );184 logging_debug( "Sending out message to link " << lid.toString() ); 256 185 257 186 // query local link info 258 LinkDescriptor& l inkDesc= queryLocalLink(lid);259 if( l inkDesc.isUnspecified() ){260 logging_error( " don't know the link with id " << lid.toString() );187 LinkDescriptor& ld = queryLocalLink(lid); 188 if( ld.isUnspecified() ){ 189 logging_error( "Don't know the link with id " << lid.toString() ); 261 190 return -1; 262 191 } 263 192 193 // link not up-> error 194 if( !ld.up ) { 195 logging_error("Can not send on link " << lid.toString() << ": link not up"); 196 return -1; 197 } 198 264 199 // create message 265 AribaBaseMsg msg( 266 linkDesc.remoteLocator, 267 AribaBaseMsg::LINK_STATE_DATA, 268 linkDesc.localLink, 269 linkDesc.remoteLink 270 ); 200 AribaBaseMsg msg( AribaBaseMsg::typeData, ld.localLink, ld.remoteLink ); 271 201 272 202 // encapsulate the payload message 273 203 msg.encapsulate( const_cast<Message*>(message) ); 274 204 275 if( !linkDesc.linkup ){276 logging_error("cant send message on link " << lid.toString() << ", link not up");277 return -1;278 }279 280 205 // send message 281 transport->sendMessage( &msg ); 206 send( &msg, ld ); 207 208 // return sequence number 282 209 return ++currentSeqnum; 283 210 } 284 211 285 212 const EndpointDescriptor& BaseCommunication::getEndpointDescriptor(const LinkID link) const { 286 287 213 if( link == LinkID::UNSPECIFIED){ 288 214 return localDescriptor; … … 294 220 } 295 221 296 void BaseCommunication::registerMessageReceiver(MessageReceiver* _receiver) {297 messageReceiver = _receiver;298 }299 300 void BaseCommunication::unregisterMessageReceiver(MessageReceiver* _receiver) {301 messageReceiver = NULL;302 }303 304 222 void BaseCommunication::registerEventListener(CommunicationEvents* _events){ 305 306 223 if( eventListener.find( _events ) == eventListener.end() ) 307 224 eventListener.insert( _events ); … … 309 226 310 227 void BaseCommunication::unregisterEventListener(CommunicationEvents* _events){ 311 312 228 EventListenerSet::iterator i = eventListener.find( _events ); 313 229 if( i != eventListener.end() ) … … 315 231 } 316 232 317 318 bool BaseCommunication::receiveMessage(const Message* message, const LinkID& /*invalid*/, const NodeID& ){ 319 320 // 321 // these messages arrive from the Transport module 322 // and are incoming network messages. Unpack the 323 // AribaBaseMsg and handle control packets, 324 // deliver data packets to the overlay 325 // 326 327 AribaBaseMsg* spovmsg = ((Message*)message)->decapsulate<AribaBaseMsg>(); 328 logging_debug( "receiving base comm message of type " << spovmsg->getTypeString() ); 329 330 // 331 // deliver data to the overlays. we just give the 332 // inner packet to every registered overlay ... 333 // 334 335 if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_DATA ){ 336 337 logging_debug( "received data message, forwarding to overlay" ); 338 339 // 340 // put the linkid as address into the message 341 // and sent it to the receiver 342 // 343 344 if( messageReceiver != NULL ) { 345 messageReceiver->receiveMessage( 346 spovmsg, 347 spovmsg->getRemoteLink(), 348 NodeID::UNSPECIFIED 233 SystemEventType TransportEvent("Transport"); 234 SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent ); 235 236 class DispatchMsg { 237 public: 238 address_v* local; 239 address_v* remote; 240 Message* message; 241 }; 242 243 /// called when a system event is emitted by system queue 244 void BaseCommunication::handleSystemEvent(const SystemEvent& event) { 245 246 // dispatch received messages 247 if ( event.getType() == MessageDispatchEvent ){ 248 logging_debug( "Forwarding message receiver" ); 249 DispatchMsg* dmsg = event.getData<DispatchMsg>(); 250 Message* msg = dmsg->message; 251 receiveMessage(msg, dmsg->local, dmsg->remote); 252 msg->dropPayload(); 253 delete dmsg; 254 delete msg; 255 } 256 } 257 258 /// called when a message is received form transport_peer 259 void BaseCommunication::receive_message(transport_protocol* transport, 260 const address_vf local, const address_vf remote, const uint8_t* data, 261 size_t size) { 262 263 // logging_debug( "Dispatching message" ); 264 265 // convert data 266 Data data_( const_cast<uint8_t*>(data), size * 8 ); 267 DispatchMsg* dmsg = new DispatchMsg(); 268 269 Message* msg = new Message(data_); 270 dmsg->local = local->clone(); 271 dmsg->remote = remote->clone(); 272 dmsg->message = msg; 273 274 SystemQueue::instance().scheduleEvent( 275 SystemEvent( this, MessageDispatchEvent, dmsg ) 276 ); 277 } 278 279 /// handles a message from the underlay transport 280 void BaseCommunication::receiveMessage(const Message* message, 281 const address_v* local, const address_v* remote ){ 282 283 /// decapsulate message 284 AribaBaseMsg* msg = ((Message*)message)->decapsulate<AribaBaseMsg>(); 285 logging_debug( "Receiving message of type " << msg->getTypeString() ); 286 287 // handle message 288 switch (msg->getType()) { 289 290 // --------------------------------------------------------------------- 291 // data message 292 // --------------------------------------------------------------------- 293 case AribaBaseMsg::typeData: { 294 logging_debug( "Received data message, forwarding to overlay" ); 295 if( messageReceiver != NULL ) { 296 messageReceiver->receiveMessage( 297 msg, msg->getRemoteLink(), NodeID::UNSPECIFIED 298 ); 299 } 300 break; 301 } 302 303 // --------------------------------------------------------------------- 304 // handle link request from remote 305 // --------------------------------------------------------------------- 306 case AribaBaseMsg::typeLinkRequest: { 307 logging_debug( "Received link open request" ); 308 309 /// only answer the first request 310 if (!queryRemoteLink(msg->getLocalLink()).isUnspecified()) { 311 logging_debug("Link request already received. Ignore!"); 312 break; 313 } 314 315 /// create link ids 316 LinkID localLink = LinkID::create(); 317 LinkID remoteLink = msg->getLocalLink(); 318 logging_debug( "local=" << local->to_string() 319 << " remote=" << remote->to_string() 349 320 ); 321 322 // check if link creation is allowed by ALL listeners 323 bool allowlink = true; 324 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 325 allowlink &= i->onLinkRequest( localLink, local, remote ); 326 } 327 328 // not allowed-> warn 329 if( !allowlink ){ 330 logging_warn( "Overlay denied creation of link" ); 331 return; 332 } 333 334 // create descriptor 335 LinkDescriptor* ld = new LinkDescriptor(); 336 ld->localLink = localLink; 337 ld->remoteLink = remoteLink; 338 ld->localLocator = local->clone(); 339 ld->remoteLocator = remote->clone(); 340 ld->remoteEndpoint = msg->getLocalDescriptor(); 341 342 // add layer 1-3 addresses 343 ld->remoteEndpoint.getEndpoints().add( 344 ld->remoteLocator, endpoint_set::Layer1_3); 345 localDescriptor.getEndpoints().add( 346 local, endpoint_set::Layer1_3 347 ); 348 349 // link is now up-> add it 350 ld->up = true; 351 addLink(ld); 352 353 // link is up! 354 logging_debug( "Link (initiated from remote) is up with " 355 << "local(id=" << ld->localLink.toString() << "," 356 << "locator=" << ld->localLocator->to_string() << ") " 357 << "remote(id=" << ld->remoteLink.toString() << ", " 358 << "locator=" << ld->remoteLocator->to_string() << ")" 359 ); 360 361 // sending link request reply 362 logging_debug( "Sending link request reply with ids " 363 << "local=" << localLink.toString() << ", " 364 << "remote=" << remoteLink.toString() ); 365 AribaBaseMsg reply( AribaBaseMsg::typeLinkReply, localLink, remoteLink ); 366 reply.getLocalDescriptor() = localDescriptor; 367 reply.getRemoteDescriptor() = ld->remoteEndpoint; 368 369 send( &reply, *ld ); 370 371 // inform listeners about new open link 372 BOOST_FOREACH( CommunicationEvents* i, eventListener ) { 373 i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator); 374 } 375 376 // done 377 break; 350 378 } 351 379 352 } // LINK_STATE_DATA 353 354 // 355 // handle link open requests 356 // 357 358 else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_OPEN_REQUEST ){ 359 360 logging_debug( "received link open request" ); 361 362 // 363 // create a link context 364 // 365 366 // in an incoming packet the localLink is from 367 // the sender perspective local and from our 368 // perspective remote 369 370 logging_debug( "creating local link" ); 371 372 LinkID localLink = LinkID::create(); 373 LinkID remoteLink = spovmsg->getLocalLink(); 374 375 if(localLink.isUnspecified()){ 376 logging_error("local link is unspecified"); 377 return false; 380 // --------------------------------------------------------------------- 381 // handle link request reply 382 // --------------------------------------------------------------------- 383 case AribaBaseMsg::typeLinkReply: { 384 logging_debug( "Received link open reply for a link we initiated" ); 385 386 // this is a reply to a link open request, so we have already 387 // a link mapping and can now set the remote link to valid 388 LinkDescriptor& ld = queryLocalLink( msg->getRemoteLink() ); 389 390 // no link found-> warn! 391 if (ld.isUnspecified()) { 392 logging_warn("Failed to find local link " << msg->getRemoteLink().toString()); 393 return; 394 } 395 396 // set remote locator and link id 397 ld.remoteLink = msg->getLocalLink(); 398 ld.remoteLocator = remote->clone(); 399 localDescriptor.getEndpoints().add( 400 msg->getRemoteDescriptor().getEndpoints(), 401 endpoint_set::Layer1_3 402 ); 403 ld.up = true; 404 405 logging_debug( "Link is now up with local id " 406 << ld.localLink.toString() << " and remote id " 407 << ld.remoteLink.toString() ); 408 409 410 // inform lisneters about link up event 411 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 412 i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator ); 413 } 414 415 // done 416 break; 378 417 } 379 418 380 if(remoteLink.isUnspecified()){ 381 logging_error("remote link is unspecified"); 382 return false; 419 // --------------------------------------------------------------------- 420 // handle link close requests 421 // --------------------------------------------------------------------- 422 case AribaBaseMsg::typeLinkClose: { 423 // get remote link 424 const LinkID& localLink = msg->getRemoteLink(); 425 logging_debug( "Received link close request for link " << localLink.toString() ); 426 427 // searching for link, not found-> warn 428 LinkDescriptor& linkDesc = queryLocalLink( localLink ); 429 if (linkDesc.isUnspecified()) { 430 logging_warn("Failed to find local link " << localLink.toString()); 431 return; 432 } 433 434 // inform listeners 435 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 436 i->onLinkDown( linkDesc.localLink, 437 linkDesc.localLocator, linkDesc.remoteLocator ); 438 } 439 440 // remove the link descriptor 441 removeLink( localLink ); 442 443 // done 444 break; 383 445 } 384 446 385 const NetworkLocator* localLocator = dynamic_cast<const NetworkLocator*>(localDescriptor.locator); 386 const NetworkLocator* remoteLocator = dynamic_cast<const NetworkLocator*>(message->getSourceAddress()); 387 388 logging_debug( "localLocator=" << localLocator->toString() 389 << " remoteLocator=" << remoteLocator->toString()); 390 391 // ask the registered listeners if this link 392 // creation is fine. we will only allow the 393 // link if all of them agree 394 395 bool allowlink = true; 396 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 397 allowlink &= i->onLinkRequest( localLink, localLocator, remoteLocator ); 447 // --------------------------------------------------------------------- 448 // handle link locator changes 449 // --------------------------------------------------------------------- 450 case AribaBaseMsg::typeLinkUpdate: { 451 const LinkID& localLink = msg->getRemoteLink(); 452 logging_debug( "Received link update for link " 453 << localLink.toString() ); 454 455 // find the link description 456 LinkDescriptor& linkDesc = queryLocalLink( localLink ); 457 if (linkDesc.isUnspecified()) { 458 logging_warn("Failed to update local link " 459 << localLink.toString()); 460 return; 461 } 462 463 // update the remote locator 464 const address_v* oldremote = linkDesc.remoteLocator; 465 linkDesc.remoteLocator = remote->clone(); 466 467 // inform the listeners (local link has _not_ changed!) 468 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 469 i->onLinkChanged( 470 linkDesc.localLink, // linkid 471 linkDesc.localLocator, // old local 472 linkDesc.localLocator, // new local 473 oldremote, // old remote 474 linkDesc.remoteLocator // new remote 475 ); 476 } 477 478 // done 479 break; 398 480 } 399 400 if( !allowlink ){ 401 logging_warn( "overlay denied creation of link" ); 402 return true; 403 } 404 405 // 406 // create and save the descriptor for the link 407 // 408 409 LinkDescriptor linkDescriptor(localLink, localLocator, remoteLink, 410 remoteLocator, EndpointDescriptor(remoteLocator), true); 411 412 logging_debug( "saving new link descriptor with " << 413 "[local link " << localLink.toString() << "] " << 414 "[local locator " << localLocator->toString() << "] " << 415 "[remote link " << remoteLink.toString() << "] " << 416 "[remote locator " << remoteLocator->toString() << "]" << 417 "[link up true]" ); 418 419 addLink( linkDescriptor ); 420 421 // 422 // send out a link reply 423 // 424 425 logging_debug( "sending back link open reply for " << 426 "[local link " << localLink.toString() << "] " << 427 "[remote link " << remoteLink.toString() << "]" ); 428 429 AribaBaseMsg reply(remoteLocator, 430 AribaBaseMsg::LINK_STATE_OPEN_REPLY, 431 localLink, 432 remoteLink); 433 434 transport->sendMessage( &reply ); 435 436 // 437 // the link is now open 438 // 439 440 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 441 i->onLinkUp( localLink, localLocator, remoteLocator ); 442 } 443 444 } // LINK_STATE_OPEN_REQUEST 445 446 // 447 // handle link open replies 448 // 449 450 else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_OPEN_REPLY ){ 451 452 logging_debug( "received link open reply for a link we initiated" ); 453 454 // this is a reply to a link open request, so we have already 455 // a link mapping and can now set the remote link to valid 456 LinkDescriptor& linkDesc = queryLocalLink( spovmsg->getRemoteLink() ); 457 458 if (linkDesc.isUnspecified()) { 459 logging_warn("failed to find local link " << spovmsg->getRemoteLink().toString()); 460 return false; 461 } 462 463 linkDesc.remoteLink = spovmsg->getLocalLink(); 464 linkDesc.linkup = true; 465 466 logging_debug( "the link is now up with local link id " << linkDesc.localLink.toString() << 467 " and remote link id " << linkDesc.remoteLink.toString() ); 468 469 // notify the baseoverlay that the link is up, so 470 // it can exchange nodeids over this link. then we 471 // can send the queued messages, as both nodes have 472 // to know their nodeids first 473 474 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 475 i->onLinkUp( linkDesc.localLink, linkDesc.localLocator, linkDesc.remoteLocator ); 476 } 477 478 } // LINK_STATE_OPEN_REPLY 479 480 // 481 // handle link close requests 482 // 483 484 else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_CLOSE_REQUEST ){ 485 486 const LinkID& localLink = spovmsg->getRemoteLink(); 487 logging_debug( "received link close request for link " << localLink.toString() ); 488 489 // 490 // the link is closed immediately, we 491 // don't need to send out a reply, so we 492 // delete the mapping and inform 493 // 494 495 LinkDescriptor& linkDesc = queryLocalLink( localLink ); 496 if (linkDesc.isUnspecified()) { 497 logging_warn("Failed to find local link " << localLink.toString()); 498 return false; 499 } 500 501 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 502 i->onLinkDown( linkDesc.localLink, linkDesc.localLocator, linkDesc.remoteLocator ); 503 } 504 505 // 506 // remove the link descriptor 507 // 508 509 removeLink( localLink ); 510 511 } // LINK_STATE_CLOSE_REQUEST 512 513 // 514 // handle locator updates 515 // 516 517 else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_UPDATE ){ 518 519 const LinkID& localLink = spovmsg->getRemoteLink(); 520 logging_debug( "received link update for link " << localLink.toString() ); 521 522 // 523 // find the link description 524 // 525 526 LinkDescriptor& linkDesc = queryLocalLink( localLink ); 527 if (linkDesc.isUnspecified()) { 528 logging_warn("Failed to update local link " << localLink.toString()); 529 return false; 530 } 531 532 // 533 // update the remote locator 534 // 535 536 const NetworkLocator* oldremote = linkDesc.remoteLocator; 537 linkDesc.remoteLocator = dynamic_cast<const NetworkLocator*>(message->getSourceAddress()); 538 539 // 540 // inform the listeners (local link has _not_ changed!) 541 // 542 543 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 544 i->onLinkChanged( 545 linkDesc.localLink, // linkid 546 linkDesc.localLocator, // old local 547 linkDesc.localLocator, // new local 548 oldremote, // old remote 549 linkDesc.remoteLocator // new remote 550 ); 551 } 552 553 } // LINK_STATE_UPDATE 554 555 return true; 556 } 557 558 void BaseCommunication::addLink( const LinkDescriptor& link ) { 481 } 482 } 483 484 /// add a newly allocated link to the set of links 485 void BaseCommunication::addLink( LinkDescriptor* link ) { 559 486 linkSet.push_back( link ); 560 487 } 561 488 489 /// remove a link from set 562 490 void BaseCommunication::removeLink( const LinkID& localLink ) { 563 564 LinkSet::iterator i = linkSet.begin(); 565 LinkSet::iterator iend = linkSet.end(); 566 567 for( ; i != iend; i++){ 568 if( (*i).localLink != localLink) continue; 569 491 for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){ 492 if( (*i)->localLink != localLink) continue; 493 delete *i; 570 494 linkSet.erase( i ); 571 495 break; … … 573 497 } 574 498 499 /// query a descriptor by local link id 575 500 BaseCommunication::LinkDescriptor& BaseCommunication::queryLocalLink( const LinkID& link ) const { 576 501 for (int i=0; i<linkSet.size();i++) 577 if (linkSet[i] .localLink == link) return (LinkDescriptor&)linkSet[i];502 if (linkSet[i]->localLink == link) return (LinkDescriptor&)*linkSet[i]; 578 503 return (LinkDescriptor&)LinkDescriptor::UNSPECIFIED; 579 504 } 580 505 506 /// query a descriptor by remote link id 581 507 BaseCommunication::LinkDescriptor& BaseCommunication::queryRemoteLink( const LinkID& link ) const { 582 508 for (int i=0; i<linkSet.size();i++) 583 if (linkSet[i] .remoteLink == link) return (LinkDescriptor&)linkSet[i];509 if (linkSet[i]->remoteLink == link) return (LinkDescriptor&)*linkSet[i]; 584 510 return (LinkDescriptor&)LinkDescriptor::UNSPECIFIED; 585 511 } 586 512 587 LinkIDs BaseCommunication::getLocalLinks( const EndpointDescriptor& ep) const {513 LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const { 588 514 LinkIDs ids; 589 590 515 for (int i=0; i<linkSet.size(); i++){ 591 if( ep == EndpointDescriptor::UNSPECIFIED){592 ids.push_back( linkSet[i] .localLink );516 if( addr == NULL ){ 517 ids.push_back( linkSet[i]->localLink ); 593 518 } else { 594 if ( linkSet[i].remoteLocator == ep.locator )595 ids.push_back( linkSet[i] .localLink );519 if ( *linkSet[i]->remoteLocator == *addr ) 520 ids.push_back( linkSet[i]->localLink ); 596 521 } 597 522 } 598 599 523 return ids; 600 524 } … … 609 533 #endif // UNDERLAY_OMNET 610 534 611 // 535 /*- disabled! 536 612 537 // we only care about address changes, not about interface changes 613 538 // as address changes are triggered by interface changes, we are safe here 614 //615 616 539 if( info.type != NetworkChangeInterface::EventTypeAddressNew && 617 540 info.type != NetworkChangeInterface::EventTypeAddressDelete ) return; … … 619 542 logging_info( "base communication is handling network address changes" ); 620 543 621 //622 544 // get all now available addresses 623 //624 625 545 NetworkInformation networkInformation; 626 546 AddressInformation addressInformation; … … 745 665 transport->sendMessage( &updateMsg ); 746 666 } 667 */ 668 } 669 670 /// sends a message to all end-points in the end-point descriptor 671 void BaseCommunication::send(Message* message, const EndpointDescriptor& endpoint) { 672 Data data = data_serialize( message, DEFAULT_V ); 673 transport->send( endpoint.getEndpoints(), data.getBuffer(), data.getLength() / 8); 674 } 675 676 /// sends a message to the remote locator inside the link descriptor 677 void BaseCommunication::send(Message* message, const LinkDescriptor& desc) { 678 Data data = data_serialize( message, DEFAULT_V ); 679 transport->send( desc.remoteLocator, data.getBuffer(), data.getLength() / 8); 747 680 } 748 681
Note:
See TracChangeset
for help on using the changeset viewer.