: MORAI Simulator Control(MSC) Quick Start Manual.

MSC provides control of Simulator via API without having to manipulate the UI. This page provides the Unit Test Example Manual using the MSC.

Setup

Download Example Code

Python version

  • The example code was written and tested in Python 3.7.5.

Run Morai Launcher

  • To use MSC, use Morai Launcher and check for the message 'MSC is Connect' on the upper left-hand corner of the screen.
    * If 'MSC in DisConnect' message appears instead, check if another launcher is being run.

Run Code

  • If the Simulator and code are running in different environments (environments with different IP addresses), corresponding IP addresses in params.txt and params.json files must be edited.

  • Open params.txt from downloaded files and enter ID, password, version, vehicle, and map (case sensitive). Refer to example below.
    receive_user_ip : 127.0.0.1
    receive_user_port : 10329
    request_dst_ip : 127.0.0.1
    request_dst_port : 10509
    user_id : id
    user_pw : pw
    version: v.3.9.210226.3
    map/vehicle: K-City/Niro

  • Running on unit_test_ros.py, the simulator is executed to check the test algorithm using the corresponding ID, password, version, vehicle, and map information in params.txt.

Simulator Network setting

  • Network Settings configure network information for the saved scenario.

  • Network Settings are configured as below by selecting Network → Network Settings on the upper menu bar.

    • Host IP: 127.0.0.1

    • Destination IP: 127.0.0.1

    • Ego Ctrl Cmd

      • Host PORT: 7601

      • Destination PORT: 7600

    • Ego Vehicle Status

      • Host PORT: 8001

      • Destination PORT: 8000

    • Object Info

      • Host PORT: 8101

      • Destination PORT: 8100


코드

  • class sim_ctrl Class for running simulations via MSC by reading information entered in parmas.txt

  • class planner Algorithm for unit test

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from lib.udp_parser import udp_parser,udp_sender
import time
import threading
import threading
from math import cos,sin,sqrt,pow,atan2,pi
import sys,os,time
import struct
import csv,json
from inputimeout import inputimeout, TimeoutOccurred
from lib.utils import pathReader,findLocalPath,purePursuit,Point,cruiseControl,vaildObject,velocityPlanning,pidController

#read params.txt
file = open('params.txt', 'r')
line=file.read()
params=line.split('\n')
for i in range(0,len(params)):
    params[i]=params[i].split(':')[1].replace(" ","")


recive_user_ip = params[0]
recive_user_port = int(params[1])

request_dst_ip = params[2]
request_dst_port = int(params[3])

class sim_ctrl :

    def __init__(self):            
        self.get_status = udp_parser(recive_user_ip, recive_user_port,'get_sim_status')        
        self.set_status = udp_sender(request_dst_ip, request_dst_port,'set_sim_status') 
        
        self.main_loop()                           

    def main_loop(self):               
        while True:
            self.status_data=self.get_status.get_data()
            
            if not len(self.status_data)==0:
                
                self.data_platform = self.status_data[0]           
                self.data_stage = self.status_data[1]
                self.data_status = self.status_data[2]

                self.command_platform = self.status_data[3]
                self.command_cmd = self.status_data[4]
                self.command_option = self.status_data[5]
                self.command_result = self.status_data[6]               
                          
                
                if self.data_platform == "0x01" and self.data_stage=="0x01": #Launcher platform에서 로그인 전 상태
                    cmd_platform="0x01"
                    cmd_command="0x0001"
                    cmd_option=params[4]+"/"+params[5]#(ID/PW)       
                    self.send_data([cmd_platform,cmd_command,cmd_option])#Login명령
                
                if self.data_platform=="0x01" and self.data_stage=="0x02": #Launcher platform에서 로그인 후 상태
                    cmd_platform = "0x01"
                    cmd_command = "0x0002"
                    cmd_option = params[6]#simulator version
                    self.send_data([cmd_platform,cmd_command,cmd_option]) #version 선택 명령
                    
                    if self.command_result=="0x01": #Result = 성공
                        cmd_platform="0x01"
                        cmd_command="0x0004"
                        cmd_option=""                                                
                        
                        self.send_data([cmd_platform,cmd_command,cmd_option]) #Simulator 실행 명령                       
                        time.sleep(3)
                        
                        
                    elif self.command_result=="0x26": #Result = Simulator 설치 안됨
                        cmd_platform="0x01"
                        cmd_command="0x0003"
                        cmd_option=""
                        self.send_data([cmd_platform,cmd_command,cmd_option]) #Simulator 설치 명령
                  

                if self.data_platform=="0x02" and self.data_stage=="0x01" and self.data_status=="0x0001": #Simulator platform에서 로비 Stage의 대기상태
                    cmd_platform="0x02"
                    cmd_command="0x0001"
                    cmd_option=params[7]#(MAP/Vehicle)                    
                    self.send_data([cmd_platform,cmd_command,cmd_option]) #시뮬레이션/옵션 변경 실행 명령
                    
                    while True: #로딩이 정상적으로 완료될때까지 대기.
                        _,self.data_stage,_,_,_,_,_=self.get_status.get_data()

                        if self.data_stage== "0x02":
                            break        
                    
                    
                    # break

                if self.data_platform=="0x02" and self.data_stage=="0x02" and self.data_status=="0x0001": #Simulator platform에서 Play상태
                    cmd_platform="0x02"
                    cmd_command="0x0013"
                    cmd_option="scenario_example"#(Scenario file name)                                    
                                            
                    cmd_option2="0x01 0x02 0x04 0x08" #option2 or 연산
                    temp_option=cmd_option2.split()
                    for i in range(0,len(temp_option)):
                        temp_option[i]=int(temp_option[i],0)                     
                    result=temp_option[0]
                    
                                                        
                    for i in range(1,len(temp_option)):
                        result=(result|temp_option[i])  
                    cmd_option=cmd_option+"/"+str(result)    
                    
                    time.sleep(1)
                    
                    self.send_data([cmd_platform,cmd_command,cmd_option]) #시나리오 세팅 명령

                    break

                time.sleep(1)
                
            else :
                print("[NO Simulator Control Data]")
                time.sleep(0.5)          

        planner() ##알고리즘 class
            

    def send_data(self, data):
   
        try:
            cmd_platform=int(data[0],0)
            cmd_command=int(data[1],0)
            cmd_option=data[2]         
            self.set_status.send_data([cmd_platform,cmd_command,cmd_option])
        except ValueError:
            print("Invalid input")


