| |
| """ |
| BibGuard - Bibliography Checker & Paper Submission Quality Tool |
| |
| Usage: |
| python main.py # Auto-detect config.yaml in current directory |
| python main.py --config my.yaml # Use specified config file |
| python main.py --init # Create default config file |
| python main.py --list-templates # List available templates |
| """ |
| import argparse |
| import sys |
| from pathlib import Path |
| from typing import Optional, List |
|
|
| from src.parsers import BibParser, TexParser |
| from src.fetchers import ArxivFetcher, ScholarFetcher, CrossRefFetcher, SemanticScholarFetcher, OpenAlexFetcher, DBLPFetcher |
| from src.analyzers import MetadataComparator, UsageChecker, LLMEvaluator, DuplicateDetector |
| from src.analyzers.llm_evaluator import LLMBackend |
| from src.report.generator import ReportGenerator, EntryReport |
| from src.utils.progress import ProgressDisplay |
| from src.config.yaml_config import BibGuardConfig, load_config, find_config_file, create_default_config |
| from src.config.workflow import WorkflowConfig, WorkflowStep as WFStep, get_default_workflow |
| from src.templates.base_template import get_template, get_all_templates |
| from src.checkers import CHECKER_REGISTRY, CheckResult, CheckSeverity |
|
|
|
|
def _build_arg_parser():
    """Build the command-line parser for the BibGuard entry point."""
    parser = argparse.ArgumentParser(
        description="BibGuard: Bibliography Checker & Paper Submission Quality Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Usage Examples:
  python main.py                    # Auto-detect config.yaml in current directory
  python main.py --config my.yaml   # Use specified config file
  python main.py --init             # Create default config.yaml
  python main.py --list-templates   # List available conference templates
""",
    )
    parser.add_argument(
        "--config", "-c",
        help="Config file path (default: auto-detect config.yaml)",
    )
    parser.add_argument(
        "--init",
        action="store_true",
        help="Create default config.yaml in current directory",
    )
    parser.add_argument(
        "--list-templates",
        action="store_true",
        help="List all available conference templates",
    )
    return parser


def _resolve_config_path(explicit_path):
    """Return the config path to load; exit with status 1 if none is available.

    An explicitly supplied path wins; otherwise ``find_config_file()`` is asked
    to locate one.
    """
    if explicit_path:
        return explicit_path

    found = find_config_file()
    if found:
        return str(found)

    print("Error: Config file not found")
    print("")
    print("Please run 'python main.py --init' to create config.yaml")
    print("Or use 'python main.py --config <path>' to specify a config file")
    print("")
    sys.exit(1)


def _attach_input_files(config):
    """Validate the configured inputs and record them on ``config``.

    Stores the lists of TeX and bib paths as ``config._tex_files`` and
    ``config._bib_files`` (read later by ``run_checker``). Exits with status 1
    when a required input is missing.
    """
    if config.files.input_dir:
        # Directory mode: discover every .tex/.bib file below the directory.
        input_dir = config.input_dir_path
        # is_dir() is already False for a non-existent path.
        if not input_dir.is_dir():
            print(f"Error: Input directory does not exist or is not a directory: {input_dir}")
            sys.exit(1)

        # Sort so processing/report order does not depend on filesystem
        # enumeration order; skip directories that merely match the pattern.
        tex_files = sorted(p for p in input_dir.rglob("*.tex") if p.is_file())
        bib_files = sorted(p for p in input_dir.rglob("*.bib") if p.is_file())

        if not tex_files:
            print(f"Error: No .tex files found in {input_dir}")
            sys.exit(1)
        if not bib_files:
            print(f"Error: No .bib files found in {input_dir}")
            sys.exit(1)

        config._tex_files = tex_files
        config._bib_files = bib_files
        return

    # Single-file mode: both paths must be configured and present.
    if not config.files.bib:
        print("Error: bib file path not specified in config")
        sys.exit(1)
    if not config.files.tex:
        print("Error: tex file path not specified in config")
        sys.exit(1)

    if not config.bib_path.exists():
        print(f"Error: Bib file does not exist: {config.bib_path}")
        sys.exit(1)
    if not config.tex_path.exists():
        print(f"Error: TeX file does not exist: {config.tex_path}")
        sys.exit(1)

    config._tex_files = [config.tex_path]
    config._bib_files = [config.bib_path]


def main():
    """Command-line entry point.

    Handles ``--init`` and ``--list-templates`` (both exit with status 0),
    otherwise loads the configuration, validates the input files, resolves the
    optional template and runs the checker.

    Exit status: 0 on success, 1 on configuration/input/runtime errors,
    130 when interrupted with Ctrl-C.
    """
    args = _build_arg_parser().parse_args()

    if args.init:
        output = create_default_config()
        print(f"✓ Created configuration file: {output}")
        print("")
        print(" Next steps:")
        print(" 1. Edit the 'bib' and 'tex' paths in config.yaml")
        print(" 2. Run: python main.py --config config.yaml")
        print("")
        sys.exit(0)

    if args.list_templates:
        from src.ui.template_selector import list_templates
        list_templates()
        sys.exit(0)

    config_path = _resolve_config_path(args.config)

    try:
        config = load_config(config_path)
    except FileNotFoundError:
        print(f"Error: Config file does not exist: {config_path}")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: any other failure is reported as a parse error.
        print(f"Error: Failed to parse config file: {e}")
        sys.exit(1)

    _attach_input_files(config)

    template = None
    if config.template:
        template = get_template(config.template)
        if not template:
            print(f"Error: Unknown template: {config.template}")
            print("Use --list-templates to see available templates")
            sys.exit(1)

    try:
        run_checker(config, template)
    except KeyboardInterrupt:
        print("\n\nCancelled")
        sys.exit(130)
    except Exception as e:
        # Last-resort handler: show the message and the traceback, then fail.
        print(f"\nError: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
|
|
|
|
def _parse_tex_sources(tex_parser, tex_files):
    """Parse every TeX file and install the merged citation data on the parser.

    Returns a dict mapping ``str(path)`` to the file's text content.
    """
    tex_contents = {}
    merged_citations = {}
    merged_all_keys = set()

    for tex_path in tex_files:
        citations = tex_parser.parse_file(str(tex_path))
        for key, occurrences in citations.items():
            merged_citations.setdefault(key, []).extend(occurrences)

        merged_all_keys.update(tex_parser.get_all_cited_keys())
        tex_contents[str(tex_path)] = tex_path.read_text(encoding='utf-8', errors='replace')

    # Overwrite the parser state with the union over all files; presumably
    # consumers that hold the parser (e.g. UsageChecker) read these attributes.
    tex_parser.citations = merged_citations
    tex_parser.all_keys = merged_all_keys
    return tex_contents


def _run_submission_checks(checker_names, tex_contents):
    """Run each registered checker over each TeX file and collect the results.

    Names not present in ``CHECKER_REGISTRY`` are ignored. Every result is
    tagged with the path of the file it came from.
    """
    collected = []
    for checker_name in checker_names:
        if checker_name not in CHECKER_REGISTRY:
            continue
        checker = CHECKER_REGISTRY[checker_name]()
        for tex_path_str, content in tex_contents.items():
            results = checker.check(content, {})
            for result in results:
                result.file_path = tex_path_str
            collected.extend(results)
    return collected


def _copy_inputs(paths, output_dir):
    """Copy input files into ``output_dir`` (flat, by file name)."""
    import shutil

    for src in paths:
        try:
            shutil.copy2(src, output_dir / src.name)
        except shutil.SameFileError:
            # The output directory is the file's own directory: already there.
            continue


def _write_line_report(report_path, tex_contents, submission_results):
    """Write the per-line report; nothing is written if no file has results."""
    from src.report.line_report import LineByLineReportGenerator

    sections = []
    for tex_path_str, content in tex_contents.items():
        file_results = [r for r in submission_results if r.file_path == tex_path_str]
        if not file_results:
            continue
        generator = LineByLineReportGenerator(content, tex_path_str)
        generator.add_results(file_results)
        sections.append(generator.generate())

    if sections:
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write("\n\n".join(sections))


def _write_used_only_bib(bib_parser, bib_files, used_entries, output_dir, progress):
    """Write a bibliography containing only the entries that are actually used.

    With a single source file the parser filters that file; with several, the
    raw text of the used entries is concatenated into one merged file. This is
    best-effort: a failure is reported but does not abort the run.
    """
    try:
        if len(bib_files) == 1:
            source = bib_files[0]
            clean_bib_path = output_dir / f"{source.stem}_only_used.bib"
            keys_to_keep = {entry.key for entry in used_entries}
            bib_parser.filter_file(str(source), str(clean_bib_path), keys_to_keep)
        else:
            clean_bib_path = output_dir / "merged_only_used.bib"
            with open(clean_bib_path, 'w', encoding='utf-8') as f:
                f.writelines(entry.raw + "\n\n" for entry in used_entries)
    except Exception as e:
        # Previously swallowed silently; surface it so a missing file is explained.
        progress.print_error(f"Could not write cleaned bibliography: {e}")


def run_checker(config: BibGuardConfig, template=None):
    """Run the bibliography checker with the given configuration.

    Stages, in order:
      1. Parse all bib files (``config._bib_files``) and TeX files
         (``config._tex_files``).
      2. Build only the fetchers/analyzers enabled in ``config.bibliography``.
      3. Run the enabled submission checkers over every TeX file.
      4. Detect duplicates and missing citations when enabled.
      5. Process every bib entry on a thread pool (usage, metadata, relevance).
      6. Write reports, input copies and the "only used" bibliography into
         ``config.output_dir_path`` and print a summary unless quiet.

    Args:
        config: Loaded configuration; ``_bib_files`` / ``_tex_files`` must have
            been set by the caller.
        template: Optional template object, passed through to the report
            generator.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    progress = ProgressDisplay()

    # --- Parse inputs -----------------------------------------------------
    bib_parser = BibParser()
    entries = []
    for bib_path in config._bib_files:
        entries.extend(bib_parser.parse_file(str(bib_path)))

    tex_parser = TexParser()
    tex_contents = _parse_tex_sources(tex_parser, config._tex_files)

    # --- Build the components that the configuration enables --------------
    bib_config = config.bibliography

    arxiv_fetcher = None
    crossref_fetcher = None
    scholar_fetcher = None
    semantic_scholar_fetcher = None
    openalex_fetcher = None
    dblp_fetcher = None
    comparator = None
    usage_checker = None
    llm_evaluator = None
    duplicate_detector = None

    # arXiv is needed both for metadata comparison and for abstract lookup.
    if bib_config.check_metadata or bib_config.check_relevance:
        arxiv_fetcher = ArxivFetcher()

    if bib_config.check_metadata:
        semantic_scholar_fetcher = SemanticScholarFetcher()
        openalex_fetcher = OpenAlexFetcher()
        dblp_fetcher = DBLPFetcher()
        crossref_fetcher = CrossRefFetcher()
        scholar_fetcher = ScholarFetcher()
        comparator = MetadataComparator()

    if bib_config.check_usage:
        usage_checker = UsageChecker(tex_parser)

    if bib_config.check_duplicates:
        duplicate_detector = DuplicateDetector()

    if bib_config.check_relevance:
        llm_config = config.llm
        llm_evaluator = LLMEvaluator(
            backend=LLMBackend(llm_config.backend),
            endpoint=llm_config.endpoint or None,
            model=llm_config.model or None,
            api_key=llm_config.api_key or None
        )
        llm_evaluator.test_connection()

        # Relevance evaluation consumes the usage contexts, so a usage checker
        # is required even when the usage check itself is disabled.
        if not usage_checker:
            usage_checker = UsageChecker(tex_parser)

    # --- Report generator and whole-document checks ------------------------
    report_gen = ReportGenerator(
        minimal_verified=config.output.minimal_verified,
        check_preprint_ratio=config.bibliography.check_preprint_ratio,
        preprint_warning_threshold=config.bibliography.preprint_warning_threshold
    )
    report_gen.set_metadata(
        [str(f) for f in config._bib_files],
        [str(f) for f in config._tex_files]
    )

    submission_results = _run_submission_checks(
        config.submission.get_enabled_checkers(), tex_contents
    )
    report_gen.set_submission_results(submission_results, template)

    if bib_config.check_duplicates and duplicate_detector:
        report_gen.set_duplicate_groups(duplicate_detector.find_duplicates(entries))

    if bib_config.check_usage and usage_checker:
        report_gen.set_missing_citations(usage_checker.get_missing_entries(entries))

    # --- Metadata lookup workflow -------------------------------------------
    workflow_config = get_default_workflow()
    if config.workflow:
        # A user-defined workflow replaces the default; list order is priority.
        workflow_config = WorkflowConfig(
            steps=[
                WFStep(
                    name=step.name,
                    display_name=step.name,
                    description=step.description,
                    enabled=step.enabled,
                    priority=i
                )
                for i, step in enumerate(config.workflow)
            ]
        )

    def process_single_entry(entry):
        """Build the report for one bib entry; runs on a worker thread.

        NOTE(review): the checker, fetcher and evaluator objects are shared by
        all workers — confirm they are safe for concurrent use.
        """
        usage_result = None
        if usage_checker:
            usage_result = usage_checker.check_usage(entry)

        comparison_result = None
        if bib_config.check_metadata and comparator:
            comparison_result = fetch_and_compare_with_workflow(
                entry, workflow_config, arxiv_fetcher, crossref_fetcher,
                scholar_fetcher, semantic_scholar_fetcher, openalex_fetcher,
                dblp_fetcher, comparator
            )

        evaluations = []
        if bib_config.check_relevance and llm_evaluator:
            if usage_result and usage_result.is_used:
                abstract = get_abstract(entry, comparison_result, arxiv_fetcher)
                if abstract:
                    # One evaluation per place the entry is cited.
                    for ctx in usage_result.contexts:
                        eval_result = llm_evaluator.evaluate(
                            entry.key, ctx.full_context, abstract
                        )
                        eval_result.line_number = ctx.line_number
                        eval_result.file_path = ctx.file_path
                        evaluations.append(eval_result)

        entry_report = EntryReport(
            entry=entry,
            comparison=comparison_result,
            usage=usage_result,
            evaluations=evaluations
        )
        return entry_report, comparison_result

    # ThreadPoolExecutor rejects max_workers=0, which an empty bibliography
    # would otherwise produce; keep at least one worker.
    max_workers = max(1, min(10, len(entries)))

    with progress.progress_context(len(entries), "Processing bibliography") as prog:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_entry = {
                executor.submit(process_single_entry, entry): entry
                for entry in entries
            }

            # Results are consumed on this (the calling) thread only, so the
            # report generator and progress display need no extra locking here.
            for future in as_completed(future_to_entry):
                entry = future_to_entry[future]
                try:
                    entry_report, comparison_result = future.result()
                    report_gen.add_entry_report(entry_report)

                    if comparison_result and comparison_result.is_match:
                        prog.mark_success()
                    elif comparison_result and comparison_result.has_issues:
                        prog.mark_warning()
                    else:
                        # NOTE(review): also reached when there is no comparison
                        # result at all (e.g. metadata check disabled) — confirm
                        # that counting those as errors is intended.
                        prog.mark_error()

                    prog.update(entry.key, "Done", 1)
                except Exception as e:
                    # One failing entry must not abort the remaining ones.
                    prog.mark_error()
                    progress.print_error(f"Error processing {entry.key}: {e}")
                    prog.update(entry.key, "Failed", 1)

    # --- Write outputs -------------------------------------------------------
    output_dir = config.output_dir_path
    output_dir.mkdir(parents=True, exist_ok=True)

    _copy_inputs(config._bib_files, output_dir)
    _copy_inputs(config._tex_files, output_dir)

    bib_report_path = output_dir / "bibliography_report.md"
    report_gen.save_bibliography_report(str(bib_report_path))

    if submission_results:
        latex_report_path = output_dir / "latex_quality_report.md"
        report_gen.save_latex_quality_report(
            str(latex_report_path),
            submission_results,
            template
        )
        _write_line_report(
            output_dir / "line_by_line_report.md", tex_contents, submission_results
        )

    if bib_config.check_usage and usage_checker:
        used_entries = [
            er.entry for er in report_gen.entries if er.usage and er.usage.is_used
        ]
        if used_entries:
            _write_used_only_bib(
                bib_parser, config._bib_files, used_entries, output_dir, progress
            )

    if not config.output.quiet:
        bib_stats, latex_stats = report_gen.get_summary_stats()
        progress.print_detailed_summary(bib_stats, latex_stats, str(output_dir.absolute()))
|
|
|
|
def _best_arxiv_title_match(entry, candidates, min_similarity=0.5):
    """Return the candidate whose title best matches ``entry.title``.

    Returns None when no candidate's normalized-title similarity exceeds
    ``min_similarity``. On equal similarity the earliest candidate wins.
    """
    # Imported lazily so the normalizer is only loaded when this step runs.
    from src.utils.normalizer import TextNormalizer

    wanted = TextNormalizer.normalize_for_comparison(entry.title)
    best, best_sim = None, 0.0
    for candidate in candidates:
        found = TextNormalizer.normalize_for_comparison(candidate.title)
        sim = TextNormalizer.similarity_ratio(wanted, found)
        if sim > best_sim:
            best, best_sim = candidate, sim

    if best is not None and best_sim > min_similarity:
        return best
    return None


def _run_workflow_step(
    step_name, entry, arxiv_fetcher, crossref_fetcher, scholar_fetcher,
    semantic_scholar_fetcher, openalex_fetcher, dblp_fetcher, comparator
):
    """Execute one workflow step and return its comparison result, or None.

    None is returned when the step name is unknown, when the entry lacks the
    field the step needs, when the step's fetcher is not available, or when
    the source returned nothing.
    """
    if step_name == "arxiv_id":
        if entry.has_arxiv and arxiv_fetcher:
            arxiv_meta = arxiv_fetcher.fetch_by_id(entry.arxiv_id)
            if arxiv_meta:
                return comparator.compare_with_arxiv(entry, arxiv_meta)
        return None

    if step_name == "crossref_doi":
        if entry.doi and crossref_fetcher:
            crossref_result = crossref_fetcher.search_by_doi(entry.doi)
            if crossref_result:
                return comparator.compare_with_crossref(entry, crossref_result)
        return None

    if step_name == "semantic_scholar":
        if entry.title and semantic_scholar_fetcher:
            # Prefer the exact DOI lookup; fall back to a title search.
            ss_result = None
            if entry.doi:
                ss_result = semantic_scholar_fetcher.fetch_by_doi(entry.doi)
            if not ss_result:
                ss_result = semantic_scholar_fetcher.search_by_title(entry.title)
            if ss_result:
                return comparator.compare_with_semantic_scholar(entry, ss_result)
        return None

    if step_name == "dblp":
        if entry.title and dblp_fetcher:
            dblp_result = dblp_fetcher.search_by_title(entry.title)
            if dblp_result:
                return comparator.compare_with_dblp(entry, dblp_result)
        return None

    if step_name == "openalex":
        if entry.title and openalex_fetcher:
            # Prefer the exact DOI lookup; fall back to a title search.
            oa_result = None
            if entry.doi:
                oa_result = openalex_fetcher.fetch_by_doi(entry.doi)
            if not oa_result:
                oa_result = openalex_fetcher.search_by_title(entry.title)
            if oa_result:
                return comparator.compare_with_openalex(entry, oa_result)
        return None

    if step_name == "arxiv_title":
        if entry.title and arxiv_fetcher:
            candidates = arxiv_fetcher.search_by_title(entry.title, max_results=3)
            if candidates:
                best = _best_arxiv_title_match(entry, candidates)
                if best:
                    return comparator.compare_with_arxiv(entry, best)
        return None

    if step_name == "crossref_title":
        if entry.title and crossref_fetcher:
            crossref_result = crossref_fetcher.search_by_title(entry.title)
            if crossref_result:
                return comparator.compare_with_crossref(entry, crossref_result)
        return None

    if step_name == "google_scholar":
        if entry.title and scholar_fetcher:
            scholar_result = scholar_fetcher.search_by_title(entry.title)
            if scholar_result:
                return comparator.compare_with_scholar(entry, scholar_result)
        return None

    return None


def fetch_and_compare_with_workflow(
    entry, workflow_config, arxiv_fetcher, crossref_fetcher, scholar_fetcher,
    semantic_scholar_fetcher, openalex_fetcher, dblp_fetcher, comparator
):
    """Fetch metadata from online sources using the configured workflow.

    The enabled steps of ``workflow_config`` are tried in order. The first
    result whose ``is_match`` is true is returned immediately. If no step
    matches, the result with the highest ``confidence`` is returned (the
    earliest one on a tie). If no step produced any result, the comparator's
    "unable" result is returned.

    Args:
        entry: Bib entry to verify (reads ``has_arxiv``, ``arxiv_id``, ``doi``
            and ``title``).
        workflow_config: Object providing ``get_enabled_steps()``.
        arxiv_fetcher, crossref_fetcher, scholar_fetcher,
        semantic_scholar_fetcher, openalex_fetcher, dblp_fetcher: Source
            clients; any of them may be None, which skips the related steps.
        comparator: Object providing the ``compare_with_*`` methods and
            ``create_unable_result``.

    Returns:
        A comparison result object produced by ``comparator``.
    """
    candidates = []

    for step in workflow_config.get_enabled_steps():
        result = _run_workflow_step(
            step.name, entry, arxiv_fetcher, crossref_fetcher, scholar_fetcher,
            semantic_scholar_fetcher, openalex_fetcher, dblp_fetcher, comparator
        )
        if not result:
            continue
        if result.is_match:
            # A confirmed match ends the search; later sources are not queried.
            return result
        candidates.append(result)

    if candidates:
        # max() keeps the first of equally confident results, exactly like a
        # stable descending sort followed by taking element 0.
        return max(candidates, key=lambda r: r.confidence)

    return comparator.create_unable_result(entry, "Unable to find this paper in any data source")
|
|
|
|
def get_abstract(entry, comparison_result, arxiv_fetcher):
    """Get abstract for an entry from various sources.

    Sources are tried in order: the entry's own ``abstract`` field, an arXiv
    lookup by ID (when ``entry.has_arxiv``), then the first hit of an arXiv
    title search. An abstract that is empty or whitespace-only counts as
    missing, so the next source is tried instead of returning blank text.

    Args:
        entry: Bib entry (reads ``abstract``, ``has_arxiv``, ``arxiv_id``,
            ``title``).
        comparison_result: Currently unused; kept for interface compatibility.
        arxiv_fetcher: arXiv client, or None to skip the online lookups.

    Returns:
        The abstract text, or ``""`` when none could be found.
    """
    def has_text(value):
        # Blank strings are as useless as a missing abstract.
        return bool(value) and bool(str(value).strip())

    if has_text(entry.abstract):
        return entry.abstract

    if not arxiv_fetcher:
        return ""

    if entry.has_arxiv:
        arxiv_meta = arxiv_fetcher.fetch_by_id(entry.arxiv_id)
        if arxiv_meta and has_text(arxiv_meta.abstract):
            return arxiv_meta.abstract

    if entry.title:
        # NOTE(review): the top hit is accepted without checking that its
        # title matches the entry — confirm this is acceptable.
        hits = arxiv_fetcher.search_by_title(entry.title, max_results=1)
        if hits and has_text(hits[0].abstract):
            return hits[0].abstract

    return ""
|
|
|
|
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|