Skip to content

Models

A model in mteb covers two concepts: metadata and implementation. - Metadata contains information about the model such as maximum input length, valid frameworks, license, and degree of openness. - Implementation is a reproducible workflow, which allows others to run the same model again, using the same prompts, hyperparameters, aggregation strategies, etc.

An overview of the model and its metadata within mteb

Utilities

mteb.get_model_metas(model_names=None, languages=None, open_weights=None, frameworks=None, n_parameters_range=(None, None), use_instructions=None, zero_shot_on=None, model_types=None, modalities=None, exclusive_modality_filter=False)

Load all models' metadata that fit the specified criteria.

Parameters:

Name Type Description Default
model_names Iterable[str] | None

A list of model names to filter by. If None, all models are included.

None
languages Iterable[str] | None

A list of languages to filter by. If None, all languages are included.

None
open_weights bool | None

Whether to filter by models with open weights. If None this filter is ignored.

None
frameworks Iterable[str] | None

A list of frameworks to filter by. If None, all frameworks are included.

None
n_parameters_range tuple[int | None, int | None]

A tuple of lower and upper bounds of the number of parameters to filter by. If (None, None), this filter is ignored.

(None, None)
use_instructions bool | None

Whether to filter by models that use instructions. If None, all models are included.

None
zero_shot_on list[AbsTask] | None

A list of tasks on which the model is zero-shot. If None this filter is ignored.

None
model_types Iterable[str] | None

A list of model types to filter by. If None, all model types are included.

None
modalities Iterable[Modalities] | None

A list of modalities to filter by. If None, all modalities are included.

None
exclusive_modality_filter bool

If True, only return models whose modalities exactly match the provided modalities. If False, return models whose modalities include the provided modalities.

False

Returns:

Type Description
list[ModelMeta]

A list of model metadata objects that fit the specified criteria.

Source code in mteb/models/get_model_meta.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def get_model_metas(  # noqa: PLR0913, PLR0917
    model_names: Iterable[str] | None = None,
    languages: Iterable[str] | None = None,
    open_weights: bool | None = None,
    frameworks: Iterable[str] | None = None,
    n_parameters_range: tuple[int | None, int | None] = (None, None),
    use_instructions: bool | None = None,
    zero_shot_on: list[AbsTask] | None = None,
    model_types: Iterable[str] | None = None,
    modalities: Iterable[Modalities] | None = None,
    exclusive_modality_filter: bool = False,
) -> list[ModelMeta]:
    """Load all models' metadata that fit the specified criteria.

    Args:
        model_names: A list of model names to filter by. If None, all models are included.
        languages: A list of languages to filter by. If None, all languages are included.
        open_weights: Whether to filter by models with open weights. If None this filter is ignored.
        frameworks: A list of frameworks to filter by. If None, all frameworks are included.
        n_parameters_range: A tuple of lower and upper bounds of the number of parameters to filter by.
            Each bound is applied independently; if (None, None), this filter is ignored. Models with
            an unknown parameter count are excluded whenever either bound is set.
        use_instructions: Whether to filter by models that use instructions. If None, all models are included.
        zero_shot_on: A list of tasks on which the model is zero-shot. If None this filter is ignored.
        model_types: A list of model types to filter by. If None, all model types are included.
        modalities: A list of modalities to filter by. If None, all modalities are included.
        exclusive_modality_filter: If True, only return models whose modalities exactly match the provided
            modalities. If False, return models whose modalities include the provided modalities.

    Returns:
        A list of model metadata objects that fit the specified criteria.
    """
    res = []
    # Normalize iterable filters to sets once so subset/intersection checks
    # inside the registry loop are cheap.
    model_names = set(model_names) if model_names is not None else None
    languages = set(languages) if languages is not None else None
    frameworks = set(frameworks) if frameworks is not None else None
    model_types_set = set(model_types) if model_types is not None else None
    modalities_set = set(modalities) if modalities is not None else None
    # Loop-invariant: unpack the parameter bounds once.
    lower, upper = n_parameters_range

    for model_meta in MODEL_REGISTRY.values():
        if (model_names is not None) and (model_meta.name not in model_names):
            continue
        if languages is not None:
            # Models with unknown languages cannot satisfy a language filter.
            if (model_meta.languages is None) or not (
                languages <= set(model_meta.languages)
            ):
                continue
        if (open_weights is not None) and (model_meta.open_weights != open_weights):
            continue
        if (frameworks is not None) and not (frameworks <= set(model_meta.framework)):
            continue
        if (use_instructions is not None) and (
            model_meta.use_instructions != use_instructions
        ):
            continue
        if model_types_set is not None and not model_types_set.intersection(
            model_meta.model_type
        ):
            continue
        if modalities_set is not None:
            model_modalities = set(model_meta.modalities)
            if exclusive_modality_filter:
                if model_modalities != modalities_set:
                    continue
            elif not modalities_set <= model_modalities:
                continue

        # Parameter-count filter. Bug fix: previously the lower-bound check was
        # nested under `if upper is not None`, so a range like (1_000_000, None)
        # was silently ignored. Each bound now applies on its own.
        if lower is not None or upper is not None:
            n_parameters = model_meta.n_parameters
            if n_parameters is None:
                continue
            if upper is not None and n_parameters > upper:
                continue
            if lower is not None and n_parameters < lower:
                continue

        if zero_shot_on is not None:
            if not model_meta.is_zero_shot_on(zero_shot_on):
                continue
        res.append(model_meta)
    return res

mteb.get_model_meta(model_name, revision=None, fetch_from_hf=False, fill_missing=False, experiment_kwargs=None)

A function to fetch a model metadata object by name.

Parameters:

Name Type Description Default
model_name str

Name of the model to fetch

required
revision str | None

Revision of the model to fetch

None
fetch_from_hf bool

Whether to fetch the model from HuggingFace Hub if not found in the registry

False
fill_missing bool

Fill missing attributes from the metadata including number of parameters and memory usage.

False
experiment_kwargs Mapping[str, Any] | None

Optional dictionary of parameters to fill in the metadata for experimental models.

None

Returns:

Type Description
ModelMeta

A model metadata object

Source code in mteb/models/get_model_meta.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def get_model_meta(
    model_name: str,
    revision: str | None = None,
    fetch_from_hf: bool = False,
    fill_missing: bool = False,
    experiment_kwargs: Mapping[str, Any] | None = None,
) -> ModelMeta:
    """A function to fetch a model metadata object by name.

    Args:
        model_name: Name of the model to fetch
        revision: Revision of the model to fetch
        fetch_from_hf: Whether to fetch the model from HuggingFace Hub if not found in the registry
        fill_missing: Fill missing attributes from the metadata including number of parameters and memory usage.
        experiment_kwargs: Optional dictionary of parameters to fill in the metadata for experimental models.

    Returns:
        A model metadata object

    Raises:
        ValueError: If `revision` does not match the revision registered for the model.
        KeyError: If the model cannot be found in the registry (and `fetch_from_hf` is False).
    """
    # Transparently follow renamed models, warning the caller once.
    if model_name in _MODEL_RENAMES:
        new_name = _MODEL_RENAMES[model_name]
        msg = f"The model '{model_name}' has been renamed to '{new_name}'. To prevent this warning use the new name."
        warnings.warn(msg, DeprecationWarning, stacklevel=2)
        model_name = new_name

    if model_name in MODEL_REGISTRY:
        model_meta = MODEL_REGISTRY[model_name]

        if revision and model_meta.revision != revision:
            raise ValueError(
                f"Model revision {revision} not found for model {model_name}. Expected {model_meta.revision}."
            )

        if experiment_kwargs is not None:
            model_meta = model_meta.model_copy(
                update={"experiment_kwargs": experiment_kwargs}
            )

        if fill_missing and fetch_from_hf:
            # Backfill only attributes the registry entry left as None; never
            # overwrite curated metadata with Hub-derived values.
            original_meta_dict = model_meta.model_dump()
            new_meta = ModelMeta.from_hub(model_name, fill_missing=fill_missing)
            new_meta_dict = new_meta.model_dump(exclude_none=True)

            updates = {
                k: v
                for k, v in new_meta_dict.items()
                if original_meta_dict.get(k) is None
            }

            if updates:
                return model_meta.model_copy(update=updates)
        return model_meta

    if fetch_from_hf:
        logger.info(
            f"Model not found in model registry. Attempting to extract metadata by loading the model ({model_name}) using HuggingFace."
        )
        meta = ModelMeta.from_hub(model_name, revision)
        return meta

    not_found_msg = f"Model '{model_name}' not found in MTEB registry"
    not_found_msg += " nor on the Huggingface Hub." if fetch_from_hf else "."

    close_matches = difflib.get_close_matches(model_name, MODEL_REGISTRY.keys())
    # Bug fix: the map must go short name -> full registry name so that a name
    # given without its organization prefix (e.g. "all-MiniLM-L6-v2") suggests
    # the valid "org/model" key first. The previous full -> short mapping was
    # keyed by registry names and could never match an unknown input.
    full_name_by_short = {mdl.split("/")[-1]: mdl for mdl in MODEL_REGISTRY.keys()}
    if model_name in full_name_by_short:
        close_matches = [full_name_by_short[model_name]] + close_matches

    suggestion = ""
    if close_matches:
        if len(close_matches) > 1:
            # Quote both suggestions consistently.
            suggestion = f" Did you mean: '{close_matches[0]}' or '{close_matches[1]}'?"
        else:
            suggestion = f" Did you mean: '{close_matches[0]}'?"

    raise KeyError(not_found_msg + suggestion)

mteb.get_model(model_name, revision=None, device=None, *, embed_dim=None, **kwargs)

A function to fetch and load model object by name.

Note

This function loads the model into memory. If you only want to fetch the metadata, use get_model_meta instead.

Parameters:

Name Type Description Default
model_name str

Name of the model to fetch

required
revision str | None

Revision of the model to fetch

None
device str | None

Device used to load the model

None
embed_dim int | None

Optional embedding dimension to load the model with. This is only used for models that support loading with a specified embedding dimension, and will be ignored for other models.

None
**kwargs Any

Additional keyword arguments to pass to the model loader

{}

Returns:

Type Description
MTEBModels

A model object

Source code in mteb/models/get_model_meta.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def get_model(
    model_name: str,
    revision: str | None = None,
    device: str | None = None,
    *,
    embed_dim: int | None = None,
    **kwargs: Any,
) -> MTEBModels:
    """Fetch a model's metadata by name and load the model object.

    !!! note
        This function loads the model into memory. If you only want to fetch the metadata, use [`get_model_meta`](#mteb.get_model_meta) instead.

    Args:
        model_name: Name of the model to fetch
        revision: Revision of the model to fetch
        device: Device used to load the model
        embed_dim: Optional embedding dimension to load the model with. This is only used for models that support loading with a specified embedding dimension, and will be ignored for other models.
        **kwargs: Additional keyword arguments to pass to the model loader

    Returns:
        A model object
    """
    # Resolve metadata first (falling back to the Hub), then instantiate.
    model_meta = get_model_meta(model_name, revision, fetch_from_hf=True)
    return model_meta.load_model(device=device, embed_dim=embed_dim, **kwargs)

Metadata

mteb.models.model_meta.ModelMeta

Bases: BaseModel

The model metadata object.

Attributes:

Name Type Description
loader Callable[..., MTEBModels] | None

The function that loads the model. If None it assumes that the model is not implemented.

loader_kwargs dict[str, Any]

The keyword arguments to pass to the loader function.

name str | None

The name of the model, ideally the name on huggingface. It should be in the format "organization/model_name".

n_parameters int | None

The total number of parameters in the model, e.g. 7_000_000 for a 7M parameter model. Can be none in case the number of parameters is unknown.

n_embedding_parameters int | None

The number of parameters used for the embedding layer. Can be None if the number of embedding parameters is not known (e.g. for proprietary models).

n_active_parameters_override int | None

The number of active parameters used by the model. Should be used only for Mixture of Experts models.

memory_usage_mb float | None

The memory usage of the model in MB. Can be None if the memory usage is not known (e.g. for proprietary models). To calculate it use the calculate_memory_usage_mb method.

max_tokens float | None

The maximum number of tokens the model can handle. Can be None if the maximum number of tokens is not known (e.g. for proprietary models).

embed_dim int | Sequence[int] | None

The dimension of the embeddings produced by the model. Currently all models are assumed to produce fixed-size embeddings. If annotated as list this will be treated as a range of possible embedding dimensions (Matryoshka).

revision str | None

The revision number of the model. If None, it is assumed that the metadata (including the loader) is valid for all revisions of the model.

release_date StrDate | None

The date the model's revision was released. If None, then release date will be added based on 1st commit in hf repository of model.

license Licenses | StrURL | None

The license under which the model is released. Required if open_weights is True.

open_weights bool | None

Whether the model is open source or proprietary.

public_training_code str | None

A link to the publicly available training code. If None, it is assumed that the training code is not publicly available.

public_training_data str | bool | None

A link to the publicly available training data. If None, it is assumed that the training data is not publicly available.

similarity_fn_name ScoringFunction | None

The distance metric used by the model.

framework list[FRAMEWORKS]

The framework the model is implemented in, can be a list of frameworks e.g. ["Sentence Transformers", "PyTorch"].

reference StrURL | None

A URL to the model's page on huggingface or another source.

languages list[ISOLanguageScript] | None

The languages the model is intended for, specified as a 3-letter language code followed by a script code e.g., "eng-Latn" for English in the Latin script.

use_instructions bool | None

Whether the model uses instructions E.g. for prompt-based models. This also includes models that require a specific format for input, such as "query: {document}" or "passage: {document}".

citation str | None

The citation for the model. This is a bibtex string.

training_datasets set[str] | None

A set of dataset names that the model was trained on. Names should be as they appear in mteb, for example {"ArguAna"} if the model is trained on the ArguAna test set. This field is used to determine if a model generalizes zero-shot to a benchmark as well as mark dataset contaminations.

adapted_from str | None

Name of the model from which this model is adapted. For quantizations, fine-tunes, long doc extensions, etc.

superseded_by str | None

Name of the model that supersedes this model, e.g., nvidia/NV-Embed-v2 supersedes v1.

model_type list[MODEL_TYPES]

A list of strings representing the type of model.

modalities list[Modalities]

A list of strings representing the modalities the model supports. Default is ["text"].

contacts list[str] | None

The people to contact in case of a problem in the model, preferably a GitHub handle.

experiment_kwargs Mapping[str, Any] | None

A dictionary of parameters used in the experiment that are not covered by other fields. This is used to create experiment names for ablation studies and similar experiments.

output_dtypes OutputDType | list[OutputDType] | None

Output embedding data types (e.g. int8, binary, float) natively supported by the model. If None, it is assumed that the model only returns float embeddings.

