# trackio-63243 / run.py
# abidlabs (HF Staff)
# Upload folder using huggingface_hub
# 3b4ee4c verified
import logging
import threading
import time

import huggingface_hub
from gradio_client import Client

from trackio.sqlite_storage import SQLiteStorage
from trackio.typehints import LogEntry
from trackio.utils import RESERVED_KEYS, fibo, generate_readable_name
class Run:
def __init__(
self,
url: str,
project: str,
client: Client | None,
name: str | None = None,
config: dict | None = None,
):
print(f"[DEBUG] Run.__init__: url={url}, project={project}, client={client is not None}, name={name}")
self.url = url
self.project = project
self._client_lock = threading.Lock()
self._client_thread = None
self._client = client
self.name = name or generate_readable_name(SQLiteStorage.get_runs(project))
self.config = config or {}
self._queued_logs: list[LogEntry] = []
self._stop_flag = threading.Event()
self._client_thread = threading.Thread(target=self._init_client_background)
self._client_thread.daemon = True
self._client_thread.start()
print(f"[DEBUG] Run.__init__: Started client thread for {self.name}")
def _init_client_background(self):
print(f"[DEBUG] _init_client_background: Started for {self.name}")
if self._client is None:
print(f"[DEBUG] _init_client_background: No client provided, creating one for {self.name}")
fib = fibo()
for sleep_coefficient in fib:
try:
print(f"[DEBUG] _init_client_background: Attempting to create client for {self.url}")
client = Client(self.url, verbose=False)
print(f"[DEBUG] _init_client_background: Client created, testing connection...")
# Test the connection by trying to get the Space info
try:
print(f"[DEBUG] _init_client_background: Testing client connection...")
# Try to call a simple endpoint to verify connection
test_result = client.predict(api_name="/test")
print(f"[DEBUG] _init_client_background: Connection test successful: {test_result}")
except Exception as test_e:
print(f"[DEBUG] _init_client_background: Connection test failed: {test_e}")
# Continue anyway, the client might still work for our needs
with self._client_lock:
self._client = client
print(f"[DEBUG] _init_client_background: Successfully created client for {self.name}")
break
except Exception as e:
print(f"[DEBUG] _init_client_background: Failed to create client: {e}")
print(f"[DEBUG] _init_client_background: Error type: {type(e)}")
import traceback
traceback.print_exc()
pass
if sleep_coefficient is not None:
print(f"[DEBUG] _init_client_background: Waiting {0.1 * sleep_coefficient}s before retry")
time.sleep(0.1 * sleep_coefficient)
else:
print(f"[DEBUG] _init_client_background: Client already provided for {self.name}")
print(f"[DEBUG] _init_client_background: About to start _batch_sender for {self.name}")
self._batch_sender()
print(f"[DEBUG] _init_client_background: _batch_sender finished for {self.name}")
def _batch_sender(self):
"""Send batched logs every 500ms."""
print(f"[DEBUG] _batch_sender: Started for {self.name}")
print(f"[DEBUG] _batch_sender: Client available: {self._client is not None}")
print(f"[DEBUG] _batch_sender: Stop flag set: {self._stop_flag.is_set()}")
iteration = 0
while not self._stop_flag.is_set() or len(self._queued_logs) > 0:
iteration += 1
print(f"[DEBUG] _batch_sender: Iteration {iteration} for {self.name}")
if not self._stop_flag.is_set():
time.sleep(0.5)
with self._client_lock:
print(f"[DEBUG] _batch_sender: Checking queue for {self.name}, size: {len(self._queued_logs)}")
if self._queued_logs and self._client is not None:
logs_to_send = self._queued_logs.copy()
self._queued_logs.clear()
print(f"[DEBUG] _batch_sender: Sending {len(logs_to_send)} logs via bulk_log for {self.name}")
try:
hf_token = huggingface_hub.utils.get_token()
print(f"[DEBUG] _batch_sender: Got HF token: {hf_token[:10] if hf_token else 'None'}...")
print(f"[DEBUG] _batch_sender: Calling client.predict with api_name='/bulk_log'")
result = self._client.predict(
api_name="/bulk_log",
logs=logs_to_send,
hf_token=hf_token,
)
print(f"[DEBUG] _batch_sender: bulk_log call successful for {self.name}, result: {result}")
except Exception as e:
print(f"[DEBUG] _batch_sender: Error calling bulk_log for {self.name}: {e}")
print(f"[DEBUG] _batch_sender: Error type: {type(e)}")
import traceback
traceback.print_exc()
else:
print(f"[DEBUG] _batch_sender: No logs to send or no client for {self.name}")
# If stop flag is set and no more logs, exit
if self._stop_flag.is_set() and len(self._queued_logs) == 0:
print(f"[DEBUG] _batch_sender: Stop flag set and no more logs, exiting for {self.name}")
break
print(f"[DEBUG] _batch_sender: Exiting loop for {self.name}")
def log(self, metrics: dict, step: int | None = None):
print(f"[DEBUG] log: Called for {self.name} with {len(metrics)} metrics, step={step}")
for k in metrics.keys():
if k in RESERVED_KEYS or k.startswith("__"):
raise ValueError(
f"Please do not use this reserved key as a metric: {k}"
)
log_entry: LogEntry = {
"project": self.project,
"run": self.name,
"metrics": metrics,
"step": step,
}
with self._client_lock:
self._queued_logs.append(log_entry)
print(f"[DEBUG] log: Added log entry to queue for {self.name}, queue size now: {len(self._queued_logs)}")
def finish(self):
"""Cleanup when run is finished."""
print(f"[DEBUG] finish: Called for {self.name}")
# First, send any remaining queued logs if we have a client
with self._client_lock:
if self._queued_logs and self._client is not None:
logs_to_send = self._queued_logs.copy()
self._queued_logs.clear()
print(f"[DEBUG] finish: Sending final {len(logs_to_send)} logs via bulk_log for {self.name}")
try:
hf_token = huggingface_hub.utils.get_token()
print(f"[DEBUG] finish: Got HF token: {hf_token[:10] if hf_token else 'None'}...")
result = self._client.predict(
api_name="/bulk_log",
logs=logs_to_send,
hf_token=hf_token,
)
print(f"[DEBUG] finish: Final bulk_log call successful for {self.name}, result: {result}")
except Exception as e:
print(f"[DEBUG] finish: Error in final bulk_log call for {self.name}: {e}")
print(f"[DEBUG] finish: Error type: {type(e)}")
import traceback
traceback.print_exc()
else:
print(f"[DEBUG] finish: No logs to send or no client for {self.name}")
# Now set the stop flag to signal the background thread to stop
print(f"[DEBUG] finish: Setting stop flag for {self.name}")
self._stop_flag.set()
# Give the background thread a moment to process any remaining logs
print(f"[DEBUG] finish: Waiting a moment for background thread to process logs for {self.name}")
time.sleep(1.0)
if self._client_thread is not None:
print(f"* Uploading logs to Trackio Space: {self.url} (please wait...)")
self._client_thread.join()
print(f"[DEBUG] finish: Client thread joined for {self.name}")