"""
Gradio app for the Log Anomaly Detector — deployable to Hugging Face Spaces.

Trains an Isolation Forest + TF-IDF model on baseline logs, then scores
new logs for anomalies. CPU-only, no GPU required.
"""

import re
from dataclasses import dataclass, field
from typing import List

import gradio as gr
from sklearn.ensemble import IsolationForest
from sklearn.feature_extraction.text import TfidfVectorizer


# ═══════════════════════════════════════════════════════════════════
# Core detector
# ═══════════════════════════════════════════════════════════════════

# Loose "<date> <time> <LEVEL> <message>" splitter. Every part is optional.
# NOTE(review): the group names are reconstructed (the named-group syntax had
# lost its names, which made the pattern fail to compile); nothing in this
# file reads them, so confirm against any external caller.
# WARNING is listed before WARN so the longer level is not cut to "WARN"
# with "ING ..." spilling into the message.
LOG_PATTERN = re.compile(
    r"^(?P<timestamp>\S+\s\S+)?\s*"
    r"(?P<level>DEBUG|INFO|WARNING|WARN|ERROR|FATAL|CRITICAL)?\s*"
    r"(?P<message>.*)$"
)

# (pattern, placeholder) pairs applied in order by
# LogAnomalyDetector._preprocess. Order matters: the bare-number rule runs
# last so it cannot break up timestamps, UUIDs or IP addresses first.
_NORMALIZERS = (
    (
        re.compile(
            r"\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:?\d{2})?"
        ),
        "<TIMESTAMP>",
    ),
    (
        # Case-insensitive: substitution happens before lower-casing, so
        # upper-case hex UUIDs must be matched here too.
        re.compile(
            r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}",
            re.IGNORECASE,
        ),
        "<UUID>",
    ),
    (re.compile(r"\b\d+\.\d+\.\d+\.\d+\b"), "<IP>"),
    (re.compile(r"\b[\w.-]+@[\w.-]+\b"), "<EMAIL>"),
    (re.compile(r"\b\d+\b"), "<NUM>"),
)


@dataclass
class LogAnomaly:
    """One log line flagged as anomalous by the detector."""

    line: str  # the original log line, stripped of surrounding whitespace
    score: float  # IsolationForest decision_function value (lower = more abnormal)
    rank: int = 0  # 1-based position after sorting by score; 0 until assigned


class LogAnomalyDetector:
    """TF-IDF + Isolation Forest detector for unusual log lines.

    Call ``fit`` with known-normal lines, then ``detect`` with the lines to
    scan.
    """

    def __init__(self, contamination=0.1, max_features=1000, n_estimators=100):
        """Build the (unfitted) vectorizer and forest.

        Args:
            contamination: expected fraction of anomalies, passed to
                ``IsolationForest``.
            max_features: vocabulary size cap for the TF-IDF vectorizer.
            n_estimators: number of trees in the forest.
        """
        self.vectorizer = TfidfVectorizer(
            max_features=max_features,
            ngram_range=(1, 2),
            # '<' and '>' are allowed so placeholder text survives tokenizing.
            token_pattern=r"\b[a-zA-Z_<>][a-zA-Z0-9_<>]*\b",
            min_df=1,
        )
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=n_estimators,
            random_state=42,  # fixed seed so repeated runs give the same result
            n_jobs=-1,
        )
        self._fitted = False

    @staticmethod
    def _preprocess(line: str) -> str:
        """Replace volatile fields with placeholders and lower-case the line.

        Timestamps, UUIDs, IPv4-looking addresses, e-mail addresses and
        standalone integers are replaced so that lines differing only in
        those values normalize to the same text.
        """
        for pattern, placeholder in _NORMALIZERS:
            line = pattern.sub(placeholder, line)
        return line.lower().strip()

    def fit(self, lines: List[str]):
        """Learn the vocabulary and the forest from baseline lines.

        Blank / whitespace-only lines are ignored.

        Returns:
            ``self``, to allow chaining.

        Raises:
            ValueError: if no non-empty line is supplied.
        """
        processed = [self._preprocess(raw) for raw in lines if raw.strip()]
        if not processed:
            raise ValueError("No non-empty lines to fit on")
        features = self.vectorizer.fit_transform(processed)
        self.model.fit(features)
        self._fitted = True
        return self

    def detect(self, lines: List[str]) -> List[LogAnomaly]:
        """Return the lines the model predicts as outliers, worst first.

        Blank lines are skipped. The result is sorted by ascending score and
        ranked from 1; an empty list is returned when there is nothing to
        score.

        Raises:
            RuntimeError: if ``fit`` has not been called.
        """
        if not self._fitted:
            raise RuntimeError("Call fit() first")

        non_empty = [raw for raw in lines if raw.strip()]
        if not non_empty:
            return []

        processed = [self._preprocess(raw) for raw in non_empty]
        features = self.vectorizer.transform(processed)
        scores = self.model.decision_function(features)
        preds = self.model.predict(features)

        # predict() labels outliers as -1.
        anomalies = [
            LogAnomaly(line=raw.strip(), score=float(score))
            for raw, score, pred in zip(non_empty, scores, preds)
            if pred == -1
        ]
        anomalies.sort(key=lambda anomaly: anomaly.score)
        for position, anomaly in enumerate(anomalies, 1):
            anomaly.rank = position
        return anomalies


