Source code for ephemeris_tools.ephemeris

"""Ephemeris table generator (ported from ephem3_xxx.f)."""

from __future__ import annotations

import math
from typing import TextIO

from ephemeris_tools.constants import (
    EARTH_ID,
    MOON_ID,
    PLANET_NUM_TO_ID,
    SUN_ID,
    spacecraft_code_to_id,
    spacecraft_name_to_code,
)
from ephemeris_tools.params import (
    COL_EARTHRD,
    COL_LPHASE,
    COL_LSEP,
    COL_MJD,
    COL_OBSDIST,
    COL_OBSLON,
    COL_OBSOPEN,
    COL_PHASE,
    COL_RADDEG,
    COL_RADEC,
    COL_RADIUS,
    COL_SUBOBS,
    COL_SUBSOL,
    COL_SUNDIST,
    COL_SUNLON,
    COL_SUNOPEN,
    COL_SUNRD,
    COL_SUNSEP,
    COL_YDHM,
    COL_YDHMS,
    COL_YMDHM,
    COL_YMDHMS,
    MCOL_OBSDIST,
    MCOL_OFFDEG,
    MCOL_OFFSET,
    MCOL_ORBLON,
    MCOL_ORBOPEN,
    MCOL_PHASE,
    MCOL_RADEC,
    MCOL_SUBOBS,
    MCOL_SUBSOL,
    EphemerisParams,
    _parse_observatory_coords,
)
from ephemeris_tools.planets import (
    JUPITER_CONFIG,
    MARS_CONFIG,
    NEPTUNE_CONFIG,
    PLUTO_CONFIG,
    SATURN_CONFIG,
    URANUS_CONFIG,
)
from ephemeris_tools.record import Record
from ephemeris_tools.spice.geometry import (
    body_latlon,
    body_phase,
    body_radec,
    body_ranges,
    conjunction_angle,
    limb_radius,
    planet_phase,
    planet_ranges,
)
from ephemeris_tools.spice.load import load_spacecraft, load_spice_files
from ephemeris_tools.spice.observer import set_observer_id, set_observer_location
from ephemeris_tools.spice.orbits import orbit_opening
from ephemeris_tools.spice.rings import ring_opening
from ephemeris_tools.time_utils import (
    day_sec_from_tai,
    hms_from_sec,
    interval_seconds,
    mjd_from_tai,
    parse_datetime,
    tai_from_day_sec,
    tdb_from_tai,
    yd_from_day,
    ymd_from_day,
)

DPR = 180.0 / math.pi
HPR = DPR / 15.0
SPR = DPR * 3600.0
MAXSECS = 360.0 * 60.0 * 60.0


def _set_observer_from_params(params: EphemerisParams, *, sc_loaded_early: bool = False) -> None:
    """Set SPICE observer from params (ephem3_xxx.f observer setup).

    Selection order: if viewpoint is latlon and latitude_deg/longitude_deg are
    set, use set_observer_location; else if observatory is set and maps to a
    spacecraft code, use that (with load_spacecraft unless sc_loaded_early);
    otherwise observer remains Earth.

    Parameters:
        params: EphemerisParams with viewpoint, latitude_deg, longitude_deg,
            altitude_m, observatory, planet_num, sc_trajectory. Expected types
            match the EphemerisParams dataclass.
        sc_loaded_early: If True, spacecraft kernels were already loaded in
            generate_ephemeris; only set_observer_id is called (no kernel reload).

    Returns:
        None.

    Raises:
        May propagate ValueError or SPICE errors from observer lookup or
        spacecraft/body ID resolution when observer name is invalid or
        kernels are missing.
    """
    if (
        params.viewpoint == 'latlon'
        and params.latitude_deg is not None
        and params.longitude_deg is not None
    ):
        alt = params.altitude_m if params.altitude_m is not None else 0.0
        set_observer_location(params.latitude_deg, params.longitude_deg, alt)
        return
    if params.observatory and params.observatory != "Earth's center":
        code = spacecraft_name_to_code(params.observatory)
        if code is not None:
            if sc_loaded_early:
                set_observer_id(code)
                return
            sc_id = spacecraft_code_to_id(code)
            if sc_id:
                load_spacecraft(
                    sc_id,
                    params.planet_num,
                    params.sc_trajectory,
                    set_obs=True,
                )
                return
        parsed_coords = _parse_observatory_coords(params.observatory)
        if parsed_coords is not None:
            lat, lon, alt = parsed_coords
            set_observer_location(lat, lon, alt)
            return
        if 'Earth' in params.observatory or not params.observatory.strip():
            set_observer_id(EARTH_ID)
            return
    set_observer_id(EARTH_ID)


