457 lines
17 KiB
Python
457 lines
17 KiB
Python
"""
|
|
Workflow to standardize ASDMB crystal parts.
|
|
|
|
This script goes through all parts in the "Clock - ASDMB" category and:
|
|
1. Splits the name at "/" - first part becomes name, second part becomes description
|
|
2. For parts without a description after splitting, triggers info provider update
|
|
|
|
Uses the PartDB API for all operations.
|
|
"""
|
|
|
|
import re
|
|
from typing import Optional, List, Tuple
|
|
from tqdm import tqdm
|
|
|
|
from config import PARTDB_BASE, PARTDB_TOKEN
|
|
from apis.partdb_api import PartDB
|
|
|
|
|
|
def _id_from_ref(ref):
    """Return the integer ID carried by *ref*, or None when it has none.

    Accepts a plain int, a numeric string ("12"), or a path-like reference
    whose last segment holds the ID ("/api/categories/12"). Only the last
    path segment is inspected, so digits elsewhere in the path (for example
    a "/v1/" prefix) cannot leak into the result.
    """
    if isinstance(ref, int):
        return ref
    if not isinstance(ref, str):
        return None
    tail = ref.strip().strip("/").rsplit("/", 1)[-1]
    digits = "".join(ch for ch in tail if ch.isdecimal())
    return int(digits) if digits else None


def _collect_category_ids(api, categories, root_id):
    """Return *root_id* followed by all of its descendant category IDs.

    IDs are listed in depth-first discovery order; each category appears once
    even if the parent links contain a cycle.
    """
    # Index children by parent once instead of rescanning the list per node.
    children = {}
    for cat in categories:
        parent = cat.get("parent")
        if isinstance(parent, dict):
            parent = parent.get("id") or parent.get("_id")
        parent_id = _id_from_ref(parent)
        if parent_id is None:
            continue
        child_id = api._extract_id(cat)
        if child_id:
            children.setdefault(parent_id, []).append((child_id, cat.get("name")))

    ordered = [root_id]
    seen = {root_id}

    def visit(parent_id):
        for child_id, child_name in children.get(parent_id, ()):
            if child_id in seen:
                continue
            seen.add(child_id)
            ordered.append(child_id)
            print(f" Found subcategory: {child_name} (ID: {child_id})")
            visit(child_id)

    visit(root_id)
    return ordered


def _part_category_id(api, part):
    """Return the category ID of *part*, or None when it cannot be determined.

    Looks, in order, at part["category"], part["relationships"]["category"]["data"]
    and part["attributes"]["category"].
    """
    category = part.get("category")
    if isinstance(category, dict):
        cat_id = api._extract_id(category)
    else:
        cat_id = _id_from_ref(category)

    if cat_id is None:
        relationships = part.get("relationships")
        if isinstance(relationships, dict):
            rel_cat = relationships.get("category")
            if isinstance(rel_cat, dict):
                rel_data = rel_cat.get("data", {})
                if isinstance(rel_data, dict):
                    cat_id = api._extract_id(rel_data)

    if cat_id is None:
        attributes = part.get("attributes")
        if isinstance(attributes, dict):
            attr_cat = attributes.get("category")
            if isinstance(attr_cat, dict):
                cat_id = api._extract_id(attr_cat)
            elif attr_cat:
                cat_id = _id_from_ref(attr_cat)

    return cat_id


def get_all_parts_in_category(api: "PartDB", category_name: str) -> List[dict]:
    """
    Get all parts in a specific category and its subcategories.

    The category is looked up by case-insensitive name among
    ``api.list_categories()``. All parts are then paged through via
    ``api._get("/api/parts", ...)`` and filtered client-side by category ID.

    Args:
        api: PartDB client; must provide ``list_categories``, ``_extract_id``
            and ``_get``.
        category_name: Name of the root category.

    Returns:
        The matching part dicts. Empty when the category is not found. If a
        page fails to load or process, the error is printed and the parts
        collected so far are returned.
    """
    # First, find the category
    categories = api.list_categories()
    wanted_name = category_name.lower()
    target_cat_id = None
    for cat in categories:
        if (cat.get("name") or "").strip().lower() == wanted_name:
            target_cat_id = api._extract_id(cat)
            break

    if not target_cat_id:
        print(f"Category '{category_name}' not found!")
        return []

    print(f"Found category '{category_name}' with ID {target_cat_id}")

    # Find all subcategories
    subcategory_ids = _collect_category_ids(api, categories, target_cat_id)
    wanted_ids = set(subcategory_ids)

    print(f"Total categories to process: {len(subcategory_ids)}")
    print(f"Category IDs: {subcategory_ids}")

    # Fetch all parts with pagination and keep those in the wanted categories
    all_parts = []
    per_page = 30  # Use smaller page size to match API default
    max_envelope_pages = 100
    page = 1

    print("\nFetching parts from API...")
    while True:
        print(f" Fetching page {page}...")

        try:
            response = api._get("/api/parts", params={"per_page": per_page, "page": page})

            # The API may answer with a bare list or a {"data": [...], "meta": {...}} envelope.
            if isinstance(response, list):
                items, meta, is_envelope = response, {}, False
            elif isinstance(response, dict):
                items = response.get("data", [])
                meta = response.get("meta", {})
                is_envelope = True
            else:
                break

            if not items:
                print(" No data returned, stopping" if is_envelope else " No parts returned, stopping")
                break

            matches_this_page = 0
            for part in items:
                cat_id = _part_category_id(api, part)
                if cat_id and cat_id in wanted_ids:
                    all_parts.append(part)
                    matches_this_page += 1

            print(f" Got {len(items)} parts ({matches_this_page} matches, total: {len(all_parts)})")

            # Prefer explicit paging metadata; otherwise a full page means "maybe more".
            if is_envelope and meta.get("current_page") and meta.get("last_page"):
                has_more = meta["current_page"] < meta["last_page"]
            else:
                has_more = len(items) >= per_page

            if not has_more:
                break

            page += 1

            # Safety check. Only enforced for envelope responses (as before);
            # bare-list responses end on the first short or empty page.
            if is_envelope and page > max_envelope_pages:
                print(" Warning: Fetched 100 pages, stopping")
                break

        except Exception as e:
            # Best effort: report the failure and return what was collected.
            print(f" Error fetching page {page}: {e}")
            break

    print(f"\nFound {len(all_parts)} parts in category")
    return all_parts
