import csv
import os
import urllib.request
from pathlib import Path
import astropy.units as u
import numpy as np
import skyfield.api
from astropy.time import Time
from astropy.timeseries import TimeSeries
from shapely import geometry
from skyfield.iokit import parse_tle_file
from padre_craft import NORAD_ID, _data_directory, log
__all__ = ["PadreOrbit", "get_ephemeris_file", "get_latest_tle", "get_celestrak_url"]
_SAA_VERTICES = np.array(
[
[-48.514111, -3.000365],
[-79.502424, -16.25564],
[-83.458183, -35.407033],
[-69.116229, -43.917163],
[-33.628958, -49.678745],
[23.697636, -38.845567],
[11.394957, -23.423524],
[-48.514111, -3.000365],
]
)
_UPPER_BELT_VERTICES = np.array(
[
[-180, 66],
[-120, 66],
[-79, 62],
[-20, 70],
[32, 71],
[112, 72],
[180, 66],
[180, 50],
[150, 55],
[85, 55],
[45, 57],
[-28, 56],
[-100, 35],
[-130, 41],
[-180, 50],
[-180, 66],
]
)
_LOWER_BELT_VERTICES = np.array(
[
[-180, -45],
[-126, -48],
[-83, -48],
[-33, -49],
[14, -48],
[41, -51],
[82, -45],
[135, -40],
[180, -45],
[180, -60],
[133, -54],
[92, -61],
[44, -72],
[-29, -82],
[-122, -82],
[-180, -60],
[-180, -45],
]
)
def vertices_to_polygon(vertices: np.array) -> geometry.Polygon:
"""Given a set of 2D vertices return a polygon.
Parameters
----------
vertices
Returns
-------
polygon
"""
# TODO: check that the polygon closes, the last point should be the same as first point
# TODO: check that shape is 2
pointList = []
for this_lon, this_lat in vertices:
this_point = geometry.Point(this_lon, this_lat)
pointList.append(this_point)
poly = geometry.Polygon(np.array(pointList))
return poly
[docs]
def get_ephemeris_file(download_if_missing: bool = True) -> Path:
"""Get the path to the ephemeris file. If it does not exist, download it from JPL and put it into the data directory"""
filename = "de421.bsp"
# Check if the LAMBDA_ENVIRONMENT environment variable is set
lambda_environment = os.getenv("LAMBDA_ENVIRONMENT")
if lambda_environment:
# If running in AWS Lambda, check the /tmp directory
file_path = Path("/tmp") / filename
else:
file_path = _data_directory / filename
if file_path.exists():
return file_path
elif download_if_missing:
url = f"https://ssd.jpl.nasa.gov/ftp/eph/planets/bsp/{filename}"
log.info(f"{filename} is not found. Downloading from {url}")
urllib.request.urlretrieve(url, filename)
# move this file into the data directory
new_path = _data_directory / filename
Path(filename).replace(new_path)
return new_path
else:
log.warning(f"{filename} not found and download_if_missing is False.")
return None
[docs]
def get_latest_tle() -> Path:
"""Get the latest TLE in CSV format. If it does not exist, download it from Celestrak and put it into the data directory.
Returns
-------
tle_path
Path to the tle file
"""
url = get_celestrak_url(format="csv")
filename = Path(f"{Time.now().iso.replace('-', '')[0:8]}_padre_tle.csv")
lambda_environment = os.getenv("LAMBDA_ENVIRONMENT")
if lambda_environment:
# If running in AWS Lambda, check the /tmp directory
file_path = Path("/tmp") / filename
else:
file_path = _data_directory / filename
if not file_path.exists():
log.info(f"Existing {filename} is not found. Downloading from {url}")
urllib.request.urlretrieve(url, filename)
Path(filename).replace(file_path)
return file_path
[docs]
def get_celestrak_url(format="csv") -> str:
"""Returns the url for the PADRE orbit information from celestrak
Raises
------
ValueError
If format is not one recognized by Celestrak [JSON, CSV, TLE]
Returns
-------
url: str
The url to download the celestrak orbit information
"""
if format.upper() not in ["CSV", "TLE", "JSON"]:
raise ValueError(f"Format {format} is not recognized.")
return f"https://celestrak.org/NORAD/elements/gp.php?CATNR={NORAD_ID}&FORMAT={format.upper()}"
[docs]
class PadreOrbit:
"""A class to load and calculate the orbit of the PADRE spacecraft.
Parameters
----------
tle_filename : Path, str, or None
Can parse CSV or standard TLE files. If given none, will download todays TLE from Celestrack
Raises
------
ValueError
If does not recognize the TLE file or cannot download one if not given.
Examples
--------
>>> from padre_craft.orbit import PadreOrbit
>>> from astropy.time import Time
>>> import astropy.units as u
>>> from padre_craft import _test_files_directory
>>> tle_filename = _test_files_directory / "20251219_padre_tle.csv"
>>> padre_orbit = PadreOrbit(tle_filename) # doctest: +SKIP
>>> padre_orbit.calculate(tstart=Time("2025-12-19T01:00"), tend=Time("2025-12-19T01:06"), dt=1 * u.min) # doctest: +SKIP
>>> padre_orbit.in_sun # doctest: +SKIP
array([ True, True, True, True, True, True])
>>> padre_orbit.in_particles # doctest: +SKIP
array([False, False, False, True, True, True])
>>> padre_orbit.good_flag # doctest: +SKIP
array([ True, True, True, False, False, False])
"""
def __init__(self, tle_filename=None):
self.ts = skyfield.api.load.timescale()
self.satellite = None
self.timeseries = None
self.calculated = False
self.time = None
if tle_filename:
if isinstance(tle_filename, str):
tle_filename = Path(tle_filename)
else:
tle_filename = get_latest_tle()
if tle_filename.suffix == ".csv":
with skyfield.api.load.open(str(tle_filename), mode="r") as f:
data = list(csv.DictReader(f))
sats = [
skyfield.api.EarthSatellite.from_omm(self.ts, fields) for fields in data
]
elif tle_filename.suffix == ".tle":
with skyfield.api.load.open(str(tle_filename)) as f:
sats = list(parse_tle_file(f, self.ts))
else:
raise ValueError(f"File type of {tle_filename} not recognized.")
if len(sats) >= 1:
self.satellite = sats[0]
else:
raise ValueError(f"Problem loading tle file {tle_filename}")
if self.satellite is None:
raise ValueError("No TLE file found")
self.eph = skyfield.api.load_file(get_ephemeris_file())
self.polys = {
"saa": vertices_to_polygon(_SAA_VERTICES),
"upper_belt": vertices_to_polygon(_UPPER_BELT_VERTICES),
"lower_belt": vertices_to_polygon(_LOWER_BELT_VERTICES),
}
[docs]
def calculate(self, tstart: Time, tend: Time, dt=5 * u.s):
"""Calculates the position of PADRE in its orbit and associated parameters.
Raises : ValueError
If start or end times are beyond 2 weeks from TLE epoch
"""
if (np.abs(tstart - self.satellite.epoch.to_astropy()) > 2 * u.week) or (
np.abs(tend - self.satellite.epoch.to_astropy()) > 2 * u.week
):
raise ValueError(
f"Times provided too far from tle epoch {self.satellite.epoch}"
)
self.calculated = True
n_samples = np.ceil(((tend - tstart) / dt).decompose().value).astype(np.uint)
self.timeseries = TimeSeries(
time_start=tstart, time_delta=dt, n_samples=n_samples
)
t = self.ts.from_astropy(self.timeseries.time)
self.time = self.timeseries.time
self.geocentric = self.satellite.at(t)
self.in_sun = self.satellite.at(t).is_sunlit(self.eph)
geopos = skyfield.api.wgs84.geographic_position_of(self.geocentric)
self.longitude = [
float(this_lon) for this_lon in geopos.longitude.degrees
] * u.deg
self.latitude = [
float(this_lat) for this_lat in geopos.latitude.degrees
] * u.deg
self.altitude = [float(this_lat) for this_lat in geopos.elevation.km] * u.km
self.speed = (
np.linalg.norm(self.geocentric.velocity.km_per_s, axis=0) * u.km / u.s
)
self.in_upper_belt = np.array(
[
geometry.Point(this_lon, this_lat).within(self.polys["upper_belt"])
for this_lon, this_lat in zip(self.longitude.value, self.latitude.value)
]
)
self.in_lower_belt = np.array(
[
geometry.Point(this_lon, this_lat).within(self.polys["lower_belt"])
for this_lon, this_lat in zip(self.longitude.value, self.latitude.value)
]
)
self.in_saa = np.array(
[
geometry.Point(this_lon, this_lat).within(self.polys["saa"])
for this_lon, this_lat in zip(self.longitude.value, self.latitude.value)
]
)
self.in_particles = self.in_saa | self.in_lower_belt | self.in_upper_belt
self.good_flag = self.in_sun & ~self.in_particles
self.timeseries = TimeSeries(
time=self.time,
data={
"longitude": self.longitude,
"latitude": self.latitude,
"altitude": self.altitude,
"speed": self.speed,
"in_sun": self.in_sun,
"in_saa": self.in_saa,
"in_upper_belt": self.in_upper_belt,
"in_lower_belt": self.in_lower_belt,
},
)
self.calculated = True
[docs]
def plot_geolocation(self):
if self.calculated:
import matplotlib.pyplot as plt
from matplotlib import colormaps
from mpl_toolkits.basemap import Basemap
cm = colormaps["rainbow"]
colorlist = (self.time.jd - self.time[0].jd) / (
self.time[-1].jd - self.time[0].jd
)
m = Basemap(projection="mill", lon_0=0, resolution="c")
m.drawcoastlines()
# draw parallels and meridians.
m.drawparallels(np.arange(-90.0, 120.0, 30.0))
m.drawmeridians(np.arange(0.0, 360.0, 60.0))
m.scatter(
self.longitude,
self.latitude.value,
latlon=True,
marker="o",
c=colorlist,
cmap=cm,
)
m.plot(*self.polys["saa"].exterior.xy, color="red", latlon=True)
m.plot(*self.polys["upper_belt"].exterior.xy, color="red", latlon=True)
m.plot(*self.polys["lower_belt"].exterior.xy, color="red", latlon=True)
m.colorbar(label="Normalized Time")
m.nightshade(self.time[0].to_datetime())
plt.title(f"Padre Craft Orbit from {self.time[0]} to {self.time[-1]}")
plt.show()
[docs]
def plot_state(self):
"""Plot the orbit state as a function of time."""
import matplotlib.pyplot as plt
fig = plt.figure(figsize=(11, 5))
ylabel = [
"In Sunlight",
"In SAA",
"In Lower Belt",
"In Upper Belt",
"In Particles",
"Good Flag",
] # y axis labels for subplots
state_vals = [
self.in_sun.astype(int),
self.in_saa.astype(int),
self.in_lower_belt.astype(int),
self.in_upper_belt.astype(int),
self.in_particles.astype(int),
self.good_flag.astype(int),
]
color_list = [
"C0",
"C1",
"C2",
"C3",
"C4",
"C5",
"C6",
"C7",
"C8",
"C9",
] # colors for plot lines
axs = fig.subplots(len(ylabel), sharex=True, sharey=False)
line_list = [] # empty list of plot lines for figure legend
# y values for each subplot
for i, a in enumerate(axs): # for each subplot
line = a.fill_between(
self.time.to_datetime(),
state_vals[i],
y2=0,
color=color_list[i],
step="post",
label=ylabel[i],
)
line_list.append(line) # save handle for fig legend
# a.set_yticks([0, 1], ['Off', 'On'])
a.grid(color="lightgray")
a.set_xlabel("Seconds")
a.minorticks_on()
a.set_ylim(0, 1)
a.tick_params(
axis="y", which="minor", length=0
) # hide minor ticks on y axis
a.label_outer() # axis labels on left and bottom sides of subplots
plt.figlegend(handles=line_list, loc="right") # overall figure legend
plt.show()