#!/usr/bin/env python3
"""Scrape household water-usage data from the ArcWater portal into InfluxDB.

Flow:
  1. Log in to the portal with Playwright (the login form is JavaScript-driven,
     so a plain ``requests`` POST is not enough).
  2. Replay the authenticated session cookies with ``requests`` against the
     ASMX chart endpoint to pull the usage series.
  3. Parse the pipe-delimited chart data and write points to InfluxDB.

Configuration comes entirely from environment variables (see ``load_cfg``).
"""

import argparse
import os
import sys
import time
import traceback
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from zoneinfo import ZoneInfo

import requests
from influxdb_client import InfluxDBClient, Point, WriteOptions
from playwright.sync_api import sync_playwright

BASE = "https://arcwater.armidale.nsw.gov.au"
LOGIN_URL = BASE + "/"
DATA_URL = BASE + "/wsAjaxFunctions.asmx/recentWaterUsage"

# Portal timestamps are local wall-clock strings; convert to UTC before storing.
LOCAL_TZ = ZoneInfo(os.environ.get("TZ", "Australia/Sydney"))
UTC = ZoneInfo("UTC")


@dataclass(frozen=True)
class Config:
    """Runtime configuration, sourced from environment variables in load_cfg()."""

    arc_user: str
    arc_pass: str
    install_id: str
    influx_url: str
    influx_token: str
    influx_org: str
    influx_bucket: str
    influx_measurement: str


def as_requests_cookiejar(pw_cookies) -> requests.cookies.RequestsCookieJar:
    """Convert Playwright cookie dicts into a requests cookie jar.

    Playwright returns cookies as dicts with name/value/domain/path keys;
    requests needs a RequestsCookieJar to send them back.
    """
    jar = requests.cookies.RequestsCookieJar()
    for c in pw_cookies:
        jar.set(c["name"], c["value"], domain=c.get("domain"), path=c.get("path", "/"))
    return jar


def playwright_login_get_cookies(arc_user: str, arc_pass: str, install_id: str, headless: bool = True):
    """Log in to the portal with a real browser and return (cookies, meter_page_url).

    The returned cookies are filtered to the portal's domain so they can be
    replayed via requests against the ASMX endpoint. Raises RuntimeError when
    the meter page redirects back to the root (not authenticated, or the
    install_id is not accessible to this account).
    """
    meter_url = f"{BASE}/meter/{install_id}"
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=headless,
            args=["--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu"],
        )
        try:
            context = browser.new_context(
                viewport={"width": 1280, "height": 720},
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            )
            page = context.new_page()
            page.goto(LOGIN_URL, wait_until="networkidle")
            page.wait_for_timeout(1000)

            # The form includes a decoy password field (fakepasswordautofill);
            # target the real one explicitly.
            email = page.locator("input[placeholder='Email']").first
            password = page.locator("input[type='password']:not([name='fakepasswordautofill'])").first
            btn = page.locator("#ctl00_cphBody_pnlLogin_ctl00_btnLogin")
            email.wait_for(state="visible", timeout=30000)
            password.wait_for(state="visible", timeout=30000)
            btn.wait_for(state="visible", timeout=30000)

            # Type with a small per-key delay to look like a human and to keep
            # the page's JS validation happy; Control+A clears any autofill.
            email.click()
            page.keyboard.press("Control+A")
            page.keyboard.type(arc_user, delay=35)
            password.click()
            page.keyboard.press("Control+A")
            page.keyboard.type(arc_pass, delay=35)
            btn.click(force=True)
            page.wait_for_url("**/home*", timeout=30000)

            page.goto(meter_url, wait_until="networkidle")
            # An unauthenticated or unauthorized request bounces back to the root.
            if page.url.rstrip("/") == BASE.rstrip("/"):
                raise RuntimeError("Meter page redirected to root — not authenticated or install_id not accessible.")

            cookies = context.cookies()
            cookies = [c for c in cookies if "arcwater.armidale.nsw.gov.au" in (c.get("domain") or "")]
            return cookies, page.url
        finally:
            # Always release the browser, even when the auth check raises.
            browser.close()


