#!/usr/bin/python
######################################################################
# This file copyright the Georgia Institute of Technology
#
# Permission is given to students to use or modify this file (only)
# to work on their assignments.
#
# You may NOT publish this file or make it available to others not in
# the course.
#
######################################################################
import unittest
import random
import math
import time
import traceback
import hashlib
import copy
import string
from typing import List, Dict
import sys
from test_cases import GemFinderPartATestCases, GemFinderPartBTestCases
try:
import gem_finder
studentExc = None
except Exception as e:
studentExc = e
import robot
PI = math.pi
########################################################################
# for debugging set the time limit to a big number
########################################################################
TIME_LIMIT = 5 # seconds
########################################################################
# set to True for lots-o-output, also passed into robot under test
########################################################################
VERBOSE_FLAG = False
########################################################################
# set to True to disable multiprocessing while running in a debugger
########################################################################
DEBUGGING_SINGLE_PROCESS = False
########################################################################
# TODO: you can set NOISE_FLAG to false during development
# but be sure to run and test with noise = True
# before submitting your solution.
########################################################################
NOISE_FLAG = True
NOISE_MOVE = 0.01
if DEBUGGING_SINGLE_PROCESS:
import multiprocessing.dummy as mproc
else:
import multiprocessing as mproc
########################################################################
# used to generate unique ids for landmarks. will change for grader
########################################################################
HASH_SEED = ‘some_seed’
PART_A_CREDIT = 0.40
PART_B_CREDIT = 0.60
# DO NOT REMOVE THESE VARIABLES.
PART_A_SCORE = None
PART_B_SCORE = None
class Submission:
“””Student Submission.
Attributes:
submission_action_plan(Queue): Student score of executed action plan.
submission_error(Queue): Error messages generated during executed action plan.
submission_reported_gem_locations(Queue): log of gem locations reported by the extract action used for grading.
“””
def __init__(self):
# if DEBUGGING_SINGLE_PROCESS:
# import queue
# self.time_left = queue.Queue(1)
# self.submission_action_plan = queue.Queue(1)
# self.submission_error = queue.Queue(1)
# self.submission_reported_gem_locations = queue.Queue(1)
# else:
self.time_left = mproc.Manager().Queue(1)
self.submission_action_plan = mproc.Manager().Queue(1)
self.submission_error = mproc.Manager().Queue(1)
self.submission_reported_gem_locations = mproc.Manager().Queue(1)
def _reset(self):
“””Reset submission results.
“””
while not self.time_left.empty():
self.submission_action_plan.get()
self.time_left.put(True)
while not self.submission_action_plan.empty():
self.submission_action_plan.get()
while not self.submission_error.empty():
self.submission_error.get()
while not self.submission_reported_gem_locations.empty():
self.submission_reported_gem_locations.get()
def execute_student_plan(self, area_map: List[list], gem_checklist: List[str], max_distance: float = 1.0,
max_steering: float = PI / 2. + 0.01, robot_distance_noise: float = 0.05,
robot_bearing_noise: float = 0.02, horizon: float = float(‘inf’)):
“””Execute student plan and store results in submission.
Args:
area_map: the area map to test against.
gem_checklist: the list of gems to extract.
max_distance: maximum distance per move.
max_steering: maximum steering per move.
robot_distance_noise: distance noise to set for Robot.
robot_bearing_noise: bearing noise to set for Robot.
horizon: distance of max measurement
“””
self._reset()
state = State(area_map,
gem_checklist,
max_distance,
max_steering,
measure_distance_noise=robot_distance_noise,
measure_bearing_noise=robot_bearing_noise,
horizon=horizon)
if VERBOSE_FLAG:
print(‘Initial State:’)
print(state)
try:
student_planner = gem_finder.GemExtractionPlanner(max_distance, max_steering)
state_output = ”
time_left = self.time_left.get()
while len(state.collected_gems) < len(gem_checklist) and time_left:
state_output += str(state)
ret = student_planner.next_move(copy.deepcopy(state.gem_checklist),
state.generate_measurements())
if isinstance(ret, str):
action = ret
else:
action, locs = ret
state.update_according_to(action)
if VERBOSE_FLAG:
print(state)
if not self.time_left.empty():
time_left = self.time_left.get()
if VERBOSE_FLAG:
print('Final State:')
print(state)
self.submission_action_plan.put(state.collected_gems)
if not time_left:
error_message = ('Test aborted due to timeout. ' +
f'Test was expected to finish in fewer than {TIME_LIMIT} second(s).')
self.submission_error.put(error_message)
except Exception:
self.submission_error.put(traceback.format_exc())
self.submission_action_plan.put([])
class State:
"""Current State.
Args:
area_map: the area map.
gem_checklist: the list of gems you need to collect
max_distance: the max distance the robot can travel in a single move.
max_steering: the max steering angle the robot can turn in a single move.
measure_distance_noise: Noise of the distance measurement
measure_bearing_noise: Noise of the bearing measurement
horizon: distance of max measurement
Attributes:
gem_checklist: the list of needed gems
collected_gems: gems successfully extracted.
max_distance: max distance the robot can travel in one move.
max_steering: the max steering angle the robot can turn in a single move.
_start_position: location of initial robot placement
"""
EXTRACTION_DISTANCE = 0.15
WAIT_PENALTY = 0.1 # seconds
def __init__(self, area_map: List[list], gem_checklist: List[str] = None, max_distance: float = 1.0,
max_steering: float = PI / 2. + 0.01, measure_distance_noise: float = 0.05,
measure_bearing_noise: float = 0.02, horizon: float = float('inf')):
if gem_checklist is None:
gem_checklist = []
self.gem_checklist = list(gem_checklist)
self.collected_gems = []
self.max_distance = max_distance
self.max_steering = max_steering
self.gem_locs_on_map = []
self.orig_gem_checklist = []
self.horizon = horizon
rows = len(area_map)
cols = len(area_map[0])
self._start_position = dict()
# Now process the interior of the provided map
for i in range(rows):
for j in range(cols):
this_square = area_map[i][j]
x, y = float(j), -float(i)
# Process gems
if this_square in string.ascii_uppercase:
gem = {'id': int(hashlib.md5((str(this_square) + str(random.random())).encode('utf-8')).hexdigest(),
16),
'x': x + 0.5,
'y': y - 0.5,
'type': this_square}
self.gem_locs_on_map.append(gem)
self.orig_gem_checklist.append(gem)
# Process start
elif this_square == '@':
self._start_position['x'] = x + 0.5
self._start_position['y'] = y - 0.5
# initialize the robot at the start position and at a bearing pointing due east
self.robot = robot.Robot(x=self._start_position['x'],
y=self._start_position['y'],
bearing=0.0,
max_distance=self.max_distance,
max_steering=self.max_steering,
measure_distance_noise=measure_distance_noise,
measure_bearing_noise=measure_bearing_noise)
def generate_measurements(self, noise: bool = NOISE_FLAG):
"""Generate measurements of gems on map.
Args:
noise: Move with noise if True.
Default: NOISE_FLAG
Returns:
Measurements to gems in the format:
{'unique gem id':{'distance': 0.0, 'bearing': 0.0, 'type': 'A'}, ...}
"""
measurements = dict()
# process gems
for location in self.gem_locs_on_map:
distance, bearing = self.robot.measure_distance_and_bearing_to((location['x'], location['y']), noise=noise)
if distance < self.horizon:
measurements[location['id']] = {'distance': distance,
'bearing': bearing,
'type': location['type']}
return measurements
def update_according_to(self, action: str, noise: bool = NOISE_FLAG):
"""Update state according to action.
Args:
action: action to execute.
noise: Move with noise if True.
Default: NOISE_FLAG
Raises:
Exception: if improperly formatted action.
"""
action = action.split()
action_type = action[0]
if action_type == 'move':
steering, distance = action[1:]
self._attempt_move(float(steering), float(distance), noise=noise)
elif action_type == 'extract':
try:
gem_type = action[1]
estimate_x = float(action[2])
estimate_y = float(action[3])
self._attempt_extraction(gem_type, estimate_x, estimate_y)
except IndexError:
# improper move format: kill test
raise Exception(f"improperly formatted action: {' '.join(action)}")
else:
# improper move format: kill test
raise Exception(f"improperly formatted action: {' '.join(action)}")
def _attempt_move(self, steering: float, distance: float, noise: bool = NOISE_FLAG):
"""Attempt move action if valid.
The robot may move between 0 and max_distance
The robot may turn between -max_steering and +max_steering
Illegal moves - the robot will not move
- Moving a distance outside of [0,max_distance]
- Steering angle outside [-max_steering, max_steering]
Args:
steering: Angle to turn before moving.
distance: Distance to travel.
Raises:
ValueError: if improperly formatted move destination.
"""
try:
distance_ok = 0.0 <= distance <= self.max_distance
steering_ok = (-self.max_steering) <= steering <= self.max_steering
if noise:
steering += random.gauss(0.0, NOISE_MOVE)
distance += random.gauss(0.0, NOISE_MOVE)
if distance_ok and steering_ok:
self.robot.move(steering, distance, True)
except ValueError:
raise Exception(f"improperly formatted move command : {steering} {distance}")
def _attempt_extraction(self, gem_type: str, estimate_x: float, estimate_y: float):
"""Attempt to extract a gem from the current x,y location.
Extract gem if current location is within EXTRACTION_DISTANCE of specified gem_type.
Otherwise, pause for WAIT_PENALTY
"""
for gem_location in self.gem_locs_on_map:
if gem_location['type'] == gem_type:
robot_distance = robot.compute_distance((self.robot.x, self.robot.y),
(gem_location['x'], gem_location['y']))
translated_x = estimate_x + self._start_position['x']
translated_y = estimate_y + self._start_position['y']
estimate_distance = robot.compute_distance((translated_x, translated_y),
(gem_location['x'], gem_location['y']))
if robot_distance <= self.EXTRACTION_DISTANCE and estimate_distance <= self.EXTRACTION_DISTANCE:
self.collected_gems.append(gem_location)
self.gem_locs_on_map.remove(gem_location)
self.gem_checklist.remove(gem_location['type'])
return
time.sleep(self.WAIT_PENALTY)
if VERBOSE_FLAG:
print(f"*** Location ({self.robot.x}, {self.robot.y}) does not contain a gem type <{gem_type}> within ”
f”the extraction distance.”)
def __repr__(self):
“””Output state object as string.
“””
output = ‘\n’
output += ‘Robot State:\n’
output += f’\t x = {self.robot.x:6.2f}, y = {self.robot.y:6.2f}, hdg = {self.robot.bearing * 180. / PI:6.2f}\n’
output += f’Gems Extracted: {self.collected_gems}\n’
output += f’Remaining Gems Needed: {self.gem_checklist}\n’
return output
class GemFinderTestResult(unittest.TestResult):
def __init__(self, stream=None, descriptions=None, verbosity=None):
super(GemFinderTestResult, self).__init__(stream, verbosity, descriptions)
self.stream = stream
self.credit = []
self.results = []
def stopTest(self, test):
super(GemFinderTestResult, self).stopTest(test)
try:
self.credit.append(test.last_credit)
self.results.append(test.last_result)
self.stream.write(test.last_result + ‘\n’)
except AttributeError as exp:
self.stream.write(str(exp))
@property
def avg_credit(self):
try:
return sum(self.credit) / len(self.credit)
except Exception:
return 0.0
class PartATestCase(unittest.TestCase):
“””Test PartA
“””
results_file = ‘results_partA.txt’
results = [”, ‘PART A TEST CASE RESULTS’]
SCORE_TEMPLATE = “\n”.join((
“\nPart A Test Case {test_case} Results”,
” Expected Location:\t{expected}”,
” SLAM Location:\t{location}”,
” Credit: {score:.0%}”
“\n\n- – – – END OF TEST CASE – – – -\n”,
))
FAIL_TEMPLATE = “\n”.join((
“\nPart A Test Case {test_case} Results”,
” Failed: {message}”,
” Expected Location:\t{expected}”,
” SLAM Location:\t{location}”,
” Credit: 0.0″
“\n\n- – – – END OF TEST CASE – – – -\n”,
))
credit = []
def setUp(self):
self.last_result = ”
self.last_credit = 0.0
if studentExc:
self.last_result = str(studentExc)
raise studentExc
def run_with_params(self, params: Dict):
“””Run test case using desired parameters.
Args:
params: a dictionary of test parameters.
“””
state = State(params[‘area_map’],
measure_distance_noise=params[‘robot_distance_noise’],
measure_bearing_noise=params[‘robot_bearing_noise’])
robot_dist_error = float(‘inf’)
landmark_dist_errors = dict()
state_beliefs = list()
ground_truth = list()
try:
gem_finder_slam = gem_finder.SLAM()
# calculate robot position error
for move in params[‘move’]:
meas = state.generate_measurements()
gem_finder_slam.process_measurements(meas)
action = move.split()
state.update_according_to(move)
belief = gem_finder_slam.process_movement(float(action[1]), float(action[2]))
truth = (state.robot.x – state._start_position[‘x’],
state.robot.y – state._start_position[‘y’])
robot_dist_error = robot.compute_distance(belief, truth)
if VERBOSE_FLAG:
print(“Current Belief:”, belief)
print(“True Position:”, truth)
print(“Error:”, robot_dist_error, “\n”)
state_beliefs.append(belief)
ground_truth.append(truth)
# calculate landmark errors
for landmark in state.orig_gem_checklist:
student_landmark_x, student_landmark_y = gem_finder_slam.get_coordinates_by_landmark_id(landmark[‘id’])
translated_x = student_landmark_x + state._start_position[‘x’]
translated_y = student_landmark_y + state._start_position[‘y’]
landmark_dist_errors[landmark[‘id’]] = robot.compute_distance((translated_x, translated_y),
(landmark[‘x’], landmark[‘y’]))
except Exception as exc:
self.last_result = self.FAIL_TEMPLATE.format(message=traceback.format_exc(),
expected=”exception”,
location=”exception”,
**params)
self.last_credit = 0.0
self.fail(str(exc))
max_robot_score = 0.5
max_landmark_score = 0.5
robot_score = 0.0
landmark_score = 0.0
# calculate score for robot distance error
if robot_dist_error < params['robot_tolerance']:
robot_score += max_robot_score
# calculate score for landmark distance errors
missed_landmarks = list()
for landmark_type, landmark_error in landmark_dist_errors.items():
if landmark_error < params['landmark_tolerance']:
landmark_score += max_landmark_score / len(state.orig_gem_checklist)
else:
missed_landmarks.append({'landmark': landmark_type,
'error': landmark_error})
robot_score = round(robot_score, 5)
landmark_score = round(landmark_score, 5)
total_score = robot_score + landmark_score
if total_score >= 1.0:
result = self.SCORE_TEMPLATE.format(expected=ground_truth,
location=state_beliefs,
score=total_score, **params)
else:
if robot_score < max_robot_score:
robot_message = f"Robot location error {robot_dist_error} is greater than {params['robot_tolerance']}. "
else:
robot_message = ''
if landmark_score < max_landmark_score:
landmark_message = f"A landmark locations are greater than {params['landmark_tolerance']}"
else:
landmark_message = ''
result = self.FAIL_TEMPLATE.format(message=robot_message + landmark_message,
expected=ground_truth,
location=state_beliefs, **params)
self.last_result = result
self.last_credit = total_score
self.assertTrue(robot_score >= max_robot_score,
f”Robot location error {robot_dist_error} is greater than {params[‘robot_tolerance’]}”)
self.assertTrue(landmark_score >= max_landmark_score,
f”A landmark location error is greater than {params[‘landmark_tolerance’]}\n{missed_landmarks}”)
def test_case1(self):
self.run_with_params(GemFinderPartATestCases.test_case_1)
def test_case2(self):
self.run_with_params(GemFinderPartATestCases.test_case_2)
def test_case3(self):
self.run_with_params(GemFinderPartATestCases.test_case_3)
def test_case4(self):
self.run_with_params(GemFinderPartATestCases.test_case_4)
def test_case5(self):
self.run_with_params(GemFinderPartATestCases.test_case_5)
def test_case6(self):
self.run_with_params(GemFinderPartATestCases.test_case_6)
class PartBTestCase(unittest.TestCase):
“”” Test PartB.
“””
results_file = ‘results_partB.txt’
results = [”, ‘PART B TEST CASE RESULTS’]
SCORE_TEMPLATE = “\n”.join((
“\nPart B Test Case {test_case} Results”,
” Needed Gems:\t {needed_gems}”,
” Collected Gems:{collected}”,
” Credit: {score:.0%}”
“\n\n- – – – END OF TEST CASE – – – -\n”,
))
FAIL_TEMPLATE = “\n”.join((
“\nPart B Test Case {test_case} Results”,
” Failed: {message}”,
” Needed Gems:\t {needed_gems}”,
” Collected Gems:{collected}”,
” Credit: {score:.0%}”
“\n\n- – – – END OF TEST CASE – – – -\n”,
))
credit = []
def setUp(self):
“””Initialize test setup.
“””
self.last_result = ”
self.last_credit = 0.0
if studentExc:
self.last_result = str(studentExc)
raise studentExc
self.student_submission = Submission()
def check_results(self, params: Dict, error_message: str):
extracted_gems = 0
# Get number of gems collected
if not self.student_submission.submission_action_plan.empty():
extracted_gems = self.student_submission.submission_action_plan.get()
extracted_gem_types = sorted([g[‘type’] for g in extracted_gems])
extracted_gems_needed = sorted(list(set(extracted_gem_types).intersection(set(params[‘needed_gems’]))))
score = len(extracted_gems_needed) // float(len(params[‘needed_gems’]))
if not self.student_submission.submission_error.empty():
error_message = self.student_submission.submission_error.get()
result = self.FAIL_TEMPLATE.format(message=error_message, collected=extracted_gem_types, score=score, **params)
else:
result = self.SCORE_TEMPLATE.format(collected=extracted_gem_types, score=score, **params)
return result, score, error_message, extracted_gems_needed
def run_with_params(self, params: Dict):
“””Run test case using desired parameters.
Args:
params: a dictionary of test parameters.
“””
sys.stdout.write(f’~ ~ ~ Start of test case # {params[“test_case”]} ~ ~ ~\n\n’)
error_message = ”
if DEBUGGING_SINGLE_PROCESS:
try:
self.student_submission.execute_student_plan(params[‘area_map’],
params[‘needed_gems’],
params[‘max_distance’],
params[‘max_steering’],
params[‘robot_distance_noise’],
params[‘robot_bearing_noise’],
params[‘horizon’])
except Exception as exp:
error_message = exp
result, score, error_message, extracted_gems_needed = self.check_results(params, error_message)
else:
test_process = mproc.Process(target=self.student_submission.execute_student_plan,
args=(params[‘area_map’],
params[‘needed_gems’],
params[‘max_distance’],
params[‘max_steering’],
params[‘robot_distance_noise’],
params[‘robot_bearing_noise’],
params[‘horizon’]))
try:
test_process.start()
test_process.join(TIME_LIMIT)
self.student_submission.time_left.put(False) # notify child process to finish
test_process.join(1) # give child process a second to wrap things up
except Exception as exp:
error_message = exp
# If test still running then terminate
if test_process.is_alive():
test_process.terminate()
extracted_gem_types = []
error_message = (‘Test ended unexpectedly! No extracted gem data available’)
result = self.FAIL_TEMPLATE.format(message=error_message, collected=extracted_gem_types, **params)
score = 0.0
extracted_gems_needed = len(params[‘needed_gems’])
else:
result, score, error_message, extracted_gems_needed = self.check_results(params, error_message)
self.last_result = result
self.last_credit = score
self.assertFalse(error_message, error_message)
self.assertTrue(round(score, 7) == 1.0,
f”Only {len(extracted_gems_needed)} gems were extracted out of {len(params[‘needed_gems’])}”)
def test_case1(self):
self.run_with_params(GemFinderPartBTestCases.test_case_1)
def test_case2(self):
self.run_with_params(GemFinderPartBTestCases.test_case_2)
def test_case3(self):
self.run_with_params(GemFinderPartBTestCases.test_case_3)
def test_case4(self):
self.run_with_params(GemFinderPartBTestCases.test_case_4)
def test_case5(self):
self.run_with_params(GemFinderPartBTestCases.test_case_5)
def test_case6(self):
self.run_with_params(GemFinderPartBTestCases.test_case_6)
def test_case7(self):
self.run_with_params(GemFinderPartBTestCases.test_case_7)
def test_case8(self):
self.run_with_params(GemFinderPartBTestCases.test_case_8)
def test_case9(self):
self.run_with_params(GemFinderPartBTestCases.test_case_9)
def run_all(stream):
suites = map(lambda case: unittest.TestSuite(unittest.TestLoader().loadTestsFromTestCase(case)),
[PartATestCase,
PartBTestCase])
avgs = ()
for suite in suites:
result = GemFinderTestResult(stream=stream)
suite.run(result)
avgs += (result.avg_credit,)
stream.write(‘part A score: %.02f\n’ % (avgs[0] * 100))
stream.write(‘part B score: %.02f\n’ % (avgs[1] * 100))
weights = (PART_A_CREDIT, PART_B_CREDIT)
total_score = round(sum(avgs[i] * weights[i] for i in (0, 1)) * 100)
stream.write(‘score: %.02f\n’ % total_score)
# Only run all of the test automatically if this file was executed from the command line.
# Otherwise, let Nose/py.test do it’s own thing with the test cases.
if __name__ == “__main__”:
student_id = gem_finder.who_am_i()
if student_id:
run_all(sys.stdout)
else:
print(“Student ID not specified. Please fill in ‘whoami’ variable.”)
print(‘score: 0’)