# -*- coding: utf-8 -*-
"""Small helpers layered on a DB-API cursor, plus JSON serialization.

The handler classes wrap a cursor (by default one obtained from Django's
``connection``) and expose result rows as dictionaries keyed by column name.
"""
import datetime
import decimal
from types import *  # kept as in the original module; importers may rely on it

from django.conf import settings
from django.db import connection

try:
    from django.utils import simplejson
except ImportError:
    # django.utils.simplejson is no longer shipped by recent Django releases;
    # the standard-library json module offers the same dumps()/JSONEncoder API.
    import json as simplejson

try:
    from UserDict import UserDict
except ImportError:
    # Python 3 location of the same class.
    from collections import UserDict

try:
    from django.core.serializers.json import DjangoJSONEncoder
except ImportError:
    class DjangoJSONEncoder(simplejson.JSONEncoder):
        """
        JSONEncoder subclass that knows how to encode date/time and decimal types.
        """

        DATE_FORMAT = "%Y-%m-%d"
        TIME_FORMAT = "%H:%M:%S"

        def default(self, o):
            """Return a JSON-friendly value for dates, times and decimals."""
            # datetime.datetime must be tested before datetime.date because
            # it is a subclass of it.
            if isinstance(o, datetime.datetime):
                # Imported lazily so that a missing datetime_safe only affects
                # the date branches, not the import of this module.
                from django.utils import datetime_safe
                d = datetime_safe.new_datetime(o)
                return d.strftime("%s %s" % (self.DATE_FORMAT, self.TIME_FORMAT))
            if isinstance(o, datetime.date):
                from django.utils import datetime_safe
                d = datetime_safe.new_date(o)
                return d.strftime(self.DATE_FORMAT)
            if isinstance(o, datetime.time):
                return o.strftime(self.TIME_FORMAT)
            if isinstance(o, decimal.Decimal):
                return str(o)
            return super(DjangoJSONEncoder, self).default(o)


class DBHandlerJSONEncoder(DjangoJSONEncoder):
    """
    DjangoJSONEncoder subclass that knows how to encode UserDict types.
    """

    def default(self, obj):
        """Convert a UserDict to a plain dict; defer everything else.

        Values of a UserDict are passed through ``default`` as well, so
        nested UserDict instances are converted recursively.
        """
        if isinstance(obj, UserDict):
            return dict((k, self.default(v)) for k, v in obj.items())
        try:
            return DjangoJSONEncoder.default(self, obj)
        except TypeError:
            # Reached for values the parent encoder has no special case for
            # (e.g. str/int values met in the UserDict branch above): hand
            # them back unchanged.
            return obj


def json_serialize(obj):
    """Serialize ``obj`` to a JSON string without ASCII-escaping."""
    return simplejson.dumps(obj, ensure_ascii=False, cls=DBHandlerJSONEncoder)


def _format_query(query, params):
    """Render ``query``/``params`` for an error message; never raises."""
    if isinstance(params, list):
        # "%" treats a list as one single argument; a tuple is what is meant.
        params = tuple(params)
    try:
        return query % params
    except (TypeError, ValueError, KeyError):
        # query is None, or placeholders and params do not line up.
        return "%r %% %r" % (query, params)


class HandlerBase(object):
    """Base class holding a cursor and the column names of its last result."""

    _keys = None  # names of the columns returned by the query

    class DoesNotExist(Exception):
        """Raised by subclasses when a query yields no data."""
        pass

    def __init__(self, curs=None):
        """Use ``curs`` if given, otherwise open a cursor on ``connection``."""
        self.curs = curs or connection.cursor()

    def execute(self, query=None, params=()):
        """Run ``query`` (when given) and refresh the column names.

        With no query, nothing is executed but the column names are still
        read from the cursor, which lets several handlers share one cursor.
        Returns the cursor.
        """
        if query:
            self.curs.execute(query, params)
        if self.curs.description:  # empty for statements such as UPDATE
            self._keys = [d[0] for d in self.curs.description]
        return self.curs

    def dict_fetchone(self, query=None, params=()):
        """Return the next row as a ``{column: value}`` dict, or ``{}``."""
        self.execute(query, params)
        row = self.curs.fetchone()
        if row:
            return dict(zip(self._keys, row))
        return {}

    def dict_fetchall(self, query=None, params=()):
        """Return all remaining rows as a list of ``{column: value}`` dicts."""
        self.execute(query, params)
        keys = self._keys
        return [dict(zip(keys, row)) for row in self.curs.fetchall()]


