"""Interactive command packing for the persistent TM stores (PUS service 15)."""
import datetime
import enum
import logging
import math
import struct
from typing import Optional, Tuple

from eive_tmtc.config.object_ids import (
    HK_TM_STORE,
    MISC_TM_STORE,
    OK_TM_STORE,
    NOT_OK_TM_STORE,
    CFDP_TM_STORE,
    get_object_ids,
)
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config import TmtcDefinitionWrapper
from tmtccmd.config.tmtc import tmtc_definitions_provider, OpCodeEntry
from tmtccmd.tc import service_provider
from tmtccmd.tc.decorator import ServiceProviderParams
from dateutil.parser import parse
from spacepackets.ecss import PusService  # noqa
from spacepackets.ecss.tc import PusTelecommand
from spacepackets.ecss.pus_15_tm_storage import Subservice
from tmtccmd.util import ObjectIdU32


class OpCode:
    """Op code keys accepted by :func:`pack_tm_store_commands`."""

    RETRIEVAL_BY_TIME_RANGE = "retrieval_time_range"
    DELETE_UP_TO = "delete_up_to"


class Info:
    """Human-readable descriptions matching the :class:`OpCode` entries."""

    RETRIEVAL_BY_TIME_RANGE = "Dump Telemetry Packets by time range"
    DELETE_UP_TO = "Delete Telemetry Packets up to time"


_LOGGER = logging.getLogger(__name__)


def _pack_unix_seconds(time: datetime.datetime) -> bytes:
    """Pack *time* as whole UNIX seconds into a big-endian unsigned 32-bit field."""
    return struct.pack("!I", int(math.floor(time.timestamp())))


def _prompt_for_int(prompt: str) -> Optional[int]:
    """Read one line from stdin and return it as an int, or None if it is not one."""
    try:
        return int(input(prompt))
    except ValueError:
        return None


@service_provider(CustomServiceList.TM_STORE)
def pack_tm_store_commands(p: ServiceProviderParams):
    """Queue the service 15 telecommand selected by ``p.op_code``.

    The store and the time stamp(s) are requested interactively. The
    application data is the object ID of the selected store followed by one
    (deletion) or two (retrieval: start, end) 32-bit UNIX second stamps.
    Op codes other than the ones in :class:`OpCode` queue nothing.
    """
    q = p.queue_helper
    o = p.op_code
    if o == OpCode.DELETE_UP_TO:
        obj_id, _ = store_select_prompt()
        app_data = bytearray(obj_id.as_bytes)
        delete_up_to_time = time_prompt("Determining deletion end time")
        app_data.extend(_pack_unix_seconds(delete_up_to_time))
        q.add_log_cmd(Info.DELETE_UP_TO)
        q.add_log_cmd(f"Selected Store: {obj_id}")
        q.add_log_cmd(f"Deletion up to time {delete_up_to_time}")
        q.add_pus_tc(
            PusTelecommand(
                service=15, subservice=Subservice.DELETE_UP_TO, app_data=app_data
            )
        )
    elif o == OpCode.RETRIEVAL_BY_TIME_RANGE:
        obj_id, _ = store_select_prompt()
        app_data = bytearray(obj_id.as_bytes)
        start_of_dump_time = time_prompt("Determining retrieval start time")
        end_of_dump_time = time_prompt("Determining retrieval end time")
        app_data.extend(_pack_unix_seconds(start_of_dump_time))
        app_data.extend(_pack_unix_seconds(end_of_dump_time))
        # The info line is queued once, after the prompts, like in the
        # deletion branch.
        q.add_log_cmd(Info.RETRIEVAL_BY_TIME_RANGE)
        q.add_log_cmd(f"Selected Store: {obj_id}")
        q.add_log_cmd(
            f"Retrieval from time {start_of_dump_time} up to time {end_of_dump_time}"
        )
        q.add_pus_tc(
            PusTelecommand(
                service=15,
                subservice=Subservice.RETRIEVAL_BY_TIME_RANGE,
                app_data=app_data,
            )
        )


@tmtc_definitions_provider
def add_persistent_tm_store_cmd_defs(defs: TmtcDefinitionWrapper):
    """Register the persistent TM store service and its op codes in *defs*."""
    oce = OpCodeEntry()
    oce.add(keys=OpCode.DELETE_UP_TO, info=Info.DELETE_UP_TO)
    oce.add(keys=OpCode.RETRIEVAL_BY_TIME_RANGE, info=Info.RETRIEVAL_BY_TIME_RANGE)
    defs.add_service(
        CustomServiceList.TM_STORE, "Persistent TM Store", op_code_entry=oce
    )


class TmStoreSelect(enum.IntEnum):
    """Menu indices offered by :func:`store_select_prompt`."""

    OK_TM_STORE = 0
    NOT_OK_TM_STORE = 1
    MISC_TM_STORE = 2
    HK_TM_STORE = 3
    CFDP_TM_STORE = 4


