Files
PartDB_Helper_App/provider/selenium_flow.py

1063 lines
42 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os, time, tempfile, shutil, traceback, json
from typing import Tuple
from selenium import webdriver
from selenium.webdriver.common.by import By
# waits
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Firefox
from selenium.webdriver.firefox.service import Service as FirefoxService
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from webdriver_manager.firefox import GeckoDriverManager
from apis.partdb_api import PartDB
from config import (
PARTDB_BASE,
UI_LANG_PATH,
ENV_USER_VAR,
ENV_PASS_VAR,
ENV_USER,
ENV_PASSWORD,
HEADLESS_PROVIDER,
HEADLESS_CONTROLLER,
HEADLESS_WORKER,
)
# Absolute path of the geckodriver log file ("geckodriver.log" resolved against
# the working directory at import time). It is handed to the Firefox service as
# its log output and read back when no browser could be started.
GECKO_LOG_PATH = os.path.abspath("geckodriver.log")
def start_firefox_resilient(*, headless_first: bool, allow_chrome_fallback: bool = True):
    """
    Start a Selenium browser, escalating through several launch strategies.

    Attempts, in order:
      1. Firefox with ``headless_first`` as the headless setting.
      2. Firefox with that setting inverted.
      3. Firefox from a list of known binary locations (requested mode first,
         inverted mode second, per binary that exists on disk).
      4. Chrome, if ``allow_chrome_fallback`` is true; it runs headless only
         when ``headless_first`` is true.

    Each attempt prints its outcome. When every attempt fails, the last 80
    lines of the geckodriver log are printed (best effort) and RuntimeError
    is raised. On success the started WebDriver instance is returned.
    """
    # Drop MOZ_HEADLESS so an inherited value cannot override the requested mode.
    os.environ.pop("MOZ_HEADLESS", None)

    def _launch_firefox(headless: bool, binary: str | None = None):
        # Every attempt uses a fresh temporary profile directory; it is only
        # removed again when the attempt fails.
        scratch_profile = tempfile.mkdtemp(prefix="ff-profile-")
        try:
            ff_options = FirefoxOptions()
            if headless:
                ff_options.add_argument("-headless")
            ff_options.set_preference("browser.shell.checkDefaultBrowser", False)
            ff_options.set_preference("browser.startup.homepage_override.mstone", "ignore")
            ff_options.add_argument("-profile")
            ff_options.add_argument(scratch_profile)
            if binary:
                ff_options.binary_location = binary
            gecko_service = FirefoxService(GeckoDriverManager().install(), log_output=GECKO_LOG_PATH)
            browser = webdriver.Firefox(service=gecko_service, options=ff_options)
            try:
                browser.maximize_window()
            except Exception:
                # Maximizing is cosmetic; ignore failures.
                pass
            print(f"[Selenium] Firefox launched (headless={headless}, bin={binary or 'default'}).")
            return browser
        except Exception as e:
            print(f"[Selenium] Firefox launch failed (headless={headless}, bin={binary or 'default'}): {e}")
            shutil.rmtree(scratch_profile, ignore_errors=True)
            return None

    inverted = not headless_first

    # Attempt 1: the mode the caller asked for.
    browser = _launch_firefox(headless_first, None)
    if browser:
        return browser

    # Attempt 2: the opposite mode.
    browser = _launch_firefox(inverted, None)
    if browser:
        print(f"[Selenium] NOTE: flipped headless to {not headless_first} due to previous failure.")
        return browser

    # Attempt 3: explicit Firefox binaries.
    known_binaries = [
        r"C:\Program Files\Mozilla Firefox\firefox.exe",
        r"C:\Program Files (x86)\Mozilla Firefox\firefox.exe",
        shutil.which("firefox") or "",
    ]
    for candidate in known_binaries:
        if not candidate or not os.path.exists(candidate):
            continue
        browser = _launch_firefox(headless_first, candidate) or _launch_firefox(inverted, candidate)
        if browser:
            return browser

    # Attempt 4: Chrome.
    if allow_chrome_fallback:
        try:
            from selenium.webdriver.chrome.service import Service as ChromeService
            from selenium.webdriver.chrome.options import Options as ChromeOptions
            from webdriver_manager.chrome import ChromeDriverManager
            chrome_options = ChromeOptions()
            if headless_first:
                chrome_options.add_argument("--headless=new")
            chrome_service = ChromeService(ChromeDriverManager().install())
            browser = webdriver.Chrome(service=chrome_service, options=chrome_options)
            try:
                browser.maximize_window()
            except Exception:
                pass
            print(f"[Selenium] Chrome fallback launched (headless={headless_first}).")
            return browser
        except Exception as e:
            print(f"[Selenium] Chrome fallback failed: {e}")

    # Nothing worked: show the end of the geckodriver log to aid debugging.
    try:
        with open(GECKO_LOG_PATH, "r", encoding="utf-8", errors="ignore") as log_file:
            recent_lines = log_file.readlines()[-80:]
        print("\n[geckodriver.log tail]\n" + "".join(recent_lines) + "\n")
    except Exception:
        pass
    raise RuntimeError("Could not launch a visible browser. See messages above.")
