Worker

MuZeroCollector

class lzero.worker.muzero_collector.MuZeroCollector(collect_print_freq: int = 100, env: BaseEnvManager = None, policy: namedtuple = None, tb_logger: SummaryWriter = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'collector', policy_config: policy_config = None)

Bases: ISerialCollector

Overview:

The Episode Collector for MCTS+RL algorithms, including MuZero, EfficientZero, Sampled EfficientZero, Gumbel MuZero. It manages the data collection process for training these algorithms using a serial mechanism.

Interfaces:

__init__, reset, reset_env, reset_policy, _reset_stat, envstep, __del__, _compute_priorities, pad_and_save_last_trajectory, collect, _output_log, close

Properties:

envstep

__init__(collect_print_freq: int = 100, env: BaseEnvManager = None, policy: namedtuple = None, tb_logger: SummaryWriter = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'collector', policy_config: policy_config = None) -> None
Overview:

Initialize the MuZeroCollector with the given parameters.

Parameters:
  • collect_print_freq (int) – Frequency (in training steps) at which to print collection information.

  • env (BaseEnvManager) – Instance of a subclass of the vectorized environment manager.

  • policy (namedtuple) – The API namedtuple of the collect-mode policy.

  • tb_logger (SummaryWriter) – TensorBoard logger instance.

  • exp_name (str | None) – Name of the experiment, used for logging and saving purposes.

  • instance_name (str | None) – Unique identifier for this collector instance.

  • policy_config (policy_config) – Configuration object for the policy.

_compute_priorities(i: int, pred_values_lst: List[float], search_values_lst: List[float]) -> ndarray
Overview:

Compute the priorities for transitions based on prediction and search value discrepancies.

Parameters:
  • i (int) – Index of the values in the list to compute the priority for.

  • pred_values_lst (List[float]) – List of predicted values.

  • search_values_lst (List[float]) – List of search values obtained from MCTS.

Returns:

  • priorities (np.ndarray) – Array of computed priorities.
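The discrepancy described above can be illustrated with a minimal NumPy sketch. It assumes the priority of each transition is the absolute difference between its predicted value and its search value, plus a small constant so that no priority is exactly zero. The function name compute_priorities_sketch, the eps value, and the nested-list layout of the inputs are illustrative assumptions, not the library's implementation.

```python
from typing import List

import numpy as np


def compute_priorities_sketch(i: int,
                              pred_values_lst: List[List[float]],
                              search_values_lst: List[List[float]],
                              eps: float = 1e-6) -> np.ndarray:
    """Hypothetical stand-in: per-transition |predicted - search| + eps."""
    # Select the values of entry i and flatten them to 1-D arrays.
    pred = np.asarray(pred_values_lst[i], dtype=np.float32).reshape(-1)
    search = np.asarray(search_values_lst[i], dtype=np.float32).reshape(-1)
    # A larger gap between prediction and search result gives a larger priority.
    return np.abs(pred - search) + eps


priorities = compute_priorities_sketch(
    0,
    pred_values_lst=[[0.5, 1.0, 2.0]],
    search_values_lst=[[1.0, 1.0, 0.5]],
)
```

Under this assumption, transitions whose predicted value already matches the search value receive a priority close to eps, so they remain sampleable but are rarely chosen.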

_output_log(train_iter: int) -> None
Overview:

Log the collector’s data and output the log information.

Parameters:

train_iter (int) – Current training iteration number for logging context.

_reset_stat(env_id: int) -> None
Overview:

Reset the collector’s state for the environment identified by env_id. This includes the traj_buffer, obs_pool, policy_output_pool and env_info. See base_serial_collector for more details.

Parameters:

env_id (int) – ID of the environment whose collector state should be reset.

close() -> None
Overview:

Close the collector. If end_flag is False, close the environment, then flush and close the tb_logger.
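A minimal sketch of the guarded shutdown described above. The Recorder and CloseSketch classes are hypothetical stand-ins, and two details are assumptions rather than documented behaviour: that close() sets end_flag to True, and that a later call with end_flag already True does nothing.

```python
from typing import List


class Recorder:
    """Hypothetical stand-in that records which methods were called."""

    def __init__(self, name: str, log: List[str]) -> None:
        self._name = name
        self._log = log

    def close(self) -> None:
        self._log.append(f"{self._name}.close")

    def flush(self) -> None:
        self._log.append(f"{self._name}.flush")


class CloseSketch:
    """Assumed behaviour: close() is a no-op once end_flag is set."""

    def __init__(self, env: Recorder, tb_logger: Recorder) -> None:
        self._env = env
        self._tb_logger = tb_logger
        self._end_flag = False

    def close(self) -> None:
        if self._end_flag:
            return
        self._end_flag = True
        self._env.close()         # close the environment first
        self._tb_logger.flush()   # then flush pending log records
        self._tb_logger.close()   # and finally close the logger


calls: List[str] = []
worker = CloseSketch(Recorder("env", calls), Recorder("tb_logger", calls))
worker.close()
worker.close()  # ignored because end_flag is already True
```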

collect(n_episode: int | None = None, train_iter: int = 0, policy_kwargs: dict | None = None, collect_with_pure_policy: bool = False) -> List[Any]
Overview:

Collect n_episode episodes of data with a policy that has been trained for train_iter iterations, forwarding policy_kwargs to the policy.

Parameters:
  • n_episode (int | None) – Number of episodes to collect.

  • train_iter (int) – Number of training iterations completed so far.

  • policy_kwargs (dict | None) – Additional keyword arguments for the policy.

  • collect_with_pure_policy (bool) – Whether to collect data using pure policy without MCTS.

Returns:

  • return_data (List[Any]) – Collected data in the form of a list.
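The call pattern of collect inside a training loop might look like the sketch below. StubCollector is a hypothetical stand-in that only mimics the documented signature and the envstep property so the example runs without LightZero; the contents of the returned items and the keys passed in policy_kwargs are illustrative assumptions.

```python
from typing import Any, List, Optional


class StubCollector:
    """Hypothetical stand-in exposing only `collect` and `envstep`."""

    def __init__(self, steps_per_episode: int = 5) -> None:
        self._steps_per_episode = steps_per_episode
        self._total_envstep = 0

    @property
    def envstep(self) -> int:
        return self._total_envstep

    def collect(self, n_episode: Optional[int] = None, train_iter: int = 0,
                policy_kwargs: Optional[dict] = None,
                collect_with_pure_policy: bool = False) -> List[Any]:
        # Pretend every episode lasts a fixed number of environment steps.
        n_episode = 1 if n_episode is None else n_episode
        self._total_envstep += n_episode * self._steps_per_episode
        return [{"episode": k, "train_iter": train_iter} for k in range(n_episode)]


collector = StubCollector()
buffer: List[Any] = []
for train_iter in range(3):
    new_data = collector.collect(
        n_episode=2,
        train_iter=train_iter,
        policy_kwargs={"temperature": 1.0, "epsilon": 0.0},  # illustrative keys
    )
    buffer.extend(new_data)  # a real loop would push into a replay buffer
```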

config = {}
classmethod default_config() -> EasyDict
Overview:

Get collector’s default config. We merge collector’s default config with other default configs and user’s config to get the final config.

Returns:

  • cfg (EasyDict) – The collector’s default config.

property envstep: int
Overview:

Get the total number of environment steps collected.

Returns:

  • envstep (int) – Total number of environment steps collected.

pad_and_save_last_trajectory(i: int, last_game_segments: List[GameSegment], last_game_priorities: List[ndarray], game_segments: List[GameSegment], done: ndarray) -> None
Overview:

Save the game segment to the pool if the current game is finished, padding it if necessary.

Parameters:
  • i (int) – Index of the current game segment.

  • last_game_segments (List[GameSegment]) – List of the last game segments to be padded and saved.

  • last_game_priorities (List[ndarray]) – List of priorities of the last game segments.

  • game_segments (List[GameSegment]) – List of the current game segments.

  • done (ndarray) – Array indicating whether each game is done.

Note

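For each index j in the overlap, (last_game_segments[i].obs_segment[-4:][j] == game_segments[i].obs_segment[:4][j]).all() is True; that is, the last four observations of the previous segment equal the first four observations of the current segment.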
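The invariant in the note can be checked on toy data. The sketch below uses plain NumPy arrays in place of GameSegment objects; the frame shape, the segment lengths and the variable names are hypothetical, and the overlap length of 4 is taken from the slices in the note.

```python
import numpy as np

OVERLAP = 4  # overlap length, taken from the slices in the note

# Hypothetical observation stream for one environment: 10 frames of shape (2,).
frames = np.arange(20, dtype=np.float32).reshape(10, 2)

# Two consecutive segments that share OVERLAP frames at their boundary.
last_obs_segment = list(frames[:7])          # frames 0..6
obs_segment = list(frames[7 - OVERLAP:])     # frames 3..9

# Same comparison as in the note, evaluated for every j in the overlap.
overlap_ok = all(
    (last_obs_segment[-OVERLAP:][j] == obs_segment[:OVERLAP][j]).all()
    for j in range(OVERLAP)
)
```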

reset(_policy: namedtuple | None = None, _env: BaseEnvManager | None = None) -> None
Overview:

Reset the collector with the given policy and/or environment. If _env is None, reset the existing environment; otherwise replace it with the passed-in environment and launch it. If _policy is None, reset the existing policy; otherwise replace it with the passed-in policy.

Parameters:
  • _policy (namedtuple | None) – The API namedtuple of the collect-mode policy.

  • _env (BaseEnvManager | None) – Instance of a subclass of the vectorized environment manager (BaseEnvManager).

reset_env(_env: BaseEnvManager | None = None) -> None
Overview:

Reset or replace the environment managed by this collector. If _env is None, reset the old environment. If _env is not None, replace the old environment in the collector with the new passed in environment and launch.

Parameters:

_env (BaseEnvManager | None) – New environment to manage, if provided.
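The reset-or-replace rule can be sketched with stand-in classes. StubEnv and ResetSketch are hypothetical and exist only to make the branch on _env visible; they are not the library's environment manager or collector.

```python
from typing import Optional


class StubEnv:
    """Hypothetical environment manager that counts launch and reset calls."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.launched = 0
        self.resets = 0

    def launch(self) -> None:
        self.launched += 1

    def reset(self) -> None:
        self.resets += 1


class ResetSketch:
    """Hypothetical worker holding one environment manager."""

    def __init__(self, env: StubEnv) -> None:
        self._env = env
        self._env.launch()

    def reset_env(self, _env: Optional[StubEnv] = None) -> None:
        if _env is not None:
            # Replace the old environment with the new one and launch it.
            self._env = _env
            self._env.launch()
        else:
            # No replacement given: reset the environment already held.
            self._env.reset()


old_env, new_env = StubEnv("old"), StubEnv("new")
worker = ResetSketch(old_env)
worker.reset_env()         # resets old_env
worker.reset_env(new_env)  # swaps in new_env and launches it
```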

reset_policy(_policy: namedtuple | None = None) -> None
Overview:

Reset or replace the policy used by this collector. If _policy is None, reset the old policy. If _policy is not None, replace the old policy in the collector with the new passed in policy.

Parameters:

_policy (namedtuple | None) – The API namedtuple of the collect-mode policy.

MuZeroEvaluator

class lzero.worker.muzero_evaluator.MuZeroEvaluator(eval_freq: int = 1000, n_evaluator_episode: int = 3, stop_value: int = 1000000.0, env: BaseEnvManager = None, policy: namedtuple = None, tb_logger: SummaryWriter = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'evaluator', policy_config: policy_config = None)

Bases: ISerialEvaluator

Overview:

The Evaluator class for MCTS+RL algorithms, such as MuZero, EfficientZero, and Sampled EfficientZero.

Interfaces:

__init__, reset, reset_policy, reset_env, close, should_eval, eval

Properties:

env, policy

__init__(eval_freq: int = 1000, n_evaluator_episode: int = 3, stop_value: int = 1000000.0, env: BaseEnvManager = None, policy: namedtuple = None, tb_logger: SummaryWriter = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'evaluator', policy_config: policy_config = None) -> None
Overview:

Initialize the evaluator with configuration settings for various components such as logger helper and timer.

Parameters:
  • eval_freq (int) – Evaluation frequency in terms of training steps.

  • n_evaluator_episode (int) – Number of episodes to evaluate in total.

  • stop_value (int) – A reward threshold above which the training is considered converged.

  • env (BaseEnvManager) – An optional instance of a subclass of BaseEnvManager.

  • policy (namedtuple) – An optional API namedtuple defining the policy for evaluation.

  • tb_logger (SummaryWriter) – Optional TensorBoard logger instance.

  • exp_name (str | None) – Name of the experiment, used to determine output directory.

  • instance_name (str | None) – Name of this evaluator instance.

  • policy_config (policy_config) – Optional configuration for the game policy.

close() -> None
Overview:

Close the evaluator and its environment, then flush and close the TensorBoard logger if applicable.

config = {'eval_freq': 50}
classmethod default_config() -> EasyDict
Overview:

Retrieve the default configuration for the evaluator by merging evaluator-specific defaults with other defaults and any user-provided configuration.

Returns:

  • cfg (EasyDict) – The default configuration for the evaluator.
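The merge order described above (other defaults first, then evaluator-specific defaults, then the user's configuration) can be sketched with plain dictionaries. The deep_merge helper is hypothetical and is not part of LightZero; only the eval_freq default of 50 comes from the config attribute shown above, and the remaining values are illustrative.

```python
import copy
from typing import Any, Dict


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: return base updated recursively with override."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are dicts: merge them key by key.
            merged[key] = deep_merge(merged[key], value)
        else:
            # Otherwise the override wins.
            merged[key] = copy.deepcopy(value)
    return merged


other_defaults = {"n_evaluator_episode": 3}   # illustrative
evaluator_defaults = {"eval_freq": 50}        # matches the config attribute
user_config = {"eval_freq": 200}              # illustrative user override

final_cfg = deep_merge(deep_merge(other_defaults, evaluator_defaults), user_config)
```

Later sources take precedence in this sketch, so a user-supplied eval_freq replaces the evaluator default while untouched keys keep their default values.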

eval(save_ckpt_fn: Callable = None, train_iter: int = -1, envstep: int = -1, n_episode: int | None = None, return_trajectory: bool = False) -> Tuple[bool, float]
Overview:

Evaluate the current policy, storing the best policy if it achieves the highest historical reward.

Parameters:
  • save_ckpt_fn (Callable) – Optional function to save a checkpoint when a new best reward is achieved.

  • train_iter (int) – The current training iteration count.

  • envstep (int) – The current environment step count.

  • n_episode (int | None) – Optional number of evaluation episodes; defaults to the evaluator’s setting.

  • return_trajectory (bool) – Return the evaluated trajectory game_segments in episode_info if True.

Returns:

  • stop_flag (bool) – Indicates whether the training can be stopped based on the stop value.

  • episode_info (Dict[str, Any]) – A dictionary containing information about the evaluation episodes.
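The best-reward bookkeeping and the stop check can be sketched as follows. BestRewardTracker is a hypothetical helper, not LightZero code: it assumes that the mean episode reward is compared against both the best value seen so far and stop_value, and the checkpoint file name is illustrative.

```python
from typing import Callable, List, Optional, Tuple


class BestRewardTracker:
    """Hypothetical helper mirroring the described behaviour."""

    def __init__(self, stop_value: float) -> None:
        self.stop_value = stop_value
        self.max_mean_reward = float("-inf")

    def update(self, episode_rewards: List[float],
               save_ckpt_fn: Optional[Callable[[str], None]] = None
               ) -> Tuple[bool, float]:
        mean_reward = sum(episode_rewards) / len(episode_rewards)
        if mean_reward > self.max_mean_reward:
            # New best result: remember it and save a checkpoint if possible.
            self.max_mean_reward = mean_reward
            if save_ckpt_fn is not None:
                save_ckpt_fn("ckpt_best.pth.tar")  # file name is illustrative
        # Training may stop once the mean reward reaches the threshold.
        stop_flag = mean_reward >= self.stop_value
        return stop_flag, mean_reward


saved: List[str] = []
tracker = BestRewardTracker(stop_value=10.0)
first = tracker.update([1.0, 2.0, 3.0], save_ckpt_fn=saved.append)
second = tracker.update([1.0, 1.0, 1.0], save_ckpt_fn=saved.append)
third = tracker.update([10.0, 12.0, 11.0], save_ckpt_fn=saved.append)
```

In this sketch the second evaluation neither saves a checkpoint nor stops training, because its mean reward is below both the previous best and the threshold.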

reset(_policy: namedtuple | None = None, _env: BaseEnvManager | None = None) -> None
Overview:

Reset both the policy and environment for the evaluator, optionally replacing them. If _env is None, reset the old environment. If _env is not None, replace the old environment in the evaluator with the new passed in environment and launch. If _policy is None, reset the old policy. If _policy is not None, replace the old policy in the evaluator with the new passed in policy.

Parameters:
  • _policy (namedtuple | None) – An optional new policy namedtuple to replace the existing one.

  • _env (BaseEnvManager | None) – An optional new environment instance to replace the existing one.

reset_env(_env: BaseEnvManager | None = None) -> None
Overview:

Reset the environment for the evaluator, optionally replacing it with a new environment. If _env is None, reset the old environment. If _env is not None, replace the old environment in the evaluator with the new passed in environment and launch.

Parameters:

_env (BaseEnvManager | None) – An optional new environment instance to replace the existing one.

reset_policy(_policy: namedtuple | None = None) -> None
Overview:

Reset the policy for the evaluator, optionally replacing it with a new policy. If _policy is None, reset the old policy. If _policy is not None, replace the old policy in the evaluator with the new passed in policy.

Parameters:

_policy (namedtuple | None) – An optional new policy namedtuple to replace the existing one.

should_eval(train_iter: int) -> bool
Overview:

Determine whether to initiate evaluation based on the training iteration count and evaluation frequency.

Parameters:

train_iter (int) – The current count of training iterations.

Returns:

  • (bool) – True if evaluation should be initiated, otherwise False.
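One plausible form of this frequency check is sketched below. EvalGate is a hypothetical class; the rules it applies (always evaluate at iteration 0, never evaluate the same iteration twice, and otherwise wait until eval_freq iterations have passed since the last evaluation) are assumptions about the behaviour, not the evaluator's confirmed logic.

```python
class EvalGate:
    """Hypothetical frequency gate; the real evaluator may differ in detail."""

    def __init__(self, eval_freq: int) -> None:
        self.eval_freq = eval_freq
        self.last_eval_iter = -1

    def should_eval(self, train_iter: int) -> bool:
        # Never evaluate the same iteration twice.
        if train_iter == self.last_eval_iter:
            return False
        # Outside iteration 0, wait until eval_freq iterations have passed.
        if train_iter != 0 and train_iter - self.last_eval_iter < self.eval_freq:
            return False
        self.last_eval_iter = train_iter
        return True


gate = EvalGate(eval_freq=50)
eval_iters = [it for it in range(0, 201) if gate.should_eval(it)]
```

With eval_freq set to 50, the gate in this sketch fires at iterations 0, 50, 100, 150 and 200.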