ACU HK Parsing #80
12
CHANGELOG.md
12
CHANGELOG.md
@ -10,6 +10,18 @@ list yields a list of all related PRs for each release.
|
|||||||
|
|
||||||
# [unreleased]
|
# [unreleased]
|
||||||
|
|
||||||
|
# [v1.12.0]
|
||||||
|
|
||||||
|
- Add Rad Sensor HK parsing
|
||||||
|
PR: https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/81
|
||||||
|
- Add procedures, parser functions and general application functionalities
|
||||||
|
for the thermal-vacuum test. This includes daemon functionality to poll
|
||||||
|
all Telemetry even when there is no operator present
|
||||||
|
PR: https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/76
|
||||||
|
https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/74
|
||||||
|
https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/79
|
||||||
|
https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/73
|
||||||
|
|
||||||
# [v1.11.0]
|
# [v1.11.0]
|
||||||
|
|
||||||
- Add `setup.cfg` and `setup.py` file, allowing package installation
|
- Add `setup.cfg` and `setup.py` file, allowing package installation
|
||||||
|
@ -48,5 +48,6 @@ class CustomServiceList(enum.Enum):
|
|||||||
TCS_ASS = "tcs-ass"
|
TCS_ASS = "tcs-ass"
|
||||||
TIME = "time"
|
TIME = "time"
|
||||||
PROCEDURE = "proc"
|
PROCEDURE = "proc"
|
||||||
|
RTD = "rtd"
|
||||||
TVTTESTPROCEDURE = "tvtestproc"
|
TVTTESTPROCEDURE = "tvtestproc"
|
||||||
CONTROLLERS = "controllers"
|
CONTROLLERS = "controllers"
|
||||||
|
@ -77,6 +77,24 @@ HEATER_5_STR = bytes([0x60, 0x00, 0x00, 0x05])
|
|||||||
HEATER_6_DRO = bytes([0x60, 0x00, 0x00, 0x06])
|
HEATER_6_DRO = bytes([0x60, 0x00, 0x00, 0x06])
|
||||||
HEATER_7_HPA = bytes([0x60, 0x00, 0x00, 0x07])
|
HEATER_7_HPA = bytes([0x60, 0x00, 0x00, 0x07])
|
||||||
|
|
||||||
|
# RTDs
|
||||||
|
RTD_0_PLOC_HSPD = bytes([0x44, 0x42, 0x00, 0x16])
|
||||||
|
RTD_1_PLOC_MISSIONBRD= bytes([0x44, 0x42, 0x00, 0x17])
|
||||||
|
RTD_2_4K_CAM = bytes([0x44, 0x42, 0x00, 0x18])
|
||||||
|
RTD_3_DAC_HSPD = bytes([0x44, 0x42, 0x00, 0x19])
|
||||||
|
RTD_4_STR = bytes([0x44, 0x42, 0x00, 0x20])
|
||||||
|
RTD_5_RW1_MX_MY = bytes([0x44, 0x42, 0x00, 0x21])
|
||||||
|
RTD_6_DRO = bytes([0x44, 0x42, 0x00, 0x22])
|
||||||
|
RTD_7_SCEX = bytes([0x44, 0x42, 0x00, 0x23])
|
||||||
|
RTD_8_X8 = bytes([0x44, 0x42, 0x00, 0x24])
|
||||||
|
RTD_9_HPA = bytes([0x44, 0x42, 0x00, 0x25])
|
||||||
|
RTD_10_PL_TX = bytes([0x44, 0x42, 0x00, 0x26])
|
||||||
|
RTD_11_MPA = bytes([0x44, 0x42, 0x00, 0x27])
|
||||||
|
RTD_12_ACU = bytes([0x44, 0x42, 0x00, 0x28])
|
||||||
|
RTD_13_PLPCDU_HSPD = bytes([0x44, 0x42, 0x00, 0x29])
|
||||||
|
RTD_14_TCS_BRD = bytes([0x44, 0x42, 0x00, 0x30])
|
||||||
|
RTD_15_IMTQ = bytes([0x44, 0x42, 0x00, 0x31])
|
||||||
|
|
||||||
# System and Assembly Objects
|
# System and Assembly Objects
|
||||||
ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01])
|
ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01])
|
||||||
SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02])
|
SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02])
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
from pus_tc.devs.gps import GpsOpCodes
|
from pus_tc.devs.gps import GpsOpCodes
|
||||||
from pus_tc.devs.pcdu import add_pcdu_cmds
|
from pus_tc.devs.pcdu import add_pcdu_cmds
|
||||||
|
from pus_tc.devs.rad_sensor import add_rad_sens_cmds
|
||||||
from tmtccmd.config import (
|
from tmtccmd.config import (
|
||||||
add_op_code_entry,
|
add_op_code_entry,
|
||||||
add_service_op_code_entry,
|
add_service_op_code_entry,
|
||||||
@ -9,6 +10,7 @@ from tmtccmd.config import (
|
|||||||
)
|
)
|
||||||
from config.definitions import CustomServiceList
|
from config.definitions import CustomServiceList
|
||||||
from pus_tc.devs.heater import add_heater_cmds
|
from pus_tc.devs.heater import add_heater_cmds
|
||||||
|
from pus_tc.devs.rtd import specify_rtd_cmds
|
||||||
from pus_tc.devs.reaction_wheels import add_rw_cmds
|
from pus_tc.devs.reaction_wheels import add_rw_cmds
|
||||||
from pus_tc.devs.bpx_batt import BpxOpCodes
|
from pus_tc.devs.bpx_batt import BpxOpCodes
|
||||||
|
|
||||||
@ -21,6 +23,7 @@ def get_eive_service_op_code_dict() -> ServiceOpCodeDictT:
|
|||||||
add_core_controller_definitions(cmd_dict=service_op_code_dict)
|
add_core_controller_definitions(cmd_dict=service_op_code_dict)
|
||||||
add_pl_pcdu_cmds(cmd_dict=service_op_code_dict)
|
add_pl_pcdu_cmds(cmd_dict=service_op_code_dict)
|
||||||
add_pcdu_cmds(cmd_dict=service_op_code_dict)
|
add_pcdu_cmds(cmd_dict=service_op_code_dict)
|
||||||
|
specify_rtd_cmds(cmd_dict=service_op_code_dict)
|
||||||
add_imtq_cmds(cmd_dict=service_op_code_dict)
|
add_imtq_cmds(cmd_dict=service_op_code_dict)
|
||||||
add_rad_sens_cmds(cmd_dict=service_op_code_dict)
|
add_rad_sens_cmds(cmd_dict=service_op_code_dict)
|
||||||
add_rw_cmds(cmd_dict=service_op_code_dict)
|
add_rw_cmds(cmd_dict=service_op_code_dict)
|
||||||
@ -493,20 +496,6 @@ def add_imtq_cmds(cmd_dict: ServiceOpCodeDictT):
|
|||||||
cmd_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple
|
cmd_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple
|
||||||
|
|
||||||
|
|
||||||
def add_rad_sens_cmds(cmd_dict: ServiceOpCodeDictT):
|
|
||||||
op_code_dict_srv_rad_sensor = {
|
|
||||||
"0": ("Radiation Sensor: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
|
||||||
"1": ("Radiation Sensor: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
|
||||||
"2": ("Radiation Sensor: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
|
||||||
"3": ("Radiation Sensor: Start conversions", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
|
||||||
"4": ("Radiation Sensor: Read conversions", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
|
||||||
"5": ("Radiation Sensor: Enable debug output", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
|
||||||
"6": ("Radiation Sensor: Disable debug putput", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
|
||||||
}
|
|
||||||
service_rad_sensor_tuple = ("Radiation Sensor", op_code_dict_srv_rad_sensor)
|
|
||||||
cmd_dict[CustomServiceList.RAD_SENSOR.value] = service_rad_sensor_tuple
|
|
||||||
|
|
||||||
|
|
||||||
def add_ploc_mpsoc_cmds(cmd_dict: ServiceOpCodeDictT):
|
def add_ploc_mpsoc_cmds(cmd_dict: ServiceOpCodeDictT):
|
||||||
op_code_dict_srv_ploc_mpsoc = {
|
op_code_dict_srv_ploc_mpsoc = {
|
||||||
"0": ("Ploc MPSoC: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
"0": ("Ploc MPSoC: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
@ -7,11 +7,37 @@
|
|||||||
"""
|
"""
|
||||||
import struct
|
import struct
|
||||||
|
|
||||||
from tmtccmd.config.definitions import QueueCommands
|
from config.definitions import CustomServiceList
|
||||||
|
from tmtccmd.config import add_op_code_entry, add_service_op_code_entry
|
||||||
|
from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT, OpCodeDictKeys
|
||||||
|
|
||||||
from tmtccmd.tc.packer import TcQueueT
|
from tmtccmd.tc.packer import TcQueueT
|
||||||
from spacepackets.ecss.tc import PusTelecommand
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
from pus_tc.service_200_mode import pack_mode_data, Modes
|
from pus_tc.service_200_mode import pack_mode_data, Modes
|
||||||
|
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
|
||||||
|
from tmtccmd.utility import ObjectId
|
||||||
|
|
||||||
|
|
||||||
|
class SetIds:
|
||||||
|
HK = 3
|
||||||
|
|
||||||
|
|
||||||
|
class OpCodes:
|
||||||
|
ON = ["0", "on"]
|
||||||
|
NORMAL = ["1", "normal"]
|
||||||
|
OFF = ["2", "off"]
|
||||||
|
REQ_HK_ONCE = ["3", "hk-os"]
|
||||||
|
DEBUG_ON = ["10", "dbg-on"]
|
||||||
|
DEBUG_OFF = ["11", "dbg-off"]
|
||||||
|
|
||||||
|
|
||||||
|
class Info:
|
||||||
|
ON = "Switch Rad Sensor on"
|
||||||
|
NORMAL = "Switch Rad Sensor normal"
|
||||||
|
OFF = "Switch Rad sensor off"
|
||||||
|
REQ_OS_HK = "Request one-shot HK"
|
||||||
|
DEBUG_ON = "Switch debug output on"
|
||||||
|
DEBUG_OFF = "Switch debug output off"
|
||||||
|
|
||||||
|
|
||||||
class CommandIds:
|
class CommandIds:
|
||||||
@ -21,52 +47,64 @@ class CommandIds:
|
|||||||
DISABLE_DEBUG_OUTPUT = 5
|
DISABLE_DEBUG_OUTPUT = 5
|
||||||
|
|
||||||
|
|
||||||
def pack_rad_sensor_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
|
def add_rad_sens_cmds(cmd_dict: ServiceOpCodeDictT):
|
||||||
tc_queue.appendleft(
|
|
||||||
(
|
op_code_dict = dict()
|
||||||
QueueCommands.PRINT,
|
add_op_code_entry(op_code_dict=op_code_dict, info=Info.ON, keys=OpCodes.ON)
|
||||||
"Testing radiation sensor handler with object id: 0x" + object_id.hex(),
|
add_op_code_entry(op_code_dict=op_code_dict, info=Info.OFF, keys=OpCodes.OFF)
|
||||||
|
add_op_code_entry(op_code_dict=op_code_dict, info=Info.NORMAL, keys=OpCodes.NORMAL)
|
||||||
|
add_op_code_entry(
|
||||||
|
op_code_dict=op_code_dict, info=Info.REQ_OS_HK, keys=OpCodes.REQ_HK_ONCE
|
||||||
)
|
)
|
||||||
|
add_op_code_entry(
|
||||||
|
op_code_dict=op_code_dict, info=Info.DEBUG_ON, keys=OpCodes.DEBUG_ON
|
||||||
|
)
|
||||||
|
add_op_code_entry(
|
||||||
|
op_code_dict=op_code_dict, info=Info.DEBUG_OFF, keys=OpCodes.DEBUG_OFF
|
||||||
|
)
|
||||||
|
add_service_op_code_entry(
|
||||||
|
srv_op_code_dict=cmd_dict,
|
||||||
|
name=CustomServiceList.RAD_SENSOR.value,
|
||||||
|
info="Radiation Sensor",
|
||||||
|
op_code_entry=op_code_dict,
|
||||||
)
|
)
|
||||||
|
|
||||||
if op_code == "0":
|
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Switch to mode on"))
|
def pack_rad_sensor_test_into(object_id: ObjectId, tc_queue: TcQueueT, op_code: str):
|
||||||
mode_data = pack_mode_data(object_id, Modes.ON, 0)
|
tc_queue.appendleft(
|
||||||
|
(QueueCommands.PRINT, f"Commanding Radiation sensor handler {object_id}")
|
||||||
|
)
|
||||||
|
|
||||||
|
if op_code in OpCodes.ON:
|
||||||
|
rad_sensor_mode_cmd(object_id, Modes.ON, Info.ON, tc_queue)
|
||||||
|
if op_code in OpCodes.NORMAL:
|
||||||
|
rad_sensor_mode_cmd(object_id, Modes.NORMAL, Info.NORMAL, tc_queue)
|
||||||
|
if op_code in OpCodes.OFF:
|
||||||
|
rad_sensor_mode_cmd(object_id, Modes.OFF, Info.OFF, tc_queue)
|
||||||
|
if op_code in OpCodes.REQ_HK_ONCE:
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, f"Rad sensor: {Info.REQ_OS_HK}"))
|
||||||
|
cmd = generate_one_hk_command(
|
||||||
|
sid=make_sid(object_id.as_bytes, set_id=SetIds.HK), ssc=0
|
||||||
|
)
|
||||||
|
tc_queue.appendleft(cmd.pack_command_tuple())
|
||||||
|
if op_code in OpCodes.DEBUG_ON:
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, f"Rad sensor: {Info.DEBUG_ON}"))
|
||||||
|
command = object_id.as_bytes + struct.pack("!I", CommandIds.ENABLE_DEBUG_OUTPUT)
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
if op_code in OpCodes.DEBUG_OFF:
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, f"Rad sensor: {Info.DEBUG_OFF}"))
|
||||||
|
command = object_id.as_bytes + struct.pack(
|
||||||
|
"!I", CommandIds.DISABLE_DEBUG_OUTPUT
|
||||||
|
)
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
|
||||||
|
|
||||||
|
def rad_sensor_mode_cmd(
|
||||||
|
object_id: ObjectId, mode: Modes, info: str, tc_queue: TcQueueT
|
||||||
|
):
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, f"Rad sensor: {info}"))
|
||||||
|
mode_data = pack_mode_data(object_id.as_bytes, mode, 0)
|
||||||
command = PusTelecommand(service=200, subservice=1, ssc=41, app_data=mode_data)
|
command = PusTelecommand(service=200, subservice=1, ssc=41, app_data=mode_data)
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
|
||||||
if op_code == "1":
|
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Switch to mode normal"))
|
|
||||||
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
|
|
||||||
command = PusTelecommand(service=200, subservice=1, ssc=42, app_data=mode_data)
|
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
|
||||||
|
|
||||||
if op_code == "2":
|
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Switch to mode off"))
|
|
||||||
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
|
|
||||||
command = PusTelecommand(service=200, subservice=1, ssc=42, app_data=mode_data)
|
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
|
||||||
|
|
||||||
if op_code == "3":
|
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Start conversions"))
|
|
||||||
command = object_id + struct.pack("!I", CommandIds.START_CONVERSIONS)
|
|
||||||
command = PusTelecommand(service=8, subservice=128, ssc=43, app_data=command)
|
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
|
||||||
|
|
||||||
if op_code == "4":
|
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Read conversions"))
|
|
||||||
command = object_id + struct.pack("!I", CommandIds.READ_CONVERSIONS)
|
|
||||||
command = PusTelecommand(service=8, subservice=128, ssc=44, app_data=command)
|
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
|
||||||
|
|
||||||
if op_code == "5":
|
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Enable debug output"))
|
|
||||||
command = object_id + struct.pack("!I", CommandIds.ENABLE_DEBUG_OUTPUT)
|
|
||||||
command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command)
|
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
|
||||||
|
|
||||||
if op_code == "6":
|
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Disable debug output"))
|
|
||||||
command = object_id + struct.pack("!I", CommandIds.DISABLE_DEBUG_OUTPUT)
|
|
||||||
command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command)
|
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
|
||||||
|
113
pus_tc/devs/rtd.py
Normal file
113
pus_tc/devs/rtd.py
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from config.definitions import CustomServiceList
|
||||||
|
from spacepackets.ecss import PusTelecommand
|
||||||
|
from tmtccmd.config import ServiceOpCodeDictT, add_op_code_entry, add_service_op_code_entry
|
||||||
|
from tmtccmd.tc.definitions import TcQueueT
|
||||||
|
from tmtccmd.utility import ObjectId
|
||||||
|
from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data, Subservices
|
||||||
|
import config.object_ids as oids
|
||||||
|
from config.object_ids import get_object_ids
|
||||||
|
|
||||||
|
RTD_IDS = [
|
||||||
|
oids.RTD_0_PLOC_HSPD,
|
||||||
|
oids.RTD_1_PLOC_MISSIONBRD,
|
||||||
|
oids.RTD_2_4K_CAM,
|
||||||
|
oids.RTD_3_DAC_HSPD,
|
||||||
|
oids.RTD_4_STR,
|
||||||
|
oids.RTD_5_RW1_MX_MY,
|
||||||
|
oids.RTD_6_DRO,
|
||||||
|
oids.RTD_7_SCEX,
|
||||||
|
oids.RTD_8_X8,
|
||||||
|
oids.RTD_9_HPA,
|
||||||
|
oids.RTD_10_PL_TX,
|
||||||
|
oids.RTD_11_MPA,
|
||||||
|
oids.RTD_12_ACU,
|
||||||
|
oids.RTD_13_PLPCDU_HSPD,
|
||||||
|
oids.RTD_14_TCS_BRD,
|
||||||
|
oids.RTD_15_IMTQ
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class OpCodes:
|
||||||
|
ON = ["0", "on"]
|
||||||
|
OFF = ["1", "off"]
|
||||||
|
NORMAL = ["2", "normal"]
|
||||||
|
|
||||||
|
|
||||||
|
class Info:
|
||||||
|
ON = "Switch handler on"
|
||||||
|
OFF = "Switch handler off"
|
||||||
|
NORMAL = "Switch handler normal"
|
||||||
|
|
||||||
|
|
||||||
|
def specify_rtd_cmds(cmd_dict: ServiceOpCodeDictT):
|
||||||
|
op_code_dict = dict()
|
||||||
|
add_op_code_entry(
|
||||||
|
op_code_dict=op_code_dict,
|
||||||
|
keys=OpCodes.ON,
|
||||||
|
info=Info.ON
|
||||||
|
)
|
||||||
|
add_op_code_entry(
|
||||||
|
op_code_dict=op_code_dict,
|
||||||
|
keys=OpCodes.NORMAL,
|
||||||
|
info=Info.NORMAL
|
||||||
|
)
|
||||||
|
add_op_code_entry(
|
||||||
|
op_code_dict=op_code_dict,
|
||||||
|
keys=OpCodes.OFF,
|
||||||
|
info=Info.OFF
|
||||||
|
)
|
||||||
|
add_service_op_code_entry(
|
||||||
|
srv_op_code_dict=cmd_dict,
|
||||||
|
op_code_entry=op_code_dict,
|
||||||
|
name=CustomServiceList.RTD.value,
|
||||||
|
info="RTD commands"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def pack_rtd_commands(op_code: str, object_id: Optional[ObjectId], tc_queue: TcQueueT):
|
||||||
|
if object_id is not None and object_id not in RTD_IDS:
|
||||||
|
print("Specified object ID not a valid RTD ID")
|
||||||
|
object_id = None
|
||||||
|
if object_id is None:
|
||||||
|
tgt_rtd_idx = prompt_rtd_idx()
|
||||||
|
object_id_dict = get_object_ids()
|
||||||
|
object_id = object_id_dict.get(RTD_IDS[tgt_rtd_idx])
|
||||||
|
if op_code in OpCodes.ON:
|
||||||
|
app_data = pack_mode_data(object_id=object_id.as_bytes, mode=Modes.ON, submode=0)
|
||||||
|
cmd = PusTelecommand(
|
||||||
|
service=200,
|
||||||
|
subservice=Subservices.TC_MODE_COMMAND,
|
||||||
|
app_data=app_data
|
||||||
|
)
|
||||||
|
tc_queue.appendleft(cmd.pack_command_tuple())
|
||||||
|
if op_code in OpCodes.NORMAL:
|
||||||
|
app_data = pack_mode_data(object_id=object_id.as_bytes, mode=Modes.NORMAL, submode=0)
|
||||||
|
cmd = PusTelecommand(
|
||||||
|
service=200,
|
||||||
|
subservice=Subservices.TC_MODE_COMMAND,
|
||||||
|
app_data=app_data
|
||||||
|
)
|
||||||
|
tc_queue.appendleft(cmd.pack_command_tuple())
|
||||||
|
if op_code in OpCodes.OFF:
|
||||||
|
app_data = pack_mode_data(object_id=object_id.as_bytes, mode=Modes.OFF, submode=0)
|
||||||
|
cmd = PusTelecommand(
|
||||||
|
service=200,
|
||||||
|
subservice=Subservices.TC_MODE_COMMAND,
|
||||||
|
app_data=app_data
|
||||||
|
)
|
||||||
|
tc_queue.appendleft(cmd.pack_command_tuple())
|
||||||
|
|
||||||
|
|
||||||
|
def prompt_rtd_idx():
|
||||||
|
while True:
|
||||||
|
rtd_idx = input("Please specify RTD index [0-15]: ")
|
||||||
|
if not rtd_idx.isdigit():
|
||||||
|
print("Invalid input")
|
||||||
|
continue
|
||||||
|
rtd_idx = int(rtd_idx)
|
||||||
|
if rtd_idx < 0 or rtd_idx > 15:
|
||||||
|
print("Invalid device index")
|
||||||
|
continue
|
||||||
|
return rtd_idx
|
@ -1,5 +1,11 @@
|
|||||||
from PyQt5.QtWidgets import (
|
from PyQt5.QtWidgets import (
|
||||||
QDialog, QDialogButtonBox, QVBoxLayout, QLabel, QGroupBox, QGridLayout, QLineEdit
|
QDialog,
|
||||||
|
QDialogButtonBox,
|
||||||
|
QVBoxLayout,
|
||||||
|
QLabel,
|
||||||
|
QGroupBox,
|
||||||
|
QGridLayout,
|
||||||
|
QLineEdit,
|
||||||
)
|
)
|
||||||
|
|
||||||
from PyQt5 import QtCore
|
from PyQt5 import QtCore
|
||||||
@ -22,6 +28,7 @@ class Parameter:
|
|||||||
self.widget.setPlaceholderText(self.defaultValue)
|
self.widget.setPlaceholderText(self.defaultValue)
|
||||||
self.widget.setText("")
|
self.widget.setText("")
|
||||||
|
|
||||||
|
|
||||||
class ParameterDialog(QDialog):
|
class ParameterDialog(QDialog):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
@ -59,7 +66,6 @@ class ParameterDialog(QDialog):
|
|||||||
|
|
||||||
self.parameters[name] = parameter
|
self.parameters[name] = parameter
|
||||||
|
|
||||||
|
|
||||||
def _reset(self):
|
def _reset(self):
|
||||||
for value in self.parameters.values():
|
for value in self.parameters.values():
|
||||||
value.reset()
|
value.reset()
|
||||||
@ -80,14 +86,13 @@ class ParameterDialog(QDialog):
|
|||||||
super().reject()
|
super().reject()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
"""Prompt the user to specify additional Parameters
|
"""Prompt the user to specify additional Parameters
|
||||||
|
|
||||||
:param parameterList: array of dictionaries with name and defaultValue attributes
|
:param parameterList: array of dictionaries with name and defaultValue attributes
|
||||||
:return: dict with all names as key and user supplied input as value string
|
:return: dict with all names as key and user supplied input as value string
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
def prompt_parameters(parameterList):
|
def prompt_parameters(parameterList):
|
||||||
gui = get_global(CoreGlobalIds.GUI)
|
gui = get_global(CoreGlobalIds.GUI)
|
||||||
mode = get_global(CoreGlobalIds.MODE)
|
mode = get_global(CoreGlobalIds.MODE)
|
||||||
@ -98,6 +103,7 @@ def prompt_parameters(parameterList):
|
|||||||
else:
|
else:
|
||||||
return _cli_prompt(parameterList)
|
return _cli_prompt(parameterList)
|
||||||
|
|
||||||
|
|
||||||
def _gui_prompt(parameterList):
|
def _gui_prompt(parameterList):
|
||||||
dialog = ParameterDialog()
|
dialog = ParameterDialog()
|
||||||
for parameter in parameterList:
|
for parameter in parameterList:
|
||||||
@ -105,10 +111,13 @@ def _gui_prompt(parameterList):
|
|||||||
dialog.exec_()
|
dialog.exec_()
|
||||||
return dialog.getParameters()
|
return dialog.getParameters()
|
||||||
|
|
||||||
|
|
||||||
def _cli_prompt(parameterList):
|
def _cli_prompt(parameterList):
|
||||||
result = {}
|
result = {}
|
||||||
for parameter in parameterList:
|
for parameter in parameterList:
|
||||||
userInput = input("Specify {} [{}]: ".format(parameter["name"], parameter["defaultValue"]))
|
userInput = input(
|
||||||
|
"Specify {} [{}]: ".format(parameter["name"], parameter["defaultValue"])
|
||||||
|
)
|
||||||
if userInput == "":
|
if userInput == "":
|
||||||
userInput = parameter["defaultValue"]
|
userInput = parameter["defaultValue"]
|
||||||
result[parameter["name"]] = userInput
|
result[parameter["name"]] = userInput
|
||||||
|
@ -18,8 +18,12 @@ class Info:
|
|||||||
|
|
||||||
|
|
||||||
def pack_controller_commands(tc_queue: TcQueueT, op_code: str):
|
def pack_controller_commands(tc_queue: TcQueueT, op_code: str):
|
||||||
parameters = prompt_parameters([{"name": "Mode", "defaultValue": "2"}, {
|
parameters = prompt_parameters(
|
||||||
"name": "Submode", "defaultValue": "0"}])
|
[
|
||||||
|
{"name": "Mode", "defaultValue": "2"},
|
||||||
|
{"name": "Submode", "defaultValue": "0"},
|
||||||
|
]
|
||||||
|
)
|
||||||
mode = int(parameters["Mode"])
|
mode = int(parameters["Mode"])
|
||||||
if mode < 0 or mode > 2:
|
if mode < 0 or mode > 2:
|
||||||
print("Invalid Mode, defaulting to OFF")
|
print("Invalid Mode, defaulting to OFF")
|
||||||
|
@ -5,6 +5,7 @@ import os
|
|||||||
from collections import deque
|
from collections import deque
|
||||||
from typing import Union
|
from typing import Union
|
||||||
|
|
||||||
|
from pus_tc.devs.rtd import pack_rtd_commands
|
||||||
from spacepackets.ecss import PusTelecommand
|
from spacepackets.ecss import PusTelecommand
|
||||||
from tmtccmd.com_if.com_interface_base import CommunicationInterface
|
from tmtccmd.com_if.com_interface_base import CommunicationInterface
|
||||||
from tmtccmd.config.definitions import CoreServiceList, QueueCommands
|
from tmtccmd.config.definitions import CoreServiceList, QueueCommands
|
||||||
@ -70,6 +71,7 @@ from config.object_ids import (
|
|||||||
SYRLINKS_HANDLER_ID,
|
SYRLINKS_HANDLER_ID,
|
||||||
SOLAR_ARRAY_DEPLOYMENT_ID,
|
SOLAR_ARRAY_DEPLOYMENT_ID,
|
||||||
RW_ASSEMBLY,
|
RW_ASSEMBLY,
|
||||||
|
get_object_ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -113,6 +115,8 @@ def pack_service_queue_user(
|
|||||||
return pack_p60dock_cmds(
|
return pack_p60dock_cmds(
|
||||||
object_id=object_id, tc_queue=service_queue, op_code=op_code
|
object_id=object_id, tc_queue=service_queue, op_code=op_code
|
||||||
)
|
)
|
||||||
|
if service == CustomServiceList.RTD.value:
|
||||||
|
return pack_rtd_commands(object_id=None, tc_queue=service_queue, op_code=op_code)
|
||||||
if service == CustomServiceList.PDU1.value:
|
if service == CustomServiceList.PDU1.value:
|
||||||
object_id = obj_id_man.get(PDU_1_HANDLER_ID)
|
object_id = obj_id_man.get(PDU_1_HANDLER_ID)
|
||||||
return pack_pdu1_commands(
|
return pack_pdu1_commands(
|
||||||
@ -172,7 +176,7 @@ def pack_service_queue_user(
|
|||||||
object_id=RW4_ID, rw_idx=4, tc_queue=service_queue, op_code=op_code
|
object_id=RW4_ID, rw_idx=4, tc_queue=service_queue, op_code=op_code
|
||||||
)
|
)
|
||||||
if service == CustomServiceList.RAD_SENSOR.value:
|
if service == CustomServiceList.RAD_SENSOR.value:
|
||||||
object_id = RAD_SENSOR_ID
|
object_id = obj_id_man.get(RAD_SENSOR_ID)
|
||||||
return pack_rad_sensor_test_into(
|
return pack_rad_sensor_test_into(
|
||||||
object_id=object_id, tc_queue=service_queue, op_code=op_code
|
object_id=object_id, tc_queue=service_queue, op_code=op_code
|
||||||
)
|
)
|
||||||
|
24
pus_tm/devs/rad_sensor.py
Normal file
24
pus_tm/devs/rad_sensor.py
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
import struct
|
||||||
|
|
||||||
|
from pus_tm.defs import PrintWrapper
|
||||||
|
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
||||||
|
from pus_tc.devs.rad_sensor import SetIds
|
||||||
|
|
||||||
|
|
||||||
|
def handle_rad_sensor_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||||
|
if set_id == SetIds.HK:
|
||||||
|
pw = PrintWrapper(printer)
|
||||||
|
current_idx = 0
|
||||||
|
pw.dlog("Received Radiation Sensor HK data")
|
||||||
|
fmt_str = "!fHHHHHH"
|
||||||
|
inc_len = struct.calcsize(fmt_str)
|
||||||
|
(temp, ain0, ain1, ain4, ain5, ain6, ain7) = struct.unpack(
|
||||||
|
fmt_str, hk_data[current_idx : current_idx + inc_len]
|
||||||
|
)
|
||||||
|
ain_dict = {0: ain0, 1: ain1, 4: ain4, 5: ain5, 6: ain6, 7: ain7}
|
||||||
|
pw.dlog(f"Temperature: {temp} C")
|
||||||
|
pw.dlog(f"AIN Channel | Raw Value (hex) | Raw Value (dec)")
|
||||||
|
for idx, val in ain_dict.items():
|
||||||
|
pw.dlog(f"{idx} | {val:#06x} | {str(val).ljust(5)}")
|
||||||
|
current_idx += inc_len
|
||||||
|
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=7)
|
@ -1,6 +1,7 @@
|
|||||||
"""HK Handling for EIVE OBSW"""
|
"""HK Handling for EIVE OBSW"""
|
||||||
import struct
|
import struct
|
||||||
|
|
||||||
|
from pus_tm.devs.rad_sensor import handle_rad_sensor_data
|
||||||
from pus_tm.system.tcs import handle_thermal_controller_hk_data, TM_TCP_SERVER
|
from pus_tm.system.tcs import handle_thermal_controller_hk_data, TM_TCP_SERVER
|
||||||
from tmtccmd.config.definitions import HkReplyUnpacked
|
from tmtccmd.config.definitions import HkReplyUnpacked
|
||||||
from tmtccmd.tm.pus_3_fsfw_hk import (
|
from tmtccmd.tm.pus_3_fsfw_hk import (
|
||||||
@ -99,6 +100,8 @@ def handle_regular_hk_print(
|
|||||||
)
|
)
|
||||||
if objb == obj_ids.ACU_HANDLER_ID:
|
if objb == obj_ids.ACU_HANDLER_ID:
|
||||||
return handle_acu_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
|
return handle_acu_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
|
||||||
|
if objb == obj_ids.RAD_SENSOR_ID:
|
||||||
|
return handle_rad_sensor_data(printer=printer, hk_data=hk_data, set_id=set_id)
|
||||||
if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
|
if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
|
||||||
return handle_rw_hk_data(
|
return handle_rw_hk_data(
|
||||||
printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data
|
printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data
|
||||||
|
@ -5,7 +5,8 @@ from tmtccmd.config.args import (
|
|||||||
create_default_args_parser,
|
create_default_args_parser,
|
||||||
add_gui_tmtccmd_args,
|
add_gui_tmtccmd_args,
|
||||||
parse_gui_input_arguments,
|
parse_gui_input_arguments,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
hook_obj = tmtcc_pre_args()
|
hook_obj = tmtcc_pre_args()
|
||||||
|
Loading…
Reference in New Issue
Block a user