"""
This module provides general utility functions.
"""
import numbers
import os
import re
import time
import traceback
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Union
import astropy.units as u
import boto3
import numpy as np
import requests
import sunpy.net.attrs as a
import sunpy.time
from astropy.time import Time
from astropy.timeseries import TimeSeries
from botocore import UNSIGNED
from botocore.client import Config
from botocore.exceptions import ClientError, NoCredentialsError
from dateutil.relativedelta import relativedelta
from parfive import Downloader
from sunpy.net.attr import AttrAnd, AttrOr, AttrWalker, SimpleAttr
from sunpy.net.base_client import BaseClient, QueryResponseTable, convert_row_to_table
import swxsoc
from swxsoc.util.exceptions import warn_user
# Names exported by ``from <this module> import *``.
# NOTE(review): "_record_dimension_timestream" is underscore-prefixed yet
# exported here - confirm that is intentional.
__all__ = [
    "create_science_filename",
    "parse_science_filename",
    "SWXSOCClient",
    "SearchTime",
    "Level",
    "Instrument",
    "Descriptor",
    "DevelopmentBucket",
    "record_timeseries",
    "get_dashboard_id",
    "get_panel_id",
    "query_annotations",
    "create_annotation",
    "remove_annotation_by_id",
    "_record_dimension_timestream",
]

# strftime format used for the time field of standard science filenames
# (see create_science_filename / _parse_standard_format).
TIME_FORMAT = "%Y%m%dT%H%M%S"  # YYYYMMDDTHHMMSS

# Maps a time-format identifier to the compiled regex that locates a matching
# token inside a filename. Keys are either strptime format strings or the
# special identifiers "unix_ms" / "unix_s", which _parse_time_string routes to
# _parse_unix_timestamp. Insertion order matters: _try_all_patterns walks this
# dict in order and returns the first match that parses successfully.
TIME_PATTERNS = {
    "unix_ms": re.compile(r"(?<!\d)\d{13}(?!\d)"),  # unix time stamps in milliseconds
    "unix_s": re.compile(r"(?<!\d)\d{10}(?!\d)"),  # unix time stamps in seconds
    "%Y-%m-%dT%H:%M:%S": re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"),  # ISO 8601
    "%Y%m%d-%H%M%S": re.compile(r"\d{8}-\d{6}"),  # YYYYMMDD-HHMMSS
    "%Y%m%dT%H%M%S": re.compile(r"\d{8}T\d{6}"),  # YYYYMMDDTHHMMSS
    "%Y%m%d%H%M%S": re.compile(r"(?<!\d)\d{14}(?!\d)"),  # YYYYMMDDHHMMSS
    "%y%m%d%H%M%S": re.compile(r"(?<!\d)\d{12}(?!\d)"),  # YYMMDDHHMMSS
    "%Y%j-%H%M%S": re.compile(r"\d{7}-\d{6}"),  # YYYYJJJ-HHMMSS
    "%Y%j_%H%M%S": re.compile(r"\d{7}_\d{6}"),  # YYYYJJJ_HHMMSS
    "%Y%m%d": re.compile(r"(?<!\d)\d{8}(?!\d)"),  # YYYYMMDD
}
def create_science_filename(
    instrument: str,
    time: str,
    level: str,
    version: str,
    mode: str = "",
    descriptor: str = "",
    test: bool = False,
):
    """Build a science filename that follows the mission naming convention.

    The name is assembled as
    ``{mission}_{inst}_{mode}_{level}{test}_{descriptor}_{time}_v{version}``
    followed by the configured file extension. Fields left empty (``mode``,
    ``descriptor``) are dropped by collapsing the resulting double underscore.
    This format is only appropriate for data level >= 1.

    Parameters
    ----------
    instrument : `str`
        The instrument name. Must be listed in the mission configuration's
        ``inst_names``.
    time : `str` (in isot format) or ~astropy.time
        The time. Strings are parsed as ``isot``; any other object must
        provide ``strftime``.
    level : `str`
        The data level. Must be listed in the mission configuration's
        ``valid_data_levels``.
    version : `str`
        The file version which must be given as X.Y.Z
    mode : `str`
        An optional instrument mode. Must not contain an underscore.
    descriptor : `str`
        An optional file descriptor. Must not contain an underscore.
    test : bool
        Selects whether the file is a test file. Only the literal ``True``
        enables the ``test`` suffix.

    Returns
    -------
    filename : `str`
        A file name including the given parameters that matches the mission's
        file naming conventions.

    Raises
    ------
    ValueError: If the instrument is not recognized as one of the mission's instruments
    ValueError: If the data level is not recognized as one of the mission's valid data levels
    ValueError: If the data version does not match the mission's data version formatting conventions
    ValueError: If the data product descriptor or instrument mode do not match the mission's formatting conventions
    """
    mission_config = swxsoc.config["mission"]

    # Normalise the timestamp first; this mirrors the original check order.
    if isinstance(time, str):
        time_str = Time(time, format="isot").strftime(TIME_FORMAT)
    else:
        time_str = time.strftime(TIME_FORMAT)

    known_instruments = mission_config["inst_names"]
    if instrument not in known_instruments:
        raise ValueError(
            f"Instrument, {instrument}, is not recognized. Must be one of {mission_config['inst_names']}."
        )

    known_levels = mission_config["valid_data_levels"]
    if level not in known_levels:
        raise ValueError(
            f"Level, {level}, is not recognized. Must be one of {mission_config['valid_data_levels']}."
        )

    # The version must consist of exactly three integer parts.
    version_parts = version.split(".")
    if len(version_parts) != 3:
        raise ValueError(
            f"Version, {version}, is not formatted correctly. Should be X.Y.Z"
        )
    for part in version_parts:
        try:
            int(part)
        except ValueError:
            raise ValueError(f"Version, {version}, is not all integers.")

    test_str = "test" if test is True else ""

    # parse_science_filename splits on "_", so free-text fields may not use it.
    if ("_" in mode) or ("_" in descriptor):
        raise ValueError(
            "The underscore symbol _ is not allowed in mode or descriptor."
        )

    mission_name = mission_config["mission_name"]
    instrument_shortname = mission_config["inst_to_shortname"].get(
        instrument, instrument
    )

    pieces = (
        f"{mission_name}",
        f"{instrument_shortname}",
        f"{mode}",
        f"{level}{test_str}",
        f"{descriptor}",
        f"{time_str}",
        f"v{version}",
    )
    # Empty mode/descriptor leave "__" behind; collapse those to a single "_".
    filename = "_".join(pieces).replace("__", "_")
    return filename + mission_config["file_extension"]
def _get_instrument_mapping(config: dict) -> dict:
    """
    Build a lookup from instrument aliases to full instrument names.

    Both the configured shortnames and the configured extra names are mapped
    back to the full instrument name they belong to. When the same alias
    appears in both tables, the entry from ``inst_to_extra_inst_names`` wins.

    Parameters
    ----------
    config : dict
        The mission configuration. Must contain ``inst_to_shortname``
        (full name -> shortname) and ``inst_to_extra_inst_names``
        (full name -> iterable of extra names).

    Returns
    -------
    dict
        A dictionary mapping every shortname and extra name to its full
        instrument name.
    """
    lookup = {}
    for full_name, shortname in config["inst_to_shortname"].items():
        lookup[shortname] = full_name
    for full_name, aliases in config["inst_to_extra_inst_names"].items():
        for alias in aliases:
            lookup[alias] = full_name
    return lookup
def _parse_standard_format(filename: str, mission_config: dict) -> dict:
    """
    Parse a filename that follows the standard science-file convention.

    Handles the following format:
    {mission}_{inst}_{mode}_{level}{test}_{descriptor}_{time}_v{version}.{extension}

    Parameters
    ----------
    filename : str
        The filename to parse (with or without path).
    mission_config : dict
        The configuration dictionary containing mission and instrument details.

    Returns
    -------
    dict
        The parsed fields: always ``instrument``, ``time``, ``test``, ``level``
        and ``version``; ``mode`` and ``descriptor`` only when present.

    Raises
    ------
    ValueError
        If the instrument shortname is not recognised, if the filename has too
        few underscore-separated fields, or if time extraction fails.
    """
    result = {}
    mission_name = mission_config["mission_name"]
    shortnames = mission_config["inst_shortnames"]

    # Work on the stem only so the extension never ends up in a field.
    filename = Path(filename).stem
    components = filename.split("_")

    # Mission names may themselves contain underscores (e.g. "swxsoc_pipeline"),
    # so compare as many leading components as the configured name has parts.
    n_mission_parts = len(mission_name.split("_"))
    parsed_mission_name = "_".join(components[:n_mission_parts])
    if parsed_mission_name != mission_name:
        warn_user(
            f"Not a valid mission name: {parsed_mission_name}. Expected: {mission_name}. Reverting to parsing with assumption of configured mission name.",
        )
    else:
        # Strip the mission prefix so the remaining fields start at the instrument.
        components = components[n_mission_parts:]

    if not components or components[0] not in shortnames:
        found = components[0] if components else ""
        raise ValueError(
            f"Invalid instrument shortname: {found}. Expected one of {shortnames}"
        )

    # Guard the positional lookups below: a short name must surface as the
    # documented ValueError, not as an IndexError that callers do not catch.
    if len(components) < 3:
        raise ValueError(
            f"Filename {filename} does not match the standard format: "
            "expected at least instrument, level and time fields."
        )

    inst_name = components[0]
    mapping = _get_instrument_mapping(mission_config)
    result["instrument"] = mapping.get(inst_name.lower(), inst_name)
    result["time"] = _extract_time(
        filename, expected_format=TIME_FORMAT, mission_config=mission_config
    )

    # The field after the instrument is either the mode or the level; it is
    # treated as a level when its first two characters are a valid data level.
    if components[1][:2] not in mission_config["valid_data_levels"]:
        result["mode"] = components[1]
        level_token = components[2]
        if len(components) == 6:
            result["descriptor"] = components[3]
    else:
        level_token = components[1]
        if len(components) == 5:
            result["descriptor"] = components[2]

    # The test marker is appended to the level field ("{level}{test}"), so only
    # that field is inspected; a mode or descriptor containing "test" must not
    # flag the file as a test file.
    result["test"] = "test" in level_token
    result["level"] = level_token.replace("test", "")
    result["version"] = components[-1].lstrip("v")
    return result
