Source code for ephemeris_tools.tracker

"""Moon tracker tool: PostScript plot and text table (port of tracker3_xxx.f)."""

from __future__ import annotations

import math
import os
import sys
from typing import TextIO, TypedDict, cast

import cspyce

from ephemeris_tools.constants import (
    EARTH_ID,
    EPHEM_DESCRIPTIONS_BY_PLANET,
    SPACECRAFT_IDS,
    SPACECRAFT_NAMES,
    spacecraft_code_to_id,
    spacecraft_name_to_code,
)
from ephemeris_tools.params import Observer, TrackerParams, parse_ring_spec
from ephemeris_tools.rendering.draw_tracker import (
    PLANET_GRAY,
    RING_DATA,
    draw_moon_tracks,
)
from ephemeris_tools.spice.common import get_state
from ephemeris_tools.spice.geometry import moon_tracker_offsets
from ephemeris_tools.spice.load import load_spacecraft, load_spice_files
from ephemeris_tools.spice.observer import set_observer_id, set_observer_location
from ephemeris_tools.time_utils import (
    day_sec_from_tai,
    hms_from_sec,
    interval_seconds,
    mjd_from_tai,
    parse_datetime,
    tai_from_day_sec,
    tdb_from_tai,
    ymd_from_day,
)
from ephemeris_tools.viewer import get_planet_config


class _RunTrackerKwargs(TypedDict, total=False):
    """Keyword arguments for run_tracker (legacy signature from TrackerParams)."""

    planet_num: int
    start_time: str
    stop_time: str
    interval: float
    time_unit: str
    viewpoint: str
    viewpoint_display: str | None
    moon_ids: list[int] | None
    ephem_version: int
    sc_trajectory: int
    xrange: float | None
    xscaled: bool
    title: str
    ring_options: list[int] | None
    observer_latitude: float | None
    observer_longitude: float | None
    observer_altitude: float | None
    output_ps: TextIO | None
    output_txt: TextIO | None


# Radians to arcsec for tracker plot (Earth observatories; RSPK_TrackMoons)
_RAD_TO_ARCSEC = 180.0 / math.pi * 3600.0
# Radians to degrees for spacecraft observers (RSPK_TrackMoonC)
_RAD_TO_DEG = 180.0 / math.pi


def _ring_options_to_flags(planet_num: int, ring_options: list[int], nrings: int) -> list[bool]:
    """Convert CGI ring option codes to ring visibility flags (tracker3_xxx.f).

    Parameters:
        planet_num: Planet number (5=Jupiter, 6=Saturn, etc.).
        ring_options: List of option codes (e.g. 61=Saturn main, 62=G+E).
        nrings: Number of ring entries in geometry.

    Returns:
        List of bool, one per ring; True = show.
    """
    required = 5 if planet_num == 6 else (3 if planet_num == 5 else 1)
    flags = [False] * max(nrings, required, 1)
    for opt in ring_options:
        if planet_num == 5:  # Jupiter: 51=Main, 52=Gossamer
            if opt == 51:
                flags[0] = True
            elif opt == 52:
                flags[1] = flags[2] = True
        elif planet_num == 6:  # Saturn: 61=Main, 62=G+E, 63=outer
            if opt == 61:
                flags[0] = True
            elif opt == 62:
                flags[1] = flags[2] = True
            elif opt == 63:
                flags[3] = flags[4] = True
        elif (planet_num == 7 and opt == 71) or (planet_num == 8 and opt == 81):  # Uranus Epsilon
            flags[0] = True
    return flags


