# Source code for lightrft.datasets.rapidata
import os
import copy
from typing import List, Dict, Any, Tuple
from loguru import logger
from .utils import BaseDataHandler, get_task_instructions
class RapidataT2VHandler(BaseDataHandler):
    """
    Data handler for Rapidata text-to-video human preference datasets.

    Supported datasets:
        - Rapidata/text-2-video-human-preferences-pika2.2
        - Rapidata/text-2-video-human-preferences-veo3
        - Rapidata/text-2-video-human-preferences-wan2.1

    Each sample contains a pair of videos (video1, video2) generated from the
    same prompt, plus weighted scores for Preference, Coherence, and Alignment.
    Preference labels:
        - 'A': video1 (messages0) is preferred.
        - 'B': video2 (messages1) is preferred.
        - 'C': the two videos are equal / tied.

    Dataset Repo: https://huggingface.co/Rapidata/datasets
    """

    task_type = "text-to-video"

    def load_data(self, path: str) -> List[Dict[str, Any]]:
        """
        Load samples from a parquet file.

        :param path: Path to the parquet file
        :type path: str
        :return: List of samples with 'data_root' attached
        :rtype: List[Dict[str, Any]]

        **Example:**

        .. code-block:: python

            handler = RapidataT2VHandler()
            data = handler.load_data("path/to/Rapidata/data.parquet")
        """
        import pyarrow.parquet as pq

        data_table = pq.read_table(path)
        # to_pylist() converts the whole table to native Python row dicts in
        # one C-level pass, instead of calling .as_py() once per cell.
        raw_data = data_table.to_pylist()
        # The parquet file is assumed to sit one directory below the dataset
        # root (e.g. <root>/data/file.parquet), with media stored under <root>.
        data_root = os.path.dirname(os.path.dirname(path))
        for item in raw_data:
            item['data_root'] = data_root
        logger.info(f"Loaded {len(raw_data)} samples from {path}")
        return raw_data

    def get_media_info(self, item: Dict[str, Any]) -> Dict[str, Dict[str, str]]:
        """
        Extract local path info for the two videos of a sample.

        :param item: A data item produced by :meth:`load_data`
        :type item: Dict[str, Any]
        :return: Dict with 'video1'/'video2' entries holding local video paths
        :rtype: Dict[str, Dict[str, str]]
        :raises ValueError: If 'data_root' or a file name field is missing
        """
        # .get() so a missing key raises the documented ValueError below
        # instead of a bare KeyError.
        data_root = item.get("data_root")
        if not data_root:
            raise ValueError("Missing 'data_root' in item. Cannot resolve video paths.")
        if 'file_name1' not in item or 'file_name2' not in item:
            raise ValueError("Item missing 'file_name1' or 'file_name2'.")
        # The media directory is spelled "videos" or "Videos" depending on the
        # dataset release; prefer the lowercase variant when it exists.
        video_dir = "videos"
        if not os.path.exists(os.path.join(data_root, video_dir)):
            if os.path.exists(os.path.join(data_root, "Videos")):
                video_dir = "Videos"
        full_path1 = os.path.join(data_root, video_dir, item['file_name1'])
        full_path2 = os.path.join(data_root, video_dir, item['file_name2'])
        return {'video1': {'video_local_path': full_path1}, 'video2': {'video_local_path': full_path2}}

    def _get_label(self, val1: float, val2: float) -> str:
        """
        Determine the preference label from two scores.

        :return: 'A' if val1 wins, 'B' if val2 wins, 'C' for a tie or when
            either score is missing.
        """
        if val1 is None or val2 is None:
            return "C"
        if val1 > val2:
            return "A"
        elif val1 < val2:
            return "B"
        else:
            return "C"  # TIE

    def _build_preference_metadata(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """
        Compute per-metric preference labels and the overall preference.

        The overall preference compares the sums of the three weighted metric
        scores for each video; missing scores count as 0.0 in the sum.
        """
        metrics = ['Preference', 'Coherence', 'Alignment']
        labels = {
            f"{m.lower()}_label": self._get_label(
                item.get(f'weighted_results1_{m}'), item.get(f'weighted_results2_{m}')
            )
            for m in metrics
        }
        score1 = sum(item.get(f'weighted_results1_{m}') or 0.0 for m in metrics)
        score2 = sum(item.get(f'weighted_results2_{m}') or 0.0 for m in metrics)
        return {
            "preference": self._get_label(score1, score2),
            **labels,
            "source": item['source'],
            "task_type": self.task_type,
        }

    def parse_item(self, item: Dict[str, Any], media_content: Dict[str, Any],
                   config: Dict[str, Any]) -> Tuple[List[Dict], List[Dict], Dict]:
        """
        Parse a single Rapidata-T2V data item into message pairs for ranking.

        :param item: The raw data item
        :type item: Dict[str, Any]
        :param media_content: Loaded visual content
        :type media_content: Dict[str, Any]
        :param config: Configuration for task instructions, max_pixels, and fps
        :type config: Dict[str, Any]
        :return: A tuple of (messages0, messages1, metadata)
        :rtype: Tuple[List[Dict], List[Dict], Dict]

        **Example:**

        .. code-block:: python

            msg0, msg1, other = handler.parse_item(item, media_content, config)
        """
        video1 = media_content['video1']
        video2 = media_content['video2']
        if not all([video1, video2]):
            raise ValueError("Missing visual content for 'video1' or 'video2'.")
        # Fill the generation prompt into the configured instruction template.
        task_instruction = config["task_instruction"].format(prompt=item["prompt"])
        max_pixels = config["max_pixels"]
        fps = config["video_fps"]

        def build_messages(video):
            # task_instruction is an immutable str, so sharing it between the
            # two message lists is safe (the original deep-copied it to no
            # effect: deepcopy of a str returns the same object).
            return [{
                "role": "system",
                "content": task_instruction
            }, {
                "role": "user",
                "content": [{
                    "type": "video",
                    "video": video,
                    "fps": fps,
                    "max_pixels": max_pixels
                }]
            }]

        messages0 = build_messages(video1)
        messages1 = build_messages(video2)
        other = self._build_preference_metadata(item)
        return messages0, messages1, other
class RapidataI2VHandler(RapidataT2VHandler):
    """
    Data handler for Rapidata image-to-video human preference datasets.

    Supported datasets:
        - Rapidata/image-2-video-human-preferences-seedance-1-pro

    In addition to the two candidate videos, each sample carries the initial
    (reference) image as raw bytes under 'prompt_asset'.

    Dataset Repo: https://huggingface.co/Rapidata/datasets
    """

    task_type = "image-to-video"

    def get_media_info(self, item: Dict[str, Any]) -> Dict[str, Dict[str, str]]:
        """
        Extract media info (paths) for the two videos and bytes for the
        initial image.

        :param item: A data item from :meth:`load_data`
        :type item: Dict[str, Any]
        :return: Dict containing local paths for videos and bytes for 'init_image'
        :rtype: Dict[str, Dict[str, Any]]
        :raises ValueError: If 'data_root', a file name, or 'prompt_asset' is missing

        **Example:**

        .. code-block:: python

            info = handler.get_media_info(item)
        """
        # .get() so a missing key raises the documented ValueError below
        # instead of a bare KeyError.
        data_root = item.get("data_root")
        if not data_root:
            raise ValueError("Missing 'data_root' in item. Cannot resolve video paths.")
        if 'file_name1' not in item or 'file_name2' not in item:
            raise ValueError("Item missing 'file_name1' or 'file_name2'.")

        def process_path(fname: str, root_path: str) -> str:
            """
            Resolve a file name or URL from the parquet metadata to a local path.

            :param fname: File name or https URL.
            :param root_path: Dataset root directory.
            :return: Local file path under <root>/videos/...
            """
            if fname.startswith("https"):
                # URL layout ends in .../<model_name>/<video_name>.
                parts = fname.split("/")
                return os.path.join(root_path, "videos", parts[-2], parts[-1])
            elif "hailuo" in fname:
                return os.path.join(root_path, "videos", "hailuo-02", fname)
            else:
                # NOTE(review): bare non-hailuo file names are mapped under
                # "hailuo-02/marey" — confirm this matches the dataset layout.
                return os.path.join(root_path, "videos", "hailuo-02", "marey", fname)

        full_path1 = process_path(item['file_name1'], data_root)
        full_path2 = process_path(item['file_name2'], data_root)
        # Initial (reference) image travels inline as bytes in the parquet row.
        # Raise instead of assert: asserts are stripped under `python -O`.
        if 'prompt_asset' not in item:
            raise ValueError("Missing initial image ('prompt_asset') in item.")
        img_bytes = item['prompt_asset']['bytes']
        return {
            'video1': {
                'video_local_path': full_path1
            },
            'video2': {
                'video_local_path': full_path2
            },
            'init_image': {
                'image_bytes': img_bytes
            }
        }

    def parse_item(self, item: Dict[str, Any], media_content: Dict[str, Any],
                   config: Dict[str, Any]) -> Tuple[List[Dict], List[Dict], Dict]:
        """
        Parse a single Rapidata-I2V data item into message pairs for ranking.

        :param item: The raw data item
        :type item: Dict[str, Any]
        :param media_content: Loaded visual content (videos and images)
        :type media_content: Dict[str, Any]
        :param config: Configuration for task instructions, max_pixels, and fps
        :type config: Dict[str, Any]
        :return: A tuple of (messages0, messages1, metadata)
        :rtype: Tuple[List[Dict], List[Dict], Dict]

        **Example:**

        .. code-block:: python

            msg0, msg1, other = handler.parse_item(item, media_content, config)
        """
        video1 = media_content['video1']
        video2 = media_content['video2']
        init_image = media_content['init_image']
        if not all([video1, video2, init_image]):
            raise ValueError("Missing visual content for 'video1' or 'video2' or 'init_image'.")
        # Fill the generation prompt into the configured instruction template.
        task_instruction = config["task_instruction"].format(prompt=item["prompt"])
        max_pixels = config["max_pixels"]
        fps = config["video_fps"]

        def build_messages(video):
            # task_instruction is an immutable str and needs no copy; the
            # init_image dict is deep-copied so the two message lists never
            # share a mutable payload.
            return [{
                "role": "system",
                "content": task_instruction
            }, {
                "role": "user",
                "content": [{
                    "type": "image",
                    "image": copy.deepcopy(init_image),
                    "max_pixels": max_pixels
                }, {
                    "type": "video",
                    "video": video,
                    "fps": fps,
                    "max_pixels": max_pixels
                }]
            }]

        messages0 = build_messages(video1)
        messages1 = build_messages(video2)
        # Per-metric labels plus the overall preference from summed scores.
        metrics = ['Preference', 'Coherence', 'Alignment']
        labels = {
            f"{m.lower()}_label": self._get_label(
                item.get(f'weighted_results1_{m}'), item.get(f'weighted_results2_{m}')
            )
            for m in metrics
        }
        score1 = sum(item.get(f'weighted_results1_{m}') or 0.0 for m in metrics)
        score2 = sum(item.get(f'weighted_results2_{m}') or 0.0 for m in metrics)
        other = {
            "preference": self._get_label(score1, score2),
            **labels,
            "source": item['source'],
            "task_type": self.task_type,
        }
        return messages0, messages1, other
class RapidataT2VPairHandler(RapidataT2VHandler):
    """
    Data handler for Rapidata text-to-video human preferences in pairwise
    format: both videos are placed in a single user message so the model
    compares them directly, instead of scoring each in its own message list.
    """

    # No __init__ override: the parent constructor is sufficient.

    def parse_item(self, item: Dict[str, Any], media_content: Dict[str, Any],
                   config: Dict[str, Any]) -> Tuple[List[Dict], Dict]:
        """
        Parse a text-to-video data item into pairwise messages and metadata.

        :param item: The raw data item
        :type item: Dict[str, Any]
        :param media_content: Loaded visual content
        :type media_content: Dict[str, Any]
        :param config: Configuration for task instructions, max_pixels, and fps
        :type config: Dict[str, Any]
        :return: A tuple of (messages, metadata)
        :rtype: Tuple[List[Dict], Dict]

        **Example:**

        .. code-block:: python

            messages, other = handler.parse_item(item, media_content, config)
        """
        video1 = media_content['video1']
        video2 = media_content['video2']
        if not all([video1, video2]):
            raise ValueError("Missing visual content for 'video1' or 'video2'.")
        # Pairwise handlers resolve the instruction via the shared helper
        # rather than reading config["task_instruction"] directly.
        task_instruction_template = get_task_instructions(self, config)
        task_instruction = task_instruction_template.format(prompt=item["prompt"])
        max_pixels = config["max_pixels"]
        fps = config["video_fps"]
        # Single conversation containing both candidate videos in order.
        messages = [{
            "role": "system",
            "content": [{
                "type": "text",
                "text": task_instruction
            }]
        }, {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "The following is the first video."
                },
                {
                    "type": "video",
                    "video": video1,
                    "fps": fps,
                    "max_pixels": max_pixels
                },
                {
                    "type": "text",
                    "text": "The following is the second video."
                },
                {
                    "type": "video",
                    "video": video2,
                    "fps": fps,
                    "max_pixels": max_pixels
                },
            ]
        }]
        # Per-metric labels plus the overall preference from summed scores.
        metrics = ['Preference', 'Coherence', 'Alignment']
        labels = {
            f"{m.lower()}_label": self._get_label(
                item.get(f'weighted_results1_{m}'), item.get(f'weighted_results2_{m}')
            )
            for m in metrics
        }
        score1 = sum(item.get(f'weighted_results1_{m}') or 0.0 for m in metrics)
        score2 = sum(item.get(f'weighted_results2_{m}') or 0.0 for m in metrics)
        other = {
            "preference": self._get_label(score1, score2),
            **labels,
            "source": item['source'],
            "task_type": self.task_type,
            "reward_rule_label": "general",
        }
        return messages, other
