llxfermanager.cpp

Go to the documentation of this file.
00001 
00032 #include "linden_common.h"
00033 
00034 #include "llxfermanager.h"
00035 
00036 #include "llxfer.h"
00037 #include "llxfer_file.h"
00038 #include "llxfer_mem.h"
00039 #include "llxfer_vfile.h"
00040 
00041 #include "llerror.h"
00042 #include "lluuid.h"
00043 #include "u64.h"
00044 
00045 const F32 LL_XFER_REGISTRATION_TIMEOUT = 60.0f;  // timeout if a registered transfer hasn't been requested in 60 seconds
00046 const F32 LL_PACKET_TIMEOUT = 3.0f;             // packet timeout at 3 s
00047 const S32 LL_PACKET_RETRY_LIMIT = 10;            // packet retransmission limit
00048 
00049 const S32 LL_DEFAULT_MAX_SIMULTANEOUS_XFERS = 10;
00050 const S32 LL_DEFAULT_MAX_REQUEST_FIFO_XFERS = 1000;
00051 
00052 #define LL_XFER_PROGRESS_MESSAGES 0
00053 #define LL_XFER_TEST_REXMIT       0
00054 
00055 
00057 
00058 LLXferManager::LLXferManager (LLVFS *vfs)
00059 {
00060         init(vfs);
00061 }
00062 
00064 
00065 LLXferManager::~LLXferManager ()
00066 {
00067         free();
00068 }
00069 
00071 
00072 void LLXferManager::init (LLVFS *vfs)
00073 {
00074         mSendList = NULL;
00075         mReceiveList = NULL;
00076 
00077         setMaxOutgoingXfersPerCircuit(LL_DEFAULT_MAX_SIMULTANEOUS_XFERS);
00078         setMaxIncomingXfers(LL_DEFAULT_MAX_REQUEST_FIFO_XFERS);
00079 
00080         mVFS = vfs;
00081 
00082         // Turn on or off ack throttling
00083         mUseAckThrottling = FALSE;
00084         setAckThrottleBPS(100000);
00085 }
00086         
00088 
00089 void LLXferManager::free ()
00090 {
00091         LLXfer *xferp;
00092         LLXfer *delp;
00093         
00094         mOutgoingHosts.deleteAllData();
00095 
00096         delp = mSendList;
00097         while (delp)
00098         {
00099                 xferp = delp->mNext;
00100                 delete delp;
00101                 delp = xferp;
00102         }
00103         mSendList = NULL;
00104 
00105         delp = mReceiveList;
00106         while (delp)
00107         {
00108                 xferp = delp->mNext;
00109                 delete delp;
00110                 delp = xferp;
00111         }
00112         mReceiveList = NULL;
00113 }
00114 
00116 
00117 void LLXferManager::setMaxIncomingXfers(S32 max_num)
00118 {
00119         mMaxIncomingXfers = max_num;
00120 }
00121 
00123 
00124 void LLXferManager::setMaxOutgoingXfersPerCircuit(S32 max_num)
00125 {
00126         mMaxOutgoingXfersPerCircuit = max_num;
00127 }
00128 
00129 void LLXferManager::setUseAckThrottling(const BOOL use)
00130 {
00131         mUseAckThrottling = use;
00132 }
00133 
00134 void LLXferManager::setAckThrottleBPS(const F32 bps)
00135 {
00136         // Let's figure out the min we can set based on the ack retry rate
00137         // and number of simultaneous.
00138 
00139         // Assuming we're running as slow as possible, this is the lowest ack
00140         // rate we can use.
00141         F32 min_bps = (1000.f * 8.f* mMaxIncomingXfers) / LL_PACKET_TIMEOUT;
00142 
00143         // Set
00144         F32 actual_rate = llmax(min_bps*1.1f, bps);
00145         llinfos << "LLXferManager ack throttle min rate: " << min_bps << llendl;
00146         llinfos << "LLXferManager ack throttle actual rate: " << actual_rate << llendl;
00147         mAckThrottle.setRate(actual_rate);
00148 }
00149 
00150 
00152 
00153 void LLXferManager::updateHostStatus()
00154 {
00155     LLXfer *xferp;
00156         LLHostStatus *host_statusp = NULL;
00157 
00158         mOutgoingHosts.deleteAllData();
00159 
00160         for (xferp = mSendList; xferp; xferp = xferp->mNext)
00161         {
00162                 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00163                 {
00164                         if (host_statusp->mHost == xferp->mRemoteHost)
00165                         {
00166                                 break;
00167                         }
00168                 }
00169                 if (!host_statusp)
00170                 {
00171                         host_statusp = new LLHostStatus();
00172                         if (host_statusp)
00173                         {
00174                                 host_statusp->mHost = xferp->mRemoteHost;
00175                                 mOutgoingHosts.addData(host_statusp);
00176                         }
00177                 }
00178                 if (host_statusp)
00179                 {
00180                         if (xferp->mStatus == e_LL_XFER_PENDING)
00181                         {
00182                                 host_statusp->mNumPending++;
00183                         }
00184                         else if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00185                         {
00186                                 host_statusp->mNumActive++;
00187                         }
00188                 }
00189                 
00190         }       
00191 }
00192 
00194 
00195 void LLXferManager::printHostStatus()
00196 {
00197         LLHostStatus *host_statusp = NULL;
00198         if (mOutgoingHosts.getFirstData())
00199         {
00200                 llinfos << "Outgoing Xfers:" << llendl;
00201 
00202                 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00203                 {
00204                         llinfos << "    " << host_statusp->mHost << "  active: " << host_statusp->mNumActive << "  pending: " << host_statusp->mNumPending << llendl;
00205                 }
00206         }       
00207 }
00208 
00210 
00211 LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head)
00212 {
00213     LLXfer *xferp;
00214         for (xferp = list_head; xferp; xferp = xferp->mNext)
00215         {
00216                 if (xferp->mID == id)
00217                 {
00218                         return(xferp);
00219                 }
00220         }
00221         return(NULL);
00222 }
00223 
00224 
00226 
00227 void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head)
00228 {
00229         // This function assumes that delp will only occur in the list
00230         // zero or one times.
00231         if (delp)
00232         {
00233                 if (*list_head == delp)
00234                 {
00235                         *list_head = delp->mNext;
00236                         delete (delp);
00237                 }
00238                 else
00239                 {
00240                         LLXfer *xferp = *list_head;
00241                         while (xferp->mNext)
00242                         {
00243                                 if (xferp->mNext == delp)
00244                                 {
00245                                         xferp->mNext = delp->mNext;
00246                                         delete (delp);
00247                                         break;
00248                                 }
00249                                 xferp = xferp->mNext;
00250                         }
00251                 }
00252         }
00253 }
00254 
00256 
00257 U32 LLXferManager::numActiveListEntries(LLXfer *list_head)
00258 {
00259         U32 num_entries = 0;
00260 
00261         while (list_head)
00262         {
00263                 if ((list_head->mStatus == e_LL_XFER_IN_PROGRESS)) 
00264                 {
00265                         num_entries++;
00266                 }
00267                 list_head = list_head->mNext;
00268         }
00269         return(num_entries);
00270 }
00271 
00273 
00274 S32 LLXferManager::numPendingXfers(const LLHost &host)
00275 {
00276         LLHostStatus *host_statusp = NULL;
00277 
00278         for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00279         {
00280                 if (host_statusp->mHost == host)
00281                 {
00282                         return (host_statusp->mNumPending);
00283                 }
00284         }
00285         return 0;
00286 }
00287 
00289 
00290 S32 LLXferManager::numActiveXfers(const LLHost &host)
00291 {
00292         LLHostStatus *host_statusp = NULL;
00293 
00294         for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00295         {
00296                 if (host_statusp->mHost == host)
00297                 {
00298                         return (host_statusp->mNumActive);
00299                 }
00300         }
00301         return 0;
00302 }
00303 
00305 
00306 void LLXferManager::changeNumActiveXfers(const LLHost &host, S32 delta)
00307 {
00308         LLHostStatus *host_statusp = NULL;
00309 
00310         for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
00311         {
00312                 if (host_statusp->mHost == host)
00313                 {
00314                         host_statusp->mNumActive += delta;
00315                 }
00316         }
00317 }
00318 
00320 
00321 void LLXferManager::registerCallbacks(LLMessageSystem *msgsystem)
00322 {
00323         msgsystem->setHandlerFuncFast(_PREHASH_ConfirmXferPacket,  process_confirm_packet, NULL);
00324         msgsystem->setHandlerFuncFast(_PREHASH_RequestXfer,        process_request_xfer,        NULL);
00325         msgsystem->setHandlerFuncFast(_PREHASH_SendXferPacket,          continue_file_receive,           NULL);
00326         msgsystem->setHandlerFuncFast(_PREHASH_AbortXfer,               process_abort_xfer,                  NULL);
00327 }
00328 
00330 
00331 U64 LLXferManager::getNextID ()
00332 {
00333         LLUUID a_guid;
00334 
00335         a_guid.generate();
00336 
00337         
00338         return(*((U64*)(a_guid.mData)));
00339 }
00340 
00342 
00343 S32 LLXferManager::encodePacketNum(S32 packet_num, BOOL is_EOF)
00344 {
00345         if (is_EOF)
00346         {
00347                 packet_num |= 0x80000000;
00348         }
00349         return packet_num;
00350 }
00351 
00353 
00354 S32 LLXferManager::decodePacketNum(S32 packet_num)
00355 {
00356         return(packet_num & 0x0FFFFFFF);
00357 }
00358 
00360 
00361 BOOL LLXferManager::isLastPacket(S32 packet_num)
00362 {
00363         return(packet_num & 0x80000000);
00364 }
00365 
00367 
00368 U64 LLXferManager::registerXfer(const void *datap, const S32 length)
00369 {
00370         LLXfer *xferp;
00371         U64 xfer_id = getNextID();
00372 
00373         xferp = (LLXfer *) new LLXfer_Mem();
00374         if (xferp)
00375         {
00376                 xferp->mNext = mSendList;
00377                 mSendList = xferp;
00378 
00379                 xfer_id = ((LLXfer_Mem *)xferp)->registerXfer(xfer_id, datap,length);
00380 
00381                 if (!xfer_id)
00382                 {
00383                         removeXfer(xferp,&mSendList);
00384                 }
00385         }
00386         else
00387         {
00388                 llerrs << "Xfer allocation error" << llendl;
00389                 xfer_id = 0;
00390         }       
00391 
00392     return(xfer_id);
00393 }
00394 
00396 
00397 void LLXferManager::requestFile(const char* local_filename,
00398                                                                 const char* remote_filename,
00399                                                                 ELLPath remote_path,
00400                                                                 const LLHost& remote_host,
00401                                                                 BOOL delete_remote_on_completion,
00402                                                                 void (*callback)(void**,S32,LLExtStat),
00403                                                                 void** user_data,
00404                                                                 BOOL is_priority,
00405                                                                 BOOL use_big_packets)
00406 {
00407         LLXfer *xferp;
00408 
00409         for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
00410         {
00411                 if (xferp->getXferTypeTag() == LLXfer::XFER_FILE
00412                         && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename))
00413                         && (((LLXfer_File*)xferp)->matchesRemoteFilename(remote_filename, remote_path))
00414                         && (remote_host == xferp->mRemoteHost)
00415                         && (callback == xferp->mCallback)
00416                         && (user_data == xferp->mCallbackDataHandle))
00417 
00418                 {
00419                         // cout << "requested a xfer already in progress" << endl;
00420                         return;
00421                 }
00422         }
00423 
00424         S32 chunk_size = use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1;
00425         xferp = (LLXfer *) new LLXfer_File(chunk_size);
00426         if (xferp)
00427         {
00428                 addToList(xferp, mReceiveList, is_priority);
00429 
00430                 // Remove any file by the same name that happens to be lying
00431                 // around.
00432                 // Note: according to AaronB, this is here to deal with locks on files that were
00433                 // in transit during a crash,
00434                 if(delete_remote_on_completion &&
00435                    (strstr(remote_filename,".tmp") == &remote_filename[strlen(remote_filename)-4]))             /* Flawfinder : ignore */
00436                 {
00437                         LLFile::remove(local_filename);
00438                 }
00439                 ((LLXfer_File *)xferp)->initializeRequest(
00440                         getNextID(),
00441                         local_filename,
00442                         remote_filename,
00443                         remote_path,
00444                         remote_host,
00445                         delete_remote_on_completion,
00446                         callback,user_data);
00447                 startPendingDownloads();
00448         }
00449         else
00450         {
00451                 llerrs << "Xfer allocation error" << llendl;
00452         }
00453 }
00454 
00455 void LLXferManager::requestFile(const char* remote_filename,
00456                                                                 ELLPath remote_path,
00457                                                                 const LLHost& remote_host,
00458                                                                 BOOL delete_remote_on_completion,
00459                                                                 void (*callback)(void*,S32,void**,S32,LLExtStat),
00460                                                                 void** user_data,
00461                                                                 BOOL is_priority)
00462 {
00463         LLXfer *xferp;
00464 
00465         xferp = (LLXfer *) new LLXfer_Mem();
00466         if (xferp)
00467         {
00468                 addToList(xferp, mReceiveList, is_priority);
00469                 ((LLXfer_Mem *)xferp)->initializeRequest(getNextID(),
00470                                                                                                  remote_filename, 
00471                                                                                                  remote_path,
00472                                                                                                  remote_host,
00473                                                                                                  delete_remote_on_completion,
00474                                                                                                  callback, user_data);
00475                 startPendingDownloads();
00476         }
00477         else
00478         {
00479                 llerrs << "Xfer allocation error" << llendl;
00480         }
00481 }
00482 
00483 void LLXferManager::requestVFile(const LLUUID& local_id,
00484                                                                  const LLUUID& remote_id,
00485                                                                  LLAssetType::EType type, LLVFS* vfs,
00486                                                                  const LLHost& remote_host,
00487                                                                  void (*callback)(void**,S32,LLExtStat),
00488                                                                  void** user_data,
00489                                                                  BOOL is_priority)
00490 {
00491         LLXfer *xferp;
00492 
00493         for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
00494         {
00495                 if (xferp->getXferTypeTag() == LLXfer::XFER_VFILE
00496                         && (((LLXfer_VFile*)xferp)->matchesLocalFile(local_id, type))
00497                         && (((LLXfer_VFile*)xferp)->matchesRemoteFile(remote_id, type))
00498                         && (remote_host == xferp->mRemoteHost)
00499                         && (callback == xferp->mCallback)
00500                         && (user_data == xferp->mCallbackDataHandle))
00501 
00502                 {
00503                         // cout << "requested a xfer already in progress" << endl;
00504                         return;
00505                 }
00506         }
00507 
00508         xferp = (LLXfer *) new LLXfer_VFile();
00509         if (xferp)
00510         {
00511                 addToList(xferp, mReceiveList, is_priority);
00512                 ((LLXfer_VFile *)xferp)->initializeRequest(getNextID(),
00513                         vfs,
00514                         local_id,
00515                         remote_id,
00516                         type,
00517                         remote_host,
00518                         callback,
00519                         user_data);
00520                 startPendingDownloads();
00521         }
00522         else
00523         {
00524                 llerrs << "Xfer allocation error" << llendl;
00525         }
00526 
00527 }
00528 
00529 /*
00530 void LLXferManager::requestXfer(
00531                                                                 const char *local_filename, 
00532                                                                 BOOL delete_remote_on_completion,
00533                                                                 U64 xfer_id, 
00534                                                                 const LLHost &remote_host, 
00535                                                                 void (*callback)(void **,S32),
00536                                                                 void **user_data)
00537 {
00538         LLXfer *xferp;
00539 
00540         for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
00541         {
00542                 if (xferp->getXferTypeTag() == LLXfer::XFER_FILE
00543                         && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename))
00544                         && (xfer_id == xferp->mID)
00545                         && (remote_host == xferp->mRemoteHost)
00546                         && (callback == xferp->mCallback)
00547                         && (user_data == xferp->mCallbackDataHandle))
00548 
00549                 {
00550                         // cout << "requested a xfer already in progress" << endl;
00551                         return;
00552                 }
00553         }
00554 
00555         xferp = (LLXfer *) new LLXfer_File();
00556         if (xferp)
00557         {
00558                 xferp->mNext = mReceiveList;
00559                 mReceiveList = xferp;
00560 
00561                 ((LLXfer_File *)xferp)->initializeRequest(xfer_id,local_filename,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data);
00562                 startPendingDownloads();
00563         }
00564         else
00565         {
00566                 llerrs << "Xfer allcoation error" << llendl;
00567         }
00568 }
00569 
00570 void LLXferManager::requestXfer(U64 xfer_id, const LLHost &remote_host, BOOL delete_remote_on_completion, void (*callback)(void *,S32,void **,S32),void **user_data)
00571 {
00572         LLXfer *xferp;
00573 
00574         xferp = (LLXfer *) new LLXfer_Mem();
00575         if (xferp)
00576         {
00577                 xferp->mNext = mReceiveList;
00578                 mReceiveList = xferp;
00579 
00580                 ((LLXfer_Mem *)xferp)->initializeRequest(xfer_id,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data);
00581                 startPendingDownloads();
00582         }
00583         else
00584         {
00585                 llerrs << "Xfer allcoation error" << llendl;
00586         }
00587 }
00588 */
00590 
00591 void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user_data*/)
00592 {
00593         // there's sometimes an extra 4 bytes added to an xfer payload
00594         const S32 BUF_SIZE = LL_XFER_LARGE_PAYLOAD + 4;
00595         char fdata_buf[LL_XFER_LARGE_PAYLOAD + 4];              /* Flawfinder : ignore */
00596         S32 fdata_size;
00597         U64 id;
00598         S32 packetnum;
00599         LLXfer * xferp;
00600         
00601         mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00602         mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetnum);
00603 
00604         fdata_size = mesgsys->getSizeFast(_PREHASH_DataPacket,_PREHASH_Data);
00605         mesgsys->getBinaryDataFast(_PREHASH_DataPacket, _PREHASH_Data, fdata_buf, 0, 0, BUF_SIZE);
00606 
00607         xferp = findXfer(id, mReceiveList);
00608 
00609         if (!xferp) 
00610         {
00611                 char U64_BUF[MAX_STRING];               /* Flawfinder : ignore */
00612                 llwarns << "received xfer data from " << mesgsys->getSender()
00613                         << " for non-existent xfer id: "
00614                         << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << llendl;
00615                 return;
00616         }
00617 
00618         S32 xfer_size;
00619 
00620         if (decodePacketNum(packetnum) != xferp->mPacketNum) // is the packet different from what we were expecting?
00621         {
00622                 // confirm it if it was a resend of the last one, since the confirmation might have gotten dropped
00623                 if (decodePacketNum(packetnum) == (xferp->mPacketNum - 1))
00624                 {
00625                         llinfos << "Reconfirming xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet " << packetnum << llendl;                  sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
00626                 }
00627                 else
00628                 {
00629                         llinfos << "Ignoring xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " recv'd packet " << packetnum << "; expecting " << xferp->mPacketNum << llendl;
00630                 }
00631                 return;         
00632         }
00633 
00634         S32 result = 0;
00635 
00636         if (xferp->mPacketNum == 0) // first packet has size encoded as additional S32 at beginning of data
00637         {
00638                 ntohmemcpy(&xfer_size,fdata_buf,MVT_S32,sizeof(S32));
00639                 
00640 // do any necessary things on first packet ie. allocate memory
00641                 xferp->setXferSize(xfer_size);
00642 
00643                 // adjust buffer start and size
00644                 result = xferp->receiveData(&(fdata_buf[sizeof(S32)]),fdata_size-(sizeof(S32)));
00645         }
00646         else
00647         {
00648                 result = xferp->receiveData(fdata_buf,fdata_size);
00649         }
00650         
00651         if (result == LL_ERR_CANNOT_OPEN_FILE)
00652         {
00653                         xferp->abort(LL_ERR_CANNOT_OPEN_FILE);
00654                         removeXfer(xferp,&mReceiveList);
00655                         startPendingDownloads();
00656                         return;         
00657         }
00658 
00659         xferp->mPacketNum++;  // expect next packet
00660 
00661         if (!mUseAckThrottling)
00662         {
00663                 // No throttling, confirm right away
00664                 sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
00665         }
00666         else
00667         {
00668                 // Throttling, put on queue to be confirmed later.
00669                 LLXferAckInfo ack_info;
00670                 ack_info.mID = id;
00671                 ack_info.mPacketNum = decodePacketNum(packetnum);
00672                 ack_info.mRemoteHost = mesgsys->getSender();
00673                 mXferAckQueue.push(ack_info);
00674         }
00675 
00676         if (isLastPacket(packetnum))
00677         {
00678                 xferp->processEOF();
00679                 removeXfer(xferp,&mReceiveList);
00680                 startPendingDownloads();
00681         }
00682 }
00683 
00685 
00686 void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 packetnum, const LLHost &remote_host)
00687 {
00688 #if LL_XFER_PROGRESS_MESSAGES
00689         if (!(packetnum % 50))
00690         {
00691                 cout << "confirming xfer packet #" << packetnum << endl;
00692         }
00693 #endif
00694         mesgsys->newMessageFast(_PREHASH_ConfirmXferPacket);
00695         mesgsys->nextBlockFast(_PREHASH_XferID);
00696         mesgsys->addU64Fast(_PREHASH_ID, id);
00697         mesgsys->addU32Fast(_PREHASH_Packet, packetnum);
00698 
00699         mesgsys->sendMessage(remote_host);
00700 }
00701 
00703 
00704 void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user_data*/)
00705 {
00706                 
00707         U64 id;
00708         char local_filename[MAX_STRING];                /* Flawfinder : ignore */
00709         ELLPath local_path = LL_PATH_NONE;
00710         S32 result = LL_ERR_NOERR;
00711         LLUUID  uuid;
00712         LLAssetType::EType type;
00713         S16 type_s16;
00714         BOOL b_use_big_packets;
00715 
00716         mesgsys->getBOOL("XferID", "UseBigPackets", b_use_big_packets);
00717         
00718         mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00719         char U64_BUF[MAX_STRING];               /* Flawfinder : ignore */
00720         llinfos << "xfer request id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF))
00721                    << " to " << mesgsys->getSender() << llendl;
00722 
00723         mesgsys->getStringFast(_PREHASH_XferID, _PREHASH_Filename, MAX_STRING, local_filename);
00724         
00725         U8 local_path_u8;
00726         mesgsys->getU8("XferID", "FilePath", local_path_u8);
00727         if( local_path_u8 < (U8)LL_PATH_COUNT )
00728         {
00729                 local_path = (ELLPath)local_path_u8;
00730         }
00731         else
00732         {
00733                 llwarns << "Invalid file path in LLXferManager::processFileRequest() " << (U32)local_path_u8 << llendl;
00734         }
00735 
00736         mesgsys->getUUIDFast(_PREHASH_XferID, _PREHASH_VFileID, uuid);
00737         mesgsys->getS16Fast(_PREHASH_XferID, _PREHASH_VFileType, type_s16);
00738         type = (LLAssetType::EType)type_s16;
00739 
00740         LLXfer *xferp;
00741 
00742         if (uuid != LLUUID::null)
00743         {
00744                 if(NULL == LLAssetType::lookup(type))
00745                 {
00746                         llwarns << "Invalid type for xfer request: " << uuid << ":"
00747                                         << type_s16 << " to " << mesgsys->getSender() << llendl;
00748                         return;
00749                 }
00750                         
00751                 llinfos << "starting vfile transfer: " << uuid << "," << LLAssetType::lookup(type) << " to " << mesgsys->getSender() << llendl;
00752 
00753                 if (! mVFS)
00754                 {
00755                         llwarns << "Attempt to send VFile w/o available VFS" << llendl;
00756                         return;
00757                 }
00758 
00759                 xferp = (LLXfer *)new LLXfer_VFile(mVFS, uuid, type);
00760                 if (xferp)
00761                 {
00762                         xferp->mNext = mSendList;
00763                         mSendList = xferp;      
00764                         result = xferp->startSend(id,mesgsys->getSender());
00765                 }
00766                 else
00767                 {
00768                         llerrs << "Xfer allcoation error" << llendl;
00769                 }
00770         }
00771         else if (strlen(local_filename))                /* Flawfinder : ignore */
00772         {
00773                 std::string expanded_filename = gDirUtilp->getExpandedFilename( local_path, local_filename );
00774                 llinfos << "starting file transfer: " <<  expanded_filename << " to " << mesgsys->getSender() << llendl;
00775 
00776                 BOOL delete_local_on_completion = FALSE;
00777                 mesgsys->getBOOL("XferID", "DeleteOnCompletion", delete_local_on_completion);
00778 
00779                 // -1 chunk_size causes it to use the default
00780                 xferp = (LLXfer *)new LLXfer_File(expanded_filename, delete_local_on_completion, b_use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1);
00781                 
00782                 if (xferp)
00783                 {
00784                         xferp->mNext = mSendList;
00785                         mSendList = xferp;      
00786                         result = xferp->startSend(id,mesgsys->getSender());
00787                 }
00788                 else
00789                 {
00790                         llerrs << "Xfer allcoation error" << llendl;
00791                 }
00792         }
00793         else
00794         {
00795                 char U64_BUF[MAX_STRING];               /* Flawfinder : ignore */
00796                 llinfos << "starting memory transfer: "
00797                         << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to "
00798                         << mesgsys->getSender() << llendl;
00799 
00800                 xferp = findXfer(id, mSendList);
00801                 
00802                 if (xferp)
00803                 {
00804                         result = xferp->startSend(id,mesgsys->getSender());
00805                 }
00806                 else
00807                 {
00808                         llinfos << "Warning: " << U64_BUF << " not found." << llendl;
00809                         result = LL_ERR_FILE_NOT_FOUND;
00810                 }
00811         }
00812 
00813         if (result)
00814         {
00815                 if (xferp)
00816                 {
00817                         xferp->abort(result);
00818                         removeXfer(xferp,&mSendList);
00819                 }
00820                 else // can happen with a memory transfer not found
00821                 {
00822                         llinfos << "Aborting xfer to " << mesgsys->getSender() << " with error: " << result << llendl;
00823 
00824                         mesgsys->newMessageFast(_PREHASH_AbortXfer);
00825                         mesgsys->nextBlockFast(_PREHASH_XferID);
00826                         mesgsys->addU64Fast(_PREHASH_ID, id);
00827                         mesgsys->addS32Fast(_PREHASH_Result, result);
00828         
00829                         mesgsys->sendMessage(mesgsys->getSender());             
00830                 }
00831         }
00832         else if(xferp && (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit))
00833         {
00834                 xferp->sendNextPacket();
00835                 changeNumActiveXfers(xferp->mRemoteHost,1);
00836 //              llinfos << "***STARTING XFER IMMEDIATELY***" << llendl;
00837         }
00838         else
00839         {
00840                 if(xferp)
00841                 {
00842                         llinfos << "  queueing xfer request, " << numPendingXfers(xferp->mRemoteHost) << " ahead of this one" << llendl;
00843                 }
00844                 else
00845                 {
00846                         llwarns << "LLXferManager::processFileRequest() - no xfer found!"
00847                                         << llendl;
00848                 }
00849         }
00850 }
00851 
00853 
00854 void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** /*user_data*/)
00855 {
00856         U64 id = 0;
00857         S32 packetNum = 0;
00858 
00859         mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00860         mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetNum);
00861 
00862         LLXfer* xferp = findXfer(id, mSendList);
00863         if (xferp)
00864         {
00865 //              cout << "confirmed packet #" << packetNum << " ping: "<< xferp->ACKTimer.getElapsedTimeF32() <<  endl;
00866                 xferp->mWaitingForACK = FALSE;
00867                 if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00868                 {
00869                         xferp->sendNextPacket();
00870                 }
00871                 else
00872                 {
00873                         removeXfer(xferp, &mSendList);
00874                 }
00875         }
00876 }
00877 
00879 
00880 void LLXferManager::retransmitUnackedPackets ()
00881 {
00882         LLXfer *xferp;
00883         LLXfer *delp;
00884         xferp = mReceiveList;
00885         while(xferp)
00886         {
00887                 if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00888                 {
00889                         // if the circuit dies, abort
00890                         if (! gMessageSystem->mCircuitInfo.isCircuitAlive( xferp->mRemoteHost ))
00891                         {
00892                                 llinfos << "Xfer found in progress on dead circuit, aborting" << llendl;
00893                                 xferp->mCallbackResult = LL_ERR_CIRCUIT_GONE;
00894                                 xferp->processEOF();
00895                                 delp = xferp;
00896                                 xferp = xferp->mNext;
00897                                 removeXfer(delp,&mReceiveList);
00898                                 continue;
00899                         }
00900                                 
00901                 }
00902                 xferp = xferp->mNext;
00903         }
00904 
00905         xferp = mSendList; 
00906         updateHostStatus();
00907         F32 et;
00908         while (xferp)
00909         {
00910                 if (xferp->mWaitingForACK && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_PACKET_TIMEOUT))
00911                 {
00912                         if (xferp->mRetries > LL_PACKET_RETRY_LIMIT)
00913                         {
00914                                 llinfos << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet retransmit limit exceeded, xfer dropped" << llendl;
00915                                 xferp->abort(LL_ERR_TCP_TIMEOUT);
00916                                 delp = xferp;
00917                                 xferp = xferp->mNext;
00918                                 removeXfer(delp,&mSendList);
00919                         }
00920                         else
00921                         {
00922                                 llinfos << "resending xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet unconfirmed after: "<< et << " sec, packet " << xferp->mPacketNum << llendl;
00923                                 xferp->resendLastPacket();
00924                                 xferp = xferp->mNext;
00925                         }
00926                 }
00927                 else if ((xferp->mStatus == e_LL_XFER_REGISTERED) && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_XFER_REGISTRATION_TIMEOUT))
00928                 {
00929                         llinfos << "registered xfer never requested, xfer dropped" << llendl;
00930                         xferp->abort(LL_ERR_TCP_TIMEOUT);
00931                         delp = xferp;
00932                         xferp = xferp->mNext;
00933                         removeXfer(delp,&mSendList);
00934                 }
00935                 else if (xferp->mStatus == e_LL_XFER_ABORTED)
00936                 {
00937                         llwarns << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getName() << llendl;
00938                         delp = xferp;
00939                         xferp = xferp->mNext;
00940                         removeXfer(delp,&mSendList);
00941                 }
00942                 else if (xferp->mStatus == e_LL_XFER_PENDING)
00943                 {
00944 //                      llinfos << "*** numActiveXfers = " << numActiveXfers(xferp->mRemoteHost) << "        mMaxOutgoingXfersPerCircuit = " << mMaxOutgoingXfersPerCircuit << llendl;   
00945                         if (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit)
00946                         {
00947 //                          llinfos << "bumping pending xfer to active" << llendl;
00948                                 xferp->sendNextPacket();
00949                                 changeNumActiveXfers(xferp->mRemoteHost,1);
00950                         }                       
00951                         xferp = xferp->mNext;
00952                 }
00953                 else
00954                 {
00955                         xferp = xferp->mNext;
00956                 }
00957         }
00958 
00959         //
00960         // HACK - if we're using xfer confirm throttling, throttle our xfer confirms here
00961         // so we don't blow through bandwidth.
00962         //
00963 
00964         while (mXferAckQueue.getLength())
00965         {
00966                 if (mAckThrottle.checkOverflow(1000.0f*8.0f))
00967                 {
00968                         break;
00969                 }
00970                 //llinfos << "Confirm packet queue length:" << mXferAckQueue.getLength() << llendl;
00971                 LLXferAckInfo ack_info;
00972                 mXferAckQueue.pop(ack_info);
00973                 //llinfos << "Sending confirm packet" << llendl;
00974                 sendConfirmPacket(gMessageSystem, ack_info.mID, ack_info.mPacketNum, ack_info.mRemoteHost);
00975                 mAckThrottle.throttleOverflow(1000.f*8.f); // Assume 1000 bytes/packet
00976         }
00977 }
00978 
00979 
00981 
00982 void LLXferManager::processAbort (LLMessageSystem *mesgsys, void ** /*user_data*/)
00983 {
00984         U64 id = 0;
00985         S32 result_code = 0;
00986         LLXfer * xferp;
00987 
00988         mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00989         mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Result, result_code);
00990 
00991         xferp = findXfer(id, mReceiveList);
00992         if (xferp)
00993         {
00994                 xferp->mCallbackResult = result_code;
00995                 xferp->processEOF();
00996                 removeXfer(xferp, &mReceiveList);
00997                 startPendingDownloads();
00998         }
00999 }
01000 
01002 
01003 void LLXferManager::startPendingDownloads()
01004 {
01005         // This method goes through the list, and starts pending
01006         // operations until active downloads == mMaxIncomingXfers. I copy
01007         // the pending xfers into a temporary data structure because the
01008         // xfers are stored as an intrusive linked list where older
01009         // requests get pushed toward the back. Thus, if we didn't do a
01010         // stateful iteration, it would be possible for old requests to
01011         // never start.
01012         LLXfer* xferp = mReceiveList;
01013         LLLinkedList<LLXfer> pending_downloads;
01014         S32 download_count = 0;
01015         S32 pending_count = 0;
01016         while(xferp)
01017         {
01018                 if(xferp->mStatus == e_LL_XFER_PENDING)
01019                 {
01020                         ++pending_count; // getLength() is O(N), so track it here.
01021                         pending_downloads.addData(xferp);
01022                 }
01023                 else if(xferp->mStatus == e_LL_XFER_IN_PROGRESS)
01024                 {
01025                         ++download_count;
01026                 }
01027                 xferp = xferp->mNext;
01028         }
01029 
01030         S32 start_count = mMaxIncomingXfers - download_count;
01031 
01032         lldebugs << "LLXferManager::startPendingDownloads() - XFER_IN_PROGRESS: "
01033                          << download_count << " XFER_PENDING: " << pending_count
01034                          << " startring " << llmin(start_count, pending_count) << llendl;
01035 
01036         if((start_count > 0) && (pending_count > 0))
01037         {
01038                 S32 result;
01039                 xferp = pending_downloads.getFirstData();
01040                 while(start_count-- && xferp)
01041                 {
01042                         result = xferp->startDownload();
01043                         if(result)
01044                         {
01045                                 xferp->abort(result);
01046                                 ++start_count;
01047                         }
01048                         xferp = pending_downloads.getNextData();
01049                 }
01050         }
01051 }
01052 
01054 
01055 void LLXferManager::addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority)
01056 {
01057         if(is_priority)
01058         {
01059                 xferp->mNext = NULL;
01060                 LLXfer* next = head;
01061                 if(next)
01062                 {
01063                         while(next->mNext)
01064                         {
01065                                 next = next->mNext;
01066                         }
01067                         next->mNext = xferp;
01068                 }
01069                 else
01070                 {
01071                         head = xferp;
01072                 }
01073         }
01074         else
01075         {
01076                 xferp->mNext = head;
01077                 head = xferp;
01078         }
01079 }
01080 
01082 //  Globals and C routines
01084 
01085 LLXferManager *gXferManager = NULL;
01086 
01087 
01088 void start_xfer_manager(LLVFS *vfs)
01089 {
01090         gXferManager = new LLXferManager(vfs);
01091 }
01092 
01093 void cleanup_xfer_manager()
01094 {
01095         if (gXferManager)
01096         {
01097                 delete(gXferManager);
01098                 gXferManager = NULL;
01099         }
01100 }
01101 
01102 void process_confirm_packet (LLMessageSystem *mesgsys, void **user_data)
01103 {
01104         gXferManager->processConfirmation(mesgsys,user_data);
01105 }
01106 
01107 void process_request_xfer(LLMessageSystem *mesgsys, void **user_data)
01108 {
01109         gXferManager->processFileRequest(mesgsys,user_data);
01110 }
01111 
01112 void continue_file_receive(LLMessageSystem *mesgsys, void **user_data)
01113 {
01114 #if LL_TEST_XFER_REXMIT
01115         if (ll_frand() > 0.05f)
01116         {
01117 #endif
01118                 gXferManager->processReceiveData(mesgsys,user_data);
01119 #if LL_TEST_XFER_REXMIT
01120         }
01121         else
01122         {
01123                 cout << "oops! dropped a xfer packet" << endl;
01124         }
01125 #endif
01126 }
01127 
01128 void process_abort_xfer(LLMessageSystem *mesgsys, void **user_data)
01129 {
01130         gXferManager->processAbort(mesgsys,user_data);
01131 }
01132 
01133 
01134 
01135 
01136 
01137 
01138 
01139 
01140 
01141 
01142 
01143 
01144 
01145 
01146 
01147 
01148 
01149 
01150 
01151 
01152 
01153 
01154 
01155 
01156 
01157 

Generated on Thu Jul 1 06:09:48 2010 for Second Life Viewer by  doxygen 1.4.7