
Tasks

A task is an implementation of a dataset for evaluation. It could, for instance, be the MIRACL dataset, consisting of queries, a corpus of documents, and the correct documents to retrieve for a given query. In addition to the dataset, a task includes the specification of how a model should be run on the dataset and how its output should be evaluated. Each task also comes with extensive metadata, including the license, who annotated the data, etc.
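
As a quick, illustrative sketch (assuming mteb is installed; the task name comes from the examples further down this page, and the fields accessed are documented under TaskMetadata below):

import mteb

# Fetch a single task by name.
task = mteb.get_task("BornholmBitextMining")

# Each task exposes its metadata, validated by pydantic (see TaskMetadata below).
print(task.metadata.name)       # the task name
print(task.metadata.type)       # e.g. "BitextMining"
print(task.metadata.languages)  # ISO 639-3 codes, e.g. ["eng"]
print(task.metadata.license)    # dataset license, if specified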

An overview of the tasks within mteb

Utilities

mteb.get_tasks

This script contains functions that are used to get an overview of the MTEB benchmark.

MTEBTasks

Bases: tuple[AbsTask]

A tuple of tasks with additional methods to get an overview of the tasks.

Source code in mteb/get_tasks.py
class MTEBTasks(tuple[AbsTask]):
    """A tuple of tasks with additional methods to get an overview of the tasks."""

    def __repr__(self) -> str:
        return "MTEBTasks" + super().__repr__()

    @staticmethod
    def _extract_property_from_task(task: AbsTask, property: str):
        if hasattr(task.metadata, property):
            return getattr(task.metadata, property)
        elif hasattr(task, property):
            return getattr(task, property)
        else:
            raise KeyError("Property neither in Task attribute or in task metadata.")

    @property
    def languages(self) -> set:
        """Return all languages from tasks"""
        langs = set()
        for task in self:
            for lg in task.languages:
                langs.add(lg)
        return langs

    def count_languages(self) -> Counter:
        """Summarize count of all languages from tasks

        Returns:
            Counter with language as key and count as value.
        """
        langs = []
        for task in self:
            langs.extend(task.languages)
        return Counter(langs)

    def to_markdown(
        self,
        properties: Sequence[str] = _DEFAULT_PROPRIETIES,
        limit_n_entries: int | None = 3,
    ) -> str:
        """Generate markdown table with tasks summary

        Args:
            properties: list of metadata to summarize from a Task class.
            limit_n_entries: Limit the number of entries for cell values, e.g. number of languages and domains. Will use "..." to indicate that
                there are more entries.

        Returns:
            string with a markdown table.
        """

        def _limit_entries_in_cell_inner(cell: Any):
            if isinstance(cell, list | set):
                return self._limit_entries_in_cell(cell, limit_n_entries)
            return cell

        markdown_table = "| Task" + "".join([f"| {p}  " for p in properties]) + "|\n"
        _head_sep = "| ---" * (len(properties) + 1) + " |\n"
        markdown_table += _head_sep
        for task in self:
            markdown_table += f"| {task.metadata.name} "
            markdown_table += "".join(
                [
                    f"| {_limit_entries_in_cell_inner(self._extract_property_from_task(task, p))} "
                    for p in properties
                ]
            )
            markdown_table += " |\n"
        return markdown_table

    def to_dataframe(
        self,
        properties: Sequence[str] = _DEFAULT_PROPRIETIES,
    ) -> pd.DataFrame:
        """Generate pandas DataFrame with tasks summary

        Args:
            properties: list of metadata to summarize from a Task class.

        Returns:
            pandas DataFrame.
        """
        data = []
        for task in self:
            data.append(
                {p: self._extract_property_from_task(task, p) for p in properties}
            )
        return pd.DataFrame(data)

    @staticmethod
    def _limit_entries_in_cell(
        cell: list | set, limit_n_entries: int | None = 3
    ) -> str:
        if limit_n_entries and len(cell) > limit_n_entries:
            ending = "]" if isinstance(cell, list) else "}"
            cell = sorted(cell)
            return str(cell[:limit_n_entries])[:-1] + ", ..." + ending
        else:
            return str(cell)

    def to_latex(
        self,
        properties: Sequence[str] = _DEFAULT_PROPRIETIES,
        group_indices: Sequence[str] | None = ("type", "name"),
        include_citation_in_name: bool = True,
        limit_n_entries: int | None = 3,
    ) -> str:
        """Generate a LaTeX table of the tasks.

        Args:
            properties: list of metadata to summarize from a Task class.
            group_indices: list of properties to group the table by.
            include_citation_in_name: Whether to include the citation in the name.
            limit_n_entries: Limit the number of entries for cell values, e.g. number of languages and domains. Will use "..." to indicate that
                there are more entries.

        Returns:
            string with a LaTeX table.
        """
        if include_citation_in_name and "name" in properties:
            properties += ["intext_citation"]
            df = self.to_dataframe(properties)
            df["name"] = df["name"] + " " + df["intext_citation"]
            df = df.drop(columns=["intext_citation"])
        else:
            df = self.to_dataframe(properties)

        if limit_n_entries and df.shape[0]:  # ensure that there are entries
            for col in df.columns:
                # check if content is a list or set
                if isinstance(df[col].iloc[0], list | set):
                    _col = []
                    for val in df[col]:
                        str_col = self._limit_entries_in_cell(val, limit_n_entries)

                        # escape } and { characters
                        str_col = str_col.replace("{", "\\{").replace("}", "\\}")
                        _col.append(str_col)
                    df[col] = _col

        if group_indices:
            df = df.set_index(group_indices)

        return df.to_latex()
languages property

Return all languages from tasks

count_languages()

Summarize count of all languages from tasks

Returns:

Counter: Counter with language as key and count as value.

Source code in mteb/get_tasks.py
def count_languages(self) -> Counter:
    """Summarize count of all languages from tasks

    Returns:
        Counter with language as key and count as value.
    """
    langs = []
    for task in self:
        langs.extend(task.languages)
    return Counter(langs)
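
A minimal sketch of these overview helpers (get_tasks, documented below, returns an MTEBTasks instance; the Classification filter is purely illustrative):

import mteb

tasks = mteb.get_tasks(task_types=["Classification"])  # an MTEBTasks tuple

print(len(tasks.languages))            # set of all languages covered by the tasks
print(tasks.count_languages()["eng"])  # number of tasks that include English
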
to_dataframe(properties=_DEFAULT_PROPRIETIES)

Generate pandas DataFrame with tasks summary

Parameters:

properties (Sequence[str], default: _DEFAULT_PROPRIETIES):
    list of metadata to summarize from a Task class.

Returns:

DataFrame: pandas DataFrame.

Source code in mteb/get_tasks.py
def to_dataframe(
    self,
    properties: Sequence[str] = _DEFAULT_PROPRIETIES,
) -> pd.DataFrame:
    """Generate pandas DataFrame with tasks summary

    Args:
        properties: list of metadata to summarize from a Task class.

    Returns:
        pandas DataFrame.
    """
    data = []
    for task in self:
        data.append(
            {p: self._extract_property_from_task(task, p) for p in properties}
        )
    return pd.DataFrame(data)
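
For example (the property names are TaskMetadata attributes; the task selection is illustrative):

import mteb

tasks = mteb.get_tasks(languages=["eng"], task_types=["Classification"])

# Summarize the selected tasks with the default properties...
df = tasks.to_dataframe()
print(df.head())

# ...or pick specific metadata fields.
df = tasks.to_dataframe(properties=["name", "type", "main_score"])
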
to_latex(properties=_DEFAULT_PROPRIETIES, group_indices=('type', 'name'), include_citation_in_name=True, limit_n_entries=3)

Generate a LaTeX table of the tasks.

Parameters:

properties (Sequence[str], default: _DEFAULT_PROPRIETIES):
    list of metadata to summarize from a Task class.
group_indices (Sequence[str] | None, default: ('type', 'name')):
    list of properties to group the table by.
include_citation_in_name (bool, default: True):
    Whether to include the citation in the name.
limit_n_entries (int | None, default: 3):
    Limit the number of entries for cell values, e.g. number of languages and domains. Will use "..." to indicate that there are more entries.

Returns:

str: string with a LaTeX table.

Source code in mteb/get_tasks.py
def to_latex(
    self,
    properties: Sequence[str] = _DEFAULT_PROPRIETIES,
    group_indices: Sequence[str] | None = ("type", "name"),
    include_citation_in_name: bool = True,
    limit_n_entries: int | None = 3,
) -> str:
    """Generate a LaTeX table of the tasks.

    Args:
        properties: list of metadata to summarize from a Task class.
        group_indices: list of properties to group the table by.
        include_citation_in_name: Whether to include the citation in the name.
        limit_n_entries: Limit the number of entries for cell values, e.g. number of languages and domains. Will use "..." to indicate that
            there are more entries.

    Returns:
        string with a LaTeX table.
    """
    if include_citation_in_name and "name" in properties:
        properties += ["intext_citation"]
        df = self.to_dataframe(properties)
        df["name"] = df["name"] + " " + df["intext_citation"]
        df = df.drop(columns=["intext_citation"])
    else:
        df = self.to_dataframe(properties)

    if limit_n_entries and df.shape[0]:  # ensure that there are entries
        for col in df.columns:
            # check if content is a list or set
            if isinstance(df[col].iloc[0], list | set):
                _col = []
                for val in df[col]:
                    str_col = self._limit_entries_in_cell(val, limit_n_entries)

                    # escape } and { characters
                    str_col = str_col.replace("{", "\\{").replace("}", "\\}")
                    _col.append(str_col)
                df[col] = _col

    if group_indices:
        df = df.set_index(group_indices)

    return df.to_latex()
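
A brief, illustrative sketch (the column selection is an assumption; "type" and "name" are kept so the default grouping still applies):

import mteb

tasks = mteb.get_tasks(task_types=["Clustering"])

# LaTeX table with the default properties, grouped by type and name,
# with in-text citations appended to the task names.
print(tasks.to_latex())

# Or restrict the columns explicitly.
print(tasks.to_latex(properties=["name", "type", "main_score"]))
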
to_markdown(properties=_DEFAULT_PROPRIETIES, limit_n_entries=3)

Generate markdown table with tasks summary

Parameters:

properties (Sequence[str], default: _DEFAULT_PROPRIETIES):
    list of metadata to summarize from a Task class.
limit_n_entries (int | None, default: 3):
    Limit the number of entries for cell values, e.g. number of languages and domains. Will use "..." to indicate that there are more entries.

Returns:

str: string with a markdown table.

Source code in mteb/get_tasks.py
def to_markdown(
    self,
    properties: Sequence[str] = _DEFAULT_PROPRIETIES,
    limit_n_entries: int | None = 3,
) -> str:
    """Generate markdown table with tasks summary

    Args:
        properties: list of metadata to summarize from a Task class.
        limit_n_entries: Limit the number of entries for cell values, e.g. number of languages and domains. Will use "..." to indicate that
            there are more entries.

    Returns:
        string with a markdown table.
    """

    def _limit_entries_in_cell_inner(cell: Any):
        if isinstance(cell, list | set):
            return self._limit_entries_in_cell(cell, limit_n_entries)
        return cell

    markdown_table = "| Task" + "".join([f"| {p}  " for p in properties]) + "|\n"
    _head_sep = "| ---" * (len(properties) + 1) + " |\n"
    markdown_table += _head_sep
    for task in self:
        markdown_table += f"| {task.metadata.name} "
        markdown_table += "".join(
            [
                f"| {_limit_entries_in_cell_inner(self._extract_property_from_task(task, p))} "
                for p in properties
            ]
        )
        markdown_table += " |\n"
    return markdown_table
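
For example (the chosen filters and properties are illustrative):

import mteb

tasks = mteb.get_tasks(languages=["eng"], task_types=["Retrieval"])

# Markdown overview with at most three entries per cell (the default limit).
print(tasks.to_markdown(properties=["name", "languages", "domains"]))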

get_task(task_name, languages=None, script=None, eval_splits=None, hf_subsets=None, exclusive_language_filter=False)

Get a task by name.

Parameters:

task_name (str, required):
    The name of the task to fetch.
languages (list[str] | None, default: None):
    A list of languages specified either as three-letter language codes (ISO 639-3, e.g. "eng") or as language-script codes, e.g. "eng-Latn". For multilingual tasks this will also remove languages that are not in the specified list.
script (list[str] | None, default: None):
    A list of script codes (ISO 15924 codes). If None, all scripts are included. For multilingual tasks this will also remove scripts that are not in the specified list.
eval_splits (list[str] | None, default: None):
    A list of evaluation splits to include. If None, all splits are included.
hf_subsets (list[str] | None, default: None):
    A list of Hugging Face subsets to evaluate on.
exclusive_language_filter (bool, default: False):
    Some datasets contain more than one language; e.g. for STS22 the subset "de-en" contains both eng and deu. If exclusive_language_filter is False, both are kept; if True, only subsets that contain all of the specified languages are kept.

Returns:

AbsTask: An initialized task object.

Examples:

>>> get_task("BornholmBitextMining")
Source code in mteb/get_tasks.py
def get_task(
    task_name: str,
    languages: list[str] | None = None,
    script: list[str] | None = None,
    eval_splits: list[str] | None = None,
    hf_subsets: list[str] | None = None,
    exclusive_language_filter: bool = False,
) -> AbsTask:
    """Get a task by name.

    Args:
        task_name: The name of the task to fetch.
        languages: A list of languages either specified as 3 letter languages codes (ISO 639-3, e.g. "eng") or as script languages codes e.g.
            "eng-Latn". For multilingual tasks this will also remove languages that are not in the specified list.
        script: A list of script codes (ISO 15924 codes). If None, all scripts are included. For multilingual tasks this will also remove scripts
        eval_splits: A list of evaluation splits to include. If None, all splits are included.
        hf_subsets: A list of Huggingface subsets to evaluate on.
        exclusive_language_filter: Some datasets contains more than one language e.g. for STS22 the subset "de-en" contain eng and deu. If
            exclusive_language_filter is set to False both of these will be kept, but if set to True only those that contains all the languages
            specified will be kept.

    Returns:
        An initialized task object.

    Examples:
        >>> get_task("BornholmBitextMining")
    """
    if task_name in _TASK_RENAMES:
        _task_name = _TASK_RENAMES[task_name]
        logger.warning(
            f"The task with the given name '{task_name}' has been renamed to '{_task_name}'. To prevent this warning use the new name."
        )

    if task_name not in _TASKS_REGISTRY:
        close_matches = difflib.get_close_matches(task_name, _TASKS_REGISTRY.keys())
        if close_matches:
            suggestion = f"KeyError: '{task_name}' not found. Did you mean: '{close_matches[0]}'?"
        else:
            suggestion = (
                f"KeyError: '{task_name}' not found and no similar keys were found."
            )
        raise KeyError(suggestion)
    task = _TASKS_REGISTRY[task_name]()
    if eval_splits:
        task.filter_eval_splits(eval_splits=eval_splits)
    return task.filter_languages(
        languages,
        script,
        hf_subsets=hf_subsets,
        exclusive_language_filter=exclusive_language_filter,
    )
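
A short usage sketch (task names are taken from the examples above; the split and language filters are illustrative):

import mteb

# Fetch a task and keep only the "test" split.
task = mteb.get_task("BornholmBitextMining", eval_splits=["test"])

# Keep only the English subsets of a multilingual task
# (see exclusive_language_filter above).
sts22 = mteb.get_task("STS22", languages=["eng"], exclusive_language_filter=True)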

get_tasks(tasks=None, *, languages=None, script=None, domains=None, task_types=None, categories=None, exclude_superseded=True, eval_splits=None, exclusive_language_filter=False, modalities=None, exclusive_modality_filter=False, exclude_aggregate=False, exclude_private=True)

Get a list of tasks based on the specified filters.

Parameters:

tasks (list[str] | None, default: None):
    A list of task names to include. If None, all tasks which pass the filters are included. If passed, other filters are ignored.
languages (list[str] | None, default: None):
    A list of languages specified either as three-letter language codes (ISO 639-3, e.g. "eng") or as language-script codes, e.g. "eng-Latn". For multilingual tasks this will also remove languages that are not in the specified list.
script (list[str] | None, default: None):
    A list of script codes (ISO 15924 codes, e.g. "Latn"). If None, all scripts are included. For multilingual tasks this will also remove scripts that are not in the specified list.
domains (list[TaskDomain] | None, default: None):
    A list of task domains, e.g. "Legal", "Medical", "Fiction".
task_types (list[TaskType] | None, default: None):
    A list of task types, e.g. "Classification" or "Retrieval". If None, all tasks are included.
categories (list[TaskCategory] | None, default: None):
    A list of task categories; these include "t2t" (text to text) and "t2i" (text to image). See TaskMetadata for the full list.
exclude_superseded (bool, default: True):
    A boolean flag to exclude datasets which are superseded by another.
eval_splits (list[str] | None, default: None):
    A list of evaluation splits to include. If None, all splits are included.
exclusive_language_filter (bool, default: False):
    Some datasets contain more than one language; e.g. for STS22 the subset "de-en" contains both eng and deu. If exclusive_language_filter is False, both are kept; if True, only subsets that contain all of the specified languages are kept.
modalities (list[Modalities] | None, default: None):
    A list of modalities to include. If None, all modalities are included.
exclusive_modality_filter (bool, default: False):
    If True, only keep tasks whose modalities exactly match the filter modalities. If False, keep tasks if any of the task's modalities match the filter modalities.
exclude_aggregate (bool, default: False):
    If True, exclude aggregate tasks. If False, both aggregate and non-aggregate tasks are returned.
exclude_private (bool, default: True):
    If True (default), exclude private/closed datasets (is_public=False). If False, include both public and private datasets.

Returns:

MTEBTasks: A list of all initialized task objects which pass all of the filters (AND operation).

Examples:

>>> get_tasks(languages=["eng", "deu"], script=["Latn"], domains=["Legal"])
>>> get_tasks(languages=["eng"], script=["Latn"], task_types=["Classification"])
>>> get_tasks(languages=["eng"], script=["Latn"], task_types=["Clustering"], exclude_superseded=False)
>>> get_tasks(languages=["eng"], tasks=["WikipediaRetrievalMultilingual"], eval_splits=["test"])
>>> get_tasks(tasks=["STS22"], languages=["eng"], exclusive_language_filter=True) # don't include multilingual subsets containing English
Source code in mteb/get_tasks.py
def get_tasks(
    tasks: list[str] | None = None,
    *,
    languages: list[str] | None = None,
    script: list[str] | None = None,
    domains: list[TaskDomain] | None = None,
    task_types: list[TaskType] | None = None,  # type: ignore
    categories: list[TaskCategory] | None = None,
    exclude_superseded: bool = True,
    eval_splits: list[str] | None = None,
    exclusive_language_filter: bool = False,
    modalities: list[Modalities] | None = None,
    exclusive_modality_filter: bool = False,
    exclude_aggregate: bool = False,
    exclude_private: bool = True,
) -> MTEBTasks:
    """Get a list of tasks based on the specified filters.

    Args:
        tasks: A list of task names to include. If None, all tasks which pass the filters are included. If passed, other filters are ignored.
        languages: A list of languages either specified as 3 letter languages codes (ISO 639-3, e.g. "eng") or as script languages codes e.g.
            "eng-Latn". For multilingual tasks this will also remove languages that are not in the specified list.
        script: A list of script codes (ISO 15924 codes, e.g. "Latn"). If None, all scripts are included. For multilingual tasks this will also remove scripts
            that are not in the specified list.
        domains: A list of task domains, e.g. "Legal", "Medical", "Fiction".
        task_types: A string specifying the type of task e.g. "Classification" or "Retrieval". If None, all tasks are included.
        categories: A list of task categories these include "t2t" (text to text), "t2i" (text to image). See TaskMetadata for the full list.
        exclude_superseded: A boolean flag to exclude datasets which are superseded by another.
        eval_splits: A list of evaluation splits to include. If None, all splits are included.
        exclusive_language_filter: Some datasets contains more than one language e.g. for STS22 the subset "de-en" contain eng and deu. If
            exclusive_language_filter is set to False both of these will be kept, but if set to True only those that contains all the languages
            specified will be kept.
        modalities: A list of modalities to include. If None, all modalities are included.
        exclusive_modality_filter: If True, only keep tasks where _all_ filter modalities are included in the
            task's modalities and ALL task modalities are in filter modalities (exact match).
            If False, keep tasks if _any_ of the task's modalities match the filter modalities.
        exclude_aggregate: If True, exclude aggregate tasks. If False, both aggregate and non-aggregate tasks are returned.
        exclude_private: If True (default), exclude private/closed datasets (is_public=False). If False, include both public and private datasets.

    Returns:
        A list of all initialized tasks objects which pass all of the filters (AND operation).

    Examples:
        >>> get_tasks(languages=["eng", "deu"], script=["Latn"], domains=["Legal"])
        >>> get_tasks(languages=["eng"], script=["Latn"], task_types=["Classification"])
        >>> get_tasks(languages=["eng"], script=["Latn"], task_types=["Clustering"], exclude_superseded=False)
        >>> get_tasks(languages=["eng"], tasks=["WikipediaRetrievalMultilingual"], eval_splits=["test"])
        >>> get_tasks(tasks=["STS22"], languages=["eng"], exclusive_language_filter=True) # don't include multilingual subsets containing English
    """
    if tasks:
        if domains or task_types or categories:
            logger.warning(
                "When `tasks` is provided, other filters like domains, task_types, and categories are ignored. "
                + "If you want to filter a list of tasks, please use `mteb.filter_tasks` instead."
            )
        _tasks = [
            get_task(
                task,
                languages,
                script,
                eval_splits=eval_splits,
                exclusive_language_filter=exclusive_language_filter,
            )
            for task in tasks
        ]
        return MTEBTasks(_tasks)

    _tasks = filter_tasks(
        TASK_LIST,
        languages=languages,
        script=script,
        domains=domains,
        task_types=task_types,
        categories=categories,
        modalities=modalities,
        exclusive_modality_filter=exclusive_modality_filter,
        exclude_superseded=exclude_superseded,
        exclude_aggregate=exclude_aggregate,
        exclude_private=exclude_private,
    )
    _tasks = [
        cls().filter_languages(languages, script).filter_eval_splits(eval_splits)
        for cls in _tasks
    ]

    return MTEBTasks(_tasks)
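
The docstring examples above cover the individual filters; the sketch below simply combines get_tasks with the MTEBTasks helpers documented earlier (the chosen filters are illustrative):

import mteb

tasks = mteb.get_tasks(
    languages=["eng"],
    task_types=["Classification"],
    modalities=["text"],
)

# The result is an MTEBTasks tuple, so the overview helpers are available.
print(tasks.count_languages().most_common(5))
print(tasks.to_markdown(properties=["name", "type", "main_score"]))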

mteb.filter_tasks

This script contains functions that are used to get an overview of the MTEB benchmark.

filter_tasks(tasks, *, languages=None, script=None, domains=None, task_types=None, categories=None, modalities=None, exclusive_modality_filter=False, exclude_superseded=False, exclude_aggregate=False, exclude_private=False)

filter_tasks(tasks: Sequence[AbsTask], *, languages: list[str] | None = None, script: list[str] | None = None, domains: list[TaskDomain] | None = None, task_types: list[TaskType] | None = None, categories: list[TaskCategory] | None = None, modalities: list[Modalities] | None = None, exclusive_modality_filter: bool = False, exclude_superseded: bool = False, exclude_aggregate: bool = False, exclude_private: bool = False) -> list[AbsTask]
filter_tasks(tasks: Sequence[type[AbsTask]], *, languages: list[str] | None = None, script: list[str] | None = None, domains: list[TaskDomain] | None = None, task_types: list[TaskType] | None = None, categories: list[TaskCategory] | None = None, modalities: list[Modalities] | None = None, exclusive_modality_filter: bool = False, exclude_superseded: bool = False, exclude_aggregate: bool = False, exclude_private: bool = False) -> list[type[AbsTask]]

Filter tasks based on the specified criteria.

Parameters:

tasks (Sequence[AbsTask] | Sequence[type[AbsTask]], required):
    A sequence of task objects or task classes to filter.
languages (list[str] | None, default: None):
    A list of languages specified either as three-letter language codes (ISO 639-3, e.g. "eng") or as language-script codes, e.g. "eng-Latn". For multilingual tasks this will also remove languages that are not in the specified list.
script (list[str] | None, default: None):
    A list of script codes (ISO 15924 codes, e.g. "Latn"). If None, all scripts are included. For multilingual tasks this will also remove scripts that are not in the specified list.
domains (list[TaskDomain] | None, default: None):
    A list of task domains, e.g. "Legal", "Medical", "Fiction".
task_types (list[TaskType] | None, default: None):
    A list of task types, e.g. "Classification" or "Retrieval". If None, all tasks are included.
categories (list[TaskCategory] | None, default: None):
    A list of task categories; these include "t2t" (text to text) and "t2i" (text to image). See TaskMetadata for the full list.
exclude_superseded (bool, default: False):
    A boolean flag to exclude datasets which are superseded by another.
modalities (list[Modalities] | None, default: None):
    A list of modalities to include. If None, all modalities are included.
exclusive_modality_filter (bool, default: False):
    If True, only keep tasks whose modalities exactly match the filter modalities. If False, keep tasks if any of the task's modalities match the filter modalities.
exclude_aggregate (bool, default: False):
    If True, exclude aggregate tasks. If False, both aggregate and non-aggregate tasks are returned.
exclude_private (bool, default: False):
    If True, exclude private/closed datasets (is_public=False). If False (the default), include both public and private datasets.

Returns:

list[AbsTask] | list[type[AbsTask]]: A list of task objects which pass all of the filters.

Examples:

>>> text_classification_tasks = filter_tasks(my_tasks, task_types=["Classification"], modalities=["text"])
>>> medical_tasks = filter_tasks(my_tasks, domains=["Medical"])
>>> english_tasks = filter_tasks(my_tasks, languages=["eng"])
>>> latin_script_tasks = filter_tasks(my_tasks, script=["Latn"])
>>> text_image_tasks = filter_tasks(my_tasks, modalities=["text", "image"], exclusive_modality_filter=True)
Source code in mteb/filter_tasks.py
def filter_tasks(
    tasks: Sequence[AbsTask] | Sequence[type[AbsTask]],
    *,
    languages: list[str] | None = None,
    script: list[str] | None = None,
    domains: list[TaskDomain] | None = None,
    task_types: list[TaskType] | None = None,  # type: ignore
    categories: list[TaskCategory] | None = None,
    modalities: list[Modalities] | None = None,
    exclusive_modality_filter: bool = False,
    exclude_superseded: bool = False,
    exclude_aggregate: bool = False,
    exclude_private: bool = False,
) -> list[AbsTask] | list[type[AbsTask]]:
    """Filter tasks based on the specified criteria.

    Args:
        tasks: A list of task names to include. If None, all tasks which pass the filters are included. If passed, other filters are ignored.
        languages: A list of languages either specified as 3 letter languages codes (ISO 639-3, e.g. "eng") or as script languages codes e.g.
            "eng-Latn". For multilingual tasks this will also remove languages that are not in the specified list.
        script: A list of script codes (ISO 15924 codes, e.g. "Latn"). If None, all scripts are included. For multilingual tasks this will also remove scripts
            that are not in the specified list.
        domains: A list of task domains, e.g. "Legal", "Medical", "Fiction".
        task_types: A string specifying the type of task e.g. "Classification" or "Retrieval". If None, all tasks are included.
        categories: A list of task categories these include "t2t" (text to text), "t2i" (text to image). See TaskMetadata for the full list.
        exclude_superseded: A boolean flag to exclude datasets which are superseded by another.
        modalities: A list of modalities to include. If None, all modalities are included.
        exclusive_modality_filter: If True, only keep tasks where _all_ filter modalities are included in the
            task's modalities and ALL task modalities are in filter modalities (exact match).
            If False, keep tasks if _any_ of the task's modalities match the filter modalities.
        exclude_aggregate: If True, exclude aggregate tasks. If False, both aggregate and non-aggregate tasks are returned.
        exclude_private: If True (default), exclude private/closed datasets (is_public=False). If False, include both public and private datasets.

    Returns:
        A list of tasks objects which pass all of the filters.

    Examples:
        >>> text_classification_tasks = filter_tasks(my_tasks, task_types=["Classification"], modalities=["text"])
        >>> medical_tasks = filter_tasks(my_tasks, domains=["Medical"])
        >>> english_tasks = filter_tasks(my_tasks, languages=["eng"])
        >>> latin_script_tasks = filter_tasks(my_tasks, script=["Latn"])
        >>> text_image_tasks = filter_tasks(my_tasks, modalities=["text", "image"], exclusive_modality_filter=True)

    """
    langs_to_keep = None
    if languages:
        [_check_is_valid_language(lang) for lang in languages]
        langs_to_keep = set(languages)

    script_to_keep = None
    if script:
        [_check_is_valid_script(s) for s in script]
        script_to_keep = set(script)

    domains_to_keep = None
    if domains:
        domains_to_keep = set(domains)

    def _convert_to_set(domain: list[TaskDomain] | None) -> set:
        return set(domain) if domain is not None else set()

    task_types_to_keep = None
    if task_types:
        task_types_to_keep = set(task_types)

    categories_to_keep = None
    if categories:
        categories_to_keep = set(categories)

    modalities_to_keep = None
    if modalities:
        modalities_to_keep = set(modalities)

    _tasks = []
    for t in tasks:
        # For metadata and superseded_by, we can access them directly
        metadata = t.metadata

        if langs_to_keep and not langs_to_keep.intersection(metadata.languages):
            continue
        if script_to_keep and not script_to_keep.intersection(metadata.scripts):
            continue
        if domains_to_keep and not domains_to_keep.intersection(
            _convert_to_set(metadata.domains)
        ):
            continue
        if task_types_to_keep and metadata.type not in task_types_to_keep:
            continue
        if categories_to_keep and metadata.category not in categories_to_keep:
            continue
        if modalities_to_keep:
            if exclusive_modality_filter:
                if set(metadata.modalities) != modalities_to_keep:
                    continue
            else:
                if not modalities_to_keep.intersection(metadata.modalities):
                    continue
        if exclude_superseded and metadata.superseded_by is not None:
            continue
        is_aggregate = (
            issubclass(t, AbsTaskAggregate)
            if isinstance(t, type)
            else isinstance(t, AbsTaskAggregate)
        )
        if exclude_aggregate and is_aggregate:
            continue
        if exclude_private and not metadata.is_public:
            continue

        _tasks.append(t)

    return _tasks
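
A short sketch of filtering an existing collection of tasks, in the spirit of the docstring examples above (the starting selection is illustrative):

import mteb

# Start from a broad selection, then narrow it with filter_tasks.
english_tasks = mteb.get_tasks(languages=["eng"])
medical_retrieval = mteb.filter_tasks(
    english_tasks,
    domains=["Medical"],
    task_types=["Retrieval"],
)
print([task.metadata.name for task in medical_retrieval])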

Metadata

Each task also contains extensive metadata. We annotate this using the following object, which allows us to use pydantic to validate the metadata.
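
As an illustrative sketch, the metadata of an existing task can be inspected directly (the task name is taken from the examples above):

import mteb

task = mteb.get_task("BornholmBitextMining")
meta = task.metadata  # a TaskMetadata (pydantic) instance

print(meta.name, meta.type, meta.main_score)
print(meta.eval_splits)      # e.g. ["test"]
print(meta.bcp47_codes)      # "{language}-{script}" tags, e.g. "eng-Latn"
print(meta.is_multilingual)  # True if eval_langs maps subsets to languages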

mteb.TaskMetadata

Bases: BaseModel

Metadata for a task.

Attributes:

dataset (MetadataDatasetDict): All arguments to pass to datasets.load_dataset to load the dataset for the task.
name (str): The name of the task.
description (str): A description of the task.
type (TaskType): The type of the task. This includes "Classification", "Summarization", "STS", "Retrieval", "Reranking", "Clustering", "PairClassification", "BitextMining". The type should match the abstask type.
category (TaskCategory | None): The category of the task, e.g. "t2t" (text to text) or "t2i" (text to image).
reference (StrURL | None): A URL to the documentation of the task, e.g. a published paper.
eval_splits (list[str]): The splits of the dataset used for evaluation.
eval_langs (Languages): The languages of the dataset used for evaluation. Languages follow the IETF BCP 47 standard, consisting of a "{language}-{script}" tag (e.g. "eng-Latn"), where the language is an ISO 639-3 code (e.g. "eng") followed by an ISO 15924 script code (e.g. "Latn"). Can be either a list of languages or a dictionary mapping Hugging Face subsets to lists of languages (e.g. if the Hugging Face dataset contains different languages per subset).
main_score (str): The main score used for evaluation.
date (tuple[StrDate, StrDate] | None): The date when the data was collected. Specified as a tuple of two dates.
domains (list[TaskDomain] | None): The domains of the data. This includes "Non-fiction", "Social", "Fiction", "News", "Academic", "Blog", "Encyclopaedic", "Government", "Legal", "Medical", "Poetry", "Religious", "Reviews", "Web", "Spoken", "Written". A dataset can belong to multiple domains.
task_subtypes (list[TaskSubtype] | None): The subtypes of the task, e.g. "Sentiment/Hate speech", "Thematic Clustering". Feel free to update the list as needed.
license (Licenses | StrURL | None): The license of the data specified as lowercase, e.g. "cc-by-nc-4.0". If the license is not specified, use "not specified". For custom licenses a URL is used.
annotations_creators (AnnotatorType | None): The type of the annotators. Includes "expert-annotated" (annotated by experts), "human-annotated" (annotated e.g. by mturkers), "derived" (derived from structure in the data).
dialect (list[str] | None): The dialect of the data, if applicable. Ideally specified as a BCP-47 language tag. Empty list if no dialects are present.
sample_creation (SampleCreationMethod | None): The method of text creation. Includes "found", "created", "machine-translated", "machine-translated and verified", and "machine-translated and localized".
prompt (str | PromptDict | None): The prompt used for the task. Can be a string or a dictionary containing the query and passage prompts.
bibtex_citation (str | None): The BibTeX citation for the dataset. Should be an empty string if no citation is available.
adapted_from (Sequence[str] | None): Datasets adapted (translated, sampled from, etc.) from other datasets.
is_public (bool): Whether the dataset is publicly available. If False (closed/private), a Hugging Face token is required to run the datasets.
superseded_by (str | None): Denotes the task that this task is superseded by. Used to issue a warning to users of outdated datasets, while maintaining reproducibility of existing benchmarks.

Source code in mteb/abstasks/task_metadata.py
class TaskMetadata(BaseModel):
    """Metadata for a task.

    Attributes:
        dataset: All arguments to pass to [datasets.load_dataset](https://huggingface.co/docs/datasets/v2.18.0/en/package_reference/loading_methods#datasets.load_dataset) to load the dataset for the task.
        name: The name of the task.
        description: A description of the task.
        type: The type of the task. This includes "Classification", "Summarization", "STS", "Retrieval", "Reranking", "Clustering",
            "PairClassification", "BitextMining". The type should match the abstask type.
        category: The category of the task. E.g. includes "t2t" (text to text), "t2i" (text to image).
        reference: A URL to the documentation of the task. E.g. a published paper.
        eval_splits: The splits of the dataset used for evaluation.
        eval_langs: The languages of the dataset used for evaluation. Languages follows a ETF BCP 47 standard consisting of "{language}-{script}"
            tag (e.g. "eng-Latn"). Where language is specified as a list of ISO 639-3 language codes (e.g. "eng") followed by ISO 15924 script codes
            (e.g. "Latn"). Can be either a list of languages or a dictionary mapping huggingface subsets to lists of languages (e.g. if a the
            huggingface dataset contain different languages).
        main_score: The main score used for evaluation.
        date: The date when the data was collected. Specified as a tuple of two dates.
        domains: The domains of the data. This includes "Non-fiction", "Social", "Fiction", "News", "Academic", "Blog", "Encyclopaedic",
            "Government", "Legal", "Medical", "Poetry", "Religious", "Reviews", "Web", "Spoken", "Written". A dataset can belong to multiple domains.
        task_subtypes: The subtypes of the task. E.g. includes "Sentiment/Hate speech", "Thematic Clustering". Feel free to update the list as needed.
        license: The license of the data specified as lowercase, e.g. "cc-by-nc-4.0". If the license is not specified, use "not specified". For custom licenses a URL is used.
        annotations_creators: The type of the annotators. Includes "expert-annotated" (annotated by experts), "human-annotated" (annotated e.g. by
            mturkers), "derived" (derived from structure in the data).
        dialect: The dialect of the data, if applicable. Ideally specified as a BCP-47 language tag. Empty list if no dialects are present.
        sample_creation: The method of text creation. Includes "found", "created", "machine-translated", "machine-translated and verified", and
            "machine-translated and localized".
        prompt: The prompt used for the task. Can be a string or a dictionary containing the query and passage prompts.
        bibtex_citation: The BibTeX citation for the dataset. Should be an empty string if no citation is available.
        adapted_from: Datasets adapted (translated, sampled from, etc.) from other datasets.
        is_public: Whether the dataset is publicly available. If False (closed/private), a HuggingFace token is required to run the datasets.
        superseded_by: Denotes the task that this task is superseded by. Used to issue warning to users of outdated datasets, while maintaining
            reproducibility of existing benchmarks.
    """

    model_config = ConfigDict(extra="forbid")

    dataset: MetadataDatasetDict

    name: str
    description: str
    prompt: str | PromptDict | None = None
    type: TaskType
    modalities: list[Modalities] = ["text"]
    category: TaskCategory | None = None
    reference: StrURL | None = None

    eval_splits: list[str] = ["test"]
    eval_langs: Languages
    main_score: str

    date: tuple[StrDate, StrDate] | None = None
    domains: list[TaskDomain] | None = None
    task_subtypes: list[TaskSubtype] | None = None
    license: Licenses | StrURL | None = None

    annotations_creators: AnnotatorType | None = None
    dialect: list[str] | None = None

    sample_creation: SampleCreationMethod | None = None
    bibtex_citation: str | None = None
    adapted_from: Sequence[str] | None = None
    is_public: bool = True
    superseded_by: str | None = None

    def _validate_metadata(self) -> None:
        self._eval_langs_are_valid(self.eval_langs)

    @field_validator("prompt")
    @classmethod
    def _check_prompt_is_valid(
        cls, prompt: str | PromptDict | None
    ) -> str | PromptDict | None:
        if isinstance(prompt, dict):
            for key in prompt:
                if key not in [e.value for e in PromptType]:
                    raise ValueError(
                        "The prompt dictionary should only contain the keys 'query' and 'passage'."
                    )
        return prompt

    def _eval_langs_are_valid(self, eval_langs: Languages) -> None:
        """This method checks that the eval_langs are specified as a list of languages."""
        if isinstance(eval_langs, dict):
            for langs in eval_langs.values():
                for code in langs:
                    check_language_code(code)
        else:
            for code in eval_langs:
                check_language_code(code)

    @property
    def bcp47_codes(self) -> list[ISOLanguageScript]:
        """Return the languages and script codes of the dataset formatting in accordance with the BCP-47 standard."""
        if isinstance(self.eval_langs, dict):
            return sorted(
                {lang for langs in self.eval_langs.values() for lang in langs}
            )
        return sorted(set(self.eval_langs))

    @property
    def languages(self) -> list[str]:
        """Return the languages of the dataset as iso639-3 codes."""

        def get_lang(lang: str) -> str:
            return lang.split("-")[0]

        if isinstance(self.eval_langs, dict):
            return sorted(
                {get_lang(lang) for langs in self.eval_langs.values() for lang in langs}
            )
        return sorted({get_lang(lang) for lang in self.eval_langs})

    @property
    def scripts(self) -> set[str]:
        """Return the scripts of the dataset as iso15924 codes."""

        def get_script(lang: str) -> str:
            return lang.split("-")[1]

        if isinstance(self.eval_langs, dict):
            return {
                get_script(lang) for langs in self.eval_langs.values() for lang in langs
            }
        return {get_script(lang) for lang in self.eval_langs}

    def is_filled(self) -> bool:
        """Check if all the metadata fields are filled.

        Returns:
            True if all the metadata fields are filled, False otherwise.
        """
        return all(
            getattr(self, field_name) is not None
            for field_name in self.model_fields
            if field_name not in ["prompt", "adapted_from", "superseded_by"]
        )

    @property
    def hf_subsets_to_langscripts(self) -> dict[HFSubset, list[ISOLanguageScript]]:
        """Return a dictionary mapping huggingface subsets to languages."""
        if isinstance(self.eval_langs, dict):
            return self.eval_langs
        return {"default": self.eval_langs}  # type: ignore

    @property
    def intext_citation(self, include_cite: bool = True) -> str:
        """Create an in-text citation for the dataset."""
        cite = ""
        if self.bibtex_citation:
            cite = f"{self.bibtex_citation.split(',')[0].split('{')[1]}"
        if include_cite and cite:
            # check for whitespace in the citation
            if " " in cite:
                logger.warning(
                    "Citation contains whitespace. Please ensure that the citation is correctly formatted."
                )
            return f"\\cite{{{cite}}}"
        return cite

    @property
    def descriptive_stats(self) -> dict[str, DescriptiveStatistics] | None:
        """Return the descriptive statistics for the dataset."""
        if self.descriptive_stat_path.exists():
            with self.descriptive_stat_path.open("r") as f:
                return json.load(f)
        return None

    @property
    def descriptive_stat_path(self) -> Path:
        """Return the path to the descriptive statistics file."""
        descriptive_stat_base_dir = Path(__file__).parent.parent / "descriptive_stats"
        if self.type in MIEB_TASK_TYPE:
            descriptive_stat_base_dir = descriptive_stat_base_dir / "Image"
        task_type_dir = descriptive_stat_base_dir / self.type
        if not descriptive_stat_base_dir.exists():
            descriptive_stat_base_dir.mkdir()
        if not task_type_dir.exists():
            task_type_dir.mkdir()
        return task_type_dir / f"{self.name}.json"

    @property
    def n_samples(self) -> dict[str, int] | None:
        """Returns the number of samples in the dataset"""
        stats = self.descriptive_stats
        if not stats:
            return None

        n_samples = {}
        for subset, subset_value in stats.items():
            if subset == "hf_subset_descriptive_stats":
                continue
            n_samples[subset] = subset_value["num_samples"]  # type: ignore
        return n_samples

    @property
    def hf_subsets(self) -> list[str]:
        """Return the huggingface subsets."""
        return list(self.hf_subsets_to_langscripts.keys())

    @property
    def is_multilingual(self) -> bool:
        """Check if the task is multilingual."""
        return isinstance(self.eval_langs, dict)

    def __hash__(self) -> int:
        return hash(self.model_dump_json())

    @property
    def revision(self) -> str:
        """Return the dataset revision."""
        return self.dataset["revision"]

    def get_modalities(self, prompt_type: PromptType | None = None) -> list[Modalities]:
        """Get the modalities for the task based category if prompt_type provided.

        Args:
            prompt_type: The prompt type to get the modalities for.

        Returns:
            A list of modalities for the task.

        Raises:
            ValueError: If the prompt type is not recognized.
        """
        if prompt_type is None:
            return self.modalities
        query_modalities, doc_modalities = self.category.split("2")
        category_to_modality: dict[str, Modalities] = {
            "t": "text",
            "i": "image",
        }
        if prompt_type == PromptType.query:
            return [
                category_to_modality[query_modality]
                for query_modality in query_modalities
            ]
        if prompt_type == PromptType.document:
            return [
                category_to_modality[doc_modality] for doc_modality in doc_modalities
            ]
        raise ValueError(f"Unknown prompt type: {prompt_type}")

    def _create_dataset_card_data(
        self,
        existing_dataset_card_data: DatasetCardData | None = None,
    ) -> tuple[DatasetCardData, dict[str, Any]]:
        """Create a DatasetCardData object from the task metadata.

        Args:
            existing_dataset_card_data: The existing DatasetCardData object to update. If None, a new object will be created.

        Returns:
            A DatasetCardData object with the metadata for the task with kwargs to card
        """
        if existing_dataset_card_data is None:
            existing_dataset_card_data = DatasetCardData()

        dataset_type = [
            *self._hf_task_type(),
            *self._hf_task_category(),
            *self._hf_subtypes(),
        ]
        languages = self._hf_languages()

        multilinguality = "monolingual" if len(languages) == 1 else "multilingual"
        if self.sample_creation and "translated" in self.sample_creation:
            multilinguality = "translated"

        if self.adapted_from is not None:
            source_datasets = [
                task.metadata.dataset["path"]
                for task in mteb.get_tasks(self.adapted_from)
            ]
            source_datasets.append(self.dataset["path"])
        else:
            source_datasets = None if not self.dataset else [self.dataset["path"]]

        tags = ["mteb"] + self.modalities

        descriptive_stats = self.descriptive_stats
        if descriptive_stats is not None:
            for split, split_stat in descriptive_stats.items():
                if len(split_stat.get("hf_subset_descriptive_stats", {})) > 10:
                    split_stat.pop("hf_subset_descriptive_stats", {})
            descriptive_stats = json.dumps(descriptive_stats, indent=4)

        dataset_card_data_params = existing_dataset_card_data.to_dict()
        # override the existing values
        dataset_card_data_params.update(
            dict(
                language=languages,
                license=self._hf_license(),
                annotations_creators=[self.annotations_creators]
                if self.annotations_creators
                else None,
                multilinguality=multilinguality,
                source_datasets=source_datasets,
                task_categories=dataset_type,
                task_ids=self._hf_subtypes(),
                tags=tags,
            )
        )

        return (
            DatasetCardData(**dataset_card_data_params),
            # parameters for readme generation
            dict(
                citation=self.bibtex_citation,
                dataset_description=self.description,
                dataset_reference=self.reference,
                descritptive_stats=descriptive_stats,
                dataset_task_name=self.name,
                category=self.category,
                domains=", ".join(self.domains) if self.domains else None,
            ),
        )

    def generate_dataset_card(
        self,
        existing_dataset_card: DatasetCard | None = None,
    ) -> DatasetCard:
        """Generates a dataset card for the task.

        Args:
            existing_dataset_card: The existing dataset card to update. If None, a new dataset card will be created.

        Returns:
            DatasetCard: The dataset card for the task.
        """
        path = Path(__file__).parent / "dataset_card_template.md"
        existing_dataset_card_data = (
            existing_dataset_card.data if existing_dataset_card else None
        )
        dataset_card_data, template_kwargs = self._create_dataset_card_data(
            existing_dataset_card_data
        )
        dataset_card = DatasetCard.from_template(
            card_data=dataset_card_data,
            template_path=str(path),
            **template_kwargs,
        )
        return dataset_card

    def push_dataset_card_to_hub(self, repo_name: str) -> None:
        """Pushes the dataset card to the huggingface hub.

        Args:
            repo_name: The name of the repository to push the dataset card to.
        """
        dataset_card = None
        if repo_exists(
            repo_name, repo_type=constants.REPO_TYPE_DATASET
        ) and file_exists(
            repo_name, constants.REPOCARD_NAME, repo_type=constants.REPO_TYPE_DATASET
        ):
            dataset_card = DatasetCard.load(repo_name)
        dataset_card = self.generate_dataset_card(dataset_card)
        dataset_card.push_to_hub(repo_name, commit_message="Add dataset card")

    def _hf_subtypes(self) -> list[str]:
        # to get full list of available task_ids execute
        # requests.post("https://huggingface.co/api/validate-yaml", json={
        #   "content": "---\ntask_ids: 'test'\n---",
        #   "repoType": "dataset"
        # })
        mteb_to_hf_subtype = {
            "Article retrieval": ["document-retrieval"],
            "Conversational retrieval": ["conversational", "utterance-retrieval"],
            "Dialect pairing": [],
            "Dialog Systems": ["dialogue-modeling", "dialogue-generation"],
            "Discourse coherence": [],
            "Duplicate Image Retrieval": [],
            "Language identification": ["language-identification"],
            "Linguistic acceptability": ["acceptability-classification"],
            "Political classification": [],
            "Question answering": [
                "multiple-choice-qa",
                "question-answering",
            ],
            "Sentiment/Hate speech": [
                "sentiment-analysis",
                "sentiment-scoring",
                "sentiment-classification",
                "hate-speech-detection",
            ],
            "Thematic clustering": [],
            "Scientific Reranking": [],
            "Claim verification": ["fact-checking", "fact-checking-retrieval"],
            "Topic classification": ["topic-classification"],
            "Code retrieval": [],
            "False Friends": [],
            "Cross-Lingual Semantic Discrimination": [],
            "Textual Entailment": ["natural-language-inference"],
            "Counterfactual Detection": [],
            "Emotion classification": [],
            "Reasoning as Retrieval": [],
            "Rendered Texts Understanding": [],
            "Image Text Retrieval": [],
            "Object recognition": [],
            "Scene recognition": [],
            "Caption Pairing": ["image-captioning"],
            "Emotion recognition": [],
            "Textures recognition": [],
            "Activity recognition": [],
            "Tumor detection": [],
            "Duplicate Detection": [],
            "Rendered semantic textual similarity": [
                "semantic-similarity-scoring",
                "rendered semantic textual similarity",
            ],
            "Intent classification": [
                "intent-classification",
            ],
        }
        subtypes = []
        if self.task_subtypes:
            for subtype in self.task_subtypes:
                subtypes.extend(mteb_to_hf_subtype.get(subtype, []))
        return subtypes

    def _hf_task_type(self) -> list[str]:
        # to get full list of task_types execute:
        # requests.post("https://huggingface.co/api/validate-yaml", json={
        #     "content": "---\ntask_categories: ['test']\n---", "repoType": "dataset"
        # }).json()
        # or look at https://huggingface.co/tasks
        mteb_task_type_to_datasets = {
            # Text
            "BitextMining": ["translation"],
            "Classification": ["text-classification"],
            "MultilabelClassification": ["text-classification"],
            "Clustering": ["text-classification"],
            "PairClassification": ["text-classification"],
            "Reranking": ["text-ranking"],
            "Retrieval": ["text-retrieval"],
            "STS": ["sentence-similarity"],
            "Summarization": ["summarization"],
            "InstructionRetrieval": ["text-retrieval"],
            "InstructionReranking": ["text-ranking"],
            # Image
            "Any2AnyMultiChoice": ["visual-question-answering"],
            "Any2AnyRetrieval": ["visual-document-retrieval"],
            "Any2AnyMultilingualRetrieval": ["visual-document-retrieval"],
            "VisionCentricQA": ["visual-question-answering"],
            "ImageClustering": ["image-clustering"],
            "ImageClassification": ["image-classification"],
            "ImageMultilabelClassification": ["image-classification"],
            "DocumentUnderstanding": ["visual-document-retrieval"],
            "VisualSTS(eng)": ["other"],
            "VisualSTS(multi)": ["other"],
            "ZeroShotClassification": ["zero-shot-classification"],
            "Compositionality": ["other"],
        }
        if self.type == "ZeroShotClassification":
            if self.modalities == ["image"]:
                return ["zero-shot-image-classification"]
            return ["zero-shot-classification"]

        return mteb_task_type_to_datasets[self.type]

    def _hf_task_category(self) -> list[str]:
        dataset_type = []
        if self.category in ["i2i", "it2i", "i2it", "it2it"]:
            dataset_type.append("image-to-image")
        if self.category in ["i2t", "t2i", "it2t", "it2i", "t2it", "i2it", "it2it"]:
            dataset_type.extend(["image-to-text", "text-to-image"])
        if self.category in ["it2t", "it2i", "t2it", "i2it", "it2it"]:
            dataset_type.extend(["image-text-to-text"])
        return dataset_type

    def _hf_languages(self) -> list[str]:
        languages: list[str] = []
        if self.is_multilingual:
            for val in list(self.eval_langs.values()):
                languages.extend(val)
        else:
            languages = self.eval_langs
        # value "python" is not valid. It must be an ISO 639-1, 639-2 or 639-3 code (two/three letters),
        # or a special value like "code", "multilingual".
        readme_langs = []
        for lang in languages:
            lang_name, family = lang.split("-")
            if family == "Code":
                readme_langs.append("code")
            else:
                readme_langs.append(lang_name)
        return sorted(set(readme_langs))

    def _hf_license(self) -> str:
        dataset_license = self.license
        if dataset_license:
            license_mapping = {
                "not specified": "unknown",
                "msr-la-nc": "other",
                "cc-by-nd-2.1-jp": "other",
            }
            dataset_license = license_mapping.get(
                dataset_license,
                "other" if dataset_license.startswith("http") else dataset_license,
            )
        return dataset_license

bcp47_codes property

Return the languages and script codes of the dataset, formatted in accordance with the BCP-47 standard.

descriptive_stat_path property

Return the path to the descriptive statistics file.

descriptive_stats property

Return the descriptive statistics for the dataset.

hf_subsets property

Return the huggingface subsets.

hf_subsets_to_langscripts property

Return a dictionary mapping huggingface subsets to languages.

intext_citation property

Create an in-text citation for the dataset.

is_multilingual property

Check if the task is multilingual.

languages property

Return the languages of the dataset as iso639-3 codes.

n_samples property

Returns the number of samples in the dataset

revision property

Return the dataset revision.

scripts property

Return the scripts of the dataset as iso15924 codes.
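
A minimal sketch of reading the properties listed above; the task name is only an example, and any task returned by mteb.get_tasks() exposes the same metadata properties.

import mteb

task = mteb.get_task("Banking77Classification")  # example task name
meta = task.metadata

print(meta.languages)        # ISO 639-3 language codes
print(meta.bcp47_codes)      # language-script codes following BCP-47
print(meta.hf_subsets)       # huggingface subsets, e.g. ["default"]
print(meta.intext_citation)  # in-text citation for the dataset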

generate_dataset_card(existing_dataset_card=None)

Generates a dataset card for the task.

Parameters:

Name Type Description Default
existing_dataset_card DatasetCard | None

The existing dataset card to update. If None, a new dataset card will be created.

None

Returns:

Name Type Description
DatasetCard DatasetCard

The dataset card for the task.

Source code in mteb/abstasks/task_metadata.py
def generate_dataset_card(
    self,
    existing_dataset_card: DatasetCard | None = None,
) -> DatasetCard:
    """Generates a dataset card for the task.

    Args:
        existing_dataset_card: The existing dataset card to update. If None, a new dataset card will be created.

    Returns:
        DatasetCard: The dataset card for the task.
    """
    path = Path(__file__).parent / "dataset_card_template.md"
    existing_dataset_card_data = (
        existing_dataset_card.data if existing_dataset_card else None
    )
    dataset_card_data, template_kwargs = self._create_dataset_card_data(
        existing_dataset_card_data
    )
    dataset_card = DatasetCard.from_template(
        card_data=dataset_card_data,
        template_path=str(path),
        **template_kwargs,
    )
    return dataset_card

get_modalities(prompt_type=None)

Get the modalities for the task based on its category if a prompt_type is provided.

Parameters:

Name Type Description Default
prompt_type PromptType | None

The prompt type to get the modalities for.

None

Returns:

Type Description
list[Modalities]

A list of modalities for the task.

Raises:

Type Description
ValueError

If the prompt type is not recognized.

Source code in mteb/abstasks/task_metadata.py
def get_modalities(self, prompt_type: PromptType | None = None) -> list[Modalities]:
    """Get the modalities for the task based category if prompt_type provided.

    Args:
        prompt_type: The prompt type to get the modalities for.

    Returns:
        A list of modalities for the task.

    Raises:
        ValueError: If the prompt type is not recognized.
    """
    if prompt_type is None:
        return self.modalities
    query_modalities, doc_modalities = self.category.split("2")
    category_to_modality: dict[str, Modalities] = {
        "t": "text",
        "i": "image",
    }
    if prompt_type == PromptType.query:
        return [
            category_to_modality[query_modality]
            for query_modality in query_modalities
        ]
    if prompt_type == PromptType.document:
        return [
            category_to_modality[doc_modality] for doc_modality in doc_modalities
        ]
    raise ValueError(f"Unknown prompt type: {prompt_type}")

is_filled()

Check if all the metadata fields are filled.

Returns:

Type Description
bool

True if all the metadata fields are filled, False otherwise.

Source code in mteb/abstasks/task_metadata.py
def is_filled(self) -> bool:
    """Check if all the metadata fields are filled.

    Returns:
        True if all the metadata fields are filled, False otherwise.
    """
    return all(
        getattr(self, field_name) is not None
        for field_name in self.model_fields
        if field_name not in ["prompt", "adapted_from", "superseded_by"]
    )
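
A small usage sketch (assuming mteb.get_tasks() with no arguments returns all registered tasks) for spotting tasks with incomplete metadata:

import mteb

# List tasks whose metadata still has unfilled fields.
incomplete = [t.metadata.name for t in mteb.get_tasks() if not t.metadata.is_filled()]
print(f"{len(incomplete)} tasks have unfilled metadata fields")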

push_dataset_card_to_hub(repo_name)

Pushes the dataset card to the huggingface hub.

Parameters:

Name Type Description Default
repo_name str

The name of the repository to push the dataset card to.

required
Source code in mteb/abstasks/task_metadata.py
def push_dataset_card_to_hub(self, repo_name: str) -> None:
    """Pushes the dataset card to the huggingface hub.

    Args:
        repo_name: The name of the repository to push the dataset card to.
    """
    dataset_card = None
    if repo_exists(
        repo_name, repo_type=constants.REPO_TYPE_DATASET
    ) and file_exists(
        repo_name, constants.REPOCARD_NAME, repo_type=constants.REPO_TYPE_DATASET
    ):
        dataset_card = DatasetCard.load(repo_name)
    dataset_card = self.generate_dataset_card(dataset_card)
    dataset_card.push_to_hub(repo_name, commit_message="Add dataset card")
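
A minimal sketch of the card workflow; the repository name is a placeholder, and pushing requires being authenticated with the Hub.

import mteb

task = mteb.get_task("Caltech101")

# Render the card locally first to inspect it.
card = task.metadata.generate_dataset_card()
print(card.content[:500])

# Push the card (updating the existing README, if any) to the Hub.
task.metadata.push_dataset_card_to_hub("my-org/Caltech101")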

Metadata Types

mteb.abstasks.task_metadata.AnnotatorType = Literal['expert-annotated', 'human-annotated', 'derived', 'LM-generated', 'LM-generated and reviewed'] module-attribute

The type of the annotators. This is often important for understanding the quality of a dataset.

mteb.abstasks.task_metadata.SampleCreationMethod = Literal['found', 'created', 'human-translated and localized', 'human-translated', 'machine-translated', 'machine-translated and verified', 'machine-translated and localized', 'LM-generated and verified', 'machine-translated and LM verified', 'rendered', 'multiple'] module-attribute

How the text was created. It can be an important factor for understanding the quality of a dataset, e.g. for filtering out machine-translated datasets.

mteb.abstasks.task_metadata.TaskCategory = Literal['t2t', 't2c', 'i2i', 'i2c', 'i2t', 't2i', 'it2t', 'it2i', 'i2it', 't2it', 'it2it'] module-attribute

The category of the task.

  1. t2t: text to text
  2. t2c: text to category
  3. i2i: image to image
  4. i2c: image to category
  5. i2t: image to text
  6. t2i: text to image
  7. it2t: image+text to text
  8. it2i: image+text to image
  9. i2it: image to image+text
  10. t2it: text to image+text
  11. it2it: image+text to image+text

mteb.abstasks.task_metadata.TaskDomain = Literal['Academic', 'Blog', 'Constructed', 'Encyclopaedic', 'Engineering', 'Fiction', 'Government', 'Legal', 'Medical', 'News', 'Non-fiction', 'Poetry', 'Religious', 'Reviews', 'Scene', 'Social', 'Spoken', 'Subtitles', 'Web', 'Written', 'Programming', 'Chemistry', 'Financial', 'Entertainment'] module-attribute

The domains follow the categories used in the Universal Dependencies project, though we updated them where deemed appropriate. These do not have to be mutually exclusive.

mteb.abstasks.task_metadata.TaskType = Literal[_TASK_TYPE] module-attribute

The type of the task. E.g. includes "Classification", "Retrieval" and "Clustering".

mteb.abstasks.task_metadata.TaskSubtype = Literal['Article retrieval', 'Patent retrieval', 'Conversational retrieval', 'Dialect pairing', 'Dialog Systems', 'Discourse coherence', 'Duplicate Image Retrieval', 'Language identification', 'Linguistic acceptability', 'Political classification', 'Question answering', 'Sentiment/Hate speech', 'Thematic clustering', 'Scientific Reranking', 'Claim verification', 'Topic classification', 'Code retrieval', 'False Friends', 'Cross-Lingual Semantic Discrimination', 'Textual Entailment', 'Counterfactual Detection', 'Emotion classification', 'Reasoning as Retrieval', 'Rendered Texts Understanding', 'Image Text Retrieval', 'Object recognition', 'Scene recognition', 'Caption Pairing', 'Emotion recognition', 'Textures recognition', 'Activity recognition', 'Tumor detection', 'Duplicate Detection', 'Rendered semantic textual similarity', 'Intent classification'] module-attribute

The subtypes of the task. E.g. includes "Sentiment/Hate speech", "Thematic Clustering". This list can be updated as needed.

mteb.abstasks.task_metadata.PromptDict = TypedDict('PromptDict', {prompt_type.value: str for prompt_type in PromptType}, total=False) module-attribute

A dictionary containing the prompt used for the task.

Attributes:

Name Type Description
query

The prompt used for the queries in the task.

document

The prompt used for the passages in the task.
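
For illustration, a PromptDict is keyed by the PromptType values ("query" and "document") and is the shape typically assigned to TaskMetadata.prompt; the prompt wording below is made up, not taken from any real task.

# Illustrative only: a per-prompt-type prompt dictionary.
prompt: dict = {
    "query": "Represent the question for retrieving relevant passages:",
    "document": "Represent the passage for retrieval:",
}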

The Task Object

All tasks in mteb inherit from the following abstract class.

mteb.AbsTask

Bases: ABC

The abstract class for the tasks

Attributes:

Name Type Description
metadata TaskMetadata

The metadata describing the task

dataset dict[HFSubset, DatasetDict] | None

The dataset represented as a dictionary of the form {"hf subset": {"split": Dataset}}, where "split" is the dataset split (e.g. "test") and Dataset is a datasets.Dataset object. "hf subset" is the data subset on Huggingface, typically used to denote the language, e.g. datasets.load_dataset("data", "en"). If the dataset does not have a subset, this is simply "default".

seed

The random seed used for reproducibility.

hf_subsets list[HFSubset]

The list of Huggingface subsets to use.

data_loaded bool

Denotes if the dataset is loaded or not. This is used to avoid loading the dataset multiple times.

abstask_prompt str | None

Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt.

fast_loading bool

Deprecated. Denotes if the task should be loaded using the fast loading method. This is only possible if the dataset has a "default" config. We don't recommend using this method and suggest using separate subsets for loading datasets. It is kept only for historical reasons and will be removed in the future.

Source code in mteb/abstasks/abstask.py
class AbsTask(ABC):
    """The abstract class for the tasks

    Attributes:
        metadata: The metadata describing the task
        dataset: The dataset represented as a dictionary on the form {"hf subset": {"split": Dataset}} where "split" is the dataset split (e.g. "test")
            and Dataset is a datasets.Dataset object. "hf subset" is the data subset on Huggingface typically used to denote the language e.g.
            datasets.load_dataset("data", "en"). If the dataset does not have a subset this is simply "default".
        seed: The random seed used for reproducibility.
        hf_subsets: The list of Huggingface subsets to use.
        data_loaded: Denotes if the dataset is loaded or not. This is used to avoid loading the dataset multiple times.
        abstask_prompt: Prompt to use for the task for instruction model if not prompt is provided in TaskMetadata.prompt.
        fast_loading: **Deprecated**. Denotes if the task should be loaded using the fast loading method.
            This is only possible if the dataset have a "default" config. We don't recommend to use this method, and suggest to use different subsets for loading datasets.
            This was used only for historical reasons and will be removed in the future.
    """

    metadata: TaskMetadata
    abstask_prompt: str | None = None
    _eval_splits: list[str] | None = None
    dataset: dict[HFSubset, DatasetDict] | None = None
    data_loaded: bool = False
    hf_subsets: list[HFSubset]
    fast_loading: bool = False

    _support_cross_encoder: bool = False
    _support_search: bool = False

    def __init__(self, seed: int = 42, **kwargs: Any) -> None:
        """The init function. This is called primarily to set the seed.

        Args:
            seed: An integer seed.
            kwargs: arguments passed to subclasses.
        """
        self.seed = seed
        self.rng_state, self.np_rng = _set_seed(seed)
        self.hf_subsets = self.metadata.hf_subsets

    def check_if_dataset_is_superseded(self) -> None:
        """Check if the dataset is superseded by a newer version."""
        if self.superseded_by:
            logger.warning(
                f"Dataset '{self.metadata.name}' is superseded by '{self.superseded_by}', you might consider using the newer version of the dataset."
            )

    def dataset_transform(self):
        """A transform operations applied to the dataset after loading.

        This method is useful when the dataset from Huggingface is not in an `mteb` compatible format.
        Override this method if your dataset requires additional transformation.
        """
        pass

    def evaluate(
        self,
        model: MTEBModels,
        split: str = "test",
        subsets_to_run: list[HFSubset] | None = None,
        *,
        encode_kwargs: dict[str, Any],
        prediction_folder: Path | None = None,
        **kwargs: Any,
    ) -> dict[HFSubset, ScoresDict]:
        """Evaluates an MTEB compatible model on the task.

        Args:
            model: MTEB compatible model. Implements a encode(sentences) method, that encodes sentences and returns an array of embeddings
            split: Which split (e.g. *"test"*) to be used.
            subsets_to_run: List of huggingface subsets (HFSubsets) to evaluate. If None, all subsets are evaluated.
            encode_kwargs: Additional keyword arguments that are passed to the model's `encode` method.
            prediction_folder: Folder to save model predictions
            kwargs: Additional keyword arguments that are passed to the _evaluate_subset method.

        Returns:
            A dictionary with the scores for each subset.

        Raises:
            TypeError: If the model is a CrossEncoder and the task does not support CrossEncoders.
            TypeError: If the model is a SearchProtocol and the task does not support Search.
        """
        if isinstance(model, CrossEncoderProtocol) and not self._support_cross_encoder:
            raise TypeError(
                f"Model {model} is a CrossEncoder, but this task {self.metadata.name} does not support CrossEncoders. "
                "Please use a Encoder model instead."
            )

        # encoders might implement search protocols
        if (
            isinstance(model, SearchProtocol)
            and not isinstance(model, EncoderProtocol)
            and not self._support_search
        ):
            raise TypeError(
                f"Model {model} is a SearchProtocol, but this task {self.metadata.name} does not support Search. "
                "Please use a Encoder model instead."
            )

        if not self.data_loaded:
            self.load_data()

        self.dataset = cast(dict[HFSubset, DatasetDict], self.dataset)

        scores = {}
        if self.hf_subsets is None:
            hf_subsets = list(self.dataset.keys())
        else:
            hf_subsets = copy(self.hf_subsets)

        if subsets_to_run is not None:  # allow overwrites of pre-filtering
            hf_subsets = [s for s in hf_subsets if s in subsets_to_run]

        for hf_subset in hf_subsets:
            logger.info(
                f"Running task {self.metadata.name} ({split=}, {hf_subset=})..."
            )
            if hf_subset not in self.dataset and hf_subset == "default":
                data_split = self.dataset[split]
            else:
                data_split = self.dataset[hf_subset][split]
            scores[hf_subset] = self._evaluate_subset(
                model,
                data_split,
                hf_split=split,
                hf_subset=hf_subset,
                encode_kwargs=encode_kwargs,
                prediction_folder=prediction_folder,
                **kwargs,
            )
            self._add_main_score(scores[hf_subset])
        return scores

    @abstractmethod
    def _evaluate_subset(
        self,
        model: EncoderProtocol,
        data_split: Dataset,
        *,
        encode_kwargs: dict[str, Any],
        hf_split: str,
        hf_subset: str,
        prediction_folder: Path | None = None,
        **kwargs: Any,
    ) -> ScoresDict:
        raise NotImplementedError(
            "If you are using the default evaluate method, you must implement _evaluate_subset method."
        )

    def _save_task_predictions(
        self,
        predictions: dict[str, Any] | list[Any],
        model: MTEBModels,
        prediction_folder: Path,
        hf_split: str,
        hf_subset: str,
    ) -> None:
        """Saves the predictions of the model on the task to a json file.

        Args:
            predictions: Dictionary containing the predictions.
            model: The model used to generate the predictions.
            prediction_folder: The folder to save the predictions to.
            hf_split: The split of the dataset (e.g. "test").
            hf_subset: The subset of the dataset (e.g. "en").
        """
        predictions_path = self._predictions_path(prediction_folder)
        existing_results = {
            "mteb_model_meta": {
                "model_name": model.mteb_model_meta.name,
                "revision": model.mteb_model_meta.revision,
            }
        }
        if predictions_path.exists():
            with predictions_path.open("r") as predictions_file:
                existing_results = json.load(predictions_file)

        if hf_subset not in existing_results:
            existing_results[hf_subset] = {}

        existing_results[hf_subset][hf_split] = predictions
        with predictions_path.open("w") as predictions_file:
            json.dump(existing_results, predictions_file)

    def _predictions_path(
        self,
        output_folder: Path | str,
    ) -> Path:
        if isinstance(output_folder, str):
            output_folder = Path(output_folder)

        if not output_folder.exists():
            output_folder.mkdir(parents=True, exist_ok=True)
        return output_folder / self.prediction_file_name

    @property
    def prediction_file_name(self) -> str:
        """The name of the prediction file in format {task_name}_predictions.json"""
        return f"{self.metadata.name}_predictions.json"

    @staticmethod
    def stratified_subsampling(
        dataset_dict: DatasetDict,
        seed: int,
        splits: list[str] = ["test"],
        label: str = "label",
        n_samples: int = 2048,
    ) -> DatasetDict:
        """Subsamples the dataset with stratification by the supplied label.

        Args:
            dataset_dict: the DatasetDict object.
            seed: the random seed.
            splits: the splits of the dataset.
            label: the label with which the stratified sampling is based on.
            n_samples: Optional, number of samples to subsample. Default is max_n_samples.

        Returns:
            A subsampled DatasetDict object.
        """
        # Can only do this if the label column is of ClassLabel.
        if not isinstance(dataset_dict[splits[0]].features[label], ClassLabel):
            try:
                dataset_dict = dataset_dict.class_encode_column(label)
            except ValueError as e:
                if isinstance(dataset_dict[splits[0]][label][0], Sequence):
                    return _multilabel_subsampling(
                        dataset_dict, seed, splits, label, n_samples
                    )
                else:
                    raise e

        for split in splits:
            if n_samples >= len(dataset_dict[split]):
                logger.debug(
                    f"Subsampling not needed for split {split}, as n_samples is equal or greater than the number of samples."
                )
                continue
            dataset_dict.update(
                {
                    split: dataset_dict[split].train_test_split(
                        test_size=n_samples, seed=seed, stratify_by_column=label
                    )["test"]
                }
            )  # only take the specified test split.
        return dataset_dict

    def load_data(self) -> None:
        """Loads dataset from HuggingFace hub

        This is the main loading function for Task. Do not overwrite this, instead we recommend using `dataset_transform`, which is called after the
        dataset is loaded using `datasets.load_dataset`.
        """
        if self.data_loaded:
            return
        if self.metadata.is_multilingual:
            if self.fast_loading:
                self.fast_load()
            else:
                self.dataset = {}
                for hf_subset in self.hf_subsets:
                    self.dataset[hf_subset] = load_dataset(
                        name=hf_subset,
                        **self.metadata.dataset,
                    )
        else:
            # some of monolingual datasets explicitly adding the split name to the dataset name
            self.dataset = load_dataset(**self.metadata.dataset)  # type: ignore
        self.dataset_transform()
        self.data_loaded = True

    def fast_load(self) -> None:
        """**Deprecated**. Load all subsets at once, then group by language. Using fast loading has two requirements:

        - Each row in the dataset should have a 'lang' feature giving the corresponding language/language pair
        - The datasets must have a 'default' config that loads all the subsets of the dataset (see more [here](https://huggingface.co/docs/datasets/en/repository_structure#configurations))
        """
        self.dataset = {}
        merged_dataset = load_dataset(**self.metadata.dataset)  # load "default" subset
        for split in merged_dataset.keys():
            df_split = merged_dataset[split].to_polars()
            df_grouped = dict(df_split.group_by(["lang"]))
            for lang in set(df_split["lang"].unique()) & set(self.hf_subsets):
                self.dataset.setdefault(lang, {})
                self.dataset[lang][split] = Dataset.from_polars(
                    df_grouped[(lang,)].drop("lang")
                )  # Remove lang column and convert back to HF datasets, not strictly necessary but better for compatibility
        for lang, subset in self.dataset.items():
            self.dataset[lang] = DatasetDict(subset)

    def calculate_descriptive_statistics(
        self, overwrite_results: bool = False
    ) -> dict[str, DescriptiveStatistics]:
        """Calculates descriptive statistics from the dataset.

        Args:
            overwrite_results: Whether to overwrite existing results. If False and results already exist, the existing results will be loaded from cache.

        Returns:
            A dictionary containing descriptive statistics for each split.
        """
        from mteb.abstasks import AbsTaskClassification

        if self.metadata.descriptive_stat_path.exists() and not overwrite_results:
            logger.info("Loading metadata descriptive statistics from cache.")
            return self.metadata.descriptive_stats

        if not self.data_loaded:
            self.load_data()

        descriptive_stats: dict[str, DescriptiveStatistics] = {}
        hf_subset_stat = "hf_subset_descriptive_stats"
        eval_splits = self.metadata.eval_splits
        if isinstance(self, AbsTaskClassification):
            eval_splits.append(self.train_split)

        pbar_split = tqdm(eval_splits, desc="Processing Splits...")
        for split in pbar_split:
            pbar_split.set_postfix_str(f"Split: {split}")
            logger.info(f"Processing metadata for split {split}")
            if self.metadata.is_multilingual:
                descriptive_stats[split] = (
                    self._calculate_descriptive_statistics_from_split(
                        split, compute_overall=True
                    )
                )
                descriptive_stats[split][hf_subset_stat] = {}

                pbar_subsets = tqdm(
                    self.metadata.hf_subsets,
                    desc="Processing Languages...",
                )
                for hf_subset in pbar_subsets:
                    pbar_subsets.set_postfix_str(f"Huggingface subset: {hf_subset}")
                    logger.info(f"Processing metadata for subset {hf_subset}")
                    split_details = self._calculate_descriptive_statistics_from_split(
                        split, hf_subset
                    )
                    descriptive_stats[split][hf_subset_stat][hf_subset] = split_details
            else:
                split_details = self._calculate_descriptive_statistics_from_split(split)
                descriptive_stats[split] = split_details

        with self.metadata.descriptive_stat_path.open("w") as f:
            json.dump(descriptive_stats, f, indent=4)

        return descriptive_stats

    def calculate_metadata_metrics(
        self, overwrite_results: bool = False
    ) -> dict[str, DescriptiveStatistics]:
        """Old name of `calculate_descriptive_statistics`, kept for backward compatibility."""
        return self.calculate_descriptive_statistics(
            overwrite_results=overwrite_results
        )

    @abstractmethod
    def _calculate_descriptive_statistics_from_split(
        self, split: str, hf_subset: str | None = None, compute_overall: bool = False
    ) -> SplitDescriptiveStatistics:
        raise NotImplementedError

    @property
    def languages(self) -> list[str]:
        """Returns the languages of the task."""
        if self.hf_subsets:
            eval_langs = self.metadata.hf_subsets_to_langscripts
            languages = []

            for lang in self.hf_subsets:
                for langscript in eval_langs[lang]:
                    iso_lang, script = langscript.split("-")
                    languages.append(iso_lang)

            return sorted(set(languages))

        return self.metadata.languages

    def filter_eval_splits(self, eval_splits: list[str] | None) -> Self:
        """Filter the evaluation splits of the task.

        Args:
            eval_splits: A list of evaluation splits to keep. If None, all splits are kept.

        Returns:
            The filtered task
        """
        self._eval_splits = eval_splits
        return self

    def filter_languages(
        self,
        languages: list[str] | None,
        script: list[str] | None = None,
        hf_subsets: list[HFSubset] | None = None,
        exclusive_language_filter: bool = False,
    ) -> Self:
        """Filter the languages of the task.

        Args:
            languages: list of languages to filter the task by can be either a 3-letter langauge code (e.g. "eng") or also include the script
                (e.g. "eng-Latn")
            script: A list of scripts to filter the task by. Will be ignored if language code specified the script. If None, all scripts are included.
                If the language code does not specify the script the intersection of the language and script will be used.
            hf_subsets: A list of huggingface subsets to filter on. This is useful if a dataset have multiple subsets containing the desired language,
                but you only want to test on one. An example is STS22 which e.g. have both "en" and "de-en" which both contains English.
            exclusive_language_filter: Some datasets contains more than one language e.g. for STS22 the subset "de-en" contain eng and deu. If
                exclusive_language_filter is set to False both of these will be kept, but if set to True only those that contains all the languages
                specified will be kept.

        Returns:
            The filtered task
        """
        lang_scripts = LanguageScripts.from_languages_and_scripts(languages, script)

        subsets_to_keep = []

        for hf_subset, langs in self.metadata.hf_subsets_to_langscripts.items():
            if (hf_subsets is not None) and (hf_subset not in hf_subsets):
                continue
            if exclusive_language_filter is False:
                for langscript in langs:
                    if lang_scripts.contains_language(
                        langscript
                    ) or lang_scripts.contains_script(langscript):
                        subsets_to_keep.append(hf_subset)
                        break

            if exclusive_language_filter is True and languages:
                if lang_scripts.contains_languages(langs):
                    subsets_to_keep.append(hf_subset)

        self.hf_subsets = subsets_to_keep
        return self

    def _add_main_score(self, scores: dict[HFSubset, ScoresDict]) -> None:
        scores["main_score"] = scores[self.metadata.main_score]

    def _upload_dataset_to_hub(
        self, repo_name: str, fields: list[str] | dict[str, str]
    ) -> None:
        if self.metadata.is_multilingual:
            for config in self.metadata.eval_langs:
                logger.info(f"Converting {config} of {self.metadata.name}")
                sentences = {}
                for split in self.dataset[config]:
                    if isinstance(fields, dict):
                        sentences[split] = Dataset.from_dict(
                            {
                                mapped_name: self.dataset[config][split][original_name]
                                for original_name, mapped_name in fields.items()
                            }
                        )
                    else:
                        sentences[split] = Dataset.from_dict(
                            {
                                field: self.dataset[config][split][field]
                                for field in fields
                            }
                        )
                sentences = DatasetDict(sentences)
                sentences.push_to_hub(
                    repo_name, config, commit_message=f"Add {config} dataset"
                )
        else:
            sentences = {}
            for split in self.dataset:
                if isinstance(fields, dict):
                    sentences[split] = Dataset.from_dict(
                        {
                            mapped_name: self.dataset[split][original_name]
                            for original_name, mapped_name in fields.items()
                        }
                    )
                else:
                    sentences[split] = Dataset.from_dict(
                        {field: self.dataset[split][field] for field in fields}
                    )
            sentences = DatasetDict(sentences)
            sentences.push_to_hub(repo_name, commit_message="Add dataset")

    def _push_dataset_to_hub(self, repo_name: str) -> None:
        raise NotImplementedError

    def push_dataset_to_hub(self, repo_name: str) -> None:
        """Push the dataset to the HuggingFace Hub.

        Args:
            repo_name: The name of the repository to push the dataset to.

        Examples:
            >>> import mteb
            >>> task = mteb.get_task("Caltech101")
            >>> repo_name = f"myorg/{task.metadata.name}"
            >>> # Push the dataset to the Hub
            >>> task.push_dataset_to_hub(repo_name)
        """
        if not self.data_loaded:
            self.load_data()

        self._push_dataset_to_hub(repo_name)
        # dataset repo not creating when pushing card
        self.metadata.push_dataset_card_to_hub(repo_name)

    @property
    def is_aggregate(self) -> bool:
        """Whether the task is an aggregate of multiple tasks."""
        return False

    @property
    def eval_splits(self) -> list[str]:
        """Returns the evaluation splits of the task."""
        if self._eval_splits:
            return self._eval_splits
        return self.metadata.eval_splits

    @property
    def modalities(self) -> list[Modalities]:
        """Returns the modalities of the task."""
        return self.metadata.modalities

    def __repr__(self) -> str:
        # Format the representation of the task such that it appears as:
        # TaskObjectName(name='{name}', languages={lang1, lang2, ...})

        langs = self.languages
        if len(langs) > 3:
            langs = langs[:3]
            langs.append("...")
        return (
            f"{self.__class__.__name__}(name='{self.metadata.name}', languages={langs})"
        )

    def __hash__(self) -> int:
        return hash(self.metadata)

    def unload_data(self) -> None:
        """Unloads the dataset from memory"""
        if self.data_loaded:
            self.dataset = None
            self.data_loaded = False
            logger.info(f"Unloaded dataset {self.metadata.name} from memory.")
        else:
            logger.warning(
                f"Dataset {self.metadata.name} is not loaded, cannot unload it."
            )

    @property
    def superseded_by(self) -> str | None:
        """If the dataset is superseded by another dataset, return the name of the new dataset."""
        return self.metadata.superseded_by

eval_splits property

Returns the evaluation splits of the task.

is_aggregate property

Whether the task is an aggregate of multiple tasks.

languages property

Returns the languages of the task.

modalities property

Returns the modalities of the task.

prediction_file_name property

The name of the prediction file in format {task_name}_predictions.json

superseded_by property

If the dataset is superseded by another dataset, return the name of the new dataset.
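
To tie the attributes and properties above together, here is a minimal end-to-end sketch; the model and task names are examples, and mteb.get_model is assumed to be available in your version.

import mteb

model = mteb.get_model("sentence-transformers/all-MiniLM-L6-v2")  # example model
task = mteb.get_task("Banking77Classification")                   # example task

task.check_if_dataset_is_superseded()  # warns if a newer version of the dataset exists
scores = task.evaluate(model, split="test", encode_kwargs={"batch_size": 32})
print(scores["default"]["main_score"])  # "default" is the hf_subset for tasks without subsets
task.unload_data()                      # free the dataset from memory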

__init__(seed=42, **kwargs)

The init function. This is called primarily to set the seed.

Parameters:

Name Type Description Default
seed int

An integer seed.

42
kwargs Any

arguments passed to subclasses.

{}
Source code in mteb/abstasks/abstask.py
def __init__(self, seed: int = 42, **kwargs: Any) -> None:
    """The init function. This is called primarily to set the seed.

    Args:
        seed: An integer seed.
        kwargs: arguments passed to subclasses.
    """
    self.seed = seed
    self.rng_state, self.np_rng = _set_seed(seed)
    self.hf_subsets = self.metadata.hf_subsets

calculate_descriptive_statistics(overwrite_results=False)

Calculates descriptive statistics from the dataset.

Parameters:

Name Type Description Default
overwrite_results bool

Whether to overwrite existing results. If False and results already exist, the existing results will be loaded from cache.

False

Returns:

Type Description
dict[str, DescriptiveStatistics]

A dictionary containing descriptive statistics for each split.

Source code in mteb/abstasks/abstask.py
def calculate_descriptive_statistics(
    self, overwrite_results: bool = False
) -> dict[str, DescriptiveStatistics]:
    """Calculates descriptive statistics from the dataset.

    Args:
        overwrite_results: Whether to overwrite existing results. If False and results already exist, the existing results will be loaded from cache.

    Returns:
        A dictionary containing descriptive statistics for each split.
    """
    from mteb.abstasks import AbsTaskClassification

    if self.metadata.descriptive_stat_path.exists() and not overwrite_results:
        logger.info("Loading metadata descriptive statistics from cache.")
        return self.metadata.descriptive_stats

    if not self.data_loaded:
        self.load_data()

    descriptive_stats: dict[str, DescriptiveStatistics] = {}
    hf_subset_stat = "hf_subset_descriptive_stats"
    eval_splits = self.metadata.eval_splits
    if isinstance(self, AbsTaskClassification):
        eval_splits.append(self.train_split)

    pbar_split = tqdm(eval_splits, desc="Processing Splits...")
    for split in pbar_split:
        pbar_split.set_postfix_str(f"Split: {split}")
        logger.info(f"Processing metadata for split {split}")
        if self.metadata.is_multilingual:
            descriptive_stats[split] = (
                self._calculate_descriptive_statistics_from_split(
                    split, compute_overall=True
                )
            )
            descriptive_stats[split][hf_subset_stat] = {}

            pbar_subsets = tqdm(
                self.metadata.hf_subsets,
                desc="Processing Languages...",
            )
            for hf_subset in pbar_subsets:
                pbar_subsets.set_postfix_str(f"Huggingface subset: {hf_subset}")
                logger.info(f"Processing metadata for subset {hf_subset}")
                split_details = self._calculate_descriptive_statistics_from_split(
                    split, hf_subset
                )
                descriptive_stats[split][hf_subset_stat][hf_subset] = split_details
        else:
            split_details = self._calculate_descriptive_statistics_from_split(split)
            descriptive_stats[split] = split_details

    with self.metadata.descriptive_stat_path.open("w") as f:
        json.dump(descriptive_stats, f, indent=4)

    return descriptive_stats
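
A short usage sketch (the task name is an example); results are cached at metadata.descriptive_stat_path, so a second call loads from cache unless overwrite_results=True.

import mteb

task = mteb.get_task("Banking77Classification")  # example task name
stats = task.calculate_descriptive_statistics()
for split, split_stats in stats.items():
    print(split, sorted(split_stats))  # the available statistics per split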

calculate_metadata_metrics(overwrite_results=False)

Old name of calculate_descriptive_statistics, kept for backward compatibility.

Source code in mteb/abstasks/abstask.py
def calculate_metadata_metrics(
    self, overwrite_results: bool = False
) -> dict[str, DescriptiveStatistics]:
    """Old name of `calculate_descriptive_statistics`, kept for backward compatibility."""
    return self.calculate_descriptive_statistics(
        overwrite_results=overwrite_results
    )

check_if_dataset_is_superseded()

Check if the dataset is superseded by a newer version.

Source code in mteb/abstasks/abstask.py
def check_if_dataset_is_superseded(self) -> None:
    """Check if the dataset is superseded by a newer version."""
    if self.superseded_by:
        logger.warning(
            f"Dataset '{self.metadata.name}' is superseded by '{self.superseded_by}', you might consider using the newer version of the dataset."
        )

dataset_transform()

Transform operations applied to the dataset after loading.

This method is useful when the dataset from Huggingface is not in an mteb compatible format. Override this method if your dataset requires additional transformation.

Source code in mteb/abstasks/abstask.py
def dataset_transform(self):
    """A transform operations applied to the dataset after loading.

    This method is useful when the dataset from Huggingface is not in an `mteb` compatible format.
    Override this method if your dataset requires additional transformation.
    """
    pass
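
As a sketch of how a subclass might use this hook (the class, column, and split names are illustrative, and a real task would also define its metadata), renaming a column into the format the task type expects:

from datasets import DatasetDict
from mteb.abstasks import AbsTaskClassification  # import also used in the source above

class MyClassificationTask(AbsTaskClassification):
    # A real task would also define `metadata` here.

    def dataset_transform(self) -> None:
        # Rename the text column and keep only the test split.
        self.dataset = self.dataset.rename_column("sentence", "text")
        self.dataset = DatasetDict({"test": self.dataset["test"]})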

evaluate(model, split='test', subsets_to_run=None, *, encode_kwargs, prediction_folder=None, **kwargs)

Evaluates an MTEB compatible model on the task.

Parameters:

Name Type Description Default
model MTEBModels

MTEB-compatible model. Implements an encode(sentences) method that encodes sentences and returns an array of embeddings.

required
split str

Which split (e.g. "test") to use.

'test'
subsets_to_run list[HFSubset] | None

List of huggingface subsets (HFSubsets) to evaluate. If None, all subsets are evaluated.

None
encode_kwargs dict[str, Any]

Additional keyword arguments that are passed to the model's encode method.

required
prediction_folder Path | None

Folder to save model predictions

None
kwargs Any

Additional keyword arguments that are passed to the _evaluate_subset method.

{}

Returns:

Type Description
dict[HFSubset, ScoresDict]

A dictionary with the scores for each subset.

Raises:

Type Description
TypeError

If the model is a CrossEncoder and the task does not support CrossEncoders.

TypeError

If the model is a SearchProtocol and the task does not support Search.

Source code in mteb/abstasks/abstask.py
def evaluate(
    self,
    model: MTEBModels,
    split: str = "test",
    subsets_to_run: list[HFSubset] | None = None,
    *,
    encode_kwargs: dict[str, Any],
    prediction_folder: Path | None = None,
    **kwargs: Any,
) -> dict[HFSubset, ScoresDict]:
    """Evaluates an MTEB compatible model on the task.

    Args:
        model: MTEB compatible model. Implements a encode(sentences) method, that encodes sentences and returns an array of embeddings
        split: Which split (e.g. *"test"*) to be used.
        subsets_to_run: List of huggingface subsets (HFSubsets) to evaluate. If None, all subsets are evaluated.
        encode_kwargs: Additional keyword arguments that are passed to the model's `encode` method.
        prediction_folder: Folder to save model predictions
        kwargs: Additional keyword arguments that are passed to the _evaluate_subset method.

    Returns:
        A dictionary with the scores for each subset.

    Raises:
        TypeError: If the model is a CrossEncoder and the task does not support CrossEncoders.
        TypeError: If the model is a SearchProtocol and the task does not support Search.
    """
    if isinstance(model, CrossEncoderProtocol) and not self._support_cross_encoder:
        raise TypeError(
            f"Model {model} is a CrossEncoder, but this task {self.metadata.name} does not support CrossEncoders. "
            "Please use a Encoder model instead."
        )

    # encoders might implement search protocols
    if (
        isinstance(model, SearchProtocol)
        and not isinstance(model, EncoderProtocol)
        and not self._support_search
    ):
        raise TypeError(
            f"Model {model} is a SearchProtocol, but this task {self.metadata.name} does not support Search. "
            "Please use a Encoder model instead."
        )

    if not self.data_loaded:
        self.load_data()

    self.dataset = cast(dict[HFSubset, DatasetDict], self.dataset)

    scores = {}
    if self.hf_subsets is None:
        hf_subsets = list(self.dataset.keys())
    else:
        hf_subsets = copy(self.hf_subsets)

    if subsets_to_run is not None:  # allow overwrites of pre-filtering
        hf_subsets = [s for s in hf_subsets if s in subsets_to_run]

    for hf_subset in hf_subsets:
        logger.info(
            f"Running task {self.metadata.name} ({split=}, {hf_subset=})..."
        )
        if hf_subset not in self.dataset and hf_subset == "default":
            data_split = self.dataset[split]
        else:
            data_split = self.dataset[hf_subset][split]
        scores[hf_subset] = self._evaluate_subset(
            model,
            data_split,
            hf_split=split,
            hf_subset=hf_subset,
            encode_kwargs=encode_kwargs,
            prediction_folder=prediction_folder,
            **kwargs,
        )
        self._add_main_score(scores[hf_subset])
    return scores

fast_load()

Deprecated. Load all subsets at once, then group by language. Using fast loading has two requirements:

  • Each row in the dataset should have a 'lang' feature giving the corresponding language/language pair
  • The datasets must have a 'default' config that loads all the subsets of the dataset (see more here)
Source code in mteb/abstasks/abstask.py
def fast_load(self) -> None:
    """**Deprecated**. Load all subsets at once, then group by language. Using fast loading has two requirements:

    - Each row in the dataset should have a 'lang' feature giving the corresponding language/language pair
    - The datasets must have a 'default' config that loads all the subsets of the dataset (see more [here](https://huggingface.co/docs/datasets/en/repository_structure#configurations))
    """
    self.dataset = {}
    merged_dataset = load_dataset(**self.metadata.dataset)  # load "default" subset
    for split in merged_dataset.keys():
        df_split = merged_dataset[split].to_polars()
        df_grouped = dict(df_split.group_by(["lang"]))
        for lang in set(df_split["lang"].unique()) & set(self.hf_subsets):
            self.dataset.setdefault(lang, {})
            self.dataset[lang][split] = Dataset.from_polars(
                df_grouped[(lang,)].drop("lang")
            )  # Remove lang column and convert back to HF datasets, not strictly necessary but better for compatibility
    for lang, subset in self.dataset.items():
        self.dataset[lang] = DatasetDict(subset)

filter_eval_splits(eval_splits)

Filter the evaluation splits of the task.

Parameters:

Name Type Description Default
eval_splits list[str] | None

A list of evaluation splits to keep. If None, all splits are kept.

required

Returns:

Type Description
Self

The filtered task

Source code in mteb/abstasks/abstask.py
def filter_eval_splits(self, eval_splits: list[str] | None) -> Self:
    """Filter the evaluation splits of the task.

    Args:
        eval_splits: A list of evaluation splits to keep. If None, all splits are kept.

    Returns:
        The filtered task
    """
    self._eval_splits = eval_splits
    return self
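
A brief usage sketch; the task name is an example taken from the docstrings on this page.

import mteb

task = mteb.get_task("STS22").filter_eval_splits(["test"])
print(task.eval_splits)  # -> ["test"]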

filter_languages(languages, script=None, hf_subsets=None, exclusive_language_filter=False)

Filter the languages of the task.

Parameters:

Name Type Description Default
languages list[str] | None

A list of languages to filter the task by; each can be either a 3-letter language code (e.g. "eng") or also include the script (e.g. "eng-Latn").

required
script list[str] | None

A list of scripts to filter the task by. This is ignored if the language code specifies the script. If None, all scripts are included. If the language code does not specify the script, the intersection of the language and script filters is used.

None
hf_subsets list[HFSubset] | None

A list of huggingface subsets to filter on. This is useful if a dataset has multiple subsets containing the desired language, but you only want to test on one. An example is STS22, which has both "en" and "de-en", both of which contain English.

None
exclusive_language_filter bool

Some datasets contain more than one language, e.g. for STS22 the subset "de-en" contains eng and deu. If exclusive_language_filter is set to False, both of these will be kept; if set to True, only subsets that contain all of the specified languages will be kept.

False

Returns:

Type Description
Self

The filtered task

Source code in mteb/abstasks/abstask.py
def filter_languages(
    self,
    languages: list[str] | None,
    script: list[str] | None = None,
    hf_subsets: list[HFSubset] | None = None,
    exclusive_language_filter: bool = False,
) -> Self:
    """Filter the languages of the task.

    Args:
        languages: A list of languages to filter the task by. Each entry can be either a 3-letter language code (e.g. "eng") or also include the
            script (e.g. "eng-Latn").
        script: A list of scripts to filter the task by. Ignored if the language code already specifies the script. If None, all scripts are
            included. If the language code does not specify the script, the intersection of the languages and scripts is used.
        hf_subsets: A list of Hugging Face subsets to filter on. This is useful if a dataset has multiple subsets containing the desired language,
            but you only want to test on one. An example is STS22, which has both "en" and "de-en" subsets that contain English.
        exclusive_language_filter: Some datasets contain more than one language, e.g. for STS22 the subset "de-en" contains both eng and deu. If
            exclusive_language_filter is set to False, both are kept; if set to True, only subsets that contain all of the specified languages
            are kept.

    Returns:
        The filtered task
    """
    lang_scripts = LanguageScripts.from_languages_and_scripts(languages, script)

    subsets_to_keep = []

    for hf_subset, langs in self.metadata.hf_subsets_to_langscripts.items():
        if (hf_subsets is not None) and (hf_subset not in hf_subsets):
            continue
        if exclusive_language_filter is False:
            for langscript in langs:
                if lang_scripts.contains_language(
                    langscript
                ) or lang_scripts.contains_script(langscript):
                    subsets_to_keep.append(hf_subset)
                    break

        if exclusive_language_filter is True and languages:
            if lang_scripts.contains_languages(langs):
                subsets_to_keep.append(hf_subset)

    self.hf_subsets = subsets_to_keep
    return self
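
A hedged usage sketch (the task name is illustrative; STS22 is used because it has mixed-language subsets):

>>> import mteb
>>> task = mteb.get_task("STS22")  # illustrative multilingual task
>>> # keep only subsets containing English, regardless of other languages in the pair
>>> task = task.filter_languages(languages=["eng"])
>>> # or keep only subsets that are exclusively in the requested languages
>>> task = task.filter_languages(languages=["eng"], exclusive_language_filter=True)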

load_data()

Loads dataset from HuggingFace hub

This is the main loading function for a Task. Do not override it; instead, we recommend using dataset_transform, which is called after the dataset is loaded using datasets.load_dataset.

Source code in mteb/abstasks/abstask.py
def load_data(self) -> None:
    """Loads dataset from HuggingFace hub

    This is the main loading function for a Task. Do not override it; instead, we recommend using `dataset_transform`, which is called after the
    dataset is loaded using `datasets.load_dataset`.
    """
    if self.data_loaded:
        return
    if self.metadata.is_multilingual:
        if self.fast_loading:
            self.fast_load()
        else:
            self.dataset = {}
            for hf_subset in self.hf_subsets:
                self.dataset[hf_subset] = load_dataset(
                    name=hf_subset,
                    **self.metadata.dataset,
                )
    else:
        # some of monolingual datasets explicitly adding the split name to the dataset name
        self.dataset = load_dataset(**self.metadata.dataset)  # type: ignore
    self.dataset_transform()
    self.data_loaded = True
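
As a hedged illustration, a custom task would typically leave load_data as-is and post-process the loaded data in dataset_transform instead (the subclass and column names below are hypothetical):

class MyCustomTask(AbsTask):
    # metadata = TaskMetadata(...)  # required in practice, omitted here

    def dataset_transform(self) -> None:
        # called by load_data() after datasets.load_dataset(); rename a
        # hypothetical "sentence" column to the expected "text" column
        self.dataset = self.dataset.rename_column("sentence", "text")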

push_dataset_to_hub(repo_name)

Push the dataset to the HuggingFace Hub.

Parameters:

- repo_name (str, required): The name of the repository to push the dataset to.

Examples:

>>> import mteb
>>> task = mteb.get_task("Caltech101")
>>> repo_name = f"myorg/{task.metadata.name}"
>>> # Push the dataset to the Hub
>>> task.push_dataset_to_hub(repo_name)
Source code in mteb/abstasks/abstask.py
def push_dataset_to_hub(self, repo_name: str) -> None:
    """Push the dataset to the HuggingFace Hub.

    Args:
        repo_name: The name of the repository to push the dataset to.

    Examples:
        >>> import mteb
        >>> task = mteb.get_task("Caltech101")
        >>> repo_name = f"myorg/{task.metadata.name}"
        >>> # Push the dataset to the Hub
        >>> task.push_dataset_to_hub(repo_name)
    """
    if not self.data_loaded:
        self.load_data()

    self._push_dataset_to_hub(repo_name)
    # the dataset repo is not created when pushing only the card
    self.metadata.push_dataset_card_to_hub(repo_name)

stratified_subsampling(dataset_dict, seed, splits=['test'], label='label', n_samples=2048) staticmethod

Subsamples the dataset with stratification by the supplied label.

Parameters:

- dataset_dict (DatasetDict, required): The DatasetDict object.
- seed (int, required): The random seed.
- splits (list[str], default ['test']): The splits of the dataset to subsample.
- label (str, default 'label'): The label column on which the stratified sampling is based.
- n_samples (int, default 2048): Number of samples to subsample.

Returns:

- DatasetDict: A subsampled DatasetDict object.

Source code in mteb/abstasks/abstask.py
@staticmethod
def stratified_subsampling(
    dataset_dict: DatasetDict,
    seed: int,
    splits: list[str] = ["test"],
    label: str = "label",
    n_samples: int = 2048,
) -> DatasetDict:
    """Subsamples the dataset with stratification by the supplied label.

    Args:
        dataset_dict: the DatasetDict object.
        seed: the random seed.
        splits: the splits of the dataset.
        label: the label with which the stratified sampling is based on.
        n_samples: Number of samples to subsample. Defaults to 2048.

    Returns:
        A subsampled DatasetDict object.
    """
    # Can only do this if the label column is of ClassLabel.
    if not isinstance(dataset_dict[splits[0]].features[label], ClassLabel):
        try:
            dataset_dict = dataset_dict.class_encode_column(label)
        except ValueError as e:
            if isinstance(dataset_dict[splits[0]][label][0], Sequence):
                return _multilabel_subsampling(
                    dataset_dict, seed, splits, label, n_samples
                )
            else:
                raise e

    for split in splits:
        if n_samples >= len(dataset_dict[split]):
            logger.debug(
                f"Subsampling not needed for split {split}, as n_samples is equal or greater than the number of samples."
            )
            continue
        dataset_dict.update(
            {
                split: dataset_dict[split].train_test_split(
                    test_size=n_samples, seed=seed, stratify_by_column=label
                )["test"]
            }
        )  # only take the specified test split.
    return dataset_dict
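
A hedged sketch of how a task might call this helper from its dataset_transform (the surrounding task class is hypothetical):

def dataset_transform(self) -> None:
    # subsample the test split to at most 2048 examples, stratified by "label"
    self.dataset = self.stratified_subsampling(
        self.dataset, seed=self.seed, splits=["test"], label="label", n_samples=2048
    )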

unload_data()

Unloads the dataset from memory

Source code in mteb/abstasks/abstask.py
def unload_data(self) -> None:
    """Unloads the dataset from memory"""
    if self.data_loaded:
        self.dataset = None
        self.data_loaded = False
        logger.info(f"Unloaded dataset {self.metadata.name} from memory.")
    else:
        logger.warning(
            f"Dataset {self.metadata.name} is not loaded, cannot unload it."
        )

Multimodal Tasks

Tasks that support any modality (text, image, etc.) inherit from the following abstract class. Retrieval tasks support multimodal input, e.g. image + text queries with an image corpus, or vice versa.

mteb.abstasks.retrieval.AbsTaskRetrieval

Bases: AbsTask

Abstract class for retrieval experiments.

Attributes:

- dataset (dict[str, dict[str, RetrievalSplitData]]): A nested dictionary where the first key is the subset (language or "default"), the second key is the split (e.g. "train", "test"), and the value is a RetrievalSplitData object.
- ignore_identical_ids (bool): If True, identical IDs in queries and corpus are ignored during evaluation.
- k_values (Sequence[int]): A sequence of integers representing the k values for evaluation metrics.
- skip_first_result (bool): If True, the first result is skipped during evaluation.
- abstask_prompt: Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt.
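
As a hedged illustration of this layout, the nested dataset of a loaded retrieval task can be inspected as follows (the task name is illustrative):

>>> import mteb
>>> task = mteb.get_task("NFCorpus")  # illustrative retrieval task
>>> task.load_data()
>>> split_data = task.dataset["default"]["test"]  # a RetrievalSplitData
>>> corpus, queries = split_data["corpus"], split_data["queries"]
>>> qrels = split_data["relevant_docs"]  # query-id -> {corpus-id: relevance}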

Source code in mteb/abstasks/retrieval.py
class AbsTaskRetrieval(AbsTask):
    """Abstract class for retrieval experiments.

    Attributes:
        dataset: A nested dictionary where the first key is the subset (language or "default"),
                 the second key is the split (e.g., "train", "test"), and the value is a RetrievalSplitData object.
        ignore_identical_ids: If True, identical IDs in queries and corpus are ignored during evaluation.
        k_values: A sequence of integers representing the k values for evaluation metrics.
        skip_first_result: If True, the first result is skipped during evaluation.
        abstask_prompt: Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt.
    """

    ignore_identical_ids: bool = False
    abstask_prompt = "Retrieve text based on user query."
    k_values: Sequence[int] = (1, 3, 5, 10, 20, 100, 1000)
    _top_k: int = max(k_values)
    dataset: dict[str, dict[str, RetrievalSplitData]]
    _support_cross_encoder: bool = True
    _support_search: bool = True
    _previous_results_model_meta: dict[str, Any] | None = None
    skip_first_result: bool = False

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        empty_dataset = Dataset.from_dict({})
        self.dataset = defaultdict(
            lambda: defaultdict(
                lambda: RetrievalSplitData(
                    corpus=empty_dataset,
                    queries=empty_dataset,
                    relevant_docs={},
                    top_ranked=None,
                )
            )
        )

    def convert_v1_dataset_format_to_v2(self):
        """Convert dataset from v1 (from `self.queries`, `self.document`) format to v2 format (`self.dotaset`)."""
        # check if dataset is `v1` version
        if not hasattr(self, "queries"):
            return
        empty_dataset = Dataset.from_dict({})

        self.dataset = defaultdict(
            lambda: defaultdict(
                lambda: RetrievalSplitData(
                    corpus=empty_dataset,
                    queries=empty_dataset,
                    relevant_docs={},
                    top_ranked=None,
                )
            )
        )

        def _process_split(
            ds_queries: dict | Dataset, ds_corpus: dict | Dataset
        ) -> tuple[Dataset, Dataset]:
            if isinstance(ds_queries, dict):
                queries = Dataset.from_list(
                    [{"id": k, "text": v} for k, v in ds_queries.items()]
                )
            elif isinstance(ds_queries, Dataset):
                queries = ds_queries
            else:
                raise ValueError(f"Can't convert queries of type {type(ds_queries)}")

            if isinstance(ds_corpus, dict):
                corpus = Dataset.from_list(
                    [
                        {
                            "id": k,
                            "text": v if isinstance(v, str) else v["text"],
                            "title": v.get("title", "") if isinstance(v, dict) else "",
                        }
                        for k, v in ds_corpus.items()
                    ]
                )
            elif isinstance(ds_corpus, Dataset):
                corpus = ds_corpus
            else:
                raise ValueError(f"Can't convert corpus of type {type(ds_corpus)}")
            return queries, corpus

        if self.metadata.is_multilingual:
            for subset in self.queries:
                for split in self.queries[subset]:
                    queries = self.queries[subset][split]
                    corpus = self.corpus[subset][split]

                    (
                        self.dataset[subset][split]["queries"],
                        self.dataset[subset][split]["corpus"],
                    ) = _process_split(queries, corpus)

                    self.dataset[subset][split]["relevant_docs"] = self.relevant_docs[
                        subset
                    ][split]
                    if hasattr(self, "instructions"):
                        instructions = self.instructions[subset][split]
                        self.dataset[subset][split]["queries"] = (
                            _combine_queries_with_instructions_datasets(
                                self.dataset[subset][split]["queries"],
                                instructions,
                            )
                        )
                    if hasattr(self, "top_ranked"):
                        self.dataset[subset][split]["top_ranked"] = self.top_ranked[
                            subset
                        ][split]
        else:
            subset = "default"
            for split in self.queries:
                queries = self.queries[split]
                corpus = self.corpus[split]
                (
                    self.dataset[subset][split]["queries"],
                    self.dataset[subset][split]["corpus"],
                ) = _process_split(queries, corpus)

                self.dataset[subset][split]["relevant_docs"] = self.relevant_docs[
                    split
                ].copy()
                if hasattr(self, "instructions"):
                    instructions = self.instructions[split]
                    self.dataset[subset][split]["queries"] = (
                        _combine_queries_with_instructions_datasets(
                            self.dataset[subset][split]["queries"],
                            instructions,
                        )
                    )
                if hasattr(self, "top_ranked"):
                    self.dataset[subset][split]["top_ranked"] = self.top_ranked[
                        split
                    ].copy()

        del self.queries
        del self.corpus
        del self.relevant_docs
        if hasattr(self, "instructions"):
            del self.instructions
        if hasattr(self, "top_ranked"):
            del self.top_ranked

    def load_data(self) -> None:
        """Load the dataset for the retrieval task."""
        if self.data_loaded:
            return

        dataset_path = self.metadata.dataset["path"]
        eval_splits = self.metadata.eval_splits
        trust_remote_code = self.metadata.dataset.get("trust_remote_code", False)
        revision = self.metadata.dataset["revision"]

        def _process_data(split: str, hf_subset: str = "default"):
            """Helper function to load and process data for a given split and language"""
            logger.debug(
                f"Loading {split} split for {hf_subset} subset of {self.metadata.name}"
            )

            self.dataset[hf_subset][split] = RetrievalDatasetLoader(
                hf_repo=dataset_path,
                revision=revision,
                trust_remote_code=trust_remote_code,
                split=split,
                config=hf_subset,
            ).load()

        if self.metadata.is_multilingual:
            for lang in self.metadata.eval_langs:
                for split in eval_splits:
                    _process_data(split, lang)
        else:
            for split in eval_splits:
                _process_data(split)
        self.dataset_transform()
        self.data_loaded = True

    def evaluate(
        self,
        model: MTEBModels,
        split: str = "test",
        subsets_to_run: list[HFSubset] | None = None,
        *,
        encode_kwargs: dict[str, Any],
        prediction_folder: Path | None = None,
        **kwargs,
    ) -> dict[HFSubset, ScoresDict]:
        """Evaluate the model on the retrieval task.

        Args:
            model: Model to evaluate. Model should implement the [SearchProtocol][mteb.models.models_protocols.SearchProtocol]
                or be an [Encoder][mteb.models.models_protocols.EncoderProtocol] or [CrossEncoderProtocol][mteb.models.models_protocols.CrossEncoderProtocol].
            split: Split to evaluate on
            subsets_to_run: Optional list of subsets to evaluate on
            encode_kwargs: Keyword arguments passed to the encoder
            prediction_folder: Folder to save model predictions
            **kwargs: Additional keyword arguments passed to the evaluator


        Returns:
            Dictionary mapping subsets to their evaluation scores
        """
        if not self.data_loaded:
            self.load_data()
        # TODO: convert all tasks directly https://github.com/embeddings-benchmark/mteb/issues/2030
        self.convert_v1_dataset_format_to_v2()

        return super().evaluate(
            model,
            split,
            subsets_to_run,
            encode_kwargs=encode_kwargs,
            prediction_folder=prediction_folder,
            **kwargs,
        )

    def _evaluate_subset(
        self,
        model: MTEBModels,
        data_split: RetrievalSplitData,
        encode_kwargs: dict[str, Any],
        hf_split: str,
        hf_subset: str,
        prediction_folder: Path | None = None,
        **kwargs,
    ) -> ScoresDict:
        """Evaluate a model on a specific subset of the data.

        Args:
            model: Model to evaluate
            data_split: Data split to evaluate on
            encode_kwargs: Keyword arguments passed to the encoder
            hf_split: Split to evaluate on
            hf_subset: Subset to evaluate on
            prediction_folder: Folder with results prediction
            **kwargs: Additional keyword arguments passed to the evaluator

        Returns:
            Dictionary of evaluation scores
        """
        # ensure queries format (see #3030)
        data_split["relevant_docs"], data_split["queries"] = (
            _filter_queries_without_positives(
                data_split["relevant_docs"], data_split["queries"]
            )
        )
        retriever = RetrievalEvaluator(
            corpus=data_split["corpus"],
            queries=data_split["queries"],
            task_metadata=self.metadata,
            hf_split=hf_split,
            hf_subset=hf_subset,
            top_ranked=data_split["top_ranked"],
            top_k=self._top_k,
            **kwargs,
        )

        if isinstance(model, EncoderProtocol) and not isinstance(model, SearchProtocol):
            search_model = SearchEncoderWrapper(model)
        elif isinstance(model, CrossEncoderProtocol):
            search_model = SearchCrossEncoderWrapper(model)
        elif isinstance(model, SearchProtocol):
            search_model = model
        else:
            raise TypeError(
                f"RetrievalEvaluator expects a SearchInterface, Encoder, or CrossEncoder, got {type(model)}"
            )

        start_time = time()
        results = retriever(
            search_model,
            encode_kwargs=encode_kwargs,
        )
        end_time = time()
        logger.debug(
            f"Running retrieval task - Time taken to retrieve: {end_time - start_time:.2f} seconds"
        )

        if prediction_folder:
            self._save_task_predictions(
                results,
                model,
                prediction_folder,
                hf_subset=hf_subset,
                hf_split=hf_split,
            )

        logger.info("Running retrieval task - Evaluating retrieval scores...")
        (
            all_scores,
            ndcg,
            _map,
            recall,
            precision,
            naucs,
            mrr,
            naucs_mrr,
            cv_recall,
        ) = retriever.evaluate(
            data_split["relevant_docs"],
            results,
            self.k_values,
            ignore_identical_ids=self.ignore_identical_ids,
            skip_first_result=self.skip_first_result,
        )
        task_specific_scores = self.task_specific_scores(
            all_scores,
            data_split["relevant_docs"],
            results,
            hf_split=hf_split,
            hf_subset=hf_subset,
        )
        logger.info("Running retrieval task - Finished.")
        return make_score_dict(
            ndcg,
            _map,
            recall,
            precision,
            mrr,
            naucs,
            naucs_mrr,
            cv_recall,
            task_specific_scores,
            self._previous_results_model_meta,
        )

    def task_specific_scores(
        self,
        scores: dict[str, dict[str, float]],
        qrels: RelevantDocumentsType,
        results: dict[str, dict[str, float]],
        hf_split: str,
        hf_subset: str,
    ) -> dict[str, float]:
        """Calculate task specific scores. Override in subclass if needed.

        Args:
            scores: Dictionary of scores
            qrels: Relevant documents
            results: Retrieval results
            hf_split: Split to evaluate on
            hf_subset: Subset to evaluate on
        """
        return {}

    def _calculate_descriptive_statistics_from_split(
        self, split: str, hf_subset: str | None = None, compute_overall: bool = False
    ) -> RetrievalDescriptiveStatistics:
        self.convert_v1_dataset_format_to_v2()
        if hf_subset and hf_subset in self.dataset:
            split_data = self.dataset[hf_subset][split]
            queries = split_data["queries"]
            corpus = split_data["corpus"]
            relevant_docs = split_data["relevant_docs"]
            top_ranked = split_data["top_ranked"]
        elif compute_overall:
            queries = None
            corpus = None
            relevant_docs = {}
            top_ranked = {}
            for hf_subset in self.metadata.eval_langs:
                split_data = self.dataset[hf_subset][split]
                if queries is None:
                    queries = split_data["queries"]
                else:
                    queries = concatenate_datasets([queries, split_data["queries"]])
                if corpus is None:
                    corpus = split_data["corpus"]
                else:
                    corpus = concatenate_datasets([corpus, split_data["corpus"]])

                relevant_docs.update(
                    _process_relevant_docs(
                        split_data["relevant_docs"], hf_subset, split
                    )
                )

                if "top_ranked" in split_data and split_data["top_ranked"] is not None:
                    top_ranked.update(
                        {
                            f"{split}_{hf_subset}_{k}": v
                            for k, v in split_data["top_ranked"].items()
                        }
                    )
        else:
            if "default" in self.dataset and split != "default":
                return self._calculate_descriptive_statistics_from_split(
                    split=split, hf_subset="default"
                )
            split_data = self.dataset["default"][split]
            queries = split_data["queries"]
            corpus = split_data["corpus"]
            relevant_docs = split_data["relevant_docs"]
            top_ranked = split_data["top_ranked"]

        num_documents = len(corpus)
        num_queries = len(queries)

        if self.metadata.category is None:
            queries_modalities = "t"
            corpus_modalities = "t"
        else:
            queries_modalities, corpus_modalities = self.metadata.category.split("2")

        number_of_characters = 0

        documents_text_statistics = None
        documents_image_statistics = None
        queries_text_statistics = None
        queries_image_statistics = None

        if "t" in corpus_modalities:
            corpus_texts = corpus.map(_corpus_to_dict)["text"]
            documents_text_statistics = calculate_text_statistics(corpus_texts)
            number_of_characters += documents_text_statistics["total_text_length"]

        if "i" in corpus_modalities:
            documents_image_statistics = calculate_image_statistics(corpus["image"])

        if "t" in queries_modalities:
            queries_ = queries
            if "instruction" in queries_[0]:
                queries_ = queries_.map(_combine_queries_with_instruction_text)

            if isinstance(queries_["text"][0], dict | list):
                queries_ = queries_.map(_convert_conv_history_to_query)
            queries_text_statistics = calculate_text_statistics(queries_["text"])

            number_of_characters += queries_text_statistics["total_text_length"]

        if "i" in queries_modalities:
            queries_image_statistics = calculate_image_statistics(queries["image"])

        relevant_docs_statistics = calculate_relevant_docs_statistics(relevant_docs)

        if top_ranked is not None and num_queries and len(top_ranked) > 0:
            top_ranked_statistics = calculate_top_ranked_statistics(
                top_ranked, num_queries
            )
        else:
            top_ranked_statistics = None

        return RetrievalDescriptiveStatistics(
            num_samples=num_documents + num_queries,
            number_of_characters=number_of_characters,
            documents_text_statistics=documents_text_statistics,
            documents_image_statistics=documents_image_statistics,
            queries_text_statistics=queries_text_statistics,
            queries_image_statistics=queries_image_statistics,
            relevant_docs_statistics=relevant_docs_statistics,
            top_ranked_statistics=top_ranked_statistics,
        )

    def _push_dataset_to_hub(self, repo_name: str) -> None:
        self.convert_v1_dataset_format_to_v2()

        def _push_section(
            data: dict[str, RetrievalSplitData],
            subset_item: Literal["corpus", "queries", "relevant_docs", "top_ranked"],
            hf_subset_name: str,
            converter: Callable[[Any, Any], dict[str, Any]] | None = None,
        ) -> None:
            """Helper function to push dataset

            Args:
                data: Dataset with all items
                subset_item: Select which part to take. E. g. corpus, queries etc
                hf_subset_name: Name of the current item on HF
                converter: Function to convert dict to datasets format
            """
            sections = {}
            for split in data.keys():
                # skip empty instructions and top ranked
                if subset_item not in data[split] or data[split][subset_item] is None:
                    continue
                if isinstance(data[split][subset_item], Dataset):
                    sections[split] = data[split][subset_item]
                elif converter is not None:
                    sections[split] = Dataset.from_list(
                        [
                            converter(idx, item)
                            for idx, item in data[split][subset_item].items()
                        ]
                    )
                else:
                    raise ValueError(
                        f"Unexpected subset item type {subset_item} without converter"
                    )
            if len(sections) > 0:
                DatasetDict(sections).push_to_hub(
                    repo_name,
                    hf_subset_name,
                    commit_message=f"Add {hf_subset_name}-{subset_item}",
                )

        for subset in self.dataset:
            logger.info(f"Converting {subset} of {self.metadata.name}")
            _push_section(
                self.dataset[subset],
                "queries",
                f"{subset}-queries" if subset != "default" else "queries",
            )
            _push_section(
                self.dataset[subset],
                "corpus",
                f"{subset}-corpus" if subset != "default" else "corpus",
            )
            # Handle relevant_docs separately since one entry expands to multiple records.
            relevant_sections = {}
            for split, values in self.dataset[subset].items():
                relevant_docs = values["relevant_docs"]
                entries = []
                for query_id, docs in relevant_docs.items():
                    for doc_id, score in docs.items():
                        entries.append(
                            {
                                "query-id": query_id,
                                "corpus-id": doc_id,
                                "score": score,
                            }
                        )
                relevant_sections[split] = Dataset.from_list(entries)
            DatasetDict(relevant_sections).push_to_hub(
                repo_name,
                f"{subset}-qrels" if subset != "default" else "qrels",
                commit_message=f"Add {subset}-qrels",
            )

            _push_section(
                self.dataset[subset],
                "top_ranked",
                f"{subset}-top_ranked" if subset != "default" else "top_ranked",
                lambda idx, docs: {"query-id": idx, "corpus-ids": docs},
            )

    def convert_to_reranking(
        self,
        top_ranked_path: str | Path,
        top_k: int = 10,
    ) -> Self:
        """Converts a reranking task to re-ranking by loading predictions from previous model run where the `prediction_folder` was specified.

        Args:
            top_ranked_path: Path to file or folder with the top ranked predictions.
            top_k: Number of results to load.

        Returns:
            The current task reformulated as a reranking task

        Raises:
            FileNotFoundError: If the specified path does not exist.
            ValueError: If the loaded top ranked results are not in the expected format.
        """
        top_ranked_path = Path(top_ranked_path)
        if top_ranked_path.is_dir():
            top_ranked_path = self._predictions_path(top_ranked_path)

        if not top_ranked_path.exists():
            raise FileNotFoundError(
                f"Can't find previous results for this task. File {top_ranked_path} does not exist."
            )

        with top_ranked_path.open("r") as previous_results_file:
            previous_results = json.load(previous_results_file)

        if not self.data_loaded:
            self.load_data()

        self._previous_results_model_meta = previous_results["mteb_model_meta"]

        for subset in self.dataset:
            for split in self.dataset[subset]:
                top_ranked: RetrievalOutputType = previous_results[subset][split]
                if not isinstance(top_ranked, dict):
                    raise ValueError("Previous top ranked results is not a dictionary.")

                top_k_sorted = defaultdict(list)
                for query_id, values in top_ranked.items():
                    sorted_keys = sorted(values, key=values.get, reverse=True)
                    top_k_sorted[query_id] = sorted_keys[: self._top_k]

                self.dataset[subset][split]["top_ranked"] = top_k_sorted
        self._top_k = top_k
        return self

convert_to_reranking(top_ranked_path, top_k=10)

Converts a retrieval task to a reranking task by loading predictions from a previous model run where the prediction_folder was specified.

Parameters:

- top_ranked_path (str | Path, required): Path to a file or folder with the top-ranked predictions.
- top_k (int, default 10): Number of results to load.

Returns:

- Self: The current task reformulated as a reranking task.

Raises:

- FileNotFoundError: If the specified path does not exist.
- ValueError: If the loaded top-ranked results are not in the expected format.

Source code in mteb/abstasks/retrieval.py
def convert_to_reranking(
    self,
    top_ranked_path: str | Path,
    top_k: int = 10,
) -> Self:
    """Converts a reranking task to re-ranking by loading predictions from previous model run where the `prediction_folder` was specified.

    Args:
        top_ranked_path: Path to file or folder with the top ranked predictions.
        top_k: Number of results to load.

    Returns:
        The current task reformulated as a reranking task

    Raises:
        FileNotFoundError: If the specified path does not exist.
        ValueError: If the loaded top ranked results are not in the expected format.
    """
    top_ranked_path = Path(top_ranked_path)
    if top_ranked_path.is_dir():
        top_ranked_path = self._predictions_path(top_ranked_path)

    if not top_ranked_path.exists():
        raise FileNotFoundError(
            f"Can't find previous results for this task. File {top_ranked_path} does not exist."
        )

    with top_ranked_path.open("r") as previous_results_file:
        previous_results = json.load(previous_results_file)

    if not self.data_loaded:
        self.load_data()

    self._previous_results_model_meta = previous_results["mteb_model_meta"]

    for subset in self.dataset:
        for split in self.dataset[subset]:
            top_ranked: RetrievalOutputType = previous_results[subset][split]
            if not isinstance(top_ranked, dict):
                raise ValueError("Previous top ranked results is not a dictionary.")

            top_k_sorted = defaultdict(list)
            for query_id, values in top_ranked.items():
                sorted_keys = sorted(values, key=values.get, reverse=True)
                top_k_sorted[query_id] = sorted_keys[: self._top_k]

            self.dataset[subset][split]["top_ranked"] = top_k_sorted
    self._top_k = top_k
    return self
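
A hedged sketch of the intended workflow: evaluate a retrieval model once with a prediction folder, then reload the task as a reranking task from those saved predictions (the task name and path are illustrative):

>>> import mteb
>>> task = mteb.get_task("NFCorpus")  # illustrative retrieval task
>>> # ... first run a retrieval evaluation with prediction_folder="predictions/" ...
>>> reranking_task = task.convert_to_reranking("predictions/", top_k=10)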

convert_v1_dataset_format_to_v2()

Convert the dataset from the v1 format (self.queries, self.corpus) to the v2 format (self.dataset).

Source code in mteb/abstasks/retrieval.py
def convert_v1_dataset_format_to_v2(self):
    """Convert dataset from v1 (from `self.queries`, `self.document`) format to v2 format (`self.dotaset`)."""
    # check if dataset is `v1` version
    if not hasattr(self, "queries"):
        return
    empty_dataset = Dataset.from_dict({})

    self.dataset = defaultdict(
        lambda: defaultdict(
            lambda: RetrievalSplitData(
                corpus=empty_dataset,
                queries=empty_dataset,
                relevant_docs={},
                top_ranked=None,
            )
        )
    )

    def _process_split(
        ds_queries: dict | Dataset, ds_corpus: dict | Dataset
    ) -> tuple[Dataset, Dataset]:
        if isinstance(ds_queries, dict):
            queries = Dataset.from_list(
                [{"id": k, "text": v} for k, v in ds_queries.items()]
            )
        elif isinstance(ds_queries, Dataset):
            queries = ds_queries
        else:
            raise ValueError(f"Can't convert queries of type {type(ds_queries)}")

        if isinstance(ds_corpus, dict):
            corpus = Dataset.from_list(
                [
                    {
                        "id": k,
                        "text": v if isinstance(v, str) else v["text"],
                        "title": v.get("title", "") if isinstance(v, dict) else "",
                    }
                    for k, v in ds_corpus.items()
                ]
            )
        elif isinstance(ds_corpus, Dataset):
            corpus = ds_corpus
        else:
            raise ValueError(f"Can't convert corpus of type {type(ds_corpus)}")
        return queries, corpus

    if self.metadata.is_multilingual:
        for subset in self.queries:
            for split in self.queries[subset]:
                queries = self.queries[subset][split]
                corpus = self.corpus[subset][split]

                (
                    self.dataset[subset][split]["queries"],
                    self.dataset[subset][split]["corpus"],
                ) = _process_split(queries, corpus)

                self.dataset[subset][split]["relevant_docs"] = self.relevant_docs[
                    subset
                ][split]
                if hasattr(self, "instructions"):
                    instructions = self.instructions[subset][split]
                    self.dataset[subset][split]["queries"] = (
                        _combine_queries_with_instructions_datasets(
                            self.dataset[subset][split]["queries"],
                            instructions,
                        )
                    )
                if hasattr(self, "top_ranked"):
                    self.dataset[subset][split]["top_ranked"] = self.top_ranked[
                        subset
                    ][split]
    else:
        subset = "default"
        for split in self.queries:
            queries = self.queries[split]
            corpus = self.corpus[split]
            (
                self.dataset[subset][split]["queries"],
                self.dataset[subset][split]["corpus"],
            ) = _process_split(queries, corpus)

            self.dataset[subset][split]["relevant_docs"] = self.relevant_docs[
                split
            ].copy()
            if hasattr(self, "instructions"):
                instructions = self.instructions[split]
                self.dataset[subset][split]["queries"] = (
                    _combine_queries_with_instructions_datasets(
                        self.dataset[subset][split]["queries"],
                        instructions,
                    )
                )
            if hasattr(self, "top_ranked"):
                self.dataset[subset][split]["top_ranked"] = self.top_ranked[
                    split
                ].copy()

    del self.queries
    del self.corpus
    del self.relevant_docs
    if hasattr(self, "instructions"):
        del self.instructions
    if hasattr(self, "top_ranked"):
        del self.top_ranked

evaluate(model, split='test', subsets_to_run=None, *, encode_kwargs, prediction_folder=None, **kwargs)

Evaluate the model on the retrieval task.

Parameters:

- model (MTEBModels, required): Model to evaluate. The model should implement the SearchProtocol, or be an EncoderProtocol or CrossEncoderProtocol.
- split (str, default 'test'): Split to evaluate on.
- subsets_to_run (list[HFSubset] | None, default None): Optional list of subsets to evaluate on.
- encode_kwargs (dict[str, Any], required): Keyword arguments passed to the encoder.
- prediction_folder (Path | None, default None): Folder to save model predictions.
- **kwargs (default {}): Additional keyword arguments passed to the evaluator.

Returns:

- dict[HFSubset, ScoresDict]: Dictionary mapping subsets to their evaluation scores.

Source code in mteb/abstasks/retrieval.py
def evaluate(
    self,
    model: MTEBModels,
    split: str = "test",
    subsets_to_run: list[HFSubset] | None = None,
    *,
    encode_kwargs: dict[str, Any],
    prediction_folder: Path | None = None,
    **kwargs,
) -> dict[HFSubset, ScoresDict]:
    """Evaluate the model on the retrieval task.

    Args:
        model: Model to evaluate. Model should implement the [SearchProtocol][mteb.models.models_protocols.SearchProtocol]
            or be an [Encoder][mteb.models.models_protocols.EncoderProtocol] or [CrossEncoderProtocol][mteb.models.models_protocols.CrossEncoderProtocol].
        split: Split to evaluate on
        subsets_to_run: Optional list of subsets to evaluate on
        encode_kwargs: Keyword arguments passed to the encoder
        prediction_folder: Folder to save model predictions
        **kwargs: Additional keyword arguments passed to the evaluator


    Returns:
        Dictionary mapping subsets to their evaluation scores
    """
    if not self.data_loaded:
        self.load_data()
    # TODO: convert all tasks directly https://github.com/embeddings-benchmark/mteb/issues/2030
    self.convert_v1_dataset_format_to_v2()

    return super().evaluate(
        model,
        split,
        subsets_to_run,
        encode_kwargs=encode_kwargs,
        prediction_folder=prediction_folder,
        **kwargs,
    )
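
A hedged sketch of calling this method directly (in practice evaluation is usually driven by a higher-level runner; the task and model names are illustrative):

>>> import mteb
>>> task = mteb.get_task("NFCorpus")  # illustrative retrieval task
>>> model = mteb.get_model("sentence-transformers/all-MiniLM-L6-v2")  # illustrative model
>>> scores = task.evaluate(model, split="test", encode_kwargs={"batch_size": 32})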

load_data()

Load the dataset for the retrieval task.

Source code in mteb/abstasks/retrieval.py
def load_data(self) -> None:
    """Load the dataset for the retrieval task."""
    if self.data_loaded:
        return

    dataset_path = self.metadata.dataset["path"]
    eval_splits = self.metadata.eval_splits
    trust_remote_code = self.metadata.dataset.get("trust_remote_code", False)
    revision = self.metadata.dataset["revision"]

    def _process_data(split: str, hf_subset: str = "default"):
        """Helper function to load and process data for a given split and language"""
        logger.debug(
            f"Loading {split} split for {hf_subset} subset of {self.metadata.name}"
        )

        self.dataset[hf_subset][split] = RetrievalDatasetLoader(
            hf_repo=dataset_path,
            revision=revision,
            trust_remote_code=trust_remote_code,
            split=split,
            config=hf_subset,
        ).load()

    if self.metadata.is_multilingual:
        for lang in self.metadata.eval_langs:
            for split in eval_splits:
                _process_data(split, lang)
    else:
        for split in eval_splits:
            _process_data(split)
    self.dataset_transform()
    self.data_loaded = True

task_specific_scores(scores, qrels, results, hf_split, hf_subset)

Calculate task specific scores. Override in subclass if needed.

Parameters:

- scores (dict[str, dict[str, float]], required): Dictionary of scores.
- qrels (RelevantDocumentsType, required): Relevant documents.
- results (dict[str, dict[str, float]], required): Retrieval results.
- hf_split (str, required): Split to evaluate on.
- hf_subset (str, required): Subset to evaluate on.
Source code in mteb/abstasks/retrieval.py
def task_specific_scores(
    self,
    scores: dict[str, dict[str, float]],
    qrels: RelevantDocumentsType,
    results: dict[str, dict[str, float]],
    hf_split: str,
    hf_subset: str,
) -> dict[str, float]:
    """Calculate task specific scores. Override in subclass if needed.

    Args:
        scores: Dictionary of scores
        qrels: Relevant documents
        results: Retrieval results
        hf_split: Split to evaluate on
        hf_subset: Subset to evaluate on
    """
    return {}
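
As a hedged sketch, a subclass could override this hook to add an extra, task-specific metric (the class and metric below are illustrative):

class MyRetrievalTask(AbsTaskRetrieval):
    # metadata = TaskMetadata(...)  # required in practice, omitted here

    def task_specific_scores(self, scores, qrels, results, hf_split, hf_subset):
        # illustrative extra metric: fraction of queries with at least one retrieved document
        answered = sum(1 for query_id in qrels if results.get(query_id))
        return {"answered_queries_ratio": answered / max(len(qrels), 1)}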

mteb.abstasks.retrieval_dataset_loaders.RetrievalSplitData

Bases: TypedDict

A dictionary containing the corpus, queries, relevant documents, instructions, and top-ranked documents for a retrieval task.

Attributes:

- corpus (CorpusDatasetType): The corpus dataset containing documents. Should have the columns id, title, and text or image.
- queries (QueryDatasetType): The queries dataset. Should have the columns id, text, instruction (for instruction retrieval/reranking) or image.
- relevant_docs (RelevantDocumentsType): A mapping of query IDs to relevant document IDs and their relevance scores. Should have the columns query-id, corpus-id, score.
- top_ranked (TopRankedDocumentsType | None): A mapping of query IDs to a list of top-ranked document IDs. Should have the columns query-id and corpus-ids (list[str]). Optional; used for reranking tasks.

Source code in mteb/abstasks/retrieval_dataset_loaders.py
class RetrievalSplitData(TypedDict):
    """A dictionary containing the corpus, queries, relevant documents, instructions, and top-ranked documents for a retrieval task.

    Attributes:
        corpus: The corpus dataset containing documents. Should have columns `id`, `title`, `text` or `image`.
        queries: The queries dataset containing queries. Should have columns `id`, `text`, `instruction` (for instruction retrieval/reranking) or `image`.
        relevant_docs: A mapping of query IDs to relevant document IDs and their relevance scores. Should have columns `query-id`, `corpus-id`, `score`.
        top_ranked: A mapping of query IDs to a list of top-ranked document IDs. Should have columns `query-id`, `corpus-ids` (list[str]). This is optional and used for reranking tasks.
    """

    corpus: CorpusDatasetType
    queries: QueryDatasetType
    relevant_docs: RelevantDocumentsType
    top_ranked: TopRankedDocumentsType | None
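
A hedged sketch of building such a split by hand, e.g. for a small test fixture (IDs and texts are made up):

>>> from datasets import Dataset
>>> split = RetrievalSplitData(
...     corpus=Dataset.from_list([{"id": "d1", "title": "", "text": "A document."}]),
...     queries=Dataset.from_list([{"id": "q1", "text": "A query."}]),
...     relevant_docs={"q1": {"d1": 1}},
...     top_ranked=None,
... )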

mteb.abstasks.classification.AbsTaskClassification

Bases: AbsTask

Abstract class for classification tasks

Attributes:

- dataset (dict[HFSubset, DatasetDict] | None): Hugging Face dataset containing the data for the task. Should have a train split (the split name can be changed via train_split) and must contain an input column with str (for text) or PIL.Image (for images) values, whose name can be changed via the input_column_name attribute, and a label column with int values, whose name can be changed via the label_column_name attribute.
- evaluator_model (SklearnModelProtocol): The model to use for evaluation. Can be any sklearn-compatible model. Default is LogisticRegression. Full details of the API are in SklearnModelProtocol.
- samples_per_label (int): Number of samples per label to use for training the evaluator model. Default is 8.
- n_experiments (int): Number of experiments to run. Default is 10.
- train_split (str): Name of the split to use for training the evaluator model. Default is "train".
- label_column_name (str): Name of the column containing the labels. Default is "label".
- input_column_name (str): Name of the column containing the input data. Default is "text".
- abstask_prompt: Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt.
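
As a hedged sketch, a concrete task can tune these defaults by overriding the class attributes (the class name and values are illustrative; the required task metadata is omitted):

from sklearn.linear_model import LogisticRegression

from mteb.abstasks.classification import AbsTaskClassification


class MyClassificationTask(AbsTaskClassification):
    # metadata = TaskMetadata(...)  # required in practice, omitted here
    samples_per_label = 16  # more training examples per label
    n_experiments = 5  # fewer bootstrap experiments
    input_column_name = "sentence"  # column holding the input text
    evaluator_model = LogisticRegression(n_jobs=-1, max_iter=200)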

Source code in mteb/abstasks/classification.py
class AbsTaskClassification(AbsTask):
    """Abstract class for classification tasks

    Attributes:
        dataset: Hugging Face dataset containing the data for the task. Should have a train split (the split name can be changed via train_split) and must contain the following columns:
            text: str (for text) or PIL.Image (for image). Column name can be changed via `input_column_name` attribute.
            label: int. Column name can be changed via `label_column_name` attribute.
        evaluator_model: The model to use for evaluation. Can be any sklearn compatible model. Default is `LogisticRegression`.
            Full details of api in [`SklearnModelProtocol`][mteb._evaluators.sklearn_evaluator.SklearnModelProtocol].
        samples_per_label: Number of samples per label to use for training the evaluator model. Default is 8.
        n_experiments: Number of experiments to run. Default is 10.
        train_split: Name of the split to use for training the evaluator model. Default is "train".
        label_column_name: Name of the column containing the labels. Default is "label".
        input_column_name: Name of the column containing the input data. Default is "text".
        abstask_prompt: Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt.
    """

    evaluator: type[SklearnEvaluator] = SklearnEvaluator
    evaluator_model: SklearnModelProtocol = LogisticRegression(
        n_jobs=-1,
        max_iter=100,
    )

    samples_per_label: int = 8
    n_experiments: int = 10
    train_split: str = "train"
    label_column_name: str = "label"
    input_column_name: str = "text"
    abstask_prompt = "Classify user passages."

    def evaluate(
        self,
        model: MTEBModels,
        split: str = "test",
        subsets_to_run: list[HFSubset] | None = None,
        *,
        encode_kwargs: dict[str, Any],
        prediction_folder: Path | None = None,
        **kwargs: Any,
    ) -> dict[HFSubset, ScoresDict]:
        """Evaluate a model on the classification task.

        Differs from other tasks as it requires train split.
        """
        if not isinstance(model, EncoderProtocol):
            raise TypeError(
                f"Model {model} is a SearchProtocol, but this task {self.metadata.name} does not support Search. "
                "Please use a Encoder model instead."
            )

        if not self.data_loaded:
            self.load_data()

        if "random_state" in self.evaluator_model.get_params():
            self.evaluator_model = self.evaluator_model.set_params(
                random_state=self.seed
            )
        scores = {}
        hf_subsets = self.hf_subsets
        if subsets_to_run is not None:
            hf_subsets = [s for s in hf_subsets if s in subsets_to_run]

        for hf_subset in hf_subsets:
            logger.info(
                f"Task: {self.metadata.name}, split: {split}, subset: {hf_subset}. Running..."
            )

            if hf_subset not in self.dataset and hf_subset == "default":
                ds = self.dataset
            else:
                ds = self.dataset[hf_subset]

            if isinstance(ds, Dataset | DatasetDict):
                ds = ds.select_columns([self.label_column_name, self.input_column_name])
            scores[hf_subset] = self._evaluate_subset(
                model,
                ds,
                hf_split=split,
                hf_subset=hf_subset,
                encode_kwargs=encode_kwargs,
                prediction_folder=prediction_folder,
                **kwargs,
            )
            self._add_main_score(scores[hf_subset])

        return scores

    def _evaluate_subset(
        self,
        model: EncoderProtocol,
        data_split: DatasetDict,
        *,
        encode_kwargs: dict[str, Any],
        hf_split: str,
        hf_subset: str,
        prediction_folder: Path | None = None,
        **kwargs: Any,
    ) -> FullClassificationMetrics:
        train_split = data_split[self.train_split]
        eval_split = data_split[hf_split]

        scores = []
        # we store idxs to make the shuffling reproducible
        test_cache, idxs = None, None

        all_predictions = []
        for i in range(self.n_experiments):
            logger.info(f"Running experiment ({i}/{self.n_experiments})")
            # Bootstrap `self.samples_per_label` samples per label for each split
            train_dataset, idxs = self._undersample_data(
                train_split,
                i,
                idxs,
            )

            evaluator = self.evaluator(
                train_dataset,
                eval_split,
                self.input_column_name,
                self.label_column_name,
                task_metadata=self.metadata,
                hf_split=hf_split,
                hf_subset=hf_subset,
                evaluator_model=self.evaluator_model,
            )
            y_pred, test_cache = evaluator(
                model, encode_kwargs=encode_kwargs, test_cache=test_cache
            )
            if prediction_folder:
                all_predictions.append(y_pred.tolist())
            y_test = eval_split[self.label_column_name]
            scores_exp = self._calculate_scores(y_test, y_pred)
            scores.append(scores_exp)

        if prediction_folder:
            self._save_task_predictions(
                all_predictions,
                model,
                prediction_folder,
                hf_subset=hf_subset,
                hf_split=hf_split,
            )

        avg_scores: dict[str, Any] = {
            # ap will be none for non binary classification tasks
            k: (
                float(np.mean(values))
                if (values := [s[k] for s in scores if s[k] is not None])
                else np.nan
            )
            for k in scores[0].keys()
        }
        logger.info(f"Running {self.metadata.name} - Finished.")
        return FullClassificationMetrics(
            scores_per_experiment=scores,
            **avg_scores,
        )

    def _calculate_scores(
        self,
        y_test: np.ndarray | list[int],
        y_pred: np.ndarray,
    ) -> ClassificationMetrics:
        scores = ClassificationMetrics(
            accuracy=accuracy_score(y_test, y_pred),
            f1=f1_score(y_test, y_pred, average="macro"),
            f1_weighted=f1_score(y_test, y_pred, average="weighted"),
            precision=precision_score(y_test, y_pred, average="macro"),
            precision_weighted=precision_score(y_test, y_pred, average="weighted"),
            recall=recall_score(y_test, y_pred, average="macro"),
            recall_weighted=recall_score(y_test, y_pred, average="weighted"),
            ap=None,
            ap_weighted=None,
        )

        # if binary classification
        if len(np.unique(y_test)) == 2:
            scores["ap"] = average_precision_score(y_test, y_pred, average="macro")
            scores["ap_weighted"] = average_precision_score(
                y_test, y_pred, average="weighted"
            )
        return scores

    def _undersample_data(
        self, dataset: Dataset, experiment_num: int, idxs: list[int] | None = None
    ) -> tuple[Dataset, list[int]]:
        """Undersample data to have `samples_per_label` samples of each label.

        Args:
            dataset: Hugging Face `datasets.Dataset` containing "text" and "label".
            experiment_num: Experiment number, used to set the random seed.
            idxs: Optional indices to shuffle and sample from.

        Returns:
            A new Dataset containing undersampled examples.
            The shuffled indices used for sampling.
        """
        if idxs is None:
            idxs = list(range(len(dataset)))

        # using RandomState for backward compatibility with `v1`
        rng_state = np.random.RandomState(self.seed)
        rng_state.shuffle(idxs)

        label_counter: dict[str, int] = defaultdict(int)
        sampled_idxs = []

        for i in idxs:
            label = dataset[i][self.label_column_name]
            if label_counter[label] < self.samples_per_label:
                sampled_idxs.append(i)
                label_counter[label] += 1

        return dataset.select(sampled_idxs), idxs

    def _calculate_descriptive_statistics_from_split(
        self, split: str, hf_subset: str | None = None, compute_overall: bool = False
    ) -> ClassificationDescriptiveStatistics:
        train_text = []
        if hf_subset:
            inputs = self.dataset[hf_subset][split][self.input_column_name]
            label = self.dataset[hf_subset][split][self.label_column_name]
            if split != self.train_split:
                train_text = self.dataset[hf_subset][self.train_split][
                    self.input_column_name
                ]
        elif compute_overall:
            inputs = []
            label = []
            for hf_subset in self.metadata.eval_langs:
                inputs.extend(self.dataset[hf_subset][split][self.input_column_name])
                label.extend(self.dataset[hf_subset][split][self.label_column_name])
                if split != self.train_split:
                    train_text.extend(
                        self.dataset[hf_subset][self.train_split][
                            self.input_column_name
                        ]
                    )
        else:
            inputs = self.dataset[split][self.input_column_name]
            label = self.dataset[split][self.label_column_name]
            if split != self.train_split:
                train_text = self.dataset[self.train_split][self.input_column_name]

        image_statistics = None
        text_statistics = None
        num_texts_in_train = None

        if "image" in self.metadata.modalities:
            image_statistics = calculate_image_statistics(inputs)
        if "text" in self.metadata.modalities:
            text_statistics = calculate_text_statistics(inputs)
            num_texts_in_train = (
                len(set(inputs) & set(train_text))
                if split != self.train_split
                else None
            )

        label_statistics = calculate_label_statistics(label)

        return ClassificationDescriptiveStatistics(
            num_samples=len(inputs),
            number_texts_intersect_with_train=num_texts_in_train,
            text_statistics=text_statistics,
            image_statistics=image_statistics,
            label_statistics=label_statistics,
        )

    def _push_dataset_to_hub(self, repo_name: str) -> None:
        self._upload_dataset_to_hub(
            repo_name,
            [
                self.input_column_name,
                self.label_column_name,
            ],
        )

evaluate(model, split='test', subsets_to_run=None, *, encode_kwargs, prediction_folder=None, **kwargs)

Evaluate a model on the classification task.

Differs from other tasks as it requires a train split.
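
For orientation, the sketch below reproduces the gist of this protocol with scikit-learn alone. It is not the mteb implementation; the hashing vectorizer merely stands in for a real embedding model, and the toy texts and helper names are assumptions for illustration. The idea is: embed the train and test texts, undersample a fixed number of examples per label, fit a small classifier on the train embeddings, and average the metrics over repeated experiments.

# Minimal sketch of the k-shot classification protocol (NOT the mteb implementation).
from collections import defaultdict

import numpy as np
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score


def undersample(labels, samples_per_label, rng):
    """Pick at most `samples_per_label` indices per label, in shuffled order."""
    counts, picked = defaultdict(int), []
    for i in rng.permutation(len(labels)):
        if counts[labels[i]] < samples_per_label:
            picked.append(i)
            counts[labels[i]] += 1
    return picked


train_texts = ["great movie", "terrible film", "loved it", "awful plot"] * 8
train_labels = [1, 0, 1, 0] * 8
test_texts = ["really good", "really bad"]
test_labels = [1, 0]

encoder = HashingVectorizer(n_features=256)  # stand-in for an embedding model
X_train_all = encoder.transform(train_texts).toarray()
X_test = encoder.transform(test_texts).toarray()

scores = []
for experiment in range(3):  # mteb averages over n_experiments such runs
    rng = np.random.default_rng(experiment)
    idx = undersample(train_labels, samples_per_label=8, rng=rng)
    clf = LogisticRegression(max_iter=1000).fit(X_train_all[idx], np.array(train_labels)[idx])
    y_pred = clf.predict(X_test)
    scores.append({"accuracy": accuracy_score(test_labels, y_pred),
                   "f1": f1_score(test_labels, y_pred, average="macro")})

print({k: float(np.mean([s[k] for s in scores])) for k in scores[0]})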

Source code in mteb/abstasks/classification.py
def evaluate(
    self,
    model: MTEBModels,
    split: str = "test",
    subsets_to_run: list[HFSubset] | None = None,
    *,
    encode_kwargs: dict[str, Any],
    prediction_folder: Path | None = None,
    **kwargs: Any,
) -> dict[HFSubset, ScoresDict]:
    """Evaluate a model on the classification task.

    Differs from other tasks as it requires a train split.
    """
    if not isinstance(model, EncoderProtocol):
        raise TypeError(
            f"Model {model} is a SearchProtocol, but this task {self.metadata.name} does not support Search. "
            "Please use a Encoder model instead."
        )

    if not self.data_loaded:
        self.load_data()

    if "random_state" in self.evaluator_model.get_params():
        self.evaluator_model = self.evaluator_model.set_params(
            random_state=self.seed
        )
    scores = {}
    hf_subsets = self.hf_subsets
    if subsets_to_run is not None:
        hf_subsets = [s for s in hf_subsets if s in subsets_to_run]

    for hf_subset in hf_subsets:
        logger.info(
            f"Task: {self.metadata.name}, split: {split}, subset: {hf_subset}. Running..."
        )

        if hf_subset not in self.dataset and hf_subset == "default":
            ds = self.dataset
        else:
            ds = self.dataset[hf_subset]

        if isinstance(ds, Dataset | DatasetDict):
            ds = ds.select_columns([self.label_column_name, self.input_column_name])
        scores[hf_subset] = self._evaluate_subset(
            model,
            ds,
            hf_split=split,
            hf_subset=hf_subset,
            encode_kwargs=encode_kwargs,
            prediction_folder=prediction_folder,
            **kwargs,
        )
        self._add_main_score(scores[hf_subset])

    return scores

mteb.abstasks.multilabel_classification.AbsTaskMultilabelClassification

Bases: AbsTaskClassification

Abstract class for multioutput classification tasks

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| dataset | dict[HFSubset, DatasetDict] \| None | Huggingface dataset containing the data for the task. The dataset must contain the columns specified by input_column_name and label_column_name; the input column must contain the text or image to be classified, and the label column must contain a list of labels for each example. |
| input_column_name | str | Name of the column containing the input text. |
| label_column_name | str | Name of the column containing the labels. |
| samples_per_label | int | Number of samples to use per label. These samples are embedded and a classifier is fit using the labels and samples. |
| evaluator | SklearnModelProtocol | Classifier to use for evaluation. Must implement the SklearnModelProtocol. |
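
To make the attributes above concrete, here is a minimal standalone sketch of the same evaluation idea. It is not the mteb code; the random arrays stand in for encoder output and the label space is invented. It binarizes the per-example label lists, fits the default KNeighborsClassifier on the train embeddings, and computes subset accuracy, macro F1 and label-ranking average precision on the test embeddings.

# Sketch of multilabel evaluation over embeddings (placeholder data, not mteb itself).
import numpy as np
from sklearn.metrics import f1_score, label_ranking_average_precision_score
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import MultiLabelBinarizer

rng = np.random.default_rng(0)
X_train = rng.normal(size=(40, 32))  # stand-in for train embeddings
X_test = rng.normal(size=(10, 32))   # stand-in for test embeddings
y_train_lists = [list(rng.choice(4, size=rng.integers(1, 3), replace=False)) for _ in range(40)]
y_test_lists = [list(rng.choice(4, size=rng.integers(1, 3), replace=False)) for _ in range(10)]

binarizer = MultiLabelBinarizer(classes=[0, 1, 2, 3])
y_train = binarizer.fit_transform(y_train_lists)  # (n_samples, n_labels) 0/1 matrix
y_test = binarizer.transform(y_test_lists)

clf = KNeighborsClassifier(n_neighbors=5).fit(X_train, y_train)
y_pred = clf.predict(X_test)

print("subset accuracy:", clf.score(X_test, y_test))
print("macro F1:", f1_score(y_test, y_pred, average="macro", zero_division=0))
print("LRAP:", label_ranking_average_precision_score(y_test, y_pred))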

Source code in mteb/abstasks/multilabel_classification.py
class AbsTaskMultilabelClassification(AbsTaskClassification):
    """Abstract class for multioutput classification tasks

    Attributes:
        dataset: Huggingface dataset containing the data for the task. Dataset must contain columns specified by input_column_name and label_column_name.
            Input column must contain the text or image to be classified, and label column must contain a list of labels for each example.
        input_column_name: Name of the column containing the input text.
        label_column_name: Name of the column containing the labels.
        samples_per_label: Number of samples to use per label. These samples are embedded and a classifier is fit using the labels and samples.
        evaluator: Classifier to use for evaluation. Must implement the SklearnModelProtocol.
    """

    evaluator: SklearnModelProtocol = KNeighborsClassifier(n_neighbors=5)
    input_column_name: str = "text"
    label_column_name: str = "label"

    @override
    def _evaluate_subset(
        self,
        model: EncoderProtocol,
        data_split: DatasetDict,
        *,
        encode_kwargs: dict[str, Any],
        hf_split: str,
        hf_subset: str,
        prediction_folder: Path | None = None,
        **kwargs: Any,
    ) -> FullMultilabelClassificationMetrics:
        if isinstance(data_split, DatasetDict):
            data_split = data_split.select_columns(
                [self.input_column_name, self.label_column_name]
            )
        train_split = data_split[self.train_split]
        eval_split = data_split[hf_split]

        logger.info(
            "Running multilabel classification task - Sampling training data..."
        )
        scores = []
        # Bootstrap sample indices from training set for each experiment
        train_samples = []
        for _ in range(self.n_experiments):
            sample_indices, _ = self._undersample_data_indices(
                train_split[self.label_column_name], self.samples_per_label, None
            )
            train_samples.append(sample_indices)
        # Encode all unique sentences at the indices
        unique_train_indices = list(set(itertools.chain.from_iterable(train_samples)))
        unique_train_dataset = train_split.select(unique_train_indices).select_columns(
            self.input_column_name
        )
        dataloader_train = create_dataloader(
            unique_train_dataset,
            self.metadata,
            input_column=self.input_column_name,
            batch_size=encode_kwargs["batch_size"],
        )

        logger.info("Running multilabel classification - Encoding training set...")
        _unique_train_embeddings = model.encode(
            dataloader_train,
            task_metadata=self.metadata,
            hf_split=self.train_split,
            hf_subset=hf_subset,
            **encode_kwargs,
        )
        unique_train_embeddings = dict(
            zip(unique_train_indices, _unique_train_embeddings)
        )
        # Stratified subsampling of test set to 2000 examples.
        test_dataset = eval_split
        try:
            if len(test_dataset) > 2000:
                split_dataset = eval_split.train_test_split(
                    test_size=2000, seed=42, stratify_by_column="label"
                )
                test_dataset = split_dataset["test"]
        except ValueError:
            logger.warning("Couldn't subsample, continuing with the entire test set.")

        dataloader_test = create_dataloader(
            test_dataset.select_columns(self.input_column_name),
            self.metadata,
            input_column=self.input_column_name,
            batch_size=encode_kwargs["batch_size"],
        )

        logger.info("Running multilabel classification - Encoding test set...")
        X_test = model.encode(
            dataloader_test,
            task_metadata=self.metadata,
            hf_split=hf_split,
            hf_subset=hf_subset,
            **encode_kwargs,
        )
        binarizer = MultiLabelBinarizer()
        y_test = binarizer.fit_transform(test_dataset[self.label_column_name])

        logger.info("Running multilabel classification - Evaluating classifiers...")
        all_predictions = []
        for i_experiment, sample_indices in enumerate(train_samples):
            X_train = np.stack([unique_train_embeddings[idx] for idx in sample_indices])
            y_train = train_split.select(sample_indices)[self.label_column_name]
            y_train = binarizer.transform(y_train)
            y_pred, current_classifier = _evaluate_classifier(
                X_train, y_train, X_test, self.evaluator
            )
            if prediction_folder:
                all_predictions.append(y_pred.tolist())

            scores_exp = self._calculate_scores(
                y_test, y_pred, X_test, current_classifier
            )
            scores.append(scores_exp)

        if prediction_folder:
            self._save_task_predictions(
                all_predictions,
                model,
                prediction_folder,
                hf_subset=hf_subset,
                hf_split=hf_split,
            )

        avg_scores: dict[str, Any] = {
            k: np.mean([s[k] for s in scores]) for k in scores[0].keys()
        }
        logger.info("Running multilabel classification - Finished.")
        return FullMultilabelClassificationMetrics(
            scores_per_experiment=scores,
            **avg_scores,
        )

    def _calculate_scores(
        self,
        y_test: np.ndarray,
        y_pred: np.ndarray,
        x_test_embedding: np.ndarray,
        current_classifier: SklearnModelProtocol,
    ) -> MultilabelClassificationMetrics:
        accuracy = current_classifier.score(x_test_embedding, y_test)
        if isinstance(current_classifier, MultiOutputClassifier):
            predictions = current_classifier.predict_proba(x_test_embedding)
            all_probs = [emb[:, 1] for emb in predictions]

            y_score = np.stack(all_probs, axis=1)  # shape: (n_samples, n_labels)
            lrap = label_ranking_average_precision_score(y_test, y_score)
        else:
            lrap = label_ranking_average_precision_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred, average="macro")
        return MultilabelClassificationMetrics(
            accuracy=accuracy,
            lrap=lrap,
            f1=f1,
        )

    def _undersample_data_indices(
        self, y: list[list[int]], samples_per_label: int, idxs: list[int] | None = None
    ) -> tuple[list[int], list[int]]:
        """Undersample data to have samples_per_label samples of each label.

        Returns:
            A tuple containing:
                - List of sampled indices.
                - List of all indices after shuffling.
        """
        sample_indices = []
        if idxs is None:
            idxs = np.arange(len(y))
        self.np_rng.shuffle(idxs)
        idxs = idxs.tolist()
        label_counter = defaultdict(int)
        for i in idxs:
            if any((label_counter[label] < samples_per_label) for label in y[i]):
                sample_indices.append(i)
                for label in y[i]:
                    label_counter[label] += 1
        return sample_indices, idxs

mteb.abstasks.clustering.AbsTaskClustering

Bases: AbsTask

Abstract class for Clustering tasks.

This class embeds the corpus sentences, then samples N documents from the corpus and clusters them. Clustering quality is measured with the V-measure metric, which is invariant to permutations of the labels. This procedure is repeated K times.

There are two ways to specify how a dataset is downsampled: max_document_to_embed and max_fraction_of_documents_to_embed. If both parameters are set to None, no downsampling is done in self._evaluate_subset(). Only one of these two parameters may be set at a time.

If the clustering is hierarchical and more than one label is given (in hierarchical order) for each observation, V-measures are calculated in the outlined way on each of the levels separately.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| dataset | dict[HFSubset, DatasetDict] \| None | A HuggingFace Dataset containing the data for the clustering task. Must contain an input column (sentences, i.e. texts or images) and a labels column. |
| max_fraction_of_documents_to_embed | float \| None | Fraction of documents to embed for clustering. |
| max_document_to_embed | int \| None | Maximum number of documents to embed for clustering. |
| max_documents_per_cluster | int | Number of documents to sample for each clustering experiment. |
| n_clusters | int | Number of clustering experiments to run. |
| k_mean_batch_size | int | Batch size to use for k-means clustering. |
| max_depth | | Maximum depth to evaluate clustering. If None, evaluates all levels. |
| input_column_name | str | Name of the column containing the input sentences or data points. |
| label_column_name | str | Name of the column containing the true cluster labels. |
| abstask_prompt | | Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt. |
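
As a rough, self-contained illustration of the bootstrapped protocol described above (a sketch, not the mteb implementation; the synthetic embeddings stand in for encoder output): repeatedly sample a subset of the embedded documents, cluster each subset with mini-batch k-means, and aggregate the V-measures across runs.

# Sketch of bootstrapped clustering evaluation with V-measure (placeholder embeddings).
import numpy as np
from sklearn.cluster import MiniBatchKMeans
from sklearn.metrics import v_measure_score

rng = np.random.default_rng(42)
n_docs, dim = 500, 16
labels = rng.integers(0, 5, size=n_docs)                       # true cluster ids
embeddings = rng.normal(size=(n_docs, dim)) + labels[:, None]  # crude cluster structure

v_scores = []
for experiment in range(4):                            # repeated clustering experiments
    idx = rng.choice(n_docs, size=200, replace=False)  # sample a subset per experiment
    sub_emb, sub_labels = embeddings[idx], labels[idx]
    km = MiniBatchKMeans(
        n_clusters=len(np.unique(sub_labels)),
        batch_size=128,
        n_init=3,
        random_state=experiment,
    ).fit(sub_emb)
    v_scores.append(v_measure_score(sub_labels, km.labels_))

print("v_measure:", float(np.mean(v_scores)), "std:", float(np.std(v_scores)))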

Source code in mteb/abstasks/clustering.py
class AbsTaskClustering(AbsTask):
    """Abstract class for Clustering tasks.

    This class embeds the corpus sentences then samples N samples from the corpus and clusters them.
    The similarity then is calculated using the V-measure metric, which is invariant to the permutation of the labels.
    This approach is then repeated K times.

    There are two ways to specify how a dataset is downsampled: `max_document_to_embed` and `max_fraction_of_documents_to_embed`.
    If both parameters are set to None, no downsampling is done in self._evaluate_subset().
    Only one of these two parameters can be not None at the same time.

    If the clustering is hierarchical, and more than one label is specified in order for each observation,
    V-measures are calculated in the outlined way on each of the levels separately.

    Attributes:
        dataset: A HuggingFace Dataset containing the data for the clustering task. Must contain an input column `sentences` (texts or images) and a `labels` column.
        max_fraction_of_documents_to_embed: Fraction of documents to embed for clustering.
        max_document_to_embed: Maximum number of documents to embed for clustering.
        max_documents_per_cluster: Number of documents to sample for each clustering experiment.
        n_clusters: Number of clustering experiments to run.
        k_mean_batch_size: Batch size to use for k-means clustering.
        max_depth: Maximum depth to evaluate clustering. If None, evaluates all levels.
        input_column_name: Name of the column containing the input sentences or data points.
        label_column_name: Name of the column containing the true cluster labels.
        abstask_prompt: Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt.
    """

    max_fraction_of_documents_to_embed: float | None = 0.04
    max_document_to_embed: int | None = None
    max_documents_per_cluster: int = 16_384
    n_clusters: int = 10
    k_mean_batch_size: int = 512
    max_depth = None
    abstask_prompt = "Identify categories in user passages."
    input_column_name: str = "sentences"
    label_column_name: str = "labels"

    def _evaluate_subset(
        self,
        model: EncoderProtocol,
        data_split: Dataset,
        *,
        encode_kwargs: dict[str, Any],
        hf_split: str,
        hf_subset: str,
        prediction_folder: Path | None = None,
        **kwargs: Any,
    ) -> ScoresDict:
        if (
            self.max_document_to_embed is not None
            and self.max_fraction_of_documents_to_embed is not None
        ):
            raise Exception(
                "Both max_document_to_embed and max_fraction_of_documents_to_embed are set. Please only set one."
            )

        logger.info("Running clustering - Preparing data...")
        if (
            self.max_document_to_embed is None
            and self.max_fraction_of_documents_to_embed is None
        ):
            downsampled_dataset = data_split
        else:
            if self.max_fraction_of_documents_to_embed is not None:
                max_documents_to_embed = int(
                    self.max_fraction_of_documents_to_embed * len(data_split)
                )
            else:
                max_documents_to_embed = self.max_document_to_embed

            max_documents_to_embed = min(len(data_split), max_documents_to_embed)  # type: ignore
            example_indices = self.rng_state.sample(
                range(len(data_split)), k=max_documents_to_embed
            )
            downsampled_dataset = data_split.select(example_indices)  # type: ignore

        downsampled_dataset = downsampled_dataset.select_columns(
            [self.input_column_name, self.label_column_name]
        )

        logger.info("Running clustering - Encoding samples...")
        embeddings = model.encode(
            create_dataloader(
                downsampled_dataset,
                self.metadata,
                input_column=self.input_column_name,
                batch_size=encode_kwargs["batch_size"],
            ),
            task_metadata=self.metadata,
            hf_subset=hf_subset,
            hf_split=hf_split,
            **encode_kwargs,
        )

        logger.info("Running clustering - Evaluating clustering...")
        labels = []
        for label in downsampled_dataset[self.label_column_name]:
            if not isinstance(label, list):
                label = [label]
            labels.append(label)

        all_v_scores, all_assignments = _evaluate_clustering_bootstrapped(
            embeddings,
            labels,
            n_clusters=self.n_clusters,
            cluster_size=self.max_documents_per_cluster,
            kmean_batch_size=self.k_mean_batch_size,
            max_depth=self.max_depth,
            rng_state=self.rng_state,
            seed=self.seed,
        )

        if prediction_folder:
            self._save_task_predictions(
                all_assignments,
                model,
                prediction_folder,
                hf_subset=hf_subset,
                hf_split=hf_split,
            )

        v_measures = list(itertools.chain.from_iterable(all_v_scores.values()))

        logger.info("Running clustering - Finished.")
        mean_v_measure = np.mean(v_measures)
        v_std = np.std(v_measures)
        return {
            "v_measures": all_v_scores,
            "v_measure": float(mean_v_measure),
            "v_measure_std": v_std,
        }

    def _calculate_descriptive_statistics_from_split(
        self, split: str, hf_subset: str | None = None, compute_overall: bool = False
    ) -> ClusteringFastDescriptiveStatistics:
        if hf_subset:
            inputs = self.dataset[hf_subset][split][self.input_column_name]
            labels = self.dataset[hf_subset][split][self.label_column_name]
        elif compute_overall:
            inputs = []
            labels = []
            for hf_subset in self.metadata.eval_langs:
                inputs.extend(self.dataset[hf_subset][split][self.input_column_name])
                labels.extend(self.dataset[hf_subset][split][self.label_column_name])
        else:
            inputs = self.dataset[split][self.input_column_name]
            labels = self.dataset[split][self.label_column_name]

        if isinstance(inputs[0], list):
            inputs = [item for sublist in inputs for item in sublist]
        if isinstance(labels[0], list):
            labels = [item for sublist in labels for item in sublist]

        text_statistics, image_statistics = None, None
        if "image" in self.metadata.modalities:
            image_statistics = calculate_image_statistics(inputs)

        if "text" in self.metadata.modalities:
            text_statistics = calculate_text_statistics(inputs)

        label_statistics = calculate_label_statistics(labels)

        return ClusteringFastDescriptiveStatistics(
            num_samples=len(inputs),
            text_statistics=text_statistics,
            image_statistics=image_statistics,
            labels_statistics=label_statistics,
        )

    def _push_dataset_to_hub(self, repo_name: str) -> None:
        self._upload_dataset_to_hub(
            repo_name, [self.input_column_name, self.label_column_name]
        )

mteb.abstasks.sts.AbsTaskSTS

Bases: AbsTask

Abstract class for STS experiments.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| dataset | dict[HFSubset, DatasetDict] \| None | Dataset or dict of Datasets for different subsets (e.g., languages). The dataset must contain the columns specified in column_names and a 'score' column. Columns in column_names should contain the text or image data to be compared. |
| column_names | tuple[str, str] | Tuple containing the names of the two columns to compare. |
| min_score | int | Minimum possible score in the dataset. |
| max_score | int | Maximum possible score in the dataset. |
| abstask_prompt | | Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt. |
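
A minimal sketch of how such a task is scored (not the mteb code; the TF-IDF vectorizer is only a stand-in for an embedding model, and the sentence pairs and gold scores are invented): normalize the gold scores using min_score and max_score, compute cosine similarities between the paired embeddings, and report Pearson and Spearman correlations against the normalized gold scores.

# Sketch of STS evaluation: cosine similarity vs. normalized gold scores (toy encoder).
import numpy as np
from scipy.stats import pearsonr, spearmanr
from sklearn.feature_extraction.text import TfidfVectorizer

sent1 = ["a man is playing guitar", "a dog runs in the park", "the sky is blue"]
sent2 = ["someone plays a guitar", "a cat sleeps on the sofa", "the sky is clear"]
gold = [4.5, 1.0, 3.5]  # raw scores in [min_score, max_score]
min_score, max_score = 0, 5

normalized = [(s - min_score) / (max_score - min_score) for s in gold]

vectorizer = TfidfVectorizer().fit(sent1 + sent2)  # stand-in for an embedding model
emb1 = vectorizer.transform(sent1).toarray()
emb2 = vectorizer.transform(sent2).toarray()


def cosine(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-12))


cosine_scores = [cosine(a, b) for a, b in zip(emb1, emb2)]
print("pearson:", pearsonr(normalized, cosine_scores)[0])
print("spearman:", spearmanr(normalized, cosine_scores)[0])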

Source code in mteb/abstasks/sts.py
class AbsTaskSTS(AbsTask):
    """Abstract class for STS experiments.

    Attributes:
        dataset: Dataset or dict of Datasets for different subsets (e.g., languages). Dataset must contain columns specified in column_names and a 'score' column.
            Columns in column_names should contain the text or image data to be compared.
        column_names: Tuple containing the names of the two columns to compare.
        min_score: Minimum possible score in the dataset.
        max_score: Maximum possible score in the dataset.
        abstask_prompt: Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt.
    """

    abstask_prompt = "Retrieve semantically similar text."
    column_names: tuple[str, str] = ("sentence1", "sentence2")
    min_score: int = 0
    max_score: int = 5

    def _evaluate_subset(
        self,
        model: EncoderProtocol,
        data_split: Dataset,
        encode_kwargs: dict[str, Any],
        hf_split: str,
        hf_subset: str,
        prediction_folder: Path | None = None,
        **kwargs: Any,
    ) -> STSMetrics:
        normalized_scores = list(map(self._normalize, data_split["score"]))
        data_split = data_split.select_columns(list(self.column_names))

        evaluator = AnySTSEvaluator(
            data_split,
            self.column_names,
            task_metadata=self.metadata,
            hf_split=hf_split,
            hf_subset=hf_subset,
            **kwargs,
        )
        scores = evaluator(model, encode_kwargs=encode_kwargs)

        if prediction_folder:
            self._save_task_predictions(
                scores,
                model,
                prediction_folder,
                hf_subset=hf_subset,
                hf_split=hf_split,
            )

        return self._calculate_scores(scores, normalized_scores)

    def _calculate_scores(
        self, scores: STSEvaluatorScores, normalized_scores: list[float]
    ) -> STSMetrics:
        def compute_corr(x: list[float], y: list[float]) -> tuple[float, float]:
            """Return (pearson, spearman) correlations between x and y."""
            return pearsonr(x, y)[0], spearmanr(x, y)[0]

        cosine_pearson, cosine_spearman = compute_corr(
            normalized_scores, scores["cosine_scores"]
        )
        manhattan_pearson, manhattan_spearman = compute_corr(
            normalized_scores, scores["manhattan_distances"]
        )
        euclidean_pearson, euclidean_spearman = compute_corr(
            normalized_scores, scores["euclidean_distances"]
        )

        if scores["similarity_scores"] is not None:
            pearson, spearman = compute_corr(
                normalized_scores, scores["similarity_scores"]
            )
        else:
            # if model does not have a similarity function, assume cosine similarity
            pearson, spearman = cosine_pearson, cosine_spearman

        return STSMetrics(
            # using the model's own similarity score
            pearson=pearson,
            spearman=spearman,
            # generic similarity scores
            cosine_pearson=cosine_pearson,
            cosine_spearman=cosine_spearman,
            manhattan_pearson=manhattan_pearson,
            manhattan_spearman=manhattan_spearman,
            euclidean_pearson=euclidean_pearson,
            euclidean_spearman=euclidean_spearman,
        )

    def _calculate_descriptive_statistics_from_split(
        self, split: str, hf_subset: str | None = None, compute_overall: bool = False
    ) -> AnySTSDescriptiveStatistics:
        first_column, second_column = self.column_names
        self.dataset = cast(dict[str, dict[str, Dataset]], self.dataset)

        if hf_subset:
            sentence1 = self.dataset[hf_subset][split][first_column]
            sentence2 = self.dataset[hf_subset][split][second_column]
            score = self.dataset[hf_subset][split]["score"]
        elif compute_overall:
            sentence1 = []
            sentence2 = []
            score = []
            for hf_subset in self.metadata.eval_langs:
                sentence1.extend(self.dataset[hf_subset][split][first_column])
                sentence2.extend(self.dataset[hf_subset][split][second_column])
                score.extend(self.dataset[hf_subset][split]["score"])
        else:
            sentence1 = self.dataset[split][first_column]
            sentence2 = self.dataset[split][second_column]
            score = self.dataset[split]["score"]

        if "text" in self.metadata.modalities:
            text1_statistics = calculate_text_statistics(sentence1)
            text2_statistics = calculate_text_statistics(sentence2)

            unique_pairs = len(set(zip(sentence1, sentence2)))
        else:
            text1_statistics = None
            text2_statistics = None
            unique_pairs = None

        if "image" in self.metadata.modalities:
            image1_statistics = calculate_image_statistics(sentence1)
            image2_statistics = calculate_image_statistics(sentence2)
        else:
            image1_statistics = None
            image2_statistics = None

        labels_statistics = calculate_score_statistics(score)

        return AnySTSDescriptiveStatistics(
            num_samples=len(sentence1),
            number_of_characters=(
                text1_statistics["total_text_length"]
                + text2_statistics["total_text_length"]
                if text1_statistics
                else None
            ),
            unique_pairs=unique_pairs,
            text1_statistics=text1_statistics,
            text2_statistics=text2_statistics,
            image1_statistics=image1_statistics,
            image2_statistics=image2_statistics,
            label_statistics=labels_statistics,
        )

    def _push_dataset_to_hub(self, repo_name: str) -> None:
        self._upload_dataset_to_hub(
            repo_name, [self.column_names[0], self.column_names[1], "score"]
        )

    def _normalize(self, x: float) -> float:
        return (x - self.min_score) / (self.max_score - self.min_score)

mteb.abstasks.zeroshot_classification.AbsTaskZeroShotClassification

Bases: AbsTask

Abstract class for ZeroShot Classification tasks for any modality.

The similarity is computed between an input (which can be an image or text) and candidate text prompts, such as "this is a dog" / "this is a cat".

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| dataset | dict[HFSubset, DatasetDict] \| None | Huggingface dataset containing the data for the task. Dataset must contain columns specified by self.input_column_name and self.label_column_name. |
| input_column_name | str | Name of the column containing the inputs (image or text). |
| label_column_name | str | Name of the column containing the labels (str). |
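
A small standalone sketch of the zero-shot setup (not the mteb implementation; the shared TF-IDF encoder and the prompt wording are assumptions for illustration): embed the inputs and the candidate label prompts with the same encoder, take cosine similarities, and predict the most similar candidate for each input.

# Sketch of zero-shot classification via similarity to candidate label prompts (toy encoder).
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score
from sklearn.metrics.pairwise import cosine_similarity

inputs = ["a small dog fetching a ball", "a cat napping on the couch"]
labels = [0, 1]  # index of the correct candidate for each input
candidates = ["a photo of a dog", "a photo of a cat"]

vectorizer = TfidfVectorizer().fit(inputs + candidates)  # stand-in for a shared encoder
input_emb = vectorizer.transform(inputs)
candidate_emb = vectorizer.transform(candidates)

probs = cosine_similarity(input_emb, candidate_emb)  # (n_inputs, n_candidates)
predictions = probs.argmax(axis=1)
print("accuracy:", accuracy_score(labels, predictions))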

Source code in mteb/abstasks/zeroshot_classification.py
class AbsTaskZeroShotClassification(AbsTask):
    """Abstract class for ZeroShot Classification tasks for any modality.

    The similarity is computed between an input (which can be an image or text) and candidate text prompts, such as "this is a dog" / "this is a cat".

    Attributes:
        dataset: Huggingface dataset containing the data for the task. Dataset must contain columns specified by self.input_column_name and self.label_column_name.
        input_column_name: Name of the column containing the inputs (image or text).
        label_column_name: Name of the column containing the labels (str).
    """

    input_column_name: str = "image"
    label_column_name: str = "label"

    def _calculate_descriptive_statistics_from_split(
        self, split: str, hf_subset: str | None = None, compute_overall: bool = False
    ) -> ZeroShotClassificationDescriptiveStatistics:
        if hf_subset:
            inputs = self.dataset[hf_subset][split][self.input_column_name]
            labels = self.dataset[hf_subset][split][self.label_column_name]
        elif compute_overall:
            inputs, labels = [], []
            for hf_subset in self.metadata.eval_langs:
                inputs.extend(self.dataset[hf_subset][split][self.input_column_name])
                labels.extend(self.dataset[hf_subset][split][self.label_column_name])
        else:
            inputs = self.dataset[split][self.input_column_name]
            labels = self.dataset[split][self.label_column_name]

        num_samples = len(inputs)

        image_statistics = None
        text_statistics = None

        if "image" in self.metadata.modalities:
            image_statistics = calculate_image_statistics(inputs)
        if self.metadata.modalities == ["text"]:
            text_statistics = calculate_text_statistics(inputs)

        label_statistics = calculate_label_statistics(labels)
        candidate_lens = calculate_text_statistics(self.get_candidate_labels())

        return ZeroShotClassificationDescriptiveStatistics(
            num_samples=num_samples,
            number_of_characters=None,
            text_statistics=text_statistics,
            image_statistics=image_statistics,
            label_statistics=label_statistics,
            candidates_labels_text_statistics=candidate_lens,
        )

    def _evaluate_subset(
        self,
        model: EncoderProtocol,
        data_split: Dataset,
        *,
        hf_split: str,
        hf_subset: str,
        encode_kwargs: dict[str, Any],
        prediction_folder: Path | None = None,
        **kwargs,
    ) -> ZeroShotClassificationMetrics:
        candidate_labels = self.get_candidate_labels()
        data_split = data_split.select_columns(
            [self.input_column_name, self.label_column_name]
        )
        evaluator = ZeroShotClassificationEvaluator(
            data_split,
            self.input_column_name,
            candidate_labels,
            task_metadata=self.metadata,
            hf_split=hf_split,
            hf_subset=hf_subset,
            **kwargs,
        )
        probs = evaluator(model, encode_kwargs=encode_kwargs)

        if prediction_folder:
            self._save_task_predictions(
                probs.tolist(),
                model,
                prediction_folder,
                hf_subset=hf_subset,
                hf_split=hf_split,
            )

        return self._calculate_scores(
            data_split[self.label_column_name],
            torch.tensor(probs).argmax(dim=1).tolist(),
        )

    def _calculate_scores(
        self,
        labels: list[int],
        predictions: list[float],
    ) -> ZeroShotClassificationMetrics:
        return ZeroShotClassificationMetrics(
            accuracy=metrics.accuracy_score(labels, predictions),
        )

    def _push_dataset_to_hub(self, repo_name: str) -> None:
        self._upload_dataset_to_hub(
            repo_name,
            [
                self.input_column_name,
                self.label_column_name,
            ],
        )
        labels_dataset = Dataset.from_dict({"labels": self.get_candidate_labels()})
        labels_dataset.push_to_hub(repo_name, config_name="labels")

    def get_candidate_labels(self) -> list[str]:
        """Return the text candidates for zeroshot classification"""
        raise NotImplementedError("This method should be overridden by subclasses")

get_candidate_labels()

Return the text candidates for zeroshot classification

Source code in mteb/abstasks/zeroshot_classification.py
def get_candidate_labels(self) -> list[str]:
    """Return the text candidates for zeroshot classification"""
    raise NotImplementedError("This method should be overridden by subclasses")

mteb.abstasks.regression.AbsTaskRegression

Bases: AbsTaskClassification

Abstract class for regression tasks

self.load_data() must generate a huggingface dataset with a split matching self.metadata.eval_splits, and assign it to self.dataset. It must contain the following columns: text (str) and value (float).

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| dataset | dict[HFSubset, DatasetDict] \| None | A HuggingFace Dataset containing the data for the regression task. It must contain the columns input_column_name and label_column_name. The input can be any text or images, and the label must be a continuous value. |
| input_column_name | str | Name of the column containing the text inputs. |
| label_column_name | str | Name of the column containing the continuous values. |
| train_split | str | Name of the training split in the dataset. |
| n_experiments | int | Number of experiments to run with different random seeds. |
| n_samples | int | Number of samples to use for training the regression model. If the dataset has fewer samples than n_samples, all samples are used. |
| abstask_prompt | | Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt. |
| evaluator_model | SklearnModelProtocol | The model to use for evaluation. Can be any sklearn-compatible model. Default is LinearRegression. Full details of the API in [SklearnModelProtocol][mteb._evaluators.sklearn_evaluator.SklearnModelProtocol]. |
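
A minimal standalone sketch of this regression protocol (not the mteb code; the synthetic embeddings and targets are placeholders for encoder output and real labels): fit LinearRegression on the train embeddings and report the metrics the task computes (MSE, RMSE, MAE, R2 and Kendall's tau) on a held-out split.

# Sketch of regression over embeddings with the metrics listed above (placeholder data).
import numpy as np
from scipy.stats import kendalltau
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

rng = np.random.default_rng(0)
X_train = rng.normal(size=(200, 32))                     # stand-in for train embeddings
w = rng.normal(size=32)
y_train = X_train @ w + rng.normal(scale=0.1, size=200)  # continuous targets
X_test = rng.normal(size=(50, 32))
y_test = X_test @ w + rng.normal(scale=0.1, size=50)

reg = LinearRegression(n_jobs=-1).fit(X_train, y_train)
y_pred = reg.predict(X_test)

mse = mean_squared_error(y_test, y_pred)
print({
    "mse": mse,
    "rmse": float(np.sqrt(mse)),
    "mae": mean_absolute_error(y_test, y_pred),
    "r2": r2_score(y_test, y_pred),
    "kendalltau": kendalltau(y_test, y_pred).statistic,
})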

Source code in mteb/abstasks/regression.py
class AbsTaskRegression(AbsTaskClassification):
    """Abstract class for regression tasks

    self.load_data() must generate a huggingface dataset with a split matching self.metadata.eval_splits, and assign it to self.dataset. It
    must contain the following columns:
        text: str
        value: float

    Attributes:
        dataset: A HuggingFace Dataset containing the data for the regression task. It must contain the following columns: input_column_name and label_column_name.
            Input can be any text or images, and label must be a continuous value.
        input_column_name: Name of the column containing the text inputs.
        label_column_name: Name of the column containing the continuous values.
        train_split: Name of the training split in the dataset.
        n_experiments: Number of experiments to run with different random seeds.
        n_samples: Number of samples to use for training the regression model. If the dataset has fewer samples than n_samples, all samples are used.
        abstask_prompt: Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt.
        evaluator_model: The model to use for evaluation. Can be any sklearn-compatible model. Default is `LinearRegression`.
            Full details of the API in [`SklearnModelProtocol`][mteb._evaluators.sklearn_evaluator.SklearnModelProtocol].
    """

    evaluator: type[SklearnModelProtocol] = SklearnEvaluator
    evaluator_model: SklearnModelProtocol = LinearRegression(n_jobs=-1)

    train_split: str = "train"
    label_column_name: str = "value"
    input_column_name: str = "text"
    abstask_prompt = "Predict the value of the user passage."

    n_experiments: int = 10
    n_samples: int = 2048

    def _undersample_data(
        self, dataset: Dataset, experiment_num: int, idxs: list[int] | None = None
    ) -> tuple[Dataset, list[int]]:
        if self.n_samples >= len(dataset):
            train_split_sampled = dataset
        else:
            train_split_sampled = self.stratified_subsampling(
                datasets.DatasetDict({"train": dataset}),
                seed=self.seed + experiment_num,
                splits=["train"],
                label=self.label_column_name,
                n_samples=self.n_samples,
            )["train"]
        return train_split_sampled, []

    def _calculate_scores(
        self,
        y_test: np.ndarray | list[int],
        y_pred: np.ndarray,
    ) -> RegressionMetrics:
        mse = mean_squared_error(y_test, y_pred)
        return RegressionMetrics(
            mse=mse,
            mae=mean_absolute_error(y_test, y_pred),
            r2=r2_score(y_test, y_pred),
            kendalltau=kendalltau(y_test, y_pred).statistic,
            rmse=np.sqrt(mse),
        )

    @staticmethod
    def stratified_subsampling(
        dataset_dict: datasets.DatasetDict,
        seed: int,
        splits: list[str] = ["test"],
        label: str = "value",
        n_samples: int = 2048,
        n_bins: int = 10,
    ) -> datasets.DatasetDict:
        """Subsamples the dataset with stratification by the supplied label, which is assumed to be a continuous value.

        The continuous values are bucketized into `n_bins` bins based on quantiles.

        Args:
            dataset_dict: the DatasetDict object.
            seed: the random seed.
            splits: the splits of the dataset.
            label: the label on which the stratified sampling is based.
            n_samples: Optional, number of samples to subsample.
            n_bins: Optional, number of bins to bucketize the continuous label.

        Returns:
            A subsampled DatasetDict object.
        """
        stratify_col_name = f"{label}_binned_for_stratification"

        for split in splits:
            if n_samples >= len(dataset_dict[split]):
                logger.debug(
                    "Subsampling not needed for split %s, as n_samples is equal or greater than the number of samples.",
                    split,
                )
                continue

            dataset = dataset_dict[split]
            labels = dataset[label]

            binned_labels = pd.qcut(labels, q=n_bins, labels=False, duplicates="drop")
            dataset_with_bins: datasets.Dataset = dataset.add_column(
                name=stratify_col_name,
                column=binned_labels.tolist(),
            )
            dataset_with_bins = dataset_with_bins.cast_column(
                stratify_col_name,
                datasets.ClassLabel(names=np.unique(binned_labels).tolist()),
            )

            subsampled_dataset = dataset_with_bins.train_test_split(
                test_size=n_samples, seed=seed, stratify_by_column=stratify_col_name
            )["test"]

            subsampled_dataset = subsampled_dataset.remove_columns([stratify_col_name])
            dataset_dict[split] = subsampled_dataset

        return dataset_dict

    def _calculate_descriptive_statistics_from_split(
        self, split: str, hf_subset: str | None = None, compute_overall: bool = False
    ) -> RegressionDescriptiveStatistics:
        train_text = []
        if hf_subset:
            texts = self.dataset[hf_subset][split][self.input_column_name]
            values = self.dataset[hf_subset][split][self.label_column_name]
            if split != self.train_split:
                train_text = self.dataset[hf_subset][self.train_split][
                    self.input_column_name
                ]
        elif compute_overall:
            texts = []
            values = []
            for lang_subset in self.metadata.eval_langs:
                texts.extend(self.dataset[lang_subset][split][self.input_column_name])
                values.extend(self.dataset[lang_subset][split][self.label_column_name])
                if split != "train":
                    train_text.extend(
                        self.dataset[lang_subset][self.train_split][
                            self.input_column_name
                        ]
                    )
        else:
            texts = self.dataset[split][self.input_column_name]
            values = self.dataset[split][self.label_column_name]
            if split != "train":
                train_text = self.dataset[self.train_split][self.input_column_name]

        text_statistics = None
        image_statistics = None
        num_texts_in_train = None
        if self.metadata.modalities == ["text"]:
            text_statistics = calculate_text_statistics(texts)
            num_texts_in_train = (
                len(set(texts) & set(train_text)) if split != self.train_split else None
            )
        elif self.metadata.modalities == ["image"]:
            image_statistics = calculate_image_statistics(texts)

        return RegressionDescriptiveStatistics(
            num_samples=len(texts),
            num_texts_in_train=num_texts_in_train,
            text_statistics=text_statistics,
            image_statistics=image_statistics,
            values_statistics=calculate_score_statistics(values),
        )

stratified_subsampling(dataset_dict, seed, splits=['test'], label='value', n_samples=2048, n_bins=10) staticmethod

Subsamples the dataset with stratification by the supplied label, which is assumed to be a continuous value.

The continuous values are bucketized into n_bins bins based on quantiles.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataset_dict | DatasetDict | The DatasetDict object. | required |
| seed | int | The random seed. | required |
| splits | list[str] | The splits of the dataset. | ['test'] |
| label | str | The label on which the stratified sampling is based. | 'value' |
| n_samples | int | Optional, number of samples to subsample. | 2048 |
| n_bins | int | Optional, number of bins used to bucketize the continuous label. | 10 |

Returns:

| Type | Description |
| --- | --- |
| DatasetDict | A subsampled DatasetDict object. |
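
As a small illustration of the quantile-bucketing idea (a sketch using pandas and scikit-learn directly, not the method listed below): the continuous values are cut into quantile bins with pd.qcut, and the resulting bin ids are then used as the stratification label when subsampling.

# Sketch: bucketize continuous values into quantile bins, then subsample with stratification.
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(0)
values = rng.normal(size=1000)  # continuous "label" column
binned = np.asarray(pd.qcut(values, q=10, labels=False, duplicates="drop"))  # quantile bin ids

indices = np.arange(len(values))
_, sampled_idx = train_test_split(indices, test_size=256, random_state=42, stratify=binned)
print(len(sampled_idx), "samples; bin counts:", np.bincount(binned[sampled_idx].astype(int)))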

Source code in mteb/abstasks/regression.py
@staticmethod
def stratified_subsampling(
    dataset_dict: datasets.DatasetDict,
    seed: int,
    splits: list[str] = ["test"],
    label: str = "value",
    n_samples: int = 2048,
    n_bins: int = 10,
) -> datasets.DatasetDict:
    """Subsamples the dataset with stratification by the supplied label, which is assumed to be a continuous value.

    The continuous values are bucketized into `n_bins` bins based on quantiles.

    Args:
        dataset_dict: the DatasetDict object.
        seed: the random seed.
        splits: the splits of the dataset.
        label: the label on which the stratified sampling is based.
        n_samples: Optional, number of samples to subsample.
        n_bins: Optional, number of bins to bucketize the continuous label.

    Returns:
        A subsampled DatasetDict object.
    """
    stratify_col_name = f"{label}_binned_for_stratification"

    for split in splits:
        if n_samples >= len(dataset_dict[split]):
            logger.debug(
                "Subsampling not needed for split %s, as n_samples is equal or greater than the number of samples.",
                split,
            )
            continue

        dataset = dataset_dict[split]
        labels = dataset[label]

        binned_labels = pd.qcut(labels, q=n_bins, labels=False, duplicates="drop")
        dataset_with_bins: datasets.Dataset = dataset.add_column(
            name=stratify_col_name,
            column=binned_labels.tolist(),
        )
        dataset_with_bins = dataset_with_bins.cast_column(
            stratify_col_name,
            datasets.ClassLabel(names=np.unique(binned_labels).tolist()),
        )

        subsampled_dataset = dataset_with_bins.train_test_split(
            test_size=n_samples, seed=seed, stratify_by_column=stratify_col_name
        )["test"]

        subsampled_dataset = subsampled_dataset.remove_columns([stratify_col_name])
        dataset_dict[split] = subsampled_dataset

    return dataset_dict

mteb.abstasks.clustering_legacy.AbsTaskClusteringLegacy

Bases: AbsTask

Legacy abstract task for clustering. For new tasks, we recommend using AbsTaskClustering because it is faster, more sample-efficient, and produces more robust statistical estimates.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| dataset | dict[HFSubset, DatasetDict] \| None | A HuggingFace Dataset containing the data for the clustering task. It must contain a sentences column (list of inputs to be clustered; can be text, images, etc.; the name can be changed via input_column_name) and a labels column (list of integer labels representing the true cluster assignments; the name can be changed via label_column_name). |
| input_column_name | str | The name of the column in the dataset that contains the input sentences or data points. |
| label_column_name | str | The name of the column in the dataset that contains the true cluster labels. |
| abstask_prompt | | Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt. |
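
For reference, a standalone sketch of the metrics this legacy task reports (not the mteb code; the toy label and assignment lists are placeholders for real k-means output): V-measure, NMI, ARI, and a cluster accuracy obtained by matching predicted clusters to gold clusters with the Hungarian algorithm on the confusion matrix.

# Sketch of the legacy clustering metrics: V-measure, NMI, ARI and Hungarian cluster accuracy.
from scipy.optimize import linear_sum_assignment
from sklearn import metrics

labels = [0, 0, 1, 1, 2, 2, 2, 0]      # gold cluster ids
assignment = [1, 1, 0, 0, 2, 2, 0, 1]  # predicted cluster ids (a permutation of the gold ids)

v = metrics.cluster.v_measure_score(labels, assignment)
nmi = metrics.cluster.normalized_mutual_info_score(labels, assignment)
ari = metrics.cluster.adjusted_rand_score(labels, assignment)

# Cluster accuracy: best one-to-one mapping between predicted and gold clusters.
matrix = metrics.confusion_matrix(labels, assignment)
row_ind, col_ind = linear_sum_assignment(matrix, maximize=True)
accuracy = matrix[row_ind, col_ind].sum() / len(labels)

print({"v_measure": v, "nmi": nmi, "ari": ari, "cluster_accuracy": float(accuracy)})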

Source code in mteb/abstasks/clustering_legacy.py
class AbsTaskClusteringLegacy(AbsTask):
    """Legacy abstract task for clustering. For new tasks, we recommend using AbsTaskClustering because it is faster, more sample-efficient, and produces more robust statistical estimates.

    Attributes:
        dataset: A HuggingFace Dataset containing the data for the clustering task. It must contain the following columns:
            sentences: List of inputs to be clustered. Can be text, images, etc. Name can be changed via `input_column_name`.
            labels: List of integer labels representing the true cluster assignments. Name can be changed via `label_column_name`.
        input_column_name: The name of the column in the dataset that contains the input sentences or data points.
        label_column_name: The name of the column in the dataset that contains the true cluster labels.
        abstask_prompt: Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt.
    """

    abstask_prompt = "Identify categories in user passages."
    evaluator: type[ClusteringEvaluator] = ClusteringEvaluator
    input_column_name: str = "sentences"
    label_column_name: str = "labels"

    def _evaluate_subset(
        self,
        model: EncoderProtocol,
        data_split: Dataset,
        *,
        encode_kwargs: dict[str, Any],
        hf_split: str,
        hf_subset: str,
        prediction_folder: Path | None = None,
        **kwargs: Any,
    ) -> ScoresDict:
        # MTEB text clustering requires renaming and eval per subset.
        if self.metadata.modalities == ["text"]:
            all_metrics = []
            clusters = []
            for i, cluster_set in enumerate(data_split):
                logger.info(
                    f"Running clustering on cluster ({i + 1}/{len(data_split)})"
                )
                clustering_dataset = Dataset.from_dict(cluster_set).select_columns(
                    [self.input_column_name, self.label_column_name]
                )
                evaluator = self.evaluator(
                    clustering_dataset,
                    input_column_name=self.input_column_name,
                    label_column_name=self.label_column_name,
                    task_metadata=self.metadata,
                    hf_split=hf_split,
                    hf_subset=hf_subset,
                    **kwargs,
                )
                clusters_assignment = evaluator(model, encode_kwargs=encode_kwargs)
                clusters.append(clusters_assignment)
                set_metrics = self._compute_metrics(
                    clustering_dataset[self.label_column_name],
                    clusters_assignment,
                    v_measure_only=True,
                )
                all_metrics.append(set_metrics)

            if prediction_folder:
                self._save_task_predictions(
                    clusters,
                    model,
                    prediction_folder,
                    hf_subset=hf_subset,
                    hf_split=hf_split,
                )
            v_measures = [m["v_measure"] for m in all_metrics]
            v_mean = np.mean(v_measures)
            v_std = np.std(v_measures)
            scores = {
                "v_measure": v_mean,
                "v_measure_std": v_std,
                "v_measures": v_measures,
            }
            return scores

        data_split = data_split.select_columns(
            [self.input_column_name, self.label_column_name]
        )
        evaluator = self.evaluator(
            data_split,
            input_column_name=self.input_column_name,
            label_column_name=self.label_column_name,
            task_metadata=self.metadata,
            hf_split=hf_split,
            hf_subset=hf_subset,
            **kwargs,
        )
        clusters = evaluator(model, encode_kwargs=encode_kwargs)
        if prediction_folder:
            self._save_task_predictions(
                clusters,
                model,
                prediction_folder,
                hf_subset=hf_subset,
                hf_split=hf_split,
            )

        return self._compute_metrics(
            data_split[self.label_column_name],
            clusters,
        )

    def _compute_metrics(
        self,
        labels: list[int],
        cluster_assignment: list[int],
        v_measure_only: bool = False,
    ) -> ClusteringMetrics:
        logger.info("Running clustering - Evaluating clustering...")
        v_measure = metrics.cluster.v_measure_score(labels, cluster_assignment)
        if v_measure_only:
            return ClusteringMetrics(
                v_measure=v_measure,
            )
        nmi = metrics.cluster.normalized_mutual_info_score(labels, cluster_assignment)
        ari = metrics.cluster.adjusted_rand_score(labels, cluster_assignment)

        matrix = metrics.confusion_matrix(labels, cluster_assignment)
        # get linear sum assignment
        row_ind, col_ind = linear_sum_assignment(matrix, maximize=True)
        total_correct = matrix[row_ind, col_ind].sum()
        clustering_accuracy = total_correct / len(labels)
        return ClusteringMetrics(
            v_measure=float(v_measure),
            nmi=float(nmi),
            ari=float(ari),
            cluster_accuracy=float(clustering_accuracy),
        )

    def _calculate_descriptive_statistics_from_split(
        self, split: str, hf_subset: str | None = None, compute_overall: bool = False
    ) -> ClusteringDescriptiveStatistics:
        if hf_subset:
            inputs = self.dataset[hf_subset][split][self.input_column_name]
            labels = self.dataset[hf_subset][split][self.label_column_name]
        elif compute_overall:
            inputs = []
            labels = []
            for hf_subset in self.metadata.eval_langs:
                inputs.extend(self.dataset[hf_subset][split][self.input_column_name])
                labels.extend(self.dataset[hf_subset][split][self.label_column_name])
        else:
            inputs = self.dataset[split][self.input_column_name]
            labels = self.dataset[split][self.label_column_name]

        if isinstance(inputs[0], list):
            inputs = [item for sublist in inputs for item in sublist]
        if isinstance(labels[0], list):
            labels = [item for sublist in labels for item in sublist]

        text_statistics, image_statistics = None, None
        if "image" in self.metadata.modalities:
            image_statistics = calculate_image_statistics(inputs)

        if "text" in self.metadata.modalities:
            text_statistics = calculate_text_statistics(inputs)

        label_statistics = calculate_label_statistics(labels)

        return ClusteringDescriptiveStatistics(
            num_samples=len(inputs),
            text_statistics=text_statistics,
            image_statistics=image_statistics,
            label_statistics=label_statistics,
        )

    def _push_dataset_to_hub(self, repo_name: str) -> None:
        self._upload_dataset_to_hub(
            repo_name,
            [
                self.input_column_name,
                self.label_column_name,
            ],
        )
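
The cluster accuracy reported above pairs each true label with the cluster that best explains it before counting agreements. Below is a minimal, self-contained sketch of that computation on toy labels; it mirrors _compute_metrics above and uses the same scipy/scikit-learn calls.

import numpy as np
from scipy.optimize import linear_sum_assignment
from sklearn import metrics

labels = [0, 0, 1, 1, 2, 2]
cluster_assignment = [1, 1, 0, 0, 2, 1]  # cluster ids are arbitrary

# Confusion matrix between true labels (rows) and cluster ids (columns), then the
# label<->cluster matching that maximizes agreement (Hungarian algorithm).
matrix = metrics.confusion_matrix(labels, cluster_assignment)
row_ind, col_ind = linear_sum_assignment(matrix, maximize=True)
cluster_accuracy = matrix[row_ind, col_ind].sum() / len(labels)
print(cluster_accuracy)  # 5 of 6 samples agree under the best matching -> ~0.83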

Text Tasks

mteb.abstasks.text.bitext_mining.AbsTaskBitextMining

Bases: AbsTask

Abstract class for BitextMining tasks

The similarity is computed between pairs and the results are ranked.

Attributes:

- dataset (dict[HFSubset, DatasetDict] | None): A HuggingFace dataset containing the data for the task. It must contain the columns sentence1 and sentence2 for the two texts to be compared.
- parallel_subsets: If true, the language pairs of the task are stored as column names within a single split; otherwise each language pair is a separate subset.
- abstask_prompt: Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt.

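A minimal sketch of the expected data layout, assuming parallel_subsets is False (the column names follow the attribute description above; the sentences are illustrative):

from datasets import Dataset, DatasetDict

toy_split = Dataset.from_dict(
    {
        "sentence1": ["The cat sits on the mat.", "I like tea."],
        "sentence2": ["Die Katze sitzt auf der Matte.", "Ich mag Tee."],
    }
)
toy_dataset = DatasetDict({"test": toy_split})

# A concrete AbsTaskBitextMining subclass holding this data would then be scored via
# task.evaluate(model, split="test", encode_kwargs={...}), where model implements
# EncoderProtocol; see the source below.
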
Source code in mteb/abstasks/text/bitext_mining.py
class AbsTaskBitextMining(AbsTask):
    """Abstract class for BitextMining tasks

    The similarity is computed between pairs and the results are ranked.

    Attributes:
        dataset: A HuggingFace dataset containing the data for the task. It must contain the following columns sentence1 and sentence2 for the two texts to be compared.
        parallel_subsets: If true task language pairs should be in one split as column names, otherwise each language pair should be a subset.
        abstask_prompt: Prompt to use for the task for instruction model if not prompt is provided in TaskMetadata.prompt.
    """

    parallel_subsets = False
    abstask_prompt = "Retrieve parallel sentences."
    _DEFAULT_PAIR: ClassVar[list[tuple[str, str]]] = [("sentence1", "sentence2")]

    def evaluate(
        self,
        model: MTEBModels,
        split: str = "test",
        subsets_to_run: list[HFSubset] | None = None,
        *,
        encode_kwargs: dict[str, Any],
        prediction_folder: Path | None = None,
        **kwargs: Any,
    ) -> dict[HFSubset, ScoresDict]:
        """Added load for "parallel" datasets"""
        if not self.data_loaded:
            self.load_data()

        hf_subsets = self.hf_subsets

        # If subsets_to_run is specified, filter the hf_subsets accordingly
        if subsets_to_run is not None:
            hf_subsets = [s for s in hf_subsets if s in subsets_to_run]

        scores = {}
        if self.parallel_subsets:
            scores = self._evaluate_subset(
                model,
                self.dataset[split],  # type: ignore
                parallel=True,
                hf_split=split,
                hf_subset="parallel",
                encode_kwargs=encode_kwargs,
                prediction_folder=prediction_folder,
                **kwargs,
            )
        else:
            for hf_subset in hf_subsets:
                logger.info(
                    f"Task: {self.metadata.name}, split: {split}, subset: {hf_subset}. Running..."
                )

                if hf_subset not in self.dataset and hf_subset == "default":
                    data_split = self.dataset[split]
                else:
                    data_split = self.dataset[hf_subset][split]
                scores[hf_subset] = self._evaluate_subset(
                    model,
                    data_split,
                    hf_split=split,
                    hf_subset=hf_subset,
                    encode_kwargs=encode_kwargs,
                    prediction_folder=prediction_folder,
                    **kwargs,
                )

        return scores

    def _get_pairs(self, parallel: bool) -> list[tuple[str, str]]:
        pairs = self._DEFAULT_PAIR
        if parallel:
            pairs = [langpair.split("-") for langpair in self.hf_subsets]
        return pairs

    def _evaluate_subset(
        self,
        model: EncoderProtocol,
        data_split: Dataset,
        *,
        hf_split: str,
        hf_subset: str,
        parallel: bool = False,
        encode_kwargs: dict[str, Any],
        prediction_folder: Path | None = None,
        **kwargs,
    ) -> ScoresDict:
        pairs = self._get_pairs(parallel)

        evaluator = BitextMiningEvaluator(
            data_split,
            task_metadata=self.metadata,
            pair_columns=pairs,  # type: ignore
            hf_split=hf_split,
            hf_subset=hf_subset,
            **kwargs,
        )
        # NOTE: used only by BUCC
        gold = (
            list(zip(range(len(data_split)), range(len(data_split))))
            if "gold" not in data_split
            else data_split["gold"]
        )

        neighbours = evaluator(model, encode_kwargs=encode_kwargs)

        if prediction_folder:
            self._save_task_predictions(
                neighbours,
                model,
                prediction_folder,
                hf_subset=hf_subset,
                hf_split=hf_split,
            )

        if parallel:
            metrics = {}
            for keys, nearest_neighbors in neighbours.items():
                metrics[keys] = self._compute_metrics(nearest_neighbors, gold)

            for v in metrics.values():
                self._add_main_score(v)
        else:
            def_pair_str = "-".join(self._DEFAULT_PAIR[0])
            metrics = self._compute_metrics(neighbours[def_pair_str], gold)
            self._add_main_score(metrics)
        return metrics

    def _compute_metrics(
        self,
        nearest_neighbors: list[dict[str, float]],
        gold: list[tuple[int, int]],
    ) -> BitextMiningMetrics:
        logger.info("Computing metrics...")
        labels = []
        predictions = []
        for i, x in enumerate(nearest_neighbors):
            j = x["corpus_id"]
            predictions.append(j)
            labels.append(gold[i][1])

        return BitextMiningMetrics(
            precision=precision_score(
                labels, predictions, zero_division=0, average="weighted"
            ),
            recall=recall_score(
                labels, predictions, zero_division=0, average="weighted"
            ),
            f1=f1_score(labels, predictions, zero_division=0, average="weighted"),
            accuracy=accuracy_score(labels, predictions),
        )

    def _calculate_descriptive_statistics_from_split(
        self, split: str, hf_subset: str | None = None, compute_overall: bool = False
    ) -> BitextDescriptiveStatistics:
        pairs_cols = self._get_pairs(self.parallel_subsets)
        if hf_subset:
            if self.parallel_subsets:
                sent_1, sent_2 = hf_subset.split("-")
                sentence1 = self.dataset[split][sent_1]
                sentence2 = self.dataset[split][sent_2]
            else:
                sent_1, sent_2 = pairs_cols[0]
                sentence1 = self.dataset[hf_subset][split][sent_1]
                sentence2 = self.dataset[hf_subset][split][sent_2]
        elif compute_overall:
            sentence1, sentence2 = [], []
            if self.parallel_subsets:
                for hf_subset in self.metadata.eval_langs:
                    sent_1, sent_2 = hf_subset.split("-")
                    sentence1.extend(self.dataset[split][sent_1])
                    sentence2.extend(self.dataset[split][sent_2])
            else:
                sent_1, sent_2 = pairs_cols[0]
                for hf_subset in self.metadata.eval_langs:
                    sentence1.extend(self.dataset[hf_subset][split][sent_1])
                    sentence2.extend(self.dataset[hf_subset][split][sent_2])
        else:
            sent_1, sent_2 = pairs_cols[0]
            sentence1 = self.dataset[split][sent_1]
            sentence2 = self.dataset[split][sent_2]

        text1_statistics = calculate_text_statistics(sentence1)
        text2_statistics = calculate_text_statistics(sentence2)
        unique_pairs = len(set(zip(sentence1, sentence2)))

        return BitextDescriptiveStatistics(
            num_samples=len(sentence1),
            number_of_characters=(
                text1_statistics["total_text_length"]
                + text2_statistics["total_text_length"]
            ),
            unique_pairs=unique_pairs,
            sentence1_statistics=text1_statistics,
            sentence2_statistics=text2_statistics,
        )

    def _push_dataset_to_hub(self, repo_name: str) -> None:
        if self.metadata.is_multilingual:
            dataset = defaultdict(dict)
            for config in self.metadata.eval_langs:
                logger.info(f"Converting {config} of {self.metadata.name}")

                if self.parallel_subsets:
                    for split in self.dataset:
                        sent_1, sent_2 = config.split("-")
                        dataset[split][sent_1] = self.dataset[split][sent_1]
                        dataset[split][sent_2] = self.dataset[split][sent_2]
                else:
                    sent_1, sent_2 = self._get_pairs(self.parallel_subsets)[0]
                    lang_1, lang_2 = config.split("-")
                    for split in self.dataset[config]:
                        dataset[split][lang_1] = self.dataset[config][split][sent_1]
                        dataset[split][lang_2] = self.dataset[config][split][sent_2]
            for split in dataset:
                dataset[split] = Dataset.from_dict(dataset[split])
            dataset = DatasetDict(dataset)
            dataset.push_to_hub(repo_name)
        else:
            sentences = {}
            for split in self.dataset:
                sent_1, sent_2 = self._get_pairs(self.parallel_subsets)[0]
                sentences[split] = Dataset.from_dict(
                    {
                        "sentence1": self.dataset[split][sent_1],
                        "sentence2": self.dataset[split][sent_2],
                    }
                )
            sentences = DatasetDict(sentences)
            sentences.push_to_hub(repo_name)

evaluate(model, split='test', subsets_to_run=None, *, encode_kwargs, prediction_folder=None, **kwargs)

Added load for "parallel" datasets

Source code in mteb/abstasks/text/bitext_mining.py
def evaluate(
    self,
    model: MTEBModels,
    split: str = "test",
    subsets_to_run: list[HFSubset] | None = None,
    *,
    encode_kwargs: dict[str, Any],
    prediction_folder: Path | None = None,
    **kwargs: Any,
) -> dict[HFSubset, ScoresDict]:
    """Added load for "parallel" datasets"""
    if not self.data_loaded:
        self.load_data()

    hf_subsets = self.hf_subsets

    # If subsets_to_run is specified, filter the hf_subsets accordingly
    if subsets_to_run is not None:
        hf_subsets = [s for s in hf_subsets if s in subsets_to_run]

    scores = {}
    if self.parallel_subsets:
        scores = self._evaluate_subset(
            model,
            self.dataset[split],  # type: ignore
            parallel=True,
            hf_split=split,
            hf_subset="parallel",
            encode_kwargs=encode_kwargs,
            prediction_folder=prediction_folder,
            **kwargs,
        )
    else:
        for hf_subset in hf_subsets:
            logger.info(
                f"Task: {self.metadata.name}, split: {split}, subset: {hf_subset}. Running..."
            )

            if hf_subset not in self.dataset and hf_subset == "default":
                data_split = self.dataset[split]
            else:
                data_split = self.dataset[hf_subset][split]
            scores[hf_subset] = self._evaluate_subset(
                model,
                data_split,
                hf_split=split,
                hf_subset=hf_subset,
                encode_kwargs=encode_kwargs,
                prediction_folder=prediction_folder,
                **kwargs,
            )

    return scores

mteb.abstasks.pair_classification.AbsTaskPairClassification

Bases: AbsTask

Abstract class for PairClassificationTasks

The similarity is computed between pairs and the results are ranked. Average precision is computed to measure how well the methods can be used for pairwise classification.

Attributes:

- dataset (dict[HFSubset, DatasetDict] | None): A HuggingFace dataset containing the data for the task. Should contain the columns sentence1, sentence2 and labels.
- input1_column_name (str): The name of the column containing the first sentence in the pair.
- input2_column_name (str): The name of the column containing the second sentence in the pair.
- label_column_name (str): The name of the column containing the labels for the pairs. Labels should be 0 or 1.
- abstask_prompt: Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt.

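To make the metric computation concrete, here is a small worked example of how average precision is obtained for both similarity scores (higher means more similar) and distance scores (lower means more similar): distances are simply negated so that average_precision_score always sees "higher score = more likely positive", mirroring _compute_metrics_values in the source below. The numbers are illustrative.

import numpy as np
from sklearn.metrics import average_precision_score

labels = np.array([1, 0, 1, 0])
cosine_scores = np.array([0.9, 0.2, 0.7, 0.4])        # higher = more similar
euclidean_distances = np.array([0.3, 1.4, 0.5, 1.1])  # lower = more similar

ap_cosine = average_precision_score(labels, cosine_scores)
ap_euclidean = average_precision_score(labels, -euclidean_distances)
print(ap_cosine, ap_euclidean)  # both 1.0 here: the positive pairs rank first either way
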
Source code in mteb/abstasks/pair_classification.py
class AbsTaskPairClassification(AbsTask):
    """Abstract class for PairClassificationTasks

    The similarity is computed between pairs and the results are ranked. Average precision
    is computed to measure how well the methods can be used for pairwise pair classification.

    Attributes:
        dataset: A HuggingFace dataset containing the data for the task. Should contain the following columns: sentence1, sentence2, labels.
        input1_column_name: The name of the column containing the first sentence in the pair.
        input2_column_name: The name of the column containing the second sentence in the pair.
        label_column_name: The name of the column containing the labels for the pairs. Labels should be 0 or 1.
        abstask_prompt: Prompt to use for the task for instruction model if not prompt is provided in TaskMetadata.prompt.
    """

    abstask_prompt = "Retrieve text that are semantically similar to the given text."
    input1_column_name: str = "sentence1"
    input2_column_name: str = "sentence2"
    label_column_name: str = "labels"

    def _evaluate_subset(
        self,
        model: EncoderProtocol,
        data_split: Dataset,
        *,
        hf_split: str,
        hf_subset: str,
        encode_kwargs: dict[str, str],
        prediction_folder: Path | None = None,
        **kwargs,
    ) -> dict[str, float]:
        if self.metadata.modalities == ["text"]:
            # for compatibility with v1 version where datasets were stored in a single row
            data_split = data_split[0] if len(data_split) == 1 else data_split
        evaluator = PairClassificationEvaluator(
            data_split,
            self.input1_column_name,
            self.input2_column_name,
            task_metadata=self.metadata,
            hf_split=hf_split,
            hf_subset=hf_subset,
            **kwargs,
        )
        similarity_scores = evaluator(model, encode_kwargs=encode_kwargs)

        if prediction_folder:
            self._save_task_predictions(
                similarity_scores,
                model,
                prediction_folder,
                hf_subset=hf_subset,
                hf_split=hf_split,
            )
        return self._compute_metrics(
            similarity_scores, data_split[self.label_column_name]
        )

    def _compute_metrics(
        self, similarity_scores: PairClassificationDistances, labels: list[int]
    ) -> dict[str, float]:
        logger.info("Computing metrics...")
        labels = np.asarray(labels)
        output_scores = {}
        max_scores = defaultdict(list)
        for short_name, scores, reverse in [
            [
                "similarity",
                similarity_scores["similarity_scores"],
                True,
            ],
            [ScoringFunction.COSINE.value, similarity_scores["cosine_scores"], True],
            [
                ScoringFunction.MANHATTAN.value,
                similarity_scores["manhattan_distances"],
                False,
            ],
            [
                ScoringFunction.EUCLIDEAN.value,
                similarity_scores["euclidean_distances"],
                False,
            ],
            [ScoringFunction.DOT_PRODUCT.value, similarity_scores["dot_scores"], True],
        ]:
            metrics = self._compute_metrics_values(scores, labels, reverse)
            for metric_name, metric_value in metrics.items():
                output_scores[f"{short_name}_{metric_name}"] = metric_value
                max_scores[metric_name].append(metric_value)

        for metric in max_scores:
            if metric in ["f1", "ap", "precision", "recall", "accuracy"]:
                output_scores[f"max_{metric}"] = max(max_scores[metric])
        return output_scores

    def _calculate_descriptive_statistics_from_split(
        self, split: str, hf_subset: str | None = None, compute_overall: bool = False
    ) -> PairClassificationDescriptiveStatistics:
        if hf_subset:
            dataset = self.dataset[hf_subset][split]
        elif compute_overall:
            dataset = defaultdict(list)
            for hf_subset in self.metadata.eval_langs:
                cur_dataset = self.dataset[hf_subset][split]
                # for compatibility with v1 version where datasets were stored in a single row
                if isinstance(cur_dataset, list) or len(cur_dataset) == 1:
                    cur_dataset = cur_dataset[0]
                if isinstance(cur_dataset, Dataset):
                    for row in cur_dataset:
                        for k, v in row.items():
                            dataset[k].append(v)
                else:
                    for key, value in cur_dataset.items():
                        dataset[key].extend(value[0] if len(value) == 1 else value)
        else:
            dataset = self.dataset[split]

        if isinstance(dataset, list):
            dataset = dataset[0]

        input1 = (
            dataset[self.input1_column_name][0]
            if len(dataset[self.input1_column_name]) == 1
            else dataset[self.input1_column_name]
        )
        input2 = (
            dataset[self.input2_column_name][0]
            if len(dataset[self.input2_column_name]) == 1
            else dataset[self.input2_column_name]
        )
        labels = (
            dataset[self.label_column_name][0]
            if len(dataset[self.label_column_name]) == 1
            else dataset[self.label_column_name]
        )

        text1_statistics = None
        text2_statistics = None
        image1_statistics = None
        image2_statistics = None
        number_of_characters = None
        unique_pairs = None
        if self.metadata.modalities == ["text"]:
            text1_statistics = calculate_text_statistics(input1)
            text2_statistics = calculate_text_statistics(input2)
            number_of_characters = (
                text1_statistics["total_text_length"]
                + text2_statistics["total_text_length"]
            )
            unique_pairs = len(set(zip(input1, input2)))

        elif self.metadata.modalities == ["image"]:
            image1_statistics = calculate_image_statistics(input1)
            image2_statistics = calculate_image_statistics(input2)

            def _compute_image_hash(inputs: list) -> list[str]:
                hashes = set()
                for img in inputs:
                    img_bytes = img.tobytes()
                    img_hash = hashlib.md5(img_bytes).hexdigest()
                    hashes.add(img_hash)
                return list(hashes)

            image_1_hashes = _compute_image_hash(input1)
            image_2_hashes = _compute_image_hash(input2)
            unique_pairs = len(set(zip(image_1_hashes, image_2_hashes)))

        return PairClassificationDescriptiveStatistics(
            num_samples=len(input1),
            unique_pairs=unique_pairs,
            number_of_characters=number_of_characters,
            text1_statistics=text1_statistics,
            image1_statistics=image1_statistics,
            text2_statistics=text2_statistics,
            image2_statistics=image2_statistics,
            labels_statistics=calculate_label_statistics(labels),
        )

    def _push_dataset_to_hub(self, repo_name: str) -> None:
        # previously pair classification datasets were stored in a single row
        if self.metadata.is_multilingual:
            for subset in self.dataset:
                for split in self.dataset[subset]:
                    if len(self.dataset[subset][split]) == 1:
                        self.dataset[subset][split] = self.dataset[subset][split][0]
        else:
            for split in self.dataset:
                if len(self.dataset[split]) == 1:
                    self.dataset[split] = self.dataset[split][0]
        self._upload_dataset_to_hub(
            repo_name,
            [
                self.input1_column_name,
                self.input2_column_name,
                self.label_column_name,
            ],
        )

    def _compute_metrics_values(
        self, scores: list[float], labels: np.ndarray, high_score_more_similar: bool
    ) -> dict[str, float]:
        """Compute the metrics for the given scores and labels.

        Args:
            scores: The similarity/dissimilarity scores for the pairs, specified as an array of shape (n_pairs, ).
            labels: The labels for the pairs, specified as an array of shape (n_pairs, ).
            high_score_more_similar: If true, then the higher the score, the more similar the pairs are.

        Returns:
            The metrics for the given scores and labels.
        """
        acc, acc_threshold = self._find_best_acc_and_threshold(
            scores, labels, high_score_more_similar
        )
        (
            f1,
            precision,
            recall,
            f1_threshold,
        ) = self._find_best_f1_and_threshold(scores, labels, high_score_more_similar)
        ap = average_precision_score(
            labels, np.array(scores) * (1 if high_score_more_similar else -1)
        )

        return dict(
            accuracy=float(acc),
            f1=float(f1),
            precision=float(precision),
            recall=float(recall),
            ap=float(ap),
        )

    def _find_best_acc_and_threshold(
        self, scores: np.ndarray, labels: np.ndarray, high_score_more_similar: bool
    ) -> tuple[float, float]:
        rows = list(zip(scores, labels))
        rows = sorted(rows, key=lambda x: x[0], reverse=high_score_more_similar)

        max_acc = 0
        best_threshold = -1
        positive_so_far = 0
        remaining_negatives = sum(np.array(labels) == 0)

        for i in range(len(rows) - 1):
            score, label = rows[i]
            if label == 1:
                positive_so_far += 1
            else:
                remaining_negatives -= 1

            acc = (positive_so_far + remaining_negatives) / len(labels)
            if acc > max_acc:
                max_acc = acc
                best_threshold = (rows[i][0] + rows[i + 1][0]) / 2
        return max_acc, best_threshold

    def _find_best_f1_and_threshold(
        self, scores, labels, high_score_more_similar: bool
    ) -> tuple[float, float, float, float]:
        scores = np.asarray(scores)
        labels = np.asarray(labels)

        rows = list(zip(scores, labels))

        rows = sorted(rows, key=lambda x: x[0], reverse=high_score_more_similar)

        best_f1 = best_precision = best_recall = 0
        threshold = 0
        nextract = 0
        ncorrect = 0
        total_num_duplicates = sum(labels)

        for i in range(len(rows) - 1):
            score, label = rows[i]
            nextract += 1

            if label == 1:
                ncorrect += 1

            if ncorrect > 0:
                precision = ncorrect / nextract
                recall = ncorrect / total_num_duplicates
                f1 = 2 * precision * recall / (precision + recall)
                if f1 > best_f1:
                    best_f1 = f1
                    best_precision = precision
                    best_recall = recall
                    threshold = (rows[i][0] + rows[i + 1][0]) / 2

        return best_f1, best_precision, best_recall, threshold
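
The threshold search above can be followed with a tiny worked example: pairs are sorted by score, the candidate threshold is swept through the sorted list, and the accuracy of "predict 1 above the cut, 0 below" is tracked at every step. The numbers below are illustrative.

import numpy as np

scores = np.array([0.9, 0.75, 0.25, 0.1])
labels = np.array([1, 1, 0, 0])

rows = sorted(zip(scores, labels), key=lambda x: x[0], reverse=True)
best_acc, best_threshold = 0.0, -1.0
positives_so_far, remaining_negatives = 0, int((labels == 0).sum())
for i in range(len(rows) - 1):
    _, label = rows[i]
    if label == 1:
        positives_so_far += 1
    else:
        remaining_negatives -= 1
    acc = (positives_so_far + remaining_negatives) / len(labels)
    if acc > best_acc:
        best_acc = acc
        best_threshold = (rows[i][0] + rows[i + 1][0]) / 2
print(best_acc, best_threshold)  # 1.0 and 0.5: a cut between 0.75 and 0.25 separates the classes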

mteb.abstasks.text.summarization.AbsTaskSummarization

Bases: AbsTask

Abstract class for summarization experiments.

Attributes:

- dataset (dict[HFSubset, DatasetDict] | None): HuggingFace dataset containing the data for the task. Should have the columns:
    - text: The original text to be summarized.
    - human_summaries: A list of human-written summaries for the text.
    - machine_summaries: A list of machine-generated summaries for the text.
    - relevance: A list of relevance scores (integers) corresponding to each machine summary, indicating how relevant each summary is to the original text.
- min_score (int): Minimum possible relevance score (inclusive).
- max_score (int): Maximum possible relevance score (inclusive).
- human_summaries_column_name (str): Name of the column containing human summaries.
- machine_summaries_column_name (str): Name of the column containing machine summaries.
- text_column_name (str): Name of the column containing the original text.
- relevancy_column_name (str): Name of the column containing relevance scores.
- abstask_prompt: Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt.

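A minimal sketch of the relevance-score normalization applied in _evaluate_subset below: raw integer scores in [min_score, max_score] are rescaled to the [0, 1] range before being compared with model-derived similarities. The bounds and scores here are illustrative.

import numpy as np

min_score, max_score = 1, 5          # set per task
relevance = [[1, 3, 5], [2, 4, 4]]   # one list of scores per sample

normalized = [
    (np.array(x) - min_score) / (max_score - min_score) for x in relevance
]
print(normalized)  # [array([0. , 0.5, 1. ]), array([0.25, 0.75, 0.75])]
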
Source code in mteb/abstasks/text/summarization.py
class AbsTaskSummarization(AbsTask):
    """Abstract class for summarization experiments.

    Attributes:
        dataset: HuggingFace dataset containing the data for the task. Should have columns:
            - text: The original text to be summarized.
            - human_summaries: A list of human-written summaries for the text.
            - machine_summaries: A list of machine-generated summaries for the text.
            - relevance: A list of relevance scores (integers) corresponding to each machine summary, indicating how relevant each summary is to the original text.
        min_score: Minimum possible relevance score (inclusive).
        max_score: Maximum possible relevance score (inclusive).
        human_summaries_column_name: Name of the column containing human summaries.
        machine_summaries_column_name: Name of the column containing machine summaries.
        text_column_name: Name of the column containing the original text.
        relevancy_column_name: Name of the column containing relevance scores.
        abstask_prompt: Prompt to use for the task for instruction model if not prompt is provided in TaskMetadata.prompt.
    """

    min_score: int
    max_score: int

    abstask_prompt = (
        "Given a news summary, retrieve other semantically similar summaries."
    )
    # SummEval has DeprecatedSummarizationEvaluator
    evaluator = SummarizationEvaluator
    text_column_name: str = "text"
    human_summaries_column_name: str = "human_summaries"
    machine_summaries_column_name: str = "machine_summaries"
    relevancy_column_name: str = "relevance"

    def _evaluate_subset(
        self,
        model: EncoderProtocol,
        data_split: Dataset,
        *,
        hf_split: str,
        hf_subset: str,
        encode_kwargs: dict[str, Any],
        prediction_folder: Path | None = None,
        **kwargs,
    ) -> SummarizationMetrics:
        normalized_scores = [
            (np.array(x) - self.min_score) / (self.max_score - self.min_score)
            for x in data_split[self.relevancy_column_name]
        ]
        evaluator = self.evaluator(
            machine_summaries=data_split[self.machine_summaries_column_name],
            human_summaries=data_split[self.human_summaries_column_name],
            texts=data_split[self.text_column_name],
            gold_scores=normalized_scores,
            task_metadata=self.metadata,
            hf_split=hf_split,
            hf_subset=hf_subset,
            **kwargs,
        )
        scores = evaluator(model, encode_kwargs=encode_kwargs)
        if prediction_folder:
            self._save_task_predictions(
                scores,
                model,
                prediction_folder,
                hf_subset=hf_subset,
                hf_split=hf_split,
            )
        return evaluator._calculate_metrics(scores)

    def _calculate_descriptive_statistics_from_split(
        self, split: str, hf_subset: str | None = None, compute_overall: bool = False
    ) -> SummarizationDescriptiveStatistics:
        if hf_subset:
            text = self.dataset[hf_subset][split][self.text_column_name]
            human_summaries = self.dataset[hf_subset][split][
                self.human_summaries_column_name
            ]
            machine_summaries = self.dataset[hf_subset][split][
                self.machine_summaries_column_name
            ]
            relevance = self.dataset[hf_subset][split][self.relevancy_column_name]
        elif compute_overall:
            text = []
            human_summaries = []
            machine_summaries = []
            relevance = []

            for hf_subset in self.metadata.eval_langs:
                text.extend(self.dataset[hf_subset][split][self.text_column_name])
                human_summaries.extend(
                    self.dataset[hf_subset][split][self.human_summaries_column_name]
                )
                machine_summaries.extend(
                    self.dataset[hf_subset][split][self.machine_summaries_column_name]
                )
                relevance.extend(
                    self.dataset[hf_subset][split][self.relevancy_column_name]
                )
        else:
            text = self.dataset[split][self.text_column_name]
            human_summaries = self.dataset[split][self.human_summaries_column_name]
            machine_summaries = self.dataset[split][self.machine_summaries_column_name]
            relevance = self.dataset[split][self.relevancy_column_name]

        all_human_summaries = []
        for s in human_summaries:
            all_human_summaries.extend(s)

        all_machine_summaries = []
        for s in machine_summaries:
            all_machine_summaries.extend(s)

        text_statistics = calculate_text_statistics(text)
        human_summaries_statistics = calculate_text_statistics(all_human_summaries)
        machine_summaries_statistics = calculate_text_statistics(all_machine_summaries)

        relevance = [item for sublist in relevance for item in sublist]

        return SummarizationDescriptiveStatistics(
            num_samples=len(text),
            number_of_characters=(
                text_statistics["total_text_length"]
                + human_summaries_statistics["total_text_length"]
                + machine_summaries_statistics["total_text_length"]
            ),
            text_statistics=text_statistics,
            human_summaries_statistics=human_summaries_statistics,
            machine_summaries_statistics=machine_summaries_statistics,
            score_statistics=calculate_score_statistics(relevance),
        )

mteb.abstasks.text.reranking.AbsTaskReranking

Bases: AbsTaskRetrieval

Reranking task class.

Deprecated

This class is deprecated and will be removed in future versions. Please use the updated retrieval tasks instead. You can add your task name to mteb.abstasks.text.reranking.OLD_FORMAT_RERANKING_TASKS to load it in the new format. You can reupload it using task.push_dataset_to_hub('your/repository') after loading the data. For data format and other information, see AbsTaskRetrieval.

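For reference, the deprecated tasks store one row per query in the old format, which transform_old_dataset_format (shown below) converts into a retrieval-style split with a corpus, queries, relevance judgements and a top-ranked candidate list per query. A toy row and its approximate converted form:

old_row = {
    "query": "what is the capital of France?",
    "positive": ["Paris is the capital of France."],
    "negative": ["Lyon is a city in France."],
}

# After _process_example, the first test row roughly becomes:
#   query_id         -> "test_query0"
#   doc_ids          -> ["apositive_test_query0_00000", "negative_test_query0_00000"]
#   relevance_scores -> [1, 0]
# The documents are added to the corpus, the relevance scores populate relevant_docs
# (qrels), and the doc ids are listed in top_ranked for the query.
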
Source code in mteb/abstasks/text/reranking.py
@deprecated(
    "This class is deprecated and will be removed in future versions. Please use the updated retrieval tasks instead. "
    "You can add your task name to mteb.abstasks.text.reranking.OLD_FORMAT_RERANKING_TASKS to load it in new format. "
    "You can reupload it using `task.push_dataset_to_hub('your/repository')` after loading the data."
)
class AbsTaskReranking(AbsTaskRetrieval):
    """Reranking task class.

    Warning: Deprecated
        This class is deprecated and will be removed in future versions. Please use the updated retrieval tasks instead.
        You can add your task name to mteb.abstasks.text.reranking.OLD_FORMAT_RERANKING_TASKS to load it in new format.
        You can reupload it using `task.push_dataset_to_hub('your/repository')` after loading the data.
        For dataformat and other information, see [AbsTaskRetrieval][mteb.abstasks.retrieval.AbsTaskRetrieval].
    """

    def load_data(self) -> None:
        """Load the dataset."""
        if self.data_loaded:
            return

        if self.metadata.name in OLD_FORMAT_RERANKING_TASKS:
            self.transform_old_dataset_format()
        else:
            # use AbsTaskRetrieval default to load the data
            return super().load_data()

    def _process_example(self, example: dict, split: str, query_idx: int) -> dict:
        """Process a single example from the dataset.

        Args:
            example: A single example from the dataset containing 'query', 'positive', and 'negative' fields.
            split: The dataset split (e.g., 'train', 'validation', 'test').
            query_idx: The index of the query in the dataset split.

        Returns:
            A dictionary containing the processed example with query_id, query text, document ids, document texts, and relevance scores.
        """
        query = example["query"]
        positive_docs = example["positive"]
        negative_docs = example["negative"]

        query_id = f"{split}_query{query_idx}"

        # Initialize the structures for this example
        example_data = {
            "query_id": query_id,
            "query": query,
            "doc_ids": [],
            "doc_texts": [],
            "relevance_scores": [],
        }

        for i, pos_doc in enumerate(positive_docs):
            # format i as a five digit number
            formatted_i = str(i).zfill(5)
            # have "a" in front so that positives are first, then negatives
            #   this shouldn't matter except for ties, and the previous reranking results
            #   had the positives first
            doc_id = f"apositive_{query_id}_{formatted_i}"
            example_data["doc_ids"].append(doc_id)
            example_data["doc_texts"].append(pos_doc)
            example_data["relevance_scores"].append(1)

        for i, neg_doc in enumerate(negative_docs):
            formatted_i = str(i).zfill(5)
            doc_id = f"negative_{query_id}_{formatted_i}"
            example_data["doc_ids"].append(doc_id)
            example_data["doc_texts"].append(neg_doc)
            example_data["relevance_scores"].append(0)

        return example_data

    def transform_old_dataset_format(self, given_dataset: Dataset | None = None):
        """Transform the old format to the new format using HF datasets mapping. This is a one-time transformation for datasets which are in the old format.

        Args:
            given_dataset (Dataset, optional): The dataset to transform. Defaults to None. This is helpful for some older datasets which are loaded with custom code, but need to be transformed still.
        """
        if self.metadata.name not in OLD_FORMAT_RERANKING_TASKS:
            return

        logging.info(
            f"Transforming old format to standard format for {self.metadata.name}"
        )

        given_dataset = copy(given_dataset)
        self.dataset = defaultdict(lambda: defaultdict(dict))

        hf_subsets = self.hf_subsets

        for hf_subset in hf_subsets:
            if given_dataset:
                cur_dataset = given_dataset
                if hf_subset in cur_dataset:
                    cur_dataset = cur_dataset[hf_subset]
            elif "name" in self.metadata.dataset:
                cur_dataset = datasets.load_dataset(**self.metadata.dataset)  # type: ignore
                assert hf_subset == "default", (
                    f"Only default subset is supported for {self.metadata.name} since `name` is given in the metadata."
                )
            else:
                cur_dataset = datasets.load_dataset(
                    **self.metadata.dataset, name=hf_subset
                )  # type: ignore

            for split in cur_dataset:
                corpus = []
                queries = []
                relevant_docs = defaultdict(dict)
                top_ranked = defaultdict(list)

                # Create an enumerated dataset to pass indices
                enumerated_dataset = Dataset.from_dict(
                    {
                        "index": range(len(cur_dataset[split])),
                        "query": cur_dataset[split]["query"],
                        "positive": cur_dataset[split]["positive"],
                        "negative": cur_dataset[split]["negative"],
                    }
                )

                # first, filter out the ones that have no positive or no negatives
                enumerated_dataset = enumerated_dataset.filter(
                    lambda example: len(example["positive"]) > 0
                    and len(example["negative"]) > 0
                )

                logger.info(
                    f"Filtered out {len(cur_dataset[split]) - len(enumerated_dataset)} examples with no positive or no negative examples. {len(enumerated_dataset)} examples remaining."
                )

                # Map the transformation function over the dataset
                processed_dataset = enumerated_dataset.map(
                    lambda example, idx: self._process_example(example, split, idx),
                    with_indices=True,
                    remove_columns=enumerated_dataset.column_names,
                )

                # Populate the data structures
                for item in processed_dataset:
                    query_id = item["query_id"]
                    queries.append({"id": query_id, "text": item["query"]})

                    # Add documents and relevance information
                    for doc_id, doc_text, relevance in zip(
                        item["doc_ids"], item["doc_texts"], item["relevance_scores"]
                    ):
                        corpus.append(
                            {
                                "title": "",
                                "text": doc_text,
                                "id": doc_id,
                            }
                        )
                        top_ranked[query_id].append(doc_id)
                        relevant_docs[query_id][doc_id] = relevance

                self.dataset[hf_subset][split] = RetrievalSplitData(
                    corpus=Dataset.from_list(corpus),
                    queries=Dataset.from_list(queries),
                    relevant_docs=relevant_docs,
                    top_ranked=top_ranked,
                )
        self.data_loaded = True

load_data()

Load the dataset.

Source code in mteb/abstasks/text/reranking.py
def load_data(self) -> None:
    """Load the dataset."""
    if self.data_loaded:
        return

    if self.metadata.name in OLD_FORMAT_RERANKING_TASKS:
        self.transform_old_dataset_format()
    else:
        # use AbsTaskRetrieval default to load the data
        return super().load_data()

transform_old_dataset_format(given_dataset=None)

Transform the old format to the new format using HF datasets mapping. This is a one-time transformation for datasets which are in the old format.

Parameters:

- given_dataset (Dataset, optional): The dataset to transform. Defaults to None. This is helpful for some older datasets which are loaded with custom code, but still need to be transformed.
Source code in mteb/abstasks/text/reranking.py
def transform_old_dataset_format(self, given_dataset: Dataset | None = None):
    """Transform the old format to the new format using HF datasets mapping. This is a one-time transformation for datasets which are in the old format.

    Args:
        given_dataset (Dataset, optional): The dataset to transform. Defaults to None. This is helpful for some older datasets which are loaded with custom code, but need to be transformed still.
    """
    if self.metadata.name not in OLD_FORMAT_RERANKING_TASKS:
        return

    logging.info(
        f"Transforming old format to standard format for {self.metadata.name}"
    )

    given_dataset = copy(given_dataset)
    self.dataset = defaultdict(lambda: defaultdict(dict))

    hf_subsets = self.hf_subsets

    for hf_subset in hf_subsets:
        if given_dataset:
            cur_dataset = given_dataset
            if hf_subset in cur_dataset:
                cur_dataset = cur_dataset[hf_subset]
        elif "name" in self.metadata.dataset:
            cur_dataset = datasets.load_dataset(**self.metadata.dataset)  # type: ignore
            assert hf_subset == "default", (
                f"Only default subset is supported for {self.metadata.name} since `name` is given in the metadata."
            )
        else:
            cur_dataset = datasets.load_dataset(
                **self.metadata.dataset, name=hf_subset
            )  # type: ignore

        for split in cur_dataset:
            corpus = []
            queries = []
            relevant_docs = defaultdict(dict)
            top_ranked = defaultdict(list)

            # Create an enumerated dataset to pass indices
            enumerated_dataset = Dataset.from_dict(
                {
                    "index": range(len(cur_dataset[split])),
                    "query": cur_dataset[split]["query"],
                    "positive": cur_dataset[split]["positive"],
                    "negative": cur_dataset[split]["negative"],
                }
            )

            # first, filter out the ones that have no positive or no negatives
            enumerated_dataset = enumerated_dataset.filter(
                lambda example: len(example["positive"]) > 0
                and len(example["negative"]) > 0
            )

            logger.info(
                f"Filtered out {len(cur_dataset[split]) - len(enumerated_dataset)} examples with no positive or no negative examples. {len(enumerated_dataset)} examples remaining."
            )

            # Map the transformation function over the dataset
            processed_dataset = enumerated_dataset.map(
                lambda example, idx: self._process_example(example, split, idx),
                with_indices=True,
                remove_columns=enumerated_dataset.column_names,
            )

            # Populate the data structures
            for item in processed_dataset:
                query_id = item["query_id"]
                queries.append({"id": query_id, "text": item["query"]})

                # Add documents and relevance information
                for doc_id, doc_text, relevance in zip(
                    item["doc_ids"], item["doc_texts"], item["relevance_scores"]
                ):
                    corpus.append(
                        {
                            "title": "",
                            "text": doc_text,
                            "id": doc_id,
                        }
                    )
                    top_ranked[query_id].append(doc_id)
                    relevant_docs[query_id][doc_id] = relevance

            self.dataset[hf_subset][split] = RetrievalSplitData(
                corpus=Dataset.from_list(corpus),
                queries=Dataset.from_list(queries),
                relevant_docs=relevant_docs,
                top_ranked=top_ranked,
            )
    self.data_loaded = True

Image Tasks

mteb.abstasks.image.image_text_pair_classification.AbsTaskImageTextPairClassification

Bases: AbsTask

Abstract class for Image Text Pair Classification tasks (Compositionality evaluation).

The similarity is computed between pairs and the results are ranked. Note that the number of images and the number of captions can be different.

Attributes:

- dataset (dict[HFSubset, DatasetDict] | None): A HuggingFace Dataset containing the data for the ImageTextPairClassification task. Should have the columns:
    - images: List of images.
    - captions: List of captions.
- images_column_names (str | Sequence[str]): Name of the column(s) containing the images.
- texts_column_names (str | Sequence[str]): Name of the column(s) containing the captions.
- abstask_prompt (str | None): Prompt to use for the task for instruction models if no prompt is provided in TaskMetadata.prompt.

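A minimal sketch of the matching criterion used in _compute_metrics below: for every sample, the image-to-text argmax and the text-to-image argmax over the sample's score matrix must both recover the identity pairing for the sample to count as fully correct. The score matrix here is illustrative.

import torch

# score[i, j] = similarity between image i and caption j for one sample
score = torch.tensor([[0.9, 0.2], [0.1, 0.8]])

image_closest_text = score.argmax(dim=1)  # tensor([0, 1])
text_closest_image = score.argmax(dim=0)  # tensor([0, 1])
text_correct = (image_closest_text == torch.arange(2)).all().item()
image_correct = (text_closest_image == torch.arange(2)).all().item()
print(text_correct and image_correct)  # True: both directions match the identity
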
Source code in mteb/abstasks/image/image_text_pair_classification.py
class AbsTaskImageTextPairClassification(AbsTask):
    """Abstract class for Image Text Pair Classification tasks (Compositionality evaluation).

    The similarity is computed between pairs and the results are ranked.
    Note that the number of images and the number of captions can be different.

    Attributes:
        dataset: A HuggingFace Dataset containing the data for the ImageTextPairClassification task. Should have columns:
            - images: List of images.
            - captions: List of captions.
        images_column_names: Name of the column(s) containing the images.
        texts_column_names: Name of the column(s) containing the captions.
        abstask_prompt: Prompt to use for the task for instruction model if not prompt is provided in TaskMetadata.prompt.
    """

    # it can be ["image_0", "image_1"]; ["text_0", "text_1"] for datasets like WinoGround
    images_column_names: str | Sequence[str] = "image"
    texts_column_names: str | Sequence[str] = "caption"

    def _calculate_descriptive_statistics_from_split(
        self, split: str, hf_subset: str | None = None, compute_overall: bool = False
    ) -> ImageTextPairClassificationDescriptiveStatistics:
        if compute_overall:
            dataset = concatenate_datasets(
                [
                    self.dataset[hf_subset][split]
                    for hf_subset in self.metadata.eval_langs
                ]
            )
        else:
            dataset = (
                self.dataset[split]
                if hf_subset is None
                else self.dataset[hf_subset][split]
            )
        num_samples = len(dataset)

        images = None
        texts = None

        if isinstance(self.images_column_names, str):
            images = list(dataset[self.images_column_names])
        elif isinstance(self.images_column_names, Sequence):
            images = [
                img
                for img_column in self.images_column_names
                for img in dataset[img_column]
            ]

        if isinstance(self.texts_column_names, str):
            texts = list(dataset[self.texts_column_names])
        elif isinstance(self.texts_column_names, Sequence):
            texts = [
                text
                for text_column in self.texts_column_names
                for text in dataset[text_column]
            ]

        return ImageTextPairClassificationDescriptiveStatistics(
            num_samples=num_samples,
            text_statistics=calculate_text_statistics(texts),
            image_statistics=calculate_image_statistics(images),
        )

    def _evaluate_subset(
        self,
        model: EncoderProtocol,
        data_split: Dataset,
        *,
        encode_kwargs: dict[str, Any],
        hf_split: str,
        hf_subset: str,
        prediction_folder: Path | None = None,
        **kwargs: Any,
    ) -> ImageTextPairClassificationMetrics:
        select_columns = []
        for columns in (self.images_column_names, self.texts_column_names):
            if isinstance(columns, str):
                select_columns.append(columns)
            else:
                select_columns.extend(columns)

        data_split = data_split.select_columns(select_columns)
        num_images_per_sample = (
            1
            if isinstance(self.images_column_names, str)
            else len(self.images_column_names)
        )
        num_texts_per_sample = (
            1
            if isinstance(self.texts_column_names, str)
            else len(self.texts_column_names)
        )
        evaluator = ImageTextPairClassificationEvaluator(
            data_split,
            images_column_names=self.images_column_names,
            texts_column_names=self.texts_column_names,
            task_metadata=self.metadata,
            num_texts_per_sample=num_texts_per_sample,
            num_images_per_sample=num_images_per_sample,
            hf_split=hf_split,
            hf_subset=hf_subset,
            **kwargs,
        )
        scores = evaluator(model, encode_kwargs=encode_kwargs)
        if prediction_folder:
            self._save_task_predictions(
                [score.tolist() for score in scores],
                model,
                prediction_folder,
                hf_subset=hf_subset,
                hf_split=hf_split,
            )

        return self._compute_metrics(
            scores,
            num_images_per_sample,
            num_texts_per_sample,
        )

    def _compute_metrics(
        self,
        scores: list[torch.Tensor],
        num_images_per_sample: int,
        num_texts_per_sample: int,
    ) -> ImageTextPairClassificationMetrics:
        image_score = []
        text_score = []
        all_correct_scores = []
        img_ground_truths = torch.arange(num_images_per_sample)
        caption_ground_truths = torch.arange(num_texts_per_sample)

        for score in scores:
            image_closest_text = score.argmax(dim=1)  # shape = (num_images_per_sample)
            text_closest_image = score.argmax(dim=0)  # shape = (num_texts_per_sample)
            pred_text_is_correct = (
                (image_closest_text == img_ground_truths).all().item()
            )
            pred_image_is_correct = (
                (text_closest_image == caption_ground_truths).all().item()
            )
            all_correct = pred_text_is_correct and pred_image_is_correct
            image_score.append(pred_image_is_correct)
            text_score.append(pred_text_is_correct)
            all_correct_scores.append(all_correct)

        return ImageTextPairClassificationMetrics(
            image_acc=torch.Tensor(image_score).float().mean().item(),
            text_acc=torch.Tensor(text_score).float().mean().item(),
            accuracy=torch.Tensor(all_correct_scores).float().mean().item(),
        )

    def _push_dataset_to_hub(self, repo_name: str) -> None:
        text_columns = (
            [self.texts_column_names]
            if isinstance(self.texts_column_names, str)
            else self.texts_column_names
        )
        image_columns = (
            [self.images_column_names]
            if isinstance(self.images_column_names, str)
            else self.images_column_names
        )

        self._upload_dataset_to_hub(
            repo_name,
            [*text_columns, *image_columns],
        )