def _is_logged_in(driver) -> bool:
try:
url = (driver.current_url or "").lower()
if "/login" in url:
return False
# any “Logout” link or user-menu
if driver.find_elements(By.XPATH, "//a[contains(@href,'logout') or contains(., 'Logout') or contains(., 'Sign out')]"):
return True
# many Part-DB skins have a user dropdown with a logout item:
if driver.find_elements(By.CSS_SELECTOR, "[data-user-menu], .user-menu, a[href*='/logout']"):
return True
# if we can see a main nav thats only visible when logged in:
if driver.find_elements(By.CSS_SELECTOR, "nav, .navbar, header .nav"):
# dont return True on the login pages navbar:
return "/login" not in url
except Exception:
pass
return False
# Absolute path of the JSON file used as the default location for saved
# browser cookies ("partdb_cookies.json" resolved against the working
# directory at import time).
COOKIES_FILE = os.path.abspath("partdb_cookies.json")
def _save_cookies(driver, base_url: str, path: str = COOKIES_FILE):
    """
    Write the browser's cookies to ``path`` as JSON.

    The driver is first pointed at ``base_url + "/"`` and given a short pause
    before the cookies are read. Any failure is reported with a print and
    swallowed; nothing is returned.
    """
    try:
        site_root = f"{base_url}/"
        driver.get(site_root)
        time.sleep(0.2)
        with open(path, "w", encoding="utf-8") as cookie_file:
            json.dump(driver.get_cookies(), cookie_file)
    except Exception as e:
        print("[Login] Save cookies failed:", e)
    else:
        print("[Login] Cookies saved.")
def _load_cookies(driver, base_url: str, path: str = COOKIES_FILE) -> bool:
    """
    Restore cookies from ``path`` into the browser and report whether that
    produced a logged-in session.

    Returns False when the file does not exist or anything goes wrong;
    otherwise returns the result of ``_is_logged_in`` after reloading the
    site root with the cookies applied.
    """
    if not os.path.exists(path):
        return False
    site_root = base_url + "/"
    try:
        # The site root is opened before cookies are added.
        driver.get(site_root)
        time.sleep(0.2)
        with open(path, "r", encoding="utf-8") as cookie_file:
            stored_cookies = json.load(cookie_file)
        for cookie in stored_cookies:
            try:
                driver.add_cookie(cookie)
            except Exception:
                # A single cookie that cannot be added is skipped.
                continue
        driver.get(site_root)
        time.sleep(0.6)
        logged_in = _is_logged_in(driver)
        print("[Login] Cookies loaded →", "OK" if logged_in else "not logged in")
        return logged_in
    except Exception as e:
        print("[Login] Load cookies failed:", e)
        return False
def _try_auto_login(driver, base_url: str, username: str, password: str, timeout_s: int = 120) -> bool:
    """
    Attempt a credential login through the login form.

    Steps: open the login page, wait up to 30 seconds for a username and a
    password input, type the credentials, submit (native click, then JS click;
    or ``form.submit()`` when no button matches), then poll ``_is_logged_in``
    for up to ``timeout_s`` seconds.

    Returns True once a logged-in state is detected; False if the inputs never
    appear or the poll times out.
    """
    driver.get(base_url.rstrip("/") + UI_LANG_PATH + "/login")
    field_wait = WebDriverWait(driver, 30)

    # Candidate CSS selectors, most specific first.
    username_css = (
        "input[name='_username']",
        "input#username",
        "input[name='username']",
        "input[type='email']",
        "input[type='text']",
    )
    password_css = (
        "input[name='_password']",
        "input#password",
        "input[name='password']",
        "input[type='password']",
    )
    submit_css = (
        "button[type='submit']",
        "input[type='submit']",
        "button.btn-primary",
        "button[class*='login']",
        "form button",
    )

    def _first_present(css_candidates):
        # First element of the first selector that matches anything, else None.
        for css in css_candidates:
            found = driver.find_elements(By.CSS_SELECTOR, css)
            if found:
                return found[0]
        return None

    try:
        field_wait.until(lambda d: _first_present(username_css) and _first_present(password_css))
    except Exception:
        return False

    user_input = _first_present(username_css)
    pass_input = _first_present(password_css)
    try:
        user_input.clear()
        user_input.send_keys(username)
        pass_input.clear()
        pass_input.send_keys(password)
    except Exception:
        # Typing failures are ignored; the submit step still runs.
        pass

    submit_button = _first_present(submit_css)
    if submit_button:
        try:
            driver.execute_script("arguments[0].scrollIntoView({block:'center'});", submit_button)
        except Exception:
            pass
        try:
            submit_button.click()
        except Exception:
            try:
                driver.execute_script("arguments[0].click();", submit_button)
            except Exception:
                pass
    else:
        # No submit control matched: submit the first form on the page.
        try:
            driver.execute_script("var f=document.querySelector('form'); if(f){f.submit();}")
        except Exception:
            pass

    give_up_at = time.time() + timeout_s
    while time.time() < give_up_at:
        if _is_logged_in(driver):
            return True
        time.sleep(0.5)
    return False
def ensure_logged_in(driver, base_url: str, *, interactive_ok: bool = True, wait_s: int = 240) -> bool:
    """
    Get the browser into a logged-in state, trying the cheapest option first.

    1. Saved cookies (``_load_cookies``).
    2. Automatic login with credentials taken from the environment variables
       named by ENV_USER_VAR / ENV_PASS_VAR, falling back to ENV_USER /
       ENV_PASSWORD; cookies are saved on success.
    3. If ``interactive_ok``: open the login page and wait up to ``wait_s``
       seconds for a person to log in; cookies are saved on success.

    Returns True when logged in, False otherwise (including on any exception
    during the interactive wait).
    """
    if _load_cookies(driver, base_url):
        return True

    account = (os.getenv(ENV_USER_VAR) or ENV_USER or "").strip()
    secret = (os.getenv(ENV_PASS_VAR) or ENV_PASSWORD or "").strip()
    if account and secret and _auto_login_once(driver, base_url, account, secret, timeout_s=120):
        _save_cookies(driver, base_url)
        return True

    if not interactive_ok:
        return False

    manual_login_url = base_url.rstrip("/") + UI_LANG_PATH + "/login"
    print("Please login to Part-DB in the opened browser window…")
    try:
        driver.get(manual_login_url)
        try:
            driver.maximize_window()
        except Exception:
            pass
        give_up_at = time.time() + wait_s
        while time.time() < give_up_at:
            if _is_logged_in(driver):
                print("Login detected; saving cookies.")
                _save_cookies(driver, base_url)
                return True
            time.sleep(0.6)
        print("Login was not detected before timeout.")
        return False
    except Exception:
        return False
