Multi-Tagger / modules /multi_comfy.py
Werli's picture
Upload multi_comfy.py
bac3a59 verified
import gradio as gr
import json
from PIL import Image
import os
from collections import defaultdict
# Custom stylesheet handed to gr.Blocks(css=css). The #custom-gallery rules
# target the Gallery component created with elem_id='custom-gallery'.
css = """
#custom-gallery{--row-height:180px;display:grid;grid-auto-rows:min-content;gap:10px}#custom-gallery .thumbnail-item{height:var(--row-height);width:100%;position:relative;overflow:hidden;border-radius:8px;box-shadow:0 2px 5px rgb(0 0 0 / .1);transition:transform 0.2s ease,box-shadow 0.2s ease}#custom-gallery .thumbnail-item:hover{transform:translateY(-3px);box-shadow:0 4px 12px rgb(0 0 0 / .15)}#custom-gallery .thumbnail-item img{width:auto;height:100%;max-width:100%;max-height:var(--row-height);object-fit:contain;margin:0 auto;display:block}#custom-gallery .thumbnail-item img.portrait{max-width:100%}#custom-gallery .thumbnail-item img.landscape{max-height:100%}.gallery-container{max-height:500px;overflow-y:auto;padding-right:0;--size-80:500px}.thumbnails{display:flex;position:absolute;bottom:0;width:120px;overflow-x:scroll;padding-top:320px;padding-bottom:280px;padding-left:4px;flex-wrap:wrap}
"""
# Placeholder result: one "Not Available" entry for each of the 15 values that
# process_single_image returns (and that the UI displays in 15 textboxes).
EMPTY_RESULT = ("Not Available",) * 15
# ---------- EXTRACTION FUNCTIONS ----------
def read_metadata(file_path):
    """Return the PIL ``info`` mapping of the image at *file_path*.

    Any exception raised while opening or reading the file is converted into
    a dict holding a single ``"error"`` message instead of propagating.
    """
    try:
        with Image.open(file_path) as picture:
            info = picture.info
    except Exception as exc:
        return {"error": f"Error reading file: {exc!s}"}
    return info
def extract_workflow_data(file_path):
    """Load the JSON workflow embedded in an image's metadata.

    The ``prompt`` metadata entry is tried first. If it is absent or is not
    valid JSON, every string value that looks like a JSON object (starts with
    ``{`` after stripping) is tried in metadata order, and the first one that
    parses is returned.

    Returns the decoded JSON value, or ``{"error": message}`` when the file
    cannot be read or no entry parses.
    """
    metadata = read_metadata(file_path)
    if "error" in metadata:
        return {"error": metadata["error"]}

    # Preferred source: the dedicated 'prompt' entry.
    if "prompt" in metadata:
        try:
            return json.loads(metadata["prompt"])
        except json.JSONDecodeError:
            pass

    # Fallback: any metadata string that looks like a JSON object.
    json_like = (
        text
        for text in metadata.values()
        if isinstance(text, str) and text.strip().startswith("{")
    )
    for text in json_like:
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            continue

    return {"error": "No workflow data found"}
def extract_ksampler_params(workflow_data):
    """Return the sampler settings of the first KSampler node as six strings.

    The result is ``(seed, steps, cfg, sampler, scheduler, denoise)``. Each
    value missing from the node's inputs, or all of them when no
    ``KSampler`` / ``KSampler (Efficient)`` node exists or *workflow_data* is
    not a dict, is reported as ``"Not found"``.
    """
    placeholder = "Not found"
    input_keys = ("seed", "steps", "cfg", "sampler_name", "scheduler", "denoise")

    if not isinstance(workflow_data, dict):
        return (placeholder,) * len(input_keys)

    # Only the first matching sampler node is consulted.
    sampler_inputs = {}
    for candidate in workflow_data.values():
        if not isinstance(candidate, dict):
            continue
        if candidate.get("class_type", "") in ("KSampler", "KSampler (Efficient)"):
            sampler_inputs = candidate.get("inputs", {})
            break

    return tuple(str(sampler_inputs.get(key, placeholder)) for key in input_keys)