def _extract_instrument_name(filename: str, mission_config: dict) -> str:
    """
    Extract the instrument name from a filename using a regex.

    All configured instrument names, shortnames and extra names are searched
    case-insensitively. A candidate only counts when it is preceded by the
    start of the string, one of ``_ - .`` or the mission name, and followed by
    one of ``_ - .``, a digit or the end of the string.

    Parameters
    ----------
    filename : str
        The filename from which to extract the instrument name.
    mission_config : dict
        The configuration dictionary containing mission and instrument details
        (``inst_names``, ``inst_shortnames``, ``extra_inst_names`` and
        ``mission_name``).

    Returns
    -------
    str
        The extracted instrument name, lower-cased.

    Raises
    ------
    ValueError
        If no instrument name is found, or if more than one match is found.
    """
    all_inst_names = [
        name.lower()
        for name in (
            mission_config["inst_names"]
            + mission_config["inst_shortnames"]
            + [n for sublist in mission_config["extra_inst_names"] for n in sublist]
        )
    ]
    # Escape the mission name: it is configuration data, not a regex, so any
    # metacharacter in it must be matched literally.
    mission_name = re.escape(mission_config["mission_name"].lower())
    # NOTE(review): "|" binds loosest, so the optional trailing digits below
    # attach only to the last alternative - confirm this is intended.
    pattern = re.compile(
        rf"(?:^|[_\-.]|{mission_name})("  # non-capturing prefix, then open the only capture group
        + "|".join(
            re.escape(name) for name in all_inst_names
        )  # instrument-name alternatives
        + r"(?:\d+)?)(?:[_\-.]|$|\d)",  # close the capture group, then non-capturing suffix
        re.IGNORECASE,
    )
    # With a single capture group, findall returns the captured names directly.
    matches = pattern.findall(filename.lower())
    if not matches:
        raise ValueError(f"No valid instrument name found in {filename}")
    if len(matches) > 1:
        raise ValueError(f"Multiple instrument names found: {matches}")
    return matches[0]
def _extract_data_level(filename: str, possible_levels: List[str]) -> str:
    """
    Pick the data level of a file from a list of candidate levels.

    When only one candidate is given it is returned without inspecting the
    filename. Otherwise the candidates are tried in order and the first one
    that appears in the filename surrounded by ``_``, ``-`` or ``.`` on both
    sides (case-insensitive) is returned. If none is found, the first
    candidate is returned.

    Parameters
    ----------
    filename : str
        The filename from which to extract the data level.
    possible_levels : List[str]
        Candidate data levels, in priority order.

    Returns
    -------
    str
        The extracted data level.
    """
    # A single candidate (e.g. 'raw') needs no lookup in the filename.
    if len(possible_levels) == 1:
        return possible_levels[0]

    # First candidate that is sandwiched between delimiters in the filename.
    hit = next(
        (
            lvl
            for lvl in possible_levels
            if re.search(rf"[_\-.]{lvl}[_\-.]", filename, re.IGNORECASE)
        ),
        None,
    )
    return hit if hit else possible_levels[0]
def _extract_time(
    filename: str,
    expected_format: Optional[str] = None,
    mission_config: Optional[dict] = None,
) -> Time:
    """
    Extract and validate a time from a filename.

    Two strategies are tried in order: first the expected format (if given),
    then every known pattern. The first strategy that yields a time wins and
    its result is passed through ``_validate_time``.

    Parameters
    ----------
    filename : str
        The filename from which to extract the time.
    expected_format : Optional[str]
        The expected time format to use for parsing.
    mission_config : Optional[dict]
        The configuration dictionary containing mission details; forwarded to
        ``_validate_time``.

    Returns
    -------
    Time
        The extracted time as an astropy Time object.

    Raises
    ------
    ValueError
        If no recognizable time format is found in the filename.
    ValueError
        If ``_validate_time`` rejects the extracted time.
    """
    for strategy in (_try_parse_with_expected_format, _try_all_patterns):
        candidate = strategy(filename, expected_format)
        if candidate:
            return _validate_time(candidate, mission_config=mission_config)
    raise ValueError(f"No recognizable time format in {filename}")
def _try_parse_with_expected_format(
    filename: str, expected_format: str
) -> Optional[Time]:
    """
    Parse the time in a filename using one specific, expected format.

    Parameters
    ----------
    filename : str
        The filename from which to extract the time.
    expected_format : str
        Key into ``TIME_PATTERNS`` naming the expected time format.

    Returns
    -------
    Optional[Time]
        The parsed time, or ``None`` when no format was given, the format has
        no registered pattern, the pattern does not occur in the filename, or
        the matched text cannot be parsed.

    Examples
    --------
    >>> _try_parse_with_expected_format("swxsoc_eea_l1_20230115T123045_v1.0.0.cdf", "%Y%m%dT%H%M%S")
    <Time object: scale='utc' format='datetime' value=2023-01-15 12:30:45>
    >>> _try_parse_with_expected_format("padre_get_EPS_9_Data_1673785845000.csv", "unix_ms")
    <Time object: scale='utc' format='isot' value=2023-01-15T12:30:45.000>
    """
    # Nothing to do without an expected format.
    if not expected_format:
        return None

    pattern = TIME_PATTERNS.get(expected_format)
    if not pattern:
        swxsoc.log.warning(
            f"No regex pattern found for expected time format '{expected_format}'. "
            "Falling back to all patterns."
        )
        return None

    match = pattern.search(filename)
    if match:
        return _parse_time_string(match.group(0), expected_format)

    swxsoc.log.warning(
        f"No time string matching expected format '{expected_format}' found in {filename}."
    )
    return None
def _try_all_patterns(filename: str, *args, **kwargs) -> Optional[Time]:
    """
    Parse the time in a filename by trying every known pattern in turn.

    Patterns are tried in the order they are listed in ``TIME_PATTERNS``; the
    first one that both matches and parses successfully is used. Extra
    positional and keyword arguments are accepted and ignored so the function
    can be called with the same arguments as the other parser strategies.

    Parameters
    ----------
    filename : str
        The filename from which to extract the time.

    Returns
    -------
    Time
        The extracted time as an astropy Time object, or None if not found.

    Examples
    --------
    >>> _try_all_patterns("swxsoc_eea_l1_20230115T123045_v1.0.0.cdf")
    <Time object: scale='utc' format='datetime' value=2023-01-15 12:30:45>
    """
    for format_str, pattern in TIME_PATTERNS.items():
        match = pattern.search(filename)
        if not match:
            continue
        parsed = _parse_time_string(match.group(0), format_str)
        if parsed:
            return parsed
    return None
def _parse_time_string(time_str: str, format_str: str) -> Optional[Time]:
    """
    Convert a time string to a Time object using the given format.

    Unix identifiers are delegated to ``_parse_unix_timestamp``. Every other
    format is tried with ``datetime.strptime`` first and, if that raises
    ``ValueError``, with ``sunpy.time.parse_time`` as a last resort.

    Parameters
    ----------
    time_str : str
        The time string to parse.
    format_str : str
        The format string to use for parsing, or ``"unix_ms"`` / ``"unix_s"``.

    Returns
    -------
    Optional[Time]
        The parsed time, or ``None`` when the string/fallback parsers fail.

    Examples
    --------
    >>> _parse_time_string("2023-01-15 12:30:45", "%Y-%m-%d %H:%M:%S")
    <Time object: scale='utc' format='datetime' value=2023-01-15 12:30:45>
    >>> _parse_time_string("1673785845000", "unix_ms")
    <Time object: scale='utc' format='isot' value=2023-01-15T12:30:45.000>
    >>> _parse_time_string("invalid", "%Y-%m-%d")
    """
    # Unix timestamps are numeric and have their own parser.
    if format_str == "unix_ms" or format_str == "unix_s":
        return _parse_unix_timestamp(time_str, format_str)

    try:
        parsed = Time(datetime.strptime(time_str, format_str))
    except ValueError:
        # strptime rejected the string: let sunpy guess the format instead.
        try:
            parsed = Time(sunpy.time.parse_time(time_str))
        except Exception:
            parsed = None
    return parsed
