Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/websdk/mercurial/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'websdk/mercurial/util.py')
-rw-r--r--[l---------]websdk/mercurial/util.py1742
1 files changed, 1741 insertions, 1 deletions
diff --git a/websdk/mercurial/util.py b/websdk/mercurial/util.py
index e86996a..7366614 120000..100644
--- a/websdk/mercurial/util.py
+++ b/websdk/mercurial/util.py
@@ -1 +1,1741 @@
-/usr/share/pyshared/mercurial/util.py \ No newline at end of file
+# util.py - Mercurial utility functions and platform specfic implementations
+#
+# Copyright 2005 K. Thananchayan <thananck@yahoo.com>
+# Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
+# Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+"""Mercurial utility functions and platform specfic implementations.
+
+This contains helper routines that are independent of the SCM core and
+hide platform-specific details from the core.
+"""
+
+from i18n import _
+import error, osutil, encoding
+import errno, re, shutil, sys, tempfile, traceback
+import os, time, datetime, calendar, textwrap, signal
+import imp, socket, urllib
+
+if os.name == 'nt':
+ import windows as platform
+else:
+ import posix as platform
+
+cachestat = platform.cachestat
+checkexec = platform.checkexec
+checklink = platform.checklink
+copymode = platform.copymode
+executablepath = platform.executablepath
+expandglobs = platform.expandglobs
+explainexit = platform.explainexit
+findexe = platform.findexe
+gethgcmd = platform.gethgcmd
+getuser = platform.getuser
+groupmembers = platform.groupmembers
+groupname = platform.groupname
+hidewindow = platform.hidewindow
+isexec = platform.isexec
+isowner = platform.isowner
+localpath = platform.localpath
+lookupreg = platform.lookupreg
+makedir = platform.makedir
+nlinks = platform.nlinks
+normpath = platform.normpath
+normcase = platform.normcase
+nulldev = platform.nulldev
+openhardlinks = platform.openhardlinks
+oslink = platform.oslink
+parsepatchoutput = platform.parsepatchoutput
+pconvert = platform.pconvert
+popen = platform.popen
+posixfile = platform.posixfile
+quotecommand = platform.quotecommand
+realpath = platform.realpath
+rename = platform.rename
+samedevice = platform.samedevice
+samefile = platform.samefile
+samestat = platform.samestat
+setbinary = platform.setbinary
+setflags = platform.setflags
+setsignalhandler = platform.setsignalhandler
+shellquote = platform.shellquote
+spawndetached = platform.spawndetached
+sshargs = platform.sshargs
+statfiles = platform.statfiles
+termwidth = platform.termwidth
+testpid = platform.testpid
+umask = platform.umask
+unlink = platform.unlink
+unlinkpath = platform.unlinkpath
+username = platform.username
+
+# Python compatibility
+
+def sha1(s=''):
+ '''
+ Low-overhead wrapper around Python's SHA support
+
+ >>> f = _fastsha1
+ >>> a = sha1()
+ >>> a = f()
+ >>> a.hexdigest()
+ 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
+ '''
+
+ return _fastsha1(s)
+
+_notset = object()
+def safehasattr(thing, attr):
+ return getattr(thing, attr, _notset) is not _notset
+
+def _fastsha1(s=''):
+ # This function will import sha1 from hashlib or sha (whichever is
+ # available) and overwrite itself with it on the first call.
+ # Subsequent calls will go directly to the imported function.
+ if sys.version_info >= (2, 5):
+ from hashlib import sha1 as _sha1
+ else:
+ from sha import sha as _sha1
+ global _fastsha1, sha1
+ _fastsha1 = sha1 = _sha1
+ return _sha1(s)
+
+import __builtin__
+
+if sys.version_info[0] < 3:
+ def fakebuffer(sliceable, offset=0):
+ return sliceable[offset:]
+else:
+ def fakebuffer(sliceable, offset=0):
+ return memoryview(sliceable)[offset:]
+try:
+ buffer
+except NameError:
+ __builtin__.buffer = fakebuffer
+
+import subprocess
+closefds = os.name == 'posix'
+
+def popen2(cmd, env=None, newlines=False):
+ # Setting bufsize to -1 lets the system decide the buffer size.
+ # The default for bufsize is 0, meaning unbuffered. This leads to
+ # poor performance on Mac OS X: http://bugs.python.org/issue4194
+ p = subprocess.Popen(cmd, shell=True, bufsize=-1,
+ close_fds=closefds,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ universal_newlines=newlines,
+ env=env)
+ return p.stdin, p.stdout
+
+def popen3(cmd, env=None, newlines=False):
+ p = subprocess.Popen(cmd, shell=True, bufsize=-1,
+ close_fds=closefds,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=newlines,
+ env=env)
+ return p.stdin, p.stdout, p.stderr
+
+def version():
+ """Return version information if available."""
+ try:
+ import __version__
+ return __version__.version
+ except ImportError:
+ return 'unknown'
+
+# used by parsedate
+defaultdateformats = (
+ '%Y-%m-%d %H:%M:%S',
+ '%Y-%m-%d %I:%M:%S%p',
+ '%Y-%m-%d %H:%M',
+ '%Y-%m-%d %I:%M%p',
+ '%Y-%m-%d',
+ '%m-%d',
+ '%m/%d',
+ '%m/%d/%y',
+ '%m/%d/%Y',
+ '%a %b %d %H:%M:%S %Y',
+ '%a %b %d %I:%M:%S%p %Y',
+ '%a, %d %b %Y %H:%M:%S', # GNU coreutils "/bin/date --rfc-2822"
+ '%b %d %H:%M:%S %Y',
+ '%b %d %I:%M:%S%p %Y',
+ '%b %d %H:%M:%S',
+ '%b %d %I:%M:%S%p',
+ '%b %d %H:%M',
+ '%b %d %I:%M%p',
+ '%b %d %Y',
+ '%b %d',
+ '%H:%M:%S',
+ '%I:%M:%S%p',
+ '%H:%M',
+ '%I:%M%p',
+)
+
+extendeddateformats = defaultdateformats + (
+ "%Y",
+ "%Y-%m",
+ "%b",
+ "%b %Y",
+ )
+
+def cachefunc(func):
+ '''cache the result of function calls'''
+ # XXX doesn't handle keywords args
+ cache = {}
+ if func.func_code.co_argcount == 1:
+ # we gain a small amount of time because
+ # we don't need to pack/unpack the list
+ def f(arg):
+ if arg not in cache:
+ cache[arg] = func(arg)
+ return cache[arg]
+ else:
+ def f(*args):
+ if args not in cache:
+ cache[args] = func(*args)
+ return cache[args]
+
+ return f
+
+def lrucachefunc(func):
+ '''cache most recent results of function calls'''
+ cache = {}
+ order = []
+ if func.func_code.co_argcount == 1:
+ def f(arg):
+ if arg not in cache:
+ if len(cache) > 20:
+ del cache[order.pop(0)]
+ cache[arg] = func(arg)
+ else:
+ order.remove(arg)
+ order.append(arg)
+ return cache[arg]
+ else:
+ def f(*args):
+ if args not in cache:
+ if len(cache) > 20:
+ del cache[order.pop(0)]
+ cache[args] = func(*args)
+ else:
+ order.remove(args)
+ order.append(args)
+ return cache[args]
+
+ return f
+
+class propertycache(object):
+ def __init__(self, func):
+ self.func = func
+ self.name = func.__name__
+ def __get__(self, obj, type=None):
+ result = self.func(obj)
+ setattr(obj, self.name, result)
+ return result
+
+def pipefilter(s, cmd):
+ '''filter string S through command CMD, returning its output'''
+ p = subprocess.Popen(cmd, shell=True, close_fds=closefds,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ pout, perr = p.communicate(s)
+ return pout
+
+def tempfilter(s, cmd):
+ '''filter string S through a pair of temporary files with CMD.
+ CMD is used as a template to create the real command to be run,
+ with the strings INFILE and OUTFILE replaced by the real names of
+ the temporary files generated.'''
+ inname, outname = None, None
+ try:
+ infd, inname = tempfile.mkstemp(prefix='hg-filter-in-')
+ fp = os.fdopen(infd, 'wb')
+ fp.write(s)
+ fp.close()
+ outfd, outname = tempfile.mkstemp(prefix='hg-filter-out-')
+ os.close(outfd)
+ cmd = cmd.replace('INFILE', inname)
+ cmd = cmd.replace('OUTFILE', outname)
+ code = os.system(cmd)
+ if sys.platform == 'OpenVMS' and code & 1:
+ code = 0
+ if code:
+ raise Abort(_("command '%s' failed: %s") %
+ (cmd, explainexit(code)))
+ fp = open(outname, 'rb')
+ r = fp.read()
+ fp.close()
+ return r
+ finally:
+ try:
+ if inname:
+ os.unlink(inname)
+ except OSError:
+ pass
+ try:
+ if outname:
+ os.unlink(outname)
+ except OSError:
+ pass
+
+filtertable = {
+ 'tempfile:': tempfilter,
+ 'pipe:': pipefilter,
+ }
+
+def filter(s, cmd):
+ "filter a string through a command that transforms its input to its output"
+ for name, fn in filtertable.iteritems():
+ if cmd.startswith(name):
+ return fn(s, cmd[len(name):].lstrip())
+ return pipefilter(s, cmd)
+
+def binary(s):
+ """return true if a string is binary data"""
+ return bool(s and '\0' in s)
+
+def increasingchunks(source, min=1024, max=65536):
+ '''return no less than min bytes per chunk while data remains,
+ doubling min after each chunk until it reaches max'''
+ def log2(x):
+ if not x:
+ return 0
+ i = 0
+ while x:
+ x >>= 1
+ i += 1
+ return i - 1
+
+ buf = []
+ blen = 0
+ for chunk in source:
+ buf.append(chunk)
+ blen += len(chunk)
+ if blen >= min:
+ if min < max:
+ min = min << 1
+ nmin = 1 << log2(blen)
+ if nmin > min:
+ min = nmin
+ if min > max:
+ min = max
+ yield ''.join(buf)
+ blen = 0
+ buf = []
+ if buf:
+ yield ''.join(buf)
+
+Abort = error.Abort
+
+def always(fn):
+ return True
+
+def never(fn):
+ return False
+
+def pathto(root, n1, n2):
+ '''return the relative path from one place to another.
+ root should use os.sep to separate directories
+ n1 should use os.sep to separate directories
+ n2 should use "/" to separate directories
+ returns an os.sep-separated path.
+
+ If n1 is a relative path, it's assumed it's
+ relative to root.
+ n2 should always be relative to root.
+ '''
+ if not n1:
+ return localpath(n2)
+ if os.path.isabs(n1):
+ if os.path.splitdrive(root)[0] != os.path.splitdrive(n1)[0]:
+ return os.path.join(root, localpath(n2))
+ n2 = '/'.join((pconvert(root), n2))
+ a, b = splitpath(n1), n2.split('/')
+ a.reverse()
+ b.reverse()
+ while a and b and a[-1] == b[-1]:
+ a.pop()
+ b.pop()
+ b.reverse()
+ return os.sep.join((['..'] * len(a)) + b) or '.'
+
+_hgexecutable = None
+
+def mainfrozen():
+ """return True if we are a frozen executable.
+
+ The code supports py2exe (most common, Windows only) and tools/freeze
+ (portable, not much used).
+ """
+ return (safehasattr(sys, "frozen") or # new py2exe
+ safehasattr(sys, "importers") or # old py2exe
+ imp.is_frozen("__main__")) # tools/freeze
+
+def hgexecutable():
+ """return location of the 'hg' executable.
+
+ Defaults to $HG or 'hg' in the search path.
+ """
+ if _hgexecutable is None:
+ hg = os.environ.get('HG')
+ mainmod = sys.modules['__main__']
+ if hg:
+ _sethgexecutable(hg)
+ elif mainfrozen():
+ _sethgexecutable(sys.executable)
+ elif os.path.basename(getattr(mainmod, '__file__', '')) == 'hg':
+ _sethgexecutable(mainmod.__file__)
+ else:
+ exe = findexe('hg') or os.path.basename(sys.argv[0])
+ _sethgexecutable(exe)
+ return _hgexecutable
+
+def _sethgexecutable(path):
+ """set location of the 'hg' executable"""
+ global _hgexecutable
+ _hgexecutable = path
+
+def system(cmd, environ={}, cwd=None, onerr=None, errprefix=None, out=None):
+ '''enhanced shell command execution.
+ run with environment maybe modified, maybe in different dir.
+
+ if command fails and onerr is None, return status. if ui object,
+ print error message and return status, else raise onerr object as
+ exception.
+
+ if out is specified, it is assumed to be a file-like object that has a
+ write() method. stdout and stderr will be redirected to out.'''
+ try:
+ sys.stdout.flush()
+ except Exception:
+ pass
+ def py2shell(val):
+ 'convert python object into string that is useful to shell'
+ if val is None or val is False:
+ return '0'
+ if val is True:
+ return '1'
+ return str(val)
+ origcmd = cmd
+ cmd = quotecommand(cmd)
+ env = dict(os.environ)
+ env.update((k, py2shell(v)) for k, v in environ.iteritems())
+ env['HG'] = hgexecutable()
+ if out is None or out == sys.__stdout__:
+ rc = subprocess.call(cmd, shell=True, close_fds=closefds,
+ env=env, cwd=cwd)
+ else:
+ proc = subprocess.Popen(cmd, shell=True, close_fds=closefds,
+ env=env, cwd=cwd, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ for line in proc.stdout:
+ out.write(line)
+ proc.wait()
+ rc = proc.returncode
+ if sys.platform == 'OpenVMS' and rc & 1:
+ rc = 0
+ if rc and onerr:
+ errmsg = '%s %s' % (os.path.basename(origcmd.split(None, 1)[0]),
+ explainexit(rc)[0])
+ if errprefix:
+ errmsg = '%s: %s' % (errprefix, errmsg)
+ try:
+ onerr.warn(errmsg + '\n')
+ except AttributeError:
+ raise onerr(errmsg)
+ return rc
+
+def checksignature(func):
+ '''wrap a function with code to check for calling errors'''
+ def check(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except TypeError:
+ if len(traceback.extract_tb(sys.exc_info()[2])) == 1:
+ raise error.SignatureError
+ raise
+
+ return check
+
+def copyfile(src, dest):
+ "copy a file, preserving mode and atime/mtime"
+ if os.path.islink(src):
+ try:
+ os.unlink(dest)
+ except OSError:
+ pass
+ os.symlink(os.readlink(src), dest)
+ else:
+ try:
+ shutil.copyfile(src, dest)
+ shutil.copymode(src, dest)
+ except shutil.Error, inst:
+ raise Abort(str(inst))
+
+def copyfiles(src, dst, hardlink=None):
+ """Copy a directory tree using hardlinks if possible"""
+
+ if hardlink is None:
+ hardlink = (os.stat(src).st_dev ==
+ os.stat(os.path.dirname(dst)).st_dev)
+
+ num = 0
+ if os.path.isdir(src):
+ os.mkdir(dst)
+ for name, kind in osutil.listdir(src):
+ srcname = os.path.join(src, name)
+ dstname = os.path.join(dst, name)
+ hardlink, n = copyfiles(srcname, dstname, hardlink)
+ num += n
+ else:
+ if hardlink:
+ try:
+ oslink(src, dst)
+ except (IOError, OSError):
+ hardlink = False
+ shutil.copy(src, dst)
+ else:
+ shutil.copy(src, dst)
+ num += 1
+
+ return hardlink, num
+
+_winreservednames = '''con prn aux nul
+ com1 com2 com3 com4 com5 com6 com7 com8 com9
+ lpt1 lpt2 lpt3 lpt4 lpt5 lpt6 lpt7 lpt8 lpt9'''.split()
+_winreservedchars = ':*?"<>|'
+def checkwinfilename(path):
+ '''Check that the base-relative path is a valid filename on Windows.
+ Returns None if the path is ok, or a UI string describing the problem.
+
+ >>> checkwinfilename("just/a/normal/path")
+ >>> checkwinfilename("foo/bar/con.xml")
+ "filename contains 'con', which is reserved on Windows"
+ >>> checkwinfilename("foo/con.xml/bar")
+ "filename contains 'con', which is reserved on Windows"
+ >>> checkwinfilename("foo/bar/xml.con")
+ >>> checkwinfilename("foo/bar/AUX/bla.txt")
+ "filename contains 'AUX', which is reserved on Windows"
+ >>> checkwinfilename("foo/bar/bla:.txt")
+ "filename contains ':', which is reserved on Windows"
+ >>> checkwinfilename("foo/bar/b\07la.txt")
+ "filename contains '\\\\x07', which is invalid on Windows"
+ >>> checkwinfilename("foo/bar/bla ")
+ "filename ends with ' ', which is not allowed on Windows"
+ >>> checkwinfilename("../bar")
+ '''
+ for n in path.replace('\\', '/').split('/'):
+ if not n:
+ continue
+ for c in n:
+ if c in _winreservedchars:
+ return _("filename contains '%s', which is reserved "
+ "on Windows") % c
+ if ord(c) <= 31:
+ return _("filename contains %r, which is invalid "
+ "on Windows") % c
+ base = n.split('.')[0]
+ if base and base.lower() in _winreservednames:
+ return _("filename contains '%s', which is reserved "
+ "on Windows") % base
+ t = n[-1]
+ if t in '. ' and n not in '..':
+ return _("filename ends with '%s', which is not allowed "
+ "on Windows") % t
+
+if os.name == 'nt':
+ checkosfilename = checkwinfilename
+else:
+ checkosfilename = platform.checkosfilename
+
+def makelock(info, pathname):
+ try:
+ return os.symlink(info, pathname)
+ except OSError, why:
+ if why.errno == errno.EEXIST:
+ raise
+ except AttributeError: # no symlink in os
+ pass
+
+ ld = os.open(pathname, os.O_CREAT | os.O_WRONLY | os.O_EXCL)
+ os.write(ld, info)
+ os.close(ld)
+
+def readlock(pathname):
+ try:
+ return os.readlink(pathname)
+ except OSError, why:
+ if why.errno not in (errno.EINVAL, errno.ENOSYS):
+ raise
+ except AttributeError: # no symlink in os
+ pass
+ fp = posixfile(pathname)
+ r = fp.read()
+ fp.close()
+ return r
+
+def fstat(fp):
+ '''stat file object that may not have fileno method.'''
+ try:
+ return os.fstat(fp.fileno())
+ except AttributeError:
+ return os.stat(fp.name)
+
+# File system features
+
+def checkcase(path):
+ """
+ Check whether the given path is on a case-sensitive filesystem
+
+ Requires a path (like /foo/.hg) ending with a foldable final
+ directory component.
+ """
+ s1 = os.stat(path)
+ d, b = os.path.split(path)
+ p2 = os.path.join(d, b.upper())
+ if path == p2:
+ p2 = os.path.join(d, b.lower())
+ try:
+ s2 = os.stat(p2)
+ if s2 == s1:
+ return False
+ return True
+ except OSError:
+ return True
+
+_fspathcache = {}
+def fspath(name, root):
+ '''Get name in the case stored in the filesystem
+
+ The name is either relative to root, or it is an absolute path starting
+ with root. Note that this function is unnecessary, and should not be
+ called, for case-sensitive filesystems (simply because it's expensive).
+ '''
+ # If name is absolute, make it relative
+ if name.lower().startswith(root.lower()):
+ l = len(root)
+ if name[l] == os.sep or name[l] == os.altsep:
+ l = l + 1
+ name = name[l:]
+
+ if not os.path.lexists(os.path.join(root, name)):
+ return None
+
+ seps = os.sep
+ if os.altsep:
+ seps = seps + os.altsep
+ # Protect backslashes. This gets silly very quickly.
+ seps.replace('\\','\\\\')
+ pattern = re.compile(r'([^%s]+)|([%s]+)' % (seps, seps))
+ dir = os.path.normcase(os.path.normpath(root))
+ result = []
+ for part, sep in pattern.findall(name):
+ if sep:
+ result.append(sep)
+ continue
+
+ if dir not in _fspathcache:
+ _fspathcache[dir] = os.listdir(dir)
+ contents = _fspathcache[dir]
+
+ lpart = part.lower()
+ lenp = len(part)
+ for n in contents:
+ if lenp == len(n) and n.lower() == lpart:
+ result.append(n)
+ break
+ else:
+ # Cannot happen, as the file exists!
+ result.append(part)
+ dir = os.path.join(dir, lpart)
+
+ return ''.join(result)
+
+def checknlink(testfile):
+ '''check whether hardlink count reporting works properly'''
+
+ # testfile may be open, so we need a separate file for checking to
+ # work around issue2543 (or testfile may get lost on Samba shares)
+ f1 = testfile + ".hgtmp1"
+ if os.path.lexists(f1):
+ return False
+ try:
+ posixfile(f1, 'w').close()
+ except IOError:
+ return False
+
+ f2 = testfile + ".hgtmp2"
+ fd = None
+ try:
+ try:
+ oslink(f1, f2)
+ except OSError:
+ return False
+
+ # nlinks() may behave differently for files on Windows shares if
+ # the file is open.
+ fd = posixfile(f2)
+ return nlinks(f2) > 1
+ finally:
+ if fd is not None:
+ fd.close()
+ for f in (f1, f2):
+ try:
+ os.unlink(f)
+ except OSError:
+ pass
+
+ return False
+
+def endswithsep(path):
+ '''Check path ends with os.sep or os.altsep.'''
+ return path.endswith(os.sep) or os.altsep and path.endswith(os.altsep)
+
+def splitpath(path):
+ '''Split path by os.sep.
+ Note that this function does not use os.altsep because this is
+ an alternative of simple "xxx.split(os.sep)".
+ It is recommended to use os.path.normpath() before using this
+ function if need.'''
+ return path.split(os.sep)
+
+def gui():
+ '''Are we running in a GUI?'''
+ if sys.platform == 'darwin':
+ if 'SSH_CONNECTION' in os.environ:
+ # handle SSH access to a box where the user is logged in
+ return False
+ elif getattr(osutil, 'isgui', None):
+ # check if a CoreGraphics session is available
+ return osutil.isgui()
+ else:
+ # pure build; use a safe default
+ return True
+ else:
+ return os.name == "nt" or os.environ.get("DISPLAY")
+
+def mktempcopy(name, emptyok=False, createmode=None):
+ """Create a temporary file with the same contents from name
+
+ The permission bits are copied from the original file.
+
+ If the temporary file is going to be truncated immediately, you
+ can use emptyok=True as an optimization.
+
+ Returns the name of the temporary file.
+ """
+ d, fn = os.path.split(name)
+ fd, temp = tempfile.mkstemp(prefix='.%s-' % fn, dir=d)
+ os.close(fd)
+ # Temporary files are created with mode 0600, which is usually not
+ # what we want. If the original file already exists, just copy
+ # its mode. Otherwise, manually obey umask.
+ copymode(name, temp, createmode)
+ if emptyok:
+ return temp
+ try:
+ try:
+ ifp = posixfile(name, "rb")
+ except IOError, inst:
+ if inst.errno == errno.ENOENT:
+ return temp
+ if not getattr(inst, 'filename', None):
+ inst.filename = name
+ raise
+ ofp = posixfile(temp, "wb")
+ for chunk in filechunkiter(ifp):
+ ofp.write(chunk)
+ ifp.close()
+ ofp.close()
+ except:
+ try: os.unlink(temp)
+ except: pass
+ raise
+ return temp
+
+class atomictempfile(object):
+ '''writeable file object that atomically updates a file
+
+ All writes will go to a temporary copy of the original file. Call
+ close() when you are done writing, and atomictempfile will rename
+ the temporary copy to the original name, making the changes
+ visible. If the object is destroyed without being closed, all your
+ writes are discarded.
+ '''
+ def __init__(self, name, mode='w+b', createmode=None):
+ self.__name = name # permanent name
+ self._tempname = mktempcopy(name, emptyok=('w' in mode),
+ createmode=createmode)
+ self._fp = posixfile(self._tempname, mode)
+
+ # delegated methods
+ self.write = self._fp.write
+ self.fileno = self._fp.fileno
+
+ def close(self):
+ if not self._fp.closed:
+ self._fp.close()
+ rename(self._tempname, localpath(self.__name))
+
+ def discard(self):
+ if not self._fp.closed:
+ try:
+ os.unlink(self._tempname)
+ except OSError:
+ pass
+ self._fp.close()
+
+ def __del__(self):
+ if safehasattr(self, '_fp'): # constructor actually did something
+ self.discard()
+
+def makedirs(name, mode=None):
+ """recursive directory creation with parent mode inheritance"""
+ try:
+ os.mkdir(name)
+ except OSError, err:
+ if err.errno == errno.EEXIST:
+ return
+ if err.errno != errno.ENOENT or not name:
+ raise
+ parent = os.path.dirname(os.path.abspath(name))
+ if parent == name:
+ raise
+ makedirs(parent, mode)
+ os.mkdir(name)
+ if mode is not None:
+ os.chmod(name, mode)
+
+def readfile(path):
+ fp = open(path, 'rb')
+ try:
+ return fp.read()
+ finally:
+ fp.close()
+
+def writefile(path, text):
+ fp = open(path, 'wb')
+ try:
+ fp.write(text)
+ finally:
+ fp.close()
+
+def appendfile(path, text):
+ fp = open(path, 'ab')
+ try:
+ fp.write(text)
+ finally:
+ fp.close()
+
+class chunkbuffer(object):
+ """Allow arbitrary sized chunks of data to be efficiently read from an
+ iterator over chunks of arbitrary size."""
+
+ def __init__(self, in_iter):
+ """in_iter is the iterator that's iterating over the input chunks.
+ targetsize is how big a buffer to try to maintain."""
+ def splitbig(chunks):
+ for chunk in chunks:
+ if len(chunk) > 2**20:
+ pos = 0
+ while pos < len(chunk):
+ end = pos + 2 ** 18
+ yield chunk[pos:end]
+ pos = end
+ else:
+ yield chunk
+ self.iter = splitbig(in_iter)
+ self._queue = []
+
+ def read(self, l):
+ """Read L bytes of data from the iterator of chunks of data.
+ Returns less than L bytes if the iterator runs dry."""
+ left = l
+ buf = ''
+ queue = self._queue
+ while left > 0:
+ # refill the queue
+ if not queue:
+ target = 2**18
+ for chunk in self.iter:
+ queue.append(chunk)
+ target -= len(chunk)
+ if target <= 0:
+ break
+ if not queue:
+ break
+
+ chunk = queue.pop(0)
+ left -= len(chunk)
+ if left < 0:
+ queue.insert(0, chunk[left:])
+ buf += chunk[:left]
+ else:
+ buf += chunk
+
+ return buf
+
+def filechunkiter(f, size=65536, limit=None):
+ """Create a generator that produces the data in the file size
+ (default 65536) bytes at a time, up to optional limit (default is
+ to read all data). Chunks may be less than size bytes if the
+ chunk is the last chunk in the file, or the file is a socket or
+ some other type of file that sometimes reads less data than is
+ requested."""
+ assert size >= 0
+ assert limit is None or limit >= 0
+ while True:
+ if limit is None:
+ nbytes = size
+ else:
+ nbytes = min(limit, size)
+ s = nbytes and f.read(nbytes)
+ if not s:
+ break
+ if limit:
+ limit -= len(s)
+ yield s
+
+def makedate():
+ ct = time.time()
+ if ct < 0:
+ hint = _("check your clock")
+ raise Abort(_("negative timestamp: %d") % ct, hint=hint)
+ delta = (datetime.datetime.utcfromtimestamp(ct) -
+ datetime.datetime.fromtimestamp(ct))
+ tz = delta.days * 86400 + delta.seconds
+ return ct, tz
+
+def datestr(date=None, format='%a %b %d %H:%M:%S %Y %1%2'):
+ """represent a (unixtime, offset) tuple as a localized time.
+ unixtime is seconds since the epoch, and offset is the time zone's
+ number of seconds away from UTC. if timezone is false, do not
+ append time zone to string."""
+ t, tz = date or makedate()
+ if t < 0:
+ t = 0 # time.gmtime(lt) fails on Windows for lt < -43200
+ tz = 0
+ if "%1" in format or "%2" in format:
+ sign = (tz > 0) and "-" or "+"
+ minutes = abs(tz) // 60
+ format = format.replace("%1", "%c%02d" % (sign, minutes // 60))
+ format = format.replace("%2", "%02d" % (minutes % 60))
+ try:
+ t = time.gmtime(float(t) - tz)
+ except ValueError:
+ # time was out of range
+ t = time.gmtime(sys.maxint)
+ s = time.strftime(format, t)
+ return s
+
+def shortdate(date=None):
+ """turn (timestamp, tzoff) tuple into iso 8631 date."""
+ return datestr(date, format='%Y-%m-%d')
+
+def strdate(string, format, defaults=[]):
+ """parse a localized time string and return a (unixtime, offset) tuple.
+ if the string cannot be parsed, ValueError is raised."""
+ def timezone(string):
+ tz = string.split()[-1]
+ if tz[0] in "+-" and len(tz) == 5 and tz[1:].isdigit():
+ sign = (tz[0] == "+") and 1 or -1
+ hours = int(tz[1:3])
+ minutes = int(tz[3:5])
+ return -sign * (hours * 60 + minutes) * 60
+ if tz == "GMT" or tz == "UTC":
+ return 0
+ return None
+
+ # NOTE: unixtime = localunixtime + offset
+ offset, date = timezone(string), string
+ if offset is not None:
+ date = " ".join(string.split()[:-1])
+
+ # add missing elements from defaults
+ usenow = False # default to using biased defaults
+ for part in ("S", "M", "HI", "d", "mb", "yY"): # decreasing specificity
+ found = [True for p in part if ("%"+p) in format]
+ if not found:
+ date += "@" + defaults[part][usenow]
+ format += "@%" + part[0]
+ else:
+ # We've found a specific time element, less specific time
+ # elements are relative to today
+ usenow = True
+
+ timetuple = time.strptime(date, format)
+ localunixtime = int(calendar.timegm(timetuple))
+ if offset is None:
+ # local timezone
+ unixtime = int(time.mktime(timetuple))
+ offset = unixtime - localunixtime
+ else:
+ unixtime = localunixtime + offset
+ return unixtime, offset
+
+def parsedate(date, formats=None, bias={}):
+ """parse a localized date/time and return a (unixtime, offset) tuple.
+
+ The date may be a "unixtime offset" string or in one of the specified
+ formats. If the date already is a (unixtime, offset) tuple, it is returned.
+ """
+ if not date:
+ return 0, 0
+ if isinstance(date, tuple) and len(date) == 2:
+ return date
+ if not formats:
+ formats = defaultdateformats
+ date = date.strip()
+ try:
+ when, offset = map(int, date.split(' '))
+ except ValueError:
+ # fill out defaults
+ now = makedate()
+ defaults = {}
+ for part in ("d", "mb", "yY", "HI", "M", "S"):
+ # this piece is for rounding the specific end of unknowns
+ b = bias.get(part)
+ if b is None:
+ if part[0] in "HMS":
+ b = "00"
+ else:
+ b = "0"
+
+ # this piece is for matching the generic end to today's date
+ n = datestr(now, "%" + part[0])
+
+ defaults[part] = (b, n)
+
+ for format in formats:
+ try:
+ when, offset = strdate(date, format, defaults)
+ except (ValueError, OverflowError):
+ pass
+ else:
+ break
+ else:
+ raise Abort(_('invalid date: %r') % date)
+ # validate explicit (probably user-specified) date and
+ # time zone offset. values must fit in signed 32 bits for
+ # current 32-bit linux runtimes. timezones go from UTC-12
+ # to UTC+14
+ if abs(when) > 0x7fffffff:
+ raise Abort(_('date exceeds 32 bits: %d') % when)
+ if when < 0:
+ raise Abort(_('negative date value: %d') % when)
+ if offset < -50400 or offset > 43200:
+ raise Abort(_('impossible time zone offset: %d') % offset)
+ return when, offset
+
+def matchdate(date):
+ """Return a function that matches a given date match specifier
+
+ Formats include:
+
+ '{date}' match a given date to the accuracy provided
+
+ '<{date}' on or before a given date
+
+ '>{date}' on or after a given date
+
+ >>> p1 = parsedate("10:29:59")
+ >>> p2 = parsedate("10:30:00")
+ >>> p3 = parsedate("10:30:59")
+ >>> p4 = parsedate("10:31:00")
+ >>> p5 = parsedate("Sep 15 10:30:00 1999")
+ >>> f = matchdate("10:30")
+ >>> f(p1[0])
+ False
+ >>> f(p2[0])
+ True
+ >>> f(p3[0])
+ True
+ >>> f(p4[0])
+ False
+ >>> f(p5[0])
+ False
+ """
+
+ def lower(date):
+ d = dict(mb="1", d="1")
+ return parsedate(date, extendeddateformats, d)[0]
+
+ def upper(date):
+ d = dict(mb="12", HI="23", M="59", S="59")
+ for days in ("31", "30", "29"):
+ try:
+ d["d"] = days
+ return parsedate(date, extendeddateformats, d)[0]
+ except:
+ pass
+ d["d"] = "28"
+ return parsedate(date, extendeddateformats, d)[0]
+
+ date = date.strip()
+
+ if not date:
+ raise Abort(_("dates cannot consist entirely of whitespace"))
+ elif date[0] == "<":
+ if not date[1:]:
+ raise Abort(_("invalid day spec, use '<DATE'"))
+ when = upper(date[1:])
+ return lambda x: x <= when
+ elif date[0] == ">":
+ if not date[1:]:
+ raise Abort(_("invalid day spec, use '>DATE'"))
+ when = lower(date[1:])
+ return lambda x: x >= when
+ elif date[0] == "-":
+ try:
+ days = int(date[1:])
+ except ValueError:
+ raise Abort(_("invalid day spec: %s") % date[1:])
+ if days < 0:
+ raise Abort(_("%s must be nonnegative (see 'hg help dates')")
+ % date[1:])
+ when = makedate()[0] - days * 3600 * 24
+ return lambda x: x >= when
+ elif " to " in date:
+ a, b = date.split(" to ")
+ start, stop = lower(a), upper(b)
+ return lambda x: x >= start and x <= stop
+ else:
+ start, stop = lower(date), upper(date)
+ return lambda x: x >= start and x <= stop
+
+def shortuser(user):
+ """Return a short representation of a user name or email address."""
+ f = user.find('@')
+ if f >= 0:
+ user = user[:f]
+ f = user.find('<')
+ if f >= 0:
+ user = user[f + 1:]
+ f = user.find(' ')
+ if f >= 0:
+ user = user[:f]
+ f = user.find('.')
+ if f >= 0:
+ user = user[:f]
+ return user
+
+def email(author):
+ '''get email of author.'''
+ r = author.find('>')
+ if r == -1:
+ r = None
+ return author[author.find('<') + 1:r]
+
+def _ellipsis(text, maxlength):
+ if len(text) <= maxlength:
+ return text, False
+ else:
+ return "%s..." % (text[:maxlength - 3]), True
+
+def ellipsis(text, maxlength=400):
+ """Trim string to at most maxlength (default: 400) characters."""
+ try:
+ # use unicode not to split at intermediate multi-byte sequence
+ utext, truncated = _ellipsis(text.decode(encoding.encoding),
+ maxlength)
+ if not truncated:
+ return text
+ return utext.encode(encoding.encoding)
+ except (UnicodeDecodeError, UnicodeEncodeError):
+ return _ellipsis(text, maxlength)[0]
+
+def bytecount(nbytes):
+ '''return byte count formatted as readable string, with units'''
+
+ units = (
+ (100, 1 << 30, _('%.0f GB')),
+ (10, 1 << 30, _('%.1f GB')),
+ (1, 1 << 30, _('%.2f GB')),
+ (100, 1 << 20, _('%.0f MB')),
+ (10, 1 << 20, _('%.1f MB')),
+ (1, 1 << 20, _('%.2f MB')),
+ (100, 1 << 10, _('%.0f KB')),
+ (10, 1 << 10, _('%.1f KB')),
+ (1, 1 << 10, _('%.2f KB')),
+ (1, 1, _('%.0f bytes')),
+ )
+
+ for multiplier, divisor, format in units:
+ if nbytes >= divisor * multiplier:
+ return format % (nbytes / float(divisor))
+ return units[-1][2] % nbytes
+
+def uirepr(s):
+ # Avoid double backslash in Windows path repr()
+ return repr(s).replace('\\\\', '\\')
+
+# delay import of textwrap
+def MBTextWrapper(**kwargs):
+ class tw(textwrap.TextWrapper):
+ """
+ Extend TextWrapper for width-awareness.
+
+ Neither number of 'bytes' in any encoding nor 'characters' is
+ appropriate to calculate terminal columns for specified string.
+
+ Original TextWrapper implementation uses built-in 'len()' directly,
+ so overriding is needed to use width information of each characters.
+
+ In addition, characters classified into 'ambiguous' width are
+ treated as wide in east asian area, but as narrow in other.
+
+ This requires use decision to determine width of such characters.
+ """
+ def __init__(self, **kwargs):
+ textwrap.TextWrapper.__init__(self, **kwargs)
+
+ # for compatibility between 2.4 and 2.6
+ if getattr(self, 'drop_whitespace', None) is None:
+ self.drop_whitespace = kwargs.get('drop_whitespace', True)
+
+ def _cutdown(self, ucstr, space_left):
+ l = 0
+ colwidth = encoding.ucolwidth
+ for i in xrange(len(ucstr)):
+ l += colwidth(ucstr[i])
+ if space_left < l:
+ return (ucstr[:i], ucstr[i:])
+ return ucstr, ''
+
+ # overriding of base class
+ def _handle_long_word(self, reversed_chunks, cur_line, cur_len, width):
+ space_left = max(width - cur_len, 1)
+
+ if self.break_long_words:
+ cut, res = self._cutdown(reversed_chunks[-1], space_left)
+ cur_line.append(cut)
+ reversed_chunks[-1] = res
+ elif not cur_line:
+ cur_line.append(reversed_chunks.pop())
+
+ # this overriding code is imported from TextWrapper of python 2.6
+ # to calculate columns of string by 'encoding.ucolwidth()'
+ def _wrap_chunks(self, chunks):
+ colwidth = encoding.ucolwidth
+
+ lines = []
+ if self.width <= 0:
+ raise ValueError("invalid width %r (must be > 0)" % self.width)
+
+ # Arrange in reverse order so items can be efficiently popped
+ # from a stack of chucks.
+ chunks.reverse()
+
+ while chunks:
+
+ # Start the list of chunks that will make up the current line.
+ # cur_len is just the length of all the chunks in cur_line.
+ cur_line = []
+ cur_len = 0
+
+ # Figure out which static string will prefix this line.
+ if lines:
+ indent = self.subsequent_indent
+ else:
+ indent = self.initial_indent
+
+ # Maximum width for this line.
+ width = self.width - len(indent)
+
+ # First chunk on line is whitespace -- drop it, unless this
+ # is the very beginning of the text (ie. no lines started yet).
+ if self.drop_whitespace and chunks[-1].strip() == '' and lines:
+ del chunks[-1]
+
+ while chunks:
+ l = colwidth(chunks[-1])
+
+ # Can at least squeeze this chunk onto the current line.
+ if cur_len + l <= width:
+ cur_line.append(chunks.pop())
+ cur_len += l
+
+ # Nope, this line is full.
+ else:
+ break
+
+ # The current line is full, and the next chunk is too big to
+ # fit on *any* line (not just this one).
+ if chunks and colwidth(chunks[-1]) > width:
+ self._handle_long_word(chunks, cur_line, cur_len, width)
+
+ # If the last chunk on this line is all whitespace, drop it.
+ if (self.drop_whitespace and
+ cur_line and cur_line[-1].strip() == ''):
+ del cur_line[-1]
+
+ # Convert current line back to a string and store it in list
+ # of all lines (return value).
+ if cur_line:
+ lines.append(indent + ''.join(cur_line))
+
+ return lines
+
+ global MBTextWrapper
+ MBTextWrapper = tw
+ return tw(**kwargs)
+
+def wrap(line, width, initindent='', hangindent=''):
+ maxindent = max(len(hangindent), len(initindent))
+ if width <= maxindent:
+ # adjust for weird terminal size
+ width = max(78, maxindent + 1)
+ line = line.decode(encoding.encoding, encoding.encodingmode)
+ initindent = initindent.decode(encoding.encoding, encoding.encodingmode)
+ hangindent = hangindent.decode(encoding.encoding, encoding.encodingmode)
+ wrapper = MBTextWrapper(width=width,
+ initial_indent=initindent,
+ subsequent_indent=hangindent)
+ return wrapper.fill(line).encode(encoding.encoding)
+
+def iterlines(iterator):
+ for chunk in iterator:
+ for line in chunk.splitlines():
+ yield line
+
+def expandpath(path):
+ return os.path.expanduser(os.path.expandvars(path))
+
+def hgcmd():
+ """Return the command used to execute current hg
+
+ This is different from hgexecutable() because on Windows we want
+ to avoid things opening new shell windows like batch files, so we
+ get either the python call or current executable.
+ """
+ if mainfrozen():
+ return [sys.executable]
+ return gethgcmd()
+
+def rundetached(args, condfn):
+ """Execute the argument list in a detached process.
+
+ condfn is a callable which is called repeatedly and should return
+ True once the child process is known to have started successfully.
+ At this point, the child process PID is returned. If the child
+ process fails to start or finishes before condfn() evaluates to
+ True, return -1.
+ """
+ # Windows case is easier because the child process is either
+ # successfully starting and validating the condition or exiting
+ # on failure. We just poll on its PID. On Unix, if the child
+ # process fails to start, it will be left in a zombie state until
+ # the parent wait on it, which we cannot do since we expect a long
+ # running process on success. Instead we listen for SIGCHLD telling
+ # us our child process terminated.
+ terminated = set()
+ def handler(signum, frame):
+ terminated.add(os.wait())
+ prevhandler = None
+ SIGCHLD = getattr(signal, 'SIGCHLD', None)
+ if SIGCHLD is not None:
+ prevhandler = signal.signal(SIGCHLD, handler)
+ try:
+ pid = spawndetached(args)
+ while not condfn():
+ if ((pid in terminated or not testpid(pid))
+ and not condfn()):
+ return -1
+ time.sleep(0.1)
+ return pid
+ finally:
+ if prevhandler is not None:
+ signal.signal(signal.SIGCHLD, prevhandler)
+
+try:
+ any, all = any, all
+except NameError:
+ def any(iterable):
+ for i in iterable:
+ if i:
+ return True
+ return False
+
+ def all(iterable):
+ for i in iterable:
+ if not i:
+ return False
+ return True
+
+def interpolate(prefix, mapping, s, fn=None, escape_prefix=False):
+ """Return the result of interpolating items in the mapping into string s.
+
+ prefix is a single character string, or a two character string with
+ a backslash as the first character if the prefix needs to be escaped in
+ a regular expression.
+
+ fn is an optional function that will be applied to the replacement text
+ just before replacement.
+
+ escape_prefix is an optional flag that allows using doubled prefix for
+ its escaping.
+ """
+ fn = fn or (lambda s: s)
+ patterns = '|'.join(mapping.keys())
+ if escape_prefix:
+ patterns += '|' + prefix
+ if len(prefix) > 1:
+ prefix_char = prefix[1:]
+ else:
+ prefix_char = prefix
+ mapping[prefix_char] = prefix_char
+ r = re.compile(r'%s(%s)' % (prefix, patterns))
+ return r.sub(lambda x: fn(mapping[x.group()[1:]]), s)
+
+def getport(port):
+ """Return the port for a given network service.
+
+ If port is an integer, it's returned as is. If it's a string, it's
+ looked up using socket.getservbyname(). If there's no matching
+ service, util.Abort is raised.
+ """
+ try:
+ return int(port)
+ except ValueError:
+ pass
+
+ try:
+ return socket.getservbyname(port)
+ except socket.error:
+ raise Abort(_("no port number associated with service '%s'") % port)
+
+_booleans = {'1': True, 'yes': True, 'true': True, 'on': True, 'always': True,
+ '0': False, 'no': False, 'false': False, 'off': False,
+ 'never': False}
+
+def parsebool(s):
+ """Parse s into a boolean.
+
+ If s is not a valid boolean, returns None.
+ """
+ return _booleans.get(s.lower(), None)
+
+_hexdig = '0123456789ABCDEFabcdef'
+_hextochr = dict((a + b, chr(int(a + b, 16)))
+ for a in _hexdig for b in _hexdig)
+
+def _urlunquote(s):
+ """unquote('abc%20def') -> 'abc def'."""
+ res = s.split('%')
+ # fastpath
+ if len(res) == 1:
+ return s
+ s = res[0]
+ for item in res[1:]:
+ try:
+ s += _hextochr[item[:2]] + item[2:]
+ except KeyError:
+ s += '%' + item
+ except UnicodeDecodeError:
+ s += unichr(int(item[:2], 16)) + item[2:]
+ return s
+
+class url(object):
+ r"""Reliable URL parser.
+
+ This parses URLs and provides attributes for the following
+ components:
+
+ <scheme>://<user>:<passwd>@<host>:<port>/<path>?<query>#<fragment>
+
+ Missing components are set to None. The only exception is
+ fragment, which is set to '' if present but empty.
+
+ If parsefragment is False, fragment is included in query. If
+ parsequery is False, query is included in path. If both are
+ False, both fragment and query are included in path.
+
+ See http://www.ietf.org/rfc/rfc2396.txt for more information.
+
+ Note that for backward compatibility reasons, bundle URLs do not
+ take host names. That means 'bundle://../' has a path of '../'.
+
+ Examples:
+
+ >>> url('http://www.ietf.org/rfc/rfc2396.txt')
+ <url scheme: 'http', host: 'www.ietf.org', path: 'rfc/rfc2396.txt'>
+ >>> url('ssh://[::1]:2200//home/joe/repo')
+ <url scheme: 'ssh', host: '[::1]', port: '2200', path: '/home/joe/repo'>
+ >>> url('file:///home/joe/repo')
+ <url scheme: 'file', path: '/home/joe/repo'>
+ >>> url('file:///c:/temp/foo/')
+ <url scheme: 'file', path: 'c:/temp/foo/'>
+ >>> url('bundle:foo')
+ <url scheme: 'bundle', path: 'foo'>
+ >>> url('bundle://../foo')
+ <url scheme: 'bundle', path: '../foo'>
+ >>> url(r'c:\foo\bar')
+ <url path: 'c:\\foo\\bar'>
+ >>> url(r'\\blah\blah\blah')
+ <url path: '\\\\blah\\blah\\blah'>
+ >>> url(r'\\blah\blah\blah#baz')
+ <url path: '\\\\blah\\blah\\blah', fragment: 'baz'>
+
+ Authentication credentials:
+
+ >>> url('ssh://joe:xyz@x/repo')
+ <url scheme: 'ssh', user: 'joe', passwd: 'xyz', host: 'x', path: 'repo'>
+ >>> url('ssh://joe@x/repo')
+ <url scheme: 'ssh', user: 'joe', host: 'x', path: 'repo'>
+
+ Query strings and fragments:
+
+ >>> url('http://host/a?b#c')
+ <url scheme: 'http', host: 'host', path: 'a', query: 'b', fragment: 'c'>
+ >>> url('http://host/a?b#c', parsequery=False, parsefragment=False)
+ <url scheme: 'http', host: 'host', path: 'a?b#c'>
+ """
+
+ _safechars = "!~*'()+"
+ _safepchars = "/!~*'()+"
+ _matchscheme = re.compile(r'^[a-zA-Z0-9+.\-]+:').match
+
+ def __init__(self, path, parsequery=True, parsefragment=True):
+ # We slowly chomp away at path until we have only the path left
+ self.scheme = self.user = self.passwd = self.host = None
+ self.port = self.path = self.query = self.fragment = None
+ self._localpath = True
+ self._hostport = ''
+ self._origpath = path
+
+ if parsefragment and '#' in path:
+ path, self.fragment = path.split('#', 1)
+ if not path:
+ path = None
+
+ # special case for Windows drive letters and UNC paths
+ if hasdriveletter(path) or path.startswith(r'\\'):
+ self.path = path
+ return
+
+ # For compatibility reasons, we can't handle bundle paths as
+ # normal URLS
+ if path.startswith('bundle:'):
+ self.scheme = 'bundle'
+ path = path[7:]
+ if path.startswith('//'):
+ path = path[2:]
+ self.path = path
+ return
+
+ if self._matchscheme(path):
+ parts = path.split(':', 1)
+ if parts[0]:
+ self.scheme, path = parts
+ self._localpath = False
+
+ if not path:
+ path = None
+ if self._localpath:
+ self.path = ''
+ return
+ else:
+ if self._localpath:
+ self.path = path
+ return
+
+ if parsequery and '?' in path:
+ path, self.query = path.split('?', 1)
+ if not path:
+ path = None
+ if not self.query:
+ self.query = None
+
+ # // is required to specify a host/authority
+ if path and path.startswith('//'):
+ parts = path[2:].split('/', 1)
+ if len(parts) > 1:
+ self.host, path = parts
+ path = path
+ else:
+ self.host = parts[0]
+ path = None
+ if not self.host:
+ self.host = None
+ # path of file:///d is /d
+ # path of file:///d:/ is d:/, not /d:/
+ if path and not hasdriveletter(path):
+ path = '/' + path
+
+ if self.host and '@' in self.host:
+ self.user, self.host = self.host.rsplit('@', 1)
+ if ':' in self.user:
+ self.user, self.passwd = self.user.split(':', 1)
+ if not self.host:
+ self.host = None
+
+ # Don't split on colons in IPv6 addresses without ports
+ if (self.host and ':' in self.host and
+ not (self.host.startswith('[') and self.host.endswith(']'))):
+ self._hostport = self.host
+ self.host, self.port = self.host.rsplit(':', 1)
+ if not self.host:
+ self.host = None
+
+ if (self.host and self.scheme == 'file' and
+ self.host not in ('localhost', '127.0.0.1', '[::1]')):
+ raise Abort(_('file:// URLs can only refer to localhost'))
+
+ self.path = path
+
+ # leave the query string escaped
+ for a in ('user', 'passwd', 'host', 'port',
+ 'path', 'fragment'):
+ v = getattr(self, a)
+ if v is not None:
+ setattr(self, a, _urlunquote(v))
+
+ def __repr__(self):
+ attrs = []
+ for a in ('scheme', 'user', 'passwd', 'host', 'port', 'path',
+ 'query', 'fragment'):
+ v = getattr(self, a)
+ if v is not None:
+ attrs.append('%s: %r' % (a, v))
+ return '<url %s>' % ', '.join(attrs)
+
+ def __str__(self):
+ r"""Join the URL's components back into a URL string.
+
+ Examples:
+
+ >>> str(url('http://user:pw@host:80/?foo#bar'))
+ 'http://user:pw@host:80/?foo#bar'
+ >>> str(url('http://user:pw@host:80/?foo=bar&baz=42'))
+ 'http://user:pw@host:80/?foo=bar&baz=42'
+ >>> str(url('http://user:pw@host:80/?foo=bar%3dbaz'))
+ 'http://user:pw@host:80/?foo=bar%3dbaz'
+ >>> str(url('ssh://user:pw@[::1]:2200//home/joe#'))
+ 'ssh://user:pw@[::1]:2200//home/joe#'
+ >>> str(url('http://localhost:80//'))
+ 'http://localhost:80//'
+ >>> str(url('http://localhost:80/'))
+ 'http://localhost:80/'
+ >>> str(url('http://localhost:80'))
+ 'http://localhost:80/'
+ >>> str(url('bundle:foo'))
+ 'bundle:foo'
+ >>> str(url('bundle://../foo'))
+ 'bundle:../foo'
+ >>> str(url('path'))
+ 'path'
+ >>> str(url('file:///tmp/foo/bar'))
+ 'file:///tmp/foo/bar'
+ >>> print url(r'bundle:foo\bar')
+ bundle:foo\bar
+ """
+ if self._localpath:
+ s = self.path
+ if self.scheme == 'bundle':
+ s = 'bundle:' + s
+ if self.fragment:
+ s += '#' + self.fragment
+ return s
+
+ s = self.scheme + ':'
+ if self.user or self.passwd or self.host:
+ s += '//'
+ elif self.scheme and (not self.path or self.path.startswith('/')):
+ s += '//'
+ if self.user:
+ s += urllib.quote(self.user, safe=self._safechars)
+ if self.passwd:
+ s += ':' + urllib.quote(self.passwd, safe=self._safechars)
+ if self.user or self.passwd:
+ s += '@'
+ if self.host:
+ if not (self.host.startswith('[') and self.host.endswith(']')):
+ s += urllib.quote(self.host)
+ else:
+ s += self.host
+ if self.port:
+ s += ':' + urllib.quote(self.port)
+ if self.host:
+ s += '/'
+ if self.path:
+ # TODO: similar to the query string, we should not unescape the
+ # path when we store it, the path might contain '%2f' = '/',
+ # which we should *not* escape.
+ s += urllib.quote(self.path, safe=self._safepchars)
+ if self.query:
+ # we store the query in escaped form.
+ s += '?' + self.query
+ if self.fragment is not None:
+ s += '#' + urllib.quote(self.fragment, safe=self._safepchars)
+ return s
+
+ def authinfo(self):
+ user, passwd = self.user, self.passwd
+ try:
+ self.user, self.passwd = None, None
+ s = str(self)
+ finally:
+ self.user, self.passwd = user, passwd
+ if not self.user:
+ return (s, None)
+ # authinfo[1] is passed to urllib2 password manager, and its
+ # URIs must not contain credentials. The host is passed in the
+ # URIs list because Python < 2.4.3 uses only that to search for
+ # a password.
+ return (s, (None, (s, self.host),
+ self.user, self.passwd or ''))
+
+ def isabs(self):
+ if self.scheme and self.scheme != 'file':
+ return True # remote URL
+ if hasdriveletter(self.path):
+ return True # absolute for our purposes - can't be joined()
+ if self.path.startswith(r'\\'):
+ return True # Windows UNC path
+ if self.path.startswith('/'):
+ return True # POSIX-style
+ return False
+
+ def localpath(self):
+ if self.scheme == 'file' or self.scheme == 'bundle':
+ path = self.path or '/'
+ # For Windows, we need to promote hosts containing drive
+ # letters to paths with drive letters.
+ if hasdriveletter(self._hostport):
+ path = self._hostport + '/' + self.path
+ elif (self.host is not None and self.path
+ and not hasdriveletter(path)):
+ path = '/' + path
+ return path
+ return self._origpath
+
+def hasscheme(path):
+ return bool(url(path).scheme)
+
+def hasdriveletter(path):
+ return path[1:2] == ':' and path[0:1].isalpha()
+
+def urllocalpath(path):
+ return url(path, parsequery=False, parsefragment=False).localpath()
+
+def hidepassword(u):
+ '''hide user credential in a url string'''
+ u = url(u)
+ if u.passwd:
+ u.passwd = '***'
+ return str(u)
+
+def removeauth(u):
+ '''remove all authentication information from a url string'''
+ u = url(u)
+ u.user = u.passwd = None
+ return str(u)
+
+def isatty(fd):
+ try:
+ return fd.isatty()
+ except AttributeError:
+ return False