00001 """\
00002 @file saranwrap.py
00003 @author Phoenix
00004 @date 2007-07-13
00005 @brief A simple, pickle based rpc mechanism which reflects python
00006 objects and callables.
00007
00008 $LicenseInfo:firstyear=2007&license=mit$
00009
00010 Copyright (c) 2007-2008, Linden Research, Inc.
00011
00012 Permission is hereby granted, free of charge, to any person obtaining a copy
00013 of this software and associated documentation files (the "Software"), to deal
00014 in the Software without restriction, including without limitation the rights
00015 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
00016 copies of the Software, and to permit persons to whom the Software is
00017 furnished to do so, subject to the following conditions:
00018
00019 The above copyright notice and this permission notice shall be included in
00020 all copies or substantial portions of the Software.
00021
00022 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
00023 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
00024 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
00025 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
00026 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
00027 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
00028 THE SOFTWARE.
00029 $/LicenseInfo$
00030
00031 This file provides classes and exceptions used for simple python level
00032 remote procedure calls. This is achieved by intercepting the basic
00033 getattr and setattr calls in a client proxy, which communicates those
00034 down to the server which will dispatch them to objects in its process
00035 space.
00036
00037 The basic protocol to get and set attributes is for the client proxy
00038 to issue the command:
00039
00040 getattr $id $name
00041 setattr $id $name $value
00042
00043 getitem $id $item
00044 setitem $id $item $value
00045 eq $id $rhs
00046 del $id
00047
00048 When the get returns a callable, the client proxy will provide a
00049 callable proxy which will invoke a remote procedure call. The command
00050 issued from the callable proxy to server is:
00051
00052 call $id $name $args $kwargs
00053
00054 If the client supplies an id of None, then the get/set/call is applied
00055 to the object(s) exported from the server.
00056
00057 The server will parse the get/set/call, take the action indicated, and
00058 return back to the caller one of:
00059
00060 value $val
00061 callable
00062 object $id
00063 exception $excp
00064
00065 To handle object expiration, the proxy will instruct the rpc server to
00066 discard objects which are no longer in use. This is handled by
00067 catching proxy deletion and sending the command:
00068
00069 del $id
00070
00071 The server will handle this by clearing its own internal
00072 references. This does not mean that the object will necessarily be
00073 cleaned from the server, but no artificial references will remain
00074 after successfully completing. On completion, the server will return
00075 one of:
00076
00077 value None
00078 exception $excp
00079
00080 The server also accepts a special command for debugging purposes:
00081
00082 status
00083
00084 Which will be intercepted by the server to write back:
00085
00086 status {...}
00087
00088 The wire protocol is to pickle the Request class in this file. The
00089 request class is basically an action and a map of parameters.
00090 """
00091
00092 import os
00093 import cPickle
00094 import struct
00095 import sys
00096
00097 try:
00098 set = set
00099 frozenset = frozenset
00100 except NameError:
00101 from sets import Set as set, ImmutableSet as frozenset
00102
00103 from eventlet.processes import Process
00104 from eventlet import api
00105
00106
00107
00108
# Module-wide debug switch. When true, _prnt() echoes protocol traffic to
# stdout and wrap_module() starts the child process with a --logfile option.
_g_debug_mode = False
if _g_debug_mode:
    # traceback is only used by Server.write_exception() in debug mode.
    import traceback
00112
def pythonpath_sync():
    """
    @brief Export the current sys.path as the PYTHONPATH environment
    variable, so that child processes have the same paths as the caller does.
    """
    os.environ['PYTHONPATH'] = os.pathsep.join(sys.path)
00119
def wrap(obj, dead_callback = None):
    """
    @brief Wrap an object in another process through a saranwrap proxy.
    @param obj The object to wrap. A module is handed to wrap_module()
    by name instead of being shipped to the child.
    @param dead_callback A callable passed on to the child Process; it is
    meant to be invoked if the process exits.
    @return Whatever reading the attribute 'obj' back through the new
    proxy yields.
    """
    is_module = type(obj).__name__ == 'module'
    if is_module:
        return wrap_module(obj.__name__, dead_callback)

    pythonpath_sync()
    child = Process('python', [__file__, '--child'], dead_callback)
    # The child process object serves as both the input and output stream.
    proxy = Proxy(child, child)
    # Assigning through the proxy issues a remote setattr which sends obj
    # to the child; the read below issues a remote getattr for it.
    proxy.obj = obj
    return proxy.obj
