import logging import struct from eive_tmtc.config.object_ids import ( HK_TM_STORE, MISC_TM_STORE, OK_TM_STORE, NOT_OK_TM_STORE, CFDP_TM_STORE, get_object_ids, ) from eive_tmtc.config.definitions import CustomServiceList from tmtccmd.config import TmtcDefinitionWrapper from tmtccmd.config.tmtc import tmtc_definitions_provider, OpCodeEntry from tmtccmd.tc import service_provider from tmtccmd.tc.decorator import ServiceProviderParams from dateutil.parser import parse from spacepackets.ecss import PusService from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.pus_15_tm_storage import Subservice from tmtccmd.util import ObjectIdU32 class OpCode: DUMP = "dump" DELETE = "delete" class Info: DUMP = "Dump Telemetry Packets" DELETE = "Delete Telemetry Packets" _LOGGER = logging.getLogger(__name__) @service_provider(CustomServiceList.TM_STORE) def pack_tm_store_commands(p: ServiceProviderParams): q = p.queue_helper o = p.op_code if o == OpCode.DELETE: q.add_log_cmd(Info.DELETE) delete_up_to_time = parse( input( "Please enter delete end time in any format supported by dateutil.parser.parse\n" "Recommended format: UTC ISO format YYYY-MM-DDThh:mm:ssZ: " ) ) print(f"Parsed timestamp: {delete_up_to_time}") end_stamp = delete_up_to_time.timestamp() obj_id = store_select_prompt() app_data = bytearray(obj_id.as_bytes) app_data.extend(struct.pack("!I", end_stamp)) # q.add_pus_tc( print( PusTelecommand( service=15, subservice=Subservice.DELETE_UP_TO, app_data=app_data ) ) # ) elif o == OpCode.DUMP: q.add_log_cmd(Info.DUMP) pass @tmtc_definitions_provider def add_persistent_tm_store_cmd_defs(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(keys=OpCode.DELETE, info=Info.DELETE) oce.add(keys=OpCode.DUMP, info=Info.DUMP) defs.add_service( CustomServiceList.TM_STORE, "Persistent TM Store", op_code_entry=oce ) STORE_DICT = { OK_TM_STORE: "OK Store (Verification)", NOT_OK_TM_STORE: "NOT OK Store (Events, Verification Failures..)", MISC_TM_STORE: "Miscellaneous Store", HK_TM_STORE: "HK TM Store", CFDP_TM_STORE: "CFDP TM Store", } def store_select_prompt() -> ObjectIdU32: obj_id_dict = get_object_ids() print("Available TM stores:") idx_to_obj_id = dict() for idx, (k, v) in enumerate(STORE_DICT.items()): idx_to_obj_id.update({idx: k}) print(f"{idx}: {v}") while True: target_index = int( input("Please enter the target store for the TM store transaction: ") ) obj_id_raw = idx_to_obj_id.get(target_index) if obj_id_raw is None: _LOGGER.warning("Invalid index. Try again") continue break return obj_id_dict.get(obj_id_raw)