# Source code for piecash.core.session

import datetime
import os
import shutil
import socket
from collections import defaultdict

from sqlalchemy import event, Column, VARCHAR, INTEGER, Table, PrimaryKeyConstraint
from sqlalchemy.sql.ddl import DropConstraint, DropIndex
from sqlalchemy_utils import database_exists

from piecash.core import factories
from .book import Book
from .._common import GnucashException
from ..sa_extra import create_piecash_engine, DeclarativeBase, Session

# version of tables changed between 2.6 and 3.0
#   ('invoices', 4)
#   ('prices', 3)
#   ('slots', 4)
#   ('transactions', 4)
#   ('entries', 4)

def _derive_versions(base, **changes):
    """Return a copy of ``base`` with the given entries overridden.

    Overridden keys keep the position they have in ``base``, so every derived
    mapping lists its entries in the same order as the table it starts from.
    """
    derived = dict(base)
    derived.update(changes)
    return derived


# Content of the 'versions' table for the "2.6" format. The later formats are
# expressed as deltas on top of the previous one.
_VERSIONS_2_6 = {
    "Gnucash": 2062100,
    "Gnucash-Resave": 19920,
    "accounts": 1,
    "billterms": 2,
    "books": 1,
    "budget_amounts": 1,
    "budgets": 1,
    "commodities": 1,
    "customers": 2,
    "employees": 2,
    "entries": 3,
    "invoices": 3,
    "jobs": 1,
    "lots": 2,
    "orders": 1,
    "prices": 2,
    "recurrences": 2,
    "schedxactions": 1,
    "slots": 3,
    "splits": 4,
    "taxtable_entries": 3,
    "taxtables": 2,
    "transactions": 3,
    "vendors": 1,
}

# "3.0" differs from "2.6" on entries, invoices, prices, slots, transactions.
_VERSIONS_3_0 = _derive_versions(
    _VERSIONS_2_6,
    Gnucash=3000000,
    entries=4,
    invoices=4,
    prices=3,
    slots=4,
    transactions=4,
)

# "3.7" differs from "3.0" on splits.
_VERSIONS_3_7 = _derive_versions(_VERSIONS_3_0, Gnucash=3000001, splits=5)

# "4.1" differs from "3.7" only on the "Gnucash" entry.
_VERSIONS_4_1 = _derive_versions(_VERSIONS_3_7, Gnucash=4000001)

# Maps each known format label to the rows expected in the 'versions' table.
version_supported = {
    "2.6": _VERSIONS_2_6,
    "3.0": _VERSIONS_3_0,
    "3.7": _VERSIONS_3_7,
    "4.1": _VERSIONS_4_1,
}

# The lock table is declared as a plain ``Table`` rather than as a declarative
# class because it is used before a session is bound to an engine (open_book
# selects from it directly through the engine).
# Each row holds the hostname and process id written by ``create_lock``.
gnclock = Table(
    u"gnclock",
    DeclarativeBase.metadata,
    Column("hostname", VARCHAR(length=255)),
    Column("pid", INTEGER()),
)


class Version(DeclarativeBase):
    """Declarative mapping of the 'versions' table.

    Each row pairs a name (``table_name``) with an integer version
    (``table_version``).
    """

    __tablename__ = "versions"

    __table_args__ = {}

    # column definitions
    #: The name of the table
    table_name = Column(
        "table_name",
        VARCHAR(length=50),
        primary_key=True,
        nullable=False,
    )
    #: The version for the table
    table_version = Column(
        "table_version",
        INTEGER(),
        nullable=False,
    )

    def __init__(self, table_name, table_version):
        """Store the name/version pair on the new row."""
        self.table_name, self.table_version = table_name, table_version

    def __str__(self):
        """Return a short ``Version<name=version>`` description."""
        template = "Version<{}={}>"
        return template.format(self.table_name, self.table_version)
