Source code for tariochbctools.importers.ibkr.importer
import re
from datetime import date
from decimal import Decimal
from os import path
from typing import Any
import beangulp
import yaml
from beancount.core import amount, data
from beancount.core.number import D
from ibflex import Types, client, parser
from ibflex.enums import CashAction
from tariochbctools.importers.general.priceLookup import PriceLookup
[docs]
class Importer(beangulp.Importer):
    """An importer for Interactive Brokers using the flex query service.

    The importer is driven by a YAML configuration file whose name ends
    with ``ibkr.yaml``. The file supplies the flex query ``token`` and
    ``queryId``, the base currency ``baseCcy`` and, optionally, a
    ``period``. Trades and dividend cash transactions of the downloaded
    statements are converted into beancount transactions.
    """
[docs]
def identify(self, filepath: str) -> bool:
    """Tell whether *filepath* names an IBKR flex query configuration.

    Only the last path component is looked at: it has to end with
    ``ibkr.yaml``. The content of the file is never read.
    """
    _, filename = path.split(filepath)
    return filename.endswith("ibkr.yaml")
[docs]
def matches(
    self, trx: Types.CashTransaction, t: Any, account: data.Account
) -> bool:
    """Tell whether cash transaction *trx* belongs to the collected record *t*.

    The two belong together when they agree on the date, on the cleaned-up
    symbol, on the ``<number> PER SHARE`` figure found in their
    descriptions, and when *t* was recorded for *account*. Descriptions
    without a per-share figure are treated as carrying the same (empty)
    figure.

    Args:
        trx: Cash transaction from the flex statement; its ``description``,
            ``dateTime`` and ``symbol`` attributes are read.
        t: Previously collected record with the keys ``description``,
            ``date``, ``symbol`` and ``account``.
        account: Account the record has to belong to.

    Returns:
        ``True`` if all four criteria agree, ``False`` otherwise.
    """
    # The figure is an integer or a decimal number. The fractional part is
    # optional as a whole, so that single-digit figures such as "1 PER SHARE"
    # are captured too instead of silently comparing as "no figure".
    perSharePattern = re.compile(r".* (?P<perShare>\d+(?:\.\d+)?) PER SHARE")

    def perShareOf(description: str) -> str:
        found = perSharePattern.search(description)
        return found.group("perShare") if found else ""

    trxPerShare = perShareOf(trx.description)
    tPerShare = perShareOf(t["description"])
    return (
        t["date"] == trx.dateTime
        and t["symbol"] == self.cleanupSymbol(trx.symbol)
        and trxPerShare == tPerShare
        and t["account"] == account
    )