def _parse_unix_timestamp(time_str: str, format_str: str) -> Time:
    """
    Convert a Unix timestamp string to a Time object.

    Parameters
    ----------
    time_str : str
        The Unix timestamp as a string of digits.
    format_str : str
        The format identifier: ``"unix_ms"`` for milliseconds; any other value
        is treated as seconds.

    Returns
    -------
    Time
        The parsed time, with its ``format`` set to ``"isot"``.

    Examples
    --------
    >>> _parse_unix_timestamp("1673785845000", "unix_ms")
    <Time object: scale='utc' format='isot' value=2023-01-15T12:30:45.000>
    >>> _parse_unix_timestamp("1673785845", "unix_s")
    <Time object: scale='utc' format='isot' value=2023-01-15T12:30:45.000>
    """
    if format_str == "unix_ms":
        divisor = 1000.0
    else:
        divisor = 1.0
    seconds = int(time_str) / divisor
    parsed = Time(seconds, format="unix")
    # Switch the representation to isot for consistency with other parsers.
    parsed.format = "isot"
    return parsed
def _validate_time(extracted_time: Time, mission_config: Optional[dict] = None) -> Time:
    """
    Check an extracted time against the mission's valid time range.

    With a ``mission_config`` the configured bounds are enforced and a
    violation raises. Without one, only warnings are logged for times in the
    future or before 1970-01-01.

    Parameters
    ----------
    extracted_time : Time
        The extracted time to validate.
    mission_config : Optional[dict], optional
        The mission configuration; its optional ``min_valid_time`` and
        ``max_valid_time`` entries define the accepted range. A missing or
        falsy bound is not enforced.

    Returns
    -------
    Time
        The validated time (same object as the input).

    Raises
    ------
    ValueError
        If mission_config is provided and the extracted time is before
        ``mission_config['min_valid_time']``.
    ValueError
        If mission_config is provided and the extracted time is after
        ``mission_config['max_valid_time']``.
    """
    if mission_config is not None:
        lower_bound = mission_config.get("min_valid_time")
        upper_bound = mission_config.get("max_valid_time")
        if lower_bound and extracted_time < lower_bound:
            raise ValueError(
                f"Extracted time {extracted_time} is before mission minimum valid time {lower_bound}."
            )
        if upper_bound and extracted_time > upper_bound:
            raise ValueError(
                f"Extracted time {extracted_time} is after mission maximum valid time {upper_bound}."
            )
        return extracted_time

    # No configuration: only flag suspicious values, never reject them.
    if extracted_time > Time.now():
        swxsoc.log.warning(f"Found future time {extracted_time}.")
    if extracted_time < Time("1970-01-01"):
        swxsoc.log.warning(f"Found suspiciously old time {extracted_time}.")
    return extracted_time
# Public entry point for filename parsing (listed in __all__).
def parse_science_filename(filepath: str) -> dict:
    """
    Parses a science filename into its constituent properties.

    Three cases are handled, in order:

    1. The extension equals the mission's configured ``file_extension``: the
       name is parsed with ``_parse_standard_format``.
    2. The extension matches one of the instrument's ``inst_file_rules``: the
       level and time are extracted using that rule.
    3. Otherwise: the time is extracted with any known pattern and the level
       is searched among the mission's valid data levels.

    Parameters
    ----------
    filepath : str
        Fully qualified filepath of an input file.

    Returns
    -------
    dict
        Parsed fields ``instrument``, ``mode``, ``test``, ``time``, ``level``,
        ``version`` and ``descriptor`` (``None``/``False`` when not found).
        Cases 2 and 3 additionally include ``mission``.

    Raises
    ------
    ValueError
        If the instrument is not recognized, or the time cannot be extracted
        or is rejected by validation.
    """
    # NOTE(review): function-level import kept from the original; presumably it
    # guards against import-order issues - confirm before removing.
    import swxsoc

    mission_config = swxsoc.config["mission"]
    filepath = Path(filepath)
    filename = filepath.name
    file_ext = filepath.suffix
    result = {
        "instrument": None,
        "mode": None,
        "test": False,
        "time": None,
        "level": None,
        "version": None,
        "descriptor": None,
    }

    # Case 1: The file is in a standard format used for archive/science files
    if file_ext == mission_config["file_extension"]:
        result.update(_parse_standard_format(filename, mission_config))
        return result

    # The instrument is needed to look up instrument-specific file rules.
    # Only the extraction itself can raise the ValueError being re-labelled.
    try:
        inst_name_raw = _extract_instrument_name(filename, mission_config)
    except ValueError as e:
        raise ValueError(f"Error extracting instrument name: {e}") from e
    mapping = _get_instrument_mapping(mission_config)
    inst_name = mapping.get(inst_name_raw.lower(), inst_name_raw)
    result["instrument"] = inst_name

    # First rule for this instrument whose extension matches (case-insensitive).
    inst_rules = mission_config.get("inst_file_rules", {}).get(inst_name, [])
    matched_rule = next(
        (
            rule
            for rule in inst_rules
            if file_ext.lower() == rule["extension"].lower()
        ),
        None,
    )

    if matched_rule:
        # Case 2: non-standard format, but a known rule describes it.
        data_level = _extract_data_level(filename, matched_rule["levels"])
        parsed_time = _extract_time(
            filename,
            expected_format=matched_rule.get("time_format"),
            mission_config=mission_config,
        )
    else:
        # Case 3: no rule applies; fall back to generic extraction.
        parsed_time = _extract_time(filename, mission_config=mission_config)
        data_level = _extract_data_level(
            filename, mission_config["valid_data_levels"]
        )

    result.update(
        {
            "mission": mission_config["mission_name"].lower(),
            "level": data_level,
            "time": parsed_time,
        }
    )
    return result
def get_instrument_package(instrument_name: str) -> str:
    """
    Return the name of the package that processes files for an instrument.

    The package is resolved in one of two ways:

    1. An explicit entry for the instrument in the mission configuration's
       ``inst_packages`` mapping.
    2. Otherwise the default convention ``{mission_name}_{instrument_name}``,
       lower-cased.

    Parameters
    ----------
    instrument_name : str
        The name of the instrument to find the package for. Matching is
        case-insensitive (the name is lower-cased first).

    Returns
    -------
    str
        The name of the package to use for processing files from the
        specified instrument.

    Raises
    ------
    ValueError
        If the instrument name is not recognized as one of the mission's
        instruments.
    """
    mission_config = swxsoc.config["mission"]

    # Normalise for matching against the configured (lower-case) names.
    instrument_name = instrument_name.lower()
    if instrument_name not in mission_config["inst_names"]:
        raise ValueError(
            f"Instrument, {instrument_name}, is not recognized. Must be one of {list(mission_config['inst_names'])}."
        )

    explicit_package = mission_config["inst_packages"].get(instrument_name)
    if not explicit_package:
        # No explicit mapping: fall back to the naming convention.
        return f"{mission_config['mission_name'].lower()}_{instrument_name.lower()}"
    return explicit_package
# ================================================================================================
# SWXSOC FIDO CLIENT
# ================================================================================================
# Initialize the attribute walker.
# The creator/applier functions below register themselves on this instance via
# ``walker.add_creator`` / ``walker.add_applier``; SWXSOCClient.search calls
# ``walker.create(query)`` to turn a query into a list of parameter dicts.
walker = AttrWalker()
# Map sunpy attributes to SWXSOC attributes for easy access
class SearchTime(a.Time):
    """
    Attribute for specifying the time range for the search.

    Subclass of ``sunpy.net.attrs.Time`` that adds no behaviour of its own.
    The ``apply_time`` applier registered for this type stores
    ``start.isot`` and ``end.isot`` in the query parameters.

    Attributes
    ----------
    start
        The start of the time range; ``apply_time`` reads its ``isot`` value.
    end
        The end of the time range; ``apply_time`` reads its ``isot`` value.
    """
class Level(a.Level):
    """
    Attribute for specifying the data level for the search.

    Subclass of ``sunpy.net.attrs.Level`` that adds no behaviour of its own.
    The ``apply_level`` applier lower-cases the value before storing it.

    Attributes
    ----------
    value : str
        The data level value.
    """
class Instrument(a.Instrument):
    """
    Attribute for specifying the instrument for the search.

    Subclass of ``sunpy.net.attrs.Instrument`` that adds no behaviour of its
    own. The ``apply_instrument`` applier upper-cases the value before storing
    it.

    Attributes
    ----------
    value : str
        The instrument value.
    """
