| """ |
| Thinking Module for MiniMind Max2 |
| Interleaved and Sequential Thinking for complex reasoning and tool interactions. |
| """ |
|
|
| from dataclasses import dataclass, field |
| from typing import List, Optional, Dict, Any, Callable, Tuple, Generator |
| from enum import Enum |
| import time |
| import json |
| import re |
|
|
|
|
class ThinkingMode(Enum):
    """Enumerates the supported reasoning strategies.

    Values are lowercase strings so they serialize cleanly into logs/config.
    """

    INTERLEAVED = "interleaved"  # reason between every tool interaction
    SEQUENTIAL = "sequential"    # plan all steps up front, then execute
    STREAMING = "streaming"      # emit thinking steps as they are produced
    HIDDEN = "hidden"            # think internally, expose only the answer
|
|
|
|
@dataclass
class ThinkingStep:
    """One unit of the reasoning trace (a thought, tool use, or conclusion)."""

    step_id: int  # position in the trace, assigned by the caller
    content: str  # human-readable text of the step
    step_type: str = "reasoning"  # "reasoning" | "reflection" | "tool_use" | "conclusion"
    confidence: float = 1.0  # self-assessed confidence, nominally in [0, 1]
    duration_ms: int = 0  # elapsed ms since session start when recorded
    tool_call: Optional[Dict[str, Any]] = None  # {"name": ..., "args": ...} if a tool ran
    tool_result: Optional[Any] = None  # raw value returned by the tool
    is_final: bool = False  # True marks the concluding answer step
|
|
|
|
@dataclass
class ThinkingConfig:
    """Tunable knobs for how (and how long) the engine is allowed to think."""

    mode: ThinkingMode = ThinkingMode.INTERLEAVED  # default strategy
    max_thinking_steps: int = 10  # hard cap on recorded steps per session
    min_confidence_threshold: float = 0.7  # below this, responses are rejected
    enable_self_reflection: bool = True
    enable_step_verification: bool = True
    show_thinking_to_user: bool = False  # surface the trace in responses?
    thinking_budget_ms: int = 30000  # wall-clock budget per session

    # Markers used when rendering the thinking trace as text.
    think_start: str = "<Thinking>"
    think_end: str = "</Thinking>"
    step_marker: str = "<step>"
    reflect_marker: str = "<reflect>"
    conclude_marker: str = "<conclude>"
