# flake8: noqa E501
from typing import IO, TYPE_CHECKING, Any, Dict, Set, TypeVar, Union
from pydantic import BaseModel
from pydantic.main import BaseModel
from pydantic.version import VERSION as PYDANTIC_VERSION
from qdrant_client.http.models import *
from qdrant_client.http.models import models as m
PYDANTIC_V2 = PYDANTIC_VERSION.startswith("2.")
Model = TypeVar("Model", bound="BaseModel")
SetIntStr = Set[Union[int, str]]
DictIntStrAny = Dict[Union[int, str], Any]
file = None
[docs]def to_json(model: BaseModel, *args: Any, **kwargs: Any) -> str:
if PYDANTIC_V2:
return model.model_dump_json(*args, **kwargs)
else:
return model.json(*args, **kwargs)
[docs]def jsonable_encoder(
obj: Any,
include: Union[SetIntStr, DictIntStrAny] = None,
exclude=None,
by_alias: bool = True,
skip_defaults: bool = None,
exclude_unset: bool = True,
exclude_none: bool = True,
):
if hasattr(obj, "json") or hasattr(obj, "model_dump_json"):
return to_json(
obj,
include=include,
exclude=exclude,
by_alias=by_alias,
exclude_unset=bool(exclude_unset or skip_defaults),
exclude_none=exclude_none,
)
return obj
if TYPE_CHECKING:
from qdrant_client.http.api_client import ApiClient
class _CollectionsApi:
def __init__(self, api_client: "Union[ApiClient, AsyncApiClient]"):
self.api_client = api_client
def _build_for_collection_cluster_info(
self,
collection_name: str,
):
"""
Get cluster information for a collection
"""
path_params = {
"collection_name": str(collection_name),
}
headers = {}
return self.api_client.request(
type_=m.InlineResponse2008,
method="GET",
url="/collections/{collection_name}/cluster",
headers=headers if headers else None,
path_params=path_params,
)
def _build_for_collection_exists(
self,
collection_name: str,
):
"""
Returns \"true\" if the given collection name exists, and \"false\" otherwise
"""
path_params = {
"collection_name": str(collection_name),
}
headers = {}
return self.api_client.request(
type_=m.InlineResponse2007,
method="GET",
url="/collections/{collection_name}/exists",
headers=headers if headers else None,
path_params=path_params,
)
def _build_for_create_collection(
self,
collection_name: str,
timeout: int = None,
create_collection: m.CreateCollection = None,
):
"""
Create new collection with given parameters
"""
path_params = {
"collection_name": str(collection_name),
}
query_params = {}
if timeout is not None:
query_params["timeout"] = str(timeout)
headers = {}
body = jsonable_encoder(create_collection)
if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
return self.api_client.request(
type_=m.InlineResponse200,
method="PUT",
url="/collections/{collection_name}",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
content=body,
)
def _build_for_create_field_index(
self,
collection_name: str,
wait: bool = None,
ordering: WriteOrdering = None,
create_field_index: m.CreateFieldIndex = None,
):
"""
Create index for field in collection
"""
path_params = {
"collection_name": str(collection_name),
}
query_params = {}
if wait is not None:
query_params["wait"] = str(wait).lower()
if ordering is not None:
query_params["ordering"] = str(ordering)
headers = {}
body = jsonable_encoder(create_field_index)
if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
return self.api_client.request(
type_=m.InlineResponse2006,
method="PUT",
url="/collections/{collection_name}/index",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
content=body,
)
def _build_for_create_shard_key(
self,
collection_name: str,
timeout: int = None,
create_sharding_key: m.CreateShardingKey = None,
):
path_params = {
"collection_name": str(collection_name),
}
query_params = {}
if timeout is not None:
query_params["timeout"] = str(timeout)
headers = {}
body = jsonable_encoder(create_sharding_key)
if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
return self.api_client.request(
type_=m.InlineResponse200,
method="PUT",
url="/collections/{collection_name}/shards",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
content=body,
)
def _build_for_create_shard_snapshot(
self,
collection_name: str,
shard_id: int,
wait: bool = None,
):
"""
Create new snapshot of a shard for a collection
"""
path_params = {
"collection_name": str(collection_name),
"shard_id": str(shard_id),
}
query_params = {}
if wait is not None:
query_params["wait"] = str(wait).lower()
headers = {}
return self.api_client.request(
type_=m.InlineResponse20011,
method="POST",
url="/collections/{collection_name}/shards/{shard_id}/snapshots",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
)
def _build_for_create_snapshot(
self,
collection_name: str,
wait: bool = None,
):
"""
Create new snapshot for a collection
"""
path_params = {
"collection_name": str(collection_name),
}
query_params = {}
if wait is not None:
query_params["wait"] = str(wait).lower()
headers = {}
return self.api_client.request(
type_=m.InlineResponse20011,
method="POST",
url="/collections/{collection_name}/snapshots",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
)
def _build_for_delete_collection(
self,
collection_name: str,
timeout: int = None,
):
"""
Drop collection and all associated data
"""
path_params = {
"collection_name": str(collection_name),
}
query_params = {}
if timeout is not None:
query_params["timeout"] = str(timeout)
headers = {}
return self.api_client.request(
type_=m.InlineResponse200,
method="DELETE",
url="/collections/{collection_name}",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
)
def _build_for_delete_field_index(
self,
collection_name: str,
field_name: str,
wait: bool = None,
ordering: WriteOrdering = None,
):
"""
Delete field index for collection
"""
path_params = {
"collection_name": str(collection_name),
"field_name": str(field_name),
}
query_params = {}
if wait is not None:
query_params["wait"] = str(wait).lower()
if ordering is not None:
query_params["ordering"] = str(ordering)
headers = {}
return self.api_client.request(
type_=m.InlineResponse2006,
method="DELETE",
url="/collections/{collection_name}/index/{field_name}",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
)
def _build_for_delete_shard_key(
self,
collection_name: str,
timeout: int = None,
drop_sharding_key: m.DropShardingKey = None,
):
path_params = {
"collection_name": str(collection_name),
}
query_params = {}
if timeout is not None:
query_params["timeout"] = str(timeout)
headers = {}
body = jsonable_encoder(drop_sharding_key)
if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
return self.api_client.request(
type_=m.InlineResponse200,
method="POST",
url="/collections/{collection_name}/shards/delete",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
content=body,
)
def _build_for_delete_shard_snapshot(
self,
collection_name: str,
shard_id: int,
snapshot_name: str,
wait: bool = None,
):
"""
Delete snapshot of a shard for a collection
"""
path_params = {
"collection_name": str(collection_name),
"shard_id": str(shard_id),
"snapshot_name": str(snapshot_name),
}
query_params = {}
if wait is not None:
query_params["wait"] = str(wait).lower()
headers = {}
return self.api_client.request(
type_=m.InlineResponse200,
method="DELETE",
url="/collections/{collection_name}/shards/{shard_id}/snapshots/{snapshot_name}",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
)
def _build_for_delete_snapshot(
self,
collection_name: str,
snapshot_name: str,
wait: bool = None,
):
"""
Delete snapshot for a collection
"""
path_params = {
"collection_name": str(collection_name),
"snapshot_name": str(snapshot_name),
}
query_params = {}
if wait is not None:
query_params["wait"] = str(wait).lower()
headers = {}
return self.api_client.request(
type_=m.InlineResponse200,
method="DELETE",
url="/collections/{collection_name}/snapshots/{snapshot_name}",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
)
def _build_for_get_collection(
self,
collection_name: str,
):
"""
Get detailed information about specified existing collection
"""
path_params = {
"collection_name": str(collection_name),
}
headers = {}
return self.api_client.request(
type_=m.InlineResponse2005,
method="GET",
url="/collections/{collection_name}",
headers=headers if headers else None,
path_params=path_params,
)
def _build_for_get_collection_aliases(
self,
collection_name: str,
):
"""
Get list of all aliases for a collection
"""
path_params = {
"collection_name": str(collection_name),
}
headers = {}
return self.api_client.request(
type_=m.InlineResponse2009,
method="GET",
url="/collections/{collection_name}/aliases",
headers=headers if headers else None,
path_params=path_params,
)
def _build_for_get_collections(
self,
):
"""
Get list name of all existing collections
"""
headers = {}
return self.api_client.request(
type_=m.InlineResponse2004,
method="GET",
url="/collections",
headers=headers if headers else None,
)
def _build_for_get_collections_aliases(
self,
):
"""
Get list of all existing collections aliases
"""
headers = {}
return self.api_client.request(
type_=m.InlineResponse2009,
method="GET",
url="/aliases",
headers=headers if headers else None,
)
def _build_for_get_shard_snapshot(
self,
collection_name: str,
shard_id: int,
snapshot_name: str,
):
"""
Download specified snapshot of a shard from a collection as a file
"""
path_params = {
"collection_name": str(collection_name),
"shard_id": str(shard_id),
"snapshot_name": str(snapshot_name),
}
headers = {}
return self.api_client.request(
type_=file,
method="GET",
url="/collections/{collection_name}/shards/{shard_id}/snapshots/{snapshot_name}",
headers=headers if headers else None,
path_params=path_params,
)
def _build_for_get_snapshot(
self,
collection_name: str,
snapshot_name: str,
):
"""
Download specified snapshot from a collection as a file
"""
path_params = {
"collection_name": str(collection_name),
"snapshot_name": str(snapshot_name),
}
headers = {}
return self.api_client.request(
type_=file,
method="GET",
url="/collections/{collection_name}/snapshots/{snapshot_name}",
headers=headers if headers else None,
path_params=path_params,
)
def _build_for_list_shard_snapshots(
self,
collection_name: str,
shard_id: int,
):
"""
Get list of snapshots for a shard of a collection
"""
path_params = {
"collection_name": str(collection_name),
"shard_id": str(shard_id),
}
headers = {}
return self.api_client.request(
type_=m.InlineResponse20010,
method="GET",
url="/collections/{collection_name}/shards/{shard_id}/snapshots",
headers=headers if headers else None,
path_params=path_params,
)
def _build_for_list_snapshots(
self,
collection_name: str,
):
"""
Get list of snapshots for a collection
"""
path_params = {
"collection_name": str(collection_name),
}
headers = {}
return self.api_client.request(
type_=m.InlineResponse20010,
method="GET",
url="/collections/{collection_name}/snapshots",
headers=headers if headers else None,
path_params=path_params,
)
def _build_for_recover_from_snapshot(
self,
collection_name: str,
wait: bool = None,
snapshot_recover: m.SnapshotRecover = None,
):
"""
Recover local collection data from a snapshot. This will overwrite any data, stored on this node, for the collection. If collection does not exist - it will be created.
"""
path_params = {
"collection_name": str(collection_name),
}
query_params = {}
if wait is not None:
query_params["wait"] = str(wait).lower()
headers = {}
body = jsonable_encoder(snapshot_recover)
if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
return self.api_client.request(
type_=m.InlineResponse200,
method="PUT",
url="/collections/{collection_name}/snapshots/recover",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
content=body,
)
def _build_for_recover_from_uploaded_snapshot(
self,
collection_name: str,
wait: bool = None,
priority: SnapshotPriority = None,
checksum: str = None,
snapshot: IO[Any] = None,
):
"""
Recover local collection data from an uploaded snapshot. This will overwrite any data, stored on this node, for the collection. If collection does not exist - it will be created.
"""
path_params = {
"collection_name": str(collection_name),
}
query_params = {}
if wait is not None:
query_params["wait"] = str(wait).lower()
if priority is not None:
query_params["priority"] = str(priority)
if checksum is not None:
query_params["checksum"] = str(checksum)
headers = {}
files: Dict[str, IO[Any]] = {} # noqa F841
data: Dict[str, Any] = {} # noqa F841
if snapshot is not None:
files["snapshot"] = snapshot
return self.api_client.request(
type_=m.InlineResponse200,
method="POST",
url="/collections/{collection_name}/snapshots/upload",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
data=data,
files=files,
)
def _build_for_recover_shard_from_snapshot(
self,
collection_name: str,
shard_id: int,
wait: bool = None,
shard_snapshot_recover: m.ShardSnapshotRecover = None,
):
"""
Recover shard of a local collection data from a snapshot. This will overwrite any data, stored in this shard, for the collection.
"""
path_params = {
"collection_name": str(collection_name),
"shard_id": str(shard_id),
}
query_params = {}
if wait is not None:
query_params["wait"] = str(wait).lower()
headers = {}
body = jsonable_encoder(shard_snapshot_recover)
if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
return self.api_client.request(
type_=m.InlineResponse200,
method="PUT",
url="/collections/{collection_name}/shards/{shard_id}/snapshots/recover",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
content=body,
)
def _build_for_recover_shard_from_uploaded_snapshot(
self,
collection_name: str,
shard_id: int,
wait: bool = None,
priority: SnapshotPriority = None,
checksum: str = None,
snapshot: IO[Any] = None,
):
"""
Recover shard of a local collection from an uploaded snapshot. This will overwrite any data, stored on this node, for the collection shard.
"""
path_params = {
"collection_name": str(collection_name),
"shard_id": str(shard_id),
}
query_params = {}
if wait is not None:
query_params["wait"] = str(wait).lower()
if priority is not None:
query_params["priority"] = str(priority)
if checksum is not None:
query_params["checksum"] = str(checksum)
headers = {}
files: Dict[str, IO[Any]] = {} # noqa F841
data: Dict[str, Any] = {} # noqa F841
if snapshot is not None:
files["snapshot"] = snapshot
return self.api_client.request(
type_=m.InlineResponse200,
method="POST",
url="/collections/{collection_name}/shards/{shard_id}/snapshots/upload",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
data=data,
files=files,
)
def _build_for_update_aliases(
self,
timeout: int = None,
change_aliases_operation: m.ChangeAliasesOperation = None,
):
query_params = {}
if timeout is not None:
query_params["timeout"] = str(timeout)
headers = {}
body = jsonable_encoder(change_aliases_operation)
if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
return self.api_client.request(
type_=m.InlineResponse200,
method="POST",
url="/collections/aliases",
headers=headers if headers else None,
params=query_params,
content=body,
)
def _build_for_update_collection(
self,
collection_name: str,
timeout: int = None,
update_collection: m.UpdateCollection = None,
):
"""
Update parameters of the existing collection
"""
path_params = {
"collection_name": str(collection_name),
}
query_params = {}
if timeout is not None:
query_params["timeout"] = str(timeout)
headers = {}
body = jsonable_encoder(update_collection)
if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
return self.api_client.request(
type_=m.InlineResponse200,
method="PATCH",
url="/collections/{collection_name}",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
content=body,
)
def _build_for_update_collection_cluster(
self,
collection_name: str,
timeout: int = None,
cluster_operations: m.ClusterOperations = None,
):
path_params = {
"collection_name": str(collection_name),
}
query_params = {}
if timeout is not None:
query_params["timeout"] = str(timeout)
headers = {}
body = jsonable_encoder(cluster_operations)
if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
return self.api_client.request(
type_=m.InlineResponse200,
method="POST",
url="/collections/{collection_name}/cluster",
headers=headers if headers else None,
path_params=path_params,
params=query_params,
content=body,
)
[docs]class AsyncCollectionsApi(_CollectionsApi):
[docs] async def collection_cluster_info(
self,
collection_name: str,
) -> m.InlineResponse2008:
"""
Get cluster information for a collection
"""
return await self._build_for_collection_cluster_info(
collection_name=collection_name,
)
[docs] async def collection_exists(
self,
collection_name: str,
) -> m.InlineResponse2007:
"""
Returns \"true\" if the given collection name exists, and \"false\" otherwise
"""
return await self._build_for_collection_exists(
collection_name=collection_name,
)
[docs] async def create_collection(
self,
collection_name: str,
timeout: int = None,
create_collection: m.CreateCollection = None,
) -> m.InlineResponse200:
"""
Create new collection with given parameters
"""
return await self._build_for_create_collection(
collection_name=collection_name,
timeout=timeout,
create_collection=create_collection,
)
[docs] async def create_field_index(
self,
collection_name: str,
wait: bool = None,
ordering: WriteOrdering = None,
create_field_index: m.CreateFieldIndex = None,
) -> m.InlineResponse2006:
"""
Create index for field in collection
"""
return await self._build_for_create_field_index(
collection_name=collection_name,
wait=wait,
ordering=ordering,
create_field_index=create_field_index,
)
[docs] async def create_shard_key(
self,
collection_name: str,
timeout: int = None,
create_sharding_key: m.CreateShardingKey = None,
) -> m.InlineResponse200:
return await self._build_for_create_shard_key(
collection_name=collection_name,
timeout=timeout,
create_sharding_key=create_sharding_key,
)
[docs] async def create_shard_snapshot(
self,
collection_name: str,
shard_id: int,
wait: bool = None,
) -> m.InlineResponse20011:
"""
Create new snapshot of a shard for a collection
"""
return await self._build_for_create_shard_snapshot(
collection_name=collection_name,
shard_id=shard_id,
wait=wait,
)
[docs] async def create_snapshot(
self,
collection_name: str,
wait: bool = None,
) -> m.InlineResponse20011:
"""
Create new snapshot for a collection
"""
return await self._build_for_create_snapshot(
collection_name=collection_name,
wait=wait,
)
[docs] async def delete_collection(
self,
collection_name: str,
timeout: int = None,
) -> m.InlineResponse200:
"""
Drop collection and all associated data
"""
return await self._build_for_delete_collection(
collection_name=collection_name,
timeout=timeout,
)
[docs] async def delete_field_index(
self,
collection_name: str,
field_name: str,
wait: bool = None,
ordering: WriteOrdering = None,
) -> m.InlineResponse2006:
"""
Delete field index for collection
"""
return await self._build_for_delete_field_index(
collection_name=collection_name,
field_name=field_name,
wait=wait,
ordering=ordering,
)
[docs] async def delete_shard_key(
self,
collection_name: str,
timeout: int = None,
drop_sharding_key: m.DropShardingKey = None,
) -> m.InlineResponse200:
return await self._build_for_delete_shard_key(
collection_name=collection_name,
timeout=timeout,
drop_sharding_key=drop_sharding_key,
)
[docs] async def delete_shard_snapshot(
self,
collection_name: str,
shard_id: int,
snapshot_name: str,
wait: bool = None,
) -> m.InlineResponse200:
"""
Delete snapshot of a shard for a collection
"""
return await self._build_for_delete_shard_snapshot(
collection_name=collection_name,
shard_id=shard_id,
snapshot_name=snapshot_name,
wait=wait,
)
[docs] async def delete_snapshot(
self,
collection_name: str,
snapshot_name: str,
wait: bool = None,
) -> m.InlineResponse200:
"""
Delete snapshot for a collection
"""
return await self._build_for_delete_snapshot(
collection_name=collection_name,
snapshot_name=snapshot_name,
wait=wait,
)
[docs] async def get_collection(
self,
collection_name: str,
) -> m.InlineResponse2005:
"""
Get detailed information about specified existing collection
"""
return await self._build_for_get_collection(
collection_name=collection_name,
)
[docs] async def get_collection_aliases(
self,
collection_name: str,
) -> m.InlineResponse2009:
"""
Get list of all aliases for a collection
"""
return await self._build_for_get_collection_aliases(
collection_name=collection_name,
)
[docs] async def get_collections(
self,
) -> m.InlineResponse2004:
"""
Get list name of all existing collections
"""
return await self._build_for_get_collections()
[docs] async def get_collections_aliases(
self,
) -> m.InlineResponse2009:
"""
Get list of all existing collections aliases
"""
return await self._build_for_get_collections_aliases()
[docs] async def get_shard_snapshot(
self,
collection_name: str,
shard_id: int,
snapshot_name: str,
) -> file:
"""
Download specified snapshot of a shard from a collection as a file
"""
return await self._build_for_get_shard_snapshot(
collection_name=collection_name,
shard_id=shard_id,
snapshot_name=snapshot_name,
)
[docs] async def get_snapshot(
self,
collection_name: str,
snapshot_name: str,
) -> file:
"""
Download specified snapshot from a collection as a file
"""
return await self._build_for_get_snapshot(
collection_name=collection_name,
snapshot_name=snapshot_name,
)
[docs] async def list_shard_snapshots(
self,
collection_name: str,
shard_id: int,
) -> m.InlineResponse20010:
"""
Get list of snapshots for a shard of a collection
"""
return await self._build_for_list_shard_snapshots(
collection_name=collection_name,
shard_id=shard_id,
)
[docs] async def list_snapshots(
self,
collection_name: str,
) -> m.InlineResponse20010:
"""
Get list of snapshots for a collection
"""
return await self._build_for_list_snapshots(
collection_name=collection_name,
)
[docs] async def recover_from_snapshot(
self,
collection_name: str,
wait: bool = None,
snapshot_recover: m.SnapshotRecover = None,
) -> m.InlineResponse200:
"""
Recover local collection data from a snapshot. This will overwrite any data, stored on this node, for the collection. If collection does not exist - it will be created.
"""
return await self._build_for_recover_from_snapshot(
collection_name=collection_name,
wait=wait,
snapshot_recover=snapshot_recover,
)
[docs] async def recover_from_uploaded_snapshot(
self,
collection_name: str,
wait: bool = None,
priority: SnapshotPriority = None,
checksum: str = None,
snapshot: IO[Any] = None,
) -> m.InlineResponse200:
"""
Recover local collection data from an uploaded snapshot. This will overwrite any data, stored on this node, for the collection. If collection does not exist - it will be created.
"""
return await self._build_for_recover_from_uploaded_snapshot(
collection_name=collection_name,
wait=wait,
priority=priority,
checksum=checksum,
snapshot=snapshot,
)
[docs] async def recover_shard_from_snapshot(
self,
collection_name: str,
shard_id: int,
wait: bool = None,
shard_snapshot_recover: m.ShardSnapshotRecover = None,
) -> m.InlineResponse200:
"""
Recover shard of a local collection data from a snapshot. This will overwrite any data, stored in this shard, for the collection.
"""
return await self._build_for_recover_shard_from_snapshot(
collection_name=collection_name,
shard_id=shard_id,
wait=wait,
shard_snapshot_recover=shard_snapshot_recover,
)
[docs] async def recover_shard_from_uploaded_snapshot(
self,
collection_name: str,
shard_id: int,
wait: bool = None,
priority: SnapshotPriority = None,
checksum: str = None,
snapshot: IO[Any] = None,
) -> m.InlineResponse200:
"""
Recover shard of a local collection from an uploaded snapshot. This will overwrite any data, stored on this node, for the collection shard.
"""
return await self._build_for_recover_shard_from_uploaded_snapshot(
collection_name=collection_name,
shard_id=shard_id,
wait=wait,
priority=priority,
checksum=checksum,
snapshot=snapshot,
)
[docs] async def update_aliases(
self,
timeout: int = None,
change_aliases_operation: m.ChangeAliasesOperation = None,
) -> m.InlineResponse200:
return await self._build_for_update_aliases(
timeout=timeout,
change_aliases_operation=change_aliases_operation,
)
[docs] async def update_collection(
self,
collection_name: str,
timeout: int = None,
update_collection: m.UpdateCollection = None,
) -> m.InlineResponse200:
"""
Update parameters of the existing collection
"""
return await self._build_for_update_collection(
collection_name=collection_name,
timeout=timeout,
update_collection=update_collection,
)
[docs] async def update_collection_cluster(
self,
collection_name: str,
timeout: int = None,
cluster_operations: m.ClusterOperations = None,
) -> m.InlineResponse200:
return await self._build_for_update_collection_cluster(
collection_name=collection_name,
timeout=timeout,
cluster_operations=cluster_operations,
)
[docs]class SyncCollectionsApi(_CollectionsApi):
[docs] def collection_cluster_info(
self,
collection_name: str,
) -> m.InlineResponse2008:
"""
Get cluster information for a collection
"""
return self._build_for_collection_cluster_info(
collection_name=collection_name,
)
[docs] def collection_exists(
self,
collection_name: str,
) -> m.InlineResponse2007:
"""
Returns \"true\" if the given collection name exists, and \"false\" otherwise
"""
return self._build_for_collection_exists(
collection_name=collection_name,
)
[docs] def create_collection(
self,
collection_name: str,
timeout: int = None,
create_collection: m.CreateCollection = None,
) -> m.InlineResponse200:
"""
Create new collection with given parameters
"""
return self._build_for_create_collection(
collection_name=collection_name,
timeout=timeout,
create_collection=create_collection,
)
[docs] def create_field_index(
self,
collection_name: str,
wait: bool = None,
ordering: WriteOrdering = None,
create_field_index: m.CreateFieldIndex = None,
) -> m.InlineResponse2006:
"""
Create index for field in collection
"""
return self._build_for_create_field_index(
collection_name=collection_name,
wait=wait,
ordering=ordering,
create_field_index=create_field_index,
)
[docs] def create_shard_key(
self,
collection_name: str,
timeout: int = None,
create_sharding_key: m.CreateShardingKey = None,
) -> m.InlineResponse200:
return self._build_for_create_shard_key(
collection_name=collection_name,
timeout=timeout,
create_sharding_key=create_sharding_key,
)
[docs] def create_shard_snapshot(
self,
collection_name: str,
shard_id: int,
wait: bool = None,
) -> m.InlineResponse20011:
"""
Create new snapshot of a shard for a collection
"""
return self._build_for_create_shard_snapshot(
collection_name=collection_name,
shard_id=shard_id,
wait=wait,
)
[docs] def create_snapshot(
self,
collection_name: str,
wait: bool = None,
) -> m.InlineResponse20011:
"""
Create new snapshot for a collection
"""
return self._build_for_create_snapshot(
collection_name=collection_name,
wait=wait,
)
[docs] def delete_collection(
self,
collection_name: str,
timeout: int = None,
) -> m.InlineResponse200:
"""
Drop collection and all associated data
"""
return self._build_for_delete_collection(
collection_name=collection_name,
timeout=timeout,
)
[docs] def delete_field_index(
self,
collection_name: str,
field_name: str,
wait: bool = None,
ordering: WriteOrdering = None,
) -> m.InlineResponse2006:
"""
Delete field index for collection
"""
return self._build_for_delete_field_index(
collection_name=collection_name,
field_name=field_name,
wait=wait,
ordering=ordering,
)
[docs] def delete_shard_key(
self,
collection_name: str,
timeout: int = None,
drop_sharding_key: m.DropShardingKey = None,
) -> m.InlineResponse200:
return self._build_for_delete_shard_key(
collection_name=collection_name,
timeout=timeout,
drop_sharding_key=drop_sharding_key,
)
[docs] def delete_shard_snapshot(
self,
collection_name: str,
shard_id: int,
snapshot_name: str,
wait: bool = None,
) -> m.InlineResponse200:
"""
Delete snapshot of a shard for a collection
"""
return self._build_for_delete_shard_snapshot(
collection_name=collection_name,
shard_id=shard_id,
snapshot_name=snapshot_name,
wait=wait,
)
[docs] def delete_snapshot(
self,
collection_name: str,
snapshot_name: str,
wait: bool = None,
) -> m.InlineResponse200:
"""
Delete snapshot for a collection
"""
return self._build_for_delete_snapshot(
collection_name=collection_name,
snapshot_name=snapshot_name,
wait=wait,
)
[docs] def get_collection(
self,
collection_name: str,
) -> m.InlineResponse2005:
"""
Get detailed information about specified existing collection
"""
return self._build_for_get_collection(
collection_name=collection_name,
)
[docs] def get_collection_aliases(
self,
collection_name: str,
) -> m.InlineResponse2009:
"""
Get list of all aliases for a collection
"""
return self._build_for_get_collection_aliases(
collection_name=collection_name,
)
[docs] def get_collections(
self,
) -> m.InlineResponse2004:
"""
Get list name of all existing collections
"""
return self._build_for_get_collections()
[docs] def get_collections_aliases(
self,
) -> m.InlineResponse2009:
"""
Get list of all existing collections aliases
"""
return self._build_for_get_collections_aliases()
[docs] def get_shard_snapshot(
self,
collection_name: str,
shard_id: int,
snapshot_name: str,
) -> file:
"""
Download specified snapshot of a shard from a collection as a file
"""
return self._build_for_get_shard_snapshot(
collection_name=collection_name,
shard_id=shard_id,
snapshot_name=snapshot_name,
)
[docs] def get_snapshot(
self,
collection_name: str,
snapshot_name: str,
) -> file:
"""
Download specified snapshot from a collection as a file
"""
return self._build_for_get_snapshot(
collection_name=collection_name,
snapshot_name=snapshot_name,
)
[docs] def list_shard_snapshots(
self,
collection_name: str,
shard_id: int,
) -> m.InlineResponse20010:
"""
Get list of snapshots for a shard of a collection
"""
return self._build_for_list_shard_snapshots(
collection_name=collection_name,
shard_id=shard_id,
)
[docs] def list_snapshots(
self,
collection_name: str,
) -> m.InlineResponse20010:
"""
Get list of snapshots for a collection
"""
return self._build_for_list_snapshots(
collection_name=collection_name,
)
[docs] def recover_from_snapshot(
self,
collection_name: str,
wait: bool = None,
snapshot_recover: m.SnapshotRecover = None,
) -> m.InlineResponse200:
"""
Recover local collection data from a snapshot. This will overwrite any data, stored on this node, for the collection. If collection does not exist - it will be created.
"""
return self._build_for_recover_from_snapshot(
collection_name=collection_name,
wait=wait,
snapshot_recover=snapshot_recover,
)
[docs] def recover_from_uploaded_snapshot(
self,
collection_name: str,
wait: bool = None,
priority: SnapshotPriority = None,
checksum: str = None,
snapshot: IO[Any] = None,
) -> m.InlineResponse200:
"""
Recover local collection data from an uploaded snapshot. This will overwrite any data, stored on this node, for the collection. If collection does not exist - it will be created.
"""
return self._build_for_recover_from_uploaded_snapshot(
collection_name=collection_name,
wait=wait,
priority=priority,
checksum=checksum,
snapshot=snapshot,
)
[docs] def recover_shard_from_snapshot(
self,
collection_name: str,
shard_id: int,
wait: bool = None,
shard_snapshot_recover: m.ShardSnapshotRecover = None,
) -> m.InlineResponse200:
"""
Recover shard of a local collection data from a snapshot. This will overwrite any data, stored in this shard, for the collection.
"""
return self._build_for_recover_shard_from_snapshot(
collection_name=collection_name,
shard_id=shard_id,
wait=wait,
shard_snapshot_recover=shard_snapshot_recover,
)
[docs] def recover_shard_from_uploaded_snapshot(
self,
collection_name: str,
shard_id: int,
wait: bool = None,
priority: SnapshotPriority = None,
checksum: str = None,
snapshot: IO[Any] = None,
) -> m.InlineResponse200:
"""
Recover shard of a local collection from an uploaded snapshot. This will overwrite any data, stored on this node, for the collection shard.
"""
return self._build_for_recover_shard_from_uploaded_snapshot(
collection_name=collection_name,
shard_id=shard_id,
wait=wait,
priority=priority,
checksum=checksum,
snapshot=snapshot,
)
[docs] def update_aliases(
self,
timeout: int = None,
change_aliases_operation: m.ChangeAliasesOperation = None,
) -> m.InlineResponse200:
return self._build_for_update_aliases(
timeout=timeout,
change_aliases_operation=change_aliases_operation,
)
[docs] def update_collection(
self,
collection_name: str,
timeout: int = None,
update_collection: m.UpdateCollection = None,
) -> m.InlineResponse200:
"""
Update parameters of the existing collection
"""
return self._build_for_update_collection(
collection_name=collection_name,
timeout=timeout,
update_collection=update_collection,
)
[docs] def update_collection_cluster(
self,
collection_name: str,
timeout: int = None,
cluster_operations: m.ClusterOperations = None,
) -> m.InlineResponse200:
return self._build_for_update_collection_cluster(
collection_name=collection_name,
timeout=timeout,
cluster_operations=cluster_operations,
)