"""
Job management service handling the job lifecycle and related operations.

This service provides business logic for job management operations,
including job retrieval, cancellation, deletion, and cleanup.
"""

import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List

from redis.asyncio import Redis

from ..models.job import Job, JobStatus, JobType, JobPriority, JobConfiguration
from ..core.redis import RedisKeyManager, redis_json_get, redis_json_set

logger = logging.getLogger(__name__)

class JobService:
    """
    Service class for job management operations.

    Handles job lifecycle management, status updates, cleanup,
    and data retrieval operations.
    """

    def __init__(self, redis_client: Redis):
        self.redis_client = redis_client

    async def get_jobs_paginated(
        self,
        user_id: str,
        limit: int = 10,
        offset: int = 0,
        filters: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Get a paginated list of jobs for a user with optional filtering.

        Args:
            user_id: User ID to get jobs for
            limit: Maximum number of jobs to return
            offset: Number of jobs to skip
            filters: Optional filters (status, job_type, priority, dates)

        Returns:
            Dict containing the jobs list and pagination info
        """
        try:
            user_jobs_key = RedisKeyManager.user_jobs_key(user_id)
            job_ids = await self.redis_client.smembers(user_jobs_key)

            if not job_ids:
                return {
                    "jobs": [],
                    "total_count": 0
                }

            # Load each job, skipping deleted ones and applying filters.
            jobs = []
            for job_id in job_ids:
                job = await self._get_job_by_id(job_id)
                if job and not job.is_deleted:
                    if self._job_matches_filters(job, filters):
                        jobs.append(job)

            # Newest first.
            jobs.sort(key=lambda x: x.created_at, reverse=True)

            total_count = len(jobs)
            paginated_jobs = jobs[offset:offset + limit]

            logger.info(
                "Retrieved paginated jobs for user %s: %d of %d (offset=%d, limit=%d)",
                user_id, len(paginated_jobs), total_count, offset, limit
            )

            return {
                "jobs": paginated_jobs,
                "total_count": total_count
            }

        except Exception as e:
            logger.error(
                "Failed to get paginated jobs for user %s: %s",
                user_id, e, exc_info=True
            )
            return {
                "jobs": [],
                "total_count": 0
            }
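
    # Illustrative usage sketch (not executed): assumes an initialized
    # redis.asyncio client; the filter keys mirror _job_matches_filters below,
    # and the enum value strings shown are assumptions.
    #
    #     service = JobService(redis_client)
    #     page = await service.get_jobs_paginated(
    #         user_id="user-42",
    #         limit=20,
    #         offset=0,
    #         filters={"status": "completed"},
    #     )
    #     print(page["total_count"], [j.id for j in page["jobs"]])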

    async def cancel_job(self, job_id: str) -> bool:
        """
        Cancel a job if it is in a cancellable state.

        Args:
            job_id: Job ID to cancel

        Returns:
            True if cancellation succeeded, False otherwise
        """
        try:
            job = await self._get_job_by_id(job_id)
            if not job:
                logger.warning(f"Job {job_id} not found for cancellation")
                return False

            if not job.can_be_cancelled:
                logger.warning(
                    f"Job {job_id} cannot be cancelled. Current status: {job.status}"
                )
                return False

            # Mark the job as cancelled.
            job.status = JobStatus.CANCELLED
            job.completed_at = datetime.utcnow()
            job.updated_at = datetime.utcnow()
            job.progress.current_stage = "cancelled"

            await self._save_job(job)

            # Remove the job from the processing queue.
            await self.redis_client.lrem(RedisKeyManager.JOB_QUEUE, 0, job_id)

            # Cache the new status with a short TTL for fast lookups.
            status_key = RedisKeyManager.job_status_key(job_id)
            await self.redis_client.set(status_key, JobStatus.CANCELLED.value, ex=300)

            logger.info(f"Job {job_id} cancelled successfully")
            return True

        except Exception as e:
            logger.error(f"Failed to cancel job {job_id}: {e}", exc_info=True)
            return False
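
    # Illustrative sketch (not executed): a typical cancel flow, assuming a
    # FastAPI-style handler where ownership checks happen upstream.
    #
    #     if await service.cancel_job(job_id):
    #         return {"status": "cancelled"}
    #     raise HTTPException(status_code=409, detail="Job is not cancellable")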

    async def soft_delete_job(self, job_id: str) -> bool:
        """
        Perform a soft delete on a job.

        Args:
            job_id: Job ID to delete

        Returns:
            True if deletion succeeded, False otherwise
        """
        try:
            job = await self._get_job_by_id(job_id)
            if not job:
                logger.warning(f"Job {job_id} not found for deletion")
                return False

            # Flag the job as deleted without removing its record.
            job.is_deleted = True
            job.deleted_at = datetime.utcnow()
            job.updated_at = datetime.utcnow()

            await self._save_job(job)

            logger.info(f"Job {job_id} soft deleted successfully")
            return True

        except Exception as e:
            logger.error(f"Failed to soft delete job {job_id}: {e}", exc_info=True)
            return False

    async def cleanup_job_data(self, job_id: str) -> bool:
        """
        Clean up related data for a deleted job.

        Args:
            job_id: Job ID to clean up

        Returns:
            True if cleanup succeeded, False otherwise
        """
        try:
            # Remove the generated video record.
            video_key = RedisKeyManager.video_key(f"job_{job_id}")
            await self.redis_client.delete(video_key)

            # Remove the cached status.
            status_key = RedisKeyManager.job_status_key(job_id)
            await self.redis_client.delete(status_key)

            # Remove job logs.
            logs_key = f"job_logs:{job_id}"
            await self.redis_client.delete(logs_key)

            # Remove any cached entries for this job. Note: KEYS is O(N) over
            # the whole keyspace; consider SCAN for large deployments.
            cache_pattern = f"cache:job:{job_id}:*"
            cache_keys = await self.redis_client.keys(cache_pattern)
            if cache_keys:
                await self.redis_client.delete(*cache_keys)

            logger.info(f"Cleaned up data for job {job_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to cleanup job data for {job_id}: {e}", exc_info=True)
            return False
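
    # Illustrative sketch (not executed): a hard-delete flow composed from the
    # two methods above.
    #
    #     if await service.soft_delete_job(job_id):
    #         await service.cleanup_job_data(job_id)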

    async def get_job_statistics(self, user_id: str) -> Dict[str, Any]:
        """
        Get job statistics for a user.

        Args:
            user_id: User ID to get statistics for

        Returns:
            Dict containing job statistics
        """
        try:
            user_jobs_key = RedisKeyManager.user_jobs_key(user_id)
            job_ids = await self.redis_client.smembers(user_jobs_key)

            if not job_ids:
                return {
                    "total_jobs": 0,
                    "by_status": {},
                    "by_type": {},
                    "by_priority": {},
                    "recent_activity": []
                }

            stats = {
                "total_jobs": 0,
                "by_status": {},
                "by_type": {},
                "by_priority": {},
                "recent_activity": []
            }

            recent_jobs = []

            for job_id in job_ids:
                job = await self._get_job_by_id(job_id)
                if job and not job.is_deleted:
                    stats["total_jobs"] += 1

                    # Count by status.
                    status = job.status.value
                    stats["by_status"][status] = stats["by_status"].get(status, 0) + 1

                    # Count by job type.
                    job_type = job.job_type.value
                    stats["by_type"][job_type] = stats["by_type"].get(job_type, 0) + 1

                    # Count by priority.
                    priority = job.priority.value
                    stats["by_priority"][priority] = stats["by_priority"].get(priority, 0) + 1

                    # Collect jobs created within the last 7 days.
                    if job.created_at >= datetime.utcnow() - timedelta(days=7):
                        recent_jobs.append({
                            "job_id": job.id,
                            "status": job.status.value,
                            "created_at": job.created_at.isoformat(),
                            "topic": job.configuration.topic[:50] + "..." if len(job.configuration.topic) > 50 else job.configuration.topic
                        })

            # Keep only the ten most recent jobs.
            recent_jobs.sort(key=lambda x: x["created_at"], reverse=True)
            stats["recent_activity"] = recent_jobs[:10]

            logger.info(
                "Generated job statistics for user %s (total_jobs=%d)",
                user_id, stats["total_jobs"]
            )

            return stats

        except Exception as e:
            logger.error(
                "Failed to get job statistics for user %s: %s",
                user_id, e, exc_info=True
            )
            return {
                "total_jobs": 0,
                "by_status": {},
                "by_type": {},
                "by_priority": {},
                "recent_activity": []
            }
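
    # Illustrative result shape (values and enum strings are hypothetical):
    #
    #     {
    #         "total_jobs": 12,
    #         "by_status": {"completed": 9, "failed": 2, "processing": 1},
    #         "by_type": {"video_generation": 12},
    #         "by_priority": {"normal": 10, "high": 2},
    #         "recent_activity": [{"job_id": "...", "status": "completed"}],
    #     }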

    async def cleanup_old_jobs(self, retention_days: int = 30) -> int:
        """
        Clean up old completed jobs based on the retention policy.

        Args:
            retention_days: Number of days to retain completed jobs

        Returns:
            Number of jobs cleaned up
        """
        try:
            cutoff_date = datetime.utcnow() - timedelta(days=retention_days)
            cleaned_count = 0

            # Scan all job records. KEYS is O(N); consider SCAN in production.
            job_pattern = f"{RedisKeyManager.JOB_PREFIX}:*"
            job_keys = await self.redis_client.keys(job_pattern)

            for job_key in job_keys:
                try:
                    job_data = await redis_json_get(self.redis_client, job_key)
                    if job_data:
                        job = Job(**job_data)

                        # Only purge finished jobs past the retention window.
                        if (job.completed_at and
                                job.completed_at < cutoff_date and
                                job.status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]):

                            job_id = job.id
                            await self.cleanup_job_data(job_id)
                            await self.redis_client.delete(job_key)

                            # Drop the job from the owner's index.
                            user_jobs_key = RedisKeyManager.user_jobs_key(job.user_id)
                            await self.redis_client.srem(user_jobs_key, job_id)

                            cleaned_count += 1

                except Exception as e:
                    logger.warning(f"Failed to process job key {job_key}: {e}")
                    continue

            logger.info(f"Cleaned up {cleaned_count} old jobs")
            return cleaned_count

        except Exception as e:
            logger.error(
                "Failed to cleanup old jobs (retention_days=%d): %s",
                retention_days, e, exc_info=True
            )
            return 0
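
    # Illustrative scheduling sketch (not executed): running the retention
    # sweep once a day from an asyncio background task.
    #
    #     import asyncio
    #
    #     async def retention_loop(service: "JobService") -> None:
    #         while True:
    #             await service.cleanup_old_jobs(retention_days=30)
    #             await asyncio.sleep(24 * 60 * 60)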

    async def collect_job_metrics(self, job_id: str) -> Dict[str, Any]:
        """
        Collect comprehensive metrics for a job.

        Args:
            job_id: Job ID to collect metrics for

        Returns:
            Dict containing job metrics
        """
        try:
            job = await self._get_job_by_id(job_id)
            if not job:
                return {}

            metrics = {
                "job_id": job_id,
                "status": job.status.value,
                "duration_seconds": job.duration_seconds,
                "queue_time_seconds": None,
                "processing_time_seconds": None,
                "created_at": job.created_at.isoformat(),
                "started_at": job.started_at.isoformat() if job.started_at else None,
                "completed_at": job.completed_at.isoformat() if job.completed_at else None,
                "progress_percentage": job.progress.percentage,
                "stages_completed": job.progress.stages_completed,
                "error_info": job.error.dict() if job.error else None
            }

            # Time spent waiting in the queue.
            if job.started_at:
                queue_time = (job.started_at - job.created_at).total_seconds()
                metrics["queue_time_seconds"] = queue_time

            # Time spent actually processing.
            if job.started_at and job.completed_at:
                processing_time = (job.completed_at - job.started_at).total_seconds()
                metrics["processing_time_seconds"] = processing_time

            # Merge in resource metrics if they were recorded.
            if job.metrics:
                metrics.update({
                    "cpu_usage_percent": job.metrics.cpu_usage_percent,
                    "memory_usage_mb": job.metrics.memory_usage_mb,
                    "disk_usage_mb": job.metrics.disk_usage_mb,
                    "network_usage_mb": job.metrics.network_usage_mb
                })

            return metrics

        except Exception as e:
            logger.error(f"Failed to collect metrics for job {job_id}: {e}")
            return {}

    async def update_job_metrics(
        self,
        job_id: str,
        cpu_usage: Optional[float] = None,
        memory_usage: Optional[float] = None,
        disk_usage: Optional[float] = None,
        network_usage: Optional[float] = None
    ) -> bool:
        """
        Update performance metrics for a job.

        Args:
            job_id: Job ID to update metrics for
            cpu_usage: CPU usage percentage
            memory_usage: Memory usage in MB
            disk_usage: Disk usage in MB
            network_usage: Network usage in MB

        Returns:
            True if metrics updated successfully, False otherwise
        """
        try:
            job = await self._get_job_by_id(job_id)
            if not job:
                return False

            if not job.metrics:
                from ..models.job import JobMetrics
                job.metrics = JobMetrics()

            # Only overwrite the metrics that were supplied.
            if cpu_usage is not None:
                job.metrics.cpu_usage_percent = cpu_usage
            if memory_usage is not None:
                job.metrics.memory_usage_mb = memory_usage
            if disk_usage is not None:
                job.metrics.disk_usage_mb = disk_usage
            if network_usage is not None:
                job.metrics.network_usage_mb = network_usage

            # Derive timing metrics from the job timestamps.
            if job.started_at:
                job.metrics.queue_time_seconds = (job.started_at - job.created_at).total_seconds()
                if job.completed_at:
                    job.metrics.processing_time_seconds = (job.completed_at - job.started_at).total_seconds()
                else:
                    job.metrics.processing_time_seconds = (datetime.utcnow() - job.started_at).total_seconds()

            await self._save_job(job)

            logger.info(f"Updated metrics for job {job_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to update metrics for job {job_id}: {e}")
            return False
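
    # Illustrative sketch (not executed): sampling resource usage from a worker
    # with psutil (an assumed extra dependency) and pushing it into the job.
    #
    #     import psutil
    #
    #     proc = psutil.Process()
    #     await service.update_job_metrics(
    #         job_id,
    #         cpu_usage=psutil.cpu_percent(),
    #         memory_usage=proc.memory_info().rss / (1024 * 1024),  # bytes -> MB
    #     )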

    async def get_system_metrics(self) -> Dict[str, Any]:
        """
        Get system-wide job metrics and statistics.

        Returns:
            Dict containing system metrics
        """
        try:
            # Queue depth and in-flight work.
            queue_length = await self.redis_client.llen(RedisKeyManager.JOB_QUEUE)
            processing_jobs = await self.redis_client.scard("queue:processing")

            # Daily throughput counters.
            today = datetime.utcnow().date()
            completed_today = int(await self.redis_client.get(f"queue:completed:{today}") or 0)
            failed_today = int(await self.redis_client.get(f"queue:failed:{today}") or 0)

            # Rolling window of the last 100 processing times (in minutes).
            processing_times_key = "queue:processing_times"
            recent_times = await self.redis_client.lrange(processing_times_key, 0, 99)

            avg_processing_time = 0
            if recent_times:
                total_time = sum(float(time) for time in recent_times)
                avg_processing_time = total_time / len(recent_times)

            status_distribution = await self._get_job_status_distribution()
            active_users = await self._get_active_users_count()

            return {
                "queue_metrics": {
                    "queue_length": queue_length,
                    "processing_jobs": processing_jobs,
                    "completed_today": completed_today,
                    "failed_today": failed_today,
                    "success_rate": (completed_today / max(completed_today + failed_today, 1)) * 100
                },
                "performance_metrics": {
                    "average_processing_time_minutes": avg_processing_time,
                    "total_jobs_processed_today": completed_today + failed_today
                },
                "job_distribution": status_distribution,
                "user_metrics": {
                    "active_users_today": active_users
                },
                "timestamp": datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"Failed to get system metrics: {e}")
            return {}
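
    # Worked example (hypothetical numbers): with completed_today=45 and
    # failed_today=5, success_rate = 45 / max(50, 1) * 100 = 90.0; the
    # max(..., 1) guard avoids division by zero on days with no traffic.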

    async def _get_job_status_distribution(self) -> Dict[str, int]:
        """Get the distribution of jobs by status."""
        try:
            distribution = {}

            # Sample up to 1000 job records to keep this call bounded.
            job_pattern = f"{RedisKeyManager.JOB_PREFIX}:*"
            job_keys = await self.redis_client.keys(job_pattern)

            for job_key in job_keys[:1000]:
                try:
                    job_data = await redis_json_get(self.redis_client, job_key)
                    if job_data and not job_data.get("is_deleted", False):
                        status = job_data.get("status", "unknown")
                        distribution[status] = distribution.get(status, 0) + 1
                except Exception:
                    continue

            return distribution

        except Exception as e:
            logger.error(f"Failed to get job status distribution: {e}")
            return {}

    async def _get_active_users_count(self) -> int:
        """Get the count of users who created jobs today."""
        try:
            today = datetime.utcnow().date()
            active_users_key = f"active_users:{today}"

            return await self.redis_client.scard(active_users_key)

        except Exception as e:
            logger.error(f"Failed to get active users count: {e}")
            return 0

    async def create_batch_jobs(
        self,
        user_id: str,
        job_configurations: List[Dict[str, Any]],
        batch_priority: str = "normal"
    ) -> Dict[str, Any]:
        """
        Create multiple jobs as a batch operation.

        Args:
            user_id: User ID creating the batch
            job_configurations: List of job configurations
            batch_priority: Priority for the entire batch

        Returns:
            Dict containing batch information and job IDs
        """
        try:
            import uuid

            batch_id = str(uuid.uuid4())
            created_jobs = []
            failed_jobs = []

            logger.info(
                "Creating batch %s for user %s (%d jobs, priority=%s)",
                batch_id, user_id, len(job_configurations), batch_priority
            )

            for i, config_data in enumerate(job_configurations):
                try:
                    # Validate the configuration before creating the job.
                    job_config = JobConfiguration(**config_data)

                    job = Job(
                        user_id=user_id,
                        job_type=JobType.VIDEO_GENERATION,
                        priority=JobPriority(batch_priority),
                        configuration=job_config,
                        batch_id=batch_id
                    )

                    # Persist the job record.
                    job_key = RedisKeyManager.job_key(job.id)
                    await redis_json_set(self.redis_client, job_key, job.dict())

                    # Index the job under its owner.
                    user_jobs_key = RedisKeyManager.user_jobs_key(user_id)
                    await self.redis_client.sadd(user_jobs_key, job.id)

                    # Enqueue the job for processing.
                    await self.redis_client.lpush(RedisKeyManager.JOB_QUEUE, job.id)

                    created_jobs.append({
                        "job_id": job.id,
                        "position_in_batch": i + 1,
                        "topic": job_config.topic[:50] + "..." if len(job_config.topic) > 50 else job_config.topic
                    })

                except Exception as e:
                    logger.error(f"Failed to create job {i + 1} in batch {batch_id}: {e}")
                    failed_jobs.append({
                        "position_in_batch": i + 1,
                        "error": str(e),
                        "config": config_data
                    })

            # Store batch metadata with a 7-day TTL.
            batch_metadata = {
                "batch_id": batch_id,
                "user_id": user_id,
                "total_jobs": len(job_configurations),
                "created_jobs": len(created_jobs),
                "failed_jobs": len(failed_jobs),
                "batch_priority": batch_priority,
                "created_at": datetime.utcnow().isoformat(),
                "job_ids": [job["job_id"] for job in created_jobs]
            }

            batch_key = f"batch:{batch_id}"
            await redis_json_set(self.redis_client, batch_key, batch_metadata, ex=86400 * 7)

            # Index the batch under its owner.
            user_batches_key = f"user_batches:{user_id}"
            await self.redis_client.sadd(user_batches_key, batch_id)

            logger.info(
                "Batch %s created for user %s (created=%d, failed=%d)",
                batch_id, user_id, len(created_jobs), len(failed_jobs)
            )

            return {
                "batch_id": batch_id,
                "total_requested": len(job_configurations),
                "created_jobs": created_jobs,
                "failed_jobs": failed_jobs,
                "success_rate": len(created_jobs) / max(len(job_configurations), 1) * 100,
                "created_at": datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(
                "Failed to create batch jobs for user %s: %s",
                user_id, e, exc_info=True
            )
            return {
                "batch_id": None,
                "total_requested": len(job_configurations),
                "created_jobs": [],
                "failed_jobs": [{"error": str(e), "config": config} for config in job_configurations],
                "success_rate": 0,
                "created_at": datetime.utcnow().isoformat()
            }
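
    # Illustrative sketch (not executed): submitting a small batch. The
    # configuration fields shown are assumptions about JobConfiguration.
    #
    #     result = await service.create_batch_jobs(
    #         user_id="user-42",
    #         job_configurations=[
    #             {"topic": "Fourier transforms"},
    #             {"topic": "Gradient descent"},
    #         ],
    #         batch_priority="high",
    #     )
    #     print(result["batch_id"], result["success_rate"])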

    async def get_batch_status(self, batch_id: str, user_id: str) -> Dict[str, Any]:
        """
        Get status information for a batch of jobs.

        Args:
            batch_id: Batch ID to get status for
            user_id: User ID (for ownership validation)

        Returns:
            Dict containing batch status and job summaries
        """
        try:
            batch_key = f"batch:{batch_id}"
            batch_data = await redis_json_get(self.redis_client, batch_key)

            if not batch_data:
                return {"error": "Batch not found"}

            # Ownership check.
            if batch_data["user_id"] != user_id:
                return {"error": "Access denied"}

            job_statuses = []
            status_counts = {}
            total_progress = 0

            for job_id in batch_data["job_ids"]:
                job = await self._get_job_by_id(job_id)
                if job:
                    status = job.status.value
                    status_counts[status] = status_counts.get(status, 0) + 1
                    total_progress += job.progress.percentage

                    job_statuses.append({
                        "job_id": job_id,
                        "status": status,
                        "progress": job.progress.percentage,
                        "topic": job.configuration.topic[:50] + "..." if len(job.configuration.topic) > 50 else job.configuration.topic,
                        "created_at": job.created_at.isoformat(),
                        "completed_at": job.completed_at.isoformat() if job.completed_at else None,
                        "error_message": job.error.error_message if job.error else None
                    })

            overall_progress = total_progress / len(batch_data["job_ids"]) if batch_data["job_ids"] else 0

            # Derive a single batch-level status from the per-job counts.
            if status_counts.get("failed", 0) == len(batch_data["job_ids"]):
                batch_status = "failed"
            elif status_counts.get("completed", 0) == len(batch_data["job_ids"]):
                batch_status = "completed"
            elif status_counts.get("cancelled", 0) == len(batch_data["job_ids"]):
                batch_status = "cancelled"
            elif "processing" in status_counts:
                batch_status = "processing"
            elif "queued" in status_counts:
                batch_status = "queued"
            else:
                batch_status = "mixed"

            return {
                "batch_id": batch_id,
                "batch_status": batch_status,
                "overall_progress": round(overall_progress, 2),
                "total_jobs": batch_data["total_jobs"],
                "status_summary": status_counts,
                "created_at": batch_data["created_at"],
                "batch_priority": batch_data["batch_priority"],
                "jobs": job_statuses
            }

        except Exception as e:
            logger.error(f"Failed to get batch status for {batch_id}: {e}")
            return {"error": str(e)}

    async def cancel_batch_jobs(self, batch_id: str, user_id: str) -> Dict[str, Any]:
        """
        Cancel all jobs in a batch.

        Args:
            batch_id: Batch ID to cancel
            user_id: User ID (for ownership validation)

        Returns:
            Dict containing cancellation results
        """
        try:
            batch_key = f"batch:{batch_id}"
            batch_data = await redis_json_get(self.redis_client, batch_key)

            if not batch_data:
                return {"error": "Batch not found"}

            # Ownership check.
            if batch_data["user_id"] != user_id:
                return {"error": "Access denied"}

            cancelled_jobs = []
            failed_cancellations = []

            # Attempt to cancel each job individually.
            for job_id in batch_data["job_ids"]:
                try:
                    success = await self.cancel_job(job_id)
                    if success:
                        cancelled_jobs.append(job_id)
                    else:
                        failed_cancellations.append(job_id)
                except Exception as e:
                    logger.error(f"Failed to cancel job {job_id} in batch {batch_id}: {e}")
                    failed_cancellations.append(job_id)

            logger.info(
                "Batch %s cancellation completed (cancelled=%d, failed=%d)",
                batch_id, len(cancelled_jobs), len(failed_cancellations)
            )

            return {
                "batch_id": batch_id,
                "cancelled_jobs": cancelled_jobs,
                "failed_cancellations": failed_cancellations,
                "success_rate": len(cancelled_jobs) / max(len(batch_data["job_ids"]), 1) * 100,
                "cancelled_at": datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"Failed to cancel batch {batch_id}: {e}")
            return {"error": str(e)}

    async def get_user_batches(
        self,
        user_id: str,
        limit: int = 10,
        offset: int = 0
    ) -> Dict[str, Any]:
        """
        Get the list of batches for a user.

        Args:
            user_id: User ID to get batches for
            limit: Maximum number of batches to return
            offset: Number of batches to skip

        Returns:
            Dict containing the batch list and pagination info
        """
        try:
            user_batches_key = f"user_batches:{user_id}"
            batch_ids = await self.redis_client.smembers(user_batches_key)

            if not batch_ids:
                return {
                    "batches": [],
                    "total_count": 0
                }

            batches = []
            for batch_id in batch_ids:
                batch_key = f"batch:{batch_id}"
                batch_data = await redis_json_get(self.redis_client, batch_key)

                if batch_data:
                    # Summarize job statuses from the short-lived status cache.
                    status_counts = {}
                    for job_id in batch_data["job_ids"]:
                        status_key = RedisKeyManager.job_status_key(job_id)
                        status = await self.redis_client.get(status_key)
                        if status:
                            status_counts[status] = status_counts.get(status, 0) + 1

                    batches.append({
                        "batch_id": batch_id,
                        "total_jobs": batch_data["total_jobs"],
                        "created_jobs": batch_data["created_jobs"],
                        "batch_priority": batch_data["batch_priority"],
                        "created_at": batch_data["created_at"],
                        "status_summary": status_counts
                    })

            # Newest first.
            batches.sort(key=lambda x: x["created_at"], reverse=True)

            total_count = len(batches)
            paginated_batches = batches[offset:offset + limit]

            return {
                "batches": paginated_batches,
                "total_count": total_count
            }

        except Exception as e:
            logger.error(f"Failed to get user batches for {user_id}: {e}")
            return {
                "batches": [],
                "total_count": 0
            }

    async def cleanup_batch_data(self, batch_id: str) -> bool:
        """
        Clean up batch-related data.

        Args:
            batch_id: Batch ID to clean up

        Returns:
            True if cleanup succeeded, False otherwise
        """
        try:
            batch_key = f"batch:{batch_id}"
            batch_data = await redis_json_get(self.redis_client, batch_key)

            if batch_data:
                # Clean up every job that belonged to the batch.
                for job_id in batch_data["job_ids"]:
                    await self.cleanup_job_data(job_id)

                # Remove the batch from the owner's index.
                user_batches_key = f"user_batches:{batch_data['user_id']}"
                await self.redis_client.srem(user_batches_key, batch_id)

                # Remove the batch metadata itself.
                await self.redis_client.delete(batch_key)

            logger.info(f"Cleaned up batch data for {batch_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to cleanup batch data for {batch_id}: {e}")
            return False

    async def update_job_status(
        self,
        job_id: str,
        status: JobStatus,
        progress_percentage: Optional[float] = None,
        current_stage: Optional[str] = None,
        error_info: Optional[Dict[str, Any]] = None
    ) -> bool:
        """
        Update job status and progress information.

        Args:
            job_id: Job ID to update
            status: New job status
            progress_percentage: Progress percentage (0-100)
            current_stage: Current processing stage
            error_info: Error information if the job failed

        Returns:
            True if update succeeded, False otherwise
        """
        try:
            job = await self._get_job_by_id(job_id)
            if not job:
                logger.warning(f"Job {job_id} not found for status update")
                return False

            # Capture the previous status before mutating the job.
            old_status = job.status

            job.status = status
            job.updated_at = datetime.utcnow()

            # Clamp progress to the 0-100 range.
            if progress_percentage is not None:
                job.progress.percentage = max(0, min(100, progress_percentage))

            if current_stage:
                job.progress.current_stage = current_stage
                if current_stage not in job.progress.stages_completed:
                    job.progress.stages_completed.append(current_stage)

            # Maintain lifecycle timestamps.
            if status == JobStatus.PROCESSING and not job.started_at:
                job.started_at = datetime.utcnow()
            elif status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
                if not job.completed_at:
                    job.completed_at = datetime.utcnow()
                if status == JobStatus.COMPLETED:
                    job.progress.percentage = 100.0

            # Attach structured error information on failure.
            if error_info and status == JobStatus.FAILED:
                from ..models.job import JobError
                job.error = JobError(
                    error_code=error_info.get("error_code", "UNKNOWN_ERROR"),
                    error_message=error_info.get("error_message", "An unknown error occurred"),
                    error_details=error_info.get("error_details"),
                    stack_trace=error_info.get("stack_trace")
                )

            await self._save_job(job)

            # Cache the status with a short TTL for fast lookups.
            status_key = RedisKeyManager.job_status_key(job_id)
            await self.redis_client.set(status_key, status.value, ex=300)

            # Update daily throughput counters for terminal states.
            if status in [JobStatus.COMPLETED, JobStatus.FAILED]:
                today = datetime.utcnow().date()
                if status == JobStatus.COMPLETED:
                    await self.redis_client.incr(f"queue:completed:{today}")
                else:
                    await self.redis_client.incr(f"queue:failed:{today}")

                # Record the processing time (in minutes) in a rolling window.
                if job.started_at and job.completed_at:
                    processing_time = (job.completed_at - job.started_at).total_seconds() / 60
                    processing_times_key = "queue:processing_times"
                    await self.redis_client.lpush(processing_times_key, str(processing_time))
                    await self.redis_client.ltrim(processing_times_key, 0, 99)

            logger.info(
                "Job %s status updated: %s -> %s (progress=%s)",
                job_id, old_status.value, status.value, progress_percentage
            )

            return True

        except Exception as e:
            logger.error(
                "Failed to update job %s status to %s: %s",
                job_id, status.value, e, exc_info=True
            )
            return False
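
    # Illustrative worker sketch (not executed): driving a job through its
    # lifecycle with update_job_status.
    #
    #     await service.update_job_status(
    #         job_id, JobStatus.PROCESSING,
    #         progress_percentage=0, current_stage="rendering",
    #     )
    #     # ... do the actual work ...
    #     await service.update_job_status(job_id, JobStatus.COMPLETED)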

    async def retry_failed_job(self, job_id: str) -> bool:
        """
        Retry a failed job if it is eligible for retry.

        Args:
            job_id: Job ID to retry

        Returns:
            True if retry initiated, False otherwise
        """
        try:
            job = await self._get_job_by_id(job_id)
            if not job:
                logger.warning(f"Job {job_id} not found for retry")
                return False

            if job.status != JobStatus.FAILED:
                logger.warning(f"Job {job_id} is not in failed state, cannot retry")
                return False

            if job.error and not job.error.can_retry:
                logger.warning(f"Job {job_id} has exceeded maximum retry attempts")
                return False

            # Reset the job to a clean queued state.
            job.status = JobStatus.QUEUED
            job.progress.percentage = 0.0
            job.progress.current_stage = None
            job.progress.stages_completed = []
            job.started_at = None
            job.completed_at = None
            job.updated_at = datetime.utcnow()

            # Track how many times this job has been retried.
            if job.error:
                job.error.retry_count += 1

            await self._save_job(job)

            # Re-enqueue the job for processing.
            await self.redis_client.lpush(RedisKeyManager.JOB_QUEUE, job_id)

            # Refresh the cached status.
            status_key = RedisKeyManager.job_status_key(job_id)
            await self.redis_client.set(status_key, JobStatus.QUEUED.value, ex=300)

            logger.info(f"Job {job_id} queued for retry")
            return True

        except Exception as e:
            logger.error(f"Failed to retry job {job_id}: {e}")
            return False

    async def get_job_queue_position(self, job_id: str) -> Optional[int]:
        """
        Get the position of a job in the processing queue.

        Args:
            job_id: Job ID to check position for

        Returns:
            Queue position (0-based) or None if not in queue
        """
        try:
            queue_jobs = await self.redis_client.lrange(RedisKeyManager.JOB_QUEUE, 0, -1)

            # Walk the queue front-to-back. Note: the equality check assumes the
            # client decodes responses to str (decode_responses=True).
            for i, queued_job_id in enumerate(queue_jobs):
                if queued_job_id == job_id:
                    return i

            return None

        except Exception as e:
            logger.error(f"Failed to get queue position for job {job_id}: {e}")
            return None

    async def get_estimated_completion_time(self, job_id: str) -> Optional[datetime]:
        """
        Estimate completion time for a job based on queue position and average processing time.

        Args:
            job_id: Job ID to estimate completion for

        Returns:
            Estimated completion datetime or None if it cannot be estimated
        """
        try:
            job = await self._get_job_by_id(job_id)
            if not job:
                return None

            if job.status == JobStatus.COMPLETED:
                return job.completed_at
            elif job.status == JobStatus.PROCESSING:
                # Extrapolate from progress so far.
                if job.started_at and job.progress.percentage > 0:
                    elapsed = (datetime.utcnow() - job.started_at).total_seconds() / 60
                    estimated_total = elapsed / (job.progress.percentage / 100)
                    remaining = estimated_total - elapsed
                    return datetime.utcnow() + timedelta(minutes=remaining)
            elif job.status == JobStatus.QUEUED:
                # Estimate the wait from the queue position and recent history.
                queue_position = await self.get_job_queue_position(job_id)
                if queue_position is not None:
                    # Average over the 20 most recent processing times (minutes).
                    processing_times_key = "queue:processing_times"
                    recent_times = await self.redis_client.lrange(processing_times_key, 0, 19)

                    if recent_times:
                        avg_time = sum(float(time) for time in recent_times) / len(recent_times)
                        estimated_wait = queue_position * avg_time
                        return datetime.utcnow() + timedelta(minutes=estimated_wait)

            return None

        except Exception as e:
            logger.error(f"Failed to estimate completion time for job {job_id}: {e}")
            return None
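
    # Worked example (hypothetical numbers): a processing job at 25% after
    # 5 minutes extrapolates to 5 / 0.25 = 20 minutes total, so roughly
    # 15 minutes remain; a queued job at position 3 with a 4-minute average
    # processing time is estimated at 3 * 4 = 12 minutes of wait.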

    async def _get_job_by_id(self, job_id: str) -> Optional[Job]:
        """
        Get a job by ID from Redis.

        Args:
            job_id: Job ID

        Returns:
            Job instance or None if not found
        """
        try:
            job_key = RedisKeyManager.job_key(job_id)
            job_data = await redis_json_get(self.redis_client, job_key)

            if job_data:
                return Job(**job_data)
            return None

        except Exception as e:
            logger.error(f"Failed to get job {job_id}: {e}")
            return None

    async def _save_job(self, job: Job) -> bool:
        """
        Save a job to Redis.

        Args:
            job: Job instance to save

        Returns:
            True if save succeeded, False otherwise
        """
        try:
            job_key = RedisKeyManager.job_key(job.id)
            await redis_json_set(self.redis_client, job_key, job.dict())
            return True

        except Exception as e:
            logger.error(f"Failed to save job {job.id}: {e}")
            return False

    def _job_matches_filters(self, job: Job, filters: Optional[Dict[str, Any]]) -> bool:
        """
        Check whether a job matches the provided filters.

        Args:
            job: Job instance to check
            filters: Filter criteria

        Returns:
            True if the job matches the filters, False otherwise
        """
        if not filters:
            return True

        try:
            # Exact-match filters on enum values.
            if filters.get("status") and job.status.value != filters["status"]:
                return False

            if filters.get("job_type") and job.job_type.value != filters["job_type"]:
                return False

            if filters.get("priority") and job.priority.value != filters["priority"]:
                return False

            # Date-range filters; invalid dates are logged and ignored.
            if filters.get("created_after"):
                try:
                    created_after = datetime.fromisoformat(filters["created_after"].replace("Z", "+00:00"))
                    if job.created_at < created_after:
                        return False
                except ValueError:
                    logger.warning(f"Invalid created_after date format: {filters['created_after']}")

            if filters.get("created_before"):
                try:
                    created_before = datetime.fromisoformat(filters["created_before"].replace("Z", "+00:00"))
                    if job.created_at > created_before:
                        return False
                except ValueError:
                    logger.warning(f"Invalid created_before date format: {filters['created_before']}")

            return True

        except Exception as e:
            logger.error(f"Error applying filters to job {job.id}: {e}")
            return True
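
    # Illustrative filter dict (enum value strings are hypothetical):
    #
    #     filters = {
    #         "status": "completed",
    #         "priority": "high",
    #         "created_after": "2024-01-01T00:00:00Z",
    #     }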