Ignore:
Timestamp:
Jun 19, 2013, 11:05:49 AM (11 years ago)
Author:
hock@…
Message:

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/overlay/BaseOverlay.cpp

    r10653 r12060  
    5757#include "ariba/utility/visual/DddVis.h"
    5858#include "ariba/utility/visual/ServerVis.h"
     59#include <ariba/utility/misc/sha1.h>
    5960
    6061namespace ariba {
    6162namespace overlay {
     63
     64using namespace std;
     65using ariba::transport::system_priority;
    6266
    6367#define visualInstance          ariba::utility::DddVis::instance()
    6468#define visualIdOverlay         ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY
    6569#define visualIdBase            ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION
     70
     71
     72// time constants (in seconds)
     73#define KEEP_ALIVE_TIME         60                      // send keep-alive message after link is not used for #s 
     74
     75#define LINK_ESTABLISH_TIME_OUT 10                      // timeout: link requested but not up
     76#define KEEP_ALIVE_TIME_OUT     KEEP_ALIVE_TIME + LINK_ESTABLISH_TIME_OUT     // timeout: no data received on this link (incl. keep-alive messages)
     77#define AUTO_LINK_TIME_OUT      KEEP_ALIVE_TIME_OUT                    // timeout: auto link not used for #s
    6678
    6779
     
    8597
    8698LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
    87         BOOST_FOREACH( LinkDescriptor* lp, links )
     99        foreach( LinkDescriptor* lp, links )
    88100                                if ((communication ? lp->communicationId : lp->overlayId) == link)
    89101                                        return lp;
     
    92104
    93105const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
    94         BOOST_FOREACH( const LinkDescriptor* lp, links )
     106        foreach( const LinkDescriptor* lp, links )
    95107                                if ((communication ? lp->communicationId : lp->overlayId) == link)
    96108                                        return lp;
     
    122134
    123135/// returns a auto-link descriptor
    124 LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
     136LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service )
     137{
    125138        // search for a descriptor that is already up
    126         BOOST_FOREACH( LinkDescriptor* lp, links )
    127                                 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
    128                                         return lp;
     139        foreach( LinkDescriptor* lp, links )
     140    {
     141        if (lp->autolink && lp->remoteNode == node && lp->service == service && isLinkVital(lp) )
     142            return lp;
     143    }
     144       
    129145        // if not found, search for one that is about to come up...
    130         BOOST_FOREACH( LinkDescriptor* lp, links )
    131         if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
    132                 return lp;
     146        foreach( LinkDescriptor* lp, links )
     147        {
     148            time_t now = time(NULL);
     149           
     150        if (lp->autolink && lp->remoteNode == node && lp->service == service
     151                && difftime( now, lp->keepAliveReceived ) <= LINK_ESTABLISH_TIME_OUT )
     152            return lp;
     153        }
     154       
    133155        return NULL;
    134156}
     
    136158/// stabilizes link information
    137159void BaseOverlay::stabilizeLinks() {
    138         // send keep-alive messages over established links
    139         BOOST_FOREACH( LinkDescriptor* ld, links ) {
     160    time_t now = time(NULL);
     161   
     162    // send keep-alive messages over established links
     163        foreach( LinkDescriptor* ld, links )
     164    {
    140165                if (!ld->up) continue;
    141                 OverlayMsg msg( OverlayMsg::typeLinkAlive,
    142                                 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
    143                 if (ld->relayed) msg.setRouteRecord(true);
    144                 send_link( &msg, ld->overlayId );
     166               
     167                if ( difftime( now, ld->keepAliveSent ) >= KEEP_ALIVE_TIME )
     168                {
     169                    logging_debug("[BaseOverlay] Sending KeepAlive over "
     170                            << ld->to_string()
     171                            << " after "
     172                            << difftime( now, ld->keepAliveSent )
     173                            << "s");
     174                   
     175            OverlayMsg msg( OverlayMsg::typeKeepAlive,
     176                    OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
     177            msg.setRouteRecord(true);
     178            ld->keepAliveSent = now;
     179            send_link( &msg, ld->overlayId, system_priority::OVERLAY );
     180                }
    145181        }
    146182
    147183        // iterate over all links and check for time boundaries
    148184        vector<LinkDescriptor*> oldlinks;
    149         time_t now = time(NULL);
    150         BOOST_FOREACH( LinkDescriptor* ld, links ) {
    151 
    152                 // keep alives and not up? yes-> link connection request is stale!
    153                 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
    154 
    155                         // increase counter
    156                         ld->keepAliveMissed++;
    157 
    158                         // missed more than four keep-alive messages (10 sec)? -> drop link
    159                         if (ld->keepAliveMissed > 4) {
    160                                 logging_info( "Link connection request is stale, closing: " << ld );
    161                                 oldlinks.push_back( ld );
    162                                 continue;
    163                         }
     185        foreach( LinkDescriptor* ld, links ) {
     186
     187                // link connection request stale?
     188                if ( !ld->up && difftime( now, ld->keepAliveReceived ) >= LINK_ESTABLISH_TIME_OUT )  // NOTE: keepAliveReceived == now, on connection request
     189                {
     190            logging_info( "Link connection request is stale, closing: " << ld );
     191            ld->failed = true;
     192            oldlinks.push_back( ld );
     193            continue;
    164194                }
    165195
    166196                if (!ld->up) continue;
    167197
     198               
     199               
     200               
    168201                // check if link is relayed and retry connecting directly
     202                // TODO Mario: What happens here?  --> There are 3 attempts to replace a relayed link with a direct one. see: handleLinkReply
    169203                if ( ld->relayed && !ld->communicationUp && ld->retryCounter > 0) {
    170204                        ld->retryCounter--;
     
    173207
    174208                // remote used as relay flag
    175                 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
     209                if ( ld->relaying && difftime( now, ld->timeRelaying ) > KEEP_ALIVE_TIME_OUT)  // TODO is this a reasonable timeout ??
    176210                        ld->relaying = false;
    177211
     
    183217
    184218                // auto-link time exceeded?
    185                 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
     219                if ( ld->autolink && difftime( now, ld->lastuse ) > AUTO_LINK_TIME_OUT ) {
    186220                        oldlinks.push_back( ld );
    187221                        continue;
     
    189223
    190224                // keep alives missed? yes->
    191                 if ( difftime( now, ld->keepAliveTime ) > 4 ) {
    192 
    193                         // increase counter
    194                         ld->keepAliveMissed++;
    195 
    196                         // missed more than four keep-alive messages (4 sec)? -> drop link
    197                         if (ld->keepAliveMissed >= 2) {
    198                                 logging_info( "Link is stale, closing: " << ld );
    199                                 oldlinks.push_back( ld );
    200                                 continue;
    201                         }
     225                if ( difftime( now, ld->keepAliveReceived ) >= KEEP_ALIVE_TIME_OUT )
     226                {
     227            logging_info( "Link is stale, closing: " << ld );
     228            ld->failed = true;
     229            oldlinks.push_back( ld );
     230            continue;
    202231                }
    203232        }
    204233
    205234        // drop links
    206         BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
     235        foreach( LinkDescriptor* ld, oldlinks ) {
    207236                logging_info( "Link timed out. Dropping " << ld );
    208237                ld->relaying = false;
     
    210239        }
    211240
    212         // show link state
    213         counter++;
    214         if (counter>=4) showLinks();
    215         if (counter>=4 || counter<0) counter = 0;
     241       
     242       
     243       
     244        // show link state  (debug output)
     245        if (counter>=10 || counter<0)
     246    {
     247            showLinks();
     248            counter = 0;
     249    }
     250        else
     251        {
     252            counter++;
     253        }
    216254}
    217255
     
    230268
    231269                int i=0;
    232                 BOOST_FOREACH( LinkDescriptor* ld, links ) {
    233                         if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
     270                foreach( LinkDescriptor* ld, links ) {
     271                        if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
    234272                        bool found = false;
    235                         BOOST_FOREACH(NodeID& id, nodes)
     273                        foreach(NodeID& id, nodes)
    236274                        if (id  == ld->remoteNode) found = true;
    237275                        if (found) continue;
     
    261299        int i=0;
    262300        logging_info("--- link state -------------------------------");
    263         BOOST_FOREACH( LinkDescriptor* ld, links ) {
     301        foreach( LinkDescriptor* ld, links ) {
    264302                string epd = "";
    265                 if (ld->isDirectVital())
    266                         epd = getEndpointDescriptor(ld->remoteNode).toString();
     303                if (isLinkDirectVital(ld))
     304                {
     305//                      epd = getEndpointDescriptor(ld->remoteNode).toString();
     306                   
     307                    epd = "Connection: ";
     308                    epd += bc->get_local_endpoint_of_link(ld->communicationId)->to_string();
     309                    epd += " <---> ";
     310                    epd += bc->get_remote_endpoint_of_link(ld->communicationId)->to_string();
     311                }
    267312
    268313                logging_info("LINK_STATE: " << i << ": " << ld << " " << epd);
     
    289334// internal message delivery ---------------------------------------------------
    290335
     336
     337seqnum_t BaseOverlay::send_overlaymessage_down( OverlayMsg* message, const LinkID& bc_link, uint8_t priority )
     338{
     339    // set priority
     340    message->setPriority(priority);
     341   
     342    // wrap old OverlayMsg into reboost message
     343    reboost::message_t wrapped_message = message->wrap_up_for_sending();
     344   
     345    // send down to BaseCommunication
     346    try
     347    {
     348        // * send *
     349        return bc->sendMessage(bc_link, wrapped_message, priority, false);
     350    }
     351    catch ( communication::communication_message_not_sent& e )
     352    {
     353        ostringstream out;
     354        out << "Communication message not sent: " << e.what();
     355        throw message_not_sent(out.str());
     356    }
     357   
     358    throw logic_error("This should never happen!");
     359}
     360
     361
    291362/// routes a message to its destination node
    292 void BaseOverlay::route( OverlayMsg* message ) {
    293 
     363void BaseOverlay::route( OverlayMsg* message, const NodeID& last_hop )
     364{
    294365        // exceeded time-to-live? yes-> drop message
    295         if (message->getNumHops() > message->getTimeToLive()) {
    296                 logging_warn("Message exceeded TTL. Dropping message and relay routes"
    297                                 "for recovery.");
     366        if (message->getNumHops() > message->getTimeToLive())
     367        {
     368                logging_warn("Message exceeded TTL. Dropping message and relay routes "
     369            << "for recovery. Hop count: " << (int) message->getNumHops());
    298370                removeRelayNode(message->getDestinationNode());
    299371                return;
     
    301373
    302374        // no-> forward message
    303         else {
     375        else
     376        {
    304377                // destinastion myself? yes-> handle message
    305                 if (message->getDestinationNode() == nodeId) {
    306                         logging_warn("Usually I should not route messages to myself!");
    307                         Message msg;
    308                         msg.encapsulate(message);
    309                         handleMessage( &msg, NULL );
    310                 } else {
    311                         // no->send message to next hop
    312                         send( message, message->getDestinationNode() );
    313                 }
    314         }
     378                if (message->getDestinationNode() == nodeId)
     379                {
     380                        logging_warn("Usually I should not route messages to myself. And I won't!");
     381                }
     382
     383                // no->send message to next hop
     384                else
     385                {
     386                    try
     387                    {
     388                /* (deep) packet inspection to determine priority */
     389                // BRANCH: typeData  -->  send with low priority
     390                if ( message->getType() == OverlayMsg::typeData )
     391                {
     392                    // TODO think about implementing explicit routing queue (with active queue management??)
     393                    send( message,
     394                          message->getDestinationNode(),
     395                          message->getPriority(),
     396                          last_hop );
     397                }
     398                // BRANCH: internal message  -->  send with higher priority
     399                else
     400                {
     401                    send( message,
     402                          message->getDestinationNode(),
     403                          system_priority::HIGH,
     404                          last_hop );
     405                }
     406                    }
     407                    catch ( message_not_sent& e )
     408                    {
     409                        logging_warn("Unable to route message of type "
     410                                << message->getType()
     411                                << " to "
     412                                << message->getDestinationNode()
     413                                << ". Reason: "
     414                                << e.what());
     415                       
     416                        // inform sender
     417                if ( message->getType() != OverlayMsg::typeMessageLost )
     418                {
     419                    report_lost_message(message);
     420                }
     421                    }
     422                }
     423        }
     424}
     425
     426void BaseOverlay::report_lost_message( const OverlayMsg* message )
     427{
     428    OverlayMsg reply(OverlayMsg::typeMessageLost);
     429    reply.setSeqNum(message->getSeqNum());
     430   
     431    /**
     432     * MessageLost-Message
     433     *
     434     * - Type of lost message
     435     * - Hop count of lost message
     436     * - Source-LinkID  of lost message
     437     */
     438    reboost::shared_buffer_t b(sizeof(uint8_t)*2);
     439    b.mutable_data()[0] = message->getType();
     440    b.mutable_data()[1] = message->getNumHops();
     441    reply.append_buffer(b);
     442    reply.append_buffer(message->getSourceLink().serialize());
     443   
     444    try
     445    {
     446        send_node(&reply, message->getSourceNode(),
     447                system_priority::OVERLAY,
     448                OverlayInterface::OVERLAY_SERVICE_ID);
     449    }
     450    catch ( message_not_sent& e )
     451    {
     452        logging_warn("Tried to inform another node that we could'n route their message. But we were not able to send this error-message, too.");
     453    }
    315454}
    316455
    317456/// sends a message to another node, delivers it to the base overlay class
    318 seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
     457seqnum_t BaseOverlay::send( OverlayMsg* message,
     458        const NodeID& destination,
     459        uint8_t priority,
     460        const NodeID& last_hop ) throw(message_not_sent)
     461{
    319462        LinkDescriptor* next_link = NULL;
    320463
    321464        // drop messages to unspecified destinations
    322         if (destination.isUnspecified()) return -1;
    323 
    324         // send messages to myself -> handle message and drop warning!
    325         if (destination == nodeId) {
    326                 logging_warn("Sent message to myself. Handling message.")
    327                 Message msg;
    328                 msg.encapsulate(message);
    329                 handleMessage( &msg, NULL );
    330                 return -1;
     465        if (destination.isUnspecified())
     466            throw message_not_sent("No destination specified. Drop!");
     467
     468        // send messages to myself -> drop!
     469    // TODO maybe this is not what we want. why not just deliver this message?
     470    //   There is a similar check in the route function, there it should be okay.
     471        if (destination == nodeId)
     472        {
     473            logging_warn("Sent message to myself. Drop!");
     474           
     475            throw message_not_sent("Sent message to myself. Drop!");
    331476        }
    332477
    333478        // use relay path?
    334         if (message->isRelayed()) {
     479        if (message->isRelayed())
     480        {
    335481                next_link = getRelayLinkTo( destination );
    336                 if (next_link != NULL) {
     482               
     483                if (next_link != NULL)
     484                {
    337485                        next_link->setRelaying();
    338                         return bc->sendMessage(next_link->communicationId, message);
    339                 } else {
    340                         logging_warn("Could not send message. No relay hop found to "
    341                                         << destination << " -- trying to route over overlay paths ...")
    342 //                      logging_error("ERROR: " << debugInformation() );
    343                 //                      return -1;
    344                 }
    345         }
    346 
     486
     487                        // * send message over relayed link *
     488                        return send_overlaymessage_down(message, next_link->communicationId, priority);
     489                }
     490                else
     491                {
     492                        logging_warn("No relay hop found to " << destination
     493                                << " -- trying to route over overlay paths ...")
     494                }
     495        }
     496
     497       
    347498        // last resort -> route over overlay path
    348499        LinkID next_id = overlayInterface->getNextLinkId( destination );
    349         if (next_id.isUnspecified()) {
    350                 logging_warn("Could not send message. No next hop found to " <<
    351                                 destination );
    352                 logging_error("ERROR: " << debugInformation() );
    353                 return -1;
    354         }
    355 
    356         // get link descriptor, up and running? yes-> send message
     500        if ( next_id.isUnspecified() )
     501        {               
     502        // apperently we're the closest node --> try second best node
     503        //   NOTE: This is helpful if our routing table is not up-to-date, but
     504        //   may lead to circles. So we have to be careful.
     505        std::vector<const LinkID*> next_ids =
     506            overlayInterface->getSortedLinkIdsTowardsNode( destination );
     507           
     508        for ( int i = 0; i < next_ids.size(); i++ )
     509        {
     510            const LinkID& link = *next_ids[i];
     511           
     512            if ( ! link.isUnspecified() )
     513            {
     514                next_id = link;
     515               
     516                break;
     517            }
     518        }
     519     
     520        // still no next hop found. drop.
     521        if ( next_id.isUnspecified() )
     522        {
     523            logging_warn("Could not send message. No next hop found to " <<
     524                destination );
     525            logging_error("ERROR: " << debugInformation() );
     526           
     527            throw message_not_sent("No next hop found.");
     528        }
     529        }
     530
     531       
     532        /* get link descriptor, do some checks and send message */
    357533        next_link = getDescriptor(next_id);
    358         if (next_link != NULL && next_link->up) {
    359                 // send message over relayed link
    360                 return send(message, next_link);
    361         }
    362 
    363         // no-> error, dropping message
    364         else {
    365                 logging_warn("Could not send message. Link not known or up");
    366                 logging_error("ERROR: " << debugInformation() );
    367                 return -1;
    368         }
    369 
    370         // not reached-> fail
    371         return -1;
    372 }
     534   
     535    // check pointer
     536    if ( next_link == NULL )
     537    {
     538        // NOTE: this shuldn't happen
     539        throw message_not_sent("Could not send message. Link not known.");
     540    }
     541   
     542    // avoid circles
     543    if ( next_link->remoteNode == last_hop )
     544    {
     545        // XXX logging_debug
     546        logging_info("Possible next hop would create a circle: "
     547            << next_link->remoteNode);
     548       
     549        throw message_not_sent("Could not send message. Possible next hop would create a circle.");
     550    }
     551   
     552    // check if link is up
     553        if ( ! next_link->up)
     554        {
     555        logging_warn("Could not send message. Link not up");
     556        logging_error("ERROR: " << debugInformation() );
     557       
     558        throw message_not_sent("Could not send message. Link not up");
     559        }
     560
     561        // * send message over overlay link *
     562        return send(message, next_link, priority);
     563}
     564
    373565
    374566/// send a message using a link descriptor, delivers it to the base overlay class
    375 seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) {
     567seqnum_t BaseOverlay::send( OverlayMsg* message,
     568        LinkDescriptor* ldr,
     569        uint8_t priority ) throw(message_not_sent)
     570{
    376571        // check if null
    377         if (ldr == NULL) {
    378                 logging_error("Can not send message to " << message->getDestinationAddress());
    379                 return -1;
     572        if (ldr == NULL)
     573        {
     574        ostringstream out;
     575        out << "Can not send message to " << message->getDestinationAddress();
     576        throw message_not_sent(out.str());
    380577        }
    381578
    382579        // check if up
    383         if (!ldr->up && !ignore_down) {
    384                 logging_error("Can not send message. Link not up:" << ldr );
     580        if ( !ldr->up )
     581        {
    385582                logging_error("DEBUG_INFO: " << debugInformation() );
    386                 return -1;
    387         }
    388         LinkDescriptor* ld = NULL;
    389 
    390         // handle relayed link
    391         if (ldr->relayed) {
     583
     584        ostringstream out;
     585        out << "Can not send message. Link not up:" << ldr->to_string();
     586        throw message_not_sent(out.str());
     587        }
     588       
     589        LinkDescriptor* next_hop_ld = NULL;
     590
     591        // BRANCH: relayed link
     592        if (ldr->relayed)
     593        {
    392594                logging_debug("Resolving direct link for relayed link to "
    393595                                << ldr->remoteNode);
    394                 ld = getRelayLinkTo( ldr->remoteNode );
    395                 if (ld==NULL) {
    396                         logging_error("No relay path found to link " << ldr );
     596               
     597                next_hop_ld = getRelayLinkTo( ldr->remoteNode );
     598               
     599                if (next_hop_ld==NULL)
     600                {
    397601                        logging_error("DEBUG_INFO: " << debugInformation() );
    398                         return -1;
    399                 }
    400                 ld->setRelaying();
     602                       
     603                ostringstream out;
     604                out << "No relay path found to link: " << ldr;
     605                throw message_not_sent(out.str());
     606                }
     607               
     608                next_hop_ld->setRelaying();
    401609                message->setRelayed(true);
    402         } else
    403                 ld = ldr;
    404 
    405         // handle direct link
    406         if (ld->communicationUp) {
    407                 logging_debug("send(): Sending message over direct link.");
    408                 return bc->sendMessage( ld->communicationId, message );
    409         } else {
    410                 logging_error("send(): Could not send message. "
    411                                 "Not a relayed link and direct link is not up.");
    412                 return -1;
    413         }
    414         return -1;
     610        }
     611        // BRANCH: direct link
     612        else
     613        {
     614                next_hop_ld = ldr;
     615        }
     616
     617       
     618        // check next hop-link
     619        if ( ! next_hop_ld->communicationUp)
     620        {
     621            throw message_not_sent( "send(): Could not send message."
     622                    " Not a relayed link and direct link is not up." );
     623        }
     624
     625        // send over next link
     626    logging_debug("send(): Sending message over direct link.");
     627    return send_overlaymessage_down(message, next_hop_ld->communicationId, priority);
     628
    415629}
    416630
    417631seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
    418                 const ServiceID& service) {
     632        uint8_t priority, const ServiceID& service) throw(message_not_sent)
     633{
    419634        message->setSourceNode(nodeId);
    420635        message->setDestinationNode(remote);
    421636        message->setService(service);
    422         return send( message, remote );
    423 }
    424 
    425 seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
     637        return send( message, remote, priority );
     638}
     639
     640void BaseOverlay::send_link( OverlayMsg* message,
     641        const LinkID& link,
     642        uint8_t priority ) throw(message_not_sent)
     643{
    426644        LinkDescriptor* ld = getDescriptor(link);
    427         if (ld==NULL) {
    428                 logging_error("Cannot find descriptor to link id=" << link.toString());
    429                 return -1;
    430         }
     645        if (ld==NULL)
     646        {
     647            throw message_not_sent("Cannot find descriptor to link id=" + link.toString());
     648        }
     649       
    431650        message->setSourceNode(nodeId);
    432651        message->setDestinationNode(ld->remoteNode);
     
    437656        message->setService(ld->service);
    438657        message->setRelayed(ld->relayed);
    439         return send( message, ld, ignore_down );
     658   
     659   
     660    try
     661    {
     662        // * send message *
     663        send( message, ld, priority );
     664    }
     665    catch ( message_not_sent& e )
     666    {
     667        // drop failed link
     668        ld->failed = true;
     669        dropLink(ld->overlayId);
     670    }
    440671}
    441672
     
    451682                // relay link still used and alive?
    452683                if (ld==NULL
    453                                 || !ld->isDirectVital()
    454                                 || difftime(route.used, time(NULL)) > 8) {
     684                                || !isLinkDirectVital(ld)
     685                                || difftime(route.used, time(NULL)) > KEEP_ALIVE_TIME_OUT)  // TODO this was set to 8 before.. Is the new timeout better?
     686                {
    455687                        logging_info("Forgetting relay information to node "
    456688                                        << route.node.toString() );
     
    488720        if (message->isRelayed()) {
    489721                // try to find source node
    490                 BOOST_FOREACH( relay_route& route, relay_routes ) {
     722                foreach( relay_route& route, relay_routes ) {
    491723                        // relay route found? yes->
    492724                        if ( route.node == message->getDestinationNode() ) {
     
    504736
    505737                // try to find source node
    506                 BOOST_FOREACH( relay_route& route, relay_routes ) {
     738                foreach( relay_route& route, relay_routes ) {
    507739
    508740                        // relay route found? yes->
     
    516748                                if (route.hops > message->getNumHops()
    517749                                                || rld == NULL
    518                                                 || !rld->isDirectVital()) {
     750                                                || !isLinkDirectVital(ld)) {
    519751                                        logging_info("Updating relay information to node "
    520752                                                        << route.node.toString()
    521                                                         << " reducing to " << message->getNumHops() << " hops.");
     753                                                        << " reducing to " << (int) message->getNumHops() << " hops.");
    522754                                        route.hops = message->getNumHops();
    523755                                        route.link = ld->overlayId;
     
    542774LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
    543775        // try to find source node
    544         BOOST_FOREACH( relay_route& route, relay_routes ) {
     776        foreach( relay_route& route, relay_routes ) {
    545777                if (route.node == remote ) {
    546778                        LinkDescriptor* ld = getDescriptor( route.link );
    547                         if (ld==NULL || !ld->isDirectVital()) return NULL; else {
     779                        if (ld==NULL || !isLinkDirectVital(ld)) return NULL; else {
    548780                                route.used = time(NULL);
    549781                                return ld;
     
    575807// ----------------------------------------------------------------------------
    576808
    577 void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
     809void BaseOverlay::start( BaseCommunication* _basecomm, const NodeID& _nodeid ) {
    578810        logging_info("Starting...");
    579811
    580812        // set parameters
    581         bc = &_basecomm;
     813        bc = _basecomm;
    582814        nodeId = _nodeid;
    583815
     
    587819
    588820        // timer for auto link management
    589         Timer::setInterval( 1000 );
     821        Timer::setInterval( 1000 ); // XXX
     822//      Timer::setInterval( 10000 );
    590823        Timer::start();
    591824
     
    641874                overlayInterface->joinOverlay();
    642875                state = BaseOverlayStateCompleted;
    643                 BOOST_FOREACH( NodeListener* i, nodeListeners )
     876                foreach( NodeListener* i, nodeListeners )
    644877                        i->onJoinCompleted( spovnetId );
    645878
     
    682915        // gather all service links
    683916        vector<LinkID> servicelinks;
    684         BOOST_FOREACH( LinkDescriptor* ld, links ) {
     917        foreach( LinkDescriptor* ld, links )
     918        {
    685919                if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
    686920                        servicelinks.push_back( ld->overlayId );
     
    688922
    689923        // drop all service links
    690         BOOST_FOREACH( LinkID lnk, servicelinks )
    691         dropLink( lnk );
     924        foreach( LinkID lnk, servicelinks )
     925        {
     926            logging_debug("Dropping service link " << lnk.toString());
     927            dropLink( lnk );
     928        }
    692929
    693930        // let the node leave the spovnet overlay interface
    694931        logging_debug( "Leaving overlay" );
    695932        if( overlayInterface != NULL )
     933        {
    696934                overlayInterface->leaveOverlay();
     935        }
    697936
    698937        // drop still open bootstrap links
    699         BOOST_FOREACH( LinkID lnk, bootstrapLinks )
    700         bc->dropLink( lnk );
     938        foreach( LinkID lnk, bootstrapLinks )
     939        {
     940            logging_debug("Dropping bootstrap link " << lnk.toString());
     941            bc->dropLink( lnk );
     942        }
    701943
    702944        // change to inalid state
     
    708950
    709951        // inform all registered services of the event
    710         BOOST_FOREACH( NodeListener* i, nodeListeners ) {
     952        foreach( NodeListener* i, nodeListeners )
     953        {
    711954                if( ret ) i->onLeaveCompleted( spovnetId );
    712955                else i->onLeaveFailed( spovnetId );
     
    731974                state = BaseOverlayStateInvalid;
    732975
    733                 BOOST_FOREACH( NodeListener* i, nodeListeners )
     976                foreach( NodeListener* i, nodeListeners )
    734977                i->onJoinFailed( spovnetId );
    735978
     
    7831026                const ServiceID& service ) {
    7841027
     1028    // TODO What if we already have a Link to this node and this service id?
     1029   
    7851030        // do not establish a link to myself!
    786         if (remote == nodeId) return LinkID::UNSPECIFIED;
    787 
     1031        if (remote == nodeId) return
     1032                LinkID::UNSPECIFIED;
     1033
     1034       
    7881035        // create a link descriptor
    7891036        LinkDescriptor* ld = addDescriptor();
     
    7921039        ld->service = service;
    7931040        ld->listener = getListener(ld->service);
     1041   
     1042    // initialize sequence numbers
     1043    ld->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short();
     1044    logging_debug("Creating new link with initial SeqNum: " << ld->last_sent_seqnum);
    7941045
    7951046        // create link request message
     
    8001051        msg.setRelayed(true);
    8011052        msg.setRegisterRelay(true);
     1053//      msg.setRouteRecord(true);
     1054   
     1055    msg.setSeqNum(ld->last_sent_seqnum);
    8021056
    8031057        // debug message
     
    8091063        );
    8101064
     1065       
    8111066        // sending message to node
    812         send_node( &msg, ld->remoteNode, ld->service );
    813 
     1067        try
     1068        {
     1069            // * send *
     1070            seqnum_t seq = send_node( &msg, ld->remoteNode, system_priority::OVERLAY, ld->service );
     1071        }
     1072        catch ( message_not_sent& e )
     1073        {
     1074            logging_warn("Link request not sent: " << e.what());
     1075           
     1076            // Message not sent. Cancel link request.
     1077            SystemQueue::instance().scheduleCall(
     1078                    boost::bind(
     1079                            &BaseOverlay::__onLinkEstablishmentFailed,
     1080                            this,
     1081                            ld->overlayId)
     1082                );
     1083        }
     1084       
    8141085        return ld->overlayId;
    8151086}
    8161087
     1088/// NOTE: "id" is an Overlay-LinkID
     1089void BaseOverlay::__onLinkEstablishmentFailed(const LinkID& id)
     1090{
     1091    // TODO This code redundant. But also it's not easy to aggregate in one function.
     1092   
     1093    // get descriptor for link
     1094    LinkDescriptor* ld = getDescriptor(id, false);
     1095    if ( ld == NULL ) return; // not found? ->ignore!
     1096
     1097    logging_debug( "__onLinkEstablishmentFaild: " << ld );
     1098
     1099    // removing relay link information
     1100    removeRelayLink(ld->overlayId);
     1101
     1102    // inform listeners about link down
     1103    ld->communicationUp = false;
     1104    if (!ld->service.isUnspecified())
     1105    {
     1106        CommunicationListener* lst = getListener(ld->service);
     1107        if(lst != NULL) lst->onLinkFail( ld->overlayId, ld->remoteNode );
     1108        sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
     1109    }
     1110
     1111    // delete all queued messages (auto links)
     1112    if( ld->messageQueue.size() > 0 ) {
     1113        logging_warn( "Dropping link " << id.toString() << " that has "
     1114                << ld->messageQueue.size() << " waiting messages" );
     1115        ld->flushQueue();
     1116    }
     1117
     1118    // erase mapping
     1119    eraseDescriptor(ld->overlayId);
     1120}
     1121
     1122
    8171123/// drops an established link
    818 void BaseOverlay::dropLink(const LinkID& link) {
    819         logging_info( "Dropping link (initiated locally):" << link.toString() );
     1124void BaseOverlay::dropLink(const LinkID& link)
     1125{
     1126        logging_info( "Dropping link: " << link.toString() );
    8201127
    8211128        // find the link item to drop
    8221129        LinkDescriptor* ld = getDescriptor(link);
    823         if( ld == NULL ) {
     1130        if( ld == NULL )
     1131        {
    8241132                logging_warn( "Can't drop link, link is unknown!");
    8251133                return;
     
    8271135
    8281136        // delete all queued messages
    829         if( ld->messageQueue.size() > 0 ) {
     1137        if( ld->messageQueue.size() > 0 )
     1138        {
    8301139                logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
    8311140                                << ld->messageQueue.size() << " waiting messages" );
    8321141                ld->flushQueue();
    8331142        }
    834 
    835         // inform sideport and listener
    836         if(ld->listener != NULL)
    837                 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
    838         sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
    839 
    840         // do not drop relay links
    841         if (!ld->relaying) {
    842                 // drop the link in base communication
    843                 if (ld->communicationUp) bc->dropLink( ld->communicationId );
    844 
    845                 // erase descriptor
    846                 eraseDescriptor( ld->overlayId );
    847         } else {
    848                 ld->dropAfterRelaying = true;
    849         }
     1143       
     1144           
     1145        // inform application and remote note (but only once)
     1146        //   NOTE: If we initiated the drop, this function is called twice, but on
     1147        //   the second call, there is noting to do.
     1148        if ( ld->up && ! ld->failed )
     1149        {
     1150        // inform sideport and listener
     1151        if(ld->listener != NULL)
     1152        {
     1153            ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
     1154        }
     1155        sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
     1156       
     1157        // send link-close to remote node
     1158        logging_info("Sending LinkClose message to remote node.");
     1159        OverlayMsg close_msg(OverlayMsg::typeLinkClose);
     1160        send_link(&close_msg, link, system_priority::OVERLAY);
     1161   
     1162        // deactivate link
     1163        ld->up = false;
     1164//         ld->closing = true;
     1165        }
     1166       
     1167        else if ( ld->failed )
     1168    {
     1169        // inform listener
     1170        if( ld->listener != NULL )
     1171        {
     1172            ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
     1173        }
     1174       
     1175        ld->up = false;
     1176        __removeDroppedLink(ld->overlayId);
     1177    }
     1178}
     1179
     1180/// called from typeLinkClose-handler
     1181void BaseOverlay::__removeDroppedLink(const LinkID& link)
     1182{
     1183    // find the link item to drop
     1184    LinkDescriptor* ld = getDescriptor(link);
     1185    if( ld == NULL )
     1186    {
     1187        return;
     1188    }
     1189
     1190    // do not drop relay links
     1191    if (!ld->relaying)
     1192    {
     1193        // drop the link in base communication
     1194        if (ld->communicationUp)
     1195        {
     1196            bc->dropLink( ld->communicationId );
     1197        }
     1198
     1199        // erase descriptor
     1200        eraseDescriptor( ld->overlayId );
     1201    }
     1202    else
     1203    {
     1204        ld->dropAfterRelaying = true;
     1205    }
    8501206}
    8511207
     
    8531209
    8541210/// internal send message, always use this functions to send messages over links
    855 seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
     1211const SequenceNumber& BaseOverlay::sendMessage( reboost::message_t message,
     1212        const LinkID& link,
     1213        uint8_t priority ) throw(message_not_sent)
     1214{
    8561215        logging_debug( "Sending data message on link " << link.toString() );
    8571216
    8581217        // get the mapping for this link
    8591218        LinkDescriptor* ld = getDescriptor(link);
    860         if( ld == NULL ) {
    861                 logging_error("Could not send message. "
    862                                 << "Link not found id=" << link.toString());
    863                 return -1;
     1219        if( ld == NULL )
     1220        {
     1221            throw message_not_sent("Could not send message. Link not found id=" + link.toString());
    8641222        }
    8651223
    8661224        // check if the link is up yet, if its an auto link queue message
    867         if( !ld->up ) {
     1225        if( !ld->up )
     1226        {
    8681227                ld->setAutoUsed();
    869                 if( ld->autolink ) {
     1228                if( ld->autolink )
     1229                {
    8701230                        logging_info("Auto-link " << link.toString() << " not up, queue message");
    871                         Data data = data_serialize( message );
    872                         const_cast<Message*>(message)->dropPayload();
    873                         ld->messageQueue.push_back( new Message(data) );
    874                 } else {
    875                         logging_error("Link " << link.toString() << " not up, drop message");
    876                 }
    877                 return -1;
    878         }
    879 
    880         // compile overlay message (has service and node id)
    881         OverlayMsg overmsg( OverlayMsg::typeData );
    882         overmsg.encapsulate( const_cast<Message*>(message) );
    883 
    884         // send message over relay/direct/overlay
    885         return send_link( &overmsg, ld->overlayId );
    886 }
    887 
    888 
    889 seqnum_t BaseOverlay::sendMessage(const Message* message,
    890                 const NodeID& node, const ServiceID& service) {
     1231                       
     1232                        // queue message
     1233                LinkDescriptor::message_queue_entry msg;
     1234                msg.message = message;
     1235                msg.priority = priority;
     1236
     1237                        ld->messageQueue.push_back( msg );
     1238                       
     1239                        return SequenceNumber::DISABLED;  // TODO what to return if message is queued?
     1240                }
     1241                else
     1242                {
     1243                    throw message_not_sent("Link " + link.toString() + " not up, drop message");
     1244                }
     1245        }
     1246       
     1247        // TODO AKTUELL: sequence numbers
     1248        // TODO seqnum on fast path ?
     1249        ld->last_sent_seqnum.increment();
     1250       
     1251        /* choose fast-path for direct links; normal overlay-path otherwise */
     1252        // BRANCH: direct link
     1253        if ( ld->communicationUp && !ld->relayed )
     1254        {
     1255            // * send down to BaseCommunication *
     1256            try
     1257            {
     1258                bc->sendMessage(ld->communicationId, message, priority, true);
     1259        }
     1260        catch ( communication::communication_message_not_sent& e )
     1261        {
     1262            ostringstream out;
     1263            out << "Communication message on fast-path not sent: " << e.what();
     1264            throw message_not_sent(out.str());
     1265        }
     1266        }
     1267
     1268        // BRANCH: use (slow) overlay-path
     1269        else
     1270        {
     1271        // compile overlay message (has service and node id)
     1272        OverlayMsg overmsg( OverlayMsg::typeData );
     1273        overmsg.set_payload_message(message);
     1274       
     1275        // set SeqNum
     1276        if ( ld->transmit_seqnums )
     1277        {
     1278            overmsg.setSeqNum(ld->last_sent_seqnum);
     1279        }
     1280        logging_debug("Sending Message with SeqNum: " << overmsg.getSeqNum());
     1281   
     1282        // send message over relay/direct/overlay
     1283        send_link( &overmsg, ld->overlayId, priority );
     1284        }
     1285       
     1286        // return seqnum
     1287        return ld->last_sent_seqnum;
     1288}
     1289
     1290
     1291const SequenceNumber& BaseOverlay::sendMessage(reboost::message_t message,
     1292                const NodeID& node, uint8_t priority, const ServiceID& service) {
    8911293
    8921294        // find link for node and service
     
    9071309                if( ld == NULL ) {
    9081310                        logging_error( "Failed to establish auto-link.");
    909                         return -1;
     1311            throw message_not_sent("Failed to establish auto-link.");
    9101312                }
    9111313                ld->autolink = true;
     
    9201322
    9211323        // send / queue message
    922         return sendMessage( message, ld->overlayId );
    923 }
    924 
    925 
    926 NodeID BaseOverlay::sendMessageCloserToNodeID(const Message* message,
    927         const NodeID& address, const ServiceID& service) {
     1324        return sendMessage( message, ld->overlayId, priority );
     1325}
     1326
     1327
     1328NodeID BaseOverlay::sendMessageCloserToNodeID(reboost::message_t message,
     1329        const NodeID& address, uint8_t priority, const ServiceID& service) {
    9281330   
    9291331    if ( overlayInterface->isClosestNodeTo(address) )
     
    9361338    if ( closest_node != NodeID::UNSPECIFIED )
    9371339    {
    938         seqnum_t seqnum = sendMessage(message, closest_node, service);
     1340        sendMessage(message, closest_node, priority, service);
    9391341    }
    9401342   
    941     return closest_node;  // XXX return seqnum ?? tuple? closest_node via (non const) reference?
     1343    return closest_node;  // return seqnum ?? tuple? closest_node via (non const) reference?
    9421344}
    9431345// ----------------------------------------------------------------------------
     
    9781380
    9791381        // see if we can find the node in our own table
    980         BOOST_FOREACH(const LinkDescriptor* ld, links){
     1382        foreach(const LinkDescriptor* ld, links){
    9811383                if(ld->remoteNode != node) continue;
    9821384                if(!ld->communicationUp) continue;
     
    10791481
    10801482void BaseOverlay::onLinkUp(const LinkID& id,
    1081                 const address_v* local, const address_v* remote) {
     1483        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote)
     1484{
    10821485        logging_debug( "Link up with base communication link id=" << id );
    10831486
     
    10851488        LinkDescriptor* ld = getDescriptor(id, true);
    10861489
    1087         // handle bootstrap link we initiated
     1490        // BRANCH: handle bootstrap link we initiated
    10881491        if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
    10891492                logging_info(
    10901493                                "Join has been initiated by me and the link is now up. " <<
     1494                                "LinkID: " << id.toString() <<
    10911495                                "Sending out join request for SpoVNet " << spovnetId.toString()
    10921496                );
     
    10961500                                OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
    10971501                JoinRequest joinRequest( spovnetId, nodeId );
    1098                 overlayMsg.encapsulate( &joinRequest );
    1099                 bc->sendMessage( id, &overlayMsg );
     1502                overlayMsg.append_buffer(joinRequest.serialize_into_shared_buffer());
     1503
     1504                send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY);
     1505               
    11001506                return;
    11011507        }
    11021508
    1103         // no link found? -> link establishment from remote, add one!
     1509        // BRANCH: link establishment from remote, add one!
    11041510        if (ld == NULL) {
    11051511                ld = addDescriptor( id );
     
    11151521                // in this case, do not inform listener, since service it unknown
    11161522                // -> wait for update message!
    1117 
    1118                 // link mapping found? -> send update message with node-id and service id
    1119         } else {
     1523        }
     1524       
     1525        // BRANCH: We requested this link in the first place
     1526        else
     1527        {
    11201528                logging_info( "onLinkUp descriptor (initiated locally):" << ld );
    11211529
     
    11261534                ld->fromRemote = false;
    11271535
    1128                 // if link is a relayed link->convert to direct link
    1129                 if (ld->relayed) {
    1130                         logging_info( "Converting to direct link: " << ld );
     1536                // BRANCH: this was a relayed link before --> convert to direct link
     1537                //   TODO do we really have to send a message here?
     1538                if (ld->relayed)
     1539                {
    11311540                        ld->up = true;
    11321541                        ld->relayed = false;
     1542                        logging_info( "Converting to direct link: " << ld );
     1543                       
     1544                        // send message
    11331545                        OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
    11341546                        overMsg.setSourceLink( ld->overlayId );
    11351547                        overMsg.setDestinationLink( ld->remoteLink );
    1136                         send_link( &overMsg, ld->overlayId );
    1137                 } else {
     1548                        send_link( &overMsg, ld->overlayId, system_priority::OVERLAY );
     1549                       
     1550                    // inform listener
     1551                    if( ld->listener != NULL)
     1552                        ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
     1553                }
     1554               
     1555
     1556        /* NOTE: Chord is opening direct-links in it's setup routine which are
     1557         *   neither set to "relayed" nor to "up". To activate these links a
     1558         *   typeLinkUpdate must be sent.
     1559         *   
     1560         * This branch is would also be taken when we had a working link before
     1561         *   (ld->up == true). I'm not sure if this case does actually happen
     1562         *   and whether it's tested.
     1563         */
     1564                else
     1565                {
    11381566                        // note: necessary to validate the link on the remote side!
    11391567                        logging_info( "Sending out update" <<
     
    11441572                        // compile and send update message
    11451573                        OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
    1146                         overlayMsg.setSourceLink(ld->overlayId);
    11471574                        overlayMsg.setAutoLink( ld->autolink );
    1148                         send_link( &overlayMsg, ld->overlayId, true );
     1575                    overlayMsg.setSourceNode(nodeId);
     1576                    overlayMsg.setDestinationNode(ld->remoteNode);
     1577                    overlayMsg.setSourceLink(ld->overlayId);
     1578                    overlayMsg.setDestinationLink(ld->remoteLink);
     1579                    overlayMsg.setService(ld->service);
     1580                    overlayMsg.setRelayed(false);
     1581
     1582                    // TODO ld->communicationId = id ??
     1583                   
     1584                    send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY);
    11491585                }
    11501586        }
     
    11521588
    11531589void BaseOverlay::onLinkDown(const LinkID& id,
    1154                 const address_v* local, const address_v* remote) {
    1155 
     1590        const addressing2::EndpointPtr local,
     1591        const addressing2::EndpointPtr remote)
     1592{
    11561593        // erase bootstrap links
    11571594        vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
     
    11851622}
    11861623
     1624
     1625void BaseOverlay::onLinkFail(const LinkID& id,
     1626        const addressing2::EndpointPtr local,
     1627        const addressing2::EndpointPtr remote)
     1628{
     1629        logging_debug( "Link fail with base communication link id=" << id );
     1630
     1631//      // erase bootstrap links
     1632//      vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
     1633//      if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
     1634//
     1635//      // get descriptor for link
     1636//      LinkDescriptor* ld = getDescriptor(id, true);
     1637//      if ( ld == NULL ) return; // not found? ->ignore!
     1638//      logging_debug( "Link failed id=" << ld->overlayId.toString() );
     1639//
     1640//      // inform listeners
     1641//      ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
     1642//      sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
     1643       
     1644        logging_debug( "  ... calling onLinkDown ..." );
     1645        onLinkDown(id, local, remote);
     1646}
     1647
     1648
    11871649void BaseOverlay::onLinkChanged(const LinkID& id,
    1188                 const address_v* oldlocal, const address_v* newlocal,
    1189                 const address_v* oldremote, const address_v* newremote) {
    1190 
    1191         // get descriptor for link
    1192         LinkDescriptor* ld = getDescriptor(id, true);
    1193         if ( ld == NULL ) return; // not found? ->ignore!
    1194         logging_debug( "onLinkChanged descriptor: " << ld );
    1195 
    1196         // inform listeners
    1197         ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
    1198         sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
    1199 
    1200         // autolinks: refresh timestamp
    1201         ld->setAutoUsed();
    1202 }
    1203 
    1204 void BaseOverlay::onLinkFail(const LinkID& id,
    1205                 const address_v* local, const address_v* remote) {
    1206         logging_debug( "Link fail with base communication link id=" << id );
    1207 
    1208         // erase bootstrap links
    1209         vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
    1210         if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
    1211 
    1212         // get descriptor for link
    1213         LinkDescriptor* ld = getDescriptor(id, true);
    1214         if ( ld == NULL ) return; // not found? ->ignore!
    1215         logging_debug( "Link failed id=" << ld->overlayId.toString() );
    1216 
    1217         // inform listeners
    1218         ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
    1219         sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
    1220 }
    1221 
    1222 void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
    1223                 const address_v* remote, const QoSParameterSet& qos) {
    1224         logging_debug( "Link quality changed with base communication link id=" << id );
    1225 
    1226         // get descriptor for link
    1227         LinkDescriptor* ld = getDescriptor(id, true);
    1228         if ( ld == NULL ) return; // not found? ->ignore!
    1229         logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
    1230 }
    1231 
    1232 bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
    1233                 const address_v* remote ) {
     1650        const addressing2::EndpointPtr oldlocal,  const addressing2::EndpointPtr newlocal,
     1651        const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote)
     1652{
     1653    // get descriptor for link
     1654    LinkDescriptor* ld = getDescriptor(id, true);
     1655    if ( ld == NULL ) return; // not found? ->ignore!
     1656    logging_debug( "onLinkChanged descriptor: " << ld );
     1657
     1658    // inform listeners
     1659    ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
     1660    sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
     1661
     1662    // autolinks: refresh timestamp
     1663    ld->setAutoUsed();
     1664}
     1665
     1666//void BaseOverlay::onLinkQoSChanged(const LinkID& id,
     1667//        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote,
     1668//        const QoSParameterSet& qos)
     1669//{
     1670//      logging_debug( "Link quality changed with base communication link id=" << id );
     1671//
     1672//      // get descriptor for link
     1673//      LinkDescriptor* ld = getDescriptor(id, true);
     1674//      if ( ld == NULL ) return; // not found? ->ignore!
     1675//      logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
     1676//}
     1677
     1678bool BaseOverlay::onLinkRequest(const LinkID& id,
     1679        const addressing2::EndpointPtr local,
     1680        const addressing2::EndpointPtr remote)
     1681{
    12341682        logging_debug("Accepting link request from " << remote->to_string() );
     1683       
     1684        // TODO ask application..?
     1685       
    12351686        return true;
    12361687}
    12371688
     1689
     1690
     1691
    12381692/// handles a message from base communication
    1239 bool BaseOverlay::receiveMessage(const Message* message,
    1240                 const LinkID& link, const NodeID& ) {
     1693bool BaseOverlay::receiveMessage( reboost::shared_buffer_t message,
     1694                const LinkID& link,
     1695                const NodeID&,
     1696                bool bypass_overlay )
     1697{
    12411698        // get descriptor for link
    12421699        LinkDescriptor* ld = getDescriptor( link, true );
    1243         return handleMessage( message, ld, link );
     1700
     1701       
     1702        /* choose fastpath for direct links; normal overlay-path otherwise */   
     1703        if ( bypass_overlay && ld )
     1704        {
     1705        // message received --> link is alive
     1706        ld->keepAliveReceived = time(NULL);
     1707        // hop count on this link
     1708        ld->hops = 0;
     1709
     1710       
     1711        // hand over to CommunicationListener (aka Application)
     1712            CommunicationListener* lst = getListener(ld->service);
     1713            if ( lst != NULL )
     1714            {
     1715                lst->onMessage(
     1716                        message,
     1717                        ld->remoteNode,
     1718                        ld->overlayId,
     1719                    SequenceNumber::DISABLED,
     1720                        NULL );
     1721               
     1722                return true;
     1723            }
     1724
     1725            return false;
     1726        }
     1727        else
     1728        {
     1729            return handleMessage( message, ld, link );     
     1730        }
    12441731}
    12451732
     
    12471734
    12481735/// Handle spovnet instance join requests
    1249 bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
    1250 
     1736bool BaseOverlay::handleJoinRequest( reboost::shared_buffer_t message, const NodeID& source, const LinkID& bcLink )
     1737{
    12511738        // decapsulate message
    1252         JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
     1739        JoinRequest joinReq;
     1740        joinReq.deserialize_from_shared_buffer(message);
     1741       
    12531742        logging_info( "Received join request for spovnet " <<
    1254                         joinReq->getSpoVNetID().toString() );
     1743                        joinReq.getSpoVNetID().toString() );
    12551744
    12561745        // check spovnet id
    1257         if( joinReq->getSpoVNetID() != spovnetId ) {
     1746        if( joinReq.getSpoVNetID() != spovnetId ) {
    12581747                logging_error(
    12591748                                "Received join request for spovnet we don't handle " <<
    1260                                 joinReq->getSpoVNetID().toString() );
    1261                 delete joinReq;
     1749                                joinReq.getSpoVNetID().toString() );
     1750
    12621751                return false;
    12631752        }
     
    12671756        logging_info( "Sending join reply for spovnet " <<
    12681757                        spovnetId.toString() << " to node " <<
    1269                         overlayMsg->getSourceNode().toString() <<
     1758                        source.toString() <<
    12701759                        ". Result: " << (allow ? "allowed" : "denied") );
    1271         joiningNodes.push_back( overlayMsg->getSourceNode() );
     1760        joiningNodes.push_back( source );
    12721761
    12731762        // return overlay parameters
     
    12761765                        << getEndpointDescriptor().toString() )
    12771766        OverlayParameterSet parameters = overlayInterface->getParameters();
     1767       
     1768       
     1769        // create JoinReplay Message
    12781770        OverlayMsg retmsg( OverlayMsg::typeJoinReply,
    12791771                        OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
    1280         JoinReply replyMsg( spovnetId, parameters,
    1281                         allow, getEndpointDescriptor() );
    1282         retmsg.encapsulate(&replyMsg);
    1283         bc->sendMessage( bcLink, &retmsg );
    1284 
    1285         delete joinReq;
     1772        JoinReply replyMsg( spovnetId, parameters, allow );
     1773        retmsg.append_buffer(replyMsg.serialize_into_shared_buffer());
     1774
     1775        // XXX This is unlovely clash between the old message system and the new one,
     1776        // but a.t.m. we can't migrate everything to the new system at once..
     1777        // ---> Consider the EndpointDescriptor as part of the JoinReply..
     1778        retmsg.append_buffer(getEndpointDescriptor().serialize());
     1779       
     1780        // * send *
     1781        send_overlaymessage_down(&retmsg, bcLink, system_priority::OVERLAY);
     1782
    12861783        return true;
    12871784}
    12881785
    12891786/// Handle replies to spovnet instance join requests
    1290 bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
     1787bool BaseOverlay::handleJoinReply( reboost::shared_buffer_t message, const LinkID& bcLink )
     1788{
    12911789        // decapsulate message
    12921790        logging_debug("received join reply message");
    1293         JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
     1791        JoinReply replyMsg;
     1792        EndpointDescriptor endpoints;
     1793        reboost::shared_buffer_t buff = replyMsg.deserialize_from_shared_buffer(message);
     1794        buff = endpoints.deserialize(buff);
    12941795
    12951796        // correct spovnet?
    1296         if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
     1797        if( replyMsg.getSpoVNetID() != spovnetId ) { // no-> fail
    12971798                logging_error( "Received SpoVNet join reply for " <<
    1298                                 replyMsg->getSpoVNetID().toString() <<
     1799                                replyMsg.getSpoVNetID().toString() <<
    12991800                                " != " << spovnetId.toString() );
    1300                 delete replyMsg;
     1801
    13011802                return false;
    13021803        }
    13031804
    13041805        // access granted? no -> fail
    1305         if( !replyMsg->getJoinAllowed() ) {
     1806        if( !replyMsg.getJoinAllowed() ) {
    13061807                logging_error( "Our join request has been denied" );
    13071808
     
    13171818
    13181819                // inform all registered services of the event
    1319                 BOOST_FOREACH( NodeListener* i, nodeListeners )
     1820                foreach( NodeListener* i, nodeListeners )
    13201821                i->onJoinFailed( spovnetId );
    13211822
    1322                 delete replyMsg;
    13231823                return true;
    13241824        }
     
    13291829
    13301830        logging_debug( "Using bootstrap end-point "
    1331                         << replyMsg->getBootstrapEndpoint().toString() );
     1831                        << endpoints.toString() );
    13321832
    13331833        // create overlay structure from spovnet parameter set
     
    13381838
    13391839                overlayInterface = OverlayFactory::create(
    1340                                 *this, replyMsg->getParam(), nodeId, this );
     1840                                *this, replyMsg.getParam(), nodeId, this );
    13411841
    13421842                // overlay structure supported? no-> fail!
     
    13541854
    13551855                        // inform all registered services of the event
    1356                         BOOST_FOREACH( NodeListener* i, nodeListeners )
     1856                        foreach( NodeListener* i, nodeListeners )
    13571857                        i->onJoinFailed( spovnetId );
    13581858
    1359                         delete replyMsg;
    13601859                        return true;
    13611860                }
     
    13651864                overlayInterface->createOverlay();
    13661865
    1367                 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
    1368                 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
     1866                overlayInterface->joinOverlay( endpoints );
     1867                overlayBootstrap.recordJoin( endpoints );
    13691868
    13701869                // update ovlvis
     
    13721871
    13731872                // inform all registered services of the event
    1374                 BOOST_FOREACH( NodeListener* i, nodeListeners )
    1375                 i->onJoinCompleted( spovnetId );
    1376 
    1377                 delete replyMsg;
    1378 
    1379         } else {
    1380 
     1873                foreach( NodeListener* i, nodeListeners )
     1874                    i->onJoinCompleted( spovnetId );
     1875        }
     1876        else
     1877        {
    13811878                // this is not the first bootstrap, just join the additional node
    13821879                logging_debug("not first-time bootstrapping");
    1383                 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
    1384                 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
    1385 
    1386                 delete replyMsg;
    1387 
     1880                overlayInterface->joinOverlay( endpoints );
     1881                overlayBootstrap.recordJoin( endpoints );
    13881882        } // if( overlayInterface == NULL )
    13891883
     
    13921886
    13931887
    1394 bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
     1888bool BaseOverlay::handleData( reboost::shared_buffer_t message, OverlayMsg* overlayMsg, LinkDescriptor* ld )
     1889{
    13951890        // get service
    1396         const ServiceID& service = overlayMsg->getService();
     1891        const ServiceID& service = ld->service; //overlayMsg->getService();
     1892
    13971893        logging_debug( "Received data for service " << service.toString()
    13981894                        << " on link " << overlayMsg->getDestinationLink().toString() );
     
    14021898        if(lst != NULL){
    14031899                lst->onMessage(
    1404                                 overlayMsg,
    1405                                 overlayMsg->getSourceNode(),
    1406                                 overlayMsg->getDestinationLink()
     1900                                message,
     1901//                              overlayMsg->getSourceNode(),
     1902//                              overlayMsg->getDestinationLink(),
     1903                                ld->remoteNode,
     1904                                ld->overlayId,
     1905                overlayMsg->getSeqNum(),
     1906                                overlayMsg
    14071907                );
    14081908        }
     
    14111911}
    14121912
     1913bool BaseOverlay::handleLostMessage( reboost::shared_buffer_t message, OverlayMsg* msg )
     1914{
     1915    /**
     1916     * Deserialize MessageLost-Message
     1917     *
     1918     * - Type of lost message
     1919     * - Hop count of lost message
     1920     * - Source-LinkID  of lost message
     1921     */
     1922    const uint8_t* buff = message(0, sizeof(uint8_t)*2).data();
     1923    uint8_t type = buff[0];
     1924    uint8_t hops = buff[1];
     1925    LinkID linkid;
     1926    linkid.deserialize(message(sizeof(uint8_t)*2));
     1927   
     1928    logging_warn("Node " << msg->getSourceNode()
     1929            << " informed us, that our message of type " << (int) type
     1930            << " is lost after traveling " << (int) hops << " hops."
     1931            << " (LinkID: " << linkid.toString());
     1932
     1933   
     1934    // TODO switch-case ?
     1935   
     1936    // BRANCH: LinkRequest --> link request failed
     1937    if ( type == OverlayMsg::typeLinkRequest )
     1938    {
     1939        __onLinkEstablishmentFailed(linkid);
     1940    }
     1941   
     1942    // BRANCH: Data --> link disrupted. Drop link.
     1943    //   (We could use something more advanced here. e.g. At least send a
     1944    //    keep-alive message and wait for a keep-alive reply.)
     1945    if ( type == OverlayMsg::typeData )
     1946    {
     1947        LinkDescriptor* link_desc = getDescriptor(linkid);
     1948       
     1949        if ( link_desc )
     1950        {
     1951            link_desc->failed = true;
     1952        }
     1953       
     1954        dropLink(linkid);
     1955    }
     1956   
     1957    // BRANCH: ping lost
     1958    if ( type == OverlayMsg::typePing )
     1959    {
     1960        CommunicationListener* lst = getListener(msg->getService());
     1961        if( lst != NULL )
     1962        {
     1963            lst->onPingLost(msg->getSourceNode());
     1964        }
     1965    }
     1966   
     1967    return true;
     1968}
     1969
     1970bool BaseOverlay::handlePing( OverlayMsg* overlayMsg, LinkDescriptor* ld )
     1971{
     1972    // TODO AKTUELL: implement interfaces: Node::ping(node); BaseOverlay::ping(node)
     1973   
     1974    bool send_pong = false;
     1975   
     1976    // inform application and ask permission to send a pong message
     1977    CommunicationListener* lst = getListener(overlayMsg->getService());
     1978    if( lst != NULL )
     1979    {
     1980        send_pong = lst->onPing(overlayMsg->getSourceNode());
     1981    }
     1982   
     1983    // send pong message if allowed
     1984    if ( send_pong )
     1985    {
     1986        OverlayMsg pong_msg(OverlayMsg::typePong);
     1987        pong_msg.setSeqNum(overlayMsg->getSeqNum());
     1988       
     1989        // send message
     1990        try
     1991        {
     1992            send_node( &pong_msg,
     1993                overlayMsg->getSourceNode(),
     1994                system_priority::OVERLAY,
     1995                overlayMsg->getService() );
     1996        }
     1997        catch ( message_not_sent& e )
     1998        {
     1999            logging_info("Could not send Pong-Message to node: " <<
     2000                overlayMsg->getSourceNode());
     2001        }
     2002    }
     2003}
     2004
     2005bool BaseOverlay::handlePong( OverlayMsg* overlayMsg, LinkDescriptor* ld )
     2006{
     2007    // inform application
     2008    CommunicationListener* lst = getListener(overlayMsg->getService());
     2009    if( lst != NULL )
     2010    {
     2011        lst->onPong(overlayMsg->getSourceNode());
     2012    }
     2013}
    14132014
    14142015bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
     
    14392040                overlayMsg->setSourceLink(ld->overlayId);
    14402041                overlayMsg->setService(ld->service);
    1441                 send( overlayMsg, ld );
     2042                send( overlayMsg, ld, system_priority::OVERLAY );
    14422043        }
    14432044
     
    14812082        if( ld->messageQueue.size() > 0 ) {
    14822083                logging_info( "Sending out queued messages on link " << ld );
    1483                 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
    1484                         sendMessage( msg, ld->overlayId );
    1485                         delete msg;
    1486                 }
     2084        foreach( LinkDescriptor::message_queue_entry msg, ld->messageQueue )
     2085        {
     2086            sendMessage( msg.message, ld->overlayId, msg.priority );
     2087        }
    14872088                ld->messageQueue.clear();
    14882089        }
     
    14972098/// handle a link request and reply
    14982099bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
    1499         logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
    15002100
    15012101        //TODO: Check if a request has already been sent using getSourceLink() ...
     
    15142114        ldn->remoteNode = overlayMsg->getSourceNode();
    15152115        ldn->remoteLink = overlayMsg->getSourceLink();
    1516 
     2116        ldn->hops = overlayMsg->getNumHops();
     2117       
     2118    // initialize sequence numbers
     2119    ldn->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short();
     2120    logging_debug("Creating new link with initial SeqNum: " << ldn->last_sent_seqnum);
     2121   
     2122   
    15172123        // update time-stamps
    15182124        ldn->setAlive();
    15192125        ldn->setAutoUsed();
    15202126
     2127        logging_info( "Link request received from node id="
     2128                << overlayMsg->getSourceNode()
     2129                << " LINK: "
     2130                << ldn);
     2131       
    15212132        // create reply message and send back!
    15222133        overlayMsg->swapRoles(); // swap source/destination
    15232134        overlayMsg->setType(OverlayMsg::typeLinkReply);
    15242135        overlayMsg->setSourceLink(ldn->overlayId);
    1525         overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
    15262136        overlayMsg->setRelayed(true);
    1527         send( overlayMsg, ld ); // send back to link
     2137//      overlayMsg->setRouteRecord(true);
     2138    overlayMsg->setSeqNum(ld->last_sent_seqnum);
     2139       
     2140        // TODO aktuell do the same thing in the typeLinkRequest-Message, too. But be careful with race conditions!!
     2141        // append our endpoints (for creation of a direct link)
     2142        overlayMsg->set_payload_message(bc->getEndpointDescriptor().serialize());
     2143       
     2144        send( overlayMsg, ld, system_priority::OVERLAY ); // send back to link
    15282145
    15292146        // inform listener
     
    15342151}
    15352152
    1536 bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
    1537 
     2153bool BaseOverlay::handleLinkReply(
     2154        OverlayMsg* overlayMsg,
     2155        reboost::shared_buffer_t sub_message,
     2156        LinkDescriptor* ld )
     2157{
     2158    // deserialize EndpointDescriptor
     2159    EndpointDescriptor endpoints;
     2160    endpoints.deserialize(sub_message);
     2161   
    15382162        // find link request
    15392163        LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
     
    15542178
    15552179        // debug message
    1556         logging_debug( "Link request reply received. Establishing link"
     2180        logging_info( "Link request reply received. Establishing link"
    15572181                        << " for service " << overlayMsg->getService().toString()
    15582182                        << " with local id=" << overlayMsg->getDestinationLink()
    15592183                        << " and remote link id=" << overlayMsg->getSourceLink()
    1560                         << " to " << overlayMsg->getSourceEndpoint().toString()
     2184                        << " to " << endpoints.toString()
     2185                        << " hop count: " << overlayMsg->getRouteRecord().size()
    15612186        );
    15622187
     
    15772202                logging_info( "Sending out queued messages on link " <<
    15782203                                ldn->overlayId.toString() );
    1579                 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
    1580                         sendMessage( msg, ldn->overlayId );
    1581                         delete msg;
     2204                foreach( LinkDescriptor::message_queue_entry msg, ldn->messageQueue )
     2205                {
     2206                        sendMessage( msg.message, ldn->overlayId, msg.priority );
    15822207                }
    15832208                ldn->messageQueue.clear();
     
    15892214        // try to replace relay link with direct link
    15902215        ldn->retryCounter = 3;
    1591         ldn->endpoint = overlayMsg->getSourceEndpoint();
     2216        ldn->endpoint = endpoints;
    15922217        ldn->communicationId =  bc->establishLink( ldn->endpoint );
    15932218
     
    15962221
    15972222/// handle a keep-alive message for a link
    1598 bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
     2223bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld )
     2224{
    15992225        LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
    1600         if ( rld != NULL ) {
    1601                 logging_debug("Keep-Alive for " <<
    1602                                 overlayMsg->getDestinationLink() );
     2226       
     2227        if ( rld != NULL )
     2228        {
     2229                logging_debug("Keep-Alive for " << overlayMsg->getDestinationLink() );
    16032230                if (overlayMsg->isRouteRecord())
     2231                {
    16042232                        rld->routeRecord = overlayMsg->getRouteRecord();
     2233                }
     2234               
     2235                // set alive
    16052236                rld->setAlive();
     2237               
     2238               
     2239                /* answer keep alive */
     2240                if ( overlayMsg->getType() == OverlayMsg::typeKeepAlive )
     2241                {
     2242            time_t now = time(NULL);
     2243            logging_debug("[BaseOverlay] Answering KeepAlive over "
     2244                    << ld->to_string()
     2245                    << " after "
     2246                    << difftime( now, ld->keepAliveSent )
     2247                    << "s");
     2248           
     2249            OverlayMsg msg( OverlayMsg::typeKeepAliveReply,
     2250                    OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
     2251            msg.setRouteRecord(true);
     2252            ld->keepAliveSent = now;
     2253            send_link( &msg, ld->overlayId, system_priority::OVERLAY );
     2254                }
     2255
    16062256                return true;
    1607         } else {
    1608                 logging_error("Keep-Alive for "
     2257        }
     2258        else
     2259        {
     2260                logging_error("No Keep-Alive for "
    16092261                                << overlayMsg->getDestinationLink() << ": link unknown." );
    16102262                return false;
     
    16362288        // erase the original descriptor
    16372289        eraseDescriptor(ld->overlayId);
     2290       
     2291    // inform listener
     2292    if( rld->listener != NULL)
     2293        rld->listener->onLinkChanged( rld->overlayId, rld->remoteNode );
     2294       
    16382295        return true;
    16392296}
    16402297
    16412298/// handles an incoming message
    1642 bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
    1643                 const LinkID bcLink ) {
    1644         logging_debug( "Handling message: " << message->toString());
    1645 
     2299bool BaseOverlay::handleMessage( reboost::shared_buffer_t message, LinkDescriptor* ld,
     2300                const LinkID bcLink )
     2301{
    16462302        // decapsulate overlay message
    1647         OverlayMsg* overlayMsg =
    1648                         const_cast<Message*>(message)->decapsulate<OverlayMsg>();
    1649         if( overlayMsg == NULL ) return false;
    1650 
     2303        OverlayMsg* overlayMsg = new OverlayMsg();
     2304        reboost::shared_buffer_t sub_buff = overlayMsg->deserialize_from_shared_buffer(message);
     2305
     2306//      // XXX debug
     2307//      logging_info( "Received overlay message."
     2308//              << " Hops: " << (int) overlayMsg->getNumHops()
     2309//              << " Type: " << (int) overlayMsg->getType()
     2310//              << " Payload size: " << sub_buff.size()
     2311//             << " SeqNum: " << overlayMsg->getSeqNum() );
     2312       
     2313       
    16512314        // increase number of hops
    16522315        overlayMsg->increaseNumHops();
     
    16602323        // handle signaling messages (do not route!)
    16612324        if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
    1662                         overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
    1663                 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
     2325                        overlayMsg->getType()<=OverlayMsg::typeSignalingEnd )
     2326        {
     2327                overlayInterface->onMessage(overlayMsg, sub_buff, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
    16642328                delete overlayMsg;
    16652329                return true;
     
    16732337                                << " to " << overlayMsg->getDestinationNode()
    16742338                );
    1675                 route( overlayMsg );
     2339               
     2340//              // XXX testing AKTUELL
     2341//        logging_info("MARIO: Routing message "
     2342//                << " from " << overlayMsg->getSourceNode()
     2343//                << " to " << overlayMsg->getDestinationNode() );
     2344//        logging_info( "Type: " << overlayMsg->getType() << " Payload size: " << sub_buff.size());
     2345                overlayMsg->append_buffer(sub_buff);
     2346               
     2347                route( overlayMsg, ld->remoteNode );
    16762348                delete overlayMsg;
    16772349                return true;
    16782350        }
    16792351
    1680         // handle base overlay message
     2352       
     2353        /* handle base overlay message */
    16812354        bool ret = false; // return value
    1682         switch ( overlayMsg->getType() ) {
    1683 
    1684         // data transport messages
    1685         case OverlayMsg::typeData:
    1686                 ret = handleData(overlayMsg, ld);                       break;
    1687 
    1688                 // overlay setup messages
    1689         case OverlayMsg::typeJoinRequest:
    1690                 ret = handleJoinRequest(overlayMsg, bcLink );   break;
    1691         case OverlayMsg::typeJoinReply:
    1692                 ret = handleJoinReply(overlayMsg, bcLink );     break;
    1693 
    1694                 // link specific messages
    1695         case OverlayMsg::typeLinkRequest:
    1696                 ret = handleLinkRequest(overlayMsg, ld );       break;
    1697         case OverlayMsg::typeLinkReply:
    1698                 ret = handleLinkReply(overlayMsg, ld );         break;
    1699         case OverlayMsg::typeLinkUpdate:
    1700                 ret = handleLinkUpdate(overlayMsg, ld );        break;
    1701         case OverlayMsg::typeLinkAlive:
    1702                 ret = handleLinkAlive(overlayMsg, ld );         break;
    1703         case OverlayMsg::typeLinkDirect:
    1704                 ret = handleLinkDirect(overlayMsg, ld );        break;
    1705 
    1706                 // handle unknown message type
    1707         default: {
    1708                 logging_error( "received message in invalid state! don't know " <<
    1709                                 "what to do with this message of type " << overlayMsg->getType() );
    1710                 ret = false;
    1711                 break;
    1712         }
     2355        try
     2356        {
     2357        switch ( overlayMsg->getType() )
     2358        {
     2359            // data transport messages
     2360            case OverlayMsg::typeData:
     2361            {
     2362                // NOTE: On relayed links, »ld« does not point to our link, but on the relay link.
     2363                LinkDescriptor* end_to_end_ld = getDescriptor(overlayMsg->getDestinationLink());
     2364               
     2365                if ( ! end_to_end_ld )
     2366                {
     2367                    logging_warn("Error: Data-Message claims to belong to a link we don't know.");
     2368                   
     2369                    ret = false;
     2370                }
     2371                else
     2372                {
     2373                    // message received --> link is alive
     2374                    end_to_end_ld->keepAliveReceived = time(NULL);
     2375                    // hop count on this link
     2376                    end_to_end_ld->hops = overlayMsg->getNumHops();
     2377                   
     2378                    // * call handler *
     2379                    ret = handleData(sub_buff, overlayMsg, end_to_end_ld);
     2380                }
     2381               
     2382                break;
     2383            }
     2384            case OverlayMsg::typeMessageLost:
     2385                ret = handleLostMessage(sub_buff, overlayMsg);
     2386               
     2387                break;
     2388       
     2389                // overlay setup messages
     2390            case OverlayMsg::typeJoinRequest:
     2391                ret = handleJoinRequest(sub_buff, overlayMsg->getSourceNode(), bcLink );        break;
     2392            case OverlayMsg::typeJoinReply:
     2393                ret = handleJoinReply(sub_buff, bcLink );       break;
     2394       
     2395                // link specific messages
     2396            case OverlayMsg::typeLinkRequest:
     2397                ret = handleLinkRequest(overlayMsg, ld );       break;
     2398            case OverlayMsg::typeLinkReply:
     2399                ret = handleLinkReply(overlayMsg, sub_buff, ld );       break;
     2400            case OverlayMsg::typeLinkUpdate:
     2401                ret = handleLinkUpdate(overlayMsg, ld );        break;
     2402            case OverlayMsg::typeKeepAlive:
     2403            case OverlayMsg::typeKeepAliveReply:
     2404                ret = handleLinkAlive(overlayMsg, ld );         break;
     2405            case OverlayMsg::typeLinkDirect:
     2406                ret = handleLinkDirect(overlayMsg, ld );        break;
     2407               
     2408            case OverlayMsg::typeLinkClose:
     2409            {
     2410                dropLink(overlayMsg->getDestinationLink());
     2411                __removeDroppedLink(overlayMsg->getDestinationLink());
     2412               
     2413                break;
     2414            }
     2415           
     2416            /// ping over overlay path (or similar)
     2417            case OverlayMsg::typePing:
     2418            {
     2419                ret = handlePing(overlayMsg, ld);
     2420                break;
     2421            }
     2422            case OverlayMsg::typePong:
     2423            {
     2424                ret = handlePong(overlayMsg, ld);
     2425                break;
     2426            }
     2427           
     2428                // handle unknown message type
     2429            default:
     2430            {
     2431                logging_error( "received message in invalid state! don't know " <<
     2432                        "what to do with this message of type " << overlayMsg->getType() );
     2433                ret = false;
     2434                break;
     2435            }
     2436        }
     2437        }
     2438        catch ( reboost::illegal_sub_buffer& e )
     2439        {
     2440            logging_error( "Failed to create sub-buffer while reading message: »"
     2441                    << e.what()
     2442                    << "« Message too short? ");
     2443           
     2444            assert(false); // XXX
    17132445        }
    17142446
     
    17202452// ----------------------------------------------------------------------------
    17212453
    1722 void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
     2454void BaseOverlay::broadcastMessage(reboost::message_t message, const ServiceID& service, uint8_t priority) {
    17232455
    17242456        logging_debug( "broadcasting message to all known nodes " <<
    17252457                        "in the overlay from service " + service.toString() );
    1726 
    1727         if(message == NULL) return;
    1728         message->setReleasePayload(false);
    17292458
    17302459        OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
     
    17322461                NodeID& id = nodes.at(i);
    17332462                if(id == this->nodeId) continue; // don't send to ourselfs
    1734                 if(i+1 == nodes.size()) message->setReleasePayload(true); // release payload on last send
    1735                 sendMessage( message, id, service );
     2463
     2464                sendMessage( message, id, priority, service );
    17362465        }
    17372466}
     
    17552484vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
    17562485        vector<LinkID> linkvector;
    1757         BOOST_FOREACH( LinkDescriptor* ld, links ) {
     2486        foreach( LinkDescriptor* ld, links ) {
    17582487                if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
    17592488                        linkvector.push_back( ld->overlayId );
     
    17792508        updateVisual();
    17802509}
     2510
     2511
     2512
     2513/* link status */
     2514bool BaseOverlay::isLinkDirect(const ariba::LinkID& lnk) const
     2515{
     2516    const LinkDescriptor* ld = getDescriptor(lnk);
     2517   
     2518    if (!ld)
     2519        return false;
     2520   
     2521    return ld->communicationUp && !ld->relayed;
     2522}
     2523
     2524int BaseOverlay::getHopCount(const ariba::LinkID& lnk) const
     2525{
     2526    const LinkDescriptor* ld = getDescriptor(lnk);
     2527   
     2528    if (!ld)
     2529        return -1;
     2530   
     2531    return ld->hops;   
     2532}
     2533
     2534
     2535bool BaseOverlay::isLinkVital(const LinkDescriptor* link) const
     2536{
     2537    time_t now = time(NULL);
     2538
     2539    return link->up && difftime( now, link->keepAliveReceived ) <= KEEP_ALIVE_TIME_OUT; // TODO is this too long for a "vital" link..?
     2540}
     2541
     2542bool BaseOverlay::isLinkDirectVital(const LinkDescriptor* link) const
     2543{
     2544    return isLinkVital(link) && link->communicationUp && !link->relayed;
     2545}
     2546
     2547/* [link status] */
     2548
    17812549
    17822550void BaseOverlay::updateVisual(){
     
    18782646        static set<NodeID> linkset;
    18792647        set<NodeID> remotenodes;
    1880         BOOST_FOREACH( LinkDescriptor* ld, links ) {
    1881                 if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)
     2648        foreach( LinkDescriptor* ld, links ) {
     2649                if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)
    18822650                        continue;
    18832651
     
    18952663        do{
    18962664                changed = false;
    1897                 BOOST_FOREACH(NodeID n, linkset){
     2665                foreach(NodeID n, linkset){
    18982666                        if(remotenodes.find(n) == remotenodes.end()){
    18992667                                visualInstance.visDisconnect(visualIdBase, this->nodeId, n, "");
     
    19082676        do{
    19092677                changed = false;
    1910                 BOOST_FOREACH(NodeID n, remotenodes){
     2678                foreach(NodeID n, remotenodes){
    19112679                        if(linkset.find(n) == linkset.end()){
    19122680                                visualInstance.visConnect(visualIdBase, this->nodeId, n, "");
     
    19332701        // dump link state
    19342702        s << "--- link state -------------------------------" << endl;
    1935         BOOST_FOREACH( LinkDescriptor* ld, links ) {
     2703        foreach( LinkDescriptor* ld, links ) {
    19362704                s << "link " << i << ": " << ld << endl;
    19372705                i++;
Note: See TracChangeset for help on using the changeset viewer.