Source code for lightrft.datasets.omnirewardbench
import os
import copy
from typing import List, Dict, Any, Tuple
from loguru import logger
from .utils import BaseDataHandler, get_task_instructions
class OmniRewardBenchT2IHandler(BaseDataHandler):
    """
    Data Handler for OmniRewardBench text-to-image human preferences benchmark.
    Process for scalar reward model training of pairwise-ranking task.

    Each sample yields two independent conversations (one per candidate image)
    plus a metadata dict carrying the human preference label.

    Paper: https://huggingface.co/papers/2510.23451
    Dataset Repo: https://huggingface.co/datasets/HongbangYuan/OmniRewardBench
    """
    task_type = "text-to-image"

    def load_data(self, path: str) -> List[Dict[str, Any]]:
        """
        Loads data from parquet file.

        Every row of the table becomes one dict keyed by column name. The
        directory two levels above ``path`` is attached to each row as
        ``data_root`` so that media paths can be resolved later.

        :param path: Path to the parquet file
        :type path: str
        :return: List of samples with 'data_root' attached
        :rtype: List[Dict[str, Any]]

        **Example:**

        .. code-block:: python

            handler = OmniRewardBenchT2IHandler()
            data = handler.load_data("path/to/OmniRewardBench/data.parquet")
        """
        # Imported lazily so pyarrow is only required when parquet data is actually loaded.
        import pyarrow.parquet as pq

        data_table = pq.read_table(path)
        # One dict per row, values converted to plain Python objects.
        raw_data = data_table.to_pylist()

        data_root = os.path.dirname(os.path.dirname(path))
        for item in raw_data:
            item['data_root'] = data_root

        logger.info(f"Loaded {len(raw_data)} samples from {path}")
        return raw_data

    def get_media_info(self, item: Dict[str, Any]) -> Dict[str, Dict[str, str]]:
        """
        Extract media info (paths) for the two images.

        Paths are built as ``<data_root>/media_data/<response path>`` from the
        item's 'response1_path' and 'response2_path' fields.

        :param item: A data item from load_data
        :type item: Dict[str, Any]
        :return: Dict containing local paths for 'image1' and 'image2'
        :rtype: Dict[str, Dict[str, str]]
        :raises ValueError: If 'data_root' is absent or empty in the item

        **Example:**

        .. code-block:: python

            info = handler.get_media_info(item)
        """
        # .get() so that an absent key reaches the ValueError below instead of a bare KeyError.
        data_root = item.get('data_root')
        if not data_root:
            raise ValueError("Missing 'data_root' in item. Cannot resolve image paths.")

        full_path1 = os.path.join(data_root, "media_data", item['response1_path'])
        full_path2 = os.path.join(data_root, "media_data", item['response2_path'])
        return {'image1': {'image_local_path': full_path1}, 'image2': {'image_local_path': full_path2}}

    def _get_label(self, choice: str) -> str:
        """
        Helper to determine preference label.

        :param choice: Raw preference value of the item
        :type choice: str
        :return: "A" for "response1", "B" for "response2", "C" (tie) for anything else
        :rtype: str
        """
        if choice == "response1":
            return "A"
        if choice == "response2":
            return "B"
        return "C"  # TIE

    def parse_item(self, item: Dict[str, Any], media_content: Dict[str, Any],
                   config: Dict[str, Any]) -> Tuple[List[Dict], List[Dict], Dict]:
        """
        Parse a data item from OmniRewardBench-T2I into messages and metadata.

        :param item: The raw data item
        :type item: Dict[str, Any]
        :param media_content: Loaded media content with 'image1' and 'image2' keys.
        :type media_content: Dict[str, Any]
        :param config: Configuration providing 'task_instruction' (a template with a
            ``{prompt}`` placeholder) and 'max_pixels'
        :type config: Dict[str, Any]
        :return: A tuple of (messages0, messages1, metadata)
        :rtype: Tuple[List[Dict], List[Dict], Dict]
        :raises ValueError: If either image is missing or falsy

        **Example:**

        .. code-block:: python

            msg0, msg1, other = handler.parse_item(item, media_content, config)
        """
        image1 = media_content['image1']
        image2 = media_content['image2']
        if not all([image1, image2]):
            raise ValueError("Missing visual content for 'image1' or 'image2'.")

        # Get generation prompt from data item
        gen_prompt = item["prompt"]

        # Fill the generation prompt into the system-prompt template from config
        task_instruction = config["task_instruction"].format(prompt=gen_prompt)

        # Get max_pixels from config
        max_pixels = config["max_pixels"]

        def build_messages(image: Any) -> List[Dict]:
            # One conversation per candidate image. The criteria text is not added
            # to the prompt here; it is only passed through in the metadata.
            # The instruction is a str, so both conversations can share it without copying.
            return [
                {
                    "role": "system",
                    "content": task_instruction
                },
                {
                    "role": "user",
                    "content": [{
                        "type": "image",
                        "image": image,
                        "max_pixels": max_pixels
                    }]
                },
            ]

        messages0 = build_messages(image1)
        messages1 = build_messages(image2)

        # Map the item's 'criteria_preference' onto the A / B / C (tie) label
        pref_label = self._get_label(item["criteria_preference"])

        other = {
            "preference": pref_label,
            "task_type": self.task_type,
            "criteria": item["criteria"],
            "criteria_preference": item["criteria_preference"],
            "id": item["id"],
            "prompt": gen_prompt,
            "source": item['source'],
            "image1_path": item['response1_path'],
            "image2_path": item['response2_path'],
            "model1": item['model1'],
            "model2": item['model2'],
        }
        return messages0, messages1, other