def click_by_text(driver, labels, *, exact_only=False, exclude=None, index=None, timeout=60) -> bool:
    """
    Click a visible, enabled button-like element selected by its label.

    Candidates are re-queried on every attempt, so no element reference is
    kept across DOM changes. A candidate's label is its text, or, when the
    text is empty, the first non-empty of the ``value``, ``aria-label``,
    ``title`` and ``data-original-title`` attributes. Labels are compared
    case-insensitively with whitespace collapsed.

    Args:
        driver: Selenium WebDriver.
        labels: Wanted labels. A candidate matches when its label equals one
            of them (``exact_only``) or starts with one of them (default).
        exact_only: Require equality instead of a prefix match.
        exclude: Substrings; a candidate whose label contains any is skipped.
        index: Position within the matches to click; the first match is used
            when ``index`` is None or not below the number of matches.
        timeout: Overall time budget in seconds.

    Returns:
        True after a click was issued, False when the budget ran out (the
        last error seen, if any, is printed).
    """
    import re
    import time
    # These classes are not imported at module level. Without this import the
    # except clause below raised NameError and the JS-click fallback never ran.
    from selenium.common.exceptions import (
        ElementClickInterceptedException,
        StaleElementReferenceException,
    )

    def norm(text):
        return re.sub(r"\s+", " ", (text or "")).strip().lower()

    want = tuple(norm(x) for x in labels)
    exclude = [norm(x) for x in (exclude or [])]
    css = "button, a.btn, a.button, input[type='submit'], input[type='button'], .btn, .button"
    scroll_js = "arguments[0].scrollIntoView({block:'center'});"
    end = time.time() + timeout

    def label(el):
        t = (el.text or "").strip()
        if not t:
            for a in ("value", "aria-label", "title", "data-original-title"):
                v = el.get_attribute(a)
                if v:
                    return v.strip()
        return t

    def current_matches():
        # Always a fresh query; never reuse elements from an earlier attempt.
        found = []
        for el in driver.find_elements(By.CSS_SELECTOR, css):
            if not (el.is_displayed() and el.is_enabled()):
                continue
            lab = norm(label(el))
            if any(x in lab for x in exclude):
                continue
            if (lab in want) if exact_only else lab.startswith(want):
                found.append(el)
        return found

    def pick(found):
        return found[index] if (index is not None and index < len(found)) else found[0]

    last_err = None
    while time.time() < end:
        try:
            time.sleep(0.05)  # tiny settle
            found = current_matches()
            if found:
                target = pick(found)
                try:
                    driver.execute_script(scroll_js, target)
                except Exception:
                    pass
                time.sleep(0.05)
                try:
                    target.click()
                    return True
                except (StaleElementReferenceException, ElementClickInterceptedException) as exc:
                    last_err = exc
                    # Re-run the same query instead of reading the label off the
                    # (possibly stale) element and rebuilding it as an XPath string.
                    fresh = current_matches()
                    if fresh:
                        again = pick(fresh)
                        driver.execute_script(scroll_js, again)
                        time.sleep(0.05)
                        driver.execute_script("arguments[0].click();", again)
                        return True
                    time.sleep(0.15)
        except Exception as e:
            last_err = e
            time.sleep(0.2)
    if last_err:
        print(" [DBG] click_by_text last error:", last_err)
    return False
def wait_for_new_window_and_switch(driver, old_handles, timeout=30):
    """
    Poll until a window handle appears that is not in ``old_handles``, switch
    the driver to it and return that handle.

    Raises TimeoutError when no new handle shows up within ``timeout`` seconds.
    """
    known = set(old_handles)
    deadline = time.time() + timeout
    while time.time() < deadline:
        for handle in driver.window_handles:
            if handle not in known:
                # First unknown handle, in the order the driver reports them.
                driver.switch_to.window(handle)
                return handle
        time.sleep(0.2)
    raise TimeoutError("New window/tab did not appear.")
def smart_click_xpath(driver, xpath: str, timeout=25) -> bool:
    """
    Click the element located by ``xpath``, retrying until ``timeout`` seconds
    have passed.

    Each attempt waits up to 4 seconds for the element to be clickable,
    scrolls it into view and clicks it. If the click fails because the element
    went stale or was intercepted, the XPath is re-queried and the first
    visible, enabled hit is clicked through JavaScript.

    Returns True after a click was issued, otherwise False. The function never
    raises; the last error seen is printed when it gives up.
    """
    from selenium.webdriver.support import expected_conditions as EC
    # Not imported at module level; needed for the except clause below.
    from selenium.common.exceptions import (
        ElementClickInterceptedException,
        StaleElementReferenceException,
    )
    try:
        end = time.time() + timeout
        last_err = None
        while time.time() < end:
            try:
                # This lookup had been commented out, which left ``elem``
                # undefined: every attempt failed with NameError and the
                # function could only ever return False.
                elem = WebDriverWait(driver, 4).until(EC.element_to_be_clickable((By.XPATH, xpath)))
                driver.execute_script("arguments[0].scrollIntoView({block:'center'});", elem)
                time.sleep(0.05)
                try:
                    elem.click()
                    return True
                except (StaleElementReferenceException, ElementClickInterceptedException) as exc:
                    last_err = exc
                    # Refresh the element and click it via JavaScript.
                    fresh = [
                        el for el in driver.find_elements(By.XPATH, xpath)
                        if el.is_displayed() and el.is_enabled()
                    ]
                    if fresh:
                        driver.execute_script("arguments[0].scrollIntoView({block:'center'});", fresh[0])
                        time.sleep(0.05)
                        driver.execute_script("arguments[0].click();", fresh[0])
                        return True
            except Exception as e:
                # Includes the wait timing out; retry until the budget is spent.
                last_err = e
            time.sleep(0.2)
        if last_err:
            print(" [DBG] smart_click_xpath last error:", last_err)
        return False
    except Exception:
        return False