00133
def wrap_module(fqname, dead_callback = None):
    """
    @brief Wrap a module in another process through a saranwrap proxy.
    @param fqname The fully qualified name of the module.
    @param dead_callback A callable passed on to the child Process; it is
    meant to be invoked if the process exits.
    @return A Proxy connected to the child process.
    """
    pythonpath_sync()
    if _g_debug_mode:
        # In debug mode the child also logs to a fixed file.
        child_args = [__file__, '--module', fqname, '--logfile', '/tmp/saranwrap.log']
    else:
        child_args = [__file__, '--module', fqname]
    child = Process('python', child_args, dead_callback)
    return Proxy(child, child)
00147
def status(proxy):
    """
    @brief Get the status from the server through a proxy.
    @param proxy a saranwrap.Proxy object connected to a server.
    @return The value the server sends back for the 'status' request.
    """
    # The proxy keeps its streams in its local (non-remote) dictionary.
    local = proxy.__local_dict
    _write_request(Request('status', {}), local['_out'])
    return _read_response(None, None, local['_in'], local['_out'], None)
00154
class BadResponse(Exception):
    """This exception is raised by a saranwrap client when it could
    parse but cannot understand the response from the server."""
00159
class BadRequest(Exception):
    """This exception is raised by a saranwrap server when it could parse
    a request but has no handler for the action the request names."""
00164
class UnrecoverableError(Exception):
    """This exception is raised by a saranwrap client when reading or
    unpickling a response from the server fails with an AttributeError."""
00167
class Request(object):
    """@brief A wrapper class for proxy requests to the server: an action
    name plus a mapping of parameters for that action."""

    def __init__(self, action, param):
        """
        @param action The name of the action, e.g. 'getattr'.
        @param param A mapping of parameter names to values.
        """
        self._param = param
        self._action = action

    def action(self):
        """@return The action name given at construction."""
        return self._action

    def __getitem__(self, name):
        """@return The parameter stored under name."""
        return self._param[name]

    def __str__(self):
        pieces = ["Request `", self._action, "` ", str(self._param)]
        return "".join(pieces)
00179
00180 def _read_lp_hunk(stream):
00181 len_bytes = stream.read(4)
00182 length = struct.unpack('I', len_bytes)[0]
00183 body = stream.read(length)
00184 return body
00185
def _read_response(id, attribute, input, output, dead_list):
    """
    @brief Local helper to read one response from the rpc server and turn
    it into a local result.
    @param id The id of the remote object the request was about.
    @param attribute The attribute name the request was about.
    @param input a file-like object which supports read().
    @param output a file-like object handed to any proxy created here.
    @param dead_list The collection of dead object ids handed to any proxy
    created here.
    @return The plain value, a CallableProxy, or an ObjectProxy, depending
    on the kind of response.
    @raise UnrecoverableError if reading or unpickling raises AttributeError.
    @raise BadResponse if the kind of response is not recognized.
    An 'exception' response re-raises the exception sent by the server.
    """
    try:
        raw = _read_lp_hunk(input)
        _prnt(repr(raw))
        response = cPickle.loads(raw)
    except AttributeError:
        raise UnrecoverableError(sys.exc_info()[1])
    _prnt("response: %s" % response)

    kind = response[0]
    if kind == 'value':
        return response[1]
    if kind == 'callable':
        return CallableProxy(id, attribute, input, output, dead_list)
    if kind == 'object':
        return ObjectProxy(input, output, response[1], dead_list)
    if kind == 'exception':
        raise response[1]
    raise BadResponse(kind)
00206
00207 def _write_lp_hunk(stream, hunk):
00208 write_length = struct.pack('I', len(hunk))
00209 stream.write(write_length + hunk)
00210 if hasattr(stream, 'flush'):
00211 stream.flush()
00212
def _write_request(param, output):
    """
    @brief Pickle a request and send it as one length-prefixed hunk.
    @param param The request object to send.
    @param output a file-like object which supports write().
    """
    _prnt("request: %s" % param)
    _write_lp_hunk(output, cPickle.dumps(param))
