""" Netzausgleichungs-Modul Methode der kleinsten Quadrate für geodätische Netze Basiert auf Beobachtungen aus JXL-Dateien """ import math import numpy as np from scipy import sparse from scipy.sparse.linalg import spsolve from typing import List, Dict, Tuple, Optional, Set from dataclasses import dataclass, field from .jxl_parser import JXLParser, Point, Station @dataclass class Observation: """Repräsentiert eine geodätische Beobachtung""" obs_type: str # 'direction', 'distance', 'zenith', 'height_diff' from_station: str to_point: str value: float # Messwert (Gon, Meter, etc.) std_dev: float # Standardabweichung residual: float = 0.0 # Verbesserung weight: float = 1.0 def __post_init__(self): if self.std_dev > 0: self.weight = 1.0 / (self.std_dev ** 2) @dataclass class AdjustedPoint: """Ausgeglichener Punkt mit Genauigkeitsangaben""" name: str x: float # East y: float # North z: float # Elevation # A-priori Koordinaten x_approx: float = 0.0 y_approx: float = 0.0 z_approx: float = 0.0 # Korrekturen dx: float = 0.0 dy: float = 0.0 dz: float = 0.0 # Standardabweichungen std_x: float = 0.0 std_y: float = 0.0 std_z: float = 0.0 std_position: float = 0.0 # 2D Punktlagegenauigkeit # Konfidenzellipse semi_major: float = 0.0 semi_minor: float = 0.0 orientation: float = 0.0 # in Gon # Status is_fixed: bool = False @dataclass class AdjustmentResult: """Ergebnis der Netzausgleichung""" # Ausgeglichene Punkte adjusted_points: Dict[str, AdjustedPoint] = field(default_factory=dict) # Beobachtungen mit Residuen observations: List[Observation] = field(default_factory=list) # Globale Qualitätsparameter sigma_0_priori: float = 1.0 sigma_0_posteriori: float = 0.0 chi_square: float = 0.0 redundancy: int = 0 # RMSE der Residuen rmse_directions: float = 0.0 # in mgon rmse_distances: float = 0.0 # in mm rmse_zenith: float = 0.0 # in mgon # Iterationsinfo iterations: int = 0 converged: bool = False # Statistiken num_points: int = 0 num_fixed_points: int = 0 num_observations: int = 0 num_unknowns: int = 0 class NetworkAdjustment: """ Netzausgleichung nach der Methode der kleinsten Quadrate Gauß-Markov-Modell mit Beobachtungsgleichungen KORRIGIERT: Mit Orientierungsunbekannten pro Station """ def __init__(self, parser: JXLParser): self.parser = parser self.observations: List[Observation] = [] self.points: Dict[str, AdjustedPoint] = {} self.fixed_points: Set[str] = set() self.result: Optional[AdjustmentResult] = None # Orientierungsunbekannte pro Station self.orientations: Dict[str, float] = {} # Station -> Orientierungskorrektur in Gon self.station_names: List[str] = [] # Speichere ursprüngliche Koordinaten für Vergleich self.coords_before: Dict[str, Tuple[float, float, float]] = {} # Konfiguration self.max_iterations: int = 20 self.convergence_limit: float = 1e-8 # Meter self.sigma_0_priori: float = 1.0 # Standard-Genauigkeiten (falls nicht aus JXL) self.default_std_direction: float = 0.0003 # Gon (0.3 mgon) self.default_std_distance: float = 0.002 # Meter self.default_std_zenith: float = 0.0003 # Gon def extract_observations(self): """ Extrahiert Beobachtungen aus JXL-Daten WICHTIG: Richtungen werden als ROHE Kreislesungen extrahiert (ohne Orientierungskorrektur), da die Orientierung als Unbekannte geschätzt wird. """ self.observations = [] for station_id, station in self.parser.stations.items(): # Messungen von dieser Station measurements = self.parser.get_measurements_from_station(station_id) for meas in measurements: if meas.name and not meas.deleted: # Richtung - OHNE Orientierungskorrektur (wird als Unbekannte geschätzt) if meas.horizontal_circle is not None: std = meas.hz_std_error if meas.hz_std_error else self.default_std_direction self.observations.append(Observation( obs_type='direction', from_station=station.name, to_point=meas.name, value=meas.horizontal_circle, # Rohe Kreislesung std_dev=std )) # Strecke (Horizontalstrecke aus 3D-Messung) if meas.edm_distance is not None and meas.vertical_circle is not None: std = meas.dist_std_error if meas.dist_std_error else self.default_std_distance # Prismenkonstante berücksichtigen prism_const = 0.0 if meas.target_id in self.parser.targets: prism_const = self.parser.targets[meas.target_id].prism_constant # Schrägstrecke slope_distance = meas.edm_distance + prism_const # Horizontalstrecke aus Schrägstrecke und Zenitwinkel zenith_rad = meas.vertical_circle * math.pi / 200.0 horizontal_distance = slope_distance * math.sin(zenith_rad) self.observations.append(Observation( obs_type='distance', from_station=station.name, to_point=meas.name, value=horizontal_distance, # Horizontalstrecke std_dev=std )) # Zenitwinkel if meas.vertical_circle is not None: std = meas.vz_std_error if meas.vz_std_error else self.default_std_zenith self.observations.append(Observation( obs_type='zenith', from_station=station.name, to_point=meas.name, value=meas.vertical_circle, std_dev=std )) return self.observations def initialize_points(self): """Initialisiert Näherungskoordinaten aus JXL-Daten""" self.points = {} self.coords_before = {} self.station_names = [] self.orientations = {} # Alle aktiven Punkte for name, point in self.parser.get_active_points().items(): x = point.east if point.east is not None else 0.0 y = point.north if point.north is not None else 0.0 z = point.elevation if point.elevation is not None else 0.0 self.points[name] = AdjustedPoint( name=name, x=x, y=y, z=z, x_approx=x, y_approx=y, z_approx=z ) self.coords_before[name] = (x, y, z) # Stationen hinzufügen for station_id, station in self.parser.stations.items(): if station.name not in self.points: x = station.east if station.east is not None else 0.0 y = station.north if station.north is not None else 0.0 z = station.elevation if station.elevation is not None else 0.0 self.points[station.name] = AdjustedPoint( name=station.name, x=x, y=y, z=z, x_approx=x, y_approx=y, z_approx=z ) self.coords_before[station.name] = (x, y, z) if station.name not in self.station_names: self.station_names.append(station.name) # Orientierungen aus bekannten Koordinaten und Beobachtungen berechnen self._compute_initial_orientations() def _compute_initial_orientations(self): """ Berechnet initiale Orientierungen aus den bekannten Koordinaten. Dies ist wichtig, da die JXL-Koordinaten bereits fertig berechnet sind. """ for station_name in self.station_names: orientations_for_station = [] for obs in self.observations: if obs.obs_type == 'direction' and obs.from_station == station_name: from_pt = self.points.get(station_name) to_pt = self.points.get(obs.to_point) if from_pt and to_pt: dx = to_pt.x - from_pt.x dy = to_pt.y - from_pt.y dist = math.sqrt(dx**2 + dy**2) if dist > 0.01: # Mindestentfernung # Azimut aus Koordinaten (geodätisch: atan2(dx, dy)) azimuth = math.atan2(dx, dy) * 200.0 / math.pi if azimuth < 0: azimuth += 400.0 # Orientierung = Azimut - Richtung orientation = azimuth - obs.value # Normalisieren auf [0, 400] gon while orientation < 0: orientation += 400.0 while orientation >= 400: orientation -= 400.0 orientations_for_station.append(orientation) # Mittlere Orientierung verwenden (robust gegen Ausreißer) if orientations_for_station: # Kreismittel berechnen (für Winkel) sin_sum = sum(math.sin(o * math.pi / 200) for o in orientations_for_station) cos_sum = sum(math.cos(o * math.pi / 200) for o in orientations_for_station) mean_orientation = math.atan2(sin_sum, cos_sum) * 200 / math.pi if mean_orientation < 0: mean_orientation += 400.0 self.orientations[station_name] = mean_orientation else: self.orientations[station_name] = 0.0 def set_fixed_point(self, point_name: str): """Setzt einen Punkt als Festpunkt""" if point_name in self.points: self.fixed_points.add(point_name) self.points[point_name].is_fixed = True def set_fixed_points_auto(self): """Setzt automatisch Festpunkte (Referenzpunkte)""" # Referenzlinie als Festpunkte ref_line = self.parser.get_reference_line() if ref_line: self.set_fixed_point(ref_line.start_point) self.set_fixed_point(ref_line.end_point) # Zusätzlich: Erste Station als Festpunkt falls keine Referenzlinie if not self.fixed_points: stations = list(self.parser.stations.values()) if stations: first_station = min(stations, key=lambda s: s.timestamp) self.set_fixed_point(first_station.name) def check_consistency(self) -> dict: """ Prüft die Konsistenz zwischen Koordinaten und Beobachtungen. Gibt einen Bericht über die Qualität der Daten zurück. """ if not self.observations: self.extract_observations() if not self.points: self.initialize_points() result = { 'consistent': True, 'orientation_spread': {}, 'distance_residuals': [], 'issues': [] } # Prüfe Orientierungen pro Station for station_name in self.station_names: orientations = [] for obs in self.observations: if obs.obs_type == 'direction' and obs.from_station == station_name: from_pt = self.points.get(station_name) to_pt = self.points.get(obs.to_point) if from_pt and to_pt: dx = to_pt.x - from_pt.x dy = to_pt.y - from_pt.y dist = math.sqrt(dx**2 + dy**2) if dist > 0.01: azimuth = math.atan2(dx, dy) * 200.0 / math.pi if azimuth < 0: azimuth += 400.0 ori = azimuth - obs.value while ori < 0: ori += 400.0 while ori >= 400: ori -= 400.0 orientations.append(ori) if orientations: spread = max(orientations) - min(orientations) result['orientation_spread'][station_name] = { 'min': min(orientations), 'max': max(orientations), 'spread': spread } if spread > 1.0: # > 1 gon Spannweite result['consistent'] = False result['issues'].append(f"Station {station_name}: Orientierungsspannweite {spread:.2f} gon") return result def adjust(self, mode: str = "residuals_only") -> AdjustmentResult: """ Führt die Netzausgleichung durch. mode: "residuals_only" - Berechnet nur Residuen, keine Koordinatenänderung (Standard) "full" - Vollständige Ausgleichung mit Koordinatenkorrektur """ if not self.observations: self.extract_observations() if not self.points: self.initialize_points() if not self.fixed_points: self.set_fixed_points_auto() # Unbekannte bestimmen (nur nicht-fixierte Punkte) unknown_points = [name for name in self.points.keys() if name not in self.fixed_points] num_observations = len([o for o in self.observations if o.obs_type in ['direction', 'distance']]) if mode == "residuals_only": # Nur Residuenberechnung - Koordinaten bleiben unverändert # Orientierungen wurden bereits in initialize_points berechnet # Residuen berechnen self._compute_residuals() # Sigma-0 aus Residuen berechnen dir_residuals = [o.residual for o in self.observations if o.obs_type == 'direction'] dist_residuals = [o.residual for o in self.observations if o.obs_type == 'distance'] # Einfache Schätzung von sigma-0 if dir_residuals: rmse_dir = math.sqrt(sum(r**2 for r in dir_residuals) / len(dir_residuals)) else: rmse_dir = 0 if dist_residuals: rmse_dist = math.sqrt(sum(r**2 for r in dist_residuals) / len(dist_residuals)) else: rmse_dist = 0 self.sigma_0_posteriori = max(rmse_dir * 1000, rmse_dist * 1000) # vereinfacht # Ergebnis zusammenstellen self.result = AdjustmentResult( adjusted_points=self.points, observations=self.observations, sigma_0_priori=self.sigma_0_priori, iterations=0, converged=True, num_points=len(self.points), num_fixed_points=len(self.fixed_points), num_observations=num_observations, num_unknowns=0, redundancy=num_observations ) self._compute_global_statistics() return self.result # Vollständige Ausgleichung (mode == "full") # Anzahl Unbekannte: 2 pro Punkt (X,Y) + 1 Orientierung pro Station num_point_unknowns = len(unknown_points) * 2 num_orientation_unknowns = len(self.station_names) num_unknowns = num_point_unknowns + num_orientation_unknowns if num_unknowns == 0: raise ValueError("Keine unbekannten Punkte!") if num_observations < num_unknowns: raise ValueError(f"Unterbestimmtes System: {num_observations} Beobachtungen, " f"{num_unknowns} Unbekannte") # Index-Mapping für Unbekannte point_index = {name: i for i, name in enumerate(unknown_points)} orientation_index = {name: i for i, name in enumerate(self.station_names)} # Iterative Lösung converged = False iteration = 0 while not converged and iteration < self.max_iterations: iteration += 1 A, l, P = self._build_normal_equations_with_orientation( point_index, orientation_index, num_point_unknowns, num_unknowns) N = A.T @ np.diag(P) @ A n = A.T @ np.diag(P) @ l try: dx = np.linalg.solve(N, n) except np.linalg.LinAlgError: N += np.eye(num_unknowns) * 1e-10 dx = np.linalg.solve(N, n) max_correction = 0.0 for name, idx in point_index.items(): i = idx * 2 self.points[name].dx = dx[i] self.points[name].dy = dx[i + 1] self.points[name].x += dx[i] self.points[name].y += dx[i + 1] max_correction = max(max_correction, abs(dx[i]), abs(dx[i + 1])) for name, idx in orientation_index.items(): i = num_point_unknowns + idx self.orientations[name] += dx[i] if max_correction < self.convergence_limit: converged = True self._compute_residuals() self._compute_accuracy(point_index, num_point_unknowns) self.result = AdjustmentResult( adjusted_points=self.points, observations=self.observations, sigma_0_priori=self.sigma_0_priori, iterations=iteration, converged=converged, num_points=len(self.points), num_fixed_points=len(self.fixed_points), num_observations=num_observations, num_unknowns=num_unknowns, redundancy=num_observations - num_unknowns ) self._compute_global_statistics() return self.result def _build_normal_equations_with_orientation(self, point_index: Dict[str, int], orientation_index: Dict[str, int], num_point_unknowns: int, num_unknowns: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Erstellt Designmatrix A und Beobachtungsvektor l MIT Orientierungsunbekannten pro Station Unbekannte: [x1, y1, x2, y2, ..., o1, o2, ...] """ # Nur Richtungen und Strecken für 2D-Ausgleichung relevant_obs = [o for o in self.observations if o.obs_type in ['direction', 'distance']] n_obs = len(relevant_obs) A = np.zeros((n_obs, num_unknowns)) l = np.zeros(n_obs) P = np.zeros(n_obs) for i, obs in enumerate(relevant_obs): from_pt = self.points.get(obs.from_station) to_pt = self.points.get(obs.to_point) if from_pt is None or to_pt is None: continue dx = to_pt.x - from_pt.x dy = to_pt.y - from_pt.y dist = math.sqrt(dx**2 + dy**2) if dist < 1e-10: continue from_idx = point_index.get(obs.from_station) to_idx = point_index.get(obs.to_point) if obs.obs_type == 'direction': # Azimut aus Koordinaten berechnen azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi if azimuth_calc < 0: azimuth_calc += 400.0 # Orientierung der Station station_orientation = self.orientations.get(obs.from_station, 0.0) # Berechnete Richtung = Azimut - Orientierung direction_calc = azimuth_calc - station_orientation while direction_calc < 0: direction_calc += 400.0 while direction_calc >= 400: direction_calc -= 400.0 # Partielle Ableitungen für Richtungen (in Gon) rho = 200.0 / math.pi factor = rho / (dist ** 2) # Ableitungen nach Punktkoordinaten if from_idx is not None: A[i, from_idx * 2] = -dy * factor # dRichtung/dx_from A[i, from_idx * 2 + 1] = dx * factor # dRichtung/dy_from if to_idx is not None: A[i, to_idx * 2] = dy * factor # dRichtung/dx_to A[i, to_idx * 2 + 1] = -dx * factor # dRichtung/dy_to # Ableitung nach Orientierung: dRichtung/do = -1 ori_idx = orientation_index.get(obs.from_station) if ori_idx is not None: A[i, num_point_unknowns + ori_idx] = -1.0 # Verkürzung l = beobachtet - berechnet diff = obs.value - direction_calc # Normalisierung auf [-200, 200] Gon while diff > 200: diff -= 400 while diff < -200: diff += 400 l[i] = diff elif obs.obs_type == 'distance': # Partielle Ableitungen für Strecken if from_idx is not None: A[i, from_idx * 2] = -dx / dist A[i, from_idx * 2 + 1] = -dy / dist if to_idx is not None: A[i, to_idx * 2] = dx / dist A[i, to_idx * 2 + 1] = dy / dist # Strecken sind unabhängig von der Orientierung # Verkürzung l[i] = obs.value - dist # Gewicht P[i] = obs.weight return A, l, P def _build_normal_equations(self, point_index: Dict[str, int], num_unknowns: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Erstellt Designmatrix A und Beobachtungsvektor l (alte Version ohne Orientierung)""" # Nur Richtungen und Strecken für 2D-Ausgleichung relevant_obs = [o for o in self.observations if o.obs_type in ['direction', 'distance']] n_obs = len(relevant_obs) A = np.zeros((n_obs, num_unknowns)) l = np.zeros(n_obs) P = np.zeros(n_obs) for i, obs in enumerate(relevant_obs): from_pt = self.points.get(obs.from_station) to_pt = self.points.get(obs.to_point) if from_pt is None or to_pt is None: continue dx = to_pt.x - from_pt.x dy = to_pt.y - from_pt.y dist = math.sqrt(dx**2 + dy**2) if dist < 1e-10: continue from_idx = point_index.get(obs.from_station) to_idx = point_index.get(obs.to_point) if obs.obs_type == 'direction': # Azimut berechnen azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi if azimuth_calc < 0: azimuth_calc += 400.0 # Partielle Ableitungen (in Gon) rho = 200.0 / math.pi factor = rho / (dist ** 2) if from_idx is not None: A[i, from_idx * 2] = -dy * factor # dAz/dx_from A[i, from_idx * 2 + 1] = dx * factor # dAz/dy_from if to_idx is not None: A[i, to_idx * 2] = dy * factor # dAz/dx_to A[i, to_idx * 2 + 1] = -dx * factor # dAz/dy_to # Verkürzung l = beobachtet - berechnet diff = obs.value - azimuth_calc # Normalisierung auf [-200, 200] Gon while diff > 200: diff -= 400 while diff < -200: diff += 400 l[i] = diff elif obs.obs_type == 'distance': # Partielle Ableitungen if from_idx is not None: A[i, from_idx * 2] = -dx / dist A[i, from_idx * 2 + 1] = -dy / dist if to_idx is not None: A[i, to_idx * 2] = dx / dist A[i, to_idx * 2 + 1] = dy / dist # Verkürzung l[i] = obs.value - dist # Gewicht P[i] = obs.weight return A, l, P def _compute_residuals(self): """Berechnet Verbesserungen (Residuen) für alle Beobachtungen""" for obs in self.observations: from_pt = self.points.get(obs.from_station) to_pt = self.points.get(obs.to_point) if from_pt is None or to_pt is None: continue dx = to_pt.x - from_pt.x dy = to_pt.y - from_pt.y dz = to_pt.z - from_pt.z dist_2d = math.sqrt(dx**2 + dy**2) dist_3d = math.sqrt(dx**2 + dy**2 + dz**2) if obs.obs_type == 'direction': azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi if azimuth_calc < 0: azimuth_calc += 400.0 # Berücksichtige Orientierung station_orientation = self.orientations.get(obs.from_station, 0.0) direction_calc = azimuth_calc - station_orientation while direction_calc < 0: direction_calc += 400.0 while direction_calc >= 400: direction_calc -= 400.0 residual = obs.value - direction_calc while residual > 200: residual -= 400 while residual < -200: residual += 400 obs.residual = residual elif obs.obs_type == 'distance': obs.residual = obs.value - dist_2d # 2D Strecke verwenden elif obs.obs_type == 'zenith': if dist_2d > 0: zenith_calc = math.atan2(dist_2d, dz) * 200.0 / math.pi obs.residual = obs.value - zenith_calc def _compute_accuracy(self, point_index: Dict[str, int], num_unknowns: int): """Berechnet Genauigkeitsmaße für ausgeglichene Punkte""" # Vereinfachte Berechnung der Standardabweichungen # Vollständige Varianzfortpflanzung würde Inverse von N erfordern # Erstelle Normalgleichungsmatrix erneut A, l, P = self._build_normal_equations(point_index, num_unknowns) N = A.T @ np.diag(P) @ A try: Q = np.linalg.inv(N) # Kofaktormatrix except np.linalg.LinAlgError: Q = np.eye(num_unknowns) * 0.001 # A-posteriori Varianzfaktor v = A @ np.zeros(num_unknowns) - l # Residuen (vereinfacht) redundancy = len(l) - num_unknowns if redundancy > 0: sum_pvv = np.sum(P * v**2) sigma_0_post = math.sqrt(sum_pvv / redundancy) else: sigma_0_post = self.sigma_0_priori self.sigma_0_posteriori = sigma_0_post # Punktgenauigkeiten for name, idx in point_index.items(): i = idx * 2 # Standardabweichungen self.points[name].std_x = sigma_0_post * math.sqrt(abs(Q[i, i])) self.points[name].std_y = sigma_0_post * math.sqrt(abs(Q[i + 1, i + 1])) # Punktlagegenauigkeit self.points[name].std_position = math.sqrt( self.points[name].std_x**2 + self.points[name].std_y**2 ) # Konfidenzellipse (vereinfacht) # Vollständige Berechnung würde Eigenwertanalyse erfordern cov_xy = Q[i, i + 1] if i + 1 < num_unknowns else 0 var_x = Q[i, i] var_y = Q[i + 1, i + 1] # Eigenwerte für Ellipse trace = var_x + var_y det = var_x * var_y - cov_xy**2 discriminant = trace**2 / 4 - det if discriminant >= 0: sqrt_disc = math.sqrt(discriminant) lambda1 = trace / 2 + sqrt_disc lambda2 = trace / 2 - sqrt_disc self.points[name].semi_major = sigma_0_post * math.sqrt(max(lambda1, 0)) self.points[name].semi_minor = sigma_0_post * math.sqrt(max(lambda2, 0)) if abs(var_x - var_y) > 1e-10: self.points[name].orientation = 0.5 * math.atan2(2 * cov_xy, var_x - var_y) * 200 / math.pi def _compute_global_statistics(self): """Berechnet globale Statistiken""" if self.result is None: return # RMSE für verschiedene Beobachtungstypen dir_residuals = [o.residual for o in self.observations if o.obs_type == 'direction'] dist_residuals = [o.residual for o in self.observations if o.obs_type == 'distance'] zen_residuals = [o.residual for o in self.observations if o.obs_type == 'zenith'] if dir_residuals: self.result.rmse_directions = math.sqrt(sum(r**2 for r in dir_residuals) / len(dir_residuals)) * 1000 # mgon if dist_residuals: self.result.rmse_distances = math.sqrt(sum(r**2 for r in dist_residuals) / len(dist_residuals)) * 1000 # mm if zen_residuals: self.result.rmse_zenith = math.sqrt(sum(r**2 for r in zen_residuals) / len(zen_residuals)) * 1000 # mgon self.result.sigma_0_posteriori = self.sigma_0_posteriori # Chi-Quadrat-Test if self.result.redundancy > 0: self.result.chi_square = (self.sigma_0_posteriori / self.sigma_0_priori) ** 2 * self.result.redundancy def get_adjustment_report(self) -> str: """Erstellt einen detaillierten Ausgleichungsbericht""" if self.result is None: return "Keine Ausgleichung durchgeführt." lines = [] lines.append("=" * 80) lines.append("NETZAUSGLEICHUNG - ERGEBNISBERICHT") lines.append("=" * 80) lines.append("") # Allgemeine Informationen lines.append("ALLGEMEINE INFORMATIONEN") lines.append("-" * 80) lines.append(f"Job: {self.parser.job_name}") lines.append(f"Anzahl Punkte: {self.result.num_points}") lines.append(f" davon Festpunkte: {self.result.num_fixed_points}") lines.append(f" davon Neupunkte: {self.result.num_points - self.result.num_fixed_points}") lines.append(f"Anzahl Beobachtungen: {self.result.num_observations}") lines.append(f"Anzahl Unbekannte: {self.result.num_unknowns}") lines.append(f"Redundanz: {self.result.redundancy}") lines.append(f"Iterationen: {self.result.iterations}") lines.append(f"Konvergiert: {'Ja' if self.result.converged else 'Nein'}") lines.append("") # Qualitätsparameter lines.append("GLOBALE QUALITÄTSPARAMETER") lines.append("-" * 80) lines.append(f"Sigma-0 a-priori: {self.sigma_0_priori:.4f}") lines.append(f"Sigma-0 a-posteriori: {self.result.sigma_0_posteriori:.4f}") lines.append(f"Chi-Quadrat: {self.result.chi_square:.2f}") lines.append(f"RMSE Richtungen: {self.result.rmse_directions:.2f} mgon") lines.append(f"RMSE Strecken: {self.result.rmse_distances:.2f} mm") lines.append(f"RMSE Zenitwinkel: {self.result.rmse_zenith:.2f} mgon") lines.append("") # Festpunkte lines.append("FESTPUNKTE") lines.append("-" * 80) lines.append(f"{'Punkt':<12} {'X [m]':>14} {'Y [m]':>14} {'Z [m]':>12}") lines.append("-" * 80) for name in self.fixed_points: if name in self.points: p = self.points[name] lines.append(f"{name:<12} {p.x:>14.4f} {p.y:>14.4f} {p.z:>12.4f}") lines.append("") # Ausgeglichene Koordinaten lines.append("AUSGEGLICHENE KOORDINATEN (NEUPUNKTE)") lines.append("-" * 80) lines.append(f"{'Punkt':<12} {'X [m]':>14} {'Y [m]':>14} {'Z [m]':>12} {'σX [mm]':>10} {'σY [mm]':>10} {'σPos [mm]':>10}") lines.append("-" * 80) for name, p in sorted(self.points.items()): if name not in self.fixed_points: lines.append(f"{name:<12} {p.x:>14.4f} {p.y:>14.4f} {p.z:>12.4f} " f"{p.std_x*1000:>10.2f} {p.std_y*1000:>10.2f} {p.std_position*1000:>10.2f}") lines.append("") # Beobachtungen und Residuen lines.append("BEOBACHTUNGEN UND VERBESSERUNGEN") lines.append("-" * 80) lines.append(f"{'Von':<10} {'Nach':<10} {'Typ':<10} {'Messwert':>14} {'Residuum':>12}") lines.append("-" * 80) for obs in sorted(self.observations, key=lambda x: (x.from_station, x.to_point)): if obs.obs_type == 'direction': unit = "gon" res_str = f"{obs.residual*1000:.2f} mgon" elif obs.obs_type == 'distance': unit = "m" res_str = f"{obs.residual*1000:.2f} mm" elif obs.obs_type == 'zenith': unit = "gon" res_str = f"{obs.residual*1000:.2f} mgon" else: unit = "" res_str = f"{obs.residual:.4f}" lines.append(f"{obs.from_station:<10} {obs.to_point:<10} {obs.obs_type:<10} " f"{obs.value:>12.4f} {unit:<2} {res_str:>12}") lines.append("") lines.append("=" * 80) return "\n".join(lines) def export_adjusted_points(self, filepath: str, format: str = 'csv'): """Exportiert ausgeglichene Punkte""" if format == 'csv': lines = ["Punkt;X;Y;Z;Sigma_X;Sigma_Y;Sigma_Pos;Festpunkt"] for name, p in sorted(self.points.items()): fixed = "Ja" if p.is_fixed else "Nein" lines.append(f"{name};{p.x:.4f};{p.y:.4f};{p.z:.4f};" f"{p.std_x*1000:.2f};{p.std_y*1000:.2f};{p.std_position*1000:.2f};{fixed}") else: lines = [] for name, p in sorted(self.points.items()): lines.append(f"{name}\t{p.x:.4f}\t{p.y:.4f}\t{p.z:.4f}") with open(filepath, 'w', encoding='utf-8') as f: f.write("\n".join(lines)) def export_report(self, filepath: str): """Exportiert den vollständigen Bericht""" report = self.get_adjustment_report() with open(filepath, 'w', encoding='utf-8') as f: f.write(report) def get_point_comparison(self) -> List[Dict]: """ Erstellt Punktevergleich: Koordinaten vor und nach der Ausgleichung Gibt Liste von Dicts zurück, sortiert nach größter Abweichung """ comparison = [] for name, pt in self.points.items(): if name in self.coords_before: x_before, y_before, z_before = self.coords_before[name] x_after, y_after, z_after = pt.x, pt.y, pt.z dx = x_after - x_before dy = y_after - y_before dz = z_after - z_before d3d = math.sqrt(dx**2 + dy**2 + dz**2) d2d = math.sqrt(dx**2 + dy**2) comparison.append({ 'name': name, 'x_before': x_before, 'y_before': y_before, 'z_before': z_before, 'x_after': x_after, 'y_after': y_after, 'z_after': z_after, 'dx': dx, 'dy': dy, 'dz': dz, 'd2d': d2d, 'd3d': d3d, 'is_fixed': pt.is_fixed }) # Sortiert nach größter 3D-Abweichung comparison.sort(key=lambda x: x['d3d'], reverse=True) return comparison def get_comparison_report(self) -> str: """Erstellt einen Punktevergleichs-Bericht""" comparison = self.get_point_comparison() lines = [] lines.append("=" * 120) lines.append("PUNKTEVERGLEICH: VORHER vs. NACHHER") lines.append("=" * 120) lines.append("") lines.append(f"{'Punkt':<10} {'X_vorher':>12} {'Y_vorher':>12} {'Z_vorher':>10} | " f"{'X_nachher':>12} {'Y_nachher':>12} {'Z_nachher':>10} | " f"{'ΔX [mm]':>10} {'ΔY [mm]':>10} {'ΔZ [mm]':>10} {'Δ3D [mm]':>10}") lines.append("-" * 120) for c in comparison: fixed_marker = "*" if c['is_fixed'] else " " lines.append(f"{c['name']:<9}{fixed_marker} " f"{c['x_before']:>12.4f} {c['y_before']:>12.4f} {c['z_before']:>10.4f} | " f"{c['x_after']:>12.4f} {c['y_after']:>12.4f} {c['z_after']:>10.4f} | " f"{c['dx']*1000:>10.2f} {c['dy']*1000:>10.2f} {c['dz']*1000:>10.2f} {c['d3d']*1000:>10.2f}") lines.append("") lines.append("* = Festpunkt (nicht ausgeglichen)") lines.append("") # Plausibilitätsprüfung lines.append("=" * 80) lines.append("PLAUSIBILITÄTSPRÜFUNG") lines.append("=" * 80) non_fixed = [c for c in comparison if not c['is_fixed']] critical_errors = [c for c in non_fixed if c['d3d'] > 1.0] warnings = [c for c in non_fixed if 0.1 < c['d3d'] <= 1.0] if critical_errors: lines.append("") lines.append("🚨 KRITISCHE FEHLER (Abweichung > 1m):") for c in critical_errors[:10]: # Max 10 anzeigen lines.append(f" ❌ {c['name']}: {c['d3d']*1000:.1f} mm") if len(critical_errors) > 10: lines.append(f" ... und {len(critical_errors)-10} weitere") if warnings: lines.append("") lines.append("⚠️ WARNUNGEN (Abweichung > 10cm):") for c in warnings[:10]: lines.append(f" ⚠ {c['name']}: {c['d3d']*1000:.1f} mm") if len(warnings) > 10: lines.append(f" ... und {len(warnings)-10} weitere") # Statistik if non_fixed: d3d_values = [c['d3d'] for c in non_fixed] dx_values = [abs(c['dx']) for c in non_fixed] dy_values = [abs(c['dy']) for c in non_fixed] lines.append("") lines.append("### Statistik (nur Neupunkte):") lines.append(f" Anzahl Punkte: {len(non_fixed)}") lines.append(f" Min Δ3D: {min(d3d_values)*1000:.2f} mm") lines.append(f" Max Δ3D: {max(d3d_values)*1000:.2f} mm") lines.append(f" Mittel Δ3D: {sum(d3d_values)/len(d3d_values)*1000:.2f} mm") lines.append(f" Max |ΔX|: {max(dx_values)*1000:.2f} mm") lines.append(f" Max |ΔY|: {max(dy_values)*1000:.2f} mm") if not critical_errors and not warnings: lines.append("") lines.append("✅ Alle Abweichungen im akzeptablen Bereich (< 10cm)") return "\n".join(lines) def export_comparison(self, filepath: str, format: str = 'csv'): """Exportiert den Punktevergleich""" comparison = self.get_point_comparison() if format == 'csv': lines = ["Punkt;X_vorher;Y_vorher;Z_vorher;X_nachher;Y_nachher;Z_nachher;DX_mm;DY_mm;DZ_mm;D3D_mm;Festpunkt"] for c in comparison: fixed = "Ja" if c['is_fixed'] else "Nein" lines.append(f"{c['name']};{c['x_before']:.4f};{c['y_before']:.4f};{c['z_before']:.4f};" f"{c['x_after']:.4f};{c['y_after']:.4f};{c['z_after']:.4f};" f"{c['dx']*1000:.2f};{c['dy']*1000:.2f};{c['dz']*1000:.2f};{c['d3d']*1000:.2f};{fixed}") else: lines = [self.get_comparison_report()] with open(filepath, 'w', encoding='utf-8') as f: f.write("\n".join(lines))