def arc_fetch_series(install_id: str, days_back: int, granularity: str, chart_type: str, pw_cookies, referer_url: str):
    """Fetch one usage series from the ASMX chart endpoint.

    granularity: DAILY or HOURLY -> timeOpt
    chart_type:  POT (this meter) or AVERAGE (town average) -> chartType

    Returns a list of (utc_timestamp, litres) tuples.
    Raises RuntimeError on HTTP failure, session expiry, or malformed data.
    """
    end = date.today()
    start = end - timedelta(days=days_back)
    payload = {
        "registrationID": -1,
        "pathwayAssessmentKey": "",
        "reportDate1": start.isoformat(),
        "reportDate2": end.isoformat(),
        "chartType": chart_type,  # POT (you), AVERAGE (town avg)
        "installID": str(install_id),
        "privateMeterOpt": "0",
        "referenceOpt": "SIMILAR",
        "selectedMeasureUnit": "L",
        "timeOpt": granularity,  # DAILY or HOURLY
        "useNetworkData": False,
    }
    headers = {
        "Content-Type": "application/json; charset=utf-8",
        "Accept": "application/json, text/javascript, */*; q=0.01",
        "X-Requested-With": "XMLHttpRequest",
        "Origin": BASE,
        "Referer": referer_url,
        "User-Agent": "Mozilla/5.0",
    }
    jar = as_requests_cookiejar(pw_cookies)
    with requests.Session() as s:
        s.cookies = jar
        r = s.post(DATA_URL, headers=headers, json=payload, timeout=30, allow_redirects=True)
    if r.status_code != 200:
        raise RuntimeError(f"ASMX HTTP {r.status_code}")
    data = r.json()
    d = data.get("d", {})
    if d.get("userNotLoggedIn"):
        raise RuntimeError("ASMX says not logged in (userNotLoggedIn=1).")

    # The series come back pipe-delimited. Filter out empty tokens: "".split("|")
    # yields [""], so without the filter the empty-data guard below could never
    # fire for the dates list.
    dates = [t for t in (d.get("chartDates") or "").split("|") if t.strip()]
    readings = [int(t) for t in (d.get("chartReadings") or "").split("|") if t.strip()]
    if not dates or not readings:
        raise RuntimeError("ASMX returned empty chart data.")
    if len(dates) != len(readings):
        raise RuntimeError(f"ASMX mismatch: {len(dates)} dates vs {len(readings)} readings")

    points = []
    for dt_str, litres in zip(dates, readings):
        dt_str = dt_str.strip()
        # Timestamps are naive local wall-clock; attach the local zone, then
        # normalize to UTC for storage.
        local_ts = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=LOCAL_TZ)
        utc_ts = local_ts.astimezone(UTC)
        points.append((utc_ts, int(litres)))
    return points


def write_points(cfg: Config, granularity: str, series_name: str, points):
    """Write (utc_timestamp, litres) tuples to InfluxDB.

    Points are tagged with source/install_id/granularity/series so both the
    POT and AVERAGE series can share one measurement.
    """
    with InfluxDBClient(url=cfg.influx_url, token=cfg.influx_token, org=cfg.influx_org) as client:
        write_api = client.write_api(write_options=WriteOptions(batch_size=2000, flush_interval=2000))
        try:
            influx_points = []
            for ts_utc, litres in points:
                p = (
                    Point(cfg.influx_measurement)
                    .tag("source", "arcwater")
                    .tag("install_id", cfg.install_id)
                    .tag("granularity", granularity)
                    .tag("series", series_name)  # POT or AVERAGE
                    .field("litres", litres)
                    .time(ts_utc)
                )
                influx_points.append(p)
            write_api.write(bucket=cfg.influx_bucket, record=influx_points)
            write_api.flush()
        finally:
            # The batching WriteApi runs a background dispatcher; close it so
            # its thread and queue are released even if the write fails.
            write_api.close()


def load_cfg() -> Config:
    """Build the Config from environment variables.

    ARCWATER_USER/PASS and the INFLUX_* connection settings are required;
    ARCWATER_INSTALL_ID and INFLUX_MEASUREMENT have defaults.
    Raises KeyError when a required variable is missing.
    """
    return Config(
        arc_user=os.environ["ARCWATER_USER"],
        arc_pass=os.environ["ARCWATER_PASS"],
        install_id=os.environ.get("ARCWATER_INSTALL_ID", "11254"),
        influx_url=os.environ["INFLUX_URL"],
        influx_token=os.environ["INFLUX_TOKEN"],
        influx_org=os.environ["INFLUX_ORG"],
        influx_bucket=os.environ["INFLUX_BUCKET"],
        influx_measurement=os.environ.get("INFLUX_MEASUREMENT", "arcwater_usage"),
    )


def main():
    """CLI entry point: log in, fetch both series, write them, with retries.

    The whole login+fetch+write cycle is retried up to 3 times with linear
    backoff; on final failure the traceback is printed and the process exits 1.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--granularity", choices=["DAILY", "HOURLY"], required=True)
    ap.add_argument("--days-back", type=int, default=int(os.environ.get("ARCWATER_DAYS_BACK", "30")))
    args = ap.parse_args()
    cfg = load_cfg()

    attempts = 3
    for attempt in range(1, attempts + 1):
        try:
            cookies, referer = playwright_login_get_cookies(cfg.arc_user, cfg.arc_pass, cfg.install_id, headless=True)
            for series_name in ("POT", "AVERAGE"):
                pts = arc_fetch_series(
                    install_id=cfg.install_id,
                    days_back=args.days_back,
                    granularity=args.granularity,
                    chart_type=series_name,
                    pw_cookies=cookies,
                    referer_url=referer,
                )
                write_points(cfg, args.granularity, series_name, pts)
                print(f"OK wrote {len(pts)} points: install_id={cfg.install_id} granularity={args.granularity} series={series_name}")
            return
        except Exception as e:
            print(f"FAILED attempt {attempt}/{attempts}: {e}")
            if attempt == attempts:
                print(traceback.format_exc())
                sys.exit(1)
            time.sleep(2 * attempt)  # linear backoff between attempts


if __name__ == "__main__":
    main()