-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathserver.py
157 lines (132 loc) · 5.58 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import asyncio
import base64
import json
import sys
import websockets
import ssl
def sts_connect(api_key=None):
    """Create the websocket connection to the Deepgram agent endpoint.

    Returns the object produced by ``websockets.connect`` without awaiting it,
    so callers use it as ``async with sts_connect() as sts_ws:``.

    Args:
        api_key: Deepgram API key. When omitted, the ``DEEPGRAM_API_KEY``
            environment variable is used; if that is unset too, the original
            placeholder ``"YOUR_DEEPGRAM_API_KEY"`` is sent, so calling
            ``sts_connect()`` with no arguments behaves as before.
    """
    import os

    if api_key is None:
        # Keep real credentials out of source control: prefer the environment.
        api_key = os.environ.get("DEEPGRAM_API_KEY", "YOUR_DEEPGRAM_API_KEY")
    # The key is passed as the websocket subprotocol pair ["token", <key>].
    sts_ws = websockets.connect(
        "wss://agent.deepgram.com/agent", subprotocols=["token", api_key]
    )
    return sts_ws
async def twilio_handler(twilio_ws):
    """Bridge one Twilio media-stream call to the Deepgram agent connection.

    Three concurrent pumps run for the lifetime of the call:

    * ``twilio_receiver`` parses Twilio JSON events, buffers inbound caller
      audio and publishes the stream's ``streamSid``;
    * ``sts_sender`` forwards buffered audio chunks to the agent socket;
    * ``sts_receiver`` prints agent text messages (sending a Twilio ``clear``
      event on ``UserStartedSpeaking`` for barge-in) and wraps agent binary
      frames as Twilio ``media`` messages sent back to the call.

    The session ends as soon as ANY pump finishes (Twilio stops/disconnects,
    the agent socket closes, or a pump raises). The remaining pumps are then
    cancelled and the Twilio websocket is closed.

    Args:
        twilio_ws: the server-side websocket accepted from Twilio.
    """
    audio_queue = asyncio.Queue()
    streamsid_queue = asyncio.Queue()

    async with sts_connect() as sts_ws:
        config_message = {
            "type": "SettingsConfiguration",
            "audio": {
                "input": {
                    "encoding": "mulaw",
                    "sample_rate": 8000,
                },
                "output": {
                    "encoding": "mulaw",
                    "sample_rate": 8000,
                    "container": "none",
                },
            },
            "agent": {
                "listen": {"model": "nova-2"},
                "think": {
                    "provider": {
                        "type": "anthropic",
                    },
                    "model": "claude-3-haiku-20240307",
                    "instructions": "You are a helpful car seller.",
                },
                "speak": {"model": "aura-asteria-en"},
            },
        }
        await sts_ws.send(json.dumps(config_message))

        async def sts_sender(sts_ws):
            """Forward buffered caller audio from ``audio_queue`` to the agent."""
            print("sts_sender started")
            while True:
                chunk = await audio_queue.get()
                await sts_ws.send(chunk)

        async def sts_receiver(sts_ws):
            """Relay agent messages to the call until the agent socket closes."""
            print("sts_receiver started")
            # Outgoing Twilio messages need the streamSid, which only becomes
            # known once twilio_receiver has seen the "start" event.
            streamsid = await streamsid_queue.get()
            async for message in sts_ws:
                if isinstance(message, str):
                    print(message)
                    decoded = json.loads(message)
                    # Barge-in: the caller started talking, so tell Twilio to
                    # drop any agent audio it still has queued for playback.
                    if decoded.get("type") == "UserStartedSpeaking":
                        clear_message = {
                            "event": "clear",
                            "streamSid": streamsid
                        }
                        await twilio_ws.send(json.dumps(clear_message))
                    continue

                print(type(message))
                raw_mulaw = message
                # Wrap the raw mulaw bytes in a Twilio media message
                # (see https://www.twilio.com/docs/voice/twiml/stream#websocket-messages---to-twilio).
                media_message = {
                    "event": "media",
                    "streamSid": streamsid,
                    "media": {"payload": base64.b64encode(raw_mulaw).decode("ascii")},
                }
                await twilio_ws.send(json.dumps(media_message))

        async def twilio_receiver(twilio_ws):
            """Parse Twilio events and queue inbound audio in fixed-size chunks."""
            print("twilio_receiver started")
            # Twilio sends audio as 160-byte messages of 20 ms each; buffering
            # 20 of them (0.4 s of audio) per send improves throughput.
            BUFFER_SIZE = 20 * 160
            inbuffer = bytearray()
            async for message in twilio_ws:
                try:
                    data = json.loads(message)
                    event = data["event"]
                    if event == "start":
                        print("got our streamsid")
                        streamsid_queue.put_nowait(data["start"]["streamSid"])
                    elif event == "connected":
                        continue
                    elif event == "media":
                        media = data["media"]
                        chunk = base64.b64decode(media["payload"])
                        if media["track"] == "inbound":
                            inbuffer.extend(chunk)
                    elif event == "stop":
                        break
                except (ValueError, KeyError, TypeError) as err:
                    # Malformed message (bad JSON, bad base64, missing field):
                    # end the session as before, but say why instead of
                    # silently swallowing every exception type.
                    print(f"twilio_receiver: stopping on malformed message: {err!r}")
                    break
                # Hand off every complete chunk; a partial tail stays buffered.
                while len(inbuffer) >= BUFFER_SIZE:
                    audio_queue.put_nowait(inbuffer[:BUFFER_SIZE])
                    del inbuffer[:BUFFER_SIZE]

        tasks = [
            asyncio.ensure_future(sts_sender(sts_ws)),
            asyncio.ensure_future(sts_receiver(sts_ws)),
            asyncio.ensure_future(twilio_receiver(twilio_ws)),
        ]
        try:
            # sts_sender loops forever, so waiting for ALL tasks (the default)
            # would never return after the call ends. Stop on the first one.
            done, _pending = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                if not task.cancelled() and task.exception() is not None:
                    print(f"bridge task failed: {task.exception()!r}")
        finally:
            for task in tasks:
                task.cancel()
            # Let the cancelled pumps unwind (and retrieve their exceptions)
            # before the agent socket is closed by the enclosing `async with`.
            await asyncio.gather(*tasks, return_exceptions=True)
            await twilio_ws.close()
