From ab0a422292847e953e184a5cd52a2a781a9b11ec Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 11 Jun 2021 12:53:35 +0200 Subject: [PATCH] added tmtc commander files --- tmtc/README.md | 8 ++ tmtc/config/__init__.py | 0 tmtc/config/custom_com_config.py | 1 + tmtc/config/custom_definitions.py | 24 ++++ tmtc/config/custom_globals.py | 3 + tmtc/config/custom_mode_op.py | 19 ++++ tmtc/config/hook_base.py | 73 ++++++++++++ tmtc/config/object_ids.py | 19 ++++ tmtc/config/tmtc_config.json | 6 + tmtc/config/version.py | 4 + tmtc/log/tmtc_error.log | 0 tmtc/logo.png | Bin 0 -> 13369 bytes tmtc/pus_tc/__init__.py | 0 tmtc/pus_tc/command_data.py | 11 ++ tmtc/pus_tc/service_200_mode.py | 52 +++++++++ tmtc/pus_tc/service_20_parameters.py | 93 ++++++++++++++++ tmtc/pus_tc/service_2_raw_cmd.py | 77 +++++++++++++ tmtc/pus_tc/service_3_housekeeping.py | 154 ++++++++++++++++++++++++++ tmtc/pus_tc/service_8_func_cmd.py | 56 ++++++++++ tmtc/pus_tc/tc_packing.py | 49 ++++++++ tmtc/pus_tm/__init__.py | 0 tmtc/pus_tm/factory_hook.py | 40 +++++++ tmtc/pus_tm/service_3_hk_handling.py | 70 ++++++++++++ tmtc/pus_tm/service_8_handling.py | 19 ++++ tmtc/tmtc_client_cli.py | 46 ++++++++ tmtc/tmtc_client_gui.py | 40 +++++++ tmtc/utility/__init__.py | 0 27 files changed, 864 insertions(+) create mode 100644 tmtc/README.md create mode 100644 tmtc/config/__init__.py create mode 100644 tmtc/config/custom_com_config.py create mode 100644 tmtc/config/custom_definitions.py create mode 100644 tmtc/config/custom_globals.py create mode 100644 tmtc/config/custom_mode_op.py create mode 100644 tmtc/config/hook_base.py create mode 100644 tmtc/config/object_ids.py create mode 100644 tmtc/config/tmtc_config.json create mode 100644 tmtc/config/version.py create mode 100644 tmtc/log/tmtc_error.log create mode 100644 tmtc/logo.png create mode 100644 tmtc/pus_tc/__init__.py create mode 100644 tmtc/pus_tc/command_data.py create mode 100644 tmtc/pus_tc/service_200_mode.py create mode 100644 tmtc/pus_tc/service_20_parameters.py create mode 100644 tmtc/pus_tc/service_2_raw_cmd.py create mode 100644 tmtc/pus_tc/service_3_housekeeping.py create mode 100644 tmtc/pus_tc/service_8_func_cmd.py create mode 100644 tmtc/pus_tc/tc_packing.py create mode 100644 tmtc/pus_tm/__init__.py create mode 100644 tmtc/pus_tm/factory_hook.py create mode 100644 tmtc/pus_tm/service_3_hk_handling.py create mode 100644 tmtc/pus_tm/service_8_handling.py create mode 100644 tmtc/tmtc_client_cli.py create mode 100644 tmtc/tmtc_client_gui.py create mode 100644 tmtc/utility/__init__.py diff --git a/tmtc/README.md b/tmtc/README.md new file mode 100644 index 0000000..0b7b7ab --- /dev/null +++ b/tmtc/README.md @@ -0,0 +1,8 @@ +### How to use this folder + +This folder contains template files to set up the TMTC commander +for a new mission or project. These files are the adaption +point to customize the TMTC commander. + +To do so, simply copy all folder inside the TMTC commander root. This +step is also required because the TMTC commander core will load some modules. diff --git a/tmtc/config/__init__.py b/tmtc/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tmtc/config/custom_com_config.py b/tmtc/config/custom_com_config.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/tmtc/config/custom_com_config.py @@ -0,0 +1 @@ + diff --git a/tmtc/config/custom_definitions.py b/tmtc/config/custom_definitions.py new file mode 100644 index 0000000..b34597e --- /dev/null +++ b/tmtc/config/custom_definitions.py @@ -0,0 +1,24 @@ +""" +@brief This file transfers control of the custom definitions like modes to the user. +@details Template configuration file. Copy this folder to the TMTC commander root and adapt + it to your needs. +""" + +import enum +from enum import auto + + +class CustomServiceList(enum.IntEnum): + pass + + +class SerialConfig(enum.Enum): + SERIAL_PORT = auto() + SERIAL_BAUD_RATE = auto() + SERIAL_TIMEOUT = auto() + SERIAL_COMM_TYPE = auto() + + +class EthernetConfig(enum.Enum): + SEND_ADDRESS = auto() + RECV_ADDRESS = auto() diff --git a/tmtc/config/custom_globals.py b/tmtc/config/custom_globals.py new file mode 100644 index 0000000..b28b04f --- /dev/null +++ b/tmtc/config/custom_globals.py @@ -0,0 +1,3 @@ + + + diff --git a/tmtc/config/custom_mode_op.py b/tmtc/config/custom_mode_op.py new file mode 100644 index 0000000..5977c3c --- /dev/null +++ b/tmtc/config/custom_mode_op.py @@ -0,0 +1,19 @@ +""" +@brief This file transfers control of custom mode handling to the user. +@details Template configuration file. Copy this folder to the TMTC commander root and adapt + it to your needs. +""" +import sys + +from tmtccmd.core.backend import TmTcHandler +from tmtccmd.utility.logger import get_logger + +LOGGER = get_logger() + + +def perform_mode_operation_user(tmtc_backend: TmTcHandler, mode: int): + """ + Custom modes can be implemented here + """ + LOGGER.error(f"Unknown mode {mode}, Configuration error !") + sys.exit() diff --git a/tmtc/config/hook_base.py b/tmtc/config/hook_base.py new file mode 100644 index 0000000..9512857 --- /dev/null +++ b/tmtc/config/hook_base.py @@ -0,0 +1,73 @@ +import argparse +from typing import Dict, Union, Tuple + +from tmtccmd.config.definitions import ServiceOpCodeDictT +from tmtccmd.config.hook import TmTcHookBase +from tmtccmd.pus_tm.service_3_base import Service3Base + + +class FsfwHookBase(TmTcHookBase): + from tmtccmd.com_if.com_interface_base import CommunicationInterface + from tmtccmd.core.backend import TmTcHandler + from tmtccmd.pus_tc.definitions import TcQueueT + from tmtccmd.ecss.tm import PusTelemetry + from tmtccmd.utility.tmtc_printer import TmTcPrinter + + def get_version(self) -> str: + from config.version import SW_NAME, SW_VERSION, SW_SUBVERSION, SW_SUBSUBVERSION + return f"{SW_NAME} {SW_VERSION}.{SW_SUBVERSION}.{SW_SUBSUBVERSION}" + + def get_json_config_file_path(self) -> str: + return "config/tmtc_config.json" + + def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT: + from tmtccmd.config.globals import get_default_service_op_code_dict + return get_default_service_op_code_dict() + + def add_globals_pre_args_parsing(self, gui: bool = False): + from tmtccmd.config.globals import set_default_globals_pre_args_parsing + set_default_globals_pre_args_parsing(gui=gui, apid=0xef) + + def add_globals_post_args_parsing(self, args: argparse.Namespace): + from tmtccmd.config.globals import set_default_globals_post_args_parsing + set_default_globals_post_args_parsing(args=args, json_cfg_path=self.get_json_config_file_path()) + + def assign_communication_interface(self, com_if_key: str, tmtc_printer: TmTcPrinter) -> \ + Union[CommunicationInterface, None]: + from tmtccmd.config.com_if import create_communication_interface_default + return create_communication_interface_default( + com_if_key=com_if_key, tmtc_printer=tmtc_printer, json_cfg_path=self.get_json_config_file_path() + ) + + def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int): + print("No custom mode operation implemented") + + def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT): + from pus_tc.tc_packing import pack_service_queue_user + pack_service_queue_user(service=service, op_code=op_code, service_queue=service_queue) + + def tm_user_factory_hook(self, raw_tm_packet: bytearray) -> Union[None, PusTelemetry]: + from pus_tm.factory_hook import tm_user_factory_hook + return tm_user_factory_hook(raw_tm_packet=raw_tm_packet) + + def get_object_ids(self) -> Dict[bytes, list]: + from config.object_ids import get_object_ids + return get_object_ids() + + @staticmethod + def handle_service_8_telemetry( + object_id: int, action_id: int, custom_data: bytearray + ) -> Tuple[list, list]: + from pus_tm.service_8_handling import custom_service_8_handling + return custom_service_8_handling( + object_id=object_id, action_id=action_id, custom_data=custom_data + ) + + @staticmethod + def handle_service_3_housekeeping( + object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base + ) -> Tuple[list, list, bytearray, int]: + from pus_tm.service_3_hk_handling import service_3_hk_handling + return service_3_hk_handling( + object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet + ) diff --git a/tmtc/config/object_ids.py b/tmtc/config/object_ids.py new file mode 100644 index 0000000..cbea7e3 --- /dev/null +++ b/tmtc/config/object_ids.py @@ -0,0 +1,19 @@ +""" +@brief This file transfers control of the object IDs to the user. +@details Template configuration file. Copy this folder to the TMTC commander root and adapt + it to your needs. +""" +from typing import Dict + +PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17]) +TEST_DEVICE_0_ID = bytes([0x44, 0x01, 0xAF, 0xFE]) +TEST_DEVICE_1_ID = bytes([0x44, 0x02, 0xAF, 0xFE]) + + +def get_object_ids() -> Dict[bytes, list]: + object_id_dict = { + PUS_SERVICE_17_ID: ["PUS Service 17"], + TEST_DEVICE_0_ID: ["Test Device 0"], + TEST_DEVICE_1_ID: ["Test Device 1"] + } + return object_id_dict diff --git a/tmtc/config/tmtc_config.json b/tmtc/config/tmtc_config.json new file mode 100644 index 0000000..6c77d3c --- /dev/null +++ b/tmtc/config/tmtc_config.json @@ -0,0 +1,6 @@ +{ + "COM_IF_KEY": "udp", + "TCPIP_UDP_RECV_MAX_SIZE": 1500, + "TCPIP_UDP_DEST_IP_ADDRESS": "127.0.0.1", + "TCPIP_UDP_DEST_PORT": 7301 +} \ No newline at end of file diff --git a/tmtc/config/version.py b/tmtc/config/version.py new file mode 100644 index 0000000..2c9e6a8 --- /dev/null +++ b/tmtc/config/version.py @@ -0,0 +1,4 @@ +SW_NAME = "fsfw-tmtc" +SW_VERSION = 1 +SW_SUBVERSION = 2 +SW_SUBSUBVERSION = 0 diff --git a/tmtc/log/tmtc_error.log b/tmtc/log/tmtc_error.log new file mode 100644 index 0000000..e69de29 diff --git a/tmtc/logo.png b/tmtc/logo.png new file mode 100644 index 0000000000000000000000000000000000000000..99b9b4f9608f4e53b86ff83747a114477004c9f1 GIT binary patch literal 13369 zcmXwA1ymF6*WMW2&1h-qP6sF{ASxv&T>{cDx}{qrCS4**4;3aLj2s~;>gbW1NP~2I z^Y=gB;cRDTZ0~#D``&o&bMKNLKh&inXCnsy091PSwTuA(f;Rl?K2l=*vycAE7ytkm z=ccLoSWi>)mXDwJb2krX0N{33M3(CP-n$(Aww6^DxzvHQbVHAe#gdqOr$rdcHQqBl z3y}J9L{T8eFD#Mq(!IMov%e-b^Nzz?C)JTO6;zwWRQ;qh?f;b+_}DRirF@>-F$5^R z3mJj-@-E&gS^NYYyXd2P@|tnQf5kUe;{N4ez|6Yhn0De7MApG+>adT_p&Z@qNAmG? zIsGF#T_&^2@Ti~8@rTSVpN$>s!wy&y4rYgCez{TeC2Nb_ivzIA=NQGrgX`IZ1*blh z?(NYiM`l^cd>OV|v$)G7!jGc4Ch2OIr`Xq16wU#(*?L?!GLr5PjJA13anA)F5|Tnd z=FjGAJpr<%Z-31_J0d0Z2C@9|G+YEp1VwiXSxd;ma0Ji{oTK7zYXSa((F1f$| zKrMCI9-EPlz9hZ(-r7UEAnCQ7!-<8Zfu)0+k#-?(GCJkGxOn{&)x3)PTZBCDU4F)7 zfjm+Nsu{kaLsT*3rS0pmjXQ@|i}@EJT{ed)eZ^;sVLSQxve=R^YEqs!T8;mIx=kEF zA4BV)Jf370Gj=!3Zhg&y2uw47n=VLkyJHl!Aq+F7>zc} zcHCaW<#CL{{KjnkWT1auM;;L>5dS1n5g-oqFXMUx4FBA>FTD4Y9>s50@$R$S5izyR z@J%6cM;1q;5Vzep0k@uq1}STsMxUOposgZVU20#G$Fo(n01{$?3xO+3=Iz&xZXNmH z!HUbvTgok0V_jW^;EqQGGysNZZF(533^bm0-3esJVh7mcD{_$|Yy)6{VI>rbcQ)HEz5%DIkP*4JJU z6s<$wh*yh|SRkPefxo{L??)M^|ByH|0ySFc7llIosaTF&8>(< z8ko+5q=}~_u<~WUyb^&GmHoZ^ZVrm;mN7a;WWW2{bwNAtq~17t_E~G zq;Gvg+G=a-1#;6gXcfJT%tKxywgfbi2+CoxwEd2{nCE4l8C`JNR_KMt0d{1lwA9pR zGy|EFGhWO>-+S$AMN~%%CMS$j`nlUtLMcZ{eBVHb!x1}N|^nGH+ElmdhF zF8`@n?dTC@N1ch-C`k!m3~&weW6qlRhL|E$pI z+XfQaY*-LQ;RBv1C%@{NMt3RJPD>g+Yy%1rWb$+SJzY>dj8+w}!GD_fjtXek>8K)- z4c6Ndk&BZ0i5`!T-{$4}*|Y8ysotB}$3X~-g&M$~W zMn#&d$(j`RATy?>mPWixc!5P$`9%&P#Rh*r%7(*2J=T*xFYB)9aY_MNXPW69Pi*O{ z@XKYD>>$%}B$uU;=P_xO#qeF2+om>?Yh3DT@zWUioPh%)ex)ht1gJeZ35h~R#(;ED z)dvv!33l~q7~5krQT+RffoXGB&QR+15i!J77;>(C&{u3l#%z4)dPnLz)*wm z{|xEWw+?=#i6l)nOiBsiRubtCNz{kprRqCy%dG#lYh|~JL`^frOC&k?HX6COyn6K8 zgGY;Kuu@bjm;)du|EY`Z%`G@tmavczah;^NPyg+>|4euUgH32jjiwNg;gNA6@TuJ| zy5d*hal(=F`{|>nM*f z=rP9Z zL?h{lBm2%H;sJc5h`U#;bxZ4U2SwfH%&{7#T7DDTg3P4cHYX%zomGF8CP@|5yB*y~ zT%4_p=E*{_376J#ujo&xWu;xj_en8-cHzXxYhLI+e)!MoR>JC%#fvA)*p358f9qF0 zS+x+y!WD_A2i7G@O22~_K?w|zvj~e|GRp$#cp`E-(l*w3(nPBCZSL&B8hsN?u$|1F zBnG9drg;a{)gKaw0^HHNlwY^koMT3GO#T)HM?7#32RvWUpG`3;aVsUv?z-W! z!{>|g!1EQMkgT4GuO<$)BA(G+Ll;);+auj#7=MhR)tfhz){jC75-|#|pgCN*d5u1W zVO)edz((e>ac0ShKLsoEPstFhIHt#?1wgC-x3;e7!5)j?BJp#AdOrkAJG2du2x?z# zE_TgrEBc07Pvs_=kDudSw*D+7SY<)enjI$Q^CoTncIQqIU<@g)tX>?Bk)uM4TT^)o zxuy6!Yg>7EG~T@klG>v&H2GoS)Pen(HRu=BX-oKk@H&a&cUG3AO##OEl9M8BpvswJ z!yVOM0ahm|F`(q86H<*9&4&Hx25v@*merSET@*~syCzr$Qmd|Zk4G=Lwbcx0X<8~9 zA2{}|MO?G@1X?9s;s}dgQn)FxVBg^5qbfOq+*M+M7P*PwJj}#&T=Fa+nBTdH_^%R7 znYW%3nAh(FTfK|>97ByLJ{8Ztu_3`x6cNsAIKzMTzB*}o_CWg6_ATG_9vy_)j2C_U z{jD|zUwf)Blt>(CNOHU?V};umNJ3aqtC}9MYuOYAuk{Vrd@((4BOLHM#7=RZzA#q- zIf>ABbBnxq@pZSRhio9kjW5f2covZ>_O%@#U56(0l?(YF7IlDFTZ(-Mdi) zTjY=zkVap>l0n#`^lj%HXO#ABt9z%D6tMhq?HQ_tRNr;e_&sN-G1(`g1*~)9KXRPM z=mw6lfD@=zuh;Pqbk8|NR@Uk`X|-zXpj=wt791CiIxa!_{r zOgMvsw5M-Kkh`<%Q;V|S!#UzHKNF5y*Dm3{?eME-uId_#7Uo7dzpMR-%TN8YN|x^) zHmic8v`@cCYMb(MXvPp?cN#WeJknYr}VbVA9|hnW>rF}tFL_KD<356WTEqvyn6c&#$KuAzOMI$ zTU)+W3L0Spw^g)QC{q70D}PkVTIxn_{i@uO`KM5Ft)V+!13?m_ zUmh{Yi^sov$a}wE2dS2D%mS3u^Dx>|v;xO!t9Rb+H8!`z1SK3-MkiKDPVO=JbnO@I z(_2Crw1-YvYNPvPPqojw(>dZQ@oGgw|jZSFQS6!@a~+C7VQ^V zS?OoQLWxTNk-Yn4S!C6#+z0z)do0OfD%3KcipkT&+RDQ;9JbDzXq8M<=sCMkRV9b^ zolyQ~HV@BYz5~JOqI(mT4vf6=mVhg&?Qz}3c{I)~e5?fYO}M$dT}HyA8}=tlQBgq! zqzj(4O&<9mw&F;)S4uv%L<9zPfUJnOsOos(8N8+3CAvv_qLvK`Wi4(Kr%(&@=d~tC7g2_GpsCDuS?u;e+ag6J6;%?ubw7M zzb`dsyKqyr83?I3u2QIaryZ&wGxnF8U&fnDZt$;XPNkAgi4<;73v=@w%PgO{bIZdQ z=3<~*;V9Km)JXHuci%?ilDaj-_is@{SEL{1gKIT9*p3e#)R`K_B&8Kc+u(&Mhn&LU zcbam`ucR=4cxez6UPC>O*2gXBtT;CPia9C2vUc+N-EK=+=am37@`( zwFBZ@h76{v*Qd#61+I{koV?pzUBTL>xsR7Ac}2o;4H|0`1+=o7@D|~B$1ni5 z-lGZ!e~)#(L{KRUwC(`+B(ptc2l49Vko?ZbGh{t7;I!JDaYo9PcQEa|@Am$>Gv;}+ zxJUswVC^ZUp-R}&2%Hc#zYa}d4H6dlMQ3m}NJqp~OS!Pgu51qYBVdz2^4_!P=o$gN zV=X6d*4vgcb)VN#gao5-a^}Z23!`OEBv_!_eq=9PE1@k7RqPY@KlP*Y`t>O(QQViT zDnc2yl9yQ&(_!#=#l3UpRts^1N12L0y9gSpxl&t6_4OdEbxMFEN&TyHph68o5$ z4B^h=A!5F2j0Hxg)p>7-KjoHap{|HI^It}BZM?A8lYG)A({C;OFcdEL327Fnaz|Rt z#rSKE)A-Q)8gfxq>^da5?p~x@j-xH!S0f##vuw1veKPw}SMFf4tN11-zXzLeaALq? zibprXtZngo_b*A6fCwts4BsP1^Yz-pYA);#GwPG!om53inQ#l<)C^{1J2B^%R?65; zloW<0=I2y4E_kqvFPrT9ML@niWzf)sI7a}2w$2vB=PSMPl&$TfZ#g5c?K`%OHR9wy zVH}>*e-`PVLW^9dvpiGOFxzP$wTF1)j!0KI7`cn`{cKMe1uM4<-t?_Ci;X5>(8LoA zK?p9AFNYQOY6E&!OBKozhNX>{JFvluO8LE*d%9kty=5pc@gG^sYgx}r--~?AB{*j3$|W4XBGng{Zg&P%vsHLwCe5=@+S6e=KeeI zZO-J3g)|kv$4JMJ5vdGSXIx%h=)BkJnQ;07SE$(9uq~CpGz+$$ae}^nKd<``YbcSN_k9)yHL+5 znqm?jyxH=4u)^`pJ>3n*kzNk|t8|)dP5h0(8;x|2$%4hcQm($ymNwBFn)vK*K(2YB zkt}Y0&Wc^7O-^HKa^^H5`gCf+dfHRQXKpFM<9i$dhnH& z6hA7%D39fAP^RU!B=2zMlByy#J^KS8AtbJ>S>a;w@_4BXsw!4RVl%?^0yGEwmhZ1> zR%Sx^RUXQ14l2ZCy1ey;+z5C73VWZh^Zc!7>zp7;?Z>i)w^+n^io=G51h1&T!4ogs z?nh#&xmz%GgW?Lug}#v0T0R%%2;^vXqxBLnGdn{|*WCkN*Gi;T;%EYXa%XsAG(8m4 z%joa-G$TSqPCH6x)5ypSp94z2+#M%f^ApGl*^J9)r&Kv$y~g`n|1)DF1OI(r@|J@v z9@xVC%&9;-AovrZKe)M^qJ>e>fU6oi0kgsD;b-U3_sWVIk0ei%*%jJ^F@d}xl>zLW zvN7L+;)wD?(er4_vp(k(KHshNv1P=6OkkFzuwq=DecxA)u^ zpzkI}_munHd(}<8{cwLfuWAEmkR7yFks!)XN;Seq)r5Mxhp&n}!tZ*B<70t6@ztA0 zh@qK_tTBV2Fue_FsOGP8!1aza(J9q$7`kO3e*W#%RcEN_betI+wJCx0>|88!YDNPJ ztG(V&3#o*HImv}sB9Qg){{HmWc#VZ&{9`k$@bPx@bzy)_3T(ML$3@YPj&flK!HJ;| zQF;z+1qcD}VDdg#M(A?S61aoHKrX8t@Z8G{dHgOptHxtu z?}FfvslI0tY7!DN;mrLL^#&!16y!b~+2Ie8XTOsLx4-;y34j%Tal41|mOfISLOsn% zlg15(-A%2aITfoEa*Dt&W)avu4*W@3HlFV5Rxak80?Xs7&!Su?m@FEG?Eg>_TnR*^ z`k;|Gq#G`nLaD|s0h5RB_|lbIqovEe8X?t$1KF_7jUCqFaoQy7+(11Y@F+;CW!Q6M zSj>2UTwhxT+HL0EA^4ath*BcbIt~?x=qSw{+s7gW5Iu_J=D@GqbMHp;jI(iI_e-?< zvm+o4YU9HnLHuQpP^rjd&%WLKoigWqbL+Q4fjrd8Jb}G~KVioq6W%yj;V zpvlwW$Vz*?oNP(;`D)R|p3 zEz3{y5vl6LG~#xBQc6|ZZPYL` z&Qw)FUl!X$ILkmL5-<>Y~bZIM3WBN#;2%&#UaGvT(Q<@#WJYTuN*Nl$qz`4c$tBX{KP$nh)?$HD-dgT!s+qYCNmLL zCX4sK(^G5-+*xODq$h8Bn(0~)BPRcfmHJ6XUn{2b95Y!@P6)MSi zI0^}{tHZ_2&a$)11D#uM-7+#((^L*tD`CPHE;npCvwS&asnRvg6(8a{4`A;uE+x%d&=B zjnNS9?^-0?Oepiu0p~+BHJZ4!yqcn+CSOD@`NbqF?77q~c5thY)klZg(KVx<@!Zug z4EkvxkJsCYmDuycwQ)VdfQ~u<$$%FmeKtqanhs5GUKv5yu=3*@hmG-Jw5-w4FS%)u zYD_sY;#9KOu(#^-SrBhnNSW5pEsel4Uziw!D$v|#MG>*r_;9WL_a7}s?$*g^U6frV zwYTvRaM!6rm>QO$g(|^EgIi^tiYRf-l9)NOqIH9BA*pk^W<{r|VHOdL!v3RPwr1NT2Wc+>^_h;+h%0&27B>Ukmw8(k- z&tJTo0|yGv?C>hx@}AC8rI2gc_BB{Od_Rx&flF~kg)FAATv_nZRYns!awZ!&#GNsmy1p{6G$mnEnXE$DVCME zhd&H}v1y^I&{InR;^)ycGK=%=|ElydBi-{HkIO`YkH#IOLUbnyql>0OrEp5?ti0O$ zEHyqFWvjoFMop^2gw)!=OD-%-5y@+`_cP@nNX$de-7wBp@*ik?mjyH|z9_~&~Wf5Z54uR{z-35^A1nJV84;yn_pzr{LjMA{EM zN;MS6X~G}C91o}K1?7`6YVjGcn0_r@eKt7zBoA-ELpq!@Dg^n{z}9N1MF-hgFFpJF zMF~^PDVsj^Mv5q+NSdI2PqqWjaUZ;eK~Z_uP8Rqm$q|e%3@`o_mu!~xCq{Hb?+W*; zngQmo1OyjlhOuG`eiiz4X@{=on;T+1jidp&_7GbF&bP#q``mcMN?d1p1iblqN^e~7 z_Ed>#PGZfFFMFJ5Anf)#t6a~t=6szuZUtJ(aOOlX+iCF6e&(PLnZu`O(z%jk6TCe= zLDw)Y2;ua7Jb)6_?NVYnb8+P_y2&~ZGN*$g27?`Zq zqAoCInh2MjE#un-iQ`?8CjmH)sye0f^;K(uR4?%6nTdLYh@!N4VW6b;*CnJdI^wV4 za(i$K3Y$xt@7$a}pqbJmyGIs*e4%g#)MPOd8Kp7Rfu^#8$+hVnt8mrT1$L9ZX0Y!`RX{vw> zAh3Jm>%1aCKF3Q)8!x0q2Dj$XV-vyZ2p-Al8)Wg_#}~Sb{1bI|nenvP<)P^qYP*fa z*SBW5Ta85N5iiKD10E5S{2-~oZpK0(-{-4k#J{vBD?P6bp#-!@f8tKDtP-y)pJP?( z@mUb{HA~mBl8la}+SbJcx{zbV;bLUN6HL6}nu=>dWdhphJFf8x3Y$$#%K2KzFMAPG z3nu@FCIg7@tG-t5B16dB7}AW0VM*%8M9arM0j<4+ov@gU~RO5{dq6AxY=f} z@2=3}J5FjcF2v{?)qe*sj>ea_EY!%nr)zR{_E}U50nw$b4MD;!0v1N$3O)a5C1Aso zw);26R%Br>?(%4@e^UHJ<;nLf!XfAMz!sxp7-A|%@KE|w6g7hH=$`nlvod4UqXkOL zIG(Oi`fdk}CQYpZ7{EgpRnD7dhozX;H>n7v-ibhwy+_G)A}0-lCAv-3y|wY>?Q|`WMyg37 zbkp$F;&*iCljPs2FL2ic79$hajO4UI!ZPc(?eN`Bd+n&FUdaZxw@1Fj&og>$dU+f^q}KG!q1cVrX-f&Y2kux`rAq15qSJkV@w=^5Hs zS3Gq)n#l>rT=E2u?{`7?&aV-iz~^Q8RXqZ{gWsRo;L4gYh?PA6U$$i*InWE(1COBw zbC19JjfOb|*AF1}-gf*3TntVz{R4AhC3{n+M@x0>d{y5cF+PTFdI}4diFWZ0A zIlav~OO|AqeQGuP%VAw|RIuYk2AUq0Nz!?LQqn*GRT#~emJw$iU0ZW&NmaJfChDNk zw(CxdVeZXO&ox7LJocZ9spfBa?^+6O>zseHqRhpSxmv33$=gr)jpg?TlqbRdkF5RM zveEIf@O7!JXMQ0Z4mrAwB1a$S_R8V5e6~Y2ja3gN^VcyO|FWZ}^vlO>^FF(-ZZ*;?fr~ zWY`|&{RvW~9~0Ng^TuyzoqG!+%9)xp$081>EKk9fB7=~Rl9b36k>Fy}@j{0EM$AGzIL>4<+yO%Pg{%; z*L_iv&>-+GnAxY$bUc+AnU!$7>hmR88v^ZV`GwIZ?Bc79En;*gr=5TdduC9_+1EY z@4P+tQr(hu&O1|q3e~5AT*mW?7=?Gtixb-(5bS+B%iji!(4IU+Z>s|triyQAL~8OS z*}A!MsV+xeRvRx*VqlbAlj@0)yUwl`tj~Cw?IKDWNs4&k+P$P@`7U!$k)^{@xKO)y z`$fz{nuk&mWO)TGNGh}l|A%m=P{tFRx-ZQMl56(3owP0&=rsLJMCQI?bNHB|G~2jh zC8av^^LI)vxA%UnYj&9Hx&LotVb=@; zQ7EM8-Ezl8jIKcE^!rnNID+i7s{HC5auXTGdTcKl>m}|szerKE6|ysoSl<(p-8`g5 zP(eFydvp&N2#>RqZovjGfL1(ARf%G1e-TB0bO8~R*ipckWGXG&>ZWCNdb>4^<&2%t zLuLm9bwRwl7AN{FQ{;_41Nqpz7IS?$x33J;V0<%on{n!869l$X1cSq8UaxW_;5 zX=M_R8H9MI;mL+1^RaC%NwxnJw#_B5d%* z!-Bndv1Z9jQrP);qLIPb$p>U`ulMt;=ieM~2?(;$bKj>UuaG7&?a)+m(0AXNPx|IR zphHz}PNF$|Lzv?&xNP|q;{!LI_v#KDi{5v-XGdj=d$hj)qtl)u)gq8h&fv7?kxwD) zd$Y3FNvn64H>poUA$abW-&qi_snIt>d!B=$!(<>PLUF^@ctt!|-(MoH8-fAMH@F(H zYqrK}F%!}_7}TRnhADPVLv>2+XUx*YmMOMHwoXbCS~7{wd#oqXoj;ZDWBJ=ERFxb; ztX|HU+UT>9H$-v6ygil3U0If}?pB@s7yRU`Z7@HegV zC55&nR~~pQJ^F{=mMkK-;t{CWWkh5c71zqELZw8G24nk8t}hWw@)sD$mnz#YC!-Wf zY-r&SC7J>8kl(dUm){<3M4DSsPbAapHFSt;wlg91^%q(2qY?O?HwM<#N==P`zgH z3fsD4tc?h#?x607Hc;lR>5cMjpNp^nxh7MD(0;}WVrVPN-A4mnNQ1rqdYtg=@iyhV zTwE)8mr2%wEQ4E$_0aZ43+X3*xCbN+`WXY^3=UJ$AuUOMQWP+=KSZBmtVoR`5Dc~Eg#yAuaKusj((#qIko8k9g97-@DN{Xq z#AoymsN|Od`pKi56qEl(+|q1b-|%G-#mk3x3yq=ym$GYb4Ow#n3g9gW@r1{u7aoiW|_#I;%)btncy9>~RDhZD}`4^yYP_#P;l^N2vngoto2CKa_ zqf;~FJ8-i0**NNdV|}AH4yiCJveU{wX37&s(GP8Pdqhhf6#i)mFEkZx6EszzKRuSU>)d@`!bz6BXQ>aFdQC)^HT>Af9%oHuR zB%>G-9DUR1+ewPx!ME&WTZ|Evz5==b8C)Sd$In-)KLMz93mmp`u;e&hCW285 zqD|L|y-a}p?VDmMt5v^of9y(+d9xjpm9pbf&!gGT(zE( z%i1TzECRLHO+S{X2pJ!$gnFs&WtA_bD%uxgXRaxFEIeH)IY>K9x~~EnxQzad@H%@C ze5=7Ycid9Gaj-LEZ8LKYT2+c9^w8N}g^Ta^@A6|G9C+L`vW@G>Vz6&h`>hXxa~%>% z$ne{D@70pq4yR_GAq4(L_;87{ww+CJV7revQ8UQj3CDldrguT<2RHfU+YjXnQpI0q zesyD=*R^fhni|FL94F;xuQRZ-;waDL+`Fp>Eju9StMWqOur)cCYV7gO@+NWQId9x;+H?T|9`b%(jF#1A)5w`%qTA29 zQjXN8q#f}TL2n2;C?KSe8sVoB{?YBxS27(NPMH3piwH?6x!yltV@8w<6F8%}LGFNB zkUP-2Rw?mgOzfaqZsEVP6Db{uutY>}oG8{#-by?>9Li3urZQ|8_EgP3O|G?^} zZAATeJidfnCuP=Pq-9E@tw22J@#Me50FyM28~KVLxb@W6o$0)T6jshB+8~MPuue5N zIeu$u@n3k@lw~*n;l_vIO6J3bw24-_M`)?V^Rn@Z7Kc*5u_!wWr2t1;K0Vdn6pf>_ z=F##DkqwbP!Ixtk_xwzBetqF&k+r7U=@nlxAMT=%(d7okMpOM=zWv{4gXd|EfT09I z=99PRN3dMYWVYa^{c2G<=TuI3)y#uW=R4x(*1iZfqFl--YfgVkW%LDfs3e-HvkEgIj3-y2b_+h@i;F!-%&c`Rkf43*z(lqwO zSJ`{%e%hQ%G@0oVNaBA7PWynUd1UY;@}?z8|zNUVE)S5#6l&K=M^x z;{zwx^_jlM4Md9kZFcLstXJ3V$!ZS_G9JBNjhNSIR(u>@kMG4?( int: + new_ssc = init_ssc + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200")) + # Object ID: DUMMY Device + object_id = TEST_DEVICE_0_ID + # Set On Mode + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On")) + mode_data = pack_mode_data(object_id, 1, 0) + command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + # Set Normal mode + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + # Set Raw Mode + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw")) + mode_data = pack_mode_data(object_id, 3, 0) + command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + # Set Off Mode + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off")) + mode_data = pack_mode_data(object_id, 0, 0) + command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt")) + return new_ssc + diff --git a/tmtc/pus_tc/service_20_parameters.py b/tmtc/pus_tc/service_20_parameters.py new file mode 100644 index 0000000..28116b1 --- /dev/null +++ b/tmtc/pus_tc/service_20_parameters.py @@ -0,0 +1,93 @@ +import struct + +from tmtccmd.config.definitions import QueueCommands +from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.pus_tc.definitions import TcQueueT +from tmtccmd.pus_tc.service_20_parameter import pack_type_and_matrix_data, \ + pack_parameter_id +from tmtccmd.pus_tc.service_200_mode import pack_mode_data +from tmtccmd.utility.logger import get_logger + +from config.object_ids import TEST_DEVICE_0_ID + +LOGGER = get_logger() + + +def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str): + if op_code == "0": + pack_service20_test_into(tc_queue=tc_queue) + + +def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False): + if called_externally is False: + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20")) + object_id = TEST_DEVICE_0_ID + + # set mode normal + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Set Normal Mode")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + load_param_0_simple_test_commands(tc_queue=tc_queue) + load_param_1_simple_test_commands(tc_queue=tc_queue) + load_param_2_simple_test_commands(tc_queue=tc_queue) + + if called_externally is False: + tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service20.txt")) + + +def load_param_0_simple_test_commands(tc_queue: TcQueueT): + object_id = TEST_DEVICE_0_ID + parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0) + # test checking Load for uint32_t + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load uint32_t")) + type_and_matrix_data = pack_type_and_matrix_data(3, 14, 1, 1) + parameter_data = struct.pack("!I", 42) + payload = object_id + parameter_id_0 + type_and_matrix_data + parameter_data + command = PusTelecommand(service=20, subservice=128, ssc=2010, app_data=payload) + tc_queue.appendleft(command.pack_command_tuple()) + + # test checking Dump for uint32_t + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump uint32_t")) + payload = object_id + parameter_id_0 + command = PusTelecommand(service=20, subservice=129, ssc=2020, app_data=payload) + tc_queue.appendleft(command.pack_command_tuple()) + + +def load_param_1_simple_test_commands(tc_queue: TcQueueT): + object_id = TEST_DEVICE_0_ID + parameter_id_1 = pack_parameter_id(domain_id=0, unique_id=1, linear_index=0) + # test checking Load for int32_t + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load int32_t")) + type_and_matrix_data = pack_type_and_matrix_data(4, 14, 1, 1) + parameter_data = struct.pack("!i", -42) + payload = object_id + parameter_id_1 + type_and_matrix_data + parameter_data + command = PusTelecommand(service=20, subservice=128, ssc=2030, app_data=payload) + tc_queue.appendleft(command.pack_command_tuple()) + + # test checking Dump for int32_t + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump int32_t")) + payload = object_id + parameter_id_1 + command = PusTelecommand(service=20, subservice=129, ssc=2040, app_data=payload) + tc_queue.appendleft(command.pack_command_tuple()) + + +def load_param_2_simple_test_commands(tc_queue: TcQueueT): + object_id = TEST_DEVICE_0_ID + parameter_id_2 = pack_parameter_id(domain_id=0, unique_id=2, linear_index=0) + # test checking Load for float + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load float")) + type_and_matrix_data = pack_type_and_matrix_data(ptc=5, pfc=1, rows=1, columns=3) + parameter_data = struct.pack("!fff", 4.2, -4.2, 49) + payload = object_id + parameter_id_2 + type_and_matrix_data + parameter_data + command = PusTelecommand(service=20, subservice=128, ssc=2050, app_data=payload) + tc_queue.appendleft(command.pack_command_tuple()) + + # test checking Dump for float + # Skip dump for now, still not properly implemented + # tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump float")) + # payload = object_id + parameter_id_2 + # command = PusTelecommand(service=20, subservice=129, ssc=2060, app_data=payload) + # tc_queue.appendleft(command.pack_command_tuple()) + diff --git a/tmtc/pus_tc/service_2_raw_cmd.py b/tmtc/pus_tc/service_2_raw_cmd.py new file mode 100644 index 0000000..f26f33e --- /dev/null +++ b/tmtc/pus_tc/service_2_raw_cmd.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +""" +@file tmtcc_tc_service_2_raw_cmd.py +@brief PUS Service 2: Device Access, native low-level commanding +@author R. Mueller +@date 01.11.2019 +""" +import struct + +from tmtccmd.config.definitions import QueueCommands +from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.pus_tc.definitions import TcQueueT +from pus_tc.service_200_mode import pack_mode_data + +import pus_tc.command_data as cmd_data +from config.object_ids import TEST_DEVICE_0_ID + + +def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str): + if op_code == "0": + pack_generic_service_2_test_into(0, tc_queue) + else: + print(f"pack_service_2_test: Operation code {op_code} unknown!") + + +def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: + new_ssc = init_ssc + object_id = TEST_DEVICE_0_ID # dummy device + # Set Raw Mode + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode")) + mode_data = pack_mode_data(object_id, 3, 0) + command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + # toggle wiretapping raw + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggling Wiretapping Raw")) + wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1) + toggle_wiretapping_on_command = PusTelecommand( + service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data + ) + tc_queue.appendleft(toggle_wiretapping_on_command.pack_command_tuple()) + new_ssc += 1 + # send raw command, wiretapping should be returned via TM[2,130] and TC[2,131] + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Sending Raw Command")) + raw_command = cmd_data.TEST_COMMAND_0 + raw_data = object_id + raw_command + raw_command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data) + tc_queue.appendleft(raw_command.pack_command_tuple()) + new_ssc += 1 + # toggle wiretapping off + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggle Wiretapping Off")) + wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0) + toggle_wiretapping_off_command = PusTelecommand(service=2, subservice=129, ssc=new_ssc, + app_data=wiretapping_toggle_data) + tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple()) + new_ssc += 1 + # send raw command which should be returned via TM[2,130] + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Send second raw command")) + command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + + # Set mode off + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Off Mode")) + mode_data = pack_mode_data(object_id, 0, 0) + command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service_2.txt")) + return new_ssc + + +# wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW +def pack_wiretapping_mode(object_id, wiretapping_mode_): + wiretapping_mode = struct.pack(">B", wiretapping_mode_) # MODE_OFF : 0x00, MODE_RAW: 0x01 + wiretapping_toggle_data = object_id + wiretapping_mode + return wiretapping_toggle_data diff --git a/tmtc/pus_tc/service_3_housekeeping.py b/tmtc/pus_tc/service_3_housekeeping.py new file mode 100644 index 0000000..a5b4f94 --- /dev/null +++ b/tmtc/pus_tc/service_3_housekeeping.py @@ -0,0 +1,154 @@ +from tmtccmd.config.definitions import QueueCommands +from tmtccmd.pus_tc.service_200_mode import pack_mode_data +from tmtccmd.pus_tc.service_20_parameter import pack_boolean_parameter_command +from tmtccmd.pus_tc.service_3_housekeeping import make_sid, generate_one_hk_command, \ + Srv3Subservice +from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.pus_tc.definitions import TcQueueT +from tmtccmd.pus_tc.service_8_functional_cmd import generate_action_command + +from config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID + + +# Set IDs +TEST_SET_ID = 0 + +# Action IDs +TEST_NOTIFICATION_ACTION_ID = 3 + +# Parameters +PARAM_ACTIVATE_CHANGING_DATASETS = 4 + + +def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str): + current_ssc = 3000 + # TODO: Import this from config instead + device_idx = 0 + if device_idx == 0: + object_id = TEST_DEVICE_0_ID + else: + object_id = TEST_DEVICE_1_ID + + if op_code == "0": + # This will pack all the tests + pack_service_3_test_info(tc_queue=tc_queue, init_ssc=current_ssc, object_id=object_id, + device_idx=device_idx) + elif op_code == "1": + # Extremely simple, generate one HK packet + pack_gen_one_hk_command(tc_queue=tc_queue, device_idx=device_idx, init_ssc=current_ssc, + object_id=object_id) + elif op_code == "2": + # Housekeeping basic test + pack_housekeeping_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc) + elif op_code == "3": + # Notification demo + pack_notification_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc) + + +def pack_service_3_test_info(tc_queue: TcQueueT, device_idx: int, object_id: bytearray, + init_ssc: int): + tc_queue.appendleft((QueueCommands.PRINT, "Service 3 (Housekeeping Service): All tests")) + current_ssc = init_ssc + + current_ssc += pack_gen_one_hk_command( + tc_queue=tc_queue, device_idx=device_idx, object_id=object_id, init_ssc=current_ssc + ) + current_ssc += pack_housekeeping_basic_test( + tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc + ) + current_ssc += pack_notification_basic_test( + tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc, enable_normal_mode=False + ) + + +def pack_gen_one_hk_command( + tc_queue: TcQueueT, device_idx: int, init_ssc: int, object_id: bytearray +) -> int: + test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID) + tc_queue.appendleft( + (QueueCommands.PRINT, f"Service 3 Test: Generate one test set packet for " + f"test device {device_idx}") + ) + command = generate_one_hk_command(ssc=init_ssc, sid=test_sid) + init_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + return init_ssc + + +def pack_housekeeping_basic_test( + tc_queue: TcQueueT, object_id: bytearray, init_ssc: int, enable_normal_mode: bool = True +) -> int: + """ + This basic test will request one HK packet, then it will enable periodic packets and listen + to the periodic packets for a few seconds. After that, HK packets will be disabled again. + """ + test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID) + current_ssc = init_ssc + # Enable changing datasets via parameter service (Service 20) + tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing basic HK tests")) + + if enable_normal_mode: + # Set mode normal so that sets are changed/read regularly + tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data) + current_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "Enabling changing datasets")) + command = pack_boolean_parameter_command( + object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, + parameter=True, ssc=current_ssc + ) + current_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + + # Enable periodic reporting + tc_queue.appendleft((QueueCommands.PRINT, + "Enabling periodic thermal sensor packet generation: ")) + command = PusTelecommand(service=3, subservice=Srv3Subservice.ENABLE_PERIODIC_HK_GEN.value, + ssc=current_ssc, app_data=test_sid) + current_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.WAIT, 2.0)) + + # Disable periodic reporting + tc_queue.appendleft((QueueCommands.PRINT, + "Disabling periodic thermal sensor packet generation: ")) + command = PusTelecommand(service=3, subservice=Srv3Subservice.DISABLE_PERIODIC_HK_GEN.value, + ssc=current_ssc, app_data=test_sid) + current_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + + # Disable changing datasets via parameter service (Service 20) + tc_queue.appendleft((QueueCommands.PRINT, "Disabling changing datasets")) + command = pack_boolean_parameter_command( + object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, + parameter=False, ssc=current_ssc + ) + current_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + return current_ssc + + +def pack_notification_basic_test(tc_queue: TcQueueT, object_id: bytearray, init_ssc: int, + enable_normal_mode: bool = True) -> int: + current_ssc = init_ssc + tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing notification tests")) + + if enable_normal_mode: + # Set mode normal so that sets are changed/read regularly + tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data) + current_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "Triggering notification")) + command = generate_action_command( + object_id=object_id, action_id=TEST_NOTIFICATION_ACTION_ID, ssc=current_ssc + ) + tc_queue.appendleft(command.pack_command_tuple()) + current_ssc += 1 + return current_ssc diff --git a/tmtc/pus_tc/service_8_func_cmd.py b/tmtc/pus_tc/service_8_func_cmd.py new file mode 100644 index 0000000..a53d44b --- /dev/null +++ b/tmtc/pus_tc/service_8_func_cmd.py @@ -0,0 +1,56 @@ +from tmtccmd.config.definitions import QueueCommands +from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.pus_tc.definitions import TcQueueT + +import pus_tc.command_data as cmd_data +from pus_tc.service_200_mode import pack_mode_data + +from config.object_ids import TEST_DEVICE_0_ID + + +def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str): + if op_code == "0": + pack_generic_service_8_test_into(tc_queue=tc_queue) + else: + print(f"pack_service_8_test: Operation code {op_code} unknown!") + + +def pack_generic_service_8_test_into(tc_queue: TcQueueT): + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8")) + object_id = TEST_DEVICE_0_ID + + # set mode on + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode")) + mode_data = pack_mode_data(object_id, 1, 0) + command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + # set mode normal + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Normal Mode")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers completion reply + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Step and Completion Reply")) + action_id = cmd_data.TEST_COMMAND_0 + direct_command = object_id + action_id + command = PusTelecommand(service=8, subservice=128, ssc=820, app_data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers _tm_data reply + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Data Reply")) + action_id = cmd_data.TEST_COMMAND_1 + command_param1 = cmd_data.TEST_COMMAND_1_PARAM_1 + command_param2 = cmd_data.TEST_COMMAND_1_PARAM_2 + direct_command = object_id + action_id + command_param1 + command_param2 + command = PusTelecommand(service=8, subservice=128, ssc=830, app_data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # Set mode off + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Off Mode")) + mode_data = pack_mode_data(object_id, 0, 0) + command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service_8.txt")) diff --git a/tmtc/pus_tc/tc_packing.py b/tmtc/pus_tc/tc_packing.py new file mode 100644 index 0000000..757c58f --- /dev/null +++ b/tmtc/pus_tc/tc_packing.py @@ -0,0 +1,49 @@ +""" +@brief This file transfers control of TC packing to the user +@details Template configuration file. Copy this folder to the TMTC commander root and adapt + it to your needs. +""" + +import os +from collections import deque +from typing import Union + +from pus_tc.service_20_parameters import pack_service20_commands_into +from pus_tc.service_2_raw_cmd import pack_service_2_commands_into +from pus_tc.service_3_housekeeping import pack_service_3_commands_into +from pus_tc.service_8_func_cmd import pack_service_8_commands_into +from tmtccmd.utility.logger import get_logger +from tmtccmd.pus_tc.definitions import TcQueueT +from tmtccmd.config.definitions import CoreServiceList +from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into +from tmtccmd.pus_tc.service_17_test import pack_generic_service17_test +from pus_tc.service_200_mode import pack_service_200_commands_into + +LOGGER = get_logger() + + +def pack_service_queue_user(service: Union[str, int], op_code: str, service_queue: TcQueueT): + if service == CoreServiceList.SERVICE_2.value: + return pack_service_2_commands_into(op_code=op_code, tc_queue=service_queue) + if service == CoreServiceList.SERVICE_3.value: + return pack_service_3_commands_into(op_code=op_code, tc_queue=service_queue) + if service == CoreServiceList.SERVICE_5.value: + return pack_generic_service5_test_into(tc_queue=service_queue) + if service == CoreServiceList.SERVICE_8.value: + return pack_service_8_commands_into(op_code=op_code, tc_queue=service_queue) + if service == CoreServiceList.SERVICE_17.value: + return pack_generic_service17_test(init_ssc=1700, tc_queue=service_queue) + if service == CoreServiceList.SERVICE_20.value: + return pack_service20_commands_into(tc_queue=service_queue, op_code=op_code) + if service == CoreServiceList.SERVICE_200.value: + return pack_service_200_commands_into(tc_queue=service_queue, op_code=op_code) + LOGGER.warning("Invalid Service !") + + +def create_total_tc_queue_user() -> TcQueueT: + if not os.path.exists("log"): + os.mkdir("log") + tc_queue = deque() + pack_service_2_commands_into(op_code="0", tc_queue=tc_queue) + pack_generic_service17_test(init_ssc=1700, tc_queue=tc_queue) + return tc_queue diff --git a/tmtc/pus_tm/__init__.py b/tmtc/pus_tm/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tmtc/pus_tm/factory_hook.py b/tmtc/pus_tm/factory_hook.py new file mode 100644 index 0000000..8e5115a --- /dev/null +++ b/tmtc/pus_tm/factory_hook.py @@ -0,0 +1,40 @@ +""" +@brief This file transfers control of TM parsing to the user +@details Template configuration file. Copy this folder to the TMTC commander root and adapt + it to your needs. +""" + +from tmtccmd.ecss.tm import PusTelemetry +from tmtccmd.utility.logger import get_logger +from tmtccmd.pus_tm.service_1_verification import Service1TM +from tmtccmd.pus_tm.service_2_raw_cmd import Service2TM +from tmtccmd.pus_tm.service_3_housekeeping import Service3TM +from tmtccmd.pus_tm.service_5_event import Service5TM +from tmtccmd.pus_tm.service_8_functional_cmd import Service8TM +from tmtccmd.pus_tm.service_17_test import Service17TM +from tmtccmd.pus_tm.service_20_parameters import Service20TM +from tmtccmd.pus_tm.service_200_mode import Service200TM + +LOGGER = get_logger() + + +def tm_user_factory_hook(raw_tm_packet: bytearray) -> PusTelemetry: + service_type = raw_tm_packet[7] + if service_type == 1: + return Service1TM(raw_tm_packet) + if service_type == 2: + return Service2TM(raw_tm_packet) + if service_type == 3: + return Service3TM(raw_tm_packet) + if service_type == 8: + return Service8TM(raw_tm_packet) + if service_type == 5: + return Service5TM(raw_tm_packet) + if service_type == 17: + return Service17TM(raw_tm_packet) + if service_type == 20: + return Service20TM(raw_tm_packet) + if service_type == 200: + return Service200TM(raw_tm_packet) + LOGGER.info("The service " + str(service_type) + " is not implemented in Telemetry Factory") + return PusTelemetry(raw_tm_packet) diff --git a/tmtc/pus_tm/service_3_hk_handling.py b/tmtc/pus_tm/service_3_hk_handling.py new file mode 100644 index 0000000..138ae00 --- /dev/null +++ b/tmtc/pus_tm/service_3_hk_handling.py @@ -0,0 +1,70 @@ +""" +@brief This file transfers control of housekeeping handling (PUS service 3) to the + developer +@details Template configuration file. Copy this folder to the TMTC commander root and adapt + it to your needs. +""" +import struct +from typing import Tuple +from tmtccmd.pus_tm.service_3_housekeeping import Service3Base +from tmtccmd.utility.logger import get_logger + +from config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID +LOGGER = get_logger() + + +def service_3_hk_handling( + object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base +) -> Tuple[list, list, bytearray, int]: + """ + This function is called when a Service 3 Housekeeping packet is received. + + Please note that the object IDs should be compared by value because direct comparison of + enumerations does not work in Python. For example use: + + if object_id.value == ObjectIds.TEST_OBJECT.value + + to test equality based on the object ID list. + + @param object_id: + @param set_id: + @param hk_data: + @param service3_packet: + @return: Expects a tuple, consisting of two lists, a bytearray and an integer + The first list contains the header columns, the second list the list with + the corresponding values. The bytearray is the validity buffer, which is usually appended + at the end of the housekeeping packet. The last value is the number of parameters. + """ + if object_id == TEST_DEVICE_0_ID or object_id == TEST_DEVICE_1_ID: + return handle_test_set_deserialization(hk_data=hk_data) + else: + LOGGER.info("Service3TM: Parsing for this SID has not been implemented.") + return [], [], bytearray(), 0 + + +def handle_test_set_deserialization(hk_data: bytearray) -> Tuple[list, list, bytearray, int]: + header_list = [] + content_list = [] + validity_buffer = bytearray() + # uint8 (1) + uint32_t (4) + float vector with 3 entries (12) + validity buffer (1) + if len(hk_data) < 18: + LOGGER.warning("Invalid HK data format for test set reply!") + return header_list, content_list, validity_buffer, 0 + uint8_value = struct.unpack('!B', hk_data[0:1])[0] + uint32_value = struct.unpack('!I', hk_data[1:5])[0] + float_value_1 = struct.unpack('!f', hk_data[5:9])[0] + float_value_2 = struct.unpack('!f', hk_data[9:13])[0] + float_value_3 = struct.unpack('!f', hk_data[13:17])[0] + validity_buffer.append(hk_data[17]) + header_list.append("uint8 value") + header_list.append("uint32 value") + header_list.append("float vec value 1") + header_list.append("float vec value 2") + header_list.append("float vec value 3") + + content_list.append(uint8_value) + content_list.append(uint32_value) + content_list.append(float_value_1) + content_list.append(float_value_2) + content_list.append(float_value_3) + return header_list, content_list, validity_buffer, 3 diff --git a/tmtc/pus_tm/service_8_handling.py b/tmtc/pus_tm/service_8_handling.py new file mode 100644 index 0000000..259e7e4 --- /dev/null +++ b/tmtc/pus_tm/service_8_handling.py @@ -0,0 +1,19 @@ +from typing import Tuple + + +def custom_service_8_handling( + object_id: int, action_id: int, custom_data: bytearray) -> Tuple[list, list]: + """ + This function is called by the TMTC core if a Service 8 data reply (subservice 130) + is received. The user can return a tuple of two lists, where the first list + is a list of header strings to print and the second list is a list of values to print. + The TMTC core will take care of printing both lists and logging them. + + @param object_id: + @param action_id: + @param custom_data: + @return: + """ + header_list = [] + content_list = [] + return header_list, content_list diff --git a/tmtc/tmtc_client_cli.py b/tmtc/tmtc_client_cli.py new file mode 100644 index 0000000..cb844d1 --- /dev/null +++ b/tmtc/tmtc_client_cli.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python3 +""" +@brief TMTC Commander entry point for command line mode. +@details +This client was developed by KSat for the SOURCE project to test the on-board software but +has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand) +handling and testing via different communication interfaces. Currently, only the PUS standard is +implemented as a packet standard. + +Run this file with the -h flag to display options. + +@license +Copyright 2020 KSat e.V. Stuttgart + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +@author R. Mueller +""" +from config.hook_base import FsfwHookBase +try: + from tmtccmd.runner import run_tmtc_commander, initialize_tmtc_commander +except ImportError: + run_tmtc_commander = None + initialize_tmtc_commander = None + print("Python tmtccmd submodule not installed") + print("Install with \"cd tmtccmd && python3 -m pip install -e .\" for interactive installation") + + +def main(): + hook_obj = FsfwHookBase() + initialize_tmtc_commander(hook_object=hook_obj) + run_tmtc_commander(use_gui=False, app_name="TMTC Commander FSFW") + + +if __name__ == "__main__": + main() diff --git a/tmtc/tmtc_client_gui.py b/tmtc/tmtc_client_gui.py new file mode 100644 index 0000000..6ac7ce7 --- /dev/null +++ b/tmtc/tmtc_client_gui.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +""" +@brief TMTC Commander entry point for GUI mode. +@details +This client was developed by KSat for the SOURCE project to test the on-board software but +has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand) +handling and testing via different communication interfaces. Currently, only the PUS standard is +implemented as a packet standard. + +Run this file with the -h flag to display options. + +@license +Copyright 2020 KSat e.V. Stuttgart + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +@author R. Mueller +""" +from config.hook_base import FsfwHookBase +from tmtccmd.runner import run_tmtc_commander, initialize_tmtc_commander + + +def main(): + hook_obj = FsfwHookBase() + initialize_tmtc_commander(hook_object=hook_obj) + run_tmtc_commander(use_gui=True, app_name="TMTC Commander FSFW") + + +if __name__ == "__main__": + main() diff --git a/tmtc/utility/__init__.py b/tmtc/utility/__init__.py new file mode 100644 index 0000000..e69de29