class OmniRewardBenchT2VHandler(OmniRewardBenchT2IHandler):
    """
    Data Handler for OmniRewardBench text-to-video human preferences benchmark.
    Process for scalar reward model training of pairwise-ranking task.

    Each sample yields two independent conversations (one per candidate video)
    plus a metadata dict carrying the human preference label.

    Paper: https://huggingface.co/papers/2510.23451
    Dataset Repo: https://huggingface.co/datasets/HongbangYuan/OmniRewardBench
    """
    task_type = "text-to-video"

    def get_media_info(self, item: Dict[str, Any]) -> Dict[str, Dict[str, str]]:
        """
        Extract media info (paths) for the two videos.

        Paths are built as ``<data_root>/media_data/<response>`` from the item's
        'response1' and 'response2' fields (note: not 'response*_path' as in the
        image handler).

        :param item: A data item from load_data
        :type item: Dict[str, Any]
        :return: Dict containing local paths for 'video1' and 'video2'
        :rtype: Dict[str, Dict[str, str]]
        :raises ValueError: If 'data_root' is absent or empty in the item

        **Example:**

        .. code-block:: python

            info = handler.get_media_info(item)
        """
        # .get() so that an absent key reaches the ValueError below instead of a bare KeyError.
        data_root = item.get('data_root')
        if not data_root:
            raise ValueError("Missing 'data_root' in item. Cannot resolve video paths.")

        full_path1 = os.path.join(data_root, "media_data", item['response1'])
        full_path2 = os.path.join(data_root, "media_data", item['response2'])
        return {'video1': {'video_local_path': full_path1}, 'video2': {'video_local_path': full_path2}}

    def parse_item(self, item: Dict[str, Any], media_content: Dict[str, Any],
                   config: Dict[str, Any]) -> Tuple[List[Dict], List[Dict], Dict]:
        """
        Parse a data item from OmniRewardBench-T2V into messages and metadata.

        :param item: The raw data item
        :type item: Dict[str, Any]
        :param media_content: Loaded media content with 'video1' and 'video2' keys
        :type media_content: Dict[str, Any]
        :param config: Configuration providing 'task_instruction' (a template with a
            ``{prompt}`` placeholder), 'max_pixels' and 'video_fps'
        :type config: Dict[str, Any]
        :return: A tuple of (messages0, messages1, metadata)
        :rtype: Tuple[List[Dict], List[Dict], Dict]
        :raises ValueError: If either video is missing or falsy

        **Example:**

        .. code-block:: python

            msg0, msg1, other = handler.parse_item(item, media_content, config)
        """
        video1 = media_content['video1']
        video2 = media_content['video2']
        if not all([video1, video2]):
            raise ValueError("Missing visual content for 'video1' or 'video2'.")

        # Get generation prompt from data item
        gen_prompt = item["prompt"]

        # Fill the generation prompt into the system-prompt template from config
        task_instruction = config["task_instruction"].format(prompt=gen_prompt)

        # Get max_pixels from config
        max_pixels = config["max_pixels"]

        # Get FPS from config
        fps = config["video_fps"]

        def build_messages(video: Any) -> List[Dict]:
            # One conversation per candidate video. The instruction is a str,
            # so both conversations can share it without copying.
            return [{
                "role": "system",
                "content": task_instruction
            }, {
                "role": "user",
                "content": [{
                    "type": "text",
                    "text": "Please evaluate the following video based on the given task instruction."
                }, {
                    "type": "video",
                    "video": video,
                    "fps": fps,
                    "max_pixels": max_pixels
                }]
            }]

        messages0 = build_messages(video1)
        messages1 = build_messages(video2)

        # Map the item's 'criteria_preference' onto the preference label
        pref_label = self._get_label(item["criteria_preference"])

        other = {
            "preference": pref_label,
            "task_type": self.task_type,
            "criteria": item["criteria"],
            "criteria_preference": item["criteria_preference"],
            "id": item["id"],
            "prompt": gen_prompt,
            "source": item['source'],
            "video1_path": item['response1'],
            "video2_path": item['response2'],
            "model1": item['model1'],
            "model2": item['model2'],
        }
        return messages0, messages1, other
