Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/src/olpc/datastore/filestore.py
blob: 07617232b735291a6ae576113cfa1865cc48d4ba (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import errno
import hashlib
import logging
import os

import gobject

from olpc.datastore import layoutmanager

class FileStore(object):
    """Handle the storage of one file per entry.

    """
    def __init__(self): 
        self._enqueue_checksum_id = None

        # TODO: add protection against store and retrieve operations on entries
        # that are being processed async.

    def store(self, uid, file_path, transfer_ownership, completion_cb):
        """Store a file for a given entry.
           
        """
        dir_path = layoutmanager.get_instance().get_entry_path(uid)
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)

        destination_path = os.path.join(dir_path, uid)
        if file_path:
            if not os.path.isfile(file_path):
                raise ValueError('No file at %r' % file_path)
            if transfer_ownership:
                try:
                    logging.debug('FileStore moving from %r to %r' % \
                                  (file_path, destination_path))
                    os.rename(file_path, destination_path)
                    self._enqueue_checksum(uid)
                    completion_cb()
                except OSError, e:
                    if e.errno == errno.EXDEV:
                        self._async_copy(uid, file_path, destination_path,
                                completion_cb)
                    else:
                        raise
            else:
                self._async_copy(uid, file_path, destination_path,
                        completion_cb)
        elif not file_path and os.path.exists(destination_path):
            os.remove(destination_path)
            completion_cb()
        else:
            logging.debug('FileStore: Nothing to do')
            completion_cb()

    def _async_copy(self, uid, file_path, destination_path, completion_cb):
        """Start copying a file asynchronously.
        
        """
        logging.debug('FileStore copying from %r to %r' % \
                      (file_path, destination_path))
        async_copy = AsyncCopy(file_path, destination_path,
                lambda exception: self._async_copy_completion_cb(uid,
                                                                 completion_cb,
                                                                 exception))
        async_copy.start()

    def _async_copy_completion_cb(self, uid, completion_cb, exception):
        """Callback called when an asynchronous copy has finished.
        
        """
        if exception is None:
            self._enqueue_checksum(uid)
        completion_cb(exception)

    def _enqueue_checksum(self, uid):
        """Add an entry to a queue of entries to be checked for duplicates.

        """
        queue_path = layoutmanager.get_instance().get_queue_path()
        open(os.path.join(queue_path, uid), 'w').close()
        logging.debug('_enqueue_checksum %r' % os.path.join(queue_path, uid))
        if self._enqueue_checksum_id is None:
            self._enqueue_checksum_id = \
                    gobject.idle_add(self._compute_checksum_cb,
                                     priority=gobject.PRIORITY_LOW)

    def _identical_file_already_exists(self, checksum):
        """Check if we already have files with this checksum.

        """
        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
        checksum_path = os.path.join(checksums_dir, checksum)
        return os.path.exists(checksum_path)

    def _get_file_from_checksum(self, checksum):
        """Get a file that matches checksum.

        """
        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
        checksum_path = os.path.join(checksums_dir, checksum)
        first_file_link = os.listdir(checksum_path)[0]
        first_file = os.readlink(os.path.join(checksum_path, first_file_link))
        return first_file

    def _create_checksum_dir(self, checksum):
        """Create directory that tracks files with this same checksum.
        
        """
        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
        checksum_path = os.path.join(checksums_dir, checksum)
        logging.debug('create dir %r' % checksum_path)
        os.mkdir(checksum_path)

    def _add_checksum_entry(self, uid, checksum):
        """Create a symbolic link in the checksum dir to the file in the entry
           dir and another one in the entry path to the checksum dir.

        """
        entry_path = layoutmanager.get_instance().get_entry_path(uid)
        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
        checksum_path = os.path.join(checksums_dir, checksum)

        logging.debug('symlink %r -> %r' % (os.path.join(checksum_path, uid),
                                            os.path.join(entry_path, uid)))
        os.symlink(os.path.join(entry_path, uid),
                   os.path.join(checksum_path, uid))

        logging.debug('symlink %r -> %r' % \
                (os.path.join(entry_path, 'checksum'), checksum_path))
        os.symlink(checksum_path, os.path.join(entry_path, 'checksum'))

    def _remove_checksum_entry(self, uid):
        """Remove links created in _add_checksum_entry() and the checksum dir
           if empty.

        """
        entry_path = layoutmanager.get_instance().get_entry_path(uid)
        checksum = os.readlink(os.path.join(entry_path, 'checksum'))

        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
        checksum_path = os.path.join(checksums_dir, checksum)

        os.remove(os.path.join(checksum_path, uid))
        try:
            os.rmdir(checksum_path)
        except OSError, e:
            if e.errno != errno.ENOTEMPTY:
                raise

        os.remove(os.path.join(entry_path, 'checksum'))

    def _already_linked(self, uid, checksum):
        """Check if this entry's file is already a hard link to the checksums
           dir.
        
        """
        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
        checksum_path = os.path.join(checksums_dir, checksum)
        return os.path.exists(os.path.join(checksum_path, uid))

    def _compute_checksum_cb(self):
        """Process one item in the checksums queue by calculating its checksum,
           checking if there exist already an identical file, and in that case
           substituting its file with a hard link to that pre-existing file.
        
        """
        queue_path = layoutmanager.get_instance().get_queue_path()
        queue = os.listdir(queue_path)
        if queue:
            uid = queue[0]
            logging.debug('_compute_checksum_cb processing %r' % uid)
            entry_path = layoutmanager.get_instance().get_entry_path(uid)
            file_in_entry_path = os.path.join(entry_path, uid)
            checksum = self._calculate_md5sum(os.path.join(entry_path, uid))

            if self._identical_file_already_exists(checksum):
                if not self._already_linked(uid, checksum):
                    logging.debug('delete %r' % file_in_entry_path)
                    os.remove(file_in_entry_path)

                    existing_file = self._get_file_from_checksum(checksum)
                    logging.debug('link %r -> %r' % \
                            (existing_file, file_in_entry_path))
                    os.link(existing_file, file_in_entry_path)

                    self._add_checksum_entry(uid, checksum)
            else:
                self._create_checksum_dir(checksum)
                self._add_checksum_entry(uid, checksum)

            os.remove(os.path.join(queue_path, uid))

        if len(queue) <= 1:
            self._enqueue_checksum_id = None
            return False
        else:
            return True

    def _calculate_md5sum(self, path):
        """Calculate the md5 checksum of a given file.
        
        """
        in_, out = os.popen2(['md5sum', path])
        return out.read().split(' ', 1)[0]

    def retrieve(self, uid, user_id):
        """Place the file associated to a given entry into a directory where the
            user can read it. The caller is reponsible for deleting this file.
        
        """
        dir_path = layoutmanager.get_instance().get_entry_path(uid)
        file_path = os.path.join(dir_path, uid)
        if not os.path.exists(file_path):
            return ''

        use_instance_dir = os.path.exists('/etc/olpc-security') and \
                           os.getuid() != user_id
        if use_instance_dir:
            if not user_id:
                raise ValueError("Couldn't determine the current user uid.")
            destination_dir = os.path.join(os.environ['HOME'], 'isolation', '1',
                                           'uid_to_instance_dir', str(user_id))
        else:
            profile = os.environ.get('SUGAR_PROFILE', 'default')
            destination_dir = os.path.join(os.path.expanduser('~'), '.sugar',
                    profile, 'data')
            if not os.path.exists(destination_dir):
                os.makedirs(destination_dir)

        destination_path = os.path.join(destination_dir, uid)

        # Try to make the original file readable. This can fail if the file is
        # in a FAT filesystem.
        try:
            os.chmod(file_path, 0604)
        except OSError, e:
            if e.errno != errno.EPERM:
                raise

        # Try to hard link from the original file to the targetpath. This can
        # fail if the file is in a different filesystem. Do a symlink instead.
        try:
            os.link(file_path, destination_path)
        except OSError, e:
            if e.errno == errno.EXDEV:
                os.symlink(file_path, destination_path)
            else:
                raise

        return destination_path

    def delete(self, uid):
        """Remove the file associated to a given entry.
        
        """
        dir_path = layoutmanager.get_instance().get_entry_path(uid)
        file_path = os.path.join(dir_path, uid)
        if os.path.exists(file_path):
            self._remove_checksum_entry(uid)
            os.remove(file_path)