|
|
|
|
|
|
def standardize_asdmb_part(api: "PartDB", part: dict, dry_run: bool = False) -> Tuple[bool, str, bool]:
    """
    Standardize a single ASDMB crystal part.

    The name is cut at the first "/" and the text before it is kept. The
    description is cut at the first "/" and the text after it is kept. A part
    whose description is empty, or becomes empty after the cut, is flagged
    for a provider update.

    Args:
        api: PartDB client; must provide ``_extract_id`` and ``_patch_merge``.
        part: Part payload. ``name`` / ``description`` are read from the top
            level first, then from ``part["attributes"]``.
        dry_run: If True, only report what would change.

    Returns: (success, message, needs_provider_update)
    """
    part_id = api._extract_id(part)
    if not part_id:
        return (False, "No part ID", False)

    # "attributes" may be absent or null in the payload.
    attributes = part.get("attributes")
    if not isinstance(attributes, dict):
        attributes = {}

    # Get current name and description
    current_name = part.get("name") or attributes.get("name") or ""
    current_desc = part.get("description") or attributes.get("description") or ""

    # Split name at "/" to get new name (first part). A name that starts
    # with "/" would yield an empty name, so it is left untouched instead.
    new_name = current_name
    if "/" in current_name:
        head = current_name.split("/", 1)[0].strip()
        if head:
            new_name = head

    # Split description at "/" to get new description (second part)
    new_description = ""
    needs_provider_update = False
    if "/" in current_desc:
        new_description = current_desc.split("/", 1)[1].strip()
        needs_provider_update = not new_description
    elif not current_desc.strip():
        # No description at all
        needs_provider_update = True
    else:
        # Has description but no "/" - leave as is
        new_description = current_desc

    # Work out which fields actually change; only those are sent to the API.
    payload = {}
    changes = []

    if current_name != new_name:
        payload["name"] = new_name
        changes.append(f"name: '{current_name}' → '{new_name}'")

    if new_description and current_desc != new_description:
        payload["description"] = new_description
        changes.append(f"desc: '{current_desc}' → '{new_description}'")

    if needs_provider_update:
        changes.append("needs provider update for description")

    if not changes:
        return (True, "Already correct", False)

    summary = "; ".join(changes)

    if dry_run:
        return (True, f"Would update: {summary}", needs_provider_update)

    if not payload:
        # Only the provider-update flag is set: there is nothing to PATCH.
        return (True, f"No field changes: {summary}", needs_provider_update)

    # Apply updates
    try:
        r = api._patch_merge(f"/api/parts/{part_id}", payload)
        if r.status_code not in range(200, 300):
            return (False, f"Failed to update: {r.status_code}", needs_provider_update)
        return (True, f"Updated: {summary}", needs_provider_update)
    except Exception as e:
        return (False, f"Update failed: {e}", needs_provider_update)
