Spaces:
Sleeping
Sleeping
| """ | |
| Gradio app for the Log Anomaly Detector — deployable to Hugging Face Spaces. | |
| Trains an Isolation Forest + TF-IDF model on baseline logs, then scores | |
| new logs for anomalies. CPU-only, no GPU required. | |
| """ | |
| import re | |
| from dataclasses import dataclass, field | |
| from typing import List | |
| import gradio as gr | |
| from sklearn.ensemble import IsolationForest | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| # ═══════════════════════════════════════════════════════════════════ | |
| # Core detector | |
| # ═══════════════════════════════════════════════════════════════════ | |
# Splits a log line into an optional timestamp, an optional severity level,
# and the remaining message.  Both the ``ts`` and ``level`` groups are
# optional; ``msg`` captures whatever is left on the line.
LOG_PATTERN = re.compile(
    # Timestamp: two whitespace-separated tokens (e.g. a date and a time).
    r"^(?P<ts>\S+\s\S+)?\s*"
    # "WARNING" must be listed before "WARN": alternation takes the first
    # alternative that matches, so the reverse order captures "WARN" and
    # leaves "ING ..." at the start of the message.
    r"(?P<level>DEBUG|INFO|WARNING|WARN|ERROR|FATAL|CRITICAL)?\s*"
    r"(?P<msg>.*)$"
)
@dataclass
class LogAnomaly:
    """A single log line that the detector flagged as anomalous.

    The ``@dataclass`` decorator is required: the detector constructs
    instances with ``LogAnomaly(line=..., score=...)``, which needs the
    generated ``__init__``.

    Attributes:
        line: The flagged log line.
        score: The Isolation Forest decision-function value for the line.
            ``LogAnomalyDetector.detect`` sorts its results ascending by
            this value.
        rank: 1-based position in the sorted result list; stays 0 until
            ``LogAnomalyDetector.detect`` assigns it.
    """

    line: str
    score: float
    rank: int = 0
class LogAnomalyDetector:
    """Flag unusual log lines using TF-IDF features and an Isolation Forest.

    Call :meth:`fit` with baseline lines first, then :meth:`detect` with the
    lines to scan.
    """

    def __init__(self, contamination=0.1, max_features=1000, n_estimators=100):
        """Build the (unfitted) vectorizer and model.

        Args:
            contamination: Passed to ``IsolationForest(contamination=...)``.
            max_features: Passed to ``TfidfVectorizer(max_features=...)``.
            n_estimators: Passed to ``IsolationForest(n_estimators=...)``.
        """
        self.vectorizer = TfidfVectorizer(
            max_features=max_features,
            ngram_range=(1, 2),
            # Tokens must start with a letter, underscore, or angle bracket,
            # so purely numeric tokens are never turned into features.
            token_pattern=r"\b[a-zA-Z_<>][a-zA-Z0-9_<>]*\b",
            min_df=1,
        )
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=n_estimators,
            random_state=42,  # fixed seed so repeated runs give the same result
            n_jobs=-1,
        )
        self._fitted = False

    @staticmethod
    def _preprocess(line: str) -> str:
        """Replace variable values with placeholders, then lowercase and strip.

        This must be a static method: it takes no ``self`` but is invoked as
        ``self._preprocess(...)`` from :meth:`fit` and :meth:`detect`.

        The substitutions run in a fixed order.  The generic number rule is
        last so that it cannot break up timestamps, UUIDs, or IP addresses
        before their own rules have seen them.
        """
        line = re.sub(r"\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:?\d{2})?", "<TS>", line)
        line = re.sub(r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}", "<UUID>", line)
        line = re.sub(r"\b\d+\.\d+\.\d+\.\d+\b", "<IP>", line)
        line = re.sub(r"\b[\w.-]+@[\w.-]+\b", "<EMAIL>", line)
        line = re.sub(r"\b\d+\b", "<NUM>", line)
        return line.lower().strip()

    def fit(self, lines: List[str]):
        """Fit the vectorizer and the model on baseline log lines.

        Blank lines are skipped.

        Args:
            lines: Baseline log lines.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If no non-blank line is supplied.
        """
        processed = [self._preprocess(raw) for raw in lines if raw.strip()]
        if not processed:
            raise ValueError("No non-empty lines to fit on")
        X = self.vectorizer.fit_transform(processed)
        self.model.fit(X)
        self._fitted = True
        return self

    def detect(self, lines: List[str]) -> List["LogAnomaly"]:
        """Score log lines and return those the model predicts as outliers.

        Args:
            lines: Log lines to scan; blank lines are skipped.

        Returns:
            One ``LogAnomaly`` per line predicted as an outlier, sorted
            ascending by score, with ``rank`` numbered from 1.  Empty when
            there are no non-blank lines.

        Raises:
            RuntimeError: If :meth:`fit` has not been called.
        """
        if not self._fitted:
            raise RuntimeError("Call fit() first")
        non_empty = [raw for raw in lines if raw.strip()]
        if not non_empty:
            return []
        processed = [self._preprocess(raw) for raw in non_empty]
        X = self.vectorizer.transform(processed)
        scores = self.model.decision_function(X)
        preds = self.model.predict(X)
        # IsolationForest.predict labels outliers with -1.
        anomalies = [
            LogAnomaly(line=raw.strip(), score=float(score))
            for raw, score, pred in zip(non_empty, scores, preds)
            if pred == -1
        ]
        anomalies.sort(key=lambda a: a.score)
        for position, anomaly in enumerate(anomalies, 1):
            anomaly.rank = position
        return anomalies
