1547 lines
66 KiB
Python
1547 lines
66 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
CSV-Processor v2.0: Erweiterte Verarbeitung und Formatierung von CSV/Excel-Dateien
|
||
mit Voreinstellungsverwaltung und Multi-Format-Export
|
||
"""
|
||
|
||
import csv
|
||
import json
|
||
import os
|
||
import sys
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Any, Tuple
|
||
import re
|
||
|
||
# Optional: Externe Bibliotheken für Excel/ODT Support
|
||
try:
|
||
import openpyxl
|
||
from openpyxl.styles import Font, PatternFill
|
||
EXCEL_SUPPORT = True
|
||
except ImportError:
|
||
EXCEL_SUPPORT = False
|
||
|
||
try:
|
||
from odf.opendocument import OpenDocumentSpreadsheet
|
||
from odf.table import Table, TableRow, TableCell
|
||
from odf.text import P
|
||
ODT_SUPPORT = True
|
||
except ImportError:
|
||
ODT_SUPPORT = False
|
||
|
||
|
||
class CSVProcessor:
|
||
def __init__(self):
|
||
self.config_dir = Path("csv_processor_config")
|
||
self.config_dir.mkdir(exist_ok=True)
|
||
self.current_config = {}
|
||
|
||
def clear_screen(self):
    """Clear the terminal by running the platform's clear command."""
    # Windows shells use ``cls``; every other platform is given ``clear``.
    command = 'clear'
    if os.name == 'nt':
        command = 'cls'
    os.system(command)
|
||
|
||
def print_header(self, text: str):
    """Print ``text`` framed above and below by a 60-character rule."""
    rule = "=" * 60
    # One call emits exactly what three separate prints would: a blank line,
    # the rule, the title, the rule, and a trailing blank line.
    print(f"\n{rule}\n {text}\n{rule}\n")
|
||
|
||
def get_available_presets(self) -> List[str]:
    """Return the names (file stems) of all ``*.json`` files in the config directory.

    The order is whatever ``Path.glob`` yields; it is not sorted.
    """
    return [preset_path.stem for preset_path in self.config_dir.glob("*.json")]