class Descriptor(a.Detector):
    """
    Attribute to specify the data type (descriptor) for the search.

    Subclass of ``sunpy.net.attrs.Detector`` that adds no behaviour of its
    own. The ``apply_descriptor`` applier stores the value unchanged under the
    ``descriptor`` query parameter.

    Attributes
    ----------
    value : str
        The data type
    """
class DevelopmentBucket(SimpleAttr):
    """
    Attribute for specifying whether to search in the DevelopmentBucket for testing purposes.

    The ``apply_development_bucket`` applier stores the value under
    ``use_development_bucket``; when that value is truthy,
    ``SWXSOCClient._make_search`` prefixes the bucket names with ``dev-``.

    Attributes
    ----------
    value : bool
        Whether to use the DevelopmentBucket. When the attribute is absent
        from a query, no ``dev-`` prefix is used.
    """
@walker.add_creator(AttrOr)
def create_or(wlk, tree):
    """
    Expand an 'AttrOr' tree by creating each of its branches.

    Parameters
    ----------
    wlk : AttrWalker
        The AttrWalker instance used for creating the attributes.
    tree : AttrOr
        The 'AttrOr' tree structure.

    Returns
    -------
    list
        One entry per branch of the tree, each being whatever ``wlk.create``
        returns for that branch.
    """
    return [wlk.create(branch) for branch in tree.attrs]
@walker.add_creator(AttrAnd)
def create_and(wlk, tree):
    """
    Collapse an 'AttrAnd' tree into a single dictionary of query parameters.

    Every attribute in the tree is applied to the same dictionary, so the
    result holds the combined parameters of all of them.

    Parameters
    ----------
    wlk : AttrWalker
        The AttrWalker instance used for applying the attributes.
    tree : AttrAnd
        The 'AttrAnd' tree structure.

    Returns
    -------
    list
        A list containing a single dictionary of attributes.
    """
    combined_params = {}
    for member in tree.attrs:
        wlk.apply(member, combined_params)
    return [combined_params]
@walker.add_applier(SearchTime)
def apply_time(wlk, attr, params):
    """
    Store the time range of a 'SearchTime' attribute in the parameters.

    Parameters
    ----------
    wlk : AttrWalker
        The AttrWalker instance used for applying the attributes.
    attr : SearchTime
        The time attribute; its ``start.isot`` and ``end.isot`` are read.
    params : dict
        The parameters dictionary to be updated with ``startTime`` and
        ``endTime``.
    """
    params.update(startTime=attr.start.isot, endTime=attr.end.isot)
@walker.add_applier(Level)
def apply_level(wlk, attr, params):
    """
    Store the lower-cased value of a 'Level' attribute in the parameters.

    Parameters
    ----------
    wlk : AttrWalker
        The AttrWalker instance used for applying the attributes.
    attr : Level
        The level attribute to be applied.
    params : dict
        The parameters dictionary to be updated with ``level``.
    """
    params.update(level=attr.value.lower())
@walker.add_applier(Instrument)
def apply_instrument(wlk, attr, params):
    """
    Store the upper-cased value of an 'Instrument' attribute in the parameters.

    Parameters
    ----------
    wlk : AttrWalker
        The AttrWalker instance used for applying the attributes.
    attr : Instrument
        The instrument attribute to be applied.
    params : dict
        The parameters dictionary to be updated with ``instrument``.
    """
    params.update(instrument=attr.value.upper())
@walker.add_applier(DevelopmentBucket)
def apply_development_bucket(wlk, attr, params):
    """
    Store the value of a 'DevelopmentBucket' attribute in the parameters.

    Parameters
    ----------
    wlk : AttrWalker
        The AttrWalker instance used for applying the attributes.
    attr : DevelopmentBucket
        The 'DevelopmentBucket' attribute to be applied.
    params : dict
        The parameters dictionary to be updated with
        ``use_development_bucket``.
    """
    params.update(use_development_bucket=attr.value)
@walker.add_applier(Descriptor)
def apply_descriptor(wlk, attr, params):
    """
    Store the value of a 'Descriptor' attribute in the parameters.

    Parameters
    ----------
    wlk : AttrWalker
        The AttrWalker instance used for applying the attributes.
    attr : Descriptor
        The 'Descriptor' attribute to be applied.
    params : dict
        The parameters dictionary to be updated with ``descriptor``.
    """
    params.update(descriptor=attr.value)
class SWXSOCClient(BaseClient):
"""
Client for searching for SWXSOC data on AWS.
This client provides search and fetch functionality for SWXSOC data and is based on the sunpy BaseClient for FIDO.
For more information on the sunpy BaseClient, see: https://docs.sunpy.org/en/stable/generated/api/sunpy.net.base_client.BaseClient.html
Note that AWS buckets may require access keys.
Examples
--------
>>> from swxsoc.util import SWXSOCClient, SearchTime, Level, Descriptor, Instrument
>>> client = SWXSOCClient()
>>> query = AttrAnd([SearchTime(start=Time("2025-07-10T00:00:00"), end=Time("2025-07-11T00:00:00")),
... Instrument("meddea"),
... Level("l0"),
... Descriptor("housekeeping")])
>>> results = client.search(query) # doctest: +SKIP
"""
size_column = "size"
def search(self, query=None):
    """
    Run a search and return the matching files as a table.

    The query is expanded by the module-level attribute walker into one or
    more parameter dictionaries; each is searched with ``_make_search`` and
    the resulting rows are concatenated.

    Parameters
    ----------
    query : AttrAnd
        The query object specifying search criteria. ``None`` is treated as
        an empty ``AttrAnd``.

    Returns
    -------
    QueryResponseTable
        A table containing the search results. When nothing matches, the
        table is created without any columns.
    """
    if query is None:
        query = AttrAnd([])
    queries = walker.create(query)
    swxsoc.log.info(f"Searching with {queries}")

    rows = []
    for query_parameters in queries:
        rows += self._make_search(query_parameters)

    if not rows:
        return QueryResponseTable(names=[], rows=[], client=self)

    # Column order must match the row layout built in _make_search.
    column_names = [
        "instrument",
        "mode",
        "test",
        "time",
        "level",
        "version",
        "descriptor",
        "key",
        "size",
        "bucket",
        "etag",
        "storage_class",
        "last_modified",
    ]
    return QueryResponseTable(names=column_names, rows=rows, client=self)
@convert_row_to_table
def fetch(self, query_results, *, path, downloader, **kwargs):
    """
    Queue the files of a search result for download.

    For every row a download URL is determined (a presigned URL when one can
    be generated, otherwise the plain S3 URL) and enqueued on the downloader.
    Nothing is downloaded here; that is left to the downloader.

    Note: The downloader must be an instance of parfive.Downloader

    Parameters
    ----------
    query_results : list
        The results of the search query.
    path : str
        The directory path where files should be saved. ``None`` or ``"."``
        means the current working directory.
    downloader : Downloader
        The parfive downloader instance used for fetching files.

    Raises
    ------
    ValueError
        If ``downloader`` is not a parfive ``Downloader``, or if ``path``
        exists but is not a directory.
    """
    if not isinstance(downloader, Downloader):
        raise ValueError("Downloader must be an instance of parfive.Downloader")

    for row in query_results:
        swxsoc.log.info(f"Fetching {row['key']}")

        if path is None or path == ".":
            path = os.getcwd()
        path_is_non_directory = os.path.exists(path) and not os.path.isdir(path)
        if path_is_non_directory:
            raise ValueError(f"Path {path} is not a directory")

        destination = self._make_filename(path, row)
        url = self.generate_presigned_url(row["bucket"], row["key"])
        if url is None:
            # No presigned URL available: fall back to the plain object URL.
            url = f"https://{row['bucket']}.s3.amazonaws.com/{row['key']}"
        downloader.enqueue_file(url, filename=destination)
@classmethod
def _make_filename(cls, path, row):
    """
    Build the local file path for a result row.

    Only the last ``/``-separated segment of the row's key is kept, so any
    key prefix is discarded.

    Parameters
    ----------
    path : str
        The directory path.
    row : dict
        The row data containing the file key under ``"key"``.

    Returns
    -------
    str
        The full file path.
    """
    basename = row["key"].rsplit("/", 1)[-1]
    return os.path.join(path, basename)
