diff --git a/deps/tmtccmd b/deps/tmtccmd index e5d12ca..2890c62 160000 --- a/deps/tmtccmd +++ b/deps/tmtccmd @@ -1 +1 @@ -Subproject commit e5d12ca4c0da06338e575701972aaf4180b49635 +Subproject commit 2890c62e31999706ff1a7164b8a9f7cbc7bc34d5 diff --git a/pus_tc/devs/ccsds_handler.py b/pus_tc/devs/ccsds_handler.py index a7e400d..8b21825 100644 --- a/pus_tc/devs/ccsds_handler.py +++ b/pus_tc/devs/ccsds_handler.py @@ -7,9 +7,9 @@ """ import struct -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.tc.packer import TcQueueT from spacepackets.ecss.tc import PusTelecommand +from tmtccmd.tc import QueueHelper +from tmtccmd.utility import ObjectId class CommandIds: @@ -32,84 +32,47 @@ class CommandIds: UPDATE_ON_FALLING_EDGE = 8 -def pack_ccsds_handler_test( - object_id: bytearray, tc_queue: TcQueueT, op_code: str -) -> TcQueueT: - tc_queue.appendleft( - ( - QueueCommands.PRINT, - "Testing CCSDS handler with object id: 0x" + object_id.hex(), - ) - ) +def pack_ccsds_handler_test(object_id: ObjectId, q: QueueHelper, op_code: str): + obyt = object_id.as_bytes + q.add_log_cmd(f"Testing CCSDS handler with object id: {object_id.as_hex_string}") if op_code == "0": - tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Set low rate")) - command = object_id + struct.pack("!I", CommandIds.SET_LOW_RATE) - command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("CCSDS Handler: Set low rate") + command = obyt + struct.pack("!I", CommandIds.SET_LOW_RATE) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "1": - tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Set high rate")) - command = object_id + struct.pack("!I", CommandIds.SET_HIGH_RATE) - command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("CCSDS Handler: Set high rate") + command = obyt + struct.pack("!I", CommandIds.SET_HIGH_RATE) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "2": - tc_queue.appendleft( - (QueueCommands.PRINT, "CCSDS Handler: Enables the transmitter") - ) - command = object_id + struct.pack("!I", CommandIds.EN_TRANSMITTER) - command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("CCSDS Handler: Enables the transmitter") + command = obyt + struct.pack("!I", CommandIds.EN_TRANSMITTER) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "3": - tc_queue.appendleft( - (QueueCommands.PRINT, "CCSDS Handler: Disables the transmitter") - ) - command = object_id + struct.pack("!I", CommandIds.DIS_TRANSMITTER) - command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("CCSDS Handler: Disables the transmitter") + command = obyt + struct.pack("!I", CommandIds.DIS_TRANSMITTER) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "4": - tc_queue.appendleft( - (QueueCommands.PRINT, "CCSDS Handler: Set arbitrary bitrate") - ) + q.add_log_cmd("CCSDS Handler: Set arbitrary bitrate") bitrate = int(input("Specify bit rate (bps): ")) command = ( - object_id + obyt + struct.pack("!I", CommandIds.ARBITRARY_BITRATE) + struct.pack("!I", bitrate) ) - command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "5": - tc_queue.appendleft( - (QueueCommands.PRINT, "CCSDS Handler: Enable tx clock manipulator") - ) - command = object_id + struct.pack("!I", CommandIds.ENABLE_TX_CLK_MANIPULATOR) - command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("CCSDS Handler: Enable tx clock manipulator") + command = obyt + struct.pack("!I", CommandIds.ENABLE_TX_CLK_MANIPULATOR) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "6": - tc_queue.appendleft( - (QueueCommands.PRINT, "CCSDS Handler: Disable tx clock manipulator") - ) - command = object_id + struct.pack("!I", CommandIds.DISABLE_TX_CLK_MANIPULATOR) - command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("CCSDS Handler: Disable tx clock manipulator") + command = obyt + struct.pack("!I", CommandIds.DISABLE_TX_CLK_MANIPULATOR) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "7": - tc_queue.appendleft( - ( - QueueCommands.PRINT, - "CCSDS Handler: Update tx data on rising edge of tx clock", - ) - ) - command = object_id + struct.pack("!I", CommandIds.UPDATE_ON_RISING_EDGE) - command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("CCSDS Handler: Update tx data on rising edge of tx clock") + command = obyt + struct.pack("!I", CommandIds.UPDATE_ON_RISING_EDGE) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "8": - tc_queue.appendleft( - ( - QueueCommands.PRINT, - "CCSDS Handler: Update tx data on falling edge of tx clock", - ) - ) - command = object_id + struct.pack("!I", CommandIds.UPDATE_ON_FALLING_EDGE) - command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) - - return tc_queue + q.add_log_cmd("CCSDS Handler: Update tx data on falling edge of tx clock") + command = obyt + struct.pack("!I", CommandIds.UPDATE_ON_FALLING_EDGE) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) diff --git a/pus_tc/devs/gps.py b/pus_tc/devs/gps.py index c7577fc..1cc39ed 100644 --- a/pus_tc/devs/gps.py +++ b/pus_tc/devs/gps.py @@ -1,12 +1,9 @@ import enum from config.definitions import CustomServiceList -from tmtccmd.config import add_op_code_entry, add_service_op_code_entry +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command -from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT from tmtccmd.logging import get_console_logger -from tmtccmd.tc.definitions import TcQueueT -from tmtccmd.tc.pus_8_funccmd import generate_action_command LOGGER = get_console_logger() @@ -42,13 +39,12 @@ def add_gps_cmds(cmd_dict: ServiceOpCodeDictT): ) -def pack_gps_command(object_id: bytes, tc_queue: TcQueueT, op_code: str): +def pack_gps_command(object_id: bytes, q: QueueHelper, op_code: str): if op_code in OpCodes.RESET_GNSS: # TODO: This needs to be re-implemented LOGGER.warning("Reset pin handling needs to be re-implemented") if op_code in OpCodes.REQ_OS_HK: - tc_queue.appendleft((QueueCommands.PRINT, f"GMSS: {Info.REQ_OS_HK}")) - cmd = generate_one_hk_command( - sid=make_sid(object_id=object_id, set_id=SetIds.HK), ssc=0 + q.add_log_cmd(f"GMSS: {Info.REQ_OS_HK}") + q.add_pus_tc( + generate_one_hk_command(sid=make_sid(object_id=object_id, set_id=SetIds.HK)) ) - tc_queue.appendleft(cmd.pack_command_tuple()) diff --git a/pus_tc/devs/ploc_memory_dumper.py b/pus_tc/devs/ploc_memory_dumper.py index 36edc8e..85f27fa 100644 --- a/pus_tc/devs/ploc_memory_dumper.py +++ b/pus_tc/devs/ploc_memory_dumper.py @@ -8,36 +8,30 @@ """ import struct -from tmtccmd.config.definitions import QueueCommands - -from tmtccmd.tc.packer import TcQueueT from spacepackets.ecss.tc import PusTelecommand +from tmtccmd.tc import QueueHelper +from tmtccmd.utility import ObjectId class ActionIds: DUMP_MRAM = 1 -def pack_ploc_memory_dumper_cmd(object_id: bytearray, tc_queue: TcQueueT, op_code: str): - tc_queue.appendleft( - ( - QueueCommands.PRINT, - "Testing PLOC memory dumper with object id: 0x" + object_id.hex(), - ) +def pack_ploc_memory_dumper_cmd(object_id: ObjectId, q: QueueHelper, op_code: str): + q.add_log_cmd( + f"Testing PLOC memory dumper with object id: {object_id.as_hex_string}" ) if op_code == "0": - tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Dump MRAM")) - command = pack_mram_dump_cmd(object_id) - command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("PLOC Supervisor: Dump MRAM") + command = pack_mram_dump_cmd(object_id.as_bytes) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) -def pack_mram_dump_cmd(object_id: bytearray) -> bytearray: +def pack_mram_dump_cmd(object_id: bytes) -> bytearray: start = int(input("Start address: 0x"), 16) end = int(input("End address: 0x"), 16) - command = bytearray() command = object_id + struct.pack("!I", ActionIds.DUMP_MRAM) command = command + struct.pack("!I", start) command = command + struct.pack("!I", end) - return command + return bytearray(command) diff --git a/pus_tc/devs/plpcdu.py b/pus_tc/devs/plpcdu.py index 1009794..77462b3 100644 --- a/pus_tc/devs/plpcdu.py +++ b/pus_tc/devs/plpcdu.py @@ -11,6 +11,8 @@ from tmtccmd.config import ( add_service_op_code_entry, ) from tmtccmd.tc.definitions import TcQueueT + +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_3_fsfw_hk import ( generate_one_hk_command, make_sid, @@ -197,18 +199,14 @@ def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): ) -def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): +def pack_pl_pcdu_commands(q: QueueHelper, op_code: str): if op_code in OpCodes.SWITCH_ON: - pack_pl_pcdu_mode_cmd( - tc_queue=tc_queue, info=Info.SWITCH_ON, mode=Modes.ON, submode=0 - ) + pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_ON, mode=Modes.ON, submode=0) if op_code in OpCodes.SWITCH_OFF: - pack_pl_pcdu_mode_cmd( - tc_queue=tc_queue, info=Info.SWITCH_OFF, mode=Modes.OFF, submode=0 - ) + pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_OFF, mode=Modes.OFF, submode=0) if op_code in OpCodes.NORMAL_SSR: pack_pl_pcdu_mode_cmd( - tc_queue=tc_queue, + q=q, info=Info.NORMAL_SSR, mode=Modes.NORMAL, submode=submode_mask_to_submode( @@ -217,64 +215,63 @@ def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): ) if op_code in OpCodes.NORMAL_DRO: pack_pl_pcdu_mode_cmd( - tc_queue=tc_queue, + q=q, info=Info.NORMAL_DRO, mode=Modes.NORMAL, submode=submode_mask_to_submode(NormalSubmodesMask.DRO_ON), ) if op_code in OpCodes.NORMAL_X8: pack_pl_pcdu_mode_cmd( - tc_queue=tc_queue, + q=q, info=Info.NORMAL_X8, mode=Modes.NORMAL, submode=submode_mask_to_submode(NormalSubmodesMask.X8_ON), ) if op_code in OpCodes.NORMAL_TX: pack_pl_pcdu_mode_cmd( - tc_queue=tc_queue, + q=q, info=Info.NORMAL_TX, mode=Modes.NORMAL, submode=submode_mask_to_submode(NormalSubmodesMask.TX_ON), ) if op_code in OpCodes.NORMAL_MPA: pack_pl_pcdu_mode_cmd( - tc_queue=tc_queue, + q=q, info=Info.NORMAL_MPA, mode=Modes.NORMAL, submode=submode_mask_to_submode(NormalSubmodesMask.MPA_ON), ) if op_code in OpCodes.NORMAL_HPA: pack_pl_pcdu_mode_cmd( - tc_queue=tc_queue, + q=q, info=Info.NORMAL_HPA, mode=Modes.NORMAL, submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON), ) if op_code in OpCodes.REQ_OS_HK: - tc_queue.appendleft((QueueCommands.PRINT, f"PL PCDU: {Info.REQ_OS_HK}")) - cmd = generate_one_diag_command( - sid=make_sid(object_id=PL_PCDU_ID, set_id=SetIds.ADC), ssc=0 + q.add_log_cmd(f"PL PCDU: {Info.REQ_OS_HK}") + q.add_pus_tc( + generate_one_diag_command( + sid=make_sid(object_id=PL_PCDU_ID, set_id=SetIds.ADC) + ) ) - tc_queue.appendleft(cmd.pack_command_tuple()) if op_code in OpCodes.SWITCH_HPA_ON_PROC: - hpa_on_procedure(tc_queue) + hpa_on_procedure(q) if op_code in OpCodes.INJECT_ALL_ON_FAILURE: pack_failure_injection_cmd( - tc_queue=tc_queue, + q=q, param_id=ParamIds.INJECT_ALL_ON_FAILURE, print_str="All On", ) -def hpa_on_procedure(tc_queue: TcQueueT): +def hpa_on_procedure(q: QueueHelper): delay_dro_to_x8 = request_wait_time() if delay_dro_to_x8 is None: delay_dro_to_x8 = 900 - tc_queue.appendleft( - ( - QueueCommands.PRINT, - f"Starting procedure to switch on PL PCDU HPA with DRO to X8 delay of {delay_dro_to_x8} seconds", - ) + q.add_log_cmd( + f"Starting procedure to switch on PL PCDU HPA with DRO to X8 " + f"delay of {delay_dro_to_x8} seconds" ) pl_pcdu_on = PusTelecommand( service=200, @@ -339,54 +336,52 @@ def hpa_on_procedure(tc_queue: TcQueueT): ) current_time = time.time() - enb_sched = generate_enable_tc_sched_cmd(ssc=0) + enb_sched = generate_enable_tc_sched_cmd() sched_time = current_time + 10 - tc_queue.appendleft(enb_sched.pack_command_tuple()) + q.add_pus_tc(enb_sched) tagged_on_cmd = generate_time_tagged_cmd( release_time=struct.pack("!I", sched_time), tc_to_insert=pl_pcdu_on, - ssc=1, ) - tc_queue.appendleft(tagged_on_cmd.pack_command_tuple()) + q.add_pus_tc(tagged_on_cmd) sched_time += 5 tagged_ssr_cmd = generate_time_tagged_cmd( release_time=struct.pack("!I", sched_time), tc_to_insert=ssr_on, - ssc=2, ) - tc_queue.appendleft(tagged_ssr_cmd.pack_command_tuple()) + q.add_pus_tc(tagged_ssr_cmd) sched_time += 5 tagged_dro_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=dro_on, ssc=3 + release_time=struct.pack("!I", sched_time), tc_to_insert=dro_on ) - tc_queue.appendleft(tagged_dro_cmd.pack_command_tuple()) + q.add_pus_tc(tagged_dro_cmd) sched_time += delay_dro_to_x8 tagged_x8_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=x8_on, ssc=4 + release_time=struct.pack("!I", sched_time), tc_to_insert=x8_on ) - tc_queue.appendleft(tagged_x8_cmd.pack_command_tuple()) + q.add_pus_tc(tagged_x8_cmd) sched_time += 5 tagged_tx_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=tx_on, ssc=5 + release_time=struct.pack("!I", sched_time), tc_to_insert=tx_on ) - tc_queue.appendleft(tagged_tx_cmd.pack_command_tuple()) + q.add_pus_tc(tagged_tx_cmd) sched_time += 5 tagged_mpa_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=mpa_on, ssc=6 + release_time=struct.pack("!I", sched_time), tc_to_insert=mpa_on ) - tc_queue.appendleft(tagged_mpa_cmd.pack_command_tuple()) + q.add_pus_tc(tagged_mpa_cmd) sched_time += 5 tagged_hpa_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=hpa_on, ssc=7 + release_time=struct.pack("!I", sched_time), tc_to_insert=hpa_on ) - tc_queue.appendleft(tagged_hpa_cmd.pack_command_tuple()) + q.add_pus_tc(tagged_hpa_cmd) def request_wait_time() -> Optional[float]: @@ -457,28 +452,23 @@ def pack_wait_time_cmd(tc_queue: TcQueueT, param_id: int, print_str: str): unique_id=param_id, parameter=wait_time, ) - cmd = pack_fsfw_load_param_cmd(ssc=0, app_data=param_data) + cmd = pack_fsfw_load_param_cmd(app_data=param_data) tc_queue.appendleft(cmd.pack_command_tuple()) -def pack_failure_injection_cmd(tc_queue: TcQueueT, param_id: int, print_str: str): - tc_queue.appendleft((QueueCommands.PRINT, f"Inserting {print_str} error")) +def pack_failure_injection_cmd(q: QueueHelper, param_id: int, print_str: str): + q.add_log_cmd(f"Inserting {print_str} error") param_data = pack_boolean_parameter_app_data( object_id=PL_PCDU_ID, domain_id=0, unique_id=param_id, parameter=True ) - cmd = pack_fsfw_load_param_cmd(ssc=0, app_data=param_data) - tc_queue.appendleft(cmd.pack_command_tuple()) + q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=param_data)) -def pack_pl_pcdu_mode_cmd(tc_queue: TcQueueT, info: str, mode: Modes, submode: int): - tc_queue.appendleft( - ( - QueueCommands.PRINT, - info, +def pack_pl_pcdu_mode_cmd(q: QueueHelper, info: str, mode: Modes, submode: int): + q.add_log_cmd(info) + mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=mode, submode=submode) + q.add_pus_tc( + PusTelecommand( + service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=mode_data ) ) - mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=mode, submode=submode) - mode_cmd = PusTelecommand( - service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=mode_data - ) - tc_queue.appendleft(mode_cmd.pack_command_tuple()) diff --git a/pus_tc/devs/reaction_wheels.py b/pus_tc/devs/reaction_wheels.py index 25cf89a..591b69f 100644 --- a/pus_tc/devs/reaction_wheels.py +++ b/pus_tc/devs/reaction_wheels.py @@ -130,7 +130,7 @@ def add_rw_cmds(cmd_dict: ServiceOpCodeDictT): def pack_single_rw_test_into( object_id: bytes, rw_idx: int, q: QueueHelper, op_code: str -) -> TcQueueT: +): if op_code in OpCodesDevs.SPEED: speed = int(input("Specify speed [0.1 RPM]: ")) ramp_time = int(input("Specify ramp time [ms]: ")) @@ -166,28 +166,30 @@ def pack_single_rw_test_into( sid=make_sid(object_id=object_id, set_id=RwSetIds.STATUS_SET_ID) ) ) - return q -def pack_rw_ass_cmds(tc_queue: TcQueueT, object_id: bytes, op_code: str): +def pack_rw_ass_cmds(q: QueueHelper, object_id: bytes, op_code: str): if op_code in OpCodesAss.OFF: data = pack_mode_data(object_id=object_id, mode=Modes.OFF, submode=0) - cmd = PusTelecommand( - service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data + q.add_pus_tc( + PusTelecommand( + service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data + ) ) - tc_queue.appendleft(cmd.pack_command_tuple()) if op_code in OpCodesAss.ON: data = pack_mode_data(object_id=object_id, mode=Modes.ON, submode=0) - cmd = PusTelecommand( - service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data + q.add_pus_tc( + PusTelecommand( + service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data + ) ) - tc_queue.appendleft(cmd.pack_command_tuple()) if op_code in OpCodesAss.NML: data = pack_mode_data(object_id=object_id, mode=Modes.NORMAL, submode=0) - cmd = PusTelecommand( - service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data + q.add_pus_tc( + PusTelecommand( + service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data + ) ) - tc_queue.appendleft(cmd.pack_command_tuple()) def pack_set_speed_command( diff --git a/pus_tc/devs/solar_array_deployment.py b/pus_tc/devs/solar_array_deployment.py index f6a9b69..111496f 100644 --- a/pus_tc/devs/solar_array_deployment.py +++ b/pus_tc/devs/solar_array_deployment.py @@ -5,18 +5,16 @@ @author J. Meier @date 15.02.2021 """ -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.tc.definitions import TcQueueT -from tmtccmd.tc.packer import PusTelecommand +from spacepackets.ecss import PusTelecommand +from tmtccmd.tc import QueueHelper class ActionIds: DEPLOY_SOLAR_ARRAYS = bytearray([0x0, 0x0, 0x0, 0x5]) -def pack_solar_array_deployment_test_into(object_id: bytearray, tc_queue: TcQueueT): - tc_queue.appendleft((QueueCommands.PRINT, "Testing S/A Deployment")) +def pack_solar_array_deployment_test_into(object_id: bytearray, q: QueueHelper): + q.add_log_cmd("Testing S/A Deployment") command = object_id + ActionIds.DEPLOY_SOLAR_ARRAYS - command = PusTelecommand(service=8, subservice=128, ssc=200, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) diff --git a/pus_tc/devs/star_tracker.py b/pus_tc/devs/star_tracker.py index d8c9a94..9345b68 100644 --- a/pus_tc/devs/star_tracker.py +++ b/pus_tc/devs/star_tracker.py @@ -7,12 +7,12 @@ """ import struct -from tmtccmd.config.definitions import QueueCommands - -from tmtccmd.tc.packer import TcQueueT from spacepackets.ecss.tc import PusTelecommand from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.logging import get_console_logger + +from tmtccmd.tc import QueueHelper +from tmtccmd.utility import ObjectId from utility.input_helper import InputHelper @@ -150,623 +150,483 @@ class Submode: FIRMWARE = 2 -def pack_star_tracker_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): - tc_queue.appendleft( - ( - QueueCommands.PRINT, - "Generate command for star tracker with object id: 0x" + object_id.hex(), - ) +def pack_star_tracker_commands(object_id: ObjectId, q: QueueHelper, op_code: str): + q.add_log_cmd( + f"Generate command for star tracker with object id: {object_id.as_hex_string}" ) - + obyt = object_id.as_bytes if op_code == "0": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Mode On, Submode Bootloader") - ) - command = pack_mode_data(object_id, Modes.ON, Submode.BOOTLOADER) - command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Mode On, Submode Bootloader") + data = pack_mode_data(obyt, Modes.ON, Submode.BOOTLOADER) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if op_code == "1": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Mode On, Submode Firmware") - ) - command = pack_mode_data(object_id, Modes.ON, Submode.FIRMWARE) - command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Mode On, Submode Firmware") + data = pack_mode_data(obyt, Modes.ON, Submode.FIRMWARE) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if op_code == "2": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Mode Normal")) - command = pack_mode_data(object_id, Modes.NORMAL, 0) - command = PusTelecommand(service=200, subservice=1, ssc=11, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Mode Normal") + data = pack_mode_data(obyt, Modes.NORMAL, 0) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if op_code == "3": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Mode Off")) - command = pack_mode_data(object_id, Modes.OFF, 0) - command = PusTelecommand(service=200, subservice=1, ssc=12, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Mode Off") + data = pack_mode_data(obyt, Modes.OFF, 0) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if op_code == "4": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Mode Raw")) - command = pack_mode_data(object_id, 3, 0) - command = PusTelecommand(service=200, subservice=1, ssc=13, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Mode Raw") + data = pack_mode_data(obyt, Modes.RAW, 0) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if op_code == "5": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Ping")) - command = object_id + struct.pack("!I", StarTrackerActionIds.PING) - command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Ping") + data = obyt + struct.pack("!I", StarTrackerActionIds.PING) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "6": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Switch to bootloader program") - ) - command = object_id + struct.pack( + q.add_log_cmd("Star tracker: Switch to bootloader program") + data = obyt + struct.pack( "!I", StarTrackerActionIds.SWITCH_TO_BOOTLOADER_PROGRAM ) - command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "7": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Temperature request")) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_TEMPERATURE) - command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Temperature request") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_TEMPERATURE) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "8": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request version")) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_VERSION) - command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request version") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_VERSION) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "9": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request interface")) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_INTERFACE) - command = PusTelecommand(service=8, subservice=128, ssc=34, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request interface") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_INTERFACE) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "10": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request power")) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_POWER) - command = PusTelecommand(service=8, subservice=128, ssc=35, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request power") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_POWER) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "11": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Set subscription parameters") - ) + q.add_log_cmd("Star tracker: Set subscription parameters") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.SUBSCRIPTION) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=36, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "12": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Boot")) - command = object_id + struct.pack("!I", StarTrackerActionIds.BOOT) - command = PusTelecommand(service=8, subservice=128, ssc=37, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Boot") + data = obyt + struct.pack("!I", StarTrackerActionIds.BOOT) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "13": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request time")) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_TIME) - command = PusTelecommand(service=8, subservice=128, ssc=38, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request time") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_TIME) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "14": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request solution")) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_SOLUTION) - command = PusTelecommand(service=8, subservice=128, ssc=39, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request solution") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_SOLUTION) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "15": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Upload image")) + q.add_log_cmd("Star tracker: Upload image") image = get_upload_image() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.UPLOAD_IMAGE) + bytearray(image, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=40, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "16": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Download image")) + q.add_log_cmd("Star tracker: Download image") path = input("Specify storage location (default - /mnt/sd0/startracker): ") if not path: path = FileDefs.download_path - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.DOWNLOAD_IMAGE) + bytearray(path, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=53, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "17": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Set limits")) + q.add_log_cmd("Star tracker: Set limits") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.LIMITS) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=42, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "18": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Set tracking parameters") - ) + q.add_log_cmd("Star tracker: Set tracking parameters") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.TRACKING) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=43, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "19": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Mounting")) + q.add_log_cmd("Star tracker: Mounting") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.MOUNTING) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=44, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "20": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Camera")) + q.add_log_cmd("Star tracker: Camera") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.CAMERA) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "22": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Centroiding")) + q.add_log_cmd("Star tracker: Centroiding") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.CENTROIDING) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=47, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "23": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: LISA")) + q.add_log_cmd("Star tracker: LISA") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.LISA) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=48, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "24": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Matching")) + q.add_log_cmd("Star tracker: Matching") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.MATCHING) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=49, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "25": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Validation")) + q.add_log_cmd("Star tracker: Validation") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.VALIDATION) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=50, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "26": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Algo")) + q.add_log_cmd("Star tracker: Algo") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.ALGO) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=51, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "27": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Take image")) + q.add_log_cmd("Star tracker: Take image") actionid = int(input("Specify parameter ID (take image - 4): ")) - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.TAKE_IMAGE) + struct.pack("!B", actionid) ) - command = PusTelecommand(service=8, subservice=128, ssc=52, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "28": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Stop str helper")) - command = object_id + struct.pack("!I", StarTrackerActionIds.STOP_STR_HELPER) - command = PusTelecommand(service=8, subservice=128, ssc=54, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Stop str helper") + data = obyt + struct.pack("!I", StarTrackerActionIds.STOP_STR_HELPER) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "30": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Set name of download image") - ) + q.add_log_cmd("Star tracker: Set name of download image") filename = input("Specify download image name: ") - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.CHANGE_DOWNLOAD_IMAGE) + bytearray(filename, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=54, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "31": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request histogram")) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_HISTOGRAM) - command = PusTelecommand(service=8, subservice=128, ssc=55, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request histogram") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_HISTOGRAM) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "32": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request contrast")) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_CONTRAST) - command = PusTelecommand(service=8, subservice=128, ssc=56, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request contrast") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_CONTRAST) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "33": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Set json filename")) + q.add_log_cmd("Star tracker: Set json filename") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.SET_JSON_FILE_NAME) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=57, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "35": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Flash read")) - command = pack_read_command(object_id) - command = PusTelecommand(service=8, subservice=128, ssc=59, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Flash read") + data = pack_read_command(obyt) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "36": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Set flash read filename") - ) + q.add_log_cmd("Star tracker: Set flash read filename") filename = input("Specify filename: ") - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.SET_FLASH_READ_FILENAME) + bytearray(filename, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=60, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "37": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Get checksum")) - command = pack_checksum_command(object_id) - command = PusTelecommand(service=8, subservice=128, ssc=61, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Get checksum") + data = pack_checksum_command(obyt) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "38": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Set time")) + q.add_log_cmd("Star tracker: Set time") unix_time = 1640783543 - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.SET_TIME) + struct.pack("!Q", unix_time) ) - command = PusTelecommand(service=8, subservice=128, ssc=61, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "39": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Download Centroid")) + q.add_log_cmd("Star tracker: Download Centroid") id = 0 - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.DOWNLOAD_CENTROID) + struct.pack("!B", id) ) - command = PusTelecommand(service=8, subservice=128, ssc=62, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "41": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Download matched star") - ) + q.add_log_cmd("Star tracker: Download matched star") id = 0 - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.DOWNLOAD_MATCHED_STAR) + struct.pack("!B", id) ) - command = PusTelecommand(service=8, subservice=128, ssc=64, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "42": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Download DB Image")) + q.add_log_cmd("Star tracker: Download DB Image") id = 0 - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.DOWNLOAD_DBIMAGE) + struct.pack("!B", id) ) - command = PusTelecommand(service=8, subservice=128, ssc=65, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "43": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Download Blob Pixel")) + q.add_log_cmd("Star tracker: Download Blob Pixel") id = 0 type = 1 # 0 - normal, 1 - fast - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.DOWNLOAD_BLOBPIXEL) + struct.pack("!B", id) + struct.pack("!B", type) ) - command = PusTelecommand(service=8, subservice=128, ssc=65, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "44": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Download FPGA Image")) + q.add_log_cmd("Star tracker: Download FPGA Image") position = int(input("Start position: ")) length = int(input("Size to download: ")) - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.DOWNLOAD_FPGA_IMAGE) + struct.pack("!I", position) + struct.pack("!I", length) + bytearray(FileDefs.downloadFpgaImagePath, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=66, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "45": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Change donwload FPGA image file name") - ) - command = ( - object_id + q.add_log_cmd("Star tracker: Change donwload FPGA image file name") + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.CHANGE_FPGA_DOWNLOAD_FILE) + bytearray(FileDefs.downloadFpgaImageName, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=67, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "46": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Upload FPGA image")) - command = ( - object_id + q.add_log_cmd("Star tracker: Upload FPGA image") + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.UPLOAD_FPGA_IMAGE) + bytearray(FileDefs.uploadFpgaImageName, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=68, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "47": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: FPGA action")) + q.add_log_cmd("Star tracker: FPGA action") id = 3 - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.FPGA_ACTION) + struct.pack("!B", id) ) - command = PusTelecommand(service=8, subservice=128, ssc=69, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "48": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Unlock")) - command = object_id + struct.pack("!I", StarTrackerActionIds.UNLOCK) - command = PusTelecommand(service=8, subservice=128, ssc=70, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Unlock") + data = obyt + struct.pack("!I", StarTrackerActionIds.UNLOCK) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "49": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Request camera parameters") - ) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_CAMERA_PARAMS) - command = PusTelecommand(service=8, subservice=128, ssc=71, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request camera parameters") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_CAMERA_PARAMS) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "50": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request limits")) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_LIMITS) - command = PusTelecommand(service=8, subservice=128, ssc=72, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request limits") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_LIMITS) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "51": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Set image processor parameters") - ) + q.add_log_cmd("Star tracker: Set image processor parameters") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.IMAGE_PROCESSOR) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=70, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "52": - tc_queue.appendleft( - ( - QueueCommands.PRINT, - "Star tracker: EGSE load ground config camera parameters", - ) - ) - command = ( - object_id + q.add_log_cmd("Star tracker: EGSE load ground config camera parameters") + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.CAMERA) + bytearray(FileDefs.egse_ground_config, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=71, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "53": - tc_queue.appendleft( - ( - QueueCommands.PRINT, - "Star tracker: EGSE load flight config camera parameters", - ) - ) - command = ( - object_id + q.add_log_cmd("Star tracker: EGSE load flight config camera parameters") + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.CAMERA) + bytearray(FileDefs.egse_flight_config, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=72, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "54": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Request log level parameters") - ) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_LOG_LEVEL) - command = PusTelecommand(service=8, subservice=128, ssc=74, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request log level parameters") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_LOG_LEVEL) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "55": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Request mounting parameters") - ) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_MOUNTING) - command = PusTelecommand(service=8, subservice=128, ssc=75, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request mounting parameters") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_MOUNTING) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "56": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Request image processor parameters") - ) - command = object_id + struct.pack( - "!I", StarTrackerActionIds.REQ_IMAGE_PROCESSOR - ) - command = PusTelecommand(service=8, subservice=128, ssc=76, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request image processor parameters") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_IMAGE_PROCESSOR) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "57": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Request centroiding parameters") - ) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_CENTROIDING) - command = PusTelecommand(service=8, subservice=128, ssc=75, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request centroiding parameters") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_CENTROIDING) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "58": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Request lisa parameters") - ) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_LISA) - command = PusTelecommand(service=8, subservice=128, ssc=76, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request lisa parameters") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_LISA) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "59": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Request matching parameters") - ) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_MATCHING) - command = PusTelecommand(service=8, subservice=128, ssc=77, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request matching parameters") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_MATCHING) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "60": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Request tracking parameters") - ) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_TRACKING) - command = PusTelecommand(service=8, subservice=128, ssc=78, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request tracking parameters") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_TRACKING) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "61": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Request validation parameters") - ) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_VALIDATION) - command = PusTelecommand(service=8, subservice=128, ssc=79, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request validation parameters") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_VALIDATION) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "62": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Request algo parameters") - ) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_ALGO) - command = PusTelecommand(service=8, subservice=128, ssc=80, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request algo parameters") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_ALGO) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "63": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Request subscription parameters") - ) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_SUBSCRIPTION) - command = PusTelecommand(service=8, subservice=128, ssc=81, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request subscription parameters") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_SUBSCRIPTION) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "64": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Request log subscription parameters") - ) - command = object_id + struct.pack( - "!I", StarTrackerActionIds.REQ_LOG_SUBSCRIPTION - ) - command = PusTelecommand(service=8, subservice=128, ssc=82, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request log subscription parameters") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_LOG_SUBSCRIPTION) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "65": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Request debug camera parameters") - ) - command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_DEBUG_CAMERA) - command = PusTelecommand(service=8, subservice=128, ssc=83, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Star tracker: Request debug camera parameters") + data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_DEBUG_CAMERA) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "66": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Set log level parameters") - ) + q.add_log_cmd("Star tracker: Set log level parameters") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.LOGLEVEL) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=84, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "67": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Set log subscription parameters") - ) + q.add_log_cmd("Star tracker: Set log subscription parameters") + json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.LOG_SUBSCRIPTION) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=85, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "68": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Set debug camera parameters") - ) + q.add_log_cmd("Star tracker: Set debug camera parameters") json_file = get_config_file() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.DEBUG_CAMERA) + bytearray(json_file, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=86, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "69": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Firmware update")) + q.add_log_cmd("Star tracker: Firmware update") firmware = get_firmware() - command = ( - object_id + data = ( + obyt + struct.pack("!I", StarTrackerActionIds.FIRMWARE_UPDATE) + bytearray(firmware, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=87, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "70": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Disable timestamp generation") - ) - command = object_id + struct.pack( + q.add_log_cmd("Star tracker: Disable timestamp generation") + command = obyt + struct.pack( "!I", StarTrackerActionIds.DISBALE_TIMESTAMP_GENERATION ) - command = PusTelecommand(service=8, subservice=128, ssc=88, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "71": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker: Enable timestamp generation") - ) - command = object_id + struct.pack( + q.add_log_cmd("Star tracker: Enable timestamp generation") + command = obyt + struct.pack( "!I", StarTrackerActionIds.ENABLE_TIMESTAMP_GENERATION ) - command = PusTelecommand(service=8, subservice=128, ssc=89, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) -def pack_read_command(object_id: bytearray) -> bytearray: +def pack_read_command(object_id: bytes) -> bytearray: start_region = StartRegion.STAR_TRACKER_FIRMWARE size = PartitionSize.STAR_TRACKER_FIRMWARE path = input("Specify storage location (default - /mnt/sd0/startracker): ") if not path: path = FileDefs.download_path - command = ( + data = ( object_id + struct.pack("!I", StarTrackerActionIds.FLASH_READ) + struct.pack("!B", start_region) + struct.pack("!I", size) + bytearray(path, "utf-8") ) - return command + return bytearray(data) -def pack_checksum_command(object_id: bytearray) -> bytearray: +def pack_checksum_command(object_id: bytes) -> bytearray: start_region = StartRegion.STAR_TRACKER_FIRMWARE address = 0 size = PartitionSize.STAR_TRACKER_FIRMWARE - command = ( + data = ( object_id + struct.pack("!I", StarTrackerActionIds.CHECKSUM) + struct.pack("!B", start_region) + struct.pack("!I", address) + struct.pack("!I", size) ) - return command + return bytearray(data) def get_config_file() -> str: diff --git a/pus_tc/devs/str_img_helper.py b/pus_tc/devs/str_img_helper.py index 35c3ff4..266011b 100644 --- a/pus_tc/devs/str_img_helper.py +++ b/pus_tc/devs/str_img_helper.py @@ -10,10 +10,9 @@ """ import struct -from tmtccmd.config.definitions import QueueCommands - -from tmtccmd.tc.packer import TcQueueT from spacepackets.ecss.tc import PusTelecommand +from tmtccmd.tc import QueueHelper +from tmtccmd.utility import ObjectId class Commands: @@ -25,24 +24,15 @@ class ImagePathDefs: uploadFile = "/mnt/sd0/startracker/gemma.bin" -def pack_str_img_helper_command( - object_id: bytearray, tc_queue: TcQueueT, op_code: str -) -> TcQueueT: - tc_queue.appendleft( - ( - QueueCommands.PRINT, - "Testing star tracker image helper object id: 0x" + object_id.hex(), - ) +def pack_str_img_helper_command(object_id: ObjectId, q: QueueHelper, op_code: str): + q.add_log_cmd( + f"Testing star tracker image helper object id: {object_id.as_hex_string}" ) - if op_code == "0": - tc_queue.appendleft( - (QueueCommands.PRINT, "Star tracker image helper: Upload image") - ) + q.add_log_cmd("Star tracker image helper: Upload image") command = ( - object_id + object_id.as_bytes + struct.pack("!I", Commands.UPLOAD_IMAGE) + bytearray(ImagePathDefs.uploadFile, "utf-8") ) - command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) diff --git a/pus_tc/devs/syrlinks_hk_handler.py b/pus_tc/devs/syrlinks_hk_handler.py index 48adbcd..6e919b6 100644 --- a/pus_tc/devs/syrlinks_hk_handler.py +++ b/pus_tc/devs/syrlinks_hk_handler.py @@ -8,11 +8,15 @@ from tmtccmd.config.definitions import QueueCommands from tmtccmd.tc.definitions import TcQueueT + +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command from spacepackets.ecss.tc import PusTelecommand from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes import struct +from tmtccmd.utility import ObjectId + class SetIds: RX_REGISTERS_DATASET = 1 @@ -38,116 +42,87 @@ class CommandIds: def pack_syrlinks_command( - object_id: bytearray, tc_queue: TcQueueT, op_code: str + object_id: ObjectId, q: QueueHelper, op_code: str ) -> TcQueueT: - tc_queue.appendleft( - ( - QueueCommands.PRINT, - "Testing Syrlinks with object id: 0x" + object_id.hex(), - ) - ) - + obyt = object_id.as_bytes + q.add_log_cmd(f"Testing Syrlinks with object id: {object_id.as_hex_string}") if op_code == "0": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set mode off")) - command = pack_mode_data(object_id, Modes.OFF, 0) - command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Set mode off") + data = pack_mode_data(obyt, Modes.OFF, 0) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if op_code == "1": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set mode on")) - command = pack_mode_data(object_id, Modes.ON, 0) - command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Set mode on") + data = pack_mode_data(obyt, Modes.ON, 0) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if op_code == "2": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Mode Normal")) - command = pack_mode_data(object_id, Modes.NORMAL, 0) - command = PusTelecommand(service=200, subservice=1, ssc=11, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Mode Normal") + data = pack_mode_data(obyt, Modes.NORMAL, 0) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if op_code == "3": - tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode standby")) - command = object_id + struct.pack("!I", CommandIds.SET_TX_MODE_STANDBY) - command = PusTelecommand(service=8, subservice=128, ssc=10, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("syrlinks: Set TX mode standby") + data = obyt + struct.pack("!I", CommandIds.SET_TX_MODE_STANDBY) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "4": - tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode modulation")) - command = object_id + struct.pack("!I", CommandIds.SET_TX_MODE_MODULATION) - command = PusTelecommand(service=8, subservice=128, ssc=11, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("syrlinks: Set TX mode modulation") + data = obyt + struct.pack("!I", CommandIds.SET_TX_MODE_MODULATION) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "5": - tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode CW")) - command = object_id + struct.pack("!I", CommandIds.SET_TX_MODE_CW) - command = PusTelecommand(service=8, subservice=128, ssc=12, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("syrlinks: Set TX mode CW") + data = obyt + struct.pack("!I", CommandIds.SET_TX_MODE_CW) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "6": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Get RX Registers")) - sid = make_sid(object_id, SetIds.RX_REGISTERS_DATASET) - command = generate_one_hk_command(sid, 200) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Get RX Registers") + sid = make_sid(obyt, SetIds.RX_REGISTERS_DATASET) + q.add_pus_tc(generate_one_hk_command(sid)) if op_code == "7": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Get TX Registers")) - sid = make_sid(object_id, SetIds.TX_REGISTERS_DATASET) - command = generate_one_hk_command(sid, 201) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Get TX Registers") + sid = make_sid(obyt, SetIds.TX_REGISTERS_DATASET) + q.add_pus_tc(generate_one_hk_command(sid)) if op_code == "8": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read TX status")) - command = object_id + struct.pack("!I", CommandIds.READ_TX_STATUS) - command = PusTelecommand(service=8, subservice=128, ssc=13, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Read TX status") + command = obyt + struct.pack("!I", CommandIds.READ_TX_STATUS) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "9": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read TX waveform")) - command = object_id + struct.pack("!I", CommandIds.READ_TX_WAVEFORM) - command = PusTelecommand(service=8, subservice=128, ssc=14, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Read TX waveform") + command = obyt + struct.pack("!I", CommandIds.READ_TX_WAVEFORM) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "10": - tc_queue.appendleft( - (QueueCommands.PRINT, "Syrlinks: Read TX AGC value high byte") - ) - command = object_id + struct.pack("!I", CommandIds.READ_TX_AGC_VALUE_HIGH_BYTE) - command = PusTelecommand(service=8, subservice=128, ssc=15, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Read TX AGC value high byte") + command = obyt + struct.pack("!I", CommandIds.READ_TX_AGC_VALUE_HIGH_BYTE) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "11": - tc_queue.appendleft( - (QueueCommands.PRINT, "Syrlinks: Read TX AGC value low byte") - ) - command = object_id + struct.pack("!I", CommandIds.READ_TX_AGC_VALUE_LOW_BYTE) - command = PusTelecommand(service=8, subservice=128, ssc=16, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Read TX AGC value low byte") + command = obyt + struct.pack("!I", CommandIds.READ_TX_AGC_VALUE_LOW_BYTE) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "12": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Write LCL config")) - command = object_id + struct.pack("!I", CommandIds.WRITE_LCL_CONFIG) - command = PusTelecommand(service=8, subservice=128, ssc=17, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Write LCL config") + command = obyt + struct.pack("!I", CommandIds.WRITE_LCL_CONFIG) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "13": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read RX status registers")) - command = object_id + struct.pack("!I", CommandIds.READ_RX_STATUS_REGISTERS) - command = PusTelecommand(service=8, subservice=128, ssc=18, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Read RX status registers") + command = obyt + struct.pack("!I", CommandIds.READ_RX_STATUS_REGISTERS) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "14": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read LCL config register")) - command = object_id + struct.pack("!I", CommandIds.READ_LCL_CONFIG_REGISTER) - command = PusTelecommand(service=8, subservice=128, ssc=19, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Read LCL config register") + command = obyt + struct.pack("!I", CommandIds.READ_LCL_CONFIG_REGISTER) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "15": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform OQPSK")) - command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_OQPSK) - command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Set waveform OQPSK") + command = obyt + struct.pack("!I", CommandIds.SET_WAVEFORM_OQPSK) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "16": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform BPSK")) - command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_BPSK) - command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Set waveform BPSK") + command = obyt + struct.pack("!I", CommandIds.SET_WAVEFORM_BPSK) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "17": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set second config")) - command = object_id + struct.pack("!I", CommandIds.SET_SECOND_CONFIG) - command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Set second config") + command = obyt + struct.pack("!I", CommandIds.SET_SECOND_CONFIG) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "18": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Enable debug printout")) - command = object_id + struct.pack("!I", CommandIds.ENABLE_DEBUG) - command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Enable debug printout") + command = obyt + struct.pack("!I", CommandIds.ENABLE_DEBUG) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "19": - tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Disable debug printout")) - command = object_id + struct.pack("!I", CommandIds.DISABLE_DEBUG) - command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd("Syrlinks: Disable debug printout") + command = obyt + struct.pack("!I", CommandIds.DISABLE_DEBUG) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) diff --git a/pus_tc/procedure_packer.py b/pus_tc/procedure_packer.py index 35a8d5a..6c7c8e9 100644 --- a/pus_tc/procedure_packer.py +++ b/pus_tc/procedure_packer.py @@ -1,8 +1,6 @@ """Hook function which packs telecommands based on service and operation code string """ import logging -import os -from collections import deque from typing import Union from pus_tc.devs.rtd import pack_rtd_commands @@ -12,7 +10,6 @@ from tmtccmd.config import CoreServiceList from tmtccmd.logging import get_console_logger from tmtccmd.tc import FeedWrapper from tmtccmd.tc.pus_5_event import ( - pack_generic_service5_test_into, pack_generic_service_5_test_into, ) from tmtccmd.pus.pus_17_test import pack_service_17_ping_command @@ -44,10 +41,8 @@ from pus_tc.devs.plpcdu import pack_pl_pcdu_commands from pus_tc.devs.str_img_helper import pack_str_img_helper_command from pus_tc.system.tcs import pack_tcs_sys_commands from pus_tc.system.proc import pack_proc_commands -from pus_tc.system.controllers import pack_cmd_ctrl_to_prompted_mode from config.definitions import CustomServiceList from config.object_ids import ( - get_object_ids, P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, @@ -65,7 +60,6 @@ from config.object_ids import ( PLOC_SUPV_ID, STAR_TRACKER_ID, PLOC_MEMORY_DUMPER_ID, - GPS_CONTROLLER, CCSDS_HANDLER_ID, PDEC_HANDLER_ID, STR_IMG_HELPER_ID, @@ -156,60 +150,56 @@ def handle_default_procedure(info: DefaultProcedureInfo, wrapper: FeedWrapper): if service == CustomServiceList.STAR_TRACKER.value: object_id = STAR_TRACKER_ID return pack_star_tracker_commands( - object_id=object_id, tc_queue=service_queue, op_code=op_code + object_id=object_id, q=queue_helper, op_code=op_code ) if service == CustomServiceList.STR_IMG_HELPER.value: object_id = STR_IMG_HELPER_ID return pack_str_img_helper_command( - object_id=object_id, tc_queue=service_queue, op_code=op_code + object_id=object_id, q=queue_helper, op_code=op_code ) if service == CustomServiceList.CORE.value: - return pack_core_commands(tc_queue=service_queue, op_code=op_code) + return pack_core_commands(q=queue_helper, op_code=op_code) if service == CustomServiceList.PLOC_MEMORY_DUMPER.value: object_id = PLOC_MEMORY_DUMPER_ID return pack_ploc_memory_dumper_cmd( - object_id=object_id, tc_queue=service_queue, op_code=op_code + object_id=object_id, q=queue_helper, op_code=op_code ) if service == CustomServiceList.ACS.value: - return pack_acs_command(tc_queue=service_queue, op_code=op_code) + return pack_acs_command(q=queue_helper, op_code=op_code) if service == CustomServiceList.GPS_CTRL.value: return pack_gps_command( - object_id=oids.GPS_CONTROLLER, tc_queue=service_queue, op_code=op_code + object_id=oids.GPS_CONTROLLER, q=queue_helper, op_code=op_code ) if service == CustomServiceList.CCSDS_HANDLER.value: return pack_ccsds_handler_test( - object_id=CCSDS_HANDLER_ID, tc_queue=service_queue, op_code=op_code + object_id=CCSDS_HANDLER_ID, q=queue_helper, op_code=op_code ) if service == CustomServiceList.PDEC_HANDLER.value: return pack_ccsds_handler_test( - object_id=PDEC_HANDLER_ID, tc_queue=service_queue, op_code=op_code + object_id=PDEC_HANDLER_ID, q=queue_helper, op_code=op_code ) if service == CustomServiceList.SYRLINKS.value: return pack_syrlinks_command( - object_id=SYRLINKS_HANDLER_ID, tc_queue=service_queue, op_code=op_code + object_id=SYRLINKS_HANDLER_ID, q=queue_helper, op_code=op_code ) if service == CustomServiceList.SA_DEPLYOMENT.value: return pack_solar_array_deployment_test_into( - object_id=SOLAR_ARRAY_DEPLOYMENT_ID, tc_queue=service_queue + object_id=SOLAR_ARRAY_DEPLOYMENT_ID, q=queue_helper ) if service == CustomServiceList.PROCEDURE.value: - return pack_proc_commands(tc_queue=service_queue, op_code=op_code) + return pack_proc_commands(q=queue_helper, op_code=op_code) if service == CustomServiceList.SUS_ASS.value: - return pack_sus_cmds(tc_queue=service_queue, op_code=op_code) + return pack_sus_cmds(q=queue_helper, op_code=op_code) if service == CustomServiceList.PL_PCDU.value: - return pack_pl_pcdu_commands(tc_queue=service_queue, op_code=op_code) + return pack_pl_pcdu_commands(q=queue_helper, op_code=op_code) if service == CustomServiceList.ACS_ASS.value: - return pack_acs_command(tc_queue=service_queue, op_code=op_code) + return pack_acs_command(q=queue_helper, op_code=op_code) if service == CustomServiceList.TCS_ASS.value: - return pack_tcs_sys_commands(tc_queue=service_queue, op_code=op_code) + return pack_tcs_sys_commands(q=queue_helper, op_code=op_code) if service == CustomServiceList.TIME.value: - return pack_set_current_time_ascii_command(tc_queue=service_queue, ssc=0) + return pack_set_current_time_ascii_command(q=queue_helper) if service == CustomServiceList.RW_ASSEMBLY.value: - return pack_rw_ass_cmds( - tc_queue=service_queue, object_id=RW_ASSEMBLY, op_code=op_code - ) - if service == CustomServiceList.CONTROLLERS.value: - return pack_controller_commands(tc_queue=service_queue, op_code=op_code) + return pack_rw_ass_cmds(q=queue_helper, object_id=RW_ASSEMBLY, op_code=op_code) LOGGER.warning(f"Invalid Service {service}") @@ -231,18 +221,3 @@ def pre_tc_send_cb( elif isinstance(queue_entry, QueueCommands): if queue_entry == QueueCommands.PRINT: file_logger.info(queue_info) - - -def pack_service_queue_user( - service: Union[str, int], op_code: str, service_queue: TcQueueT -): - pass - - -def create_total_tc_queue_user() -> TcQueueT: - if not os.path.exists("log"): - os.mkdir("log") - tc_queue = deque() - pack_generic_service5_test_into(tc_queue) - tc_queue.appendleft(pack_service_17_ping_command(ssc=1700).pack_command_tuple()) - return tc_queue diff --git a/pus_tc/system/acs.py b/pus_tc/system/acs.py index 76b3dcc..f995f67 100644 --- a/pus_tc/system/acs.py +++ b/pus_tc/system/acs.py @@ -1,5 +1,6 @@ import enum -from tmtccmd.tc.definitions import TcQueueT + +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import Modes from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID @@ -29,13 +30,13 @@ class DualSideSubmodes(enum.IntEnum): DUAL_SIDE = 2 -def pack_acs_command(tc_queue: TcQueueT, op_code: str): +def pack_acs_command(q: QueueHelper, op_code: str): if op_code in AcsOpCodes.ACS_ASS_A_SIDE: command_mode( object_id=ACS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.A_SIDE, - tc_queue=tc_queue, + q=q, info="Switching to ACS board assembly A side", ) if op_code in AcsOpCodes.ACS_ASS_B_SIDE: @@ -43,7 +44,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str): object_id=ACS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.B_SIDE, - tc_queue=tc_queue, + q=q, info="Switching to ACS board assembly B side", ) if op_code in AcsOpCodes.ACS_ASS_DUAL_MODE: @@ -51,7 +52,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str): object_id=ACS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.DUAL_SIDE, - tc_queue=tc_queue, + q=q, info="Switching to ACS board assembly dual mode", ) if op_code in AcsOpCodes.ACS_ASS_A_ON: @@ -59,7 +60,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str): object_id=ACS_BOARD_ASS_ID, mode=Modes.ON, submode=DualSideSubmodes.A_SIDE, - tc_queue=tc_queue, + q=q, info="Switching ACS board assembly A side on", ) if op_code in AcsOpCodes.ACS_ASS_B_ON: @@ -67,7 +68,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str): object_id=ACS_BOARD_ASS_ID, mode=Modes.ON, submode=DualSideSubmodes.B_SIDE, - tc_queue=tc_queue, + q=q, info="Switching ACS board assembly B side on", ) if op_code in AcsOpCodes.ACS_ASS_DUAL_ON: @@ -75,7 +76,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str): object_id=ACS_BOARD_ASS_ID, mode=Modes.ON, submode=DualSideSubmodes.B_SIDE, - tc_queue=tc_queue, + q=q, info="Switching ACS board assembly dual side on", ) if op_code in AcsOpCodes.ACS_ASS_OFF: @@ -83,18 +84,18 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str): object_id=ACS_BOARD_ASS_ID, mode=Modes.OFF, submode=0, - tc_queue=tc_queue, + q=q, info="Switching to ACS board assembly off", ) -def pack_sus_cmds(tc_queue: TcQueueT, op_code: str): +def pack_sus_cmds(q: QueueHelper, op_code: str): if op_code in SusOpCodes.SUS_ASS_NOM_SIDE: command_mode( object_id=SUS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.A_SIDE, - tc_queue=tc_queue, + q=q, info="Switching to SUS board to nominal side", ) if op_code in SusOpCodes.SUS_ASS_RED_SIDE: @@ -102,7 +103,7 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str): object_id=SUS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.B_SIDE, - tc_queue=tc_queue, + q=q, info="Switching to SUS board to redundant side", ) if op_code in SusOpCodes.SUS_ASS_OFF: @@ -110,7 +111,7 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str): object_id=SUS_BOARD_ASS_ID, mode=Modes.OFF, submode=0, - tc_queue=tc_queue, + q=q, info="Switching SUS board off", ) if op_code in SusOpCodes.SUS_ASS_DUAL_MODE: @@ -118,6 +119,6 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str): object_id=SUS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.DUAL_SIDE, - tc_queue=tc_queue, + q=q, info="Switching to SUS board to dual side", ) diff --git a/pus_tc/system/common.py b/pus_tc/system/common.py index f6fc397..aecbb5f 100644 --- a/pus_tc/system/common.py +++ b/pus_tc/system/common.py @@ -1,7 +1,7 @@ from typing import Union -from tmtccmd.tc.definitions import TcQueueT, QueueCommands from spacepackets.ecss.tc import PusTelecommand +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices @@ -9,16 +9,17 @@ def command_mode( object_id: bytes, mode: Union[int, Modes], submode: int, - tc_queue: TcQueueT, + q: QueueHelper, info: str, ): - tc_queue.appendleft((QueueCommands.PRINT, info)) + q.add_log_cmd(info) mode_data = pack_mode_data( object_id=object_id, mode=mode, submode=submode, ) - cmd = PusTelecommand( - service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=mode_data + q.add_pus_tc( + PusTelecommand( + service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=mode_data + ) ) - tc_queue.appendleft(cmd.pack_command_tuple()) diff --git a/pus_tc/system/controllers.py b/pus_tc/system/controllers.py index 2f072cd..eaf3854 100644 --- a/pus_tc/system/controllers.py +++ b/pus_tc/system/controllers.py @@ -1,5 +1,4 @@ -from tmtccmd.tc.definitions import TcQueueT -from tmtccmd.config import QueueCommands +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import Modes from tmtccmd.utility import ObjectId @@ -19,7 +18,7 @@ class Info: CORE_CONTROLLER = "ACS controller" -def pack_cmd_ctrl_to_prompted_mode(tc_queue: TcQueueT, object_id: ObjectId): +def pack_cmd_ctrl_to_prompted_mode(q: QueueHelper, object_id: ObjectId): parameters = prompt_parameters( [ {"name": "Mode", "defaultValue": "2"}, @@ -35,37 +34,37 @@ def pack_cmd_ctrl_to_prompted_mode(tc_queue: TcQueueT, object_id: ObjectId): object_id=object_id.as_bytes, mode=mode, submode=submode, - tc_queue=tc_queue, + q=q, info=f"Commanding {object_id} to {mode}, {submode}", ) -def pack_cmd_ctrl_to_off(tc_queue: TcQueueT, object_id: ObjectId): +def pack_cmd_ctrl_to_off(q: QueueHelper, object_id: ObjectId): command_mode( object_id=object_id.as_bytes, mode=Modes.OFF, submode=0, - tc_queue=tc_queue, + q=q, info=f"Commanding {object_id} OFF", ) -def pack_cmd_ctrl_to_on(tc_queue: TcQueueT, object_id: ObjectId): +def pack_cmd_ctrl_to_on(q: QueueHelper, object_id: ObjectId): command_mode( object_id=object_id.as_bytes, mode=Modes.ON, submode=0, - tc_queue=tc_queue, + q=q, info=f"Commanding {object_id} ON", ) -def pack_cmd_ctrl_to_nml(tc_queue: TcQueueT, object_id: ObjectId): +def pack_cmd_ctrl_to_nml(q: QueueHelper, object_id: ObjectId): command_mode( object_id=object_id.as_bytes, mode=Modes.NORMAL, submode=0, - tc_queue=tc_queue, + q=q, info=f"Commanding {object_id} NORMAL", ) diff --git a/pus_tc/system/core.py b/pus_tc/system/core.py index f44ad87..01ecca5 100644 --- a/pus_tc/system/core.py +++ b/pus_tc/system/core.py @@ -4,6 +4,8 @@ from config.definitions import CustomServiceList from tmtccmd.config import add_op_code_entry, add_service_op_code_entry from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT from tmtccmd.tc.definitions import TcQueueT + +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_8_funccmd import generate_action_command from tmtccmd.logging import get_console_logger from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command @@ -122,47 +124,48 @@ def add_core_controller_definitions(cmd_dict: ServiceOpCodeDictT): ) -def pack_core_commands(tc_queue: TcQueueT, op_code: str): +def pack_core_commands(q: QueueHelper, op_code: str): if op_code in OpCodes.REBOOT_XSC: reboot_self, chip_select, copy_select = determine_reboot_params() perform_reboot_cmd( - tc_queue=tc_queue, + q=q, reboot_self=reboot_self, chip=chip_select, copy=copy_select, ) if op_code in OpCodes.REBOOT_FULL: - tc_queue.appendleft((QueueCommands.PRINT, f"Core Command: {Info.REBOOT_FULL}")) - cmd = generate_action_command( - object_id=CORE_CONTROLLER_ID, action_id=ActionIds.FULL_REBOOT + q.add_log_cmd(f"Core Command: {Info.REBOOT_FULL}") + q.add_pus_tc( + generate_action_command( + object_id=CORE_CONTROLLER_ID, action_id=ActionIds.FULL_REBOOT + ) ) - tc_queue.appendleft(cmd.pack_command_tuple()) if op_code in OpCodes.XSC_REBOOT_SELF: - perform_reboot_cmd(tc_queue=tc_queue, reboot_self=True) + perform_reboot_cmd(q=q, reboot_self=True) if op_code in OpCodes.XSC_REBOOT_0_0: perform_reboot_cmd( - tc_queue=tc_queue, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM + q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM ) if op_code in OpCodes.XSC_REBOOT_0_1: perform_reboot_cmd( - tc_queue=tc_queue, + q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_1_GOLD, ) if op_code in OpCodes.XSC_REBOOT_1_0: perform_reboot_cmd( - tc_queue=tc_queue, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM + q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM ) if op_code in OpCodes.XSC_REBOOT_1_1: perform_reboot_cmd( - tc_queue=tc_queue, + q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_1_GOLD, ) if op_code in OpCodes.DISABLE_REBOOT_FILE_HANDLING: - tc_queue.appendleft((QueueCommands.PRINT, "Disabling reboot file handling")) + q.add_log_cmd("Disabling reboot file handling") app_data = bytearray([0]) generate_action_command( object_id=CORE_CONTROLLER_ID, @@ -170,7 +173,7 @@ def pack_core_commands(tc_queue: TcQueueT, op_code: str): app_data=app_data, ) if op_code in OpCodes.ENABLE_REBOOT_FILE_HANDLING: - tc_queue.appendleft((QueueCommands.PRINT, "Enabling reboot file handling")) + q.add_log_cmd("Enabling reboot file handling") app_data = bytearray([1]) generate_action_command( object_id=CORE_CONTROLLER_ID, @@ -178,35 +181,34 @@ def pack_core_commands(tc_queue: TcQueueT, op_code: str): app_data=app_data, ) if op_code in OpCodes.RESET_ALL_REBOOT_COUNTERS: - tc_queue.appendleft((QueueCommands.PRINT, "Resetting all reboot counters")) + q.add_log_cmd("Resetting all reboot counters") generate_action_command( object_id=CORE_CONTROLLER_ID, action_id=ActionIds.RESET_ALL_REBOOT_COUNTERS ) if op_code in OpCodes.RESET_REBOOT_COUNTER_00: - tc_queue.appendleft((QueueCommands.PRINT, "Resetting reboot counter 0 0")) + q.add_log_cmd("Resetting reboot counter 0 0") generate_action_command( object_id=CORE_CONTROLLER_ID, action_id=ActionIds.RESET_REBOOT_COUNTER_00 ) if op_code in OpCodes.RESET_REBOOT_COUNTER_01: - tc_queue.appendleft((QueueCommands.PRINT, "Resetting reboot counter 0 1")) + q.add_log_cmd("Resetting reboot counter 0 1") generate_action_command( object_id=CORE_CONTROLLER_ID, action_id=ActionIds.RESET_REBOOT_COUNTER_01 ) if op_code in OpCodes.RESET_REBOOT_COUNTER_10: - tc_queue.appendleft((QueueCommands.PRINT, "Resetting reboot counter 1 0")) + q.add_log_cmd("Resetting reboot counter 1 0") generate_action_command( object_id=CORE_CONTROLLER_ID, action_id=ActionIds.RESET_REBOOT_COUNTER_10 ) if op_code in OpCodes.RESET_REBOOT_COUNTER_11: - tc_queue.appendleft((QueueCommands.PRINT, "Resetting reboot counter 1 1")) + q.add_log_cmd("Resetting reboot counter 1 1") generate_action_command( object_id=CORE_CONTROLLER_ID, action_id=ActionIds.RESET_REBOOT_COUNTER_11 ) if op_code in OpCodes.GET_HK: - tc_queue.appendleft((QueueCommands.PRINT, "Requesting housekeeping set")) + q.add_log_cmd("Requesting housekeeping set") sid = make_sid(object_id=CORE_CONTROLLER_ID, set_id=SetIds.HK) - command = generate_one_hk_command(sid, 201) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(generate_one_hk_command(sid)) def determine_reboot_params() -> (bool, Chip, Copy): @@ -241,31 +243,25 @@ def determine_reboot_params() -> (bool, Chip, Copy): def perform_reboot_cmd( - tc_queue: TcQueueT, + q: QueueHelper, reboot_self: bool, chip: Chip = Chip.NONE, copy: Copy = Copy.NONE, ): tc_data = bytearray() if reboot_self: - tc_queue.appendleft( - (QueueCommands.PRINT, "Packing reboot command for current image") - ) + q.add_log_cmd("Packing reboot command for current image") tc_data.append(True) else: tc_data.append(False) tc_data.append(chip) tc_data.append(copy) - tc_queue.append( - ( - QueueCommands.PRINT, - f"Packing reboot command for chip {chip} and copy {copy}", - ) + q.add_log_cmd(f"Packing reboot command for chip {chip} and copy {copy}") + q.add_pus_tc( + generate_action_command( + object_id=CORE_CONTROLLER_ID, + action_id=ActionIds.XSC_REBOOT, + app_data=tc_data, + ssc=0, ) - action_cmd = generate_action_command( - object_id=CORE_CONTROLLER_ID, - action_id=ActionIds.XSC_REBOOT, - app_data=tc_data, - ssc=0, ) - tc_queue.appendleft(action_cmd.pack_command_tuple()) diff --git a/pus_tc/system/proc.py b/pus_tc/system/proc.py index f22c481..84e0209 100644 --- a/pus_tc/system/proc.py +++ b/pus_tc/system/proc.py @@ -1,19 +1,14 @@ from __future__ import annotations -import struct import time +from datetime import timedelta from typing import List from config.definitions import CustomServiceList from config.object_ids import get_object_ids from pus_tc.system.tcs import pack_tcs_sys_commands -from tmtccmd.config import ( - QueueCommands, - ServiceOpCodeDictT, - add_op_code_entry, - add_service_op_code_entry, -) -from tmtccmd.tc.definitions import TcQueueT + +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_11_tc_sched import ( generate_time_tagged_cmd, generate_enable_tc_sched_cmd, @@ -126,10 +121,8 @@ class GenericHkListeningCfg: return GenericHkListeningCfg(False, False, False) -def generic_print(tc_queue: TcQueueT, info: dict): - tc_queue.appendleft( - (QueueCommands.PRINT, f"Executing {info[1]} Procedure (OpCodes: {info[0]})") - ) +def generic_print(q: QueueHelper, info: dict): + q.add_log_cmd(f"Executing {info[1]} Procedure (OpCodes: {info[0]})") def add_proc_cmds(cmd_dict: ServiceOpCodeDictT): @@ -147,7 +140,7 @@ def add_proc_cmds(cmd_dict: ServiceOpCodeDictT): def pack_generic_hk_listening_cmds( - tc_queue: TcQueueT, + q: QueueHelper, proc_key: str, sid_list: list[bytearray], diag_list: list[bool], @@ -155,83 +148,77 @@ def pack_generic_hk_listening_cmds( ): info = PROC_INFO_DICT[proc_key] collection_time = info[2] - generic_print(tc_queue=tc_queue, info=info) + generic_print(q=q, info=info) for i in range(len(sid_list)): enable_listen_to_hk_for_x_seconds( diag=diag_list[i], - tc_queue=tc_queue, + q=q, device=proc_key, sid=sid_list[i], interval_seconds=info[3], ) if not cfg.use_tc_sched: - tc_queue.appendleft((QueueCommands.WAIT, 2.0)) + q.add_wait_seconds(2.0) if cfg.mgt: - activate_mgts_alternately( - tc_queue=tc_queue, - ) + activate_mgts_alternately(q) elif cfg.one_rw: activate_all_rws_in_sequence( - tc_queue=tc_queue, test_speed=20000, test_ramp_time=10000, init_ssc=0 + q=q, test_speed=20000, test_ramp_time=10000, init_ssc=0 ) elif cfg.two_rws: - activate_all_rws_two_consecutively(tc_queue=tc_queue, init_ssc=0) + activate_all_rws_two_consecutively(q=q) else: pass if not cfg.use_tc_sched: - tc_queue.appendleft((QueueCommands.WAIT, collection_time)) + q.add_wait_seconds(collection_time) disable_cmd_list = [] for i in range(len(sid_list)): disable_cmd_list.append( gen_disable_listen_to_hk_for_x_seconds( diag=diag_list[i], - tc_queue=tc_queue, + q=q, device=proc_key, sid=sid_list[i], ) ) if cfg.one_rw or cfg.two_rws: - activate_all_rws_in_sequence( - tc_queue=tc_queue, test_speed=0, test_ramp_time=5000, init_ssc=0 - ) - tc_queue.appendleft((QueueCommands.WAIT, 60)) + activate_all_rws_in_sequence(q=q, test_speed=0, test_ramp_time=5000, init_ssc=0) + q.add_wait_seconds(60.0) current_time = time.time() current_time += collection_time if not cfg.use_tc_sched: for cmd in disable_cmd_list: - tc_queue.appendleft(cmd.pack_command_tuple()) + q.add_pus_tc(cmd) else: for cmd in disable_cmd_list: - tc_queue.appendleft( + q.add_pus_tc( generate_time_tagged_cmd( - release_time=struct.pack("!I", int(current_time)), - tc_to_insert=cmd, - ssc=0, + release_time=struct.pack("!I", int(current_time)), tc_to_insert=cmd ) ) if not cfg.use_tc_sched: - tc_queue.appendleft((QueueCommands.WAIT, 60)) + q.add_wait_seconds(60.0) sid_list.clear() diag_list.clear() -def pack_proc_commands(tc_queue: TcQueueT, op_code: str): +def pack_proc_commands(q: QueueHelper, op_code: str): sid_list = [] obj_id_dict = get_object_ids() if op_code in OpCodes.RESET_SCHED: - tc_queue.appendleft((QueueCommands.PRINT, "Resetting/Clearing TC schedule")) - tc_queue.appendleft(generate_reset_tc_sched_cmd().pack_command_tuple()) + q.add_log_cmd("Resetting/Clearing TC schedule") + q.add_pus_tc(generate_reset_tc_sched_cmd()) if op_code in OpCodes.BAT_FT: key = KAI.BAT_FT[0] sid_list.append(make_sid(oids.BPX_HANDLER_ID, BpxSetIds.GET_HK_SET)) diag_list = [False] pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, @@ -243,7 +230,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): sid_list.append(make_sid(oids.CORE_CONTROLLER_ID, CoreSetIds.HK)) diag_list = [False] pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, @@ -279,7 +266,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): set_id = pcdu_dev[1] sid_list.append(make_sid(oid, set_id)) pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, @@ -291,7 +278,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): sid_list.append(make_sid(oids.RAD_SENSOR_ID, RadSetIds.HK)) diag_list = [False] pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, @@ -300,20 +287,14 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): if op_code in OpCodes.TV_SETUP_TCS_FT_ON: # Enable scheduling - tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=22).pack_command_tuple()) + q.add_pus_tc(generate_enable_tc_sched_cmd()) # check whether tcs_assembly also has to be commanded to NORMAL Mode - pack_tcs_sys_commands( - tc_queue=tc_queue, op_code=TcsOpCodes.TCS_BOARD_ASS_NORMAL[0] - ) - pack_cmd_ctrl_to_nml( - tc_queue=tc_queue, object_id=obj_id_dict.get(oids.THERMAL_CONTROLLER_ID) - ) + pack_tcs_sys_commands(q=q, op_code=TcsOpCodes.TCS_BOARD_ASS_NORMAL[0]) + pack_cmd_ctrl_to_nml(q=q, object_id=obj_id_dict.get(oids.THERMAL_CONTROLLER_ID)) if op_code in OpCodes.TV_TEARDOWN_TCS_FT_OFF: # TCS board should always be on anyway, do not command it off here - pack_cmd_ctrl_to_off( - tc_queue=tc_queue, object_id=obj_id_dict.get(oids.THERMAL_CONTROLLER_ID) - ) + pack_cmd_ctrl_to_off(q=q, object_id=obj_id_dict.get(oids.THERMAL_CONTROLLER_ID)) if op_code in OpCodes.ACS_FT: key = KAI.ACS_FT[0] @@ -333,23 +314,23 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): ] d_side_pairs = a_side_pairs + b_side_pairs diag_list = [False, False, True, False, False] - pack_acs_command(tc_queue=tc_queue, op_code="acs-a") + pack_acs_command(q=q, op_code="acs-a") for a_side_dev in a_side_pairs: oid = a_side_dev[0] set_id = a_side_dev[1] sid_list.append(make_sid(oid, set_id)) pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, cfg=GenericHkListeningCfg.default(), ) - pack_acs_command(tc_queue=tc_queue, op_code="acs-off") - tc_queue.appendleft((QueueCommands.WAIT, 5.0)) - pack_acs_command(tc_queue=tc_queue, op_code="acs-b") + pack_acs_command(q=q, op_code="acs-off") + q.add_wait_seconds(5.0) + pack_acs_command(q=q, op_code="acs-b") sid_list.clear() diag_list = [False, False, True, False, False] @@ -359,16 +340,16 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): set_id = b_side_dev[1] sid_list.append(make_sid(oid, set_id)) pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, cfg=GenericHkListeningCfg.default(), ) - pack_acs_command(tc_queue=tc_queue, op_code="acs-off") - tc_queue.appendleft((QueueCommands.WAIT, 5.0)) - pack_acs_command(tc_queue=tc_queue, op_code="acs-d") + pack_acs_command(q=q, op_code="acs-off") + q.add_wait_seconds(5.0) + pack_acs_command(q=q, op_code="acs-d") sid_list.clear() @@ -389,14 +370,14 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): False, ] pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, cfg=GenericHkListeningCfg.default(), ) - pack_acs_command(tc_queue=tc_queue, op_code="acs-off") + pack_acs_command(q=q, op_code="acs-off") if op_code in OpCodes.MGT_FT: key = KAI.MGT_FT[0] @@ -412,24 +393,24 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): ] # Command MGT to mode on - pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=tc_queue, op_code="1") - tc_queue.appendleft((QueueCommands.WAIT, 5)) + pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=q, op_code="1") + q.add_wait_seconds(5.0) # Command MGT to normal mode - pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=tc_queue, op_code="2") + pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=q, op_code="2") for imtq_dev in imtq_pairs: oid = imtq_dev[0] set_id = imtq_dev[1] sid_list.append(make_sid(oid, set_id)) pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, cfg=GenericHkListeningCfg.default(), ) - pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=tc_queue, op_code="0") + pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=q, op_code="0") if op_code in OpCodes.MGT_FT_DP: key = KAI.MGT_FT_DP[0] @@ -468,12 +449,12 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): True, True, ] - pack_acs_command(tc_queue=tc_queue, op_code="acs-d") + pack_acs_command(q=q, op_code="acs-d") # Command MGT to mode on - pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=tc_queue, op_code="1") - tc_queue.appendleft((QueueCommands.WAIT, 20)) + pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=q, op_code="1") + q.add_wait_seconds(20.0) # Command MGT to normal mode - pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=tc_queue, op_code="2") + pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=q, op_code="2") for d_side_and_imtq_dev in d_side_and_imtq_pairs: oid = d_side_and_imtq_dev[0] @@ -483,20 +464,20 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): cfg = GenericHkListeningCfg.default() cfg.mgt = True pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, cfg=cfg, ) - pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=tc_queue, op_code="0") - pack_acs_command(tc_queue=tc_queue, op_code="acs-off") + pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=q, op_code="0") + pack_acs_command(q=q, op_code="acs-off") if op_code in OpCodes.SUS_FT: key = KAI.SUS_FT[0] - pack_sus_cmds(tc_queue=tc_queue, op_code="sus-nom") + pack_sus_cmds(q=q, op_code="sus-nom") sus_n_ids = [ oids.SUS_0_N_LOC_XFYFZM_PT_XF, @@ -527,16 +508,16 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): for nom_sus in sus_n_ids: sid_list.append(make_sid(nom_sus, SetIds.HK)) pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, cfg=GenericHkListeningCfg.default(), ) - pack_acs_command(tc_queue=tc_queue, op_code="sus-off") - tc_queue.appendleft((QueueCommands.WAIT, 5.0)) - pack_sus_cmds(tc_queue=tc_queue, op_code="sus-red") + pack_acs_command(q=q, op_code="sus-off") + q.add_wait_seconds(5.0) + pack_sus_cmds(q=q, op_code="sus-red") diag_list = [ True, @@ -551,16 +532,16 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): for red_sus in sus_r_ids: sid_list.append(make_sid(red_sus, SetIds.HK)) pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, cfg=GenericHkListeningCfg.default(), ) - pack_acs_command(tc_queue=tc_queue, op_code="sus-off") - tc_queue.appendleft((QueueCommands.WAIT, 5.0)) - pack_sus_cmds(tc_queue=tc_queue, op_code="sus-d") + pack_acs_command(q=q, op_code="sus-off") + q.add_wait_seconds(5.0) + pack_sus_cmds(q=q, op_code="sus-d") # SUSs for nom_sus in sus_n_ids: @@ -582,36 +563,32 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): True, ] pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, cfg=GenericHkListeningCfg.default(), ) - pack_acs_command(tc_queue=tc_queue, op_code="sus-off") + pack_acs_command(q=q, op_code="sus-off") if op_code in OpCodes.STR_FT: key = KAI.STR_FT[0] - pack_star_tracker_commands( - object_id=oids.STAR_TRACKER_ID, tc_queue=tc_queue, op_code="2" - ) + pack_star_tracker_commands(object_id=oids.STAR_TRACKER_ID, q=q, op_code="2") # STR sid_list.append(make_sid(oids.STAR_TRACKER_ID, StrSetIds.TEMPERATURE)) diag_list = [False] pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, cfg=GenericHkListeningCfg.default(), ) - pack_star_tracker_commands( - object_id=oids.STAR_TRACKER_ID, tc_queue=tc_queue, op_code="3" - ) + pack_star_tracker_commands(object_id=oids.STAR_TRACKER_ID, q=q, op_code="3") if op_code in OpCodes.RW_FT_ONE_RW: key = KAI.RW_FT_ONE_RW[0] @@ -644,7 +621,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): False, ] # RW NORMAL - pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, tc_queue=tc_queue, op_code="nml") + pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, q=q, op_code="nml") # RW HK für alle RWs nur einzeln for rw_dev in rw_pairs: @@ -652,14 +629,14 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): set_id = rw_dev[1] sid_list.append(make_sid(oid, set_id)) pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, cfg=GenericHkListeningCfg(mgt=False, one_rw=True, two_rws=False), ) # RW OFF - pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, tc_queue=tc_queue, op_code="off") + pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, q=q, op_code="off") # ass command with 2 rws to speed if op_code in OpCodes.RW_FT_TWO_RWS: @@ -701,7 +678,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): False, ] # RW NORMAL - pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, tc_queue=tc_queue, op_code="nml") + pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, q=q, op_code="nml") # RW for rw_dev in rw_pairs: @@ -709,190 +686,162 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): set_id = rw_dev[1] sid_list.append(make_sid(oid, set_id)) pack_generic_hk_listening_cmds( - tc_queue=tc_queue, + q=q, proc_key=key, sid_list=sid_list, diag_list=diag_list, cfg=GenericHkListeningCfg(mgt=False, one_rw=False, two_rws=True), ) # RW OFF - pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, tc_queue=tc_queue, op_code="off") + pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, q=q, op_code="off") def enable_listen_to_hk_for_x_seconds( - tc_queue: TcQueueT, + q: QueueHelper, diag: bool, device: str, sid: bytes, interval_seconds: float, ): - tc_queue.appendleft((QueueCommands.PRINT, f"Enabling periodic HK for {device}")) + q.add_log_cmd(f"Enabling periodic HK for {device}") cmd_tuple = enable_periodic_hk_command_with_interval( diag=diag, sid=sid, interval_seconds=interval_seconds, ssc=0 ) for cmd in cmd_tuple: - tc_queue.appendleft(cmd.pack_command_tuple()) + q.add_pus_tc(cmd) def gen_disable_listen_to_hk_for_x_seconds( - tc_queue: TcQueueT, + q: QueueHelper, diag: bool, device: str, sid: bytes, ) -> PusTelecommand: - tc_queue.appendleft((QueueCommands.PRINT, f"Disabling periodic HK for {device}")) - return disable_periodic_hk_command(diag=diag, sid=sid, ssc=0) + q.add_log_cmd(f"Disabling periodic HK for {device}") + return disable_periodic_hk_command(diag=diag, sid=sid) def activate_mgts_alternately( - tc_queue: TcQueueT, + q: QueueHelper, ): - command = pack_dipole_command( - object_id=oids.IMTQ_HANDLER_ID, - x_dipole=2000, - y_dipole=0, - z_dipole=0, - duration=30000, + q.add_pus_tc( + pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=2000, + y_dipole=0, + z_dipole=0, + duration=30000, + ) ) - tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.appendleft((QueueCommands.WAIT, 40.0)) + q.add_wait_seconds(40.0) - command = pack_dipole_command( - object_id=oids.IMTQ_HANDLER_ID, - x_dipole=-2000, - y_dipole=0, - z_dipole=0, - duration=30000, + q.add_pus_tc( + pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=-2000, + y_dipole=0, + z_dipole=0, + duration=30000, + ) ) - tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.appendleft((QueueCommands.WAIT, 40.0)) + q.add_wait_seconds(40.0) - command = pack_dipole_command( - object_id=oids.IMTQ_HANDLER_ID, - x_dipole=0, - y_dipole=2000, - z_dipole=0, - duration=30000, + q.add_pus_tc( + pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=0, + y_dipole=2000, + z_dipole=0, + duration=30000, + ) ) - tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.appendleft((QueueCommands.WAIT, 40.0)) + q.add_wait_seconds(40.0) - command = pack_dipole_command( - object_id=oids.IMTQ_HANDLER_ID, - x_dipole=0, - y_dipole=-2000, - z_dipole=0, - duration=30000, + q.add_pus_tc( + pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=0, + y_dipole=-2000, + z_dipole=0, + duration=30000, + ) ) - tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.appendleft((QueueCommands.WAIT, 40.0)) + q.add_wait_seconds(40.0) - command = pack_dipole_command( - object_id=oids.IMTQ_HANDLER_ID, - x_dipole=0, - y_dipole=0, - z_dipole=2000, - duration=30000, + q.add_pus_tc( + pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=0, + y_dipole=0, + z_dipole=2000, + duration=30000, + ) ) - tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.appendleft((QueueCommands.WAIT, 40.0)) + q.add_wait_seconds(40.0) - command = pack_dipole_command( - object_id=oids.IMTQ_HANDLER_ID, - x_dipole=0, - y_dipole=0, - z_dipole=-2000, - duration=30000, + q.add_pus_tc( + pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=0, + y_dipole=0, + z_dipole=-2000, + duration=30000, + ) ) - tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.appendleft((QueueCommands.WAIT, 40.0)) + q.add_wait_seconds(40.0) -def rw_speed_cmd_single( - tc_queue: TcQueueT, oid: bytes, init_ssc: int, speed: int, ramp_time: int -) -> int: - command = pack_set_speed_command( - object_id=oid, speed=speed, ramp_time_ms=ramp_time, ssc=init_ssc +def rw_speed_cmd_single(q: QueueHelper, oid: bytes, speed: int, ramp_time: int): + q.add_pus_tc( + pack_set_speed_command(object_id=oid, speed=speed, ramp_time_ms=ramp_time) ) - init_ssc += 1 - tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.appendleft((QueueCommands.WAIT, 70.0)) - command = pack_set_speed_command( - object_id=oid, speed=0, ramp_time_ms=ramp_time, ssc=init_ssc - ) - tc_queue.appendleft(command.pack_command_tuple()) - return init_ssc + 1 + q.add_wait(timedelta(seconds=70.0)) + q.add_pus_tc(pack_set_speed_command(object_id=oid, speed=0, ramp_time_ms=ramp_time)) def rw_speed_up_cmd_consec( - tc_queue: TcQueueT, obids: List[bytes], init_ssc: int, speed: int, ramp_time: int -) -> int: + q: QueueHelper, obids: List[bytes], speed: int, ramp_time: int +): for oid in obids: - command = pack_set_speed_command( - object_id=oid, speed=speed, ramp_time_ms=ramp_time, ssc=init_ssc + q.add_pus_tc( + pack_set_speed_command(object_id=oid, speed=speed, ramp_time_ms=ramp_time) ) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - return init_ssc -def rw_speed_down_cmd_consec( - tc_queue: TcQueueT, obids: List[bytes], init_ssc: int, ramp_time: int -) -> int: +def rw_speed_down_cmd_consec(q: QueueHelper, obids: List[bytes], ramp_time: int): for oid in obids: - command = pack_set_speed_command( - object_id=oid, speed=0, ramp_time_ms=ramp_time, ssc=init_ssc + q.add_pus_tc( + pack_set_speed_command(object_id=oid, speed=0, ramp_time_ms=ramp_time) ) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - return init_ssc def activate_all_rws_in_sequence( - tc_queue: TcQueueT, init_ssc: int, test_speed: int, test_ramp_time: int -) -> int: + q: QueueHelper, init_ssc: int, test_speed: int, test_ramp_time: int +): new_ssc = init_ssc # RW1 speed cmd - tc_queue.appendleft((QueueCommands.WAIT, 58)) - new_ssc = rw_speed_cmd_single( - tc_queue, oids.RW1_ID, new_ssc, test_speed, test_ramp_time - ) - tc_queue.appendleft((QueueCommands.WAIT, 30)) - new_ssc = rw_speed_cmd_single( - tc_queue, oids.RW2_ID, new_ssc, test_speed, test_ramp_time - ) - tc_queue.appendleft((QueueCommands.WAIT, 30)) - new_ssc = rw_speed_cmd_single( - tc_queue, oids.RW3_ID, new_ssc, test_speed, test_ramp_time - ) - tc_queue.appendleft((QueueCommands.WAIT, 30)) - new_ssc = rw_speed_cmd_single( - tc_queue, oids.RW4_ID, new_ssc, test_speed, test_ramp_time - ) - tc_queue.appendleft((QueueCommands.WAIT, 30)) + q.add_wait(timedelta(seconds=58.0)) + rw_speed_cmd_single(q, oids.RW1_ID, test_speed, test_ramp_time) + q.add_wait_seconds(30.0) + rw_speed_cmd_single(q, oids.RW2_ID, test_speed, test_ramp_time) + q.add_wait_seconds(30.0) + rw_speed_cmd_single(q, oids.RW3_ID, test_speed, test_ramp_time) + q.add_wait_seconds(30.0) + rw_speed_cmd_single(q, oids.RW4_ID, test_speed, test_ramp_time) + q.add_wait_seconds(30.0) return new_ssc -def activate_all_rws_two_consecutively(tc_queue: TcQueueT, init_ssc: int) -> int: - new_ssc = init_ssc +def activate_all_rws_two_consecutively(q: QueueHelper): # RW1+3 speed cmd - tc_queue.appendleft((QueueCommands.WAIT, 5.0)) - new_ssc = rw_speed_up_cmd_consec( - tc_queue, [oids.RW1_ID, oids.RW3_ID], new_ssc, -20000, 10000 - ) - tc_queue.appendleft((QueueCommands.WAIT, 70.0)) - new_ssc = rw_speed_down_cmd_consec( - tc_queue, [oids.RW1_ID, oids.RW3_ID], new_ssc, 10000 - ) - tc_queue.appendleft((QueueCommands.WAIT, 15.0)) + q.add_wait_seconds(5.0) + rw_speed_up_cmd_consec(q, [oids.RW1_ID, oids.RW3_ID], -20000, 10000) + q.add_wait_seconds(70.0) + rw_speed_down_cmd_consec(q, [oids.RW1_ID, oids.RW3_ID], 10000) + q.add_wait_seconds(15.0) # RW2+4 speed cmd - new_ssc = rw_speed_up_cmd_consec( - tc_queue, [oids.RW2_ID, oids.RW4_ID], new_ssc, -20000, 10000 - ) - tc_queue.appendleft((QueueCommands.WAIT, 70.0)) - new_ssc = rw_speed_down_cmd_consec( - tc_queue, [oids.RW2_ID, oids.RW4_ID], new_ssc, 10000 - ) - tc_queue.appendleft((QueueCommands.WAIT, 15.0)) - return new_ssc + rw_speed_up_cmd_consec(q, [oids.RW2_ID, oids.RW4_ID], -20000, 10000) + q.add_wait_seconds(70.0) + rw_speed_down_cmd_consec(q, [oids.RW2_ID, oids.RW4_ID], 10000) + q.add_wait_seconds(15.0) diff --git a/pus_tc/system/tcs.py b/pus_tc/system/tcs.py index 47ab31d..b8e8df5 100644 --- a/pus_tc/system/tcs.py +++ b/pus_tc/system/tcs.py @@ -1,4 +1,4 @@ -from tmtccmd.tc.definitions import TcQueueT, QueueCommands +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import Modes from .common import command_mode @@ -15,13 +15,13 @@ class Info: TCS_BOARD_ASS_OFF = "Switching TCS board assembly off" -def pack_tcs_sys_commands(tc_queue: TcQueueT, op_code: str): +def pack_tcs_sys_commands(q: QueueHelper, op_code: str): if op_code in OpCodes.TCS_BOARD_ASS_NORMAL: command_mode( object_id=TCS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=0, - tc_queue=tc_queue, + q=q, info=Info.TCS_BOARD_ASS_NORMAL, ) if op_code in OpCodes.TCS_BOARD_ASS_OFF: @@ -29,6 +29,6 @@ def pack_tcs_sys_commands(tc_queue: TcQueueT, op_code: str): object_id=TCS_BOARD_ASS_ID, mode=Modes.OFF, submode=0, - tc_queue=tc_queue, + q=q, info=Info.TCS_BOARD_ASS_OFF, ) diff --git a/pus_tc/system/time.py b/pus_tc/system/time.py index c66bc17..6663595 100644 --- a/pus_tc/system/time.py +++ b/pus_tc/system/time.py @@ -1,10 +1,9 @@ from datetime import datetime from spacepackets.ecss import PusTelecommand -from tmtccmd.config import QueueCommands -from tmtccmd.tc.definitions import TcQueueT from tmtccmd.logging import get_console_logger +from tmtccmd.tc import QueueHelper LOGGER = get_console_logger() @@ -17,12 +16,9 @@ class Info: SET_CURRENT_TIME = "Setting current time in ASCII format" -def pack_set_current_time_ascii_command(tc_queue: TcQueueT, ssc: int): +def pack_set_current_time_ascii_command(q: QueueHelper): time_test_current_time = datetime.utcnow().isoformat() + "Z" + "\0" current_time_ascii = time_test_current_time.encode("ascii") LOGGER.info(f"Current time in ASCII format: {current_time_ascii}") - tc_queue.appendleft((QueueCommands.PRINT, Info.SET_CURRENT_TIME)) - command = PusTelecommand( - service=9, subservice=128, ssc=ssc, app_data=current_time_ascii - ) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_log_cmd(Info.SET_CURRENT_TIME) + q.add_pus_tc(PusTelecommand(service=9, subservice=128, app_data=current_time_ascii)) diff --git a/tmtcc.py b/tmtcc.py index f7adf36..188429f 100644 --- a/tmtcc.py +++ b/tmtcc.py @@ -7,14 +7,22 @@ from tmtccmd.config.definitions import CoreModeList from spacepackets.ecss import PusVerificator from tmtccmd import get_console_logger, TcHandlerBase +from tmtccmd.com_if import ComInterface from tmtccmd.config.globals import update_global, CoreGlobalIds from deps.tmtccmd.tmtccmd.logging.pus import RawTmtcTimedLogWrapper from deps.tmtccmd.tmtccmd.pus import VerificationWrapper from deps.tmtccmd.tmtccmd.tm import SpecificApidHandlerBase, GenericApidHandlerBase from deps.tmtccmd.tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter +from tmtccmd.logging import get_current_time_string from tmtccmd.pus import FileSeqCountProvider -from tmtccmd.tc import ProcedureHelper, FeedWrapper, TcProcedureType +from tmtccmd.tc import ( + ProcedureHelper, + FeedWrapper, + TcProcedureType, + QueueEntryHelper, + TcQueueEntryType, +) try: import spacepackets @@ -48,8 +56,7 @@ from config import __version__ from config.definitions import PUS_APID from config.hook_implementations import EiveHookObject from pus_tm.factory_hook import pus_factory_hook -from pus_tc.procedure_packer import pre_tc_send_cb - +from pus_tc.procedure_packer import pre_tc_send_cb, handle_default_procedure LOGGER = get_console_logger() @@ -93,6 +100,29 @@ class TcHandler(TcHandlerBase): if info.proc_type == TcProcedureType.DEFAULT: handle_default_procedure(info.to_def_procedure(), wrapper) + def send_cb(self, entry_helper: QueueEntryHelper, com_if: ComInterface): + if entry_helper.is_tc: + if entry_helper.entry_type == TcQueueEntryType.PUS_TC: + pus_tc_wrapper = entry_helper.to_pus_tc_entry() + pus_tc_wrapper.pus_tc.seq_count = ( + self.seq_count_provider.next_seq_count() + ) + pus_tc_wrapper.pus_tc.apid = PUS_APID + # Add TC after Sequence Count stamping + self.pus_verificator.add_tc(pus_tc_wrapper.pus_tc) + raw_tc = pus_tc_wrapper.pus_tc.pack() + self.raw_logger.log_tc(pus_tc_wrapper.pus_tc) + tc_info_string = f"Sent {pus_tc_wrapper.pus_tc}" + LOGGER.info(tc_info_string) + self.file_logger.info( + f"{get_current_time_string(True)}: {tc_info_string}" + ) + com_if.send(raw_tc) + elif entry_helper.entry_type == TcQueueEntryType.LOG: + log_entry = entry_helper.to_log_entry() + LOGGER.info(log_entry.log_str) + self.file_logger.info(log_entry.log_str) + def tmtcc_pre_args() -> EiveHookObject: print(f"-- eive tmtc v{__version__} --")