# ****** WARNING: THIS FILE IS AUTOGENERATED ******
#
# This file is autogenerated. Do not edit it manually.
# To regenerate this file, use
#
# ```
# bash -x tools/generate_async_client.sh
# ```
#
# ****** WARNING: THIS FILE IS AUTOGENERATED ******
import importlib.metadata
import itertools
import json
import os
import shutil
from copy import deepcopy
from io import TextIOWrapper
from typing import Any, Generator, Iterable, Mapping, Optional, Sequence, Union, get_args
from uuid import uuid4
import numpy as np
import portalocker
from qdrant_client.common.client_warnings import show_warning, show_warning_once
from qdrant_client._pydantic_compat import to_dict
from qdrant_client.async_client_base import AsyncQdrantBase
from qdrant_client.conversions import common_types as types
from qdrant_client.http import models as rest_models
from qdrant_client.http.models.models import RecommendExample
from qdrant_client.local.local_collection import (
LocalCollection,
DEFAULT_VECTOR_NAME,
ignore_mentioned_ids_filter,
)
META_INFO_FILENAME = "meta.json"
[docs]class AsyncQdrantLocal(AsyncQdrantBase):
"""
Everything Qdrant server can do, but locally.
Use this implementation to run vector search without running a Qdrant server.
Everything that works with local Qdrant will work with server Qdrant as well.
Use for small-scale data, demos, and tests.
If you need more speed or size, use Qdrant server.
"""
LARGE_DATA_THRESHOLD = 20000
def __init__(self, location: str, force_disable_check_same_thread: bool = False) -> None:
"""
Initialize local Qdrant.
Args:
location: Where to store data. Can be a path to a directory or `:memory:` for in-memory storage.
force_disable_check_same_thread: Disable SQLite check_same_thread check. Use only if you know what you are doing.
"""
super().__init__()
self.force_disable_check_same_thread = force_disable_check_same_thread
self.location = location
self.persistent = location != ":memory:"
self.collections: dict[str, LocalCollection] = {}
self.aliases: dict[str, str] = {}
self._flock_file: Optional[TextIOWrapper] = None
self._load()
self._closed: bool = False
@property
def closed(self) -> bool:
return self._closed
[docs] async def close(self, **kwargs: Any) -> None:
self._closed = True
for collection in self.collections.values():
if collection is not None:
collection.close()
else:
show_warning(
message=f"Collection appears to be None before closing. The existing collections are: {list(self.collections.keys())}",
category=UserWarning,
stacklevel=4,
)
try:
if self._flock_file is not None and (not self._flock_file.closed):
portalocker.unlock(self._flock_file)
self._flock_file.close()
except TypeError:
pass
def _load(self) -> None:
if not self.persistent:
return
meta_path = os.path.join(self.location, META_INFO_FILENAME)
if not os.path.exists(meta_path):
os.makedirs(self.location, exist_ok=True)
with open(meta_path, "w") as f:
f.write(json.dumps({"collections": {}, "aliases": {}}))
else:
with open(meta_path, "r") as f:
meta = json.load(f)
for collection_name, config_json in meta["collections"].items():
config = rest_models.CreateCollection(**config_json)
collection_path = self._collection_path(collection_name)
collection = LocalCollection(
config,
collection_path,
force_disable_check_same_thread=self.force_disable_check_same_thread,
)
self.collections[collection_name] = collection
if len(collection.ids) > self.LARGE_DATA_THRESHOLD:
show_warning(
f"Local mode is not recommended for collections with more than {self.LARGE_DATA_THRESHOLD:,} points. Collection <{collection_name}> contains {len(collection.ids)} points. Consider using Qdrant in Docker or Qdrant Cloud for better performance with large datasets.",
category=UserWarning,
stacklevel=5,
)
self.aliases = meta["aliases"]
lock_file_path = os.path.join(self.location, ".lock")
if not os.path.exists(lock_file_path):
os.makedirs(self.location, exist_ok=True)
with open(lock_file_path, "w") as f:
f.write("tmp lock file")
self._flock_file = open(lock_file_path, "r+")
try:
portalocker.lock(
self._flock_file,
portalocker.LockFlags.EXCLUSIVE | portalocker.LockFlags.NON_BLOCKING,
)
except portalocker.exceptions.LockException:
raise RuntimeError(
f"Storage folder {self.location} is already accessed by another instance of Qdrant client. If you require concurrent access, use Qdrant server instead."
)
def _save(self) -> None:
if not self.persistent:
return
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
meta_path = os.path.join(self.location, META_INFO_FILENAME)
with open(meta_path, "w") as f:
f.write(
json.dumps(
{
"collections": {
collection_name: to_dict(collection.config)
for (collection_name, collection) in self.collections.items()
},
"aliases": self.aliases,
}
)
)
def _get_collection(self, collection_name: str) -> LocalCollection:
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
if collection_name in self.collections:
return self.collections[collection_name]
if collection_name in self.aliases:
return self.collections[self.aliases[collection_name]]
raise ValueError(f"Collection {collection_name} not found")
[docs] async def search_batch(
self, collection_name: str, requests: Sequence[types.SearchRequest], **kwargs: Any
) -> list[list[types.ScoredPoint]]:
collection = self._get_collection(collection_name)
return [
collection.search(
query_vector=request.vector,
query_filter=request.filter,
limit=request.limit,
offset=request.offset,
with_payload=request.with_payload,
with_vectors=request.with_vector,
score_threshold=request.score_threshold,
)
for request in requests
]
[docs] async def search(
self,
collection_name: str,
query_vector: Union[
types.NumpyArray,
Sequence[float],
tuple[str, list[float]],
types.NamedVector,
types.NamedSparseVector,
],
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: Optional[int] = None,
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
**kwargs: Any,
) -> list[types.ScoredPoint]:
collection = self._get_collection(collection_name)
return collection.search(
query_vector=query_vector,
query_filter=query_filter,
limit=limit,
offset=offset,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
)
[docs] async def search_matrix_offsets(
self,
collection_name: str,
query_filter: Optional[types.Filter] = None,
limit: int = 3,
sample: int = 10,
using: Optional[str] = None,
**kwargs: Any,
) -> types.SearchMatrixOffsetsResponse:
collection = self._get_collection(collection_name)
return collection.search_matrix_offsets(
query_filter=query_filter, limit=limit, sample=sample, using=using
)
[docs] async def search_matrix_pairs(
self,
collection_name: str,
query_filter: Optional[types.Filter] = None,
limit: int = 3,
sample: int = 10,
using: Optional[str] = None,
**kwargs: Any,
) -> types.SearchMatrixPairsResponse:
collection = self._get_collection(collection_name)
return collection.search_matrix_pairs(
query_filter=query_filter, limit=limit, sample=sample, using=using
)
[docs] async def search_groups(
self,
collection_name: str,
query_vector: Union[
types.NumpyArray, Sequence[float], tuple[str, list[float]], types.NamedVector
],
group_by: str,
query_filter: Optional[rest_models.Filter] = None,
search_params: Optional[rest_models.SearchParams] = None,
limit: int = 10,
group_size: int = 1,
with_payload: Union[bool, Sequence[str], rest_models.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
with_lookup: Optional[types.WithLookupInterface] = None,
**kwargs: Any,
) -> types.GroupsResult:
collection = self._get_collection(collection_name)
with_lookup_collection = None
if with_lookup is not None:
if isinstance(with_lookup, str):
with_lookup_collection = self._get_collection(with_lookup)
else:
with_lookup_collection = self._get_collection(with_lookup.collection)
return collection.search_groups(
query_vector=query_vector,
query_filter=query_filter,
limit=limit,
group_by=group_by,
group_size=group_size,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
with_lookup=with_lookup,
with_lookup_collection=with_lookup_collection,
)
def _resolve_query_input(
self,
collection_name: str,
query: Optional[types.Query],
using: Optional[str],
lookup_from: Optional[types.LookupLocation],
) -> tuple[types.Query, set[types.PointId]]:
"""
Resolves any possible ids into vectors and returns a new query object, along with a set of the mentioned
point ids that should be filtered when searching.
"""
lookup_collection_name = lookup_from.collection if lookup_from else collection_name
collection = self._get_collection(lookup_collection_name)
search_in_vector_name = using if using is not None else DEFAULT_VECTOR_NAME
vector_name = (
lookup_from.vector
if lookup_from is not None and lookup_from.vector is not None
else search_in_vector_name
)
sparse = vector_name in collection.sparse_vectors
multi = vector_name in collection.multivectors
if sparse:
collection_vectors = collection.sparse_vectors
elif multi:
collection_vectors = collection.multivectors
else:
collection_vectors = collection.vectors
mentioned_ids: set[types.PointId] = set()
def input_into_vector(vector_input: types.VectorInput) -> types.VectorInput:
if isinstance(vector_input, get_args(types.PointId)):
point_id = vector_input
if point_id not in collection.ids:
raise ValueError(f"Point {point_id} is not found in the collection")
idx = collection.ids[point_id]
if vector_name in collection_vectors:
vec = collection_vectors[vector_name][idx]
else:
raise ValueError(f"Vector {vector_name} not found")
if isinstance(vec, np.ndarray):
vec = vec.tolist()
if collection_name == lookup_collection_name:
mentioned_ids.add(point_id)
return vec
else:
return vector_input
query = deepcopy(query)
if isinstance(query, rest_models.NearestQuery):
query.nearest = input_into_vector(query.nearest)
elif isinstance(query, rest_models.RecommendQuery):
if query.recommend.negative is not None:
query.recommend.negative = [
input_into_vector(vector_input) for vector_input in query.recommend.negative
]
if query.recommend.positive is not None:
query.recommend.positive = [
input_into_vector(vector_input) for vector_input in query.recommend.positive
]
elif isinstance(query, rest_models.DiscoverQuery):
query.discover.target = input_into_vector(query.discover.target)
pairs = (
query.discover.context
if isinstance(query.discover.context, list)
else [query.discover.context]
)
query.discover.context = [
rest_models.ContextPair(
positive=input_into_vector(pair.positive),
negative=input_into_vector(pair.negative),
)
for pair in pairs
]
elif isinstance(query, rest_models.ContextQuery):
pairs = query.context if isinstance(query.context, list) else [query.context]
query.context = [
rest_models.ContextPair(
positive=input_into_vector(pair.positive),
negative=input_into_vector(pair.negative),
)
for pair in pairs
]
elif isinstance(query, rest_models.OrderByQuery):
pass
elif isinstance(query, rest_models.FusionQuery):
pass
return (query, mentioned_ids)
def _resolve_prefetches_input(
self,
prefetch: Optional[Union[Sequence[types.Prefetch], types.Prefetch]],
collection_name: str,
) -> list[types.Prefetch]:
if prefetch is None:
return []
if isinstance(prefetch, list) and len(prefetch) == 0:
return []
prefetches = []
if isinstance(prefetch, types.Prefetch):
prefetches = [prefetch]
prefetches.extend(
prefetch.prefetch if isinstance(prefetch.prefetch, list) else [prefetch.prefetch]
)
elif isinstance(prefetch, Sequence):
prefetches = list(prefetch)
return [
self._resolve_prefetch_input(prefetch, collection_name)
for prefetch in prefetches
if prefetch is not None
]
def _resolve_prefetch_input(
self, prefetch: types.Prefetch, collection_name: str
) -> types.Prefetch:
if prefetch.query is None:
return prefetch
prefetch = deepcopy(prefetch)
(query, mentioned_ids) = self._resolve_query_input(
collection_name, prefetch.query, prefetch.using, prefetch.lookup_from
)
prefetch.query = query
prefetch.filter = ignore_mentioned_ids_filter(prefetch.filter, list(mentioned_ids))
prefetch.prefetch = self._resolve_prefetches_input(prefetch.prefetch, collection_name)
return prefetch
[docs] async def query_points(
self,
collection_name: str,
query: Optional[types.Query] = None,
using: Optional[str] = None,
prefetch: Union[types.Prefetch, list[types.Prefetch], None] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: Optional[int] = None,
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
lookup_from: Optional[types.LookupLocation] = None,
**kwargs: Any,
) -> types.QueryResponse:
collection = self._get_collection(collection_name)
if query is not None:
(query, mentioned_ids) = self._resolve_query_input(
collection_name, query, using, lookup_from
)
query_filter = ignore_mentioned_ids_filter(query_filter, list(mentioned_ids))
prefetch = self._resolve_prefetches_input(prefetch, collection_name)
return collection.query_points(
query=query,
prefetch=prefetch,
query_filter=query_filter,
using=using,
score_threshold=score_threshold,
limit=limit,
offset=offset or 0,
with_payload=with_payload,
with_vectors=with_vectors,
)
[docs] async def query_batch_points(
self, collection_name: str, requests: Sequence[types.QueryRequest], **kwargs: Any
) -> list[types.QueryResponse]:
collection = self._get_collection(collection_name)
return [
collection.query_points(
query=request.query,
prefetch=request.prefetch,
query_filter=request.filter,
limit=request.limit,
offset=request.offset or 0,
with_payload=request.with_payload,
with_vectors=request.with_vector,
score_threshold=request.score_threshold,
using=request.using,
lookup_from_collection=self._get_collection(request.lookup_from.collection)
if request.lookup_from
else None,
lookup_from_vector_name=request.lookup_from.vector
if request.lookup_from
else None,
)
for request in requests
]
[docs] async def query_points_groups(
self,
collection_name: str,
group_by: str,
query: Union[
types.PointId,
list[float],
list[list[float]],
types.SparseVector,
types.Query,
types.NumpyArray,
types.Document,
types.Image,
types.InferenceObject,
None,
] = None,
using: Optional[str] = None,
prefetch: Union[types.Prefetch, list[types.Prefetch], None] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
group_size: int = 3,
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
with_lookup: Optional[types.WithLookupInterface] = None,
lookup_from: Optional[types.LookupLocation] = None,
**kwargs: Any,
) -> types.GroupsResult:
collection = self._get_collection(collection_name)
if query is not None:
(query, mentioned_ids) = self._resolve_query_input(
collection_name, query, using, lookup_from
)
query_filter = ignore_mentioned_ids_filter(query_filter, list(mentioned_ids))
with_lookup_collection = None
if with_lookup is not None:
if isinstance(with_lookup, str):
with_lookup_collection = self._get_collection(with_lookup)
else:
with_lookup_collection = self._get_collection(with_lookup.collection)
return collection.query_groups(
query=query,
query_filter=query_filter,
using=using,
prefetch=prefetch,
limit=limit,
group_by=group_by,
group_size=group_size,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
with_lookup=with_lookup,
with_lookup_collection=with_lookup_collection,
)
[docs] async def recommend_batch(
self, collection_name: str, requests: Sequence[types.RecommendRequest], **kwargs: Any
) -> list[list[types.ScoredPoint]]:
collection = self._get_collection(collection_name)
return [
collection.recommend(
positive=request.positive,
negative=request.negative,
query_filter=request.filter,
limit=request.limit,
offset=request.offset,
with_payload=request.with_payload,
with_vectors=request.with_vector,
score_threshold=request.score_threshold,
using=request.using,
lookup_from_collection=self._get_collection(request.lookup_from.collection)
if request.lookup_from
else None,
lookup_from_vector_name=request.lookup_from.vector
if request.lookup_from
else None,
strategy=request.strategy,
)
for request in requests
]
[docs] async def recommend(
self,
collection_name: str,
positive: Optional[Sequence[RecommendExample]] = None,
negative: Optional[Sequence[RecommendExample]] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: int = 0,
with_payload: Union[bool, list[str], types.PayloadSelector] = True,
with_vectors: Union[bool, list[str]] = False,
score_threshold: Optional[float] = None,
using: Optional[str] = None,
lookup_from: Optional[types.LookupLocation] = None,
strategy: Optional[types.RecommendStrategy] = None,
**kwargs: Any,
) -> list[types.ScoredPoint]:
collection = self._get_collection(collection_name)
return collection.recommend(
positive=positive,
negative=negative,
query_filter=query_filter,
limit=limit,
offset=offset,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
using=using,
lookup_from_collection=self._get_collection(lookup_from.collection)
if lookup_from
else None,
lookup_from_vector_name=lookup_from.vector if lookup_from else None,
strategy=strategy,
)
[docs] async def recommend_groups(
self,
collection_name: str,
group_by: str,
positive: Optional[Sequence[Union[types.PointId, list[float]]]] = None,
negative: Optional[Sequence[Union[types.PointId, list[float]]]] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
group_size: int = 1,
score_threshold: Optional[float] = None,
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
using: Optional[str] = None,
lookup_from: Optional[types.LookupLocation] = None,
with_lookup: Optional[types.WithLookupInterface] = None,
strategy: Optional[types.RecommendStrategy] = None,
**kwargs: Any,
) -> types.GroupsResult:
collection = self._get_collection(collection_name)
with_lookup_collection = None
if with_lookup is not None:
if isinstance(with_lookup, str):
with_lookup_collection = self._get_collection(with_lookup)
else:
with_lookup_collection = self._get_collection(with_lookup.collection)
return collection.recommend_groups(
positive=positive,
negative=negative,
group_by=group_by,
group_size=group_size,
query_filter=query_filter,
limit=limit,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
using=using,
lookup_from_collection=self._get_collection(lookup_from.collection)
if lookup_from
else None,
lookup_from_vector_name=lookup_from.vector if lookup_from else None,
with_lookup=with_lookup,
with_lookup_collection=with_lookup_collection,
strategy=strategy,
)
[docs] async def discover(
self,
collection_name: str,
target: Optional[types.TargetVector] = None,
context: Optional[Sequence[types.ContextExamplePair]] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: int = 0,
with_payload: Union[bool, list[str], types.PayloadSelector] = True,
with_vectors: Union[bool, list[str]] = False,
using: Optional[str] = None,
lookup_from: Optional[types.LookupLocation] = None,
consistency: Optional[types.ReadConsistency] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> list[types.ScoredPoint]:
collection = self._get_collection(collection_name)
return collection.discover(
target=target,
context=context,
query_filter=query_filter,
limit=limit,
offset=offset,
with_payload=with_payload,
with_vectors=with_vectors,
using=using,
lookup_from_collection=self._get_collection(lookup_from.collection)
if lookup_from
else None,
lookup_from_vector_name=lookup_from.vector if lookup_from else None,
)
[docs] async def discover_batch(
self, collection_name: str, requests: Sequence[types.DiscoverRequest], **kwargs: Any
) -> list[list[types.ScoredPoint]]:
collection = self._get_collection(collection_name)
return [
collection.discover(
target=request.target,
context=request.context,
query_filter=request.filter,
limit=request.limit,
offset=request.offset,
with_payload=request.with_payload,
with_vectors=request.with_vector,
using=request.using,
lookup_from_collection=self._get_collection(request.lookup_from.collection)
if request.lookup_from
else None,
lookup_from_vector_name=request.lookup_from.vector
if request.lookup_from
else None,
)
for request in requests
]
[docs] async def count(
self,
collection_name: str,
count_filter: Optional[types.Filter] = None,
exact: bool = True,
**kwargs: Any,
) -> types.CountResult:
collection = self._get_collection(collection_name)
return collection.count(count_filter=count_filter)
[docs] async def facet(
self,
collection_name: str,
key: str,
facet_filter: Optional[types.Filter] = None,
limit: int = 10,
exact: bool = False,
**kwargs: Any,
) -> types.FacetResponse:
collection = self._get_collection(collection_name)
return collection.facet(key=key, facet_filter=facet_filter, limit=limit)
[docs] async def upsert(
self, collection_name: str, points: types.Points, **kwargs: Any
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.upsert(points)
return self._default_update_result()
[docs] async def update_vectors(
self, collection_name: str, points: Sequence[types.PointVectors], **kwargs: Any
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.update_vectors(points)
return self._default_update_result()
[docs] async def delete_vectors(
self,
collection_name: str,
vectors: Sequence[str],
points: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.delete_vectors(vectors, points)
return self._default_update_result()
[docs] async def retrieve(
self,
collection_name: str,
ids: Sequence[types.PointId],
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
**kwargs: Any,
) -> list[types.Record]:
collection = self._get_collection(collection_name)
return collection.retrieve(ids, with_payload, with_vectors)
@classmethod
def _default_update_result(cls, operation_id: int = 0) -> types.UpdateResult:
return types.UpdateResult(
operation_id=operation_id, status=rest_models.UpdateStatus.COMPLETED
)
[docs] async def delete(
self, collection_name: str, points_selector: types.PointsSelector, **kwargs: Any
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.delete(points_selector)
return self._default_update_result()
[docs] async def set_payload(
self,
collection_name: str,
payload: types.Payload,
points: types.PointsSelector,
key: Optional[str] = None,
**kwargs: Any,
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.set_payload(payload=payload, selector=points, key=key)
return self._default_update_result()
[docs] async def overwrite_payload(
self,
collection_name: str,
payload: types.Payload,
points: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.overwrite_payload(payload=payload, selector=points)
return self._default_update_result()
[docs] async def delete_payload(
self,
collection_name: str,
keys: Sequence[str],
points: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.delete_payload(keys=keys, selector=points)
return self._default_update_result()
[docs] async def clear_payload(
self, collection_name: str, points_selector: types.PointsSelector, **kwargs: Any
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.clear_payload(selector=points_selector)
return self._default_update_result()
[docs] async def batch_update_points(
self,
collection_name: str,
update_operations: Sequence[types.UpdateOperation],
**kwargs: Any,
) -> list[types.UpdateResult]:
collection = self._get_collection(collection_name)
collection.batch_update_points(update_operations)
return [self._default_update_result()] * len(update_operations)
[docs] async def update_collection_aliases(
self, change_aliases_operations: Sequence[types.AliasOperations], **kwargs: Any
) -> bool:
for operation in change_aliases_operations:
if isinstance(operation, rest_models.CreateAliasOperation):
self._get_collection(operation.create_alias.collection_name)
self.aliases[operation.create_alias.alias_name] = (
operation.create_alias.collection_name
)
elif isinstance(operation, rest_models.DeleteAliasOperation):
self.aliases.pop(operation.delete_alias.alias_name, None)
elif isinstance(operation, rest_models.RenameAliasOperation):
new_name = operation.rename_alias.new_alias_name
old_name = operation.rename_alias.old_alias_name
self.aliases[new_name] = self.aliases.pop(old_name)
else:
raise ValueError(f"Unknown operation: {operation}")
self._save()
return True
[docs] async def get_collection_aliases(
self, collection_name: str, **kwargs: Any
) -> types.CollectionsAliasesResponse:
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
return types.CollectionsAliasesResponse(
aliases=[
rest_models.AliasDescription(alias_name=alias_name, collection_name=name)
for (alias_name, name) in self.aliases.items()
if name == collection_name
]
)
[docs] async def get_aliases(self, **kwargs: Any) -> types.CollectionsAliasesResponse:
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
return types.CollectionsAliasesResponse(
aliases=[
rest_models.AliasDescription(alias_name=alias_name, collection_name=name)
for (alias_name, name) in self.aliases.items()
]
)
[docs] async def get_collections(self, **kwargs: Any) -> types.CollectionsResponse:
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
return types.CollectionsResponse(
collections=[
rest_models.CollectionDescription(name=name)
for (name, _) in self.collections.items()
]
)
[docs] async def get_collection(self, collection_name: str, **kwargs: Any) -> types.CollectionInfo:
collection = self._get_collection(collection_name)
return collection.info()
[docs] async def collection_exists(self, collection_name: str, **kwargs: Any) -> bool:
try:
self._get_collection(collection_name)
return True
except ValueError:
return False
[docs] async def update_collection(
self,
collection_name: str,
sparse_vectors_config: Optional[Mapping[str, types.SparseVectorParams]] = None,
**kwargs: Any,
) -> bool:
_collection = self._get_collection(collection_name)
if sparse_vectors_config is not None:
for vector_name, vector_params in sparse_vectors_config.items():
_collection.update_sparse_vectors_config(vector_name, vector_params)
return True
return False
def _collection_path(self, collection_name: str) -> Optional[str]:
if self.persistent:
return os.path.join(self.location, "collection", collection_name)
else:
return None
[docs] async def delete_collection(self, collection_name: str, **kwargs: Any) -> bool:
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
_collection = self.collections.pop(collection_name, None)
del _collection
self.aliases = {
alias_name: name
for (alias_name, name) in self.aliases.items()
if name != collection_name
}
collection_path = self._collection_path(collection_name)
if collection_path is not None:
shutil.rmtree(collection_path, ignore_errors=True)
self._save()
return True
[docs] async def create_collection(
self,
collection_name: str,
vectors_config: Union[types.VectorParams, Mapping[str, types.VectorParams]],
init_from: Optional[types.InitFrom] = None,
sparse_vectors_config: Optional[Mapping[str, types.SparseVectorParams]] = None,
**kwargs: Any,
) -> bool:
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
src_collection = None
from_collection_name = None
if init_from is not None:
from_collection_name = (
init_from if isinstance(init_from, str) else init_from.collection
)
src_collection = self._get_collection(from_collection_name)
if collection_name in self.collections:
raise ValueError(f"Collection {collection_name} already exists")
collection_path = self._collection_path(collection_name)
if collection_path is not None:
os.makedirs(collection_path, exist_ok=True)
collection = LocalCollection(
rest_models.CreateCollection(
vectors=vectors_config, sparse_vectors=sparse_vectors_config
),
location=collection_path,
force_disable_check_same_thread=self.force_disable_check_same_thread,
)
self.collections[collection_name] = collection
if src_collection and from_collection_name:
batch_size = 100
(records, next_offset) = await self.scroll(
from_collection_name, limit=2, with_vectors=True
)
self.upload_records(collection_name, records)
while next_offset is not None:
(records, next_offset) = await self.scroll(
from_collection_name, offset=next_offset, limit=batch_size, with_vectors=True
)
self.upload_records(collection_name, records)
self._save()
return True
[docs] async def recreate_collection(
self,
collection_name: str,
vectors_config: Union[types.VectorParams, Mapping[str, types.VectorParams]],
init_from: Optional[types.InitFrom] = None,
sparse_vectors_config: Optional[Mapping[str, types.SparseVectorParams]] = None,
**kwargs: Any,
) -> bool:
await self.delete_collection(collection_name)
return await self.create_collection(
collection_name, vectors_config, init_from, sparse_vectors_config
)
[docs] def upload_points(
self, collection_name: str, points: Iterable[types.PointStruct], **kwargs: Any
) -> None:
self._upload_points(collection_name, points)
[docs] def upload_records(
self, collection_name: str, records: Iterable[types.Record], **kwargs: Any
) -> None:
self._upload_points(collection_name, records)
def _upload_points(
self, collection_name: str, points: Iterable[Union[types.PointStruct, types.Record]]
) -> None:
collection = self._get_collection(collection_name)
collection.upsert(
[
rest_models.PointStruct(
id=point.id, vector=point.vector or {}, payload=point.payload or {}
)
for point in points
]
)
[docs] def upload_collection(
self,
collection_name: str,
vectors: Union[
dict[str, types.NumpyArray], types.NumpyArray, Iterable[types.VectorStruct]
],
payload: Optional[Iterable[dict[Any, Any]]] = None,
ids: Optional[Iterable[types.PointId]] = None,
**kwargs: Any,
) -> None:
def uuid_generator() -> Generator[str, None, None]:
while True:
yield str(uuid4())
collection = self._get_collection(collection_name)
if isinstance(vectors, dict) and any(
(isinstance(v, np.ndarray) for v in vectors.values())
):
assert (
len(set([arr.shape[0] for arr in vectors.values()])) == 1
), "Each named vector should have the same number of vectors"
num_vectors = next(iter(vectors.values())).shape[0]
vectors = [
{name: vectors[name][i].tolist() for name in vectors.keys()}
for i in range(num_vectors)
]
collection.upsert(
[
rest_models.PointStruct(
id=point_id,
vector=(vector.tolist() if isinstance(vector, np.ndarray) else vector) or {},
payload=payload or {},
)
for (point_id, vector, payload) in zip(
ids or uuid_generator(), iter(vectors), payload or itertools.cycle([{}])
)
]
)
[docs] async def create_payload_index(
self,
collection_name: str,
field_name: str,
field_schema: Optional[types.PayloadSchemaType] = None,
field_type: Optional[types.PayloadSchemaType] = None,
**kwargs: Any,
) -> types.UpdateResult:
show_warning_once(
message="Payload indexes have no effect in the local Qdrant. Please use server Qdrant if you need payload indexes.",
category=UserWarning,
idx="create-local-payload-indexes",
stacklevel=5,
)
return self._default_update_result()
[docs] async def delete_payload_index(
self, collection_name: str, field_name: str, **kwargs: Any
) -> types.UpdateResult:
show_warning_once(
message="Payload indexes have no effect in the local Qdrant. Please use server Qdrant if you need payload indexes.",
category=UserWarning,
idx="delete-local-payload-indexes",
stacklevel=5,
)
return self._default_update_result()
[docs] async def list_snapshots(
self, collection_name: str, **kwargs: Any
) -> list[types.SnapshotDescription]:
return []
[docs] async def create_snapshot(
self, collection_name: str, **kwargs: Any
) -> Optional[types.SnapshotDescription]:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
)
[docs] async def delete_snapshot(
self, collection_name: str, snapshot_name: str, **kwargs: Any
) -> bool:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
)
[docs] async def list_full_snapshots(self, **kwargs: Any) -> list[types.SnapshotDescription]:
return []
[docs] async def create_full_snapshot(self, **kwargs: Any) -> types.SnapshotDescription:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
)
[docs] async def delete_full_snapshot(self, snapshot_name: str, **kwargs: Any) -> bool:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
)
[docs] async def recover_snapshot(self, collection_name: str, location: str, **kwargs: Any) -> bool:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
)
[docs] async def list_shard_snapshots(
self, collection_name: str, shard_id: int, **kwargs: Any
) -> list[types.SnapshotDescription]:
return []
[docs] async def create_shard_snapshot(
self, collection_name: str, shard_id: int, **kwargs: Any
) -> Optional[types.SnapshotDescription]:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
)
[docs] async def delete_shard_snapshot(
self, collection_name: str, shard_id: int, snapshot_name: str, **kwargs: Any
) -> bool:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
)
[docs] async def recover_shard_snapshot(
self, collection_name: str, shard_id: int, location: str, **kwargs: Any
) -> bool:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
)
[docs] async def lock_storage(self, reason: str, **kwargs: Any) -> types.LocksOption:
raise NotImplementedError(
"Locks are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
)
[docs] async def unlock_storage(self, **kwargs: Any) -> types.LocksOption:
raise NotImplementedError(
"Locks are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
)
[docs] async def get_locks(self, **kwargs: Any) -> types.LocksOption:
return types.LocksOption(error_message=None, write=False)
[docs] async def create_shard_key(
self,
collection_name: str,
shard_key: types.ShardKey,
shards_number: Optional[int] = None,
replication_factor: Optional[int] = None,
placement: Optional[list[int]] = None,
**kwargs: Any,
) -> bool:
raise NotImplementedError(
"Sharding is not supported in the local Qdrant. Please use server Qdrant if you need sharding."
)
[docs] async def delete_shard_key(
self, collection_name: str, shard_key: types.ShardKey, **kwargs: Any
) -> bool:
raise NotImplementedError(
"Sharding is not supported in the local Qdrant. Please use server Qdrant if you need sharding."
)
[docs] async def info(self) -> types.VersionInfo:
version = importlib.metadata.version("qdrant-client")
return rest_models.VersionInfo(
title="qdrant - vector search engine", version=version, commit=None
)