Self-Driving Cars with Carla and Python, Part 4: The Reinforcement Learning Agent
Welcome to part 4 of the self-driving cars with Carla, Python, TensorFlow, and reinforcement learning project. In this part, we're going to code the actual agent. In the previous tutorial, we built the environment class that our agent will interact with.
When thinking about how to create the agent, we also have to think about the model itself. Assuming you have been through the reinforcement learning tutorials (you really should have, or things are going to get confusing), you know that every step the agent takes comes with a prediction (depending on exploration/epsilon), and also with a fitment. This means we are training and predicting at the same time, yet our agent needs to get the highest FPS (frames per second) it possibly can.
To pull this off, we can use multiprocessing or threading. Threading lets us keep things fairly simple, so that's what we'll use here. Later, I may release some multiprocessing code for this task, but for now it's threaded.
To begin, here's the environment code we ended up with in the previous tutorial:

```python
import glob
import os
import sys
import random
import time
import numpy as np
import cv2
import math

try:
    sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
        sys.version_info.major,
        sys.version_info.minor,
        'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
    pass
import carla

SHOW_PREVIEW = False
IM_WIDTH = 640
IM_HEIGHT = 480
SECONDS_PER_EPISODE = 10


class CarEnv:
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IM_WIDTH
    im_height = IM_HEIGHT
    front_camera = None

    def __init__(self):
        self.client = carla.Client("localhost", 2000)
        self.client.set_timeout(2.0)
        self.world = self.client.get_world()
        self.blueprint_library = self.world.get_blueprint_library()
        self.model_3 = self.blueprint_library.filter("model3")[0]

    def reset(self):
        self.collision_hist = []
        self.actor_list = []

        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.blueprint_library.find('sensor.camera.rgb')
        self.rgb_cam.set_attribute("image_size_x", f"{self.im_width}")
        self.rgb_cam.set_attribute("image_size_y", f"{self.im_height}")
        self.rgb_cam.set_attribute("fov", f"110")

        transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))
        time.sleep(4)

        colsensor = self.blueprint_library.find("sensor.other.collision")
        self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))

        while self.front_camera is None:
            time.sleep(0.01)

        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))

        return self.front_camera

    def collision_data(self, event):
        self.collision_hist.append(event)

    def process_img(self, image):
        i = np.array(image.raw_data)
        #print(i.shape)
        i2 = i.reshape((self.im_height, self.im_width, 4))
        i3 = i2[:, :, :3]
        if self.SHOW_CAM:
            cv2.imshow("", i3)
            cv2.waitKey(1)
        self.front_camera = i3

    def step(self, action):
        if action == 0:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
        elif action == 1:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
        elif action == 2:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))

        v = self.vehicle.get_velocity()
        kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))

        if len(self.collision_hist) != 0:
            done = True
            reward = -200
        elif kmh < 50:
            done = False
            reward = -1
        else:
            done = False
            reward = 1

        if self.episode_start + SECONDS_PER_EPISODE < time.time():
            done = True

        return self.front_camera, reward, done, None
```

Now we're going to create a new class: the DQNAgent. We'll start with:
```python
class DQNAgent:
    def __init__(self):
        self.model = self.create_model()
        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())
```

This is the same concept as in the reinforcement learning tutorial: we have a main network that is constantly evolving, and then a target network that we update every n somethings, where n is whatever you want it to be, like steps or episodes.
When we train, we train on randomly selected data from our replay memory:
```python
        self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)
```

For the same reasons as before (the RL tutorial), we'll be using a modified TensorBoard:
```python
        self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}")
```

ModifiedTensorBoard is the custom Keras TensorBoard subclass from the RL series, used so we don't get a new log file for every single .fit() call.
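If you don't have that class handy from the RL series, here is a sketch of it for reference. It assumes Keras 2.x on a TensorFlow 1.x backend, matching the rest of this code (it relies on the callback's internal `_write_logs` method, which newer versions removed), so treat it as a reference rather than guaranteed-current code:

```python
import tensorflow as tf
from keras.callbacks import TensorBoard


class ModifiedTensorBoard(TensorBoard):

    # Keep one writer and our own step counter for the whole run, instead of
    # letting Keras start a new log file on every .fit() call.
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.step = 1
        self.writer = tf.summary.FileWriter(self.log_dir)

    # Stop Keras from creating a default writer per model.
    def set_model(self, model):
        pass

    # Log the metrics we are given against our own step counter.
    def on_epoch_end(self, epoch, logs=None):
        self.update_stats(**logs)

    # We fit on single batches, so skip per-batch logging.
    def on_batch_end(self, batch, logs=None):
        pass

    # Don't close the writer when a .fit() call ends.
    def on_train_end(self, _):
        pass

    def update_stats(self, **stats):
        self._write_logs(stats, self.step)
```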
Now we can wrap up the __init__ method by setting a few final values:

```python
        self.target_update_counter = 0  # will track when it's time to update the target model

        self.graph = tf.get_default_graph()

        self.terminate = False  # should we quit?
        self.last_logged_episode = 0
        self.training_initialized = False  # waiting for TF to get rolling
```

We'll use self.training_initialized to track when TensorFlow is ready to go. A model's very first predictions/fitments take extra time, so we'll pass some nonsense information through first to get the model truly up and running.
Now let's create our model:
```python
    def create_model(self):
        base_model = Xception(weights=None, include_top=False, input_shape=(IM_HEIGHT, IM_WIDTH, 3))

        x = base_model.output
        x = GlobalAveragePooling2D()(x)

        predictions = Dense(3, activation="linear")(x)
        model = Model(inputs=base_model.input, outputs=predictions)
        model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"])
        return model
```

Here we're using the pre-made Xception model, but you could build some other model or import a different one entirely. Note that we add GlobalAveragePooling to the output, along with a 3-neuron linear output layer, one neuron per possible action the agent can take. A smaller hand-rolled network is also an option, sketched below.
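As an example of "some other model", here is a hedged sketch of a much smaller CNN that could be swapped in if Xception is too heavy for your hardware. The layer sizes here are made up for illustration and are not part of this tutorial's code:

```python
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from keras.optimizers import Adam

def create_small_model():
    # Hypothetical drop-in replacement for create_model(); same input shape,
    # same 3 linear Q-value outputs, far fewer parameters than Xception.
    model = Sequential([
        Conv2D(64, (3, 3), activation="relu", input_shape=(IM_HEIGHT, IM_WIDTH, 3)),
        MaxPooling2D((2, 2)),
        Conv2D(64, (3, 3), activation="relu"),
        MaxPooling2D((2, 2)),
        Flatten(),
        Dense(64, activation="relu"),
        Dense(3, activation="linear"),  # one Q value per action
    ])
    model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"])
    return model
```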
Let's make the imports this method requires:

```python
from keras.applications.xception import Xception
from keras.layers import Dense, GlobalAveragePooling2D
from keras.optimizers import Adam
from keras.models import Model
```

In our DQNAgent, we also need a quick method for updating the replay memory:
```python
    def update_replay_memory(self, transition):
        # transition = (current_state, action, reward, new_state, done)
        self.replay_memory.append(transition)
```

As long as the method stays this simple, we arguably don't even need a method for it at all, but I'll leave it in for now.
Now for the training method. To start, we only want to train if we have a bare minimum of samples in replay memory:
```python
    def train(self):
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return
```

Since we have a bunch of constants like this, let's go ahead and knock them all out at once before I forget:
```python
REPLAY_MEMORY_SIZE = 5_000       # how many recent transitions to keep
MIN_REPLAY_MEMORY_SIZE = 1_000   # minimum transitions before training starts
MINIBATCH_SIZE = 16
PREDICTION_BATCH_SIZE = 1
TRAINING_BATCH_SIZE = MINIBATCH_SIZE // 4
UPDATE_TARGET_EVERY = 5          # episodes between target-network updates
MODEL_NAME = "Xception"

MEMORY_FRACTION = 0.8            # fraction of GPU memory to allocate
MIN_REWARD = -200

EPISODES = 100

DISCOUNT = 0.99
epsilon = 1
EPSILON_DECAY = 0.95 ## 0.9975 99975
MIN_EPSILON = 0.001

AGGREGATE_STATS_EVERY = 10       # episodes between aggregated stats
```

Put these at the top of the script, outside of all classes, functions, and methods. If at any point you're unsure where something goes, check the full code at the end of this tutorial.
Back to our train method:
```python
    def train(self):
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return
```

If we don't have enough samples, we just return and are done. If we do, we begin training. First, we need to grab a random minibatch:
```python
        minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)
```

Once we have our minibatch, we want to grab our current and future Q values:
```python
        current_states = np.array([transition[0] for transition in minibatch])/255
        with self.graph.as_default():
            current_qs_list = self.model.predict(current_states, PREDICTION_BATCH_SIZE)

        new_current_states = np.array([transition[3] for transition in minibatch])/255
        with self.graph.as_default():
            future_qs_list = self.target_model.predict(new_current_states, PREDICTION_BATCH_SIZE)
```

Recall that a transition is: transition = (current_state, action, reward, new_state, done)
Now we create our inputs (X) and outputs (y):
```python
        X = []
        y = []

        for index, (current_state, action, reward, new_state, done) in enumerate(minibatch):
            if not done:
                max_future_q = np.max(future_qs_list[index])
                new_q = reward + DISCOUNT * max_future_q
            else:
                new_q = reward

            current_qs = current_qs_list[index]
            current_qs[action] = new_q

            X.append(current_state)
            y.append(current_qs)
```

Nothing fancy here; it's almost identical to our reinforcement learning (DQN) tutorial.
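For reference, the loop above is computing the standard DQN target for each sampled transition:

$$
y = \begin{cases} r, & \text{if the transition is terminal} \\ r + \gamma \, \max_{a'} Q_{\text{target}}(s', a'), & \text{otherwise} \end{cases}
$$

where $\gamma$ is DISCOUNT (0.99), $s'$ is new_state, and $Q_{\text{target}}$ is the target network's prediction. The updated value is written only into the slot for the action actually taken, so only that action produces an error signal when we fit.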
```python
        log_this_step = False
        if self.tensorboard.step > self.last_logged_episode:
            log_this_step = True
            self.last_logged_episode = self.tensorboard.step
```

We only want to log once per episode, not once per actual training step, so we use the above to keep track of that.
Next, we fit:
```python
        with self.graph.as_default():
            self.model.fit(np.array(X)/255, np.array(y), batch_size=TRAINING_BATCH_SIZE, verbose=0, shuffle=False, callbacks=[self.tensorboard] if log_this_step else None)
```

Notice that we only set the TensorBoard callback if log_this_step is true. If it's false, we still fit; we just won't log to TensorBoard.
Next, we want to keep track of things for logging:
```python
        if log_this_step:
            self.target_update_counter += 1
```

Finally, we check whether it's time to update our target_model:
```python
        if self.target_update_counter > UPDATE_TARGET_EVERY:
            self.target_model.set_weights(self.model.get_weights())
            self.target_update_counter = 0
```

Now we just need two more methods and we're done with the agent class. First, we need a method to get the Q values (basically, to make a prediction):
```python
    def get_qs(self, state):
        return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]
```

This is what the episode loop will call when deciding on actions; a rough sketch of that usage follows.
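Here is a hedged sketch of how get_qs might be used with epsilon-greedy exploration inside the episode loop. The real loop is built in the next part, and `env`, `agent`, and `current_state` are just the names used elsewhere in this series, so treat this as an illustration rather than final code:

```python
# Hypothetical episode loop showing where get_qs fits in.
current_state = env.reset()
done = False

while not done:
    if np.random.random() > epsilon:
        # Exploit: take the action with the highest predicted Q value.
        action = np.argmax(agent.get_qs(current_state))
    else:
        # Explore: take one of the 3 actions (left, straight, right) at random.
        action = np.random.randint(0, 3)

    new_state, reward, done, _ = env.step(action)
    agent.update_replay_memory((current_state, action, reward, new_state, done))
    current_state = new_state
```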
Finally, we just need to handle the actual training:

```python
    def train_in_loop(self):
        X = np.random.uniform(size=(1, IM_HEIGHT, IM_WIDTH, 3)).astype(np.float32)
        y = np.random.uniform(size=(1, 3)).astype(np.float32)
        with self.graph.as_default():
            self.model.fit(X, y, verbose=False, batch_size=1)

        self.training_initialized = True
```

First we fit on some random data, as above, to initialize the model; then we begin our infinite loop:
```python
        while True:
            if self.terminate:
                return
            self.train()
            time.sleep(0.01)
```

This loop just keeps training (whenever there is enough replay memory) until we tell it to terminate, sleeping briefly between passes. As discussed at the start, it's meant to run on its own thread.
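Since the actual wiring happens in the next tutorial, here is only a minimal sketch of how train_in_loop might be launched on a background thread, under the assumption that the main thread keeps stepping the environment and predicting:

```python
from threading import Thread

# Hypothetical wiring -- the real version comes in the next part.
agent = DQNAgent()

# Run training in a daemon thread so the main thread can keep a high FPS.
trainer_thread = Thread(target=agent.train_in_loop, daemon=True)
trainer_thread.start()

# Wait for the dummy fit inside train_in_loop to finish, so the first real
# prediction isn't hit by TensorFlow's startup cost.
while not agent.training_initialized:
    time.sleep(0.01)
```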
That's everything for our DQNAgent class. Here is the full code up to this point:

```python
import glob
import os
import sys
import random
import time
import numpy as np
import cv2
import math
from collections import deque
import tensorflow as tf   # used for tf.get_default_graph()
from keras.applications.xception import Xception
from keras.layers import Dense, GlobalAveragePooling2D
from keras.optimizers import Adam
from keras.models import Model

# ModifiedTensorBoard (see the sketch earlier in this tutorial) must also be
# defined in, or imported into, this script.

try:
    sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
        sys.version_info.major,
        sys.version_info.minor,
        'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
    pass
import carla

SHOW_PREVIEW = False
IM_WIDTH = 640
IM_HEIGHT = 480
SECONDS_PER_EPISODE = 10
REPLAY_MEMORY_SIZE = 5_000
MIN_REPLAY_MEMORY_SIZE = 1_000
MINIBATCH_SIZE = 16
PREDICTION_BATCH_SIZE = 1
TRAINING_BATCH_SIZE = MINIBATCH_SIZE // 4
UPDATE_TARGET_EVERY = 5
MODEL_NAME = "Xception"

MEMORY_FRACTION = 0.8
MIN_REWARD = -200

EPISODES = 100

DISCOUNT = 0.99
epsilon = 1
EPSILON_DECAY = 0.95 ## 0.9975 99975
MIN_EPSILON = 0.001

AGGREGATE_STATS_EVERY = 10


class CarEnv:
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IM_WIDTH
    im_height = IM_HEIGHT
    front_camera = None

    def __init__(self):
        self.client = carla.Client("localhost", 2000)
        self.client.set_timeout(2.0)
        self.world = self.client.get_world()
        self.blueprint_library = self.world.get_blueprint_library()
        self.model_3 = self.blueprint_library.filter("model3")[0]

    def reset(self):
        self.collision_hist = []
        self.actor_list = []

        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.blueprint_library.find('sensor.camera.rgb')
        self.rgb_cam.set_attribute("image_size_x", f"{self.im_width}")
        self.rgb_cam.set_attribute("image_size_y", f"{self.im_height}")
        self.rgb_cam.set_attribute("fov", f"110")

        transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))
        time.sleep(4)

        colsensor = self.blueprint_library.find("sensor.other.collision")
        self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))

        while self.front_camera is None:
            time.sleep(0.01)

        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))

        return self.front_camera

    def collision_data(self, event):
        self.collision_hist.append(event)

    def process_img(self, image):
        i = np.array(image.raw_data)
        #print(i.shape)
        i2 = i.reshape((self.im_height, self.im_width, 4))
        i3 = i2[:, :, :3]
        if self.SHOW_CAM:
            cv2.imshow("", i3)
            cv2.waitKey(1)
        self.front_camera = i3

    def step(self, action):
        if action == 0:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
        elif action == 1:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
        elif action == 2:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))

        v = self.vehicle.get_velocity()
        kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))

        if len(self.collision_hist) != 0:
            done = True
            reward = -200
        elif kmh < 50:
            done = False
            reward = -1
        else:
            done = False
            reward = 1

        if self.episode_start + SECONDS_PER_EPISODE < time.time():
            done = True

        return self.front_camera, reward, done, None


class DQNAgent:
    def __init__(self):
        self.model = self.create_model()
        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())

        self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)

        self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}")
        self.target_update_counter = 0
        self.graph = tf.get_default_graph()

        self.terminate = False
        self.last_logged_episode = 0
        self.training_initialized = False

    def create_model(self):
        base_model = Xception(weights=None, include_top=False, input_shape=(IM_HEIGHT, IM_WIDTH, 3))

        x = base_model.output
        x = GlobalAveragePooling2D()(x)

        predictions = Dense(3, activation="linear")(x)
        model = Model(inputs=base_model.input, outputs=predictions)
        model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"])
        return model

    def update_replay_memory(self, transition):
        # transition = (current_state, action, reward, new_state, done)
        self.replay_memory.append(transition)

    def train(self):
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return

        minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)

        current_states = np.array([transition[0] for transition in minibatch])/255
        with self.graph.as_default():
            current_qs_list = self.model.predict(current_states, PREDICTION_BATCH_SIZE)

        new_current_states = np.array([transition[3] for transition in minibatch])/255
        with self.graph.as_default():
            future_qs_list = self.target_model.predict(new_current_states, PREDICTION_BATCH_SIZE)

        X = []
        y = []

        for index, (current_state, action, reward, new_state, done) in enumerate(minibatch):
            if not done:
                max_future_q = np.max(future_qs_list[index])
                new_q = reward + DISCOUNT * max_future_q
            else:
                new_q = reward

            current_qs = current_qs_list[index]
            current_qs[action] = new_q

            X.append(current_state)
            y.append(current_qs)

        log_this_step = False
        if self.tensorboard.step > self.last_logged_episode:
            log_this_step = True
            self.last_logged_episode = self.tensorboard.step

        with self.graph.as_default():
            self.model.fit(np.array(X)/255, np.array(y), batch_size=TRAINING_BATCH_SIZE, verbose=0, shuffle=False, callbacks=[self.tensorboard] if log_this_step else None)

        if log_this_step:
            self.target_update_counter += 1

        if self.target_update_counter > UPDATE_TARGET_EVERY:
            self.target_model.set_weights(self.model.get_weights())
            self.target_update_counter = 0

    def get_qs(self, state):
        return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]

    def train_in_loop(self):
        X = np.random.uniform(size=(1, IM_HEIGHT, IM_WIDTH, 3)).astype(np.float32)
        y = np.random.uniform(size=(1, 3)).astype(np.float32)
        with self.graph.as_default():
            self.model.fit(X, y, verbose=False, batch_size=1)

        self.training_initialized = True

        while True:
            if self.terminate:
                return
            self.train()
            time.sleep(0.01)
```

Now, in the next tutorial, we'll put the finishing touches on everything and actually run it to see what we get!
總結
以上是生活随笔為你收集整理的使用 Carla 和 Python 的自动驾驶汽车第 4 部分 —— 强化学习代理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: LeetCode Algorithm 3
- 下一篇: 使用 Carla 和 Python 的自