Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/src/webdav/davlib.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/webdav/davlib.py')
-rw-r--r--src/webdav/davlib.py336
1 files changed, 336 insertions, 0 deletions
diff --git a/src/webdav/davlib.py b/src/webdav/davlib.py
new file mode 100644
index 0000000..f4dac91
--- /dev/null
+++ b/src/webdav/davlib.py
@@ -0,0 +1,336 @@
+# pylint: disable-msg=W0402,W0231,W0141,R0903,C0321,W0701,R0904,C0103,W0201,W0102,R0913,W0622,E1101,C0111,C0121,R0901
+# DAV client library
+#
+# Copyright (C) 1998-2000 Guido van Rossum. All Rights Reserved.
+# Written by Greg Stein. Given to Guido. Licensed using the Python license.
+#
+# This module is maintained by Greg and is available at:
+# http://www.lyra.org/greg/python/davlib.py
+#
+# Since this isn't in the Python distribution yet, we'll use the CVS ID
+# for tracking:
+# $Id: davlib.py 3182 2008-02-22 15:57:55 +0000 (Fr, 22 Feb 2008) schlauch $
+#
+
+import httplib
+import urllib
+import string
+import types
+import mimetypes
+import qp_xml
+
+
+INFINITY = 'infinity'
+XML_DOC_HEADER = '<?xml version="1.0" encoding="utf-8"?>'
+XML_CONTENT_TYPE = 'text/xml; charset="utf-8"'
+
+# block size for copying files up to the server
+BLOCKSIZE = 16384
+
+
+class HTTPProtocolChooser(httplib.HTTPSConnection):
+ def __init__(self, *args, **kw):
+ self.protocol = kw.pop('protocol')
+ if self.protocol == "https":
+ self.default_port = 443
+ else:
+ self.default_port = 80
+
+ apply(httplib.HTTPSConnection.__init__, (self,) + args, kw)
+
+ def connect(self):
+ if self.protocol == "https":
+ httplib.HTTPSConnection.connect(self)
+ else:
+ httplib.HTTPConnection.connect(self)
+
+
+class HTTPConnectionAuth(HTTPProtocolChooser):
+ def __init__(self, *args, **kw):
+ apply(HTTPProtocolChooser.__init__, (self,) + args, kw)
+
+ self.__username = None
+ self.__password = None
+ self.__nonce = None
+ self.__opaque = None
+
+ def setauth(self, username, password):
+ self.__username = username
+ self.__password = password
+
+
+def _parse_status(elem):
+ text = elem.textof()
+ idx1 = string.find(text, ' ')
+ idx2 = string.find(text, ' ', idx1+1)
+ return int(text[idx1:idx2]), text[idx2+1:]
+
+class _blank:
+ def __init__(self, **kw):
+ self.__dict__.update(kw)
+class _propstat(_blank): pass
+class _response(_blank): pass
+class _multistatus(_blank): pass
+
+def _extract_propstat(elem):
+ ps = _propstat(prop={}, status=None, responsedescription=None)
+ for child in elem.children:
+ if child.ns != 'DAV:':
+ continue
+ if child.name == 'prop':
+ for prop in child.children:
+ ps.prop[(prop.ns, prop.name)] = prop
+ elif child.name == 'status':
+ ps.status = _parse_status(child)
+ elif child.name == 'responsedescription':
+ ps.responsedescription = child.textof()
+ ### unknown element name
+
+ return ps
+
+def _extract_response(elem):
+ resp = _response(href=[], status=None, responsedescription=None, propstat=[])
+ for child in elem.children:
+ if child.ns != 'DAV:':
+ continue
+ if child.name == 'href':
+ resp.href.append(child.textof())
+ elif child.name == 'status':
+ resp.status = _parse_status(child)
+ elif child.name == 'responsedescription':
+ resp.responsedescription = child.textof()
+ elif child.name == 'propstat':
+ resp.propstat.append(_extract_propstat(child))
+ ### unknown child element
+
+ return resp
+
+def _extract_msr(root):
+ if root.ns != 'DAV:' or root.name != 'multistatus':
+ raise 'invalid response: <DAV:multistatus> expected'
+
+ msr = _multistatus(responses=[ ], responsedescription=None)
+
+ for child in root.children:
+ if child.ns != 'DAV:':
+ continue
+ if child.name == 'responsedescription':
+ msr.responsedescription = child.textof()
+ elif child.name == 'response':
+ msr.responses.append(_extract_response(child))
+ ### unknown child element
+
+ return msr
+
+def _extract_locktoken(root):
+ if root.ns != 'DAV:' or root.name != 'prop':
+ raise 'invalid response: <DAV:prop> expected'
+ elem = root.find('lockdiscovery', 'DAV:')
+ if not elem:
+ raise 'invalid response: <DAV:lockdiscovery> expected'
+ elem = elem.find('activelock', 'DAV:')
+ if not elem:
+ raise 'invalid response: <DAV:activelock> expected'
+ elem = elem.find('locktoken', 'DAV:')
+ if not elem:
+ raise 'invalid response: <DAV:locktoken> expected'
+ elem = elem.find('href', 'DAV:')
+ if not elem:
+ raise 'invalid response: <DAV:href> expected'
+ return elem.textof()
+
+
+class DAVResponse(httplib.HTTPResponse):
+ def parse_multistatus(self):
+ self.root = qp_xml.Parser().parse(self)
+ self.msr = _extract_msr(self.root)
+
+ def parse_lock_response(self):
+ self.root = qp_xml.Parser().parse(self)
+ self.locktoken = _extract_locktoken(self.root)
+
+
+class DAV(HTTPConnectionAuth):
+
+ response_class = DAVResponse
+
+ def get(self, url, extra_hdrs={ }):
+ return self._request('GET', url, extra_hdrs=extra_hdrs)
+
+ def head(self, url, extra_hdrs={ }):
+ return self._request('HEAD', url, extra_hdrs=extra_hdrs)
+
+ def post(self, url, data={ }, body=None, extra_hdrs={ }):
+ headers = extra_hdrs.copy()
+
+ assert body or data, "body or data must be supplied"
+ assert not (body and data), "cannot supply both body and data"
+ if data:
+ body = ''
+ for key, value in data.items():
+ if isinstance(value, types.ListType):
+ for item in value:
+ body = body + '&' + key + '=' + urllib.quote(str(item))
+ else:
+ body = body + '&' + key + '=' + urllib.quote(str(value))
+ body = body[1:]
+ headers['Content-Type'] = 'application/x-www-form-urlencoded'
+
+ return self._request('POST', url, body, headers)
+
+ def options(self, url='*', extra_hdrs={ }):
+ return self._request('OPTIONS', url, extra_hdrs=extra_hdrs)
+
+ def trace(self, url, extra_hdrs={ }):
+ return self._request('TRACE', url, extra_hdrs=extra_hdrs)
+
+ def put(self, url, contents,
+ content_type=None, content_enc=None, extra_hdrs={ }):
+
+ if not content_type:
+ content_type, content_enc = mimetypes.guess_type(url)
+
+ headers = extra_hdrs.copy()
+ if content_type:
+ headers['Content-Type'] = content_type
+ if content_enc:
+ headers['Content-Encoding'] = content_enc
+ return self._request('PUT', url, contents, headers)
+
+ def delete(self, url, extra_hdrs={ }):
+ return self._request('DELETE', url, extra_hdrs=extra_hdrs)
+
+ def propfind(self, url, body=None, depth=None, extra_hdrs={ }):
+ headers = extra_hdrs.copy()
+ headers['Content-Type'] = XML_CONTENT_TYPE
+ if depth is not None:
+ headers['Depth'] = str(depth)
+ return self._request('PROPFIND', url, body, headers)
+
+ def proppatch(self, url, body, extra_hdrs={ }):
+ headers = extra_hdrs.copy()
+ headers['Content-Type'] = XML_CONTENT_TYPE
+ return self._request('PROPPATCH', url, body, headers)
+
+ def mkcol(self, url, extra_hdrs={ }):
+ return self._request('MKCOL', url, extra_hdrs=extra_hdrs)
+
+ def move(self, src, dst, extra_hdrs={ }):
+ headers = extra_hdrs.copy()
+ headers['Destination'] = dst
+ return self._request('MOVE', src, extra_hdrs=headers)
+
+ def copy(self, src, dst, depth=None, extra_hdrs={ }):
+ headers = extra_hdrs.copy()
+ headers['Destination'] = dst
+ if depth is not None:
+ headers['Depth'] = str(depth)
+ return self._request('COPY', src, extra_hdrs=headers)
+
+ def lock(self, url, owner='', timeout=None, depth=None,
+ scope='exclusive', type='write', extra_hdrs={ }):
+ headers = extra_hdrs.copy()
+ headers['Content-Type'] = XML_CONTENT_TYPE
+ if depth is not None:
+ headers['Depth'] = str(depth)
+ if timeout is not None:
+ headers['Timeout'] = timeout
+ body = XML_DOC_HEADER + \
+ '<DAV:lockinfo xmlns:DAV="DAV:">' + \
+ '<DAV:lockscope><DAV:%s/></DAV:lockscope>' % scope + \
+ '<DAV:locktype><DAV:%s/></DAV:locktype>' % type + \
+ '<DAV:owner>' + owner + '</DAV:owner>' + \
+ '</DAV:lockinfo>'
+ return self._request('LOCK', url, body, extra_hdrs=headers)
+
+ def unlock(self, url, locktoken, extra_hdrs={ }):
+ headers = extra_hdrs.copy()
+ if locktoken[0] != '<':
+ locktoken = '<' + locktoken + '>'
+ headers['Lock-Token'] = locktoken
+ return self._request('UNLOCK', url, extra_hdrs=headers)
+
+ def _request(self, method, url, body=None, extra_hdrs={}):
+ "Internal method for sending a request."
+
+ self.request(method, url, body, extra_hdrs)
+ return self.getresponse()
+
+
+ #
+ # Higher-level methods for typical client use
+ #
+
+ def allprops(self, url, depth=None):
+ body = XML_DOC_HEADER + \
+ '<DAV:propfind xmlns:DAV="DAV:"><DAV:allprop/></DAV:propfind>'
+ return self.propfind(url, body, depth=depth)
+
+ def propnames(self, url, depth=None):
+ body = XML_DOC_HEADER + \
+ '<DAV:propfind xmlns:DAV="DAV:"><DAV:propname/></DAV:propfind>'
+ return self.propfind(url, body, depth)
+
+ def getprops(self, url, *names, **kw):
+ assert names, 'at least one property name must be provided'
+ if kw.has_key('ns'):
+ xmlns = ' xmlns:NS="' + kw['ns'] + '"'
+ ns = 'NS:'
+ del kw['ns']
+ else:
+ xmlns = ns = ''
+ if kw.has_key('depth'):
+ depth = kw['depth']
+ del kw['depth']
+ else:
+ depth = 0
+ assert not kw, 'unknown arguments'
+ body = XML_DOC_HEADER + \
+ '<DAV:propfind xmlns:DAV="DAV:"' + xmlns + '><DAV:prop><' + ns + \
+ string.joinfields(names, '/><' + ns) + \
+ '/></DAV:prop></DAV:propfind>'
+ return self.propfind(url, body, depth)
+
+ def delprops(self, url, *names, **kw):
+ assert names, 'at least one property name must be provided'
+ if kw.has_key('ns'):
+ xmlns = ' xmlns:NS="' + kw['ns'] + '"'
+ ns = 'NS:'
+ del kw['ns']
+ else:
+ xmlns = ns = ''
+ assert not kw, 'unknown arguments'
+ body = XML_DOC_HEADER + \
+ '<DAV:propertyupdate xmlns:DAV="DAV:"' + xmlns + \
+ '><DAV:remove><DAV:prop><' + ns + \
+ string.joinfields(names, '/><' + ns) + \
+ '/></DAV:prop></DAV:remove></DAV:propertyupdate>'
+ return self.proppatch(url, body)
+
+ def setprops(self, url, *xmlprops, **props):
+ assert xmlprops or props, 'at least one property must be provided'
+ xmlprops = list(xmlprops)
+ if props.has_key('ns'):
+ xmlns = ' xmlns:NS="' + props['ns'] + '"'
+ ns = 'NS:'
+ del props['ns']
+ else:
+ xmlns = ns = ''
+ for key, value in props.items():
+ if value:
+ xmlprops.append('<%s%s>%s</%s%s>' % (ns, key, value, ns, key))
+ else:
+ xmlprops.append('<%s%s/>' % (ns, key))
+ elems = string.joinfields(xmlprops, '')
+ body = XML_DOC_HEADER + \
+ '<DAV:propertyupdate xmlns:DAV="DAV:"' + xmlns + \
+ '><DAV:set><DAV:prop>' + \
+ elems + \
+ '</DAV:prop></DAV:set></DAV:propertyupdate>'
+ return self.proppatch(url, body)
+
+ def get_lock(self, url, owner='', timeout=None, depth=None):
+ response = self.lock(url, owner, timeout, depth)
+ response.parse_lock_response()
+ return response.locktoken
+ \ No newline at end of file