Merge pull request 'meier/ccsds' (#28) from meier/ccsds into develop
Reviewed-on: #28
This commit is contained in:
commit
1d374230b3
@ -36,3 +36,5 @@ class CustomServiceList(enum.Enum):
|
|||||||
PLOC_MEMORY_DUMPER = "ploc_memory_dumper"
|
PLOC_MEMORY_DUMPER = "ploc_memory_dumper"
|
||||||
CORE = 'core'
|
CORE = 'core'
|
||||||
STAR_TRACKER = 'star_tracker'
|
STAR_TRACKER = 'star_tracker'
|
||||||
|
CCSDS_HANDLER = 'ccsds_handler'
|
||||||
|
PDEC_HANDLER = 'pdec_handler'
|
||||||
|
@ -282,6 +282,19 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
|
|||||||
}
|
}
|
||||||
service_star_tracker_tuple = ("Star tracker", op_code_dict_srv_star_tracker)
|
service_star_tracker_tuple = ("Star tracker", op_code_dict_srv_star_tracker)
|
||||||
|
|
||||||
|
op_code_dict_srv_ccsds_handler = {
|
||||||
|
"0": ("CCSDS Handler: Set low rate", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
"1": ("CCSDS Handler: Set high rate", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
"2": ("CCSDS Handler: Enable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
"3": ("CCSDS Handler: Disable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
}
|
||||||
|
service_ccsds_handler_tuple = ("CCSDS Handler", op_code_dict_srv_ccsds_handler)
|
||||||
|
|
||||||
|
op_code_dict_srv_pdec_handler = {
|
||||||
|
"0": ("PDEC Handler: Print CLCW", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
}
|
||||||
|
service_pdec_handler_tuple = ("PDEC Handler", op_code_dict_srv_pdec_handler)
|
||||||
|
|
||||||
service_op_code_dict[CustomServiceList.P60DOCK.value] = service_p60_tuple
|
service_op_code_dict[CustomServiceList.P60DOCK.value] = service_p60_tuple
|
||||||
service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu1_tuple
|
service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu1_tuple
|
||||||
service_op_code_dict[CustomServiceList.PDU2.value] = service_pdu2_tuple
|
service_op_code_dict[CustomServiceList.PDU2.value] = service_pdu2_tuple
|
||||||
@ -295,5 +308,6 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
|
|||||||
service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple
|
service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple
|
||||||
service_op_code_dict[CustomServiceList.PLOC_UPDATER.value] = service_ploc_updater_tuple
|
service_op_code_dict[CustomServiceList.PLOC_UPDATER.value] = service_ploc_updater_tuple
|
||||||
service_op_code_dict[CustomServiceList.STAR_TRACKER.value] = service_star_tracker_tuple
|
service_op_code_dict[CustomServiceList.STAR_TRACKER.value] = service_star_tracker_tuple
|
||||||
service_op_code_dict[CustomServiceList.PLOC_MEMORY_DUMPER.value] = \
|
service_op_code_dict[CustomServiceList.PLOC_MEMORY_DUMPER.value] = service_ploc_memory_dumper_tuple
|
||||||
service_ploc_memory_dumper_tuple
|
service_op_code_dict[CustomServiceList.CCSDS_HANDLER.value] = service_ccsds_handler_tuple
|
||||||
|
service_op_code_dict[CustomServiceList.PDEC_HANDLER.value] = service_pdec_handler_tuple
|
||||||
|
@ -44,6 +44,8 @@ IMTQ_HANDLER_ID = bytes([0x44, 0x14, 0x00, 0x14])
|
|||||||
# Misc Object IDs
|
# Misc Object IDs
|
||||||
PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17])
|
PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17])
|
||||||
TEST_DEVICE_ID = bytes([0x54, 0x00, 0xAF, 0xFE])
|
TEST_DEVICE_ID = bytes([0x54, 0x00, 0xAF, 0xFE])
|
||||||
|
CCSDS_HANDLER_ID = bytes([0x50, 0x00, 0x08, 0x00])
|
||||||
|
PDEC_HANDLER_ID = bytes([0x50, 0x00, 0x07, 0x04])
|
||||||
|
|
||||||
# Payload Object IDs
|
# Payload Object IDs
|
||||||
STAR_TRACKER_ID = bytes([0x44, 0x13, 0x00, 0x1])
|
STAR_TRACKER_ID = bytes([0x44, 0x13, 0x00, 0x1])
|
||||||
@ -76,5 +78,6 @@ def get_object_ids() -> Dict[bytes, list]:
|
|||||||
RAD_SENSOR_ID: "Radiation Sensor",
|
RAD_SENSOR_ID: "Radiation Sensor",
|
||||||
PLOC_SUPV_ID: "PLOC Supervisor",
|
PLOC_SUPV_ID: "PLOC Supervisor",
|
||||||
CORE_CONTROLLER_ID: "Core Controller",
|
CORE_CONTROLLER_ID: "Core Controller",
|
||||||
|
CORE_CONTROLLER_ID: "CCSDS Handler",
|
||||||
})
|
})
|
||||||
return object_id_dict
|
return object_id_dict
|
||||||
|
54
pus_tc/ccsds_handler.py
Normal file
54
pus_tc/ccsds_handler.py
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""
|
||||||
|
@file ccsds_handler.py
|
||||||
|
@brief Test commanding of CCSDS Handler
|
||||||
|
@author J. Meier
|
||||||
|
@date 20.11.2021
|
||||||
|
"""
|
||||||
|
from tmtccmd.config.definitions import QueueCommands
|
||||||
|
from tmtccmd.tc.packer import TcQueueT
|
||||||
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
|
|
||||||
|
|
||||||
|
class CommandIds:
|
||||||
|
# Configures input rate of syrlinks to 400 Khz (results in downlink rate of 200 kbps)
|
||||||
|
SET_LOW_RATE = bytearray([0x0, 0x0, 0x0, 0x0])
|
||||||
|
# Configures input rate of syrlinks to 2000 Khz (results in downlink rate of 1000 kbps)
|
||||||
|
SET_HIGH_RATE = bytearray([0x0, 0x0, 0x0, 0x1])
|
||||||
|
# Enables the syrlinks transmitter (by using RS485 enables lines)
|
||||||
|
EN_TRANSMITTER = bytearray([0x0, 0x0, 0x0, 0x2])
|
||||||
|
# Disables the syrlinks transmitter (by using RS485 enables lines)
|
||||||
|
DIS_TRANSMITTER = bytearray([0x0, 0x0, 0x0, 0x3])
|
||||||
|
|
||||||
|
|
||||||
|
def pack_ccsds_handler_test(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT:
|
||||||
|
tc_queue.appendleft(
|
||||||
|
(QueueCommands.PRINT,
|
||||||
|
"Testing ccsds handler with object id: 0x" + object_id.hex())
|
||||||
|
)
|
||||||
|
|
||||||
|
if op_code == "0":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Set low rate"))
|
||||||
|
command = object_id + CommandIds.SET_LOW_RATE
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
|
||||||
|
if op_code == "1":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Set high rate"))
|
||||||
|
command = object_id + CommandIds.SET_HIGH_RATE
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
|
||||||
|
if op_code == "2":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Enables the transmitter"))
|
||||||
|
command = object_id + CommandIds.EN_TRANSMITTER
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
|
||||||
|
if op_code == "3":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Disables the transmitter"))
|
||||||
|
command = object_id + CommandIds.DIS_TRANSMITTER
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
|
||||||
|
return tc_queue
|
28
pus_tc/pdec_handler.py
Normal file
28
pus_tc/pdec_handler.py
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""
|
||||||
|
@file pdec_handler.py
|
||||||
|
@brief Test commanding of PDEC Handler
|
||||||
|
@author J. Meier
|
||||||
|
@date 22.11.2021
|
||||||
|
"""
|
||||||
|
from tmtccmd.config.definitions import QueueCommands
|
||||||
|
from tmtccmd.tc.packer import TcQueueT
|
||||||
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
|
|
||||||
|
|
||||||
|
class CommandIds:
|
||||||
|
# prints the clcw to the console. Useful for debugging
|
||||||
|
PRINT_CLCW = bytearray([0x0, 0x0, 0x0, 0x0])
|
||||||
|
|
||||||
|
|
||||||
|
def pack_pdec_handler_test(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT:
|
||||||
|
tc_queue.appendleft(
|
||||||
|
(QueueCommands.PRINT,
|
||||||
|
"Testing PDEC handler with object id: 0x" + object_id.hex())
|
||||||
|
)
|
||||||
|
|
||||||
|
if op_code == "0":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "PDEC Handler: Print CLCW"))
|
||||||
|
command = object_id + CommandIds.PRINT_CLCW
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
@ -24,6 +24,7 @@ from pus_tc.reaction_wheels import pack_single_rw_test_into
|
|||||||
from pus_tc.rad_sensor import pack_rad_sensor_test_into
|
from pus_tc.rad_sensor import pack_rad_sensor_test_into
|
||||||
from pus_tc.ploc_upater import pack_ploc_updater_test_into
|
from pus_tc.ploc_upater import pack_ploc_updater_test_into
|
||||||
from pus_tc.ploc_memory_dumper import pack_ploc_memory_dumper_cmd
|
from pus_tc.ploc_memory_dumper import pack_ploc_memory_dumper_cmd
|
||||||
|
from pus_tc.ccsds_handler import pack_ccsds_handler_test
|
||||||
from pus_tc.core import pack_core_commands
|
from pus_tc.core import pack_core_commands
|
||||||
from pus_tc.star_tracker import pack_star_tracker_commands_into
|
from pus_tc.star_tracker import pack_star_tracker_commands_into
|
||||||
from pus_tc.gps import pack_gps_command
|
from pus_tc.gps import pack_gps_command
|
||||||
@ -32,7 +33,8 @@ from config.definitions import CustomServiceList
|
|||||||
from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, \
|
from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, \
|
||||||
ACU_HANDLER_ID, TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, \
|
ACU_HANDLER_ID, TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, \
|
||||||
PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, RAD_SENSOR_ID, PLOC_SUPV_ID, PLOC_UPDATER_ID, \
|
PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, RAD_SENSOR_ID, PLOC_SUPV_ID, PLOC_UPDATER_ID, \
|
||||||
STAR_TRACKER_ID, PLOC_MEMORY_DUMPER_ID, GPS_HANDLER_0_ID, GPS_HANDLER_1_ID
|
STAR_TRACKER_ID, PLOC_MEMORY_DUMPER_ID, GPS_HANDLER_0_ID, GPS_HANDLER_1_ID, CCSDS_HANDLER_ID, \
|
||||||
|
PDEC_HANDLER_ID
|
||||||
|
|
||||||
|
|
||||||
LOGGER = get_console_logger()
|
LOGGER = get_console_logger()
|
||||||
@ -117,6 +119,10 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu
|
|||||||
return pack_gps_command(object_id=GPS_HANDLER_0_ID, tc_queue=service_queue, op_code=op_code)
|
return pack_gps_command(object_id=GPS_HANDLER_0_ID, tc_queue=service_queue, op_code=op_code)
|
||||||
if service == CustomServiceList.GPS_1.value:
|
if service == CustomServiceList.GPS_1.value:
|
||||||
return pack_gps_command(object_id=GPS_HANDLER_1_ID, tc_queue=service_queue, op_code=op_code)
|
return pack_gps_command(object_id=GPS_HANDLER_1_ID, tc_queue=service_queue, op_code=op_code)
|
||||||
|
if service == CustomServiceList.CCSDS_HANDLER.value:
|
||||||
|
return pack_ccsds_handler_test(object_id=CCSDS_HANDLER_ID, tc_queue=service_queue, op_code=op_code)
|
||||||
|
if service == CustomServiceList.PDEC_HANDLER.value:
|
||||||
|
return pack_ccsds_handler_test(object_id=PDEC_HANDLER_ID, tc_queue=service_queue, op_code=op_code)
|
||||||
LOGGER.warning("Invalid Service !")
|
LOGGER.warning("Invalid Service !")
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user