class EntityDict(HandlerBase, UserDict):
    """Mapping loaded from the two columns selected by ``SELECT``."""

    SELECT = "select uid, nom from %s where statut=0"

    def __init__(self, tablename):
        HandlerBase.__init__(self)
        UserDict.__init__(self)
        self.load(tablename)

    def load(self, tablename):
        """Fill the mapping from ``tablename``.

        NOTE(review): the table name is interpolated straight into the SQL
        text, so it must come from trusted code, never from user input.

        Raises ``DoesNotExist`` (carrying the query text) when no row matches.
        """
        query = self.SELECT % tablename
        self.data = dict(self.execute(query).fetchall())
        if not self.data:
            raise self.DoesNotExist(query)

    def id_to_cle(self, id):
        "For compatibility of deref with GroEntityDict"
        return self[id]


class EntityRDict(EntityDict):
    """Same as EntityDict with the two selected columns swapped."""

    SELECT = "select nom, uid from %s where statut=0"


class EntitiesDict(UserDict):
    """Plain UserDict used for the module-level registries below."""
    pass


entities = EntitiesDict()
rentities = EntitiesDict()


class BaseDict(HandlerBase, UserDict):
    """Mapping holding one result row, keyed by column name."""

    type_ref = 'ref_%s'

    def __init__(self, query=None, params=(), curs=None):
        """Fetch one row from ``query`` (or from the shared ``curs``).

        Raises ``DoesNotExist`` when no row is available.
        """
        HandlerBase.__init__(self, curs)
        UserDict.__init__(self)
        self.data = self.dict_fetchone(query, params)
        if not self.data:
            # The message is built defensively: query may be None when the
            # row is read from an already-executed shared cursor.
            raise self.DoesNotExist(_format_query(query, params))

    def deref(self, *args):
        "Turn the keys 'ref_type_thing': id into 'type_thing': name"
        for table in args:
            # An argument is either a name, or a (key name, registry name) pair.
            if isinstance(table, tuple):
                table, tablename = table
            else:
                tablename = table
            ref_table = self.type_ref % table
            if ref_table in self:
                self[table] = entities[tablename].id_to_cle(self[ref_table])
                del self[ref_table]

    def update_from(self, query=None, params=()):
        """Merge one row of ``query`` into this mapping.

        Keys already present are left untouched. When the query yields no
        row there is nothing to merge and the mapping is left as is.
        """
        self.execute(query, params)
        row = self.curs.fetchone()
        if row is None:
            return
        for key, value in zip(self._keys, row):
            if key not in self:  # existing keys are never overwritten
                self[key] = value

    def serialize(self):
        """Return this row as a JSON string."""
        return json_serialize(dict(self))


class ListBaseDict(HandlerBase, list):
    """List of BaseDict, one per row of the query result."""

    def __init__(self, query=None, params=(), curs=None):
        HandlerBase.__init__(self, curs)
        list.__init__(self)
        self.update_from(query, params)

    def update_from(self, query=None, params=()):
        """Execute ``query`` and append one BaseDict per remaining row.

        Relies on the cursor exposing ``rownumber`` and ``rowcount``; each
        BaseDict reads its row from the shared cursor.
        """
        self.execute(query, params)
        while self.curs.rownumber < self.curs.rowcount:
            self.append(BaseDict(curs=self.curs))

    def serialize(self):
        """Return the rows as a JSON string."""
        return json_serialize(list(self))


class DictBaseDict(HandlerBase, UserDict):
    """Mapping of BaseDict rows, indexed by the value of one column."""

    def __init__(self, query=None, params=(), curs=None, key=None):
        HandlerBase.__init__(self, curs)
        UserDict.__init__(self)
        self.update_from(query, params, key)

    def update_from(self, query=None, params=(), key=None):
        """Execute ``query`` and store each row under ``row[key]``.

        ``key`` must name a column of the result; rows sharing the same
        value replace one another. Relies on the cursor exposing
        ``rownumber`` and ``rowcount``.
        """
        self.execute(query, params)
        while self.curs.rownumber < self.curs.rowcount:
            basedict = BaseDict(curs=self.curs)
            self[basedict[key]] = basedict

    def serialize(self):
        """Return the mapping as a JSON string."""
        return json_serialize(dict(self))