async def router(websocket, path=None):
    """Dispatch an incoming websocket connection by its request path.

    Only ``/twilio`` is served (by ``twilio_handler``). Any other path is
    logged and the handler simply returns.

    Args:
        websocket: the accepted server-side connection.
        path: the request path. Older ``websockets`` releases pass it as a
            second argument; newer ones may call the handler with the
            connection alone, in which case the path is read from the
            connection (``.path`` on legacy connections, ``.request.path`` on
            the newer implementation).
    """
    if path is None:
        path = getattr(websocket, "path", None)
        if path is None:
            path = websocket.request.path
    print(f"Incoming connection on path: {path}")
    if path == "/twilio":
        print("Starting Twilio handler")
        await twilio_handler(websocket)
def main():
    """Serve ``router`` on ws://localhost:5000 until the process is interrupted.

    Uses ``asyncio.run`` instead of ``asyncio.get_event_loop()``, which is
    deprecated when no event loop is running.
    """

    async def _serve():
        # use this if using ssl
        # ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        # ssl_context.load_cert_chain('cert.pem', 'key.pem')
        # async with websockets.serve(router, '0.0.0.0', 443, ssl=ssl_context):
        # use this if not using ssl
        print("Server starting on ws://localhost:5000")
        async with websockets.serve(router, "localhost", 5000):
            # A bare Future never completes: keep serving until cancelled.
            await asyncio.Future()

    asyncio.run(_serve())
if __name__ == "__main__":
    # Run the server and use main()'s result as the exit status; a falsy
    # result (main() returns None) is turned into status 0.
    exit_status = main() or 0
    sys.exit(exit_status)