| # ═══════════════════════════════════════════════════════════════════ | |
| # Gradio UI | |
| # ═══════════════════════════════════════════════════════════════════ | |
# Sample log lines used as the default value of the baseline textbox.
DEMO_BASELINE = "\n".join(
    (
        "2026-04-10 10:00:01 INFO app: Request GET /api/users completed in 120ms",
        "2026-04-10 10:00:02 INFO app: Request GET /api/users completed in 135ms",
        "2026-04-10 10:00:03 INFO app: Request GET /api/orders completed in 98ms",
        "2026-04-10 10:00:04 INFO app: Request POST /api/orders completed in 145ms",
        "2026-04-10 10:00:05 INFO app: User 42 logged in successfully",
        "2026-04-10 10:00:06 INFO app: Request GET /api/products completed in 110ms",
        "2026-04-10 10:00:07 INFO app: User 43 logged in successfully",
        "2026-04-10 10:00:08 INFO cache: Cache hit for user:42",
        "2026-04-10 10:00:09 INFO db: Query completed in 25ms",
        "2026-04-10 10:00:10 INFO app: Request GET /api/users completed in 125ms",
    )
)
# Sample log lines used as the default value of the test textbox.
DEMO_TEST = "\n".join(
    (
        "2026-04-10 10:01:00 INFO app: Request GET /api/users completed in 130ms",
        "2026-04-10 10:01:01 INFO app: User 44 logged in successfully",
        "2026-04-10 10:01:02 ERROR app: OutOfMemoryError: Java heap space exhausted",
        "2026-04-10 10:01:03 FATAL db: Database connection pool exhausted",
        "2026-04-10 10:01:04 INFO app: Request GET /api/products completed in 115ms",
        "2026-04-10 10:01:05 CRITICAL security: Unauthorized access attempt from 203.0.113.42",
        "2026-04-10 10:01:06 INFO cache: Cache hit for user:45",
        "2026-04-10 10:01:07 ERROR billing: Payment gateway timeout after 30s",
        "2026-04-10 10:01:08 WARN app: Slow query detected: SELECT * FROM users WHERE id=*",
    )
)
def analyze(baseline_text: str, test_text: str, contamination: float, repeat_baseline: int) -> tuple:
    """Fit a detector on the baseline logs and report anomalies in the test logs.

    Args:
        baseline_text: Newline-separated baseline log lines; blank lines are
            ignored and ``None`` is treated as empty.
        test_text: Newline-separated log lines to scan; blank lines are
            ignored and ``None`` is treated as empty.
        contamination: Forwarded to ``LogAnomalyDetector(contamination=...)``.
        repeat_baseline: How many times the baseline is duplicated before
            fitting; converted to ``int`` and clamped to at least 1.

    Returns:
        A ``(summary, table_data, ranked)`` tuple: a Markdown summary, a list
        of ``[rank, score, line]`` rows, and a Markdown listing of the first
        20 anomalies.  On empty input or any exception the tuple is
        ``("Error: ...", None, None)``; this function does not raise.
    """
    try:
        # ``or ""`` makes a missing value behave like an empty textbox instead
        # of surfacing an AttributeError message to the user.
        baseline = [ln for ln in (baseline_text or "").strip().splitlines() if ln.strip()]
        test = [ln for ln in (test_text or "").strip().splitlines() if ln.strip()]
        if not baseline:
            return "Error: baseline is empty", None, None
        if not test:
            return "Error: test logs are empty", None, None

        # Duplicate the baseline to give the model more training samples.
        # The clamped integer is computed once so the summary reports the
        # multiplier that was actually applied.
        repeats = max(1, int(repeat_baseline))
        baseline_repeated = baseline * repeats

        detector = LogAnomalyDetector(contamination=contamination)
        detector.fit(baseline_repeated)
        anomalies = detector.detect(test)

        # Two trailing spaces before "\n" are a Markdown hard line break;
        # with fewer, the fields run together in a single paragraph.
        summary = (
            f"**Baseline:** {len(baseline)} unique lines × {repeats} = {len(baseline_repeated)} training samples  \n"
            f"**Test:** {len(test)} lines scored  \n"
            f"**Anomalies found:** {len(anomalies)} ({len(anomalies) / len(test) * 100:.1f}%)  \n"
            f"**Contamination:** {contamination}"
        )

        if anomalies:
            # Long lines are cut to 120 characters to keep the table readable.
            table_data = [
                [a.rank, f"{a.score:.4f}", a.line[:120] + ("…" if len(a.line) > 120 else "")]
                for a in anomalies
            ]
        else:
            table_data = [[0, "N/A", "No anomalies detected"]]

        # A blank line between entries makes each anomaly its own paragraph.
        ranked = "\n\n".join(
            f"**#{a.rank}** `score={a.score:.4f}`  \n`{a.line}`"
            for a in anomalies[:20]
        ) or "_No anomalies detected._"
        return summary, table_data, ranked
    except Exception as exc:
        # UI boundary: report the failure in the summary field rather than
        # letting the exception propagate to the caller.
        return f"Error: {exc}", None, None
