import datetime
import enum
import logging
import math
import struct
from typing import Tuple

from dateutil.parser import parse
from spacepackets.ecss.pus_15_tm_storage import Subservice
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config import CmdTreeNode, TmtcDefinitionWrapper
from tmtccmd.config.tmtc import OpCodeEntry
from tmtccmd.tmtc.queue import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32

from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import (
    CFDP_TM_STORE,
    HK_TM_STORE,
    MISC_TM_STORE,
    NOT_OK_TM_STORE,
    OK_TM_STORE,
    get_object_ids,
)


class CustomSubservice(enum.IntEnum):
    """Service 15 subservice numbers used here in addition to ``Subservice``."""

    DELETE_BY_TIME_RANGE = 128


class OpCode:
    """Command path keys understood by :func:`pack_tm_store_commands`."""

    RETRIEVAL_BY_TIME_RANGE = "retrieval_time_range"
    DELETE_UP_TO = "delete_up_to"
    DELETE_BY_TIME_RANGE = "delete_time_range"


class Info:
    """Human-readable descriptions matching the keys in :class:`OpCode`."""

    RETRIEVAL_BY_TIME_RANGE = "Dump Telemetry Packets by time range"
    DELETE_UP_TO = "Delete Telemetry Packets up to time"
    DELETE_BY_TIME_RANGE = "Delete Telemetry by time range"


_LOGGER = logging.getLogger(__name__)


def _pack_unix_seconds(time: datetime.datetime) -> bytes:
    """Encode *time* as whole POSIX seconds in a big-endian unsigned 32-bit field.

    The timestamp is floored to whole seconds. ``struct.pack`` raises
    ``struct.error`` if the result does not fit into 32 unsigned bits.
    """
    return struct.pack("!I", int(math.floor(time.timestamp())))


def _prompt_time_range_app_data(start_prompt: str, end_prompt: str):
    """Prompt for a store and a start/end time and build the application data.

    The prompts are issued in the order store, start time, end time.

    :return: Tuple of the selected object ID, the application data (object ID
        bytes followed by the start and end timestamps), the start time and the
        end time.
    """
    obj_id, _ = store_select_prompt()
    app_data = bytearray(obj_id.as_bytes)
    start_time = time_prompt(start_prompt)
    end_time = time_prompt(end_prompt)
    app_data.extend(_pack_unix_seconds(start_time))
    app_data.extend(_pack_unix_seconds(end_time))
    return obj_id, app_data, start_time, end_time


