commit 86002208c3a473c176c4a3f256eb6726fc0f700a Author: Arian Nasr Date: Fri Mar 27 20:03:07 2026 -0400 init. working relay diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..93526df --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +venv/ +__pycache__/ diff --git a/main.py b/main.py new file mode 100644 index 0000000..bea6a12 --- /dev/null +++ b/main.py @@ -0,0 +1,105 @@ +import amqp +import ssl +import sys +import logging +import requests +from weather_utility import parse_weather_xml, publish_weather + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) +logger = logging.getLogger(__name__) + +# Server Configuration +HOST = "dd.weather.gc.ca" +PORT = 5671 +USER = "anonymous" +PASSWORD = "anonymous" +EXCHANGE = "xpublic" + +QUEUE_NAME = "q_anonymous.subscribe.citypage.companyis2ari.ca" +SUBTOPIC = "#.WXO-DD.citypage_weather.ON.#" + +def on_message(message): + routing_key = message.delivery_info.get('routing_key') + body = message.body.decode('utf-8') if isinstance(message.body, bytes) else message.body + + # Split the body by whitespace + # parts[0] = Timestamp + # parts[1] = Base URL (https://dd1.weather.gc.ca/) + # parts[2] = Relative Path + parts = body.split() + + clean_url = f"{parts[1].strip()}{parts[2].strip()}" + + if clean_url.endswith("s0000458_en.xml"): + logger.debug(f"New Toronto weather update available at: {clean_url}") + try: + response = requests.get(clean_url) + response.raise_for_status() + weather_data = parse_weather_xml(response.content) + publish_weather(weather_data) + except Exception as e: + logger.error(f"Failed to fetch or process weather data: {e}") + elif clean_url.endswith("s0000305_en.xml"): + logger.debug(f"New Kaladar weather update available at: {clean_url}") + try: + response = requests.get(clean_url) + response.raise_for_status() + weather_data = parse_weather_xml(response.content) + publish_weather(weather_data) + except Exception as e: + logger.error(f"Failed to fetch or process weather data: {e}") + else: + logger.debug(f"Ignored URL (not Toronto weather): {clean_url}") + + + message.channel.basic_ack(message.delivery_info['delivery_tag']) + +def main(): + ssl_context = ssl.create_default_context() + + try: + logger.debug(f"Connecting to {HOST}:{PORT} as {USER}") + conn = amqp.Connection( + host=f"{HOST}:{PORT}", + userid=USER, + password=PASSWORD, + ssl=ssl_context, + virtual_host="/" + ) + conn.connect() + channel = conn.channel() + + logger.debug(f"Declaring exclusive, auto-deleting queue: {QUEUE_NAME}") + channel.queue_declare( + queue=QUEUE_NAME, + exclusive=True, + auto_delete=True + ) + + logger.debug(f"Binding queue to exchange '{EXCHANGE}' with routing key '{SUBTOPIC}'") + channel.queue_bind( + queue=QUEUE_NAME, + exchange=EXCHANGE, + routing_key=SUBTOPIC + ) + + + channel.basic_consume(queue=QUEUE_NAME, callback=on_message) + + logger.info("Waiting for Ontario weather updates...") + while True: + conn.drain_events() + + except KeyboardInterrupt: + logger.info(f"Stopping script...") + sys.exit(0) + except Exception as e: + logger.error(f"An error occurred: {e}") + +if __name__ == "__main__": + main() + diff --git a/weather_utility.py b/weather_utility.py new file mode 100644 index 0000000..7a944de --- /dev/null +++ b/weather_utility.py @@ -0,0 +1,170 @@ +import xml.etree.ElementTree as ET +import json +import logging +import paho.mqtt.client as mqtt +import re +import io +import os + +logger = logging.getLogger(__name__) + +# --- Configuration --- +BROKER = os.getenv("MQTT_BROKER", "192.168.2.17") +PORT = int(os.getenv("MQTT_PORT", 1883)) +DISCOVERY_PREFIX = "homeassistant" +BASE_TOPIC = "weather/station" + + +def slugify(text): + return re.sub(r'\W+', '_', text).lower() if text else "unknown_station" + + +def parse_weather_xml(source) -> dict | None: + try: + tree = ET.parse(io.BytesIO(source)) + root = tree.getroot() + current = root.find('currentConditions') + if current is None: return None + + wind_speed_str = current.findtext('wind/speed') + if wind_speed_str == "calm": + wind_speed_str = "0" + + # Extract and cast data + return { + "station_name": current.findtext('station'), + "condition": current.findtext('condition'), + "temperature": float(current.findtext('temperature') or 0), + "relative_humidity": float(current.findtext('relativeHumidity') or 0), + "pressure": float(current.findtext('pressure') or 0), + "wind_speed": float(wind_speed_str or 0), + "wind_bearing": float(current.findtext('wind/bearing') or 0), + } + except Exception as e: + logger.error(f"XML Parse Error: {e}") + return None + + +def setup_ha_discovery(client, data): + station_id = slugify(data['station_name']) + state_topic = f"{BASE_TOPIC}/{station_id}/state" + + # --- SENSOR CONFIGURATION --- + # device_class: Describes what the sensor measures (temperature, humidity, etc.) + # state_class: 'measurement' allows HA to create long-term statistics/graphs + sensors = [ + { + "id": "cond", + "name": "Condition", + "class": None, # Generic text sensor + "state_class": None, # Text cannot have a state class + "unit": None, + "key": "condition", + "icon": "mdi:weather-partly-cloudy" # Default icon + }, + { + "id": "temp", + "name": "Temperature", + "class": "temperature", + "state_class": "measurement", + "unit": "°C", + "key": "temperature" + }, + { + "id": "hum", + "name": "Humidity", + "class": "humidity", + "state_class": "measurement", + "unit": "%", + "key": "relative_humidity" + }, + { + "id": "pres", + "name": "Pressure", + "class": "atmospheric_pressure", + "state_class": "measurement", + "unit": "kPa", + "key": "pressure" + }, + { + "id": "wind_s", + "name": "Wind Speed", + "class": "wind_speed", + "state_class": "measurement", + "unit": "km/h", + "key": "wind_speed" + }, + { + "id": "wind_b", + "name": "Wind Bearing", + "class": "wind_direction", # Correct HA class for bearing + "state_class": "measurement", + "unit": "°", + "key": "wind_bearing" + }, + ] + + device_info = { + "identifiers": [f"weather_station_{station_id}"], + "name": data['station_name'], + "model": "XML Weather Station", + "manufacturer": "Python Script" + } + + for s in sensors: + discovery_topic = f"{DISCOVERY_PREFIX}/sensor/{station_id}/{s['id']}/config" + + payload = { + "name": f"{data['station_name']} {s['name']}", + "unique_id": f"{station_id}_{s['id']}", + "state_topic": state_topic, + "value_template": f"{{{{ value_json.{s['key']} }}}}", + "device": device_info + } + + # Apply Optional Configurations + if s.get('class'): + payload["device_class"] = s['class'] + + if s.get('state_class'): + payload["state_class"] = s['state_class'] + + if s.get('unit'): + payload["unit_of_measurement"] = s['unit'] + + if s.get('icon'): + payload["icon"] = s['icon'] + + client.publish(discovery_topic, json.dumps(payload), retain=True) + + return state_topic + + +def publish_weather(data): + if not data: return + + client = mqtt.Client() + try: + client.connect(BROKER, PORT, 60) + + # Start the network loop to handle message handshakes + client.loop_start() + + # This creates the entities in HA (Retained) + state_topic = setup_ha_discovery(client, data) + + # FIX 1: Add retain=True so HA sees the value even if it subscribes late + # FIX 2: Capture the message info object to verify delivery + msg_info = client.publish(state_topic, json.dumps(data), retain=True) + + # FIX 3: Block execution until the message is actually sent to the broker + msg_info.wait_for_publish() + + logger.info(f"Published all fields for {data['station_name']} to {state_topic}") + + except Exception as e: + logger.error(f"MQTT Error: {e}") + finally: + # Stop the loop and disconnect cleanly + client.loop_stop() + client.disconnect()