00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 extern "C"
00031 {
00032
00033 #include <unistd.h>
00034 #include <sys/types.h>
00035 #include <netinet/in.h>
00036 #include <netinet/tcp.h>
00037 #include <sys/socket.h>
00038 #include <arpa/inet.h>
00039
00040 #include <fcntl.h>
00041 #include <sys/poll.h>
00042 }
00043
00044 #include <iostream>
00045 #include <errno.h>
00046 #include <string>
00047 #include <sstream>
00048
00049 #include "tp_over_tcp.h"
00050 #include "threadsafe_db.h"
00051 #include "cleanuphandler.h"
00052 #include "setuid.h"
00053 #include "queuemanager.h"
00054 #include "logfile.h"
00055
00056 #include <set>
00057
00058 #define TCP_SUCCESS 0
00059 #define TCP_SEND_FAILURE 1
00060
00061 const unsigned int max_listen_queue_size= 10;
00062
00063 #define IPV6_ADDR_INT32_SMP 0x0000ffff
00064
00065 namespace protlib {
00066
00067 void v6_to_v4(struct sockaddr_in *sin, struct sockaddr_in6 *sin6) {
00068 bzero(sin, sizeof(*sin));
00069 sin->sin_family = AF_INET;
00070 sin->sin_port = sin6->sin6_port;
00071 memcpy(&sin->sin_addr, &sin6->sin6_addr.s6_addr[12], sizeof(struct in_addr));
00072 }
00073
00074
00075 void v4_to_v6(struct sockaddr_in *sin, struct sockaddr_in6 *sin6) {
00076 bzero(sin6, sizeof(*sin6));
00077 sin6->sin6_family = AF_INET6;
00078 sin6->sin6_port = sin->sin_port;
00079 *(uint32_t *)&sin6->sin6_addr.s6_addr[0] = 0;
00080 *(uint32_t *)&sin6->sin6_addr.s6_addr[4] = 0;
00081 *(uint32_t *)&sin6->sin6_addr.s6_addr[8] = IPV6_ADDR_INT32_SMP;
00082 *(uint32_t *)&sin6->sin6_addr.s6_addr[12] = sin->sin_addr.s_addr;
00083 }
00084
00085 using namespace log;
00086
00092 char in6_addrstr[INET6_ADDRSTRLEN+1];
00093
00094
00095
00096
00097
00104 AssocData*
00105 TPoverTCP::get_connection_to(const appladdress& addr)
00106 {
00107
00108 struct timespec ts;
00109 get_time_of_day(ts);
00110 ts.tv_nsec+= tpparam.sleep_time * 1000000L;
00111 if (ts.tv_nsec>=1000000000L)
00112 {
00113 ts.tv_sec += ts.tv_nsec / 1000000000L;
00114 ts.tv_nsec= ts.tv_nsec % 1000000000L;
00115 }
00116
00117
00118 AssocData* assoc= NULL;
00119 int new_socket;
00120
00121 do
00122 {
00123
00124
00125 lock();
00126 assoc= connmap.lookup(addr);
00127
00128 unlock();
00129 if (assoc)
00130 {
00131
00132 if (!assoc->shutdown)
00133 {
00134 return assoc;
00135 }
00136 else
00137 {
00138
00139 ERRCLog(tpparam.name,"socket exists, but is already in mode shutdown");
00140
00141 return 0;
00142 }
00143 }
00144 else
00145 {
00146 Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to "
00147 << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
00148 }
00149
00150
00151 new_socket = socket( v4_mode ? AF_INET : AF_INET6, SOCK_STREAM, IPPROTO_TCP);
00152 if (new_socket == -1)
00153 {
00154 ERRCLog(tpparam.name,"Couldn't create a new socket: " << strerror(errno));
00155
00156 return 0;
00157 }
00158
00159
00160 int nodelayflag= 1;
00161 int status= setsockopt(new_socket,
00162 IPPROTO_TCP,
00163 TCP_NODELAY,
00164 &nodelayflag,
00165 sizeof(nodelayflag));
00166 if (status)
00167 {
00168 ERRLog(tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
00169 }
00170
00171
00172 int socketreuseflag= 1;
00173 status= setsockopt(new_socket,
00174 SOL_SOCKET,
00175 SO_REUSEADDR,
00176 (const char *) &socketreuseflag,
00177 sizeof(socketreuseflag));
00178 if (status)
00179 {
00180 ERRLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
00181 }
00182
00183 struct sockaddr_in6 dest_address;
00184 dest_address.sin6_flowinfo= 0;
00185 dest_address.sin6_scope_id= 0;
00186 addr.get_sockaddr(dest_address);
00187
00188
00189 int connect_status = 0;
00190 if (v4_mode) {
00191 struct sockaddr_in dest_address_v4;
00192 v6_to_v4( &dest_address_v4, &dest_address );
00193 connect_status = connect(new_socket,
00194 reinterpret_cast<const struct sockaddr*>(&dest_address_v4),
00195 sizeof(dest_address));
00196 } else {
00197 connect_status = connect(new_socket,
00198 reinterpret_cast<const struct sockaddr*>(&dest_address),
00199 sizeof(dest_address));
00200 }
00201
00202
00203 if (connect_status != 0)
00204 {
00205 ERRLog(tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port()
00206 << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
00207
00208 return 0;
00209 }
00210
00211
00212 struct sockaddr_in6 own_address;
00213 if (v4_mode) {
00214 struct sockaddr_in own_address_v4;
00215 socklen_t own_address_len_v4 = sizeof(own_address_v4);
00216 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
00217 v4_to_v6(&own_address_v4, &own_address);
00218 } else {
00219 socklen_t own_address_len= sizeof(own_address);
00220 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
00221 }
00222
00223 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port()
00224 << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr,INET6_ADDRSTRLEN)
00225 << " port #" << ntohs(own_address.sin6_port));
00226
00227
00228
00229 assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
00230
00231
00232 if (assoc)
00233 {
00234 bool insert_success= false;
00235
00236 lock();
00237
00238 insert_success= connmap.insert(assoc);
00239
00240 unlock();
00241
00242 if (insert_success == true)
00243 {
00244 #ifdef _DEBUG
00245 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
00246 << " via socket " << new_socket);
00247
00248
00249 #endif
00250
00251
00252 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(assoc, tpparam.source, TPoverTCPMsg::start);
00253 if (newmsg)
00254 {
00255 newmsg->send_to(tpparam.source);
00256 return assoc;
00257 }
00258 else
00259 ERRCLog(tpparam.name,"get_connection_to: could not get memory for internal msg");
00260 }
00261 else
00262 {
00263
00264 ERRCLog(tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
00265 <<", port #" << addr.get_port() << " into connection map, aborting connection");
00266
00267
00268 close (new_socket);
00269 if (assoc)
00270 {
00271 delete assoc;
00272 assoc= 0;
00273 }
00274 return assoc;
00275 }
00276
00277 }
00278 }
00279 while (wait_cond(ts)!=ETIMEDOUT);
00280
00281 return assoc;
00282 }
00283
00284
00289 void
00290 TPoverTCP::terminate(const address& in_addr)
00291 {
00292 #ifndef _NO_LOGGING
00293 const char *const thisproc="terminate() - ";
00294 #endif
00295
00296 appladdress* addr = NULL;
00297 addr = dynamic_cast<appladdress*>(in_addr.copy());
00298
00299 if (!addr) return;
00300
00301
00302 AssocData* assoc = NULL;
00303
00304
00305
00306
00307
00308
00309
00310 lock();
00311 assoc= connmap.lookup(*addr);
00312 if (assoc)
00313 {
00314 EVLog(tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
00315
00316 if (!assoc->shutdown)
00317 {
00318 if (assoc->socketfd)
00319 {
00320
00321 if (shutdown(assoc->socketfd,SHUT_WR))
00322 {
00323 ERRLog(tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
00324 }
00325 else
00326 {
00327 EVLog(tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );
00328 }
00329 }
00330 assoc->shutdown= true;
00331 }
00332 else
00333 EVLog(tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
00334
00335 }
00336 else
00337 WLog(tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
00338
00339 stop_receiver_thread(assoc);
00340
00341
00342 unlock();
00343
00344 if (addr) delete addr;
00345 }
00346
00347
00355 void
00356 TPoverTCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
00357 {
00358 if (netmsg == NULL) {
00359 ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
00360 return;
00361 }
00362
00363 appladdress* addr = NULL;
00364 addr= dynamic_cast<appladdress*>(in_addr.copy());
00365
00366 if (!addr)
00367 {
00368 ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type " << (int) in_addr.get_type());
00369 return;
00370 }
00371
00372
00373 lock();
00374
00375
00376 sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
00377
00378 FastQueue* destqueue= 0;
00379
00380 if (it == senderthread_queuemap.end())
00381 {
00382
00383
00384
00385
00386 const AssocData* assoc = connmap.lookup(*addr);
00387
00388
00389
00390 if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
00391 {
00392
00393 FastQueue* sender_thread_queue= new FastQueue;
00394 create_new_sender_thread(sender_thread_queue);
00395
00396
00397
00398 senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
00399
00400 destqueue= sender_thread_queue;
00401 }
00402 }
00403 else
00404 {
00405 destqueue= it->second;
00406 }
00407
00408 unlock();
00409
00410
00411 if (destqueue)
00412 {
00413
00414
00415 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(netmsg,new appladdress(*addr));
00416 if (internalmsg)
00417 {
00418
00419 internalmsg->send(tpparam.source,destqueue);
00420 }
00421 }
00422 else
00423 {
00424 if (!use_existing_connection)
00425 WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
00426 else
00427 {
00428 DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
00429
00430 }
00431
00432 delete netmsg;
00433 }
00434
00435 if (addr) delete addr;
00436 }
00437
00446 void
00447 TPoverTCP::tcpsend(NetMsg* netmsg, appladdress* addr)
00448 {
00449 #ifndef _NO_LOGGING
00450 const char *const thisproc="sender - ";
00451 #endif
00452
00453
00454
00455 int result = TCP_SUCCESS;
00456 int saved_errno= 0;
00457 int ret= 0;
00458
00459
00460 AssocData* assoc = NULL;
00461
00462
00463
00464
00465 if (addr) {
00466 addr->convert_to_ipv6();
00467 check_send_args(*netmsg,*addr);
00468 }
00469 else
00470 {
00471 ERRCLog(tpparam.name, thisproc << "address pointer is NULL");
00472 result= TCP_SEND_FAILURE;
00473
00474 delete netmsg;
00475 delete addr;
00476
00477 throw TPErrorInternal();
00478 }
00479
00480
00481
00482
00483
00484
00485
00486
00487 assoc= get_connection_to(*addr);
00488
00489 if (assoc==NULL || assoc->socketfd<=0)
00490 {
00491 ERRCLog(tpparam.name, color[red] << thisproc << "no valid assoc/socket data - dropping packet");
00492
00493 delete netmsg;
00494 delete addr;
00495 return;
00496 }
00497
00498 if (assoc->shutdown)
00499 {
00500 Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
00501 delete netmsg;
00502 delete addr;
00503
00504 throw TPErrorSendFailed();
00505 }
00506
00507 uint32 msgsize= netmsg->get_size();
00508 #ifdef DEBUG_HARD
00509 cerr << thisproc << "message size=" << netmsg->get_size() << endl;
00510 #endif
00511
00512 const uint32 retry_send_max = 3;
00513 uint32 retry_count = 0;
00514
00515
00516 for (uint32 bytes_sent= 0;
00517 bytes_sent < msgsize;
00518 bytes_sent+= ret)
00519 {
00520
00521 #ifdef _DEBUG_HARD
00522 for (uint32 i=0;i<msgsize;i++)
00523 {
00524 cout << "send_buf: " << i << " : ";
00525 if ( isalnum(*(netmsg->get_buffer()+i)) )
00526 cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
00527 else
00528 cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
00529 cout << endl;
00530 }
00531
00532 cout << endl;
00533 cout << "bytes_sent: " << bytes_sent << endl;
00534 cout << "Message size: " << msgsize << endl;
00535 cout << "Send-Socket: " << assoc->socketfd << endl;
00536 cout << "pointer-Offset. " << netmsg->get_pos() << endl;
00537 cout << "vor send " << endl;
00538 #endif
00539
00540 retry_count= 0;
00541 do
00542 {
00543
00544 ret= ::send(assoc->socketfd,
00545 netmsg->get_buffer() + bytes_sent,
00546 msgsize - bytes_sent,
00547 MSG_NOSIGNAL);
00548
00549
00550
00551
00552
00553 if (ret < 0)
00554 {
00555 saved_errno= errno;
00556 switch(saved_errno)
00557 {
00558 case EAGAIN:
00559 case EINTR:
00560 case ENOBUFS:
00561 retry_count++;
00562 ERRLog(tpparam.name,"Temporary failure while calling send(): " << strerror(saved_errno) << ", errno: " << saved_errno
00563 << " - retry sending, retry #" << retry_count);
00564
00565 sleep(1);
00566 break;
00567
00568
00569 default:
00570 retry_count= retry_send_max;
00571 break;
00572 }
00573 }
00574 else
00575 break;
00576 }
00577 while(retry_count < retry_send_max);
00578
00579 if (ret < 0)
00580 {
00581
00582 result= TCP_SEND_FAILURE;
00583 break;
00584 }
00585 else
00586 {
00587 if (debug_pdu)
00588 {
00589 ostringstream hexdump;
00590 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
00591 DLog(tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
00592 }
00593 }
00594
00595
00596 }
00597
00598
00599 delete netmsg;
00600
00601
00602
00603
00604 if (result != TCP_SUCCESS)
00605 {
00606 ERRLog(tpparam.name, thisproc << "TCP error, returns " << ret << ", error : " << strerror(errno));
00607 delete addr;
00608
00609 throw TPErrorSendFailed(saved_errno);
00610
00611 }
00612 else
00613 EVLog(tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd << " to " << *addr);
00614
00615 if (!assoc) {
00616
00617
00618 ERRLog(tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str()
00619 << ", port #" << addr->get_port());
00620
00621 delete addr;
00622
00623 throw TPErrorUnreachable();
00624 }
00625
00626
00627 delete addr;
00628 }
00629
00630
00631
00632
00633
00634
00635 void
00636 TPoverTCP::sender_thread(void *argp)
00637 {
00638 #ifndef _NO_LOGGING
00639 const char *const methodname="senderthread - ";
00640 #endif
00641
00642 message* internal_thread_msg = NULL;
00643
00644 EVLog(tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
00645
00646 FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
00647 if (!fq)
00648 {
00649 ERRLog(tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
00650 return;
00651 }
00652
00653 bool terminate= false;
00654 TPoverTCPMsg* internalmsg= 0;
00655 while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
00656 {
00657 internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
00658
00659 if (internalmsg == 0)
00660 {
00661 ERRLog(tpparam.name, methodname << "received not an TPoverTCPMsg but a" << internal_thread_msg->get_type_name());
00662 }
00663 else
00664 if (internalmsg->get_msgtype() == TPoverTCPMsg::send_data)
00665 {
00666
00667 if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
00668 {
00669 try
00670 {
00671 tcpsend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
00672 }
00673 catch(TPErrorSendFailed& err)
00674 {
00675 ERRLog(tpparam.name, methodname << "TCP send call failed - " << err.what()
00676 << " cause: (" << err.get_reason() << ") " << strerror(err.get_reason()) );
00677 }
00678 catch(TPError& err)
00679 {
00680 ERRLog(tpparam.name, methodname << "TCP send call failed - reason: " << err.what());
00681 }
00682 catch(...)
00683 {
00684 ERRLog(tpparam.name, methodname << "TCP send call failed - unknown exception");
00685 }
00686 }
00687 else
00688 {
00689 ERRLog(tpparam.name, methodname << "problem with passed arguments references, they point to 0");
00690 }
00691 }
00692 else
00693 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
00694 {
00695 terminate= true;
00696 }
00697 }
00698
00699 EVLog(tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
00700 }
00701
00702
00710 void
00711 TPoverTCP::receiver_thread(void *argp)
00712 {
00713 #ifndef _NO_LOGGING
00714 const char *const methodname="receiver - ";
00715 #endif
00716
00717 receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
00718 const appladdress* peer_addr = NULL;
00719 const appladdress* own_addr = NULL;
00720 uint32 bytes_received = 0;
00721 TPMsg* tpmsg= NULL;
00722
00723
00724 if (receiver_thread_argp == 0)
00725 {
00726 ERRCLog(tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
00727
00728 return;
00729 }
00730 else
00731 {
00732
00733 receiver_thread_argp->terminated= false;
00734
00735 #ifdef _DEBUG
00736 DLog(tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
00737 #endif
00738 }
00739
00740 int conn_socket= 0;
00741 if (receiver_thread_argp->peer_assoc)
00742 {
00743
00744 conn_socket = receiver_thread_argp->peer_assoc->socketfd;
00745
00746 peer_addr= &receiver_thread_argp->peer_assoc->peer;
00747 own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
00748 }
00749 else
00750 {
00751 ERRCLog(tpparam.name, methodname << "No peer assoc available - pointer is NULL");
00752
00753 return;
00754 }
00755
00756 if (peer_addr == 0)
00757 {
00758 ERRCLog(tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
00759
00760 return;
00761 }
00762
00763 #ifdef _DEBUG
00764 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
00765 "Preparing to wait for data at socket "
00766 << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
00767 #endif
00768
00769 int ret= 0;
00770 uint32 msgcontentlength= 0;
00771 bool msgcontentlength_known= false;
00772 bool pdu_complete= false;
00773
00774
00776
00777
00778
00779
00780
00781
00782
00783
00784 const unsigned int number_poll_sockets= 1;
00785 struct pollfd poll_fd;
00786
00787 poll_fd.fd = conn_socket;
00788 poll_fd.events = POLLIN | POLLPRI;
00789 poll_fd.revents = 0;
00790
00791 int poll_status;
00792 bool recv_error= false;
00793
00794 NetMsg* netmsg= 0;
00795 NetMsg* remainbuf= 0;
00796 size_t buffer_bytes_left= 0;
00797 size_t trailingbytes= 0;
00798 bool skiprecv= false;
00799
00800
00801 while( receiver_thread_argp->sig_terminate == false )
00802 {
00803
00804 ret= 0;
00805 msgcontentlength= 0;
00806 msgcontentlength_known= false;
00807 pdu_complete= false;
00808 netmsg= 0;
00809
00810
00811 if (remainbuf != 0)
00812 {
00813 netmsg= remainbuf;
00814 remainbuf= 0;
00815 buffer_bytes_left= netmsg->get_size()-trailingbytes;
00816 bytes_received= trailingbytes;
00817 trailingbytes= 0;
00818 skiprecv= true;
00819 }
00820 else
00821 if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
00822 {
00823 buffer_bytes_left= netmsg->get_size();
00824 bytes_received= 0;
00825 skiprecv= false;
00826 }
00827 else
00828 {
00829 bytes_received= 0;
00830 buffer_bytes_left= 0;
00831 recv_error= true;
00832 }
00833
00834
00835
00836 while (!pdu_complete &&
00837 !recv_error &&
00838 !receiver_thread_argp->sig_terminate)
00839 {
00840 if (!skiprecv)
00841 {
00842
00843 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
00844
00845 if (receiver_thread_argp->sig_terminate)
00846 {
00847 Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
00848
00849 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
00850 if (myassoc->shutdown == false)
00851 {
00852 myassoc->shutdown= true;
00853 if (shutdown(myassoc->socketfd,SHUT_WR))
00854 {
00855 if ( errno != ENOTCONN )
00856 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
00857 }
00858 }
00859
00860 if (poll_status == 0)
00861 {
00862 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
00863 }
00864 }
00865
00866 if (poll_fd.revents & POLLERR)
00867 {
00868 if (errno == 0 || errno == EINTR)
00869 {
00870 EVLog(tpparam.name, methodname << "poll(): " << strerror(errno));
00871 }
00872 else
00873 {
00874 ERRCLog(tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
00875 recv_error= true;
00876 }
00877 }
00878
00879 if (poll_fd.revents & POLLHUP)
00880 {
00881 Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
00882 recv_error= true;
00883 }
00884
00885 if (poll_fd.revents & POLLNVAL)
00886 {
00887 EVLog(tpparam.name, methodname << "Poll Invalid request: fd not open");
00888 recv_error= true;
00889 }
00890
00891
00892 switch (poll_status)
00893 {
00894 case -1:
00895 if (errno == 0 || errno == EINTR)
00896 {
00897 EVLog(tpparam.name, methodname << "Poll status: " << strerror(errno));
00898 }
00899 else
00900 {
00901 ERRCLog(tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
00902 recv_error= true;
00903 }
00904
00905 continue;
00906 break;
00907
00908 case 0:
00909 #ifdef DEBUG_HARD
00910 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
00911 #endif
00912 continue;
00913 break;
00914
00915 default:
00916 #ifdef DEBUG_HARD
00917 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
00918 #endif
00919 break;
00920 }
00921
00922
00924 ret = recv(conn_socket,
00925 netmsg->get_buffer() + bytes_received,
00926 buffer_bytes_left,
00927 MSG_DONTWAIT);
00928
00929 if ( ret < 0 )
00930 {
00931 if (errno!=EAGAIN && errno!=EWOULDBLOCK)
00932 {
00933 ERRCLog(tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
00934 recv_error= true;
00935 continue;
00936 }
00937 else
00938 {
00939
00940 continue;
00941 }
00942 }
00943 else
00944 {
00945 if (ret == 0)
00946 {
00947
00948
00949 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
00950
00951 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
00952 if (myassoc->shutdown == false)
00953 {
00954 myassoc->shutdown= true;
00955 if (shutdown(myassoc->socketfd,SHUT_WR))
00956 {
00957 if ( errno != ENOTCONN )
00958 Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
00959 }
00960 }
00961
00962 recv_error= true;
00963 }
00964 else
00965 {
00966
00967 Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
00968
00969 bytes_received+= ret;
00970 buffer_bytes_left-= ret;
00971 }
00972 }
00973 }
00974
00975 if (buffer_bytes_left < 0)
00976 {
00977 recv_error= true;
00978 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
00979 }
00980
00981 if (!msgcontentlength_known)
00982 {
00983
00984 if (bytes_received >= common_header_length)
00985 {
00986
00987 if (getmsglength(*netmsg, msgcontentlength))
00988 msgcontentlength_known= true;
00989 else
00990 {
00991 ERRCLog(tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
00992
00993 ostringstream hexdumpstr;
00994 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
00995 DLog(tpparam.name,"dumping received bytes:" << hexdumpstr.str());
00996
00997
00998 msgcontentlength= 0;
00999 msgcontentlength_known= false;
01000 bytes_received= 0;
01001 pdu_complete= false;
01002 continue;
01003 }
01004 }
01005 }
01006
01007
01008 DLog(tpparam.name, "bytes_received-common_header_length=" << bytes_received-common_header_length << " msgcontentlength: " << msgcontentlength);
01009 if (msgcontentlength_known)
01010 {
01011 if (bytes_received-common_header_length >= msgcontentlength )
01012 {
01013 pdu_complete= true;
01014
01015 netmsg->truncate(common_header_length+msgcontentlength);
01016
01017
01018 if (bytes_received-common_header_length > msgcontentlength)
01019 {
01020 WLog(tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
01021 remainbuf= new NetMsg(NetMsg::max_size);
01022 trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
01023 bytes_received= common_header_length+msgcontentlength;
01024 memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
01025 }
01026 }
01027 else
01028 {
01029 skiprecv= false;
01030 }
01031 }
01032 }
01033
01034
01035
01036 if (ret == 0)
01037 {
01038 recv_error= false;
01039 }
01040
01041
01042 if (!recv_error && pdu_complete)
01043 {
01044
01045 tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
01046 if (tpmsg)
01047 {
01048 DLog(tpparam.name, methodname << "receipt of PDU now complete, sending msg#" << tpmsg->get_id()
01049 << " to module " << message::get_qaddr_name(tpparam.dest));
01050 }
01051
01052 debug_pdu=false;
01053
01054 if (debug_pdu)
01055 {
01056 ostringstream hexdump;
01057 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
01058 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
01059 }
01060
01061
01062
01063 if (!tpmsg
01064 || (!tpmsg->get_peeraddress())
01065 || (!tpmsg->send(message::qaddr_tp_over_tcp, tpparam.dest)))
01066 {
01067 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
01068 if (tpmsg) delete tpmsg;
01069 }
01070
01071
01072 }
01073 else
01074 {
01075 if (bytes_received>0)
01076 {
01077 Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ? "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
01078 }
01079
01080 if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
01081 {
01082 ostringstream hexdumpstr;
01083 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
01084 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str());
01085 }
01086
01087
01088 break;
01089
01090 }
01091
01092 }
01093
01094 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self()
01095 << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
01096
01097
01098 if (shutdown(conn_socket, SHUT_RD))
01099 {
01100 if ( errno != ENOTCONN )
01101 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
01102 }
01103
01104
01105 close(conn_socket);
01106
01107 receiver_thread_argp->terminated= true;
01108
01109 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
01110
01111 #ifdef _DEBUG
01112 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
01113 #endif
01114
01115 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(receiver_thread_argp->peer_assoc);
01116
01117 newmsg->send_to(tpparam.source);
01118
01119 }
01120
01121
01127 void
01128 TPoverTCP::stop_receiver_thread(AssocData* peer_assoc)
01129 {
01130
01131
01132
01133
01134
01135 if (peer_assoc == 0)
01136 return;
01137
01138 pthread_t thread_id= peer_assoc->thread_ID;
01139
01140
01141 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
01142 receiver_thread_arg_t* recv_thread_argp=
01143 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
01144 if (recv_thread_argp)
01145 {
01146 if (!recv_thread_argp->terminated)
01147 {
01148
01149 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
01150
01151
01152 recv_thread_argp->sig_terminate= true;
01153
01154 pthread_join(thread_id, 0);
01155
01156 return;
01157 }
01158 }
01159 else
01160 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
01161
01162 }
01163
01164
01170 void
01171 TPoverTCP::cleanup_receiver_thread(AssocData* peer_assoc)
01172 {
01173
01174
01175
01176
01177
01178 if (peer_assoc == 0)
01179 return;
01180
01181 pthread_t thread_id= peer_assoc->thread_ID;
01182
01183
01184 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
01185 receiver_thread_arg_t* recv_thread_argp=
01186 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
01187 if (recv_thread_argp)
01188 {
01189 if (!recv_thread_argp->terminated)
01190 {
01191
01192 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
01193 return;
01194 }
01195 else
01196 {
01197 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
01198
01199
01200 recv_thread_argmap.erase(recv_thread_arg_iter);
01201
01202
01203 delete recv_thread_argp;
01204 }
01205 }
01206
01207
01208
01209
01210
01211 terminate_sender_thread(peer_assoc);
01212
01213
01214
01215 connmap.erase(peer_assoc);
01216
01217
01218
01219 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
01220 }
01221
01222
01223
01224
01225
01226
01227 void
01228 TPoverTCP::terminate_sender_thread(const AssocData* assoc)
01229 {
01230 if (assoc == 0)
01231 {
01232 Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
01233 return;
01234 }
01235
01236 sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
01237
01238 if (it != senderthread_queuemap.end())
01239 {
01240 FastQueue* destqueue= it->second;
01241 if (destqueue)
01242 {
01243 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(assoc,tpparam.source,TPoverTCPMsg::stop);
01244 if (internalmsg)
01245 {
01246
01247 internalmsg->send(tpparam.source,destqueue);
01248 }
01249 }
01250 else
01251 {
01252 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
01253 }
01254
01255 senderthread_queuemap.erase(it);
01256 }
01257 }
01258
01259
01260
01261
01262
01263 void
01264 TPoverTCP::terminate_all_threads()
01265 {
01266 AssocData* assoc= 0;
01267 receiver_thread_arg_t* terminate_argp;
01268
01269 for (recv_thread_argmap_t::iterator terminate_iterator= recv_thread_argmap.begin();
01270 terminate_iterator != recv_thread_argmap.end();
01271 terminate_iterator++)
01272 {
01273 if ( (terminate_argp= terminate_iterator->second) != 0)
01274 {
01275
01276 assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
01277
01278 if (terminate_argp->terminated == false)
01279 {
01280 terminate_argp->sig_terminate= true;
01281
01282 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
01283 "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
01284
01285 pthread_join(terminate_iterator->first, 0);
01286
01287 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first << "> is terminated");
01288 }
01289 else
01290 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
01291 "Receiver thread <" << terminate_iterator->first << "> already terminated");
01292
01293
01294 delete terminate_argp;
01295
01296
01297 terminate_sender_thread(assoc);
01298
01299 connmap.erase(assoc);
01300
01301 }
01302 }
01303 }
01304
01305
01313 void*
01314 TPoverTCP::sender_thread_starter(void *argp)
01315 {
01316 sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
01317
01318
01319
01320
01321 if (sargp != 0 && sargp->instance != 0)
01322 {
01323
01324 sargp->instance->sender_thread(sargp->sender_thread_queue);
01325
01326
01327
01328
01329 delete sargp;
01330 }
01331 else
01332 {
01333 Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
01334 }
01335 return 0;
01336 }
01337
01338
01339
01340
01348 void*
01349 TPoverTCP::receiver_thread_starter(void *argp)
01350 {
01351 receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
01352
01353 if (rargp != 0 && rargp->instance != 0)
01354 {
01355
01356 rargp->instance->receiver_thread(rargp->rtargp);
01357
01358
01359 delete rargp;
01360 }
01361 else
01362 {
01363 Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
01364 }
01365 return 0;
01366 }
01367
01368
01369 void
01370 TPoverTCP::create_new_sender_thread(FastQueue* senderfqueue)
01371 {
01372 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
01373
01374 pthread_t senderthreadid;
01375
01376 int pthread_status= pthread_create(&senderthreadid,
01377 NULL,
01378
01379 TPoverTCP::sender_thread_starter,
01380 new sender_thread_start_arg_t(this,senderfqueue));
01381 if (pthread_status)
01382 {
01383 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
01384
01385 delete senderfqueue;
01386 }
01387 }
01388
01389
01390 void
01391 TPoverTCP::create_new_receiver_thread(AssocData* peer_assoc)
01392 {
01393 receiver_thread_arg_t* argp=
01394 new(nothrow) receiver_thread_arg(peer_assoc);
01395
01396 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
01397
01398
01399 int pthread_status= pthread_create(&peer_assoc->thread_ID,
01400 NULL,
01401
01402 receiver_thread_starter,
01403 new(nothrow) receiver_thread_start_arg_t(this,argp));
01404 if (pthread_status)
01405 {
01406 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
01407
01408 delete argp;
01409 }
01410 else
01411 {
01412 lock();
01413
01414
01415
01416 pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
01417 recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
01418 if (tmpinsiterator.second == false)
01419 {
01420 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
01421 }
01422 unlock();
01423 }
01424 }
01425
01426
01434 void*
01435 TPoverTCP::master_listener_thread_starter(void *argp)
01436 {
01437
01438 if (argp != 0)
01439 {
01440 (static_cast<TPoverTCP*>(argp))->master_listener_thread();
01441 }
01442 return 0;
01443 }
01444
01445
01446
01452 void
01453 TPoverTCP::master_listener_thread()
01454 {
01455
01456 struct sockaddr_in6 own_address_v6;
01457 own_address_v6.sin6_family = AF_INET6;
01458 own_address_v6.sin6_flowinfo= 0;
01459 own_address_v6.sin6_port = htons(tpparam.port);
01460
01461 own_address_v6.sin6_addr = in6addr_any;
01462 own_address_v6.sin6_scope_id= 0;
01463
01464
01465 struct sockaddr_in own_address_v4;
01466 own_address_v4.sin_family = AF_INET;
01467 own_address_v4.sin_port = htons(tpparam.port);
01468
01469 own_address_v4.sin_addr.s_addr = INADDR_ANY;
01470
01471
01472 int master_listener_socket= socket( AF_INET6, SOCK_STREAM, IPPROTO_TCP);
01473 if (master_listener_socket!=-1) v4_mode = false;
01474 if (master_listener_socket == -1) {
01475 master_listener_socket= socket( AF_INET, SOCK_STREAM, IPPROTO_TCP);
01476 if (master_listener_socket!=-1) v4_mode = true;
01477 }
01478 if (master_listener_socket == -1)
01479 {
01480 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
01481 return;
01482 }
01483
01484
01485 int nodelayflag= 1;
01486 int status= setsockopt(master_listener_socket,
01487 IPPROTO_TCP,
01488 TCP_NODELAY,
01489 &nodelayflag,
01490 sizeof(nodelayflag));
01491 if (status)
01492 {
01493 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
01494 }
01495
01496
01497 int socketreuseflag= 1;
01498 status= setsockopt(master_listener_socket,
01499 SOL_SOCKET,
01500 SO_REUSEADDR,
01501 (const char *) &socketreuseflag,
01502 sizeof(socketreuseflag));
01503 if (status)
01504 {
01505 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
01506 }
01507
01508
01509
01510 int bind_status = bind(master_listener_socket, v4_mode ?
01511 reinterpret_cast<struct sockaddr *>(&own_address_v4) :
01512 reinterpret_cast<struct sockaddr *>(&own_address_v6),
01513 v4_mode ? sizeof(own_address_v4) : sizeof(own_address_v6));
01514 if (bind_status)
01515 {
01516 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to "
01517 << (v4_mode ? inet_ntop(AF_INET, &own_address_v4.sin_addr, in_addrstr, INET_ADDRSTRLEN) :
01518 inet_ntop(AF_INET6, &own_address_v6.sin6_addr, in6_addrstr, INET6_ADDRSTRLEN))
01519 << " port " << tpparam.port << " failed, error: " << strerror(errno));
01520 return;
01521 }
01522
01523
01524
01525 int listen_status = listen(master_listener_socket, max_listen_queue_size);
01526 if (listen_status)
01527 {
01528 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
01529 << " failed, error: " << strerror(errno));
01530 return;
01531 }
01532 else
01533 {
01534 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
01535 }
01536
01537
01538 fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
01539
01540
01541 struct pollfd poll_fd;
01542 poll_fd.fd = master_listener_socket;
01543 poll_fd.events = POLLIN | POLLPRI;
01544 poll_fd.revents = 0;
01545
01546
01547
01548
01549
01550
01551 bool terminate = false;
01552
01553 state_t currstate= get_state();
01554 int poll_status= 0;
01555 const unsigned int number_poll_sockets= 1;
01556 struct sockaddr_in6 peer_address;
01557 socklen_t peer_address_len;
01558 int conn_socket;
01559
01560
01561 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
01562 {
01563
01564
01565 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
01566 if (poll_fd.revents & POLLERR)
01567 {
01568 if (errno != EINTR)
01569 {
01570 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
01571 "Poll caused error " << strerror(errno) << " - indicated by revents");
01572 }
01573 else
01574 {
01575 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
01576 }
01577
01578 }
01579 if (poll_fd.revents & POLLHUP)
01580 {
01581 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
01582 return;
01583 }
01584 if (poll_fd.revents & POLLNVAL)
01585 {
01586 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
01587 return;
01588 }
01589
01590 switch (poll_status)
01591 {
01592 case -1:
01593 if (errno != EINTR)
01594 {
01595 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
01596 }
01597 else
01598 {
01599 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
01600 }
01601
01602 break;
01603
01604 case 0:
01605 #ifdef DEBUG_HARD
01606 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
01607 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
01608 #endif
01609 currstate= get_state();
01610 continue;
01611 break;
01612
01613 default:
01614 #ifdef DEBUG_HARD
01615 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
01616 #endif
01617 break;
01618 }
01619
01620
01621
01622
01623 peer_address_len= sizeof(peer_address);
01624 conn_socket = accept (master_listener_socket,
01625 reinterpret_cast<struct sockaddr *>(&peer_address),
01626 &peer_address_len);
01627 if (conn_socket == -1)
01628 {
01629 if (errno != EWOULDBLOCK && errno != EAGAIN)
01630 {
01631 Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
01632 << " failed, error: " << strerror(errno));
01633 return;
01634 }
01635 }
01636 else
01637 {
01638
01639 AssocData* peer_assoc = NULL;
01640 appladdress addr(peer_address, IPPROTO_TCP);
01641
01642 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str()
01643 << " port #" << addr.get_port());
01644
01645 struct sockaddr_in6 own_address;
01646 if (v4_mode) {
01647 struct sockaddr_in own_address_v4;
01648 socklen_t own_address_len_v4 = sizeof(own_address_v4);
01649 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
01650 v4_to_v6(&own_address_v4, &own_address);
01651 } else {
01652 socklen_t own_address_len= sizeof(own_address);
01653 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
01654 }
01655
01656
01657
01658 peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
01659
01660 bool insert_success= false;
01661 if (peer_assoc)
01662 {
01663
01664 lock();
01665 insert_success= connmap.insert(peer_assoc);
01666
01667 unlock();
01668 }
01669
01670
01671 if (insert_success == false)
01672 {
01673 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
01674 << ", " << addr.get_ip_str() << ", port #"
01675 << addr.get_port() << " into connection map, aborting connection...");
01676
01677
01678 close (conn_socket);
01679 if (peer_assoc)
01680 {
01681 delete peer_assoc;
01682 peer_assoc= 0;
01683 }
01684 return;
01685
01686 }
01687
01688
01689 create_new_receiver_thread(peer_assoc);
01690 }
01691
01692
01693 currstate= get_state();
01694
01695 }
01696 return;
01697 }
01698
01699
01700 TPoverTCP::~TPoverTCP()
01701 {
01702 init= false;
01703
01704 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Destructor called");
01705
01706 QueueManager::instance()->unregister_queue(tpparam.source);
01707 }
01708
01719 void
01720 TPoverTCP::main_loop(uint32 nr)
01721 {
01722
01723
01724 FastQueue* fq = get_fqueue();
01725 if (!fq)
01726 {
01727 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
01728 return;
01729 }
01730
01731 QueueManager::instance()->register_queue(fq,tpparam.source);
01732
01733
01734 pthread_t master_listener_thread_ID;
01735 int pthread_status= pthread_create(&master_listener_thread_ID,
01736 NULL,
01737
01738 master_listener_thread_starter,
01739 this);
01740 if (pthread_status)
01741 {
01742 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
01743 "New master listener thread could not be created: " << strerror(pthread_status));
01744 }
01745 else
01746 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
01747
01748
01749
01750 timespec wait_interval= { 0, 250000000L };
01751 message* internal_thread_msg = NULL;
01752 state_t currstate= get_state();
01753
01754
01755 while( currstate!=STATE_ABORT && currstate!=STATE_STOP )
01756 {
01757
01758 if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
01759 {
01760 TPoverTCPMsg* internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
01761 if (internalmsg)
01762 {
01763 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
01764 {
01765
01766 AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
01767 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
01768 lock();
01769 cleanup_receiver_thread( assocd );
01770 unlock();
01771 }
01772 else
01773 if (internalmsg->get_msgtype() == TPoverTCPMsg::start)
01774 {
01775
01776 create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
01777 }
01778 else
01779 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
01780
01781 delete internalmsg;
01782 }
01783 else
01784 {
01785 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
01786 << internal_thread_msg->get_source());
01787 }
01788 }
01789
01790
01791 currstate= get_state();
01792 }
01793
01794 if (currstate==STATE_STOP)
01795 {
01796
01797 Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
01798 }
01799
01800
01801 fq->shutdown();
01802
01803 terminate_all_threads();
01804 }
01805
01806 }