trimble_geodesy/modules/network_adjustment.py

1035 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Netzausgleichungs-Modul
Methode der kleinsten Quadrate für geodätische Netze
Basiert auf Beobachtungen aus JXL-Dateien
"""
import math
import numpy as np
from scipy import sparse
from scipy.sparse.linalg import spsolve
from typing import List, Dict, Tuple, Optional, Set
from dataclasses import dataclass, field
from .jxl_parser import JXLParser, Point, Station
@dataclass
class Observation:
"""Repräsentiert eine geodätische Beobachtung"""
obs_type: str # 'direction', 'distance', 'zenith', 'height_diff'
from_station: str
to_point: str
value: float # Messwert (Gon, Meter, etc.)
std_dev: float # Standardabweichung
residual: float = 0.0 # Verbesserung
weight: float = 1.0
def __post_init__(self):
if self.std_dev > 0:
self.weight = 1.0 / (self.std_dev ** 2)
@dataclass
class AdjustedPoint:
"""Ausgeglichener Punkt mit Genauigkeitsangaben"""
name: str
x: float # East
y: float # North
z: float # Elevation
# A-priori Koordinaten
x_approx: float = 0.0
y_approx: float = 0.0
z_approx: float = 0.0
# Korrekturen
dx: float = 0.0
dy: float = 0.0
dz: float = 0.0
# Standardabweichungen
std_x: float = 0.0
std_y: float = 0.0
std_z: float = 0.0
std_position: float = 0.0 # 2D Punktlagegenauigkeit
# Konfidenzellipse
semi_major: float = 0.0
semi_minor: float = 0.0
orientation: float = 0.0 # in Gon
# Status
is_fixed: bool = False
@dataclass
class AdjustmentResult:
"""Ergebnis der Netzausgleichung"""
# Ausgeglichene Punkte
adjusted_points: Dict[str, AdjustedPoint] = field(default_factory=dict)
# Beobachtungen mit Residuen
observations: List[Observation] = field(default_factory=list)
# Globale Qualitätsparameter
sigma_0_priori: float = 1.0
sigma_0_posteriori: float = 0.0
chi_square: float = 0.0
redundancy: int = 0
# RMSE der Residuen
rmse_directions: float = 0.0 # in mgon
rmse_distances: float = 0.0 # in mm
rmse_zenith: float = 0.0 # in mgon
# Iterationsinfo
iterations: int = 0
converged: bool = False
# Statistiken
num_points: int = 0
num_fixed_points: int = 0
num_observations: int = 0
num_unknowns: int = 0
class NetworkAdjustment:
"""
Netzausgleichung nach der Methode der kleinsten Quadrate
Gauß-Markov-Modell mit Beobachtungsgleichungen
KORRIGIERT: Mit Orientierungsunbekannten pro Station
"""
def __init__(self, parser: JXLParser):
self.parser = parser
self.observations: List[Observation] = []
self.points: Dict[str, AdjustedPoint] = {}
self.fixed_points: Set[str] = set()
self.result: Optional[AdjustmentResult] = None
# Orientierungsunbekannte pro Station
self.orientations: Dict[str, float] = {} # Station -> Orientierungskorrektur in Gon
self.station_names: List[str] = []
# Speichere ursprüngliche Koordinaten für Vergleich
self.coords_before: Dict[str, Tuple[float, float, float]] = {}
# Konfiguration
self.max_iterations: int = 20
self.convergence_limit: float = 1e-8 # Meter
self.sigma_0_priori: float = 1.0
# Standard-Genauigkeiten (falls nicht aus JXL)
self.default_std_direction: float = 0.0003 # Gon (0.3 mgon)
self.default_std_distance: float = 0.002 # Meter
self.default_std_zenith: float = 0.0003 # Gon
def extract_observations(self):
"""
Extrahiert Beobachtungen aus JXL-Daten
WICHTIG: Richtungen werden als ROHE Kreislesungen extrahiert (ohne Orientierungskorrektur),
da die Orientierung als Unbekannte geschätzt wird.
"""
self.observations = []
for station_id, station in self.parser.stations.items():
# Messungen von dieser Station
measurements = self.parser.get_measurements_from_station(station_id)
for meas in measurements:
if meas.name and not meas.deleted:
# Richtung - OHNE Orientierungskorrektur (wird als Unbekannte geschätzt)
if meas.horizontal_circle is not None:
std = meas.hz_std_error if meas.hz_std_error else self.default_std_direction
self.observations.append(Observation(
obs_type='direction',
from_station=station.name,
to_point=meas.name,
value=meas.horizontal_circle, # Rohe Kreislesung
std_dev=std
))
# Strecke (Horizontalstrecke aus 3D-Messung)
if meas.edm_distance is not None and meas.vertical_circle is not None:
std = meas.dist_std_error if meas.dist_std_error else self.default_std_distance
# Prismenkonstante berücksichtigen
prism_const = 0.0
if meas.target_id in self.parser.targets:
prism_const = self.parser.targets[meas.target_id].prism_constant
# Schrägstrecke
slope_distance = meas.edm_distance + prism_const
# Horizontalstrecke aus Schrägstrecke und Zenitwinkel
zenith_rad = meas.vertical_circle * math.pi / 200.0
horizontal_distance = slope_distance * math.sin(zenith_rad)
self.observations.append(Observation(
obs_type='distance',
from_station=station.name,
to_point=meas.name,
value=horizontal_distance, # Horizontalstrecke
std_dev=std
))
# Zenitwinkel
if meas.vertical_circle is not None:
std = meas.vz_std_error if meas.vz_std_error else self.default_std_zenith
self.observations.append(Observation(
obs_type='zenith',
from_station=station.name,
to_point=meas.name,
value=meas.vertical_circle,
std_dev=std
))
return self.observations
def initialize_points(self):
"""Initialisiert Näherungskoordinaten aus JXL-Daten"""
self.points = {}
self.coords_before = {}
self.station_names = []
self.orientations = {}
# Alle aktiven Punkte
for name, point in self.parser.get_active_points().items():
x = point.east if point.east is not None else 0.0
y = point.north if point.north is not None else 0.0
z = point.elevation if point.elevation is not None else 0.0
self.points[name] = AdjustedPoint(
name=name,
x=x, y=y, z=z,
x_approx=x, y_approx=y, z_approx=z
)
self.coords_before[name] = (x, y, z)
# Stationen hinzufügen
for station_id, station in self.parser.stations.items():
if station.name not in self.points:
x = station.east if station.east is not None else 0.0
y = station.north if station.north is not None else 0.0
z = station.elevation if station.elevation is not None else 0.0
self.points[station.name] = AdjustedPoint(
name=station.name,
x=x, y=y, z=z,
x_approx=x, y_approx=y, z_approx=z
)
self.coords_before[station.name] = (x, y, z)
if station.name not in self.station_names:
self.station_names.append(station.name)
# Orientierungen aus bekannten Koordinaten und Beobachtungen berechnen
self._compute_initial_orientations()
def _compute_initial_orientations(self):
"""
Berechnet initiale Orientierungen aus den bekannten Koordinaten.
Dies ist wichtig, da die JXL-Koordinaten bereits fertig berechnet sind.
"""
for station_name in self.station_names:
orientations_for_station = []
for obs in self.observations:
if obs.obs_type == 'direction' and obs.from_station == station_name:
from_pt = self.points.get(station_name)
to_pt = self.points.get(obs.to_point)
if from_pt and to_pt:
dx = to_pt.x - from_pt.x
dy = to_pt.y - from_pt.y
dist = math.sqrt(dx**2 + dy**2)
if dist > 0.01: # Mindestentfernung
# Azimut aus Koordinaten (geodätisch: atan2(dx, dy))
azimuth = math.atan2(dx, dy) * 200.0 / math.pi
if azimuth < 0:
azimuth += 400.0
# Orientierung = Azimut - Richtung
orientation = azimuth - obs.value
# Normalisieren auf [0, 400] gon
while orientation < 0:
orientation += 400.0
while orientation >= 400:
orientation -= 400.0
orientations_for_station.append(orientation)
# Mittlere Orientierung verwenden (robust gegen Ausreißer)
if orientations_for_station:
# Kreismittel berechnen (für Winkel)
sin_sum = sum(math.sin(o * math.pi / 200) for o in orientations_for_station)
cos_sum = sum(math.cos(o * math.pi / 200) for o in orientations_for_station)
mean_orientation = math.atan2(sin_sum, cos_sum) * 200 / math.pi
if mean_orientation < 0:
mean_orientation += 400.0
self.orientations[station_name] = mean_orientation
else:
self.orientations[station_name] = 0.0
def set_fixed_point(self, point_name: str):
"""Setzt einen Punkt als Festpunkt"""
if point_name in self.points:
self.fixed_points.add(point_name)
self.points[point_name].is_fixed = True
def set_fixed_points_auto(self):
"""Setzt automatisch Festpunkte (Referenzpunkte)"""
# Referenzlinie als Festpunkte
ref_line = self.parser.get_reference_line()
if ref_line:
self.set_fixed_point(ref_line.start_point)
self.set_fixed_point(ref_line.end_point)
# Zusätzlich: Erste Station als Festpunkt falls keine Referenzlinie
if not self.fixed_points:
stations = list(self.parser.stations.values())
if stations:
first_station = min(stations, key=lambda s: s.timestamp)
self.set_fixed_point(first_station.name)
def check_consistency(self) -> dict:
"""
Prüft die Konsistenz zwischen Koordinaten und Beobachtungen.
Gibt einen Bericht über die Qualität der Daten zurück.
"""
if not self.observations:
self.extract_observations()
if not self.points:
self.initialize_points()
result = {
'consistent': True,
'orientation_spread': {},
'distance_residuals': [],
'issues': []
}
# Prüfe Orientierungen pro Station
for station_name in self.station_names:
orientations = []
for obs in self.observations:
if obs.obs_type == 'direction' and obs.from_station == station_name:
from_pt = self.points.get(station_name)
to_pt = self.points.get(obs.to_point)
if from_pt and to_pt:
dx = to_pt.x - from_pt.x
dy = to_pt.y - from_pt.y
dist = math.sqrt(dx**2 + dy**2)
if dist > 0.01:
azimuth = math.atan2(dx, dy) * 200.0 / math.pi
if azimuth < 0:
azimuth += 400.0
ori = azimuth - obs.value
while ori < 0:
ori += 400.0
while ori >= 400:
ori -= 400.0
orientations.append(ori)
if orientations:
spread = max(orientations) - min(orientations)
result['orientation_spread'][station_name] = {
'min': min(orientations),
'max': max(orientations),
'spread': spread
}
if spread > 1.0: # > 1 gon Spannweite
result['consistent'] = False
result['issues'].append(f"Station {station_name}: Orientierungsspannweite {spread:.2f} gon")
return result
def adjust(self, mode: str = "residuals_only") -> AdjustmentResult:
"""
Führt die Netzausgleichung durch.
mode:
"residuals_only" - Berechnet nur Residuen, keine Koordinatenänderung (Standard)
"full" - Vollständige Ausgleichung mit Koordinatenkorrektur
"""
if not self.observations:
self.extract_observations()
if not self.points:
self.initialize_points()
if not self.fixed_points:
self.set_fixed_points_auto()
# Unbekannte bestimmen (nur nicht-fixierte Punkte)
unknown_points = [name for name in self.points.keys()
if name not in self.fixed_points]
num_observations = len([o for o in self.observations
if o.obs_type in ['direction', 'distance']])
if mode == "residuals_only":
# Nur Residuenberechnung - Koordinaten bleiben unverändert
# Orientierungen wurden bereits in initialize_points berechnet
# Residuen berechnen
self._compute_residuals()
# Sigma-0 aus Residuen berechnen
dir_residuals = [o.residual for o in self.observations if o.obs_type == 'direction']
dist_residuals = [o.residual for o in self.observations if o.obs_type == 'distance']
# Einfache Schätzung von sigma-0
if dir_residuals:
rmse_dir = math.sqrt(sum(r**2 for r in dir_residuals) / len(dir_residuals))
else:
rmse_dir = 0
if dist_residuals:
rmse_dist = math.sqrt(sum(r**2 for r in dist_residuals) / len(dist_residuals))
else:
rmse_dist = 0
self.sigma_0_posteriori = max(rmse_dir * 1000, rmse_dist * 1000) # vereinfacht
# Ergebnis zusammenstellen
self.result = AdjustmentResult(
adjusted_points=self.points,
observations=self.observations,
sigma_0_priori=self.sigma_0_priori,
iterations=0,
converged=True,
num_points=len(self.points),
num_fixed_points=len(self.fixed_points),
num_observations=num_observations,
num_unknowns=0,
redundancy=num_observations
)
self._compute_global_statistics()
return self.result
# Vollständige Ausgleichung (mode == "full")
# Anzahl Unbekannte: 2 pro Punkt (X,Y) + 1 Orientierung pro Station
num_point_unknowns = len(unknown_points) * 2
num_orientation_unknowns = len(self.station_names)
num_unknowns = num_point_unknowns + num_orientation_unknowns
if num_unknowns == 0:
raise ValueError("Keine unbekannten Punkte!")
if num_observations < num_unknowns:
raise ValueError(f"Unterbestimmtes System: {num_observations} Beobachtungen, "
f"{num_unknowns} Unbekannte")
# Index-Mapping für Unbekannte
point_index = {name: i for i, name in enumerate(unknown_points)}
orientation_index = {name: i for i, name in enumerate(self.station_names)}
# Iterative Lösung
converged = False
iteration = 0
while not converged and iteration < self.max_iterations:
iteration += 1
A, l, P = self._build_normal_equations_with_orientation(
point_index, orientation_index, num_point_unknowns, num_unknowns)
N = A.T @ np.diag(P) @ A
n = A.T @ np.diag(P) @ l
try:
dx = np.linalg.solve(N, n)
except np.linalg.LinAlgError:
N += np.eye(num_unknowns) * 1e-10
dx = np.linalg.solve(N, n)
max_correction = 0.0
for name, idx in point_index.items():
i = idx * 2
self.points[name].dx = dx[i]
self.points[name].dy = dx[i + 1]
self.points[name].x += dx[i]
self.points[name].y += dx[i + 1]
max_correction = max(max_correction,
abs(dx[i]), abs(dx[i + 1]))
for name, idx in orientation_index.items():
i = num_point_unknowns + idx
self.orientations[name] += dx[i]
if max_correction < self.convergence_limit:
converged = True
self._compute_residuals()
self._compute_accuracy(point_index, num_point_unknowns)
self.result = AdjustmentResult(
adjusted_points=self.points,
observations=self.observations,
sigma_0_priori=self.sigma_0_priori,
iterations=iteration,
converged=converged,
num_points=len(self.points),
num_fixed_points=len(self.fixed_points),
num_observations=num_observations,
num_unknowns=num_unknowns,
redundancy=num_observations - num_unknowns
)
self._compute_global_statistics()
return self.result
def _build_normal_equations_with_orientation(self, point_index: Dict[str, int],
orientation_index: Dict[str, int],
num_point_unknowns: int,
num_unknowns: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Erstellt Designmatrix A und Beobachtungsvektor l
MIT Orientierungsunbekannten pro Station
Unbekannte: [x1, y1, x2, y2, ..., o1, o2, ...]
"""
# Nur Richtungen und Strecken für 2D-Ausgleichung
relevant_obs = [o for o in self.observations
if o.obs_type in ['direction', 'distance']]
n_obs = len(relevant_obs)
A = np.zeros((n_obs, num_unknowns))
l = np.zeros(n_obs)
P = np.zeros(n_obs)
for i, obs in enumerate(relevant_obs):
from_pt = self.points.get(obs.from_station)
to_pt = self.points.get(obs.to_point)
if from_pt is None or to_pt is None:
continue
dx = to_pt.x - from_pt.x
dy = to_pt.y - from_pt.y
dist = math.sqrt(dx**2 + dy**2)
if dist < 1e-10:
continue
from_idx = point_index.get(obs.from_station)
to_idx = point_index.get(obs.to_point)
if obs.obs_type == 'direction':
# Azimut aus Koordinaten berechnen
azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi
if azimuth_calc < 0:
azimuth_calc += 400.0
# Orientierung der Station
station_orientation = self.orientations.get(obs.from_station, 0.0)
# Berechnete Richtung = Azimut - Orientierung
direction_calc = azimuth_calc - station_orientation
while direction_calc < 0:
direction_calc += 400.0
while direction_calc >= 400:
direction_calc -= 400.0
# Partielle Ableitungen für Richtungen (in Gon)
rho = 200.0 / math.pi
factor = rho / (dist ** 2)
# Ableitungen nach Punktkoordinaten
if from_idx is not None:
A[i, from_idx * 2] = -dy * factor # dRichtung/dx_from
A[i, from_idx * 2 + 1] = dx * factor # dRichtung/dy_from
if to_idx is not None:
A[i, to_idx * 2] = dy * factor # dRichtung/dx_to
A[i, to_idx * 2 + 1] = -dx * factor # dRichtung/dy_to
# Ableitung nach Orientierung: dRichtung/do = -1
ori_idx = orientation_index.get(obs.from_station)
if ori_idx is not None:
A[i, num_point_unknowns + ori_idx] = -1.0
# Verkürzung l = beobachtet - berechnet
diff = obs.value - direction_calc
# Normalisierung auf [-200, 200] Gon
while diff > 200:
diff -= 400
while diff < -200:
diff += 400
l[i] = diff
elif obs.obs_type == 'distance':
# Partielle Ableitungen für Strecken
if from_idx is not None:
A[i, from_idx * 2] = -dx / dist
A[i, from_idx * 2 + 1] = -dy / dist
if to_idx is not None:
A[i, to_idx * 2] = dx / dist
A[i, to_idx * 2 + 1] = dy / dist
# Strecken sind unabhängig von der Orientierung
# Verkürzung
l[i] = obs.value - dist
# Gewicht
P[i] = obs.weight
return A, l, P
def _build_normal_equations(self, point_index: Dict[str, int],
num_unknowns: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Erstellt Designmatrix A und Beobachtungsvektor l (alte Version ohne Orientierung)"""
# Nur Richtungen und Strecken für 2D-Ausgleichung
relevant_obs = [o for o in self.observations
if o.obs_type in ['direction', 'distance']]
n_obs = len(relevant_obs)
A = np.zeros((n_obs, num_unknowns))
l = np.zeros(n_obs)
P = np.zeros(n_obs)
for i, obs in enumerate(relevant_obs):
from_pt = self.points.get(obs.from_station)
to_pt = self.points.get(obs.to_point)
if from_pt is None or to_pt is None:
continue
dx = to_pt.x - from_pt.x
dy = to_pt.y - from_pt.y
dist = math.sqrt(dx**2 + dy**2)
if dist < 1e-10:
continue
from_idx = point_index.get(obs.from_station)
to_idx = point_index.get(obs.to_point)
if obs.obs_type == 'direction':
# Azimut berechnen
azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi
if azimuth_calc < 0:
azimuth_calc += 400.0
# Partielle Ableitungen (in Gon)
rho = 200.0 / math.pi
factor = rho / (dist ** 2)
if from_idx is not None:
A[i, from_idx * 2] = -dy * factor # dAz/dx_from
A[i, from_idx * 2 + 1] = dx * factor # dAz/dy_from
if to_idx is not None:
A[i, to_idx * 2] = dy * factor # dAz/dx_to
A[i, to_idx * 2 + 1] = -dx * factor # dAz/dy_to
# Verkürzung l = beobachtet - berechnet
diff = obs.value - azimuth_calc
# Normalisierung auf [-200, 200] Gon
while diff > 200:
diff -= 400
while diff < -200:
diff += 400
l[i] = diff
elif obs.obs_type == 'distance':
# Partielle Ableitungen
if from_idx is not None:
A[i, from_idx * 2] = -dx / dist
A[i, from_idx * 2 + 1] = -dy / dist
if to_idx is not None:
A[i, to_idx * 2] = dx / dist
A[i, to_idx * 2 + 1] = dy / dist
# Verkürzung
l[i] = obs.value - dist
# Gewicht
P[i] = obs.weight
return A, l, P
def _compute_residuals(self):
"""Berechnet Verbesserungen (Residuen) für alle Beobachtungen"""
for obs in self.observations:
from_pt = self.points.get(obs.from_station)
to_pt = self.points.get(obs.to_point)
if from_pt is None or to_pt is None:
continue
dx = to_pt.x - from_pt.x
dy = to_pt.y - from_pt.y
dz = to_pt.z - from_pt.z
dist_2d = math.sqrt(dx**2 + dy**2)
dist_3d = math.sqrt(dx**2 + dy**2 + dz**2)
if obs.obs_type == 'direction':
azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi
if azimuth_calc < 0:
azimuth_calc += 400.0
# Berücksichtige Orientierung
station_orientation = self.orientations.get(obs.from_station, 0.0)
direction_calc = azimuth_calc - station_orientation
while direction_calc < 0:
direction_calc += 400.0
while direction_calc >= 400:
direction_calc -= 400.0
residual = obs.value - direction_calc
while residual > 200:
residual -= 400
while residual < -200:
residual += 400
obs.residual = residual
elif obs.obs_type == 'distance':
obs.residual = obs.value - dist_2d # 2D Strecke verwenden
elif obs.obs_type == 'zenith':
if dist_2d > 0:
zenith_calc = math.atan2(dist_2d, dz) * 200.0 / math.pi
obs.residual = obs.value - zenith_calc
def _compute_accuracy(self, point_index: Dict[str, int], num_unknowns: int):
"""Berechnet Genauigkeitsmaße für ausgeglichene Punkte"""
# Vereinfachte Berechnung der Standardabweichungen
# Vollständige Varianzfortpflanzung würde Inverse von N erfordern
# Erstelle Normalgleichungsmatrix erneut
A, l, P = self._build_normal_equations(point_index, num_unknowns)
N = A.T @ np.diag(P) @ A
try:
Q = np.linalg.inv(N) # Kofaktormatrix
except np.linalg.LinAlgError:
Q = np.eye(num_unknowns) * 0.001
# A-posteriori Varianzfaktor
v = A @ np.zeros(num_unknowns) - l # Residuen (vereinfacht)
redundancy = len(l) - num_unknowns
if redundancy > 0:
sum_pvv = np.sum(P * v**2)
sigma_0_post = math.sqrt(sum_pvv / redundancy)
else:
sigma_0_post = self.sigma_0_priori
self.sigma_0_posteriori = sigma_0_post
# Punktgenauigkeiten
for name, idx in point_index.items():
i = idx * 2
# Standardabweichungen
self.points[name].std_x = sigma_0_post * math.sqrt(abs(Q[i, i]))
self.points[name].std_y = sigma_0_post * math.sqrt(abs(Q[i + 1, i + 1]))
# Punktlagegenauigkeit
self.points[name].std_position = math.sqrt(
self.points[name].std_x**2 + self.points[name].std_y**2
)
# Konfidenzellipse (vereinfacht)
# Vollständige Berechnung würde Eigenwertanalyse erfordern
cov_xy = Q[i, i + 1] if i + 1 < num_unknowns else 0
var_x = Q[i, i]
var_y = Q[i + 1, i + 1]
# Eigenwerte für Ellipse
trace = var_x + var_y
det = var_x * var_y - cov_xy**2
discriminant = trace**2 / 4 - det
if discriminant >= 0:
sqrt_disc = math.sqrt(discriminant)
lambda1 = trace / 2 + sqrt_disc
lambda2 = trace / 2 - sqrt_disc
self.points[name].semi_major = sigma_0_post * math.sqrt(max(lambda1, 0))
self.points[name].semi_minor = sigma_0_post * math.sqrt(max(lambda2, 0))
if abs(var_x - var_y) > 1e-10:
self.points[name].orientation = 0.5 * math.atan2(2 * cov_xy, var_x - var_y) * 200 / math.pi
def _compute_global_statistics(self):
"""Berechnet globale Statistiken"""
if self.result is None:
return
# RMSE für verschiedene Beobachtungstypen
dir_residuals = [o.residual for o in self.observations if o.obs_type == 'direction']
dist_residuals = [o.residual for o in self.observations if o.obs_type == 'distance']
zen_residuals = [o.residual for o in self.observations if o.obs_type == 'zenith']
if dir_residuals:
self.result.rmse_directions = math.sqrt(sum(r**2 for r in dir_residuals) / len(dir_residuals)) * 1000 # mgon
if dist_residuals:
self.result.rmse_distances = math.sqrt(sum(r**2 for r in dist_residuals) / len(dist_residuals)) * 1000 # mm
if zen_residuals:
self.result.rmse_zenith = math.sqrt(sum(r**2 for r in zen_residuals) / len(zen_residuals)) * 1000 # mgon
self.result.sigma_0_posteriori = self.sigma_0_posteriori
# Chi-Quadrat-Test
if self.result.redundancy > 0:
self.result.chi_square = (self.sigma_0_posteriori / self.sigma_0_priori) ** 2 * self.result.redundancy
def get_adjustment_report(self) -> str:
"""Erstellt einen detaillierten Ausgleichungsbericht"""
if self.result is None:
return "Keine Ausgleichung durchgeführt."
lines = []
lines.append("=" * 80)
lines.append("NETZAUSGLEICHUNG - ERGEBNISBERICHT")
lines.append("=" * 80)
lines.append("")
# Allgemeine Informationen
lines.append("ALLGEMEINE INFORMATIONEN")
lines.append("-" * 80)
lines.append(f"Job: {self.parser.job_name}")
lines.append(f"Anzahl Punkte: {self.result.num_points}")
lines.append(f" davon Festpunkte: {self.result.num_fixed_points}")
lines.append(f" davon Neupunkte: {self.result.num_points - self.result.num_fixed_points}")
lines.append(f"Anzahl Beobachtungen: {self.result.num_observations}")
lines.append(f"Anzahl Unbekannte: {self.result.num_unknowns}")
lines.append(f"Redundanz: {self.result.redundancy}")
lines.append(f"Iterationen: {self.result.iterations}")
lines.append(f"Konvergiert: {'Ja' if self.result.converged else 'Nein'}")
lines.append("")
# Qualitätsparameter
lines.append("GLOBALE QUALITÄTSPARAMETER")
lines.append("-" * 80)
lines.append(f"Sigma-0 a-priori: {self.sigma_0_priori:.4f}")
lines.append(f"Sigma-0 a-posteriori: {self.result.sigma_0_posteriori:.4f}")
lines.append(f"Chi-Quadrat: {self.result.chi_square:.2f}")
lines.append(f"RMSE Richtungen: {self.result.rmse_directions:.2f} mgon")
lines.append(f"RMSE Strecken: {self.result.rmse_distances:.2f} mm")
lines.append(f"RMSE Zenitwinkel: {self.result.rmse_zenith:.2f} mgon")
lines.append("")
# Festpunkte
lines.append("FESTPUNKTE")
lines.append("-" * 80)
lines.append(f"{'Punkt':<12} {'X [m]':>14} {'Y [m]':>14} {'Z [m]':>12}")
lines.append("-" * 80)
for name in self.fixed_points:
if name in self.points:
p = self.points[name]
lines.append(f"{name:<12} {p.x:>14.4f} {p.y:>14.4f} {p.z:>12.4f}")
lines.append("")
# Ausgeglichene Koordinaten
lines.append("AUSGEGLICHENE KOORDINATEN (NEUPUNKTE)")
lines.append("-" * 80)
lines.append(f"{'Punkt':<12} {'X [m]':>14} {'Y [m]':>14} {'Z [m]':>12} {'σX [mm]':>10} {'σY [mm]':>10} {'σPos [mm]':>10}")
lines.append("-" * 80)
for name, p in sorted(self.points.items()):
if name not in self.fixed_points:
lines.append(f"{name:<12} {p.x:>14.4f} {p.y:>14.4f} {p.z:>12.4f} "
f"{p.std_x*1000:>10.2f} {p.std_y*1000:>10.2f} {p.std_position*1000:>10.2f}")
lines.append("")
# Beobachtungen und Residuen
lines.append("BEOBACHTUNGEN UND VERBESSERUNGEN")
lines.append("-" * 80)
lines.append(f"{'Von':<10} {'Nach':<10} {'Typ':<10} {'Messwert':>14} {'Residuum':>12}")
lines.append("-" * 80)
for obs in sorted(self.observations, key=lambda x: (x.from_station, x.to_point)):
if obs.obs_type == 'direction':
unit = "gon"
res_str = f"{obs.residual*1000:.2f} mgon"
elif obs.obs_type == 'distance':
unit = "m"
res_str = f"{obs.residual*1000:.2f} mm"
elif obs.obs_type == 'zenith':
unit = "gon"
res_str = f"{obs.residual*1000:.2f} mgon"
else:
unit = ""
res_str = f"{obs.residual:.4f}"
lines.append(f"{obs.from_station:<10} {obs.to_point:<10} {obs.obs_type:<10} "
f"{obs.value:>12.4f} {unit:<2} {res_str:>12}")
lines.append("")
lines.append("=" * 80)
return "\n".join(lines)
def export_adjusted_points(self, filepath: str, format: str = 'csv'):
"""Exportiert ausgeglichene Punkte"""
if format == 'csv':
lines = ["Punkt;X;Y;Z;Sigma_X;Sigma_Y;Sigma_Pos;Festpunkt"]
for name, p in sorted(self.points.items()):
fixed = "Ja" if p.is_fixed else "Nein"
lines.append(f"{name};{p.x:.4f};{p.y:.4f};{p.z:.4f};"
f"{p.std_x*1000:.2f};{p.std_y*1000:.2f};{p.std_position*1000:.2f};{fixed}")
else:
lines = []
for name, p in sorted(self.points.items()):
lines.append(f"{name}\t{p.x:.4f}\t{p.y:.4f}\t{p.z:.4f}")
with open(filepath, 'w', encoding='utf-8') as f:
f.write("\n".join(lines))
def export_report(self, filepath: str):
"""Exportiert den vollständigen Bericht"""
report = self.get_adjustment_report()
with open(filepath, 'w', encoding='utf-8') as f:
f.write(report)
def get_point_comparison(self) -> List[Dict]:
"""
Erstellt Punktevergleich: Koordinaten vor und nach der Ausgleichung
Gibt Liste von Dicts zurück, sortiert nach größter Abweichung
"""
comparison = []
for name, pt in self.points.items():
if name in self.coords_before:
x_before, y_before, z_before = self.coords_before[name]
x_after, y_after, z_after = pt.x, pt.y, pt.z
dx = x_after - x_before
dy = y_after - y_before
dz = z_after - z_before
d3d = math.sqrt(dx**2 + dy**2 + dz**2)
d2d = math.sqrt(dx**2 + dy**2)
comparison.append({
'name': name,
'x_before': x_before,
'y_before': y_before,
'z_before': z_before,
'x_after': x_after,
'y_after': y_after,
'z_after': z_after,
'dx': dx,
'dy': dy,
'dz': dz,
'd2d': d2d,
'd3d': d3d,
'is_fixed': pt.is_fixed
})
# Sortiert nach größter 3D-Abweichung
comparison.sort(key=lambda x: x['d3d'], reverse=True)
return comparison
def get_comparison_report(self) -> str:
"""Erstellt einen Punktevergleichs-Bericht"""
comparison = self.get_point_comparison()
lines = []
lines.append("=" * 120)
lines.append("PUNKTEVERGLEICH: VORHER vs. NACHHER")
lines.append("=" * 120)
lines.append("")
lines.append(f"{'Punkt':<10} {'X_vorher':>12} {'Y_vorher':>12} {'Z_vorher':>10} | "
f"{'X_nachher':>12} {'Y_nachher':>12} {'Z_nachher':>10} | "
f"{'ΔX [mm]':>10} {'ΔY [mm]':>10} {'ΔZ [mm]':>10} {'Δ3D [mm]':>10}")
lines.append("-" * 120)
for c in comparison:
fixed_marker = "*" if c['is_fixed'] else " "
lines.append(f"{c['name']:<9}{fixed_marker} "
f"{c['x_before']:>12.4f} {c['y_before']:>12.4f} {c['z_before']:>10.4f} | "
f"{c['x_after']:>12.4f} {c['y_after']:>12.4f} {c['z_after']:>10.4f} | "
f"{c['dx']*1000:>10.2f} {c['dy']*1000:>10.2f} {c['dz']*1000:>10.2f} {c['d3d']*1000:>10.2f}")
lines.append("")
lines.append("* = Festpunkt (nicht ausgeglichen)")
lines.append("")
# Plausibilitätsprüfung
lines.append("=" * 80)
lines.append("PLAUSIBILITÄTSPRÜFUNG")
lines.append("=" * 80)
non_fixed = [c for c in comparison if not c['is_fixed']]
critical_errors = [c for c in non_fixed if c['d3d'] > 1.0]
warnings = [c for c in non_fixed if 0.1 < c['d3d'] <= 1.0]
if critical_errors:
lines.append("")
lines.append("🚨 KRITISCHE FEHLER (Abweichung > 1m):")
for c in critical_errors[:10]: # Max 10 anzeigen
lines.append(f"{c['name']}: {c['d3d']*1000:.1f} mm")
if len(critical_errors) > 10:
lines.append(f" ... und {len(critical_errors)-10} weitere")
if warnings:
lines.append("")
lines.append("⚠️ WARNUNGEN (Abweichung > 10cm):")
for c in warnings[:10]:
lines.append(f"{c['name']}: {c['d3d']*1000:.1f} mm")
if len(warnings) > 10:
lines.append(f" ... und {len(warnings)-10} weitere")
# Statistik
if non_fixed:
d3d_values = [c['d3d'] for c in non_fixed]
dx_values = [abs(c['dx']) for c in non_fixed]
dy_values = [abs(c['dy']) for c in non_fixed]
lines.append("")
lines.append("### Statistik (nur Neupunkte):")
lines.append(f" Anzahl Punkte: {len(non_fixed)}")
lines.append(f" Min Δ3D: {min(d3d_values)*1000:.2f} mm")
lines.append(f" Max Δ3D: {max(d3d_values)*1000:.2f} mm")
lines.append(f" Mittel Δ3D: {sum(d3d_values)/len(d3d_values)*1000:.2f} mm")
lines.append(f" Max |ΔX|: {max(dx_values)*1000:.2f} mm")
lines.append(f" Max |ΔY|: {max(dy_values)*1000:.2f} mm")
if not critical_errors and not warnings:
lines.append("")
lines.append("✅ Alle Abweichungen im akzeptablen Bereich (< 10cm)")
return "\n".join(lines)
def export_comparison(self, filepath: str, format: str = 'csv'):
"""Exportiert den Punktevergleich"""
comparison = self.get_point_comparison()
if format == 'csv':
lines = ["Punkt;X_vorher;Y_vorher;Z_vorher;X_nachher;Y_nachher;Z_nachher;DX_mm;DY_mm;DZ_mm;D3D_mm;Festpunkt"]
for c in comparison:
fixed = "Ja" if c['is_fixed'] else "Nein"
lines.append(f"{c['name']};{c['x_before']:.4f};{c['y_before']:.4f};{c['z_before']:.4f};"
f"{c['x_after']:.4f};{c['y_after']:.4f};{c['z_after']:.4f};"
f"{c['dx']*1000:.2f};{c['dy']*1000:.2f};{c['dz']*1000:.2f};{c['d3d']*1000:.2f};{fixed}")
else:
lines = [self.get_comparison_report()]
with open(filepath, 'w', encoding='utf-8') as f:
f.write("\n".join(lines))