Spaces:
Running
Running
Blu3Orange
Add main application logic, memory management, and configuration for 12 Angry Agents
a9f964a
| """Memory summarization for context window management.""" | |
| from core.models import JurorMemory, ArgumentMemory | |
| class MemorySummarizer: | |
| """Compresses old deliberation history to manage context window.""" | |
| SUMMARY_INTERVAL = 5 # Summarize every 5 rounds | |
| KEEP_RECENT = 3 # Keep last 3 turns in full detail | |
| def should_summarize(self, round_num: int) -> bool: | |
| """Check if we should summarize at this round.""" | |
| return round_num > 0 and round_num % self.SUMMARY_INTERVAL == 0 | |
| def summarize(self, memory: JurorMemory) -> None: | |
| """Compress old turns in memory if needed. | |
| Args: | |
| memory: Juror memory to summarize. | |
| """ | |
| if len(memory.arguments_heard) <= self.KEEP_RECENT: | |
| return | |
| # Split: recent (keep full) vs old (summarize) | |
| old_turns = memory.arguments_heard[:-self.KEEP_RECENT] | |
| recent_turns = memory.arguments_heard[-self.KEEP_RECENT:] | |
| if not old_turns: | |
| return | |
| # Create simple text summary | |
| summary_parts = [] | |
| for arg in old_turns: | |
| summary_parts.append( | |
| f"- {arg.speaker_id} made a {arg.argument_type} argument" | |
| ) | |
| # Append to existing summary | |
| if memory.deliberation_summary: | |
| memory.deliberation_summary += "\n" + "\n".join(summary_parts) | |
| else: | |
| memory.deliberation_summary = "\n".join(summary_parts) | |
| # Keep only recent turns | |
| memory.arguments_heard = recent_turns | |
| def get_memory_context(self, memory: JurorMemory) -> str: | |
| """Get formatted memory context for prompts. | |
| Args: | |
| memory: Juror memory. | |
| Returns: | |
| Formatted string of memory context. | |
| """ | |
| parts = [] | |
| if memory.deliberation_summary: | |
| parts.append("## Earlier Discussion Summary") | |
| parts.append(memory.deliberation_summary) | |
| if memory.arguments_heard: | |
| parts.append("## Recent Arguments") | |
| for arg in memory.arguments_heard: | |
| parts.append(f"- {arg.speaker_id} [{arg.argument_type}]: {arg.content_summary}") | |
| return "\n".join(parts) if parts else "No deliberation history yet." | |