Source code for padre_craft.io.aws_db
"""Provides functions to upload data to the time series database for display"""
from astropy.timeseries import TimeSeries
from padre_meddea.housekeeping.calibration import calibrate_hk_ts
from swxsoc.util.util import record_timeseries
import padre_craft.util.util as util
from padre_craft.dirlist.dirlist import DirList
[docs]
def record_housekeeping(hk_ts: TimeSeries, data_type: str) -> None:
"""
Record housekeeping time series data to AWS TimeStream database.
This function processes and stores housekeeping telemetry data, with special
handling for MEDDEA data.
Parameters
----------
hk_ts : TimeSeries
The housekeeping time series data to be recorded. The original time series
is not modified; a copy is created for processing.
data_type : str
The type of data being recorded (e.g., "meddea"). This determines the
processing pipeline applied to the data.
"""
# Create a copy to avoid modifying the original
my_ts = hk_ts.copy()
# Special handling for MEDDEA data
if data_type == "meddea":
# Convert MEDDEA column names from OBC input to PADRE MEDDEA standard
my_ts = util.convert_meddea_colnames(my_ts)
# Remove unwanted columns
col_to_removes = ["CCSDS1", "CCSDS3", "checksum", "timestamp_ms"]
for this_col in col_to_removes:
if this_col in hk_ts.colnames:
my_ts.remove_column(this_col)
# Fix Bad Data in the Time Series
my_ts = util.remove_bad_data(my_ts)
# Apply calibration from padre_meddea to housekeeping data
my_ts = calibrate_hk_ts(my_ts)
# Record the Time Series in the TimeStream Database
record_timeseries(ts=my_ts, ts_name=data_type, instrument_name="craft")
[docs]
def record_orbit(padre_orbit_ts: TimeSeries) -> None:
"""Send the orbit time series to AWS."""
record_timeseries(padre_orbit_ts, "orbit", "craft")
[docs]
def record_dirlist(this_dirlist: DirList) -> None:
"""
Record directory listing summary data (file sizes and counts) to AWS.
This function converts a DirList into summary time series for file sizes
and file counts, then uploads those summaries to the time series database.
Parameters
----------
this_dirlist : DirList
Directory listing to be summarized into file size and file count
time series for upload.
"""
# record the file sizes
summary_ts = this_dirlist.to_summary_ts(metric_type="size")
record_timeseries(
ts=summary_ts, ts_name="dirlist_file_size", instrument_name="craft"
)
# record the file counts
summary_ts = this_dirlist.to_summary_ts(metric_type="count")
record_timeseries(
ts=summary_ts, ts_name="dirlist_file_count", instrument_name="craft"
)