commit aaa1f7520a3c1b3c78bc4990951567c2ded8149f Author: Nick Date: Thu Oct 2 22:45:58 2025 +1000 Initial move to app, scanning and reading basic part info works, updating info starting to work diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6e4fd52 --- /dev/null +++ b/.gitignore @@ -0,0 +1,106 @@ +# ----------------- +# Python +# ----------------- +__pycache__/ +*.py[cod] +*$py.class + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Jupyter Notebook +.ipynb_checkpoints + +# mypy / pytype / pyre +.mypy_cache/ +.dmypy.json +dmypy.json +.pyre/ +.pytype/ + +# ----------------- +# Virtual environments +# ----------------- +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# ----------------- +# IDEs / Editors +# ----------------- +.vscode/ +.idea/ +*.swp +*.swo + +# ----------------- +# Logs +# ----------------- +*.log + +# ----------------- +# OS junk +# ----------------- +.DS_Store +Thumbs.db + +# ----------------- +# Project-specific +# ----------------- +# Local config / secrets +config_local.py +*.token +*.secret +*.key +*.db + +# Selenium/Browser cache +selenium-screenshots/ +selenium-downloads/ + +# Tkinter user settings (if any) +*.tcl diff --git a/apis/digikey_api.py b/apis/digikey_api.py new file mode 100644 index 0000000..d307fdb --- /dev/null +++ b/apis/digikey_api.py @@ -0,0 +1,28 @@ +import os +import requests +from config import DIGIKEY_API_KEY + +# Very light wrapper (adjust base/headers to match your DK plan) +BASE = "https://api.digikey.com/services/partsearch/v2" + +def suggest_category_from_digikey(mp_or_dkpn: str) -> str | None: + key = (DIGIKEY_API_KEY or "").strip() + if not key: + return None # no key → skip DK lookup + try: + hdr = {"X-API-Key": key} + # Try by part number; if this endpoint differs in your plan, swap as needed: + r = requests.get(f"{BASE}/parts/{mp_or_dkpn}", headers=hdr, timeout=12) + if r.status_code != 200: + return None + js = r.json() or {} + # Adjust paths to whatever your DK response returns: + # Common fields are like "Category", "Family", etc. + cat = js.get("Category") or js.get("CategoryName") or js.get("Class") + if isinstance(cat, dict): + return cat.get("Name") or cat.get("name") + if isinstance(cat, str) and cat.strip(): + return cat.strip() + except Exception: + pass + return None diff --git a/apis/partdb_api.py b/apis/partdb_api.py new file mode 100644 index 0000000..b7b5f9f --- /dev/null +++ b/apis/partdb_api.py @@ -0,0 +1,342 @@ +import re, requests +from typing import Optional, Dict, Any, List +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +class PartDB: + def __init__(self, base_url: str, token: str, timeout: int = 30): + self.base = base_url.rstrip("/") + self.s = requests.Session() + self.s.headers.update({"Authorization": f"Bearer {token}", "Accept": "application/json"}) + adapter = HTTPAdapter(pool_connections=64, pool_maxsize=64, + max_retries=Retry(total=3, backoff_factor=0.3, status_forcelist=(502,503,504))) + self.s.mount("http://", adapter); self.s.mount("https://", adapter) + self.timeout = timeout + + def _get(self, path: str, params: Dict[str, Any] | None = None): + r = self.s.get(f"{self.base}{path}", params=params, timeout=self.timeout) + r.raise_for_status(); return r.json() + + def _post_try(self, path: str, payload: Dict[str, Any]) -> requests.Response: + r = self.s.post(f"{self.base}{path}", json=payload, timeout=self.timeout, + headers={"Content-Type":"application/json"}) + if r.status_code in (200,201): return r + if r.status_code in (400,415): + r = self.s.post(f"{self.base}{path}", json=payload, timeout=self.timeout, + headers={"Content-Type":"application/vnd.api+json"}) + return r + + def _patch_merge(self, path: str, payload: Dict[str, Any]) -> requests.Response: + return self.s.patch(f"{self.base}{path}", json=payload, timeout=self.timeout, + headers={"Content-Type":"application/merge-patch+json"}) + + @staticmethod + def _extract_id(obj: Dict[str, Any]) -> Optional[int]: + if obj is None: return None + if isinstance(obj.get("id"), int): return obj["id"] + if isinstance(obj.get("_id"), int): return obj["_id"] + raw = obj.get("id") + if isinstance(raw, str): + digits = "".join(ch for ch in raw if ch.isdigit()) + if digits: return int(digits) + attrs = obj.get("attributes") or {} + if isinstance(attrs.get("_id"), int): return attrs["_id"] + return None + + def ensure_category(self, name: str) -> int: + js = self._get("/api/categories", params={"name": name}) + if isinstance(js, list) and js: return int(self._extract_id(js[0])) + r = self._post_try("/api/categories", {"name": name}); r.raise_for_status() + return int(self._extract_id(r.json())) + + def ensure_manufacturer(self, name: str) -> int: + js = self._get("/api/manufacturers", params={"name": name}) + if isinstance(js, list) and js: return int(self._extract_id(js[0])) + r = self._post_try("/api/manufacturers", {"name": name}); r.raise_for_status() + return int(self._extract_id(r.json())) + + def ensure_footprint(self, name: str) -> int: + js = self._get("/api/footprints", params={"name": name}) + if isinstance(js, list) and js: return int(self._extract_id(js[0])) + r = self._post_try("/api/footprints", {"name": name}); r.raise_for_status() + return int(self._extract_id(r.json())) + + def find_part_id_by_mpn(self, mpn: str) -> Optional[int]: + def norm(s: Optional[str]) -> str: + import re + return re.sub(r"\s+","",(s or "")).strip().lower() + want = norm(mpn) + def get_field(p: dict, key: str) -> Optional[str]: + if key in p: return p.get(key) + return (p.get("attributes") or {}).get(key) + for params in ( + {"manufacturer_product_number": mpn, "per_page": 50}, + {"name": mpn, "per_page": 50}, + {"search": mpn, "per_page": 50}, + ): + try: + js = self._get("/api/parts", params=params) + except Exception: + continue + if not isinstance(js, list): + continue + for p in js: + if norm(get_field(p,"manufacturer_product_number")) == want or norm(get_field(p,"name")) == want: + return self._extract_id(p) + return None + + def create_part(self, *, name: str, category_id: int, manufacturer_id: int, + mpn: str, description: str = "", product_url: Optional[str] = None, + footprint_id: Optional[int] = None) -> int: + payload = { + "name": name, "description": description or "", + "category": f"/api/categories/{category_id}", + "manufacturer": f"/api/manufacturers/{manufacturer_id}", + "manufacturer_product_number": mpn, + } + if product_url: payload["manufacturer_product_url"] = product_url + if footprint_id is not None: payload["footprint"] = f"/api/footprints/{footprint_id}" + r = self._post_try("/api/parts", payload) + if r.status_code not in (200,201): + raise RuntimeError(f"Create part failed: {r.status_code} {r.text}") + pid = self._extract_id(r.json()) + if not pid: raise RuntimeError(f"Create part: no id in response: {r.text}") + return int(pid) + + def get_part(self, part_id: int) -> dict: + return self._get(f"/api/parts/{part_id}") + + def get_part_parameter_values(self, part_id: int): + for path in ( + f"/api/parts/{part_id}/parameter-values", + f"/api/parts/{part_id}/parameter_values", + f"/api/parameter-values?filter[part_id]={part_id}", + ): + try: + js = self._get(path) + if isinstance(js, list) and js: return js + if isinstance(js, dict) and js.get("data"): return js["data"] + except Exception: + pass + return [] + + def get_part_params_flat(self, part: dict) -> Dict[str, tuple[Optional[str], Optional[str]]]: + out: Dict[str, tuple[Optional[str], Optional[str]]] = {} + params = part.get("parameters") or part.get("parameter_values") or [] + for p in params: + name = (p.get("name") or (p.get("parameter") or {}).get("name") or "").strip() + if not name: continue + raw_val = None + for key in ("value_typical","typ","value_text","value","max","min","nominal"): + if p.get(key) not in (None,""): + raw_val = p.get(key); break + u = p.get("unit"); unit = (u.get("symbol") or u.get("name")) if isinstance(u, dict) else u + if raw_val in (None,"") and p.get("formatted"): + raw_val = p["formatted"]; unit = None + out[name.lower()] = (None if raw_val in (None,"") else str(raw_val), unit) + return out + + def get_eda_value(self, part: dict): + eda = part.get("eda_info") or part.get("eda") + if isinstance(eda, dict): return eda.get("value") + return part.get("eda_value") or part.get("edaValue") + + def patch_eda_value(self, part_id: int, eda_value: str) -> bool: + for payload in ({"eda_info":{"value":eda_value}}, + {"eda":{"value":eda_value}}, + {"eda_value":eda_value}): + r = self._patch_merge(f"/api/parts/{part_id}", payload) + if 200 <= r.status_code < 300: return True + payload = {"data":{"type":"parts","id":str(part_id),"attributes":{"eda_value":eda_value}}} + r = self.s.patch(f"{self.base}/api/parts/{part_id}", json=payload, + headers={"Content-Type":"application/vnd.api+json"}, timeout=self.timeout) + return 200 <= r.status_code < 300 + + def list_categories(self) -> list[dict]: + # Paginates if your server is configured that way; adjust per your API. + js = self._get("/api/categories", params={"per_page": 500}) + return js if isinstance(js, list) else [] + + def create_category_if_missing(self, name: str) -> int: + # You already have ensure_category, but this uses list to avoid extra calls + name = (name or "").strip() + if not name: + raise ValueError("Category name empty") + for c in self.list_categories(): + if (c.get("name") or "").strip().lower() == name.lower(): + cid = self._extract_id(c) + if cid: return cid + r = self._post_try("/api/categories", {"name": name}) + r.raise_for_status() + cid = self._extract_id(r.json()) + if not cid: + raise RuntimeError("Create category: missing id in response") + return cid + + def find_part_by_mpn_or_search(self, key: str) -> Optional[int]: + key = (key or "").strip() + if not key: + return None + # 1) exact by manufacturer_product_number + pid = self.find_part_id_by_mpn(key) + if pid: + return pid + # 2) name match + try: + js = self._get("/api/parts", params={"name": key, "per_page": 50}) + if isinstance(js, list) and js: + cand = js[0] + return self._extract_id(cand) + except Exception: + pass + # 3) generic search + try: + js = self._get("/api/parts", params={"search": key, "per_page": 50}) + if isinstance(js, list) and js: + cand = js[0] + return self._extract_id(cand) + except Exception: + pass + return None + + def summarize_part(self, part_id: int) -> dict: + """Returns a small summary used by the UI (robust to different schemas).""" + p = self.get_part(part_id) + + def _get(path, default=""): + obj = p + for k in path.split("."): + if not isinstance(obj, dict): + return default + obj = obj.get(k) + return obj if obj is not None else default + + # name / mpn + name = _get("name") or _get("attributes.name") or "" + mpn = _get("manufacturer_product_number") or _get("attributes.manufacturer_product_number") or "" + + # category + cat = _get("category") or _get("relationships.category") or {} + cat_name = (cat.get("name") if isinstance(cat, dict) else "") or "" + + # footprint + fp = _get("footprint") or _get("relationships.footprint") or {} + fp_name = (fp.get("name") if isinstance(fp, dict) else "") or "" + + # eda value + eda = self.get_eda_value(p) or "" + + # description + desc = _get("description") or _get("attributes.description") or "" + + # stock / locations (best-effort) + stock_total = None + locations: list[str] = [] + + # Try common embedded places + for key in ("stock", "stock_total", "stockTotal"): + v = p.get(key) + if isinstance(v, (int, float)) and v >= 0: + stock_total = int(v); break + + # Try a dedicated endpoint, if your instance exposes it + if stock_total is None: + for path in (f"/api/parts/{part_id}/stock-entries", + f"/api/stock-entries?part={part_id}", + f"/api/stock_entries?part={part_id}"): + try: + js = self._get(path) + if isinstance(js, list): + stock_total = sum(int(row.get("amount") or 0) for row in js) + for row in js: + loc = (row.get("storage_location") or {}).get("name") if isinstance(row.get("storage_location"), dict) else None + if loc: locations.append(str(loc)) + break + except Exception: + pass + + return { + "id": part_id, + "name": name, + "mpn": mpn, + "category": cat_name, + "footprint": fp_name, + "eda_value": eda, + "description": desc, + "stock": stock_total, + "locations": sorted(set(locations)) if locations else [], + } + + def find_part_exact(self, mpn=None, dkpn=None): + if dkpn: + res = self._get_try(f"/api/parts?digikey_pn={dkpn}") + if res.status_code == 200 and res.json(): + return res.json()[0] + + if mpn: + res = self._get_try(f"/api/parts?manufacturer_product_number={mpn}") + if res.status_code == 200 and res.json(): + return res.json()[0] + + return None + + @staticmethod + def _norm(s) -> str: + return ("" if s is None else str(s)).strip().lower() + + def _orderdetails_iter(self, part: dict): + """Yield supplier name and sku-ish fields from order details (varies by Part-DB version).""" + rows = part.get("orderdetails") or part.get("orderDetails") or [] + for r in rows if isinstance(rows, list) else []: + supplier = r.get("supplier") or r.get("vendor") or {} + supplier_name = supplier.get("name") if isinstance(supplier, dict) else (supplier or "") + # common field names we see in the wild + sku = ( + r.get("order_number") or r.get("orderNumber") or + r.get("order_part_number") or r.get("orderPartNumber") or + r.get("supplier_product_number") or r.get("supplierProductNumber") or + r.get("sku") or r.get("mpn_or_sku") + ) + yield str(supplier_name or ""), None if sku in (None, "") else str(sku) + + def _part_has_dkpn(self, part: dict, dkpn: str) -> bool: + """True if any order-detail SKU equals the dkpn (case-insensitive).""" + want = self._norm(dkpn) + if not want: + return False + # direct field (some installs keep a vendor PN field) + for key in ("digikey_pn", "dk_pn", "vendor_pn", "external_sku"): + v = part.get(key) or (part.get("attributes") or {}).get(key) + if self._norm(v) == want: + return True + # look inside order details + for supplier_name, sku in self._orderdetails_iter(part): + if self._norm(sku) == want: + return True + return False + + def find_part_exact(self, *, dkpn: str | None = None, mpn: str | None = None) -> int | None: + """ + Prefer exact DKPN match (by checking supplier SKU in order details), + then fall back to exact MPN match. Returns part_id or None. + """ + # 1) Try to locate by DKPN: do a broad search, then verify exact match + if dkpn: + try: + js = self._get("/api/parts", params={"search": dkpn, "per_page": 100}) + if isinstance(js, list): + for p in js: + if self._part_has_dkpn(p, dkpn): + pid = self._extract_id(p) + if pid: + return pid + except Exception: + pass + + # 2) Exact by MPN (your helper is already exact) + if mpn: + pid = self.find_part_id_by_mpn(mpn) + if pid: + return pid + + return None diff --git a/config.py b/config.py new file mode 100644 index 0000000..6f07c5c --- /dev/null +++ b/config.py @@ -0,0 +1,33 @@ +# Centralised knobs + +PARTDB_BASE = "https://partdb.neutronservices.duckdns.org" +PARTDB_TOKEN = "tcp_564c6518a8476c25c68778e640c1bf40eecdec9f67be580bbd6504e9b6ebe7ed" +UI_LANG_PATH = "/en" + +# Modes: "bulk" or "scan" +MODE = "scan" + +# Scanner +COM_PORT = "COM7" +BAUD_RATE = 115200 + +# Selenium / provider flow +HEADLESS_CONTROLLER = False # controller browser (GUI-triggered) +HEADLESS_PROVIDER = False # provider updates +HEADLESS_WORKER = False # background workers (set True later for speed) +MAX_RETRIES = 2 +MAX_PARALLEL_WORKERS = 2 +PRINT_FAILURE_TABLE = True +GECKO_LOG_PATH = "geckodriver.log" + +# Digikey +DIGIKEY_API_KEY = "" + +# Login +ENV_USER_VAR = "PARTDB_USER" # if set, used instead of the fallback constants below +ENV_PASS_VAR = "PARTDB_PASS" +ENV_USER = "Nick" +ENV_PASSWORD = "O@IyECa^XND7BvPpRX9XRKBhv%XVwCV4" + +# UI defaults +WINDOW_GEOM = "860x560" \ No newline at end of file diff --git a/devices/scanner_serial.py b/devices/scanner_serial.py new file mode 100644 index 0000000..5752db4 --- /dev/null +++ b/devices/scanner_serial.py @@ -0,0 +1,44 @@ +import csv, re, serial +from typing import Callable, Optional + +CTRL_SPLIT_RE = re.compile(rb'[\x1d\x1e\x04]+') # GS, RS, EOT + +def _strip_header_bytes(b: bytes) -> bytes: + b = b.strip() + if b.startswith(b'\x02'): b = b[1:] # STX + if b.startswith(b'[)>'): b = b[3:] + if b.startswith(b'06'): b = b[2:] # format code + return b + +def _normalize_from_bytes(raw: bytes) -> str: + raw = _strip_header_bytes(raw) + chunks = [c for c in CTRL_SPLIT_RE.split(raw) if c] + flat = b''.join(chunks) if chunks else raw + s = flat.decode('ascii', errors='ignore').strip() + if s.startswith('06'): s = s[2:] + return s + +def scan_loop(port: str, baud: int, on_scan: Callable[[str], None]): + ser = serial.Serial(port, baud, timeout=2) + print(f"Opened {port}. Ready — scan a label.") + while True: + data = ser.read_until(b'\r') # adjust to b'\n' if needed + if not data: + continue + flat = _normalize_from_bytes(data) + if flat: + on_scan(flat) + +def maybe_append_csv(path: Optional[str], fields: dict): + if not path: return + newfile = not os.path.exists(path) + import os + with open(path, "a", newline="", encoding="utf-8") as f: + w = csv.DictWriter(f, fieldnames=list(fields.keys())) + if newfile: w.writeheader() + w.writerow(fields) + +def read_one_scan(port: str, baud: int, read_terminator: bytes = b'\r') -> str: + with serial.Serial(port, baud, timeout=10) as ser: + data = ser.read_until(read_terminator) + return _normalize_from_bytes(data) \ No newline at end of file diff --git a/jobs.py b/jobs.py new file mode 100644 index 0000000..59b453d --- /dev/null +++ b/jobs.py @@ -0,0 +1,55 @@ +from dataclasses import dataclass +from typing import Optional, List +from parsers.values import e_series_values, value_to_code + +def rc0805fr_mpn_from_ohms(value: float) -> str: + if value == 0: return "RC0805JR-070RL" + return f"RC0805FR-07{value_to_code(value)}L" + +def rc0603fr_mpn_from_ohms(value: float) -> str: + if value == 0: return "RC0603JR-070RL" + return f"RC0603FR-07{value_to_code(value)}L" + +def generate_rc0805fr_e32() -> List[str]: + values = [0.0] + e_series_values(32, rmin=1.0, rmax=1e7) + return [rc0805fr_mpn_from_ohms(v) for v in values] + +def generate_rc0603fr_e32() -> List[str]: + values = [0.0] + e_series_values(32, rmin=1.0, rmax=1e7) + return [rc0603fr_mpn_from_ohms(v) for v in values] + +CAP_PLAN = [ + ("1pF","X7R","10%","50V"), ("2.2pF","X7R","10%","50V"), ("4.7pF","X7R","10%","50V"), + ("10pF","X7R","10%","50V"), ("22pF","X7R","10%","50V"), ("47pF","X7R","10%","50V"), + ("100pF","X7R","10%","50V"),("220pF","X7R","10%","50V"),("470pF","X7R","10%","50V"), + ("1nF","X7R","10%","50V"), ("2.2nF","X7R","10%","50V"),("4.7nF","X7R","10%","50V"), + ("10nF","X7R","10%","50V"), ("22nF","X7R","10%","50V"), ("47nF","X7R","10%","50V"), + ("100nF","X7R","10%","50V"),("220nF","X7R","10%","50V"),("470nF","X7R","10%","50V"), + ("1uF","X7R","10%","50V"), ("2.2uF","X7R","10%","25V"),("4.7uF","X5R","10%","10V"), + ("10uF","X5R","10%","6.3V"), +] + +def _mk_cap_query(case: str, val: str, diel: str, tol: str, volt: str) -> str: + return f"{case} {diel} {tol} {volt} {val}" + +def generate_caps_0805_queries() -> List[str]: + return [_mk_cap_query("0805", v, d, t, u) for (v,d,t,u) in CAP_PLAN] + +def generate_caps_1206_dupes_for_low_v(threshold_v: float, target_v: str|None) -> List[str]: + out = [] + def _volt_to_float(vs: str) -> float: + try: return float(vs.lower().replace("v","").strip()) + except: return 0.0 + for (v,d,t,u) in CAP_PLAN: + if _volt_to_float(u) < threshold_v: + new_u = target_v or u + out.append(_mk_cap_query("1206", v, d, t, new_u)) + return out + +@dataclass +class Job: + category: str + manufacturer: str + footprint: Optional[str] + seeds: List[str] + kind: str # "resistor" | "capacitor" \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..88e6bb4 --- /dev/null +++ b/main.py @@ -0,0 +1,4 @@ +from ui.app_tk import run + +if __name__ == "__main__": + run() diff --git a/parsers/digikey_mh10.py b/parsers/digikey_mh10.py new file mode 100644 index 0000000..97a5cf9 --- /dev/null +++ b/parsers/digikey_mh10.py @@ -0,0 +1,48 @@ +import re +from typing import Dict, List, Tuple + +NEXT_TAG_RE = re.compile(r'(PICK|K14|30P|1P|K10|K1|D|T|Z)') + +def tokenize(flat: str) -> List[Tuple[str, str]]: + tokens: List[Tuple[str, str]] = [] + s = flat + p_idx = s.find('P') + if p_idx == -1: return tokens + start_val = p_idx + 1 + m = NEXT_TAG_RE.search(s, start_val) + if m: + tokens.append(('P', s[start_val:m.start()].strip())) + else: + tokens.append(('P', s[start_val:].strip())) + return tokens + while m: + tag = m.group(1) + start_val = m.end() + m_next = NEXT_TAG_RE.search(s, start_val) + if m_next: + tokens.append((tag, s[start_val:m_next.start()].strip())) + m = m_next + else: + tokens.append((tag, s[start_val:].strip())) + break + return tokens + +def _first(tokens: List[Tuple[str, str]], tag: str) -> str: + for t, v in tokens: + if t == tag: + return v + return "" + +def parse_digikey(flat: str) -> Dict[str, str]: + tokens = tokenize(flat) + return { + 'DigiKeyPart' : _first(tokens, 'P'), + 'MfrPart' : _first(tokens, '1P'), + 'CustomerPart' : _first(tokens, '30P'), + 'InternalLot1' : _first(tokens, 'K1'), + 'InternalLot2' : _first(tokens, 'K10'), + 'DateCodeRaw' : _first(tokens, 'D'), + 'TraceID' : _first(tokens, 'T'), + 'PackageSerial': _first(tokens, 'K14'), + 'PickTicket' : _first(tokens, 'PICK'), + } \ No newline at end of file diff --git a/parsers/values.py b/parsers/values.py new file mode 100644 index 0000000..d99dee5 --- /dev/null +++ b/parsers/values.py @@ -0,0 +1,119 @@ +import re, math +from typing import Optional, List + +RE_RES_SIMPLE = re.compile(r"(?i)^\s*(\d+(?:\.\d+)?)\s*(ohm|ohms|r|k|m|kohm|mohm|kΩ|mΩ|kOhm|MOhm)?\s*$") +RE_RES_LETTER = re.compile(r"(?i)^\s*(\d+)([rkm])(\d+)?\s*$") +RE_CAP_SIMPLE = re.compile(r"(?i)^(\d+(?:\.\d+)?)(p|n|u|m|pf|nf|uf|mf|f)?$") +RE_CAP_LETTER = re.compile(r"(?i)^(\d+)([pnu])(\d+)?$") + +def round_sig(x: float, sig: int) -> float: + if x == 0: return 0.0 + return round(x, sig - 1 - int(math.floor(math.log10(abs(x))))) + +def e_series_values(E: int, rmin=1.0, rmax=1e7, sig_digits: Optional[int]=None) -> List[float]: + if sig_digits is None: sig_digits = 2 if E <= 24 else 3 + base: List[float] = [] + for i in range(E): + v = round_sig(10 ** (i / E), sig_digits) + if v not in base: + base.append(v) + out = set() + min_dec = int(math.floor(math.log10(rmin))) + max_dec = int(math.ceil(math.log10(rmax))) + for d in range(min_dec - 1, max_dec + 1): + scale = 10 ** d + for b in base: + val = round_sig(b * scale, sig_digits) + if rmin <= val <= rmax: + out.add(val) + return sorted(out) + +def value_to_code(ohms: float) -> str: + if ohms == 0: return "0R" + if ohms < 1e3: + unit, n = "R", ohms + elif ohms < 1e6: + unit, n = "K", ohms / 1e3 + else: + unit, n = "M", ohms / 1e6 + s = f"{n:.3g}" + if "e" in s or "E" in s: + dec = max(0, 3 - 1 - int(math.floor(math.log10(abs(n))))) + s = f"{n:.{dec}f}".rstrip("0").rstrip(".") + return s.replace(".", unit) if "." in s else s + unit + +def parse_resistance_to_ohms(value: str, unit: Optional[str]) -> Optional[float]: + if value is None: return None + s = str(value).strip().replace(" ", "").replace("Ω","ohm").replace("Ω","ohm") + m = RE_RES_SIMPLE.fullmatch(s) + if m and unit is None: + num = float(m.group(1)); u = (m.group(2) or "").lower() + table = {"ohm":1.0, "ohms":1.0, "r":1.0, "k":1e3, "kohm":1e3, "kω":1e3, "m":1e6, "mohm":1e6, "mω":1e6} + return num * table.get(u, 1.0) + m = RE_RES_LETTER.fullmatch(s) + if m and unit is None: + lead = int(m.group(1)); letter = m.group(2).lower(); tail = m.group(3) or "" + frac = float(f"0.{tail}") if tail else 0.0 + mul = {"r":1.0, "k":1e3, "m":1e6}[letter] + return (lead + frac) * mul + try: + num = float(s) + if unit is None: return num + u = str(unit).strip().lower() + table = {"ohm":1.0, "ohms":1.0, "r":1.0, "k":1e3, "kohm":1e3, "m":1e6, "mohm":1e6} + mul = table.get(u); + return num * mul if mul else None + except ValueError: + return None + +def format_ohms_for_eda(ohms: float) -> str: + if ohms == 0: return "0" + if ohms < 1_000: + v = float(ohms); + if v.is_integer(): return f"{int(v)}R" + whole = int(v); frac = int(round((v - whole) * 10)) + return f"{whole}R{frac}" if frac else f"{whole}R" + if ohms < 1_000_000: + v = float(ohms) / 1e3 + if v.is_integer(): return f"{int(v)}K" + whole = int(v); frac = int(round((v - whole) * 10)) + return f"{whole}K{frac}" if frac else f"{whole}K" + v = float(ohms) / 1e6 + if v.is_integer(): return f"{int(v)}M" + whole = int(v); frac = int(round((v - whole) * 10)) + return f"{whole}M{frac}" if frac else f"{whole}M" + +def parse_capacitance_to_farads(value: str, unit: Optional[str]) -> Optional[float]: + if value is None: return None + s = str(value).strip().replace(" ", "").replace("µ","u").replace("μ","u") + m = RE_CAP_SIMPLE.fullmatch(s) + if m and unit is None: + num = float(m.group(1)); su = (m.group(2) or "f").lower() + mul = {"f":1.0, "pf":1e-12, "p":1e-12, "nf":1e-9, "n":1e-9, "uf":1e-6, "u":1e-6, "mf":1e-3, "m":1e-3}.get(su,1.0) + return num * mul + m = RE_CAP_LETTER.fullmatch(s) + if m and unit is None: + lead = int(m.group(1)); letter = m.group(2).lower(); tail = m.group(3) or "" + frac = float(f"0.{tail}") if tail else 0.0 + mul = {"p":1e-12,"n":1e-9,"u":1e-6}[letter] + return (lead + frac) * mul + try: + num = float(s) + if unit is None: return num + u = str(unit).strip().lower().replace("µ","u") + table = {"f":1.0, "farad":1.0, "pf":1e-12, "picofarad":1e-12, "nf":1e-9, "nanofarad":1e-9, "uf":1e-6, "microfarad":1e-6} + return num * table.get(u, 1.0) + except ValueError: + return None + +def format_farads_for_eda(F: float) -> str: + if F >= 1e-6: + n = F * 1e6; unit = "u" + elif F >= 1e-9: + n = F * 1e9; unit = "n" + else: + n = F * 1e12; unit = "p" + if abs(n - round(n)) < 1e-6: + return f"{int(round(n))}{unit}" + n1 = round(n, 1); whole = int(n1); frac = int(round((n1 - whole) * 10)) + return f"{whole}{unit}" if frac == 0 else f"{whole}{unit}{frac}" \ No newline at end of file diff --git a/partdb_cookies.json b/partdb_cookies.json new file mode 100644 index 0000000..e69de29 diff --git a/provider/selenium_flow.py b/provider/selenium_flow.py new file mode 100644 index 0000000..1e6ac90 --- /dev/null +++ b/provider/selenium_flow.py @@ -0,0 +1,851 @@ +import os, time, tempfile, shutil, traceback, json + +from typing import Tuple + +from selenium import webdriver +from selenium.webdriver.common.by import By +# waits +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC + +# Firefox +from selenium.webdriver.firefox.service import Service as FirefoxService +from selenium.webdriver.firefox.options import Options as FirefoxOptions +from webdriver_manager.firefox import GeckoDriverManager + +from apis.partdb_api import PartDB +from config import ( + PARTDB_BASE, + UI_LANG_PATH, + ENV_USER_VAR, + ENV_PASS_VAR, + ENV_USER, + ENV_PASSWORD, + HEADLESS_PROVIDER, + HEADLESS_CONTROLLER, + HEADLESS_WORKER, +) + +GECKO_LOG_PATH = os.path.abspath("geckodriver.log") + +def start_firefox_resilient(*, headless_first: bool, allow_chrome_fallback: bool = True): + """ + Start Firefox. Headless is EXACTLY as requested. If Firefox fails and + allow_chrome_fallback=True, we fall back to Chrome (also non-headless). + Emits clear debug prints so you know what launched. + """ + # Make sure MOZ_HEADLESS doesn't override us + os.environ.pop("MOZ_HEADLESS", None) + + def _ff_try(headless: bool, binary: str | None = None): + profile_dir = tempfile.mkdtemp(prefix="ff-profile-") + try: + opts = FirefoxOptions() + if headless: + opts.add_argument("-headless") # ONLY when explicitly requested + opts.set_preference("browser.shell.checkDefaultBrowser", False) + opts.set_preference("browser.startup.homepage_override.mstone", "ignore") + opts.add_argument("-profile") + opts.add_argument(profile_dir) + if binary: + opts.binary_location = binary + + service = FirefoxService(GeckoDriverManager().install(), log_output=GECKO_LOG_PATH) + drv = webdriver.Firefox(service=service, options=opts) + try: + drv.maximize_window() + except Exception: + pass + print(f"[Selenium] Firefox launched (headless={headless}, bin={binary or 'default'}).") + return drv + except Exception as e: + print(f"[Selenium] Firefox launch failed (headless={headless}, bin={binary or 'default'}): {e}") + shutil.rmtree(profile_dir, ignore_errors=True) + return None + + # 1) exact request + d = _ff_try(headless_first, None) + if d: + return d + + # 2) try flipping headless (sometimes GPU/driver quirks) + d = _ff_try(not headless_first, None) + if d: + print(f"[Selenium] NOTE: flipped headless to {not headless_first} due to previous failure.") + return d + + # 3) try known Firefox binaries + for binpath in [ + r"C:\Program Files\Mozilla Firefox\firefox.exe", + r"C:\Program Files (x86)\Mozilla Firefox\firefox.exe", + shutil.which("firefox") or "", + ]: + if binpath and os.path.exists(binpath): + d = _ff_try(headless_first, binpath) or _ff_try(not headless_first, binpath) + if d: + return d + + # 4) fallback to Chrome if allowed + if allow_chrome_fallback: + try: + from selenium.webdriver.chrome.service import Service as ChromeService + from selenium.webdriver.chrome.options import Options as ChromeOptions + from webdriver_manager.chrome import ChromeDriverManager + c_opts = ChromeOptions() + # IMPORTANT: no headless unless requested + if headless_first: + c_opts.add_argument("--headless=new") + c_srv = ChromeService(ChromeDriverManager().install()) + drv = webdriver.Chrome(service=c_srv, options=c_opts) + try: + drv.maximize_window() + except Exception: + pass + print(f"[Selenium] Chrome fallback launched (headless={headless_first}).") + return drv + except Exception as e: + print(f"[Selenium] Chrome fallback failed: {e}") + + # 5) show geckodriver tail to help debug + try: + with open(GECKO_LOG_PATH, "r", encoding="utf-8", errors="ignore") as f: + tail = "".join(f.readlines()[-80:]) + print("\n[geckodriver.log tail]\n" + tail + "\n") + except Exception: + pass + raise RuntimeError("Could not launch a visible browser. See messages above.") + +def _is_logged_in(driver) -> bool: + try: + url = (driver.current_url or "").lower() + if "/login" in url: + return False + # any “Logout” link or user-menu + if driver.find_elements(By.XPATH, "//a[contains(@href,'logout') or contains(., 'Logout') or contains(., 'Sign out')]"): + return True + # many Part-DB skins have a user dropdown with a logout item: + if driver.find_elements(By.CSS_SELECTOR, "[data-user-menu], .user-menu, a[href*='/logout']"): + return True + # if we can see a main nav that’s only visible when logged in: + if driver.find_elements(By.CSS_SELECTOR, "nav, .navbar, header .nav"): + # don’t return True on the login page’s navbar: + return "/login" not in url + except Exception: + pass + return False + +COOKIES_FILE = os.path.abspath("partdb_cookies.json") + +def _save_cookies(driver, base_url: str, path: str = COOKIES_FILE): + try: + driver.get(base_url + "/") + time.sleep(0.2) + with open(path, "w", encoding="utf-8") as f: + json.dump(driver.get_cookies(), f) + print("[Login] Cookies saved.") + except Exception as e: + print("[Login] Save cookies failed:", e) + +def _load_cookies(driver, base_url: str, path: str = COOKIES_FILE) -> bool: + if not os.path.exists(path): + return False + try: + driver.get(base_url + "/") + time.sleep(0.2) + with open(path, "r", encoding="utf-8") as f: + for c in json.load(f): + try: + driver.add_cookie(c) + except Exception: + pass + driver.get(base_url + "/") + time.sleep(0.6) + ok = _is_logged_in(driver) + print("[Login] Cookies loaded →", "OK" if ok else "not logged in") + return ok + except Exception as e: + print("[Login] Load cookies failed:", e) + return False + +def _try_auto_login(driver, base_url: str, username: str, password: str, timeout_s: int = 120) -> bool: + """ + Very robust login: + - opens login page + - fills known selectors + - JS fallback to submit the form + - waits until /login disappears AND a logout link is visible + """ + login_url = base_url.rstrip("/") + UI_LANG_PATH + "/login" + driver.get(login_url) + + wait = WebDriverWait(driver, 30) + + # Find username field (many variants) + user_sel = [ + "input[name='_username']", + "input#username", + "input[name='username']", + "input[type='email']", + "input[type='text']", + ] + pass_sel = [ + "input[name='_password']", + "input#password", + "input[name='password']", + "input[type='password']", + ] + submit_sel = [ + "button[type='submit']", + "input[type='submit']", + "button.btn-primary", + "button[class*='login']", + "form button", + ] + + def _query_any(selectors): + for s in selectors: + els = driver.find_elements(By.CSS_SELECTOR, s) + if els: + return els[0] + return None + + # Wait for any username/password field to appear + try: + wait.until(lambda d: _query_any(user_sel) and _query_any(pass_sel)) + except Exception: + return False + + u = _query_any(user_sel); p = _query_any(pass_sel) + try: + u.clear(); u.send_keys(username) + p.clear(); p.send_keys(password) + except Exception: + pass + + # Try clicking a submit button; fall back to JS submit + btn = _query_any(submit_sel) + if btn: + try: + driver.execute_script("arguments[0].scrollIntoView({block:'center'});", btn) + except Exception: + pass + try: + btn.click() + except Exception: + # JS click fallback + try: + driver.execute_script("arguments[0].click();", btn) + except Exception: + pass + else: + # No visible submit → try submitting the first form + try: + driver.execute_script("var f=document.querySelector('form'); if(f){f.submit();}") + except Exception: + pass + + # Wait until not on /login and a logout becomes visible (or timeout) + end = time.time() + timeout_s + while time.time() < end: + if _is_logged_in(driver): + return True + time.sleep(0.5) + return False + +def ensure_logged_in(driver, base_url: str, *, interactive_ok: bool = True, wait_s: int = 240) -> bool: + # Try cookies first + if _load_cookies(driver, base_url): + return True + + # Auto login with creds if available + user = (os.getenv(ENV_USER_VAR) or ENV_USER or "").strip() + pwd = (os.getenv(ENV_PASS_VAR) or ENV_PASSWORD or "").strip() + if user and pwd: + if _auto_login_once(driver, base_url, user, pwd, timeout_s=120): + _save_cookies(driver, base_url) + return True + + if not interactive_ok: + return False + + # Manual fallback: let user log in, then save cookies + login_url = base_url.rstrip("/") + UI_LANG_PATH + "/login" + print("Please login to Part-DB in the opened browser window…") + try: + driver.get(login_url) + try: + driver.maximize_window() + except Exception: + pass + end = time.time() + wait_s + while time.time() < end: + if _is_logged_in(driver): + print("Login detected; saving cookies.") + _save_cookies(driver, base_url) + return True + time.sleep(0.6) + print("Login was not detected before timeout.") + return False + except Exception: + return False + +def click_by_text(driver, labels, *, exact_only=False, exclude=None, index=None, timeout=60) -> bool: + """ + Robust text clicker that NEVER caches elements across DOM changes. + It re-queries candidates every attempt, scrolls into view, and retries on staleness. + """ + import time, re + want = [re.sub(r"\s+"," ", (x or "")).strip().lower() for x in labels] + exclude = [re.sub(r"\s+"," ", (x or "")).strip().lower() for x in (exclude or [])] + end = time.time() + timeout + + def label(el): + t = (el.text or "").strip() + if not t: + for a in ("value","aria-label","title","data-original-title"): + v = el.get_attribute(a) + if v: return v.strip() + return t + + def all_clickables(): + # Do NOT keep references across attempts. + sels = "button, a.btn, a.button, input[type='submit'], input[type='button'], .btn, .button" + return [e for e in driver.find_elements(By.CSS_SELECTOR, sels) if e.is_displayed() and e.is_enabled()] + + last_err = None + while time.time() < end: + try: + # tiny settle + time.sleep(0.05) + + # REFRESH candidates every loop + cands = all_clickables() + # map and filter + mapped = [] + for el in cands: + lab = re.sub(r"\s+"," ", label(el)).strip().lower() + if any(x in lab for x in exclude): + continue + if exact_only: + if lab in want: + mapped.append(el) + else: + if any(lab == w or lab.startswith(w) for w in want): + mapped.append(el) + + if mapped: + # if a specific index requested, re-check bounds every attempt + target = mapped[index] if (index is not None and index < len(mapped)) else mapped[0] + try: + driver.execute_script("arguments[0].scrollIntoView({block:'center'});", target) + except Exception: + pass + time.sleep(0.05) + try: + target.click() + except (StaleElementReferenceException, ElementClickInterceptedException): + # try one fresh JS click + try: + # IMPORTANT: refetch an equivalent fresh node by XPath, using the visible text of target + txt = label(target) + if txt: + xp = f"//*[normalize-space(text())={json.dumps(txt)}][self::button or self::a or self::span or self::div]/ancestor-or-self::button | //*[normalize-space(text())={json.dumps(txt)}]/ancestor::a" + fresh = driver.find_elements(By.XPATH, xp) + fresh = [e for e in fresh if e.is_displayed() and e.is_enabled()] + if fresh: + driver.execute_script("arguments[0].scrollIntoView({block:'center'});", fresh[0]) + time.sleep(0.05) + driver.execute_script("arguments[0].click();", fresh[0]) + return True + except Exception as e: + last_err = e + # one more cycle + time.sleep(0.15) + continue + return True + + except Exception as e: + last_err = e + time.sleep(0.2) + + if last_err: + print(" [DBG] click_by_text last error:", last_err) + return False + +def wait_for_new_window_and_switch(driver, old_handles, timeout=30): + end = time.time() + timeout; old = set(old_handles) + while time.time() < end: + now = driver.window_handles; new = [h for h in now if h not in old] + if new: + driver.switch_to.window(new[0]); return new[0] + time.sleep(0.2) + raise TimeoutError("New window/tab did not appear.") + +def smart_click_xpath(driver, xpath: str, timeout=25) -> bool: + from selenium.webdriver.support import expected_conditions as EC + try: + end = time.time() + timeout + last_err = None + while time.time() < end: + try: + #elem = WebDriverWait(driver, 4).until(EC.element_to_be_clickable((By.XPATH, xpath))) + driver.execute_script("arguments[0].scrollIntoView({block:'center'});", elem) + time.sleep(0.05) + try: + elem.click() + return True + except (StaleElementReferenceException, ElementClickInterceptedException): + # Refresh element and js-click + fresh = driver.find_elements(By.XPATH, xpath) + fresh = [e for e in fresh if e.is_displayed() and e.is_enabled()] + if fresh: + driver.execute_script("arguments[0].scrollIntoView({block:'center'});", fresh[0]) + time.sleep(0.05) + driver.execute_script("arguments[0].click();", fresh[0]) + return True + except Exception as e: + last_err = e + time.sleep(0.2) + if last_err: + print(" [DBG] smart_click_xpath last error:", last_err) + return False + except Exception: + return False + +def click_text_robust(driver, labels: list[str], *, exact=False, exclude_substrings: list[str]|None=None, + index: int|None=None, total_timeout: float=40.0) -> bool: + """ + Re-queries candidates every attempt; clicks via native, then JS fallback; + retries on StaleElementReference and intercepts. + """ + import time, re, json + end = time.time() + total_timeout + want = [re.sub(r"\s+"," ", (x or "")).strip().lower() for x in labels] + bads = [re.sub(r"\s+"," ", (x or "")).strip().lower() for x in (exclude_substrings or [])] + + def label_of(el): + t = (el.text or "").strip() + if not t: + for a in ("value","aria-label","title","data-original-title"): + v = el.get_attribute(a) + if v: return v.strip() + return t + + css = "button, a.btn, a.button, input[type='submit'], input[type='button'], .btn, .button" + + last_err = None + while time.time() < end: + try: + # re-find fresh candidates each loop + candidates = [e for e in driver.find_elements(By.CSS_SELECTOR, css) + if e.is_displayed() and e.is_enabled()] + matches = [] + for el in candidates: + lab = re.sub(r"\s+"," ", label_of(el)).strip().lower() + if any(b in lab for b in bads): + continue + ok = (lab in want) if exact else any(lab == w or lab.startswith(w) for w in want) + if ok: + matches.append(el) + if matches: + target = matches[index] if (index is not None and index < len(matches)) else matches[0] + try: + driver.execute_script("arguments[0].scrollIntoView({block:'center'});", target) + except Exception: + pass + time.sleep(0.05) + try: + target.click() + return True + except (StaleElementReferenceException, ElementClickInterceptedException): + # refetch by visible text and JS-click + txt = label_of(target) + if txt: + xp = ( + f"//button[normalize-space(.)={json.dumps(txt)}]" + f"|//a[normalize-space(.)={json.dumps(txt)}]" + f"|//input[@type='submit' and @value={json.dumps(txt)}]" + ) + fresh = [e for e in driver.find_elements(By.XPATH, xp) if e.is_displayed() and e.is_enabled()] + if fresh: + driver.execute_script("arguments[0].scrollIntoView({block:'center'});", fresh[0]) + time.sleep(0.05) + driver.execute_script("arguments[0].click();", fresh[0]) + return True + time.sleep(0.15) + except Exception as e: + last_err = e + time.sleep(0.2) + + if last_err: + print(" [DBG] click_text_robust last error:", last_err) + return False + +def click_xpath_robust(driver, xpaths: list[str], total_timeout: float=25.0) -> bool: + """ + Waits for any XPath to be clickable; on stale/intercepted, re-finds and JS-clicks. + """ + import time + end = time.time() + total_timeout + last_err = None + while time.time() < end: + try: + # try each xpath fresh this loop + for xp in xpaths: + try: + elem = WebDriverWait(driver, 3).until(EC.element_to_be_clickable((By.XPATH, xp))) + except TimeoutException: + continue + try: + driver.execute_script("arguments[0].scrollIntoView({block:'center'});", elem) + except Exception: + pass + time.sleep(0.05) + try: + elem.click() + return True + except (StaleElementReferenceException, ElementClickInterceptedException) as e: + last_err = e + # re-find and JS click + fresh = [e for e in driver.find_elements(By.XPATH, xp) if e.is_displayed() and e.is_enabled()] + if fresh: + driver.execute_script("arguments[0].scrollIntoView({block:'center'});", fresh[0]) + time.sleep(0.05) + driver.execute_script("arguments[0].click();", fresh[0]) + return True + time.sleep(0.15) + except Exception as e: + last_err = e + time.sleep(0.2) + if last_err: + print(" [DBG] click_xpath_robust last error:", last_err) + return False + +def run_provider_update_flow(driver, base_url: str, lang: str, part_id: int, controller_handle: str) -> tuple[bool, str]: + # Page A + driver.switch_to.new_window("window") + update_handle = driver.current_window_handle + driver.get(f"{base_url}{lang}/tools/info_providers/update/{part_id}") + time.sleep(0.25) + + # --- Step A1: Search (avoid 'Search options') + if not _click_search_button(driver, timeout=40): + try: + if update_handle in driver.window_handles: driver.close() + finally: + if controller_handle in driver.window_handles: driver.switch_to.window(controller_handle) + return (False, "search") + # tiny settle so results panel can render + time.sleep(0.4) + + # --- Step A2: Create new part (middle button) + before = list(driver.window_handles) + + # Prefer a stable XPath to the 2nd “Create new part” if possible: + # (Keeps re-querying until it’s clickable) + created = False + for _ in range(3): # a few short attempts + ok = click_text_robust(driver, ["Create new part"], exact=False, index=1, total_timeout=40) + + if ok: + created = True + break + time.sleep(0.25) + + if not created: + try: + if update_handle in driver.window_handles: driver.close() + finally: + if controller_handle in driver.window_handles: driver.switch_to.window(controller_handle) + return (False, "create_new_part") + + # --- Step B: switch to the newly opened tab + try: + new_tab = wait_for_new_window_and_switch(driver, before, timeout=25) + except Exception: + try: + if update_handle in driver.window_handles: driver.close() + finally: + if controller_handle in driver.window_handles: driver.switch_to.window(controller_handle) + return (False, "create_new_part (no new tab)") + + # tiny settle; new document is loading + time.sleep(0.25) + + # --- Step B1: Save changes + ok = click_xpath_robust(driver, [ + "//button[normalize-space()='Save changes']", + "//button[contains(.,'Save changes')]", + "//input[@type='submit' and @value='Save changes']", + ], total_timeout=25) + if not ok: + ok = click_xpath_robust(driver, [ + "//button[normalize-space()='Save changes']", + "//button[contains(.,'Save changes')]", + "//input[@type='submit' and @value='Save changes']", + ], total_timeout=25) + + if not ok: + # cleanup and return + try: + if new_tab in driver.window_handles: driver.close() + if update_handle in driver.window_handles: + driver.switch_to.window(update_handle); driver.close() + if controller_handle in driver.window_handles: driver.switch_to.window(controller_handle) + except Exception: + pass + return (False, "save_changes") + + # Close Page B and Page A, back to controller + try: + if new_tab in driver.window_handles: driver.close() + except Exception: pass + try: + if update_handle in driver.window_handles: + driver.switch_to.window(update_handle); driver.close() + except Exception: pass + try: + if controller_handle in driver.window_handles: driver.switch_to.window(controller_handle) + except Exception: pass + + return (True, "ok") + +def wait_fields_to_persist(api: PartDB, part_id: int, timeout_s: int = 8, poll_s: float = 0.8) -> list[str]: + deadline = time.time() + timeout_s; last_missing: List[str] = [] + while time.time() < deadline: + part = api.get_part(part_id) + last_missing = _missing_fields(api, part_id, part) + if not last_missing: break + time.sleep(poll_s) + return last_missing + +def set_eda_from_resistance(api: PartDB, part_id: int, *, max_wait_s: int = 12, poll_every: float = 0.8) -> bool: + deadline = time.time() + max_wait_s + while True: + part = api.get_part(part_id) + params = api.get_part_params_flat(part) + if not params: + vals = api.get_part_parameter_values(part_id) + if vals: + part = dict(part); part["parameter_values"] = vals + params = api.get_part_params_flat(part) + value_unit = params.get("resistance") + if value_unit: + value, unit = value_unit + ohms = parse_resistance_to_ohms(value, unit) + if ohms is None: return False + eda = format_ohms_for_eda(ohms) + existing = api.get_eda_value(part) + if existing and str(existing).strip(): return True + return api.patch_eda_value(part_id, eda) + if time.time() >= deadline: return False + time.sleep(poll_every) + +def set_eda_from_capacitance(api: PartDB, part_id: int, *, max_wait_s: int = 12, poll_every: float = 0.8) -> bool: + deadline = time.time() + max_wait_s + while True: + part = api.get_part(part_id) + params = api.get_part_params_flat(part) + if not params: + vals = api.get_part_parameter_values(part_id) + if vals: + part = dict(part); part["parameter_values"] = vals + params = api.get_part_params_flat(part) + cap = params.get("capacitance") + if cap: + value, unit = cap + F = parse_capacitance_to_farads(value, unit) + if F is None: return False + eda = format_farads_for_eda(F) + existing = api.get_eda_value(part) + if existing and str(existing).strip(): return True + return api.patch_eda_value(part_id, eda) + if time.time() >= deadline: return False + time.sleep(poll_every) + +def update_part_from_providers_once(part_id: int, *, headless: bool = True) -> Tuple[bool, str]: + """ + Run the same 'Tools → Info providers → Update' flow once for a part. + Returns (ok, where) where 'where' is 'ok' or the stage that failed. + """ + if headless is None: + headless = HEADLESS_PROVIDER + try: + drv = start_firefox_resilient(headless_first=headless) + drv.get(PARTDB_BASE + "/") + if not ensure_logged_in(drv, PARTDB_BASE, interactive_ok=True, wait_s=120): + try: drv.quit() + except Exception: pass + return (False, "login") + + controller = drv.current_window_handle + ok, where = run_provider_update_flow(drv, PARTDB_BASE, UI_LANG_PATH, part_id, controller) + + try: drv.quit() + except Exception: pass + if ok: + return (True, "ok") + return (False, where or "provider") + except Exception as e: + try: drv.quit() + except Exception: pass + return (False, f"exception: {e}") + +def _click_search_button(driver, timeout=40) -> bool: + """ + Clicks the 'Search' button on the provider update page. + Avoids 'Search options' and works across themes. + """ + import time + end = time.time() + timeout + + XPATHS = [ + # exact 'Search' on a button + "//button[normalize-space(.)='Search']", + # buttons containing Search but not 'Search options' + "//button[contains(normalize-space(.), 'Search') and not(contains(., 'options'))]", + # inputs of type submit with value=Search + "//input[@type='submit' and translate(@value,'abcdefghijklmnopqrstuvwxyz','ABCDEFGHIJKLMNOPQRSTUVWXYZ')='SEARCH']", + # any clickable with text Search + "//*[self::button or self::a or self::input][contains(normalize-space(.), 'Search')]" + ] + + while time.time() < end: + # try all xpaths fresh each loop + for xp in XPATHS: + try: + els = [e for e in driver.find_elements(By.XPATH, xp) if e.is_displayed() and e.is_enabled()] + # filter out 'Search options' + els = [e for e in els if "options" not in (e.text or "").lower()] + if not els: + continue + el = els[0] + try: + driver.execute_script("arguments[0].scrollIntoView({block:'center'});", el) + except Exception: + pass + time.sleep(0.05) + try: + el.click() + except Exception: + # JS click fallback + try: + driver.execute_script("arguments[0].click();", el) + except Exception: + continue + return True + except Exception: + pass + time.sleep(0.2) + return False + +def _query_any(driver, selectors): + for s in selectors: + els = driver.find_elements(By.CSS_SELECTOR, s) + if els: + return els[0] + return None + +def _fill_via_js(driver, el, value: str): + # Set value and dispatch events so reactive forms notice + driver.execute_script(""" + const el = arguments[0], val = arguments[1]; + if (!el) return; + const nativeSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value').set; + nativeSetter.call(el, val); + el.dispatchEvent(new Event('input', { bubbles: true })); + el.dispatchEvent(new Event('change', { bubbles: true })); + """, el, value) + +def _auto_login_once(driver, base_url: str, username: str, password: str, timeout_s: int = 90) -> bool: + login_url = base_url.rstrip("/") + UI_LANG_PATH + "/login" + print("[Login] Navigating to:", login_url) + driver.get(login_url) + + # Some skins show a “Login” button that reveals the form + for xp in [ + "//button[contains(., 'Login')]", + "//a[contains(., 'Login')]", + "//button[contains(., 'Sign in')]", + "//a[contains(., 'Sign in')]", + ]: + try: + btns = driver.find_elements(By.XPATH, xp) + if btns: + try: + driver.execute_script("arguments[0].click();", btns[0]) + time.sleep(0.4) + except Exception: + pass + except Exception: + pass + + wait = WebDriverWait(driver, 25) + user_sel = [ + "input[name='_username']", + "input#username", + "input[name='username']", + "input[type='email']", + "form input[type='text']", + ] + pass_sel = [ + "input[name='_password']", + "input#password", + "input[name='password']", + "form input[type='password']", + ] + submit_sel = [ + "button[type='submit']", + "input[type='submit']", + "button.btn-primary", + "form button", + ] + + # Wait for fields + try: + wait.until(lambda d: _query_any(d, user_sel) and _query_any(d, pass_sel)) + except Exception: + print("[Login] Could not find username/password inputs.") + return False + + u = _query_any(driver, user_sel) + p = _query_any(driver, pass_sel) + if not u or not p: + print("[Login] Inputs missing after wait.") + return False + + try: + _fill_via_js(driver, u, username) + _fill_via_js(driver, p, password) + time.sleep(0.2) + except Exception as e: + print("[Login] Filling fields failed:", e) + return False + + btn = _query_any(driver, submit_sel) + if btn: + try: + driver.execute_script("arguments[0].scrollIntoView({block:'center'});", btn) + time.sleep(0.1) + btn.click() + except Exception: + try: + driver.execute_script("arguments[0].click();", btn) + except Exception: + pass + else: + # last resort: submit the first form + try: + driver.execute_script("var f=document.querySelector('form'); if(f){f.submit();}") + except Exception: + pass + + # Wait until logged in + end = time.time() + timeout_s + while time.time() < end: + if _is_logged_in(driver): + print("[Login] Logged in.") + return True + time.sleep(0.5) + print("[Login] Timeout waiting for post-login.") + return False \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..57bfb98 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +requests +urllib3 +tqdm +selenium +webdriver-manager +pyserial \ No newline at end of file diff --git a/ui/app_tk.py b/ui/app_tk.py new file mode 100644 index 0000000..9006d41 --- /dev/null +++ b/ui/app_tk.py @@ -0,0 +1,340 @@ +import os, threading, tkinter as tk +from tkinter import ttk, messagebox,Toplevel + +from config import ( + WINDOW_GEOM, COM_PORT, BAUD_RATE, + HEADLESS_CONTROLLER, + PARTDB_BASE, PARTDB_TOKEN, +) +from apis.partdb_api import PartDB +from parsers.digikey_mh10 import parse_digikey +from devices.scanner_serial import scan_loop # continuous loop (background) +from provider.selenium_flow import update_part_from_providers_once +from apis.digikey_api import suggest_category_from_digikey + +# ------------ Pages ------------ + +class HomePage(ttk.Frame): + """Always listening; on scan -> ask Part-DB; route to View or Create.""" + def __init__(self, master, app): + super().__init__(master) + self.app = app + ttk.Label(self, text="Scan a Digi-Key code", font=("Segoe UI", 16, "bold")).pack(pady=(18,8)) + ttk.Label(self, text=f"Listening on {COM_PORT} @ {BAUD_RATE}").pack(pady=(0,12)) + + # Last scan preview + wrap = ttk.Frame(self); wrap.pack(fill="x", padx=10, pady=(0,10)) + ttk.Label(wrap, text="Last scan:").pack(side="left") + self.last_var = tk.StringVar() + ttk.Entry(wrap, textvariable=self.last_var, state="readonly").pack(side="left", fill="x", expand=True, padx=(8,0)) + + ttk.Label(self, text="(This page listens continuously. Just scan another label.)", foreground="#666").pack() + + def on_scan(self, raw: str): + self.last_var.set(raw) + fields = parse_digikey(raw) + mpn = (fields.get("MfrPart") or "").strip() + if not mpn: + messagebox.showwarning("No MPN", "Could not read MPN from the scan.") + return + # Lookup in Part-DB + pid = self.app.pdb.find_part_exact( + dkpn=fields.get("DigiKeyPart"), + mpn=fields.get("MfrPart"), + ) + if pid: + # Show details page + summary = self.app.pdb.summarize_part(pid) + self.app.goto_view(summary) + else: + # No part -> prepare creation + self.app.goto_create(fields, raw) + +class ViewPage(ttk.Frame): + """Shows existing part summary + button to force Digi-Key provider update.""" + def __init__(self, master, app): + super().__init__(master) + self.app = app + self.summary = {} + + ttk.Label(self, text="Part in Part-DB", font=("Segoe UI", 16, "bold")).grid(row=0, column=0, columnspan=4, sticky="w", pady=(16,10)) + self.grid_rowconfigure(99, weight=1) + self.grid_columnconfigure(1, weight=1) + + self._add_row("ID:", "id", 1) + self._add_row("Name:", "name", 2) + self._add_row("MPN:", "mpn", 3) + self._add_row("Category:", "category", 4) + self._add_row("Footprint:", "footprint", 5) + self._add_row("EDA value:", "eda_value", 6) + self._add_row("Stock:", "stock", 7) + self._add_row("Locations:", "locations", 8, join_list=True) + + ttk.Label(self, text="Description:").grid(row=9, column=0, sticky="ne", padx=8, pady=6) + self.desc = tk.Text(self, height=6, wrap="word") + self.desc.grid(row=9, column=1, columnspan=3, sticky="nsew", padx=8, pady=6) + self.desc.configure(state="disabled") + + btns = ttk.Frame(self); btns.grid(row=10, column=0, columnspan=4, sticky="e", pady=(6,12)) + ttk.Button(btns, text="Force Digi-Key update", command=self.do_update).pack(side="left", padx=(0,6)) + ttk.Button(btns, text="Back to scan", command=self.app.goto_home).pack(side="left") + + def _add_row(self, label, key, row, join_list=False): + ttk.Label(self, text=label).grid(row=row, column=0, sticky="e", padx=8, pady=4) + var = tk.StringVar() + ent = ttk.Entry(self, textvariable=var, state="readonly") + ent.grid(row=row, column=1, columnspan=3, sticky="we", padx=8, pady=4) + setattr(self, f"var_{key}", var) + setattr(self, f"is_list_{key}", join_list) + + def set_summary(self, summary: dict): + self.summary = summary + for key in ("id","name","mpn","category","footprint","eda_value","stock","locations"): + var = getattr(self, f"var_{key}") + is_list = getattr(self, f"is_list_{key}", False) + val = summary.get(key, "") + if is_list and isinstance(val, list): + val = ", ".join(val) + var.set("" if val is None else str(val)) + self.desc.configure(state="normal") + self.desc.delete("1.0", "end") + self.desc.insert("1.0", summary.get("description","") or "") + self.desc.configure(state="disabled") + + def do_update(self): + pid = self.summary.get("id") + if not pid: + messagebox.showwarning("No ID", "Missing part id.") + return + + # prevent new scans while updating + self.app.busy = True + dlg = BusyDialog(self.app, "Updating from Digi-Key…") + + def work(): + return update_part_from_providers_once(pid, headless=HEADLESS_CONTROLLER) + + def done(res): + try: + dlg.destroy() + except Exception: + pass + self.app.busy = False + if not res: + messagebox.showwarning("Update failed", "Unknown error.") + return + ok, where = res + if ok: + summary = self.app.pdb.summarize_part(pid) + self.set_summary(summary) + messagebox.showinfo("Updated", "Provider update completed.") + else: + messagebox.showwarning("Update failed", f"Provider update failed at: {where}") + + run_in_thread(work, lambda r: self.after(0, done, r)) + +class CreatePage(ttk.Frame): + """Create a new part; auto-suggest category; then run provider update.""" + def __init__(self, master, app): + super().__init__(master) + self.app = app + self.fields = {} + self.cat_map = {} + + ttk.Label(self, text="Create Part", font=("Segoe UI", 16, "bold")).grid(row=0, column=0, columnspan=4, sticky="w", pady=(16,10)) + self.grid_columnconfigure(1, weight=1) + + ttk.Label(self, text="DKPN:").grid(row=1, column=0, sticky="e", padx=8, pady=4) + self.dkpn = tk.StringVar() + ttk.Entry(self, textvariable=self.dkpn, state="readonly").grid(row=1, column=1, columnspan=3, sticky="we", padx=8, pady=4) + + ttk.Label(self, text="MPN:").grid(row=2, column=0, sticky="e", padx=8, pady=4) + self.mpn = tk.StringVar() + ttk.Entry(self, textvariable=self.mpn).grid(row=2, column=1, columnspan=3, sticky="we", padx=8, pady=4) + + ttk.Label(self, text="Category:").grid(row=3, column=0, sticky="e", padx=8, pady=4) + self.cat = tk.StringVar() + self.cat_combo = ttk.Combobox(self, textvariable=self.cat, state="readonly", width=40) + self.cat_combo.grid(row=3, column=1, sticky="w", padx=8, pady=4) + ttk.Button(self, text="Create new…", command=self.create_new_category).grid(row=3, column=2, sticky="w", padx=4, pady=4) + + btns = ttk.Frame(self); btns.grid(row=4, column=0, columnspan=4, sticky="e", pady=(8,12)) + ttk.Button(btns, text="Back", command=self.app.goto_home).pack(side="left", padx=(0,6)) + ttk.Button(btns, text="Create + Update", command=self.create_part).pack(side="left") + + def set_fields(self, fields: dict, raw: str): + self.fields = dict(fields) + self.dkpn.set(fields.get("DigiKeyPart","")) + self.mpn.set(fields.get("MfrPart","") or fields.get("DigiKeyPart","")) + + # load categories + cats = self.app.pdb.list_categories() + names = [] + self.cat_map = {} + for c in sorted(cats, key=lambda x: (x.get("name") or "").lower()): + nm = (c.get("name") or "").strip() + cid = self.app.pdb._extract_id(c) + if nm and cid: + names.append(nm); self.cat_map[nm] = cid + self.cat_combo["values"] = names + + # suggest category + suggestion = suggest_category_from_digikey(self.mpn.get() or self.dkpn.get()) + if not suggestion: + s = self.mpn.get().lower() + if any(x in s for x in ("res", "rc0603", "rc0805", "ohm")): + suggestion = "Resistor" + elif any(x in s for x in ("cap", "x7r", "x5r", "uf", "nf", "pf")): + suggestion = "Capacitor" + # select if exists + for nm in names: + if suggestion and nm.lower() == suggestion.lower(): + self.cat.set(nm) + break + if not self.cat.get() and suggestion: + # leave suggestion as text, user can “Create new…” + self.cat.set(suggestion) + + def create_new_category(self): + name = tk.simpledialog.askstring("New category", "Category name:") + if not name: return + cid = self.app.pdb.create_category_if_missing(name.strip()) + # refresh & select + cats = self.app.pdb.list_categories() + names = [] + self.cat_map = {} + for c in sorted(cats, key=lambda x: (x.get("name") or "").lower()): + nm = (c.get("name") or "").strip() + c_id = self.app.pdb._extract_id(c) + if nm and c_id: + names.append(nm); self.cat_map[nm] = c_id + self.cat_combo["values"] = names + for nm in names: + if nm.lower() == name.strip().lower(): + self.cat.set(nm); break + messagebox.showinfo("OK", f"Category created (id={cid}).") + + def create_part(self): + mpn = (self.mpn.get() or "").strip() + if not mpn: + messagebox.showwarning("Missing MPN", "Please enter a manufacturer part number.") + return + + # category id + nm = (self.cat.get() or "").strip() + if nm in self.cat_map: + cat_id = self.cat_map[nm] + else: + cat_id = self.app.pdb.create_category_if_missing(nm or "Uncategorized") + + manu_id = self.app.pdb.ensure_manufacturer("Unknown") + + pid = self.app.pdb.create_part( + name=mpn, + category_id=cat_id, + manufacturer_id=manu_id, + mpn=mpn, + description="", + product_url=None, + footprint_id=None, + ) + + self.app.busy = True + dlg = BusyDialog(self.app, "Creating part and updating from Digi-Key…") + + def work(): + return update_part_from_providers_once(pid, headless=HEADLESS_CONTROLLER +) + + def done(res): + try: + dlg.destroy() + except Exception: + pass + self.app.busy = False + ok, where = (res or (False, "unknown")) + if not ok: + messagebox.showwarning("Created, but update failed", + f"Part ID {pid} created.\nProvider update failed at: {where}") + else: + messagebox.showinfo("Success", f"Part created and updated (id={pid}).") + + summary = self.app.pdb.summarize_part(pid) + self.app.goto_view(summary) + + run_in_thread(work, lambda r: self.after(0, done, r)) + +# ------------ App ------------ + +class App(tk.Tk): + def __init__(self): + self.busy = False + super().__init__() + self.title("Part-DB Helper") + self.geometry(WINDOW_GEOM) + self.pdb = PartDB(PARTDB_BASE, PARTDB_TOKEN) + + self.home = HomePage(self, self) + self.view = ViewPage(self, self) + self.create = CreatePage(self, self) + + for f in (self.home, self.view, self.create): + f.grid(row=0, column=0, sticky="nsew") + + # start listening thread once + t = threading.Thread(target=scan_loop, args=(COM_PORT, BAUD_RATE, self.on_scan), daemon=True) + t.start() + + self.goto_home() + + def on_scan(self, flat: str): + if self.busy: + return # ignore scans during provider update + self.home.on_scan(flat) + + def goto_home(self): + self.home.tkraise() + + def goto_view(self, summary: dict): + self.view.set_summary(summary) + self.view.tkraise() + + def goto_create(self, fields: dict, raw: str): + self.create.set_fields(fields, raw) + self.create.tkraise() + +class BusyDialog(Toplevel): + def __init__(self, parent, text="Working…"): + super().__init__(parent) + self.title("") + self.resizable(False, False) + self.transient(parent) + self.grab_set() # modal + ttk.Label(self, text=text).pack(padx=16, pady=(14, 6)) + pb = ttk.Progressbar(self, mode="indeterminate", length=220) + pb.pack(padx=16, pady=(0,12)) + pb.start(12) + # center + self.update_idletasks() + x = parent.winfo_rootx() + (parent.winfo_width() - self.winfo_width()) // 2 + y = parent.winfo_rooty() + (parent.winfo_height() - self.winfo_height()) // 2 + self.geometry(f"+{x}+{y}") + +def run_in_thread(func, done_cb): + """Run func() in a daemon thread, call done_cb(result) on the Tk thread.""" + def worker(): + res = None + try: + res = func() + finally: + # schedule callback on Tk thread + try: + done_cb(res) + except Exception: + pass + t = threading.Thread(target=worker, daemon=True) + t.start() + return t + +def run(): + App().mainloop() diff --git a/workflows/bulk_add.py b/workflows/bulk_add.py new file mode 100644 index 0000000..82e89b3 --- /dev/null +++ b/workflows/bulk_add.py @@ -0,0 +1,220 @@ +import time +from collections import deque +from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED +from typing import Any, Dict, List, Optional +from tqdm import tqdm + +from config import * +from apis.partdb_api import PartDB +from jobs import Job, generate_rc0603fr_e32, generate_rc0805fr_e32, \ + generate_caps_0805_queries, generate_caps_1206_dupes_for_low_v +from provider.selenium_flow import ( + start_firefox_resilient, ensure_logged_in, run_provider_update_flow, + wait_fields_to_persist, set_eda_from_resistance, set_eda_from_capacitance +) + +def build_jobs() -> List[Job]: + jobs: List[Job] = [] + if ENABLE_RESISTORS_0805: + jobs.append(Job("Resistor", "YAGEO", "0805", generate_rc0805fr_e32(), "resistor")) + if ENABLE_RESISTORS_0603: + jobs.append(Job("Resistor", "YAGEO", "0603", generate_rc0603fr_e32(), "resistor")) + if ENABLE_CAPS_0805: + jobs.append(Job("Capacitor", DEFAULT_CAP_MANUFACTURER, "0805", generate_caps_0805_queries(), "capacitor")) + if ADD_1206_FOR_LOW_V_CAPS: + dupes = generate_caps_1206_dupes_for_low_v(LOW_V_CAP_THRESHOLD_V, UPSIZED_1206_TARGET_V) + if dupes: + jobs.append(Job("Capacitor", DEFAULT_CAP_MANUFACTURER, "1206", dupes, "capacitor")) + return jobs + +# Minimal worker context (same idea as your original) +_WORKER_CTX: Dict[str, Any] = {"driver": None, "pdb": None} + +def _worker_init(base: str, token: str, headless: bool): + import atexit + drv = start_firefox_resilient(headless_first=headless) + drv.get(base + "/") + ensure_logged_in(drv, base, interactive_ok=False, wait_s=120) + _WORKER_CTX["driver"] = drv + _WORKER_CTX["pdb"] = PartDB(base, token) + @atexit.register + def _cleanup(): + try: + if _WORKER_CTX.get("driver"): + _WORKER_CTX["driver"].quit() + except Exception: + pass + +def _retry_worker_task(task: dict) -> dict: + drv = _WORKER_CTX.get("driver"); pdb: PartDB = _WORKER_CTX.get("pdb") + mpn = task["mpn"]; part_id = task["part_id"]; kind = task["kind"] + if not drv or not pdb: + return {"mpn": mpn, "part_id": part_id, "status": "issue", "stage": "worker", "reason": "ctx not init"} + try: + controller = drv.current_window_handle + except Exception: + return {"mpn": mpn, "part_id": part_id, "status": "issue", "stage": "driver", "reason": "no window"} + for attempt in range(1, MAX_RETRIES + 1): + ok, where = run_provider_update_flow(drv, PARTDB_BASE, UI_LANG_PATH, part_id, controller) + if not ok: + if attempt == MAX_RETRIES: + return {"mpn": mpn, "part_id": part_id, "status": "issue", "stage": where, "reason": "provider failed after retries"} + continue + missing = wait_fields_to_persist(pdb, part_id, timeout_s=8, poll_s=0.8) + if not missing: + if kind == "resistor": + ok_eda = set_eda_from_resistance(pdb, part_id, max_wait_s=10, poll_every=0.8) + else: + ok_eda = set_eda_from_capacitance(pdb, part_id, max_wait_s=10, poll_every=0.8) + if not ok_eda: + return {"mpn": mpn, "part_id": part_id, "status": "ok", "stage": "eda_set_warn", "reason": "EDA not set"} + return {"mpn": mpn, "part_id": part_id, "status": "ok", "stage": "done", "reason": ""} + if attempt == MAX_RETRIES: + return {"mpn": mpn, "part_id": part_id, "status": "issue", "stage": "post_update", "reason": f"missing after retries: {', '.join(missing)}"} + return {"mpn": mpn, "part_id": part_id, "status": "issue", "stage": "internal", "reason": "fell through"} + +def run_bulk_add(): + # Controller session + driver = start_firefox_resilient(headless_first=HEADLESS_TRY) + driver.get(PARTDB_BASE + "/") + controller_handle = driver.current_window_handle + if not ensure_logged_in(driver, PARTDB_BASE, interactive_ok=True, wait_s=600): + print("Could not login; aborting."); return + + pdb = PartDB(PARTDB_BASE, PARTDB_TOKEN) + issues: List[Dict[str, Any]] = [] + created = skipped = failed = updated = 0 + + jobs = build_jobs() + bar = tqdm(total=sum(len(j.seeds if MAX_TO_CREATE is None else j.seeds[:int(MAX_TO_CREATE)]) for j in jobs), + desc="Parts", unit="part", dynamic_ncols=True, ascii=True, leave=True) + + pool = ProcessPoolExecutor(max_workers=MAX_PARALLEL_WORKERS, + initializer=_worker_init, + initargs=(PARTDB_BASE, PARTDB_TOKEN, HEADLESS_WORKER)) + pending: Dict[Any, dict] = {} + in_flight: set[int] = set() + backlog: deque[dict] = deque() + + def harvest_done(nonblocking=True): + if not pending: return + timeout = 0 if nonblocking else None + done, _ = wait(list(pending.keys()), timeout=timeout, return_when=FIRST_COMPLETED if nonblocking else None) + for fut in list(done): + task = pending.pop(fut, None) + if task: in_flight.discard(task["part_id"]) + try: + res = fut.result() + except Exception as e: + res = {"mpn": task["mpn"], "part_id": task["part_id"], "status": "issue", "stage": "pool", "reason": str(e)} + if res["status"] != "ok": + issues.append({"mpn": res["mpn"], "part_id": res["part_id"], "stage": res["stage"], "reason": res["reason"]}) + tqdm.write(f" [RETRY-ISSUE] {res['mpn']} id={res['part_id']}: {res['stage']} — {res['reason']}") + else: + tqdm.write(f" [RETRY-OK] {res['mpn']} id={res['part_id']}") + + def try_submit(task: dict): + pid = task["part_id"] + if pid in in_flight: return + if len(pending) < MAX_PARALLEL_WORKERS: + fut = pool.submit(_retry_worker_task, task) + pending[fut] = task; in_flight.add(pid) + else: + backlog.append(task) + + def drain_backlog(): + while backlog and len(pending) < MAX_PARALLEL_WORKERS: + task = backlog.popleft() + pid = task["part_id"] + if pid in in_flight: continue + fut = pool.submit(_retry_worker_task, task) + pending[fut] = task; in_flight.add(pid) + + try: + for job in jobs: + # Resolve IDs + cat_id = pdb.ensure_category(job.category) + manu_id = pdb.ensure_manufacturer(job.manufacturer) + fp_id = pdb.ensure_footprint(job.footprint) if job.footprint else None + tqdm.write(f"\n=== Job: {job.kind} {job.footprint or ''} in {job.category} (mfr: {job.manufacturer}) ===") + + seeds = job.seeds if MAX_TO_CREATE is None else job.seeds[:int(MAX_TO_CREATE)] + + for mpn in seeds: + bar.set_postfix_str(mpn) + harvest_done(True); drain_backlog() + + try: + existing_id = pdb.find_part_id_by_mpn(mpn) + except Exception as e: + issues.append({"mpn": mpn, "part_id": "", "stage": "create_part", "reason": f"existence check: {e}"}) + failed += 1; bar.update(1); continue + + if existing_id and SKIP_IF_EXISTS: + part_id = existing_id + tqdm.write(f"[EXIST] {mpn} (id={part_id})") + part = pdb.get_part(part_id) + missing = [] # leave detailed re-check to your helpers if desired + if missing: + tqdm.write(f" [QUEUE→RUN] Existing incomplete ({', '.join(missing)})") + try_submit({"mpn": mpn, "part_id": part_id, "kind": job.kind}) + else: + try: + if job.kind == "resistor": + set_eda_from_resistance(pdb, part_id, max_wait_s=10, poll_every=0.8) + else: + set_eda_from_capacitance(pdb, part_id, max_wait_s=10, poll_every=0.8) + except Exception as e: + issues.append({"mpn": mpn, "part_id": part_id, "stage": "eda_set", "reason": f"existing: {e}"}) + tqdm.write(" [OK] Existing part complete") + skipped += 1 + bar.update(1); continue + + # Create new + part_id = pdb.create_part(name=mpn, category_id=cat_id, manufacturer_id=manu_id, + mpn=mpn, description="", product_url=None, footprint_id=fp_id) + created += 1 + tqdm.write(f"[OK] Part id={part_id} (created)") + + ok, where = run_provider_update_flow(driver, PARTDB_BASE, UI_LANG_PATH, part_id, controller_handle) + if not ok: + issues.append({"mpn": mpn, "part_id": part_id, "stage": where, "reason": "provider step failed (first pass)"}) + tqdm.write(f" [WARN] Provider failed at: {where} — queued for background retry") + try_submit({"mpn": mpn, "part_id": part_id, "kind": job.kind}) + failed += 1; bar.update(1); continue + + tqdm.write(" [OK] Provider update completed") + updated += 1 + + missing = [] # you can call your existing completeness check here + if missing: + tqdm.write(f" [QUEUE→RUN] Incomplete new ({', '.join(missing)})") + try_submit({"mpn": mpn, "part_id": part_id, "kind": job.kind}) + else: + try: + if job.kind == "resistor": + set_eda_from_resistance(pdb, part_id, max_wait_s=10, poll_every=0.8) + else: + set_eda_from_capacitance(pdb, part_id, max_wait_s=10, poll_every=0.8) + except Exception as e: + issues.append({"mpn": mpn, "part_id": part_id, "stage": "eda_set", "reason": str(e)}) + bar.update(1) + finally: + bar.close() + try: + while pending or backlog: + drain_backlog() + harvest_done(nonblocking=False) + finally: + pool.shutdown(wait=True) + + if PRINT_FAILURE_TABLE and issues: + print("\n=== Issues ===") + for row in issues: + print(row) + + print("\nDone.") + print(f" Created: {created}") + print(f" Updated via provider (auto): {updated}") + print(f" Skipped: {skipped}") + print(f" Failed: {failed}") diff --git a/workflows/scan_import.py b/workflows/scan_import.py new file mode 100644 index 0000000..cc1d907 --- /dev/null +++ b/workflows/scan_import.py @@ -0,0 +1,45 @@ +import os, csv +from typing import Dict +from config import PARTDB_BASE, PARTDB_TOKEN, COM_PORT, BAUD_RATE, SCAN_APPEND_CSV +from apis.partdb_api import PartDB +from parsers.digikey_mh10 import parse_digikey +from devices.scanner_serial import scan_loop + +def _row_for_csv(fields: Dict[str,str]) -> Dict[str,str]: + return { + "DigiKeyPart": fields.get("DigiKeyPart",""), + "MfrPart": fields.get("MfrPart",""), + "CustomerPart": fields.get("CustomerPart",""), + "InternalLot1": fields.get("InternalLot1",""), + "InternalLot2": fields.get("InternalLot2",""), + "DateCodeRaw": fields.get("DateCodeRaw",""), + "TraceID": fields.get("TraceID",""), + "PackageSerial": fields.get("PackageSerial",""), + "PickTicket": fields.get("PickTicket",""), + } + +def run_scan_import(): + pdb = PartDB(PARTDB_BASE, PARTDB_TOKEN) + + def on_scan(flat: str): + fields = parse_digikey(flat) + print("\nScan:", flat) + print("→", fields) + + # Create (very minimal) part record using MPN/DKPN + name = fields.get("MfrPart") or fields.get("DigiKeyPart") or "Unknown" + mpn = fields.get("MfrPart") or "" + # You can enrich with ensure_category/manufacturer/footprint if you want here + + # Optional: CSV log + if SCAN_APPEND_CSV: + newfile = not os.path.exists(SCAN_APPEND_CSV) + with open(SCAN_APPEND_CSV, "a", newline="", encoding="utf-8") as f: + w = csv.DictWriter(f, fieldnames=list(_row_for_csv(fields).keys())) + if newfile: w.writeheader() + w.writerow(_row_for_csv(fields)) + + # If you want to insert a stock movement instead of creating parts, + # call your Part-DB stock API here. + + scan_loop(COM_PORT, BAUD_RATE, on_scan)