finished seq count, aocs handlers, housekeeper, maybe other stuff

This commit is contained in:
lkoester
2023-03-17 11:09:17 +01:00
parent 8ceb2c7a79
commit 51471c0a2d
8 changed files with 864894 additions and 336 deletions

View File

@ -9,6 +9,7 @@ from typing import Optional
import datetime
import json
import pprint
import struct
import tmtccmd
from spacepackets.ecss import PusTelemetry, PusTelecommand, PusVerificator
@ -17,7 +18,7 @@ from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm
from spacepackets.ccsds.time import CdsShortTimestamp
from tmtccmd import CcsdsTmtcBackend, TcHandlerBase, ProcedureParamsWrapper
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid, create_enable_periodic_hk_command, create_disable_periodic_hk_command
from tmtccmd.tc.pus_11_tc_sched import create_time_tagged_cmd
from tmtccmd.core.base import BackendRequest
from tmtccmd.pus import VerificationWrapper
@ -56,7 +57,7 @@ from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
_LOGGER = logging.getLogger(__name__)
EXAMPLE_PUS_APID = 0x02
EXAMPLE_PUS_APID = 0x50
class SatRsConfigHook(HookBase):
@ -96,6 +97,8 @@ class SatRsConfigHook(HookBase):
)
srv_3 = OpCodeEntry()
srv_3.add(HkOpCodes.GENERATE_ONE_SHOT, "Generate AOCS one shot HK")
srv_3.add(HkOpCodes.ENABLE_PERIODIC, "Enable Periodic AOCS HK")
srv_3.add(HkOpCodes.DISABLE_PERIODIC, "Disable Periodic AOCS HK")
defs.add_service(
name=CoreServiceList.SERVICE_3,
info="PUS Service 3 Housekeeping",
@ -118,6 +121,15 @@ class SatRsConfigHook(HookBase):
info="PUS Service 11 TC Scheduling",
op_code_entry=srv_11,
)
srv_200 = OpCodeEntry()
srv_200.add("0", "AOCS Mode Idle")
srv_200.add("1", "AOCS Mode On")
defs.add_service(
name = CoreServiceList.SERVICE_200,
info="PUS Custom Service 200 Mode",
op_code_entry=srv_200,
)
return defs
def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int):
@ -178,9 +190,17 @@ class PusHandler(SpecificApidHandlerBase):
if pus_tm.subservice == 25:
if len(pus_tm.source_data) < 8:
raise ValueError("No addressable ID in HK packet")
json_str = str(pus_tm.source_data[8:].decode('utf8'))
json_object = json.loads(json_str)
pprint.pprint(json_object)
#json_str = str(pus_tm.source_data[8:].decode('utf8'))
#json_object = json.loads(json_str)
#pprint.pprint(json_object)
print(pus_tm.tm_data.hex())
i = 8
print(struct.unpack('d', pus_tm.tm_data[8:16]))
while i < len(pus_tm.tm_data):
print(pus_tm.tm_data[i:i+8].hex())
print(struct.unpack('d', pus_tm.tm_data[i:i+8]))
i += 8
dedicated_handler = True
if service == 5:
tm_packet = Service5Tm.unpack(packet, time_reader=CdsShortTimestamp.empty())
@ -242,12 +262,19 @@ class AcsHkIds(enum.IntEnum):
class HkOpCodes:
GENERATE_ONE_SHOT = ["0", "oneshot"]
ENABLE_PERIODIC = ["1", "enable periodic"]
DISABLE_PERIODIC = ["2", "disable periodic"]
def make_target_id(target_id: int) -> bytes:
byte_string = bytearray(struct.pack("!I", target_id))
return byte_string
def make_mode_submode(mode: int, submode: int) -> bytes:
byte_string = bytearray(struct.pack("!I", mode))
byte_string.extend(struct.pack("!I", submode))
return byte_string
class TcHandler(TcHandlerBase):
def __init__(
@ -294,6 +321,15 @@ class TcHandler(TcHandlerBase):
def_proc = helper.to_def_procedure()
service = def_proc.service
op_code = def_proc.op_code
if service == CoreServiceList.SERVICE_200:
q.add_log_cmd("Sending PUS Mode Request telecommand")
if op_code == "0":
tc = PusTelecommand(service=200, subservice=1, app_data=make_mode_submode(0,0), apid=0x50)
return q.add_pus_tc(tc)
if op_code == "1":
tc = PusTelecommand(service=200, subservice=1, app_data=make_mode_submode(1,0), apid=0x50)
return q.add_pus_tc(tc)
if (
service == CoreServiceList.SERVICE_17
or service == CoreServiceList.SERVICE_17_ALT
@ -346,6 +382,16 @@ class TcHandler(TcHandlerBase):
make_addressable_id(RequestTargetId.ACS, AcsHkIds.MGM_SET)
)
)
if op_code in HkOpCodes.ENABLE_PERIODIC:
q.add_log_cmd("Sending periodic HK request")
tc = create_enable_periodic_hk_command(False, make_addressable_id(RequestTargetId.ACS, AcsHkIds.MGM_SET))
q.add_log_cmd(tc)
q.add_pus_tc(tc)
if op_code in HkOpCodes.DISABLE_PERIODIC:
q.add_log_cmd("Sending periodic HK request")
tc = create_disable_periodic_hk_command(False, make_addressable_id(RequestTargetId.ACS, AcsHkIds.MGM_SET))
q.add_log_cmd(tc)
q.add_pus_tc(tc)
pass