# -*- coding: utf-8 -*- """reaction_wheels.py @brief Tests for the reaction wheel handler @author J. Meier @date 20.06.2021 """ import struct from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT from tmtccmd.tc.pus_3_fsfw_hk import ( generate_one_hk_command, generate_one_diag_command, make_sid, ) from tmtccmd.config.globals import add_op_code_entry, add_service_op_code_entry from tmtccmd.tc.packer import TcQueueT from spacepackets.ecss.tc import PusTelecommand from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices from config.definitions import CustomServiceList class OpCodesDevs: SPEED = ["0", "speed"] ON = ["1", "on"] NML = ["2", "nml"] OFF = ["3", "off"] GET_STATUS = ["4", "status"] GET_TM = ["5", "tm"] class InfoDevs: SPEED = "Set speed" ON = "Set On" NML = "Set Normal" OFF = "Set Off" GET_STATUS = "Get Status HK" GET_TM = "Get TM HK" class OpCodesAss: ON = ["0", "on"] NML = ["1", "nml"] OFF = ["2", "off"] class InfoAss: ON = "Mode On: 3/4 RWs min. on" NML = "Mode Normal: 3/4 RWs min. normal" OFF = "Mode Off: All RWs off" class RwSetIds: STATUS_SET_ID = 4 TEMPERATURE_SET_ID = 8 LAST_RESET = 2 TM_SET = 9 class RwCommandIds: RESET_MCU = bytearray([0x0, 0x0, 0x0, 0x01]) # Reads status information from reaction wheel into dataset with id 4 GET_RW_STATUS = bytearray([0x0, 0x0, 0x0, 0x04]) INIT_RW_CONTROLLER = bytearray([0x0, 0x0, 0x0, 0x05]) SET_SPEED = bytearray([0x0, 0x0, 0x0, 0x06]) # Reads temperature from reaction wheel into dataset with id 8 GET_TEMPERATURE = bytearray([0x0, 0x0, 0x0, 0x08]) GET_TM = bytearray([0x0, 0x0, 0x0, 0x09]) class SpeedDefinitions: RPM_100 = 1000 RPM_5000 = 5000 class RampTime: MS_1000 = 1000 def add_rw_cmds(cmd_dict: ServiceOpCodeDictT): op_code_dict = dict() add_op_code_entry( op_code_dict=op_code_dict, info=InfoDevs.SPEED, keys=OpCodesDevs.SPEED ) add_op_code_entry(op_code_dict=op_code_dict, info=InfoDevs.ON, keys=OpCodesDevs.ON) add_op_code_entry( op_code_dict=op_code_dict, info=InfoDevs.OFF, keys=OpCodesDevs.OFF ) add_op_code_entry( op_code_dict=op_code_dict, info=InfoDevs.NML, keys=OpCodesDevs.NML ) add_op_code_entry( op_code_dict=op_code_dict, info=InfoDevs.GET_STATUS, keys=OpCodesDevs.GET_STATUS ) add_op_code_entry( op_code_dict=op_code_dict, info=InfoDevs.GET_TM, keys=OpCodesDevs.GET_TM ) add_service_op_code_entry( srv_op_code_dict=cmd_dict, name=CustomServiceList.REACTION_WHEEL_1.value, op_code_entry=op_code_dict, info="Reaction Wheel 1", ) add_service_op_code_entry( srv_op_code_dict=cmd_dict, name=CustomServiceList.REACTION_WHEEL_2.value, op_code_entry=op_code_dict, info="Reaction Wheel 2", ) add_service_op_code_entry( srv_op_code_dict=cmd_dict, name=CustomServiceList.REACTION_WHEEL_3.value, op_code_entry=op_code_dict, info="Reaction Wheel 3", ) add_service_op_code_entry( srv_op_code_dict=cmd_dict, name=CustomServiceList.REACTION_WHEEL_4.value, op_code_entry=op_code_dict, info="Reaction Wheel 4", ) op_code_dict = dict() add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.ON, keys=OpCodesAss.ON) add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.NML, keys=OpCodesAss.NML) add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.OFF, keys=OpCodesAss.OFF) add_service_op_code_entry( srv_op_code_dict=cmd_dict, name=CustomServiceList.RW_ASSEMBLY.value, op_code_entry=op_code_dict, info="Reaction Wheel Assembly", ) def pack_single_rw_test_into( object_id: bytes, rw_idx: int, tc_queue: TcQueueT, op_code: str ) -> TcQueueT: if op_code in OpCodesDevs.SPEED: speed = int(input("Specify speed [0.1 RPM]: ")) ramp_time = int(input("Specify ramp time [ms]: ")) tc_queue.appendleft((QueueCommands.PRINT, f"RW {rw_idx}: {InfoDevs.SPEED}")) command = pack_set_speed_command(object_id, speed, ramp_time, 40) tc_queue.appendleft(command.pack_command_tuple()) if op_code in OpCodesDevs.ON: tc_queue.appendleft((QueueCommands.PRINT, f"RW {rw_idx}: {InfoDevs.ON}")) mode_data = pack_mode_data(object_id, Modes.ON, 0) command = PusTelecommand(service=200, subservice=1, ssc=41, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) if op_code in OpCodesDevs.NML: tc_queue.appendleft((QueueCommands.PRINT, f"RW {rw_idx}: {InfoDevs.NML}")) mode_data = pack_mode_data(object_id, Modes.NORMAL, 0) command = PusTelecommand(service=200, subservice=1, ssc=42, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) if op_code in OpCodesDevs.OFF: tc_queue.appendleft((QueueCommands.PRINT, f"RW {rw_idx}: {InfoDevs.OFF}")) mode_data = pack_mode_data(object_id, Modes.OFF, 0) command = PusTelecommand(service=200, subservice=1, ssc=43, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) if op_code in OpCodesDevs.GET_TM: tc_queue.appendleft((QueueCommands.PRINT, f"RW {rw_idx}: {InfoDevs.GET_TM}")) command = generate_one_hk_command( sid=make_sid(object_id=object_id, set_id=RwSetIds.TM_SET), ssc=0 ) tc_queue.appendleft(command.pack_command_tuple()) if op_code in OpCodesDevs.GET_STATUS: tc_queue.appendleft( (QueueCommands.PRINT, f"RW {rw_idx}: {InfoDevs.GET_STATUS}") ) command = generate_one_diag_command( sid=make_sid(object_id=object_id, set_id=RwSetIds.STATUS_SET_ID), ssc=0 ) tc_queue.appendleft(command.pack_command_tuple()) return tc_queue def pack_rw_ass_cmds(tc_queue: TcQueueT, object_id: bytes, op_code: str): if op_code in OpCodesAss.OFF: data = pack_mode_data(object_id=object_id, mode=Modes.OFF, submode=0) cmd = PusTelecommand( service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data ) tc_queue.appendleft(cmd.pack_command_tuple()) if op_code in OpCodesAss.ON: data = pack_mode_data(object_id=object_id, mode=Modes.ON, submode=0) cmd = PusTelecommand( service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data ) tc_queue.appendleft(cmd.pack_command_tuple()) if op_code in OpCodesAss.NML: data = pack_mode_data(object_id=object_id, mode=Modes.NORMAL, submode=0) cmd = PusTelecommand( service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data ) tc_queue.appendleft(cmd.pack_command_tuple()) def pack_set_speed_command( object_id: bytes, speed: int, ramp_time_ms: int, ssc: int ) -> PusTelecommand: """With this function a command is packed to set the speed of a reaction wheel :param object_id: The object id of the reaction wheel handler. :param speed: Valid speeds are [-65000, -1000] and [1000, 65000]. Values are specified in 0.1 * RPM :param ramp_time_ms: The time after which the reaction wheel will reach the commanded speed. Valid times are 10 - 10000 ms :param ssc: Source sequence count """ if speed > 0: if speed < 1000 or speed > 65000: raise ValueError( "Invalid RW speed specified. " "Allowed range is [1000, 65000] 0.1 * RPM" ) elif speed < 0: if speed < -65000 or speed > -1000: raise ValueError( "Invalid RW speed specified. " "Allowed range is [-65000, -1000] 0.1 * RPM" ) else: # Speed is 0 pass if ramp_time_ms < 0 or ( ramp_time_ms > 0 and (ramp_time_ms > 10000 or ramp_time_ms < 10) ): raise ValueError("Invalid Ramp Speed time. Allowed range is [10-10000] ms") command_id = RwCommandIds.SET_SPEED command = bytearray() command += object_id + command_id command = command + struct.pack("!i", speed) command = command + ramp_time_ms.to_bytes(length=2, byteorder="big") command = PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=command) return command