|
||
|
||
def load_preset(self, preset_name: str) -> Dict:
    """Load a stored preset from the config directory.

    Args:
        preset_name: Preset name without the ``.json`` suffix.

    Returns:
        The parsed JSON content of the preset file, or an empty dict when
        the file does not exist or does not contain valid JSON.
    """
    preset_file = self.config_dir / f"{preset_name}.json"
    try:
        with open(preset_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError:
        # A damaged preset must not abort the program: report it the same
        # way load_column_mappings reports a broken mapping file and fall
        # back to "no settings".
        print(f"Fehler beim Lesen der Voreinstellung: {preset_file}")
        return {}
|
||
|
||
def save_preset(self, preset_name: str, config: Dict):
    """Write ``config`` as indented UTF-8 JSON to ``<preset_name>.json`` and confirm on stdout.

    An existing preset of the same name is overwritten.
    """
    target = self.config_dir / f"{preset_name}.json"
    with target.open('w', encoding='utf-8') as handle:
        # ensure_ascii=False keeps umlauts readable in the stored file.
        json.dump(config, handle, indent=2, ensure_ascii=False)
    print(f"\n✓ Voreinstellung '{preset_name}' gespeichert.")
|
||
|
||
def delete_preset(self, preset_name: str):
    """Remove ``<preset_name>.json`` from the config directory and report the outcome on stdout."""
    target = self.config_dir / f"{preset_name}.json"
    if not target.exists():
        print(f"\n✗ Voreinstellung '{preset_name}' nicht gefunden.")
        return
    target.unlink()
    print(f"\n✓ Voreinstellung '{preset_name}' gelöscht.")
|
||
|
||
def ask_yes_no(self, question: str, default: Optional[bool] = None) -> bool:
    """Ask a yes/no question on the console until a usable answer is given.

    Accepts 'j', 'ja', 'y', 'yes' for yes and 'n', 'nein', 'no' for no
    (case-insensitive). An empty answer returns ``default`` when one was
    supplied; otherwise the question is repeated.
    """
    yes_words = ('j', 'ja', 'y', 'yes')
    no_words = ('n', 'nein', 'no')
    has_default = default is not None

    if has_default:
        question += f" (Standard: {'Ja' if default else 'Nein'})"
    prompt = f"{question} [j/n]: "

    while True:
        reply = input(prompt).lower().strip()
        if reply in yes_words:
            return True
        if reply in no_words:
            return False
        if has_default and reply == '':
            return default
        print("Bitte antworten Sie mit 'j' für Ja oder 'n' für Nein.")
|
||
|
||
def select_file(self, prompt: str, extension: str = "*") -> Optional[Path]:
    """Prompt for a path until an existing one is entered or the user gives up.

    Returns the entered path as a ``Path`` when it exists, or ``None`` when
    the input is empty or the user declines to retry. The ``extension``
    parameter is accepted but not used by this method.
    """
    while True:
        entered = input(f"{prompt}: ").strip()
        if not entered:
            return None

        candidate = Path(entered)
        if candidate.exists():
            return candidate

        print(f"Datei '{entered}' nicht gefunden. Bitte erneut versuchen.")
        if not self.ask_yes_no("Möchten Sie es erneut versuchen?", True):
            return None
|
||
|
||
def load_column_mappings(self, mapping_file: Optional[Path]) -> Dict[str, str]:
    """Read column-name mappings from a JSON file.

    Returns the parsed JSON content, or an empty dict when no file was
    given, the file does not exist, or it contains invalid JSON (the
    latter is also reported on stdout).
    """
    if mapping_file and mapping_file.exists():
        try:
            with mapping_file.open('r', encoding='utf-8') as handle:
                return json.load(handle)
        except json.JSONDecodeError:
            print(f"Fehler beim Lesen der Mapping-Datei: {mapping_file}")
    return {}
|
||
|
||
def detect_csv_format(self, filepath: Path) -> Tuple[str, str, str]:
    """Guess encoding, delimiter and quoting style of a CSV file.

    The first 2048 characters are read with each candidate encoding in
    turn ('utf-8', 'latin-1', 'cp1252') and handed to ``csv.Sniffer``.
    The first combination that both decodes and sniffs wins.

    Args:
        filepath: CSV file to inspect.

    Returns:
        ``(encoding, delimiter, quoting)`` with ``quoting`` being one of
        'ALLE', 'MINIMAL', 'NICHT_NUMERISCH' or 'KEINE'. When no dialect
        can be sniffed, the delimiter defaults to ';' and the quoting to
        'MINIMAL'; the encoding is then the first candidate that could at
        least decode the sample, or 'utf-8' if the file was unreadable.
    """
    quoting_names = {
        csv.QUOTE_ALL: 'ALLE',
        csv.QUOTE_MINIMAL: 'MINIMAL',
        csv.QUOTE_NONNUMERIC: 'NICHT_NUMERISCH',
    }
    readable_encoding = None

    for encoding in ('utf-8', 'latin-1', 'cp1252'):
        try:
            with open(filepath, 'r', encoding=encoding) as f:
                sample = f.read(2048)
        except (OSError, UnicodeError):
            # Unreadable with this encoding (or not readable at all).
            continue

        # Remember the first encoding that decodes, so the fallback below
        # never suggests an encoding already known to fail.
        if readable_encoding is None:
            readable_encoding = encoding

        try:
            dialect = csv.Sniffer().sniff(sample)
        except csv.Error:
            continue

        return encoding, dialect.delimiter, quoting_names.get(dialect.quoting, 'KEINE')

    return readable_encoding or 'utf-8', ';', 'MINIMAL'
|
||
|
||
def parse_csv_line(self, line: str, delimiter: str, quoting: str) -> List[str]:
    """Parse a single CSV line with the given delimiter and quoting mode.

    Args:
        line: Raw text of one line.
        delimiter: Field separator.
        quoting: 'ALLE', 'MINIMAL', 'NICHT_NUMERISCH' or 'KEINE'; any
            other value is treated as 'MINIMAL'.

    Returns:
        The parsed fields. If the ``csv`` module rejects the line or the
        settings, the line is split on ``delimiter`` as a best-effort
        fallback.
    """
    quoting_map = {
        'ALLE': csv.QUOTE_ALL,
        'MINIMAL': csv.QUOTE_MINIMAL,
        'NICHT_NUMERISCH': csv.QUOTE_NONNUMERIC,
        'KEINE': csv.QUOTE_NONE,
    }
    csv_quoting = quoting_map.get(quoting, csv.QUOTE_MINIMAL)

    try:
        return next(csv.reader([line], delimiter=delimiter, quoting=csv_quoting))
    except (csv.Error, ValueError, TypeError, StopIteration):
        # csv.Error: malformed line; ValueError: QUOTE_NONNUMERIC met an
        # unquoted non-number; TypeError: invalid delimiter. Interrupts
        # such as Ctrl+C are deliberately no longer swallowed here.
        return line.split(delimiter)
|
||
|
||
def configure_csv_import(self, filepath: Path) -> Tuple[str, str, str]:
|
||
"""Interaktive CSV-Import-Konfiguration"""
|
||
detected_encoding, detected_delimiter, detected_quoting = self.detect_csv_format(filepath)
|
||
|
||
delimiter_names = {
|
||
';': 'Semikolon (;)',
|
||
',': 'Komma (,)',
|
||
'\t': 'Tab',
|
||
'|': 'Pipe (|)',
|
||
' ': 'Leerzeichen'
|
||
}
|
||
|
||
with open(filepath, 'r', encoding=detected_encoding) as f:
|
||
test_lines = [f.readline().strip() for _ in range(3)]
|
||
|
||
print("\n" + "="*70)
|
||
print(" CSV-IMPORT KONFIGURATION")
|
||
print("="*70)
|
||
print(f"\nDatei: {filepath.name}")
|
||
print(f"Erkanntes Encoding: {detected_encoding}")
|
||
print(f"Erkannter Delimiter: {delimiter_names.get(detected_delimiter, detected_delimiter)}")
|
||
print(f"Erkanntes Quoting: {detected_quoting}")
|
||
|
||
current_encoding = detected_encoding
|
||
current_delimiter = detected_delimiter
|
||
current_quoting = detected_quoting
|
||
|
||
while True:
|
||
print("\n" + "-"*70)
|
||
print(" VORSCHAU DER ERSTEN ZEILEN:")
|
||
print("-"*70)
|
||
|
||
for i, line in enumerate(test_lines, 1):
|
||
if line:
|
||
parsed = self.parse_csv_line(line, current_delimiter, current_quoting)
|
||
print(f"\nZeile {i}:")
|
||
print(f" Rohtext: {line[:80]}...")
|
||
print(f" Geparst: {len(parsed)} Felder")
|
||
for j, field in enumerate(parsed[:5], 1):
|
||
print(f" Feld {j}: '{field}'")
|
||
if len(parsed) > 5:
|
||
print(f" ... und {len(parsed)-5} weitere Felder")
|
||
|
||
print("\n" + "-"*70)
|
||
print(" AKTUELLE EINSTELLUNGEN:")
|
||
print("-"*70)
|
||
print(f" 1. Encoding: {current_encoding}")
|
||
print(f" 2. Delimiter: {delimiter_names.get(current_delimiter, current_delimiter)}")
|
||
print(f" 3. Quoting: {current_quoting}")
|
||
print("\n 0. ✓ Diese Einstellungen verwenden")
|
||
print("-"*70)
|
||
|
||
choice = input("\nWas möchten Sie ändern? [0-3]: ").strip()
|
||
|
||
if choice == '0':
|
||
break
|
||
elif choice == '1':
|
||
print("\nVerfügbare Encodings:")
|
||
encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
|
||
for i, enc in enumerate(encodings, 1):
|
||
print(f" {i}. {enc}")
|
||
enc_choice = input("Ihre Wahl: ").strip()
|
||
try:
|
||
idx = int(enc_choice) - 1
|
||
if 0 <= idx < len(encodings):
|
||
current_encoding = encodings[idx]
|
||
with open(filepath, 'r', encoding=current_encoding) as f:
|
||
test_lines = [f.readline().strip() for _ in range(3)]
|
||
except:
|
||
pass
|
||
|
||
elif choice == '2':
|
||
print("\nVerfügbare Delimiters:")
|
||
print(" 1. Semikolon (;)")
|
||
print(" 2. Komma (,)")
|
||
print(" 3. Tab")
|
||
print(" 4. Pipe (|)")
|
||
print(" 5. Leerzeichen")
|
||
delim_choice = input("Ihre Wahl: ").strip()
|
||
delim_map = {'1': ';', '2': ',', '3': '\t', '4': '|', '5': ' '}
|
||
if delim_choice in delim_map:
|
||
current_delimiter = delim_map[delim_choice]
|
||
|
||
elif choice == '3':
|
||
print("\nQuoting-Optionen:")
|
||
print(" 1. MINIMAL - Nur Felder mit Sonderzeichen werden quoted")
|
||
print(" 2. ALLE - Alle Felder werden in Anführungszeichen gesetzt")
|
||
print(" 3. NICHT_NUMERISCH - Alle nicht-numerischen Felder werden quoted")
|
||
print(" 4. KEINE - Keine Anführungszeichen")
|
||
quot_choice = input("Ihre Wahl: ").strip()
|
||
quot_map = {'1': 'MINIMAL', '2': 'ALLE', '3': 'NICHT_NUMERISCH', '4': 'KEINE'}
|
||
if quot_choice in quot_map:
|
||
current_quoting = quot_map[quot_choice]
|
||
|
||
print("\n✓ Import-Einstellungen bestätigt")
|
||
return current_encoding, current_delimiter, current_quoting
|
||
|
||
def read_csv(self, filepath: Path, has_header: Optional[bool] = None,
             encoding: str = None, delimiter: str = None, quoting: str = None) -> Tuple[List[str], List[Dict], bool, Dict]:
    """Read a CSV file into a list of row dicts.

    Args:
        filepath: CSV file to read.
        has_header: Whether the first row holds column names. ``None``
            asks the user.
        encoding: Text encoding. If this or ``delimiter`` is ``None``, all
            three import settings are determined interactively via
            ``configure_csv_import``.
        delimiter: Field separator.
        quoting: 'ALLE', 'MINIMAL', 'NICHT_NUMERISCH' or 'KEINE'; any
            other value is treated as 'MINIMAL'.

    Returns:
        ``(headers, data, has_header, import_settings)``. Without a header
        row, columns are named ``Spalte_1`` .. ``Spalte_n`` after the width
        of the first row. Missing fields are filled with ``''``; fields
        beyond the header width are dropped.

    Raises:
        Exception: any error while opening or parsing the file is printed
            and re-raised unchanged.
    """
    if encoding is None or delimiter is None:
        encoding, delimiter, quoting = self.configure_csv_import(filepath)

    quoting_map = {
        'ALLE': csv.QUOTE_ALL,
        'MINIMAL': csv.QUOTE_MINIMAL,
        'NICHT_NUMERISCH': csv.QUOTE_NONNUMERIC,
        'KEINE': csv.QUOTE_NONE,
    }
    csv_quoting = quoting_map.get(quoting, csv.QUOTE_MINIMAL)

    try:
        # newline='' leaves line-ending handling to the csv module, as its
        # documentation requires for files passed to a reader.
        with open(filepath, 'r', encoding=encoding, newline='') as f:
            if has_header is None:
                has_header = self.ask_yes_no("\nHat die CSV-Datei eine Kopfzeile mit Spaltennamen?", True)

            if has_header:
                # restval='' keeps short rows free of None values, which
                # later string operations could not handle.
                reader = csv.DictReader(f, delimiter=delimiter, quoting=csv_quoting, restval='')
                # fieldnames is None for an empty file.
                headers = list(reader.fieldnames or [])
                data = []
                for row in reader:
                    # Surplus fields are collected under the key None; drop
                    # them, as the header-less branch below also does.
                    row.pop(None, None)
                    data.append(row)
            else:
                rows = list(csv.reader(f, delimiter=delimiter, quoting=csv_quoting))
                headers = [f"Spalte_{i+1}" for i in range(len(rows[0]))] if rows else []
                data = [
                    {name: (row[i] if i < len(row) else '') for i, name in enumerate(headers)}
                    for row in rows
                ]

        import_settings = {
            'encoding': encoding,
            'delimiter': delimiter,
            'quoting': quoting,
        }
        return headers, data, has_header, import_settings
    except Exception as e:
        print(f"Fehler beim Lesen der CSV: {e}")
        raise
|
||
|
||
def read_excel(self, filepath: Path) -> Tuple[Optional[List[str]], List[Dict], bool, Dict]:
    """Read one worksheet of an Excel workbook into a list of row dicts.

    If the workbook has several sheets the user picks one by number. The
    user is then asked whether the first row holds column names; without
    a header row, columns are named ``Spalte_1`` .. ``Spalte_n``. All cell
    values are converted to strings, empty cells become ``''``.

    Returns:
        ``(headers, data, has_header, import_settings)`` where
        ``import_settings`` is always an empty dict. ``(None, [], False,
        {})`` is returned when openpyxl is unavailable or any error occurs
        while reading (the error is printed).
    """
    if not EXCEL_SUPPORT:
        print("Fehler: openpyxl ist nicht installiert. Installieren Sie es mit: pip install openpyxl")
        return None, [], False, {}

    try:
        wb = openpyxl.load_workbook(filepath, data_only=True)
        sheet_names = wb.sheetnames

        if len(sheet_names) <= 1:
            ws = wb.active
        else:
            print("\nVerfügbare Tabellenblätter:")
            for number, name in enumerate(sheet_names, 1):
                print(f" {number}. {name}")

            ws = None
            while ws is None:
                choice = input("Nummer des Tabellenblatts: ").strip()
                try:
                    idx = int(choice) - 1
                except ValueError:
                    idx = -1  # treated like any other out-of-range number
                if 0 <= idx < len(sheet_names):
                    ws = wb[sheet_names[idx]]
                else:
                    print("Ungültige Eingabe.")

        has_header = self.ask_yes_no("Hat die Datei eine Kopfzeile mit Spaltennamen?", True)

        rows = ws.iter_rows(values_only=True)
        headers = None
        if has_header:
            first_row = next(rows, None)
            if first_row is not None:
                # Empty header cells receive a positional placeholder name.
                headers = [str(cell) if cell is not None else f"Spalte_{j+1}"
                           for j, cell in enumerate(first_row)]

        data = []
        for row in rows:
            if headers is None:
                headers = [f"Spalte_{j+1}" for j in range(len(row))]
            # zip stops at the shorter side, so cells beyond the header
            # width are ignored.
            data.append({name: ('' if cell is None else str(cell))
                         for name, cell in zip(headers, row)})

        import_settings = {}
        return headers, data, has_header, import_settings

    except Exception as e:
        print(f"Fehler beim Lesen der Excel-Datei: {e}")
        return None, [], False, {}
|
||
|
||
def apply_column_mappings(self, headers: List[str], data: List[Dict], mappings: Dict[str, str]) -> Tuple[List[str], List[Dict], Dict[str, str]]:
    """Rename columns according to ``mappings`` and remember the original names.

    Headers without a mapping keep their name. Rows are rebuilt with the
    new keys; a value missing from a row becomes ``''``.

    Returns:
        ``(new_headers, new_data, original_names)`` where
        ``original_names`` maps each new header to its original header.
    """
    renamed = [mappings.get(name, name) for name in headers]
    pairs = list(zip(headers, renamed))

    original_names = {new: old for old, new in pairs}
    new_data = [{new: row.get(old, '') for old, new in pairs} for row in data]

    return renamed, new_data, original_names
|
||
|
||
def remove_empty_columns(self, headers: List[str], data: List[Dict]) -> Tuple[List[str], List[Dict]]:
    """Drop every column that has no non-blank value in any row.

    A value counts as blank when it is ``None`` or its string form is
    empty after stripping whitespace, so ``None`` and non-string values
    no longer raise.

    Returns:
        ``(kept_headers, filtered_data)``; each filtered row contains
        exactly the kept headers, with ``''`` for keys missing in the row.
    """
    def is_filled(value) -> bool:
        return value is not None and bool(str(value).strip())

    kept_headers = [
        header for header in headers
        if any(is_filled(row.get(header)) for row in data)
    ]
    filtered_data = [{h: row.get(h, '') for h in kept_headers} for row in data]

    return kept_headers, filtered_data
|
||
|
||
def remove_empty_rows(self, data: List[Dict]) -> List[Dict]:
    """Return only the rows that contain at least one non-blank value.

    A value is blank when it is ``None`` or its string form is empty
    after stripping whitespace. ``None`` is checked explicitly because
    ``str(None)`` would otherwise read as the non-empty text 'None'.
    """
    return [
        row for row in data
        if any(value is not None and str(value).strip() for value in row.values())
    ]
|
||
|
||
def analyze_row_completeness(self, headers: List[str], data: List[Dict]) -> Dict[int, int]:
    """Count the filled fields of every row.

    A field is filled when its value is not ``None`` and its string form
    is non-empty after stripping whitespace. ``headers`` is accepted but
    not used; all values present in a row are counted.

    Returns:
        A dict mapping the zero-based row index to the number of filled
        fields in that row.
    """
    return {
        idx: sum(1 for value in row.values() if value is not None and str(value).strip())
        for idx, row in enumerate(data)
    }
|
||
|
||
def filter_rows_by_filled_fields(self, headers: List[str], data: List[Dict]) -> List[Dict]:
|
||
"""Zeilen mit zu wenig Informationen filtern"""
|
||
total_columns = len(headers)
|
||
analysis = self.analyze_row_completeness(headers, data)
|
||
|
||
print(f"\nAnalyse der Datenvollständigkeit:")
|
||
print(f"Gesamtanzahl Spalten: {total_columns}")
|
||
print(f"Gesamtanzahl Zeilen: {len(data)}")
|
||
|
||
from collections import Counter
|
||
counts = Counter(analysis.values())
|
||
|
||
print("\nVerteilung ausgefüllter Felder pro Zeile:")
|
||
for filled_count in sorted(counts.keys()):
|
||
print(f" {filled_count} Felder ausgefüllt: {counts[filled_count]} Zeilen")
|
||
|
||
print(f"\nBei wie vielen oder weniger ausgefüllten Feldern sollen Zeilen entfernt werden?")
|
||
while True:
|
||
try:
|
||
threshold = int(input(f"Schwellenwert (0-{total_columns}): ").strip())
|
||
if 0 <= threshold <= total_columns:
|
||
break
|
||
print(f"Bitte eine Zahl zwischen 0 und {total_columns} eingeben.")
|
||
except ValueError:
|
||
print("Ungültige Eingabe.")
|
||
|
||
rows_to_remove = [idx for idx, count in analysis.items() if count <= threshold]
|
||
|
||
if rows_to_remove:
|
||
print(f"\n{len(rows_to_remove)} Zeilen haben {threshold} oder weniger ausgefüllte Felder:")
|
||
for i, idx in enumerate(rows_to_remove[:10]):
|
||
print(f" Zeile {idx + 1}: {analysis[idx]} Felder ausgefüllt")
|
||
sample = {k: v for k, v in data[idx].items() if str(v).strip()}
|
||
print(f" Daten: {sample}")
|
||
|
||
if len(rows_to_remove) > 10:
|
||
print(f" ... und {len(rows_to_remove) - 10} weitere Zeilen")
|
||
|
||
if self.ask_yes_no(f"\nMöchten Sie diese {len(rows_to_remove)} Zeilen löschen?"):
|
||
filtered_data = [row for idx, row in enumerate(data) if idx not in rows_to_remove]
|
||
print(f"✓ {len(rows_to_remove)} Zeilen entfernt")
|
||
return filtered_data
|
||
else:
|
||
print(f"Keine Zeilen mit {threshold} oder weniger ausgefüllten Feldern gefunden.")
|
||
|
||
return data
|
||
|
||
def select_columns(self, headers: List[str], original_names: Dict[str, str] = None,
                   preselected: Optional[List[str]] = None) -> List[str]:
    """Let the user switch export columns on and off in a console menu.

    The menu starts with ``preselected`` if every entry of it is a known
    header, otherwise with all headers selected. Input is either a
    command ('alle', 'keine', 'q') or a comma-separated list of 1-based
    numbers and ranges ('1,3' or '2-5'); a leading '+' selects, a leading
    '-' deselects, no prefix toggles. 'q' is only accepted while at least
    one column is selected.

    Args:
        headers: All available column names, in display order.
        original_names: Optional map from current header to its original
            name; renamed columns are displayed as "original → current".
        preselected: Optional initial selection.

    Returns:
        The selected headers in the order they appear in ``headers``.
    """
    GREEN = '\033[92m'
    RED = '\033[91m'
    RESET = '\033[0m'
    BOLD = '\033[1m'

    column_count = len(headers)
    rule = "=" * 70
    thin_rule = "-" * 70

    use_preselection = bool(preselected) and all(name in headers for name in preselected)
    selected = set(preselected) if use_preselection else set(headers)

    def label_for(header):
        # Show "original → current" only for columns that were renamed.
        original = original_names.get(header, header) if original_names else header
        if original != header:
            return f"{original} {BOLD}→ {header}{RESET}"
        return header

    def render():
        print("\n" + rule)
        print(" SPALTENAUSWAHL")
        print(rule)
        print(f"{GREEN}● = Angewählt{RESET} | {RED}○ = Abgewählt{RESET}")
        print(thin_rule)

        for number, header in enumerate(headers, 1):
            marker = f"{GREEN}●{RESET}" if header in selected else f"{RED}○{RESET}"
            print(f" {marker} {number:3d}. {label_for(header)}")

        print(thin_rule)
        print(f"Aktuell ausgewählt: {len(selected)} von {column_count} Spalten")
        print(rule)

    def to_indices(spec: str) -> set:
        # Turn "1,3-5" into zero-based indices; malformed or out-of-range
        # parts are skipped silently.
        found = set()
        for token in spec.split(','):
            token = token.strip()
            try:
                if '-' in token:
                    first, last = token.split('-')
                    lo = int(first.strip()) - 1
                    hi = int(last.strip()) - 1
                    if 0 <= lo < column_count and 0 <= hi < column_count:
                        found.update(range(lo, hi + 1))
                else:
                    pos = int(token) - 1
                    if 0 <= pos < column_count:
                        found.add(pos)
            except ValueError:
                continue
        return found

    while True:
        render()

        print("\nOptionen:")
        print(" [Nummern] - Spalten an/abwählen (z.B. '1,2,3' oder '1-5,10-15')")
        print(" + [Nummern] - Spalten ANwählen (z.B. '+1,2,3' oder '+10-20')")
        print(" - [Nummern] - Spalten ABwählen (z.B. '-1,2,3' oder '-5-10')")
        print(" alle - Alle Spalten anwählen")
        print(" keine - Alle Spalten abwählen")
        print(" q - Auswahl beenden und fortfahren")

        choice = input("\nIhre Wahl: ").strip()
        command = choice.lower()

        if command == 'q':
            if selected:
                break
            print(f"\n{RED}Fehler: Mindestens eine Spalte muss ausgewählt sein!{RESET}")

        elif command == 'alle':
            selected = set(headers)
            print(f"{GREEN}✓ Alle Spalten angewählt{RESET}")

        elif command == 'keine':
            selected = set()
            print(f"{RED}✓ Alle Spalten abgewählt{RESET}")

        elif choice.startswith('+'):
            picked = to_indices(choice[1:])
            selected.update(headers[pos] for pos in picked)
            print(f"{GREEN}✓ {len(picked)} Spalte(n) angewählt{RESET}")

        elif choice.startswith('-'):
            picked = to_indices(choice[1:])
            selected.difference_update(headers[pos] for pos in picked)
            print(f"{RED}✓ {len(picked)} Spalte(n) abgewählt{RESET}")

        else:
            picked = to_indices(choice)
            for pos in picked:
                name = headers[pos]
                if name in selected:
                    selected.discard(name)
                else:
                    selected.add(name)
            if picked:
                print(f"✓ {len(picked)} Spalte(n) umgeschaltet")

    return [name for name in headers if name in selected]
|
||
|
||
def sort_data(self, data: List[Dict], sort_column: str, data_type: str) -> List[Dict]:
    """Return the rows sorted ascending by ``sort_column``.

    Args:
        data: Rows to sort; the input list is not modified.
        sort_column: Key whose value determines the order.
        data_type: 'datum' (formats dd.mm.yyyy, yyyy-mm-dd, dd/mm/yyyy,
            mm/dd/yyyy), 'zeit' (HH:MM:SS or HH:MM), 'dezimalzahl' (a
            decimal comma is accepted) or anything else for a
            case-insensitive text sort.

    Returns:
        A new sorted list. Values that cannot be parsed sort first: as
        ``datetime.min``, midnight or ``0.0`` respectively. ``None`` and
        non-string values are handled via their string form instead of
        raising.
    """
    date_formats = ('%d.%m.%Y', '%Y-%m-%d', '%d/%m/%Y', '%m/%d/%Y')
    time_formats = ('%H:%M:%S', '%H:%M')

    def parse_first(text, formats):
        # Result of the first matching format, or None if none matches.
        for fmt in formats:
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        return None

    def get_sort_key(row):
        raw = row.get(sort_column, '')
        text = '' if raw is None else str(raw)

        if data_type == 'datum':
            # Surrounding whitespace would defeat strptime.
            parsed = parse_first(text.strip(), date_formats)
            return parsed if parsed is not None else datetime.min

        if data_type == 'zeit':
            parsed = parse_first(text.strip(), time_formats)
            return parsed.time() if parsed is not None else datetime.min.time()

        if data_type == 'dezimalzahl':
            try:
                return float(text.strip().replace(',', '.'))
            except ValueError:
                return 0.0

        return text.lower()

    return sorted(data, key=get_sort_key)
|
||
|
||
def configure_csv_export(self) -> Tuple[str, str]:
    """Ask the user for the delimiter and quoting mode of a CSV export.

    Each question is repeated until one of the listed numbers is entered.

    Returns:
        ``(delimiter, quoting)`` where ``delimiter`` is ';', ',' or a tab
        and ``quoting`` is 'MINIMAL', 'ALLE', 'NICHT_NUMERISCH' or 'KEINE'.
    """
    def pick(prompt, options):
        # Keep asking until the answer is one of the option keys.
        while True:
            answer = input(prompt).strip()
            if answer in options:
                return options[answer]
            print("Ungültige Eingabe.")

    wide_rule = "=" * 70
    thin_rule = "-" * 70

    print("\n" + wide_rule)
    print(" CSV-EXPORT KONFIGURATION")
    print(wide_rule)

    print("\nDelimiter wählen:")
    print(" 1. Semikolon (;) - Standard für deutsche Excel-Versionen")
    print(" 2. Komma (,) - Internationaler Standard")
    print(" 3. Tab - Gut für Import in andere Programme")
    delimiter = pick("Ihre Wahl [1-3]: ", {'1': ';', '2': ',', '3': '\t'})

    print("\nQuoting (Anführungszeichen) wählen:")
    print(" 1. MINIMAL - Nur Felder mit Sonderzeichen (empfohlen)")
    print(" 2. ALLE - Alle Felder in Anführungszeichen")
    print(" 3. NICHT_NUMERISCH - Nur Text-Felder")
    print(" 4. KEINE - Keine Anführungszeichen (kann Probleme verursachen)")
    quoting = pick("Ihre Wahl [1-4]: ",
                   {'1': 'MINIMAL', '2': 'ALLE', '3': 'NICHT_NUMERISCH', '4': 'KEINE'})

    delimiter_names = {';': 'Semikolon', ',': 'Komma', '\t': 'Tab'}
    print("\n" + thin_rule)
    print("Gewählte Einstellungen:")
    print(f" Delimiter: {delimiter_names.get(delimiter, delimiter)}")
    print(f" Quoting: {quoting}")
    print(thin_rule)

    return delimiter, quoting
|
||
|
||
def write_csv(self, filepath: Path, headers: List[str], data: List[Dict],
              header_text: str = "", footer_text: str = "",
              delimiter: str = ';', quoting: str = 'MINIMAL'):
    """Write rows to a UTF-8 CSV file.

    Args:
        filepath: Target file; overwritten if it exists.
        headers: Column names, written as the header row and used to pick
            the values of each row. Row keys not listed here are ignored,
            missing keys are written as empty fields.
        data: Rows as dicts.
        header_text: Optional text written as a ``# ...`` line before the
            header row.
        footer_text: Optional text written as a ``# ...`` line after the
            data.
        delimiter: Field separator.
        quoting: 'ALLE', 'MINIMAL', 'NICHT_NUMERISCH' or 'KEINE'; any
            other value is treated as 'MINIMAL'.
    """
    quoting_map = {
        'ALLE': csv.QUOTE_ALL,
        'MINIMAL': csv.QUOTE_MINIMAL,
        'NICHT_NUMERISCH': csv.QUOTE_NONNUMERIC,
        'KEINE': csv.QUOTE_NONE,
    }
    csv_quoting = quoting_map.get(quoting, csv.QUOTE_MINIMAL)

    writer_options = {
        'fieldnames': headers,
        'delimiter': delimiter,
        'quoting': csv_quoting,
        'quotechar': '"',
        # Rows may still carry columns the caller chose not to export;
        # skip them instead of raising ValueError.
        'extrasaction': 'ignore',
    }
    if csv_quoting == csv.QUOTE_NONE:
        # Without quotes the writer needs an escape character, otherwise
        # any value containing the delimiter raises csv.Error.
        writer_options['escapechar'] = '\\'

    with open(filepath, 'w', encoding='utf-8', newline='') as f:
        if header_text:
            f.write(f"# {header_text}\n")

        writer = csv.DictWriter(f, **writer_options)
        writer.writeheader()
        writer.writerows(data)

        if footer_text:
            f.write(f"# {footer_text}\n")

    print(f"✓ CSV-Datei erfolgreich gespeichert: {filepath}")
|
||
|
||
def write_excel(self, filepath: Path, headers: List[str], data: List[Dict],
                header_text: str = "", footer_text: str = ""):
    """Write rows to an XLSX workbook with a single sheet named "Daten".

    Layout: optional italic ``# header_text`` line, a bold grey header
    row, the data rows, and an optional italic ``# footer_text`` line.
    Column widths are fitted to the longest cell text, capped at 50.
    Prints an error and returns without writing when openpyxl is missing.

    Args:
        filepath: Target file.
        headers: Column names in output order.
        data: Rows as dicts; missing keys are written as ``''``.
        header_text: Optional note placed above the header row.
        footer_text: Optional note placed below the data.
    """
    if not EXCEL_SUPPORT:
        print("Fehler: openpyxl ist nicht installiert. Installieren Sie es mit: pip install openpyxl")
        return

    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Daten"

    row_num = 1

    if header_text:
        ws.cell(row=row_num, column=1, value=f"# {header_text}")
        ws.cell(row=row_num, column=1).font = Font(italic=True)
        row_num += 1

    for col_num, header in enumerate(headers, 1):
        cell = ws.cell(row=row_num, column=col_num, value=header)
        cell.font = Font(bold=True)
        cell.fill = PatternFill(start_color="CCCCCC", end_color="CCCCCC", fill_type="solid")
    row_num += 1

    for row_data in data:
        for col_num, header in enumerate(headers, 1):
            ws.cell(row=row_num, column=col_num, value=row_data.get(header, ''))
        row_num += 1

    if footer_text:
        ws.cell(row=row_num, column=1, value=f"# {footer_text}")
        ws.cell(row=row_num, column=1).font = Font(italic=True)

    for column in ws.columns:
        column_letter = column[0].column_letter
        # Measure the text form of every non-empty cell so that numbers
        # count too and empty cells are not measured as the text "None".
        max_length = max(
            (len(str(cell.value)) for cell in column if cell.value is not None),
            default=0,
        )
        ws.column_dimensions[column_letter].width = min(max_length + 2, 50)

    wb.save(filepath)
    print(f"✓ Excel-Datei erfolgreich gespeichert: {filepath}")
|
||
|
||
def write_ods(self, filepath: Path, headers: List[str], data: List[Dict],
              header_text: str = "", footer_text: str = ""):
    """Write rows to an OpenDocument spreadsheet with one table named "Daten".

    Layout: optional ``# header_text`` row, the header row, one row per
    data dict (values converted with ``str``, missing keys as ``''``) and
    an optional ``# footer_text`` row. Prints an error and returns without
    writing when odfpy is missing.
    """
    if not ODT_SUPPORT:
        print("Fehler: odfpy ist nicht installiert. Installieren Sie es mit: pip install odfpy")
        return

    def build_row(texts):
        # One table row holding a text cell per entry.
        table_row = TableRow()
        for text in texts:
            table_cell = TableCell()
            table_cell.addElement(P(text=text))
            table_row.addElement(table_cell)
        return table_row

    doc = OpenDocumentSpreadsheet()
    table = Table(name="Daten")

    if header_text:
        table.addElement(build_row([f"# {header_text}"]))

    table.addElement(build_row(headers))

    for row_data in data:
        table.addElement(build_row([str(row_data.get(header, '')) for header in headers]))

    if footer_text:
        table.addElement(build_row([f"# {footer_text}"]))

    doc.spreadsheet.addElement(table)
    doc.save(filepath)
    print(f"✓ ODS-Datei erfolgreich gespeichert: {filepath}")
|
||
|
||
def select_output_format(self) -> str:
    """Ask the user for the export format and return its key.

    CSV is always offered; XLSX and ODS are listed only when the matching
    support flag is set. The answer may be the menu number or the format
    key itself ('csv', 'xlsx', 'ods'); the question repeats until it is
    valid.
    """
    formats = ["csv"]
    print("\nVerfügbare Ausgabeformate:")
    print(" 1. CSV")

    optional_formats = (
        (EXCEL_SUPPORT, "xlsx", "Excel (XLSX)"),
        (ODT_SUPPORT, "ods", "OpenDocument (ODS)"),
    )
    for available, key, label in optional_formats:
        if available:
            formats.append(key)
            # The menu number equals the format's position in the list.
            print(f" {len(formats)}. {label}")

    while True:
        choice = input("Ihre Wahl: ").strip()
        try:
            idx = int(choice) - 1
        except ValueError:
            if choice.lower() in formats:
                return choice.lower()
        else:
            if 0 <= idx < len(formats):
                return formats[idx]
        print("Ungültige Eingabe.")
|
||
|
||
def manage_presets(self):
|
||
"""Voreinstellungen verwalten"""
|
||
while True:
|
||
self.clear_screen()
|
||
print("╔═══════════════════════════════════════════════════════╗")
|
||
print("║ VOREINSTELLUNGEN VERWALTEN ║")
|
||
print("╚═══════════════════════════════════════════════════════╝")
|
||
|
||
presets = self.get_available_presets()
|
||
|
||
if presets:
|
||
print("\nVerfügbare Voreinstellungen:")
|
||
for i, preset in enumerate(presets, 1):
|
||
print(f" {i}. {preset}")
|
||
else:
|
||
print("\n⚠ Keine Voreinstellungen vorhanden.")
|
||
|
||
print("\nOptionen:")
|
||
print(" 1. Voreinstellung bearbeiten")
|
||
print(" 2. Voreinstellung löschen")
|
||
print(" 0. Zurück zum Hauptmenü")
|
||
|
||
choice = input("\nIhre Wahl: ").strip()
|
||
|
||
if choice == '0':
|
||
return
|
||
|
||
elif choice == '1' and presets:
|
||
while True:
|
||
preset_choice = input("\nNummer oder Name der Voreinstellung: ").strip()
|
||
|
||
try:
|
||
idx = int(preset_choice) - 1
|
||
if 0 <= idx < len(presets):
|
||
preset_name = presets[idx]
|
||
else:
|
||
print("Ungültige Nummer.")
|
||
continue
|
||
except ValueError:
|
||
preset_name = preset_choice
|
||
|
||
if preset_name in presets:
|
||
self.edit_preset(preset_name)
|
||
break
|
||
else:
|
||
print(f"Voreinstellung '{preset_name}' nicht gefunden.")
|
||
|
||
elif choice == '2' and presets:
|
||
while True:
|
||
preset_choice = input("\nNummer oder Name der zu löschenden Voreinstellung: ").strip()
|
||
|
||
try:
|
||
idx = int(preset_choice) - 1
|
||
if 0 <= idx < len(presets):
|
||
preset_name = presets[idx]
|
||
else:
|
||
print("Ungültige Nummer.")
|
||
continue
|
||
except ValueError:
|
||
preset_name = preset_choice
|
||
|
||
if preset_name in presets:
|
||
if self.ask_yes_no(f"Voreinstellung '{preset_name}' wirklich löschen?"):
|
||
self.delete_preset(preset_name)
|
||
input("\nDrücken Sie Enter um fortzufahren...")
|
||
break
|
||
else:
|
||
print(f"Voreinstellung '{preset_name}' nicht gefunden.")
|
||
|
||
def edit_preset(self, preset_name: str):
    """Walk through the settings of a stored preset and save the result.

    Loads the preset, passes it through ``configure_settings`` in edit
    mode and then asks whether to overwrite the preset, save under a new
    name, or discard the changes. The question repeats until one of the
    three options is chosen; an empty new name counts as no choice.
    """
    config = self.load_preset(preset_name)

    print(f"\n✓ Voreinstellung '{preset_name}' geladen.")
    print("\nSie können nun die Einstellungen Schritt für Schritt durchgehen und ändern.")
    print("Die aktuellen Werte werden als Vorschläge angezeigt.")
    input("\nDrücken Sie Enter um zu beginnen...")

    config = self.configure_settings(config, edit_mode=True)

    self.clear_screen()
    print("╔═══════════════════════════════════════════════════════╗")
    print("║ VOREINSTELLUNG SPEICHERN ║")
    print("╚═══════════════════════════════════════════════════════╝")

    print("\nMöchten Sie die Änderungen speichern?")
    print(f" 1. Unter gleichem Namen überschreiben ('{preset_name}')")
    print(" 2. Als neue Voreinstellung speichern")
    print(" 3. Änderungen verwerfen")

    while True:
        save_choice = input("\nIhre Wahl [1-3]: ").strip()

        if save_choice == '1':
            self.save_preset(preset_name, config)
        elif save_choice == '2':
            new_name = input("\nName für die neue Voreinstellung: ").strip()
            if not new_name:
                continue  # no name given: ask again
            self.save_preset(new_name, config)
        elif save_choice == '3':
            print("\nÄnderungen verworfen.")
        else:
            continue

        # All three valid outcomes end with the same pause.
        input("\nDrücken Sie Enter um fortzufahren...")
        return
|
||
|
||
def configure_settings(self, config: Dict = None, edit_mode: bool = False) -> Dict:
    """Walk the user through all preset settings and return the config.

    Used both for creating a new preset and for editing an existing one.
    Each section shows the currently stored value (if any) and then asks
    for the new value. The passed dictionary is updated in place and also
    returned.

    Args:
        config: Existing settings to start from; a new dict is created
            when ``None``.
        edit_mode: Accepted for callers that edit an existing preset; this
            function does not read it.

    Returns:
        The updated configuration dictionary.
    """
    if config is None:
        config = {}

    def section(title):
        # Section banner: rule, indented title, rule.
        print("\n" + "="*60)
        print(f" {title}")
        print("="*60)

    def ask_flag(title, key, question):
        # One yes/no section: show the stored value, then store the answer.
        section(title)
        current = config.get(key)
        if current is not None:
            print(f"Aktuell: {'Ja' if current else 'Nein'}")
        config[key] = self.ask_yes_no(question, current)

    def pick_columns_from_example(preselected):
        # Load an example file, apply the configured mapping (if the mapping
        # file exists) and let the user choose columns from its headers.
        print("\nBitte laden Sie eine Beispieldatei, um Spalten auszuwählen:")
        example_file = self.select_file("Pfad zur Beispieldatei (CSV/Excel)")
        if not example_file:
            return

        try:
            if example_file.suffix.lower() in ['.xlsx', '.xls']:
                ex_headers, ex_data, _, _ = self.read_excel(example_file)
            else:
                ex_headers, ex_data, _, _ = self.read_csv(example_file)

            if ex_headers is None:
                # The caller in run() treats None headers as a failed read;
                # report that instead of a TypeError message.
                print("Fehler beim Laden der Beispieldatei.")
                return

            ex_original_names = {h: h for h in ex_headers}
            if config.get('mapping_file'):
                mapping_path = Path(config['mapping_file'])
                if mapping_path.exists():
                    mappings = self.load_column_mappings(mapping_path)
                    ex_headers, ex_data, ex_original_names = self.apply_column_mappings(
                        ex_headers, ex_data, mappings
                    )

            config['selected_columns'] = self.select_columns(
                ex_headers, ex_original_names, preselected
            )
        except Exception as e:
            # Deliberate best effort: a broken example file must not abort
            # the whole configuration dialog.
            print(f"Fehler beim Laden der Beispieldatei: {e}")

    self.clear_screen()
    print("╔═══════════════════════════════════════════════════════╗")
    print("║ EINSTELLUNGEN KONFIGURIEREN ║")
    print("╚═══════════════════════════════════════════════════════╝")

    # 1. Header row present?
    ask_flag("1. Kopfzeile", 'has_header', "Hat die Datei normalerweise eine Kopfzeile?")

    # 2. Mapping file
    section("2. Spaltennamen-Mapping")
    current_mapping = config.get('mapping_file')
    if current_mapping:
        print(f"Aktuell: {current_mapping}")

    if self.ask_yes_no("Möchten Sie eine Mapping-Datei verwenden?", current_mapping is not None):
        mapping_file = self.select_file("Pfad zur Mapping-JSON-Datei")
        if mapping_file:
            config['mapping_file'] = str(mapping_file)
    else:
        config['mapping_file'] = None

    # 3. Remove empty rows
    ask_flag("3. Leere Zeilen", 'remove_empty_rows', "Leere Zeilen entfernen?")

    # 4. Filter incomplete rows
    ask_flag("4. Unvollständige Zeilen", 'filter_incomplete_rows',
             "Zeilen mit zu wenig Daten analysieren?")

    # 5. Remove empty columns
    ask_flag("5. Leere Spalten", 'remove_empty', "Leere Spalten entfernen?")

    # 6. Column selection
    section("6. Spaltenauswahl")
    stored_columns = config.get('selected_columns')
    if stored_columns:
        print(f"Aktuell gespeicherte Spalten: {len(stored_columns)}")
        print("\nGespeicherte Spalten:")
        for i, col in enumerate(stored_columns, 1):
            print(f" {i}. {col}")

        if self.ask_yes_no("\nMöchten Sie die Spaltenauswahl ändern?"):
            pick_columns_from_example(stored_columns)
    else:
        print("Keine Spalten gespeichert.")
        if self.ask_yes_no("Möchten Sie Spalten auswählen?"):
            pick_columns_from_example(None)

    # 7. Header and footer text
    section("7. Kopf- und Fußzeilen")
    current = config.get('add_header_footer')
    if current is not None:
        print(f"Aktuell: {'Ja' if current else 'Nein'}")
        if current:
            print(f" Kopfzeile: {config.get('header_text', '')}")
            print(f" Fußzeile: {config.get('footer_text', '')}")

    if self.ask_yes_no("Kopf-/Fußzeile hinzufügen?", current):
        header_text = input("Kopfzeile (Enter für keine): ").strip()
        footer_text = input("Fußzeile (Enter für keine): ").strip()
        config['add_header_footer'] = True
        config['header_text'] = header_text
        config['footer_text'] = footer_text
    else:
        config['add_header_footer'] = False

    # 8. Sorting
    section("8. Sortierung")
    current_sort = config.get('sort_column')
    if current_sort:
        print(f"Aktuell: Nach '{current_sort}' ({config.get('sort_type', 'string')})")

    if self.ask_yes_no("Daten sortieren?", current_sort is not None):
        sort_column = input("Spaltenname für Sortierung: ").strip()
        if sort_column:
            print("\nDatentyp für Sortierung:")
            print(" 1. Text (string)")
            print(" 2. Datum")
            print(" 3. Zeit")
            print(" 4. Dezimalzahl")

            type_map = {'1': 'string', '2': 'datum', '3': 'zeit', '4': 'dezimalzahl'}
            data_type = type_map.get(input("Ihre Wahl [1-4]: ").strip(), 'string')

            config['sort_column'] = sort_column
            config['sort_type'] = data_type
    else:
        config['sort_column'] = None

    # 9. Output format
    section("9. Ausgabeformat")
    # "or" instead of a .get() default: a stored None must not crash .upper().
    current_format = config.get('output_format') or 'csv'
    print(f"Aktuell: {current_format.upper()}")

    formats = ["csv"]
    print("\nVerfügbare Ausgabeformate:")
    print(" 1. CSV")

    # Optional formats are numbered consecutively, depending on which
    # libraries could be imported.
    format_num = 2
    if EXCEL_SUPPORT:
        formats.append("xlsx")
        print(f" {format_num}. Excel (XLSX)")
        format_num += 1

    if ODT_SUPPORT:
        formats.append("ods")
        print(f" {format_num}. OpenDocument (ODS)")

    while True:
        choice = input("Ihre Wahl: ").strip()
        try:
            idx = int(choice) - 1
            if 0 <= idx < len(formats):
                config['output_format'] = formats[idx]
                break
        except ValueError:
            # Not a number: accept the format name itself (e.g. "xlsx").
            if choice.lower() in formats:
                config['output_format'] = choice.lower()
                break
        print("Ungültige Eingabe.")

    # 10. CSV export settings (only when CSV was chosen)
    if config['output_format'] == 'csv':
        section("10. CSV-Export-Einstellungen")

        current_delim = config.get('export_delimiter')
        current_quot = config.get('export_quoting')

        # Compare against None: a stored quoting value may be falsy
        # (csv.QUOTE_MINIMAL is 0) and must still be displayed.
        if current_delim is not None and current_quot is not None:
            delimiter_names = {';': 'Semikolon', ',': 'Komma', '\t': 'Tab'}
            print(f"Aktuell: Delimiter={delimiter_names.get(current_delim, current_delim)}, Quoting={current_quot}")

        if self.ask_yes_no("CSV-Export-Einstellungen konfigurieren?", True):
            export_delimiter, export_quoting = self.configure_csv_export()
            config['export_delimiter'] = export_delimiter
            config['export_quoting'] = export_quoting

    return config
|
||
|
||
def main_menu(self):
    """Display the main menu in a loop and run the selected action.

    The loop only ends through the "0" entry, which exits the process via
    ``sys.exit(0)``.
    """
    banner = (
        "╔═══════════════════════════════════════════════════════╗",
        "║ CSV-PROCESSOR v2.0 - Dateiverarbeitung ║",
        "╚═══════════════════════════════════════════════════════╝",
    )
    rule = "="*60
    # Menu key -> name of the method that handles it.
    handlers = {'1': 'run', '2': 'manage_presets'}

    while True:
        self.clear_screen()
        for line in banner:
            print(line)

        # Feature overview depends on which optional libraries were imported.
        excel_import = ', Excel (XLSX/XLS)' if EXCEL_SUPPORT else ''
        excel_export = ', Excel (XLSX)' if EXCEL_SUPPORT else ''
        ods_export = ', OpenDocument (ODS)' if ODT_SUPPORT else ''

        print("\nVerfügbare Features:")
        print(f" Import: CSV{excel_import}")
        print(f" Export: CSV{excel_export}{ods_export}")

        if not EXCEL_SUPPORT:
            print(" ⚠ Excel-Support: pip install openpyxl")
        if not ODT_SUPPORT:
            print(" ⚠ ODS-Support: pip install odfpy")

        print("\n" + rule)
        print(" HAUPTMENÜ")
        print(rule)
        print(" 1. Neue Datei verarbeiten")
        print(" 2. Voreinstellungen verwalten")
        print(" 0. Beenden")

        choice = input("\nIhre Wahl: ").strip()

        if choice == '0':
            print("\nAuf Wiedersehen!")
            sys.exit(0)

        handler_name = handlers.get(choice)
        if handler_name is None:
            print("\nUngültige Eingabe.")
            input("Drücken Sie Enter um fortzufahren...")
            continue

        getattr(self, handler_name)()
|
||
|
||
def run(self):
    """Run the interactive processing workflow for a single file.

    Steps: optionally load a preset, pick and read the source file, apply
    a column-name mapping, drop empty rows, filter incomplete rows, drop
    empty columns, select columns, set header/footer text, sort, choose the
    output format and target file, write the result, and finally offer to
    save the settings as a preset (only when no preset was loaded).

    When a preset is loaded, every setting it contains is applied without
    asking; settings missing from the preset are asked interactively.
    Returns early (after a pause) when the source file cannot be read.
    """
    def pause_before_menu():
        # The main menu clears the screen; wait so messages stay readable.
        input("\nDrücken Sie Enter um zum Hauptmenü zurückzukehren...")

    self.clear_screen()
    print("╔═══════════════════════════════════════════════════════╗")
    print("║ DATEI VERARBEITEN ║")
    print("╚═══════════════════════════════════════════════════════╝")

    config = {}
    use_preset = False

    # 1. Ask for a preset
    presets = self.get_available_presets()
    if presets:
        self.print_header("Schritt 1: Voreinstellungen")
        print("Verfügbare Voreinstellungen:")
        for i, preset in enumerate(presets, 1):
            print(f" {i}. {preset}")

        if self.ask_yes_no("\nMöchten Sie eine Voreinstellung laden?"):
            while True:
                choice = input("Nummer oder Name der Voreinstellung: ").strip()

                try:
                    idx = int(choice) - 1
                    if 0 <= idx < len(presets):
                        preset_name = presets[idx]
                    else:
                        print("Ungültige Nummer.")
                        continue
                except ValueError:
                    # Not a number: treat the input as a preset name.
                    preset_name = choice

                if preset_name in presets:
                    config = self.load_preset(preset_name)
                    use_preset = True
                    print(f"✓ Voreinstellung '{preset_name}' geladen.")

                    # Show what was loaded
                    if config.get('source_file'):
                        print(f" → Quelldatei vorbelegt: {config['source_file']}")
                    if config.get('mapping_file'):
                        print(f" → Mapping vorbelegt: {config['mapping_file']}")
                    if config.get('selected_columns'):
                        print(f" → Spaltenauswahl: {len(config['selected_columns'])} Spalten")

                    print("\nℹ Leere Spalten werden automatisch übersprungen (vor Spaltenauswahl)")
                    break
                else:
                    print(f"Voreinstellung '{preset_name}' nicht gefunden.")

    # 2. Source file
    self.print_header("Schritt 2: Quelldatei auswählen")

    # Prefer the source file stored in the preset, if it still exists and
    # the user confirms it.
    source_file = None
    if use_preset and config.get('source_file'):
        source_path = Path(config['source_file'])
        if source_path.exists():
            print(f"Verwende Quelldatei aus Voreinstellung: {source_path}")
            if self.ask_yes_no("Diese Datei verwenden?", True):
                source_file = source_path
        else:
            print(f"⚠ Gespeicherte Quelldatei nicht gefunden: {config['source_file']}")

    # No file from the preset, or the user wants a different one.
    while not source_file:
        source_file = self.select_file("Pfad zur Quelldatei (CSV/XLSX/XLS/ODS)")
        if not source_file:
            print("Eine Quelldatei ist erforderlich!")

    # Read the file
    file_ext = source_file.suffix.lower()
    has_header = config.get('has_header') if use_preset else None

    try:
        if file_ext in ['.xlsx', '.xls']:
            headers, data, has_header, import_settings = self.read_excel(source_file)
        else:
            import_encoding = config.get('import_encoding') if use_preset else None
            import_delimiter = config.get('import_delimiter') if use_preset else None
            import_quoting = config.get('import_quoting') if use_preset else None

            headers, data, has_header, import_settings = self.read_csv(
                source_file, has_header, import_encoding, import_delimiter, import_quoting
            )

        # Check for a failed read before touching import_settings.
        if headers is None:
            print("Fehler beim Lesen der Datei.")
            pause_before_menu()
            return

        # NOTE(review): the shape of import_settings returned by each reader
        # is not visible here, so only keys that are present are copied.
        if isinstance(import_settings, dict):
            for key in ('encoding', 'delimiter', 'quoting'):
                if key in import_settings:
                    config[f'import_{key}'] = import_settings[key]

        config['has_header'] = has_header
        print(f"\n✓ Datei geladen: {len(headers)} Spalten, {len(data)} Zeilen")
    except Exception as e:
        print(f"Fehler beim Lesen der Datei: {e}")
        import traceback
        traceback.print_exc()
        pause_before_menu()
        return

    # 3. Column-name mapping
    self.print_header("Schritt 3: Spaltennamen umbenennen")
    mappings = {}

    if use_preset and 'mapping_file' in config:
        mapping_file = Path(config['mapping_file']) if config['mapping_file'] else None
        if mapping_file and mapping_file.exists():
            print(f"Verwende Mapping-Datei aus Voreinstellung: {mapping_file}")
            mappings = self.load_column_mappings(mapping_file)
        elif mapping_file:
            print(f"⚠ Mapping-Datei {mapping_file} nicht gefunden.")
    elif self.ask_yes_no("Möchten Sie Spaltennamen umbenennen?"):
        mapping_file = self.select_file("Pfad zur Mapping-JSON-Datei")
        # Only load when a file was actually selected.
        if mapping_file:
            mappings = self.load_column_mappings(mapping_file)
            config['mapping_file'] = str(mapping_file)

    if mappings:
        headers, data, original_names = self.apply_column_mappings(headers, data, mappings)
        print(f"✓ {len(mappings)} Spaltennamen umbenannt")
    else:
        original_names = {h: h for h in headers}

    # 4. Remove empty rows
    self.print_header("Schritt 4: Leere Zeilen")
    if use_preset and 'remove_empty_rows' in config:
        remove_empty_rows = config['remove_empty_rows']
        print(f"Verwende Voreinstellung: {'Ja' if remove_empty_rows else 'Nein'}")
    else:
        remove_empty_rows = self.ask_yes_no("Sollen komplett leere Zeilen entfernt werden?")
        config['remove_empty_rows'] = remove_empty_rows

    if remove_empty_rows:
        original_count = len(data)
        data = self.remove_empty_rows(data)
        removed_count = original_count - len(data)
        print(f"✓ {removed_count} leere Zeilen entfernt")

    # 5. Filter rows with too little data
    self.print_header("Schritt 5: Unvollständige Zeilen")
    if use_preset and 'filter_incomplete_rows' in config:
        filter_incomplete = config['filter_incomplete_rows']
        print(f"Verwende Voreinstellung: {'Ja' if filter_incomplete else 'Nein'}")
    else:
        filter_incomplete = self.ask_yes_no("Möchten Sie Zeilen mit zu wenig Informationen analysieren/entfernen?")
        config['filter_incomplete_rows'] = filter_incomplete

    if filter_incomplete:
        data = self.filter_rows_by_filled_fields(headers, data)

    # 6. Remove empty columns (BEFORE column selection!)
    self.print_header("Schritt 6: Leere Spalten")
    if use_preset and 'remove_empty' in config:
        remove_empty = config['remove_empty']
        print(f"Verwende Voreinstellung: {'Ja' if remove_empty else 'Nein'}")
    else:
        remove_empty = self.ask_yes_no("Sollen leere Spalten entfernt werden?")
        config['remove_empty'] = remove_empty

    if remove_empty:
        original_count = len(headers)
        headers, data = self.remove_empty_columns(headers, data)
        removed_count = original_count - len(headers)
        print(f"✓ {removed_count} leere Spalten entfernt")

        # IMPORTANT: keep original_names in sync with the remaining headers.
        original_names = {h: original_names[h] for h in headers if h in original_names}

    # 7. Column selection
    self.print_header("Schritt 7: Spaltenauswahl")

    # With a preset: only use stored columns that still exist (non-empty).
    if use_preset and 'selected_columns' in config:
        # A stored None is treated like an empty selection.
        stored_columns = config['selected_columns'] or []
        preselected = [col for col in stored_columns if col in headers]

        if preselected:
            selected_columns = preselected
            print("Verwende gespeicherte Spaltenauswahl:")
            print("(Nur nicht-leere Spalten werden berücksichtigt)")
            for i, col in enumerate(selected_columns, 1):
                orig = original_names.get(col, col)
                if orig != col:
                    print(f" {i}. {orig} → {col}")
                else:
                    print(f" {i}. {col}")
            print(f"\n✓ {len(selected_columns)} Spalten ausgewählt")

            # Show stored columns that were skipped.
            skipped = [col for col in stored_columns if col not in headers]
            if skipped:
                print(f"\nℹ Übersprungene Spalten (leer oder nicht vorhanden): {len(skipped)}")
                for col in skipped[:5]:
                    print(f" - {col}")
                if len(skipped) > 5:
                    print(f" ... und {len(skipped)-5} weitere")
        else:
            print("⚠ Alle gespeicherten Spalten sind leer oder nicht vorhanden.")
            print("Bitte neue Auswahl treffen:")
            selected_columns = self.select_columns(headers, original_names, None)
            config['selected_columns'] = selected_columns
    else:
        selected_columns = self.select_columns(headers, original_names, None)
        config['selected_columns'] = selected_columns

    # Reduce every row to the selected columns.
    data = [{col: row.get(col, '') for col in selected_columns} for row in data]

    # 8. Header and footer text
    self.print_header("Schritt 8: Kopf- und Fußzeilen")
    header_text = ""
    footer_text = ""

    if use_preset and 'add_header_footer' in config:
        add_header_footer = config['add_header_footer']
        if add_header_footer:
            header_text = config.get('header_text', '')
            footer_text = config.get('footer_text', '')
            print(f"Verwende Kopfzeile: {header_text}")
            print(f"Verwende Fußzeile: {footer_text}")
    else:
        if self.ask_yes_no("Möchten Sie eine Kopf- oder Fußzeile hinzufügen?"):
            header_text = input("Kopfzeile (Enter für keine): ").strip()
            footer_text = input("Fußzeile (Enter für keine): ").strip()
            config['add_header_footer'] = True
            config['header_text'] = header_text
            config['footer_text'] = footer_text
        else:
            config['add_header_footer'] = False

    # 9. Sorting
    self.print_header("Schritt 9: Sortierung")
    if use_preset and 'sort_column' in config:
        sort_column = config['sort_column']
        # A stored sort column that is no longer selected is skipped.
        if sort_column and sort_column in selected_columns:
            data_type = config.get('sort_type', 'string')
            print(f"Sortiere nach '{sort_column}' ({data_type})")
            data = self.sort_data(data, sort_column, data_type)
    elif selected_columns and self.ask_yes_no("Möchten Sie die Daten sortieren?"):
        # The selected_columns guard avoids an endless prompt loop when
        # there is nothing to choose from.
        print("\nVerfügbare Spalten zum Sortieren:")
        for i, col in enumerate(selected_columns, 1):
            print(f" {i}. {col}")

        while True:
            choice = input("Nummer der Spalte: ").strip()
            try:
                idx = int(choice) - 1
                if 0 <= idx < len(selected_columns):
                    sort_column = selected_columns[idx]
                    break
            except ValueError:
                pass
            print("Ungültige Eingabe.")

        print("\nDatentyp für Sortierung:")
        print(" 1. Text (string)")
        print(" 2. Datum")
        print(" 3. Zeit")
        print(" 4. Dezimalzahl")

        type_map = {'1': 'string', '2': 'datum', '3': 'zeit', '4': 'dezimalzahl'}
        data_type = type_map.get(input("Ihre Wahl [1-4]: ").strip(), 'string')

        data = self.sort_data(data, sort_column, data_type)
        config['sort_column'] = sort_column
        config['sort_type'] = data_type
        print(f"✓ Daten nach '{sort_column}' sortiert")
    else:
        config['sort_column'] = None

    # 10. Output format and target file
    self.print_header("Schritt 10: Zieldatei speichern")

    if use_preset and 'output_format' in config:
        output_format = config['output_format']
        print(f"Ausgabeformat aus Voreinstellung: {output_format.upper()}")
    else:
        output_format = self.select_output_format()
        config['output_format'] = output_format

    suffix = f'.{output_format}'
    while True:
        target_path = input("Name/Pfad der Zieldatei (ohne Endung): ").strip()
        base = Path(target_path)
        # Empty input, "." and ".." have no usable file name.
        if base.name in ('', '..'):
            print("Ungültiger Dateiname.")
            continue

        # Append the extension instead of using Path.with_suffix(): the
        # latter replaces everything after the last dot, which truncates
        # names such as "Bericht_01.02.2024".
        if base.name.lower().endswith(suffix):
            target_file = base
        else:
            target_file = base.with_name(base.name + suffix)

        if not target_file.exists() or self.ask_yes_no(
            f"Datei {target_file} existiert bereits. Überschreiben?"
        ):
            break

    # Write the file
    if output_format == 'csv':
        if use_preset and 'export_delimiter' in config and 'export_quoting' in config:
            export_delimiter = config['export_delimiter']
            export_quoting = config['export_quoting']
            delimiter_names = {';': 'Semikolon', ',': 'Komma', '\t': 'Tab'}
            print("\nVerwende Export-Einstellungen aus Voreinstellung:")
            print(f" Delimiter: {delimiter_names.get(export_delimiter, export_delimiter)}")
            print(f" Quoting: {export_quoting}")
        else:
            export_delimiter, export_quoting = self.configure_csv_export()
            config['export_delimiter'] = export_delimiter
            config['export_quoting'] = export_quoting

        self.write_csv(target_file, selected_columns, data, header_text, footer_text,
                       export_delimiter, export_quoting)
    elif output_format == 'xlsx':
        self.write_excel(target_file, selected_columns, data, header_text, footer_text)
    elif output_format == 'ods':
        self.write_ods(target_file, selected_columns, data, header_text, footer_text)

    print("\n" + "="*60)
    print(" Verarbeitung abgeschlossen!")
    print("="*60)

    # Offer to save the settings as a preset
    if not use_preset:
        if self.ask_yes_no("\nMöchten Sie diese Einstellungen als Voreinstellung speichern?"):
            # Ask whether the source file path should be stored as well.
            save_source = False
            if self.ask_yes_no("Pfad zur Quelldatei mit speichern? (Datei wird dann automatisch geladen)", False):
                config['source_file'] = str(source_file)
                save_source = True

            preset_name = input("Name für die Voreinstellung: ").strip()
            if preset_name:
                self.save_preset(preset_name, config)
                print("\nGespeichert:")
                print(" - Einstellungen: Ja")
                print(f" - Mapping: {'Ja' if config.get('mapping_file') else 'Nein'}")
                print(f" - Quelldatei: {'Ja' if save_source else 'Nein'}")
                print(f" - Spaltenauswahl: {len(config['selected_columns'])} Spalten")

    pause_before_menu()
|
||
|
||
|
||
def main():
    """Create the processor and run its main menu.

    Ctrl+C ends the program with exit status 0; any other exception is
    printed together with its traceback and ends it with status 1.
    """
    app = CSVProcessor()
    exit_code = None

    try:
        app.main_menu()
    except KeyboardInterrupt:
        print("\n\nProgramm wurde durch Benutzer beendet.")
        exit_code = 0
    except Exception as err:
        print(f"\nFehler: {err}")
        import traceback
        traceback.print_exc()
        exit_code = 1

    if exit_code is not None:
        sys.exit(exit_code)
|
||
|
||
|
||
# Start the interactive application only when run as a script, not on import.
if __name__ == "__main__":
    main()
|