continued update to new API

This commit is contained in:
Robin Müller 2022-07-04 17:59:09 +02:00
parent 27edcbd71d
commit 2b285417a0
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
19 changed files with 727 additions and 1012 deletions

2
deps/tmtccmd vendored

@ -1 +1 @@
Subproject commit e5d12ca4c0da06338e575701972aaf4180b49635
Subproject commit 2890c62e31999706ff1a7164b8a9f7cbc7bc34d5

View File

@ -7,9 +7,9 @@
"""
import struct
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper
from tmtccmd.utility import ObjectId
class CommandIds:
@ -32,84 +32,47 @@ class CommandIds:
UPDATE_ON_FALLING_EDGE = 8
def pack_ccsds_handler_test(
object_id: bytearray, tc_queue: TcQueueT, op_code: str
) -> TcQueueT:
tc_queue.appendleft(
(
QueueCommands.PRINT,
"Testing CCSDS handler with object id: 0x" + object_id.hex(),
)
)
def pack_ccsds_handler_test(object_id: ObjectId, q: QueueHelper, op_code: str):
obyt = object_id.as_bytes
q.add_log_cmd(f"Testing CCSDS handler with object id: {object_id.as_hex_string}")
if op_code == "0":
tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Set low rate"))
command = object_id + struct.pack("!I", CommandIds.SET_LOW_RATE)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("CCSDS Handler: Set low rate")
command = obyt + struct.pack("!I", CommandIds.SET_LOW_RATE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "1":
tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Set high rate"))
command = object_id + struct.pack("!I", CommandIds.SET_HIGH_RATE)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("CCSDS Handler: Set high rate")
command = obyt + struct.pack("!I", CommandIds.SET_HIGH_RATE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "2":
tc_queue.appendleft(
(QueueCommands.PRINT, "CCSDS Handler: Enables the transmitter")
)
command = object_id + struct.pack("!I", CommandIds.EN_TRANSMITTER)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("CCSDS Handler: Enables the transmitter")
command = obyt + struct.pack("!I", CommandIds.EN_TRANSMITTER)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "3":
tc_queue.appendleft(
(QueueCommands.PRINT, "CCSDS Handler: Disables the transmitter")
)
command = object_id + struct.pack("!I", CommandIds.DIS_TRANSMITTER)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("CCSDS Handler: Disables the transmitter")
command = obyt + struct.pack("!I", CommandIds.DIS_TRANSMITTER)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "4":
tc_queue.appendleft(
(QueueCommands.PRINT, "CCSDS Handler: Set arbitrary bitrate")
)
q.add_log_cmd("CCSDS Handler: Set arbitrary bitrate")
bitrate = int(input("Specify bit rate (bps): "))
command = (
object_id
obyt
+ struct.pack("!I", CommandIds.ARBITRARY_BITRATE)
+ struct.pack("!I", bitrate)
)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "5":
tc_queue.appendleft(
(QueueCommands.PRINT, "CCSDS Handler: Enable tx clock manipulator")
)
command = object_id + struct.pack("!I", CommandIds.ENABLE_TX_CLK_MANIPULATOR)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("CCSDS Handler: Enable tx clock manipulator")
command = obyt + struct.pack("!I", CommandIds.ENABLE_TX_CLK_MANIPULATOR)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "6":
tc_queue.appendleft(
(QueueCommands.PRINT, "CCSDS Handler: Disable tx clock manipulator")
)
command = object_id + struct.pack("!I", CommandIds.DISABLE_TX_CLK_MANIPULATOR)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("CCSDS Handler: Disable tx clock manipulator")
command = obyt + struct.pack("!I", CommandIds.DISABLE_TX_CLK_MANIPULATOR)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "7":
tc_queue.appendleft(
(
QueueCommands.PRINT,
"CCSDS Handler: Update tx data on rising edge of tx clock",
)
)
command = object_id + struct.pack("!I", CommandIds.UPDATE_ON_RISING_EDGE)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("CCSDS Handler: Update tx data on rising edge of tx clock")
command = obyt + struct.pack("!I", CommandIds.UPDATE_ON_RISING_EDGE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "8":
tc_queue.appendleft(
(
QueueCommands.PRINT,
"CCSDS Handler: Update tx data on falling edge of tx clock",
)
)
command = object_id + struct.pack("!I", CommandIds.UPDATE_ON_FALLING_EDGE)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
q.add_log_cmd("CCSDS Handler: Update tx data on falling edge of tx clock")
command = obyt + struct.pack("!I", CommandIds.UPDATE_ON_FALLING_EDGE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))

View File

@ -1,12 +1,9 @@
import enum
from config.definitions import CustomServiceList
from tmtccmd.config import add_op_code_entry, add_service_op_code_entry
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT
from tmtccmd.logging import get_console_logger
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.pus_8_funccmd import generate_action_command
LOGGER = get_console_logger()
@ -42,13 +39,12 @@ def add_gps_cmds(cmd_dict: ServiceOpCodeDictT):
)
def pack_gps_command(object_id: bytes, tc_queue: TcQueueT, op_code: str):
def pack_gps_command(object_id: bytes, q: QueueHelper, op_code: str):
if op_code in OpCodes.RESET_GNSS:
# TODO: This needs to be re-implemented
LOGGER.warning("Reset pin handling needs to be re-implemented")
if op_code in OpCodes.REQ_OS_HK:
tc_queue.appendleft((QueueCommands.PRINT, f"GMSS: {Info.REQ_OS_HK}"))
cmd = generate_one_hk_command(
sid=make_sid(object_id=object_id, set_id=SetIds.HK), ssc=0
q.add_log_cmd(f"GMSS: {Info.REQ_OS_HK}")
q.add_pus_tc(
generate_one_hk_command(sid=make_sid(object_id=object_id, set_id=SetIds.HK))
)
tc_queue.appendleft(cmd.pack_command_tuple())

View File

@ -8,36 +8,30 @@
"""
import struct
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper
from tmtccmd.utility import ObjectId
class ActionIds:
DUMP_MRAM = 1
def pack_ploc_memory_dumper_cmd(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
tc_queue.appendleft(
(
QueueCommands.PRINT,
"Testing PLOC memory dumper with object id: 0x" + object_id.hex(),
)
def pack_ploc_memory_dumper_cmd(object_id: ObjectId, q: QueueHelper, op_code: str):
q.add_log_cmd(
f"Testing PLOC memory dumper with object id: {object_id.as_hex_string}"
)
if op_code == "0":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Dump MRAM"))
command = pack_mram_dump_cmd(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("PLOC Supervisor: Dump MRAM")
command = pack_mram_dump_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
def pack_mram_dump_cmd(object_id: bytearray) -> bytearray:
def pack_mram_dump_cmd(object_id: bytes) -> bytearray:
start = int(input("Start address: 0x"), 16)
end = int(input("End address: 0x"), 16)
command = bytearray()
command = object_id + struct.pack("!I", ActionIds.DUMP_MRAM)
command = command + struct.pack("!I", start)
command = command + struct.pack("!I", end)
return command
return bytearray(command)

View File

@ -11,6 +11,8 @@ from tmtccmd.config import (
add_service_op_code_entry,
)
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command,
make_sid,
@ -197,18 +199,14 @@ def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
)
def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str):
def pack_pl_pcdu_commands(q: QueueHelper, op_code: str):
if op_code in OpCodes.SWITCH_ON:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue, info=Info.SWITCH_ON, mode=Modes.ON, submode=0
)
pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_ON, mode=Modes.ON, submode=0)
if op_code in OpCodes.SWITCH_OFF:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue, info=Info.SWITCH_OFF, mode=Modes.OFF, submode=0
)
pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_OFF, mode=Modes.OFF, submode=0)
if op_code in OpCodes.NORMAL_SSR:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
q=q,
info=Info.NORMAL_SSR,
mode=Modes.NORMAL,
submode=submode_mask_to_submode(
@ -217,64 +215,63 @@ def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str):
)
if op_code in OpCodes.NORMAL_DRO:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
q=q,
info=Info.NORMAL_DRO,
mode=Modes.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.DRO_ON),
)
if op_code in OpCodes.NORMAL_X8:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
q=q,
info=Info.NORMAL_X8,
mode=Modes.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.X8_ON),
)
if op_code in OpCodes.NORMAL_TX:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
q=q,
info=Info.NORMAL_TX,
mode=Modes.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.TX_ON),
)
if op_code in OpCodes.NORMAL_MPA:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
q=q,
info=Info.NORMAL_MPA,
mode=Modes.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.MPA_ON),
)
if op_code in OpCodes.NORMAL_HPA:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
q=q,
info=Info.NORMAL_HPA,
mode=Modes.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON),
)
if op_code in OpCodes.REQ_OS_HK:
tc_queue.appendleft((QueueCommands.PRINT, f"PL PCDU: {Info.REQ_OS_HK}"))
cmd = generate_one_diag_command(
sid=make_sid(object_id=PL_PCDU_ID, set_id=SetIds.ADC), ssc=0
q.add_log_cmd(f"PL PCDU: {Info.REQ_OS_HK}")
q.add_pus_tc(
generate_one_diag_command(
sid=make_sid(object_id=PL_PCDU_ID, set_id=SetIds.ADC)
)
)
tc_queue.appendleft(cmd.pack_command_tuple())
if op_code in OpCodes.SWITCH_HPA_ON_PROC:
hpa_on_procedure(tc_queue)
hpa_on_procedure(q)
if op_code in OpCodes.INJECT_ALL_ON_FAILURE:
pack_failure_injection_cmd(
tc_queue=tc_queue,
q=q,
param_id=ParamIds.INJECT_ALL_ON_FAILURE,
print_str="All On",
)
def hpa_on_procedure(tc_queue: TcQueueT):
def hpa_on_procedure(q: QueueHelper):
delay_dro_to_x8 = request_wait_time()
if delay_dro_to_x8 is None:
delay_dro_to_x8 = 900
tc_queue.appendleft(
(
QueueCommands.PRINT,
f"Starting procedure to switch on PL PCDU HPA with DRO to X8 delay of {delay_dro_to_x8} seconds",
)
q.add_log_cmd(
f"Starting procedure to switch on PL PCDU HPA with DRO to X8 "
f"delay of {delay_dro_to_x8} seconds"
)
pl_pcdu_on = PusTelecommand(
service=200,
@ -339,54 +336,52 @@ def hpa_on_procedure(tc_queue: TcQueueT):
)
current_time = time.time()
enb_sched = generate_enable_tc_sched_cmd(ssc=0)
enb_sched = generate_enable_tc_sched_cmd()
sched_time = current_time + 10
tc_queue.appendleft(enb_sched.pack_command_tuple())
q.add_pus_tc(enb_sched)
tagged_on_cmd = generate_time_tagged_cmd(
release_time=struct.pack("!I", sched_time),
tc_to_insert=pl_pcdu_on,
ssc=1,
)
tc_queue.appendleft(tagged_on_cmd.pack_command_tuple())
q.add_pus_tc(tagged_on_cmd)
sched_time += 5
tagged_ssr_cmd = generate_time_tagged_cmd(
release_time=struct.pack("!I", sched_time),
tc_to_insert=ssr_on,
ssc=2,
)
tc_queue.appendleft(tagged_ssr_cmd.pack_command_tuple())
q.add_pus_tc(tagged_ssr_cmd)
sched_time += 5
tagged_dro_cmd = generate_time_tagged_cmd(
release_time=struct.pack("!I", sched_time), tc_to_insert=dro_on, ssc=3
release_time=struct.pack("!I", sched_time), tc_to_insert=dro_on
)
tc_queue.appendleft(tagged_dro_cmd.pack_command_tuple())
q.add_pus_tc(tagged_dro_cmd)
sched_time += delay_dro_to_x8
tagged_x8_cmd = generate_time_tagged_cmd(
release_time=struct.pack("!I", sched_time), tc_to_insert=x8_on, ssc=4
release_time=struct.pack("!I", sched_time), tc_to_insert=x8_on
)
tc_queue.appendleft(tagged_x8_cmd.pack_command_tuple())
q.add_pus_tc(tagged_x8_cmd)
sched_time += 5
tagged_tx_cmd = generate_time_tagged_cmd(
release_time=struct.pack("!I", sched_time), tc_to_insert=tx_on, ssc=5
release_time=struct.pack("!I", sched_time), tc_to_insert=tx_on
)
tc_queue.appendleft(tagged_tx_cmd.pack_command_tuple())
q.add_pus_tc(tagged_tx_cmd)
sched_time += 5
tagged_mpa_cmd = generate_time_tagged_cmd(
release_time=struct.pack("!I", sched_time), tc_to_insert=mpa_on, ssc=6
release_time=struct.pack("!I", sched_time), tc_to_insert=mpa_on
)
tc_queue.appendleft(tagged_mpa_cmd.pack_command_tuple())
q.add_pus_tc(tagged_mpa_cmd)
sched_time += 5
tagged_hpa_cmd = generate_time_tagged_cmd(
release_time=struct.pack("!I", sched_time), tc_to_insert=hpa_on, ssc=7
release_time=struct.pack("!I", sched_time), tc_to_insert=hpa_on
)
tc_queue.appendleft(tagged_hpa_cmd.pack_command_tuple())
q.add_pus_tc(tagged_hpa_cmd)
def request_wait_time() -> Optional[float]:
@ -457,28 +452,23 @@ def pack_wait_time_cmd(tc_queue: TcQueueT, param_id: int, print_str: str):
unique_id=param_id,
parameter=wait_time,
)
cmd = pack_fsfw_load_param_cmd(ssc=0, app_data=param_data)
cmd = pack_fsfw_load_param_cmd(app_data=param_data)
tc_queue.appendleft(cmd.pack_command_tuple())
def pack_failure_injection_cmd(tc_queue: TcQueueT, param_id: int, print_str: str):
tc_queue.appendleft((QueueCommands.PRINT, f"Inserting {print_str} error"))
def pack_failure_injection_cmd(q: QueueHelper, param_id: int, print_str: str):
q.add_log_cmd(f"Inserting {print_str} error")
param_data = pack_boolean_parameter_app_data(
object_id=PL_PCDU_ID, domain_id=0, unique_id=param_id, parameter=True
)
cmd = pack_fsfw_load_param_cmd(ssc=0, app_data=param_data)
tc_queue.appendleft(cmd.pack_command_tuple())
q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=param_data))
def pack_pl_pcdu_mode_cmd(tc_queue: TcQueueT, info: str, mode: Modes, submode: int):
tc_queue.appendleft(
(
QueueCommands.PRINT,
info,
def pack_pl_pcdu_mode_cmd(q: QueueHelper, info: str, mode: Modes, submode: int):
q.add_log_cmd(info)
mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=mode, submode=submode)
q.add_pus_tc(
PusTelecommand(
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=mode_data
)
)
mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=mode, submode=submode)
mode_cmd = PusTelecommand(
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=mode_data
)
tc_queue.appendleft(mode_cmd.pack_command_tuple())