def click_text_robust(driver, labels: list[str], *, exact=False, exclude_substrings: list[str]|None=None,
                      index: int|None=None, total_timeout: float=40.0) -> bool:
    """
    Click a visible, enabled button-like element selected by its label.

    Candidates are re-queried on every attempt. A candidate's label is its
    text, or, when the text is empty, the first non-empty of the ``value``,
    ``aria-label``, ``title`` and ``data-original-title`` attributes. Labels
    are compared case-insensitively with whitespace collapsed. The click is
    attempted natively first; on a stale or intercepted element the matches
    are queried again and the chosen one is clicked through JavaScript.

    Args:
        driver: Selenium WebDriver.
        labels: Wanted labels. A candidate matches when its label equals one
            of them (``exact``) or starts with one of them (default).
        exact: Require equality instead of a prefix match.
        exclude_substrings: A candidate whose label contains any of these is
            skipped.
        index: Position within the matches to click; the first match is used
            when ``index`` is None or not below the number of matches.
        total_timeout: Overall time budget in seconds.

    Returns:
        True after a click was issued, False when the budget ran out (the
        last error seen, if any, is printed).
    """
    import re
    import time
    # These classes are not imported at module level. Without this import the
    # except clause below raised NameError and the JS-click fallback never ran.
    from selenium.common.exceptions import (
        ElementClickInterceptedException,
        StaleElementReferenceException,
    )

    def norm(text):
        return re.sub(r"\s+", " ", (text or "")).strip().lower()

    end = time.time() + total_timeout
    want = tuple(norm(x) for x in labels)
    bads = [norm(x) for x in (exclude_substrings or [])]
    css = "button, a.btn, a.button, input[type='submit'], input[type='button'], .btn, .button"
    scroll_js = "arguments[0].scrollIntoView({block:'center'});"

    def label_of(el):
        t = (el.text or "").strip()
        if not t:
            for a in ("value", "aria-label", "title", "data-original-title"):
                v = el.get_attribute(a)
                if v:
                    return v.strip()
        return t

    def current_matches():
        # Always a fresh query; never reuse elements from an earlier attempt.
        found = []
        for el in driver.find_elements(By.CSS_SELECTOR, css):
            if not (el.is_displayed() and el.is_enabled()):
                continue
            lab = norm(label_of(el))
            if any(b in lab for b in bads):
                continue
            if (lab in want) if exact else lab.startswith(want):
                found.append(el)
        return found

    def pick(found):
        return found[index] if (index is not None and index < len(found)) else found[0]

    last_err = None
    while time.time() < end:
        try:
            matches = current_matches()
            if matches:
                target = pick(matches)
                try:
                    driver.execute_script(scroll_js, target)
                except Exception:
                    pass
                time.sleep(0.05)
                try:
                    target.click()
                    return True
                except (StaleElementReferenceException, ElementClickInterceptedException) as exc:
                    last_err = exc
                    # Re-run the same query rather than reading the label from
                    # the (possibly stale) element and quoting it into an XPath;
                    # json.dumps output is not a valid XPath string literal for
                    # text containing quotes or non-ASCII characters.
                    fresh = current_matches()
                    if fresh:
                        again = pick(fresh)
                        driver.execute_script(scroll_js, again)
                        time.sleep(0.05)
                        driver.execute_script("arguments[0].click();", again)
                        return True
            time.sleep(0.15)
        except Exception as e:
            last_err = e
            time.sleep(0.2)
    if last_err:
        print(" [DBG] click_text_robust last error:", last_err)
    return False
def click_xpath_robust(driver, xpaths: list[str], total_timeout: float=25.0) -> bool:
    """
    Click the first element that becomes clickable among several XPaths.

    Until ``total_timeout`` seconds have passed, each XPath is tried in order
    with a wait of up to 3 seconds. The element is scrolled into view and
    clicked natively; on a stale or intercepted element the same XPath is
    re-queried and the first visible, enabled hit is clicked via JavaScript.

    Returns True after a click was issued, False when the budget ran out (the
    last error seen, if any, is printed).
    """
    import time
    # None of these is imported at module level. Previously a timeout on the
    # first XPath raised NameError in the except clause, which aborted the
    # for-loop, so the remaining XPaths were never tried.
    from selenium.common.exceptions import (
        ElementClickInterceptedException,
        StaleElementReferenceException,
        TimeoutException,
    )
    end = time.time() + total_timeout
    last_err = None
    while time.time() < end:
        try:
            # Try each XPath fresh on every pass.
            for xp in xpaths:
                try:
                    elem = WebDriverWait(driver, 3).until(EC.element_to_be_clickable((By.XPATH, xp)))
                except TimeoutException as exc:
                    last_err = exc
                    continue
                try:
                    driver.execute_script("arguments[0].scrollIntoView({block:'center'});", elem)
                except Exception:
                    pass
                time.sleep(0.05)
                try:
                    elem.click()
                    return True
                except (StaleElementReferenceException, ElementClickInterceptedException) as exc:
                    last_err = exc
                    # Re-find the element and click it via JavaScript.
                    fresh = [
                        el for el in driver.find_elements(By.XPATH, xp)
                        if el.is_displayed() and el.is_enabled()
                    ]
                    if fresh:
                        driver.execute_script("arguments[0].scrollIntoView({block:'center'});", fresh[0])
                        time.sleep(0.05)
                        driver.execute_script("arguments[0].click();", fresh[0])
                        return True
            time.sleep(0.15)
        except Exception as e:
            last_err = e
            time.sleep(0.2)
    if last_err:
        print(" [DBG] click_xpath_robust last error:", last_err)
    return False
