Source code for piecash.kvp

import datetime
import decimal
import sys
import uuid
from enum import Enum
from importlib import import_module

from sqlalchemy import Column, VARCHAR, INTEGER, REAL, BIGINT, types, event, Index
from sqlalchemy.orm import relation, foreign, object_session, backref

from ._common import CallableList
from ._common import hybrid_property_gncnumeric
from .sa_extra import _DateTime, DeclarativeBase, _Date


[docs]class KVP_Type(Enum): KVP_TYPE_INVALID = -1 KVP_TYPE_GINT64 = 1 KVP_TYPE_DOUBLE = 2 KVP_TYPE_NUMERIC = 3 KVP_TYPE_STRING = 4 KVP_TYPE_GUID = 5 KVP_TYPE_TIMESPEC = 6 KVP_TYPE_BINARY = 7 KVP_TYPE_GLIST = 8 KVP_TYPE_FRAME = 9 KVP_TYPE_GDATE = 10
pytype_KVPtype = { int: KVP_Type.KVP_TYPE_GINT64, float: KVP_Type.KVP_TYPE_DOUBLE, decimal.Decimal: KVP_Type.KVP_TYPE_NUMERIC, dict: KVP_Type.KVP_TYPE_FRAME, list: KVP_Type.KVP_TYPE_GLIST, # to fill } KVPtype_fields = { KVP_Type.KVP_TYPE_GINT64: "int64_val", KVP_Type.KVP_TYPE_DOUBLE: "double_val", KVP_Type.KVP_TYPE_STRING: "string_val", KVP_Type.KVP_TYPE_GUID: "guid_val", KVP_Type.KVP_TYPE_TIMESPEC: "timespec_val", KVP_Type.KVP_TYPE_GDATE: "gdate_val", KVP_Type.KVP_TYPE_NUMERIC: ("numeric_val_num", "numeric_val_denom"), KVP_Type.KVP_TYPE_FRAME: "guid", KVP_Type.KVP_TYPE_GLIST: "guid", }
[docs]class SlotType(types.TypeDecorator): """Used to customise the DateTime type for sqlite (ie without the separators as in gnucash""" impl = INTEGER
[docs] def process_bind_param(self, value, dialect): if value is not None: return value.value
[docs] def process_result_value(self, value, dialect): if value is not None: return KVP_Type(value)
class DictWrapper(object): def __contains__(self, key): for sl in self.slots: if sl.name == key: return True else: return False def __getitem__(self, key): assert not isinstance( key, int ), "You are accessing slots with an integer (={}) while a string is expected".format( key ) keys = key.split("/", 1) key = keys[0] for sl in self.slots: if sl.name == key: break else: raise KeyError("No slot exists with name '{}'".format(key)) if len(keys) > 1: return sl[keys[1]] else: return sl # .value def __setitem__(self, key, value): keys = key.split("/", 1) key = keys[0] for sl in self.slots: if sl.name == key: break else: # new key if len(keys) > 1: if isinstance(self, SlotFrame): sf = SlotFrame(name=self._name + "/" + key, obj_guid=self.guid_val) else: sf = SlotFrame(name=key, obj_guid=self.guid) sf[keys[1]] = value self.slots.append(sf) else: self.slots.append(slot(parent=self, name=key, value=value)) return if len(keys) > 1: sl[keys[1]] = value return # assign if type is correct if isinstance(value, sl._python_type): sl.value = value else: raise TypeError( "Type of '{}' is not one of {}".format(value, sl._python_type) ) def __delitem__(self, key): if isinstance(key, slice): # delete all del self.slots[key] return keys = key.split("/", 1) for i, sl in enumerate(self.slots): if sl.name == keys[0]: break else: raise KeyError("No slot exists with name '{}'".format(key)) if len(keys) > 1: del sl[keys[1]] else: del self.slots[i] def iteritems(self): for sl in self.slots: yield sl.name, sl def get(self, key, default=None): try: return self[key].value except KeyError: return default class Slot(DeclarativeBase): __tablename__ = "slots" __table_args__ = ( Index("slots_guid_index", "obj_guid"), { "sqlite_autoincrement": True, }, ) # column definitions id = Column("id", INTEGER(), primary_key=True, nullable=False, autoincrement=True) obj_guid = Column("obj_guid", VARCHAR(length=32), nullable=False) _name = Column("name", VARCHAR(length=4096), nullable=False) @property def name(self): if self._name: return self._name.split("/")[-1] else: return self._name @name.setter def name(self, value): self._name = value slot_type = Column("slot_type", SlotType(), nullable=False) __mapper_args__ = { "polymorphic_on": slot_type, } def __init__(self, name, value=None, obj_guid=None): self.name = name if value is not None: self.value = value if obj_guid is not None: self.obj_guid = obj_guid def __str__(self): return "<{} {}={!r}>".format(self.__class__.__name__, self.name, self.value) class SlotSimple(Slot): __mapper_args__ = { "polymorphic_identity": -1, } _python_type = () @property def value(self): return getattr(self, self._field) @value.setter def value(self, value): setattr(self, self._field, value) def __eq__(self, other): return ( isinstance(other, self.__class__) and self.name == other.name and self.value == other.value ) def define_simpleslot(postfix, pytype, KVPtype, field, col_type, col_default): cls = type( "Slot{}".format(postfix), (SlotSimple,), { "__mapper_args__": {"polymorphic_identity": KVPtype}, field: Column(field, col_type, default=col_default), "_field": field, "_python_type": pytype, }, ) return cls SlotInt = define_simpleslot( postfix="Int", pytype=(int,), KVPtype=KVP_Type.KVP_TYPE_GINT64, field="int64_val", col_type=BIGINT(), col_default=0, ) SlotString = define_simpleslot( postfix="String", pytype=(str,), KVPtype=KVP_Type.KVP_TYPE_STRING, field="string_val", col_type=VARCHAR(length=4096), col_default=None, ) SlotDouble = define_simpleslot( postfix="Double", pytype=(float,), KVPtype=KVP_Type.KVP_TYPE_DOUBLE, field="double_val", col_type=REAL(), col_default=0, ) SlotTime = define_simpleslot( postfix="Time", pytype=(datetime.time,), KVPtype=KVP_Type.KVP_TYPE_TIMESPEC, field="timespec_val", col_type=_DateTime(), col_default=None, ) class SlotFrame(DictWrapper, Slot): __mapper_args__ = {"polymorphic_identity": KVP_Type.KVP_TYPE_FRAME} _python_type = (dict,) guid_val = Column("guid_val", VARCHAR(length=32)) slots = relation( "Slot", primaryjoin=foreign(Slot.obj_guid) == guid_val, cascade="all, delete-orphan", collection_class=CallableList, single_parent=True, backref=backref("parent", remote_side=guid_val), ) @property def value(self): # convert to dict return {sl.name: sl.value for sl in self.slots} @value.setter def value(self, value): self.slots = [slot(parent=self, name=k, value=v) for k, v in value.items()] def __init__(self, **kwargs): self.guid_val = uuid.uuid4().hex super(SlotFrame, self).__init__(**kwargs) class SlotList(SlotFrame): __mapper_args__ = {"polymorphic_identity": KVP_Type.KVP_TYPE_GLIST} _python_type = (list,) @property def value(self): # convert to dict return [sl.value for sl in self.slots] @value.setter def value(self, value): self.slots = [ slot(parent=self, name=str(i), value=v) for i, v in enumerate(value) ] def __init__(self, **kwargs): self.guid_val = uuid.uuid4().hex super(SlotFrame, self).__init__(**kwargs) @event.listens_for(SlotFrame.slots, "remove") def remove_slot(target, value, initiator): s = object_session(value) if value in s.new: s.expunge(value) else: s.delete(value) class SlotGUID(SlotFrame): __mapper_args__ = {"polymorphic_identity": KVP_Type.KVP_TYPE_GUID} _python_type = (DeclarativeBase,) # add _mapping_name_class = { "from-sched-xaction": "piecash.core.transaction.ScheduledTransaction", "account": "piecash.core.account.Account", "invoice-guid": "piecash.business.invoice.Invoice", "peer_guid": "piecash.core.transaction.Split", "gains-split": "piecash.core.transaction.Split", "gains-source": "piecash.core.transaction.Split", "default-currency": "piecash.core.commodity.Commodity", } @property def Class(self): name, guid = self.name, self.guid_val if name.startswith("CURRENCY::"): # handle capital gain account class_to_retrieve = "piecash.core.account.Account" else: class_to_retrieve = self._mapping_name_class.get(name, None) if class_to_retrieve is None: raise ValueError( "Smart retrieval of GUID slot with name '{}' is not yet supported. " "Need to retrieve proper object type in kvp module " "(add in SlotGUID._mapping_name_class)".format(name) ) class_module, class_name = class_to_retrieve.rsplit(".", 1) mod = import_module(class_module) Class = getattr(mod, class_name) return Class @property def value(self): return ( object_session(self).query(self.Class).filter_by(guid=self.guid_val).one() ) @value.setter def value(self, value): assert isinstance(value, self.Class) self.guid_val = value.guid def get_all_subclasses(cls): all_subclasses = [] direct_subclasses = cls.__subclasses__() all_subclasses.extend(direct_subclasses) for subclass in direct_subclasses: all_subclasses.extend(get_all_subclasses(subclass)) return all_subclasses def slot(parent, name, value): if isinstance(parent, SlotFrame): name = parent._name + "/" + name guid_parent = parent.guid_val else: guid_parent = parent.guid # handle datetime before others (as otherwise can be mixed with date) if isinstance(value, datetime.datetime): return SlotTime(name=name, value=value, obj_guid=guid_parent) for cls in get_all_subclasses(Slot): if isinstance(value, cls._python_type) and cls != SlotFrame and cls != SlotList: return cls(name=name, value=value, obj_guid=guid_parent) if isinstance(value, dict): # transform a dict to Frame/Slots sf = SlotFrame(name=name, obj_guid=guid_parent) for k, v in value.items(): sl = slot(parent=sf, name=k, value=v) sl.parent = sf return sf if isinstance(value, list): # transform a list to List/Slots sf = SlotList(name=name) for i, v in enumerate(value): sl = slot(parent=sf, name=str(i), value=v) sl.parent = sf return sf raise ValueError("Cannot handle type of '{}'".format(value)) class SlotNumeric(Slot): __mapper_args__ = {"polymorphic_identity": KVP_Type.KVP_TYPE_NUMERIC} _python_type = (tuple, decimal.Decimal) _numeric_val_num = Column("numeric_val_num", BIGINT(), nullable=True, default=0) _numeric_val_denom = Column("numeric_val_denom", BIGINT(), nullable=True, default=1) value = hybrid_property_gncnumeric(_numeric_val_num, _numeric_val_denom) SlotDate = define_simpleslot( postfix="Date", pytype=(datetime.date,), KVPtype=KVP_Type.KVP_TYPE_GDATE, field="gdate_val", col_type=_Date(), col_default=None, )