def build_uri(
    sqlite_file=None,
    uri_conn=None,
    db_type=None,
    db_user=None,
    db_password=None,
    db_name=None,
    db_host=None,
    db_port=None,
    check_same_thread=True,
):
    """Create the connection string in function of some choices.

    Three mutually exclusive ways of describing the database are supported:
    the ``db_*`` arguments (postgres/mysql), a ready-made ``uri_conn``, or a
    ``sqlite_file`` path. When none is given, an in-memory sqlite URI is
    returned.

    :param str sqlite_file: a path to an sqlite3 file (only used if uri_conn is None)
    :param str uri_conn: a sqlalchemy connection string
    :param str db_type: type of database in ["postgres","mysql"]
    :param str db_user: username of database
    :param str db_password: password for the use of database
    :param str db_name: name of database
    :param str db_host: host of database
    :param int db_port: port of database
    :param bool check_same_thread: sqlite flag that restricts connection use to
        the thread that created (see False for use in ipython/flask/... but read
        first https://docs.python.org/3/library/sqlite3.html)

    :return: the connection string
    :rtype: str
    :raises ValueError: if only some of db_type/db_host/db_port/db_name/db_user
        are given, or if both sqlite_file and uri_conn are given
    :raises KeyError: if db_type is neither "postgres" nor "mysql"
    """
    db_config = (db_type, db_host, db_port, db_name, db_user, db_password)
    # The flags must be a real sequence: a lazy ``map`` object would be partly
    # consumed by ``any`` and leave ``all`` inspecting only the leftovers, so
    # incomplete configurations would slip through the check below.
    # db_password (the last item) is not part of the completeness check.
    db_config_isdefined = [item is not None for item in db_config[:-1]]

    if any(db_config_isdefined):
        if not all(db_config_isdefined):
            raise ValueError(
                "When using db_* arguments, all must be specified : {}".format(
                    db_config
                )
            )

        uri_conn = {
            "postgres": "postgresql://{username}:{password}@{host}:{port}/{name}",
            "mysql": "mysql+pymysql://{username}:{password}@{host}:{port}/{name}?charset=utf8",
        }[db_type].format(
            username=db_user,
            password=db_password,
            host=db_host,
            port=db_port,
            name=db_name,
        )

    if sqlite_file and uri_conn:
        raise ValueError(
            "Only one of 'sqlite_file' or 'uri_conn' argument can be defined"
        )

    if uri_conn is None:
        # fallback on sqlite
        if sqlite_file:
            if isinstance(sqlite_file, str) and sqlite_file.startswith("sqlite:///"):
                # already have the protocol specified.
                uri_conn = sqlite_file
            else:
                uri_conn = "sqlite:///{}".format(sqlite_file)
        else:
            uri_conn = "sqlite:///:memory:"

        # the flag is appended to the sqlite URI built above
        if check_same_thread is False:
            uri_conn = uri_conn + "?check_same_thread=False"

    return uri_conn
def create_book(
    sqlite_file=None,
    uri_conn=None,
    currency="EUR",
    overwrite=False,
    keep_foreign_keys=False,
    db_type=None,
    db_user=None,
    db_password=None,
    db_name=None,
    db_host=None,
    db_port=None,
    check_same_thread=True,
    pg_template="template0",
    **kwargs
):
    """Create a new empty GnuCash book.

    If both sqlite_file and uri_conn are None, then an "in memory" sqlite book
    is created.

    :param str sqlite_file: a path to an sqlite3 file (only used if uri_conn is None)
    :param str uri_conn: a sqlalchemy connection string
    :param str currency: the ISO symbol of the default currency of the book
    :param bool overwrite: True if book should be deleted and recreated if it
        exists already
    :param bool keep_foreign_keys: True if the foreign keys should be kept
        (may not work at all with GnuCash)
    :param str db_type: type of database in ["postgres","mysql"]
    :param str db_user: username of database
    :param str db_password: password for the use of database
    :param str db_name: name of database
    :param str db_host: host of database
    :param int db_port: port of database
    :param bool check_same_thread: sqlite flag that restricts connection use to
        the thread that created (see False for use in ipython/flask/... but read
        first https://docs.python.org/3/library/sqlite3.html)
    :param str pg_template: the postgres template to use when creating the
        database. One of template1 or template0 (default template0).
        Irrelevant for other databases than postgres.
    :param kwargs: extra keyword arguments forwarded to create_piecash_engine

    :return: the newly created book, attached to its session
    :rtype: :class:`Book`
    :raises GnucashException: if document already exists and overwrite is False
    """
    from sqlalchemy_utils.functions import (
        database_exists,
        create_database,
        drop_database,
    )

    VERSION_FORMAT = "3.0"

    uri_conn = build_uri(
        sqlite_file=sqlite_file,
        uri_conn=uri_conn,
        db_type=db_type,
        db_user=db_user,
        db_password=db_password,
        db_name=db_name,
        db_host=db_host,
        db_port=db_port,
        check_same_thread=check_same_thread,
    )

    # An in-memory sqlite database needs no explicit creation; anything else
    # is (re)created here.
    in_memory = uri_conn == "sqlite:///:memory:"
    if not in_memory:
        if database_exists(uri_conn):
            if not overwrite:
                raise GnucashException("'{}' db already exists".format(uri_conn))
            drop_database(uri_conn)
        create_database(uri_conn, template=pg_template)

    engine = create_piecash_engine(uri_conn, **kwargs)

    # Unless the caller asked to keep them (keep_foreign_keys=True), register
    # the DDL that drops indexes and non primary key constraints.
    if not keep_foreign_keys:
        for table in DeclarativeBase.metadata.tables.values():
            # indexes named "ix_*" or "_*" are dropped right after creation
            for index in table.indexes:
                if index.name.startswith(("ix_", "_")):
                    event.listen(table, "after_create", DropIndex(index), once=True)

            # every constraint but the primary key gets a DropConstraint
            # attached to the table's "before_drop" event
            for constraint in table.constraints:
                if not isinstance(constraint, PrimaryKeyConstraint):
                    event.listen(
                        table, "before_drop", DropConstraint(constraint), once=True
                    )

    # create all (tables, fk, ...)
    DeclarativeBase.metadata.create_all(engine)

    s = Session(bind=engine)

    # fill the 'versions' table with the rows of the chosen format
    assert VERSION_FORMAT in version_supported, (
        "The 'version_format'={} is not supported. "
        "Choose one of {}".format(VERSION_FORMAT, list(version_supported))
    )
    s.add_all(
        [
            Version(table_name=name, table_version=number)
            for name, number in version_supported[VERSION_FORMAT].items()
        ]
    )

    # create the book and tie it to the session
    b = Book()
    s.add(b)
    adapt_session(s, book=b, readonly=False)

    # create the default currency and the two root accounts
    from .account import Account

    default_currency = factories.create_currency_from_ISO(currency)
    b.root_account = Account(
        name="Root Account",
        type="ROOT",
        commodity=default_currency,
        book=b,
    )
    b.root_template = Account(
        name="Template Root",
        type="ROOT",
        commodity=None,
        book=b,
    )
    b.save()

    return b
