#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ CSV-Processor v2.0: Erweiterte Verarbeitung und Formatierung von CSV/Excel-Dateien mit Voreinstellungsverwaltung und Multi-Format-Export """ import csv import json import os import sys from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Any, Tuple import re # Optional: Externe Bibliotheken für Excel/ODT Support try: import openpyxl from openpyxl.styles import Font, PatternFill EXCEL_SUPPORT = True except ImportError: EXCEL_SUPPORT = False try: from odf.opendocument import OpenDocumentSpreadsheet from odf.table import Table, TableRow, TableCell from odf.text import P ODT_SUPPORT = True except ImportError: ODT_SUPPORT = False class CSVProcessor: def __init__(self): self.config_dir = Path("csv_processor_config") self.config_dir.mkdir(exist_ok=True) self.current_config = {} def clear_screen(self): """Bildschirm löschen für bessere Übersicht""" os.system('cls' if os.name == 'nt' else 'clear') def print_header(self, text: str): """Formatierte Überschrift ausgeben""" print("\n" + "="*60) print(f" {text}") print("="*60 + "\n") def get_available_presets(self) -> List[str]: """Liste aller verfügbaren Voreinstellungen""" presets = [] for file in self.config_dir.glob("*.json"): presets.append(file.stem) return presets def load_preset(self, preset_name: str) -> Dict: """Voreinstellung laden""" preset_file = self.config_dir / f"{preset_name}.json" if preset_file.exists(): with open(preset_file, 'r', encoding='utf-8') as f: return json.load(f) return {} def save_preset(self, preset_name: str, config: Dict): """Voreinstellung speichern""" preset_file = self.config_dir / f"{preset_name}.json" with open(preset_file, 'w', encoding='utf-8') as f: json.dump(config, f, indent=2, ensure_ascii=False) print(f"\n✓ Voreinstellung '{preset_name}' gespeichert.") def delete_preset(self, preset_name: str): """Voreinstellung löschen""" preset_file = self.config_dir / f"{preset_name}.json" if preset_file.exists(): preset_file.unlink() print(f"\n✓ Voreinstellung '{preset_name}' gelöscht.") else: print(f"\n✗ Voreinstellung '{preset_name}' nicht gefunden.") def ask_yes_no(self, question: str, default: Optional[bool] = None) -> bool: """Ja/Nein Frage stellen""" if default is not None: question += f" (Standard: {'Ja' if default else 'Nein'})" while True: answer = input(f"{question} [j/n]: ").lower().strip() if answer in ['j', 'ja', 'y', 'yes']: return True elif answer in ['n', 'nein', 'no']: return False elif answer == '' and default is not None: return default else: print("Bitte antworten Sie mit 'j' für Ja oder 'n' für Nein.") def select_file(self, prompt: str, extension: str = "*") -> Optional[Path]: """Datei auswählen""" while True: filename = input(f"{prompt}: ").strip() if not filename: return None path = Path(filename) if path.exists(): return path else: print(f"Datei '{filename}' nicht gefunden. Bitte erneut versuchen.") if not self.ask_yes_no("Möchten Sie es erneut versuchen?", True): return None def load_column_mappings(self, mapping_file: Optional[Path]) -> Dict[str, str]: """Spaltennamen-Zuordnungen aus JSON laden""" if not mapping_file or not mapping_file.exists(): return {} try: with open(mapping_file, 'r', encoding='utf-8') as f: return json.load(f) except json.JSONDecodeError: print(f"Fehler beim Lesen der Mapping-Datei: {mapping_file}") return {} def detect_csv_format(self, filepath: Path) -> Tuple[str, str, str]: """CSV-Format automatisch erkennen""" encodings = ['utf-8', 'latin-1', 'cp1252'] for encoding in encodings: try: with open(filepath, 'r', encoding=encoding) as f: sample = f.read(2048) sniffer = csv.Sniffer() dialect = sniffer.sniff(sample) delimiter = dialect.delimiter if dialect.quoting == csv.QUOTE_ALL: quoting = 'ALLE' elif dialect.quoting == csv.QUOTE_MINIMAL: quoting = 'MINIMAL' elif dialect.quoting == csv.QUOTE_NONNUMERIC: quoting = 'NICHT_NUMERISCH' else: quoting = 'KEINE' return encoding, delimiter, quoting except: continue return 'utf-8', ';', 'MINIMAL' def parse_csv_line(self, line: str, delimiter: str, quoting: str) -> List[str]: """Eine CSV-Zeile mit gegebenen Einstellungen parsen""" quoting_map = { 'ALLE': csv.QUOTE_ALL, 'MINIMAL': csv.QUOTE_MINIMAL, 'NICHT_NUMERISCH': csv.QUOTE_NONNUMERIC, 'KEINE': csv.QUOTE_NONE } try: reader = csv.reader([line], delimiter=delimiter, quoting=quoting_map.get(quoting, csv.QUOTE_MINIMAL)) return next(reader) except: return line.split(delimiter) def configure_csv_import(self, filepath: Path) -> Tuple[str, str, str]: """Interaktive CSV-Import-Konfiguration""" detected_encoding, detected_delimiter, detected_quoting = self.detect_csv_format(filepath) delimiter_names = { ';': 'Semikolon (;)', ',': 'Komma (,)', '\t': 'Tab', '|': 'Pipe (|)', ' ': 'Leerzeichen' } with open(filepath, 'r', encoding=detected_encoding) as f: test_lines = [f.readline().strip() for _ in range(3)] print("\n" + "="*70) print(" CSV-IMPORT KONFIGURATION") print("="*70) print(f"\nDatei: {filepath.name}") print(f"Erkanntes Encoding: {detected_encoding}") print(f"Erkannter Delimiter: {delimiter_names.get(detected_delimiter, detected_delimiter)}") print(f"Erkanntes Quoting: {detected_quoting}") current_encoding = detected_encoding current_delimiter = detected_delimiter current_quoting = detected_quoting while True: print("\n" + "-"*70) print(" VORSCHAU DER ERSTEN ZEILEN:") print("-"*70) for i, line in enumerate(test_lines, 1): if line: parsed = self.parse_csv_line(line, current_delimiter, current_quoting) print(f"\nZeile {i}:") print(f" Rohtext: {line[:80]}...") print(f" Geparst: {len(parsed)} Felder") for j, field in enumerate(parsed[:5], 1): print(f" Feld {j}: '{field}'") if len(parsed) > 5: print(f" ... und {len(parsed)-5} weitere Felder") print("\n" + "-"*70) print(" AKTUELLE EINSTELLUNGEN:") print("-"*70) print(f" 1. Encoding: {current_encoding}") print(f" 2. Delimiter: {delimiter_names.get(current_delimiter, current_delimiter)}") print(f" 3. Quoting: {current_quoting}") print("\n 0. ✓ Diese Einstellungen verwenden") print("-"*70) choice = input("\nWas möchten Sie ändern? [0-3]: ").strip() if choice == '0': break elif choice == '1': print("\nVerfügbare Encodings:") encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1'] for i, enc in enumerate(encodings, 1): print(f" {i}. {enc}") enc_choice = input("Ihre Wahl: ").strip() try: idx = int(enc_choice) - 1 if 0 <= idx < len(encodings): current_encoding = encodings[idx] with open(filepath, 'r', encoding=current_encoding) as f: test_lines = [f.readline().strip() for _ in range(3)] except: pass elif choice == '2': print("\nVerfügbare Delimiters:") print(" 1. Semikolon (;)") print(" 2. Komma (,)") print(" 3. Tab") print(" 4. Pipe (|)") print(" 5. Leerzeichen") delim_choice = input("Ihre Wahl: ").strip() delim_map = {'1': ';', '2': ',', '3': '\t', '4': '|', '5': ' '} if delim_choice in delim_map: current_delimiter = delim_map[delim_choice] elif choice == '3': print("\nQuoting-Optionen:") print(" 1. MINIMAL - Nur Felder mit Sonderzeichen werden quoted") print(" 2. ALLE - Alle Felder werden in Anführungszeichen gesetzt") print(" 3. NICHT_NUMERISCH - Alle nicht-numerischen Felder werden quoted") print(" 4. KEINE - Keine Anführungszeichen") quot_choice = input("Ihre Wahl: ").strip() quot_map = {'1': 'MINIMAL', '2': 'ALLE', '3': 'NICHT_NUMERISCH', '4': 'KEINE'} if quot_choice in quot_map: current_quoting = quot_map[quot_choice] print("\n✓ Import-Einstellungen bestätigt") return current_encoding, current_delimiter, current_quoting def read_csv(self, filepath: Path, has_header: Optional[bool] = None, encoding: str = None, delimiter: str = None, quoting: str = None) -> Tuple[List[str], List[Dict], bool, Dict]: """CSV-Datei lesen""" if encoding is None or delimiter is None: encoding, delimiter, quoting = self.configure_csv_import(filepath) quoting_map = { 'ALLE': csv.QUOTE_ALL, 'MINIMAL': csv.QUOTE_MINIMAL, 'NICHT_NUMERISCH': csv.QUOTE_NONNUMERIC, 'KEINE': csv.QUOTE_NONE } csv_quoting = quoting_map.get(quoting, csv.QUOTE_MINIMAL) try: with open(filepath, 'r', encoding=encoding) as f: if has_header is None: has_header = self.ask_yes_no("\nHat die CSV-Datei eine Kopfzeile mit Spaltennamen?", True) if has_header: reader = csv.DictReader(f, delimiter=delimiter, quoting=csv_quoting) headers = list(reader.fieldnames) data = list(reader) else: reader = csv.reader(f, delimiter=delimiter, quoting=csv_quoting) rows = list(reader) if rows: headers = [f"Spalte_{i+1}" for i in range(len(rows[0]))] data = [] for row in rows: row_dict = {headers[i]: row[i] if i < len(row) else '' for i in range(len(headers))} data.append(row_dict) else: headers = [] data = [] import_settings = { 'encoding': encoding, 'delimiter': delimiter, 'quoting': quoting } return headers, data, has_header, import_settings except Exception as e: print(f"Fehler beim Lesen der CSV: {e}") raise def read_excel(self, filepath: Path) -> Tuple[Optional[List[str]], List[Dict], bool, Dict]: """Excel-Datei lesen""" if not EXCEL_SUPPORT: print("Fehler: openpyxl ist nicht installiert. Installieren Sie es mit: pip install openpyxl") return None, [], False, {} try: wb = openpyxl.load_workbook(filepath, data_only=True) sheet_names = wb.sheetnames if len(sheet_names) > 1: print("\nVerfügbare Tabellenblätter:") for i, name in enumerate(sheet_names, 1): print(f" {i}. {name}") while True: choice = input("Nummer des Tabellenblatts: ").strip() try: idx = int(choice) - 1 if 0 <= idx < len(sheet_names): ws = wb[sheet_names[idx]] break except ValueError: pass print("Ungültige Eingabe.") else: ws = wb.active has_header = self.ask_yes_no("Hat die Datei eine Kopfzeile mit Spaltennamen?", True) data = [] headers = None for i, row in enumerate(ws.iter_rows(values_only=True)): if i == 0 and has_header: headers = [str(cell) if cell is not None else f"Spalte_{j+1}" for j, cell in enumerate(row)] else: if headers is None: headers = [f"Spalte_{j+1}" for j in range(len(row))] row_dict = {} for j, cell in enumerate(row): if j < len(headers): row_dict[headers[j]] = str(cell) if cell is not None else '' data.append(row_dict) import_settings = {} return headers, data, has_header, import_settings except Exception as e: print(f"Fehler beim Lesen der Excel-Datei: {e}") return None, [], False, {} def apply_column_mappings(self, headers: List[str], data: List[Dict], mappings: Dict[str, str]) -> Tuple[List[str], List[Dict], Dict[str, str]]: """Spaltennamen umbenennen und Original-Namen behalten""" new_headers = [mappings.get(h, h) for h in headers] original_names = {new_h: old_h for old_h, new_h in zip(headers, new_headers)} new_data = [] for row in data: new_row = {} for old_h, new_h in zip(headers, new_headers): new_row[new_h] = row.get(old_h, '') new_data.append(new_row) return new_headers, new_data, original_names def remove_empty_columns(self, headers: List[str], data: List[Dict]) -> Tuple[List[str], List[Dict]]: """Leere Spalten entfernen""" non_empty_headers = [] for header in headers: has_values = any(row.get(header, '').strip() for row in data) if has_values: non_empty_headers.append(header) filtered_data = [] for row in data: filtered_row = {h: row.get(h, '') for h in non_empty_headers} filtered_data.append(filtered_row) return non_empty_headers, filtered_data def remove_empty_rows(self, data: List[Dict]) -> List[Dict]: """Komplett leere Zeilen entfernen""" filtered_data = [] for row in data: has_values = any(str(v).strip() for v in row.values()) if has_values: filtered_data.append(row) return filtered_data def analyze_row_completeness(self, headers: List[str], data: List[Dict]) -> Dict[int, int]: """Analysiere Zeilen nach Anzahl ausgefüllter Felder""" analysis = {} for idx, row in enumerate(data): filled_count = sum(1 for v in row.values() if str(v).strip()) analysis[idx] = filled_count return analysis def filter_rows_by_filled_fields(self, headers: List[str], data: List[Dict]) -> List[Dict]: """Zeilen mit zu wenig Informationen filtern""" total_columns = len(headers) analysis = self.analyze_row_completeness(headers, data) print(f"\nAnalyse der Datenvollständigkeit:") print(f"Gesamtanzahl Spalten: {total_columns}") print(f"Gesamtanzahl Zeilen: {len(data)}") from collections import Counter counts = Counter(analysis.values()) print("\nVerteilung ausgefüllter Felder pro Zeile:") for filled_count in sorted(counts.keys()): print(f" {filled_count} Felder ausgefüllt: {counts[filled_count]} Zeilen") print(f"\nBei wie vielen oder weniger ausgefüllten Feldern sollen Zeilen entfernt werden?") while True: try: threshold = int(input(f"Schwellenwert (0-{total_columns}): ").strip()) if 0 <= threshold <= total_columns: break print(f"Bitte eine Zahl zwischen 0 und {total_columns} eingeben.") except ValueError: print("Ungültige Eingabe.") rows_to_remove = [idx for idx, count in analysis.items() if count <= threshold] if rows_to_remove: print(f"\n{len(rows_to_remove)} Zeilen haben {threshold} oder weniger ausgefüllte Felder:") for i, idx in enumerate(rows_to_remove[:10]): print(f" Zeile {idx + 1}: {analysis[idx]} Felder ausgefüllt") sample = {k: v for k, v in data[idx].items() if str(v).strip()} print(f" Daten: {sample}") if len(rows_to_remove) > 10: print(f" ... und {len(rows_to_remove) - 10} weitere Zeilen") if self.ask_yes_no(f"\nMöchten Sie diese {len(rows_to_remove)} Zeilen löschen?"): filtered_data = [row for idx, row in enumerate(data) if idx not in rows_to_remove] print(f"✓ {len(rows_to_remove)} Zeilen entfernt") return filtered_data else: print(f"Keine Zeilen mit {threshold} oder weniger ausgefüllten Feldern gefunden.") return data def select_columns(self, headers: List[str], original_names: Dict[str, str] = None, preselected: Optional[List[str]] = None) -> List[str]: """Spalten für Export auswählen mit interaktiver An-/Abwahl""" GREEN = '\033[92m' RED = '\033[91m' RESET = '\033[0m' BOLD = '\033[1m' if preselected and all(h in headers for h in preselected): selected = set(preselected) else: selected = set(headers) def show_columns(): print("\n" + "="*70) print(" SPALTENAUSWAHL") print("="*70) print(f"{GREEN}● = Angewählt{RESET} | {RED}○ = Abgewählt{RESET}") print("-"*70) for i, header in enumerate(headers, 1): symbol = f"{GREEN}●{RESET}" if header in selected else f"{RED}○{RESET}" if original_names and header in original_names: original = original_names[header] if original != header: display = f"{original} {BOLD}→ {header}{RESET}" else: display = header else: display = header print(f" {symbol} {i:3d}. {display}") print("-"*70) print(f"Aktuell ausgewählt: {len(selected)} von {len(headers)} Spalten") print("="*70) def parse_selection(input_str: str) -> set: indices = set() parts = input_str.split(',') for part in parts: part = part.strip() if '-' in part: try: start, end = part.split('-') start_idx = int(start.strip()) - 1 end_idx = int(end.strip()) - 1 if 0 <= start_idx < len(headers) and 0 <= end_idx < len(headers): for i in range(start_idx, end_idx + 1): if 0 <= i < len(headers): indices.add(i) except ValueError: pass else: try: idx = int(part) - 1 if 0 <= idx < len(headers): indices.add(idx) except ValueError: pass return indices while True: show_columns() print("\nOptionen:") print(" [Nummern] - Spalten an/abwählen (z.B. '1,2,3' oder '1-5,10-15')") print(" + [Nummern] - Spalten ANwählen (z.B. '+1,2,3' oder '+10-20')") print(" - [Nummern] - Spalten ABwählen (z.B. '-1,2,3' oder '-5-10')") print(" alle - Alle Spalten anwählen") print(" keine - Alle Spalten abwählen") print(" q - Auswahl beenden und fortfahren") choice = input("\nIhre Wahl: ").strip() if choice.lower() == 'q': if selected: break else: print(f"\n{RED}Fehler: Mindestens eine Spalte muss ausgewählt sein!{RESET}") elif choice.lower() == 'alle': selected = set(headers) print(f"{GREEN}✓ Alle Spalten angewählt{RESET}") elif choice.lower() == 'keine': selected = set() print(f"{RED}✓ Alle Spalten abgewählt{RESET}") elif choice.startswith('+'): indices = parse_selection(choice[1:]) for idx in indices: selected.add(headers[idx]) print(f"{GREEN}✓ {len(indices)} Spalte(n) angewählt{RESET}") elif choice.startswith('-'): indices = parse_selection(choice[1:]) for idx in indices: selected.discard(headers[idx]) print(f"{RED}✓ {len(indices)} Spalte(n) abgewählt{RESET}") else: indices = parse_selection(choice) for idx in indices: if headers[idx] in selected: selected.discard(headers[idx]) else: selected.add(headers[idx]) if indices: print(f"✓ {len(indices)} Spalte(n) umgeschaltet") return [h for h in headers if h in selected] def sort_data(self, data: List[Dict], sort_column: str, data_type: str) -> List[Dict]: """Daten nach Spalte sortieren""" def get_sort_key(row): value = row.get(sort_column, '') if data_type == 'datum': formats = ['%d.%m.%Y', '%Y-%m-%d', '%d/%m/%Y', '%m/%d/%Y'] for fmt in formats: try: return datetime.strptime(value, fmt) except ValueError: continue return datetime.min elif data_type == 'zeit': formats = ['%H:%M:%S', '%H:%M'] for fmt in formats: try: return datetime.strptime(value, fmt).time() except ValueError: continue return datetime.min.time() elif data_type == 'dezimalzahl': try: value = value.replace(',', '.') return float(value) except ValueError: return 0.0 else: return value.lower() return sorted(data, key=get_sort_key) def configure_csv_export(self) -> Tuple[str, str]: """CSV-Export-Einstellungen konfigurieren""" print("\n" + "="*70) print(" CSV-EXPORT KONFIGURATION") print("="*70) print("\nDelimiter wählen:") print(" 1. Semikolon (;) - Standard für deutsche Excel-Versionen") print(" 2. Komma (,) - Internationaler Standard") print(" 3. Tab - Gut für Import in andere Programme") while True: delim_choice = input("Ihre Wahl [1-3]: ").strip() delim_map = {'1': ';', '2': ',', '3': '\t'} if delim_choice in delim_map: delimiter = delim_map[delim_choice] break print("Ungültige Eingabe.") print("\nQuoting (Anführungszeichen) wählen:") print(" 1. MINIMAL - Nur Felder mit Sonderzeichen (empfohlen)") print(" 2. ALLE - Alle Felder in Anführungszeichen") print(" 3. NICHT_NUMERISCH - Nur Text-Felder") print(" 4. KEINE - Keine Anführungszeichen (kann Probleme verursachen)") while True: quot_choice = input("Ihre Wahl [1-4]: ").strip() quot_map = {'1': 'MINIMAL', '2': 'ALLE', '3': 'NICHT_NUMERISCH', '4': 'KEINE'} if quot_choice in quot_map: quoting = quot_map[quot_choice] break print("Ungültige Eingabe.") delimiter_names = {';': 'Semikolon', ',': 'Komma', '\t': 'Tab'} print("\n" + "-"*70) print(f"Gewählte Einstellungen:") print(f" Delimiter: {delimiter_names.get(delimiter, delimiter)}") print(f" Quoting: {quoting}") print("-"*70) return delimiter, quoting def write_csv(self, filepath: Path, headers: List[str], data: List[Dict], header_text: str = "", footer_text: str = "", delimiter: str = ';', quoting: str = 'MINIMAL'): """CSV-Datei schreiben""" quoting_map = { 'ALLE': csv.QUOTE_ALL, 'MINIMAL': csv.QUOTE_MINIMAL, 'NICHT_NUMERISCH': csv.QUOTE_NONNUMERIC, 'KEINE': csv.QUOTE_NONE } csv_quoting = quoting_map.get(quoting, csv.QUOTE_MINIMAL) with open(filepath, 'w', encoding='utf-8', newline='') as f: if header_text: f.write(f"# {header_text}\n") writer = csv.DictWriter(f, fieldnames=headers, delimiter=delimiter, quoting=csv_quoting, quotechar='"') writer.writeheader() writer.writerows(data) if footer_text: f.write(f"# {footer_text}\n") print(f"✓ CSV-Datei erfolgreich gespeichert: {filepath}") def write_excel(self, filepath: Path, headers: List[str], data: List[Dict], header_text: str = "", footer_text: str = ""): """Excel-Datei schreiben""" if not EXCEL_SUPPORT: print("Fehler: openpyxl ist nicht installiert. Installieren Sie es mit: pip install openpyxl") return wb = openpyxl.Workbook() ws = wb.active ws.title = "Daten" row_num = 1 if header_text: ws.cell(row=row_num, column=1, value=f"# {header_text}") ws.cell(row=row_num, column=1).font = Font(italic=True) row_num += 1 for col_num, header in enumerate(headers, 1): cell = ws.cell(row=row_num, column=col_num, value=header) cell.font = Font(bold=True) cell.fill = PatternFill(start_color="CCCCCC", end_color="CCCCCC", fill_type="solid") row_num += 1 for row_data in data: for col_num, header in enumerate(headers, 1): ws.cell(row=row_num, column=col_num, value=row_data.get(header, '')) row_num += 1 if footer_text: ws.cell(row=row_num, column=1, value=f"# {footer_text}") ws.cell(row=row_num, column=1).font = Font(italic=True) for column in ws.columns: max_length = 0 column_letter = column[0].column_letter for cell in column: try: if len(str(cell.value)) > max_length: max_length = len(cell.value) except: pass adjusted_width = min(max_length + 2, 50) ws.column_dimensions[column_letter].width = adjusted_width wb.save(filepath) print(f"✓ Excel-Datei erfolgreich gespeichert: {filepath}") def write_ods(self, filepath: Path, headers: List[str], data: List[Dict], header_text: str = "", footer_text: str = ""): """OpenDocument Spreadsheet (ODS) schreiben""" if not ODT_SUPPORT: print("Fehler: odfpy ist nicht installiert. Installieren Sie es mit: pip install odfpy") return doc = OpenDocumentSpreadsheet() table = Table(name="Daten") if header_text: row = TableRow() cell = TableCell() cell.addElement(P(text=f"# {header_text}")) row.addElement(cell) table.addElement(row) row = TableRow() for header in headers: cell = TableCell() cell.addElement(P(text=header)) row.addElement(cell) table.addElement(row) for row_data in data: row = TableRow() for header in headers: cell = TableCell() cell.addElement(P(text=str(row_data.get(header, '')))) row.addElement(cell) table.addElement(row) if footer_text: row = TableRow() cell = TableCell() cell.addElement(P(text=f"# {footer_text}")) row.addElement(cell) table.addElement(row) doc.spreadsheet.addElement(table) doc.save(filepath) print(f"✓ ODS-Datei erfolgreich gespeichert: {filepath}") def select_output_format(self) -> str: """Ausgabeformat auswählen""" formats = ["csv"] print("\nVerfügbare Ausgabeformate:") print(" 1. CSV") format_num = 2 if EXCEL_SUPPORT: formats.append("xlsx") print(f" {format_num}. Excel (XLSX)") format_num += 1 if ODT_SUPPORT: formats.append("ods") print(f" {format_num}. OpenDocument (ODS)") while True: choice = input("Ihre Wahl: ").strip() try: idx = int(choice) - 1 if 0 <= idx < len(formats): return formats[idx] except ValueError: if choice.lower() in formats: return choice.lower() print("Ungültige Eingabe.") def manage_presets(self): """Voreinstellungen verwalten""" while True: self.clear_screen() print("╔═══════════════════════════════════════════════════════╗") print("║ VOREINSTELLUNGEN VERWALTEN ║") print("╚═══════════════════════════════════════════════════════╝") presets = self.get_available_presets() if presets: print("\nVerfügbare Voreinstellungen:") for i, preset in enumerate(presets, 1): print(f" {i}. {preset}") else: print("\n⚠ Keine Voreinstellungen vorhanden.") print("\nOptionen:") print(" 1. Voreinstellung bearbeiten") print(" 2. Voreinstellung löschen") print(" 0. Zurück zum Hauptmenü") choice = input("\nIhre Wahl: ").strip() if choice == '0': return elif choice == '1' and presets: while True: preset_choice = input("\nNummer oder Name der Voreinstellung: ").strip() try: idx = int(preset_choice) - 1 if 0 <= idx < len(presets): preset_name = presets[idx] else: print("Ungültige Nummer.") continue except ValueError: preset_name = preset_choice if preset_name in presets: self.edit_preset(preset_name) break else: print(f"Voreinstellung '{preset_name}' nicht gefunden.") elif choice == '2' and presets: while True: preset_choice = input("\nNummer oder Name der zu löschenden Voreinstellung: ").strip() try: idx = int(preset_choice) - 1 if 0 <= idx < len(presets): preset_name = presets[idx] else: print("Ungültige Nummer.") continue except ValueError: preset_name = preset_choice if preset_name in presets: if self.ask_yes_no(f"Voreinstellung '{preset_name}' wirklich löschen?"): self.delete_preset(preset_name) input("\nDrücken Sie Enter um fortzufahren...") break else: print(f"Voreinstellung '{preset_name}' nicht gefunden.") def edit_preset(self, preset_name: str): """Voreinstellung bearbeiten""" config = self.load_preset(preset_name) print(f"\n✓ Voreinstellung '{preset_name}' geladen.") print("\nSie können nun die Einstellungen Schritt für Schritt durchgehen und ändern.") print("Die aktuellen Werte werden als Vorschläge angezeigt.") input("\nDrücken Sie Enter um zu beginnen...") config = self.configure_settings(config, edit_mode=True) self.clear_screen() print("╔═══════════════════════════════════════════════════════╗") print("║ VOREINSTELLUNG SPEICHERN ║") print("╚═══════════════════════════════════════════════════════╝") print("\nMöchten Sie die Änderungen speichern?") print(f" 1. Unter gleichem Namen überschreiben ('{preset_name}')") print(f" 2. Als neue Voreinstellung speichern") print(f" 3. Änderungen verwerfen") while True: save_choice = input("\nIhre Wahl [1-3]: ").strip() if save_choice == '1': self.save_preset(preset_name, config) input("\nDrücken Sie Enter um fortzufahren...") return elif save_choice == '2': new_name = input("\nName für die neue Voreinstellung: ").strip() if new_name: self.save_preset(new_name, config) input("\nDrücken Sie Enter um fortzufahren...") return elif save_choice == '3': print("\nÄnderungen verworfen.") input("\nDrücken Sie Enter um fortzufahren...") return def configure_settings(self, config: Dict = None, edit_mode: bool = False) -> Dict: """Einstellungen konfigurieren (für Bearbeitung oder neue Voreinstellung)""" if config is None: config = {} self.clear_screen() print("╔═══════════════════════════════════════════════════════╗") print("║ EINSTELLUNGEN KONFIGURIEREN ║") print("╚═══════════════════════════════════════════════════════╝") # 1. Kopfzeile vorhanden? print("\n" + "="*60) print(" 1. Kopfzeile") print("="*60) current = config.get('has_header') if current is not None: print(f"Aktuell: {'Ja' if current else 'Nein'}") config['has_header'] = self.ask_yes_no("Hat die Datei normalerweise eine Kopfzeile?", current) # 2. Mapping-Datei print("\n" + "="*60) print(" 2. Spaltennamen-Mapping") print("="*60) current_mapping = config.get('mapping_file') if current_mapping: print(f"Aktuell: {current_mapping}") if self.ask_yes_no("Möchten Sie eine Mapping-Datei verwenden?", current_mapping is not None): mapping_file = self.select_file("Pfad zur Mapping-JSON-Datei") if mapping_file: config['mapping_file'] = str(mapping_file) else: config['mapping_file'] = None # 3. Leere Zeilen entfernen print("\n" + "="*60) print(" 3. Leere Zeilen") print("="*60) current = config.get('remove_empty_rows') if current is not None: print(f"Aktuell: {'Ja' if current else 'Nein'}") config['remove_empty_rows'] = self.ask_yes_no("Leere Zeilen entfernen?", current) # 4. Unvollständige Zeilen filtern print("\n" + "="*60) print(" 4. Unvollständige Zeilen") print("="*60) current = config.get('filter_incomplete_rows') if current is not None: print(f"Aktuell: {'Ja' if current else 'Nein'}") config['filter_incomplete_rows'] = self.ask_yes_no("Zeilen mit zu wenig Daten analysieren?", current) # 5. Leere Spalten entfernen print("\n" + "="*60) print(" 5. Leere Spalten") print("="*60) current = config.get('remove_empty') if current is not None: print(f"Aktuell: {'Ja' if current else 'Nein'}") config['remove_empty'] = self.ask_yes_no("Leere Spalten entfernen?", current) # 6. Spaltenauswahl print("\n" + "="*60) print(" 6. Spaltenauswahl") print("="*60) if 'selected_columns' in config and config['selected_columns']: print(f"Aktuell gespeicherte Spalten: {len(config['selected_columns'])}") print("\nGespeicherte Spalten:") for i, col in enumerate(config['selected_columns'], 1): print(f" {i}. {col}") if self.ask_yes_no("\nMöchten Sie die Spaltenauswahl ändern?"): print("\nBitte laden Sie eine Beispieldatei, um Spalten auszuwählen:") example_file = self.select_file("Pfad zur Beispieldatei (CSV/Excel)") if example_file: try: file_ext = example_file.suffix.lower() if file_ext in ['.xlsx', '.xls']: ex_headers, ex_data, ex_has_header, _ = self.read_excel(example_file) else: ex_headers, ex_data, ex_has_header, _ = self.read_csv(example_file) if config.get('mapping_file'): mapping_file = Path(config['mapping_file']) if mapping_file.exists(): mappings = self.load_column_mappings(mapping_file) ex_headers, ex_data, ex_original_names = self.apply_column_mappings(ex_headers, ex_data, mappings) else: ex_original_names = {h: h for h in ex_headers} else: ex_original_names = {h: h for h in ex_headers} config['selected_columns'] = self.select_columns(ex_headers, ex_original_names, config['selected_columns']) except Exception as e: print(f"Fehler beim Laden der Beispieldatei: {e}") else: print("Keine Spalten gespeichert.") if self.ask_yes_no("Möchten Sie Spalten auswählen?"): print("\nBitte laden Sie eine Beispieldatei, um Spalten auszuwählen:") example_file = self.select_file("Pfad zur Beispieldatei (CSV/Excel)") if example_file: try: file_ext = example_file.suffix.lower() if file_ext in ['.xlsx', '.xls']: ex_headers, ex_data, ex_has_header, _ = self.read_excel(example_file) else: ex_headers, ex_data, ex_has_header, _ = self.read_csv(example_file) if config.get('mapping_file'): mapping_file = Path(config['mapping_file']) if mapping_file.exists(): mappings = self.load_column_mappings(mapping_file) ex_headers, ex_data, ex_original_names = self.apply_column_mappings(ex_headers, ex_data, mappings) else: ex_original_names = {h: h for h in ex_headers} else: ex_original_names = {h: h for h in ex_headers} config['selected_columns'] = self.select_columns(ex_headers, ex_original_names, None) except Exception as e: print(f"Fehler beim Laden der Beispieldatei: {e}") # 7. Kopf- und Fußzeilen print("\n" + "="*60) print(" 7. Kopf- und Fußzeilen") print("="*60) current = config.get('add_header_footer') if current is not None: print(f"Aktuell: {'Ja' if current else 'Nein'}") if current: print(f" Kopfzeile: {config.get('header_text', '')}") print(f" Fußzeile: {config.get('footer_text', '')}") if self.ask_yes_no("Kopf-/Fußzeile hinzufügen?", current): header_text = input("Kopfzeile (Enter für keine): ").strip() footer_text = input("Fußzeile (Enter für keine): ").strip() config['add_header_footer'] = True config['header_text'] = header_text config['footer_text'] = footer_text else: config['add_header_footer'] = False # 8. Sortierung print("\n" + "="*60) print(" 8. Sortierung") print("="*60) current_sort = config.get('sort_column') if current_sort: print(f"Aktuell: Nach '{current_sort}' ({config.get('sort_type', 'string')})") if self.ask_yes_no("Daten sortieren?", current_sort is not None): sort_column = input("Spaltenname für Sortierung: ").strip() if sort_column: print("\nDatentyp für Sortierung:") print(" 1. Text (string)") print(" 2. Datum") print(" 3. Zeit") print(" 4. Dezimalzahl") type_map = {'1': 'string', '2': 'datum', '3': 'zeit', '4': 'dezimalzahl'} data_type = type_map.get(input("Ihre Wahl [1-4]: ").strip(), 'string') config['sort_column'] = sort_column config['sort_type'] = data_type else: config['sort_column'] = None # 9. Ausgabeformat print("\n" + "="*60) print(" 9. Ausgabeformat") print("="*60) current_format = config.get('output_format', 'csv') print(f"Aktuell: {current_format.upper()}") formats = ["csv"] print("\nVerfügbare Ausgabeformate:") print(" 1. CSV") format_num = 2 if EXCEL_SUPPORT: formats.append("xlsx") print(f" {format_num}. Excel (XLSX)") format_num += 1 if ODT_SUPPORT: formats.append("ods") print(f" {format_num}. OpenDocument (ODS)") while True: choice = input("Ihre Wahl: ").strip() try: idx = int(choice) - 1 if 0 <= idx < len(formats): config['output_format'] = formats[idx] break except ValueError: if choice.lower() in formats: config['output_format'] = choice.lower() break print("Ungültige Eingabe.") # 10. CSV-Export-Einstellungen (nur wenn CSV gewählt) if config['output_format'] == 'csv': print("\n" + "="*60) print(" 10. CSV-Export-Einstellungen") print("="*60) current_delim = config.get('export_delimiter') current_quot = config.get('export_quoting') if current_delim and current_quot: delimiter_names = {';': 'Semikolon', ',': 'Komma', '\t': 'Tab'} print(f"Aktuell: Delimiter={delimiter_names.get(current_delim, current_delim)}, Quoting={current_quot}") if self.ask_yes_no("CSV-Export-Einstellungen konfigurieren?", True): export_delimiter, export_quoting = self.configure_csv_export() config['export_delimiter'] = export_delimiter config['export_quoting'] = export_quoting return config def main_menu(self): """Hauptmenü anzeigen""" while True: self.clear_screen() print("╔═══════════════════════════════════════════════════════╗") print("║ CSV-PROCESSOR v2.0 - Dateiverarbeitung ║") print("╚═══════════════════════════════════════════════════════╝") print("\nVerfügbare Features:") print(f" Import: CSV{', Excel (XLSX/XLS)' if EXCEL_SUPPORT else ''}") print(f" Export: CSV{', Excel (XLSX)' if EXCEL_SUPPORT else ''}{', OpenDocument (ODS)' if ODT_SUPPORT else ''}") if not EXCEL_SUPPORT: print(f" ⚠ Excel-Support: pip install openpyxl") if not ODT_SUPPORT: print(f" ⚠ ODS-Support: pip install odfpy") print("\n" + "="*60) print(" HAUPTMENÜ") print("="*60) print(" 1. Neue Datei verarbeiten") print(" 2. Voreinstellungen verwalten") print(" 0. Beenden") choice = input("\nIhre Wahl: ").strip() if choice == '1': self.run() elif choice == '2': self.manage_presets() elif choice == '0': print("\nAuf Wiedersehen!") sys.exit(0) else: print("\nUngültige Eingabe.") input("Drücken Sie Enter um fortzufahren...") def run(self): """Hauptprogramm ausführen""" self.clear_screen() print("╔═══════════════════════════════════════════════════════╗") print("║ DATEI VERARBEITEN ║") print("╚═══════════════════════════════════════════════════════╝") config = {} use_preset = False preset_loaded_file = False # 1. Voreinstellungen abfragen presets = self.get_available_presets() if presets: self.print_header("Schritt 1: Voreinstellungen") print("Verfügbare Voreinstellungen:") for i, preset in enumerate(presets, 1): print(f" {i}. {preset}") if self.ask_yes_no("\nMöchten Sie eine Voreinstellung laden?"): while True: choice = input("Nummer oder Name der Voreinstellung: ").strip() try: idx = int(choice) - 1 if 0 <= idx < len(presets): preset_name = presets[idx] else: print("Ungültige Nummer.") continue except ValueError: preset_name = choice if preset_name in presets: config = self.load_preset(preset_name) use_preset = True print(f"✓ Voreinstellung '{preset_name}' geladen.") # Zeige was geladen wurde if 'source_file' in config and config['source_file']: print(f" → Quelldatei vorbelegt: {config['source_file']}") if 'mapping_file' in config and config['mapping_file']: print(f" → Mapping vorbelegt: {config['mapping_file']}") if 'selected_columns' in config and config['selected_columns']: print(f" → Spaltenauswahl: {len(config['selected_columns'])} Spalten") print("\nℹ Leere Spalten werden automatisch übersprungen (vor Spaltenauswahl)") break else: print(f"Voreinstellung '{preset_name}' nicht gefunden.") # 2. Quell-Datei self.print_header("Schritt 2: Quelldatei auswählen") # Wenn Voreinstellung Quelldatei enthält, diese verwenden if use_preset and 'source_file' in config and config['source_file']: source_path = Path(config['source_file']) if source_path.exists(): print(f"Verwende Quelldatei aus Voreinstellung: {source_path}") if self.ask_yes_no("Diese Datei verwenden?", True): source_file = source_path preset_loaded_file = True else: source_file = None else: print(f"⚠ Gespeicherte Quelldatei nicht gefunden: {config['source_file']}") source_file = None else: source_file = None # Wenn keine Datei aus Voreinstellung oder Benutzer will andere Datei if not source_file: while not source_file: source_file = self.select_file("Pfad zur Quelldatei (CSV/XLSX/XLS/ODS)") if not source_file: print("Eine Quelldatei ist erforderlich!") # Datei lesen file_ext = source_file.suffix.lower() has_header = config.get('has_header', None) if use_preset else None try: if file_ext in ['.xlsx', '.xls']: headers, data, has_header, import_settings = self.read_excel(source_file) else: import_encoding = config.get('import_encoding') if use_preset else None import_delimiter = config.get('import_delimiter') if use_preset else None import_quoting = config.get('import_quoting') if use_preset else None headers, data, has_header, import_settings = self.read_csv( source_file, has_header, import_encoding, import_delimiter, import_quoting ) config['import_encoding'] = import_settings['encoding'] config['import_delimiter'] = import_settings['delimiter'] config['import_quoting'] = import_settings['quoting'] if headers is None: print("Fehler beim Lesen der Datei.") return config['has_header'] = has_header print(f"\n✓ Datei geladen: {len(headers)} Spalten, {len(data)} Zeilen") except Exception as e: print(f"Fehler beim Lesen der Datei: {e}") import traceback traceback.print_exc() return # 3. Spaltennamen-Mapping self.print_header("Schritt 3: Spaltennamen umbenennen") original_names = {} if use_preset and 'mapping_file' in config: mapping_file = Path(config['mapping_file']) if config['mapping_file'] else None if mapping_file and mapping_file.exists(): print(f"Verwende Mapping-Datei aus Voreinstellung: {mapping_file}") mappings = self.load_column_mappings(mapping_file) else: if mapping_file: print(f"⚠ Mapping-Datei {mapping_file} nicht gefunden.") mappings = {} else: if self.ask_yes_no("Möchten Sie Spaltennamen umbenennen?"): mapping_file = self.select_file("Pfad zur Mapping-JSON-Datei") mappings = self.load_column_mappings(mapping_file) if mapping_file: config['mapping_file'] = str(mapping_file) else: mappings = {} if mappings: headers, data, original_names = self.apply_column_mappings(headers, data, mappings) print(f"✓ {len(mappings)} Spaltennamen umbenannt") else: original_names = {h: h for h in headers} # 4. Leere Zeilen entfernen self.print_header("Schritt 4: Leere Zeilen") if use_preset and 'remove_empty_rows' in config: remove_empty_rows = config['remove_empty_rows'] print(f"Verwende Voreinstellung: {'Ja' if remove_empty_rows else 'Nein'}") else: remove_empty_rows = self.ask_yes_no("Sollen komplett leere Zeilen entfernt werden?") config['remove_empty_rows'] = remove_empty_rows if remove_empty_rows: original_count = len(data) data = self.remove_empty_rows(data) removed_count = original_count - len(data) print(f"✓ {removed_count} leere Zeilen entfernt") # 5. Zeilen mit zu wenig Daten filtern self.print_header("Schritt 5: Unvollständige Zeilen") if use_preset and 'filter_incomplete_rows' in config: filter_incomplete = config['filter_incomplete_rows'] print(f"Verwende Voreinstellung: {'Ja' if filter_incomplete else 'Nein'}") else: filter_incomplete = self.ask_yes_no("Möchten Sie Zeilen mit zu wenig Informationen analysieren/entfernen?") config['filter_incomplete_rows'] = filter_incomplete if filter_incomplete: data = self.filter_rows_by_filled_fields(headers, data) # 6. Leere Spalten entfernen (VOR Spaltenauswahl!) self.print_header("Schritt 6: Leere Spalten") if use_preset and 'remove_empty' in config: remove_empty = config['remove_empty'] print(f"Verwende Voreinstellung: {'Ja' if remove_empty else 'Nein'}") else: remove_empty = self.ask_yes_no("Sollen leere Spalten entfernt werden?") config['remove_empty'] = remove_empty if remove_empty: original_count = len(headers) headers, data = self.remove_empty_columns(headers, data) removed_count = original_count - len(headers) print(f"✓ {removed_count} leere Spalten entfernt") # WICHTIG: original_names aktualisieren original_names = {h: original_names[h] for h in headers if h in original_names} # 7. Spaltenauswahl self.print_header("Schritt 7: Spaltenauswahl") # Bei Voreinstellung: Nur vorhandene (nicht-leere) Spalten verwenden if use_preset and 'selected_columns' in config: preselected = [col for col in config['selected_columns'] if col in headers] if preselected: selected_columns = preselected print(f"Verwende gespeicherte Spaltenauswahl:") print(f"(Nur nicht-leere Spalten werden berücksichtigt)") for i, col in enumerate(selected_columns, 1): orig = original_names.get(col, col) if orig != col: print(f" {i}. {orig} → {col}") else: print(f" {i}. {col}") print(f"\n✓ {len(selected_columns)} Spalten ausgewählt") # Zeige übersprungene Spalten skipped = [col for col in config['selected_columns'] if col not in headers] if skipped: print(f"\nℹ Übersprungene Spalten (leer oder nicht vorhanden): {len(skipped)}") for col in skipped[:5]: print(f" - {col}") if len(skipped) > 5: print(f" ... und {len(skipped)-5} weitere") else: print("⚠ Alle gespeicherten Spalten sind leer oder nicht vorhanden.") print("Bitte neue Auswahl treffen:") selected_columns = self.select_columns(headers, original_names, None) config['selected_columns'] = selected_columns else: selected_columns = self.select_columns(headers, original_names, None) config['selected_columns'] = selected_columns # Daten filtern filtered_data = [] for row in data: filtered_row = {col: row.get(col, '') for col in selected_columns} filtered_data.append(filtered_row) data = filtered_data # 8. Kopf- und Fußzeilen self.print_header("Schritt 8: Kopf- und Fußzeilen") header_text = "" footer_text = "" if use_preset and 'add_header_footer' in config: add_header_footer = config['add_header_footer'] if add_header_footer: header_text = config.get('header_text', '') footer_text = config.get('footer_text', '') print(f"Verwende Kopfzeile: {header_text}") print(f"Verwende Fußzeile: {footer_text}") else: if self.ask_yes_no("Möchten Sie eine Kopf- oder Fußzeile hinzufügen?"): header_text = input("Kopfzeile (Enter für keine): ").strip() footer_text = input("Fußzeile (Enter für keine): ").strip() config['add_header_footer'] = True config['header_text'] = header_text config['footer_text'] = footer_text else: config['add_header_footer'] = False # 9. Sortierung self.print_header("Schritt 9: Sortierung") if use_preset and 'sort_column' in config: sort_column = config['sort_column'] if sort_column and sort_column in selected_columns: data_type = config.get('sort_type', 'string') print(f"Sortiere nach '{sort_column}' ({data_type})") data = self.sort_data(data, sort_column, data_type) else: if self.ask_yes_no("Möchten Sie die Daten sortieren?"): print("\nVerfügbare Spalten zum Sortieren:") for i, col in enumerate(selected_columns, 1): print(f" {i}. {col}") while True: choice = input("Nummer der Spalte: ").strip() try: idx = int(choice) - 1 if 0 <= idx < len(selected_columns): sort_column = selected_columns[idx] break except ValueError: pass print("Ungültige Eingabe.") print("\nDatentyp für Sortierung:") print(" 1. Text (string)") print(" 2. Datum") print(" 3. Zeit") print(" 4. Dezimalzahl") type_map = {'1': 'string', '2': 'datum', '3': 'zeit', '4': 'dezimalzahl'} data_type = type_map.get(input("Ihre Wahl [1-4]: ").strip(), 'string') data = self.sort_data(data, sort_column, data_type) config['sort_column'] = sort_column config['sort_type'] = data_type print(f"✓ Daten nach '{sort_column}' sortiert") else: config['sort_column'] = None # 10. Ausgabeformat und Zieldatei self.print_header("Schritt 10: Zieldatei speichern") if use_preset and 'output_format' in config: output_format = config['output_format'] print(f"Ausgabeformat aus Voreinstellung: {output_format.upper()}") else: output_format = self.select_output_format() config['output_format'] = output_format while True: target_path = input(f"Name/Pfad der Zieldatei (ohne Endung): ").strip() if target_path: target_file = Path(target_path).with_suffix(f'.{output_format}') if target_file.exists(): if self.ask_yes_no(f"Datei {target_file} existiert bereits. Überschreiben?"): break else: break # Datei schreiben if output_format == 'csv': if use_preset and 'export_delimiter' in config and 'export_quoting' in config: export_delimiter = config['export_delimiter'] export_quoting = config['export_quoting'] delimiter_names = {';': 'Semikolon', ',': 'Komma', '\t': 'Tab'} print(f"\nVerwende Export-Einstellungen aus Voreinstellung:") print(f" Delimiter: {delimiter_names.get(export_delimiter, export_delimiter)}") print(f" Quoting: {export_quoting}") else: export_delimiter, export_quoting = self.configure_csv_export() config['export_delimiter'] = export_delimiter config['export_quoting'] = export_quoting self.write_csv(target_file, selected_columns, data, header_text, footer_text, export_delimiter, export_quoting) elif output_format == 'xlsx': self.write_excel(target_file, selected_columns, data, header_text, footer_text) elif output_format == 'ods': self.write_ods(target_file, selected_columns, data, header_text, footer_text) print("\n" + "="*60) print(" Verarbeitung abgeschlossen!") print("="*60) # Voreinstellung speichern if not use_preset: if self.ask_yes_no("\nMöchten Sie diese Einstellungen als Voreinstellung speichern?"): # Fragen ob Quelldatei mitgespeichert werden soll save_source = False if self.ask_yes_no("Pfad zur Quelldatei mit speichern? (Datei wird dann automatisch geladen)", False): config['source_file'] = str(source_file) save_source = True preset_name = input("Name für die Voreinstellung: ").strip() if preset_name: self.save_preset(preset_name, config) print(f"\nGespeichert:") print(f" - Einstellungen: Ja") print(f" - Mapping: {'Ja' if config.get('mapping_file') else 'Nein'}") print(f" - Quelldatei: {'Ja' if save_source else 'Nein'}") print(f" - Spaltenauswahl: {len(config['selected_columns'])} Spalten") input("\nDrücken Sie Enter um zum Hauptmenü zurückzukehren...") def main(): """Hauptfunktion""" processor = CSVProcessor() try: processor.main_menu() except KeyboardInterrupt: print("\n\nProgramm wurde durch Benutzer beendet.") sys.exit(0) except Exception as e: print(f"\nFehler: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": main()