xyz / app.py
usiddiquee786's picture
Update app.py
7fe93df verified
import gradio as gr
import time
import psutil
import pandas as pd
import polars as pl
import duckdb
import dask.dataframe as dd
import matplotlib.pyplot as plt
from clearml import Task
import os
import numpy as np
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, RobustScaler, MaxAbsScaler, PolynomialFeatures
from scipy.stats import zscore, boxcox
from sklearn.decomposition import PCA
# Set up ClearML configuration with access key and secret key
Task.set_credentials(
api_host="https://api.clear.ml", # Replace with your ClearML server URL
web_host="https://app.clear.ml", # Replace with your ClearML Web URL
files_host="https://files.clear.ml", # Replace with your ClearML Files URL
key=os.environ.get('CLEARML_API_KEY'), # Your ClearML Access Key
secret=os.environ.get('CLEARML_API_SECRET') # Your ClearML Secret Key
)
# Create a ClearML task
task = Task.init(project_name="nyc_taxi_trip", task_name="MyTask")
def display_data_info(file):
# Read the uploaded Parquet file using DuckDB
con = duckdb.connect(':memory:')
df = con.execute("SELECT * FROM read_parquet('" + file.name + "')").fetchdf()
# Get information about the data
info = df.describe()
columns = df.columns.tolist()
dtypes = df.dtypes.tolist()
shape = df.shape
# Create a string to display the information
info_str = "Data Shape: {}\n".format(shape)
info_str += "Columns: {}\n".format(columns)
info_str += "Data Types: {}\n\n".format(dtypes)
info_str += str(info)
return info_str, columns
def perform_action(file, column, action, normalization, encoding, outlier_method, transformation, feature_engineering, data_reduction):
# Read the uploaded Parquet file using different libraries
libraries = {
"pandas": pd.read_parquet,
"polars": pl.read_parquet,
"duckdb": lambda f: duckdb.connect(':memory:').execute("SELECT * FROM read_parquet('" + f.name + "')").fetchdf(),
"dask": lambda f: dd.read_parquet(f.name).compute()
}
execution_times = {}
cpu_usages = {}
memory_usages = {}
file_reading_times = {}
con = None # Reuse the connection for DuckDB
for lib, reader in libraries.items():
try:
start_time = time.time()
process = psutil.Process()
cpu_before = process.cpu_percent(interval=None)
mem_before = process.memory_info().rss / (1024 ** 2) # Memory in MB
file_reading_start_time = time.time()
if lib == "duckdb":
if con is None:
con = duckdb.connect(':memory:')
df = con.execute("SELECT * FROM read_parquet('" + file.name + "')").fetchdf()
else:
df = reader(file)
file_reading_end_time = time.time()
file_reading_times[lib] = round(file_reading_end_time - file_reading_start_time, 4)
if action == "Handle Missing Values":
if lib == "pandas":
df[column] = df[column].fillna(df[column].mean())
elif lib == "polars":
df = df.fill_null(column, df[column].mean())
elif lib == "duckdb":
df = df.fillna({column: df[column].mean()})
elif lib == "dask":
df = df.fillna({column: df[column].mean()})
elif action == "Normalize Data":
if lib == "pandas":
if normalization == "Min-Max Scaler":
df[column] = (df[column] - df[column].min()) / (df[column].max() - df[column].min())
elif normalization == "Standard Scaler":
df[column] = (df[column] - df[column].mean()) / df[column].std()
elif normalization == "Robust Scaler":
scaler = RobustScaler()
df[column] = scaler.fit_transform(df[[column]])
elif normalization == "Max Abs Scaler":
scaler = MaxAbsScaler()
df[column] = scaler.fit_transform(df[[column]])
elif lib == "polars":
if normalization == "Min-Max Scaler":
df = df.with_columns([(df[column] - df[column].min()) / (df[column].max() - df[column].min()).alias(column)])
elif normalization == "Standard Scaler":
df = df.with_columns([(df[column] - df[column].mean()) / df[column].std().alias(column)])
elif lib == "duckdb":
if normalization == "Min-Max Scaler":
df = df.assign(**{column: (df[column] - df[column].min()) / (df[column].max() - df[column].min())})
elif normalization == "Standard Scaler":
df = df.assign(**{column: (df[column] - df[column].mean()) / df[column].std()})
elif lib == "dask":
if normalization == "Min-Max Scaler":
df = df.assign(**{column: (df[column] - df[column].min()) / (df[column].max() - df[column].min())})
elif normalization == "Standard Scaler":
df = df.assign(**{column: (df[column] - df[column].mean()) / df[column].std()})
elif action == "Encode Categorical Variables":
if encoding == "One-Hot Encoding":
if lib == "pandas":
df = pd.get_dummies(df, columns=[column])
elif lib == "polars":
df = df.to_dummies(columns=[column])
elif lib == "duckdb":
df = pd.get_dummies(df, columns=[column])
elif lib == "dask":
df = dd.get_dummies(df, columns=[column])
elif encoding == "Label Encoding":
if lib == "pandas":
df[column] = LabelEncoder().fit_transform(df[column])
elif lib == "polars":
df = df.with_columns([pl.col(column).cast(pl.Categorical).cast(pl.Int32)])
elif lib == "duckdb":
df[column] = LabelEncoder().fit_transform(df[column])
elif lib == "dask":
df[column] = dd.from_pandas(LabelEncoder().fit_transform(df[column].compute()), npartitions=1)
elif action == "Handle Outliers":
if outlier_method == "Z-Score Method":
if lib == "pandas":
df = df[(np.abs(zscore(df[column])) < 3)]
elif lib == "polars":
df = df.filter(np.abs(zscore(df[column])) < 3)
elif lib == "duckdb":
df = df[(np.abs(zscore(df[column])) < 3)]
elif lib == "dask":
df = df[(np.abs(zscore(df[column].compute())) < 3)]
elif outlier_method == "IQR Method":
Q1 = df[column].quantile(0.25)
Q3 = df[column].quantile(0.75)
IQR = Q3 - Q1
if lib == "pandas":
df = df[~((df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR)))]
elif lib == "polars":
df = df.filter(~((df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR))))
elif lib == "duckdb":
df = df[~((df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR)))]
elif lib == "dask":
df = df[~((df[column].compute() < (Q1 - 1.5 * IQR)) | (df[column].compute() > (Q3 + 1.5 * IQR)))]
elif action == "Data Transformation":
if transformation == "Log Transformation":
if lib == "pandas":
df[column] = np.log1p(df[column])
elif lib == "polars":
df = df.with_columns([pl.col(column).log1p().alias(column)])
elif lib == "duckdb":
df[column] = np.log1p(df[column])
elif lib == "dask":
df[column] = dd.from_pandas(np.log1p(df[column].compute()), npartitions=1)
elif transformation == "Box-Cox Transformation":
if lib == "pandas":
df[column], _ = boxcox(df[column])
elif lib == "polars":
df = df.with_columns([pl.col(column).apply(lambda x: boxcox(x)[0]).alias(column)])
elif lib == "duckdb":
df[column], _ = boxcox(df[column])
elif lib == "dask":
df[column] = dd.from_pandas(boxcox(df[column].compute())[0], npartitions=1)
elif action == "Feature Engineering":
if feature_engineering == "Polynomial Features":
if lib == "pandas":
poly = PolynomialFeatures(degree=2)
df = pd.DataFrame(poly.fit_transform(df[[column]]), columns=poly.get_feature_names_out([column]))
elif lib == "polars":
df = df.with_columns([pl.col(column).pow(2).alias(f"{column}_squared")])
elif lib == "duckdb":
df[column + "_squared"] = df[column] ** 2
elif lib == "dask":
df[column + "_squared"] = df[column] ** 2
elif feature_engineering == "Interaction Terms":
if lib == "pandas":
df[column + "_interaction"] = df[column] * df[column]
elif lib == "polars":
df = df.with_columns([pl.col(column).mul(pl.col(column)).alias(f"{column}_interaction")])
elif lib == "duckdb":
df[column + "_interaction"] = df[column] * df[column]
elif lib == "dask":
df[column + "_interaction"] = df[column] * df[column]
elif action == "Data Reduction":
if data_reduction == "PCA":
if lib == "pandas":
pca = PCA(n_components=1)
df[column + "_pca"] = pca.fit_transform(df[[column]])
elif lib == "polars":
pca = PCA(n_components=1)
df = df.with_columns([pl.Series(pca.fit_transform(df[[column]]).alias(f"{column}_pca"))])
elif lib == "duckdb":
pca = PCA(n_components=1)
df[column + "_pca"] = pca.fit_transform(df[[column]])
elif lib == "dask":
pca = PCA(n_components=1)
df[column + "_pca"] = dd.from_pandas(pca.fit_transform(df[[column]].compute()), npartitions=1)
elif data_reduction == "Feature Selection":
if lib == "pandas":
corr = df.corr()
df = df[corr[column].abs().sort_values(ascending=False).index[:5]]
elif lib == "polars":
corr = df.corr()
df = df.select(corr[column].abs().sort(reverse=True).columns[:5])
elif lib == "duckdb":
corr = df.corr()
df = df[corr[column].abs().sort_values(ascending=False).index[:5]]
elif lib == "dask":
corr = df.corr().compute()
df = df[corr[column].abs().sort_values(ascending=False).index[:5]]
cpu_after = process.cpu_percent(interval=None)
mem_after = process.memory_info().rss / (1024 ** 2) # Memory in MB
execution_times[lib] = round(time.time() - start_time, 4)
cpu_usages[lib] = round(cpu_after - cpu_before, 2)
memory_usages[lib] = round(mem_after - mem_before, 2)
# Log metrics to ClearML
task.logger.report_scalar(title=f"Execution Time ({lib})", series=f"Execution Time ({lib})", value=execution_times[lib], iteration=0)
task.logger.report_scalar(title=f"CPU Usage ({lib})", series=f"CPU Usage ({lib})", value=cpu_usages[lib], iteration=0)
task.logger.report_scalar(title=f"Memory Usage ({lib})", series=f"Memory Usage ({lib})", value=memory_usages[lib], iteration=0)
except Exception as e:
execution_times[lib] = 0
cpu_usages[lib] = 0
memory_usages[lib] = 0
file_reading_times[lib] = 0
print(f"Error processing {lib}: {e}")
# Generate and save charts
libs = list(libraries.keys())
plt.figure(figsize=(6, 4))
plt.bar(libs, [execution_times.get(lib, 0) for lib in libs], color='skyblue')
plt.title("Execution Time (s)")
plt.xlabel("Library")
plt.ylabel("Time (s)")
plt.savefig("execution_time.png")
plt.close()
plt.figure(figsize=(6, 4))
plt.bar(libs, [cpu_usages.get(lib, 0) for lib in libs], color='lightgreen')
plt.title("CPU Usage (%)")
plt.xlabel("Library")
plt.ylabel("CPU (%)")
plt.savefig("cpu_usage.png")
plt.close()
plt.figure(figsize=(6, 4))
plt.bar(libs, [memory_usages.get(lib, 0) for lib in libs], color='lightcoral')
plt.title("Memory Usage (MB)")
plt.xlabel("Library")
plt.ylabel("Memory (MB)")
plt.savefig("memory_usage.png")
plt.close()
plt.figure(figsize=(6, 4))
plt.bar(libs, [file_reading_times.get(lib, 0) for lib in libs], color='yellow')
plt.title("File Reading Time (s)")
plt.xlabel("Library")
plt.ylabel("Time (s)")
plt.savefig("file_reading_time.png")
plt.close()
# Upload plots to ClearML
task.upload_artifact(name="execution_time.png", artifact_object="execution_time.png")
task.upload_artifact(name="cpu_usage.png", artifact_object="cpu_usage.png")
task.upload_artifact(name="memory_usage.png", artifact_object="memory_usage.png")
task.upload_artifact(name="file_reading_time.png", artifact_object="file_reading_time.png")
return {
"pandas": {
"Time Taken": execution_times.get("pandas", 0),
"CPU Utilization": cpu_usages.get("pandas", 0),
"Memory Usage": memory_usages.get("pandas", 0),
"File Reading Time": file_reading_times.get("pandas", 0),
"Result": "success"
},
"polars": {
"Time Taken": execution_times.get("polars", 0),
"CPU Utilization": cpu_usages.get("polars", 0),
"Memory Usage": memory_usages.get("polars", 0),
"File Reading Time": file_reading_times.get("polars", 0),
"Result": "success"
},
"duckdb": {
"Time Taken": execution_times.get("duckdb", 0),
"CPU Utilization": cpu_usages.get("duckdb", 0),
"Memory Usage": memory_usages.get("duckdb", 0),
"File Reading Time": file_reading_times.get("duckdb", 0),
"Result": "success"
},
"dask": {
"Time Taken": execution_times.get("dask", 0),
"CPU Utilization": cpu_usages.get("dask", 0),
"Memory Usage": memory_usages.get("dask", 0),
"File Reading Time": file_reading_times.get("dask", 0),
"Result": "success"
}
}, "execution_time.png", "cpu_usage.png", "memory_usage.png", "file_reading_time.png"
demo = gr.Blocks()
with demo:
gr.Markdown("# Data Information and Preprocessing Benchmarking")
file_upload = gr.File(label="Upload Parquet File")
info_button = gr.Button("Get Data Info")
info_text = gr.Textbox(label="Data Information")
column_input = gr.Textbox(label="Enter Column Name")
action_input = gr.Dropdown(label="Select Action", choices=["Handle Missing Values", "Normalize Data", "Encode Categorical Variables", "Handle Outliers", "Data Transformation", "Feature Engineering", "Data Reduction"])
normalization_input = gr.Dropdown(label="Select Normalization Method", choices=["Min-Max Scaler", "Standard Scaler", "Robust Scaler", "Max Abs Scaler"], visible=False)
encoding_input = gr.Dropdown(label="Select Encoding Method", choices=["One-Hot Encoding", "Label Encoding"], visible=False)
outlier_input = gr.Dropdown(label="Select Outlier Handling Method", choices=["Z-Score Method", "IQR Method"], visible=False)
transformation_input = gr.Dropdown(label="Select Transformation Method", choices=["Log Transformation", "Box-Cox Transformation"], visible=False)
feature_engineering_input = gr.Dropdown(label="Select Feature Engineering Method", choices=["Polynomial Features", "Interaction Terms"], visible=False)
data_reduction_input = gr.Dropdown(label="Select Data Reduction Method", choices=["PCA", "Feature Selection"], visible=False)
action_button = gr.Button("Perform Action")
results_text = gr.Textbox(label="Results")
execution_time_image = gr.Image(label="Execution Time")
cpu_usage_image = gr.Image(label="CPU Usage")
memory_usage_image = gr.Image(label="Memory Usage")
file_reading_time_image = gr.Image(label="File Reading Time")
info_button.click(
display_data_info,
inputs=[file_upload],
outputs=[info_text, column_input]
)
action_input.change(
lambda x: gr.update(visible=x == "Normalize Data"), inputs=[action_input], outputs=[normalization_input]
)
action_input.change(
lambda x: gr.update(visible=x == "Encode Categorical Variables"), inputs=[action_input], outputs=[encoding_input]
)
action_input.change(
lambda x: gr.update(visible=x == "Handle Outliers"), inputs=[action_input], outputs=[outlier_input]
)
action_input.change(
lambda x: gr.update(visible=x == "Data Transformation"), inputs=[action_input], outputs=[transformation_input]
)
action_input.change(
lambda x: gr.update(visible=x == "Feature Engineering"), inputs=[action_input], outputs=[feature_engineering_input]
)
action_input.change(
lambda x: gr.update(visible=x == "Data Reduction"), inputs=[action_input], outputs=[data_reduction_input]
)
action_button.click(
perform_action,
inputs=[file_upload, column_input, action_input, normalization_input, encoding_input, outlier_input, transformation_input, feature_engineering_input, data_reduction_input],
outputs=[results_text, execution_time_image, cpu_usage_image, memory_usage_image, file_reading_time_image]
)
demo.launch(share=True)