class RapidataI2VPairHandler(RapidataI2VHandler):
    """
    Data handler for Rapidata image-to-video human preferences in pairwise
    format: the reference image and both videos are placed in a single user
    message so the model compares the candidates directly.
    """

    task_type = "image-to-video"

    # No __init__ override: the parent constructor is sufficient.

    def parse_item(self, item: Dict[str, Any], media_content: Dict[str, Any],
                   config: Dict[str, Any]) -> Tuple[List[Dict], Dict]:
        """
        Parse an image-to-video data item into pairwise messages and metadata.

        :param item: The raw data item
        :type item: Dict[str, Any]
        :param media_content: Loaded visual content
        :type media_content: Dict[str, Any]
        :param config: Configuration for task instructions, max_pixels, and fps
        :type config: Dict[str, Any]
        :return: A tuple of (messages, metadata)
        :rtype: Tuple[List[Dict], Dict]

        **Example:**

        .. code-block:: python

            messages, other = handler.parse_item(item, media_content, config)
        """
        video1 = media_content['video1']
        video2 = media_content['video2']
        init_image = media_content['init_image']
        if not all([video1, video2, init_image]):
            raise ValueError("Missing visual content for 'video1' or 'video2' or 'init_image'.")
        # Pairwise handlers resolve the instruction via the shared helper
        # rather than reading config["task_instruction"] directly.
        task_instruction_template = get_task_instructions(self, config)
        task_instruction = task_instruction_template.format(prompt=item["prompt"])
        fps = config["video_fps"]
        max_pixels = config["max_pixels"]
        # Single conversation: reference image first, then both videos in order.
        messages = [{
            "role": "system",
            "content": [{
                "type": "text",
                "text": task_instruction
            }]
        }, {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "Reference Image:"
                },
                {
                    "type": "image",
                    "image": init_image,
                    "max_pixels": max_pixels
                },
                {
                    "type": "text",
                    "text": "The following is the first video."
                },
                {
                    "type": "video",
                    "video": video1,
                    "fps": fps,
                    "max_pixels": max_pixels
                },
                {
                    "type": "text",
                    "text": "The following is the second video."
                },
                {
                    "type": "video",
                    "video": video2,
                    "fps": fps,
                    "max_pixels": max_pixels
                },
            ]
        }]
        # Per-metric labels plus the overall preference from summed scores.
        metrics = ['Preference', 'Coherence', 'Alignment']
        labels = {
            f"{m.lower()}_label": self._get_label(
                item.get(f'weighted_results1_{m}'), item.get(f'weighted_results2_{m}')
            )
            for m in metrics
        }
        score1 = sum(item.get(f'weighted_results1_{m}') or 0.0 for m in metrics)
        score2 = sum(item.get(f'weighted_results2_{m}') or 0.0 for m in metrics)
        other = {
            "preference": self._get_label(score1, score2),
            **labels,
            "source": item['source'],
            "task_type": self.task_type,
            "reward_rule_label": "general",
        }
        return messages, other