# Adapt New VLMs

You can add your own VLMs to B2DVL. Before doing that, please check the supported list first.

## Supported list
| Model | Key | Single frame, front-cam | Single frame, multi-cam | Single frame, BEV | Multi frame, front-cam | Multi frame, multi-cam | Multi frame, BEV |
|---|---|---|---|---|---|---|---|
| Qwen2.5VL | `Qwen2.5VL` | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| LLaVA_NeXT | `LLaVANeXT` | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Gemma3 | `Gemma` | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| InternVL3 | `InternVL` | ✅ | ❌ | ✅ | ❌ | ❌ | ❌ |
| Janus-Pro | `Janus-Pro` | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| API (OpenAI template) | `api` | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Use the model keys above to select these models. You can also pass `gt` as the model name to use the ground truth directly, but it is usually more convenient to set `MINIMAL=1` instead, since that option does not require setting up a VLM server.
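Internally, a model key is simply looked up in the registry shown later in `register.py`. For illustration only (assuming `B2DVL_Adapter` is importable as a package in your environment):

```python
from B2DVL_Adapter.models.register import get_model_interface

vlm = get_model_interface("Qwen2.5VL")  # any key from the table above
gt = get_model_interface("gt")          # ground truth, mapped to the base VLMInterface
```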
## Add a new VLM

### Implement VLM interface

To add a new VLM, create a Python file under `B2DVL_Adapter/models` that implements the corresponding interface.
Take Qwen2.5VL's interface as an example:
**qwen25.py**

```python
from .VLMInterface import VLMInterface
from .interact_utils import get_image_descriptions, get_carla_image_descriptions
from PIL import Image
from transformers import Qwen2_5_VLForConditionalGeneration, AutoTokenizer, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch


def image_template(image_path, use_base64):
    return {
        "type": "image",
        "image": image_path if use_base64 else f"file://{image_path}"
    }


class Qwen25Interface(VLMInterface):
    def initialize(self, gpu_id: int, use_all_cameras: bool, no_history: bool,
                   input_window: int, frame_rate: int, model_path: str, use_bev: bool = False,
                   in_carla: bool = False, use_base64: bool = False):
        print(f"Initializing Qwen2.5VL on GPU {gpu_id}...")
        # Load model, weights, and allocate resources here
        self.in_carla = in_carla
        self.use_bev = use_bev
        self.use_all_cameras = use_all_cameras
        self.input_window = input_window
        self.no_history = no_history
        self.gpu_id = gpu_id
        self.model_path = model_path
        self.frame_rate = frame_rate
        self.use_base64 = use_base64

        torch.cuda.set_device(self.gpu_id)
        self.device = torch.device(f"cuda:{self.gpu_id}")
        self.multi_image_flag = (self.use_all_cameras or (self.no_history == False and self.input_window > 1))

        self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
            self.model_path, low_cpu_mem_usage=True
        )
        self.model.to(f"cuda:{gpu_id}")
        self.processor = AutoProcessor.from_pretrained(self.model_path)
        print(f"Qwen2.5VL loaded on GPU {gpu_id} successfully")

    def get_image_descriptions(self, images_dict, image_frame_list, start_frame, end_frame):
        """
        Returns content list about descriptions of all images in range (start_frame, end_frame]
        """
        if self.in_carla:
            return get_carla_image_descriptions(images_dict=images_dict,
                                                image_frame_list=image_frame_list,
                                                start_frame=start_frame,
                                                end_frame=end_frame,
                                                frame_rate=self.frame_rate,
                                                template_func=image_template,
                                                use_all_cameras=self.use_all_cameras,
                                                use_bev=self.use_bev,
                                                use_base64=self.use_base64)
        else:
            return get_image_descriptions(images_dict=images_dict,
                                          image_frame_list=image_frame_list,
                                          start_frame=start_frame,
                                          end_frame=end_frame,
                                          frame_rate=self.frame_rate,
                                          template_func=image_template,
                                          use_all_cameras=self.use_all_cameras,
                                          use_base64=self.use_base64)

    def interact(self, bubble, conversation):
        torch.cuda.set_device(self.gpu_id)
        self.device = torch.device(f"cuda:{self.gpu_id}")

        input_conversation = []
        images_list = bubble.get_full_images()
        image_frame_list = sorted(images_list.keys())
        start_frame = bubble.frame_number
        current_frame = bubble.frame_number
        if conversation is not None and len(conversation) > 0:
            start_frame = conversation[0].frame_number
        prev_frame = -1

        # context
        context_str = ""
        if self.no_history == False:
            # all in one context
            context_str = ""
            context_bb = {
                "role": "context",
                "content": []
            }
            for bb in conversation:
                is_user = (bb.actor == "User")
                if is_user and prev_frame < bb.frame_number:
                    image_content, _ = self.get_image_descriptions(images_list, image_frame_list,
                                                                   prev_frame, bb.frame_number)
                    prev_frame = bb.frame_number
                    if context_str is not None and context_str != "":
                        frame_content = {
                            "type": "text",
                            "text": context_str
                        }
                        context_bb['content'].append(frame_content)
                        context_str = ""
                    context_bb['content'].extend(image_content)
                header = "Q" if is_user else "A"
                context_str += f"{header}(frame {bb.frame_number}): {bb.words}\n"
            if context_str is not None and context_str != "":
                frame_content = {
                    "type": "text",
                    "text": context_str
                }
                context_bb['content'].append(frame_content)
                context_str = ""
            input_conversation.append(context_bb)

        bb_dict = {
            "role": "user",
            "content": []
        }
        if prev_frame < current_frame:
            image_content, _ = self.get_image_descriptions(images_list, image_frame_list,
                                                           prev_frame, current_frame)
            prev_frame = current_frame
            bb_dict['content'].extend(image_content)
        bb_dict['content'].append({
            "type": "text",
            "text": f"Q(frame {bubble.frame_number}): {bubble.get_full_words()}"
        })
        input_conversation.append(bb_dict)

        input_image_files = []
        for frame_number in image_frame_list:
            if (frame_number < current_frame and self.no_history == False) or \
               frame_number == current_frame:
                if self.use_all_cameras:
                    for key in images_list[frame_number].keys():
                        if key in ['CAM_FRONT_CONCAT', 'CAM_BACK_CONCAT']:
                            input_image_files.append(images_list[frame_number][key])
                else:
                    input_image_files.append(images_list[frame_number]['CAM_FRONT'])

        prompts = self.processor.apply_chat_template(input_conversation, tokenize=False, add_generation_prompt=True)
        image_inputs, video_inputs = process_vision_info(input_conversation)
        inputs = self.processor(text=[prompts], images=image_inputs, videos=video_inputs,
                                padding=True, return_tensors="pt").to(self.device)

        generated_ids = self.model.generate(**inputs, max_new_tokens=1024)
        generated_ids_trimmed = [
            out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]
        output_text = self.processor.batch_decode(
            generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )

        result = output_text
        if isinstance(result, list):
            result = result[0]
        return result
```
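A new interface follows the same shape: subclass `VLMInterface` and implement `initialize` and `interact`. Below is a minimal sketch for a hypothetical `my_vlm.py`; the file name, class name, and `TODO` parts are placeholders rather than part of B2DVL. Only the method signatures and the `bubble` accessors mirror the Qwen2.5VL example above.

```python
# my_vlm.py -- hypothetical skeleton for a new VLM interface.
from .VLMInterface import VLMInterface
import torch


class MyVLMInterface(VLMInterface):
    def initialize(self, gpu_id: int, use_all_cameras: bool, no_history: bool,
                   input_window: int, frame_rate: int, model_path: str, use_bev: bool = False,
                   in_carla: bool = False, use_base64: bool = False):
        # Store the runtime options the evaluator passes in.
        self.gpu_id = gpu_id
        self.use_all_cameras = use_all_cameras
        self.no_history = no_history
        self.input_window = input_window
        self.frame_rate = frame_rate
        self.model_path = model_path
        self.use_bev = use_bev
        self.in_carla = in_carla
        self.use_base64 = use_base64
        self.device = torch.device(f"cuda:{gpu_id}")
        # TODO: load your model and processor/tokenizer from model_path onto self.device.

    def interact(self, bubble, conversation):
        # `bubble` carries the current question and its images;
        # `conversation` holds the previous Q/A bubbles, if history is enabled.
        question = bubble.get_full_words()
        images = bubble.get_full_images()
        # TODO: build your model's prompt from `question`, `images`, and (optionally)
        # `conversation`, run generation, and return the decoded answer string.
        answer = "..."  # replace with your model's output
        return answer
```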
### Register new VLM

To make your new VLM usable, register your VLM interface in `B2DVL_Adapter/models/register.py`:
**register.py**

```python
MODEL_MAP = {
    "gt": "VLMInterface",
    "LLaVANeXT": "LLaVANeXTInterface",
    "Qwen2.5VL": "Qwen25Interface",
    "api": "VLMAPIInterface",
    "Gemma": "GemmaInterface",
    "Janus-Pro": "JanusProInterface",
    "InternVL": "InternVLInterface"
    # Add other models as you need
}


def get_model_interface(model_name):
    """
    Retrieve the appropriate model interface class based on the model name.
    :param model_name: Name of the model.
    :return: An instance of the corresponding model interface.
    """
    if model_name not in MODEL_MAP:
        raise ValueError(f"Model {model_name} is not supported. Available models: {list(MODEL_MAP.keys())}")

    class_name = MODEL_MAP[model_name]

    # Lazy import based on model_name
    if model_name == "gt":
        from .VLMInterface import VLMInterface
        return VLMInterface()
    elif model_name == "LLaVANeXT":
        from .LLaVA_NeXT import LLaVANeXTInterface
        return LLaVANeXTInterface()
    elif model_name == "Qwen2.5VL":
        from .qwen25 import Qwen25Interface
        return Qwen25Interface()
    elif model_name == "api":
        from .vlm_api import VLMAPIInterface
        return VLMAPIInterface()
    elif model_name == "Gemma":
        from .gemma import GemmaInterface
        return GemmaInterface()
    elif model_name == "Janus-Pro":
        from .janus import JanusProInterface
        return JanusProInterface()
    elif model_name == "InternVL":
        from .intern import InternVLInterface
        return InternVLInterface()
    # Add new interfaces as you need
```
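For the hypothetical `MyVLMInterface` sketched earlier, registration consists of two small edits to `register.py`, shown below in isolation. The key `"MyVLM"` and module name `my_vlm` are placeholder names.

```python
# 1) Map the new model key to its interface class name.
MODEL_MAP = {
    # ... keep the existing entries ...
    "MyVLM": "MyVLMInterface",
}

# 2) Add a lazy-import branch in get_model_interface.
def get_model_interface(model_name):
    # ... keep the existing checks and branches ...
    if model_name == "MyVLM":
        from .my_vlm import MyVLMInterface
        return MyVLMInterface()
```

After that, the new key can be used anywhere a model name from the supported list is accepted.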