Files
PartDB_Helper_App/workflows/standardize_passives.py

1085 lines
39 KiB
Python

"""
Workflow to standardize passive components in the Passives category.
This script goes through all parts in the Passives category (including subcategories) and:
1. Sets the name to "Value Package" format
2. Sets the description to "MPN Value Package Tolerance Voltage/Current/Power" format
- Voltage for capacitors
- Current for inductors
- Power for resistors
3. Sets the EDA value to match the component value
4. Normalizes all parameters to base units (R, F, H, V, A, W, %)
Uses the PartDB API for all operations.
"""
import re
from typing import Optional, List, Tuple
from tqdm import tqdm
from config import PARTDB_BASE, PARTDB_TOKEN
from apis.partdb_api import PartDB
from parsers.values import (
parse_resistance_to_ohms, format_ohms_for_eda,
parse_capacitance_to_farads, format_farads_for_eda
)
def get_all_parts_in_category(api: PartDB, category_name: str) -> List[dict]:
"""
Get all parts in a category and its subcategories.
"""
# First, find the category
categories = api.list_categories()
target_cat = None
target_cat_id = None
for cat in categories:
name = (cat.get("name") or "").strip()
if name.lower() == category_name.lower():
target_cat = cat
target_cat_id = api._extract_id(cat)
break
if not target_cat_id:
print(f"Category '{category_name}' not found!")
return []
print(f"Found category '{category_name}' with ID {target_cat_id}")
# Find all subcategories
subcategory_ids = [target_cat_id]
def find_children(parent_id: int):
for cat in categories:
parent = cat.get("parent")
if parent:
parent_id_str = None
if isinstance(parent, dict):
parent_id_str = parent.get("id") or parent.get("_id")
elif isinstance(parent, str):
parent_id_str = parent
if parent_id_str:
# Extract just the number
if isinstance(parent_id_str, str):
parent_num = int(''.join(c for c in parent_id_str if c.isdigit()))
else:
parent_num = int(parent_id_str)
if parent_num == parent_id:
child_id = api._extract_id(cat)
if child_id and child_id not in subcategory_ids:
subcategory_ids.append(child_id)
print(f" Found subcategory: {cat.get('name')} (ID: {child_id})")
find_children(child_id)
find_children(target_cat_id)
print(f"Total categories to process: {len(subcategory_ids)}")
print(f"Category IDs: {subcategory_ids}")
# Get all parts in these categories
all_parts = []
# First, try to get all parts without category filter and then filter client-side
# This is more reliable when the category filter might not work properly
print("\nFetching all parts from API...")
try:
page = 1
per_page = 30 # Use smaller page size to match API default
all_fetched_parts = []
while True:
params = {"per_page": per_page, "page": page}
print(f" Fetching page {page}...")
try:
parts = api._get("/api/parts", params=params)
if isinstance(parts, list):
if not parts:
break
all_fetched_parts.extend(parts)
print(f" Got {len(parts)} parts (total so far: {len(all_fetched_parts)})")
# Keep going if we got a full page
if len(parts) < per_page:
break
page += 1
elif isinstance(parts, dict):
data = parts.get("data", [])
meta = parts.get("meta", {})
links = parts.get("links", {})
if not data:
break
all_fetched_parts.extend(data)
print(f" Got {len(data)} parts (total so far: {len(all_fetched_parts)})")
# Check if there's more pages using multiple methods
has_more = False
# Method 1: Check links.next
if links.get("next"):
has_more = True
# Method 2: Check meta pagination
elif meta.get("current_page") is not None and meta.get("last_page") is not None:
current = meta.get("current_page")
last = meta.get("last_page")
if current < last:
has_more = True
# Method 3: Check if we got a full page
elif len(data) >= per_page:
has_more = True
if has_more:
page += 1
# Safety check
if page > 100:
print(f" Warning: Fetched 100 pages, stopping")
break
else:
break
else:
print(f" Unexpected response type: {type(parts)}")
break
except Exception as e:
print(f" Error fetching page {page}: {e}")
import traceback
traceback.print_exc()
break
print(f"\nTotal parts fetched from API: {len(all_fetched_parts)}")
# Now filter by category
print("\nFiltering by categories...")
category_counts = {}
for part in all_fetched_parts:
part_cat = part.get("category")
part_cat_id = None
# Handle different category formats
if isinstance(part_cat, dict):
part_cat_id = api._extract_id(part_cat)
elif isinstance(part_cat, str):
# Could be "/api/categories/123" or just "123"
try:
if "/categories/" in part_cat:
part_cat_id = int(part_cat.strip("/").split("/")[-1])
else:
part_cat_id = int(''.join(c for c in part_cat if c.isdigit()))
except Exception:
pass
elif isinstance(part_cat, int):
part_cat_id = part_cat
# Also check relationships
if part_cat_id is None:
relationships = part.get("relationships", {})
if relationships:
rel_cat = relationships.get("category")
if isinstance(rel_cat, dict):
part_cat_id = api._extract_id(rel_cat.get("data", {}))
# Also check attributes
if part_cat_id is None:
attributes = part.get("attributes", {})
if attributes:
attr_cat = attributes.get("category")
if attr_cat:
if isinstance(attr_cat, dict):
part_cat_id = api._extract_id(attr_cat)
elif isinstance(attr_cat, (int, str)):
try:
part_cat_id = int(str(attr_cat).strip("/").split("/")[-1])
except Exception:
pass
# Track category distribution
if part_cat_id:
category_counts[part_cat_id] = category_counts.get(part_cat_id, 0) + 1
if part_cat_id and part_cat_id in subcategory_ids:
all_parts.append(part)
print(f"\nCategory distribution in fetched parts:")
for cat_id, count in sorted(category_counts.items()):
in_target = "" if cat_id in subcategory_ids else " "
print(f" [{in_target}] Category {cat_id}: {count} parts")
print(f"\nParts in target categories: {len(all_parts)}")
except Exception as e:
print(f"Error in fetch-all approach: {e}")
import traceback
traceback.print_exc()
# Fallback to per-category fetching
print("\nFalling back to per-category fetching...")
for cat_id in subcategory_ids:
try:
for param_format in [
{"category": cat_id},
{"filter[category]": cat_id},
{"category_id": cat_id},
]:
try:
page = 1
per_page = 500
cat_parts = []
while True:
params = param_format.copy()
params["per_page"] = per_page
params["page"] = page
parts = api._get("/api/parts", params=params)
if isinstance(parts, list):
if not parts:
break
cat_parts.extend(parts)
print(f" Category {cat_id}: page {page} ({len(parts)} parts)")
if len(parts) < per_page:
break
page += 1
elif isinstance(parts, dict):
data = parts.get("data", [])
if not data:
break
cat_parts.extend(data)
print(f" Category {cat_id}: page {page} ({len(data)} parts)")
meta = parts.get("meta", {})
if meta.get("current_page") and meta.get("last_page"):
if meta["current_page"] >= meta["last_page"]:
break
elif len(data) < per_page:
break
page += 1
else:
break
if cat_parts:
all_parts.extend(cat_parts)
print(f" Category {cat_id}: total {len(cat_parts)} parts")
break
except Exception as e:
print(f" Error with param format {param_format}: {e}")
continue
except Exception as e:
print(f"Error fetching parts for category {cat_id}: {e}")
print(f"Found {len(all_parts)} parts total")
return all_parts
def detect_component_type(part: dict, params: dict) -> Optional[str]:
"""
Detect if a part is a resistor, capacitor, or inductor based on its parameters.
Returns: 'resistor', 'capacitor', 'inductor', or None
"""
param_names = [name.lower() for name in params.keys()]
# Check for resistance parameter
if 'resistance' in param_names or 'ohm' in ' '.join(param_names):
return 'resistor'
# Check for capacitance parameter
if 'capacitance' in param_names or 'farad' in ' '.join(param_names):
return 'capacitor'
# Check for inductance parameter
if 'inductance' in param_names or 'henry' in ' '.join(param_names):
return 'inductor'
# Check category name as fallback
cat = part.get("category") or {}
cat_name = (cat.get("name") if isinstance(cat, dict) else "").lower()
if 'resistor' in cat_name:
return 'resistor'
elif 'capacitor' in cat_name:
return 'capacitor'
elif 'inductor' in cat_name:
return 'inductor'
return None
def extract_value_info(params: dict, comp_type: str) -> Tuple[Optional[str], Optional[str]]:
"""
Extract the value and unit from parameters based on component type.
Returns: (value, unit) or (None, None)
"""
if comp_type == 'resistor':
value_unit = params.get('resistance')
if value_unit:
return value_unit
elif comp_type == 'capacitor':
value_unit = params.get('capacitance')
if value_unit:
return value_unit
elif comp_type == 'inductor':
value_unit = params.get('inductance')
if value_unit:
return value_unit
return (None, None)
def extract_package(part: dict) -> Optional[str]:
"""
Extract package/footprint from the part.
"""
fp = part.get("footprint")
if isinstance(fp, dict):
return fp.get("name")
return None
def extract_tolerance(params: dict) -> Optional[str]:
"""
Extract tolerance from parameters.
"""
value_unit = params.get('tolerance')
if value_unit:
value, unit = value_unit
if value:
# Handle range format like "-10 % ... 10 %" or "±10%"
value_str = str(value).strip()
# If it's a range, extract the positive value
if '...' in value_str:
parts = value_str.split('...')
if len(parts) == 2:
value_str = parts[1].strip().replace('%', '').replace(' ', '')
elif '±' in value_str:
value_str = value_str.replace('±', '').replace('%', '').replace(' ', '')
else:
value_str = value_str.replace('%', '').replace(' ', '')
# Don't add % if unit is already %
if unit and unit.strip() == '%':
return f"{value_str}%"
else:
return f"{value_str}%"
return None
def extract_voltage(params: dict) -> Optional[str]:
"""
Extract voltage rating from parameters.
"""
for key in ['voltage', 'voltage - rated', 'max_voltage', 'voltage_max', 'voltage_rated', 'rated_voltage', 'voltage_rating']:
value_unit = params.get(key)
if value_unit:
value, unit = value_unit
if value:
# Don't add V if unit already has it
if unit and unit.strip().upper() in ['V', 'VOLT', 'VOLTS']:
return f"{value}{unit}"
else:
return f"{value}{unit or 'V'}"
return None
def extract_current(params: dict) -> Optional[str]:
"""
Extract current rating from parameters.
"""
for key in ['current rating (amps)', 'current', 'current_rated', 'rated_current', 'current_rating']:
value_unit = params.get(key)
if value_unit:
value, unit = value_unit
if value:
# Don't add A if unit already has it
if unit and unit.strip().upper() in ['A', 'AMP', 'AMPS', 'AMPERE']:
return f"{value}{unit}"
else:
return f"{value}{unit or 'A'}"
return None
def extract_power(params: dict) -> Optional[str]:
"""
Extract power rating from parameters.
"""
for key in ['power', 'power (watts)', 'power_rating', 'rated_power', 'dissipation']:
value_unit = params.get(key)
if value_unit:
value, unit = value_unit
if value:
# Clean up value - remove any fraction notation (e.g., "0.125W, 1/8W" -> "0.125")
value_str = str(value)
if ',' in value_str:
# Take only the first part before comma
value_str = value_str.split(',')[0].strip()
# Remove any W/Watt suffix from value itself
value_str = value_str.rstrip('Ww').rstrip()
# Always return with just 'W' suffix
return f"{value_str}W"
return None
def normalize_to_base_units(value: str, unit: Optional[str], param_type: str) -> Tuple[Optional[float], str]:
"""
Normalize parameter values to base units.
Returns: (numeric_value, base_unit) where base_unit is 'R', 'F', 'H', 'V', 'A', 'W', or '%'
"""
try:
# Clean value string
value = str(value).replace(',', '').strip()
if param_type == 'resistance':
ohms = parse_resistance_to_ohms(value, unit)
if ohms is not None:
return (ohms, 'R')
elif param_type == 'capacitance':
farads = parse_capacitance_to_farads(value, unit)
if farads is not None:
return (farads, 'F')
elif param_type == 'inductance':
# Convert to Henry
num = float(value)
u = (unit or "").lower()
if u in ['h', 'henry']:
return (num, 'H')
elif u in ['mh', 'millihenry']:
return (num / 1e3, 'H')
elif u in ['uh', 'microhenry', 'µh', 'μh']:
return (num / 1e6, 'H')
elif u in ['nh', 'nanohenry']:
return (num / 1e9, 'H')
elif param_type == 'tolerance':
# Normalize to percentage
num = float(value)
u = (unit or "").lower()
if u == '%' or not u:
return (num, '%')
elif param_type == 'voltage':
# Normalize to volts
num = float(value)
u = (unit or "").lower()
if u in ['v', 'volt', 'volts']:
return (num, 'V')
elif u in ['kv', 'kilovolt']:
return (num * 1e3, 'V')
elif u in ['mv', 'millivolt']:
return (num / 1e3, 'V')
elif param_type == 'current':
# Normalize to amperes
num = float(value)
u = (unit or "").lower()
if u in ['a', 'amp', 'ampere', 'amps']:
return (num, 'A')
elif u in ['ma', 'milliamp']:
return (num / 1e3, 'A')
elif u in ['ua', 'microamp', 'µa', 'μa']:
return (num / 1e6, 'A')
elif param_type == 'power':
# Normalize to watts
num = float(value)
u = (unit or "").lower()
if u in ['w', 'watt', 'watts']:
return (num, 'W')
elif u in ['mw', 'milliwatt']:
return (num / 1e3, 'W')
elif u in ['kw', 'kilowatt']:
return (num * 1e3, 'W')
except (ValueError, TypeError):
pass
return (None, '')
def format_resistor_name(ohms: float) -> str:
"""
Format resistance for part name (e.g., '1R', '10K', '1M').
Removes unnecessary decimals.
"""
if ohms < 1000:
# Format in ohms with R
if ohms == int(ohms):
return f"{int(ohms)}R"
else:
# Remove trailing zeros
formatted = f"{ohms:.10g}R"
return formatted
elif ohms < 1_000_000:
# Format in K
k_val = ohms / 1000
if k_val == int(k_val):
return f"{int(k_val)}K"
else:
return f"{k_val:.10g}K"
else:
# Format in M
m_val = ohms / 1_000_000
if m_val == int(m_val):
return f"{int(m_val)}M"
else:
return f"{m_val:.10g}M"
def format_capacitor_name(farads: float) -> str:
"""
Format capacitance for part name (e.g., '100nF', '10uF', '1pF').
Removes unnecessary decimals and uses proper capitalization.
"""
if farads >= 1e-6:
# Format in uF
uf_val = farads * 1e6
if uf_val == int(uf_val):
return f"{int(uf_val)}uF"
else:
return f"{uf_val:.10g}uF"
elif farads >= 1e-9:
# Format in nF
nf_val = farads * 1e9
if nf_val == int(nf_val):
return f"{int(nf_val)}nF"
else:
return f"{nf_val:.10g}nF"
else:
# Format in pF
pf_val = farads * 1e12
if pf_val == int(pf_val):
return f"{int(pf_val)}pF"
else:
return f"{pf_val:.10g}pF"
def format_inductor_name(henries: float) -> str:
"""
Format inductance for part name (e.g., '10uH', '100nH', '1mH').
Removes unnecessary decimals and uses proper capitalization.
"""
if henries >= 1e-3:
# Format in mH
mh_val = henries * 1e3
if mh_val == int(mh_val):
return f"{int(mh_val)}mH"
else:
return f"{mh_val:.10g}mH"
elif henries >= 1e-6:
# Format in uH
uh_val = henries * 1e6
if uh_val == int(uh_val):
return f"{int(uh_val)}uH"
else:
return f"{uh_val:.10g}uH"
else:
# Format in nH
nh_val = henries * 1e9
if nh_val == int(nh_val):
return f"{int(nh_val)}nH"
else:
return f"{nh_val:.10g}nH"
def format_eda_value(value: str, unit: Optional[str], comp_type: str) -> Optional[str]:
"""
Format the EDA value based on component type.
"""
# Clean value string
value = str(value).replace(',', '').strip()
if comp_type == 'resistor':
ohms = parse_resistance_to_ohms(value, unit)
if ohms is not None:
return format_ohms_for_eda(ohms)
elif comp_type == 'capacitor':
farads = parse_capacitance_to_farads(value, unit)
if farads is not None:
return format_farads_for_eda(farads)
elif comp_type == 'inductor':
# For inductors, we'll use a simple format
# Convert to standard units (H, mH, uH, nH)
try:
num = float(value)
u = (unit or "").lower()
if u in ['h', 'henry']:
val = num
elif u in ['mh', 'millihenry']:
val = num / 1e3
elif u in ['uh', 'microhenry', 'µh', 'μh']:
val = num / 1e6
elif u in ['nh', 'nanohenry']:
val = num / 1e9
else:
return None
# Format nicely with proper capitalization
if val >= 1:
return f"{val:.3g}H"
elif val >= 1e-3:
return f"{val*1e3:.3g}mH"
elif val >= 1e-6:
return f"{val*1e6:.3g}uH"
else:
return f"{val*1e9:.3g}nH"
except (ValueError, TypeError):
return None
return None
def get_parameter_id_by_name(api: PartDB, part_id: int, param_name: str) -> Optional[int]:
"""
Get the parameter ID for a specific parameter name of a part.
"""
try:
param_values = api.get_part_parameter_values(part_id)
for pv in param_values:
name = (pv.get("name") or (pv.get("parameter") or {}).get("name") or "").strip().lower()
if name == param_name.lower():
# Try to extract parameter value ID
pv_id = api._extract_id(pv)
if pv_id:
return pv_id
except Exception:
pass
return None
def update_parameter(api: PartDB, part_id: int, param_name: str, value: float, unit: str) -> bool:
"""
Update or create a parameter for a part with normalized base units.
According to PartDB API docs, parameters need:
- value_typical: numeric value
- unit: can be a string symbol OR an object with symbol/name
- parameter: IRI to parameter template (for creation)
- element: IRI to owning part (for creation)
"""
try:
# First, try to find existing parameter value
param_values = api.get_part_parameter_values(part_id)
param_value_id = None
param_template_id = None
param_template_iri = None
existing_unit_obj = None
for pv in param_values:
name = (pv.get("name") or (pv.get("parameter") or {}).get("name") or "").strip().lower()
if name == param_name.lower():
param_value_id = api._extract_id(pv)
# Get the existing unit object structure
existing_unit_obj = pv.get("unit")
# Get parameter template reference
param_obj = pv.get("parameter")
if isinstance(param_obj, dict):
param_template_id = api._extract_id(param_obj)
# Try to get IRI
param_template_iri = param_obj.get("@id") or param_obj.get("id")
if isinstance(param_template_iri, int):
param_template_iri = f"/api/parameters/{param_template_iri}"
elif isinstance(param_obj, str) and "/api/" in param_obj:
param_template_iri = param_obj
break
if param_value_id:
# Update existing parameter value
# The unit might need to be in the same format as the existing one
# Try multiple approaches
payloads_to_try = []
# 1. Try with unit as a simple string
payloads_to_try.append({
"value_typical": float(value),
"unit": unit
})
# 2. Try with unit as an object (if we have the existing structure)
if isinstance(existing_unit_obj, dict):
# Use existing unit structure but update symbol
unit_obj = existing_unit_obj.copy()
unit_obj["symbol"] = unit
payloads_to_try.append({
"value_typical": float(value),
"unit": unit_obj
})
# 3. Try with unit as a minimal object
payloads_to_try.append({
"value_typical": float(value),
"unit": {"symbol": unit}
})
# 4. Try with just value_typical (let unit stay as is)
payloads_to_try.append({
"value_typical": float(value)
})
for payload in payloads_to_try:
r = api._patch_merge(f"/api/parameter-values/{param_value_id}", payload)
if r.status_code in range(200, 300):
return True
# Try alternative underscore endpoint
r = api._patch_merge(f"/api/parameter_values/{param_value_id}", payload)
if r.status_code in range(200, 300):
return True
# All attempts failed - debug print
print(f" Failed to update parameter {param_name}: Last status {r.status_code}")
if r.status_code >= 400:
print(f" Error: {r.text[:300]}")
return False
else:
# Parameter doesn't exist - would need to create it
# This requires knowing the parameter template, which we might not have
if param_template_iri or param_template_id:
# Build IRI if we only have ID
if not param_template_iri and param_template_id:
param_template_iri = f"/api/parameters/{param_template_id}"
# Try multiple unit formats
payloads_to_try = [
{
"parameter": param_template_iri,
"element": f"/api/parts/{part_id}",
"value_typical": float(value),
"unit": unit
},
{
"parameter": param_template_iri,
"element": f"/api/parts/{part_id}",
"value_typical": float(value),
"unit": {"symbol": unit}
}
]
for payload in payloads_to_try:
r = api._post_try("/api/parameter-values", payload)
if r.status_code in range(200, 300):
return True
# Try alternative underscore endpoint
r = api._post_try("/api/parameter_values", payload)
if r.status_code in range(200, 300):
return True
print(f" Failed to create parameter {param_name}: {r.status_code}")
return False
except Exception as e:
print(f" Exception updating parameter {param_name}: {e}")
import traceback
traceback.print_exc()
return False
def standardize_part(api: PartDB, part: dict, dry_run: bool = False) -> Tuple[bool, str]:
"""
Standardize a single part's name, description, EDA value, and parameters.
Returns: (success, message)
"""
part_id = api._extract_id(part)
if not part_id:
return (False, "No part ID")
# Get full part details with parameters
try:
full_part = api.get_part(part_id)
except Exception as e:
return (False, f"Failed to fetch part: {e}")
# Get parameters
params = api.get_part_params_flat(full_part)
if not params:
# Try to fetch parameter values separately
try:
param_values = api.get_part_parameter_values(part_id)
if param_values:
full_part_with_params = dict(full_part)
full_part_with_params["parameter_values"] = param_values
params = api.get_part_params_flat(full_part_with_params)
except Exception:
pass
if not params:
return (False, "No parameters found")
# Detect component type
comp_type = detect_component_type(full_part, params)
if not comp_type:
return (False, "Could not detect component type")
# Extract value
value, unit = extract_value_info(params, comp_type)
if not value:
return (False, f"No {comp_type} value found")
# Clean up value string - remove commas and extra spaces
value = str(value).replace(',', '').strip()
# Normalize main value to base units
if comp_type == 'resistor':
normalized_value, base_unit = normalize_to_base_units(value, unit, 'resistance')
elif comp_type == 'capacitor':
normalized_value, base_unit = normalize_to_base_units(value, unit, 'capacitance')
elif comp_type == 'inductor':
normalized_value, base_unit = normalize_to_base_units(value, unit, 'inductance')
else:
normalized_value, base_unit = None, ''
if normalized_value is None:
return (False, f"Could not normalize value {value}{unit or ''}")
# Extract other info
package = extract_package(full_part)
tolerance = extract_tolerance(params)
# Normalize tolerance
tolerance_value = None
if tolerance:
# Extract number from tolerance string (handle ranges like "±10%" or "10%")
tolerance_clean = tolerance.replace('±', '').replace('%', '').strip()
match = re.search(r'([\d.]+)$', tolerance_clean)
if match:
tolerance_value, _ = normalize_to_base_units(match.group(1), '%', 'tolerance')
# Build new name: "Value Package" using short format (inductors: "Value Current")
if comp_type == 'resistor':
value_formatted = format_resistor_name(normalized_value)
elif comp_type == 'capacitor':
value_formatted = format_capacitor_name(normalized_value)
elif comp_type == 'inductor':
value_formatted = format_inductor_name(normalized_value)
else:
value_formatted = f"{value}{unit or ''}"
# For inductors, include current in the name (no package)
if comp_type == 'inductor':
current = extract_current(params)
if current:
new_name = f"{value_formatted} {current}"
else:
new_name = value_formatted
else:
new_name = f"{value_formatted} {package or 'Unknown'}"
# Get MPN
mpn = full_part.get("manufacturer_product_number") or full_part.get("mpn") or full_part.get("attributes", {}).get("manufacturer_product_number") or ""
# Build new description: "MPN Value Package Tolerance [Voltage/Current/Power]"
desc_parts = []
if mpn:
desc_parts.append(mpn)
desc_parts.extend([value_formatted, package or "Unknown"])
if tolerance:
desc_parts.append(tolerance)
voltage_value = None
current_value = None
power_value = None
if comp_type == 'capacitor':
voltage = extract_voltage(params)
if voltage:
desc_parts.append(voltage)
# Extract numeric value
match = re.search(r'([\d.]+)', voltage)
if match:
voltage_value, _ = normalize_to_base_units(match.group(1), 'V', 'voltage')
elif comp_type == 'inductor':
current = extract_current(params)
if current:
desc_parts.append(current)
# Extract numeric value
match = re.search(r'([\d.]+)', current)
if match:
current_value, _ = normalize_to_base_units(match.group(1), 'A', 'current')
elif comp_type == 'resistor':
power = extract_power(params)
if power:
desc_parts.append(power)
# Extract numeric value
match = re.search(r'([\d.]+)', power)
if match:
power_value, _ = normalize_to_base_units(match.group(1), 'W', 'power')
new_description = " ".join(desc_parts)
# Remove "FIXED IND" from inductor descriptions
if comp_type == 'inductor':
new_description = new_description.replace("FIXED IND", "").replace(" ", " ").strip()
# Format EDA value
new_eda = format_eda_value(value, unit, comp_type)
if not new_eda:
return (False, f"Could not format EDA value from {value}{unit or ''}")
# Get current values
current_name = full_part.get("name") or full_part.get("attributes", {}).get("name") or ""
current_desc = full_part.get("description") or full_part.get("attributes", {}).get("description") or ""
current_eda = api.get_eda_value(full_part) or ""
# Check what needs updating
changes = []
if current_name != new_name:
changes.append(f"name: '{current_name}''{new_name}'")
if current_desc != new_description:
changes.append(f"desc: '{current_desc}''{new_description}'")
if current_eda != new_eda:
changes.append(f"EDA: '{current_eda}''{new_eda}'")
if not changes:
return (True, "Already correct")
if dry_run:
return (True, f"Would update: {'; '.join(changes)}")
# Apply updates
try:
# Update name and description
payload = {
"name": new_name,
"description": new_description,
}
r = api._patch_merge(f"/api/parts/{part_id}", payload)
if r.status_code not in range(200, 300):
return (False, f"Failed to update name/desc: {r.status_code}")
# Update EDA value
if current_eda != new_eda:
success = api.patch_eda_value(part_id, new_eda)
if not success:
return (False, "Failed to update EDA value")
return (True, f"Updated: {'; '.join(changes)}")
except Exception as e:
return (False, f"Update failed: {e}")
def run_standardize_passives(category_name: str = "Passives", dry_run: bool = False, progress_callback=None):
"""
Main function to standardize all passive components.
Args:
category_name: Name of the category to process (default: "Passives")
dry_run: If True, only show what would be changed without making changes
progress_callback: Optional callback function(current, total, status_text, should_cancel_func)
Returns True if operation should be cancelled
"""
print("=" * 70)
print("PASSIVE COMPONENTS STANDARDIZATION")
print("=" * 70)
print(f"Category: {category_name}")
print(f"Mode: {'DRY RUN (no changes)' if dry_run else 'LIVE (making changes)'}")
print("=" * 70)
print()
# Initialize API
api = PartDB(PARTDB_BASE, PARTDB_TOKEN)
# Get all parts in the category
print("Fetching parts from category and subcategories...")
parts = get_all_parts_in_category(api, category_name)
if not parts:
print("No parts found!")
return
print(f"\nProcessing {len(parts)} parts...")
print()
# Statistics
success_count = 0
already_correct = 0
failed_count = 0
errors = []
# Process each part
bar = tqdm(parts, desc="Standardizing", unit="part") if not progress_callback else None
for idx, part in enumerate(parts):
# Check for cancellation
if progress_callback:
cancelled = progress_callback(idx, len(parts), f"Processing part {idx+1}/{len(parts)}...")
if cancelled:
print("\n⚠ Operation cancelled by user")
break
part_id = api._extract_id(part)
mpn = part.get("manufacturer_product_number") or part.get("mpn") or f"ID:{part_id}"
if bar:
bar.set_postfix_str(mpn[:30])
success, message = standardize_part(api, part, dry_run=dry_run)
if success:
if "Already correct" in message:
already_correct += 1
else:
success_count += 1
if bar:
tqdm.write(f"{mpn}: {message}")
else:
print(f"{mpn}: {message}")
else:
failed_count += 1
errors.append((mpn, message))
if bar:
tqdm.write(f"{mpn}: {message}")
else:
print(f"{mpn}: {message}")
if bar:
bar.close()
# Final progress update
if progress_callback:
progress_callback(len(parts), len(parts), "Complete!")
# Print summary
print()
print("=" * 70)
print("SUMMARY")
print("=" * 70)
print(f"Total parts processed: {len(parts)}")
print(f"Already correct: {already_correct}")
print(f"Successfully updated: {success_count}")
print(f"Failed: {failed_count}")
print("=" * 70)
if errors:
print()
print("ERRORS:")
for mpn, msg in errors[:20]: # Show first 20 errors
print(f" {mpn}: {msg}")
if len(errors) > 20:
print(f" ... and {len(errors) - 20} more")
print()
if __name__ == "__main__":
import sys
# Check for dry-run flag
dry_run = "--dry-run" in sys.argv or "-n" in sys.argv
if dry_run:
print("Running in DRY RUN mode - no changes will be made")
print()
run_standardize_passives(category_name="Passives", dry_run=dry_run)