def open_book(
    sqlite_file=None,
    uri_conn=None,
    readonly=True,
    open_if_lock=False,
    do_backup=True,
    db_type=None,
    db_user=None,
    db_password=None,
    db_name=None,
    db_host=None,
    db_port=None,
    check_same_thread=True,
    check_exists=True,
    **kwargs
):
    """Open an existing GnuCash book.

    :param str sqlite_file: a path to an sqlite3 file (only used if uri_conn is None)
    :param str uri_conn: a sqlalchemy connection string
    :param bool readonly: open the file as readonly (useful to play with and
        avoid any unwanted save)
    :param bool open_if_lock: open the file even if it is locked by another
        user (using open_if_lock=True with readonly=False is not recommended)
    :param bool do_backup: do a backup if the file written in RW (i.e.
        readonly=False) (this only works with the sqlite backend and copy the
        file with .{:%Y%m%d%H%M%S}.gnucash appended to it)
    :param str db_type: type of database in ["postgres","mysql"]
    :param str db_user: username of database
    :param str db_password: password for the use of database
    :param str db_name: name of database
    :param str db_host: host of database
    :param int db_port: port of database
    :param bool check_same_thread: sqlite flag that restricts connection use to
        the thread that created (see False for use in ipython/flask/... but read
        first https://docs.python.org/3/library/sqlite3.html)
    :param bool check_exists: check if the database exists before connecting
    :param kwargs: extra keyword arguments forwarded to create_piecash_engine

    :return: the book stored in the database, attached to its session
    :rtype: :class:`Book`
    :raises ValueError: if the URI designates an in-memory sqlite database or
        if the table versions match no known format
    :raises GnucashException: if the document does not exist
    :raises GnucashException: if a backup is requested on a non sqlite engine
    :raises GnucashException: if there is a lock on the file and open_if_lock
        is False
    """
    uri_conn = build_uri(
        sqlite_file=sqlite_file,
        uri_conn=uri_conn,
        db_type=db_type,
        db_user=db_user,
        db_password=db_password,
        db_name=db_name,
        db_host=db_host,
        db_port=db_port,
        check_same_thread=check_same_thread,
    )

    if uri_conn == "sqlite:///:memory:":
        raise ValueError(
            "An in memory sqlite gnucash databook cannot be opened, it should be created"
        )

    # check if the database exists
    if check_exists and not database_exists(uri_conn):
        raise GnucashException(
            "Database '{}' does not exist (please use create_book to create "
            "GnuCash books from scratch). If you want to bypass this existence check, "
            "use the argument check_exists=False.".format(uri_conn)
        )

    engine = create_piecash_engine(uri_conn, **kwargs)

    # a backup copy is made only when the book is opened for writing
    wants_backup = do_backup and not readonly
    if wants_backup:
        if engine.name != "sqlite":
            raise GnucashException(
                "Cannot do a backup for engine '{}'. Do yourself a backup and then specify do_backup=False".format(
                    engine.name
                )
            )

        # strip the scheme and the optional thread flag to get the file path
        url = uri_conn[len("sqlite:///") :].replace("?check_same_thread=False", "")
        suffix = ".{:%Y%m%d%H%M%S}.gnucash".format(datetime.datetime.now())
        shutil.copyfile(url, url + suffix)

    # any row in the lock table means the book is locked
    locks = list(engine.execute(gnclock.select()))
    if locks and not open_if_lock:
        raise GnucashException("Lock on the file")

    s = Session(bind=engine)

    def _without_gnucash_entries(versions):
        # keep only the entries whose name does not contain "Gnucash"
        return {
            name: number for name, number in versions.items() if "Gnucash" not in name
        }

    # versions stored in the book, to be matched against the known formats
    version_book = {
        row.table_name: row.table_version
        for row in s.query(Version).all()
        if "Gnucash" not in row.table_name
    }

    # the first known format with identical table versions wins
    version = next(
        (
            label
            for label, tables in version_supported.items()
            if version_book == _without_gnucash_entries(tables)
        ),
        None,
    )
    if version is None:
        raise ValueError("Unsupported table versions")

    assert version in ("3.0", "3.7", "4.1"), (
        "This version of piecash only support books from gnucash (3.0|3.7|4.1) "
        "which is not the case for {}".format(uri_conn)
    )

    book = s.query(Book).one()
    adapt_session(s, book=book, readonly=readonly)

    return book
