00001
00032 #include "linden_common.h"
00033
00034 #if LL_WINDOWS
00035
00036 #include <process.h>
00037
00038 #else
00039
00040 #if LL_LINUX
00041 #include <dlfcn.h>
00042 #endif
00043 #include <sys/types.h>
00044 #include <sys/socket.h>
00045 #include <netinet/in.h>
00046
00047 #endif
00048
00049
00050 #if !defined(USE_CIRCUIT_LIST)
00051 #include <algorithm>
00052 #endif
00053 #include <sstream>
00054 #include <iterator>
00055 #include <stack>
00056
00057 #include "llcircuit.h"
00058
00059 #include "message.h"
00060 #include "llrand.h"
00061 #include "llstl.h"
00062 #include "lltransfermanager.h"
00063
00064 const F32 PING_INTERVAL = 5.f;
00065 const S32 PING_START_BLOCK = 3;
00066 const S32 PING_RELEASE_BLOCK = 2;
00067
00068 const F32 TARGET_PERIOD_LENGTH = 5.f;
00069 const F32 LL_DUPLICATE_SUPPRESSION_TIMEOUT = 60.f;
00070
00071
00072 LLCircuitData::LLCircuitData(const LLHost &host, TPACKETID in_id)
00073 : mHost (host),
00074 mWrapID(0),
00075 mPacketsOutID(0),
00076 mPacketsInID(in_id),
00077 mHighestPacketID(in_id),
00078 mTrusted(FALSE),
00079 mbAllowTimeout(TRUE),
00080 mbAlive(TRUE),
00081 mBlocked(FALSE),
00082 mPingTime(0.0),
00083 mLastPingSendTime(0.0),
00084 mLastPingReceivedTime(0.0),
00085 mNextPingSendTime(0.0),
00086 mPingsInTransit(0),
00087 mLastPingID(0),
00088 mPingDelay(INITIAL_PING_VALUE_MSEC),
00089 mPingDelayAveraged((F32)INITIAL_PING_VALUE_MSEC),
00090 mUnackedPacketCount(0),
00091 mUnackedPacketBytes(0),
00092 mLocalEndPointID(),
00093 mPacketsOut(0),
00094 mPacketsIn(0),
00095 mPacketsLost(0),
00096 mBytesIn(0),
00097 mBytesOut(0),
00098 mLastPeriodLength(-1.f),
00099 mBytesInLastPeriod(0),
00100 mBytesOutLastPeriod(0),
00101 mBytesInThisPeriod(0),
00102 mBytesOutThisPeriod(0),
00103 mPeakBPSIn(0),
00104 mPeakBPSOut(0),
00105 mPeriodTime(0.0),
00106 mExistenceTimer(),
00107 mCurrentResendCount(0)
00108 {
00109
00110
00111 F64 mt_sec = LLMessageSystem::getMessageTimeSeconds(TRUE);
00112 F32 distribution_offset = ll_frand();
00113
00114 mPingTime = mt_sec;
00115 mLastPingSendTime = mt_sec + PING_INTERVAL * distribution_offset;
00116 mLastPingReceivedTime = mt_sec;
00117 mNextPingSendTime = mLastPingSendTime + 0.95*PING_INTERVAL + ll_frand(0.1f*PING_INTERVAL);
00118 mPeriodTime = mt_sec;
00119
00120 mTimeoutCallback = NULL;
00121 mTimeoutUserData = NULL;
00122
00123 mLocalEndPointID.generate();
00124 }
00125
00126
00127 LLCircuitData::~LLCircuitData()
00128 {
00129 LLReliablePacket *packetp = NULL;
00130
00131
00132 gTransferManager.cleanupConnection(mHost);
00133
00134
00135 std::vector<TPACKETID> doomed;
00136 reliable_iter iter;
00137 reliable_iter end = mUnackedPackets.end();
00138 for(iter = mUnackedPackets.begin(); iter != end; ++iter)
00139 {
00140 packetp = iter->second;
00141 gMessageSystem->mFailedResendPackets++;
00142 if(gMessageSystem->mVerboseLog)
00143 {
00144 doomed.push_back(packetp->mPacketID);
00145 }
00146 if (packetp->mCallback)
00147 {
00148 packetp->mCallback(packetp->mCallbackData,LL_ERR_CIRCUIT_GONE);
00149 }
00150
00151
00152 mUnackedPacketCount--;
00153 mUnackedPacketBytes -= packetp->mBufferLength;
00154
00155 delete packetp;
00156 }
00157
00158
00159 end = mFinalRetryPackets.end();
00160 for(iter = mFinalRetryPackets.begin(); iter != end; ++iter)
00161 {
00162 packetp = iter->second;
00163 gMessageSystem->mFailedResendPackets++;
00164 if(gMessageSystem->mVerboseLog)
00165 {
00166 doomed.push_back(packetp->mPacketID);
00167 }
00168 if (packetp->mCallback)
00169 {
00170 packetp->mCallback(packetp->mCallbackData,LL_ERR_CIRCUIT_GONE);
00171 }
00172
00173
00174 mUnackedPacketCount--;
00175 mUnackedPacketBytes -= packetp->mBufferLength;
00176
00177 delete packetp;
00178 }
00179
00180
00181 if(gMessageSystem->mVerboseLog && !doomed.empty())
00182 {
00183 std::ostringstream str;
00184 std::ostream_iterator<TPACKETID> append(str, " ");
00185 str << "MSG: -> " << mHost << "\tABORTING RELIABLE:\t";
00186 std::copy(doomed.begin(), doomed.end(), append);
00187 llinfos << str.str().c_str() << llendl;
00188 }
00189 }
00190
00191
00192 void LLCircuitData::ackReliablePacket(TPACKETID packet_num)
00193 {
00194 reliable_iter iter;
00195 LLReliablePacket *packetp;
00196
00197 iter = mUnackedPackets.find(packet_num);
00198 if (iter != mUnackedPackets.end())
00199 {
00200 packetp = iter->second;
00201
00202 if(gMessageSystem->mVerboseLog)
00203 {
00204 std::ostringstream str;
00205 str << "MSG: <- " << packetp->mHost << "\tRELIABLE ACKED:\t"
00206 << packetp->mPacketID;
00207 llinfos << str.str().c_str() << llendl;
00208 }
00209 if (packetp->mCallback)
00210 {
00211 if (packetp->mTimeout < 0.f)
00212 {
00213 packetp->mCallback(packetp->mCallbackData,LL_ERR_TCP_TIMEOUT);
00214 }
00215 else
00216 {
00217 packetp->mCallback(packetp->mCallbackData,LL_ERR_NOERR);
00218 }
00219 }
00220
00221
00222 mUnackedPacketCount--;
00223 mUnackedPacketBytes -= packetp->mBufferLength;
00224
00225
00226 delete packetp;
00227 mUnackedPackets.erase(iter);
00228 return;
00229 }
00230
00231 iter = mFinalRetryPackets.find(packet_num);
00232 if (iter != mFinalRetryPackets.end())
00233 {
00234 packetp = iter->second;
00235
00236 if(gMessageSystem->mVerboseLog)
00237 {
00238 std::ostringstream str;
00239 str << "MSG: <- " << packetp->mHost << "\tRELIABLE ACKED:\t"
00240 << packetp->mPacketID;
00241 llinfos << str.str().c_str() << llendl;
00242 }
00243 if (packetp->mCallback)
00244 {
00245 if (packetp->mTimeout < 0.f)
00246 {
00247 packetp->mCallback(packetp->mCallbackData,LL_ERR_TCP_TIMEOUT);
00248 }
00249 else
00250 {
00251 packetp->mCallback(packetp->mCallbackData,LL_ERR_NOERR);
00252 }
00253 }
00254
00255
00256 mUnackedPacketCount--;
00257 mUnackedPacketBytes -= packetp->mBufferLength;
00258
00259
00260 delete packetp;
00261 mFinalRetryPackets.erase(iter);
00262 }
00263 else
00264 {
00265
00266
00267 }
00268 }
00269
00270
00271
00272 S32 LLCircuitData::resendUnackedPackets(const F64 now)
00273 {
00274 S32 resent_packets = 0;
00275 LLReliablePacket *packetp;
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285 reliable_iter iter;
00286 BOOL have_resend_overflow = FALSE;
00287 for (iter = mUnackedPackets.begin(); iter != mUnackedPackets.end();)
00288 {
00289 packetp = iter->second;
00290
00291
00292 if (!have_resend_overflow)
00293 {
00294 have_resend_overflow = mThrottles.checkOverflow(TC_RESEND, 0);
00295 }
00296
00297 if (have_resend_overflow)
00298 {
00299
00300
00301
00302
00303 if (mUnackedPacketBytes > 512000)
00304 {
00305 if (now > packetp->mExpirationTime)
00306 {
00307
00308 packetp->mRetries = 0;
00309
00310 mUnackedPackets.erase(iter++);
00311 mFinalRetryPackets[packetp->mPacketID] = packetp;
00312 }
00313 else
00314 {
00315 ++iter;
00316 }
00317
00318 continue;
00319 }
00320
00321 if (mUnackedPacketBytes > 256000 && !(getPacketsOut() % 1024))
00322 {
00323
00324 llwarns << mHost << " has " << mUnackedPacketBytes
00325 << " bytes of reliable messages waiting" << llendl;
00326 }
00327
00328 break;
00329 }
00330
00331 if (now > packetp->mExpirationTime)
00332 {
00333 packetp->mRetries--;
00334
00335
00336 mCurrentResendCount++;
00337
00338 gMessageSystem->mResentPackets++;
00339
00340 if(gMessageSystem->mVerboseLog)
00341 {
00342 std::ostringstream str;
00343 str << "MSG: -> " << packetp->mHost
00344 << "\tRESENDING RELIABLE:\t" << packetp->mPacketID;
00345 llinfos << str.str().c_str() << llendl;
00346 }
00347
00348 packetp->mBuffer[0] |= LL_RESENT_FLAG;
00349
00350 gMessageSystem->mPacketRing.sendPacket(packetp->mSocket,
00351 (char *)packetp->mBuffer, packetp->mBufferLength,
00352 packetp->mHost);
00353
00354 mThrottles.throttleOverflow(TC_RESEND, packetp->mBufferLength * 8.f);
00355
00356
00357 if (packetp->mPingBasedRetry)
00358 {
00359 packetp->mExpirationTime = now + llmax(LL_MINIMUM_RELIABLE_TIMEOUT_SECONDS, (LL_RELIABLE_TIMEOUT_FACTOR * getPingDelayAveraged()));
00360 }
00361 else
00362 {
00363
00364 packetp->mExpirationTime = now + packetp->mTimeout;
00365 }
00366
00367 if (!packetp->mRetries)
00368 {
00369
00370 mUnackedPackets.erase(iter++);
00371 mFinalRetryPackets[packetp->mPacketID] = packetp;
00372 }
00373 else
00374 {
00375
00376 ++iter;
00377 }
00378 resent_packets++;
00379 }
00380 else
00381 {
00382
00383 ++iter;
00384 }
00385 }
00386
00387
00388 for (iter = mFinalRetryPackets.begin(); iter != mFinalRetryPackets.end();)
00389 {
00390 packetp = iter->second;
00391 if (now > packetp->mExpirationTime)
00392 {
00393
00394
00395
00396
00397
00398
00399 gMessageSystem->mFailedResendPackets++;
00400
00401 if(gMessageSystem->mVerboseLog)
00402 {
00403 std::ostringstream str;
00404 str << "MSG: -> " << packetp->mHost << "\tABORTING RELIABLE:\t"
00405 << packetp->mPacketID;
00406 llinfos << str.str().c_str() << llendl;
00407 }
00408
00409 if (packetp->mCallback)
00410 {
00411 packetp->mCallback(packetp->mCallbackData,LL_ERR_TCP_TIMEOUT);
00412 }
00413
00414
00415 mUnackedPacketCount--;
00416 mUnackedPacketBytes -= packetp->mBufferLength;
00417
00418 mFinalRetryPackets.erase(iter++);
00419 delete packetp;
00420 }
00421 else
00422 {
00423 ++iter;
00424 }
00425 }
00426
00427 return mUnackedPacketCount;
00428 }
00429
00430
00431 LLCircuit::LLCircuit() : mLastCircuit(NULL)
00432 {
00433 }
00434
00435 LLCircuit::~LLCircuit()
00436 {
00437
00438 std::for_each(mCircuitData.begin(),
00439 mCircuitData.end(),
00440 llcompose1(
00441 DeletePointerFunctor<LLCircuitData>(),
00442 llselect2nd<circuit_data_map::value_type>()));
00443 }
00444
00445 LLCircuitData *LLCircuit::addCircuitData(const LLHost &host, TPACKETID in_id)
00446 {
00447
00448 llinfos << "LLCircuit::addCircuitData for " << host << llendl;
00449 LLCircuitData *tempp = new LLCircuitData(host, in_id);
00450 mCircuitData.insert(circuit_data_map::value_type(host, tempp));
00451 mPingSet.insert(tempp);
00452
00453 mLastCircuit = tempp;
00454 return tempp;
00455 }
00456
00457 void LLCircuit::removeCircuitData(const LLHost &host)
00458 {
00459 llinfos << "LLCircuit::removeCircuitData for " << host << llendl;
00460 mLastCircuit = NULL;
00461 circuit_data_map::iterator it = mCircuitData.find(host);
00462 if(it != mCircuitData.end())
00463 {
00464 LLCircuitData *cdp = it->second;
00465 mCircuitData.erase(it);
00466
00467 LLCircuit::ping_set_t::iterator psit = mPingSet.find(cdp);
00468 if (psit != mPingSet.end())
00469 {
00470 mPingSet.erase(psit);
00471 }
00472 else
00473 {
00474 llwarns << "Couldn't find entry for next ping in ping set!" << llendl;
00475 }
00476
00477
00478 mUnackedCircuitMap.erase(host);
00479 mSendAckMap.erase(host);
00480 delete cdp;
00481 }
00482
00483
00484
00485
00486
00487
00488 mLastCircuit = NULL;
00489 }
00490
00491 void LLCircuitData::setAlive(BOOL b_alive)
00492 {
00493 if (mbAlive != b_alive)
00494 {
00495 mPacketsOutID = 0;
00496 mPacketsInID = 0;
00497 mbAlive = b_alive;
00498 }
00499 if (b_alive)
00500 {
00501 mLastPingReceivedTime = LLMessageSystem::getMessageTimeSeconds();
00502 mPingsInTransit = 0;
00503 mBlocked = FALSE;
00504 }
00505 }
00506
00507
00508 void LLCircuitData::setAllowTimeout(BOOL allow)
00509 {
00510 mbAllowTimeout = allow;
00511
00512 if (allow)
00513 {
00514
00515
00516 setAlive(TRUE);
00517 }
00518 }
00519
00520
00521
00522 void LLCircuitData::checkPeriodTime()
00523 {
00524 F64 mt_sec = LLMessageSystem::getMessageTimeSeconds();
00525 F64 period_length = mt_sec - mPeriodTime;
00526 if ( period_length > TARGET_PERIOD_LENGTH)
00527 {
00528 F32 bps_in = (F32)(mBytesInThisPeriod * 8.f / period_length);
00529 if (bps_in > mPeakBPSIn)
00530 {
00531 mPeakBPSIn = bps_in;
00532 }
00533
00534 F32 bps_out = (F32)(mBytesOutThisPeriod * 8.f / period_length);
00535 if (bps_out > mPeakBPSOut)
00536 {
00537 mPeakBPSOut = bps_out;
00538 }
00539
00540 mBytesInLastPeriod = mBytesInThisPeriod;
00541 mBytesOutLastPeriod = mBytesOutThisPeriod;
00542 mBytesInThisPeriod = 0;
00543 mBytesOutThisPeriod = 0;
00544 mLastPeriodLength = (F32)period_length;
00545
00546 mPeriodTime = mt_sec;
00547 }
00548 }
00549
00550
00551 void LLCircuitData::addBytesIn(S32 bytes)
00552 {
00553 mBytesIn += bytes;
00554 mBytesInThisPeriod += bytes;
00555 }
00556
00557
00558 void LLCircuitData::addBytesOut(S32 bytes)
00559 {
00560 mBytesOut += bytes;
00561 mBytesOutThisPeriod += bytes;
00562 }
00563
00564
00565 void LLCircuitData::addReliablePacket(S32 mSocket, U8 *buf_ptr, S32 buf_len, LLReliablePacketParams *params)
00566 {
00567 LLReliablePacket *packet_info;
00568
00569 packet_info = new LLReliablePacket(mSocket, buf_ptr, buf_len, params);
00570
00571 mUnackedPacketCount++;
00572 mUnackedPacketBytes += packet_info->mBufferLength;
00573
00574 if (params && params->mRetries)
00575 {
00576 mUnackedPackets[packet_info->mPacketID] = packet_info;
00577 }
00578 else
00579 {
00580 mFinalRetryPackets[packet_info->mPacketID] = packet_info;
00581 }
00582 }
00583
00584
00585 void LLCircuit::resendUnackedPackets(S32& unacked_list_length, S32& unacked_list_size)
00586 {
00587 F64 now = LLMessageSystem::getMessageTimeSeconds();
00588 unacked_list_length = 0;
00589 unacked_list_size = 0;
00590
00591 LLCircuitData* circ;
00592 circuit_data_map::iterator end = mUnackedCircuitMap.end();
00593 for(circuit_data_map::iterator it = mUnackedCircuitMap.begin(); it != end; ++it)
00594 {
00595 circ = (*it).second;
00596 unacked_list_length += circ->resendUnackedPackets(now);
00597 unacked_list_size += circ->getUnackedPacketBytes();
00598 }
00599 }
00600
00601
00602 BOOL LLCircuitData::isDuplicateResend(TPACKETID packetnum)
00603 {
00604 return (mRecentlyReceivedReliablePackets.find(packetnum) != mRecentlyReceivedReliablePackets.end());
00605 }
00606
00607
00608 void LLCircuit::dumpResends()
00609 {
00610 circuit_data_map::iterator end = mCircuitData.end();
00611 for(circuit_data_map::iterator it = mCircuitData.begin(); it != end; ++it)
00612 {
00613 (*it).second->dumpResendCountAndReset();
00614 }
00615 }
00616
00617 LLCircuitData* LLCircuit::findCircuit(const LLHost& host) const
00618 {
00619
00620 if (mLastCircuit && (mLastCircuit->mHost == host))
00621 {
00622 return mLastCircuit;
00623 }
00624
00625 circuit_data_map::const_iterator it = mCircuitData.find(host);
00626 if(it == mCircuitData.end())
00627 {
00628 return NULL;
00629 }
00630 mLastCircuit = it->second;
00631 return mLastCircuit;
00632 }
00633
00634
00635 BOOL LLCircuit::isCircuitAlive(const LLHost& host) const
00636 {
00637 LLCircuitData *cdp = findCircuit(host);
00638 if(cdp)
00639 {
00640 return cdp->mbAlive;
00641 }
00642
00643 return FALSE;
00644 }
00645
00646 void LLCircuitData::setTimeoutCallback(void (*callback_func)(const LLHost &host, void *user_data), void *user_data)
00647 {
00648 mTimeoutCallback = callback_func;
00649 mTimeoutUserData = user_data;
00650 }
00651
00652 void LLCircuitData::checkPacketInID(TPACKETID id, BOOL receive_resent)
00653 {
00654
00655
00656 F32 delta = (F32)mHighestPacketID - (F32)id;
00657 if (delta > (0.5f*LL_MAX_OUT_PACKET_ID))
00658 {
00659
00660 mHighestPacketID = id;
00661 }
00662 else if (delta < (-0.5f*LL_MAX_OUT_PACKET_ID))
00663 {
00664
00665 }
00666 else
00667 {
00668 mHighestPacketID = llmax(mHighestPacketID, id);
00669 }
00670
00671
00672
00673 if (0 == mPacketsIn)
00674 {
00675
00676 mPacketsIn++;
00677 setPacketInID((id + 1) % LL_MAX_OUT_PACKET_ID);
00678
00679 return;
00680 }
00681
00682 mPacketsIn++;
00683
00684
00685
00686 if ((mPacketsInID == id))
00687 {
00688
00689 mPacketsInID++;
00690 mPacketsInID = (mPacketsInID) % LL_MAX_OUT_PACKET_ID;
00691 }
00692 else if (id < mWrapID)
00693 {
00694
00695
00696
00697
00698
00699 }
00700 else
00701 {
00702
00703
00704
00705
00706
00707 if (mPotentialLostPackets.find(id) != mPotentialLostPackets.end())
00708 {
00709 if(gMessageSystem->mVerboseLog)
00710 {
00711 std::ostringstream str;
00712 str << "MSG: <- " << mHost << "\tRECOVERING LOST:\t" << id;
00713 llinfos << str.str().c_str() << llendl;
00714 }
00715
00716 mPotentialLostPackets.erase(id);
00717 }
00718 else if (!receive_resent)
00719 {
00720 U64 time = LLMessageSystem::getMessageTimeUsecs();
00721 TPACKETID index = mPacketsInID;
00722 S32 gap_count = 0;
00723 if ((index < id) && ((id - index) < 16))
00724 {
00725 while (index != id)
00726 {
00727 if(gMessageSystem->mVerboseLog)
00728 {
00729 std::ostringstream str;
00730 str << "MSG: <- " << mHost << "\tPACKET GAP:\t"
00731 << index;
00732 llinfos << str.str().c_str() << llendl;
00733 }
00734
00735
00736 mPotentialLostPackets[index] = time;
00737 index++;
00738 index = index % LL_MAX_OUT_PACKET_ID;
00739 gap_count++;
00740 }
00741 }
00742 else
00743 {
00744 llinfos << "packet_out_of_order - got packet " << id << " expecting " << index << " from " << mHost << llendl;
00745 if(gMessageSystem->mVerboseLog)
00746 {
00747 std::ostringstream str;
00748 str << "MSG: <- " << mHost << "\tPACKET GAP:\t"
00749 << id << " expected " << index;
00750 llinfos << str.str().c_str() << llendl;
00751 }
00752 }
00753
00754 mPacketsInID = id + 1;
00755 mPacketsInID = (mPacketsInID) % LL_MAX_OUT_PACKET_ID;
00756
00757 if (gap_count > 128)
00758 {
00759 llwarns << "Packet loss gap filler running amok!" << llendl;
00760 }
00761 else if (gap_count > 16)
00762 {
00763 llwarns << "Sustaining large amounts of packet loss!" << llendl;
00764 }
00765
00766 }
00767 }
00768 }
00769
00770
00771 void LLCircuit::updateWatchDogTimers(LLMessageSystem *msgsys)
00772 {
00773 F64 cur_time = LLMessageSystem::getMessageTimeSeconds();
00774 S32 count = mPingSet.size();
00775 S32 cur = 0;
00776
00777
00778 while((cur < count) && !mPingSet.empty())
00779 {
00780 cur++;
00781
00782 LLCircuit::ping_set_t::iterator psit = mPingSet.begin();
00783 LLCircuitData *cdp = *psit;
00784
00785 if (!cdp->mbAlive)
00786 {
00787
00788
00789
00790
00791
00792 mPingSet.erase(psit);
00793 cdp->mNextPingSendTime = cur_time + PING_INTERVAL;
00794 mPingSet.insert(cdp);
00795 continue;
00796 }
00797 else
00798 {
00799
00800 if (cur_time < cdp->mNextPingSendTime)
00801 {
00802
00803
00804 break;
00805 }
00806
00807
00808 if (cdp->updateWatchDogTimers(msgsys))
00809 {
00810
00811 F64 dt = 0.95f*PING_INTERVAL + ll_frand(0.1f*PING_INTERVAL);
00812
00813
00814
00815 mPingSet.erase(psit);
00816 cdp->mNextPingSendTime = cur_time + dt;
00817 mPingSet.insert(cdp);
00818
00819
00820 cdp->mThrottles.dynamicAdjust();
00821
00822
00823 cdp->checkPeriodTime();
00824 }
00825 else
00826 {
00827
00828
00829
00830 removeCircuitData(cdp->mHost);
00831 }
00832 }
00833 }
00834 }
00835
00836
00837 BOOL LLCircuitData::updateWatchDogTimers(LLMessageSystem *msgsys)
00838 {
00839 F64 cur_time = LLMessageSystem::getMessageTimeSeconds();
00840 mLastPingSendTime = cur_time;
00841
00842 if (!checkCircuitTimeout())
00843 {
00844
00845 return FALSE;
00846 }
00847
00848
00849
00850
00851
00852
00853
00854
00855
00856
00857
00858
00859 BOOL wrapped = FALSE;
00860 reliable_iter iter;
00861 iter = mUnackedPackets.upper_bound(getPacketOutID());
00862 if (iter == mUnackedPackets.end())
00863 {
00864
00865
00866 iter = mUnackedPackets.begin();
00867 wrapped = TRUE;
00868 }
00869
00870 TPACKETID packet_id = 0;
00871
00872
00873 BOOL wrapped_final = FALSE;
00874 reliable_iter iter_final;
00875 iter_final = mFinalRetryPackets.upper_bound(getPacketOutID());
00876 if (iter_final == mFinalRetryPackets.end())
00877 {
00878 iter_final = mFinalRetryPackets.begin();
00879 wrapped_final = TRUE;
00880 }
00881
00882
00883
00884 if (wrapped != wrapped_final)
00885 {
00886
00887
00888 if (!wrapped)
00889 {
00890
00891
00892 packet_id = iter->first;
00893
00894 }
00895 else
00896 {
00897 packet_id = iter_final->first;
00898
00899 }
00900 }
00901 else
00902 {
00903
00904 if ((iter == mUnackedPackets.end()) && (iter_final == mFinalRetryPackets.end()))
00905 {
00906
00907
00908
00909
00910
00911 packet_id = getPacketOutID();
00912 }
00913 else
00914 {
00915 BOOL had_unacked = FALSE;
00916 if (iter != mUnackedPackets.end())
00917 {
00918
00919 packet_id = iter->first;
00920 had_unacked = TRUE;
00921
00922 }
00923
00924 if (iter_final != mFinalRetryPackets.end())
00925 {
00926
00927 if (had_unacked)
00928 {
00929
00930 packet_id = llmin(packet_id, iter_final->first);
00931
00932 }
00933 else
00934 {
00935
00936 packet_id = iter_final->first;
00937
00938 }
00939 }
00940 }
00941 }
00942
00943
00944 pingTimerStart();
00945 msgsys->newMessageFast(_PREHASH_StartPingCheck);
00946 msgsys->nextBlock(_PREHASH_PingID);
00947 msgsys->addU8Fast(_PREHASH_PingID, nextPingID());
00948 msgsys->addU32Fast(_PREHASH_OldestUnacked, packet_id);
00949 msgsys->sendMessage(mHost);
00950
00951
00952
00953
00954
00955 LLCircuitData::packet_time_map::iterator it;
00956 U64 timeout = (U64)(1000000.0*llmin(LL_MAX_LOST_TIMEOUT, getPingDelayAveraged() * LL_LOST_TIMEOUT_FACTOR));
00957
00958 U64 mt_usec = LLMessageSystem::getMessageTimeUsecs();
00959 for (it = mPotentialLostPackets.begin(); it != mPotentialLostPackets.end(); )
00960 {
00961 U64 delta_t_usec = mt_usec - (*it).second;
00962 if (delta_t_usec > timeout)
00963 {
00964
00965 mPacketsLost++;
00966 gMessageSystem->mDroppedPackets++;
00967 if(gMessageSystem->mVerboseLog)
00968 {
00969 std::ostringstream str;
00970 str << "MSG: <- " << mHost << "\tLOST PACKET:\t"
00971 << (*it).first;
00972 llinfos << str.str().c_str() << llendl;
00973 }
00974 mPotentialLostPackets.erase(it++);
00975 }
00976 else
00977 {
00978 ++it;
00979 }
00980 }
00981
00982 return TRUE;
00983 }
00984
00985
00986 void LLCircuitData::clearDuplicateList(TPACKETID oldest_id)
00987 {
00988
00989
00990
00991
00992
00993
00994 if (oldest_id < mHighestPacketID)
00995 {
00996
00997 packet_time_map::iterator pit_start;
00998 packet_time_map::iterator pit_end;
00999 pit_start = mRecentlyReceivedReliablePackets.begin();
01000 pit_end = mRecentlyReceivedReliablePackets.lower_bound(oldest_id);
01001 mRecentlyReceivedReliablePackets.erase(pit_start, pit_end);
01002 }
01003
01004
01005
01006
01007 U64 mt_usec = LLMessageSystem::getMessageTimeUsecs();
01008
01009 packet_time_map::iterator pit;
01010 for(pit = mRecentlyReceivedReliablePackets.upper_bound(mHighestPacketID);
01011 pit != mRecentlyReceivedReliablePackets.end(); )
01012 {
01013
01014 if ((pit->first - mHighestPacketID) < 100)
01015 {
01016 llwarns << "Probably incorrectly timing out non-wrapped packets!" << llendl;
01017 }
01018 U64 delta_t_usec = mt_usec - (*pit).second;
01019 F64 delta_t_sec = delta_t_usec * SEC_PER_USEC;
01020 if (delta_t_sec > LL_DUPLICATE_SUPPRESSION_TIMEOUT)
01021 {
01022
01023 llinfos << "Clearing " << pit->first << " from recent list" << llendl;
01024 mRecentlyReceivedReliablePackets.erase(pit++);
01025 }
01026 else
01027 {
01028 ++pit;
01029 }
01030 }
01031
01032 }
01033
01034 BOOL LLCircuitData::checkCircuitTimeout()
01035 {
01036 F64 time_since_last_ping = LLMessageSystem::getMessageTimeSeconds() - mLastPingReceivedTime;
01037
01038
01039 if (time_since_last_ping > PING_INTERVAL_MAX)
01040 {
01041 llwarns << "LLCircuitData::checkCircuitTimeout for " << mHost << " last ping " << time_since_last_ping << " seconds ago." <<llendl;
01042 setAlive(FALSE);
01043 if (mTimeoutCallback)
01044 {
01045 llwarns << "LLCircuitData::checkCircuitTimeout for " << mHost << " calling callback." << llendl;
01046 mTimeoutCallback(mHost, mTimeoutUserData);
01047 }
01048 if (!isAlive())
01049 {
01050
01051 llwarns << "LLCircuitData::checkCircuitTimeout for " << mHost << " still dead, dropping." << llendl;
01052 return FALSE;
01053 }
01054 }
01055 else if (time_since_last_ping > PING_INTERVAL_ALARM)
01056 {
01057
01058 }
01059 return TRUE;
01060 }
01061
01062
01063
01064 BOOL LLCircuitData::collectRAck(TPACKETID packet_num)
01065 {
01066 if (mAcks.empty())
01067 {
01068
01069 gMessageSystem->mCircuitInfo.mSendAckMap[mHost] = this;
01070 }
01071
01072 mAcks.push_back(packet_num);
01073 return TRUE;
01074 }
01075
01076
01077
01078 void LLCircuit::sendAcks()
01079 {
01080 LLCircuitData* cd;
01081 circuit_data_map::iterator end = mSendAckMap.end();
01082 for(circuit_data_map::iterator it = mSendAckMap.begin(); it != end; ++it)
01083 {
01084 cd = (*it).second;
01085
01086 S32 count = (S32)cd->mAcks.size();
01087 if(count > 0)
01088 {
01089
01090 S32 acks_this_packet = 0;
01091 for(S32 i = 0; i < count; ++i)
01092 {
01093 if(acks_this_packet == 0)
01094 {
01095 gMessageSystem->newMessageFast(_PREHASH_PacketAck);
01096 }
01097 gMessageSystem->nextBlockFast(_PREHASH_Packets);
01098 gMessageSystem->addU32Fast(_PREHASH_ID, cd->mAcks[i]);
01099 ++acks_this_packet;
01100 if(acks_this_packet > 250)
01101 {
01102 gMessageSystem->sendMessage(cd->mHost);
01103 acks_this_packet = 0;
01104 }
01105 }
01106 if(acks_this_packet > 0)
01107 {
01108 gMessageSystem->sendMessage(cd->mHost);
01109 }
01110
01111 if(gMessageSystem->mVerboseLog)
01112 {
01113 std::ostringstream str;
01114 str << "MSG: -> " << cd->mHost << "\tPACKET ACKS:\t";
01115 std::ostream_iterator<TPACKETID> append(str, " ");
01116 std::copy(cd->mAcks.begin(), cd->mAcks.end(), append);
01117 llinfos << str.str().c_str() << llendl;
01118 }
01119
01120
01121 cd->mAcks.clear();
01122 }
01123 }
01124
01125
01126 mSendAckMap.clear();
01127 }
01128
01129
01130 std::ostream& operator<<(std::ostream& s, LLCircuitData& circuit)
01131 {
01132 F32 age = circuit.mExistenceTimer.getElapsedTimeF32();
01133
01134 using namespace std;
01135 s << "Circuit " << circuit.mHost << " ";
01136 s << circuit.mRemoteID << " ";
01137 s << (circuit.mbAlive ? "Alive" : "Not Alive") << " ";
01138 s << (circuit.mbAllowTimeout ? "Timeout Allowed" : "Timeout Not Allowed");
01139 s << endl;
01140
01141 s << " Packets Lost: " << circuit.mPacketsLost;
01142 s << " Measured Ping: " << circuit.mPingDelay;
01143 s << " Averaged Ping: " << circuit.mPingDelayAveraged;
01144 s << endl;
01145
01146 s << "Global In/Out " << S32(age) << " sec";
01147 s << " KBytes: " << circuit.mBytesIn / 1024 << "/" << circuit.mBytesOut / 1024;
01148 s << " Kbps: ";
01149 s << S32(circuit.mBytesIn * 8.f / circuit.mExistenceTimer.getElapsedTimeF32() / 1024.f);
01150 s << "/";
01151 s << S32(circuit.mBytesOut * 8.f / circuit.mExistenceTimer.getElapsedTimeF32() / 1024.f);
01152 s << " Packets: " << circuit.mPacketsIn << "/" << circuit.mPacketsOut;
01153 s << endl;
01154
01155 s << "Recent In/Out " << S32(circuit.mLastPeriodLength) << " sec";
01156 s << " KBytes: ";
01157 s << circuit.mBytesInLastPeriod / 1024;
01158 s << "/";
01159 s << circuit.mBytesOutLastPeriod / 1024;
01160 s << " Kbps: ";
01161 s << S32(circuit.mBytesInLastPeriod * 8.f / circuit.mLastPeriodLength / 1024.f);
01162 s << "/";
01163 s << S32(circuit.mBytesOutLastPeriod * 8.f / circuit.mLastPeriodLength / 1024.f);
01164 s << " Peak kbps: ";
01165 s << S32(circuit.mPeakBPSIn / 1024.f);
01166 s << "/";
01167 s << S32(circuit.mPeakBPSOut / 1024.f);
01168 s << endl;
01169
01170 return s;
01171 }
01172
01173 const LLString LLCircuitData::getInfoString() const
01174 {
01175 std::ostringstream info;
01176 info << "Circuit: " << mHost << std::endl
01177 << (mbAlive ? "Alive" : "Not Alive") << std::endl
01178 << "Age: " << mExistenceTimer.getElapsedTimeF32() << std::endl;
01179 return LLString(info.str());
01180 }
01181
01182 void LLCircuitData::dumpResendCountAndReset()
01183 {
01184 if (mCurrentResendCount)
01185 {
01186 llinfos << "Circuit: " << mHost << " resent " << mCurrentResendCount << " packets" << llendl;
01187 mCurrentResendCount = 0;
01188 }
01189 }
01190
01191 std::ostream& operator<<(std::ostream& s, LLCircuit &circuit)
01192 {
01193 s << "Circuit Info:" << std::endl;
01194 LLCircuit::circuit_data_map::iterator end = circuit.mCircuitData.end();
01195 LLCircuit::circuit_data_map::iterator it;
01196 for(it = circuit.mCircuitData.begin(); it != end; ++it)
01197 {
01198 s << *((*it).second) << std::endl;
01199 }
01200 return s;
01201 }
01202
01203 const LLString LLCircuit::getInfoString() const
01204 {
01205 std::ostringstream info;
01206 info << "Circuit Info:" << std::endl;
01207 LLCircuit::circuit_data_map::const_iterator end = mCircuitData.end();
01208 LLCircuit::circuit_data_map::const_iterator it;
01209 for(it = mCircuitData.begin(); it != end; ++it)
01210 {
01211 info << (*it).second->getInfoString() << std::endl;
01212 }
01213 return LLString(info.str());
01214 }
01215
01216 void LLCircuit::getCircuitRange(
01217 const LLHost& key,
01218 LLCircuit::circuit_data_map::iterator& first,
01219 LLCircuit::circuit_data_map::iterator& end)
01220 {
01221 end = mCircuitData.end();
01222 first = mCircuitData.upper_bound(key);
01223 }
01224
01225 TPACKETID LLCircuitData::nextPacketOutID()
01226 {
01227 mPacketsOut++;
01228
01229 TPACKETID id;
01230
01231 id = (mPacketsOutID + 1) % LL_MAX_OUT_PACKET_ID;
01232
01233 if (id < mPacketsOutID)
01234 {
01235
01236 mWrapID = 0;
01237 }
01238 mPacketsOutID = id;
01239 return id;
01240 }
01241
01242
01243 void LLCircuitData::setPacketInID(TPACKETID id)
01244 {
01245 id = id % LL_MAX_OUT_PACKET_ID;
01246 mPacketsInID = id;
01247 mRecentlyReceivedReliablePackets.clear();
01248
01249 mWrapID = id;
01250 }
01251
01252
01253 void LLCircuitData::pingTimerStop(const U8 ping_id)
01254 {
01255 F64 mt_secs = LLMessageSystem::getMessageTimeSeconds();
01256
01257
01258 F64 time = mt_secs - mPingTime;
01259 if (time == 0.0)
01260 {
01261
01262
01263 mt_secs = LLMessageSystem::getMessageTimeSeconds(TRUE);
01264 }
01265 mLastPingReceivedTime = mt_secs;
01266
01267
01268
01269 S32 delta_ping = (S32)mLastPingID - (S32) ping_id;
01270 if (delta_ping < 0)
01271 {
01272 delta_ping += 256;
01273 }
01274
01275 U32 msec = (U32) ((delta_ping*PING_INTERVAL + time) * 1000.f);
01276 setPingDelay(msec);
01277
01278 mPingsInTransit = delta_ping;
01279 if (mBlocked && (mPingsInTransit <= PING_RELEASE_BLOCK))
01280 {
01281 mBlocked = FALSE;
01282 }
01283 }
01284
01285
01286 void LLCircuitData::pingTimerStart()
01287 {
01288 mPingTime = LLMessageSystem::getMessageTimeSeconds();
01289 mPingsInTransit++;
01290
01291 if (!mBlocked && (mPingsInTransit > PING_START_BLOCK))
01292 {
01293 mBlocked = TRUE;
01294 }
01295 }
01296
01297
01298 U32 LLCircuitData::getPacketsIn() const
01299 {
01300 return mPacketsIn;
01301 }
01302
01303
01304 S32 LLCircuitData::getBytesIn() const
01305 {
01306 return mBytesIn;
01307 }
01308
01309
01310 S32 LLCircuitData::getBytesOut() const
01311 {
01312 return mBytesOut;
01313 }
01314
01315
01316 U32 LLCircuitData::getPacketsOut() const
01317 {
01318 return mPacketsOut;
01319 }
01320
01321
01322 TPACKETID LLCircuitData::getPacketOutID() const
01323 {
01324 return mPacketsOutID;
01325 }
01326
01327
01328 U32 LLCircuitData::getPacketsLost() const
01329 {
01330 return mPacketsLost;
01331 }
01332
01333
01334 BOOL LLCircuitData::isAlive() const
01335 {
01336 return mbAlive;
01337 }
01338
01339
01340 BOOL LLCircuitData::isBlocked() const
01341 {
01342 return mBlocked;
01343 }
01344
01345
01346 BOOL LLCircuitData::getAllowTimeout() const
01347 {
01348 return mbAllowTimeout;
01349 }
01350
01351
01352 U32 LLCircuitData::getPingDelay() const
01353 {
01354 return mPingDelay;
01355 }
01356
01357
01358 F32 LLCircuitData::getPingInTransitTime()
01359 {
01360
01361
01362 F32 time_since_ping_was_sent = 0;
01363
01364 if (mPingsInTransit)
01365 {
01366 time_since_ping_was_sent = (F32)((mPingsInTransit*PING_INTERVAL - 1) + (LLMessageSystem::getMessageTimeSeconds() - mPingTime))*1000.f;
01367 }
01368
01369 return time_since_ping_was_sent;
01370 }
01371
01372
01373 void LLCircuitData::setPingDelay(U32 ping)
01374 {
01375 mPingDelay = ping;
01376 mPingDelayAveraged = llmax((F32)ping, getPingDelayAveraged());
01377 mPingDelayAveraged = ((1.f - LL_AVERAGED_PING_ALPHA) * mPingDelayAveraged)
01378 + (LL_AVERAGED_PING_ALPHA * (F32) ping);
01379 mPingDelayAveraged = llclamp(mPingDelayAveraged,
01380 LL_AVERAGED_PING_MIN,
01381 LL_AVERAGED_PING_MAX);
01382 }
01383
01384
01385 F32 LLCircuitData::getPingDelayAveraged()
01386 {
01387 return llmin(llmax(getPingInTransitTime(), mPingDelayAveraged), LL_AVERAGED_PING_MAX);
01388 }
01389
01390
01391 BOOL LLCircuitData::getTrusted() const
01392 {
01393 return mTrusted;
01394 }
01395
01396
01397 void LLCircuitData::setTrusted(BOOL t)
01398 {
01399 mTrusted = t;
01400 }
01401
01402 F32 LLCircuitData::getAgeInSeconds() const
01403 {
01404 return mExistenceTimer.getElapsedTimeF32();
01405 }