""" core library for house accounts """ import psycopg; import datetime; class UnbalancedTransactionError(Exception): """ Exception raised when an unbalanced transaction is passed to postTransaction() """ pass class HouseAccts(object): # Zope's security model is strict to the point of being essentially # unusable; accessing the date attribute on a DateTime object is # impossible without this. TODO: Find a way to remove this. __allow_access_to_unprotected_subobjects__ = 1 def __init__(self, dsn='dbname=houseaccts user=zope'): self.__db = psycopg.connect(dsn) def db(self): return self.__db def getUsers(self, active_only=False, visible_only=False): """Get all the users and their info""" c = self.db().cursor() if active_only: c.execute("""select id, full_name, email from "user" where active and visible order by full_name""") elif visible_only: c.execute("""select id, full_name, email from "user" where visible order by full_name""") else: c.execute("""select id, full_name, email from "user" order by full_name""") users = [] for r in c.fetchall(): users.append({'id': r[0], 'full_name': r[1], 'email': r[2]}) return users def getUser(self, id): """ Return details for a single user """ c = self.db().cursor() c.execute("""select id, full_name, email from "user" where id = %s""", (id,)) r = c.fetchall()[0] return {'id': r[0], 'full_name': r[1], 'email': r[2]} def getBalance(self, user, date=None): """ Return balance for a user as of the beginning of date; default date is tomorrow """ if date is None: date = datetime.date.today() + datetime.timedelta(days=1) c = self.db().cursor() c.execute("""select sum(ts.debit_amt) as bal from transaction_split ts inner join transaction t on t.id = ts.id where ts."user" = %s and t.posted < %s """, (user, str(date))) bal = c.fetchall()[0][0] if bal is None: bal = 0.0 return bal def getBalances(self, date=None): """ Get balances for all the users if date is specified, the balances are computed as of that date, otherwise, balances are computed as of the end of today """ if date is None: date = datetime.date.today() + datetime.timedelta(days=1) c = self.db().cursor() c.execute("""select "user", u.full_name, sum(debit_amt) as balance from transaction_split ts inner join transaction t on t.id = ts.id left join "user" u on u.id = ts."user" where posted < %s group by "user", u."full_name" order by balance """, (str(date),)) bals = [] for r in c.fetchall(): bals.append({'id': r[0], 'full_name': r[1], 'balance': r[2]}) return bals def getTransactions(self, start=None, end=None, id=None, filter=None): """ Get all transactions as a list of Transaction objects If start is specified, only returns transactions since that date. If end is specified, only returns transactions up to that date. If filter is specified, only returns transactions with description containing that string. Filter is applied case-insensitively. """ sql_constraints = [] if id is not None: sql_constraints.append("id = %i" % int(id)) if start is not None: sql_constraints.append("posted >= '%s'" % start) if end is not None: sql_constraints.append("posted < '%s'" % end) if filter is not None: sql_constraints.append("lower(description) like lower('%%%s%%')" % filter) sql_constraints_str = "" if len(sql_constraints) > 0: sql_constraints_str = "where " + " and ".join(sql_constraints) txns = [] c = self.db().cursor() c.execute("""select id, posted, description, creator from transaction %s order by posted """ % (sql_constraints_str,)) txn_recs = c.fetchall() for r in txn_recs: txn = {'splits': []} (txn['id'], txn['posted'], txn['description'], txn['creator']) = r txn['description'] = txn['description'].decode('UTF-8') txn['posted_date'] = txn['posted'].strftime('%Y-%m-%d') txn['posted_type'] = repr(type(txn['posted'])) # XXX c.execute("""select split, "user", debit_amt from transaction_split where id=%s order by debit_amt """, (txn['id'],)) for s in c.fetchall(): txn['splits'].append({'id': s[0], 'user': s[1], 'debit_amt': s[2]}) txns.append(txn) return txns def getTransaction(self, id): """ Return a single transaction with specified id """ return self.getTransactions(id=id)[0] def getEarliestTxnDate(self, user): c = self.db().cursor() c.execute("""select min(posted) from transaction_split ts inner join transaction t on t.id = ts.id where ts."user" = %s """, (user,)) return c.fetchall()[0][0] def getStatement(self, user, year=None, month=None): today = datetime.date.today() if year is None: year, month = today.year, today.month start_date = datetime.date(year=year, month=month, day=1) end_year, end_month = year, month + 1 if end_month > 12: end_year, end_month = end_year + 1, 1 end_date = datetime.date(year=end_year, month=end_month, day=1) is_complete = True if end_date > today: is_complete = False start_bal = self.getBalance(user, start_date) end_bal = self.getBalance(user, end_date) running_bal = start_bal tot_credit, tot_debit = 0, 0 txns = self.getTransactions(start=start_date, end=end_date) for txn in txns: credit, debit = 0, 0 txn['posted_date'] = txn['posted'].strftime('%Y-%m-%d') txn['mysplits'] = [s for s in txn['splits'] if s['user'] == user] for s in txn['mysplits']: running_bal += s['debit_amt'] s['balance'] = running_bal if s['debit_amt'] < 0: credit += -1 * s['debit_amt'] s['credit_display'] = -1 * s['debit_amt'] s['debit_display'] = 0 else: debit += s['debit_amt'] s['credit_display'] = 0 s['debit_display'] = s['debit_amt'] txn['mysummary'] = {'credit': credit, 'debit': debit} txn['running_bal'] = running_bal tot_credit += credit tot_debit += debit check_diff = running_bal - end_bal assert check_diff < 1e-10, "off by %.10f" % check_diff user_info = self.getUser(user) statement = { 'is_complete': is_complete, 'user': user, 'user_full_name': user_info['full_name'], 'user_email': user_info['email'], 'start_date': start_date, 'end_date': end_date, 'end_date_incl': end_date - datetime.timedelta(days=1), 'period_txt': start_date.strftime('%B %Y'), 'start_bal': start_bal, 'tot_credit': tot_credit, 'tot_debit': tot_debit, 'end_bal': end_bal, 'transactions': [t for t in txns if len(t['mysplits']) > 0], } return statement def postTransaction(self, txn): """ Post a transaction """ # double precision numbers accumulate a small amount of error; tolerate # this as long as it's very small, raise an exception if it's too big t = sum([s['debit_amt'] for s in txn['splits']]) if abs(t) > 1e-10: raise UnbalancedTransactionError("off by %.10f" % t) posted = datetime.datetime.today() if txn.has_key('posted'): posted = txn['posted'] c = self.db().cursor() c.execute("""start transaction""") c.execute("""select nextval('transaction_id_seq')""") txn_id = c.fetchall()[0][0] c.execute(""" insert into transaction ("id", "posted", "description", "creator") values (%i, %s, %s, %s) """, (txn_id, str(posted), txn['description'], txn['creator'])) for i, split in enumerate(txn['splits']): c.execute(""" insert into transaction_split ("id", "split", "user", "debit_amt") values (%i, %i, %s, %f) """, (txn_id, i, split['user'], split['debit_amt'])) c.execute("""commit""") statement_boilerplate = """\ The Expense column shows your part of a shared expense (or in the case of transfers, money paid to you). The Payment column shows payments you have made. The Balance column keeps a running balance; when the balance is positive, you owe money; when the balance is negative, you are owed money. """ class TextStatement(object): """ A plain-text statement that can be put into an email """ def __init__(self, stmt, acct_type='Account'): self.stmt = stmt self.acct_type = acct_type self.date_fmt = '%Y-%m-%d' def __unicode__(self): stmt = self.stmt lines = [] lines.append("%s Statement for %s" % (self.acct_type, stmt['user_full_name'])) lines.append("Transactions for the month of %s" % stmt['period_txt']) lines.append("") end_state = 'balance due' if stmt['end_bal'] < 0: end_state = 'credit' lines.append("You have a %s of $%.2f as of %s." % (end_state, abs(stmt['end_bal']), stmt['end_date_incl'].strftime(self.date_fmt))) lines.append("") lines.append("%-10s %-35s %7s %7s %7s" % ('Date', 'Description', 'Payment', 'Expense', 'Balance')) lines.append("-" * 74) lines.append("%-10s %-35s %7s %7s %7.2f" % (stmt['start_date'].strftime(self.date_fmt), '(Previous Balance)', '', '', stmt['start_bal'])) for t in stmt['transactions']: desc = t['description'] if len(desc) > 35: desc = desc[:32] + '...' lines.append("%-10s %-35s %7s %7s %7.2f" % (t['posted'].strftime(self.date_fmt), desc, fmt_money(t['mysummary']['credit'], width=7), fmt_money(t['mysummary']['debit'], width=7), t['running_bal'])) lines.append("-" * 74) lines.append("") lines.append(statement_boilerplate) return "\n".join(lines) def newSplitPurchaseTxn(description, creator, amount, buyer, users, post_date=None): """ Create and return a new Transaction that evenly splits a purchase between users """ amount = float(amount) shared_amount = amount / len(users) txn = {'description': description, 'creator': creator, 'splits': []} if post_date is not None: txn['posted'] = post_date txn['splits'].append({'user': buyer, 'debit_amt': -1 * amount}) for user in users: txn['splits'].append({'user': user, 'debit_amt': shared_amount}) return txn def newWeightedSplitTxn(description, creator, amount, buyer, users, post_date=None): """ Create and return a new Transaction that splits a purchase between users using weighting """ amount = float(amount) tweight = sum([float(u[1]) for u in users]) txn = {'description': description, 'creator': creator, 'splits': []} if post_date is not None: txn['posted'] = post_date txn['splits'].append({'user': buyer, 'debit_amt': -1 * amount}) for (user, weight) in users: txn['splits'].append({ 'user': user, 'debit_amt': amount * (float(weight) / tweight), }) return txn def fmt_money(amt, width=8, show_zero=False): if amt == 0 and not show_zero: return " " * width return ("%%%d.2f" % width) % amt if __name__ == '__main__': a = HouseAccts() s = a.getStatement('nturner', 2005, 1) print """ Statement for %(user)s From %(start_date)s through %(end_date_incl)s """ % s for t in s['transactions']: spl = t['mysplits'][0] amts = [spl['credit_display'], spl['debit_display'], spl['balance']] print "%(posted_date)10s %(description)-35s " % t, \ " ".join([fmt_money(a) for a in amts]) for spl in t['mysplits'][1:]: amts = [spl['credit_display'], spl['debit_display'], spl['balance']] print " " * 48 + " ".join([fmt_money(a) for a in amts])