00217
00218 def _is_local(attribute):
00219 "Return true if the attribute should be handled locally"
00220
00221
00222 if '__local_dict' in attribute:
00223 return True
00224 return False
00225
def _prnt(message):
    """Print message to stdout, but only when the module is in debug mode."""
    if not _g_debug_mode:
        return
    print(message)
00230
00231 _g_logfile = None
00232 def _log(message):
00233 global _g_logfile
00234 if _g_logfile:
00235 _g_logfile.write(str(os.getpid()) + ' ' + message)
00236 _g_logfile.write('\n')
00237 _g_logfile.flush()
00238
00239 def _unmunge_attr_name(name):
00240 """ Sometimes attribute names come in with classname prepended, not sure why.
00241 This function removes said classname, because we're huge hackers and we didn't
00242 find out what the true right thing to do is. *FIX: find out. """
00243 if(name.startswith('_Proxy')):
00244 name = name[len('_Proxy'):]
00245 if(name.startswith('_ObjectProxy')):
00246 name = name[len('_ObjectProxy'):]
00247 return name
00248
00249
class Proxy(object):
    """\
    @class Proxy
    @brief This class wraps a remote python process, presumably available
    in an instance of a Server.

    This is the class you will typically use as a client to a child
    process. Simply instantiate one around a file-like interface and start
    calling methods on the thing that is exported. The dir() builtin is
    not supported, so you have to know what has been exported.

    Every attribute whose name does not contain '__local_dict' is fetched
    from or stored on the server, so the proxy keeps all of its own state
    in the single local dictionary self.__local_dict.
    """
    def __init__(self, input, output, dead_list = None):
        """\
        @param input a file-like object which supports read().
        @param output a file-like object which supports write() and flush().
        @param dead_list a set of remote object ids waiting to be deleted
        on the server. A new empty set is used when omitted.
        """
        if dead_list is None:
            dead_list = set()

        # This assignment goes through __setattr__ below, which stores it
        # locally rather than sending it to the server.
        self.__local_dict = {
            '_in': input,
            '_out': output,
            '_dead_list': dead_list,
            '_id': None}

    def __getattribute__(self, attribute):
        """Return local state for local names; otherwise fetch the
        attribute from the server."""
        if _is_local(attribute):
            local_name = _unmunge_attr_name(attribute)
            return super(Proxy, self).__getattribute__(local_name)

        local = self.__local_dict
        my_in = local['_in']
        my_out = local['_out']
        my_id = local['_id']
        dead = local['_dead_list']

        # Tell the server about proxies that have been collected since the
        # last request. Iterate over a copy because the set shrinks as each
        # deletion is acknowledged.
        for dead_id in dead.copy():
            _write_request(Request('del', {'id':dead_id}), my_out)
            _read_response(my_id, attribute, my_in, my_out, dead)
            dead.remove(dead_id)

        _write_request(
            Request('getattr', {'id':my_id, 'attribute':attribute}), my_out)
        return _read_response(my_id, attribute, my_in, my_out, dead)

    def __setattr__(self, attribute, value):
        """Store local names in the instance dictionary; otherwise set the
        attribute on the server."""
        if _is_local(attribute):
            local_name = _unmunge_attr_name(attribute)
            # Write straight into the instance __dict__, bypassing the
            # overridden attribute access of this class.
            instance_dict = super(Proxy, self).__getattribute__('__dict__')
            instance_dict[local_name] = value
            return

        local = self.__local_dict
        my_in = local['_in']
        my_out = local['_out']
        my_id = local['_id']
        dead = local['_dead_list']

        payload = {'id':my_id, 'attribute':attribute, 'value':value}
        _write_request(Request('setattr', payload), my_out)
        return _read_response(my_id, attribute, my_in, my_out, dead)