def run_provider_update_flow(driver, base_url: str, lang: str, part_id: int, controller_handle: str) -> tuple[bool, str]:
    """
    Drive the info-provider update pages for one part.

    Page A (opened in a new window) is the provider update page for
    ``part_id``: click "Search", then the second "Create new part" button,
    which is expected to open page B in a new tab. On page B click
    "Save changes". Both pages are then closed and the driver is switched
    back to ``controller_handle``.

    Returns ``(True, "ok")`` on success, or ``(False, stage)`` where ``stage``
    is "search", "create_new_part", "create_new_part (no new tab)" or
    "save_changes".
    """
    driver.switch_to.new_window("window")
    update_handle = driver.current_window_handle
    driver.get(f"{base_url}{lang}/tools/info_providers/update/{part_id}")
    time.sleep(0.25)

    def _abandon_page_a(stage: str) -> tuple[bool, str]:
        # Close page A (the current window) and return to the controller window.
        try:
            if update_handle in driver.window_handles:
                driver.close()
        finally:
            if controller_handle in driver.window_handles:
                driver.switch_to.window(controller_handle)
        return (False, stage)

    # Page A, step 1: run the search.
    if not _click_search_button(driver, timeout=40):
        return _abandon_page_a("search")
    time.sleep(0.4)  # give the results panel a moment to render

    # Page A, step 2: click the second "Create new part" button (up to three tries).
    handles_before = list(driver.window_handles)
    for _attempt in range(3):
        if click_text_robust(driver, ["Create new part"], exact=False, index=1, total_timeout=40):
            break
        time.sleep(0.25)
    else:
        return _abandon_page_a("create_new_part")

    # Page B: switch to the tab that just opened.
    try:
        new_tab = wait_for_new_window_and_switch(driver, handles_before, timeout=25)
    except Exception:
        return _abandon_page_a("create_new_part (no new tab)")
    time.sleep(0.25)  # the new document is still loading

    # Page B: save, with one full retry.
    save_xpaths = [
        "//button[normalize-space()='Save changes']",
        "//button[contains(.,'Save changes')]",
        "//input[@type='submit' and @value='Save changes']",
    ]
    saved = click_xpath_robust(driver, list(save_xpaths), total_timeout=25)
    if not saved:
        saved = click_xpath_robust(driver, list(save_xpaths), total_timeout=25)

    if not saved:
        try:
            if new_tab in driver.window_handles:
                driver.close()
            if update_handle in driver.window_handles:
                driver.switch_to.window(update_handle)
                driver.close()
            if controller_handle in driver.window_handles:
                driver.switch_to.window(controller_handle)
        except Exception:
            pass
        return (False, "save_changes")

    # Success: close page B, then page A, then go back to the controller.
    try:
        if new_tab in driver.window_handles:
            driver.close()
    except Exception:
        pass
    try:
        if update_handle in driver.window_handles:
            driver.switch_to.window(update_handle)
            driver.close()
    except Exception:
        pass
    try:
        if controller_handle in driver.window_handles:
            driver.switch_to.window(controller_handle)
    except Exception:
        pass
    return (True, "ok")
def wait_fields_to_persist(api: PartDB, part_id: int, timeout_s: int = 8, poll_s: float = 0.8) -> list[str]:
    """
    Poll the part until ``_missing_fields`` reports nothing, or until
    ``timeout_s`` seconds have passed.

    The part is always fetched and checked at least once. Previously a zero
    or negative ``timeout_s`` skipped the check and returned an empty list,
    which reads as "nothing is missing".

    Returns the result of the last ``_missing_fields`` call (empty when
    everything persisted; presumably a list of field names; confirm against
    ``_missing_fields``).
    """
    deadline = time.time() + timeout_s
    while True:
        part = api.get_part(part_id)
        missing: list[str] = _missing_fields(api, part_id, part)
        if not missing or time.time() >= deadline:
            return missing
        time.sleep(poll_s)
def set_eda_from_resistance(api: PartDB, part_id: int, *, max_wait_s: int = 12, poll_every: float = 0.8) -> bool:
    """
    Derive the part's EDA value from its "resistance" parameter.

    Polls the part every ``poll_every`` seconds, for up to ``max_wait_s``
    seconds, until a "resistance" parameter is present. When the flattened
    parameters are empty, the parameter values are fetched separately and
    merged into a copy of the part before flattening again.

    Returns:
        False if the parameter never appears or cannot be parsed; True if the
        part already has a non-blank EDA value; otherwise the result of
        ``api.patch_eda_value``.
    """
    give_up_at = time.time() + max_wait_s
    while True:
        part = api.get_part(part_id)
        flat = api.get_part_params_flat(part)
        if not flat:
            extra_values = api.get_part_parameter_values(part_id)
            if extra_values:
                part = dict(part, parameter_values=extra_values)
                flat = api.get_part_params_flat(part)

        reading = flat.get("resistance")
        if not reading:
            if time.time() >= give_up_at:
                return False
            time.sleep(poll_every)
            continue

        magnitude, unit = reading
        ohms = parse_resistance_to_ohms(magnitude, unit)
        if ohms is None:
            return False
        eda_text = format_ohms_for_eda(ohms)
        current = api.get_eda_value(part)
        if current and str(current).strip():
            # An existing EDA value is left untouched.
            return True
        return api.patch_eda_value(part_id, eda_text)
