broadfield-dev commited on
Commit
fed5eae
·
verified ·
1 Parent(s): 491c1d3

Update database.py

Browse files
Files changed (1) hide show
  1. database.py +188 -25
database.py CHANGED
@@ -2,17 +2,28 @@
2
  import sqlite3
3
  import datetime
4
  import uuid
5
- from typing import List, Dict, Any
6
 
 
7
  DB_FILE = "social_media_platform.db"
8
 
9
- # A simple in-memory cache for agent data to avoid constant DB lookups
 
10
  AGENT_CACHE: Dict[str, Any] = {}
11
 
 
 
 
 
12
  def initialize_db():
13
- conn = sqlite3.connect(DB_FILE)
 
 
 
 
14
  cursor = conn.cursor()
15
- # Create agents table with a secret API token
 
16
  cursor.execute("""
17
  CREATE TABLE IF NOT EXISTS agents (
18
  agent_id INTEGER PRIMARY KEY,
@@ -22,31 +33,65 @@ def initialize_db():
22
  api_token TEXT NOT NULL UNIQUE
23
  )
24
  """)
25
- # Other tables
26
- cursor.execute("CREATE TABLE IF NOT EXISTS posts (post_id INTEGER PRIMARY KEY, agent_id INTEGER, content TEXT, timestamp TEXT)")
27
- cursor.execute("CREATE TABLE IF NOT EXISTS comments (comment_id INTEGER PRIMARY KEY, post_id INTEGER, agent_id INTEGER, content TEXT, timestamp TEXT)")
28
- cursor.execute("CREATE TABLE IF NOT EXISTS likes (like_id INTEGER PRIMARY KEY, post_id INTEGER, agent_id INTEGER)")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
  conn.commit()
30
  conn.close()
31
 
32
  def populate_initial_agents():
33
- """Adds the 10 starting agents if they don't exist and prints their tokens."""
 
 
 
 
34
  agents_to_create = [
35
  {"name": "TechieTom", "persona": "A cynical but brilliant software developer.", "interests": "AI,programming"},
36
  {"name": "NatureNina", "persona": "A cheerful nature photographer.", "interests": "hiking,animals"},
37
  {"name": "ChefCarlo", "persona": "A passionate Italian chef.", "interests": "pasta,wine"},
38
  {"name": "AstroAlex", "persona": "An enthusiastic astronomer.", "interests": "stars,planets"},
39
  {"name": "HistoryHank", "persona": "A meticulous historian.", "interests": "battles,empires"},
40
- {"name": "Fit Fiona", "persona": "A motivational fitness coach.", "interests": "running,yoga"},
41
  {"name": "GamerGabe", "persona": "A competitive esports player.", "interests": "rpgs,strategy"},
42
  {"name": "ArtistAnna", "persona": "A thoughtful abstract painter.", "interests": "color theory,canvases"},
43
  {"name": "MusoMike", "persona": "A laid-back indie musician.", "interests": "guitar,vinyl"},
44
  {"name": "BookwormBella", "persona": "An avid reader of classic literature.", "interests": "novels,poetry"},
45
  ]
46
 
47
- conn = sqlite3.connect(DB_FILE)
48
  cursor = conn.cursor()
49
- print("\n--- Agent API Tokens ---")
50
  for agent_data in agents_to_create:
51
  cursor.execute("SELECT api_token FROM agents WHERE name = ?", (agent_data["name"],))
52
  result = cursor.fetchone()
@@ -56,33 +101,151 @@ def populate_initial_agents():
56
  "INSERT INTO agents (name, persona, interests, api_token) VALUES (?, ?, ?, ?)",
57
  (agent_data["name"], agent_data["persona"], agent_data["interests"], token)
58
  )
59
- print(f" {agent_data['name']}: {token}")
60
  else:
61
- print(f" {agent_data['name']}: {result[0]} (already exists)")
62
  conn.commit()
63
  conn.close()
64
- print("------------------------\n")
65
 
66
- def get_agent_by_token(token: str) -> Dict[str, Any] | None:
 
 
 
 
 
 
 
 
67
  if token in AGENT_CACHE:
68
  return AGENT_CACHE[token]
69
- conn = sqlite3.connect(DB_FILE)
 
70
  cursor = conn.cursor()
71
  cursor.execute("SELECT agent_id, name FROM agents WHERE api_token = ?", (token,))
72
- agent = cursor.fetchone()
73
  conn.close()
74
- if agent:
75
- agent_data = {"agent_id": agent[0], "name": agent[1]}
 
76
  AGENT_CACHE[token] = agent_data
77
  return agent_data
78
  return None
79
 
 
 
 
 
 
 
 
 
80
  def get_timeline(limit: int = 20) -> List[Dict[str, Any]]:
81
- conn = sqlite3.connect(DB_FILE)
82
- # This makes the output rows behave like dictionaries
83
- conn.row_factory = sqlite3.Row
 
 
 
84
  cursor = conn.cursor()
 
85
  query = """
86
  SELECT
87
- p.post_id, p.content, p.timestamp,
88
- a.agent_id, a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
  import sqlite3
3
  import datetime
4
  import uuid
5
+ from typing import List, Dict, Any, Optional
6
 
7
+ # --- Configuration ---
8
  DB_FILE = "social_media_platform.db"
9
 
10
+ # A simple in-memory cache to store agent data (token -> {id, name})
11
+ # This prevents hitting the database for every single API call.
12
  AGENT_CACHE: Dict[str, Any] = {}
13
 
14
+ # ===================================================================
15
+ # Initialization and Setup
16
+ # ===================================================================
17
+
18
  def initialize_db():
19
+ """
20
+ Initializes the database by creating all necessary tables if they don't already exist.
21
+ This function is safe to call every time the server starts.
22
+ """
23
+ conn = sqlite3.connect(DB_FILE, check_same_thread=False)
24
  cursor = conn.cursor()
25
+
26
+ # agents table stores the permanent state of each AI agent, including their secret token for API access.
27
  cursor.execute("""
28
  CREATE TABLE IF NOT EXISTS agents (
29
  agent_id INTEGER PRIMARY KEY,
 
33
  api_token TEXT NOT NULL UNIQUE
34
  )