# Build the Gradio interface: two log textboxes, two tuning sliders, a
# trigger button, and three outputs fed by ``analyze``.
# NOTE(review): the nesting of the ``with`` blocks below is reconstructed;
# confirm which components belong inside each Row/Column.
with gr.Blocks(title="Log Anomaly Detector") as demo:
    # Introductory text shown at the top of the page.
    # NOTE(review): the sentence after the numbered list has no blank line
    # before it, so Markdown may render it as part of item 3 — confirm.
    gr.Markdown(
        """
# Log Anomaly Detector
Detects unusual log lines using **Isolation Forest** + **TF-IDF** on a
baseline of known-normal logs. No GPU required — runs in seconds on CPU.
**How it works:**
1. Paste a set of baseline logs (normal operation) in the left textbox
2. Paste a set of test logs you want to scan in the right textbox
3. Click "Detect Anomalies"
The model learns what normal looks like, then flags lines that don't fit.
"""
    )
    # Input row: baseline logs and logs to scan, each pre-filled with the
    # demo data.
    with gr.Row():
        with gr.Column():
            baseline_input = gr.Textbox(
                label="Baseline Logs (normal operation)",
                lines=12,
                value=DEMO_BASELINE,
            )
        with gr.Column():
            test_input = gr.Textbox(
                label="Test Logs (to scan)",
                lines=12,
                value=DEMO_TEST,
            )
    # Tuning row: both slider values are passed straight to ``analyze``.
    with gr.Row():
        contamination = gr.Slider(
            label="Contamination (expected anomaly fraction)",
            minimum=0.01,
            maximum=0.5,
            step=0.01,
            value=0.15,
        )
        repeat_baseline = gr.Slider(
            label="Baseline Replication (increases stability)",
            minimum=1,
            maximum=100,
            step=1,
            value=20,
        )
    detect_btn = gr.Button("Detect Anomalies", variant="primary")
    # Outputs, in the same order as the tuple returned by ``analyze``.
    summary_out = gr.Markdown(label="Summary")
    table_out = gr.Dataframe(
        headers=["Rank", "Score", "Log Line"],
        label="Anomalies (sorted by severity)",
    )
    ranked_out = gr.Markdown(label="Top Anomalies (detailed)")
    # Wire the button: inputs map positionally to ``analyze``'s parameters,
    # outputs to its three return values.
    detect_btn.click(
        analyze,
        inputs=[baseline_input, test_input, contamination, repeat_baseline],
        outputs=[summary_out, table_out, ranked_out],
    )
    # Footer describing the model and the preprocessing.
    gr.Markdown(
        """
---
**Model:** scikit-learn IsolationForest with TF-IDF features (1-2 grams).
**Preprocessing:** timestamps, UUIDs, IPs, emails, and numbers are replaced
with placeholders to help the model learn structural patterns.
Built as part of a 4,000+ AI tools collection — see the source repo for
more tools and architecture details.
"""
    )
# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()