Source code in mteb/models/model_meta.py
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
class ModelMeta(BaseModel):  # noqa: PLR0904
    """The model metadata object.

    Attributes:
        loader: The function that loads the model. If None it assumes that the model is not implemented.
        loader_kwargs: The keyword arguments to pass to the loader function.
        name: The name of the model, ideally the name on huggingface. It should be in the format "organization/model_name".
        n_parameters: The total number of parameters in the model, e.g. `7_000_000` for a 7M parameter model. Can be none in case the number of parameters is unknown.
        n_embedding_parameters: The number of parameters used for the embedding layer. Can be None if the number of embedding parameters is not known (e.g. for proprietary models).
        n_active_parameters_override: The number of active parameters used by the model. Should be used **only** for Mixture of Experts models.
        memory_usage_mb: The memory usage of the model in MB. Can be None if the memory usage is not known (e.g. for proprietary models). To calculate it use the `calculate_memory_usage_mb` method.
        max_tokens: The maximum number of tokens the model can handle. Can be None if the maximum number of tokens is not known (e.g. for proprietary
            models).
        embed_dim: The dimension of the embeddings produced by the model. Currently all models are assumed to produce fixed-size embeddings.
            If annotated as list this will be treated as a range of possible embedding dimensions (Matryoshka).
        revision: The revision number of the model. If None, it is assumed that the metadata (including the loader) is valid for all revisions of the model.
        release_date: The date the model's revision was released. If None, then release date will be added based on 1st commit in hf repository of model.
        license: The license under which the model is released. Required if open_weights is True.
        open_weights: Whether the model is open source or proprietary.
        public_training_code: A link to the publicly available training code. If None, it is assumed that the training code is not publicly available.
        public_training_data: A link to the publicly available training data. If None, it is assumed that the training data is not publicly available.
        similarity_fn_name: The distance metric used by the model.
        framework: The framework the model is implemented in, can be a list of frameworks e.g. `["Sentence Transformers", "PyTorch"]`.
        reference: A URL to the model's page on huggingface or another source.
        languages: The languages the model is intended for, specified as a 3-letter language code followed by a script code e.g., "eng-Latn" for English
            in the Latin script.
        use_instructions: Whether the model uses instructions E.g. for prompt-based models. This also includes models that require a specific format for
            input, such as "query: {document}" or "passage: {document}".
        citation: The citation for the model. This is a bibtex string.
        training_datasets: A dictionary of datasets that the model was trained on. Names should be given as they appear in `mteb`, for example
            {"ArguAna"} if the model is trained on the ArguAna test set. This field is used to determine if a model generalizes zero-shot to
            a benchmark as well as mark dataset contaminations.
        adapted_from: Name of the model from which this model is adapted. For quantizations, fine-tunes, long doc extensions, etc.
        superseded_by: Name of the model that supersedes this model, e.g., nvidia/NV-Embed-v2 supersedes v1.
        model_type: A list of strings representing the type of model.
        modalities: A list of strings representing the modalities the model supports. Default is ["text"].
        contacts: The people to contact in case of a problem in the model, preferably a GitHub handle.
        experiment_kwargs: A dictionary of parameters used in the experiment that are not covered by other fields. This is used to create experiment names for ablation studies and similar experiments.
        output_dtypes: Output embedding data types (e.g. int8, binary, float) natively supported by the model. If None, it is assumed that the model only returns float embeddings.
    """

    # reject unknown keyword arguments at construction time
    model_config = ConfigDict(extra="forbid")

    # loaders
    loader: Callable[..., MTEBModels] | None
    loader_kwargs: dict[str, Any] = field(default_factory=dict)
    # identity & provenance
    name: str | None
    revision: str | None
    release_date: StrDate | None
    languages: list[ISOLanguageScript] | None
    # size / capacity characteristics
    n_parameters: int | None
    n_active_parameters_override: int | None = None
    n_embedding_parameters: int | None = None
    memory_usage_mb: float | None
    max_tokens: float | None
    embed_dim: int | Sequence[int] | None
    # openness & licensing
    license: Licenses | StrURL | None
    open_weights: bool | None
    public_training_code: str | None
    public_training_data: str | bool | None
    framework: list[FRAMEWORKS]
    reference: StrURL | None = None
    similarity_fn_name: ScoringFunction | None
    use_instructions: bool | None
    training_datasets: set[str] | None
    adapted_from: str | None = None
    superseded_by: str | None = None
    modalities: list[Modalities] = ["text"]
    model_type: list[MODEL_TYPES] = ["dense"]
    citation: str | None = None
    contacts: list[str] | None = None
    experiment_kwargs: Mapping[str, Any] | None = None
    output_dtypes: OutputDType | list[OutputDType] | None = None

    def __setattr__(self, name: str, value: Any) -> None:
        """Warn that in-place mutation is deprecated, then delegate to the base class.

        Callers should prefer ``model_copy(update={...})`` over direct assignment.
        """
        message = (
            f"Mutating '{name}' is deprecated and will be removed in future versions. "
            "Use .model_copy(update={...}) instead."
        )
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        super().__setattr__(name, value)

    @model_validator(mode="before")
    @classmethod
    def _handle_legacy_is_cross_encoder(cls, data: Any) -> Any:
        """Translate the deprecated ``is_cross_encoder`` input into ``model_type``.

        Backward-compatibility shim: a truthy value forces
        ``model_type=["cross-encoder"]``; a falsy (but non-None) value strips
        "cross-encoder" from ``model_type``, falling back to ["dense"].
        """
        if not isinstance(data, dict) or "is_cross_encoder" not in data:
            return data

        legacy_flag = data.pop("is_cross_encoder")
        if legacy_flag is None:
            return data

        warnings.warn(
            "is_cross_encoder is deprecated and will be removed in a future version. "
            "Use model_type=['cross-encoder'] instead.",
            DeprecationWarning,
            stacklevel=2,
        )

        current_types = data.get("model_type", ["dense"])
        if legacy_flag:
            if "cross-encoder" not in current_types:
                data["model_type"] = ["cross-encoder"]
        elif "cross-encoder" in current_types:
            remaining = [t for t in current_types if t != "cross-encoder"]
            data["model_type"] = remaining if remaining else ["dense"]

        return data

    @property
    def is_cross_encoder(self) -> bool:
        """Whether "cross-encoder" appears in this model's ``model_type`` list.

        Derived entirely from the ``model_type`` field.
        """
        return any(model_type == "cross-encoder" for model_type in self.model_type)

    @property
    def n_active_parameters(self):
        """Number of active parameters.

        Assumed to be `n_parameters - n_embedding_parameters`; can be overwritten
        via `n_active_parameters_override` (e.g. for MoE models). Returns None when
        either count is unknown and no override is set.
        """
        override = self.n_active_parameters_override
        if override is not None:
            return override

        total = self.n_parameters
        embedding = self.n_embedding_parameters
        if total is None or embedding is None:
            return None
        return total - embedding

    @field_validator("similarity_fn_name", mode="before")
    @classmethod
    def _validate_similarity_fn_name(cls, value: str) -> ScoringFunction | None:
        """Converts the similarity function name to the corresponding enum value.

        Sentence_transformers uses Literal['cosine', 'dot', 'euclidean', 'manhattan'],
        and pylate uses Literal['MaxSim']

        Args:
            value: The similarity function name as a string, an existing
                ScoringFunction member, or None.

        Returns:
            The corresponding ScoringFunction enum value, or the input unchanged
            when it is already a ScoringFunction or None.

        Raises:
            ValueError: If the name has no mapping (note: 'euclidean' and
                'manhattan' currently have no mapping and will raise).
        """
        # Pass through values that are already validated or absent.
        # isinstance (rather than `type(value) is ...`) also accepts enum subclasses.
        if value is None or isinstance(value, ScoringFunction):
            return value
        mapping = {
            "cosine": ScoringFunction.COSINE,
            "dot": ScoringFunction.DOT_PRODUCT,
            "MaxSim": ScoringFunction.MAX_SIM,
        }
        if value in mapping:
            return mapping[value]
        raise ValueError(f"Invalid similarity function name: {value}")

    def to_dict(self):
        """Returns a dictionary representation of the model metadata."""
        snapshot = self.model_copy(deep=True)
        data = snapshot.model_dump()

        # Matryoshka models expose a range of dimensions; report the largest.
        if isinstance(snapshot.embed_dim, Sequence):
            data["embed_dim"] = max(snapshot.embed_dim)

        # Sets are not JSON-serializable; convert to a list.
        training = data["training_datasets"]
        if isinstance(training, set):
            data["training_datasets"] = list(training)

        # The loader callable itself is not serializable; store its name instead.
        loader_fn = data.pop("loader", None)
        data["loader"] = _get_loader_name(loader_fn)
        data["is_cross_encoder"] = self.is_cross_encoder
        return data

    @field_validator("languages")
    @classmethod
    def _languages_are_valid(
        cls, languages: list[ISOLanguageScript] | None
    ) -> list[ISOLanguageScript] | None:
        """Validate each language code; None passes through unchanged."""
        if languages is not None:
            for language_code in languages:
                check_language_code(language_code)
        return languages

    @field_validator("name")
    @classmethod
    def _check_name(cls, v: str | None) -> str | None:
        """Require names to carry an organization prefix ('org/model'); None is allowed."""
        if v is not None and "/" not in v:
            raise ValueError(
                "Model name must be in the format 'organization/model_name'"
            )
        return v

    def __hash__(self) -> int:
        """Make ModelMeta hashable based on name, revision, experiment_kwargs and embed_dim.

        This allows ModelMeta instances to be used as dictionary keys.
        Two ModelMeta instances with the same name, revision, experiment_kwargs and embed_dim will have the same hash.
        """
        # Serialize experiment_kwargs to a deterministic, hashable representation.
        experiment_repr = None
        if self.experiment_kwargs:
            experiment_repr = _serialize_experiment_kwargs_to_name(
                self.experiment_kwargs
            )

        # Lists of Matryoshka dims are unhashable; freeze them as a tuple.
        dim = self.embed_dim
        if isinstance(dim, Sequence):
            dim = tuple(dim)

        return hash((self.name, self.revision, experiment_repr, dim))

    def __eq__(self, other: object) -> bool:
        """Check equality by comparing the full ``model_dump`` of both instances.

        NOTE(review): unlike ``__hash__`` (which uses only name, revision,
        experiment_kwargs and embed_dim), equality compares every field. Two
        instances with equal hashes may therefore still compare unequal; the
        hash/eq contract holds, since fully-equal instances share hash inputs.
        """
        if not isinstance(other, ModelMeta):
            return NotImplemented
        self_dict = self.model_dump()
        other_dict = other.model_dump()
        return self_dict == other_dict

    def load_model(
        self,
        device: str | None = None,
        *,
        embed_dim: int | None = None,
        **kwargs: Any,
    ) -> MTEBModels:
        """Loads the model using the specified loader function.

        Args:
            device: Device identifier forwarded to the loader as ``device``.
            embed_dim: Requested embedding dimension. Must equal the model's fixed
                dimension, or be one of its supported (Matryoshka) dimensions when
                ``self.embed_dim`` is a list.
            **kwargs: Extra keyword arguments forwarded to the loader; they are also
                merged into ``experiment_kwargs`` on the returned model's metadata.

        Returns:
            The loaded model, with ``mteb_model_meta`` set to an updated copy of this metadata.

        Raises:
            NotImplementedError: If no loader is available for this model.
            ValueError: If ``name`` is unset, or ``embed_dim`` is not supported.
        """
        # create a copy so that changing the model meta on the model does not influence the original meta
        _self = self.model_copy(deep=True)

        if _self.loader is None:
            raise NotImplementedError(
                "No model implementation is available for this model."
            )
        if _self.name is None:
            raise ValueError("name is not set for ModelMeta. Cannot load model.")

        loader = _self.loader
        name = _self.name
        revision = _self.revision
        updates: dict[str, Any] = {}
        base_exp_kwargs = (
            dict(_self.experiment_kwargs) if _self.experiment_kwargs else {}
        )

        # Validate the requested embedding dimension against the metadata before loading.
        if embed_dim is not None:
            if (
                _self.embed_dim is not None
                and isinstance(_self.embed_dim, int)
                and _self.embed_dim != embed_dim
            ):
                raise ValueError(
                    f"Requested embedding dimension {embed_dim} does not match the model's embedding dimension {_self.embed_dim}. "
                    "Model does not support loading with a different embedding dimension. "
                    "You can change supported embedding dimensions in `meta.embed_dim`."
                )
            elif isinstance(_self.embed_dim, list) and embed_dim not in _self.embed_dim:
                raise ValueError(
                    f"Requested embedding dimension {embed_dim} is not in the model's supported embedding dimensions {_self.embed_dim}."
                )
            updates["embed_dim"] = embed_dim
            kwargs["embed_dim"] = embed_dim

        # Call-time kwargs take precedence over metadata-level experiment_kwargs.
        merged_exp_kwargs = {**base_exp_kwargs, **kwargs} if kwargs else base_exp_kwargs
        updates["experiment_kwargs"] = merged_exp_kwargs or None

        # Allow overwrites
        _kwargs = _self.loader_kwargs.copy()
        _kwargs.update(merged_exp_kwargs)
        if device is not None:
            _kwargs["device"] = device

        # Record the effective kwargs on the metadata copy attached to the model.
        updates["loader_kwargs"] = _kwargs
        _self = _self.model_copy(update=updates)
        model: MTEBModels = loader(
            name,
            revision=revision,
            **_kwargs,
        )
        model.mteb_model_meta = _self  # type: ignore[misc]
        return model

    def model_name_as_path(self) -> str:
        """Returns the model name in a format that can be used as a file path.

        Replaces "/" with "__" and spaces with "_".
        """
        if self.name is None:
            raise ValueError("Model name is not set")
        sanitized = self.name.replace("/", "__")
        return sanitized.replace(" ", "_")

    @property
    def experiment_name(self) -> str | None:
        """Create a filesystem-safe string representation of the experiment parameters.

        Uses deterministic serialization and hashing to ensure stable, bounded output.

        Examples:
            >>> import mteb
            >>> model = mteb.get_model("mteb/baseline-random-encoder", param1="test")
            >>>
            >>> print(model.mteb_model_meta.experiment_name)
            >>> # param1_test
        """
        return _serialize_experiment_kwargs_to_name(self.experiment_kwargs)

    @property
    def model_name_with_experiment(self) -> str | None:
        """Combines the model name with the experiment parameters for a more descriptive name."""
        if self.name is None:
            return None
        suffix = _serialize_experiment_kwargs_to_name(
            experiment_kwargs=self.experiment_kwargs,
            value_field_separator="=",
            kwargs_separator=", ",
        )
        if not suffix:
            return self.name
        return f"{self.name} ({suffix})"

    @classmethod
    def _detect_cross_encoder_or_dense(
        cls,
        model_name: str,
        revision: str | None,
        config: dict[str, Any] | None,
        encoder_loader: Callable[..., MTEBModels],
        cross_encoder_loader: Callable[..., MTEBModels],
    ) -> tuple[Callable[..., MTEBModels] | None, MODEL_TYPES]:
        """Pick the cross-encoder loader when the config indicates one; otherwise dense."""
        if not config:
            logger.warning(
                f"Could not load config.json for {model_name}. "
                "Defaulting to SentenceTransformer loader."
            )
            return encoder_loader, "dense"

        architectures = config.get("architectures", [])

        # A sequence-classification head is the canonical cross-encoder signature;
        # otherwise fall back to the CausalLM-reranker heuristic.
        has_seq_cls_head = any(
            arch.endswith("ForSequenceClassification") for arch in architectures
        )
        if has_seq_cls_head or cls._is_causal_lm_reranker(
            architectures, config, model_name
        ):
            return cross_encoder_loader, "cross-encoder"

        logger.info(
            f"Model {model_name} does not have modules.json or recognized architecture. "
            "Defaulting to SentenceTransformer loader."
        )
        return encoder_loader, "dense"

    @staticmethod
    def _is_causal_lm_reranker(
        architectures: list[str], config: dict[str, Any], model_name: str
    ) -> bool:
        """Heuristic: a CausalLM architecture plus reranking hints marks a reranker.

        Hints are ``num_labels > 0`` in the config, or "rerank"/"cross-encoder"
        appearing in the model name.
        """
        if not any(arch.endswith("ForCausalLM") for arch in architectures):
            return False

        if config.get("num_labels", 0) > 0:
            return True

        lowered_name = model_name.lower()
        return "rerank" in lowered_name or "cross-encoder" in lowered_name

    @classmethod
    def _detect_model_type_and_loader(
        cls,
        model_name: str,
        revision: str | None = None,
        config: dict[str, Any] | None = None,
    ) -> tuple[Callable[..., MTEBModels] | None, MODEL_TYPES]:
        """Detect the model type and appropriate loader based on HuggingFace Hub configuration files.

        This follows the Sentence Transformers architecture detection logic:
        1. Check for modules.json - If present, model is a SentenceTransformer (dense encoder)
        2. If no modules.json, check config.json for architecture:
            - ForSequenceClassification → CrossEncoder
            - CausalLM with reranking indicators → CrossEncoder
        3. Default to dense (SentenceTransformer) if no clear indicators are found

        Detection for CausalLM-style rerankers:
        - Model has ForCausalLM architecture AND
        - Has num_labels > 0 in config, OR
        - Model name contains "rerank" or "cross-encoder"

        Args:
            model_name: The HuggingFace model name
            revision: The model revision
            config: The loaded config.json from the HuggingFace model repository. If not provided, it will be fetched from the hub.

        Returns:
            A tuple of (loader_function, model_type) where:
            - loader_function: A callable that returns MTEBModels, or None if model doesn't exist
            - model_type: One of "dense", "cross-encoder", or "late-interaction"
        """
        from mteb.models import (
            CrossEncoderWrapper,
            SentenceTransformerEncoderWrapper,
        )

        try:
            modules_config = _get_json_from_hub(
                model_name, "modules.json", "model", revision=revision
            )

            # A modules.json marks a SentenceTransformer/SparseEncoder repo
            # (SparseEncoder is not supported for now) -> treat as dense.
            if modules_config:
                return SentenceTransformerEncoderWrapper, "dense"
            return cls._detect_cross_encoder_or_dense(
                model_name,
                revision,
                config,
                SentenceTransformerEncoderWrapper,
                cross_encoder_loader=CrossEncoderWrapper,
            )
        except Exception as e:
            # Hub lookups are best-effort; any failure falls back to the default.
            logger.warning(
                f"Error detecting model type for {model_name}: {e}. "
                "Defaulting to SentenceTransformer loader."
            )

        return SentenceTransformerEncoderWrapper, "dense"

    @classmethod
    def create_empty(cls, overwrites: dict[str, Any] | None = None) -> Self:
        """Creates an empty ModelMeta with all fields set to None or empty.

        Any ``overwrites`` are applied on top; ``name``/``revision`` that remain
        None afterwards are replaced by explicit placeholder values.
        """
        blank = cls(
            loader=None,
            name=None,
            revision=None,
            release_date=None,
            languages=None,
            n_parameters=None,
            n_embedding_parameters=None,
            memory_usage_mb=None,
            max_tokens=None,
            embed_dim=None,
            license=None,
            open_weights=None,
            public_training_code=None,
            public_training_data=None,
            framework=[],
            reference=None,
            similarity_fn_name=None,
            use_instructions=None,
            training_datasets=None,
            adapted_from=None,
            superseded_by=None,
            citation=None,
            contacts=None,
        )
        if overwrites:
            blank = blank.model_copy(update=overwrites)

        fallback: dict[str, Any] = {}
        if blank.name is None:
            fallback["name"] = "no_model_name/available"
        if blank.revision is None:
            fallback["revision"] = "no_revision_available"

        return blank.model_copy(update=fallback) if fallback else blank

    def merge(self, overwrite: Self) -> Self:
        """Merges this ModelMeta with another ModelMeta, returning a new instance.

        Args:
            overwrite: The ModelMeta to merge into this one. Non-None fields of
                `overwrite` replace the corresponding fields of this ModelMeta.
                The `framework` and `model_type` list fields are combined
                (union, deduplicated) instead of replaced. Placeholder values
                for `name` ("no_model_name/available") and `revision`
                ("no_revision_available") never replace a real value.

        Returns:
            A new ModelMeta with the merged fields.
        """
        merged_data = self.model_dump()
        overwrite_data = overwrite.model_dump()

        for key, value in overwrite_data.items():
            if (
                key == "name"
                and value == "no_model_name/available"
                and self.name != "no_model_name/available"
            ):
                continue  # skip overwriting name if overwrite has no name available
            if (
                key == "revision"
                and value == "no_revision_available"
                and self.revision != "no_revision_available"
            ):
                continue  # skip overwriting revision if overwrite has no revision available
            if key in ("framework", "model_type"):
                # Combine lists and remove duplicates.
                merged_list = set(merged_data.get(key, [])) | set(value or [])
                merged_data[key] = list(merged_list)
                # BUGFIX: previously this fell through to the generic overwrite
                # below, which clobbered the combined list with `value`.
                continue
            if value is not None:
                merged_data[key] = value

        return self.model_copy(update=merged_data)

    @classmethod
    def _from_sentence_transformer_model(cls, model: SentenceTransformer) -> Self:
        """Generates a ModelMeta from only a SentenceTransformer model, without fetching any additional metadata from HuggingFace Hub."""
        from mteb.models import SentenceTransformerEncoderWrapper

        card_data = model.model_card_data
        # Prefer the explicit model name; fall back to the base model name.
        model_name: str | None = card_data.model_name or card_data.base_model

        similarity = (
            ScoringFunction.from_str(model.similarity_fn_name)
            if model.similarity_fn_name
            else None
        )
        embedding_params = cls._get_n_embedding_parameters_from_sentence_transformers(
            model
        )
        return cls.create_empty(
            overwrites=dict(
                name=model_name,
                revision=card_data.base_model_revision,
                loader=SentenceTransformerEncoderWrapper,
                max_tokens=model.max_seq_length,
                embed_dim=model.get_sentence_embedding_dimension(),
                similarity_fn_name=similarity,
                framework=["Sentence Transformers", "PyTorch"],
                n_embedding_parameters=embedding_params,
            )
        )

    @staticmethod
    def _get_n_embedding_parameters_from_sentence_transformers(
        model: SentenceTransformer | CrossEncoder,
    ) -> int | None:
        """Calculates the number of embedding parameters in a SentenceTransformer or CrossEncoder model.

        Based on the heuristic `vocab_size * embedding_dim`, where both values are
        extracted from the model's underlying Transformer module. Returns None when
        either quantity cannot be determined.
        """
        logger.info(
            "Calculating number of embedding parameters for SentenceTransformer model."
        )

        if isinstance(model, CrossEncoder) and hasattr(
            model.model, "get_input_embeddings"
        ):
            # Embedding matrix shape is (vocab_size, hidden_dim); its product is
            # the parameter count. (Removed a dead `emb = None` pre-assignment.)
            emb = model.model.get_input_embeddings()
            return int(np.prod(emb.weight.shape))
        elif isinstance(model, SentenceTransformer):
            vocab = None
            try:
                vocab = len(model.tokenizer.vocab)
            except Exception as e:
                msg = f"Could not determine vocab size for model {model.model_card_data.model_name} and therefore cannot calculate number of embedding parameters. \nError: \n{e}"
                logger.warning(msg)
            embedding_dimensions = model.get_sentence_embedding_dimension()
            if embedding_dimensions is not None and vocab is not None:
                return vocab * embedding_dimensions

        logger.warning(
            f"Model does not have a recognized architecture for calculating embedding parameters (model={model.model_card_data.model_name})."
        )
        return None

    @classmethod
    def _from_cross_encoder_model(cls, model: CrossEncoder) -> Self:
        """Generates a ModelMeta from only a CrossEncoder model, without fetching any additional metadata from HuggingFace Hub."""
        from mteb.models import CrossEncoderWrapper

        embedding_params = cls._get_n_embedding_parameters_from_sentence_transformers(
            model
        )
        overwrites = dict(
            loader=CrossEncoderWrapper,
            name=model.model.name_or_path,
            revision=model.config._commit_hash,
            framework=["Sentence Transformers", "PyTorch"],
            model_type=["cross-encoder"],
            n_embedding_parameters=embedding_params,
        )
        return cls.create_empty(overwrites=overwrites)

    @classmethod
    def _from_hub(  # noqa: PLR0914
        cls,
        model_name: str,
        revision: str | None = None,
    ) -> Self:
        """Generates a ModelMeta from a HuggingFace model name.

        Args:
            model_name: The HuggingFace model name.
            revision: Revision of the model. If None, the latest commit on the hub is used.

        Returns:
            The generated ModelMeta. If the repository does not exist, a mostly-empty
            ModelMeta carrying only the name and revision is returned.
        """
        loader: Callable[..., MTEBModels] | None
        model_type: MODEL_TYPES

        reference = "https://huggingface.co/" + model_name

        # Without a repo there is nothing to inspect; return a placeholder meta.
        if not _repo_exists(model_name):
            warnings.warn(
                f"Could not find model {model_name} on HuggingFace Hub repository ({reference}). Metadata will be limited."
            )
            return cls.create_empty(
                overwrites=dict(
                    name=model_name,
                    revision=revision,
                )
            )
        config = _get_json_from_hub(
            model_name, "config.json", "model", revision=revision
        )
        loader, model_type = cls._detect_model_type_and_loader(
            model_name, revision, config=config
        )
        card = ModelCard.load(model_name)
        card_data = card.data
        card_data = cast("ModelCardData", card_data)
        try:
            model_config = AutoConfig.from_pretrained(model_name)
        except Exception as e:
            # some models can't load AutoConfig (e.g. `average_word_embeddings_levy_dependency`)
            model_config = None
            logger.warning(
                f"Can't get model configuration for {model_name}. Error: {e}"
            )

        frameworks = cls._get_frameworks_from_hf_tags(model_name) if model_name else []

        # Pin the revision to the latest commit so the metadata is reproducible.
        if revision is None:
            revisions = _get_repo_commits(model_name, "model")
            revision = revisions[0].commit_id if revisions else None

        # "other" is the hub's catch-all license tag; treat it as unknown.
        model_license = card_data.license if card_data.license != "other" else None
        n_parameters = cls._calculate_num_parameters_from_hub(model_name)
        n_embedding_parameters = cls._estimate_embedding_parameters_from_hub(
            model_name, revision=revision, config=config
        )
        memory_usage_mb = cls._calculate_memory_usage_mb(
            model_name, n_parameters, fetch_from_hf=True
        )

        embedding_dim = getattr(model_config, "hidden_size", None)
        max_tokens = getattr(model_config, "max_position_embeddings", None)

        # sentence_bert_config.json may carry a tighter max_seq_length than the config.
        sbert_config = _get_json_from_hub(
            model_name, "sentence_bert_config.json", "model", revision=revision
        )
        if sbert_config:
            if max_tokens is None:
                max_tokens = sbert_config.get("max_seq_length", None)
        # have model type, similarity function fields
        config_sbert = _get_json_from_hub(
            model_name, "config_sentence_transformers.json", "model", revision=revision
        )
        similarity_fn_name = (
            ScoringFunction.from_str(config_sbert["similarity_fn_name"])
            if config_sbert is not None
            and config_sbert.get("similarity_fn_name") is not None
            else ScoringFunction.COSINE
        )

        return cls.create_empty(
            overwrites=dict(
                loader=loader,
                name=model_name,
                model_type=[model_type],
                revision=revision,
                reference=reference,
                release_date=cls.fetch_release_date(model_name),
                license=model_license,
                framework=frameworks,
                n_parameters=n_parameters,
                n_embedding_parameters=n_embedding_parameters,
                memory_usage_mb=memory_usage_mb,
                max_tokens=max_tokens,
                embed_dim=embedding_dim,
                similarity_fn_name=similarity_fn_name,
            )
        )

    @classmethod
    def from_sentence_transformer_model(
        cls,
        model: SentenceTransformer,
        revision: str | None = None,
        fill_missing: bool | None = None,
        compute_metadata: bool | None = None,
        fetch_from_hf: bool = False,
    ) -> Self:
        """Generates a ModelMeta from a SentenceTransformer model.

        Args:
            model: SentenceTransformer model.
            revision: Revision of the model
            fill_missing: Deprecated. Use fetch_from_hf instead.
            compute_metadata: Deprecated. Use fetch_from_hf instead.
            fetch_from_hf: Whether to fetch additional metadata from HuggingFace Hub based on the model name. If False, only metadata that can be
                extracted from the SentenceTransformer model will be used.

        Returns:
            The generated ModelMeta.
        """
        if compute_metadata is not None:
            warnings.warn(
                "The compute_metadata parameter is deprecated and will be removed in a future version. "
                f"Use fetch_from_hf instead. Setting `fetch_from_hf={compute_metadata}`.",
                DeprecationWarning,
                stacklevel=2,
            )
            fetch_from_hf = compute_metadata

        # BUGFIX: fill_missing previously defaulted to False, so the
        # `is not None` check was always true — the deprecation warning fired
        # on every call and fetch_from_hf/compute_metadata were silently reset
        # to False. Defaulting to None makes the deprecation path opt-in.
        if fill_missing is not None:
            warnings.warn(
                "The fill_missing parameter is deprecated and will be removed in a future version. "
                f"Use fetch_from_hf instead. Setting `fetch_from_hf={fill_missing}`.",
                DeprecationWarning,
                stacklevel=2,
            )
            fetch_from_hf = fill_missing

        meta = cls._from_sentence_transformer_model(model)
        if fetch_from_hf:
            if meta.name is None:
                logger.warning(
                    "Model name is not set in metadata extracted from SentenceTransformer model. Cannot fetch additional metadata from HuggingFace Hub."
                )
            else:
                name = cast("str", meta.name)
                meta_hub = cls._from_hub(name, revision)
                # prioritize metadata from the model card but fill missing fields from the hub
                meta = meta_hub.merge(meta)

        return meta

    @classmethod
    def from_hub(
        cls,
        model: str,
        revision: str | None = None,
        fill_missing: bool | None = None,
        compute_metadata: bool | None = None,
    ) -> Self:
        """Generates a ModelMeta for model from HuggingFace hub.

        Args:
            model: Name of the model from HuggingFace hub. For example, `intfloat/multilingual-e5-large`
            revision: Revision of the model
            fill_missing: Deprecated. The fill missing did not add any functionality for this function, but was added for compatibility with
                'from_sentence_transformer_model' and `from_cross_encoder`. It will be removed in a future version.
            compute_metadata: Deprecated. Was superseded by fill_missing.

        Returns:
            The generated ModelMeta.
        """
        # Both legacy parameters are accepted but ignored; warn if either is set.
        deprecated_params = (
            ("compute_metadata", compute_metadata),
            ("fill_missing", fill_missing),
        )
        for param_name, param_value in deprecated_params:
            if param_value is not None:
                warnings.warn(
                    f"The {param_name} parameter is deprecated and will be removed in a future version. It will be ignored.",
                    DeprecationWarning,
                    stacklevel=2,
                )

        return cls._from_hub(model, revision)

    @classmethod
    def from_cross_encoder(
        cls,
        model: CrossEncoder,
        revision: str | None = None,
        fill_missing: bool | None = None,
        compute_metadata: bool | None = None,
        fetch_from_hf: bool = False,
    ) -> Self:
        """Generates a ModelMeta from a CrossEncoder.

        Args:
            model: The CrossEncoder model
            revision: Revision of the model
            fill_missing: Deprecated. Use fetch_from_hf instead.
            compute_metadata: Deprecated. Use fetch_from_hf instead.
            fetch_from_hf: Whether to fetch additional metadata from HuggingFace Hub based on the model name. If False, only metadata that can be
                extracted from the CrossEncoder model will be used.

        Returns:
            The generated ModelMeta
        """
        if compute_metadata is not None:
            warnings.warn(
                "The compute_metadata parameter is deprecated and will be removed in a future version. "
                f"Use fetch_from_hf instead. Setting `fetch_from_hf={compute_metadata}`.",
                DeprecationWarning,
                stacklevel=2,
            )
            fetch_from_hf = compute_metadata
        # Bug fix: this message previously said "Use fill_missing instead.
        # Setting `fill_missing=...`", telling users to use the very parameter
        # being deprecated. It now points at fetch_from_hf, matching
        # from_sentence_transformer_model.
        if fill_missing is not None:
            warnings.warn(
                "The fill_missing parameter is deprecated and will be removed in a future version. "
                f"Use fetch_from_hf instead. Setting `fetch_from_hf={fill_missing}`.",
                DeprecationWarning,
                stacklevel=2,
            )
            fetch_from_hf = fill_missing

        meta = cls._from_cross_encoder_model(model)
        if fetch_from_hf:
            # Guard added for consistency with from_sentence_transformer_model:
            # without a name we cannot query the Hub for extra metadata.
            if meta.name is None:
                logger.warning(
                    "Model name is not set in metadata extracted from CrossEncoder model. Cannot fetch additional metadata from HuggingFace Hub."
                )
            else:
                name = cast("str", meta.name)
                meta_hub = cls._from_hub(name, revision)
                # prioritize metadata from the model card but fill missing fields from the hub
                meta = meta_hub.merge(meta)

        return meta

    def is_zero_shot_on(self, tasks: Sequence[AbsTask] | Sequence[str]) -> bool | None:
        """Indicates whether the given model can be considered zero-shot or not on the given tasks.

        Returns:
             None if no training data is specified on the model.
        """
        # An empty task selection is trivially zero-shot.
        if not tasks:
            return True

        training_datasets = self.get_training_datasets()
        # Without declared training data we cannot decide either way.
        if training_datasets is None:
            return None

        # Accept either dataset names or task objects.
        if isinstance(tasks[0], str):
            benchmark_datasets = set(tasks)
        else:
            typed_tasks = cast("Sequence[AbsTask]", tasks)
            benchmark_datasets = {task.metadata.name for task in typed_tasks}

        # Zero-shot iff none of the benchmark datasets appear in training data.
        return training_datasets.isdisjoint(benchmark_datasets)

    def get_training_datasets(self) -> set[str] | None:
        """Returns all training datasets of the model including similar tasks."""
        import mteb

        if self.training_datasets is None:
            return None

        datasets = set(self.training_datasets)

        # Inherit training data from the model this one was adapted from, if any.
        if self.adapted_from is not None:
            try:
                source_meta = mteb.get_model_meta(
                    self.adapted_from, fetch_from_hf=False
                )
                source_datasets = source_meta.get_training_datasets()
                if source_datasets is not None:
                    datasets |= source_datasets
            except (ValueError, KeyError) as e:
                msg = f"Could not get source model: {e} in MTEB"
                logger.warning(msg)
                warnings.warn(msg)

        # Expand with similar tasks, sharing one visited-set across lookups so
        # each dataset's similarity graph is traversed at most once.
        expanded = set(datasets)
        seen: set[str] = set()
        for dataset_name in datasets:
            expanded |= _collect_similar_tasks(dataset_name, seen)

        return expanded

    def zero_shot_percentage(
        self, tasks: Sequence[AbsTask] | Sequence[str]
    ) -> int | None:
        """Indicates how out-of-domain the selected tasks are for the given model.

        Args:
            tasks: A sequence of tasks or dataset names to evaluate against.

        Returns:
            An integer percentage (0-100) indicating how out-of-domain the tasks are for the model.
            Returns None if no training data is specified on the model or if no tasks are provided.
        """
        training = self.get_training_datasets()
        if training is None or not tasks:
            return None

        # Accept either dataset names or task objects.
        if isinstance(tasks[0], str):
            selected = set(tasks)
        else:
            selected = {
                task.metadata.name for task in cast("Sequence[AbsTask]", tasks)
            }

        # Share of selected datasets seen during training, as a percentage.
        overlap_pct = 100 * (len(training & selected) / len(selected))
        return int(100 - overlap_pct)

    @staticmethod
    def _calculate_num_parameters_from_hub(model_name: str | None = None) -> int | None:
        if not model_name:
            return None
        try:
            safetensors_metadata = get_safetensors_metadata(model_name)
            if len(safetensors_metadata.parameter_count) >= 0:
                return sum(safetensors_metadata.parameter_count.values())
        except (
            NotASafetensorsRepoError,
            SafetensorsParsingError,
            GatedRepoError,
            RepositoryNotFoundError,
        ) as e:
            logger.warning(
                f"Can't calculate number of parameters for {model_name}. Got error {e}"
            )
        return None

    def calculate_num_parameters_from_hub(self) -> int | None:
        """Calculates the number of parameters in the model.

        Returns:
            Number of parameters in the model.
        """
        # Delegate to the static Hub lookup using this model's name.
        model_name = self.name
        return self._calculate_num_parameters_from_hub(model_name)

    @staticmethod
    def _estimate_embedding_parameters_from_hub(
        model_name: str | None = None,
        revision: str | None = None,
        config: dict[str, Any] | None = None,
    ) -> int | None:
        """Calculate the number of embedding parameters from the model config (vocab_size * hidden_size).  Note that this is an heuristic that works for many models, but might be incorrect.

        Returns:
            Number of embedding parameters in the model.
        """
        if not model_name:
            return None

        if not config:
            logger.warning(
                f"Could not calculate embedding parameters for {model_name} as config.json could not be loaded"
            )
            return None

        vocab_size = config.get("vocab_size")
        if vocab_size is None and "text_config" in config:
            vocab_size = config["text_config"].get("vocab_size")

        if vocab_size is None:
            logger.warning(
                f"Could not calculate embedding parameters for {model_name} as vocab_size is missing from config"
            )
            return None

        hidden_size = config.get("hidden_size") or config.get("hidden_dim")
        if hidden_size is None and "text_config" in config:
            hidden_size = config["text_config"].get("hidden_size") or config[
                "text_config"
            ].get("hidden_dim")

        if hidden_size is None:
            logger.warning(
                f"Could not calculate embedding parameters for {model_name} as hidden_size/hidden_dim is missing from config"
            )
            return None
        return vocab_size * hidden_size

    @staticmethod
    def _calculate_memory_usage_mb(
        model_name: str,
        n_parameters: int | None,
        *,
        fetch_from_hf: bool = False,
    ) -> int | None:
        MB = 1024**2  # noqa: N806

        if fetch_from_hf:
            try:
                safetensors_metadata = get_safetensors_metadata(model_name)
                if safetensors_metadata.parameter_count:
                    dtype_size_map = {
                        "F64": 8,  # 64-bit float
                        "F32": 4,  # 32-bit float (FP32)
                        "F16": 2,  # 16-bit float (FP16)
                        "BF16": 2,  # BFloat16
                        "I64": 8,  # 64-bit integer
                        "I32": 4,  # 32-bit integer
                        "I16": 2,  # 16-bit integer
                        "I8": 1,  # 8-bit integer
                        "U8": 1,  # Unsigned 8-bit integer
                        "BOOL": 1,  # Boolean (assuming 1 byte per value)
                    }
                    total_memory_bytes = sum(
                        parameters * dtype_size_map.get(dtype, 4)
                        for dtype, parameters in safetensors_metadata.parameter_count.items()
                    )
                    return round(total_memory_bytes / MB)  # Convert to MB
            except (
                NotASafetensorsRepoError,
                SafetensorsParsingError,
                GatedRepoError,
                RepositoryNotFoundError,
            ) as e:
                logger.warning(
                    f"Can't calculate memory usage for {model_name}. Got error {e}"
                )

        if n_parameters is None:
            return None
        # Model memory in bytes. For FP32 each parameter is 4 bytes.
        model_memory_bytes = n_parameters * 4

        # Convert to MB
        model_memory_mb = model_memory_bytes / MB
        return round(model_memory_mb)

    def calculate_memory_usage_mb(self, fetch_from_hf: bool = False) -> int | None:
        """Calculates the memory usage of the model in MB.

        Args:
            fetch_from_hf: If True, fetch safetensors metadata from HuggingFace Hub
                to get precise dtype-aware memory usage. If False (default), estimate
                from n_parameters assuming FP32 (4 bytes per parameter).

        Returns:
            The memory usage of the model in MB, or None if it cannot be determined.
        """
        if "API" in self.framework or self.name is None:
            return None

        return self._calculate_memory_usage_mb(
            self.name, self.n_parameters, fetch_from_hf=fetch_from_hf
        )

    @staticmethod
    def fetch_release_date(model_name: str) -> StrDate | None:
        """Fetches the release date from HuggingFace Hub based on the first commit.

        Returns:
            The release date in YYYY-MM-DD format, or None if it cannot be determined.
        """
        commits = _get_repo_commits(repo_id=model_name, repo_type="model")
        if not commits:
            return None
        # The commit history is newest-first, so the initial commit is last.
        first_commit = commits[-1]
        return first_commit.created_at.strftime("%Y-%m-%d")

    @staticmethod
    def _get_frameworks_from_hf_tags(model_name: str) -> list[FRAMEWORKS]:
        """Extract frameworks supported by the model from HuggingFace model tags.

        HuggingFace derives tags like ``pytorch``, ``tf``, ``jax``, ``onnx``,
        ``safetensors``, ``gguf``, and ``openvino`` from the files present in a
        repository. This method maps those tags to MTEB framework names.

        Args:
            model_name: HuggingFace model name

        Returns:
            List of framework names found in tags. Defaults to empty list if no frameworks found.
        """
        # Best-effort: any Hub/network failure degrades to "no frameworks".
        try:
            info = model_info(model_name)
            if not info.tags:
                return []
        except Exception as e:
            logger.warning(
                f"Failed to fetch frameworks from HuggingFace tags for {model_name}: {e}"
            )
            return []

        # Mapping from HuggingFace tags to MTEB framework names.
        # Order determines the order of the returned list.
        tag_to_framework: dict[str, FRAMEWORKS] = {
            "sentence-transformers": "Sentence Transformers",
            "pytorch": "PyTorch",
            "tf": "TensorFlow",
            "jax": "JAX",
            "transformers": "Transformers",
            "onnx": "ONNX",
            "safetensors": "safetensors",
            "gguf": "GGUF",
            "openvino": "OpenVINO",
        }

        present_tags = set(info.tags)
        return [
            framework_name
            for tag, framework_name in tag_to_framework.items()
            if tag in present_tags
        ]

    def to_python(self) -> str:
        """Returns a string representation of the model."""
        # experiment_kwargs is runtime-only configuration and is not emitted.
        excluded_fields = ["experiment_kwargs"]
        return _pydantic_instance_to_code(self, exclude_fields=excluded_fields)

    def push_eval_results(
        self,
        user: str | None = None,
        *,
        tasks: Sequence[AbsTask] | Sequence[str] | None = None,
        cache: ResultCache | None = None,
        create_pr: bool = False,
    ) -> None:
        """Pushes the evaluation results of the model to the HuggingFace Hub.

        Args:
            user: The user or organization of results source.
            tasks: The tasks to push results for. If None, results for all tasks will be pushed.
            cache: The ResultCache containing the evaluation results to push.
            create_pr: Whether to create a pull request for the model card update if the model card already exists on the HuggingFace Hub. If False, the model card will be updated directly without a pull request.
        """
        from mteb.cache import ResultCache

        result_cache = ResultCache() if cache is None else cache

        # Results are loaded for exactly one model (self), so the first entry
        # is this model's results.
        loaded = result_cache.load_results(
            models=[self],
            tasks=tasks,
        )
        loaded.model_results[0].push_model_results(
            user=user,
            create_pr=create_pr,
        )