@staticmethod
def generate_presigned_url(bucket_name, object_key, expiration=3600):
    """
    Return a URL for downloading an object from S3.

    A presigned URL is generated when credentials are available and the
    bucket can be listed. If credentials are missing or access is denied, the
    plain (unsigned) object URL is returned instead; that URL is only built,
    not checked for accessibility.

    Parameters
    ----------
    bucket_name : str
        The name of the S3 bucket.
    object_key : str
        The key of the S3 object.
    expiration : int, optional
        The expiration time in seconds for the presigned URL. Default is 3600 seconds.

    Returns
    -------
    str or None
        The presigned URL if successful, or the direct unsigned URL when
        credentials are missing or access is denied. ``None`` for any other
        client error.
    """
    try:
        s3_client = boto3.client("s3")
        # Listing a single object surfaces missing credentials or denied
        # access before a URL is handed out.
        s3_client.list_objects_v2(Bucket=bucket_name, MaxKeys=1)
        return s3_client.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket_name, "Key": object_key},
            ExpiresIn=expiration,
        )
    except NoCredentialsError:
        swxsoc.log.warning("Credentials not available. Trying unsigned access.")
    except ClientError as e:
        # Use .get so an unexpectedly shaped error response cannot raise here.
        error_code = e.response.get("Error", {}).get("Code")
        if error_code != "AccessDenied":
            swxsoc.log.warning(f"Error generating presigned URL: {e}")
            return None
        swxsoc.log.warning(
            f"Access denied to {bucket_name}/{object_key}. Trying unsigned access."
        )

    # Credentials are missing or access was denied: hand back the plain URL.
    # Building the string cannot fail, so no further error handling is needed.
    swxsoc.log.info(f"Attempting unsigned access to {bucket_name}/{object_key}")
    return f"https://{bucket_name}.s3.amazonaws.com/{object_key}"
@classmethod
def _can_handle_query(cls, *query):
    """
    Determines if the client can handle the given query based on its attributes.

    The query is accepted only when every attribute is exactly one of the
    attribute classes this module registers an applier for.

    Parameters
    ----------
    query : tuple
        The query attributes to check.

    Returns
    -------
    bool
        True if the client can handle the query, otherwise False.
    """
    # Descriptor is included because an applier is registered for it and
    # _make_search consumes the resulting "descriptor" parameter.
    supported_attrs = {SearchTime, Level, Instrument, Descriptor, DevelopmentBucket}
    query_attrs = {type(attr) for attr in query}
    return supported_attrs.issuperset(query_attrs)
@classmethod
def _make_search(cls, query):
    """
    Performs a search based on the provided query parameters.

    Lists the objects of the relevant S3 bucket(s), keeps those whose key's
    parent path contains all tokens of at least one generated prefix, and
    parses each remaining key into a result row.

    Parameters
    ----------
    query : dict
        The query parameters: ``instrument``, ``level``, ``startTime``,
        ``endTime``, ``descriptor`` and ``use_development_bucket`` (all
        optional).

    Returns
    -------
    list
        A list of rows containing the search results, in the column order
        used by ``search``.

    Raises
    ------
    ValueError
        If a requested data level is not one of the mission's valid levels.
    """
    mission_config = swxsoc.config["mission"]

    instrument = query.get("instrument")
    levels = query.get("level")
    start_time = query.get("startTime")
    end_time = query.get("endTime")
    descriptor = query.get("descriptor")
    use_development_bucket = query.get("use_development_bucket")

    # Normalise the level(s) to a validated list; default to every valid level.
    if levels is not None and not isinstance(levels, list):
        levels = [levels]
    if levels is not None and len(levels) > 0:
        for level in levels:
            if level not in mission_config["valid_data_levels"]:
                raise ValueError(f"Invalid data level: {level}")
    else:
        levels = mission_config["valid_data_levels"]

    if start_time is None:
        start_time = "2000-01-01"
    if end_time is None:
        end_time = datetime.now().isoformat()

    # Target name -> bucket name, optionally pointing at the "dev-" buckets.
    instrument_buckets = {
        f"{mission_config['inst_to_targetname'][inst]}": (
            f"{'dev-' if use_development_bucket else ''}"
            f"{mission_config['mission_name']}-{inst}"
        )
        for inst in mission_config["inst_names"]
    }
    swxsoc.log.debug(f"Mapping of instruments to S3 buckets: {instrument_buckets}")

    if instrument is None or instrument not in instrument_buckets:
        swxsoc.log.info(
            "No instrument specified or invalid instrument. Searching all instruments."
        )
        instrument_bucket_to_search = instrument_buckets.values()
    else:
        swxsoc.log.info(f"Searching for instrument: {instrument}")
        instrument_bucket_to_search = [instrument_buckets[instrument]]
    swxsoc.log.info(f"Searching in buckets: {instrument_bucket_to_search}")

    files_in_s3 = cls.list_files_in_s3(instrument_bucket_to_search)

    # levels, start_time and end_time always hold a value at this point (the
    # defaults above), so the prefix filter is applied unconditionally.
    swxsoc.log.info(
        f"Searching for files with level {levels} between {start_time} and {end_time}"
    )
    if descriptor:
        swxsoc.log.info(f"Searching for files with descriptor: {descriptor}")
    prefixes = cls.generate_prefixes(levels, start_time, end_time, descriptor)

    # Keep each key once (first occurrence wins) if its parent path contains
    # every token of at least one prefix. A set makes the duplicate check O(1).
    matched_files = []
    seen_keys = set()
    for this_s3_file in files_in_s3:
        key = this_s3_file["Key"]
        if key in seen_keys:
            continue
        parent_path = str(Path(key).parent)
        if any(
            all(this_token in parent_path for this_token in this_prefix_list)
            for this_prefix_list in prefixes
        ):
            seen_keys.add(key)
            matched_files.append(this_s3_file)

    swxsoc.log.info(
        f"Found {len(matched_files)} files in S3 matching search criteria"
    )

    rows = []
    for s3_object in matched_files:
        swxsoc.log.debug(f"Processing S3 object: {s3_object}")
        try:
            info = parse_science_filename(s3_object["Key"])
        except ValueError:
            # Unparseable names are still listed, with placeholder metadata.
            info = {}
        rows.append(
            [
                info.get("instrument", "unknown"),
                info.get("mode", "unknown"),
                info.get("test", False),
                info.get("time", "unknown"),
                info.get("level", "unknown"),
                info.get("version", "unknown"),
                info.get("descriptor", "unknown"),
                s3_object["Key"],
                s3_object["Size"] * u.byte,
                s3_object["Bucket"],
                s3_object["ETag"],
                s3_object["StorageClass"],
                s3_object["LastModified"],
            ]
        )
    return rows
@staticmethod
def list_files_in_s3(bucket_names: list) -> list:
    """
    List every object in the given S3 buckets.

    Each bucket is first listed with the default (signed) client. If that
    fails with an ``AccessDenied`` error code, or because no credentials are
    available, the bucket is listed again with an unsigned (anonymous) client.

    Parameters
    ----------
    bucket_names : list
        A list of S3 bucket names.

    Returns
    -------
    list
        A list of dictionaries, one per S3 object, with the keys ``Key``,
        ``LastModified``, ``Size``, ``ETag``, ``StorageClass`` and ``Bucket``.

    Raises
    ------
    Exception
        If a bucket cannot be listed for a reason other than denied access or
        missing credentials, or if the unsigned retry fails with a ``ClientError``.
    """

    def _list_bucket(paginator, bucket_name):
        """Return the metadata of every object the paginator yields for one bucket."""
        return [
            {
                "Key": obj["Key"],
                "LastModified": sunpy.time.parse_time(obj["LastModified"]),
                "Size": obj["Size"],
                "ETag": obj["ETag"],
                "StorageClass": obj.get("StorageClass", "STANDARD"),
                "Bucket": bucket_name,
            }
            for page in paginator.paginate(Bucket=bucket_name)
            for obj in page.get("Contents", [])
        ]

    content = []
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    for bucket_name in bucket_names:
        try:
            # Try with the authenticated client. The bucket listing is built
            # completely before it is added to the result, so a failure part-way
            # through pagination cannot leave partial entries that the unsigned
            # retry would then duplicate.
            content.extend(_list_bucket(paginator, bucket_name))
        except (ClientError, NoCredentialsError) as e:
            swxsoc.log.warning(f"Error accessing bucket {bucket_name}: {e}")
            if isinstance(e, NoCredentialsError):
                error_code = "NoCredentialsError"
            else:
                error_code = e.response["Error"]["Code"]
            if error_code not in ("AccessDenied", "NoCredentialsError"):
                raise Exception(f"Error accessing bucket {bucket_name}: {e}") from e
            swxsoc.log.warning(
                f"Access denied to bucket {bucket_name}. Trying unsigned request."
            )
            # Retry with an unsigned (anonymous) client
            try:
                unsigned_s3 = boto3.client(
                    "s3", config=Config(signature_version=UNSIGNED)
                )
                unsigned_paginator = unsigned_s3.get_paginator("list_objects_v2")
                content.extend(_list_bucket(unsigned_paginator, bucket_name))
            except ClientError as unsigned_error:
                raise Exception(
                    f"Unsigned request failed for bucket {bucket_name} (Ensure you have the correct IAM permissions, or are on the VPN)"
                ) from unsigned_error
    return content
