421 lines
15 KiB
Python
421 lines
15 KiB
Python
"""
|
|
Import parts from Digi-Key CSV exports and update from providers.
|
|
|
|
This workflow:
|
|
1. Scans the "import CSVs" folder for CSV files
|
|
2. Reads Digi-Key part numbers and other info from each CSV
|
|
3. Creates parts in PartDB if they don't exist
|
|
4. Triggers provider updates to fetch full information
|
|
5. Sets EDA values based on part type
|
|
"""
|
|
|
|
import os
|
|
import csv
|
|
import re
|
|
from pathlib import Path
|
|
from typing import List, Dict, Tuple
|
|
from tqdm import tqdm
|
|
|
|
from config import PARTDB_BASE, PARTDB_TOKEN, HEADLESS_PROVIDER
|
|
from apis.partdb_api import PartDB
|
|
from provider.selenium_flow import (
|
|
start_firefox_resilient, ensure_logged_in, run_provider_update_flow
|
|
)
|
|
from parsers.values import (
|
|
parse_resistance_to_ohms, format_ohms_for_eda,
|
|
parse_capacitance_to_farads, format_farads_for_eda
|
|
)
|
|
|
|
|
|
def find_csv_files(folder_path: str = "import CSVs") -> List[Path]:
    """
    Find all CSV files in the import folder.

    If the folder does not exist it is created (so there is somewhere to
    drop the exports) and an empty list is returned.

    Args:
        folder_path: Directory to scan. Sub-directories are not searched.

    Returns:
        Regular files whose name ends in ``.csv`` (any letter case), sorted
        by file name so the processing order is the same on every run.
    """
    folder = Path(folder_path)
    if not folder.exists():
        print(f"Creating folder: {folder}")
        folder.mkdir(parents=True, exist_ok=True)
        return []

    # The path exists but is not a directory: there is nothing to scan.
    if not folder.is_dir():
        return []

    # A plain glob("*.csv") yields entries in arbitrary filesystem order, can
    # miss "EXPORT.CSV" on case-sensitive filesystems and also returns
    # directories, so filter and sort explicitly.
    return sorted(
        (
            entry for entry in folder.iterdir()
            if entry.is_file() and entry.name.lower().endswith(".csv")
        ),
        key=lambda entry: entry.name,
    )
|
|
|
|
|
|
def _first_nonblank(row: Dict[str, str], *keys: str) -> str:
    """Return the first stripped, non-empty string stored under *keys*, else ''."""
    for key in keys:
        value = row.get(key)
        # DictReader uses None for missing cells and a list for surplus
        # cells, so only genuine strings are considered.
        if isinstance(value, str):
            value = value.strip()
            if value:
                return value
    return ''


def parse_digikey_csv(csv_path: Path) -> List[Dict[str, str]]:
    """
    Parse a Digi-Key CSV export file.

    Column names are matched case-insensitively after stripping whitespace.
    For each field the first non-blank value among these columns is used:

    - dkpn: "DK Part #", "Digi-Key Part Number", "DigiKey Part Number",
      "Part Number" (if several numbers are comma-separated in one cell,
      only the first is kept)
    - mpn: "Mfr Part #", "Manufacturer Part Number", "Mfr Part Number", "MPN"
    - manufacturer: "Mfr", "Manufacturer"
    - description: "Description", "Product Description"

    Rows without a manufacturer part number (including blank rows) are
    skipped.

    Args:
        csv_path: Path of the CSV file; read as UTF-8 with an optional BOM.

    Returns:
        A list of dicts with the keys 'dkpn', 'mpn', 'manufacturer' and
        'description'. Missing values are empty strings.
    """
    parts = []

    # newline='' lets the csv module handle line endings itself, which is
    # required for quoted cells that contain line breaks.
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        reader = csv.DictReader(f)

        # Normalize column names (strip whitespace, lowercase)
        if reader.fieldnames:
            reader.fieldnames = [(name or '').strip().lower() for name in reader.fieldnames]

        for row in reader:
            mpn = _first_nonblank(
                row,
                'mfr part #',
                'manufacturer part number',
                'mfr part number',
                'mpn',
            )
            # Skip if no MPN (this also covers completely empty rows)
            if not mpn:
                continue

            dkpn = _first_nonblank(
                row,
                'dk part #',
                'digi-key part number',
                'digikey part number',
                'part number',
            )
            # Handle multiple DK part numbers separated by commas
            if ',' in dkpn:
                dkpn = dkpn.split(',')[0].strip()

            parts.append({
                'dkpn': dkpn,
                'mpn': mpn,
                'manufacturer': _first_nonblank(row, 'mfr', 'manufacturer'),
                'description': _first_nonblank(row, 'description', 'product description'),
            })

    return parts