experiment_name property

Create a filesystem-safe string representation of the experiment parameters.

Uses deterministic serialization and hashing to ensure stable, bounded output.

Examples:

>>> import mteb
>>> model = mteb.get_model("mteb/baseline-random-encoder", param1="test")
>>>
>>> print(model.mteb_model_meta.experiment_name)
>>> # param1_test

is_cross_encoder property

Returns True if the model is a cross-encoder.

Derived from model_type field. A model is considered a cross-encoder if "cross-encoder" is in its model_type list.

model_name_with_experiment property

Combines the model name with the experiment parameters for a more descriptive name.

n_active_parameters property

Number of active parameters. Assumed to be n_parameters - n_embedding_parameters. Can be overwritten using n_active_parameters_override e.g. for MoE models.

__eq__(other)

Check equality between two ModelMeta instances.

Note: although the summary mentions name, revision, experiment_kwargs and embed_dim, the implementation below compares the full `model_dump()` of both instances, so all serialized fields must match for two instances to be equal.

Source code in mteb/models/model_meta.py
322
323
324
325
326
327
328
329
330
331
def __eq__(self, other: object) -> bool:
    """Check equality based on name, revision, experiment_kwargs and embed_dim.

    Two ModelMeta instances are equal if they have the same name, revision, experiment_kwargs and embed_dim.
    """
    if not isinstance(other, ModelMeta):
        return NotImplemented
    self_dict = self.model_dump()
    other_dict = other.model_dump()
    return self_dict == other_dict

__hash__()

Make ModelMeta hashable based on name, revision, experiment_kwargs and embed_dim.

This allows ModelMeta instances to be used as dictionary keys. Two ModelMeta instances with the same name, revision, experiment_kwargs and embed_dim will have the same hash.

Source code in mteb/models/model_meta.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def __hash__(self) -> int:
    """Make ModelMeta hashable based on name, revision, experiment_kwargs and embed_dim.

    This allows ModelMeta instances to be used as dictionary keys.
    Two ModelMeta instances with the same name, revision, experiment_kwargs and embed_dim will have the same hash.
    """
    # Serialize experiment_kwargs to a deterministic, hashable representation
    exp_kwargs_repr = (
        _serialize_experiment_kwargs_to_name(self.experiment_kwargs)
        if self.experiment_kwargs
        else None
    )
    return hash(
        (
            self.name,
            self.revision,
            exp_kwargs_repr,
            tuple(self.embed_dim)
            if isinstance(self.embed_dim, Sequence)
            else self.embed_dim,
        )
    )

__setattr__(name, value)

Deprecation warning for direct attribute mutation. Use model_copy(update={...}) instead.