00318
class ObjectProxy(Proxy):
    """\
    @class ObjectProxy
    @brief This class wraps a remote object in the Server

    This class will be created during normal operation, and users should
    not need to deal with this class directly.

    Proxy.__getattribute__ sends every attribute lookup other than the
    local state dictionary to the server. The methods here therefore only
    ever touch self.__local_dict locally; anything else written as
    self.name is a remote lookup.
    """

    def __init__(self, input, output, id, dead_list):
        """\
        @param input a file-like object which supports read().
        @param output a file-like object which supports write() and flush().
        @param id an identifier for the remote object. humans do not provide this.
        @param dead_list the set of remote object ids waiting to be deleted.
        """
        Proxy.__init__(self, input, output, dead_list)
        self.__local_dict['_id'] = id

    def __del__(self):
        """Queue this proxy's remote id for deletion on the server. The
        'del' request itself is sent by the next remote attribute lookup."""
        my_id = self.__local_dict['_id']
        _prnt("ObjectProxy::__del__ %s" % my_id)
        self.__local_dict['_dead_list'].add(my_id)

    def __getitem__(self, key):
        """Return the remote object's item stored under key."""
        local = self.__local_dict
        my_in = local['_in']
        my_out = local['_out']
        my_id = local['_id']
        dead = local['_dead_list']
        _write_request(Request('getitem', {'id':my_id, 'key':key}), my_out)
        return _read_response(my_id, key, my_in, my_out, dead)

    def __setitem__(self, key, value):
        """Store value under key on the remote object."""
        local = self.__local_dict
        my_in = local['_in']
        my_out = local['_out']
        my_id = local['_id']
        dead = local['_dead_list']
        payload = {'id':my_id, 'key':key, 'value':value}
        _write_request(Request('setitem', payload), my_out)
        return _read_response(my_id, key, my_in, my_out, dead)

    def __eq__(self, rhs):
        """Ask the server whether the two remote objects compare equal.

        @param rhs The other operand. Only proxies can be compared on the
        server, since the request carries the remote id of rhs.
        @return The server's answer, or NotImplemented when rhs is not a
        proxy so that Python falls back to its default comparison.
        """
        if not isinstance(rhs, Proxy):
            # Without this guard, comparing against None or any local
            # value failed with AttributeError while looking up the
            # proxy's local dictionary on rhs.
            return NotImplemented
        local = self.__local_dict
        my_in = local['_in']
        my_out = local['_out']
        my_id = local['_id']
        dead = local['_dead_list']
        payload = {'id':my_id, 'rhs':rhs.__local_dict['_id']}
        _write_request(Request('eq', payload), my_out)
        return _read_response(my_id, None, my_in, my_out, dead)

    def __repr__(self):
        # Not recursion: self.__repr__ is a remote attribute lookup which
        # yields a callable proxy, and calling that runs __repr__ on the
        # server-side object.
        val = self.__repr__()
        return "saran:%s" % val

    def __str__(self):
        # Remote lookup and call, as in __repr__.
        return self.__str__()

    def __len__(self):
        # Remote lookup and call, as in __repr__.
        return self.__len__()

    def proxied_type(self):
        """Return the type of the remote object. For instances of a
        subclass of ObjectProxy the local type is returned instead."""
        if type(self) is not ObjectProxy:
            return type(self)

        local = self.__local_dict
        my_in = local['_in']
        my_out = local['_out']
        my_id = local['_id']
        _write_request(Request('type', {'id':my_id}), my_out)
        # The server always answers 'type' with a plain value, so no dead
        # list is needed to build a proxy from the response.
        return _read_response(my_id, None, my_in, my_out, None)
00398
class CallableProxy(object):
    """\
    @class CallableProxy
    @brief This class wraps a remote function in the Server

    This class will be created by a Proxy during normal operation,
    and users should not need to deal with this class directly."""

    def __init__(self, object_id, name, input, output, dead_list):
        """\
        @param object_id the id of the remote object which owns the callable.
        @param name the name of the callable on that object.
        @param input a file-like object which supports read().
        @param output a file-like object which supports write() and flush().
        @param dead_list the set of remote object ids waiting to be deleted.
        """
        self._dead_list = dead_list
        self._out = output
        self._in = input
        self._name = name
        self._object_id = object_id

    def __call__(self, *args, **kwargs):
        """Send a 'call' request with the given arguments and return the
        server's response."""
        payload = {
            'id':self._object_id,
            'name':self._name,
            'args':args,
            'kwargs':kwargs}
        _write_request(Request('call', payload), self._out)
        return _read_response(
            self._object_id, self._name, self._in, self._out, self._dead_list)