def set_eda_from_capacitance(api: PartDB, part_id: int, *, max_wait_s: int = 12, poll_every: float = 0.8) -> bool:
    """
    Derive the part's EDA value from its "capacitance" parameter.

    Polls the part every ``poll_every`` seconds, for up to ``max_wait_s``
    seconds, until a "capacitance" parameter is present. When the flattened
    parameters are empty, the parameter values are fetched separately and
    merged into a copy of the part before flattening again.

    Returns:
        False if the parameter never appears or cannot be parsed; True if the
        part already has a non-blank EDA value; otherwise the result of
        ``api.patch_eda_value``.
    """
    give_up_at = time.time() + max_wait_s
    while True:
        part = api.get_part(part_id)
        flat = api.get_part_params_flat(part)
        if not flat:
            extra_values = api.get_part_parameter_values(part_id)
            if extra_values:
                part = dict(part, parameter_values=extra_values)
                flat = api.get_part_params_flat(part)

        reading = flat.get("capacitance")
        if not reading:
            if time.time() >= give_up_at:
                return False
            time.sleep(poll_every)
            continue

        magnitude, unit = reading
        farads = parse_capacitance_to_farads(magnitude, unit)
        if farads is None:
            return False
        eda_text = format_farads_for_eda(farads)
        current = api.get_eda_value(part)
        if current and str(current).strip():
            # An existing EDA value is left untouched.
            return True
        return api.patch_eda_value(part_id, eda_text)
def accept_bulk_import_jobs(driver, base_url: str, lang: str, job_url: str | None = None, max_iterations: int = 100) -> Tuple[int, int, int]:
    """
    Work through a bulk-import job page, accepting one part per iteration.

    Each iteration:
      1. Scrolls to the top of the page.
      2. For every ``border-warning`` card that shows a "0 results" badge or a
         "No results found" alert, clicks its "Mark Skipped" button.
      3. Picks the first visible, enabled "Update Part"/"Update part" link or
         button whose class does not contain ``disabled``; stops if none exists.
      4. Clicks it, then clicks "Save" twice and "Complete" once, pausing for a
         fixed time after each click so the page can load.

    Args:
        driver: Selenium WebDriver, already logged in.
        base_url: Not used by this function.
        lang: Not used by this function.
        job_url: If given, the driver navigates there before starting.
        max_iterations: Upper bound on the number of iterations.

    Returns:
        ``(successful_count, failed_count, skipped_count)``.
    """
    # Navigate to the job/import page if URL provided
    if job_url:
        driver.get(job_url)
        time.sleep(1.5)
    successful = 0
    failed = 0
    total_skipped = 0
    for iteration in range(max_iterations):
        print(f"\n[Accept Jobs] Iteration {iteration + 1}/{max_iterations}")
        # Scroll to top first to ensure we see all buttons
        try:
            driver.execute_script("window.scrollTo(0, 0);")
            time.sleep(0.5)
        except Exception:
            pass
        # First look for cards without search results (border-warning cards with
        # a "0 results found" badge or a "No results found" alert) and click
        # their "Mark Skipped" button.
        skipped_count = 0
        try:
            # Find cards with border-warning
            warning_cards = driver.find_elements(By.XPATH, "//div[contains(@class, 'card') and contains(@class, 'border-warning')]")
            for card in warning_cards:
                try:
                    # Check if it has "0 results found" badge or "No results found" message
                    has_no_results = False
                    try:
                        badge = card.find_element(By.XPATH, ".//span[contains(@class, 'badge') and contains(@class, 'bg-info') and contains(., 'results found')]")
                        if badge and '0 results' in badge.text:
                            has_no_results = True
                    except Exception:
                        # find_element raises when the badge is absent; treat as "no badge".
                        pass
                    if not has_no_results:
                        try:
                            alert = card.find_element(By.XPATH, ".//div[contains(@class, 'alert-info') and contains(., 'No results found')]")
                            if alert:
                                has_no_results = True
                        except Exception:
                            pass
                    if has_no_results:
                        # This card should be skipped, click "Mark Skipped" button if available
                        try:
                            mark_skipped_btn = card.find_element(By.XPATH, ".//button[contains(., 'Mark Skipped')]")
                            if mark_skipped_btn and mark_skipped_btn.is_displayed():
                                driver.execute_script("arguments[0].scrollIntoView({block:'center', behavior:'smooth'});", mark_skipped_btn)
                                time.sleep(0.3)
                                try:
                                    mark_skipped_btn.click()
                                except Exception:
                                    # Native click failed: fall back to a JS click.
                                    driver.execute_script("arguments[0].click();", mark_skipped_btn)
                                skipped_count += 1
                                print(f"[Accept Jobs] Marked card as skipped (no results found)")
                                time.sleep(0.5)
                        except Exception:
                            pass
                except Exception as e:
                    # Any other problem with this card (e.g. it went stale): move on.
                    continue
            if skipped_count > 0:
                print(f"[Accept Jobs] Marked {skipped_count} cards as pending (no results)")
                total_skipped += skipped_count
                time.sleep(1.0)  # Wait after marking the cards
        except Exception as e:
            print(f"[Accept Jobs] Error checking for skipped cards: {e}")
        # Find the first "Update part" control that is NOT disabled (no 'disabled' in class)
        update_button = None
        try:
            # <a> or <button> elements carrying a 'btn' class and "Update Part"
            # text (either capitalization) whose class does not contain 'disabled'
            possible_xpaths = [
                "//a[contains(@class, 'btn') and not(contains(@class, 'disabled')) and contains(., 'Update Part')]",
                "//a[contains(@class, 'btn') and not(contains(@class, 'disabled')) and contains(., 'Update part')]",
                "//button[contains(@class, 'btn') and not(contains(@class, 'disabled')) and contains(., 'Update Part')]",
                "//button[contains(@class, 'btn') and not(contains(@class, 'disabled')) and contains(., 'Update part')]",
            ]
            for xpath in possible_xpaths:
                try:
                    elements = driver.find_elements(By.XPATH, xpath)
                    for el in elements:
                        # Double-check: must not have 'disabled' in class attribute
                        # (this check is case-insensitive, the XPath one is not)
                        class_attr = el.get_attribute('class') or ''
                        if 'disabled' in class_attr.lower():
                            continue
                        if el.is_displayed() and el.is_enabled():
                            # Found a valid button, use the first one
                            update_button = el
                            print(f"[Accept Jobs] Found button with text: '{el.text.strip()}' and class: '{class_attr}'")
                            break
                    if update_button:
                        break
                except Exception as e:
                    print(f"[Accept Jobs] Error with xpath {xpath}: {e}")
                    continue
        except Exception as e:
            print(f"[Accept Jobs] Error finding buttons: {e}")
            break
        if not update_button:
            print("[Accept Jobs] No more selectable 'Update part' buttons (without 'disabled' class) found. Done.")
            break
        # Click the button and wait for page load
        try:
            # Scroll into view
            driver.execute_script("arguments[0].scrollIntoView({block:'center', behavior:'smooth'});", update_button)
            time.sleep(0.5)
            # Click the button
            try:
                update_button.click()
            except Exception:
                # Fallback to JS click
                driver.execute_script("arguments[0].click();", update_button)
            print("[Accept Jobs] Clicked 'Update part' button, waiting for page load...")
            time.sleep(2.5)  # Wait for page to load
        except Exception as e:
            print(f"[Accept Jobs] Failed to click button: {e}")
            failed += 1
            # NOTE(review): the same button is likely to be picked again on the
            # next iteration, so a persistent failure is counted repeatedly
            # until max_iterations is reached.
            continue
        # Now on the same page (no tab switch) - click Save first time
        save_success = False
        try:
            print("[Accept Jobs] Looking for first 'Save' button...")
            ok = click_xpath_robust(driver, [
                "//button[@type='submit' and @id='part_base_save']",
                "//button[@type='submit' and contains(@name, 'save')]",
                "//button[normalize-space()='Save changes']",
                "//button[contains(normalize-space(), 'Save changes')]",
                "//button[normalize-space()='Save']",
                "//input[@type='submit' and contains(@value, 'Save')]",
            ], total_timeout=15)
            if ok:
                print("[Accept Jobs] First 'Save' clicked, waiting for page load...")
                time.sleep(2.5)  # Wait for page to load
                # Click Save second time
                print("[Accept Jobs] Looking for second 'Save' button...")
                ok = click_xpath_robust(driver, [
                    "//button[@type='submit' and @id='part_base_save']",
                    "//button[@type='submit' and contains(@name, 'save')]",
                    "//button[normalize-space()='Save changes']",
                    "//button[contains(normalize-space(), 'Save changes')]",
                    "//button[normalize-space()='Save']",
                    "//input[@type='submit' and contains(@value, 'Save')]",
                ], total_timeout=15)
                if ok:
                    print("[Accept Jobs] Second 'Save' clicked, waiting for page load...")
                    time.sleep(2.5)  # Wait for page to load
                    # Click Complete
                    print("[Accept Jobs] Looking for 'Complete' button...")
                    ok = click_xpath_robust(driver, [
                        "//button[normalize-space()='Complete']",
                        "//button[contains(normalize-space(), 'Complete')]",
                        "//input[@type='submit' and contains(@value, 'Complete')]",
                        "//a[contains(normalize-space(), 'Complete')]",
                    ], total_timeout=15)
                    if ok:
                        print("[Accept Jobs] 'Complete' clicked successfully!")
                        save_success = True
                        time.sleep(2.0)  # Wait for completion to process
                    else:
                        print("[Accept Jobs] Failed to find/click 'Complete' button")
                else:
                    print("[Accept Jobs] Failed to find/click second 'Save' button")
            else:
                print("[Accept Jobs] Failed to find/click first 'Save' button")
        except Exception as e:
            print(f"[Accept Jobs] Error during save/complete sequence: {e}")
        # The job counts as successful only when all three clicks went through.
        if save_success:
            successful += 1
            print(f"[Accept Jobs] Job accepted successfully! (Total: {successful})")
        else:
            failed += 1
            print(f"[Accept Jobs] Job failed! (Total failed: {failed})")
        # Small delay before next iteration
        time.sleep(1.0)
    print(f"\n[Accept Jobs] Complete: {successful} successful, {failed} failed, {total_skipped} skipped")
    return (successful, failed, total_skipped)
