import logging
import re
from typing import Any, Dict, List, Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logger = logging.getLogger(__name__)

# Failures that mean "this lookup variant did not work, try the next one":
# transport/HTTP errors raised by requests, and ValueError for a response
# body that is not valid JSON.
_LOOKUP_ERRORS = (requests.RequestException, ValueError)


class PartDB:
    """Small HTTP client for a Part-DB style REST API.

    Most helpers are deliberately schema-tolerant: they probe several field
    names / endpoints because the payload shape differs between server
    versions and between plain-JSON and JSON:API style responses.
    """

    def __init__(self, base_url: str, token: str, timeout: int = 30):
        """Create a session with bearer auth, connection pooling and retries.

        Args:
            base_url: Server root; a trailing slash is stripped.
            token: API token sent as ``Authorization: Bearer <token>``.
            timeout: Per-request timeout in seconds.
        """
        self.base = base_url.rstrip("/")
        self.s = requests.Session()
        self.s.headers.update(
            {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        )
        adapter = HTTPAdapter(
            pool_connections=64,
            pool_maxsize=64,
            max_retries=Retry(
                total=3, backoff_factor=0.3, status_forcelist=(502, 503, 504)
            ),
        )
        self.s.mount("http://", adapter)
        self.s.mount("https://", adapter)
        self.timeout = timeout

    # ------------------------------------------------------------------
    # Low-level HTTP helpers
    # ------------------------------------------------------------------

    def _get(self, path: str, params: Dict[str, Any] | None = None):
        """GET ``path`` and return the decoded JSON body.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
            ValueError: If the body is not valid JSON.
        """
        r = self.s.get(f"{self.base}{path}", params=params, timeout=self.timeout)
        r.raise_for_status()
        return r.json()

    def _try_get(self, path: str, params: Dict[str, Any] | None = None):
        """Best-effort GET: return decoded JSON, or None if the request failed.

        Only transport/HTTP/JSON-decoding failures are swallowed (and logged
        at debug level); programming errors still propagate.
        """
        try:
            return self._get(path, params=params)
        except _LOOKUP_ERRORS as exc:
            logger.debug("GET %s (params=%r) failed: %s", path, params, exc)
            return None

    def _post_try(self, path: str, payload: Dict[str, Any]) -> requests.Response:
        """POST ``payload`` as JSON; on 400/415 retry once as JSON:API.

        The response is returned without raising, so callers decide how to
        treat non-2xx status codes.
        """
        url = f"{self.base}{path}"
        r = self.s.post(
            url,
            json=payload,
            timeout=self.timeout,
            headers={"Content-Type": "application/json"},
        )
        if r.status_code in (200, 201):
            return r
        if r.status_code in (400, 415):
            # Some servers only accept the JSON:API media type.
            r = self.s.post(
                url,
                json=payload,
                timeout=self.timeout,
                headers={"Content-Type": "application/vnd.api+json"},
            )
        return r

    def _patch_merge(self, path: str, payload: Dict[str, Any]) -> requests.Response:
        """PATCH ``payload`` using the JSON merge-patch content type."""
        return self.s.patch(
            f"{self.base}{path}",
            json=payload,
            timeout=self.timeout,
            headers={"Content-Type": "application/merge-patch+json"},
        )

    # ------------------------------------------------------------------
    # Payload helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _extract_id(obj: Dict[str, Any]) -> Optional[int]:
        """Return the numeric id of an API object, or None if there is none.

        Checked in order: integer ``id``, integer ``_id``, the trailing run
        of digits of a string ``id`` (e.g. ``"/api/parts/12"`` -> 12), and
        integer ``attributes._id``.
        """
        if not isinstance(obj, dict):
            return None

        def as_int(value) -> Optional[int]:
            # bool is a subclass of int but is never a valid identifier.
            if isinstance(value, int) and not isinstance(value, bool):
                return value
            return None

        for key in ("id", "_id"):
            found = as_int(obj.get(key))
            if found is not None:
                return found

        raw = obj.get("id")
        if isinstance(raw, str):
            # Take only the last digit run: joining every digit in the
            # string would turn "/api/v2/parts/12" into 212.
            match = re.search(r"(\d+)\D*$", raw)
            if match:
                return int(match.group(1))

        attrs = obj.get("attributes")
        if isinstance(attrs, dict):
            return as_int(attrs.get("_id"))
        return None

    @staticmethod
    def _field(entry: dict, key: str):
        """Read ``key`` from the top level, else from ``attributes``."""
        if not isinstance(entry, dict):
            return None
        if key in entry:
            return entry.get(key)
        attrs = entry.get("attributes")
        return attrs.get(key) if isinstance(attrs, dict) else None

    @staticmethod
    def _norm(s) -> str:
        """Normalise a value for comparison: str(), strip, lower-case."""
        return ("" if s is None else str(s)).strip().lower()

    @staticmethod
    def _squash(s) -> str:
        """Normalise an MPN-like value: drop all whitespace, lower-case."""
        return re.sub(r"\s+", "", "" if s is None else str(s)).lower()

    # ------------------------------------------------------------------
    # Categories / manufacturers / footprints
    # ------------------------------------------------------------------

    def _ensure_named(self, endpoint: str, name: str) -> int:
        """Return the id of the entity called ``name``, creating it if absent.

        The collection is queried with a ``name`` filter. Among the hits an
        exact (case-insensitive) name match is preferred; if there is none
        the first hit is used. When the query returns nothing the entity is
        created with a POST.

        Raises:
            requests.HTTPError: If the lookup or the creation fails.
            RuntimeError: If the chosen/created entity carries no usable id.
        """
        found = self._get(endpoint, params={"name": name})
        if isinstance(found, list) and found:
            wanted = self._norm(name)
            chosen = next(
                (e for e in found if self._norm(self._field(e, "name")) == wanted),
                found[0],
            )
            entity_id = self._extract_id(chosen)
            if entity_id is None:
                raise RuntimeError(f"{endpoint}: no id in existing entry for {name!r}")
            return entity_id

        r = self._post_try(endpoint, {"name": name})
        r.raise_for_status()
        entity_id = self._extract_id(r.json())
        if entity_id is None:
            raise RuntimeError(f"{endpoint}: no id in create response for {name!r}")
        return entity_id

    def ensure_category(self, name: str) -> int:
        """Return the id of category ``name``, creating it when missing."""
        return self._ensure_named("/api/categories", name)

    def ensure_manufacturer(self, name: str) -> int:
        """Return the id of manufacturer ``name``, creating it when missing."""
        return self._ensure_named("/api/manufacturers", name)

    def ensure_footprint(self, name: str) -> int:
        """Return the id of footprint ``name``, creating it when missing."""
        return self._ensure_named("/api/footprints", name)

    def list_categories(self) -> list[dict]:
        """Return up to 500 categories, or [] if the response is not a list.

        Only a single page is requested; adjust if the server paginates
        differently.
        """
        js = self._get("/api/categories", params={"per_page": 500})
        return js if isinstance(js, list) else []

    def create_category_if_missing(self, name: str) -> int:
        """Return the id of the category whose name equals ``name`` exactly.

        Unlike ``ensure_category`` this scans the category listing and
        compares names case-insensitively itself; the category is created
        when no match with a usable id is found.

        Raises:
            ValueError: If ``name`` is empty or whitespace.
            requests.HTTPError: If listing or creation fails.
            RuntimeError: If the create response carries no id.
        """
        name = (name or "").strip()
        if not name:
            raise ValueError("Category name empty")

        wanted = name.lower()
        for c in self.list_categories():
            if (c.get("name") or "").strip().lower() == wanted:
                cid = self._extract_id(c)
                if cid:
                    return cid

        r = self._post_try("/api/categories", {"name": name})
        r.raise_for_status()
        cid = self._extract_id(r.json())
        if not cid:
            raise RuntimeError("Create category: missing id in response")
        return cid

    # ------------------------------------------------------------------
    # Parts: lookup / creation
    # ------------------------------------------------------------------

    def find_part_id_by_mpn(self, mpn: str) -> Optional[int]:
        """Find a part whose MPN or name equals ``mpn`` exactly.

        Comparison ignores case and all whitespace. Three query variants are
        tried in turn (MPN filter, name filter, free-text search); a failing
        variant is skipped. Returns the id extracted from the first matching
        part (which may be None if that part has no id), or None.
        """
        want = self._squash(mpn)
        if not want:
            # An empty key would "match" every part lacking an MPN or name.
            return None

        queries = (
            {"manufacturer_product_number": mpn, "per_page": 50},
            {"name": mpn, "per_page": 50},
            {"search": mpn, "per_page": 50},
        )
        for params in queries:
            js = self._try_get("/api/parts", params=params)
            if not isinstance(js, list):
                continue
            for p in js:
                if not isinstance(p, dict):
                    continue
                candidates = (
                    self._squash(self._field(p, "manufacturer_product_number")),
                    self._squash(self._field(p, "name")),
                )
                if want in candidates:
                    return self._extract_id(p)
        return None

    def find_part_by_mpn_or_search(self, key: str) -> Optional[int]:
        """Find a part id for ``key``: exact MPN/name first, then fuzzy.

        Falls back to the first hit of a name-filtered query and then of a
        free-text search. Returns None for an empty key or when nothing with
        a usable id is found.
        """
        key = (key or "").strip()
        if not key:
            return None

        # 1) exact match on MPN / name
        pid = self.find_part_id_by_mpn(key)
        if pid:
            return pid

        # 2) name filter, 3) generic search: accept the first hit
        for params in (
            {"name": key, "per_page": 50},
            {"search": key, "per_page": 50},
        ):
            js = self._try_get("/api/parts", params=params)
            if isinstance(js, list) and js:
                pid = self._extract_id(js[0])
                if pid is not None:
                    return pid
        return None

    def find_part_exact(
        self, *, dkpn: str | None = None, mpn: str | None = None
    ) -> int | None:
        """Locate a part by exact DKPN, falling back to exact MPN.

        For the DKPN a broad search is issued and each hit is verified with
        ``_part_has_dkpn``. Returns the part id, or None if nothing matched.
        """
        # 1) DKPN: broad search, then verify the exact SKU on each hit.
        if dkpn:
            js = self._try_get("/api/parts", params={"search": dkpn, "per_page": 100})
            if isinstance(js, list):
                for p in js:
                    if self._part_has_dkpn(p, dkpn):
                        pid = self._extract_id(p)
                        if pid:
                            return pid

        # 2) MPN: find_part_id_by_mpn already compares exactly.
        if mpn:
            pid = self.find_part_id_by_mpn(mpn)
            if pid:
                return pid
        return None

    def create_part(
        self,
        *,
        name: str,
        category_id: int,
        manufacturer_id: int,
        mpn: str,
        description: str = "",
        product_url: Optional[str] = None,
        footprint_id: Optional[int] = None,
    ) -> int:
        """Create a part and return its id.

        Related entities are referenced by IRI (``/api/<collection>/<id>``).

        Raises:
            RuntimeError: If the server does not answer 200/201, or the
                response carries no id.
        """
        payload = {
            "name": name,
            "description": description or "",
            "category": f"/api/categories/{category_id}",
            "manufacturer": f"/api/manufacturers/{manufacturer_id}",
            "manufacturer_product_number": mpn,
        }
        if product_url:
            payload["manufacturer_product_url"] = product_url
        if footprint_id is not None:
            payload["footprint"] = f"/api/footprints/{footprint_id}"

        r = self._post_try("/api/parts", payload)
        if r.status_code not in (200, 201):
            raise RuntimeError(f"Create part failed: {r.status_code} {r.text}")
        pid = self._extract_id(r.json())
        if not pid:
            raise RuntimeError(f"Create part: no id in response: {r.text}")
        return int(pid)

    # ------------------------------------------------------------------
    # Parts: reading
    # ------------------------------------------------------------------

    def get_part(self, part_id: int) -> dict:
        """Fetch one part; raises on HTTP or JSON errors."""
        return self._get(f"/api/parts/{part_id}")

    def get_part_parameter_values(self, part_id: int):
        """Return the part's parameter values from the first endpoint that works.

        Accepts either a non-empty list or a dict with a non-empty ``data``
        member. Returns [] when no endpoint yields anything.
        """
        for path in (
            f"/api/parts/{part_id}/parameter-values",
            f"/api/parts/{part_id}/parameter_values",
            f"/api/parameter-values?filter[part_id]={part_id}",
        ):
            js = self._try_get(path)
            if isinstance(js, list) and js:
                return js
            if isinstance(js, dict) and js.get("data"):
                return js["data"]
        return []

    def get_part_params_flat(
        self, part: dict
    ) -> Dict[str, tuple[Optional[str], Optional[str]]]:
        """Flatten a part's parameters into ``{lower-cased name: (value, unit)}``.

        The value is the first non-empty field among several common field
        names, stringified; if none is set, ``formatted`` is used and the
        unit is dropped (a formatted string already contains it). Entries
        without a name, and entries that are not dicts, are skipped.
        """
        out: Dict[str, tuple[Optional[str], Optional[str]]] = {}
        params = part.get("parameters") or part.get("parameter_values") or []
        if not isinstance(params, list):
            return out

        value_keys = (
            "value_typical", "typ", "value_text", "value", "max", "min", "nominal",
        )
        for p in params:
            if not isinstance(p, dict):
                continue
            definition = p.get("parameter")
            nested_name = definition.get("name") if isinstance(definition, dict) else None
            name = str(p.get("name") or nested_name or "").strip()
            if not name:
                continue

            raw_val = next(
                (p[k] for k in value_keys if p.get(k) not in (None, "")), None
            )

            u = p.get("unit")
            unit = (u.get("symbol") or u.get("name")) if isinstance(u, dict) else u

            if raw_val is None and p.get("formatted"):
                raw_val = p["formatted"]
                unit = None

            value = None if raw_val in (None, "") else str(raw_val)
            out[name.lower()] = (value, unit)
        return out

    def get_eda_value(self, part: dict):
        """Return the part's EDA value from whichever field carries it."""
        eda = part.get("eda_info") or part.get("eda")
        if isinstance(eda, dict):
            return eda.get("value")
        return part.get("eda_value") or part.get("edaValue")

    def summarize_part(self, part_id: int) -> dict:
        """Returns a small summary used by the UI (robust to different schemas).

        Keys: ``id``, ``name``, ``mpn``, ``category``, ``footprint``,
        ``eda_value``, ``description``, ``stock`` (int or None when unknown)
        and ``locations`` (sorted, de-duplicated storage location names).
        """
        fetched = self.get_part(part_id)
        p = fetched if isinstance(fetched, dict) else {}

        def dig(path, default=""):
            """Follow a dotted path through nested dicts."""
            obj = p
            for k in path.split("."):
                if not isinstance(obj, dict):
                    return default
                obj = obj.get(k)
            return obj if obj is not None else default

        name = dig("name") or dig("attributes.name") or ""
        mpn = (
            dig("manufacturer_product_number")
            or dig("attributes.manufacturer_product_number")
            or ""
        )

        cat = dig("category") or dig("relationships.category") or {}
        cat_name = (cat.get("name") if isinstance(cat, dict) else "") or ""

        fp = dig("footprint") or dig("relationships.footprint") or {}
        fp_name = (fp.get("name") if isinstance(fp, dict) else "") or ""

        eda = self.get_eda_value(p) or ""
        desc = dig("description") or dig("attributes.description") or ""

        # Stock / locations are best-effort.
        stock_total = None
        locations: list[str] = []

        # Embedded total, if the part payload carries one.
        for key in ("stock", "stock_total", "stockTotal"):
            v = p.get(key)
            if isinstance(v, (int, float)) and not isinstance(v, bool) and v >= 0:
                stock_total = int(v)
                break

        # Otherwise try dedicated endpoints, if the instance exposes them.
        if stock_total is None:
            for path in (
                f"/api/parts/{part_id}/stock-entries",
                f"/api/stock-entries?part={part_id}",
                f"/api/stock_entries?part={part_id}",
            ):
                rows = self._try_get(path)
                if not isinstance(rows, list):
                    continue
                try:
                    total, names = self._tally_stock(rows)
                except (AttributeError, TypeError, ValueError) as exc:
                    # Unexpected row shape: treat this endpoint as unusable.
                    logger.debug("Unparseable stock rows from %s: %s", path, exc)
                    continue
                stock_total = total
                locations.extend(names)
                break

        return {
            "id": part_id,
            "name": name,
            "mpn": mpn,
            "category": cat_name,
            "footprint": fp_name,
            "eda_value": eda,
            "description": desc,
            "stock": stock_total,
            "locations": sorted(set(locations)),
        }

    @staticmethod
    def _tally_stock(rows: list) -> tuple[int, list[str]]:
        """Sum ``amount`` over stock rows and collect storage location names.

        Raises AttributeError/TypeError/ValueError on rows of unexpected
        shape; the caller decides how to react.
        """
        total = sum(int(row.get("amount") or 0) for row in rows)
        names: list[str] = []
        for row in rows:
            loc = row.get("storage_location")
            loc_name = loc.get("name") if isinstance(loc, dict) else None
            if loc_name:
                names.append(str(loc_name))
        return total, names

    # ------------------------------------------------------------------
    # Parts: writing
    # ------------------------------------------------------------------

    def patch_eda_value(self, part_id: int, eda_value: str) -> bool:
        """Set the part's EDA value, trying several payload shapes.

        Three merge-patch payloads are attempted first, then a JSON:API
        style document. Returns True as soon as one attempt gets a 2xx
        response, False if all are rejected. Transport errors propagate.
        """
        url_path = f"/api/parts/{part_id}"
        for payload in (
            {"eda_info": {"value": eda_value}},
            {"eda": {"value": eda_value}},
            {"eda_value": eda_value},
        ):
            r = self._patch_merge(url_path, payload)
            if 200 <= r.status_code < 300:
                return True

        payload = {
            "data": {
                "type": "parts",
                "id": str(part_id),
                "attributes": {"eda_value": eda_value},
            }
        }
        r = self.s.patch(
            f"{self.base}{url_path}",
            json=payload,
            headers={"Content-Type": "application/vnd.api+json"},
            timeout=self.timeout,
        )
        return 200 <= r.status_code < 300

    # ------------------------------------------------------------------
    # Order details / supplier SKUs
    # ------------------------------------------------------------------

    def _orderdetails_iter(self, part: dict):
        """Yield supplier name and sku-ish fields from order details (varies by Part-DB version).

        Each item is ``(supplier_name, sku)`` where ``sku`` is None when no
        SKU-like field is set. Rows that are not dicts are skipped.
        """
        rows = part.get("orderdetails") or part.get("orderDetails") or []
        if not isinstance(rows, list):
            return

        # Common field names seen in the wild, in priority order.
        sku_keys = (
            "order_number", "orderNumber",
            "order_part_number", "orderPartNumber",
            "supplier_product_number", "supplierProductNumber",
            "sku", "mpn_or_sku",
        )
        for r in rows:
            if not isinstance(r, dict):
                continue
            supplier = r.get("supplier") or r.get("vendor") or {}
            if isinstance(supplier, dict):
                supplier_name = supplier.get("name")
            else:
                supplier_name = supplier or ""

            sku = None
            for k in sku_keys:
                sku = r.get(k)
                if sku:
                    break

            yield str(supplier_name or ""), None if sku in (None, "") else str(sku)

    def _part_has_dkpn(self, part: dict, dkpn: str) -> bool:
        """True if any order-detail SKU equals the dkpn (case-insensitive).

        A handful of direct vendor part-number fields (top level or under
        ``attributes``) are checked before the order details.
        """
        want = self._norm(dkpn)
        if not want or not isinstance(part, dict):
            return False

        # Direct field (some installs keep a vendor PN field).
        for key in ("digikey_pn", "dk_pn", "vendor_pn", "external_sku"):
            attrs = part.get("attributes")
            v = part.get(key) or (attrs.get(key) if isinstance(attrs, dict) else None)
            if self._norm(v) == want:
                return True

        # Supplier SKUs inside order details.
        return any(
            self._norm(sku) == want for _supplier, sku in self._orderdetails_iter(part)
        )