import asyncio
import threading
from typing import Optional

import gradio as gr

# Import our clean architecture components
from src.core.game_engine import GameEngine
from src.facades.game_facade import GameFacade
from src.ui.huggingface_ui import HuggingFaceUI
from src.ui.improved_interface_manager import ImprovedInterfaceManager
from src.mcp.mcp_tools import GradioMCPTools


class MMORPGApplication:
    """Main application class that orchestrates the MMORPG game.

    Wires together the game engine, the game facade, the UI layer and the
    MCP tools, owns the Gradio interface, and runs two daemon threads for
    periodic maintenance.
    """

    # Seconds between two inactive-player cleanup passes.
    CLEANUP_INTERVAL_SECONDS = 30
    # Seconds between two auto-refresh ticks.
    AUTO_REFRESH_INTERVAL_SECONDS = 2
    # Seconds to wait after a failed pass before trying again.
    RETRY_DELAY_SECONDS = 5

    def __init__(self):
        """Initialize the application with all necessary components."""
        # Initialize core game engine (singleton)
        self.game_engine = GameEngine()

        # Initialize game facade for simplified operations
        self.game_facade = GameFacade()

        # Initialize UI components
        self.ui = HuggingFaceUI(self.game_facade)
        self.interface_manager = ImprovedInterfaceManager(self.game_facade, self.ui)

        # Initialize MCP tools for AI agent integration
        self.mcp_tools = GradioMCPTools(self.game_facade)

        # Gradio interface reference (set by create_gradio_interface)
        self.gradio_interface: Optional[gr.Blocks] = None

        # Background worker threads (set by start_background_tasks)
        self._cleanup_task: Optional[threading.Thread] = None
        self._auto_refresh_task: Optional[threading.Thread] = None

        # Shared signal used both as an interruptible sleep and as the
        # shutdown request for the background loops.
        self._stop_event = threading.Event()

    def create_gradio_interface(self) -> gr.Blocks:
        """Create and configure the Gradio interface.

        Returns:
            The interface built by the ImprovedInterfaceManager; it is also
            stored on ``self.gradio_interface``.
        """
        # Use the ImprovedInterfaceManager to create the complete interface
        # with proper event handling
        self.gradio_interface = self.interface_manager.create_interface()
        return self.gradio_interface

    def start_background_tasks(self):
        """Start the daemon threads used for game maintenance.

        Calling this again while the workers are still alive is a no-op, so
        repeated calls never spawn duplicate threads.
        """
        already_running = (
            self._cleanup_task is not None and self._cleanup_task.is_alive()
        ) or (
            self._auto_refresh_task is not None
            and self._auto_refresh_task.is_alive()
        )
        if already_running:
            return

        # Re-arm the signal in case the tasks were stopped earlier.
        self._stop_event.clear()
        stop = self._stop_event

        def cleanup_task():
            """Background task for cleaning up inactive players."""
            while not stop.is_set():
                try:
                    self.game_facade.cleanup_inactive_players()
                    delay = self.CLEANUP_INTERVAL_SECONDS
                except Exception as e:
                    # Best effort: report and retry sooner instead of
                    # letting the maintenance thread die.
                    print(f"Error in cleanup task: {e}")
                    delay = self.RETRY_DELAY_SECONDS
                # Sleeps for `delay` seconds but wakes immediately on stop.
                stop.wait(delay)

        def auto_refresh_task():
            """Background task for auto-refreshing game state."""
            while not stop.is_set():
                try:
                    # Trigger refresh for active sessions.
                    # This would need session tracking for a real
                    # implementation; for now the loop only keeps the cadence.
                    delay = self.AUTO_REFRESH_INTERVAL_SECONDS
                except Exception as e:
                    print(f"Error in auto-refresh task: {e}")
                    delay = self.RETRY_DELAY_SECONDS
                stop.wait(delay)

        # Start cleanup task
        self._cleanup_task = threading.Thread(target=cleanup_task, daemon=True)
        self._cleanup_task.start()

        # Start auto-refresh task
        self._auto_refresh_task = threading.Thread(
            target=auto_refresh_task, daemon=True
        )
        self._auto_refresh_task.start()

    def stop_background_tasks(self):
        """Ask the background threads to exit at their next wake-up.

        Safe to call when no task is running. The threads are daemons, so
        this does not block waiting for them to finish.
        """
        self._stop_event.set()

    def run(self, share: bool = False, server_port: int = 7860):
        """Run the MMORPG application.

        Starts the game engine, builds the UI, starts the background tasks
        and launches the Gradio server. Returns early (after printing an
        error) when the game engine fails to start.

        Args:
            share: Whether Gradio should create a public share link.
            server_port: Port shown in the startup message.
        """
        print("🎮 Starting MMORPG Application...")
        print("🏗️ Initializing game engine...")

        # Initialize game world and services
        if not self.game_engine.start():
            print("❌ Failed to start game engine")
            return

        print("🎨 Creating user interface...")

        # Create Gradio interface
        interface = self.create_gradio_interface()

        print("🔧 Starting background tasks...")

        # Start background maintenance tasks
        self.start_background_tasks()

        print("🚀 Launching server...")
        # NOTE(review): server_port is not forwarded to launch() (it was
        # commented out in the original call), so Gradio picks the port and
        # the URL printed here can be wrong. Confirm whether that is required
        # by the hosting platform before forwarding it.
        print(f"🌐 Server will be available at: http://localhost:{server_port}")

        if share:
            print("🔗 Public URL will be generated...")

        try:
            # Launch the interface
            interface.launch(
                share=share,  # honour the caller's choice
                debug=True,
                mcp_server=True,  # Enable MCP server integration
                show_error=True,
                quiet=False,
            )
        finally:
            # Let the maintenance loops wind down once the server returns.
            self.stop_background_tasks()


def main():
    """Main entry point for the application."""
    # Create and run the application
    app = MMORPGApplication()

    # Run with a public share link.
    # Change share=False to keep the app local only.
    app.run(share=True, server_port=7869)


if __name__ == "__main__":
    main()