def _tracker_call_kwargs_from_params(params: TrackerParams) -> _RunTrackerKwargs:
    """Convert a TrackerParams object to legacy run_tracker keyword arguments.

    Parameters:
        params: TrackerParams with planet_num, start_time, stop_time, interval,
            time_unit, observer, moon_ids, ring_names, xrange, xunit, etc.
            observer.name is used for viewpoint; if None, defaults to 'Earth'.
            viewpoint_display is passed through when set (lat/lon caption).

    Returns:
        _RunTrackerKwargs mapping with the same fields, suitable for run_tracker.

    Raises:
        None.
    """
    viewpoint = params.observer.name if params.observer.name is not None else 'Earth'
    viewpoint_display = params.viewpoint_display
    xunit = params.xunit.lower()
    ring_options = None
    if params.ring_names:
        ring_options = parse_ring_spec(params.planet_num, params.ring_names)
    return {
        'planet_num': params.planet_num,
        'start_time': params.start_time,
        'stop_time': params.stop_time,
        'interval': params.interval,
        'time_unit': params.time_unit,
        'viewpoint': viewpoint,
        'viewpoint_display': viewpoint_display,
        'moon_ids': params.moon_ids,
        'ephem_version': params.ephem_version,
        'sc_trajectory': params.sc_trajectory,
        'xrange': params.xrange,
        'xscaled': 'radii' in xunit,
        'title': params.title,
        'ring_options': ring_options,
        'observer_latitude': params.observer.latitude_deg,
        'observer_longitude': params.observer.longitude_deg,
        'observer_altitude': params.observer.altitude_m,
        'output_ps': params.output_ps,
        'output_txt': params.output_txt,
    }