class OmniRewardBenchT2AHandler(OmniRewardBenchT2IHandler):
    """
    Data Handler for OmniRewardBench text-to-audio human preferences benchmark.
    Process for scalar reward model training of pairwise-ranking task.

    Each sample yields two independent conversations (one per candidate audio)
    plus a metadata dict carrying the human preference label.

    Paper: https://huggingface.co/papers/2510.23451
    Dataset Repo: https://huggingface.co/datasets/HongbangYuan/OmniRewardBench
    """
    task_type = "text-to-audio"

    def get_media_info(self, item: Dict[str, Any]) -> Dict[str, Dict[str, str]]:
        """
        Extract media info (paths) for the two audios.

        Paths are built as ``<data_root>/media_data/<response path>`` from the
        item's 'response1_path' and 'response2_path' fields.

        :param item: A data item from load_data
        :type item: Dict[str, Any]
        :return: Dict containing local paths for 'audio1' and 'audio2'
        :rtype: Dict[str, Dict[str, str]]
        :raises ValueError: If 'data_root' is absent or empty in the item

        **Example:**

        .. code-block:: python

            info = handler.get_media_info(item)
        """
        # .get() so that an absent key reaches the ValueError below instead of a bare KeyError.
        data_root = item.get('data_root')
        if not data_root:
            raise ValueError("Missing 'data_root' in item. Cannot resolve audio paths.")

        full_path1 = os.path.join(data_root, "media_data", item['response1_path'])
        full_path2 = os.path.join(data_root, "media_data", item['response2_path'])
        return {'audio1': {'audio_local_path': full_path1}, 'audio2': {'audio_local_path': full_path2}}

    def parse_item(self, item: Dict[str, Any], media_content: Dict[str, Any],
                   config: Dict[str, Any]) -> Tuple[List[Dict], List[Dict], Dict]:
        """
        Parse a data item from OmniRewardBench-T2A into messages and metadata.

        :param item: The raw data item
        :type item: Dict[str, Any]
        :param media_content: Loaded audio content with 'audio1' and 'audio2' keys
        :type media_content: Dict[str, Any]
        :param config: Configuration providing 'task_instruction' (a template with a
            ``{prompt}`` placeholder)
        :type config: Dict[str, Any]
        :return: A tuple of (messages0, messages1, metadata)
        :rtype: Tuple[List[Dict], List[Dict], Dict]
        :raises ValueError: If either audio is missing or falsy

        **Example:**

        .. code-block:: python

            msg0, msg1, other = handler.parse_item(item, media_content, config)
        """
        audio1 = media_content['audio1']
        audio2 = media_content['audio2']
        if not all([audio1, audio2]):
            raise ValueError("Missing audio content for 'audio1' or 'audio2'.")

        # Get generation prompt from data item
        gen_prompt = item["prompt"]

        # Fill the generation prompt into the system-prompt template from config
        task_instruction = config["task_instruction"].format(prompt=gen_prompt)

        def build_messages(audio: Any) -> List[Dict]:
            # One conversation per candidate audio. The instruction is a str,
            # so both conversations can share it without copying.
            return [{
                "role": "system",
                "content": task_instruction
            }, {
                "role": "user",
                "content": [{
                    "type": "text",
                    "text": "Please evaluate the following audio based on the given task instruction."
                }, {
                    "type": "audio",
                    "audio": audio
                }]
            }]

        messages0 = build_messages(audio1)
        messages1 = build_messages(audio2)

        # Map the item's 'criteria_preference' onto the preference label
        pref_label = self._get_label(item["criteria_preference"])

        other = {
            "preference": pref_label,
            "task_type": self.task_type,
            "criteria": item["criteria"],
            "criteria_preference": item["criteria_preference"],
            "id": item["id"],
            "prompt": gen_prompt,
            "source": item['source'],
            "audio1_path": item['response1_path'],
            "audio2_path": item['response2_path'],
            "model1": item['model1'],
            "model2": item['model2'],
        }
        return messages0, messages1, other
