Heater Commands #63

Merged
muellerr merged 10 commits from mueller/heater-cmds into develop 2022-05-05 14:32:42 +02:00
7 changed files with 143 additions and 19 deletions
Showing only changes of commit d86a85b06d - Show all commits

View File

@ -67,6 +67,16 @@ STR_IMG_HELPER_ID = bytes([0x44, 0x33, 0x00, 0x02])
PLOC_MPSOC_ID = bytes([0x44, 0x33, 0x00, 0x15])
PL_PCDU_ID = bytes([0x44, 0x30, 0x00, 0x00])
# Other
HEATER_0_OBC_BRD = bytes([0x60, 0x00, 0x00, 0x00])
HEATER_1_PLOC_PROC_BRD = bytes([0x60, 0x00, 0x00, 0x01])
HEATER_2_ACS_BRD = bytes([0x60, 0x00, 0x00, 0x02])
HEATER_3_PCDU_BRD = bytes([0x60, 0x00, 0x00, 0x03])
HEATER_4_CAMERA = bytes([0x60, 0x00, 0x00, 0x04])
HEATER_5_STR = bytes([0x60, 0x00, 0x00, 0x05])
HEATER_6_DRO = bytes([0x60, 0x00, 0x00, 0x06])
HEATER_7_HPA = bytes([0x60, 0x00, 0x00, 0x07])
# System and Assembly Objects
ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01])
SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02])

View File

