From 8620b880f8905698f73aad79395737ebf8234ba7 Mon Sep 17 00:00:00 2001 From: Florent Pigout Date: Mon, 17 Oct 2011 18:26:34 +0000 Subject: add duration wise management + unit tests -> need to be implemented now --- diff --git a/atoidejouer/db/story.py b/atoidejouer/db/story.py index 476b610..515be85 100644 --- a/atoidejouer/db/story.py +++ b/atoidejouer/db/story.py @@ -1,5 +1,5 @@ # python import -import logging +import copy, logging # get application logger logger = logging.getLogger('atoidejouer') @@ -163,6 +163,112 @@ class Key(object): else: return "%s %s" % (q, self.where()) + def _query_duration(self, time, sign='=', order=''): + return "select * from story where %s order by %stime" % ( + " and ".join([ + "%s='%s'" % ("title", self.title), + "%s='%s'" % ("mime_type", self.mime_type), + "%s='%s'" % ("timestamp", self.timestamp), + "%s=%s" % ("layer", self.layer), + "%s%s%s" % ("time", sign, time), + ]), + order) + + def update_duration(self, duration): + # list next keys + key_dura = duration + start = self.time + 1 + _range = self.duration if self.duration > duration else duration + for time in range(start, start + _range): + key = None + for key in DB()._all(query=self._query_duration(time)): + break + key_dura -= 1 + # DEBUG + # print '\nup time: %s' % time + # print 'up query : %s' % self._query_duration(time) + # print 'up key : %s' % key + # print 'up key_dura: %s' % key_dura + # no key? + if key is None: + new_key = copy.copy(self) + new_key.id = None + new_key.time = time + new_key.duration = key_dura + # DEBUG + # print 'up new_key: %s' % new_key + DB().add(new_key) + # remove next keys + elif key.time > self.time + duration: + key._del(refresh=False) + break + # update duration of existing key + else: + key.duration = key_dura + DB().update(key) + # update key duration after check + self.duration = duration + DB().update(self) + # need some refresh + self._duration_refresh() + + def _duration_refresh(self): + """Ensure valid duration logic. + """ + def _do_fresh(iter_): + """Common method for our generator. + """ + cur = None + for key in iter_: + if cur is None or key.time < cur - 1: + cur = 0 + key.duration = cur + else: + cur += 1 + key.duration = cur + # DEBUG + # print 'fresh: %s' % key + # and update + DB().update(key) + # update previous + end = self.time + _do_fresh(DB()._all(query=self._query_duration(end, sign='<', order='-'))) + # update next + start = self.time + self.duration + _do_fresh(DB()._all(query=self._query_duration(start, sign='>', order='-'))) + + def _del(self, refresh=True): + # list next keys + key_dura = 0 + ran_time = self.time + 1 + ran_dura = self.duration + for time in range(ran_time, ran_time + ran_dura): + key = None + for key in DB()._all(query=self._query_duration(time)): + break + # DEBUG + # print '\ndel time: %s' % time + # print 'del query : %s' % self._query_duration(time) + # print 'del key : %s' % key + # print 'del key_dura: %s' % key_dura + if key is None: + pass + # finish + elif key.time > time: + break + # update duration of existing key + else: + key.duration = key_dura + DB()._del(key) + # update next duration + key_dura += 1 + # DEBUG + # print 'del old key: %s' % self + # and del + DB()._del(self) + # need some refresh + if refresh is True: + self._duration_refresh() class DB(object): diff --git a/tests/test_db_story.py b/tests/test_db_story.py index 941372d..66f46fc 100644 --- a/tests/test_db_story.py +++ b/tests/test_db_story.py @@ -18,28 +18,28 @@ class TestDBStory(unittest.TestCase): story.DB()._del(all=True) def test_all(self): - all = [r for r in story.DB().all()] + all = [r for r in story.DB()._all()] self.assertEqual(len(all), 0, "should not have row! found: %s" % len(all)) def test_add(self): # second row - key = story.Key(None, 'helo', 'image', 'helo.png') + key = story.Key(None, 'helo.png', 'image/png', '12345') story.DB().add(key) - all = [r for r in story.DB().all()] + all = [r for r in story.DB()._all()] self.assertEqual(len(all), 1, "should have 1 row! found: %s" % len(all)) self.assertEqual(all[0], key, "not the same row: %s" % all[0]) # second row - key = story.Key(None, 'hola', 'image', 'hola.png') + key = story.Key(None, 'hola.png', 'image/png', '67890') story.DB().add(key) - all = [r for r in story.DB().all()] + all = [r for r in story.DB()._all()] self.assertEqual(len(all), 2, "should have 1 row! found: %s" % len(all)) self.assertEqual(all[1], key, "not the same row: %s" % all[1]) def test_get(self): - key = story.Key(None, 'helo', 'image', 'helo.png') + key = story.Key(None, 'helo.png', 'image/png', '12345') story.DB().add(key) all = [r for r in story.DB().get(story.Key(name='helo'))] self.assertEqual(len(all), 1, @@ -47,16 +47,96 @@ class TestDBStory(unittest.TestCase): self.assertEqual(all[0], key, "not the same row: %s" % all[0]) def test_update(self): - key = story.Key(None, 'helo', 'image', 'helo.png', layer=1) + key = story.Key(None, 'helo.png', 'image/png', '12345', layer=1) story.DB().add(key) - all = [r for r in story.DB().all()] - key = story.Key(id=all[0].id, name='hola', media='hola.png', layer=2) + all = [r for r in story.DB()._all()] + key = story.Key(id=all[0].id, title='hola.png', timestamp='12345', layer=2) story.DB().update(key) - all = [r for r in story.DB().all()] + all = [r for r in story.DB()._all()] self.assertEqual(len(all), 1, "should have 1 row! found: %s" % len(all)) - exp_key = story.Key(None, 'hola', 'image', 'hola.png', layer=2) - self.assertEqual(all[0], exp_key, "not the same row: %s" % all[0]) + # NOTE: title, mime_type and timestamp can not be changed + exp_key = story.Key(None, 'helo.png', 'image/png', '12345', layer=2) + self.assertEqual(all[0], exp_key, + "not the same row\n`%s`\ninstead of\n`%s`" % (all[0], exp_key)) + + def test_duration(self): + story.DB().add(story.Key(title='helo', mime_type='img', timestamp='12345', time=0)) + story.DB().add(story.Key(title='helo', mime_type='img', timestamp='12345', time=4)) + story.DB().add(story.Key(title='helo', mime_type='img', timestamp='12345', time=5)) + story.DB().add(story.Key(title='helo', mime_type='img', timestamp='12345', time=6)) + _all = [r for r in story.DB()._all()] + # duration update +2 + # print '+2 ...' + _all[0].update_duration(2) + _all = [r for r in story.DB()._all()] + self.assertEqual(len(_all), 6, + "should have 6 rows! found: %s" % _all) + self.assertEqual(_all[4].time, 1) + self.assertEqual(_all[5].time, 2) + self.assertEqual(_all[4].duration, 1) + self.assertEqual(_all[5].duration, 0) + self.assertEqual(_all[1].duration, 2, _all[1]) + self.assertEqual(_all[2].duration, 1, _all[2]) + self.assertEqual(_all[3].duration, 0, _all[3]) + # duration update +3 + # print '+3 ...' + _all[0].update_duration(3) + _all = [r for r in story.DB()._all()] + self.assertEqual(len(_all), 7, + "should have 7 rows! found: %s" % len(_all)) + self.assertEqual(_all[4].time, 1) + self.assertEqual(_all[5].time, 2) + self.assertEqual(_all[6].time, 3) + self.assertEqual(_all[4].duration, 2) + self.assertEqual(_all[5].duration, 1) + self.assertEqual(_all[6].duration, 0) + self.assertEqual(_all[1].duration, 2) + self.assertEqual(_all[2].duration, 1) + self.assertEqual(_all[3].duration, 0) + # duration update +5 + # print '+5 ...' + _all[0].update_duration(5) + _all = [r for r in story.DB()._all()] + self.assertEqual(len(_all), 7, + "should have 7 rows! found: %s" % len(_all)) + self.assertEqual(_all[4].time, 1) + self.assertEqual(_all[5].time, 2) + self.assertEqual(_all[6].time, 3) + self.assertEqual(_all[1].time, 4) + self.assertEqual(_all[2].time, 5) + self.assertEqual(_all[3].time, 6) + self.assertEqual(_all[4].duration, 4) + self.assertEqual(_all[5].duration, 3) + self.assertEqual(_all[6].duration, 2) + self.assertEqual(_all[1].duration, 1) + self.assertEqual(_all[2].duration, 0) + self.assertEqual(_all[3].duration, 0) + # duration update +3 again + # print '+3 again ...' + _all[0].update_duration(3) + _all = [r for r in story.DB()._all()] + self.assertEqual(len(_all), 5, + "should have 5 rows! found: %s" % _all) + self.assertEqual(_all[2].time, 1) + self.assertEqual(_all[3].time, 2) + self.assertEqual(_all[4].time, 3) + self.assertEqual(_all[1].time, 6) + self.assertEqual(_all[2].duration, 2) + self.assertEqual(_all[3].duration, 1) + self.assertEqual(_all[4].duration, 0) + self.assertEqual(_all[1].duration, 0) + # duration update +0 back + # print '+0 back ...' + _all[0].update_duration(0) + _all = [r for r in story.DB()._all()] + self.assertEqual(len(_all), 2, + "should have 4 rows! found: %s" % len(_all)) + self.assertEqual(_all[0].time, 0) + self.assertEqual(_all[1].time, 6) + self.assertEqual(_all[0].duration, 0) + self.assertEqual(_all[1].duration, 0) + def suite(): suite = unittest.TestSuite() @@ -64,6 +144,7 @@ def suite(): suite.addTest(TestDBStory('test_add')) suite.addTest(TestDBStory('test_get')) suite.addTest(TestDBStory('test_update')) + suite.addTest(TestDBStory('test_duration')) return suite -- cgit v0.9.1