import datetime
import json
import logging
import re
import sqlite3
import uuid

import gradio as gr


# Log at INFO so every parsed prompt and posted journal entry is reported.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Placeholders for an optional language model. Nothing visible in this file
# assigns them, so `model` and `tokenizer` stay None and parse_prompt() uses
# its keyword-based parser.
TRANSFORMERS_AVAILABLE = False
AutoModelForCausalLM = None
AutoTokenizer = None
tokenizer = None
model = None


# One module-wide connection and cursor, used by every function in this file.
# check_same_thread=False disables sqlite3's check that the connection is only
# used from the thread that created it.
conn = sqlite3.connect("erp.db", check_same_thread=False)
cursor = conn.cursor()

# Schema. NOTE: SQLite only enforces FOREIGN KEY clauses after
# "PRAGMA foreign_keys = ON" has been issued on the connection; this module
# never issues it, so the references below are declarative only.
cursor.execute("""
    CREATE TABLE IF NOT EXISTS chart_of_accounts (
        account_id TEXT PRIMARY KEY,
        account_name TEXT NOT NULL,
        account_type TEXT NOT NULL,
        parent_id TEXT,
        allow_budgeting BOOLEAN,
        allow_posting BOOLEAN,
        FOREIGN KEY (parent_id) REFERENCES chart_of_accounts(account_id)
    )
""")
cursor.execute("""
    CREATE TABLE IF NOT EXISTS journal_entries (
        entry_id TEXT PRIMARY KEY,
        date TEXT NOT NULL,
        debit_account_id TEXT NOT NULL,
        credit_account_id TEXT NOT NULL,
        amount REAL NOT NULL,
        description TEXT,
        FOREIGN KEY (debit_account_id) REFERENCES chart_of_accounts(account_id),
        FOREIGN KEY (credit_account_id) REFERENCES chart_of_accounts(account_id)
    )
""")
conn.commit()


# Which side of a journal entry increases or decreases each account type.
# Not referenced by the functions visible in this file; kept as a lookup table.
ACCOUNT_RULES = {
    "Asset": {"increase": "Debit", "decrease": "Credit"},
    "Liability": {"increase": "Credit", "decrease": "Debit"},
    "Equity": {"increase": "Credit", "decrease": "Debit"},
    "Revenue": {"increase": "Credit", "decrease": "Debit"},
    "Expense": {"increase": "Debit", "decrease": "Credit"},
}


def initialize_chart_of_accounts():
    """Insert the default chart of accounts into the database.

    Rows are written with INSERT OR REPLACE, so calling this again resets any
    row whose account_id is listed here to the values below and leaves rows
    with other ids alone. Group accounts ("1", "1.1", ...) have allow_posting
    False; only leaf accounts accept postings.

    Raises:
        sqlite3.Error: if the insert fails; the transaction is rolled back
            before the error is re-raised.
    """
    # (account_id, account_name, account_type, parent_id, allow_budgeting, allow_posting)
    accounts = [
        ("1", "Assets", "Asset", None, True, False),
        ("1.1", "Fixed Assets", "Asset", "1", True, False),
        ("1.1.1", "Plant", "Asset", "1.1", True, True),
        ("1.1.2", "Machinery", "Asset", "1.1", True, True),
        ("1.1.3", "Building", "Asset", "1.1", True, True),
        ("1.2", "Current Assets", "Asset", "1", True, False),
        ("1.2.1", "Cash", "Asset", "1.2", True, True),
        ("1.2.2", "Laptop", "Asset", "1.2", True, True),
        ("1.2.3", "Inventory", "Asset", "1.2", True, True),
        ("1.2.4", "Accounts Receivable", "Asset", "1.2", True, True),
        ("1.2.5", "Bank", "Asset", "1.2", True, True),
        ("2", "Liabilities", "Liability", None, True, False),
        ("2.1", "Accounts Payable", "Liability", "2", True, True),
        ("2.2", "Loan Payable", "Liability", "2", True, True),
        ("3", "Equity", "Equity", None, True, False),
        ("3.1", "Owner's Equity", "Equity", "3", True, True),
        ("3.2", "Drawings", "Equity", "3", True, True),
        ("4", "Revenue", "Revenue", None, True, False),
        ("4.1", "Sales Revenue", "Revenue", "4", True, True),
        ("5", "Expenses", "Expense", None, True, False),
        ("5.1", "Rent Expense", "Expense", "5", True, True),
        ("5.2", "Salary Expense", "Expense", "5", True, True),
        ("5.3", "Office Supplies", "Expense", "5", True, True),
    ]
    try:
        cursor.executemany("""
            INSERT OR REPLACE INTO chart_of_accounts
            (account_id, account_name, account_type, parent_id, allow_budgeting, allow_posting)
            VALUES (?, ?, ?, ?, ?, ?)
        """, accounts)
        conn.commit()
    except sqlite3.Error:
        # Do not leave a half-written chart pending on the shared connection.
        conn.rollback()
        raise
    logging.info("Chart of accounts initialized.")


