# File: PartDB_Helper_App/workflows/bulk_add.py

import time
from collections import deque
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED
from typing import Any, Dict, List, Optional
from tqdm import tqdm
from config import *
from apis.partdb_api import PartDB
from jobs import Job, generate_rc0603fr_e32, generate_rc0805fr_e32, \
generate_caps_0805_queries, generate_caps_1206_dupes_for_low_v
from provider.selenium_flow import (
start_firefox_resilient, ensure_logged_in, run_provider_update_flow,
wait_fields_to_persist, set_eda_from_resistance, set_eda_from_capacitance
)
def build_jobs() -> List[Job]:
    """Assemble the list of bulk-add jobs enabled by the config flags.

    Each Job bundles a Part-DB category, manufacturer, footprint, the seed
    MPN list, and a kind tag ("resistor" / "capacitor") used downstream to
    pick the right EDA setter.
    """
    enabled: List[Job] = []
    if ENABLE_RESISTORS_0805:
        enabled.append(Job("Resistor", "YAGEO", "0805", generate_rc0805fr_e32(), "resistor"))
    if ENABLE_RESISTORS_0603:
        enabled.append(Job("Resistor", "YAGEO", "0603", generate_rc0603fr_e32(), "resistor"))
    if ENABLE_CAPS_0805:
        enabled.append(Job("Capacitor", DEFAULT_CAP_MANUFACTURER, "0805", generate_caps_0805_queries(), "capacitor"))
    if ADD_1206_FOR_LOW_V_CAPS:
        # 1206 duplicates only exist when some low-voltage 0805 caps qualify.
        dupes = generate_caps_1206_dupes_for_low_v(LOW_V_CAP_THRESHOLD_V, UPSIZED_1206_TARGET_V)
        if dupes:
            enabled.append(Job("Capacitor", DEFAULT_CAP_MANUFACTURER, "1206", dupes, "capacitor"))
    return enabled
# Minimal worker context (same idea as your original)
_WORKER_CTX: Dict[str, Any] = {"driver": None, "pdb": None}


def _worker_init(base: str, token: str, headless: bool):
    """Process-pool initializer: open a browser session and a Part-DB client.

    Runs once per worker process; the session objects live in the module-level
    _WORKER_CTX so _retry_worker_task can reuse them across tasks.  The
    browser is shut down via an atexit hook when the worker process exits.
    """
    import atexit

    driver = start_firefox_resilient(headless_first=headless)
    driver.get(base + "/")
    # Non-interactive: a worker process cannot prompt the user to log in.
    ensure_logged_in(driver, base, interactive_ok=False, wait_s=120)
    _WORKER_CTX["driver"] = driver
    _WORKER_CTX["pdb"] = PartDB(base, token)

    def _cleanup():
        # Best effort: a dead/crashed browser must not block process exit.
        try:
            if _WORKER_CTX.get("driver"):
                _WORKER_CTX["driver"].quit()
        except Exception:
            pass

    atexit.register(_cleanup)
def _retry_worker_task(task: dict) -> dict:
    """Run the provider-update flow for one part inside a worker process.

    Retries up to MAX_RETRIES times; on success also tries to set the EDA
    value from the persisted resistance/capacitance field.  Always returns a
    result dict with keys: mpn, part_id, status ("ok"/"issue"), stage, reason.
    """
    mpn = task["mpn"]
    part_id = task["part_id"]
    kind = task["kind"]

    def outcome(status: str, stage: str, reason: str) -> dict:
        return {"mpn": mpn, "part_id": part_id, "status": status, "stage": stage, "reason": reason}

    drv = _WORKER_CTX.get("driver")
    pdb: PartDB = _WORKER_CTX.get("pdb")
    if not drv or not pdb:
        return outcome("issue", "worker", "ctx not init")
    try:
        controller = drv.current_window_handle
    except Exception:
        return outcome("issue", "driver", "no window")

    for attempt in range(1, MAX_RETRIES + 1):
        ok, where = run_provider_update_flow(drv, PARTDB_BASE, UI_LANG_PATH, part_id, controller)
        if not ok:
            if attempt == MAX_RETRIES:
                return outcome("issue", where, "provider failed after retries")
            continue
        # Provider flow succeeded; verify the fields actually landed in Part-DB.
        missing = wait_fields_to_persist(pdb, part_id, timeout_s=8, poll_s=0.8)
        if not missing:
            setter = set_eda_from_resistance if kind == "resistor" else set_eda_from_capacitance
            ok_eda = setter(pdb, part_id, max_wait_s=10, poll_every=0.8)
            if not ok_eda:
                # Part data is complete; only the EDA value failed — report ok with a warning stage.
                return outcome("ok", "eda_set_warn", "EDA not set")
            return outcome("ok", "done", "")
        if attempt == MAX_RETRIES:
            return outcome("issue", "post_update", f"missing after retries: {', '.join(missing)}")
    return outcome("issue", "internal", "fell through")
