"""Telecommand builders and interactive prompts for the persistent TM store (PUS service 15)."""
import datetime
import logging
import math
import struct
from typing import Optional, Tuple

from eive_tmtc.config.object_ids import (
    HK_TM_STORE,
    MISC_TM_STORE,
    OK_TM_STORE,
    NOT_OK_TM_STORE,
    CFDP_TM_STORE,
    get_object_ids,
)
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config import TmtcDefinitionWrapper
from tmtccmd.config.tmtc import tmtc_definitions_provider, OpCodeEntry
from tmtccmd.tc import service_provider
from tmtccmd.tc.decorator import ServiceProviderParams
from dateutil.parser import parse
from spacepackets.ecss import PusService  # noqa
from spacepackets.ecss.tc import PusTelecommand
from spacepackets.ecss.pus_15_tm_storage import Subservice
from tmtccmd.util import ObjectIdU32


class OpCode:
    """Operation code keys accepted by :func:`pack_tm_store_commands`."""

    RETRIEVAL_BY_TIME_RANGE = "retrieval_time_range"
    DELETE_UP_TO = "delete_up_to"


class Info:
    """Human-readable descriptions matching the keys in :class:`OpCode`."""

    RETRIEVAL_BY_TIME_RANGE = "Dump Telemetry Packets by time range"
    DELETE_UP_TO = "Delete Telemetry Packets up to time"


_LOGGER = logging.getLogger(__name__)


@service_provider(CustomServiceList.TM_STORE)
def pack_tm_store_commands(p: ServiceProviderParams):
    """Queue the telecommands for the TM store operation selected by ``p.op_code``.

    For :attr:`OpCode.DELETE_UP_TO` the user is prompted for a target store and
    a time; the resulting application data is the store object ID bytes followed
    by the time as a 4-byte big-endian unsigned integer of whole UNIX seconds.

    For :attr:`OpCode.RETRIEVAL_BY_TIME_RANGE` only the info log entry is
    queued; no telecommand is generated.

    :param p: Service provider parameters; ``queue_helper`` and ``op_code`` are used.
    :raises struct.error: If the selected time does not fit into an unsigned
        32-bit integer (negative or too large UNIX timestamp).
    """
    q = p.queue_helper
    o = p.op_code
    if o == OpCode.DELETE_UP_TO:
        # The store description string is not needed here, only the object ID.
        obj_id, _ = store_select_prompt()
        app_data = bytearray(obj_id.as_bytes)
        delete_up_to_time = time_prompt()
        # math.floor already returns an int, sub-second precision is dropped.
        end_stamp = math.floor(delete_up_to_time.timestamp())
        # "!I": network byte order (big-endian), unsigned 32-bit.
        app_data.extend(struct.pack("!I", end_stamp))
        q.add_log_cmd(Info.DELETE_UP_TO)
        q.add_log_cmd(f"Selected Store: {obj_id}")
        q.add_log_cmd(
            f"Deletion up to time "
            f"{datetime.datetime.fromtimestamp(end_stamp, tz=datetime.timezone.utc)}"
        )
        q.add_pus_tc(
            PusTelecommand(
                service=15, subservice=Subservice.DELETE_UP_TO, app_data=app_data
            )
        )
    elif o == OpCode.RETRIEVAL_BY_TIME_RANGE:
        q.add_log_cmd(Info.RETRIEVAL_BY_TIME_RANGE)


@tmtc_definitions_provider
def add_persistent_tm_store_cmd_defs(defs: TmtcDefinitionWrapper):
    """Register the persistent TM store service and its op codes in ``defs``."""
    oce = OpCodeEntry()
    oce.add(keys=OpCode.DELETE_UP_TO, info=Info.DELETE_UP_TO)
    oce.add(keys=OpCode.RETRIEVAL_BY_TIME_RANGE, info=Info.RETRIEVAL_BY_TIME_RANGE)
    defs.add_service(
        CustomServiceList.TM_STORE, "Persistent TM Store", op_code_entry=oce
    )


# Selectable TM stores: raw object ID -> description shown to the user.
STORE_DICT = {
    OK_TM_STORE: "OK Store (Verification)",
    NOT_OK_TM_STORE: "NOT OK Store (Events, Verification Failures..)",
    MISC_TM_STORE: "Miscellaneous Store",
    HK_TM_STORE: "HK TM Store",
    CFDP_TM_STORE: "CFDP TM Store",
}