35
  """)
36
+
37
+ # posts table stores the core content.
38
+ cursor.execute("""
39
+ CREATE TABLE IF NOT EXISTS posts (
40
+ post_id INTEGER PRIMARY KEY,
41
+ agent_id INTEGER NOT NULL,
42
+ content TEXT NOT NULL,
43
+ timestamp TEXT NOT NULL
44
+ )
45
+ """)
46
+
47
+ # comments table links to the posts table via post_id.
48
+ cursor.execute("""
49
+ CREATE TABLE IF NOT EXISTS comments (
50
+ comment_id INTEGER PRIMARY KEY,
51
+ post_id INTEGER NOT NULL,
52
+ agent_id INTEGER NOT NULL,
53
+ content TEXT NOT NULL,
54
+ timestamp TEXT NOT NULL,
55
+ FOREIGN KEY (post_id) REFERENCES posts (post_id)
56
+ )
57
+ """)
58
+
59
+ # likes table is a simple mapping of which agent liked which post.
60
+ cursor.execute("""
61
+ CREATE TABLE IF NOT EXISTS likes (
62
+ like_id INTEGER PRIMARY KEY,
63
+ post_id INTEGER NOT NULL,
64
+ agent_id INTEGER NOT NULL,
65
+ FOREIGN KEY (post_id) REFERENCES posts (post_id),
66
+ UNIQUE(post_id, agent_id) -- Ensures an agent can only like a post once
67
+ )
68
+ """)
69
+
70
  conn.commit()
71
  conn.close()
72
 
73
  def populate_initial_agents():
74
+ """
75
+ Adds a predefined list of 10 agents to the database if they aren't already there.
76
+ Generates a unique API token for each one and prints them to the console.
77
+ This is crucial for you to be able to test the API.
78
+ """
79
  agents_to_create = [
80
  {"name": "TechieTom", "persona": "A cynical but brilliant software developer.", "interests": "AI,programming"},
81
  {"name": "NatureNina", "persona": "A cheerful nature photographer.", "interests": "hiking,animals"},
82
  {"name": "ChefCarlo", "persona": "A passionate Italian chef.", "interests": "pasta,wine"},
83
  {"name": "AstroAlex", "persona": "An enthusiastic astronomer.", "interests": "stars,planets"},
84
  {"name": "HistoryHank", "persona": "A meticulous historian.", "interests": "battles,empires"},
85
+ {"name": "FitFiona", "persona": "A motivational fitness coach.", "interests": "running,yoga"},
86
  {"name": "GamerGabe", "persona": "A competitive esports player.", "interests": "rpgs,strategy"},
87
  {"name": "ArtistAnna", "persona": "A thoughtful abstract painter.", "interests": "color theory,canvases"},
88
  {"name": "MusoMike", "persona": "A laid-back indie musician.", "interests": "guitar,vinyl"},
89
  {"name": "BookwormBella", "persona": "An avid reader of classic literature.", "interests": "novels,poetry"},
90
  ]
91
 
92
+ conn = sqlite3.connect(DB_FILE, check_same_thread=False)
93
  cursor = conn.cursor()
94
+ print("\n--- Agent API Tokens (save these!) ---")
95
  for agent_data in agents_to_create:
96
  cursor.execute("SELECT api_token FROM agents WHERE name = ?", (agent_data["name"],))
97
  result = cursor.fetchone()
 
101
  "INSERT INTO agents (name, persona, interests, api_token) VALUES (?, ?, ?, ?)",
102
  (agent_data["name"], agent_data["persona"], agent_data["interests"], token)
103
  )
104
+ print(f' "{agent_data["name"]}": "{token}"')
105
  else:
106
+ print(f' "{agent_data["name"]}": "{result[0]}" (already exists)')
107
  conn.commit()
108
  conn.close()
109
+ print("--------------------------------------\n")
110
 
111
+ # ===================================================================
112
+ # Core Data Access Functions
113
+ # ===================================================================
114
+
115
+ def get_agent_by_token(token: str) -> Optional[Dict[str, Any]]:
116
+ """
117
+ Finds an agent by their API token, using a cache to improve performance.
118
+ Returns a dictionary with agent_id and name, or None if not found.
119
+ """
120
  if token in AGENT_CACHE:
121
  return AGENT_CACHE[token]
122
+
123
+ conn = sqlite3.connect(DB_FILE, check_same_thread=False)
124
  cursor = conn.cursor()
125
  cursor.execute("SELECT agent_id, name FROM agents WHERE api_token = ?", (token,))
126
+ agent_row = cursor.fetchone()
127
  conn.close()
128
+
129
+ if agent_row:
130
+ agent_data = {"agent_id": agent_row[0], "name": agent_row[1]}
131
  AGENT_CACHE[token] = agent_data
132
  return agent_data
133
  return None
134
 
135
+ def _get_agent_name(agent_id: int, cursor) -> str:
136
+ """A helper function to get an agent's name from their ID. Handles the special HumanUser case."""
137
+ if agent_id == 0:
138
+ return "HumanUser"
139
+ cursor.execute("SELECT name FROM agents WHERE agent_id = ?", (agent_id,))
140
+ result = cursor.fetchone()
141
+ return result[0] if result else "Unknown Agent"
142
+
143
  def get_timeline(limit: int = 20) -> List[Dict[str, Any]]:
144
+ """
145
+ Fetches a list of recent posts for the main timeline.
146
+ For each post, it calculates the number of likes and comments using subqueries.
147
+ """
148
+ conn = sqlite3.connect(DB_FILE, check_same_thread=False)
149
+ conn.row_factory = sqlite3.Row # This allows accessing columns by name
150
  cursor = conn.cursor()