@staticmethod
def generate_prefixes(
    levels: list, start_time: str, end_time: str, descriptor: str
) -> list:
    """
    Generate the search tokens for every (month, level) pair in a time range.

    Parameters
    ----------
    levels : list
        A list of data levels.
    start_time : str
        The start time in ISO format.
    end_time : str
        The end time in ISO format.
    descriptor : str
        The file descriptor. When truthy it is appended to every token list.

    Returns
    -------
    list
        A list of token lists ``[year, zero-padded month, level]`` (plus the
        descriptor when given), one per level for every calendar month that
        overlaps the range ``start_time``..``end_time``. Empty if
        ``start_time`` is later than ``end_time``.
    """
    range_start = datetime.fromisoformat(start_time)
    range_end = datetime.fromisoformat(end_time)
    prefixes = []
    if range_start <= range_end:
        # Step through the months from the FIRST day of the start month.
        # Stepping from the raw start date would skip the final month whenever
        # the start day-of-month is later than the end day-of-month
        # (e.g. 2024-01-20 .. 2024-02-10 would never produce 2024/02).
        current_time = range_start.replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        )
        while current_time <= range_end:
            for level in levels:
                these_tokens = [
                    f"{current_time.year}",
                    f"{current_time.month:02d}",
                    level,
                ]
                if descriptor:
                    these_tokens.append(descriptor)
                prefixes.append(these_tokens)
            current_time += relativedelta(months=1)
    swxsoc.log.debug(f"Generated prefix: {prefixes}")
    return prefixes
def _timestream_type_and_value(value, coerce_float: bool = False) -> tuple:
    """Map a scalar to the Timestream ``(type, value-string)`` pair used in a measure value."""
    # The boolean check must come first because ``bool`` is a subclass of ``int``.
    if isinstance(value, (bool, np.bool_)):
        # Timestream expects "true" or "false"
        return "BOOLEAN", str(value).lower()
    if isinstance(value, numbers.Number):
        return "DOUBLE", str(float(value)) if coerce_float else str(value)
    return "VARCHAR", str(value)


def record_timeseries(
    ts: TimeSeries, ts_name: str = None, instrument_name: str = ""
) -> None:
    """
    Record a timeseries of measurements to AWS Timestream for viewing on a dashboard like Grafana.

    This function requires AWS credentials with permission to write to the AWS Timestream database.

    Parameters
    ----------
    ts : TimeSeries
        A timeseries with column data to record. Note that times are assumed to be in UTC.
    ts_name : str, optional
        The name of the timeseries to record. If None or empty string, defaults to ts.meta['name']
        or 'measurement_group'.
    instrument_name : str, optional
        The instrument name. Only used when ts.meta has no 'INSTRUME' entry; when present,
        ts.meta['INSTRUME'] takes precedence. The name is lowercased before use.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If no instrument name can be determined (it is None or an empty string). Membership in
        the configured mission instrument names is not checked here.

    Notes
    -----
    Records are written in batches of 100 to comply with Timestream API limits.
    Database and table names are automatically prefixed with 'dev-' when not in PRODUCTION environment.
    Failures while writing a batch are logged and do not raise.

    NaN values are skipped entirely and not written to Timestream. When a NaN is encountered in the
    timeseries data, that specific measure value is omitted from the record. The function logs the
    total count of NaN values skipped across all columns and time points. A time point whose values
    are all skipped produces no record.

    Columns with one value per time point are written under the column name (suffixed with
    ``_<unit>`` for Quantity columns). Columns holding an array per time point are flattened and
    written as ``<column>_val<index>``.

    Data type inference follows a hierarchical approach to determine the appropriate Timestream type:

    - **BOOLEAN**: Values of type `bool` or `np.bool_` are stored as BOOLEAN type with lowercase
      string representation ("true" or "false") as required by Timestream.
    - **DOUBLE**: Numeric values (instances of `numbers.Number`) are stored as DOUBLE type.
    - **VARCHAR**: All other values default to VARCHAR type for text/string storage.

    The boolean check is performed first since `bool` is a subclass of `int` in Python. This ensures
    boolean flags are correctly identified and not mistakenly stored as numeric DOUBLE values.
    """
    timestream_client = boto3.client("timestream-write", region_name="us-east-1")

    # Get mission name swxsoc config
    mission_name = swxsoc.config["mission"]["mission_name"]

    # Validate Instrument name. A missing/None name is normalised to "" so it is
    # reported through the ValueError below instead of failing on ``.lower()``.
    if "INSTRUME" in ts.meta:
        instrument_name = ts.meta["INSTRUME"]
    instrument_name = instrument_name.lower() if instrument_name else ""
    if instrument_name == "":
        error = f"Invalid instrument name: {instrument_name}. Must be one of {swxsoc.config['mission']['inst_names']}."
        swxsoc.log.error(error)
        raise ValueError(error)

    # Validate Timeseries name
    if ts_name is None or ts_name == "":
        ts_name = ts.meta.get("name", "measurement_group")

    # Get the Database and Table names based on Dev / Prod environment
    database_name = f"{mission_name}_sdc_aws_logs"
    table_name = f"{mission_name}_measures_table"
    if os.getenv("LAMBDA_ENVIRONMENT") != "PRODUCTION":
        database_name = f"dev-{database_name}"
        table_name = f"dev-{table_name}"

    dimensions = [
        {"Name": "mission", "Value": mission_name},
        {"Name": "source", "Value": os.getenv("LAMBDA_ENVIRONMENT", "DEVELOPMENT")},
        {"Name": "instrument", "Value": instrument_name},
    ]

    # Look the data columns up once instead of once per time point
    data_columns = [
        (col_name, ts[col_name]) for col_name in ts.colnames if col_name != "time"
    ]

    # Create a list to hold all records to be written
    records = []
    total_nan_count = 0

    # Loop over each time point in the timeseries, creating a record for each
    for row_index, time_point in enumerate(ts.time):
        # ``to_datetime()`` returns a naive datetime; mark it as UTC explicitly,
        # otherwise ``timestamp()`` would interpret it in the local timezone.
        utc_datetime = time_point.to_datetime().replace(tzinfo=timezone.utc)
        measure_record = {
            "Time": str(int(utc_datetime.timestamp() * 1000)),
            "Dimensions": dimensions,
            "MeasureName": ts_name,
            "MeasureValueType": "MULTI",
            "MeasureValues": [],
        }

        for this_col, column in data_columns:
            if len(column.shape) == 1:  # usual case, a single value in the column
                # Handle both Quantity and regular values
                if isinstance(column, u.Quantity):
                    measure_unit = column.unit
                    value = column.value[row_index]
                else:
                    measure_unit = ""
                    value = column[row_index]

                # Skip adding NaN values to the record
                if isinstance(value, numbers.Number) and np.isnan(value):
                    total_nan_count += 1
                    continue

                measure_type, measure_value = _timestream_type_and_value(value)
                measure_record["MeasureValues"].append(
                    {
                        "Name": (
                            f"{this_col}_{measure_unit}" if measure_unit else this_col
                        ),
                        "Value": measure_value,
                        "Type": measure_type,
                    }
                )
            else:  # the values in the timeseries are arrays
                values = column[row_index]
                if isinstance(values, u.Quantity):
                    values = values.value  # remove the unit
                values = values.flatten()

                # Loop over each value in the array and add to MeasureValues.
                # The element index has its own name so it cannot clobber the
                # row index used to read the remaining columns.
                for element_index, value in enumerate(values):
                    # Skip adding NaN values to the record
                    if isinstance(value, numbers.Number) and np.isnan(value):
                        total_nan_count += 1
                        continue

                    measure_type, measure_value = _timestream_type_and_value(
                        value, coerce_float=True
                    )
                    measure_record["MeasureValues"].append(
                        {
                            "Name": f"{this_col}_val{element_index}",
                            "Value": measure_value,
                            "Type": measure_type,
                        }
                    )

        # Only add the record if there are MeasureValues to write
        if measure_record["MeasureValues"]:
            records.append(measure_record)
        else:
            swxsoc.log.debug(
                f"Skipping record at time {time_point} for {ts_name} due to all NaN values."
            )

    # Log total NaN values skipped
    if total_nan_count > 0:
        swxsoc.log.info(f"Skipped {total_nan_count} NaN values in {ts_name}")

    # Process records in batches of 100 to avoid exceeding the Timestream API limit
    batch_size = 100
    for start in range(0, len(records), batch_size):
        chunk = records[start : start + batch_size]  # noqa: E203
        try:
            result = timestream_client.write_records(
                DatabaseName=database_name,
                TableName=table_name,
                Records=chunk,
            )
            swxsoc.log.info(
                f"Successfully wrote {len(chunk)} {ts_name} records to Timestream: {database_name}/{table_name}, "
                f"writeRecords Status: {result['ResponseMetadata']['HTTPStatusCode']}"
            )
        except timestream_client.exceptions.RejectedRecordsException as err:
            swxsoc.log.error(f"Failed to write records to Timestream: {err}")
            for rr in err.response["RejectedRecords"]:
                swxsoc.log.info(f"Rejected Index {rr['RecordIndex']}: {rr['Reason']}")
                if "ExistingVersion" in rr:
                    swxsoc.log.info(
                        f"Rejected record existing version: {rr['ExistingVersion']}"
                    )
        except Exception as err:
            swxsoc.log.error(f"Failed to write to Timestream: {err}")
            # Log Stack trace for debugging
            swxsoc.log.error(traceback.format_exc())