Source code in mteb/models/model_meta.py
177
178
179
180
181
182
183
184
185
def __setattr__(self, name: str, value: Any) -> None:
    """Deprecation warning for direct attribute mutation. Use model_copy(update={...}) instead."""
    warnings.warn(
        f"Mutating '{name}' is deprecated and will be removed in future versions. "
        "Use .model_copy(update={...}) instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    super().__setattr__(name, value)

calculate_memory_usage_mb(fetch_from_hf=False)

Calculates the memory usage of the model in MB.

Parameters:

Name Type Description Default
fetch_from_hf bool

If True, fetch safetensors metadata from HuggingFace Hub to get precise dtype-aware memory usage. If False (default), estimate from n_parameters assuming FP32 (4 bytes per parameter).

False

Returns:

Type Description
int | None

The memory usage of the model in MB, or None if it cannot be determined.

Source code in mteb/models/model_meta.py
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
def calculate_memory_usage_mb(self, fetch_from_hf: bool = False) -> int | None:
    """Calculates the memory usage of the model in MB.

    Args:
        fetch_from_hf: If True, fetch safetensors metadata from HuggingFace Hub
            to get precise dtype-aware memory usage. If False (default), estimate
            from n_parameters assuming FP32 (4 bytes per parameter).

    Returns:
        The memory usage of the model in MB, or None if it cannot be determined.
    """
    if "API" in self.framework or self.name is None:
        return None

    return self._calculate_memory_usage_mb(
        self.name, self.n_parameters, fetch_from_hf=fetch_from_hf
    )

calculate_num_parameters_from_hub()

Calculates the number of parameters in the model.

Returns:

Type Description
int | None

Number of parameters in the model.

Source code in mteb/models/model_meta.py
1050
1051
1052
1053
1054
1055
1056
def calculate_num_parameters_from_hub(self) -> int | None:
    """Calculates the number of parameters in the model.

    Returns:
        Number of parameters in the model.
    """
    return self._calculate_num_parameters_from_hub(self.name)

create_empty(overwrites=None) classmethod

Creates an empty ModelMeta with all fields set to None or empty.

Source code in mteb/models/model_meta.py
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
@classmethod
def create_empty(cls, overwrites: dict[str, Any] | None = None) -> Self:
    """Creates an empty ModelMeta with all fields set to None or empty."""
    empty_model = cls(
        loader=None,
        name=None,
        revision=None,
        release_date=None,
        languages=None,
        n_parameters=None,
        n_embedding_parameters=None,
        memory_usage_mb=None,
        max_tokens=None,
        embed_dim=None,
        license=None,
        open_weights=None,
        public_training_code=None,
        public_training_data=None,
        framework=[],
        reference=None,
        similarity_fn_name=None,
        use_instructions=None,
        training_datasets=None,
        adapted_from=None,
        superseded_by=None,
        citation=None,
        contacts=None,
    )
    if overwrites:
        empty_model = empty_model.model_copy(update=overwrites)

    updates: dict[str, Any] = {}
    if empty_model.name is None:
        updates["name"] = "no_model_name/available"
    if empty_model.revision is None:
        updates["revision"] = "no_revision_available"
    if updates:
        empty_model = empty_model.model_copy(update=updates)

    return empty_model

fetch_release_date(model_name) staticmethod

Fetches the release date from HuggingFace Hub based on the first commit.

Returns:

Type Description
StrDate | None

The release date in YYYY-MM-DD format, or None if it cannot be determined.

Source code in mteb/models/model_meta.py
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
@staticmethod
def fetch_release_date(model_name: str) -> StrDate | None:
    """Fetches the release date from HuggingFace Hub based on the first commit.

    Returns:
        The release date in YYYY-MM-DD format, or None if it cannot be determined.
    """
    commits = _get_repo_commits(repo_id=model_name, repo_type="model")
    if commits:
        initial_commit = commits[-1]
        release_date = initial_commit.created_at.strftime("%Y-%m-%d")
        return release_date
    return None

from_cross_encoder(model, revision=None, fill_missing=None, compute_metadata=None, fetch_from_hf=False) classmethod

Generates a ModelMeta from a CrossEncoder.

Parameters:

Name Type Description Default
model CrossEncoder

The CrossEncoder model

required
revision str | None

Revision of the model

None
fill_missing bool | None

Fill missing attributes from the metadata including number of parameters and memory usage.

None
compute_metadata bool | None

Deprecated. Use fill_missing instead.

None
fetch_from_hf bool

Whether to fetch additional metadata from HuggingFace Hub based on the model name. If False, only metadata that can be extracted from the CrossEncoder model will be used.

False

Returns:

Type Description
Self

The generated ModelMeta

Source code in mteb/models/model_meta.py
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
@classmethod
def from_cross_encoder(
    cls,
    model: CrossEncoder,
    revision: str | None = None,
    fill_missing: bool | None = None,
    compute_metadata: bool | None = None,
    fetch_from_hf: bool = False,
) -> Self:
    """Generates a ModelMeta from a CrossEncoder.

    Args:
        model: The CrossEncoder model
        revision: Revision of the model
        fill_missing: Fill missing attributes from the metadata including number of parameters and memory usage.
        compute_metadata: Deprecated. Use fill_missing instead.
        fetch_from_hf: Whether to fetch additional metadata from HuggingFace Hub based on the model name. If False, only metadata that can be
            extracted from the CrossEncoder model will be used.

    Returns:
        The generated ModelMeta
    """
    if compute_metadata is not None:
        warnings.warn(
            "The compute_metadata parameter is deprecated and will be removed in a future version. "
            f"Use fetch_from_hf instead. Setting `fetch_from_hf={compute_metadata}`.",
            DeprecationWarning,
            stacklevel=2,
        )
        fetch_from_hf = compute_metadata
    if fill_missing is not None:
        warnings.warn(
            "The fill_missing parameter is deprecated and will be removed in a future version. "
            f"Use fill_missing instead. Setting `fill_missing={fill_missing}`.",
            DeprecationWarning,
            stacklevel=2,
        )
        fetch_from_hf = fill_missing

    meta = cls._from_cross_encoder_model(model)
    if fetch_from_hf:
        name = cast("str", meta.name)
        meta_hub = cls._from_hub(name, revision)
        # prioritize metadata from the model card but fill missing fields from the hub
        meta = meta_hub.merge(meta)

    return meta

from_hub(model, revision=None, fill_missing=None, compute_metadata=None) classmethod

Generates a ModelMeta for model from HuggingFace hub.

Parameters:

Name Type Description Default
model str

Name of the model from HuggingFace hub. For example, intfloat/multilingual-e5-large

required
revision str | None

Revision of the model

None
fill_missing bool | None

Deprecated. The fill missing did not add any functionality for this function, but was added for compatibility with 'from_sentence_transformer_model' and from_cross_encoder. It will be removed in a future version.

None
compute_metadata bool | None

Deprecated. Was superseded by fill_missing.

None

Returns:

Type Description
Self

The generated ModelMeta.

Source code in mteb/models/model_meta.py
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
@classmethod
def from_hub(
    cls,
    model: str,
    revision: str | None = None,
    fill_missing: bool | None = None,
    compute_metadata: bool | None = None,
) -> Self:
    """Generates a ModelMeta for model from HuggingFace hub.

    Args:
        model: Name of the model from HuggingFace hub. For example, `intfloat/multilingual-e5-large`
        revision: Revision of the model
        fill_missing: Deprecated. The fill missing did not add any functionality for this function, but was added for compatibility with
            'from_sentence_transformer_model' and `from_cross_encoder`. It will be removed in a future version.
        compute_metadata: Deprecated. Was superseded by fill_missing.

    Returns:
        The generated ModelMeta.
    """
    if compute_metadata is not None:
        warnings.warn(
            "The compute_metadata parameter is deprecated and will be removed in a future version. It will be ignored.",
            DeprecationWarning,
            stacklevel=2,
        )

    if fill_missing is not None:
        warnings.warn(
            "The fill_missing parameter is deprecated and will be removed in a future version. It will be ignored.",
            DeprecationWarning,
            stacklevel=2,
        )

    meta = cls._from_hub(
        model,
        revision,
    )

    return meta

from_sentence_transformer_model(model, revision=None, fill_missing=False, compute_metadata=None, fetch_from_hf=False) classmethod

Generates a ModelMeta from a SentenceTransformer model.

Parameters:

Name Type Description Default
model SentenceTransformer

SentenceTransformer model.

required
revision str | None

Revision of the model

None
fill_missing bool

Fill missing attributes from the metadata including number of parameters and memory usage.

False
compute_metadata bool | None

Deprecated. Use fill_missing instead.

None
fetch_from_hf bool

Whether to fetch additional metadata from HuggingFace Hub based on the model name. If False, only metadata that can be extracted from the SentenceTransformer model will be used.

False

Returns:

Type Description
Self

The generated ModelMeta.

Source code in mteb/models/model_meta.py
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
@classmethod
def from_sentence_transformer_model(
    cls,
    model: SentenceTransformer,
    revision: str | None = None,
    fill_missing: bool = False,
    compute_metadata: bool | None = None,
    fetch_from_hf: bool = False,
) -> Self:
    """Generates a ModelMeta from a SentenceTransformer model.

    Args:
        model: SentenceTransformer model.
        revision: Revision of the model
        fill_missing: Fill missing attributes from the metadata including number of parameters and memory usage.
        compute_metadata: Deprecated. Use fill_missing instead.
        fetch_from_hf: Whether to fetch additional metadata from HuggingFace Hub based on the model name. If False, only metadata that can be
            extracted from the SentenceTransformer model will be used.

    Returns:
        The generated ModelMeta.
    """
    if compute_metadata is not None:
        warnings.warn(
            "The compute_metadata parameter is deprecated and will be removed in a future version. "
            f"Use fetch_from_hf instead. Setting `fetch_from_hf={compute_metadata}`.",
            DeprecationWarning,
            stacklevel=2,
        )
        fetch_from_hf = compute_metadata

    if fill_missing is not None:
        warnings.warn(
            "The fill_missing parameter is deprecated and will be removed in a future version. "
            f"Use fetch_from_hf instead. Setting `fetch_from_hf={fill_missing}`.",
            DeprecationWarning,
            stacklevel=2,
        )
        fetch_from_hf = fill_missing

    meta = cls._from_sentence_transformer_model(model)
    if fetch_from_hf:
        if meta.name is None:
            logger.warning(
                "Model name is not set in metadata extracted from SentenceTransformer model. Cannot fetch additional metadata from HuggingFace Hub."
            )
        else:
            name = cast("str", meta.name)
            meta_hub = cls._from_hub(name, revision)
            # prioritize metadata from the model card but fill missing fields from the hub
            meta = meta_hub.merge(meta)

    return meta

get_training_datasets()

Returns all training datasets of the model including similar tasks.

Source code in mteb/models/model_meta.py
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
def get_training_datasets(self) -> set[str] | None:
    """Returns all training datasets of the model, including those of similar tasks.

    If the model was adapted from another model, that model's training datasets
    are merged in as well (best effort: failures are logged, not raised).

    Returns:
        The set of training dataset names, or None if no training data is recorded.
    """
    import mteb

    if self.training_datasets is None:
        return None

    datasets = self.training_datasets.copy()

    # Best-effort merge of the source model's training data when adapted.
    if self.adapted_from is not None:
        try:
            source_meta = mteb.get_model_meta(self.adapted_from, fetch_from_hf=False)
            source_datasets = source_meta.get_training_datasets()
        except (ValueError, KeyError) as e:
            msg = f"Could not get source model: {e} in MTEB"
            logger.warning(msg)
            warnings.warn(msg)
        else:
            if source_datasets is not None:
                datasets |= source_datasets

    # Expand with datasets from similar tasks; `visited` prevents re-walking.
    visited: set[str] = set()
    expanded = datasets.copy()
    for dataset_name in datasets:
        expanded |= _collect_similar_tasks(dataset_name, visited)

    return expanded

is_zero_shot_on(tasks)

Indicates whether the given model can be considered zero-shot or not on the given tasks.

Returns:

Type Description
bool | None

None if no training data is specified on the model.

Source code in mteb/models/model_meta.py
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
def is_zero_shot_on(self, tasks: Sequence[AbsTask] | Sequence[str]) -> bool | None:
    """Indicates whether the given model can be considered zero-shot or not on the given tasks.

    Returns:
        True if none of the given tasks appear in the model's training data, False otherwise.
        None if no training data is specified on the model.
    """
    # An empty task selection is trivially zero-shot.
    if not tasks:
        return True

    training_datasets = self.get_training_datasets()
    # No training data recorded: we cannot decide either way.
    if training_datasets is None:
        return None

    if isinstance(tasks[0], str):
        task_names = set(tasks)
    else:
        typed_tasks = cast("Sequence[AbsTask]", tasks)
        task_names = {task.metadata.name for task in typed_tasks}

    # Zero-shot means no overlap between training data and the selected tasks.
    return training_datasets.isdisjoint(task_names)

load_model(device=None, *, embed_dim=None, **kwargs)

Loads the model using the specified loader function.

Source code in mteb/models/model_meta.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
def load_model(
    self,
    device: str | None = None,
    *,
    embed_dim: int | None = None,
    **kwargs: Any,
) -> MTEBModels:
    """Loads the model using the specified loader function.

    Args:
        device: Device to load the model on; forwarded to the loader as `device`.
        embed_dim: Optional embedding dimension to load the model with. Must match
            `meta.embed_dim` when that is an int, or be contained in it when it is a list.
        **kwargs: Additional keyword arguments forwarded to the loader; also recorded
            as `experiment_kwargs` on the attached metadata copy.

    Returns:
        The loaded model, with `mteb_model_meta` set to an updated copy of this metadata.

    Raises:
        NotImplementedError: If no loader implementation is available for this model.
        ValueError: If the model name is not set, or the requested `embed_dim` is incompatible.
    """
    # Deep-copy the meta so mutations never leak back into the original instance.
    meta_copy = self.model_copy(deep=True)

    if meta_copy.loader is None:
        raise NotImplementedError(
            "No model implementation is available for this model."
        )
    if meta_copy.name is None:
        raise ValueError("name is not set for ModelMeta. Cannot load model.")

    loader_fn = meta_copy.loader
    model_name = meta_copy.name
    model_revision = meta_copy.revision
    updates: dict[str, Any] = {}
    experiment_kwargs = (
        dict(meta_copy.experiment_kwargs) if meta_copy.experiment_kwargs else {}
    )

    if embed_dim is not None:
        # A fixed (int) embed_dim must match exactly; a list enumerates supported dims.
        if isinstance(meta_copy.embed_dim, int) and meta_copy.embed_dim != embed_dim:
            raise ValueError(
                f"Requested embedding dimension {embed_dim} does not match the model's embedding dimension {meta_copy.embed_dim}. "
                "Model does not support loading with a different embedding dimension. "
                "You can change supported embedding dimensions in `meta.embed_dim`."
            )
        if isinstance(meta_copy.embed_dim, list) and embed_dim not in meta_copy.embed_dim:
            raise ValueError(
                f"Requested embedding dimension {embed_dim} is not in the model's supported embedding dimensions {meta_copy.embed_dim}."
            )
        updates["embed_dim"] = embed_dim
        kwargs["embed_dim"] = embed_dim

    if kwargs:
        experiment_kwargs.update(kwargs)
    updates["experiment_kwargs"] = experiment_kwargs or None

    # Loader kwargs: stored defaults, overridden by per-call kwargs, then device.
    call_kwargs = dict(meta_copy.loader_kwargs)
    call_kwargs.update(experiment_kwargs)
    if device is not None:
        call_kwargs["device"] = device
    updates["loader_kwargs"] = call_kwargs

    meta_copy = meta_copy.model_copy(update=updates)
    model: MTEBModels = loader_fn(
        model_name,
        revision=model_revision,
        **call_kwargs,
    )
    model.mteb_model_meta = meta_copy  # type: ignore[misc]
    return model

merge(overwrite)

Merges this ModelMeta with another ModelMeta.

Parameters:

Name Type Description Default
overwrite Self

The ModelMeta to merge into this one. Non-None fields in the overwriting ModelMeta will overwrite the corresponding fields in this ModelMeta. The framework and model_type fields will be combined.

required

Returns:

Type Description
Self

A new ModelMeta with the merged fields.

Source code in mteb/models/model_meta.py
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
def merge(self, overwrite: Self) -> Self:
    """Merge this ModelMeta with another ModelMeta.

    Args:
        overwrite: The ModelMeta to merge into this one. Non-None fields in `overwrite` will replace the corresponding
            fields in this ModelMeta. The `framework` and `model_type` fields are combined (union of both sides).

    Returns:
        A new ModelMeta with the merged fields.
    """
    merged_data = self.model_dump()
    overwrite_data = overwrite.model_dump()

    for key, value in overwrite_data.items():
        if (
            key == "name"
            and value == "no_model_name/available"
            and self.name != "no_model_name/available"
        ):
            continue  # skip overwriting name if overwrite has no name available
        if (
            key == "revision"
            and value == "no_revision_available"
            and self.revision != "no_revision_available"
        ):
            continue  # skip overwriting revision if overwrite has no revision available
        if key in {"framework", "model_type"}:
            # Combine lists and remove duplicates. `or []` guards against None values
            # on either side.
            merged_data[key] = list(set(merged_data.get(key) or []) | set(value or []))
            # Bug fix: without this `continue`, the generic `if value is not None`
            # overwrite below immediately clobbered the combined list.
            continue
        if value is not None:
            merged_data[key] = value

    return self.model_copy(update=merged_data)

model_name_as_path()

Returns the model name in a format that can be used as a file path.

Replaces "/" with "__" and spaces with "_".

Source code in mteb/models/model_meta.py
396
397
398
399
400
401
402
403
def model_name_as_path(self) -> str:
    """Returns the model name in a format usable as a file path.

    "/" becomes "__" and spaces become "_".

    Raises:
        ValueError: If the model name is not set.
    """
    name = self.name
    if name is None:
        raise ValueError("Model name is not set")
    sanitized = name.replace(" ", "_")
    return sanitized.replace("/", "__")

push_eval_results(user=None, *, tasks=None, cache=None, create_pr=False)

Pushes the evaluation results of the model to the HuggingFace Hub.

Parameters:

Name Type Description Default
user str | None

The user or organization of results source.

None
tasks Sequence[AbsTask] | Sequence[str] | None

The tasks to push results for. If None, results for all tasks will be pushed.

None
cache ResultCache | None

The ResultCache containing the evaluation results to push.

None
create_pr bool

Whether to create a pull request for the model card update if the model card already exists on the HuggingFace Hub. If False, the model card will be updated directly without a pull request.

False
Source code in mteb/models/model_meta.py
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
def push_eval_results(
    self,
    user: str | None = None,
    *,
    tasks: Sequence[AbsTask] | Sequence[str] | None = None,
    cache: ResultCache | None = None,
    create_pr: bool = False,
) -> None:
    """Pushes the evaluation results of the model to the HuggingFace Hub.

    Args:
        user: The user or organization that is the source of the results.
        tasks: The tasks to push results for. If None, results for all tasks will be pushed.
        cache: The ResultCache containing the evaluation results to push. If None, a default cache is used.
        create_pr: Whether to create a pull request for the model card update if the model card already
            exists on the HuggingFace Hub. If False, the model card is updated directly without a pull request.
    """
    # Imported lazily to avoid a circular import at module load time.
    from mteb.cache import ResultCache

    result_cache = cache if cache is not None else ResultCache()
    benchmark = result_cache.load_results(
        models=[self],
        tasks=tasks,
    )
    first_model_result = benchmark.model_results[0]
    first_model_result.push_model_results(
        user=user,
        create_pr=create_pr,
    )

to_dict()

Returns a dictionary representation of the model metadata.

Source code in mteb/models/model_meta.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def to_dict(self):
    """Returns a dictionary representation of the model metadata.

    The `loader` field is replaced by its name, set-valued `training_datasets`
    are converted to a list, and a list-valued `embed_dim` collapses to its maximum.
    """
    snapshot = self.model_copy(deep=True)
    data = snapshot.model_dump()
    # A list of supported dims is reported as the largest one.
    if isinstance(snapshot.embed_dim, Sequence):
        data["embed_dim"] = max(snapshot.embed_dim)
    raw_loader = data.pop("loader", None)
    # Sets are not JSON-serializable; normalize to a list.
    training = data["training_datasets"]
    if isinstance(training, set):
        data["training_datasets"] = list(training)
    data["loader"] = _get_loader_name(raw_loader)
    data["is_cross_encoder"] = self.is_cross_encoder
    return data

to_python()

Returns a string representation of the model.

Source code in mteb/models/model_meta.py
1228
1229
1230
def to_python(self) -> str:
    """Returns a string representation of the model."""
    # experiment_kwargs are excluded from the generated representation.
    excluded = ["experiment_kwargs"]
    return _pydantic_instance_to_code(self, exclude_fields=excluded)

zero_shot_percentage(tasks)

Indicates how out-of-domain the selected tasks are for the given model.

Parameters:

Name Type Description Default
tasks Sequence[AbsTask] | Sequence[str]

A sequence of tasks or dataset names to evaluate against.

required

Returns:

Type Description
int | None

An integer percentage (0-100) indicating how out-of-domain the tasks are for the model.

int | None

Returns None if no training data is specified on the model or if no tasks are provided.

Source code in mteb/models/model_meta.py
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
def zero_shot_percentage(
    self, tasks: Sequence[AbsTask] | Sequence[str]
) -> int | None:
    """Indicates how out-of-domain the selected tasks are for the given model.

    Args:
        tasks: A sequence of tasks or dataset names to evaluate against.

    Returns:
        An integer percentage (0-100) indicating how out-of-domain the tasks are for the model.
        Returns None if no training data is specified on the model or if no tasks are provided.
    """
    training = self.get_training_datasets()
    if training is None or not tasks:
        return None

    if isinstance(tasks[0], str):
        selected = set(tasks)
    else:
        typed_tasks = cast("Sequence[AbsTask]", tasks)
        selected = {task.metadata.name for task in typed_tasks}

    # Fraction of the selected tasks that the model has trained on.
    overlap_ratio = len(training & selected) / len(selected)
    return int(100 - 100 * overlap_ratio)

Model Protocols

mteb.models.EncoderProtocol

Bases: Protocol

The interface for an encoder in MTEB.

Besides the required functions specified below, the encoder can additionally specify the following signatures seen below. In general the interface is kept aligned with the sentence-transformers interface. In cases where exceptions occur, these are handled within MTEB.

Source code in mteb/models/models_protocols.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
@runtime_checkable
class EncoderProtocol(Protocol):
    """The interface for an encoder in MTEB.

    Besides the required functions specified below, the encoder can additionally specify the following signatures seen below.
    In general the interface is kept aligned with the sentence-transformers interface. In cases where exceptions occur, these are handled within MTEB.
    """

    def __init__(
        self,
        model_name: str,
        revision: str | None,
        *,
        device: str | None = None,
        **kwargs: Any,
    ) -> None:
        """The initialization function for the encoder. Used when calling it from the mteb run CLI.

        Args:
            model_name: Name of the model.
            revision: Revision of the model.
            device: Device used to load the model.
            kwargs: Any additional kwargs.
        """
        ...

    def encode(
        self,
        inputs: DataLoader[BatchedInput],
        *,
        task_metadata: TaskMetadata,
        hf_split: str,
        hf_subset: str,
        prompt_type: PromptType | None = None,
        **kwargs: Unpack[EncodeKwargs],
    ) -> Array:
        """Encodes the given sentences using the encoder.

        Args:
            inputs: Batch of inputs to encode.
            task_metadata: The metadata of the task. Encoders (e.g. SentenceTransformers) use this to
                select the appropriate prompts, with priority given to more specific task/prompt combinations over general ones.

                The order of priorities for prompt selection are:
                    1. Composed prompt of task name + prompt type (query or passage)
                    2. Specific task prompt
                    3. Composed prompt of task type + prompt type (query or passage)
                    4. Specific task type prompt
                    5. Specific prompt type (query or passage)
            hf_split: Split of the current task, allows to know some additional information about the current split.
                E.g. the current language.
            hf_subset: Subset of the current task. Similar to `hf_split` to get more information.
            prompt_type: The type of the prompt (query or passage).
            **kwargs: Additional arguments to pass to the encoder.

        Returns:
            The encoded input in a numpy array or torch tensor of the shape (Number of sentences) x (Embedding dimension).
        """
        ...

    def similarity(
        self,
        embeddings1: Array,
        embeddings2: Array,
    ) -> Array:
        """Compute the similarity between two collections of embeddings.

        The output will be a matrix with the similarity scores between all embeddings from the first parameter and all
        embeddings from the second parameter. This differs from `similarity_pairwise`, which computes the similarity
        between corresponding pairs of embeddings.

        Read more at: https://www.sbert.net/docs/package_reference/sentence_transformer/SentenceTransformer.html#sentence_transformers.SentenceTransformer.similarity

        Args:
            embeddings1: [num_embeddings_1, embedding_dim] or [embedding_dim]-shaped numpy array or torch tensor.
            embeddings2: [num_embeddings_2, embedding_dim] or [embedding_dim]-shaped numpy array or torch tensor.

        Returns:
            A [num_embeddings_1, num_embeddings_2]-shaped torch tensor with similarity scores.
        """
        ...

    def similarity_pairwise(
        self,
        embeddings1: Array,
        embeddings2: Array,
    ) -> Array:
        """Compute the pairwise similarity between two collections of embeddings. The output will be a vector with the similarity scores between each corresponding pair of embeddings.

        Read more at: https://www.sbert.net/docs/package_reference/sentence_transformer/SentenceTransformer.html#sentence_transformers.SentenceTransformer.similarity_pairwise

        Args:
            embeddings1: [num_embeddings, embedding_dim] or [embedding_dim]-shaped numpy array or torch tensor.
            embeddings2: [num_embeddings, embedding_dim] or [embedding_dim]-shaped numpy array or torch tensor.

        Returns:
            A [num_embeddings]-shaped torch tensor with pairwise similarity scores.
        """
        ...

    @property
    def mteb_model_meta(self) -> ModelMeta:
        """Metadata of the model."""
        ...

mteb_model_meta property

Metadata of the model

__init__(model_name, revision, *, device=None, **kwargs)

The initialization function for the encoder. Used when calling it from the mteb run CLI.

Parameters:

Name Type Description Default
model_name str

Name of the model

required
revision str | None

revision of the model

required
device str | None

Device used to load the model

None
kwargs Any

Any additional kwargs

{}
Source code in mteb/models/models_protocols.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def __init__(
    self,
    model_name: str,
    revision: str | None,
    *,
    device: str | None = None,
    **kwargs: Any,
) -> None:
    """The initialization function for the encoder. Used when calling it from the mteb run CLI.

    Args:
        model_name: Name of the model.
        revision: Revision of the model.
        device: Device used to load the model.
        kwargs: Any additional kwargs.
    """
    ...

encode(inputs, *, task_metadata, hf_split, hf_subset, prompt_type=None, **kwargs)

Encodes the given sentences using the encoder.

Parameters:

Name Type Description Default
inputs DataLoader[BatchedInput]

Batch of inputs to encode.

required
task_metadata TaskMetadata

The metadata of the task. Encoders (e.g. SentenceTransformers) use to select the appropriate prompts, with priority given to more specific task/prompt combinations over general ones.

The order of priorities for prompt selection are: 1. Composed prompt of task name + prompt type (query or passage) 2. Specific task prompt 3. Composed prompt of task type + prompt type (query or passage) 4. Specific task type prompt 5. Specific prompt type (query or passage)

required
hf_split str

Split of current task, allows to know some additional information about current split. E.g. Current language

required
hf_subset str

Subset of current task. Similar to hf_split to get more information

required
prompt_type PromptType | None

The type of the prompt (query or passage).

None
**kwargs Unpack[EncodeKwargs]

Additional arguments to pass to the encoder.

{}

Returns:

Type Description
Array

The encoded input in a numpy array or torch tensor of the shape (Number of sentences) x (Embedding dimension).

Source code in mteb/models/models_protocols.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def encode(
    self,
    inputs: DataLoader[BatchedInput],
    *,
    task_metadata: TaskMetadata,
    hf_split: str,
    hf_subset: str,
    prompt_type: PromptType | None = None,
    **kwargs: Unpack[EncodeKwargs],
) -> Array:
    """Encodes the given sentences using the encoder.

    Args:
        inputs: Batch of inputs to encode.
        task_metadata: The metadata of the task. Encoders (e.g. SentenceTransformers) use this to
            select the appropriate prompts, with priority given to more specific task/prompt combinations over general ones.

            The order of priorities for prompt selection are:
                1. Composed prompt of task name + prompt type (query or passage)
                2. Specific task prompt
                3. Composed prompt of task type + prompt type (query or passage)
                4. Specific task type prompt
                5. Specific prompt type (query or passage)
        hf_split: Split of the current task, allows to know some additional information about the current split.
            E.g. the current language.
        hf_subset: Subset of the current task. Similar to `hf_split` to get more information.
        prompt_type: The type of the prompt (query or passage).
        **kwargs: Additional arguments to pass to the encoder.

    Returns:
        The encoded input in a numpy array or torch tensor of the shape (Number of sentences) x (Embedding dimension).
    """
    ...

similarity(embeddings1, embeddings2)

Compute the similarity between two collections of embeddings.

The output will be a matrix with the similarity scores between all embeddings from the first parameter and all embeddings from the second parameter. This differs from similarity_pairwise which computes the similarity between corresponding pairs of embeddings.

Read more at: https://www.sbert.net/docs/package_reference/sentence_transformer/SentenceTransformer.html#sentence_transformers.SentenceTransformer.similarity

Parameters:

Name Type Description Default
embeddings1 Array

[num_embeddings_1, embedding_dim] or [embedding_dim]-shaped numpy array or torch tensor.

required
embeddings2 Array

[num_embeddings_2, embedding_dim] or [embedding_dim]-shaped numpy array or torch tensor.

required

Returns:

Type Description
Array

A [num_embeddings_1, num_embeddings_2]-shaped torch tensor with similarity scores.

Source code in mteb/models/models_protocols.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def similarity(
    self,
    embeddings1: Array,
    embeddings2: Array,
) -> Array:
    """Compute the similarity between two collections of embeddings.

    The output will be a matrix with the similarity scores between all embeddings from the first parameter and all
    embeddings from the second parameter. This differs from `similarity_pairwise`, which computes the similarity
    between corresponding pairs of embeddings.

    Read more at: https://www.sbert.net/docs/package_reference/sentence_transformer/SentenceTransformer.html#sentence_transformers.SentenceTransformer.similarity

    Args:
        embeddings1: [num_embeddings_1, embedding_dim] or [embedding_dim]-shaped numpy array or torch tensor.
        embeddings2: [num_embeddings_2, embedding_dim] or [embedding_dim]-shaped numpy array or torch tensor.

    Returns:
        A [num_embeddings_1, num_embeddings_2]-shaped torch tensor with similarity scores.
    """
    ...

similarity_pairwise(embeddings1, embeddings2)

Compute the similarity between two collections of embeddings. The output will be a vector with the similarity scores between each pair of embeddings.

Read more at: https://www.sbert.net/docs/package_reference/sentence_transformer/SentenceTransformer.html#sentence_transformers.SentenceTransformer.similarity_pairwise

Parameters:

Name Type Description Default
embeddings1 Array

[num_embeddings, embedding_dim] or [embedding_dim]-shaped numpy array or torch tensor.

required
embeddings2 Array

[num_embeddings, embedding_dim] or [embedding_dim]-shaped numpy array or torch tensor.

required

Returns:

Type Description
Array

A [num_embeddings]-shaped torch tensor with pairwise similarity scores.

Source code in mteb/models/models_protocols.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def similarity_pairwise(
    self,
    embeddings1: Array,
    embeddings2: Array,
) -> Array:
    """Compute the pairwise similarity between two collections of embeddings. The output will be a vector with the similarity scores between each corresponding pair of embeddings.

    Read more at: https://www.sbert.net/docs/package_reference/sentence_transformer/SentenceTransformer.html#sentence_transformers.SentenceTransformer.similarity_pairwise

    Args:
        embeddings1: [num_embeddings, embedding_dim] or [embedding_dim]-shaped numpy array or torch tensor.
        embeddings2: [num_embeddings, embedding_dim] or [embedding_dim]-shaped numpy array or torch tensor.

    Returns:
        A [num_embeddings]-shaped torch tensor with pairwise similarity scores.
    """
    ...

mteb.models.SearchProtocol

Bases: Protocol

Interface for searching models.

Source code in mteb/models/models_protocols.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@runtime_checkable
class SearchProtocol(Protocol):
    """Interface for searching models (index a corpus, then retrieve for queries)."""

    def index(
        self,
        corpus: CorpusDatasetType,
        *,
        task_metadata: TaskMetadata,
        hf_split: str,
        hf_subset: str,
        encode_kwargs: EncodeKwargs,
        num_proc: int | None,
    ) -> None:
        """Index the corpus for retrieval.

        Args:
            corpus: Corpus dataset to index.
            task_metadata: Metadata of the task, used to determine how to index the corpus.
            hf_split: Split of the current task, allows to know some additional information about the current split.
            hf_subset: Subset of the current task. Similar to `hf_split` to get more information.
            encode_kwargs: Additional arguments to pass to the encoder during indexing.
            num_proc: Number of processes to use for dataloading.
        """
        ...

    def search(
        self,
        queries: QueryDatasetType,
        *,
        task_metadata: TaskMetadata,
        hf_split: str,
        hf_subset: str,
        top_k: int,
        encode_kwargs: EncodeKwargs,
        top_ranked: TopRankedDocumentsType | None = None,
        num_proc: int | None,
    ) -> RetrievalOutputType:
        """Search the indexed corpus using the given queries.

        Args:
            queries: Queries to find.
            task_metadata: Task metadata.
            hf_split: Split of the dataset.
            hf_subset: Subset of the dataset.
            top_ranked: Top-ranked documents for each query, mapping query IDs to a list of document IDs.
                Passed only from Reranking tasks.
            top_k: Number of top documents to return for each query.
            encode_kwargs: Additional arguments to pass to the encoder during search.
            num_proc: Number of processes to use for dataloading.

        Returns:
            Dictionary with query IDs as keys with dict as values, where each value is a mapping of document IDs to their relevance scores.
        """
        ...

    @property
    def mteb_model_meta(self) -> ModelMeta:
        """Metadata of the model."""
        ...

mteb_model_meta property

Metadata of the model

index(corpus, *, task_metadata, hf_split, hf_subset, encode_kwargs, num_proc)

Index the corpus for retrieval.

Parameters:

Name Type Description Default
corpus CorpusDatasetType

Corpus dataset to index.

required
task_metadata TaskMetadata

Metadata of the task, used to determine how to index the corpus.

required
hf_split str

Split of current task, allows to know some additional information about current split.

required
hf_subset str

Subset of current task. Similar to hf_split to get more information

required
encode_kwargs EncodeKwargs

Additional arguments to pass to the encoder during indexing.

required
num_proc int | None

Number of processes to use for dataloading.

required
Source code in mteb/models/models_protocols.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def index(
    self,
    corpus: CorpusDatasetType,
    *,
    task_metadata: TaskMetadata,
    hf_split: str,
    hf_subset: str,
    encode_kwargs: EncodeKwargs,
    num_proc: int | None,
) -> None:
    """Index the corpus for retrieval.

    Args:
        corpus: Corpus dataset to index.
        task_metadata: Metadata of the task, used to determine how to index the corpus.
        hf_split: Split of the current task, allows to know some additional information about the current split.
        hf_subset: Subset of the current task. Similar to `hf_split` to get more information.
        encode_kwargs: Additional arguments to pass to the encoder during indexing.
        num_proc: Number of processes to use for dataloading.
    """
    ...

search(queries, *, task_metadata, hf_split, hf_subset, top_k, encode_kwargs, top_ranked=None, num_proc)

Search the corpus using the given queries.

Parameters:

Name Type Description Default
queries QueryDatasetType

Queries to find

required
task_metadata TaskMetadata

Task metadata

required
hf_split str

split of the dataset

required
hf_subset str

subset of the dataset

required
top_ranked TopRankedDocumentsType | None

Top-ranked documents for each query, mapping query IDs to a list of document IDs. Passed only from Reranking tasks.

None
top_k int

Number of top documents to return for each query.

required
encode_kwargs EncodeKwargs

Additional arguments to pass to the encoder during indexing.

required
num_proc int | None

Number of processes to use for dataloading.

required

Returns:

Type Description
RetrievalOutputType

Dictionary with query IDs as keys with dict as values, where each value is a mapping of document IDs to their relevance scores.

Source code in mteb/models/models_protocols.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def search(
    self,
    queries: QueryDatasetType,
    *,
    task_metadata: TaskMetadata,
    hf_split: str,
    hf_subset: str,
    top_k: int,
    encode_kwargs: EncodeKwargs,
    top_ranked: TopRankedDocumentsType | None = None,
    num_proc: int | None,
) -> RetrievalOutputType:
    """Search the indexed corpus using the given queries.

    Args:
        queries: Queries to find.
        task_metadata: Task metadata.
        hf_split: Split of the dataset.
        hf_subset: Subset of the dataset.
        top_ranked: Top-ranked documents for each query, mapping query IDs to a list of document IDs.
            Passed only from Reranking tasks.
        top_k: Number of top documents to return for each query.
        encode_kwargs: Additional arguments to pass to the encoder during search.
        num_proc: Number of processes to use for dataloading.

    Returns:
        Dictionary with query IDs as keys with dict as values, where each value is a mapping of document IDs to their relevance scores.
    """
    ...

mteb.models.CrossEncoderProtocol

Bases: Protocol

The interface for a CrossEncoder in MTEB.

Besides the required functions specified below, the cross-encoder can additionally specify the following signatures seen below. In general the interface is kept aligned with the sentence-transformers interface. In cases where exceptions occur, these are handled within MTEB.

Source code in mteb/models/models_protocols.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
@runtime_checkable
class CrossEncoderProtocol(Protocol):
    """The interface for a CrossEncoder in MTEB.

    Besides the required functions specified below, the cross-encoder can additionally specify the following signatures seen below.
    In general the interface is kept aligned with the sentence-transformers interface. In cases where exceptions occur, these are handled within MTEB.
    """

    def __init__(
        self,
        model_name: str,
        revision: str | None,
        device: str | None = None,
        **kwargs: Any,
    ) -> None:
        """The initialization function for the cross-encoder. Used when calling it from the mteb run CLI.

        Args:
            model_name: Name of the model.
            revision: Revision of the model.
            device: Device used to load the model.
            kwargs: Any additional kwargs passed to the implementation.
        """
        ...

    def predict(
        self,
        inputs1: DataLoader[BatchedInput],
        inputs2: DataLoader[BatchedInput],
        *,
        task_metadata: TaskMetadata,
        hf_split: str,
        hf_subset: str,
        prompt_type: PromptType | None = None,
        **kwargs: Unpack[EncodeKwargs],
    ) -> Array:
        """Predicts relevance scores for pairs of inputs. Note that, unlike the encoder, the cross-encoder can compare across inputs.

        Args:
            inputs1: First Dataloader of inputs to encode. For reranking tasks, these are queries (for text-only tasks `QueryDatasetType`).
            inputs2: Second Dataloader of inputs to encode. For reranking, these are documents
                (NOTE(review): the original note said `RetrievalOutputType`, which looks like a typo
                for the corpus dataset type -- confirm against the type definitions).
            task_metadata: Metadata of the current task.
            hf_split: Split of current task, allows to know some additional information about current split.
                E.g. current language.
            hf_subset: Subset of current task. Similar to `hf_split` to get more information.
            prompt_type: The type of prompt (query or passage).
            **kwargs: Additional arguments to pass to the cross-encoder.

        Returns:
            The predicted relevance scores for each input pair.
        """
        ...

    @property
    def mteb_model_meta(self) -> ModelMeta:
        """Metadata of the model."""
        ...

mteb_model_meta property

Metadata of the model

__init__(model_name, revision, device=None, **kwargs)

The initialization function for the encoder. Used when calling it from the mteb run CLI.

Parameters:

Name Type Description Default
model_name str

Name of the model

required
revision str | None

revision of the model

required
device str | None

Device used to load the model

None
kwargs Any

Any additional kwargs

{}
Source code in mteb/models/models_protocols.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def __init__(
    self,
    model_name: str,
    revision: str | None,
    device: str | None = None,
    **kwargs: Any,
) -> None:
    """The initialization function for the encoder. Used when calling it from the mteb run CLI.

    Args:
        model_name: Name of the model.
        revision: Revision of the model.
        device: Device used to load the model.
        kwargs: Any additional kwargs passed to the implementation.
    """
    ...

predict(inputs1, inputs2, *, task_metadata, hf_split, hf_subset, prompt_type=None, **kwargs)

Predicts relevance scores for pairs of inputs. Note that, unlike the encoder, the cross-encoder can compare across inputs.

Parameters:

Name Type Description Default
inputs1 DataLoader[BatchedInput]

First Dataloader of inputs to encode. For reranking tasks, these are queries (for text only tasks QueryDatasetType).

required
inputs2 DataLoader[BatchedInput]

Second Dataloader of inputs to encode. For reranking, these are documents (for text only tasks RetrievalOutputType).

required
task_metadata TaskMetadata

Metadata of the current task.

required
hf_split str

Split of current task, allows to know some additional information about current split. E.g. Current language

required
hf_subset str

Subset of current task. Similar to hf_split to get more information

required
prompt_type PromptType | None

The type of prompt (query or passage).

None
**kwargs Unpack[EncodeKwargs]

Additional arguments to pass to the cross-encoder.

{}

Returns:

Type Description
Array

The predicted relevance scores for each inputs pair.

Source code in mteb/models/models_protocols.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def predict(
    self,
    inputs1: DataLoader[BatchedInput],
    inputs2: DataLoader[BatchedInput],
    *,
    task_metadata: TaskMetadata,
    hf_split: str,
    hf_subset: str,
    prompt_type: PromptType | None = None,
    **kwargs: Unpack[EncodeKwargs],
) -> Array:
    """Score input pairs for relevance; unlike an encoder, the two sides of each pair are compared jointly.

    Args:
        inputs1: Dataloader with the first element of each pair (queries for reranking tasks).
        inputs2: Dataloader with the second element of each pair (documents for reranking tasks).
        task_metadata: Metadata of the task being evaluated.
        hf_split: Dataset split currently being evaluated (carries e.g. language information).
        hf_subset: Dataset subset currently being evaluated.
        prompt_type: The type of prompt (query or passage).
        **kwargs: Additional arguments forwarded to the cross-encoder.

    Returns:
        Relevance scores, one per input pair.
    """
    ...

mteb.models.MTEBModels = EncoderProtocol | CrossEncoderProtocol | SearchProtocol module-attribute

Type alias for all MTEB model types as many models implement multiple protocols and many tasks can be solved by multiple model types.

Cache Wrappers

mteb.models.CachedEmbeddingWrapper

Wraps an encoder and caches embeddings for text and images.

Examples:

>>> import mteb
>>> from mteb.models.cache_wrappers import CachedEmbeddingWrapper
>>> from pathlib import Path
>>> model = mteb.get_model("sentence-transformers/all-MiniLM-L6-v2")
>>> cache_path = Path.cwd() / "cache"
>>> cached_model = CachedEmbeddingWrapper(model, cache_path)
>>> task = mteb.get_task("NanoArguAnaRetrieval")
>>> mteb.evaluate(cached_model, task)
Source code in mteb/models/cache_wrappers/cache_wrapper.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class CachedEmbeddingWrapper:
    """Wraps an encoder and caches embeddings for text and images.

    Examples:
        >>> import mteb
        >>> from mteb.models.cache_wrappers import CachedEmbeddingWrapper
        >>> from pathlib import Path
        >>> model = mteb.get_model("sentence-transformers/all-MiniLM-L6-v2")
        >>> cache_path = Path.cwd() / "cache"
        >>> cached_model = CachedEmbeddingWrapper(model, cache_path)
        >>> task = mteb.get_task("NanoArguAnaRetrieval")
        >>> mteb.evaluate(cached_model, task)
    """

    def __init__(
        self,
        model: EncoderProtocol,
        cache_path: str | Path,
        cache_backend: type[CacheBackendProtocol] = NumpyCache,
    ) -> None:
        """Init

        Args:
            model: Model to be wrapped.
            cache_path: Path to the directory where cached embeddings are stored.
            cache_backend: Cache backend class to use for storing embeddings.

        Raises:
            ValueError: If the wrapped model does not expose an ``encode`` method.
        """
        # Validate before touching the filesystem so an invalid model does not
        # leave an empty cache directory behind, and so __del__ never runs on a
        # half-initialized instance with attributes missing.
        if not hasattr(model, "encode"):
            raise ValueError("Model must have an 'encode' method.")
        self._model = model
        self.cache_path = Path(cache_path)
        self.cache_path.mkdir(parents=True, exist_ok=True)
        self.cache_backend = cache_backend
        # One backend instance per task name, created lazily in _get_or_create_cache.
        self.cache_dict: dict[str, CacheBackendProtocol] = {}
        logger.info("Initialized CachedEmbeddingWrapper")

    @property
    def mteb_model_meta(self) -> ModelMeta | None:
        """Return wrapped model meta data."""
        return self._model.mteb_model_meta

    def encode(
        self,
        inputs: DataLoader[BatchedInput],
        *,
        task_metadata: TaskMetadata,
        hf_split: str,
        hf_subset: str,
        prompt_type: PromptType | None = None,
        batch_size: int = 32,
        **kwargs: Any,
    ) -> Array:
        """Encodes the given sentences using the encoder, serving cached embeddings when available.

        Args:
            inputs: Batch of inputs to encode.
            task_metadata: The metadata of the task.
            hf_split: Split of current task.
            hf_subset: Subset of current task.
            prompt_type: The type of prompt (query or passage).
            batch_size: Batch size.
            **kwargs: Additional arguments to pass to the encoder.

        Returns:
            The encoded input in a numpy array or torch tensor of the shape (Number of sentences) x (Embedding dimension).
        """
        task_name = task_metadata.name
        try:
            cache = self._get_or_create_cache(task_name)

            uncached_items: list[dict[str, Any]] = []
            uncached_indices: list[int] = []
            all_items: Dataset = inputs.dataset
            cached_vectors: dict[int, Array] = {}

            # Split the dataset into cache hits and misses, remembering each
            # miss's original position so output order matches input order.
            for i, item in enumerate(all_items):
                vector = cache.get_vector(item)
                if vector is not None:
                    cached_vectors[i] = vector
                else:
                    uncached_items.append(item)
                    uncached_indices.append(i)

            newly_encoded: dict[int, Array] = {}
            if uncached_items:
                logger.info(f"Encoding {len(uncached_items)} new items")
                # Build a simple DataLoader with only uncached items
                dataset = Dataset.from_list(uncached_items)
                dl = create_dataloader(
                    dataset,
                    task_metadata=task_metadata,
                    prompt_type=prompt_type,
                    **kwargs,
                )
                new_vectors = self._model.encode(
                    dl,
                    task_metadata=task_metadata,
                    hf_split=hf_split,
                    hf_subset=hf_subset,
                    prompt_type=prompt_type,
                    batch_size=batch_size,
                    **kwargs,
                )
                # The cache backend stores numpy rows; move tensors off-device first.
                if isinstance(new_vectors, torch.Tensor):
                    new_vectors = new_vectors.cpu().numpy()
                # Persist immediately so embeddings survive a crash mid-run.
                cache.add(uncached_items, new_vectors)
                cache.save()
                for vector, original_idx in zip(new_vectors, uncached_indices):
                    newly_encoded[original_idx] = vector
            else:
                logger.info("All items found in cache")

            # Reassemble results in the original dataset order.
            final_results = []
            for i in range(len(all_items)):
                if i in cached_vectors:
                    final_results.append(cached_vectors[i])
                else:
                    final_results.append(newly_encoded[i])

            return np.array(final_results)
        except Exception as e:
            logger.error(f"Error in cached encoding: {str(e)}")
            raise

    def _get_or_create_cache(self, task_name: str) -> CacheBackendProtocol:
        """Get or create cache for a specific task.

        Args:
            task_name: Name of the task

        Returns:
            Cache backend instance for the task
        """
        if task_name not in self.cache_dict:
            cache = self.cache_backend(self.cache_path / task_name)
            cache.load()
            self.cache_dict[task_name] = cache
        return self.cache_dict[task_name]

    def __del__(self):
        # Best-effort cleanup when the wrapper is garbage collected.
        self.close()

    def close(self) -> None:
        """Unload cache from memory."""
        # getattr-guard: __del__ may fire on a partially-initialized instance
        # (e.g. when __init__ raised before cache_dict was assigned).
        for backend in list(getattr(self, "cache_dict", {}).values()):
            backend.close()

    def similarity(
        self,
        embeddings1: Array,
        embeddings2: Array,
    ) -> Array:
        """Refer to [EncoderProtocol.similarity][mteb.models.EncoderProtocol.similarity] for more details."""
        return self._model.similarity(embeddings1, embeddings2)

    def similarity_pairwise(
        self,
        embeddings1: Array,
        embeddings2: Array,
    ) -> Array:
        """Refer to [EncoderProtocol.similarity_pairwise][mteb.models.EncoderProtocol.similarity_pairwise] for more details."""
        return self._model.similarity_pairwise(embeddings1, embeddings2)

mteb_model_meta property

Return wrapped model meta data.

__init__(model, cache_path, cache_backend=NumpyCache)

Init

Parameters:

Name Type Description Default
model EncoderProtocol

Model to be wrapped.

required
cache_path str | Path

Path to the directory where cached embeddings are stored.

required
cache_backend type[CacheBackendProtocol]

Cache backend class to use for storing embeddings.

NumpyCache
Source code in mteb/models/cache_wrappers/cache_wrapper.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(
    self,
    model: EncoderProtocol,
    cache_path: str | Path,
    cache_backend: type[CacheBackendProtocol] = NumpyCache,
) -> None:
    """Init

    Args:
        model: Model to be wrapped.
        cache_path: Path to the directory where cached embeddings are stored.
        cache_backend: Cache backend class to use for storing embeddings.

    Raises:
        ValueError: If the wrapped model does not expose an ``encode`` method.
    """
    # Validate before touching the filesystem so an invalid model does not
    # leave an empty cache directory behind.
    if not hasattr(model, "encode"):
        raise ValueError("Model must have an 'encode' method.")
    self._model = model
    self.cache_path = Path(cache_path)
    self.cache_path.mkdir(parents=True, exist_ok=True)
    self.cache_backend = cache_backend
    # One backend instance per task name, created lazily.
    self.cache_dict: dict[str, CacheBackendProtocol] = {}
    logger.info("Initialized CachedEmbeddingWrapper")

close()

Unload cache from memory.

Source code in mteb/models/cache_wrappers/cache_wrapper.py
170
171
172
173
def close(self) -> None:
    """Unload cache from memory by closing every task-level cache backend."""
    for backend in list(self.cache_dict.values()):
        backend.close()

encode(inputs, *, task_metadata, hf_split, hf_subset, prompt_type=None, batch_size=32, **kwargs)

Encodes the given sentences using the encoder.

Parameters:

Name Type Description Default
inputs DataLoader[BatchedInput]

Batch of inputs to encode.

required
task_metadata TaskMetadata

The metadata of the task.

required
hf_split str

Split of current task

required
hf_subset str

Subset of current task

required
prompt_type PromptType | None

The type of prompt (query or passage).

None
batch_size int

Batch size

32
**kwargs Any

Additional arguments to pass to the encoder.

{}

Returns:

Type Description
Array

The encoded input in a numpy array or torch tensor of the shape (Number of sentences) x (Embedding dimension).

Source code in mteb/models/cache_wrappers/cache_wrapper.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def encode(
    self,
    inputs: DataLoader[BatchedInput],
    *,
    task_metadata: TaskMetadata,
    hf_split: str,
    hf_subset: str,
    prompt_type: PromptType | None = None,
    batch_size: int = 32,
    **kwargs: Any,
) -> Array:
    """Encodes the given sentences using the encoder, serving cached embeddings when available.

    Args:
        inputs: Batch of inputs to encode.
        task_metadata: The metadata of the task.
        hf_split: Split of current task.
        hf_subset: Subset of current task.
        prompt_type: The type of prompt (query or passage).
        batch_size: Batch size.
        **kwargs: Additional arguments to pass to the encoder.

    Returns:
        The encoded input in a numpy array or torch tensor of the shape (Number of sentences) x (Embedding dimension).
    """
    task_name = task_metadata.name
    try:
        cache = self._get_or_create_cache(task_name)

        uncached_items: list[dict[str, Any]] = []
        uncached_indices: list[int] = []
        all_items: Dataset = inputs.dataset
        cached_vectors: dict[int, Array] = {}

        # Split the dataset into cache hits and misses, remembering each
        # miss's original position so output order matches input order.
        for i, item in enumerate(all_items):
            vector = cache.get_vector(item)
            if vector is not None:
                cached_vectors[i] = vector
            else:
                uncached_items.append(item)
                uncached_indices.append(i)

        newly_encoded: dict[int, Array] = {}
        if uncached_items:
            logger.info(f"Encoding {len(uncached_items)} new items")
            # Build a simple DataLoader with only uncached items
            dataset = Dataset.from_list(uncached_items)
            dl = create_dataloader(
                dataset,
                task_metadata=task_metadata,
                prompt_type=prompt_type,
                **kwargs,
            )
            new_vectors = self._model.encode(
                dl,
                task_metadata=task_metadata,
                hf_split=hf_split,
                hf_subset=hf_subset,
                prompt_type=prompt_type,
                batch_size=batch_size,
                **kwargs,
            )
            # The cache backend stores numpy rows; move tensors off-device first.
            if isinstance(new_vectors, torch.Tensor):
                new_vectors = new_vectors.cpu().numpy()
            # Persist immediately so embeddings survive a crash mid-run.
            cache.add(uncached_items, new_vectors)
            cache.save()
            for vector, original_idx in zip(new_vectors, uncached_indices):
                newly_encoded[original_idx] = vector
        else:
            logger.info("All items found in cache")

        # Reassemble results in the original dataset order.
        final_results = []
        for i in range(len(all_items)):
            if i in cached_vectors:
                final_results.append(cached_vectors[i])
            else:
                final_results.append(newly_encoded[i])

        return np.array(final_results)
    except Exception as e:
        logger.error(f"Error in cached encoding: {str(e)}")
        raise

similarity(embeddings1, embeddings2)

Refer to EncoderProtocol.similarity for more details.

Source code in mteb/models/cache_wrappers/cache_wrapper.py
175
176
177
178
179
180
181
def similarity(
    self,
    embeddings1: Array,
    embeddings2: Array,
) -> Array:
    """Delegate to the wrapped model; refer to [EncoderProtocol.similarity][mteb.models.EncoderProtocol.similarity] for details."""
    scores = self._model.similarity(embeddings1, embeddings2)
    return scores

similarity_pairwise(embeddings1, embeddings2)

Refer to EncoderProtocol.similarity_pairwise for more details.

Source code in mteb/models/cache_wrappers/cache_wrapper.py
183
184
185
186
187
188
189
def similarity_pairwise(
    self,
    embeddings1: Array,
    embeddings2: Array,
) -> Array:
    """Delegate to the wrapped model; refer to [EncoderProtocol.similarity_pairwise][mteb.models.EncoderProtocol.similarity_pairwise] for details."""
    scores = self._model.similarity_pairwise(embeddings1, embeddings2)
    return scores

mteb.models.cache_wrappers.CacheBackendProtocol

Bases: Protocol

Protocol for a vector cache map (used to store text/image embeddings).

Implementations may back the cache with different storage backends.

The cache maps an input item (text or image) to its vector embedding, identified by a deterministic hash.

Source code in mteb/models/cache_wrappers/cache_backend_protocol.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@runtime_checkable
class CacheBackendProtocol(Protocol):
    """Protocol for a vector cache map (used to store text/image embeddings).

    Implementations may back the cache with different storage backends.

    The cache maps an input item (text or image) to its vector embedding,
    identified by a deterministic hash.
    """

    def __init__(self, directory: Path | None = None, **kwargs: Any) -> None:
        """Initialize the cache backend.

        Args:
            directory: Directory path to store cache files.
            **kwargs: Additional backend-specific arguments.
        """

    def add(self, item: list[dict[str, Any]], vectors: Array) -> None:
        """Add vectors for a batch of items to the cache.

        Args:
            item: Batch of input items, each containing 'text' or 'image'.
            vectors: Embedding vectors, one row per item.
        """

    def get_vector(self, item: dict[str, Any]) -> Array | None:
        """Retrieve the cached vector for the given item.

        Args:
            item: Input item.

        Returns:
            Cached vector as np.ndarray, or None if not found.
        """

    def save(self) -> None:
        """Persist the current cache state to disk (index + metadata)."""

    def load(self) -> None:
        """Load previously persisted cache state from disk (index + metadata)."""

    def close(self) -> None:
        """Release resources or flush data held by the backend."""

    def __contains__(self, item: dict[str, Any]) -> bool:
        """Check whether the cache contains an item (keyed by its deterministic hash)."""

__contains__(item)

Check whether the cache contains an item.

Source code in mteb/models/cache_wrappers/cache_backend_protocol.py
56
57
def __contains__(self, item: dict[str, Any]) -> bool:
    """Check whether the cache contains an item (keyed by its deterministic hash)."""

__init__(directory=None, **kwargs)

Initialize the cache backend.

Parameters:

Name Type Description Default
directory Path | None

Directory path to store cache files.

None
**kwargs Any

Additional backend-specific arguments.

{}
Source code in mteb/models/cache_wrappers/cache_backend_protocol.py
21
22
23
24
25
26
27
def __init__(self, directory: Path | None = None, **kwargs: Any) -> None:
    """Initialize the cache backend.

    Args:
        directory: Directory path where cache files are stored.
        **kwargs: Additional backend-specific arguments.
    """

add(item, vectors)

Add a vector to the cache.

Parameters:

Name Type Description Default
item list[dict[str, Any]]

Batch of input items, each containing 'text' or 'image'.

required
vectors Array

Embedding vector of shape (dim,) or (1, dim).

required
Source code in mteb/models/cache_wrappers/cache_backend_protocol.py
29
30
31
32
33
34
35
def add(self, item: list[dict[str, Any]], vectors: Array) -> None:
    """Add vectors for a batch of items to the cache.

    Args:
        item: Batch of input items, each containing 'text' or 'image'.
        vectors: Embedding vectors, one row per item.
    """

close()

Release resources or flush data.

Source code in mteb/models/cache_wrappers/cache_backend_protocol.py
53
54
def close(self) -> None:
    """Release resources or flush data held by the backend."""

get_vector(item)

Retrieve the cached vector for the given item.

Parameters:

Name Type Description Default
item dict[str, Any]

Input item.

required

Returns:

Type Description
Array | None

Cached vector as np.ndarray, or None if not found.

Source code in mteb/models/cache_wrappers/cache_backend_protocol.py
37
38
39
40
41
42
43
44
45
def get_vector(self, item: dict[str, Any]) -> Array | None:
    """Retrieve the cached vector for the given item.

    Args:
        item: Input item (text or image) to look up.

    Returns:
        Cached vector as np.ndarray, or None if not found.
    """

load()

Load cache from disk (index + metadata).

Source code in mteb/models/cache_wrappers/cache_backend_protocol.py
50
51
def load(self) -> None:
    """Load previously persisted cache state from disk (index + metadata)."""

save()

Persist cache data to disk (index + metadata).

Source code in mteb/models/cache_wrappers/cache_backend_protocol.py
47
48
def save(self) -> None:
    """Persist the current cache state to disk (index + metadata)."""

mteb.models.cache_wrappers.cache_backends.NumpyCache

Generic vector cache for both text and images.

Source code in mteb/models/cache_wrappers/cache_backends/numpy_cache.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
class NumpyCache:
    """Generic vector cache for both text and images, backed by a memory-mapped numpy file."""

    def __init__(self, directory: str | Path, initial_vectors: int = 100_000):
        """Create (or reopen) a cache directory.

        Args:
            directory: Directory holding ``vectors.npy``, ``index.json`` and ``dimension``.
            initial_vectors: Number of rows pre-allocated when the vectors file is first created.
        """
        self.directory = Path(directory)
        self.directory.mkdir(parents=True, exist_ok=True)
        self.vectors_file = self.directory / "vectors.npy"
        self.index_file = self.directory / "index.json"
        self.dimension_file = self.directory / "dimension"
        # Maps deterministic item hash -> row index into the memmap.
        self.hash_to_index: dict[str, int] = {}
        self.vectors: np.memmap | None = None
        self.vector_dim: int | None = None
        self.initial_vectors = initial_vectors
        logger.info(f"Initialized VectorCacheMap in directory: {self.directory}")
        self._initialize_vectors_file()

    def add(self, items: list[dict[str, Any]], vectors: Array) -> None:
        """Add vectors for a batch of items to the cache.

        Args:
            items: Batch of input items, each containing 'text' or 'image'.
            vectors: Embedding vectors, one row per item.
        """
        try:
            if self.vector_dim is None:
                # The first write determines the embedding dimension.
                self.vector_dim = (
                    vectors.shape[0] if vectors.ndim == 1 else vectors.shape[1]
                )
                self._initialize_vectors_file()
                self._save_dimension()
                logger.info(f"Initialized vector dimension to {self.vector_dim}")

            if self.vectors is None:
                raise RuntimeError(
                    "Vectors file not initialized. Call _initialize_vectors_file() first."
                )

            for item, vec in zip(items, vectors):
                item_hash = _hash_item(item)
                if item_hash in self.hash_to_index:
                    msg = f"Hash collision or duplicate item for hash {item_hash}. Overwriting existing vector."
                    logger.warning(msg)
                    warnings.warn(msg)
                    index = self.hash_to_index[item_hash]
                else:
                    index = len(self.hash_to_index)
                    if index >= len(self.vectors):
                        self._double_vectors_file()
                    self.hash_to_index[item_hash] = index

                self.vectors[index] = vec
                logger.debug(
                    f"Added new item-vector pair. Total pairs: {len(self.hash_to_index)}"
                )
        except Exception as e:
            logger.error(f"Error adding item-vector pair: {str(e)}")
            raise

    def _existing_row_count(self) -> int:
        """Number of rows in the existing vectors file, derived from its on-disk size."""
        itemsize = np.dtype("float32").itemsize
        return self.vectors_file.stat().st_size // (itemsize * self.vector_dim)

    def _initialize_vectors_file(self) -> None:
        """Create or reopen the memory-mapped vectors file (no-op until the dimension is known)."""
        if self.vector_dim is None:
            logger.info("Vector dimension not set. Waiting for first add() call.")
            return
        if not self.vectors_file.exists():
            logger.info(
                f"Creating initial vectors file with {self.initial_vectors} vectors"
            )
            self.vectors = np.memmap(
                self.vectors_file,
                dtype="float32",
                mode="w+",
                shape=(self.initial_vectors, self.vector_dim),
            )
        else:
            # np.memmap does not document -1 as a shape placeholder, so the row
            # count is derived explicitly from the file size.
            self.vectors = np.memmap(
                self.vectors_file,
                dtype="float32",
                mode="r+",
                shape=(self._existing_row_count(), self.vector_dim),
            )
        logger.info(f"Vectors file initialized with shape: {self.vectors.shape}")

    def _double_vectors_file(self) -> None:
        """Grow the memmap to twice its current row count (np.memmap extends the file for 'r+')."""
        if self.vectors is None or self.vector_dim is None:
            raise RuntimeError(
                "Vectors file not initialized. Call _initialize_vectors_file() first."
            )
        current_size = len(self.vectors)
        new_size = current_size * 2
        logger.info(f"Doubling vectors file from {current_size} to {new_size} vectors")
        self.vectors.flush()
        new_vectors = np.memmap(
            str(self.vectors_file),
            dtype=np.float32,
            mode="r+",
            shape=(new_size, self.vector_dim),
        )
        new_vectors[:current_size] = self.vectors[:]
        self.vectors = new_vectors

    def _save_dimension(self) -> None:
        """Write the vector dimension to its sidecar file."""
        with self.dimension_file.open("w") as f:
            f.write(str(self.vector_dim))
        logger.info(
            f"Saved vector dimension {self.vector_dim} to {self.dimension_file}"
        )

    def _load_dimension(self) -> None:
        """Read the vector dimension from its sidecar file, if present."""
        if self.dimension_file.exists():
            with self.dimension_file.open() as f:
                self.vector_dim = int(f.read().strip())
            logger.info(
                f"Loaded vector dimension {self.vector_dim} from {self.dimension_file}"
            )
        else:
            msg = "Dimension file not found. Vector dimension remains uninitialized."
            logger.warning(msg)
            warnings.warn(msg)

    def save(self) -> None:
        """Persist VectorCacheMap to disk."""
        try:
            if self.vectors is not None:
                self.vectors.flush()

            # JSON object keys must be strings; indices are stored as plain ints.
            serializable_index = {
                str(hash_): int(index)
                for hash_, index in self.hash_to_index.items()
            }

            with self.index_file.open("w", encoding="utf-8") as f:
                json.dump(serializable_index, f, indent=2)
            self._save_dimension()
            logger.info(f"Saved VectorCacheMap to {self.directory}")
        except Exception as e:
            logger.error(f"Error saving VectorCacheMap: {str(e)}")
            raise

    def load(self) -> None:
        """Load VectorCacheMap from disk."""
        try:
            self._load_dimension()
            if self.index_file.exists() and self.vectors_file.exists():
                with self.index_file.open(encoding="utf-8") as f:
                    loaded_index = json.load(f)
                    self.hash_to_index = {
                        str(hash_): int(index)  # Ensure we maintain the correct types
                        for hash_, index in loaded_index.items()
                    }

                if self.vector_dim is not None:
                    # See _initialize_vectors_file: the row count is made
                    # explicit because np.memmap does not accept -1 in shape.
                    self.vectors = np.memmap(
                        self.vectors_file,
                        dtype="float32",
                        mode="r+",
                        shape=(self._existing_row_count(), self.vector_dim),
                    )
                    logger.info(f"Loaded vectors file with shape: {self.vectors.shape}")
                else:
                    msg = "Vector dimension not set. Unable to load vectors file."
                    logger.warning(msg)
                    warnings.warn(msg)
                logger.info(f"Loaded VectorCacheMap from {self.directory}")
            else:
                msg = "No existing files found. Initialized empty VectorCacheMap."
                logger.warning(msg)
                warnings.warn(msg)
        except Exception as e:
            logger.error(f"Error loading VectorCacheMap: {str(e)}")
            raise

    def get_vector(self, item: dict[str, Any]) -> Array | None:
        """Retrieve vector from index by hash.

        Args:
            item: Input item to look up.

        Returns:
            The cached vector row, or None if the cache is empty or the item is unknown.
        """
        if self.vectors is None:
            return None

        try:
            item_hash = _hash_item(item)
            if item_hash not in self.hash_to_index:
                logger.debug(f"Item hash not found in index: {item_hash}")
                return None
            index = self.hash_to_index[item_hash]
            return self.vectors[index]
        except Exception as e:
            logger.error(f"Error retrieving vector for item: {str(e)}")
            raise

    def __contains__(self, item: dict[str, Any]) -> bool:
        """Check whether a vector for the item is cached."""
        return _hash_item(item) in self.hash_to_index

    def __del__(self):
        # Best-effort cleanup; close() tolerates partially-initialized instances.
        self.close()

    def close(self) -> None:
        """Flush and release the memory-mapped vectors file."""
        if hasattr(self, "vectors") and self.vectors is not None:
            self.vectors.flush()
            del self.vectors
            self.vectors = None

add(items, vectors)

Add a vector to the cache.

Source code in mteb/models/cache_wrappers/cache_backends/numpy_cache.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def add(self, items: list[dict[str, Any]], vectors: Array) -> None:
    """Add item/vector pairs to the cache.

    Args:
        items: Items whose hashes key the cached vectors.
        vectors: Vector(s) to store; a single 1-D vector is treated as one row.
    """
    try:
        if vectors.ndim == 1:
            # Fix: promote a single 1-D vector to shape (1, dim). Previously a
            # 1-D input was zipped element-wise with `items`, pairing the item
            # with scalar components instead of the whole vector. This matches
            # the handling in FaissCache.add.
            vectors = vectors[None, :]
        if self.vector_dim is None:
            self.vector_dim = vectors.shape[1]
            self._initialize_vectors_file()
            self._save_dimension()
            logger.info(f"Initialized vector dimension to {self.vector_dim}")

        if self.vectors is None:
            raise RuntimeError(
                "Vectors file not initialized. Call _initialize_vectors_file() first."
            )

        for item, vec in zip(items, vectors):
            item_hash = _hash_item(item)
            if item_hash in self.hash_to_index:
                msg = f"Hash collision or duplicate item for hash {item_hash}. Overwriting existing vector."
                logger.warning(msg)
                warnings.warn(msg)
                index = self.hash_to_index[item_hash]
            else:
                # New items are appended at the next free row; grow the backing
                # file when capacity is exhausted.
                index = len(self.hash_to_index)
                if index >= len(self.vectors):
                    self._double_vectors_file()
                self.hash_to_index[item_hash] = index

            self.vectors[index] = vec
            logger.debug(
                f"Added new item-vector pair. Total pairs: {len(self.hash_to_index)}"
            )
    except Exception as e:
        logger.error(f"Error adding item-vector pair: {str(e)}")
        raise

close()

Flush pending writes and release the memory-mapped vectors file.

Source code in mteb/models/cache_wrappers/cache_backends/numpy_cache.py
208
209
210
211
212
213
def close(self) -> None:
    """Flush pending writes and release the memory-mapped vectors file.

    Safe to call when the map was never initialized (no ``vectors`` attribute)
    or has already been closed (``vectors`` is None).
    """
    # The original docstring was truncated ("Delete all ve"); behavior is
    # unchanged: flush the memmap, then drop the reference.
    if hasattr(self, "vectors") and self.vectors is not None:
        self.vectors.flush()
        del self.vectors
        self.vectors = None

get_vector(item)

Retrieve vector from index by hash.

Source code in mteb/models/cache_wrappers/cache_backends/numpy_cache.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def get_vector(self, item: dict[str, Any]) -> Array | None:
    """Look up the cached vector for ``item``; returns None when absent."""
    if self.vectors is None:
        return None

    try:
        key = _hash_item(item)
        row = self.hash_to_index.get(key)
        if row is None:
            logger.debug(f"Item hash not found in index: {key}")
            return None
        return self.vectors[row]
    except Exception as e:
        logger.error(f"Error retrieving vector for item: {str(e)}")
        raise

load()

Load VectorCacheMap from disk.

Source code in mteb/models/cache_wrappers/cache_backends/numpy_cache.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def load(self) -> None:
    """Load VectorCacheMap from disk.

    Restores the hash-to-row index from JSON and re-attaches the memory-mapped
    vectors file. When no files exist, the map stays empty (with a warning).
    """
    try:
        self._load_dimension()
        if self.index_file.exists() and self.vectors_file.exists():
            with self.index_file.open(encoding="utf-8") as f:
                loaded_index = json.load(f)
                self.hash_to_index = {
                    str(hash_): int(index)  # Ensure we maintain the correct types
                    for hash_, index in loaded_index.items()
                }

            if self.vector_dim is not None:
                # Fix: np.memmap does not infer a dimension from -1 the way
                # reshape does; derive the row count from the file size.
                row_bytes = np.dtype("float32").itemsize * self.vector_dim
                n_rows = self.vectors_file.stat().st_size // row_bytes
                self.vectors = np.memmap(
                    self.vectors_file,
                    dtype="float32",
                    mode="r+",
                    shape=(n_rows, self.vector_dim),
                )
                logger.info(f"Loaded vectors file with shape: {self.vectors.shape}")
            else:
                msg = "Vector dimension not set. Unable to load vectors file."
                logger.warning(msg)
                warnings.warn(msg)
            logger.info(f"Loaded VectorCacheMap from {self.directory}")
        else:
            msg = "No existing files found. Initialized empty VectorCacheMap."
            logger.warning(msg)
            warnings.warn(msg)
    except Exception as e:
        logger.error(f"Error loading VectorCacheMap: {str(e)}")
        raise

save()

Persist VectorCacheMap to disk.

Source code in mteb/models/cache_wrappers/cache_backends/numpy_cache.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def save(self) -> None:
    """Persist VectorCacheMap to disk (JSON index plus flushed memmap)."""
    try:
        if self.vectors is not None:
            self.vectors.flush()

        # JSON object keys must be strings; coerce values to plain ints so the
        # mapping round-trips cleanly through json.dump/json.load.
        serializable_index = {
            str(h): int(i) for h, i in self.hash_to_index.items()
        }

        with self.index_file.open("w", encoding="utf-8") as f:
            json.dump(serializable_index, f, indent=2)
        self._save_dimension()
        logger.info(f"Saved VectorCacheMap to {self.directory}")
    except Exception as e:
        logger.error(f"Error saving VectorCacheMap: {str(e)}")
        raise

mteb.models.cache_wrappers.cache_backends.FaissCache

FAISS-based vector cache that uses embeddings directly as lookup keys.

Source code in mteb/models/cache_wrappers/cache_backends/faiss_cache.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class FaissCache:
    """FAISS-based vector cache that uses embeddings directly as lookup keys."""

    def __init__(self, directory: str | Path):
        requires_package(
            self,
            "faiss",
            "FAISS-based vector cache",
            install_instruction="pip install mteb[faiss-cpu]",
        )

        self.directory = Path(directory)
        self.directory.mkdir(parents=True, exist_ok=True)
        self.index_file = self.directory / "vectors.faiss"
        self.map_file = self.directory / "index.json"

        # Maps item hash -> insertion position of its vector in the FAISS index.
        self.hash_to_index: dict[str, int] = {}
        # NOTE: this annotation is never evaluated at runtime (PEP 526), so
        # referencing `faiss` here is safe despite the lazy imports in methods.
        self.index: faiss.Index | None = None
        self.vector_dim: int | None = None

        logger.info(f"Initialized FAISS VectorCacheMap in {self.directory}")
        self.load()

    def add(self, items: list[dict[str, Any]], vectors: Array) -> None:
        """Add vectors to the FAISS index, skipping items already cached.

        Args:
            items: Items whose hashes key the cached vectors.
            vectors: Vector(s) to store; a 1-D vector is treated as one row.
        """
        import faiss

        if vectors.ndim == 1:
            vectors = vectors[None, :]
        if self.vector_dim is None:
            self.vector_dim = vectors.shape[1]
        if self.index is None:
            self.index = faiss.IndexFlatL2(self.vector_dim)

        start_id = len(self.hash_to_index)
        vectors_to_add = []
        for item, vec in zip(items, vectors):
            item_hash = _hash_item(item)
            if item_hash in self.hash_to_index:
                continue
            # Fix: the stored id must equal the vector's insertion position in
            # the FAISS index. The previous `start_id + i` (enumerate counter)
            # skewed every id after a skipped duplicate, so reconstruct()
            # returned the wrong vectors. Count only vectors actually appended.
            self.hash_to_index[item_hash] = start_id + len(vectors_to_add)
            vectors_to_add.append(vec)
        if len(vectors_to_add) > 0:
            vectors_array = np.vstack(vectors_to_add).astype(np.float32)
            self.index.add(vectors_array)

    def get_vector(self, item: dict[str, Any]) -> Array | None:
        """Retrieve the cached vector for ``item`` by hash, or None if absent."""
        if self.index is None:
            return None
        item_hash = _hash_item(item)
        if item_hash not in self.hash_to_index:
            return None
        idx = self.hash_to_index[item_hash]
        try:
            # IndexFlatL2 supports exact reconstruction of stored vectors.
            return self.index.reconstruct(idx)
        except Exception:
            msg = f"Vector id {idx} missing for hash {item_hash}"
            logger.warning(msg)
            warnings.warn(msg)
            return None

    def save(self) -> None:
        """Persist FAISS index and mapping to disk."""
        import faiss

        if self.index is not None:
            faiss.write_index(self.index, str(self.index_file))
        with self.map_file.open("w") as f:
            json.dump(self.hash_to_index, f, indent=2)
        logger.info(f"Saved FAISS cache to {self.directory}")

    def load(self) -> None:
        """Load FAISS index and mapping from disk, if present."""
        import faiss

        if self.map_file.exists():
            with self.map_file.open() as f:
                self.hash_to_index = json.load(f)
        if self.index_file.exists():
            try:
                self.index = faiss.read_index(str(self.index_file))
                logger.info(f"Loaded FAISS index with {self.index.ntotal} vectors")
            except Exception as e:
                # A corrupt index file degrades to an empty cache rather than
                # failing construction.
                logger.error(f"Failed to load FAISS index: {e}")
                self.index = None
        else:
            self.index = None

    def close(self) -> None:
        """Close cache, persisting state and releasing the in-memory index."""
        self.save()
        self.index = None

    def __contains__(self, item: BatchedInput) -> bool:
        """Return True when a vector has been cached for ``item``."""
        return _hash_item(item) in self.hash_to_index

    def __del__(self):
        # Best-effort persistence on garbage collection.
        self.close()

add(items, vectors)

Add vector to FAISS index.

Source code in mteb/models/cache_wrappers/cache_backends/faiss_cache.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def add(self, items: list[dict[str, Any]], vectors: Array) -> None:
    """Add vectors to the FAISS index, skipping items already cached.

    Args:
        items: Items whose hashes key the cached vectors.
        vectors: Vector(s) to store; a 1-D vector is treated as one row.
    """
    import faiss

    if vectors.ndim == 1:
        vectors = vectors[None, :]
    if self.vector_dim is None:
        self.vector_dim = vectors.shape[1]
    if self.index is None:
        self.index = faiss.IndexFlatL2(self.vector_dim)

    start_id = len(self.hash_to_index)
    vectors_to_add = []
    for item, vec in zip(items, vectors):
        item_hash = _hash_item(item)
        if item_hash in self.hash_to_index:
            continue
        # Fix: the stored id must equal the vector's insertion position in
        # the FAISS index. The previous `start_id + i` (enumerate counter)
        # skewed every id after a skipped duplicate, so reconstruct()
        # returned the wrong vectors. Count only vectors actually appended.
        self.hash_to_index[item_hash] = start_id + len(vectors_to_add)
        vectors_to_add.append(vec)
    if len(vectors_to_add) > 0:
        vectors_array = np.vstack(vectors_to_add).astype(np.float32)
        self.index.add(vectors_array)

close()

Close cache.

Source code in mteb/models/cache_wrappers/cache_backends/faiss_cache.py
113
114
115
116
def close(self) -> None:
    """Close cache."""
    # Persist index and mapping before dropping the in-memory FAISS index.
    self.save()
    self.index = None

get_vector(item)

Retrieve vector from index by hash.

Source code in mteb/models/cache_wrappers/cache_backends/faiss_cache.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def get_vector(self, item: dict[str, Any]) -> Array | None:
    """Retrieve the cached vector for ``item`` by hash, or None if absent."""
    if self.index is None:
        return None
    item_hash = _hash_item(item)
    idx = self.hash_to_index.get(item_hash)
    if idx is None:
        return None
    try:
        # IndexFlatL2 supports exact reconstruction of stored vectors.
        return self.index.reconstruct(idx)
    except Exception:
        msg = f"Vector id {idx} missing for hash {item_hash}"
        logger.warning(msg)
        warnings.warn(msg)
        return None

load()

Load FAISS index and mapping from disk.

Source code in mteb/models/cache_wrappers/cache_backends/faiss_cache.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def load(self) -> None:
    """Load FAISS index and mapping from disk, if present."""
    import faiss

    if self.map_file.exists():
        with self.map_file.open() as f:
            self.hash_to_index = json.load(f)

    if not self.index_file.exists():
        self.index = None
        return
    try:
        self.index = faiss.read_index(str(self.index_file))
    except Exception as e:
        # A corrupt index file degrades to an empty cache instead of failing.
        logger.error(f"Failed to load FAISS index: {e}")
        self.index = None
    else:
        logger.info(f"Loaded FAISS index with {self.index.ntotal} vectors")

save()

Persist FAISS index and mapping to disk.

Source code in mteb/models/cache_wrappers/cache_backends/faiss_cache.py
86
87
88
89
90
91
92
93
94
def save(self) -> None:
    """Write the FAISS index and the hash-to-id mapping to disk."""
    import faiss

    index = self.index
    if index is not None:
        faiss.write_index(index, str(self.index_file))
    with self.map_file.open("w") as handle:
        json.dump(self.hash_to_index, handle, indent=2)
    logger.info(f"Saved FAISS cache to {self.directory}")

Compression Wrapper

mteb.models.CompressionWrapper

Wraps a model to quantize the embeddings and compute results on the compressed vectors instead.

Examples:

>>> import mteb
>>> from mteb.models import CompressionWrapper
>>> from mteb.types import OutputDType
>>> model = mteb.get_model("sentence-transformers/all-MiniLM-L6-v2")
>>> compression_model = CompressionWrapper(model, OutputDType.INT8)
>>> task = mteb.get_task("NanoArguAnaRetrieval")
>>> mteb.evaluate(compression_model, task)
Source code in mteb/models/compression_wrappers/compression_wrapper.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
class CompressionWrapper:
    """Wraps a model to quantize the embeddings and compute results on the compressed vectors instead.

    Examples:
        >>> import mteb
        >>> from mteb.models import CompressionWrapper
        >>> from mteb.types import OutputDType
        >>> model = mteb.get_model("sentence-transformers/all-MiniLM-L6-v2")
        >>> compression_model = CompressionWrapper(model, OutputDType.INT8)
        >>> task = mteb.get_task("NanoArguAnaRetrieval")
        >>> mteb.evaluate(compression_model, task)
    """

    def __init__(
        self,
        model: EncoderProtocol,
        output_dtype: OutputDType = OutputDType.INT8,
        clipping_margin: tuple[float, float] | None = None,
    ) -> None:
        """Instantiates the wrapper with an embedding model and sets the quantization level.

        Args:
            model: The model to produce quantized embeddings.
            output_dtype: The output data type to compress to. Has to be supported by the quantize_embeddings method.
            clipping_margin: Optional lower and upper percentiles to crop embeddings before integer quantization.
        """
        self.model = model
        self._quantization_level = output_dtype
        # Replaced with a tensor below when a valid clipping_margin is given.
        self.clipping_margin = None
        # Minimum sample size before _get_min_max_per_dim warns that the
        # integer-quantization statistics may be unstable.
        self.min_embeds = 10_000
        meta = model.mteb_model_meta
        embed_types = meta.output_dtypes
        exp_kwargs = dict(meta.experiment_kwargs) if meta.experiment_kwargs else {}
        # NOTE(review): stored as a scalar string here but as a list in the
        # model_copy update below — confirm which shape consumers expect.
        exp_kwargs["output_dtypes"] = output_dtype.value

        if clipping_margin is not None:
            # Percentiles must form a strictly increasing pair inside (0, 1).
            if not 0 < clipping_margin[0] < clipping_margin[1] < 1:
                raise ValueError(
                    f"Clipping margin must be between 0 and 1 with lower bound {clipping_margin[0]} < "
                    f"upper bound {clipping_margin[1]}, but got {clipping_margin}."
                )
            self.clipping_margin = torch.tensor(clipping_margin)
            exp_kwargs["clipping_margin"] = list(clipping_margin)

        # Record the effective output dtype/experiment settings on the wrapped
        # model's metadata so evaluation results carry the compression config.
        model.mteb_model_meta = meta.model_copy(  # type: ignore[misc]
            update={
                "output_dtypes": [output_dtype],
                "experiment_kwargs": exp_kwargs,
            }
        )
        if embed_types and output_dtype in embed_types:
            msg = (
                f"The model {model.mteb_model_meta.name} natively supports quantization to {output_dtype.value} and "
                f"can be configured to return a compressed embedding vector without using the wrapper. Please note "
                f"that performance on compressed embedding might be better when using compressed embeddings returned "
                f"directly by the model."
            )
            logger.warning(msg)
            warnings.warn(msg)
        logger.info("Initialized CompressionWrapper.")

    @property
    def mteb_model_meta(self) -> ModelMeta | None:
        """Return wrapped model meta data."""
        return self.model.mteb_model_meta

    def encode(
        self,
        inputs: DataLoader[BatchedInput],
        *,
        task_metadata: TaskMetadata,
        hf_split: str,
        hf_subset: str,
        prompt_type: PromptType | None = None,
        batch_size: int = 32,
        **kwargs: Any,
    ) -> Array:
        """Encodes the given inputs using the encoder, then quantizes the embeddings.

        Generates embeddings for the given inputs, then compresses them based on the specified output dtype. While
        embeddings returned by this function are compressed to the value range determined by the output type, it returns
        32- or 16-bit floats to avoid issues with potential downstream calculations and array conversions.

        Args:
            inputs: Batch of inputs to encode.
            task_metadata: The metadata of the task.
            hf_split: Split of current task
            hf_subset: Subset of current task
            prompt_type: The name type of prompt. (query or passage)
            batch_size: Batch size
            **kwargs: Additional arguments to pass to the encoder.

        Returns:
            The encoded and quantized input in an array of the shape (Number of sentences) x (Embedding dimension).
        """
        embeddings = self.model.encode(
            inputs,
            task_metadata=task_metadata,
            hf_split=hf_split,
            hf_subset=hf_subset,
            prompt_type=prompt_type,
            batch_size=batch_size,
            **kwargs,
        )

        # Quantization below is implemented with torch ops, so coerce
        # array-like results to a float tensor first.
        if not isinstance(embeddings, torch.Tensor):
            embeddings = torch.tensor(embeddings).float()

        logger.info(f"Quantizing embeddings to {self._quantization_level.value}.")
        return self._quantize_embeddings(embeddings)

    def _quantize_embeddings(
        self,
        embeddings: torch.Tensor,
    ) -> Array:
        """Compresses embeddings to represent each dimension with lower bit-precision.

        Takes full-precision embeddings as input and quantizes them to the chosen bit range. When quantizing to
        integers, the minimum and maximum values per dimension need to be estimated first.

        Args:
            embeddings: The embeddings to quantize.

        Returns:
            The quantized embeddings.
        """
        torch_dtype = self._quantization_level.get_dtype()
        if self._quantization_level in [  # noqa: PLR6201
            OutputDType.FLOAT8_E4M3FN,
            OutputDType.FLOAT8_E5M2,
            OutputDType.FLOAT8_E8M0FNU,
            OutputDType.FLOAT8_E4M3FNUZ,
            OutputDType.FLOAT8_E5M2FNUZ,
            OutputDType.FLOAT16,
        ]:
            # Cast to float8, then back to float16 using PyTorch as numpy doesn't support float8
            quantized = embeddings.type(torch_dtype).type(torch.float16)
        elif self._quantization_level == OutputDType.BF16:
            # Cast to bf16, then back to float32 using PyTorch as numpy doesn't support bf16
            quantized = embeddings.type(torch_dtype).float()
        elif self._quantization_level in [  # noqa: PLR6201
            OutputDType.INT8,
            OutputDType.UINT8,
            OutputDType.INT4,
            OutputDType.UINT4,
        ]:
            num_bits = (
                8
                if self._quantization_level in [OutputDType.INT8, OutputDType.UINT8]  # noqa: PLR6201
                else 4
            )
            if self.clipping_margin is not None:
                # Clip per-dimension outliers to the configured percentiles
                # before estimating the quantization range.
                cutoffs = torch.quantile(embeddings, self.clipping_margin, dim=0)
                embeddings = torch.clip(embeddings, cutoffs[0], cutoffs[1])
            mins, maxs = self._get_min_max_per_dim(embeddings)
            # NOTE(review): a dimension with maxs == mins yields steps == 0 and
            # a division by zero below — presumably unreachable for real
            # embeddings; confirm.
            steps = (maxs - mins) / (2**num_bits - 1)
            # Signed dtypes are shifted by half the bin count so values center
            # around zero; unsigned dtypes keep the [0, 2**num_bits - 1] range.
            subtract = (
                0
                if self._quantization_level in [OutputDType.UINT8, OutputDType.UINT4]  # noqa: PLR6201
                else int(2**num_bits * 0.5)
            )
            quantized = torch.floor((embeddings - mins) / steps) - subtract
        elif self._quantization_level == OutputDType.BINARY:
            # Sign-based binarization: positive components map to 1, rest to 0.
            quantized = torch.where(embeddings > 0, 1.0, 0.0)
        else:
            raise ValueError(
                f"Quantization method '{self._quantization_level.value}' is not supported!"
            )
        return quantized

    def _get_min_max_per_dim(
        self,
        embeddings: torch.Tensor,
    ) -> tuple[torch.Tensor, torch.Tensor]:
        """Computes thresholds for integer quantization.

        Calculates the minimum and maximum values per embedding dimension and returns these. The values are used to
        estimate the bin boundaries used to map floating points to discrete integer values. If the number of passed
        embeddings is small, a warning is raised that the calculated values might be unstable.

        Args:
            embeddings: The embeddings for which minima and maxima should be calculated.

        Returns:
            The minimum and maximum values per embedding dimension.
        """
        if len(embeddings) < self.min_embeds:
            msg = (
                f"Estimating quantization parameters on less than {self.min_embeds} embeddings (only "
                f"{len(embeddings)}). Parameters are likely unstable and results might not generalize."
            )
            logger.warning(msg)
            warnings.warn(msg)

        mins, maxs = (
            torch.min(embeddings, dim=0).values,
            torch.max(embeddings, dim=0).values,
        )
        return mins, maxs

    def similarity(
        self,
        embeddings1: Array,
        embeddings2: Array,
    ) -> Array:
        """Refer to [EncoderProtocol.similarity][mteb.models.EncoderProtocol.similarity] for more details."""
        return self.model.similarity(embeddings1, embeddings2)

    def similarity_pairwise(
        self,
        embeddings1: Array,
        embeddings2: Array,
    ) -> Array:
        """Refer to [EncoderProtocol.similarity][mteb.models.EncoderProtocol.similarity_pairwise] for more details."""
        return self.model.similarity_pairwise(embeddings1, embeddings2)

mteb_model_meta property

Return wrapped model meta data.

__init__(model, output_dtype=OutputDType.INT8, clipping_margin=None)

Instantiates the wrapper with an embedding model and sets the quantization level.

Parameters:

Name Type Description Default
model EncoderProtocol

The model to produce quantized embeddings.

required
output_dtype OutputDType

The output data type to compress to. Has to be supported by the quantize_embeddings method.

INT8
clipping_margin tuple[float, float] | None

Optional lower and upper percentiles to crop embeddings before integer quantization.

None
Source code in mteb/models/compression_wrappers/compression_wrapper.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def __init__(
    self,
    model: EncoderProtocol,
    output_dtype: OutputDType = OutputDType.INT8,
    clipping_margin: tuple[float, float] | None = None,
) -> None:
    """Instantiates the wrapper with an embedding model and sets the quantization level.

    Args:
        model: The model to produce quantized embeddings.
        output_dtype: The output data type to compress to. Has to be supported by the quantize_embeddings method.
        clipping_margin: Optional lower and upper percentiles to crop embeddings before integer quantization.
    """
    self.model = model
    self._quantization_level = output_dtype
    # Replaced with a tensor below when a valid clipping_margin is given.
    self.clipping_margin = None
    # Minimum sample size before _get_min_max_per_dim warns that the
    # integer-quantization statistics may be unstable.
    self.min_embeds = 10_000
    meta = model.mteb_model_meta
    embed_types = meta.output_dtypes
    exp_kwargs = dict(meta.experiment_kwargs) if meta.experiment_kwargs else {}
    # NOTE(review): stored as a scalar string here but as a list in the
    # model_copy update below — confirm which shape consumers expect.
    exp_kwargs["output_dtypes"] = output_dtype.value

    if clipping_margin is not None:
        # Percentiles must form a strictly increasing pair inside (0, 1).
        if not 0 < clipping_margin[0] < clipping_margin[1] < 1:
            raise ValueError(
                f"Clipping margin must be between 0 and 1 with lower bound {clipping_margin[0]} < "
                f"upper bound {clipping_margin[1]}, but got {clipping_margin}."
            )
        self.clipping_margin = torch.tensor(clipping_margin)
        exp_kwargs["clipping_margin"] = list(clipping_margin)

    # Record the effective output dtype/experiment settings on the wrapped
    # model's metadata so evaluation results carry the compression config.
    model.mteb_model_meta = meta.model_copy(  # type: ignore[misc]
        update={
            "output_dtypes": [output_dtype],
            "experiment_kwargs": exp_kwargs,
        }
    )
    if embed_types and output_dtype in embed_types:
        msg = (
            f"The model {model.mteb_model_meta.name} natively supports quantization to {output_dtype.value} and "
            f"can be configured to return a compressed embedding vector without using the wrapper. Please note "
            f"that performance on compressed embedding might be better when using compressed embeddings returned "
            f"directly by the model."
        )
        logger.warning(msg)
        warnings.warn(msg)
    logger.info("Initialized CompressionWrapper.")

encode(inputs, *, task_metadata, hf_split, hf_subset, prompt_type=None, batch_size=32, **kwargs)

Encodes the given inputs using the encoder, then quantizes the embeddings.

Generates embeddings for the given inputs, then compresses them based on the specified output dtype. While embeddings returned by this function are compressed to the value range determined by the output type, it returns 32- or 16-bit floats to avoid issues with potential downstream calculations and array conversions.

Parameters:

Name Type Description Default
inputs DataLoader[BatchedInput]

Batch of inputs to encode.

required
task_metadata TaskMetadata

The metadata of the task.

required
hf_split str

Split of current task

required
hf_subset str

Subset of current task

required
prompt_type PromptType | None

The name type of prompt. (query or passage)

None
batch_size int

Batch size

32
**kwargs Any

Additional arguments to pass to the encoder.

{}

Returns:

Type Description
Array

The encoded and quantized input in an array of the shape (Number of sentences) x (Embedding dimension).

Source code in mteb/models/compression_wrappers/compression_wrapper.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def encode(
    self,
    inputs: DataLoader[BatchedInput],
    *,
    task_metadata: TaskMetadata,
    hf_split: str,
    hf_subset: str,
    prompt_type: PromptType | None = None,
    batch_size: int = 32,
    **kwargs: Any,
) -> Array:
    """Encode ``inputs`` with the wrapped model and quantize the result.

    The wrapped encoder produces full-precision embeddings; these are then
    compressed to the configured output dtype. The returned values are still
    32- or 16-bit floats (holding the compressed value range) so downstream
    array conversions and similarity computations keep working.

    Args:
        inputs: Batch of inputs to encode.
        task_metadata: The metadata of the task.
        hf_split: Split of current task
        hf_subset: Subset of current task
        prompt_type: The name type of prompt. (query or passage)
        batch_size: Batch size
        **kwargs: Additional arguments to pass to the encoder.

    Returns:
        The encoded and quantized input, shaped (num sentences, embedding dim).
    """
    raw = self.model.encode(
        inputs,
        task_metadata=task_metadata,
        hf_split=hf_split,
        hf_subset=hf_subset,
        prompt_type=prompt_type,
        batch_size=batch_size,
        **kwargs,
    )

    # Quantization uses torch ops, so coerce array-likes to a float tensor.
    if not isinstance(raw, torch.Tensor):
        raw = torch.tensor(raw).float()

    logger.info(f"Quantizing embeddings to {self._quantization_level.value}.")
    return self._quantize_embeddings(raw)

similarity(embeddings1, embeddings2)

Refer to EncoderProtocol.similarity for more details.

Source code in mteb/models/compression_wrappers/compression_wrapper.py
222
223
224
225
226
227
228
def similarity(
    self,
    embeddings1: Array,
    embeddings2: Array,
) -> Array:
    """Refer to [EncoderProtocol.similarity][mteb.models.EncoderProtocol.similarity] for more details."""
    return self.model.similarity(embeddings1, embeddings2)

similarity_pairwise(embeddings1, embeddings2)

Refer to EncoderProtocol.similarity for more details.

Source code in mteb/models/compression_wrappers/compression_wrapper.py
230
231
232
233
234
235
236
def similarity_pairwise(
    self,
    embeddings1: Array,
    embeddings2: Array,
) -> Array:
    """Refer to [EncoderProtocol.similarity][mteb.models.EncoderProtocol.similarity_pairwise] for more details."""
    return self.model.similarity_pairwise(embeddings1, embeddings2)

Search Index Backends

mteb.models.search_encoder_index.search_backend_protocol.IndexEncoderSearchProtocol

Bases: Protocol

Protocol for search backends used in encoder-based retrieval.

Source code in mteb/models/search_encoder_index/search_backend_protocol.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class IndexEncoderSearchProtocol(Protocol):
    """Protocol for search backends used in encoder-based retrieval.

    Implementations are matched structurally (typing.Protocol): any class
    providing these three methods with compatible signatures conforms.
    """

    def add_documents(
        self,
        embeddings: Array,
        idxs: list[str],
    ) -> None:
        """Add documents to the search backend.

        Args:
            embeddings: Embeddings of the documents to add.
            idxs: IDs of the documents to add.
        """

    def search(
        self,
        embeddings: Array,
        top_k: int,
        similarity_fn: Callable[[Array, Array], Array],
        top_ranked: TopRankedDocumentsType | None = None,
        query_idx_to_id: dict[int, str] | None = None,
    ) -> tuple[list[list[float]], list[list[int]]]:
        """Search through added corpus embeddings or rerank top-ranked documents.

        Supports both full-corpus and reranking search modes:
            - Full-corpus mode: `top_ranked=None`, uses added corpus embeddings.
            - Reranking mode:  `top_ranked` contains mapping {query_id: [doc_ids]}.

        Args:
            embeddings: Query embeddings, shape (num_queries, dim).
            top_k: Number of top results to return.
            similarity_fn: Function to compute similarity between query and corpus.
            top_ranked: Mapping of query_id -> list of candidate doc_ids. Used for reranking.
            query_idx_to_id: Mapping of query index -> query_id. Used for reranking.

        Returns:
            A tuple (top_k_values, top_k_indices), for each query:
                - top_k_values: List of top-k similarity scores.
                - top_k_indices: List of indices of the top-k documents in the added corpus.
        """

    def clear(self) -> None:
        """Clear all stored documents and embeddings from the backend."""

add_documents(embeddings, idxs)

Add documents to the search backend.

Parameters:

Name Type Description Default
embeddings Array

Embeddings of the documents to add.

required
idxs list[str]

IDs of the documents to add.

required
Source code in mteb/models/search_encoder_index/search_backend_protocol.py
14
15
16
17
18
19
20
21
22
23
24
def add_documents(
    self,
    embeddings: Array,
    idxs: list[str],
) -> None:
    """Add documents to the search backend.

    Protocol stub: implementations store the embeddings so that later
    `search` calls can score queries against them.

    Args:
        embeddings: Embeddings of the documents to add.
        idxs: IDs of the documents to add, one per embedding row.
    """

clear()

Clear all stored documents and embeddings from the backend.

Source code in mteb/models/search_encoder_index/search_backend_protocol.py
53
54
def clear(self) -> None:
    """Clear all stored documents and embeddings from the backend.

    After this call the backend holds no corpus state; documents must be
    added again before searching.
    """

search(embeddings, top_k, similarity_fn, top_ranked=None, query_idx_to_id=None)

Search through added corpus embeddings or rerank top-ranked documents.

Supports both full-corpus and reranking search modes:
  • Full-corpus mode: top_ranked=None, uses added corpus embeddings.
  • Reranking mode: top_ranked contains mapping {query_id: [doc_ids]}.

Parameters:

Name Type Description Default
embeddings Array

Query embeddings, shape (num_queries, dim).

required
top_k int

Number of top results to return.

required
similarity_fn Callable[[Array, Array], Array]

Function to compute similarity between query and corpus.

required
top_ranked TopRankedDocumentsType | None

Mapping of query_id -> list of candidate doc_ids. Used for reranking.

None
query_idx_to_id dict[int, str] | None

Mapping of query index -> query_id. Used for reranking.

None

Returns:

Type Description
tuple[list[list[float]], list[list[int]]]

A tuple (top_k_values, top_k_indices), for each query:
  • top_k_values: List of top-k similarity scores.
  • top_k_indices: List of indices of the top-k documents in the added corpus.

Source code in mteb/models/search_encoder_index/search_backend_protocol.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def search(
    self,
    embeddings: Array,
    top_k: int,
    similarity_fn: Callable[[Array, Array], Array],
    top_ranked: TopRankedDocumentsType | None = None,
    query_idx_to_id: dict[int, str] | None = None,
) -> tuple[list[list[float]], list[list[int]]]:
    """Search through added corpus embeddings or rerank top-ranked documents.

    Supports both full-corpus and reranking search modes:
        - Full-corpus mode: `top_ranked=None`, uses added corpus embeddings.
        - Reranking mode:  `top_ranked` contains mapping {query_id: [doc_ids]}.

    When reranking, `query_idx_to_id` maps row positions of `embeddings` to
    query IDs so each query's candidates can be looked up in `top_ranked`.

    Args:
        embeddings: Query embeddings, shape (num_queries, dim).
        top_k: Number of top results to return.
        similarity_fn: Function to compute similarity between query and corpus.
        top_ranked: Mapping of query_id -> list of candidate doc_ids. Used for reranking.
        query_idx_to_id: Mapping of query index -> query_id. Used for reranking.

    Returns:
        A tuple (top_k_values, top_k_indices), for each query:
            - top_k_values: List of top-k similarity scores.
            - top_k_indices: List of indices of the top-k documents in the added corpus.
    """

mteb.models.search_encoder_index.search_indexes.faiss_search_index.FaissSearchIndex

FAISS-based backend for encoder-based search.

Supports both full-corpus retrieval and reranking (via top_ranked).

Notes
  • Stores all embeddings in memory (IndexFlatIP or IndexFlatL2).
  • Expects embeddings to be normalized if cosine similarity is desired.
Source code in mteb/models/search_encoder_index/search_indexes/faiss_search_index.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class FaissSearchIndex:
    """FAISS-based backend for encoder-based search.

    Supports both full-corpus retrieval and reranking (via `top_ranked`).

    Notes:
        - Stores *all* embeddings in memory (IndexFlatIP or IndexFlatL2).
        - Expects embeddings to be normalized if cosine similarity is desired.
    """

    # True only for cosine similarity: vectors are L2-normalized so that the
    # inner-product index yields cosine scores.
    _normalize: bool = False

    def __init__(self, model: EncoderProtocol) -> None:
        """Select the FAISS index type matching the model's similarity function.

        Args:
            model: Encoder whose `mteb_model_meta.similarity_fn_name` decides
                between an inner-product (dot/cosine) and an L2 index.

        Raises:
            ValueError: If the model's similarity function is unsupported.
        """
        requires_package(
            self,
            "faiss",
            "FAISS-based search",
            install_instruction="pip install mteb[faiss-cpu]",
        )

        from faiss import IndexFlatIP, IndexFlatL2

        # https://github.com/facebookresearch/faiss/wiki/Faiss-indexes
        if model.mteb_model_meta.similarity_fn_name is ScoringFunction.DOT_PRODUCT:
            self.index_type = IndexFlatIP
        elif model.mteb_model_meta.similarity_fn_name is ScoringFunction.COSINE:
            self.index_type = IndexFlatIP
            self._normalize = True
        elif model.mteb_model_meta.similarity_fn_name is ScoringFunction.EUCLIDEAN:
            self.index_type = IndexFlatL2
        else:
            raise ValueError(
                f"FAISS backend does not support similarity function {model.mteb_model_meta.similarity_fn_name}. "
                f"Available: {ScoringFunction.DOT_PRODUCT}, {ScoringFunction.COSINE}, {ScoringFunction.EUCLIDEAN}."
            )

        self.idxs: list[str] = []
        self.index: faiss.Index | None = None

    def add_documents(self, embeddings: Array, idxs: list[str]) -> None:
        """Add all document embeddings and their IDs to FAISS index."""
        import faiss

        if isinstance(embeddings, torch.Tensor):
            embeddings = embeddings.detach().cpu().numpy()

        # FAISS indexes only operate on float32.
        embeddings = embeddings.astype(np.float32)
        self.idxs.extend(idxs)

        if self._normalize:
            faiss.normalize_L2(embeddings)

        dim = embeddings.shape[1]
        # Lazily create the index on the first batch, sized to the embedding dim.
        if self.index is None:
            self.index = self.index_type(dim)

        self.index.add(embeddings)
        logger.info(f"FAISS index built with {len(idxs)} vectors of dim {dim}.")

    def search(
        self,
        embeddings: Array,
        top_k: int,
        similarity_fn: Callable[[Array, Array], Array],
        top_ranked: TopRankedDocumentsType | None = None,
        query_idx_to_id: dict[int, str] | None = None,
    ) -> tuple[list[list[float]], list[list[int]]]:
        """Search using FAISS.

        Args:
            embeddings: Query embeddings, shape (num_queries, dim).
            top_k: Number of top results to return.
            similarity_fn: Unused by this backend; scoring is determined by the
                index type chosen in `__init__`. Kept for protocol compatibility.
            top_ranked: Mapping of query_id -> candidate doc_ids (reranking).
            query_idx_to_id: Mapping of query index -> query_id (reranking).

        Returns:
            Tuple of per-query top-k scores and per-query document indices.

        Raises:
            ValueError: If no index has been built, or `top_ranked` is given
                without `query_idx_to_id`.
        """
        import faiss

        if self.index is None:
            # Fixed message: the public method is add_documents(), not add_document().
            raise ValueError("No index built. Call add_documents() first.")

        if isinstance(embeddings, torch.Tensor):
            embeddings = embeddings.detach().cpu().numpy()

        # faiss.normalize_L2 and Index.search require float32 input; convert
        # once up front (add_documents does the same for corpus vectors).
        embeddings = embeddings.astype(np.float32)

        if self._normalize:
            faiss.normalize_L2(embeddings)

        if top_ranked is not None:
            if query_idx_to_id is None:
                raise ValueError("query_idx_to_id must be provided when reranking.")

            similarities, ids = self._reranking(
                embeddings,
                top_k,
                top_ranked=top_ranked,
                query_idx_to_id=query_idx_to_id,
            )
        else:
            similarities, ids = self.index.search(embeddings, top_k)
            similarities = similarities.tolist()
            ids = ids.tolist()

        if issubclass(self.index_type, faiss.IndexFlatL2):
            # IndexFlatL2 returns *squared* L2 distances; convert to negative
            # distances so larger scores are better. Done per row so ragged
            # reranking results (queries with fewer than top_k candidates, or
            # none) do not break the numpy conversion.
            similarities = [
                (-np.sqrt(np.maximum(row, 0))).tolist() for row in similarities
            ]

        return similarities, ids

    def _reranking(
        self,
        embeddings: Array,
        top_k: int,
        top_ranked: TopRankedDocumentsType,
        query_idx_to_id: dict[int, str],
    ) -> tuple[list[list[float]], list[list[int]]]:
        """Score each query against only its `top_ranked` candidate documents.

        Builds a small throwaway FAISS index per query from the candidates'
        stored vectors, then searches it.
        """
        doc_id_to_idx = {doc_id: i for i, doc_id in enumerate(self.idxs)}
        scores_all: list[list[float]] = []
        idxs_all: list[list[int]] = []

        for query_idx, query_emb in enumerate(embeddings):
            query_id = query_idx_to_id[query_idx]
            ranked_ids = top_ranked.get(query_id)
            if not ranked_ids:
                msg = f"No top-ranked documents for query {query_id}"
                logger.warning(msg)
                warnings.warn(msg)
                # Keep outputs aligned with the query order.
                scores_all.append([])
                idxs_all.append([])
                continue

            candidate_indices = [doc_id_to_idx[doc_id] for doc_id in ranked_ids]
            d = self.index.d  # type: ignore[union-attr]
            # Recover the candidates' stored vectors from the main index.
            candidate_embs = np.vstack(
                [self.index.reconstruct(idx) for idx in candidate_indices]  # type: ignore[union-attr]
            )
            sub_reranking_index = self.index_type(d)
            sub_reranking_index.add(candidate_embs)

            # Search returns scores and indices in one call
            scores, local_indices = sub_reranking_index.search(
                query_emb.reshape(1, -1).astype(np.float32),
                min(top_k, len(candidate_indices)),
            )
            # faiss will output 2d arrays even for single query
            scores_all.append(scores[0].tolist())
            idxs_all.append(local_indices[0].tolist())

        return scores_all, idxs_all

    def clear(self) -> None:
        """Clear all stored documents and embeddings from the backend."""
        self.index = None
        self.idxs = []

add_documents(embeddings, idxs)

Add all document embeddings and their IDs to FAISS index.

Source code in mteb/models/search_encoder_index/search_indexes/faiss_search_index.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def add_documents(self, embeddings: Array, idxs: list[str]) -> None:
    """Add all document embeddings and their IDs to FAISS index."""
    import faiss

    # Accept torch tensors as well as numpy arrays.
    if isinstance(embeddings, torch.Tensor):
        embeddings = embeddings.detach().cpu().numpy()

    # FAISS indexes only operate on float32.
    embeddings = embeddings.astype(np.float32)
    self.idxs.extend(idxs)

    # For cosine similarity, vectors are L2-normalized so that the
    # inner-product index yields cosine scores.
    if self._normalize:
        faiss.normalize_L2(embeddings)

    dim = embeddings.shape[1]
    # Lazily create the index on the first batch, sized to the embedding dim.
    if self.index is None:
        self.index = self.index_type(dim)

    self.index.add(embeddings)
    logger.info(f"FAISS index built with {len(idxs)} vectors of dim {dim}.")

clear()

Clear all stored documents and embeddings from the backend.

Source code in mteb/models/search_encoder_index/search_indexes/faiss_search_index.py
165
166
167
168
def clear(self) -> None:
    """Clear all stored documents and embeddings from the backend."""
    # Drop the FAISS index and the parallel list of document IDs.
    self.index = None
    self.idxs = []

search(embeddings, top_k, similarity_fn, top_ranked=None, query_idx_to_id=None)

Search using FAISS.

Source code in mteb/models/search_encoder_index/search_indexes/faiss_search_index.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def search(
    self,
    embeddings: Array,
    top_k: int,
    similarity_fn: Callable[[Array, Array], Array],
    top_ranked: TopRankedDocumentsType | None = None,
    query_idx_to_id: dict[int, str] | None = None,
) -> tuple[list[list[float]], list[list[int]]]:
    """Search using FAISS.

    Args:
        embeddings: Query embeddings, shape (num_queries, dim).
        top_k: Number of top results to return.
        similarity_fn: Unused by this backend; scoring is determined by the
            index type chosen at construction. Kept for protocol compatibility.
        top_ranked: Mapping of query_id -> candidate doc_ids (reranking).
        query_idx_to_id: Mapping of query index -> query_id (reranking).

    Returns:
        Tuple of per-query top-k scores and per-query document indices.

    Raises:
        ValueError: If no index has been built, or `top_ranked` is given
            without `query_idx_to_id`.
    """
    import faiss

    if self.index is None:
        # Fixed message: the public method is add_documents(), not add_document().
        raise ValueError("No index built. Call add_documents() first.")

    if isinstance(embeddings, torch.Tensor):
        embeddings = embeddings.detach().cpu().numpy()

    # faiss.normalize_L2 and Index.search require float32 input; convert
    # once up front (add_documents does the same for corpus vectors).
    embeddings = embeddings.astype(np.float32)

    if self._normalize:
        faiss.normalize_L2(embeddings)

    if top_ranked is not None:
        if query_idx_to_id is None:
            raise ValueError("query_idx_to_id must be provided when reranking.")

        similarities, ids = self._reranking(
            embeddings,
            top_k,
            top_ranked=top_ranked,
            query_idx_to_id=query_idx_to_id,
        )
    else:
        similarities, ids = self.index.search(embeddings, top_k)
        similarities = similarities.tolist()
        ids = ids.tolist()

    if issubclass(self.index_type, faiss.IndexFlatL2):
        # IndexFlatL2 returns *squared* L2 distances; convert to negative
        # distances so larger scores are better. Done per row so ragged
        # reranking results do not break the numpy conversion.
        similarities = [
            (-np.sqrt(np.maximum(row, 0))).tolist() for row in similarities
        ]

    return similarities, ids