# Initial version: Gradio log anomaly detector (commit 8f92ca5, author: WolfDavid)
"""
Gradio app for the Log Anomaly Detector — deployable to Hugging Face Spaces.
Trains an Isolation Forest + TF-IDF model on baseline logs, then scores
new logs for anomalies. CPU-only, no GPU required.
"""
import re
from dataclasses import dataclass, field
from typing import List
import gradio as gr
from sklearn.ensemble import IsolationForest
from sklearn.feature_extraction.text import TfidfVectorizer
# ═══════════════════════════════════════════════════════════════════
# Core detector
# ═══════════════════════════════════════════════════════════════════
# Loose parser for "<date> <time> <LEVEL> <message>" style log lines.
# Every group is optional:
#   ts    - the first two whitespace-separated tokens. They are NOT validated
#           as a timestamp, so on a line without one the first two words are
#           captured here instead.
#   level - one of the severity keywords below, if it is the next token.
#   msg   - whatever remains on the line.
# WARNING is listed before WARN on purpose: regex alternation takes the first
# branch that matches, and with WARN first a "WARNING" line was parsed as
# level="WARN" with a message starting at "ING ...".
LOG_PATTERN = re.compile(
    r"^(?P<ts>\S+\s\S+)?\s*"
    r"(?P<level>DEBUG|INFO|WARNING|WARN|ERROR|FATAL|CRITICAL)?\s*"
    r"(?P<msg>.*)$"
)
@dataclass
class LogAnomaly:
    """One log line flagged as anomalous, with its score and ranking."""

    # The flagged log line.
    line: str
    # Anomaly score attached to the line by the detector.
    score: float
    # Position in the sorted result list; stays 0 until a rank is assigned.
    rank: int = field(default=0)
class LogAnomalyDetector:
    """Flag unusual log lines with TF-IDF features and an Isolation Forest.

    Typical use: ``fit()`` on lines from normal operation, then ``detect()``
    on new lines. Before vectorizing, variable fields (timestamps, UUIDs, IPs,
    e-mail addresses, bare numbers) are replaced with placeholders so that
    lines differing only in those values look the same to the model.
    """

    # Ordered (compiled pattern, placeholder) rules applied by _preprocess().
    # Order matters: timestamps, UUIDs and IPs are replaced before the generic
    # number rule so their digit groups are not rewritten piecemeal.
    _PLACEHOLDER_RULES = (
        (
            re.compile(
                r"\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:?\d{2})?"
            ),
            "<TS>",
        ),
        (
            # IGNORECASE: this rule runs before lower-casing, so without it an
            # upper-case UUID was missed and its all-digit groups became <NUM>.
            re.compile(
                r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}",
                re.IGNORECASE,
            ),
            "<UUID>",
        ),
        (re.compile(r"\b\d+\.\d+\.\d+\.\d+\b"), "<IP>"),
        (re.compile(r"\b[\w.-]+@[\w.-]+\b"), "<EMAIL>"),
        (re.compile(r"\b\d+\b"), "<NUM>"),
    )

    def __init__(self, contamination=0.1, max_features=1000, n_estimators=100):
        """Create the (unfitted) vectorizer and model.

        Args:
            contamination: Passed to ``IsolationForest`` as the expected
                fraction of outliers.
            max_features: Vocabulary size cap for the TF-IDF vectorizer.
            n_estimators: Number of trees in the Isolation Forest.
        """
        self.vectorizer = TfidfVectorizer(
            max_features=max_features,
            ngram_range=(1, 2),  # unigrams and bigrams
            # Tokens must start with a letter, underscore or angle bracket.
            token_pattern=r"\b[a-zA-Z_<>][a-zA-Z0-9_<>]*\b",
            min_df=1,
        )
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=n_estimators,
            random_state=42,  # fixed seed for repeatable results
            n_jobs=-1,
        )
        self._fitted = False

    @staticmethod
    def _preprocess(line: str) -> str:
        """Return *line* with variable fields masked, lower-cased and stripped."""
        for pattern, placeholder in LogAnomalyDetector._PLACEHOLDER_RULES:
            line = pattern.sub(placeholder, line)
        return line.lower().strip()

    def fit(self, lines: List[str]) -> "LogAnomalyDetector":
        """Learn the vocabulary and the forest from baseline log lines.

        Blank lines are ignored.

        Args:
            lines: Raw baseline log lines.

        Returns:
            This detector, so calls can be chained.

        Raises:
            ValueError: If *lines* contains no non-blank line.
        """
        processed = [self._preprocess(ln) for ln in lines if ln.strip()]
        if not processed:
            raise ValueError("No non-empty lines to fit on")
        features = self.vectorizer.fit_transform(processed)
        self.model.fit(features)
        self._fitted = True
        return self

    def detect(self, lines: List[str]) -> "List[LogAnomaly]":
        """Score log lines and return the ones the model labels as outliers.

        Blank lines are ignored. A line is reported when
        ``IsolationForest.predict`` returns -1 for it; its score is the
        ``decision_function`` value (lower means more abnormal). Results are
        sorted by ascending score and ranked from 1, so rank 1 is the most
        anomalous line.

        Args:
            lines: Raw log lines to scan.

        Returns:
            The flagged lines as ``LogAnomaly`` objects, possibly empty.

        Raises:
            RuntimeError: If ``fit()`` has not been called yet.
        """
        if not self._fitted:
            raise RuntimeError("Call fit() first")
        non_empty = [ln for ln in lines if ln.strip()]
        if not non_empty:
            return []
        processed = [self._preprocess(ln) for ln in non_empty]
        features = self.vectorizer.transform(processed)
        scores = self.model.decision_function(features)
        preds = self.model.predict(features)
        anomalies = [
            LogAnomaly(line=ln.strip(), score=float(score))
            for ln, score, pred in zip(non_empty, scores, preds)
            if pred == -1
        ]
        anomalies.sort(key=lambda a: a.score)
        for rank, anomaly in enumerate(anomalies, 1):
            anomaly.rank = rank
        return anomalies