def pack_tm_store_commands(q: DefaultPusQueueHelper, cmd_path: str) -> None:
    """Interactively build a PUS service 15 telecommand and add it to the queue.

    Depending on ``cmd_path`` the user is prompted for a target store and one
    or two times. The application data consists of the object ID bytes of the
    selected store followed by each time as big-endian unsigned 32-bit whole
    POSIX seconds. Log entries describing the command are queued before the
    telecommand itself.

    :param q: Queue helper which receives the log entries and the telecommand.
    :param cmd_path: One of the :class:`OpCode` values. Any other value is
        ignored and nothing is queued.
    """
    if cmd_path == OpCode.DELETE_UP_TO:
        obj_id, _ = store_select_prompt()
        app_data = bytearray(obj_id.as_bytes)
        delete_up_to_time = time_prompt("Determining deletion end time")
        app_data.extend(_pack_unix_seconds(delete_up_to_time))
        q.add_log_cmd(Info.DELETE_UP_TO)
        q.add_log_cmd(f"Selected Store: {obj_id}")
        q.add_log_cmd(f"Deletion up to time {delete_up_to_time}")
        q.add_pus_tc(
            PusTelecommand(
                service=15, subservice=Subservice.DELETE_UP_TO, app_data=app_data
            )
        )
    elif cmd_path == OpCode.RETRIEVAL_BY_TIME_RANGE:
        obj_id, app_data, start_of_dump_time, end_of_dump_time = (
            _prompt_time_range_app_data(
                "Determining retrieval start time", "Determining retrieval end time"
            )
        )
        # The description is queued exactly once, like in the DELETE_UP_TO branch.
        q.add_log_cmd(Info.RETRIEVAL_BY_TIME_RANGE)
        q.add_log_cmd(f"Selected Store: {obj_id}")
        q.add_log_cmd(
            f"Retrieval from time {start_of_dump_time} up to time {end_of_dump_time}"
        )
        q.add_pus_tc(
            PusTelecommand(
                service=15,
                subservice=Subservice.RETRIEVAL_BY_TIME_RANGE,
                app_data=app_data,
            )
        )
    elif cmd_path == OpCode.DELETE_BY_TIME_RANGE:
        obj_id, app_data, start_of_dump_time, end_of_dump_time = (
            _prompt_time_range_app_data(
                "Determining deletion start time", "Determining deletion end time"
            )
        )
        # The description is queued exactly once, like in the DELETE_UP_TO branch.
        q.add_log_cmd(Info.DELETE_BY_TIME_RANGE)
        q.add_log_cmd(f"Selected Store: {obj_id}")
        q.add_log_cmd(
            f"Deletion from time {start_of_dump_time} up to time {end_of_dump_time}"
        )
        q.add_pus_tc(
            PusTelecommand(
                service=15,
                subservice=CustomSubservice.DELETE_BY_TIME_RANGE,
                app_data=app_data,
            )
        )


def create_persistent_tm_store_node() -> CmdTreeNode:
    """Create the ``tm_store`` command tree node with one child per op code."""
    node = CmdTreeNode("tm_store", "Persistent TM Store")
    node.add_child(CmdTreeNode(OpCode.DELETE_UP_TO, Info.DELETE_UP_TO))
    node.add_child(CmdTreeNode(OpCode.DELETE_BY_TIME_RANGE, Info.DELETE_BY_TIME_RANGE))
    node.add_child(
        CmdTreeNode(OpCode.RETRIEVAL_BY_TIME_RANGE, Info.RETRIEVAL_BY_TIME_RANGE)
    )
    return node


def add_persistent_tm_store_cmd_defs(defs: TmtcDefinitionWrapper):
    """Register the persistent TM store op codes as a service on *defs*."""
    oce = OpCodeEntry()
    oce.add(keys=OpCode.DELETE_UP_TO, info=Info.DELETE_UP_TO)
    oce.add(keys=OpCode.RETRIEVAL_BY_TIME_RANGE, info=Info.RETRIEVAL_BY_TIME_RANGE)
    oce.add(keys=OpCode.DELETE_BY_TIME_RANGE, info=Info.DELETE_BY_TIME_RANGE)
    defs.add_service(
        CustomServiceList.TM_STORE, "Persistent TM Store", op_code_entry=oce
    )
class TmStoreSelect(enum.IntEnum):
    """Numeric keys the user enters to select a TM store."""

    OK_TM_STORE = 0
    NOT_OK_TM_STORE = 1
    MISC_TM_STORE = 2
    HK_TM_STORE = 3
    CFDP_TM_STORE = 4


# Maps each selectable store to a tuple of (raw object ID, description).
STORE_DICT = {
    TmStoreSelect.OK_TM_STORE: (OK_TM_STORE, "OK Store (Verification)"),
    TmStoreSelect.NOT_OK_TM_STORE: (
        NOT_OK_TM_STORE,
        "NOT OK Store (Events, Verification Failures..)",
    ),
    TmStoreSelect.MISC_TM_STORE: (MISC_TM_STORE, "Miscellaneous Store"),
    TmStoreSelect.HK_TM_STORE: (HK_TM_STORE, "HK TM Store"),
    TmStoreSelect.CFDP_TM_STORE: (CFDP_TM_STORE, "CFDP TM Store"),
}

