-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrag.py
133 lines (108 loc) · 4.39 KB
/
rag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import asyncio
import logging
import os
from typing import List, Dict, Any

import psycopg2
from anthropic import Anthropic
from openai import OpenAI
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SemanticSearchClaude:
    """Retrieval-augmented QA: pgvector similarity search over Wikipedia
    content chunks, with Claude generating the final answer from the
    retrieved context.

    Environment:
        DATABASE_URL      Postgres DSN (falls back to a local dev DSN).
        OPENAI_API_KEY    key for query embeddings.
        ANTHROPIC_API_KEY key for the Claude completion.
    """

    def __init__(self):
        # NOTE(review): the fallback DSN embeds credentials -- acceptable
        # for local development only; production must set DATABASE_URL.
        connection_string = os.getenv(
            'DATABASE_URL',
            'postgresql://postgres:[email protected]:5433/frames_new',
        )
        self.db_conn = psycopg2.connect(connection_string)
        # API clients read their keys from the environment.
        self.openai_client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
        self.anthropic_client = Anthropic(api_key=os.getenv('ANTHROPIC_API_KEY'))

    def format_embedding_for_postgres(self, embedding: List[float]) -> str:
        """Format an embedding as a pgvector literal, e.g. ``[0.1,0.2]``."""
        return f"[{','.join(map(str, embedding))}]"

    async def search_wikipedia(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return the ``top_k`` chunks most similar to *query*.

        Each result dict has keys ``articleId``, ``chunkContent`` and
        ``similarity`` (inner-product similarity; higher is better).

        Raises:
            Exception: embedding/API or database errors are logged and
                re-raised unchanged.
        """
        try:
            # The OpenAI SDK call is synchronous; run it in a worker
            # thread so it does not block the event loop.
            embedding_response = await asyncio.to_thread(
                self.openai_client.embeddings.create,
                model="text-embedding-3-small",
                input=query,
            )
            query_embedding = embedding_response.data[0].embedding
            formatted_embedding = self.format_embedding_for_postgres(query_embedding)

            # `<#>` is pgvector's *negative* inner product; negating it
            # yields a score where larger means "more similar".
            search_query = """
                SELECT
                    article_id,
                    content_chunk,
                    (-1.0) * (content_chunk_vector <#> %s::vector) as similarity
                FROM
                    wikipedia_content_vectors
                ORDER BY
                    similarity DESC
                LIMIT %s;
            """

            def _run_query():
                # psycopg2 is synchronous as well; executed via to_thread
                # below so the query does not stall the loop.
                with self.db_conn.cursor() as cursor:
                    cursor.execute(search_query, (formatted_embedding, top_k))
                    return cursor.fetchall()

            results = await asyncio.to_thread(_run_query)
            return [
                {
                    "articleId": row[0],
                    "chunkContent": row[1],
                    "similarity": float(row[2]),
                }
                for row in results
            ]
        except Exception as e:
            # Lazy %-formatting; message text preserved for log consumers.
            logger.error("Error in similarity search: %s", e)
            raise

    async def get_claude_response(self, query: str, context: List[Dict[str, Any]]) -> str:
        """Ask Claude to answer *query* given retrieved *context* chunks.

        Raises:
            Exception: Anthropic API errors are logged and re-raised.
        """
        # Present each retrieved chunk with its similarity score.
        context_text = "\n\n".join([
            f"Related content {i+1} (similarity: {item['similarity']:.2f}):\n{item['chunkContent']}"
            for i, item in enumerate(context)
        ])
        prompt = f"""Here is some relevant context that might help answer the query:
{context_text}
Based on the above context and your knowledge, please answer this query:
{query}"""
        try:
            # Synchronous SDK call moved off the event loop.
            response = await asyncio.to_thread(
                self.anthropic_client.messages.create,
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                messages=[
                    {"role": "user", "content": prompt}
                ],
            )
            return response.content[0].text  # type: ignore
        except Exception as e:
            logger.error("Error getting Claude response: %s", e)
            raise

    async def process_query(self, query: str, top_k: int = 5) -> str:
        """End-to-end pipeline: retrieve ``top_k`` chunks, then answer."""
        similar_content = await self.search_wikipedia(query, top_k)
        return await self.get_claude_response(query, similar_content)

    def close(self):
        """Close the database connection."""
        self.db_conn.close()
async def main():
    """Interactive CLI: answer questions in a loop until 'quit' or Ctrl-C."""
    assistant = SemanticSearchClaude()
    try:
        while True:
            question = input("\nEnter your question (or 'quit' to exit): ")
            if question.lower() == 'quit':
                break
            answer = await assistant.process_query(question)
            print("\nClaude's response:")
            print(answer)
    except KeyboardInterrupt:
        # Ctrl-C exits cleanly instead of dumping a traceback.
        print("\nExiting...")
    finally:
        # Always release the database connection on the way out.
        assistant.close()
if __name__ == "__main__":
import asyncio
asyncio.run(main())