# -*- coding: utf-8 -*- from django.conf import settings from django.utils import simplejson from django.db import connection import datetime try: from django.core.serializers.json import DjangoJSONEncoder except: class DjangoJSONEncoder(simplejson.JSONEncoder): """ JSONEncoder subclass that knows how to encode date/time and decimal types. """ DATE_FORMAT = "%Y-%m-%d" TIME_FORMAT = "%H:%M:%S" def default(self, o): if isinstance(o, datetime.datetime): d = datetime_safe.new_datetime(o) return d.strftime("%s %s" % (self.DATE_FORMAT, self.TIME_FORMAT)) elif isinstance(o, datetime.date): d = datetime_safe.new_date(o) return d.strftime(self.DATE_FORMAT) elif isinstance(o, datetime.time): return o.strftime(self.TIME_FORMAT) else: return super(DjangoJSONEncoder, self).default(o) from types import * from UserDict import UserDict class DBHandlerJSONEncoder(DjangoJSONEncoder): """ DjangoJSONEncoder subclass that knows how to encode UserDict types. """ def default(self, obj): if isinstance(obj, UserDict): return dict([(k, self.default(v)) for k, v in obj.items()]) try: return DjangoJSONEncoder.default(self, obj) except TypeError: return obj def json_serialize(obj): return simplejson.dumps(obj, ensure_ascii=False, cls=DBHandlerJSONEncoder) class HandlerBase(object): _keys = None # les noms des champs que retourne la requete class DoesNotExist(Exception): pass def __init__(self, curs = None): self.curs = curs or connection.cursor() def execute(self, query=None, params = None): if not self.curs: self.curs = connection.cursor() if query: self.curs.execute(query, params or []) if self.curs.description: # cas update self._keys = [ d[0] for d in self.curs.description ] return self.curs def fetchone(self, query=None, params = None): self.execute(query, params) return self.curs.fetchone() def fetchall(self, query=None, params = None): self.execute(query, params) return self.curs.fetchall() def dict_fetchone(self, query=None, params = None): vals = self.fetchone(query, params) if vals: return dict(zip(self._keys, vals)) return {} def dict_fetchall(self, query=None, params = None): list_dict = [] for vals in self.fetchall(query, params): list_dict.append(dict(zip(self._keys, vals))) return list_dict def get_uid(self, sequenceName = 'object_uid_seq'): vals = self.fetchone("select nextval('%s'::text)" % sequenceName) return vals[0] def get_id(self, sequenceName = 'object_id_seq'): return self.get_uid(sequenceName) class EntityDict(HandlerBase, UserDict): # TODO : delitem and __keys !!! SELECT = "select uid, nom from %s where statut=0 order by uid" def __init__(self, tablename): HandlerBase.__init__(self) UserDict.__init__(self) self.load(tablename) def load(self, tablename): data = self.execute(self.SELECT % tablename).fetchall() if not data: raise self.DoesNotExist, self.SELECT % tablename self.__keys = [ key for (key, value) in data ] self.data = dict(data) def items(self): "ordered" return [ (key, self[key]) for key in self.__keys ] def keys(self): return self.__keys def values(self): return [ self[key] for key in self.__keys ] def __setitem__(self, key, value): if not self.has_key(key): self.__keys.append(key) UserDict.__setitem__(self, key, value) # def __delitem__(self, key): # pb de perfs .... # if self.has_key(key): # self.__keys.remove(key) # UserDict.__delitem__(self, key) def id_to_cle(self, id): "pour compatibilite de deref avec GroEntityDict" return self[id] class EntityRDict(EntityDict): SELECT = "select nom, uid from %s where statut=0 order by nom" class EntitiesDict(UserDict): pass entities = EntitiesDict() rentities = EntitiesDict() class BaseDict(HandlerBase, UserDict): type_ref = 'ref_%s' def __init__(self, query=None, params = (), curs=None): HandlerBase.__init__(self, curs) UserDict.__init__(self) self.data = self.dict_fetchone(query, params) if not self.data: raise self.DoesNotExist, query % tuple(params) self.entity_value = {} for tablename, entitydict in entities.items(): ref_key = self.type_ref % tablename if self.has_key(ref_key): self.entity_value[ref_key] = entitydict.get(self[ref_key]) def deref(self, *args): "Change les cles 'ref_type_machin':id en 'type_machin':nom" for table in args: if type(table) == TupleType: table, tablename = table else: tablename = table ref_table = self.type_ref % table if self.has_key(ref_table): self[table] = entities[tablename].id_to_cle(self[ref_table]) del self[self.type_ref % table] def update_from(self, query=None, params = ()): self.execute(query, params) for key, value in zip(self._keys, self.curs.fetchone()): if not self.has_key(key): # on ne change pas les cles existantes self[key] = value def serialize(self): return json_serialize(dict(self)) class ListBaseDict(HandlerBase, list): def __init__(self, query=None, params = (), curs=None): HandlerBase.__init__(self, curs) list.__init__(self) self.update_from(query, params) def update_from(self, query=None, params = ()): self.execute(query, params) while self.curs.rownumber < self.curs.rowcount: self.append(BaseDict(curs=self.curs)) def serialize(self): return json_serialize(list(self)) class DictBaseDict(HandlerBase, UserDict): def __init__(self, query=None, params = (), curs=None, key=None): HandlerBase.__init__(self, curs) UserDict.__init__(self) self.update_from(query, params, key) def update_from(self, query=None, params = (), key=None): self.execute(query, params) while self.curs.rownumber < self.curs.rowcount: basedict = BaseDict(curs=self.curs) self[basedict[key]] = basedict def serialize(self): return json_serialize(dict(self))