# ═══════════════════════════════════════════════════════════════════
# Gradio UI
# ═══════════════════════════════════════════════════════════════════

DEMO_BASELINE = """2026-04-10 10:00:01 INFO app: Request GET /api/users completed in 120ms
2026-04-10 10:00:02 INFO app: Request GET /api/users completed in 135ms
2026-04-10 10:00:03 INFO app: Request GET /api/orders completed in 98ms
2026-04-10 10:00:04 INFO app: Request POST /api/orders completed in 145ms
2026-04-10 10:00:05 INFO app: User 42 logged in successfully
2026-04-10 10:00:06 INFO app: Request GET /api/products completed in 110ms
2026-04-10 10:00:07 INFO app: User 43 logged in successfully
2026-04-10 10:00:08 INFO cache: Cache hit for user:42
2026-04-10 10:00:09 INFO db: Query completed in 25ms
2026-04-10 10:00:10 INFO app: Request GET /api/users completed in 125ms"""

DEMO_TEST = """2026-04-10 10:01:00 INFO app: Request GET /api/users completed in 130ms
2026-04-10 10:01:01 INFO app: User 44 logged in successfully
2026-04-10 10:01:02 ERROR app: OutOfMemoryError: Java heap space exhausted
2026-04-10 10:01:03 FATAL db: Database connection pool exhausted
2026-04-10 10:01:04 INFO app: Request GET /api/products completed in 115ms
2026-04-10 10:01:05 CRITICAL security: Unauthorized access attempt from 203.0.113.42
2026-04-10 10:01:06 INFO cache: Cache hit for user:45
2026-04-10 10:01:07 ERROR billing: Payment gateway timeout after 30s
2026-04-10 10:01:08 WARN app: Slow query detected: SELECT * FROM users WHERE id=*"""

# Kept flush-left at module level so the Markdown structure does not depend
# on how the string is indented inside the UI block.
INTRO_MARKDOWN = """
# Log Anomaly Detector

Detects unusual log lines using **Isolation Forest** + **TF-IDF** on a baseline of known-normal logs.
No GPU required — runs in seconds on CPU.

**How it works:**
1. Paste a set of baseline logs (normal operation) in the left textbox
2. Paste a set of test logs you want to scan in the right textbox
3. Click "Detect Anomalies"

The model learns what normal looks like, then flags lines that don't fit.
"""

FOOTER_MARKDOWN = """
---

**Model:** scikit-learn IsolationForest with TF-IDF features (1-2 grams).

**Preprocessing:** timestamps, UUIDs, IPs, emails, and numbers are replaced with placeholders
to help the model learn structural patterns.

Built as part of a 4,000+ AI tools collection — see the source repo for more tools and architecture details.
"""


