llxfermanager.cpp

Go to the documentation of this file.
00001 
00032 #include "linden_common.h"
00033 
00034 #include "llxfermanager.h"
00035 
00036 #include "llxfer.h"
00037 #include "llxfer_file.h"
00038 #include "llxfer_mem.h"
00039 #include "llxfer_vfile.h"
00040 
00041 #include "llerror.h"
00042 #include "lluuid.h"
00043 #include "u64.h"
00044 
00045 const F32 LL_XFER_REGISTRATION_TIMEOUT = 60.0f;  // seconds: timeout if a registered transfer hasn't been requested within this window
00046 const F32 LL_PACKET_TIMEOUT = 3.0f;             // seconds: per-packet timeout; also the divisor for the minimum ack throttle rate in setAckThrottleBPS()
00047 const S32 LL_PACKET_RETRY_LIMIT = 10;            // packet retransmission limit
00048 
00049 const S32 LL_DEFAULT_MAX_SIMULTANEOUS_XFERS = 10;   // init() passes this to setMaxOutgoingXfersPerCircuit()
00050 const S32 LL_DEFAULT_MAX_REQUEST_FIFO_XFERS = 1000; // init() passes this to setMaxIncomingXfers()
00051 
00052 #define LL_XFER_PROGRESS_MESSAGES 0 // non-zero compiles in the periodic progress print in sendConfirmPacket()
00053 #define LL_XFER_TEST_REXMIT       0 // NOTE(review): not referenced in the visible part of this file; presumably a retransmit test switch - confirm
00054 
00055 
00057 
00058 LLXferManager::LLXferManager (LLVFS *vfs)
00059 {
00060         init(vfs);
00061 }
00062 
00064 
00065 LLXferManager::~LLXferManager ()
00066 {
00067         free();
00068 }
00069 
00071 
00072 void LLXferManager::init (LLVFS *vfs)
00073 {
00074         mSendList = NULL;
00075         mReceiveList = NULL;
00076 
00077         setMaxOutgoingXfersPerCircuit(LL_DEFAULT_MAX_SIMULTANEOUS_XFERS);
00078         setMaxIncomingXfers(LL_DEFAULT_MAX_REQUEST_FIFO_XFERS);
00079 
00080         mVFS = vfs;
00081 
00082         // Turn on or off ack throttling
00083         mUseAckThrottling = FALSE;
00084         setAckThrottleBPS(100000);
00085 }
00086         
00088 
00089 void LLXferManager::free ()
00090 {
00091         LLXfer *xferp;
00092         LLXfer *delp;
00093 
00094         for_each(mOutgoingHosts.begin(), mOutgoingHosts.end(), DeletePointer());
00095         mOutgoingHosts.clear();
00096 
00097         delp = mSendList;
00098         while (delp)
00099         {
00100                 xferp = delp->mNext;
00101                 delete delp;
00102                 delp = xferp;
00103         }
00104         mSendList = NULL;
00105 
00106         delp = mReceiveList;
00107         while (delp)
00108         {
00109                 xferp = delp->mNext;
00110                 delete delp;
00111                 delp = xferp;
00112         }
00113         mReceiveList = NULL;
00114 }
00115 
00117 
00118 void LLXferManager::setMaxIncomingXfers(S32 max_num)
00119 {
00120         mMaxIncomingXfers = max_num;
00121 }
00122 
00124 
00125 void LLXferManager::setMaxOutgoingXfersPerCircuit(S32 max_num)
00126 {
00127         mMaxOutgoingXfersPerCircuit = max_num;
00128 }
00129 
00130 void LLXferManager::setUseAckThrottling(const BOOL use)
00131 {
00132         mUseAckThrottling = use;
00133 }
00134 
00135 void LLXferManager::setAckThrottleBPS(const F32 bps)
00136 {
00137         // Let's figure out the min we can set based on the ack retry rate
00138         // and number of simultaneous.
00139 
00140         // Assuming we're running as slow as possible, this is the lowest ack
00141         // rate we can use.
00142         F32 min_bps = (1000.f * 8.f* mMaxIncomingXfers) / LL_PACKET_TIMEOUT;
00143 
00144         // Set
00145         F32 actual_rate = llmax(min_bps*1.1f, bps);
00146         LL_DEBUGS("AppInit") << "LLXferManager ack throttle min rate: " << min_bps << LL_ENDL;
00147         LL_DEBUGS("AppInit") << "LLXferManager ack throttle actual rate: " << actual_rate << LL_ENDL;
00148         mAckThrottle.setRate(actual_rate);
00149 }
00150 
00151 
00153 
00154 void LLXferManager::updateHostStatus()
00155 {
00156     LLXfer *xferp;
00157         LLHostStatus *host_statusp = NULL;
00158 
00159         for_each(mOutgoingHosts.begin(), mOutgoingHosts.end(), DeletePointer());
00160         mOutgoingHosts.clear();
00161 
00162         for (xferp = mSendList; xferp; xferp = xferp->mNext)
00163         {
00164                 for (status_list_t::iterator iter = mOutgoingHosts.begin();
00165                          iter != mOutgoingHosts.end(); ++iter)
00166                 {
00167                         host_statusp = *iter;
00168                         if (host_statusp->mHost == xferp->mRemoteHost)
00169                         {
00170                                 break;
00171                         }
00172                 }
00173                 if (!host_statusp)
00174                 {
00175                         host_statusp = new LLHostStatus();
00176                         if (host_statusp)
00177                         {
00178                                 host_statusp->mHost = xferp->mRemoteHost;
00179                                 mOutgoingHosts.push_front(host_statusp);
00180                         }
00181                 }
00182                 if (host_statusp)
00183                 {
00184                         if (xferp->mStatus == e_LL_XFER_PENDING)
00185                         {
00186                                 host_statusp->mNumPending++;
00187                         }
00188                         else if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00189                         {
00190                                 host_statusp->mNumActive++;
00191                         }
00192                 }
00193                 
00194         }       
00195 }
00196 
00198 
00199 void LLXferManager::printHostStatus()
00200 {
00201         LLHostStatus *host_statusp = NULL;
00202         if (!mOutgoingHosts.empty())
00203         {
00204                 llinfos << "Outgoing Xfers:" << llendl;
00205 
00206                 for (status_list_t::iterator iter = mOutgoingHosts.begin();
00207                          iter != mOutgoingHosts.end(); ++iter)
00208                 {
00209                         host_statusp = *iter;
00210                         llinfos << "    " << host_statusp->mHost << "  active: " << host_statusp->mNumActive << "  pending: " << host_statusp->mNumPending << llendl;
00211                 }
00212         }       
00213 }
00214 
00216 
00217 LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head)
00218 {
00219     LLXfer *xferp;
00220         for (xferp = list_head; xferp; xferp = xferp->mNext)
00221         {
00222                 if (xferp->mID == id)
00223                 {
00224                         return(xferp);
00225                 }
00226         }
00227         return(NULL);
00228 }
00229 
00230 
00232 
00233 void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head)
00234 {
00235         // This function assumes that delp will only occur in the list
00236         // zero or one times.
00237         if (delp)
00238         {
00239                 if (*list_head == delp)
00240                 {
00241                         *list_head = delp->mNext;
00242                         delete (delp);
00243                 }
00244                 else
00245                 {
00246                         LLXfer *xferp = *list_head;
00247                         while (xferp->mNext)
00248                         {
00249                                 if (xferp->mNext == delp)
00250                                 {
00251                                         xferp->mNext = delp->mNext;
00252                                         delete (delp);
00253                                         break;
00254                                 }
00255                                 xferp = xferp->mNext;
00256                         }
00257                 }
00258         }
00259 }
00260 
00262 
00263 U32 LLXferManager::numActiveListEntries(LLXfer *list_head)
00264 {
00265         U32 num_entries = 0;
00266 
00267         while (list_head)
00268         {
00269                 if ((list_head->mStatus == e_LL_XFER_IN_PROGRESS)) 
00270                 {
00271                         num_entries++;
00272                 }
00273                 list_head = list_head->mNext;
00274         }
00275         return(num_entries);
00276 }
00277 
00279 
00280 S32 LLXferManager::numPendingXfers(const LLHost &host)
00281 {
00282         LLHostStatus *host_statusp = NULL;
00283 
00284         for (status_list_t::iterator iter = mOutgoingHosts.begin();
00285                  iter != mOutgoingHosts.end(); ++iter)
00286         {
00287                 host_statusp = *iter;
00288                 if (host_statusp->mHost == host)
00289                 {
00290                         return (host_statusp->mNumPending);
00291                 }
00292         }
00293         return 0;
00294 }
00295 
00297 
00298 S32 LLXferManager::numActiveXfers(const LLHost &host)
00299 {
00300         LLHostStatus *host_statusp = NULL;
00301 
00302         for (status_list_t::iterator iter = mOutgoingHosts.begin();
00303                  iter != mOutgoingHosts.end(); ++iter)
00304         {
00305                 host_statusp = *iter;
00306                 if (host_statusp->mHost == host)
00307                 {
00308                         return (host_statusp->mNumActive);
00309                 }
00310         }
00311         return 0;
00312 }
00313 
00315 
00316 void LLXferManager::changeNumActiveXfers(const LLHost &host, S32 delta)
00317 {
00318         LLHostStatus *host_statusp = NULL;
00319 
00320         for (status_list_t::iterator iter = mOutgoingHosts.begin();
00321                  iter != mOutgoingHosts.end(); ++iter)
00322         {
00323                 host_statusp = *iter;
00324                 if (host_statusp->mHost == host)
00325                 {
00326                         host_statusp->mNumActive += delta;
00327                 }
00328         }
00329 }
00330 
00332 
00333 void LLXferManager::registerCallbacks(LLMessageSystem *msgsystem)
00334 {
00335         msgsystem->setHandlerFuncFast(_PREHASH_ConfirmXferPacket,  process_confirm_packet, NULL);
00336         msgsystem->setHandlerFuncFast(_PREHASH_RequestXfer,        process_request_xfer,        NULL);
00337         msgsystem->setHandlerFuncFast(_PREHASH_SendXferPacket,          continue_file_receive,           NULL);
00338         msgsystem->setHandlerFuncFast(_PREHASH_AbortXfer,               process_abort_xfer,                  NULL);
00339 }
00340 
00342 
00343 U64 LLXferManager::getNextID ()
00344 {
00345         LLUUID a_guid;
00346 
00347         a_guid.generate();
00348 
00349         
00350         return(*((U64*)(a_guid.mData)));
00351 }
00352 
00354 
00355 S32 LLXferManager::encodePacketNum(S32 packet_num, BOOL is_EOF)
00356 {
00357         if (is_EOF)
00358         {
00359                 packet_num |= 0x80000000;
00360         }
00361         return packet_num;
00362 }
00363 
00365 
00366 S32 LLXferManager::decodePacketNum(S32 packet_num)
00367 {
00368         return(packet_num & 0x0FFFFFFF);
00369 }
00370 
00372 
00373 BOOL LLXferManager::isLastPacket(S32 packet_num)
00374 {
00375         return(packet_num & 0x80000000);
00376 }
00377 
00379 
00380 U64 LLXferManager::registerXfer(const void *datap, const S32 length)
00381 {
00382         LLXfer *xferp;
00383         U64 xfer_id = getNextID();
00384 
00385         xferp = (LLXfer *) new LLXfer_Mem();
00386         if (xferp)
00387         {
00388                 xferp->mNext = mSendList;
00389                 mSendList = xferp;
00390 
00391                 xfer_id = ((LLXfer_Mem *)xferp)->registerXfer(xfer_id, datap,length);
00392 
00393                 if (!xfer_id)
00394                 {
00395                         removeXfer(xferp,&mSendList);
00396                 }
00397         }
00398         else
00399         {
00400                 llerrs << "Xfer allocation error" << llendl;
00401                 xfer_id = 0;
00402         }       
00403 
00404     return(xfer_id);
00405 }
00406 
00408 
00409 void LLXferManager::requestFile(const char* local_filename,
00410                                                                 const char* remote_filename,
00411                                                                 ELLPath remote_path,
00412                                                                 const LLHost& remote_host,
00413                                                                 BOOL delete_remote_on_completion,
00414                                                                 void (*callback)(void**,S32,LLExtStat),
00415                                                                 void** user_data,
00416                                                                 BOOL is_priority,
00417                                                                 BOOL use_big_packets)
00418 {
00419         LLXfer *xferp;
00420 
00421         for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
00422         {
00423                 if (xferp->getXferTypeTag() == LLXfer::XFER_FILE
00424                         && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename))
00425                         && (((LLXfer_File*)xferp)->matchesRemoteFilename(remote_filename, remote_path))
00426                         && (remote_host == xferp->mRemoteHost)
00427                         && (callback == xferp->mCallback)
00428                         && (user_data == xferp->mCallbackDataHandle))
00429 
00430                 {
00431                         // cout << "requested a xfer already in progress" << endl;
00432                         return;
00433                 }
00434         }
00435 
00436         S32 chunk_size = use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1;
00437         xferp = (LLXfer *) new LLXfer_File(chunk_size);
00438         if (xferp)
00439         {
00440                 addToList(xferp, mReceiveList, is_priority);
00441 
00442                 // Remove any file by the same name that happens to be lying
00443                 // around.
00444                 // Note: according to AaronB, this is here to deal with locks on files that were
00445                 // in transit during a crash,
00446                 if(delete_remote_on_completion &&
00447                    (strstr(remote_filename,".tmp") == &remote_filename[strlen(remote_filename)-4]))             /* Flawfinder : ignore */
00448                 {
00449                         LLFile::remove(local_filename);
00450                 }
00451                 ((LLXfer_File *)xferp)->initializeRequest(
00452                         getNextID(),
00453                         local_filename,
00454                         remote_filename,
00455                         remote_path,
00456                         remote_host,
00457                         delete_remote_on_completion,
00458                         callback,user_data);
00459                 startPendingDownloads();
00460         }
00461         else
00462         {
00463                 llerrs << "Xfer allocation error" << llendl;
00464         }
00465 }
00466 
00467 void LLXferManager::requestFile(const char* remote_filename,
00468                                                                 ELLPath remote_path,
00469                                                                 const LLHost& remote_host,
00470                                                                 BOOL delete_remote_on_completion,
00471                                                                 void (*callback)(void*,S32,void**,S32,LLExtStat),
00472                                                                 void** user_data,
00473                                                                 BOOL is_priority)
00474 {
00475         LLXfer *xferp;
00476 
00477         xferp = (LLXfer *) new LLXfer_Mem();
00478         if (xferp)
00479         {
00480                 addToList(xferp, mReceiveList, is_priority);
00481                 ((LLXfer_Mem *)xferp)->initializeRequest(getNextID(),
00482                                                                                                  remote_filename, 
00483                                                                                                  remote_path,
00484                                                                                                  remote_host,
00485                                                                                                  delete_remote_on_completion,
00486                                                                                                  callback, user_data);
00487                 startPendingDownloads();
00488         }
00489         else
00490         {
00491                 llerrs << "Xfer allocation error" << llendl;
00492         }
00493 }
00494 
00495 void LLXferManager::requestVFile(const LLUUID& local_id,
00496                                                                  const LLUUID& remote_id,
00497                                                                  LLAssetType::EType type, LLVFS* vfs,
00498                                                                  const LLHost& remote_host,
00499                                                                  void (*callback)(void**,S32,LLExtStat),
00500                                                                  void** user_data,
00501                                                                  BOOL is_priority)
00502 {
00503         LLXfer *xferp;
00504 
00505         for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
00506         {
00507                 if (xferp->getXferTypeTag() == LLXfer::XFER_VFILE
00508                         && (((LLXfer_VFile*)xferp)->matchesLocalFile(local_id, type))
00509                         && (((LLXfer_VFile*)xferp)->matchesRemoteFile(remote_id, type))
00510                         && (remote_host == xferp->mRemoteHost)
00511                         && (callback == xferp->mCallback)
00512                         && (user_data == xferp->mCallbackDataHandle))
00513 
00514                 {
00515                         // cout << "requested a xfer already in progress" << endl;
00516                         return;
00517                 }
00518         }
00519 
00520         xferp = (LLXfer *) new LLXfer_VFile();
00521         if (xferp)
00522         {
00523                 addToList(xferp, mReceiveList, is_priority);
00524                 ((LLXfer_VFile *)xferp)->initializeRequest(getNextID(),
00525                         vfs,
00526                         local_id,
00527                         remote_id,
00528                         type,
00529                         remote_host,
00530                         callback,
00531                         user_data);
00532                 startPendingDownloads();
00533         }
00534         else
00535         {
00536                 llerrs << "Xfer allocation error" << llendl;
00537         }
00538 
00539 }
00540 
00541 /*
00542 void LLXferManager::requestXfer(
00543                                                                 const char *local_filename, 
00544                                                                 BOOL delete_remote_on_completion,
00545                                                                 U64 xfer_id, 
00546                                                                 const LLHost &remote_host, 
00547                                                                 void (*callback)(void **,S32),
00548                                                                 void **user_data)
00549 {
00550         LLXfer *xferp;
00551 
00552         for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
00553         {
00554                 if (xferp->getXferTypeTag() == LLXfer::XFER_FILE
00555                         && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename))
00556                         && (xfer_id == xferp->mID)
00557                         && (remote_host == xferp->mRemoteHost)
00558                         && (callback == xferp->mCallback)
00559                         && (user_data == xferp->mCallbackDataHandle))
00560 
00561                 {
00562                         // cout << "requested a xfer already in progress" << endl;
00563                         return;
00564                 }
00565         }
00566 
00567         xferp = (LLXfer *) new LLXfer_File();
00568         if (xferp)
00569         {
00570                 xferp->mNext = mReceiveList;
00571                 mReceiveList = xferp;
00572 
00573                 ((LLXfer_File *)xferp)->initializeRequest(xfer_id,local_filename,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data);
00574                 startPendingDownloads();
00575         }
00576         else
00577         {
00578                 llerrs << "Xfer allcoation error" << llendl;
00579         }
00580 }
00581 
00582 void LLXferManager::requestXfer(U64 xfer_id, const LLHost &remote_host, BOOL delete_remote_on_completion, void (*callback)(void *,S32,void **,S32),void **user_data)
00583 {
00584         LLXfer *xferp;
00585 
00586         xferp = (LLXfer *) new LLXfer_Mem();
00587         if (xferp)
00588         {
00589                 xferp->mNext = mReceiveList;
00590                 mReceiveList = xferp;
00591 
00592                 ((LLXfer_Mem *)xferp)->initializeRequest(xfer_id,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data);
00593                 startPendingDownloads();
00594         }
00595         else
00596         {
00597                 llerrs << "Xfer allcoation error" << llendl;
00598         }
00599 }
00600 */
00602 
00603 void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user_data*/)
00604 {
00605         // there's sometimes an extra 4 bytes added to an xfer payload
00606         const S32 BUF_SIZE = LL_XFER_LARGE_PAYLOAD + 4;
00607         char fdata_buf[LL_XFER_LARGE_PAYLOAD + 4];              /* Flawfinder : ignore */
00608         S32 fdata_size;
00609         U64 id;
00610         S32 packetnum;
00611         LLXfer * xferp;
00612         
00613         mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00614         mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetnum);
00615 
00616         fdata_size = mesgsys->getSizeFast(_PREHASH_DataPacket,_PREHASH_Data);
00617         mesgsys->getBinaryDataFast(_PREHASH_DataPacket, _PREHASH_Data, fdata_buf, 0, 0, BUF_SIZE);
00618 
00619         xferp = findXfer(id, mReceiveList);
00620 
00621         if (!xferp) 
00622         {
00623                 char U64_BUF[MAX_STRING];               /* Flawfinder : ignore */
00624                 llwarns << "received xfer data from " << mesgsys->getSender()
00625                         << " for non-existent xfer id: "
00626                         << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << llendl;
00627                 return;
00628         }
00629 
00630         S32 xfer_size;
00631 
00632         if (decodePacketNum(packetnum) != xferp->mPacketNum) // is the packet different from what we were expecting?
00633         {
00634                 // confirm it if it was a resend of the last one, since the confirmation might have gotten dropped
00635                 if (decodePacketNum(packetnum) == (xferp->mPacketNum - 1))
00636                 {
00637                         llinfos << "Reconfirming xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet " << packetnum << llendl;                  sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
00638                 }
00639                 else
00640                 {
00641                         llinfos << "Ignoring xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " recv'd packet " << packetnum << "; expecting " << xferp->mPacketNum << llendl;
00642                 }
00643                 return;         
00644         }
00645 
00646         S32 result = 0;
00647 
00648         if (xferp->mPacketNum == 0) // first packet has size encoded as additional S32 at beginning of data
00649         {
00650                 ntohmemcpy(&xfer_size,fdata_buf,MVT_S32,sizeof(S32));
00651                 
00652 // do any necessary things on first packet ie. allocate memory
00653                 xferp->setXferSize(xfer_size);
00654 
00655                 // adjust buffer start and size
00656                 result = xferp->receiveData(&(fdata_buf[sizeof(S32)]),fdata_size-(sizeof(S32)));
00657         }
00658         else
00659         {
00660                 result = xferp->receiveData(fdata_buf,fdata_size);
00661         }
00662         
00663         if (result == LL_ERR_CANNOT_OPEN_FILE)
00664         {
00665                         xferp->abort(LL_ERR_CANNOT_OPEN_FILE);
00666                         removeXfer(xferp,&mReceiveList);
00667                         startPendingDownloads();
00668                         return;         
00669         }
00670 
00671         xferp->mPacketNum++;  // expect next packet
00672 
00673         if (!mUseAckThrottling)
00674         {
00675                 // No throttling, confirm right away
00676                 sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
00677         }
00678         else
00679         {
00680                 // Throttling, put on queue to be confirmed later.
00681                 LLXferAckInfo ack_info;
00682                 ack_info.mID = id;
00683                 ack_info.mPacketNum = decodePacketNum(packetnum);
00684                 ack_info.mRemoteHost = mesgsys->getSender();
00685                 mXferAckQueue.push(ack_info);
00686         }
00687 
00688         if (isLastPacket(packetnum))
00689         {
00690                 xferp->processEOF();
00691                 removeXfer(xferp,&mReceiveList);
00692                 startPendingDownloads();
00693         }
00694 }
00695 
00697 
00698 void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 packetnum, const LLHost &remote_host)
00699 {
00700 #if LL_XFER_PROGRESS_MESSAGES
00701         if (!(packetnum % 50))
00702         {
00703                 cout << "confirming xfer packet #" << packetnum << endl;
00704         }
00705 #endif
00706         mesgsys->newMessageFast(_PREHASH_ConfirmXferPacket);
00707         mesgsys->nextBlockFast(_PREHASH_XferID);
00708         mesgsys->addU64Fast(_PREHASH_ID, id);
00709         mesgsys->addU32Fast(_PREHASH_Packet, packetnum);
00710 
00711         mesgsys->sendMessage(remote_host);
00712 }
00713 
00715 
00716 void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user_data*/)
00717 {
00718                 
00719         U64 id;
00720         char local_filename[MAX_STRING];                /* Flawfinder : ignore */
00721         ELLPath local_path = LL_PATH_NONE;
00722         S32 result = LL_ERR_NOERR;
00723         LLUUID  uuid;
00724         LLAssetType::EType type;
00725         S16 type_s16;
00726         BOOL b_use_big_packets;
00727 
00728         mesgsys->getBOOL("XferID", "UseBigPackets", b_use_big_packets);
00729         
00730         mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00731         char U64_BUF[MAX_STRING];               /* Flawfinder : ignore */
00732         llinfos << "xfer request id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF))
00733                    << " to " << mesgsys->getSender() << llendl;
00734 
00735         mesgsys->getStringFast(_PREHASH_XferID, _PREHASH_Filename, MAX_STRING, local_filename);
00736         
00737         U8 local_path_u8;
00738         mesgsys->getU8("XferID", "FilePath", local_path_u8);
00739         if( local_path_u8 < (U8)LL_PATH_COUNT )
00740         {
00741                 local_path = (ELLPath)local_path_u8;
00742         }
00743         else
00744         {
00745                 llwarns << "Invalid file path in LLXferManager::processFileRequest() " << (U32)local_path_u8 << llendl;
00746         }
00747 
00748         mesgsys->getUUIDFast(_PREHASH_XferID, _PREHASH_VFileID, uuid);
00749         mesgsys->getS16Fast(_PREHASH_XferID, _PREHASH_VFileType, type_s16);
00750         type = (LLAssetType::EType)type_s16;
00751 
00752         LLXfer *xferp;
00753 
00754         if (uuid != LLUUID::null)
00755         {
00756                 if(NULL == LLAssetType::lookup(type))
00757                 {
00758                         llwarns << "Invalid type for xfer request: " << uuid << ":"
00759                                         << type_s16 << " to " << mesgsys->getSender() << llendl;
00760                         return;
00761                 }
00762                         
00763                 llinfos << "starting vfile transfer: " << uuid << "," << LLAssetType::lookup(type) << " to " << mesgsys->getSender() << llendl;
00764 
00765                 if (! mVFS)
00766                 {
00767                         llwarns << "Attempt to send VFile w/o available VFS" << llendl;
00768                         return;
00769                 }
00770 
00771                 xferp = (LLXfer *)new LLXfer_VFile(mVFS, uuid, type);
00772                 if (xferp)
00773                 {
00774                         xferp->mNext = mSendList;
00775                         mSendList = xferp;      
00776                         result = xferp->startSend(id,mesgsys->getSender());
00777                 }
00778                 else
00779                 {
00780                         llerrs << "Xfer allcoation error" << llendl;
00781                 }
00782         }
00783         else if (strlen(local_filename))                /* Flawfinder : ignore */
00784         {
00785                 std::string expanded_filename = gDirUtilp->getExpandedFilename( local_path, local_filename );
00786                 llinfos << "starting file transfer: " <<  expanded_filename << " to " << mesgsys->getSender() << llendl;
00787 
00788                 BOOL delete_local_on_completion = FALSE;
00789                 mesgsys->getBOOL("XferID", "DeleteOnCompletion", delete_local_on_completion);
00790 
00791                 // -1 chunk_size causes it to use the default
00792                 xferp = (LLXfer *)new LLXfer_File(expanded_filename, delete_local_on_completion, b_use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1);
00793                 
00794                 if (xferp)
00795                 {
00796                         xferp->mNext = mSendList;
00797                         mSendList = xferp;      
00798                         result = xferp->startSend(id,mesgsys->getSender());
00799                 }
00800                 else
00801                 {
00802                         llerrs << "Xfer allcoation error" << llendl;
00803                 }
00804         }
00805         else
00806         {
00807                 char U64_BUF[MAX_STRING];               /* Flawfinder : ignore */
00808                 llinfos << "starting memory transfer: "
00809                         << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to "
00810                         << mesgsys->getSender() << llendl;
00811 
00812                 xferp = findXfer(id, mSendList);
00813                 
00814                 if (xferp)
00815                 {
00816                         result = xferp->startSend(id,mesgsys->getSender());
00817                 }
00818                 else
00819                 {
00820                         llinfos << "Warning: " << U64_BUF << " not found." << llendl;
00821                         result = LL_ERR_FILE_NOT_FOUND;
00822                 }
00823         }
00824 
00825         if (result)
00826         {
00827                 if (xferp)
00828                 {
00829                         xferp->abort(result);
00830                         removeXfer(xferp,&mSendList);
00831                 }
00832                 else // can happen with a memory transfer not found
00833                 {
00834                         llinfos << "Aborting xfer to " << mesgsys->getSender() << " with error: " << result << llendl;
00835 
00836                         mesgsys->newMessageFast(_PREHASH_AbortXfer);
00837                         mesgsys->nextBlockFast(_PREHASH_XferID);
00838                         mesgsys->addU64Fast(_PREHASH_ID, id);
00839                         mesgsys->addS32Fast(_PREHASH_Result, result);
00840         
00841                         mesgsys->sendMessage(mesgsys->getSender());             
00842                 }
00843         }
00844         else if(xferp && (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit))
00845         {
00846                 xferp->sendNextPacket();
00847                 changeNumActiveXfers(xferp->mRemoteHost,1);
00848 //              llinfos << "***STARTING XFER IMMEDIATELY***" << llendl;
00849         }
00850         else
00851         {
00852                 if(xferp)
00853                 {
00854                         llinfos << "  queueing xfer request, " << numPendingXfers(xferp->mRemoteHost) << " ahead of this one" << llendl;
00855                 }
00856                 else
00857                 {
00858                         llwarns << "LLXferManager::processFileRequest() - no xfer found!"
00859                                         << llendl;
00860                 }
00861         }
00862 }
00863 
00865 
00866 void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** /*user_data*/)
00867 {
00868         U64 id = 0;
00869         S32 packetNum = 0;
00870 
00871         mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00872         mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetNum);
00873 
00874         LLXfer* xferp = findXfer(id, mSendList);
00875         if (xferp)
00876         {
00877 //              cout << "confirmed packet #" << packetNum << " ping: "<< xferp->ACKTimer.getElapsedTimeF32() <<  endl;
00878                 xferp->mWaitingForACK = FALSE;
00879                 if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00880                 {
00881                         xferp->sendNextPacket();
00882                 }
00883                 else
00884                 {
00885                         removeXfer(xferp, &mSendList);
00886                 }
00887         }
00888 }
00889 
00891 
00892 void LLXferManager::retransmitUnackedPackets ()
00893 {
00894         LLXfer *xferp;
00895         LLXfer *delp;
00896         xferp = mReceiveList;
00897         while(xferp)
00898         {
00899                 if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00900                 {
00901                         // if the circuit dies, abort
00902                         if (! gMessageSystem->mCircuitInfo.isCircuitAlive( xferp->mRemoteHost ))
00903                         {
00904                                 llinfos << "Xfer found in progress on dead circuit, aborting" << llendl;
00905                                 xferp->mCallbackResult = LL_ERR_CIRCUIT_GONE;
00906                                 xferp->processEOF();
00907                                 delp = xferp;
00908                                 xferp = xferp->mNext;
00909                                 removeXfer(delp,&mReceiveList);
00910                                 continue;
00911                         }
00912                                 
00913                 }
00914                 xferp = xferp->mNext;
00915         }
00916 
00917         xferp = mSendList; 
00918         updateHostStatus();
00919         F32 et;
00920         while (xferp)
00921         {
00922                 if (xferp->mWaitingForACK && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_PACKET_TIMEOUT))
00923                 {
00924                         if (xferp->mRetries > LL_PACKET_RETRY_LIMIT)
00925                         {
00926                                 llinfos << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet retransmit limit exceeded, xfer dropped" << llendl;
00927                                 xferp->abort(LL_ERR_TCP_TIMEOUT);
00928                                 delp = xferp;
00929                                 xferp = xferp->mNext;
00930                                 removeXfer(delp,&mSendList);
00931                         }
00932                         else
00933                         {
00934                                 llinfos << "resending xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet unconfirmed after: "<< et << " sec, packet " << xferp->mPacketNum << llendl;
00935                                 xferp->resendLastPacket();
00936                                 xferp = xferp->mNext;
00937                         }
00938                 }
00939                 else if ((xferp->mStatus == e_LL_XFER_REGISTERED) && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_XFER_REGISTRATION_TIMEOUT))
00940                 {
00941                         llinfos << "registered xfer never requested, xfer dropped" << llendl;
00942                         xferp->abort(LL_ERR_TCP_TIMEOUT);
00943                         delp = xferp;
00944                         xferp = xferp->mNext;
00945                         removeXfer(delp,&mSendList);
00946                 }
00947                 else if (xferp->mStatus == e_LL_XFER_ABORTED)
00948                 {
00949                         llwarns << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getName() << llendl;
00950                         delp = xferp;
00951                         xferp = xferp->mNext;
00952                         removeXfer(delp,&mSendList);
00953                 }
00954                 else if (xferp->mStatus == e_LL_XFER_PENDING)
00955                 {
00956 //                      llinfos << "*** numActiveXfers = " << numActiveXfers(xferp->mRemoteHost) << "        mMaxOutgoingXfersPerCircuit = " << mMaxOutgoingXfersPerCircuit << llendl;   
00957                         if (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit)
00958                         {
00959 //                          llinfos << "bumping pending xfer to active" << llendl;
00960                                 xferp->sendNextPacket();
00961                                 changeNumActiveXfers(xferp->mRemoteHost,1);
00962                         }                       
00963                         xferp = xferp->mNext;
00964                 }
00965                 else
00966                 {
00967                         xferp = xferp->mNext;
00968                 }
00969         }
00970 
00971         //
00972         // HACK - if we're using xfer confirm throttling, throttle our xfer confirms here
00973         // so we don't blow through bandwidth.
00974         //
00975 
00976         while (mXferAckQueue.getLength())
00977         {
00978                 if (mAckThrottle.checkOverflow(1000.0f*8.0f))
00979                 {
00980                         break;
00981                 }
00982                 //llinfos << "Confirm packet queue length:" << mXferAckQueue.getLength() << llendl;
00983                 LLXferAckInfo ack_info;
00984                 mXferAckQueue.pop(ack_info);
00985                 //llinfos << "Sending confirm packet" << llendl;
00986                 sendConfirmPacket(gMessageSystem, ack_info.mID, ack_info.mPacketNum, ack_info.mRemoteHost);
00987                 mAckThrottle.throttleOverflow(1000.f*8.f); // Assume 1000 bytes/packet
00988         }
00989 }
00990 
00991 
00993 
00994 void LLXferManager::processAbort (LLMessageSystem *mesgsys, void ** /*user_data*/)
00995 {
00996         U64 id = 0;
00997         S32 result_code = 0;
00998         LLXfer * xferp;
00999 
01000         mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
01001         mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Result, result_code);
01002 
01003         xferp = findXfer(id, mReceiveList);
01004         if (xferp)
01005         {
01006                 xferp->mCallbackResult = result_code;
01007                 xferp->processEOF();
01008                 removeXfer(xferp, &mReceiveList);
01009                 startPendingDownloads();
01010         }
01011 }
01012 
01014 
01015 void LLXferManager::startPendingDownloads()
01016 {
01017         // This method goes through the list, and starts pending
01018         // operations until active downloads == mMaxIncomingXfers. I copy
01019         // the pending xfers into a temporary data structure because the
01020         // xfers are stored as an intrusive linked list where older
01021         // requests get pushed toward the back. Thus, if we didn't do a
01022         // stateful iteration, it would be possible for old requests to
01023         // never start.
01024         LLXfer* xferp = mReceiveList;
01025         std::list<LLXfer*> pending_downloads;
01026         S32 download_count = 0;
01027         S32 pending_count = 0;
01028         while(xferp)
01029         {
01030                 if(xferp->mStatus == e_LL_XFER_PENDING)
01031                 {
01032                         ++pending_count;
01033                         pending_downloads.push_front(xferp);
01034                 }
01035                 else if(xferp->mStatus == e_LL_XFER_IN_PROGRESS)
01036                 {
01037                         ++download_count;
01038                 }
01039                 xferp = xferp->mNext;
01040         }
01041 
01042         S32 start_count = mMaxIncomingXfers - download_count;
01043 
01044         lldebugs << "LLXferManager::startPendingDownloads() - XFER_IN_PROGRESS: "
01045                          << download_count << " XFER_PENDING: " << pending_count
01046                          << " startring " << llmin(start_count, pending_count) << llendl;
01047 
01048         if((start_count > 0) && (pending_count > 0))
01049         {
01050                 S32 result;
01051                 for (std::list<LLXfer*>::iterator iter = pending_downloads.begin();
01052                          iter != pending_downloads.end(); ++iter)
01053                 {
01054                         xferp = *iter;
01055                         if (start_count-- <= 0)
01056                                 break;
01057                         result = xferp->startDownload();
01058                         if(result)
01059                         {
01060                                 xferp->abort(result);
01061                                 ++start_count;
01062                         }
01063                 }
01064         }
01065 }
01066 
01068 
01069 void LLXferManager::addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority)
01070 {
01071         if(is_priority)
01072         {
01073                 xferp->mNext = NULL;
01074                 LLXfer* next = head;
01075                 if(next)
01076                 {
01077                         while(next->mNext)
01078                         {
01079                                 next = next->mNext;
01080                         }
01081                         next->mNext = xferp;
01082                 }
01083                 else
01084                 {
01085                         head = xferp;
01086                 }
01087         }
01088         else
01089         {
01090                 xferp->mNext = head;
01091                 head = xferp;
01092         }
01093 }
01094 
01096 //  Globals and C routines
01098 
01099 LLXferManager *gXferManager = NULL;
01100 
01101 
01102 void start_xfer_manager(LLVFS *vfs)
01103 {
01104         gXferManager = new LLXferManager(vfs);
01105 }
01106 
01107 void cleanup_xfer_manager()
01108 {
01109         if (gXferManager)
01110         {
01111                 delete(gXferManager);
01112                 gXferManager = NULL;
01113         }
01114 }
01115 
01116 void process_confirm_packet (LLMessageSystem *mesgsys, void **user_data)
01117 {
01118         gXferManager->processConfirmation(mesgsys,user_data);
01119 }
01120 
01121 void process_request_xfer(LLMessageSystem *mesgsys, void **user_data)
01122 {
01123         gXferManager->processFileRequest(mesgsys,user_data);
01124 }
01125 
01126 void continue_file_receive(LLMessageSystem *mesgsys, void **user_data)
01127 {
01128 #if LL_TEST_XFER_REXMIT
01129         if (ll_frand() > 0.05f)
01130         {
01131 #endif
01132                 gXferManager->processReceiveData(mesgsys,user_data);
01133 #if LL_TEST_XFER_REXMIT
01134         }
01135         else
01136         {
01137                 cout << "oops! dropped a xfer packet" << endl;
01138         }
01139 #endif
01140 }
01141 
01142 void process_abort_xfer(LLMessageSystem *mesgsys, void **user_data)
01143 {
01144         gXferManager->processAbort(mesgsys,user_data);
01145 }
01146 
01147 
01148 
01149 
01150 
01151 
01152 
01153 
01154 
01155 
01156 
01157 
01158 
01159 
01160 
01161 
01162 
01163 
01164 
01165 
01166 
01167 
01168 
01169 
01170 
01171 

Generated on Fri May 16 08:32:32 2008 for SecondLife by  doxygen 1.5.5