"""Command definitions and interactive prompts for the persistent TM store service."""
import datetime
import logging
import math
import struct
from typing import Tuple

from eive_tmtc.config.object_ids import (
    HK_TM_STORE,
    MISC_TM_STORE,
    OK_TM_STORE,
    NOT_OK_TM_STORE,
    CFDP_TM_STORE,
    get_object_ids,
)
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config import TmtcDefinitionWrapper
from tmtccmd.config.tmtc import tmtc_definitions_provider, OpCodeEntry
from tmtccmd.tc import service_provider
from tmtccmd.tc.decorator import ServiceProviderParams
from dateutil.parser import parse
from spacepackets.ecss import PusService
from spacepackets.ecss.tc import PusTelecommand
from spacepackets.ecss.pus_15_tm_storage import Subservice
from tmtccmd.util import ObjectIdU32


class OpCode:
    # Operation code strings compared against ``ServiceProviderParams.op_code``.
    DUMP = "dump"
    DELETE = "delete"


class Info:
    # Human-readable descriptions registered and logged for each op code.
    DUMP = "Dump Telemetry Packets"
    DELETE = "Delete Telemetry Packets"


_LOGGER = logging.getLogger(__name__)


@service_provider(CustomServiceList.TM_STORE)
def pack_tm_store_commands(p: ServiceProviderParams):
    """Handle the TM store op codes.

    For ``OpCode.DELETE`` the user is prompted for a store and an end time; the
    application data is the selected object ID's bytes followed by the end time
    as a big-endian unsigned 32-bit integer of whole seconds. For
    ``OpCode.DUMP`` only a log entry is queued.

    :param p: Service provider parameters; ``queue_helper`` and ``op_code``
        are read from it.
    """
    q = p.queue_helper
    o = p.op_code
    if o == OpCode.DELETE:
        obj_id, _ = store_select_prompt()
        app_data = bytearray(obj_id.as_bytes)
        delete_up_to_time = time_prompt()
        # NOTE(review): datetime.timestamp() treats a timezone-naive value as
        # local time, while the prompt recommends UTC - confirm this is intended.
        end_stamp = int(math.floor(delete_up_to_time.timestamp()))
        # "!I" packs a network-order unsigned 32-bit value; struct.error is
        # raised for timestamps outside that range.
        app_data.extend(struct.pack("!I", end_stamp))
        q.add_log_cmd(Info.DELETE)
        q.add_log_cmd(f"Selected Store: {obj_id}")
        q.add_log_cmd(f"Deletion up to time {delete_up_to_time}")
        # NOTE(review): the telecommand is only printed, never added to the
        # queue. This looks like a deliberately disabled send - confirm before
        # re-enabling, since the command deletes stored telemetry.
        print(
            PusTelecommand(
                service=15, subservice=Subservice.DELETE_UP_TO, app_data=app_data
            )
        )
    elif o == OpCode.DUMP:
        # Only the log entry is queued here; no telecommand is built.
        q.add_log_cmd(Info.DUMP)


@tmtc_definitions_provider
def add_persistent_tm_store_cmd_defs(defs: TmtcDefinitionWrapper):
    """Register the delete and dump op codes under the TM store service.

    :param defs: Definition wrapper the service entry is added to.
    """
    oce = OpCodeEntry()
    oce.add(keys=OpCode.DELETE, info=Info.DELETE)
    oce.add(keys=OpCode.DUMP, info=Info.DUMP)
    defs.add_service(
        CustomServiceList.TM_STORE, "Persistent TM Store", op_code_entry=oce
    )


# Maps each selectable store's object ID constant to the label shown in the
# selection prompt. Insertion order defines the indices offered to the user.
STORE_DICT = {
    OK_TM_STORE: "OK Store (Verification)",
    NOT_OK_TM_STORE: "NOT OK Store (Events, Verification Failures..)",
    MISC_TM_STORE: "Miscellaneous Store",
    HK_TM_STORE: "HK TM Store",
    CFDP_TM_STORE: "CFDP TM Store",
}


def time_prompt() -> datetime.datetime:
    """Ask the user for the deletion end time and return it as a datetime.

    The input is handed to ``dateutil.parser.parse`` without any error
    handling, so whatever that call raises on unparsable input propagates to
    the caller.

    :return: The parsed timestamp, which is also echoed to stdout.
    """
    # TODO: Add support for offset from now in seconds
    delete_up_to_time = parse(
        input(
            "Please enter delete end time in any format supported by dateutil.parser.parse\n"
            "Recommended format: UTC ISO format YYYY-MM-DDThh:mm:ssZ: "
        )
    )
    print(f"Parsed timestamp: {delete_up_to_time}")
    return delete_up_to_time


def store_select_prompt() -> Tuple[ObjectIdU32, str]:
    """Let the user pick one of the stores listed in ``STORE_DICT``.

    The prompt repeats until a listed index is entered. Non-numeric input is
    treated like an unknown index instead of aborting with ``ValueError``.

    :return: The entry looked up in ``get_object_ids()`` for the chosen store
        (``None`` if that mapping has no such key) and the store's label.
    """
    obj_id_dict = get_object_ids()
    print("Available TM stores:")
    # Index -> (object ID constant, label), in STORE_DICT insertion order.
    idx_to_obj_id = dict(enumerate(STORE_DICT.items()))
    for idx, (_, label) in idx_to_obj_id.items():
        print(f"{idx}: {label}")
    while True:
        raw_index = input(
            "Please enter the target store for the TM store transaction: "
        )
        try:
            target_index = int(raw_index)
        except ValueError:
            # Not an integer at all: ask again rather than crash the prompt.
            _LOGGER.warning("Invalid index. Try again")
            continue
        obj_id_and_store_str = idx_to_obj_id.get(target_index)
        if obj_id_and_store_str is None:
            _LOGGER.warning("Invalid index. Try again")
            continue
        break
    obj_id_raw, store_str = obj_id_and_store_str
    return obj_id_dict.get(obj_id_raw), store_str