From bc39cdb82618904595f8b8f11acaa3ae07d9053e Mon Sep 17 00:00:00 2001 From: Jamin Hitchcock Date: Thu, 30 Jan 2025 20:09:57 -0600 Subject: [PATCH] VOIP - handle ongoing calls for announce and start conversation Allow announcements and conversation starts to be queued up for an existing call. Also allow specifying a SIP username for the HA endpoint which is required for some VOIP servers. --- homeassistant/components/voip/__init__.py | 9 +- .../components/voip/assist_satellite.py | 305 +++++++++++------- .../components/voip/binary_sensor.py | 2 +- homeassistant/components/voip/config_flow.py | 35 +- homeassistant/components/voip/const.py | 3 + homeassistant/components/voip/devices.py | 10 +- homeassistant/components/voip/voip.py | 20 +- homeassistant/components/wyoming/stt.py | 7 +- tests/components/voip/test_binary_sensor.py | 14 +- tests/components/voip/test_voip.py | 86 +++-- 10 files changed, 326 insertions(+), 165 deletions(-) diff --git a/homeassistant/components/voip/__init__.py b/homeassistant/components/voip/__init__.py index 96e758e91f4b8..b27e925ee3394 100644 --- a/homeassistant/components/voip/__init__.py +++ b/homeassistant/components/voip/__init__.py @@ -8,14 +8,16 @@ import logging from voip_utils import SIP_PORT +from voip_utils.sip import get_sip_endpoint from homeassistant.auth.const import GROUP_ID_USER +from homeassistant.components.network import async_get_source_ip from homeassistant.config_entries import ConfigEntry from homeassistant.const import Platform from homeassistant.core import HomeAssistant from homeassistant.helpers import device_registry as dr -from .const import CONF_SIP_PORT, DOMAIN +from .const import CONF_SIP_PORT, CONF_SIP_USER, DEFAULT_SIP_USER, DOMAIN from .devices import VoIPDevices from .voip import HassVoipDatagramProtocol @@ -59,12 +61,15 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: entry, data={**entry.data, "user": voip_user.id} ) + sip_host = await async_get_source_ip(hass) sip_port = entry.options.get(CONF_SIP_PORT, SIP_PORT) + sip_user = entry.options.get(CONF_SIP_USER, DEFAULT_SIP_USER) devices = VoIPDevices(hass, entry) devices.async_setup() + local_endpoint = get_sip_endpoint(sip_host, port=sip_port, username=sip_user) transport, protocol = await _create_sip_server( hass, - lambda: HassVoipDatagramProtocol(hass, devices), + lambda: HassVoipDatagramProtocol(hass, devices, local_endpoint), sip_port, ) _LOGGER.debug("Listening for VoIP calls on port %s", sip_port) diff --git a/homeassistant/components/voip/assist_satellite.py b/homeassistant/components/voip/assist_satellite.py index 1877b8c655c10..4a69d06b48d9c 100644 --- a/homeassistant/components/voip/assist_satellite.py +++ b/homeassistant/components/voip/assist_satellite.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +from collections.abc import Coroutine from enum import IntFlag from functools import partial import io @@ -14,9 +15,9 @@ import wave from voip_utils import SIP_PORT, RtpDatagramProtocol -from voip_utils.sip import SipDatagramProtocol, SipEndpoint, get_sip_endpoint +from voip_utils.sip import SipEndpoint, get_sip_endpoint -from homeassistant.components import tts +from homeassistant.components import assist_satellite, tts from homeassistant.components.assist_pipeline import PipelineEvent, PipelineEventType from homeassistant.components.assist_satellite import ( AssistSatelliteAnnouncement, @@ -25,12 +26,11 @@ AssistSatelliteEntityDescription, AssistSatelliteEntityFeature, ) -from homeassistant.components.network import async_get_source_ip from homeassistant.config_entries import ConfigEntry from homeassistant.core import Context, HomeAssistant, callback from homeassistant.helpers.entity_platform import AddEntitiesCallback -from .const import CHANNELS, CONF_SIP_PORT, DOMAIN, RATE, RTP_AUDIO_SETTINGS, WIDTH +from .const import CHANNELS, DOMAIN, RATE, RTP_AUDIO_SETTINGS, WIDTH from .devices import VoIPDevice from .entity import VoIPEntity @@ -42,8 +42,9 @@ _PIPELINE_TIMEOUT_SEC: Final = 30 _ANNOUNCEMENT_BEFORE_DELAY: Final = 0.5 _ANNOUNCEMENT_AFTER_DELAY: Final = 1.0 -_ANNOUNCEMENT_HANGUP_SEC: Final = 0.5 -_ANNOUNCEMENT_RING_TIMEOUT: Final = 30 +_CALL_HANGUP_SEC: Final = 0.5 +_CANCELLED_TTS_WAIT_TIMEOUT: Final = 5 +_CALL_ANSWER_TIMEOUT: Final = 30 class Tones(IntFlag): @@ -110,6 +111,7 @@ def __init__( self.config_entry = config_entry self._audio_queue: asyncio.Queue[bytes | None] = asyncio.Queue() + self._pipeline_task_queue: asyncio.Queue[Coroutine] = asyncio.Queue() self._audio_chunk_timeout: float = 2.0 self._run_pipeline_task: asyncio.Task | None = None self._pipeline_had_error: bool = False @@ -118,14 +120,10 @@ def __init__( self._tone_bytes: dict[Tones, bytes] = {} self._tones = tones self._processing_tone_done = asyncio.Event() + self._call_answered = asyncio.Event() - self._announcement: AssistSatelliteAnnouncement | None = None - self._announcement_future: asyncio.Future[Any] = asyncio.Future() - self._announcment_start_time: float = 0.0 - self._check_announcement_ended_task: asyncio.Task | None = None + self._check_call_ended_task: asyncio.Task | None = None self._last_chunk_time: float | None = None - self._rtp_port: int | None = None - self._run_pipeline_after_announce: bool = False @property def pipeline_entity_id(self) -> str | None: @@ -171,150 +169,191 @@ async def async_set_configuration( """Set the current satellite configuration.""" raise NotImplementedError - async def async_announce(self, announcement: AssistSatelliteAnnouncement) -> None: - """Announce media on the satellite. - - Plays announcement in a loop, blocking until the caller hangs up. - """ - await self._do_announce(announcement, run_pipeline_after=False) - - async def _do_announce( - self, announcement: AssistSatelliteAnnouncement, run_pipeline_after: bool - ) -> None: - """Announce media on the satellite. - - Optionally run a voice pipeline after the announcement has finished. - """ - self._announcement_future = asyncio.Future() - self._run_pipeline_after_announce = run_pipeline_after - - if self._rtp_port is None: - # Choose random port for RTP + # ------------------------------------------------------------------------- + # RTP/RTCP port selection + # ------------------------------------------------------------------------- + def _choose_rtp_ports(self) -> tuple[int, int]: + while True: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setblocking(False) + + # Bind to a random UDP port sock.bind(("", 0)) - _rtp_ip, self._rtp_port = sock.getsockname() + _, rtp_port = sock.getsockname() + + # Close socket to free port for reuse sock.close() - # HA SIP server - source_ip = await async_get_source_ip(self.hass) - sip_port = self.config_entry.options.get(CONF_SIP_PORT, SIP_PORT) - source_endpoint = get_sip_endpoint(host=source_ip, port=sip_port) + # Check that the next port up is available for RTCP + rtcp_port = rtp_port + 1 + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + sock.bind(("", rtcp_port)) + + # Will be opened again below + sock.close() + + # Found our ports + break + except OSError: + # RTCP port is taken + pass + + return rtp_port, rtcp_port + + async def async_announce( + self, announcement: assist_satellite.AssistSatelliteAnnouncement + ) -> None: + """Announce media on the phone. First call and wait for phone. + + Before calling the phone, we should check to make sure there is not + already an ongoing call. If there is an existing call, the announcement + is queued to be played after current pipeline actions. + """ + _LOGGER.debug( + "Calling phone for announcement (message=%s, media_id=%s, media_id_source=%s)", + announcement.message, + announcement.media_id, + announcement.media_id_source, + ) + rtp_port, rtcp_port = self._choose_rtp_ports() + source = self.hass.data[DOMAIN].protocol.local_endpoint try: # VoIP ID is SIP header - destination_endpoint = SipEndpoint(self.voip_device.voip_id) + destination = SipEndpoint(self.voip_device.voip_id) except ValueError: # VoIP ID is IP address - destination_endpoint = get_sip_endpoint( - host=self.voip_device.voip_id, port=SIP_PORT - ) + destination = get_sip_endpoint(host=self.voip_device.voip_id, port=SIP_PORT) - # Reset state so we can time out if needed - self._last_chunk_time = None - self._announcment_start_time = time.monotonic() - self._announcement = announcement - - # Make the call - sip_protocol: SipDatagramProtocol = self.hass.data[DOMAIN].protocol - call_info = sip_protocol.outgoing_call( - source=source_endpoint, - destination=destination_endpoint, - rtp_port=self._rtp_port, + await self._pipeline_task_queue.put( + self._run_announce_pipeline(announcement.original_media_id) ) - # Check if caller hung up or didn't pick up - self._check_announcement_ended_task = ( - self.config_entry.async_create_background_task( - self.hass, - self._check_announcement_ended(), - "voip_announcement_ended", - ) - ) - - try: - await self._announcement_future - except TimeoutError: - # Stop ringing - sip_protocol.cancel_call(call_info) - raise + # Check to see if there is an existing call, if not call out to the phone + if self.transport is None: + _LOGGER.debug("No existing call, making outgoing call to %s", destination) + self.hass.data[DOMAIN].protocol.outgoing_call(source, destination, rtp_port) + async with asyncio.timeout(_CALL_ANSWER_TIMEOUT): + await self._call_answered.wait() - async def _check_announcement_ended(self) -> None: + async def _check_call_ended(self) -> None: """Continuously checks if an audio chunk was received within a time limit. - If not, the caller is presumed to have hung up and the announcement is ended. + If not, the caller is presumed to have hung up and the call is ended. """ - while self._announcement is not None: - current_time = time.monotonic() - if (self._last_chunk_time is None) and ( - (current_time - self._announcment_start_time) - > _ANNOUNCEMENT_RING_TIMEOUT - ): - # Ring timeout - self._announcement = None - self._check_announcement_ended_task = None - self._announcement_future.set_exception( - TimeoutError("User did not pick up in time") - ) - _LOGGER.debug("Timed out waiting for the user to pick up the phone") - break - + while self.voip_device.is_active(): if (self._last_chunk_time is not None) and ( - (current_time - self._last_chunk_time) > _ANNOUNCEMENT_HANGUP_SEC + (time.monotonic() - self._last_chunk_time) > _CALL_HANGUP_SEC ): # Caller hung up - self._announcement = None - self._announcement_future.set_result(None) - self._check_announcement_ended_task = None - _LOGGER.debug("Announcement ended") + _LOGGER.debug("End of call detected, hanging up") + if self.voip_device.current_call is not None: + self.hass.data[DOMAIN].protocol.hang_up( + self.voip_device.current_call + ) + self.disconnect() # caller hung up + if self._run_pipeline_task is not None: + _LOGGER.debug("Cancel running pipeline") + self._run_pipeline_task.cancel() + self._run_pipeline_task = None + self._clear_pipeline_task_queue() + _LOGGER.debug("Call ended") break - await asyncio.sleep(_ANNOUNCEMENT_HANGUP_SEC / 2) + await asyncio.sleep(_CALL_HANGUP_SEC / 2) async def async_start_conversation( self, start_announcement: AssistSatelliteAnnouncement ) -> None: """Start a conversation from the satellite.""" - await self._do_announce(start_announcement, run_pipeline_after=True) + rtp_port, rtcp_port = self._choose_rtp_ports() + source = self.hass.data[DOMAIN].protocol.local_endpoint + + try: + # VoIP ID is SIP header + destination = SipEndpoint(self.voip_device.voip_id) + except ValueError: + # VoIP ID is IP address + destination = get_sip_endpoint(host=self.voip_device.voip_id, port=SIP_PORT) + + # Play the announcement first, then go into the regular + # conversation pipeline. + await self._pipeline_task_queue.put( + self._run_announce_pipeline(start_announcement.original_media_id) + ) + await self._pipeline_task_queue.put(self._run_pipeline()) + + # Check to see if there is an existing call, if not call out to the phone + if self.transport is None: + _LOGGER.debug("No existing call, making outgoing call to %s", destination) + self.hass.data[DOMAIN].protocol.outgoing_call(source, destination, rtp_port) + async with asyncio.timeout(_CALL_ANSWER_TIMEOUT): + await self._call_answered.wait() # ------------------------------------------------------------------------- # VoIP # ------------------------------------------------------------------------- + def disconnect(self): + """Server disconnected.""" + super().disconnect() + self._call_answered.clear() + if self._check_call_ended_task is not None: + self._check_call_ended_task.cancel() + self._check_call_ended_task = None + + def connection_made(self, transport): + """Server is ready.""" + # If a connection is made with nothing in the queue, we must have + # received a call and we start with the regular pipeline + _LOGGER.debug("Assist satellite connection made") + super().connection_made(transport) + if self._pipeline_task_queue.empty(): + self._pipeline_task_queue.put_nowait(self._run_pipeline()) + # Check if caller hung up + self._last_chunk_time = time.monotonic() + self._call_answered.set() + self._check_call_ended_task = self.config_entry.async_create_background_task( + self.hass, + self._check_call_ended(), + "voip_call_ended", + ) + def on_chunk(self, audio_bytes: bytes) -> None: """Handle raw audio chunk.""" self._last_chunk_time = time.monotonic() - if self._announcement is None: - # Pipeline with STT - if self._run_pipeline_task is None: - # Run pipeline until voice command finishes, then start over - self._clear_audio_queue() - self._tts_done.clear() + if self._run_pipeline_task is None: + # Run pipeline until voice command finishes, then start over + self._clear_audio_queue() + self._tts_done.clear() + try: + coroutine = self._pipeline_task_queue.get_nowait() + _LOGGER.debug("Got task from queue") + self._run_pipeline_task = ( self.config_entry.async_create_background_task( self.hass, - self._run_pipeline(), + coroutine, "voip_pipeline_run", ) ) + except asyncio.QueueEmpty: + _LOGGER.debug("No task on queue") + if self.voip_device.current_call is not None: + self.hass.data[DOMAIN].protocol.hang_up( + self.voip_device.current_call + ) + self.disconnect() - self._audio_queue.put_nowait(audio_bytes) - elif self._run_pipeline_task is None: - # Announcement only - # Play announcement (will repeat) - self._run_pipeline_task = self.config_entry.async_create_background_task( - self.hass, - self._play_announcement(self._announcement), - "voip_play_announcement", - ) + self._audio_queue.put_nowait(audio_bytes) async def _run_pipeline(self) -> None: """Run a pipeline with STT input and TTS output.""" _LOGGER.debug("Starting pipeline") self.async_set_context(Context(user_id=self.config_entry.data["user"])) - self.voip_device.set_is_active(True) async def stt_stream(): while True: @@ -334,6 +373,7 @@ async def stt_stream(): ) if self._pipeline_had_error: + _LOGGER.debug("Pipeline error") self._pipeline_had_error = False await self._play_tone(Tones.ERROR) else: @@ -341,30 +381,52 @@ async def stt_stream(): # # This is set in _send_tts and has a timeout that's based on the # length of the TTS audio. + _LOGGER.debug("Waiting for TTS to finish") await self._tts_done.wait() + + # Add _run_pipeline back on the task queue so it continues + # in a loop, but will occur after any announcements that + # were added in the meantime. + await self._pipeline_task_queue.put(self._run_pipeline()) + except TimeoutError: + if self.voip_device.current_call is not None: + self.hass.data[DOMAIN].protocol.hang_up(self.voip_device.current_call) self.disconnect() # caller hung up + self._clear_pipeline_task_queue() + except asyncio.exceptions.CancelledError: + _LOGGER.debug("Voip pipeline task cancelled") + # If the pipeline got cancelled wait a little longer for it to finish. + # If there is still an ongoing call try to restart the pipeline, otherwise + # just let it end. + await asyncio.sleep(0) + if self.transport is not None: + try: + async with asyncio.timeout(_CANCELLED_TTS_WAIT_TIMEOUT): + await self._tts_done.wait() + except TimeoutError: + _LOGGER.debug("Timed out waiting for TTS to finish") + await self._pipeline_task_queue.put(self._run_pipeline()) finally: # Stop audio stream await self._audio_queue.put(None) - self.voip_device.set_is_active(False) self._run_pipeline_task = None _LOGGER.debug("Pipeline finished") - async def _play_announcement( - self, announcement: AssistSatelliteAnnouncement - ) -> None: + async def _run_announce_pipeline(self, media_id: str) -> None: """Play an announcement once.""" _LOGGER.debug("Playing announcement") try: await asyncio.sleep(_ANNOUNCEMENT_BEFORE_DELAY) - await self._send_tts(announcement.original_media_id, wait_for_tone=False) - - if not self._run_pipeline_after_announce: - # Delay before looping announcement - await asyncio.sleep(_ANNOUNCEMENT_AFTER_DELAY) + await self._send_tts(media_id, wait_for_tone=False) + await asyncio.sleep(_ANNOUNCEMENT_AFTER_DELAY) + except TimeoutError: + if self.voip_device.current_call is not None: + self.hass.data[DOMAIN].protocol.hang_up(self.voip_device.current_call) + self.disconnect() # caller hung up + self._clear_pipeline_task_queue() except Exception: _LOGGER.exception("Unexpected error while playing announcement") raise @@ -372,16 +434,17 @@ async def _play_announcement( self._run_pipeline_task = None _LOGGER.debug("Announcement finished") - if self._run_pipeline_after_announce: - # Clear announcement to allow pipeline to run - self._announcement = None - self._announcement_future.set_result(None) - def _clear_audio_queue(self) -> None: """Ensure audio queue is empty.""" while not self._audio_queue.empty(): self._audio_queue.get_nowait() + def _clear_pipeline_task_queue(self) -> None: + """Ensure audio queue is empty.""" + while not self._pipeline_task_queue.empty(): + _ = self._pipeline_task_queue.get_nowait() + self._pipeline_task_queue.task_done() + def on_pipeline_event(self, event: PipelineEvent) -> None: """Set state based on pipeline stage.""" if event.type == PipelineEventType.STT_END: diff --git a/homeassistant/components/voip/binary_sensor.py b/homeassistant/components/voip/binary_sensor.py index f38b228c46c3f..b99d0bf1adbcc 100644 --- a/homeassistant/components/voip/binary_sensor.py +++ b/homeassistant/components/voip/binary_sensor.py @@ -92,5 +92,5 @@ async def async_will_remove_from_hass(self) -> None: @callback def _is_active_changed(self, device: VoIPDevice) -> None: """Call when active state changed.""" - self._attr_is_on = self.voip_device.is_active + self._attr_is_on = self.voip_device.is_active() self.async_write_ha_state() diff --git a/homeassistant/components/voip/config_flow.py b/homeassistant/components/voip/config_flow.py index 63dcb8f86ee54..3c370735a764f 100644 --- a/homeassistant/components/voip/config_flow.py +++ b/homeassistant/components/voip/config_flow.py @@ -16,7 +16,7 @@ from homeassistant.core import callback from homeassistant.helpers import config_validation as cv -from .const import CONF_SIP_PORT, DOMAIN +from .const import CONF_SIP_PORT, CONF_SIP_USER, DEFAULT_SIP_USER, DOMAIN class VoIPConfigFlow(ConfigFlow, domain=DOMAIN): @@ -58,6 +58,9 @@ async def async_step_init( ) -> ConfigFlowResult: """Manage the options.""" if user_input is not None: + if "enable_advanced" in user_input: + return await self.async_step_advanced() + return self.async_create_entry(title="", data=user_input) return self.async_show_form( @@ -70,7 +73,35 @@ async def async_step_init( CONF_SIP_PORT, SIP_PORT, ), - ): cv.port + ): cv.port, + vol.Optional("enable_advanced"): bool, + } + ), + description_placeholders={ + "note": "Enable advanced options by checking the box.", + }, + ) + + async def async_step_advanced( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Manage the advanced options.""" + if user_input is not None: + return self.async_create_entry( + title="", data={**self.config_entry.options, **user_input} + ) + + return self.async_show_form( + step_id="advanced", + data_schema=vol.Schema( + { + vol.Optional( + CONF_SIP_USER, + default=self.config_entry.options.get( + CONF_SIP_USER, + DEFAULT_SIP_USER, + ), + ): str, } ), ) diff --git a/homeassistant/components/voip/const.py b/homeassistant/components/voip/const.py index b4ee5d8ce7aad..0dbee798a4599 100644 --- a/homeassistant/components/voip/const.py +++ b/homeassistant/components/voip/const.py @@ -12,4 +12,7 @@ "sleep_ratio": 0.99, } +DEFAULT_SIP_USER = "HA" + CONF_SIP_PORT = "sip_port" +CONF_SIP_USER = "sip_user" diff --git a/homeassistant/components/voip/devices.py b/homeassistant/components/voip/devices.py index c33ec048cbd38..43c4448bcceef 100644 --- a/homeassistant/components/voip/devices.py +++ b/homeassistant/components/voip/devices.py @@ -21,14 +21,18 @@ class VoIPDevice: voip_id: str device_id: str - is_active: bool = False + current_call: CallInfo | None = None update_listeners: list[Callable[[VoIPDevice], None]] = field(default_factory=list) protocol: VoipDatagramProtocol | None = None + def is_active(self) -> bool: + """Get active state.""" + return self.current_call is not None + @callback - def set_is_active(self, active: bool) -> None: + def set_is_active(self, call_info: CallInfo | None) -> None: """Set active state.""" - self.is_active = active + self.current_call = call_info for listener in self.update_listeners: listener(self) diff --git a/homeassistant/components/voip/voip.py b/homeassistant/components/voip/voip.py index 6f6cf989d3b2f..00736fa2dbd03 100644 --- a/homeassistant/components/voip/voip.py +++ b/homeassistant/components/voip/voip.py @@ -16,6 +16,7 @@ SdpInfo, VoipDatagramProtocol, ) +from voip_utils.sip import SipEndpoint from homeassistant.components.assist_pipeline import ( Pipeline, @@ -79,7 +80,9 @@ def make_protocol( class HassVoipDatagramProtocol(VoipDatagramProtocol): """HA UDP server for Voice over IP (VoIP).""" - def __init__(self, hass: HomeAssistant, devices: VoIPDevices) -> None: + def __init__( + self, hass: HomeAssistant, devices: VoIPDevices, local_endpoint: SipEndpoint + ) -> None: """Set up VoIP call handler.""" super().__init__( sdp_info=SdpInfo( @@ -102,8 +105,17 @@ def __init__(self, hass: HomeAssistant, devices: VoIPDevices) -> None: ) self.hass = hass self.devices = devices + self.local_endpoint = local_endpoint self._closed_event = asyncio.Event() + def on_call(self, call_info: CallInfo): + """Set up state when starting a call.""" + device = self.devices.async_get_or_create(call_info) + device.set_is_active(call_info) + _LOGGER.debug("Set call [%s] on device [%s]", call_info, device) + + super().on_call(call_info) + def is_valid_call(self, call_info: CallInfo) -> bool: """Filter calls.""" device = self.devices.async_get_or_create(call_info) @@ -117,6 +129,12 @@ async def wait_closed(self) -> None: """Wait for connection_lost to be called.""" await self._closed_event.wait() + def on_hangup(self, call_info: CallInfo): + """Handle the end of a call.""" + _LOGGER.debug("Handling hangup: %s", call_info) + device = self.devices.async_get_or_create(call_info) + device.set_is_active(None) + class PreRecordMessageProtocol(RtpDatagramProtocol): """Plays a pre-recorded message on a loop.""" diff --git a/homeassistant/components/wyoming/stt.py b/homeassistant/components/wyoming/stt.py index a28e5fdb527ea..161156916f64b 100644 --- a/homeassistant/components/wyoming/stt.py +++ b/homeassistant/components/wyoming/stt.py @@ -1,5 +1,6 @@ """Support for Wyoming speech-to-text services.""" +import asyncio from collections.abc import AsyncIterable import logging @@ -116,7 +117,11 @@ async def async_process_audio_stream( await client.write_event(AudioStop().event()) while True: - event = await client.read_event() + try: + event = await client.read_event() + except asyncio.exceptions.CancelledError: + _LOGGER.debug("Event read cancelled, retrying") + continue if event is None: _LOGGER.debug("Connection lost") return stt.SpeechResult(None, stt.SpeechResultState.ERROR) diff --git a/tests/components/voip/test_binary_sensor.py b/tests/components/voip/test_binary_sensor.py index 44ac8e4d77f12..c7bfc9ae9081b 100644 --- a/tests/components/voip/test_binary_sensor.py +++ b/tests/components/voip/test_binary_sensor.py @@ -3,6 +3,8 @@ from http import HTTPStatus import pytest +from voip_utils import CallInfo +from voip_utils.sip import SipEndpoint from homeassistant.components.repairs import DOMAIN as REPAIRS_DOMAIN from homeassistant.components.voip import DOMAIN @@ -26,12 +28,20 @@ async def test_call_in_progress( assert state is not None assert state.state == "off" - voip_device.set_is_active(True) + call_info = CallInfo( + caller_endpoint=SipEndpoint("sip:192.168.1.2:5060"), + local_endpoint=SipEndpoint("sip:192.168.1.1:5060"), + caller_rtp_port=10000, + server_ip="127.0.0.1", + headers={}, + ) + + voip_device.set_is_active(call_info) state = hass.states.get("binary_sensor.192_168_1_210_call_in_progress") assert state.state == "on" - voip_device.set_is_active(False) + voip_device.set_is_active(None) state = hass.states.get("binary_sensor.192_168_1_210_call_in_progress") assert state.state == "off" diff --git a/tests/components/voip/test_voip.py b/tests/components/voip/test_voip.py index 442f4a62392ab..3ff50ca782106 100644 --- a/tests/components/voip/test_voip.py +++ b/tests/components/voip/test_voip.py @@ -10,6 +10,7 @@ import pytest from syrupy.assertion import SnapshotAssertion from voip_utils import CallInfo +from voip_utils.sip import SipEndpoint from homeassistant.components import assist_pipeline, assist_satellite, tts, voip from homeassistant.components.assist_satellite import AssistSatelliteEntity @@ -73,7 +74,8 @@ async def test_is_valid_call( ) -> None: """Test that a call is now allowed from an unknown device.""" assert await async_setup_component(hass, "voip", {}) - protocol = HassVoipDatagramProtocol(hass, voip_devices) + local_endpoint = SipEndpoint("sip:192.168.1.1:5060") + protocol = HassVoipDatagramProtocol(hass, voip_devices, local_endpoint) assert not protocol.is_valid_call(call_info) ent_reg = er.async_get(hass) @@ -342,9 +344,9 @@ async def async_get_media_source_audio( patch.object(satellite, "tts_response_finished", tts_response_finished), ): satellite._tones = Tones(0) - satellite.transport = Mock() + transport = Mock() + satellite.connection_made(transport) - satellite.connection_made(satellite.transport) assert satellite.state == AssistSatelliteState.IDLE # Ensure audio queue is cleared before pipeline starts @@ -492,7 +494,9 @@ async def async_get_media_source_audio( for tone in Tones: satellite._tone_bytes[tone] = tone_bytes - satellite.transport = Mock() + transport = Mock() + satellite.connection_made(transport) + satellite.send_audio = Mock() original_send_tts = satellite._send_tts @@ -589,7 +593,9 @@ async def async_get_media_source_audio( new=async_get_media_source_audio, ), ): - satellite.transport = Mock() + transport = Mock() + satellite.connection_made(transport) + satellite.addr = ("192.168.1.1", 12345) original_send_tts = satellite._send_tts @@ -609,10 +615,10 @@ async def send_tts(*args, **kwargs): satellite.on_chunk(bytes([255] * _ONE_SECOND * 2)) # silence (assumes relaxed VAD sensitivity) - satellite.on_chunk(bytes(_ONE_SECOND * 4)) + satellite.on_chunk(bytes(_ONE_SECOND)) # Wait for mock pipeline to exhaust the audio stream - async with asyncio.timeout(1): + async with asyncio.timeout(3): await done.wait() @@ -691,7 +697,9 @@ async def async_get_media_source_audio( new=async_get_media_source_audio, ), ): - satellite.transport = Mock() + transport = Mock() + satellite.connection_made(transport) + satellite.addr = ("192.168.1.1", 12345) original_send_tts = satellite._send_tts @@ -711,10 +719,10 @@ async def send_tts(*args, **kwargs): satellite.on_chunk(bytes([255] * _ONE_SECOND * 2)) # silence (assumes relaxed VAD sensitivity) - satellite.on_chunk(bytes(_ONE_SECOND * 4)) + satellite.on_chunk(bytes(_ONE_SECOND)) # Wait for mock pipeline to exhaust the audio stream - async with asyncio.timeout(1): + async with asyncio.timeout(3): await done.wait() @@ -776,7 +784,9 @@ async def async_pipeline_from_audio_stream(*args, **kwargs): "homeassistant.components.voip.assist_satellite.VoipAssistSatellite._send_tts", ) as mock_send_tts, ): - satellite.transport = Mock() + transport = Mock() + satellite.connection_made(transport) + satellite.addr = ("192.168.1.1", 12345) # silence satellite.on_chunk(bytes(_ONE_SECOND)) @@ -785,10 +795,10 @@ async def async_pipeline_from_audio_stream(*args, **kwargs): satellite.on_chunk(bytes([255] * _ONE_SECOND * 2)) # silence (assumes relaxed VAD sensitivity) - satellite.on_chunk(bytes(_ONE_SECOND * 4)) + satellite.on_chunk(bytes(_ONE_SECOND)) # Wait for mock pipeline to finish - async with asyncio.timeout(1): + async with asyncio.timeout(2): await satellite._tts_done.wait() mock_send_tts.assert_not_called() @@ -833,7 +843,9 @@ async def async_send_audio(audio_bytes: bytes, **kwargs): ), ): satellite._tones = Tones.ERROR - satellite.transport = Mock() + transport = Mock() + satellite.connection_made(transport) + satellite.addr = ("192.168.1.1", 12345) satellite._async_send_audio = AsyncMock(side_effect=async_send_audio) # type: ignore[method-assign] satellite.on_chunk(bytes(_ONE_SECOND)) @@ -878,17 +890,20 @@ async def test_announce( "homeassistant.components.voip.assist_satellite.VoipAssistSatellite._send_tts", ) as mock_send_tts, ): - satellite.transport = Mock() - announce_task = hass.async_create_background_task( + hass.async_create_background_task( satellite.async_announce(announcement), "voip_announce" ) await asyncio.sleep(0) mock_protocol.outgoing_call.assert_called_once() + # Simulate that the connection was made after the outgoing call + transport = Mock() + satellite.connection_made(transport) + # Trigger announcement satellite.on_chunk(bytes(_ONE_SECOND)) - async with asyncio.timeout(1): - await announce_task + # Wait for the simulated answer and for the asynchronous playing of the announcement + await asyncio.sleep(3) mock_send_tts.assert_called_once_with(_MEDIA_ID, wait_for_tone=False) @@ -926,21 +941,21 @@ async def test_voip_id_is_ip_address( "homeassistant.components.voip.assist_satellite.VoipAssistSatellite._send_tts", ) as mock_send_tts, ): - satellite.transport = Mock() - announce_task = hass.async_create_background_task( + hass.async_create_background_task( satellite.async_announce(announcement), "voip_announce" ) await asyncio.sleep(0) mock_protocol.outgoing_call.assert_called_once() - assert ( - mock_protocol.outgoing_call.call_args.kwargs["destination"].host - == "192.168.68.10" - ) + assert mock_protocol.outgoing_call.call_args.args[1].host == "192.168.68.10" + + # Simulate that the connection was made after the outgoing call + transport = Mock() + satellite.connection_made(transport) # Trigger announcement satellite.on_chunk(bytes(_ONE_SECOND)) - async with asyncio.timeout(1): - await announce_task + # Wait for the simulated answer and for the asynchronous playing of the announcement + await asyncio.sleep(3) mock_send_tts.assert_called_once_with(_MEDIA_ID, wait_for_tone=False) @@ -976,13 +991,12 @@ async def test_announce_timeout( # Very short timeout which will trigger because we don't send any audio in with ( patch( - "homeassistant.components.voip.assist_satellite._ANNOUNCEMENT_RING_TIMEOUT", + "homeassistant.components.voip.assist_satellite._CALL_ANSWER_TIMEOUT", 0.01, ), + pytest.raises(TimeoutError), ): - satellite.transport = Mock() - with pytest.raises(TimeoutError): - await satellite.async_announce(announcement) + await satellite.async_announce(announcement) @pytest.mark.usefixtures("socket_enabled") @@ -1064,13 +1078,18 @@ async def async_pipeline_from_audio_stream( new=async_pipeline_from_audio_stream, ), ): - satellite.transport = Mock() conversation_task = hass.async_create_background_task( satellite.async_start_conversation(announcement), "voip_start_conversation" ) await asyncio.sleep(0) mock_protocol.outgoing_call.assert_called_once() + # Simulate that the connection was made after the outgoing call + await asyncio.sleep(0.5) + transport = Mock() + satellite.connection_made(transport) + satellite.addr = ("192.168.1.1", 12345) + # Trigger announcement and wait for it to finish satellite.on_chunk(bytes(_ONE_SECOND)) async with asyncio.timeout(1): @@ -1080,7 +1099,10 @@ async def async_pipeline_from_audio_stream( # Trigger pipeline satellite.on_chunk(bytes(_ONE_SECOND)) - async with asyncio.timeout(1): + await asyncio.sleep(1) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(1) + async with asyncio.timeout(4): # Wait for TTS await tts_sent.wait() await conversation_task