# Maps each menu index to (raw object ID, description shown to the user).
STORE_DICT = {
    TmStoreSelect.OK_TM_STORE: (OK_TM_STORE, "OK Store (Verification)"),
    TmStoreSelect.NOT_OK_TM_STORE: (
        NOT_OK_TM_STORE,
        "NOT OK Store (Events, Verification Failures..)",
    ),
    TmStoreSelect.MISC_TM_STORE: (MISC_TM_STORE, "Miscellaneous Store"),
    TmStoreSelect.HK_TM_STORE: (HK_TM_STORE, "HK TM Store"),
    TmStoreSelect.CFDP_TM_STORE: (CFDP_TM_STORE, "CFDP TM Store"),
}

# Maps each time input menu key to the description shown to the user.
TIME_INPUT_DICT = {
    1: "Full manual input with dateutil.parser.parse",
    2: "Offset from now in seconds",
}


def time_prompt(info_str: str) -> datetime.datetime:
    """Ask the user how to enter a time, then return the entered time.

    :param info_str: Context printed in front of the list of input types.
    :return: The datetime produced by the selected input helper.

    Input which is not one of the keys of ``TIME_INPUT_DICT`` (including
    non-numeric input) is rejected with a warning and asked for again.
    """
    print(f"{info_str}. Available time input types: ")
    for k, v in TIME_INPUT_DICT.items():
        print(f" {k}: {v}")
    while True:
        time_input_key = _prompt_for_int("Please specify a time input type by key: ")
        if time_input_key not in TIME_INPUT_DICT:
            _LOGGER.warning("Invalid key, try again")
            continue
        break
    if time_input_key == 1:
        return time_prompt_fully_manually()
    # Key 2 is the only other entry in TIME_INPUT_DICT.
    return time_prompt_offset_from_now()


def time_prompt_fully_manually() -> datetime.datetime:
    """Prompt for a time string and parse it with ``dateutil.parser.parse``.

    Input which can not be parsed is rejected with a warning and asked for
    again.

    NOTE(review): input without a time zone yields a naive datetime, and the
    caller's ``.timestamp()`` interprets naive values as local time, not UTC.
    Confirm whether naive input should be treated as UTC instead.
    """
    while True:
        try:
            time = parse(
                input(
                    "Please enter the time in any format supported by dateutil.parser.parse\n"
                    "Recommended format: UTC ISO format YYYY-MM-DDThh:mm:ssZ: "
                )
            )
        except (ValueError, OverflowError):
            _LOGGER.warning("Invalid time format, try again")
            continue
        break
    print(f"Parsed timestamp: {time}")
    return time


def time_prompt_offset_from_now() -> datetime.datetime:
    """Prompt for an offset in seconds and return the current UTC time plus it.

    The offset is floored to whole seconds and may be negative. Input which is
    not a finite number, or which pushes the result out of the datetime range,
    is rejected with a warning and asked for again.
    """
    while True:
        try:
            seconds_offset = math.floor(
                float(
                    input(
                        "Please enter the time as a offset from now in seconds. Negative offset is allowed: "
                    )
                )
            )
            time_now_with_offset = datetime.datetime.now(
                tz=datetime.timezone.utc
            ) + datetime.timedelta(seconds=seconds_offset)
        except (ValueError, OverflowError):
            # ValueError: not a number, or NaN. OverflowError: infinity or an
            # offset too large for timedelta / datetime.
            _LOGGER.warning("Invalid offset, try again")
            continue
        break
    print(f"Absolute resulting time: {time_now_with_offset}")
    return time_now_with_offset


def store_select_prompt() -> Tuple[ObjectIdU32, str]:
    """Let the user pick one of the stores in ``STORE_DICT``.

    :return: Tuple of the object ID looked up via ``get_object_ids()`` for the
        selected store and the description of that store.

    Non-numeric input and indices without a store are rejected with a warning
    and asked for again.
    """
    obj_id_dict = get_object_ids()
    print("Available TM stores:")
    for k, v in STORE_DICT.items():
        print(f" {k}: {v[1]}")
    while True:
        target_index = _prompt_for_int(
            "Please enter the target store for the TM store transaction: "
        )
        try:
            # The enum constructor raises ValueError for values which are not
            # a member (this also covers None from non-numeric input).
            obj_id_and_desc = STORE_DICT[TmStoreSelect(target_index)]
        except (ValueError, KeyError):
            _LOGGER.warning("Invalid index. Try again")
            continue
        break
    obj_id_raw, description = obj_id_and_desc
    obj_id = obj_id_dict.get(obj_id_raw)
    print(f"Selected store: {obj_id} ({description})")
    return obj_id, description