diff --git a/config/definitions.py b/config/definitions.py index 80a7ddb..d6a75f9 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -27,10 +27,11 @@ class CustomServiceList(enum.Enum): PCDU = "pcdu" PL_PCDU = "plpcdu" SA_DEPLYOMENT = "sa_depl" - REACTION_WHEEL_1 = "reaction_wheel_1" - REACTION_WHEEL_2 = "reaction_wheel_2" - REACTION_WHEEL_3 = "reaction_wheel_3" - REACTION_WHEEL_4 = "reaction_wheel_4" + REACTION_WHEEL_1 = "rw-1" + REACTION_WHEEL_2 = "rw-2" + REACTION_WHEEL_3 = "rw-3" + REACTION_WHEEL_4 = "rw-4" + RW_ASSEMBLY = "rw-ass" RAD_SENSOR = "rad_sensor" PLOC_UPDATER = "ploc_updater" GPS_0 = "gps0" diff --git a/config/hook_implementations.py b/config/hook_implementations.py index 9f50c1f..75a1b7f 100644 --- a/config/hook_implementations.py +++ b/config/hook_implementations.py @@ -65,7 +65,6 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT): add_pcdu_cmds, add_pl_pcdu_cmds, add_imtq_cmds, - add_rw_cmds, add_rad_sens_cmds, add_ploc_mpsoc_cmds, add_ploc_supv_cmds, @@ -73,6 +72,7 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT): add_time_cmds, ) from pus_tc.devs.gps import GpsOpCodes + from pus_tc.devs.reaction_wheels import add_rw_cmds add_bpx_cmd_definitions(cmd_dict=service_op_code_dict) add_core_controller_definitions(cmd_dict=service_op_code_dict) diff --git a/config/object_ids.py b/config/object_ids.py index c1b4236..b23e196 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -71,6 +71,7 @@ PL_PCDU_ID = bytes([0x44, 0x30, 0x00, 0x00]) ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01]) SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02]) TCS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x03]) +RW_ASSEMBLY = bytes([0x73, 0x00, 0x00, 0x04]) def get_object_ids() -> ObjectIdDictT: diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 7c078d3..d34f2a9 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -489,25 +489,6 @@ def add_imtq_cmds(cmd_dict: ServiceOpCodeDictT): cmd_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple -def add_rw_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict_srv_rw = { - "0": ("Reaction Wheel: Run all commands", {OpCodeDictKeys.TIMEOUT: 2.0}), - "1": ("Reaction Wheel: Set speed", {OpCodeDictKeys.TIMEOUT: 2.0}), - "2": ("Reaction Wheel: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}), - "3": ("Reaction Wheel: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}), - "4": ("Reaction Wheel: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}), - "5": ( - "Reaction Wheel: Send get-telemetry-command", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - } - service_rw_tuple = ("Reaction Wheel", op_code_dict_srv_rw) - cmd_dict[CustomServiceList.REACTION_WHEEL_1.value] = service_rw_tuple - cmd_dict[CustomServiceList.REACTION_WHEEL_2.value] = service_rw_tuple - cmd_dict[CustomServiceList.REACTION_WHEEL_3.value] = service_rw_tuple - cmd_dict[CustomServiceList.REACTION_WHEEL_4.value] = service_rw_tuple - - def add_rad_sens_cmds(cmd_dict: ServiceOpCodeDictT): op_code_dict_srv_rad_sensor = { "0": ("Radiation Sensor: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}), @@ -619,8 +600,14 @@ def add_ploc_supv_cmds(cmd_dict: ServiceOpCodeDictT): "46": ("PLOC Supervisor: Factory flash", {OpCodeDictKeys.TIMEOUT: 2.0}), "47": ("PLOC Supervisor: Enable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}), "48": ("PLOC Supervisor: Disable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}), - "51": ("PLOC Supervisor: Logging request event buffers", {OpCodeDictKeys.TIMEOUT: 2.0}), - "52": ("PLOC Supervisor: Logging clear counters", {OpCodeDictKeys.TIMEOUT: 2.0}), + "51": ( + "PLOC Supervisor: Logging request event buffers", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), + "52": ( + "PLOC Supervisor: Logging clear counters", + {OpCodeDictKeys.TIMEOUT: 2.0}, + ), "53": ("PLOC Supervisor: Logging set topic", {OpCodeDictKeys.TIMEOUT: 2.0}), "54": ( "PLOC Supervisor: Logging request counters", diff --git a/pus_tc/devs/ploc_mpsoc.py b/pus_tc/devs/ploc_mpsoc.py index f5985b9..da81496 100644 --- a/pus_tc/devs/ploc_mpsoc.py +++ b/pus_tc/devs/ploc_mpsoc.py @@ -64,7 +64,6 @@ class PlocReplyIds(enum.IntEnum): TM_CAM_CMD_RPT = 19 - def pack_ploc_mpsoc_commands( object_id: bytearray, tc_queue: TcQueueT, op_code: str ) -> TcQueueT: diff --git a/pus_tc/devs/ploc_supervisor.py b/pus_tc/devs/ploc_supervisor.py index d8ff98f..34cfa7b 100644 --- a/pus_tc/devs/ploc_supervisor.py +++ b/pus_tc/devs/ploc_supervisor.py @@ -32,8 +32,14 @@ MANUAL_INPUT = "1" update_file_dict = { MANUAL_INPUT: ["manual input", ""], "2": ["/mnt/sd0/ploc/supervisor/update.bin", "/mnt/sd0/ploc/supervisor/update.bin"], - "3": ["/mnt/sd0/ploc/supervisor/update-large.bin", "/mnt/sd0/ploc/supervisor/update-large.bin"], - "4": ["/mnt/sd0/ploc/supervisor/update-small.bin", "/mnt/sd0/ploc/supervisor/update-small.bin"], + "3": [ + "/mnt/sd0/ploc/supervisor/update-large.bin", + "/mnt/sd0/ploc/supervisor/update-large.bin", + ], + "4": [ + "/mnt/sd0/ploc/supervisor/update-small.bin", + "/mnt/sd0/ploc/supervisor/update-small.bin", + ], } event_buffer_path_dict = { @@ -375,7 +381,7 @@ def pack_ploc_supv_commands( tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "56": tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Reset PL")) - command = object_id + struct.pack('!I', SupvActionIds.RESET_PL) + command = object_id + struct.pack("!I", SupvActionIds.RESET_PL) command = PusTelecommand(service=8, subservice=128, ssc=71, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tc/devs/reaction_wheels.py b/pus_tc/devs/reaction_wheels.py index e23dab0..ffa4909 100644 --- a/pus_tc/devs/reaction_wheels.py +++ b/pus_tc/devs/reaction_wheels.py @@ -1,21 +1,51 @@ # -*- coding: utf-8 -*- -""" -@file reaction_wheels.py +"""reaction_wheels.py @brief Tests for the reaction wheel handler @author J. Meier @date 20.06.2021 """ import struct -from tmtccmd.config.definitions import QueueCommands - +from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT, OpCodeDictKeys +from tmtccmd.config.globals import add_op_code_entry, add_service_op_code_entry from tmtccmd.tc.packer import TcQueueT from spacepackets.ecss.tc import PusTelecommand -from pus_tc.service_200_mode import pack_mode_data +from tmtccmd.tc.service_200_mode import pack_mode_data, Modes +from config.definitions import CustomServiceList + + +class OpCodesDevs: + SPEED = ["0", "speed"] + ON = ["1", "on"] + NML = ["2", "nml"] + OFF = ["3", "off"] + GET_TM = ["4", "tm"] + + +class InfoDevs: + SPEED = "Set speed" + ON = "Set On" + NML = "Set Normal" + OFF = "Set Off" + GET_TM = "Get TM HK" + + +class OpCodesAss: + ON = ["0", "on"] + NML = ["1", "nml"] + OFF = ["2", "off"] + + +class InfoAss: + ON = "Mode On: 3/4 RWs min. on" + NML = "Mode Normal: 3/4 RWs min. normal" + OFF = "Mode Off: All RWs off" class RwSetIds: STATUS_SET_ID = 4 TEMPERATURE_SET_ID = 8 + LAST_RESET = 2 + TM_SET = 9 class RwCommandIds: @@ -38,8 +68,60 @@ class RampTime: MS_1000 = 1000 +def add_rw_cmds(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, info=InfoDevs.SPEED, keys=OpCodesDevs.SPEED + ) + add_op_code_entry(op_code_dict=op_code_dict, info=InfoDevs.ON, keys=OpCodesDevs.ON) + add_op_code_entry( + op_code_dict=op_code_dict, info=InfoDevs.OFF, keys=OpCodesDevs.OFF + ) + add_op_code_entry( + op_code_dict=op_code_dict, info=InfoDevs.NML, keys=OpCodesDevs.NML + ) + add_op_code_entry( + op_code_dict=op_code_dict, info=InfoDevs.GET_TM, keys=OpCodesDevs.GET_TM + ) + + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.REACTION_WHEEL_1.value, + op_code_entry=op_code_dict, + info="Reaction Wheel 1", + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.REACTION_WHEEL_2.value, + op_code_entry=op_code_dict, + info="Reaction Wheel 2", + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.REACTION_WHEEL_3.value, + op_code_entry=op_code_dict, + info="Reaction Wheel 3", + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.REACTION_WHEEL_4.value, + op_code_entry=op_code_dict, + info="Reaction Wheel 4", + ) + op_code_dict = dict() + add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.ON, keys=OpCodesAss.ON) + add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.NML, keys=OpCodesAss.NML) + add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.OFF, keys=OpCodesAss.OFF) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.RW_ASSEMBLY.value, + op_code_entry=op_code_dict, + info="Reaction Wheel Assembly", + ) + + def pack_single_rw_test_into( - object_id: bytearray, tc_queue: TcQueueT, op_code: str + object_id: bytes, tc_queue: TcQueueT, op_code: str ) -> TcQueueT: tc_queue.appendleft( ( @@ -48,7 +130,7 @@ def pack_single_rw_test_into( ) ) - if op_code == "0" or op_code == "1": + if op_code in OpCodesDevs.SPEED: speed = int(input("Specify speed [0.1 RPM]: ")) ramp_time = int(input("Specify ramp time [ms]: ")) tc_queue.appendleft((QueueCommands.PRINT, "Reaction Wheel: Set speed")) @@ -56,27 +138,27 @@ def pack_single_rw_test_into( command = PusTelecommand(service=8, subservice=128, ssc=40, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "2": + if op_code in OpCodesDevs.ON: tc_queue.appendleft((QueueCommands.PRINT, "Reaction Wheel: Switch to mode on")) - mode_data = pack_mode_data(object_id, 1, 0) + mode_data = pack_mode_data(object_id, Modes.ON, 0) command = PusTelecommand(service=200, subservice=1, ssc=41, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "3": + if op_code in OpCodesDevs.NML: tc_queue.appendleft( (QueueCommands.PRINT, "Reaction Wheel: Switch to mode normal") ) - mode_data = pack_mode_data(object_id, 2, 0) + mode_data = pack_mode_data(object_id, Modes.NORMAL, 0) command = PusTelecommand(service=200, subservice=1, ssc=42, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "4": + if op_code in OpCodesDevs.OFF: tc_queue.appendleft((QueueCommands.PRINT, "Reaction Wheel: Switch to mode off")) - mode_data = pack_mode_data(object_id, 0, 0) + mode_data = pack_mode_data(object_id, Modes.OFF, 0) command = PusTelecommand(service=200, subservice=1, ssc=43, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "5": + if op_code in OpCodesDevs.GET_TM: tc_queue.appendleft( (QueueCommands.PRINT, "Reaction Wheel: Send get-telemetry-command") ) @@ -86,18 +168,26 @@ def pack_single_rw_test_into( return tc_queue -def pack_set_speed_command( - object_id: bytearray, speed: int, ramp_time: int -) -> bytearray: +def pack_rw_ass_cmds(tc_queue: TcQueueT, object_id: bytes, op_code: str): + if op_code in OpCodesAss.OFF: + pass + if op_code in OpCodesAss.ON: + pass + if op_code in OpCodesAss.NML: + pass + + +def pack_set_speed_command(object_id: bytes, speed: int, ramp_time: int) -> bytearray: """With this function a command is packed to set the speed of a reaction wheel - @param object_id The object id of the reaction wheel handler. - @param speed Valid speeds are [-65000, -1000] and [1000, 65000]. Values are specified in 0.1 * RPM - @param ramp_time The time after which the reaction wheel will reached the commanded speed. Valid times are - 10 - 10000 ms + :param object_id The object id of the reaction wheel handler. + :param speed Valid speeds are [-65000, -1000] and [1000, 65000]. Values are + specified in 0.1 * RPM + :param ramp_time The time after which the reaction wheel will reached the commanded speed. + Valid times are 10 - 10000 ms """ command_id = RwCommandIds.SET_SPEED command = bytearray() - command = object_id + command_id + command += object_id + command_id command = command + struct.pack("!i", speed) command = command + ramp_time.to_bytes(length=2, byteorder="big") return command diff --git a/pus_tc/devs/rws.py b/pus_tc/devs/rws.py deleted file mode 100644 index 44c51e9..0000000 --- a/pus_tc/devs/rws.py +++ /dev/null @@ -1,16 +0,0 @@ -from config.object_ids import RW1_ID, RW2_ID, RW3_ID, RW4_ID - - -class SetIds: - TEMP_SET = 8 - STATUS = 4 - LAST_RESET = 2 - TM_SET = 9 - - -class Info: - pass - - -def pack_rw_cmds(op_code: str): - pass diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index e3bba4a..e6a64b2 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -27,7 +27,7 @@ from pus_tc.devs.tmp1075 import pack_tmp1075_test_into from pus_tc.devs.ploc_mpsoc import pack_ploc_mpsoc_commands from pus_tc.devs.ploc_supervisor import pack_ploc_supv_commands from pus_tc.devs.heater import pack_heater_test_into -from pus_tc.devs.reaction_wheels import pack_single_rw_test_into +from pus_tc.devs.reaction_wheels import pack_single_rw_test_into, pack_rw_ass_cmds from pus_tc.devs.rad_sensor import pack_rad_sensor_test_into from pus_tc.devs.ploc_upater import pack_ploc_updater_test_into from pus_tc.devs.ploc_memory_dumper import pack_ploc_memory_dumper_cmd @@ -40,7 +40,6 @@ from pus_tc.system.time import pack_set_current_time_ascii_command from pus_tc.system.acs import pack_acs_command, pack_sus_cmds from pus_tc.devs.plpcdu import pack_pl_pcdu_commands from pus_tc.devs.str_img_helper import pack_str_img_helper_command -from pus_tc.devs.rws import pack_rw_cmds from pus_tc.system.tcs import pack_tcs_sys_commands from config.definitions import CustomServiceList from config.object_ids import ( @@ -69,6 +68,7 @@ from config.object_ids import ( STR_IMG_HELPER_ID, SYRLINKS_HANDLER_ID, SOLAR_ARRAY_DEPLOYMENT_ID, + RW_ASSEMBLY, ) @@ -147,24 +147,20 @@ def pack_service_queue_user( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.REACTION_WHEEL_1.value: - object_id = RW1_ID return pack_single_rw_test_into( - object_id=object_id, tc_queue=service_queue, op_code=op_code + object_id=RW1_ID, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.REACTION_WHEEL_2.value: - object_id = RW2_ID return pack_single_rw_test_into( - object_id=object_id, tc_queue=service_queue, op_code=op_code + object_id=RW2_ID, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.REACTION_WHEEL_3.value: - object_id = RW3_ID return pack_single_rw_test_into( - object_id=object_id, tc_queue=service_queue, op_code=op_code + object_id=RW3_ID, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.REACTION_WHEEL_4.value: - object_id = RW4_ID return pack_single_rw_test_into( - object_id=object_id, tc_queue=service_queue, op_code=op_code + object_id=RW4_ID, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.RAD_SENSOR.value: object_id = RAD_SENSOR_ID @@ -234,13 +230,10 @@ def pack_service_queue_user( return pack_tcs_sys_commands(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.TIME.value: return pack_set_current_time_ascii_command(tc_queue=service_queue, ssc=0) - if service in [ - CustomServiceList.REACTION_WHEEL_1, - CustomServiceList.REACTION_WHEEL_2, - CustomServiceList.REACTION_WHEEL_3, - CustomServiceList.REACTION_WHEEL_4, - ]: - return pack_rw_cmds(op_code=op_code) + if service == CustomServiceList.RW_ASSEMBLY: + return pack_rw_ass_cmds( + tc_queue=service_queue, object_id=RW_ASSEMBLY, op_code=op_code + ) LOGGER.warning("Invalid Service !") diff --git a/pus_tm/action_reply_handler.py b/pus_tm/action_reply_handler.py index 17085de..cd35815 100644 --- a/pus_tm/action_reply_handler.py +++ b/pus_tm/action_reply_handler.py @@ -75,7 +75,10 @@ def handle_ploc_replies( printer.file_logger.info(content_list) elif action_id == PlocReplyIds.TM_CAM_CMD_RPT: header_list = ["Camera reply string", "ACK"] - content_list = [custom_data[:len(custom_data) - 1].decode('utf-8'), hex(custom_data[-1])] + content_list = [ + custom_data[: len(custom_data) - 1].decode("utf-8"), + hex(custom_data[-1]), + ] print(header_list) print(content_list) printer.file_logger.info(header_list) @@ -95,7 +98,7 @@ def handle_supervisor_replies( printer.file_logger.info(content_list) elif action_id == SupvActionIds.READ_GPIO: header_list = ["GPIO state"] - content_list = [struct.unpack('!H', custom_data[:2])[0]] + content_list = [struct.unpack("!H", custom_data[:2])[0]] print(header_list) print(content_list) printer.file_logger.info(header_list)