Files
ArcWater/arcwater_to_influx.py
2026-03-05 15:02:31 +11:00

223 lines
7.8 KiB
Python

#!/usr/bin/env python3
import argparse
import os
import sys
import time
import traceback
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta
from zoneinfo import ZoneInfo

import requests
from influxdb_client import InfluxDBClient, Point, WriteOptions
from playwright.sync_api import sync_playwright
# Root of the ArcWater portal; the other URLs below are derived from it.
BASE = "https://arcwater.armidale.nsw.gov.au"
# Page that hosts the login form driven by Playwright.
LOGIN_URL = f"{BASE}/"
# ASMX endpoint that usage queries are POSTed to.
DATA_URL = f"{BASE}/wsAjaxFunctions.asmx/recentWaterUsage"

# Fixed UTC zone used when normalising timestamps.
UTC = ZoneInfo("UTC")
# Zone used to interpret the portal's timestamps: $TZ when set, else Sydney.
LOCAL_TZ = ZoneInfo(os.environ.get("TZ", "Australia/Sydney"))
@dataclass(frozen=True)
class Config:
    """Immutable settings for one ArcWater -> InfluxDB sync run.

    The two secrets (``arc_pass`` and ``influx_token``) are excluded from the
    generated ``repr`` so that printing or logging a ``Config`` cannot leak
    credentials. They still take part in construction, equality and hashing,
    so positional and keyword construction are unchanged.
    """

    # Login name typed into the portal's email field.
    arc_user: str
    # Portal password (secret: hidden from repr).
    arc_pass: str = field(repr=False)
    # Meter installation id; used in the meter URL, the query payload and as
    # an InfluxDB tag.
    install_id: str
    # Connection settings handed to InfluxDBClient.
    influx_url: str
    # InfluxDB API token (secret: hidden from repr).
    influx_token: str = field(repr=False)
    influx_org: str
    # Bucket and measurement that points are written to.
    influx_bucket: str
    influx_measurement: str
def as_requests_cookiejar(pw_cookies):
    """Copy Playwright cookie dicts into a new ``RequestsCookieJar``.

    Each entry must provide ``name`` and ``value``; ``domain`` is passed
    through as-is (``None`` when absent) and ``path`` falls back to ``"/"``.
    Returns the populated jar.
    """
    cookie_jar = requests.cookies.RequestsCookieJar()
    for cookie in pw_cookies:
        cookie_jar.set(
            cookie["name"],
            cookie["value"],
            domain=cookie.get("domain"),
            path=cookie.get("path", "/"),
        )
    return cookie_jar
def playwright_login_get_cookies(arc_user: str, arc_pass: str, install_id: str, headless: bool = True):
    """Log in to the portal with a Chromium browser and return its session cookies.

    Fills the login form, waits for the post-login ``/home`` URL, then opens
    the meter page for ``install_id`` to confirm the session can reach it.

    Args:
        arc_user: Value typed into the email field.
        arc_pass: Value typed into the password field.
        install_id: Meter installation id appended to ``{BASE}/meter/``.
        headless: Whether Chromium is launched without a visible window.

    Returns:
        ``(cookies, url)``: the Playwright cookie dicts whose domain contains
        the portal host, and the final URL of the meter page (used by the
        caller as the Referer for data requests).

    Raises:
        RuntimeError: If opening the meter page lands back on the portal root.
        Playwright errors (e.g. timeouts) propagate unchanged.
    """
    meter_url = f"{BASE}/meter/{install_id}"
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=headless,
            args=["--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu"],
        )
        # try/finally so the browser is closed even when a wait times out or
        # the redirect check below raises.
        try:
            context = browser.new_context(
                viewport={"width": 1280, "height": 720},
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            )
            page = context.new_page()
            page.goto(LOGIN_URL, wait_until="networkidle")
            page.wait_for_timeout(1000)

            email = page.locator("input[placeholder='Email']").first
            # The selector skips the input named 'fakepasswordautofill'.
            password = page.locator("input[type='password']:not([name='fakepasswordautofill'])").first
            btn = page.locator("#ctl00_cphBody_pnlLogin_ctl00_btnLogin")
            email.wait_for(state="visible", timeout=30000)
            password.wait_for(state="visible", timeout=30000)
            btn.wait_for(state="visible", timeout=30000)

            # Select any existing text, then type key by key with a 35 ms delay.
            # NOTE(review): presumably the form needs real key events rather
            # than a direct fill() - confirm before simplifying.
            email.click()
            page.keyboard.press("Control+A")
            page.keyboard.type(arc_user, delay=35)
            password.click()
            page.keyboard.press("Control+A")
            page.keyboard.type(arc_pass, delay=35)
            btn.click(force=True)
            page.wait_for_url("**/home*", timeout=30000)

            page.goto(meter_url, wait_until="networkidle")
            # Capture the URL while the page is still alive instead of reading
            # it after the browser has been closed.
            final_url = page.url
            if final_url.rstrip("/") == BASE.rstrip("/"):
                raise RuntimeError("Meter page redirected to root — not authenticated or install_id not accessible.")

            cookies = [
                c
                for c in context.cookies()
                if "arcwater.armidale.nsw.gov.au" in (c.get("domain") or "")
            ]
        finally:
            browser.close()
    return cookies, final_url
