""" Netzausgleichungs-Modul Methode der kleinsten Quadrate für geodätische Netze Basiert auf Beobachtungen aus JXL-Dateien """ import math import numpy as np from scipy import sparse from scipy.sparse.linalg import spsolve from typing import List, Dict, Tuple, Optional, Set from dataclasses import dataclass, field from .jxl_parser import JXLParser, Point, Station @dataclass class Observation: """Repräsentiert eine geodätische Beobachtung""" obs_type: str # 'direction', 'distance', 'zenith', 'height_diff' from_station: str to_point: str value: float # Messwert (Gon, Meter, etc.) std_dev: float # Standardabweichung residual: float = 0.0 # Verbesserung weight: float = 1.0 def __post_init__(self): if self.std_dev > 0: self.weight = 1.0 / (self.std_dev ** 2) @dataclass class AdjustedPoint: """Ausgeglichener Punkt mit Genauigkeitsangaben""" name: str x: float # East y: float # North z: float # Elevation # A-priori Koordinaten x_approx: float = 0.0 y_approx: float = 0.0 z_approx: float = 0.0 # Korrekturen dx: float = 0.0 dy: float = 0.0 dz: float = 0.0 # Standardabweichungen std_x: float = 0.0 std_y: float = 0.0 std_z: float = 0.0 std_position: float = 0.0 # 2D Punktlagegenauigkeit # Konfidenzellipse semi_major: float = 0.0 semi_minor: float = 0.0 orientation: float = 0.0 # in Gon # Status is_fixed: bool = False @dataclass class AdjustmentResult: """Ergebnis der Netzausgleichung""" # Ausgeglichene Punkte adjusted_points: Dict[str, AdjustedPoint] = field(default_factory=dict) # Beobachtungen mit Residuen observations: List[Observation] = field(default_factory=list) # Globale Qualitätsparameter sigma_0_priori: float = 1.0 sigma_0_posteriori: float = 0.0 chi_square: float = 0.0 redundancy: int = 0 # RMSE der Residuen rmse_directions: float = 0.0 # in mgon rmse_distances: float = 0.0 # in mm rmse_zenith: float = 0.0 # in mgon # Iterationsinfo iterations: int = 0 converged: bool = False # Statistiken num_points: int = 0 num_fixed_points: int = 0 num_observations: int = 0 num_unknowns: int = 0 class NetworkAdjustment: """ Netzausgleichung nach der Methode der kleinsten Quadrate Gauß-Markov-Modell mit Beobachtungsgleichungen """ def __init__(self, parser: JXLParser): self.parser = parser self.observations: List[Observation] = [] self.points: Dict[str, AdjustedPoint] = {} self.fixed_points: Set[str] = set() self.result: Optional[AdjustmentResult] = None # Konfiguration self.max_iterations: int = 20 self.convergence_limit: float = 1e-8 # Meter self.sigma_0_priori: float = 1.0 # Standard-Genauigkeiten (falls nicht aus JXL) self.default_std_direction: float = 0.0003 # Gon (0.3 mgon) self.default_std_distance: float = 0.002 # Meter self.default_std_zenith: float = 0.0003 # Gon def extract_observations(self): """Extrahiert Beobachtungen aus JXL-Daten""" self.observations = [] for station_id, station in self.parser.stations.items(): # Messungen von dieser Station measurements = self.parser.get_measurements_from_station(station_id) # Backbearing für Orientierung finden orientation = 0.0 for bb_id, bb in self.parser.backbearings.items(): if bb.station_record_id == station_id: if bb.orientation_correction is not None: orientation = bb.orientation_correction break for meas in measurements: if meas.name and not meas.deleted: # Richtung if meas.horizontal_circle is not None: std = meas.hz_std_error if meas.hz_std_error else self.default_std_direction azimuth = meas.horizontal_circle + orientation self.observations.append(Observation( obs_type='direction', from_station=station.name, to_point=meas.name, value=azimuth, std_dev=std )) # Strecke if meas.edm_distance is not None: std = meas.dist_std_error if meas.dist_std_error else self.default_std_distance # Prismenkonstante berücksichtigen prism_const = 0.0 if meas.target_id in self.parser.targets: prism_const = self.parser.targets[meas.target_id].prism_constant distance = meas.edm_distance + prism_const self.observations.append(Observation( obs_type='distance', from_station=station.name, to_point=meas.name, value=distance, std_dev=std )) # Zenitwinkel if meas.vertical_circle is not None: std = meas.vz_std_error if meas.vz_std_error else self.default_std_zenith self.observations.append(Observation( obs_type='zenith', from_station=station.name, to_point=meas.name, value=meas.vertical_circle, std_dev=std )) return self.observations def initialize_points(self): """Initialisiert Näherungskoordinaten aus JXL-Daten""" self.points = {} # Alle aktiven Punkte for name, point in self.parser.get_active_points().items(): self.points[name] = AdjustedPoint( name=name, x=point.east if point.east is not None else 0.0, y=point.north if point.north is not None else 0.0, z=point.elevation if point.elevation is not None else 0.0, x_approx=point.east if point.east is not None else 0.0, y_approx=point.north if point.north is not None else 0.0, z_approx=point.elevation if point.elevation is not None else 0.0 ) # Stationen hinzufügen for station_id, station in self.parser.stations.items(): if station.name not in self.points: self.points[station.name] = AdjustedPoint( name=station.name, x=station.east if station.east is not None else 0.0, y=station.north if station.north is not None else 0.0, z=station.elevation if station.elevation is not None else 0.0, x_approx=station.east if station.east is not None else 0.0, y_approx=station.north if station.north is not None else 0.0, z_approx=station.elevation if station.elevation is not None else 0.0 ) def set_fixed_point(self, point_name: str): """Setzt einen Punkt als Festpunkt""" if point_name in self.points: self.fixed_points.add(point_name) self.points[point_name].is_fixed = True def set_fixed_points_auto(self): """Setzt automatisch Festpunkte (Referenzpunkte)""" # Referenzlinie als Festpunkte ref_line = self.parser.get_reference_line() if ref_line: self.set_fixed_point(ref_line.start_point) self.set_fixed_point(ref_line.end_point) # Zusätzlich: Erste Station als Festpunkt falls keine Referenzlinie if not self.fixed_points: stations = list(self.parser.stations.values()) if stations: first_station = min(stations, key=lambda s: s.timestamp) self.set_fixed_point(first_station.name) def adjust(self) -> AdjustmentResult: """ Führt die Netzausgleichung durch Iterative Lösung nach Gauß-Newton """ if not self.observations: self.extract_observations() if not self.points: self.initialize_points() if not self.fixed_points: self.set_fixed_points_auto() # Unbekannte bestimmen (nur nicht-fixierte Punkte) unknown_points = [name for name in self.points.keys() if name not in self.fixed_points] num_unknowns = len(unknown_points) * 2 # Nur X, Y (2D-Ausgleichung) num_observations = len([o for o in self.observations if o.obs_type in ['direction', 'distance']]) if num_unknowns == 0: raise ValueError("Keine unbekannten Punkte!") if num_observations < num_unknowns: raise ValueError(f"Unterbestimmtes System: {num_observations} Beobachtungen, " f"{num_unknowns} Unbekannte") # Index-Mapping für Unbekannte point_index = {name: i for i, name in enumerate(unknown_points)} # Iterative Lösung converged = False iteration = 0 while not converged and iteration < self.max_iterations: iteration += 1 # Designmatrix A und Beobachtungsvektor l erstellen A, l, P = self._build_normal_equations(point_index, num_unknowns) # Normalgleichungssystem: N = A^T * P * A, n = A^T * P * l N = A.T @ np.diag(P) @ A n = A.T @ np.diag(P) @ l # Lösung: x = N^-1 * n try: dx = np.linalg.solve(N, n) except np.linalg.LinAlgError: # Regularisierung bei singulärer Matrix N += np.eye(num_unknowns) * 1e-10 dx = np.linalg.solve(N, n) # Koordinaten aktualisieren max_correction = 0.0 for name, idx in point_index.items(): i = idx * 2 self.points[name].dx = dx[i] self.points[name].dy = dx[i + 1] self.points[name].x += dx[i] self.points[name].y += dx[i + 1] max_correction = max(max_correction, abs(dx[i]), abs(dx[i + 1])) # Konvergenzprüfung if max_correction < self.convergence_limit: converged = True # Nachbearbeitung self._compute_residuals() self._compute_accuracy(point_index, num_unknowns) # Ergebnis zusammenstellen self.result = AdjustmentResult( adjusted_points=self.points, observations=self.observations, sigma_0_priori=self.sigma_0_priori, iterations=iteration, converged=converged, num_points=len(self.points), num_fixed_points=len(self.fixed_points), num_observations=num_observations, num_unknowns=num_unknowns, redundancy=num_observations - num_unknowns ) self._compute_global_statistics() return self.result def _build_normal_equations(self, point_index: Dict[str, int], num_unknowns: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Erstellt Designmatrix A und Beobachtungsvektor l""" # Nur Richtungen und Strecken für 2D-Ausgleichung relevant_obs = [o for o in self.observations if o.obs_type in ['direction', 'distance']] n_obs = len(relevant_obs) A = np.zeros((n_obs, num_unknowns)) l = np.zeros(n_obs) P = np.zeros(n_obs) for i, obs in enumerate(relevant_obs): from_pt = self.points.get(obs.from_station) to_pt = self.points.get(obs.to_point) if from_pt is None or to_pt is None: continue dx = to_pt.x - from_pt.x dy = to_pt.y - from_pt.y dist = math.sqrt(dx**2 + dy**2) if dist < 1e-10: continue from_idx = point_index.get(obs.from_station) to_idx = point_index.get(obs.to_point) if obs.obs_type == 'direction': # Azimut berechnen azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi if azimuth_calc < 0: azimuth_calc += 400.0 # Partielle Ableitungen (in Gon) # dAz/dx = dy / (rho * s^2) # dAz/dy = -dx / (rho * s^2) rho = 200.0 / math.pi factor = rho / (dist ** 2) if from_idx is not None: A[i, from_idx * 2] = -dy * factor # dAz/dx_from A[i, from_idx * 2 + 1] = dx * factor # dAz/dy_from if to_idx is not None: A[i, to_idx * 2] = dy * factor # dAz/dx_to A[i, to_idx * 2 + 1] = -dx * factor # dAz/dy_to # Verkürzung l = beobachtet - berechnet diff = obs.value - azimuth_calc # Normalisierung auf [-200, 200] Gon while diff > 200: diff -= 400 while diff < -200: diff += 400 l[i] = diff elif obs.obs_type == 'distance': # Partielle Ableitungen # ds/dx = dx/s # ds/dy = dy/s if from_idx is not None: A[i, from_idx * 2] = -dx / dist A[i, from_idx * 2 + 1] = -dy / dist if to_idx is not None: A[i, to_idx * 2] = dx / dist A[i, to_idx * 2 + 1] = dy / dist # Verkürzung l[i] = obs.value - dist # Gewicht P[i] = obs.weight return A, l, P def _compute_residuals(self): """Berechnet Verbesserungen (Residuen) für alle Beobachtungen""" for obs in self.observations: from_pt = self.points.get(obs.from_station) to_pt = self.points.get(obs.to_point) if from_pt is None or to_pt is None: continue dx = to_pt.x - from_pt.x dy = to_pt.y - from_pt.y dz = to_pt.z - from_pt.z dist_2d = math.sqrt(dx**2 + dy**2) dist_3d = math.sqrt(dx**2 + dy**2 + dz**2) if obs.obs_type == 'direction': azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi if azimuth_calc < 0: azimuth_calc += 400.0 residual = obs.value - azimuth_calc while residual > 200: residual -= 400 while residual < -200: residual += 400 obs.residual = residual elif obs.obs_type == 'distance': obs.residual = obs.value - dist_3d elif obs.obs_type == 'zenith': if dist_2d > 0: zenith_calc = math.atan2(dist_2d, dz) * 200.0 / math.pi obs.residual = obs.value - zenith_calc def _compute_accuracy(self, point_index: Dict[str, int], num_unknowns: int): """Berechnet Genauigkeitsmaße für ausgeglichene Punkte""" # Vereinfachte Berechnung der Standardabweichungen # Vollständige Varianzfortpflanzung würde Inverse von N erfordern # Erstelle Normalgleichungsmatrix erneut A, l, P = self._build_normal_equations(point_index, num_unknowns) N = A.T @ np.diag(P) @ A try: Q = np.linalg.inv(N) # Kofaktormatrix except np.linalg.LinAlgError: Q = np.eye(num_unknowns) * 0.001 # A-posteriori Varianzfaktor v = A @ np.zeros(num_unknowns) - l # Residuen (vereinfacht) redundancy = len(l) - num_unknowns if redundancy > 0: sum_pvv = np.sum(P * v**2) sigma_0_post = math.sqrt(sum_pvv / redundancy) else: sigma_0_post = self.sigma_0_priori self.sigma_0_posteriori = sigma_0_post # Punktgenauigkeiten for name, idx in point_index.items(): i = idx * 2 # Standardabweichungen self.points[name].std_x = sigma_0_post * math.sqrt(abs(Q[i, i])) self.points[name].std_y = sigma_0_post * math.sqrt(abs(Q[i + 1, i + 1])) # Punktlagegenauigkeit self.points[name].std_position = math.sqrt( self.points[name].std_x**2 + self.points[name].std_y**2 ) # Konfidenzellipse (vereinfacht) # Vollständige Berechnung würde Eigenwertanalyse erfordern cov_xy = Q[i, i + 1] if i + 1 < num_unknowns else 0 var_x = Q[i, i] var_y = Q[i + 1, i + 1] # Eigenwerte für Ellipse trace = var_x + var_y det = var_x * var_y - cov_xy**2 discriminant = trace**2 / 4 - det if discriminant >= 0: sqrt_disc = math.sqrt(discriminant) lambda1 = trace / 2 + sqrt_disc lambda2 = trace / 2 - sqrt_disc self.points[name].semi_major = sigma_0_post * math.sqrt(max(lambda1, 0)) self.points[name].semi_minor = sigma_0_post * math.sqrt(max(lambda2, 0)) if abs(var_x - var_y) > 1e-10: self.points[name].orientation = 0.5 * math.atan2(2 * cov_xy, var_x - var_y) * 200 / math.pi def _compute_global_statistics(self): """Berechnet globale Statistiken""" if self.result is None: return # RMSE für verschiedene Beobachtungstypen dir_residuals = [o.residual for o in self.observations if o.obs_type == 'direction'] dist_residuals = [o.residual for o in self.observations if o.obs_type == 'distance'] zen_residuals = [o.residual for o in self.observations if o.obs_type == 'zenith'] if dir_residuals: self.result.rmse_directions = math.sqrt(sum(r**2 for r in dir_residuals) / len(dir_residuals)) * 1000 # mgon if dist_residuals: self.result.rmse_distances = math.sqrt(sum(r**2 for r in dist_residuals) / len(dist_residuals)) * 1000 # mm if zen_residuals: self.result.rmse_zenith = math.sqrt(sum(r**2 for r in zen_residuals) / len(zen_residuals)) * 1000 # mgon self.result.sigma_0_posteriori = self.sigma_0_posteriori # Chi-Quadrat-Test if self.result.redundancy > 0: self.result.chi_square = (self.sigma_0_posteriori / self.sigma_0_priori) ** 2 * self.result.redundancy def get_adjustment_report(self) -> str: """Erstellt einen detaillierten Ausgleichungsbericht""" if self.result is None: return "Keine Ausgleichung durchgeführt." lines = [] lines.append("=" * 80) lines.append("NETZAUSGLEICHUNG - ERGEBNISBERICHT") lines.append("=" * 80) lines.append("") # Allgemeine Informationen lines.append("ALLGEMEINE INFORMATIONEN") lines.append("-" * 80) lines.append(f"Job: {self.parser.job_name}") lines.append(f"Anzahl Punkte: {self.result.num_points}") lines.append(f" davon Festpunkte: {self.result.num_fixed_points}") lines.append(f" davon Neupunkte: {self.result.num_points - self.result.num_fixed_points}") lines.append(f"Anzahl Beobachtungen: {self.result.num_observations}") lines.append(f"Anzahl Unbekannte: {self.result.num_unknowns}") lines.append(f"Redundanz: {self.result.redundancy}") lines.append(f"Iterationen: {self.result.iterations}") lines.append(f"Konvergiert: {'Ja' if self.result.converged else 'Nein'}") lines.append("") # Qualitätsparameter lines.append("GLOBALE QUALITÄTSPARAMETER") lines.append("-" * 80) lines.append(f"Sigma-0 a-priori: {self.sigma_0_priori:.4f}") lines.append(f"Sigma-0 a-posteriori: {self.result.sigma_0_posteriori:.4f}") lines.append(f"Chi-Quadrat: {self.result.chi_square:.2f}") lines.append(f"RMSE Richtungen: {self.result.rmse_directions:.2f} mgon") lines.append(f"RMSE Strecken: {self.result.rmse_distances:.2f} mm") lines.append(f"RMSE Zenitwinkel: {self.result.rmse_zenith:.2f} mgon") lines.append("") # Festpunkte lines.append("FESTPUNKTE") lines.append("-" * 80) lines.append(f"{'Punkt':<12} {'X [m]':>14} {'Y [m]':>14} {'Z [m]':>12}") lines.append("-" * 80) for name in self.fixed_points: if name in self.points: p = self.points[name] lines.append(f"{name:<12} {p.x:>14.4f} {p.y:>14.4f} {p.z:>12.4f}") lines.append("") # Ausgeglichene Koordinaten lines.append("AUSGEGLICHENE KOORDINATEN (NEUPUNKTE)") lines.append("-" * 80) lines.append(f"{'Punkt':<12} {'X [m]':>14} {'Y [m]':>14} {'Z [m]':>12} {'σX [mm]':>10} {'σY [mm]':>10} {'σPos [mm]':>10}") lines.append("-" * 80) for name, p in sorted(self.points.items()): if name not in self.fixed_points: lines.append(f"{name:<12} {p.x:>14.4f} {p.y:>14.4f} {p.z:>12.4f} " f"{p.std_x*1000:>10.2f} {p.std_y*1000:>10.2f} {p.std_position*1000:>10.2f}") lines.append("") # Beobachtungen und Residuen lines.append("BEOBACHTUNGEN UND VERBESSERUNGEN") lines.append("-" * 80) lines.append(f"{'Von':<10} {'Nach':<10} {'Typ':<10} {'Messwert':>14} {'Residuum':>12}") lines.append("-" * 80) for obs in sorted(self.observations, key=lambda x: (x.from_station, x.to_point)): if obs.obs_type == 'direction': unit = "gon" res_str = f"{obs.residual*1000:.2f} mgon" elif obs.obs_type == 'distance': unit = "m" res_str = f"{obs.residual*1000:.2f} mm" elif obs.obs_type == 'zenith': unit = "gon" res_str = f"{obs.residual*1000:.2f} mgon" else: unit = "" res_str = f"{obs.residual:.4f}" lines.append(f"{obs.from_station:<10} {obs.to_point:<10} {obs.obs_type:<10} " f"{obs.value:>12.4f} {unit:<2} {res_str:>12}") lines.append("") lines.append("=" * 80) return "\n".join(lines) def export_adjusted_points(self, filepath: str, format: str = 'csv'): """Exportiert ausgeglichene Punkte""" if format == 'csv': lines = ["Punkt;X;Y;Z;Sigma_X;Sigma_Y;Sigma_Pos;Festpunkt"] for name, p in sorted(self.points.items()): fixed = "Ja" if p.is_fixed else "Nein" lines.append(f"{name};{p.x:.4f};{p.y:.4f};{p.z:.4f};" f"{p.std_x*1000:.2f};{p.std_y*1000:.2f};{p.std_position*1000:.2f};{fixed}") else: lines = [] for name, p in sorted(self.points.items()): lines.append(f"{name}\t{p.x:.4f}\t{p.y:.4f}\t{p.z:.4f}") with open(filepath, 'w', encoding='utf-8') as f: f.write("\n".join(lines)) def export_report(self, filepath: str): """Exportiert den vollständigen Bericht""" report = self.get_adjustment_report() with open(filepath, 'w', encoding='utf-8') as f: f.write(report)