################################################## # SPYCE - Python-based HTML Scripting # Copyright (c) 2002 Rimon Barr. # # Refer to spyce.py ################################################## import os.path, sys, re, string, imp, pydoc, time, shutil from cStringIO import StringIO __doc__ = '''Spyce utility functions''' ################################################## # Misc module_mtimes = {} # allows passing config module directly so you can avoid # starting a server instance if you like. spyceWWW does this. def scan_modules(): modified = [] for name, module in sys.modules.items(): try: filename = os.path.realpath(module.__file__) except AttributeError: continue try: mtime = os.path.getmtime(filename) except OSError: continue if filename.endswith('.pyc') or filename.endswith('.pyo'): pyfile = filename[:-1] if os.path.exists(pyfile): mtime = max(mtime, os.path.getmtime(pyfile)) try: cachedtime = module_mtimes[filename] except: cachedtime = mtime if cachedtime < mtime: modified.append(name) import spyce spyce.DEBUG('found changed python module %s from %s' % (name, filename)) module_mtimes[filename] = mtime return modified def tryForAwhile(callable, tries=10, pause=1): for i in range(tries): try: return callable() except: time.sleep(pause) return None def doc(thing, forceload=0): """Display text documentation, given an object or a path to an object.""" s = "" object, name = pydoc.resolve(thing, forceload) desc = pydoc.describe(object) module = pydoc.inspect.getmodule(object) if name and '.' in name: desc += ' in ' + name[:name.rfind('.')] elif module and module is not object: desc += ' in module ' + module.__name__ s += pydoc.html.document(object, name) return s class attrdict(dict): def __getattr__(self, name): return self[name] def __setattr__(self, name, value): self[name] = value def isTagCollection(f): while True: line = f.readline() if not line: return False if re.search(r'\S', line): break f.seek(0) return re.match(r'\s*\[\[\.\s*tagcollection', line, re.I) def argsForSpawn(args): if sys.platform == 'win32': # spawn doesn't escape spaces on win32 args = ['"' + re.sub('"', r'\"', i) + '"' for i in args] return args def url2file(url, relativeto=None): """ Returns the file corresponding to url. If url is relative to the current file (i.e., url does not start with "/"), Spyce needs to know the current file to give the correct answer. """ if url.startswith('/'): L = url[1:].split('/') import spyce base = spyce.getServer().config.root else: if not relativeto: raise Exception('unable to determine relative path inside anonymous Spyce') L = url.split('/') base = os.path.dirname(relativeto) return os.path.realpath(os.path.join(base, *L)) ################################################## # Current exception string def exceptionString(): "Generate string out of current exception." # (basically the equivalent of the python 2.4 traceback.format_exc) import traceback, string info = traceback.format_exception(*sys.exc_info()) return string.join(info, '') ################################################## # Return hashtable value, or entire hashtable def extractValue(hash, key, default=None): """Extract value from dictionary, if it exists. If key is none, return entire dictionary""" if key is None: if isinstance(hash, dict): return hash else: return dict(hash) # can't use hash.get; some half-assed classes, like the one mod_python # passes as the env, don't support it if hash.has_key(key): return hash[key] return default ################################################## # Return hashtable value, or entire hashtable RE_SPACE_REDUCE = re.compile('[ \t][ \t]+') RE_SPACE_NEWLINE_REDUCE = re.compile('\n\s+') def spaceCompact(text): text = string.split(text, '\n') text = map(lambda s: RE_SPACE_REDUCE.sub(' ', s), text) text = string.join(text, '\n') text = RE_SPACE_NEWLINE_REDUCE.sub('\n', text) return text ################################################## # Threading helpers class ThreadedWriter: '''Thread-safe writer''' def __init__(self, o=None): import threading self.__dict__['_currentThread'] = threading.currentThread self.__dict__['_o'] = o def setObject(self, o=None): self._currentThread().threadOut = o self._currentThread().threadWrite = o.write def getObject(self): try: return self._currentThread().threadOut except AttributeError: return self._o def clearObject(self): try: del self._currentThread().threadOut except AttributeError: pass def write(self, s): try: self._currentThread().threadWrite(s) except AttributeError: self._o.write(s) def close(self): self.getObject().close() def flush(self): self.getObject().flush() def __getattr__(self, name): if name=='softspace': # performance return self.getObject().softspace return eval('self.getObject().%s'%name) def __setattr__(self, name, value): if name=='softspace': # performance self.getObject().softspace = value eval('self.getObject().%s=value'%name) def __delattr__(self, name): return eval('del self.getObject().%s'%name) ################################################## # Output class BufferedOutput: "Buffered output stream." def __init__(self, out): self.buf = StringIO() self.writeBuf = self.buf.write self.out = out self.closed = 0 def write(self, s): if self.closed: raise 'output stream closed' self.writeBuf(s) def clear(self): if not self.buf: raise 'stream is not buffered' self.buf = StringIO() self.writeBuf = self.buf.write def flush(self, stopFlag=0): if stopFlag: return if self.buf and self.buf.getvalue(): self.out.write(self.buf.getvalue()) self.out.flush() self.clear() def close(self): if self.closed: raise 'output stream closed' self.closed = 1 self.flush() self.out.close() def unbuffer(self): "Turn this into a pass-through." if self.buf: self.flush() self.buf = None self.writeBuf = self.out.write def getOut(self): "Return underlying output stream." return self.out class NoCloseOut: def __init__(self, out): self.out = out self.write = self.out.write self.flush = self.out.flush def close(self): pass def getOut(self): return self.out def panicOutput(response, s): import cgi # output to browser, if possible try: response.clear() except: pass try: response.write('
\n')
    response.write('Spyce Panic!
\n') response.write(cgi.escape(s)) response.write('
\n') response.returncode = response.RETURN_OK response.flush() except: pass # output to error log sys.stderr.write(s) sys.stderr.flush() sys.exit(1)