343 lines
15 KiB
Python
343 lines
15 KiB
Python
import re, requests
|
|
from typing import Optional, Dict, Any, List
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
|
|
class PartDB:
|
|
def __init__(self, base_url: str, token: str, timeout: int = 30):
|
|
self.base = base_url.rstrip("/")
|
|
self.s = requests.Session()
|
|
self.s.headers.update({"Authorization": f"Bearer {token}", "Accept": "application/json"})
|
|
adapter = HTTPAdapter(pool_connections=64, pool_maxsize=64,
|
|
max_retries=Retry(total=3, backoff_factor=0.3, status_forcelist=(502,503,504)))
|
|
self.s.mount("http://", adapter); self.s.mount("https://", adapter)
|
|
self.timeout = timeout
|
|
|
|
def _get(self, path: str, params: Dict[str, Any] | None = None):
|
|
r = self.s.get(f"{self.base}{path}", params=params, timeout=self.timeout)
|
|
r.raise_for_status(); return r.json()
|
|
|
|
def _post_try(self, path: str, payload: Dict[str, Any]) -> requests.Response:
|
|
r = self.s.post(f"{self.base}{path}", json=payload, timeout=self.timeout,
|
|
headers={"Content-Type":"application/json"})
|
|
if r.status_code in (200,201): return r
|
|
if r.status_code in (400,415):
|
|
r = self.s.post(f"{self.base}{path}", json=payload, timeout=self.timeout,
|
|
headers={"Content-Type":"application/vnd.api+json"})
|
|
return r
|
|
|
|
def _patch_merge(self, path: str, payload: Dict[str, Any]) -> requests.Response:
|
|
return self.s.patch(f"{self.base}{path}", json=payload, timeout=self.timeout,
|
|
headers={"Content-Type":"application/merge-patch+json"})
|
|
|
|
@staticmethod
|
|
def _extract_id(obj: Dict[str, Any]) -> Optional[int]:
|
|
if obj is None: return None
|
|
if isinstance(obj.get("id"), int): return obj["id"]
|
|
if isinstance(obj.get("_id"), int): return obj["_id"]
|
|
raw = obj.get("id")
|
|
if isinstance(raw, str):
|
|
digits = "".join(ch for ch in raw if ch.isdigit())
|
|
if digits: return int(digits)
|
|
attrs = obj.get("attributes") or {}
|
|
if isinstance(attrs.get("_id"), int): return attrs["_id"]
|
|
return None
|
|
|
|
def ensure_category(self, name: str) -> int:
|
|
js = self._get("/api/categories", params={"name": name})
|
|
if isinstance(js, list) and js: return int(self._extract_id(js[0]))
|
|
r = self._post_try("/api/categories", {"name": name}); r.raise_for_status()
|
|
return int(self._extract_id(r.json()))
|
|
|
|
def ensure_manufacturer(self, name: str) -> int:
|
|
js = self._get("/api/manufacturers", params={"name": name})
|
|
if isinstance(js, list) and js: return int(self._extract_id(js[0]))
|
|
r = self._post_try("/api/manufacturers", {"name": name}); r.raise_for_status()
|
|
return int(self._extract_id(r.json()))
|
|
|
|
def ensure_footprint(self, name: str) -> int:
|
|
js = self._get("/api/footprints", params={"name": name})
|
|
if isinstance(js, list) and js: return int(self._extract_id(js[0]))
|
|
r = self._post_try("/api/footprints", {"name": name}); r.raise_for_status()
|
|
return int(self._extract_id(r.json()))
|
|
|
|
def find_part_id_by_mpn(self, mpn: str) -> Optional[int]:
|
|
def norm(s: Optional[str]) -> str:
|
|
import re
|
|
return re.sub(r"\s+","",(s or "")).strip().lower()
|
|
want = norm(mpn)
|
|
def get_field(p: dict, key: str) -> Optional[str]:
|
|
if key in p: return p.get(key)
|
|
return (p.get("attributes") or {}).get(key)
|
|
for params in (
|
|
{"manufacturer_product_number": mpn, "per_page": 50},
|
|
{"name": mpn, "per_page": 50},
|
|
{"search": mpn, "per_page": 50},
|
|
):
|
|
try:
|
|
js = self._get("/api/parts", params=params)
|
|
except Exception:
|
|
continue
|
|
if not isinstance(js, list):
|
|
continue
|
|
for p in js:
|
|
if norm(get_field(p,"manufacturer_product_number")) == want or norm(get_field(p,"name")) == want:
|
|
return self._extract_id(p)
|
|
return None
|
|
|
|
def create_part(self, *, name: str, category_id: int, manufacturer_id: int,
|
|
mpn: str, description: str = "", product_url: Optional[str] = None,
|
|
footprint_id: Optional[int] = None) -> int:
|
|
payload = {
|
|
"name": name, "description": description or "",
|
|
"category": f"/api/categories/{category_id}",
|
|
"manufacturer": f"/api/manufacturers/{manufacturer_id}",
|
|
"manufacturer_product_number": mpn,
|
|
}
|
|
if product_url: payload["manufacturer_product_url"] = product_url
|
|
if footprint_id is not None: payload["footprint"] = f"/api/footprints/{footprint_id}"
|
|
r = self._post_try("/api/parts", payload)
|
|
if r.status_code not in (200,201):
|
|
raise RuntimeError(f"Create part failed: {r.status_code} {r.text}")
|
|
pid = self._extract_id(r.json())
|
|
if not pid: raise RuntimeError(f"Create part: no id in response: {r.text}")
|
|
return int(pid)
|
|
|
|
def get_part(self, part_id: int) -> dict:
|
|
return self._get(f"/api/parts/{part_id}")
|
|
|
|
def get_part_parameter_values(self, part_id: int):
|
|
for path in (
|
|
f"/api/parts/{part_id}/parameter-values",
|
|
f"/api/parts/{part_id}/parameter_values",
|
|
f"/api/parameter-values?filter[part_id]={part_id}",
|
|
):
|
|
try:
|
|
js = self._get(path)
|
|
if isinstance(js, list) and js: return js
|
|
if isinstance(js, dict) and js.get("data"): return js["data"]
|
|
except Exception:
|
|
pass
|
|
return []
|
|
|
|
def get_part_params_flat(self, part: dict) -> Dict[str, tuple[Optional[str], Optional[str]]]:
|
|
out: Dict[str, tuple[Optional[str], Optional[str]]] = {}
|
|
params = part.get("parameters") or part.get("parameter_values") or []
|
|
for p in params:
|
|
name = (p.get("name") or (p.get("parameter") or {}).get("name") or "").strip()
|
|
if not name: continue
|
|
raw_val = None
|
|
for key in ("value_typical","typ","value_text","value","max","min","nominal"):
|
|
if p.get(key) not in (None,""):
|
|
raw_val = p.get(key); break
|
|
u = p.get("unit"); unit = (u.get("symbol") or u.get("name")) if isinstance(u, dict) else u
|
|
if raw_val in (None,"") and p.get("formatted"):
|
|
raw_val = p["formatted"]; unit = None
|
|
out[name.lower()] = (None if raw_val in (None,"") else str(raw_val), unit)
|
|
return out
|
|
|
|
def get_eda_value(self, part: dict):
|
|
eda = part.get("eda_info") or part.get("eda")
|
|
if isinstance(eda, dict): return eda.get("value")
|
|
return part.get("eda_value") or part.get("edaValue")
|
|
|
|
def patch_eda_value(self, part_id: int, eda_value: str) -> bool:
|
|
for payload in ({"eda_info":{"value":eda_value}},
|
|
{"eda":{"value":eda_value}},
|
|
{"eda_value":eda_value}):
|
|
r = self._patch_merge(f"/api/parts/{part_id}", payload)
|
|
if 200 <= r.status_code < 300: return True
|
|
payload = {"data":{"type":"parts","id":str(part_id),"attributes":{"eda_value":eda_value}}}
|
|
r = self.s.patch(f"{self.base}/api/parts/{part_id}", json=payload,
|
|
headers={"Content-Type":"application/vnd.api+json"}, timeout=self.timeout)
|
|
return 200 <= r.status_code < 300
|
|
|
|
def list_categories(self) -> list[dict]:
|
|
# Paginates if your server is configured that way; adjust per your API.
|
|
js = self._get("/api/categories", params={"per_page": 500})
|
|
return js if isinstance(js, list) else []
|
|
|
|
def create_category_if_missing(self, name: str) -> int:
|
|
# You already have ensure_category, but this uses list to avoid extra calls
|
|
name = (name or "").strip()
|
|
if not name:
|
|
raise ValueError("Category name empty")
|
|
for c in self.list_categories():
|
|
if (c.get("name") or "").strip().lower() == name.lower():
|
|
cid = self._extract_id(c)
|
|
if cid: return cid
|
|
r = self._post_try("/api/categories", {"name": name})
|
|
r.raise_for_status()
|
|
cid = self._extract_id(r.json())
|
|
if not cid:
|
|
raise RuntimeError("Create category: missing id in response")
|
|
return cid
|
|
|
|
def find_part_by_mpn_or_search(self, key: str) -> Optional[int]:
|
|
key = (key or "").strip()
|
|
if not key:
|
|
return None
|
|
# 1) exact by manufacturer_product_number
|
|
pid = self.find_part_id_by_mpn(key)
|
|
if pid:
|
|
return pid
|
|
# 2) name match
|
|
try:
|
|
js = self._get("/api/parts", params={"name": key, "per_page": 50})
|
|
if isinstance(js, list) and js:
|
|
cand = js[0]
|
|
return self._extract_id(cand)
|
|
except Exception:
|
|
pass
|
|
# 3) generic search
|
|
try:
|
|
js = self._get("/api/parts", params={"search": key, "per_page": 50})
|
|
if isinstance(js, list) and js:
|
|
cand = js[0]
|
|
return self._extract_id(cand)
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
def summarize_part(self, part_id: int) -> dict:
|
|
"""Returns a small summary used by the UI (robust to different schemas)."""
|
|
p = self.get_part(part_id)
|
|
|
|
def _get(path, default=""):
|
|
obj = p
|
|
for k in path.split("."):
|
|
if not isinstance(obj, dict):
|
|
return default
|
|
obj = obj.get(k)
|
|
return obj if obj is not None else default
|
|
|
|
# name / mpn
|
|
name = _get("name") or _get("attributes.name") or ""
|
|
mpn = _get("manufacturer_product_number") or _get("attributes.manufacturer_product_number") or ""
|
|
|
|
# category
|
|
cat = _get("category") or _get("relationships.category") or {}
|
|
cat_name = (cat.get("name") if isinstance(cat, dict) else "") or ""
|
|
|
|
# footprint
|
|
fp = _get("footprint") or _get("relationships.footprint") or {}
|
|
fp_name = (fp.get("name") if isinstance(fp, dict) else "") or ""
|
|
|
|
# eda value
|
|
eda = self.get_eda_value(p) or ""
|
|
|
|
# description
|
|
desc = _get("description") or _get("attributes.description") or ""
|
|
|
|
# stock / locations (best-effort)
|
|
stock_total = None
|
|
locations: list[str] = []
|
|
|
|
# Try common embedded places
|
|
for key in ("stock", "stock_total", "stockTotal"):
|
|
v = p.get(key)
|
|
if isinstance(v, (int, float)) and v >= 0:
|
|
stock_total = int(v); break
|
|
|
|
# Try a dedicated endpoint, if your instance exposes it
|
|
if stock_total is None:
|
|
for path in (f"/api/parts/{part_id}/stock-entries",
|
|
f"/api/stock-entries?part={part_id}",
|
|
f"/api/stock_entries?part={part_id}"):
|
|
try:
|
|
js = self._get(path)
|
|
if isinstance(js, list):
|
|
stock_total = sum(int(row.get("amount") or 0) for row in js)
|
|
for row in js:
|
|
loc = (row.get("storage_location") or {}).get("name") if isinstance(row.get("storage_location"), dict) else None
|
|
if loc: locations.append(str(loc))
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"id": part_id,
|
|
"name": name,
|
|
"mpn": mpn,
|
|
"category": cat_name,
|
|
"footprint": fp_name,
|
|
"eda_value": eda,
|
|
"description": desc,
|
|
"stock": stock_total,
|
|
"locations": sorted(set(locations)) if locations else [],
|
|
}
|
|
|
|
def find_part_exact(self, mpn=None, dkpn=None):
|
|
if dkpn:
|
|
res = self._get_try(f"/api/parts?digikey_pn={dkpn}")
|
|
if res.status_code == 200 and res.json():
|
|
return res.json()[0]
|
|
|
|
if mpn:
|
|
res = self._get_try(f"/api/parts?manufacturer_product_number={mpn}")
|
|
if res.status_code == 200 and res.json():
|
|
return res.json()[0]
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def _norm(s) -> str:
|
|
return ("" if s is None else str(s)).strip().lower()
|
|
|
|
def _orderdetails_iter(self, part: dict):
|
|
"""Yield supplier name and sku-ish fields from order details (varies by Part-DB version)."""
|
|
rows = part.get("orderdetails") or part.get("orderDetails") or []
|
|
for r in rows if isinstance(rows, list) else []:
|
|
supplier = r.get("supplier") or r.get("vendor") or {}
|
|
supplier_name = supplier.get("name") if isinstance(supplier, dict) else (supplier or "")
|
|
# common field names we see in the wild
|
|
sku = (
|
|
r.get("order_number") or r.get("orderNumber") or
|
|
r.get("order_part_number") or r.get("orderPartNumber") or
|
|
r.get("supplier_product_number") or r.get("supplierProductNumber") or
|
|
r.get("sku") or r.get("mpn_or_sku")
|
|
)
|
|
yield str(supplier_name or ""), None if sku in (None, "") else str(sku)
|
|
|
|
def _part_has_dkpn(self, part: dict, dkpn: str) -> bool:
|
|
"""True if any order-detail SKU equals the dkpn (case-insensitive)."""
|
|
want = self._norm(dkpn)
|
|
if not want:
|
|
return False
|
|
# direct field (some installs keep a vendor PN field)
|
|
for key in ("digikey_pn", "dk_pn", "vendor_pn", "external_sku"):
|
|
v = part.get(key) or (part.get("attributes") or {}).get(key)
|
|
if self._norm(v) == want:
|
|
return True
|
|
# look inside order details
|
|
for supplier_name, sku in self._orderdetails_iter(part):
|
|
if self._norm(sku) == want:
|
|
return True
|
|
return False
|
|
|
|
def find_part_exact(self, *, dkpn: str | None = None, mpn: str | None = None) -> int | None:
|
|
"""
|
|
Prefer exact DKPN match (by checking supplier SKU in order details),
|
|
then fall back to exact MPN match. Returns part_id or None.
|
|
"""
|
|
# 1) Try to locate by DKPN: do a broad search, then verify exact match
|
|
if dkpn:
|
|
try:
|
|
js = self._get("/api/parts", params={"search": dkpn, "per_page": 100})
|
|
if isinstance(js, list):
|
|
for p in js:
|
|
if self._part_has_dkpn(p, dkpn):
|
|
pid = self._extract_id(p)
|
|
if pid:
|
|
return pid
|
|
except Exception:
|
|
pass
|
|
|
|
# 2) Exact by MPN (your helper is already exact)
|
|
if mpn:
|
|
pid = self.find_part_id_by_mpn(mpn)
|
|
if pid:
|
|
return pid
|
|
|
|
return None
|