def update_part_from_providers_once(part_id: int, *, headless: bool = True) -> Tuple[bool, str]:
    """
    Run the info-provider update flow a single time for ``part_id`` in a
    browser started just for this call.

    ``headless=None`` selects the HEADLESS_PROVIDER setting. The browser is
    always shut down before returning.

    Returns ``(True, "ok")`` on success. On failure returns ``(False, where)``
    with ``where`` being "login", the failing stage reported by
    ``run_provider_update_flow`` (or "provider" if that is empty), or
    "exception: <message>" when anything raised.
    """
    if headless is None:
        headless = HEADLESS_PROVIDER

    browser = None
    try:
        browser = start_firefox_resilient(headless_first=headless)
        browser.get(PARTDB_BASE + "/")
        if not ensure_logged_in(browser, PARTDB_BASE, interactive_ok=True, wait_s=120):
            return (False, "login")
        controller = browser.current_window_handle
        succeeded, stage = run_provider_update_flow(browser, PARTDB_BASE, UI_LANG_PATH, part_id, controller)
        return (True, "ok") if succeeded else (False, stage or "provider")
    except Exception as e:
        return (False, f"exception: {e}")
    finally:
        # Best-effort shutdown; the browser may never have started.
        if browser is not None:
            try:
                browser.quit()
            except Exception:
                pass
