00001
00032 #include "linden_common.h"
00033
00034 #include "llpacketring.h"
00035
00036
00037 #include "llerror.h"
00038 #include "lltimer.h"
00039 #include "timing.h"
00040 #include "llrand.h"
00041 #include "u64.h"
00042
00044 LLPacketRing::LLPacketRing () :
00045 mUseInThrottle(FALSE),
00046 mUseOutThrottle(FALSE),
00047 mInThrottle(256000.f),
00048 mOutThrottle(64000.f),
00049 mActualBitsIn(0),
00050 mActualBitsOut(0),
00051 mMaxBufferLength(64000),
00052 mInBufferLength(0),
00053 mOutBufferLength(0),
00054 mDropPercentage(0.0f),
00055 mPacketsToDrop(0x0)
00056 {
00057 }
00058
00060 LLPacketRing::~LLPacketRing ()
00061 {
00062 free();
00063 }
00064
00066 void LLPacketRing::free ()
00067 {
00068 LLPacketBuffer *packetp;
00069
00070 while (!mReceiveQueue.empty())
00071 {
00072 packetp = mReceiveQueue.front();
00073 delete packetp;
00074 mReceiveQueue.pop();
00075 }
00076
00077 while (!mSendQueue.empty())
00078 {
00079 packetp = mSendQueue.front();
00080 delete packetp;
00081 mSendQueue.pop();
00082 }
00083 }
00084
00086 void LLPacketRing::dropPackets (U32 num_to_drop)
00087 {
00088 mPacketsToDrop += num_to_drop;
00089 }
00090
00092 void LLPacketRing::setDropPercentage (F32 percent_to_drop)
00093 {
00094 mDropPercentage = percent_to_drop;
00095 }
00096
00097 void LLPacketRing::setUseInThrottle(const BOOL use_throttle)
00098 {
00099 mUseInThrottle = use_throttle;
00100 }
00101
00102 void LLPacketRing::setUseOutThrottle(const BOOL use_throttle)
00103 {
00104 mUseOutThrottle = use_throttle;
00105 }
00106
00107 void LLPacketRing::setInBandwidth(const F32 bps)
00108 {
00109 mInThrottle.setRate(bps);
00110 }
00111
00112 void LLPacketRing::setOutBandwidth(const F32 bps)
00113 {
00114 mOutThrottle.setRate(bps);
00115 }
00117 S32 LLPacketRing::receiveFromRing (S32 socket, char *datap)
00118 {
00119
00120 if (mInThrottle.checkOverflow(0))
00121 {
00122
00123 return 0;
00124 }
00125
00126 LLPacketBuffer *packetp = NULL;
00127 if (mReceiveQueue.empty())
00128 {
00129
00130 return 0;
00131 }
00132
00133 S32 packet_size = 0;
00134 packetp = mReceiveQueue.front();
00135 mReceiveQueue.pop();
00136 packet_size = packetp->getSize();
00137 if (packetp->getData() != NULL)
00138 {
00139 memcpy(datap, packetp->getData(), packet_size);
00140 }
00141
00142 mLastSender = packetp->getHost();
00143 delete packetp;
00144
00145 this->mInBufferLength -= packet_size;
00146
00147
00148 mInThrottle.throttleOverflow(packet_size * 8.f);
00149 return packet_size;
00150 }
00151
00153 S32 LLPacketRing::receivePacket (S32 socket, char *datap)
00154 {
00155 S32 packet_size = 0;
00156
00157
00158 if (mUseInThrottle)
00159 {
00160 BOOL done = FALSE;
00161
00162
00163 while (!done)
00164 {
00165 LLPacketBuffer *packetp;
00166 packetp = new LLPacketBuffer(socket);
00167
00168 if (packetp->getSize())
00169 {
00170 mActualBitsIn += packetp->getSize() * 8;
00171
00172
00173 if (mDropPercentage && (ll_frand(100.f) < mDropPercentage))
00174 {
00175 mPacketsToDrop++;
00176 }
00177
00178 if (mPacketsToDrop)
00179 {
00180 delete packetp;
00181 packetp = NULL;
00182 packet_size = 0;
00183 mPacketsToDrop--;
00184 }
00185 }
00186
00187
00188
00189 if (packetp)
00190 {
00191 if (mInBufferLength + packetp->getSize() > mMaxBufferLength)
00192 {
00193
00194 llwarns << "Throwing away packet, overflowing buffer" << llendl;
00195 delete packetp;
00196 packetp = NULL;
00197 }
00198 else if (packetp->getSize())
00199 {
00200 mReceiveQueue.push(packetp);
00201 mInBufferLength += packetp->getSize();
00202 }
00203 else
00204 {
00205 delete packetp;
00206 packetp = NULL;
00207 done = true;
00208 }
00209 }
00210 else
00211 {
00212
00213 }
00214 }
00215
00216
00217
00218 packet_size = receiveFromRing(socket, datap);
00219 }
00220 else
00221 {
00222
00223 packet_size = receive_packet(socket, datap);
00224 mLastSender = ::get_sender();
00225
00226 if (packet_size)
00227 {
00228 if (mDropPercentage && (ll_frand(100.f) < mDropPercentage))
00229 {
00230 mPacketsToDrop++;
00231 }
00232
00233 if (mPacketsToDrop)
00234 {
00235 packet_size = 0;
00236 mPacketsToDrop--;
00237 }
00238 }
00239 }
00240
00241 return packet_size;
00242 }
00243
00244 BOOL LLPacketRing::sendPacket(int h_socket, char * send_buffer, S32 buf_size, LLHost host)
00245 {
00246 BOOL status = TRUE;
00247 if (!mUseOutThrottle)
00248 {
00249 return send_packet(h_socket, send_buffer, buf_size, host.getAddress(), host.getPort() );
00250 }
00251 else
00252 {
00253 mActualBitsOut += buf_size * 8;
00254 LLPacketBuffer *packetp = NULL;
00255
00256 while (!mOutThrottle.checkOverflow(0.f))
00257 {
00258
00259
00260 S32 packet_size = 0;
00261 if (!mSendQueue.empty())
00262 {
00263
00264 LLPacketBuffer *packetp = mSendQueue.front();
00265 mSendQueue.pop();
00266
00267 mOutBufferLength -= packetp->getSize();
00268 packet_size = packetp->getSize();
00269
00270 status = send_packet(h_socket, packetp->getData(), packet_size, packetp->getHost().getAddress(), packetp->getHost().getPort());
00271
00272 delete packetp;
00273
00274 mOutThrottle.throttleOverflow(packet_size * 8.f);
00275 }
00276 else
00277 {
00278
00279 status = send_packet(h_socket, send_buffer, buf_size, host.getAddress(), host.getPort() );
00280 packet_size = buf_size;
00281
00282
00283 mOutThrottle.throttleOverflow(packet_size * 8.f);
00284
00285
00286
00287 return status;
00288 }
00289
00290 }
00291
00292
00293 if (mOutBufferLength + buf_size > mMaxBufferLength)
00294 {
00295
00296
00297 llwarns << "Throwing away outbound packet, overflowing buffer" << llendl;
00298 }
00299 else
00300 {
00301 static LLTimer queue_timer;
00302 if ((mOutBufferLength > 4192) && queue_timer.getElapsedTimeF32() > 1.f)
00303 {
00304
00305 llinfos << "Outbound packet queue " << mOutBufferLength << " bytes" << llendl;
00306 queue_timer.reset();
00307 }
00308 packetp = new LLPacketBuffer(host, send_buffer, buf_size);
00309
00310 mOutBufferLength += packetp->getSize();
00311 mSendQueue.push(packetp);
00312 }
00313 }
00314
00315 return status;
00316 }