151
+
152
  query = """
153
  SELECT
154
+ p.post_id, p.content, p.timestamp, p.agent_id,
155
+ a.name AS author_name,
156
+ (SELECT COUNT(*) FROM likes WHERE post_id = p.post_id) AS likes_count,
157
+ (SELECT COUNT(*) FROM comments WHERE post_id = p.post_id) AS comments_count
158
+ FROM posts p
159
+ LEFT JOIN agents a ON p.agent_id = a.agent_id
160
+ ORDER BY p.timestamp DESC
161
+ LIMIT ?
162
+ """
163
+
164
+ posts_data = []
165
+ for row in cursor.execute(query, (limit,)):
166
+ # Handle the case where the author is the Human (agent_id=0), who isn't in the agents table
167
+ author_name = row['author_name'] if row['author_name'] else "HumanUser"
168
+ posts_data.append({
169
+ "post_id": row["post_id"],
170
+ "author": {"agent_id": row["agent_id"], "name": author_name},
171
+ "content": row["content"],
172
+ "timestamp": datetime.datetime.fromisoformat(row["timestamp"]),
173
+ "stats": {"likes": row["likes_count"], "comments": row["comments_count"]}
174
+ })
175
+ conn.close()
176
+ return posts_data
177
+
178
+ def create_post(agent_id: int, content: str, agent_name: str) -> Dict[str, Any]:
179
+ """
180
+ Adds a new post to the database.
181
+ Returns a dictionary representing the newly created post.
182
+ """
183
+ conn = sqlite3.connect(DB_FILE, check_same_thread=False)
184
+ cursor = conn.cursor()
185
+
186
+ timestamp = datetime.datetime.utcnow().isoformat()
187
+ cursor.execute(
188
+ "INSERT INTO posts (agent_id, content, timestamp) VALUES (?, ?, ?)",
189
+ (agent_id, content, timestamp)
190
+ )
191
+ post_id = cursor.lastrowid
192
+ conn.commit()
193
+ conn.close()
194
+
195
+ return {
196
+ "post_id": post_id,
197
+ "author": {"agent_id": agent_id, "name": agent_name},
198
+ "content": content,
199
+ "timestamp": datetime.datetime.fromisoformat(timestamp),
200
+ "stats": {"likes": 0, "comments": 0}
201
+ }
202
+
203
+ def create_comment(post_id: int, agent_id: int, content: str, agent_name: str) -> Optional[Dict[str, Any]]:
204
+ """
205
+ Adds a new comment to a specific post.
206
+ Returns a dictionary of the new comment, or None if the post doesn't exist.
207
+ """
208
+ conn = sqlite3.connect(DB_FILE, check_same_thread=False)
209
+ cursor = conn.cursor()
210
+
211
+ # First, check if the post exists
212
+ cursor.execute("SELECT post_id FROM posts WHERE post_id = ?", (post_id,))
213
+ if cursor.fetchone() is None:
214
+ conn.close()
215
+ return None
216
+
217
+ timestamp = datetime.datetime.utcnow().isoformat()
218
+ cursor.execute(
219
+ "INSERT INTO comments (post_id, agent_id, content, timestamp) VALUES (?, ?, ?, ?)",
220
+ (post_id, agent_id, content, timestamp)
221
+ )
222
+ comment_id = cursor.lastrowid
223
+ conn.commit()
224
+ conn.close()
225
+
226
+ return {
227
+ "comment_id": comment_id,
228
+ "author": {"agent_id": agent_id, "name": agent_name},
229
+ "content": content,
230
+ "timestamp": datetime.datetime.fromisoformat(timestamp)
231
+ }
232
+
233
+ def create_like(post_id: int, agent_id: int) -> bool:
234
+ """
235
+ Adds a 'like' to a specific post for a given agent.
236
+ Returns True if the like was added, False if it already existed or post not found.
237
+ """
238
+ conn = sqlite3.connect(DB_FILE, check_same_thread=False)
239
+ cursor = conn.cursor()
240
+ try:
241
+ # The UNIQUE constraint on (post_id, agent_id) will cause an IntegrityError if a duplicate is inserted.
242
+ cursor.execute("INSERT INTO likes (post_id, agent_id) VALUES (?, ?)", (post_id, agent_id))
243
+ conn.commit()
244
+ success = True
245
+ except sqlite3.IntegrityError:
246
+ # This means the like already existed, or the post_id is invalid. We can treat it as a "success"
247
+ # from the client's perspective, as the end state is that the post is liked.
248
+ success = True
249
+ finally:
250
+ conn.close()
251
+ return success