Various fixes and updates

- Make test event work
- Improved pyclient capabilities and split it up a bit
This commit is contained in:
2023-02-16 01:02:14 +01:00
parent 9da051973c
commit 58544fac7a
10 changed files with 313 additions and 202 deletions

View File

@ -1,22 +1,18 @@
#!/usr/bin/env python3
"""Example client for the sat-rs example application"""
import enum
import logging
import struct
import sys
import time
from typing import Optional
import datetime
import tmtccmd
from spacepackets.ecss import PusTelemetry, PusTelecommand, PusVerificator
from spacepackets.ecss import PusTelemetry, PusVerificator
from spacepackets.ecss.pus_17_test import Service17Tm
from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm
from spacepackets.ccsds.time import CdsShortTimestamp
from tmtccmd import CcsdsTmtcBackend, TcHandlerBase, ProcedureParamsWrapper
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.tc.pus_11_tc_sched import create_time_tagged_cmd
from tmtccmd.core.base import BackendRequest
from tmtccmd.pus import VerificationWrapper
from tmtccmd.tm import CcsdsTmHandler, SpecificApidHandlerBase
@ -26,8 +22,6 @@ from tmtccmd.config import (
SetupParams,
HookBase,
TmtcDefinitionWrapper,
CoreServiceList,
OpCodeEntry,
params_to_procedure_conversion,
)
from tmtccmd.config import PreArgsParsingWrapper, SetupWrapper
@ -46,15 +40,16 @@ from tmtccmd.tc import (
DefaultPusQueueHelper,
QueueWrapper,
)
from tmtccmd.tm.pus_5_fsfw_event import Service5Tm
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.obj_id import ObjectIdDictT
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
_LOGGER = logging.getLogger(__name__)
import pus_tc
import tc_definitions
from common import EXAMPLE_PUS_APID, EventSeverity, EventU32
EXAMPLE_PUS_APID = 0x02
_LOGGER = logging.getLogger(__name__)
class SatRsConfigHook(HookBase):
@ -75,38 +70,7 @@ class SatRsConfigHook(HookBase):
return create_com_interface_default(cfg)
def get_tmtc_definitions(self) -> TmtcDefinitionWrapper:
from tmtccmd.config.globals import get_default_tmtc_defs
defs = get_default_tmtc_defs()
srv_5 = OpCodeEntry()
srv_5.add("0", "Event Test")
defs.add_service(
name=CoreServiceList.SERVICE_5.value,
info="PUS Service 5 Event",
op_code_entry=srv_5,
)
srv_17 = OpCodeEntry()
srv_17.add("0", "Ping Test")
defs.add_service(
name=CoreServiceList.SERVICE_17_ALT,
info="PUS Service 17 Test",
op_code_entry=srv_17,
)
srv_3 = OpCodeEntry()
srv_3.add(HkOpCodes.GENERATE_ONE_SHOT, "Generate AOCS one shot HK")
defs.add_service(
name=CoreServiceList.SERVICE_3,
info="PUS Service 3 Housekeeping",
op_code_entry=srv_3,
)
srv_11 = OpCodeEntry()
srv_11.add("0", "Scheduled TC Test")
defs.add_service(
name=CoreServiceList.SERVICE_11,
info="PUS Service 11 TC Scheduling",
op_code_entry=srv_11,
)
return defs
return tc_definitions.tc_definitions()
def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int):
_LOGGER.info("Mode operation hook was called")
@ -168,7 +132,14 @@ class PusHandler(SpecificApidHandlerBase):
json_str = pus_tm.source_data[8:]
dedicated_handler = True
if service == 5:
tm_packet = Service5Tm.unpack(packet, time_reader=CdsShortTimestamp.empty())
tm_packet = PusTelemetry.unpack(
packet, time_reader=CdsShortTimestamp.empty()
)
src_data = tm_packet.source_data
event_u32 = EventU32.unpack(src_data)
_LOGGER.info(f"Received event packet. Event: {event_u32}")
if event_u32.group_id == 0 and event_u32.unique_id == 0:
_LOGGER.info("Received test event")
if service == 17:
tm_packet = Service17Tm.unpack(
packet, time_reader=CdsShortTimestamp.empty()
@ -197,24 +168,6 @@ class PusHandler(SpecificApidHandlerBase):
# self.printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
def make_addressable_id(target_id: int, unique_id: int) -> bytes:
byte_string = bytearray(struct.pack("!I", target_id))
byte_string.extend(struct.pack("!I", unique_id))
return byte_string
class RequestTargetId(enum.IntEnum):
ACS = 1
class AcsHkIds(enum.IntEnum):
MGM_SET = 1
class HkOpCodes:
GENERATE_ONE_SHOT = ["0", "oneshot"]
class TcHandler(TcHandlerBase):
def __init__(
self,
@ -259,33 +212,7 @@ class TcHandler(TcHandlerBase):
def_proc = helper.to_def_procedure()
service = def_proc.service
op_code = def_proc.op_code
if (
service == CoreServiceList.SERVICE_17
or service == CoreServiceList.SERVICE_17_ALT
):
q.add_log_cmd("Sending PUS ping telecommand")
return q.add_pus_tc(PusTelecommand(service=17, subservice=1))
if service == CoreServiceList.SERVICE_11:
q.add_log_cmd("Sending PUS scheduled TC telecommand")
crt_time = CdsShortTimestamp.from_now()
time_stamp = crt_time + datetime.timedelta(seconds=10)
time_stamp = time_stamp.pack()
return q.add_pus_tc(
create_time_tagged_cmd(
time_stamp,
PusTelecommand(service=17, subservice=1),
apid=EXAMPLE_PUS_APID,
)
)
if service == CoreServiceList.SERVICE_3:
if op_code in HkOpCodes.GENERATE_ONE_SHOT:
q.add_log_cmd("Sending HK one shot request")
q.add_pus_tc(
generate_one_hk_command(
make_addressable_id(RequestTargetId.ACS, AcsHkIds.MGM_SET)
)
)
pass
pus_tc.pack_pus_telecommands(q, service, op_code)
def main():