def adapt_session(session, book, readonly):
    """Change the SA session object to add some features.

    The session is modified in place:

    - ``session.book`` / ``book.session`` / ``book.uri`` link both objects;
    - when ``readonly`` is true, ``session.commit`` is replaced by a function
      that always raises :class:`GnucashException`;
    - ``session.create_lock()`` / ``session.delete_lock()`` insert / delete
      the row of the current host and process in the ``gnclock`` table and
      commit;
    - ``session._is_modified`` is updated on flush and reset on
      commit/rollback;
    - an ``is_saved`` property is installed on the class of the session.

    :param session: the SA session object that will be modified in place
    :param book: the gnucash singleton book linked to the SA session
    :param readonly: True if the session should not allow commits.
    :return: None
    """
    # link session and book together
    book.session = session
    session.book = book
    book.uri = session.bind.url

    # add logic to make session readonly
    def readonly_commit(*args, **kwargs):
        raise GnucashException("You cannot change the DB, it was opened as readonly!")

    if readonly:
        session.commit = readonly_commit

    # add logic to create/delete GnuCash locks
    def delete_lock():
        # The two conditions must be combined with ``&``: Python's ``and``
        # cannot be overloaded, it evaluates the truthiness of the first
        # comparison and returns a single operand, which silently drops one
        # of the two filters and removes locks owned by other processes.
        own_lock = (gnclock.c.hostname == socket.gethostname()) & (
            gnclock.c.pid == os.getpid()
        )
        session.execute(gnclock.delete().where(own_lock))
        session.commit()

    session.delete_lock = delete_lock

    def create_lock():
        session.execute(
            gnclock.insert().values(hostname=socket.gethostname(), pid=os.getpid())
        )
        session.commit()

    session.create_lock = create_lock

    # add logic to track if a session has been modified or not
    session._is_modified = False
    session._all_changes = {}

    @event.listens_for(session, "after_flush")
    def receive_after_flush(session, flush_context):
        # remember that something was flushed since the last commit/rollback
        session._is_modified = not session.is_saved

    @event.listens_for(session, "after_commit")
    @event.listens_for(session, "after_rollback")
    def init_session_status(session, *args, **kwargs):
        # a commit or a rollback brings the session back to a clean state
        session._is_modified = False
        session._all_changes.clear()
        session.book.session_changes = defaultdict(list)

    # the property is set on the class of the session (properties do not work
    # on instances), so it is re-installed at each call
    session.__class__.is_saved = property(
        fget=lambda self: not (
            self._is_modified or self.dirty or self.deleted or self.new
        ),
        doc="True if nothing has yet been changed (False otherwise)",
    )
# Hooks registered on ``Session`` itself (not on a single session object):
# Book.validate_book is called on "before_commit" and Book.track_dirty on
# "before_flush".
event.listen(Session, "before_commit", Book.validate_book)
event.listen(Session, "before_flush", Book.track_dirty)