|
|
|
|
class ThinkingContext:
    """Accumulates steps, tool calls, and timing for one thinking session."""

    def __init__(self, config: ThinkingConfig):
        self.config = config
        self.steps: List[ThinkingStep] = []
        self.tool_history: List[Dict[str, Any]] = []
        self.start_time: float = 0  # 0 until start() is called
        self.total_tokens: int = 0
        self.current_confidence: float = 1.0

    def start(self):
        """Reset recorded state and mark the beginning of a session."""
        self.start_time = time.time()
        self.steps = []
        self.tool_history = []

    def elapsed_ms(self) -> int:
        """Milliseconds elapsed since start() was called."""
        return int((time.time() - self.start_time) * 1000)

    def can_continue(self) -> bool:
        """True while both the step cap and the time budget allow more thinking."""
        within_steps = len(self.steps) < self.config.max_thinking_steps
        within_budget = self.elapsed_ms() <= self.config.thinking_budget_ms
        return within_steps and within_budget

    def add_step(self, step: ThinkingStep):
        """Record a step, stamping it with the cumulative elapsed time."""
        step.duration_ms = self.elapsed_ms()
        self.steps.append(step)

    def add_tool_call(self, tool_name: str, arguments: Dict[str, Any], result: Any):
        """Log one tool invocation alongside the step index it occurred at."""
        entry = {
            "tool": tool_name,
            "arguments": arguments,
            "result": result,
            "step": len(self.steps),
        }
        self.tool_history.append(entry)

    def get_summary(self) -> str:
        """One truncated line per recorded step."""
        return "\n".join(
            f"Step {index} ({step.step_type}): {step.content[:100]}..."
            for index, step in enumerate(self.steps, start=1)
        )

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the session for logging or inspection."""
        serialized = [
            {
                "id": step.step_id,
                "content": step.content,
                "type": step.step_type,
                "confidence": step.confidence,
                "duration_ms": step.duration_ms,
                "is_final": step.is_final,
            }
            for step in self.steps
        ]
        return {
            "steps": serialized,
            "tool_history": self.tool_history,
            "total_time_ms": self.elapsed_ms(),
            "total_steps": len(self.steps),
        }
|
|
|
|
class InterleavedThinking:
    """Interleaved thinking: reason between each tool interaction.

    Lets the model adapt its strategy based on intermediate tool results.
    """

    def __init__(
        self,
        model,
        tool_registry,
        config: Optional[ThinkingConfig] = None,
    ):
        self.model = model
        self.tools = tool_registry
        # Fall back to an interleaved-mode default when no config is given.
        self.config = config if config is not None else ThinkingMode and ThinkingConfig(mode=ThinkingMode.INTERLEAVED)

    def think_and_act(
        self,
        query: str,
        context: Optional[ThinkingContext] = None,
    ) -> Generator[ThinkingStep, None, str]:
        """Alternate thought generation, action selection, and tool execution.

        Yields:
            ThinkingStep objects as thinking progresses.

        Returns:
            The final answer string (available as StopIteration.value).
        """
        session = context if context is not None else ThinkingContext(self.config)
        session.start()

        next_id = 0
        state = query

        while session.can_continue():
            # 1) Think about the current state.
            thinking = self._generate_thought(state, session)
            thought = ThinkingStep(
                step_id=next_id,
                content=thinking["thought"],
                step_type="reasoning",
                confidence=thinking.get("confidence", 0.9),
            )
            session.add_step(thought)
            yield thought
            next_id += 1

            # 2) Decide what to do next.
            action = self._decide_action(thinking, session)
            kind = action["type"]

            if kind == "answer":
                # Conclude: emit a final step and finish the generator.
                conclusion = ThinkingStep(
                    step_id=next_id,
                    content=action["content"],
                    step_type="conclusion",
                    is_final=True,
                )
                session.add_step(conclusion)
                yield conclusion
                return action["content"]

            if kind == "tool_call":
                tool_name = action["tool"]
                tool_args = action["arguments"]

                # Tool failures are folded into the trace rather than raised.
                try:
                    outcome = self.tools.execute(tool_name, **tool_args)
                except Exception as exc:
                    outcome = f"Error: {str(exc)}"

                session.add_tool_call(tool_name, tool_args, outcome)

                usage = ThinkingStep(
                    step_id=next_id,
                    content=f"Called {tool_name}",
                    step_type="tool_use",
                    tool_call={"name": tool_name, "args": tool_args},
                    tool_result=outcome,
                )
                session.add_step(usage)
                yield usage
                next_id += 1

                # Fold the tool output back into the working state.
                state = f"{state}\n\nTool result: {outcome}"

            elif kind == "reflect":
                reflection = ThinkingStep(
                    step_id=next_id,
                    content=action["content"],
                    step_type="reflection",
                )
                session.add_step(reflection)
                yield reflection
                next_id += 1

        # Step cap or time budget exhausted without a conclusion.
        return self._generate_final_answer(session)

    def _generate_thought(
        self,
        state: str,
        context: ThinkingContext,
    ) -> Dict[str, Any]:
        """Produce a (placeholder) thought about the current state."""
        return {
            "thought": f"Analyzing: {state[:100]}...",
            "confidence": 0.85,
            "next_action": "continue",
        }

    def _decide_action(
        self,
        thinking: Dict[str, Any],
        context: ThinkingContext,
    ) -> Dict[str, Any]:
        """Pick the next action: answer directly, call a tool, or reflect."""
        # High confidence → answer immediately.
        if thinking.get("confidence", 0) > 0.95:
            return {"type": "answer", "content": "Final answer based on analysis"}

        # Otherwise gather more information, up to three tool calls.
        if len(context.tool_history) < 3:
            return {
                "type": "tool_call",
                "tool": "search",
                "arguments": {"query": "relevant information"},
            }

        return {"type": "answer", "content": "Answer after tool use"}

    def _generate_final_answer(self, context: ThinkingContext) -> str:
        """Summarize the session into a closing answer string."""
        return f"Based on {len(context.steps)} thinking steps and {len(context.tool_history)} tool calls: [Final Answer]"
|
|
|
|
class SequentialThinking:
    """Sequential thinking: plan all steps before execution.

    Best suited to well-defined tasks with predictable steps.
    """

    def __init__(
        self,
        model,
        tool_registry,
        config: Optional[ThinkingConfig] = None,
    ):
        self.model = model
        self.tools = tool_registry
        # Fall back to a sequential-mode default when no config is given.
        self.config = config if config is not None else ThinkingConfig(mode=ThinkingMode.SEQUENTIAL)

    def plan_and_execute(
        self,
        query: str,
    ) -> Tuple[List[Dict[str, Any]], str]:
        """Build a plan for the query, run each step in order, then synthesize.

        Returns:
            Tuple of (execution_log, final_answer).
        """
        plan = self._generate_plan(query)

        log: List[Dict[str, Any]] = []
        shared: Dict[str, Any] = {}

        for position, planned in enumerate(plan, start=1):
            outcome = self._execute_step(planned, shared)
            log.append({"step": planned, "result": outcome})
            # Expose each result to later steps under a 1-based key.
            shared[f"step_{position}"] = outcome

        answer = self._synthesize_answer(query, log)
        return log, answer

    def _generate_plan(self, query: str) -> List[Dict[str, Any]]:
        """Produce the (placeholder) ordered list of planned actions."""
        return [
            {"action": "analyze", "description": "Understand the query"},
            {"action": "search", "description": "Gather information"},
            {"action": "synthesize", "description": "Combine findings"},
            {"action": "answer", "description": "Formulate response"},
        ]

    def _execute_step(
        self,
        step: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Any:
        """Run one planned step; only "search" is routed to the tool registry."""
        action = step.get("action", "")

        if action == "search" and self.tools:
            return self.tools.execute("search", query=step.get("query", ""))

        return f"Executed: {action}"

    def _synthesize_answer(
        self,
        query: str,
        execution_log: List[Dict[str, Any]],
    ) -> str:
        """Combine step results into a closing answer string."""
        return f"Answer to '{query}' based on {len(execution_log)} execution steps"
|
|
|
|
class ThinkingEngine:
    """Facade that routes queries to the configured thinking strategy."""

    def __init__(
        self,
        model,
        tool_registry=None,
        config: Optional[ThinkingConfig] = None,
    ):
        self.model = model
        self.tools = tool_registry
        self.config = config or ThinkingConfig()

        # NOTE: the sub-engines receive the *original* config argument, so a
        # None config lets each one pick its own mode-specific default.
        self.interleaved = InterleavedThinking(model, tool_registry, config)
        self.sequential = SequentialThinking(model, tool_registry, config)

    def think(
        self,
        query: str,
        mode: Optional[ThinkingMode] = None,
        stream: bool = False,
    ) -> Dict[str, Any]:
        """Main thinking interface.

        Args:
            query: User query.
            mode: Thinking mode (uses the configured default when None).
            stream: Whether to stream thinking steps.

        Returns:
            Dictionary with the answer and a thinking trace.
        """
        selected = mode or self.config.mode

        if selected == ThinkingMode.SEQUENTIAL:
            return self._run_sequential(query)
        if selected == ThinkingMode.HIDDEN:
            return self._run_hidden(query)
        # INTERLEAVED, STREAMING, and anything unrecognized run interleaved.
        return self._run_interleaved(query, stream)

    def _run_interleaved(self, query: str, stream: bool) -> Dict[str, Any]:
        """Drive the interleaved generator to completion, collecting steps."""
        session = ThinkingContext(self.config)
        collected = []
        answer = ""

        for produced in self.interleaved.think_and_act(query, session):
            collected.append(produced)
            if produced.is_final:
                answer = produced.content

        return {
            "answer": answer,
            "thinking": self._format_thinking(collected),
            "context": session.to_dict(),
        }

    def _run_sequential(self, query: str) -> Dict[str, Any]:
        """Execute plan-then-act and package the result."""
        log, answer = self.sequential.plan_and_execute(query)
        return {
            "answer": answer,
            "plan": log,
            "thinking": self._format_plan_thinking(log),
        }

    def _run_hidden(self, query: str) -> Dict[str, Any]:
        """Think normally but strip the trace from the result."""
        full = self._run_interleaved(query, False)
        return {
            "answer": full["answer"],
            "thinking": None,
        }

    def _format_thinking(self, steps: List[ThinkingStep]) -> str:
        """Render collected steps between the configured thinking markers."""
        cfg = self.config
        # Per-type prefixes; tool_use has its own layout and unknown
        # step types are silently skipped, as before.
        prefixes = {
            "reasoning": cfg.step_marker,
            "reflection": cfg.reflect_marker,
            "conclusion": cfg.conclude_marker,
        }

        rendered = [cfg.think_start]
        for step in steps:
            if step.step_type == "tool_use":
                rendered.append(f"[Tool: {step.tool_call['name']}] → {step.tool_result}")
            elif step.step_type in prefixes:
                rendered.append(f"{prefixes[step.step_type]} {step.content}")
        rendered.append(cfg.think_end)
        return "\n".join(rendered)

    def _format_plan_thinking(self, execution_log: List[Dict[str, Any]]) -> str:
        """Render a sequential execution log between the thinking markers."""
        cfg = self.config
        rendered = [cfg.think_start]

        for position, entry in enumerate(execution_log, start=1):
            planned = entry["step"]
            rendered.append(f"{cfg.step_marker} Step {position}: {planned.get('description', '')}")
            rendered.append(f"  Result: {entry['result']}")

        rendered.append(cfg.think_end)
        return "\n".join(rendered)

    def evaluate_response(
        self,
        query: str,
        response: str,
    ) -> Dict[str, Any]:
        """Score a candidate response before presenting it to the user.

        Returns an evaluation dict; "approved" flips to False when the
        confidence drops below config.min_confidence_threshold.
        """
        verdict = {
            "approved": True,
            "confidence": 0.9,
            "warnings": [],
            "suggestions": [],
        }

        # Very short responses are suspicious.
        if len(response) < 10:
            verdict["warnings"].append("Response is very short")
            verdict["confidence"] -= 0.2

        # Penalize each uncertainty phrase found (case-insensitive).
        lowered = response.lower()
        for phrase in ["I'm not sure", "I don't know", "maybe", "perhaps"]:
            if phrase.lower() in lowered:
                verdict["warnings"].append(f"Contains uncertainty: '{phrase}'")
                verdict["confidence"] -= 0.1

        if verdict["confidence"] < self.config.min_confidence_threshold:
            verdict["approved"] = False
            verdict["suggestions"].append("Consider gathering more information")

        return verdict
|
|
|
|
class MultilingualThinking:
    """Wraps a ThinkingEngine so it thinks and answers in the query's language."""

    # Directive prepended to the query to steer the model's working language.
    LANGUAGE_PROMPTS = {
        "en": "Think and respond in English.",
        "zh": "用中文思考和回答。",
        "es": "Piensa y responde en español.",
        "fr": "Réfléchis et réponds en français.",
        "de": "Denke und antworte auf Deutsch.",
        "ja": "日本語で考えて答えてください。",
        "ko": "한국어로 생각하고 답하세요.",
        "ar": "فكر وأجب بالعربية.",
        "ru": "Думай и отвечай по-русски.",
        "pt": "Pense e responda em português.",
    }

    # Unicode-range heuristics, checked in declaration order. Latin-script
    # languages are indistinguishable this way and fall through to "en".
    _SCRIPT_RANGES = (
        (r'[\u4e00-\u9fff]', "zh"),
        (r'[\u3040-\u309f\u30a0-\u30ff]', "ja"),
        (r'[\uac00-\ud7af]', "ko"),
        (r'[\u0600-\u06ff]', "ar"),
        (r'[\u0400-\u04ff]', "ru"),
    )

    def __init__(self, thinking_engine: ThinkingEngine):
        self.engine = thinking_engine

    def detect_language(self, text: str) -> str:
        """Best-effort script-based language detection; defaults to "en"."""
        for pattern, code in self._SCRIPT_RANGES:
            if re.search(pattern, text):
                return code
        return "en"

    def think_multilingual(
        self,
        query: str,
        target_language: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Think natively in the target language.

        Args:
            query: User query.
            target_language: Target language code (auto-detected when None).

        Returns:
            Response dict with thinking in the target language plus a
            "language" key recording the language used.
        """
        lang = target_language or self.detect_language(query)
        directive = self.LANGUAGE_PROMPTS.get(lang, self.LANGUAGE_PROMPTS["en"])

        # Prepend the directive so the model thinks in-language from the start.
        outcome = self.engine.think(f"{directive}\n\n{query}")
        outcome["language"] = lang
        return outcome
|
|