# ═══════════════════════════════════════════════════════════════════
# Gradio UI
# ═══════════════════════════════════════════════════════════════════
# Sample "normal operation" logs used as the default baseline textbox content.
DEMO_BASELINE = "\n".join((
    "2026-04-10 10:00:01 INFO app: Request GET /api/users completed in 120ms",
    "2026-04-10 10:00:02 INFO app: Request GET /api/users completed in 135ms",
    "2026-04-10 10:00:03 INFO app: Request GET /api/orders completed in 98ms",
    "2026-04-10 10:00:04 INFO app: Request POST /api/orders completed in 145ms",
    "2026-04-10 10:00:05 INFO app: User 42 logged in successfully",
    "2026-04-10 10:00:06 INFO app: Request GET /api/products completed in 110ms",
    "2026-04-10 10:00:07 INFO app: User 43 logged in successfully",
    "2026-04-10 10:00:08 INFO cache: Cache hit for user:42",
    "2026-04-10 10:00:09 INFO db: Query completed in 25ms",
    "2026-04-10 10:00:10 INFO app: Request GET /api/users completed in 125ms",
))

# Sample logs to scan: routine lines mixed with error/security events.
DEMO_TEST = "\n".join((
    "2026-04-10 10:01:00 INFO app: Request GET /api/users completed in 130ms",
    "2026-04-10 10:01:01 INFO app: User 44 logged in successfully",
    "2026-04-10 10:01:02 ERROR app: OutOfMemoryError: Java heap space exhausted",
    "2026-04-10 10:01:03 FATAL db: Database connection pool exhausted",
    "2026-04-10 10:01:04 INFO app: Request GET /api/products completed in 115ms",
    "2026-04-10 10:01:05 CRITICAL security: Unauthorized access attempt from 203.0.113.42",
    "2026-04-10 10:01:06 INFO cache: Cache hit for user:45",
    "2026-04-10 10:01:07 ERROR billing: Payment gateway timeout after 30s",
    "2026-04-10 10:01:08 WARN app: Slow query detected: SELECT * FROM users WHERE id=*",
))
def analyze(baseline_text: str, test_text: str, contamination: float, repeat_baseline: int):
    """Fit a detector on the baseline logs and report anomalies in the test logs.

    Args:
        baseline_text: Newline-separated logs from normal operation.
        test_text: Newline-separated logs to scan.
        contamination: Expected anomaly fraction, passed to the detector.
        repeat_baseline: How many times the baseline is repeated to build the
            training set; values below 1 are treated as 1.

    Returns:
        A ``(summary_markdown, table_rows, ranked_markdown)`` tuple. On any
        failure the tuple is ``("Error: ...", None, None)`` so the UI shows a
        message instead of raising.
    """
    # This is the UI boundary: every failure is turned into a message.
    try:
        # `or ""` keeps a None input on the friendly "empty" path below.
        baseline = [ln for ln in (baseline_text or "").strip().splitlines() if ln.strip()]
        test = [ln for ln in (test_text or "").strip().splitlines() if ln.strip()]
        if not baseline:
            return "Error: baseline is empty", None, None
        if not test:
            return "Error: test logs are empty", None, None

        # Repeat baseline to make model learn patterns
        repeats = max(1, int(repeat_baseline))
        baseline_repeated = baseline * repeats

        detector = LogAnomalyDetector(contamination=contamination)
        detector.fit(baseline_repeated)
        anomalies = detector.detect(test)

        anomaly_pct = len(anomalies) / len(test) * 100
        # Two trailing spaces + newline is a Markdown hard line break; the
        # summary reports the effective repeat count actually used above.
        summary = "  \n".join((
            f"**Baseline:** {len(baseline)} unique lines × {repeats} = {len(baseline_repeated)} training samples",
            f"**Test:** {len(test)} lines scored",
            f"**Anomalies found:** {len(anomalies)} ({anomaly_pct:.1f}%)",
            f"**Contamination:** {contamination}",
        ))

        if anomalies:
            # Long lines are truncated to 120 characters in the table only.
            table_data = [
                [a.rank, f"{a.score:.4f}", a.line[:120] + ("…" if len(a.line) > 120 else "")]
                for a in anomalies
            ]
        else:
            table_data = [[0, "N/A", "No anomalies detected"]]

        # Detailed view of the top 20; a blank line between entries keeps each
        # one in its own Markdown paragraph.
        ranked = "\n\n".join(
            f"**#{a.rank}** `score={a.score:.4f}`  \n`{a.line}`"
            for a in anomalies[:20]
        ) or "_No anomalies detected._"

        return summary, table_data, ranked
    except Exception as exc:
        return f"Error: {exc}", None, None