def _record_dimension_timestream(
    dimensions: list,
    instrument_name: str = None,
    measure_name: str = "timestamp",
    measure_value: any = None,
    measure_value_type: str = "DOUBLE",
    timestamp: str = None,
) -> None:
    """
    Record a single measurement to an `AWS timestream <https://docs.aws.amazon.com/timestream/>`_ for viewing on a dashboard such as Grafana.

    .. warning::
        This function requires AWS credentials with permission to write to the AWS timestream database.

    Failures while writing are logged and do not raise. The ``dimensions`` list passed by the
    caller is not modified.

    :param dimensions: A list of dimensions to record. Each dimension should be a dictionary with 'Name' and 'Value' keys.
    :type dimensions: list[dict]
    :param instrument_name: Optional. Name of the instrument to add as an ``InstrumentName`` dimension. It is lowercased and only added when it is one of the configured mission instrument names. Defaults to None.
    :type instrument_name: str, optional
    :param measure_name: The name of the measure being recorded. Defaults to "timestamp".
    :type measure_name: str
    :param measure_value: The value of the measure being recorded. Defaults to the current UTC timestamp if not provided.
    :type measure_value: any, optional
    :param measure_value_type: The type of the measure value (e.g., "DOUBLE", "BIGINT"). Defaults to "DOUBLE".
    :type measure_value_type: str
    :param timestamp: The timestamp for the record in milliseconds. Defaults to the current time if not provided.
    :type timestamp: str, optional
    :return: None
    """
    timestream_client = boto3.client("timestream-write", region_name="us-east-1")

    # Work on a copy: appending the instrument dimension to the caller's list
    # would add it again on every call that reuses the same list.
    dimensions = list(dimensions or [])

    # Use current time in milliseconds if no timestamp is provided
    if not timestamp:
        timestamp = int(time.time() * 1000)

    # Default measure_value to current UTC timestamp if not provided
    if measure_value is None:
        measure_value = str(datetime.now(timezone.utc).timestamp())
    swxsoc.log.info(f"Using timestamp: {timestamp}")

    # Lowercase instrument name for consistency if provided
    if instrument_name:
        instrument_name = instrument_name.lower()

    # Add instrument_name as a dimension if provided
    if instrument_name and instrument_name in swxsoc.config["mission"]["inst_names"]:
        dimensions.append({"Name": "InstrumentName", "Value": instrument_name})
    else:
        swxsoc.log.info(
            "No valid instrument name provided. Skipping instrument dimension."
        )

    try:
        # Get mission name from the swxsoc configuration
        mission_name = swxsoc.config["mission"]["mission_name"]

        # Define database and table names based on mission and environment
        database_name = f"{mission_name}_sdc_aws_logs"
        table_name = f"{mission_name}_measures_table"
        if os.getenv("LAMBDA_ENVIRONMENT") != "PRODUCTION":
            database_name = f"dev-{database_name}"
            table_name = f"dev-{table_name}"

        record = {
            "Time": str(timestamp),
            "Dimensions": dimensions,
            "MeasureName": measure_name,
            "MeasureValue": str(measure_value),
            "MeasureValueType": measure_value_type,
        }

        # Write records to Timestream
        timestream_client.write_records(
            DatabaseName=database_name,
            TableName=table_name,
            Records=[record],
        )
        swxsoc.log.info(
            f"Successfully wrote record {record} to Timestream: {database_name}/{table_name}"
        )
    except Exception as e:
        # Best effort: a failed write is logged and does not raise
        swxsoc.log.error(f"Failed to write to Timestream: {e}")
def _to_milliseconds(dt: datetime) -> int:
"""
Converts a datetime object to milliseconds since epoch.
Args:
dt (datetime): Datetime object to convert.
Returns:
int: Milliseconds since epoch.
"""
if isinstance(dt, Time):
# Convert astropy Time object to a standard datetime object in UTC
dt = dt.to_datetime(timezone=None) # Convert to naive datetime in UTC
return int(dt.timestamp() * 1000)
return int(dt.timestamp() * 1000)
def get_dashboard_id(
    dashboard_name: str, mission_dashboard: Optional[str] = None
) -> Optional[int]:
    """
    Retrieves the dashboard UID by its name. Issues a warning if multiple dashboards with the same name are found.

    The Grafana API key is read from the ``GRAFANA_API_KEY`` environment variable.

    Args:
        dashboard_name (str): Name of the dashboard to retrieve. Only dashboards whose title
            matches exactly are considered.
        mission_dashboard (Optional[str]): Mission name used to build the Grafana host name.
            Defaults to the configured mission name.

    Returns:
        Optional[int]: The ``uid`` field of the first matching dashboard, or None if no dashboard
            matches or the request fails.
            NOTE(review): Grafana documents dashboard UIDs as strings, so the ``int`` annotation
            looks inaccurate - confirm.
    """
    try:
        # Set the base URL and API key for Grafana Annotations API
        # You need to set the GRAFANA_API_KEY environment variables to use this feature
        API_KEY = os.environ.get("GRAFANA_API_KEY", None)
        HEADERS = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        }
        BASE_URL = (
            f"https://grafana.{swxsoc.config['mission']['mission_name']}.swsoc.smce.nasa.gov"
            if not mission_dashboard
            else f"https://grafana.{mission_dashboard}.swsoc.smce.nasa.gov"
        )
        response = requests.get(
            f"{BASE_URL}/api/search", headers=HEADERS, params={"query": dashboard_name}
        )
        response.raise_for_status()
        dashboards = response.json()
    except requests.exceptions.HTTPError as e:
        swxsoc.log.error(f"Failed to retrieve dashboards: {e}")
        return None
    except requests.exceptions.ConnectionError as e:
        swxsoc.log.error(
            f"Failed to connect to Grafana while retrieving dashboards: {e}"
        )
        return None

    # The search is a query match; keep only exact title matches
    matching_dashboards = [
        dashboard
        for dashboard in dashboards
        if "title" in dashboard and dashboard["title"] == dashboard_name
    ]
    if not matching_dashboards:
        swxsoc.log.warning(
            f"Dashboard with title '{dashboard_name}' not found. Annotation will be created without a dashboard."
        )
        return None
    if len(matching_dashboards) > 1:
        swxsoc.log.warning(
            f"Multiple dashboards with title '{dashboard_name}' found. "
            f"Using the first matching dashboard UID ({matching_dashboards[0]['uid']}). Consider using unique dashboard titles."
        )
    return matching_dashboards[0]["uid"]
def get_panel_id(
    dashboard_id: int, panel_name: str, mission_dashboard: Optional[str] = None
) -> Optional[int]:
    """
    Retrieves the panel ID by dashboard UID and panel name. Issues a warning if multiple panels with the same name are found.

    The Grafana API key is read from the ``GRAFANA_API_KEY`` environment variable.

    Args:
        dashboard_id (int): UID of the dashboard.
        panel_name (str): Name of the panel to retrieve. Only top-level panels whose title
            matches exactly are considered; panels without a title are ignored.
        mission_dashboard (Optional[str]): Mission name used to build the Grafana host name.
            Defaults to the configured mission name.

    Returns:
        Optional[int]: The ID of the first matching panel, or None if no panel matches or the
            request fails.
    """
    try:
        # Set the base URL and API key for Grafana Annotations API
        # You need to set the GRAFANA_API_KEY environment variables to use this feature
        API_KEY = os.environ.get("GRAFANA_API_KEY", None)
        HEADERS = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        }
        BASE_URL = (
            f"https://grafana.{swxsoc.config['mission']['mission_name']}.swsoc.smce.nasa.gov"
            if not mission_dashboard
            else f"https://grafana.{mission_dashboard}.swsoc.smce.nasa.gov"
        )
        response = requests.get(
            f"{BASE_URL}/api/dashboards/uid/{dashboard_id}", headers=HEADERS
        )
        response.raise_for_status()
        panels = response.json().get("dashboard", {}).get("panels", [])
    except requests.exceptions.HTTPError as e:
        swxsoc.log.error(
            f"Failed to retrieve panels for dashboard ID {dashboard_id}: {e}"
        )
        return None
    except requests.exceptions.ConnectionError as e:
        swxsoc.log.error(
            f"Failed to retrieve panels for dashboard ID {dashboard_id}: {e}"
        )
        return None

    # ``.get`` so a panel without a "title" key is skipped instead of raising KeyError
    matching_panels = [panel for panel in panels if panel.get("title") == panel_name]
    if not matching_panels:
        swxsoc.log.warning(
            f"Panel with title '{panel_name}' not found in dashboard ID {dashboard_id}. Annotation will be created without a panel."
        )
        return None
    if len(matching_panels) > 1:
        swxsoc.log.warning(
            f"Multiple panels with title '{panel_name}' found in dashboard ID {dashboard_id}. "
            f"Using the first matching panel ID ({matching_panels[0]['id']}). Consider using unique panel titles."
        )
    return matching_panels[0]["id"]
