commit 6af2c0333f7b1afb8cf00eb3b4786b16ad761772 Author: Developer Date: Sun Jan 18 12:00:39 2026 +0000 Initial commit: Trimble Geodesy Tool mit PyQt5 GUI Features: - JXL-Datei Analyse und Bearbeitung - COR-Datei Generierung - Koordinatentransformation (Rotation/Translation) - Georeferenzierung mit Passpunkten - Netzausgleichung nach kleinsten Quadraten diff --git a/.abacus.donotdelete b/.abacus.donotdelete new file mode 100644 index 0000000..3c36da4 --- /dev/null +++ b/.abacus.donotdelete @@ -0,0 +1 @@ +gAAAAABpbMkVj3c_kR7OnAA7zo0Sfzw05-3MyOtYVW_kwm6pXuL5RNaqVAIlgDozOuzqrE7RPfvpzW2IRwTvNCMn1AQOXx-EsHqqzRj1WS153NEFBXvkT8g= \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..edf4aec --- /dev/null +++ b/README.md @@ -0,0 +1,128 @@ +# Trimble Geodesy Tool + +Ein vollständiges Python-Programm mit grafischer Benutzeroberfläche (GUI) für geodätische Vermessungsarbeiten mit Trimble JXL-Dateien. + +## Funktionen + +### 1. JXL-Datei Analyse und Bearbeitung +- Trimble JXL-Dateien einlesen und analysieren +- Übersicht über Punkte, Stationen und Messungen +- Passpunkte entfernen +- Prismenkonstanten ändern +- Parameter bearbeiten + +### 2. COR-Datei Generierung +- Aus JXL-Dateien Punktdateien im COR-Format berechnen +- Unterstützt zwei Stationierungsarten: + - Erste Stationierung über eine Referenzlinie + - Alle anderen als freie Stationierungen +- Export in verschiedene Formate (COR, CSV, TXT, DXF) + +### 3. Koordinatensystem-Transformation +- Rotation um einen wählbaren Punkt +- Verschiebung in XY-Richtung +- Verschiebung in Z-Richtung +- Zwei Eingabemodi: + - Manuelle Eingabe von Transformationsparametern + - Definition über 2 Punkte (Ursprung und Y-Richtung) +- **Keine Maßstabsänderung** (wie gefordert) + +### 4. Georeferenzierung +- Mindestens 3 Passpunkte für Transformation +- Rotation und Translation des Koordinatensystems +- **Keine Maßstabsänderung** (keine Helmert-Transformation) +- Restfehler ausgleichen und anzeigen +- Detaillierte Qualitätsparameter (RMSE, Residuen) + +### 5. Netzausgleichung +- Methode der kleinsten Quadrate +- Basierend auf Beobachtungen der JXL-Datei +- Automatische Festpunkterkennung +- Ausgabe von: + - Ausgeglichenen Koordinaten + - Standardabweichungen + - Residuen + - Qualitätsparametern (Sigma-0, Chi-Quadrat, RMSE) + +## Installation + +### Voraussetzungen +- Python 3.8 oder höher +- PyQt5 +- NumPy +- SciPy +- lxml + +### Installation der Abhängigkeiten +```bash +pip install PyQt5 numpy scipy lxml +``` + +## Verwendung + +### Programm starten +```bash +cd /home/ubuntu/trimble_geodesy +python3 main.py +``` + +### Workflow +1. **JXL-Analyse Tab**: JXL-Datei laden und analysieren +2. **COR-Generator Tab**: Koordinaten generieren und exportieren +3. **Transformation Tab**: Koordinatensystem rotieren/verschieben +4. **Georeferenzierung Tab**: Mit Passpunkten transformieren +5. **Netzausgleichung Tab**: Netzausgleichung durchführen + +## Dateistruktur + +``` +trimble_geodesy/ +├── main.py # Hauptprogramm mit GUI +├── modules/ +│ ├── __init__.py +│ ├── jxl_parser.py # JXL-Datei Parser +│ ├── cor_generator.py # COR-Datei Generator +│ ├── transformation.py # Koordinatentransformation +│ ├── georeferencing.py # Georeferenzierung +│ └── network_adjustment.py # Netzausgleichung +├── output/ # Ausgabeverzeichnis +└── README.md +``` + +## Technische Details + +### JXL-Format +Das JXL-Format (Trimble JobXML) ist ein XML-basiertes Format für Vermessungsdaten: +- ``: Punktdaten mit Koordinaten und Messungen +- ``: Stationsinformationen +- ``: Prismeneinstellungen +- ``: Orientierungsdaten + +### COR-Format +Das COR-Format ist ein einfaches Textformat für Koordinaten: +``` +|Punkt |X |Y |Z | +|5001 |0.000 |0.000 |0.000 | +|5002 |0 | 11.407 | -0.035 | +``` + +### Transformation +Die Transformation verwendet eine 4-Parameter-Transformation: +- Translation X, Y, Z +- Rotation um die Z-Achse +- **Kein Maßstabsfaktor** (Maßstab = 1.0) + +### Netzausgleichung +Die Netzausgleichung verwendet: +- Gauß-Markov-Modell +- Beobachtungsgleichungen für Richtungen und Strecken +- Iterative Lösung nach Gauß-Newton +- Varianzfortpflanzung für Genauigkeitsmaße + +## Lizenz + +Dieses Programm wurde für geodätische Vermessungsarbeiten entwickelt. + +## Autor + +Entwickelt für geodätische Vermessungsarbeiten mit Trimble-Instrumenten. diff --git a/main.py b/main.py new file mode 100644 index 0000000..60e7a0f --- /dev/null +++ b/main.py @@ -0,0 +1,1056 @@ +#!/usr/bin/env python3 +""" +Trimble Geodesy Tool - Hauptprogramm mit GUI +Geodätische Vermessungsarbeiten mit JXL-Dateien +""" + +import sys +import os +from pathlib import Path + +# Module-Pfad hinzufügen +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from PyQt5.QtWidgets import ( + QApplication, QMainWindow, QWidget, QTabWidget, QVBoxLayout, QHBoxLayout, + QGridLayout, QLabel, QPushButton, QLineEdit, QTextEdit, QFileDialog, + QTableWidget, QTableWidgetItem, QMessageBox, QGroupBox, QComboBox, + QSpinBox, QDoubleSpinBox, QCheckBox, QSplitter, QFrame, QScrollArea, + QHeaderView, QListWidget, QListWidgetItem, QDialog, QDialogButtonBox, + QFormLayout, QProgressBar, QStatusBar, QMenuBar, QMenu, QAction, + QToolBar, QStyle +) +from PyQt5.QtCore import Qt, QThread, pyqtSignal +from PyQt5.QtGui import QFont, QIcon, QPalette, QColor + +from modules.jxl_parser import JXLParser +from modules.cor_generator import CORGenerator, CORPoint +from modules.transformation import CoordinateTransformer, LocalSystemTransformer +from modules.georeferencing import Georeferencer, ControlPoint +from modules.network_adjustment import NetworkAdjustment + + +class JXLAnalysisTab(QWidget): + """Tab für JXL-Datei Analyse und Bearbeitung""" + + def __init__(self, parent=None): + super().__init__(parent) + self.main_window = parent + self.setup_ui() + + def setup_ui(self): + layout = QVBoxLayout(self) + + # Datei laden + file_group = QGroupBox("JXL-Datei") + file_layout = QHBoxLayout(file_group) + + self.file_path_edit = QLineEdit() + self.file_path_edit.setPlaceholderText("JXL-Datei auswählen...") + file_layout.addWidget(self.file_path_edit) + + browse_btn = QPushButton("Durchsuchen") + browse_btn.clicked.connect(self.browse_file) + file_layout.addWidget(browse_btn) + + load_btn = QPushButton("Laden") + load_btn.clicked.connect(self.load_file) + file_layout.addWidget(load_btn) + + layout.addWidget(file_group) + + # Splitter für Info und Tabellen + splitter = QSplitter(Qt.Vertical) + + # Zusammenfassung + summary_group = QGroupBox("Zusammenfassung") + summary_layout = QVBoxLayout(summary_group) + self.summary_text = QTextEdit() + self.summary_text.setReadOnly(True) + self.summary_text.setMaximumHeight(200) + summary_layout.addWidget(self.summary_text) + splitter.addWidget(summary_group) + + # Punkte-Tabelle + points_group = QGroupBox("Punkte") + points_layout = QVBoxLayout(points_group) + + self.points_table = QTableWidget() + self.points_table.setColumnCount(7) + self.points_table.setHorizontalHeaderLabels( + ["Name", "Code", "East (X)", "North (Y)", "Elevation (Z)", "Methode", "Aktiv"]) + self.points_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) + points_layout.addWidget(self.points_table) + + # Punkt-Aktionen + point_actions = QHBoxLayout() + + remove_point_btn = QPushButton("Ausgewählten Punkt entfernen") + remove_point_btn.clicked.connect(self.remove_selected_point) + point_actions.addWidget(remove_point_btn) + + point_actions.addStretch() + points_layout.addLayout(point_actions) + + splitter.addWidget(points_group) + + # Prismenkonstanten + prism_group = QGroupBox("Prismenkonstanten") + prism_layout = QVBoxLayout(prism_group) + + self.prism_table = QTableWidget() + self.prism_table.setColumnCount(4) + self.prism_table.setHorizontalHeaderLabels( + ["Target ID", "Prismentyp", "Konstante [mm]", "Neue Konstante [mm]"]) + self.prism_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) + prism_layout.addWidget(self.prism_table) + + prism_actions = QHBoxLayout() + apply_prism_btn = QPushButton("Prismenkonstanten übernehmen") + apply_prism_btn.clicked.connect(self.apply_prism_changes) + prism_actions.addWidget(apply_prism_btn) + prism_actions.addStretch() + prism_layout.addLayout(prism_actions) + + splitter.addWidget(prism_group) + + layout.addWidget(splitter) + + def browse_file(self): + file_path, _ = QFileDialog.getOpenFileName( + self, "JXL-Datei öffnen", "", "JXL Files (*.jxl);;All Files (*)") + if file_path: + self.file_path_edit.setText(file_path) + + def load_file(self): + file_path = self.file_path_edit.text() + if not file_path or not os.path.exists(file_path): + QMessageBox.warning(self, "Fehler", "Bitte eine gültige JXL-Datei auswählen!") + return + + parser = JXLParser() + if parser.parse(file_path): + self.main_window.parser = parser + self.update_display() + self.main_window.statusBar().showMessage(f"JXL-Datei geladen: {file_path}") + else: + QMessageBox.critical(self, "Fehler", "Datei konnte nicht geladen werden!") + + def update_display(self): + if not self.main_window.parser: + return + + parser = self.main_window.parser + + # Zusammenfassung + self.summary_text.setText(parser.get_summary()) + + # Punkte-Tabelle + points = parser.get_active_points() + self.points_table.setRowCount(len(points)) + + for row, (name, point) in enumerate(sorted(points.items())): + self.points_table.setItem(row, 0, QTableWidgetItem(name)) + self.points_table.setItem(row, 1, QTableWidgetItem(point.code or "")) + self.points_table.setItem(row, 2, QTableWidgetItem(f"{point.east:.4f}" if point.east else "")) + self.points_table.setItem(row, 3, QTableWidgetItem(f"{point.north:.4f}" if point.north else "")) + self.points_table.setItem(row, 4, QTableWidgetItem(f"{point.elevation:.4f}" if point.elevation else "")) + self.points_table.setItem(row, 5, QTableWidgetItem(point.method)) + self.points_table.setItem(row, 6, QTableWidgetItem("Ja" if not point.deleted else "Nein")) + + # Prismen-Tabelle + targets = parser.targets + self.prism_table.setRowCount(len(targets)) + + for row, (tid, target) in enumerate(targets.items()): + self.prism_table.setItem(row, 0, QTableWidgetItem(tid)) + self.prism_table.setItem(row, 1, QTableWidgetItem(target.prism_type)) + self.prism_table.setItem(row, 2, QTableWidgetItem(f"{target.prism_constant * 1000:.1f}")) + + spin = QDoubleSpinBox() + spin.setRange(-100, 100) + spin.setDecimals(1) + spin.setValue(target.prism_constant * 1000) + self.prism_table.setCellWidget(row, 3, spin) + + def remove_selected_point(self): + row = self.points_table.currentRow() + if row >= 0: + name = self.points_table.item(row, 0).text() + reply = QMessageBox.question( + self, "Bestätigung", + f"Punkt '{name}' wirklich entfernen?", + QMessageBox.Yes | QMessageBox.No) + + if reply == QMessageBox.Yes: + self.main_window.parser.remove_point(name) + self.update_display() + + def apply_prism_changes(self): + parser = self.main_window.parser + if not parser: + return + + for row in range(self.prism_table.rowCount()): + tid = self.prism_table.item(row, 0).text() + spin = self.prism_table.cellWidget(row, 3) + new_const = spin.value() / 1000.0 # mm to m + parser.modify_prism_constant(tid, new_const) + + QMessageBox.information(self, "Info", "Prismenkonstanten wurden aktualisiert!") + + +class CORGeneratorTab(QWidget): + """Tab für COR-Datei Generierung""" + + def __init__(self, parent=None): + super().__init__(parent) + self.main_window = parent + self.cor_generator = None + self.setup_ui() + + def setup_ui(self): + layout = QVBoxLayout(self) + + # Optionen + options_group = QGroupBox("Generierungsoptionen") + options_layout = QGridLayout(options_group) + + options_layout.addWidget(QLabel("Methode:"), 0, 0) + self.method_combo = QComboBox() + self.method_combo.addItems([ + "Aus berechneten Koordinaten (ComputedGrid)", + "Aus Rohbeobachtungen berechnen" + ]) + options_layout.addWidget(self.method_combo, 0, 1) + + self.include_header_check = QCheckBox("Stations-Header einfügen") + self.include_header_check.setChecked(True) + options_layout.addWidget(self.include_header_check, 1, 0, 1, 2) + + layout.addWidget(options_group) + + # Generieren Button + generate_btn = QPushButton("COR-Datei generieren") + generate_btn.clicked.connect(self.generate_cor) + layout.addWidget(generate_btn) + + # Vorschau + preview_group = QGroupBox("Vorschau") + preview_layout = QVBoxLayout(preview_group) + + self.preview_table = QTableWidget() + self.preview_table.setColumnCount(4) + self.preview_table.setHorizontalHeaderLabels(["Punkt", "X (East)", "Y (North)", "Z (Elev)"]) + self.preview_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) + preview_layout.addWidget(self.preview_table) + + # Statistiken + self.stats_text = QTextEdit() + self.stats_text.setReadOnly(True) + self.stats_text.setMaximumHeight(100) + preview_layout.addWidget(self.stats_text) + + layout.addWidget(preview_group) + + # Export + export_group = QGroupBox("Export") + export_layout = QHBoxLayout(export_group) + + export_cor_btn = QPushButton("Als COR speichern") + export_cor_btn.clicked.connect(lambda: self.export_file('cor')) + export_layout.addWidget(export_cor_btn) + + export_csv_btn = QPushButton("Als CSV speichern") + export_csv_btn.clicked.connect(lambda: self.export_file('csv')) + export_layout.addWidget(export_csv_btn) + + export_txt_btn = QPushButton("Als TXT speichern") + export_txt_btn.clicked.connect(lambda: self.export_file('txt')) + export_layout.addWidget(export_txt_btn) + + export_dxf_btn = QPushButton("Als DXF speichern") + export_dxf_btn.clicked.connect(lambda: self.export_file('dxf')) + export_layout.addWidget(export_dxf_btn) + + layout.addWidget(export_group) + + def generate_cor(self): + if not self.main_window.parser: + QMessageBox.warning(self, "Fehler", "Bitte zuerst eine JXL-Datei laden!") + return + + self.cor_generator = CORGenerator(self.main_window.parser) + + if self.method_combo.currentIndex() == 0: + points = self.cor_generator.generate_from_computed_grid() + else: + points = self.cor_generator.compute_from_observations() + + # Tabelle aktualisieren + self.preview_table.setRowCount(len(points)) + for row, p in enumerate(points): + self.preview_table.setItem(row, 0, QTableWidgetItem(p.name)) + self.preview_table.setItem(row, 1, QTableWidgetItem(f"{p.x:.4f}")) + self.preview_table.setItem(row, 2, QTableWidgetItem(f"{p.y:.4f}")) + self.preview_table.setItem(row, 3, QTableWidgetItem(f"{p.z:.4f}")) + + # Statistiken + self.stats_text.setText(self.cor_generator.get_statistics()) + + self.main_window.statusBar().showMessage(f"{len(points)} Punkte generiert") + + def export_file(self, format_type): + if not self.cor_generator or not self.cor_generator.cor_points: + QMessageBox.warning(self, "Fehler", "Bitte zuerst Punkte generieren!") + return + + filters = { + 'cor': "COR Files (*.cor)", + 'csv': "CSV Files (*.csv)", + 'txt': "Text Files (*.txt)", + 'dxf': "DXF Files (*.dxf)" + } + + file_path, _ = QFileDialog.getSaveFileName( + self, "Speichern unter", "", filters.get(format_type, "All Files (*)")) + + if file_path: + if format_type == 'cor': + self.cor_generator.write_cor_file(file_path, self.include_header_check.isChecked()) + elif format_type == 'csv': + self.cor_generator.export_csv(file_path) + elif format_type == 'txt': + self.cor_generator.export_txt(file_path) + elif format_type == 'dxf': + self.cor_generator.export_dxf(file_path) + + QMessageBox.information(self, "Erfolg", f"Datei gespeichert: {file_path}") + + +class TransformationTab(QWidget): + """Tab für Koordinatentransformation""" + + def __init__(self, parent=None): + super().__init__(parent) + self.main_window = parent + self.transformer = CoordinateTransformer() + self.setup_ui() + + def setup_ui(self): + layout = QVBoxLayout(self) + + # Methode auswählen + method_group = QGroupBox("Transformationsmethode") + method_layout = QVBoxLayout(method_group) + + self.manual_radio = QCheckBox("Manuelle Parameter") + self.manual_radio.setChecked(True) + self.manual_radio.stateChanged.connect(self.toggle_method) + method_layout.addWidget(self.manual_radio) + + self.twopoint_radio = QCheckBox("Über 2 Punkte definieren") + self.twopoint_radio.stateChanged.connect(self.toggle_method) + method_layout.addWidget(self.twopoint_radio) + + layout.addWidget(method_group) + + # Manuelle Parameter + self.manual_group = QGroupBox("Manuelle Parameter") + manual_layout = QGridLayout(self.manual_group) + + manual_layout.addWidget(QLabel("Verschiebung X (East):"), 0, 0) + self.dx_spin = QDoubleSpinBox() + self.dx_spin.setRange(-1000000, 1000000) + self.dx_spin.setDecimals(4) + manual_layout.addWidget(self.dx_spin, 0, 1) + manual_layout.addWidget(QLabel("m"), 0, 2) + + manual_layout.addWidget(QLabel("Verschiebung Y (North):"), 1, 0) + self.dy_spin = QDoubleSpinBox() + self.dy_spin.setRange(-1000000, 1000000) + self.dy_spin.setDecimals(4) + manual_layout.addWidget(self.dy_spin, 1, 1) + manual_layout.addWidget(QLabel("m"), 1, 2) + + manual_layout.addWidget(QLabel("Verschiebung Z (Höhe):"), 2, 0) + self.dz_spin = QDoubleSpinBox() + self.dz_spin.setRange(-1000000, 1000000) + self.dz_spin.setDecimals(4) + manual_layout.addWidget(self.dz_spin, 2, 1) + manual_layout.addWidget(QLabel("m"), 2, 2) + + manual_layout.addWidget(QLabel("Rotation:"), 3, 0) + self.rotation_spin = QDoubleSpinBox() + self.rotation_spin.setRange(-400, 400) + self.rotation_spin.setDecimals(6) + manual_layout.addWidget(self.rotation_spin, 3, 1) + manual_layout.addWidget(QLabel("gon"), 3, 2) + + manual_layout.addWidget(QLabel("Drehpunkt X:"), 4, 0) + self.pivot_x_spin = QDoubleSpinBox() + self.pivot_x_spin.setRange(-1000000, 1000000) + self.pivot_x_spin.setDecimals(4) + manual_layout.addWidget(self.pivot_x_spin, 4, 1) + + manual_layout.addWidget(QLabel("Drehpunkt Y:"), 5, 0) + self.pivot_y_spin = QDoubleSpinBox() + self.pivot_y_spin.setRange(-1000000, 1000000) + self.pivot_y_spin.setDecimals(4) + manual_layout.addWidget(self.pivot_y_spin, 5, 1) + + layout.addWidget(self.manual_group) + + # 2-Punkte-Definition + self.twopoint_group = QGroupBox("2-Punkte-Definition") + twopoint_layout = QGridLayout(self.twopoint_group) + + twopoint_layout.addWidget(QLabel("Ursprung (0,0):"), 0, 0) + self.origin_combo = QComboBox() + twopoint_layout.addWidget(self.origin_combo, 0, 1) + + twopoint_layout.addWidget(QLabel("Y-Richtung:"), 1, 0) + self.direction_combo = QComboBox() + twopoint_layout.addWidget(self.direction_combo, 1, 1) + + twopoint_layout.addWidget(QLabel("Z-Referenz (0):"), 2, 0) + self.zref_combo = QComboBox() + twopoint_layout.addWidget(self.zref_combo, 2, 1) + + refresh_btn = QPushButton("Punktliste aktualisieren") + refresh_btn.clicked.connect(self.refresh_point_lists) + twopoint_layout.addWidget(refresh_btn, 3, 0, 1, 2) + + self.twopoint_group.setVisible(False) + layout.addWidget(self.twopoint_group) + + # Transformation durchführen + transform_btn = QPushButton("Transformation durchführen") + transform_btn.clicked.connect(self.execute_transformation) + layout.addWidget(transform_btn) + + # Ergebnisse + results_group = QGroupBox("Ergebnisse") + results_layout = QVBoxLayout(results_group) + + self.results_text = QTextEdit() + self.results_text.setReadOnly(True) + results_layout.addWidget(self.results_text) + + # Export + export_layout = QHBoxLayout() + export_report_btn = QPushButton("Bericht exportieren") + export_report_btn.clicked.connect(self.export_report) + export_layout.addWidget(export_report_btn) + + export_points_btn = QPushButton("Punkte exportieren") + export_points_btn.clicked.connect(self.export_points) + export_layout.addWidget(export_points_btn) + + results_layout.addLayout(export_layout) + + layout.addWidget(results_group) + + def toggle_method(self): + self.manual_group.setVisible(self.manual_radio.isChecked()) + self.twopoint_group.setVisible(self.twopoint_radio.isChecked()) + + if self.twopoint_radio.isChecked(): + self.refresh_point_lists() + + def refresh_point_lists(self): + if not self.main_window.parser: + return + + points = list(self.main_window.parser.get_active_points().keys()) + + self.origin_combo.clear() + self.direction_combo.clear() + self.zref_combo.clear() + + self.zref_combo.addItem("(wie Ursprung)") + + for name in sorted(points): + self.origin_combo.addItem(name) + self.direction_combo.addItem(name) + self.zref_combo.addItem(name) + + def execute_transformation(self): + if not self.main_window.parser: + QMessageBox.warning(self, "Fehler", "Bitte zuerst eine JXL-Datei laden!") + return + + # Punkte aus Parser holen + points = [] + for name, p in self.main_window.parser.get_active_points().items(): + if p.east is not None and p.north is not None: + points.append(CORPoint( + name=name, + x=p.east, + y=p.north, + z=p.elevation or 0.0 + )) + + self.transformer.set_points(points) + + if self.manual_radio.isChecked(): + self.transformer.set_manual_parameters( + dx=self.dx_spin.value(), + dy=self.dy_spin.value(), + dz=self.dz_spin.value(), + rotation_gon=self.rotation_spin.value(), + pivot_x=self.pivot_x_spin.value(), + pivot_y=self.pivot_y_spin.value() + ) + elif self.twopoint_radio.isChecked(): + origin = self.origin_combo.currentText() + direction = self.direction_combo.currentText() + zref = self.zref_combo.currentText() + + if zref == "(wie Ursprung)": + zref = None + + if not self.transformer.compute_from_two_points(origin, direction, zref): + QMessageBox.warning(self, "Fehler", "Punkte nicht gefunden!") + return + + self.transformer.transform() + + # Ergebnisse anzeigen + report = self.transformer.get_parameters_report() + report += "\n\n" + report += self.transformer.get_comparison_table() + + self.results_text.setText(report) + self.main_window.statusBar().showMessage("Transformation durchgeführt") + + def export_report(self): + file_path, _ = QFileDialog.getSaveFileName( + self, "Bericht speichern", "", "Text Files (*.txt)") + if file_path: + with open(file_path, 'w', encoding='utf-8') as f: + f.write(self.results_text.toPlainText()) + QMessageBox.information(self, "Erfolg", f"Bericht gespeichert: {file_path}") + + def export_points(self): + if not self.transformer.transformed_points: + QMessageBox.warning(self, "Fehler", "Keine transformierten Punkte vorhanden!") + return + + file_path, _ = QFileDialog.getSaveFileName( + self, "Punkte speichern", "", "CSV Files (*.csv)") + if file_path: + lines = ["Punkt;X;Y;Z"] + for p in self.transformer.transformed_points: + lines.append(f"{p.name};{p.x:.4f};{p.y:.4f};{p.z:.4f}") + + with open(file_path, 'w', encoding='utf-8') as f: + f.write("\n".join(lines)) + QMessageBox.information(self, "Erfolg", f"Punkte gespeichert: {file_path}") + + +class GeoreferencingTab(QWidget): + """Tab für Georeferenzierung""" + + def __init__(self, parent=None): + super().__init__(parent) + self.main_window = parent + self.georeferencer = Georeferencer() + self.setup_ui() + + def setup_ui(self): + layout = QVBoxLayout(self) + + # Passpunkte + cp_group = QGroupBox("Passpunkte (mind. 3)") + cp_layout = QVBoxLayout(cp_group) + + self.cp_table = QTableWidget() + self.cp_table.setColumnCount(7) + self.cp_table.setHorizontalHeaderLabels([ + "Punkt", "X_lokal", "Y_lokal", "Z_lokal", "X_Ziel", "Y_Ziel", "Z_Ziel"]) + self.cp_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) + cp_layout.addWidget(self.cp_table) + + # Passpunkt-Buttons + cp_buttons = QHBoxLayout() + + add_cp_btn = QPushButton("Passpunkt hinzufügen") + add_cp_btn.clicked.connect(self.add_control_point) + cp_buttons.addWidget(add_cp_btn) + + remove_cp_btn = QPushButton("Entfernen") + remove_cp_btn.clicked.connect(self.remove_control_point) + cp_buttons.addWidget(remove_cp_btn) + + load_local_btn = QPushButton("Lokale Koordinaten aus JXL") + load_local_btn.clicked.connect(self.load_local_from_jxl) + cp_buttons.addWidget(load_local_btn) + + load_target_btn = QPushButton("Zielkoordinaten importieren") + load_target_btn.clicked.connect(self.load_target_coords) + cp_buttons.addWidget(load_target_btn) + + cp_buttons.addStretch() + cp_layout.addLayout(cp_buttons) + + layout.addWidget(cp_group) + + # Transformation berechnen + calc_btn = QPushButton("Transformation berechnen") + calc_btn.clicked.connect(self.calculate_transformation) + layout.addWidget(calc_btn) + + # Ergebnisse + results_group = QGroupBox("Ergebnisse") + results_layout = QVBoxLayout(results_group) + + self.results_text = QTextEdit() + self.results_text.setReadOnly(True) + self.results_text.setFont(QFont("Courier")) + results_layout.addWidget(self.results_text) + + # Export + export_layout = QHBoxLayout() + + export_report_btn = QPushButton("Bericht exportieren") + export_report_btn.clicked.connect(self.export_report) + export_layout.addWidget(export_report_btn) + + transform_all_btn = QPushButton("Alle Punkte transformieren") + transform_all_btn.clicked.connect(self.transform_all_points) + export_layout.addWidget(transform_all_btn) + + results_layout.addLayout(export_layout) + + layout.addWidget(results_group) + + def add_control_point(self): + row = self.cp_table.rowCount() + self.cp_table.insertRow(row) + + # Punkt-Auswahl + combo = QComboBox() + if self.main_window.parser: + for name in sorted(self.main_window.parser.get_active_points().keys()): + combo.addItem(name) + self.cp_table.setCellWidget(row, 0, combo) + + # Editierbare Felder für Koordinaten + for col in range(1, 7): + spin = QDoubleSpinBox() + spin.setRange(-10000000, 10000000) + spin.setDecimals(4) + self.cp_table.setCellWidget(row, col, spin) + + def remove_control_point(self): + row = self.cp_table.currentRow() + if row >= 0: + self.cp_table.removeRow(row) + + def load_local_from_jxl(self): + if not self.main_window.parser: + QMessageBox.warning(self, "Fehler", "Bitte zuerst eine JXL-Datei laden!") + return + + for row in range(self.cp_table.rowCount()): + combo = self.cp_table.cellWidget(row, 0) + if combo: + name = combo.currentText() + if name in self.main_window.parser.points: + p = self.main_window.parser.points[name] + + self.cp_table.cellWidget(row, 1).setValue(p.east or 0) + self.cp_table.cellWidget(row, 2).setValue(p.north or 0) + self.cp_table.cellWidget(row, 3).setValue(p.elevation or 0) + + def load_target_coords(self): + file_path, _ = QFileDialog.getOpenFileName( + self, "Zielkoordinaten laden", "", + "CSV Files (*.csv);;Text Files (*.txt);;All Files (*)") + + if not file_path: + return + + try: + with open(file_path, 'r', encoding='utf-8') as f: + lines = f.readlines() + + # Format: Punkt;X;Y;Z oder Punkt,X,Y,Z + coord_dict = {} + for line in lines: + line = line.strip() + if not line or line.startswith('#'): + continue + + parts = line.replace(',', ';').split(';') + if len(parts) >= 4: + name = parts[0].strip() + x = float(parts[1]) + y = float(parts[2]) + z = float(parts[3]) + coord_dict[name] = (x, y, z) + + # In Tabelle eintragen + for row in range(self.cp_table.rowCount()): + combo = self.cp_table.cellWidget(row, 0) + if combo: + name = combo.currentText() + if name in coord_dict: + x, y, z = coord_dict[name] + self.cp_table.cellWidget(row, 4).setValue(x) + self.cp_table.cellWidget(row, 5).setValue(y) + self.cp_table.cellWidget(row, 6).setValue(z) + + QMessageBox.information(self, "Erfolg", f"{len(coord_dict)} Koordinaten geladen!") + + except Exception as e: + QMessageBox.critical(self, "Fehler", f"Fehler beim Laden: {e}") + + def calculate_transformation(self): + self.georeferencer.clear_control_points() + + for row in range(self.cp_table.rowCount()): + combo = self.cp_table.cellWidget(row, 0) + if not combo: + continue + + name = combo.currentText() + local_x = self.cp_table.cellWidget(row, 1).value() + local_y = self.cp_table.cellWidget(row, 2).value() + local_z = self.cp_table.cellWidget(row, 3).value() + target_x = self.cp_table.cellWidget(row, 4).value() + target_y = self.cp_table.cellWidget(row, 5).value() + target_z = self.cp_table.cellWidget(row, 6).value() + + self.georeferencer.add_control_point( + name, local_x, local_y, local_z, target_x, target_y, target_z) + + if len(self.georeferencer.control_points) < 3: + QMessageBox.warning(self, "Fehler", "Mindestens 3 Passpunkte erforderlich!") + return + + try: + self.georeferencer.compute_transformation() + report = self.georeferencer.get_transformation_report() + self.results_text.setText(report) + self.main_window.statusBar().showMessage("Georeferenzierung berechnet") + except Exception as e: + QMessageBox.critical(self, "Fehler", f"Berechnung fehlgeschlagen: {e}") + + def export_report(self): + file_path, _ = QFileDialog.getSaveFileName( + self, "Bericht speichern", "", "Text Files (*.txt)") + if file_path: + with open(file_path, 'w', encoding='utf-8') as f: + f.write(self.results_text.toPlainText()) + QMessageBox.information(self, "Erfolg", f"Bericht gespeichert: {file_path}") + + def transform_all_points(self): + if self.georeferencer.result is None: + QMessageBox.warning(self, "Fehler", "Bitte zuerst Transformation berechnen!") + return + + if not self.main_window.parser: + return + + # Punkte transformieren + points = [] + for name, p in self.main_window.parser.get_active_points().items(): + if p.east is not None and p.north is not None: + points.append(CORPoint( + name=name, x=p.east, y=p.north, z=p.elevation or 0)) + + self.georeferencer.set_points_to_transform(points) + transformed = self.georeferencer.transform_points() + + # Export-Dialog + file_path, _ = QFileDialog.getSaveFileName( + self, "Transformierte Punkte speichern", "", "CSV Files (*.csv)") + + if file_path: + lines = ["Punkt;X;Y;Z"] + for p in transformed: + lines.append(f"{p.name};{p.x:.4f};{p.y:.4f};{p.z:.4f}") + + with open(file_path, 'w', encoding='utf-8') as f: + f.write("\n".join(lines)) + + QMessageBox.information(self, "Erfolg", + f"{len(transformed)} Punkte transformiert und gespeichert!") + + +class NetworkAdjustmentTab(QWidget): + """Tab für Netzausgleichung""" + + def __init__(self, parent=None): + super().__init__(parent) + self.main_window = parent + self.adjustment = None + self.setup_ui() + + def setup_ui(self): + layout = QVBoxLayout(self) + + # Konfiguration + config_group = QGroupBox("Konfiguration") + config_layout = QGridLayout(config_group) + + config_layout.addWidget(QLabel("Max. Iterationen:"), 0, 0) + self.max_iter_spin = QSpinBox() + self.max_iter_spin.setRange(1, 100) + self.max_iter_spin.setValue(20) + config_layout.addWidget(self.max_iter_spin, 0, 1) + + config_layout.addWidget(QLabel("Konvergenzlimit [mm]:"), 1, 0) + self.convergence_spin = QDoubleSpinBox() + self.convergence_spin.setRange(0.001, 10) + self.convergence_spin.setDecimals(3) + self.convergence_spin.setValue(0.01) + config_layout.addWidget(self.convergence_spin, 1, 1) + + config_layout.addWidget(QLabel("Sigma-0 a-priori:"), 2, 0) + self.sigma0_spin = QDoubleSpinBox() + self.sigma0_spin.setRange(0.1, 10) + self.sigma0_spin.setDecimals(2) + self.sigma0_spin.setValue(1.0) + config_layout.addWidget(self.sigma0_spin, 2, 1) + + layout.addWidget(config_group) + + # Festpunkte + fixed_group = QGroupBox("Festpunkte") + fixed_layout = QVBoxLayout(fixed_group) + + self.fixed_list = QListWidget() + self.fixed_list.setSelectionMode(QListWidget.MultiSelection) + fixed_layout.addWidget(self.fixed_list) + + fixed_buttons = QHBoxLayout() + refresh_btn = QPushButton("Liste aktualisieren") + refresh_btn.clicked.connect(self.refresh_point_list) + fixed_buttons.addWidget(refresh_btn) + + auto_btn = QPushButton("Auto (Referenzpunkte)") + auto_btn.clicked.connect(self.auto_select_fixed) + fixed_buttons.addWidget(auto_btn) + + fixed_layout.addLayout(fixed_buttons) + layout.addWidget(fixed_group) + + # Ausgleichung durchführen + adjust_btn = QPushButton("Netzausgleichung durchführen") + adjust_btn.clicked.connect(self.run_adjustment) + layout.addWidget(adjust_btn) + + # Ergebnisse + results_group = QGroupBox("Ergebnisse") + results_layout = QVBoxLayout(results_group) + + self.results_text = QTextEdit() + self.results_text.setReadOnly(True) + self.results_text.setFont(QFont("Courier", 9)) + results_layout.addWidget(self.results_text) + + # Export + export_layout = QHBoxLayout() + + export_report_btn = QPushButton("Bericht exportieren") + export_report_btn.clicked.connect(self.export_report) + export_layout.addWidget(export_report_btn) + + export_points_btn = QPushButton("Koordinaten exportieren") + export_points_btn.clicked.connect(self.export_points) + export_layout.addWidget(export_points_btn) + + results_layout.addLayout(export_layout) + layout.addWidget(results_group) + + def refresh_point_list(self): + self.fixed_list.clear() + + if not self.main_window.parser: + return + + for name in sorted(self.main_window.parser.get_active_points().keys()): + item = QListWidgetItem(name) + self.fixed_list.addItem(item) + + def auto_select_fixed(self): + if not self.main_window.parser: + return + + self.fixed_list.clearSelection() + + ref_line = self.main_window.parser.get_reference_line() + if ref_line: + for i in range(self.fixed_list.count()): + item = self.fixed_list.item(i) + if item.text() in [ref_line.start_point, ref_line.end_point]: + item.setSelected(True) + + def run_adjustment(self): + if not self.main_window.parser: + QMessageBox.warning(self, "Fehler", "Bitte zuerst eine JXL-Datei laden!") + return + + # Adjustment erstellen + self.adjustment = NetworkAdjustment(self.main_window.parser) + self.adjustment.max_iterations = self.max_iter_spin.value() + self.adjustment.convergence_limit = self.convergence_spin.value() / 1000.0 + self.adjustment.sigma_0_priori = self.sigma0_spin.value() + + # Beobachtungen extrahieren + self.adjustment.extract_observations() + self.adjustment.initialize_points() + + # Festpunkte setzen + for item in self.fixed_list.selectedItems(): + self.adjustment.set_fixed_point(item.text()) + + if not self.adjustment.fixed_points: + self.adjustment.set_fixed_points_auto() + + try: + result = self.adjustment.adjust() + report = self.adjustment.get_adjustment_report() + self.results_text.setText(report) + + status = "konvergiert" if result.converged else "nicht konvergiert" + self.main_window.statusBar().showMessage( + f"Ausgleichung abgeschlossen ({status}, {result.iterations} Iterationen)") + + except Exception as e: + QMessageBox.critical(self, "Fehler", f"Ausgleichung fehlgeschlagen: {e}") + import traceback + traceback.print_exc() + + def export_report(self): + if not self.adjustment or not self.adjustment.result: + QMessageBox.warning(self, "Fehler", "Keine Ergebnisse vorhanden!") + return + + file_path, _ = QFileDialog.getSaveFileName( + self, "Bericht speichern", "", "Text Files (*.txt)") + if file_path: + self.adjustment.export_report(file_path) + QMessageBox.information(self, "Erfolg", f"Bericht gespeichert: {file_path}") + + def export_points(self): + if not self.adjustment or not self.adjustment.result: + QMessageBox.warning(self, "Fehler", "Keine Ergebnisse vorhanden!") + return + + file_path, _ = QFileDialog.getSaveFileName( + self, "Koordinaten speichern", "", "CSV Files (*.csv)") + if file_path: + self.adjustment.export_adjusted_points(file_path, 'csv') + QMessageBox.information(self, "Erfolg", f"Koordinaten gespeichert: {file_path}") + + +class MainWindow(QMainWindow): + """Hauptfenster der Anwendung""" + + def __init__(self): + super().__init__() + self.parser: JXLParser = None + self.setup_ui() + + def setup_ui(self): + self.setWindowTitle("Trimble Geodesy Tool - Geodätische Vermessungsarbeiten") + self.setMinimumSize(1200, 800) + + # Menüleiste + self.setup_menu() + + # Statusleiste + self.statusBar().showMessage("Bereit") + + # Zentrale Widget mit Tabs + central_widget = QWidget() + self.setCentralWidget(central_widget) + + layout = QVBoxLayout(central_widget) + + # Tab-Widget + self.tabs = QTabWidget() + + self.jxl_tab = JXLAnalysisTab(self) + self.tabs.addTab(self.jxl_tab, "📁 JXL-Analyse") + + self.cor_tab = CORGeneratorTab(self) + self.tabs.addTab(self.cor_tab, "📄 COR-Generator") + + self.transform_tab = TransformationTab(self) + self.tabs.addTab(self.transform_tab, "🔄 Transformation") + + self.georef_tab = GeoreferencingTab(self) + self.tabs.addTab(self.georef_tab, "🌍 Georeferenzierung") + + self.adjust_tab = NetworkAdjustmentTab(self) + self.tabs.addTab(self.adjust_tab, "📐 Netzausgleichung") + + layout.addWidget(self.tabs) + + def setup_menu(self): + menubar = self.menuBar() + + # Datei-Menü + file_menu = menubar.addMenu("&Datei") + + open_action = QAction("&Öffnen...", self) + open_action.setShortcut("Ctrl+O") + open_action.triggered.connect(self.open_file) + file_menu.addAction(open_action) + + file_menu.addSeparator() + + exit_action = QAction("&Beenden", self) + exit_action.setShortcut("Ctrl+Q") + exit_action.triggered.connect(self.close) + file_menu.addAction(exit_action) + + # Hilfe-Menü + help_menu = menubar.addMenu("&Hilfe") + + about_action = QAction("&Über", self) + about_action.triggered.connect(self.show_about) + help_menu.addAction(about_action) + + def open_file(self): + file_path, _ = QFileDialog.getOpenFileName( + self, "JXL-Datei öffnen", "", "JXL Files (*.jxl);;All Files (*)") + if file_path: + self.jxl_tab.file_path_edit.setText(file_path) + self.jxl_tab.load_file() + + def show_about(self): + QMessageBox.about(self, "Über Trimble Geodesy Tool", + "