class AsyncCopy(object):
    """Copy a file in chunks in the idle loop.
    
    """
    CHUNK_SIZE = 65536

    def __init__(self, src, dest, completion):
        self.src = src
        self.dest = dest
        self.completion = completion
        self.src_fp = -1
        self.dest_fp = -1
        self.written = 0
        self.size = 0

    def _cleanup(self):
        os.close(self.src_fp)
        os.close(self.dest_fp)

    def _copy_block(self, user_data=None):
        try:
            data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE)
            count = os.write(self.dest_fp, data)
            self.written += len(data)

            # error writing data to file?
            if count < len(data):
                logging.error('AC: Error writing %s -> %s: wrote less than '
                        'expected' % (self.src, self.dest))
                self._cleanup()
                self.completion(RuntimeError(
                        'Error writing data to destination file'))
                return False

            # FIXME: emit progress here

            # done?
            if len(data) < AsyncCopy.CHUNK_SIZE:
                self._cleanup()
                self.completion(None)
                return False
        except Exception, err:
            logging.error("AC: Error copying %s -> %s: %r" % \
                    (self.src, self.dest, err))
            self._cleanup()
            self.completion(err)
            return False

        return True

    def start(self):
        self.src_fp = os.open(self.src, os.O_RDONLY)
        self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT,
                0644)

        stat = os.fstat(self.src_fp)
        self.size = stat[6]

        gobject.idle_add(self._copy_block)