# Markdown shown above the inputs.
_INTRO_MD = """
# Log Anomaly Detector
Detects unusual log lines using **Isolation Forest** + **TF-IDF** on a
baseline of known-normal logs. No GPU required — runs in seconds on CPU.
**How it works:**
1. Paste a set of baseline logs (normal operation) in the left textbox
2. Paste a set of test logs you want to scan in the right textbox
3. Click "Detect Anomalies"
The model learns what normal looks like, then flags lines that don't fit.
"""

# Markdown shown below the results.
_FOOTER_MD = """
---
**Model:** scikit-learn IsolationForest with TF-IDF features (1-2 grams).
**Preprocessing:** timestamps, UUIDs, IPs, emails, and numbers are replaced
with placeholders to help the model learn structural patterns.
Built as part of a 4,000+ AI tools collection — see the source repo for
more tools and architecture details.
"""

# NOTE(review): the nesting below is reconstructed from a source whose
# indentation was lost; confirm the Row/Column layout against the original.
with gr.Blocks(title="Log Anomaly Detector") as demo:
    gr.Markdown(_INTRO_MD)

    # Inputs: baseline on the left, logs to scan on the right.
    with gr.Row():
        with gr.Column():
            baseline_input = gr.Textbox(
                value=DEMO_BASELINE,
                label="Baseline Logs (normal operation)",
                lines=12,
            )
        with gr.Column():
            test_input = gr.Textbox(
                value=DEMO_TEST,
                label="Test Logs (to scan)",
                lines=12,
            )

    # Tuning knobs forwarded to analyze().
    with gr.Row():
        contamination = gr.Slider(
            minimum=0.01,
            maximum=0.5,
            value=0.15,
            step=0.01,
            label="Contamination (expected anomaly fraction)",
        )
        repeat_baseline = gr.Slider(
            minimum=1,
            maximum=100,
            value=20,
            step=1,
            label="Baseline Replication (increases stability)",
        )

    detect_btn = gr.Button("Detect Anomalies", variant="primary")

    # Outputs, in the same order as the tuple returned by analyze().
    summary_out = gr.Markdown(label="Summary")
    table_out = gr.Dataframe(
        label="Anomalies (sorted by severity)",
        headers=["Rank", "Score", "Log Line"],
    )
    ranked_out = gr.Markdown(label="Top Anomalies (detailed)")

    detect_btn.click(
        fn=analyze,
        inputs=[baseline_input, test_input, contamination, repeat_baseline],
        outputs=[summary_out, table_out, ranked_out],
    )

    gr.Markdown(_FOOTER_MD)

# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()