class planner :

    def __init__(self):

        path = os.path.dirname( os.path.abspath( __file__ ) )

        with open(os.path.join(path,("params.json")),'r') as fp :
            params = json.load(fp)

        params=params["params"]
        user_ip = params["user_ip"]
        host_ip = params["host_ip"]


        status_port =params["vehicle_status_dst_port"]
        object_port =params["object_info_dst_port"]
        get_traffic_port=params["get_traffic_dst_port"]

        set_traffic_port=params["set_traffic_host_port"]
        ctrl_cmd_port = params["ctrl_cmd_host_port"]

        planner_path_file_name = params["planner_path_file_name"]

        traffic_greenlight_setting= params["traffic_greenlight_setting"]
        
        
        self.status=udp_parser(user_ip, status_port,'status')
        self.obj=udp_parser(user_ip, object_port,'obj')
        self.traffic=udp_parser(user_ip, get_traffic_port,'get_traffic')


        self.ctrl_cmd=udp_sender(host_ip,ctrl_cmd_port,'ctrl_cmd')
        self.set_traffic=udp_sender(host_ip,set_traffic_port,'set_traffic')
        

        self.txt_reader=pathReader()
        self.global_path=self.txt_reader.read(planner_path_file_name)

        vel_planner=velocityPlanning(40,0.7) ## 경로기반 속도 계획
        self.vel_profile=vel_planner.curveBasedVelocity(self.global_path,50)        

        self.pure_pursuit=purePursuit()
        self.cc=cruiseControl(0.5,1) ## cruiseControl import (object_vel_gain, object_dis_gain)
        
        self.vo=vaildObject()## 장애물 유무 확인

        self.pid=pidController() ## pidController import        
  

        self._is_status=False
        while not self._is_status :
            if not self.status.get_data() :
                print('No Status Data Cannot run main_loop')
                time.sleep(0.5)
            else :
                self._is_status=True

        self.main_loop()

    
    def main_loop(self):
        self.timer=threading.Timer(0.1,self.main_loop)
        self.timer.start()
        
        status_data=self.status.get_data()
        obj_data=self.obj.get_data()
        traffic_data = self.traffic.get_data()
        position_x=status_data[4]
        position_y=status_data[5]
        position_z=status_data[6]
        heading=status_data[9]+90# degree
        velocity=status_data[10]
        
        if not len(traffic_data) == 0 and traffic_greenlight_setting == "True": #set trafficlight (green)                        
            self.set_traffic.send_data([False,traffic_data[1],16])
            traffic_data[3]=16


        #fine_local_path, waypoint
        local_path,current_waypoint =findLocalPath(self.global_path,position_x,position_y)
        
        
        ## 장애물의 숫자와 Type 위치 속도 (object_num, object type, object pose_x, object pose_y, object velocity)
        self.vo.get_object(obj_data)
        global_obj,local_obj=self.vo.calc_vaild_obj([position_x,position_y,(heading)/180*pi])


        self.cc.checkObject(local_path,global_obj,local_obj)         
               
        self.pure_pursuit.getPath(local_path) 
        self.pure_pursuit.getEgoStatus(position_x,position_y,position_z,velocity,heading)
            
        steering_angle=self.pure_pursuit.steering_angle()


        #ACC                 
        cc_vel = self.cc.acc(local_obj,velocity,self.vel_profile[current_waypoint]) ## advanced cruise control 적용한 속도 계획        
        target_velocity = cc_vel
              

        control_input=self.pid.pid(target_velocity,velocity) ## 속도 제어를 위한 PID 적용 (target Velocity, Status Velocity)
        if control_input > 0 :
            accel= control_input
            brake= 0
        else :
            accel= 0
            brake= -control_input

        ctrl_mode = 2 # 2 = AutoMode / 1 = KeyBoard
        Gear = 4 # 4 1 : (P / parking ) 2 (R / reverse) 3 (N / Neutral)  4 : (D / Drive) 5 : (L)


        self.ctrl_cmd.send_data([2,Gear,accel,brake,steering_angle])        
       


if __name__ == "__main__":
    ctrl=sim_ctrl()
PY