Results

When a model is evaluated in MTEB, it produces results. These results consist of:

  • TaskResult: Result for a single task
  • ModelResult: Result for a model on a set of tasks
  • BenchmarkResults: Result for a set of models on a set of tasks

In normal use, these objects are produced when evaluating a model:

# ...
model_results = mteb.evaluate(model, tasks)
type(model_results)  # mteb.results.ModelResult

task_results = model_results.task_results  # list of TaskResult objects
type(task_results[0])  # mteb.results.TaskResult
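
The third object, BenchmarkResults, typically comes from the results cache described in the next section rather than from a single call to evaluate. A minimal sketch, assuming the default cache location and that BenchmarkResults exposes its constructor field model_results as an attribute:

import mteb
from mteb.cache import ResultCache

cache = ResultCache()         # default cache path
cache.download_from_remote()  # fetch the shared results repository (requires git)

benchmark_results = cache.load_results(
    models=["sentence-transformers/all-MiniLM-L6-v2"],
    tasks=["STS12"],
)
type(benchmark_results)  # mteb.results.BenchmarkResults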

Results cache

mteb.cache.ResultCache

Class to handle the local cache of MTEB results.

Examples:

>>> from mteb.cache import ResultCache
>>> cache = ResultCache(cache_path="~/.cache/mteb") # default
>>> cache.download_from_remote() # download the latest results from the remote repository
>>> results = cache.load_results(models=["model_name"], tasks=["task_name"])
Source code in mteb/cache.py
class ResultCache:
    """Class to handle the local cache of MTEB results.

    Examples:
        >>> from mteb.cache import ResultCache
        >>> cache = ResultCache(cache_path="~/.cache/mteb") # default
        >>> cache.download_from_remote() # download the latest results from the remote repository
        >>> results = cache.load_results(models=["model_name"], tasks=["task_name"])
    """

    cache_path: Path

    def __init__(self, cache_path: Path | str | None = None) -> None:
        if cache_path is not None:
            self.cache_path = Path(cache_path)
        else:
            self.cache_path = self.default_cache_path
        self.cache_path.mkdir(parents=True, exist_ok=True)

    @property
    def has_remote(self) -> bool:
        """Check if the remote results repository exists in the cache directory.

        Returns:
            True if the remote results repository exists, False otherwise.
        """
        return (self.cache_path / "remote").exists()

    def get_task_result_path(
        self,
        task_name: str,
        model_name: str | ModelMeta,
        model_revision: str | None = None,
        remote: bool = False,
    ) -> Path:
        """Get the path to the results of a specific task for a specific model and revision.

        Args:
            task_name: The name of the task.
            model_name: The name of the model as a valid directory name or a ModelMeta object.
            model_revision: The revision of the model. Must be specified if model_name is a string.
            remote: If True, it will return the path to the remote results repository, otherwise it will return the path to the local results repository.

        Returns:
            The path to the results of the task.
        """
        results_folder = (
            self.cache_path / "results"
            if not remote
            else self.cache_path / "remote" / "results"
        )

        if isinstance(model_name, ModelMeta):
            if model_revision is not None:
                logger.warning(
                    "model_revision is ignored when model_name is a ModelMeta object"
                )
            model_revision = model_name.revision
            model_name = model_name.model_name_as_path()
        elif isinstance(model_name, str):
            model_name = model_name.replace("/", "__").replace(" ", "_")

        model_path = results_folder / model_name

        if model_revision is None:
            logger.warning(
                "model_revision is not specified, attempting to load the latest revision. To disable this behavior, specify model_revision explicitly."
            )
            # get revs from paths
            revisions = [p for p in model_path.glob("*") if p.is_dir()]
            if not revisions:
                model_revision = "no_revision_available"
            else:
                if len(revisions) > 1:
                    logger.warning(
                        f"Multiple revisions found for model {model_name}: {revisions}. Using the latest one (according to latest edit)."
                    )
                    # sort folder by latest edit time
                    revisions.sort(key=lambda p: p.stat().st_mtime, reverse=True)
                model_revision = revisions[0].name

        return model_path / model_revision / f"{task_name}.json"

    def load_task_result(
        self,
        task_name: str,
        model_name: str | ModelMeta,
        model_revision: str | None = None,
        raise_if_not_found: bool = False,
        prioritize_remote: bool = False,
    ) -> TaskResult | None:
        """Load the results from the local cache directory.

        Args:
            task_name: The name of the task.
            model_name: The name of the model as a valid directory name or a ModelMeta object.
            model_revision: The revision of the model. Must be specified if model_name is a string.
            raise_if_not_found: If True, raise an error if the results are not found.
            prioritize_remote: If True, it will first try to load the results from the remote repository, if available.

        Returns:
            The results of the task, or None if not found.
        """
        result_path = self.get_task_result_path(
            model_name=model_name,
            model_revision=model_revision,
            task_name=task_name,
        )

        if self.has_remote:
            remote_result_path = self.get_task_result_path(
                model_name=model_name,
                model_revision=model_revision,
                task_name=task_name,
                remote=True,
            )
            if remote_result_path.exists() and prioritize_remote:
                result_path = remote_result_path
            elif not result_path.exists():
                result_path = remote_result_path

        if not result_path.exists():
            msg = f"Results for {model_name} on {task_name} not found in {result_path}"
            if raise_if_not_found:
                raise FileNotFoundError(msg)
            logger.debug(msg)
            return None

        return TaskResult.from_disk(result_path)

    def save_to_cache(
        self,
        task_result: TaskResult,
        model_name: str | ModelMeta,
        model_revision: str | None = None,
    ) -> None:
        """Save the task results to the local cache directory in the location {model_name}/{model_revision}/{task_name}.json.

        Where model_name is a path-normalized model name.
        In addition we also save a model_meta.json in the revision folder to preserve the model metadata.

        Args:
            task_result: The results of the task.
            model_name: The name of the model as a valid directory name or a ModelMeta object.
            model_revision: The revision of the model. Must be specified if model_name is a string.
        """
        result_path = self.get_task_result_path(
            model_name=model_name,
            model_revision=model_revision,
            task_name=task_result.task_name,
        )
        result_path.parent.mkdir(parents=True, exist_ok=True)
        task_result.to_disk(result_path)

        model_meta_path = result_path.parent / "model_meta.json"
        if isinstance(model_name, ModelMeta):
            meta = model_name
            with model_meta_path.open("w") as f:
                json.dump(meta.to_dict(), f, default=str)

    @property
    def default_cache_path(self) -> Path:
        """Get the local cache directory for MTEB results.

        Returns:
            The path to the local cache directory.
        """
        default_cache_directory = Path.home() / ".cache" / "mteb"

        _cache_directory = os.environ.get("MTEB_CACHE", None)
        cache_directory = (
            Path(_cache_directory) if _cache_directory else default_cache_directory
        )
        return cache_directory

    def download_from_remote(
        self,
        remote: str = "https://github.com/embeddings-benchmark/results",
        download_latest: bool = True,
    ) -> Path:
        """Downloads the latest version of the results repository from GitHub to a local cache directory. Required git to be installed.

        Args:
            remote: The URL of the results repository on GitHub.
            download_latest: If True it will download the latest version of the repository, otherwise it will reuse the existing repository without updating it.

        Returns:
            The path to the local cache directory.
        """
        if not self.cache_path.exists() and not self.cache_path.is_dir():
            logger.info(
                f"Cache directory {self.cache_path} does not exist, creating it"
            )

        # if "results" folder already exists update it
        results_directory = self.cache_path / "remote"

        if results_directory.exists():
            # check repository in the directory is the same as the remote
            remote_url = subprocess.run(
                ["git", "config", "--get", "remote.origin.url"],
                cwd=results_directory,
                capture_output=True,
                text=True,
            ).stdout.strip()
            if remote_url != remote:
                msg = (
                    f"remote repository '{remote}' does not match the one in {results_directory},  which is '{remote_url}'."
                    + " Please remove the directory and try again."
                )
                raise ValueError(msg)

            if download_latest:
                logger.info(
                    f"remote repository already exists in {results_directory}, updating it using git pull"
                )
                subprocess.run(["git", "pull"], cwd=results_directory)
            else:
                logger.debug(
                    f"Results repository already exists in {results_directory}, skipping update, set download_latest=True to update it"
                )
            return results_directory

        logger.info(
            f"No results repository found in {results_directory}, cloning it from {remote}"
        )

        subprocess.run(["git", "clone", remote, "remote"], cwd=self.cache_path)

        return results_directory

    def clear_cache(self) -> None:
        """Clear the local cache directory."""
        if self.cache_path.exists() and self.cache_path.is_dir():
            shutil.rmtree(self.cache_path)
            logger.info(f"Cache directory {self.cache_path} cleared.")
        else:
            logger.warning(f"Cache directory {self.cache_path} does not exist.")

    def __repr__(self) -> str:
        return f"ResultCache(cache_path={self.cache_path})"

    def get_cache_paths(
        self,
        models: Sequence[str] | Sequence[ModelMeta] | None = None,
        tasks: Sequence[str] | Sequence[AbsTask] | None = None,
        require_model_meta: bool = True,
        include_remote: bool = True,
    ) -> list[Path]:
        """Get all paths to result JSON files in the cache directory.

        These paths can then be used to fetch task results, like:
        ```python
        for path in paths:
            task_result = TaskResult.from_disk(path)
        ```

        Args:
            models: A list of model names or ModelMeta objects to filter the paths.
            tasks: A list of task names to filter the paths.
            require_model_meta: If True, only return paths that have a model_meta.json file.
            include_remote: If True, include remote results in the returned paths.

        Returns:
            A list of paths in the cache directory.

        Examples:
            >>> from mteb.cache import ResultCache
            >>> cache = ResultCache()
            >>>
            >>> # Get all cache paths
            >>> paths = cache.get_cache_paths()
            >>>
            >>> # Get all cache paths for a specific task
            >>> paths = cache.get_cache_paths(tasks=["STS12"])
            >>>
            >>> # Get all cache paths for a specific model
            >>> paths = cache.get_cache_paths(models=["sentence-transformers/all-MiniLM-L6-v2"])
            >>>
            >>> # Get all cache paths for a specific model and revision
            >>> model_meta = mteb.get_model_meta("sentence-transformers/all-MiniLM-L6-v2")
            >>> paths = cache.get_cache_paths(models=[model_meta])
        """
        cache_paths = [
            p
            for p in (self.cache_path / "results").glob("**/*.json")
            if p.name != "model_meta.json"
        ]
        if include_remote:
            cache_paths += [
                p
                for p in (self.cache_path / "remote" / "results").glob("**/*.json")
                if p.name != "model_meta.json"
            ]

        cache_paths = self._filter_paths_by_model_and_revision(
            cache_paths,
            models=models,
        )
        cache_paths = self._filter_paths_by_task(cache_paths, tasks=tasks)

        if require_model_meta:
            cache_paths = [
                p for p in cache_paths if (p.parent / "model_meta.json").exists()
            ]
        return cache_paths

    def get_models(
        self,
        tasks: Sequence[str] | None = None,
        require_model_meta: bool = True,
        include_remote: bool = True,
    ) -> list[tuple[ModelName, Revision]]:
        """Get all models in the cache directory.

        Args:
            tasks: A list of task names to filter the models.
            require_model_meta: If True, only return models that have a model_meta.json file.
            include_remote: If True, include remote results in the returned models.

        Returns:
            A list of tuples containing the model name and revision.
        """
        cache_paths = self.get_cache_paths(
            tasks=tasks,
            require_model_meta=require_model_meta,
            include_remote=include_remote,
        )
        models = [(p.parent.parent.name, p.parent.name) for p in cache_paths]
        return list(set(models))

    def get_task_names(
        self,
        models: list[str] | list[ModelMeta] | None = None,
        require_model_meta: bool = True,
        include_remote: bool = True,
    ) -> list[str]:
        """Get all task names in the cache directory.

        Args:
            models: A list of model names or ModelMeta objects to filter the task names.
            require_model_meta: If True, only return task names that have a model_meta.json file
            include_remote: If True, include remote results in the returned task names.

        Returns:
            A list of task names in the cache directory.
        """
        cache_paths = self.get_cache_paths(
            models=models,
            require_model_meta=require_model_meta,
            include_remote=include_remote,
        )
        tasks = [p.stem for p in cache_paths]
        return list(set(tasks))

    @staticmethod
    def _get_model_name_and_revision_from_path(
        revision_path: Path,
    ) -> tuple[ModelName, Revision]:
        model_meta = revision_path / "model_meta.json"
        model_path = revision_path.parent

        if not model_meta.exists():
            logger.debug(
                f"model_meta.json not found in {revision_path}, extracting model_name and revision from the path"
            )
            model_name = model_path.name.replace("__", "/")
            revision = revision_path.name
            return model_name, revision
        with model_meta.open("r") as f:
            model_meta_json = json.load(f)
            model_name = model_meta_json["name"]
            revision = model_meta_json["revision"]
        return model_name, revision

    @staticmethod
    def _filter_paths_by_model_and_revision(
        paths: list[Path],
        models: Sequence[str] | Sequence[ModelMeta] | None = None,
    ) -> list[Path]:
        """Filter a list of paths by model name and optional revision.

        Returns:
            A list of paths that match the specified model names and revisions.
        """
        if not models:
            return paths

        if isinstance(models[0], ModelMeta):
            models = cast(list[ModelMeta], models)
            name_and_revision = {
                (m.model_name_as_path(), m.revision or "no_revision_available")
                for m in models
            }
            return [
                p
                for p in paths
                if (p.parent.parent.name, p.parent.name) in name_and_revision
            ]

        model_names = {m.replace("/", "__").replace(" ", "_") for m in models}
        return [p for p in paths if p.parent.parent.name in model_names]

    @staticmethod
    def _filter_paths_by_task(
        paths: list[Path],
        tasks: Sequence[str] | Sequence[AbsTask] | None = None,
    ) -> list[Path]:
        if tasks is not None:
            task_names = set()

            for task in tasks:
                if isinstance(task, AbsTask):
                    task_names.add(task.metadata.name)
                else:
                    task_names.add(task)

            paths = [p for p in paths if p.stem in task_names]
        return paths

    def load_results(
        self,
        models: Sequence[str] | Sequence[ModelMeta] | None = None,
        tasks: Sequence[str] | Sequence[AbsTask] | None = None,
        require_model_meta: bool = True,
        include_remote: bool = True,
        validate_and_filter: bool = False,
        only_main_score: bool = False,
    ) -> BenchmarkResults:
        """Loads the results from the cache directory and returns a BenchmarkResults object.

        Args:
            models: A list of model names to load the results for. If None it will load the results for all models.
            tasks: A list of task names to load the results for. If None it will load the results for all tasks.
            require_model_meta: If True it will ignore results that do not have a model_meta.json file. If False, it attempts to
                extract the model name and revision from the path.
            include_remote: If True, it will include results from the remote repository.
            validate_and_filter: If True it will validate that the results object for the task contains the correct splits and filter out
                splits from the results object that are not default in the task metadata.
            only_main_score: If True, only the main score will be loaded.

        Returns:
            A BenchmarkResults object containing the results for the specified models and tasks.

        Examples:
            >>> from mteb.cache import ResultCache
            >>> cache = ResultCache()
            >>>
            >>> # Load results for specific models and tasks
            >>> results = cache.load_results(
            ...     models=["sentence-transformers/all-MiniLM-L6-v2"],
            ...     tasks=["STS12"],
            ...     require_model_meta=True,
            ... )
        """
        paths = self.get_cache_paths(
            models=models,
            tasks=tasks,
            require_model_meta=require_model_meta,
            include_remote=include_remote,
        )
        models_results = defaultdict(list)

        task_names = {}
        if tasks is not None:
            for task in tasks:
                if isinstance(task, AbsTask):
                    task_names[task.metadata.name] = task
                else:
                    task_names[task] = None

        for path in paths:
            task_result = TaskResult.from_disk(path)

            if only_main_score:
                task_result = task_result.only_main_score()
            model_name, revision = self._get_model_name_and_revision_from_path(
                path.parent
            )

            if validate_and_filter:
                task = task_names[task_result.task_name]
                try:
                    task_result.validate_and_filter_scores(task=task)
                except Exception as e:
                    logger.info(
                        f"Validation failed for {task_result.task_name} in {model_name} {revision}: {e}"
                    )
                    continue

            models_results[(model_name, revision)].append(task_result)

        # create BenchmarkResults object
        models_results = [
            ModelResult(
                model_name=model_name,
                model_revision=revision,
                task_results=task_results,
            )
            for (model_name, revision), task_results in models_results.items()
        ]

        benchmark_results = BenchmarkResults(
            model_results=models_results,
        )

        return benchmark_results

default_cache_path property

Get the local cache directory for MTEB results.

Returns:

  • Path: The path to the local cache directory.
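
The default location can be overridden with the MTEB_CACHE environment variable, which the property reads before falling back to ~/.cache/mteb. A small sketch (the directory path is hypothetical):

import os
from mteb.cache import ResultCache

os.environ["MTEB_CACHE"] = "/data/mteb_cache"  # hypothetical location
cache = ResultCache()                          # no cache_path given, so default_cache_path is used
print(cache.cache_path)                        # /data/mteb_cache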

has_remote property

Check if the remote results repository exists in the cache directory.

Returns:

  • bool: True if the remote results repository exists, False otherwise.

clear_cache()

Clear the local cache directory.

Source code in mteb/cache.py
def clear_cache(self) -> None:
    """Clear the local cache directory."""
    if self.cache_path.exists() and self.cache_path.is_dir():
        shutil.rmtree(self.cache_path)
        logger.info(f"Cache directory {self.cache_path} cleared.")
    else:
        logger.warning(f"Cache directory {self.cache_path} does not exist.")

download_from_remote(remote='https://github.com/embeddings-benchmark/results', download_latest=True)

Downloads the latest version of the results repository from GitHub to the local cache directory. Requires git to be installed.

Parameters:

  • remote (str, default 'https://github.com/embeddings-benchmark/results'): The URL of the results repository on GitHub.
  • download_latest (bool, default True): If True it will download the latest version of the repository, otherwise it will reuse the existing repository without updating it.

Returns:

  • Path: The path to the local cache directory.

Source code in mteb/cache.py
def download_from_remote(
    self,
    remote: str = "https://github.com/embeddings-benchmark/results",
    download_latest: bool = True,
) -> Path:
    """Downloads the latest version of the results repository from GitHub to a local cache directory. Required git to be installed.

    Args:
        remote: The URL of the results repository on GitHub.
        download_latest: If True it will download the latest version of the repository, otherwise it will reuse the existing repository without updating it.

    Returns:
        The path to the local cache directory.
    """
    if not self.cache_path.exists() and not self.cache_path.is_dir():
        logger.info(
            f"Cache directory {self.cache_path} does not exist, creating it"
        )

    # if "results" folder already exists update it
    results_directory = self.cache_path / "remote"

    if results_directory.exists():
        # check repository in the directory is the same as the remote
        remote_url = subprocess.run(
            ["git", "config", "--get", "remote.origin.url"],
            cwd=results_directory,
            capture_output=True,
            text=True,
        ).stdout.strip()
        if remote_url != remote:
            msg = (
                f"remote repository '{remote}' does not match the one in {results_directory},  which is '{remote_url}'."
                + " Please remove the directory and try again."
            )
            raise ValueError(msg)

        if download_latest:
            logger.info(
                f"remote repository already exists in {results_directory}, updating it using git pull"
            )
            subprocess.run(["git", "pull"], cwd=results_directory)
        else:
            logger.debug(
                f"Results repository already exists in {results_directory}, skipping update, set download_latest=True to update it"
            )
        return results_directory

    logger.info(
        f"No results repository found in {results_directory}, cloning it from {remote}"
    )

    subprocess.run(["git", "clone", remote, "remote"], cwd=self.cache_path)

    return results_directory
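
For example, an existing clone can be reused without pulling updates, or the results can be pulled from a fork of the repository (hypothetical URL); a hedged sketch:

from mteb.cache import ResultCache

cache = ResultCache()

# reuse an existing clone without running `git pull`
remote_dir = cache.download_from_remote(download_latest=False)

# clone/update from a fork instead of the default repository (hypothetical URL);
# note that this raises ValueError if the cache already holds a clone of a different remote
remote_dir = cache.download_from_remote(remote="https://github.com/my-org/results")

print(remote_dir)  # <cache_path>/remote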

get_cache_paths(models=None, tasks=None, require_model_meta=True, include_remote=True)

Get all paths to result JSON files in the cache directory.

These paths can then be used to fetch task results, like:

for path in paths:
    task_result = TaskResult.from_disk(path)

Parameters:

  • models (Sequence[str] | Sequence[ModelMeta] | None, default None): A list of model names or ModelMeta objects to filter the paths.
  • tasks (Sequence[str] | Sequence[AbsTask] | None, default None): A list of task names to filter the paths.
  • require_model_meta (bool, default True): If True, only return paths that have a model_meta.json file.
  • include_remote (bool, default True): If True, include remote results in the returned paths.

Returns:

  • list[Path]: A list of paths in the cache directory.

Examples:

>>> from mteb.cache import ResultCache
>>> cache = ResultCache()
>>>
>>> # Get all cache paths
>>> paths = cache.get_cache_paths()
>>>
>>> # Get all cache paths for a specific task
>>> paths = cache.get_cache_paths(tasks=["STS12"])
>>>
>>> # Get all cache paths for a specific model
>>> paths = cache.get_cache_paths(models=["sentence-transformers/all-MiniLM-L6-v2"])
>>>
>>> # Get all cache paths for a specific model and revision
>>> model_meta = mteb.get_model_meta("sentence-transformers/all-MiniLM-L6-v2")
>>> paths = cache.get_cache_paths(models=[model_meta])
Source code in mteb/cache.py
def get_cache_paths(
    self,
    models: Sequence[str] | Sequence[ModelMeta] | None = None,
    tasks: Sequence[str] | Sequence[AbsTask] | None = None,
    require_model_meta: bool = True,
    include_remote: bool = True,
) -> list[Path]:
    """Get all paths to result JSON files in the cache directory.

    These paths can then be used to fetch task results, like:
    ```python
    for path in paths:
        task_result = TaskResult.from_disk(path)
    ```

    Args:
        models: A list of model names or ModelMeta objects to filter the paths.
        tasks: A list of task names to filter the paths.
        require_model_meta: If True, only return paths that have a model_meta.json file.
        include_remote: If True, include remote results in the returned paths.

    Returns:
        A list of paths in the cache directory.

    Examples:
        >>> from mteb.cache import ResultCache
        >>> cache = ResultCache()
        >>>
        >>> # Get all cache paths
        >>> paths = cache.get_cache_paths()
        >>>
        >>> # Get all cache paths for a specific task
        >>> paths = cache.get_cache_paths(tasks=["STS12"])
        >>>
        >>> # Get all cache paths for a specific model
        >>> paths = cache.get_cache_paths(models=["sentence-transformers/all-MiniLM-L6-v2"])
        >>>
        >>> # Get all cache paths for a specific model and revision
        >>> model_meta = mteb.get_model_meta("sentence-transformers/all-MiniLM-L6-v2")
        >>> paths = cache.get_cache_paths(models=[model_meta])
    """
    cache_paths = [
        p
        for p in (self.cache_path / "results").glob("**/*.json")
        if p.name != "model_meta.json"
    ]
    if include_remote:
        cache_paths += [
            p
            for p in (self.cache_path / "remote" / "results").glob("**/*.json")
            if p.name != "model_meta.json"
        ]

    cache_paths = self._filter_paths_by_model_and_revision(
        cache_paths,
        models=models,
    )
    cache_paths = self._filter_paths_by_task(cache_paths, tasks=tasks)

    if require_model_meta:
        cache_paths = [
            p for p in cache_paths if (p.parent / "model_meta.json").exists()
        ]
    return cache_paths
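
Because the returned paths follow the {model_name}/{model_revision}/{task_name}.json layout described under save_to_cache, they can also be grouped without opening the JSON files. A small sketch using only that layout:

from collections import defaultdict

from mteb.cache import ResultCache

cache = ResultCache()
paths = cache.get_cache_paths(tasks=["STS12", "STS13"])

# group result files by (path-normalized model directory, revision)
by_model = defaultdict(list)
for path in paths:
    by_model[(path.parent.parent.name, path.parent.name)].append(path.stem)  # task names

for (model_dir, revision), task_names in by_model.items():
    print(model_dir, revision, sorted(task_names))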

get_models(tasks=None, require_model_meta=True, include_remote=True)

Get all models in the cache directory.

Parameters:

  • tasks (Sequence[str] | None, default None): A list of task names to filter the models.
  • require_model_meta (bool, default True): If True, only return models that have a model_meta.json file.
  • include_remote (bool, default True): If True, include remote results in the returned models.

Returns:

  • list[tuple[ModelName, Revision]]: A list of tuples containing the model name and revision.

Source code in mteb/cache.py
def get_models(
    self,
    tasks: Sequence[str] | None = None,
    require_model_meta: bool = True,
    include_remote: bool = True,
) -> list[tuple[ModelName, Revision]]:
    """Get all models in the cache directory.

    Args:
        tasks: A list of task names to filter the models.
        require_model_meta: If True, only return models that have a model_meta.json file.
        include_remote: If True, include remote results in the returned models.

    Returns:
        A list of tuples containing the model name and revision.
    """
    cache_paths = self.get_cache_paths(
        tasks=tasks,
        require_model_meta=require_model_meta,
        include_remote=include_remote,
    )
    models = [(p.parent.parent.name, p.parent.name) for p in cache_paths]
    return list(set(models))
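
For example, to see which cached models have results for a given task (per the implementation above, the returned names are the path-normalized directory names, e.g. with "/" replaced by "__"):

from mteb.cache import ResultCache

cache = ResultCache()

for model_dir, revision in cache.get_models(tasks=["STS12"]):
    print(model_dir, revision)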

get_task_names(models=None, require_model_meta=True, include_remote=True)

Get all task names in the cache directory.

Parameters:

  • models (list[str] | list[ModelMeta] | None, default None): A list of model names or ModelMeta objects to filter the task names.
  • require_model_meta (bool, default True): If True, only return task names that have a model_meta.json file.
  • include_remote (bool, default True): If True, include remote results in the returned task names.

Returns:

  • list[str]: A list of task names in the cache directory.

Source code in mteb/cache.py
def get_task_names(
    self,
    models: list[str] | list[ModelMeta] | None = None,
    require_model_meta: bool = True,
    include_remote: bool = True,
) -> list[str]:
    """Get all task names in the cache directory.

    Args:
        models: A list of model names or ModelMeta objects to filter the task names.
        require_model_meta: If True, only return task names that have a model_meta.json file
        include_remote: If True, include remote results in the returned task names.

    Returns:
        A list of task names in the cache directory.
    """
    cache_paths = self.get_cache_paths(
        models=models,
        require_model_meta=require_model_meta,
        include_remote=include_remote,
    )
    tasks = [p.stem for p in cache_paths]
    return list(set(tasks))
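
This can be used, for instance, to check which tasks from a planned run are already covered by cached results; a hedged sketch with a hypothetical task selection:

from mteb.cache import ResultCache

cache = ResultCache()

done = set(cache.get_task_names(models=["sentence-transformers/all-MiniLM-L6-v2"]))
wanted = {"STS12", "STS13"}   # hypothetical selection of tasks
print(wanted - done)          # tasks without cached results for this model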

get_task_result_path(task_name, model_name, model_revision=None, remote=False)

Get the path to the results of a specific task for a specific model and revision.

Parameters:

  • task_name (str, required): The name of the task.
  • model_name (str | ModelMeta, required): The name of the model as a valid directory name or a ModelMeta object.
  • model_revision (str | None, default None): The revision of the model. Must be specified if model_name is a string.
  • remote (bool, default False): If True, it will return the path to the remote results repository, otherwise it will return the path to the local results repository.

Returns:

  • Path: The path to the results of the task.

Source code in mteb/cache.py
def get_task_result_path(
    self,
    task_name: str,
    model_name: str | ModelMeta,
    model_revision: str | None = None,
    remote: bool = False,
) -> Path:
    """Get the path to the results of a specific task for a specific model and revision.

    Args:
        task_name: The name of the task.
        model_name: The name of the model as a valid directory name or a ModelMeta object.
        model_revision: The revision of the model. Must be specified if model_name is a string.
        remote: If True, it will return the path to the remote results repository, otherwise it will return the path to the local results repository.

    Returns:
        The path to the results of the task.
    """
    results_folder = (
        self.cache_path / "results"
        if not remote
        else self.cache_path / "remote" / "results"
    )

    if isinstance(model_name, ModelMeta):
        if model_revision is not None:
            logger.warning(
                "model_revision is ignored when model_name is a ModelMeta object"
            )
        model_revision = model_name.revision
        model_name = model_name.model_name_as_path()
    elif isinstance(model_name, str):
        model_name = model_name.replace("/", "__").replace(" ", "_")

    model_path = results_folder / model_name

    if model_revision is None:
        logger.warning(
            "model_revision is not specified, attempting to load the latest revision. To disable this behavior, specify model_revision explicitly."
        )
        # get revs from paths
        revisions = [p for p in model_path.glob("*") if p.is_dir()]
        if not revisions:
            model_revision = "no_revision_available"
        else:
            if len(revisions) > 1:
                logger.warning(
                    f"Multiple revisions found for model {model_name}: {revisions}. Using the latest one (according to latest edit)."
                )
                # sort folder by latest edit time
                revisions.sort(key=lambda p: p.stat().st_mtime, reverse=True)
            model_revision = revisions[0].name

    return model_path / model_revision / f"{task_name}.json"
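
For example, to inspect where a result file would live for a given model and revision, both locally and in the downloaded remote repository (the file itself may not exist yet):

import mteb
from mteb.cache import ResultCache

cache = ResultCache()
meta = mteb.get_model_meta("sentence-transformers/all-MiniLM-L6-v2")

local_path = cache.get_task_result_path("STS12", meta)
remote_path = cache.get_task_result_path("STS12", meta, remote=True)
print(local_path, local_path.exists())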

load_results(models=None, tasks=None, require_model_meta=True, include_remote=True, validate_and_filter=False, only_main_score=False)

Loads the results from the cache directory and returns a BenchmarkResults object.

Parameters:

  • models (Sequence[str] | Sequence[ModelMeta] | None, default None): A list of model names to load the results for. If None, it will load the results for all models.
  • tasks (Sequence[str] | Sequence[AbsTask] | None, default None): A list of task names to load the results for. If None, it will load the results for all tasks.
  • require_model_meta (bool, default True): If True, it will ignore results that do not have a model_meta.json file. If False, it attempts to extract the model name and revision from the path.
  • include_remote (bool, default True): If True, it will include results from the remote repository.
  • validate_and_filter (bool, default False): If True, it will validate that the results object for the task contains the correct splits and filter out splits from the results object that are not default in the task metadata.
  • only_main_score (bool, default False): If True, only the main score will be loaded.

Returns:

  • BenchmarkResults: A BenchmarkResults object containing the results for the specified models and tasks.

Examples:

>>> from mteb.cache import ResultCache
>>> cache = ResultCache()
>>>
>>> # Load results for specific models and tasks
>>> results = cache.load_results(
...     models=["sentence-transformers/all-MiniLM-L6-v2"],
...     tasks=["STS12"],
...     require_model_meta=True,
... )
Source code in mteb/cache.py
def load_results(
    self,
    models: Sequence[str] | Sequence[ModelMeta] | None = None,
    tasks: Sequence[str] | Sequence[AbsTask] | None = None,
    require_model_meta: bool = True,
    include_remote: bool = True,
    validate_and_filter: bool = False,
    only_main_score: bool = False,
) -> BenchmarkResults:
    """Loads the results from the cache directory and returns a BenchmarkResults object.

    Args:
        models: A list of model names to load the results for. If None it will load the results for all models.
        tasks: A list of task names to load the results for. If None it will load the results for all tasks.
        require_model_meta: If True it will ignore results that do not have a model_meta.json file. If False, it attempts to
            extract the model name and revision from the path.
        include_remote: If True, it will include results from the remote repository.
        validate_and_filter: If True it will validate that the results object for the task contains the correct splits and filter out
            splits from the results object that are not default in the task metadata.
        only_main_score: If True, only the main score will be loaded.

    Returns:
        A BenchmarkResults object containing the results for the specified models and tasks.

    Examples:
        >>> from mteb.cache import ResultCache
        >>> cache = ResultCache()
        >>>
        >>> # Load results for specific models and tasks
        >>> results = cache.load_results(
        ...     models=["sentence-transformers/all-MiniLM-L6-v2"],
        ...     tasks=["STS12"],
        ...     require_model_meta=True,
        ... )
    """
    paths = self.get_cache_paths(
        models=models,
        tasks=tasks,
        require_model_meta=require_model_meta,
        include_remote=include_remote,
    )
    models_results = defaultdict(list)

    task_names = {}
    if tasks is not None:
        for task in tasks:
            if isinstance(task, AbsTask):
                task_names[task.metadata.name] = task
            else:
                task_names[task] = None

    for path in paths:
        task_result = TaskResult.from_disk(path)

        if only_main_score:
            task_result = task_result.only_main_score()
        model_name, revision = self._get_model_name_and_revision_from_path(
            path.parent
        )

        if validate_and_filter:
            task = task_names[task_result.task_name]
            try:
                task_result.validate_and_filter_scores(task=task)
            except Exception as e:
                logger.info(
                    f"Validation failed for {task_result.task_name} in {model_name} {revision}: {e}"
                )
                continue

        models_results[(model_name, revision)].append(task_result)

    # create BenchmarkResults object
    models_results = [
        ModelResult(
            model_name=model_name,
            model_revision=revision,
            task_results=task_results,
        )
        for (model_name, revision), task_results in models_results.items()
    ]

    benchmark_results = BenchmarkResults(
        model_results=models_results,
    )

    return benchmark_results
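
A sketch of loading only the main scores for a couple of tasks and walking the resulting object, assuming that BenchmarkResults and ModelResult expose their constructor fields (model_results, task_results) as attributes:

from mteb.cache import ResultCache

cache = ResultCache()
results = cache.load_results(tasks=["STS12", "STS13"], only_main_score=True)

for model_result in results.model_results:
    for task_result in model_result.task_results:
        print(model_result.model_name, task_result.task_name, task_result.get_score())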

load_task_result(task_name, model_name, model_revision=None, raise_if_not_found=False, prioritize_remote=False)

Load the results from the local cache directory.

Parameters:

  • task_name (str, required): The name of the task.
  • model_name (str | ModelMeta, required): The name of the model as a valid directory name or a ModelMeta object.
  • model_revision (str | None, default None): The revision of the model. Must be specified if model_name is a string.
  • raise_if_not_found (bool, default False): If True, raise an error if the results are not found.
  • prioritize_remote (bool, default False): If True, it will first try to load the results from the remote repository, if available.

Returns:

  • TaskResult | None: The results of the task, or None if not found.

Source code in mteb/cache.py
def load_task_result(
    self,
    task_name: str,
    model_name: str | ModelMeta,
    model_revision: str | None = None,
    raise_if_not_found: bool = False,
    prioritize_remote: bool = False,
) -> TaskResult | None:
    """Load the results from the local cache directory.

    Args:
        task_name: The name of the task.
        model_name: The name of the model as a valid directory name or a ModelMeta object.
        model_revision: The revision of the model. Must be specified if model_name is a string.
        raise_if_not_found: If True, raise an error if the results are not found.
        prioritize_remote: If True, it will first try to load the results from the remote repository, if available.

    Returns:
        The results of the task, or None if not found.
    """
    result_path = self.get_task_result_path(
        model_name=model_name,
        model_revision=model_revision,
        task_name=task_name,
    )

    if self.has_remote:
        remote_result_path = self.get_task_result_path(
            model_name=model_name,
            model_revision=model_revision,
            task_name=task_name,
            remote=True,
        )
        if remote_result_path.exists() and prioritize_remote:
            result_path = remote_result_path
        elif not result_path.exists():
            result_path = remote_result_path

    if not result_path.exists():
        msg = f"Results for {model_name} on {task_name} not found in {result_path}"
        if raise_if_not_found:
            raise FileNotFoundError(msg)
        logger.debug(msg)
        return None

    return TaskResult.from_disk(result_path)
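
For example, to load a single cached result for a specific model, preferring the downloaded remote repository when both a local and a remote copy exist:

import mteb
from mteb.cache import ResultCache

cache = ResultCache()
cache.download_from_remote()  # make the shared results available locally

meta = mteb.get_model_meta("sentence-transformers/all-MiniLM-L6-v2")
result = cache.load_task_result("STS12", meta, prioritize_remote=True)
if result is not None:
    print(result.task_name, result.get_score())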

save_to_cache(task_result, model_name, model_revision=None)

Save the task results to the local cache directory in the location {model_name}/{model_revision}/{task_name}.json.

Where model_name is a path-normalized model name. In addition we also save a model_meta.json in the revision folder to preserve the model metadata.

Parameters:

  • task_result (TaskResult, required): The results of the task.
  • model_name (str | ModelMeta, required): The name of the model as a valid directory name or a ModelMeta object.
  • model_revision (str | None, default None): The revision of the model. Must be specified if model_name is a string.
Source code in mteb/cache.py
def save_to_cache(
    self,
    task_result: TaskResult,
    model_name: str | ModelMeta,
    model_revision: str | None = None,
) -> None:
    """Save the task results to the local cache directory in the location {model_name}/{model_revision}/{task_name}.json.

    Where model_name is a path-normalized model name.
    In addition we also save a model_meta.json in the revision folder to preserve the model metadata.

    Args:
        task_result: The results of the task.
        model_name: The name of the model as a valid directory name or a ModelMeta object.
        model_revision: The revision of the model. Must be specified if model_name is a string.
    """
    result_path = self.get_task_result_path(
        model_name=model_name,
        model_revision=model_revision,
        task_name=task_result.task_name,
    )
    result_path.parent.mkdir(parents=True, exist_ok=True)
    task_result.to_disk(result_path)

    model_meta_path = result_path.parent / "model_meta.json"
    if isinstance(model_name, ModelMeta):
        meta = model_name
        with model_meta_path.open("w") as f:
            json.dump(meta.to_dict(), f, default=str)
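
A sketch of storing an existing TaskResult under a ModelMeta, which also writes the accompanying model_meta.json (the TaskResult import location and the source file path are assumptions):

import mteb
from pathlib import Path

from mteb.cache import ResultCache
from mteb.results import TaskResult  # assumed import location, matching the headings on this page

cache = ResultCache()
meta = mteb.get_model_meta("sentence-transformers/all-MiniLM-L6-v2")

# a TaskResult produced elsewhere, e.g. loaded from a results file (hypothetical path)
task_result = TaskResult.from_disk(Path("some_run/STS12.json"))

# stored as <cache>/results/<model_dir>/<revision>/STS12.json, plus model_meta.json next to it
cache.save_to_cache(task_result, model_name=meta)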

Result Objects

mteb.results.TaskResult

Bases: BaseModel

A class to represent the MTEB result.

Attributes:

  • task_name (str): The name of the MTEB task.
  • dataset_revision (str): The dataset revision for the task on the HuggingFace dataset hub.
  • mteb_version (str | None): The version of MTEB used to evaluate the model.
  • scores (dict[SplitName, list[ScoresDict]]): The scores of the model on the dataset, stored as a dictionary with the structure dict[SplitName, list[Scores]], where Scores is a dict[str, Any] mapping score names to values and SplitName is the split of the dataset.
  • evaluation_time (float | None): The time taken to evaluate the model.
  • kg_co2_emissions (float | None): The kg of CO2 emissions produced by the model during evaluation.

Examples:

>>> scores = {
...     "evaluation_time": 100,
...     "train": {
...         "en-de": {
...             "main_score": 0.5,
...         },
...         "en-fr": {
...             "main_score": 0.6,
...         },
...     },
... }
>>> sample_task = ... # some MTEB task
>>> mteb_results = TaskResult.from_task_results(sample_task, scores)
>>> mteb_results.get_score()  # get the main score for all languages
0.55
>>> mteb_results.get_score(languages=["fra"])  # get the main score for French
0.6
>>> mteb_results.to_dict()
{'dataset_revision': '1.0', 'task_name': 'sample_task', 'mteb_version': '1.0.0', 'evaluation_time': 100, 'scores': {'train':
    [
        {'main_score': 0.5, 'hf_subset': 'en-de', 'languages': ['eng-Latn', 'deu-Latn']},
        {'main_score': 0.6, 'hf_subset': 'en-fr', 'languages': ['eng-Latn', 'fra-Latn']}
    ]}
}
Source code in mteb/results/task_result.py
class TaskResult(BaseModel):
    """A class to represent the MTEB result.

    Attributes:
        task_name: The name of the MTEB task.
        dataset_revision: The dataset revision for the task on the HuggingFace dataset hub.
        mteb_version: The version of the MTEB used to evaluate the model.
        scores: The scores of the model on the dataset, stored as a dictionary with the structure dict[SplitName, list[Scores]],
            where Scores is a dict[str, Any] mapping score names to values and SplitName is the split of
            the dataset.
        evaluation_time: The time taken to evaluate the model.
        kg_co2_emissions: The kg of CO2 emissions produced by the model during evaluation.

    Examples:
        >>> scores = {
        ...     "evaluation_time": 100,
        ...     "train": {
        ...         "en-de": {
        ...             "main_score": 0.5,
        ...         },
        ...         "en-fr": {
        ...             "main_score": 0.6,
        ...         },
        ...     },
        ... }
        >>> sample_task = ... # some MTEB task
        >>> mteb_results = TaskResult.from_task_results(sample_task, scores)
        >>> mteb_results.get_score()  # get the main score for all languages
        0.55
        >>> mteb_results.get_score(languages=["fra"])  # get the main score for French
        0.6
        >>> mteb_results.to_dict()
        {'dataset_revision': '1.0', 'task_name': 'sample_task', 'mteb_version': '1.0.0', 'evaluation_time': 100, 'scores': {'train':
            [
                {'main_score': 0.5, 'hf_subset': 'en-de', 'languages': ['eng-Latn', 'deu-Latn']},
                {'main_score': 0.6, 'hf_subset': 'en-fr', 'languages': ['eng-Latn', 'fra-Latn']}
            ]}
        }
    """

    dataset_revision: str
    task_name: str
    mteb_version: str | None
    scores: dict[SplitName, list[ScoresDict]]
    evaluation_time: float | None
    kg_co2_emissions: float | None = None

    @classmethod
    def from_task_results(
        cls,
        task: AbsTask | type[AbsTask],
        scores: dict[SplitName, dict[HFSubset, ScoresDict]],
        evaluation_time: float,
        kg_co2_emissions: float | None = None,
    ) -> Self:
        """Create a TaskResult from the task and scores.

        Args:
            task: The task to create the TaskResult from.
            scores: The scores of the model on the dataset, stored as a dictionary with the structure dict[SplitName, dict[HFSubset, Scores]],
                where Scores is a dict[str, Any] mapping score names to values and SplitName is the split of
                the dataset.
            evaluation_time: The time taken to evaluate the model.
            kg_co2_emissions: The kg of CO2 emissions produced by the model during evaluation.
        """
        task_meta = task.metadata
        subset2langscripts = task_meta.hf_subsets_to_langscripts
        flat_scores = defaultdict(list)
        for split, hf_subset_scores in scores.items():
            for hf_subset, hf_scores in hf_subset_scores.items():
                eval_langs = subset2langscripts[hf_subset]
                _scores = {
                    **hf_scores,
                    "hf_subset": hf_subset,
                    "languages": eval_langs,
                }
                flat_scores[split].append(_scores)

        return TaskResult(
            dataset_revision=task.metadata.revision,
            task_name=task.metadata.name,
            mteb_version=version("mteb"),
            scores=flat_scores,
            evaluation_time=evaluation_time,
            kg_co2_emissions=kg_co2_emissions,
        )

    @field_validator("scores")
    @classmethod
    def _validate_scores(
        cls, v: dict[SplitName, list[ScoresDict]]
    ) -> dict[SplitName, list[ScoresDict]]:
        for split, hf_subset_scores in v.items():
            for hf_subset_score in hf_subset_scores:
                if not isinstance(hf_subset_score, dict):
                    raise ValueError("Scores should be a dictionary")
                cls._validate_scores_dict(hf_subset_score)
        return v

    @staticmethod
    def _validate_scores_dict(scores: ScoresDict) -> None:
        if "main_score" not in scores:
            raise ValueError("'main_score' should be in scores")
        if "hf_subset" not in scores or not isinstance(scores["hf_subset"], str):
            raise ValueError("hf_subset should be in scores and should be a string")
        if "languages" not in scores or not isinstance(scores["languages"], list):
            raise ValueError("languages should be in scores and should be a list")

        # check that it is json serializable
        try:
            _ = json.dumps(scores)
        except Exception as e:
            raise ValueError(f"Scores are not json serializable: {e}")

    @property
    def languages(self) -> list[str]:
        """Get the languages present in the scores."""
        langs = []
        for split, split_res in self.scores.items():
            for entry in split_res:
                langs.extend([lang.split("-")[0] for lang in entry["languages"]])
        return list(set(langs))

    @cached_property
    def task(self) -> AbsTask:
        """Get the task associated with the result."""
        from mteb.get_tasks import get_task

        return get_task(self.task_name)

    @property
    def domains(self) -> list[str]:
        """Get the domains of the task."""
        doms = self.task.metadata.domains
        if doms is None:
            doms = []
        return doms  # type: ignore

    @property
    def task_type(self) -> str:
        """Get the type of the task."""
        return self.task.metadata.type

    @property
    def is_public(self) -> bool:
        """Check if the task is public."""
        return self.task.metadata.is_public

    @property
    def hf_subsets(self) -> list[str]:
        """Get the hf_subsets present in the scores."""
        hf_subsets = set()
        for split, split_res in self.scores.items():
            for entry in split_res:
                hf_subsets.add(entry["hf_subset"])
        return list(hf_subsets)

    @property
    def eval_splits(self) -> list[str]:
        """Get the eval splits present in the scores."""
        return list(self.scores.keys())

    def to_dict(self) -> dict:
        """Convert the TaskResult to a dictionary.

        Returns:
            The TaskResult as a dictionary.
        """
        return self.model_dump()

    @classmethod
    def from_dict(cls, data: dict) -> Self:
        """Create a TaskResult from a dictionary.

        Args:
            data: The dictionary to create the TaskResult from.

        Returns:
            The created TaskResult object.
        """
        return cls.model_validate(data)

    def _round_scores(self, scores: dict[SplitName, list[ScoresDict]], n: int) -> None:
        """Recursively round scores to n decimal places"""
        for key, value in scores.items():
            if isinstance(value, dict):
                self._round_scores(value, n)
            elif isinstance(value, list):
                for i, v in enumerate(value):
                    if isinstance(v, dict):
                        self._round_scores(v, n)
                    elif isinstance(v, float):
                        value[i] = round(v, n)

            elif isinstance(value, float):
                scores[key] = round(value, n)

    def to_disk(self, path: Path) -> None:
        """Save TaskResult to disk.

        Args:
            path: The path to the file to save.
        """
        json_obj = self.model_dump()
        self._round_scores(json_obj["scores"], 6)

        with path.open("w") as f:
            json.dump(json_obj, f, indent=2)

    @classmethod
    def from_disk(cls, path: Path, load_historic_data: bool = True) -> Self:  # type: ignore
        """Load TaskResult from disk.

        Args:
            path: The path to the file to load.
            load_historic_data: Whether to attempt to load historic data from before v1.11.0.

        Returns:
            The loaded TaskResult object.
        """
        with path.open("r", encoding="utf-8") as f:
            data = json.load(f)

        if not load_historic_data:
            try:
                return cls.model_validate(data)
            except Exception as e:
                raise ValueError(
                    f"Error loading TaskResult from disk. You can try to load historic data by setting `load_historic_data=True`. Error: {e}"
                )

        pre_1_11_load = (
            (
                "mteb_version" in data
                and data["mteb_version"] is not None
                and Version(data["mteb_version"]) < Version("1.11.0")
            )
            or "mteb_version" not in data
        )  # assume it is before 1.11.0 if the version is not present

        try:
            obj = cls.model_validate(data)
        except Exception as e:
            if not pre_1_11_load:
                raise e
            logger.debug(
                f"Could not load TaskResult from disk, got error: {e}. Attempting to load from disk using format from before v1.11.0"
            )
            obj = cls._convert_from_before_v1_11_0(data)

        pre_v_12_48 = (
            "mteb_version" in data
            and data["mteb_version"] is not None
            and Version(data["mteb_version"]) < Version("1.12.48")
        )

        if pre_v_12_48:
            cls._fix_pair_classification_scores(obj)

        return obj

    @classmethod
    def _fix_pair_classification_scores(cls, obj: TaskResult) -> None:
        from mteb import get_task

        task_name = obj.task_name
        if task_name in outdated_tasks:
            task = outdated_tasks[task_name]
        else:
            task = get_task(obj.task_name)

        if task.metadata.type == "PairClassification":
            for split, split_scores in obj.scores.items():
                for hf_subset_scores in split_scores:
                    # concatenate score e.g. ["max"]["ap"] -> ["max_ap"]
                    for key in list(hf_subset_scores.keys()):
                        if isinstance(hf_subset_scores[key], dict):
                            for k, v in hf_subset_scores[key].items():
                                hf_subset_scores[f"{key}_{k}"] = v
                            hf_subset_scores.pop(key)

    @classmethod
    def _convert_from_before_v1_11_0(cls, data: dict) -> Self:
        from mteb.get_tasks import _TASKS_REGISTRY

        # in case the task name is not found in the registry, try to find a lower case version
        lower_case_registry = {k.lower(): v for k, v in _TASKS_REGISTRY.items()}

        scores = {**data}

        dataset_revision = scores.pop(
            "dataset_revision", "dataset revision not available"
        )
        task_name = scores.pop("mteb_dataset_name")
        mteb_version = scores.pop("mteb_version", "mteb version not available")

        # calculate evaluation time across all splits (move to top level)
        evaluation_time = 0
        for split, split_score in scores.items():
            if "evaluation_time" in split_score:
                evaluation_time += split_score.pop("evaluation_time")

        # normalize the scores to always be {split: {hf_subset: scores}}
        contains_hf_subset = any(
            isinstance(hf_subset_scores, dict)
            for split_scores in scores.values()
            for k, hf_subset_scores in split_scores.items()
            if k
            not in {"v_measures", "cos_sim", "euclidean", "manhattan", "dot", "max"}
        )
        if not contains_hf_subset:
            for split, split_score in scores.items():
                scores[split] = {"default": split_score.copy()}

        if task_name in outdated_tasks:
            logger.debug(
                f"Loading {task_name} as a dummy task as it no longer exists within MTEB. To avoid this set `load_historic_data=False`"
            )
            task = outdated_tasks[task_name]
        else:
            if task_name in renamed_tasks:
                task_name = renamed_tasks[task_name]
            task = _TASKS_REGISTRY.get(
                task_name, lower_case_registry[task_name.lower()]
            )

        # make sure that main score exists
        main_score = task.metadata.main_score
        for split, split_score in scores.items():
            for hf_subset, hf_subset_scores in split_score.items():
                for name, prev_name in [
                    (ScoringFunction.COSINE.value, "cos_sim"),
                    (ScoringFunction.MANHATTAN.value, "manhattan"),
                    (ScoringFunction.EUCLIDEAN.value, "euclidean"),
                    (ScoringFunction.DOT_PRODUCT.value, "dot"),
                    ("max", "max"),
                    ("similarity", "similarity"),
                ]:
                    prev_name_scores = hf_subset_scores.pop(prev_name, None)
                    if prev_name_scores is not None:
                        for k, v in prev_name_scores.items():
                            hf_subset_scores[f"{name}_{k}"] = v

                if "main_score" not in hf_subset_scores:
                    if main_score in hf_subset_scores:
                        hf_subset_scores["main_score"] = hf_subset_scores[main_score]
                    else:
                        logger.warning(f"Main score {main_score} not found in scores")
                        hf_subset_scores["main_score"] = None

        # specific fixes:
        if task_name == "MLSUMClusteringP2P" and mteb_version in [
            "1.1.2.dev0",
            "1.1.3.dev0",
        ]:  # back then it was only the french subsection which was implemented
            scores["test"]["fr"] = scores["test"].pop("default")
        if task_name == "MLSUMClusteringS2S" and mteb_version in [
            "1.1.2.dev0",
            "1.1.3.dev0",
        ]:
            scores["test"]["fr"] = scores["test"].pop("default")
        if task_name == "XPQARetrieval":  # subset were renamed from "fr" to "fra-fra"
            if "test" in scores and "fr" in scores["test"]:
                scores["test"]["fra-fra"] = scores["test"].pop("fr")

        result: TaskResult = TaskResult.from_task_results(
            task,  # type: ignore
            scores,
            evaluation_time,
            kg_co2_emissions=None,
        )
        result.dataset_revision = dataset_revision
        result.mteb_version = mteb_version
        return result

    def get_score(
        self,
        splits: list[SplitName] | None = None,
        languages: list[ISOLanguage | ISOLanguageScript] | None = None,
        scripts: list[ISOLanguageScript] | None = None,
        getter: Callable[[ScoresDict], Score] = lambda scores: scores["main_score"],
        aggregation: Callable[[list[Score]], Any] = np.mean,
    ) -> Any:
        """Get a score for the specified splits, languages, scripts and aggregation function.

        Args:
            splits: The splits to consider.
            languages: The languages to consider. Can be ISO language codes or ISO language script codes.
            scripts: The scripts to consider.
            getter: A function that takes a scores dictionary and returns a score e.g. "main_score" or "evaluation_time".
            aggregation: The aggregation function to use.

        Returns:
            The result of the aggregation function on the scores.
        """
        if splits is None:
            splits = list(self.scores.keys())

        lang_scripts = LanguageScripts.from_languages_and_scripts(languages, scripts)

        values = []
        for split in splits:
            if split not in self.scores:
                raise ValueError(f"Split {split} not found in scores")

            for scores in self.scores[split]:
                eval_langs = scores["languages"]
                for lang in eval_langs:
                    if lang_scripts.contains_language(lang):
                        values.append(getter(scores))
                        break

        return aggregation(values)

    def _get_score_fast(
        self,
        splits: Iterable[str] | None = None,
        languages: str | None = None,
        subsets: Iterable[str] | None = None,
    ) -> float:
        """Sped up version of get_score that will be used if no aggregation, script or getter needs to be specified.

        Args:
            splits: The splits to consider.
            languages: The languages to consider. Can be ISO language codes or ISO language script codes.
            subsets: The hf_subsets to consider.

        Returns:
            The mean main score for the specified splits, languages and subsets.
        """
        if splits is None:
            splits = self.scores.keys()
        val_sum = 0
        n_val = 0
        for split in splits:
            if split not in self.scores:
                raise ValueError(f"Split missing from scores: {split}")

            for scores in self.scores[split]:
                langs = scores["languages"]
                hf_subset = scores["hf_subset"]
                main_score = scores.get("main_score", None)
                if main_score is None:
                    raise ValueError(f"Missing main score for subset: {hf_subset}")
                if subsets and hf_subset not in subsets:
                    continue
                elif subsets:
                    val_sum += main_score
                    n_val += 1
                    continue

                if languages is None:
                    val_sum += main_score
                    n_val += 1
                    continue
                for lang in langs:
                    if lang.split("-")[0] in languages:
                        val_sum += main_score
                        n_val += 1
                        logger.info(f"{val_sum=}, {n_val=}")
                        break
        if n_val == 0:
            raise ValueError("No splits had scores for the specified languages.")
        return val_sum / n_val

    @classmethod
    def from_validated(cls, **data) -> Self:
        """Create a TaskResult from validated data.

        Returns:
            The created TaskResult object.
        """
        return cls.model_construct(**data)

    def __repr__(self) -> str:
        return f"TaskResult(task_name={self.task_name}, scores=...)"

    def only_main_score(self) -> Self:
        """Return a new TaskResult object with only the main score.

        Returns:
            A new TaskResult object with only the main score.
        """
        new_scores = {}
        for split in self.scores:
            new_scores[split] = []
            for subset_scores in self.scores[split]:
                new_scores[split].append(
                    {
                        "hf_subset": subset_scores.get("hf_subset", "default"),
                        "main_score": subset_scores.get("main_score", np.nan),
                        "languages": subset_scores.get("languages", []),
                    }
                )
        new_res = {**self.to_dict(), "scores": new_scores}
        new_res = TaskResult.from_validated(**new_res)
        return new_res

    def validate_and_filter_scores(self, task: AbsTask | None = None) -> Self:
        """Validate and filter the scores against the task metadata.

        This ensures that the scores are correct for the given task, by removing any splits besides those specified in the task metadata.
        Additionally, it also ensures that all of the required splits and languages are present in the scores.
        Returns new TaskResult object.

        Args:
            task: The task to validate the scores against. E.g. if the task supplied is limited to certain splits and languages,
                the scores will be filtered to only include those splits and languages. If None it will attempt to get the task from the task_name.

        Returns:
            A new TaskResult object with the validated and filtered scores.
        """
        from mteb.get_tasks import get_task

        if task is None:
            task = get_task(self.task_name)

        splits = task.eval_splits
        hf_subsets = task.hf_subsets
        hf_subsets = set(hf_subsets)

        new_scores = {}
        seen_splits = set()
        for split in self.scores:
            if split not in splits:
                continue
            new_scores[split] = []
            seen_subsets = set()
            for _scores in self.scores[split]:
                if _scores["hf_subset"] not in hf_subsets:
                    continue
                new_scores[split].append(_scores)
                seen_subsets.add(_scores["hf_subset"])
            if seen_subsets != hf_subsets:
                missing_subsets = hf_subsets - seen_subsets
                if len(missing_subsets) > 2:
                    subset1, subset2 = list(missing_subsets)[:2]
                    missing_subsets_str = f"{{'{subset1}', '{subset2}', ...}}"
                else:
                    missing_subsets_str = str(missing_subsets)

                logger.warning(
                    f"{task.metadata.name}: Missing subsets {missing_subsets_str} for split {split}"
                )
            seen_splits.add(split)
        if seen_splits != set(splits):
            logger.warning(
                f"{task.metadata.name}: Missing splits {set(splits) - seen_splits}"
            )
        new_res = {**self.to_dict(), "scores": new_scores}
        new_res = TaskResult.from_validated(**new_res)
        return new_res

    def is_mergeable(
        self,
        result: TaskResult | AbsTask,
        criteria: list[str] | list[Criteria] = [
            "mteb_version",
            "dataset_revision",
        ],
        raise_error: bool = False,
    ) -> bool:
        """Checks if the TaskResult object can be merged with another TaskResult or Task.

        Args:
            result: The TaskResult or Task object to check against.
            criteria: Additional criteria to check for merging. Can be "mteb_version" or "dataset_revision".
                It will always check that the task names match.
            raise_error: If True, raises an error if the objects cannot be merged. If False, returns False.

        Returns:
            True if the TaskResult object can be merged with the other object, False otherwise.
        """
        criteria = [Criteria.from_str(c) if isinstance(c, str) else c for c in criteria]
        if isinstance(result, TaskResult):
            name = result.task_name
            revision = result.dataset_revision
            mteb_version = result.mteb_version
        elif isinstance(result, AbsTask):
            mteb_version = version("mteb")
            name = result.metadata.name
            revision = result.metadata.revision
        else:
            msg = "result must be a TaskResult or AbsTask object"
            if raise_error:
                raise ValueError(msg)
            logger.debug(msg)
            return False

        if self.task_name != name:
            msg = f"Cannot merge TaskResult objects as they are derived from different tasks ({self.task_name} and {name})"
            if raise_error:
                raise ValueError(msg)
            logger.debug(msg)
            return False

        if Criteria.MTEB_VERSION in criteria and self.mteb_version != mteb_version:
            msg = f"Cannot merge TaskResult objects as they are derived from different MTEB versions ({self.mteb_version} (loaded) and {mteb_version} (current))"
            if raise_error:
                raise ValueError(msg)
            logger.debug(msg)
            return False

        if Criteria.DATASET_REVISION in criteria and self.dataset_revision != revision:
            msg = f"Cannot merge TaskResult objects as they are derived from different dataset revisions ({self.dataset_revision} and {revision})"
            if raise_error:
                raise ValueError(msg)
            logger.debug(msg)
            return False

        return True

    def merge(
        self,
        new_results: TaskResult,
        criteria: list[str] | list[Criteria] = [
            "mteb_version",
            "dataset_revision",
        ],
    ) -> Self:
        """Merges two TaskResult objects.

        Args:
            new_results: The new TaskResult object to merge with the current one.
            criteria: Additional criteria to check for merging. Can be "mteb_version" or "dataset_revision".
                It will always check that the task names match.

        Returns:
            A new TaskResult object with the merged scores.
        """
        self.is_mergeable(new_results, criteria=criteria, raise_error=True)

        merged_scores = self.scores.copy()

        for split, scores in new_results.scores.items():
            if split in merged_scores:
                merged_scores[split] = self._merge_split_scores(
                    merged_scores[split], scores
                )
            else:
                merged_scores[split] = scores

        existing_kg_co2_emissions = (
            self.kg_co2_emissions if self.kg_co2_emissions else 0
        )
        new_kg_co2_emissions = (
            new_results.kg_co2_emissions if new_results.kg_co2_emissions else 0
        )
        merged_kg_co2_emissions = None
        if existing_kg_co2_emissions and new_kg_co2_emissions:
            merged_kg_co2_emissions = existing_kg_co2_emissions + new_kg_co2_emissions

        merged_evaluation_time = None
        if self.evaluation_time and new_results.evaluation_time:
            merged_evaluation_time = self.evaluation_time + new_results.evaluation_time
        merged_results = TaskResult(
            dataset_revision=new_results.dataset_revision,
            task_name=new_results.task_name,
            mteb_version=new_results.mteb_version,
            scores=merged_scores,
            evaluation_time=merged_evaluation_time,
            kg_co2_emissions=merged_kg_co2_emissions,
        )

        return merged_results

    @staticmethod
    def _merge_split_scores(
        existing_scores: list[ScoresDict], new_scores: list[ScoresDict]
    ) -> list[ScoresDict]:
        merged = {score["hf_subset"]: score for score in existing_scores}
        for score in new_scores:
            merged[score["hf_subset"]] = score
        return list(merged.values())

    def get_missing_evaluations(self, task: AbsTask) -> dict[str, list[str]]:
        """Checks which splits and subsets are missing from the results.

        Args:
            task: The task to check against.

        Returns:
            A dictionary with the splits as keys and a list of missing subsets as values.
        """
        missing_splits = {}
        for splits in task.eval_splits:
            if splits not in self.scores:  # split is fully missing
                missing_splits[splits] = task.hf_subsets
            if splits in self.scores:
                hf_subsets = {score["hf_subset"] for score in self.scores[splits]}
                missing_subsets = list(set(task.hf_subsets) - hf_subsets)
                if missing_subsets:
                    missing_splits[splits] = missing_subsets

        return missing_splits

    def get_hf_eval_results(self) -> list[EvalResult]:
        """Create HF evaluation results objects from TaskResult objects.

        Returns:
            List of EvalResult objects for each split and subset.
        """
        task_metadata = self.task.metadata
        task_type = task_metadata._hf_task_type()[0]
        results = []
        for split, scores in self.scores.items():
            for subset_results in scores:
                subset = subset_results.get("hf_subset", "default")
                results.append(
                    EvalResult(
                        task_type=task_type,
                        task_name=task_metadata.type,
                        dataset_type=task_metadata.dataset["path"],
                        dataset_name=f"{task_metadata.name} ({subset})",
                        dataset_config=subset,
                        dataset_split=split,
                        dataset_revision=task_metadata.dataset["revision"],
                        metric_type=task_metadata.main_score,
                        metric_name=task_metadata.main_score,
                        metric_value=subset_results["main_score"],
                        source_name="MTEB",
                        source_url="https://github.com/embeddings-benchmark/mteb/",
                    )
                )
        return results

domains property

Get the domains of the task.

eval_splits property

Get the eval splits present in the scores.

hf_subsets property

Get the hf_subsets present in the scores.

is_public property

Check if the task is public.

languages property

Get the languages present in the scores.

task cached property

Get the task associated with the result.

task_type property

Get the type of the task.

from_dict(data) classmethod

Create a TaskResult from a dictionary.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | dict | The dictionary to create the TaskResult from. | required |

Returns:

| Type | Description |
| --- | --- |
| Self | The created TaskResult object. |

Source code in mteb/results/task_result.py
@classmethod
def from_dict(cls, data: dict) -> Self:
    """Create a TaskResult from a dictionary.

    Args:
        data: The dictionary to create the TaskResult from.

    Returns:
        The created TaskResult object.
    """
    return cls.model_validate(data)
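
Example (a minimal round-trip sketch; `result` is assumed to be an existing TaskResult instance):

>>> data = result.to_dict()                # plain dict produced by to_dict()
>>> restored = TaskResult.from_dict(data)  # re-validates the payload
>>> restored.task_name == result.task_name
True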

from_disk(path, load_historic_data=True) classmethod

Load TaskResult from disk.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Path | The path to the file to load. | required |
| load_historic_data | bool | Whether to attempt to load historic data from before v1.11.0. | True |

Returns:

| Type | Description |
| --- | --- |
| Self | The loaded TaskResult object. |

Source code in mteb/results/task_result.py
@classmethod
def from_disk(cls, path: Path, load_historic_data: bool = True) -> Self:  # type: ignore
    """Load TaskResult from disk.

    Args:
        path: The path to the file to load.
        load_historic_data: Whether to attempt to load historic data from before v1.11.0.

    Returns:
        The loaded TaskResult object.
    """
    with path.open("r", encoding="utf-8") as f:
        data = json.load(f)

    if not load_historic_data:
        try:
            return cls.model_validate(data)
        except Exception as e:
            raise ValueError(
                f"Error loading TaskResult from disk. You can try to load historic data by setting `load_historic_data=True`. Error: {e}"
            )

    pre_1_11_load = (
        (
            "mteb_version" in data
            and data["mteb_version"] is not None
            and Version(data["mteb_version"]) < Version("1.11.0")
        )
        or "mteb_version" not in data
    )  # assume it is before 1.11.0 if the version is not present

    try:
        obj = cls.model_validate(data)
    except Exception as e:
        if not pre_1_11_load:
            raise e
        logger.debug(
            f"Could not load TaskResult from disk, got error: {e}. Attempting to load from disk using format from before v1.11.0"
        )
        obj = cls._convert_from_before_v1_11_0(data)

    pre_v_12_48 = (
        "mteb_version" in data
        and data["mteb_version"] is not None
        and Version(data["mteb_version"]) < Version("1.12.48")
    )

    if pre_v_12_48:
        cls._fix_pair_classification_scores(obj)

    return obj
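
Example (a minimal sketch; the path is hypothetical and should point to a JSON file written by `to_disk`):

>>> from pathlib import Path
>>> result = TaskResult.from_disk(Path("results/MyTask.json"), load_historic_data=True)
>>> result.task_name, result.mteb_version  # inspect what was loaded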

from_task_results(task, scores, evaluation_time, kg_co2_emissions=None) classmethod

Create a TaskResult from the task and scores.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task | AbsTask \| type[AbsTask] | The task to create the TaskResult from. | required |
| scores | dict[SplitName, dict[HFSubset, ScoresDict]] | The scores of the model on the dataset, structured as dict[SplitName, dict[HFSubset, Scores]], where Scores is a dict[str, Any] mapping score names to values. | required |
| evaluation_time | float | The time taken to evaluate the model. | required |
| kg_co2_emissions | float \| None | The kg of CO2 emissions produced by the model during evaluation. | None |
Source code in mteb/results/task_result.py
@classmethod
def from_task_results(
    cls,
    task: AbsTask | type[AbsTask],
    scores: dict[SplitName, dict[HFSubset, ScoresDict]],
    evaluation_time: float,
    kg_co2_emissions: float | None = None,
) -> Self:
    """Create a TaskResult from the task and scores.

    Args:
        task: The task to create the TaskResult from.
        scores: The scores of the model on the dataset, structured as dict[SplitName, dict[HFSubset, Scores]],
            where Scores is a dict[str, Any] mapping score names to values and SplitName is the split of the dataset.
        evaluation_time: The time taken to evaluate the model.
        kg_co2_emissions: The kg of CO2 emissions produced by the model during evaluation.
    """
    task_meta = task.metadata
    subset2langscripts = task_meta.hf_subsets_to_langscripts
    flat_scores = defaultdict(list)
    for split, hf_subset_scores in scores.items():
        for hf_subset, hf_scores in hf_subset_scores.items():
            eval_langs = subset2langscripts[hf_subset]
            _scores = {
                **hf_scores,
                "hf_subset": hf_subset,
                "languages": eval_langs,
            }
            flat_scores[split].append(_scores)

    return TaskResult(
        dataset_revision=task.metadata.revision,
        task_name=task.metadata.name,
        mteb_version=version("mteb"),
        scores=flat_scores,
        evaluation_time=evaluation_time,
        kg_co2_emissions=kg_co2_emissions,
    )
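
Example (a minimal sketch; the task name, subset key and score values are placeholders, and the subset keys must match the task's hf_subsets):

>>> import mteb
>>> task = mteb.get_task("BornholmBitextMining")          # any valid task name
>>> scores = {"test": {"default": {"main_score": 0.42}}}  # dict[SplitName, dict[HFSubset, ScoresDict]]
>>> result = TaskResult.from_task_results(task, scores, evaluation_time=12.3)
>>> result.scores["test"][0]["hf_subset"]
'default'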

from_validated(**data) classmethod

Create a TaskResult from validated data.

Returns:

| Type | Description |
| --- | --- |
| Self | The created TaskResult object. |

Source code in mteb/results/task_result.py
@classmethod
def from_validated(cls, **data) -> Self:
    """Create a TaskResult from validated data.

    Returns:
        The created TaskResult object.
    """
    return cls.model_construct(**data)

get_hf_eval_results()

Create HF evaluation results objects from TaskResult objects.

Returns:

| Type | Description |
| --- | --- |
| list[EvalResult] | List of EvalResult objects for each split and subset. |

Source code in mteb/results/task_result.py
def get_hf_eval_results(self) -> list[EvalResult]:
    """Create HF evaluation results objects from TaskResult objects.

    Returns:
        List of EvalResult objects for each split and subset.
    """
    task_metadata = self.task.metadata
    task_type = task_metadata._hf_task_type()[0]
    results = []
    for split, scores in self.scores.items():
        for subset_results in scores:
            subset = subset_results.get("hf_subset", "default")
            results.append(
                EvalResult(
                    task_type=task_type,
                    task_name=task_metadata.type,
                    dataset_type=task_metadata.dataset["path"],
                    dataset_name=f"{task_metadata.name} ({subset})",
                    dataset_config=subset,
                    dataset_split=split,
                    dataset_revision=task_metadata.dataset["revision"],
                    metric_type=task_metadata.main_score,
                    metric_name=task_metadata.main_score,
                    metric_value=subset_results["main_score"],
                    source_name="MTEB",
                    source_url="https://github.com/embeddings-benchmark/mteb/",
                )
            )
    return results
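
Example (a minimal sketch; `result` is assumed to be an existing TaskResult, and the EvalResult objects are assumed to follow the Hugging Face model-card metadata format):

>>> eval_results = result.get_hf_eval_results()
>>> eval_results[0].metric_name    # the task's main score metric
>>> eval_results[0].dataset_split  # the split the score was computed on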

get_missing_evaluations(task)

Checks which splits and subsets are missing from the results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task | AbsTask | The task to check against. | required |

Returns:

| Type | Description |
| --- | --- |
| dict[str, list[str]] | A dictionary with the splits as keys and a list of missing subsets as values. |

Source code in mteb/results/task_result.py
def get_missing_evaluations(self, task: AbsTask) -> dict[str, list[str]]:
    """Checks which splits and subsets are missing from the results.

    Args:
        task: The task to check against.

    Returns:
        A dictionary with the splits as keys and a list of missing subsets as values.
    """
    missing_splits = {}
    for splits in task.eval_splits:
        if splits not in self.scores:  # split is fully missing
            missing_splits[splits] = task.hf_subsets
        if splits in self.scores:
            hf_subsets = {score["hf_subset"] for score in self.scores[splits]}
            missing_subsets = list(set(task.hf_subsets) - hf_subsets)
            if missing_subsets:
                missing_splits[splits] = missing_subsets

    return missing_splits
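
Example (a minimal sketch; `result` is assumed to be a partially evaluated TaskResult):

>>> import mteb
>>> task = mteb.get_task(result.task_name)
>>> result.get_missing_evaluations(task)  # e.g. {"validation": ["default"]} if that split was never run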

get_score(splits=None, languages=None, scripts=None, getter=lambda scores: scores['main_score'], aggregation=np.mean)

Get a score for the specified splits, languages, scripts and aggregation function.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| splits | list[SplitName] \| None | The splits to consider. | None |
| languages | list[ISOLanguage \| ISOLanguageScript] \| None | The languages to consider. Can be ISO language codes or ISO language script codes. | None |
| scripts | list[ISOLanguageScript] \| None | The scripts to consider. | None |
| getter | Callable[[ScoresDict], Score] | A function that takes a scores dictionary and returns a score, e.g. "main_score" or "evaluation_time". | lambda scores: scores['main_score'] |
| aggregation | Callable[[list[Score]], Any] | The aggregation function to use. | np.mean |

Returns:

| Type | Description |
| --- | --- |
| Any | The result of the aggregation function on the scores. |

Source code in mteb/results/task_result.py
def get_score(
    self,
    splits: list[SplitName] | None = None,
    languages: list[ISOLanguage | ISOLanguageScript] | None = None,
    scripts: list[ISOLanguageScript] | None = None,
    getter: Callable[[ScoresDict], Score] = lambda scores: scores["main_score"],
    aggregation: Callable[[list[Score]], Any] = np.mean,
) -> Any:
    """Get a score for the specified splits, languages, scripts and aggregation function.

    Args:
        splits: The splits to consider.
        languages: The languages to consider. Can be ISO language codes or ISO language script codes.
        scripts: The scripts to consider.
        getter: A function that takes a scores dictionary and returns a score e.g. "main_score" or "evaluation_time".
        aggregation: The aggregation function to use.

    Returns:
        The result of the aggregation function on the scores.
    """
    if splits is None:
        splits = list(self.scores.keys())

    lang_scripts = LanguageScripts.from_languages_and_scripts(languages, scripts)

    values = []
    for split in splits:
        if split not in self.scores:
            raise ValueError(f"Split {split} not found in scores")

        for scores in self.scores[split]:
            eval_langs = scores["languages"]
            for lang in eval_langs:
                if lang_scripts.contains_language(lang):
                    values.append(getter(scores))
                    break

    return aggregation(values)
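
Example (a minimal sketch; `result` is assumed to be an existing TaskResult). By default the mean main_score over all splits and languages is returned:

>>> import numpy as np
>>> result.get_score()                                    # mean main_score over everything
>>> result.get_score(splits=["test"], languages=["eng"])  # restrict to a split and a language
>>> result.get_score(getter=lambda s: s["main_score"], aggregation=np.max)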

is_mergeable(result, criteria=['mteb_version', 'dataset_revision'], raise_error=False)

Checks if the TaskResult object can be merged with another TaskResult or Task.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| result | TaskResult \| AbsTask | The TaskResult or Task object to check against. | required |
| criteria | list[str] \| list[Criteria] | Additional criteria to check for merging. Can be "mteb_version" or "dataset_revision". The task names are always checked. | ['mteb_version', 'dataset_revision'] |
| raise_error | bool | If True, raises an error if the objects cannot be merged. If False, returns False. | False |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the TaskResult object can be merged with the other object, False otherwise. |

Source code in mteb/results/task_result.py
def is_mergeable(
    self,
    result: TaskResult | AbsTask,
    criteria: list[str] | list[Criteria] = [
        "mteb_version",
        "dataset_revision",
    ],
    raise_error: bool = False,
) -> bool:
    """Checks if the TaskResult object can be merged with another TaskResult or Task.

    Args:
        result: The TaskResult or Task object to check against.
        criteria: Additional criteria to check for merging. Can be "mteb_version" or "dataset_revision".
            It will always check that the task names match.
        raise_error: If True, raises an error if the objects cannot be merged. If False, returns False.

    Returns:
        True if the TaskResult object can be merged with the other object, False otherwise.
    """
    criteria = [Criteria.from_str(c) if isinstance(c, str) else c for c in criteria]
    if isinstance(result, TaskResult):
        name = result.task_name
        revision = result.dataset_revision
        mteb_version = result.mteb_version
    elif isinstance(result, AbsTask):
        mteb_version = version("mteb")
        name = result.metadata.name
        revision = result.metadata.revision
    else:
        msg = "result must be a TaskResult or AbsTask object"
        if raise_error:
            raise ValueError(msg)
        logger.debug(msg)
        return False

    if self.task_name != name:
        msg = f"Cannot merge TaskResult objects as they are derived from different tasks ({self.task_name} and {name})"
        if raise_error:
            raise ValueError(msg)
        logger.debug(msg)
        return False

    if Criteria.MTEB_VERSION in criteria and self.mteb_version != mteb_version:
        msg = f"Cannot merge TaskResult objects as they are derived from different MTEB versions ({self.mteb_version} (loaded) and {mteb_version} (current))"
        if raise_error:
            raise ValueError(msg)
        logger.debug(msg)
        return False

    if Criteria.DATASET_REVISION in criteria and self.dataset_revision != revision:
        msg = f"Cannot merge TaskResult objects as they are derived from different dataset revisions ({self.dataset_revision} and {revision})"
        if raise_error:
            raise ValueError(msg)
        logger.debug(msg)
        return False

    return True
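
Example (a minimal sketch; `result_a` and `result_b` are assumed to be TaskResult objects for the same task):

>>> result_a.is_mergeable(result_b)                                 # check task name, mteb version and dataset revision
>>> result_a.is_mergeable(result_b, criteria=["dataset_revision"])  # skip the mteb version check
>>> result_a.is_mergeable(result_b, raise_error=True)               # raise instead of returning False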

merge(new_results, criteria=['mteb_version', 'dataset_revision'])

Merges two TaskResult objects.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| new_results | TaskResult | The new TaskResult object to merge with the current one. | required |
| criteria | list[str] \| list[Criteria] | Additional criteria to check for merging. Can be "mteb_version" or "dataset_revision". The task names are always checked. | ['mteb_version', 'dataset_revision'] |

Returns:

| Type | Description |
| --- | --- |
| Self | A new TaskResult object with the merged scores. |

Source code in mteb/results/task_result.py
def merge(
    self,
    new_results: TaskResult,
    criteria: list[str] | list[Criteria] = [
        "mteb_version",
        "dataset_revision",
    ],
) -> Self:
    """Merges two TaskResult objects.

    Args:
        new_results: The new TaskResult object to merge with the current one.
        criteria: Additional criteria to check for merging. Can be "mteb_version" or "dataset_revision".
            It will always check that the task names match.

    Returns:
        A new TaskResult object with the merged scores.
    """
    self.is_mergeable(new_results, criteria=criteria, raise_error=True)

    merged_scores = self.scores.copy()

    for split, scores in new_results.scores.items():
        if split in merged_scores:
            merged_scores[split] = self._merge_split_scores(
                merged_scores[split], scores
            )
        else:
            merged_scores[split] = scores

    existing_kg_co2_emissions = (
        self.kg_co2_emissions if self.kg_co2_emissions else 0
    )
    new_kg_co2_emissions = (
        new_results.kg_co2_emissions if new_results.kg_co2_emissions else 0
    )
    merged_kg_co2_emissions = None
    if existing_kg_co2_emissions and new_kg_co2_emissions:
        merged_kg_co2_emissions = existing_kg_co2_emissions + new_kg_co2_emissions

    merged_evaluation_time = None
    if self.evaluation_time and new_results.evaluation_time:
        merged_evaluation_time = self.evaluation_time + new_results.evaluation_time
    merged_results = TaskResult(
        dataset_revision=new_results.dataset_revision,
        task_name=new_results.task_name,
        mteb_version=new_results.mteb_version,
        scores=merged_scores,
        evaluation_time=merged_evaluation_time,
        kg_co2_emissions=merged_kg_co2_emissions,
    )

    return merged_results
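
Example (a minimal sketch; assume `result_a` holds the "test" split and `result_b` adds the missing "validation" split of the same task):

>>> merged = result_a.merge(result_b)
>>> sorted(merged.eval_splits)
['test', 'validation']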

only_main_score()

Return a new TaskResult object with only the main score.

Returns:

| Type | Description |
| --- | --- |
| Self | A new TaskResult object with only the main score. |

Source code in mteb/results/task_result.py
def only_main_score(self) -> Self:
    """Return a new TaskResult object with only the main score.

    Returns:
        A new TaskResult object with only the main score.
    """
    new_scores = {}
    for split in self.scores:
        new_scores[split] = []
        for subset_scores in self.scores[split]:
            new_scores[split].append(
                {
                    "hf_subset": subset_scores.get("hf_subset", "default"),
                    "main_score": subset_scores.get("main_score", np.nan),
                    "languages": subset_scores.get("languages", []),
                }
            )
    new_res = {**self.to_dict(), "scores": new_scores}
    new_res = TaskResult.from_validated(**new_res)
    return new_res
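
Example (a minimal sketch; `result` is assumed to contain a "test" split). Handy for keeping payloads small when only the headline number is needed:

>>> slim = result.only_main_score()
>>> sorted(slim.scores["test"][0].keys())
['hf_subset', 'languages', 'main_score']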

to_dict()

Convert the TaskResult to a dictionary.

Returns:

| Type | Description |
| --- | --- |
| dict | The TaskResult as a dictionary. |

Source code in mteb/results/task_result.py
def to_dict(self) -> dict:
    """Convert the TaskResult to a dictionary.

    Returns:
        The TaskResult as a dictionary.
    """
    return self.model_dump()

to_disk(path)

Save TaskResult to disk.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Path | The path to the file to save. | required |
Source code in mteb/results/task_result.py
def to_disk(self, path: Path) -> None:
    """Save TaskResult to disk.

    Args:
        path: The path to the file to save.
    """
    json_obj = self.model_dump()
    self._round_scores(json_obj["scores"], 6)

    with path.open("w") as f:
        json.dump(json_obj, f, indent=2)
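
Example (a minimal sketch; the output path is hypothetical). Scores are rounded to six decimal places before writing:

>>> from pathlib import Path
>>> result.to_disk(Path("results/MyTask.json"))
>>> restored = TaskResult.from_disk(Path("results/MyTask.json"))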

validate_and_filter_scores(task=None)

Validate and filter the scores against the task metadata.

This ensures that the scores are correct for the given task by removing any splits besides those specified in the task metadata. It also checks that all required splits and languages are present in the scores. Returns a new TaskResult object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task | AbsTask \| None | The task to validate the scores against. E.g. if the task supplied is limited to certain splits and languages, the scores will be filtered to only include those splits and languages. If None, it will attempt to get the task from the task_name. | None |

Returns:

| Type | Description |
| --- | --- |
| Self | A new TaskResult object with the validated and filtered scores. |

Source code in mteb/results/task_result.py
def validate_and_filter_scores(self, task: AbsTask | None = None) -> Self:
    """Validate and filter the scores against the task metadata.

    This ensures that the scores are correct for the given task, by removing any splits besides those specified in the task metadata.
    Additionally, it also ensures that all of the required splits and languages are present in the scores.
    Returns new TaskResult object.

    Args:
        task: The task to validate the scores against. E.g. if the task supplied is limited to certain splits and languages,
            the scores will be filtered to only include those splits and languages. If None it will attempt to get the task from the task_name.

    Returns:
        A new TaskResult object with the validated and filtered scores.
    """
    from mteb.get_tasks import get_task

    if task is None:
        task = get_task(self.task_name)

    splits = task.eval_splits
    hf_subsets = task.hf_subsets
    hf_subsets = set(hf_subsets)

    new_scores = {}
    seen_splits = set()
    for split in self.scores:
        if split not in splits:
            continue
        new_scores[split] = []
        seen_subsets = set()
        for _scores in self.scores[split]:
            if _scores["hf_subset"] not in hf_subsets:
                continue
            new_scores[split].append(_scores)
            seen_subsets.add(_scores["hf_subset"])
        if seen_subsets != hf_subsets:
            missing_subsets = hf_subsets - seen_subsets
            if len(missing_subsets) > 2:
                subset1, subset2 = list(missing_subsets)[:2]
                missing_subsets_str = f"{{'{subset1}', '{subset2}', ...}}"
            else:
                missing_subsets_str = str(missing_subsets)

            logger.warning(
                f"{task.metadata.name}: Missing subsets {missing_subsets_str} for split {split}"
            )
        seen_splits.add(split)
    if seen_splits != set(splits):
        logger.warning(
            f"{task.metadata.name}: Missing splits {set(splits) - seen_splits}"
        )
    new_res = {**self.to_dict(), "scores": new_scores}
    new_res = TaskResult.from_validated(**new_res)
    return new_res
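
Example (a minimal sketch; `result` is assumed to contain extra splits or subsets, e.g. after merging results from several runs):

>>> import mteb
>>> task = mteb.get_task(result.task_name)
>>> filtered = result.validate_and_filter_scores(task)  # drops splits/subsets not in the task metadata
>>> set(filtered.eval_splits) <= set(task.eval_splits)
True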

mteb.results.ModelResult

Bases: BaseModel

Data class to hold the results of a model on a set of tasks.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| model_name | str | Name of the model. |
| model_revision | str \| None | Revision of the model. |
| task_results | list[TaskResult] | List of TaskResult objects. |

Source code in mteb/results/model_result.py
class ModelResult(BaseModel):
    """Data class to hold the results of a model on a set of tasks.

    Attributes:
        model_name: Name of the model.
        model_revision: Revision of the model.
        task_results: List of TaskResult objects.
    """

    model_name: str
    model_revision: str | None
    task_results: list[TaskResult]
    default_modalities: list[Modalities] = Field(
        default_factory=lambda: ["text"], alias="modalities"
    )
    model_config = (
        ConfigDict(  # to free up the name model_* which is otherwise protected
            protected_namespaces=(),
        )
    )

    def __repr__(self) -> str:
        n_entries = len(self.task_results)
        return f"ModelResult(model_name={self.model_name}, model_revision={self.model_revision}, task_results=[...](#{n_entries}))"

    @classmethod
    def from_validated(cls, **data: dict[str, Any]) -> Self:
        """Create a ModelResult from validated data.

        Args:
            data: The validated data.
        """
        data["task_results"] = [
            TaskResult.from_validated(**res) for res in data["task_results"]
        ]
        return cls.model_construct(**data)

    def _filter_tasks(
        self,
        task_names: list[str] | None = None,
        languages: list[str] | None = None,
        domains: list[TaskDomain] | None = None,
        task_types: list[TaskType] | None = None,
        modalities: list[Modalities] | None = None,
        is_public: bool | None = None,
    ) -> Self:
        new_task_results = []
        for task_result in self.task_results:
            if (task_names is not None) and (task_result.task_name not in task_names):
                continue
            if languages is not None:
                task_languages = task_result.languages
                if not any(lang in task_languages for lang in languages):
                    continue
            if domains is not None:
                task_domains = task_result.domains
                if not any(domain in task_domains for domain in domains):
                    continue
            if (task_types is not None) and (task_result.task_type not in task_types):
                continue
            if modalities is not None:
                task_modalities = getattr(task_result, "modalities", [])
                if not any(modality in task_modalities for modality in modalities):
                    continue
            if (is_public is not None) and (task_result.is_public is not is_public):
                continue
            new_task_results.append(task_result)
        return type(self).model_construct(
            model_name=self.model_name,
            model_revision=self.model_revision,
            task_results=new_task_results,
        )

    def select_tasks(self, tasks: Sequence[AbsTask]) -> Self:
        """Select tasks from the ModelResult based on a list of AbsTask objects.

        Args:
            tasks: A sequence of AbsTask objects to select from the ModelResult.
        """
        task_name_to_task = {task.metadata.name: task for task in tasks}
        new_task_results = [
            task_res.validate_and_filter_scores(task_name_to_task[task_res.task_name])
            for task_res in self.task_results
            if task_res.task_name in task_name_to_task
        ]
        return type(self).model_construct(
            model_name=self.model_name,
            model_revision=self.model_revision,
            task_results=new_task_results,
        )

    def _get_scores(
        self,
        splits: list[SplitName] | None = None,
        languages: list[ISOLanguage | ISOLanguageScript] | None = None,
        scripts: list[ISOLanguageScript] | None = None,
        getter: Callable[[ScoresDict], Score] | None = None,
        aggregation: Callable[[list[Score]], Any] | None = None,
        format: Literal["wide", "long"] = "wide",
    ) -> dict | list:
        if (getter is not None) or (aggregation is not None) or (scripts is not None):
            use_fast = False
            getter = (
                getter if getter is not None else lambda scores: scores["main_score"]
            )
            aggregation = aggregation if aggregation is not None else np.mean
        else:
            use_fast = True
        if format == "wide":
            scores = {}
            for res in self.task_results:
                try:
                    if use_fast:
                        scores[res.task_name] = res._get_score_fast(
                            splits=splits,  # type: ignore
                            languages=languages,  # type: ignore
                        )
                    else:
                        scores[res.task_name] = res.get_score(
                            splits=splits,
                            languages=languages,
                            aggregation=aggregation,  # type: ignore
                            getter=getter,  # type: ignore
                            scripts=scripts,
                        )
                except Exception as e:
                    warnings.warn(
                        f"Couldn't get scores for {res.task_name} due to {e}."
                    )
            return scores
        if format == "long":
            entries = []
            for task_res in self.task_results:
                try:
                    if use_fast:
                        score = task_res._get_score_fast(
                            splits=splits,
                            languages=languages,  # type: ignore
                        )
                    else:
                        score = task_res.get_score(
                            splits=splits,
                            languages=languages,
                            aggregation=aggregation,  # type: ignore
                            getter=getter,  # type: ignore
                            scripts=scripts,
                        )
                    entry = dict(
                        model_name=self.model_name,
                        model_revision=self.model_revision,
                        task_name=task_res.task_name,
                        score=score,
                        mteb_version=task_res.mteb_version,
                        dataset_revision=task_res.dataset_revision,
                        evaluation_time=task_res.evaluation_time,
                        kg_co2_emissions=task_res.kg_co2_emissions,
                    )
                    entries.append(entry)
                except Exception as e:
                    warnings.warn(
                        f"Couldn't get scores for {task_res.task_name} due to {e}."
                    )
            return entries

    def _get_score_for_table(self) -> list[dict[str, str | float]]:
        scores_data = []
        model_name = self.model_name
        for task_result in self.task_results:
            task_name = task_result.task_name
            for split, scores_list in task_result.scores.items():
                for score_item in scores_list:
                    row = {
                        "model_name": model_name,
                        "model_revision": self.model_revision,
                        "task_name": task_name,
                        "split": split,
                        "subset": score_item.get("hf_subset", "default"),
                        "score": score_item.get("main_score", None),
                    }

                    scores_data.append(row)

        return scores_data

    def to_dataframe(
        self,
        aggregation_level: Literal["subset", "split", "task"] = "task",
        aggregation_fn: Callable[[list[Score]], Any] | None = None,
        include_model_revision: bool = False,
        format: Literal["wide", "long"] = "wide",
    ) -> pd.DataFrame:
        """Get a DataFrame with the scores for all models and tasks.

        The DataFrame will have the following columns in addition to the metadata columns:

        - model_name: The name of the model.
        - task_name: The name of the task.
        - score: The main score of the model on the task.

        In addition, the DataFrame can have the following columns depending on the aggregation level:

        - split: The split of the task. E.g. "test", "train", "validation".
        - subset: The subset of the task. E.g. "en", "fr-en".

        Afterwards, the DataFrame will be aggregated according to the aggregation method and pivoted to either a wide format.

        Args:
            aggregation_level: The aggregation to use. Can be one of:
                - "subset"/None: No aggregation will be done. The DataFrame will have one row per model, task, split and subset.
                - "split": Aggregates the scores by split. The DataFrame will have one row per model, task and split.
                - "task": Aggregates the scores by task. The DataFrame will have one row per model and task.
            aggregation_fn: The function to use for aggregation. If None, the mean will be used.
            include_model_revision: If True, the model revision will be included in the DataFrame. If False, it will be excluded.
            format: The format of the DataFrame. Can be one of:
                - "wide": The DataFrame will be of shape (number of tasks, number of models). Scores will be in the cells.
                - "long": The DataFrame will of length (number of tasks * number of model). Scores will be in columns.

        Returns:
            A DataFrame with the scores for all models and tasks.
        """
        scores_data = self._get_score_for_table()

        if not scores_data:
            logger.warning("No scores data available. Returning empty DataFrame.")
            return pd.DataFrame()

        # Create DataFrame
        df = pd.DataFrame(scores_data)

        _columns = ["model_name"]
        if include_model_revision is False:
            df = df.drop(columns=["model_revision"])
        else:
            _columns.append("model_revision")

        return _aggregate_and_pivot(
            df,
            columns=_columns,
            aggregation_level=aggregation_level,
            format=format,
            aggregation_fn=aggregation_fn,
        )

    def __hash__(self) -> int:
        return id(self)

    def __iter__(self) -> Iterable[TaskResult]:
        return iter(self.task_results)

    def __getitem__(self, index) -> TaskResult:
        return self.task_results[index]

    def __len__(self) -> int:
        return len(self.task_results)

    @property
    def languages(self) -> list[str]:
        """Get all languages in the model results.

        Returns:
            A list of languages in the model results.
        """
        langs = []
        for task_res in self.task_results:
            langs.extend(task_res.languages)
        return list(set(langs))

    @property
    def domains(self) -> list[str]:
        """Get all domains in the model results.

        Returns:
            A list of domains in the model results.

        """
        ds = []
        for task_res in self.task_results:
            ds.extend(task_res.domains)
        return list(set(ds))

    @property
    def task_types(self) -> list[str]:
        """Get all task types in the model results.

        Returns:
            A list of task types in the model results.
        """
        return list({task_res.task_type for task_res in self.task_results})

    @property
    def task_names(self) -> list[str]:
        """Get all task names in the model results.

        Returns:
            A list of task names in the model results.
        """
        return [task_res.task_name for task_res in self.task_results]

    @property
    def modalities(self) -> list[str]:
        """Get all modalities in the task results.

        Returns:
            A list of modalities in the task results.
        """
        mods = []
        for task_res in self.task_results:
            task_modalities = getattr(task_res, "modalities", [])
            mods.extend(task_modalities)
        if not mods:
            mods = self.default_modalities
        return list(set(mods))
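
ModelResult also behaves like a sequence of its TaskResult objects: it supports len(), indexing and iteration. A minimal sketch, assuming model_result is a ModelResult obtained from a prior evaluation (task names and counts are illustrative):

>>> len(model_result)                       # number of TaskResult entries
2
>>> first = model_result[0]                 # indexes into task_results
>>> [tr.task_name for tr in model_result]   # iteration yields TaskResult objects
['Banking77Classification', 'STS12']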

domains property

Get all domains in the model results.

Returns:

  • list[str]: A list of domains in the model results.

languages property

Get all languages in the model results.

Returns:

  • list[str]: A list of languages in the model results.

modalities property

Get all modalities in the task results.

Returns:

  • list[str]: A list of modalities in the task results.

task_names property

Get all task names in the model results.

Returns:

  • list[str]: A list of task names in the model results.

task_types property

Get all task types in the model results.

Returns:

  • list[str]: A list of task types in the model results.
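
These properties collect metadata across the contained TaskResult objects; most of them are deduplicated, so the order of the returned list is not guaranteed. A short sketch, assuming model_result is a ModelResult from a prior evaluation (outputs are illustrative):

>>> model_result.task_names
['Banking77Classification', 'STS12']
>>> sorted(model_result.task_types)
['Classification', 'STS']
>>> model_result.languages
['eng']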

from_validated(**data) classmethod

Create a ModelResult from validated data.

Parameters:

  • data (dict[str, Any]): The validated data. Default: {}

Source code in mteb/results/model_result.py
@classmethod
def from_validated(cls, **data: dict[str, Any]) -> Self:
    """Create a ModelResult from validated data.

    Args:
        data: The validated data.
    """
    data["task_results"] = [
        TaskResult.from_validated(**res) for res in data["task_results"]
    ]
    return cls.model_construct(**data)

select_tasks(tasks)

Select tasks from the ModelResult based on a list of AbsTask objects.

Parameters:

  • tasks (Sequence[AbsTask]): A sequence of AbsTask objects to select from the ModelResult. Required.

Source code in mteb/results/model_result.py
def select_tasks(self, tasks: Sequence[AbsTask]) -> Self:
    """Select tasks from the ModelResult based on a list of AbsTask objects.

    Args:
        tasks: A sequence of AbsTask objects to select from the ModelResult.
    """
    task_name_to_task = {task.metadata.name: task for task in tasks}
    new_task_results = [
        task_res.validate_and_filter_scores(task_name_to_task[task_res.task_name])
        for task_res in self.task_results
        if task_res.task_name in task_name_to_task
    ]
    return type(self).model_construct(
        model_name=self.model_name,
        model_revision=self.model_revision,
        task_results=new_task_results,
    )
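
select_tasks returns a new ModelResult restricted to (and validated against) the given tasks. A sketch, assuming model_result contains results for the task named below:

>>> import mteb
>>> tasks = mteb.get_tasks(tasks=["Banking77Classification"])
>>> subset = model_result.select_tasks(tasks)
>>> subset.task_names
['Banking77Classification']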

to_dataframe(aggregation_level='task', aggregation_fn=None, include_model_revision=False, format='wide')

Get a DataFrame with the scores for all models and tasks.

The DataFrame will have the following columns in addition to the metadata columns:

  • model_name: The name of the model.
  • task_name: The name of the task.
  • score: The main score of the model on the task.

In addition, the DataFrame can have the following columns depending on the aggregation level:

  • split: The split of the task. E.g. "test", "train", "validation".
  • subset: The subset of the task. E.g. "en", "fr-en".

Afterwards, the DataFrame will be aggregated according to the aggregation method and pivoted to either a wide or long format.

Parameters:

  • aggregation_level (Literal['subset', 'split', 'task']): The aggregation to use. Can be one of:
    - "subset"/None: No aggregation will be done. The DataFrame will have one row per model, task, split and subset.
    - "split": Aggregates the scores by split. The DataFrame will have one row per model, task and split.
    - "task": Aggregates the scores by task. The DataFrame will have one row per model and task.
    Default: 'task'
  • aggregation_fn (Callable[[list[Score]], Any] | None): The function to use for aggregation. If None, the mean will be used. Default: None
  • include_model_revision (bool): If True, the model revision will be included in the DataFrame. If False, it will be excluded. Default: False
  • format (Literal['wide', 'long']): The format of the DataFrame. Can be one of:
    - "wide": The DataFrame will be of shape (number of tasks, number of models). Scores will be in the cells.
    - "long": The DataFrame will be of length (number of tasks * number of models). Scores will be in columns.
    Default: 'wide'

Returns:

  • DataFrame: A DataFrame with the scores for all models and tasks.

Source code in mteb/results/model_result.py
def to_dataframe(
    self,
    aggregation_level: Literal["subset", "split", "task"] = "task",
    aggregation_fn: Callable[[list[Score]], Any] | None = None,
    include_model_revision: bool = False,
    format: Literal["wide", "long"] = "wide",
) -> pd.DataFrame:
    """Get a DataFrame with the scores for all models and tasks.

    The DataFrame will have the following columns in addition to the metadata columns:

    - model_name: The name of the model.
    - task_name: The name of the task.
    - score: The main score of the model on the task.

    In addition, the DataFrame can have the following columns depending on the aggregation level:

    - split: The split of the task. E.g. "test", "train", "validation".
    - subset: The subset of the task. E.g. "en", "fr-en".

    Afterwards, the DataFrame will be aggregated according to the aggregation method and pivoted to either a wide or long format.

    Args:
        aggregation_level: The aggregation to use. Can be one of:
            - "subset"/None: No aggregation will be done. The DataFrame will have one row per model, task, split and subset.
            - "split": Aggregates the scores by split. The DataFrame will have one row per model, task and split.
            - "task": Aggregates the scores by task. The DataFrame will have one row per model and task.
        aggregation_fn: The function to use for aggregation. If None, the mean will be used.
        include_model_revision: If True, the model revision will be included in the DataFrame. If False, it will be excluded.
        format: The format of the DataFrame. Can be one of:
            - "wide": The DataFrame will be of shape (number of tasks, number of models). Scores will be in the cells.
            - "long": The DataFrame will of length (number of tasks * number of model). Scores will be in columns.

    Returns:
        A DataFrame with the scores for all models and tasks.
    """
    scores_data = self._get_score_for_table()

    if not scores_data:
        logger.warning("No scores data available. Returning empty DataFrame.")
        return pd.DataFrame()

    # Create DataFrame
    df = pd.DataFrame(scores_data)

    _columns = ["model_name"]
    if include_model_revision is False:
        df = df.drop(columns=["model_revision"])
    else:
        _columns.append("model_revision")

    return _aggregate_and_pivot(
        df,
        columns=_columns,
        aggregation_level=aggregation_level,
        format=format,
        aggregation_fn=aggregation_fn,
    )
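
A short usage sketch, assuming model_result is a ModelResult from a prior evaluation:

>>> df = model_result.to_dataframe()               # one row per task, main scores as values
>>> df_long = model_result.to_dataframe(
...     aggregation_level="split", format="long",  # keep one row per task and split
... )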

mteb.results.BenchmarkResults

Bases: BaseModel

Data class to hold the benchmark results of a set of models.

Attributes:

  • model_results (list[ModelResult]): List of ModelResult objects.

Source code in mteb/results/benchmark_results.py
class BenchmarkResults(BaseModel):
    """Data class to hold the benchmark results of a model.

    Attributes:
        model_results: List of ModelResult objects.
    """

    model_results: list[ModelResult]
    model_config = (
        ConfigDict(  # to free up the name model_results which is otherwise protected
            protected_namespaces=(),
        )
    )

    def __repr__(self) -> str:
        n_models = len(self.model_results)
        return f"BenchmarkResults(model_results=[...](#{n_models}))"

    def __hash__(self) -> int:
        return id(self)

    def _filter_tasks(
        self,
        task_names: list[str] | None = None,
        languages: list[str] | None = None,
        domains: list[TaskDomain] | None = None,
        task_types: list[TaskType] | None = None,  # type: ignore
        modalities: list[Modalities] | None = None,
        is_public: bool | None = None,
    ) -> Self:
        # TODO: Same as filter_models
        model_results = [
            res._filter_tasks(
                task_names=task_names,
                languages=languages,
                domains=domains,
                task_types=task_types,
                modalities=modalities,
                is_public=is_public,
            )
            for res in self.model_results
        ]
        return type(self).model_construct(
            model_results=[res for res in model_results if res.task_results]
        )

    def select_tasks(self, tasks: Sequence[AbsTask]) -> Self:
        """Select tasks from the benchmark results.

        Args:
            tasks: List of tasks to select. Can be a list of AbsTask objects or task names.

        Returns:
            A new BenchmarkResults object with the selected tasks.
        """
        new_model_results = [
            model_res.select_tasks(tasks) for model_res in self.model_results
        ]
        return type(self).model_construct(model_results=new_model_results)

    def select_models(
        self,
        names: list[str] | list[ModelMeta],
        revisions: list[str | None] | None = None,
    ) -> Self:
        """Get models by name and revision.

        Args:
            names: List of model names to filter by. Can also be a list of ModelMeta objects. In which case, the revision is ignored.
            revisions: List of model revisions to filter by. If None, all revisions are returned.

        Returns:
            A new BenchmarkResults object with the filtered models.
        """
        models_res = []
        _revisions = revisions if revisions is not None else [None] * len(names)

        name_rev = {}

        if len(names) != len(_revisions):
            raise ValueError(
                "The length of names and revisions must be the same or revisions must be None."
            )

        for name, revision in zip(names, _revisions):
            if isinstance(name, ModelMeta):
                name_rev[name.name] = name.revision
            else:
                name_rev[name] = revision

        for model_res in self.model_results:
            model_name = model_res.model_name
            revision = model_res.model_revision
            if model_name in name_rev:
                if name_rev[model_name] is None or revision == name_rev[model_name]:
                    models_res.append(model_res)

        return type(self).model_construct(model_results=models_res)

    def _filter_models(
        self,
        model_names: Iterable[str] | None = None,
        languages: Iterable[str] | None = None,
        open_weights: bool | None = None,
        frameworks: Iterable[str] | None = None,
        n_parameters_range: tuple[int | None, int | None] = (None, None),
        use_instructions: bool | None = None,
        zero_shot_on: list[AbsTask] | None = None,
    ) -> Self:
        # mostly a utility function for the leaderboard app.
        # I would probably move the filtering of the models outside of this call. No need to call get_model_metas inside the filter.
        # interface would then be the same as the get_models function

        model_metas = get_model_metas(
            model_names=model_names,
            languages=languages,
            open_weights=open_weights,
            frameworks=frameworks,
            n_parameters_range=n_parameters_range,
            use_instructions=use_instructions,
            zero_shot_on=zero_shot_on,
        )
        models = {meta.name for meta in model_metas}
        # model_revision_pairs = {(meta.name, meta.revision) for meta in model_metas}
        new_model_results = []
        for model_res in self:
            if model_res.model_name in models:
                new_model_results.append(model_res)

        return type(self).model_construct(model_results=new_model_results)

    def join_revisions(self) -> Self:
        """Join revisions of the same model.

        In case of conflicts, the following rules are applied:
        1) If the main revision is present, it is kept. The main revision is the one defined in the model's ModelMeta object.
        2) If there are multiple revisions and some of them are None or NA, they are filtered out.
        3) If there is no main revision, we prefer the one run using the latest mteb version.

        Returns:
            A new BenchmarkResults object with the revisions joined.
        """

        def parse_version(version_str: str) -> Version | None:
            try:
                return Version(version_str)
            except (InvalidVersion, TypeError):
                return None

        def keep_best(group: pd.DataFrame) -> pd.DataFrame:
            # Filtering out task_results where no scores are present
            group = group[group["has_scores"]]
            is_main_revision = group["revision"] == group["main_revision"]
            # If the main revision is present we select that
            if is_main_revision.sum() > 0:
                return group[is_main_revision].head(n=1)
            unique_revisions = group["revision"].unique()

            # ensure None/NA/"external" revisions are filtered out
            group.loc[group["revision"].isna(), "revision"] = "no_revision_available"
            group.loc[group["revision"] == "external", "revision"] = (
                "no_revision_available"
            )

            # Filtering out no_revision_available if other revisions are present
            if (len(unique_revisions) > 1) and (
                "no_revision_available" in unique_revisions
            ):
                group = group[group["revision"] != "no_revision_available"]
            # If there are any not-NA mteb versions, we select the latest one
            if group["mteb_version"].notna().any():
                group = group.dropna(subset=["mteb_version"])
                group = group.sort_values("mteb_version", ascending=False)
                return group.head(n=1)
            return group.head(n=1)

        records = []
        for model_result in self:
            for task_result in model_result.task_results:
                records.append(
                    dict(
                        model=model_result.model_name,
                        revision=model_result.model_revision,
                        task_name=task_result.task_name,
                        mteb_version=task_result.mteb_version,
                        task_result=task_result,
                        has_scores=bool(task_result.scores),
                    )
                )
        if not records:
            return BenchmarkResults.model_construct(model_results=[])
        task_df = pd.DataFrame.from_records(records)
        model_to_main_revision = {
            meta.name: meta.revision for meta in get_model_metas()
        }
        task_df["main_revision"] = task_df["model"].map(model_to_main_revision)  # type: ignore
        task_df["mteb_version"] = task_df["mteb_version"].map(parse_version)  # type: ignore
        task_df = (
            task_df.groupby(["model", "task_name"])
            .apply(keep_best)
            .reset_index(drop=True)
        )
        model_results = []
        for (model, model_revision), group in task_df.groupby(["model", "revision"]):
            model_result = ModelResult.model_construct(
                model_name=model,
                model_revision=model_revision,
                task_results=list(group["task_result"]),
            )
            model_results.append(model_result)
        return BenchmarkResults.model_construct(model_results=model_results)

    def _get_scores(
        self,
        splits: list[SplitName] | None = None,
        languages: list[ISOLanguage | ISOLanguageScript] | None = None,
        scripts: list[ISOLanguageScript] | None = None,
        getter: Callable[[ScoresDict], Score] | None = None,
        aggregation: Callable[[list[Score]], Any] | None = None,
        format: Literal["wide", "long"] = "wide",
    ) -> list[dict]:
        entries = []
        if format == "wide":
            for model_res in self:
                try:
                    model_scores = model_res._get_scores(
                        splits=splits,
                        languages=languages,
                        scripts=scripts,
                        getter=getter,
                        aggregation=aggregation,
                        format="wide",
                    )
                    entries.append(
                        {
                            "model": model_res.model_name,
                            "revision": model_res.model_revision,
                            **model_scores,  # type: ignore
                        }
                    )
                except Exception as e:
                    warnings.warn(
                        f"Couldn't get scores for {model_res.model_name}({model_res.model_revision}), due to: {e}"
                    )
        if format == "long":
            for model_res in self:
                try:
                    entries.extend(
                        model_res._get_scores(
                            splits=splits,
                            languages=languages,
                            scripts=scripts,
                            getter=getter,
                            aggregation=aggregation,
                            format="long",
                        )
                    )
                except Exception as e:
                    warnings.warn(
                        f"Couldn't get scores for {model_res.model_name}({model_res.model_revision}), due to: {e}"
                    )
        return entries

    def to_dataframe(
        self,
        aggregation_level: Literal["subset", "split", "task"] = "task",
        aggregation_fn: Callable[[list[Score]], Any] | None = None,
        include_model_revision: bool = False,
        format: Literal["wide", "long"] = "wide",
    ) -> pd.DataFrame:
        """Get a DataFrame with the scores for all models and tasks.

        The DataFrame will have the following columns in addition to the metadata columns:

        - model_name: The name of the model.
        - task_name: The name of the task.
        - score: The main score of the model on the task.

        In addition, the DataFrame can have the following columns depending on the aggregation level:

        - split: The split of the task. E.g. "test", "train", "validation".
        - subset: The subset of the task. E.g. "en", "fr-en".

        Afterward, the DataFrame will be aggregated according to the aggregation method and pivoted to either a wide or long format.

        Args:
            aggregation_level: The aggregation to use. Can be one of:
                - "subset"/None: No aggregation will be done. The DataFrame will have one row per model, task, split and subset.
                - "split": Aggregates the scores by split. The DataFrame will have one row per model, task and split.
                - "task": Aggregates the scores by task. The DataFrame will have one row per model and task.
            aggregation_fn: The function to use for aggregation. If None, the mean will be used.
            include_model_revision: If True, the model revision will be included in the DataFrame. If False, it will be excluded.
                If there are multiple revisions for the same model, they will be joined using the `join_revisions` method.
            format: The format of the DataFrame. Can be one of:
                - "wide": The DataFrame will be of shape (number of tasks, number of models). Scores will be in the cells.
                - "long": The DataFrame will of length (number of tasks * number of model). Scores will be in columns.

        Returns:
            A DataFrame with the scores for all models and tasks.
        """
        bench_results = self
        if include_model_revision is False:
            bench_results = bench_results.join_revisions()

        scores_data = []
        for model_result in bench_results:
            scores_data.extend(model_result._get_score_for_table())

        if not scores_data:
            logger.warning("No scores data available. Returning empty DataFrame.")
            return pd.DataFrame()

        # Create DataFrame
        df = pd.DataFrame(scores_data)

        _columns = ["model_name"]
        if include_model_revision is False:
            df = df.drop(columns=["model_revision"])
        else:
            _columns.append("model_revision")

        # Aggregation
        return _aggregate_and_pivot(
            df,
            columns=_columns,
            aggregation_level=aggregation_level,
            aggregation_fn=aggregation_fn,
            format=format,
        )

    def __iter__(self) -> Iterator[ModelResult]:
        return iter(self.model_results)

    def __getitem__(self, index: int) -> ModelResult:
        return self.model_results[index]

    def to_dict(self) -> dict:
        """Convert BenchmarkResults to a dictionary."""
        return self.model_dump()

    @classmethod
    def from_dict(cls, data: dict) -> Self:
        """Create BenchmarkResults from a dictionary."""
        return cls.model_validate(data)

    def to_disk(self, path: Path | str) -> None:
        """Save the BenchmarkResults to a JSON file."""
        path = Path(path)
        with path.open("w") as out_file:
            out_file.write(self.model_dump_json(indent=2))

    @classmethod
    def from_validated(cls, **data) -> Self:
        """Create BenchmarkResults from validated data.

        Args:
            data: Dictionary containing the data.

        Returns:
            An instance of BenchmarkResults.
        """
        model_results = []
        for model_res in data["model_results"]:
            model_results.append(ModelResult.from_validated(**model_res))
        return cls.model_construct(model_results=model_results)

    @classmethod
    def from_disk(cls, path: Path | str) -> Self:
        """Load the BenchmarkResults from a JSON file.

        Args:
            path: Path to the JSON file.

        Returns:
            An instance of BenchmarkResults.
        """
        path = Path(path)
        with path.open() as in_file:
            data = json.loads(in_file.read())
        return cls.from_dict(data)

    @property
    def languages(self) -> list[str]:
        """Get all languages in the benchmark results.

        Returns:
            A list of languages in ISO 639-1 format.
        """
        langs = []
        for model_res in self.model_results:
            langs.extend(model_res.languages)
        return list(set(langs))

    @property
    def domains(self) -> list[str]:
        """Get all domains in the benchmark results.

        Returns:
            A list of domains.
        """
        ds = []
        for model_res in self.model_results:
            ds.extend(model_res.domains)
        return list(set(ds))

    @property
    def task_types(self) -> list[str]:
        """Get all task types in the benchmark results.

        Returns:
            A list of task types.
        """
        ts = []
        for model_res in self.model_results:
            ts.extend(model_res.task_types)
        return list(set(ts))

    @property
    def task_names(self) -> list[str]:
        """Get all task names in the benchmark results.

        Returns:
            A list of task names.
        """
        names = []
        for model_res in self.model_results:
            names.extend(model_res.task_names)
        return list(set(names))

    @property
    def modalities(self) -> list[str]:
        """Get all modalities in the benchmark results.

        Returns:
            A list of modalities.
        """
        mod = []
        for model_res in self.model_results:
            mod.extend(model_res.modalities)
        return list(set(mod))

    @property
    def model_names(self) -> list[str]:
        """Get all model names in the benchmark results.

        Returns:
            A list of model names.
        """
        return [model_res.model_name for model_res in self.model_results]

    @property
    def model_revisions(self) -> list[dict[str, str | None]]:
        """Get all model revisions in the benchmark results.

        Returns:
            A list of dictionaries with model names and revisions.
        """
        return [
            {"model_name": model_res.model_name, "revision": model_res.model_revision}
            for model_res in self.model_results
        ]

domains property

Get all domains in the benchmark results.

Returns:

  • list[str]: A list of domains.

languages property

Get all languages in the benchmark results.

Returns:

  • list[str]: A list of languages in ISO 639-1 format.

modalities property

Get all modalities in the benchmark results.

Returns:

  • list[str]: A list of modalities.

model_names property

Get all model names in the benchmark results.

Returns:

  • list[str]: A list of model names.

model_revisions property

Get all model revisions in the benchmark results.

Returns:

  • list[dict[str, str | None]]: A list of dictionaries with model names and revisions.

task_names property

Get all task names in the benchmark results.

Returns:

  • list[str]: A list of task names.

task_types property

Get all task types in the benchmark results.

Returns:

  • list[str]: A list of task types.
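
These properties give a quick overview of what a BenchmarkResults object contains. A sketch, assuming benchmark_results is a BenchmarkResults instance (for example loaded from the local results cache); outputs are illustrative:

>>> benchmark_results.model_names
['intfloat/multilingual-e5-small', 'sentence-transformers/all-MiniLM-L6-v2']
>>> sorted(benchmark_results.task_types)
['Classification', 'Retrieval', 'STS']
>>> benchmark_results.model_revisions[0]
{'model_name': 'intfloat/multilingual-e5-small', 'revision': '...'}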

from_dict(data) classmethod

Create BenchmarkResults from a dictionary.

Source code in mteb/results/benchmark_results.py
@classmethod
def from_dict(cls, data: dict) -> Self:
    """Create BenchmarkResults from a dictionary."""
    return cls.model_validate(data)

from_disk(path) classmethod

Load the BenchmarkResults from a JSON file.

Parameters:

  • path (Path | str): Path to the JSON file. Required.

Returns:

  • Self: An instance of BenchmarkResults.

Source code in mteb/results/benchmark_results.py
@classmethod
def from_disk(cls, path: Path | str) -> Self:
    """Load the BenchmarkResults from a JSON file.

    Args:
        path: Path to the JSON file.

    Returns:
        An instance of BenchmarkResults.
    """
    path = Path(path)
    with path.open() as in_file:
        data = json.loads(in_file.read())
    return cls.from_dict(data)

from_validated(**data) classmethod

Create BenchmarkResults from validated data.

Parameters:

  • data: Dictionary containing the data. Default: {}

Returns:

  • Self: An instance of BenchmarkResults.

Source code in mteb/results/benchmark_results.py
@classmethod
def from_validated(cls, **data) -> Self:
    """Create BenchmarkResults from validated data.

    Args:
        data: Dictionary containing the data.

    Returns:
        An instance of BenchmarkResults.
    """
    model_results = []
    for model_res in data["model_results"]:
        model_results.append(ModelResult.from_validated(**model_res))
    return cls.model_construct(model_results=model_results)

join_revisions()

Join revisions of the same model.

In case of conflicts, the following rules are applied:

  1) If the main revision is present, it is kept. The main revision is the one defined in the model's ModelMeta object.
  2) If there are multiple revisions and some of them are None or NA, they are filtered out.
  3) If there is no main revision, we prefer the one run using the latest mteb version.

Returns:

  • Self: A new BenchmarkResults object with the revisions joined.

Source code in mteb/results/benchmark_results.py
def join_revisions(self) -> Self:
    """Join revisions of the same model.

    In case of conflicts, the following rules are applied:
    1) If the main revision is present, it is kept. The main revision is the one defined in the model's ModelMeta object.
    2) If there are multiple revisions and some of them are None or NA, they are filtered out.
    3) If there is no main revision, we prefer the one run using the latest mteb version.

    Returns:
        A new BenchmarkResults object with the revisions joined.
    """

    def parse_version(version_str: str) -> Version | None:
        try:
            return Version(version_str)
        except (InvalidVersion, TypeError):
            return None

    def keep_best(group: pd.DataFrame) -> pd.DataFrame:
        # Filtering out task_results where no scores are present
        group = group[group["has_scores"]]
        is_main_revision = group["revision"] == group["main_revision"]
        # If the main revision is present we select that
        if is_main_revision.sum() > 0:
            return group[is_main_revision].head(n=1)
        unique_revisions = group["revision"].unique()

        # ensure None/NA/"external" revisions are filtered out
        group.loc[group["revision"].isna(), "revision"] = "no_revision_available"
        group.loc[group["revision"] == "external", "revision"] = (
            "no_revision_available"
        )

        # Filtering out no_revision_available if other revisions are present
        if (len(unique_revisions) > 1) and (
            "no_revision_available" in unique_revisions
        ):
            group = group[group["revision"] != "no_revision_available"]
        # If there are any not-NA mteb versions, we select the latest one
        if group["mteb_version"].notna().any():
            group = group.dropna(subset=["mteb_version"])
            group = group.sort_values("mteb_version", ascending=False)
            return group.head(n=1)
        return group.head(n=1)

    records = []
    for model_result in self:
        for task_result in model_result.task_results:
            records.append(
                dict(
                    model=model_result.model_name,
                    revision=model_result.model_revision,
                    task_name=task_result.task_name,
                    mteb_version=task_result.mteb_version,
                    task_result=task_result,
                    has_scores=bool(task_result.scores),
                )
            )
    if not records:
        return BenchmarkResults.model_construct(model_results=[])
    task_df = pd.DataFrame.from_records(records)
    model_to_main_revision = {
        meta.name: meta.revision for meta in get_model_metas()
    }
    task_df["main_revision"] = task_df["model"].map(model_to_main_revision)  # type: ignore
    task_df["mteb_version"] = task_df["mteb_version"].map(parse_version)  # type: ignore
    task_df = (
        task_df.groupby(["model", "task_name"])
        .apply(keep_best)
        .reset_index(drop=True)
    )
    model_results = []
    for (model, model_revision), group in task_df.groupby(["model", "revision"]):
        model_result = ModelResult.model_construct(
            model_name=model,
            model_revision=model_revision,
            task_results=list(group["task_result"]),
        )
        model_results.append(model_result)
    return BenchmarkResults.model_construct(model_results=model_results)
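
join_revisions is mainly useful when the cache holds results for several revisions of the same model. A sketch, assuming benchmark_results is a BenchmarkResults instance:

>>> joined = benchmark_results.join_revisions()
>>> {(m.model_name, m.model_revision) for m in joined}  # per model and task, only the preferred revision's result remains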

select_models(names, revisions=None)

Get models by name and revision.

Parameters:

  • names (list[str] | list[ModelMeta]): List of model names to filter by. Can also be a list of ModelMeta objects, in which case the revision is ignored. Required.
  • revisions (list[str | None] | None): List of model revisions to filter by. If None, all revisions are returned. Default: None

Returns:

  • Self: A new BenchmarkResults object with the filtered models.

Source code in mteb/results/benchmark_results.py
def select_models(
    self,
    names: list[str] | list[ModelMeta],
    revisions: list[str | None] | None = None,
) -> Self:
    """Get models by name and revision.

    Args:
        names: List of model names to filter by. Can also be a list of ModelMeta objects. In which case, the revision is ignored.
        revisions: List of model revisions to filter by. If None, all revisions are returned.

    Returns:
        A new BenchmarkResults object with the filtered models.
    """
    models_res = []
    _revisions = revisions if revisions is not None else [None] * len(names)

    name_rev = {}

    if len(names) != len(_revisions):
        raise ValueError(
            "The length of names and revisions must be the same or revisions must be None."
        )

    for name, revision in zip(names, _revisions):
        if isinstance(name, ModelMeta):
            name_rev[name.name] = name.revision
        else:
            name_rev[name] = revision

    for model_res in self.model_results:
        model_name = model_res.model_name
        revision = model_res.model_revision
        if model_name in name_rev:
            if name_rev[model_name] is None or revision == name_rev[model_name]:
                models_res.append(model_res)

    return type(self).model_construct(model_results=models_res)
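
A sketch, assuming benchmark_results contains results for the model named below:

>>> subset = benchmark_results.select_models(
...     names=["sentence-transformers/all-MiniLM-L6-v2"],
... )
>>> subset.model_names
['sentence-transformers/all-MiniLM-L6-v2']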

select_tasks(tasks)

Select tasks from the benchmark results.

Parameters:

  • tasks (Sequence[AbsTask]): List of tasks to select. Can be a list of AbsTask objects or task names. Required.

Returns:

  • Self: A new BenchmarkResults object with the selected tasks.

Source code in mteb/results/benchmark_results.py
def select_tasks(self, tasks: Sequence[AbsTask]) -> Self:
    """Select tasks from the benchmark results.

    Args:
        tasks: List of tasks to select. Can be a list of AbsTask objects or task names.

    Returns:
        A new BenchmarkResults object with the selected tasks.
    """
    new_model_results = [
        model_res.select_tasks(tasks) for model_res in self.model_results
    ]
    return type(self).model_construct(model_results=new_model_results)
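
This is the same idea as ModelResult.select_tasks, applied to every model at once. A sketch, assuming benchmark_results is a BenchmarkResults instance and that mteb.get_tasks supports filtering by task type:

>>> import mteb
>>> sts_tasks = mteb.get_tasks(task_types=["STS"])
>>> sts_results = benchmark_results.select_tasks(sts_tasks)
>>> set(sts_results.task_types)
{'STS'}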

to_dataframe(aggregation_level='task', aggregation_fn=None, include_model_revision=False, format='wide')

Get a DataFrame with the scores for all models and tasks.

The DataFrame will have the following columns in addition to the metadata columns:

  • model_name: The name of the model.
  • task_name: The name of the task.
  • score: The main score of the model on the task.

In addition, the DataFrame can have the following columns depending on the aggregation level:

  • split: The split of the task. E.g. "test", "train", "validation".
  • subset: The subset of the task. E.g. "en", "fr-en".

Afterward, the DataFrame will be aggregated according to the aggregation method and pivoted to either a wide or long format.

Parameters:

  • aggregation_level (Literal['subset', 'split', 'task']): The aggregation to use. Can be one of:
    - "subset"/None: No aggregation will be done. The DataFrame will have one row per model, task, split and subset.
    - "split": Aggregates the scores by split. The DataFrame will have one row per model, task and split.
    - "task": Aggregates the scores by task. The DataFrame will have one row per model and task.
    Default: 'task'
  • aggregation_fn (Callable[[list[Score]], Any] | None): The function to use for aggregation. If None, the mean will be used. Default: None
  • include_model_revision (bool): If True, the model revision will be included in the DataFrame. If False, it will be excluded. If there are multiple revisions for the same model, they will be joined using the join_revisions method. Default: False
  • format (Literal['wide', 'long']): The format of the DataFrame. Can be one of:
    - "wide": The DataFrame will be of shape (number of tasks, number of models). Scores will be in the cells.
    - "long": The DataFrame will be of length (number of tasks * number of models). Scores will be in columns.
    Default: 'wide'

Returns:

  • DataFrame: A DataFrame with the scores for all models and tasks.

Source code in mteb/results/benchmark_results.py
def to_dataframe(
    self,
    aggregation_level: Literal["subset", "split", "task"] = "task",
    aggregation_fn: Callable[[list[Score]], Any] | None = None,
    include_model_revision: bool = False,
    format: Literal["wide", "long"] = "wide",
) -> pd.DataFrame:
    """Get a DataFrame with the scores for all models and tasks.

    The DataFrame will have the following columns in addition to the metadata columns:

    - model_name: The name of the model.
    - task_name: The name of the task.
    - score: The main score of the model on the task.

    In addition, the DataFrame can have the following columns depending on the aggregation level:

    - split: The split of the task. E.g. "test", "train", "validation".
    - subset: The subset of the task. E.g. "en", "fr-en".

    Afterward, the DataFrame will be aggregated according to the aggregation method and pivoted to either a wide or long format.

    Args:
        aggregation_level: The aggregation to use. Can be one of:
            - "subset"/None: No aggregation will be done. The DataFrame will have one row per model, task, split and subset.
            - "split": Aggregates the scores by split. The DataFrame will have one row per model, task and split.
            - "task": Aggregates the scores by task. The DataFrame will have one row per model and task.
        aggregation_fn: The function to use for aggregation. If None, the mean will be used.
        include_model_revision: If True, the model revision will be included in the DataFrame. If False, it will be excluded.
            If there are multiple revisions for the same model, they will be joined using the `join_revisions` method.
        format: The format of the DataFrame. Can be one of:
            - "wide": The DataFrame will be of shape (number of tasks, number of models). Scores will be in the cells.
            - "long": The DataFrame will of length (number of tasks * number of model). Scores will be in columns.

    Returns:
        A DataFrame with the scores for all models and tasks.
    """
    bench_results = self
    if include_model_revision is False:
        bench_results = bench_results.join_revisions()

    scores_data = []
    for model_result in bench_results:
        scores_data.extend(model_result._get_score_for_table())

    if not scores_data:
        logger.warning("No scores data available. Returning empty DataFrame.")
        return pd.DataFrame()

    # Create DataFrame
    df = pd.DataFrame(scores_data)

    _columns = ["model_name"]
    if include_model_revision is False:
        df = df.drop(columns=["model_revision"])
    else:
        _columns.append("model_revision")

    # Aggregation
    return _aggregate_and_pivot(
        df,
        columns=_columns,
        aggregation_level=aggregation_level,
        aggregation_fn=aggregation_fn,
        format=format,
    )
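
A sketch of building a leaderboard-style table across models, assuming benchmark_results is a BenchmarkResults instance:

>>> df = benchmark_results.to_dataframe()                     # tasks as rows, one column per model
>>> df.mean(numeric_only=True).sort_values(ascending=False)   # rough average score per model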

to_dict()

Convert BenchmarkResults to a dictionary.

Source code in mteb/results/benchmark_results.py
def to_dict(self) -> dict:
    """Convert BenchmarkResults to a dictionary."""
    return self.model_dump()

to_disk(path)

Save the BenchmarkResults to a JSON file.

Source code in mteb/results/benchmark_results.py
def to_disk(self, path: Path | str) -> None:
    """Save the BenchmarkResults to a JSON file."""
    path = Path(path)
    with path.open("w") as out_file:
        out_file.write(self.model_dump_json(indent=2))
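
to_disk and from_disk give a simple JSON round trip. A sketch, assuming benchmark_results is a BenchmarkResults instance and the file path is illustrative:

>>> from mteb.results import BenchmarkResults
>>> benchmark_results.to_disk("benchmark_results.json")
>>> restored = BenchmarkResults.from_disk("benchmark_results.json")
>>> restored.model_names == benchmark_results.model_names
True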