diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 16bd542..357c126 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -324,6 +324,7 @@ def add_bpx_cmd_definitions(cmd_dict: ServiceOpCodeDictT): def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): from pus_tc.devs.plpcdu import OpCodes, Info + op_code_dict = dict() add_op_code_entry( op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON diff --git a/pus_tc/devs/imtq.py b/pus_tc/devs/imtq.py index 7c1f136..3784167 100644 --- a/pus_tc/devs/imtq.py +++ b/pus_tc/devs/imtq.py @@ -211,7 +211,6 @@ def pack_imtq_test_into( z_dipole = 0 duration = 0 # ms command = pack_dipole_command(object_id, x_dipole, y_dipole, z_dipole, duration) - command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "0" or op_code == "8": @@ -225,7 +224,7 @@ def pack_imtq_test_into( def pack_dipole_command( object_id: bytearray, x_dipole: int, y_dipole: int, z_dipole: int, duration: int -) -> bytearray: +) -> PusTelecommand: """This function packs the command causing the ISIS IMTQ to generate a dipole. @param object_id The object id of the IMTQ handler. @param x_dipole The dipole of the x coil in 10^-4*Am^2 (max. 2000) @@ -242,4 +241,5 @@ def pack_dipole_command( command.extend(y_dipole.to_bytes(length=2, byteorder="big")) command.extend(z_dipole.to_bytes(length=2, byteorder="big")) command.extend(duration.to_bytes(length=2, byteorder="big")) + command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) return command diff --git a/pus_tc/system/core.py b/pus_tc/system/core.py index ece17c2..f44ad87 100644 --- a/pus_tc/system/core.py +++ b/pus_tc/system/core.py @@ -133,7 +133,9 @@ def pack_core_commands(tc_queue: TcQueueT, op_code: str): ) if op_code in OpCodes.REBOOT_FULL: tc_queue.appendleft((QueueCommands.PRINT, f"Core Command: {Info.REBOOT_FULL}")) - cmd = generate_action_command(object_id=CORE_CONTROLLER_ID, action_id=ActionIds.FULL_REBOOT) + cmd = generate_action_command( + object_id=CORE_CONTROLLER_ID, action_id=ActionIds.FULL_REBOOT + ) tc_queue.appendleft(cmd.pack_command_tuple()) if op_code in OpCodes.XSC_REBOOT_SELF: perform_reboot_cmd(tc_queue=tc_queue, reboot_self=True) diff --git a/pus_tc/system/proc.py b/pus_tc/system/proc.py index edd2c29..c2cb707 100644 --- a/pus_tc/system/proc.py +++ b/pus_tc/system/proc.py @@ -1,6 +1,8 @@ from tmtccmd.config import QueueCommands from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.pus_3_fsfw_hk import * + +""" from config.object_ids import ( BPX_HANDLER_ID, P60_DOCK_HANDLER, @@ -9,10 +11,25 @@ from config.object_ids import ( ACU_HANDLER_ID, CORE_CONTROLLER_ID, ) +""" import config.object_ids as oids from pus_tc.devs.bpx_batt import BpxSetIds from pus_tc.system.core import SetIds as CoreSetIds from gomspace.gomspace_common import SetIds as GsSetIds +from pus_tc.devs.rad_sensor import SetIds as RadSetIds +from pus_tc.devs.mgms import MgmLis3SetIds as MgmSetIds_0_2 +from pus_tc.devs.mgms import MgmRm3100SetIds as MgmSetIds_1_3 +from pus_tc.devs.gyros import AdisGyroSetIds as GyroSetIds_0_2 +from pus_tc.devs.gyros import L3gGyroSetIds as GyroSetIds_1_3 +from pus_tc.devs.gps import Gnss0SetIds as GnssSetIds_0 +from pus_tc.devs.gps import Gnss1SetIds as GnssSetIds_1 +from pus_tc.devs.imtq import ImtqSetIds + + +from pus_tc.system.tcs import pack_tcs_sys_commands +from pus_tc.system.controllers import pack_controller_commands +from pus_tc.system.acs import pack_acs_command +from pus_tc.devs.imtq import pack_dipole_command class OpCodes: @@ -20,68 +37,235 @@ class OpCodes: BAT_FT = ["bat-ft"] CORE_FT = ["core-ft"] PCDU_FT = ["pcdu-ft"] + RAD_SEN_FT = ["rad-sen-ft"] + TCS_FT_ON = ["tcs-ft-on"] + TCS_FT_OFF = ["tcs-ft-off"] + ACS_FT = ["acs-ft"] + MGT_FT = ["mgt-ft"] + MGT_FT_DP = ["mgt-ft-dp"] class KeyAndInfo: HEATER = ["Heater", "heater procedure"] BAT_FT = ["BPX Battery", "battery functional test"] - PCDU_FT = ["PCDU", "PCDU functional test"] CORE_FT = ["OBC", "OBC functional test"] + PCDU_FT = ["PCDU", "PCDU functional test"] + RAD_SEN_FT = ["Radiation Sensor", "Radiation Sensor functional test"] + TCS_FT_ON = ["TCS Act.", "TCS functional test activation"] + TCS_FT_OFF = ["TCS Deact.", "TCS functional test deactivation"] + ACS_FT = ["ACS", "ACS functional test"] + MGT_FT = ["MGT", "MGT functional test"] + MGT_FT_DP = ["MGT dipole", "MGT functional test with dipole"] KAI = KeyAndInfo PROC_INFO_DICT = { KAI.BAT_FT[0]: [OpCodes.BAT_FT, KAI.BAT_FT[1], 120.0, 10.0], - KAI.PCDU_FT[0]: [OpCodes.PCDU_FT, KeyAndInfo.PCDU_FT[1], 120.0, 10.0], - KAI.CORE_FT[0]: [OpCodes.CORE_FT, KeyAndInfo.CORE_FT[1], 120.0, 10.0], + KAI.CORE_FT[0]: [OpCodes.CORE_FT, KAI.CORE_FT[1], 120.0, 10.0], + KAI.PCDU_FT[0]: [OpCodes.PCDU_FT, KAI.PCDU_FT[1], 120.0, 10.0], + KAI.RAD_SEN_FT[0]: [OpCodes.RAD_SEN_FT, KAI.RAD_SEN_FT[1], 120.0, 10.0], + KAI.TCS_FT_ON[0]: [OpCodes.TCS_FT_ON, KAI.TCS_FT_ON[1], 120.0, 10.0], + KAI.TCS_FT_OFF[0]: [OpCodes.TCS_FT_OFF, KAI.TCS_FT_OFF[1], 120.0, 10.0], + KAI.ACS_FT[0]: [OpCodes.ACS_FT, KAI.ACS_FT[1], 120.0, 10.0], + KAI.MGT_FT[0]: [OpCodes.MGT_FT, KAI.MGT_FT[1], 120.0, 10.0], + KAI.MGT_FT_DP[0]: [OpCodes.MGT_FT_DP, KAI.MGT_FT_DP[1], 120.0, 10.0], + # collection_time for KAI.MGT_FT_DP maybe be reduced as a full 120seconds is not needed after MGTs are tested } -def generic_print(tc_queue: TcQueueT, key: str): - info = PROC_INFO_DICT[key] +def generic_print(tc_queue: TcQueueT, info: dict): tc_queue.appendleft( - (QueueCommands.PRINT, f"Executing {info[0]} Procedure (OpCodes: {info[1]})") + (QueueCommands.PRINT, f"Executing {info[1]} Procedure (OpCodes: {info[0]})") ) def pack_generic_hk_listening_cmds( - tc_queue: TcQueueT, proc_key: str, sid: bytes, diag: bool + tc_queue: TcQueueT, + proc_key: str, + sid_list: list[bytearray], + diag: bool, + mgt: bool, ): - generic_print(tc_queue=tc_queue, key=proc_key) - listen_to_hk_for_x_seconds( - diag=diag, - tc_queue=tc_queue, - device=proc_key, - sid=sid, - interval_seconds=10.0, - collection_time=120.0, - ) + info = PROC_INFO_DICT[proc_key] + collection_time = info[2] + generic_print(tc_queue=tc_queue, info=info) + + for sid in sid_list: + enable_listen_to_hk_for_x_seconds( + diag=diag, + tc_queue=tc_queue, + device=proc_key, + sid=sid, + interval_seconds=info[3], + ) + + if mgt is True: + activate_mgts_alternately( + tc_queue=tc_queue, + ) + + else: + pass + + tc_queue.appendleft((QueueCommands.WAIT, collection_time)) + + for sid in sid_list: + disable_listen_to_hk_for_x_seconds( + diag=diag, + tc_queue=tc_queue, + device=proc_key, + sid=sid, + ) + + sid_list.clear() def pack_proc_commands(tc_queue: TcQueueT, op_code: str): + sid_list = [] if op_code in OpCodes.BAT_FT: key = KAI.BAT_FT[0] - sid = make_sid(BPX_HANDLER_ID, BpxSetIds.GET_HK_SET) + sid_list.append(make_sid(oids.BPX_HANDLER_ID, BpxSetIds.GET_HK_SET)) pack_generic_hk_listening_cmds( - tc_queue=tc_queue, proc_key=key, sid=sid, diag=False - ) - - if op_code in OpCodes.PCDU_FT: - key = KAI.PCDU_FT[0] - sid = make_sid(P60_DOCK_HANDLER, GsSetIds.P60_CORE) - pack_generic_hk_listening_cmds( - tc_queue=tc_queue, proc_key=key, sid=sid, diag=False + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False ) if op_code in OpCodes.CORE_FT: key = KAI.CORE_FT[0] - sid = make_sid(oids.CORE_CONTROLLER_ID, CoreSetIds.HK) + sid_list.append(make_sid(oids.CORE_CONTROLLER_ID, CoreSetIds.HK)) pack_generic_hk_listening_cmds( - tc_queue=tc_queue, proc_key=key, sid=sid, diag=False + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False ) + if op_code in OpCodes.PCDU_FT: + key = KAI.PCDU_FT[0] + sid_list.append(make_sid(oids.P60_DOCK_HANDLER, GsSetIds.P60_CORE)) + sid_list.append(make_sid(oids.PDU_1_HANDLER_ID, GsSetIds.PDU_1_CORE)) + sid_list.append(make_sid(oids.PDU_2_HANDLER_ID, GsSetIds.PDU_2_CORE)) + sid_list.append(make_sid(oids.ACU_HANDLER_ID, GsSetIds.ACU_CORE)) + sid_list.append(make_sid(oids.P60_DOCK_HANDLER, GsSetIds.P60_AUX)) + sid_list.append(make_sid(oids.PDU_1_HANDLER_ID, GsSetIds.PDU_1_AUX)) + sid_list.append(make_sid(oids.PDU_2_HANDLER_ID, GsSetIds.PDU_2_AUX)) + sid_list.append(make_sid(oids.ACU_HANDLER_ID, GsSetIds.ACU_AUX)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False + ) + if op_code in OpCodes.RAD_SEN_FT: + key = KAI.RAD_SEN_FT[0] + sid_list.append(make_sid(oids.RAD_SENSOR_ID, RadSetIds.RAD_SEN_CORE)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False + ) + + if op_code in OpCodes.TCS_FT_ON: + # check whether tcs_assembly also has to be commanded to NORMAL Mode + # pack_tcs_sys_commands(tc_queue=tc_queue, op_code="tcs-normal") + pack_controller_commands(tc_queue=tc_queue, op_code="thermal_controller") + + if op_code in OpCodes.TCS_FT_OFF: + # check whether tcs_assembly also has to be commanded to OFF Mode + # pack_tcs_sys_commands(tc_queue=tc_queue, op_code="tcs-off") + pack_controller_commands(tc_queue=tc_queue, op_code="thermal_controller") + + if op_code in OpCodes.ACS_FT: + key = KAI.ACS_FT[0] + + pack_acs_command(tc_queue=tc_queue, op_code="acs-a") + + # MGMs + sid_list.append(make_sid(oids.MGM_0_LIS3_HANDLER_ID, MgmSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.MGM_1_RM3100_HANDLER_ID, MgmSetIds_1_3.CORE_HK)) + # Gyros + sid_list.append(make_sid(oids.GYRO_0_ADIS_HANDLER_ID, GyroSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.GYRO_0_ADIS_HANDLER_ID, GyroSetIds_0_2.CFG_HK)) + sid_list.append(make_sid(oids.GYRO_1_L3G_HANDLER_ID, GyroSetIds_1_3.CORE_HK)) + # GNSS0 + sid_list.append(make_sid(oids.GPS_HANDLER_0_ID, GnssSetIds_0.CORE_HK)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False + ) + + pack_acs_command(tc_queue=tc_queue, op_code="acs-off") + tc_queue.appendleft((QueueCommands.WAIT, 5.0)) + pack_acs_command(tc_queue=tc_queue, op_code="acs-b") + + # MGMs + sid_list.append(make_sid(oids.MGM_2_LIS3_HANDLER_ID, MgmSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.MGM_3_RM3100_HANDLER_ID, MgmSetIds_1_3.CORE_HK)) + # Gyros + sid_list.append(make_sid(oids.GYRO_2_ADIS_HANDLER_ID, GyroSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.GYRO_2_ADIS_HANDLER_ID, GyroSetIds_0_2.CFG_HK)) + sid_list.append(make_sid(oids.GYRO_3_L3G_HANDLER_ID, GyroSetIds_1_3.CORE_HK)) + # GNSS1 + sid_list.append(make_sid(oids.GPS_HANDLER_1_ID, GnssSetIds_1.CORE_HK)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False + ) + + pack_acs_command(tc_queue=tc_queue, op_code="acs-off") + tc_queue.appendleft((QueueCommands.WAIT, 5.0)) + pack_acs_command(tc_queue=tc_queue, op_code="acs-d") + + # MGMs + sid_list.append(make_sid(oids.MGM_0_LIS3_HANDLER_ID, MgmSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.MGM_1_RM3100_HANDLER_ID, MgmSetIds_1_3.CORE_HK)) + sid_list.append(make_sid(oids.MGM_2_LIS3_HANDLER_ID, MgmSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.MGM_3_RM3100_HANDLER_ID, MgmSetIds_1_3.CORE_HK)) + # Gyros + sid_list.append(make_sid(oids.GYRO_0_ADIS_HANDLER_ID, GyroSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.GYRO_0_ADIS_HANDLER_ID, GyroSetIds_0_2.CFG_HK)) + sid_list.append(make_sid(oids.GYRO_1_L3G_HANDLER_ID, GyroSetIds_1_3.CORE_HK)) + sid_list.append(make_sid(oids.GYRO_2_ADIS_HANDLER_ID, GyroSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.GYRO_2_ADIS_HANDLER_ID, GyroSetIds_0_2.CFG_HK)) + sid_list.append(make_sid(oids.GYRO_3_L3G_HANDLER_ID, GyroSetIds_1_3.CORE_HK)) + # GNSS0+1 + sid_list.append(make_sid(oids.GPS_HANDLER_0_ID, GnssSetIds_0.CORE_HK)) + sid_list.append(make_sid(oids.GPS_HANDLER_1_ID, GnssSetIds_1.CORE_HK)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False + ) + + pack_acs_command(tc_queue=tc_queue, op_code="acs-off") + + if op_code in OpCodes.MGT_FT: + key = KAI.MGT_FT[0] + + # missing imtq to ON + # mgt 1: imtq und hk + + # MISSING: imtq board activation to ON (implementation pending by Jakob) + + sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.ENG_HK_SET)) + sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.CAL_MTM_SET)) + sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.RAW_MTM_SET)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False + ) + + # MISSING: imtq board deactivation to OFF (implementation pending by Jakob) + + # missing imtq to ON + # mgt 2.: imtq + dual side + dipole + + if op_code in OpCodes.MGT_FT_DP: + key = KAI.MGT_FT_DP[0] + pack_acs_command(tc_queue=tc_queue, op_code="acs-d") + + # MISSING: imtq board activation to ON (implementation pending by Jakob) + + sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.ENG_HK_SET)) + sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.CAL_MTM_SET)) + sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.RAW_MTM_SET)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=True + ) + + # MISSING: imtq board deactivation to OFF (implementation pending by Jakob) + pack_acs_command(tc_queue=tc_queue, op_code="acs-off") + + +""" def listen_to_hk_for_x_seconds( tc_queue: TcQueueT, diag: bool, @@ -90,20 +274,6 @@ def listen_to_hk_for_x_seconds( interval_seconds: float, collection_time: float, ): - """ - enable_periodic_hk_command_with_interval( - diag: bool, sid: bytes, interval_seconds: float, ssc: int - """ - """ - function with periodic HK generation - interval_seconds = at which rate HK is saved - collection_time = how long the HK is saved for - device = for which device the HK is saved - functional_test = - diagnostic Hk = yes diagnostic or no diagnostic - sid = structural ID for specific device - device Hk set ID - """ tc_queue.appendleft((QueueCommands.PRINT, f"Enabling periodic HK for {device}")) cmd_tuple = enable_periodic_hk_command_with_interval( @@ -118,3 +288,98 @@ def listen_to_hk_for_x_seconds( tc_queue.appendleft( disable_periodic_hk_command(diag=diag, sid=sid, ssc=0).pack_command_tuple() ) +""" + + +def enable_listen_to_hk_for_x_seconds( + tc_queue: TcQueueT, + diag: bool, + device: str, + sid: bytes, + interval_seconds: float, +): + tc_queue.appendleft((QueueCommands.PRINT, f"Enabling periodic HK for {device}")) + cmd_tuple = enable_periodic_hk_command_with_interval( + diag=diag, sid=sid, interval_seconds=interval_seconds, ssc=0 + ) + for cmd in cmd_tuple: + tc_queue.appendleft(cmd.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.WAIT, 2.0)) + + +def disable_listen_to_hk_for_x_seconds( + tc_queue: TcQueueT, + diag: bool, + device: str, + sid: bytes, +): + tc_queue.appendleft((QueueCommands.PRINT, f"Disabling periodic HK for {device}")) + tc_queue.appendleft( + disable_periodic_hk_command(diag=diag, sid=sid, ssc=0).pack_command_tuple() + ) + + +def activate_mgts_alternately( + tc_queue: TcQueueT, +): + + command = pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=2000, + y_dipole=0, + z_dipole=0, + duration=30000, + ) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft((QueueCommands.WAIT, 10.0)) + + command = pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=-2000, + y_dipole=0, + z_dipole=0, + duration=30000, + ) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft((QueueCommands.WAIT, 10.0)) + + command = pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=0, + y_dipole=2000, + z_dipole=0, + duration=30000, + ) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft((QueueCommands.WAIT, 10.0)) + + command = pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=0, + y_dipole=-2000, + z_dipole=0, + duration=30000, + ) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft((QueueCommands.WAIT, 10.0)) + + command = pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=0, + y_dipole=0, + z_dipole=2000, + duration=30000, + ) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft((QueueCommands.WAIT, 10.0)) + + command = pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=0, + y_dipole=0, + z_dipole=-2000, + duration=30000, + ) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft((QueueCommands.WAIT, 10.0))