|
|
|
|
|
|
def run_standardize_asdmb(category_name: str = "Clock - ASDMB", dry_run: bool = False, update_providers: bool = False, progress_callback=None):
    """
    Main function to standardize all ASDMB crystal parts.

    Fetches every part in the category (and its subcategories), standardizes
    each one, prints a summary and, if requested, drives the browser-based
    provider update flow for parts that ended up without a description.

    Args:
        category_name: Name of the category to process (default: "Clock - ASDMB")
        dry_run: If True, don't make any changes
        update_providers: If True, trigger provider updates for parts without descriptions
        progress_callback: Optional callback function(current, total, status_text).
            It is called before each part and once more at the end; a truthy
            return value from a per-part call cancels the remaining parts.
            When a callback is given, the tqdm progress bar is not used.

    Returns:
        None.
    """
    banner = "=" * 70
    print(banner)
    print("ASDMB CRYSTAL STANDARDIZATION")
    print(banner)
    print(f"Category: {category_name}")
    print(f"Mode: {'DRY RUN (no changes)' if dry_run else 'LIVE MODE (will update parts)'}")
    print(f"Provider updates: {'ENABLED' if update_providers else 'DISABLED'}")
    print(banner)

    # Initialize API
    api = PartDB(PARTDB_BASE, PARTDB_TOKEN)

    # Get all parts in category
    print("\nFetching parts from category...")
    parts = get_all_parts_in_category(api, category_name)

    if not parts:
        print("No parts found!")
        return

    total = len(parts)
    print(f"\nProcessing {total} parts...")

    # Track results
    successful = 0
    failed = 0
    skipped = 0
    needs_provider = []

    # Process each part
    iterator = parts if progress_callback else tqdm(parts, desc="Processing parts")

    for idx, part in enumerate(iterator):
        # Check for cancellation
        if progress_callback:
            cancelled = progress_callback(idx, total, f"Processing part {idx+1}/{total}...")
            if cancelled:
                print("\n⚠ Operation cancelled by user")
                break

        part_name = part.get("name") or "Unknown"
        part_id = api._extract_id(part)

        success, message, needs_update = standardize_asdmb_part(api, part, dry_run)

        if success:
            if "Already correct" in message or "skipping" in message:
                skipped += 1
            else:
                successful += 1
                print(f"✓ {part_name}: {message}")

            if needs_update:
                needs_provider.append((part_id, part_name))
        else:
            failed += 1
            print(f"✗ {part_name}: {message}")

    # Final progress update
    if progress_callback:
        progress_callback(total, total, "Complete!")

    # Summary
    print("\n" + banner)
    print("SUMMARY")
    print(banner)
    print(f"Total parts: {total}")
    print(f"Updated: {successful}")
    print(f"Failed: {failed}")
    print(f"Skipped: {skipped}")
    print(f"Need provider update: {len(needs_provider)}")

    if needs_provider and update_providers and not dry_run:
        print("\n" + banner)
        print("TRIGGERING PROVIDER UPDATES")
        print(banner)

        try:
            # Imported lazily so the browser stack is only needed for provider updates.
            from provider.selenium_flow import start_firefox_resilient, ensure_logged_in, run_provider_update_flow
            from config import HEADLESS_PROVIDER

            print("Starting browser...")
            driver = start_firefox_resilient(headless_first=HEADLESS_PROVIDER)

            provider_success = 0
            provider_failed = 0

            # try/finally guarantees the browser is closed even if a step raises.
            try:
                print("Logging in...")
                driver.get(PARTDB_BASE + "/")
                if not ensure_logged_in(driver, PARTDB_BASE, interactive_ok=True, wait_s=120):
                    print("Failed to log in!")
                    return

                controller = driver.current_window_handle

                for part_id, part_name in tqdm(needs_provider, desc="Updating from providers"):
                    print(f"\nUpdating {part_name}...")
                    ok, where = run_provider_update_flow(driver, PARTDB_BASE, "/en/", part_id, controller)

                    if ok:
                        provider_success += 1
                        print(" ✓ Success")
                    else:
                        provider_failed += 1
                        print(f" ✗ Failed at: {where}")
            finally:
                driver.quit()

            print("\n" + banner)
            print("PROVIDER UPDATE SUMMARY")
            print(banner)
            print(f"Successful: {provider_success}")
            print(f"Failed: {provider_failed}")

        except Exception as e:
            print(f"Error during provider updates: {e}")
            import traceback
            traceback.print_exc()

    elif needs_provider:
        # Provider updates were not run (disabled, or this was a dry run): list the candidates.
        print("\nParts needing provider update:")
        for part_id, part_name in needs_provider[:10]:  # Show first 10
            print(f" - {part_name} (ID: {part_id})")
        if len(needs_provider) > 10:
            print(f" ... and {len(needs_provider) - 10} more")
        if update_providers:
            print("\nDry run: provider updates were not triggered")
        else:
            print("\nRe-run with update_providers=True to trigger provider updates")

    print("\n" + banner)
|
|
|
|
|
|
if __name__ == "__main__":
    import sys

    # Command-line switches are detected by plain membership in argv:
    #   --dry-run / -d            report changes without applying them
    #   --update-providers / -u   run the provider update flow afterwards
    cli_args = sys.argv
    dry_run = any(flag in cli_args for flag in ("--dry-run", "-d"))
    update_providers = any(flag in cli_args for flag in ("--update-providers", "-u"))

    run_standardize_asdmb(dry_run=dry_run, update_providers=update_providers)
|