Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/atoideweb/tools/storage.py
diff options
context:
space:
mode:
Diffstat (limited to 'atoideweb/tools/storage.py')
-rw-r--r--atoideweb/tools/storage.py487
1 files changed, 4 insertions, 483 deletions
diff --git a/atoideweb/tools/storage.py b/atoideweb/tools/storage.py
index f3f40ca..329b1f8 100644
--- a/atoideweb/tools/storage.py
+++ b/atoideweb/tools/storage.py
@@ -7,7 +7,7 @@ from gettext import gettext as _
# png import from pypng
from lib import png
-# atoidejouer import
+# atoideweb import
from atoideweb.tools import config
# ...
@@ -23,7 +23,7 @@ else:
BUND_PATH = activity.get_bundle_path()
# get application logger
-logger = logging.getLogger('atoidejouer')
+logger = logging.getLogger('atoideweb')
ACTIVITY_NAMES = {
'paint': 'org.laptop.Oficina',
@@ -40,39 +40,6 @@ def get_config_path():
return os.path.join(BUND_PATH, 'static', 'data', 'config', 'config.ini')
-def get_sequence_items(sequence_path):
- if os.path.exists(sequence_path):
- _f = open(sequence_path)
- _rows = _f.readlines()
- _f.close()
- else:
- return []
- # ..
- _names = list()
- for _n in _rows:
- _n = _n.strip()
- if _n == '':
- continue
- else:
- _names.append(_n)
- # ..
- return _names
-
-
-def get_sequence_path(type_, sequence_name):
- return os.path.join(ROOT_PATH, 'data',
- 'sequences', type_, '%s.seq' % sequence_name)
-
-
-def get_sequence_first_graphic_name(type_, sequence_name):
- # seq file
- _f = open(get_sequence_path(type_, sequence_name))
- _names = _f.readlines()
- _f.close()
- # ..
- return None if len(_names) == 0 else _names[0].strip()
-
-
def get_sound_path(filename, dir_='sounds'):
# return path
return os.path.join(ROOT_PATH, 'data', dir_,
@@ -87,7 +54,7 @@ def get_icon_path(stock_id):
def get_image_path(filename, dir_='graphics'):
# return path
- if filename in ['background_default', 'mask_default']\
+ if filename in ['background_default']\
or dir_=='data':
return os.path.join(BUND_PATH, 'static', 'data',
'graphics', '%s.png' % filename)
@@ -96,140 +63,6 @@ def get_image_path(filename, dir_='graphics'):
'%s.png' % filename)
-def __remove_inner_true(a_list):
- _new_list = list()
- # ..
- _has_false = False
- # ..
- for _i, _v in enumerate(a_list):
- if _v is False:
- _has_false = True
- else:
- _sub = a_list[_i+1:]
- if _has_false is True\
- and _sub.count(False) != 0:
- _new_list.append(False)
- continue
- else:
- pass
- # ..
- _new_list.append(_v)
- # ..
- del a_list
- # ..
- return _new_list
-
-
-def png_from_pixbuf(filename, timestamp):
- # prepare outpath
- _out_path = get_image_path(filename)
- if os.path.exists(_out_path):
- return
- else:
- pass
- # prepare inpath
- _in_path = get_path_from_journal(timestamp, 'image/png')
- # init png reader
- _reader = png.Reader(filename=_in_path)
- # read the file
- _w, _h, _pixels, _metadata = _reader.read()
- # init working vars
- _new_pixels = list()
- _first_color = None
- _remove_row_list = [True for _dummy in range(_h)]
- _remove_col_list = [True for _dummy in range(_w)]
- # ..
- _planes = _metadata['planes']
- # update vars
- for _i, _row in enumerate(_pixels):
- # init new row
- _new_row = list()
- for _j, _col in enumerate(_row):
- # upate rgb
- if _j % _planes == 0:
- _rgb = [_col]
- continue
- else:
- _rgb.append(_col)
- # update color first and after
- if _j % _planes == (_planes - 1):
- # keep the first color
- if _first_color is None:
- _first_color = _rgb
- else:
- pass
- # make it alpha if first
- if _rgb == _first_color:
- _new_row.extend([0, 0, 0, 0])
- else:
- _remove_row_list[_i] = False
- _remove_col_list[(_j/_planes)-1] = False
- # small hack
- if _planes == 3:
- _rgb.append(255)
- else:
- pass
- _new_row.extend(_rgb)
- else:
- continue
- # add new row
- _new_pixels.append(_new_row)
- # cleaning
- del _reader
- del _pixels
- # remove inner True in cols or rows
- _remove_row_list = __remove_inner_true(_remove_row_list)
- _remove_col_list = __remove_inner_true(_remove_col_list)
- # init working vars
- _new_new_pixels = list()
- # 2cd pass
- for _i, _row in enumerate(_new_pixels):
- # transparent row
- if _remove_row_list[_i] is True:
- continue
- else:
- # init new row
- _new_new_row = list()
- # ..
- for _j, _col in enumerate(_row):
- # upate rgb
- if _j % 4 == 0:
- _rgb = [_col]
- continue
- else:
- _rgb.append(_col)
- # update color first and after
- if _j % 4 == 3:
- # transparent col
- if _remove_col_list[(_j/4)-1] is True:
- continue
- else:
- _new_new_row.extend(_rgb)
- else:
- continue
- # sorry for that!
- _new_new_pixels.append(_new_new_row)
- # cleaning
- del _new_pixels
- del _remove_row_list
- del _remove_col_list
- # update h and w
- _w = len(_new_new_pixels[0])/4
- _h = len(_new_new_pixels)
- # update alpha meta
- _metadata['alpha'] = True
- _metadata['planes'] = 4
- del _metadata['size']
- # write the new image with alpha
- _new_png = open(_out_path, 'wb')
- _writer = png.Writer(_w, _h, **_metadata)
- _writer.write(_new_png, _new_new_pixels)
- _new_png.close()
- # just in case
- del _writer
- del _new_new_pixels
-
-
def __do_query(query):
from sugar.datastore import datastore
# find in ds
@@ -298,7 +131,7 @@ def list_files_from_journal(activity_name=None, mime_type=None):
for _o in _objs:
# TODO open the files
yield _o.get_file_path()
-
+
def get_path_from_journal(timestamp, mime_type):
from sugar.datastore import datastore
@@ -315,315 +148,3 @@ def get_path_from_journal(timestamp, mime_type):
return _results[0].get_file_path()
else:
return None
-
-
-def __check_dir(dir_name, parent='data'):
- # get activity path
- if parent is None:
- _dir = os.path.join(ROOT_PATH, dir_name)
- else:
- _dir = os.path.join(ROOT_PATH, parent, dir_name)
- # ensure activity path
- if os.path.exists(_dir):
- pass
- else:
- os.mkdir(_dir)
-
-
-def __check_file(sub_path, file_name):
- # ..
- __check_dir(sub_path)
- # file path
- _path = os.path.join(ROOT_PATH, 'data', sub_path,
- file_name)
- # ensure file
- if os.path.exists(_path):
- pass
- else:
- # get bundle path
- _p = os.path.join(BUND_PATH, 'static', 'ext',
- sub_path, file_name)
- # copy
- shutil.copy(_p, _path)
-
-
-def __check_dir_files(sub_path):
- # get bundle path
- _path = os.path.join(BUND_PATH, 'static', 'ext', sub_path)
- # file by file
- for _f in os.listdir(_path):
- # full path
- _p = os.path.join(_path, _f)
- # little check
- if os.path.isdir(_p):
- pass
- else:
- __check_file(sub_path, _f)
-
-
-def init_activity_folder():
- # graphics
- __check_dir_files('graphics')
- # sounds
- __check_dir_files('sounds')
- # sequences
- __check_dir('sequences')
- __check_dir_files(os.path.join('sequences', 'graphics'))
- __check_dir_files(os.path.join('sequences', 'sounds'))
- # stories
- __check_dir_files('stories')
-
-
-def __show_in_out_result_message(label, message):
- # ..
- label.set_markup('<span size="large" style="italic">%s</span>' % message)
- label.show()
-
-
-def __merge_dir(project_name, dir_name, exist_list=None):
- # archive path
- _path_src = os.path.join(ROOT_PATH, 'tmp', project_name,
- dir_name)
- # little check
- if os.path.exists(_path_src):
- # project path
- _path_dst = os.path.join(ROOT_PATH, 'data',
- dir_name)
- # init existing list
- exist_list = list() if exist_list is None else exist_list
- for _f in os.listdir(_path_src):
- # ..
- _p_src = os.path.join(_path_src, _f)
- _p_dst = os.path.join(_path_dst, _f)
- # little check
- if os.path.isdir(_p_src):
- continue
- # do not replace
- elif os.path.exists(_p_dst):
- # update exist list
- exist_list.append(os.path.join(dir_name, _f))
- # do copy
- else:
- shutil.copy(_p_src, _path_dst)
- # OK!
- return True
- else:
- # Oops!
- return False
-
-
-def __import_keys(activity_, project_name):
- # ..
- _path_data = os.path.join(ROOT_PATH, 'tmp',
- project_name, 'story.keys')
- # init content
- _data = None
- # little check
- if os.path.exists(_path_data):
- # read file
- _file = open(_path_data, 'r')
- try:
- _data = _file.read()
- finally:
- _file.close()
- # parse json data
- _exist_graphic_keys = activity_.graphic_keys.loads(_data, clear=False)
- _exist_sound_keys = activity_.sound_keys.loads(_data, clear=False)
- # set activity new number of keys
- activity_.update_number_of_keys()
- # ..
- return {
- 'graphics': _exist_graphic_keys,
- 'sounds': _exist_sound_keys,
- }
- # ?? invalid archive
- else:
- return None
-
-
-def import_project(activity_, file_path, msg_label):
- # clean tmp dir
- __remove_dir('tmp', parent=None)
- __check_dir('tmp', parent=None)
- # ..
- _tmp_root = os.path.join(ROOT_PATH, 'tmp')
- try:
- # copy file to tmp
- _tar_path = os.path.join(_tmp_root, '__tmp.tar.bz2')
- shutil.copy(file_path, _tar_path)
- # change dir for unzipping
- os.chdir(_tmp_root)
- # extract files in tmp dir
- _tar = tarfile.open(file_path)
- _p_name = _tar.getnames()[0]
- _tar.extractall()
- _tar.close()
- except Exception, e:
- # prepare message
- _msg = _('Project import failed!')
- _msg += _('\n\n[Error] Can not read archive file!')
- # remove tmp structure
- __remove_dir('tmp', parent=None)
- # quit!
- return __show_in_out_result_message(msg_label, _msg)
- # merge dirs
- _exist_list = list()
- if __merge_dir(_p_name, 'graphics', exist_list=_exist_list)\
- and __merge_dir(_p_name, 'sounds', exist_list=_exist_list)\
- and __merge_dir(_p_name, os.path.join('sequences', 'graphics'),
- exist_list=_exist_list)\
- and __merge_dir(_p_name, os.path.join('sequences', 'sounds'),
- exist_list=_exist_list):
- # init result message
- _msg = _('Project sucessfully imported')
- else:
- # prepare message
- _msg = _('Project import failed!')
- _msg += _('\n\n[Error] Can not load files!')
- # remove tmp structure
- __remove_dir('tmp', parent=None)
- # quit!
- return __show_in_out_result_message(msg_label, _msg)
- # existing files stop
- if len(_exist_list) == 0:
- pass
- else:
- # prepare message
- _msg += _('\n\n[Warning] Following files already exist:\n')
- for _f in _exist_list:
- _msg = '%s - %s\n' % (_msg, _f)
- # merge keys
- _existing_dict = __import_keys(activity_, _p_name)
- if _existing_dict is None:
- # prepare message
- _msg = _('Project import failed!')
- _msg += _('\n\n[Error] Can not load keys!')
- # remove tmp structure
- __remove_dir('tmp', parent=None)
- # quit!
- return __show_in_out_result_message(msg_label, _msg)
- if len(_existing_dict['graphics']) == 0\
- or len(_existing_dict['sounds']) == 0:
- pass
- else:
- # prepare message
- _msg += _('\n\n[Warning] Following sequences already exist:\n')
- for _s in _existing_dict['graphics']:
- _msg = '%s - graphics.%s\n' % (_msg, _s)
- _msg = '%s\n' % _msg
- for _s in _existing_dict['sounds']:
- _msg = '%s - sounds.%s\n' % (_msg, _s)
- # remove tmp structure
- __remove_dir('tmp', parent=None)
- # show result
- __show_in_out_result_message(msg_label, _msg)
-
-
-def __remove_dir(dir_name, parent=None):
- # get activity path
- if parent is None:
- _dir = os.path.join(ROOT_PATH, dir_name)
- _next_parent = dir_name
- else:
- _dir = os.path.join(ROOT_PATH, parent, dir_name)
- _next_parent = os.path.join(parent, dir_name)
- # remove files and dir recursively
- if os.path.exists(_dir):
- for _f in os.listdir(_dir):
- _p = os.path.join(_dir, _f)
- if os.path.isdir(_p):
- __remove_dir(_f, parent=_next_parent)
- else:
- os.remove(_p)
- # and remove the dir
- if os.path.exists(_dir):
- os.removedirs(_dir)
- else:
- pass
- # nothing to do
- else:
- pass
-
-
-def __export_seq_and_res(activity_, tmp_root, type_='graphics'):
- # path updates
- _seq_src = os.path.join(ROOT_PATH, 'data', 'sequences',
- type_)
- _seq_dst = os.path.join(tmp_root, 'sequences', type_)
- # ..
- _res_root = os.path.join(ROOT_PATH, 'data', type_)
- _res_dst = os.path.join(tmp_root, type_)
- # keys factory
- _keys = activity_.graphic_keys if type_ == 'graphics'\
- else activity_.sound_keys
- # set res ext
- _ext = '.png' if type_ == 'graphics' else '.ogg'
- # copy
- for _n in _keys._names:
- if _n.strip() == '':
- continue
- else:
- _s_path = os.path.join(_seq_src, '%s.seq' % _n)
- shutil.copy(_s_path, _seq_dst)
- for _res in get_sequence_items(_s_path):
- _res_path = os.path.join(_res_root, '%s%s' % (_res, _ext))
- shutil.copy(_res_path, _res_dst)
-
-
-def export_project(activity_, msg_label, media):
- # get the toolbar
- _toolbar = activity_._toolbox.get_activity_toolbar()
- # get the projet name
- _name = _toolbar.title.get_text()
- # clean tmp dir first
- __remove_dir('tmp', parent=None)
- __check_dir('tmp', parent=None)
- # create a tmp stucture
- __check_dir(_name, parent='tmp')
- __check_dir(os.path.join(_name, 'graphics'), parent='tmp')
- __check_dir(os.path.join(_name, 'sequences'), parent='tmp')
- __check_dir(os.path.join(_name, 'sequences', 'graphics'), parent='tmp')
- __check_dir(os.path.join(_name, 'sequences', 'sounds'), parent='tmp')
- __check_dir(os.path.join(_name, 'sounds'), parent='tmp')
- # ..
- _tmp_root = os.path.join(ROOT_PATH, 'tmp')
- _out_root = os.path.join(_tmp_root, _name)
- # copy keys
- _keys_path = os.path.join(_out_root, 'story.keys')
- activity_.write_file(_keys_path)
- # copy sequences and resources
- __export_seq_and_res(activity_, _out_root, type_='graphics')
- __export_seq_and_res(activity_, _out_root, type_='sounds')
- # change dir for zipping
- os.chdir(_tmp_root)
- # zip all
- _tar_name = '%s.tar.bz2' % _name
- # ..
- _tar = tarfile.open(_tar_name, "w:bz2")
- _tar.add(_name)
- _tar.close()
- # try to copy
- try:
- if os.path.exists(os.path.join('/media', media, _tar_name)):
- # ..
- _msg = _('Project') + ' "' + _name + '" '
- _msg += _('already exported to') + ' "' + media + '" '
- else:
- # ..
- shutil.copy(os.path.join(_tmp_root, _tar_name),
- os.path.join('/media', media))
- # ..
- _msg = _('Project') + ' "' + _name + '" '
- _msg += _('sucessfully exported to') + ' "' + media + '" '
- except Exception, e:
- # ERROR
- logger.error('[storage] export_project - e: %s' % e)
- # ERROR
- # ..
- # ..
- _msg = _('Project') + ' "' + _name + '" '
- _msg += _('export to') + ' "' + media + '" ' + _('failed!')
- # remove tmp structure
- __remove_dir('tmp', parent=None)
- # tmp message
- __show_in_out_result_message(msg_label, _msg)