Merge remote-tracking branch 'origin/main' into mueller/obsw-update-cmds

This commit is contained in:
2022-09-29 13:28:09 +02:00
7 changed files with 396 additions and 336 deletions

View File

@ -1,10 +1,12 @@
import enum
import json
from spacepackets.ecss import PusTelecommand
from config.definitions import CustomServiceList
from tmtccmd import DefaultProcedureInfo, TcHandlerBase
from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.tc import DefaultPusQueueHelper, service_provider
from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data, Subservices
from tmtccmd.tc import service_provider
from tmtccmd.tc.decorator import ServiceProviderParams
from tmtccmd.tc.pus_8_funccmd import make_fsfw_action_cmd
from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper
@ -24,6 +26,9 @@ class OpCodes:
ALL_CELLS_CMD = ["5", "allcells"]
FRAM = ["6", "fram"]
SWITCH_ON = ["7", "on"]
SWITCH_OFF = ["8", "off"]
class ActionIds(enum.IntEnum):
PING = 7
@ -46,6 +51,9 @@ class Info:
ALL_CELLS_CMD = "All Cells"
FRAM = "Read FRAM"
SWITCH_ON = "Switch Scex on"
SWITCH_OFF = "Switch Scex off"
@tmtc_definitions_provider
def add_scex_cmds(defs: TmtcDefinitionWrapper):
@ -58,6 +66,8 @@ def add_scex_cmds(defs: TmtcDefinitionWrapper):
oce.add(keys=OpCodes.ALL_CELLS_CMD, info=Info.ALL_CELLS_CMD)
oce.add(keys=OpCodes.FRAM, info=Info.FRAM)
oce.add(keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON)
oce.add(keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF)
defs.add_service(
name=CustomServiceList.SCEX.value, info="SCEX Device", op_code_entry=oce
@ -68,6 +78,24 @@ def add_scex_cmds(defs: TmtcDefinitionWrapper):
def pack_scex_cmds(p: ServiceProviderParams):
op_code = p.op_code
q = p.queue_helper
if op_code in OpCodes.SWITCH_ON:
q.add_log_cmd(Info.SWITCH_ON)
q.add_pus_tc(
PusTelecommand(
service=200,
subservice=Subservices.TC_MODE_COMMAND,
app_data=pack_mode_data(SCEX_HANDLER_ID, Modes.ON, 0),
)
)
if op_code in OpCodes.SWITCH_OFF:
q.add_log_cmd(Info.SWITCH_OFF)
q.add_pus_tc(
PusTelecommand(
service=200,
subservice=Subservices.TC_MODE_COMMAND,
app_data=pack_mode_data(SCEX_HANDLER_ID, Modes.OFF, 0),
)
)
if op_code in OpCodes.PING:
q.add_log_cmd(Info.PING)
app_data = bytes([0])
@ -93,7 +121,7 @@ def pack_scex_cmds(p: ServiceProviderParams):
# one cell
if op_code in OpCodes.ONE_CELLS_CMD:
q.add_log_cmd(Info.ONE_CELLS_CMD)
app_data = bytearray()
app_data = bytearray([0])
# cell number
cn = 0
@ -125,6 +153,7 @@ def pack_scex_cmds(p: ServiceProviderParams):
# in app_data
# app_data.extend(struct.pack("!H", first_dac))
app_data.append(cell_select)
append_16_bit_val(packet=app_data, val=first_dac[cn])
append_16_bit_val(packet=app_data, val=last_dac[cn])
append_16_bit_val(packet=app_data, val=res_switch1[cn])
@ -140,7 +169,7 @@ def pack_scex_cmds(p: ServiceProviderParams):
if op_code in OpCodes.ALL_CELLS_CMD:
q.add_log_cmd(Info.ALL_CELLS_CMD)
app_data = bytearray()
app_data = bytearray([0])
# cell number
cn = 0

View File

@ -211,7 +211,14 @@ def handle_default_procedure(
gui=tc_handler.gui,
)
if service == CustomServiceList.SCEX.value:
return pack_scex_cmds(q=queue_helper, op_code=op_code)
return pack_scex_cmds(
ServiceProviderParams(
handler_base=tc_base,
op_code=op_code,
info=info,
queue_helper=queue_helper,
)
)
if not route_to_registered_service_handlers(
service,
ServiceProviderParams(