[docs]
def extract(self, filepath: str, existing: data.Entries) -> data.Entries:
    """Download the configured flex query and turn it into transactions.

    The YAML file at *filepath* must provide ``token``, ``queryId`` and
    ``baseCcy``; ``period`` is optional and is passed to the download as
    ``None`` when absent.

    For every statement of the response:

    * each trade becomes one transaction built by ``createBuy``;
    * cash transactions of type dividend or withholding tax that
      ``matches`` considers related are folded into one record, and every
      record that ends up typed as a dividend becomes one transaction
      built by ``createDividen``. Records of any other type are collected
      but produce no output.

    Args:
        filepath: Path of the YAML configuration file.
        existing: Already known entries, handed to ``PriceLookup``.

    Returns:
        The generated transactions; per statement the trades come first,
        followed by the dividends.

    Raises:
        KeyError: If ``token``, ``queryId`` or ``baseCcy`` is missing.
        AssertionError: If the parsed response is not a
            ``Types.FlexQueryResponse``.
    """
    # YAML is UTF-8 by specification; do not depend on the platform locale.
    with open(filepath, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    token = config["token"]
    queryId = config["queryId"]
    period = config.get("period")
    baseCcy = config["baseCcy"]

    priceLookup = PriceLookup(existing, baseCcy)

    response = client.download(token, queryId, period=period)
    statement = parser.parse(response)
    # Narrows the type for the attribute access below.
    assert isinstance(statement, Types.FlexQueryResponse)

    result = []
    for stmt in statement.FlexStatements:
        account = stmt.accountId

        for trade in stmt.Trades:
            result.append(
                self.createBuy(
                    trade.tradeDate,
                    account,
                    self.cleanupSymbol(trade.symbol),
                    trade.quantity,
                    trade.currency,
                    trade.tradePrice,
                    # The sign of the reported commission is flipped before
                    # it is handed on as the fee amount.
                    amount.Amount(
                        round(-trade.ibCommission, 2), trade.ibCommissionCurrency
                    ),
                    amount.Amount(round(trade.netCash, 2), trade.currency),
                    baseCcy,
                    trade.fxRateToBase,
                )
            )

        # One record per group of related cash transactions of this statement.
        transactions: list = []
        for trx in stmt.CashTransactions:
            isWithholding = CashAction.WHTAX == trx.type

            existingEntry = None
            if isWithholding or CashAction.DIVIDEND == trx.type:
                existingEntry = next(
                    (t for t in transactions if self.matches(trx, t, account)),
                    None,
                )

            if existingEntry is not None:
                if isWithholding:
                    existingEntry["whAmount"] += trx.amount
                else:
                    existingEntry["amount"] += trx.amount
                    # The dividend line decides description and type, so a
                    # record opened by a withholding line is still emitted
                    # once its dividend arrives.
                    existingEntry["description"] = trx.description
                    existingEntry["type"] = trx.type
                continue

            transactions.append(
                {
                    "date": trx.dateTime,
                    "symbol": self.cleanupSymbol(trx.symbol),
                    "currency": trx.currency,
                    # Decimal zeros keep both running totals Decimal
                    # regardless of which kind of line arrives first.
                    "amount": Decimal(0) if isWithholding else trx.amount,
                    "whAmount": trx.amount if isWithholding else Decimal(0),
                    "description": trx.description,
                    "type": trx.type,
                    "account": account,
                }
            )

        for record in transactions:
            if record["type"] != CashAction.DIVIDEND:
                continue
            # The accumulated withholding is negated before use; presumably
            # it is reported as a negative cash flow -- confirm against the
            # flex query output.
            totalWithholding = -record["whAmount"]
            totalPayout = record["amount"] - totalWithholding
            result.append(
                self.createDividen(
                    totalPayout,
                    totalWithholding,
                    record["symbol"],
                    record["currency"],
                    record["date"].date(),
                    priceLookup,
                    record["description"],
                    record["account"],
                )
            )

    return result
[docs]
def createDividen(
    self,
    payout: Decimal,
    withholding: Decimal,
    asset: str,
    currency: str,
    date: date,
    priceLookup: PriceLookup,
    description: str,
    account: str,
) -> data.Transaction:
    """Build the transaction booking one dividend payment.

    The transaction consists of, in this order:

    * a zero-unit posting of *asset* on the asset account,
    * *payout* on the liquidity account, carrying the price returned by
      ``priceLookup.fetchPrice(currency, date)``,
    * *withholding* on the receivable account, only if it is positive,
    * a posting without amount on the income account.

    Args:
        payout: Amount credited to the liquidity account.
        withholding: Withheld amount; booked only when greater than zero.
        asset: Symbol of the paying asset.
        currency: Currency of *payout* and *withholding*.
        date: Date of the transaction.
        priceLookup: Source of the price attached to the payout posting.
        description: Text appended to ``"Dividend: "`` as narration.
        account: IB account id, used in the account names and stored in
            the transaction metadata.

    Returns:
        The transaction, flagged ``*`` and without payee, tags or links.
    """
    narration = "Dividend: " + description

    liquidityAccount = self.getLiquidityAccount(account, currency)
    incomeAccount = self.getIncomeAccount(account)
    assetAccount = self.getAssetAccount(account, asset)
    payoutPrice = priceLookup.fetchPrice(currency, date)

    assetLeg = data.Posting(
        assetAccount, amount.Amount(D(0), asset), None, None, None, None
    )
    payoutLeg = data.Posting(
        liquidityAccount,
        amount.Amount(payout, currency),
        None,
        payoutPrice,
        None,
        None,
    )
    postings = [assetLeg, payoutLeg]

    if withholding > 0:
        withholdingLeg = data.Posting(
            self.getReceivableAccount(account),
            amount.Amount(withholding, currency),
            None,
            None,
            None,
            None,
        )
        postings.append(withholdingLeg)

    # No units on the income leg; presumably beancount fills in the
    # balancing amount.
    incomeLeg = data.Posting(incomeAccount, None, None, None, None, None)
    postings.append(incomeLeg)

    return data.Transaction(
        data.new_metadata("dividend", 0, {"account": account}),
        date,
        "*",
        "",
        narration,
        data.EMPTY_SET,
        data.EMPTY_SET,
        postings,
    )
[docs]
def createBuy(
    self,
    date: date,
    account: data.Account,
    asset: str,
    quantity: Decimal,
    currency: str,
    price: Decimal,
    commission: amount.Amount,
    netCash: amount.Amount,
    baseCcy: str,
    fxRateToBase: Decimal,
) -> data.Transaction:
    """Build the transaction booking one trade.

    The transaction consists of, in this order:

    * *quantity* units of *asset* on the asset account, held at a
      per-unit cost expressed in *baseCcy*,
    * *commission* on the fee account,
    * *netCash* on the liquidity account.

    If *currency* differs from *baseCcy*, the unit price and the
    commission are multiplied by *fxRateToBase* (the commission is rounded
    to two decimals and relabelled as *baseCcy*), and the liquidity
    posting carries *fxRateToBase* in *baseCcy* as its price.

    Args:
        date: Date of the transaction.
        account: IB account id, used in the account names and stored in
            the transaction metadata.
        asset: Symbol of the traded asset.
        quantity: Number of units traded.
        currency: Currency the trade was settled in.
        price: Price per unit in *currency*.
        commission: Fee amount.
        netCash: Amount booked on the liquidity account.
        baseCcy: Currency in which the cost is recorded.
        fxRateToBase: Factor converting *currency* into *baseCcy*.

    Returns:
        The transaction, narrated ``Buy``, flagged ``*`` and without
        payee, tags or links.
    """
    # NOTE(review): getFeeAccount is not defined in the visible part of this
    # file; presumably it is provided elsewhere -- confirm.
    feeAccount = self.getFeeAccount(account)
    liquidityAccount = self.getLiquidityAccount(account, currency)
    assetAccount = self.getAssetAccount(account, asset)

    unitCost = price
    fee = commission
    liquidityPrice = None
    if currency != baseCcy:
        unitCost = price * fxRateToBase
        fee = amount.Amount(round(commission.number * fxRateToBase, 2), baseCcy)
        liquidityPrice = amount.Amount(fxRateToBase, baseCcy)

    costBasis = data.CostSpec(unitCost, None, baseCcy, None, None, False)
    assetLeg = data.Posting(
        assetAccount, amount.Amount(quantity, asset), costBasis, None, None, None
    )
    feeLeg = data.Posting(feeAccount, fee, None, None, None, None)
    cashLeg = data.Posting(
        liquidityAccount, netCash, None, liquidityPrice, None, None
    )

    return data.Transaction(
        data.new_metadata("buy", 0, {"account": account}),
        date,
        "*",
        "",
        "Buy",
        data.EMPTY_SET,
        data.EMPTY_SET,
        [assetLeg, feeLeg, cashLeg],
    )
[docs]
def getAssetAccount(self, account: str, asset: str) -> data.Account:
    """Build the name of the investment account of *asset*.

    Args:
        account: IB account id, used as second account component.
        asset: Symbol, used as last account component.

    Returns:
        ``Assets:<account>:Investment:IB:<asset>``.
    """
    assetAccount = f"Assets:{account}:Investment:IB:{asset}"
    return assetAccount
[docs]
def getLiquidityAccount(self, account: str, currency: str) -> data.Account:
    """Build the name of the cash account holding *currency*.

    Args:
        account: IB account id, used as second account component.
        currency: Currency code, used as last account component.

    Returns:
        ``Assets:<account>:Liquidity:IB:<currency>``.
    """
    liquidityAccount = f"Assets:{account}:Liquidity:IB:{currency}"
    return liquidityAccount
[docs]
def getReceivableAccount(self, account: str) -> data.Account:
    """Build the name of the account on which withheld tax is booked.

    Args:
        account: IB account id, used as second account component.

    Returns:
        ``Assets:<account>:Receivable:Verrechnungssteuer``.
    """
    receivableAccount = f"Assets:{account}:Receivable:Verrechnungssteuer"
    return receivableAccount
[docs]
def getIncomeAccount(self, account: str) -> data.Account:
    """Build the name of the income account.

    Args:
        account: IB account id, used as second account component.

    Returns:
        ``Income:<account>:Interest``.
    """
    incomeAccount = f"Income:{account}:Interest"
    return incomeAccount
[docs]
def cleanupSymbol(self, symbol: str) -> str:
    """Reduce *symbol* to its bare form.

    All trailing lowercase ``z`` characters are removed first; after that
    everything from the first ``.`` onwards is dropped.

    Args:
        symbol: Symbol as reported in the statement.

    Returns:
        The shortened symbol; the input unchanged if neither rule applies.
    """
    withoutTrailingZ = symbol.rstrip("z")
    return withoutTrailingZ.split(".", 1)[0]