00425
class Server(object):
    """\
    @class Server
    @brief Reads pickled Request objects from an input stream, applies them
    to the exported object or to objects handed out earlier, and writes a
    pickled response for each one.

    A request with action 'foo' is dispatched to the method handle_foo.
    """
    def __init__(self, input, output, export):
        """\
        @param input a file-like object which supports read().
        @param output a file-like object which supports write() and flush().
        @param export an object, function, or map which is exported to clients
        when the id is None."""
        self._in = input
        self._out = output
        self._export = export
        # Id to assign to the next object handed out by reference.
        self._next_id = 1
        # Objects handed out by reference, keyed by their id.
        self._objects = {}

    def handle_status(self, object, req):
        """@return A dict with the object count, the next id and the pid."""
        return {
            'object_count':len(self._objects),
            'next_id':self._next_id,
            'pid':os.getpid()}

    def handle_getattr(self, object, req):
        """@return The attribute req['attribute'] of object, falling back
        to item lookup when object has no such attribute but is
        subscriptable."""
        try:
            return getattr(object, req['attribute'])
        except AttributeError:
            if hasattr(object, "__getitem__"):
                return object[req['attribute']]
            raise

    def handle_setattr(self, object, req):
        """Set req['attribute'] on object to req['value'], falling back to
        item assignment when the attribute cannot be set."""
        try:
            return setattr(object, req['attribute'], req['value'])
        except AttributeError:
            if hasattr(object, "__setitem__"):
                return object.__setitem__(req['attribute'], req['value'])
            raise

    def handle_getitem(self, object, req):
        """@return object[req['key']]."""
        return object[req['key']]

    def handle_setitem(self, object, req):
        """Store req['value'] under req['key'] on object."""
        object[req['key']] = req['value']
        return None

    def handle_eq(self, object, req):
        """@return Whether object equals the object registered under the
        id req['rhs']; False when no object has that id."""
        try:
            rhs = self._objects[req['rhs']]
        except KeyError:
            return False
        return (object == rhs)

    def handle_call(self, object, req):
        """Call the callable named req['name'] on object with req['args']
        and req['kwargs'].

        The callable is looked up as an attribute first and as an item
        second, the same way handle_getattr resolves names.
        @return Whatever the callable returns."""
        try:
            fn = getattr(object, req['name'])
        except AttributeError:
            # The fallback reads an item, so the capability to test for is
            # __getitem__; testing __setitem__ rejected read-only mappings.
            if hasattr(object, "__getitem__"):
                fn = object[req['name']]
            else:
                raise

        return fn(*req['args'],**req['kwargs'])

    def handle_del(self, object, req):
        """Drop the server's reference to the object with id req['id']."""
        id = req['id']
        _log("del %s from %s" % (id, self._objects))
        del self._objects[id]
        return None

    def handle_type(self, object, req):
        """@return The type of object."""
        return type(object)

    def loop(self):
        """@brief Loop forever and respond to all requests.

        The process exits with status 0 once the input stream ends. Any
        other exception raised while serving a request is sent back to the
        client as an 'exception' response and the loop carries on."""
        _log("Server::loop")
        while True:
            try:
                try:
                    hunk = _read_lp_hunk(self._in)
                except (EOFError, struct.error):
                    # struct.error means the length prefix could not be
                    # unpacked, which is how a closed stream shows up when
                    # read() returns too few bytes. Both cases are EOF.
                    sys.exit(0)
                # NOTE: unpickling can execute arbitrary code; the peer on
                # the input stream must be trusted.
                request = cPickle.loads(hunk)
                _log("request: %s (%s)" % (request, self._objects))

                # Work out which object the request is aimed at. A missing,
                # empty or unknown id falls back to the exported object.
                target_id = None
                target = None
                try:
                    target_id = request['id']
                    if target_id:
                        target_id = int(target_id)
                        target = self._objects[target_id]
                except Exception:
                    # Deliberately best effort; see the fallback below.
                    pass
                if target is None or target_id is None:
                    target_id = None
                    target = self._export

                action = request.action()
                try:
                    handler = getattr(self, 'handle_%s' % action)
                except AttributeError:
                    raise BadRequest(action)

                response = handler(target, request)

                if action in ['status', 'type']:
                    # These are always answered by value, whatever the
                    # type of the response.
                    self.respond(['value', response])
                elif callable(response):
                    self.respond(['callable'])
                elif self.is_value(response):
                    self.respond(['value', response])
                else:
                    # Anything else stays on the server and is handed out
                    # by reference under a fresh id.
                    self._objects[self._next_id] = response
                    self.respond(['object', self._next_id])
                    self._next_id += 1
            except SystemExit:
                # Listed first so that exiting is never reported to the
                # client as an error, even where SystemExit is an Exception.
                raise
            except Exception:
                self.write_exception(sys.exc_info()[1])
            except:
                # Whatever is not an Exception instance, such as a raised
                # string, is reported by its type slot from exc_info.
                self.write_exception(sys.exc_info()[0])

    def is_value(self, value):
        """\
        @brief Test if value should be serialized as a simple dataset.
        @param value The value to test.
        @return Returns true if value is a simple serializeable set of data.
        """
        return type(value) in (str,unicode,int,float,long,bool,type(None))

    def respond(self, body):
        """Pickle body and write it to the output stream as one
        length-prefixed hunk."""
        _log("responding with: %s" % body)
        s = cPickle.dumps(body)
        _log(repr(s))
        _write_lp_hunk(self._out, s)

    def write_exception(self, e):
        """@brief Helper method to respond with an exception."""
        self.respond(['exception', e])
        if _g_debug_mode:
            _log("traceback: %s" % traceback.format_tb(sys.exc_info()[2]))
