diff --git a/.run/IMTQ Dipole.run.xml b/.run/IMTQ Dipole.run.xml
new file mode 100644
index 0000000..b027275
--- /dev/null
+++ b/.run/IMTQ Dipole.run.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/PDU1.run.xml b/.run/PDU1.run.xml
new file mode 100644
index 0000000..f93718a
--- /dev/null
+++ b/.run/PDU1.run.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/PDU2.run.xml b/.run/PDU2.run.xml
new file mode 100644
index 0000000..e0c2883
--- /dev/null
+++ b/.run/PDU2.run.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/RW All Off.run.xml b/.run/RW All Off.run.xml
new file mode 100644
index 0000000..482a5e7
--- /dev/null
+++ b/.run/RW All Off.run.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/RW All On.run.xml b/.run/RW All On.run.xml
new file mode 100644
index 0000000..bc80f1a
--- /dev/null
+++ b/.run/RW All On.run.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/RW1.run.xml b/.run/RW1.run.xml
new file mode 100644
index 0000000..f5f1941
--- /dev/null
+++ b/.run/RW1.run.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/RW2.run.xml b/.run/RW2.run.xml
new file mode 100644
index 0000000..58e7433
--- /dev/null
+++ b/.run/RW2.run.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/RW3.run.xml b/.run/RW3.run.xml
new file mode 100644
index 0000000..7548b59
--- /dev/null
+++ b/.run/RW3.run.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/RW4.run.xml b/.run/RW4.run.xml
new file mode 100644
index 0000000..2ada545
--- /dev/null
+++ b/.run/RW4.run.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/gomspace/gomspace_common.py b/gomspace/gomspace_common.py
index 0c1e0a8..1ce71a5 100644
--- a/gomspace/gomspace_common.py
+++ b/gomspace/gomspace_common.py
@@ -31,46 +31,6 @@ class GomspaceDeviceActionIds(enum.IntEnum):
PRINT_LATCHUPS = 33
-class GomspaceOpCodes:
- # Request HK
- REQUEST_CORE_HK_ONCE = ["hk_core"]
- REQUEST_AUX_HK_ONCE = ["hk_aux"]
- PRINT_SWITCH_V_I = ["print_switch_vi"]
- PRINT_LATCHUPS = ["print_latchups"]
- GET_PARAM = ["get_param"]
- SET_INTEGER_PARAM = ["set_int_param"]
- SAVE_TABLE = ["save_table"]
- RESET_GND_WATCHDOG = ["reset_gnd_wdt"]
- SAVE_TABLE_DEFAULT = ["save_table_default"]
- LOAD_TABLE = ["load_table"]
- REQUEST_CONFIG_TABLE = ["cfg_table"]
-
-
-class GsInfo:
- REQUEST_CORE_HK_ONCE = "Requesting Core HK once"
- REQUEST_AUX_HK_ONCE = "Requesting Aux HK once"
- PRINT_SWITCH_V_I = "Print Switch V I Info"
- PRINT_LATCHUPS = "Print latchups"
- GET_PARAMETER = "Get parameter"
- SET_PARAMETER = "Set integer parameter"
- REQUEST_CONFIG_TABLE = "Request Config Table"
- RESET_GND_WATCHDOG = "Reset GND watchdog"
- SAVE_TABLE = "Save table non-volatile (file)"
- SAVE_TABLE_DEFAULT = "Save table non-volatile (default)"
- LOAD_TABLE = "Load Table"
-
-
-class SetIds:
- PDU_1_CORE = 1
- PDU_1_AUX = 2
- PDU_2_CORE = 3
- PDU_2_AUX = 4
- P60_CORE = 5
- P60_AUX = 6
- ACU_CORE = 7
- ACU_AUX = 8
-
-
class ParamTypes(enum.Enum):
U8 = 0
U16 = 1
diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py
index f9fe83e..31a26cf 100644
--- a/pus_tc/cmd_definitions.py
+++ b/pus_tc/cmd_definitions.py
@@ -185,26 +185,6 @@ def add_time_cmds(defs: TmtcDefinitionWrapper):
)
-@tmtc_definitions_provider
-def add_imtq_cmds(defs: TmtcDefinitionWrapper):
- oce = OpCodeEntry()
- oce.add("0", "Mode Off")
- oce.add("1", "Mode On")
- oce.add("2", "Mode Normal")
- oce.add("3", "IMTQ perform pos X self test")
- oce.add("4", "IMTQ perform neg X self test")
- oce.add("5", "IMTQ perform pos Y self test")
- oce.add("6", "IMTQ perform neg Y self test")
- oce.add("7", "IMTQ perform pos Z self test")
- oce.add("8", "IMTQ perform neg Z self test")
- oce.add("9", "IMTQ command dipole")
- oce.add("10", "IMTQ get commanded dipole")
- oce.add("11", "IMTQ get engineering hk set")
- oce.add("12", "IMTQ get calibrated MTM measurement one shot")
- oce.add("13", "IMTQ get raw MTM measurement one shot")
- defs.add_service(CustomServiceList.IMTQ.value, "IMQT Device", oce)
-
-
@tmtc_definitions_provider
def add_system_cmds(defs: TmtcDefinitionWrapper):
from pus_tc.system.acs import AcsOpCodes, SusOpCodes
diff --git a/pus_tc/devs/reaction_wheels.py b/pus_tc/devs/reaction_wheels.py
deleted file mode 100644
index d83d592..0000000
--- a/pus_tc/devs/reaction_wheels.py
+++ /dev/null
@@ -1,219 +0,0 @@
-# -*- coding: utf-8 -*-
-"""reaction_wheels.py
-@brief Tests for the reaction wheel handler
-@author J. Meier
-@date 20.06.2021
-"""
-import struct
-
-from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
-from tmtccmd.config.tmtc import tmtc_definitions_provider
-from tmtccmd.tc import DefaultPusQueueHelper
-from tmtccmd.tc.pus_3_fsfw_hk import (
- generate_one_hk_command,
- generate_one_diag_command,
- make_sid,
-)
-from spacepackets.ecss.tc import PusTelecommand
-from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices
-from config.definitions import CustomServiceList
-
-
-class OpCodesDevs:
- SPEED = ["0", "speed"]
- ON = ["1", "on"]
- NML = ["2", "nml"]
- OFF = ["3", "off"]
- GET_STATUS = ["4", "status"]
- GET_TM = ["5", "tm"]
-
-
-class InfoDevs:
- SPEED = "Set speed"
- ON = "Set On"
- NML = "Set Normal"
- OFF = "Set Off"
- GET_STATUS = "Get Status HK"
- GET_TM = "Get TM HK"
-
-
-class OpCodesAss:
- ON = ["0", "on"]
- NML = ["1", "nml"]
- OFF = ["2", "off"]
-
-
-class InfoAss:
- ON = "Mode On: 3/4 RWs min. on"
- NML = "Mode Normal: 3/4 RWs min. normal"
- OFF = "Mode Off: All RWs off"
-
-
-class RwSetIds:
- STATUS_SET_ID = 4
- TEMPERATURE_SET_ID = 8
- LAST_RESET = 2
- TM_SET = 9
-
-
-class RwCommandIds:
- RESET_MCU = bytearray([0x0, 0x0, 0x0, 0x01])
- # Reads status information from reaction wheel into dataset with id 4
- GET_RW_STATUS = bytearray([0x0, 0x0, 0x0, 0x04])
- INIT_RW_CONTROLLER = bytearray([0x0, 0x0, 0x0, 0x05])
- SET_SPEED = bytearray([0x0, 0x0, 0x0, 0x06])
- # Reads temperature from reaction wheel into dataset with id 8
- GET_TEMPERATURE = bytearray([0x0, 0x0, 0x0, 0x08])
- GET_TM = bytearray([0x0, 0x0, 0x0, 0x09])
-
-
-class SpeedDefinitions:
- RPM_100 = 1000
- RPM_5000 = 5000
-
-
-class RampTime:
- MS_1000 = 1000
-
-
-@tmtc_definitions_provider
-def add_rw_cmds(defs: TmtcDefinitionWrapper):
- oce = OpCodeEntry()
- oce.add(info=InfoDevs.SPEED, keys=OpCodesDevs.SPEED)
- oce.add(info=InfoDevs.ON, keys=OpCodesDevs.ON)
- oce.add(info=InfoDevs.OFF, keys=OpCodesDevs.OFF)
- oce.add(info=InfoDevs.NML, keys=OpCodesDevs.NML)
- oce.add(info=InfoDevs.GET_STATUS, keys=OpCodesDevs.GET_STATUS)
- oce.add(info=InfoDevs.GET_TM, keys=OpCodesDevs.GET_TM)
- defs.add_service(
- name=CustomServiceList.REACTION_WHEEL_1.value,
- info="Reaction Wheel 1",
- op_code_entry=oce,
- )
- defs.add_service(
- name=CustomServiceList.REACTION_WHEEL_2.value,
- info="Reaction Wheel 2",
- op_code_entry=oce,
- )
- defs.add_service(
- name=CustomServiceList.REACTION_WHEEL_3.value,
- info="Reaction Wheel 3",
- op_code_entry=oce,
- )
- defs.add_service(
- name=CustomServiceList.REACTION_WHEEL_4.value,
- info="Reaction Wheel 4",
- op_code_entry=oce,
- )
- oce = OpCodeEntry()
- oce.add(info=InfoAss.ON, keys=OpCodesAss.ON)
- oce.add(info=InfoAss.NML, keys=OpCodesAss.NML)
- oce.add(info=InfoAss.OFF, keys=OpCodesAss.OFF)
- defs.add_service(
- name=CustomServiceList.RW_ASSEMBLY.value,
- info="Reaction Wheel Assembly",
- op_code_entry=oce,
- )
-
-
-def pack_single_rw_test_into(
- object_id: bytes, rw_idx: int, q: DefaultPusQueueHelper, op_code: str
-):
- if op_code in OpCodesDevs.SPEED:
- speed = int(input("Specify speed [0.1 RPM]: "))
- ramp_time = int(input("Specify ramp time [ms]: "))
- q.add_log_cmd(f"RW {rw_idx}: {InfoDevs.SPEED}")
- q.add_pus_tc(pack_set_speed_command(object_id, speed, ramp_time))
-
- if op_code in OpCodesDevs.ON:
- q.add_log_cmd(f"RW {rw_idx}: {InfoDevs.ON}")
- mode_data = pack_mode_data(object_id, Modes.ON, 0)
- q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
-
- if op_code in OpCodesDevs.NML:
- q.add_log_cmd(f"RW {rw_idx}: {InfoDevs.NML}")
- mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
- q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
-
- if op_code in OpCodesDevs.OFF:
- q.add_log_cmd(f"RW {rw_idx}: {InfoDevs.OFF}")
- mode_data = pack_mode_data(object_id, Modes.OFF, 0)
- q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
-
- if op_code in OpCodesDevs.GET_TM:
- q.add_log_cmd(f"RW {rw_idx}: {InfoDevs.GET_TM}")
- q.add_pus_tc(
- generate_one_hk_command(
- sid=make_sid(object_id=object_id, set_id=RwSetIds.TM_SET)
- )
- )
- if op_code in OpCodesDevs.GET_STATUS:
- q.add_log_cmd(f"RW {rw_idx}: {InfoDevs.GET_STATUS}")
- q.add_pus_tc(
- generate_one_diag_command(
- sid=make_sid(object_id=object_id, set_id=RwSetIds.STATUS_SET_ID)
- )
- )
-
-
-def pack_rw_ass_cmds(q: DefaultPusQueueHelper, object_id: bytes, op_code: str):
- if op_code in OpCodesAss.OFF:
- data = pack_mode_data(object_id=object_id, mode=Modes.OFF, submode=0)
- q.add_pus_tc(
- PusTelecommand(
- service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
- )
- )
- if op_code in OpCodesAss.ON:
- data = pack_mode_data(object_id=object_id, mode=Modes.ON, submode=0)
- q.add_pus_tc(
- PusTelecommand(
- service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
- )
- )
- if op_code in OpCodesAss.NML:
- data = pack_mode_data(object_id=object_id, mode=Modes.NORMAL, submode=0)
- q.add_pus_tc(
- PusTelecommand(
- service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
- )
- )
-
-
-def pack_set_speed_command(
- object_id: bytes, speed: int, ramp_time_ms: int
-) -> PusTelecommand:
- """With this function a command is packed to set the speed of a reaction wheel
- :param object_id: The object id of the reaction wheel handler.
- :param speed: Valid speeds are [-65000, -1000] and [1000, 65000]. Values are
- specified in 0.1 * RPM
- :param ramp_time_ms: The time after which the reaction wheel will reach the commanded speed.
- Valid times are 10 - 10000 ms
- """
- if speed > 0:
- if speed < 1000 or speed > 65000:
- raise ValueError(
- "Invalid RW speed specified. "
- "Allowed range is [1000, 65000] 0.1 * RPM"
- )
- elif speed < 0:
- if speed < -65000 or speed > -1000:
- raise ValueError(
- "Invalid RW speed specified. "
- "Allowed range is [-65000, -1000] 0.1 * RPM"
- )
- else:
- # Speed is 0
- pass
-
- if ramp_time_ms < 0 or (
- ramp_time_ms > 0 and (ramp_time_ms > 10000 or ramp_time_ms < 10)
- ):
- raise ValueError("Invalid Ramp Speed time. Allowed range is [10-10000] ms")
- command_id = RwCommandIds.SET_SPEED
- command = bytearray()
- command += object_id + command_id
- command = command + struct.pack("!i", speed)
- command = command + ramp_time_ms.to_bytes(length=2, byteorder="big")
- command = PusTelecommand(service=8, subservice=128, app_data=command)
- return command
diff --git a/pus_tc/procedure_packer.py b/pus_tc/procedure_packer.py
index 862d8a1..a788af3 100644
--- a/pus_tc/procedure_packer.py
+++ b/pus_tc/procedure_packer.py
@@ -27,11 +27,10 @@ from tmtc.power.p60dock import pack_p60dock_cmds
from tmtc.power.pdu2 import pack_pdu2_commands
from tmtc.power.pdu1 import pack_pdu1_commands
from tmtc.power.acu import pack_acu_commands
-from tmtc.solar_array_deployment import pack_solar_array_deployment_test_into
-from pus_tc.devs.imtq import pack_imtq_test_into
+from tmtc.acs.imtq import pack_imtq_test_into
from pus_tc.devs.tmp1075 import pack_tmp1075_test_into
from pus_tc.devs.heater import pack_heater_cmds
-from pus_tc.devs.reaction_wheels import pack_single_rw_test_into, pack_rw_ass_cmds
+from tmtc.acs.reaction_wheels import pack_single_rw_test_into, pack_rw_ass_cmds
from pus_tc.devs.rad_sensor import pack_rad_sensor_test_into
from tmtc.ploc_memory_dumper import pack_ploc_memory_dumper_cmd
from pus_tc.devs.ccsds_handler import pack_ccsds_handler_test
@@ -65,7 +64,6 @@ from config.object_ids import (
PDEC_HANDLER_ID,
STR_IMG_HELPER_ID,
SYRLINKS_HANDLER_ID,
- SOLAR_ARRAY_DEPLOYMENT_ID,
RW_ASSEMBLY,
get_object_ids,
)
diff --git a/pus_tc/system/acs.py b/pus_tc/system/acs.py
index 6c6005c..6558fe1 100644
--- a/pus_tc/system/acs.py
+++ b/pus_tc/system/acs.py
@@ -8,20 +8,20 @@ from .common import command_mode
class AcsOpCodes:
- ACS_ASS_A_SIDE = ["0", "acs-a"]
- ACS_ASS_B_SIDE = ["1", "acs-b"]
- ACS_ASS_DUAL_MODE = ["2", "acs-d"]
- ACS_ASS_OFF = ["3", "acs-off"]
- ACS_ASS_A_ON = ["4", "acs-ao"]
- ACS_ASS_B_ON = ["5", "acs-bo"]
- ACS_ASS_DUAL_ON = ["6", "acs-do"]
+ ACS_ASS_A_SIDE = ["0", "a"]
+ ACS_ASS_B_SIDE = ["1", "b"]
+ ACS_ASS_DUAL_MODE = ["2", "dual"]
+ ACS_ASS_OFF = ["3", "off"]
+ ACS_ASS_A_ON = ["4", "a_on"]
+ ACS_ASS_B_ON = ["5", "b_on"]
+ ACS_ASS_DUAL_ON = ["6", "dual_on"]
class SusOpCodes:
- SUS_ASS_NOM_SIDE = ["0", "sus-nom"]
- SUS_ASS_RED_SIDE = ["1", "sus-red"]
- SUS_ASS_DUAL_MODE = ["2", "sus-d"]
- SUS_ASS_OFF = ["3", "sus-off"]
+ SUS_ASS_NOM_SIDE = ["0", "nom"]
+ SUS_ASS_RED_SIDE = ["1", "red"]
+ SUS_ASS_DUAL_MODE = ["2", "dual"]
+ SUS_ASS_OFF = ["3", "off"]
class DualSideSubmodes(enum.IntEnum):
diff --git a/pus_tc/system/proc.py b/pus_tc/system/proc.py
index 7782176..081419f 100644
--- a/pus_tc/system/proc.py
+++ b/pus_tc/system/proc.py
@@ -2,7 +2,6 @@ from __future__ import annotations
import time
from datetime import timedelta
-from typing import List
from config.definitions import CustomServiceList
from config.object_ids import get_object_ids
@@ -22,25 +21,23 @@ import config.object_ids as oids
from pus_tc.system.tcs import OpCodes as TcsOpCodes
from pus_tc.devs.bpx_batt import BpxSetIds
from pus_tc.system.core import SetIds as CoreSetIds
-from gomspace.gomspace_common import SetIds as GsSetIds
+from tmtc.power.common_power import SetIds as GsSetIds
from pus_tc.devs.rad_sensor import SetIds as RadSetIds
from pus_tc.devs.mgms import MgmLis3SetIds as MgmLis3SetIds_0_2
from pus_tc.devs.mgms import MgmRm3100SetIds as MgmRm3100SetIds_1_3
from pus_tc.devs.gyros import AdisGyroSetIds as AdisGyroSetIds_0_2
from pus_tc.devs.gyros import L3gGyroSetIds as L3gGyroSetIds_1_3
-from pus_tc.devs.syrlinks_hk_handler import OpCodes as SyrlinksOpCodes
from pus_tc.devs.syrlinks_hk_handler import SetIds as SyrlinksSetIds
-from pus_tc.devs.star_tracker import OpCodes as StrOpCodes
from pus_tc.devs.gps import SetIds as GpsSetIds
-from pus_tc.devs.imtq import ImtqSetIds
+from tmtc.acs.imtq import ImtqSetIds
from pus_tc.devs.sus import SetIds
from pus_tc.devs.star_tracker import SetIds as StrSetIds
-from pus_tc.devs.reaction_wheels import RwSetIds
+from tmtc.acs.reaction_wheels import RwSetIds, rw_speed_up_cmd_consec
from pus_tc.system.controllers import pack_cmd_ctrl_to_off, pack_cmd_ctrl_to_nml
from pus_tc.system.acs import pack_acs_command, pack_sus_cmds
-from pus_tc.devs.imtq import pack_imtq_test_into, pack_dipole_command
+from tmtc.acs.imtq import pack_imtq_test_into, pack_dipole_command
from pus_tc.devs.star_tracker import pack_star_tracker_commands
-from pus_tc.devs.reaction_wheels import pack_rw_ass_cmds, pack_set_speed_command
+from tmtc.acs.reaction_wheels import pack_rw_ass_cmds, pack_set_speed_command
class OpCodes:
@@ -247,14 +244,14 @@ def pack_proc_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.PCDU_FT:
key = KAI.PCDU_FT[0]
pcdu_pairs = [
- (oids.P60_DOCK_HANDLER, GsSetIds.P60_CORE),
- (oids.PDU_1_HANDLER_ID, GsSetIds.PDU_1_CORE),
- (oids.PDU_2_HANDLER_ID, GsSetIds.PDU_2_CORE),
- (oids.ACU_HANDLER_ID, GsSetIds.ACU_CORE),
- (oids.P60_DOCK_HANDLER, GsSetIds.P60_AUX),
- (oids.PDU_1_HANDLER_ID, GsSetIds.PDU_1_AUX),
- (oids.PDU_2_HANDLER_ID, GsSetIds.PDU_2_AUX),
- (oids.ACU_HANDLER_ID, GsSetIds.ACU_AUX),
+ (oids.P60_DOCK_HANDLER, GsSetIds.CORE),
+ (oids.PDU_1_HANDLER_ID, GsSetIds.CORE),
+ (oids.PDU_2_HANDLER_ID, GsSetIds.CORE),
+ (oids.ACU_HANDLER_ID, GsSetIds.CORE),
+ (oids.P60_DOCK_HANDLER, GsSetIds.AUX),
+ (oids.PDU_1_HANDLER_ID, GsSetIds.AUX),
+ (oids.PDU_2_HANDLER_ID, GsSetIds.AUX),
+ (oids.ACU_HANDLER_ID, GsSetIds.AUX),
]
diag_list = [
@@ -587,10 +584,10 @@ def pack_proc_commands(q: DefaultPusQueueHelper, op_code: str):
]
# HK listening
pack_generic_hk_listening_cmds(
- tc_queue=tc_queue,
+ q=q,
proc_key=key,
sid_list=sid_list,
- diag=False,
+ diag_list=[False],
cfg=GenericHkListeningCfg.default(),
)
if op_code in OpCodes.STR_FT:
@@ -726,7 +723,7 @@ def enable_listen_to_hk_for_x_seconds(
):
q.add_log_cmd(f"Enabling periodic HK for {device}")
cmd_tuple = enable_periodic_hk_command_with_interval(
- diag=diag, sid=sid, interval_seconds=interval_seconds, ssc=0
+ diag=diag, sid=sid, interval_seconds=interval_seconds
)
for cmd in cmd_tuple:
q.add_pus_tc(cmd)
@@ -827,24 +824,6 @@ def rw_speed_cmd_single(
q.add_pus_tc(pack_set_speed_command(object_id=oid, speed=0, ramp_time_ms=ramp_time))
-def rw_speed_up_cmd_consec(
- q: DefaultPusQueueHelper, obids: List[bytes], speed: int, ramp_time: int
-):
- for oid in obids:
- q.add_pus_tc(
- pack_set_speed_command(object_id=oid, speed=speed, ramp_time_ms=ramp_time)
- )
-
-
-def rw_speed_down_cmd_consec(
- q: DefaultPusQueueHelper, obids: List[bytes], ramp_time: int
-):
- for oid in obids:
- q.add_pus_tc(
- pack_set_speed_command(object_id=oid, speed=0, ramp_time_ms=ramp_time)
- )
-
-
def activate_all_rws_in_sequence(
q: DefaultPusQueueHelper, init_ssc: int, test_speed: int, test_ramp_time: int
):
diff --git a/pus_tm/action_reply_handler.py b/pus_tm/action_reply_handler.py
index 520ae26..cb07dc4 100644
--- a/pus_tm/action_reply_handler.py
+++ b/pus_tm/action_reply_handler.py
@@ -1,6 +1,6 @@
import struct
from config.object_ids import *
-from pus_tc.devs.imtq import ImtqActionIds
+from tmtc.acs.imtq import ImtqActionIds
from pus_tm.defs import PrintWrapper
from tmtc.ploc_mpsoc import PlocReplyIds
from tmtc.ploc_supervisor import SupvActionIds
diff --git a/pus_tm/devs/reaction_wheels.py b/pus_tm/devs/reaction_wheels.py
deleted file mode 100644
index 6e072f5..0000000
--- a/pus_tm/devs/reaction_wheels.py
+++ /dev/null
@@ -1,124 +0,0 @@
-import struct
-
-from pus_tm.defs import PrintWrapper, FsfwTmTcPrinter
-from tmtccmd.util.obj_id import ObjectIdU32
-
-
-def handle_rw_hk_data(
- printer: FsfwTmTcPrinter, object_id: ObjectIdU32, set_id: int, hk_data: bytes
-):
- from pus_tc.devs.reaction_wheels import RwSetIds
-
- pw = PrintWrapper(printer)
- current_idx = 0
- if set_id == RwSetIds.STATUS_SET_ID:
- pw.dlog(
- f"Received Status HK (ID {set_id}) from Reaction Wheel {object_id.name}"
- )
- fmt_str = "!IiiBB"
- inc_len = struct.calcsize(fmt_str)
- (temp, speed, ref_speed, state, clc_mode) = struct.unpack(
- fmt_str, hk_data[current_idx : current_idx + inc_len]
- )
- current_idx += inc_len
- speed_rpm = speed / 10.0
- ref_speed_rpm = ref_speed / 10.0
- pw.dlog(
- f"Temperature {temp} C | Speed {speed_rpm} rpm | Reference Speed {ref_speed_rpm} rpm"
- )
- pw.dlog(
- f"State {state}. 0: Error, 1: Idle, 2: Coasting, 3: Running, speed stable, "
- f"4: Running, speed changing"
- )
- pw.dlog(
- f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), "
- f"1: High Current Mode (0.6 A)"
- )
- printer.print_validity_buffer(hk_data[current_idx:], 5)
- if set_id == RwSetIds.LAST_RESET:
- pw.dlog(
- f"Received Last Reset HK (ID {set_id}) from Reaction Wheel {object_id.name}"
- )
- fmt_str = "!BB"
- inc_len = struct.calcsize(fmt_str)
- (last_not_cleared_reset_status, current_reset_status) = struct.unpack(
- fmt_str, hk_data[current_idx : current_idx + inc_len]
- )
- current_idx += inc_len
- pw.dlog(
- f"Last Non-Cleared (Cached) Reset Status {last_not_cleared_reset_status} | "
- f"Current Reset Status {current_reset_status}"
- )
- if set_id == RwSetIds.TM_SET:
- pw.dlog(f"Received TM HK (ID {set_id}) from Reaction Wheel {object_id.name}")
- fmt_str = "!BiffBBiiIIIIIIIIIIIIIIII"
- inc_len = struct.calcsize(fmt_str)
- (
- last_reset_status,
- mcu_temp,
- pressure_sens_temp,
- pressure,
- state,
- clc_mode,
- current_speed,
- ref_speed,
- num_invalid_crc_packets,
- num_invalid_len_packets,
- num_invalid_cmd_packets,
- num_of_cmd_executed_requests,
- num_of_cmd_replies,
- uart_num_of_bytes_written,
- uart_num_of_bytes_read,
- uart_num_parity_errors,
- uart_num_noise_errors,
- uart_num_frame_errors,
- uart_num_reg_overrun_errors,
- uart_total_num_errors,
- spi_num_bytes_written,
- spi_num_bytes_read,
- spi_num_reg_overrun_errors,
- spi_total_num_errors,
- ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
-
- pw.dlog(
- f"MCU Temperature {mcu_temp} | Pressure Sensore Temperature {pressure_sens_temp} C"
- )
- pw.dlog(f"Last Reset Status {last_reset_status}")
- pw.dlog(
- f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), "
- f"1: High Current Mode (0.6 A)"
- )
- pw.dlog(f"Speed {current_speed} rpm | Reference Speed {ref_speed} rpm")
- pw.dlog(
- f"State {state}. 0: Error, 1: Idle, 2: Coasting, 3: Running, speed stable, "
- f"4: Running, speed changing"
- )
- pw.dlog(f"Number Of Invalid Packets:")
- pw.dlog("CRC | Length | CMD")
- pw.dlog(
- f"{num_invalid_crc_packets} | {num_invalid_len_packets} | {num_invalid_cmd_packets}"
- )
- pw.dlog(
- f"Num Of CMD Executed Requests {num_of_cmd_executed_requests} | "
- f"Num of CMD Replies {num_of_cmd_replies}"
- )
- pw.dlog("UART COM information:")
- pw.dlog(
- f"NumBytesWritten | NumBytesRead | ParityErrs | NoiseErrs | FrameErrs | "
- f"RegOverrunErrs | TotalErrs"
- )
- pw.dlog(
- f"{uart_num_of_bytes_written} | {uart_num_of_bytes_read} | {uart_num_parity_errors} | "
- f"{uart_num_noise_errors} | {uart_num_frame_errors} | {uart_num_reg_overrun_errors} | "
- f"{uart_total_num_errors}"
- )
- pw.dlog("SPI COM Info:")
- pw.dlog(f"NumBytesWritten | NumBytesRead | RegOverrunErrs | TotalErrs")
- pw.dlog(
- f"{spi_num_bytes_written} | {spi_num_bytes_read} | {spi_num_reg_overrun_errors} | "
- f"{spi_total_num_errors}"
- )
- if current_idx > 0:
- printer.print_validity_buffer(
- validity_buffer=hk_data[current_idx:], num_vars=27
- )
diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py
index 1077246..24b0475 100644
--- a/pus_tm/hk_handling.py
+++ b/pus_tm/hk_handling.py
@@ -6,6 +6,7 @@ from pus_tm.devs.rad_sensor import handle_rad_sensor_data
from pus_tm.devs.sus import handle_sus_hk
from pus_tm.system.tcs import handle_thermal_controller_hk_data
from tmtc.ploc_supervisor import handle_supv_hk_data
+from tmtc.acs.reaction_wheels import handle_rw_hk_data
from tmtccmd.tm.pus_3_fsfw_hk import (
Service3Base,
HkContentType,
@@ -25,8 +26,7 @@ from pus_tm.devs.imtq_mgt import (
)
from tmtc.power.tm import handle_pdu_data, handle_p60_hk_data, handle_acu_hk_data
from pus_tm.devs.syrlinks import handle_syrlinks_hk_data
-from pus_tc.devs.imtq import ImtqSetIds
-from pus_tm.devs.reaction_wheels import handle_rw_hk_data
+from tmtc.acs.imtq import ImtqSetIds
from pus_tm.defs import FsfwTmTcPrinter
from pus_tm.system.core import handle_core_hk_data
from pus_tm.devs.mgms import handle_mgm_hk_data
diff --git a/tmtc/acs/__init__.py b/tmtc/acs/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pus_tc/devs/imtq.py b/tmtc/acs/imtq.py
similarity index 72%
rename from pus_tc/devs/imtq.py
rename to tmtc/acs/imtq.py
index c78127a..2287ce9 100644
--- a/pus_tc/devs/imtq.py
+++ b/tmtc/acs/imtq.py
@@ -7,7 +7,13 @@
"""
import struct
+from config.definitions import CustomServiceList
from spacepackets.ecss.tc import PusTelecommand
+from tmtccmd.config.tmtc import (
+ tmtc_definitions_provider,
+ OpCodeEntry,
+ TmtcDefinitionWrapper,
+)
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid,
@@ -18,6 +24,13 @@ from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.util import ObjectIdU32
+class OpCodes:
+ ON = ["on"]
+ NORMAL = ["normal"]
+ OFF = ["off"]
+ SET_DIPOLE = ["set_dipole"]
+
+
class ImtqSetIds:
ENG_HK_SET = 1
CAL_MTM_SET = 2
@@ -44,20 +57,40 @@ class ImtqActionIds:
read_self_test_results = bytearray([0x0, 0x0, 0x0, 0x0D])
+@tmtc_definitions_provider
+def add_imtq_cmds(defs: TmtcDefinitionWrapper):
+ oce = OpCodeEntry()
+ oce.add(OpCodes.OFF, "Mode Off")
+ oce.add(OpCodes.ON, "Mode On")
+ oce.add(OpCodes.NORMAL, "Mode Normal")
+ oce.add("3", "IMTQ perform pos X self test")
+ oce.add("4", "IMTQ perform neg X self test")
+ oce.add("5", "IMTQ perform pos Y self test")
+ oce.add("6", "IMTQ perform neg Y self test")
+ oce.add("7", "IMTQ perform pos Z self test")
+ oce.add("8", "IMTQ perform neg Z self test")
+ oce.add(OpCodes.SET_DIPOLE, "IMTQ command dipole")
+ oce.add("10", "IMTQ get commanded dipole")
+ oce.add("11", "IMTQ get engineering hk set")
+ oce.add("12", "IMTQ get calibrated MTM measurement one shot")
+ oce.add("13", "IMTQ get raw MTM measurement one shot")
+ defs.add_service(CustomServiceList.IMTQ.value, "IMQT Device", oce)
+
+
def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
q.add_log_cmd(
f"Testing ISIS IMTQ handler with object id: {object_id.as_hex_string}"
)
- if op_code == "0":
+ if op_code in OpCodes.OFF:
q.add_log_cmd("IMTQ: Set mode off")
command = pack_mode_data(object_id.as_bytes, Modes.OFF, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command))
- if op_code == "1":
+ if op_code in OpCodes.ON:
q.add_log_cmd("IMTQ: Set mode on")
command = pack_mode_data(object_id.as_bytes, Modes.ON, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command))
- if op_code == "2":
+ if op_code in OpCodes.NORMAL:
q.add_log_cmd("IMTQ: Mode Normal")
command = pack_mode_data(object_id.as_bytes, Modes.NORMAL, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command))
@@ -134,12 +167,20 @@ def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_cod
sid = make_sid(object_id.as_bytes, ImtqSetIds.NEGATIVE_Z_TEST)
q.add_pus_tc(generate_one_hk_command(sid))
- if op_code == "9":
- q.add_log_cmd("IMTQ: Commanding dipole")
- x_dipole = 0
- y_dipole = 0
- z_dipole = 0
- duration = 0 # ms
+ if op_code in OpCodes.SET_DIPOLE:
+ x_dipole = int(input("Specify X dipole [range [0, 2000] * 10^-4 * Am^2]: "))
+ y_dipole = int(input("Specify Y dipole [range [0, 2000] * 10^-4 * Am^2]: "))
+ z_dipole = int(input("Specify Z dipole [range [0, 2000] * 10^-4 * Am^2]: "))
+ duration = int(
+ input(
+ f"Specify torque duration [range [0, {pow(2, 16) - 1}, "
+ f"0 for continuous generation until update]: "
+ )
+ )
+ dur_str = "infinite" if duration == 0 else str(duration)
+ q.add_log_cmd(
+ f"IMTQ: Commanding dipole X={x_dipole}, Y={y_dipole}, Z={y_dipole}, duration={dur_str}"
+ )
q.add_pus_tc(
pack_dipole_command(
object_id.as_bytes, x_dipole, y_dipole, z_dipole, duration
@@ -184,19 +225,39 @@ def pack_dipole_command(
object_id: bytes, x_dipole: int, y_dipole: int, z_dipole: int, duration: int
) -> PusTelecommand:
"""This function packs the command causing the ISIS IMTQ to generate a dipole.
- @param object_id The object id of the IMTQ handler.
- @param x_dipole The dipole of the x coil in 10^-4*Am^2 (max. 2000)
- @param y_dipole The dipole of the y coil in 10^-4*Am^2 (max. 2000)
- @param z_dipole The dipole of the z coil in 10^-4*Am^2 (max. 2000)
- @param duration The duration in milliseconds the dipole will be generated by the coils.
+ :param object_id: The object id of the IMTQ handler.
+ :param x_dipole: The dipole of the x coil in 10^-4*Am^2 (max. 2000)
+ :param y_dipole: The dipole of the y coil in 10^-4*Am^2 (max. 2000)
+ :param z_dipole: The dipole of the z coil in 10^-4*Am^2 (max. 2000)
+ :param duration: The duration in milliseconds the dipole will be generated by the coils.
When set to 0, the dipole will be generated until a new dipole actuation
command is sent.
"""
action_id = ImtqActionIds.start_actuation_dipole
command = object_id + action_id
+ x_dipole = int(round(x_dipole))
+ y_dipole = int(round(y_dipole))
+ z_dipole = int(round(z_dipole))
+ if x_dipole < -2000 or x_dipole > 2000:
+ raise_dipole_error("X dipole", x_dipole)
+ if y_dipole < -2000 or y_dipole > 2000:
+ raise_dipole_error("Y dipole", y_dipole)
+ if z_dipole < -2000 or z_dipole > 2000:
+ raise_dipole_error("Z dipole", z_dipole)
+ duration = int(round(duration))
+ if duration < 0 or duration > pow(2, 16) - 1:
+ raise ValueError(
+ f"Duration in ms of {duration} smaller than 0 or larger than allowed {pow(2, 16) - 1}"
+ )
command += struct.pack("!h", x_dipole)
command += struct.pack("!h", y_dipole)
command += struct.pack("!h", z_dipole)
- command += struct.pack("!h", duration)
+ command += struct.pack("!H", duration)
command = PusTelecommand(service=8, subservice=128, app_data=command)
return command
+
+
+def raise_dipole_error(dipole_str: str, value: int):
+ raise ValueError(
+ f"{dipole_str} {value} negative or larger than maximum allowed 2000 * 10^-4*Am^2"
+ )
diff --git a/tmtc/acs/reaction_wheels.py b/tmtc/acs/reaction_wheels.py
new file mode 100644
index 0000000..74987d1
--- /dev/null
+++ b/tmtc/acs/reaction_wheels.py
@@ -0,0 +1,410 @@
+# -*- coding: utf-8 -*-
+"""reaction_wheels.py
+@brief Tests for the reaction wheel handler
+@author J. Meier
+@date 20.06.2021
+"""
+import struct
+from typing import List
+
+from pus_tm.defs import PrintWrapper
+from config.object_ids import RW1_ID, RW2_ID, RW3_ID, RW4_ID
+from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
+from tmtccmd.config.tmtc import tmtc_definitions_provider
+from tmtccmd.tc import DefaultPusQueueHelper
+from tmtccmd.tc.pus_3_fsfw_hk import (
+ generate_one_hk_command,
+ generate_one_diag_command,
+ make_sid,
+ enable_periodic_hk_command_with_interval,
+ disable_periodic_hk_command,
+)
+from spacepackets.ecss.tc import PusTelecommand
+from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices
+from config.definitions import CustomServiceList
+from tmtccmd.util import ObjectIdU32
+from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
+
+
+class OpCodesDevs:
+ SPEED = ["0", "speed"]
+ ON = ["1", "on"]
+ NML = ["2", "nml"]
+ OFF = ["3", "off"]
+ GET_STATUS = ["4", "status"]
+ GET_TM = ["5", "tm"]
+ ENABLE_STATUS_HK = ["6", "enable_status_hk"]
+ DISABLE_STATUS_HK = ["7", "disable_status_hk"]
+
+
+class InfoDevs:
+ SPEED = "Set speed"
+ ON = "Set On"
+ NML = "Set Normal"
+ OFF = "Set Off"
+ GET_STATUS = "Get Status HK"
+ GET_TM = "Get TM HK"
+ ENABLE_STATUS_HK = "Enable Status HK"
+ DISABLE_STATUS_HK = "Disable Status HK"
+
+
+class OpCodesAss:
+ ON = ["0", "on"]
+ NML = ["1", "nml"]
+ OFF = ["2", "off"]
+ ALL_SPEED_UP = ["3", "speed_up"]
+ ALL_SPEED_OFF = ["4", "speed_off"]
+
+
+class InfoAss:
+ ON = "Mode On: 3/4 RWs min. on"
+ NML = "Mode Normal: 3/4 RWs min. normal"
+ OFF = "Mode Off: All RWs off"
+ ALL_SPEED_UP = "Speed up consecutively"
+ ALL_SPEED_OFF = "Speed down to 0"
+
+
+class RwSetIds:
+ STATUS_SET_ID = 4
+ TEMPERATURE_SET_ID = 8
+ LAST_RESET = 2
+ TM_SET = 9
+
+
+class RwCommandIds:
+ RESET_MCU = bytearray([0x0, 0x0, 0x0, 0x01])
+ # Reads status information from reaction wheel into dataset with id 4
+ GET_RW_STATUS = bytearray([0x0, 0x0, 0x0, 0x04])
+ INIT_RW_CONTROLLER = bytearray([0x0, 0x0, 0x0, 0x05])
+ SET_SPEED = bytearray([0x0, 0x0, 0x0, 0x06])
+ # Reads temperature from reaction wheel into dataset with id 8
+ GET_TEMPERATURE = bytearray([0x0, 0x0, 0x0, 0x08])
+ GET_TM = bytearray([0x0, 0x0, 0x0, 0x09])
+
+
+class SpeedDefinitions:
+ RPM_100 = 1000
+ RPM_5000 = 5000
+
+
+class RampTime:
+ MS_1000 = 1000
+
+
+@tmtc_definitions_provider
+def add_rw_cmds(defs: TmtcDefinitionWrapper):
+ oce = OpCodeEntry()
+ oce.add(info=InfoDevs.SPEED, keys=OpCodesDevs.SPEED)
+ oce.add(info=InfoDevs.ON, keys=OpCodesDevs.ON)
+ oce.add(info=InfoDevs.OFF, keys=OpCodesDevs.OFF)
+ oce.add(info=InfoDevs.NML, keys=OpCodesDevs.NML)
+ oce.add(info=InfoDevs.GET_STATUS, keys=OpCodesDevs.GET_STATUS)
+ oce.add(info=InfoDevs.GET_TM, keys=OpCodesDevs.GET_TM)
+ oce.add(info=InfoDevs.ENABLE_STATUS_HK, keys=OpCodesDevs.ENABLE_STATUS_HK)
+ oce.add(info=InfoDevs.DISABLE_STATUS_HK, keys=OpCodesDevs.DISABLE_STATUS_HK)
+ defs.add_service(
+ name=CustomServiceList.REACTION_WHEEL_1.value,
+ info="Reaction Wheel 1",
+ op_code_entry=oce,
+ )
+ defs.add_service(
+ name=CustomServiceList.REACTION_WHEEL_2.value,
+ info="Reaction Wheel 2",
+ op_code_entry=oce,
+ )
+ defs.add_service(
+ name=CustomServiceList.REACTION_WHEEL_3.value,
+ info="Reaction Wheel 3",
+ op_code_entry=oce,
+ )
+ defs.add_service(
+ name=CustomServiceList.REACTION_WHEEL_4.value,
+ info="Reaction Wheel 4",
+ op_code_entry=oce,
+ )
+ oce = OpCodeEntry()
+ oce.add(info=InfoAss.ON, keys=OpCodesAss.ON)
+ oce.add(info=InfoAss.NML, keys=OpCodesAss.NML)
+ oce.add(info=InfoAss.OFF, keys=OpCodesAss.OFF)
+ oce.add(info=InfoAss.ALL_SPEED_UP, keys=OpCodesAss.ALL_SPEED_UP)
+ oce.add(info=InfoAss.ALL_SPEED_OFF, keys=OpCodesAss.ALL_SPEED_OFF)
+ defs.add_service(
+ name=CustomServiceList.RW_ASSEMBLY.value,
+ info="Reaction Wheel Assembly",
+ op_code_entry=oce,
+ )
+
+
+def pack_single_rw_test_into(
+ object_id: bytes, rw_idx: int, q: DefaultPusQueueHelper, op_code: str
+):
+ if op_code in OpCodesDevs.SPEED:
+ speed, ramp_time = prompt_speed_ramp_time()
+ q.add_log_cmd(
+ f"RW {rw_idx}: {InfoDevs.SPEED} with target "
+ f"speed {speed / 10.0} RPM and {ramp_time} ms ramp time"
+ )
+ q.add_pus_tc(pack_set_speed_command(object_id, speed, ramp_time))
+
+ if op_code in OpCodesDevs.ON:
+ q.add_log_cmd(f"RW {rw_idx}: {InfoDevs.ON}")
+ mode_data = pack_mode_data(object_id, Modes.ON, 0)
+ q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
+
+ if op_code in OpCodesDevs.NML:
+ q.add_log_cmd(f"RW {rw_idx}: {InfoDevs.NML}")
+ mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
+ q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
+
+ if op_code in OpCodesDevs.OFF:
+ q.add_log_cmd(f"RW {rw_idx}: {InfoDevs.OFF}")
+ mode_data = pack_mode_data(object_id, Modes.OFF, 0)
+ q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
+
+ if op_code in OpCodesDevs.GET_TM:
+ q.add_log_cmd(f"RW {rw_idx}: {InfoDevs.GET_TM}")
+ q.add_pus_tc(
+ generate_one_hk_command(
+ sid=make_sid(object_id=object_id, set_id=RwSetIds.TM_SET)
+ )
+ )
+ if op_code in OpCodesDevs.GET_STATUS:
+ q.add_log_cmd(f"RW {rw_idx}: {InfoDevs.GET_STATUS}")
+ q.add_pus_tc(
+ generate_one_diag_command(
+ sid=make_sid(object_id=object_id, set_id=RwSetIds.STATUS_SET_ID)
+ )
+ )
+ if op_code in OpCodesDevs.ENABLE_STATUS_HK:
+ q.add_log_cmd(f"RW {rw_idx}: {InfoDevs.ENABLE_STATUS_HK}")
+ interval = float(input("Please enter HK interval in floating point seconds: "))
+ cmds = enable_periodic_hk_command_with_interval(
+ True, make_sid(object_id, RwSetIds.STATUS_SET_ID), interval
+ )
+ for cmd in cmds:
+ q.add_pus_tc(cmd)
+ if op_code in OpCodesDevs.DISABLE_STATUS_HK:
+ q.add_log_cmd(f"RW {rw_idx}: {InfoDevs.DISABLE_STATUS_HK}")
+ q.add_pus_tc(
+ disable_periodic_hk_command(
+ True, make_sid(object_id, RwSetIds.STATUS_SET_ID)
+ )
+ )
+
+
+def pack_rw_ass_cmds(q: DefaultPusQueueHelper, object_id: bytes, op_code: str):
+ if op_code in OpCodesAss.OFF:
+ data = pack_mode_data(object_id=object_id, mode=Modes.OFF, submode=0)
+ q.add_pus_tc(
+ PusTelecommand(
+ service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
+ )
+ )
+ if op_code in OpCodesAss.ON:
+ data = pack_mode_data(object_id=object_id, mode=Modes.ON, submode=0)
+ q.add_pus_tc(
+ PusTelecommand(
+ service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
+ )
+ )
+ if op_code in OpCodesAss.NML:
+ data = pack_mode_data(object_id=object_id, mode=Modes.NORMAL, submode=0)
+ q.add_pus_tc(
+ PusTelecommand(
+ service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
+ )
+ )
+ if op_code in OpCodesAss.ALL_SPEED_UP:
+ speed, ramp_time = prompt_speed_ramp_time()
+ rw_speed_up_cmd_consec(q, [RW1_ID, RW2_ID, RW3_ID, RW4_ID], speed, ramp_time)
+ if op_code in OpCodesAss.ALL_SPEED_OFF:
+ rw_speed_down_cmd_consec(
+ q, [RW1_ID, RW2_ID, RW3_ID, RW4_ID], prompt_ramp_time()
+ )
+
+
+def prompt_speed_ramp_time() -> (int, int):
+ speed = int(
+ input("Specify speed [0.1 RPM, 0 or range [-65000, -1000] and [1000, 65000]: ")
+ )
+ return speed, prompt_ramp_time()
+
+
+def prompt_ramp_time() -> int:
+ return int(input("Specify ramp time [ms, range [10, 20000]]: "))
+
+
+def pack_set_speed_command(
+ object_id: bytes, speed: int, ramp_time_ms: int
+) -> PusTelecommand:
+ """With this function a command is packed to set the speed of a reaction wheel
+ :param object_id: The object id of the reaction wheel handler.
+ :param speed: Valid speeds are 0, [-65000, -1000] and [1000, 65000]. Values are
+ specified in 0.1 * RPM
+ :param ramp_time_ms: The time after which the reaction wheel will reach the commanded speed.
+ Valid times are 10 - 20000 ms
+ """
+ if speed > 0:
+ if speed < 1000 or speed > 65000:
+ raise ValueError(
+ "Invalid RW speed specified. "
+ "Allowed range is [1000, 65000] 0.1 * RPM"
+ )
+ elif speed < 0:
+ if speed < -65000 or speed > -1000:
+ raise ValueError(
+ "Invalid RW speed specified. "
+ "Allowed range is [-65000, -1000] 0.1 * RPM"
+ )
+ else:
+ # Speed is 0
+ pass
+
+ if ramp_time_ms < 0 or (
+ ramp_time_ms > 0 and (ramp_time_ms > 20000 or ramp_time_ms < 10)
+ ):
+ raise ValueError("Invalid Ramp Speed time. Allowed range is [10-20000] ms")
+ command_id = RwCommandIds.SET_SPEED
+ command = bytearray()
+ command += object_id + command_id
+ command = command + struct.pack("!i", speed)
+ command = command + ramp_time_ms.to_bytes(length=2, byteorder="big")
+ command = PusTelecommand(service=8, subservice=128, app_data=command)
+ return command
+
+
+def handle_rw_hk_data(
+ printer: FsfwTmTcPrinter, object_id: ObjectIdU32, set_id: int, hk_data: bytes
+):
+
+ pw = PrintWrapper(printer)
+ current_idx = 0
+ if set_id == RwSetIds.STATUS_SET_ID:
+ pw.dlog(
+ f"Received Status HK (ID {set_id}) from Reaction Wheel {object_id.name}"
+ )
+ fmt_str = "!IiiBB"
+ inc_len = struct.calcsize(fmt_str)
+ (temp, speed, ref_speed, state, clc_mode) = struct.unpack(
+ fmt_str, hk_data[current_idx : current_idx + inc_len]
+ )
+ current_idx += inc_len
+ speed_rpm = speed / 10.0
+ ref_speed_rpm = ref_speed / 10.0
+ pw.dlog(
+ f"Temperature {temp} C | Speed {speed_rpm} rpm | Reference Speed {ref_speed_rpm} rpm"
+ )
+ pw.dlog(
+ f"State {state}. 0: Error, 1: Idle, 2: Coasting, 3: Running, speed stable, "
+ f"4: Running, speed changing"
+ )
+ pw.dlog(
+ f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), "
+ f"1: High Current Mode (0.6 A)"
+ )
+ printer.print_validity_buffer(hk_data[current_idx:], 5)
+ if set_id == RwSetIds.LAST_RESET:
+ pw.dlog(
+ f"Received Last Reset HK (ID {set_id}) from Reaction Wheel {object_id.name}"
+ )
+ fmt_str = "!BB"
+ inc_len = struct.calcsize(fmt_str)
+ (last_not_cleared_reset_status, current_reset_status) = struct.unpack(
+ fmt_str, hk_data[current_idx : current_idx + inc_len]
+ )
+ current_idx += inc_len
+ pw.dlog(
+ f"Last Non-Cleared (Cached) Reset Status {last_not_cleared_reset_status} | "
+ f"Current Reset Status {current_reset_status}"
+ )
+ if set_id == RwSetIds.TM_SET:
+ pw.dlog(f"Received TM HK (ID {set_id}) from Reaction Wheel {object_id.name}")
+ fmt_str = "!BiffBBiiIIIIIIIIIIIIIIII"
+ inc_len = struct.calcsize(fmt_str)
+ (
+ last_reset_status,
+ mcu_temp,
+ pressure_sens_temp,
+ pressure,
+ state,
+ clc_mode,
+ current_speed,
+ ref_speed,
+ num_invalid_crc_packets,
+ num_invalid_len_packets,
+ num_invalid_cmd_packets,
+ num_of_cmd_executed_requests,
+ num_of_cmd_replies,
+ uart_num_of_bytes_written,
+ uart_num_of_bytes_read,
+ uart_num_parity_errors,
+ uart_num_noise_errors,
+ uart_num_frame_errors,
+ uart_num_reg_overrun_errors,
+ uart_total_num_errors,
+ spi_num_bytes_written,
+ spi_num_bytes_read,
+ spi_num_reg_overrun_errors,
+ spi_total_num_errors,
+ ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
+
+ pw.dlog(
+ f"MCU Temperature {mcu_temp} | Pressure Sensore Temperature {pressure_sens_temp} C"
+ )
+ pw.dlog(f"Last Reset Status {last_reset_status}")
+ pw.dlog(
+ f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), "
+ f"1: High Current Mode (0.6 A)"
+ )
+ pw.dlog(f"Speed {current_speed} rpm | Reference Speed {ref_speed} rpm")
+ pw.dlog(
+ f"State {state}. 0: Error, 1: Idle, 2: Coasting, 3: Running, speed stable, "
+ f"4: Running, speed changing"
+ )
+ pw.dlog(f"Number Of Invalid Packets:")
+ pw.dlog("CRC | Length | CMD")
+ pw.dlog(
+ f"{num_invalid_crc_packets} | {num_invalid_len_packets} | {num_invalid_cmd_packets}"
+ )
+ pw.dlog(
+ f"Num Of CMD Executed Requests {num_of_cmd_executed_requests} | "
+ f"Num of CMD Replies {num_of_cmd_replies}"
+ )
+ pw.dlog("UART COM information:")
+ pw.dlog(
+ f"NumBytesWritten | NumBytesRead | ParityErrs | NoiseErrs | FrameErrs | "
+ f"RegOverrunErrs | TotalErrs"
+ )
+ pw.dlog(
+ f"{uart_num_of_bytes_written} | {uart_num_of_bytes_read} | {uart_num_parity_errors} | "
+ f"{uart_num_noise_errors} | {uart_num_frame_errors} | {uart_num_reg_overrun_errors} | "
+ f"{uart_total_num_errors}"
+ )
+ pw.dlog("SPI COM Info:")
+ pw.dlog(f"NumBytesWritten | NumBytesRead | RegOverrunErrs | TotalErrs")
+ pw.dlog(
+ f"{spi_num_bytes_written} | {spi_num_bytes_read} | {spi_num_reg_overrun_errors} | "
+ f"{spi_total_num_errors}"
+ )
+ if current_idx > 0:
+ printer.print_validity_buffer(
+ validity_buffer=hk_data[current_idx:], num_vars=27
+ )
+
+
+def rw_speed_up_cmd_consec(
+ q: DefaultPusQueueHelper, obids: List[bytes], speed: int, ramp_time: int
+):
+ for oid in obids:
+ q.add_pus_tc(
+ pack_set_speed_command(object_id=oid, speed=speed, ramp_time_ms=ramp_time)
+ )
+
+
+def rw_speed_down_cmd_consec(
+ q: DefaultPusQueueHelper, obids: List[bytes], ramp_time: int
+):
+ for oid in obids:
+ q.add_pus_tc(
+ pack_set_speed_command(object_id=oid, speed=0, ramp_time_ms=ramp_time)
+ )
diff --git a/tmtc/power/acu.py b/tmtc/power/acu.py
index 5c1dad3..9da4bc5 100644
--- a/tmtc/power/acu.py
+++ b/tmtc/power/acu.py
@@ -6,25 +6,18 @@
from config.definitions import CustomServiceList
from tmtc.power.common_power import (
- pack_gomspace_cmds,
+ pack_common_gomspace_cmds,
add_gomspace_cmd_defs,
req_hk_cmds,
+ pack_common_power_cmds,
+ SetIds,
)
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.tc import DefaultPusQueueHelper
-from tmtccmd.tc.pus_3_fsfw_hk import (
- make_sid,
- generate_one_diag_command,
- generate_one_hk_command,
-)
import gomspace.gomspace_common as gs
-from gomspace.gomspace_common import GomspaceOpCodes
-from gomspace.gomspace_common import GsInfo as GsInfo
from config.object_ids import ACU_HANDLER_ID
-from tmtc.power.p60dock import P60DockConfigTable
-from tmtccmd.tc.pus_8_funccmd import make_fsfw_action_cmd
from tmtccmd.util import ObjectIdU32
@@ -68,15 +61,14 @@ def add_acu_cmds(defs: TmtcDefinitionWrapper):
def pack_acu_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
q.add_log_cmd("Handling ACU command")
- pack_gomspace_cmds("ACU", object_id, q, op_code)
+ pack_common_power_cmds("ACU", object_id, q, op_code)
+ pack_common_gomspace_cmds("ACU", object_id, q, op_code)
acu_req_hk_cmds(q, op_code)
pack_test_cmds(object_id=object_id, q=q)
def acu_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str):
- req_hk_cmds(
- "ACU", q, op_code, ACU_HANDLER_ID, [gs.SetIds.ACU_CORE, gs.SetIds.ACU_AUX]
- )
+ req_hk_cmds("ACU", q, op_code, ACU_HANDLER_ID, [SetIds.CORE, SetIds.AUX])
class ACUTestProcedure:
@@ -173,13 +165,3 @@ def pack_test_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper):
ACUConfigTable.ov_mode.parameter_size,
)
)
- if ACUTestProcedure.all or ACUTestProcedure.off:
- q.add_log_cmd("P60 Dock: Turning off ACU")
- q.add_pus_tc(
- gs.pack_set_param_command(
- ACU_HANDLER_ID,
- P60DockConfigTable.out_en_0.parameter_address,
- P60DockConfigTable.out_en_0.parameter_size,
- gs.Channel.off,
- )
- )
diff --git a/tmtc/power/common_power.py b/tmtc/power/common_power.py
index 20d8d06..59f6b2c 100644
--- a/tmtc/power/common_power.py
+++ b/tmtc/power/common_power.py
@@ -1,8 +1,6 @@
from gomspace.gomspace_common import (
pack_set_u8_param_command,
Channel,
- GomspaceOpCodes,
- GsInfo,
GomspaceDeviceActionIds,
prompt_and_pack_set_integer_param_command,
prompt_and_pack_get_param_command,
@@ -18,16 +16,44 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid,
generate_one_diag_command,
generate_one_hk_command,
+ enable_periodic_hk_command_with_interval,
+ disable_periodic_hk_command,
)
from tmtccmd.tc.pus_8_funccmd import make_fsfw_action_cmd
from tmtccmd.util import ObjectIdU32, ObjectIdBase
+class GomspaceOpCodes:
+ GET_PARAM = ["get_param"]
+ SET_INTEGER_PARAM = ["set_int_param"]
+ SAVE_TABLE = ["save_table"]
+ RESET_GND_WATCHDOG = ["reset_gnd_wdt"]
+ SAVE_TABLE_DEFAULT = ["save_table_default"]
+ LOAD_TABLE = ["load_table"]
+ REQUEST_CONFIG_TABLE = ["cfg_table"]
+
+
+class GsInfo:
+ GET_PARAMETER = "Get parameter"
+ SET_PARAMETER = "Set integer parameter"
+ REQUEST_CONFIG_TABLE = "Request Config Table"
+ RESET_GND_WATCHDOG = "Reset GND watchdog"
+ SAVE_TABLE = "Save table non-volatile (file)"
+ SAVE_TABLE_DEFAULT = "Save table non-volatile (default)"
+ LOAD_TABLE = "Load Table"
+
+
class PowerInfo:
INFO_CORE = "Core Information"
INFO_AUX = "Auxiliary Information"
INFO_ALL = "All Information"
+ ENABLE_INFO_HK = "Enable Core Info HK"
+ DISABLE_INFO_HK = "Disable Core Info HK"
RESET_ALL_GND_WDTS = "Reset all Ground Watchdogs"
+ REQUEST_CORE_HK_ONCE = "Requesting Core HK once"
+ REQUEST_AUX_HK_ONCE = "Requesting Aux HK once"
+ PRINT_SWITCH_V_I = "Print Switch V I Info"
+ PRINT_LATCHUPS = "Print latchups"
class PowerOpCodes:
@@ -68,24 +94,54 @@ class PowerOpCodes:
PL_CAM_OFF = ["cam_off"]
INFO_CORE = ["info"]
+ ENABLE_INFO_HK = ["info_hk_on"]
+ DISABLE_INFO_HK = ["info_hk_off"]
INFO_AUX = ["info_aux"]
INFO_ALL = ["info_all"]
RESET_ALL_GND_WDTS = ["reset_gnd_wdts"]
+ # Request HK
+ REQUEST_CORE_HK_ONCE = ["hk_core"]
+ REQUEST_AUX_HK_ONCE = ["hk_aux"]
+ PRINT_SWITCH_V_I = ["print_switch_vi"]
+ PRINT_LATCHUPS = ["print_latchups"]
-def pack_gomspace_cmds(
+class SetIds:
+ CORE = 1
+ AUX = 2
+ CONFIG = 3
+
+
+def pack_common_power_cmds(
prefix: str, object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
objb = object_id.as_bytes
- if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
- q.add_log_cmd(f"{prefix}: {GsInfo.PRINT_SWITCH_V_I}")
+ if op_code in PowerOpCodes.ENABLE_INFO_HK:
+ interval = float(input("Specify HK interval in floating point seconds: "))
+ q.add_log_cmd(f"{prefix}: {PowerInfo.ENABLE_INFO_HK} with interval {interval}")
+ cmds = enable_periodic_hk_command_with_interval(
+ True, make_sid(objb, SetIds.CORE), interval
+ )
+ for cmd in cmds:
+ q.add_pus_tc(cmd)
+ if op_code in PowerOpCodes.DISABLE_INFO_HK:
+ q.add_log_cmd(f"{prefix}: {PowerInfo.DISABLE_INFO_HK}")
+ q.add_pus_tc(disable_periodic_hk_command(True, make_sid(objb, SetIds.CORE)))
+
+
+def pack_common_gomspace_cmds(
+ prefix: str, object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
+):
+ objb = object_id.as_bytes
+ if op_code in PowerOpCodes.PRINT_SWITCH_V_I:
+ q.add_log_cmd(f"{prefix}: {PowerInfo.PRINT_SWITCH_V_I}")
q.add_pus_tc(
make_fsfw_action_cmd(
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
)
- if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
- q.add_log_cmd(f"{prefix}: {GsInfo.PRINT_LATCHUPS}")
+ if op_code in PowerOpCodes.PRINT_LATCHUPS:
+ q.add_log_cmd(f"{prefix}: {PowerInfo.PRINT_LATCHUPS}")
q.add_pus_tc(
make_fsfw_action_cmd(
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS
@@ -189,12 +245,12 @@ def req_hk_cmds(
obj_id: bytes,
set_id_pair: [int, int],
):
- if op_code in GomspaceOpCodes.REQUEST_CORE_HK_ONCE:
- q.add_log_cmd(f"{prefix}: {GsInfo.REQUEST_CORE_HK_ONCE}")
+ if op_code in PowerOpCodes.REQUEST_CORE_HK_ONCE:
+ q.add_log_cmd(f"{prefix}: {PowerInfo.REQUEST_CORE_HK_ONCE}")
hk_sid = make_sid(object_id=obj_id, set_id=set_id_pair[0])
q.add_pus_tc(generate_one_diag_command(sid=hk_sid))
- if op_code in GomspaceOpCodes.REQUEST_AUX_HK_ONCE:
- q.add_log_cmd(f"{prefix}: {GsInfo.REQUEST_AUX_HK_ONCE}")
+ if op_code in PowerOpCodes.REQUEST_AUX_HK_ONCE:
+ q.add_log_cmd(f"{prefix}: {PowerInfo.REQUEST_AUX_HK_ONCE}")
hk_sid = make_sid(object_id=obj_id, set_id=set_id_pair[1])
q.add_pus_tc(generate_one_hk_command(sid=hk_sid))
@@ -229,17 +285,24 @@ def generic_off_cmd(
)
+def add_common_power_defs(oce: OpCodeEntry):
+ oce.add(keys=PowerOpCodes.REQUEST_CORE_HK_ONCE, info=PowerInfo.REQUEST_CORE_HK_ONCE)
+ oce.add(keys=PowerOpCodes.REQUEST_AUX_HK_ONCE, info=PowerInfo.REQUEST_AUX_HK_ONCE)
+ oce.add(keys=PowerOpCodes.ENABLE_INFO_HK, info=PowerInfo.ENABLE_INFO_HK)
+ oce.add(keys=PowerOpCodes.DISABLE_INFO_HK, info=PowerInfo.DISABLE_INFO_HK)
+
+
def add_gomspace_cmd_defs(oce: OpCodeEntry):
oce.add(
- keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE,
- info=GsInfo.REQUEST_CORE_HK_ONCE,
+ keys=PowerOpCodes.REQUEST_CORE_HK_ONCE,
+ info=PowerInfo.REQUEST_CORE_HK_ONCE,
)
oce.add(
- keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
- info=GsInfo.REQUEST_AUX_HK_ONCE,
+ keys=PowerOpCodes.REQUEST_AUX_HK_ONCE,
+ info=PowerInfo.REQUEST_AUX_HK_ONCE,
)
+ oce.add(keys=PowerOpCodes.PRINT_LATCHUPS, info=PowerInfo.PRINT_LATCHUPS)
oce.add(keys=GomspaceOpCodes.GET_PARAM, info=GsInfo.GET_PARAMETER)
- oce.add(keys=GomspaceOpCodes.PRINT_LATCHUPS, info=GsInfo.PRINT_LATCHUPS)
oce.add(keys=GomspaceOpCodes.SET_INTEGER_PARAM, info=GsInfo.SET_PARAMETER)
oce.add(keys=GomspaceOpCodes.REQUEST_CONFIG_TABLE, info=GsInfo.REQUEST_CONFIG_TABLE)
oce.add(keys=GomspaceOpCodes.SAVE_TABLE, info=GsInfo.SAVE_TABLE)
diff --git a/tmtc/power/p60dock.py b/tmtc/power/p60dock.py
index 993fa86..e9fed56 100644
--- a/tmtc/power/p60dock.py
+++ b/tmtc/power/p60dock.py
@@ -5,7 +5,12 @@
@author J. Meier
@date 13.12.2020
"""
-from tmtc.power.common_power import pack_gomspace_cmds, req_hk_cmds
+from tmtc.power.common_power import (
+ pack_common_gomspace_cmds,
+ req_hk_cmds,
+ pack_common_power_cmds,
+ SetIds,
+)
from tmtccmd.tc import DefaultPusQueueHelper
from gomspace.gomspace_common import (
TableEntry,
@@ -17,7 +22,6 @@ from gomspace.gomspace_common import (
pack_reboot_command,
pack_set_u8_param_command,
pack_set_u16_param_command,
- SetIds,
)
from config.object_ids import P60_DOCK_HANDLER
from tmtccmd.util import ObjectIdU32
@@ -96,7 +100,8 @@ class P60DockHkTable:
def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
objb = object_id.as_bytes
- pack_gomspace_cmds("P60 Dock", object_id, q, op_code)
+ pack_common_power_cmds("P60 Dock", object_id, q, op_code)
+ pack_common_gomspace_cmds("P60 Dock", object_id, q, op_code)
p60_dock_req_hk_cmds(q, op_code)
if op_code in P60OpCodes.STACK_3V3_ON:
q.add_log_cmd(P60Info.STACK_3V3_ON)
@@ -233,6 +238,4 @@ def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code:
def p60_dock_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str):
- req_hk_cmds(
- "P60 Dock", q, op_code, P60_DOCK_HANDLER, [SetIds.P60_CORE, SetIds.P60_AUX]
- )
+ req_hk_cmds("P60 Dock", q, op_code, P60_DOCK_HANDLER, [SetIds.CORE, SetIds.AUX])
diff --git a/tmtc/power/pdu1.py b/tmtc/power/pdu1.py
index 7220632..2ee949c 100644
--- a/tmtc/power/pdu1.py
+++ b/tmtc/power/pdu1.py
@@ -3,18 +3,27 @@
@author J. Meier
@date 17.12.2020
"""
+from config.definitions import CustomServiceList
from config.object_ids import PDU_1_HANDLER_ID
from tmtc.power.common_power import (
- pack_gomspace_cmds,
+ pack_common_gomspace_cmds,
req_hk_cmds,
PowerOpCodes,
generic_on_cmd,
generic_off_cmd,
+ add_gomspace_cmd_defs,
+ pack_common_power_cmds,
+ GomspaceOpCodes,
+ GsInfo,
+ PowerInfo,
+ add_common_power_defs,
+ SetIds,
)
from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import *
-from tmtccmd.config import OpCodeEntry
+from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper
+from tmtccmd.config.tmtc import tmtc_definitions_provider
class Pdu1InfoBase:
@@ -59,9 +68,10 @@ class PDU1TestProcedure:
def pack_pdu1_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
q.add_log_cmd("Commanding PDU1")
objb = object_id.as_bytes
- pdu1_cmds(q, op_code)
+ pdu1_switch_cmds(q, op_code)
pdu1_req_hk_cmds(q, op_code)
- pack_gomspace_cmds("PDU1", object_id, q, op_code)
+ pack_common_power_cmds("PDU1", object_id, q, op_code)
+ pack_common_gomspace_cmds("PDU1", object_id, q, op_code)
if PDU1TestProcedure.all or PDU1TestProcedure.ping:
q.add_log_cmd("PDU1: Ping Test")
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
@@ -79,9 +89,7 @@ def pack_pdu1_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code
def pdu1_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str):
- req_hk_cmds(
- "PDU1", q, op_code, PDU_1_HANDLER_ID, [SetIds.PDU_1_CORE, SetIds.PDU_1_AUX]
- )
+ req_hk_cmds("PDU1", q, op_code, PDU_1_HANDLER_ID, [SetIds.CORE, SetIds.AUX])
def info_on_pdu1(base: str) -> str:
@@ -92,7 +100,7 @@ def info_off_pdu1(base: str) -> str:
return "PDU1: " + base + " off"
-def pdu1_cmds(q: DefaultPusQueueHelper, op_code: str):
+def pdu1_switch_cmds(q: DefaultPusQueueHelper, op_code: str):
if op_code in PowerOpCodes.TCS_ON:
tcs_on_cmd(q)
elif op_code in PowerOpCodes.TCS_OFF:
@@ -146,6 +154,25 @@ def add_pdu1_common_defs(oce: OpCodeEntry):
oce.add(keys=PowerOpCodes.SCEX_OFF, info=info_off_pdu1(Pdu1InfoBase.SCEX))
+@tmtc_definitions_provider
+def add_pdu1_cmds(defs: TmtcDefinitionWrapper):
+ oce = OpCodeEntry()
+ add_pdu1_common_defs(oce)
+ add_common_power_defs(oce)
+ add_gomspace_cmd_defs(oce)
+ oce.add(
+ keys=PowerOpCodes.PRINT_SWITCH_V_I,
+ info="PDU1: Print Switches, Voltages, Currents",
+ )
+ oce.add(keys=GomspaceOpCodes.GET_PARAM, info=GsInfo.GET_PARAMETER)
+
+ defs.add_service(
+ name=CustomServiceList.PDU1.value,
+ info="PDU1 Device",
+ op_code_entry=oce,
+ )
+
+
def tcs_on_cmd(q: DefaultPusQueueHelper):
generic_on_cmd(PDU_1_HANDLER_ID, q, Pdu1InfoBase.TCS, Pdu1ChIndex.TCS)
diff --git a/tmtc/power/pdu2.py b/tmtc/power/pdu2.py
index 683cd39..6ae17f5 100644
--- a/tmtc/power/pdu2.py
+++ b/tmtc/power/pdu2.py
@@ -8,15 +8,20 @@
"""
from config.object_ids import PDU_2_HANDLER_ID
from tmtc.power.common_power import (
- pack_gomspace_cmds,
+ pack_common_gomspace_cmds,
req_hk_cmds,
PowerOpCodes,
generic_on_cmd,
generic_off_cmd,
+ add_gomspace_cmd_defs,
+ pack_common_power_cmds,
+ SetIds,
+ add_common_power_defs,
)
from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import *
-from tmtccmd.config import OpCodeEntry
+from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper
+from tmtccmd.config.tmtc import tmtc_definitions_provider
class Pdu2InfoBase:
@@ -74,7 +79,8 @@ def pack_pdu2_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code
objb = object_id.as_bytes
pdu2_cmds(q, op_code)
pdu2_req_hk_cmds(q, op_code)
- pack_gomspace_cmds("PDU2", object_id, q, op_code)
+ pack_common_power_cmds("PDU2", object_id, q, op_code)
+ pack_common_gomspace_cmds("PDU2", object_id, q, op_code)
if PDU2TestProcedure.all or PDU2TestProcedure.reboot:
q.add_log_cmd("PDU2: Reboot")
q.add_pus_tc(pack_reboot_command(object_id))
@@ -130,6 +136,27 @@ def pack_pdu2_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code
q.add_pus_tc(pack_request_full_hk_table_command(object_id))
+@tmtc_definitions_provider
+def add_pdu2_cmds(defs: TmtcDefinitionWrapper):
+ oce = OpCodeEntry()
+ add_pdu2_common_defs(oce)
+ add_common_power_defs(oce)
+ add_gomspace_cmd_defs(oce)
+ oce.add(
+ keys=PowerOpCodes.PRINT_SWITCH_V_I,
+ info="PDU2: Print Switches, Voltages, Currents",
+ )
+ oce.add(
+ keys=PowerOpCodes.PRINT_LATCHUPS,
+ info="PDU2: Print Latchups",
+ )
+ defs.add_service(
+ name="pdu2",
+ info="PDU2 Device",
+ op_code_entry=oce,
+ )
+
+
def pdu2_cmds(q: DefaultPusQueueHelper, op_code: str):
if op_code in PowerOpCodes.PL_PCDU_VBAT_NOM_ON:
pl_pcdu_bat_nom_on_cmd(q)
@@ -211,9 +238,7 @@ def add_pdu2_common_defs(oce: OpCodeEntry):
def pdu2_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str):
- req_hk_cmds(
- "PDU2", q, op_code, PDU_2_HANDLER_ID, [SetIds.PDU_2_CORE, SetIds.PDU_2_AUX]
- )
+ req_hk_cmds("PDU2", q, op_code, PDU_2_HANDLER_ID, [SetIds.CORE, SetIds.AUX])
def pl_pcdu_bat_nom_on_cmd(q: DefaultPusQueueHelper):
diff --git a/tmtc/power/power.py b/tmtc/power/power.py
index 7391fbc..50d4d96 100644
--- a/tmtc/power/power.py
+++ b/tmtc/power/power.py
@@ -1,4 +1,3 @@
-from gomspace.gomspace_common import GsInfo, GomspaceOpCodes
from tmtc.power.common_power import (
PowerOpCodes,
PowerInfo,
@@ -13,8 +12,18 @@ from config.object_ids import (
PDU_2_HANDLER_ID,
get_object_ids,
)
-from tmtc.power.pdu1 import pdu1_req_hk_cmds, pdu1_cmds, add_pdu1_common_defs
-from tmtc.power.pdu2 import pdu2_req_hk_cmds, add_pdu2_common_defs, pdu2_cmds
+from tmtc.power.pdu1 import (
+ pdu1_req_hk_cmds,
+ pdu1_switch_cmds,
+ add_pdu1_common_defs,
+ add_pdu1_cmds,
+)
+from tmtc.power.pdu2 import (
+ pdu2_req_hk_cmds,
+ add_pdu2_common_defs,
+ pdu2_cmds,
+ add_pdu2_cmds,
+)
from tmtccmd import get_console_logger
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
@@ -28,29 +37,29 @@ LOGGER = get_console_logger()
def pack_power_commands(q: DefaultPusQueueHelper, op_code: str):
- pdu1_cmds(q, op_code)
+ pdu1_switch_cmds(q, op_code)
pdu2_cmds(q, op_code)
if op_code in PowerOpCodes.INFO_CORE:
- pdu1_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0])
- pdu2_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0])
- p60_dock_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0])
- acu_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0])
+ pdu1_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0])
+ pdu2_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0])
+ p60_dock_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0])
+ acu_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0])
q.add_wait_seconds(8.0)
elif op_code in PowerOpCodes.INFO_AUX:
- pdu1_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0])
- pdu2_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0])
- p60_dock_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0])
- acu_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0])
+ pdu1_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0])
+ pdu2_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0])
+ p60_dock_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0])
+ acu_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0])
q.add_wait_seconds(8.0)
elif op_code in PowerOpCodes.INFO_ALL:
- pdu1_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0])
- pdu2_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0])
- pdu1_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0])
- pdu2_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0])
- p60_dock_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0])
- p60_dock_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0])
- acu_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0])
- acu_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0])
+ pdu1_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0])
+ pdu2_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0])
+ pdu1_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0])
+ pdu2_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0])
+ p60_dock_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0])
+ p60_dock_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0])
+ acu_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0])
+ acu_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0])
q.add_wait_seconds(8.0)
elif op_code in PowerOpCodes.RESET_ALL_GND_WDTS:
oids = get_object_ids()
@@ -93,46 +102,6 @@ def add_power_cmd_defs(defs: TmtcDefinitionWrapper):
)
-@tmtc_definitions_provider
-def add_pdu1_cmds(defs: TmtcDefinitionWrapper):
- oce = OpCodeEntry()
- add_pdu1_common_defs(oce)
- add_gomspace_cmd_defs(oce)
- oce.add(keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, info=GsInfo.REQUEST_CORE_HK_ONCE)
- oce.add(keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE)
- oce.add(
- keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
- info="PDU1: Print Switches, Voltages, Currents",
- )
- oce.add(keys=GomspaceOpCodes.GET_PARAM, info=GsInfo.GET_PARAMETER)
-
- defs.add_service(
- name=CustomServiceList.PDU1.value,
- info="PDU1 Device",
- op_code_entry=oce,
- )
-
-
-@tmtc_definitions_provider
-def add_pdu2_cmds(defs: TmtcDefinitionWrapper):
- oce = OpCodeEntry()
- add_pdu2_common_defs(oce)
- add_gomspace_cmd_defs(oce)
- oce.add(
- keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
- info="PDU2: Print Switches, Voltages, Currents",
- )
- oce.add(
- keys=GomspaceOpCodes.PRINT_LATCHUPS,
- info="PDU2: Print Latchups",
- )
- defs.add_service(
- name="pdu2",
- info="PDU2 Device",
- op_code_entry=oce,
- )
-
-
def add_pcdu_cmds(defs: TmtcDefinitionWrapper):
add_p60_cmds(defs)
add_pdu1_cmds(defs)
diff --git a/tmtc/power/tm.py b/tmtc/power/tm.py
index 9f918f1..93f32d2 100644
--- a/tmtc/power/tm.py
+++ b/tmtc/power/tm.py
@@ -1,10 +1,11 @@
import struct
from typing import List, Tuple
+from tmtc.power.common_power import SetIds
from tmtccmd.util import ObjectIdBase
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
from pus_tm.defs import PrintWrapper
-from gomspace.gomspace_common import SetIds, GomspaceDeviceActionIds
+from gomspace.gomspace_common import GomspaceDeviceActionIds
from config.object_ids import (
PDU_1_HANDLER_ID,
PDU_2_HANDLER_ID,
@@ -145,7 +146,7 @@ def handle_pdu_data(
pw = PrintWrapper(printer=printer)
current_idx = 0
priv_idx = pdu_idx - 1
- if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX:
+ if set_id == SetIds.AUX or set_id == SetIds.AUX:
fmt_str = "!hhBBBIIH"
inc_len = struct.calcsize(fmt_str)
(
@@ -182,7 +183,7 @@ def handle_pdu_data(
wdt.print()
pw.dlog(f"PDU Device Types: 0:FRAM|1:ADC|2:ADC|3:TempSens|4,5,6,7:Reserved")
dev_parser.print(pw=pw)
- if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE:
+ if set_id == SetIds.CORE or set_id == SetIds.CORE:
pw.dlog(f"Received PDU HK from PDU {pdu_idx}")
current_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
@@ -223,7 +224,7 @@ def handle_pdu_data(
def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer=printer)
- if set_id == SetIds.P60_CORE:
+ if set_id == SetIds.CORE:
pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA")
current_idx = 0
current_list = []
@@ -270,7 +271,7 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw.dlog(temps)
pw.dlog(batt_info)
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9)
- if set_id == SetIds.P60_AUX:
+ if set_id == SetIds.AUX:
pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA")
current_idx = 0
latchup_list = []
@@ -349,7 +350,7 @@ def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[
def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer=printer)
- if set_id == SetIds.ACU_CORE:
+ if set_id == SetIds.CORE:
mppt_mode = hk_data[0]
current_idx = 1
current_idx, currents = gen_six_entry_u16_list(
@@ -393,7 +394,7 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
printer.print_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=12
)
- if set_id == SetIds.ACU_AUX:
+ if set_id == SetIds.AUX:
current_idx = 0
fmt_str = "!BBB"
inc_len = struct.calcsize(fmt_str)