00001
00032 #include "linden_common.h"
00033
00034 #include "llxfermanager.h"
00035
00036 #include "llxfer.h"
00037 #include "llxfer_file.h"
00038 #include "llxfer_mem.h"
00039 #include "llxfer_vfile.h"
00040
00041 #include "llerror.h"
00042 #include "lluuid.h"
00043 #include "u64.h"
00044
00045 const F32 LL_XFER_REGISTRATION_TIMEOUT = 60.0f;
00046 const F32 LL_PACKET_TIMEOUT = 3.0f;
00047 const S32 LL_PACKET_RETRY_LIMIT = 10;
00048
00049 const S32 LL_DEFAULT_MAX_SIMULTANEOUS_XFERS = 10;
00050 const S32 LL_DEFAULT_MAX_REQUEST_FIFO_XFERS = 1000;
00051
00052 #define LL_XFER_PROGRESS_MESSAGES 0
00053 #define LL_XFER_TEST_REXMIT 0
00054
00055
00057
00058 LLXferManager::LLXferManager (LLVFS *vfs)
00059 {
00060 init(vfs);
00061 }
00062
00064
00065 LLXferManager::~LLXferManager ()
00066 {
00067 free();
00068 }
00069
00071
00072 void LLXferManager::init (LLVFS *vfs)
00073 {
00074 mSendList = NULL;
00075 mReceiveList = NULL;
00076
00077 setMaxOutgoingXfersPerCircuit(LL_DEFAULT_MAX_SIMULTANEOUS_XFERS);
00078 setMaxIncomingXfers(LL_DEFAULT_MAX_REQUEST_FIFO_XFERS);
00079
00080 mVFS = vfs;
00081
00082
00083 mUseAckThrottling = FALSE;
00084 setAckThrottleBPS(100000);
00085 }
00086
00088
00089 void LLXferManager::free ()
00090 {
00091 LLXfer *xferp;
00092 LLXfer *delp;
00093
00094 mOutgoingHosts.deleteAllData();
00095
00096 delp = mSendList;
00097 while (delp)
00098 {
00099 xferp = delp->mNext;
00100 delete delp;
00101 delp = xferp;
00102 }
00103 mSendList = NULL;
00104
00105 delp = mReceiveList;
00106 while (delp)
00107 {
00108 xferp = delp->mNext;
00109 delete delp;
00110 delp = xferp;
00111 }
00112 mReceiveList = NULL;
00113 }
00114
00116
00117 void LLXferManager::setMaxIncomingXfers(S32 max_num)
00118 {
00119 mMaxIncomingXfers = max_num;
00120 }
00121
00123
00124 void LLXferManager::setMaxOutgoingXfersPerCircuit(S32 max_num)
00125 {
00126 mMaxOutgoingXfersPerCircuit = max_num;
00127 }
00128
00129 void LLXferManager::setUseAckThrottling(const BOOL use)
00130 {
00131 mUseAckThrottling = use;
00132 }
00133
00134 void LLXferManager::setAckThrottleBPS(const F32 bps)
00135 {
00136
00137
00138
00139
00140
00141 F32 min_bps = (1000.f * 8.f* mMaxIncomingXfers) / LL_PACKET_TIMEOUT;
00142
00143
00144 F32 actual_rate = llmax(min_bps*1.1f, bps);
00145 llinfos << "LLXferManager ack throttle min rate: " << min_bps << llendl;
00146 llinfos << "LLXferManager ack throttle actual rate: " << actual_rate << llendl;
00147 mAckThrottle.setRate(actual_rate);
00148 }
00149
00150
00152
00153 void LLXferManager::updateHostStatus()
00154 {
00155 LLXfer *xferp;
00156 LLHostStatus *host_statusp = NULL;
00157
00158 mOutgoingHosts.deleteAllData();
00159
00160 for (xferp = mSendList; xferp; xferp = xferp->mNext)
00161 {
00162 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00163 {
00164 if (host_statusp->mHost == xferp->mRemoteHost)
00165 {
00166 break;
00167 }
00168 }
00169 if (!host_statusp)
00170 {
00171 host_statusp = new LLHostStatus();
00172 if (host_statusp)
00173 {
00174 host_statusp->mHost = xferp->mRemoteHost;
00175 mOutgoingHosts.addData(host_statusp);
00176 }
00177 }
00178 if (host_statusp)
00179 {
00180 if (xferp->mStatus == e_LL_XFER_PENDING)
00181 {
00182 host_statusp->mNumPending++;
00183 }
00184 else if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00185 {
00186 host_statusp->mNumActive++;
00187 }
00188 }
00189
00190 }
00191 }
00192
00194
00195 void LLXferManager::printHostStatus()
00196 {
00197 LLHostStatus *host_statusp = NULL;
00198 if (mOutgoingHosts.getFirstData())
00199 {
00200 llinfos << "Outgoing Xfers:" << llendl;
00201
00202 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00203 {
00204 llinfos << " " << host_statusp->mHost << " active: " << host_statusp->mNumActive << " pending: " << host_statusp->mNumPending << llendl;
00205 }
00206 }
00207 }
00208
00210
00211 LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head)
00212 {
00213 LLXfer *xferp;
00214 for (xferp = list_head; xferp; xferp = xferp->mNext)
00215 {
00216 if (xferp->mID == id)
00217 {
00218 return(xferp);
00219 }
00220 }
00221 return(NULL);
00222 }
00223
00224
00226
00227 void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head)
00228 {
00229
00230
00231 if (delp)
00232 {
00233 if (*list_head == delp)
00234 {
00235 *list_head = delp->mNext;
00236 delete (delp);
00237 }
00238 else
00239 {
00240 LLXfer *xferp = *list_head;
00241 while (xferp->mNext)
00242 {
00243 if (xferp->mNext == delp)
00244 {
00245 xferp->mNext = delp->mNext;
00246 delete (delp);
00247 break;
00248 }
00249 xferp = xferp->mNext;
00250 }
00251 }
00252 }
00253 }
00254
00256
00257 U32 LLXferManager::numActiveListEntries(LLXfer *list_head)
00258 {
00259 U32 num_entries = 0;
00260
00261 while (list_head)
00262 {
00263 if ((list_head->mStatus == e_LL_XFER_IN_PROGRESS))
00264 {
00265 num_entries++;
00266 }
00267 list_head = list_head->mNext;
00268 }
00269 return(num_entries);
00270 }
00271
00273
00274 S32 LLXferManager::numPendingXfers(const LLHost &host)
00275 {
00276 LLHostStatus *host_statusp = NULL;
00277
00278 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00279 {
00280 if (host_statusp->mHost == host)
00281 {
00282 return (host_statusp->mNumPending);
00283 }
00284 }
00285 return 0;
00286 }
00287
00289
00290 S32 LLXferManager::numActiveXfers(const LLHost &host)
00291 {
00292 LLHostStatus *host_statusp = NULL;
00293
00294 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00295 {
00296 if (host_statusp->mHost == host)
00297 {
00298 return (host_statusp->mNumActive);
00299 }
00300 }
00301 return 0;
00302 }
00303
00305
00306 void LLXferManager::changeNumActiveXfers(const LLHost &host, S32 delta)
00307 {
00308 LLHostStatus *host_statusp = NULL;
00309
00310 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00311 {
00312 if (host_statusp->mHost == host)
00313 {
00314 host_statusp->mNumActive += delta;
00315 }
00316 }
00317 }
00318
00320
00321 void LLXferManager::registerCallbacks(LLMessageSystem *msgsystem)
00322 {
00323 msgsystem->setHandlerFuncFast(_PREHASH_ConfirmXferPacket, process_confirm_packet, NULL);
00324 msgsystem->setHandlerFuncFast(_PREHASH_RequestXfer, process_request_xfer, NULL);
00325 msgsystem->setHandlerFuncFast(_PREHASH_SendXferPacket, continue_file_receive, NULL);
00326 msgsystem->setHandlerFuncFast(_PREHASH_AbortXfer, process_abort_xfer, NULL);
00327 }
00328
00330
00331 U64 LLXferManager::getNextID ()
00332 {
00333 LLUUID a_guid;
00334
00335 a_guid.generate();
00336
00337
00338 return(*((U64*)(a_guid.mData)));
00339 }
00340
00342
00343 S32 LLXferManager::encodePacketNum(S32 packet_num, BOOL is_EOF)
00344 {
00345 if (is_EOF)
00346 {
00347 packet_num |= 0x80000000;
00348 }
00349 return packet_num;
00350 }
00351
00353
00354 S32 LLXferManager::decodePacketNum(S32 packet_num)
00355 {
00356 return(packet_num & 0x0FFFFFFF);
00357 }
00358
00360
00361 BOOL LLXferManager::isLastPacket(S32 packet_num)
00362 {
00363 return(packet_num & 0x80000000);
00364 }
00365
00367
00368 U64 LLXferManager::registerXfer(const void *datap, const S32 length)
00369 {
00370 LLXfer *xferp;
00371 U64 xfer_id = getNextID();
00372
00373 xferp = (LLXfer *) new LLXfer_Mem();
00374 if (xferp)
00375 {
00376 xferp->mNext = mSendList;
00377 mSendList = xferp;
00378
00379 xfer_id = ((LLXfer_Mem *)xferp)->registerXfer(xfer_id, datap,length);
00380
00381 if (!xfer_id)
00382 {
00383 removeXfer(xferp,&mSendList);
00384 }
00385 }
00386 else
00387 {
00388 llerrs << "Xfer allocation error" << llendl;
00389 xfer_id = 0;
00390 }
00391
00392 return(xfer_id);
00393 }
00394
00396
00397 void LLXferManager::requestFile(const char* local_filename,
00398 const char* remote_filename,
00399 ELLPath remote_path,
00400 const LLHost& remote_host,
00401 BOOL delete_remote_on_completion,
00402 void (*callback)(void**,S32,LLExtStat),
00403 void** user_data,
00404 BOOL is_priority,
00405 BOOL use_big_packets)
00406 {
00407 LLXfer *xferp;
00408
00409 for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
00410 {
00411 if (xferp->getXferTypeTag() == LLXfer::XFER_FILE
00412 && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename))
00413 && (((LLXfer_File*)xferp)->matchesRemoteFilename(remote_filename, remote_path))
00414 && (remote_host == xferp->mRemoteHost)
00415 && (callback == xferp->mCallback)
00416 && (user_data == xferp->mCallbackDataHandle))
00417
00418 {
00419
00420 return;
00421 }
00422 }
00423
00424 S32 chunk_size = use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1;
00425 xferp = (LLXfer *) new LLXfer_File(chunk_size);
00426 if (xferp)
00427 {
00428 addToList(xferp, mReceiveList, is_priority);
00429
00430
00431
00432
00433
00434 if(delete_remote_on_completion &&
00435 (strstr(remote_filename,".tmp") == &remote_filename[strlen(remote_filename)-4]))
00436 {
00437 LLFile::remove(local_filename);
00438 }
00439 ((LLXfer_File *)xferp)->initializeRequest(
00440 getNextID(),
00441 local_filename,
00442 remote_filename,
00443 remote_path,
00444 remote_host,
00445 delete_remote_on_completion,
00446 callback,user_data);
00447 startPendingDownloads();
00448 }
00449 else
00450 {
00451 llerrs << "Xfer allocation error" << llendl;
00452 }
00453 }
00454
00455 void LLXferManager::requestFile(const char* remote_filename,
00456 ELLPath remote_path,
00457 const LLHost& remote_host,
00458 BOOL delete_remote_on_completion,
00459 void (*callback)(void*,S32,void**,S32,LLExtStat),
00460 void** user_data,
00461 BOOL is_priority)
00462 {
00463 LLXfer *xferp;
00464
00465 xferp = (LLXfer *) new LLXfer_Mem();
00466 if (xferp)
00467 {
00468 addToList(xferp, mReceiveList, is_priority);
00469 ((LLXfer_Mem *)xferp)->initializeRequest(getNextID(),
00470 remote_filename,
00471 remote_path,
00472 remote_host,
00473 delete_remote_on_completion,
00474 callback, user_data);
00475 startPendingDownloads();
00476 }
00477 else
00478 {
00479 llerrs << "Xfer allocation error" << llendl;
00480 }
00481 }
00482
00483 void LLXferManager::requestVFile(const LLUUID& local_id,
00484 const LLUUID& remote_id,
00485 LLAssetType::EType type, LLVFS* vfs,
00486 const LLHost& remote_host,
00487 void (*callback)(void**,S32,LLExtStat),
00488 void** user_data,
00489 BOOL is_priority)
00490 {
00491 LLXfer *xferp;
00492
00493 for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
00494 {
00495 if (xferp->getXferTypeTag() == LLXfer::XFER_VFILE
00496 && (((LLXfer_VFile*)xferp)->matchesLocalFile(local_id, type))
00497 && (((LLXfer_VFile*)xferp)->matchesRemoteFile(remote_id, type))
00498 && (remote_host == xferp->mRemoteHost)
00499 && (callback == xferp->mCallback)
00500 && (user_data == xferp->mCallbackDataHandle))
00501
00502 {
00503
00504 return;
00505 }
00506 }
00507
00508 xferp = (LLXfer *) new LLXfer_VFile();
00509 if (xferp)
00510 {
00511 addToList(xferp, mReceiveList, is_priority);
00512 ((LLXfer_VFile *)xferp)->initializeRequest(getNextID(),
00513 vfs,
00514 local_id,
00515 remote_id,
00516 type,
00517 remote_host,
00518 callback,
00519 user_data);
00520 startPendingDownloads();
00521 }
00522 else
00523 {
00524 llerrs << "Xfer allocation error" << llendl;
00525 }
00526
00527 }
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00590
00591 void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** )
00592 {
00593
00594 const S32 BUF_SIZE = LL_XFER_LARGE_PAYLOAD + 4;
00595 char fdata_buf[LL_XFER_LARGE_PAYLOAD + 4];
00596 S32 fdata_size;
00597 U64 id;
00598 S32 packetnum;
00599 LLXfer * xferp;
00600
00601 mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00602 mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetnum);
00603
00604 fdata_size = mesgsys->getSizeFast(_PREHASH_DataPacket,_PREHASH_Data);
00605 mesgsys->getBinaryDataFast(_PREHASH_DataPacket, _PREHASH_Data, fdata_buf, 0, 0, BUF_SIZE);
00606
00607 xferp = findXfer(id, mReceiveList);
00608
00609 if (!xferp)
00610 {
00611 char U64_BUF[MAX_STRING];
00612 llwarns << "received xfer data from " << mesgsys->getSender()
00613 << " for non-existent xfer id: "
00614 << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << llendl;
00615 return;
00616 }
00617
00618 S32 xfer_size;
00619
00620 if (decodePacketNum(packetnum) != xferp->mPacketNum)
00621 {
00622
00623 if (decodePacketNum(packetnum) == (xferp->mPacketNum - 1))
00624 {
00625 llinfos << "Reconfirming xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet " << packetnum << llendl; sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
00626 }
00627 else
00628 {
00629 llinfos << "Ignoring xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " recv'd packet " << packetnum << "; expecting " << xferp->mPacketNum << llendl;
00630 }
00631 return;
00632 }
00633
00634 S32 result = 0;
00635
00636 if (xferp->mPacketNum == 0)
00637 {
00638 ntohmemcpy(&xfer_size,fdata_buf,MVT_S32,sizeof(S32));
00639
00640
00641 xferp->setXferSize(xfer_size);
00642
00643
00644 result = xferp->receiveData(&(fdata_buf[sizeof(S32)]),fdata_size-(sizeof(S32)));
00645 }
00646 else
00647 {
00648 result = xferp->receiveData(fdata_buf,fdata_size);
00649 }
00650
00651 if (result == LL_ERR_CANNOT_OPEN_FILE)
00652 {
00653 xferp->abort(LL_ERR_CANNOT_OPEN_FILE);
00654 removeXfer(xferp,&mReceiveList);
00655 startPendingDownloads();
00656 return;
00657 }
00658
00659 xferp->mPacketNum++;
00660
00661 if (!mUseAckThrottling)
00662 {
00663
00664 sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
00665 }
00666 else
00667 {
00668
00669 LLXferAckInfo ack_info;
00670 ack_info.mID = id;
00671 ack_info.mPacketNum = decodePacketNum(packetnum);
00672 ack_info.mRemoteHost = mesgsys->getSender();
00673 mXferAckQueue.push(ack_info);
00674 }
00675
00676 if (isLastPacket(packetnum))
00677 {
00678 xferp->processEOF();
00679 removeXfer(xferp,&mReceiveList);
00680 startPendingDownloads();
00681 }
00682 }
00683
00685
00686 void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 packetnum, const LLHost &remote_host)
00687 {
00688 #if LL_XFER_PROGRESS_MESSAGES
00689 if (!(packetnum % 50))
00690 {
00691 cout << "confirming xfer packet #" << packetnum << endl;
00692 }
00693 #endif
00694 mesgsys->newMessageFast(_PREHASH_ConfirmXferPacket);
00695 mesgsys->nextBlockFast(_PREHASH_XferID);
00696 mesgsys->addU64Fast(_PREHASH_ID, id);
00697 mesgsys->addU32Fast(_PREHASH_Packet, packetnum);
00698
00699 mesgsys->sendMessage(remote_host);
00700 }
00701
00703
00704 void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** )
00705 {
00706
00707 U64 id;
00708 char local_filename[MAX_STRING];
00709 ELLPath local_path = LL_PATH_NONE;
00710 S32 result = LL_ERR_NOERR;
00711 LLUUID uuid;
00712 LLAssetType::EType type;
00713 S16 type_s16;
00714 BOOL b_use_big_packets;
00715
00716 mesgsys->getBOOL("XferID", "UseBigPackets", b_use_big_packets);
00717
00718 mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00719 char U64_BUF[MAX_STRING];
00720 llinfos << "xfer request id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF))
00721 << " to " << mesgsys->getSender() << llendl;
00722
00723 mesgsys->getStringFast(_PREHASH_XferID, _PREHASH_Filename, MAX_STRING, local_filename);
00724
00725 U8 local_path_u8;
00726 mesgsys->getU8("XferID", "FilePath", local_path_u8);
00727 if( local_path_u8 < (U8)LL_PATH_COUNT )
00728 {
00729 local_path = (ELLPath)local_path_u8;
00730 }
00731 else
00732 {
00733 llwarns << "Invalid file path in LLXferManager::processFileRequest() " << (U32)local_path_u8 << llendl;
00734 }
00735
00736 mesgsys->getUUIDFast(_PREHASH_XferID, _PREHASH_VFileID, uuid);
00737 mesgsys->getS16Fast(_PREHASH_XferID, _PREHASH_VFileType, type_s16);
00738 type = (LLAssetType::EType)type_s16;
00739
00740 LLXfer *xferp;
00741
00742 if (uuid != LLUUID::null)
00743 {
00744 if(NULL == LLAssetType::lookup(type))
00745 {
00746 llwarns << "Invalid type for xfer request: " << uuid << ":"
00747 << type_s16 << " to " << mesgsys->getSender() << llendl;
00748 return;
00749 }
00750
00751 llinfos << "starting vfile transfer: " << uuid << "," << LLAssetType::lookup(type) << " to " << mesgsys->getSender() << llendl;
00752
00753 if (! mVFS)
00754 {
00755 llwarns << "Attempt to send VFile w/o available VFS" << llendl;
00756 return;
00757 }
00758
00759 xferp = (LLXfer *)new LLXfer_VFile(mVFS, uuid, type);
00760 if (xferp)
00761 {
00762 xferp->mNext = mSendList;
00763 mSendList = xferp;
00764 result = xferp->startSend(id,mesgsys->getSender());
00765 }
00766 else
00767 {
00768 llerrs << "Xfer allcoation error" << llendl;
00769 }
00770 }
00771 else if (strlen(local_filename))
00772 {
00773 std::string expanded_filename = gDirUtilp->getExpandedFilename( local_path, local_filename );
00774 llinfos << "starting file transfer: " << expanded_filename << " to " << mesgsys->getSender() << llendl;
00775
00776 BOOL delete_local_on_completion = FALSE;
00777 mesgsys->getBOOL("XferID", "DeleteOnCompletion", delete_local_on_completion);
00778
00779
00780 xferp = (LLXfer *)new LLXfer_File(expanded_filename, delete_local_on_completion, b_use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1);
00781
00782 if (xferp)
00783 {
00784 xferp->mNext = mSendList;
00785 mSendList = xferp;
00786 result = xferp->startSend(id,mesgsys->getSender());
00787 }
00788 else
00789 {
00790 llerrs << "Xfer allcoation error" << llendl;
00791 }
00792 }
00793 else
00794 {
00795 char U64_BUF[MAX_STRING];
00796 llinfos << "starting memory transfer: "
00797 << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to "
00798 << mesgsys->getSender() << llendl;
00799
00800 xferp = findXfer(id, mSendList);
00801
00802 if (xferp)
00803 {
00804 result = xferp->startSend(id,mesgsys->getSender());
00805 }
00806 else
00807 {
00808 llinfos << "Warning: " << U64_BUF << " not found." << llendl;
00809 result = LL_ERR_FILE_NOT_FOUND;
00810 }
00811 }
00812
00813 if (result)
00814 {
00815 if (xferp)
00816 {
00817 xferp->abort(result);
00818 removeXfer(xferp,&mSendList);
00819 }
00820 else
00821 {
00822 llinfos << "Aborting xfer to " << mesgsys->getSender() << " with error: " << result << llendl;
00823
00824 mesgsys->newMessageFast(_PREHASH_AbortXfer);
00825 mesgsys->nextBlockFast(_PREHASH_XferID);
00826 mesgsys->addU64Fast(_PREHASH_ID, id);
00827 mesgsys->addS32Fast(_PREHASH_Result, result);
00828
00829 mesgsys->sendMessage(mesgsys->getSender());
00830 }
00831 }
00832 else if(xferp && (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit))
00833 {
00834 xferp->sendNextPacket();
00835 changeNumActiveXfers(xferp->mRemoteHost,1);
00836
00837 }
00838 else
00839 {
00840 if(xferp)
00841 {
00842 llinfos << " queueing xfer request, " << numPendingXfers(xferp->mRemoteHost) << " ahead of this one" << llendl;
00843 }
00844 else
00845 {
00846 llwarns << "LLXferManager::processFileRequest() - no xfer found!"
00847 << llendl;
00848 }
00849 }
00850 }
00851
00853
00854 void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** )
00855 {
00856 U64 id = 0;
00857 S32 packetNum = 0;
00858
00859 mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00860 mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetNum);
00861
00862 LLXfer* xferp = findXfer(id, mSendList);
00863 if (xferp)
00864 {
00865
00866 xferp->mWaitingForACK = FALSE;
00867 if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00868 {
00869 xferp->sendNextPacket();
00870 }
00871 else
00872 {
00873 removeXfer(xferp, &mSendList);
00874 }
00875 }
00876 }
00877
00879
00880 void LLXferManager::retransmitUnackedPackets ()
00881 {
00882 LLXfer *xferp;
00883 LLXfer *delp;
00884 xferp = mReceiveList;
00885 while(xferp)
00886 {
00887 if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00888 {
00889
00890 if (! gMessageSystem->mCircuitInfo.isCircuitAlive( xferp->mRemoteHost ))
00891 {
00892 llinfos << "Xfer found in progress on dead circuit, aborting" << llendl;
00893 xferp->mCallbackResult = LL_ERR_CIRCUIT_GONE;
00894 xferp->processEOF();
00895 delp = xferp;
00896 xferp = xferp->mNext;
00897 removeXfer(delp,&mReceiveList);
00898 continue;
00899 }
00900
00901 }
00902 xferp = xferp->mNext;
00903 }
00904
00905 xferp = mSendList;
00906 updateHostStatus();
00907 F32 et;
00908 while (xferp)
00909 {
00910 if (xferp->mWaitingForACK && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_PACKET_TIMEOUT))
00911 {
00912 if (xferp->mRetries > LL_PACKET_RETRY_LIMIT)
00913 {
00914 llinfos << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet retransmit limit exceeded, xfer dropped" << llendl;
00915 xferp->abort(LL_ERR_TCP_TIMEOUT);
00916 delp = xferp;
00917 xferp = xferp->mNext;
00918 removeXfer(delp,&mSendList);
00919 }
00920 else
00921 {
00922 llinfos << "resending xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet unconfirmed after: "<< et << " sec, packet " << xferp->mPacketNum << llendl;
00923 xferp->resendLastPacket();
00924 xferp = xferp->mNext;
00925 }
00926 }
00927 else if ((xferp->mStatus == e_LL_XFER_REGISTERED) && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_XFER_REGISTRATION_TIMEOUT))
00928 {
00929 llinfos << "registered xfer never requested, xfer dropped" << llendl;
00930 xferp->abort(LL_ERR_TCP_TIMEOUT);
00931 delp = xferp;
00932 xferp = xferp->mNext;
00933 removeXfer(delp,&mSendList);
00934 }
00935 else if (xferp->mStatus == e_LL_XFER_ABORTED)
00936 {
00937 llwarns << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getName() << llendl;
00938 delp = xferp;
00939 xferp = xferp->mNext;
00940 removeXfer(delp,&mSendList);
00941 }
00942 else if (xferp->mStatus == e_LL_XFER_PENDING)
00943 {
00944
00945 if (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit)
00946 {
00947
00948 xferp->sendNextPacket();
00949 changeNumActiveXfers(xferp->mRemoteHost,1);
00950 }
00951 xferp = xferp->mNext;
00952 }
00953 else
00954 {
00955 xferp = xferp->mNext;
00956 }
00957 }
00958
00959
00960
00961
00962
00963
00964 while (mXferAckQueue.getLength())
00965 {
00966 if (mAckThrottle.checkOverflow(1000.0f*8.0f))
00967 {
00968 break;
00969 }
00970
00971 LLXferAckInfo ack_info;
00972 mXferAckQueue.pop(ack_info);
00973
00974 sendConfirmPacket(gMessageSystem, ack_info.mID, ack_info.mPacketNum, ack_info.mRemoteHost);
00975 mAckThrottle.throttleOverflow(1000.f*8.f);
00976 }
00977 }
00978
00979
00981
00982 void LLXferManager::processAbort (LLMessageSystem *mesgsys, void ** )
00983 {
00984 U64 id = 0;
00985 S32 result_code = 0;
00986 LLXfer * xferp;
00987
00988 mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00989 mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Result, result_code);
00990
00991 xferp = findXfer(id, mReceiveList);
00992 if (xferp)
00993 {
00994 xferp->mCallbackResult = result_code;
00995 xferp->processEOF();
00996 removeXfer(xferp, &mReceiveList);
00997 startPendingDownloads();
00998 }
00999 }
01000
01002
01003 void LLXferManager::startPendingDownloads()
01004 {
01005
01006
01007
01008
01009
01010
01011
01012 LLXfer* xferp = mReceiveList;
01013 LLLinkedList<LLXfer> pending_downloads;
01014 S32 download_count = 0;
01015 S32 pending_count = 0;
01016 while(xferp)
01017 {
01018 if(xferp->mStatus == e_LL_XFER_PENDING)
01019 {
01020 ++pending_count;
01021 pending_downloads.addData(xferp);
01022 }
01023 else if(xferp->mStatus == e_LL_XFER_IN_PROGRESS)
01024 {
01025 ++download_count;
01026 }
01027 xferp = xferp->mNext;
01028 }
01029
01030 S32 start_count = mMaxIncomingXfers - download_count;
01031
01032 lldebugs << "LLXferManager::startPendingDownloads() - XFER_IN_PROGRESS: "
01033 << download_count << " XFER_PENDING: " << pending_count
01034 << " startring " << llmin(start_count, pending_count) << llendl;
01035
01036 if((start_count > 0) && (pending_count > 0))
01037 {
01038 S32 result;
01039 xferp = pending_downloads.getFirstData();
01040 while(start_count-- && xferp)
01041 {
01042 result = xferp->startDownload();
01043 if(result)
01044 {
01045 xferp->abort(result);
01046 ++start_count;
01047 }
01048 xferp = pending_downloads.getNextData();
01049 }
01050 }
01051 }
01052
01054
01055 void LLXferManager::addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority)
01056 {
01057 if(is_priority)
01058 {
01059 xferp->mNext = NULL;
01060 LLXfer* next = head;
01061 if(next)
01062 {
01063 while(next->mNext)
01064 {
01065 next = next->mNext;
01066 }
01067 next->mNext = xferp;
01068 }
01069 else
01070 {
01071 head = xferp;
01072 }
01073 }
01074 else
01075 {
01076 xferp->mNext = head;
01077 head = xferp;
01078 }
01079 }
01080
01082
01084
01085 LLXferManager *gXferManager = NULL;
01086
01087
01088 void start_xfer_manager(LLVFS *vfs)
01089 {
01090 gXferManager = new LLXferManager(vfs);
01091 }
01092
01093 void cleanup_xfer_manager()
01094 {
01095 if (gXferManager)
01096 {
01097 delete(gXferManager);
01098 gXferManager = NULL;
01099 }
01100 }
01101
01102 void process_confirm_packet (LLMessageSystem *mesgsys, void **user_data)
01103 {
01104 gXferManager->processConfirmation(mesgsys,user_data);
01105 }
01106
01107 void process_request_xfer(LLMessageSystem *mesgsys, void **user_data)
01108 {
01109 gXferManager->processFileRequest(mesgsys,user_data);
01110 }
01111
01112 void continue_file_receive(LLMessageSystem *mesgsys, void **user_data)
01113 {
01114 #if LL_TEST_XFER_REXMIT
01115 if (ll_frand() > 0.05f)
01116 {
01117 #endif
01118 gXferManager->processReceiveData(mesgsys,user_data);
01119 #if LL_TEST_XFER_REXMIT
01120 }
01121 else
01122 {
01123 cout << "oops! dropped a xfer packet" << endl;
01124 }
01125 #endif
01126 }
01127
01128 void process_abort_xfer(LLMessageSystem *mesgsys, void **user_data)
01129 {
01130 gXferManager->processAbort(mesgsys,user_data);
01131 }
01132
01133
01134
01135
01136
01137
01138
01139
01140
01141
01142
01143
01144
01145
01146
01147
01148
01149
01150
01151
01152
01153
01154
01155
01156
01157