[docs] def run_tracker(params: TrackerParams) -> None: """Generate moon tracker PostScript plot and optional text table (tracker3_xxx.f). Loads SPICE, computes moon offsets over the time range, draws plot and table. Parameters: params: Structured tracker inputs (planet, time range, interval, observer, moons, rings, output streams). Raises: ValueError: Invalid time range or planet. RuntimeError: SPICE load failure. """ kwargs = _tracker_call_kwargs_from_params(params) _run_tracker_impl(**kwargs)
def _run_tracker_impl( *, planet_num: int, start_time: str = '', stop_time: str = '', interval: float = 1.0, time_unit: str = 'hour', viewpoint: str = 'Earth', viewpoint_display: str | None = None, moon_ids: list[int] | None = None, ephem_version: int = 0, xrange: float | None = None, xscaled: bool = False, title: str = '', ring_options: list[int] | None = None, observer_latitude: float | None = None, observer_longitude: float | None = None, observer_altitude: float | None = None, sc_trajectory: int = 0, output_ps: TextIO | None = None, output_txt: TextIO | None = None, ) -> None: """Internal tracker implementation (flat kwargs from TrackerParams).""" if moon_ids is None: moon_ids = [] ok, reason = load_spice_files(planet_num, ephem_version) if not ok: raise RuntimeError(f'Failed to load SPICE kernels: {reason}') if observer_latitude is not None and observer_longitude is not None: set_observer_location( observer_latitude, observer_longitude, observer_altitude if observer_altitude is not None else 0.0, ) else: code = spacecraft_name_to_code(viewpoint) if code is not None: sc_id = spacecraft_code_to_id(code) if sc_id: load_spacecraft(sc_id, planet_num, sc_trajectory, set_obs=True) else: set_observer_id(EARTH_ID) else: set_observer_id(EARTH_ID) start_parsed = parse_datetime(start_time) stop_parsed = parse_datetime(stop_time) if start_parsed is None or stop_parsed is None: raise ValueError('Invalid start or stop time') day1, sec1 = start_parsed day2, sec2 = stop_parsed # Match FORTRAN: round start time to nearest minute (tracker3_xxx.f sec = 60*aint(sec/60+0.5)) sec1_rounded = 60.0 * int(sec1 / 60.0 + 0.5) if sec1_rounded >= 86400.0: day1, sec1_rounded = day1 + 1, 0.0 tai1 = tai_from_day_sec(day1, sec1_rounded) tai2 = tai_from_day_sec(day2, sec2) dsec = interval_seconds(interval, time_unit, min_seconds=60.0, round_to_minutes=True) ntimes = int((tai2 - tai1) / dsec) + 1 if ntimes < 2: raise ValueError('Time range too short or interval too large') if ntimes > 10000: if output_ps is not None: path = getattr(output_ps, 'name', None) try: output_ps.close() except OSError: pass if path and isinstance(path, (str, os.PathLike)) and os.path.isfile(path): try: os.remove(path) except OSError: pass print(' Error---Number of time steps exceeds limit of 10000', file=sys.stdout) sys.exit(1) cfg = get_planet_config(planet_num) if cfg is None: raise ValueError(f'Unknown planet number: {planet_num}') state = get_state() # Moon body IDs: preserve URL/CGI order when moon_ids provided (match FORTRAN). all_moon_ids = [m.id for m in cfg.moons if m.id != state.planet_id] if moon_ids: track_moon_ids = [tid for tid in moon_ids if tid in set(all_moon_ids)] else: track_moon_ids = list(all_moon_ids) id_to_name = {m.id: m.name for m in cfg.moons} moon_names = [id_to_name.get(tid, str(tid)) for tid in track_moon_ids] # FORTRAN uses degrees for spacecraft observers (RSPK_TrackMoonC), arcsec for # Earth/JWST/HST (RSPK_TrackMoons). obs_is_spacecraft = False obs_vp = (viewpoint or '').strip() if obs_vp.lower() not in ('', 'earth', 'observatory', 'latlon'): for sc_name in SPACECRAFT_NAMES: if obs_vp.lower() == sc_name.lower(): obs_is_spacecraft = True break if not obs_is_spacecraft: for sc_id in SPACECRAFT_IDS: if obs_vp.upper() == sc_id: obs_is_spacecraft = True break use_degrees = obs_is_spacecraft and 'JWST' not in obs_vp.upper() and 'HST' not in obs_vp.upper() rad_to_angle = _RAD_TO_DEG if use_degrees else _RAD_TO_ARCSEC # FORTRAN quirk (tracker3_xxx + rspk_trackmoons): moon geometry is sampled # on an evenly stretched grid from start->stop, while table timestamps use # the original fixed interval grid. sample_dt = (tai2 - tai1) / (ntimes - 1) if ntimes > 1 else 1.0 table_times_tai: list[float] = [] moon_offsets_arcsec: list[list[float]] = [[] for _ in track_moon_ids] limb_arcsec: list[float] = [] for i in range(ntimes): sample_tai = tai1 + i * sample_dt table_tai = tai1 + i * dsec et = tdb_from_tai(sample_tai) offsets_rad, limb_rad = moon_tracker_offsets(et, track_moon_ids) table_times_tai.append(table_tai) limb_arcsec.append(limb_rad * rad_to_angle) for j, o in enumerate(offsets_rad): moon_offsets_arcsec[j].append(o * rad_to_angle) # X-axis range: user xrange (arcsec or radii per xscaled) or from limb. explicit_xrange = xrange is not None and xrange > 0 if explicit_xrange: xrange_val = xrange else: xrange_val = max(limb_arcsec) * 2.0 if limb_arcsec else 100.0 xrange_val = max(xrange_val, 10.0) xscaled = False if not xscaled and not explicit_xrange: xrange_val = max(xrange_val if xrange_val is not None else 0.0, 10.0) radii = cspyce.bodvrd(str(state.planet_id), 'RADII') rplanet_km = radii[0] nrings, ring_rads_list, ring_grays_list = RING_DATA.get(planet_num, (0, [0.0], [0.75])) if ring_options: ring_flags = _ring_options_to_flags(planet_num, ring_options, nrings) else: ring_flags = [False] * max(nrings, 1) ring_rads_km = (ring_rads_list + [0.0] * 5)[:5] ring_grays = (ring_grays_list + [0.5] * 5)[:5] out_name = getattr(output_ps, 'name', None) if output_ps else None filename = str(out_name) if out_name else 'tracker.ps' ephem_caption = EPHEM_DESCRIPTIONS_BY_PLANET.get(planet_num, 'DE440') # FORTRAN caption text in current CGI behavior does not append empty # parentheses for non-spacecraft observers. viewpoint_caption = (viewpoint_display or viewpoint or 'Earth').strip() if not viewpoint_caption or viewpoint_caption.lower() in ('earth', 'observatory'): viewpoint_caption = "Earth's center" ncaptions = 2 lcaptions = ['Ephemeris:', 'Viewpoint:'] rcaptions = [ephem_caption, viewpoint_caption] # FORTRAN uses DOY format for spacecraft observers (not Earth, JWST, HST). use_doy_format = use_degrees if output_ps: draw_moon_tracks( output_ps, planet_num=planet_num, ntimes=ntimes, time1_tai=tai1, time2_tai=tai2, dt=sample_dt, xrange=xrange_val if xrange_val is not None else 100.0, xscaled=xscaled, moon_arcsec=moon_offsets_arcsec, limb_arcsec=limb_arcsec, moon_names=moon_names, nrings=nrings, ring_flags=ring_flags, ring_rads_km=ring_rads_km, ring_grays=ring_grays, planet_gray=PLANET_GRAY, rplanet_km=rplanet_km, title=title or '', ncaptions=ncaptions, lcaptions=lcaptions, rcaptions=rcaptions, align_loc=90.0 if use_doy_format else 180.0, filename=filename, use_doy_format=use_doy_format, ) if output_txt: header = ' mjd year mo dy hr mi limb' for name in moon_names[:25]: # FORTRAN: moon_names are uppercase (transformed in tracker3_xxx.f) uname = name.upper() header += ' ' + (uname[:9] if len(uname) >= 9 else uname + ' ' * (9 - len(uname))) output_txt.write(header.rstrip() + '\n') for irec in range(ntimes): tai = table_times_tai[irec] day, sec = day_sec_from_tai(tai) _, _, sec_frac = hms_from_sec(sec) if sec_frac >= 30.0: tai = tai + 30.0 day, sec = day_sec_from_tai(tai) year, month, day = ymd_from_day(day) hour, minute, _ = hms_from_sec(sec) mjd = mjd_from_tai(tai) limb = limb_arcsec[irec] row = f'{mjd:11.4f}{year:5d}{month:3d}{day:3d}{hour:3d}{minute:3d}{limb:7.3f}' for j in range(len(track_moon_ids)): row += f'{moon_offsets_arcsec[j][irec]:10.3f}' for _ in range(len(track_moon_ids), 25): row += ' ' * 10 output_txt.write(row.rstrip() + '\n') output_txt.flush()
[docs] def tracker_params_from_legacy_kwargs(**kwargs: object) -> TrackerParams: """Build TrackerParams from flat legacy keyword arguments (e.g. for tests). Accepts the same keyword names as the legacy run_tracker signature. """ def _get(key: str, default: object = None) -> object: return kwargs.get(key, default) observer = Observer( name=cast('str | None', _get('viewpoint')), latitude_deg=cast('float | None', _get('observer_latitude')), longitude_deg=cast('float | None', _get('observer_longitude')), altitude_m=cast('float | None', _get('observer_altitude')), ) xscaled = bool(_get('xscaled', False)) return TrackerParams( planet_num=int(cast('int', _get('planet_num', 0))), start_time=str(_get('start_time', '')), stop_time=str(_get('stop_time', '')), interval=float(cast('float', _get('interval', 1.0))), time_unit=str(_get('time_unit', 'hour')), observer=observer, ephem_version=int(cast('int', _get('ephem_version', 0))), sc_trajectory=int(cast('int', _get('sc_trajectory', 0))), moon_ids=cast('list[int]', _get('moon_ids') or []), ring_names=None, xrange=cast('float | None', _get('xrange')), xunit='radii' if xscaled else 'arcsec', title=str(_get('title', '')), output_ps=cast('TextIO | None', _get('output_ps')), output_txt=cast('TextIO | None', _get('output_txt')), )