def _round_sec_to_min(sec: float) -> float:
    """Round seconds to nearest minute (60 * round(sec/60)).

    Parameters:
        sec: Seconds within day.

    Returns:
        Seconds rounded to nearest 60.
    """
    # Match FORTRAN's half-up behavior: int(sec/60 + 0.5), not banker's rounding.
    return 60.0 * int(sec / 60.0 + 0.5)


def _round_sec_to_sec(sec: float) -> float:
    """Round to nearest second.

    Parameters:
        sec: Seconds (fractional).

    Returns:
        Rounded seconds.
    """
    return round(sec)


def _normalized_start_sec(sec: float, time_unit: str) -> float:
    """Normalize start second field to match FORTRAN stepping behavior.

    FORTRAN ephemeris rounds the start boundary second field to the nearest
    minute before building the stepping range, including second-based stepping.

    Parameters:
        sec: Seconds within day.
        time_unit: Requested stepping unit (kept for API symmetry).

    Returns:
        Normalized start seconds within day.
    """
    del time_unit
    return _round_sec_to_min(sec)


[docs] def generate_ephemeris(params: EphemerisParams, output: TextIO | None = None) -> None: """Generate ephemeris table and write to output (port of ephem3_xxx.f). Loads SPICE, sets observer from params, steps over time range, and writes requested columns (and moon columns) to the given stream. Parameters: params: Time range, planet, columns, viewpoint, etc. output: Output text stream; if None, uses params.output. Raises: ValueError: Invalid start/stop time or time range. RuntimeError: SPICE load failure. """ out = output or params.output if out is None: return start_parsed = parse_datetime(params.start_time) stop_parsed = parse_datetime(params.stop_time) if start_parsed is None or stop_parsed is None: raise ValueError('Invalid start or stop time') day1, sec1 = start_parsed day2, sec2 = stop_parsed tai1 = tai_from_day_sec(day1, _normalized_start_sec(sec1, params.time_unit)) tai2 = tai_from_day_sec(day2, sec2) dsec = interval_seconds(params.interval, params.time_unit) ntimes = int((tai2 - tai1) / dsec) + 1 if ntimes < 2: raise ValueError('Time range too short or interval too large') if ntimes > 100000: raise ValueError('Number of time steps exceeds limit of 100000') # Match FORTRAN ephem3 load order: spacecraft first (RSPK_LoadSC), then planet # (RSPK_LoadFiles). Kernel load order affects which segments are used when # multiple kernels cover the same body; matching FORTRAN avoids observer-state # discrepancies. Must not call load_spacecraft again in _set_observer_from_params, # or the pool order changes and observer positions differ. sc_loaded_early = False if params.observatory and params.observatory != "Earth's center": code = spacecraft_name_to_code(params.observatory) if code is not None: sc_id = spacecraft_code_to_id(code) if sc_id and load_spacecraft( sc_id, params.planet_num, params.sc_trajectory, set_obs=True, ): sc_loaded_early = True ok, reason = load_spice_files(params.planet_num, params.ephem_version) if not ok: raise RuntimeError(f'Failed to load SPICE kernels: {reason}') _set_observer_from_params(params, sc_loaded_early=sc_loaded_early) rec = Record() columns = list(params.columns) mooncols = list(params.mooncols) # Match CGI/FORTRAN behavior: if either columns or moon columns are explicitly # selected, do not inject default planet columns. if len(columns) == 0 and len(mooncols) == 0: columns = [COL_MJD, COL_YMDHMS, COL_RADEC, COL_PHASE] moon_ids = list(params.moon_ids) planet_id = PLANET_NUM_TO_ID.get(params.planet_num, 100 * params.planet_num + 99) # Match FORTRAN's time = time1 - 2*dsec; do irec=0,ntimes; time = time + dsec tai = tai1 - 2.0 * dsec for irec in range(ntimes + 1): tai = tai + dsec et = tdb_from_tai(tai) if irec > 0 else 0.0 if irec > 0: planet_ra, planet_dec = body_radec(et, planet_id) cosdec = math.cos(planet_dec) else: planet_ra = planet_dec = cosdec = 0.0 ring_geom = None if irec > 0 and any( c in columns for c in (COL_OBSOPEN, COL_SUNOPEN, COL_OBSLON, COL_SUNLON) ): ring_geom = ring_opening(et) subobs_lat = subobs_lon = subsol_lat = subsol_lon = 0.0 if irec > 0 and any(c in columns for c in (COL_SUBOBS, COL_SUBSOL)): subobs_lat, subsol_lat, subobs_lon, subsol_lon = body_latlon(et, planet_id) sun_dist = obs_dist = 0.0 if irec > 0 and any(c in columns for c in (COL_OBSDIST, COL_SUNDIST)): sun_dist, obs_dist = planet_ranges(et) rec.init() for col in columns: if col == COL_MJD: if irec == 0: rec.append(' mjd ') else: rec.append(f'{mjd_from_tai(tai):12.5f}') elif col == COL_YMDHM: if irec == 0: rec.append('year mo dy hr mi') else: d, s = day_sec_from_tai(tai) s = _round_sec_to_min(s) if s >= 86400.0: d += 1 s = 0.0 year, month, day = ymd_from_day(d) hour, minute, _ = hms_from_sec(s) rec.append(f'{year:4d}{month:3d}{day:3d}{hour:3d}{minute:3d}') elif col == COL_YMDHMS: if irec == 0: rec.append('year mo dy hr mi sc') else: d, s = day_sec_from_tai(tai) s = _round_sec_to_sec(s) if s >= 86400.0: d += 1 s = 0.0 year, month, day = ymd_from_day(d) hour, minute, sec = hms_from_sec(s) rec.append(f'{year:4d}{month:3d}{day:3d}{hour:3d}{minute:3d}{int(sec):3d}') elif col == COL_YDHM: if irec == 0: rec.append('year doy hr mi') else: d, s = day_sec_from_tai(tai) s = _round_sec_to_min(s) if s >= 86400.0: d += 1 s = 0.0 year, doy = yd_from_day(d) hour, minute, _ = hms_from_sec(s) rec.append(f'{year:4d}{doy:4d}{hour:3d}{minute:3d}') elif col == COL_YDHMS: if irec == 0: rec.append('year doy hr mi sc') else: d, s = day_sec_from_tai(tai) s = _round_sec_to_sec(s) if s >= 86400.0: d += 1 s = 0.0 year, doy = yd_from_day(d) hour, minute, sec = hms_from_sec(s) rec.append(f'{year:4d}{doy:4d}{hour:3d}{minute:3d}{int(sec):3d}') elif col == COL_OBSDIST: if irec == 0: rec.append(' obs_dist') else: cell = f'{obs_dist:10.0f}' # FORTRAN f10.0 includes a trailing decimal point, # so it overflows one digit earlier than Python. if len(cell) > 10 or (len(cell) == 10 and cell[0] != ' '): cell = f'{obs_dist:10.4E}' rec.append(cell[:10]) elif col == COL_SUNDIST: if irec == 0: rec.append(' sun_dist') else: rec.append(f'{sun_dist:10.4E}') elif col == COL_PHASE: if irec == 0: rec.append(' phase') else: phase = planet_phase(et) rec.append(f'{phase * DPR:9.5f}') elif col == COL_OBSOPEN: if irec == 0: rec.append(' obs_open') else: rec.append(f'{(ring_geom or ring_opening(et)).obs_b * DPR:9.5f}') elif col == COL_SUNOPEN: if irec == 0: rec.append(' sun_open') else: rec.append(f'{(ring_geom or ring_opening(et)).sun_b * DPR:9.5f}') elif col == COL_OBSLON: if irec == 0: rec.append(' obs_lon') else: rec.append(f'{(ring_geom or ring_opening(et)).obs_long * DPR:9.5f}') elif col == COL_SUNLON: if irec == 0: rec.append(' sun_lon') else: rec.append(f'{(ring_geom or ring_opening(et)).sun_long * DPR:9.5f}') elif col == COL_SUBOBS: if irec == 0: rec.append('subobslat subobslon') else: rec.append(f'{subobs_lat * DPR:9.5f}{subobs_lon * DPR:10.5f}') elif col == COL_SUBSOL: if irec == 0: rec.append('subsollat subsollon') else: rec.append(f'{subsol_lat * DPR:9.5f}{subsol_lon * DPR:10.5f}') elif col == COL_RADEC: if irec == 0: rec.append('planet_ra planet_dec') else: rec.append(f'{planet_ra * HPR:9.6f}{planet_dec * DPR:11.5f}') elif col == COL_EARTHRD: if irec == 0: rec.append(' earth_ra earth_dec') else: ra, dec = body_radec(et, EARTH_ID) rec.append(f'{ra * HPR:9.6f}{dec * DPR:10.5f}') elif col == COL_SUNRD: if irec == 0: rec.append(' sun_ra sun_dec') else: ra, dec = body_radec(et, SUN_ID) rec.append(f'{ra * HPR:9.6f}{dec * DPR:10.5f}') elif col == COL_RADIUS: if irec == 0: rec.append(' radius') else: _, rrad = limb_radius(et) rec.append(f'{rrad * SPR:8.3f}') elif col == COL_RADDEG: if irec == 0: rec.append(' r_deg') else: _, rrad = limb_radius(et) rec.append(f'{rrad * DPR:7.3f}') elif col == COL_LPHASE: if irec == 0: rec.append('lun_phase') else: mp = body_phase(et, MOON_ID) rec.append(f'{mp * DPR:9.5f}') elif col == COL_SUNSEP: if irec == 0: rec.append(' sun_sep') else: sep = conjunction_angle(et, planet_id, SUN_ID) rec.append(f'{sep * DPR:9.5f}') elif col == COL_LSEP: if irec == 0: rec.append(' moon_sep') else: sep = conjunction_angle(et, planet_id, MOON_ID) rec.append(f'{sep * DPR:9.5f}') for mid in moon_ids: prefix = _moon_prefix(mid, params.planet_num) for mcol in mooncols: if mcol == MCOL_OBSDIST: if irec == 0: rec.append(prefix + 'dist') else: _, obs_d = body_ranges(et, mid) cell = f'{obs_d:10.0f}' # FORTRAN f10.0 includes trailing decimal point, # so it overflows one digit earlier than Python. if len(cell) > 10 or (len(cell) == 10 and cell[0] != ' '): cell = f'{obs_d:10.4E}' rec.append(cell[:10]) elif mcol == MCOL_PHASE: if irec == 0: rec.append(prefix + 'phase') else: ph = body_phase(et, mid) rec.append(f'{ph * DPR:10.5f}') elif mcol == MCOL_SUBOBS: if irec == 0: rec.append(prefix + 'olat ' + prefix + 'olon') else: subobs_lat_m, _, subobs_lon_m, _ = body_latlon(et, mid) rec.append(f'{subobs_lat_m * DPR:9.5f}{subobs_lon_m * DPR:10.5f}') elif mcol == MCOL_SUBSOL: if irec == 0: rec.append(prefix + 'slat ' + prefix + 'slon') else: _, subsol_lat_m, _, subsol_lon_m = body_latlon(et, mid) rec.append(f'{subsol_lat_m * DPR:9.5f}{subsol_lon_m * DPR:10.5f}') elif mcol == MCOL_RADEC: if irec == 0: rec.append(' ' + prefix + 'ra ' + prefix + 'dec') else: ra, dec = body_radec(et, mid) rec.append(f'{ra * HPR:9.6f}{dec * DPR:10.5f}') elif mcol == MCOL_OFFSET: if irec == 0: rec.append(' ' + prefix + 'dra ' + prefix + 'ddec') else: ra, dec = body_radec(et, mid) ddec = SPR * (dec - planet_dec) dra = SPR * (ra - planet_ra) if dra < -0.5 * MAXSECS: dra += MAXSECS if dra > 0.5 * MAXSECS: dra -= MAXSECS dra *= cosdec rec.append(f'{dra:10.3f}{ddec:11.3f}') elif mcol == MCOL_OFFDEG: if irec == 0: rec.append(' ' + prefix + 'dra ' + prefix + 'ddec') else: ra, dec = body_radec(et, mid) ddec = DPR * (dec - planet_dec) dra = DPR * (ra - planet_ra) # Match FORTRAN ephem3_xxx.f: wrap by 180, not 360 if dra < -180.0: dra += 180.0 if dra > 180.0: dra -= 180.0 dra *= cosdec rec.append(f'{dra:10.5f}{ddec:11.5f}') elif mcol == MCOL_ORBLON: if irec == 0: rec.append(prefix + 'orbit') else: _, obs_lon = orbit_opening(et, mid) rec.append(f'{obs_lon * DPR:10.5f}') elif mcol == MCOL_ORBOPEN: if irec == 0: rec.append(prefix + 'open') else: obs_b, _ = orbit_opening(et, mid) rec.append(f'{obs_b * DPR:9.5f}') rec.write(out)
def _moon_prefix(moon_id: int, planet_num: int) -> str: """Short prefix for moon column labels (ephem3_xxx.f 5-char style). Parameters: moon_id: SPICE body ID of moon. planet_num: Planet number (4-9). Returns: Uppercased name prefix, 4 chars + '_', padded if needed. """ configs = { 4: MARS_CONFIG, 5: JUPITER_CONFIG, 6: SATURN_CONFIG, 7: URANUS_CONFIG, 8: NEPTUNE_CONFIG, 9: PLUTO_CONFIG, } cfg = configs.get(planet_num) if cfg: m = cfg.moon_by_id(moon_id) if m: s = m.name.upper()[:4] s = (s + '_' * 4)[:4] return s + '_' return 'moon_'