Create database.py
Browse files- database.py +88 -0
database.py
ADDED
|
@@ -0,0 +1,88 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
# database.py
|
| 2 |
+
import sqlite3
|
| 3 |
+
import datetime
|
| 4 |
+
import uuid
|
| 5 |
+
from typing import List, Dict, Any
|
| 6 |
+
|
| 7 |
+
DB_FILE = "social_media_platform.db"
|
| 8 |
+
|
| 9 |
+
# A simple in-memory cache for agent data to avoid constant DB lookups
|
| 10 |
+
AGENT_CACHE: Dict[str, Any] = {}
|
| 11 |
+
|
| 12 |
+
def initialize_db():
|
| 13 |
+
conn = sqlite3.connect(DB_FILE)
|
| 14 |
+
cursor = conn.cursor()
|
| 15 |
+
# Create agents table with a secret API token
|
| 16 |
+
cursor.execute("""
|
| 17 |
+
CREATE TABLE IF NOT EXISTS agents (
|
| 18 |
+
agent_id INTEGER PRIMARY KEY,
|
| 19 |
+
name TEXT NOT NULL UNIQUE,
|
| 20 |
+
persona TEXT NOT NULL,
|
| 21 |
+
interests TEXT NOT NULL,
|
| 22 |
+
api_token TEXT NOT NULL UNIQUE
|
| 23 |
+
)
|
| 24 |
+
""")
|
| 25 |
+
# Other tables
|
| 26 |
+
cursor.execute("CREATE TABLE IF NOT EXISTS posts (post_id INTEGER PRIMARY KEY, agent_id INTEGER, content TEXT, timestamp TEXT)")
|
| 27 |
+
cursor.execute("CREATE TABLE IF NOT EXISTS comments (comment_id INTEGER PRIMARY KEY, post_id INTEGER, agent_id INTEGER, content TEXT, timestamp TEXT)")
|
| 28 |
+
cursor.execute("CREATE TABLE IF NOT EXISTS likes (like_id INTEGER PRIMARY KEY, post_id INTEGER, agent_id INTEGER)")
|
| 29 |
+
conn.commit()
|
| 30 |
+
conn.close()
|
| 31 |
+
|
| 32 |
+
def populate_initial_agents():
|
| 33 |
+
"""Adds the 10 starting agents if they don't exist and prints their tokens."""
|
| 34 |
+
agents_to_create = [
|
| 35 |
+
{"name": "TechieTom", "persona": "A cynical but brilliant software developer.", "interests": "AI,programming"},
|
| 36 |
+
{"name": "NatureNina", "persona": "A cheerful nature photographer.", "interests": "hiking,animals"},
|
| 37 |
+
{"name": "ChefCarlo", "persona": "A passionate Italian chef.", "interests": "pasta,wine"},
|
| 38 |
+
{"name": "AstroAlex", "persona": "An enthusiastic astronomer.", "interests": "stars,planets"},
|
| 39 |
+
{"name": "HistoryHank", "persona": "A meticulous historian.", "interests": "battles,empires"},
|
| 40 |
+
{"name": "Fit Fiona", "persona": "A motivational fitness coach.", "interests": "running,yoga"},
|
| 41 |
+
{"name": "GamerGabe", "persona": "A competitive esports player.", "interests": "rpgs,strategy"},
|
| 42 |
+
{"name": "ArtistAnna", "persona": "A thoughtful abstract painter.", "interests": "color theory,canvases"},
|
| 43 |
+
{"name": "MusoMike", "persona": "A laid-back indie musician.", "interests": "guitar,vinyl"},
|
| 44 |
+
{"name": "BookwormBella", "persona": "An avid reader of classic literature.", "interests": "novels,poetry"},
|
| 45 |
+
]
|
| 46 |
+
|
| 47 |
+
conn = sqlite3.connect(DB_FILE)
|
| 48 |
+
cursor = conn.cursor()
|
| 49 |
+
print("\n--- Agent API Tokens ---")
|
| 50 |
+
for agent_data in agents_to_create:
|
| 51 |
+
cursor.execute("SELECT api_token FROM agents WHERE name = ?", (agent_data["name"],))
|
| 52 |
+
result = cursor.fetchone()
|
| 53 |
+
if result is None:
|
| 54 |
+
token = str(uuid.uuid4())
|
| 55 |
+
cursor.execute(
|
| 56 |
+
"INSERT INTO agents (name, persona, interests, api_token) VALUES (?, ?, ?, ?)",
|
| 57 |
+
(agent_data["name"], agent_data["persona"], agent_data["interests"], token)
|
| 58 |
+
)
|
| 59 |
+
print(f" {agent_data['name']}: {token}")
|
| 60 |
+
else:
|
| 61 |
+
print(f" {agent_data['name']}: {result[0]} (already exists)")
|
| 62 |
+
conn.commit()
|
| 63 |
+
conn.close()
|
| 64 |
+
print("------------------------\n")
|
| 65 |
+
|
| 66 |
+
def get_agent_by_token(token: str) -> Dict[str, Any] | None:
|
| 67 |
+
if token in AGENT_CACHE:
|
| 68 |
+
return AGENT_CACHE[token]
|
| 69 |
+
conn = sqlite3.connect(DB_FILE)
|
| 70 |
+
cursor = conn.cursor()
|
| 71 |
+
cursor.execute("SELECT agent_id, name FROM agents WHERE api_token = ?", (token,))
|
| 72 |
+
agent = cursor.fetchone()
|
| 73 |
+
conn.close()
|
| 74 |
+
if agent:
|
| 75 |
+
agent_data = {"agent_id": agent[0], "name": agent[1]}
|
| 76 |
+
AGENT_CACHE[token] = agent_data
|
| 77 |
+
return agent_data
|
| 78 |
+
return None
|
| 79 |
+
|
| 80 |
+
def get_timeline(limit: int = 20) -> List[Dict[str, Any]]:
|
| 81 |
+
conn = sqlite3.connect(DB_FILE)
|
| 82 |
+
# This makes the output rows behave like dictionaries
|
| 83 |
+
conn.row_factory = sqlite3.Row
|
| 84 |
+
cursor = conn.cursor()
|
| 85 |
+
query = """
|
| 86 |
+
SELECT
|
| 87 |
+
p.post_id, p.content, p.timestamp,
|
| 88 |
+
a.agent_id, a
|