Compare commits

...

13 Commits

Author SHA1 Message Date
Erik 0f789a6797 Improve tests of helpers.trigger.extract_devices/entities 2026-05-28 10:06:24 +02:00
Erik Montnemery fd1a5d0c5a Add zone triggers entered/left zone (#171751) 2026-05-28 10:05:41 +02:00
Erik Montnemery 632ec39d53 Deprecate device tracker TrackerEntity location_name property (#171820) 2026-05-28 10:02:28 +02:00
Abílio Costa 67b9d28953 Fix OMIE sensors not updating on setup (#172383) 2026-05-28 08:29:53 +02:00
J. Nick Koston e3880eedb0 Bump yalexs to 9.2.1 (#172389) 2026-05-27 22:01:07 -05:00
J. Nick Koston ce64f5f902 Bump onvif-zeep-async to 4.1.1 (#172391) 2026-05-27 22:00:56 -05:00
J. Nick Koston 0da99a50fc Bump dbus-fast to 5.0.16 (#172378) 2026-05-27 17:16:36 -05:00
Arcadiy Ivanov 43f636be65 Include device identity in Matter light transition blocklist warning (#172324) 2026-05-27 23:58:37 +02:00
Simone Chemelli 262cdbfab5 Bump aioamazondevices to 13.8.1 (#172382) 2026-05-27 23:16:23 +02:00
puddly 8cbd358435 Bump ZHA to 1.4.0 (#172357) 2026-05-27 22:55:07 +02:00
torben-iometer df04b19a0a bump iometer version to 1.0.1 (#172338) 2026-05-27 22:19:20 +02:00
markvp adeb352079 Add GeneralDiagnostics sensors and fault binary sensors to Matter integration (#169830) 2026-05-27 21:07:08 +02:00
Stefan Agner 1e457600f1 Harden backup tar extraction with Python tar_filter (#172252) 2026-05-27 18:10:04 +02:00
55 changed files with 12868 additions and 585 deletions
+2 -4
View File
@@ -92,8 +92,7 @@ def _extract_backup(
):
ostf.tar.extractall(
path=Path(tempdir, "extracted"),
members=securetar.secure_path(ostf.tar),
filter="fully_trusted",
filter="tar",
)
backup_meta_file = Path(tempdir, "extracted", "backup.json")
backup_meta = json.loads(backup_meta_file.read_text(encoding="utf8"))
@@ -119,8 +118,7 @@ def _extract_backup(
) as istf:
istf.extractall(
path=Path(tempdir, "homeassistant"),
members=securetar.secure_path(istf),
filter="fully_trusted",
filter="tar",
)
if restore_content.restore_homeassistant:
keep = list(KEEP_BACKUPS)
@@ -8,5 +8,5 @@
"iot_class": "cloud_polling",
"loggers": ["aioamazondevices"],
"quality_scale": "platinum",
"requirements": ["aioamazondevices==13.8.0"]
"requirements": ["aioamazondevices==13.8.1"]
}
@@ -30,5 +30,5 @@
"integration_type": "hub",
"iot_class": "cloud_push",
"loggers": ["pubnub", "yalexs"],
"requirements": ["yalexs==9.2.0", "yalexs-ble==3.3.0"]
"requirements": ["yalexs==9.2.1", "yalexs-ble==3.3.0"]
}
@@ -20,7 +20,7 @@
"bluetooth-adapters==2.3.0",
"bluetooth-auto-recovery==1.6.4",
"bluetooth-data-tools==1.29.18",
"dbus-fast==5.0.15",
"dbus-fast==5.0.16",
"habluetooth==6.7.9"
]
}
@@ -12,13 +12,18 @@ from homeassistant.const import (
CONF_DOMAIN,
CONF_ENTITY_ID,
CONF_EVENT,
CONF_OPTIONS,
CONF_PLATFORM,
CONF_TYPE,
CONF_ZONE,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
from homeassistant.helpers import config_validation as cv, entity_registry as er
from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
from homeassistant.helpers.trigger import (
TriggerActionType,
TriggerInfo,
_async_attach_trigger_cls,
)
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN
@@ -79,16 +84,18 @@ async def async_attach_trigger(
event = zone.EVENT_ENTER
else:
event = zone.EVENT_LEAVE
zone_config = {
CONF_PLATFORM: ZONE_DOMAIN,
CONF_ENTITY_ID: config[CONF_ENTITY_ID],
CONF_ZONE: config[CONF_ZONE],
CONF_EVENT: event,
}
zone_config = await zone.async_validate_trigger_config(hass, zone_config)
return await zone.async_attach_trigger(
hass, zone_config, action, trigger_info, platform_type="device"
zone_config = await zone.LegacyZoneTrigger.async_validate_config(
hass,
{
CONF_OPTIONS: {
CONF_ENTITY_ID: [config[CONF_ENTITY_ID]],
CONF_ZONE: config[CONF_ZONE],
CONF_EVENT: event,
}
},
)
return await _async_attach_trigger_cls(
hass, zone.LegacyZoneTrigger, "device", zone_config, action, trigger_info
)
@@ -1,6 +1,7 @@
"""Provide functionality to keep track of devices."""
import asyncio
import logging
from typing import TYPE_CHECKING, Any, final
from propcache.api import cached_property
@@ -22,6 +23,7 @@ from homeassistant.core import (
EventStateChangedData,
HomeAssistant,
State,
async_get_hass_or_none,
callback,
)
from homeassistant.helpers import (
@@ -37,6 +39,7 @@ from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.entity import Entity, EntityDescription
from homeassistant.helpers.entity_platform import EntityPlatform
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.loader import async_suggest_report_issue
from homeassistant.util.hass_dict import HassKey
from .const import (
@@ -52,6 +55,8 @@ from .const import (
SourceType,
)
_LOGGER = logging.getLogger(__name__)
DATA_KEY: HassKey[dict[str, tuple[str, str]]] = HassKey(f"{DOMAIN}_mac")
@@ -212,13 +217,38 @@ class TrackerEntity(
_attr_in_zones: list[str] | None = None
_attr_latitude: float | None = None
_attr_location_accuracy: float = 0
# _attr_location_name is deprecated and will be removed in Home Assistant 2027.7
_attr_location_name: str | None = None
_attr_longitude: float | None = None
_attr_source_type: SourceType = SourceType.GPS
__active_zone: State | None = None
# If we reported setting deprecated _attr_location_name
__deprecated_attr_location_name_reported = False
__in_zones: list[str] | None = None
def __init_subclass__(cls, **kwargs: Any) -> None:
"""Post initialisation processing."""
super().__init_subclass__(**kwargs)
if "location_name" in cls.__dict__:
if cls.__module__.startswith("homeassistant.components."):
# Don't ask users to report issue for built in integrations,
# they already have issues opened on them.
return
report_issue = async_suggest_report_issue(
async_get_hass_or_none(), module=cls.__module__
)
_LOGGER.warning(
(
"%s::%s is overriding the deprecated location_name property on "
"an instance of TrackerEntity, this will be unsupported from "
"Home Assistant 2027.7, please %s"
),
cls.__module__,
cls.__name__,
report_issue,
)
@cached_property
def should_poll(self) -> bool:
"""No polling for entities that have location pushed."""
@@ -249,7 +279,32 @@ class TrackerEntity(
@cached_property
def location_name(self) -> str | None:
"""Return a location name for the current location of the device."""
"""Return a location name for the current location of the device.
The property is deprecated and will be removed in Home Assistant 2027.7.
"""
if (location_name := self._attr_location_name) is not None:
if (
not self.__deprecated_attr_location_name_reported
and not self.__class__.__module__.startswith(
"homeassistant.components."
)
):
report_issue = async_suggest_report_issue(
self.hass, module=self.__class__.__module__
)
_LOGGER.warning(
(
"%s::%s is setting the deprecated _attr_location_name attribute "
"on an instance of TrackerEntity, this will be unsupported from "
"Home Assistant 2027.7, please %s"
),
self.__class__.__module__,
self.__class__.__name__,
report_issue,
)
self.__deprecated_attr_location_name_reported = True
return location_name
return self._attr_location_name
@cached_property
@@ -7,6 +7,6 @@
"integration_type": "device",
"iot_class": "local_polling",
"quality_scale": "bronze",
"requirements": ["iometer==0.4.0"],
"requirements": ["iometer==1.0.1"],
"zeroconf": ["_iometer._tcp.local."]
}
@@ -556,4 +556,48 @@ DISCOVERY_SCHEMAS = [
featuremap_contains=clusters.Thermostat.Bitmaps.Feature.kOccupancy,
allow_multi=True,
),
# GeneralDiagnostics active fault sensors
MatterDiscoverySchema(
platform=Platform.BINARY_SENSOR,
entity_description=MatterBinarySensorEntityDescription(
key="GeneralDiagnosticsActiveHardwareFaults",
translation_key="active_hardware_faults",
device_class=BinarySensorDeviceClass.PROBLEM,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
device_to_ha=bool,
),
entity_class=MatterBinarySensor,
required_attributes=(
clusters.GeneralDiagnostics.Attributes.ActiveHardwareFaults,
),
),
MatterDiscoverySchema(
platform=Platform.BINARY_SENSOR,
entity_description=MatterBinarySensorEntityDescription(
key="GeneralDiagnosticsActiveRadioFaults",
translation_key="active_radio_faults",
device_class=BinarySensorDeviceClass.PROBLEM,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
device_to_ha=bool,
),
entity_class=MatterBinarySensor,
required_attributes=(clusters.GeneralDiagnostics.Attributes.ActiveRadioFaults,),
),
MatterDiscoverySchema(
platform=Platform.BINARY_SENSOR,
entity_description=MatterBinarySensorEntityDescription(
key="GeneralDiagnosticsActiveNetworkFaults",
translation_key="active_network_faults",
device_class=BinarySensorDeviceClass.PROBLEM,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
device_to_ha=bool,
),
entity_class=MatterBinarySensor,
required_attributes=(
clusters.GeneralDiagnostics.Attributes.ActiveNetworkFaults,
),
),
]
+8 -1
View File
@@ -457,7 +457,14 @@ class MatterLight(MatterEntity, LightEntity):
self._transitions_disabled = True
LOGGER.warning(
"Detected a device that has been reported to have firmware issues "
"with light transitions. Transitions will be disabled for this light"
"with light transitions. Transitions will be disabled for this "
"light: %s %s (vendor_id: %s, product_id: %s, hw: %s, sw: %s)",
device_info.vendorName,
device_info.productName,
device_info.vendorID,
device_info.productID,
device_info.hardwareVersionString,
device_info.softwareVersionString,
)
+53
View File
@@ -137,6 +137,17 @@ RVC_OPERATIONAL_STATE_ERROR_MAP = {
_rvc_err.kNavigationSensorObscured: ("navigation_sensor_obscured"),
}
BOOT_REASON_MAP = {
clusters.GeneralDiagnostics.Enums.BootReasonEnum.kUnspecified: "unspecified",
clusters.GeneralDiagnostics.Enums.BootReasonEnum.kPowerOnReboot: "power_on_reboot",
clusters.GeneralDiagnostics.Enums.BootReasonEnum.kBrownOutReset: "brown_out_reset",
clusters.GeneralDiagnostics.Enums.BootReasonEnum.kSoftwareWatchdogReset: "software_watchdog_reset",
clusters.GeneralDiagnostics.Enums.BootReasonEnum.kHardwareWatchdogReset: "hardware_watchdog_reset",
clusters.GeneralDiagnostics.Enums.BootReasonEnum.kSoftwareUpdateCompleted: "software_update_completed",
clusters.GeneralDiagnostics.Enums.BootReasonEnum.kSoftwareReset: "software_reset",
clusters.GeneralDiagnostics.Enums.BootReasonEnum.kUnknownEnumValue: None,
}
BOOST_STATE_MAP = {
clusters.WaterHeaterManagement.Enums.BoostStateEnum.kInactive: "inactive",
clusters.WaterHeaterManagement.Enums.BoostStateEnum.kActive: "active",
@@ -1575,4 +1586,46 @@ DISCOVERY_SCHEMAS = [
required_attributes=(clusters.DoorLock.Attributes.DoorClosedEvents,),
featuremap_contains=clusters.DoorLock.Bitmaps.Feature.kDoorPositionSensor,
),
# GeneralDiagnostics cluster sensors
MatterDiscoverySchema(
platform=Platform.SENSOR,
entity_description=MatterSensorEntityDescription(
key="GeneralDiagnosticsRebootCount",
translation_key="reboot_count",
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
state_class=SensorStateClass.TOTAL_INCREASING,
),
entity_class=MatterSensor,
required_attributes=(clusters.GeneralDiagnostics.Attributes.RebootCount,),
),
MatterDiscoverySchema(
platform=Platform.SENSOR,
entity_description=MatterSensorEntityDescription(
key="GeneralDiagnosticsUpTime",
translation_key="uptime",
device_class=SensorDeviceClass.UPTIME,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
device_to_ha=lambda uptime: dt_util.utcnow() - timedelta(seconds=uptime),
),
entity_class=MatterSensor,
required_attributes=(clusters.GeneralDiagnostics.Attributes.UpTime,),
),
MatterDiscoverySchema(
platform=Platform.SENSOR,
entity_description=MatterSensorEntityDescription(
key="GeneralDiagnosticsBootReason",
translation_key="boot_reason",
device_class=SensorDeviceClass.ENUM,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
options=[
reason for reason in BOOT_REASON_MAP.values() if reason is not None
],
device_to_ha=BOOT_REASON_MAP.get,
),
entity_class=MatterSensor,
required_attributes=(clusters.GeneralDiagnostics.Attributes.BootReason,),
),
]
@@ -47,6 +47,15 @@
},
"entity": {
"binary_sensor": {
"active_hardware_faults": {
"name": "Hardware faults"
},
"active_network_faults": {
"name": "Network faults"
},
"active_radio_faults": {
"name": "Radio faults"
},
"actuator": {
"name": "Actuator"
},
@@ -408,6 +417,18 @@
"battery_voltage": {
"name": "Battery voltage"
},
"boot_reason": {
"name": "Boot reason",
"state": {
"brown_out_reset": "Brownout reset",
"hardware_watchdog_reset": "Hardware watchdog reset",
"power_on_reboot": "Power-on reboot",
"software_reset": "Software reset",
"software_update_completed": "Software update completed",
"software_watchdog_reset": "Software watchdog reset",
"unspecified": "Unspecified"
}
},
"contamination_state": {
"name": "Contamination state",
"state": {
@@ -576,6 +597,9 @@
"reactive_current": {
"name": "Reactive current"
},
"reboot_count": {
"name": "Reboot count"
},
"rms_current": {
"name": "Effective current"
},
@@ -600,6 +624,9 @@
"medium": "[%key:common::state::medium%]"
}
},
"uptime": {
"name": "Uptime"
},
"valve_position": {
"name": "Valve position"
},
+5
View File
@@ -52,6 +52,11 @@ class OMIEPriceSensor(CoordinatorEntity[OMIECoordinator], SensorEntity):
self._attr_unique_id = pyomie_series_name
self._pyomie_series_name = pyomie_series_name
async def async_added_to_hass(self) -> None:
"""Call when entity is added to hass."""
await super().async_added_to_hass()
self._handle_coordinator_update()
@callback
def _handle_coordinator_update(self) -> None:
"""Update this sensor's state from the coordinator results."""
+1 -1
View File
@@ -14,7 +14,7 @@
"iot_class": "local_push",
"loggers": ["onvif", "wsdiscovery", "zeep"],
"requirements": [
"onvif-zeep-async==4.1.0",
"onvif-zeep-async==4.1.1",
"onvif_parsers==2.3.0",
"WSDiscovery==2.1.2"
]
+1 -1
View File
@@ -14,5 +14,5 @@
"integration_type": "hub",
"iot_class": "cloud_push",
"loggers": ["socketio", "engineio", "yalexs"],
"requirements": ["yalexs==9.2.0", "yalexs-ble==3.3.0"]
"requirements": ["yalexs==9.2.1", "yalexs-ble==3.3.0"]
}
@@ -80,31 +80,31 @@ class ZHAAlarmControlPanel(ZHAEntity, AlarmControlPanelEntity):
"""Whether the code is required for arm actions."""
return self.entity_data.entity.code_arm_required
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_alarm_disarm(self, code: str | None = None) -> None:
"""Send disarm command."""
await self.entity_data.entity.async_alarm_disarm(code)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_alarm_arm_home(self, code: str | None = None) -> None:
"""Send arm home command."""
await self.entity_data.entity.async_alarm_arm_home(code)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_alarm_arm_away(self, code: str | None = None) -> None:
"""Send arm away command."""
await self.entity_data.entity.async_alarm_arm_away(code)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_alarm_arm_night(self, code: str | None = None) -> None:
"""Send arm night command."""
await self.entity_data.entity.async_alarm_arm_night(code)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_alarm_trigger(self, code: str | None = None) -> None:
"""Send alarm trigger command."""
await self.entity_data.entity.async_alarm_trigger(code)
+1 -1
View File
@@ -52,7 +52,7 @@ class ZHAButton(ZHAEntity, ButtonEntity):
self.entity_data.entity.info_object.device_class
)
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_press(self) -> None:
"""Send out a update command."""
await self.entity_data.entity.async_press()
+4 -4
View File
@@ -203,25 +203,25 @@ class Thermostat(ZHAEntity, ClimateEntity):
)
super()._handle_entity_events(event)
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_set_fan_mode(self, fan_mode: str) -> None:
"""Set fan mode."""
await self.entity_data.entity.async_set_fan_mode(fan_mode=fan_mode)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_set_hvac_mode(self, hvac_mode: HVACMode) -> None:
"""Set new target operation mode."""
await self.entity_data.entity.async_set_hvac_mode(hvac_mode=hvac_mode)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_set_preset_mode(self, preset_mode: str) -> None:
"""Set new preset mode."""
await self.entity_data.entity.async_set_preset_mode(preset_mode=preset_mode)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set new target temperature."""
await self.entity_data.entity.async_set_temperature(
+4
View File
@@ -75,3 +75,7 @@ MFG_CLUSTER_ID_START = 0xFC00
ZHA_ALARM_OPTIONS = "zha_alarm_options"
ZHA_OPTIONS = "zha_options"
# Dispatcher signal carrying device reconfigure progress events (bind result,
# attribute reporting result, configure complete) to the websocket subscriber.
SIGNAL_DEVICE_RECONFIGURE_EVENT = "zha_device_reconfigure_event"
+8 -8
View File
@@ -122,31 +122,31 @@ class ZhaCover(ZHAEntity, CoverEntity):
"""Return the current tilt position of the cover."""
return self.entity_data.entity.current_cover_tilt_position
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_open_cover(self, **kwargs: Any) -> None:
"""Open the cover."""
await self.entity_data.entity.async_open_cover()
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
"""Open the cover tilt."""
await self.entity_data.entity.async_open_cover_tilt()
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close the cover."""
await self.entity_data.entity.async_close_cover()
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
"""Close the cover tilt."""
await self.entity_data.entity.async_close_cover_tilt()
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the cover to a specific position."""
await self.entity_data.entity.async_set_cover_position(
@@ -154,7 +154,7 @@ class ZhaCover(ZHAEntity, CoverEntity):
)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the cover tilt to a specific position."""
await self.entity_data.entity.async_set_cover_tilt_position(
@@ -162,13 +162,13 @@ class ZhaCover(ZHAEntity, CoverEntity):
)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_stop_cover(self, **kwargs: Any) -> None:
"""Stop the cover."""
await self.entity_data.entity.async_stop_cover()
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
"""Stop the cover tilt."""
await self.entity_data.entity.async_stop_cover_tilt()
+77 -93
View File
@@ -3,36 +3,26 @@
from typing import Any
import voluptuous as vol
from zha.exceptions import ZHAException
from zha.zigbee.cluster_handlers.const import (
CLUSTER_HANDLER_IAS_WD,
CLUSTER_HANDLER_INOVELLI,
)
from zha.zigbee.cluster_handlers.manufacturerspecific import (
AllLEDEffectType,
SingleLEDEffectType,
)
from zhaquirks.inovelli.types import AllLEDEffectType, SingleLEDEffectType
from zigpy.zcl.clusters.security import IasWd
from homeassistant.components.device_automation import InvalidDeviceAutomationConfig
from homeassistant.const import CONF_DEVICE_ID, CONF_DOMAIN, CONF_TYPE
from homeassistant.core import Context, HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.typing import ConfigType, TemplateVarsType
from .const import DOMAIN
from .helpers import async_get_zha_device_proxy
from .helpers import async_get_zha_device_proxy, convert_zha_error_to_ha_error
from .websocket_api import SERVICE_WARNING_DEVICE_SQUAWK, SERVICE_WARNING_DEVICE_WARN
# mypy: disallow-any-generics
INOVELLI_CLUSTER_ID = 0xFC31
ACTION_SQUAWK = "squawk"
ACTION_WARN = "warn"
ATTR_DATA = "data"
ATTR_IEEE = "ieee"
CONF_ZHA_ACTION_TYPE = "zha_action_type"
ZHA_ACTION_TYPE_SERVICE_CALL = "service_call"
ZHA_ACTION_TYPE_CLUSTER_HANDLER_COMMAND = "cluster_handler_command"
INOVELLI_ALL_LED_EFFECT = "issue_all_led_effect"
INOVELLI_INDIVIDUAL_LED_EFFECT = "issue_individual_led_effect"
@@ -73,24 +63,18 @@ ACTION_SCHEMA = vol.Any(
DEFAULT_ACTION_SCHEMA,
)
DEVICE_ACTIONS = {
CLUSTER_HANDLER_IAS_WD: [
# Maps a cluster_id the device must expose to the available actions.
DEVICE_ACTIONS_BY_CLUSTER_ID: dict[int, list[dict[str, str]]] = {
IasWd.cluster_id: [
{CONF_TYPE: ACTION_SQUAWK, CONF_DOMAIN: DOMAIN},
{CONF_TYPE: ACTION_WARN, CONF_DOMAIN: DOMAIN},
],
CLUSTER_HANDLER_INOVELLI: [
INOVELLI_CLUSTER_ID: [
{CONF_TYPE: INOVELLI_ALL_LED_EFFECT, CONF_DOMAIN: DOMAIN},
{CONF_TYPE: INOVELLI_INDIVIDUAL_LED_EFFECT, CONF_DOMAIN: DOMAIN},
],
}
DEVICE_ACTION_TYPES = {
ACTION_SQUAWK: ZHA_ACTION_TYPE_SERVICE_CALL,
ACTION_WARN: ZHA_ACTION_TYPE_SERVICE_CALL,
INOVELLI_ALL_LED_EFFECT: ZHA_ACTION_TYPE_CLUSTER_HANDLER_COMMAND,
INOVELLI_INDIVIDUAL_LED_EFFECT: ZHA_ACTION_TYPE_CLUSTER_HANDLER_COMMAND,
}
DEVICE_ACTION_SCHEMAS = {
INOVELLI_ALL_LED_EFFECT: vol.Schema(
{
@@ -116,11 +100,6 @@ SERVICE_NAMES = {
ACTION_WARN: SERVICE_WARNING_DEVICE_WARN,
}
CLUSTER_HANDLER_MAPPINGS = {
INOVELLI_ALL_LED_EFFECT: CLUSTER_HANDLER_INOVELLI,
INOVELLI_INDIVIDUAL_LED_EFFECT: CLUSTER_HANDLER_INOVELLI,
}
async def async_call_action_from_config(
hass: HomeAssistant,
@@ -129,9 +108,9 @@ async def async_call_action_from_config(
context: Context | None,
) -> None:
"""Perform an action based on configuration."""
await ZHA_ACTION_TYPES[DEVICE_ACTION_TYPES[config[CONF_TYPE]]](
hass, config, variables, context
)
action_type = config[CONF_TYPE]
handler = ACTION_HANDLERS[action_type]
await handler(hass, config, context)
async def async_validate_action_config(
@@ -150,19 +129,18 @@ async def async_get_actions(
zha_device = async_get_zha_device_proxy(hass, device_id).device
except KeyError, AttributeError:
return []
cluster_handlers = [
ch.name
for endpoint in zha_device.endpoints.values()
for ch in endpoint.claimed_cluster_handlers.values()
]
actions = [
action
for cluster_handler, cluster_handler_actions in DEVICE_ACTIONS.items()
for action in cluster_handler_actions
if cluster_handler in cluster_handlers
]
for action in actions:
action[CONF_DEVICE_ID] = device_id
cluster_ids = {
cluster_id
for ep_id, endpoint in zha_device.device.endpoints.items()
if ep_id != 0
for cluster_id in endpoint.in_clusters
}
actions: list[dict[str, str]] = []
for required_cluster_id, cluster_actions in DEVICE_ACTIONS_BY_CLUSTER_ID.items():
if required_cluster_id in cluster_ids:
actions.extend(
{**action, CONF_DEVICE_ID: device_id} for action in cluster_actions
)
return actions
@@ -175,69 +153,75 @@ async def async_get_action_capabilities(
return {"extra_fields": fields}
async def _execute_service_based_action(
async def _execute_siren_service(
hass: HomeAssistant,
config: dict[str, Any],
variables: TemplateVarsType,
context: Context | None,
) -> None:
action_type = config[CONF_TYPE]
service_name = SERVICE_NAMES[action_type]
try:
zha_device = async_get_zha_device_proxy(hass, config[CONF_DEVICE_ID]).device
except KeyError, AttributeError:
return
service_data = {ATTR_IEEE: str(zha_device.ieee)}
await hass.services.async_call(
DOMAIN, service_name, service_data, blocking=True, context=context
DOMAIN,
SERVICE_NAMES[config[CONF_TYPE]],
{ATTR_IEEE: str(zha_device.ieee)},
blocking=True,
context=context,
)
async def _execute_cluster_handler_command_based_action(
hass: HomeAssistant,
config: dict[str, Any],
variables: TemplateVarsType,
context: Context | None,
) -> None:
action_type = config[CONF_TYPE]
cluster_handler_name = CLUSTER_HANDLER_MAPPINGS[action_type]
def _find_inovelli_cluster(hass: HomeAssistant, config: dict[str, Any]) -> Any:
try:
zha_device = async_get_zha_device_proxy(hass, config[CONF_DEVICE_ID]).device
except KeyError, AttributeError:
return
action_cluster_handler = None
for endpoint in zha_device.endpoints.values():
for cluster_handler in endpoint.all_cluster_handlers.values():
if cluster_handler.name == cluster_handler_name:
action_cluster_handler = cluster_handler
break
if action_cluster_handler is None:
except (KeyError, AttributeError) as err:
raise InvalidDeviceAutomationConfig(
f"Unable to execute cluster handler action -"
f" cluster handler: {cluster_handler_name} action:"
f" {action_type}"
)
if not hasattr(action_cluster_handler, action_type):
raise InvalidDeviceAutomationConfig(
f"Unable to execute cluster handler -"
f" cluster handler: {cluster_handler_name} action:"
f" {action_type}"
)
f"ZHA device {config[CONF_DEVICE_ID]} not found"
) from err
try:
await getattr(action_cluster_handler, action_type)(**config)
except ZHAException as err:
raise HomeAssistantError(err) from err
return zha_device.device.find_cluster(cluster_id=INOVELLI_CLUSTER_ID)
except ValueError as err:
raise InvalidDeviceAutomationConfig(
f"Device does not expose Inovelli cluster 0x{INOVELLI_CLUSTER_ID:04x}"
) from err
ZHA_ACTION_TYPES = {
ZHA_ACTION_TYPE_SERVICE_CALL: _execute_service_based_action,
ZHA_ACTION_TYPE_CLUSTER_HANDLER_COMMAND: (
_execute_cluster_handler_command_based_action
),
async def _execute_inovelli_all_led_effect(
hass: HomeAssistant,
config: dict[str, Any],
context: Context | None,
) -> None:
cluster = _find_inovelli_cluster(hass, config)
async with convert_zha_error_to_ha_error():
await cluster.led_effect(
led_effect=config["effect_type"],
led_color=config["color"],
led_level=config["level"],
led_duration=config["duration"],
)
async def _execute_inovelli_individual_led_effect(
hass: HomeAssistant,
config: dict[str, Any],
context: Context | None,
) -> None:
cluster = _find_inovelli_cluster(hass, config)
async with convert_zha_error_to_ha_error():
await cluster.individual_led_effect(
led_effect=config["effect_type"],
led_color=config["color"],
led_level=config["level"],
led_duration=config["duration"],
led_number=config["led_number"],
)
ACTION_HANDLERS = {
ACTION_SQUAWK: _execute_siren_service,
ACTION_WARN: _execute_siren_service,
INOVELLI_ALL_LED_EFFECT: _execute_inovelli_all_led_effect,
INOVELLI_INDIVIDUAL_LED_EFFECT: _execute_inovelli_individual_led_effect,
}
+1 -1
View File
@@ -206,7 +206,7 @@ class ZHAEntity(LogMixin, RestoreEntity, Entity):
await super().async_will_remove_from_hass()
self.remove_future.set_result(True)
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_update(self) -> None:
"""Update the entity."""
await self.entity_data.entity.async_update()
+4 -4
View File
@@ -92,7 +92,7 @@ class ZhaFan(FanEntity, ZHAEntity):
"""Return the number of speeds the fan supports."""
return self.entity_data.entity.speed_count
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_turn_on(
self,
percentage: int | None = None,
@@ -105,19 +105,19 @@ class ZhaFan(FanEntity, ZHAEntity):
)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the entity off."""
await self.entity_data.entity.async_turn_off()
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_set_percentage(self, percentage: int) -> None:
"""Set the speed percentage of the fan."""
await self.entity_data.entity.async_set_percentage(percentage=percentage)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_set_preset_mode(self, preset_mode: str) -> None:
"""Set the preset mode for the fan."""
await self.entity_data.entity.async_set_preset_mode(preset_mode=preset_mode)
+63 -56
View File
@@ -2,23 +2,23 @@
import asyncio
import collections
from collections.abc import Awaitable, Callable, Coroutine, Mapping
from collections.abc import AsyncGenerator, Callable, Mapping
from contextlib import asynccontextmanager
import copy
import dataclasses
import enum
import functools
import itertools
import logging
import queue
import re
import time
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Concatenate, NamedTuple, cast
from typing import TYPE_CHECKING, Any, NamedTuple, cast
from zoneinfo import ZoneInfo
import voluptuous as vol
from zha.application import Platform as ZhaPlatform
from zha.application.const import (
ATTR_CLUSTER_ID,
ATTR_DEVICE_IEEE,
ATTR_TYPE,
ATTR_UNIQUE_ID,
@@ -28,11 +28,6 @@ from zha.application.const import (
CONF_DEFAULT_CONSIDER_UNAVAILABLE_MAINS,
UNKNOWN_MANUFACTURER,
UNKNOWN_MODEL,
ZHA_CLUSTER_HANDLER_CFG_DONE,
ZHA_CLUSTER_HANDLER_MSG,
ZHA_CLUSTER_HANDLER_MSG_BIND,
ZHA_CLUSTER_HANDLER_MSG_CFG_RPT,
ZHA_CLUSTER_HANDLER_MSG_DATA,
ZHA_EVENT,
ZHA_GW_MSG,
ZHA_GW_MSG_DEVICE_FULL_INIT,
@@ -71,10 +66,11 @@ from zha.application.platforms import GroupEntity, PlatformEntity
from zha.event import EventBase
from zha.exceptions import ZHAException
from zha.mixins import LogMixin
from zha.zigbee.cluster_handlers import ClusterBindEvent, ClusterConfigureReportingEvent
from zha.zigbee.device import (
ClusterHandlerConfigurationComplete,
ClusterBindEvent,
ClusterConfigureReportingEvent,
Device,
DeviceConfiguredEvent,
DeviceEntityAddedEvent,
DeviceEntityRemovedEvent,
DeviceFirmwareInfoUpdatedEvent,
@@ -126,9 +122,7 @@ from homeassistant.util.logging import HomeAssistantQueueHandler
from .const import (
ATTR_ACTIVE_COORDINATOR,
ATTR_ATTRIBUTES,
ATTR_AVAILABLE,
ATTR_CLUSTER_NAME,
ATTR_DEVICE_TYPE,
ATTR_ENDPOINT_NAMES,
ATTR_EXPOSES_FEATURES,
@@ -144,7 +138,6 @@ from .const import (
ATTR_ROUTES,
ATTR_RSSI,
ATTR_SIGNATURE,
ATTR_SUCCESS,
CONF_ALARM_ARM_REQUIRES_CODE,
CONF_ALARM_FAILED_TRIES,
CONF_ALARM_MASTER_CODE,
@@ -168,6 +161,7 @@ from .const import (
DEFAULT_DATABASE_NAME,
DEVICE_PAIRING_STATUS,
DOMAIN,
SIGNAL_DEVICE_RECONFIGURE_EVENT,
ZHA_ALARM_OPTIONS,
ZHA_OPTIONS,
)
@@ -450,50 +444,46 @@ class ZHADeviceProxy(EventBase):
)
@callback
def handle_zha_channel_configure_reporting(
def handle_zha_cluster_bind(self, event: ClusterBindEvent) -> None:
"""Forward a cluster bind result to the reconfigure websocket."""
async_dispatcher_send(
self.gateway_proxy.hass,
SIGNAL_DEVICE_RECONFIGURE_EVENT,
{
"type": "zha_channel_bind",
"zha_channel_msg_data": {
"cluster_name": event.cluster_name,
"cluster_id": event.cluster_id,
"success": event.success,
},
},
)
@callback
def handle_zha_cluster_configure_reporting(
self, event: ClusterConfigureReportingEvent
) -> None:
"""Handle a ZHA cluster configure reporting event."""
"""Forward a cluster reporting-configured result to the reconfigure websocket."""
async_dispatcher_send(
self.gateway_proxy.hass,
ZHA_CLUSTER_HANDLER_MSG,
SIGNAL_DEVICE_RECONFIGURE_EVENT,
{
ATTR_TYPE: ZHA_CLUSTER_HANDLER_MSG_CFG_RPT,
ZHA_CLUSTER_HANDLER_MSG_DATA: {
ATTR_CLUSTER_NAME: event.cluster_name,
ATTR_CLUSTER_ID: event.cluster_id,
ATTR_ATTRIBUTES: event.attributes,
"type": "zha_channel_configure_reporting",
"zha_channel_msg_data": {
"cluster_name": event.cluster_name,
"cluster_id": event.cluster_id,
"attributes": event.attributes,
},
},
)
@callback
def handle_zha_channel_cfg_done(
self, event: ClusterHandlerConfigurationComplete
) -> None:
"""Handle a ZHA cluster configure reporting event."""
def handle_zha_device_configured(self, event: DeviceConfiguredEvent) -> None:
"""Forward the device configuration-complete signal to the reconfigure websocket."""
async_dispatcher_send(
self.gateway_proxy.hass,
ZHA_CLUSTER_HANDLER_MSG,
{
ATTR_TYPE: ZHA_CLUSTER_HANDLER_CFG_DONE,
},
)
@callback
def handle_zha_channel_bind(self, event: ClusterBindEvent) -> None:
"""Handle a ZHA cluster bind event."""
async_dispatcher_send(
self.gateway_proxy.hass,
ZHA_CLUSTER_HANDLER_MSG,
{
ATTR_TYPE: ZHA_CLUSTER_HANDLER_MSG_BIND,
ZHA_CLUSTER_HANDLER_MSG_DATA: {
ATTR_CLUSTER_NAME: event.cluster_name,
ATTR_CLUSTER_ID: event.cluster_id,
ATTR_SUCCESS: event.success,
},
},
SIGNAL_DEVICE_RECONFIGURE_EVENT,
{"type": "zha_channel_cfg_done"},
)
@callback
@@ -501,6 +491,9 @@ class ZHADeviceProxy(EventBase):
self, event: DeviceEntityAddedEvent
) -> None:
"""Handle a new entity being added to a device at runtime."""
if event.platform is ZhaPlatform.VIRTUAL:
return
key = (event.platform, event.unique_id)
if (entity := self.device.platform_entities.get(key)) is None:
return
@@ -515,6 +508,9 @@ class ZHADeviceProxy(EventBase):
self, event: DeviceEntityRemovedEvent
) -> None:
"""Handle an entity being removed from a device at runtime."""
if event.platform is ZhaPlatform.VIRTUAL:
return
if not event.remove:
# Soft remove: signal the entity to unload; registry entry stays
async_dispatcher_send(
@@ -911,6 +907,9 @@ class ZHAGatewayProxy(EventBase):
if isinstance(proxy_object, ZHADeviceProxy):
for entity in proxy_object.device.platform_entities.values():
if entity.PLATFORM is ZhaPlatform.VIRTUAL:
continue
ha_zha_data.platforms[Platform(entity.PLATFORM)].append(
EntityData(
entity=entity, device_proxy=proxy_object, group_proxy=None
@@ -918,6 +917,9 @@ class ZHAGatewayProxy(EventBase):
)
else:
for entity in proxy_object.group.group_entities.values():
if entity.PLATFORM is ZhaPlatform.VIRTUAL:
continue
ha_zha_data.platforms[Platform(entity.PLATFORM)].append(
EntityData(
entity=entity,
@@ -1386,19 +1388,24 @@ def create_zha_config(hass: HomeAssistant, ha_zha_data: HAZHAData) -> ZHAData:
)
def convert_zha_error_to_ha_error[**_P, _EntityT: ZHAEntity](
func: Callable[Concatenate[_EntityT, _P], Awaitable[None]],
) -> Callable[Concatenate[_EntityT, _P], Coroutine[Any, Any, None]]:
@asynccontextmanager
async def convert_zha_error_to_ha_error() -> AsyncGenerator[None]:
"""Decorate ZHA commands and re-raises ZHAException as HomeAssistantError."""
try:
yield
except TimeoutError as exc:
raise HomeAssistantError(
"Failed to send request: device did not respond"
) from exc
except zigpy.exceptions.ZigbeeException as exc:
message = "Failed to send request"
@functools.wraps(func)
async def handler(self: _EntityT, *args: _P.args, **kwargs: _P.kwargs) -> None:
try:
return await func(self, *args, **kwargs)
except ZHAException as err:
raise HomeAssistantError(err) from err
if str(exc):
message = f"{message}: {exc}"
return handler
raise HomeAssistantError(message) from exc
except ZHAException as err:
raise HomeAssistantError(err) from err
def exclude_none_values(obj: Mapping[str, Any]) -> dict[str, Any]:
+2 -2
View File
@@ -171,7 +171,7 @@ class Light(LightEntity, ZHAEntity):
"""Return the current effect."""
return self.entity_data.entity.effect
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the entity on."""
color_temp = (
@@ -189,7 +189,7 @@ class Light(LightEntity, ZHAEntity):
)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the entity off."""
await self.entity_data.entity.async_turn_off(
+6 -6
View File
@@ -94,19 +94,19 @@ class ZhaDoorLock(ZHAEntity, LockEntity):
"""Return true if entity is locked."""
return self.entity_data.entity.is_locked
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_lock(self, **kwargs: Any) -> None:
"""Lock the lock."""
await self.entity_data.entity.async_lock()
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_unlock(self, **kwargs: Any) -> None:
"""Unlock the lock."""
await self.entity_data.entity.async_unlock()
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_set_lock_user_code(self, code_slot: int, user_code: str) -> None:
"""Set the user_code to index X on the lock."""
await self.entity_data.entity.async_set_lock_user_code(
@@ -114,19 +114,19 @@ class ZhaDoorLock(ZHAEntity, LockEntity):
)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_enable_lock_user_code(self, code_slot: int) -> None:
"""Enable user_code at index X on the lock."""
await self.entity_data.entity.async_enable_lock_user_code(code_slot=code_slot)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_disable_lock_user_code(self, code_slot: int) -> None:
"""Disable user_code at index X on the lock."""
await self.entity_data.entity.async_disable_lock_user_code(code_slot=code_slot)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_clear_lock_user_code(self, code_slot: int) -> None:
"""Clear the user_code at index X on the lock."""
await self.entity_data.entity.async_clear_lock_user_code(code_slot=code_slot)
+1 -1
View File
@@ -23,7 +23,7 @@
"universal_silabs_flasher",
"serialx"
],
"requirements": ["zha==1.3.1"],
"requirements": ["zha==1.4.0"],
"usb": [
{
"description": "*2652*",
+1 -1
View File
@@ -78,7 +78,7 @@ class ZhaNumber(ZHAEntity, RestoreNumber):
"""Return the unit the value is expressed in."""
return self.entity_data.entity.native_unit_of_measurement
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_set_native_value(self, value: float) -> None:
"""Update the current value from HA."""
await self.entity_data.entity.async_set_native_value(value=value)
+1 -1
View File
@@ -58,7 +58,7 @@ class ZHAEnumSelectEntity(ZHAEntity, SelectEntity):
"""Return the selected entity option to represent the entity state."""
return self.entity_data.entity.current_option
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_select_option(self, option: str) -> None:
"""Change the selected option."""
await self.entity_data.entity.async_select_option(option=option)
+11 -16
View File
@@ -3,15 +3,10 @@
import functools
from typing import Any
from zha.application.const import (
WARNING_DEVICE_MODE_BURGLAR,
WARNING_DEVICE_MODE_EMERGENCY,
WARNING_DEVICE_MODE_EMERGENCY_PANIC,
WARNING_DEVICE_MODE_FIRE,
WARNING_DEVICE_MODE_FIRE_PANIC,
WARNING_DEVICE_MODE_POLICE_PANIC,
from zha.application.platforms.siren import (
SirenEntityFeature as ZHASirenEntityFeature,
WarningMode,
)
from zha.application.platforms.siren import SirenEntityFeature as ZHASirenEntityFeature
from homeassistant.components.siren import (
ATTR_DURATION,
@@ -59,12 +54,12 @@ class ZHASiren(ZHAEntity, SirenEntity):
"""Representation of a ZHA siren."""
_attr_available_tones: list[int | str] | dict[int, str] | None = {
WARNING_DEVICE_MODE_BURGLAR: "Burglar",
WARNING_DEVICE_MODE_FIRE: "Fire",
WARNING_DEVICE_MODE_EMERGENCY: "Emergency",
WARNING_DEVICE_MODE_POLICE_PANIC: "Police Panic",
WARNING_DEVICE_MODE_FIRE_PANIC: "Fire Panic",
WARNING_DEVICE_MODE_EMERGENCY_PANIC: "Emergency Panic",
WarningMode.Burglar: "Burglar",
WarningMode.Fire: "Fire",
WarningMode.Emergency: "Emergency",
WarningMode.Police_Panic: "Police Panic",
WarningMode.Fire_Panic: "Fire Panic",
WarningMode.Emergency_Panic: "Emergency Panic",
}
def __init__(self, entity_data: EntityData, **kwargs: Any) -> None:
@@ -92,7 +87,7 @@ class ZHASiren(ZHAEntity, SirenEntity):
"""Return True if entity is on."""
return self.entity_data.entity.is_on
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn on siren."""
await self.entity_data.entity.async_turn_on(
@@ -102,7 +97,7 @@ class ZHASiren(ZHAEntity, SirenEntity):
)
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn off siren."""
await self.entity_data.entity.async_turn_off()
+2 -2
View File
@@ -49,13 +49,13 @@ class Switch(ZHAEntity, SwitchEntity):
"""Return if the switch is on based on the statemachine."""
return self.entity_data.entity.is_on
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the entity on."""
await self.entity_data.entity.async_turn_on()
self.async_write_ha_state()
@convert_zha_error_to_ha_error
@convert_zha_error_to_ha_error()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the entity off."""
await self.entity_data.entity.async_turn_off()
+1 -1
View File
@@ -181,7 +181,7 @@ class ZHAFirmwareUpdateEntity(
return self.entity_data.entity.release_url
# We explicitly convert ZHA exceptions to HA exceptions here so there is no need to
# use the `@convert_zha_error_to_ha_error` decorator.
# use the `@convert_zha_error_to_ha_error()` decorator.
async def async_install(
self, version: str | None, backup: bool, **kwargs: Any
) -> None:
+34 -80
View File
@@ -29,12 +29,6 @@ from zha.application.const import (
CLUSTER_COMMANDS_SERVER,
CLUSTER_TYPE_IN,
CLUSTER_TYPE_OUT,
WARNING_DEVICE_MODE_EMERGENCY,
WARNING_DEVICE_SOUND_HIGH,
WARNING_DEVICE_SQUAWK_MODE_ARMED,
WARNING_DEVICE_STROBE_HIGH,
WARNING_DEVICE_STROBE_YES,
ZHA_CLUSTER_HANDLER_MSG,
ZHA_GW_MSG,
)
from zha.application.gateway import Gateway
@@ -44,7 +38,14 @@ from zha.application.helpers import (
get_matched_clusters,
qr_to_install_code,
)
from zha.zigbee.cluster_handlers.const import CLUSTER_HANDLER_IAS_WD
from zha.application.platforms.siren import (
BaseSiren,
SirenLevel,
SquawkMode,
Strobe,
StrobeLevel,
WarningMode,
)
from zha.zigbee.group import GroupMemberReference
import zigpy.backups
from zigpy.config import CONF_DEVICE
@@ -59,7 +60,7 @@ import zigpy.zdo.types as zdo_types
from homeassistant.components import websocket_api
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_COMMAND, ATTR_ID, ATTR_NAME
from homeassistant.const import ATTR_COMMAND, ATTR_ID, ATTR_NAME, Platform
from homeassistant.core import HomeAssistant, ServiceCall, callback
from homeassistant.helpers import config_validation as cv, entity_registry as er
from homeassistant.helpers.dispatcher import async_dispatcher_connect
@@ -79,6 +80,7 @@ from .const import (
GROUP_IDS,
GROUP_NAME,
MFG_CLUSTER_ID_START,
SIGNAL_DEVICE_RECONFIGURE_EVENT,
ZHA_ALARM_OPTIONS,
ZHA_OPTIONS,
)
@@ -180,13 +182,13 @@ SERVICE_SCHEMAS: dict[str, VolSchemaType] = {
{
vol.Required(ATTR_IEEE): IEEE_SCHEMA,
vol.Optional(
ATTR_WARNING_DEVICE_MODE, default=WARNING_DEVICE_SQUAWK_MODE_ARMED
ATTR_WARNING_DEVICE_MODE, default=SquawkMode.Armed
): cv.positive_int,
vol.Optional(
ATTR_WARNING_DEVICE_STROBE, default=WARNING_DEVICE_STROBE_YES
ATTR_WARNING_DEVICE_STROBE, default=Strobe.Strobe
): cv.positive_int,
vol.Optional(
ATTR_LEVEL, default=WARNING_DEVICE_SOUND_HIGH
ATTR_LEVEL, default=SirenLevel.High_level_sound
): cv.positive_int,
}
),
@@ -194,20 +196,21 @@ SERVICE_SCHEMAS: dict[str, VolSchemaType] = {
{
vol.Required(ATTR_IEEE): IEEE_SCHEMA,
vol.Optional(
ATTR_WARNING_DEVICE_MODE, default=WARNING_DEVICE_MODE_EMERGENCY
ATTR_WARNING_DEVICE_MODE, default=WarningMode.Emergency
): cv.positive_int,
vol.Optional(
ATTR_WARNING_DEVICE_STROBE, default=WARNING_DEVICE_STROBE_YES
ATTR_WARNING_DEVICE_STROBE, default=Strobe.Strobe
): cv.positive_int,
vol.Optional(
ATTR_LEVEL, default=WARNING_DEVICE_SOUND_HIGH
ATTR_LEVEL, default=SirenLevel.High_level_sound
): cv.positive_int,
vol.Optional(ATTR_WARNING_DEVICE_DURATION, default=5): cv.positive_int,
vol.Optional(
ATTR_WARNING_DEVICE_STROBE_DUTY_CYCLE, default=0x00
): cv.positive_int,
vol.Optional(
ATTR_WARNING_DEVICE_STROBE_INTENSITY, default=WARNING_DEVICE_STROBE_HIGH
ATTR_WARNING_DEVICE_STROBE_INTENSITY,
default=StrobeLevel.High_level_strobe,
): cv.positive_int,
}
),
@@ -424,10 +427,7 @@ async def websocket_get_groupable_devices(
),
}
for entity_ref in entity_refs
if list(entity_ref.entity_data.entity.cluster_handlers.values())[
0
].cluster.endpoint.endpoint_id
== ep_id
if entity_ref.entity_data.entity.endpoint.id == ep_id
],
"device": device.zha_device_info,
}
@@ -649,7 +649,7 @@ async def websocket_reconfigure_node(
connection.send_message(websocket_api.event_message(msg["id"], data))
remove_dispatcher_function = async_dispatcher_connect(
hass, ZHA_CLUSTER_HANDLER_MSG, forward_messages
hass, SIGNAL_DEVICE_RECONFIGURE_EVENT, forward_messages
)
@callback
@@ -1480,15 +1480,6 @@ def async_load_api(hass: HomeAssistant) -> None:
schema=SERVICE_SCHEMAS[SERVICE_ISSUE_ZIGBEE_GROUP_COMMAND],
)
def _get_ias_wd_cluster_handler(zha_device):
"""Get the IASWD cluster handler for a device."""
cluster_handlers = {
ch.name: ch
for endpoint in zha_device.endpoints.values()
for ch in endpoint.claimed_cluster_handlers.values()
}
return cluster_handlers.get(CLUSTER_HANDLER_IAS_WD)
async def warning_device_squawk(service: ServiceCall) -> None:
"""Issue the squawk command for an IAS warning device."""
ieee: EUI64 = service.data[ATTR_IEEE]
@@ -1496,31 +1487,10 @@ def async_load_api(hass: HomeAssistant) -> None:
strobe: int = service.data[ATTR_WARNING_DEVICE_STROBE]
level: int = service.data[ATTR_LEVEL]
if (zha_device := zha_gateway.get_device(ieee)) is not None:
if cluster_handler := _get_ias_wd_cluster_handler(zha_device):
await cluster_handler.issue_squawk(mode, strobe, level)
else:
_LOGGER.error(
"Squawking IASWD: %s: [%s] is missing"
" the required IASWD cluster handler!",
ATTR_IEEE,
str(ieee),
)
else:
_LOGGER.error(
"Squawking IASWD: %s: [%s] could not be found!", ATTR_IEEE, str(ieee)
)
_LOGGER.debug(
"Squawking IASWD: %s: [%s] %s: [%s] %s: [%s] %s: [%s]",
ATTR_IEEE,
str(ieee),
ATTR_WARNING_DEVICE_MODE,
mode,
ATTR_WARNING_DEVICE_STROBE,
strobe,
ATTR_LEVEL,
level,
)
device = zha_gateway.get_device(ieee)
siren: BaseSiren = device.get_entity(Platform.SIREN, pick_first=True)
await siren.async_squawk(mode=mode, strobe=strobe, squawk_level=level)
async_register_admin_service(
hass,
@@ -1540,32 +1510,16 @@ def async_load_api(hass: HomeAssistant) -> None:
duty_mode: int = service.data[ATTR_WARNING_DEVICE_STROBE_DUTY_CYCLE]
intensity: int = service.data[ATTR_WARNING_DEVICE_STROBE_INTENSITY]
if (zha_device := zha_gateway.get_device(ieee)) is not None:
if cluster_handler := _get_ias_wd_cluster_handler(zha_device):
await cluster_handler.issue_start_warning(
mode, strobe, level, duration, duty_mode, intensity
)
else:
_LOGGER.error(
"Warning IASWD: %s: [%s] is missing"
" the required IASWD cluster handler!",
ATTR_IEEE,
str(ieee),
)
else:
_LOGGER.error(
"Warning IASWD: %s: [%s] could not be found!", ATTR_IEEE, str(ieee)
)
_LOGGER.debug(
"Warning IASWD: %s: [%s] %s: [%s] %s: [%s] %s: [%s]",
ATTR_IEEE,
str(ieee),
ATTR_WARNING_DEVICE_MODE,
mode,
ATTR_WARNING_DEVICE_STROBE,
strobe,
ATTR_LEVEL,
level,
device = zha_gateway.get_device(ieee)
siren: BaseSiren = device.get_entity(Platform.SIREN, pick_first=True)
await siren.async_turn_on(
tone=mode,
volume_level=level,
duration=duration,
strobe=strobe,
strobe_duty_cycle=duty_mode,
strobe_intensity=intensity,
)
async_register_admin_service(
+14
View File
@@ -3,5 +3,19 @@
"reload": {
"service": "mdi:reload"
}
},
"triggers": {
"entered": {
"trigger": "mdi:map-marker-plus"
},
"left": {
"trigger": "mdi:map-marker-minus"
},
"occupancy_cleared": {
"trigger": "mdi:account-off"
},
"occupancy_detected": {
"trigger": "mdi:account-group"
}
}
}
@@ -1,8 +1,74 @@
{
"common": {
"trigger_behavior_name": "Trigger when",
"trigger_for_name": "For at least",
"trigger_zone_description": "The zone to trigger on.",
"trigger_zone_name": "Zone"
},
"services": {
"reload": {
"description": "Reloads zones from the YAML-configuration.",
"name": "Reload zones"
}
},
"triggers": {
"entered": {
"description": "Triggers when one or more persons or device trackers enter a zone.",
"fields": {
"behavior": {
"name": "[%key:component::zone::common::trigger_behavior_name%]"
},
"for": {
"name": "[%key:component::zone::common::trigger_for_name%]"
},
"zone": {
"description": "[%key:component::zone::common::trigger_zone_description%]",
"name": "[%key:component::zone::common::trigger_zone_name%]"
}
},
"name": "Entered zone"
},
"left": {
"description": "Triggers when one or more persons or device trackers leave a zone.",
"fields": {
"behavior": {
"name": "[%key:component::zone::common::trigger_behavior_name%]"
},
"for": {
"name": "[%key:component::zone::common::trigger_for_name%]"
},
"zone": {
"description": "[%key:component::zone::common::trigger_zone_description%]",
"name": "[%key:component::zone::common::trigger_zone_name%]"
}
},
"name": "Left zone"
},
"occupancy_cleared": {
"description": "Triggers when a zone transitions from occupied to unoccupied.",
"fields": {
"for": {
"name": "[%key:component::zone::common::trigger_for_name%]"
},
"zone": {
"description": "[%key:component::zone::triggers::occupancy_detected::fields::zone::description%]",
"name": "[%key:component::zone::triggers::occupancy_detected::fields::zone::name%]"
}
},
"name": "Zone occupancy cleared"
},
"occupancy_detected": {
"description": "Triggers when a zone transitions to an occupied state.",
"fields": {
"for": {
"name": "[%key:component::zone::common::trigger_for_name%]"
},
"zone": {
"description": "The zone to monitor.",
"name": "Zone"
}
},
"name": "Zone occupancy detected"
}
}
}
+229 -76
View File
@@ -1,22 +1,26 @@
"""Offer zone automation rules."""
import logging
from typing import TYPE_CHECKING, Any, cast
import voluptuous as vol
from homeassistant.components.device_tracker import ATTR_IN_ZONES
from homeassistant.const import (
ATTR_FRIENDLY_NAME,
CONF_ENTITY_ID,
CONF_EVENT,
CONF_PLATFORM,
CONF_FOR,
CONF_OPTIONS,
CONF_TARGET,
CONF_ZONE,
)
from homeassistant.core import (
CALLBACK_TYPE,
Event,
EventStateChangedData,
HassJob,
HomeAssistant,
State,
callback,
)
from homeassistant.helpers import (
@@ -24,8 +28,18 @@ from homeassistant.helpers import (
entity_registry as er,
location,
)
from homeassistant.helpers.automation import (
DomainSpec,
move_top_level_schema_fields_to_options,
)
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
from homeassistant.helpers.trigger import (
ENTITY_STATE_TRIGGER_SCHEMA_WITH_BEHAVIOR,
EntityTriggerBase,
Trigger,
TriggerActionRunner,
TriggerConfig,
)
from homeassistant.helpers.typing import ConfigType
from . import condition
@@ -38,93 +52,232 @@ _LOGGER = logging.getLogger(__name__)
_EVENT_DESCRIPTION = {EVENT_ENTER: "entering", EVENT_LEAVE: "leaving"}
_TRIGGER_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend(
_LEGACY_OPTIONS_SCHEMA: dict[vol.Marker, Any] = {
vol.Required(CONF_ENTITY_ID): cv.entity_ids_or_uuids,
vol.Required(CONF_ZONE): cv.entity_id,
vol.Required(CONF_EVENT, default=DEFAULT_EVENT): vol.Any(EVENT_ENTER, EVENT_LEAVE),
}
_LEGACY_TRIGGER_OPTIONS_SCHEMA = vol.Schema(
{
vol.Required(CONF_PLATFORM): "zone",
vol.Required(CONF_ENTITY_ID): cv.entity_ids_or_uuids,
vol.Required(CONF_ZONE): cv.entity_id,
vol.Required(CONF_EVENT, default=DEFAULT_EVENT): vol.Any(
EVENT_ENTER, EVENT_LEAVE
),
vol.Required(CONF_OPTIONS): _LEGACY_OPTIONS_SCHEMA,
},
)
# New-style zone trigger schema
_ZONE_TRIGGER_SCHEMA = ENTITY_STATE_TRIGGER_SCHEMA_WITH_BEHAVIOR.extend(
{
vol.Required(CONF_OPTIONS): {
vol.Required(CONF_ZONE): cv.entity_domain("zone"),
},
}
)
async def async_validate_trigger_config(
hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate trigger config."""
config = _TRIGGER_SCHEMA(config)
registry = er.async_get(hass)
config[CONF_ENTITY_ID] = er.async_validate_entity_ids(
registry, config[CONF_ENTITY_ID]
)
return config
_DOMAIN_SPECS: dict[str, DomainSpec] = {
"person": DomainSpec(),
"device_tracker": DomainSpec(),
}
async def async_attach_trigger(
hass: HomeAssistant,
config: ConfigType,
action: TriggerActionType,
trigger_info: TriggerInfo,
*,
platform_type: str = "zone",
) -> CALLBACK_TYPE:
"""Listen for state changes based on configuration."""
trigger_data = trigger_info["trigger_data"]
entity_id: list[str] = config[CONF_ENTITY_ID]
zone_entity_id: str = config[CONF_ZONE]
event: str = config[CONF_EVENT]
job = HassJob(action)
class LegacyZoneTrigger(Trigger):
"""Legacy zone trigger (platform: zone)."""
@callback
def zone_automation_listener(zone_event: Event[EventStateChangedData]) -> None:
"""Listen for state changes and calls action."""
entity = zone_event.data["entity_id"]
from_s = zone_event.data["old_state"]
to_s = zone_event.data["new_state"]
@classmethod
async def async_validate_complete_config(
cls, hass: HomeAssistant, complete_config: ConfigType
) -> ConfigType:
"""Validate complete config, migrating legacy format to options."""
complete_config = move_top_level_schema_fields_to_options(
complete_config, _LEGACY_OPTIONS_SCHEMA
)
return await super().async_validate_complete_config(hass, complete_config)
if (from_s and not location.has_location(from_s)) or (
to_s and not location.has_location(to_s)
):
return
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
config = cast(ConfigType, _LEGACY_TRIGGER_OPTIONS_SCHEMA(config))
registry = er.async_get(hass)
config[CONF_OPTIONS][CONF_ENTITY_ID] = er.async_validate_entity_ids(
registry, config[CONF_OPTIONS][CONF_ENTITY_ID]
)
return config
if not (zone_state := hass.states.get(zone_entity_id)):
_LOGGER.warning(
(
"Automation '%s' is referencing non-existing zone '%s' in a zone"
" trigger"
),
trigger_info["name"],
zone_entity_id,
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
self._options = config.options
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Listen for state changes based on configuration."""
entity_id: list[str] = self._options[CONF_ENTITY_ID]
zone_entity_id: str = self._options[CONF_ZONE]
event: str = self._options[CONF_EVENT]
@callback
def zone_automation_listener(zone_event: Event[EventStateChangedData]) -> None:
"""Listen for state changes and calls action."""
entity = zone_event.data["entity_id"]
from_s = zone_event.data["old_state"]
to_s = zone_event.data["new_state"]
if (from_s and not location.has_location(from_s)) or (
to_s and not location.has_location(to_s)
):
return
if not (zone_state := self._hass.states.get(zone_entity_id)):
_LOGGER.warning(
"Non-existing zone '%s' in a zone trigger",
zone_entity_id,
)
return
from_match = (
condition.zone(self._hass, zone_state, from_s) if from_s else False
)
return
to_match = condition.zone(self._hass, zone_state, to_s) if to_s else False
from_match = condition.zone(hass, zone_state, from_s) if from_s else False
to_match = condition.zone(hass, zone_state, to_s) if to_s else False
if (event == EVENT_ENTER and not from_match and to_match) or (
event == EVENT_LEAVE and from_match and not to_match
):
description = (
f"{entity} {_EVENT_DESCRIPTION[event]}"
f" {zone_state.attributes[ATTR_FRIENDLY_NAME]}"
)
hass.async_run_hass_job(
job,
{
"trigger": {
**trigger_data,
"platform": platform_type,
if (event == EVENT_ENTER and not from_match and to_match) or (
event == EVENT_LEAVE and from_match and not to_match
):
description = f"{entity} {_EVENT_DESCRIPTION[event]} {zone_state.attributes[ATTR_FRIENDLY_NAME]}"
run_action(
{
"entity_id": entity,
"from_state": from_s,
"to_state": to_s,
"zone": zone_state,
"event": event,
"description": description,
}
},
to_s.context if to_s else None,
)
},
description,
to_s.context if to_s else None,
)
return async_track_state_change_event(hass, entity_id, zone_automation_listener)
return async_track_state_change_event(
self._hass, entity_id, zone_automation_listener
)
class ZoneTriggerBase(EntityTriggerBase):
"""Base for zone-based triggers targeting person and device_tracker entities."""
_domain_specs = _DOMAIN_SPECS
_schema = _ZONE_TRIGGER_SCHEMA
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the trigger."""
super().__init__(hass, config)
self._zone: str = self._options[CONF_ZONE]
def _in_target_zone(self, state: State) -> bool:
"""Check if the entity is in the selected zone."""
in_zones = state.attributes.get(ATTR_IN_ZONES) or ()
return self._zone in in_zones
class EnteredZoneTrigger(ZoneTriggerBase):
"""Trigger when an entity enters the selected zone."""
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check that the entity was not already in the selected zone."""
return not self._in_target_zone(from_state)
def is_valid_state(self, state: State) -> bool:
"""Check that the entity is now in the selected zone."""
return self._in_target_zone(state)
class LeftZoneTrigger(ZoneTriggerBase):
"""Trigger when an entity leaves the selected zone."""
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check that the entity was previously in the selected zone."""
return self._in_target_zone(from_state)
def is_valid_state(self, state: State) -> bool:
"""Check that the entity is no longer in the selected zone."""
return not self._in_target_zone(state)
_OCCUPANCY_TRIGGER_SCHEMA = vol.Schema(
{
vol.Required(CONF_OPTIONS, default={}): {
vol.Required(CONF_ZONE): cv.entity_id,
vol.Optional(CONF_FOR): cv.positive_time_period,
},
}
)
class _ZoneOccupancyTriggerBase(EntityTriggerBase):
"""Base for zone occupancy triggers (single zone, no behavior)."""
_domain_specs = {"zone": DomainSpec()}
_schema = _OCCUPANCY_TRIGGER_SCHEMA
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config and synthesize a target from the zone option."""
config = cast(ConfigType, cls._schema(config))
config[CONF_TARGET] = {CONF_ENTITY_ID: [config[CONF_OPTIONS][CONF_ZONE]]}
return config
@staticmethod
def _occupancy_count(state: State) -> int | None:
"""Return the zone's persons-in-zone count; None if unparsable."""
try:
return int(state.state)
except TypeError, ValueError:
return None
@classmethod
def _is_occupied(cls, state: State) -> bool:
"""Return True if the zone has at least one occupant."""
count = cls._occupancy_count(state)
return count is not None and count >= 1
class OccupancyDetectedTrigger(_ZoneOccupancyTriggerBase):
"""Trigger when a zone transitions to an occupied state."""
def is_valid_state(self, state: State) -> bool:
"""Check that the zone is occupied."""
return self._is_occupied(state)
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check that the zone was previously not occupied."""
return not self._is_occupied(from_state)
class OccupancyClearedTrigger(_ZoneOccupancyTriggerBase):
"""Trigger when a zone transitions from occupied to unoccupied."""
def is_valid_state(self, state: State) -> bool:
"""Check that the zone is empty (count == 0)."""
return self._occupancy_count(state) == 0
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check that the zone was previously occupied."""
return self._is_occupied(from_state)
TRIGGERS: dict[str, type[Trigger]] = {
"_": LegacyZoneTrigger,
"entered": EnteredZoneTrigger,
"left": LeftZoneTrigger,
"occupancy_detected": OccupancyDetectedTrigger,
"occupancy_cleared": OccupancyClearedTrigger,
}
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
"""Return the triggers for zones."""
return TRIGGERS
@@ -0,0 +1,42 @@
.trigger_zone: &trigger_zone
target:
entity:
domain:
- person
- device_tracker
fields:
behavior:
required: true
default: each
selector:
automation_behavior:
mode: trigger
for:
required: true
default: 00:00:00
selector:
duration:
zone:
required: true
selector:
entity:
domain: zone
entered: *trigger_zone
left: *trigger_zone
.trigger_occupancy: &trigger_occupancy
fields:
for:
required: true
default: 00:00:00
selector:
duration:
zone:
required: true
selector:
entity:
domain: zone
occupancy_detected: *trigger_occupancy
occupancy_cleared: *trigger_occupancy
+8 -1
View File
@@ -1714,7 +1714,14 @@ def async_extract_entities(trigger_conf: dict) -> list[str]:
return [trigger_conf[CONF_OPTIONS][CONF_ENTITY_ID]]
if trigger_conf[CONF_PLATFORM] == "zone":
return trigger_conf[CONF_ENTITY_ID] + [trigger_conf[CONF_ZONE]] # type: ignore[no-any-return]
options = trigger_conf[CONF_OPTIONS]
return [*options[CONF_ENTITY_ID], options[CONF_ZONE]]
if trigger_conf[CONF_PLATFORM] in ("zone.entered", "zone.left"):
return [
*async_extract_targets(trigger_conf, CONF_ENTITY_ID),
trigger_conf[CONF_OPTIONS][CONF_ZONE],
]
if trigger_conf[CONF_PLATFORM] == "geo_location":
return [trigger_conf[CONF_ZONE]]
+1 -1
View File
@@ -30,7 +30,7 @@ certifi>=2021.5.30
ciso8601==2.3.3
cronsim==2.7
cryptography==48.0.0
dbus-fast==5.0.15
dbus-fast==5.0.16
file-read-backwards==2.0.0
fnv-hash-fast==2.0.3
go2rtc-client==0.4.0
+6 -6
View File
@@ -190,7 +190,7 @@ aioairzone-cloud==0.7.2
aioairzone==1.0.5
# homeassistant.components.alexa_devices
aioamazondevices==13.8.0
aioamazondevices==13.8.1
# homeassistant.components.ambient_network
# homeassistant.components.ambient_station
@@ -797,7 +797,7 @@ datadog==0.52.0
datapoint==0.12.1
# homeassistant.components.bluetooth
dbus-fast==5.0.15
dbus-fast==5.0.16
# homeassistant.components.debugpy
debugpy==1.8.17
@@ -1371,7 +1371,7 @@ insteon-frontend-home-assistant==0.6.2
intellifire4py==4.4.0
# homeassistant.components.iometer
iometer==0.4.0
iometer==1.0.1
# homeassistant.components.iotty
iottycloud==0.3.0
@@ -1737,7 +1737,7 @@ ondilo==0.5.0
onedrive-personal-sdk==0.1.7
# homeassistant.components.onvif
onvif-zeep-async==4.1.0
onvif-zeep-async==4.1.1
# homeassistant.components.onvif
onvif_parsers==2.3.0
@@ -3403,7 +3403,7 @@ yalexs-ble==3.3.0
# homeassistant.components.august
# homeassistant.components.yale
yalexs==9.2.0
yalexs==9.2.1
# homeassistant.components.yeelight
yeelight==0.7.16
@@ -3442,7 +3442,7 @@ zeroconf==0.149.16
zeversolar==0.3.2
# homeassistant.components.zha
zha==1.3.1
zha==1.4.0
# homeassistant.components.zhong_hong
zhong-hong-hvac==1.0.13
+29 -10
View File
@@ -1586,12 +1586,12 @@ async def _validate_trigger_options(
options: dict[str, Any] | None,
*,
valid: bool,
supports_target: bool = True,
) -> None:
"""Assert that a trigger accepts or rejects the given options during validation."""
trigger_config: dict[str, Any] = {
CONF_PLATFORM: trigger,
CONF_TARGET: {ATTR_LABEL_ID: "test_label"},
}
trigger_config: dict[str, Any] = {CONF_PLATFORM: trigger}
if supports_target:
trigger_config[CONF_TARGET] = {ATTR_LABEL_ID: "test_label"}
if options is not None:
trigger_config[CONF_OPTIONS] = options
if valid:
@@ -1608,6 +1608,7 @@ async def assert_trigger_options_supported(
*,
supports_behavior: bool,
supports_duration: bool,
supports_target: bool = True,
) -> None:
"""Assert which options a trigger supports.
@@ -1624,9 +1625,15 @@ async def assert_trigger_options_supported(
# Minimal config should always be valid
supports_empty = not bool(base_options)
await _validate_trigger_options(hass, trigger, None, valid=supports_empty)
await _validate_trigger_options(hass, trigger, {}, valid=supports_empty)
await _validate_trigger_options(hass, trigger, base_options, valid=True)
await _validate_trigger_options(
hass, trigger, None, valid=supports_empty, supports_target=supports_target
)
await _validate_trigger_options(
hass, trigger, {}, valid=supports_empty, supports_target=supports_target
)
await _validate_trigger_options(
hass, trigger, base_options, valid=True, supports_target=supports_target
)
def _merge(extra: dict[str, Any]) -> dict[str, Any]:
return {**(base_options or {}), **extra}
@@ -1634,18 +1641,30 @@ async def assert_trigger_options_supported(
# Behavior
for behavior in ("each", "first", "all"):
await _validate_trigger_options(
hass, trigger, _merge({"behavior": behavior}), valid=supports_behavior
hass,
trigger,
_merge({"behavior": behavior}),
valid=supports_behavior,
supports_target=supports_target,
)
# Duration
for for_value in ({"seconds": 5}, "00:00:05", 5):
await _validate_trigger_options(
hass, trigger, _merge({"for": for_value}), valid=supports_duration
hass,
trigger,
_merge({"for": for_value}),
valid=supports_duration,
supports_target=supports_target,
)
# Unknown option should always be rejected
await _validate_trigger_options(
hass, trigger, _merge({"unknown_option": True}), valid=False
hass,
trigger,
_merge({"unknown_option": True}),
valid=False,
supports_target=supports_target,
)
@@ -1379,6 +1379,76 @@ def test_base_tracker_entity() -> None:
entity.state_attributes # noqa: B018
async def test_attr_location_name_deprecation_warning(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that setting _attr_location_name logs a deprecation warning."""
error_message = "is setting the deprecated _attr_location_name attribute"
class _Subclass(TrackerEntity):
pass
# No warning when _attr_location_name is unset (default None)
entity_no_attr = _Subclass()
entity_no_attr.hass = hass
assert entity_no_attr.location_name is None
assert error_message not in caplog.text
# Warning fires when _attr_location_name has a non-None value
entity = _Subclass()
entity.hass = hass
entity._attr_location_name = "the_zone"
caplog.clear()
assert entity.location_name == "the_zone"
assert error_message in caplog.text
# Warning does not fire again on subsequent access for the same instance
caplog.clear()
assert entity.location_name == "the_zone"
assert error_message not in caplog.text
# Warning is suppressed for this instance even after the cached value is
# invalidated by a subsequent _attr_location_name assignment.
entity._attr_location_name = "another_zone"
caplog.clear()
assert entity.location_name == "another_zone"
assert error_message not in caplog.text
# A fresh instance warns once again
entity_new = _Subclass()
entity_new.hass = hass
entity_new._attr_location_name = "the_zone"
caplog.clear()
assert entity_new.location_name == "the_zone"
assert error_message in caplog.text
def test_location_name_override_deprecation_warning(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that overriding location_name in a subclass logs a warning."""
error_message = "is overriding the deprecated location_name property"
caplog.clear()
class _SubclassWithOverride(TrackerEntity):
@property
def location_name(self) -> str | None:
return "custom"
assert error_message in caplog.text
assert _SubclassWithOverride.__name__ in caplog.text
# No warning for a subclass that does not override location_name
caplog.clear()
class _SubclassWithoutOverride(TrackerEntity):
pass
assert error_message not in caplog.text
@pytest.mark.parametrize(
("mac_address", "unique_id"), [(TEST_MAC_ADDRESS, f"{TEST_MAC_ADDRESS}_yo1")]
)
@@ -1,5 +1,6 @@
{
"__typename": "iometer.reading.v1",
"installationId": "658c2b34-2017-45f2-a12b-731235f8bb97",
"meter": {
"number": "1ISK0000000000",
"reading": {
@@ -1,5 +1,6 @@
{
"__typename": "iometer.status.v1",
"installationId": "658c2b34-2017-45f2-a12b-731235f8bb97",
"meter": {
"number": "1ISK0000000000"
},
File diff suppressed because it is too large Load Diff
+99 -1
View File
@@ -12,7 +12,7 @@ from syrupy.assertion import SnapshotAssertion
from homeassistant.components.matter.binary_sensor import (
DISCOVERY_SCHEMAS as BINARY_SENSOR_SCHEMAS,
)
from homeassistant.const import Platform
from homeassistant.const import EntityCategory, Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
@@ -733,3 +733,101 @@ async def test_co_detector(
state = hass.states.get("binary_sensor.smart_co_sensor_carbon_monoxide")
assert state
assert state.state == "off"
@pytest.mark.usefixtures("entity_registry_enabled_by_default")
@pytest.mark.parametrize("node_fixture", ["device_diagnostics"])
async def test_general_diagnostics_fault_sensors(
hass: HomeAssistant,
entity_registry: er.EntityRegistry,
matter_client: MagicMock,
matter_node: MatterNode,
) -> None:
"""Test GeneralDiagnostics active fault binary sensors."""
# ActiveHardwareFaults (cluster 51, attr 5) = [] (no faults)
entity_id = "binary_sensor.m5stamp_lighting_app_hardware_faults"
state = hass.states.get(entity_id)
assert state
assert state.state == "off"
entry = entity_registry.async_get(entity_id)
assert entry
assert entry.entity_category is EntityCategory.DIAGNOSTIC
# Simulate hardware fault
set_node_attribute(matter_node, 0, 51, 5, [1])
await trigger_subscription_callback(hass, matter_client)
state = hass.states.get(entity_id)
assert state
assert state.state == "on"
# Clear faults
set_node_attribute(matter_node, 0, 51, 5, [])
await trigger_subscription_callback(hass, matter_client)
state = hass.states.get(entity_id)
assert state
assert state.state == "off"
# ActiveRadioFaults (cluster 51, attr 6) = [] (no faults)
state = hass.states.get("binary_sensor.m5stamp_lighting_app_radio_faults")
assert state
assert state.state == "off"
set_node_attribute(matter_node, 0, 51, 6, [1])
await trigger_subscription_callback(hass, matter_client)
state = hass.states.get("binary_sensor.m5stamp_lighting_app_radio_faults")
assert state
assert state.state == "on"
set_node_attribute(matter_node, 0, 51, 6, [])
await trigger_subscription_callback(hass, matter_client)
state = hass.states.get("binary_sensor.m5stamp_lighting_app_radio_faults")
assert state
assert state.state == "off"
# ActiveNetworkFaults (cluster 51, attr 7) = [] (no faults)
state = hass.states.get("binary_sensor.m5stamp_lighting_app_network_faults")
assert state
assert state.state == "off"
set_node_attribute(matter_node, 0, 51, 7, [1])
await trigger_subscription_callback(hass, matter_client)
state = hass.states.get("binary_sensor.m5stamp_lighting_app_network_faults")
assert state
assert state.state == "on"
set_node_attribute(matter_node, 0, 51, 7, [])
await trigger_subscription_callback(hass, matter_client)
state = hass.states.get("binary_sensor.m5stamp_lighting_app_network_faults")
assert state
assert state.state == "off"
entry = entity_registry.async_get(
"binary_sensor.m5stamp_lighting_app_hardware_faults"
)
assert entry
assert entry.entity_category is EntityCategory.DIAGNOSTIC
@pytest.mark.parametrize("node_fixture", ["device_diagnostics"])
async def test_general_diagnostics_fault_sensors_disabled_by_default(
hass: HomeAssistant,
entity_registry: er.EntityRegistry,
matter_client: MagicMock,
matter_node: MatterNode,
) -> None:
"""Test GeneralDiagnostics fault binary sensors are disabled by default."""
for entity_id in (
"binary_sensor.m5stamp_lighting_app_hardware_faults",
"binary_sensor.m5stamp_lighting_app_radio_faults",
"binary_sensor.m5stamp_lighting_app_network_faults",
):
entry = entity_registry.async_get(entity_id)
assert entry, f"Expected {entity_id} to be registered"
assert entry.disabled_by is er.RegistryEntryDisabler.INTEGRATION
+73
View File
@@ -885,3 +885,76 @@ async def test_bridged_device_reachable_updates_availability(
state = hass.states.get(entity_id)
assert state
assert state.state != STATE_UNAVAILABLE
@pytest.mark.freeze_time("2025-01-01T14:00:00+00:00")
@pytest.mark.usefixtures("entity_registry_enabled_by_default")
@pytest.mark.parametrize("node_fixture", ["device_diagnostics"])
async def test_general_diagnostics_sensors(
hass: HomeAssistant,
entity_registry: er.EntityRegistry,
matter_client: MagicMock,
matter_node: MatterNode,
) -> None:
"""Test GeneralDiagnostics cluster sensors."""
# RebootCount (cluster 51, attr 1) = 3
state = hass.states.get("sensor.m5stamp_lighting_app_reboot_count")
assert state
assert state.state == "3"
set_node_attribute(matter_node, 0, 51, 1, 5)
await trigger_subscription_callback(hass, matter_client)
state = hass.states.get("sensor.m5stamp_lighting_app_reboot_count")
assert state
assert state.state == "5"
entry = entity_registry.async_get("sensor.m5stamp_lighting_app_reboot_count")
assert entry
assert entry.entity_category == EntityCategory.DIAGNOSTIC
# UpTime (cluster 51, attr 2) = 213 seconds → boot at now - 213s
state = hass.states.get("sensor.m5stamp_lighting_app_uptime")
assert state
assert state.state == "2025-01-01T13:56:27+00:00"
set_node_attribute(matter_node, 0, 51, 2, 3600)
await trigger_subscription_callback(hass, matter_client)
state = hass.states.get("sensor.m5stamp_lighting_app_uptime")
assert state
assert state.state == "2025-01-01T13:00:00+00:00"
# BootReason (cluster 51, attr 4) = 1 (PowerOnReboot)
state = hass.states.get("sensor.m5stamp_lighting_app_boot_reason")
assert state
assert state.state == "power_on_reboot"
set_node_attribute(matter_node, 0, 51, 4, 6)
await trigger_subscription_callback(hass, matter_client)
state = hass.states.get("sensor.m5stamp_lighting_app_boot_reason")
assert state
assert state.state == "software_reset"
entry = entity_registry.async_get("sensor.m5stamp_lighting_app_boot_reason")
assert entry
assert entry.entity_category == EntityCategory.DIAGNOSTIC
@pytest.mark.parametrize("node_fixture", ["device_diagnostics"])
async def test_general_diagnostics_sensors_disabled_by_default(
hass: HomeAssistant,
entity_registry: er.EntityRegistry,
matter_client: MagicMock,
matter_node: MatterNode,
) -> None:
"""Test GeneralDiagnostics sensors are disabled by default."""
for entity_id in (
"sensor.m5stamp_lighting_app_reboot_count",
"sensor.m5stamp_lighting_app_uptime",
"sensor.m5stamp_lighting_app_boot_reason",
):
entry = entity_registry.async_get(entity_id)
assert entry, f"Expected {entity_id} to be registered"
assert entry.disabled_by is er.RegistryEntryDisabler.INTEGRATION
+12 -12
View File
@@ -10,35 +10,35 @@ import pytest
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from . import spot_price_fetcher
from tests.common import MockConfigEntry, async_fire_time_changed
@freeze_time("2024-01-15T14:01:00Z")
async def test_sensor_setup(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_pyomie: MagicMock,
entity_registry: er.EntityRegistry,
mock_omie_results_jan15: OMIEResults,
) -> None:
"""Test sensor platform setup."""
mock_pyomie.spot_price.side_effect = spot_price_fetcher(
{
"2024-01-15": mock_omie_results_jan15,
}
)
mock_config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
entities = er.async_entries_for_config_entry(
entity_registry, mock_config_entry.entry_id
)
# Should have 2 sensors (PT and ES)
assert len(entities) == 2
unique_ids = {entity.unique_id for entity in entities}
expected_ids = {"pt_spot_price", "es_spot_price"}
assert unique_ids == expected_ids
assert (pt_state := hass.states.get("sensor.omie_portugal_spot_price"))
assert (es_state := hass.states.get("sensor.omie_spain_spot_price"))
assert pt_state.state == "351151500.0"
assert es_state.state == "34151500.0"
@pytest.mark.usefixtures("hass_lisbon")
@@ -246,29 +246,13 @@
'routes': list([
]),
'rssi': None,
'version': 1,
'version': 2,
'zha_lib_entities': dict({
'alarm_control_panel': list([
dict({
'info_object': dict({
'available': True,
'class_name': 'AlarmControlPanel',
'cluster_handlers': list([
dict({
'class_name': 'IasAceClientClusterHandler',
'cluster': dict({
'id': 1281,
'name': 'IAS Ancillary Control Equipment',
'type': 'client',
}),
'endpoint_id': 1,
'generic_id': 'cluster_handler_0x0501_client',
'id': '1:0x0501_client',
'status': 'INITIALIZED',
'unique_id': '**REDACTED**',
'value_attribute': None,
}),
]),
'code_arm_required': False,
'code_format': 'number',
'device_class': None,
@@ -302,22 +286,6 @@
'attribute_name': 'zone_status',
'available': True,
'class_name': 'IASZone',
'cluster_handlers': list([
dict({
'class_name': 'IASZoneClusterHandler',
'cluster': dict({
'id': 1280,
'name': 'IAS Zone',
'type': 'server',
}),
'endpoint_id': 1,
'generic_id': 'cluster_handler_0x0500',
'id': '1:0x0500',
'status': 'INITIALIZED',
'unique_id': '**REDACTED**',
'value_attribute': None,
}),
]),
'device_class': None,
'device_ieee': '**REDACTED**',
'enabled': True,
+1 -6
View File
@@ -102,12 +102,7 @@ async def test_cover(
entity_id = find_entity_id(Platform.COVER, zha_device_proxy, hass)
assert entity_id is not None
assert (
not zha_device_proxy.device.endpoints[1]
.all_cluster_handlers[f"1:0x{cluster.cluster_id:04x}"]
.inverted
)
assert cluster.read_attributes.call_count == 3
assert cluster.read_attributes.call_count == 2
assert (
WCAttrs.current_position_lift_percentage.name
in cluster.read_attributes.call_args[0][0]
+2 -47
View File
@@ -202,12 +202,9 @@ async def test_action(
await hass.async_block_till_done()
calls = async_mock_service(hass, DOMAIN, "warning_device_warn")
cluster_handler = (
gateway.get_device(zigpy_device.ieee)
.endpoints[1]
.client_cluster_handlers["1:0x0006_client"]
zigpy_device.endpoints[1].out_clusters[general.OnOff.cluster_id].listener_event(
"zha_send_event", COMMAND_SINGLE, []
)
cluster_handler.zha_send_event(COMMAND_SINGLE, [])
await hass.async_block_till_done()
assert len(calls) == 1
@@ -216,48 +213,6 @@ async def test_action(
assert calls[0].data["ieee"] == ieee_address
async def test_invalid_zha_event_type(
hass: HomeAssistant,
setup_zha: Callable[..., Coroutine[None]],
zigpy_device_mock: Callable[..., Device],
) -> None:
"""Test that unexpected types are not passed to `zha_send_event`."""
await setup_zha()
gateway = get_zha_gateway(hass)
zigpy_device = zigpy_device_mock(
{
1: {
SIG_EP_INPUT: [
general.Basic.cluster_id,
security.IasZone.cluster_id,
security.IasWd.cluster_id,
],
SIG_EP_OUTPUT: [general.OnOff.cluster_id],
SIG_EP_TYPE: zha.DeviceType.ON_OFF_SWITCH,
SIG_EP_PROFILE: zha.PROFILE_ID,
}
}
)
zigpy_device.device_automation_triggers = {
(SHORT_PRESS, SHORT_PRESS): {COMMAND: COMMAND_SINGLE}
}
gateway.get_or_create_device(zigpy_device)
await gateway.async_device_initialized(zigpy_device)
await hass.async_block_till_done(wait_background_tasks=True)
cluster_handler = (
gateway.get_device(zigpy_device.ieee)
.endpoints[1]
.client_cluster_handlers["1:0x0006_client"]
)
# `zha_send_event` accepts only zigpy responses, lists, and dicts
with pytest.raises(TypeError):
cluster_handler.zha_send_event(COMMAND_SINGLE, 123)
async def test_client_unique_id_suffix_stripped(
hass: HomeAssistant,
setup_zha: Callable[..., Coroutine[None]],
+15 -18
View File
@@ -5,10 +5,7 @@ from datetime import timedelta
from unittest.mock import ANY, call, patch
import pytest
from zha.application.const import (
WARNING_DEVICE_MODE_EMERGENCY_PANIC,
WARNING_DEVICE_SOUND_MEDIUM,
)
from zha.application.platforms.siren import SirenLevel, WarningMode
from zigpy.const import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_PROFILE, SIG_EP_TYPE
from zigpy.device import Device
from zigpy.profiles import zha
@@ -108,12 +105,12 @@ async def test_siren(
False,
0,
ANY,
50, # bitmask for default args
5, # duration in seconds
0,
2,
manufacturer=None,
expect_reply=True,
warning=50, # bitmask for default args
warning_duration=5,
strobe_duty_cycle=0,
stobe_level=2,
)
]
@@ -142,12 +139,12 @@ async def test_siren(
False,
0,
ANY,
2, # bitmask for default args
5, # duration in seconds
0,
2,
manufacturer=None,
expect_reply=True,
warning=2, # bitmask for default args
warning_duration=5,
strobe_duty_cycle=0,
stobe_level=2,
)
]
@@ -173,8 +170,8 @@ async def test_siren(
{
"entity_id": entity_id,
ATTR_DURATION: 10,
ATTR_TONE: WARNING_DEVICE_MODE_EMERGENCY_PANIC,
ATTR_VOLUME_LEVEL: WARNING_DEVICE_SOUND_MEDIUM,
ATTR_TONE: WarningMode.Emergency_Panic,
ATTR_VOLUME_LEVEL: SirenLevel.Medium_level_sound,
},
blocking=True,
)
@@ -184,12 +181,12 @@ async def test_siren(
False,
0,
ANY,
97, # bitmask for passed args
10, # duration in seconds
0,
2,
manufacturer=None,
expect_reply=True,
warning=97, # bitmask for passed args
warning_duration=10,
strobe_duty_cycle=0,
stobe_level=2,
)
]
# test that the state has changed to on
+16 -19
View File
@@ -20,8 +20,12 @@ from zha.application.const import (
ATTR_TYPE,
CLUSTER_TYPE_IN,
)
from zha.zigbee.cluster_handlers import ClusterBindEvent, ClusterConfigureReportingEvent
from zha.zigbee.device import ClusterHandlerConfigurationComplete, Device
from zha.zigbee.device import (
ClusterBindEvent,
ClusterConfigureReportingEvent,
Device,
DeviceConfiguredEvent,
)
import zigpy.backups
from zigpy.const import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_PROFILE, SIG_EP_TYPE
import zigpy.profiles.zha
@@ -1179,10 +1183,12 @@ async def test_websocket_reconfigure(
zha_device_proxy = get_zha_gateway_proxy(hass).get_device_proxy(zha_device.ieee)
async def mock_reinterview(ieee: EUI64) -> None:
zha_device_proxy.handle_zha_channel_configure_reporting(
zha_device_proxy.handle_zha_cluster_configure_reporting(
ClusterConfigureReportingEvent(
cluster_name="Window Covering",
device_ieee=zha_device_proxy.device.ieee,
endpoint_id=1,
cluster_id=258,
cluster_name="Window Covering",
attributes={
"current_position_lift_percentage": {
"min": 0,
@@ -1201,30 +1207,21 @@ async def test_websocket_reconfigure(
"status": "SUCCESS",
},
},
cluster_handler_unique_id="28:2c:02:bf:ff:ea:05:68:1:0x0102",
event_type="zha_channel_message",
event="zha_channel_configure_reporting",
)
)
zha_device_proxy.handle_zha_channel_bind(
zha_device_proxy.handle_zha_cluster_bind(
ClusterBindEvent(
cluster_name="Window Covering",
device_ieee=zha_device_proxy.device.ieee,
endpoint_id=1,
cluster_id=1,
cluster_name="Window Covering",
success=True,
cluster_handler_unique_id="28:2c:02:bf:ff:ea:05:68:1:0x0012",
event_type="zha_channel_message",
event="zha_channel_bind",
)
)
zha_device_proxy.handle_zha_channel_cfg_done(
ClusterHandlerConfigurationComplete(
device_ieee="28:2c:02:bf:ff:ea:05:68",
unique_id="28:2c:02:bf:ff:ea:05:68",
event_type="zha_channel_message",
event="zha_channel_cfg_done",
)
zha_device_proxy.handle_zha_device_configured(
DeviceConfiguredEvent(device_ieee=zha_device_proxy.device.ieee)
)
with patch.object(
+411 -9
View File
@@ -1,14 +1,36 @@
"""The tests for the location automation."""
from datetime import timedelta
from typing import Any
from freezegun.api import FrozenDateTimeFactory
import pytest
import voluptuous as vol
from homeassistant.components import automation, zone
from homeassistant.const import ATTR_ENTITY_ID, ENTITY_MATCH_ALL, SERVICE_TURN_OFF
from homeassistant.const import (
ATTR_ENTITY_ID,
ENTITY_MATCH_ALL,
SERVICE_TURN_OFF,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import Context, HomeAssistant, ServiceCall
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.trigger import async_validate_trigger_config
from homeassistant.setup import async_setup_component
from tests.common import mock_component
from tests.common import async_fire_time_changed, mock_component
from tests.components.common import (
TriggerStateDescription,
assert_trigger_behavior_all,
assert_trigger_behavior_each,
assert_trigger_behavior_first,
assert_trigger_options_supported,
parametrize_target_entities,
parametrize_trigger_states,
target_entities,
)
@pytest.fixture(autouse=True)
@@ -343,10 +365,7 @@ async def test_unknown_zone(
},
)
assert (
"Automation 'My Automation' is referencing non-existing zone"
" 'zone.no_such_zone' in a zone trigger" not in caplog.text
)
assert "Non-existing zone 'zone.no_such_zone' in a zone trigger" not in caplog.text
hass.states.async_set(
"test.entity",
@@ -356,7 +375,390 @@ async def test_unknown_zone(
)
await hass.async_block_till_done()
assert (
"Automation 'My Automation' is referencing non-existing zone"
" 'zone.no_such_zone' in a zone trigger" in caplog.text
assert "Non-existing zone 'zone.no_such_zone' in a zone trigger" in caplog.text
# --- New-style zone trigger tests ---
ZONE_HOME = "zone.home"
ZONE_WORK = "zone.work"
IN_ZONES_HOME = {"in_zones": [ZONE_HOME]}
IN_ZONES_WORK = {"in_zones": [ZONE_WORK]}
IN_ZONES_NONE: dict[str, list[str]] = {"in_zones": []}
TRIGGER_ZONE = ZONE_HOME
@pytest.mark.parametrize(
("trigger_key", "base_options", "supports_behavior", "supports_duration"),
[
("zone.entered", {"zone": TRIGGER_ZONE}, True, True),
("zone.left", {"zone": TRIGGER_ZONE}, True, True),
],
)
async def test_zone_trigger_options_validation(
hass: HomeAssistant,
trigger_key: str,
base_options: dict[str, Any] | None,
supports_behavior: bool,
supports_duration: bool,
) -> None:
"""Test that zone triggers support the expected options."""
await assert_trigger_options_supported(
hass,
trigger_key,
base_options,
supports_behavior=supports_behavior,
supports_duration=supports_duration,
)
@pytest.mark.parametrize("trigger_key", ["zone.entered", "zone.left"])
async def test_zone_trigger_rejects_non_zone_entity_id(
hass: HomeAssistant, trigger_key: str
) -> None:
"""Test that the zone option must reference entities in the zone domain."""
with pytest.raises(vol.Invalid):
await async_validate_trigger_config(
hass,
[
{
"platform": trigger_key,
"target": {"entity_id": "person.alice"},
"options": {"zone": "person.alice"},
}
],
)
@pytest.fixture
async def target_zone_entities(
hass: HomeAssistant, domain: str
) -> dict[str, list[str]]:
"""Create multiple zone-trackable entities associated with different targets."""
return await target_entities(hass, domain, domain_excluded="sensor")
_ZONE_TRIGGER_STATES = [
*parametrize_trigger_states(
trigger="zone.entered",
trigger_options={"zone": TRIGGER_ZONE},
target_states=[
("home", IN_ZONES_HOME),
],
other_states=[
("not_home", IN_ZONES_NONE),
("Work", IN_ZONES_WORK),
],
),
*parametrize_trigger_states(
trigger="zone.left",
trigger_options={"zone": TRIGGER_ZONE},
target_states=[
("not_home", IN_ZONES_NONE),
("Work", IN_ZONES_WORK),
],
other_states=[
("home", IN_ZONES_HOME),
],
),
]
def _parametrize_zone_target_entities() -> list[tuple[dict[str, Any], str, int, str]]:
"""Parametrize target entities for all supported zone trigger domains."""
return [
(*params, domain)
for domain in ("person", "device_tracker")
for params in parametrize_target_entities(domain)
]
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target", "domain"),
_parametrize_zone_target_entities(),
)
@pytest.mark.parametrize(
("trigger", "trigger_options", "states"),
_ZONE_TRIGGER_STATES,
)
async def test_zone_trigger_behavior_each(
hass: HomeAssistant,
target_zone_entities: dict[str, list[str]],
trigger_target_config: dict[str, Any],
entity_id: str,
entities_in_target: int,
trigger: str,
trigger_options: dict[str, Any],
states: list[TriggerStateDescription],
) -> None:
"""Test zone triggers fire when any targeted entity changes."""
await assert_trigger_behavior_each(
hass,
target_entities=target_zone_entities,
trigger_target_config=trigger_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
trigger=trigger,
trigger_options=trigger_options,
states=states,
)
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target", "domain"),
_parametrize_zone_target_entities(),
)
@pytest.mark.parametrize(
("trigger", "trigger_options", "states"),
_ZONE_TRIGGER_STATES,
)
async def test_zone_trigger_behavior_first(
hass: HomeAssistant,
target_zone_entities: dict[str, list[str]],
trigger_target_config: dict[str, Any],
entity_id: str,
entities_in_target: int,
trigger: str,
trigger_options: dict[str, Any],
states: list[TriggerStateDescription],
) -> None:
"""Test zone triggers fire when first targeted entity changes."""
await assert_trigger_behavior_first(
hass,
target_entities=target_zone_entities,
trigger_target_config=trigger_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
trigger=trigger,
trigger_options=trigger_options,
states=states,
)
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target", "domain"),
_parametrize_zone_target_entities(),
)
@pytest.mark.parametrize(
("trigger", "trigger_options", "states"),
_ZONE_TRIGGER_STATES,
)
async def test_zone_trigger_behavior_all(
hass: HomeAssistant,
target_zone_entities: dict[str, list[str]],
trigger_target_config: dict[str, Any],
entity_id: str,
entities_in_target: int,
trigger: str,
trigger_options: dict[str, Any],
states: list[TriggerStateDescription],
) -> None:
"""Test zone triggers fire when last targeted entity changes."""
await assert_trigger_behavior_all(
hass,
target_entities=target_zone_entities,
trigger_target_config=trigger_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
trigger=trigger,
trigger_options=trigger_options,
states=states,
)
# --- Zone occupancy trigger tests ---
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("trigger_key"),
["zone.occupancy_detected", "zone.occupancy_cleared"],
)
async def test_zone_occupancy_trigger_options_validation(
hass: HomeAssistant,
trigger_key: str,
) -> None:
"""Test that occupancy triggers support the expected options."""
await assert_trigger_options_supported(
hass,
trigger_key,
{"zone": ZONE_HOME},
supports_behavior=False,
supports_duration=True,
supports_target=False,
)
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("trigger_key", "from_state", "to_state", "should_fire"),
[
# occupancy_detected
pytest.param("zone.occupancy_detected", "0", "1", True, id="detected_0_to_1"),
pytest.param("zone.occupancy_detected", "0", "3", True, id="detected_0_to_3"),
pytest.param("zone.occupancy_detected", "1", "2", False, id="detected_1_to_2"),
pytest.param("zone.occupancy_detected", "2", "0", False, id="detected_2_to_0"),
pytest.param(
"zone.occupancy_detected",
STATE_UNKNOWN,
"1",
False,
id="detected_unknown_to_1",
),
pytest.param(
"zone.occupancy_detected",
STATE_UNAVAILABLE,
"1",
False,
id="detected_unavailable_to_1",
),
pytest.param(
"zone.occupancy_detected",
"0",
STATE_UNAVAILABLE,
False,
id="detected_0_to_unavailable",
),
# occupancy_cleared
pytest.param("zone.occupancy_cleared", "1", "0", True, id="cleared_1_to_0"),
pytest.param("zone.occupancy_cleared", "3", "0", True, id="cleared_3_to_0"),
pytest.param("zone.occupancy_cleared", "2", "1", False, id="cleared_2_to_1"),
pytest.param("zone.occupancy_cleared", "0", "1", False, id="cleared_0_to_1"),
pytest.param(
"zone.occupancy_cleared",
"1",
STATE_UNAVAILABLE,
False,
id="cleared_1_to_unavailable",
),
pytest.param(
"zone.occupancy_cleared",
"1",
STATE_UNKNOWN,
False,
id="cleared_1_to_unknown",
),
],
)
async def test_zone_occupancy_trigger_transitions(
hass: HomeAssistant,
service_calls: list[ServiceCall],
trigger_key: str,
from_state: str,
to_state: str,
should_fire: bool,
) -> None:
"""Test occupancy triggers fire on the expected numeric-state transitions."""
hass.states.async_set(ZONE_HOME, from_state)
await hass.async_block_till_done()
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
"trigger": trigger_key,
"options": {"zone": ZONE_HOME},
},
"action": {"service": "test.automation"},
}
},
)
hass.states.async_set(ZONE_HOME, to_state)
await hass.async_block_till_done()
assert (len(service_calls) == 1) is should_fire
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("trigger_key", "from_value", "to_value", "revert_value"),
[
("zone.occupancy_detected", "0", "1", "0"),
("zone.occupancy_cleared", "1", "0", "1"),
],
)
async def test_zone_occupancy_trigger_for_duration(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
service_calls: list[ServiceCall],
trigger_key: str,
from_value: str,
to_value: str,
revert_value: str,
) -> None:
"""Test that `for` delays the firing and an early revert cancels it."""
hass.states.async_set(ZONE_HOME, from_value)
await hass.async_block_till_done()
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
"trigger": trigger_key,
"options": {"zone": ZONE_HOME, "for": {"seconds": 5}},
},
"action": {"service": "test.automation"},
}
},
)
# Transition, then revert before the duration elapses -> no fire.
hass.states.async_set(ZONE_HOME, to_value)
await hass.async_block_till_done()
hass.states.async_set(ZONE_HOME, revert_value)
await hass.async_block_till_done()
freezer.tick(timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(service_calls) == 0
# Transition and hold past the duration -> fire once.
hass.states.async_set(ZONE_HOME, to_value)
await hass.async_block_till_done()
freezer.tick(timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(service_calls) == 1
@pytest.mark.usefixtures("enable_labs_preview_features")
async def test_zone_occupancy_trigger_payload(
hass: HomeAssistant, service_calls: list[ServiceCall]
) -> None:
"""Test the payload exposed to the action template."""
hass.states.async_set(ZONE_HOME, "0")
await hass.async_block_till_done()
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
"trigger": "zone.occupancy_detected",
"options": {"zone": ZONE_HOME},
},
"action": {
"service": "test.automation",
"data_template": {
"some": (
"{{ trigger.entity_id }}"
" - {{ trigger.from_state.state }}"
" - {{ trigger.to_state.state }}"
" - {{ trigger.for }}"
)
},
},
}
},
)
hass.states.async_set(ZONE_HOME, "2")
await hass.async_block_till_done()
assert len(service_calls) == 1
assert service_calls[0].data["some"] == f"{ZONE_HOME} - 0 - 2 - None"
+137 -19
View File
@@ -14,6 +14,7 @@ from pytest_unordered import unordered
import voluptuous as vol
from homeassistant.components import automation
from homeassistant.components.device_automation import DEVICE_TRIGGER_BASE_SCHEMA
from homeassistant.components.sun import DOMAIN as SUN_DOMAIN
from homeassistant.components.system_health import DOMAIN as SYSTEM_HEALTH_DOMAIN
from homeassistant.components.tag import DOMAIN as TAG_DOMAIN
@@ -41,7 +42,11 @@ from homeassistant.core import (
callback,
)
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv, trigger
from homeassistant.helpers import (
config_validation as cv,
device_registry as dr,
trigger,
)
from homeassistant.helpers.automation import (
DomainSpec,
move_top_level_schema_fields_to_options,
@@ -78,6 +83,7 @@ from homeassistant.util.unit_conversion import TemperatureConverter
from homeassistant.util.yaml.loader import parse_yaml
from tests.common import (
MockConfigEntry,
MockModule,
MockPlatform,
async_fire_time_changed,
@@ -4618,6 +4624,34 @@ async def test_entity_trigger_duration_cancelled_on_invalid_state(
unsub()
@pytest.fixture
def mock_test_modern_trigger(hass: HomeAssistant) -> None:
"""Register a mock 'test' integration and trigger platform exposing 'test.modern'."""
class MockModernTrigger(Trigger):
"""Mock modern trigger that accepts any options/target."""
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
return config
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
return lambda: None
async def async_get_triggers(
hass: HomeAssistant,
) -> dict[str, type[Trigger]]:
return {"modern": MockModernTrigger}
mock_integration(hass, MockModule("test"))
mock_platform(hass, "test.trigger", Mock(async_get_triggers=async_get_triggers))
@pytest.mark.usefixtures("mock_test_modern_trigger")
@pytest.mark.parametrize(
("trigger_conf", "expected"),
[
@@ -4627,7 +4661,7 @@ async def test_entity_trigger_duration_cancelled_on_invalid_state(
id="state",
),
pytest.param(
{"platform": "numeric_state", "entity_id": ["sensor.a"]},
{"platform": "numeric_state", "entity_id": ["sensor.a"], "above": 5},
["sensor.a"],
id="numeric_state",
),
@@ -4639,36 +4673,61 @@ async def test_entity_trigger_duration_cancelled_on_invalid_state(
pytest.param(
{
"platform": "zone",
"entity_id": ["person.a"],
"zone": "zone.home",
"event": "enter",
"options": {
"entity_id": ["person.a"],
"zone": "zone.home",
"event": "enter",
},
},
["person.a", "zone.home"],
id="zone-legacy",
),
pytest.param(
{"platform": "geo_location", "zone": "zone.home"},
{
"platform": "zone.entered",
"target": {"entity_id": ["person.a", "device_tracker.b"]},
"options": {"zone": "zone.home"},
},
["person.a", "device_tracker.b", "zone.home"],
id="zone-entered-modern",
),
pytest.param(
{
"platform": "zone.left",
"target": {"entity_id": "person.a"},
"options": {"zone": "zone.home"},
},
["person.a", "zone.home"],
id="zone-left-modern",
),
pytest.param(
{"platform": "geo_location", "zone": "zone.home", "source": "test"},
["zone.home"],
id="geo_location",
),
pytest.param(
{"platform": "sun"},
{"platform": "sun", "event": "sunrise"},
["sun.sun"],
id="sun",
),
pytest.param(
{"platform": "event", "event_data": {"entity_id": "sensor.x"}},
{
"platform": "event",
"event_type": "test_event",
"event_data": {"entity_id": "sensor.x"},
},
["sensor.x"],
id="event-with-entity-id",
),
pytest.param(
{"platform": "event"},
{"platform": "event", "event_type": "test_event"},
[],
id="event-without-entity-id",
),
pytest.param(
{
"platform": "event",
"event_type": "test_event",
"event_data": {"entity_id": "not-a-valid-entity-id"},
},
[],
@@ -4677,6 +4736,7 @@ async def test_entity_trigger_duration_cancelled_on_invalid_state(
pytest.param(
{
"platform": "event",
"event_type": "test_event",
"event_data": {"entity_id": ["sensor.x", "sensor.y"]},
},
[],
@@ -4707,38 +4767,89 @@ async def test_entity_trigger_duration_cancelled_on_invalid_state(
),
],
)
def test_async_extract_entities(
trigger_conf: dict[str, Any], expected: list[str]
async def test_async_extract_entities(
hass: HomeAssistant,
trigger_conf: dict[str, Any],
expected: list[str],
) -> None:
"""Test extracting entities from various trigger config shapes."""
[trigger_conf] = await trigger.async_validate_trigger_config(hass, [trigger_conf])
assert trigger.async_extract_entities(trigger_conf) == expected
_MOCK_DEVICE_ID = "_mock_device_id_"
@pytest.fixture
async def mock_device_automation(hass: HomeAssistant) -> str:
"""Register a mock 'test' integration, device_trigger platform, and device.
Returns the device id, which tests substitute for _MOCK_DEVICE_ID in their
parametrized configs so the device platform branch can validate properly.
"""
mock_integration(hass, MockModule("test"))
mock_platform(
hass,
"test.device_trigger",
Mock(
TRIGGER_SCHEMA=DEVICE_TRIGGER_BASE_SCHEMA.extend({}, extra=vol.ALLOW_EXTRA)
),
)
config_entry = MockConfigEntry(domain="test")
config_entry.add_to_hass(hass)
device = dr.async_get(hass).async_get_or_create(
config_entry_id=config_entry.entry_id,
identifiers={("test", "test")},
)
return device.id
def _substitute(obj: Any, placeholder: str, value: str) -> Any:
"""Recursively replace `placeholder` with `value` inside lists/dicts."""
if isinstance(obj, dict):
return {k: _substitute(v, placeholder, value) for k, v in obj.items()}
if isinstance(obj, list):
return [_substitute(v, placeholder, value) for v in obj]
return value if obj == placeholder else obj
@pytest.mark.parametrize(
("trigger_conf", "expected"),
[
pytest.param(
{"platform": "device", "device_id": "abc123"},
["abc123"],
{
"platform": "device",
"device_id": _MOCK_DEVICE_ID,
"domain": "test",
},
[_MOCK_DEVICE_ID],
id="device",
),
pytest.param(
{"platform": "event", "event_data": {"device_id": "abc123"}},
{
"platform": "event",
"event_type": "test_event",
"event_data": {"device_id": "abc123"},
},
["abc123"],
id="event-with-device-id",
),
pytest.param(
{"platform": "event"},
{"platform": "event", "event_type": "test_event"},
[],
id="event-without-device-id",
),
pytest.param(
{"platform": "tag", "device_id": ["abc123", "def456"]},
{
"platform": "tag",
"tag_id": "mytag",
"device_id": ["abc123", "def456"],
},
["abc123", "def456"],
id="tag-with-device-id",
),
pytest.param(
{"platform": "tag"},
{"platform": "tag", "tag_id": "mytag"},
[],
id="tag-without-device-id",
),
@@ -4762,8 +4873,15 @@ def test_async_extract_entities(
),
],
)
def test_async_extract_devices(
trigger_conf: dict[str, Any], expected: list[str]
async def test_async_extract_devices(
hass: HomeAssistant,
mock_test_modern_trigger: None,
mock_device_automation: str,
trigger_conf: dict[str, Any],
expected: list[str],
) -> None:
"""Test extracting devices from various trigger config shapes."""
trigger_conf = _substitute(trigger_conf, _MOCK_DEVICE_ID, mock_device_automation)
expected = _substitute(expected, _MOCK_DEVICE_ID, mock_device_automation)
[trigger_conf] = await trigger.async_validate_trigger_config(hass, [trigger_conf])
assert trigger.async_extract_devices(trigger_conf) == expected
+55 -22
View File
@@ -1,5 +1,6 @@
"""Test methods in backup_restore."""
from io import BytesIO
import json
from pathlib import Path
import tarfile
@@ -386,8 +387,8 @@ def test_restore_backup(
}
def test_restore_backup_filter_files(tmp_path: Path) -> None:
"""Test filtering dangerous files when restoring a backup."""
def test_restore_backup_rejects_unsafe_files(tmp_path: Path) -> None:
"""Test that a backup with unsafe paths is rejected."""
backup_file_path = tmp_path / "backups" / "test.tar"
backup_file_path.parent.mkdir()
get_fixture_path(
@@ -410,7 +411,55 @@ def test_restore_backup_filter_files(tmp_path: Path) -> None:
"data/home-assistant_v2.db-wal",
}
real_extractone = tarfile.TarFile._extract_one
with (
mock.patch(
"homeassistant.backup_restore.restore_backup_file_content",
return_value=backup_restore.RestoreBackupFileContent(
backup_file_path=backup_file_path,
password=None,
remove_after_restore=False,
restore_database=True,
restore_homeassistant=True,
),
),
pytest.raises(tarfile.FilterError),
):
backup_restore.restore_backup(tmp_path.as_posix())
result = restore_result_file_content(tmp_path)
assert result is not None
assert result["success"] is False
assert result["error_type"] in {"AbsolutePathError", "OutsideDestinationError"}
def test_restore_backup_rejects_absolute_symlink(tmp_path: Path) -> None:
"""Test rejection of a symlink whose linkname escapes the destination.
A SYMTYPE entry followed by a regular file whose name traverses the
symlink would otherwise land attacker-controlled bytes outside the
extraction directory. The tar filter resolves the path after the
symlink and rejects the entry.
"""
backup_file_path = tmp_path / "backups" / "test.tar"
backup_file_path.parent.mkdir()
with tarfile.open(backup_file_path, "w") as tar:
backup_json = json.dumps(
{"homeassistant": {"version": "0.0.0"}, "compressed": False}
).encode()
info = tarfile.TarInfo(name="./backup.json")
info.size = len(backup_json)
tar.addfile(info, BytesIO(backup_json))
symlink = tarfile.TarInfo(name="pwn")
symlink.type = tarfile.SYMTYPE
symlink.linkname = "/tmp" # noqa: S108
tar.addfile(symlink)
payload = b"pwned"
evil = tarfile.TarInfo(name="pwn/ha_escape_target")
evil.size = len(payload)
tar.addfile(evil, BytesIO(payload))
with (
mock.patch(
@@ -423,27 +472,11 @@ def test_restore_backup_filter_files(tmp_path: Path) -> None:
restore_homeassistant=True,
),
),
mock.patch(
"tarfile.TarFile._extract_one", autospec=True, wraps=real_extractone
) as extractone_mock,
pytest.raises(tarfile.FilterError),
):
assert backup_restore.restore_backup(tmp_path.as_posix()) is True
backup_restore.restore_backup(tmp_path.as_posix())
# Check the unsafe files are not extracted, and that the safe files are extracted
extracted_files = {call.args[1].name for call in extractone_mock.mock_calls}
assert extracted_files == {
"./backup.json", # From the outer tar
"homeassistant.tar.gz", # From the outer tar
".",
"data",
"data/home-assistant_v2.db",
"data/home-assistant_v2.db-wal",
}
assert restore_result_file_content(tmp_path) == {
"error": None,
"error_type": None,
"success": True,
}
assert not Path("/tmp/ha_escape_target").exists() # noqa: S108
@pytest.mark.parametrize(("remove_after_restore"), [True, False])