Spaces:
Sleeping
Sleeping
import threading
import time
import traceback

import huggingface_hub
from gradio_client import Client

from trackio.sqlite_storage import SQLiteStorage
from trackio.typehints import LogEntry
from trackio.utils import RESERVED_KEYS, fibo, generate_readable_name
class Run:
    """A single tracked run that batches metric logs and forwards them to a
    remote Trackio Space through a Gradio client.

    Log entries are queued locally and flushed every 500 ms by a background
    daemon thread, so ``log()`` never blocks on the network. Call ``finish()``
    to flush remaining entries and stop the background thread.
    """

    def __init__(
        self,
        url: str,
        project: str,
        client: Client | None,
        name: str | None = None,
        config: dict | None = None,
    ):
        """
        Args:
            url: URL of the Trackio Space to send logs to.
            project: Project this run belongs to.
            client: Pre-built Gradio client, or None to create one lazily on
                the background thread (with retries).
            name: Run name; a readable name is generated when omitted.
            config: Arbitrary run configuration; defaults to an empty dict.
        """
        self.url = url
        self.project = project
        # Guards self._client and self._queued_logs, both of which are shared
        # with the background sender thread.
        self._client_lock = threading.Lock()
        self._client = client
        self.name = name or generate_readable_name(SQLiteStorage.get_runs(project))
        self.config = config or {}
        self._queued_logs: list[LogEntry] = []
        self._stop_flag = threading.Event()
        # Daemon thread: connects to the Space (if no client was supplied)
        # and then streams queued logs until finish() is called.
        self._client_thread = threading.Thread(
            target=self._init_client_background, daemon=True
        )
        self._client_thread.start()

    def _init_client_background(self):
        """Create the Gradio client if needed, then run the batch-send loop.

        Runs on the background thread. Client creation is retried with
        Fibonacci backoff (0.1s * coefficient) because the Space may still be
        waking up when the run starts.
        """
        if self._client is None:
            for sleep_coefficient in fibo():
                try:
                    client = Client(self.url, verbose=False)
                    with self._client_lock:
                        self._client = client
                    break
                except Exception:
                    # Space unreachable or still starting; report and retry.
                    traceback.print_exc()
                if sleep_coefficient is not None:
                    time.sleep(0.1 * sleep_coefficient)
        self._batch_sender()

    def _send_queued_logs(self):
        """Flush all queued logs to the Space in one ``/bulk_log`` call.

        Caller must hold ``self._client_lock``. No-op when the queue is empty
        or the client is not ready yet. On a failed remote call the batch is
        dropped (with a printed traceback) so a flaky Space cannot grow the
        queue without bound.
        """
        if not self._queued_logs or self._client is None:
            return
        logs_to_send = self._queued_logs.copy()
        self._queued_logs.clear()
        try:
            self._client.predict(
                api_name="/bulk_log",
                logs=logs_to_send,
                hf_token=huggingface_hub.utils.get_token(),
            )
        except Exception:
            traceback.print_exc()

    def _batch_sender(self):
        """Send batched logs every 500ms until stopped and fully drained."""
        # Keep looping after the stop flag is set until the queue is empty,
        # so logs queued just before finish() are not lost.
        while not self._stop_flag.is_set() or len(self._queued_logs) > 0:
            if not self._stop_flag.is_set():
                time.sleep(0.5)
            with self._client_lock:
                self._send_queued_logs()

    def log(self, metrics: dict, step: int | None = None):
        """Queue a metrics dict for asynchronous upload.

        Args:
            metrics: Mapping of metric name -> value.
            step: Optional step index associated with these metrics.

        Raises:
            ValueError: If a metric name is reserved or starts with "__".
        """
        for k in metrics.keys():
            if k in RESERVED_KEYS or k.startswith("__"):
                raise ValueError(
                    f"Please do not use this reserved key as a metric: {k}"
                )
        log_entry: LogEntry = {
            "project": self.project,
            "run": self.name,
            "metrics": metrics,
            "step": step,
        }
        with self._client_lock:
            self._queued_logs.append(log_entry)

    def finish(self):
        """Cleanup when run is finished.

        Flushes any remaining queued logs, signals the background thread to
        stop, and waits for it to exit.
        """
        # Best-effort immediate flush; anything that cannot be sent here is
        # drained by the background loop before it exits.
        with self._client_lock:
            self._send_queued_logs()
        self._stop_flag.set()
        if self._client_thread is not None:
            print(f"* Uploading logs to Trackio Space: {self.url} (please wait...)")
            # NOTE(review): if the Space never becomes reachable, client
            # creation retries forever and this join blocks indefinitely —
            # consider a timeout. Preserved from the original behavior.
            self._client_thread.join()