Source code for padre_craft.io.file_tools

"""
Provides generic file readers.
"""

from pathlib import Path

import astropy.io.fits as fits
from astropy.io import ascii
from astropy.time import Time
from astropy.timeseries import TimeSeries

from padre_craft import LAUNCH_DATE
from padre_craft.util.util import filename_to_datatype

__all__ = ["read_file", "read_raw_file", "read_fits"]


[docs] def read_file(filename: Path): """ Read a file. Parameters ---------- filename: Path A file to read. Returns ------- data Examples -------- """ # TODO: the following should use parse_science_filename this_path = Path(filename) match this_path.suffix.lower(): case ".csv": # raw binary file result = read_raw_file(this_path) case ".fits": # level 0 or above result = read_fits(this_path) case _: raise ValueError(f"File extension {this_path.suffix} not recognized.") return result
[docs] def read_raw_file(file_path: Path) -> TimeSeries: """ Read a raw (csv) data file and return a timeseries. Note that columns that cannot are not recognized as ints or floats are removed. Parameters --------- filename : Path A file to read Returns ------- ts : TimeSeries The timeseries data """ if not isinstance(file_path, Path): file_path = Path(file_path) # Read in the CSV data_table = ascii.read(file_path, format="csv") # Make sure the time column exists time_column_name = "timestamp_ms" if time_column_name not in data_table.colnames: raise ValueError("No time column found, timestamp_ms") # Convert Unix milliseconds to Astropy Time time = Time(data_table[time_column_name] / 1000.0, format="unix") time.format = "isot" # Create the TimeSeries ts = TimeSeries(time=time, data=data_table) # Validate Times in the Data if any(ts.time < LAUNCH_DATE) > 0: raise ValueError("Found time before launch.") # Filter out non-numeric columns except time bad_col_names = [] for this_col in ts.itercols(): if not isinstance(this_col, Time): if this_col.dtype.kind not in ["i", "f"]: bad_col_names.append(this_col.name) if len(bad_col_names) > 0: ts.remove_columns(bad_col_names) # Record Metadata about the File ts.meta.update({"filename": file_path.name}) ts.meta.update({"data_type": filename_to_datatype(file_path.name)}) ts.sort() return ts
[docs] def read_fits(filename: Path): """ Read a fits file of any level and return the appropriate data objects. """ hdul = fits.open(filename) # header = hdul[0].header.copy() return hdul