MSC ROS Unit Test Quick Start Manual
MSC provides control of the Simulator via an API without directly using the UI, enabling script-based control and automation of all basic commands within MORAI SIM: Drive.
This page contains the tutorial for using the MSC API with ROS as the main interface between the simulator and the user.
Setup
Download Example Code
The downloaded file is configured below.
K-City: K-City Scenario File (move this file into this path:
MORAI_Lanucher_Data/SaveFile/Scenario
)msc_ros :
msc_ros_example
files (move files intoworkspace/src
)
Python version
Example code was written and tested in python 3.7.5.
Run Morai Launcher
To use MSC, use Morai Launcher and check for the message 'MSC is Connect' on the upper left-hand corner of the screen.
* If 'MSC in DisConnect' message appears instead, check if another launcher is being run.
Run Code
If the Simulator and code are run in different environments (environments with different IP addresses), corresponding IP addresses in
params.txt
andparams.json
files must be edited.Open params.txt from downloaded files and enter ID, password, version, vehicle, and map (case sensitive). Refer to example below.
receive_user_ip : 127.0.0.1
receive_user_port : 10329
request_dst_ip : 127.0.0.1
request_dst_port : 10509
user_id : id
user_pw : pw
version: v.3.9.210226.3
map/vehicle: K-City/NiroRunning on unit_test_ros.py, the simulator is executed to check the test algorithm using the corresponding ID, password, version, vehicle, and map information in params.txt.
Simulator Network setting
Network Settings configure network information for the saved scenario.
For example, Network Settings can be found in the upper menu bar by selecting Network → Network Settings.
Bridge IP: 127.0.0.1
Code
class sim_ctrl
Class for running simulations via MSC by reading information entered in parmas.txtclass gen_planner
Algorithm for unit test
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import rospy,rospkg
from math import cos,sin,sqrt,pow,atan2,pi
import sys,os,time
import json
from morai_msgs.msg import EgoVehicleStatus,ObjectInfo,CtrlCmd,ScenarioLoad
from lib.ros_utils import pathReader,findLocalPath,purePursuit,Point,cruiseControl,vaildObject,velocityPlanning,pidController
from lib.udp_parser import udp_parser,udp_sender
#read params.txt
file = open('params.txt', 'r')
line=file.read()
params=line.split('\n')
for i in range(0,len(params)):
params[i]=(params[i].split(':')[1].replace(" ","")).replace('\r','')
recive_user_ip = params[0]
recive_user_port = int(params[1])
request_dst_ip = params[2]
request_dst_port = int(params[3])
class sim_ctrl :
def __init__(self):
self.get_status = udp_parser(recive_user_ip, recive_user_port,'get_sim_status')
self.set_status = udp_sender(request_dst_ip, request_dst_port,'set_sim_status')
self.main_loop()
def main_loop(self):
while True:
self.status_data=self.get_status.get_data()
print("data : ",self.status_data)
if not len(self.status_data)==0:
self.data_platform = self.status_data[0]
self.data_stage = self.status_data[1]
self.data_status = self.status_data[2]
self.command_platform = self.status_data[3]
self.command_cmd = self.status_data[4]
self.command_option = self.status_data[5]
self.command_result = self.status_data[6]
if self.data_platform == "0x01" and self.data_stage=="0x01": #Launcher platform에서 로그인 전 상태
cmd_platform="0x01"
cmd_command="0x0001"
cmd_option=params[4]+"/"+params[5]#(ID/PW)
self.send_data([cmd_platform,cmd_command,cmd_option])#Login명령
if self.data_platform=="0x01" and self.data_stage=="0x02": #Launcher platform에서 로그인 후 상태
cmd_platform = "0x01"
cmd_command = "0x0002"
cmd_option = params[6]#simulator version
self.send_data([cmd_platform,cmd_command,cmd_option]) #version 선택 명령
if self.data_status=="0x0001": #Result = 성공
cmd_platform="0x01"
cmd_command="0x0004"
cmd_option=""
self.send_data([cmd_platform,cmd_command,cmd_option]) #Simulator 실행 명령
time.sleep(3)
elif self.data_status=="0x0012": #Result = Simulator 설치 안됨
cmd_platform="0x01"
cmd_command="0x0003"
cmd_option=""
self.send_data([cmd_platform,cmd_command,cmd_option]) #Simulator 설치 명령
if self.data_platform=="0x02" and self.data_stage=="0x01" and self.data_status=="0x0001": #Simulator platform에서 로비 Stage의 대기상태
cmd_platform="0x02"
cmd_command="0x0001"
cmd_option=params[7]#(MAP/Vehicle)
self.send_data([cmd_platform,cmd_command,cmd_option]) #시뮬레이션/옵션 변경 실행 명령
time.sleep(3)
while True: #로딩이 정상적으로 완료될때까지 대기.
_,self.data_stage,_,_,_,_,_=self.get_status.get_data()
print(self.data_stage)
if self.data_stage== "0x02":
if self.data_platform=="0x02" and self.data_stage=="0x02" and self.data_status=="0x0001": #Simulator platform에서 Play상태
break
time.sleep(1)
break
time.sleep(1)
else :
print("[NO Simulator Control Data]")
time.sleep(0.5)
gen_planner() ##test알고리즘
def send_data(self, data):
try:
print("send",data)
cmd_platform=int(data[0],0)
cmd_command=int(data[1],0)
cmd_option=data[2]
self.set_status.send_data([cmd_platform,cmd_command,cmd_option])
except ValueError:
print("Invalid input")
class gen_planner():
def __init__(self):
rospy.init_node('gen_planner', anonymous=True)
#publisher
ctrl_pub = rospy.Publisher('/ctrl_cmd',CtrlCmd, queue_size=1) ## Vehicl Control
scenario_pub = rospy.Publisher('/ScenarioLoad',ScenarioLoad, queue_size=1) ## Vehicl Control
ctrl_msg= CtrlCmd()
scenarioLoad_msgs = ScenarioLoad()
scenarioLoad_msgs.file_name = "scenario_example_ros"
scenarioLoad_msgs.load_ego_vehicle_data=True
scenarioLoad_msgs.load_pedestrian_data=True
scenarioLoad_msgs.load_surrounding_vehicle_data=True
scenarioLoad_msgs.load_object_data=True
#subscriber
rospy.Subscriber("/Ego_topic", EgoVehicleStatus, self.statusCB) ## Vehicl Status Subscriber
rospy.Subscriber("/Object_topic", ObjectInfo, self.objectInfoCB) ## Object information Subscriber
#def
self.is_status=False ## 차량 상태 점검
self.is_obj=False ## 장애물 상태 점검
#class
path_reader=pathReader('gen_ros') ## 경로 파일의 위치
pure_pursuit=purePursuit() ## purePursuit import
self.cc=cruiseControl(0.5,1) ## cruiseControl import (object_vel_gain, object_dis_gain)
self.vo=vaildObject()
pid=pidController() ## pidController import
#read path
self.global_path=path_reader.read_txt("scenario_example.txt") ## 출력할 경로의 이름
vel_planner=velocityPlanning(40/3.6,1.5) ## 속도 계획
vel_profile=vel_planner.curveBasedVelocity(self.global_path,50)
#time var
count=0
rate = rospy.Rate(30) # 30hz
while not rospy.is_shutdown():
if self.is_status==True and self.is_obj==True: ## 차량의 상태, 장애물 상태 점검
if count==0:
time.sleep(5)
scenario_pub.publish(scenarioLoad_msgs)
## global_path와 차량의 status_msg를 이용해 현제 waypoint와 local_path를 생성
local_path,self.current_waypoint=findLocalPath(self.global_path,self.status_msg,0)
## 장애물의 숫자와 Type 위치 속도 (object_num, object type, object pose_x, object pose_y, object velocity)
self.vo.get_object(self.object_info_msg.num_of_objects,self.object_info_msg.object_type,self.object_info_msg.pose_x,self.object_info_msg.pose_y,self.object_info_msg.velocity)
global_obj,local_obj=self.vo.calc_vaild_obj([self.status_msg.pose_x,self.status_msg.pose_y,(self.status_msg.heading+90)/180*pi])
self.cc.checkObject(local_path,global_obj,local_obj)
pure_pursuit.getPath(local_path) ## pure_pursuit 알고리즘에 Local path 적용
pure_pursuit.getEgoStatus(self.status_msg) ## pure_pursuit 알고리즘에 차량의 status 적용
ctrl_msg.steering=-pure_pursuit.steering_angle()/180*pi
cc_vel = self.cc.acc(local_obj,self.status_msg.velocity,vel_profile[self.current_waypoint],self.status_msg) ## advanced cruise control 적용한 속도 계획
target_velocity = cc_vel
control_input=pid.pid(target_velocity,self.status_msg.velocity) ## 속도 제어를 위한 PID 적용 (target Velocity, Status Velocity)
if control_input > 0 :
ctrl_msg.accel= control_input
ctrl_msg.brake= 0
else :
ctrl_msg.accel= 0
ctrl_msg.brake= -control_input
if self.status_msg.velocity < 3.0 and target_velocity<=0.0:
ctrl_msg.accel=0
ctrl_msg.brake=1
print(self.current_waypoint,len(self.global_path.poses)-1)
print(ctrl_msg)
ctrl_pub.publish(ctrl_msg) ## Vehicl Control 출력
if self.current_waypoint == (len(self.global_path.poses)-1) :
print("??????????")
count=0
self.current_waypoint=0
scenario_pub.publish(scenarioLoad_msgs)
time.sleep(3)
count+=1
rate.sleep()
def statusCB(self,data): ## Vehicl Status Subscriber
self.status_msg=data
self.is_status=True
def objectInfoCB(self,data): ## Object information Subscriber
self.object_info_msg=data
self.is_obj=True
if __name__ == "__main__":
ctrl=sim_ctrl()