def arc_fetch_series(install_id: str, days_back: int, granularity: str, chart_type: str, pw_cookies, referer_url: str):
    """Fetch one usage series from the portal's ASMX endpoint.

    Args:
        install_id: Meter installation id, sent as ``installID``.
        days_back: Size of the query window in days, ending today.
        granularity: ``DAILY`` or ``HOURLY``; sent as ``timeOpt``.
        chart_type: ``POT`` or ``AVERAGE``; sent as ``chartType``.
        pw_cookies: Playwright cookie dicts carrying the logged-in session.
        referer_url: URL sent as the ``Referer`` header.

    Returns:
        List of ``(utc_timestamp, litres)`` tuples, where ``utc_timestamp`` is
        a timezone-aware ``datetime`` in UTC and ``litres`` is an ``int``.

    Raises:
        RuntimeError: On a non-200 response, a ``userNotLoggedIn`` reply,
            empty chart data, or a dates/readings length mismatch.
        ValueError: If the body is not JSON or a date/reading cannot be parsed.
    """
    # Compute "today" in the same zone the timestamps are interpreted in, so
    # the window does not shift by a day when the host clock is in another
    # zone (e.g. a UTC container with TZ unset).
    end = datetime.now(LOCAL_TZ).date()
    start = end - timedelta(days=days_back)
    payload = {
        "registrationID": -1,
        "pathwayAssessmentKey": "",
        "reportDate1": start.isoformat(),
        "reportDate2": end.isoformat(),
        "chartType": chart_type,  # POT (you), AVERAGE (town avg)
        "installID": str(install_id),
        "privateMeterOpt": "0",
        "referenceOpt": "SIMILAR",
        "selectedMeasureUnit": "L",
        "timeOpt": granularity,  # DAILY or HOURLY
        "useNetworkData": False,
    }
    headers = {
        "Content-Type": "application/json; charset=utf-8",
        "Accept": "application/json, text/javascript, */*; q=0.01",
        "X-Requested-With": "XMLHttpRequest",
        "Origin": BASE,
        "Referer": referer_url,
        "User-Agent": "Mozilla/5.0",
    }
    jar = as_requests_cookiejar(pw_cookies)
    with requests.Session() as s:
        s.cookies = jar
        r = s.post(DATA_URL, headers=headers, json=payload, timeout=30, allow_redirects=True)
    if r.status_code != 200:
        raise RuntimeError(f"ASMX HTTP {r.status_code}")

    data = r.json()
    # "d" may be missing or null; treat both as an empty reply.
    d = data.get("d") or {}
    if d.get("userNotLoggedIn"):
        raise RuntimeError("ASMX says not logged in (userNotLoggedIn=1).")

    # Both series are pipe-delimited strings. Blank tokens (e.g. from an empty
    # string or a trailing "|") are dropped so the emptiness and length checks
    # below are meaningful.
    dates = [tok.strip() for tok in (d.get("chartDates") or "").split("|") if tok.strip()]
    readings = [int(tok) for tok in (d.get("chartReadings") or "").split("|") if tok.strip()]
    if not dates or not readings:
        raise RuntimeError("ASMX returned empty chart data.")
    if len(dates) != len(readings):
        raise RuntimeError(f"ASMX mismatch: {len(dates)} dates vs {len(readings)} readings")

    points = []
    for dt_str, litres in zip(dates, readings):
        # Timestamps are parsed as naive local times, tagged with LOCAL_TZ and
        # converted to UTC.
        local_ts = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=LOCAL_TZ)
        points.append((local_ts.astimezone(UTC), litres))
    return points