class OmniRewardBenchT2IGRMHandler(OmniRewardBenchT2IHandler):
    """
    Data Handler for OmniRewardBench text-to-image human preferences benchmark.
    Process for generative reward model training of pair-wise ranking task.

    Unlike the scalar handler it derives from, ``parse_item`` here puts both
    images into a single conversation and returns a 2-tuple.

    Paper: https://huggingface.co/papers/2510.23451
    Dataset Repo: https://huggingface.co/datasets/HongbangYuan/OmniRewardBench
    """

    def parse_item(self, item: Dict[str, Any], media_content: Dict[str, Any],
                   config: Dict[str, Any]) -> Tuple[List[Dict], Dict]:
        """
        Parse a data item from OmniRewardBench-T2I into one message and metadata.
        For generative reward model training in pair-wise ranking task.

        The conversation consists of two system turns (the task instruction and the
        item's evaluation criteria) followed by one user turn per image.

        :param item: The raw data item
        :type item: Dict[str, Any]
        :param media_content: Loaded media content with 'image1' and 'image2' keys
        :type media_content: Dict[str, Any]
        :param config: Configuration providing 'task_instruction' (a template with a
            ``{prompt}`` placeholder) and 'max_pixels'
        :type config: Dict[str, Any]
        :return: A tuple of (messages, metadata)
        :rtype: Tuple[List[Dict], Dict]
        :raises ValueError: If either image is missing or falsy

        **Example:**

        .. code-block:: python

            messages, other = handler.parse_item(item, media_content, config)
        """
        image1 = media_content['image1']
        image2 = media_content['image2']
        if not all([image1, image2]):
            raise ValueError("Missing visual content for 'image1' or 'image2'.")

        # Get generation prompt from data item
        gen_prompt = item["prompt"]

        # Fill the generation prompt into the system-prompt template from config
        task_instruction_template = config["task_instruction"]
        task_instruction = task_instruction_template.format(prompt=gen_prompt)
        criteria = item["criteria"]

        # Get max_pixels from config
        max_pixels = config["max_pixels"]

        # Build messages: both candidates go into the same conversation
        messages = [
            {
                "role": "system",
                "content": task_instruction
            },
            {
                "role": "system",
                "content": f"Please give your evaluation considering the following criteria: {criteria}."
            },
            {
                "role": "user",
                "content": [{
                    "type": "text",
                    "text": "**Image 1:**"
                }, {
                    "type": "image",
                    "image": image1,
                    "max_pixels": max_pixels
                }]
            },
            {
                "role": "user",
                "content": [{
                    "type": "text",
                    "text": "**Image 2:**"
                }, {
                    "type": "image",
                    "image": image2,
                    "max_pixels": max_pixels
                }]
            },
        ]

        # Map the item's 'criteria_preference' onto the preference label
        pref_label = self._get_label(item["criteria_preference"])

        other = {
            "preference": pref_label,
            "task_type": self.task_type,
            "criteria": item["criteria"],
            "criteria_preference": item["criteria_preference"],
            "id": item["id"],
            "prompt": gen_prompt,
            "source": item['source'],
            "image1_path": item['response1_path'],
            "image2_path": item['response2_path'],
            "model1": item['model1'],
            "model2": item['model2'],
        }
        return messages, other