Trimble Geodesy Tool

" + "

Geodätische Vermessungsarbeiten mit JXL-Dateien

" + "

Funktionen:

" + "
    " + "
  • JXL-Datei Analyse und Bearbeitung
  • " + "
  • COR-Datei Generierung
  • " + "
  • Koordinatentransformation (Rotation/Translation)
  • " + "
  • Georeferenzierung mit Passpunkten
  • " + "
  • Netzausgleichung nach kleinsten Quadraten
  • " + "
" + "

Version 1.0

") + + +def main(): + app = QApplication(sys.argv) + + # Stil setzen + app.setStyle('Fusion') + + # Fenster erstellen und anzeigen + window = MainWindow() + window.show() + + sys.exit(app.exec_()) + + +if __name__ == "__main__": + main() diff --git a/modules/__init__.py b/modules/__init__.py new file mode 100644 index 0000000..5ba9474 --- /dev/null +++ b/modules/__init__.py @@ -0,0 +1,6 @@ +# Trimble Geodesy Modules +from .jxl_parser import JXLParser +from .cor_generator import CORGenerator +from .transformation import CoordinateTransformer +from .georeferencing import Georeferencer +from .network_adjustment import NetworkAdjustment diff --git a/modules/__pycache__/__init__.cpython-311.pyc b/modules/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..ae56889 Binary files /dev/null and b/modules/__pycache__/__init__.cpython-311.pyc differ diff --git a/modules/__pycache__/cor_generator.cpython-311.pyc b/modules/__pycache__/cor_generator.cpython-311.pyc new file mode 100644 index 0000000..a474397 Binary files /dev/null and b/modules/__pycache__/cor_generator.cpython-311.pyc differ diff --git a/modules/__pycache__/georeferencing.cpython-311.pyc b/modules/__pycache__/georeferencing.cpython-311.pyc new file mode 100644 index 0000000..4bf4843 Binary files /dev/null and b/modules/__pycache__/georeferencing.cpython-311.pyc differ diff --git a/modules/__pycache__/jxl_parser.cpython-311.pyc b/modules/__pycache__/jxl_parser.cpython-311.pyc new file mode 100644 index 0000000..77a2ba9 Binary files /dev/null and b/modules/__pycache__/jxl_parser.cpython-311.pyc differ diff --git a/modules/__pycache__/network_adjustment.cpython-311.pyc b/modules/__pycache__/network_adjustment.cpython-311.pyc new file mode 100644 index 0000000..6b778cb Binary files /dev/null and b/modules/__pycache__/network_adjustment.cpython-311.pyc differ diff --git a/modules/__pycache__/transformation.cpython-311.pyc b/modules/__pycache__/transformation.cpython-311.pyc new file mode 100644 index 0000000..2476ea5 Binary files /dev/null and b/modules/__pycache__/transformation.cpython-311.pyc differ diff --git a/modules/cor_generator.py b/modules/cor_generator.py new file mode 100644 index 0000000..002a939 --- /dev/null +++ b/modules/cor_generator.py @@ -0,0 +1,406 @@ +""" +COR Generator Module - Generiert COR-Punktdateien aus JXL-Daten +Unterstützt Referenzlinien-Stationierung und freie Stationierung +""" + +import math +from typing import Dict, List, Optional, Tuple +from dataclasses import dataclass +from .jxl_parser import JXLParser, Point, Station + + +@dataclass +class CORPoint: + """Repräsentiert einen Punkt im COR-Format""" + name: str + x: float # East + y: float # North + z: float # Elevation + + def to_cor_line(self) -> str: + """Formatiert als COR-Zeile""" + # Format: |Name |X |Y |Z | + x_str = f"{self.x:.3f}" if abs(self.x) >= 0.001 else "0" + y_str = f"{self.y:.3f}" if abs(self.y) >= 0.001 else "0" + z_str = f"{self.z:.3f}" if abs(self.z) >= 0.001 else "0" + + return f"|{self.name} |{x_str} | {y_str} | {z_str} |" + + +class CORGenerator: + """Generator für COR-Dateien aus JXL-Daten""" + + def __init__(self, parser: JXLParser): + self.parser = parser + self.cor_points: List[CORPoint] = [] + + # Transformationsparameter (für lokales System) + self.origin_east: float = 0.0 + self.origin_north: float = 0.0 + self.origin_elev: float = 0.0 + self.rotation: float = 0.0 # in Radiant + + def compute_from_observations(self) -> List[CORPoint]: + """ + Berechnet Koordinaten aus den Rohbeobachtungen + Erste Station: Referenzlinie + Weitere Stationen: Freie Stationierung + """ + self.cor_points = [] + computed_coords: Dict[str, Tuple[float, float, float]] = {} + + # Referenzlinie finden + ref_line = self.parser.get_reference_line() + + # Stationen sortieren nach Aufnahmezeitpunkt + stations_sorted = sorted(self.parser.stations.items(), + key=lambda x: x[1].timestamp) + + first_station = True + + for station_id, station in stations_sorted: + if station.station_type == 'ReflineStationSetup' and first_station: + # Erste Station mit Referenzlinie + self._compute_refline_station(station_id, station, ref_line, computed_coords) + first_station = False + else: + # Freie Stationierung + self._compute_free_station(station_id, station, computed_coords) + + # COR-Punkte erstellen + for name, (e, n, elev) in computed_coords.items(): + self.cor_points.append(CORPoint( + name=name, + x=e, + y=n, + z=elev + )) + + return self.cor_points + + def _compute_refline_station(self, station_id: str, station: Station, + ref_line, computed_coords: Dict): + """Berechnet Punkte für eine Referenzlinien-Station""" + + # Referenzpunkte definieren das lokale System + if ref_line: + start_name = ref_line.start_point + end_name = ref_line.end_point + + # Startpunkt ist Ursprung (0,0) + if start_name in self.parser.points: + computed_coords[start_name] = (0.0, 0.0, 0.0) + + # Endpunkt liegt auf der Y-Achse (Nord-Richtung) + # Die Distanz muss aus den Messungen berechnet werden + + # Alle Messungen von dieser Station + measurements = self.parser.get_measurements_from_station(station_id) + + # Finde Backbearing für Orientierung + backbearing = None + for bb_id, bb in self.parser.backbearings.items(): + if bb.station_record_id == station_id: + backbearing = bb + break + + # Stationskoordinaten + if station.name in self.parser.points: + st_point = self.parser.points[station.name] + st_e = st_point.east if st_point.east is not None else 0.0 + st_n = st_point.north if st_point.north is not None else 0.0 + st_elev = st_point.elevation if st_point.elevation is not None else 0.0 + + computed_coords[station.name] = (st_e, st_n, st_elev) + + # Punkte aus Messungen berechnen + for meas in measurements: + if meas.name and meas.horizontal_circle is not None and meas.edm_distance is not None: + # Prismenkonstante holen + prism_const = 0.0 + if meas.target_id in self.parser.targets: + prism_const = self.parser.targets[meas.target_id].prism_constant + + # Korrigierte Distanz + dist = meas.edm_distance + prism_const + + # Vertikalwinkel + vz = meas.vertical_circle if meas.vertical_circle is not None else 100.0 + vz_rad = vz * math.pi / 200.0 # Gon zu Radiant + + # Horizontaldistanz + h_dist = dist * math.sin(vz_rad) + + # Höhendifferenz (von Zenitwinkel) + dh = dist * math.cos(vz_rad) + + # Richtung berechnen + hz = meas.horizontal_circle + + # Orientierung anwenden + if backbearing and backbearing.orientation_correction is not None: + ori = backbearing.orientation_correction + else: + ori = 0.0 + + # Azimut berechnen + # Bei Referenzlinie: Der Hz-Kreis zum Backsight definiert die Nordrichtung + azimut_gon = hz + ori + azimut_rad = azimut_gon * math.pi / 200.0 + + # Koordinatenberechnung + de = h_dist * math.sin(azimut_rad) + dn = h_dist * math.cos(azimut_rad) + + # Endkoordinaten + if meas.north is not None and meas.east is not None: + # Verwende berechnete Koordinaten aus JXL + e = meas.east + n = meas.north + elev = meas.elevation if meas.elevation is not None else 0.0 + else: + # Berechne aus Messungen + st_point = self.parser.points.get(station.name) + if st_point: + e = (st_point.east or 0.0) + de + n = (st_point.north or 0.0) + dn + elev = (st_point.elevation or 0.0) + dh + else: + e = de + n = dn + elev = dh + + # Nur hinzufügen wenn noch nicht vorhanden oder neuere Messung + if meas.name not in computed_coords: + computed_coords[meas.name] = (e, n, elev) + + def _compute_free_station(self, station_id: str, station: Station, + computed_coords: Dict): + """Berechnet Punkte für eine freie Stationierung""" + + # Bei freier Stationierung: Station wurde bereits berechnet + # Wir verwenden die Koordinaten aus dem Parser + + measurements = self.parser.get_measurements_from_station(station_id) + + # Stationskoordinaten hinzufügen falls vorhanden + if station.name in self.parser.points: + st_point = self.parser.points[station.name] + if st_point.east is not None and st_point.north is not None: + computed_coords[station.name] = ( + st_point.east, + st_point.north, + st_point.elevation or 0.0 + ) + + # Punkte aus Messungen + for meas in measurements: + if meas.name and meas.north is not None and meas.east is not None: + if meas.name not in computed_coords: + computed_coords[meas.name] = ( + meas.east, + meas.north, + meas.elevation or 0.0 + ) + + def generate_from_computed_grid(self) -> List[CORPoint]: + """ + Generiert COR-Punkte direkt aus den ComputedGrid-Koordinaten der JXL-Datei + """ + self.cor_points = [] + seen_names = set() + + # Referenzlinie finden für Header + ref_line = self.parser.get_reference_line() + + # Sortiere Stationen nach Zeitstempel + stations_sorted = sorted(self.parser.stations.items(), + key=lambda x: x[1].timestamp) + + current_station_header = None + + for station_id, station in stations_sorted: + # Neuer Stationsheader wenn sich die Station ändert + if station.station_type == 'ReflineStationSetup': + if ref_line: + # Header für Referenzlinie-Station + header_point = CORPoint( + name=ref_line.start_point, + x=0.0, y=0.0, z=0.0 + ) + if ref_line.start_point not in seen_names: + self.cor_points.append(header_point) + seen_names.add(ref_line.start_point) + + # Punkte von dieser Station + measurements = self.parser.get_measurements_from_station(station_id) + + for meas in measurements: + if meas.name and meas.name not in seen_names: + if meas.north is not None and meas.east is not None: + self.cor_points.append(CORPoint( + name=meas.name, + x=meas.east, + y=meas.north, + z=meas.elevation or 0.0 + )) + seen_names.add(meas.name) + + # Alle verbleibenden Punkte hinzufügen + for name, point in self.parser.get_active_points().items(): + if name not in seen_names and point.east is not None and point.north is not None: + self.cor_points.append(CORPoint( + name=name, + x=point.east, + y=point.north, + z=point.elevation or 0.0 + )) + seen_names.add(name) + + return self.cor_points + + def write_cor_file(self, output_path: str, include_header: bool = True) -> str: + """Schreibt die COR-Datei""" + lines = [] + + # Sammle eindeutige Stationsstarts für Header + ref_line = self.parser.get_reference_line() + stations_sorted = sorted(self.parser.stations.items(), + key=lambda x: x[1].timestamp) + + current_station_idx = 0 + written_points = set() + + for station_id, station in stations_sorted: + # Header für neue Station (Referenzlinie) + if station.station_type == 'ReflineStationSetup' and ref_line: + if include_header: + # Markdown-Style Header + lines.append(f"|{ref_line.start_point} |0.000 |0.000.1 |0.000.2 |") + lines.append("|----:|----:|----:|----:|") + + # Punkte von dieser Station + measurements = self.parser.get_measurements_from_station(station_id) + for meas in measurements: + if meas.name and meas.name not in written_points: + # Finde den COR-Punkt + for cp in self.cor_points: + if cp.name == meas.name: + lines.append(cp.to_cor_line()) + written_points.add(meas.name) + break + + # Verbleibende Punkte + for cp in self.cor_points: + if cp.name not in written_points: + lines.append(cp.to_cor_line()) + written_points.add(cp.name) + + content = "\n".join(lines) + + with open(output_path, 'w', encoding='utf-8') as f: + f.write(content) + + return content + + def export_csv(self, output_path: str) -> str: + """Exportiert als CSV-Datei""" + lines = ["Punktname;East;North;Elevation"] + + for cp in self.cor_points: + lines.append(f"{cp.name};{cp.x:.4f};{cp.y:.4f};{cp.z:.4f}") + + content = "\n".join(lines) + + with open(output_path, 'w', encoding='utf-8') as f: + f.write(content) + + return content + + def export_txt(self, output_path: str, delimiter: str = "\t") -> str: + """Exportiert als TXT-Datei""" + lines = [] + + for cp in self.cor_points: + lines.append(f"{cp.name}{delimiter}{cp.x:.4f}{delimiter}{cp.y:.4f}{delimiter}{cp.z:.4f}") + + content = "\n".join(lines) + + with open(output_path, 'w', encoding='utf-8') as f: + f.write(content) + + return content + + def export_dxf(self, output_path: str) -> str: + """Exportiert als DXF-Datei (einfaches Format)""" + lines = [] + + # DXF Header + lines.extend([ + "0", "SECTION", + "2", "ENTITIES" + ]) + + # Punkte als POINT und TEXT + for cp in self.cor_points: + # POINT entity + lines.extend([ + "0", "POINT", + "8", "POINTS", # Layer + "10", f"{cp.x:.4f}", # X + "20", f"{cp.y:.4f}", # Y + "30", f"{cp.z:.4f}" # Z + ]) + + # TEXT entity für Punktname + lines.extend([ + "0", "TEXT", + "8", "NAMES", # Layer + "10", f"{cp.x + 0.5:.4f}", # X offset + "20", f"{cp.y + 0.5:.4f}", # Y offset + "30", f"{cp.z:.4f}", # Z + "40", "0.5", # Texthöhe + "1", cp.name # Text + ]) + + # DXF Footer + lines.extend([ + "0", "ENDSEC", + "0", "EOF" + ]) + + content = "\n".join(lines) + + with open(output_path, 'w', encoding='utf-8') as f: + f.write(content) + + return content + + def get_statistics(self) -> str: + """Gibt Statistiken über die generierten Punkte zurück""" + if not self.cor_points: + return "Keine Punkte generiert." + + x_vals = [p.x for p in self.cor_points] + y_vals = [p.y for p in self.cor_points] + z_vals = [p.z for p in self.cor_points] + + stats = [] + stats.append(f"Anzahl Punkte: {len(self.cor_points)}") + stats.append(f"") + stats.append(f"X (East):") + stats.append(f" Min: {min(x_vals):.3f} m") + stats.append(f" Max: {max(x_vals):.3f} m") + stats.append(f" Spanne: {max(x_vals) - min(x_vals):.3f} m") + stats.append(f"") + stats.append(f"Y (North):") + stats.append(f" Min: {min(y_vals):.3f} m") + stats.append(f" Max: {max(y_vals):.3f} m") + stats.append(f" Spanne: {max(y_vals) - min(y_vals):.3f} m") + stats.append(f"") + stats.append(f"Z (Elevation):") + stats.append(f" Min: {min(z_vals):.3f} m") + stats.append(f" Max: {max(z_vals):.3f} m") + stats.append(f" Spanne: {max(z_vals) - min(z_vals):.3f} m") + + return "\n".join(stats) diff --git a/modules/georeferencing.py b/modules/georeferencing.py new file mode 100644 index 0000000..f05789f --- /dev/null +++ b/modules/georeferencing.py @@ -0,0 +1,369 @@ +""" +Georeferenzierungs-Modul +Transformation mit Passpunkten (mind. 3 Punkte) +NUR Rotation und Translation - KEINE Maßstabsänderung +""" + +import math +import numpy as np +from typing import List, Tuple, Optional, Dict +from dataclasses import dataclass +from .cor_generator import CORPoint + + +@dataclass +class ControlPoint: + """Passpunkt mit Soll- und Ist-Koordinaten""" + name: str + # Ist-Koordinaten (lokales System) + local_x: float + local_y: float + local_z: float + # Soll-Koordinaten (Zielsystem, z.B. UTM) + target_x: float + target_y: float + target_z: float + # Residuen nach Transformation + residual_x: float = 0.0 + residual_y: float = 0.0 + residual_z: float = 0.0 + + @property + def residual_2d(self) -> float: + """2D-Residuum""" + return math.sqrt(self.residual_x**2 + self.residual_y**2) + + @property + def residual_3d(self) -> float: + """3D-Residuum""" + return math.sqrt(self.residual_x**2 + self.residual_y**2 + self.residual_z**2) + + +@dataclass +class GeoreferenceResult: + """Ergebnis der Georeferenzierung""" + # Transformationsparameter + translation_x: float = 0.0 + translation_y: float = 0.0 + translation_z: float = 0.0 + rotation_rad: float = 0.0 + + # Qualitätsparameter + rmse_x: float = 0.0 + rmse_y: float = 0.0 + rmse_z: float = 0.0 + rmse_2d: float = 0.0 + rmse_3d: float = 0.0 + max_residual_2d: float = 0.0 + max_residual_3d: float = 0.0 + + # Passpunkte mit Residuen + control_points: List[ControlPoint] = None + + def __post_init__(self): + if self.control_points is None: + self.control_points = [] + + @property + def rotation_gon(self) -> float: + """Rotation in Gon""" + return self.rotation_rad * 200.0 / math.pi + + @property + def rotation_deg(self) -> float: + """Rotation in Grad""" + return math.degrees(self.rotation_rad) + + +class Georeferencer: + """ + Georeferenzierung mit mindestens 3 Passpunkten + Verwendet Methode der kleinsten Quadrate für Überbestimmung + NUR Rotation und Translation (4 Parameter) + """ + + def __init__(self): + self.control_points: List[ControlPoint] = [] + self.result: Optional[GeoreferenceResult] = None + self.points_to_transform: List[CORPoint] = [] + self.transformed_points: List[CORPoint] = [] + + def add_control_point(self, name: str, + local_x: float, local_y: float, local_z: float, + target_x: float, target_y: float, target_z: float): + """Fügt einen Passpunkt hinzu""" + self.control_points.append(ControlPoint( + name=name, + local_x=local_x, local_y=local_y, local_z=local_z, + target_x=target_x, target_y=target_y, target_z=target_z + )) + + def clear_control_points(self): + """Löscht alle Passpunkte""" + self.control_points = [] + self.result = None + + def set_points_to_transform(self, points: List[CORPoint]): + """Setzt die zu transformierenden Punkte""" + self.points_to_transform = points.copy() + + def compute_transformation(self) -> GeoreferenceResult: + """ + Berechnet die Transformationsparameter + 4-Parameter-Transformation: Rotation + Translation (kein Maßstab) + Methode der kleinsten Quadrate + """ + if len(self.control_points) < 3: + raise ValueError("Mindestens 3 Passpunkte erforderlich!") + + n = len(self.control_points) + + # Koordinaten extrahieren + local_coords = np.array([[cp.local_x, cp.local_y] for cp in self.control_points]) + target_coords = np.array([[cp.target_x, cp.target_y] for cp in self.control_points]) + + # Schwerpunkte berechnen + local_centroid = np.mean(local_coords, axis=0) + target_centroid = np.mean(target_coords, axis=0) + + # Zum Schwerpunkt verschieben + local_centered = local_coords - local_centroid + target_centered = target_coords - target_centroid + + # Rotation berechnen mit SVD (Procrustes ohne Skalierung) + H = local_centered.T @ target_centered + U, S, Vt = np.linalg.svd(H) + + # Rotationsmatrix + R = Vt.T @ U.T + + # Reflexion korrigieren falls nötig + if np.linalg.det(R) < 0: + Vt[-1, :] *= -1 + R = Vt.T @ U.T + + # Rotationswinkel aus Matrix + rotation_rad = math.atan2(R[1, 0], R[0, 0]) + + # Translation berechnen + translation = target_centroid - R @ local_centroid + + # Ergebnis speichern + self.result = GeoreferenceResult( + translation_x=translation[0], + translation_y=translation[1], + rotation_rad=rotation_rad + ) + + # Z-Translation (Mittelwert der Höhendifferenzen) + z_diffs = [cp.target_z - cp.local_z for cp in self.control_points] + self.result.translation_z = np.mean(z_diffs) + + # Residuen berechnen + self._compute_residuals() + + return self.result + + def _compute_residuals(self): + """Berechnet Residuen für alle Passpunkte""" + if self.result is None: + return + + cos_r = math.cos(self.result.rotation_rad) + sin_r = math.sin(self.result.rotation_rad) + + sum_res_x2 = 0.0 + sum_res_y2 = 0.0 + sum_res_z2 = 0.0 + max_res_2d = 0.0 + max_res_3d = 0.0 + + for cp in self.control_points: + # Transformation anwenden + x_trans = cos_r * cp.local_x - sin_r * cp.local_y + self.result.translation_x + y_trans = sin_r * cp.local_x + cos_r * cp.local_y + self.result.translation_y + z_trans = cp.local_z + self.result.translation_z + + # Residuen + cp.residual_x = cp.target_x - x_trans + cp.residual_y = cp.target_y - y_trans + cp.residual_z = cp.target_z - z_trans + + sum_res_x2 += cp.residual_x**2 + sum_res_y2 += cp.residual_y**2 + sum_res_z2 += cp.residual_z**2 + + max_res_2d = max(max_res_2d, cp.residual_2d) + max_res_3d = max(max_res_3d, cp.residual_3d) + + n = len(self.control_points) + self.result.rmse_x = math.sqrt(sum_res_x2 / n) + self.result.rmse_y = math.sqrt(sum_res_y2 / n) + self.result.rmse_z = math.sqrt(sum_res_z2 / n) + self.result.rmse_2d = math.sqrt((sum_res_x2 + sum_res_y2) / n) + self.result.rmse_3d = math.sqrt((sum_res_x2 + sum_res_y2 + sum_res_z2) / n) + self.result.max_residual_2d = max_res_2d + self.result.max_residual_3d = max_res_3d + self.result.control_points = self.control_points + + def transform_points(self) -> List[CORPoint]: + """Transformiert alle Punkte""" + if self.result is None: + raise ValueError("Transformation noch nicht berechnet!") + + self.transformed_points = [] + + cos_r = math.cos(self.result.rotation_rad) + sin_r = math.sin(self.result.rotation_rad) + + for p in self.points_to_transform: + x_trans = cos_r * p.x - sin_r * p.y + self.result.translation_x + y_trans = sin_r * p.x + cos_r * p.y + self.result.translation_y + z_trans = p.z + self.result.translation_z + + self.transformed_points.append(CORPoint( + name=p.name, + x=x_trans, + y=y_trans, + z=z_trans + )) + + return self.transformed_points + + def transform_single_point(self, x: float, y: float, z: float) -> Tuple[float, float, float]: + """Transformiert einen einzelnen Punkt""" + if self.result is None: + raise ValueError("Transformation noch nicht berechnet!") + + cos_r = math.cos(self.result.rotation_rad) + sin_r = math.sin(self.result.rotation_rad) + + x_trans = cos_r * x - sin_r * y + self.result.translation_x + y_trans = sin_r * x + cos_r * y + self.result.translation_y + z_trans = z + self.result.translation_z + + return x_trans, y_trans, z_trans + + def get_transformation_report(self) -> str: + """Erstellt einen detaillierten Transformationsbericht""" + if self.result is None: + return "Keine Transformation berechnet." + + lines = [] + lines.append("=" * 70) + lines.append("GEOREFERENZIERUNG - TRANSFORMATIONSBERICHT") + lines.append("=" * 70) + lines.append("") + lines.append("TRANSFORMATIONSPARAMETER (4-Parameter, ohne Maßstab)") + lines.append("-" * 70) + lines.append(f" Translation X: {self.result.translation_x:>15.4f} m") + lines.append(f" Translation Y: {self.result.translation_y:>15.4f} m") + lines.append(f" Translation Z: {self.result.translation_z:>15.4f} m") + lines.append(f" Rotation: {self.result.rotation_gon:>15.6f} gon") + lines.append(f" {self.result.rotation_deg:>15.6f}°") + lines.append(f" {self.result.rotation_rad:>15.8f} rad") + lines.append(f" Maßstab: {1.0:>15.6f} (fest)") + lines.append("") + lines.append("QUALITÄTSPARAMETER") + lines.append("-" * 70) + lines.append(f" RMSE X: {self.result.rmse_x*1000:>15.2f} mm") + lines.append(f" RMSE Y: {self.result.rmse_y*1000:>15.2f} mm") + lines.append(f" RMSE Z: {self.result.rmse_z*1000:>15.2f} mm") + lines.append(f" RMSE 2D: {self.result.rmse_2d*1000:>15.2f} mm") + lines.append(f" RMSE 3D: {self.result.rmse_3d*1000:>15.2f} mm") + lines.append(f" Max. Residuum 2D: {self.result.max_residual_2d*1000:>15.2f} mm") + lines.append(f" Max. Residuum 3D: {self.result.max_residual_3d*1000:>15.2f} mm") + lines.append("") + lines.append("PASSPUNKTE UND RESIDUEN") + lines.append("-" * 70) + lines.append(f"{'Punkt':<10} {'vX [mm]':>10} {'vY [mm]':>10} {'vZ [mm]':>10} {'v2D [mm]':>10} {'v3D [mm]':>10}") + lines.append("-" * 70) + + for cp in self.result.control_points: + lines.append(f"{cp.name:<10} {cp.residual_x*1000:>10.2f} {cp.residual_y*1000:>10.2f} " + f"{cp.residual_z*1000:>10.2f} {cp.residual_2d*1000:>10.2f} {cp.residual_3d*1000:>10.2f}") + + lines.append("-" * 70) + lines.append(f"Anzahl Passpunkte: {len(self.control_points)}") + lines.append(f"Redundanz (2D): {2 * len(self.control_points) - 4}") + lines.append("") + lines.append("=" * 70) + + return "\n".join(lines) + + def get_control_points_comparison(self) -> str: + """Erstellt eine Vergleichstabelle für Passpunkte""" + if self.result is None: + return "Keine Transformation berechnet." + + lines = [] + lines.append("PASSPUNKT-KOORDINATEN: LOKAL → ZIEL") + lines.append("=" * 100) + lines.append(f"{'Punkt':<8} {'X_lokal':>12} {'Y_lokal':>12} {'Z_lokal':>10} | " + f"{'X_Ziel':>12} {'Y_Ziel':>12} {'Z_Ziel':>10}") + lines.append("-" * 100) + + for cp in self.control_points: + lines.append(f"{cp.name:<8} {cp.local_x:>12.3f} {cp.local_y:>12.3f} {cp.local_z:>10.3f} | " + f"{cp.target_x:>12.3f} {cp.target_y:>12.3f} {cp.target_z:>10.3f}") + + lines.append("=" * 100) + return "\n".join(lines) + + def export_report(self, filepath: str): + """Exportiert den vollständigen Bericht""" + report = self.get_transformation_report() + with open(filepath, 'w', encoding='utf-8') as f: + f.write(report) + + +class RigidBodyTransformation: + """ + Alternative Implementierung: Rigide Körpertransformation + 6 Parameter: 3 Translationen + 3 Rotationen + Für 3D-Daten mit mehreren Rotationsachsen + """ + + def __init__(self): + self.control_points: List[ControlPoint] = [] + self.translation = np.zeros(3) + self.rotation_matrix = np.eye(3) + + def compute(self, local_points: np.ndarray, target_points: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + """ + Berechnet rigide Transformation mit Kabsch-Algorithmus + Keine Maßstabsänderung + """ + # Schwerpunkte + centroid_local = np.mean(local_points, axis=0) + centroid_target = np.mean(target_points, axis=0) + + # Zentrieren + local_centered = local_points - centroid_local + target_centered = target_points - centroid_target + + # Kovarianzmatrix + H = local_centered.T @ target_centered + + # SVD + U, S, Vt = np.linalg.svd(H) + + # Rotation + R = Vt.T @ U.T + + # Reflexionskorrektur + if np.linalg.det(R) < 0: + Vt[-1, :] *= -1 + R = Vt.T @ U.T + + # Translation + t = centroid_target - R @ centroid_local + + self.rotation_matrix = R + self.translation = t + + return R, t + + def transform(self, points: np.ndarray) -> np.ndarray: + """Transformiert Punkte""" + return (self.rotation_matrix @ points.T).T + self.translation diff --git a/modules/jxl_parser.py b/modules/jxl_parser.py new file mode 100644 index 0000000..32b5a91 --- /dev/null +++ b/modules/jxl_parser.py @@ -0,0 +1,616 @@ +""" +JXL Parser Module - Trimble JXL-Datei Parser +Liest und analysiert Trimble JXL-Dateien (XML-basiert) +""" + +import xml.etree.ElementTree as ET +from dataclasses import dataclass, field +from typing import List, Dict, Optional, Tuple +import math +import copy + + +@dataclass +class Point: + """Repräsentiert einen vermessenen Punkt""" + name: str + code: str = "" + method: str = "" + survey_method: str = "" + classification: str = "" + deleted: bool = False + + # Grid-Koordinaten + north: Optional[float] = None + east: Optional[float] = None + elevation: Optional[float] = None + + # Kreis-Messungen (Rohwerte) + horizontal_circle: Optional[float] = None # in Gon + vertical_circle: Optional[float] = None # in Gon + edm_distance: Optional[float] = None # in Meter + face: str = "Face1" + + # Standardabweichungen + hz_std_error: Optional[float] = None + vz_std_error: Optional[float] = None + dist_std_error: Optional[float] = None + + # Referenzen + station_id: str = "" + target_id: str = "" + backsight_id: str = "" + + # Zusätzliche Infos + timestamp: str = "" + record_id: str = "" + pressure: Optional[float] = None + temperature: Optional[float] = None + + +@dataclass +class Station: + """Repräsentiert eine Instrumentenstation""" + name: str + theodolite_height: float = 0.0 + station_type: str = "" # ReflineStationSetup, StandardResection, etc. + instrument_id: str = "" + atmosphere_id: str = "" + scale_factor: float = 1.0 + + # Berechnete Koordinaten + north: Optional[float] = None + east: Optional[float] = None + elevation: Optional[float] = None + + # Orientierung + orientation_correction: Optional[float] = None + + record_id: str = "" + timestamp: str = "" + + +@dataclass +class Target: + """Repräsentiert ein Ziel/Prisma""" + prism_type: str = "" + prism_constant: float = 0.0 + target_height: float = 0.0 + record_id: str = "" + + +@dataclass +class BackBearing: + """Repräsentiert einen Rückwärtseinschnitt""" + station: str = "" + backsight: str = "" + face1_hz: Optional[float] = None + face2_hz: Optional[float] = None + orientation_correction: Optional[float] = None + record_id: str = "" + station_record_id: str = "" + + +@dataclass +class Instrument: + """Repräsentiert ein Vermessungsinstrument""" + instrument_type: str = "" + model: str = "" + serial: str = "" + edm_precision: float = 0.001 + edm_ppm: float = 1.5 + hz_precision: float = 0.0 + vz_precision: float = 0.0 + edm_constant: float = 0.0 + record_id: str = "" + + +@dataclass +class Atmosphere: + """Atmosphärische Bedingungen""" + pressure: float = 1013.25 + temperature: float = 20.0 + ppm: float = 0.0 + refraction_coefficient: float = 0.13 + record_id: str = "" + + +@dataclass +class Line: + """Repräsentiert eine Referenzlinie""" + name: str = "" + start_point: str = "" + end_point: str = "" + start_station: float = 0.0 + + +class JXLParser: + """Parser für Trimble JXL-Dateien""" + + def __init__(self): + self.job_name: str = "" + self.coordinate_system: str = "" + self.zone_name: str = "" + self.datum_name: str = "" + self.angle_units: str = "Gons" # Gons oder Degrees + self.distance_units: str = "Metres" + + self.points: Dict[str, Point] = {} + self.stations: Dict[str, Station] = {} + self.targets: Dict[str, Target] = {} + self.backbearings: Dict[str, BackBearing] = {} + self.instruments: Dict[str, Instrument] = {} + self.atmospheres: Dict[str, Atmosphere] = {} + self.lines: Dict[str, Line] = {} + + # Alle Messungen (auch gelöschte) + self.all_point_records: List[Point] = [] + + self.raw_xml = None + self.file_path: str = "" + + def parse(self, file_path: str) -> bool: + """Parst eine JXL-Datei""" + self.file_path = file_path + try: + tree = ET.parse(file_path) + self.raw_xml = tree + root = tree.getroot() + + # Job-Informationen + self.job_name = root.get('jobName', '') + + # FieldBook durchsuchen + fieldbook = root.find('FieldBook') + if fieldbook is None: + return False + + for element in fieldbook: + self._parse_element(element) + + # Stationskoordinaten aus Punkten zuweisen + self._assign_station_coordinates() + + return True + + except Exception as e: + print(f"Fehler beim Parsen: {e}") + return False + + def _parse_element(self, element): + """Parst ein einzelnes Element""" + tag = element.tag + record_id = element.get('ID', '') + timestamp = element.get('TimeStamp', '') + + if tag == 'CoordinateSystemRecord': + self._parse_coordinate_system(element) + elif tag == 'UnitsRecord': + self._parse_units(element) + elif tag == 'PointRecord': + self._parse_point(element, record_id, timestamp) + elif tag == 'StationRecord': + self._parse_station(element, record_id, timestamp) + elif tag == 'TargetRecord': + self._parse_target(element, record_id) + elif tag == 'BackBearingRecord': + self._parse_backbearing(element, record_id) + elif tag == 'InstrumentRecord': + self._parse_instrument(element, record_id) + elif tag == 'AtmosphereRecord': + self._parse_atmosphere(element, record_id) + elif tag == 'LineRecord': + self._parse_line(element) + + def _parse_coordinate_system(self, element): + """Parst Koordinatensystem-Informationen""" + system = element.find('SystemName') + zone = element.find('ZoneName') + datum = element.find('DatumName') + + if system is not None and system.text: + self.coordinate_system = system.text + if zone is not None and zone.text: + self.zone_name = zone.text + if datum is not None and datum.text: + self.datum_name = datum.text + + def _parse_units(self, element): + """Parst Einheiten""" + angle = element.find('AngleUnits') + distance = element.find('DistanceUnits') + + if angle is not None and angle.text: + self.angle_units = angle.text + if distance is not None and distance.text: + self.distance_units = distance.text + + def _parse_point(self, element, record_id: str, timestamp: str): + """Parst einen Punkt""" + point = Point(name="") + point.record_id = record_id + point.timestamp = timestamp + + # Basisdaten + name_elem = element.find('Name') + point.name = name_elem.text if name_elem is not None and name_elem.text else "" + + code_elem = element.find('Code') + point.code = code_elem.text if code_elem is not None and code_elem.text else "" + + method_elem = element.find('Method') + point.method = method_elem.text if method_elem is not None and method_elem.text else "" + + survey_elem = element.find('SurveyMethod') + point.survey_method = survey_elem.text if survey_elem is not None and survey_elem.text else "" + + class_elem = element.find('Classification') + point.classification = class_elem.text if class_elem is not None and class_elem.text else "" + + deleted_elem = element.find('Deleted') + point.deleted = deleted_elem is not None and deleted_elem.text == 'true' + + # Station und Target IDs + station_id_elem = element.find('StationID') + point.station_id = station_id_elem.text if station_id_elem is not None and station_id_elem.text else "" + + target_id_elem = element.find('TargetID') + point.target_id = target_id_elem.text if target_id_elem is not None and target_id_elem.text else "" + + backsight_id_elem = element.find('BackBearingID') + point.backsight_id = backsight_id_elem.text if backsight_id_elem is not None and backsight_id_elem.text else "" + + # Grid-Koordinaten + grid = element.find('Grid') + if grid is not None: + north = grid.find('North') + east = grid.find('East') + elev = grid.find('Elevation') + + if north is not None and north.text: + try: + point.north = float(north.text) + except ValueError: + pass + if east is not None and east.text: + try: + point.east = float(east.text) + except ValueError: + pass + if elev is not None and elev.text: + try: + point.elevation = float(elev.text) + except ValueError: + pass + + # ComputedGrid-Koordinaten (falls vorhanden) + computed_grid = element.find('ComputedGrid') + if computed_grid is not None: + north = computed_grid.find('North') + east = computed_grid.find('East') + elev = computed_grid.find('Elevation') + + if north is not None and north.text: + try: + point.north = float(north.text) + except ValueError: + pass + if east is not None and east.text: + try: + point.east = float(east.text) + except ValueError: + pass + if elev is not None and elev.text: + try: + point.elevation = float(elev.text) + except ValueError: + pass + + # Kreis-Messungen + circle = element.find('Circle') + if circle is not None: + hz = circle.find('HorizontalCircle') + vz = circle.find('VerticalCircle') + dist = circle.find('EDMDistance') + face = circle.find('Face') + + if hz is not None and hz.text: + point.horizontal_circle = float(hz.text) + if vz is not None and vz.text: + point.vertical_circle = float(vz.text) + if dist is not None and dist.text: + point.edm_distance = float(dist.text) + if face is not None and face.text: + point.face = face.text + + # Standardabweichungen + hz_std = circle.find('HorizontalCircleStandardError') + vz_std = circle.find('VerticalCircleStandardError') + dist_std = circle.find('EDMDistanceStandardError') + + if hz_std is not None and hz_std.text: + point.hz_std_error = float(hz_std.text) + if vz_std is not None and vz_std.text: + point.vz_std_error = float(vz_std.text) + if dist_std is not None and dist_std.text: + point.dist_std_error = float(dist_std.text) + + # Atmosphäre + pressure_elem = element.find('Pressure') + temp_elem = element.find('Temperature') + + if pressure_elem is not None and pressure_elem.text: + point.pressure = float(pressure_elem.text) + if temp_elem is not None and temp_elem.text: + point.temperature = float(temp_elem.text) + + # Zur Liste aller Punkt-Records hinzufügen + self.all_point_records.append(point) + + # Nur nicht-gelöschte Punkte zum Dictionary hinzufügen + # Überschreibe vorhandene Punkte (neuester Wert) + if not point.deleted and point.name: + self.points[point.name] = point + + def _parse_station(self, element, record_id: str, timestamp: str): + """Parst eine Station""" + name_elem = element.find('StationName') + name = name_elem.text if name_elem is not None and name_elem.text else "" + + station = Station(name=name) + station.record_id = record_id + station.timestamp = timestamp + + th_elem = element.find('TheodoliteHeight') + if th_elem is not None and th_elem.text: + station.theodolite_height = float(th_elem.text) + + type_elem = element.find('StationType') + if type_elem is not None and type_elem.text: + station.station_type = type_elem.text + + inst_elem = element.find('InstrumentID') + if inst_elem is not None and inst_elem.text: + station.instrument_id = inst_elem.text + + atm_elem = element.find('AtmosphereID') + if atm_elem is not None and atm_elem.text: + station.atmosphere_id = atm_elem.text + + scale_elem = element.find('ScaleFactor') + if scale_elem is not None and scale_elem.text: + station.scale_factor = float(scale_elem.text) + + ori_elem = element.find('OrientationCorrection') + if ori_elem is not None and ori_elem.text: + station.orientation_correction = float(ori_elem.text) + + self.stations[record_id] = station + + def _parse_target(self, element, record_id: str): + """Parst ein Target/Prisma""" + target = Target() + target.record_id = record_id + + type_elem = element.find('PrismType') + if type_elem is not None and type_elem.text: + target.prism_type = type_elem.text + + const_elem = element.find('PrismConstant') + if const_elem is not None and const_elem.text: + target.prism_constant = float(const_elem.text) + + height_elem = element.find('TargetHeight') + if height_elem is not None and height_elem.text: + target.target_height = float(height_elem.text) + + self.targets[record_id] = target + + def _parse_backbearing(self, element, record_id: str): + """Parst einen Rückwärtseinschnitt""" + bb = BackBearing() + bb.record_id = record_id + + station_elem = element.find('Station') + if station_elem is not None and station_elem.text: + bb.station = station_elem.text + + bs_elem = element.find('BackSight') + if bs_elem is not None and bs_elem.text: + bb.backsight = bs_elem.text + + face1_elem = element.find('Face1HorizontalCircle') + if face1_elem is not None and face1_elem.text: + bb.face1_hz = float(face1_elem.text) + + face2_elem = element.find('Face2HorizontalCircle') + if face2_elem is not None and face2_elem.text: + bb.face2_hz = float(face2_elem.text) + + ori_elem = element.find('OrientationCorrection') + if ori_elem is not None and ori_elem.text: + bb.orientation_correction = float(ori_elem.text) + + station_record_elem = element.find('StationRecordID') + if station_record_elem is not None and station_record_elem.text: + bb.station_record_id = station_record_elem.text + + self.backbearings[record_id] = bb + + def _parse_instrument(self, element, record_id: str): + """Parst ein Instrument""" + inst = Instrument() + inst.record_id = record_id + + type_elem = element.find('Type') + if type_elem is not None and type_elem.text: + inst.instrument_type = type_elem.text + + model_elem = element.find('Model') + if model_elem is not None and model_elem.text: + inst.model = model_elem.text + + serial_elem = element.find('Serial') + if serial_elem is not None and serial_elem.text: + inst.serial = serial_elem.text + + edm_prec_elem = element.find('EDMPrecision') + if edm_prec_elem is not None and edm_prec_elem.text: + inst.edm_precision = float(edm_prec_elem.text) + + edm_ppm_elem = element.find('EDMppm') + if edm_ppm_elem is not None and edm_ppm_elem.text: + inst.edm_ppm = float(edm_ppm_elem.text) + + hz_prec_elem = element.find('HorizontalAnglePrecision') + if hz_prec_elem is not None and hz_prec_elem.text: + inst.hz_precision = float(hz_prec_elem.text) + + vz_prec_elem = element.find('VerticalAnglePrecision') + if vz_prec_elem is not None and vz_prec_elem.text: + inst.vz_precision = float(vz_prec_elem.text) + + edm_const_elem = element.find('InstrumentAppliedEDMConstant') + if edm_const_elem is not None and edm_const_elem.text: + inst.edm_constant = float(edm_const_elem.text) + + self.instruments[record_id] = inst + + def _parse_atmosphere(self, element, record_id: str): + """Parst Atmosphäre-Daten""" + atm = Atmosphere() + atm.record_id = record_id + + press_elem = element.find('Pressure') + if press_elem is not None and press_elem.text: + atm.pressure = float(press_elem.text) + + temp_elem = element.find('Temperature') + if temp_elem is not None and temp_elem.text: + atm.temperature = float(temp_elem.text) + + ppm_elem = element.find('PPM') + if ppm_elem is not None and ppm_elem.text: + atm.ppm = float(ppm_elem.text) + + refr_elem = element.find('RefractionCoefficient') + if refr_elem is not None and refr_elem.text: + atm.refraction_coefficient = float(refr_elem.text) + + self.atmospheres[record_id] = atm + + def _parse_line(self, element): + """Parst eine Referenzlinie""" + line = Line() + + name_elem = element.find('Name') + if name_elem is not None and name_elem.text: + line.name = name_elem.text + + start_elem = element.find('StartPoint') + if start_elem is not None and start_elem.text: + line.start_point = start_elem.text + + end_elem = element.find('EndPoint') + if end_elem is not None and end_elem.text: + line.end_point = end_elem.text + + station_elem = element.find('StartStation') + if station_elem is not None and station_elem.text: + line.start_station = float(station_elem.text) + + if line.name: + self.lines[line.name] = line + + def _assign_station_coordinates(self): + """Weist den Stationen Koordinaten aus berechneten Punkten zu""" + for station_id, station in self.stations.items(): + if station.name in self.points: + point = self.points[station.name] + station.north = point.north + station.east = point.east + station.elevation = point.elevation + + def get_active_points(self) -> Dict[str, Point]: + """Gibt nur aktive (nicht gelöschte) Punkte zurück""" + return {name: p for name, p in self.points.items() if not p.deleted} + + def get_control_points(self) -> Dict[str, Point]: + """Gibt Passpunkte zurück (BackSight klassifiziert)""" + return {name: p for name, p in self.points.items() + if p.classification == 'BackSight' and not p.deleted} + + def get_measurements_for_point(self, point_name: str) -> List[Point]: + """Gibt alle Messungen für einen Punkt zurück""" + return [p for p in self.all_point_records if p.name == point_name] + + def get_measurements_from_station(self, station_id: str) -> List[Point]: + """Gibt alle Messungen von einer Station zurück""" + return [p for p in self.all_point_records + if p.station_id == station_id and not p.deleted] + + def get_prism_constants(self) -> Dict[str, float]: + """Gibt alle verwendeten Prismenkonstanten zurück""" + return {tid: t.prism_constant for tid, t in self.targets.items()} + + def modify_prism_constant(self, target_id: str, new_constant: float): + """Ändert die Prismenkonstante für ein Target""" + if target_id in self.targets: + self.targets[target_id].prism_constant = new_constant + + def remove_point(self, point_name: str): + """Entfernt einen Punkt (markiert als gelöscht)""" + if point_name in self.points: + del self.points[point_name] + + def get_station_list(self) -> List[Tuple[str, str]]: + """Gibt Liste der Stationen mit Typ zurück""" + result = [] + for sid, station in self.stations.items(): + result.append((station.name, station.station_type)) + return result + + def get_reference_line(self) -> Optional[Line]: + """Gibt die erste gefundene Referenzlinie zurück""" + if self.lines: + return list(self.lines.values())[0] + return None + + def gon_to_rad(self, gon: float) -> float: + """Konvertiert Gon zu Radiant""" + return gon * math.pi / 200.0 + + def rad_to_gon(self, rad: float) -> float: + """Konvertiert Radiant zu Gon""" + return rad * 200.0 / math.pi + + def get_summary(self) -> str: + """Gibt eine Zusammenfassung der geladenen Daten zurück""" + summary = [] + summary.append(f"Job: {self.job_name}") + summary.append(f"Koordinatensystem: {self.coordinate_system}") + summary.append(f"Zone: {self.zone_name}") + summary.append(f"Datum: {self.datum_name}") + summary.append(f"Winkeleinheit: {self.angle_units}") + summary.append(f"") + summary.append(f"Anzahl Punkte (aktiv): {len(self.get_active_points())}") + summary.append(f"Anzahl Stationen: {len(self.stations)}") + summary.append(f"Anzahl Messungen gesamt: {len(self.all_point_records)}") + summary.append(f"Anzahl Targets/Prismen: {len(self.targets)}") + summary.append(f"Anzahl Referenzlinien: {len(self.lines)}") + + # Stationsübersicht + summary.append(f"\nStationen:") + for sid, station in self.stations.items(): + summary.append(f" - {station.name}: {station.station_type}") + + # Prismenkonstanten + summary.append(f"\nPrismenkonstanten:") + for tid, target in self.targets.items(): + summary.append(f" - {target.prism_type}: {target.prism_constant*1000:.1f} mm") + + return "\n".join(summary) + + def create_copy(self): + """Erstellt eine tiefe Kopie des Parsers""" + return copy.deepcopy(self) diff --git a/modules/network_adjustment.py b/modules/network_adjustment.py new file mode 100644 index 0000000..56d9883 --- /dev/null +++ b/modules/network_adjustment.py @@ -0,0 +1,633 @@ +""" +Netzausgleichungs-Modul +Methode der kleinsten Quadrate für geodätische Netze +Basiert auf Beobachtungen aus JXL-Dateien +""" + +import math +import numpy as np +from scipy import sparse +from scipy.sparse.linalg import spsolve +from typing import List, Dict, Tuple, Optional, Set +from dataclasses import dataclass, field +from .jxl_parser import JXLParser, Point, Station + + +@dataclass +class Observation: + """Repräsentiert eine geodätische Beobachtung""" + obs_type: str # 'direction', 'distance', 'zenith', 'height_diff' + from_station: str + to_point: str + value: float # Messwert (Gon, Meter, etc.) + std_dev: float # Standardabweichung + residual: float = 0.0 # Verbesserung + weight: float = 1.0 + + def __post_init__(self): + if self.std_dev > 0: + self.weight = 1.0 / (self.std_dev ** 2) + + +@dataclass +class AdjustedPoint: + """Ausgeglichener Punkt mit Genauigkeitsangaben""" + name: str + x: float # East + y: float # North + z: float # Elevation + + # A-priori Koordinaten + x_approx: float = 0.0 + y_approx: float = 0.0 + z_approx: float = 0.0 + + # Korrekturen + dx: float = 0.0 + dy: float = 0.0 + dz: float = 0.0 + + # Standardabweichungen + std_x: float = 0.0 + std_y: float = 0.0 + std_z: float = 0.0 + std_position: float = 0.0 # 2D Punktlagegenauigkeit + + # Konfidenzellipse + semi_major: float = 0.0 + semi_minor: float = 0.0 + orientation: float = 0.0 # in Gon + + # Status + is_fixed: bool = False + + +@dataclass +class AdjustmentResult: + """Ergebnis der Netzausgleichung""" + # Ausgeglichene Punkte + adjusted_points: Dict[str, AdjustedPoint] = field(default_factory=dict) + + # Beobachtungen mit Residuen + observations: List[Observation] = field(default_factory=list) + + # Globale Qualitätsparameter + sigma_0_priori: float = 1.0 + sigma_0_posteriori: float = 0.0 + chi_square: float = 0.0 + redundancy: int = 0 + + # RMSE der Residuen + rmse_directions: float = 0.0 # in mgon + rmse_distances: float = 0.0 # in mm + rmse_zenith: float = 0.0 # in mgon + + # Iterationsinfo + iterations: int = 0 + converged: bool = False + + # Statistiken + num_points: int = 0 + num_fixed_points: int = 0 + num_observations: int = 0 + num_unknowns: int = 0 + + +class NetworkAdjustment: + """ + Netzausgleichung nach der Methode der kleinsten Quadrate + Gauß-Markov-Modell mit Beobachtungsgleichungen + """ + + def __init__(self, parser: JXLParser): + self.parser = parser + self.observations: List[Observation] = [] + self.points: Dict[str, AdjustedPoint] = {} + self.fixed_points: Set[str] = set() + self.result: Optional[AdjustmentResult] = None + + # Konfiguration + self.max_iterations: int = 20 + self.convergence_limit: float = 1e-8 # Meter + self.sigma_0_priori: float = 1.0 + + # Standard-Genauigkeiten (falls nicht aus JXL) + self.default_std_direction: float = 0.0003 # Gon (0.3 mgon) + self.default_std_distance: float = 0.002 # Meter + self.default_std_zenith: float = 0.0003 # Gon + + def extract_observations(self): + """Extrahiert Beobachtungen aus JXL-Daten""" + self.observations = [] + + for station_id, station in self.parser.stations.items(): + # Messungen von dieser Station + measurements = self.parser.get_measurements_from_station(station_id) + + # Backbearing für Orientierung finden + orientation = 0.0 + for bb_id, bb in self.parser.backbearings.items(): + if bb.station_record_id == station_id: + if bb.orientation_correction is not None: + orientation = bb.orientation_correction + break + + for meas in measurements: + if meas.name and not meas.deleted: + # Richtung + if meas.horizontal_circle is not None: + std = meas.hz_std_error if meas.hz_std_error else self.default_std_direction + azimuth = meas.horizontal_circle + orientation + + self.observations.append(Observation( + obs_type='direction', + from_station=station.name, + to_point=meas.name, + value=azimuth, + std_dev=std + )) + + # Strecke + if meas.edm_distance is not None: + std = meas.dist_std_error if meas.dist_std_error else self.default_std_distance + + # Prismenkonstante berücksichtigen + prism_const = 0.0 + if meas.target_id in self.parser.targets: + prism_const = self.parser.targets[meas.target_id].prism_constant + + distance = meas.edm_distance + prism_const + + self.observations.append(Observation( + obs_type='distance', + from_station=station.name, + to_point=meas.name, + value=distance, + std_dev=std + )) + + # Zenitwinkel + if meas.vertical_circle is not None: + std = meas.vz_std_error if meas.vz_std_error else self.default_std_zenith + + self.observations.append(Observation( + obs_type='zenith', + from_station=station.name, + to_point=meas.name, + value=meas.vertical_circle, + std_dev=std + )) + + return self.observations + + def initialize_points(self): + """Initialisiert Näherungskoordinaten aus JXL-Daten""" + self.points = {} + + # Alle aktiven Punkte + for name, point in self.parser.get_active_points().items(): + self.points[name] = AdjustedPoint( + name=name, + x=point.east if point.east is not None else 0.0, + y=point.north if point.north is not None else 0.0, + z=point.elevation if point.elevation is not None else 0.0, + x_approx=point.east if point.east is not None else 0.0, + y_approx=point.north if point.north is not None else 0.0, + z_approx=point.elevation if point.elevation is not None else 0.0 + ) + + # Stationen hinzufügen + for station_id, station in self.parser.stations.items(): + if station.name not in self.points: + self.points[station.name] = AdjustedPoint( + name=station.name, + x=station.east if station.east is not None else 0.0, + y=station.north if station.north is not None else 0.0, + z=station.elevation if station.elevation is not None else 0.0, + x_approx=station.east if station.east is not None else 0.0, + y_approx=station.north if station.north is not None else 0.0, + z_approx=station.elevation if station.elevation is not None else 0.0 + ) + + def set_fixed_point(self, point_name: str): + """Setzt einen Punkt als Festpunkt""" + if point_name in self.points: + self.fixed_points.add(point_name) + self.points[point_name].is_fixed = True + + def set_fixed_points_auto(self): + """Setzt automatisch Festpunkte (Referenzpunkte)""" + # Referenzlinie als Festpunkte + ref_line = self.parser.get_reference_line() + if ref_line: + self.set_fixed_point(ref_line.start_point) + self.set_fixed_point(ref_line.end_point) + + # Zusätzlich: Erste Station als Festpunkt falls keine Referenzlinie + if not self.fixed_points: + stations = list(self.parser.stations.values()) + if stations: + first_station = min(stations, key=lambda s: s.timestamp) + self.set_fixed_point(first_station.name) + + def adjust(self) -> AdjustmentResult: + """ + Führt die Netzausgleichung durch + Iterative Lösung nach Gauß-Newton + """ + if not self.observations: + self.extract_observations() + + if not self.points: + self.initialize_points() + + if not self.fixed_points: + self.set_fixed_points_auto() + + # Unbekannte bestimmen (nur nicht-fixierte Punkte) + unknown_points = [name for name in self.points.keys() + if name not in self.fixed_points] + + num_unknowns = len(unknown_points) * 2 # Nur X, Y (2D-Ausgleichung) + num_observations = len([o for o in self.observations + if o.obs_type in ['direction', 'distance']]) + + if num_unknowns == 0: + raise ValueError("Keine unbekannten Punkte!") + + if num_observations < num_unknowns: + raise ValueError(f"Unterbestimmtes System: {num_observations} Beobachtungen, " + f"{num_unknowns} Unbekannte") + + # Index-Mapping für Unbekannte + point_index = {name: i for i, name in enumerate(unknown_points)} + + # Iterative Lösung + converged = False + iteration = 0 + + while not converged and iteration < self.max_iterations: + iteration += 1 + + # Designmatrix A und Beobachtungsvektor l erstellen + A, l, P = self._build_normal_equations(point_index, num_unknowns) + + # Normalgleichungssystem: N = A^T * P * A, n = A^T * P * l + N = A.T @ np.diag(P) @ A + n = A.T @ np.diag(P) @ l + + # Lösung: x = N^-1 * n + try: + dx = np.linalg.solve(N, n) + except np.linalg.LinAlgError: + # Regularisierung bei singulärer Matrix + N += np.eye(num_unknowns) * 1e-10 + dx = np.linalg.solve(N, n) + + # Koordinaten aktualisieren + max_correction = 0.0 + for name, idx in point_index.items(): + i = idx * 2 + self.points[name].dx = dx[i] + self.points[name].dy = dx[i + 1] + self.points[name].x += dx[i] + self.points[name].y += dx[i + 1] + + max_correction = max(max_correction, + abs(dx[i]), abs(dx[i + 1])) + + # Konvergenzprüfung + if max_correction < self.convergence_limit: + converged = True + + # Nachbearbeitung + self._compute_residuals() + self._compute_accuracy(point_index, num_unknowns) + + # Ergebnis zusammenstellen + self.result = AdjustmentResult( + adjusted_points=self.points, + observations=self.observations, + sigma_0_priori=self.sigma_0_priori, + iterations=iteration, + converged=converged, + num_points=len(self.points), + num_fixed_points=len(self.fixed_points), + num_observations=num_observations, + num_unknowns=num_unknowns, + redundancy=num_observations - num_unknowns + ) + + self._compute_global_statistics() + + return self.result + + def _build_normal_equations(self, point_index: Dict[str, int], + num_unknowns: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + """Erstellt Designmatrix A und Beobachtungsvektor l""" + + # Nur Richtungen und Strecken für 2D-Ausgleichung + relevant_obs = [o for o in self.observations + if o.obs_type in ['direction', 'distance']] + + n_obs = len(relevant_obs) + A = np.zeros((n_obs, num_unknowns)) + l = np.zeros(n_obs) + P = np.zeros(n_obs) + + for i, obs in enumerate(relevant_obs): + from_pt = self.points.get(obs.from_station) + to_pt = self.points.get(obs.to_point) + + if from_pt is None or to_pt is None: + continue + + dx = to_pt.x - from_pt.x + dy = to_pt.y - from_pt.y + dist = math.sqrt(dx**2 + dy**2) + + if dist < 1e-10: + continue + + from_idx = point_index.get(obs.from_station) + to_idx = point_index.get(obs.to_point) + + if obs.obs_type == 'direction': + # Azimut berechnen + azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi + if azimuth_calc < 0: + azimuth_calc += 400.0 + + # Partielle Ableitungen (in Gon) + # dAz/dx = dy / (rho * s^2) + # dAz/dy = -dx / (rho * s^2) + rho = 200.0 / math.pi + factor = rho / (dist ** 2) + + if from_idx is not None: + A[i, from_idx * 2] = -dy * factor # dAz/dx_from + A[i, from_idx * 2 + 1] = dx * factor # dAz/dy_from + + if to_idx is not None: + A[i, to_idx * 2] = dy * factor # dAz/dx_to + A[i, to_idx * 2 + 1] = -dx * factor # dAz/dy_to + + # Verkürzung l = beobachtet - berechnet + diff = obs.value - azimuth_calc + # Normalisierung auf [-200, 200] Gon + while diff > 200: + diff -= 400 + while diff < -200: + diff += 400 + l[i] = diff + + elif obs.obs_type == 'distance': + # Partielle Ableitungen + # ds/dx = dx/s + # ds/dy = dy/s + + if from_idx is not None: + A[i, from_idx * 2] = -dx / dist + A[i, from_idx * 2 + 1] = -dy / dist + + if to_idx is not None: + A[i, to_idx * 2] = dx / dist + A[i, to_idx * 2 + 1] = dy / dist + + # Verkürzung + l[i] = obs.value - dist + + # Gewicht + P[i] = obs.weight + + return A, l, P + + def _compute_residuals(self): + """Berechnet Verbesserungen (Residuen) für alle Beobachtungen""" + for obs in self.observations: + from_pt = self.points.get(obs.from_station) + to_pt = self.points.get(obs.to_point) + + if from_pt is None or to_pt is None: + continue + + dx = to_pt.x - from_pt.x + dy = to_pt.y - from_pt.y + dz = to_pt.z - from_pt.z + dist_2d = math.sqrt(dx**2 + dy**2) + dist_3d = math.sqrt(dx**2 + dy**2 + dz**2) + + if obs.obs_type == 'direction': + azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi + if azimuth_calc < 0: + azimuth_calc += 400.0 + + residual = obs.value - azimuth_calc + while residual > 200: + residual -= 400 + while residual < -200: + residual += 400 + obs.residual = residual + + elif obs.obs_type == 'distance': + obs.residual = obs.value - dist_3d + + elif obs.obs_type == 'zenith': + if dist_2d > 0: + zenith_calc = math.atan2(dist_2d, dz) * 200.0 / math.pi + obs.residual = obs.value - zenith_calc + + def _compute_accuracy(self, point_index: Dict[str, int], num_unknowns: int): + """Berechnet Genauigkeitsmaße für ausgeglichene Punkte""" + # Vereinfachte Berechnung der Standardabweichungen + # Vollständige Varianzfortpflanzung würde Inverse von N erfordern + + # Erstelle Normalgleichungsmatrix erneut + A, l, P = self._build_normal_equations(point_index, num_unknowns) + N = A.T @ np.diag(P) @ A + + try: + Q = np.linalg.inv(N) # Kofaktormatrix + except np.linalg.LinAlgError: + Q = np.eye(num_unknowns) * 0.001 + + # A-posteriori Varianzfaktor + v = A @ np.zeros(num_unknowns) - l # Residuen (vereinfacht) + redundancy = len(l) - num_unknowns + + if redundancy > 0: + sum_pvv = np.sum(P * v**2) + sigma_0_post = math.sqrt(sum_pvv / redundancy) + else: + sigma_0_post = self.sigma_0_priori + + self.sigma_0_posteriori = sigma_0_post + + # Punktgenauigkeiten + for name, idx in point_index.items(): + i = idx * 2 + + # Standardabweichungen + self.points[name].std_x = sigma_0_post * math.sqrt(abs(Q[i, i])) + self.points[name].std_y = sigma_0_post * math.sqrt(abs(Q[i + 1, i + 1])) + + # Punktlagegenauigkeit + self.points[name].std_position = math.sqrt( + self.points[name].std_x**2 + self.points[name].std_y**2 + ) + + # Konfidenzellipse (vereinfacht) + # Vollständige Berechnung würde Eigenwertanalyse erfordern + cov_xy = Q[i, i + 1] if i + 1 < num_unknowns else 0 + var_x = Q[i, i] + var_y = Q[i + 1, i + 1] + + # Eigenwerte für Ellipse + trace = var_x + var_y + det = var_x * var_y - cov_xy**2 + discriminant = trace**2 / 4 - det + + if discriminant >= 0: + sqrt_disc = math.sqrt(discriminant) + lambda1 = trace / 2 + sqrt_disc + lambda2 = trace / 2 - sqrt_disc + + self.points[name].semi_major = sigma_0_post * math.sqrt(max(lambda1, 0)) + self.points[name].semi_minor = sigma_0_post * math.sqrt(max(lambda2, 0)) + + if abs(var_x - var_y) > 1e-10: + self.points[name].orientation = 0.5 * math.atan2(2 * cov_xy, var_x - var_y) * 200 / math.pi + + def _compute_global_statistics(self): + """Berechnet globale Statistiken""" + if self.result is None: + return + + # RMSE für verschiedene Beobachtungstypen + dir_residuals = [o.residual for o in self.observations if o.obs_type == 'direction'] + dist_residuals = [o.residual for o in self.observations if o.obs_type == 'distance'] + zen_residuals = [o.residual for o in self.observations if o.obs_type == 'zenith'] + + if dir_residuals: + self.result.rmse_directions = math.sqrt(sum(r**2 for r in dir_residuals) / len(dir_residuals)) * 1000 # mgon + + if dist_residuals: + self.result.rmse_distances = math.sqrt(sum(r**2 for r in dist_residuals) / len(dist_residuals)) * 1000 # mm + + if zen_residuals: + self.result.rmse_zenith = math.sqrt(sum(r**2 for r in zen_residuals) / len(zen_residuals)) * 1000 # mgon + + self.result.sigma_0_posteriori = self.sigma_0_posteriori + + # Chi-Quadrat-Test + if self.result.redundancy > 0: + self.result.chi_square = (self.sigma_0_posteriori / self.sigma_0_priori) ** 2 * self.result.redundancy + + def get_adjustment_report(self) -> str: + """Erstellt einen detaillierten Ausgleichungsbericht""" + if self.result is None: + return "Keine Ausgleichung durchgeführt." + + lines = [] + lines.append("=" * 80) + lines.append("NETZAUSGLEICHUNG - ERGEBNISBERICHT") + lines.append("=" * 80) + lines.append("") + + # Allgemeine Informationen + lines.append("ALLGEMEINE INFORMATIONEN") + lines.append("-" * 80) + lines.append(f"Job: {self.parser.job_name}") + lines.append(f"Anzahl Punkte: {self.result.num_points}") + lines.append(f" davon Festpunkte: {self.result.num_fixed_points}") + lines.append(f" davon Neupunkte: {self.result.num_points - self.result.num_fixed_points}") + lines.append(f"Anzahl Beobachtungen: {self.result.num_observations}") + lines.append(f"Anzahl Unbekannte: {self.result.num_unknowns}") + lines.append(f"Redundanz: {self.result.redundancy}") + lines.append(f"Iterationen: {self.result.iterations}") + lines.append(f"Konvergiert: {'Ja' if self.result.converged else 'Nein'}") + lines.append("") + + # Qualitätsparameter + lines.append("GLOBALE QUALITÄTSPARAMETER") + lines.append("-" * 80) + lines.append(f"Sigma-0 a-priori: {self.sigma_0_priori:.4f}") + lines.append(f"Sigma-0 a-posteriori: {self.result.sigma_0_posteriori:.4f}") + lines.append(f"Chi-Quadrat: {self.result.chi_square:.2f}") + lines.append(f"RMSE Richtungen: {self.result.rmse_directions:.2f} mgon") + lines.append(f"RMSE Strecken: {self.result.rmse_distances:.2f} mm") + lines.append(f"RMSE Zenitwinkel: {self.result.rmse_zenith:.2f} mgon") + lines.append("") + + # Festpunkte + lines.append("FESTPUNKTE") + lines.append("-" * 80) + lines.append(f"{'Punkt':<12} {'X [m]':>14} {'Y [m]':>14} {'Z [m]':>12}") + lines.append("-" * 80) + for name in self.fixed_points: + if name in self.points: + p = self.points[name] + lines.append(f"{name:<12} {p.x:>14.4f} {p.y:>14.4f} {p.z:>12.4f}") + lines.append("") + + # Ausgeglichene Koordinaten + lines.append("AUSGEGLICHENE KOORDINATEN (NEUPUNKTE)") + lines.append("-" * 80) + lines.append(f"{'Punkt':<12} {'X [m]':>14} {'Y [m]':>14} {'Z [m]':>12} {'σX [mm]':>10} {'σY [mm]':>10} {'σPos [mm]':>10}") + lines.append("-" * 80) + + for name, p in sorted(self.points.items()): + if name not in self.fixed_points: + lines.append(f"{name:<12} {p.x:>14.4f} {p.y:>14.4f} {p.z:>12.4f} " + f"{p.std_x*1000:>10.2f} {p.std_y*1000:>10.2f} {p.std_position*1000:>10.2f}") + lines.append("") + + # Beobachtungen und Residuen + lines.append("BEOBACHTUNGEN UND VERBESSERUNGEN") + lines.append("-" * 80) + lines.append(f"{'Von':<10} {'Nach':<10} {'Typ':<10} {'Messwert':>14} {'Residuum':>12}") + lines.append("-" * 80) + + for obs in sorted(self.observations, key=lambda x: (x.from_station, x.to_point)): + if obs.obs_type == 'direction': + unit = "gon" + res_str = f"{obs.residual*1000:.2f} mgon" + elif obs.obs_type == 'distance': + unit = "m" + res_str = f"{obs.residual*1000:.2f} mm" + elif obs.obs_type == 'zenith': + unit = "gon" + res_str = f"{obs.residual*1000:.2f} mgon" + else: + unit = "" + res_str = f"{obs.residual:.4f}" + + lines.append(f"{obs.from_station:<10} {obs.to_point:<10} {obs.obs_type:<10} " + f"{obs.value:>12.4f} {unit:<2} {res_str:>12}") + + lines.append("") + lines.append("=" * 80) + + return "\n".join(lines) + + def export_adjusted_points(self, filepath: str, format: str = 'csv'): + """Exportiert ausgeglichene Punkte""" + if format == 'csv': + lines = ["Punkt;X;Y;Z;Sigma_X;Sigma_Y;Sigma_Pos;Festpunkt"] + for name, p in sorted(self.points.items()): + fixed = "Ja" if p.is_fixed else "Nein" + lines.append(f"{name};{p.x:.4f};{p.y:.4f};{p.z:.4f};" + f"{p.std_x*1000:.2f};{p.std_y*1000:.2f};{p.std_position*1000:.2f};{fixed}") + else: + lines = [] + for name, p in sorted(self.points.items()): + lines.append(f"{name}\t{p.x:.4f}\t{p.y:.4f}\t{p.z:.4f}") + + with open(filepath, 'w', encoding='utf-8') as f: + f.write("\n".join(lines)) + + def export_report(self, filepath: str): + """Exportiert den vollständigen Bericht""" + report = self.get_adjustment_report() + with open(filepath, 'w', encoding='utf-8') as f: + f.write(report) diff --git a/modules/transformation.py b/modules/transformation.py new file mode 100644 index 0000000..e0da406 --- /dev/null +++ b/modules/transformation.py @@ -0,0 +1,321 @@ +""" +Koordinatentransformations-Modul +Unterstützt Rotation, Translation in XY und Z +KEINE Maßstabsänderung (wie vom Benutzer gefordert) +""" + +import math +import numpy as np +from typing import List, Tuple, Optional, Dict +from dataclasses import dataclass +from .cor_generator import CORPoint + + +@dataclass +class TransformationParameters: + """Parameter für die Koordinatentransformation""" + # Translation + dx: float = 0.0 # Verschiebung in X (East) + dy: float = 0.0 # Verschiebung in Y (North) + dz: float = 0.0 # Verschiebung in Z (Höhe) + + # Rotation um Z-Achse (in Gon) + rotation_gon: float = 0.0 + + # Drehpunkt (für Rotation) + pivot_x: float = 0.0 + pivot_y: float = 0.0 + + def rotation_rad(self) -> float: + """Gibt Rotation in Radiant zurück""" + return self.rotation_gon * math.pi / 200.0 + + +class CoordinateTransformer: + """Transformiert Koordinaten: Rotation und Translation""" + + def __init__(self): + self.params = TransformationParameters() + self.original_points: List[CORPoint] = [] + self.transformed_points: List[CORPoint] = [] + + def set_points(self, points: List[CORPoint]): + """Setzt die zu transformierenden Punkte""" + self.original_points = points.copy() + self.transformed_points = [] + + def set_manual_parameters(self, dx: float, dy: float, dz: float, + rotation_gon: float, pivot_x: float = 0.0, + pivot_y: float = 0.0): + """Setzt Transformationsparameter manuell""" + self.params.dx = dx + self.params.dy = dy + self.params.dz = dz + self.params.rotation_gon = rotation_gon + self.params.pivot_x = pivot_x + self.params.pivot_y = pivot_y + + def compute_from_two_points(self, + point1_name: str, + point2_name: str, + z_reference_name: Optional[str] = None) -> bool: + """ + Berechnet Transformation aus zwei Punkten: + - point1 wird zum Ursprung (0,0) + - point2 definiert die Y-Richtung (Nordrichtung) + - z_reference (optional) definiert Z=0 + """ + # Finde Punkte + point1 = None + point2 = None + z_ref = None + + for p in self.original_points: + if p.name == point1_name: + point1 = p + elif p.name == point2_name: + point2 = p + if z_reference_name and p.name == z_reference_name: + z_ref = p + + if point1 is None or point2 is None: + return False + + # Drehpunkt ist point1 + self.params.pivot_x = point1.x + self.params.pivot_y = point1.y + + # Translation: point1 zum Ursprung + self.params.dx = -point1.x + self.params.dy = -point1.y + + # Rotation: point2 soll auf der positiven Y-Achse liegen + # Berechne Richtung von point1 zu point2 + dx_12 = point2.x - point1.x + dy_12 = point2.y - point1.y + + # Aktueller Winkel zur Y-Achse (Nordrichtung) + current_angle_rad = math.atan2(dx_12, dy_12) # atan2(x,y) für Azimut + + # Rotation um diesen Winkel (negativ, um auf Y-Achse zu drehen) + self.params.rotation_gon = -current_angle_rad * 200.0 / math.pi + + # Z-Verschiebung + if z_ref: + self.params.dz = -z_ref.z + else: + self.params.dz = -point1.z + + return True + + def transform(self) -> List[CORPoint]: + """Führt die Transformation durch""" + self.transformed_points = [] + + rot_rad = self.params.rotation_rad() + cos_r = math.cos(rot_rad) + sin_r = math.sin(rot_rad) + + for p in self.original_points: + # 1. Zum Drehpunkt verschieben + x_shifted = p.x - self.params.pivot_x + y_shifted = p.y - self.params.pivot_y + + # 2. Rotation anwenden + x_rotated = x_shifted * cos_r - y_shifted * sin_r + y_rotated = x_shifted * sin_r + y_shifted * cos_r + + # 3. Translation anwenden + x_final = x_rotated + self.params.pivot_x + self.params.dx + y_final = y_rotated + self.params.pivot_y + self.params.dy + z_final = p.z + self.params.dz + + self.transformed_points.append(CORPoint( + name=p.name, + x=x_final, + y=y_final, + z=z_final + )) + + return self.transformed_points + + def transform_single_point(self, x: float, y: float, z: float) -> Tuple[float, float, float]: + """Transformiert einen einzelnen Punkt""" + rot_rad = self.params.rotation_rad() + cos_r = math.cos(rot_rad) + sin_r = math.sin(rot_rad) + + # Zum Drehpunkt verschieben + x_shifted = x - self.params.pivot_x + y_shifted = y - self.params.pivot_y + + # Rotation + x_rotated = x_shifted * cos_r - y_shifted * sin_r + y_rotated = x_shifted * sin_r + y_shifted * cos_r + + # Translation + x_final = x_rotated + self.params.pivot_x + self.params.dx + y_final = y_rotated + self.params.pivot_y + self.params.dy + z_final = z + self.params.dz + + return x_final, y_final, z_final + + def inverse_transform(self, x: float, y: float, z: float) -> Tuple[float, float, float]: + """Inverse Transformation""" + # Inverse Translation + x_inv = x - self.params.dx - self.params.pivot_x + y_inv = y - self.params.dy - self.params.pivot_y + z_inv = z - self.params.dz + + # Inverse Rotation + rot_rad = -self.params.rotation_rad() + cos_r = math.cos(rot_rad) + sin_r = math.sin(rot_rad) + + x_rotated = x_inv * cos_r - y_inv * sin_r + y_rotated = x_inv * sin_r + y_inv * cos_r + + # Zurück vom Drehpunkt + x_final = x_rotated + self.params.pivot_x + y_final = y_rotated + self.params.pivot_y + + return x_final, y_final, z_inv + + def get_transformation_matrix(self) -> np.ndarray: + """Gibt die 4x4 Transformationsmatrix zurück""" + rot_rad = self.params.rotation_rad() + cos_r = math.cos(rot_rad) + sin_r = math.sin(rot_rad) + + # Translation zum Drehpunkt + T1 = np.array([ + [1, 0, 0, -self.params.pivot_x], + [0, 1, 0, -self.params.pivot_y], + [0, 0, 1, 0], + [0, 0, 0, 1] + ]) + + # Rotation + R = np.array([ + [cos_r, -sin_r, 0, 0], + [sin_r, cos_r, 0, 0], + [0, 0, 1, 0], + [0, 0, 0, 1] + ]) + + # Translation zurück und zusätzliche Verschiebung + T2 = np.array([ + [1, 0, 0, self.params.pivot_x + self.params.dx], + [0, 1, 0, self.params.pivot_y + self.params.dy], + [0, 0, 1, self.params.dz], + [0, 0, 0, 1] + ]) + + return T2 @ R @ T1 + + def get_parameters_report(self) -> str: + """Gibt einen Bericht über die Transformationsparameter zurück""" + report = [] + report.append("=" * 50) + report.append("KOORDINATENTRANSFORMATION - PARAMETER") + report.append("=" * 50) + report.append("") + report.append("Translation:") + report.append(f" dX (East): {self.params.dx:+.4f} m") + report.append(f" dY (North): {self.params.dy:+.4f} m") + report.append(f" dZ (Höhe): {self.params.dz:+.4f} m") + report.append("") + report.append("Rotation:") + report.append(f" Winkel: {self.params.rotation_gon:+.6f} gon") + report.append(f" Winkel: {self.params.rotation_gon * 0.9:+.6f}°") + report.append(f" Winkel: {self.params.rotation_rad():+.8f} rad") + report.append("") + report.append("Drehpunkt:") + report.append(f" X: {self.params.pivot_x:.4f} m") + report.append(f" Y: {self.params.pivot_y:.4f} m") + report.append("") + report.append("Hinweis: Keine Maßstabsänderung (Maßstab = 1.0)") + report.append("=" * 50) + + return "\n".join(report) + + def get_comparison_table(self) -> str: + """Erstellt eine Vergleichstabelle Original vs. Transformiert""" + if not self.original_points or not self.transformed_points: + return "Keine Daten verfügbar." + + lines = [] + lines.append("=" * 90) + lines.append("KOORDINATENVERGLEICH: ORIGINAL → TRANSFORMIERT") + lines.append("=" * 90) + lines.append(f"{'Punkt':<10} {'X_orig':>12} {'Y_orig':>12} {'Z_orig':>10} | " + f"{'X_trans':>12} {'Y_trans':>12} {'Z_trans':>10}") + lines.append("-" * 90) + + for orig, trans in zip(self.original_points, self.transformed_points): + lines.append(f"{orig.name:<10} {orig.x:>12.4f} {orig.y:>12.4f} {orig.z:>10.4f} | " + f"{trans.x:>12.4f} {trans.y:>12.4f} {trans.z:>10.4f}") + + lines.append("=" * 90) + return "\n".join(lines) + + +class LocalSystemTransformer: + """ + Spezielle Transformation für lokale Systeme + Transformiert in ein System mit definiertem Ursprung und Ausrichtung + """ + + def __init__(self): + self.origin_point: Optional[str] = None + self.direction_point: Optional[str] = None + self.z_reference_point: Optional[str] = None + self.transformer = CoordinateTransformer() + + def setup_local_system(self, points: List[CORPoint], + origin_name: str, + direction_name: str, + z_ref_name: Optional[str] = None) -> bool: + """ + Richtet ein lokales Koordinatensystem ein: + - origin_name: Punkt bei (0,0) + - direction_name: Punkt definiert Y-Richtung (liegt auf positiver Y-Achse) + - z_ref_name: Punkt definiert Z=0 + """ + self.origin_point = origin_name + self.direction_point = direction_name + self.z_reference_point = z_ref_name + + self.transformer.set_points(points) + + success = self.transformer.compute_from_two_points( + origin_name, + direction_name, + z_ref_name + ) + + if success: + self.transformer.transform() + + return success + + def get_transformed_points(self) -> List[CORPoint]: + """Gibt die transformierten Punkte zurück""" + return self.transformer.transformed_points + + def get_report(self) -> str: + """Gibt einen vollständigen Bericht zurück""" + report = [] + report.append("=" * 60) + report.append("LOKALES KOORDINATENSYSTEM") + report.append("=" * 60) + report.append(f"Ursprung (0,0): {self.origin_point}") + report.append(f"Y-Richtung: {self.direction_point}") + if self.z_reference_point: + report.append(f"Z-Referenz (0): {self.z_reference_point}") + report.append("") + report.append(self.transformer.get_parameters_report()) + report.append("") + report.append(self.transformer.get_comparison_table()) + + return "\n".join(report)