def extract_prompts(workflow_data):
    """Return ``(positive, negative)`` prompt text found in the workflow.

    Three node families are recognised by substring match on ``class_type``:

    * ``Text to Conditioning`` and ``DPRandomGenerator``: the node title
      (``_meta.title``) containing ``POSITIVE`` or ``NEGATIVE`` decides which
      prompt the node's ``text`` input fills.
    * ``ShowText|pysssss``: ``text_1`` is the positive prompt and ``text_2``
      the negative prompt.

    Later nodes overwrite earlier ones. Both values are returned as strings
    and default to ``"Not found"``.
    """
    positive = negative = "Not found"
    if not isinstance(workflow_data, dict):
        return positive, negative

    # Checked in this order for every node, so a later marker wins on a tie.
    markers = ("Text to Conditioning", "ShowText|pysssss", "DPRandomGenerator")

    for node in workflow_data.values():
        if not isinstance(node, dict):
            continue
        kind = node.get("class_type", "")
        params = node.get("inputs", {})
        meta = node.get("_meta")
        label = meta.get("title", "") if meta else ""

        for marker in markers:
            if marker not in kind:
                continue
            if marker == "ShowText|pysssss":
                if "text_1" in params:
                    positive = params["text_1"]
                if "text_2" in params:
                    negative = params["text_2"]
            elif "POSITIVE" in label:
                positive = params.get("text", "Not found")
            elif "NEGATIVE" in label:
                negative = params.get("text", "Not found")

    return str(positive), str(negative)
def extract_loras(workflow_data):
    """List the LoRAs referenced by the workflow, one per line.

    Two sources are collected for every node, in node order:

    * nodes whose ``class_type`` contains ``LoraLoader`` contribute
      ``"<lora_name> (Strength: <strength_model>)"``;
    * any string input containing ``lora:`` (case-insensitive) is included
      verbatim.

    Returns ``"None found"`` when nothing matches or *workflow_data* is not a
    dict.
    """
    if not isinstance(workflow_data, dict):
        return "None found"

    found = []
    for node in workflow_data.values():
        if not isinstance(node, dict):
            continue
        node_inputs = node.get("inputs", {})

        if "LoraLoader" in node.get("class_type", ""):
            lora_name = node_inputs.get("lora_name", "Unknown")
            model_strength = node_inputs.get("strength_model", "Unknown")
            found.append(f"{lora_name} (Strength: {model_strength})")

        # Inline references embedded in text inputs.
        found.extend(
            value
            for value in node_inputs.values()
            if isinstance(value, str) and "lora:" in value.lower()
        )

    if not found:
        return "None found"
    return "\n".join(found)
def extract_model_info(workflow_data):
    """Return the base-model names used by the workflow, one per line.

    Nodes whose ``class_type`` contains ``CheckpointLoader`` contribute their
    ``ckpt_name`` input; nodes containing ``Model Mecha Recipe`` contribute
    ``model_path``. A missing input is listed as ``"Unknown"``. Returns
    ``"Not found"`` when no such node exists or *workflow_data* is not a dict.
    """
    if not isinstance(workflow_data, dict):
        return "Not found"

    # (class_type substring, input key holding the model name)
    loader_fields = (
        ("CheckpointLoader", "ckpt_name"),
        ("Model Mecha Recipe", "model_path"),
    )

    names = []
    for node in workflow_data.values():
        if not isinstance(node, dict):
            continue
        node_inputs = node.get("inputs", {})
        node_class = node.get("class_type", "")
        for marker, field in loader_fields:
            if marker in node_class:
                names.append(node_inputs.get(field, "Unknown"))

    return "\n".join(names) if names else "Not found"
def extract_image_info_from_file(image_path):
    """Return the image's real ``(width, height)`` as strings.

    The dimensions come from the image file itself. If the file cannot be
    opened or read, both values are ``"Not found"``.
    """
    try:
        with Image.open(image_path) as picture:
            dimensions = picture.size
        width, height = dimensions
        return str(width), str(height)
    except Exception:
        return "Not found", "Not found"