# Account credited for each payment-method keyword, in the order the keywords
# are looked for in a prompt.
_PAYMENT_ACCOUNTS = {
    "cash": ("Cash", "Asset"),
    "bank": ("Bank", "Asset"),
    "credit": ("Accounts Payable", "Liability"),
}

# Account debited for each thing that can be bought, in lookup order.
_PURCHASE_ACCOUNTS = {
    "laptop": ("Laptop", "Asset"),
    "inventory": ("Inventory", "Asset"),
    "machinery": ("Machinery", "Asset"),
    "building": ("Building", "Asset"),
    "plant": ("Plant", "Asset"),
    "office supplies": ("Office Supplies", "Expense"),
}

# Account debited for each recognised expense, in lookup order.
_EXPENSE_ACCOUNTS = {
    "rent": ("Rent Expense", "Expense"),
    "salary": ("Salary Expense", "Expense"),
    "office supplies": ("Office Supplies", "Expense"),
}


def _mentions(text, phrase):
    """Return True if *phrase* occurs in *text* starting at a word boundary."""
    # Anchoring the start keeps "rent" from matching inside "different" while
    # still accepting plurals such as "laptops".
    return re.search(r"\b" + re.escape(phrase), text) is not None


def _first_mentioned(text, accounts):
    """Return the (account, type) of the first key of *accounts* found in *text*, else None."""
    for keyword, account in accounts.items():
        if _mentions(text, keyword):
            return account
    return None


def _stated_payment(text):
    """Return (method, (account, type)) for the payment method named in *text*, else (None, None)."""
    for method, account in _PAYMENT_ACCOUNTS.items():
        if _mentions(text, method):
            return method, account
    return None, None


def _extract_amount(text):
    """Return (amount, None) for the first '$' figure in *text*, or (None, error message)."""
    match = re.search(r'\$[\d,.]+', text)
    if not match:
        return None, "No amount found in prompt."
    try:
        amount = float(match.group().replace('$', '').replace(',', ''))
    except ValueError:
        return None, "Invalid amount format."
    if amount <= 0:
        return None, "Amount must be greater than zero."
    return amount, None


def _build_entry(debit, credit, amount, payment_method):
    """Assemble the parsed-entry dict from (account, type) pairs."""
    return {
        "debit": {"account": debit[0], "type": debit[1], "amount": amount},
        "credit": {"account": credit[0], "type": credit[1], "amount": amount},
        "payment_method": payment_method,
    }


def _resolve_follow_up(answer, state):
    """Complete the entry stored in *state* using the payment method in *answer*."""
    method = answer.strip(" .!?")
    if method not in _PAYMENT_ACCOUNTS:
        # Keep the pending state so the user can answer again.
        return {"error": "Please respond with 'cash', 'credit', or 'bank'."}, state
    pending = state["pending_parsed"]["debit"]
    entry = _build_entry(
        (pending["account"], pending["type"]),
        _PAYMENT_ACCOUNTS[method],
        pending["amount"],
        method,
    )
    return entry, {}