@ -899,8 +899,14 @@ def add_ploc_supv_cmds(cmd_dict: ServiceOpCodeDictT):
"46": ("PLOC Supervisor: Factory flash", {OpCodeDictKeys.TIMEOUT: 2.0}),
"47": ("PLOC Supervisor: Enable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}),
"48": ("PLOC Supervisor: Disable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}),
"51": ("PLOC Supervisor: Logging request event buffers", {OpCodeDictKeys.TIMEOUT: 2.0}),
"52": ("PLOC Supervisor: Logging clear counters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"51": (
"PLOC Supervisor: Logging request event buffers",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"52": (
"PLOC Supervisor: Logging clear counters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"53": ("PLOC Supervisor: Logging set topic", {OpCodeDictKeys.TIMEOUT: 2.0}),
"54": (
"PLOC Supervisor: Logging request counters",

View File

@ -6,7 +6,10 @@
import enum
from config.definitions import CustomServiceList
from config.object_ids import get_object_ids
from tmtccmd.pus.obj_id import ObjectIdDictT, ObjectId
from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT
from tmtccmd.tc.cpus_201_health import pack_set_health_cmd_data, FsfwHealth, Subservices
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from tmtccmd.config.globals import add_service_op_code_entry, add_op_code_entry
from tmtccmd.tc.packer import TcQueueT
@ -27,12 +30,16 @@ class SwitchNumbers:
class OpCodes:
HEATER_CMD = ["0", "switch-cmd"]
HEATER_EXT_CTRL = ["1", "heater-ext-ctrl"]
HEATER_EXT_CTRL = ["1", "set-ext-ctrl"]
HEATER_FAULTY_CMD = ["2", "set-faulty"]
HEATER_HEALTHY_CMD = ["3", "set-healthy"]
class Info:
HEATER_CMD = "Heater Switch Command"
HEATER_EXT_CTRL = "Set to external control"
HEATER_FAULTY_CMD = "Set to faulty"
HEATER_HEALTHY_CMD = "Set to healthy"
class ActionIds(enum.IntEnum):
@ -44,6 +51,21 @@ def add_heater_cmds(cmd_dict: ServiceOpCodeDictT):
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.HEATER_CMD, info=Info.HEATER_CMD
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.HEATER_HEALTHY_CMD,
info=Info.HEATER_HEALTHY_CMD,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.HEATER_EXT_CTRL,
info=Info.HEATER_EXT_CTRL,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.HEATER_FAULTY_CMD,
info=Info.HEATER_FAULTY_CMD,
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.HEATER.value,
@ -55,16 +77,7 @@ def add_heater_cmds(cmd_dict: ServiceOpCodeDictT):
def pack_heater_cmds(object_id: bytearray, op_code: str, tc_queue: TcQueueT):
if op_code in OpCodes.HEATER_CMD:
tc_queue.appendleft((QueueCommands.PRINT, "Heater Switching"))
while True:
heater_number = input("Type number of heater to switch [0-7]: ")
if not heater_number.isdigit():
print("Heater number not a digit")
continue
heater_number = int(heater_number)
if heater_number >= SwitchNumbers.NUMBER_OF_SWITCHES or heater_number < 0:
print("Invalid heater switch number")
continue
break
heater_number = prompt_heater()
while True:
action = input("Turn switch on or off? (0 - off, 1 - on): ")
if not action.isdigit():
@ -84,7 +97,98 @@ def pack_heater_cmds(object_id: bytearray, op_code: str, tc_queue: TcQueueT):
command = pack_switch_heater_command(object_id, heater_number, action)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in OpCodes.HEATER_EXT_CTRL:
pass
heater_number = prompt_heater()
obj_id = heater_idx_to_obj(heater_number)
health_cmd(
tc_queue=tc_queue,
object_id=obj_id,
health=FsfwHealth.EXTERNAL_CTRL,
health_str="External Control",
heater_idx=heater_number,
)
if op_code in OpCodes.HEATER_FAULTY_CMD:
heater_number = prompt_heater()
obj_id = heater_idx_to_obj(heater_number)
health_cmd(
tc_queue=tc_queue,
object_id=obj_id,
health=FsfwHealth.FAULTY,
health_str="Faulty",
heater_idx=heater_number,
)
if op_code in OpCodes.HEATER_HEALTHY_CMD:
heater_number = prompt_heater()
obj_id = heater_idx_to_obj(heater_number)
health_cmd(
tc_queue=tc_queue,
object_id=obj_id,
health=FsfwHealth.HEALTHY,
health_str="Healthy",
heater_idx=heater_number,
)
def heater_idx_to_obj(heater: int) -> ObjectId:
from config.object_ids import (
HEATER_0_OBC_BRD,
HEATER_1_PLOC_PROC_BRD,
HEATER_2_ACS_BRD,
HEATER_3_PCDU_BRD,
HEATER_4_CAMERA,
HEATER_5_STR,
HEATER_6_DRO,
HEATER_7_HPA,
)
obj_id_array = [
HEATER_0_OBC_BRD,
HEATER_1_PLOC_PROC_BRD,
HEATER_2_ACS_BRD,
HEATER_3_PCDU_BRD,
HEATER_4_CAMERA,
HEATER_5_STR,
HEATER_6_DRO,
HEATER_7_HPA,
]
obj_dict = get_object_ids()
obj_id_obj = obj_dict.get(obj_id_array[heater])
if obj_id_obj is None:
return ObjectId.from_bytes(obj_id_array[heater])
return obj_id_obj
def prompt_heater() -> int:
while True:
heater_number = input("Type number of heater to switch [0-7]: ")
if not heater_number.isdigit():
print("Heater number not a digit")
continue
heater_number = int(heater_number)
if heater_number >= SwitchNumbers.NUMBER_OF_SWITCHES or heater_number < 0:
print("Invalid heater switch number")
continue
break
return heater_number
def health_cmd(
tc_queue: TcQueueT,
heater_idx: int,
object_id: ObjectId,
health: FsfwHealth,
health_str: str,
):
tc_queue.appendleft(
(
QueueCommands.PRINT,
f"Setting Heater {heater_idx} {object_id} to {health_str}",
)
)
app_data = pack_set_health_cmd_data(object_id=object_id.as_bytes, health=health)
cmd = PusTelecommand(
service=201, subservice=Subservices.TC_SET_HEALTH, app_data=app_data
)
tc_queue.appendleft(cmd.pack_command_tuple())
def pack_switch_heater_command(

View File

@ -64,7 +64,6 @@ class PlocReplyIds(enum.IntEnum):
TM_CAM_CMD_RPT = 19
def pack_ploc_mpsoc_commands(
object_id: bytearray, tc_queue: TcQueueT, op_code: str
) -> TcQueueT:

View File

@ -134,7 +134,9 @@ def pack_service_queue_user(
)
if service == CustomServiceList.HEATER.value:
object_id = HEATER_ID
return pack_heater_cmds(object_id=object_id, tc_queue=service_queue, op_code=op_code)
return pack_heater_cmds(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.IMTQ.value:
object_id = IMTQ_HANDLER_ID
return pack_imtq_test_into(

View File

@ -75,7 +75,10 @@ def handle_ploc_replies(
printer.file_logger.info(content_list)
elif action_id == PlocReplyIds.TM_CAM_CMD_RPT:
header_list = ["Camera reply string", "ACK"]
content_list = [custom_data[:len(custom_data) - 1].decode('utf-8'), hex(custom_data[-1])]
content_list = [
custom_data[: len(custom_data) - 1].decode("utf-8"),
hex(custom_data[-1]),
]
print(header_list)
print(content_list)
printer.file_logger.info(header_list)
@ -95,7 +98,7 @@ def handle_supervisor_replies(
printer.file_logger.info(content_list)
elif action_id == SupvActionIds.READ_GPIO:
header_list = ["GPIO state"]
content_list = [struct.unpack('!H', custom_data[:2])[0]]
content_list = [struct.unpack("!H", custom_data[:2])[0]]
print(header_list)
print(content_list)
printer.file_logger.info(header_list)

@ -1 +1 @@
Subproject commit 9f566a739ab91ed8f461cdbb6a3179faf912e4e0
Subproject commit 18912c1e906bf9a997a5e927a36df6ef38874d17