Results

When a model is evaluated in MTEB it produces results. These results consist of:

  • TaskResult: Result for a single task
  • ModelResult: Result for a model on a set of tasks
  • BenchmarkResults: Result for a set of models on a set of tasks

In normal use these come up when running a model:

# ...
model_results = mteb.evaluate(model, tasks)
type(model_results)  # mteb.results.ModelResult

task_results = model_results.task_results  # list[TaskResult]
type(task_results[0])  # mteb.results.TaskResult

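Once you have these objects you will typically pull scores out of them. A minimal sketch, assuming the model_results and task_results variables from the snippet above:

# Aggregated main scores per task as a pandas DataFrame (see ModelResult.to_dataframe below)
df = model_results.to_dataframe()

# Aggregated main score for a single task (mean over splits and subsets by default)
score = task_results[0].get_score()
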
Result Objects

mteb.results.TaskResult

Bases: BaseModel

A class to represent the MTEB result.

Attributes:

  • task_name (str): The name of the MTEB task.
  • dataset_revision (str): The dataset revision for the task on the HuggingFace dataset hub.
  • mteb_version (str | None): The version of MTEB used to evaluate the model.
  • scores (dict[SplitName, list[ScoresDict]]): The scores of the model on the dataset, keyed by dataset split (SplitName). Each split maps to a list of score dictionaries (dict[str, Any]) whose keys and values are metric names and metric values.
  • evaluation_time (float | None): The time taken to evaluate the model.
  • kg_co2_emissions (float | None): The kg of CO2 emissions produced by the model during evaluation.

Examples:

>>> scores = {
...     "evaluation_time": 100,
...     "train": {
...         "en-de": {
...             "main_score": 0.5,
...         },
...         "en-fr": {
...             "main_score": 0.6,
...         },
...     },
... }
>>> sample_task = ... # some MTEB task
>>> mteb_results = TaskResult.from_task_results(sample_task, scores)
>>> mteb_results.get_score()  # get the main score for all languages
0.55
>>> mteb_results.get_score(languages=["fra"])  # get the main score for French
0.6
>>> mteb_results.to_dict()
{'dataset_revision': '1.0', 'task_name': 'sample_task', 'mteb_version': '1.0.0', 'evaluation_time': 100, 'scores': {'train':
    [
        {'main_score': 0.5, 'hf_subset': 'en-de', 'languages': ['eng-Latn', 'deu-Latn']},
        {'main_score': 0.6, 'hf_subset': 'en-fr', 'languages': ['eng-Latn', 'fra-Latn']}
    ]}
}
Source code in mteb/results/task_result.py
class TaskResult(BaseModel):
    """A class to represent the MTEB result.

    Attributes:
        task_name: The name of the MTEB task.
        dataset_revision: The revision dataset for the task on HuggingFace dataset hub.
        mteb_version: The version of the MTEB used to evaluate the model.
        scores: The scores of the model on the dataset. The scores is a dictionary with the following structure; dict[SplitName, list[Scores]].
            Where Scores is a dictionary with the following structure; dict[str, Any]. Where the keys and values are scores. Split is the split of
            the dataset.
        evaluation_time: The time taken to evaluate the model.
        kg_co2_emissions: The kg of CO2 emissions produced by the model during evaluation.

    Examples:
        >>> scores = {
        ...     "evaluation_time": 100,
        ...     "train": {
        ...         "en-de": {
        ...             "main_score": 0.5,
        ...         },
        ...         "en-fr": {
        ...             "main_score": 0.6,
        ...         },
        ...     },
        ... }
        >>> sample_task = ... # some MTEB task
        >>> mteb_results = TaskResult.from_task_results(sample_task, scores)
        >>> mteb_results.get_score()  # get the main score for all languages
        0.55
        >>> mteb_results.get_score(languages=["fra"])  # get the main score for French
        0.6
        >>> mteb_results.to_dict()
        {'dataset_revision': '1.0', 'task_name': 'sample_task', 'mteb_version': '1.0.0', 'evaluation_time': 100, 'scores': {'train':
            [
                {'main_score': 0.5, 'hf_subset': 'en-de', 'languages': ['eng-Latn', 'deu-Latn']},
                {'main_score': 0.6, 'hf_subset': 'en-fr', 'languages': ['eng-Latn', 'fra-Latn']}
            ]}
        }
    """

    dataset_revision: str
    task_name: str
    mteb_version: str | None
    scores: dict[SplitName, list[ScoresDict]]
    evaluation_time: float | None
    kg_co2_emissions: float | None = None

    @classmethod
    def from_task_results(
        cls,
        task: AbsTask | type[AbsTask],
        scores: dict[SplitName, dict[HFSubset, ScoresDict]],
        evaluation_time: float,
        kg_co2_emissions: float | None = None,
    ) -> TaskResult:
        task_meta = task.metadata
        subset2langscripts = task_meta.hf_subsets_to_langscripts
        flat_scores = defaultdict(list)
        for split, hf_subset_scores in scores.items():
            for hf_subset, hf_scores in hf_subset_scores.items():
                eval_langs = subset2langscripts[hf_subset]
                _scores = {
                    **hf_scores,
                    "hf_subset": hf_subset,
                    "languages": eval_langs,
                }
                flat_scores[split].append(_scores)

        return TaskResult(
            dataset_revision=task.metadata.revision,
            task_name=task.metadata.name,
            mteb_version=version("mteb"),
            scores=flat_scores,
            evaluation_time=evaluation_time,
            kg_co2_emissions=kg_co2_emissions,
        )

    @field_validator("scores")
    @classmethod
    def _validate_scores(
        cls, v: dict[SplitName, list[ScoresDict]]
    ) -> dict[SplitName, list[ScoresDict]]:
        for split, hf_subset_scores in v.items():
            for hf_subset_score in hf_subset_scores:
                if not isinstance(hf_subset_score, dict):
                    raise ValueError("Scores should be a dictionary")
                cls._validate_scores_dict(hf_subset_score)
        return v

    @staticmethod
    def _validate_scores_dict(scores: ScoresDict) -> None:
        if "main_score" not in scores:
            raise ValueError("'main_score' should be in scores")
        if "hf_subset" not in scores or not isinstance(scores["hf_subset"], str):
            raise ValueError("hf_subset should be in scores and should be a string")
        if "languages" not in scores or not isinstance(scores["languages"], list):
            raise ValueError("languages should be in scores and should be a list")

        # check that it is json serializable
        try:
            _ = json.dumps(scores)
        except Exception as e:
            raise ValueError(f"Scores are not json serializable: {e}")

    @property
    def languages(self) -> list[str]:
        """Get the languages present in the scores."""
        langs = []
        for split, split_res in self.scores.items():
            for entry in split_res:
                langs.extend([lang.split("-")[0] for lang in entry["languages"]])
        return list(set(langs))

    @cached_property
    def task(self) -> AbsTask:
        """Get the task associated with the result."""
        from mteb.overview import get_task

        return get_task(self.task_name)

    @property
    def domains(self) -> list[str]:
        """Get the domains of the task."""
        doms = self.task.metadata.domains
        if doms is None:
            doms = []
        return doms  # type: ignore

    @property
    def task_type(self) -> str:
        """Get the type of the task."""
        return self.task.metadata.type

    @property
    def hf_subsets(self) -> list[str]:
        """Get the hf_subsets present in the scores."""
        hf_subsets = set()
        for split, split_res in self.scores.items():
            for entry in split_res:
                hf_subsets.add(entry["hf_subset"])
        return list(hf_subsets)

    @property
    def eval_splits(self) -> list[str]:
        """Get the eval splits present in the scores."""
        return list(self.scores.keys())

    def to_dict(self) -> dict:
        """Convert the TaskResult to a dictionary."""
        return self.model_dump()

    @classmethod
    def from_dict(cls, data: dict) -> TaskResult:
        """Create a TaskResult from a dictionary."""
        return cls.model_validate(data)

    def _round_scores(self, scores: dict[SplitName, list[ScoresDict]], n: int) -> None:
        """Recursively round scores to n decimal places"""
        for key, value in scores.items():
            if isinstance(value, dict):
                self._round_scores(value, n)
            elif isinstance(value, list):
                for i, v in enumerate(value):
                    if isinstance(v, dict):
                        self._round_scores(v, n)
                    elif isinstance(v, float):
                        value[i] = round(v, n)

            elif isinstance(value, float):
                scores[key] = round(value, n)

    def to_disk(self, path: Path) -> None:
        json_obj = self.model_dump()
        self._round_scores(json_obj["scores"], 6)

        with path.open("w") as f:
            json.dump(json_obj, f, indent=2)

    @classmethod
    def from_disk(cls, path: Path, load_historic_data: bool = True) -> TaskResult:  # type: ignore
        """Load TaskResult from disk.

        Args:
            path: The path to the file to load.
            load_historic_data: Whether to attempt to load historic data from before v1.11.0.
        """
        with path.open("r", encoding="utf-8") as f:
            data = json.load(f)

        if not load_historic_data:
            try:
                return cls.model_validate(data)
            except Exception as e:
                raise ValueError(
                    f"Error loading TaskResult from disk. You can try to load historic data by setting `load_historic_data=True`. Error: {e}"
                )

        pre_1_11_load = (
            (
                "mteb_version" in data
                and data["mteb_version"] is not None
                and Version(data["mteb_version"]) < Version("1.11.0")
            )
            or "mteb_version" not in data
        )  # assume it is before 1.11.0 if the version is not present

        try:
            obj = cls.model_validate(data)
        except Exception as e:
            if not pre_1_11_load:
                raise e
            logger.debug(
                f"Could not load TaskResult from disk, got error: {e}. Attempting to load from disk using format from before v1.11.0"
            )
            obj = cls._convert_from_before_v1_11_0(data)

        pre_v_12_48 = (
            "mteb_version" in data
            and data["mteb_version"] is not None
            and Version(data["mteb_version"]) < Version("1.12.48")
        )

        if pre_v_12_48:
            cls._fix_pair_classification_scores(obj)

        return obj

    @classmethod
    def _fix_pair_classification_scores(cls, obj: TaskResult) -> None:
        from mteb import get_task

        task_name = obj.task_name
        if task_name in outdated_tasks:
            task = outdated_tasks[task_name]
        else:
            task = get_task(obj.task_name)

        if task.metadata.type == "PairClassification":
            for split, split_scores in obj.scores.items():
                for hf_subset_scores in split_scores:
                    # concatenate score e.g. ["max"]["ap"] -> ["max_ap"]
                    for key in list(hf_subset_scores.keys()):
                        if isinstance(hf_subset_scores[key], dict):
                            for k, v in hf_subset_scores[key].items():
                                hf_subset_scores[f"{key}_{k}"] = v
                            hf_subset_scores.pop(key)

    @classmethod
    def _convert_from_before_v1_11_0(cls, data: dict) -> TaskResult:
        from mteb.overview import _TASKS_REGISTRY

        # in case the task name is not found in the registry, try to find a lower case version
        lower_case_registry = {k.lower(): v for k, v in _TASKS_REGISTRY.items()}

        scores = {**data}

        dataset_revision = scores.pop(
            "dataset_revision", "dataset revision not available"
        )
        task_name = scores.pop("mteb_dataset_name")
        mteb_version = scores.pop("mteb_version", "mteb version not available")

        # calculate evaluation time across all splits (move to top level)
        evaluation_time = 0
        for split, split_score in scores.items():
            if "evaluation_time" in split_score:
                evaluation_time += split_score.pop("evaluation_time")

        # normalize the scores to always be {split: {hf_subset: scores}}
        contains_hf_subset = any(
            isinstance(hf_subset_scores, dict)
            for split_scores in scores.values()
            for k, hf_subset_scores in split_scores.items()
            if k
            not in {"v_measures", "cos_sim", "euclidean", "manhattan", "dot", "max"}
        )
        if not contains_hf_subset:
            for split, split_score in scores.items():
                scores[split] = {"default": split_score.copy()}

        if task_name in outdated_tasks:
            logger.debug(
                f"Loading {task_name} as a dummy task as it no longer exists within MTEB. To avoid this set `load_historic_data=False`"
            )
            task = outdated_tasks[task_name]
        else:
            if task_name in renamed_tasks:
                task_name = renamed_tasks[task_name]
            task = _TASKS_REGISTRY.get(
                task_name, lower_case_registry[task_name.lower()]
            )

        # make sure that main score exists
        main_score = task.metadata.main_score
        for split, split_score in scores.items():
            for hf_subset, hf_subset_scores in split_score.items():
                for name, prev_name in [
                    (ScoringFunction.COSINE.value, "cos_sim"),
                    (ScoringFunction.MANHATTAN.value, "manhattan"),
                    (ScoringFunction.EUCLIDEAN.value, "euclidean"),
                    (ScoringFunction.DOT_PRODUCT.value, "dot"),
                    ("max", "max"),
                    ("similarity", "similarity"),
                ]:
                    prev_name_scores = hf_subset_scores.pop(prev_name, None)
                    if prev_name_scores is not None:
                        for k, v in prev_name_scores.items():
                            hf_subset_scores[f"{name}_{k}"] = v

                if "main_score" not in hf_subset_scores:
                    if main_score in hf_subset_scores:
                        hf_subset_scores["main_score"] = hf_subset_scores[main_score]
                    else:
                        logger.warning(f"Main score {main_score} not found in scores")
                        hf_subset_scores["main_score"] = None

        # specific fixes:
        if task_name == "MLSUMClusteringP2P" and mteb_version in [
            "1.1.2.dev0",
            "1.1.3.dev0",
        ]:  # back then it was only the french subsection which was implemented
            scores["test"]["fr"] = scores["test"].pop("default")
        if task_name == "MLSUMClusteringS2S" and mteb_version in [
            "1.1.2.dev0",
            "1.1.3.dev0",
        ]:
            scores["test"]["fr"] = scores["test"].pop("default")
        if task_name == "XPQARetrieval":  # subset were renamed from "fr" to "fra-fra"
            if "test" in scores and "fr" in scores["test"]:
                scores["test"]["fra-fra"] = scores["test"].pop("fr")

        result: TaskResult = TaskResult.from_task_results(
            task,  # type: ignore
            scores,
            evaluation_time,
            kg_co2_emissions=None,
        )
        result.dataset_revision = dataset_revision
        result.mteb_version = mteb_version
        return result

    def get_score(
        self,
        splits: list[SplitName] | None = None,
        languages: list[ISOLanguage | ISOLanguageScript] | None = None,
        scripts: list[ISOLanguageScript] | None = None,
        getter: Callable[[ScoresDict], Score] = lambda scores: scores["main_score"],
        aggregation: Callable[[list[Score]], Any] = np.mean,
    ) -> Any:
        """Get a score for the specified splits, languages, scripts and aggregation function.

        Args:
            splits: The splits to consider.
            languages: The languages to consider. Can be ISO language codes or ISO language script codes.
            scripts: The scripts to consider.
            getter: A function that takes a scores dictionary and returns a score e.g. "main_score" or "evaluation_time".
            aggregation: The aggregation function to use.

        Returns:
            The result of the aggregation function on the scores.
        """
        if splits is None:
            splits = list(self.scores.keys())

        lang_scripts = LanguageScripts.from_languages_and_scripts(languages, scripts)

        values = []
        for split in splits:
            if split not in self.scores:
                raise ValueError(f"Split {split} not found in scores")

            for scores in self.scores[split]:
                eval_langs = scores["languages"]
                for lang in eval_langs:
                    if lang_scripts.contains_language(lang):
                        values.append(getter(scores))
                        break

        return aggregation(values)

    def _get_score_fast(
        self,
        splits: Iterable[str] | None = None,
        languages: str | None = None,
        subsets: Iterable[str] | None = None,
    ) -> float:
        """Sped up version of get_score that will be used if no aggregation, script or getter needs to be specified."""
        if splits is None:
            splits = self.scores.keys()
        val_sum = 0
        n_val = 0
        for split in splits:
            if split not in self.scores:
                raise ValueError(f"Split missing from scores: {split}")

            for scores in self.scores[split]:
                langs = scores["languages"]
                hf_subset = scores["hf_subset"]
                main_score = scores.get("main_score", None)
                if main_score is None:
                    raise ValueError(f"Missing main score for subset: {hf_subset}")
                if subsets and hf_subset not in subsets:
                    continue
                elif subsets:
                    val_sum += main_score
                    n_val += 1
                    continue

                if languages is None:
                    val_sum += main_score
                    n_val += 1
                    continue
                for lang in langs:
                    if lang.split("-")[0] in languages:
                        val_sum += main_score
                        n_val += 1
                        logger.info(f"{val_sum=}, {n_val=}")
                        break
        if n_val == 0:
            raise ValueError("No splits had scores for the specified languages.")
        return val_sum / n_val

    @classmethod
    def from_validated(cls, **data) -> TaskResult:
        return cls.model_construct(**data)

    def __repr__(self) -> str:
        return f"TaskResult(task_name={self.task_name}, scores=...)"

    def only_main_score(self) -> TaskResult:
        new_scores = {}
        for split in self.scores:
            new_scores[split] = []
            for subset_scores in self.scores[split]:
                new_scores[split].append(
                    {
                        "hf_subset": subset_scores.get("hf_subset", "default"),
                        "main_score": subset_scores.get("main_score", np.nan),
                        "languages": subset_scores.get("languages", []),
                    }
                )
        new_res = {**self.to_dict(), "scores": new_scores}
        new_res = TaskResult.from_validated(**new_res)
        return new_res

    def validate_and_filter_scores(self, task: AbsTask | None = None) -> TaskResult:
        """This ensures that the scores are correct for the given task, by removing any splits besides those specified in the task metadata.
        Additionally it also ensure that all of the splits required as well as the languages are present in the scores.
        Returns new TaskResult object.

        Args:
            task: The task to validate the scores against. E.g. if the task supplied is limited to certain splits and languages,
                the scores will be filtered to only include those splits and languages. If None it will attempt to get the task from the task_name.
        """
        from mteb.overview import get_task

        if task is None:
            task = get_task(self.task_name)

        splits = task.eval_splits
        hf_subsets = task.hf_subsets
        hf_subsets = set(hf_subsets)

        new_scores = {}
        seen_splits = set()
        for split in self.scores:
            if split not in splits:
                continue
            new_scores[split] = []
            seen_subsets = set()
            for _scores in self.scores[split]:
                if _scores["hf_subset"] not in hf_subsets:
                    continue
                new_scores[split].append(_scores)
                seen_subsets.add(_scores["hf_subset"])
            if seen_subsets != hf_subsets:
                missing_subsets = hf_subsets - seen_subsets
                if len(missing_subsets) > 2:
                    subset1, subset2 = list(missing_subsets)[:2]
                    missing_subsets_str = f"{{'{subset1}', '{subset2}', ...}}"
                else:
                    missing_subsets_str = str(missing_subsets)

                logger.warning(
                    f"{task.metadata.name}: Missing subsets {missing_subsets_str} for split {split}"
                )
            seen_splits.add(split)
        if seen_splits != set(splits):
            logger.warning(
                f"{task.metadata.name}: Missing splits {set(splits) - seen_splits}"
            )
        new_res = {**self.to_dict(), "scores": new_scores}
        new_res = TaskResult.from_validated(**new_res)
        return new_res

    def is_mergeable(
        self,
        result: TaskResult | AbsTask,
        criteria: list[str] | list[Criterias] = [
            "mteb_version",
            "dataset_revision",
        ],
        raise_error: bool = False,
    ) -> bool:
        """Checks if the TaskResult object can be merged with another TaskResult or Task.

        Args:
            result: The TaskResult or Task object to check against.
            criteria: Additional criteria to check for merging. Can be "mteb_version" or "dataset_revision".
                It will always check that the task name match.
            raise_error: If True, raises an error if the objects cannot be merged. If False, returns False.

        Returns:
            True if the TaskResult object can be merged with the other object, False otherwise.
        """
        criteria = [
            Criterias.from_str(c) if isinstance(c, str) else c for c in criteria
        ]
        if isinstance(result, TaskResult):
            name = result.task_name
            revision = result.dataset_revision
            mteb_version = result.mteb_version
        elif isinstance(result, AbsTask):
            mteb_version = version("mteb")
            name = result.metadata.name
            revision = result.metadata.revision
        else:
            return False

        if self.task_name != name:
            if raise_error:
                raise ValueError(
                    f"Cannot merge TaskResult objects as they are derived from different tasks ({self.task_name} and {name})"
                )
            return False

        if Criterias.MTEB_VERSION in criteria and self.mteb_version != mteb_version:
            if raise_error:
                raise ValueError(
                    f"Cannot merge TaskResult objects as they are derived from different MTEB versions ({self.mteb_version} and {mteb_version})"
                )
            return False

        if Criterias.DATASET_REVISION in criteria and self.dataset_revision != revision:
            if raise_error:
                raise ValueError(
                    f"Cannot merge TaskResult objects as they are derived from different dataset revisions ({self.dataset_revision} and {revision})"
                )
            return False

        return True

    def merge(
        self,
        new_results: TaskResult,
        criteria: list[str] | list[Criterias] = [
            "mteb_version",
            "dataset_revision",
        ],
    ) -> TaskResult:
        """Merges two TaskResult objects.

        Args:
            new_results: The new TaskResult object to merge with the current one.
            criteria: Additional criteria to check for merging. Can be "mteb_version" or "dataset_revision".
                It will always check that the task name match.

        Returns:
            A new TaskResult object with the merged scores.
        """
        self.is_mergeable(new_results, criteria=criteria, raise_error=True)

        merged_scores = self.scores.copy()

        for split, scores in new_results.scores.items():
            if split in merged_scores:
                merged_scores[split] = self._merge_split_scores(
                    merged_scores[split], scores
                )
            else:
                merged_scores[split] = scores

        existing_kg_co2_emissions = (
            self.kg_co2_emissions if self.kg_co2_emissions else 0
        )
        new_kg_co2_emissions = (
            new_results.kg_co2_emissions if new_results.kg_co2_emissions else 0
        )
        merged_kg_co2_emissions = None
        if existing_kg_co2_emissions and new_kg_co2_emissions:
            merged_kg_co2_emissions = existing_kg_co2_emissions + new_kg_co2_emissions

        merged_evaluation_time = None
        if self.evaluation_time and new_results.evaluation_time:
            merged_evaluation_time = self.evaluation_time + new_results.evaluation_time
        merged_results = TaskResult(
            dataset_revision=new_results.dataset_revision,
            task_name=new_results.task_name,
            mteb_version=new_results.mteb_version,
            scores=merged_scores,
            evaluation_time=merged_evaluation_time,
            kg_co2_emissions=merged_kg_co2_emissions,
        )

        return merged_results

    @staticmethod
    def _merge_split_scores(
        existing_scores: list[ScoresDict], new_scores: list[ScoresDict]
    ) -> list[ScoresDict]:
        merged = {score["hf_subset"]: score for score in existing_scores}
        for score in new_scores:
            merged[score["hf_subset"]] = score
        return list(merged.values())

    def get_missing_evaluations(self, task: AbsTask) -> dict[str, list[str]]:
        """Checks which splits and subsets are missing from the results.

        Args:
            task: The task to check against.

        Returns:
            A dictionary with the splits as keys and a list of missing subsets as values.
        """
        missing_splits = {}
        for splits in task.eval_splits:
            if splits not in self.scores:  # split is fully missing
                missing_splits[splits] = task.hf_subsets
            if splits in self.scores:
                hf_subsets = {score["hf_subset"] for score in self.scores[splits]}
                missing_subsets = list(set(task.hf_subsets) - hf_subsets)
                if missing_subsets:
                    missing_splits[splits] = missing_subsets

        return missing_splits

    def get_hf_eval_results(self) -> list[EvalResult]:
        """Create HF evaluation results objects from TaskResult objects.

        Returns:
            List of EvalResult objects for each split and subset.
        """
        task_metadata = self.task.metadata
        task_type = task_metadata._hf_task_type()[0]
        results = []
        for split, scores in self.scores.items():
            for subset_results in scores:
                subset = subset_results.get("hf_subset", "default")
                results.append(
                    EvalResult(
                        task_type=task_type,
                        task_name=task_metadata.type,
                        dataset_type=task_metadata.dataset["path"],
                        dataset_name=f"{task_metadata.name} ({subset})",
                        dataset_config=subset,
                        dataset_split=split,
                        dataset_revision=task_metadata.dataset["revision"],
                        metric_type=task_metadata.main_score,
                        metric_name=task_metadata.main_score,
                        metric_value=subset_results["main_score"],
                        source_name="MTEB",
                        source_url="https://github.com/embeddings-benchmark/mteb/",
                    )
                )
        return results

domains property

Get the domains of the task.

eval_splits property

Get the eval splits present in the scores.

hf_subsets property

Get the hf_subsets present in the scores.

languages property

Get the languages present in the scores.

task cached property

Get the task associated with the result.

task_type property

Get the type of the task.

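These properties give quick access to metadata derived from the scores, for example (a sketch; task_result is any TaskResult object and the values are illustrative):

task_result.eval_splits   # e.g. ['test']
task_result.hf_subsets    # e.g. ['en-de', 'en-fr']
task_result.languages     # e.g. ['eng', 'deu', 'fra']
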
from_dict(data) classmethod

Create a TaskResult from a dictionary.

Source code in mteb/results/task_result.py
@classmethod
def from_dict(cls, data: dict) -> TaskResult:
    """Create a TaskResult from a dictionary."""
    return cls.model_validate(data)

from_disk(path, load_historic_data=True) classmethod

Load TaskResult from disk.

Parameters:

  • path (Path, required): The path to the file to load.
  • load_historic_data (bool, default True): Whether to attempt to load historic data from before v1.11.0.
Source code in mteb/results/task_result.py
@classmethod
def from_disk(cls, path: Path, load_historic_data: bool = True) -> TaskResult:  # type: ignore
    """Load TaskResult from disk.

    Args:
        path: The path to the file to load.
        load_historic_data: Whether to attempt to load historic data from before v1.11.0.
    """
    with path.open("r", encoding="utf-8") as f:
        data = json.load(f)

    if not load_historic_data:
        try:
            return cls.model_validate(data)
        except Exception as e:
            raise ValueError(
                f"Error loading TaskResult from disk. You can try to load historic data by setting `load_historic_data=True`. Error: {e}"
            )

    pre_1_11_load = (
        (
            "mteb_version" in data
            and data["mteb_version"] is not None
            and Version(data["mteb_version"]) < Version("1.11.0")
        )
        or "mteb_version" not in data
    )  # assume it is before 1.11.0 if the version is not present

    try:
        obj = cls.model_validate(data)
    except Exception as e:
        if not pre_1_11_load:
            raise e
        logger.debug(
            f"Could not load TaskResult from disk, got error: {e}. Attempting to load from disk using format from before v1.11.0"
        )
        obj = cls._convert_from_before_v1_11_0(data)

    pre_v_12_48 = (
        "mteb_version" in data
        and data["mteb_version"] is not None
        and Version(data["mteb_version"]) < Version("1.12.48")
    )

    if pre_v_12_48:
        cls._fix_pair_classification_scores(obj)

    return obj

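A round-trip sketch combining to_disk (shown in the class listing above) with from_disk; the file name is illustrative:

from pathlib import Path

path = Path("ExampleTask.json")
task_result.to_disk(path)            # scores are rounded to 6 decimal places on write
loaded = TaskResult.from_disk(path)  # falls back to the pre-v1.11.0 format if validation fails
assert loaded.task_name == task_result.task_name
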
get_hf_eval_results()

Create HF evaluation results objects from TaskResult objects.

Returns:

  • list[EvalResult]: List of EvalResult objects for each split and subset.

Source code in mteb/results/task_result.py
def get_hf_eval_results(self) -> list[EvalResult]:
    """Create HF evaluation results objects from TaskResult objects.

    Returns:
        List of EvalResult objects for each split and subset.
    """
    task_metadata = self.task.metadata
    task_type = task_metadata._hf_task_type()[0]
    results = []
    for split, scores in self.scores.items():
        for subset_results in scores:
            subset = subset_results.get("hf_subset", "default")
            results.append(
                EvalResult(
                    task_type=task_type,
                    task_name=task_metadata.type,
                    dataset_type=task_metadata.dataset["path"],
                    dataset_name=f"{task_metadata.name} ({subset})",
                    dataset_config=subset,
                    dataset_split=split,
                    dataset_revision=task_metadata.dataset["revision"],
                    metric_type=task_metadata.main_score,
                    metric_name=task_metadata.main_score,
                    metric_value=subset_results["main_score"],
                    source_name="MTEB",
                    source_url="https://github.com/embeddings-benchmark/mteb/",
                )
            )
    return results

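A usage sketch; the returned objects are Hugging Face EvalResult entries suitable for model-card metadata:

for eval_result in task_result.get_hf_eval_results():
    print(eval_result.dataset_name, eval_result.metric_name, eval_result.metric_value)
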
get_missing_evaluations(task)

Checks which splits and subsets are missing from the results.

Parameters:

  • task (AbsTask, required): The task to check against.

Returns:

  • dict[str, list[str]]: A dictionary with the splits as keys and a list of missing subsets as values.

Source code in mteb/results/task_result.py
def get_missing_evaluations(self, task: AbsTask) -> dict[str, list[str]]:
    """Checks which splits and subsets are missing from the results.

    Args:
        task: The task to check against.

    Returns:
        A dictionary with the splits as keys and a list of missing subsets as values.
    """
    missing_splits = {}
    for splits in task.eval_splits:
        if splits not in self.scores:  # split is fully missing
            missing_splits[splits] = task.hf_subsets
        if splits in self.scores:
            hf_subsets = {score["hf_subset"] for score in self.scores[splits]}
            missing_subsets = list(set(task.hf_subsets) - hf_subsets)
            if missing_subsets:
                missing_splits[splits] = missing_subsets

    return missing_splits

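For example, to see what still needs to be evaluated (a sketch; task is the MTEB task the result belongs to):

missing = task_result.get_missing_evaluations(task)
# e.g. {'test': ['en-fr']} if the en-fr subset of the test split has no scores yet
for split, subsets in missing.items():
    print(f"{split}: missing subsets {subsets}")
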
get_score(splits=None, languages=None, scripts=None, getter=lambda scores: scores['main_score'], aggregation=np.mean)

Get a score for the specified splits, languages, scripts and aggregation function.

Parameters:

  • splits (list[SplitName] | None, default None): The splits to consider.
  • languages (list[ISOLanguage | ISOLanguageScript] | None, default None): The languages to consider. Can be ISO language codes or ISO language script codes.
  • scripts (list[ISOLanguageScript] | None, default None): The scripts to consider.
  • getter (Callable[[ScoresDict], Score], default lambda scores: scores['main_score']): A function that takes a scores dictionary and returns a score, e.g. "main_score" or "evaluation_time".
  • aggregation (Callable[[list[Score]], Any], default np.mean): The aggregation function to use.

Returns:

  • Any: The result of the aggregation function on the scores.

Source code in mteb/results/task_result.py
def get_score(
    self,
    splits: list[SplitName] | None = None,
    languages: list[ISOLanguage | ISOLanguageScript] | None = None,
    scripts: list[ISOLanguageScript] | None = None,
    getter: Callable[[ScoresDict], Score] = lambda scores: scores["main_score"],
    aggregation: Callable[[list[Score]], Any] = np.mean,
) -> Any:
    """Get a score for the specified splits, languages, scripts and aggregation function.

    Args:
        splits: The splits to consider.
        languages: The languages to consider. Can be ISO language codes or ISO language script codes.
        scripts: The scripts to consider.
        getter: A function that takes a scores dictionary and returns a score e.g. "main_score" or "evaluation_time".
        aggregation: The aggregation function to use.

    Returns:
        The result of the aggregation function on the scores.
    """
    if splits is None:
        splits = list(self.scores.keys())

    lang_scripts = LanguageScripts.from_languages_and_scripts(languages, scripts)

    values = []
    for split in splits:
        if split not in self.scores:
            raise ValueError(f"Split {split} not found in scores")

        for scores in self.scores[split]:
            eval_langs = scores["languages"]
            for lang in eval_langs:
                if lang_scripts.contains_language(lang):
                    values.append(getter(scores))
                    break

    return aggregation(values)

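A short usage sketch (the splits, languages and aggregation function are illustrative):

# Mean main score over all splits and subsets
task_result.get_score()

# Main score restricted to the test split and French, aggregated with max instead of mean
task_result.get_score(splits=["test"], languages=["fra"], aggregation=max)
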
is_mergeable(result, criteria=['mteb_version', 'dataset_revision'], raise_error=False)

Checks if the TaskResult object can be merged with another TaskResult or Task.

Parameters:

  • result (TaskResult | AbsTask, required): The TaskResult or Task object to check against.
  • criteria (list[str] | list[Criterias], default ['mteb_version', 'dataset_revision']): Additional criteria to check for merging. Can be "mteb_version" or "dataset_revision". The task name is always checked.
  • raise_error (bool, default False): If True, raises an error if the objects cannot be merged. If False, returns False.

Returns:

  • bool: True if the TaskResult object can be merged with the other object, False otherwise.

Source code in mteb/results/task_result.py
def is_mergeable(
    self,
    result: TaskResult | AbsTask,
    criteria: list[str] | list[Criterias] = [
        "mteb_version",
        "dataset_revision",
    ],
    raise_error: bool = False,
) -> bool:
    """Checks if the TaskResult object can be merged with another TaskResult or Task.

    Args:
        result: The TaskResult or Task object to check against.
        criteria: Additional criteria to check for merging. Can be "mteb_version" or "dataset_revision".
            It will always check that the task name match.
        raise_error: If True, raises an error if the objects cannot be merged. If False, returns False.

    Returns:
        True if the TaskResult object can be merged with the other object, False otherwise.
    """
    criteria = [
        Criterias.from_str(c) if isinstance(c, str) else c for c in criteria
    ]
    if isinstance(result, TaskResult):
        name = result.task_name
        revision = result.dataset_revision
        mteb_version = result.mteb_version
    elif isinstance(result, AbsTask):
        mteb_version = version("mteb")
        name = result.metadata.name
        revision = result.metadata.revision
    else:
        return False

    if self.task_name != name:
        if raise_error:
            raise ValueError(
                f"Cannot merge TaskResult objects as they are derived from different tasks ({self.task_name} and {name})"
            )
        return False

    if Criterias.MTEB_VERSION in criteria and self.mteb_version != mteb_version:
        if raise_error:
            raise ValueError(
                f"Cannot merge TaskResult objects as they are derived from different MTEB versions ({self.mteb_version} and {mteb_version})"
            )
        return False

    if Criterias.DATASET_REVISION in criteria and self.dataset_revision != revision:
        if raise_error:
            raise ValueError(
                f"Cannot merge TaskResult objects as they are derived from different dataset revisions ({self.dataset_revision} and {revision})"
            )
        return False

    return True

merge(new_results, criteria=['mteb_version', 'dataset_revision'])

Merges two TaskResult objects.

Parameters:

  • new_results (TaskResult, required): The new TaskResult object to merge with the current one.
  • criteria (list[str] | list[Criterias], default ['mteb_version', 'dataset_revision']): Additional criteria to check for merging. Can be "mteb_version" or "dataset_revision". The task name is always checked.

Returns:

  • TaskResult: A new TaskResult object with the merged scores.

Source code in mteb/results/task_result.py
def merge(
    self,
    new_results: TaskResult,
    criteria: list[str] | list[Criterias] = [
        "mteb_version",
        "dataset_revision",
    ],
) -> TaskResult:
    """Merges two TaskResult objects.

    Args:
        new_results: The new TaskResult object to merge with the current one.
        criteria: Additional criteria to check for merging. Can be "mteb_version" or "dataset_revision".
            It will always check that the task name match.

    Returns:
        A new TaskResult object with the merged scores.
    """
    self.is_mergeable(new_results, criteria=criteria, raise_error=True)

    merged_scores = self.scores.copy()

    for split, scores in new_results.scores.items():
        if split in merged_scores:
            merged_scores[split] = self._merge_split_scores(
                merged_scores[split], scores
            )
        else:
            merged_scores[split] = scores

    existing_kg_co2_emissions = (
        self.kg_co2_emissions if self.kg_co2_emissions else 0
    )
    new_kg_co2_emissions = (
        new_results.kg_co2_emissions if new_results.kg_co2_emissions else 0
    )
    merged_kg_co2_emissions = None
    if existing_kg_co2_emissions and new_kg_co2_emissions:
        merged_kg_co2_emissions = existing_kg_co2_emissions + new_kg_co2_emissions

    merged_evaluation_time = None
    if self.evaluation_time and new_results.evaluation_time:
        merged_evaluation_time = self.evaluation_time + new_results.evaluation_time
    merged_results = TaskResult(
        dataset_revision=new_results.dataset_revision,
        task_name=new_results.task_name,
        mteb_version=new_results.mteb_version,
        scores=merged_scores,
        evaluation_time=merged_evaluation_time,
        kg_co2_emissions=merged_kg_co2_emissions,
    )

    return merged_results

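A sketch of combining partial results for the same task, assuming result_a and result_b are TaskResult objects for the same task (e.g. different splits evaluated in separate runs):

if result_a.is_mergeable(result_b, criteria=["dataset_revision"]):
    combined = result_a.merge(result_b, criteria=["dataset_revision"])
    print(combined.eval_splits)  # union of the splits from both results

On overlapping splits and subsets, the scores from the result passed to merge take precedence.
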
to_dict()

Convert the TaskResult to a dictionary.

Source code in mteb/results/task_result.py
def to_dict(self) -> dict:
    """Convert the TaskResult to a dictionary."""
    return self.model_dump()

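Together with from_dict (documented above) this gives a simple round trip:

data = task_result.to_dict()           # plain dict, e.g. for custom serialization
restored = TaskResult.from_dict(data)  # validates and rebuilds the TaskResult
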
validate_and_filter_scores(task=None)

This ensures that the scores are correct for the given task by removing any splits besides those specified in the task metadata. Additionally, it checks that all of the required splits as well as the languages are present in the scores. Returns a new TaskResult object.

Parameters:

  • task (AbsTask | None, default None): The task to validate the scores against. E.g. if the task supplied is limited to certain splits and languages, the scores will be filtered to only include those splits and languages. If None, it will attempt to get the task from the task_name.
Source code in mteb/results/task_result.py
def validate_and_filter_scores(self, task: AbsTask | None = None) -> TaskResult:
    """This ensures that the scores are correct for the given task, by removing any splits besides those specified in the task metadata.
    Additionally it also ensure that all of the splits required as well as the languages are present in the scores.
    Returns new TaskResult object.

    Args:
        task: The task to validate the scores against. E.g. if the task supplied is limited to certain splits and languages,
            the scores will be filtered to only include those splits and languages. If None it will attempt to get the task from the task_name.
    """
    from mteb.overview import get_task

    if task is None:
        task = get_task(self.task_name)

    splits = task.eval_splits
    hf_subsets = task.hf_subsets
    hf_subsets = set(hf_subsets)

    new_scores = {}
    seen_splits = set()
    for split in self.scores:
        if split not in splits:
            continue
        new_scores[split] = []
        seen_subsets = set()
        for _scores in self.scores[split]:
            if _scores["hf_subset"] not in hf_subsets:
                continue
            new_scores[split].append(_scores)
            seen_subsets.add(_scores["hf_subset"])
        if seen_subsets != hf_subsets:
            missing_subsets = hf_subsets - seen_subsets
            if len(missing_subsets) > 2:
                subset1, subset2 = list(missing_subsets)[:2]
                missing_subsets_str = f"{{'{subset1}', '{subset2}', ...}}"
            else:
                missing_subsets_str = str(missing_subsets)

            logger.warning(
                f"{task.metadata.name}: Missing subsets {missing_subsets_str} for split {split}"
            )
        seen_splits.add(split)
    if seen_splits != set(splits):
        logger.warning(
            f"{task.metadata.name}: Missing splits {set(splits) - seen_splits}"
        )
    new_res = {**self.to_dict(), "scores": new_scores}
    new_res = TaskResult.from_validated(**new_res)
    return new_res

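A sketch, assuming the stored result covers more splits than the current task definition requires:

task = mteb.get_task(task_result.task_name)
filtered = task_result.validate_and_filter_scores(task)
# filtered.scores only contains the splits and subsets listed in the task metadata;
# anything required but missing is logged as a warning
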
mteb.results.ModelResult

Bases: BaseModel

Data class to hold the results of a model on a set of tasks.

Attributes:

  • model_name (str): Name of the model.
  • model_revision (str | None): Revision of the model.
  • task_results (list[TaskResult]): List of TaskResult objects.

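A minimal sketch of common access patterns, assuming model_results is the ModelResult returned by mteb.evaluate in the snippet at the top of this page:

len(model_results)                      # number of TaskResult objects
[tr.task_name for tr in model_results]  # ModelResult iterates over its task_results
df = model_results.to_dataframe()       # aggregated main scores as a pandas DataFrame
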
Source code in mteb/results/model_result.py
class ModelResult(BaseModel):
    """Data class to hold the results of a model on a set of tasks.

    Attributes:
        model_name: Name of the model.
        model_revision: Revision of the model.
        task_results: List of TaskResult objects.
    """

    model_name: str
    model_revision: str | None
    task_results: list[TaskResult]
    default_modalities: list[Modalities] = Field(
        default_factory=lambda: ["text"], alias="modalities"
    )
    model_config = (
        ConfigDict(  # to free up the name model_* which is otherwise protected
            protected_namespaces=(),
        )
    )

    def __repr__(self) -> str:
        n_entries = len(self.task_results)
        return f"ModelResult(model_name={self.model_name}, model_revision={self.model_revision}, task_results=[...](#{n_entries}))"

    @classmethod
    def from_validated(cls, **data) -> ModelResult:
        data["task_results"] = [
            TaskResult.from_validated(**res) for res in data["task_results"]
        ]
        return cls.model_construct(**data)

    def _filter_tasks(
        self,
        task_names: list[str] | None = None,
        languages: list[str] | None = None,
        domains: list[TaskDomain] | None = None,
        task_types: list[TaskType] | None = None,
        modalities: list[Modalities] | None = None,
    ) -> ModelResult:
        new_task_results = []
        for task_result in self.task_results:
            if (task_names is not None) and (task_result.task_name not in task_names):
                continue
            if languages is not None:
                task_languages = task_result.languages
                if not any(lang in task_languages for lang in languages):
                    continue
            if domains is not None:
                task_domains = task_result.domains
                if not any(domain in task_domains for domain in domains):
                    continue
            if (task_types is not None) and (task_result.task_type not in task_types):
                continue
            if modalities is not None:
                task_modalities = getattr(task_result, "modalities", [])
                if not any(modality in task_modalities for modality in modalities):
                    continue
            new_task_results.append(task_result)
        return type(self).model_construct(
            model_name=self.model_name,
            model_revision=self.model_revision,
            task_results=new_task_results,
        )

    def select_tasks(self, tasks: Sequence[AbsTask]) -> ModelResult:
        task_name_to_task = {task.metadata.name: task for task in tasks}
        new_task_results = [
            task_res.validate_and_filter_scores(task_name_to_task[task_res.task_name])
            for task_res in self.task_results
            if task_res.task_name in task_name_to_task
        ]
        return type(self).model_construct(
            model_name=self.model_name,
            model_revision=self.model_revision,
            task_results=new_task_results,
        )

    def _get_scores(
        self,
        splits: list[SplitName] | None = None,
        languages: list[ISOLanguage | ISOLanguageScript] | None = None,
        scripts: list[ISOLanguageScript] | None = None,
        getter: Callable[[ScoresDict], Score] | None = None,
        aggregation: Callable[[list[Score]], Any] | None = None,
        format: Literal["wide", "long"] = "wide",
    ) -> dict | list:
        if (getter is not None) or (aggregation is not None) or (scripts is not None):
            use_fast = False
            getter = (
                getter if getter is not None else lambda scores: scores["main_score"]
            )
            aggregation = aggregation if aggregation is not None else np.mean
        else:
            use_fast = True
        if format == "wide":
            scores = {}
            for res in self.task_results:
                try:
                    if use_fast:
                        scores[res.task_name] = res._get_score_fast(
                            splits=splits,  # type: ignore
                            languages=languages,  # type: ignore
                        )
                    else:
                        scores[res.task_name] = res.get_score(
                            splits=splits,
                            languages=languages,
                            aggregation=aggregation,  # type: ignore
                            getter=getter,  # type: ignore
                            scripts=scripts,
                        )
                except Exception as e:
                    warnings.warn(
                        f"Couldn't get scores for {res.task_name} due to {e}."
                    )
            return scores
        if format == "long":
            entries = []
            for task_res in self.task_results:
                try:
                    if use_fast:
                        score = task_res._get_score_fast(
                            splits=splits,
                            languages=languages,  # type: ignore
                        )
                    else:
                        score = task_res.get_score(
                            splits=splits,
                            languages=languages,
                            aggregation=aggregation,  # type: ignore
                            getter=getter,  # type: ignore
                            scripts=scripts,
                        )
                    entry = dict(
                        model_name=self.model_name,
                        model_revision=self.model_revision,
                        task_name=task_res.task_name,
                        score=score,
                        mteb_version=task_res.mteb_version,
                        dataset_revision=task_res.dataset_revision,
                        evaluation_time=task_res.evaluation_time,
                        kg_co2_emissions=task_res.kg_co2_emissions,
                    )
                    entries.append(entry)
                except Exception as e:
                    warnings.warn(
                        f"Couldn't get scores for {task_res.task_name} due to {e}."
                    )
            return entries

    def _get_score_for_table(self) -> list[dict[str, str | float]]:
        scores_data = []
        model_name = self.model_name
        for task_result in self.task_results:
            task_name = task_result.task_name
            for split, scores_list in task_result.scores.items():
                for score_item in scores_list:
                    row = {
                        "model_name": model_name,
                        "model_revision": self.model_revision,
                        "task_name": task_name,
                        "split": split,
                        "subset": score_item.get("hf_subset", "default"),
                        "score": score_item.get("main_score", None),
                    }

                    scores_data.append(row)

        return scores_data

    def to_dataframe(
        self,
        aggregation_level: Literal["subset", "split", "task"] = "task",
        aggregation_fn: Callable[[list[Score]], Any] | None = None,
        include_model_revision: bool = False,
        format: Literal["wide", "long"] = "wide",
    ) -> pd.DataFrame:
        """Get a DataFrame with the scores for all models and tasks.
        The DataFrame will have the following columns in addition to the metadata columns:

        - model_name: The name of the model.
        - task_name: The name of the task.
        - score: The main score of the model on the task.

        In addition, the DataFrame can have the following columns depending on the aggregation level:

        - split: The split of the task. E.g. "test", "train", "validation".
        - subset: The subset of the task. E.g. "en", "fr-en".

        Afterwards, the DataFrame will be aggregated according to the aggregation function and pivoted to either a wide or long format.

        Args:
            aggregation_level: The aggregation to use. Can be one of:
                - "subset"/None: No aggregation will be done. The DataFrame will have one row per model, task, split and subset.
                - "split": Aggregates the scores by split. The DataFrame will have one row per model, task and split.
                - "task": Aggregates the scores by task. The DataFrame will have one row per model and task.
            aggregation_fn: The function to use for aggregation. If None, the mean will be used.
            include_model_revision: If True, the model revision will be included in the DataFrame. If False, it will be excluded.
            format: The format of the DataFrame. Can be one of:
                - "wide": The DataFrame will be of shape (number of tasks, number of models). Scores will be in the cells.
                - "long": The DataFrame will be of length (number of tasks * number of models). Scores will be in columns.

        Returns:
            A DataFrame with the scores for all models and tasks.
        """
        scores_data = self._get_score_for_table()

        if not scores_data:
            logger.warning("No scores data available. Returning empty DataFrame.")
            return pd.DataFrame()

        # Create DataFrame
        df = pd.DataFrame(scores_data)

        _columns = ["model_name"]
        if include_model_revision is False:
            df = df.drop(columns=["model_revision"])
        else:
            _columns.append("model_revision")

        return _aggregate_and_pivot(
            df,
            columns=_columns,
            aggregation_level=aggregation_level,
            format=format,
            aggregation_fn=aggregation_fn,
        )

    def __hash__(self) -> int:
        return id(self)

    def __iter__(self) -> Iterable[TaskResult]:
        return iter(self.task_results)

    def __getitem__(self, index) -> TaskResult:
        return self.task_results[index]

    def __len__(self) -> int:
        return len(self.task_results)

    @property
    def languages(self) -> list[str]:
        """Get all languages in the model results.

        Returns:
            A list of languages in the model results.
        """
        langs = []
        for task_res in self.task_results:
            langs.extend(task_res.languages)
        return list(set(langs))

    @property
    def domains(self) -> list[str]:
        """Get all domains in the model results.

        Returns:
            A list of domains in the model results.

        """
        ds = []
        for task_res in self.task_results:
            ds.extend(task_res.domains)
        return list(set(ds))

    @property
    def task_types(self) -> list[str]:
        """Get all task types in the model results.

        Returns:
            A list of task types in the model results.
        """
        return list({task_res.task_type for task_res in self.task_results})

    @property
    def task_names(self) -> list[str]:
        """Get all task names in the model results.

        Returns:
            A list of task names in the model results.
        """
        return [task_res.task_name for task_res in self.task_results]

    @property
    def modalities(self) -> list[str]:
        """Get all modalities in the task results.

        Returns:
            A list of modalities in the task results.
        """
        mods = []
        for task_res in self.task_results:
            task_modalities = getattr(task_res, "modalities", [])
            mods.extend(task_modalities)
        if not mods:
            mods = self.default_modalities
        return list(set(mods))

domains property

Get all domains in the model results.

Returns:

  • list[str]: A list of domains in the model results.

languages property

Get all languages in the model results.

Returns:

  • list[str]: A list of languages in the model results.

modalities property

Get all modalities in the task results.

Returns:

  • list[str]: A list of modalities in the task results.

task_names property

Get all task names in the model results.

Returns:

  • list[str]: A list of task names in the model results.

task_types property

Get all task types in the model results.

Returns:

  • list[str]: A list of task types in the model results.
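
These properties, together with the sequence-style dunder methods shown in the source above, make it possible to inspect a ModelResult without digging into the raw score dictionaries. A minimal sketch, assuming model_result is a ModelResult from an earlier evaluation run (the variable name is a placeholder, not part of the API):

# model_result: mteb.results.ModelResult from a previous evaluation run
print(model_result.model_name)   # name of the evaluated model
print(len(model_result))         # number of TaskResult objects
print(model_result.task_names)   # task names, in evaluation order
print(model_result.task_types)   # deduplicated task types, e.g. ["STS", "Classification"]
print(model_result.languages)    # deduplicated languages across all tasks

# The object is iterable and indexable like a list of TaskResult objects
first_task_result = model_result[0]
for task_result in model_result:
    print(task_result.task_name, task_result.get_score())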

to_dataframe(aggregation_level='task', aggregation_fn=None, include_model_revision=False, format='wide')

Get a DataFrame with the scores for all models and tasks. The DataFrame will have the following columns in addition to the metadata columns:

  • model_name: The name of the model.
  • task_name: The name of the task.
  • score: The main score of the model on the task.

In addition, the DataFrame can have the following columns depending on the aggregation level:

  • split: The split of the task. E.g. "test", "train", "validation".
  • subset: The subset of the task. E.g. "en", "fr-en".

Afterwards, the DataFrame will be aggregated according to the aggregation function and pivoted to either a wide or long format.

Parameters:

  • aggregation_level (Literal['subset', 'split', 'task'], default 'task'): The aggregation to use. Can be one of:
      - "subset"/None: No aggregation will be done. The DataFrame will have one row per model, task, split and subset.
      - "split": Aggregates the scores by split. The DataFrame will have one row per model, task and split.
      - "task": Aggregates the scores by task. The DataFrame will have one row per model and task.
  • aggregation_fn (Callable[[list[Score]], Any] | None, default None): The function to use for aggregation. If None, the mean will be used.
  • include_model_revision (bool, default False): If True, the model revision will be included in the DataFrame. If False, it will be excluded.
  • format (Literal['wide', 'long'], default 'wide'): The format of the DataFrame. Can be one of:
      - "wide": The DataFrame will be of shape (number of tasks, number of models). Scores will be in the cells.
      - "long": The DataFrame will be of length (number of tasks * number of models). Scores will be in columns.

Returns:

  • DataFrame: A DataFrame with the scores for all models and tasks.

Source code in mteb/results/model_result.py
def to_dataframe(
    self,
    aggregation_level: Literal["subset", "split", "task"] = "task",
    aggregation_fn: Callable[[list[Score]], Any] | None = None,
    include_model_revision: bool = False,
    format: Literal["wide", "long"] = "wide",
) -> pd.DataFrame:
    """Get a DataFrame with the scores for all models and tasks.
    The DataFrame will have the following columns in addition to the metadata columns:

    - model_name: The name of the model.
    - task_name: The name of the task.
    - score: The main score of the model on the task.

    In addition, the DataFrame can have the following columns depending on the aggregation level:

    - split: The split of the task. E.g. "test", "train", "validation".
    - subset: The subset of the task. E.g. "en", "fr-en".

    Afterwards, the DataFrame will be aggregated according to the aggregation function and pivoted to either a wide or long format.

    Args:
        aggregation_level: The aggregation to use. Can be one of:
            - "subset"/None: No aggregation will be done. The DataFrame will have one row per model, task, split and subset.
            - "split": Aggregates the scores by split. The DataFrame will have one row per model, task and split.
            - "task": Aggregates the scores by task. The DataFrame will have one row per model and task.
        aggregation_fn: The function to use for aggregation. If None, the mean will be used.
        include_model_revision: If True, the model revision will be included in the DataFrame. If False, it will be excluded.
        format: The format of the DataFrame. Can be one of:
            - "wide": The DataFrame will be of shape (number of tasks, number of models). Scores will be in the cells.
            - "long": The DataFrame will be of length (number of tasks * number of models). Scores will be in columns.

    Returns:
        A DataFrame with the scores for all models and tasks.
    """
    scores_data = self._get_score_for_table()

    if not scores_data:
        logger.warning("No scores data available. Returning empty DataFrame.")
        return pd.DataFrame()

    # Create DataFrame
    df = pd.DataFrame(scores_data)

    _columns = ["model_name"]
    if include_model_revision is False:
        df = df.drop(columns=["model_revision"])
    else:
        _columns.append("model_revision")

    return _aggregate_and_pivot(
        df,
        columns=_columns,
        aggregation_level=aggregation_level,
        format=format,
        aggregation_fn=aggregation_fn,
    )
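
A short usage sketch (not part of the documented source), assuming model_result is a ModelResult from an earlier run; np.median is only an example of a custom aggregation function:

import numpy as np

# Default: one row per task, mean of the main scores across splits and subsets
df_task = model_result.to_dataframe()

# Keep split-level rows and aggregate subsets with the median instead of the mean
df_split = model_result.to_dataframe(
    aggregation_level="split",
    aggregation_fn=np.median,
)

# Long format: one row per task with an explicit score column
df_long = model_result.to_dataframe(format="long")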

mteb.results.BenchmarkResults

Bases: BaseModel

Data class to hold the benchmark results of a model.

Attributes:

Name Type Description
model_results list[ModelResult]

List of ModelResult objects.

Source code in mteb/results/benchmark_results.py
class BenchmarkResults(BaseModel):
    """Data class to hold the benchmark results of a model.

    Attributes:
        model_results: List of ModelResult objects.
    """

    model_results: list[ModelResult]
    model_config = (
        ConfigDict(  # to free up the name model_results which is otherwise protected
            protected_namespaces=(),
        )
    )

    def __repr__(self) -> str:
        n_models = len(self.model_results)
        return f"BenchmarkResults(model_results=[...](#{n_models}))"

    def __hash__(self) -> int:
        return id(self)

    def _filter_tasks(
        self,
        task_names: list[str] | None = None,
        languages: list[str] | None = None,
        domains: list[TaskDomain] | None = None,
        task_types: list[TaskType] | None = None,  # type: ignore
        modalities: list[Modalities] | None = None,
        is_public: bool | None = None,
    ) -> BenchmarkResults:
        # TODO: Same as filter_models
        model_results = [
            res._filter_tasks(
                task_names=task_names,
                languages=languages,
                domains=domains,
                task_types=task_types,
                modalities=modalities,
                is_public=is_public,
            )
            for res in self.model_results
        ]
        return type(self).model_construct(
            model_results=[res for res in model_results if res.task_results]
        )

    def select_tasks(self, tasks: Sequence[AbsTask]) -> BenchmarkResults:
        """Select tasks from the benchmark results.

        Args:
            tasks: List of tasks to select. Can be a list of AbsTask objects or task names.
        """
        new_model_results = [
            model_res.select_tasks(tasks) for model_res in self.model_results
        ]
        return type(self).model_construct(model_results=new_model_results)

    def select_models(
        self,
        names: list[str] | list[ModelMeta],
        revisions: list[str | None] | None = None,
    ) -> BenchmarkResults:
        """Get models by name and revision.

        Args:
            names: List of model names to filter by. Can also be a list of ModelMeta objects, in which case the revision from the ModelMeta object is used and the corresponding entry in revisions is ignored.
            revisions: List of model revisions to filter by. If None, all revisions are returned.
        """
        models_res = []
        _revisions = revisions if revisions is not None else [None] * len(names)

        name_rev = {}

        if len(names) != len(_revisions):
            raise ValueError(
                "The length of names and revisions must be the same or revisions must be None."
            )

        for name, revision in zip(names, _revisions):
            if isinstance(name, ModelMeta):
                name_rev[name.name] = name.revision
            else:
                name_rev[name] = revision

        for model_res in self.model_results:
            model_name = model_res.model_name
            revision = model_res.model_revision
            if model_name in name_rev:
                if name_rev[model_name] is None or revision == name_rev[model_name]:
                    models_res.append(model_res)

        return type(self).model_construct(model_results=models_res)

    def _filter_models(
        self,
        model_names: Iterable[str] | None = None,
        languages: Iterable[str] | None = None,
        open_weights: bool | None = None,
        frameworks: Iterable[str] | None = None,
        n_parameters_range: tuple[int | None, int | None] = (None, None),
        use_instructions: bool | None = None,
        zero_shot_on: list[AbsTask] | None = None,
    ) -> BenchmarkResults:
        # mostly a utility function for the leaderboard app.
        # I would probably move the filtering of the models outside of this call. No need to call get_model_metas inside the filter.
        # interface would then be the same as the get_models function

        model_metas = get_model_metas(
            model_names=model_names,
            languages=languages,
            open_weights=open_weights,
            frameworks=frameworks,
            n_parameters_range=n_parameters_range,
            use_instructions=use_instructions,
            zero_shot_on=zero_shot_on,
        )
        models = {meta.name for meta in model_metas}
        # model_revision_pairs = {(meta.name, meta.revision) for meta in model_metas}
        new_model_results = []
        for model_res in self:
            if model_res.model_name in models:
                new_model_results.append(model_res)

        return type(self).model_construct(model_results=new_model_results)

    def join_revisions(self) -> BenchmarkResults:
        """Join revisions of the same model.

        In case of conflicts, the following rules are applied:
        1) If the main revision is present, it is kept. The main revision is the one defined in the model's ModelMeta object.
        2) If there are multiple revisions and some of them are None or NA, they are filtered out.
        3) If there is no main revision, we prefer the one run using the latest mteb version.
        """

        def parse_version(version_str: str) -> Version | None:
            try:
                return Version(version_str)
            except (InvalidVersion, TypeError):
                return None

        def keep_best(group: pd.DataFrame) -> pd.DataFrame:
            # Filtering out task_results where no scores are present
            group = group[group["has_scores"]]
            is_main_revision = group["revision"] == group["main_revision"]
            # If the main revision is present we select that
            if is_main_revision.sum() > 0:
                return group[is_main_revision].head(n=1)
            unique_revisions = group["revision"].unique()

            # ensure None/NA/"external" revisions are filtered out
            group.loc[group["revision"].isna(), "revision"] = "no_revision_available"
            group.loc[group["revision"] == "external", "revision"] = (
                "no_revision_available"
            )

            # Filtering out no_revision_available if other revisions are present
            if (len(unique_revisions) > 1) and (
                "no_revision_available" in unique_revisions
            ):
                group = group[group["revision"] != "no_revision_available"]
            # If there are any not-NA mteb versions, we select the latest one
            if group["mteb_version"].notna().any():
                group = group.dropna(subset=["mteb_version"])
                group = group.sort_values("mteb_version", ascending=False)
                return group.head(n=1)
            return group.head(n=1)

        records = []
        for model_result in self:
            for task_result in model_result.task_results:
                records.append(
                    dict(
                        model=model_result.model_name,
                        revision=model_result.model_revision,
                        task_name=task_result.task_name,
                        mteb_version=task_result.mteb_version,
                        task_result=task_result,
                        has_scores=bool(task_result.scores),
                    )
                )
        if not records:
            return BenchmarkResults.model_construct(model_results=[])
        task_df = pd.DataFrame.from_records(records)
        model_to_main_revision = {
            meta.name: meta.revision for meta in get_model_metas()
        }
        task_df["main_revision"] = task_df["model"].map(model_to_main_revision)  # type: ignore
        task_df["mteb_version"] = task_df["mteb_version"].map(parse_version)  # type: ignore
        task_df = (
            task_df.groupby(["model", "task_name"])
            .apply(keep_best)
            .reset_index(drop=True)
        )
        model_results = []
        for (model, model_revision), group in task_df.groupby(["model", "revision"]):
            model_result = ModelResult.model_construct(
                model_name=model,
                model_revision=model_revision,
                task_results=list(group["task_result"]),
            )
            model_results.append(model_result)
        return BenchmarkResults.model_construct(model_results=model_results)

    def _get_scores(
        self,
        splits: list[SplitName] | None = None,
        languages: list[ISOLanguage | ISOLanguageScript] | None = None,
        scripts: list[ISOLanguageScript] | None = None,
        getter: Callable[[ScoresDict], Score] | None = None,
        aggregation: Callable[[list[Score]], Any] | None = None,
        format: Literal["wide", "long"] = "wide",
    ) -> list[dict]:
        entries = []
        if format == "wide":
            for model_res in self:
                try:
                    model_scores = model_res._get_scores(
                        splits=splits,
                        languages=languages,
                        scripts=scripts,
                        getter=getter,
                        aggregation=aggregation,
                        format="wide",
                    )
                    entries.append(
                        {
                            "model": model_res.model_name,
                            "revision": model_res.model_revision,
                            **model_scores,  # type: ignore
                        }
                    )
                except Exception as e:
                    warnings.warn(
                        f"Couldn't get scores for {model_res.model_name}({model_res.model_revision}), due to: {e}"
                    )
        if format == "long":
            for model_res in self:
                try:
                    entries.extend(
                        model_res._get_scores(
                            splits=splits,
                            languages=languages,
                            scripts=scripts,
                            getter=getter,
                            aggregation=aggregation,
                            format="long",
                        )
                    )
                except Exception as e:
                    warnings.warn(
                        f"Couldn't get scores for {model_res.model_name}({model_res.model_revision}), due to: {e}"
                    )
        return entries

    def to_dataframe(
        self,
        aggregation_level: Literal["subset", "split", "task"] = "task",
        aggregation_fn: Callable[[list[Score]], Any] | None = None,
        include_model_revision: bool = False,
        format: Literal["wide", "long"] = "wide",
    ) -> pd.DataFrame:
        """Get a DataFrame with the scores for all models and tasks.
        The DataFrame will have the following columns in addition to the metadata columns:

        - model_name: The name of the model.
        - task_name: The name of the task.
        - score: The main score of the model on the task.

        In addition, the DataFrame can have the following columns depending on the aggregation level:

        - split: The split of the task. E.g. "test", "train", "validation".
        - subset: The subset of the task. E.g. "en", "fr-en".

        Afterwards, the DataFrame will be aggregated according to the aggregation function and pivoted to either a wide or long format.

        Args:
            aggregation_level: The aggregation to use. Can be one of:
                - "subset"/None: No aggregation will be done. The DataFrame will have one row per model, task, split and subset.
                - "split": Aggregates the scores by split. The DataFrame will have one row per model, task and split.
                - "task": Aggregates the scores by task. The DataFrame will have one row per model and task.
            aggregation_fn: The function to use for aggregation. If None, the mean will be used.
            include_model_revision: If True, the model revision will be included in the DataFrame. If False, it will be excluded.
                If there are multiple revisions for the same model, they will be joined using the `join_revisions` method.
            format: The format of the DataFrame. Can be one of:
                - "wide": The DataFrame will be of shape (number of tasks, number of models). Scores will be in the cells.
                - "long": The DataFrame will be of length (number of tasks * number of models). Scores will be in columns.

        Returns:
            A DataFrame with the scores for all models and tasks.
        """
        bench_results = self
        if include_model_revision is False:
            bench_results = bench_results.join_revisions()

        scores_data = []
        for model_result in bench_results:
            scores_data.extend(model_result._get_score_for_table())

        if not scores_data:
            logger.warning("No scores data available. Returning empty DataFrame.")
            return pd.DataFrame()

        # Create DataFrame
        df = pd.DataFrame(scores_data)

        _columns = ["model_name"]
        if include_model_revision is False:
            df = df.drop(columns=["model_revision"])
        else:
            _columns.append("model_revision")

        # Aggregation
        return _aggregate_and_pivot(
            df,
            columns=_columns,
            aggregation_level=aggregation_level,
            aggregation_fn=aggregation_fn,
            format=format,
        )

    def __iter__(self) -> Iterator[ModelResult]:
        return iter(self.model_results)

    def __getitem__(self, index: int) -> ModelResult:
        return self.model_results[index]

    def to_dict(self) -> dict:
        return self.model_dump()

    @classmethod
    def from_dict(cls, data: dict) -> BenchmarkResults:
        return cls.model_validate(data)

    def to_disk(self, path: Path | str) -> None:
        """Save the BenchmarkResults to a JSON file."""
        path = Path(path)
        with path.open("w") as out_file:
            out_file.write(self.model_dump_json(indent=2))

    @classmethod
    def from_validated(cls, **data) -> BenchmarkResults:
        model_results = []
        for model_res in data["model_results"]:
            model_results.append(ModelResult.from_validated(**model_res))
        return cls.model_construct(model_results=model_results)

    @classmethod
    def from_disk(cls, path: Path | str) -> BenchmarkResults:
        """Load the BenchmarkResults from a JSON file."""
        path = Path(path)
        with path.open() as in_file:
            data = json.loads(in_file.read())
        return cls.from_dict(data)

    @property
    def languages(self) -> list[str]:
        """Get all languages in the benchmark results.

        Returns:
            A list of languages in ISO 639-1 format.
        """
        langs = []
        for model_res in self.model_results:
            langs.extend(model_res.languages)
        return list(set(langs))

    @property
    def domains(self) -> list[str]:
        """Get all domains in the benchmark results.

        Returns:
            A list of domains in the benchmark results.
        """
        ds = []
        for model_res in self.model_results:
            ds.extend(model_res.domains)
        return list(set(ds))

    @property
    def task_types(self) -> list[str]:
        """Get all task types in the benchmark results.

        Returns:
            A list of task types.
        """
        ts = []
        for model_res in self.model_results:
            ts.extend(model_res.task_types)
        return list(set(ts))

    @property
    def task_names(self) -> list[str]:
        """Get all task names in the benchmark results.

        Returns:
            A list of task names.
        """
        names = []
        for model_res in self.model_results:
            names.extend(model_res.task_names)
        return list(set(names))

    @property
    def modalities(self) -> list[str]:
        """Get all modalities in the benchmark results.

        Returns:
            A list of modalities.
        """
        mod = []
        for model_res in self.model_results:
            mod.extend(model_res.modalities)
        return list(set(mod))

    @property
    def model_names(self) -> list[str]:
        """Get all model names in the benchmark results.

        Returns:
            A list of model names.
        """
        return [model_res.model_name for model_res in self.model_results]

    @property
    def model_revisions(self) -> list[dict[str, str | None]]:
        """Get all model revisions in the benchmark results.

        Returns:
            A list of dictionaries with model names and revisions.
        """
        return [
            {"model_name": model_res.model_name, "revision": model_res.model_revision}
            for model_res in self.model_results
        ]
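
Since BenchmarkResults is a pydantic model wrapping a list of ModelResult objects, it can be built directly from existing results. A minimal sketch, assuming model_result_a and model_result_b are ModelResult objects from earlier evaluations (the names are placeholders):

from mteb.results import BenchmarkResults

# Wrap existing per-model results into a single benchmark-level object
benchmark_results = BenchmarkResults(model_results=[model_result_a, model_result_b])

# Behaves like a sequence of ModelResult objects
for model_result in benchmark_results:
    print(model_result.model_name, len(model_result.task_results))

first_model = benchmark_results[0]  # __getitem__ returns a ModelResult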

domains property

Get all domains in the benchmark results.

Returns:

  • list[str]: A list of domains in the benchmark results.

languages property

Get all languages in the benchmark results.

Returns:

  • list[str]: A list of languages in ISO 639-1 format.

modalities property

Get all modalities in the benchmark results.

Returns:

  • list[str]: A list of modalities.

model_names property

Get all model names in the benchmark results.

Returns:

  • list[str]: A list of model names.

model_revisions property

Get all model revisions in the benchmark results.

Returns:

  • list[dict[str, str | None]]: A list of dictionaries with model names and revisions.

task_names property

Get all task names in the benchmark results.

Returns:

  • list[str]: A list of task names.

task_types property

Get all task types in the benchmark results.

Returns:

  • list[str]: A list of task types.
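
Continuing the earlier sketch (benchmark_results is an assumed BenchmarkResults object), these properties give a quick overview of what the results cover:

print(benchmark_results.model_names)      # every evaluated model name
print(benchmark_results.model_revisions)  # [{"model_name": ..., "revision": ...}, ...]
print(benchmark_results.task_names)       # union of task names across all models
print(benchmark_results.task_types)       # e.g. ["Retrieval", "STS", "Classification"]
print(benchmark_results.languages)        # deduplicated languages
print(benchmark_results.domains)          # deduplicated domains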

from_disk(path) classmethod

Load the BenchmarkResults from a JSON file.

Source code in mteb/results/benchmark_results.py
@classmethod
def from_disk(cls, path: Path | str) -> BenchmarkResults:
    """Load the BenchmarkResults from a JSON file."""
    path = Path(path)
    with path.open() as in_file:
        data = json.loads(in_file.read())
    return cls.from_dict(data)

join_revisions()

Join revisions of the same model.

In case of conflicts, the following rules are applied:

1) If the main revision is present, it is kept. The main revision is the one defined in the model's ModelMeta object.
2) If there are multiple revisions and some of them are None or NA, they are filtered out.
3) If there is no main revision, we prefer the one run using the latest mteb version.

Source code in mteb/results/benchmark_results.py
def join_revisions(self) -> BenchmarkResults:
    """Join revisions of the same model.

    In case of conflicts, the following rules are applied:
    1) If the main revision is present, it is kept. The main revision is the one defined in the model's ModelMeta object.
    2) If there are multiple revisions and some of them are None or NA, they are filtered out.
    3) If there is no main revision, we prefer the one run using the latest mteb version.
    """

    def parse_version(version_str: str) -> Version | None:
        try:
            return Version(version_str)
        except (InvalidVersion, TypeError):
            return None

    def keep_best(group: pd.DataFrame) -> pd.DataFrame:
        # Filtering out task_results where no scores are present
        group = group[group["has_scores"]]
        is_main_revision = group["revision"] == group["main_revision"]
        # If the main revision is present we select that
        if is_main_revision.sum() > 0:
            return group[is_main_revision].head(n=1)
        unique_revisions = group["revision"].unique()

        # ensure None/NA/"external" revisions are filtered out
        group.loc[group["revision"].isna(), "revision"] = "no_revision_available"
        group.loc[group["revision"] == "external", "revision"] = (
            "no_revision_available"
        )

        # Filtering out no_revision_available if other revisions are present
        if (len(unique_revisions) > 1) and (
            "no_revision_available" in unique_revisions
        ):
            group = group[group["revision"] != "no_revision_available"]
        # If there are any not-NA mteb versions, we select the latest one
        if group["mteb_version"].notna().any():
            group = group.dropna(subset=["mteb_version"])
            group = group.sort_values("mteb_version", ascending=False)
            return group.head(n=1)
        return group.head(n=1)

    records = []
    for model_result in self:
        for task_result in model_result.task_results:
            records.append(
                dict(
                    model=model_result.model_name,
                    revision=model_result.model_revision,
                    task_name=task_result.task_name,
                    mteb_version=task_result.mteb_version,
                    task_result=task_result,
                    has_scores=bool(task_result.scores),
                )
            )
    if not records:
        return BenchmarkResults.model_construct(model_results=[])
    task_df = pd.DataFrame.from_records(records)
    model_to_main_revision = {
        meta.name: meta.revision for meta in get_model_metas()
    }
    task_df["main_revision"] = task_df["model"].map(model_to_main_revision)  # type: ignore
    task_df["mteb_version"] = task_df["mteb_version"].map(parse_version)  # type: ignore
    task_df = (
        task_df.groupby(["model", "task_name"])
        .apply(keep_best)
        .reset_index(drop=True)
    )
    model_results = []
    for (model, model_revision), group in task_df.groupby(["model", "revision"]):
        model_result = ModelResult.model_construct(
            model_name=model,
            model_revision=model_revision,
            task_results=list(group["task_result"]),
        )
        model_results.append(model_result)
    return BenchmarkResults.model_construct(model_results=model_results)
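
For instance, when the same model appears under several revisions, a hedged sketch of collapsing them before building a table (benchmark_results is an assumed BenchmarkResults object):

# Collapse multiple revisions of the same model into at most one entry per model.
# Main revisions (from ModelMeta) win; otherwise the run with the newest mteb version is kept.
joined = benchmark_results.join_revisions()

print(len(benchmark_results.model_revisions))  # possibly several entries per model
print(len(joined.model_revisions))             # at most one entry per model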

select_models(names, revisions=None)

Get models by name and revision.

Parameters:

  • names (list[str] | list[ModelMeta], required): List of model names to filter by. Can also be a list of ModelMeta objects, in which case the revision from the ModelMeta object is used and the corresponding entry in revisions is ignored.
  • revisions (list[str | None] | None, default None): List of model revisions to filter by. If None, all revisions are returned.
Source code in mteb/results/benchmark_results.py
def select_models(
    self,
    names: list[str] | list[ModelMeta],
    revisions: list[str | None] | None = None,
) -> BenchmarkResults:
    """Get models by name and revision.

    Args:
        names: List of model names to filter by. Can also be a list of ModelMeta objects, in which case the revision from the ModelMeta object is used and the corresponding entry in revisions is ignored.
        revisions: List of model revisions to filter by. If None, all revisions are returned.
    """
    models_res = []
    _revisions = revisions if revisions is not None else [None] * len(names)

    name_rev = {}

    if len(names) != len(_revisions):
        raise ValueError(
            "The length of names and revisions must be the same or revisions must be None."
        )

    for name, revision in zip(names, _revisions):
        if isinstance(name, ModelMeta):
            name_rev[name.name] = name.revision
        else:
            name_rev[name] = revision

    for model_res in self.model_results:
        model_name = model_res.model_name
        revision = model_res.model_revision
        if model_name in name_rev:
            if name_rev[model_name] is None or revision == name_rev[model_name]:
                models_res.append(model_res)

    return type(self).model_construct(model_results=models_res)
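
A hedged sketch of both calling styles; the model names and revision hash below are placeholders, not models guaranteed to be present in your results:

# Select by name only: every revision of the listed models is kept
subset = benchmark_results.select_models(["model-a", "model-b"])

# Pin a revision per name; None keeps all revisions of that model.
# names and revisions must have the same length.
subset = benchmark_results.select_models(
    ["model-a", "model-b"],
    revisions=["abc1234", None],
)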

select_tasks(tasks)

Select tasks from the benchmark results.

Parameters:

  • tasks (Sequence[AbsTask], required): List of tasks to select. Can be a list of AbsTask objects or task names.
Source code in mteb/results/benchmark_results.py
def select_tasks(self, tasks: Sequence[AbsTask]) -> BenchmarkResults:
    """Select tasks from the benchmark results.

    Args:
        tasks: List of tasks to select. Can be a list of AbsTask objects or task names.
    """
    new_model_results = [
        model_res.select_tasks(tasks) for model_res in self.model_results
    ]
    return type(self).model_construct(model_results=new_model_results)
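
A short sketch, assuming mteb.get_tasks is available in your mteb version and the task names are placeholders for tasks actually present in your results:

import mteb

# Resolve task objects by name, then keep only those tasks in the results
tasks = mteb.get_tasks(tasks=["STS12", "Banking77Classification"])
selected = benchmark_results.select_tasks(tasks)
print(selected.task_names)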

to_dataframe(aggregation_level='task', aggregation_fn=None, include_model_revision=False, format='wide')

Get a DataFrame with the scores for all models and tasks. The DataFrame will have the following columns in addition to the metadata columns:

  • model_name: The name of the model.
  • task_name: The name of the task.
  • score: The main score of the model on the task.

In addition, the DataFrame can have the following columns depending on the aggregation level:

  • split: The split of the task. E.g. "test", "train", "validation".
  • subset: The subset of the task. E.g. "en", "fr-en".

Afterwards, the DataFrame will be aggregated according to the aggregation function and pivoted to either a wide or long format.

Parameters:

  • aggregation_level (Literal['subset', 'split', 'task'], default 'task'): The aggregation to use. Can be one of:
      - "subset"/None: No aggregation will be done. The DataFrame will have one row per model, task, split and subset.
      - "split": Aggregates the scores by split. The DataFrame will have one row per model, task and split.
      - "task": Aggregates the scores by task. The DataFrame will have one row per model and task.
  • aggregation_fn (Callable[[list[Score]], Any] | None, default None): The function to use for aggregation. If None, the mean will be used.
  • include_model_revision (bool, default False): If True, the model revision will be included in the DataFrame. If False, it will be excluded. If there are multiple revisions for the same model, they will be joined using the join_revisions method.
  • format (Literal['wide', 'long'], default 'wide'): The format of the DataFrame. Can be one of:
      - "wide": The DataFrame will be of shape (number of tasks, number of models). Scores will be in the cells.
      - "long": The DataFrame will be of length (number of tasks * number of models). Scores will be in columns.

Returns:

  • DataFrame: A DataFrame with the scores for all models and tasks.

Source code in mteb/results/benchmark_results.py
def to_dataframe(
    self,
    aggregation_level: Literal["subset", "split", "task"] = "task",
    aggregation_fn: Callable[[list[Score]], Any] | None = None,
    include_model_revision: bool = False,
    format: Literal["wide", "long"] = "wide",
) -> pd.DataFrame:
    """Get a DataFrame with the scores for all models and tasks.
    The DataFrame will have the following columns in addition to the metadata columns:

    - model_name: The name of the model.
    - task_name: The name of the task.
    - score: The main score of the model on the task.

    In addition, the DataFrame can have the following columns depending on the aggregation level:

    - split: The split of the task. E.g. "test", "train", "validation".
    - subset: The subset of the task. E.g. "en", "fr-en".

    Afterwards, the DataFrame will be aggregated according to the aggregation function and pivoted to either a wide or long format.

    Args:
        aggregation_level: The aggregation to use. Can be one of:
            - "subset"/None: No aggregation will be done. The DataFrame will have one row per model, task, split and subset.
            - "split": Aggregates the scores by split. The DataFrame will have one row per model, task and split.
            - "task": Aggregates the scores by task. The DataFrame will have one row per model and task.
        aggregation_fn: The function to use for aggregation. If None, the mean will be used.
        include_model_revision: If True, the model revision will be included in the DataFrame. If False, it will be excluded.
            If there are multiple revisions for the same model, they will be joined using the `join_revisions` method.
        format: The format of the DataFrame. Can be one of:
            - "wide": The DataFrame will be of shape (number of tasks, number of models). Scores will be in the cells.
            - "long": The DataFrame will be of length (number of tasks * number of models). Scores will be in columns.

    Returns:
        A DataFrame with the scores for all models and tasks.
    """
    bench_results = self
    if include_model_revision is False:
        bench_results = bench_results.join_revisions()

    scores_data = []
    for model_result in bench_results:
        scores_data.extend(model_result._get_score_for_table())

    if not scores_data:
        logger.warning("No scores data available. Returning empty DataFrame.")
        return pd.DataFrame()

    # Create DataFrame
    df = pd.DataFrame(scores_data)

    _columns = ["model_name"]
    if include_model_revision is False:
        df = df.drop(columns=["model_revision"])
    else:
        _columns.append("model_revision")

    # Aggregation
    return _aggregate_and_pivot(
        df,
        columns=_columns,
        aggregation_level=aggregation_level,
        aggregation_fn=aggregation_fn,
        format=format,
    )
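
A usage sketch comparing the two output formats (benchmark_results is an assumed BenchmarkResults object):

# Wide: one row per task, one column per model, mean main scores in the cells
wide_df = benchmark_results.to_dataframe()

# Long: one row per (model, task, split) combination with an explicit score column
long_df = benchmark_results.to_dataframe(
    aggregation_level="split",
    format="long",
)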

to_disk(path)

Save the BenchmarkResults to a JSON file.

Source code in mteb/results/benchmark_results.py
def to_disk(self, path: Path | str) -> None:
    """Save the BenchmarkResults to a JSON file."""
    path = Path(path)
    with path.open("w") as out_file:
        out_file.write(self.model_dump_json(indent=2))
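
A round-trip sketch combining to_disk and from_disk; the file name is a placeholder and benchmark_results is an assumed BenchmarkResults object:

from pathlib import Path

from mteb.results import BenchmarkResults

path = Path("benchmark_results.json")  # placeholder output path
benchmark_results.to_disk(path)        # serializes the object to JSON

restored = BenchmarkResults.from_disk(path)  # validates and rebuilds the object
print(restored.model_names)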