def _split_non_empty(text) -> List[str]:
    """Split textbox content into lines, dropping blank ones (None -> [])."""
    return [row for row in (text or "").strip().splitlines() if row.strip()]


def analyze(baseline_text: str, test_text: str, contamination: float, repeat_baseline: int):
    """Fit on baseline, detect in test logs.

    Args:
        baseline_text: newline-separated known-normal log lines.
        test_text: newline-separated log lines to scan.
        contamination: expected anomaly fraction, forwarded to the detector.
        repeat_baseline: how many times the baseline is replicated before
            training; values below 1 are treated as 1.

    Returns:
        A ``(summary_markdown, table_rows, ranked_markdown)`` tuple. On any
        failure the first element is an ``"Error: ..."`` message and the
        other two are ``None``.
    """
    # Catch-all is deliberate: this is the UI boundary, and the message is
    # shown to the user instead of surfacing a traceback.
    try:
        baseline = _split_non_empty(baseline_text)
        test = _split_non_empty(test_text)
        if not baseline:
            return "Error: baseline is empty", None, None
        if not test:
            return "Error: test logs are empty", None, None

        # Repeat baseline to make model learn patterns. The clamped value is
        # also what the summary reports, so the arithmetic shown is accurate.
        repeats = max(1, int(repeat_baseline))
        baseline_repeated = baseline * repeats

        detector = LogAnomalyDetector(contamination=contamination)
        detector.fit(baseline_repeated)
        anomalies = detector.detect(test)

        # Two trailing spaces before "\n" make a Markdown hard line break.
        summary = "".join(
            (
                f"**Baseline:** {len(baseline)} unique lines × {repeats} = "
                f"{len(baseline_repeated)} training samples  \n",
                f"**Test:** {len(test)} lines scored  \n",
                f"**Anomalies found:** {len(anomalies)} "
                f"({len(anomalies) / len(test) * 100:.1f}%)  \n",
                f"**Contamination:** {contamination}",
            )
        )

        if anomalies:
            table_data = [
                [
                    a.rank,
                    f"{a.score:.4f}",
                    a.line[:120] + ("…" if len(a.line) > 120 else ""),
                ]
                for a in anomalies
            ]
        else:
            table_data = [[0, "N/A", "No anomalies detected"]]

        # Blank line between entries so each renders as its own paragraph;
        # only the 20 most anomalous lines are listed in detail.
        ranked = "\n\n".join(
            f"**#{a.rank}** `score={a.score:.4f}`  \n`{a.line}`"
            for a in anomalies[:20]
        ) or "_No anomalies detected._"

        return summary, table_data, ranked
    except Exception as exc:
        return f"Error: {exc}", None, None


with gr.Blocks(title="Log Anomaly Detector") as demo:
    gr.Markdown(INTRO_MARKDOWN)

    with gr.Row():
        with gr.Column():
            baseline_input = gr.Textbox(
                label="Baseline Logs (normal operation)",
                lines=12,
                value=DEMO_BASELINE,
            )
        with gr.Column():
            test_input = gr.Textbox(
                label="Test Logs (to scan)",
                lines=12,
                value=DEMO_TEST,
            )

    with gr.Row():
        contamination = gr.Slider(
            label="Contamination (expected anomaly fraction)",
            minimum=0.01,
            maximum=0.5,
            step=0.01,
            value=0.15,
        )
        repeat_baseline = gr.Slider(
            label="Baseline Replication (increases stability)",
            minimum=1,
            maximum=100,
            step=1,
            value=20,
        )

    detect_btn = gr.Button("Detect Anomalies", variant="primary")

    summary_out = gr.Markdown(label="Summary")
    table_out = gr.Dataframe(
        headers=["Rank", "Score", "Log Line"],
        label="Anomalies (sorted by severity)",
    )
    ranked_out = gr.Markdown(label="Top Anomalies (detailed)")

    detect_btn.click(
        analyze,
        inputs=[baseline_input, test_input, contamination, repeat_baseline],
        outputs=[summary_out, table_out, ranked_out],
    )

    gr.Markdown(FOOTER_MARKDOWN)


if __name__ == "__main__":
    demo.launch()