def extract_batch_size(workflow_data):
    """Return the ``batch_size`` input of the first ``EmptyLatentImage`` node.

    The value is returned as a string. ``"Not found"`` is returned when
    *workflow_data* is not a dict, no such node exists, or the node has no
    ``batch_size`` input.
    """
    if not isinstance(workflow_data, dict):
        return "Not found"

    latent_node = next(
        (
            node
            for node in workflow_data.values()
            if isinstance(node, dict)
            and node.get("class_type", "") == "EmptyLatentImage"
        ),
        None,
    )
    if latent_node is None:
        return "Not found"
    return str(latent_node.get("inputs", {}).get("batch_size", "Not found"))
def extract_nodes_info(workflow_data):
    """Summarise the workflow's nodes as text.

    The first line is ``Total Nodes: <number of entries in workflow_data>``.
    It is followed by one ``<class_type>: <count>`` line per distinct class
    type, sorted by class type. Dict nodes without a ``class_type`` are
    counted as ``Unknown``. Non-dict entries count towards the total only.
    Returns ``"Not found"`` when *workflow_data* is not a dict.
    """
    if not isinstance(workflow_data, dict):
        return "Not found"

    tally = {}
    for node in workflow_data.values():
        if isinstance(node, dict):
            kind = node.get("class_type", "Unknown")
            tally[kind] = tally.get(kind, 0) + 1

    lines = [f"Total Nodes: {len(workflow_data)}"]
    lines.extend(f"{kind}: {tally[kind]}" for kind in sorted(tally))
    return "\n".join(lines).strip()
def extract_workflow_as_json(workflow_data):
    """Pretty-print the workflow as JSON (2-space indent, non-ASCII kept).

    Anything that is not a dict is rendered as the empty object ``"{}"``.
    """
    if not isinstance(workflow_data, dict):
        return "{}"
    return json.dumps(workflow_data, indent=2, ensure_ascii=False)
# ---------- EXTRACTION FUNCTIONS ----------
#
# ---------- IMAGE PROCESSING ----------
def process_single_image(image_path):
    """Extract all workflow info from a single image path.

    Returns a 15-tuple of strings in this order: seed, steps, cfg, sampler,
    scheduler, denoise, positive prompt, negative prompt, LoRAs, models,
    width, height, batch size, node summary, full workflow JSON.

    * A falsy *image_path* yields ``EMPTY_RESULT``.
    * When the workflow cannot be extracted, every field carries the error
      message so the UI shows the reason in each textbox.
    """
    if not image_path:
        return EMPTY_RESULT

    workflow_data = extract_workflow_data(image_path)

    if not isinstance(workflow_data, dict):
        # The embedded JSON decoded to a list/str/number rather than a node
        # mapping. Calling .get() on it would raise AttributeError, so report
        # it as missing workflow data instead.
        error = "No workflow data found"
        return (error,) * len(EMPTY_RESULT)

    if "error" in workflow_data:
        error = str(workflow_data.get("error", "Unknown error"))
        return (error,) * len(EMPTY_RESULT)

    seed, steps, cfg, sampler, scheduler, denoise = extract_ksampler_params(workflow_data)
    positive, negative = extract_prompts(workflow_data)
    loras = extract_loras(workflow_data)
    models = extract_model_info(workflow_data)
    # Use the actual image dimensions rather than those stored in the workflow.
    width, height = extract_image_info_from_file(image_path)
    batch = extract_batch_size(workflow_data)
    nodes = extract_nodes_info(workflow_data)
    full_json = extract_workflow_as_json(workflow_data)

    return (
        seed, steps, cfg, sampler, scheduler, denoise,
        positive, negative, loras, models, width, height, batch, nodes, full_json,
    )
def append_gallery(gallery: list, image: str):
    """Append *image* to *gallery* in place and clear the image input.

    A ``None`` gallery is treated as empty and a falsy *image* is ignored.
    Returns ``(gallery, None)``; the ``None`` resets the source component.
    """
    current = [] if gallery is None else gallery
    if image:
        current.append(image)
    return current, None
