import os, time, tempfile, shutil, traceback, json from typing import Tuple from selenium import webdriver from selenium.webdriver.common.by import By # waits from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC # Firefox from selenium.webdriver.firefox.service import Service as FirefoxService from selenium.webdriver.firefox.options import Options as FirefoxOptions from webdriver_manager.firefox import GeckoDriverManager from apis.partdb_api import PartDB from config import ( PARTDB_BASE, UI_LANG_PATH, ENV_USER_VAR, ENV_PASS_VAR, ENV_USER, ENV_PASSWORD, HEADLESS_PROVIDER, HEADLESS_CONTROLLER, HEADLESS_WORKER, ) GECKO_LOG_PATH = os.path.abspath("geckodriver.log") def start_firefox_resilient(*, headless_first: bool, allow_chrome_fallback: bool = True): """ Start Firefox. Headless is EXACTLY as requested. If Firefox fails and allow_chrome_fallback=True, we fall back to Chrome (also non-headless). Emits clear debug prints so you know what launched. """ # Make sure MOZ_HEADLESS doesn't override us os.environ.pop("MOZ_HEADLESS", None) def _ff_try(headless: bool, binary: str | None = None): profile_dir = tempfile.mkdtemp(prefix="ff-profile-") try: opts = FirefoxOptions() if headless: opts.add_argument("-headless") # ONLY when explicitly requested opts.set_preference("browser.shell.checkDefaultBrowser", False) opts.set_preference("browser.startup.homepage_override.mstone", "ignore") opts.add_argument("-profile") opts.add_argument(profile_dir) if binary: opts.binary_location = binary service = FirefoxService(GeckoDriverManager().install(), log_output=GECKO_LOG_PATH) drv = webdriver.Firefox(service=service, options=opts) try: drv.maximize_window() except Exception: pass print(f"[Selenium] Firefox launched (headless={headless}, bin={binary or 'default'}).") return drv except Exception as e: print(f"[Selenium] Firefox launch failed (headless={headless}, bin={binary or 'default'}): {e}") shutil.rmtree(profile_dir, ignore_errors=True) return None # 1) exact request d = _ff_try(headless_first, None) if d: return d # 2) try flipping headless (sometimes GPU/driver quirks) d = _ff_try(not headless_first, None) if d: print(f"[Selenium] NOTE: flipped headless to {not headless_first} due to previous failure.") return d # 3) try known Firefox binaries for binpath in [ r"C:\Program Files\Mozilla Firefox\firefox.exe", r"C:\Program Files (x86)\Mozilla Firefox\firefox.exe", shutil.which("firefox") or "", ]: if binpath and os.path.exists(binpath): d = _ff_try(headless_first, binpath) or _ff_try(not headless_first, binpath) if d: return d # 4) fallback to Chrome if allowed if allow_chrome_fallback: try: from selenium.webdriver.chrome.service import Service as ChromeService from selenium.webdriver.chrome.options import Options as ChromeOptions from webdriver_manager.chrome import ChromeDriverManager c_opts = ChromeOptions() # IMPORTANT: no headless unless requested if headless_first: c_opts.add_argument("--headless=new") c_srv = ChromeService(ChromeDriverManager().install()) drv = webdriver.Chrome(service=c_srv, options=c_opts) try: drv.maximize_window() except Exception: pass print(f"[Selenium] Chrome fallback launched (headless={headless_first}).") return drv except Exception as e: print(f"[Selenium] Chrome fallback failed: {e}") # 5) show geckodriver tail to help debug try: with open(GECKO_LOG_PATH, "r", encoding="utf-8", errors="ignore") as f: tail = "".join(f.readlines()[-80:]) print("\n[geckodriver.log tail]\n" + tail + "\n") except Exception: pass raise RuntimeError("Could not launch a visible browser. See messages above.") def _is_logged_in(driver) -> bool: try: url = (driver.current_url or "").lower() if "/login" in url: return False # any “Logout” link or user-menu if driver.find_elements(By.XPATH, "//a[contains(@href,'logout') or contains(., 'Logout') or contains(., 'Sign out')]"): return True # many Part-DB skins have a user dropdown with a logout item: if driver.find_elements(By.CSS_SELECTOR, "[data-user-menu], .user-menu, a[href*='/logout']"): return True # if we can see a main nav that’s only visible when logged in: if driver.find_elements(By.CSS_SELECTOR, "nav, .navbar, header .nav"): # don’t return True on the login page’s navbar: return "/login" not in url except Exception: pass return False COOKIES_FILE = os.path.abspath("partdb_cookies.json") def _save_cookies(driver, base_url: str, path: str = COOKIES_FILE): try: driver.get(base_url + "/") time.sleep(0.2) with open(path, "w", encoding="utf-8") as f: json.dump(driver.get_cookies(), f) print("[Login] Cookies saved.") except Exception as e: print("[Login] Save cookies failed:", e) def _load_cookies(driver, base_url: str, path: str = COOKIES_FILE) -> bool: if not os.path.exists(path): return False try: driver.get(base_url + "/") time.sleep(0.2) with open(path, "r", encoding="utf-8") as f: for c in json.load(f): try: driver.add_cookie(c) except Exception: pass driver.get(base_url + "/") time.sleep(0.6) ok = _is_logged_in(driver) print("[Login] Cookies loaded →", "OK" if ok else "not logged in") return ok except Exception as e: print("[Login] Load cookies failed:", e) return False def _try_auto_login(driver, base_url: str, username: str, password: str, timeout_s: int = 120) -> bool: """ Very robust login: - opens login page - fills known selectors - JS fallback to submit the form - waits until /login disappears AND a logout link is visible """ login_url = base_url.rstrip("/") + UI_LANG_PATH + "/login" driver.get(login_url) wait = WebDriverWait(driver, 30) # Find username field (many variants) user_sel = [ "input[name='_username']", "input#username", "input[name='username']", "input[type='email']", "input[type='text']", ] pass_sel = [ "input[name='_password']", "input#password", "input[name='password']", "input[type='password']", ] submit_sel = [ "button[type='submit']", "input[type='submit']", "button.btn-primary", "button[class*='login']", "form button", ] def _query_any(selectors): for s in selectors: els = driver.find_elements(By.CSS_SELECTOR, s) if els: return els[0] return None # Wait for any username/password field to appear try: wait.until(lambda d: _query_any(user_sel) and _query_any(pass_sel)) except Exception: return False u = _query_any(user_sel); p = _query_any(pass_sel) try: u.clear(); u.send_keys(username) p.clear(); p.send_keys(password) except Exception: pass # Try clicking a submit button; fall back to JS submit btn = _query_any(submit_sel) if btn: try: driver.execute_script("arguments[0].scrollIntoView({block:'center'});", btn) except Exception: pass try: btn.click() except Exception: # JS click fallback try: driver.execute_script("arguments[0].click();", btn) except Exception: pass else: # No visible submit → try submitting the first form try: driver.execute_script("var f=document.querySelector('form'); if(f){f.submit();}") except Exception: pass # Wait until not on /login and a logout becomes visible (or timeout) end = time.time() + timeout_s while time.time() < end: if _is_logged_in(driver): return True time.sleep(0.5) return False def ensure_logged_in(driver, base_url: str, *, interactive_ok: bool = True, wait_s: int = 240) -> bool: # Try cookies first if _load_cookies(driver, base_url): return True # Auto login with creds if available user = (os.getenv(ENV_USER_VAR) or ENV_USER or "").strip() pwd = (os.getenv(ENV_PASS_VAR) or ENV_PASSWORD or "").strip() if user and pwd: if _auto_login_once(driver, base_url, user, pwd, timeout_s=120): _save_cookies(driver, base_url) return True if not interactive_ok: return False # Manual fallback: let user log in, then save cookies login_url = base_url.rstrip("/") + UI_LANG_PATH + "/login" print("Please login to Part-DB in the opened browser window…") try: driver.get(login_url) try: driver.maximize_window() except Exception: pass end = time.time() + wait_s while time.time() < end: if _is_logged_in(driver): print("Login detected; saving cookies.") _save_cookies(driver, base_url) return True time.sleep(0.6) print("Login was not detected before timeout.") return False except Exception: return False def click_by_text(driver, labels, *, exact_only=False, exclude=None, index=None, timeout=60) -> bool: """ Robust text clicker that NEVER caches elements across DOM changes. It re-queries candidates every attempt, scrolls into view, and retries on staleness. """ import time, re want = [re.sub(r"\s+"," ", (x or "")).strip().lower() for x in labels] exclude = [re.sub(r"\s+"," ", (x or "")).strip().lower() for x in (exclude or [])] end = time.time() + timeout def label(el): t = (el.text or "").strip() if not t: for a in ("value","aria-label","title","data-original-title"): v = el.get_attribute(a) if v: return v.strip() return t def all_clickables(): # Do NOT keep references across attempts. sels = "button, a.btn, a.button, input[type='submit'], input[type='button'], .btn, .button" return [e for e in driver.find_elements(By.CSS_SELECTOR, sels) if e.is_displayed() and e.is_enabled()] last_err = None while time.time() < end: try: # tiny settle time.sleep(0.05) # REFRESH candidates every loop cands = all_clickables() # map and filter mapped = [] for el in cands: lab = re.sub(r"\s+"," ", label(el)).strip().lower() if any(x in lab for x in exclude): continue if exact_only: if lab in want: mapped.append(el) else: if any(lab == w or lab.startswith(w) for w in want): mapped.append(el) if mapped: # if a specific index requested, re-check bounds every attempt target = mapped[index] if (index is not None and index < len(mapped)) else mapped[0] try: driver.execute_script("arguments[0].scrollIntoView({block:'center'});", target) except Exception: pass time.sleep(0.05) try: target.click() except (StaleElementReferenceException, ElementClickInterceptedException): # try one fresh JS click try: # IMPORTANT: refetch an equivalent fresh node by XPath, using the visible text of target txt = label(target) if txt: xp = f"//*[normalize-space(text())={json.dumps(txt)}][self::button or self::a or self::span or self::div]/ancestor-or-self::button | //*[normalize-space(text())={json.dumps(txt)}]/ancestor::a" fresh = driver.find_elements(By.XPATH, xp) fresh = [e for e in fresh if e.is_displayed() and e.is_enabled()] if fresh: driver.execute_script("arguments[0].scrollIntoView({block:'center'});", fresh[0]) time.sleep(0.05) driver.execute_script("arguments[0].click();", fresh[0]) return True except Exception as e: last_err = e # one more cycle time.sleep(0.15) continue return True except Exception as e: last_err = e time.sleep(0.2) if last_err: print(" [DBG] click_by_text last error:", last_err) return False def wait_for_new_window_and_switch(driver, old_handles, timeout=30): end = time.time() + timeout; old = set(old_handles) while time.time() < end: now = driver.window_handles; new = [h for h in now if h not in old] if new: driver.switch_to.window(new[0]); return new[0] time.sleep(0.2) raise TimeoutError("New window/tab did not appear.") def smart_click_xpath(driver, xpath: str, timeout=25) -> bool: from selenium.webdriver.support import expected_conditions as EC try: end = time.time() + timeout last_err = None while time.time() < end: try: #elem = WebDriverWait(driver, 4).until(EC.element_to_be_clickable((By.XPATH, xpath))) driver.execute_script("arguments[0].scrollIntoView({block:'center'});", elem) time.sleep(0.05) try: elem.click() return True except (StaleElementReferenceException, ElementClickInterceptedException): # Refresh element and js-click fresh = driver.find_elements(By.XPATH, xpath) fresh = [e for e in fresh if e.is_displayed() and e.is_enabled()] if fresh: driver.execute_script("arguments[0].scrollIntoView({block:'center'});", fresh[0]) time.sleep(0.05) driver.execute_script("arguments[0].click();", fresh[0]) return True except Exception as e: last_err = e time.sleep(0.2) if last_err: print(" [DBG] smart_click_xpath last error:", last_err) return False except Exception: return False def click_text_robust(driver, labels: list[str], *, exact=False, exclude_substrings: list[str]|None=None, index: int|None=None, total_timeout: float=40.0) -> bool: """ Re-queries candidates every attempt; clicks via native, then JS fallback; retries on StaleElementReference and intercepts. """ import time, re, json end = time.time() + total_timeout want = [re.sub(r"\s+"," ", (x or "")).strip().lower() for x in labels] bads = [re.sub(r"\s+"," ", (x or "")).strip().lower() for x in (exclude_substrings or [])] def label_of(el): t = (el.text or "").strip() if not t: for a in ("value","aria-label","title","data-original-title"): v = el.get_attribute(a) if v: return v.strip() return t css = "button, a.btn, a.button, input[type='submit'], input[type='button'], .btn, .button" last_err = None while time.time() < end: try: # re-find fresh candidates each loop candidates = [e for e in driver.find_elements(By.CSS_SELECTOR, css) if e.is_displayed() and e.is_enabled()] matches = [] for el in candidates: lab = re.sub(r"\s+"," ", label_of(el)).strip().lower() if any(b in lab for b in bads): continue ok = (lab in want) if exact else any(lab == w or lab.startswith(w) for w in want) if ok: matches.append(el) if matches: target = matches[index] if (index is not None and index < len(matches)) else matches[0] try: driver.execute_script("arguments[0].scrollIntoView({block:'center'});", target) except Exception: pass time.sleep(0.05) try: target.click() return True except (StaleElementReferenceException, ElementClickInterceptedException): # refetch by visible text and JS-click txt = label_of(target) if txt: xp = ( f"//button[normalize-space(.)={json.dumps(txt)}]" f"|//a[normalize-space(.)={json.dumps(txt)}]" f"|//input[@type='submit' and @value={json.dumps(txt)}]" ) fresh = [e for e in driver.find_elements(By.XPATH, xp) if e.is_displayed() and e.is_enabled()] if fresh: driver.execute_script("arguments[0].scrollIntoView({block:'center'});", fresh[0]) time.sleep(0.05) driver.execute_script("arguments[0].click();", fresh[0]) return True time.sleep(0.15) except Exception as e: last_err = e time.sleep(0.2) if last_err: print(" [DBG] click_text_robust last error:", last_err) return False def click_xpath_robust(driver, xpaths: list[str], total_timeout: float=25.0) -> bool: """ Waits for any XPath to be clickable; on stale/intercepted, re-finds and JS-clicks. """ import time end = time.time() + total_timeout last_err = None while time.time() < end: try: # try each xpath fresh this loop for xp in xpaths: try: elem = WebDriverWait(driver, 3).until(EC.element_to_be_clickable((By.XPATH, xp))) except TimeoutException: continue try: driver.execute_script("arguments[0].scrollIntoView({block:'center'});", elem) except Exception: pass time.sleep(0.05) try: elem.click() return True except (StaleElementReferenceException, ElementClickInterceptedException) as e: last_err = e # re-find and JS click fresh = [e for e in driver.find_elements(By.XPATH, xp) if e.is_displayed() and e.is_enabled()] if fresh: driver.execute_script("arguments[0].scrollIntoView({block:'center'});", fresh[0]) time.sleep(0.05) driver.execute_script("arguments[0].click();", fresh[0]) return True time.sleep(0.15) except Exception as e: last_err = e time.sleep(0.2) if last_err: print(" [DBG] click_xpath_robust last error:", last_err) return False def run_provider_update_flow(driver, base_url: str, lang: str, part_id: int, controller_handle: str) -> tuple[bool, str]: # Page A driver.switch_to.new_window("window") update_handle = driver.current_window_handle driver.get(f"{base_url}{lang}/tools/info_providers/update/{part_id}") time.sleep(0.25) # --- Step A1: Search (avoid 'Search options') if not _click_search_button(driver, timeout=40): try: if update_handle in driver.window_handles: driver.close() finally: if controller_handle in driver.window_handles: driver.switch_to.window(controller_handle) return (False, "search") # tiny settle so results panel can render time.sleep(0.4) # --- Step A2: Create new part (middle button) before = list(driver.window_handles) # Prefer a stable XPath to the 2nd “Create new part” if possible: # (Keeps re-querying until it’s clickable) created = False for _ in range(3): # a few short attempts ok = click_text_robust(driver, ["Create new part"], exact=False, index=1, total_timeout=40) if ok: created = True break time.sleep(0.25) if not created: try: if update_handle in driver.window_handles: driver.close() finally: if controller_handle in driver.window_handles: driver.switch_to.window(controller_handle) return (False, "create_new_part") # --- Step B: switch to the newly opened tab try: new_tab = wait_for_new_window_and_switch(driver, before, timeout=25) except Exception: try: if update_handle in driver.window_handles: driver.close() finally: if controller_handle in driver.window_handles: driver.switch_to.window(controller_handle) return (False, "create_new_part (no new tab)") # tiny settle; new document is loading time.sleep(0.25) # --- Step B1: Save changes ok = click_xpath_robust(driver, [ "//button[normalize-space()='Save changes']", "//button[contains(.,'Save changes')]", "//input[@type='submit' and @value='Save changes']", ], total_timeout=25) if not ok: ok = click_xpath_robust(driver, [ "//button[normalize-space()='Save changes']", "//button[contains(.,'Save changes')]", "//input[@type='submit' and @value='Save changes']", ], total_timeout=25) if not ok: # cleanup and return try: if new_tab in driver.window_handles: driver.close() if update_handle in driver.window_handles: driver.switch_to.window(update_handle); driver.close() if controller_handle in driver.window_handles: driver.switch_to.window(controller_handle) except Exception: pass return (False, "save_changes") # Close Page B and Page A, back to controller try: if new_tab in driver.window_handles: driver.close() except Exception: pass try: if update_handle in driver.window_handles: driver.switch_to.window(update_handle); driver.close() except Exception: pass try: if controller_handle in driver.window_handles: driver.switch_to.window(controller_handle) except Exception: pass return (True, "ok") def wait_fields_to_persist(api: PartDB, part_id: int, timeout_s: int = 8, poll_s: float = 0.8) -> list[str]: deadline = time.time() + timeout_s; last_missing: List[str] = [] while time.time() < deadline: part = api.get_part(part_id) last_missing = _missing_fields(api, part_id, part) if not last_missing: break time.sleep(poll_s) return last_missing def set_eda_from_resistance(api: PartDB, part_id: int, *, max_wait_s: int = 12, poll_every: float = 0.8) -> bool: deadline = time.time() + max_wait_s while True: part = api.get_part(part_id) params = api.get_part_params_flat(part) if not params: vals = api.get_part_parameter_values(part_id) if vals: part = dict(part); part["parameter_values"] = vals params = api.get_part_params_flat(part) value_unit = params.get("resistance") if value_unit: value, unit = value_unit ohms = parse_resistance_to_ohms(value, unit) if ohms is None: return False eda = format_ohms_for_eda(ohms) existing = api.get_eda_value(part) if existing and str(existing).strip(): return True return api.patch_eda_value(part_id, eda) if time.time() >= deadline: return False time.sleep(poll_every) def set_eda_from_capacitance(api: PartDB, part_id: int, *, max_wait_s: int = 12, poll_every: float = 0.8) -> bool: deadline = time.time() + max_wait_s while True: part = api.get_part(part_id) params = api.get_part_params_flat(part) if not params: vals = api.get_part_parameter_values(part_id) if vals: part = dict(part); part["parameter_values"] = vals params = api.get_part_params_flat(part) cap = params.get("capacitance") if cap: value, unit = cap F = parse_capacitance_to_farads(value, unit) if F is None: return False eda = format_farads_for_eda(F) existing = api.get_eda_value(part) if existing and str(existing).strip(): return True return api.patch_eda_value(part_id, eda) if time.time() >= deadline: return False time.sleep(poll_every) def accept_bulk_import_jobs(driver, base_url: str, lang: str, job_url: str = None, max_iterations: int = 100) -> Tuple[int, int, int]: """ Automates accepting bulk import jobs by: 1. Finding and marking skipped parts (cards with "0 results found") as pending 2. Finding the first selectable "Update part" button in a border-success card 3. Clicking it and waiting for page load 4. Clicking "Save" and waiting for page load 5. Clicking "Save" again and waiting for page load 6. Clicking "Complete" and waiting for page load 7. Repeating until no more selectable buttons exist Returns (successful_count, failed_count, skipped_count) """ # Navigate to the job/import page if URL provided if job_url: driver.get(job_url) time.sleep(1.5) successful = 0 failed = 0 total_skipped = 0 for iteration in range(max_iterations): print(f"\n[Accept Jobs] Iteration {iteration + 1}/{max_iterations}") # Scroll to top first to ensure we see all buttons try: driver.execute_script("window.scrollTo(0, 0);") time.sleep(0.5) except Exception: pass # First check for skipped parts (border-warning cards with "0 results found" or "No results found") # and mark them as skipped by clicking "Mark Pending" skipped_count = 0 try: # Find cards with border-warning that have "0 results found" badge or "No results found" alert warning_cards = driver.find_elements(By.XPATH, "//div[contains(@class, 'card') and contains(@class, 'border-warning')]") for card in warning_cards: try: # Check if it has "0 results found" badge or "No results found" message has_no_results = False try: badge = card.find_element(By.XPATH, ".//span[contains(@class, 'badge') and contains(@class, 'bg-info') and contains(., 'results found')]") if badge and '0 results' in badge.text: has_no_results = True except Exception: pass if not has_no_results: try: alert = card.find_element(By.XPATH, ".//div[contains(@class, 'alert-info') and contains(., 'No results found')]") if alert: has_no_results = True except Exception: pass if has_no_results: # This card should be skipped, click "Mark Skipped" button if available try: mark_skipped_btn = card.find_element(By.XPATH, ".//button[contains(., 'Mark Skipped')]") if mark_skipped_btn and mark_skipped_btn.is_displayed(): driver.execute_script("arguments[0].scrollIntoView({block:'center', behavior:'smooth'});", mark_skipped_btn) time.sleep(0.3) try: mark_skipped_btn.click() except Exception: driver.execute_script("arguments[0].click();", mark_skipped_btn) skipped_count += 1 print(f"[Accept Jobs] Marked card as skipped (no results found)") time.sleep(0.5) except Exception: pass except Exception as e: continue if skipped_count > 0: print(f"[Accept Jobs] Marked {skipped_count} cards as pending (no results)") total_skipped += skipped_count time.sleep(1.0) # Wait after marking items as pending except Exception as e: print(f"[Accept Jobs] Error checking for skipped cards: {e}") # Find all "Update part" buttons that are NOT disabled (no 'disabled' in class) update_button = None try: # Find or