00001
00033 #include "linden_common.h"
00034
00035 #include "lltransfermanager.h"
00036
00037 #include "llerror.h"
00038 #include "message.h"
00039 #include "lldatapacker.h"
00040
00041 #include "lltransfersourcefile.h"
00042 #include "lltransfersourceasset.h"
00043 #include "lltransfertargetfile.h"
00044 #include "lltransfertargetvfile.h"
00045
00046 const S32 MAX_PACKET_DATA_SIZE = 2048;
00047 const S32 MAX_PARAMS_SIZE = 1024;
00048
00049 LLTransferManager gTransferManager;
00050 LLTransferSource::stype_scfunc_map LLTransferSource::sSourceCreateMap;
00051
00052
00053
00054
00055
00056 LLTransferManager::LLTransferManager() :
00057 mValid(FALSE)
00058 {
00059 S32 i;
00060 for (i = 0; i < LLTTT_NUM_TYPES; i++)
00061 {
00062 mTransferBitsIn[i] = 0;
00063 mTransferBitsOut[i] = 0;
00064 }
00065 }
00066
00067
00068 LLTransferManager::~LLTransferManager()
00069 {
00070 if (mValid)
00071 {
00072 llwarns << "LLTransferManager::~LLTransferManager - Should have been cleaned up by message system shutdown process" << llendl;
00073 cleanup();
00074 }
00075 }
00076
00077
00078 void LLTransferManager::init()
00079 {
00080 if (mValid)
00081 {
00082 llerrs << "Double initializing LLTransferManager!" << llendl;
00083 }
00084 mValid = TRUE;
00085
00086
00087 gMessageSystem->setHandlerFunc("TransferRequest", processTransferRequest, NULL);
00088 gMessageSystem->setHandlerFunc("TransferInfo", processTransferInfo, NULL);
00089 gMessageSystem->setHandlerFunc("TransferPacket", processTransferPacket, NULL);
00090 gMessageSystem->setHandlerFunc("TransferAbort", processTransferAbort, NULL);
00091 }
00092
00093
00094 void LLTransferManager::cleanup()
00095 {
00096 mValid = FALSE;
00097
00098 host_tc_map::iterator iter;
00099 for (iter = mTransferConnections.begin(); iter != mTransferConnections.end(); iter++)
00100 {
00101 delete iter->second;
00102 }
00103 mTransferConnections.clear();
00104 }
00105
00106
00107 void LLTransferManager::updateTransfers()
00108 {
00109 host_tc_map::iterator iter,cur;
00110
00111 iter = mTransferConnections.begin();
00112
00113 while (iter !=mTransferConnections.end())
00114 {
00115 cur = iter;
00116 iter++;
00117 cur->second->updateTransfers();
00118 }
00119 }
00120
00121
00122 void LLTransferManager::cleanupConnection(const LLHost &host)
00123 {
00124 host_tc_map::iterator iter;
00125 iter = mTransferConnections.find(host);
00126 if (iter == mTransferConnections.end())
00127 {
00128
00129
00130
00131 return;
00132 }
00133 LLTransferConnection *connp = iter->second;
00134 delete connp;
00135 mTransferConnections.erase(iter);
00136 }
00137
00138
00139 LLTransferConnection *LLTransferManager::getTransferConnection(const LLHost &host)
00140 {
00141 host_tc_map::iterator iter;
00142 iter = mTransferConnections.find(host);
00143 if (iter == mTransferConnections.end())
00144 {
00145 mTransferConnections[host] = new LLTransferConnection(host);
00146 return mTransferConnections[host];
00147 }
00148
00149 return iter->second;
00150 }
00151
00152
00153 LLTransferSourceChannel *LLTransferManager::getSourceChannel(const LLHost &host, const LLTransferChannelType type)
00154 {
00155 LLTransferConnection *tcp = getTransferConnection(host);
00156 if (!tcp)
00157 {
00158 return NULL;
00159 }
00160 return tcp->getSourceChannel(type);
00161 }
00162
00163
00164
00165 LLTransferTargetChannel *LLTransferManager::getTargetChannel(const LLHost &host, const LLTransferChannelType type)
00166 {
00167 LLTransferConnection *tcp = getTransferConnection(host);
00168 if (!tcp)
00169 {
00170 return NULL;
00171 }
00172 return tcp->getTargetChannel(type);
00173 }
00174
00175
00176 LLTransferSourceParams::~LLTransferSourceParams()
00177 { }
00178
00179
00180 LLTransferSource *LLTransferManager::findTransferSource(const LLUUID &transfer_id)
00181 {
00182
00183
00184
00185 host_tc_map::iterator iter;
00186 for (iter = mTransferConnections.begin(); iter != mTransferConnections.end(); iter++)
00187 {
00188 LLTransferConnection *tcp = iter->second;
00189 LLTransferConnection::tsc_iter sc_iter;
00190 for (sc_iter = tcp->mTransferSourceChannels.begin(); sc_iter != tcp->mTransferSourceChannels.end(); sc_iter++)
00191 {
00192 LLTransferSourceChannel *scp = *sc_iter;
00193 LLTransferSource *sourcep = scp->findTransferSource(transfer_id);
00194 if (sourcep)
00195 {
00196 return sourcep;
00197 }
00198 }
00199 }
00200
00201 return NULL;
00202 }
00203
00204
00205
00206
00207
00208
00209 void LLTransferManager::processTransferRequest(LLMessageSystem *msgp, void **)
00210 {
00211
00212
00213 LLUUID transfer_id;
00214 LLTransferSourceType source_type;
00215 LLTransferChannelType channel_type;
00216 F32 priority;
00217
00218 msgp->getUUID("TransferInfo", "TransferID", transfer_id);
00219 msgp->getS32("TransferInfo", "SourceType", (S32 &)source_type);
00220 msgp->getS32("TransferInfo", "ChannelType", (S32 &)channel_type);
00221 msgp->getF32("TransferInfo", "Priority", priority);
00222
00223 LLTransferSourceChannel *tscp = gTransferManager.getSourceChannel(msgp->getSender(), channel_type);
00224
00225 if (!tscp)
00226 {
00227 llwarns << "Source channel not found" << llendl;
00228 return;
00229 }
00230
00231 if (tscp->findTransferSource(transfer_id))
00232 {
00233 llwarns << "Duplicate request for transfer " << transfer_id << ", aborting!" << llendl;
00234 return;
00235 }
00236
00237 S32 size = msgp->getSize("TransferInfo", "Params");
00238 if(size > MAX_PARAMS_SIZE)
00239 {
00240 llwarns << "LLTransferManager::processTransferRequest params too big."
00241 << llendl;
00242 return;
00243 }
00244
00245
00246 LLTransferSource* tsp = LLTransferSource::createSource(
00247 source_type,
00248 transfer_id,
00249 priority);
00250 if(!tsp)
00251 {
00252 llwarns << "LLTransferManager::processTransferRequest couldn't create"
00253 << " transfer source!" << llendl;
00254 return;
00255 }
00256 U8 tmp[MAX_PARAMS_SIZE];
00257 msgp->getBinaryData("TransferInfo", "Params", tmp, size);
00258
00259 LLDataPackerBinaryBuffer dpb(tmp, MAX_PARAMS_SIZE);
00260 BOOL unpack_ok = tsp->unpackParams(dpb);
00261 if (!unpack_ok)
00262 {
00263
00264
00265
00266 llwarns << "LLTransferManager::processTransferRequest: bad parameters."
00267 << llendl;
00268 delete tsp;
00269 return;
00270 }
00271
00272 tscp->addTransferSource(tsp);
00273 tsp->initTransfer();
00274 }
00275
00276
00277
00278 void LLTransferManager::processTransferInfo(LLMessageSystem *msgp, void **)
00279 {
00280
00281
00282 LLUUID transfer_id;
00283 LLTransferTargetType target_type;
00284 LLTransferChannelType channel_type;
00285 LLTSCode status;
00286 S32 size;
00287
00288 msgp->getUUID("TransferInfo", "TransferID", transfer_id);
00289 msgp->getS32("TransferInfo", "TargetType", (S32 &)target_type);
00290 msgp->getS32("TransferInfo", "ChannelType", (S32 &)channel_type);
00291 msgp->getS32("TransferInfo", "Status", (S32 &)status);
00292 msgp->getS32("TransferInfo", "Size", size);
00293
00294
00295 LLTransferTargetChannel *ttcp = gTransferManager.getTargetChannel(msgp->getSender(), channel_type);
00296 if (!ttcp)
00297 {
00298 llwarns << "Target channel not found" << llendl;
00299
00300 return;
00301 }
00302
00303 LLTransferTarget *ttp = ttcp->findTransferTarget(transfer_id);
00304 if (!ttp)
00305 {
00306 llwarns << "TransferInfo for unknown transfer! Not able to handle this yet!" << llendl;
00307
00308
00309 return;
00310 }
00311
00312 if (status != LLTS_OK)
00313 {
00314 llwarns << transfer_id << ": Non-ok status, cleaning up" << llendl;
00315 ttp->completionCallback(status);
00316
00317 ttcp->deleteTransfer(ttp);
00318 return;
00319 }
00320
00321
00322 S32 params_size = msgp->getSize("TransferInfo", "Params");
00323 if(params_size > MAX_PARAMS_SIZE)
00324 {
00325 llwarns << "LLTransferManager::processTransferInfo params too big."
00326 << llendl;
00327 return;
00328 }
00329 else if(params_size > 0)
00330 {
00331 U8 tmp[MAX_PARAMS_SIZE];
00332 msgp->getBinaryData("TransferInfo", "Params", tmp, params_size);
00333 LLDataPackerBinaryBuffer dpb(tmp, MAX_PARAMS_SIZE);
00334 if (!ttp->unpackParams(dpb))
00335 {
00336
00337
00338 llwarns << "LLTransferManager::processTransferRequest: bad params."
00339 << llendl;
00340 ttp->abortTransfer();
00341 ttcp->deleteTransfer(ttp);
00342 return;
00343 }
00344 }
00345
00346 llinfos << "Receiving " << transfer_id << ", size " << size << " bytes" << llendl;
00347 ttp->setSize(size);
00348 ttp->setGotInfo(TRUE);
00349
00350
00351
00352
00353
00354
00355
00356 while (1)
00357 {
00358 S32 packet_id = 0;
00359 U8 tmp_data[MAX_PACKET_DATA_SIZE];
00360
00361 packet_id = ttp->getNextPacketID();
00362 if (ttp->mDelayedPacketMap.find(packet_id) != ttp->mDelayedPacketMap.end())
00363 {
00364
00365
00366
00367 LLTransferPacket *packetp = ttp->mDelayedPacketMap[packet_id];
00368
00369
00370
00371 packet_id = packetp->mPacketID;
00372 size = packetp->mSize;
00373 if (size)
00374 {
00375 if ((packetp->mDatap != NULL) && (size<(S32)sizeof(tmp_data)))
00376 {
00377 memcpy(tmp_data, packetp->mDatap, size);
00378 }
00379 }
00380 status = packetp->mStatus;
00381 ttp->mDelayedPacketMap.erase(packet_id);
00382 delete packetp;
00383 }
00384 else
00385 {
00386
00387 break;
00388 }
00389
00390 LLTSCode ret_code = ttp->dataCallback(packet_id, tmp_data, size);
00391 if (ret_code == LLTS_OK)
00392 {
00393 ttp->setLastPacketID(packet_id);
00394 }
00395
00396 if (status != LLTS_OK)
00397 {
00398 if (status != LLTS_DONE)
00399 {
00400 llwarns << "LLTransferManager::processTransferInfo Error in playback!" << llendl;
00401 }
00402 else
00403 {
00404 llinfos << "LLTransferManager::processTransferInfo replay FINISHED for " << transfer_id << llendl;
00405 }
00406
00407 ttp->completionCallback(status);
00408 ttcp->deleteTransfer(ttp);
00409 return;
00410 }
00411 }
00412 }
00413
00414
00415
00416 void LLTransferManager::processTransferPacket(LLMessageSystem *msgp, void **)
00417 {
00418
00419
00420 LLUUID transfer_id;
00421 LLTransferChannelType channel_type;
00422 S32 packet_id;
00423 LLTSCode status;
00424 S32 size;
00425 msgp->getUUID("TransferData", "TransferID", transfer_id);
00426 msgp->getS32("TransferData", "ChannelType", (S32 &)channel_type);
00427 msgp->getS32("TransferData", "Packet", packet_id);
00428 msgp->getS32("TransferData", "Status", (S32 &)status);
00429
00430
00431
00432 LLTransferTargetChannel *ttcp = gTransferManager.getTargetChannel(msgp->getSender(), channel_type);
00433 if (!ttcp)
00434 {
00435 llwarns << "Target channel not found" << llendl;
00436 return;
00437 }
00438
00439 LLTransferTarget *ttp = ttcp->findTransferTarget(transfer_id);
00440 if (!ttp)
00441 {
00442 llwarns << "Didn't find matching transfer for " << transfer_id
00443 << " processing packet " << packet_id
00444 << " from " << msgp->getSender() << llendl;
00445 return;
00446 }
00447
00448 size = msgp->getSize("TransferData", "Data");
00449
00450 S32 msg_bytes = 0;
00451 if (msgp->getReceiveCompressedSize())
00452 {
00453 msg_bytes = msgp->getReceiveCompressedSize();
00454 }
00455 else
00456 {
00457 msg_bytes = msgp->getReceiveSize();
00458 }
00459 gTransferManager.addTransferBitsIn(ttcp->mChannelType, msg_bytes*8);
00460
00461 if ((size < 0) || (size > MAX_PACKET_DATA_SIZE))
00462 {
00463 llwarns << "Invalid transfer packet size " << size << llendl;
00464 return;
00465 }
00466
00467 U8 tmp_data[MAX_PACKET_DATA_SIZE];
00468 if (size > 0)
00469 {
00470
00471 msgp->getBinaryData("TransferData", "Data", tmp_data, size);
00472 }
00473
00474 if ((!ttp->gotInfo()) || (ttp->getNextPacketID() != packet_id))
00475 {
00476
00477 if(!ttp->addDelayedPacket(packet_id, status, tmp_data, size))
00478 {
00479
00480 llwarns << "Too many delayed packets processing transfer "
00481 << transfer_id << " from " << msgp->getSender() << llendl;
00482 ttp->abortTransfer();
00483 ttcp->deleteTransfer(ttp);
00484 return;
00485 }
00486 #if 0
00487
00488 const S32 LL_TRANSFER_WARN_GAP = 10;
00489 if(!ttp->gotInfo())
00490 {
00491 llwarns << "Got data packet before information in transfer "
00492 << transfer_id << " from " << msgp->getSender()
00493 << ", got " << packet_id << llendl;
00494 }
00495 else if((packet_id - ttp->getNextPacketID()) > LL_TRANSFER_WARN_GAP)
00496 {
00497 llwarns << "Out of order packet in transfer " << transfer_id
00498 << " from " << msgp->getSender() << ", got " << packet_id
00499 << " expecting " << ttp->getNextPacketID() << llendl;
00500 }
00501 #endif
00502 return;
00503 }
00504
00505
00506
00507
00508
00509
00510
00511 BOOL done = FALSE;
00512 while (!done)
00513 {
00514 LLTSCode ret_code = ttp->dataCallback(packet_id, tmp_data, size);
00515 if (ret_code == LLTS_OK)
00516 {
00517 ttp->setLastPacketID(packet_id);
00518 }
00519
00520 if (status != LLTS_OK)
00521 {
00522 if (status != LLTS_DONE)
00523 {
00524 llwarns << "LLTransferManager::processTransferPacket Error in transfer!" << llendl;
00525 }
00526 else
00527 {
00528
00529 }
00530
00531 ttp->completionCallback(status);
00532 ttcp->deleteTransfer(ttp);
00533 return;
00534 }
00535
00536
00537 packet_id = ttp->getNextPacketID();
00538 if (ttp->mDelayedPacketMap.find(packet_id) != ttp->mDelayedPacketMap.end())
00539 {
00540
00541
00542
00543 LLTransferPacket *packetp = ttp->mDelayedPacketMap[packet_id];
00544
00545
00546
00547 packet_id = packetp->mPacketID;
00548 size = packetp->mSize;
00549 if (size)
00550 {
00551 if ((packetp->mDatap != NULL) && (size<(S32)sizeof(tmp_data)))
00552 {
00553 memcpy(tmp_data, packetp->mDatap, size);
00554 }
00555 }
00556 status = packetp->mStatus;
00557 ttp->mDelayedPacketMap.erase(packet_id);
00558 delete packetp;
00559 }
00560 else
00561 {
00562
00563 done = TRUE;
00564 }
00565 }
00566 }
00567
00568
00569
00570 void LLTransferManager::processTransferAbort(LLMessageSystem *msgp, void **)
00571 {
00572
00573
00574 LLUUID transfer_id;
00575 LLTransferChannelType channel_type;
00576 msgp->getUUID("TransferInfo", "TransferID", transfer_id);
00577 msgp->getS32("TransferInfo", "ChannelType", (S32 &)channel_type);
00578
00579
00580
00581 LLTransferTargetChannel *ttcp = gTransferManager.getTargetChannel(msgp->getSender(), channel_type);
00582 if (ttcp)
00583 {
00584 LLTransferTarget *ttp = ttcp->findTransferTarget(transfer_id);
00585 if (ttp)
00586 {
00587 ttp->abortTransfer();
00588 ttcp->deleteTransfer(ttp);
00589 return;
00590 }
00591 }
00592
00593
00594 LLTransferSourceChannel *tscp = gTransferManager.getSourceChannel(msgp->getSender(), channel_type);
00595 if (tscp)
00596 {
00597 LLTransferSource *tsp = tscp->findTransferSource(transfer_id);
00598 if (tsp)
00599 {
00600 tsp->abortTransfer();
00601 tscp->deleteTransfer(tsp);
00602 return;
00603 }
00604 }
00605
00606 llwarns << "Couldn't find transfer " << transfer_id << " to abort!" << llendl;
00607 }
00608
00609
00610
00611 void LLTransferManager::reliablePacketCallback(void **user_data, S32 result)
00612 {
00613 LLUUID *transfer_idp = (LLUUID *)user_data;
00614 if (result)
00615 {
00616 llwarns << "Aborting reliable transfer " << *transfer_idp << " due to failed reliable resends!" << llendl;
00617 LLTransferSource *tsp = gTransferManager.findTransferSource(*transfer_idp);
00618 if (tsp)
00619 {
00620 LLTransferSourceChannel *tscp = tsp->mChannelp;
00621 tsp->abortTransfer();
00622 tscp->deleteTransfer(tsp);
00623 }
00624 }
00625 delete transfer_idp;
00626 }
00627
00628
00629
00630
00631
00632 LLTransferConnection::LLTransferConnection(const LLHost &host)
00633 {
00634 mHost = host;
00635 }
00636
00637 LLTransferConnection::~LLTransferConnection()
00638 {
00639 tsc_iter itersc;
00640 for (itersc = mTransferSourceChannels.begin(); itersc != mTransferSourceChannels.end(); itersc++)
00641 {
00642 delete *itersc;
00643 }
00644 mTransferSourceChannels.clear();
00645
00646 ttc_iter itertc;
00647 for (itertc = mTransferTargetChannels.begin(); itertc != mTransferTargetChannels.end(); itertc++)
00648 {
00649 delete *itertc;
00650 }
00651 mTransferTargetChannels.clear();
00652 }
00653
00654
00655 void LLTransferConnection::updateTransfers()
00656 {
00657
00658 tsc_iter iter, cur;
00659 iter = mTransferSourceChannels.begin();
00660
00661 while (iter !=mTransferSourceChannels.end())
00662 {
00663 cur = iter;
00664 iter++;
00665 (*cur)->updateTransfers();
00666 }
00667
00668
00669
00670
00671
00672
00673 }
00674
00675
00676 LLTransferSourceChannel *LLTransferConnection::getSourceChannel(const LLTransferChannelType channel_type)
00677 {
00678 tsc_iter iter;
00679 for (iter = mTransferSourceChannels.begin(); iter != mTransferSourceChannels.end(); iter++)
00680 {
00681 if ((*iter)->getChannelType() == channel_type)
00682 {
00683 return *iter;
00684 }
00685 }
00686
00687 LLTransferSourceChannel *tscp = new LLTransferSourceChannel(channel_type, mHost);
00688 mTransferSourceChannels.push_back(tscp);
00689 return tscp;
00690 }
00691
00692
00693 LLTransferTargetChannel *LLTransferConnection::getTargetChannel(const LLTransferChannelType channel_type)
00694 {
00695 ttc_iter iter;
00696 for (iter = mTransferTargetChannels.begin(); iter != mTransferTargetChannels.end(); iter++)
00697 {
00698 if ((*iter)->getChannelType() == channel_type)
00699 {
00700 return *iter;
00701 }
00702 }
00703
00704 LLTransferTargetChannel *ttcp = new LLTransferTargetChannel(channel_type, mHost);
00705 mTransferTargetChannels.push_back(ttcp);
00706 return ttcp;
00707 }
00708
00709
00710
00711
00712
00713
00714 const S32 DEFAULT_PACKET_SIZE = 1000;
00715
00716
00717 LLTransferSourceChannel::LLTransferSourceChannel(const LLTransferChannelType channel_type, const LLHost &host) :
00718 mChannelType(channel_type),
00719 mHost(host),
00720 mTransferSources(LLTransferSource::sSetPriority, LLTransferSource::sGetPriority),
00721 mThrottleID(TC_ASSET)
00722 {
00723 }
00724
00725
00726 LLTransferSourceChannel::~LLTransferSourceChannel()
00727 {
00728 LLPriQueueMap<LLTransferSource*>::pqm_iter iter =
00729 mTransferSources.mMap.begin();
00730 LLPriQueueMap<LLTransferSource*>::pqm_iter end =
00731 mTransferSources.mMap.end();
00732 for (; iter != end; ++iter)
00733 {
00734
00735 (*iter).second->abortTransfer();
00736 delete iter->second;
00737 }
00738 mTransferSources.mMap.clear();
00739 }
00740
00741 void LLTransferSourceChannel::updatePriority(LLTransferSource *tsp, const F32 priority)
00742 {
00743 mTransferSources.reprioritize(priority, tsp);
00744 }
00745
00746 void LLTransferSourceChannel::updateTransfers()
00747 {
00748
00749
00750
00751
00752
00753 LLCircuitData *cdp = gMessageSystem->mCircuitInfo.findCircuit(getHost());
00754 if (!cdp)
00755 {
00756 return;
00757 }
00758
00759 if (cdp->isBlocked())
00760 {
00761
00762
00763
00764
00765
00766
00767 return;
00768 }
00769
00770 const S32 throttle_id = mThrottleID;
00771
00772 LLThrottleGroup &tg = cdp->getThrottleGroup();
00773
00774 if (tg.checkOverflow(throttle_id, 0.f))
00775 {
00776 return;
00777 }
00778
00779 LLPriQueueMap<LLTransferSource *>::pqm_iter iter, next;
00780
00781 BOOL done = FALSE;
00782 for (iter = mTransferSources.mMap.begin(); (iter != mTransferSources.mMap.end()) && !done;)
00783 {
00784
00785
00786 next = iter;
00787 next++;
00788
00789 LLTransferSource *tsp = iter->second;
00790 U8 *datap = NULL;
00791 S32 data_size = 0;
00792 BOOL delete_data = FALSE;
00793 S32 packet_id = 0;
00794 S32 sent_bytes = 0;
00795 LLTSCode status = LLTS_OK;
00796
00797
00798 packet_id = tsp->getNextPacketID();
00799 status = tsp->dataCallback(packet_id, DEFAULT_PACKET_SIZE, &datap, data_size, delete_data);
00800
00801 if (status == LLTS_SKIP)
00802 {
00803
00804
00805
00806 iter=next;
00807 continue;
00808 }
00809
00810 LLUUID *cb_uuid = new LLUUID(tsp->getID());
00811 LLUUID transaction_id = tsp->getID();
00812
00813
00814
00815 gMessageSystem->newMessage("TransferPacket");
00816 gMessageSystem->nextBlock("TransferData");
00817 gMessageSystem->addUUID("TransferID", tsp->getID());
00818 gMessageSystem->addS32("ChannelType", getChannelType());
00819 gMessageSystem->addS32("Packet", packet_id);
00820 gMessageSystem->addS32("Status", status);
00821 gMessageSystem->addBinaryData("Data", datap, data_size);
00822 sent_bytes = gMessageSystem->getCurrentSendTotal();
00823 gMessageSystem->sendReliable(getHost(), LL_DEFAULT_RELIABLE_RETRIES, TRUE, 0.f,
00824 LLTransferManager::reliablePacketCallback, (void**)cb_uuid);
00825
00826
00827 done = tg.throttleOverflow(throttle_id, sent_bytes*8.f);
00828 gTransferManager.addTransferBitsOut(mChannelType, sent_bytes*8);
00829
00830
00831 if (delete_data)
00832 {
00833 delete[] datap;
00834 datap = NULL;
00835 }
00836
00837 if (findTransferSource(transaction_id) == NULL)
00838 {
00839
00840
00841
00842
00843 iter=next;
00844 continue;
00845 }
00846
00847
00848 tsp->setLastPacketID(packet_id);
00849
00850 switch (status)
00851 {
00852 case LLTS_OK:
00853
00854 break;
00855 case LLTS_ERROR:
00856 llwarns << "Error in transfer dataCallback!" << llendl;
00857 case LLTS_DONE:
00858
00859
00860 tsp->completionCallback(status);
00861 delete tsp;
00862
00863 mTransferSources.mMap.erase(iter);
00864 iter = next;
00865 break;
00866 default:
00867 llerrs << "Unknown transfer error code!" << llendl;
00868 }
00869
00870
00871
00872
00873 }
00874 }
00875
00876
00877 void LLTransferSourceChannel::addTransferSource(LLTransferSource *sourcep)
00878 {
00879 sourcep->mChannelp = this;
00880 mTransferSources.push(sourcep->getPriority(), sourcep);
00881 }
00882
00883
00884 LLTransferSource *LLTransferSourceChannel::findTransferSource(const LLUUID &transfer_id)
00885 {
00886 LLPriQueueMap<LLTransferSource *>::pqm_iter iter;
00887 for (iter = mTransferSources.mMap.begin(); iter != mTransferSources.mMap.end(); iter++)
00888 {
00889 LLTransferSource *tsp = iter->second;
00890 if (tsp->getID() == transfer_id)
00891 {
00892 return tsp;
00893 }
00894 }
00895 return NULL;
00896 }
00897
00898
00899 BOOL LLTransferSourceChannel::deleteTransfer(LLTransferSource *tsp)
00900 {
00901
00902 LLPriQueueMap<LLTransferSource *>::pqm_iter iter;
00903 for (iter = mTransferSources.mMap.begin(); iter != mTransferSources.mMap.end(); iter++)
00904 {
00905 if (iter->second == tsp)
00906 {
00907 delete tsp;
00908 mTransferSources.mMap.erase(iter);
00909 return TRUE;
00910 }
00911 }
00912
00913 llerrs << "Unable to find transfer source to delete!" << llendl;
00914 return FALSE;
00915 }
00916
00917
00918
00919
00920
00921
00922 LLTransferTargetChannel::LLTransferTargetChannel(const LLTransferChannelType channel_type, const LLHost &host) :
00923 mChannelType(channel_type),
00924 mHost(host)
00925 {
00926 }
00927
00928 LLTransferTargetChannel::~LLTransferTargetChannel()
00929 {
00930 tt_iter iter;
00931 for (iter = mTransferTargets.begin(); iter != mTransferTargets.end(); iter++)
00932 {
00933
00934 (*iter)->abortTransfer();
00935 delete *iter;
00936 }
00937 mTransferTargets.clear();
00938 }
00939
00940
00941 void LLTransferTargetChannel::requestTransfer(
00942 const LLTransferSourceParams& source_params,
00943 const LLTransferTargetParams& target_params,
00944 const F32 priority)
00945 {
00946 LLUUID id;
00947 id.generate();
00948 LLTransferTarget* ttp = LLTransferTarget::createTarget(
00949 target_params.getType(),
00950 id,
00951 source_params.getType());
00952 if (!ttp)
00953 {
00954 llwarns << "LLTransferManager::requestTransfer aborting due to target creation failure!" << llendl;
00955 return;
00956 }
00957
00958 ttp->applyParams(target_params);
00959 addTransferTarget(ttp);
00960
00961 sendTransferRequest(ttp, source_params, priority);
00962 }
00963
00964
00965 void LLTransferTargetChannel::sendTransferRequest(LLTransferTarget *targetp,
00966 const LLTransferSourceParams ¶ms,
00967 const F32 priority)
00968 {
00969
00970
00971
00972
00973 llassert(targetp);
00974 llassert(targetp->getChannel() == this);
00975
00976 gMessageSystem->newMessage("TransferRequest");
00977 gMessageSystem->nextBlock("TransferInfo");
00978 gMessageSystem->addUUID("TransferID", targetp->getID());
00979 gMessageSystem->addS32("SourceType", params.getType());
00980 gMessageSystem->addS32("ChannelType", getChannelType());
00981 gMessageSystem->addF32("Priority", priority);
00982
00983 U8 tmp[MAX_PARAMS_SIZE];
00984 LLDataPackerBinaryBuffer dp(tmp, MAX_PARAMS_SIZE);
00985 params.packParams(dp);
00986 S32 len = dp.getCurrentSize();
00987 gMessageSystem->addBinaryData("Params", tmp, len);
00988
00989 gMessageSystem->sendReliable(mHost);
00990 }
00991
00992
00993 void LLTransferTargetChannel::addTransferTarget(LLTransferTarget *targetp)
00994 {
00995 targetp->mChannelp = this;
00996 mTransferTargets.push_back(targetp);
00997 }
00998
00999
01000 LLTransferTarget *LLTransferTargetChannel::findTransferTarget(const LLUUID &transfer_id)
01001 {
01002 tt_iter iter;
01003 for (iter = mTransferTargets.begin(); iter != mTransferTargets.end(); iter++)
01004 {
01005 LLTransferTarget *ttp = *iter;
01006 if (ttp->getID() == transfer_id)
01007 {
01008 return ttp;
01009 }
01010 }
01011 return NULL;
01012 }
01013
01014
01015 BOOL LLTransferTargetChannel::deleteTransfer(LLTransferTarget *ttp)
01016 {
01017 tt_iter iter;
01018 for (iter = mTransferTargets.begin(); iter != mTransferTargets.end(); iter++)
01019 {
01020 if (*iter == ttp)
01021 {
01022 delete ttp;
01023 mTransferTargets.erase(iter);
01024 return TRUE;
01025 }
01026 }
01027
01028 llerrs << "Unable to find transfer target to delete!" << llendl;
01029 return FALSE;
01030 }
01031
01032
01033
01034
01035
01036
01037 LLTransferSource::LLTransferSource(const LLTransferSourceType type,
01038 const LLUUID &transfer_id,
01039 const F32 priority) :
01040 mType(type),
01041 mID(transfer_id),
01042 mChannelp(NULL),
01043 mPriority(priority),
01044 mSize(0),
01045 mLastPacketID(-1)
01046 {
01047 setPriority(priority);
01048 }
01049
01050
01051 LLTransferSource::~LLTransferSource()
01052 {
01053
01054
01055
01056 }
01057
01058
01059 void LLTransferSource::sendTransferStatus(LLTSCode status)
01060 {
01061 gMessageSystem->newMessage("TransferInfo");
01062 gMessageSystem->nextBlock("TransferInfo");
01063 gMessageSystem->addUUID("TransferID", getID());
01064 gMessageSystem->addS32("TargetType", LLTTT_UNKNOWN);
01065 gMessageSystem->addS32("ChannelType", mChannelp->getChannelType());
01066 gMessageSystem->addS32("Status", status);
01067 gMessageSystem->addS32("Size", mSize);
01068 U8 tmp[MAX_PARAMS_SIZE];
01069 LLDataPackerBinaryBuffer dp(tmp, MAX_PARAMS_SIZE);
01070 packParams(dp);
01071 S32 len = dp.getCurrentSize();
01072 gMessageSystem->addBinaryData("Params", tmp, len);
01073 gMessageSystem->sendReliable(mChannelp->getHost());
01074
01075
01076 if (status != LLTS_OK)
01077 {
01078 completionCallback(status);
01079 mChannelp->deleteTransfer(this);
01080 }
01081 }
01082
01083
01084
01085
01086
01087 void LLTransferSource::abortTransfer()
01088 {
01089
01090 llinfos << "LLTransferSource::Aborting transfer " << getID() << " to " << mChannelp->getHost() << llendl;
01091 gMessageSystem->newMessage("TransferAbort");
01092 gMessageSystem->nextBlock("TransferInfo");
01093 gMessageSystem->addUUID("TransferID", getID());
01094 gMessageSystem->addS32("ChannelType", mChannelp->getChannelType());
01095 gMessageSystem->sendReliable(mChannelp->getHost());
01096
01097 completionCallback(LLTS_ABORT);
01098 }
01099
01100
01101
01102 void LLTransferSource::registerSourceType(const LLTransferSourceType stype, LLTransferSourceCreateFunc func)
01103 {
01104 if (sSourceCreateMap.count(stype))
01105 {
01106
01107
01108 llerrs << "Reregistering source type " << stype << llendl;
01109 }
01110 else
01111 {
01112 sSourceCreateMap[stype] = func;
01113 }
01114 }
01115
01116
01117 LLTransferSource *LLTransferSource::createSource(const LLTransferSourceType stype,
01118 const LLUUID &id,
01119 const F32 priority)
01120 {
01121 switch (stype)
01122 {
01123
01124
01125
01126
01127
01128
01129 case LLTST_ASSET:
01130 return new LLTransferSourceAsset(id, priority);
01131 default:
01132 {
01133 if (!sSourceCreateMap.count(stype))
01134 {
01135
01136 llwarns << "Unknown transfer source type: " << stype << llendl;
01137 return NULL;
01138 }
01139 return (sSourceCreateMap[stype])(id, priority);
01140 }
01141 }
01142 }
01143
01144
01145
01146 void LLTransferSource::sSetPriority(LLTransferSource *&tsp, const F32 priority)
01147 {
01148 tsp->setPriority(priority);
01149 }
01150
01151
01152
01153 F32 LLTransferSource::sGetPriority(LLTransferSource *&tsp)
01154 {
01155 return tsp->getPriority();
01156 }
01157
01158
01159
01160
01161
01162
01163 LLTransferPacket::LLTransferPacket(const S32 packet_id, const LLTSCode status, const U8 *datap, const S32 size) :
01164 mPacketID(packet_id),
01165 mStatus(status),
01166 mDatap(NULL),
01167 mSize(size)
01168 {
01169 if (size == 0)
01170 {
01171 return;
01172 }
01173
01174 mDatap = new U8[size];
01175 if (mDatap != NULL)
01176 {
01177 memcpy(mDatap, datap, size);
01178 }
01179 }
01180
01181 LLTransferPacket::~LLTransferPacket()
01182 {
01183 delete[] mDatap;
01184 }
01185
01186
01187
01188
01189
01190 LLTransferTarget::LLTransferTarget(
01191 LLTransferTargetType type,
01192 const LLUUID& transfer_id,
01193 LLTransferSourceType source_type) :
01194 mType(type),
01195 mSourceType(source_type),
01196 mID(transfer_id),
01197 mGotInfo(FALSE),
01198 mSize(0),
01199 mLastPacketID(-1)
01200 {
01201 }
01202
01203 LLTransferTarget::~LLTransferTarget()
01204 {
01205
01206
01207
01208 tpm_iter iter;
01209 for (iter = mDelayedPacketMap.begin(); iter != mDelayedPacketMap.end(); iter++)
01210 {
01211 delete iter->second;
01212 }
01213 mDelayedPacketMap.clear();
01214 }
01215
01216
01217
01218
01219 void LLTransferTarget::abortTransfer()
01220 {
01221
01222 llinfos << "LLTransferTarget::Aborting transfer " << getID() << " from " << mChannelp->getHost() << llendl;
01223 gMessageSystem->newMessage("TransferAbort");
01224 gMessageSystem->nextBlock("TransferInfo");
01225 gMessageSystem->addUUID("TransferID", getID());
01226 gMessageSystem->addS32("ChannelType", mChannelp->getChannelType());
01227 gMessageSystem->sendReliable(mChannelp->getHost());
01228
01229 completionCallback(LLTS_ABORT);
01230 }
01231
01232 bool LLTransferTarget::addDelayedPacket(
01233 const S32 packet_id,
01234 const LLTSCode status,
01235 U8* datap,
01236 const S32 size)
01237 {
01238 const transfer_packet_map::size_type LL_MAX_DELAYED_PACKETS = 100;
01239 if(mDelayedPacketMap.size() > LL_MAX_DELAYED_PACKETS)
01240 {
01241
01242 return false;
01243 }
01244
01245 LLTransferPacket* tpp = new LLTransferPacket(
01246 packet_id,
01247 status,
01248 datap,
01249 size);
01250
01251 #ifdef _DEBUG
01252 if (mDelayedPacketMap.find(packet_id) != mDelayedPacketMap.end())
01253 {
01254 llerrs << "Packet ALREADY in delayed packet map!" << llendl;
01255 }
01256 #endif
01257
01258 mDelayedPacketMap[packet_id] = tpp;
01259 return true;
01260 }
01261
01262
01263 LLTransferTarget* LLTransferTarget::createTarget(
01264 LLTransferTargetType type,
01265 const LLUUID& id,
01266 LLTransferSourceType source_type)
01267 {
01268 switch (type)
01269 {
01270 case LLTTT_FILE:
01271 return new LLTransferTargetFile(id, source_type);
01272 case LLTTT_VFILE:
01273 return new LLTransferTargetVFile(id, source_type);
01274 default:
01275 llwarns << "Unknown transfer target type: " << type << llendl;
01276 return NULL;
01277 }
01278 }
01279
01280
01281 LLTransferSourceParamsInvItem::LLTransferSourceParamsInvItem() : LLTransferSourceParams(LLTST_SIM_INV_ITEM), mAssetType(LLAssetType::AT_NONE)
01282 {
01283 }
01284
01285
01286 void LLTransferSourceParamsInvItem::setAgentSession(const LLUUID &agent_id, const LLUUID &session_id)
01287 {
01288 mAgentID = agent_id;
01289 mSessionID = session_id;
01290 }
01291
01292
01293 void LLTransferSourceParamsInvItem::setInvItem(const LLUUID &owner_id, const LLUUID &task_id, const LLUUID &item_id)
01294 {
01295 mOwnerID = owner_id;
01296 mTaskID = task_id;
01297 mItemID = item_id;
01298 }
01299
01300
01301 void LLTransferSourceParamsInvItem::setAsset(const LLUUID &asset_id, const LLAssetType::EType asset_type)
01302 {
01303 mAssetID = asset_id;
01304 mAssetType = asset_type;
01305 }
01306
01307
01308 void LLTransferSourceParamsInvItem::packParams(LLDataPacker &dp) const
01309 {
01310 lldebugs << "LLTransferSourceParamsInvItem::packParams()" << llendl;
01311 dp.packUUID(mAgentID, "AgentID");
01312 dp.packUUID(mSessionID, "SessionID");
01313 dp.packUUID(mOwnerID, "OwnerID");
01314 dp.packUUID(mTaskID, "TaskID");
01315 dp.packUUID(mItemID, "ItemID");
01316 dp.packUUID(mAssetID, "AssetID");
01317 dp.packS32(mAssetType, "AssetType");
01318 }
01319
01320
01321 BOOL LLTransferSourceParamsInvItem::unpackParams(LLDataPacker &dp)
01322 {
01323 S32 tmp_at;
01324
01325 dp.unpackUUID(mAgentID, "AgentID");
01326 dp.unpackUUID(mSessionID, "SessionID");
01327 dp.unpackUUID(mOwnerID, "OwnerID");
01328 dp.unpackUUID(mTaskID, "TaskID");
01329 dp.unpackUUID(mItemID, "ItemID");
01330 dp.unpackUUID(mAssetID, "AssetID");
01331 dp.unpackS32(tmp_at, "AssetType");
01332
01333 mAssetType = (LLAssetType::EType)tmp_at;
01334
01335 return TRUE;
01336 }
01337
01338 LLTransferSourceParamsEstate::LLTransferSourceParamsEstate() :
01339 LLTransferSourceParams(LLTST_SIM_ESTATE),
01340 mEstateAssetType(ET_NONE),
01341 mAssetType(LLAssetType::AT_NONE)
01342 {
01343 }
01344
01345 void LLTransferSourceParamsEstate::setAgentSession(const LLUUID &agent_id, const LLUUID &session_id)
01346 {
01347 mAgentID = agent_id;
01348 mSessionID = session_id;
01349 }
01350
01351 void LLTransferSourceParamsEstate::setEstateAssetType(const EstateAssetType etype)
01352 {
01353 mEstateAssetType = etype;
01354 }
01355
01356 void LLTransferSourceParamsEstate::setAsset(const LLUUID &asset_id, const LLAssetType::EType asset_type)
01357 {
01358 mAssetID = asset_id;
01359 mAssetType = asset_type;
01360 }
01361
01362 void LLTransferSourceParamsEstate::packParams(LLDataPacker &dp) const
01363 {
01364 dp.packUUID(mAgentID, "AgentID");
01365
01366
01367
01368 dp.packUUID(mSessionID, "SessionID");
01369 dp.packS32(mEstateAssetType, "EstateAssetType");
01370 }
01371
01372
01373 BOOL LLTransferSourceParamsEstate::unpackParams(LLDataPacker &dp)
01374 {
01375 S32 tmp_et;
01376
01377 dp.unpackUUID(mAgentID, "AgentID");
01378 dp.unpackUUID(mSessionID, "SessionID");
01379 dp.unpackS32(tmp_et, "EstateAssetType");
01380
01381 mEstateAssetType = (EstateAssetType)tmp_et;
01382
01383 return TRUE;
01384 }