kn29 commited on
Commit
b511946
·
verified ·
1 Parent(s): fef6ed9

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +165 -104
app.py CHANGED
@@ -1,3 +1,4 @@
 
1
  import asyncio
2
  import concurrent.futures
3
  import logging
@@ -25,13 +26,13 @@ except ImportError:
25
  FAISS_AVAILABLE = False
26
 
27
  try:
28
- # We import the SessionRAG class and the model initializer.
29
- from rag import SessionRAG, initialize_models
30
  RAG_AVAILABLE = True
31
  except ImportError:
32
  RAG_AVAILABLE = False
33
 
34
- # Configure comprehensive logging
35
  logging.basicConfig(
36
  level=logging.INFO,
37
  format='%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s:%(lineno)d] - %(message)s',
@@ -46,7 +47,7 @@ logger = logging.getLogger(__name__)
46
  MONGO_CLIENT = None
47
  DB = None
48
  RAG_MODELS_INITIALIZED = False
49
- SESSION_LAST_ACCESS = {} # Track last access time for each session
50
  APP_STATE = {
51
  "startup_time": None,
52
  "mongodb_connected": False,
@@ -56,12 +57,15 @@ APP_STATE = {
56
  "errors": []
57
  }
58
 
59
- # --- In-memory Session Management & Threading ---
60
  SESSION_STORES = {}
61
  STORE_LOCK = threading.RLock()
62
  CLEANUP_INTERVAL = 1800 # 30 minutes
63
  STORE_TTL = 1800 # 30 minutes
64
 
 
 
 
65
  # --- Pydantic Models ---
66
  class ChatRequest(BaseModel):
67
  message: str = Field(..., min_length=1, max_length=5000)
@@ -70,25 +74,12 @@ class ChatResponse(BaseModel):
70
  success: bool
71
  answer: str
72
  sources: List[Dict[str, Any]] = Field(default_factory=list)
73
- chat_history: List[Dict[str, Any]] = Field(default_factory=list)
74
  processing_time: float
75
  session_id: str
76
  query_analysis: Optional[Dict[str, Any]] = None
77
  confidence: Optional[float] = None
78
  error_details: Optional[str] = None
79
 
80
- class InitRequest(BaseModel):
81
- force_reload: bool = Field(default=False)
82
-
83
- class InitResponse(BaseModel):
84
- success: bool
85
- session_id: str
86
- message: str
87
- chunk_count: int = Field(default=0)
88
- title: str = Field(default="Unknown Document")
89
- document_info: Optional[Dict[str, Any]] = None
90
- error_details: Optional[str] = None
91
-
92
  class HealthResponse(BaseModel):
93
  status: str
94
  mongodb_connected: bool
@@ -99,7 +90,6 @@ class HealthResponse(BaseModel):
99
  uptime_seconds: float
100
  last_error: Optional[str] = None
101
 
102
-
103
  # --- Helper Functions ---
104
  def create_session_logger(session_id: str):
105
  return logging.LoggerAdapter(logger, {'session_id': session_id[:8]})
@@ -110,12 +100,20 @@ def connect_mongodb():
110
  mongodb_url = os.getenv("MONGODB_URL", "mongodb://localhost:27017/")
111
  logger.info(f"Connecting to MongoDB...")
112
  MONGO_CLIENT = pymongo.MongoClient(
113
- mongodb_url, serverSelectionTimeoutMS=5000
 
 
 
114
  )
115
  MONGO_CLIENT.admin.command('ping')
116
  DB = MONGO_CLIENT["legal_rag_system"]
 
 
117
  DB.chats.create_index("session_id", background=True)
118
  DB.chats.create_index("created_at", expireAfterSeconds=24 * 60 * 60, background=True)
 
 
 
119
  APP_STATE["mongodb_connected"] = True
120
  logger.info("MongoDB connected successfully")
121
  return True
@@ -124,7 +122,7 @@ def connect_mongodb():
124
  return False
125
 
126
  def init_rag_models():
127
- """Initializes the shared, stateless RAG models once at startup."""
128
  global RAG_MODELS_INITIALIZED
129
  if not RAG_AVAILABLE or not FAISS_AVAILABLE:
130
  logger.error("RAG module or FAISS not available - cannot initialize models.")
@@ -143,75 +141,82 @@ def init_rag_models():
143
  APP_STATE["errors"].append(f"RAG init failed: {str(e)}")
144
  return False
145
 
146
- def load_session_from_mongodb(session_id: str) -> Dict[str, Any]:
147
- """Load pre-existing session data and embeddings from MongoDB"""
148
- session_logger = create_session_logger(session_id)
149
- session_logger.info(f"Loading session from MongoDB: {session_id}")
150
-
151
- if not DB:
152
- raise ValueError("Database not connected")
153
-
154
- # Load session metadata
155
- session_doc = DB.sessions.find_one({"session_id": session_id})
156
- if not session_doc:
157
- raise ValueError(f"Session {session_id} not found in database")
158
- if session_doc.get("status") != "completed":
159
- raise ValueError(f"Session not ready - status: {session_doc.get('status')}")
160
-
161
- # Load chunks with embeddings from MongoDB
162
- session_logger.info(f"Loading chunks with embeddings for: {session_doc.get('filename', 'unknown')}")
163
- chunks_cursor = DB.chunks.find({"session_id": session_id}).sort("created_at", 1)
164
- chunks_list = list(chunks_cursor)
165
-
166
- if not chunks_list:
167
- raise ValueError(f"No chunks found for session {session_id}")
168
-
169
- # Create SessionRAG instance and load pre-existing data
170
- groq_api_key = os.getenv("GROQ_API_KEY")
171
- session_rag = SessionRAG(session_id, groq_api_key)
172
-
173
- session_logger.info(f"Loading existing session data with {len(chunks_list)} chunks")
174
- session_rag.load_existing_session_data(chunks_list)
 
 
 
 
 
 
 
 
175
 
176
- session_store = {
177
- "session_rag": session_rag,
178
- "indexed": True, # Add this field for health check
179
- "metadata": {
180
- "session_id": session_id,
181
- "title": session_doc.get("filename", "Document"),
182
- "chunk_count": len(chunks_list),
183
- "loaded_at": datetime.utcnow(),
184
- "document_info": {"filename": session_doc.get("filename", "Unknown")}
 
 
 
 
 
 
 
 
 
 
185
  }
186
- }
187
- session_logger.info("Session loaded from MongoDB with existing embeddings")
188
- return session_store
189
-
190
- def get_chat_history_safely(session_id: str, limit: int = 50) -> List[Dict[str, Any]]:
191
- """Get chat history with error handling."""
192
- if not DB:
193
- return []
194
- try:
195
- chats_cursor = DB.chats.find({"session_id": session_id}).sort("created_at", -1).limit(limit)
196
- # Reverse to get chronological order [oldest -> newest]
197
- return list(chats_cursor)[::-1]
198
- except Exception as e:
199
- logger.error(f"Failed to get chat history for session {session_id}: {e}")
200
- return []
201
 
202
- # --- Session Cleanup ---
203
  def cleanup_session_resources(session_id: str):
204
- """Safely cleans up a session's resources."""
205
  with STORE_LOCK:
206
  if session_id in SESSION_STORES:
207
- store = SESSION_STORES.pop(session_id) # Atomically get and remove
208
  session_rag = store.get("session_rag")
209
  if hasattr(session_rag, 'cleanup'):
210
  session_rag.cleanup()
211
  logger.info(f"Cleaned up session from memory: {session_id[:8]}")
212
 
213
  def cleanup_expired_sessions():
214
- """Clean up sessions that haven't been accessed in 30 minutes"""
215
  now = datetime.utcnow()
216
  expired_ids = [
217
  sid for sid, last_access in list(SESSION_LAST_ACCESS.items())
@@ -230,14 +235,19 @@ async def periodic_cleanup():
230
  cleanup_expired_sessions()
231
 
232
  async def save_chat_message_safely(session_id: str, role: str, message: str):
233
- """Saves chat messages in a non-blocking way."""
234
  if not DB:
235
  return
236
  try:
237
- await asyncio.to_thread(
238
- DB.chats.insert_one,
239
- {"session_id": session_id, "role": role, "message": message, "created_at": datetime.utcnow()}
240
- )
 
 
 
 
 
241
  except Exception as e:
242
  logger.error(f"Failed to save chat message for session {session_id}: {e}")
243
 
@@ -246,7 +256,7 @@ async def save_chat_message_safely(session_id: str, role: str, message: str):
246
  async def lifespan(app: FastAPI):
247
  cleanup_task = None
248
  APP_STATE["startup_time"] = datetime.utcnow()
249
- logger.info("Starting Advanced RAG Chat Service...")
250
 
251
  connect_mongodb()
252
  init_rag_models()
@@ -261,13 +271,14 @@ async def lifespan(app: FastAPI):
261
  cleanup_task.cancel()
262
  if MONGO_CLIENT:
263
  MONGO_CLIENT.close()
 
264
  logger.info("Shutdown complete.")
265
 
266
- # --- FastAPI App and Endpoints ---
267
  app = FastAPI(
268
- title="Advanced RAG Chat Service",
269
- description="A robust, session-isolated RAG chat service.",
270
- version="3.1.0",
271
  lifespan=lifespan
272
  )
273
 
@@ -281,7 +292,7 @@ app.add_middleware(
281
 
282
  @app.get("/")
283
  async def root():
284
- return {"service": "Advanced RAG Chat Service", "version": "3.1.0"}
285
 
286
  @app.get("/health", response_model=HealthResponse)
287
  async def health_check():
@@ -300,7 +311,10 @@ async def health_check():
300
  rag_models_initialized=RAG_MODELS_INITIALIZED,
301
  faiss_available=FAISS_AVAILABLE,
302
  active_sessions=active_sessions,
303
- memory_usage={"loaded_sessions": active_sessions, "indexed_sessions": indexed_sessions},
 
 
 
304
  uptime_seconds=uptime,
305
  last_error=APP_STATE["errors"][-1] if APP_STATE["errors"] else None
306
  )
@@ -313,15 +327,15 @@ async def chat_with_document(session_id: str, request: ChatRequest):
313
  try:
314
  session_logger.info(f"Received chat request: {request.message[:100]}...")
315
 
316
- # Check if session is already loaded in memory
317
  with STORE_LOCK:
318
  if session_id not in SESSION_STORES:
319
- # Lazy load: Load session from MongoDB when first chat request comes
320
  session_logger.info("Loading session from MongoDB for first chat request")
321
  try:
322
- session_store = await asyncio.to_thread(load_session_from_mongodb, session_id)
323
  SESSION_STORES[session_id] = session_store
324
- session_logger.info("Session loaded successfully from MongoDB")
325
  except Exception as load_error:
326
  session_logger.error(f"Failed to load session: {load_error}")
327
  raise HTTPException(status_code=404, detail=f"Failed to load session: {str(load_error)}")
@@ -330,10 +344,13 @@ async def chat_with_document(session_id: str, request: ChatRequest):
330
  SESSION_LAST_ACCESS[session_id] = datetime.utcnow()
331
  session_rag = SESSION_STORES[session_id]["session_rag"]
332
 
333
- session_logger.info(f"Processing query with SessionRAG...")
 
 
 
 
334
 
335
- # Process the query using SessionRAG
336
- result = await asyncio.to_thread(session_rag.query_documents, request.message, top_k=5)
337
 
338
  if 'error' in result:
339
  session_logger.error(f"Query processing error: {result['error']}")
@@ -342,7 +359,7 @@ async def chat_with_document(session_id: str, request: ChatRequest):
342
  APP_STATE["total_queries"] += 1
343
  answer = result.get('answer', 'Unable to generate an answer.')
344
 
345
- # Save chat messages asynchronously
346
  asyncio.create_task(save_chat_message_safely(session_id, "user", request.message))
347
  asyncio.create_task(save_chat_message_safely(session_id, "assistant", answer))
348
 
@@ -353,7 +370,6 @@ async def chat_with_document(session_id: str, request: ChatRequest):
353
  success=True,
354
  answer=answer,
355
  sources=result.get('sources', []),
356
- chat_history=[],
357
  processing_time=processing_time,
358
  session_id=session_id,
359
  query_analysis=result.get('query_analysis'),
@@ -369,21 +385,66 @@ async def chat_with_document(session_id: str, request: ChatRequest):
369
 
370
  @app.get("/history/{session_id}")
371
  async def get_session_history(session_id: str):
372
- """Retrieves chat history for a session."""
373
  if not DB:
374
  raise HTTPException(status_code=503, detail="Database not connected")
375
 
376
- history = await asyncio.to_thread(get_chat_history_safely, session_id)
 
 
 
 
 
 
 
 
377
  return {"session_id": session_id, "chat_history": history}
378
 
379
  @app.delete("/session/{session_id}")
380
  async def cleanup_session(session_id: str):
381
- """Manually cleans up a specific session from memory."""
382
  cleanup_session_resources(session_id)
 
383
  return {"success": True, "message": f"Session {session_id} cleaned up."}
384
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
385
  if __name__ == "__main__":
386
  import uvicorn
387
  port = int(os.getenv("PORT", 7861))
388
- logger.info(f"Starting server on http://0.0.0.0:{port}")
389
- uvicorn.run("app:app", host="0.0.0.0", port=port, reload=True)
 
1
+ # app_optimized.py - Performance-Optimized FastAPI App
2
  import asyncio
3
  import concurrent.futures
4
  import logging
 
26
  FAISS_AVAILABLE = False
27
 
28
  try:
29
+ # Import the optimized SessionRAG class
30
+ from rag_optimized import OptimizedSessionRAG, initialize_models
31
  RAG_AVAILABLE = True
32
  except ImportError:
33
  RAG_AVAILABLE = False
34
 
35
+ # Configure logging
36
  logging.basicConfig(
37
  level=logging.INFO,
38
  format='%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s:%(lineno)d] - %(message)s',
 
47
  MONGO_CLIENT = None
48
  DB = None
49
  RAG_MODELS_INITIALIZED = False
50
+ SESSION_LAST_ACCESS = {}
51
  APP_STATE = {
52
  "startup_time": None,
53
  "mongodb_connected": False,
 
57
  "errors": []
58
  }
59
 
60
+ # --- Optimized Session Management ---
61
  SESSION_STORES = {}
62
  STORE_LOCK = threading.RLock()
63
  CLEANUP_INTERVAL = 1800 # 30 minutes
64
  STORE_TTL = 1800 # 30 minutes
65
 
66
+ # Thread pool for async operations
67
+ EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=4)
68
+
69
  # --- Pydantic Models ---
70
  class ChatRequest(BaseModel):
71
  message: str = Field(..., min_length=1, max_length=5000)
 
74
  success: bool
75
  answer: str
76
  sources: List[Dict[str, Any]] = Field(default_factory=list)
 
77
  processing_time: float
78
  session_id: str
79
  query_analysis: Optional[Dict[str, Any]] = None
80
  confidence: Optional[float] = None
81
  error_details: Optional[str] = None
82
 
 
 
 
 
 
 
 
 
 
 
 
 
83
  class HealthResponse(BaseModel):
84
  status: str
85
  mongodb_connected: bool
 
90
  uptime_seconds: float
91
  last_error: Optional[str] = None
92
 
 
93
  # --- Helper Functions ---
94
  def create_session_logger(session_id: str):
95
  return logging.LoggerAdapter(logger, {'session_id': session_id[:8]})
 
100
  mongodb_url = os.getenv("MONGODB_URL", "mongodb://localhost:27017/")
101
  logger.info(f"Connecting to MongoDB...")
102
  MONGO_CLIENT = pymongo.MongoClient(
103
+ mongodb_url,
104
+ serverSelectionTimeoutMS=5000,
105
+ maxPoolSize=50,
106
+ waitQueueTimeoutMS=2500
107
  )
108
  MONGO_CLIENT.admin.command('ping')
109
  DB = MONGO_CLIENT["legal_rag_system"]
110
+
111
+ # Ensure indices exist
112
  DB.chats.create_index("session_id", background=True)
113
  DB.chats.create_index("created_at", expireAfterSeconds=24 * 60 * 60, background=True)
114
+ DB.sessions.create_index("session_id", unique=True, background=True)
115
+ DB.chunks.create_index("session_id", background=True)
116
+
117
  APP_STATE["mongodb_connected"] = True
118
  logger.info("MongoDB connected successfully")
119
  return True
 
122
  return False
123
 
124
  def init_rag_models():
125
+ """Initialize shared RAG models once at startup"""
126
  global RAG_MODELS_INITIALIZED
127
  if not RAG_AVAILABLE or not FAISS_AVAILABLE:
128
  logger.error("RAG module or FAISS not available - cannot initialize models.")
 
141
  APP_STATE["errors"].append(f"RAG init failed: {str(e)}")
142
  return False
143
 
144
+ async def load_session_from_mongodb_async(session_id: str) -> Dict[str, Any]:
145
+ """OPTIMIZED: Load session from MongoDB in async way"""
146
+ def _load_session_sync():
147
+ session_logger = create_session_logger(session_id)
148
+ session_logger.info(f"Loading session from MongoDB: {session_id}")
149
+
150
+ if not DB:
151
+ raise ValueError("Database not connected")
152
+
153
+ # Load session metadata
154
+ session_doc = DB.sessions.find_one({"session_id": session_id})
155
+ if not session_doc:
156
+ raise ValueError(f"Session {session_id} not found in database")
157
+ if session_doc.get("status") != "completed":
158
+ raise ValueError(f"Session not ready - status: {session_doc.get('status')}")
159
+
160
+ # Load chunks with embeddings efficiently
161
+ session_logger.info(f"Loading chunks with embeddings for: {session_doc.get('filename', 'unknown')}")
162
+
163
+ # Use projection to only load needed fields
164
+ chunks_cursor = DB.chunks.find(
165
+ {"session_id": session_id},
166
+ {
167
+ "chunk_id": 1,
168
+ "content": 1,
169
+ "title": 1,
170
+ "section_type": 1,
171
+ "importance_score": 1,
172
+ "entities": 1,
173
+ "embedding": 1
174
+ }
175
+ ).sort("created_at", 1)
176
+
177
+ chunks_list = list(chunks_cursor)
178
+
179
+ if not chunks_list:
180
+ raise ValueError(f"No chunks found for session {session_id}")
181
 
182
+ # Create OptimizedSessionRAG instance
183
+ groq_api_key = os.getenv("GROQ_API_KEY")
184
+ session_rag = OptimizedSessionRAG(session_id, groq_api_key)
185
+
186
+ session_logger.info(f"Loading existing session data with {len(chunks_list)} chunks")
187
+ session_rag.load_existing_session_data(chunks_list)
188
+
189
+ session_store = {
190
+ "session_rag": session_rag,
191
+ "indexed": True,
192
+ "metadata": {
193
+ "session_id": session_id,
194
+ "title": session_doc.get("filename", "Document"),
195
+ "chunk_count": len(chunks_list),
196
+ "loaded_at": datetime.utcnow(),
197
+ "load_time": getattr(session_rag, 'load_time', 0),
198
+ "index_build_time": getattr(session_rag, 'index_build_time', 0),
199
+ "document_info": {"filename": session_doc.get("filename", "Unknown")}
200
+ }
201
  }
202
+ session_logger.info(f"Session loaded from MongoDB in {session_rag.load_time:.2f}s")
203
+ return session_store
204
+
205
+ # Run in thread pool to avoid blocking
206
+ return await asyncio.get_event_loop().run_in_executor(EXECUTOR, _load_session_sync)
 
 
 
 
 
 
 
 
 
 
207
 
 
208
  def cleanup_session_resources(session_id: str):
209
+ """Safely clean up session resources"""
210
  with STORE_LOCK:
211
  if session_id in SESSION_STORES:
212
+ store = SESSION_STORES.pop(session_id)
213
  session_rag = store.get("session_rag")
214
  if hasattr(session_rag, 'cleanup'):
215
  session_rag.cleanup()
216
  logger.info(f"Cleaned up session from memory: {session_id[:8]}")
217
 
218
  def cleanup_expired_sessions():
219
+ """Clean up sessions that haven't been accessed recently"""
220
  now = datetime.utcnow()
221
  expired_ids = [
222
  sid for sid, last_access in list(SESSION_LAST_ACCESS.items())
 
235
  cleanup_expired_sessions()
236
 
237
  async def save_chat_message_safely(session_id: str, role: str, message: str):
238
+ """Save chat messages asynchronously"""
239
  if not DB:
240
  return
241
  try:
242
+ def _save_message():
243
+ DB.chats.insert_one({
244
+ "session_id": session_id,
245
+ "role": role,
246
+ "message": message,
247
+ "created_at": datetime.utcnow()
248
+ })
249
+
250
+ await asyncio.get_event_loop().run_in_executor(EXECUTOR, _save_message)
251
  except Exception as e:
252
  logger.error(f"Failed to save chat message for session {session_id}: {e}")
253
 
 
256
  async def lifespan(app: FastAPI):
257
  cleanup_task = None
258
  APP_STATE["startup_time"] = datetime.utcnow()
259
+ logger.info("Starting Optimized RAG Chat Service...")
260
 
261
  connect_mongodb()
262
  init_rag_models()
 
271
  cleanup_task.cancel()
272
  if MONGO_CLIENT:
273
  MONGO_CLIENT.close()
274
+ EXECUTOR.shutdown(wait=True)
275
  logger.info("Shutdown complete.")
276
 
277
+ # --- FastAPI App ---
278
  app = FastAPI(
279
+ title="Optimized RAG Chat Service",
280
+ description="High-performance, session-isolated RAG chat service with pre-computed embeddings.",
281
+ version="4.0.0",
282
  lifespan=lifespan
283
  )
284
 
 
292
 
293
  @app.get("/")
294
  async def root():
295
+ return {"service": "Optimized RAG Chat Service", "version": "4.0.0"}
296
 
297
  @app.get("/health", response_model=HealthResponse)
298
  async def health_check():
 
311
  rag_models_initialized=RAG_MODELS_INITIALIZED,
312
  faiss_available=FAISS_AVAILABLE,
313
  active_sessions=active_sessions,
314
+ memory_usage={
315
+ "loaded_sessions": active_sessions,
316
+ "indexed_sessions": indexed_sessions
317
+ },
318
  uptime_seconds=uptime,
319
  last_error=APP_STATE["errors"][-1] if APP_STATE["errors"] else None
320
  )
 
327
  try:
328
  session_logger.info(f"Received chat request: {request.message[:100]}...")
329
 
330
+ # Check if session is loaded in memory
331
  with STORE_LOCK:
332
  if session_id not in SESSION_STORES:
333
+ # Lazy load: Load session from MongoDB asynchronously
334
  session_logger.info("Loading session from MongoDB for first chat request")
335
  try:
336
+ session_store = await load_session_from_mongodb_async(session_id)
337
  SESSION_STORES[session_id] = session_store
338
+ session_logger.info(f"Session loaded successfully from MongoDB in {session_store['metadata'].get('load_time', 0):.2f}s")
339
  except Exception as load_error:
340
  session_logger.error(f"Failed to load session: {load_error}")
341
  raise HTTPException(status_code=404, detail=f"Failed to load session: {str(load_error)}")
 
344
  SESSION_LAST_ACCESS[session_id] = datetime.utcnow()
345
  session_rag = SESSION_STORES[session_id]["session_rag"]
346
 
347
+ session_logger.info(f"Processing query with OptimizedSessionRAG...")
348
+
349
+ # Process the query using OptimizedSessionRAG - this is FAST now
350
+ def _process_query():
351
+ return session_rag.query_documents(request.message, top_k=5)
352
 
353
+ result = await asyncio.get_event_loop().run_in_executor(EXECUTOR, _process_query)
 
354
 
355
  if 'error' in result:
356
  session_logger.error(f"Query processing error: {result['error']}")
 
359
  APP_STATE["total_queries"] += 1
360
  answer = result.get('answer', 'Unable to generate an answer.')
361
 
362
+ # Save chat messages asynchronously (non-blocking)
363
  asyncio.create_task(save_chat_message_safely(session_id, "user", request.message))
364
  asyncio.create_task(save_chat_message_safely(session_id, "assistant", answer))
365
 
 
370
  success=True,
371
  answer=answer,
372
  sources=result.get('sources', []),
 
373
  processing_time=processing_time,
374
  session_id=session_id,
375
  query_analysis=result.get('query_analysis'),
 
385
 
386
  @app.get("/history/{session_id}")
387
  async def get_session_history(session_id: str):
388
+ """Retrieve chat history for a session"""
389
  if not DB:
390
  raise HTTPException(status_code=503, detail="Database not connected")
391
 
392
+ def _get_history():
393
+ try:
394
+ chats_cursor = DB.chats.find({"session_id": session_id}).sort("created_at", -1).limit(50)
395
+ return list(chats_cursor)[::-1] # Reverse to get chronological order
396
+ except Exception as e:
397
+ logger.error(f"Failed to get chat history for session {session_id}: {e}")
398
+ return []
399
+
400
+ history = await asyncio.get_event_loop().run_in_executor(EXECUTOR, _get_history)
401
  return {"session_id": session_id, "chat_history": history}
402
 
403
  @app.delete("/session/{session_id}")
404
  async def cleanup_session(session_id: str):
405
+ """Manually clean up a specific session from memory"""
406
  cleanup_session_resources(session_id)
407
+ SESSION_LAST_ACCESS.pop(session_id, None)
408
  return {"success": True, "message": f"Session {session_id} cleaned up."}
409
 
410
+ @app.get("/session/{session_id}/info")
411
+ async def get_session_info(session_id: str):
412
+ """Get information about a loaded session"""
413
+ with STORE_LOCK:
414
+ if session_id not in SESSION_STORES:
415
+ raise HTTPException(status_code=404, detail="Session not loaded in memory")
416
+
417
+ metadata = SESSION_STORES[session_id]["metadata"]
418
+ return {
419
+ "session_id": session_id,
420
+ "loaded": True,
421
+ "metadata": metadata,
422
+ "last_access": SESSION_LAST_ACCESS.get(session_id)
423
+ }
424
+
425
+ @app.get("/sessions")
426
+ async def list_active_sessions():
427
+ """List all currently active sessions in memory"""
428
+ with STORE_LOCK:
429
+ sessions = []
430
+ for session_id, store in SESSION_STORES.items():
431
+ metadata = store.get("metadata", {})
432
+ sessions.append({
433
+ "session_id": session_id,
434
+ "title": metadata.get("title", "Unknown"),
435
+ "chunk_count": metadata.get("chunk_count", 0),
436
+ "loaded_at": metadata.get("loaded_at"),
437
+ "load_time": metadata.get("load_time", 0),
438
+ "last_access": SESSION_LAST_ACCESS.get(session_id)
439
+ })
440
+
441
+ return {
442
+ "active_sessions": len(sessions),
443
+ "sessions": sessions
444
+ }
445
+
446
  if __name__ == "__main__":
447
  import uvicorn
448
  port = int(os.getenv("PORT", 7861))
449
+ logger.info(f"Starting optimized server on http://0.0.0.0:{port}")
450
+ uvicorn.run("app_optimized:app", host="0.0.0.0", port=port, reload=False, workers=1)