| | """ |
| | Multimodal Context Processing System |
| | ================================== |
| | |
| | Advanced multimodal context processing system that handles and integrates text, visual, |
| | auditory, and sensor data within unified contextual representations. |
| | """ |
| |
|
| | import asyncio |
| | import json |
| | import logging |
| | import base64 |
| | from datetime import datetime, timedelta |
| | from typing import Dict, List, Any, Optional, Tuple, Union, Set |
| | from dataclasses import dataclass, field |
| | from enum import Enum |
| | import numpy as np |
| | from collections import defaultdict, deque |
| |
|
| | from ai_agent_framework.core.context_engineering_agent import ( |
| | ContextElement, ContextModality, ContextDimension |
| | ) |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|


class DataModality(Enum):
    """Supported data modalities."""
    TEXT = "text"
    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"
    SENSOR = "sensor"
    TABLE = "table"
    CODE = "code"
    STRUCTURED = "structured"


class FusionStrategy(Enum):
    """Strategies for multimodal fusion."""
    EARLY_FUSION = "early_fusion"
    LATE_FUSION = "late_fusion"
    HYBRID_FUSION = "hybrid_fusion"
    ATTENTION_BASED = "attention_based"
    CROSS_ATTENTION = "cross_attention"


@dataclass
class MultimodalInput:
    """Represents multimodal input data."""
    id: str
    modality: DataModality
    content: Any
    metadata: Dict[str, Any]
    timestamp: datetime
    quality_score: float
    confidence: float
    processing_status: str = "pending"

    def __post_init__(self):
        if not self.id:
            self.id = f"mm_input_{int(time.time())}_{hash(str(self.content))}"
        if not self.timestamp:
            self.timestamp = datetime.utcnow()
        if not self.metadata:
            self.metadata = {}
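
# Illustrative only: constructing a text input by hand. The content and score
# values below are placeholders, not outputs of any real ingestion pipeline.
#
#     sample = MultimodalInput(
#         id="",  # filled in by __post_init__
#         modality=DataModality.TEXT,
#         content="Quarterly revenue grew 12%.",
#         metadata={},
#         timestamp=None,  # filled in by __post_init__
#         quality_score=0.9,
#         confidence=0.85,
#     )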


@dataclass
class UnifiedContext:
    """Unified contextual representation from multimodal inputs."""
    id: str
    source_inputs: List[str]
    fused_representation: Dict[str, Any]
    modality_contributions: Dict[str, float]
    temporal_alignment: Dict[str, Any]
    semantic_consistency: float
    fusion_strategy: FusionStrategy
    confidence_aggregate: float

    def __post_init__(self):
        if not self.id:
            self.id = f"unified_context_{int(time.time())}"


class MultimodalProcessor:
    """Core multimodal processing engine."""

    def __init__(self):
        self.modal_processors = {
            DataModality.TEXT: TextProcessor(),
            DataModality.IMAGE: ImageProcessor(),
            DataModality.AUDIO: AudioProcessor(),
            DataModality.VIDEO: VideoProcessor(),
            DataModality.SENSOR: SensorProcessor(),
            DataModality.TABLE: TableProcessor(),
            DataModality.CODE: CodeProcessor(),
            DataModality.STRUCTURED: StructuredProcessor(),
        }

        self.fusion_strategies = {
            FusionStrategy.EARLY_FUSION: self._early_fusion,
            FusionStrategy.LATE_FUSION: self._late_fusion,
            FusionStrategy.HYBRID_FUSION: self._hybrid_fusion,
            FusionStrategy.ATTENTION_BASED: self._attention_based_fusion,
            FusionStrategy.CROSS_ATTENTION: self._cross_attention_fusion,
        }

        self.alignment_algorithms = {
            "temporal": self._temporal_alignment,
            "semantic": self._semantic_alignment,
            "structural": self._structural_alignment,
        }

    async def process_multimodal_input(
        self,
        inputs: List[MultimodalInput],
        fusion_strategy: FusionStrategy = FusionStrategy.HYBRID_FUSION
    ) -> UnifiedContext:
        """Process multimodal inputs and create a unified context."""
        try:
            # Step 1: run each modality through its dedicated processor.
            processed_modalities = await self._process_individual_modalities(inputs)

            # Step 2: align modalities temporally, semantically, and structurally.
            aligned_modalities = await self._align_modalities(processed_modalities)

            # Step 3: fuse, falling back to hybrid fusion for unknown strategies.
            fusion_func = self.fusion_strategies.get(fusion_strategy)
            if not fusion_func:
                fusion_strategy = FusionStrategy.HYBRID_FUSION
                fusion_func = self.fusion_strategies[fusion_strategy]

            unified_context = await fusion_func(aligned_modalities)

            # Step 4: validate and annotate the fused context.
            return await self._validate_unified_context(unified_context)

        except Exception as e:
            logger.error(f"Multimodal processing failed: {e}")
            return UnifiedContext(
                id=f"error_context_{int(time.time())}",
                source_inputs=[inp.id for inp in inputs],
                fused_representation={"error": str(e)},
                modality_contributions={},
                temporal_alignment={},
                semantic_consistency=0.0,
                fusion_strategy=fusion_strategy,
                confidence_aggregate=0.0,
            )

    async def _process_individual_modalities(
        self,
        inputs: List[MultimodalInput]
    ) -> Dict[DataModality, Dict[str, Any]]:
        """Process each modality individually."""
        processed_modalities = {}

        # Group inputs by modality so each processor sees its whole batch at once.
        modality_groups = defaultdict(list)
        for input_data in inputs:
            modality_groups[input_data.modality].append(input_data)

        for modality, modality_inputs in modality_groups.items():
            processor = self.modal_processors.get(modality)
            if processor is None:
                logger.warning(f"No processor registered for {modality.value}; skipping")
                continue
            try:
                processed_modalities[modality] = await processor.process(modality_inputs)
            except Exception as e:
                logger.error(f"Failed to process {modality.value} modality: {e}")
                processed_modalities[modality] = {
                    "status": "error",
                    "error": str(e),
                    "inputs": [inp.id for inp in modality_inputs],
                }

        return processed_modalities

    async def _align_modalities(
        self,
        processed_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> Dict[DataModality, Dict[str, Any]]:
        """Align modalities for fusion."""
        aligned_modalities = {}

        # Compute all three alignment views once, then attach them per modality.
        temporal_alignment = await self.alignment_algorithms["temporal"](processed_modalities)
        semantic_alignment = await self.alignment_algorithms["semantic"](processed_modalities)
        structural_alignment = await self.alignment_algorithms["structural"](processed_modalities)

        for modality, processed_data in processed_modalities.items():
            if processed_data.get("status") == "success":
                aligned_data = processed_data.copy()
                aligned_data["alignment"] = {
                    "temporal": temporal_alignment.get(modality, {}),
                    "semantic": semantic_alignment.get(modality, {}),
                    "structural": structural_alignment.get(modality, {}),
                }
                aligned_modalities[modality] = aligned_data

        return aligned_modalities

    async def _early_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform early fusion: combine raw feature sets before interpretation."""
        fused_features = {}
        modality_contributions = {}
        confidence_scores = []

        for modality, data in aligned_modalities.items():
            if data.get("status") == "success":
                features = data.get("features", {})
                fused_features[modality.value] = features
                modality_contributions[modality.value] = data.get("confidence", 0.5)
                confidence_scores.append(data.get("confidence", 0.5))

        unified_representation = {
            "fusion_type": "early_fusion",
            "modality_features": fused_features,
            "combined_embedding": await self._combine_embeddings(fused_features),
            "cross_modal_patterns": await self._detect_cross_modal_patterns(fused_features),
        }

        return UnifiedContext(
            id=f"early_fusion_{int(time.time())}",
            source_inputs=list(fused_features.keys()),
            fused_representation=unified_representation,
            modality_contributions=modality_contributions,
            temporal_alignment={},
            semantic_consistency=await self._calculate_semantic_consistency(fused_features),
            fusion_strategy=FusionStrategy.EARLY_FUSION,
            confidence_aggregate=float(np.mean(confidence_scores)) if confidence_scores else 0.0,
        )

    async def _late_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform late fusion: combine per-modality semantic interpretations."""
        high_level_representations = {}
        modality_contributions = {}
        confidence_scores = []

        for modality, data in aligned_modalities.items():
            if data.get("status") == "success":
                representation = data.get("semantic_representation", {})
                high_level_representations[modality.value] = representation
                modality_contributions[modality.value] = data.get("confidence", 0.5)
                confidence_scores.append(data.get("confidence", 0.5))

        unified_representation = {
            "fusion_type": "late_fusion",
            "semantic_representations": high_level_representations,
            "fused_semantics": await self._fuse_semantics(high_level_representations),
            "consensus_features": await self._extract_consensus_features(high_level_representations),
        }

        return UnifiedContext(
            id=f"late_fusion_{int(time.time())}",
            source_inputs=list(high_level_representations.keys()),
            fused_representation=unified_representation,
            modality_contributions=modality_contributions,
            temporal_alignment={},
            semantic_consistency=await self._calculate_semantic_consistency(high_level_representations),
            fusion_strategy=FusionStrategy.LATE_FUSION,
            confidence_aggregate=float(np.mean(confidence_scores)) if confidence_scores else 0.0,
        )

    async def _hybrid_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform hybrid fusion combining early and late fusion."""
        early_fused = await self._early_fusion(aligned_modalities)
        late_fused = await self._late_fusion(aligned_modalities)

        hybrid_representation = {
            "fusion_type": "hybrid_fusion",
            "early_fusion": early_fused.fused_representation,
            "late_fusion": late_fused.fused_representation,
            "combined_features": await self._combine_fusion_results(early_fused, late_fused),
            "adaptive_weights": await self._calculate_adaptive_weights(aligned_modalities),
        }

        # Average each modality's contribution across the two fusion passes.
        combined_contributions = {}
        for modality in aligned_modalities.keys():
            early_contrib = early_fused.modality_contributions.get(modality.value, 0)
            late_contrib = late_fused.modality_contributions.get(modality.value, 0)
            combined_contributions[modality.value] = (early_contrib + late_contrib) / 2

        return UnifiedContext(
            id=f"hybrid_fusion_{int(time.time())}",
            source_inputs=list(combined_contributions.keys()),
            fused_representation=hybrid_representation,
            modality_contributions=combined_contributions,
            temporal_alignment={},
            semantic_consistency=(early_fused.semantic_consistency + late_fused.semantic_consistency) / 2,
            fusion_strategy=FusionStrategy.HYBRID_FUSION,
            confidence_aggregate=(early_fused.confidence_aggregate + late_fused.confidence_aggregate) / 2,
        )

    async def _attention_based_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform attention-based fusion with per-modality weights."""
        attention_weights = await self._calculate_attention_weights(aligned_modalities)

        fused_features = {}
        modality_contributions = {}
        confidence_scores = []

        for modality, data in aligned_modalities.items():
            if data.get("status") == "success":
                modality_weight = attention_weights.get(modality, 0.5)
                features = data.get("features", {})

                # Scale numeric features by the modality's attention weight;
                # pass non-numeric features through unchanged.
                weighted_features = {}
                for feature_name, feature_value in features.items():
                    if isinstance(feature_value, (int, float)):
                        weighted_features[feature_name] = feature_value * modality_weight
                    else:
                        weighted_features[feature_name] = feature_value

                fused_features[modality.value] = weighted_features
                modality_contributions[modality.value] = modality_weight
                confidence_scores.append(data.get("confidence", 0.5) * modality_weight)

        unified_representation = {
            "fusion_type": "attention_based",
            "attention_weights": attention_weights,
            "weighted_features": fused_features,
            "attention_mechanism": "dynamic_modality_weighting",
        }

        return UnifiedContext(
            id=f"attention_fusion_{int(time.time())}",
            source_inputs=list(fused_features.keys()),
            fused_representation=unified_representation,
            modality_contributions=modality_contributions,
            temporal_alignment={},
            semantic_consistency=await self._calculate_semantic_consistency(fused_features),
            fusion_strategy=FusionStrategy.ATTENTION_BASED,
            confidence_aggregate=float(np.mean(confidence_scores)) if confidence_scores else 0.0,
        )

    async def _cross_attention_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform cross-attention fusion between modality pairs."""
        cross_attention_matrices = await self._calculate_cross_attention(aligned_modalities)

        fused_representations = {}
        modality_contributions = {}
        confidence_scores = []

        for modality, data in aligned_modalities.items():
            if data.get("status") == "success":
                cross_attention = cross_attention_matrices.get(modality, {})
                features = data.get("features", {})

                # Amplify numeric features by the total attention this modality
                # receives from every other modality.
                attended_features = {}
                for feature_name, feature_value in features.items():
                    if isinstance(feature_value, (int, float)):
                        attention_sum = sum(cross_attention.get(other_mod, 0)
                                            for other_mod in aligned_modalities.keys()
                                            if other_mod != modality)
                        attended_features[feature_name] = feature_value * (1 + attention_sum)
                    else:
                        attended_features[feature_name] = feature_value

                fused_representations[modality.value] = attended_features
                modality_contributions[modality.value] = data.get("confidence", 0.5)
                confidence_scores.append(data.get("confidence", 0.5))

        unified_representation = {
            "fusion_type": "cross_attention",
            "cross_attention_matrices": cross_attention_matrices,
            "attended_features": fused_representations,
            "inter_modal_relationships": await self._analyze_inter_modal_relationships(aligned_modalities),
        }

        return UnifiedContext(
            id=f"cross_attention_{int(time.time())}",
            source_inputs=list(fused_representations.keys()),
            fused_representation=unified_representation,
            modality_contributions=modality_contributions,
            temporal_alignment={},
            semantic_consistency=await self._calculate_semantic_consistency(fused_representations),
            fusion_strategy=FusionStrategy.CROSS_ATTENTION,
            confidence_aggregate=float(np.mean(confidence_scores)) if confidence_scores else 0.0,
        )

    async def _validate_unified_context(self, context: UnifiedContext) -> UnifiedContext:
        """Validate and enhance a unified context."""
        issues = []

        if context.semantic_consistency < 0.3:
            issues.append("Low semantic consistency detected")

        if context.confidence_aggregate < 0.4:
            issues.append("Low aggregate confidence")

        if len(context.source_inputs) < 2:
            issues.append("Insufficient modalities for robust fusion")

        if issues:
            context.fused_representation["validation_issues"] = issues
            context.fused_representation["enhancement_applied"] = True

            # Nudge borderline scores upward, capped at 0.8.
            if context.semantic_consistency < 0.5:
                context.semantic_consistency = min(0.8, context.semantic_consistency * 1.2)

            if context.confidence_aggregate < 0.5:
                context.confidence_aggregate = min(0.8, context.confidence_aggregate * 1.1)

        return context

    async def _combine_embeddings(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """Combine embeddings from different modalities into one namespaced dict."""
        combined = {}
        for modality, modality_features in features.items():
            for feature_name, feature_value in modality_features.items():
                combined[f"{modality}_{feature_name}"] = feature_value
        return combined

    async def _detect_cross_modal_patterns(self, features: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Detect patterns across modalities via shared feature names."""
        patterns = []
        modalities = list(features.keys())

        for i, mod1 in enumerate(modalities):
            for mod2 in modalities[i + 1:]:
                mod1_features = features[mod1]
                mod2_features = features[mod2]

                common_features = set(mod1_features.keys()) & set(mod2_features.keys())
                if common_features:
                    patterns.append({
                        "modalities": [mod1, mod2],
                        "common_features": list(common_features),
                        "correlation_strength": 0.7,  # placeholder heuristic
                    })

        return patterns

    async def _fuse_semantics(self, representations: Dict[str, Any]) -> Dict[str, Any]:
        """Fuse semantic representations across modalities."""
        fused_semantics = {}

        # Collect the union of semantic keys across all dict representations.
        all_semantics = []
        for representation in representations.values():
            if isinstance(representation, dict):
                all_semantics.extend(representation.keys())

        for semantic in set(all_semantics):
            values = []
            for representation in representations.values():
                if isinstance(representation, dict) and semantic in representation:
                    values.append(representation[semantic])

            if values:
                if all(isinstance(v, (int, float)) for v in values):
                    fused_semantics[semantic] = float(np.mean(values))
                else:
                    fused_semantics[semantic] = values[0]

        return fused_semantics

    async def _extract_consensus_features(self, representations: Dict[str, Any]) -> Dict[str, Any]:
        """Extract features with high consensus across modalities."""
        # Count how many modalities mention each feature.
        feature_counts = defaultdict(int)
        for representation in representations.values():
            if isinstance(representation, dict):
                for feature in representation.keys():
                    feature_counts[feature] += 1

        # Keep features that appear in at least half of the modalities.
        threshold = len(representations) * 0.5
        return {
            feature: self._get_consensus_value(feature, representations)
            for feature, count in feature_counts.items()
            if count >= threshold
        }

    def _get_consensus_value(self, feature: str, representations: Dict[str, Any]) -> Any:
        """Get the consensus value for a feature across modalities."""
        values = []
        for representation in representations.values():
            if isinstance(representation, dict) and feature in representation:
                values.append(representation[feature])

        if not values:
            return None

        if all(isinstance(v, (int, float)) for v in values):
            return float(np.mean(values))

        # Majority vote for hashable values; unhashable values (e.g. lists)
        # fall back to the first observed value.
        try:
            return Counter(values).most_common(1)[0][0]
        except TypeError:
            return values[0]
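
    # Illustrative behavior of the consensus rule above: values
    # ["red", "red", "blue"] resolve to "red" by majority vote, while
    # numeric values [0.2, 0.4] average to 0.3.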

    async def _combine_fusion_results(self, early_fused: UnifiedContext, late_fused: UnifiedContext) -> Dict[str, Any]:
        """Combine early and late fusion results."""
        return {
            "early_features": early_fused.fused_representation.get("combined_embedding", {}),
            "late_semantics": late_fused.fused_representation.get("fused_semantics", {}),
            "combined_score": (early_fused.confidence_aggregate + late_fused.confidence_aggregate) / 2,
        }

    async def _calculate_adaptive_weights(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, float]:
        """Calculate adaptive weights for modalities from confidence and quality."""
        weights = {}

        for modality, data in modalities.items():
            if data.get("status") == "success":
                confidence = data.get("confidence", 0.5)
                quality = data.get("quality_score", 0.5)
                weights[modality] = (confidence + quality) / 2
            else:
                weights[modality] = 0.1

        # Normalize so the weights sum to 1.
        total_weight = sum(weights.values())
        if total_weight > 0:
            weights = {k: v / total_weight for k, v in weights.items()}

        return weights

    async def _calculate_attention_weights(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, float]:
        """Calculate attention weights for modalities."""
        weights = {}

        for modality, data in modalities.items():
            if data.get("status") == "success":
                confidence = data.get("confidence", 0.5)
                info_content = data.get("information_content", 0.5)
                weights[modality] = confidence * info_content
            else:
                weights[modality] = 0.1

        # Normalize so the weights sum to 1.
        total_weight = sum(weights.values())
        if total_weight > 0:
            weights = {k: v / total_weight for k, v in weights.items()}

        return weights

    async def _calculate_cross_attention(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[DataModality, float]]:
        """Calculate pairwise cross-attention between modalities."""
        cross_attention = {}
        modalities_list = list(modalities.keys())

        for mod1 in modalities_list:
            cross_attention[mod1] = {}
            for mod2 in modalities_list:
                if mod1 != mod2:
                    similarity = await self._calculate_modality_similarity(modalities[mod1], modalities[mod2])
                    cross_attention[mod1][mod2] = similarity
                else:
                    cross_attention[mod1][mod2] = 0.0

        return cross_attention

    async def _calculate_modality_similarity(self, mod1_data: Dict[str, Any], mod2_data: Dict[str, Any]) -> float:
        """Calculate similarity between two modalities from confidence agreement."""
        if mod1_data.get("status") != "success" or mod2_data.get("status") != "success":
            return 0.0

        conf1 = mod1_data.get("confidence", 0.5)
        conf2 = mod2_data.get("confidence", 0.5)

        # Proxy similarity: treat modalities with closer confidence scores as more similar.
        similarity = 1 - abs(conf1 - conf2)
        return max(0.0, similarity)

    async def _analyze_inter_modal_relationships(self, modalities: Dict[DataModality, Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Analyze relationships between modality pairs."""
        relationships = []
        modalities_list = list(modalities.keys())

        for i, mod1 in enumerate(modalities_list):
            for mod2 in modalities_list[i + 1:]:
                data1 = modalities[mod1]
                data2 = modalities[mod2]

                if data1.get("status") == "success" and data2.get("status") == "success":
                    similarity = await self._calculate_modality_similarity(data1, data2)

                    relationships.append({
                        "modalities": [mod1.value, mod2.value],
                        "relationship_type": "complementary" if similarity > 0.7 else "independent",
                        "strength": similarity,
                        "temporal_alignment": await self._check_temporal_alignment(data1, data2),
                    })

        return relationships

    async def _check_temporal_alignment(self, data1: Dict[str, Any], data2: Dict[str, Any]) -> float:
        """Check temporal alignment between two modalities (1.0 = simultaneous)."""
        timestamp1 = data1.get("timestamp", datetime.utcnow())
        timestamp2 = data2.get("timestamp", datetime.utcnow())

        time_diff = abs((timestamp1 - timestamp2).total_seconds())

        # Linear decay: full alignment at 0s, none at one hour apart or more.
        return max(0, 1 - time_diff / 3600)

    async def _calculate_semantic_consistency(self, representations: Dict[str, Any]) -> float:
        """Calculate semantic consistency across modalities via pairwise Jaccard overlap."""
        if not representations:
            return 0.0

        consistency_scores = []

        # Collect the key set of each dict-valued representation.
        all_semantics = []
        for representation in representations.values():
            if isinstance(representation, dict):
                all_semantics.append(set(representation.keys()))

        if len(all_semantics) > 1:
            # Jaccard index |A ∩ B| / |A ∪ B| for every pair of key sets.
            for i in range(len(all_semantics)):
                for j in range(i + 1, len(all_semantics)):
                    intersection = len(all_semantics[i] & all_semantics[j])
                    union = len(all_semantics[i] | all_semantics[j])
                    if union > 0:
                        consistency_scores.append(intersection / union)

        return float(np.mean(consistency_scores)) if consistency_scores else 0.5
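
    # Worked example of the Jaccard rule above: key sets {a, b, c} and
    # {b, c, d} share 2 keys out of 4 in their union, giving 2/4 = 0.5.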

    async def _temporal_alignment(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[str, Any]]:
        """Align modalities temporally."""
        alignment_results = {}

        for modality, data in modalities.items():
            if data.get("status") == "success":
                timestamp = data.get("timestamp", datetime.utcnow())
                alignment_results[modality] = {
                    "timestamp": timestamp.isoformat(),
                    "time_category": self._categorize_time(timestamp),
                    "temporal_priority": self._calculate_temporal_priority(timestamp),
                }

        return alignment_results

    def _categorize_time(self, timestamp: datetime) -> str:
        """Categorize a timestamp by age."""
        age_seconds = (datetime.utcnow() - timestamp).total_seconds()

        if age_seconds < 60:
            return "immediate"
        elif age_seconds < 3600:
            return "recent"
        elif age_seconds < 86400:
            return "today"
        else:
            return "historical"

    def _calculate_temporal_priority(self, timestamp: datetime) -> float:
        """Calculate temporal priority (more recent = higher priority)."""
        age_seconds = (datetime.utcnow() - timestamp).total_seconds()
        return max(0, 1 - age_seconds / 86400)

    async def _semantic_alignment(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[str, Any]]:
        """Align modalities semantically."""
        alignment_results = {}

        for modality, data in modalities.items():
            if data.get("status") == "success":
                features = data.get("features", {})
                semantic_tags = data.get("semantic_tags", [])

                alignment_results[modality] = {
                    "semantic_tags": semantic_tags,
                    "dominant_concepts": await self._extract_dominant_concepts(features),
                    "semantic_density": len(semantic_tags) / max(len(features), 1),
                }

        return alignment_results

    async def _extract_dominant_concepts(self, features: Dict[str, Any]) -> List[str]:
        """Extract dominant concepts from feature names and string values."""
        concepts = []

        for feature_name, feature_value in features.items():
            if isinstance(feature_value, str) and len(feature_value) > 3:
                concepts.append(feature_value.lower())
            elif isinstance(feature_name, str) and len(feature_name) > 3:
                concepts.append(feature_name.lower())

        return list(set(concepts))[:5]

    async def _structural_alignment(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[str, Any]]:
        """Align modalities structurally."""
        alignment_results = {}

        for modality, data in modalities.items():
            if data.get("status") == "success":
                features = data.get("features", {})

                alignment_results[modality] = {
                    "structure_type": self._determine_structure_type(features),
                    "complexity_score": self._calculate_complexity_score(features),
                    "organization_pattern": self._identify_organization_pattern(features),
                }

        return alignment_results

    def _determine_structure_type(self, features: Dict[str, Any]) -> str:
        """Determine the structural type of the data."""
        if not features:
            return "minimal"

        if all(isinstance(v, (int, float)) for v in features.values()):
            return "numerical"
        elif all(isinstance(v, str) for v in features.values()):
            return "textual"
        elif len(features) > 10:
            return "complex"
        else:
            return "simple"

    def _calculate_complexity_score(self, features: Dict[str, Any]) -> float:
        """Calculate a complexity score for the data structure."""
        if not features:
            return 0.0

        type_counts = defaultdict(int)
        for value in features.values():
            type_counts[type(value).__name__] += 1

        type_diversity = len(type_counts)
        feature_count = len(features)

        # Weighted blend of feature count (60%) and type diversity (40%),
        # normalized against 20 features and 4 distinct types.
        complexity = (feature_count / 20) * 0.6 + (type_diversity / 4) * 0.4
        return min(1.0, complexity)

    def _identify_organization_pattern(self, features: Dict[str, Any]) -> str:
        """Identify the organization pattern of the data from feature names."""
        if not features:
            return "none"

        feature_names = list(features.keys())

        if any("time" in name.lower() or "date" in name.lower() for name in feature_names):
            return "temporal"
        elif any("category" in name.lower() or "type" in name.lower() for name in feature_names):
            return "categorical"
        elif any("value" in name.lower() or "amount" in name.lower() for name in feature_names):
            return "quantitative"
        else:
            return "mixed"


class TextProcessor:
    """Processor for text modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        """Process text inputs."""
        if not inputs:
            return {"status": "error", "error": "No text inputs"}

        # Concatenate all string contents into one document.
        combined_text = " ".join(inp.content for inp in inputs if isinstance(inp.content, str))

        # Surface-level features.
        features = {
            "text_length": len(combined_text),
            "word_count": len(combined_text.split()),
            "sentence_count": combined_text.count('.') + combined_text.count('!') + combined_text.count('?'),
            "complexity_score": self._calculate_text_complexity(combined_text),
        }

        semantic_representation = await self._extract_text_semantics(combined_text)

        return {
            "status": "success",
            "features": features,
            "semantic_representation": semantic_representation,
            "confidence": float(np.mean([inp.confidence for inp in inputs])),
            "quality_score": float(np.mean([inp.quality_score for inp in inputs])),
            "timestamp": max(inp.timestamp for inp in inputs),
        }

    def _calculate_text_complexity(self, text: str) -> float:
        """Calculate a text complexity score from word and sentence length."""
        if not text:
            return 0.0

        words = text.split()
        avg_word_length = np.mean([len(word) for word in words]) if words else 0
        sentence_count = max(1, text.count('.') + text.count('!') + text.count('?'))
        avg_sentence_length = len(words) / sentence_count

        # Blend: word length normalized to 10 chars (40%), sentence length to 20 words (60%).
        complexity = (avg_word_length / 10) * 0.4 + (avg_sentence_length / 20) * 0.6
        return min(1.0, complexity)

    async def _extract_text_semantics(self, text: str) -> Dict[str, Any]:
        """Extract a lightweight semantic representation from text."""
        words = text.lower().split()

        # Treat longer words as candidate concepts.
        concepts = [word for word in words if len(word) > 4]

        # Keyword-based topic tagging.
        topics = []
        if any(word in text.lower() for word in ["business", "company", "revenue"]):
            topics.append("business")
        if any(word in text.lower() for word in ["technology", "system", "software"]):
            topics.append("technology")
        if any(word in text.lower() for word in ["data", "information", "analysis"]):
            topics.append("data")

        return {
            "concepts": list(set(concepts))[:10],
            "topics": topics,
            "sentiment": self._analyze_sentiment(text),
            "entities": [],
        }

    def _analyze_sentiment(self, text: str) -> str:
        """Simple lexicon-based sentiment analysis."""
        positive_words = ["good", "great", "excellent", "positive", "happy", "success"]
        negative_words = ["bad", "terrible", "negative", "sad", "failure", "problem"]

        text_lower = text.lower()
        positive_count = sum(1 for word in positive_words if word in text_lower)
        negative_count = sum(1 for word in negative_words if word in text_lower)

        if positive_count > negative_count:
            return "positive"
        elif negative_count > positive_count:
            return "negative"
        else:
            return "neutral"


class ImageProcessor:
    """Processor for image modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        """Process image inputs (currently only the first input is analyzed)."""
        if not inputs:
            return {"status": "error", "error": "No image inputs"}

        image_input = inputs[0]

        # Features are read from precomputed metadata rather than pixel data.
        features = {
            "image_size": image_input.metadata.get("size", 0),
            "format": image_input.metadata.get("format", "unknown"),
            "color_diversity": image_input.metadata.get("color_diversity", 0.5),
            "complexity_score": image_input.metadata.get("complexity", 0.5),
        }

        semantic_representation = await self._extract_image_semantics(image_input)

        return {
            "status": "success",
            "features": features,
            "semantic_representation": semantic_representation,
            "confidence": image_input.confidence,
            "quality_score": image_input.quality_score,
            "timestamp": image_input.timestamp,
        }

    async def _extract_image_semantics(self, image_input: MultimodalInput) -> Dict[str, Any]:
        """Extract a semantic representation from image metadata."""
        metadata = image_input.metadata

        return {
            "objects": metadata.get("objects", []),
            "colors": metadata.get("dominant_colors", []),
            "scenes": metadata.get("scene_types", []),
            "text_content": metadata.get("extracted_text", ""),
            "visual_concepts": metadata.get("visual_concepts", []),
        }


class AudioProcessor:
    """Processor for audio modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        """Process audio inputs (currently only the first input is analyzed)."""
        if not inputs:
            return {"status": "error", "error": "No audio inputs"}

        audio_input = inputs[0]

        # Features are read from precomputed metadata rather than raw samples.
        features = {
            "duration": audio_input.metadata.get("duration", 0),
            "sample_rate": audio_input.metadata.get("sample_rate", 44100),
            "channels": audio_input.metadata.get("channels", 1),
            "frequency_content": audio_input.metadata.get("frequency_profile", {}),
        }

        semantic_representation = await self._extract_audio_semantics(audio_input)

        return {
            "status": "success",
            "features": features,
            "semantic_representation": semantic_representation,
            "confidence": audio_input.confidence,
            "quality_score": audio_input.quality_score,
            "timestamp": audio_input.timestamp,
        }

    async def _extract_audio_semantics(self, audio_input: MultimodalInput) -> Dict[str, Any]:
        """Extract a semantic representation from audio metadata."""
        metadata = audio_input.metadata

        return {
            "speech_content": metadata.get("transcribed_text", ""),
            "speaker_count": metadata.get("speaker_count", 1),
            "emotion": metadata.get("emotion", "neutral"),
            "language": metadata.get("language", "unknown"),
            "audio_quality": metadata.get("quality_score", 0.5),
        }


class VideoProcessor:
    """Processor for video modality (placeholder implementation)."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class SensorProcessor:
    """Processor for sensor modality (placeholder implementation)."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class TableProcessor:
    """Processor for table modality (placeholder implementation)."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class CodeProcessor:
    """Processor for code modality (placeholder implementation)."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class StructuredProcessor:
    """Processor for structured data modality (placeholder implementation)."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class MultimodalContextProcessor:
    """Integrated multimodal context processing system."""

    def __init__(self):
        self.multimodal_processor = MultimodalProcessor()

    async def process_multimodal_input(
        self,
        input_data: Dict[str, Any],
        fusion_strategy: FusionStrategy = FusionStrategy.HYBRID_FUSION
    ) -> Dict[str, Any]:
        """Process multimodal input and return a unified context."""
        # Convert the raw request into MultimodalInput objects.
        multimodal_inputs = []

        for modality_str, content_list in input_data.items():
            try:
                modality = DataModality(modality_str)
            except ValueError:
                logger.warning(f"Unknown modality: {modality_str}")
                continue

            # Accept either a list of items or a single item per modality;
            # items may be raw content or dicts carrying content plus metadata.
            items = content_list if isinstance(content_list, list) else [content_list]
            for content in items:
                is_dict = isinstance(content, dict)
                multimodal_inputs.append(MultimodalInput(
                    id=f"{modality_str}_{len(multimodal_inputs)}",
                    modality=modality,
                    content=content.get("content", content) if is_dict else content,
                    metadata=content.get("metadata", {}) if is_dict else {},
                    timestamp=datetime.utcnow(),
                    quality_score=content.get("quality_score", 0.8) if is_dict else 0.8,
                    confidence=content.get("confidence", 0.8) if is_dict else 0.8,
                ))

        unified_context = await self.multimodal_processor.process_multimodal_input(
            multimodal_inputs, fusion_strategy
        )

        return {
            "unified_context": {
                "id": unified_context.id,
                "fusion_strategy": unified_context.fusion_strategy.value,
                "modality_contributions": unified_context.modality_contributions,
                "semantic_consistency": unified_context.semantic_consistency,
                "confidence_aggregate": unified_context.confidence_aggregate,
                "fused_representation": unified_context.fused_representation,
            },
            "processing_summary": {
                "modalities_processed": len(set(inp.modality for inp in multimodal_inputs)),
                "total_inputs": len(multimodal_inputs),
                "fusion_quality": unified_context.confidence_aggregate,
            },
        }
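
# Expected request shape for MultimodalContextProcessor.process_multimodal_input
# (illustrative; per-item keys other than "content" are optional):
#
#     {
#         "text": [{"content": "...", "confidence": 0.9, "quality_score": 0.85}],
#         "image": {"content": <bytes or path>, "metadata": {"objects": ["car"]}},
#     }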


if __name__ == "__main__":
    print("Multimodal Context Processing System Initialized")
    print("=" * 60)
    processor = MultimodalContextProcessor()
    print("Ready for advanced multimodal context processing and fusion!")