def extend_gallery(gallery, images):
    """Return *gallery* extended with the new, not-yet-present *images*.

    Args:
        gallery: Current gallery value: a list whose items are either paths
            or ``(path, caption)`` pairs. ``None`` is treated as empty.
        images: A single path string, or a list whose items are paths or
            ``(path, ...)`` pairs. Any other value adds nothing.

    Returns:
        A list with the existing gallery entries followed by a
        ``(path, '')`` tuple for every incoming path that is not already
        present. Incoming paths keep their original order and duplicates
        are dropped. When *images* is falsy, the gallery is returned as is.
    """
    if gallery is None:
        gallery = []
    if not images:
        return gallery

    # Normalize input - Gradio might pass various formats.
    incoming_paths = []
    if isinstance(images, str):  # Single image path
        incoming_paths.append(images)
    elif isinstance(images, list):
        for img in images:
            # Elements may be (path, caption) pairs coming from a Gallery.
            if isinstance(img, (tuple, list)):
                incoming_paths.append(str(img[0]))
            else:
                incoming_paths.append(str(img))

    # Compare on the string form so existing entries match the stringified
    # incoming paths (and unhashable first elements cannot break the set).
    seen_paths = {
        str(item[0]) if isinstance(item, (list, tuple)) else str(item)
        for item in gallery
    }

    # De-duplicate while keeping upload order. A plain set() would shuffle
    # the new entries differently on every run.
    formatted_new = []
    for path in incoming_paths:
        if path in seen_paths:
            continue
        seen_paths.add(path)
        formatted_new.append((path, ''))

    return list(gallery) + formatted_new
def process_gallery(gallery, results_state):
    """Process every gallery image and cache its metadata in *results_state*.

    *results_state* maps image path to the 15-tuple produced by
    ``process_single_image`` and is updated in place. Paths already present
    are not re-processed.

    Returns the 15 display values of the first image followed by the state
    dict (16 items). An empty gallery clears the state and yields
    ``EMPTY_RESULT``; any exception is printed and also yields
    ``EMPTY_RESULT``.
    """
    if not gallery:
        # Nothing left in the gallery: drop every cached result.
        results_state.clear()
        return EMPTY_RESULT + (results_state,)

    shown = EMPTY_RESULT
    try:
        for entry in gallery:
            image_path = entry if isinstance(entry, str) else entry[0]
            if image_path not in results_state:
                results_state[image_path] = process_single_image(image_path)
            cached = results_state[image_path]
            # Keep the first result that differs from the placeholder.
            if shown == EMPTY_RESULT:
                shown = cached
        return shown + (results_state,)
    except Exception as e:
        print("[ERROR]", str(e))
        return EMPTY_RESULT + (results_state,)
def get_selection_from_gallery(gallery, results_state, evt: gr.SelectData):
    """Return the cached 15 values for the image selected in the gallery.

    With no selection event (or one without a value), the first gallery image
    is used. Otherwise the path is taken from the event value, which may be a
    dict with an ``image.path`` entry, a sequence whose first item is the
    path, or anything else (stringified). When the path has no cached result,
    or the lookup raises, a list copy of ``EMPTY_RESULT`` is returned.
    """
    if evt is None or evt.value is None:
        # No selection: fall back to the first image in the gallery.
        if gallery:
            first = gallery[0]
            key = str(first[0] if isinstance(first, (list, tuple)) else first)
            if key in results_state:
                return list(results_state[key])
        return list(EMPTY_RESULT)

    try:
        chosen = evt.value
        if isinstance(chosen, dict) and 'image' in chosen:
            key = chosen['image']['path']
        elif isinstance(chosen, (list, tuple)):
            key = chosen[0]
        else:
            key = str(chosen)
        if key in results_state:
            return list(results_state[key])
    except Exception as e:
        print(f"Selection error: {e}")

    # Nothing cached for the selected image.
    return list(EMPTY_RESULT)