|
|
|
|
|
|
def create_part_if_not_exists(api: PartDB, part_info: Dict[str, str]) -> Tuple[bool, int, str]:
    """
    Create a part in PartDB if it doesn't already exist.

    Only a minimal part is created; the provider update is expected to fill
    in the remaining details afterwards.

    Args:
        api: PartDB API instance
        part_info: Part dictionary; 'mpn' is required, while 'dkpn',
            'manufacturer' and 'description' are optional.

    Returns: (created, part_id, message)
    - created: True if part was created, False if it already existed or
      the lookup/creation failed
    - part_id: The part ID (existing or newly created), 0 on failure
    - message: Status message ("Created", "Already exists" or "Failed: ...")
    """
    mpn = part_info['mpn']
    dkpn = part_info.get('dkpn')
    # The CSV parser stores '' for a missing manufacturer, so a plain
    # .get() default would never apply; fall back on any blank value.
    manufacturer_name = (part_info.get('manufacturer') or '').strip() or 'Unknown'

    try:
        # Check if part already exists. The lookup is inside the try block so
        # that one failed request is reported for this part instead of
        # aborting the whole import.
        existing_id = api.find_part_exact(dkpn=dkpn, mpn=mpn)
        if existing_id:
            return (False, existing_id, "Already exists")

        # Get or create manufacturer
        manufacturer_id = api.ensure_manufacturer(manufacturer_name)

        # Use a default category - provider update will suggest better one
        # You can change this to a "To Be Categorized" category ID
        default_category_id = 1  # Change this to your default category

        part_id = api.create_part(
            category_id=default_category_id,
            manufacturer_id=manufacturer_id,
            name=mpn,  # Will be updated by provider
            mpn=mpn,
            description=part_info.get('description') or '',
            footprint_id=None,
            product_url=f"https://www.digikey.com/product-detail/en/-/{dkpn}" if dkpn else None
        )
    except Exception as e:
        return (False, 0, f"Failed: {str(e)}")

    return (True, part_id, "Created")
|
|
|
|
|
|
# Scale factors for the prefixes accepted in front of "H". Matching is done on
# upper-cased text, so "M" always means milli; micro is normalised to "U".
_INDUCTANCE_PREFIX_SCALE = {'': 1.0, 'K': 1e3, 'M': 1e-3, 'U': 1e-6, 'N': 1e-9, 'P': 1e-12}
_INDUCTANCE_PATTERN = re.compile(r'(\d+\.?\d*)\s*([KMUNP])?H')
# Micro sign, Greek small mu, and Greek capital Mu (what str.upper() turns
# the first two into).
_MICRO_SIGNS = ('\u00b5', '\u03bc', '\u039c')


def _inductance_eda_value(name):
    """Return an EDA string such as "10.00µH" for the first inductance in *name*, or None."""
    text = name
    for sign in _MICRO_SIGNS:
        # Normalise before upper-casing/matching: an upper-cased micro sign
        # no longer compares equal to 'µ' and would be read as plain henries.
        text = text.replace(sign, 'U')

    match = _INDUCTANCE_PATTERN.search(text.upper())
    if not match:
        return None

    # Convert to henries
    henries = float(match.group(1)) * _INDUCTANCE_PREFIX_SCALE[match.group(2) or '']

    # Format for EDA
    if henries >= 1:
        return f"{henries:.2f}H"
    if henries >= 1e-3:
        return f"{henries * 1e3:.2f}mH"
    if henries >= 1e-6:
        return f"{henries * 1e6:.2f}µH"
    return f"{henries * 1e9:.2f}nH"


