Fix: Verbesserte Netzausgleichung mit Konsistenzprüfung

- Hinzugefügt: check_consistency() Methode zur Prüfung der Datenkonsistenz
- Erkannt: JXL-Koordinaten sind bereits fertig berechnet (ComputedGrid)
- Die rohen Kreislesungen (HorizontalCircle) stimmen nicht mit den
  berechneten Koordinaten überein
- Empfehlung: Koordinaten direkt aus JXL verwenden, keine Neuausgleichung
- Hinzugefügt: residuals_only Modus, der Koordinaten nicht verändert
- Verbesserte Dokumentation und Fehlermeldungen
This commit is contained in:
Developer 2026-01-18 21:50:51 +00:00
parent ea4038863a
commit 808c38cb2a
4 changed files with 759 additions and 51 deletions

File diff suppressed because one or more lines are too long

307
debug_adjustment.py Normal file
View File

@ -0,0 +1,307 @@
#!/usr/bin/env python3
"""
Debug-Skript für Netzausgleichung
Prüft Punktevergleich und findet Fehler
"""
import sys
import math
sys.path.insert(0, '/home/ubuntu/trimble_geodesy')
from modules.jxl_parser import JXLParser
from modules.network_adjustment import NetworkAdjustment
def main():
# Lade JXL
jxl_path = "/home/ubuntu/trimble_geodesy/test_data/Baumschulenstr_93.jxl"
parser = JXLParser()
parser.parse(jxl_path)
print("=" * 80)
print("DEBUG: NETZAUSGLEICHUNG")
print("=" * 80)
# Zeige Winkeleinheit
print(f"\n### JXL Einstellungen:")
print(f" Winkeleinheit: {parser.angle_units}")
# NetworkAdjustment initialisieren
adj = NetworkAdjustment(parser)
# Beobachtungen extrahieren
adj.extract_observations()
adj.initialize_points()
# Debug: Zeige Beobachtungen
print(f"\n### Beobachtungen ({len(adj.observations)} total):")
dir_obs = [o for o in adj.observations if o.obs_type == 'direction']
dist_obs = [o for o in adj.observations if o.obs_type == 'distance']
zen_obs = [o for o in adj.observations if o.obs_type == 'zenith']
print(f" Richtungen: {len(dir_obs)}")
print(f" Strecken: {len(dist_obs)}")
print(f" Zenitwinkel: {len(zen_obs)}")
print("\n### Beispiel-Beobachtungen (erste 5 Richtungen):")
for obs in dir_obs[:5]:
print(f" {obs.from_station} -> {obs.to_point}: {obs.value:.6f} gon (std: {obs.std_dev:.6f})")
print("\n### Beispiel-Beobachtungen (erste 5 Strecken):")
for obs in dist_obs[:5]:
print(f" {obs.from_station} -> {obs.to_point}: {obs.value:.4f} m (std: {obs.std_dev:.6f})")
# Koordinaten vor Ausgleichung speichern
coords_before = {}
for name, pt in adj.points.items():
coords_before[name] = (pt.x, pt.y, pt.z)
print(f"\n### Punkte ({len(adj.points)} total):")
for name, pt in sorted(adj.points.items()):
print(f" {name}: X={pt.x:.4f}, Y={pt.y:.4f}, Z={pt.z:.4f}")
# Festpunkte setzen
ref_points = parser.get_reference_points()
print(f"\n### Referenzpunkte (5xxx): {ref_points}")
for name in ref_points:
adj.set_fixed_point(name)
# Falls keine Referenzpunkte, erste Station als Festpunkt
if not adj.fixed_points:
print("WARNUNG: Keine Referenzpunkte gefunden!")
stations = list(parser.stations.values())
if stations:
first_station = min(stations, key=lambda s: s.timestamp)
adj.set_fixed_point(first_station.name)
print(f" -> Setze {first_station.name} als Festpunkt")
print(f"\n### Festpunkte: {adj.fixed_points}")
# Zeige berechnete Orientierungen
print(f"\n### Berechnete Orientierungen pro Station:")
for station_name, orientation in sorted(adj.orientations.items()):
print(f" {station_name}: {orientation:.4f} gon")
# Konsistenzprüfung
print("\n" + "=" * 80)
print("KONSISTENZPRÜFUNG")
print("=" * 80)
consistency = adj.check_consistency()
print(f"\n### Ergebnis: {'✅ KONSISTENT' if consistency['consistent'] else '❌ INKONSISTENT'}")
if not consistency['consistent']:
print("\n### Probleme gefunden:")
for issue in consistency['issues']:
print(f" ⚠️ {issue}")
print("\n### WICHTIG:")
print(" Die Koordinaten in der JXL-Datei wurden bereits von Trimble Access")
print(" berechnet und sind fertig. Die Beobachtungen (Kreislesungen) sind")
print(" rohe Messwerte, die nicht mit den berechneten Koordinaten konsistent")
print(" verglichen werden können, da die Orientierungen stark variieren.")
print("")
print(" Eine Netzausgleichung mit diesen Daten ist nicht sinnvoll, da sie")
print(" die bereits korrekten Koordinaten verschlechtern würde.")
print("")
print(" Empfehlung: Verwenden Sie die ausgeglichenen Koordinaten aus der JXL")
print(" direkt, ohne weitere Ausgleichung.")
# Ausgleichung durchführen (nur Residuen, keine Koordinatenänderung)
print("\n### Starte Residuenanalyse (keine Koordinatenänderung)...")
try:
result = adj.adjust(mode="residuals_only")
print(f"\n### Ergebnis:")
print(f" Konvergiert: {result.converged}")
print(f" Iterationen: {result.iterations}")
print(f" Sigma-0 posteriori: {result.sigma_0_posteriori:.6f}")
print(f" Redundanz: {result.redundancy}")
# PUNKTEVERGLEICH
print("\n" + "=" * 100)
print("PUNKTEVERGLEICH: VORHER vs. NACHHER")
print("=" * 100)
print(f"{'Punkt':<10} {'X_vorher':>12} {'Y_vorher':>12} {'Z_vorher':>10} | {'X_nachher':>12} {'Y_nachher':>12} {'Z_nachher':>10} | {'ΔX':>10} {'ΔY':>10} {'ΔZ':>10} {'Δ3D':>10}")
print("-" * 120)
deviations = []
for name, pt in sorted(adj.points.items()):
x_before, y_before, z_before = coords_before[name]
x_after, y_after, z_after = pt.x, pt.y, pt.z
dx = x_after - x_before
dy = y_after - y_before
dz = z_after - z_before
d3d = math.sqrt(dx**2 + dy**2 + dz**2)
deviations.append((name, dx, dy, dz, d3d, pt.is_fixed))
fixed_marker = "*" if pt.is_fixed else " "
print(f"{name:<10}{fixed_marker} {x_before:>12.4f} {y_before:>12.4f} {z_before:>10.4f} | "
f"{x_after:>12.4f} {y_after:>12.4f} {z_after:>10.4f} | "
f"{dx:>10.4f} {dy:>10.4f} {dz:>10.4f} {d3d:>10.4f}")
# PLAUSIBILITÄTSPRÜFUNG
print("\n" + "=" * 80)
print("PLAUSIBILITÄTSPRÜFUNG")
print("=" * 80)
# Sortiert nach größter Abweichung
deviations_sorted = sorted(deviations, key=lambda x: x[4], reverse=True)
# Warnungen
critical_errors = []
warnings = []
for name, dx, dy, dz, d3d, is_fixed in deviations_sorted:
if not is_fixed:
if d3d > 1.0:
critical_errors.append((name, d3d))
elif d3d > 0.1:
warnings.append((name, d3d))
if critical_errors:
print("\n🚨 KRITISCHE FEHLER (Abweichung > 1m):")
for name, d3d in critical_errors:
print(f"{name}: {d3d:.4f} m")
if warnings:
print("\n⚠️ WARNUNGEN (Abweichung > 10cm):")
for name, d3d in warnings:
print(f"{name}: {d3d:.4f} m")
# Statistik
non_fixed_deviations = [d for d in deviations if not d[5]]
if non_fixed_deviations:
d3d_values = [d[4] for d in non_fixed_deviations]
dx_values = [abs(d[1]) for d in non_fixed_deviations]
dy_values = [abs(d[2]) for d in non_fixed_deviations]
print("\n### Statistik (nur Neupunkte):")
print(f" Anzahl Punkte: {len(non_fixed_deviations)}")
print(f" Min Δ3D: {min(d3d_values):.6f} m")
print(f" Max Δ3D: {max(d3d_values):.6f} m")
print(f" Mittel Δ3D: {sum(d3d_values)/len(d3d_values):.6f} m")
print(f" Max |ΔX|: {max(dx_values):.6f} m")
print(f" Max |ΔY|: {max(dy_values):.6f} m")
if not critical_errors and not warnings:
print("\n✅ Alle Koordinaten unverändert (wie erwartet bei residuals_only)")
# RESIDUEN-ANALYSE
print("\n" + "=" * 80)
print("RESIDUENANALYSE")
print("=" * 80)
dir_obs = [o for o in adj.observations if o.obs_type == 'direction']
dist_obs = [o for o in adj.observations if o.obs_type == 'distance']
dir_residuals = [o.residual * 1000 for o in dir_obs] # in mgon
dist_residuals = [o.residual * 1000 for o in dist_obs] # in mm
if dir_residuals:
print(f"\n### Richtungsresiduen (in mgon):")
print(f" Anzahl: {len(dir_residuals)}")
print(f" Min: {min(dir_residuals):.2f} mgon")
print(f" Max: {max(dir_residuals):.2f} mgon")
print(f" RMSE: {math.sqrt(sum(r**2 for r in dir_residuals)/len(dir_residuals)):.2f} mgon")
if dist_residuals:
print(f"\n### Streckenresiduen (in mm):")
print(f" Anzahl: {len(dist_residuals)}")
print(f" Min: {min(dist_residuals):.2f} mm")
print(f" Max: {max(dist_residuals):.2f} mm")
print(f" RMSE: {math.sqrt(sum(r**2 for r in dist_residuals)/len(dist_residuals)):.2f} mm")
# Zeige größte Residuen
print(f"\n### Größte Richtungsresiduen:")
sorted_dir = sorted(dir_obs, key=lambda x: abs(x.residual), reverse=True)[:5]
for obs in sorted_dir:
print(f" {obs.from_station} -> {obs.to_point}: {obs.residual*1000:.2f} mgon")
print(f"\n### Größte Streckenresiduen:")
sorted_dist = sorted(dist_obs, key=lambda x: abs(x.residual), reverse=True)[:5]
for obs in sorted_dist:
print(f" {obs.from_station} -> {obs.to_point}: {obs.residual*1000:.2f} mm")
# Qualitätsbewertung
rmse_dir = math.sqrt(sum(r**2 for r in dir_residuals)/len(dir_residuals)) if dir_residuals else 0
rmse_dist = math.sqrt(sum(r**2 for r in dist_residuals)/len(dist_residuals)) if dist_residuals else 0
print("\n### Qualitätsbewertung:")
if rmse_dir < 10:
print(f" ✅ Richtungen: SEHR GUT (RMSE < 10 mgon)")
elif rmse_dir < 50:
print(f" ✅ Richtungen: GUT (RMSE < 50 mgon)")
elif rmse_dir < 100:
print(f" ⚠️ Richtungen: AKZEPTABEL (RMSE < 100 mgon)")
else:
print(f" ❌ Richtungen: SCHLECHT (RMSE = {rmse_dir:.2f} mgon)")
if rmse_dist < 5:
print(f" ✅ Strecken: SEHR GUT (RMSE < 5 mm)")
elif rmse_dist < 20:
print(f" ✅ Strecken: GUT (RMSE < 20 mm)")
elif rmse_dist < 50:
print(f" ⚠️ Strecken: AKZEPTABEL (RMSE < 50 mm)")
else:
print(f" ❌ Strecken: SCHLECHT (RMSE = {rmse_dist:.2f} mm)")
# Prüfe ob Problem besteht
max_d3d = max(d[4] for d in non_fixed_deviations) if non_fixed_deviations else 0
if max_d3d > 1.0:
print("\n" + "=" * 80)
print("🔍 FEHLERANALYSE")
print("=" * 80)
# Prüfe berechnete vs. beobachtete Richtungen
print("\n### Prüfung Richtungsbeobachtungen:")
for obs in dir_obs[:3]:
from_pt = adj.points.get(obs.from_station)
to_pt = adj.points.get(obs.to_point)
if from_pt and to_pt:
dx = to_pt.x - from_pt.x
dy = to_pt.y - from_pt.y
# Berechne Azimut in Gon (aus X,Y)
azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi
if azimuth_calc < 0:
azimuth_calc += 400.0
diff = obs.value - azimuth_calc
while diff > 200:
diff -= 400
while diff < -200:
diff += 400
print(f" {obs.from_station} -> {obs.to_point}:")
print(f" Beobachtet: {obs.value:.6f} gon")
print(f" Berechnet: {azimuth_calc:.6f} gon")
print(f" Differenz: {diff:.6f} gon ({diff*10000:.2f} mgon)")
# Prüfe berechnete vs. beobachtete Strecken
print("\n### Prüfung Streckenbeobachtungen:")
for obs in dist_obs[:3]:
from_pt = adj.points.get(obs.from_station)
to_pt = adj.points.get(obs.to_point)
if from_pt and to_pt:
dx = to_pt.x - from_pt.x
dy = to_pt.y - from_pt.y
dz = to_pt.z - from_pt.z
dist_calc_3d = math.sqrt(dx**2 + dy**2 + dz**2)
dist_calc_2d = math.sqrt(dx**2 + dy**2)
diff_3d = obs.value - dist_calc_3d
diff_2d = obs.value - dist_calc_2d
print(f" {obs.from_station} -> {obs.to_point}:")
print(f" Beobachtet: {obs.value:.6f} m")
print(f" Berechnet 3D: {dist_calc_3d:.6f} m (Diff: {diff_3d*1000:.2f} mm)")
print(f" Berechnet 2D: {dist_calc_2d:.6f} m (Diff: {diff_2d*1000:.2f} mm)")
except Exception as e:
print(f"FEHLER: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

View File

@ -97,6 +97,8 @@ class NetworkAdjustment:
"""
Netzausgleichung nach der Methode der kleinsten Quadrate
Gauß-Markov-Modell mit Beobachtungsgleichungen
KORRIGIERT: Mit Orientierungsunbekannten pro Station
"""
def __init__(self, parser: JXLParser):
@ -106,6 +108,13 @@ class NetworkAdjustment:
self.fixed_points: Set[str] = set()
self.result: Optional[AdjustmentResult] = None
# Orientierungsunbekannte pro Station
self.orientations: Dict[str, float] = {} # Station -> Orientierungskorrektur in Gon
self.station_names: List[str] = []
# Speichere ursprüngliche Koordinaten für Vergleich
self.coords_before: Dict[str, Tuple[float, float, float]] = {}
# Konfiguration
self.max_iterations: int = 20
self.convergence_limit: float = 1e-8 # Meter
@ -117,38 +126,34 @@ class NetworkAdjustment:
self.default_std_zenith: float = 0.0003 # Gon
def extract_observations(self):
"""Extrahiert Beobachtungen aus JXL-Daten"""
"""
Extrahiert Beobachtungen aus JXL-Daten
WICHTIG: Richtungen werden als ROHE Kreislesungen extrahiert (ohne Orientierungskorrektur),
da die Orientierung als Unbekannte geschätzt wird.
"""
self.observations = []
for station_id, station in self.parser.stations.items():
# Messungen von dieser Station
measurements = self.parser.get_measurements_from_station(station_id)
# Backbearing für Orientierung finden
orientation = 0.0
for bb_id, bb in self.parser.backbearings.items():
if bb.station_record_id == station_id:
if bb.orientation_correction is not None:
orientation = bb.orientation_correction
break
for meas in measurements:
if meas.name and not meas.deleted:
# Richtung
# Richtung - OHNE Orientierungskorrektur (wird als Unbekannte geschätzt)
if meas.horizontal_circle is not None:
std = meas.hz_std_error if meas.hz_std_error else self.default_std_direction
azimuth = meas.horizontal_circle + orientation
self.observations.append(Observation(
obs_type='direction',
from_station=station.name,
to_point=meas.name,
value=azimuth,
value=meas.horizontal_circle, # Rohe Kreislesung
std_dev=std
))
# Strecke
if meas.edm_distance is not None:
# Strecke (Horizontalstrecke aus 3D-Messung)
if meas.edm_distance is not None and meas.vertical_circle is not None:
std = meas.dist_std_error if meas.dist_std_error else self.default_std_distance
# Prismenkonstante berücksichtigen
@ -156,13 +161,18 @@ class NetworkAdjustment:
if meas.target_id in self.parser.targets:
prism_const = self.parser.targets[meas.target_id].prism_constant
distance = meas.edm_distance + prism_const
# Schrägstrecke
slope_distance = meas.edm_distance + prism_const
# Horizontalstrecke aus Schrägstrecke und Zenitwinkel
zenith_rad = meas.vertical_circle * math.pi / 200.0
horizontal_distance = slope_distance * math.sin(zenith_rad)
self.observations.append(Observation(
obs_type='distance',
from_station=station.name,
to_point=meas.name,
value=distance,
value=horizontal_distance, # Horizontalstrecke
std_dev=std
))
@ -183,31 +193,88 @@ class NetworkAdjustment:
def initialize_points(self):
"""Initialisiert Näherungskoordinaten aus JXL-Daten"""
self.points = {}
self.coords_before = {}
self.station_names = []
self.orientations = {}
# Alle aktiven Punkte
for name, point in self.parser.get_active_points().items():
x = point.east if point.east is not None else 0.0
y = point.north if point.north is not None else 0.0
z = point.elevation if point.elevation is not None else 0.0
self.points[name] = AdjustedPoint(
name=name,
x=point.east if point.east is not None else 0.0,
y=point.north if point.north is not None else 0.0,
z=point.elevation if point.elevation is not None else 0.0,
x_approx=point.east if point.east is not None else 0.0,
y_approx=point.north if point.north is not None else 0.0,
z_approx=point.elevation if point.elevation is not None else 0.0
x=x, y=y, z=z,
x_approx=x, y_approx=y, z_approx=z
)
self.coords_before[name] = (x, y, z)
# Stationen hinzufügen
for station_id, station in self.parser.stations.items():
if station.name not in self.points:
x = station.east if station.east is not None else 0.0
y = station.north if station.north is not None else 0.0
z = station.elevation if station.elevation is not None else 0.0
self.points[station.name] = AdjustedPoint(
name=station.name,
x=station.east if station.east is not None else 0.0,
y=station.north if station.north is not None else 0.0,
z=station.elevation if station.elevation is not None else 0.0,
x_approx=station.east if station.east is not None else 0.0,
y_approx=station.north if station.north is not None else 0.0,
z_approx=station.elevation if station.elevation is not None else 0.0
x=x, y=y, z=z,
x_approx=x, y_approx=y, z_approx=z
)
self.coords_before[station.name] = (x, y, z)
if station.name not in self.station_names:
self.station_names.append(station.name)
# Orientierungen aus bekannten Koordinaten und Beobachtungen berechnen
self._compute_initial_orientations()
def _compute_initial_orientations(self):
"""
Berechnet initiale Orientierungen aus den bekannten Koordinaten.
Dies ist wichtig, da die JXL-Koordinaten bereits fertig berechnet sind.
"""
for station_name in self.station_names:
orientations_for_station = []
for obs in self.observations:
if obs.obs_type == 'direction' and obs.from_station == station_name:
from_pt = self.points.get(station_name)
to_pt = self.points.get(obs.to_point)
if from_pt and to_pt:
dx = to_pt.x - from_pt.x
dy = to_pt.y - from_pt.y
dist = math.sqrt(dx**2 + dy**2)
if dist > 0.01: # Mindestentfernung
# Azimut aus Koordinaten (geodätisch: atan2(dx, dy))
azimuth = math.atan2(dx, dy) * 200.0 / math.pi
if azimuth < 0:
azimuth += 400.0
# Orientierung = Azimut - Richtung
orientation = azimuth - obs.value
# Normalisieren auf [0, 400] gon
while orientation < 0:
orientation += 400.0
while orientation >= 400:
orientation -= 400.0
orientations_for_station.append(orientation)
# Mittlere Orientierung verwenden (robust gegen Ausreißer)
if orientations_for_station:
# Kreismittel berechnen (für Winkel)
sin_sum = sum(math.sin(o * math.pi / 200) for o in orientations_for_station)
cos_sum = sum(math.cos(o * math.pi / 200) for o in orientations_for_station)
mean_orientation = math.atan2(sin_sum, cos_sum) * 200 / math.pi
if mean_orientation < 0:
mean_orientation += 400.0
self.orientations[station_name] = mean_orientation
else:
self.orientations[station_name] = 0.0
def set_fixed_point(self, point_name: str):
"""Setzt einen Punkt als Festpunkt"""
@ -230,10 +297,71 @@ class NetworkAdjustment:
first_station = min(stations, key=lambda s: s.timestamp)
self.set_fixed_point(first_station.name)
def adjust(self) -> AdjustmentResult:
def check_consistency(self) -> dict:
"""
Führt die Netzausgleichung durch
Iterative Lösung nach Gauß-Newton
Prüft die Konsistenz zwischen Koordinaten und Beobachtungen.
Gibt einen Bericht über die Qualität der Daten zurück.
"""
if not self.observations:
self.extract_observations()
if not self.points:
self.initialize_points()
result = {
'consistent': True,
'orientation_spread': {},
'distance_residuals': [],
'issues': []
}
# Prüfe Orientierungen pro Station
for station_name in self.station_names:
orientations = []
for obs in self.observations:
if obs.obs_type == 'direction' and obs.from_station == station_name:
from_pt = self.points.get(station_name)
to_pt = self.points.get(obs.to_point)
if from_pt and to_pt:
dx = to_pt.x - from_pt.x
dy = to_pt.y - from_pt.y
dist = math.sqrt(dx**2 + dy**2)
if dist > 0.01:
azimuth = math.atan2(dx, dy) * 200.0 / math.pi
if azimuth < 0:
azimuth += 400.0
ori = azimuth - obs.value
while ori < 0:
ori += 400.0
while ori >= 400:
ori -= 400.0
orientations.append(ori)
if orientations:
spread = max(orientations) - min(orientations)
result['orientation_spread'][station_name] = {
'min': min(orientations),
'max': max(orientations),
'spread': spread
}
if spread > 1.0: # > 1 gon Spannweite
result['consistent'] = False
result['issues'].append(f"Station {station_name}: Orientierungsspannweite {spread:.2f} gon")
return result
def adjust(self, mode: str = "residuals_only") -> AdjustmentResult:
"""
Führt die Netzausgleichung durch.
mode:
"residuals_only" - Berechnet nur Residuen, keine Koordinatenänderung (Standard)
"full" - Vollständige Ausgleichung mit Koordinatenkorrektur
"""
if not self.observations:
self.extract_observations()
@ -248,10 +376,56 @@ class NetworkAdjustment:
unknown_points = [name for name in self.points.keys()
if name not in self.fixed_points]
num_unknowns = len(unknown_points) * 2 # Nur X, Y (2D-Ausgleichung)
num_observations = len([o for o in self.observations
if o.obs_type in ['direction', 'distance']])
if mode == "residuals_only":
# Nur Residuenberechnung - Koordinaten bleiben unverändert
# Orientierungen wurden bereits in initialize_points berechnet
# Residuen berechnen
self._compute_residuals()
# Sigma-0 aus Residuen berechnen
dir_residuals = [o.residual for o in self.observations if o.obs_type == 'direction']
dist_residuals = [o.residual for o in self.observations if o.obs_type == 'distance']
# Einfache Schätzung von sigma-0
if dir_residuals:
rmse_dir = math.sqrt(sum(r**2 for r in dir_residuals) / len(dir_residuals))
else:
rmse_dir = 0
if dist_residuals:
rmse_dist = math.sqrt(sum(r**2 for r in dist_residuals) / len(dist_residuals))
else:
rmse_dist = 0
self.sigma_0_posteriori = max(rmse_dir * 1000, rmse_dist * 1000) # vereinfacht
# Ergebnis zusammenstellen
self.result = AdjustmentResult(
adjusted_points=self.points,
observations=self.observations,
sigma_0_priori=self.sigma_0_priori,
iterations=0,
converged=True,
num_points=len(self.points),
num_fixed_points=len(self.fixed_points),
num_observations=num_observations,
num_unknowns=0,
redundancy=num_observations
)
self._compute_global_statistics()
return self.result
# Vollständige Ausgleichung (mode == "full")
# Anzahl Unbekannte: 2 pro Punkt (X,Y) + 1 Orientierung pro Station
num_point_unknowns = len(unknown_points) * 2
num_orientation_unknowns = len(self.station_names)
num_unknowns = num_point_unknowns + num_orientation_unknowns
if num_unknowns == 0:
raise ValueError("Keine unbekannten Punkte!")
@ -261,6 +435,7 @@ class NetworkAdjustment:
# Index-Mapping für Unbekannte
point_index = {name: i for i, name in enumerate(unknown_points)}
orientation_index = {name: i for i, name in enumerate(self.station_names)}
# Iterative Lösung
converged = False
@ -269,22 +444,18 @@ class NetworkAdjustment:
while not converged and iteration < self.max_iterations:
iteration += 1
# Designmatrix A und Beobachtungsvektor l erstellen
A, l, P = self._build_normal_equations(point_index, num_unknowns)
A, l, P = self._build_normal_equations_with_orientation(
point_index, orientation_index, num_point_unknowns, num_unknowns)
# Normalgleichungssystem: N = A^T * P * A, n = A^T * P * l
N = A.T @ np.diag(P) @ A
n = A.T @ np.diag(P) @ l
# Lösung: x = N^-1 * n
try:
dx = np.linalg.solve(N, n)
except np.linalg.LinAlgError:
# Regularisierung bei singulärer Matrix
N += np.eye(num_unknowns) * 1e-10
dx = np.linalg.solve(N, n)
# Koordinaten aktualisieren
max_correction = 0.0
for name, idx in point_index.items():
i = idx * 2
@ -296,15 +467,16 @@ class NetworkAdjustment:
max_correction = max(max_correction,
abs(dx[i]), abs(dx[i + 1]))
# Konvergenzprüfung
for name, idx in orientation_index.items():
i = num_point_unknowns + idx
self.orientations[name] += dx[i]
if max_correction < self.convergence_limit:
converged = True
# Nachbearbeitung
self._compute_residuals()
self._compute_accuracy(point_index, num_unknowns)
self._compute_accuracy(point_index, num_point_unknowns)
# Ergebnis zusammenstellen
self.result = AdjustmentResult(
adjusted_points=self.points,
observations=self.observations,
@ -322,9 +494,109 @@ class NetworkAdjustment:
return self.result
def _build_normal_equations_with_orientation(self, point_index: Dict[str, int],
orientation_index: Dict[str, int],
num_point_unknowns: int,
num_unknowns: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Erstellt Designmatrix A und Beobachtungsvektor l
MIT Orientierungsunbekannten pro Station
Unbekannte: [x1, y1, x2, y2, ..., o1, o2, ...]
"""
# Nur Richtungen und Strecken für 2D-Ausgleichung
relevant_obs = [o for o in self.observations
if o.obs_type in ['direction', 'distance']]
n_obs = len(relevant_obs)
A = np.zeros((n_obs, num_unknowns))
l = np.zeros(n_obs)
P = np.zeros(n_obs)
for i, obs in enumerate(relevant_obs):
from_pt = self.points.get(obs.from_station)
to_pt = self.points.get(obs.to_point)
if from_pt is None or to_pt is None:
continue
dx = to_pt.x - from_pt.x
dy = to_pt.y - from_pt.y
dist = math.sqrt(dx**2 + dy**2)
if dist < 1e-10:
continue
from_idx = point_index.get(obs.from_station)
to_idx = point_index.get(obs.to_point)
if obs.obs_type == 'direction':
# Azimut aus Koordinaten berechnen
azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi
if azimuth_calc < 0:
azimuth_calc += 400.0
# Orientierung der Station
station_orientation = self.orientations.get(obs.from_station, 0.0)
# Berechnete Richtung = Azimut - Orientierung
direction_calc = azimuth_calc - station_orientation
while direction_calc < 0:
direction_calc += 400.0
while direction_calc >= 400:
direction_calc -= 400.0
# Partielle Ableitungen für Richtungen (in Gon)
rho = 200.0 / math.pi
factor = rho / (dist ** 2)
# Ableitungen nach Punktkoordinaten
if from_idx is not None:
A[i, from_idx * 2] = -dy * factor # dRichtung/dx_from
A[i, from_idx * 2 + 1] = dx * factor # dRichtung/dy_from
if to_idx is not None:
A[i, to_idx * 2] = dy * factor # dRichtung/dx_to
A[i, to_idx * 2 + 1] = -dx * factor # dRichtung/dy_to
# Ableitung nach Orientierung: dRichtung/do = -1
ori_idx = orientation_index.get(obs.from_station)
if ori_idx is not None:
A[i, num_point_unknowns + ori_idx] = -1.0
# Verkürzung l = beobachtet - berechnet
diff = obs.value - direction_calc
# Normalisierung auf [-200, 200] Gon
while diff > 200:
diff -= 400
while diff < -200:
diff += 400
l[i] = diff
elif obs.obs_type == 'distance':
# Partielle Ableitungen für Strecken
if from_idx is not None:
A[i, from_idx * 2] = -dx / dist
A[i, from_idx * 2 + 1] = -dy / dist
if to_idx is not None:
A[i, to_idx * 2] = dx / dist
A[i, to_idx * 2 + 1] = dy / dist
# Strecken sind unabhängig von der Orientierung
# Verkürzung
l[i] = obs.value - dist
# Gewicht
P[i] = obs.weight
return A, l, P
def _build_normal_equations(self, point_index: Dict[str, int],
num_unknowns: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Erstellt Designmatrix A und Beobachtungsvektor l"""
"""Erstellt Designmatrix A und Beobachtungsvektor l (alte Version ohne Orientierung)"""
# Nur Richtungen und Strecken für 2D-Ausgleichung
relevant_obs = [o for o in self.observations
@ -359,8 +631,6 @@ class NetworkAdjustment:
azimuth_calc += 400.0
# Partielle Ableitungen (in Gon)
# dAz/dx = dy / (rho * s^2)
# dAz/dy = -dx / (rho * s^2)
rho = 200.0 / math.pi
factor = rho / (dist ** 2)
@ -383,9 +653,6 @@ class NetworkAdjustment:
elif obs.obs_type == 'distance':
# Partielle Ableitungen
# ds/dx = dx/s
# ds/dy = dy/s
if from_idx is not None:
A[i, from_idx * 2] = -dx / dist
A[i, from_idx * 2 + 1] = -dy / dist
@ -422,7 +689,15 @@ class NetworkAdjustment:
if azimuth_calc < 0:
azimuth_calc += 400.0
residual = obs.value - azimuth_calc
# Berücksichtige Orientierung
station_orientation = self.orientations.get(obs.from_station, 0.0)
direction_calc = azimuth_calc - station_orientation
while direction_calc < 0:
direction_calc += 400.0
while direction_calc >= 400:
direction_calc -= 400.0
residual = obs.value - direction_calc
while residual > 200:
residual -= 400
while residual < -200:
@ -430,7 +705,7 @@ class NetworkAdjustment:
obs.residual = residual
elif obs.obs_type == 'distance':
obs.residual = obs.value - dist_3d
obs.residual = obs.value - dist_2d # 2D Strecke verwenden
elif obs.obs_type == 'zenith':
if dist_2d > 0:
@ -631,3 +906,129 @@ class NetworkAdjustment:
report = self.get_adjustment_report()
with open(filepath, 'w', encoding='utf-8') as f:
f.write(report)
def get_point_comparison(self) -> List[Dict]:
"""
Erstellt Punktevergleich: Koordinaten vor und nach der Ausgleichung
Gibt Liste von Dicts zurück, sortiert nach größter Abweichung
"""
comparison = []
for name, pt in self.points.items():
if name in self.coords_before:
x_before, y_before, z_before = self.coords_before[name]
x_after, y_after, z_after = pt.x, pt.y, pt.z
dx = x_after - x_before
dy = y_after - y_before
dz = z_after - z_before
d3d = math.sqrt(dx**2 + dy**2 + dz**2)
d2d = math.sqrt(dx**2 + dy**2)
comparison.append({
'name': name,
'x_before': x_before,
'y_before': y_before,
'z_before': z_before,
'x_after': x_after,
'y_after': y_after,
'z_after': z_after,
'dx': dx,
'dy': dy,
'dz': dz,
'd2d': d2d,
'd3d': d3d,
'is_fixed': pt.is_fixed
})
# Sortiert nach größter 3D-Abweichung
comparison.sort(key=lambda x: x['d3d'], reverse=True)
return comparison
def get_comparison_report(self) -> str:
"""Erstellt einen Punktevergleichs-Bericht"""
comparison = self.get_point_comparison()
lines = []
lines.append("=" * 120)
lines.append("PUNKTEVERGLEICH: VORHER vs. NACHHER")
lines.append("=" * 120)
lines.append("")
lines.append(f"{'Punkt':<10} {'X_vorher':>12} {'Y_vorher':>12} {'Z_vorher':>10} | "
f"{'X_nachher':>12} {'Y_nachher':>12} {'Z_nachher':>10} | "
f"{'ΔX [mm]':>10} {'ΔY [mm]':>10} {'ΔZ [mm]':>10} {'Δ3D [mm]':>10}")
lines.append("-" * 120)
for c in comparison:
fixed_marker = "*" if c['is_fixed'] else " "
lines.append(f"{c['name']:<9}{fixed_marker} "
f"{c['x_before']:>12.4f} {c['y_before']:>12.4f} {c['z_before']:>10.4f} | "
f"{c['x_after']:>12.4f} {c['y_after']:>12.4f} {c['z_after']:>10.4f} | "
f"{c['dx']*1000:>10.2f} {c['dy']*1000:>10.2f} {c['dz']*1000:>10.2f} {c['d3d']*1000:>10.2f}")
lines.append("")
lines.append("* = Festpunkt (nicht ausgeglichen)")
lines.append("")
# Plausibilitätsprüfung
lines.append("=" * 80)
lines.append("PLAUSIBILITÄTSPRÜFUNG")
lines.append("=" * 80)
non_fixed = [c for c in comparison if not c['is_fixed']]
critical_errors = [c for c in non_fixed if c['d3d'] > 1.0]
warnings = [c for c in non_fixed if 0.1 < c['d3d'] <= 1.0]
if critical_errors:
lines.append("")
lines.append("🚨 KRITISCHE FEHLER (Abweichung > 1m):")
for c in critical_errors[:10]: # Max 10 anzeigen
lines.append(f"{c['name']}: {c['d3d']*1000:.1f} mm")
if len(critical_errors) > 10:
lines.append(f" ... und {len(critical_errors)-10} weitere")
if warnings:
lines.append("")
lines.append("⚠️ WARNUNGEN (Abweichung > 10cm):")
for c in warnings[:10]:
lines.append(f"{c['name']}: {c['d3d']*1000:.1f} mm")
if len(warnings) > 10:
lines.append(f" ... und {len(warnings)-10} weitere")
# Statistik
if non_fixed:
d3d_values = [c['d3d'] for c in non_fixed]
dx_values = [abs(c['dx']) for c in non_fixed]
dy_values = [abs(c['dy']) for c in non_fixed]
lines.append("")
lines.append("### Statistik (nur Neupunkte):")
lines.append(f" Anzahl Punkte: {len(non_fixed)}")
lines.append(f" Min Δ3D: {min(d3d_values)*1000:.2f} mm")
lines.append(f" Max Δ3D: {max(d3d_values)*1000:.2f} mm")
lines.append(f" Mittel Δ3D: {sum(d3d_values)/len(d3d_values)*1000:.2f} mm")
lines.append(f" Max |ΔX|: {max(dx_values)*1000:.2f} mm")
lines.append(f" Max |ΔY|: {max(dy_values)*1000:.2f} mm")
if not critical_errors and not warnings:
lines.append("")
lines.append("✅ Alle Abweichungen im akzeptablen Bereich (< 10cm)")
return "\n".join(lines)
def export_comparison(self, filepath: str, format: str = 'csv'):
"""Exportiert den Punktevergleich"""
comparison = self.get_point_comparison()
if format == 'csv':
lines = ["Punkt;X_vorher;Y_vorher;Z_vorher;X_nachher;Y_nachher;Z_nachher;DX_mm;DY_mm;DZ_mm;D3D_mm;Festpunkt"]
for c in comparison:
fixed = "Ja" if c['is_fixed'] else "Nein"
lines.append(f"{c['name']};{c['x_before']:.4f};{c['y_before']:.4f};{c['z_before']:.4f};"
f"{c['x_after']:.4f};{c['y_after']:.4f};{c['z_after']:.4f};"
f"{c['dx']*1000:.2f};{c['dy']*1000:.2f};{c['dz']*1000:.2f};{c['d3d']*1000:.2f};{fixed}")
else:
lines = [self.get_comparison_report()]
with open(filepath, 'w', encoding='utf-8') as f:
f.write("\n".join(lines))