def parse_prompt(prompt, state):
    """Turn a plain-English accounting prompt into a debit/credit pair.

    Args:
        prompt: the user's message, e.g. "Bought a laptop for $200".
        state: conversation state dict. When it holds "pending_prompt" and
            "pending_parsed", *prompt* is read as the answer to an earlier
            "cash, credit, or bank?" question.

    Returns:
        A ``(parsed, state)`` tuple where *parsed* is one of:

        * ``{"debit": {...}, "credit": {...}, "payment_method": ...}`` with
          each side holding ``account``, ``type`` and ``amount``;
        * ``{"status": "clarify", "message": ..., "pending_parsed": ...}``
          when the debit side is known but the payment method is not, in
          which case the returned state carries the pending entry;
        * ``{"error": message}`` when the prompt cannot be used.
    """
    logging.info(f"Parsing prompt: {prompt}")
    if state is None:
        state = {}
    text = prompt.lower().strip()

    # A pending clarification must be handled before anything else: its answer
    # ("cash") contains no amount and would otherwise be rejected below.
    if state.get("pending_prompt") and state.get("pending_parsed"):
        return _resolve_follow_up(text, state)

    if model and tokenizer:
        # Best effort: any failure falls through to the keyword parser.
        try:
            input_text = (
                "\n"
                "Parse the following accounting prompt into a JSON object with:\n"
                "- debit: {account, type, amount}\n"
                "- credit: {account, type, amount}\n"
                "- payment_method: 'cash', 'credit', 'bank', or null\n"
                f"Prompt: {prompt}\n"
            )
            inputs = tokenizer(input_text, return_tensors="pt")
            outputs = model.generate(**inputs, max_length=300)
            response = tokenizer.decode(outputs[0], skip_special_tokens=True)
            return json.loads(response), state
        except Exception as e:
            logging.error(f"Model parsing failed: {e}")

    amount, error = _extract_amount(text)
    if error:
        return {"error": error}, state

    method, paying_account = _stated_payment(text)
    paid_from_funds = method in ("cash", "bank")
    debit = credit = None
    payment_method = method

    if _mentions(text, "owner's draw"):
        # Drawings come out of cash unless the prompt says bank.
        debit = ("Drawings", "Equity")
        credit = paying_account if paid_from_funds else ("Cash", "Asset")
        payment_method = method if paid_from_funds else None
    elif _mentions(text, "paid"):
        # A payment defaults to cash but honours a stated method.
        debit = _first_mentioned(text, _EXPENSE_ACCOUNTS)
        credit = paying_account or ("Cash", "Asset")
    elif _mentions(text, "bought") or _mentions(text, "purchased"):
        # An unrecognised purchase is booked to Laptop, as the keyword table
        # in the original implementation did.
        debit = _first_mentioned(text, _PURCHASE_ACCOUNTS) or ("Laptop", "Asset")
        credit = paying_account
    elif _mentions(text, "sold goods") or _mentions(text, "sales"):
        # Money received lands in Cash/Bank when stated, else it is owed to us.
        debit = paying_account if paid_from_funds else ("Accounts Receivable", "Asset")
        credit = ("Sales Revenue", "Revenue")
    else:
        debit = _first_mentioned(text, _EXPENSE_ACCOUNTS)
        credit = paying_account

    if debit and credit:
        return _build_entry(debit, credit, amount, payment_method), state

    if debit:
        # Debit side known, funding source unknown: ask and remember the rest.
        pending = {"debit": {"account": debit[0], "type": debit[1], "amount": amount}}
        return {
            "status": "clarify",
            "message": "Was this bought on cash, credit, or bank?",
            "pending_parsed": pending,
        }, {
            "pending_prompt": prompt,
            "pending_parsed": {"debit": dict(pending["debit"])},
        }

    return {"error": "Prompt not recognized. Try 'Bought a laptop for $200' or 'Paid rent $400'."}, state