def set_eda_value_for_part(api: PartDB, part_id: int, mpn: str):
    """
    Set the EDA value for a part based on its type.

    The part type is guessed from keywords in the part's name and
    description (both upper-cased first).

    For passives (resistors, capacitors, inductors): use the component value
    parsed from the part name.
    For ICs and other components, or when no value could be parsed: use the
    base of the MPN (e.g. "TPS54302DDCR" -> "TPS54302"), or the full MPN if
    no base of at least 5 characters can be extracted.

    Args:
        api: PartDB API instance
        part_id: ID of the part to update
        mpn: Manufacturer part number

    Raises:
        Exception: If fetching the part or patching the value fails. The
            original error is attached as the cause.
    """
    try:
        # Get part details. `or` also covers keys that are present but None
        # or empty, which would otherwise crash on .upper().
        part = api.get_part(part_id)
        name = (part.get('name') or mpn).upper()
        description = (part.get('description') or '').upper()

        def mentions(*indicators):
            return any(word in name or word in description for word in indicators)

        # Detect part type and extract value
        eda_value = None

        if mentions('OHM', 'Ω', 'RESISTOR', 'RES '):
            # Best effort: an unparsable value falls through to MPN naming.
            try:
                ohms = parse_resistance_to_ohms(name)
                if ohms is not None:
                    eda_value = format_ohms_for_eda(ohms)
                    print(f" Setting resistor EDA value: {eda_value}")
            except Exception as parse_error:
                print(f" Warning: could not derive resistor value from '{name}': {parse_error}")

        elif mentions('FARAD', 'F ', 'CAP', 'CAPACITOR'):
            try:
                farads = parse_capacitance_to_farads(name)
                if farads is not None:
                    eda_value = format_farads_for_eda(farads)
                    print(f" Setting capacitor EDA value: {eda_value}")
            except Exception as parse_error:
                print(f" Warning: could not derive capacitor value from '{name}': {parse_error}")

        elif mentions('INDUCTOR', 'HENRY', 'H ', 'IND '):
            # Look for patterns like "10uH", "100nH", etc.
            eda_value = _inductance_eda_value(name)
            if eda_value is not None:
                print(f" Setting inductor EDA value: {eda_value}")

        # For ICs and other components, use base name
        if eda_value is None:
            # Lazy base followed by an optional 2-5 letter package code at
            # the very end: "TPS54302DDCR" -> "TPS54302".
            match = re.match(r'^([A-Z0-9]+?)([A-Z]{2,4}[A-Z]?)?$', mpn)
            if match and len(match.group(1)) >= 5:
                eda_value = match.group(1)
                print(f" Setting IC/component EDA value: {eda_value}")
            else:
                # Pattern did not match or the base is too short
                eda_value = mpn
                print(f" Setting EDA value to full MPN: {eda_value}")

        # Set the EDA value
        if eda_value:
            api.patch_eda_value(part_id, eda_value)

    except Exception as e:
        raise Exception(f"Error setting EDA value: {e}") from e
