Compare commits

...

6 Commits

Author SHA1 Message Date
Rolf
afd7f37323 fix in ui 2026-02-13 21:17:23 +01:00
Rolf
08bb09c418 fix in ui 2026-02-13 21:17:04 +01:00
Rolf
154c3a7af3 fixed 2026-02-04 22:49:05 +01:00
Rolf
ac76798d81 fixed 2026-02-04 22:41:49 +01:00
Rolf
674cf900c0 fixed 2026-02-04 22:02:14 +01:00
Rolf
63644dedc7 fixed bugs 2026-02-04 21:22:51 +01:00
5 changed files with 179 additions and 262 deletions

View File

@@ -1,45 +1,39 @@
"""Uster Waste integration."""
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.const import Platform, SERVICE_UPDATE_ENTITY
from homeassistant.const import Platform
from .const import DOMAIN
from .const import DOMAIN, MANUAL_REFRESH_SERVICE
from .sensor import UsterWasteDataUpdateCoordinator
PLATFORMS = [Platform.SENSOR]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Uster Waste from a config entry."""
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = entry.data
# Register manual refresh service
hass.services.async_register(
DOMAIN,
entry.data.get("name", "uster_waste"),
async_handle_manual_refresh
)
# Set up coordinator *once per entry*
coordinator = UsterWasteDataUpdateCoordinator(hass, entry)
hass.data[DOMAIN][entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
# Register manual refresh service
async def handle_refresh(call):
entry_id = call.data.get("entry_id", entry.entry_id)
coord = hass.data[DOMAIN].get(entry_id)
if coord:
await coord.async_request_refresh()
else:
hass.components.persistent_notification.create(
"Uster Waste: No active entry found. Reload the integration.",
title="Uster Waste Error"
)
hass.services.async_register(DOMAIN, MANUAL_REFRESH_SERVICE, handle_refresh)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
async def async_handle_manual_refresh(service):
"""Handle manual refresh service call."""
entry_id = service.data.get("entry_id")
if not entry_id or entry_id not in hass.data[DOMAIN]:
return
# Trigger update for all sensors under this entry
await hass.services.async_call(
"homeassistant", "update_entity",
{"entity_id": [f"sensor.uster_waste_{entry_id}"]},
blocking=False,
)
"""Unload config entry."""
hass.services.async_remove(DOMAIN, MANUAL_REFRESH_SERVICE)
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -1,6 +1,5 @@
"""Button platform for Uster Waste."""
import logging
from typing import Optional
from homeassistant.components.button import ButtonEntity
from homeassistant.config_entries import ConfigEntry
@@ -18,12 +17,9 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the button."""
config = entry.data
name = config.get("name", "Uster Waste")
entity = UsterWasteButton(
entry_id=entry.entry_id,
name=name
name=entry.data.get("name", "Uster Waste")
)
async_add_entities([entity])
@@ -45,22 +41,16 @@ class UsterWasteButton(ButtonEntity):
async def async_press(self) -> None:
"""Handle the button press (manual refresh)."""
# Trigger a manual update of the sensor
hass = self.hass
coordinator = None
# Trigger a manual update by calling the sensor's update method
# The sensor will fetch fresh data from the Uster website
entity_registry = self.hass.data["entity_registry"]
# Find the coordinator for this entry
if DOMAIN in hass.data:
for entry_id, entry_data in hass.data[DOMAIN].items():
if entry_id == self._entry_id and "coordinator" in entry_data:
coordinator = entry_data["coordinator"]
break
# Find the sensor entity for this entry
entity_id = f"sensor.uster_waste_{self._entry_id}"
if coordinator:
await coordinator.async_update()
# Force sensor to update
for entity in coordinator.entities:
await entity.async_update()
entity.async_write_ha_state()
else:
_LOGGER.error("Coordinator not found for manual refresh")
# Trigger update for the sensor
await self.hass.services.async_call(
"homeassistant", "update_entity",
{"entity_id": entity_id},
blocking=False,
)

View File