def _find_account(account_name):
    """Return (account_id, account_type, allow_posting) for *account_name*, or None."""
    cursor.execute(
        "SELECT account_id, account_type, allow_posting FROM chart_of_accounts WHERE account_name = ?",
        (account_name,),
    )
    return cursor.fetchone()


def generate_journal_entry(parsed, state):
    """Validate a parsed transaction and post it to journal_entries.

    Args:
        parsed: result of parse_prompt(). An "error" or "clarify" result is
            passed back to the user without touching the database.
        state: conversation state dict; its "pending_prompt" value, when
            present, is stored as the entry description (default
            "Transaction").

    Returns:
        A ``(message, state)`` tuple; *state* is returned unchanged.
    """
    logging.info(f"Generating journal entry with parsed: {parsed}")
    if not isinstance(parsed, dict):
        return "Could not understand the parsed transaction.", state
    if "error" in parsed:
        return parsed["error"], state
    if parsed.get("status") == "clarify":
        return parsed["message"], state

    # The model-backed parser can return arbitrary JSON, so do not assume
    # every key is present.
    try:
        debit_account = parsed["debit"]["account"]
        debit_type = parsed["debit"]["type"]
        amount = parsed["debit"]["amount"]
        credit_account = parsed["credit"]["account"]
        credit_type = parsed["credit"]["type"]
    except (KeyError, TypeError):
        return "Could not understand the parsed transaction.", state

    # "not amount > 0" (rather than "amount <= 0") also rejects NaN.
    if isinstance(amount, bool) or not isinstance(amount, (int, float)) or not amount > 0:
        return "Amount must be a positive number.", state

    debit_result = _find_account(debit_account)
    credit_result = _find_account(credit_account)

    if not debit_result or not credit_result:
        return "One or both accounts not found.", state
    if not debit_result[2] or not credit_result[2]:
        return "Posting not allowed for one or both accounts.", state
    if debit_result[1] != debit_type or credit_result[1] != credit_type:
        return "Account type mismatch.", state

    entry_id = str(uuid.uuid4())
    date = datetime.datetime.now().isoformat()
    description = state.get("pending_prompt") or "Transaction"
    try:
        cursor.execute("""
            INSERT INTO journal_entries (entry_id, date, debit_account_id, credit_account_id, amount, description)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (entry_id, date, debit_result[0], credit_result[0], amount, description))
        conn.commit()
        logging.info(f"Journal entry created: Debit {debit_account} ${amount}, Credit {credit_account} ${amount}")
    except sqlite3.Error as e:
        # Leave the shared connection clean for the next request.
        conn.rollback()
        logging.error(f"Database error: {e}")
        return "Database error occurred.", state

    return f"Journal Entry Created: Debit {debit_account} ${amount}, Credit {credit_account} ${amount}", state


def generate_t_account(account_name):
    """Render every posting to one account as a plain-text T-account.

    Args:
        account_name: name of the account; matched case-insensitively, with
            an exact-case match preferred when several names differ only by
            case.

    Returns:
        The formatted T-account (header, one line per posting ordered by
        date, and debit/credit totals), "Account not found." when no account
        has that name, or "Error retrieving T-account data." on a database
        error.
    """
    try:
        cursor.execute("""
            SELECT account_id, account_name FROM chart_of_accounts
            WHERE account_name = ? COLLATE NOCASE
            ORDER BY account_name = ? DESC
            LIMIT 1
        """, (account_name, account_name))
        account = cursor.fetchone()
        if not account:
            logging.error(f"Account {account_name} not found.")
            return "Account not found."

        account_id, account_name = account
        # UNION ALL, not UNION: two postings with the same date, amount and
        # description are distinct entries and must both be listed.
        cursor.execute("""
            SELECT date, amount, description, 'Debit' as type FROM journal_entries WHERE debit_account_id = ?
            UNION ALL
            SELECT date, amount, description, 'Credit' as type FROM journal_entries WHERE credit_account_id = ?
            ORDER BY date
        """, (account_id, account_id))
        entries = cursor.fetchall()
        logging.info(f"Retrieved {len(entries)} entries for T-account: {account_name}")
    except sqlite3.Error as e:
        logging.error(f"SQL error in generate_t_account: {e}")
        return "Error retrieving T-account data."

    parts = [
        f"T-Account for {account_name}\n{'='*50}\n{'Debit':<20} | {'Credit':<20} | Description\n{'-'*50}\n"
    ]
    debit_total = 0
    credit_total = 0
    for _date, amount, desc, entry_type in entries:
        if entry_type == "Debit":
            parts.append(f"${amount:<19} | {'':<20} | {desc}\n")
            debit_total += amount
        else:
            parts.append(f"{'':<20} | ${amount:<19} | {desc}\n")
            credit_total += amount
    parts.append(f"{'-'*50}\nTotal Debit: ${debit_total:<10} | Total Credit: ${credit_total}\n")

    return "".join(parts)


def _chart_is_empty():
    """Return True when chart_of_accounts holds no rows."""
    cursor.execute("SELECT 1 FROM chart_of_accounts LIMIT 1")
    return cursor.fetchone() is None


def chat_function(message, history, state=None):
    """Handle one chat message and return the updated UI values.

    Messages of the form "t-account <name>" render that account's T-account;
    anything else is parsed as a transaction and posted.

    Args:
        message: text typed by the user.
        history: list of {"role", "content"} chat messages, or None.
        state: conversation state dict, or None on a first call.

    Returns:
        ``(history, state, "")`` - the chat history with this exchange
        appended, the new conversation state, and an empty string that
        clears the input box.
    """
    if state is None:
        state = {}
        initialize_chart_of_accounts()
        logging.info("Initialized state and chart of accounts")
    elif _chart_is_empty():
        # The UI passes {} rather than None, so the branch above never runs
        # there; seed an empty chart here or every posting fails with
        # "One or both accounts not found."
        initialize_chart_of_accounts()
        logging.info("Initialized state and chart of accounts")

    logging.info(f"Received message: {message}")

    text = (message or "").strip()
    command, _, argument = text.partition(" ")

    if command.lower() == "t-account":
        account_name = argument.strip()
        if account_name:
            response = generate_t_account(account_name)
        else:
            response = "Please specify an account name."
    else:
        # parse_prompt() clears the state when a clarification is answered,
        # so remember the prompt that started it.
        original_prompt = state.get("pending_prompt")
        parsed, state = parse_prompt(text, state)
        is_entry = isinstance(parsed, dict) and "debit" in parsed and "credit" in parsed
        if original_prompt and is_entry:
            # generate_journal_entry() takes the description from
            # state["pending_prompt"]; pass it on a copy so the cleared
            # conversation state is what gets returned to the UI.
            response, _ = generate_journal_entry(
                parsed, {**state, "pending_prompt": original_prompt}
            )
        else:
            response, state = generate_journal_entry(parsed, state)

    if history is not None:
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": response})
    else:
        history = [
            {"role": "user", "content": message},
            {"role": "assistant", "content": response},
        ]

    return history, state, ""


# Chat UI: a message list, a single-line input, and a Clear button.
with gr.Blocks() as demo:
    gr.Markdown("# AI ERP System")
    gr.Markdown("Enter accounting prompts like 'Bought a laptop for $200' or 't-account Laptop'. The system will ask for clarification if needed.")
    chatbot = gr.Chatbot(type="messages")
    msg = gr.Textbox(placeholder="Type your prompt here...", lines=1, submit_btn=None)
    clear = gr.Button("Clear")

    # Conversation state handed to chat_function; starts as an empty dict.
    state = gr.State({})

    # chat_function returns (history, state, ""), which fills the chat, stores
    # the new state and empties the textbox.
    msg.submit(chat_function, [msg, chatbot, state], [chatbot, state, msg])
    # Clear resets the same three components.
    clear.click(lambda: ([], {}, ""), None, [chatbot, state, msg], queue=False)


if __name__ == "__main__":
    # server_name="0.0.0.0" binds to all network interfaces.
    demo.launch(server_name="0.0.0.0", server_port=7860)