# ---------- IMAGE PROCESSING ----------
#
def create_multi_comfy():
    """Build and return the Gradio Blocks app for the workflow extractor.

    Layout: an upload button and image gallery on the left, and tabs of
    read-out textboxes on the right. Uploading or changing the gallery runs
    ``process_gallery``. Selecting a thumbnail runs
    ``get_selection_from_gallery``.
    """
    with gr.Blocks(css=css, fill_width=True) as demo:
        gr.Markdown("# 🛠️ ComfyUI Workflow Information Extractor")
        gr.Markdown("Upload Multiple ComfyUI-generated images. Extract prompts, parameters, models, and full workflows.")

        with gr.Row():
            # Left column: upload control and thumbnail grid.
            with gr.Column(scale=2):
                upload_button = gr.UploadButton(
                    "📁 Upload Multiple Images",
                    file_types=["image"],
                    file_count="multiple",
                    size='lg',
                )
                gallery = gr.Gallery(
                    columns=3,
                    show_share_button=False,
                    interactive=True,
                    height='auto',
                    label='Grid of images',
                    preview=False,
                    elem_id='custom-gallery',
                )

            # Right column: extracted details grouped into tabs.
            with gr.Column(scale=3):
                with gr.Tabs():
                    with gr.Tab("Sampling Parameters"):
                        with gr.Row():
                            with gr.Column():
                                seed_out = gr.Textbox(label="Seed", interactive=False, show_copy_button=True)
                                steps_out = gr.Textbox(label="Steps", interactive=False, show_copy_button=True)
                                cfg_out = gr.Textbox(label="CFG Scale", interactive=False)
                            with gr.Column():
                                sampler_out = gr.Textbox(label="Sampler", interactive=False)
                                scheduler_out = gr.Textbox(label="Scheduler", interactive=False)
                                denoise_out = gr.Textbox(label="Denoise", interactive=False)
                    with gr.Tab("Prompts"):
                        pos_prompt = gr.Textbox(label="Positive Prompt", lines=4, interactive=False, show_copy_button=True)
                        neg_prompt = gr.Textbox(label="Negative Prompt", lines=4, interactive=False, show_copy_button=True)
                    with gr.Tab("Models & LoRAs"):
                        with gr.Row():
                            lora_out = gr.Textbox(label="LoRAs", lines=5, interactive=False, show_copy_button=True)
                            model_out = gr.Textbox(label="Base Models", lines=5, interactive=False, show_copy_button=True)
                    with gr.Tab("Image Info"):
                        with gr.Row():
                            with gr.Column():
                                width_out = gr.Textbox(label="Width", interactive=False)
                                height_out = gr.Textbox(label="Height", interactive=False)
                                batch_out = gr.Textbox(label="Batch Size", interactive=False)
                            with gr.Column():
                                nodes_out = gr.Textbox(label="Node Counts", lines=15, interactive=True, show_copy_button=True)
                    with gr.Tab("Full Workflow"):
                        json_out = gr.Textbox(label="Workflow JSON", lines=20, interactive=True, show_copy_button=True)

        # Per-session cache: image path -> extracted 15-tuple.
        results_state = gr.State({})

        # The 15 read-out components, in the order the handlers return values.
        detail_outputs = [
            seed_out, steps_out, cfg_out, sampler_out, scheduler_out, denoise_out,
            pos_prompt, neg_prompt, lora_out, model_out, width_out, height_out,
            batch_out, nodes_out, json_out,
        ]

        # Upload: add the files to the gallery, then process the gallery.
        upload_event = upload_button.upload(
            fn=extend_gallery,
            inputs=[gallery, upload_button],
            outputs=gallery,
            queue=False,
        )
        upload_event.then(
            fn=process_gallery,
            inputs=[gallery, results_state],
            outputs=detail_outputs + [results_state],
        )

        # Any other gallery change (e.g. removing an image) re-syncs the cache.
        gallery.change(
            fn=process_gallery,
            inputs=[gallery, results_state],
            outputs=detail_outputs + [results_state],
            queue=True,
        )

        # Clicking a thumbnail shows that image's cached details.
        gallery.select(
            get_selection_from_gallery,
            inputs=[gallery, results_state],
            outputs=list(detail_outputs),
        )

        gr.Markdown("---\n💡 **Note:** It's under development.")
    return demo