00588
00589
00590
def raise_a_weird_error():
    """Raise a plain string instead of an exception instance.

    Interpreters that no longer accept string exceptions raise TypeError
    at the raise statement instead."""
    weird = "oh noes you can raise a string"
    raise weird
00593
00594
def raise_an_unpicklable_error():
    """Raise an instance of an exception class that only exists inside
    this function, presumably so that a peer cannot unpickle it."""
    class Unpicklable(Exception):
        pass
    error = Unpicklable()
    raise error
00599
00600
def raise_standard_error():
    """Raise a FloatingPointError constructed without arguments."""
    error = FloatingPointError()
    raise error
00603
00604
def print_string(str):
    """@brief Print str to standard output."""
    print(str)
00607
00608
00609
def err_string(str):
    """@brief Print str to sys.stderr."""
    print >>sys.stderr, str
00612
def main():
    """
    @brief Command line entry point: build a Server on stdin/stdout and
    serve requests until the input ends.

    With --module the named module is exported; with --child an empty map
    is exported, to be filled by the client through setattr. One of the two
    is required. --logfile names a file that log messages are appended to.
    """
    global _g_logfile
    import optparse
    parser = optparse.OptionParser(
        usage="usage: %prog [options]",
        description="Simple saranwrap.Server wrapper")
    parser.add_option(
        '-c', '--child', default=False, action='store_true',
        help='Wrap an object serialed via setattr.')
    parser.add_option(
        '-m', '--module', type='string', dest='module', default=None,
        help='a module to load and export.')
    parser.add_option(
        '-l', '--logfile', type='string', dest='logfile', default=None,
        help='file to log to.')
    options, args = parser.parse_args()
    if not (options.module or options.child):
        # Without a mode there is nothing to export and no server to run.
        # Previously this ended in a NameError on the unbound server.
        parser.error("one of --module or --child is required")

    if options.logfile:
        _g_logfile = open(options.logfile, 'a')
    if options.module:
        export = api.named(options.module)
        server = Server(sys.stdin, sys.stdout, export)
    else:
        server = Server(sys.stdin, sys.stdout, {})

    # The server keeps the real stdin/stdout for the protocol. Everything
    # else that writes to stdout or stderr from here on is discarded so it
    # cannot corrupt the stream of responses.
    class NullSTDOut(object):
        def write(self, data):
            pass
        def flush(self):
            pass
    sys.stderr = NullSTDOut()
    sys.stdout = NullSTDOut()

    try:
        server.loop()
    finally:
        # loop() leaves through SystemExit at end of input, so the log file
        # has to be closed on the way out rather than after the call.
        if _g_logfile:
            _g_logfile.close()


if __name__ == "__main__":
    main()