def query_annotations(
    start_time: datetime,
    end_time: Optional[datetime] = None,
    tags: Optional[List[str]] = None,
    limit: Optional[int] = 100,
    dashboard_id: Optional[int] = None,
    panel_id: Optional[int] = None,
    dashboard_name: Optional[str] = None,
    panel_name: Optional[str] = None,
    mission_dashboard: Optional[str] = None,
) -> List[Dict[str, Union[str, int]]]:
    """
    Queries annotations within a specific timeframe with optional filters for tags, dashboard, and panel names.

    The Grafana API key is read from the ``GRAFANA_API_KEY`` environment variable.

    Args:
        start_time (datetime): Start time of the query in UTC.
        end_time (Optional[datetime]): End time of the query; defaults to start_time if None.
        tags (Optional[List[str]]): List of tags to filter the annotations.
        limit (Optional[int]): Maximum number of annotations to retrieve.
        dashboard_id (Optional[int]): UID of the dashboard to filter annotations.
        panel_id (Optional[int]): ID of the panel to filter annotations.
        dashboard_name (Optional[str]): Name of the dashboard to look up UID if `dashboard_id` is not provided.
        panel_name (Optional[str]): Name of the panel to look up ID if `panel_id` is not provided.
            Only used when a dashboard UID is known.
        mission_dashboard (Optional[str]): Mission name used to build the Grafana host name.
            Defaults to the configured mission name.

    Returns:
        List[Dict[str, Union[str, int]]]: List of annotations matching the query criteria, or an
            empty list if the request fails.
    """
    # Look up dashboard and panel IDs if names are provided
    if dashboard_name and not dashboard_id:
        dashboard_id = get_dashboard_id(dashboard_name, mission_dashboard)
    if dashboard_id and panel_name and not panel_id:
        panel_id = get_panel_id(dashboard_id, panel_name, mission_dashboard)

    if not end_time:
        end_time = start_time

    params = {
        "from": _to_milliseconds(start_time),
        "to": _to_milliseconds(end_time),
        "limit": limit,
    }
    # Optional filters are only sent when they have a value
    if tags:
        params["tags"] = tags
    if dashboard_id:
        params["dashboardUID"] = dashboard_id
    if panel_id:
        params["panelId"] = panel_id

    try:
        # Set the base URL and API key for Grafana Annotations API
        # You need to set the GRAFANA_API_KEY environment variables to use this feature
        API_KEY = os.environ.get("GRAFANA_API_KEY", None)
        HEADERS = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        }
        BASE_URL = (
            f"https://grafana.{swxsoc.config['mission']['mission_name']}.swsoc.smce.nasa.gov"
            if not mission_dashboard
            else f"https://grafana.{mission_dashboard}.swsoc.smce.nasa.gov"
        )
        response = requests.get(
            f"{BASE_URL}/api/annotations", headers=HEADERS, params=params
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        swxsoc.log.error(f"Failed to query annotations: {e}")
        return []
    except requests.exceptions.ConnectionError as e:
        swxsoc.log.error(
            f"Failed to connect to Grafana while querying annotations: {e}"
        )
        return []
def create_annotation(
    start_time: datetime,
    text: str,
    tags: List[str],
    end_time: Optional[datetime] = None,
    dashboard_id: Optional[int] = None,
    panel_id: Optional[int] = None,
    dashboard_name: Optional[str] = None,
    panel_name: Optional[str] = None,
    mission_dashboard: Optional[str] = None,
    overwrite: bool = False,
) -> Dict[str, Union[str, int]]:
    """
    Creates a new annotation for a specified event or time period, with optional filtering by dashboard and panel names.

    The Grafana API key is read from the ``GRAFANA_API_KEY`` environment variable.

    Args:
        start_time (datetime): Start time of the annotation in UTC.
        text (str): Annotation text to display.
        tags (List[str]): List of tags for categorizing the annotation.
        end_time (Optional[datetime]): End time of the annotation, if applicable.
        dashboard_id (Optional[int]): UID of the dashboard to associate the annotation.
        panel_id (Optional[int]): ID of the panel to associate the annotation.
        dashboard_name (Optional[str]): Name of the dashboard to look up UID if `dashboard_id` is not provided.
        panel_name (Optional[str]): Name of the panel to look up ID if `panel_id` is not provided.
            Only used when a dashboard UID is known.
        mission_dashboard (Optional[str]): Mission name used to build the Grafana host name.
            Defaults to the configured mission name.
        overwrite (bool): If True, existing annotations in the same time range with the same
            tags, dashboard and panel whose text equals `text` are removed before the new
            annotation is created.

    Returns:
        Dict[str, Union[str, int]]: The created annotation data, or an empty dict if the
            request fails.
    """
    # Look up dashboard and panel IDs if names are provided
    if dashboard_name and not dashboard_id:
        dashboard_id = get_dashboard_id(dashboard_name, mission_dashboard)
    if dashboard_id and panel_name and not panel_id:
        panel_id = get_panel_id(dashboard_id, panel_name, mission_dashboard)

    # Overwrite functionality: query and remove existing identical annotations
    if overwrite:
        swxsoc.log.info("Overwriting existing annotations.")
        existing_annotations = query_annotations(
            start_time=start_time,
            end_time=end_time or start_time,
            tags=tags,
            dashboard_id=dashboard_id,
            panel_id=panel_id,
            mission_dashboard=mission_dashboard,
        )
        for annotation in existing_annotations:
            if annotation.get("text") != text:
                continue
            annotation_id = annotation.get("id")
            if annotation_id and remove_annotation_by_id(
                annotation_id, mission_dashboard
            ):
                swxsoc.log.info(
                    f"Removed existing annotation with ID {annotation_id}."
                )

    payload = {
        "time": _to_milliseconds(start_time),
        "text": text,
        "tags": tags,
    }
    # Optional fields are only sent when they have a value
    if end_time:
        payload["timeEnd"] = _to_milliseconds(end_time)
    if dashboard_id:
        payload["dashboardUID"] = dashboard_id
    if panel_id:
        payload["panelId"] = panel_id

    try:
        # Set the base URL and API key for Grafana Annotations API
        # You need to set the GRAFANA_API_KEY environment variables to use this feature
        API_KEY = os.environ.get("GRAFANA_API_KEY", None)
        HEADERS = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        }
        BASE_URL = (
            f"https://grafana.{swxsoc.config['mission']['mission_name']}.swsoc.smce.nasa.gov"
            if not mission_dashboard
            else f"https://grafana.{mission_dashboard}.swsoc.smce.nasa.gov"
        )
        response = requests.post(
            f"{BASE_URL}/api/annotations", headers=HEADERS, json=payload
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        swxsoc.log.error(f"Failed to create annotation: {e}")
        return {}
    except requests.exceptions.ConnectionError as e:
        swxsoc.log.error(
            f"Failed to connect to Grafana while creating annotation: {e}"
        )
        return {}
def remove_annotation_by_id(
    annotation_id: int, mission_dashboard: Optional[str] = None
) -> bool:
    """
    Deletes an annotation by its ID.

    The Grafana API key is read from the ``GRAFANA_API_KEY`` environment variable.

    Args:
        annotation_id (int): The ID of the annotation to delete.
        mission_dashboard (Optional[str]): Mission name used to build the Grafana host name.
            Defaults to the configured mission name.

    Returns:
        bool: True if the annotation was successfully deleted, False otherwise.
    """
    try:
        # Set the base URL and API key for Grafana Annotations API
        # You need to set the GRAFANA_API_KEY environment variables to use this feature
        API_KEY = os.environ.get("GRAFANA_API_KEY", None)
        HEADERS = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        }
        BASE_URL = (
            f"https://grafana.{swxsoc.config['mission']['mission_name']}.swsoc.smce.nasa.gov"
            if not mission_dashboard
            else f"https://grafana.{mission_dashboard}.swsoc.smce.nasa.gov"
        )
        full_url = f"{BASE_URL}/api/annotations/{annotation_id}"
        response = requests.delete(full_url, headers=HEADERS)
        response.raise_for_status()
        # A successful delete is reported as 200 OK; 204 No Content is accepted
        # as well so a body-less success is not reported as a failure.
        return response.status_code in (200, 204)
    except requests.exceptions.HTTPError as e:
        swxsoc.log.error(
            f"Failed to remove annotation with ID {annotation_id}: {e} [swxsoc.util.util]"
        )
        return False
    except requests.exceptions.ConnectionError as e:
        swxsoc.log.error(f"Failed to connect to the server: {e}")
        return False