def write_points(cfg: Config, granularity: str, series_name: str, points):
    """Write one usage series to InfluxDB.

    Each ``(ts_utc, litres)`` pair becomes a point in ``cfg.influx_measurement``
    tagged with ``source``, ``install_id``, ``granularity`` and ``series``,
    carrying a single ``litres`` field.

    Args:
        cfg: Connection settings and target bucket/measurement.
        granularity: Value stored in the ``granularity`` tag.
        series_name: Value stored in the ``series`` tag (POT or AVERAGE).
        points: Iterable of ``(utc_timestamp, litres)`` pairs.

    Raises:
        Whatever the InfluxDB client raises when the write request fails.
    """
    # Local import: only this function needs the synchronous write mode.
    from influxdb_client.client.write_api import SYNCHRONOUS

    influx_points = [
        Point(cfg.influx_measurement)
        .tag("source", "arcwater")
        .tag("install_id", cfg.install_id)
        .tag("granularity", granularity)
        .tag("series", series_name)  # POT or AVERAGE
        .field("litres", litres)
        .time(ts_utc)
        for ts_utc, litres in points
    ]
    if not influx_points:
        return

    with InfluxDBClient(url=cfg.influx_url, token=cfg.influx_token, org=cfg.influx_org) as client:
        # Synchronous mode makes write() block and raise on failure, so the
        # caller's retry loop actually sees write errors. The batching mode
        # used previously hands points to a background writer, which does not
        # surface failures to the caller. The write API is closed before the
        # client it belongs to.
        with client.write_api(write_options=SYNCHRONOUS) as write_api:
            write_api.write(bucket=cfg.influx_bucket, record=influx_points)
def load_cfg() -> Config:
    """Build a ``Config`` from environment variables.

    Required (``KeyError`` when missing): ARCWATER_USER, ARCWATER_PASS,
    INFLUX_URL, INFLUX_TOKEN, INFLUX_ORG, INFLUX_BUCKET.
    Optional: ARCWATER_INSTALL_ID (default "11254") and INFLUX_MEASUREMENT
    (default "arcwater_usage").
    """
    env = os.environ
    # Dict entries are evaluated top to bottom, so the first missing required
    # variable is the one reported.
    settings = {
        "arc_user": env["ARCWATER_USER"],
        "arc_pass": env["ARCWATER_PASS"],
        "install_id": env.get("ARCWATER_INSTALL_ID", "11254"),
        "influx_url": env["INFLUX_URL"],
        "influx_token": env["INFLUX_TOKEN"],
        "influx_org": env["INFLUX_ORG"],
        "influx_bucket": env["INFLUX_BUCKET"],
        "influx_measurement": env.get("INFLUX_MEASUREMENT", "arcwater_usage"),
    }
    return Config(**settings)
def main():
    """Command-line entry point: log in, fetch both series, write them to InfluxDB.

    Options: ``--granularity`` (DAILY or HOURLY, required) and ``--days-back``
    (default taken from ARCWATER_DAYS_BACK, else 30).

    The whole login/fetch/write sequence is tried up to three times, sleeping
    ``2 * attempt`` seconds between tries. After the final failure the
    traceback is printed and the process exits with status 1.
    """
    default_days = int(os.environ.get("ARCWATER_DAYS_BACK", "30"))
    parser = argparse.ArgumentParser()
    parser.add_argument("--granularity", choices=["DAILY", "HOURLY"], required=True)
    parser.add_argument("--days-back", type=int, default=default_days)
    args = parser.parse_args()
    cfg = load_cfg()

    attempts = 3
    attempt = 0
    while attempt < attempts:
        attempt += 1
        try:
            # A fresh login per attempt, so a stale session is never reused.
            cookies, referer = playwright_login_get_cookies(
                cfg.arc_user, cfg.arc_pass, cfg.install_id, headless=True
            )
            for series_name in ("POT", "AVERAGE"):
                pts = arc_fetch_series(
                    install_id=cfg.install_id,
                    days_back=args.days_back,
                    granularity=args.granularity,
                    chart_type=series_name,
                    pw_cookies=cookies,
                    referer_url=referer,
                )
                write_points(cfg, args.granularity, series_name, pts)
                print(f"OK wrote {len(pts)} points: install_id={cfg.install_id} granularity={args.granularity} series={series_name}")
            return
        except Exception as exc:
            print(f"FAILED attempt {attempt}/{attempts}: {exc}")
            if attempt < attempts:
                # Linear back-off before the next try.
                time.sleep(2 * attempt)
                continue
            print(traceback.format_exc())
            sys.exit(1)


if __name__ == "__main__":
    main()