From 2b89a7f2697a21e9fc9101ca320afb7bf49c8e9d Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 3 Feb 2022 16:02:55 +0100 Subject: [PATCH] extracted some command definitions --- .idea/runConfigurations/BPX.xml | 24 ++++ config/cmd_definitions.py | 213 +++++++++++++++++++++++++++++ config/hook_implementations.py | 228 +++++--------------------------- pus_tc/bpx_batt.py | 26 +++- pus_tc/ccsds_handler.py | 52 +++++--- pus_tc/star_tracker.py | 11 +- pus_tm/hk_handling.py | 40 +++--- pus_tm/service_8_hook.py | 12 +- tmtccmd | 2 +- 9 files changed, 367 insertions(+), 241 deletions(-) create mode 100644 .idea/runConfigurations/BPX.xml create mode 100644 config/cmd_definitions.py diff --git a/.idea/runConfigurations/BPX.xml b/.idea/runConfigurations/BPX.xml new file mode 100644 index 0000000..a1fc5d3 --- /dev/null +++ b/.idea/runConfigurations/BPX.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/config/cmd_definitions.py b/config/cmd_definitions.py new file mode 100644 index 0000000..7ffac58 --- /dev/null +++ b/config/cmd_definitions.py @@ -0,0 +1,213 @@ +from tmtccmd.config import ( + add_op_code_entry, + add_service_op_code_entry, + ServiceOpCodeDictT, + OpCodeDictKeys, +) +from config.definitions import CustomServiceList +from pus_tc.bpx_batt import BpxOpCodes + + +def add_bpx_cmd_definitions(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, keys=BpxOpCodes.HK, info="Request BPX HK" + ) + add_op_code_entry( + op_code_dict=op_code_dict, keys=BpxOpCodes.RST_BOOT_CNT, info="Reset Boot Count" + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=BpxOpCodes.REQUEST_CFG, + info="Request Configuration Struct (Step 1)", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=BpxOpCodes.REQUEST_CFG_HK, + info="Request Configuration Struct HK (Step 2)", + ) + add_op_code_entry( + op_code_dict=op_code_dict, keys=BpxOpCodes.REBOOT, info="Reboot Command" + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.BPX_BATTERY.value, + info="BPX Battery Handler", + op_code_entry=op_code_dict, + ) + + +def add_core_controller_definitions(cmd_dict: ServiceOpCodeDictT): + od = dict() + add_op_code_entry(op_code_dict=od, keys=["0", "reboot"], info="Reboot with Prompt") + add_op_code_entry(op_code_dict=od, keys=["1", "reboot_0_0"], info="Reboot 0 0") + add_op_code_entry(op_code_dict=od, keys=["2", "reboot_0_1"], info="Reboot 0 1") + add_op_code_entry(op_code_dict=od, keys=["3", "reboot_1_0"], info="Reboot 1 0") + add_op_code_entry(op_code_dict=od, keys=["4", "reboot_1_1"], info="Reboot 1 1") + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.CORE.value, + info="Reboot Self", + op_code_entry=od, + ) + + +def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): + from pus_tc.p60dock import P60OpCodes, GomspaceOpCodes + from pus_tc.pdu1 import Pdu1OpCodes + from pus_tc.pdu2 import Pdu2OpCodes + + op_code_dict = dict() + add_op_code_entry(op_code_dict=op_code_dict, keys="0", info="P60 Tests") + add_op_code_entry( + op_code_dict=op_code_dict, + keys=P60OpCodes.STACK_3V3_ON.value, + info="P60 Dock: Turn stack 3V3 on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=P60OpCodes.STACK_3V3_OFF.value, + info="P60 Dock: Turn stack 3V3 off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=P60OpCodes.STACK_5V_ON.value, + info="P60 Dock: Turn stack 5V on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=P60OpCodes.STACK_5V_OFF.value, + info="P60 Dock: Turn stack 5V off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value, + info="P60 Dock: Print Switches, Voltages, Currents", + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.P60DOCK.value, + info="P60 Device", + op_code_entry=op_code_dict, + ) + + op_code_dict_srv_pdu1 = { + "0": ("PDU1 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}), + Pdu1OpCodes.TCS_BOARD_ON.value: ( + "PDU1: Turn TCS board on", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + Pdu1OpCodes.TCS_BOARD_OFF.value: ( + "PDU1: Turn TCS board off", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + Pdu1OpCodes.STAR_TRACKER_ON.value: ( + "PDU1: Turn star tracker on", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + Pdu1OpCodes.STAR_TRACKER_OFF.value: ( + "PDU1: Turn star tracker off", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + Pdu1OpCodes.SUS_NOMINAL_ON.value: ( + "PDU1: Turn SUS nominal on", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + Pdu1OpCodes.SUS_NOMINAL_OFF.value: ( + "PDU1: Turn SUS nominal off", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + Pdu1OpCodes.ACS_A_SIDE_ON.value: ( + "PDU1: Turn ACS Side A on", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + Pdu1OpCodes.ACS_A_SIDE_OFF.value: ( + "PDU1: Turn ACS Side A off", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + Pdu1OpCodes.SYRLINKS_ON.value: ( + "PDU1: Turn Syrlinks on", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + Pdu1OpCodes.SYRLINKS_OFF.value: ( + "PDU1: Turn Syrlinks off", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + Pdu1OpCodes.MGT_ON.value: ( + "PDU1: Turn MGT on", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + Pdu1OpCodes.MGT_OFF.value: ( + "PDU1: Turn MGT off", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + GomspaceOpCodes.PRINT_SWITCH_V_I.value: ( + "PDU1: Print Switches, Voltages, Currents", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + } + service_pdu1_tuple = ("PDU1 Device", op_code_dict_srv_pdu1) + cmd_dict[CustomServiceList.PDU1.value] = service_pdu1_tuple + + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, + keys="0", + info="PDU2 Tests", + options={OpCodeDictKeys.TIMEOUT: 2.0}, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.ACS_SIDE_B_ON.value, + info="PDU2: Turn ACS Side B on", + options={OpCodeDictKeys.TIMEOUT: 2.0}, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.ACS_SIDE_B_OFF.value, + info="PDU2: Turn ACS Side B off", + options={OpCodeDictKeys.TIMEOUT: 2.0}, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.SUS_REDUNDANT_ON.value, + info="PDU2: Turn SUS redundant on", + options={OpCodeDictKeys.TIMEOUT: 2.0}, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.SUS_REDUNDANT_OFF.value, + info="PDU2: Turn SUS redundant off", + options={OpCodeDictKeys.TIMEOUT: 2.0}, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.RW_ON.value, + info="PDU2: Turn reaction wheels on", + options={OpCodeDictKeys.TIMEOUT: 2.0}, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.RW_OFF.value, + info="PDU2: Turn reaction wheels off", + options={OpCodeDictKeys.TIMEOUT: 2.0}, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.Q7S_OFF.value, + info="Q7S Off", + options={OpCodeDictKeys.TIMEOUT: 2.0}, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value, + info="PDU2: Print Switches, Voltages, Currents", + options={OpCodeDictKeys.TIMEOUT: 2.0}, + ) + + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name="pdu2", + info="PDU2 Device", + op_code_entry=op_code_dict, + ) diff --git a/config/hook_implementations.py b/config/hook_implementations.py index 872ea4e..9fa73ef 100644 --- a/config/hook_implementations.py +++ b/config/hook_implementations.py @@ -1,18 +1,18 @@ import argparse from typing import Union, Dict, Tuple -from tmtccmd.config.definitions import ServiceOpCodeDictT, HkReplyUnpacked, DataReplyUnpacked +from tmtccmd.config.definitions import ( + ServiceOpCodeDictT, + HkReplyUnpacked, + DataReplyUnpacked, +) from tmtccmd.tm.service_3_base import Service3Base from tmtccmd.tc.definitions import TcQueueT from tmtccmd.com_if.com_interface_base import CommunicationInterface from tmtccmd.core.backend import TmTcHandler from tmtccmd.config.hook import TmTcHookBase from tmtccmd.utility.tmtc_printer import TmTcPrinter -from tmtccmd.config.globals import ( - OpCodeDictKeys, - add_op_code_entry, - add_service_op_code_entry, -) +from tmtccmd.config.globals import OpCodeDictKeys from config.object_ids import RW1_ID from config.definitions import CustomServiceList @@ -81,6 +81,7 @@ class EiveHookObject(TmTcHookBase): object_id: bytes, action_id: int, custom_data: bytearray ) -> DataReplyUnpacked: from pus_tm.service_8_hook import user_analyze_service_8_data + return user_analyze_service_8_data( object_id=object_id, action_id=action_id, custom_data=custom_data ) @@ -90,6 +91,7 @@ class EiveHookObject(TmTcHookBase): object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base ) -> HkReplyUnpacked: from pus_tm.hk_handling import handle_user_hk_packet + return handle_user_hk_packet( object_id=object_id, set_id=set_id, @@ -108,22 +110,15 @@ class EiveHookObject(TmTcHookBase): def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT): - from pus_tc.pdu1 import Pdu1OpCodes - from pus_tc.pdu2 import Pdu2OpCodes - from pus_tc.p60dock import P60OpCodes - from gomspace.gomspace_common import GomspaceOpCodes + from config.cmd_definitions import ( + add_bpx_cmd_definitions, + add_core_controller_definitions, + add_pcdu_cmds, + ) from pus_tc.gps import GpsOpCodes - op_code_dict = { - "reboot": ("Reboot with Prompt", {OpCodeDictKeys.TIMEOUT: 2.0}), - "reboot_self": ("Reboot Self", {OpCodeDictKeys.TIMEOUT: 4.0}), - "reboot_0_0": ("Reboot 0 0", {OpCodeDictKeys.TIMEOUT: 4.0}), - "reboot_0_1": ("Reboot 0 1", {OpCodeDictKeys.TIMEOUT: 4.0}), - "reboot_1_0": ("Reboot 1 0", {OpCodeDictKeys.TIMEOUT: 4.0}), - "reboot_1_1": ("Reboot 1 1", {OpCodeDictKeys.TIMEOUT: 4.0}), - } - service_tuple = ("Core Controller", op_code_dict) - service_op_code_dict[CustomServiceList.CORE.value] = service_tuple + add_bpx_cmd_definitions(cmd_dict=service_op_code_dict) + add_core_controller_definitions(cmd_dict=service_op_code_dict) op_code_dict = { GpsOpCodes.RESET_GNSS.value: ("Reset GPS", {OpCodeDictKeys.TIMEOUT: 2.0}) @@ -147,170 +142,7 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT): service_tuple = ("TMP1075 2", op_code_dict) service_op_code_dict[CustomServiceList.TMP1075_2.value] = service_tuple - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, - keys="0", - info="P60 Tests", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_3V3_ON.value, - info="P60 Dock: Turn stack 3V3 on", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_3V3_OFF.value, - info="P60 Dock: Turn stack 3V3 off", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_5V_ON.value, - info="P60 Dock: Turn stack 5V on", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_5V_OFF.value, - info="P60 Dock: Turn stack 5V off", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value, - info="P60 Dock: Print Switches, Voltages, Currents", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_service_op_code_entry( - srv_op_code_dict=service_op_code_dict, - name=CustomServiceList.P60DOCK.value, - info="P60 Device", - op_code_entry=op_code_dict, - ) - - op_code_dict_srv_pdu1 = { - "0": ("PDU1 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}), - Pdu1OpCodes.TCS_BOARD_ON.value: ( - "PDU1: Turn TCS board on", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - Pdu1OpCodes.TCS_BOARD_OFF.value: ( - "PDU1: Turn TCS board off", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - Pdu1OpCodes.STAR_TRACKER_ON.value: ( - "PDU1: Turn star tracker on", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - Pdu1OpCodes.STAR_TRACKER_OFF.value: ( - "PDU1: Turn star tracker off", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - Pdu1OpCodes.SUS_NOMINAL_ON.value: ( - "PDU1: Turn SUS nominal on", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - Pdu1OpCodes.SUS_NOMINAL_OFF.value: ( - "PDU1: Turn SUS nominal off", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - Pdu1OpCodes.ACS_A_SIDE_ON.value: ( - "PDU1: Turn ACS Side A on", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - Pdu1OpCodes.ACS_A_SIDE_OFF.value: ( - "PDU1: Turn ACS Side A off", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - Pdu1OpCodes.SYRLINKS_ON.value: ( - "PDU1: Turn Syrlinks on", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - Pdu1OpCodes.SYRLINKS_OFF.value: ( - "PDU1: Turn Syrlinks off", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - Pdu1OpCodes.MGT_ON.value: ( - "PDU1: Turn MGT on", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - Pdu1OpCodes.MGT_OFF.value: ( - "PDU1: Turn MGT off", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - GomspaceOpCodes.PRINT_SWITCH_V_I.value: ( - "PDU1: Print Switches, Voltages, Currents", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - } - service_pdu1_tuple = ("PDU1 Device", op_code_dict_srv_pdu1) - - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, - keys="0", - info="PDU2 Tests", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.ACS_SIDE_B_ON.value, - info="PDU2: Turn ACS Side B on", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.ACS_SIDE_B_OFF.value, - info="PDU2: Turn ACS Side B off", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.SUS_REDUNDANT_ON.value, - info="PDU2: Turn SUS redundant on", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.SUS_REDUNDANT_OFF.value, - info="PDU2: Turn SUS redundant off", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.RW_ON.value, - info="PDU2: Turn reaction wheels on", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.RW_OFF.value, - info="PDU2: Turn reaction wheels off", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.Q7S_OFF.value, - info="Q7S Off", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value, - info="PDU2: Print Switches, Voltages, Currents", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - - add_service_op_code_entry( - srv_op_code_dict=service_op_code_dict, - name="pdu2", - info="PDU2 Device", - op_code_entry=op_code_dict, - ) - # service_pdu2_tuple = ("PDU2 Device", op_code_dict_srv_pdu2) + add_pcdu_cmds(cmd_dict=service_op_code_dict) op_code_dict_srv_heater = { "0": ("Heater Tests", {OpCodeDictKeys.TIMEOUT: 2.0}), @@ -536,7 +368,10 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT): "48": ("Star Tracker: Request camera parameter", {OpCodeDictKeys.TIMEOUT: 2.0}), "49": ("Star Tracker: Request limits", {OpCodeDictKeys.TIMEOUT: 2.0}), "50": ("Star Tracker: Request blob parameters", {OpCodeDictKeys.TIMEOUT: 2.0}), - "51": ("Star Tracker: Set image processor parameters", {OpCodeDictKeys.TIMEOUT: 2.0}), + "51": ( + "Star Tracker: Set image processor parameters", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), } service_star_tracker_tuple = ("Star tracker", op_code_dict_srv_star_tracker) @@ -546,10 +381,22 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT): "2": ("CCSDS Handler: Enable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}), "3": ("CCSDS Handler: Disable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}), "4": ("CCSDS Handler: Set arbitrary bitrate", {OpCodeDictKeys.TIMEOUT: 2.0}), - "5": ("CCSDS Handler: Enable tx clock manipulator", {OpCodeDictKeys.TIMEOUT: 2.0}), - "6": ("CCSDS Handler: Disable tx clock manipulator", {OpCodeDictKeys.TIMEOUT: 2.0}), - "7": ("CCSDS Handler: Update tx data on rising edge", {OpCodeDictKeys.TIMEOUT: 2.0}), - "8": ("CCSDS Handler: Update tx data on falling edge", {OpCodeDictKeys.TIMEOUT: 2.0}), + "5": ( + "CCSDS Handler: Enable tx clock manipulator", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + "6": ( + "CCSDS Handler: Disable tx clock manipulator", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + "7": ( + "CCSDS Handler: Update tx data on rising edge", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + "8": ( + "CCSDS Handler: Update tx data on falling edge", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), } service_ccsds_handler_tuple = ("CCSDS Handler", op_code_dict_srv_ccsds_handler) @@ -578,9 +425,6 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT): "Syrlinks Handler", op_code_dict_srv_syrlinks_handler, ) - - service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu1_tuple - # service_op_code_dict[CustomServiceList.PDU2.value] = service_pdu2_tuple service_op_code_dict[CustomServiceList.HEATER.value] = service_heater_tuple service_op_code_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple service_op_code_dict[CustomServiceList.REACTION_WHEEL_1.value] = service_rw_tuple diff --git a/pus_tc/bpx_batt.py b/pus_tc/bpx_batt.py index a69978a..a9f6311 100644 --- a/pus_tc/bpx_batt.py +++ b/pus_tc/bpx_batt.py @@ -16,27 +16,41 @@ class BpxActionIds: GET_CFG = 5 +class BpxOpCodes: + HK = ["0", "hk"] + RST_BOOT_CNT = ["1", "rst_boot_cnt"] + REQUEST_CFG = ["2", "cfg"] + REQUEST_CFG_HK = ["3", "cfg_hk"] + REBOOT = ["4", "reboot"] + + def pack_bpx_commands(tc_queue: TcQueueT, op_code: str): - if op_code in ["0", "hk"]: + if op_code in BpxOpCodes.HK: tc_queue.appendleft((QueueCommands.PRINT, "Requesting BPX battery HK set")) sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetIds.GET_HK_SET) cmd = generate_one_hk_command(sid=sid, ssc=0) tc_queue.appendleft(cmd.pack_command_tuple()) - if op_code in ["1", "rst_boot_cnt"]: + if op_code in BpxOpCodes.RST_BOOT_CNT: tc_queue.appendleft((QueueCommands.PRINT, "Resetting reboot counters")) cmd = generate_action_command( object_id=BPX_HANDLER_ID, action_id=BpxActionIds.RESET_COUNTERS ) tc_queue.appendleft(cmd.pack_command_tuple()) - if op_code in ["2", "cfg"]: - tc_queue.appendleft((QueueCommands.PRINT, "Resetting reboot counters")) + if op_code in BpxOpCodes.REQUEST_CFG: + tc_queue.appendleft((QueueCommands.PRINT, "Requesting configuration struct")) cmd = generate_action_command( object_id=BPX_HANDLER_ID, action_id=BpxActionIds.GET_CFG ) tc_queue.appendleft(cmd.pack_command_tuple()) - if op_code in ["3", "cfg_hk"]: - tc_queue.appendleft((QueueCommands.PRINT, "Requesting BPX Configuration Struct")) + if op_code in BpxOpCodes.REQUEST_CFG_HK: + tc_queue.appendleft((QueueCommands.PRINT, "Requesting configuration struct HK")) sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetIds.GET_CFG_SET) cmd = generate_one_hk_command(sid=sid, ssc=0) tc_queue.appendleft(cmd.pack_command_tuple()) + if op_code in BpxOpCodes.REBOOT: + tc_queue.appendleft((QueueCommands.PRINT, "Rebooting BPX battery")) + cmd = generate_action_command( + object_id=BPX_HANDLER_ID, action_id=BpxActionIds.REBOOT + ) + tc_queue.appendleft(cmd.pack_command_tuple()) pass diff --git a/pus_tc/ccsds_handler.py b/pus_tc/ccsds_handler.py index acf004f..a7e400d 100644 --- a/pus_tc/ccsds_handler.py +++ b/pus_tc/ccsds_handler.py @@ -43,22 +43,26 @@ def pack_ccsds_handler_test( ) if op_code == "0": tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Set low rate")) - command = object_id + struct.pack('!I', CommandIds.SET_LOW_RATE) + command = object_id + struct.pack("!I", CommandIds.SET_LOW_RATE) command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "1": tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Set high rate")) - command = object_id + struct.pack('!I', CommandIds.SET_HIGH_RATE) + command = object_id + struct.pack("!I", CommandIds.SET_HIGH_RATE) command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "2": - tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Enables the transmitter")) - command = object_id + struct.pack('!I', CommandIds.EN_TRANSMITTER) + tc_queue.appendleft( + (QueueCommands.PRINT, "CCSDS Handler: Enables the transmitter") + ) + command = object_id + struct.pack("!I", CommandIds.EN_TRANSMITTER) command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "3": - tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Disables the transmitter")) - command = object_id + struct.pack('!I', CommandIds.DIS_TRANSMITTER) + tc_queue.appendleft( + (QueueCommands.PRINT, "CCSDS Handler: Disables the transmitter") + ) + command = object_id + struct.pack("!I", CommandIds.DIS_TRANSMITTER) command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "4": @@ -66,27 +70,45 @@ def pack_ccsds_handler_test( (QueueCommands.PRINT, "CCSDS Handler: Set arbitrary bitrate") ) bitrate = int(input("Specify bit rate (bps): ")) - command = object_id + struct.pack('!I', CommandIds.ARBITRARY_BITRATE) + struct.pack('!I', bitrate) + command = ( + object_id + + struct.pack("!I", CommandIds.ARBITRARY_BITRATE) + + struct.pack("!I", bitrate) + ) command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "5": - tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Enable tx clock manipulator")) - command = object_id + struct.pack('!I', CommandIds.ENABLE_TX_CLK_MANIPULATOR) + tc_queue.appendleft( + (QueueCommands.PRINT, "CCSDS Handler: Enable tx clock manipulator") + ) + command = object_id + struct.pack("!I", CommandIds.ENABLE_TX_CLK_MANIPULATOR) command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "6": - tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Disable tx clock manipulator")) - command = object_id + struct.pack('!I', CommandIds.DISABLE_TX_CLK_MANIPULATOR) + tc_queue.appendleft( + (QueueCommands.PRINT, "CCSDS Handler: Disable tx clock manipulator") + ) + command = object_id + struct.pack("!I", CommandIds.DISABLE_TX_CLK_MANIPULATOR) command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "7": - tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Update tx data on rising edge of tx clock")) - command = object_id + struct.pack('!I', CommandIds.UPDATE_ON_RISING_EDGE) + tc_queue.appendleft( + ( + QueueCommands.PRINT, + "CCSDS Handler: Update tx data on rising edge of tx clock", + ) + ) + command = object_id + struct.pack("!I", CommandIds.UPDATE_ON_RISING_EDGE) command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "8": - tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Update tx data on falling edge of tx clock")) - command = object_id + struct.pack('!I', CommandIds.UPDATE_ON_FALLING_EDGE) + tc_queue.appendleft( + ( + QueueCommands.PRINT, + "CCSDS Handler: Update tx data on falling edge of tx clock", + ) + ) + command = object_id + struct.pack("!I", CommandIds.UPDATE_ON_FALLING_EDGE) command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tc/star_tracker.py b/pus_tc/star_tracker.py index 8112271..148d4a6 100644 --- a/pus_tc/star_tracker.py +++ b/pus_tc/star_tracker.py @@ -501,9 +501,14 @@ def pack_star_tracker_commands( command = PusTelecommand(service=8, subservice=128, ssc=73, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "51": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Set image processor parameters")) - command = object_id + struct.pack('!I', StarTrackerActionIds.IMAGE_PROCESSOR) + \ - bytearray(ImagePathDefs.jsonFile, 'utf-8') + tc_queue.appendleft( + (QueueCommands.PRINT, "Star tracker: Set image processor parameters") + ) + command = ( + object_id + + struct.pack("!I", StarTrackerActionIds.IMAGE_PROCESSOR) + + bytearray(ImagePathDefs.jsonFile, "utf-8") + ) command = PusTelecommand(service=8, subservice=128, ssc=70, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index cf78f16..311a7eb 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -14,7 +14,7 @@ from config.object_ids import ( IMTQ_HANDLER_ID, GPS_HANDLER_0_ID, GPS_HANDLER_1_ID, - BPX_HANDLER_ID + BPX_HANDLER_ID, ) LOGGER = get_console_logger() @@ -245,7 +245,7 @@ def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked: "Fix Mode", "Sats in Use", "Date", - "Unix Seconds" + "Unix Seconds", ] latitude = struct.unpack("!d", hk_data[0:8])[0] longitude = struct.unpack("!d", hk_data[8:16])[0] @@ -267,7 +267,7 @@ def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked: fix_mode, sat_in_use, date_string, - unix_seconds + unix_seconds, ] var_index += 13 reply.num_of_vars = var_index @@ -290,15 +290,15 @@ def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked: LOGGER.info(f"Received BPX data, HK data length {len(hk_data)}") reply = HkReplyUnpacked() if set_id == BpxSetIds.GET_HK_SET: - charge_current = struct.unpack('!H', hk_data[0:2])[0] - discharge_current = struct.unpack('!H', hk_data[2:4])[0] - heater_current = struct.unpack('!H', hk_data[4:6])[0] - batt_voltage = struct.unpack('!H', hk_data[6:8])[0] - batt_temp_1 = struct.unpack('!h', hk_data[8:10])[0] - batt_temp_2 = struct.unpack('!h', hk_data[10:12])[0] - batt_temp_3 = struct.unpack('!h', hk_data[12:14])[0] - batt_temp_4 = struct.unpack('!h', hk_data[14:16])[0] - reboot_cntr = struct.unpack('!I', hk_data[16:20])[0] + charge_current = struct.unpack("!H", hk_data[0:2])[0] + discharge_current = struct.unpack("!H", hk_data[2:4])[0] + heater_current = struct.unpack("!H", hk_data[4:6])[0] + batt_voltage = struct.unpack("!H", hk_data[6:8])[0] + batt_temp_1 = struct.unpack("!h", hk_data[8:10])[0] + batt_temp_2 = struct.unpack("!h", hk_data[10:12])[0] + batt_temp_3 = struct.unpack("!h", hk_data[12:14])[0] + batt_temp_4 = struct.unpack("!h", hk_data[14:16])[0] + reboot_cntr = struct.unpack("!I", hk_data[16:20])[0] boot_cause = hk_data[20] reply.header_list = [ "Charge Current", @@ -310,7 +310,7 @@ def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked: "Batt Temp 3", "Batt Temp 4", "Reboot Counter", - "Boot Cause" + "Boot Cause", ] reply.content_list = [ charge_current, @@ -322,22 +322,18 @@ def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked: batt_temp_3, batt_temp_4, reboot_cntr, - boot_cause + boot_cause, ] reply.validity_buffer = hk_data[21:] elif set_id == BpxSetIds.GET_CFG_SET: battheat_mode = hk_data[0] - battheat_low = struct.unpack('!b', hk_data[1:2])[0] - battheat_high = struct.unpack('!b', hk_data[2:3])[0] + battheat_low = struct.unpack("!b", hk_data[1:2])[0] + battheat_high = struct.unpack("!b", hk_data[2:3])[0] reply.header_list = [ "Battery Heater Mode", "Battery Heater Low Limit", - "Battery Heater High Limit" - ] - reply.content_list = [ - battheat_mode, - battheat_low, - battheat_high + "Battery Heater High Limit", ] + reply.content_list = [battheat_mode, battheat_low, battheat_high] reply.validity_buffer = hk_data[3:] return reply diff --git a/pus_tm/service_8_hook.py b/pus_tm/service_8_hook.py index b157092..735e64f 100644 --- a/pus_tm/service_8_hook.py +++ b/pus_tm/service_8_hook.py @@ -49,7 +49,11 @@ def user_analyze_service_8_data( def handle_imtq_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked: reply = DataReplyUnpacked() if action_id == struct.unpack("!I", ImtqActionIds.get_commanded_dipole)[0]: - reply.header_list = ["Commanded X-Dipole", "Commanded Y-Dipole", "Commanded Z-Dipole"] + reply.header_list = [ + "Commanded X-Dipole", + "Commanded Y-Dipole", + "Commanded Z-Dipole", + ] x_dipole = struct.unpack("!H", custom_data[:2]) y_dipole = struct.unpack("!H", custom_data[2:4]) z_dipole = struct.unpack("!H", custom_data[4:6]) @@ -60,7 +64,11 @@ def handle_imtq_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpa def handle_ploc_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked: reply = DataReplyUnpacked() if action_id == PlocReplyIds.tm_mem_read_report: - reply.header_list = ["PLOC Memory Address", "PLOC Mem Len", "PLOC Read Memory Data"] + reply.header_list = [ + "PLOC Memory Address", + "PLOC Mem Len", + "PLOC Read Memory Data", + ] reply.content_list = [custom_data[:4], custom_data[4:6], custom_data[6:10]] return reply diff --git a/tmtccmd b/tmtccmd index 892d131..06eb6c9 160000 --- a/tmtccmd +++ b/tmtccmd @@ -1 +1 @@ -Subproject commit 892d13117f0e3c6d598f157668bff9ed6b31574d +Subproject commit 06eb6c9bbec1ae1cb8bea14bd4f9079ee3a29ad5