@@ -1,48 +1,37 @@
"""Config flow for Uster Waste integration."""
"""Config flow for Uster Waste."""
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.data_entry_flow import FlowResult
from homeassistant.const import CONF_NAME
from .const import DOMAIN
# Suggested defaults (update these as needed)
DEFAULT_NAME = "Uster Waste Schedule"
DATA_SCHEMA = vol.Schema({
vol.Required(CONF_NAME, default=DEFAULT_NAME): str,
vol.Required("token", default=""): str,
vol.Required("id", default=""): str,
vol.Required(CONF_NAME, default="Uster Waste"): str,
vol.Required("token", default="").strip(),
vol.Required("id", default="").strip(),
})
class UsterWasteConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Uster Waste."""
VERSION = 1
async def async_step_user(
self, user_input: dict[str, str] | None = None
) -> FlowResult:
"""Handle the initial step."""
async def async_step_user(self, user_input=None):
if user_input is None:
return self.async_show_form(step_id="user", data_schema=DATA_SCHEMA)
token = user_input["token"].strip()
waste_id = user_input["id"].strip()
if not token or not waste_id:
return self.async_show_form(
step_id="user",
data_schema=DATA_SCHEMA,
errors={"base": "missing_params"},
step_id="user", data_schema=DATA_SCHEMA,
errors={"base": "missing_params"}
)
# Build the URL (store config, not raw URL)
config = {
CONF_NAME: user_input[CONF_NAME],
"token": token,
"id": waste_id,
}
return self.async_create_entry(title=user_input[CONF_NAME], data=config)
return self.async_create_entry(
title=user_input[CONF_NAME],
data={
CONF_NAME: user_input[CONF_NAME],
"token": token,
"id": waste_id,
}
)

View File

@@ -11,3 +11,5 @@ ATTR_ERROR = "error"
# UI Labels
MANUAL_REFRESH = "manual_refresh"
MANUAL_REFRESH_SERVICE = "refresh_waste_schedule"

View File

@@ -1,237 +1,179 @@
"""Sensor platform for Uster Waste."""
import asyncio
"""Uster Waste Sensor."""
import logging
import re
import asyncio
from datetime import datetime, timedelta
from typing import Optional
import aiohttp
from bs4 import BeautifulSoup
from homeassistant.components.sensor import SensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import (
DOMAIN,
ATTR_NEXT_COLLECTION,
ATTR_DATE,
ATTR_TYPE,
ATTR_DAYS_UNTIL,
ATTR_ENTRIES,
ATTR_ERROR,
MANUAL_REFRESH,
from homeassistant.helpers.update_coordinator import (
DataUpdateCoordinator,
UpdateFailed,
)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN, MANUAL_REFRESH_SERVICE
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(days=1)
# ✅ CORRECTED URL FORMAT (no %5B, no extra encoded brackets)
BASE_URL = "https://www.uster.ch/abfallstrassenabschnitt"
# Swiss date helpers
MONTH_MAP = {
# For Swiss date parsing
MONTHS = {
"Jan": "01", "Feb": "02", "Mrz": "03", "Mär": "03",
"Apr": "04", "Mai": "05", "Jun": "06", "Jul": "07",
"Aug": "08", "Sep": "09", "Okt": "10", "Nov": "11", "Dez": "12"
}
def _parse_date(date_str: str) -> Optional[datetime]:
"""Convert Swiss date string (e.g., '24.10.2023' or '24. Okt. 2023') to datetime."""
def parse_swiss_date(date_str: str) -> Optional[datetime]:
"""Parse Swiss date like '24. Okt. 2023' or '24.10.2023'."""
date_str = date_str.strip()
# Normalize Swiss month abbreviations
for key, value in MONTH_MAP.items():
date_str = date_str.replace(key, value)
# Try formats: dd.mm.yyyy, d.m.yy
if not date_str:
return None
# Replace Swiss months (e.g., "Okt" → "10")
for swiss, num in MONTHS.items():
date_str = date_str.replace(swiss, num)
# Clean extra spaces/dots
date_str = re.sub(r'\.\s*\.', '.', date_str)
# Try multiple formats
for fmt in ["%d.%m.%Y", "%d.%m.%y"]:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
_LOGGER.warning(f"Could not parse date: '{date_str}'")
_LOGGER.warning(f"Failed to parse date: {date_str}")
return None
class UsterWasteDataUpdateCoordinator(DataUpdateCoordinator[dict]):
"""Fetch and cache Uster waste data."""
def __init__(self, hass: HomeAssistant, entry: ConfigEntry):
self.hass = hass
self.entry = entry
self.token = entry.data["token"]
self.id = entry.data["id"]
self.name = entry.data.get("name", "Uster Waste")
# ✅ CORRECT URL:
self.url = f"{BASE_URL}?token={self.token}&strassenabschnittId={self.id}"
super().__init__(hass, _LOGGER, name="uster_waste", update_interval=timedelta(days=1))
async def _async_update_data(self) -> dict:
"""Fetch the latest data from Uster."""
session = async_get_clientsession(self.hass)
try:
async with session.get(self.url, timeout=10) as resp:
if resp.status != 200:
raise UpdateFailed(
f"HTTP {resp.status} — check token & ID (token may be expired)."
"Get a fresh URL from https://www.uster.ch/abfallstrassenabschnitt"
)
html = await resp.text()
except aiohttp.ClientError as e:
raise UpdateFailed(f"Network error: {e}") from e
# Parse HTML
soup = BeautifulSoup(html, "html.parser")
# Find the table (robust: class="table table-striped" or any <table>)
table = soup.find("table", class_="table table-striped") or soup.find("table")
if not table:
raise UpdateFailed("No waste schedule table found on Uster.ch page.")
rows = table.find_all("tr")
if len(rows) < 2:
raise UpdateFailed("Table has no data rows — check page layout changed.")
now = datetime.now()
entries = []
for row in rows[1:4]: # First 3 data rows
cols = row.find_all("td")
if len(cols) < 2:
continue
collection = cols[0].get_text(strip=True)
date_str = cols[1].get_text(strip=True)
dt = parse_swiss_date(date_str)
if not dt:
_LOGGER.warning(f"Skipping row: invalid date '{date_str}'")
continue
entries.append({
"type": collection,
"date": date_str,
"days_until": (dt - now).days
})
if not entries:
raise UpdateFailed("No valid dates found in table.")
# Sort ascending by date
entries.sort(key=lambda x: x["date"])
return {
"next_collection": entries[0]["type"],
"date": entries[0]["date"],
"days_until": entries[0]["days_until"],
"entries": entries,
}
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the sensor."""
config = entry.data
token = config["token"]
waste_id = config["id"]
name = config.get("name", "Uster Waste")
session = async_get_clientsession(hass)
coordinator = UsterWasteDataUpdateCoordinator(hass, session, token, waste_id)
entity = UsterWasteSensor(
entry_id=entry.entry_id,
coordinator=coordinator,
name=name
)
async_add_entities([entity], update_before_add=True)
class UsterWasteDataUpdateCoordinator:
"""Fetch data from Uster website."""
def __init__(
self,
hass: HomeAssistant,
session: aiohttp.ClientSession,
token: str,
waste_id: str,
):
self.hass = hass
self.session = session
self.token = token
self.waste_id = waste_id
self.data = None
self.last_error = None
self.last_updated = None
async def async_update(self) -> dict:
"""Fetch data (cache if valid)."""
# 1. Check cache (valid for 24h)
if self.last_updated and (datetime.now() - self.last_updated) < SCAN_INTERVAL:
return self.data
url = (
"https://www.uster.ch/abfallstrassenabschnitt"
f"?strassenabschnitt%5B_token%5D={self.token}"
f"&strassenabschnitt%5BstrassenabschnittId%5D={self.waste_id}"
)
try:
async with self.session.get(url, timeout=10) as response:
if response.status == 403 or response.status == 404:
raise Exception(
"Token expired or invalid. "
"Please get a fresh URL from https://www.uster.ch/abfallstrassenabschnitt"
)
response.raise_for_status()
html = await response.text()
# Parse HTML
soup = BeautifulSoup(html, "html.parser")
table = soup.find("table", class_="table table-striped")
if not table:
table = soup.find("table")
if not table:
raise ValueError("No table found on page.")
rows = table.find_all("tr")
if len(rows) < 2:
raise ValueError("Table has no data rows.")
entries = []
now = datetime.now()
for row in rows[1:4]: # Next 3 entries
cols = row.find_all("td")
if len(cols) < 2:
continue
collection_type = cols[0].get_text(strip=True)
date_str = cols[1].get_text(strip=True).replace(" \u00a0", " ") # Clean no-break space
dt = _parse_date(date_str)
if not dt:
_LOGGER.warning(f"Skipping row with invalid date: {date_str}")
continue
entries.append({
"Sammlung": collection_type,
"Wann?": date_str,
"date_obj": dt,
"days_until": (dt - now).days
})
# Sort by date (ascending)
entries.sort(key=lambda x: x["date_obj"])
self.data = {
"next_collection": entries[0]["Sammlung"] if entries else None,
"date": entries[0]["Wann?"] if entries else None,
"type": entries[0]["Sammlung"] if entries else None,
"days_until": entries[0]["days_until"] if entries else None,
"entries": [
{
"type": e["Sammlung"],
"date": e["Wann?"],
"days_until": e["days_until"]
}
for e in entries[:3]
]
}
self.last_updated = datetime.now()
except Exception as e:
_LOGGER.error("Error fetching Uster data: %s", e)
self.data = {
ATTR_ERROR: str(e),
"next_collection": None,
"date": None,
"entries": []
}
self.last_error = str(e)
return self.data
coordinator: UsterWasteDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
async_add_entities([UsterWasteSensor(entry, coordinator)], update_before_add=True)
class UsterWasteSensor(SensorEntity):
"""Uster Waste Sensor Entity."""
"""Uster Waste Collection Sensor."""
_attr_has_entity_name = True
_attr_name = None # Use coordinator name
_attr_icon = "mdi:recycle"
_attr_device_class = None
_attr_native_unit_of_measurement = "days" if "days_until" else None
def __init__(
self,
entry_id: str,
entry: ConfigEntry,
coordinator: UsterWasteDataUpdateCoordinator,
name: str
):
self._entry_id = entry_id
self.coordinator = coordinator
self._attr_name = f"{name} Schedule"
self._attr_unique_id = f"uster_waste_{entry_id}"
self._attr_extra_state_attributes = {
ATTR_ENTRIES: [],
self._attr_unique_id = f"uster_waste_{entry.entry_id}"
self._attr_device_info = {
"name": coordinator.name,
"identifiers": {(DOMAIN, entry.entry_id)},
}
async def async_update(self):
"""Update sensor state."""
self._attr_native_value = len(self.coordinator.data.get("entries", []))
self._attr_extra_state_attributes.update(
{
ATTR_NEXT_COLLECTION: self.coordinator.data.get("next_collection"),
ATTR_DATE: self.coordinator.data.get("date"),
ATTR_TYPE: self.coordinator.data.get("type"),
ATTR_DAYS_UNTIL: self.coordinator.data.get("days_until"),
ATTR_ENTRIES: self.coordinator.data.get("entries", []),
ATTR_ERROR: self.coordinator.data.get("error")
}
)
async def async_added_to_hass(self):
"""When entity is added to hass."""
await super().async_added_to_hass()
self.async_on_remove(
self.coordinator.async_add_listener(
self.async_write_ha_state, self.coordinator.last_updated
)
)
# Force first update
await self.async_update()
@property
def native_value(self) -> Optional[str]:
"""Return next collection type."""
return self.coordinator.data.get("next_collection")
@property
def available(self) -> bool:
"""Return if entity is available."""
return self.coordinator.last_updated is not None
def extra_state_attributes(self) -> dict:
"""Return extended attributes."""
data = self.coordinator.data or {}
return {
"date": data.get("date"),
"days_until": data.get("days_until"),
"entries": data.get("entries", []),
"url": self.coordinator.url, # For debugging
}
async def async_press(self):
"""Handle the button press (manual refresh)."""
await self.async_update()
self.async_write_ha_state()
async def async_update(self) -> None:
"""Manually trigger update (used by refresh service)."""
await self.coordinator.async_request_refresh()