00001 
00032 #include "linden_common.h"
00033 
00034 #include "llxfermanager.h"
00035 
00036 #include "llxfer.h"
00037 #include "llxfer_file.h"
00038 #include "llxfer_mem.h"
00039 #include "llxfer_vfile.h"
00040 
00041 #include "llerror.h"
00042 #include "lluuid.h"
00043 #include "u64.h"
00044 
00045 const F32 LL_XFER_REGISTRATION_TIMEOUT = 60.0f;  
00046 const F32 LL_PACKET_TIMEOUT = 3.0f;             
00047 const S32 LL_PACKET_RETRY_LIMIT = 10;            
00048 
00049 const S32 LL_DEFAULT_MAX_SIMULTANEOUS_XFERS = 10;
00050 const S32 LL_DEFAULT_MAX_REQUEST_FIFO_XFERS = 1000;
00051 
00052 #define LL_XFER_PROGRESS_MESSAGES 0
00053 #define LL_XFER_TEST_REXMIT       0
00054 
00055 
00057 
00058 LLXferManager::LLXferManager (LLVFS *vfs)
00059 {
00060         init(vfs);
00061 }
00062 
00064 
00065 LLXferManager::~LLXferManager ()
00066 {
00067         free();
00068 }
00069 
00071 
00072 void LLXferManager::init (LLVFS *vfs)
00073 {
00074         mSendList = NULL;
00075         mReceiveList = NULL;
00076 
00077         setMaxOutgoingXfersPerCircuit(LL_DEFAULT_MAX_SIMULTANEOUS_XFERS);
00078         setMaxIncomingXfers(LL_DEFAULT_MAX_REQUEST_FIFO_XFERS);
00079 
00080         mVFS = vfs;
00081 
00082         
00083         mUseAckThrottling = FALSE;
00084         setAckThrottleBPS(100000);
00085 }
00086         
00088 
00089 void LLXferManager::free ()
00090 {
00091         LLXfer *xferp;
00092         LLXfer *delp;
00093         
00094         mOutgoingHosts.deleteAllData();
00095 
00096         delp = mSendList;
00097         while (delp)
00098         {
00099                 xferp = delp->mNext;
00100                 delete delp;
00101                 delp = xferp;
00102         }
00103         mSendList = NULL;
00104 
00105         delp = mReceiveList;
00106         while (delp)
00107         {
00108                 xferp = delp->mNext;
00109                 delete delp;
00110                 delp = xferp;
00111         }
00112         mReceiveList = NULL;
00113 }
00114 
00116 
00117 void LLXferManager::setMaxIncomingXfers(S32 max_num)
00118 {
00119         mMaxIncomingXfers = max_num;
00120 }
00121 
00123 
00124 void LLXferManager::setMaxOutgoingXfersPerCircuit(S32 max_num)
00125 {
00126         mMaxOutgoingXfersPerCircuit = max_num;
00127 }
00128 
00129 void LLXferManager::setUseAckThrottling(const BOOL use)
00130 {
00131         mUseAckThrottling = use;
00132 }
00133 
00134 void LLXferManager::setAckThrottleBPS(const F32 bps)
00135 {
00136         
00137         
00138 
00139         
00140         
00141         F32 min_bps = (1000.f * 8.f* mMaxIncomingXfers) / LL_PACKET_TIMEOUT;
00142 
00143         
00144         F32 actual_rate = llmax(min_bps*1.1f, bps);
00145         llinfos << "LLXferManager ack throttle min rate: " << min_bps << llendl;
00146         llinfos << "LLXferManager ack throttle actual rate: " << actual_rate << llendl;
00147         mAckThrottle.setRate(actual_rate);
00148 }
00149 
00150 
00152 
00153 void LLXferManager::updateHostStatus()
00154 {
00155     LLXfer *xferp;
00156         LLHostStatus *host_statusp = NULL;
00157 
00158         mOutgoingHosts.deleteAllData();
00159 
00160         for (xferp = mSendList; xferp; xferp = xferp->mNext)
00161         {
00162                 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00163                 {
00164                         if (host_statusp->mHost == xferp->mRemoteHost)
00165                         {
00166                                 break;
00167                         }
00168                 }
00169                 if (!host_statusp)
00170                 {
00171                         host_statusp = new LLHostStatus();
00172                         if (host_statusp)
00173                         {
00174                                 host_statusp->mHost = xferp->mRemoteHost;
00175                                 mOutgoingHosts.addData(host_statusp);
00176                         }
00177                 }
00178                 if (host_statusp)
00179                 {
00180                         if (xferp->mStatus == e_LL_XFER_PENDING)
00181                         {
00182                                 host_statusp->mNumPending++;
00183                         }
00184                         else if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00185                         {
00186                                 host_statusp->mNumActive++;
00187                         }
00188                 }
00189                 
00190         }       
00191 }
00192 
00194 
00195 void LLXferManager::printHostStatus()
00196 {
00197         LLHostStatus *host_statusp = NULL;
00198         if (mOutgoingHosts.getFirstData())
00199         {
00200                 llinfos << "Outgoing Xfers:" << llendl;
00201 
00202                 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00203                 {
00204                         llinfos << "    " << host_statusp->mHost << "  active: " << host_statusp->mNumActive << "  pending: " << host_statusp->mNumPending << llendl;
00205                 }
00206         }       
00207 }
00208 
00210 
00211 LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head)
00212 {
00213     LLXfer *xferp;
00214         for (xferp = list_head; xferp; xferp = xferp->mNext)
00215         {
00216                 if (xferp->mID == id)
00217                 {
00218                         return(xferp);
00219                 }
00220         }
00221         return(NULL);
00222 }
00223 
00224 
00226 
00227 void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head)
00228 {
00229         
00230         
00231         if (delp)
00232         {
00233                 if (*list_head == delp)
00234                 {
00235                         *list_head = delp->mNext;
00236                         delete (delp);
00237                 }
00238                 else
00239                 {
00240                         LLXfer *xferp = *list_head;
00241                         while (xferp->mNext)
00242                         {
00243                                 if (xferp->mNext == delp)
00244                                 {
00245                                         xferp->mNext = delp->mNext;
00246                                         delete (delp);
00247                                         break;
00248                                 }
00249                                 xferp = xferp->mNext;
00250                         }
00251                 }
00252         }
00253 }
00254 
00256 
00257 U32 LLXferManager::numActiveListEntries(LLXfer *list_head)
00258 {
00259         U32 num_entries = 0;
00260 
00261         while (list_head)
00262         {
00263                 if ((list_head->mStatus == e_LL_XFER_IN_PROGRESS)) 
00264                 {
00265                         num_entries++;
00266                 }
00267                 list_head = list_head->mNext;
00268         }
00269         return(num_entries);
00270 }
00271 
00273 
00274 S32 LLXferManager::numPendingXfers(const LLHost &host)
00275 {
00276         LLHostStatus *host_statusp = NULL;
00277 
00278         for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00279         {
00280                 if (host_statusp->mHost == host)
00281                 {
00282                         return (host_statusp->mNumPending);
00283                 }
00284         }
00285         return 0;
00286 }
00287 
00289 
00290 S32 LLXferManager::numActiveXfers(const LLHost &host)
00291 {
00292         LLHostStatus *host_statusp = NULL;
00293 
00294         for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00295         {
00296                 if (host_statusp->mHost == host)
00297                 {
00298                         return (host_statusp->mNumActive);
00299                 }
00300         }
00301         return 0;
00302 }
00303 
00305 
00306 void LLXferManager::changeNumActiveXfers(const LLHost &host, S32 delta)
00307 {
00308         LLHostStatus *host_statusp = NULL;
00309 
00310         for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00311         {
00312                 if (host_statusp->mHost == host)
00313                 {
00314                         host_statusp->mNumActive += delta;
00315                 }
00316         }
00317 }
00318 
00320 
00321 void LLXferManager::registerCallbacks(LLMessageSystem *msgsystem)
00322 {
00323         msgsystem->setHandlerFuncFast(_PREHASH_ConfirmXferPacket,  process_confirm_packet, NULL);
00324         msgsystem->setHandlerFuncFast(_PREHASH_RequestXfer,        process_request_xfer,        NULL);
00325         msgsystem->setHandlerFuncFast(_PREHASH_SendXferPacket,          continue_file_receive,           NULL);
00326         msgsystem->setHandlerFuncFast(_PREHASH_AbortXfer,               process_abort_xfer,                  NULL);
00327 }
00328 
00330 
00331 U64 LLXferManager::getNextID ()
00332 {
00333         LLUUID a_guid;
00334 
00335         a_guid.generate();
00336 
00337         
00338         return(*((U64*)(a_guid.mData)));
00339 }
00340 
00342 
00343 S32 LLXferManager::encodePacketNum(S32 packet_num, BOOL is_EOF)
00344 {
00345         if (is_EOF)
00346         {
00347                 packet_num |= 0x80000000;
00348         }
00349         return packet_num;
00350 }
00351 
00353 
00354 S32 LLXferManager::decodePacketNum(S32 packet_num)
00355 {
00356         return(packet_num & 0x0FFFFFFF);
00357 }
00358 
00360 
00361 BOOL LLXferManager::isLastPacket(S32 packet_num)
00362 {
00363         return(packet_num & 0x80000000);
00364 }
00365 
00367 
00368 U64 LLXferManager::registerXfer(const void *datap, const S32 length)
00369 {
00370         LLXfer *xferp;
00371         U64 xfer_id = getNextID();
00372 
00373         xferp = (LLXfer *) new LLXfer_Mem();
00374         if (xferp)
00375         {
00376                 xferp->mNext = mSendList;
00377                 mSendList = xferp;
00378 
00379                 xfer_id = ((LLXfer_Mem *)xferp)->registerXfer(xfer_id, datap,length);
00380 
00381                 if (!xfer_id)
00382                 {
00383                         removeXfer(xferp,&mSendList);
00384                 }
00385         }
00386         else
00387         {
00388                 llerrs << "Xfer allocation error" << llendl;
00389                 xfer_id = 0;
00390         }       
00391 
00392     return(xfer_id);
00393 }
00394 
00396 
00397 void LLXferManager::requestFile(const char* local_filename,
00398                                                                 const char* remote_filename,
00399                                                                 ELLPath remote_path,
00400                                                                 const LLHost& remote_host,
00401                                                                 BOOL delete_remote_on_completion,
00402                                                                 void (*callback)(void**,S32,LLExtStat),
00403                                                                 void** user_data,
00404                                                                 BOOL is_priority,
00405                                                                 BOOL use_big_packets)
00406 {
00407         LLXfer *xferp;
00408 
00409         for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
00410         {
00411                 if (xferp->getXferTypeTag() == LLXfer::XFER_FILE
00412                         && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename))
00413                         && (((LLXfer_File*)xferp)->matchesRemoteFilename(remote_filename, remote_path))
00414                         && (remote_host == xferp->mRemoteHost)
00415                         && (callback == xferp->mCallback)
00416                         && (user_data == xferp->mCallbackDataHandle))
00417 
00418                 {
00419                         
00420                         return;
00421                 }
00422         }
00423 
00424         S32 chunk_size = use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1;
00425         xferp = (LLXfer *) new LLXfer_File(chunk_size);
00426         if (xferp)
00427         {
00428                 addToList(xferp, mReceiveList, is_priority);
00429 
00430                 
00431                 
00432                 
00433                 
00434                 if(delete_remote_on_completion &&
00435                    (strstr(remote_filename,".tmp") == &remote_filename[strlen(remote_filename)-4]))             
00436                 {
00437                         LLFile::remove(local_filename);
00438                 }
00439                 ((LLXfer_File *)xferp)->initializeRequest(
00440                         getNextID(),
00441                         local_filename,
00442                         remote_filename,
00443                         remote_path,
00444                         remote_host,
00445                         delete_remote_on_completion,
00446                         callback,user_data);
00447                 startPendingDownloads();
00448         }
00449         else
00450         {
00451                 llerrs << "Xfer allocation error" << llendl;
00452         }
00453 }
00454 
00455 void LLXferManager::requestFile(const char* remote_filename,
00456                                                                 ELLPath remote_path,
00457                                                                 const LLHost& remote_host,
00458                                                                 BOOL delete_remote_on_completion,
00459                                                                 void (*callback)(void*,S32,void**,S32,LLExtStat),
00460                                                                 void** user_data,
00461                                                                 BOOL is_priority)
00462 {
00463         LLXfer *xferp;
00464 
00465         xferp = (LLXfer *) new LLXfer_Mem();
00466         if (xferp)
00467         {
00468                 addToList(xferp, mReceiveList, is_priority);
00469                 ((LLXfer_Mem *)xferp)->initializeRequest(getNextID(),
00470                                                                                                  remote_filename, 
00471                                                                                                  remote_path,
00472                                                                                                  remote_host,
00473                                                                                                  delete_remote_on_completion,
00474                                                                                                  callback, user_data);
00475                 startPendingDownloads();
00476         }
00477         else
00478         {
00479                 llerrs << "Xfer allocation error" << llendl;
00480         }
00481 }
00482 
00483 void LLXferManager::requestVFile(const LLUUID& local_id,
00484                                                                  const LLUUID& remote_id,
00485                                                                  LLAssetType::EType type, LLVFS* vfs,
00486                                                                  const LLHost& remote_host,
00487                                                                  void (*callback)(void**,S32,LLExtStat),
00488                                                                  void** user_data,
00489                                                                  BOOL is_priority)
00490 {
00491         LLXfer *xferp;
00492 
00493         for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
00494         {
00495                 if (xferp->getXferTypeTag() == LLXfer::XFER_VFILE
00496                         && (((LLXfer_VFile*)xferp)->matchesLocalFile(local_id, type))
00497                         && (((LLXfer_VFile*)xferp)->matchesRemoteFile(remote_id, type))
00498                         && (remote_host == xferp->mRemoteHost)
00499                         && (callback == xferp->mCallback)
00500                         && (user_data == xferp->mCallbackDataHandle))
00501 
00502                 {
00503                         
00504                         return;
00505                 }
00506         }
00507 
00508         xferp = (LLXfer *) new LLXfer_VFile();
00509         if (xferp)
00510         {
00511                 addToList(xferp, mReceiveList, is_priority);
00512                 ((LLXfer_VFile *)xferp)->initializeRequest(getNextID(),
00513                         vfs,
00514                         local_id,
00515                         remote_id,
00516                         type,
00517                         remote_host,
00518                         callback,
00519                         user_data);
00520                 startPendingDownloads();
00521         }
00522         else
00523         {
00524                 llerrs << "Xfer allocation error" << llendl;
00525         }
00526 
00527 }
00528 
00529 
00530 
00531 
00532 
00533 
00534 
00535 
00536 
00537 
00538 
00539 
00540 
00541 
00542 
00543 
00544 
00545 
00546 
00547 
00548 
00549 
00550 
00551 
00552 
00553 
00554 
00555 
00556 
00557 
00558 
00559 
00560 
00561 
00562 
00563 
00564 
00565 
00566 
00567 
00568 
00569 
00570 
00571 
00572 
00573 
00574 
00575 
00576 
00577 
00578 
00579 
00580 
00581 
00582 
00583 
00584 
00585 
00586 
00587 
00588 
00590 
00591 void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** )
00592 {
00593         
00594         const S32 BUF_SIZE = LL_XFER_LARGE_PAYLOAD + 4;
00595         char fdata_buf[LL_XFER_LARGE_PAYLOAD + 4];              
00596         S32 fdata_size;
00597         U64 id;
00598         S32 packetnum;
00599         LLXfer * xferp;
00600         
00601         mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00602         mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetnum);
00603 
00604         fdata_size = mesgsys->getSizeFast(_PREHASH_DataPacket,_PREHASH_Data);
00605         mesgsys->getBinaryDataFast(_PREHASH_DataPacket, _PREHASH_Data, fdata_buf, 0, 0, BUF_SIZE);
00606 
00607         xferp = findXfer(id, mReceiveList);
00608 
00609         if (!xferp) 
00610         {
00611                 char U64_BUF[MAX_STRING];               
00612                 llwarns << "received xfer data from " << mesgsys->getSender()
00613                         << " for non-existent xfer id: "
00614                         << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << llendl;
00615                 return;
00616         }
00617 
00618         S32 xfer_size;
00619 
00620         if (decodePacketNum(packetnum) != xferp->mPacketNum) 
00621         {
00622                 
00623                 if (decodePacketNum(packetnum) == (xferp->mPacketNum - 1))
00624                 {
00625                         llinfos << "Reconfirming xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet " << packetnum << llendl;                  sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
00626                 }
00627                 else
00628                 {
00629                         llinfos << "Ignoring xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " recv'd packet " << packetnum << "; expecting " << xferp->mPacketNum << llendl;
00630                 }
00631                 return;         
00632         }
00633 
00634         S32 result = 0;
00635 
00636         if (xferp->mPacketNum == 0) 
00637         {
00638                 ntohmemcpy(&xfer_size,fdata_buf,MVT_S32,sizeof(S32));
00639                 
00640 
00641                 xferp->setXferSize(xfer_size);
00642 
00643                 
00644                 result = xferp->receiveData(&(fdata_buf[sizeof(S32)]),fdata_size-(sizeof(S32)));
00645         }
00646         else
00647         {
00648                 result = xferp->receiveData(fdata_buf,fdata_size);
00649         }
00650         
00651         if (result == LL_ERR_CANNOT_OPEN_FILE)
00652         {
00653                         xferp->abort(LL_ERR_CANNOT_OPEN_FILE);
00654                         removeXfer(xferp,&mReceiveList);
00655                         startPendingDownloads();
00656                         return;         
00657         }
00658 
00659         xferp->mPacketNum++;  
00660 
00661         if (!mUseAckThrottling)
00662         {
00663                 
00664                 sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
00665         }
00666         else
00667         {
00668                 
00669                 LLXferAckInfo ack_info;
00670                 ack_info.mID = id;
00671                 ack_info.mPacketNum = decodePacketNum(packetnum);
00672                 ack_info.mRemoteHost = mesgsys->getSender();
00673                 mXferAckQueue.push(ack_info);
00674         }
00675 
00676         if (isLastPacket(packetnum))
00677         {
00678                 xferp->processEOF();
00679                 removeXfer(xferp,&mReceiveList);
00680                 startPendingDownloads();
00681         }
00682 }
00683 
00685 
00686 void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 packetnum, const LLHost &remote_host)
00687 {
00688 #if LL_XFER_PROGRESS_MESSAGES
00689         if (!(packetnum % 50))
00690         {
00691                 cout << "confirming xfer packet #" << packetnum << endl;
00692         }
00693 #endif
00694         mesgsys->newMessageFast(_PREHASH_ConfirmXferPacket);
00695         mesgsys->nextBlockFast(_PREHASH_XferID);
00696         mesgsys->addU64Fast(_PREHASH_ID, id);
00697         mesgsys->addU32Fast(_PREHASH_Packet, packetnum);
00698 
00699         mesgsys->sendMessage(remote_host);
00700 }
00701 
00703 
00704 void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** )
00705 {
00706                 
00707         U64 id;
00708         char local_filename[MAX_STRING];                
00709         ELLPath local_path = LL_PATH_NONE;
00710         S32 result = LL_ERR_NOERR;
00711         LLUUID  uuid;
00712         LLAssetType::EType type;
00713         S16 type_s16;
00714         BOOL b_use_big_packets;
00715 
00716         mesgsys->getBOOL("XferID", "UseBigPackets", b_use_big_packets);
00717         
00718         mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00719         char U64_BUF[MAX_STRING];               
00720         llinfos << "xfer request id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF))
00721                    << " to " << mesgsys->getSender() << llendl;
00722 
00723         mesgsys->getStringFast(_PREHASH_XferID, _PREHASH_Filename, MAX_STRING, local_filename);
00724         
00725         U8 local_path_u8;
00726         mesgsys->getU8("XferID", "FilePath", local_path_u8);
00727         if( local_path_u8 < (U8)LL_PATH_COUNT )
00728         {
00729                 local_path = (ELLPath)local_path_u8;
00730         }
00731         else
00732         {
00733                 llwarns << "Invalid file path in LLXferManager::processFileRequest() " << (U32)local_path_u8 << llendl;
00734         }
00735 
00736         mesgsys->getUUIDFast(_PREHASH_XferID, _PREHASH_VFileID, uuid);
00737         mesgsys->getS16Fast(_PREHASH_XferID, _PREHASH_VFileType, type_s16);
00738         type = (LLAssetType::EType)type_s16;
00739 
00740         LLXfer *xferp;
00741 
00742         if (uuid != LLUUID::null)
00743         {
00744                 if(NULL == LLAssetType::lookup(type))
00745                 {
00746                         llwarns << "Invalid type for xfer request: " << uuid << ":"
00747                                         << type_s16 << " to " << mesgsys->getSender() << llendl;
00748                         return;
00749                 }
00750                         
00751                 llinfos << "starting vfile transfer: " << uuid << "," << LLAssetType::lookup(type) << " to " << mesgsys->getSender() << llendl;
00752 
00753                 if (! mVFS)
00754                 {
00755                         llwarns << "Attempt to send VFile w/o available VFS" << llendl;
00756                         return;
00757                 }
00758 
00759                 xferp = (LLXfer *)new LLXfer_VFile(mVFS, uuid, type);
00760                 if (xferp)
00761                 {
00762                         xferp->mNext = mSendList;
00763                         mSendList = xferp;      
00764                         result = xferp->startSend(id,mesgsys->getSender());
00765                 }
00766                 else
00767                 {
00768                         llerrs << "Xfer allcoation error" << llendl;
00769                 }
00770         }
00771         else if (strlen(local_filename))                
00772         {
00773                 std::string expanded_filename = gDirUtilp->getExpandedFilename( local_path, local_filename );
00774                 llinfos << "starting file transfer: " <<  expanded_filename << " to " << mesgsys->getSender() << llendl;
00775 
00776                 BOOL delete_local_on_completion = FALSE;
00777                 mesgsys->getBOOL("XferID", "DeleteOnCompletion", delete_local_on_completion);
00778 
00779                 
00780                 xferp = (LLXfer *)new LLXfer_File(expanded_filename, delete_local_on_completion, b_use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1);
00781                 
00782                 if (xferp)
00783                 {
00784                         xferp->mNext = mSendList;
00785                         mSendList = xferp;      
00786                         result = xferp->startSend(id,mesgsys->getSender());
00787                 }
00788                 else
00789                 {
00790                         llerrs << "Xfer allcoation error" << llendl;
00791                 }
00792         }
00793         else
00794         {
00795                 char U64_BUF[MAX_STRING];               
00796                 llinfos << "starting memory transfer: "
00797                         << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to "
00798                         << mesgsys->getSender() << llendl;
00799 
00800                 xferp = findXfer(id, mSendList);
00801                 
00802                 if (xferp)
00803                 {
00804                         result = xferp->startSend(id,mesgsys->getSender());
00805                 }
00806                 else
00807                 {
00808                         llinfos << "Warning: " << U64_BUF << " not found." << llendl;
00809                         result = LL_ERR_FILE_NOT_FOUND;
00810                 }
00811         }
00812 
00813         if (result)
00814         {
00815                 if (xferp)
00816                 {
00817                         xferp->abort(result);
00818                         removeXfer(xferp,&mSendList);
00819                 }
00820                 else 
00821                 {
00822                         llinfos << "Aborting xfer to " << mesgsys->getSender() << " with error: " << result << llendl;
00823 
00824                         mesgsys->newMessageFast(_PREHASH_AbortXfer);
00825                         mesgsys->nextBlockFast(_PREHASH_XferID);
00826                         mesgsys->addU64Fast(_PREHASH_ID, id);
00827                         mesgsys->addS32Fast(_PREHASH_Result, result);
00828         
00829                         mesgsys->sendMessage(mesgsys->getSender());             
00830                 }
00831         }
00832         else if(xferp && (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit))
00833         {
00834                 xferp->sendNextPacket();
00835                 changeNumActiveXfers(xferp->mRemoteHost,1);
00836 
00837         }
00838         else
00839         {
00840                 if(xferp)
00841                 {
00842                         llinfos << "  queueing xfer request, " << numPendingXfers(xferp->mRemoteHost) << " ahead of this one" << llendl;
00843                 }
00844                 else
00845                 {
00846                         llwarns << "LLXferManager::processFileRequest() - no xfer found!"
00847                                         << llendl;
00848                 }
00849         }
00850 }
00851 
00853 
00854 void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** )
00855 {
00856         U64 id = 0;
00857         S32 packetNum = 0;
00858 
00859         mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00860         mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetNum);
00861 
00862         LLXfer* xferp = findXfer(id, mSendList);
00863         if (xferp)
00864         {
00865 
00866                 xferp->mWaitingForACK = FALSE;
00867                 if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00868                 {
00869                         xferp->sendNextPacket();
00870                 }
00871                 else
00872                 {
00873                         removeXfer(xferp, &mSendList);
00874                 }
00875         }
00876 }
00877 
00879 
00880 void LLXferManager::retransmitUnackedPackets ()
00881 {
00882         LLXfer *xferp;
00883         LLXfer *delp;
00884         xferp = mReceiveList;
00885         while(xferp)
00886         {
00887                 if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00888                 {
00889                         
00890                         if (! gMessageSystem->mCircuitInfo.isCircuitAlive( xferp->mRemoteHost ))
00891                         {
00892                                 llinfos << "Xfer found in progress on dead circuit, aborting" << llendl;
00893                                 xferp->mCallbackResult = LL_ERR_CIRCUIT_GONE;
00894                                 xferp->processEOF();
00895                                 delp = xferp;
00896                                 xferp = xferp->mNext;
00897                                 removeXfer(delp,&mReceiveList);
00898                                 continue;
00899                         }
00900                                 
00901                 }
00902                 xferp = xferp->mNext;
00903         }
00904 
00905         xferp = mSendList; 
00906         updateHostStatus();
00907         F32 et;
00908         while (xferp)
00909         {
00910                 if (xferp->mWaitingForACK && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_PACKET_TIMEOUT))
00911                 {
00912                         if (xferp->mRetries > LL_PACKET_RETRY_LIMIT)
00913                         {
00914                                 llinfos << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet retransmit limit exceeded, xfer dropped" << llendl;
00915                                 xferp->abort(LL_ERR_TCP_TIMEOUT);
00916                                 delp = xferp;
00917                                 xferp = xferp->mNext;
00918                                 removeXfer(delp,&mSendList);
00919                         }
00920                         else
00921                         {
00922                                 llinfos << "resending xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet unconfirmed after: "<< et << " sec, packet " << xferp->mPacketNum << llendl;
00923                                 xferp->resendLastPacket();
00924                                 xferp = xferp->mNext;
00925                         }
00926                 }
00927                 else if ((xferp->mStatus == e_LL_XFER_REGISTERED) && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_XFER_REGISTRATION_TIMEOUT))
00928                 {
00929                         llinfos << "registered xfer never requested, xfer dropped" << llendl;
00930                         xferp->abort(LL_ERR_TCP_TIMEOUT);
00931                         delp = xferp;
00932                         xferp = xferp->mNext;
00933                         removeXfer(delp,&mSendList);
00934                 }
00935                 else if (xferp->mStatus == e_LL_XFER_ABORTED)
00936                 {
00937                         llwarns << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getName() << llendl;
00938                         delp = xferp;
00939                         xferp = xferp->mNext;
00940                         removeXfer(delp,&mSendList);
00941                 }
00942                 else if (xferp->mStatus == e_LL_XFER_PENDING)
00943                 {
00944 
00945                         if (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit)
00946                         {
00947 
00948                                 xferp->sendNextPacket();
00949                                 changeNumActiveXfers(xferp->mRemoteHost,1);
00950                         }                       
00951                         xferp = xferp->mNext;
00952                 }
00953                 else
00954                 {
00955                         xferp = xferp->mNext;
00956                 }
00957         }
00958 
00959         
00960         
00961         
00962         
00963 
00964         while (mXferAckQueue.getLength())
00965         {
00966                 if (mAckThrottle.checkOverflow(1000.0f*8.0f))
00967                 {
00968                         break;
00969                 }
00970                 
00971                 LLXferAckInfo ack_info;
00972                 mXferAckQueue.pop(ack_info);
00973                 
00974                 sendConfirmPacket(gMessageSystem, ack_info.mID, ack_info.mPacketNum, ack_info.mRemoteHost);
00975                 mAckThrottle.throttleOverflow(1000.f*8.f); 
00976         }
00977 }
00978 
00979 
00981 
00982 void LLXferManager::processAbort (LLMessageSystem *mesgsys, void ** )
00983 {
00984         U64 id = 0;
00985         S32 result_code = 0;
00986         LLXfer * xferp;
00987 
00988         mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00989         mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Result, result_code);
00990 
00991         xferp = findXfer(id, mReceiveList);
00992         if (xferp)
00993         {
00994                 xferp->mCallbackResult = result_code;
00995                 xferp->processEOF();
00996                 removeXfer(xferp, &mReceiveList);
00997                 startPendingDownloads();
00998         }
00999 }
01000 
01002 
01003 void LLXferManager::startPendingDownloads()
01004 {
01005         
01006         
01007         
01008         
01009         
01010         
01011         
01012         LLXfer* xferp = mReceiveList;
01013         LLLinkedList<LLXfer> pending_downloads;
01014         S32 download_count = 0;
01015         S32 pending_count = 0;
01016         while(xferp)
01017         {
01018                 if(xferp->mStatus == e_LL_XFER_PENDING)
01019                 {
01020                         ++pending_count; 
01021                         pending_downloads.addData(xferp);
01022                 }
01023                 else if(xferp->mStatus == e_LL_XFER_IN_PROGRESS)
01024                 {
01025                         ++download_count;
01026                 }
01027                 xferp = xferp->mNext;
01028         }
01029 
01030         S32 start_count = mMaxIncomingXfers - download_count;
01031 
01032         lldebugs << "LLXferManager::startPendingDownloads() - XFER_IN_PROGRESS: "
01033                          << download_count << " XFER_PENDING: " << pending_count
01034                          << " startring " << llmin(start_count, pending_count) << llendl;
01035 
01036         if((start_count > 0) && (pending_count > 0))
01037         {
01038                 S32 result;
01039                 xferp = pending_downloads.getFirstData();
01040                 while(start_count-- && xferp)
01041                 {
01042                         result = xferp->startDownload();
01043                         if(result)
01044                         {
01045                                 xferp->abort(result);
01046                                 ++start_count;
01047                         }
01048                         xferp = pending_downloads.getNextData();
01049                 }
01050         }
01051 }
01052 
01054 
01055 void LLXferManager::addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority)
01056 {
01057         if(is_priority)
01058         {
01059                 xferp->mNext = NULL;
01060                 LLXfer* next = head;
01061                 if(next)
01062                 {
01063                         while(next->mNext)
01064                         {
01065                                 next = next->mNext;
01066                         }
01067                         next->mNext = xferp;
01068                 }
01069                 else
01070                 {
01071                         head = xferp;
01072                 }
01073         }
01074         else
01075         {
01076                 xferp->mNext = head;
01077                 head = xferp;
01078         }
01079 }
01080 
01082 
01084 
01085 LLXferManager *gXferManager = NULL;
01086 
01087 
01088 void start_xfer_manager(LLVFS *vfs)
01089 {
01090         gXferManager = new LLXferManager(vfs);
01091 }
01092 
01093 void cleanup_xfer_manager()
01094 {
01095         if (gXferManager)
01096         {
01097                 delete(gXferManager);
01098                 gXferManager = NULL;
01099         }
01100 }
01101 
01102 void process_confirm_packet (LLMessageSystem *mesgsys, void **user_data)
01103 {
01104         gXferManager->processConfirmation(mesgsys,user_data);
01105 }
01106 
01107 void process_request_xfer(LLMessageSystem *mesgsys, void **user_data)
01108 {
01109         gXferManager->processFileRequest(mesgsys,user_data);
01110 }
01111 
01112 void continue_file_receive(LLMessageSystem *mesgsys, void **user_data)
01113 {
01114 #if LL_TEST_XFER_REXMIT
01115         if (ll_frand() > 0.05f)
01116         {
01117 #endif
01118                 gXferManager->processReceiveData(mesgsys,user_data);
01119 #if LL_TEST_XFER_REXMIT
01120         }
01121         else
01122         {
01123                 cout << "oops! dropped a xfer packet" << endl;
01124         }
01125 #endif
01126 }
01127 
01128 void process_abort_xfer(LLMessageSystem *mesgsys, void **user_data)
01129 {
01130         gXferManager->processAbort(mesgsys,user_data);
01131 }
01132 
01133 
01134 
01135 
01136 
01137 
01138 
01139 
01140 
01141 
01142 
01143 
01144 
01145 
01146 
01147 
01148 
01149 
01150 
01151 
01152 
01153 
01154 
01155 
01156 
01157