|
|
|
|
|
|
def run_import_from_csv(folder_path: str = "import CSVs", update_providers: bool = True, progress_callback=None):
    """
    Main function to import parts from CSV files.

    Every CSV in the folder is parsed, parts that do not exist yet are
    created, and (optionally) the newly created parts are updated from the
    providers through a browser session, after which their EDA value is set.

    Args:
        folder_path: Path to folder containing CSV files
        update_providers: If True, trigger provider updates after creating parts
        progress_callback: Optional callback for progress updates. It is
            called as progress_callback(current, total, message); a truthy
            return value cancels the running phase. When no callback is
            given, tqdm progress bars are shown instead.
    """
    print("=" * 70)
    print("IMPORT PARTS FROM CSV FILES")
    print("=" * 70)
    print(f"Folder: {folder_path}")
    print(f"Provider updates: {'ENABLED' if update_providers else 'DISABLED'}")
    print("=" * 70)
    print()

    # Find CSV files
    csv_files = find_csv_files(folder_path)
    if not csv_files:
        print(f"No CSV files found in '{folder_path}'")
        print("Place Digi-Key CSV exports in this folder and try again.")
        return

    print(f"Found {len(csv_files)} CSV file(s):")
    for csv_file in csv_files:
        print(f" - {csv_file.name}")
    print()

    # Initialize API
    api = PartDB(PARTDB_BASE, PARTDB_TOKEN)

    # Parse every file exactly once, up front. The grand total is needed for
    # progress reporting; recomputing it per part would re-read all files for
    # each part and would crash on a file that fails to parse.
    parsed_files = []
    for csv_file in csv_files:
        try:
            parsed_files.append((csv_file, parse_digikey_csv(csv_file), None))
        except Exception as e:
            parsed_files.append((csv_file, [], e))
    grand_total = sum(len(parts) for _, parts, _ in parsed_files)

    use_tqdm = not progress_callback

    # Process each CSV file
    all_created_parts = []
    total_processed = 0
    total_created = 0
    total_skipped = 0
    total_failed = 0

    for csv_file, parts, parse_error in parsed_files:
        print(f"\nProcessing: {csv_file.name}")
        print("-" * 70)

        if parse_error is not None:
            print(f"Error parsing CSV: {parse_error}")
            continue
        print(f"Found {len(parts)} parts in CSV")

        if not parts:
            print("No valid parts found in CSV")
            continue

        # Create parts
        iterator = tqdm(parts, desc="Creating parts") if use_tqdm else parts

        for part_info in iterator:
            # Check for cancellation
            if progress_callback:
                # total_processed already counts the parts handled in this
                # file, so it is the overall position on its own.
                cancelled = progress_callback(
                    total_processed,
                    grand_total,
                    f"Processing {csv_file.name}: {part_info['mpn']}"
                )
                if cancelled:
                    print("\n⚠ Operation cancelled by user")
                    return

            created, part_id, message = create_part_if_not_exists(api, part_info)

            if created:
                total_created += 1
                all_created_parts.append((part_id, part_info['mpn']))
                if not use_tqdm:
                    print(f"✓ Created: {part_info['mpn']} (ID: {part_id})")
            elif part_id > 0:
                total_skipped += 1
            else:
                total_failed += 1
                print(f"✗ Failed: {part_info['mpn']} - {message}")

            total_processed += 1

    # Summary
    print("\n" + "=" * 70)
    print("IMPORT SUMMARY")
    print("=" * 70)
    print(f"Total parts processed: {total_processed}")
    print(f"Created: {total_created}")
    print(f"Skipped (exist): {total_skipped}")
    print(f"Failed: {total_failed}")
    print("=" * 70)

    # Provider updates
    if all_created_parts and update_providers:
        print("\n" + "=" * 70)
        print("TRIGGERING PROVIDER UPDATES")
        print("=" * 70)
        print(f"Updating {len(all_created_parts)} newly created parts from providers...")

        driver = None
        try:
            print("Starting browser...")
            driver = start_firefox_resilient(headless_first=HEADLESS_PROVIDER)

            print("Logging in...")
            driver.get(PARTDB_BASE + "/")
            if not ensure_logged_in(driver, PARTDB_BASE, interactive_ok=True, wait_s=120):
                print("Failed to log in!")
                return  # the finally block below closes the browser

            controller = driver.current_window_handle
            provider_success = 0
            provider_failed = 0

            iterator = tqdm(all_created_parts, desc="Updating from providers") if use_tqdm else all_created_parts

            for idx, (part_id, mpn) in enumerate(iterator):
                # Check for cancellation
                if progress_callback:
                    cancelled = progress_callback(
                        idx,
                        len(all_created_parts),
                        f"Updating from providers: {mpn}"
                    )
                    if cancelled:
                        print("\n⚠ Provider updates cancelled by user")
                        break

                try:
                    success = run_provider_update_flow(driver, PARTDB_BASE, part_id, controller)
                    if success:
                        provider_success += 1
                        # Set EDA value after provider update
                        try:
                            set_eda_value_for_part(api, part_id, mpn)
                        except Exception as e:
                            print(f" Warning: Could not set EDA value for {mpn}: {e}")
                    else:
                        provider_failed += 1
                        print(f"✗ Provider update failed for: {mpn}")
                except Exception as e:
                    provider_failed += 1
                    print(f"✗ Error updating {mpn}: {e}")

            print("\n" + "=" * 70)
            print("PROVIDER UPDATE SUMMARY")
            print("=" * 70)
            print(f"Successful: {provider_success}")
            print(f"Failed: {provider_failed}")
            print("=" * 70)

        except Exception as e:
            print(f"Error during provider updates: {e}")
        finally:
            # Always close the browser, including on errors and early return,
            # so no browser process is left running.
            if driver is not None:
                try:
                    driver.quit()
                except Exception as e:
                    print(f"Error during provider updates: {e}")

    print("\nDone!")
|
|
|
|
|
|
if __name__ == "__main__":
    import sys

    # Provider updates run by default; passing --no-provider on the command
    # line turns them off.
    skip_providers = "--no-provider" in sys.argv
    run_import_from_csv(update_providers=not skip_providers)
|