# Selectable ways of entering a time: menu key -> description shown to the user.
TIME_INPUT_DICT = {
    1: "Full manual input with dateutil.parser.parse",
    2: "Offset from now in seconds",
}


def _prompt_int(message: str) -> Optional[int]:
    """Prompt with ``message`` and return the entered integer, or ``None`` if
    the input is not a valid integer."""
    try:
        return int(input(message))
    except ValueError:
        return None


def time_prompt() -> datetime.datetime:
    """Let the user pick a time input method and return the time entered with it.

    Re-prompts until a key contained in ``TIME_INPUT_DICT`` is entered;
    non-numeric input is treated like any other invalid key.
    """
    print("Available time input types: ")
    for k, v in TIME_INPUT_DICT.items():
        print(f" {k}: {v}")
    while True:
        time_input_key = _prompt_int("Please specify a time input type by key: ")
        if time_input_key in TIME_INPUT_DICT:
            break
        _LOGGER.warning("Invalid key, try again")
    if time_input_key == 1:
        return time_prompt_fully_manually()
    # Key 2 is the only other entry of TIME_INPUT_DICT.
    return time_prompt_offset_from_now()


def time_prompt_fully_manually() -> datetime.datetime:
    """Prompt for a time string and return it parsed by ``dateutil.parser.parse``.

    Re-prompts when the input cannot be parsed.

    NOTE(review): if the entered string carries no time zone, the result is a
    naive datetime, and ``datetime.timestamp()`` interprets naive datetimes as
    local time, not UTC. Confirm whether naive input should be forced to UTC.
    """
    while True:
        raw_time = input(
            "Please enter the time in any format supported by dateutil.parser.parse\n"
            "Recommended format: UTC ISO format YYYY-MM-DDThh:mm:ssZ: "
        )
        try:
            time = parse(raw_time)
        except (ValueError, OverflowError):
            # dateutil's ParserError is a ValueError subclass.
            _LOGGER.warning("Invalid time format, try again")
            continue
        break
    print(f"Parsed timestamp: {time}")
    return time


def time_prompt_offset_from_now() -> datetime.datetime:
    """Prompt for an offset in seconds and return the current UTC time shifted by it.

    The offset is rounded down to whole seconds and may be negative.
    Re-prompts when the input is not a finite number or the resulting time is
    out of the representable range.
    """
    while True:
        raw_offset = input(
            "Please enter the time as a offset from now in seconds. "
            "Negative offset is allowed: "
        )
        try:
            # floor() rejects nan (ValueError) and inf (OverflowError).
            seconds_offset = math.floor(float(raw_offset))
            time_now_with_offset = datetime.datetime.now(
                tz=datetime.timezone.utc
            ) + datetime.timedelta(seconds=seconds_offset)
        except (ValueError, OverflowError):
            _LOGGER.warning("Invalid offset, try again")
            continue
        break
    print(f"Absolute resulting time: {time_now_with_offset}")
    return time_now_with_offset


def store_select_prompt() -> Tuple[ObjectIdU32, str]:
    """Let the user choose one of the stores in ``STORE_DICT``.

    Re-prompts until a listed index is entered; non-numeric input is treated
    like any other invalid index.

    :return: Tuple of the object ID looked up in ``get_object_ids()`` for the
        chosen store and the store's description string.
    """
    obj_id_dict = get_object_ids()
    print("Available TM stores:")
    # Menu index -> (raw object ID, description), in STORE_DICT order.
    idx_to_obj_id = dict(enumerate(STORE_DICT.items()))
    for idx, (_, store_str) in idx_to_obj_id.items():
        print(f" {idx}: {store_str}")
    while True:
        target_index = _prompt_int(
            "Please enter the target store for the TM store transaction: "
        )
        obj_id_and_store_str = idx_to_obj_id.get(target_index)
        if obj_id_and_store_str is not None:
            break
        _LOGGER.warning("Invalid index. Try again")
    obj_id_raw, store_str = obj_id_and_store_str
    # NOTE(review): .get() yields None if the raw ID is missing from the object
    # ID dictionary; presumably all STORE_DICT keys are registered - confirm.
    return obj_id_dict.get(obj_id_raw), store_str