385 lines
14 KiB
Python
385 lines
14 KiB
Python
"""
|
|
COR Generator Module - Generiert COR-Punktdateien aus JXL-Daten
|
|
Unterstützt Referenzlinien-Stationierung und freie Stationierung
|
|
"""
|
|
|
|
import math
|
|
from typing import Dict, List, Optional, Tuple
|
|
from dataclasses import dataclass
|
|
from .jxl_parser import JXLParser, Point, Station
|
|
|
|
|
|
@dataclass
class CORPoint:
    """Represent a single point in the COR format."""

    name: str
    x: float  # East
    y: float  # North
    z: float  # Elevation

    @staticmethod
    def _format_coord(value: float) -> str:
        """Format one coordinate with three decimals, writing zero as "0".

        The value is formatted first and only collapsed to "0" when it
        really rounds to zero.  Testing the raw value against 0.001 would
        print e.g. 0.0009 as "0" although it rounds to 0.001.
        """
        text = f"{value:.3f}"
        # "-0.000" appears for tiny negative values; treat it as zero too.
        return "0" if text in ("0.000", "-0.000") else text

    def to_cor_line(self) -> str:
        """Return the point formatted as one COR line.

        Layout: |Name |X | Y | Z |
        """
        x_str = self._format_coord(self.x)
        y_str = self._format_coord(self.y)
        z_str = self._format_coord(self.z)

        return f"|{self.name} |{x_str} | {y_str} | {z_str} |"
