import gradio as gr import time import psutil import pandas as pd import polars as pl import duckdb import dask.dataframe as dd import matplotlib.pyplot as plt from clearml import Task import os import numpy as np from sklearn.preprocessing import OneHotEncoder, LabelEncoder, RobustScaler, MaxAbsScaler, PolynomialFeatures from scipy.stats import zscore, boxcox from sklearn.decomposition import PCA # Set up ClearML configuration with access key and secret key Task.set_credentials( api_host="https://api.clear.ml", # Replace with your ClearML server URL web_host="https://app.clear.ml", # Replace with your ClearML Web URL files_host="https://files.clear.ml", # Replace with your ClearML Files URL key=os.environ.get('CLEARML_API_KEY'), # Your ClearML Access Key secret=os.environ.get('CLEARML_API_SECRET') # Your ClearML Secret Key ) # Create a ClearML task task = Task.init(project_name="nyc_taxi_trip", task_name="MyTask") def display_data_info(file): # Read the uploaded Parquet file using DuckDB con = duckdb.connect(':memory:') df = con.execute("SELECT * FROM read_parquet('" + file.name + "')").fetchdf() # Get information about the data info = df.describe() columns = df.columns.tolist() dtypes = df.dtypes.tolist() shape = df.shape # Create a string to display the information info_str = "Data Shape: {}\n".format(shape) info_str += "Columns: {}\n".format(columns) info_str += "Data Types: {}\n\n".format(dtypes) info_str += str(info) return info_str, columns def perform_action(file, column, action, normalization, encoding, outlier_method, transformation, feature_engineering, data_reduction): # Read the uploaded Parquet file using different libraries libraries = { "pandas": pd.read_parquet, "polars": pl.read_parquet, "duckdb": lambda f: duckdb.connect(':memory:').execute("SELECT * FROM read_parquet('" + f.name + "')").fetchdf(), "dask": lambda f: dd.read_parquet(f.name).compute() } execution_times = {} cpu_usages = {} memory_usages = {} file_reading_times = {} con = None # Reuse the connection for DuckDB for lib, reader in libraries.items(): try: start_time = time.time() process = psutil.Process() cpu_before = process.cpu_percent(interval=None) mem_before = process.memory_info().rss / (1024 ** 2) # Memory in MB file_reading_start_time = time.time() if lib == "duckdb": if con is None: con = duckdb.connect(':memory:') df = con.execute("SELECT * FROM read_parquet('" + file.name + "')").fetchdf() else: df = reader(file) file_reading_end_time = time.time() file_reading_times[lib] = round(file_reading_end_time - file_reading_start_time, 4) if action == "Handle Missing Values": if lib == "pandas": df[column] = df[column].fillna(df[column].mean()) elif lib == "polars": df = df.fill_null(column, df[column].mean()) elif lib == "duckdb": df = df.fillna({column: df[column].mean()}) elif lib == "dask": df = df.fillna({column: df[column].mean()}) elif action == "Normalize Data": if lib == "pandas": if normalization == "Min-Max Scaler": df[column] = (df[column] - df[column].min()) / (df[column].max() - df[column].min()) elif normalization == "Standard Scaler": df[column] = (df[column] - df[column].mean()) / df[column].std() elif normalization == "Robust Scaler": scaler = RobustScaler() df[column] = scaler.fit_transform(df[[column]]) elif normalization == "Max Abs Scaler": scaler = MaxAbsScaler() df[column] = scaler.fit_transform(df[[column]]) elif lib == "polars": if normalization == "Min-Max Scaler": df = df.with_columns([(df[column] - df[column].min()) / (df[column].max() - df[column].min()).alias(column)]) elif normalization == "Standard Scaler": df = df.with_columns([(df[column] - df[column].mean()) / df[column].std().alias(column)]) elif lib == "duckdb": if normalization == "Min-Max Scaler": df = df.assign(**{column: (df[column] - df[column].min()) / (df[column].max() - df[column].min())}) elif normalization == "Standard Scaler": df = df.assign(**{column: (df[column] - df[column].mean()) / df[column].std()}) elif lib == "dask": if normalization == "Min-Max Scaler": df = df.assign(**{column: (df[column] - df[column].min()) / (df[column].max() - df[column].min())}) elif normalization == "Standard Scaler": df = df.assign(**{column: (df[column] - df[column].mean()) / df[column].std()}) elif action == "Encode Categorical Variables": if encoding == "One-Hot Encoding": if lib == "pandas": df = pd.get_dummies(df, columns=[column]) elif lib == "polars": df = df.to_dummies(columns=[column]) elif lib == "duckdb": df = pd.get_dummies(df, columns=[column]) elif lib == "dask": df = dd.get_dummies(df, columns=[column]) elif encoding == "Label Encoding": if lib == "pandas": df[column] = LabelEncoder().fit_transform(df[column]) elif lib == "polars": df = df.with_columns([pl.col(column).cast(pl.Categorical).cast(pl.Int32)]) elif lib == "duckdb": df[column] = LabelEncoder().fit_transform(df[column]) elif lib == "dask": df[column] = dd.from_pandas(LabelEncoder().fit_transform(df[column].compute()), npartitions=1) elif action == "Handle Outliers": if outlier_method == "Z-Score Method": if lib == "pandas": df = df[(np.abs(zscore(df[column])) < 3)] elif lib == "polars": df = df.filter(np.abs(zscore(df[column])) < 3) elif lib == "duckdb": df = df[(np.abs(zscore(df[column])) < 3)] elif lib == "dask": df = df[(np.abs(zscore(df[column].compute())) < 3)] elif outlier_method == "IQR Method": Q1 = df[column].quantile(0.25) Q3 = df[column].quantile(0.75) IQR = Q3 - Q1 if lib == "pandas": df = df[~((df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR)))] elif lib == "polars": df = df.filter(~((df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR)))) elif lib == "duckdb": df = df[~((df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR)))] elif lib == "dask": df = df[~((df[column].compute() < (Q1 - 1.5 * IQR)) | (df[column].compute() > (Q3 + 1.5 * IQR)))] elif action == "Data Transformation": if transformation == "Log Transformation": if lib == "pandas": df[column] = np.log1p(df[column]) elif lib == "polars": df = df.with_columns([pl.col(column).log1p().alias(column)]) elif lib == "duckdb": df[column] = np.log1p(df[column]) elif lib == "dask": df[column] = dd.from_pandas(np.log1p(df[column].compute()), npartitions=1) elif transformation == "Box-Cox Transformation": if lib == "pandas": df[column], _ = boxcox(df[column]) elif lib == "polars": df = df.with_columns([pl.col(column).apply(lambda x: boxcox(x)[0]).alias(column)]) elif lib == "duckdb": df[column], _ = boxcox(df[column]) elif lib == "dask": df[column] = dd.from_pandas(boxcox(df[column].compute())[0], npartitions=1) elif action == "Feature Engineering": if feature_engineering == "Polynomial Features": if lib == "pandas": poly = PolynomialFeatures(degree=2) df = pd.DataFrame(poly.fit_transform(df[[column]]), columns=poly.get_feature_names_out([column])) elif lib == "polars": df = df.with_columns([pl.col(column).pow(2).alias(f"{column}_squared")]) elif lib == "duckdb": df[column + "_squared"] = df[column] ** 2 elif lib == "dask": df[column + "_squared"] = df[column] ** 2 elif feature_engineering == "Interaction Terms": if lib == "pandas": df[column + "_interaction"] = df[column] * df[column] elif lib == "polars": df = df.with_columns([pl.col(column).mul(pl.col(column)).alias(f"{column}_interaction")]) elif lib == "duckdb": df[column + "_interaction"] = df[column] * df[column] elif lib == "dask": df[column + "_interaction"] = df[column] * df[column] elif action == "Data Reduction": if data_reduction == "PCA": if lib == "pandas": pca = PCA(n_components=1) df[column + "_pca"] = pca.fit_transform(df[[column]]) elif lib == "polars": pca = PCA(n_components=1) df = df.with_columns([pl.Series(pca.fit_transform(df[[column]]).alias(f"{column}_pca"))]) elif lib == "duckdb": pca = PCA(n_components=1) df[column + "_pca"] = pca.fit_transform(df[[column]]) elif lib == "dask": pca = PCA(n_components=1) df[column + "_pca"] = dd.from_pandas(pca.fit_transform(df[[column]].compute()), npartitions=1) elif data_reduction == "Feature Selection": if lib == "pandas": corr = df.corr() df = df[corr[column].abs().sort_values(ascending=False).index[:5]] elif lib == "polars": corr = df.corr() df = df.select(corr[column].abs().sort(reverse=True).columns[:5]) elif lib == "duckdb": corr = df.corr() df = df[corr[column].abs().sort_values(ascending=False).index[:5]] elif lib == "dask": corr = df.corr().compute() df = df[corr[column].abs().sort_values(ascending=False).index[:5]] cpu_after = process.cpu_percent(interval=None) mem_after = process.memory_info().rss / (1024 ** 2) # Memory in MB execution_times[lib] = round(time.time() - start_time, 4) cpu_usages[lib] = round(cpu_after - cpu_before, 2) memory_usages[lib] = round(mem_after - mem_before, 2) # Log metrics to ClearML task.logger.report_scalar(title=f"Execution Time ({lib})", series=f"Execution Time ({lib})", value=execution_times[lib], iteration=0) task.logger.report_scalar(title=f"CPU Usage ({lib})", series=f"CPU Usage ({lib})", value=cpu_usages[lib], iteration=0) task.logger.report_scalar(title=f"Memory Usage ({lib})", series=f"Memory Usage ({lib})", value=memory_usages[lib], iteration=0) except Exception as e: execution_times[lib] = 0 cpu_usages[lib] = 0 memory_usages[lib] = 0 file_reading_times[lib] = 0 print(f"Error processing {lib}: {e}") # Generate and save charts libs = list(libraries.keys()) plt.figure(figsize=(6, 4)) plt.bar(libs, [execution_times.get(lib, 0) for lib in libs], color='skyblue') plt.title("Execution Time (s)") plt.xlabel("Library") plt.ylabel("Time (s)") plt.savefig("execution_time.png") plt.close() plt.figure(figsize=(6, 4)) plt.bar(libs, [cpu_usages.get(lib, 0) for lib in libs], color='lightgreen') plt.title("CPU Usage (%)") plt.xlabel("Library") plt.ylabel("CPU (%)") plt.savefig("cpu_usage.png") plt.close() plt.figure(figsize=(6, 4)) plt.bar(libs, [memory_usages.get(lib, 0) for lib in libs], color='lightcoral') plt.title("Memory Usage (MB)") plt.xlabel("Library") plt.ylabel("Memory (MB)") plt.savefig("memory_usage.png") plt.close() plt.figure(figsize=(6, 4)) plt.bar(libs, [file_reading_times.get(lib, 0) for lib in libs], color='yellow') plt.title("File Reading Time (s)") plt.xlabel("Library") plt.ylabel("Time (s)") plt.savefig("file_reading_time.png") plt.close() # Upload plots to ClearML task.upload_artifact(name="execution_time.png", artifact_object="execution_time.png") task.upload_artifact(name="cpu_usage.png", artifact_object="cpu_usage.png") task.upload_artifact(name="memory_usage.png", artifact_object="memory_usage.png") task.upload_artifact(name="file_reading_time.png", artifact_object="file_reading_time.png") return { "pandas": { "Time Taken": execution_times.get("pandas", 0), "CPU Utilization": cpu_usages.get("pandas", 0), "Memory Usage": memory_usages.get("pandas", 0), "File Reading Time": file_reading_times.get("pandas", 0), "Result": "success" }, "polars": { "Time Taken": execution_times.get("polars", 0), "CPU Utilization": cpu_usages.get("polars", 0), "Memory Usage": memory_usages.get("polars", 0), "File Reading Time": file_reading_times.get("polars", 0), "Result": "success" }, "duckdb": { "Time Taken": execution_times.get("duckdb", 0), "CPU Utilization": cpu_usages.get("duckdb", 0), "Memory Usage": memory_usages.get("duckdb", 0), "File Reading Time": file_reading_times.get("duckdb", 0), "Result": "success" }, "dask": { "Time Taken": execution_times.get("dask", 0), "CPU Utilization": cpu_usages.get("dask", 0), "Memory Usage": memory_usages.get("dask", 0), "File Reading Time": file_reading_times.get("dask", 0), "Result": "success" } }, "execution_time.png", "cpu_usage.png", "memory_usage.png", "file_reading_time.png" demo = gr.Blocks() with demo: gr.Markdown("# Data Information and Preprocessing Benchmarking") file_upload = gr.File(label="Upload Parquet File") info_button = gr.Button("Get Data Info") info_text = gr.Textbox(label="Data Information") column_input = gr.Textbox(label="Enter Column Name") action_input = gr.Dropdown(label="Select Action", choices=["Handle Missing Values", "Normalize Data", "Encode Categorical Variables", "Handle Outliers", "Data Transformation", "Feature Engineering", "Data Reduction"]) normalization_input = gr.Dropdown(label="Select Normalization Method", choices=["Min-Max Scaler", "Standard Scaler", "Robust Scaler", "Max Abs Scaler"], visible=False) encoding_input = gr.Dropdown(label="Select Encoding Method", choices=["One-Hot Encoding", "Label Encoding"], visible=False) outlier_input = gr.Dropdown(label="Select Outlier Handling Method", choices=["Z-Score Method", "IQR Method"], visible=False) transformation_input = gr.Dropdown(label="Select Transformation Method", choices=["Log Transformation", "Box-Cox Transformation"], visible=False) feature_engineering_input = gr.Dropdown(label="Select Feature Engineering Method", choices=["Polynomial Features", "Interaction Terms"], visible=False) data_reduction_input = gr.Dropdown(label="Select Data Reduction Method", choices=["PCA", "Feature Selection"], visible=False) action_button = gr.Button("Perform Action") results_text = gr.Textbox(label="Results") execution_time_image = gr.Image(label="Execution Time") cpu_usage_image = gr.Image(label="CPU Usage") memory_usage_image = gr.Image(label="Memory Usage") file_reading_time_image = gr.Image(label="File Reading Time") info_button.click( display_data_info, inputs=[file_upload], outputs=[info_text, column_input] ) action_input.change( lambda x: gr.update(visible=x == "Normalize Data"), inputs=[action_input], outputs=[normalization_input] ) action_input.change( lambda x: gr.update(visible=x == "Encode Categorical Variables"), inputs=[action_input], outputs=[encoding_input] ) action_input.change( lambda x: gr.update(visible=x == "Handle Outliers"), inputs=[action_input], outputs=[outlier_input] ) action_input.change( lambda x: gr.update(visible=x == "Data Transformation"), inputs=[action_input], outputs=[transformation_input] ) action_input.change( lambda x: gr.update(visible=x == "Feature Engineering"), inputs=[action_input], outputs=[feature_engineering_input] ) action_input.change( lambda x: gr.update(visible=x == "Data Reduction"), inputs=[action_input], outputs=[data_reduction_input] ) action_button.click( perform_action, inputs=[file_upload, column_input, action_input, normalization_input, encoding_input, outlier_input, transformation_input, feature_engineering_input, data_reduction_input], outputs=[results_text, execution_time_image, cpu_usage_image, memory_usage_image, file_reading_time_image] ) demo.launch(share=True)