# Maps the key the user enters to a description of the time input method.
TIME_INPUT_DICT = {
    1: "Full manual input with dateutil.parser.parse",
    2: "Offset from now in seconds",
}


def time_prompt(info_str: str) -> datetime.datetime:
    """Ask the user for a time using one of the methods in ``TIME_INPUT_DICT``.

    The user is re-prompted until a valid key is entered. Input which is not an
    integer is treated like an unknown key instead of aborting the prompt.

    :param info_str: Text printed before the list of available input types.
    :return: The time produced by the selected input method.
    :raises ValueError: If a key is listed in ``TIME_INPUT_DICT`` but no input
        method is implemented for it.
    """
    print(f"{info_str}. Available time input types: ")
    for k, v in TIME_INPUT_DICT.items():
        print(f" {k}: {v}")
    while True:
        try:
            time_input_key = int(input("Please specify a time input type by key: "))
        except ValueError:
            _LOGGER.warning("Invalid key, try again")
            continue
        if time_input_key in TIME_INPUT_DICT:
            break
        _LOGGER.warning("Invalid key, try again")
    if time_input_key == 1:
        return time_prompt_fully_manually()
    elif time_input_key == 2:
        return time_prompt_offset_from_now()
    raise ValueError("can not determine datetime")


def time_prompt_fully_manually() -> datetime.datetime:
    """Prompt for a time string and parse it with ``dateutil.parser.parse``.

    Errors raised by the parser for unparsable input are propagated.

    NOTE(review): if the entered string carries no time zone information, the
    parser presumably returns a naive datetime, whose ``timestamp()`` is
    interpreted as local time - confirm whether UTC should be enforced here.

    :return: The parsed time.
    """
    parsed_time = parse(
        input(
            "Please enter the time in any format supported by dateutil.parser.parse\n"
            "Recommended format: UTC ISO format YYYY-MM-DDThh:mm:ssZ: "
        )
    )
    print(f"Parsed timestamp: {parsed_time}")
    return parsed_time


def time_prompt_offset_from_now() -> datetime.datetime:
    """Prompt for an offset in seconds and return the current UTC time plus it.

    The entered value is parsed as a float and floored to whole seconds.
    A ``ValueError`` raised for non-numeric input is propagated.

    :return: Timezone-aware (UTC) time shifted by the entered offset.
    """
    seconds_offset = math.floor(
        float(
            input(
                "Please enter the time as a offset from now in seconds. Negative offset"
                " is allowed: "
            )
        )
    )
    time_now_with_offset = datetime.datetime.now(
        tz=datetime.timezone.utc
    ) + datetime.timedelta(seconds=seconds_offset)
    print(f"Absolute resulting time: {time_now_with_offset}")
    return time_now_with_offset


def store_select_prompt() -> Tuple[ObjectIdU32, str]:
    """Ask the user to pick one of the stores listed in ``STORE_DICT``.

    The user is re-prompted until a valid index is entered. Both non-integer
    input and integers which are not a ``TmStoreSelect`` value count as invalid.

    :return: Tuple of the object ID looked up via ``get_object_ids()`` and the
        store description.
    """
    obj_id_dict = get_object_ids()
    print("Available TM stores:")
    for k, v in STORE_DICT.items():
        print(f" {k}: {v[1]}")
    while True:
        try:
            target_index = int(
                input("Please enter the target store for the TM store transaction: ")
            )
            # TmStoreSelect(...) raises ValueError for an unknown index, so the
            # conversion has to happen inside the try block to allow a retry.
            store_entry = STORE_DICT[TmStoreSelect(target_index)]
        except (ValueError, KeyError):
            _LOGGER.warning("Invalid index. Try again")
            continue
        break
    obj_id_raw, description = store_entry
    obj_id = obj_id_dict.get(obj_id_raw)
    assert obj_id is not None, "selected store is missing in the object ID dictionary"
    print(f"Selected store: {obj_id} ({description})")
    return obj_id, description