|
|
|
|
|
|
class CORGenerator:
    """Generate COR point lists from parsed JXL data and export them.

    The generator reads stations, measurements, points, targets and
    backbearings from the parser object and collects the resulting points
    in ``cor_points``.  The export methods serialise that list.
    """

    # Station type string that identifies a reference-line station setup.
    _REFLINE_STATION_TYPE = 'ReflineStationSetup'

    def __init__(self, parser: "JXLParser"):
        """Store the parser and start with an empty point list."""
        self.parser = parser
        self.cor_points: List["CORPoint"] = []

        # Transformation parameters (for a local system)
        self.origin_east: float = 0.0
        self.origin_north: float = 0.0
        self.origin_elev: float = 0.0
        self.rotation: float = 0.0  # in radians

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _or_zero(value: Optional[float]) -> float:
        """Return ``value``, or 0.0 when it is ``None``."""
        return 0.0 if value is None else value

    @staticmethod
    def _gon_to_rad(angle_gon: float) -> float:
        """Convert an angle from gon (400 gon = full circle) to radians."""
        return angle_gon * math.pi / 200.0

    def _stations_by_time(self) -> list:
        """Return the parser's ``(station_id, station)`` pairs sorted by timestamp."""
        return sorted(self.parser.stations.items(),
                      key=lambda item: item[1].timestamp)

    def _polar_offset(self, meas, orientation: float) -> Tuple[float, float, float]:
        """Return ``(d_east, d_north, d_height)`` of a polar measurement.

        NOTE(review): angles are converted with pi/200, i.e. they are
        treated as gon - confirm that the parser delivers gon.
        NOTE(review): the height difference uses only slope distance and
        zenith angle; instrument and target heights are not applied.
        """
        prism_const = 0.0
        if meas.target_id in self.parser.targets:
            # A target without a stored prism constant counts as 0 instead
            # of breaking the addition below with a TypeError.
            prism_const = self._or_zero(
                self.parser.targets[meas.target_id].prism_constant)

        # Slope distance corrected by the prism constant
        slope_dist = meas.edm_distance + prism_const

        # A missing zenith angle falls back to 100 gon (horizontal sight).
        zenith_gon = 100.0 if meas.vertical_circle is None else meas.vertical_circle
        zenith_rad = self._gon_to_rad(zenith_gon)

        horizontal_dist = slope_dist * math.sin(zenith_rad)
        d_height = slope_dist * math.cos(zenith_rad)

        # Azimuth = horizontal circle reading + orientation correction
        azimuth_rad = self._gon_to_rad(meas.horizontal_circle + orientation)

        d_east = horizontal_dist * math.sin(azimuth_rad)
        d_north = horizontal_dist * math.cos(azimuth_rad)
        return d_east, d_north, d_height

    @staticmethod
    def _write_text(output_path: str, content: str) -> str:
        """Write ``content`` to ``output_path`` as UTF-8 and return it."""
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(content)
        return content

    def _write_delimited(self, output_path: str, delimiter: str) -> str:
        """Write one ``Name<d>X<d>Y<d>Z`` line per point (4 decimals, no header)."""
        d = delimiter
        content = "\n".join(
            f"{cp.name}{d}{cp.x:.4f}{d}{cp.y:.4f}{d}{cp.z:.4f}"
            for cp in self.cor_points
        )
        return self._write_text(output_path, content)

    # ------------------------------------------------------------------
    # Coordinate computation
    # ------------------------------------------------------------------

    def compute_from_observations(self) -> List["CORPoint"]:
        """Compute coordinates from the raw observations.

        Stations are processed in timestamp order.  The first station of
        type 'ReflineStationSetup' is handled as a reference-line station;
        every other station is handled as a free station.

        Returns:
            The newly built ``cor_points`` list (also stored on ``self``).
        """
        self.cor_points = []
        computed_coords: Dict[str, Tuple[float, float, float]] = {}

        ref_line = self.parser.get_reference_line()

        refline_pending = True
        for station_id, station in self._stations_by_time():
            if refline_pending and station.station_type == self._REFLINE_STATION_TYPE:
                self._compute_refline_station(station_id, station, ref_line,
                                              computed_coords)
                refline_pending = False
            else:
                self._compute_free_station(station_id, station, computed_coords)

        # Build the COR points in the order the coordinates were determined.
        self.cor_points.extend(
            CORPoint(name=name, x=e, y=n, z=elev)
            for name, (e, n, elev) in computed_coords.items()
        )
        return self.cor_points

    def _compute_refline_station(self, station_id: str, station: "Station",
                                 ref_line, computed_coords: Dict):
        """Compute the points observed from a reference-line station.

        Args:
            station_id: Record id used to look up measurements and the
                backbearing of this station.
            station: Station record; only ``station.name`` is read here.
            ref_line: Reference line (may be falsy); its start point is
                placed at the local origin.
            computed_coords: Mapping ``name -> (east, north, elevation)``
                that is filled in place.  Existing measured points are
                never overwritten (first determination wins).
        """
        points = self.parser.points

        # The start point of the reference line is the origin (0, 0, 0).
        if ref_line:
            start_name = ref_line.start_point
            if start_name in points:
                computed_coords[start_name] = (0.0, 0.0, 0.0)

        measurements = self.parser.get_measurements_from_station(station_id)

        # Orientation correction from the backbearing of this station;
        # it is the same for every measurement, so determine it once.
        backbearing = next(
            (bb for bb in self.parser.backbearings.values()
             if bb.station_record_id == station_id),
            None,
        )
        orientation = 0.0
        if backbearing and backbearing.orientation_correction is not None:
            orientation = backbearing.orientation_correction

        # Station coordinates; missing components count as 0.
        base_e = base_n = base_h = 0.0
        if station.name in points:
            st_point = points[station.name]
            base_e = self._or_zero(st_point.east)
            base_n = self._or_zero(st_point.north)
            base_h = self._or_zero(st_point.elevation)
            computed_coords[station.name] = (base_e, base_n, base_h)

        for meas in measurements:
            if (not meas.name or meas.horizontal_circle is None
                    or meas.edm_distance is None):
                continue
            if meas.name in computed_coords:
                continue  # keep the first determination of a point

            if meas.north is not None and meas.east is not None:
                # Prefer the coordinates already computed in the JXL file.
                coords = (meas.east, meas.north, self._or_zero(meas.elevation))
            else:
                # Otherwise compute them polar from the station.
                d_east, d_north, d_height = self._polar_offset(meas, orientation)
                coords = (base_e + d_east, base_n + d_north, base_h + d_height)

            computed_coords[meas.name] = coords

    def _compute_free_station(self, station_id: str, station: "Station",
                              computed_coords: Dict):
        """Collect the points of a free station from the parser coordinates.

        The station itself is (re)written when the parser has east and
        north for it; measured points are only added when they carry east
        and north and are not present yet.
        """
        measurements = self.parser.get_measurements_from_station(station_id)

        if station.name in self.parser.points:
            st_point = self.parser.points[station.name]
            if st_point.east is not None and st_point.north is not None:
                computed_coords[station.name] = (
                    st_point.east,
                    st_point.north,
                    self._or_zero(st_point.elevation),
                )

        for meas in measurements:
            if not meas.name or meas.north is None or meas.east is None:
                continue
            if meas.name not in computed_coords:
                computed_coords[meas.name] = (
                    meas.east,
                    meas.north,
                    self._or_zero(meas.elevation),
                )

    def generate_from_computed_grid(self) -> List["CORPoint"]:
        """Build COR points directly from the coordinates stored in the JXL data.

        Order of the result: per station (sorted by timestamp) the
        reference-line start point (reference-line stations only) followed
        by that station's measured points, then all remaining active
        points of the parser.  Each name is emitted once.
        """
        self.cor_points = []
        seen_names = set()

        ref_line = self.parser.get_reference_line()

        for station_id, station in self._stations_by_time():
            if station.station_type == self._REFLINE_STATION_TYPE and ref_line:
                # Reference-line station: its start point is the local origin.
                start_name = ref_line.start_point
                if start_name not in seen_names:
                    self.cor_points.append(
                        CORPoint(name=start_name, x=0.0, y=0.0, z=0.0))
                    seen_names.add(start_name)

            # Points measured from this station
            for meas in self.parser.get_measurements_from_station(station_id):
                if not meas.name or meas.name in seen_names:
                    continue
                if meas.north is None or meas.east is None:
                    continue
                self.cor_points.append(CORPoint(
                    name=meas.name,
                    x=meas.east,
                    y=meas.north,
                    z=self._or_zero(meas.elevation),
                ))
                seen_names.add(meas.name)

        # Add all remaining active points that have plan coordinates.
        for name, point in self.parser.get_active_points().items():
            if name in seen_names or point.east is None or point.north is None:
                continue
            self.cor_points.append(CORPoint(
                name=name,
                x=point.east,
                y=point.north,
                z=self._or_zero(point.elevation),
            ))
            seen_names.add(name)

        return self.cor_points

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------

    def write_cor_file(self, output_path: str, include_header: bool = False) -> str:
        """Write the COR file and return the written text.

        Format: PointID,X,Y,Z (comma separated, NO header line).

        ``include_header`` is accepted for interface compatibility only;
        the format has no header line, so the flag has no effect.
        """
        return self._write_delimited(output_path, ",")

    def export_csv(self, output_path: str) -> str:
        """Export as CSV and return the written text.

        Format: PointID,X,Y,Z (comma separated, NO header line).
        """
        return self._write_delimited(output_path, ",")

    def export_txt(self, output_path: str, delimiter: str = "\t") -> str:
        """Export as TXT with a configurable delimiter and return the text."""
        return self._write_delimited(output_path, delimiter)

    def export_dxf(self, output_path: str) -> str:
        """Export as a simple DXF file (ENTITIES section only).

        Every point becomes a POINT on layer POINTS and a TEXT with the
        point name on layer NAMES, offset by 0.5 in X and Y.
        """
        lines = ["0", "SECTION", "2", "ENTITIES"]

        for cp in self.cor_points:
            # POINT entity
            lines.extend([
                "0", "POINT",
                "8", "POINTS",        # layer
                "10", f"{cp.x:.4f}",  # X
                "20", f"{cp.y:.4f}",  # Y
                "30", f"{cp.z:.4f}",  # Z
            ])
            # TEXT entity carrying the point name
            lines.extend([
                "0", "TEXT",
                "8", "NAMES",               # layer
                "10", f"{cp.x + 0.5:.4f}",  # X offset
                "20", f"{cp.y + 0.5:.4f}",  # Y offset
                "30", f"{cp.z:.4f}",        # Z
                "40", "0.5",                # text height
                "1", str(cp.name),          # text
            ])

        lines.extend(["0", "ENDSEC", "0", "EOF"])

        return self._write_text(output_path, "\n".join(lines))

    def get_statistics(self) -> str:
        """Return min, max and range per axis of the generated points as text."""
        if not self.cor_points:
            return "Keine Punkte generiert."

        stats = [f"Anzahl Punkte: {len(self.cor_points)}"]

        axes = (
            ("X (East):", [p.x for p in self.cor_points]),
            ("Y (North):", [p.y for p in self.cor_points]),
            ("Z (Elevation):", [p.z for p in self.cor_points]),
        )
        for title, values in axes:
            lowest, highest = min(values), max(values)
            stats.extend([
                "",
                title,
                f" Min: {lowest:.3f} m",
                f" Max: {highest:.3f} m",
                f" Spanne: {highest - lowest:.3f} m",
            ])

        return "\n".join(stats)
|