1035 lines
42 KiB
Python
1035 lines
42 KiB
Python
"""
|
||
Netzausgleichungs-Modul
|
||
Methode der kleinsten Quadrate für geodätische Netze
|
||
Basiert auf Beobachtungen aus JXL-Dateien
|
||
"""
|
||
|
||
import math
|
||
import numpy as np
|
||
from scipy import sparse
|
||
from scipy.sparse.linalg import spsolve
|
||
from typing import List, Dict, Tuple, Optional, Set
|
||
from dataclasses import dataclass, field
|
||
from .jxl_parser import JXLParser, Point, Station
|
||
|
||
|
||
@dataclass
|
||
class Observation:
|
||
"""Repräsentiert eine geodätische Beobachtung"""
|
||
obs_type: str # 'direction', 'distance', 'zenith', 'height_diff'
|
||
from_station: str
|
||
to_point: str
|
||
value: float # Messwert (Gon, Meter, etc.)
|
||
std_dev: float # Standardabweichung
|
||
residual: float = 0.0 # Verbesserung
|
||
weight: float = 1.0
|
||
|
||
def __post_init__(self):
|
||
if self.std_dev > 0:
|
||
self.weight = 1.0 / (self.std_dev ** 2)
|
||
|
||
|
||
@dataclass
|
||
class AdjustedPoint:
|
||
"""Ausgeglichener Punkt mit Genauigkeitsangaben"""
|
||
name: str
|
||
x: float # East
|
||
y: float # North
|
||
z: float # Elevation
|
||
|
||
# A-priori Koordinaten
|
||
x_approx: float = 0.0
|
||
y_approx: float = 0.0
|
||
z_approx: float = 0.0
|
||
|
||
# Korrekturen
|
||
dx: float = 0.0
|
||
dy: float = 0.0
|
||
dz: float = 0.0
|
||
|
||
# Standardabweichungen
|
||
std_x: float = 0.0
|
||
std_y: float = 0.0
|
||
std_z: float = 0.0
|
||
std_position: float = 0.0 # 2D Punktlagegenauigkeit
|
||
|
||
# Konfidenzellipse
|
||
semi_major: float = 0.0
|
||
semi_minor: float = 0.0
|
||
orientation: float = 0.0 # in Gon
|
||
|
||
# Status
|
||
is_fixed: bool = False
|
||
|
||
|
||
@dataclass
|
||
class AdjustmentResult:
|
||
"""Ergebnis der Netzausgleichung"""
|
||
# Ausgeglichene Punkte
|
||
adjusted_points: Dict[str, AdjustedPoint] = field(default_factory=dict)
|
||
|
||
# Beobachtungen mit Residuen
|
||
observations: List[Observation] = field(default_factory=list)
|
||
|
||
# Globale Qualitätsparameter
|
||
sigma_0_priori: float = 1.0
|
||
sigma_0_posteriori: float = 0.0
|
||
chi_square: float = 0.0
|
||
redundancy: int = 0
|
||
|
||
# RMSE der Residuen
|
||
rmse_directions: float = 0.0 # in mgon
|
||
rmse_distances: float = 0.0 # in mm
|
||
rmse_zenith: float = 0.0 # in mgon
|
||
|
||
# Iterationsinfo
|
||
iterations: int = 0
|
||
converged: bool = False
|
||
|
||
# Statistiken
|
||
num_points: int = 0
|
||
num_fixed_points: int = 0
|
||
num_observations: int = 0
|
||
num_unknowns: int = 0
|
||
|
||
|
||
class NetworkAdjustment:
|
||
"""
|
||
Netzausgleichung nach der Methode der kleinsten Quadrate
|
||
Gauß-Markov-Modell mit Beobachtungsgleichungen
|
||
|
||
KORRIGIERT: Mit Orientierungsunbekannten pro Station
|
||
"""
|
||
|
||
def __init__(self, parser: JXLParser):
|
||
self.parser = parser
|
||
self.observations: List[Observation] = []
|
||
self.points: Dict[str, AdjustedPoint] = {}
|
||
self.fixed_points: Set[str] = set()
|
||
self.result: Optional[AdjustmentResult] = None
|
||
|
||
# Orientierungsunbekannte pro Station
|
||
self.orientations: Dict[str, float] = {} # Station -> Orientierungskorrektur in Gon
|
||
self.station_names: List[str] = []
|
||
|
||
# Speichere ursprüngliche Koordinaten für Vergleich
|
||
self.coords_before: Dict[str, Tuple[float, float, float]] = {}
|
||
|
||
# Konfiguration
|
||
self.max_iterations: int = 20
|
||
self.convergence_limit: float = 1e-8 # Meter
|
||
self.sigma_0_priori: float = 1.0
|
||
|
||
# Standard-Genauigkeiten (falls nicht aus JXL)
|
||
self.default_std_direction: float = 0.0003 # Gon (0.3 mgon)
|
||
self.default_std_distance: float = 0.002 # Meter
|
||
self.default_std_zenith: float = 0.0003 # Gon
|
||
|
||
def extract_observations(self):
|
||
"""
|
||
Extrahiert Beobachtungen aus JXL-Daten
|
||
|
||
WICHTIG: Richtungen werden als ROHE Kreislesungen extrahiert (ohne Orientierungskorrektur),
|
||
da die Orientierung als Unbekannte geschätzt wird.
|
||
"""
|
||
self.observations = []
|
||
|
||
for station_id, station in self.parser.stations.items():
|
||
# Messungen von dieser Station
|
||
measurements = self.parser.get_measurements_from_station(station_id)
|
||
|
||
for meas in measurements:
|
||
if meas.name and not meas.deleted:
|
||
# Richtung - OHNE Orientierungskorrektur (wird als Unbekannte geschätzt)
|
||
if meas.horizontal_circle is not None:
|
||
std = meas.hz_std_error if meas.hz_std_error else self.default_std_direction
|
||
|
||
self.observations.append(Observation(
|
||
obs_type='direction',
|
||
from_station=station.name,
|
||
to_point=meas.name,
|
||
value=meas.horizontal_circle, # Rohe Kreislesung
|
||
std_dev=std
|
||
))
|
||
|
||
# Strecke (Horizontalstrecke aus 3D-Messung)
|
||
if meas.edm_distance is not None and meas.vertical_circle is not None:
|
||
std = meas.dist_std_error if meas.dist_std_error else self.default_std_distance
|
||
|
||
# Prismenkonstante berücksichtigen
|
||
prism_const = 0.0
|
||
if meas.target_id in self.parser.targets:
|
||
prism_const = self.parser.targets[meas.target_id].prism_constant
|
||
|
||
# Schrägstrecke
|
||
slope_distance = meas.edm_distance + prism_const
|
||
|
||
# Horizontalstrecke aus Schrägstrecke und Zenitwinkel
|
||
zenith_rad = meas.vertical_circle * math.pi / 200.0
|
||
horizontal_distance = slope_distance * math.sin(zenith_rad)
|
||
|
||
self.observations.append(Observation(
|
||
obs_type='distance',
|
||
from_station=station.name,
|
||
to_point=meas.name,
|
||
value=horizontal_distance, # Horizontalstrecke
|
||
std_dev=std
|
||
))
|
||
|
||
# Zenitwinkel
|
||
if meas.vertical_circle is not None:
|
||
std = meas.vz_std_error if meas.vz_std_error else self.default_std_zenith
|
||
|
||
self.observations.append(Observation(
|
||
obs_type='zenith',
|
||
from_station=station.name,
|
||
to_point=meas.name,
|
||
value=meas.vertical_circle,
|
||
std_dev=std
|
||
))
|
||
|
||
return self.observations
|
||
|
||
def initialize_points(self):
|
||
"""Initialisiert Näherungskoordinaten aus JXL-Daten"""
|
||
self.points = {}
|
||
self.coords_before = {}
|
||
self.station_names = []
|
||
self.orientations = {}
|
||
|
||
# Alle aktiven Punkte
|
||
for name, point in self.parser.get_active_points().items():
|
||
x = point.east if point.east is not None else 0.0
|
||
y = point.north if point.north is not None else 0.0
|
||
z = point.elevation if point.elevation is not None else 0.0
|
||
|
||
self.points[name] = AdjustedPoint(
|
||
name=name,
|
||
x=x, y=y, z=z,
|
||
x_approx=x, y_approx=y, z_approx=z
|
||
)
|
||
self.coords_before[name] = (x, y, z)
|
||
|
||
# Stationen hinzufügen
|
||
for station_id, station in self.parser.stations.items():
|
||
if station.name not in self.points:
|
||
x = station.east if station.east is not None else 0.0
|
||
y = station.north if station.north is not None else 0.0
|
||
z = station.elevation if station.elevation is not None else 0.0
|
||
|
||
self.points[station.name] = AdjustedPoint(
|
||
name=station.name,
|
||
x=x, y=y, z=z,
|
||
x_approx=x, y_approx=y, z_approx=z
|
||
)
|
||
self.coords_before[station.name] = (x, y, z)
|
||
|
||
if station.name not in self.station_names:
|
||
self.station_names.append(station.name)
|
||
|
||
# Orientierungen aus bekannten Koordinaten und Beobachtungen berechnen
|
||
self._compute_initial_orientations()
|
||
|
||
def _compute_initial_orientations(self):
|
||
"""
|
||
Berechnet initiale Orientierungen aus den bekannten Koordinaten.
|
||
Dies ist wichtig, da die JXL-Koordinaten bereits fertig berechnet sind.
|
||
"""
|
||
for station_name in self.station_names:
|
||
orientations_for_station = []
|
||
|
||
for obs in self.observations:
|
||
if obs.obs_type == 'direction' and obs.from_station == station_name:
|
||
from_pt = self.points.get(station_name)
|
||
to_pt = self.points.get(obs.to_point)
|
||
|
||
if from_pt and to_pt:
|
||
dx = to_pt.x - from_pt.x
|
||
dy = to_pt.y - from_pt.y
|
||
dist = math.sqrt(dx**2 + dy**2)
|
||
|
||
if dist > 0.01: # Mindestentfernung
|
||
# Azimut aus Koordinaten (geodätisch: atan2(dx, dy))
|
||
azimuth = math.atan2(dx, dy) * 200.0 / math.pi
|
||
if azimuth < 0:
|
||
azimuth += 400.0
|
||
|
||
# Orientierung = Azimut - Richtung
|
||
orientation = azimuth - obs.value
|
||
# Normalisieren auf [0, 400] gon
|
||
while orientation < 0:
|
||
orientation += 400.0
|
||
while orientation >= 400:
|
||
orientation -= 400.0
|
||
|
||
orientations_for_station.append(orientation)
|
||
|
||
# Mittlere Orientierung verwenden (robust gegen Ausreißer)
|
||
if orientations_for_station:
|
||
# Kreismittel berechnen (für Winkel)
|
||
sin_sum = sum(math.sin(o * math.pi / 200) for o in orientations_for_station)
|
||
cos_sum = sum(math.cos(o * math.pi / 200) for o in orientations_for_station)
|
||
mean_orientation = math.atan2(sin_sum, cos_sum) * 200 / math.pi
|
||
if mean_orientation < 0:
|
||
mean_orientation += 400.0
|
||
self.orientations[station_name] = mean_orientation
|
||
else:
|
||
self.orientations[station_name] = 0.0
|
||
|
||
def set_fixed_point(self, point_name: str):
|
||
"""Setzt einen Punkt als Festpunkt"""
|
||
if point_name in self.points:
|
||
self.fixed_points.add(point_name)
|
||
self.points[point_name].is_fixed = True
|
||
|
||
def set_fixed_points_auto(self):
|
||
"""Setzt automatisch Festpunkte (Referenzpunkte)"""
|
||
# Referenzlinie als Festpunkte
|
||
ref_line = self.parser.get_reference_line()
|
||
if ref_line:
|
||
self.set_fixed_point(ref_line.start_point)
|
||
self.set_fixed_point(ref_line.end_point)
|
||
|
||
# Zusätzlich: Erste Station als Festpunkt falls keine Referenzlinie
|
||
if not self.fixed_points:
|
||
stations = list(self.parser.stations.values())
|
||
if stations:
|
||
first_station = min(stations, key=lambda s: s.timestamp)
|
||
self.set_fixed_point(first_station.name)
|
||
|
||
def check_consistency(self) -> dict:
|
||
"""
|
||
Prüft die Konsistenz zwischen Koordinaten und Beobachtungen.
|
||
Gibt einen Bericht über die Qualität der Daten zurück.
|
||
"""
|
||
if not self.observations:
|
||
self.extract_observations()
|
||
if not self.points:
|
||
self.initialize_points()
|
||
|
||
result = {
|
||
'consistent': True,
|
||
'orientation_spread': {},
|
||
'distance_residuals': [],
|
||
'issues': []
|
||
}
|
||
|
||
# Prüfe Orientierungen pro Station
|
||
for station_name in self.station_names:
|
||
orientations = []
|
||
|
||
for obs in self.observations:
|
||
if obs.obs_type == 'direction' and obs.from_station == station_name:
|
||
from_pt = self.points.get(station_name)
|
||
to_pt = self.points.get(obs.to_point)
|
||
|
||
if from_pt and to_pt:
|
||
dx = to_pt.x - from_pt.x
|
||
dy = to_pt.y - from_pt.y
|
||
dist = math.sqrt(dx**2 + dy**2)
|
||
|
||
if dist > 0.01:
|
||
azimuth = math.atan2(dx, dy) * 200.0 / math.pi
|
||
if azimuth < 0:
|
||
azimuth += 400.0
|
||
|
||
ori = azimuth - obs.value
|
||
while ori < 0:
|
||
ori += 400.0
|
||
while ori >= 400:
|
||
ori -= 400.0
|
||
|
||
orientations.append(ori)
|
||
|
||
if orientations:
|
||
spread = max(orientations) - min(orientations)
|
||
result['orientation_spread'][station_name] = {
|
||
'min': min(orientations),
|
||
'max': max(orientations),
|
||
'spread': spread
|
||
}
|
||
|
||
if spread > 1.0: # > 1 gon Spannweite
|
||
result['consistent'] = False
|
||
result['issues'].append(f"Station {station_name}: Orientierungsspannweite {spread:.2f} gon")
|
||
|
||
return result
|
||
|
||
def adjust(self, mode: str = "residuals_only") -> AdjustmentResult:
|
||
"""
|
||
Führt die Netzausgleichung durch.
|
||
|
||
mode:
|
||
"residuals_only" - Berechnet nur Residuen, keine Koordinatenänderung (Standard)
|
||
"full" - Vollständige Ausgleichung mit Koordinatenkorrektur
|
||
"""
|
||
if not self.observations:
|
||
self.extract_observations()
|
||
|
||
if not self.points:
|
||
self.initialize_points()
|
||
|
||
if not self.fixed_points:
|
||
self.set_fixed_points_auto()
|
||
|
||
# Unbekannte bestimmen (nur nicht-fixierte Punkte)
|
||
unknown_points = [name for name in self.points.keys()
|
||
if name not in self.fixed_points]
|
||
|
||
num_observations = len([o for o in self.observations
|
||
if o.obs_type in ['direction', 'distance']])
|
||
|
||
if mode == "residuals_only":
|
||
# Nur Residuenberechnung - Koordinaten bleiben unverändert
|
||
# Orientierungen wurden bereits in initialize_points berechnet
|
||
|
||
# Residuen berechnen
|
||
self._compute_residuals()
|
||
|
||
# Sigma-0 aus Residuen berechnen
|
||
dir_residuals = [o.residual for o in self.observations if o.obs_type == 'direction']
|
||
dist_residuals = [o.residual for o in self.observations if o.obs_type == 'distance']
|
||
|
||
# Einfache Schätzung von sigma-0
|
||
if dir_residuals:
|
||
rmse_dir = math.sqrt(sum(r**2 for r in dir_residuals) / len(dir_residuals))
|
||
else:
|
||
rmse_dir = 0
|
||
if dist_residuals:
|
||
rmse_dist = math.sqrt(sum(r**2 for r in dist_residuals) / len(dist_residuals))
|
||
else:
|
||
rmse_dist = 0
|
||
|
||
self.sigma_0_posteriori = max(rmse_dir * 1000, rmse_dist * 1000) # vereinfacht
|
||
|
||
# Ergebnis zusammenstellen
|
||
self.result = AdjustmentResult(
|
||
adjusted_points=self.points,
|
||
observations=self.observations,
|
||
sigma_0_priori=self.sigma_0_priori,
|
||
iterations=0,
|
||
converged=True,
|
||
num_points=len(self.points),
|
||
num_fixed_points=len(self.fixed_points),
|
||
num_observations=num_observations,
|
||
num_unknowns=0,
|
||
redundancy=num_observations
|
||
)
|
||
|
||
self._compute_global_statistics()
|
||
|
||
return self.result
|
||
|
||
# Vollständige Ausgleichung (mode == "full")
|
||
# Anzahl Unbekannte: 2 pro Punkt (X,Y) + 1 Orientierung pro Station
|
||
num_point_unknowns = len(unknown_points) * 2
|
||
num_orientation_unknowns = len(self.station_names)
|
||
num_unknowns = num_point_unknowns + num_orientation_unknowns
|
||
|
||
if num_unknowns == 0:
|
||
raise ValueError("Keine unbekannten Punkte!")
|
||
|
||
if num_observations < num_unknowns:
|
||
raise ValueError(f"Unterbestimmtes System: {num_observations} Beobachtungen, "
|
||
f"{num_unknowns} Unbekannte")
|
||
|
||
# Index-Mapping für Unbekannte
|
||
point_index = {name: i for i, name in enumerate(unknown_points)}
|
||
orientation_index = {name: i for i, name in enumerate(self.station_names)}
|
||
|
||
# Iterative Lösung
|
||
converged = False
|
||
iteration = 0
|
||
|
||
while not converged and iteration < self.max_iterations:
|
||
iteration += 1
|
||
|
||
A, l, P = self._build_normal_equations_with_orientation(
|
||
point_index, orientation_index, num_point_unknowns, num_unknowns)
|
||
|
||
N = A.T @ np.diag(P) @ A
|
||
n = A.T @ np.diag(P) @ l
|
||
|
||
try:
|
||
dx = np.linalg.solve(N, n)
|
||
except np.linalg.LinAlgError:
|
||
N += np.eye(num_unknowns) * 1e-10
|
||
dx = np.linalg.solve(N, n)
|
||
|
||
max_correction = 0.0
|
||
for name, idx in point_index.items():
|
||
i = idx * 2
|
||
self.points[name].dx = dx[i]
|
||
self.points[name].dy = dx[i + 1]
|
||
self.points[name].x += dx[i]
|
||
self.points[name].y += dx[i + 1]
|
||
|
||
max_correction = max(max_correction,
|
||
abs(dx[i]), abs(dx[i + 1]))
|
||
|
||
for name, idx in orientation_index.items():
|
||
i = num_point_unknowns + idx
|
||
self.orientations[name] += dx[i]
|
||
|
||
if max_correction < self.convergence_limit:
|
||
converged = True
|
||
|
||
self._compute_residuals()
|
||
self._compute_accuracy(point_index, num_point_unknowns)
|
||
|
||
self.result = AdjustmentResult(
|
||
adjusted_points=self.points,
|
||
observations=self.observations,
|
||
sigma_0_priori=self.sigma_0_priori,
|
||
iterations=iteration,
|
||
converged=converged,
|
||
num_points=len(self.points),
|
||
num_fixed_points=len(self.fixed_points),
|
||
num_observations=num_observations,
|
||
num_unknowns=num_unknowns,
|
||
redundancy=num_observations - num_unknowns
|
||
)
|
||
|
||
self._compute_global_statistics()
|
||
|
||
return self.result
|
||
|
||
def _build_normal_equations_with_orientation(self, point_index: Dict[str, int],
|
||
orientation_index: Dict[str, int],
|
||
num_point_unknowns: int,
|
||
num_unknowns: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
|
||
"""
|
||
Erstellt Designmatrix A und Beobachtungsvektor l
|
||
MIT Orientierungsunbekannten pro Station
|
||
|
||
Unbekannte: [x1, y1, x2, y2, ..., o1, o2, ...]
|
||
"""
|
||
|
||
# Nur Richtungen und Strecken für 2D-Ausgleichung
|
||
relevant_obs = [o for o in self.observations
|
||
if o.obs_type in ['direction', 'distance']]
|
||
|
||
n_obs = len(relevant_obs)
|
||
A = np.zeros((n_obs, num_unknowns))
|
||
l = np.zeros(n_obs)
|
||
P = np.zeros(n_obs)
|
||
|
||
for i, obs in enumerate(relevant_obs):
|
||
from_pt = self.points.get(obs.from_station)
|
||
to_pt = self.points.get(obs.to_point)
|
||
|
||
if from_pt is None or to_pt is None:
|
||
continue
|
||
|
||
dx = to_pt.x - from_pt.x
|
||
dy = to_pt.y - from_pt.y
|
||
dist = math.sqrt(dx**2 + dy**2)
|
||
|
||
if dist < 1e-10:
|
||
continue
|
||
|
||
from_idx = point_index.get(obs.from_station)
|
||
to_idx = point_index.get(obs.to_point)
|
||
|
||
if obs.obs_type == 'direction':
|
||
# Azimut aus Koordinaten berechnen
|
||
azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi
|
||
if azimuth_calc < 0:
|
||
azimuth_calc += 400.0
|
||
|
||
# Orientierung der Station
|
||
station_orientation = self.orientations.get(obs.from_station, 0.0)
|
||
|
||
# Berechnete Richtung = Azimut - Orientierung
|
||
direction_calc = azimuth_calc - station_orientation
|
||
while direction_calc < 0:
|
||
direction_calc += 400.0
|
||
while direction_calc >= 400:
|
||
direction_calc -= 400.0
|
||
|
||
# Partielle Ableitungen für Richtungen (in Gon)
|
||
rho = 200.0 / math.pi
|
||
factor = rho / (dist ** 2)
|
||
|
||
# Ableitungen nach Punktkoordinaten
|
||
if from_idx is not None:
|
||
A[i, from_idx * 2] = -dy * factor # dRichtung/dx_from
|
||
A[i, from_idx * 2 + 1] = dx * factor # dRichtung/dy_from
|
||
|
||
if to_idx is not None:
|
||
A[i, to_idx * 2] = dy * factor # dRichtung/dx_to
|
||
A[i, to_idx * 2 + 1] = -dx * factor # dRichtung/dy_to
|
||
|
||
# Ableitung nach Orientierung: dRichtung/do = -1
|
||
ori_idx = orientation_index.get(obs.from_station)
|
||
if ori_idx is not None:
|
||
A[i, num_point_unknowns + ori_idx] = -1.0
|
||
|
||
# Verkürzung l = beobachtet - berechnet
|
||
diff = obs.value - direction_calc
|
||
# Normalisierung auf [-200, 200] Gon
|
||
while diff > 200:
|
||
diff -= 400
|
||
while diff < -200:
|
||
diff += 400
|
||
l[i] = diff
|
||
|
||
elif obs.obs_type == 'distance':
|
||
# Partielle Ableitungen für Strecken
|
||
if from_idx is not None:
|
||
A[i, from_idx * 2] = -dx / dist
|
||
A[i, from_idx * 2 + 1] = -dy / dist
|
||
|
||
if to_idx is not None:
|
||
A[i, to_idx * 2] = dx / dist
|
||
A[i, to_idx * 2 + 1] = dy / dist
|
||
|
||
# Strecken sind unabhängig von der Orientierung
|
||
|
||
# Verkürzung
|
||
l[i] = obs.value - dist
|
||
|
||
# Gewicht
|
||
P[i] = obs.weight
|
||
|
||
return A, l, P
|
||
|
||
def _build_normal_equations(self, point_index: Dict[str, int],
|
||
num_unknowns: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
|
||
"""Erstellt Designmatrix A und Beobachtungsvektor l (alte Version ohne Orientierung)"""
|
||
|
||
# Nur Richtungen und Strecken für 2D-Ausgleichung
|
||
relevant_obs = [o for o in self.observations
|
||
if o.obs_type in ['direction', 'distance']]
|
||
|
||
n_obs = len(relevant_obs)
|
||
A = np.zeros((n_obs, num_unknowns))
|
||
l = np.zeros(n_obs)
|
||
P = np.zeros(n_obs)
|
||
|
||
for i, obs in enumerate(relevant_obs):
|
||
from_pt = self.points.get(obs.from_station)
|
||
to_pt = self.points.get(obs.to_point)
|
||
|
||
if from_pt is None or to_pt is None:
|
||
continue
|
||
|
||
dx = to_pt.x - from_pt.x
|
||
dy = to_pt.y - from_pt.y
|
||
dist = math.sqrt(dx**2 + dy**2)
|
||
|
||
if dist < 1e-10:
|
||
continue
|
||
|
||
from_idx = point_index.get(obs.from_station)
|
||
to_idx = point_index.get(obs.to_point)
|
||
|
||
if obs.obs_type == 'direction':
|
||
# Azimut berechnen
|
||
azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi
|
||
if azimuth_calc < 0:
|
||
azimuth_calc += 400.0
|
||
|
||
# Partielle Ableitungen (in Gon)
|
||
rho = 200.0 / math.pi
|
||
factor = rho / (dist ** 2)
|
||
|
||
if from_idx is not None:
|
||
A[i, from_idx * 2] = -dy * factor # dAz/dx_from
|
||
A[i, from_idx * 2 + 1] = dx * factor # dAz/dy_from
|
||
|
||
if to_idx is not None:
|
||
A[i, to_idx * 2] = dy * factor # dAz/dx_to
|
||
A[i, to_idx * 2 + 1] = -dx * factor # dAz/dy_to
|
||
|
||
# Verkürzung l = beobachtet - berechnet
|
||
diff = obs.value - azimuth_calc
|
||
# Normalisierung auf [-200, 200] Gon
|
||
while diff > 200:
|
||
diff -= 400
|
||
while diff < -200:
|
||
diff += 400
|
||
l[i] = diff
|
||
|
||
elif obs.obs_type == 'distance':
|
||
# Partielle Ableitungen
|
||
if from_idx is not None:
|
||
A[i, from_idx * 2] = -dx / dist
|
||
A[i, from_idx * 2 + 1] = -dy / dist
|
||
|
||
if to_idx is not None:
|
||
A[i, to_idx * 2] = dx / dist
|
||
A[i, to_idx * 2 + 1] = dy / dist
|
||
|
||
# Verkürzung
|
||
l[i] = obs.value - dist
|
||
|
||
# Gewicht
|
||
P[i] = obs.weight
|
||
|
||
return A, l, P
|
||
|
||
def _compute_residuals(self):
|
||
"""Berechnet Verbesserungen (Residuen) für alle Beobachtungen"""
|
||
for obs in self.observations:
|
||
from_pt = self.points.get(obs.from_station)
|
||
to_pt = self.points.get(obs.to_point)
|
||
|
||
if from_pt is None or to_pt is None:
|
||
continue
|
||
|
||
dx = to_pt.x - from_pt.x
|
||
dy = to_pt.y - from_pt.y
|
||
dz = to_pt.z - from_pt.z
|
||
dist_2d = math.sqrt(dx**2 + dy**2)
|
||
dist_3d = math.sqrt(dx**2 + dy**2 + dz**2)
|
||
|
||
if obs.obs_type == 'direction':
|
||
azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi
|
||
if azimuth_calc < 0:
|
||
azimuth_calc += 400.0
|
||
|
||
# Berücksichtige Orientierung
|
||
station_orientation = self.orientations.get(obs.from_station, 0.0)
|
||
direction_calc = azimuth_calc - station_orientation
|
||
while direction_calc < 0:
|
||
direction_calc += 400.0
|
||
while direction_calc >= 400:
|
||
direction_calc -= 400.0
|
||
|
||
residual = obs.value - direction_calc
|
||
while residual > 200:
|
||
residual -= 400
|
||
while residual < -200:
|
||
residual += 400
|
||
obs.residual = residual
|
||
|
||
elif obs.obs_type == 'distance':
|
||
obs.residual = obs.value - dist_2d # 2D Strecke verwenden
|
||
|
||
elif obs.obs_type == 'zenith':
|
||
if dist_2d > 0:
|
||
zenith_calc = math.atan2(dist_2d, dz) * 200.0 / math.pi
|
||
obs.residual = obs.value - zenith_calc
|
||
|
||
def _compute_accuracy(self, point_index: Dict[str, int], num_unknowns: int):
|
||
"""Berechnet Genauigkeitsmaße für ausgeglichene Punkte"""
|
||
# Vereinfachte Berechnung der Standardabweichungen
|
||
# Vollständige Varianzfortpflanzung würde Inverse von N erfordern
|
||
|
||
# Erstelle Normalgleichungsmatrix erneut
|
||
A, l, P = self._build_normal_equations(point_index, num_unknowns)
|
||
N = A.T @ np.diag(P) @ A
|
||
|
||
try:
|
||
Q = np.linalg.inv(N) # Kofaktormatrix
|
||
except np.linalg.LinAlgError:
|
||
Q = np.eye(num_unknowns) * 0.001
|
||
|
||
# A-posteriori Varianzfaktor
|
||
v = A @ np.zeros(num_unknowns) - l # Residuen (vereinfacht)
|
||
redundancy = len(l) - num_unknowns
|
||
|
||
if redundancy > 0:
|
||
sum_pvv = np.sum(P * v**2)
|
||
sigma_0_post = math.sqrt(sum_pvv / redundancy)
|
||
else:
|
||
sigma_0_post = self.sigma_0_priori
|
||
|
||
self.sigma_0_posteriori = sigma_0_post
|
||
|
||
# Punktgenauigkeiten
|
||
for name, idx in point_index.items():
|
||
i = idx * 2
|
||
|
||
# Standardabweichungen
|
||
self.points[name].std_x = sigma_0_post * math.sqrt(abs(Q[i, i]))
|
||
self.points[name].std_y = sigma_0_post * math.sqrt(abs(Q[i + 1, i + 1]))
|
||
|
||
# Punktlagegenauigkeit
|
||
self.points[name].std_position = math.sqrt(
|
||
self.points[name].std_x**2 + self.points[name].std_y**2
|
||
)
|
||
|
||
# Konfidenzellipse (vereinfacht)
|
||
# Vollständige Berechnung würde Eigenwertanalyse erfordern
|
||
cov_xy = Q[i, i + 1] if i + 1 < num_unknowns else 0
|
||
var_x = Q[i, i]
|
||
var_y = Q[i + 1, i + 1]
|
||
|
||
# Eigenwerte für Ellipse
|
||
trace = var_x + var_y
|
||
det = var_x * var_y - cov_xy**2
|
||
discriminant = trace**2 / 4 - det
|
||
|
||
if discriminant >= 0:
|
||
sqrt_disc = math.sqrt(discriminant)
|
||
lambda1 = trace / 2 + sqrt_disc
|
||
lambda2 = trace / 2 - sqrt_disc
|
||
|
||
self.points[name].semi_major = sigma_0_post * math.sqrt(max(lambda1, 0))
|
||
self.points[name].semi_minor = sigma_0_post * math.sqrt(max(lambda2, 0))
|
||
|
||
if abs(var_x - var_y) > 1e-10:
|
||
self.points[name].orientation = 0.5 * math.atan2(2 * cov_xy, var_x - var_y) * 200 / math.pi
|
||
|
||
def _compute_global_statistics(self):
|
||
"""Berechnet globale Statistiken"""
|
||
if self.result is None:
|
||
return
|
||
|
||
# RMSE für verschiedene Beobachtungstypen
|
||
dir_residuals = [o.residual for o in self.observations if o.obs_type == 'direction']
|
||
dist_residuals = [o.residual for o in self.observations if o.obs_type == 'distance']
|
||
zen_residuals = [o.residual for o in self.observations if o.obs_type == 'zenith']
|
||
|
||
if dir_residuals:
|
||
self.result.rmse_directions = math.sqrt(sum(r**2 for r in dir_residuals) / len(dir_residuals)) * 1000 # mgon
|
||
|
||
if dist_residuals:
|
||
self.result.rmse_distances = math.sqrt(sum(r**2 for r in dist_residuals) / len(dist_residuals)) * 1000 # mm
|
||
|
||
if zen_residuals:
|
||
self.result.rmse_zenith = math.sqrt(sum(r**2 for r in zen_residuals) / len(zen_residuals)) * 1000 # mgon
|
||
|
||
self.result.sigma_0_posteriori = self.sigma_0_posteriori
|
||
|
||
# Chi-Quadrat-Test
|
||
if self.result.redundancy > 0:
|
||
self.result.chi_square = (self.sigma_0_posteriori / self.sigma_0_priori) ** 2 * self.result.redundancy
|
||
|
||
def get_adjustment_report(self) -> str:
|
||
"""Erstellt einen detaillierten Ausgleichungsbericht"""
|
||
if self.result is None:
|
||
return "Keine Ausgleichung durchgeführt."
|
||
|
||
lines = []
|
||
lines.append("=" * 80)
|
||
lines.append("NETZAUSGLEICHUNG - ERGEBNISBERICHT")
|
||
lines.append("=" * 80)
|
||
lines.append("")
|
||
|
||
# Allgemeine Informationen
|
||
lines.append("ALLGEMEINE INFORMATIONEN")
|
||
lines.append("-" * 80)
|
||
lines.append(f"Job: {self.parser.job_name}")
|
||
lines.append(f"Anzahl Punkte: {self.result.num_points}")
|
||
lines.append(f" davon Festpunkte: {self.result.num_fixed_points}")
|
||
lines.append(f" davon Neupunkte: {self.result.num_points - self.result.num_fixed_points}")
|
||
lines.append(f"Anzahl Beobachtungen: {self.result.num_observations}")
|
||
lines.append(f"Anzahl Unbekannte: {self.result.num_unknowns}")
|
||
lines.append(f"Redundanz: {self.result.redundancy}")
|
||
lines.append(f"Iterationen: {self.result.iterations}")
|
||
lines.append(f"Konvergiert: {'Ja' if self.result.converged else 'Nein'}")
|
||
lines.append("")
|
||
|
||
# Qualitätsparameter
|
||
lines.append("GLOBALE QUALITÄTSPARAMETER")
|
||
lines.append("-" * 80)
|
||
lines.append(f"Sigma-0 a-priori: {self.sigma_0_priori:.4f}")
|
||
lines.append(f"Sigma-0 a-posteriori: {self.result.sigma_0_posteriori:.4f}")
|
||
lines.append(f"Chi-Quadrat: {self.result.chi_square:.2f}")
|
||
lines.append(f"RMSE Richtungen: {self.result.rmse_directions:.2f} mgon")
|
||
lines.append(f"RMSE Strecken: {self.result.rmse_distances:.2f} mm")
|
||
lines.append(f"RMSE Zenitwinkel: {self.result.rmse_zenith:.2f} mgon")
|
||
lines.append("")
|
||
|
||
# Festpunkte
|
||
lines.append("FESTPUNKTE")
|
||
lines.append("-" * 80)
|
||
lines.append(f"{'Punkt':<12} {'X [m]':>14} {'Y [m]':>14} {'Z [m]':>12}")
|
||
lines.append("-" * 80)
|
||
for name in self.fixed_points:
|
||
if name in self.points:
|
||
p = self.points[name]
|
||
lines.append(f"{name:<12} {p.x:>14.4f} {p.y:>14.4f} {p.z:>12.4f}")
|
||
lines.append("")
|
||
|
||
# Ausgeglichene Koordinaten
|
||
lines.append("AUSGEGLICHENE KOORDINATEN (NEUPUNKTE)")
|
||
lines.append("-" * 80)
|
||
lines.append(f"{'Punkt':<12} {'X [m]':>14} {'Y [m]':>14} {'Z [m]':>12} {'σX [mm]':>10} {'σY [mm]':>10} {'σPos [mm]':>10}")
|
||
lines.append("-" * 80)
|
||
|
||
for name, p in sorted(self.points.items()):
|
||
if name not in self.fixed_points:
|
||
lines.append(f"{name:<12} {p.x:>14.4f} {p.y:>14.4f} {p.z:>12.4f} "
|
||
f"{p.std_x*1000:>10.2f} {p.std_y*1000:>10.2f} {p.std_position*1000:>10.2f}")
|
||
lines.append("")
|
||
|
||
# Beobachtungen und Residuen
|
||
lines.append("BEOBACHTUNGEN UND VERBESSERUNGEN")
|
||
lines.append("-" * 80)
|
||
lines.append(f"{'Von':<10} {'Nach':<10} {'Typ':<10} {'Messwert':>14} {'Residuum':>12}")
|
||
lines.append("-" * 80)
|
||
|
||
for obs in sorted(self.observations, key=lambda x: (x.from_station, x.to_point)):
|
||
if obs.obs_type == 'direction':
|
||
unit = "gon"
|
||
res_str = f"{obs.residual*1000:.2f} mgon"
|
||
elif obs.obs_type == 'distance':
|
||
unit = "m"
|
||
res_str = f"{obs.residual*1000:.2f} mm"
|
||
elif obs.obs_type == 'zenith':
|
||
unit = "gon"
|
||
res_str = f"{obs.residual*1000:.2f} mgon"
|
||
else:
|
||
unit = ""
|
||
res_str = f"{obs.residual:.4f}"
|
||
|
||
lines.append(f"{obs.from_station:<10} {obs.to_point:<10} {obs.obs_type:<10} "
|
||
f"{obs.value:>12.4f} {unit:<2} {res_str:>12}")
|
||
|
||
lines.append("")
|
||
lines.append("=" * 80)
|
||
|
||
return "\n".join(lines)
|
||
|
||
def export_adjusted_points(self, filepath: str, format: str = 'csv'):
|
||
"""Exportiert ausgeglichene Punkte"""
|
||
if format == 'csv':
|
||
lines = ["Punkt;X;Y;Z;Sigma_X;Sigma_Y;Sigma_Pos;Festpunkt"]
|
||
for name, p in sorted(self.points.items()):
|
||
fixed = "Ja" if p.is_fixed else "Nein"
|
||
lines.append(f"{name};{p.x:.4f};{p.y:.4f};{p.z:.4f};"
|
||
f"{p.std_x*1000:.2f};{p.std_y*1000:.2f};{p.std_position*1000:.2f};{fixed}")
|
||
else:
|
||
lines = []
|
||
for name, p in sorted(self.points.items()):
|
||
lines.append(f"{name}\t{p.x:.4f}\t{p.y:.4f}\t{p.z:.4f}")
|
||
|
||
with open(filepath, 'w', encoding='utf-8') as f:
|
||
f.write("\n".join(lines))
|
||
|
||
def export_report(self, filepath: str):
|
||
"""Exportiert den vollständigen Bericht"""
|
||
report = self.get_adjustment_report()
|
||
with open(filepath, 'w', encoding='utf-8') as f:
|
||
f.write(report)
|
||
|
||
def get_point_comparison(self) -> List[Dict]:
|
||
"""
|
||
Erstellt Punktevergleich: Koordinaten vor und nach der Ausgleichung
|
||
Gibt Liste von Dicts zurück, sortiert nach größter Abweichung
|
||
"""
|
||
comparison = []
|
||
|
||
for name, pt in self.points.items():
|
||
if name in self.coords_before:
|
||
x_before, y_before, z_before = self.coords_before[name]
|
||
x_after, y_after, z_after = pt.x, pt.y, pt.z
|
||
|
||
dx = x_after - x_before
|
||
dy = y_after - y_before
|
||
dz = z_after - z_before
|
||
d3d = math.sqrt(dx**2 + dy**2 + dz**2)
|
||
d2d = math.sqrt(dx**2 + dy**2)
|
||
|
||
comparison.append({
|
||
'name': name,
|
||
'x_before': x_before,
|
||
'y_before': y_before,
|
||
'z_before': z_before,
|
||
'x_after': x_after,
|
||
'y_after': y_after,
|
||
'z_after': z_after,
|
||
'dx': dx,
|
||
'dy': dy,
|
||
'dz': dz,
|
||
'd2d': d2d,
|
||
'd3d': d3d,
|
||
'is_fixed': pt.is_fixed
|
||
})
|
||
|
||
# Sortiert nach größter 3D-Abweichung
|
||
comparison.sort(key=lambda x: x['d3d'], reverse=True)
|
||
return comparison
|
||
|
||
def get_comparison_report(self) -> str:
|
||
"""Erstellt einen Punktevergleichs-Bericht"""
|
||
comparison = self.get_point_comparison()
|
||
|
||
lines = []
|
||
lines.append("=" * 120)
|
||
lines.append("PUNKTEVERGLEICH: VORHER vs. NACHHER")
|
||
lines.append("=" * 120)
|
||
lines.append("")
|
||
lines.append(f"{'Punkt':<10} {'X_vorher':>12} {'Y_vorher':>12} {'Z_vorher':>10} | "
|
||
f"{'X_nachher':>12} {'Y_nachher':>12} {'Z_nachher':>10} | "
|
||
f"{'ΔX [mm]':>10} {'ΔY [mm]':>10} {'ΔZ [mm]':>10} {'Δ3D [mm]':>10}")
|
||
lines.append("-" * 120)
|
||
|
||
for c in comparison:
|
||
fixed_marker = "*" if c['is_fixed'] else " "
|
||
lines.append(f"{c['name']:<9}{fixed_marker} "
|
||
f"{c['x_before']:>12.4f} {c['y_before']:>12.4f} {c['z_before']:>10.4f} | "
|
||
f"{c['x_after']:>12.4f} {c['y_after']:>12.4f} {c['z_after']:>10.4f} | "
|
||
f"{c['dx']*1000:>10.2f} {c['dy']*1000:>10.2f} {c['dz']*1000:>10.2f} {c['d3d']*1000:>10.2f}")
|
||
|
||
lines.append("")
|
||
lines.append("* = Festpunkt (nicht ausgeglichen)")
|
||
lines.append("")
|
||
|
||
# Plausibilitätsprüfung
|
||
lines.append("=" * 80)
|
||
lines.append("PLAUSIBILITÄTSPRÜFUNG")
|
||
lines.append("=" * 80)
|
||
|
||
non_fixed = [c for c in comparison if not c['is_fixed']]
|
||
critical_errors = [c for c in non_fixed if c['d3d'] > 1.0]
|
||
warnings = [c for c in non_fixed if 0.1 < c['d3d'] <= 1.0]
|
||
|
||
if critical_errors:
|
||
lines.append("")
|
||
lines.append("🚨 KRITISCHE FEHLER (Abweichung > 1m):")
|
||
for c in critical_errors[:10]: # Max 10 anzeigen
|
||
lines.append(f" ❌ {c['name']}: {c['d3d']*1000:.1f} mm")
|
||
if len(critical_errors) > 10:
|
||
lines.append(f" ... und {len(critical_errors)-10} weitere")
|
||
|
||
if warnings:
|
||
lines.append("")
|
||
lines.append("⚠️ WARNUNGEN (Abweichung > 10cm):")
|
||
for c in warnings[:10]:
|
||
lines.append(f" ⚠ {c['name']}: {c['d3d']*1000:.1f} mm")
|
||
if len(warnings) > 10:
|
||
lines.append(f" ... und {len(warnings)-10} weitere")
|
||
|
||
# Statistik
|
||
if non_fixed:
|
||
d3d_values = [c['d3d'] for c in non_fixed]
|
||
dx_values = [abs(c['dx']) for c in non_fixed]
|
||
dy_values = [abs(c['dy']) for c in non_fixed]
|
||
|
||
lines.append("")
|
||
lines.append("### Statistik (nur Neupunkte):")
|
||
lines.append(f" Anzahl Punkte: {len(non_fixed)}")
|
||
lines.append(f" Min Δ3D: {min(d3d_values)*1000:.2f} mm")
|
||
lines.append(f" Max Δ3D: {max(d3d_values)*1000:.2f} mm")
|
||
lines.append(f" Mittel Δ3D: {sum(d3d_values)/len(d3d_values)*1000:.2f} mm")
|
||
lines.append(f" Max |ΔX|: {max(dx_values)*1000:.2f} mm")
|
||
lines.append(f" Max |ΔY|: {max(dy_values)*1000:.2f} mm")
|
||
|
||
if not critical_errors and not warnings:
|
||
lines.append("")
|
||
lines.append("✅ Alle Abweichungen im akzeptablen Bereich (< 10cm)")
|
||
|
||
return "\n".join(lines)
|
||
|
||
def export_comparison(self, filepath: str, format: str = 'csv'):
|
||
"""Exportiert den Punktevergleich"""
|
||
comparison = self.get_point_comparison()
|
||
|
||
if format == 'csv':
|
||
lines = ["Punkt;X_vorher;Y_vorher;Z_vorher;X_nachher;Y_nachher;Z_nachher;DX_mm;DY_mm;DZ_mm;D3D_mm;Festpunkt"]
|
||
for c in comparison:
|
||
fixed = "Ja" if c['is_fixed'] else "Nein"
|
||
lines.append(f"{c['name']};{c['x_before']:.4f};{c['y_before']:.4f};{c['z_before']:.4f};"
|
||
f"{c['x_after']:.4f};{c['y_after']:.4f};{c['z_after']:.4f};"
|
||
f"{c['dx']*1000:.2f};{c['dy']*1000:.2f};{c['dz']*1000:.2f};{c['d3d']*1000:.2f};{fixed}")
|
||
else:
|
||
lines = [self.get_comparison_report()]
|
||
|
||
with open(filepath, 'w', encoding='utf-8') as f:
|
||
f.write("\n".join(lines))
|