def run_bulk_add():
    """Create/refresh parts job-by-job, retrying failed provider updates in a pool.

    Flow per seed MPN: check existence, create the part if needed, run the
    provider-update flow in the foreground (controller) browser session, then
    set the EDA value.  Parts whose provider step fails are queued to a
    ProcessPoolExecutor whose workers each hold their own browser session
    (see _worker_init / _retry_worker_task).  Prints a summary at the end.
    """
    # Controller session (foreground browser; interactive login is allowed here)
    driver = start_firefox_resilient(headless_first=HEADLESS_TRY)
    driver.get(PARTDB_BASE + "/")
    controller_handle = driver.current_window_handle
    if not ensure_logged_in(driver, PARTDB_BASE, interactive_ok=True, wait_s=600):
        print("Could not login; aborting.")
        return
    pdb = PartDB(PARTDB_BASE, PARTDB_TOKEN)

    issues: List[Dict[str, Any]] = []
    created = skipped = failed = updated = 0

    jobs = build_jobs()
    bar = tqdm(total=sum(len(j.seeds if MAX_TO_CREATE is None else j.seeds[:int(MAX_TO_CREATE)]) for j in jobs),
               desc="Parts", unit="part", dynamic_ncols=True, ascii=True, leave=True)
    pool = ProcessPoolExecutor(max_workers=MAX_PARALLEL_WORKERS,
                               initializer=_worker_init,
                               initargs=(PARTDB_BASE, PARTDB_TOKEN, HEADLESS_WORKER))
    pending: Dict[Any, dict] = {}   # future -> task dict
    in_flight: set[int] = set()     # part ids currently submitted or queued in a worker
    backlog: deque[dict] = deque()  # tasks waiting for a free worker slot

    def harvest_done(nonblocking=True):
        """Collect finished retry futures; block for at least one when nonblocking=False."""
        if not pending:
            return
        # BUGFIX: the original passed return_when=None on the blocking path;
        # concurrent.futures.wait raises ValueError for that, so the final
        # drain loop crashed whenever retries were pending.  FIRST_COMPLETED
        # is correct for both modes: with timeout=0 it merely polls, with
        # timeout=None it blocks until at least one future finishes.
        done, _ = wait(list(pending.keys()),
                       timeout=0 if nonblocking else None,
                       return_when=FIRST_COMPLETED)
        for fut in list(done):
            task = pending.pop(fut, None)
            if task is None:
                # BUGFIX: the original would dereference task["mpn"] below
                # if the future had already been popped — skip instead.
                continue
            in_flight.discard(task["part_id"])
            try:
                res = fut.result()
            except Exception as e:
                res = {"mpn": task["mpn"], "part_id": task["part_id"], "status": "issue", "stage": "pool", "reason": str(e)}
            if res["status"] != "ok":
                issues.append({"mpn": res["mpn"], "part_id": res["part_id"], "stage": res["stage"], "reason": res["reason"]})
                tqdm.write(f" [RETRY-ISSUE] {res['mpn']} id={res['part_id']}: {res['stage']}{res['reason']}")
            else:
                tqdm.write(f" [RETRY-OK] {res['mpn']} id={res['part_id']}")

    def try_submit(task: dict):
        """Submit a retry task, or park it in the backlog when the pool is saturated."""
        pid = task["part_id"]
        if pid in in_flight:
            return
        if len(pending) < MAX_PARALLEL_WORKERS:
            fut = pool.submit(_retry_worker_task, task)
            pending[fut] = task
            in_flight.add(pid)
        else:
            backlog.append(task)

    def drain_backlog():
        """Move backlogged tasks into the pool while worker slots are free."""
        while backlog and len(pending) < MAX_PARALLEL_WORKERS:
            task = backlog.popleft()
            pid = task["part_id"]
            if pid in in_flight:
                continue
            fut = pool.submit(_retry_worker_task, task)
            pending[fut] = task
            in_flight.add(pid)

    try:
        for job in jobs:
            # Resolve IDs (creates category/manufacturer/footprint if absent)
            cat_id = pdb.ensure_category(job.category)
            manu_id = pdb.ensure_manufacturer(job.manufacturer)
            fp_id = pdb.ensure_footprint(job.footprint) if job.footprint else None
            tqdm.write(f"\n=== Job: {job.kind} {job.footprint or ''} in {job.category} (mfr: {job.manufacturer}) ===")
            seeds = job.seeds if MAX_TO_CREATE is None else job.seeds[:int(MAX_TO_CREATE)]
            for mpn in seeds:
                bar.set_postfix_str(mpn)
                # Opportunistically reap finished retries and refill the pool.
                harvest_done(True)
                drain_backlog()
                try:
                    existing_id = pdb.find_part_id_by_mpn(mpn)
                except Exception as e:
                    issues.append({"mpn": mpn, "part_id": "", "stage": "create_part", "reason": f"existence check: {e}"})
                    failed += 1
                    bar.update(1)
                    continue
                if existing_id and SKIP_IF_EXISTS:
                    part_id = existing_id
                    tqdm.write(f"[EXIST] {mpn} (id={part_id})")
                    part = pdb.get_part(part_id)  # fetched for completeness check below (currently unused)
                    missing = []  # leave detailed re-check to your helpers if desired
                    if missing:
                        tqdm.write(f" [QUEUE→RUN] Existing incomplete ({', '.join(missing)})")
                        try_submit({"mpn": mpn, "part_id": part_id, "kind": job.kind})
                    else:
                        try:
                            if job.kind == "resistor":
                                set_eda_from_resistance(pdb, part_id, max_wait_s=10, poll_every=0.8)
                            else:
                                set_eda_from_capacitance(pdb, part_id, max_wait_s=10, poll_every=0.8)
                        except Exception as e:
                            issues.append({"mpn": mpn, "part_id": part_id, "stage": "eda_set", "reason": f"existing: {e}"})
                        tqdm.write(" [OK] Existing part complete")
                    skipped += 1
                    bar.update(1)
                    continue
                # Create new
                part_id = pdb.create_part(name=mpn, category_id=cat_id, manufacturer_id=manu_id,
                                          mpn=mpn, description="", product_url=None, footprint_id=fp_id)
                created += 1
                tqdm.write(f"[OK] Part id={part_id} (created)")
                ok, where = run_provider_update_flow(driver, PARTDB_BASE, UI_LANG_PATH, part_id, controller_handle)
                if not ok:
                    # First-pass failure: record it and hand the part to a worker.
                    issues.append({"mpn": mpn, "part_id": part_id, "stage": where, "reason": "provider step failed (first pass)"})
                    tqdm.write(f" [WARN] Provider failed at: {where} — queued for background retry")
                    try_submit({"mpn": mpn, "part_id": part_id, "kind": job.kind})
                    failed += 1
                    bar.update(1)
                    continue
                tqdm.write(" [OK] Provider update completed")
                updated += 1
                missing = []  # you can call your existing completeness check here
                if missing:
                    tqdm.write(f" [QUEUE→RUN] Incomplete new ({', '.join(missing)})")
                    try_submit({"mpn": mpn, "part_id": part_id, "kind": job.kind})
                else:
                    try:
                        if job.kind == "resistor":
                            set_eda_from_resistance(pdb, part_id, max_wait_s=10, poll_every=0.8)
                        else:
                            set_eda_from_capacitance(pdb, part_id, max_wait_s=10, poll_every=0.8)
                    except Exception as e:
                        issues.append({"mpn": mpn, "part_id": part_id, "stage": "eda_set", "reason": str(e)})
                bar.update(1)
    finally:
        bar.close()
    # Drain all queued/in-flight retries before shutting the pool down.
    try:
        while pending or backlog:
            drain_backlog()
            harvest_done(nonblocking=False)
    finally:
        pool.shutdown(wait=True)
    if PRINT_FAILURE_TABLE and issues:
        print("\n=== Issues ===")
        for row in issues:
            print(row)
    print("\nDone.")
    print(f" Created: {created}")
    print(f" Updated via provider (auto): {updated}")
    print(f" Skipped: {skipped}")
    print(f" Failed: {failed}")