def _click_search_button(driver, timeout=40) -> bool:
    """
    Click the "Search" control on the provider update page.

    Several XPaths are tried in order on every pass, from an exact "Search"
    button down to any button/link/input containing "Search". Elements that
    are hidden, disabled, or whose text contains "options" (i.e. "Search
    options") are ignored. Passes repeat every 0.2 seconds until ``timeout``
    seconds have elapsed.

    Returns True once a click (native, or JavaScript as fallback) was issued,
    False on timeout.
    """
    import time
    deadline = time.time() + timeout
    search_locators = (
        # a button whose text is exactly 'Search'
        "//button[normalize-space(.)='Search']",
        # a button containing 'Search' but not 'options'
        "//button[contains(normalize-space(.), 'Search') and not(contains(., 'options'))]",
        # a submit input whose value is 'search' in any letter case
        "//input[@type='submit' and translate(@value,'abcdefghijklmnopqrstuvwxyz','ABCDEFGHIJKLMNOPQRSTUVWXYZ')='SEARCH']",
        # any button/link/input containing 'Search'
        "//*[self::button or self::a or self::input][contains(normalize-space(.), 'Search')]"
    )
    while time.time() < deadline:
        for locator in search_locators:
            try:
                usable = [
                    node for node in driver.find_elements(By.XPATH, locator)
                    if node.is_displayed() and node.is_enabled()
                ]
                usable = [node for node in usable if "options" not in (node.text or "").lower()]
                if usable:
                    button = usable[0]
                    try:
                        driver.execute_script("arguments[0].scrollIntoView({block:'center'});", button)
                    except Exception:
                        pass
                    time.sleep(0.05)
                    try:
                        button.click()
                    except Exception:
                        # JS click fallback; if this raises too, the outer
                        # handler moves on to the next locator.
                        driver.execute_script("arguments[0].click();", button)
                    return True
            except Exception:
                pass
        time.sleep(0.2)
    return False
def _query_any(driver, selectors):
    """
    Return the first element matched by the earliest CSS selector in
    ``selectors`` that matches anything, or None when no selector matches.
    """
    # Lazy: selectors after the first one with a hit are never queried.
    hit_lists = (driver.find_elements(By.CSS_SELECTOR, css) for css in selectors)
    return next((hits[0] for hits in hit_lists if hits), None)
def _fill_via_js(driver, el, value: str):
# Set value and dispatch events so reactive forms notice
driver.execute_script("""
const el = arguments[0], val = arguments[1];
if (!el) return;
const nativeSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value').set;
nativeSetter.call(el, val);
el.dispatchEvent(new Event('input', { bubbles: true }));
el.dispatchEvent(new Event('change', { bubbles: true }));
""", el, value)
def _auto_login_once(driver, base_url: str, username: str, password: str, timeout_s: int = 90) -> bool:
    """
    Perform one automatic login attempt with the given credentials.

    Opens the login page, JS-clicks any "Login"/"Sign in" button or link
    (some skins need that to reveal the form), waits up to 25 seconds for the
    username and password inputs, fills them through JavaScript, submits, and
    polls ``_is_logged_in`` for up to ``timeout_s`` seconds.

    Progress and failures are printed. Returns True when a logged-in state is
    detected, False otherwise.
    """
    login_url = base_url.rstrip("/") + UI_LANG_PATH + "/login"
    print("[Login] Navigating to:", login_url)
    driver.get(login_url)

    # Click whatever might reveal the form.
    reveal_xpaths = (
        "//button[contains(., 'Login')]",
        "//a[contains(., 'Login')]",
        "//button[contains(., 'Sign in')]",
        "//a[contains(., 'Sign in')]",
    )
    for reveal_xpath in reveal_xpaths:
        try:
            triggers = driver.find_elements(By.XPATH, reveal_xpath)
            if triggers:
                driver.execute_script("arguments[0].click();", triggers[0])
                time.sleep(0.4)
        except Exception:
            pass

    field_wait = WebDriverWait(driver, 25)
    username_css = [
        "input[name='_username']",
        "input#username",
        "input[name='username']",
        "input[type='email']",
        "form input[type='text']",
    ]
    password_css = [
        "input[name='_password']",
        "input#password",
        "input[name='password']",
        "form input[type='password']",
    ]
    submit_css = [
        "button[type='submit']",
        "input[type='submit']",
        "button.btn-primary",
        "form button",
    ]

    try:
        field_wait.until(lambda d: _query_any(d, username_css) and _query_any(d, password_css))
    except Exception:
        print("[Login] Could not find username/password inputs.")
        return False

    user_input = _query_any(driver, username_css)
    pass_input = _query_any(driver, password_css)
    if not (user_input and pass_input):
        print("[Login] Inputs missing after wait.")
        return False

    try:
        _fill_via_js(driver, user_input, username)
        _fill_via_js(driver, pass_input, password)
        time.sleep(0.2)
    except Exception as e:
        print("[Login] Filling fields failed:", e)
        return False

    submit_button = _query_any(driver, submit_css)
    if submit_button:
        try:
            driver.execute_script("arguments[0].scrollIntoView({block:'center'});", submit_button)
            time.sleep(0.1)
            submit_button.click()
        except Exception:
            # Scrolling or the native click failed: try a JS click instead.
            try:
                driver.execute_script("arguments[0].click();", submit_button)
            except Exception:
                pass
    else:
        # Last resort: submit the first form on the page.
        try:
            driver.execute_script("var f=document.querySelector('form'); if(f){f.submit();}")
        except Exception:
            pass

    give_up_at = time.time() + timeout_s
    while time.time() < give_up_at:
        if _is_logged_in(driver):
            print("[Login] Logged in.")
            return True
        time.sleep(0.5)
    print("[Login] Timeout waiting for post-login.")
    return False