View File

@ -130,7 +130,7 @@ def add_rw_cmds(cmd_dict: ServiceOpCodeDictT):
def pack_single_rw_test_into(
object_id: bytes, rw_idx: int, q: QueueHelper, op_code: str
) -> TcQueueT:
):
if op_code in OpCodesDevs.SPEED:
speed = int(input("Specify speed [0.1 RPM]: "))
ramp_time = int(input("Specify ramp time [ms]: "))
@ -166,28 +166,30 @@ def pack_single_rw_test_into(
sid=make_sid(object_id=object_id, set_id=RwSetIds.STATUS_SET_ID)
)
)
return q
def pack_rw_ass_cmds(tc_queue: TcQueueT, object_id: bytes, op_code: str):
def pack_rw_ass_cmds(q: QueueHelper, object_id: bytes, op_code: str):
if op_code in OpCodesAss.OFF:
data = pack_mode_data(object_id=object_id, mode=Modes.OFF, submode=0)
cmd = PusTelecommand(
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
q.add_pus_tc(
PusTelecommand(
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
)
)
tc_queue.appendleft(cmd.pack_command_tuple())
if op_code in OpCodesAss.ON:
data = pack_mode_data(object_id=object_id, mode=Modes.ON, submode=0)
cmd = PusTelecommand(
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
q.add_pus_tc(
PusTelecommand(
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
)
)
tc_queue.appendleft(cmd.pack_command_tuple())
if op_code in OpCodesAss.NML:
data = pack_mode_data(object_id=object_id, mode=Modes.NORMAL, submode=0)
cmd = PusTelecommand(
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
q.add_pus_tc(
PusTelecommand(
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
)
)
tc_queue.appendleft(cmd.pack_command_tuple())
def pack_set_speed_command(

View File

@ -5,18 +5,16 @@
@author J. Meier
@date 15.02.2021
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.packer import PusTelecommand
from spacepackets.ecss import PusTelecommand
from tmtccmd.tc import QueueHelper
class ActionIds:
DEPLOY_SOLAR_ARRAYS = bytearray([0x0, 0x0, 0x0, 0x5])
def pack_solar_array_deployment_test_into(object_id: bytearray, tc_queue: TcQueueT):
tc_queue.appendleft((QueueCommands.PRINT, "Testing S/A Deployment"))
def pack_solar_array_deployment_test_into(object_id: bytearray, q: QueueHelper):
q.add_log_cmd("Testing S/A Deployment")
command = object_id + ActionIds.DEPLOY_SOLAR_ARRAYS
command = PusTelecommand(service=8, subservice=128, ssc=200, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))

View File

@ -7,12 +7,12 @@
"""
import struct
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.logging import get_console_logger
from tmtccmd.tc import QueueHelper
from tmtccmd.utility import ObjectId
from utility.input_helper import InputHelper
@ -150,623 +150,483 @@ class Submode:
FIRMWARE = 2
def pack_star_tracker_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
tc_queue.appendleft(
(
QueueCommands.PRINT,
"Generate command for star tracker with object id: 0x" + object_id.hex(),
)
def pack_star_tracker_commands(object_id: ObjectId, q: QueueHelper, op_code: str):
q.add_log_cmd(
f"Generate command for star tracker with object id: {object_id.as_hex_string}"
)
obyt = object_id.as_bytes
if op_code == "0":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Mode On, Submode Bootloader")
)
command = pack_mode_data(object_id, Modes.ON, Submode.BOOTLOADER)
command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Mode On, Submode Bootloader")
data = pack_mode_data(obyt, Modes.ON, Submode.BOOTLOADER)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
if op_code == "1":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Mode On, Submode Firmware")
)
command = pack_mode_data(object_id, Modes.ON, Submode.FIRMWARE)
command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Mode On, Submode Firmware")
data = pack_mode_data(obyt, Modes.ON, Submode.FIRMWARE)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
if op_code == "2":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Mode Normal"))
command = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=11, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Mode Normal")
data = pack_mode_data(obyt, Modes.NORMAL, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Mode Off"))
command = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=12, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Mode Off")
data = pack_mode_data(obyt, Modes.OFF, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
if op_code == "4":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Mode Raw"))
command = pack_mode_data(object_id, 3, 0)
command = PusTelecommand(service=200, subservice=1, ssc=13, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Mode Raw")
data = pack_mode_data(obyt, Modes.RAW, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
if op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Ping"))
command = object_id + struct.pack("!I", StarTrackerActionIds.PING)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Ping")
data = obyt + struct.pack("!I", StarTrackerActionIds.PING)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "6":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Switch to bootloader program")
)
command = object_id + struct.pack(
q.add_log_cmd("Star tracker: Switch to bootloader program")
data = obyt + struct.pack(
"!I", StarTrackerActionIds.SWITCH_TO_BOOTLOADER_PROGRAM
)
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "7":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Temperature request"))
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_TEMPERATURE)
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Temperature request")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_TEMPERATURE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "8":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request version"))
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_VERSION)
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request version")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_VERSION)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "9":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request interface"))
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_INTERFACE)
command = PusTelecommand(service=8, subservice=128, ssc=34, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request interface")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_INTERFACE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "10":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request power"))
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_POWER)
command = PusTelecommand(service=8, subservice=128, ssc=35, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request power")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_POWER)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "11":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Set subscription parameters")
)
q.add_log_cmd("Star tracker: Set subscription parameters")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.SUBSCRIPTION)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=36, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "12":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Boot"))
command = object_id + struct.pack("!I", StarTrackerActionIds.BOOT)
command = PusTelecommand(service=8, subservice=128, ssc=37, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Boot")
data = obyt + struct.pack("!I", StarTrackerActionIds.BOOT)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "13":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request time"))
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_TIME)
command = PusTelecommand(service=8, subservice=128, ssc=38, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request time")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_TIME)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "14":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request solution"))
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_SOLUTION)
command = PusTelecommand(service=8, subservice=128, ssc=39, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request solution")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_SOLUTION)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "15":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Upload image"))
q.add_log_cmd("Star tracker: Upload image")
image = get_upload_image()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.UPLOAD_IMAGE)
+ bytearray(image, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=40, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "16":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Download image"))
q.add_log_cmd("Star tracker: Download image")
path = input("Specify storage location (default - /mnt/sd0/startracker): ")
if not path:
path = FileDefs.download_path
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.DOWNLOAD_IMAGE)
+ bytearray(path, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=53, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "17":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Set limits"))
q.add_log_cmd("Star tracker: Set limits")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.LIMITS)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=42, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "18":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Set tracking parameters")
)
q.add_log_cmd("Star tracker: Set tracking parameters")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.TRACKING)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=43, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "19":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Mounting"))
q.add_log_cmd("Star tracker: Mounting")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.MOUNTING)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=44, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "20":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Camera"))
q.add_log_cmd("Star tracker: Camera")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.CAMERA)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "22":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Centroiding"))
q.add_log_cmd("Star tracker: Centroiding")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.CENTROIDING)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=47, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "23":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: LISA"))
q.add_log_cmd("Star tracker: LISA")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.LISA)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=48, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "24":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Matching"))
q.add_log_cmd("Star tracker: Matching")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.MATCHING)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=49, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "25":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Validation"))
q.add_log_cmd("Star tracker: Validation")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.VALIDATION)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=50, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "26":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Algo"))
q.add_log_cmd("Star tracker: Algo")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.ALGO)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=51, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "27":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Take image"))
q.add_log_cmd("Star tracker: Take image")
actionid = int(input("Specify parameter ID (take image - 4): "))
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.TAKE_IMAGE)
+ struct.pack("!B", actionid)
)
command = PusTelecommand(service=8, subservice=128, ssc=52, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "28":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Stop str helper"))
command = object_id + struct.pack("!I", StarTrackerActionIds.STOP_STR_HELPER)
command = PusTelecommand(service=8, subservice=128, ssc=54, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Stop str helper")
data = obyt + struct.pack("!I", StarTrackerActionIds.STOP_STR_HELPER)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "30":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Set name of download image")
)
q.add_log_cmd("Star tracker: Set name of download image")
filename = input("Specify download image name: ")
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.CHANGE_DOWNLOAD_IMAGE)
+ bytearray(filename, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=54, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "31":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request histogram"))
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_HISTOGRAM)
command = PusTelecommand(service=8, subservice=128, ssc=55, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request histogram")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_HISTOGRAM)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "32":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request contrast"))
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_CONTRAST)
command = PusTelecommand(service=8, subservice=128, ssc=56, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request contrast")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_CONTRAST)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "33":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Set json filename"))
q.add_log_cmd("Star tracker: Set json filename")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.SET_JSON_FILE_NAME)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=57, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "35":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Flash read"))
command = pack_read_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=59, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Flash read")
data = pack_read_command(obyt)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "36":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Set flash read filename")
)
q.add_log_cmd("Star tracker: Set flash read filename")
filename = input("Specify filename: ")
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.SET_FLASH_READ_FILENAME)
+ bytearray(filename, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=60, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "37":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Get checksum"))
command = pack_checksum_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=61, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Get checksum")
data = pack_checksum_command(obyt)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "38":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Set time"))
q.add_log_cmd("Star tracker: Set time")
unix_time = 1640783543
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.SET_TIME)
+ struct.pack("!Q", unix_time)
)
command = PusTelecommand(service=8, subservice=128, ssc=61, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "39":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Download Centroid"))
q.add_log_cmd("Star tracker: Download Centroid")
id = 0
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.DOWNLOAD_CENTROID)
+ struct.pack("!B", id)
)
command = PusTelecommand(service=8, subservice=128, ssc=62, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "41":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Download matched star")
)
q.add_log_cmd("Star tracker: Download matched star")
id = 0
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.DOWNLOAD_MATCHED_STAR)
+ struct.pack("!B", id)
)
command = PusTelecommand(service=8, subservice=128, ssc=64, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "42":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Download DB Image"))
q.add_log_cmd("Star tracker: Download DB Image")
id = 0
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.DOWNLOAD_DBIMAGE)
+ struct.pack("!B", id)
)
command = PusTelecommand(service=8, subservice=128, ssc=65, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "43":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Download Blob Pixel"))
q.add_log_cmd("Star tracker: Download Blob Pixel")
id = 0
type = 1 # 0 - normal, 1 - fast
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.DOWNLOAD_BLOBPIXEL)
+ struct.pack("!B", id)
+ struct.pack("!B", type)
)
command = PusTelecommand(service=8, subservice=128, ssc=65, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "44":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Download FPGA Image"))
q.add_log_cmd("Star tracker: Download FPGA Image")
position = int(input("Start position: "))
length = int(input("Size to download: "))
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.DOWNLOAD_FPGA_IMAGE)
+ struct.pack("!I", position)
+ struct.pack("!I", length)
+ bytearray(FileDefs.downloadFpgaImagePath, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=66, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "45":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Change donwload FPGA image file name")
)
command = (
object_id
q.add_log_cmd("Star tracker: Change donwload FPGA image file name")
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.CHANGE_FPGA_DOWNLOAD_FILE)
+ bytearray(FileDefs.downloadFpgaImageName, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=67, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "46":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Upload FPGA image"))
command = (
object_id
q.add_log_cmd("Star tracker: Upload FPGA image")
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.UPLOAD_FPGA_IMAGE)
+ bytearray(FileDefs.uploadFpgaImageName, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=68, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "47":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: FPGA action"))
q.add_log_cmd("Star tracker: FPGA action")
id = 3
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.FPGA_ACTION)
+ struct.pack("!B", id)
)
command = PusTelecommand(service=8, subservice=128, ssc=69, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "48":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Unlock"))
command = object_id + struct.pack("!I", StarTrackerActionIds.UNLOCK)
command = PusTelecommand(service=8, subservice=128, ssc=70, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Unlock")
data = obyt + struct.pack("!I", StarTrackerActionIds.UNLOCK)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "49":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Request camera parameters")
)
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_CAMERA_PARAMS)
command = PusTelecommand(service=8, subservice=128, ssc=71, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request camera parameters")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_CAMERA_PARAMS)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "50":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Request limits"))
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_LIMITS)
command = PusTelecommand(service=8, subservice=128, ssc=72, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request limits")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_LIMITS)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "51":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Set image processor parameters")
)
q.add_log_cmd("Star tracker: Set image processor parameters")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.IMAGE_PROCESSOR)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=70, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "52":
tc_queue.appendleft(
(
QueueCommands.PRINT,
"Star tracker: EGSE load ground config camera parameters",
)
)
command = (
object_id
q.add_log_cmd("Star tracker: EGSE load ground config camera parameters")
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.CAMERA)
+ bytearray(FileDefs.egse_ground_config, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=71, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "53":
tc_queue.appendleft(
(
QueueCommands.PRINT,
"Star tracker: EGSE load flight config camera parameters",
)
)
command = (
object_id
q.add_log_cmd("Star tracker: EGSE load flight config camera parameters")
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.CAMERA)
+ bytearray(FileDefs.egse_flight_config, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=72, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "54":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Request log level parameters")
)
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_LOG_LEVEL)
command = PusTelecommand(service=8, subservice=128, ssc=74, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request log level parameters")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_LOG_LEVEL)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "55":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Request mounting parameters")
)
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_MOUNTING)
command = PusTelecommand(service=8, subservice=128, ssc=75, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request mounting parameters")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_MOUNTING)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "56":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Request image processor parameters")
)
command = object_id + struct.pack(
"!I", StarTrackerActionIds.REQ_IMAGE_PROCESSOR
)
command = PusTelecommand(service=8, subservice=128, ssc=76, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request image processor parameters")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_IMAGE_PROCESSOR)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "57":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Request centroiding parameters")
)
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_CENTROIDING)
command = PusTelecommand(service=8, subservice=128, ssc=75, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request centroiding parameters")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_CENTROIDING)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "58":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Request lisa parameters")
)
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_LISA)
command = PusTelecommand(service=8, subservice=128, ssc=76, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request lisa parameters")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_LISA)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "59":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Request matching parameters")
)
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_MATCHING)
command = PusTelecommand(service=8, subservice=128, ssc=77, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request matching parameters")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_MATCHING)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "60":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Request tracking parameters")
)
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_TRACKING)
command = PusTelecommand(service=8, subservice=128, ssc=78, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request tracking parameters")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_TRACKING)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "61":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Request validation parameters")
)
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_VALIDATION)
command = PusTelecommand(service=8, subservice=128, ssc=79, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request validation parameters")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_VALIDATION)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "62":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Request algo parameters")
)
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_ALGO)
command = PusTelecommand(service=8, subservice=128, ssc=80, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request algo parameters")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_ALGO)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "63":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Request subscription parameters")
)
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_SUBSCRIPTION)
command = PusTelecommand(service=8, subservice=128, ssc=81, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request subscription parameters")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_SUBSCRIPTION)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "64":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Request log subscription parameters")
)
command = object_id + struct.pack(
"!I", StarTrackerActionIds.REQ_LOG_SUBSCRIPTION
)
command = PusTelecommand(service=8, subservice=128, ssc=82, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request log subscription parameters")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_LOG_SUBSCRIPTION)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "65":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Request debug camera parameters")
)
command = object_id + struct.pack("!I", StarTrackerActionIds.REQ_DEBUG_CAMERA)
command = PusTelecommand(service=8, subservice=128, ssc=83, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Star tracker: Request debug camera parameters")
data = obyt + struct.pack("!I", StarTrackerActionIds.REQ_DEBUG_CAMERA)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "66":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Set log level parameters")
)
q.add_log_cmd("Star tracker: Set log level parameters")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.LOGLEVEL)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=84, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "67":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Set log subscription parameters")
)
q.add_log_cmd("Star tracker: Set log subscription parameters")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.LOG_SUBSCRIPTION)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=85, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "68":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Set debug camera parameters")
)
q.add_log_cmd("Star tracker: Set debug camera parameters")
json_file = get_config_file()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.DEBUG_CAMERA)
+ bytearray(json_file, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=86, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "69":
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Firmware update"))
q.add_log_cmd("Star tracker: Firmware update")
firmware = get_firmware()
command = (
object_id
data = (
obyt
+ struct.pack("!I", StarTrackerActionIds.FIRMWARE_UPDATE)
+ bytearray(firmware, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=87, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "70":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Disable timestamp generation")
)
command = object_id + struct.pack(
q.add_log_cmd("Star tracker: Disable timestamp generation")
command = obyt + struct.pack(
"!I", StarTrackerActionIds.DISBALE_TIMESTAMP_GENERATION
)
command = PusTelecommand(service=8, subservice=128, ssc=88, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "71":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker: Enable timestamp generation")
)
command = object_id + struct.pack(
q.add_log_cmd("Star tracker: Enable timestamp generation")
command = obyt + struct.pack(
"!I", StarTrackerActionIds.ENABLE_TIMESTAMP_GENERATION
)
command = PusTelecommand(service=8, subservice=128, ssc=89, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
def pack_read_command(object_id: bytearray) -> bytearray:
def pack_read_command(object_id: bytes) -> bytearray:
start_region = StartRegion.STAR_TRACKER_FIRMWARE
size = PartitionSize.STAR_TRACKER_FIRMWARE
path = input("Specify storage location (default - /mnt/sd0/startracker): ")
if not path:
path = FileDefs.download_path
command = (
data = (
object_id
+ struct.pack("!I", StarTrackerActionIds.FLASH_READ)
+ struct.pack("!B", start_region)
+ struct.pack("!I", size)
+ bytearray(path, "utf-8")
)
return command
return bytearray(data)
def pack_checksum_command(object_id: bytearray) -> bytearray:
def pack_checksum_command(object_id: bytes) -> bytearray:
start_region = StartRegion.STAR_TRACKER_FIRMWARE
address = 0
size = PartitionSize.STAR_TRACKER_FIRMWARE
command = (
data = (
object_id
+ struct.pack("!I", StarTrackerActionIds.CHECKSUM)
+ struct.pack("!B", start_region)
+ struct.pack("!I", address)
+ struct.pack("!I", size)
)
return command
return bytearray(data)
def get_config_file() -> str:

View File

@ -10,10 +10,9 @@
"""
import struct
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper
from tmtccmd.utility import ObjectId
class Commands:
@ -25,24 +24,15 @@ class ImagePathDefs:
uploadFile = "/mnt/sd0/startracker/gemma.bin"
def pack_str_img_helper_command(
object_id: bytearray, tc_queue: TcQueueT, op_code: str
) -> TcQueueT:
tc_queue.appendleft(
(
QueueCommands.PRINT,
"Testing star tracker image helper object id: 0x" + object_id.hex(),
)
def pack_str_img_helper_command(object_id: ObjectId, q: QueueHelper, op_code: str):
q.add_log_cmd(
f"Testing star tracker image helper object id: {object_id.as_hex_string}"
)
if op_code == "0":
tc_queue.appendleft(
(QueueCommands.PRINT, "Star tracker image helper: Upload image")
)
q.add_log_cmd("Star tracker image helper: Upload image")
command = (
object_id
object_id.as_bytes
+ struct.pack("!I", Commands.UPLOAD_IMAGE)
+ bytearray(ImagePathDefs.uploadFile, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))

View File

@ -8,11 +8,15 @@
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
import struct
from tmtccmd.utility import ObjectId
class SetIds:
RX_REGISTERS_DATASET = 1
@ -38,116 +42,87 @@ class CommandIds:
def pack_syrlinks_command(
object_id: bytearray, tc_queue: TcQueueT, op_code: str
object_id: ObjectId, q: QueueHelper, op_code: str
) -> TcQueueT:
tc_queue.appendleft(
(
QueueCommands.PRINT,
"Testing Syrlinks with object id: 0x" + object_id.hex(),
)
)
obyt = object_id.as_bytes
q.add_log_cmd(f"Testing Syrlinks with object id: {object_id.as_hex_string}")
if op_code == "0":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set mode off"))
command = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Set mode off")
data = pack_mode_data(obyt, Modes.OFF, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
if op_code == "1":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set mode on"))
command = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Set mode on")
data = pack_mode_data(obyt, Modes.ON, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
if op_code == "2":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Mode Normal"))
command = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=11, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Mode Normal")
data = pack_mode_data(obyt, Modes.NORMAL, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode standby"))
command = object_id + struct.pack("!I", CommandIds.SET_TX_MODE_STANDBY)
command = PusTelecommand(service=8, subservice=128, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("syrlinks: Set TX mode standby")
data = obyt + struct.pack("!I", CommandIds.SET_TX_MODE_STANDBY)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "4":
tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode modulation"))
command = object_id + struct.pack("!I", CommandIds.SET_TX_MODE_MODULATION)
command = PusTelecommand(service=8, subservice=128, ssc=11, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("syrlinks: Set TX mode modulation")
data = obyt + struct.pack("!I", CommandIds.SET_TX_MODE_MODULATION)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode CW"))
command = object_id + struct.pack("!I", CommandIds.SET_TX_MODE_CW)
command = PusTelecommand(service=8, subservice=128, ssc=12, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("syrlinks: Set TX mode CW")
data = obyt + struct.pack("!I", CommandIds.SET_TX_MODE_CW)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "6":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Get RX Registers"))
sid = make_sid(object_id, SetIds.RX_REGISTERS_DATASET)
command = generate_one_hk_command(sid, 200)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Get RX Registers")
sid = make_sid(obyt, SetIds.RX_REGISTERS_DATASET)
q.add_pus_tc(generate_one_hk_command(sid))
if op_code == "7":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Get TX Registers"))
sid = make_sid(object_id, SetIds.TX_REGISTERS_DATASET)
command = generate_one_hk_command(sid, 201)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Get TX Registers")
sid = make_sid(obyt, SetIds.TX_REGISTERS_DATASET)
q.add_pus_tc(generate_one_hk_command(sid))
if op_code == "8":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read TX status"))
command = object_id + struct.pack("!I", CommandIds.READ_TX_STATUS)
command = PusTelecommand(service=8, subservice=128, ssc=13, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Read TX status")
command = obyt + struct.pack("!I", CommandIds.READ_TX_STATUS)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "9":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read TX waveform"))
command = object_id + struct.pack("!I", CommandIds.READ_TX_WAVEFORM)
command = PusTelecommand(service=8, subservice=128, ssc=14, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Read TX waveform")
command = obyt + struct.pack("!I", CommandIds.READ_TX_WAVEFORM)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "10":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Read TX AGC value high byte")
)
command = object_id + struct.pack("!I", CommandIds.READ_TX_AGC_VALUE_HIGH_BYTE)
command = PusTelecommand(service=8, subservice=128, ssc=15, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Read TX AGC value high byte")
command = obyt + struct.pack("!I", CommandIds.READ_TX_AGC_VALUE_HIGH_BYTE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "11":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Read TX AGC value low byte")
)
command = object_id + struct.pack("!I", CommandIds.READ_TX_AGC_VALUE_LOW_BYTE)
command = PusTelecommand(service=8, subservice=128, ssc=16, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Read TX AGC value low byte")
command = obyt + struct.pack("!I", CommandIds.READ_TX_AGC_VALUE_LOW_BYTE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "12":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Write LCL config"))
command = object_id + struct.pack("!I", CommandIds.WRITE_LCL_CONFIG)
command = PusTelecommand(service=8, subservice=128, ssc=17, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Write LCL config")
command = obyt + struct.pack("!I", CommandIds.WRITE_LCL_CONFIG)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "13":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read RX status registers"))
command = object_id + struct.pack("!I", CommandIds.READ_RX_STATUS_REGISTERS)
command = PusTelecommand(service=8, subservice=128, ssc=18, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Read RX status registers")
command = obyt + struct.pack("!I", CommandIds.READ_RX_STATUS_REGISTERS)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "14":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read LCL config register"))
command = object_id + struct.pack("!I", CommandIds.READ_LCL_CONFIG_REGISTER)
command = PusTelecommand(service=8, subservice=128, ssc=19, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Read LCL config register")
command = obyt + struct.pack("!I", CommandIds.READ_LCL_CONFIG_REGISTER)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "15":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform OQPSK"))
command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_OQPSK)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Set waveform OQPSK")
command = obyt + struct.pack("!I", CommandIds.SET_WAVEFORM_OQPSK)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "16":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform BPSK"))
command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_BPSK)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Set waveform BPSK")
command = obyt + struct.pack("!I", CommandIds.SET_WAVEFORM_BPSK)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "17":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set second config"))
command = object_id + struct.pack("!I", CommandIds.SET_SECOND_CONFIG)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Set second config")
command = obyt + struct.pack("!I", CommandIds.SET_SECOND_CONFIG)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "18":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Enable debug printout"))
command = object_id + struct.pack("!I", CommandIds.ENABLE_DEBUG)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Enable debug printout")
command = obyt + struct.pack("!I", CommandIds.ENABLE_DEBUG)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "19":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Disable debug printout"))
command = object_id + struct.pack("!I", CommandIds.DISABLE_DEBUG)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Syrlinks: Disable debug printout")
command = obyt + struct.pack("!I", CommandIds.DISABLE_DEBUG)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))

View File

@ -1,8 +1,6 @@
"""Hook function which packs telecommands based on service and operation code string
"""
import logging
import os
from collections import deque
from typing import Union
from pus_tc.devs.rtd import pack_rtd_commands
@ -12,7 +10,6 @@ from tmtccmd.config import CoreServiceList
from tmtccmd.logging import get_console_logger
from tmtccmd.tc import FeedWrapper
from tmtccmd.tc.pus_5_event import (
pack_generic_service5_test_into,
pack_generic_service_5_test_into,
)
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
@ -44,10 +41,8 @@ from pus_tc.devs.plpcdu import pack_pl_pcdu_commands
from pus_tc.devs.str_img_helper import pack_str_img_helper_command
from pus_tc.system.tcs import pack_tcs_sys_commands
from pus_tc.system.proc import pack_proc_commands
from pus_tc.system.controllers import pack_cmd_ctrl_to_prompted_mode
from config.definitions import CustomServiceList
from config.object_ids import (
get_object_ids,
P60_DOCK_HANDLER,
PDU_1_HANDLER_ID,
PDU_2_HANDLER_ID,
@ -65,7 +60,6 @@ from config.object_ids import (
PLOC_SUPV_ID,
STAR_TRACKER_ID,
PLOC_MEMORY_DUMPER_ID,
GPS_CONTROLLER,
CCSDS_HANDLER_ID,
PDEC_HANDLER_ID,
STR_IMG_HELPER_ID,
@ -156,60 +150,56 @@ def handle_default_procedure(info: DefaultProcedureInfo, wrapper: FeedWrapper):
if service == CustomServiceList.STAR_TRACKER.value:
object_id = STAR_TRACKER_ID
return pack_star_tracker_commands(
object_id=object_id, tc_queue=service_queue, op_code=op_code
object_id=object_id, q=queue_helper, op_code=op_code
)
if service == CustomServiceList.STR_IMG_HELPER.value:
object_id = STR_IMG_HELPER_ID
return pack_str_img_helper_command(
object_id=object_id, tc_queue=service_queue, op_code=op_code
object_id=object_id, q=queue_helper, op_code=op_code
)
if service == CustomServiceList.CORE.value:
return pack_core_commands(tc_queue=service_queue, op_code=op_code)
return pack_core_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.PLOC_MEMORY_DUMPER.value:
object_id = PLOC_MEMORY_DUMPER_ID
return pack_ploc_memory_dumper_cmd(
object_id=object_id, tc_queue=service_queue, op_code=op_code
object_id=object_id, q=queue_helper, op_code=op_code
)
if service == CustomServiceList.ACS.value:
return pack_acs_command(tc_queue=service_queue, op_code=op_code)
return pack_acs_command(q=queue_helper, op_code=op_code)
if service == CustomServiceList.GPS_CTRL.value:
return pack_gps_command(
object_id=oids.GPS_CONTROLLER, tc_queue=service_queue, op_code=op_code
object_id=oids.GPS_CONTROLLER, q=queue_helper, op_code=op_code
)
if service == CustomServiceList.CCSDS_HANDLER.value:
return pack_ccsds_handler_test(
object_id=CCSDS_HANDLER_ID, tc_queue=service_queue, op_code=op_code
object_id=CCSDS_HANDLER_ID, q=queue_helper, op_code=op_code
)
if service == CustomServiceList.PDEC_HANDLER.value:
return pack_ccsds_handler_test(
object_id=PDEC_HANDLER_ID, tc_queue=service_queue, op_code=op_code
object_id=PDEC_HANDLER_ID, q=queue_helper, op_code=op_code
)
if service == CustomServiceList.SYRLINKS.value:
return pack_syrlinks_command(
object_id=SYRLINKS_HANDLER_ID, tc_queue=service_queue, op_code=op_code
object_id=SYRLINKS_HANDLER_ID, q=queue_helper, op_code=op_code
)
if service == CustomServiceList.SA_DEPLYOMENT.value:
return pack_solar_array_deployment_test_into(
object_id=SOLAR_ARRAY_DEPLOYMENT_ID, tc_queue=service_queue
object_id=SOLAR_ARRAY_DEPLOYMENT_ID, q=queue_helper
)
if service == CustomServiceList.PROCEDURE.value:
return pack_proc_commands(tc_queue=service_queue, op_code=op_code)
return pack_proc_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.SUS_ASS.value:
return pack_sus_cmds(tc_queue=service_queue, op_code=op_code)
return pack_sus_cmds(q=queue_helper, op_code=op_code)
if service == CustomServiceList.PL_PCDU.value:
return pack_pl_pcdu_commands(tc_queue=service_queue, op_code=op_code)
return pack_pl_pcdu_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.ACS_ASS.value:
return pack_acs_command(tc_queue=service_queue, op_code=op_code)
return pack_acs_command(q=queue_helper, op_code=op_code)
if service == CustomServiceList.TCS_ASS.value:
return pack_tcs_sys_commands(tc_queue=service_queue, op_code=op_code)
return pack_tcs_sys_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.TIME.value:
return pack_set_current_time_ascii_command(tc_queue=service_queue, ssc=0)
return pack_set_current_time_ascii_command(q=queue_helper)
if service == CustomServiceList.RW_ASSEMBLY.value:
return pack_rw_ass_cmds(
tc_queue=service_queue, object_id=RW_ASSEMBLY, op_code=op_code
)
if service == CustomServiceList.CONTROLLERS.value:
return pack_controller_commands(tc_queue=service_queue, op_code=op_code)
return pack_rw_ass_cmds(q=queue_helper, object_id=RW_ASSEMBLY, op_code=op_code)
LOGGER.warning(f"Invalid Service {service}")
@ -231,18 +221,3 @@ def pre_tc_send_cb(
elif isinstance(queue_entry, QueueCommands):
if queue_entry == QueueCommands.PRINT:
file_logger.info(queue_info)
def pack_service_queue_user(
service: Union[str, int], op_code: str, service_queue: TcQueueT
):
pass
def create_total_tc_queue_user() -> TcQueueT:
if not os.path.exists("log"):
os.mkdir("log")
tc_queue = deque()
pack_generic_service5_test_into(tc_queue)
tc_queue.appendleft(pack_service_17_ping_command(ssc=1700).pack_command_tuple())
return tc_queue

View File

@ -1,5 +1,6 @@
import enum
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes
from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID
@ -29,13 +30,13 @@ class DualSideSubmodes(enum.IntEnum):
DUAL_SIDE = 2
def pack_acs_command(tc_queue: TcQueueT, op_code: str):
def pack_acs_command(q: QueueHelper, op_code: str):
if op_code in AcsOpCodes.ACS_ASS_A_SIDE:
command_mode(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.A_SIDE,
tc_queue=tc_queue,
q=q,
info="Switching to ACS board assembly A side",
)
if op_code in AcsOpCodes.ACS_ASS_B_SIDE:
@ -43,7 +44,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
object_id=ACS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.B_SIDE,
tc_queue=tc_queue,
q=q,
info="Switching to ACS board assembly B side",
)
if op_code in AcsOpCodes.ACS_ASS_DUAL_MODE:
@ -51,7 +52,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
object_id=ACS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.DUAL_SIDE,
tc_queue=tc_queue,
q=q,
info="Switching to ACS board assembly dual mode",
)
if op_code in AcsOpCodes.ACS_ASS_A_ON:
@ -59,7 +60,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
object_id=ACS_BOARD_ASS_ID,
mode=Modes.ON,
submode=DualSideSubmodes.A_SIDE,
tc_queue=tc_queue,
q=q,
info="Switching ACS board assembly A side on",
)
if op_code in AcsOpCodes.ACS_ASS_B_ON:
@ -67,7 +68,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
object_id=ACS_BOARD_ASS_ID,
mode=Modes.ON,
submode=DualSideSubmodes.B_SIDE,
tc_queue=tc_queue,
q=q,
info="Switching ACS board assembly B side on",
)
if op_code in AcsOpCodes.ACS_ASS_DUAL_ON:
@ -75,7 +76,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
object_id=ACS_BOARD_ASS_ID,
mode=Modes.ON,
submode=DualSideSubmodes.B_SIDE,
tc_queue=tc_queue,
q=q,
info="Switching ACS board assembly dual side on",
)
if op_code in AcsOpCodes.ACS_ASS_OFF:
@ -83,18 +84,18 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
object_id=ACS_BOARD_ASS_ID,
mode=Modes.OFF,
submode=0,
tc_queue=tc_queue,
q=q,
info="Switching to ACS board assembly off",
)
def pack_sus_cmds(tc_queue: TcQueueT, op_code: str):
def pack_sus_cmds(q: QueueHelper, op_code: str):
if op_code in SusOpCodes.SUS_ASS_NOM_SIDE:
command_mode(
object_id=SUS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.A_SIDE,
tc_queue=tc_queue,
q=q,
info="Switching to SUS board to nominal side",
)
if op_code in SusOpCodes.SUS_ASS_RED_SIDE:
@ -102,7 +103,7 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str):
object_id=SUS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.B_SIDE,
tc_queue=tc_queue,
q=q,
info="Switching to SUS board to redundant side",
)
if op_code in SusOpCodes.SUS_ASS_OFF:
@ -110,7 +111,7 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str):
object_id=SUS_BOARD_ASS_ID,
mode=Modes.OFF,
submode=0,
tc_queue=tc_queue,
q=q,
info="Switching SUS board off",
)
if op_code in SusOpCodes.SUS_ASS_DUAL_MODE:
@ -118,6 +119,6 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str):
object_id=SUS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.DUAL_SIDE,
tc_queue=tc_queue,
q=q,
info="Switching to SUS board to dual side",
)

View File

@ -1,7 +1,7 @@
from typing import Union
from tmtccmd.tc.definitions import TcQueueT, QueueCommands
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices
@ -9,16 +9,17 @@ def command_mode(
object_id: bytes,
mode: Union[int, Modes],
submode: int,
tc_queue: TcQueueT,
q: QueueHelper,
info: str,
):
tc_queue.appendleft((QueueCommands.PRINT, info))
q.add_log_cmd(info)
mode_data = pack_mode_data(
object_id=object_id,
mode=mode,
submode=submode,
)
cmd = PusTelecommand(
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=mode_data
q.add_pus_tc(
PusTelecommand(
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=mode_data
)
)
tc_queue.appendleft(cmd.pack_command_tuple())

View File

@ -1,5 +1,4 @@
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.config import QueueCommands
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes
from tmtccmd.utility import ObjectId
@ -19,7 +18,7 @@ class Info:
CORE_CONTROLLER = "ACS controller"
def pack_cmd_ctrl_to_prompted_mode(tc_queue: TcQueueT, object_id: ObjectId):
def pack_cmd_ctrl_to_prompted_mode(q: QueueHelper, object_id: ObjectId):
parameters = prompt_parameters(
[
{"name": "Mode", "defaultValue": "2"},
@ -35,37 +34,37 @@ def pack_cmd_ctrl_to_prompted_mode(tc_queue: TcQueueT, object_id: ObjectId):
object_id=object_id.as_bytes,
mode=mode,
submode=submode,
tc_queue=tc_queue,
q=q,
info=f"Commanding {object_id} to {mode}, {submode}",
)
def pack_cmd_ctrl_to_off(tc_queue: TcQueueT, object_id: ObjectId):
def pack_cmd_ctrl_to_off(q: QueueHelper, object_id: ObjectId):
command_mode(
object_id=object_id.as_bytes,
mode=Modes.OFF,
submode=0,
tc_queue=tc_queue,
q=q,
info=f"Commanding {object_id} OFF",
)
def pack_cmd_ctrl_to_on(tc_queue: TcQueueT, object_id: ObjectId):
def pack_cmd_ctrl_to_on(q: QueueHelper, object_id: ObjectId):
command_mode(
object_id=object_id.as_bytes,
mode=Modes.ON,
submode=0,
tc_queue=tc_queue,
q=q,
info=f"Commanding {object_id} ON",
)
def pack_cmd_ctrl_to_nml(tc_queue: TcQueueT, object_id: ObjectId):
def pack_cmd_ctrl_to_nml(q: QueueHelper, object_id: ObjectId):
command_mode(
object_id=object_id.as_bytes,
mode=Modes.NORMAL,
submode=0,
tc_queue=tc_queue,
q=q,
info=f"Commanding {object_id} NORMAL",
)

View File

@ -4,6 +4,8 @@ from config.definitions import CustomServiceList
from tmtccmd.config import add_op_code_entry, add_service_op_code_entry
from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.logging import get_console_logger
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
@ -122,47 +124,48 @@ def add_core_controller_definitions(cmd_dict: ServiceOpCodeDictT):
)
def pack_core_commands(tc_queue: TcQueueT, op_code: str):
def pack_core_commands(q: QueueHelper, op_code: str):
if op_code in OpCodes.REBOOT_XSC:
reboot_self, chip_select, copy_select = determine_reboot_params()
perform_reboot_cmd(
tc_queue=tc_queue,
q=q,
reboot_self=reboot_self,
chip=chip_select,
copy=copy_select,
)
if op_code in OpCodes.REBOOT_FULL:
tc_queue.appendleft((QueueCommands.PRINT, f"Core Command: {Info.REBOOT_FULL}"))
cmd = generate_action_command(
object_id=CORE_CONTROLLER_ID, action_id=ActionIds.FULL_REBOOT
q.add_log_cmd(f"Core Command: {Info.REBOOT_FULL}")
q.add_pus_tc(
generate_action_command(
object_id=CORE_CONTROLLER_ID, action_id=ActionIds.FULL_REBOOT
)
)
tc_queue.appendleft(cmd.pack_command_tuple())
if op_code in OpCodes.XSC_REBOOT_SELF:
perform_reboot_cmd(tc_queue=tc_queue, reboot_self=True)
perform_reboot_cmd(q=q, reboot_self=True)
if op_code in OpCodes.XSC_REBOOT_0_0:
perform_reboot_cmd(
tc_queue=tc_queue, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM
q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM
)
if op_code in OpCodes.XSC_REBOOT_0_1:
perform_reboot_cmd(
tc_queue=tc_queue,
q=q,
reboot_self=False,
chip=Chip.CHIP_0,
copy=Copy.COPY_1_GOLD,
)
if op_code in OpCodes.XSC_REBOOT_1_0:
perform_reboot_cmd(
tc_queue=tc_queue, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM
q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM
)
if op_code in OpCodes.XSC_REBOOT_1_1:
perform_reboot_cmd(
tc_queue=tc_queue,
q=q,
reboot_self=False,
chip=Chip.CHIP_1,
copy=Copy.COPY_1_GOLD,
)
if op_code in OpCodes.DISABLE_REBOOT_FILE_HANDLING:
tc_queue.appendleft((QueueCommands.PRINT, "Disabling reboot file handling"))
q.add_log_cmd("Disabling reboot file handling")
app_data = bytearray([0])
generate_action_command(
object_id=CORE_CONTROLLER_ID,
@ -170,7 +173,7 @@ def pack_core_commands(tc_queue: TcQueueT, op_code: str):
app_data=app_data,
)
if op_code in OpCodes.ENABLE_REBOOT_FILE_HANDLING:
tc_queue.appendleft((QueueCommands.PRINT, "Enabling reboot file handling"))
q.add_log_cmd("Enabling reboot file handling")
app_data = bytearray([1])
generate_action_command(
object_id=CORE_CONTROLLER_ID,
@ -178,35 +181,34 @@ def pack_core_commands(tc_queue: TcQueueT, op_code: str):
app_data=app_data,
)
if op_code in OpCodes.RESET_ALL_REBOOT_COUNTERS:
tc_queue.appendleft((QueueCommands.PRINT, "Resetting all reboot counters"))
q.add_log_cmd("Resetting all reboot counters")
generate_action_command(
object_id=CORE_CONTROLLER_ID, action_id=ActionIds.RESET_ALL_REBOOT_COUNTERS
)
if op_code in OpCodes.RESET_REBOOT_COUNTER_00:
tc_queue.appendleft((QueueCommands.PRINT, "Resetting reboot counter 0 0"))
q.add_log_cmd("Resetting reboot counter 0 0")
generate_action_command(
object_id=CORE_CONTROLLER_ID, action_id=ActionIds.RESET_REBOOT_COUNTER_00
)
if op_code in OpCodes.RESET_REBOOT_COUNTER_01:
tc_queue.appendleft((QueueCommands.PRINT, "Resetting reboot counter 0 1"))
q.add_log_cmd("Resetting reboot counter 0 1")
generate_action_command(
object_id=CORE_CONTROLLER_ID, action_id=ActionIds.RESET_REBOOT_COUNTER_01
)
if op_code in OpCodes.RESET_REBOOT_COUNTER_10:
tc_queue.appendleft((QueueCommands.PRINT, "Resetting reboot counter 1 0"))
q.add_log_cmd("Resetting reboot counter 1 0")
generate_action_command(
object_id=CORE_CONTROLLER_ID, action_id=ActionIds.RESET_REBOOT_COUNTER_10
)
if op_code in OpCodes.RESET_REBOOT_COUNTER_11:
tc_queue.appendleft((QueueCommands.PRINT, "Resetting reboot counter 1 1"))
q.add_log_cmd("Resetting reboot counter 1 1")
generate_action_command(
object_id=CORE_CONTROLLER_ID, action_id=ActionIds.RESET_REBOOT_COUNTER_11
)
if op_code in OpCodes.GET_HK:
tc_queue.appendleft((QueueCommands.PRINT, "Requesting housekeeping set"))
q.add_log_cmd("Requesting housekeeping set")
sid = make_sid(object_id=CORE_CONTROLLER_ID, set_id=SetIds.HK)
command = generate_one_hk_command(sid, 201)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(generate_one_hk_command(sid))
def determine_reboot_params() -> (bool, Chip, Copy):
@ -241,31 +243,25 @@ def determine_reboot_params() -> (bool, Chip, Copy):
def perform_reboot_cmd(
tc_queue: TcQueueT,
q: QueueHelper,
reboot_self: bool,
chip: Chip = Chip.NONE,
copy: Copy = Copy.NONE,
):
tc_data = bytearray()
if reboot_self:
tc_queue.appendleft(
(QueueCommands.PRINT, "Packing reboot command for current image")
)
q.add_log_cmd("Packing reboot command for current image")
tc_data.append(True)
else:
tc_data.append(False)
tc_data.append(chip)
tc_data.append(copy)
tc_queue.append(
(
QueueCommands.PRINT,
f"Packing reboot command for chip {chip} and copy {copy}",
)
q.add_log_cmd(f"Packing reboot command for chip {chip} and copy {copy}")
q.add_pus_tc(
generate_action_command(
object_id=CORE_CONTROLLER_ID,
action_id=ActionIds.XSC_REBOOT,
app_data=tc_data,
ssc=0,
)
action_cmd = generate_action_command(
object_id=CORE_CONTROLLER_ID,
action_id=ActionIds.XSC_REBOOT,
app_data=tc_data,
ssc=0,
)
tc_queue.appendleft(action_cmd.pack_command_tuple())

View File

@ -1,19 +1,14 @@
from __future__ import annotations
import struct
import time
from datetime import timedelta
from typing import List
from config.definitions import CustomServiceList
from config.object_ids import get_object_ids
from pus_tc.system.tcs import pack_tcs_sys_commands
from tmtccmd.config import (
QueueCommands,
ServiceOpCodeDictT,
add_op_code_entry,
add_service_op_code_entry,
)
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_11_tc_sched import (
generate_time_tagged_cmd,
generate_enable_tc_sched_cmd,
@ -126,10 +121,8 @@ class GenericHkListeningCfg:
return GenericHkListeningCfg(False, False, False)
def generic_print(tc_queue: TcQueueT, info: dict):
tc_queue.appendleft(
(QueueCommands.PRINT, f"Executing {info[1]} Procedure (OpCodes: {info[0]})")
)
def generic_print(q: QueueHelper, info: dict):
q.add_log_cmd(f"Executing {info[1]} Procedure (OpCodes: {info[0]})")
def add_proc_cmds(cmd_dict: ServiceOpCodeDictT):
@ -147,7 +140,7 @@ def add_proc_cmds(cmd_dict: ServiceOpCodeDictT):
def pack_generic_hk_listening_cmds(
tc_queue: TcQueueT,
q: QueueHelper,
proc_key: str,
sid_list: list[bytearray],
diag_list: list[bool],
@ -155,83 +148,77 @@ def pack_generic_hk_listening_cmds(
):
info = PROC_INFO_DICT[proc_key]
collection_time = info[2]
generic_print(tc_queue=tc_queue, info=info)
generic_print(q=q, info=info)
for i in range(len(sid_list)):
enable_listen_to_hk_for_x_seconds(
diag=diag_list[i],
tc_queue=tc_queue,
q=q,
device=proc_key,
sid=sid_list[i],
interval_seconds=info[3],
)
if not cfg.use_tc_sched:
tc_queue.appendleft((QueueCommands.WAIT, 2.0))
q.add_wait_seconds(2.0)
if cfg.mgt:
activate_mgts_alternately(
tc_queue=tc_queue,
)
activate_mgts_alternately(q)
elif cfg.one_rw:
activate_all_rws_in_sequence(
tc_queue=tc_queue, test_speed=20000, test_ramp_time=10000, init_ssc=0
q=q, test_speed=20000, test_ramp_time=10000, init_ssc=0
)
elif cfg.two_rws:
activate_all_rws_two_consecutively(tc_queue=tc_queue, init_ssc=0)
activate_all_rws_two_consecutively(q=q)
else:
pass
if not cfg.use_tc_sched:
tc_queue.appendleft((QueueCommands.WAIT, collection_time))
q.add_wait_seconds(collection_time)
disable_cmd_list = []
for i in range(len(sid_list)):
disable_cmd_list.append(
gen_disable_listen_to_hk_for_x_seconds(
diag=diag_list[i],
tc_queue=tc_queue,
q=q,
device=proc_key,
sid=sid_list[i],
)
)
if cfg.one_rw or cfg.two_rws:
activate_all_rws_in_sequence(
tc_queue=tc_queue, test_speed=0, test_ramp_time=5000, init_ssc=0
)
tc_queue.appendleft((QueueCommands.WAIT, 60))
activate_all_rws_in_sequence(q=q, test_speed=0, test_ramp_time=5000, init_ssc=0)
q.add_wait_seconds(60.0)
current_time = time.time()
current_time += collection_time
if not cfg.use_tc_sched:
for cmd in disable_cmd_list:
tc_queue.appendleft(cmd.pack_command_tuple())
q.add_pus_tc(cmd)
else:
for cmd in disable_cmd_list:
tc_queue.appendleft(
q.add_pus_tc(
generate_time_tagged_cmd(
release_time=struct.pack("!I", int(current_time)),
tc_to_insert=cmd,
ssc=0,
release_time=struct.pack("!I", int(current_time)), tc_to_insert=cmd
)
)
if not cfg.use_tc_sched:
tc_queue.appendleft((QueueCommands.WAIT, 60))
q.add_wait_seconds(60.0)
sid_list.clear()
diag_list.clear()
def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
def pack_proc_commands(q: QueueHelper, op_code: str):
sid_list = []
obj_id_dict = get_object_ids()
if op_code in OpCodes.RESET_SCHED:
tc_queue.appendleft((QueueCommands.PRINT, "Resetting/Clearing TC schedule"))
tc_queue.appendleft(generate_reset_tc_sched_cmd().pack_command_tuple())
q.add_log_cmd("Resetting/Clearing TC schedule")
q.add_pus_tc(generate_reset_tc_sched_cmd())
if op_code in OpCodes.BAT_FT:
key = KAI.BAT_FT[0]
sid_list.append(make_sid(oids.BPX_HANDLER_ID, BpxSetIds.GET_HK_SET))
diag_list = [False]
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
@ -243,7 +230,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
sid_list.append(make_sid(oids.CORE_CONTROLLER_ID, CoreSetIds.HK))
diag_list = [False]
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
@ -279,7 +266,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
set_id = pcdu_dev[1]
sid_list.append(make_sid(oid, set_id))
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
@ -291,7 +278,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
sid_list.append(make_sid(oids.RAD_SENSOR_ID, RadSetIds.HK))
diag_list = [False]
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
@ -300,20 +287,14 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
if op_code in OpCodes.TV_SETUP_TCS_FT_ON:
# Enable scheduling
tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=22).pack_command_tuple())
q.add_pus_tc(generate_enable_tc_sched_cmd())
# check whether tcs_assembly also has to be commanded to NORMAL Mode
pack_tcs_sys_commands(
tc_queue=tc_queue, op_code=TcsOpCodes.TCS_BOARD_ASS_NORMAL[0]
)
pack_cmd_ctrl_to_nml(
tc_queue=tc_queue, object_id=obj_id_dict.get(oids.THERMAL_CONTROLLER_ID)
)
pack_tcs_sys_commands(q=q, op_code=TcsOpCodes.TCS_BOARD_ASS_NORMAL[0])
pack_cmd_ctrl_to_nml(q=q, object_id=obj_id_dict.get(oids.THERMAL_CONTROLLER_ID))
if op_code in OpCodes.TV_TEARDOWN_TCS_FT_OFF:
# TCS board should always be on anyway, do not command it off here
pack_cmd_ctrl_to_off(
tc_queue=tc_queue, object_id=obj_id_dict.get(oids.THERMAL_CONTROLLER_ID)
)
pack_cmd_ctrl_to_off(q=q, object_id=obj_id_dict.get(oids.THERMAL_CONTROLLER_ID))
if op_code in OpCodes.ACS_FT:
key = KAI.ACS_FT[0]
@ -333,23 +314,23 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
]
d_side_pairs = a_side_pairs + b_side_pairs
diag_list = [False, False, True, False, False]
pack_acs_command(tc_queue=tc_queue, op_code="acs-a")
pack_acs_command(q=q, op_code="acs-a")
for a_side_dev in a_side_pairs:
oid = a_side_dev[0]
set_id = a_side_dev[1]
sid_list.append(make_sid(oid, set_id))
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
cfg=GenericHkListeningCfg.default(),
)
pack_acs_command(tc_queue=tc_queue, op_code="acs-off")
tc_queue.appendleft((QueueCommands.WAIT, 5.0))
pack_acs_command(tc_queue=tc_queue, op_code="acs-b")
pack_acs_command(q=q, op_code="acs-off")
q.add_wait_seconds(5.0)
pack_acs_command(q=q, op_code="acs-b")
sid_list.clear()
diag_list = [False, False, True, False, False]
@ -359,16 +340,16 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
set_id = b_side_dev[1]
sid_list.append(make_sid(oid, set_id))
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
cfg=GenericHkListeningCfg.default(),
)
pack_acs_command(tc_queue=tc_queue, op_code="acs-off")
tc_queue.appendleft((QueueCommands.WAIT, 5.0))
pack_acs_command(tc_queue=tc_queue, op_code="acs-d")
pack_acs_command(q=q, op_code="acs-off")
q.add_wait_seconds(5.0)
pack_acs_command(q=q, op_code="acs-d")
sid_list.clear()
@ -389,14 +370,14 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
False,
]
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
cfg=GenericHkListeningCfg.default(),
)
pack_acs_command(tc_queue=tc_queue, op_code="acs-off")
pack_acs_command(q=q, op_code="acs-off")
if op_code in OpCodes.MGT_FT:
key = KAI.MGT_FT[0]
@ -412,24 +393,24 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
]
# Command MGT to mode on
pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=tc_queue, op_code="1")
tc_queue.appendleft((QueueCommands.WAIT, 5))
pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=q, op_code="1")
q.add_wait_seconds(5.0)
# Command MGT to normal mode
pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=tc_queue, op_code="2")
pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=q, op_code="2")
for imtq_dev in imtq_pairs:
oid = imtq_dev[0]
set_id = imtq_dev[1]
sid_list.append(make_sid(oid, set_id))
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
cfg=GenericHkListeningCfg.default(),
)
pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=tc_queue, op_code="0")
pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=q, op_code="0")
if op_code in OpCodes.MGT_FT_DP:
key = KAI.MGT_FT_DP[0]
@ -468,12 +449,12 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
True,
True,
]
pack_acs_command(tc_queue=tc_queue, op_code="acs-d")
pack_acs_command(q=q, op_code="acs-d")
# Command MGT to mode on
pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=tc_queue, op_code="1")
tc_queue.appendleft((QueueCommands.WAIT, 20))
pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=q, op_code="1")
q.add_wait_seconds(20.0)
# Command MGT to normal mode
pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=tc_queue, op_code="2")
pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=q, op_code="2")
for d_side_and_imtq_dev in d_side_and_imtq_pairs:
oid = d_side_and_imtq_dev[0]
@ -483,20 +464,20 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
cfg = GenericHkListeningCfg.default()
cfg.mgt = True
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
cfg=cfg,
)
pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=tc_queue, op_code="0")
pack_acs_command(tc_queue=tc_queue, op_code="acs-off")
pack_imtq_test_into(oids.IMTQ_HANDLER_ID, q=q, op_code="0")
pack_acs_command(q=q, op_code="acs-off")
if op_code in OpCodes.SUS_FT:
key = KAI.SUS_FT[0]
pack_sus_cmds(tc_queue=tc_queue, op_code="sus-nom")
pack_sus_cmds(q=q, op_code="sus-nom")
sus_n_ids = [
oids.SUS_0_N_LOC_XFYFZM_PT_XF,
@ -527,16 +508,16 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
for nom_sus in sus_n_ids:
sid_list.append(make_sid(nom_sus, SetIds.HK))
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
cfg=GenericHkListeningCfg.default(),
)
pack_acs_command(tc_queue=tc_queue, op_code="sus-off")
tc_queue.appendleft((QueueCommands.WAIT, 5.0))
pack_sus_cmds(tc_queue=tc_queue, op_code="sus-red")
pack_acs_command(q=q, op_code="sus-off")
q.add_wait_seconds(5.0)
pack_sus_cmds(q=q, op_code="sus-red")
diag_list = [
True,
@ -551,16 +532,16 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
for red_sus in sus_r_ids:
sid_list.append(make_sid(red_sus, SetIds.HK))
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
cfg=GenericHkListeningCfg.default(),
)
pack_acs_command(tc_queue=tc_queue, op_code="sus-off")
tc_queue.appendleft((QueueCommands.WAIT, 5.0))
pack_sus_cmds(tc_queue=tc_queue, op_code="sus-d")
pack_acs_command(q=q, op_code="sus-off")
q.add_wait_seconds(5.0)
pack_sus_cmds(q=q, op_code="sus-d")
# SUSs
for nom_sus in sus_n_ids:
@ -582,36 +563,32 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
True,
]
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
cfg=GenericHkListeningCfg.default(),
)
pack_acs_command(tc_queue=tc_queue, op_code="sus-off")
pack_acs_command(q=q, op_code="sus-off")
if op_code in OpCodes.STR_FT:
key = KAI.STR_FT[0]
pack_star_tracker_commands(
object_id=oids.STAR_TRACKER_ID, tc_queue=tc_queue, op_code="2"
)
pack_star_tracker_commands(object_id=oids.STAR_TRACKER_ID, q=q, op_code="2")
# STR
sid_list.append(make_sid(oids.STAR_TRACKER_ID, StrSetIds.TEMPERATURE))
diag_list = [False]
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
cfg=GenericHkListeningCfg.default(),
)
pack_star_tracker_commands(
object_id=oids.STAR_TRACKER_ID, tc_queue=tc_queue, op_code="3"
)
pack_star_tracker_commands(object_id=oids.STAR_TRACKER_ID, q=q, op_code="3")
if op_code in OpCodes.RW_FT_ONE_RW:
key = KAI.RW_FT_ONE_RW[0]
@ -644,7 +621,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
False,
]
# RW NORMAL
pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, tc_queue=tc_queue, op_code="nml")
pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, q=q, op_code="nml")
# RW HK für alle RWs nur einzeln
for rw_dev in rw_pairs:
@ -652,14 +629,14 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
set_id = rw_dev[1]
sid_list.append(make_sid(oid, set_id))
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
cfg=GenericHkListeningCfg(mgt=False, one_rw=True, two_rws=False),
)
# RW OFF
pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, tc_queue=tc_queue, op_code="off")
pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, q=q, op_code="off")
# ass command with 2 rws to speed
if op_code in OpCodes.RW_FT_TWO_RWS:
@ -701,7 +678,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
False,
]
# RW NORMAL
pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, tc_queue=tc_queue, op_code="nml")
pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, q=q, op_code="nml")
# RW
for rw_dev in rw_pairs:
@ -709,190 +686,162 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
set_id = rw_dev[1]
sid_list.append(make_sid(oid, set_id))
pack_generic_hk_listening_cmds(
tc_queue=tc_queue,
q=q,
proc_key=key,
sid_list=sid_list,
diag_list=diag_list,
cfg=GenericHkListeningCfg(mgt=False, one_rw=False, two_rws=True),
)
# RW OFF
pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, tc_queue=tc_queue, op_code="off")
pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, q=q, op_code="off")
def enable_listen_to_hk_for_x_seconds(
tc_queue: TcQueueT,
q: QueueHelper,
diag: bool,
device: str,
sid: bytes,
interval_seconds: float,
):
tc_queue.appendleft((QueueCommands.PRINT, f"Enabling periodic HK for {device}"))
q.add_log_cmd(f"Enabling periodic HK for {device}")
cmd_tuple = enable_periodic_hk_command_with_interval(
diag=diag, sid=sid, interval_seconds=interval_seconds, ssc=0
)
for cmd in cmd_tuple:
tc_queue.appendleft(cmd.pack_command_tuple())
q.add_pus_tc(cmd)
def gen_disable_listen_to_hk_for_x_seconds(
tc_queue: TcQueueT,
q: QueueHelper,
diag: bool,
device: str,
sid: bytes,
) -> PusTelecommand:
tc_queue.appendleft((QueueCommands.PRINT, f"Disabling periodic HK for {device}"))
return disable_periodic_hk_command(diag=diag, sid=sid, ssc=0)
q.add_log_cmd(f"Disabling periodic HK for {device}")
return disable_periodic_hk_command(diag=diag, sid=sid)
def activate_mgts_alternately(
tc_queue: TcQueueT,
q: QueueHelper,
):
command = pack_dipole_command(
object_id=oids.IMTQ_HANDLER_ID,
x_dipole=2000,
y_dipole=0,
z_dipole=0,
duration=30000,
q.add_pus_tc(
pack_dipole_command(
object_id=oids.IMTQ_HANDLER_ID,
x_dipole=2000,
y_dipole=0,
z_dipole=0,
duration=30000,
)
)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 40.0))
q.add_wait_seconds(40.0)
command = pack_dipole_command(
object_id=oids.IMTQ_HANDLER_ID,
x_dipole=-2000,
y_dipole=0,
z_dipole=0,
duration=30000,
q.add_pus_tc(
pack_dipole_command(
object_id=oids.IMTQ_HANDLER_ID,
x_dipole=-2000,
y_dipole=0,
z_dipole=0,
duration=30000,
)
)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 40.0))
q.add_wait_seconds(40.0)
command = pack_dipole_command(
object_id=oids.IMTQ_HANDLER_ID,
x_dipole=0,
y_dipole=2000,
z_dipole=0,
duration=30000,
q.add_pus_tc(
pack_dipole_command(
object_id=oids.IMTQ_HANDLER_ID,
x_dipole=0,
y_dipole=2000,
z_dipole=0,
duration=30000,
)
)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 40.0))
q.add_wait_seconds(40.0)
command = pack_dipole_command(
object_id=oids.IMTQ_HANDLER_ID,
x_dipole=0,
y_dipole=-2000,
z_dipole=0,
duration=30000,
q.add_pus_tc(
pack_dipole_command(
object_id=oids.IMTQ_HANDLER_ID,
x_dipole=0,
y_dipole=-2000,
z_dipole=0,
duration=30000,
)
)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 40.0))
q.add_wait_seconds(40.0)
command = pack_dipole_command(
object_id=oids.IMTQ_HANDLER_ID,
x_dipole=0,
y_dipole=0,
z_dipole=2000,
duration=30000,
q.add_pus_tc(
pack_dipole_command(
object_id=oids.IMTQ_HANDLER_ID,
x_dipole=0,
y_dipole=0,
z_dipole=2000,
duration=30000,
)
)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 40.0))
q.add_wait_seconds(40.0)
command = pack_dipole_command(
object_id=oids.IMTQ_HANDLER_ID,
x_dipole=0,
y_dipole=0,
z_dipole=-2000,
duration=30000,
q.add_pus_tc(
pack_dipole_command(
object_id=oids.IMTQ_HANDLER_ID,
x_dipole=0,
y_dipole=0,
z_dipole=-2000,
duration=30000,
)
)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 40.0))
q.add_wait_seconds(40.0)
def rw_speed_cmd_single(
tc_queue: TcQueueT, oid: bytes, init_ssc: int, speed: int, ramp_time: int
) -> int:
command = pack_set_speed_command(
object_id=oid, speed=speed, ramp_time_ms=ramp_time, ssc=init_ssc
def rw_speed_cmd_single(q: QueueHelper, oid: bytes, speed: int, ramp_time: int):
q.add_pus_tc(
pack_set_speed_command(object_id=oid, speed=speed, ramp_time_ms=ramp_time)
)
init_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 70.0))
command = pack_set_speed_command(
object_id=oid, speed=0, ramp_time_ms=ramp_time, ssc=init_ssc
)
tc_queue.appendleft(command.pack_command_tuple())
return init_ssc + 1
q.add_wait(timedelta(seconds=70.0))
q.add_pus_tc(pack_set_speed_command(object_id=oid, speed=0, ramp_time_ms=ramp_time))
def rw_speed_up_cmd_consec(
tc_queue: TcQueueT, obids: List[bytes], init_ssc: int, speed: int, ramp_time: int
) -> int:
q: QueueHelper, obids: List[bytes], speed: int, ramp_time: int
):
for oid in obids:
command = pack_set_speed_command(
object_id=oid, speed=speed, ramp_time_ms=ramp_time, ssc=init_ssc
q.add_pus_tc(
pack_set_speed_command(object_id=oid, speed=speed, ramp_time_ms=ramp_time)
)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
return init_ssc
def rw_speed_down_cmd_consec(
tc_queue: TcQueueT, obids: List[bytes], init_ssc: int, ramp_time: int
) -> int:
def rw_speed_down_cmd_consec(q: QueueHelper, obids: List[bytes], ramp_time: int):
for oid in obids:
command = pack_set_speed_command(
object_id=oid, speed=0, ramp_time_ms=ramp_time, ssc=init_ssc
q.add_pus_tc(
pack_set_speed_command(object_id=oid, speed=0, ramp_time_ms=ramp_time)
)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
return init_ssc
def activate_all_rws_in_sequence(
tc_queue: TcQueueT, init_ssc: int, test_speed: int, test_ramp_time: int
) -> int:
q: QueueHelper, init_ssc: int, test_speed: int, test_ramp_time: int
):
new_ssc = init_ssc
# RW1 speed cmd
tc_queue.appendleft((QueueCommands.WAIT, 58))
new_ssc = rw_speed_cmd_single(
tc_queue, oids.RW1_ID, new_ssc, test_speed, test_ramp_time
)
tc_queue.appendleft((QueueCommands.WAIT, 30))
new_ssc = rw_speed_cmd_single(
tc_queue, oids.RW2_ID, new_ssc, test_speed, test_ramp_time
)
tc_queue.appendleft((QueueCommands.WAIT, 30))
new_ssc = rw_speed_cmd_single(
tc_queue, oids.RW3_ID, new_ssc, test_speed, test_ramp_time
)
tc_queue.appendleft((QueueCommands.WAIT, 30))
new_ssc = rw_speed_cmd_single(
tc_queue, oids.RW4_ID, new_ssc, test_speed, test_ramp_time
)
tc_queue.appendleft((QueueCommands.WAIT, 30))
q.add_wait(timedelta(seconds=58.0))
rw_speed_cmd_single(q, oids.RW1_ID, test_speed, test_ramp_time)
q.add_wait_seconds(30.0)
rw_speed_cmd_single(q, oids.RW2_ID, test_speed, test_ramp_time)
q.add_wait_seconds(30.0)
rw_speed_cmd_single(q, oids.RW3_ID, test_speed, test_ramp_time)
q.add_wait_seconds(30.0)
rw_speed_cmd_single(q, oids.RW4_ID, test_speed, test_ramp_time)
q.add_wait_seconds(30.0)
return new_ssc
def activate_all_rws_two_consecutively(tc_queue: TcQueueT, init_ssc: int) -> int:
new_ssc = init_ssc
def activate_all_rws_two_consecutively(q: QueueHelper):
# RW1+3 speed cmd
tc_queue.appendleft((QueueCommands.WAIT, 5.0))
new_ssc = rw_speed_up_cmd_consec(
tc_queue, [oids.RW1_ID, oids.RW3_ID], new_ssc, -20000, 10000
)
tc_queue.appendleft((QueueCommands.WAIT, 70.0))
new_ssc = rw_speed_down_cmd_consec(
tc_queue, [oids.RW1_ID, oids.RW3_ID], new_ssc, 10000
)
tc_queue.appendleft((QueueCommands.WAIT, 15.0))
q.add_wait_seconds(5.0)
rw_speed_up_cmd_consec(q, [oids.RW1_ID, oids.RW3_ID], -20000, 10000)
q.add_wait_seconds(70.0)
rw_speed_down_cmd_consec(q, [oids.RW1_ID, oids.RW3_ID], 10000)
q.add_wait_seconds(15.0)
# RW2+4 speed cmd
new_ssc = rw_speed_up_cmd_consec(
tc_queue, [oids.RW2_ID, oids.RW4_ID], new_ssc, -20000, 10000
)
tc_queue.appendleft((QueueCommands.WAIT, 70.0))
new_ssc = rw_speed_down_cmd_consec(
tc_queue, [oids.RW2_ID, oids.RW4_ID], new_ssc, 10000
)
tc_queue.appendleft((QueueCommands.WAIT, 15.0))
return new_ssc
rw_speed_up_cmd_consec(q, [oids.RW2_ID, oids.RW4_ID], -20000, 10000)
q.add_wait_seconds(70.0)
rw_speed_down_cmd_consec(q, [oids.RW2_ID, oids.RW4_ID], 10000)
q.add_wait_seconds(15.0)

View File

@ -1,4 +1,4 @@
from tmtccmd.tc.definitions import TcQueueT, QueueCommands
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes
from .common import command_mode
@ -15,13 +15,13 @@ class Info:
TCS_BOARD_ASS_OFF = "Switching TCS board assembly off"
def pack_tcs_sys_commands(tc_queue: TcQueueT, op_code: str):
def pack_tcs_sys_commands(q: QueueHelper, op_code: str):
if op_code in OpCodes.TCS_BOARD_ASS_NORMAL:
command_mode(
object_id=TCS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=0,
tc_queue=tc_queue,
q=q,
info=Info.TCS_BOARD_ASS_NORMAL,
)
if op_code in OpCodes.TCS_BOARD_ASS_OFF:
@ -29,6 +29,6 @@ def pack_tcs_sys_commands(tc_queue: TcQueueT, op_code: str):
object_id=TCS_BOARD_ASS_ID,
mode=Modes.OFF,
submode=0,
tc_queue=tc_queue,
q=q,
info=Info.TCS_BOARD_ASS_OFF,
)

