"""
DataSynthis_ML_JobTask - Movie Recommendation Model

A movie recommendation system using collaborative filtering and matrix factorization.
"""
|
| |
|
| | import pandas as pd
|
| | import numpy as np
|
| | from sklearn.metrics.pairwise import cosine_similarity
|
| | from sklearn.decomposition import TruncatedSVD
|
| | import os
|
| | import urllib.request
|
| | import zipfile
|
| | import pickle
|
| | from typing import List, Dict, Optional, Union
|
| |
|
| |
|
class MovieRecommender:
    """
    Movie Recommendation Model using collaborative filtering and SVD.

    Trains two predictors from the MovieLens 100k dataset:
      * "cf"  - item-item collaborative filtering via cosine similarity
                between movie rating columns of the user-item matrix;
      * "svd" - truncated-SVD matrix factorization of the (zero-filled)
                user-item matrix, reconstructed into dense predictions.
    """

    def __init__(self):
        # Raw data (populated lazily by load_data()).
        self.ratings = None             # DataFrame: user_id, movie_id, rating
        self.movies = None              # DataFrame: movie_id, title, genre flags
        # Derived artifacts (populated by train()).
        self.user_item_matrix = None    # users x movies pivot of ratings (NaN = unrated)
        self.item_similarity = None     # raw cosine-similarity ndarray (movies x movies)
        self.item_similarity_df = None  # same matrix, labelled by movie_id
        self.svd_model = None           # fitted TruncatedSVD instance
        self.pred_svd_df = None         # dense SVD rating predictions (users x movies)
        self.is_trained = False

    def load_data(self):
        """Load MovieLens 100k dataset, downloading/extracting it if absent.

        Raises:
            RuntimeError: if the dataset cannot be downloaded.
        """
        # https rather than http: the grouplens server supports TLS and a
        # plain-http download of executable-adjacent data invites tampering.
        dataset_url = "https://files.grouplens.org/datasets/movielens/ml-100k.zip"
        dataset_path = "ml-100k"

        if not os.path.exists(dataset_path):
            if os.path.exists("ml-100k.zip"):
                print("Extracting existing MovieLens 100k dataset...")
                with zipfile.ZipFile("ml-100k.zip", "r") as zip_ref:
                    zip_ref.extractall(".")
                print("Extraction complete.")
            else:
                print("Downloading MovieLens 100k dataset...")
                try:
                    urllib.request.urlretrieve(dataset_url, "ml-100k.zip")
                    with zipfile.ZipFile("ml-100k.zip", "r") as zip_ref:
                        zip_ref.extractall(".")
                    print("Download complete.")
                except Exception as e:
                    print(f"Download failed: {e}")
                    # Chain the cause so the network/zip error stays visible;
                    # RuntimeError is still caught by callers using Exception.
                    raise RuntimeError("Could not download dataset") from e

        # u.data is tab-separated with no header row.
        self.ratings = pd.read_csv(
            "ml-100k/u.data",
            sep="\t",
            names=["user_id", "movie_id", "rating", "timestamp"]
        )

        # u.item is pipe-separated, latin-1 encoded, 24 columns (5 metadata
        # fields followed by 19 one-hot genre flags).
        self.movies = pd.read_csv(
            "ml-100k/u.item",
            sep="|",
            encoding="ISO-8859-1",
            names=["movie_id", "title", "release_date", "video_release_date", "IMDb_URL",
                   "unknown", "Action", "Adventure", "Animation", "Children", "Comedy",
                   "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror",
                   "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"]
        )

        # Timestamps are not used by either model.
        self.ratings.drop("timestamp", axis=1, inplace=True)

        print(f"Loaded {len(self.ratings)} ratings from {len(self.ratings['user_id'].unique())} users")
        print(f"Loaded {len(self.movies)} movies")

    def train(self):
        """Train both recommendation models (CF similarity + SVD factorization)."""
        if self.ratings is None:
            self.load_data()

        # Users as rows, movies as columns; unrated cells stay NaN so the CF
        # step can distinguish "unrated" from "rated 0".
        self.user_item_matrix = self.ratings.pivot(
            index='user_id', columns='movie_id', values='rating'
        )

        # Item-item cosine similarity over zero-filled movie rating vectors.
        self.item_similarity = cosine_similarity(self.user_item_matrix.T.fillna(0))
        self.item_similarity_df = pd.DataFrame(
            self.item_similarity,
            index=self.user_item_matrix.columns,
            columns=self.user_item_matrix.columns
        )

        # Low-rank reconstruction R ~= U * Sigma * Vt gives dense predicted
        # ratings for every (user, movie) pair, including unseen ones.
        R = self.user_item_matrix.fillna(0)
        self.svd_model = TruncatedSVD(n_components=20, random_state=42)
        U = self.svd_model.fit_transform(R)
        Sigma = np.diag(self.svd_model.singular_values_)
        Vt = self.svd_model.components_
        pred_svd = np.dot(np.dot(U, Sigma), Vt)
        self.pred_svd_df = pd.DataFrame(pred_svd, index=R.index, columns=R.columns)

        self.is_trained = True
        print("Model training completed!")

    def predict_ratings_cf(self, user_id: int) -> pd.Series:
        """Predict ratings for every movie via item-item collaborative filtering.

        Args:
            user_id: ID of a user present in the training data.

        Returns:
            Series of predicted ratings indexed by movie_id.

        Raises:
            ValueError: if the model is untrained or the user is unknown.
        """
        if not self.is_trained:
            raise ValueError("Model must be trained first")

        if user_id not in self.user_item_matrix.index:
            raise ValueError(f"User {user_id} not found in dataset")

        user_ratings = self.user_item_matrix.loc[user_id]
        # Similarity-weighted sum of the user's known ratings, normalized by
        # the total similarity mass of the movies they actually rated.
        weighted_sum = self.item_similarity_df.dot(user_ratings.fillna(0))
        sim_sum = np.abs(self.item_similarity_df).dot(user_ratings.notna().astype(int))
        # 1e-9 floor avoids division by zero for movies with no rated neighbors.
        pred = weighted_sum / np.maximum(sim_sum, 1e-9)
        return pred

    def recommend_movies(self, user_id: int, n_recommendations: int = 10,
                         method: str = "svd") -> List[Dict]:
        """
        Get movie recommendations for a user.

        Args:
            user_id: User ID to get recommendations for
            n_recommendations: Number of recommendations to return
            method: "svd" or "cf" (collaborative filtering)

        Returns:
            List of dictionaries with movie recommendations, ordered by
            predicted rating (best first). On an unknown user, a single-item
            list containing an "error" dictionary.
        """
        if not self.is_trained:
            self.train()

        if user_id not in self.user_item_matrix.index:
            available_users = sorted(self.user_item_matrix.index.tolist())
            return [{
                "error": f"User {user_id} not found",
                "available_users": f"Available user IDs: {available_users[:10]}... (showing first 10)"
            }]

        if method == "svd":
            preds = self.pred_svd_df.loc[user_id]
        else:
            preds = self.predict_ratings_cf(user_id)

        # Never recommend movies the user has already rated.
        watched = self.ratings[self.ratings.user_id == user_id].movie_id.values
        preds = preds.drop(watched, errors='ignore')

        # BUG FIX: the previous implementation selected the top-N correctly
        # but then returned them in catalogue (movies-table) order. Iterate
        # the ranked Series directly so the output is sorted best-first.
        top = preds.sort_values(ascending=False).head(n_recommendations)
        title_by_id = self.movies.set_index("movie_id")["title"]

        result = []
        for movie_id, score in top.items():
            if movie_id not in title_by_id.index:
                continue  # no catalogue entry for this id; skip (matches old behavior)
            result.append({
                "movie_id": int(movie_id),
                "title": title_by_id[movie_id],
                "predicted_rating": float(score)
            })

        return result

    def get_user_stats(self, user_id: int) -> Dict:
        """Return rating-count, mean rating and rating histogram for a user."""
        if not self.is_trained:
            self.train()

        if user_id not in self.user_item_matrix.index:
            return {"error": f"User {user_id} not found"}

        user_ratings = self.ratings[self.ratings.user_id == user_id]

        return {
            "user_id": user_id,
            "total_ratings": len(user_ratings),
            "average_rating": float(user_ratings["rating"].mean()),
            "rating_distribution": user_ratings["rating"].value_counts().to_dict()
        }

    def get_available_users(self) -> List[int]:
        """Get sorted list of user IDs present in the training data."""
        if not self.is_trained:
            self.train()
        return sorted(self.user_item_matrix.index.tolist())

    def save_model(self, path: str):
        """Pickle all trained state to *path*.

        Raises:
            ValueError: if called before training.
        """
        if not self.is_trained:
            raise ValueError("Model must be trained first")

        model_data = {
            'ratings': self.ratings,
            'movies': self.movies,
            'user_item_matrix': self.user_item_matrix,
            'item_similarity_df': self.item_similarity_df,
            'svd_model': self.svd_model,
            'pred_svd_df': self.pred_svd_df,
            'is_trained': self.is_trained
        }

        with open(path, 'wb') as f:
            pickle.dump(model_data, f)

        print(f"Model saved to {path}")

    def load_model(self, path: str):
        """Restore trained state previously written by save_model().

        SECURITY NOTE: pickle.load executes arbitrary code from the file —
        only load model files from trusted sources.
        """
        with open(path, 'rb') as f:
            model_data = pickle.load(f)

        self.ratings = model_data['ratings']
        self.movies = model_data['movies']
        self.user_item_matrix = model_data['user_item_matrix']
        self.item_similarity_df = model_data['item_similarity_df']
        self.svd_model = model_data['svd_model']
        self.pred_svd_df = model_data['pred_svd_df']
        self.is_trained = model_data['is_trained']

        print(f"Model loaded from {path}")
|
| |
|
| |
|
| |
|
# Module-level singleton backing the `predict` entry point.
model = MovieRecommender()


def predict(user_id: int, n_recommendations: int = 10, method: str = "svd") -> List[Dict]:
    """
    Inference entry point for the Hugging Face model.

    Args:
        user_id: User ID to get recommendations for
        n_recommendations: Number of recommendations (default: 10)
        method: Recommendation method - "svd" or "cf" (default: "svd")

    Returns:
        List of movie recommendations
    """
    recommendations = model.recommend_movies(user_id, n_recommendations, method)
    return recommendations