class OmniRewardBenchT2IPairHandler(OmniRewardBenchT2IHandler):
    """
    Data Handler for OmniRewardBench text-to-image human preferences benchmark.
    Process for generative reward model on pair-wise ranking task.

    Paper: https://huggingface.co/papers/2510.23451
    Dataset Repo: https://huggingface.co/datasets/HongbangYuan/OmniRewardBench
    """

    def parse_item(self, item: Dict[str, Any], media_content: Dict[str, Any],
                   config: Dict[str, Any]) -> Tuple[List[Dict], Dict]:
        """
        Turn one raw sample into a single two-image conversation plus metadata.

        The conversation opens with two system turns (task instruction, then the
        item's criteria) and continues with one labelled user turn per image.

        :param item: The raw data item
        :type item: Dict[str, Any]
        :param media_content: Loaded media content with 'image1' and 'image2' keys
        :type media_content: Dict[str, Any]
        :param config: Configuration passed to ``get_task_instructions`` and read for 'max_pixels'
        :type config: Dict[str, Any]
        :return: A tuple of (messages, metadata)
        :rtype: Tuple[List[Dict], Dict]
        :raises ValueError: If either image is missing or falsy

        **Example:**

        .. code-block:: python

            messages, other = handler.parse_item(item, media_content, config)
        """
        first_image = media_content['image1']
        second_image = media_content['image2']
        if not first_image or not second_image:
            raise ValueError("Missing visual content for 'image1' or 'image2'.")

        # Generation prompt of the sample, substituted into the instruction template
        prompt_text = item["prompt"]
        instruction = get_task_instructions(self, config).format(prompt=prompt_text)

        criteria = item["criteria"]
        pixel_budget = config["max_pixels"]

        # System turns first ...
        messages = [
            {
                "role": "system",
                "content": instruction
            },
            {
                "role": "system",
                "content": f"Please give your evaluation considering the following criteria: {criteria}."
            },
        ]
        # ... then one labelled user turn per candidate image.
        for caption, picture in (("**Image 1:**", first_image), ("**Image 2:**", second_image)):
            messages.append({
                "role": "user",
                "content": [
                    {"type": "text", "text": caption},
                    {"type": "image", "image": picture, "max_pixels": pixel_budget},
                ],
            })

        # Metadata: preference label derived from 'criteria_preference', then pass-through fields
        other = {
            "preference": self._get_label(item["criteria_preference"]),
            "reward_rule_label": "general",
            "task_type": self.task_type,
            "criteria": item["criteria"],
            "criteria_preference": item["criteria_preference"],
            "id": item["id"],
            "prompt": prompt_text,
            "source": item['source'],
            "image1_path": item['response1_path'],
            "image2_path": item['response2_path'],
            "model1": item['model1'],
            "model2": item['model2'],
        }
        return messages, other
class OmniRewardBenchT2VPairHandler(OmniRewardBenchT2VHandler):
    """
    Data Handler for OmniRewardBench text-to-video human preferences benchmark.
    Process for generative reward model on pair-wise ranking task.

    Paper: https://huggingface.co/papers/2510.23451
    Dataset Repo: https://huggingface.co/datasets/HongbangYuan/OmniRewardBench
    """

    def parse_item(self, item: Dict[str, Any], media_content: Dict[str, Any],
                   config: Dict[str, Any]) -> Tuple[List[Dict], Dict]:
        """
        Turn one raw sample into a single two-video conversation plus metadata.

        The conversation is one system turn with the task instruction followed by
        one labelled user turn per video.

        :param item: The raw data item
        :type item: Dict[str, Any]
        :param media_content: Loaded media content with 'video1' and 'video2' keys
        :type media_content: Dict[str, Any]
        :param config: Configuration passed to ``get_task_instructions`` and read for
            'video_fps' and 'max_pixels'
        :type config: Dict[str, Any]
        :return: A tuple of (messages, metadata)
        :rtype: Tuple[List[Dict], Dict]
        :raises ValueError: If either video is missing or falsy

        **Example:**

        .. code-block:: python

            messages, other = handler.parse_item(item, media_content, config)
        """
        first_clip = media_content['video1']
        second_clip = media_content['video2']
        if not first_clip or not second_clip:
            raise ValueError("Missing visual content for 'video1' or 'video2'.")

        # Generation prompt of the sample, substituted into the instruction template
        gen_prompt = item["prompt"]
        instruction = get_task_instructions(self, config).format(prompt=gen_prompt)

        # Sampling settings applied to both videos
        frame_rate = config["video_fps"]
        pixel_budget = config["max_pixels"]

        # System turn first ...
        messages = [{
            "role": "system",
            "content": copy.deepcopy(instruction)
        }]
        # ... then one labelled user turn per candidate video.
        for caption, clip in (("**Video 1:**", first_clip), ("**Video 2:**", second_clip)):
            messages.append({
                "role": "user",
                "content": [
                    {"type": "text", "text": caption},
                    {"type": "video", "video": clip, "fps": frame_rate, "max_pixels": pixel_budget},
                ],
            })

        # Metadata: preference label derived from 'criteria_preference', then pass-through fields
        other = {
            "preference": self._get_label(item["criteria_preference"]),
            "reward_rule_label": "general",
            "task_type": self.task_type,
            "criteria": item["criteria"],
            "criteria_preference": item["criteria_preference"],
            "id": item["id"],
            "prompt": gen_prompt,
            "source": item['source'],
            "video1_path": item['response1'],
            "video2_path": item['response2'],
            "model1": item['model1'],
            "model2": item['model2'],
        }
        return messages, other