00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "BaseCommunication.h"
00040
00041 #include "networkinfo/AddressDiscovery.h"
00042 #include "ariba/utility/types/PeerID.h"
00043
00044 #ifdef UNDERLAY_OMNET
00045 #include "ariba/communication/modules/transport/omnet/AribaOmnetModule.h"
00046 #include "ariba/communication/modules/network/omnet/OmnetNetworkProtocol.h"
00047 #include "ariba/utility/system/StartupWrapper.h"
00048
00049 using ariba::communication::AribaOmnetModule;
00050 using ariba::communication::OmnetNetworkProtocol;
00051 using ariba::utility::StartupWrapper;
00052 #endif
00053
00054 namespace ariba {
00055 namespace communication {
00056
00057 using ariba::utility::PeerID;
00058
00059 use_logging_cpp(BaseCommunication);
00060
00062 void BaseCommunication::add_endpoint( const address_v* endpoint ) {
00063 if (endpoint==NULL) return;
00064 BOOST_FOREACH( endpoint_reference& ref, remote_endpoints ) {
00065 if (ref.endpoint->type_id() == endpoint->type_id() && *ref.endpoint == *endpoint) {
00066 ref.count++;
00067 return;
00068 }
00069 }
00070 endpoint_reference ref;
00071 ref.endpoint = endpoint->clone();
00072 ref.count = 1;
00073 remote_endpoints.push_back(ref);
00074 }
00075
00077 void BaseCommunication::remove_endpoint( const address_v* endpoint ) {
00078 if (endpoint==NULL) return;
00079 for (vector<endpoint_reference>::iterator i = remote_endpoints.begin();
00080 i != remote_endpoints.end(); i++) {
00081 if ((*i->endpoint).type_id() == endpoint->type_id() && (*i->endpoint) == *endpoint) {
00082 i->count--;
00083 if (i->count==0) {
00084 logging_info("No more links to " << i->endpoint->to_string() << ": terminating transports!");
00085 transport->terminate(i->endpoint);
00086 delete i->endpoint;
00087 remote_endpoints.erase(i);
00088 }
00089 return;
00090 }
00091 }
00092 }
00093
00094 BaseCommunication::BaseCommunication() {
00095 this->transport = NULL;
00096 this->started = false;
00097 }
00098
00099 BaseCommunication::~BaseCommunication(){
00100 }
00101
00102 void BaseCommunication::start() {
00103 logging_info( "Starting up ..." );
00104 currentSeqnum = 0;
00105
00106
00107 localDescriptor.getPeerId() = PeerID::random();
00108 logging_info( "Using PeerID: " << localDescriptor.getPeerId() );
00109
00110
00111 logging_info( "Creating transports ..." );
00112
00113 #ifdef UNDERLAY_OMNET
00114 AribaOmnetModule* module = StartupWrapper::getCurrentModule();
00115 module->setServerPort( listenport );
00116
00117 transport = module;
00118 network = new OmnetNetworkProtocol( module );
00119 #else
00120 transport = new transport_peer( localDescriptor.getEndpoints() );
00121 #endif
00122
00123 logging_info( "Searching for local locators ..." );
00128 AddressDiscovery::discover_endpoints( localDescriptor.getEndpoints() );
00129 logging_info( "Done. Local endpoints = " << localDescriptor.toString() );
00130
00131 transport->register_listener( this );
00132 transport->start();
00133
00134 #ifndef UNDERLAY_OMNET
00135
00136 networkMonitor.registerNotification( this );
00137 #endif
00138
00139
00140 started = true;
00141 logging_info( "Started up." );
00142 }
00143
00144 void BaseCommunication::stop() {
00145 logging_info( "Stopping transports ..." );
00146
00147 transport->stop();
00148 delete transport;
00149 started = false;
00150
00151 logging_info( "Stopped." );
00152 }
00153
00154 bool BaseCommunication::isStarted(){
00155 return started;
00156 }
00157
00159 void BaseCommunication::setEndpoints( string& _endpoints ) {
00160 localDescriptor.getEndpoints().assign(_endpoints);
00161 logging_info("Setting local end-points: "
00162 << localDescriptor.getEndpoints().to_string());
00163 }
00164
00165 const LinkID BaseCommunication::establishLink(
00166 const EndpointDescriptor& descriptor,
00167 const LinkID& link_id,
00168 const QoSParameterSet& qos,
00169 const SecurityParameterSet& sec) {
00170
00171
00172 LinkID linkid = link_id;
00173
00174
00175 logging_debug( "Request to establish link" );
00176
00177
00178 if (linkid.isUnspecified()) linkid = LinkID::create();
00179
00180
00181 logging_debug( "Creating new descriptor entry with local link id=" << linkid.toString() );
00182 LinkDescriptor* ld = new LinkDescriptor();
00183 ld->localLink = linkid;
00184 addLink( ld );
00185
00186
00187 logging_debug( "Send messages with request to open link to " << descriptor.toString() );
00188 AribaBaseMsg baseMsg( AribaBaseMsg::typeLinkRequest, linkid );
00189 baseMsg.getLocalDescriptor() = localDescriptor;
00190 baseMsg.getRemoteDescriptor().getPeerId() = descriptor.getPeerId();
00191
00192
00193 send( &baseMsg, descriptor );
00194
00195 return linkid;
00196 }
00197
00198 void BaseCommunication::dropLink(const LinkID link) {
00199
00200 logging_debug( "Starting to drop link " + link.toString() );
00201
00202
00203 LinkDescriptor& ld = queryLocalLink( link );
00204 if( ld.isUnspecified() ) {
00205 logging_error( "Don't know the link you want to drop "+ link.toString() );
00206 return;
00207 }
00208
00209
00210 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
00211 i->onLinkDown( link, ld.localLocator, ld.remoteLocator );
00212 }
00213
00214
00215 logging_debug( "Sending out link close request. for us, the link is closed now" );
00216 AribaBaseMsg msg( AribaBaseMsg::typeLinkClose, ld.localLink, ld.remoteLink );
00217
00218
00219 send( &msg, ld );
00220
00221
00222 removeLink(link);
00223 }
00224
00225 seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) {
00226
00227 logging_debug( "Sending out message to link " << lid.toString() );
00228
00229
00230 LinkDescriptor& ld = queryLocalLink(lid);
00231 if( ld.isUnspecified() ){
00232 logging_error( "Don't know the link with id " << lid.toString() );
00233 return -1;
00234 }
00235
00236
00237 if( !ld.up ) {
00238 logging_error("Can not send on link " << lid.toString() << ": link not up");
00239 return -1;
00240 }
00241
00242
00243 AribaBaseMsg msg( AribaBaseMsg::typeData, ld.localLink, ld.remoteLink );
00244
00245
00246 msg.encapsulate( const_cast<Message*>(message) );
00247
00248
00249 send( &msg, ld );
00250
00251
00252 return ++currentSeqnum;
00253 }
00254
00255 const EndpointDescriptor& BaseCommunication::getEndpointDescriptor(const LinkID link) const {
00256 if( link.isUnspecified() ){
00257 return localDescriptor;
00258 } else {
00259 LinkDescriptor& linkDesc = queryLocalLink(link);
00260 if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
00261 return linkDesc.remoteEndpoint;
00262 }
00263 }
00264
00265 void BaseCommunication::registerEventListener(CommunicationEvents* _events){
00266 if( eventListener.find( _events ) == eventListener.end() )
00267 eventListener.insert( _events );
00268 }
00269
00270 void BaseCommunication::unregisterEventListener(CommunicationEvents* _events){
00271 EventListenerSet::iterator i = eventListener.find( _events );
00272 if( i != eventListener.end() )
00273 eventListener.erase( i );
00274 }
00275
00276 SystemEventType TransportEvent("Transport");
00277 SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent );
00278
00279 class DispatchMsg {
00280 public:
00281 address_v* local;
00282 address_v* remote;
00283 Message* message;
00284 };
00285
00287 void BaseCommunication::handleSystemEvent(const SystemEvent& event) {
00288
00289
00290 if ( event.getType() == MessageDispatchEvent ){
00291 logging_debug( "Forwarding message receiver" );
00292 DispatchMsg* dmsg = event.getData<DispatchMsg>();
00293 Message* msg = dmsg->message;
00294 receiveMessage(msg, dmsg->local, dmsg->remote);
00295 msg->dropPayload();
00296 delete dmsg;
00297 delete msg;
00298 }
00299 }
00300
00302 void BaseCommunication::receive_message(transport_protocol* transport,
00303 const address_vf local, const address_vf remote, const uint8_t* data,
00304 size_t size) {
00305
00306
00307
00308
00309 Data data_( const_cast<uint8_t*>(data), size * 8 );
00310 DispatchMsg* dmsg = new DispatchMsg();
00311
00312 Message* msg = new Message(data_);
00313 dmsg->local = local->clone();
00314 dmsg->remote = remote->clone();
00315 dmsg->message = msg;
00316
00317 SystemQueue::instance().scheduleEvent(
00318 SystemEvent( this, MessageDispatchEvent, dmsg )
00319 );
00320 }
00321
00323 void BaseCommunication::receiveMessage(const Message* message,
00324 const address_v* local, const address_v* remote ){
00325
00327 AribaBaseMsg* msg = ((Message*)message)->decapsulate<AribaBaseMsg>();
00328 logging_debug( "Receiving message of type " << msg->getTypeString() );
00329
00330
00331 switch (msg->getType()) {
00332
00333
00334
00335
00336 case AribaBaseMsg::typeData: {
00337 logging_debug( "Received data message, forwarding to overlay" );
00338 if( messageReceiver != NULL ) {
00339 messageReceiver->receiveMessage(
00340 msg, msg->getRemoteLink(), NodeID::UNSPECIFIED
00341 );
00342 }
00343 break;
00344 }
00345
00346
00347
00348
00349 case AribaBaseMsg::typeLinkRequest: {
00350 logging_debug( "Received link open request" );
00351
00353 if (!msg->getRemoteDescriptor().getPeerId().isUnspecified()
00354 && msg->getRemoteDescriptor().getPeerId() != localDescriptor.getPeerId()) {
00355 logging_info("Received link request for "
00356 << msg->getRemoteDescriptor().getPeerId().toString()
00357 << "but i'm "
00358 << localDescriptor.getPeerId()
00359 << ": Ignoring!");
00360 break;
00361 }
00362
00364 if (!queryRemoteLink(msg->getLocalLink()).isUnspecified()) {
00365 logging_debug("Link request already received. Ignore!");
00366 break;
00367 }
00368
00370 LinkID localLink = LinkID::create();
00371 LinkID remoteLink = msg->getLocalLink();
00372 logging_debug( "local=" << local->to_string()
00373 << " remote=" << remote->to_string()
00374 );
00375
00376
00377 bool allowlink = true;
00378 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00379 allowlink &= i->onLinkRequest( localLink, local, remote );
00380 }
00381
00382
00383 if( !allowlink ){
00384 logging_warn( "Overlay denied creation of link" );
00385 return;
00386 }
00387
00388
00389 LinkDescriptor* ld = new LinkDescriptor();
00390 ld->localLink = localLink;
00391 ld->remoteLink = remoteLink;
00392 ld->localLocator = local->clone();
00393 ld->remoteLocator = remote->clone();
00394 ld->remoteEndpoint = msg->getLocalDescriptor();
00395 add_endpoint(ld->remoteLocator);
00396
00397
00398 ld->remoteEndpoint.getEndpoints().add(
00399 ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
00400 localDescriptor.getEndpoints().add(
00401 local, endpoint_set::Layer1_3 | endpoint_set::NoLoopback
00402 );
00403
00404
00405 ld->up = true;
00406 addLink(ld);
00407
00408
00409 logging_debug( "Link (initiated from remote) is up with "
00410 << "local(id=" << ld->localLink.toString() << ","
00411 << "locator=" << ld->localLocator->to_string() << ") "
00412 << "remote(id=" << ld->remoteLink.toString() << ", "
00413 << "locator=" << ld->remoteLocator->to_string() << ")"
00414 );
00415
00416
00417 logging_debug( "Sending link request reply with ids "
00418 << "local=" << localLink.toString() << ", "
00419 << "remote=" << remoteLink.toString() );
00420 AribaBaseMsg reply( AribaBaseMsg::typeLinkReply, localLink, remoteLink );
00421 reply.getLocalDescriptor() = localDescriptor;
00422 reply.getRemoteDescriptor() = ld->remoteEndpoint;
00423
00424 send( &reply, *ld );
00425
00426
00427 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
00428 i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator);
00429 }
00430
00431
00432 break;
00433 }
00434
00435
00436
00437
00438 case AribaBaseMsg::typeLinkReply: {
00439 logging_debug( "Received link open reply for a link we initiated" );
00440
00441
00442
00443 LinkDescriptor& ld = queryLocalLink( msg->getRemoteLink() );
00444
00445
00446 if (ld.isUnspecified()) {
00447 logging_warn("Failed to find local link " << msg->getRemoteLink().toString());
00448 return;
00449 }
00450
00451
00452 ld.remoteLink = msg->getLocalLink();
00453 ld.remoteLocator = remote->clone();
00454 ld.remoteEndpoint.getEndpoints().add(
00455 msg->getLocalDescriptor().getEndpoints(),
00456 endpoint_set::Layer1_4
00457 );
00458
00459
00460 localDescriptor.getEndpoints().add(
00461 msg->getRemoteDescriptor().getEndpoints(),
00462 endpoint_set::Layer1_3
00463 );
00464 ld.up = true;
00465 add_endpoint(ld.remoteLocator);
00466
00467 logging_debug( "Link is now up with local id "
00468 << ld.localLink.toString() << " and remote id "
00469 << ld.remoteLink.toString() );
00470
00471
00472
00473 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00474 i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator );
00475 }
00476
00477
00478 break;
00479 }
00480
00481
00482
00483
00484 case AribaBaseMsg::typeLinkClose: {
00485
00486 const LinkID& localLink = msg->getRemoteLink();
00487 logging_debug( "Received link close request for link " << localLink.toString() );
00488
00489
00490 LinkDescriptor& linkDesc = queryLocalLink( localLink );
00491 if (linkDesc.isUnspecified()) {
00492 logging_warn("Failed to find local link " << localLink.toString());
00493 return;
00494 }
00495
00496
00497 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00498 i->onLinkDown( linkDesc.localLink,
00499 linkDesc.localLocator, linkDesc.remoteLocator );
00500 }
00501
00502
00503 removeLink( localLink );
00504
00505
00506 break;
00507 }
00508
00509
00510
00511
00512 case AribaBaseMsg::typeLinkUpdate: {
00513 const LinkID& localLink = msg->getRemoteLink();
00514 logging_debug( "Received link update for link "
00515 << localLink.toString() );
00516
00517
00518 LinkDescriptor& linkDesc = queryLocalLink( localLink );
00519 if (linkDesc.isUnspecified()) {
00520 logging_warn("Failed to update local link "
00521 << localLink.toString());
00522 return;
00523 }
00524
00525
00526 const address_v* oldremote = linkDesc.remoteLocator;
00527 linkDesc.remoteLocator = remote->clone();
00528
00529
00530 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00531 i->onLinkChanged(
00532 linkDesc.localLink,
00533 linkDesc.localLocator,
00534 linkDesc.localLocator,
00535 oldremote,
00536 linkDesc.remoteLocator
00537 );
00538 }
00539
00540
00541 break;
00542 }
00543 }
00544 delete msg;
00545 }
00546
00548 void BaseCommunication::addLink( LinkDescriptor* link ) {
00549 linkSet.push_back( link );
00550 }
00551
00553 void BaseCommunication::removeLink( const LinkID& localLink ) {
00554 for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){
00555 if( (*i)->localLink != localLink) continue;
00556 remove_endpoint((*i)->remoteLocator);
00557 delete *i;
00558 linkSet.erase( i );
00559 break;
00560 }
00561 }
00562
00564 BaseCommunication::LinkDescriptor& BaseCommunication::queryLocalLink( const LinkID& link ) const {
00565 for (size_t i=0; i<linkSet.size();i++)
00566 if (linkSet[i]->localLink == link) return (LinkDescriptor&)*linkSet[i];
00567
00568 return LinkDescriptor::UNSPECIFIED();
00569 }
00570
00572 BaseCommunication::LinkDescriptor& BaseCommunication::queryRemoteLink( const LinkID& link ) const {
00573 for (size_t i=0; i<linkSet.size();i++)
00574 if (linkSet[i]->remoteLink == link) return (LinkDescriptor&)*linkSet[i];
00575
00576 return LinkDescriptor::UNSPECIFIED();
00577 }
00578
00579 LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {
00580 LinkIDs ids;
00581 for (size_t i=0; i<linkSet.size(); i++){
00582 if( addr == NULL ){
00583 ids.push_back( linkSet[i]->localLink );
00584 } else {
00585 if ( *linkSet[i]->remoteLocator == *addr )
00586 ids.push_back( linkSet[i]->localLink );
00587 }
00588 }
00589 return ids;
00590 }
00591
00592 void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){
00593
00594 #ifdef UNDERLAY_OMNET
00595
00596
00597 return
00598
00599 #endif // UNDERLAY_OMNET
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694
00695
00696
00697
00698
00699
00700
00701
00702
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719
00720
00721
00722
00723
00724
00725
00726
00727
00728
00729
00730
00731
00732
00733
00734 }
00735
00737 void BaseCommunication::send(Message* message, const EndpointDescriptor& endpoint) {
00738 Data data = data_serialize( message, DEFAULT_V );
00739 transport->send( endpoint.getEndpoints(), data.getBuffer(), data.getLength() / 8);
00740 }
00741
00743 void BaseCommunication::send(Message* message, const LinkDescriptor& desc) {
00744 if (desc.remoteLocator==NULL) return;
00745 Data data = data_serialize( message, DEFAULT_V );
00746 transport->send( desc.remoteLocator, data.getBuffer(), data.getLength() / 8);
00747 }
00748
00749 }}