View File

@ -1,10 +1,9 @@
from datetime import datetime
from spacepackets.ecss import PusTelecommand
from tmtccmd.config import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.logging import get_console_logger
from tmtccmd.tc import QueueHelper
LOGGER = get_console_logger()
@ -17,12 +16,9 @@ class Info:
SET_CURRENT_TIME = "Setting current time in ASCII format"
def pack_set_current_time_ascii_command(tc_queue: TcQueueT, ssc: int):
def pack_set_current_time_ascii_command(q: QueueHelper):
time_test_current_time = datetime.utcnow().isoformat() + "Z" + "\0"
current_time_ascii = time_test_current_time.encode("ascii")
LOGGER.info(f"Current time in ASCII format: {current_time_ascii}")
tc_queue.appendleft((QueueCommands.PRINT, Info.SET_CURRENT_TIME))
command = PusTelecommand(
service=9, subservice=128, ssc=ssc, app_data=current_time_ascii
)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd(Info.SET_CURRENT_TIME)
q.add_pus_tc(PusTelecommand(service=9, subservice=128, app_data=current_time_ascii))

View File

@ -7,14 +7,22 @@ from tmtccmd.config.definitions import CoreModeList
from spacepackets.ecss import PusVerificator
from tmtccmd import get_console_logger, TcHandlerBase
from tmtccmd.com_if import ComInterface
from tmtccmd.config.globals import update_global, CoreGlobalIds
from deps.tmtccmd.tmtccmd.logging.pus import RawTmtcTimedLogWrapper
from deps.tmtccmd.tmtccmd.pus import VerificationWrapper
from deps.tmtccmd.tmtccmd.tm import SpecificApidHandlerBase, GenericApidHandlerBase
from deps.tmtccmd.tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.logging import get_current_time_string
from tmtccmd.pus import FileSeqCountProvider
from tmtccmd.tc import ProcedureHelper, FeedWrapper, TcProcedureType
from tmtccmd.tc import (
ProcedureHelper,
FeedWrapper,
TcProcedureType,
QueueEntryHelper,
TcQueueEntryType,
)
try:
import spacepackets
@ -48,8 +56,7 @@ from config import __version__
from config.definitions import PUS_APID
from config.hook_implementations import EiveHookObject
from pus_tm.factory_hook import pus_factory_hook
from pus_tc.procedure_packer import pre_tc_send_cb
from pus_tc.procedure_packer import pre_tc_send_cb, handle_default_procedure
LOGGER = get_console_logger()
@ -93,6 +100,29 @@ class TcHandler(TcHandlerBase):
if info.proc_type == TcProcedureType.DEFAULT:
handle_default_procedure(info.to_def_procedure(), wrapper)
def send_cb(self, entry_helper: QueueEntryHelper, com_if: ComInterface):
if entry_helper.is_tc:
if entry_helper.entry_type == TcQueueEntryType.PUS_TC:
pus_tc_wrapper = entry_helper.to_pus_tc_entry()
pus_tc_wrapper.pus_tc.seq_count = (
self.seq_count_provider.next_seq_count()
)
pus_tc_wrapper.pus_tc.apid = PUS_APID
# Add TC after Sequence Count stamping
self.pus_verificator.add_tc(pus_tc_wrapper.pus_tc)
raw_tc = pus_tc_wrapper.pus_tc.pack()
self.raw_logger.log_tc(pus_tc_wrapper.pus_tc)
tc_info_string = f"Sent {pus_tc_wrapper.pus_tc}"
LOGGER.info(tc_info_string)
self.file_logger.info(
f"{get_current_time_string(True)}: {tc_info_string}"
)
com_if.send(raw_tc)
elif entry_helper.entry_type == TcQueueEntryType.LOG:
log_entry = entry_helper.to_log_entry()
LOGGER.info(log_entry.log_str)
self.file_logger.info(log_entry.log_str)
def tmtcc_pre_args() -> EiveHookObject:
print(f"-- eive tmtc v{__version__} --")