Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFlorent Pigout <florent.pigout@gmail.com>2011-10-17 18:26:34 (GMT)
committer Florent Pigout <florent.pigout@gmail.com>2011-10-17 18:26:34 (GMT)
commit8620b880f8905698f73aad79395737ebf8234ba7 (patch)
tree86debecd68cb675452227a830e872b08e9132c9a
parent5b626df3de717641b1ac11d26a342ecfb659b907 (diff)
add duration wise management + unit tests -> need to be implemented now
-rw-r--r--atoidejouer/db/story.py108
-rw-r--r--tests/test_db_story.py105
2 files changed, 200 insertions, 13 deletions
diff --git a/atoidejouer/db/story.py b/atoidejouer/db/story.py
index 476b610..515be85 100644
--- a/atoidejouer/db/story.py
+++ b/atoidejouer/db/story.py
@@ -1,5 +1,5 @@
# python import
-import logging
+import copy, logging
# get application logger
logger = logging.getLogger('atoidejouer')
@@ -163,6 +163,112 @@ class Key(object):
else:
return "%s %s" % (q, self.where())
+ def _query_duration(self, time, sign='=', order=''):
+ return "select * from story where %s order by %stime" % (
+ " and ".join([
+ "%s='%s'" % ("title", self.title),
+ "%s='%s'" % ("mime_type", self.mime_type),
+ "%s='%s'" % ("timestamp", self.timestamp),
+ "%s=%s" % ("layer", self.layer),
+ "%s%s%s" % ("time", sign, time),
+ ]),
+ order)
+
+ def update_duration(self, duration):
+ # list next keys
+ key_dura = duration
+ start = self.time + 1
+ _range = self.duration if self.duration > duration else duration
+ for time in range(start, start + _range):
+ key = None
+ for key in DB()._all(query=self._query_duration(time)):
+ break
+ key_dura -= 1
+ # DEBUG
+ # print '\nup time: %s' % time
+ # print 'up query : %s' % self._query_duration(time)
+ # print 'up key : %s' % key
+ # print 'up key_dura: %s' % key_dura
+ # no key?
+ if key is None:
+ new_key = copy.copy(self)
+ new_key.id = None
+ new_key.time = time
+ new_key.duration = key_dura
+ # DEBUG
+ # print 'up new_key: %s' % new_key
+ DB().add(new_key)
+ # remove next keys
+ elif key.time > self.time + duration:
+ key._del(refresh=False)
+ break
+ # update duration of existing key
+ else:
+ key.duration = key_dura
+ DB().update(key)
+ # update key duration after check
+ self.duration = duration
+ DB().update(self)
+ # need some refresh
+ self._duration_refresh()
+
+ def _duration_refresh(self):
+ """Ensure valid duration logic.
+ """
+ def _do_fresh(iter_):
+ """Common method for our generator.
+ """
+ cur = None
+ for key in iter_:
+ if cur is None or key.time < cur - 1:
+ cur = 0
+ key.duration = cur
+ else:
+ cur += 1
+ key.duration = cur
+ # DEBUG
+ # print 'fresh: %s' % key
+ # and update
+ DB().update(key)
+ # update previous
+ end = self.time
+ _do_fresh(DB()._all(query=self._query_duration(end, sign='<', order='-')))
+ # update next
+ start = self.time + self.duration
+ _do_fresh(DB()._all(query=self._query_duration(start, sign='>', order='-')))
+
+ def _del(self, refresh=True):
+ # list next keys
+ key_dura = 0
+ ran_time = self.time + 1
+ ran_dura = self.duration
+ for time in range(ran_time, ran_time + ran_dura):
+ key = None
+ for key in DB()._all(query=self._query_duration(time)):
+ break
+ # DEBUG
+ # print '\ndel time: %s' % time
+ # print 'del query : %s' % self._query_duration(time)
+ # print 'del key : %s' % key
+ # print 'del key_dura: %s' % key_dura
+ if key is None:
+ pass
+ # finish
+ elif key.time > time:
+ break
+ # update duration of existing key
+ else:
+ key.duration = key_dura
+ DB()._del(key)
+ # update next duration
+ key_dura += 1
+ # DEBUG
+ # print 'del old key: %s' % self
+ # and del
+ DB()._del(self)
+ # need some refresh
+ if refresh is True:
+ self._duration_refresh()
class DB(object):
diff --git a/tests/test_db_story.py b/tests/test_db_story.py
index 941372d..66f46fc 100644
--- a/tests/test_db_story.py
+++ b/tests/test_db_story.py
@@ -18,28 +18,28 @@ class TestDBStory(unittest.TestCase):
story.DB()._del(all=True)
def test_all(self):
- all = [r for r in story.DB().all()]
+ all = [r for r in story.DB()._all()]
self.assertEqual(len(all), 0,
"should not have row! found: %s" % len(all))
def test_add(self):
# second row
- key = story.Key(None, 'helo', 'image', 'helo.png')
+ key = story.Key(None, 'helo.png', 'image/png', '12345')
story.DB().add(key)
- all = [r for r in story.DB().all()]
+ all = [r for r in story.DB()._all()]
self.assertEqual(len(all), 1,
"should have 1 row! found: %s" % len(all))
self.assertEqual(all[0], key, "not the same row: %s" % all[0])
# second row
- key = story.Key(None, 'hola', 'image', 'hola.png')
+ key = story.Key(None, 'hola.png', 'image/png', '67890')
story.DB().add(key)
- all = [r for r in story.DB().all()]
+ all = [r for r in story.DB()._all()]
self.assertEqual(len(all), 2,
"should have 1 row! found: %s" % len(all))
self.assertEqual(all[1], key, "not the same row: %s" % all[1])
def test_get(self):
- key = story.Key(None, 'helo', 'image', 'helo.png')
+ key = story.Key(None, 'helo.png', 'image/png', '12345')
story.DB().add(key)
all = [r for r in story.DB().get(story.Key(name='helo'))]
self.assertEqual(len(all), 1,
@@ -47,16 +47,96 @@ class TestDBStory(unittest.TestCase):
self.assertEqual(all[0], key, "not the same row: %s" % all[0])
def test_update(self):
- key = story.Key(None, 'helo', 'image', 'helo.png', layer=1)
+ key = story.Key(None, 'helo.png', 'image/png', '12345', layer=1)
story.DB().add(key)
- all = [r for r in story.DB().all()]
- key = story.Key(id=all[0].id, name='hola', media='hola.png', layer=2)
+ all = [r for r in story.DB()._all()]
+ key = story.Key(id=all[0].id, title='hola.png', timestamp='12345', layer=2)
story.DB().update(key)
- all = [r for r in story.DB().all()]
+ all = [r for r in story.DB()._all()]
self.assertEqual(len(all), 1,
"should have 1 row! found: %s" % len(all))
- exp_key = story.Key(None, 'hola', 'image', 'hola.png', layer=2)
- self.assertEqual(all[0], exp_key, "not the same row: %s" % all[0])
+ # NOTE: title, mime_type and timestamp can not be changed
+ exp_key = story.Key(None, 'helo.png', 'image/png', '12345', layer=2)
+ self.assertEqual(all[0], exp_key,
+ "not the same row\n`%s`\ninstead of\n`%s`" % (all[0], exp_key))
+
+ def test_duration(self):
+ story.DB().add(story.Key(title='helo', mime_type='img', timestamp='12345', time=0))
+ story.DB().add(story.Key(title='helo', mime_type='img', timestamp='12345', time=4))
+ story.DB().add(story.Key(title='helo', mime_type='img', timestamp='12345', time=5))
+ story.DB().add(story.Key(title='helo', mime_type='img', timestamp='12345', time=6))
+ _all = [r for r in story.DB()._all()]
+ # duration update +2
+ # print '+2 ...'
+ _all[0].update_duration(2)
+ _all = [r for r in story.DB()._all()]
+ self.assertEqual(len(_all), 6,
+ "should have 6 rows! found: %s" % _all)
+ self.assertEqual(_all[4].time, 1)
+ self.assertEqual(_all[5].time, 2)
+ self.assertEqual(_all[4].duration, 1)
+ self.assertEqual(_all[5].duration, 0)
+ self.assertEqual(_all[1].duration, 2, _all[1])
+ self.assertEqual(_all[2].duration, 1, _all[2])
+ self.assertEqual(_all[3].duration, 0, _all[3])
+ # duration update +3
+ # print '+3 ...'
+ _all[0].update_duration(3)
+ _all = [r for r in story.DB()._all()]
+ self.assertEqual(len(_all), 7,
+ "should have 7 rows! found: %s" % len(_all))
+ self.assertEqual(_all[4].time, 1)
+ self.assertEqual(_all[5].time, 2)
+ self.assertEqual(_all[6].time, 3)
+ self.assertEqual(_all[4].duration, 2)
+ self.assertEqual(_all[5].duration, 1)
+ self.assertEqual(_all[6].duration, 0)
+ self.assertEqual(_all[1].duration, 2)
+ self.assertEqual(_all[2].duration, 1)
+ self.assertEqual(_all[3].duration, 0)
+ # duration update +5
+ # print '+5 ...'
+ _all[0].update_duration(5)
+ _all = [r for r in story.DB()._all()]
+ self.assertEqual(len(_all), 7,
+ "should have 7 rows! found: %s" % len(_all))
+ self.assertEqual(_all[4].time, 1)
+ self.assertEqual(_all[5].time, 2)
+ self.assertEqual(_all[6].time, 3)
+ self.assertEqual(_all[1].time, 4)
+ self.assertEqual(_all[2].time, 5)
+ self.assertEqual(_all[3].time, 6)
+ self.assertEqual(_all[4].duration, 4)
+ self.assertEqual(_all[5].duration, 3)
+ self.assertEqual(_all[6].duration, 2)
+ self.assertEqual(_all[1].duration, 1)
+ self.assertEqual(_all[2].duration, 0)
+ self.assertEqual(_all[3].duration, 0)
+ # duration update +3 again
+ # print '+3 again ...'
+ _all[0].update_duration(3)
+ _all = [r for r in story.DB()._all()]
+ self.assertEqual(len(_all), 5,
+ "should have 5 rows! found: %s" % _all)
+ self.assertEqual(_all[2].time, 1)
+ self.assertEqual(_all[3].time, 2)
+ self.assertEqual(_all[4].time, 3)
+ self.assertEqual(_all[1].time, 6)
+ self.assertEqual(_all[2].duration, 2)
+ self.assertEqual(_all[3].duration, 1)
+ self.assertEqual(_all[4].duration, 0)
+ self.assertEqual(_all[1].duration, 0)
+ # duration update +0 back
+ # print '+0 back ...'
+ _all[0].update_duration(0)
+ _all = [r for r in story.DB()._all()]
+ self.assertEqual(len(_all), 2,
+ "should have 4 rows! found: %s" % len(_all))
+ self.assertEqual(_all[0].time, 0)
+ self.assertEqual(_all[1].time, 6)
+ self.assertEqual(_all[0].duration, 0)
+ self.assertEqual(_all[1].duration, 0)
+
def suite():
suite = unittest.TestSuite()
@@ -64,6 +144,7 @@ def suite():
suite.addTest(TestDBStory('test_add'))
suite.addTest(TestDBStory('test_get'))
suite.addTest(TestDBStory('test_update'))
+ suite.addTest(TestDBStory('test_duration'))
return suite