# trimble_geodesy/modules/network_adjustment.py

"""
Netzausgleichungs-Modul
Methode der kleinsten Quadrate für geodätische Netze
Basiert auf Beobachtungen aus JXL-Dateien
"""
import math
import numpy as np
from scipy import sparse
from scipy.sparse.linalg import spsolve
from typing import List, Dict, Tuple, Optional, Set
from dataclasses import dataclass, field
from .jxl_parser import JXLParser, Point, Station


@dataclass
class Observation:
    """Represents a single geodetic observation"""
    obs_type: str           # 'direction', 'distance', 'zenith', 'height_diff'
    from_station: str
    to_point: str
    value: float            # observed value (gon, metres, etc.)
    std_dev: float          # a-priori standard deviation
    residual: float = 0.0   # residual (correction)
    weight: float = 1.0

    def __post_init__(self):
        # Weight from the a-priori standard deviation: p = 1 / sigma^2
        if self.std_dev > 0:
            self.weight = 1.0 / (self.std_dev ** 2)
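
# Worked example of the weighting p = 1 / sigma^2 (hypothetical values):
#     Observation('distance', 'S1', 'P1', 100.000, std_dev=0.002).weight
#     -> 1 / 0.002**2 = 250000.0
# A less precise distance with sigma = 0.010 m would only receive p = 10000,
# so the more precise measurement dominates the adjustment.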
@dataclass
class AdjustedPoint:
    """Adjusted point with accuracy estimates"""
    name: str
    x: float                   # East
    y: float                   # North
    z: float                   # Elevation
    # A-priori (approximate) coordinates
    x_approx: float = 0.0
    y_approx: float = 0.0
    z_approx: float = 0.0
    # Corrections
    dx: float = 0.0
    dy: float = 0.0
    dz: float = 0.0
    # Standard deviations
    std_x: float = 0.0
    std_y: float = 0.0
    std_z: float = 0.0
    std_position: float = 0.0  # 2D point position accuracy
    # Confidence ellipse
    semi_major: float = 0.0
    semi_minor: float = 0.0
    orientation: float = 0.0   # in gon
    # Status
    is_fixed: bool = False
@dataclass
class AdjustmentResult:
    """Result of the network adjustment"""
    # Adjusted points
    adjusted_points: Dict[str, AdjustedPoint] = field(default_factory=dict)
    # Observations with residuals
    observations: List[Observation] = field(default_factory=list)
    # Global quality parameters
    sigma_0_priori: float = 1.0
    sigma_0_posteriori: float = 0.0
    chi_square: float = 0.0
    redundancy: int = 0
    # RMSE of the residuals
    rmse_directions: float = 0.0  # in mgon
    rmse_distances: float = 0.0   # in mm
    rmse_zenith: float = 0.0      # in mgon
    # Iteration info
    iterations: int = 0
    converged: bool = False
    # Statistics
    num_points: int = 0
    num_fixed_points: int = 0
    num_observations: int = 0
    num_unknowns: int = 0
class NetworkAdjustment:
    """
    Least-squares network adjustment
    Gauss-Markov model with observation equations
    """
    def __init__(self, parser: JXLParser):
        self.parser = parser
        self.observations: List[Observation] = []
        self.points: Dict[str, AdjustedPoint] = {}
        self.fixed_points: Set[str] = set()
        self.result: Optional[AdjustmentResult] = None
        # Configuration
        self.max_iterations: int = 20
        self.convergence_limit: float = 1e-8  # metres
        self.sigma_0_priori: float = 1.0
        # Default accuracies (used when not provided by the JXL file)
        self.default_std_direction: float = 0.0003  # gon (0.3 mgon)
        self.default_std_distance: float = 0.002    # metres
        self.default_std_zenith: float = 0.0003     # gon
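
        # The defaults above can be overridden after construction, e.g.
        # (hypothetical values for a less precise total station):
        #     adj = NetworkAdjustment(parser)
        #     adj.default_std_direction = 0.001   # 1 mgon
        #     adj.default_std_distance = 0.005    # 5 mm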
    def extract_observations(self):
        """Extracts observations from the JXL data"""
        self.observations = []
        for station_id, station in self.parser.stations.items():
            # Measurements taken from this station
            measurements = self.parser.get_measurements_from_station(station_id)
            # Find the backbearing for the orientation correction
            orientation = 0.0
            for bb_id, bb in self.parser.backbearings.items():
                if bb.station_record_id == station_id:
                    if bb.orientation_correction is not None:
                        orientation = bb.orientation_correction
                    break
            for meas in measurements:
                if meas.name and not meas.deleted:
                    # Direction
                    if meas.horizontal_circle is not None:
                        std = meas.hz_std_error if meas.hz_std_error else self.default_std_direction
                        azimuth = meas.horizontal_circle + orientation
                        self.observations.append(Observation(
                            obs_type='direction',
                            from_station=station.name,
                            to_point=meas.name,
                            value=azimuth,
                            std_dev=std
                        ))
                    # Distance
                    if meas.edm_distance is not None:
                        std = meas.dist_std_error if meas.dist_std_error else self.default_std_distance
                        # Apply the prism constant
                        prism_const = 0.0
                        if meas.target_id in self.parser.targets:
                            prism_const = self.parser.targets[meas.target_id].prism_constant
                        distance = meas.edm_distance + prism_const
                        self.observations.append(Observation(
                            obs_type='distance',
                            from_station=station.name,
                            to_point=meas.name,
                            value=distance,
                            std_dev=std
                        ))
                    # Zenith angle
                    if meas.vertical_circle is not None:
                        std = meas.vz_std_error if meas.vz_std_error else self.default_std_zenith
                        self.observations.append(Observation(
                            obs_type='zenith',
                            from_station=station.name,
                            to_point=meas.name,
                            value=meas.vertical_circle,
                            std_dev=std
                        ))
        return self.observations
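
    # Example with hypothetical readings: a horizontal circle reading of
    # 152.3140 gon combined with a backbearing orientation correction of
    # 47.6860 gon yields the direction observation
    #     value = 152.3140 + 47.6860 = 200.0000 gon,
    # i.e. raw circle readings are converted into (approximate) azimuths here,
    # so no per-station orientation unknown is carried in the adjustment.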
    def initialize_points(self):
        """Initializes approximate coordinates from the JXL data"""
        self.points = {}
        # All active points
        for name, point in self.parser.get_active_points().items():
            self.points[name] = AdjustedPoint(
                name=name,
                x=point.east if point.east is not None else 0.0,
                y=point.north if point.north is not None else 0.0,
                z=point.elevation if point.elevation is not None else 0.0,
                x_approx=point.east if point.east is not None else 0.0,
                y_approx=point.north if point.north is not None else 0.0,
                z_approx=point.elevation if point.elevation is not None else 0.0
            )
        # Add the stations
        for station_id, station in self.parser.stations.items():
            if station.name not in self.points:
                self.points[station.name] = AdjustedPoint(
                    name=station.name,
                    x=station.east if station.east is not None else 0.0,
                    y=station.north if station.north is not None else 0.0,
                    z=station.elevation if station.elevation is not None else 0.0,
                    x_approx=station.east if station.east is not None else 0.0,
                    y_approx=station.north if station.north is not None else 0.0,
                    z_approx=station.elevation if station.elevation is not None else 0.0
                )
    def set_fixed_point(self, point_name: str):
        """Marks a point as a fixed (control) point"""
        if point_name in self.points:
            self.fixed_points.add(point_name)
            self.points[point_name].is_fixed = True

    def set_fixed_points_auto(self):
        """Automatically selects fixed points (reference points)"""
        # Use the reference line end points as fixed points
        ref_line = self.parser.get_reference_line()
        if ref_line:
            self.set_fixed_point(ref_line.start_point)
            self.set_fixed_point(ref_line.end_point)
        # Fallback: the first station becomes the fixed point if there is no reference line
        if not self.fixed_points:
            stations = list(self.parser.stations.values())
            if stations:
                first_station = min(stations, key=lambda s: s.timestamp)
                self.set_fixed_point(first_station.name)
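
    # Datum definition sketch (hypothetical point names): instead of the
    # automatic choice above, fixed points can also be set manually before
    # calling adjust():
    #     adj.initialize_points()
    #     adj.set_fixed_point("MP01")
    #     adj.set_fixed_point("MP02")
    # Because directions enter the adjustment as oriented azimuths, a single
    # fixed point already removes the translational datum defect; additional
    # fixed points constrain the network more strongly.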
    def adjust(self) -> AdjustmentResult:
        """
        Runs the network adjustment
        Iterative Gauss-Newton solution
        """
        if not self.observations:
            self.extract_observations()
        if not self.points:
            self.initialize_points()
        if not self.fixed_points:
            self.set_fixed_points_auto()
        # Determine the unknowns (non-fixed points only)
        unknown_points = [name for name in self.points.keys()
                          if name not in self.fixed_points]
        num_unknowns = len(unknown_points) * 2  # only X, Y (2D adjustment)
        num_observations = len([o for o in self.observations
                                if o.obs_type in ['direction', 'distance']])
        if num_unknowns == 0:
            raise ValueError("Keine unbekannten Punkte!")
        if num_observations < num_unknowns:
            raise ValueError(f"Unterbestimmtes System: {num_observations} Beobachtungen, "
                             f"{num_unknowns} Unbekannte")
        # Index mapping for the unknowns
        point_index = {name: i for i, name in enumerate(unknown_points)}
        # Iterative solution
        converged = False
        iteration = 0
        while not converged and iteration < self.max_iterations:
            iteration += 1
            # Build design matrix A, reduced observation vector l and weights P
            A, l, P = self._build_normal_equations(point_index, num_unknowns)
            # Normal equations: N = A^T * P * A, n = A^T * P * l
            N = A.T @ np.diag(P) @ A
            n = A.T @ np.diag(P) @ l
            # Solution: dx = N^-1 * n
            try:
                dx = np.linalg.solve(N, n)
            except np.linalg.LinAlgError:
                # Regularize a singular normal matrix
                N += np.eye(num_unknowns) * 1e-10
                dx = np.linalg.solve(N, n)
            # Update the coordinates
            max_correction = 0.0
            for name, idx in point_index.items():
                i = idx * 2
                self.points[name].dx = dx[i]
                self.points[name].dy = dx[i + 1]
                self.points[name].x += dx[i]
                self.points[name].y += dx[i + 1]
                max_correction = max(max_correction,
                                     abs(dx[i]), abs(dx[i + 1]))
            # Convergence check
            if max_correction < self.convergence_limit:
                converged = True
        # Post-processing
        self._compute_residuals()
        self._compute_accuracy(point_index, num_unknowns)
        # Assemble the result
        self.result = AdjustmentResult(
            adjusted_points=self.points,
            observations=self.observations,
            sigma_0_priori=self.sigma_0_priori,
            iterations=iteration,
            converged=converged,
            num_points=len(self.points),
            num_fixed_points=len(self.fixed_points),
            num_observations=num_observations,
            num_unknowns=num_unknowns,
            redundancy=num_observations - num_unknowns
        )
        self._compute_global_statistics()
        return self.result
    def _build_normal_equations(self, point_index: Dict[str, int],
                                num_unknowns: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Builds the design matrix A, the reduced observation vector l and the weight vector P"""
        # Only directions and distances are used for the 2D adjustment
        relevant_obs = [o for o in self.observations
                        if o.obs_type in ['direction', 'distance']]
        n_obs = len(relevant_obs)
        A = np.zeros((n_obs, num_unknowns))
        l = np.zeros(n_obs)
        P = np.zeros(n_obs)
        for i, obs in enumerate(relevant_obs):
            from_pt = self.points.get(obs.from_station)
            to_pt = self.points.get(obs.to_point)
            if from_pt is None or to_pt is None:
                continue
            dx = to_pt.x - from_pt.x
            dy = to_pt.y - from_pt.y
            dist = math.sqrt(dx**2 + dy**2)
            if dist < 1e-10:
                continue
            from_idx = point_index.get(obs.from_station)
            to_idx = point_index.get(obs.to_point)
            if obs.obs_type == 'direction':
                # Compute the azimuth
                azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi
                if azimuth_calc < 0:
                    azimuth_calc += 400.0
                # Partial derivatives (in gon)
                # dAz/dx = rho * dy / s^2
                # dAz/dy = -rho * dx / s^2
                rho = 200.0 / math.pi
                factor = rho / (dist ** 2)
                if from_idx is not None:
                    A[i, from_idx * 2] = -dy * factor      # dAz/dx_from
                    A[i, from_idx * 2 + 1] = dx * factor   # dAz/dy_from
                if to_idx is not None:
                    A[i, to_idx * 2] = dy * factor         # dAz/dx_to
                    A[i, to_idx * 2 + 1] = -dx * factor    # dAz/dy_to
                # Reduced observation l = observed - computed
                diff = obs.value - azimuth_calc
                # Normalize to [-200, 200] gon
                while diff > 200:
                    diff -= 400
                while diff < -200:
                    diff += 400
                l[i] = diff
            elif obs.obs_type == 'distance':
                # Partial derivatives
                # ds/dx = dx/s
                # ds/dy = dy/s
                if from_idx is not None:
                    A[i, from_idx * 2] = -dx / dist
                    A[i, from_idx * 2 + 1] = -dy / dist
                if to_idx is not None:
                    A[i, to_idx * 2] = dx / dist
                    A[i, to_idx * 2 + 1] = dy / dist
                # Reduced observation
                l[i] = obs.value - dist
            # Weight
            P[i] = obs.weight
        return A, l, P
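
    # Worked example for the direction partials above (hypothetical geometry):
    # for dx = +100 m East and dy = 0 m North the computed azimuth is 100 gon
    # and s = 100 m, so with rho = 200/pi ≈ 63.662 the coefficients for the
    # target point become
    #     dAz/dx_to =  rho * dy / s^2 =  0.0
    #     dAz/dy_to = -rho * dx / s^2 ≈ -0.6366 gon per metre,
    # i.e. moving the target 1 mm north changes the azimuth by about -0.64 mgon.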
    def _compute_residuals(self):
        """Computes residuals (corrections) for all observations"""
        for obs in self.observations:
            from_pt = self.points.get(obs.from_station)
            to_pt = self.points.get(obs.to_point)
            if from_pt is None or to_pt is None:
                continue
            dx = to_pt.x - from_pt.x
            dy = to_pt.y - from_pt.y
            dz = to_pt.z - from_pt.z
            dist_2d = math.sqrt(dx**2 + dy**2)
            dist_3d = math.sqrt(dx**2 + dy**2 + dz**2)
            if obs.obs_type == 'direction':
                azimuth_calc = math.atan2(dx, dy) * 200.0 / math.pi
                if azimuth_calc < 0:
                    azimuth_calc += 400.0
                residual = obs.value - azimuth_calc
                while residual > 200:
                    residual -= 400
                while residual < -200:
                    residual += 400
                obs.residual = residual
            elif obs.obs_type == 'distance':
                # Distance residual against the 3D (slope) distance
                obs.residual = obs.value - dist_3d
            elif obs.obs_type == 'zenith':
                if dist_2d > 0:
                    zenith_calc = math.atan2(dist_2d, dz) * 200.0 / math.pi
                    obs.residual = obs.value - zenith_calc
    def _compute_accuracy(self, point_index: Dict[str, int], num_unknowns: int):
        """Computes accuracy measures for the adjusted points"""
        # Simplified computation of the standard deviations;
        # full variance propagation requires the inverse of N
        # Rebuild the normal equation matrix
        A, l, P = self._build_normal_equations(point_index, num_unknowns)
        N = A.T @ np.diag(P) @ A
        try:
            Q = np.linalg.inv(N)  # cofactor matrix
        except np.linalg.LinAlgError:
            Q = np.eye(num_unknowns) * 0.001
        # A-posteriori variance factor
        # After convergence dx ≈ 0, so the residuals reduce to v = A*dx - l ≈ -l
        v = -l
        redundancy = len(l) - num_unknowns
        if redundancy > 0:
            sum_pvv = np.sum(P * v**2)
            sigma_0_post = math.sqrt(sum_pvv / redundancy)
        else:
            sigma_0_post = self.sigma_0_priori
        self.sigma_0_posteriori = sigma_0_post
        # Point accuracies
        for name, idx in point_index.items():
            i = idx * 2
            # Standard deviations
            self.points[name].std_x = sigma_0_post * math.sqrt(abs(Q[i, i]))
            self.points[name].std_y = sigma_0_post * math.sqrt(abs(Q[i + 1, i + 1]))
            # Point position accuracy
            self.points[name].std_position = math.sqrt(
                self.points[name].std_x**2 + self.points[name].std_y**2
            )
            # Confidence ellipse from the 2x2 block of the cofactor matrix
            cov_xy = Q[i, i + 1] if i + 1 < num_unknowns else 0
            var_x = Q[i, i]
            var_y = Q[i + 1, i + 1]
            # Eigenvalues of the 2x2 block give the ellipse axes
            trace = var_x + var_y
            det = var_x * var_y - cov_xy**2
            discriminant = trace**2 / 4 - det
            if discriminant >= 0:
                sqrt_disc = math.sqrt(discriminant)
                lambda1 = trace / 2 + sqrt_disc
                lambda2 = trace / 2 - sqrt_disc
                self.points[name].semi_major = sigma_0_post * math.sqrt(max(lambda1, 0))
                self.points[name].semi_minor = sigma_0_post * math.sqrt(max(lambda2, 0))
                if abs(var_x - var_y) > 1e-10:
                    self.points[name].orientation = 0.5 * math.atan2(2 * cov_xy, var_x - var_y) * 200 / math.pi
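
    # Worked ellipse example (hypothetical cofactors, sigma_0_post = 1.0):
    # var_x = 4e-6, var_y = 1e-6, cov_xy = 0 gives trace = 5e-6, det = 4e-12,
    # discriminant = 2.25e-12, hence lambda1 = 4e-6 and lambda2 = 1e-6, so
    # semi_major = sqrt(4e-6) = 2.0 mm and semi_minor = sqrt(1e-6) = 1.0 mm.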
    def _compute_global_statistics(self):
        """Computes global statistics"""
        if self.result is None:
            return
        # RMSE per observation type
        dir_residuals = [o.residual for o in self.observations if o.obs_type == 'direction']
        dist_residuals = [o.residual for o in self.observations if o.obs_type == 'distance']
        zen_residuals = [o.residual for o in self.observations if o.obs_type == 'zenith']
        if dir_residuals:
            self.result.rmse_directions = math.sqrt(sum(r**2 for r in dir_residuals) / len(dir_residuals)) * 1000  # mgon
        if dist_residuals:
            self.result.rmse_distances = math.sqrt(sum(r**2 for r in dist_residuals) / len(dist_residuals)) * 1000  # mm
        if zen_residuals:
            self.result.rmse_zenith = math.sqrt(sum(r**2 for r in zen_residuals) / len(zen_residuals)) * 1000  # mgon
        self.result.sigma_0_posteriori = self.sigma_0_posteriori
        # Chi-square test statistic
        if self.result.redundancy > 0:
            self.result.chi_square = (self.sigma_0_posteriori / self.sigma_0_priori) ** 2 * self.result.redundancy
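
    # Worked chi-square example (hypothetical numbers): with redundancy r = 10,
    # sigma_0_priori = 1.0 and sigma_0_posteriori = 1.2 the test statistic is
    #     chi_square = (1.2 / 1.0)**2 * 10 = 14.4,
    # which would then be compared against the chi-square quantiles for
    # 10 degrees of freedom to judge whether the a-priori accuracies are realistic.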
    def get_adjustment_report(self) -> str:
        """Builds a detailed adjustment report"""
        if self.result is None:
            return "Keine Ausgleichung durchgeführt."
        lines = []
        lines.append("=" * 80)
        lines.append("NETZAUSGLEICHUNG - ERGEBNISBERICHT")
        lines.append("=" * 80)
        lines.append("")
        # General information
        lines.append("ALLGEMEINE INFORMATIONEN")
        lines.append("-" * 80)
        lines.append(f"Job: {self.parser.job_name}")
        lines.append(f"Anzahl Punkte: {self.result.num_points}")
        lines.append(f" davon Festpunkte: {self.result.num_fixed_points}")
        lines.append(f" davon Neupunkte: {self.result.num_points - self.result.num_fixed_points}")
        lines.append(f"Anzahl Beobachtungen: {self.result.num_observations}")
        lines.append(f"Anzahl Unbekannte: {self.result.num_unknowns}")
        lines.append(f"Redundanz: {self.result.redundancy}")
        lines.append(f"Iterationen: {self.result.iterations}")
        lines.append(f"Konvergiert: {'Ja' if self.result.converged else 'Nein'}")
        lines.append("")
        # Quality parameters
        lines.append("GLOBALE QUALITÄTSPARAMETER")
        lines.append("-" * 80)
        lines.append(f"Sigma-0 a-priori: {self.sigma_0_priori:.4f}")
        lines.append(f"Sigma-0 a-posteriori: {self.result.sigma_0_posteriori:.4f}")
        lines.append(f"Chi-Quadrat: {self.result.chi_square:.2f}")
        lines.append(f"RMSE Richtungen: {self.result.rmse_directions:.2f} mgon")
        lines.append(f"RMSE Strecken: {self.result.rmse_distances:.2f} mm")
        lines.append(f"RMSE Zenitwinkel: {self.result.rmse_zenith:.2f} mgon")
        lines.append("")
        # Fixed points
        lines.append("FESTPUNKTE")
        lines.append("-" * 80)
        lines.append(f"{'Punkt':<12} {'X [m]':>14} {'Y [m]':>14} {'Z [m]':>12}")
        lines.append("-" * 80)
        for name in self.fixed_points:
            if name in self.points:
                p = self.points[name]
                lines.append(f"{name:<12} {p.x:>14.4f} {p.y:>14.4f} {p.z:>12.4f}")
        lines.append("")
        # Adjusted coordinates
        lines.append("AUSGEGLICHENE KOORDINATEN (NEUPUNKTE)")
        lines.append("-" * 80)
        lines.append(f"{'Punkt':<12} {'X [m]':>14} {'Y [m]':>14} {'Z [m]':>12} {'σX [mm]':>10} {'σY [mm]':>10} {'σPos [mm]':>10}")
        lines.append("-" * 80)
        for name, p in sorted(self.points.items()):
            if name not in self.fixed_points:
                lines.append(f"{name:<12} {p.x:>14.4f} {p.y:>14.4f} {p.z:>12.4f} "
                             f"{p.std_x*1000:>10.2f} {p.std_y*1000:>10.2f} {p.std_position*1000:>10.2f}")
        lines.append("")
        # Observations and residuals
        lines.append("BEOBACHTUNGEN UND VERBESSERUNGEN")
        lines.append("-" * 80)
        lines.append(f"{'Von':<10} {'Nach':<10} {'Typ':<10} {'Messwert':>14} {'Residuum':>12}")
        lines.append("-" * 80)
        for obs in sorted(self.observations, key=lambda x: (x.from_station, x.to_point)):
            if obs.obs_type == 'direction':
                unit = "gon"
                res_str = f"{obs.residual*1000:.2f} mgon"
            elif obs.obs_type == 'distance':
                unit = "m"
                res_str = f"{obs.residual*1000:.2f} mm"
            elif obs.obs_type == 'zenith':
                unit = "gon"
                res_str = f"{obs.residual*1000:.2f} mgon"
            else:
                unit = ""
                res_str = f"{obs.residual:.4f}"
            lines.append(f"{obs.from_station:<10} {obs.to_point:<10} {obs.obs_type:<10} "
                         f"{obs.value:>12.4f} {unit:<2} {res_str:>12}")
        lines.append("")
        lines.append("=" * 80)
        return "\n".join(lines)
    def export_adjusted_points(self, filepath: str, format: str = 'csv'):
        """Exports the adjusted points"""
        if format == 'csv':
            lines = ["Punkt;X;Y;Z;Sigma_X;Sigma_Y;Sigma_Pos;Festpunkt"]
            for name, p in sorted(self.points.items()):
                fixed = "Ja" if p.is_fixed else "Nein"
                lines.append(f"{name};{p.x:.4f};{p.y:.4f};{p.z:.4f};"
                             f"{p.std_x*1000:.2f};{p.std_y*1000:.2f};{p.std_position*1000:.2f};{fixed}")
        else:
            # Fallback: simple tab-separated coordinate list
            lines = []
            for name, p in sorted(self.points.items()):
                lines.append(f"{name}\t{p.x:.4f}\t{p.y:.4f}\t{p.z:.4f}")
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write("\n".join(lines))
    def export_report(self, filepath: str):
        """Exports the full report to a text file"""
        report = self.get_adjustment_report()
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(report)
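

if __name__ == "__main__":
    # Minimal usage sketch, not part of the adjustment module itself.
    # Assumptions (not confirmed by this file): JXLParser can be constructed
    # from a .jxl file path and may expose a parse() method; the file and
    # output names below are placeholders. Adapt to the actual jxl_parser API.
    import sys

    jxl_path = sys.argv[1] if len(sys.argv) > 1 else "example_job.jxl"
    parser = JXLParser(jxl_path)
    if hasattr(parser, "parse"):
        parser.parse()

    adj = NetworkAdjustment(parser)
    adj.extract_observations()
    adj.initialize_points()
    adj.set_fixed_points_auto()   # or set_fixed_point(...) for a manual datum
    result = adj.adjust()

    print(adj.get_adjustment_report())
    adj.export_adjusted_points("adjusted_points.csv", format="csv")
    adj.export_report("adjustment_report.txt")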