import importlib.metadata
import itertools
import json
import logging
import os
import shutil
from copy import deepcopy
from io import TextIOWrapper
from typing import (
Any,
Dict,
Generator,
Iterable,
List,
Mapping,
Optional,
Sequence,
    Set,
    Tuple,
    Union,
    get_args,
)
from uuid import uuid4
import numpy as np
import portalocker
from qdrant_client._pydantic_compat import to_dict
from qdrant_client.client_base import QdrantBase
from qdrant_client.conversions import common_types as types
from qdrant_client.http import models as rest_models
from qdrant_client.http.models.models import RecommendExample
from qdrant_client.local.local_collection import (
LocalCollection,
DEFAULT_VECTOR_NAME,
ignore_mentioned_ids_filter,
)
META_INFO_FILENAME = "meta.json"
class QdrantLocal(QdrantBase):
"""
Everything Qdrant server can do, but locally.
Use this implementation to run vector search without running a Qdrant server.
Everything that works with local Qdrant will work with server Qdrant as well.
    Use it for small-scale data, demos, and tests.
    If you need more speed or larger scale, use the Qdrant server.
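
    Example (a minimal sketch; this class is normally reached through
    ``QdrantClient(":memory:")`` or ``QdrantClient(path=...)`` rather than
    instantiated directly, and the collection name here is illustrative)::

        client = QdrantLocal(":memory:")
        client.create_collection(
            "demo",
            vectors_config=rest_models.VectorParams(
                size=4, distance=rest_models.Distance.COSINE
            ),
        )
        client.upsert(
            "demo",
            [
                rest_models.PointStruct(
                    id=1, vector=[0.1, 0.2, 0.3, 0.4], payload={"tag": "a"}
                )
            ],
        )
        hits = client.search("demo", query_vector=[0.1, 0.2, 0.3, 0.4], limit=3)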
"""
def __init__(self, location: str, force_disable_check_same_thread: bool = False) -> None:
"""
Initialize local Qdrant.
Args:
location: Where to store data. Can be a path to a directory or `:memory:` for in-memory storage.
force_disable_check_same_thread: Disable SQLite check_same_thread check. Use only if you know what you are doing.
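
        Example::

            QdrantLocal(":memory:")        # volatile, in-process storage
            QdrantLocal("./local_qdrant")  # persisted to disk under this directory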
"""
super().__init__()
self.force_disable_check_same_thread = force_disable_check_same_thread
self.location = location
self.persistent = location != ":memory:"
self.collections: Dict[str, LocalCollection] = {}
self.aliases: Dict[str, str] = {}
self._flock_file: Optional[TextIOWrapper] = None
self._load()
self._closed: bool = False
@property
def closed(self) -> bool:
return self._closed
    def close(self, **kwargs: Any) -> None:
self._closed = True
for collection in self.collections.values():
if collection is not None:
collection.close()
else:
logging.warning(
f"Collection appears to be None before closing. The existing collections are: "
f"{list(self.collections.keys())}"
)
try:
if self._flock_file is not None and not self._flock_file.closed:
portalocker.unlock(self._flock_file)
self._flock_file.close()
        except TypeError:
            # The portalocker module may already have been garbage collected
            # before this QdrantLocal instance during interpreter shutdown.
            pass
def _load(self) -> None:
if not self.persistent:
return
meta_path = os.path.join(self.location, META_INFO_FILENAME)
if not os.path.exists(meta_path):
os.makedirs(self.location, exist_ok=True)
with open(meta_path, "w") as f:
f.write(json.dumps({"collections": {}, "aliases": {}}))
else:
with open(meta_path, "r") as f:
meta = json.load(f)
for collection_name, config_json in meta["collections"].items():
config = rest_models.CreateCollection(**config_json)
collection_path = self._collection_path(collection_name)
self.collections[collection_name] = LocalCollection(
config,
collection_path,
force_disable_check_same_thread=self.force_disable_check_same_thread,
)
self.aliases = meta["aliases"]
lock_file_path = os.path.join(self.location, ".lock")
if not os.path.exists(lock_file_path):
os.makedirs(self.location, exist_ok=True)
with open(lock_file_path, "w") as f:
f.write("tmp lock file")
self._flock_file = open(lock_file_path, "r+")
try:
portalocker.lock(
self._flock_file,
portalocker.LockFlags.EXCLUSIVE | portalocker.LockFlags.NON_BLOCKING,
)
except portalocker.exceptions.LockException:
raise RuntimeError(
f"Storage folder {self.location} is already accessed by another instance of Qdrant client."
f" If you require concurrent access, use Qdrant server instead."
)
def _save(self) -> None:
if not self.persistent:
return
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
meta_path = os.path.join(self.location, META_INFO_FILENAME)
with open(meta_path, "w") as f:
f.write(
json.dumps(
{
"collections": {
collection_name: to_dict(collection.config)
for collection_name, collection in self.collections.items()
},
"aliases": self.aliases,
}
)
)
def _get_collection(self, collection_name: str) -> LocalCollection:
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
if collection_name in self.collections:
return self.collections[collection_name]
if collection_name in self.aliases:
return self.collections[self.aliases[collection_name]]
raise ValueError(f"Collection {collection_name} not found")
    def search_batch(
self,
collection_name: str,
requests: Sequence[types.SearchRequest],
**kwargs: Any,
) -> List[List[types.ScoredPoint]]:
collection = self._get_collection(collection_name)
return [
collection.search(
query_vector=request.vector,
query_filter=request.filter,
limit=request.limit,
offset=request.offset,
with_payload=request.with_payload,
with_vectors=request.with_vector,
score_threshold=request.score_threshold,
)
for request in requests
]
    def search(
self,
collection_name: str,
query_vector: Union[
types.NumpyArray,
Sequence[float],
Tuple[str, List[float]],
types.NamedVector,
types.NamedSparseVector,
],
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: Optional[int] = None,
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
**kwargs: Any,
) -> List[types.ScoredPoint]:
collection = self._get_collection(collection_name)
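        # `search_params` is accepted for interface compatibility but ignored:
        # local mode always performs exact (non-approximate) search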
return collection.search(
query_vector=query_vector,
query_filter=query_filter,
limit=limit,
offset=offset,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
)
    def search_matrix_offsets(
self,
collection_name: str,
query_filter: Optional[types.Filter] = None,
limit: int = 3,
sample: int = 10,
using: Optional[str] = None,
**kwargs: Any,
) -> types.SearchMatrixOffsetsResponse:
collection = self._get_collection(collection_name)
return collection.search_matrix_offsets(
query_filter=query_filter, limit=limit, sample=sample, using=using
)
    def search_matrix_pairs(
self,
collection_name: str,
query_filter: Optional[types.Filter] = None,
limit: int = 3,
sample: int = 10,
using: Optional[str] = None,
**kwargs: Any,
) -> types.SearchMatrixPairsResponse:
collection = self._get_collection(collection_name)
return collection.search_matrix_pairs(
query_filter=query_filter, limit=limit, sample=sample, using=using
)
    def search_groups(
self,
collection_name: str,
query_vector: Union[
types.NumpyArray,
Sequence[float],
Tuple[str, List[float]],
types.NamedVector,
],
group_by: str,
query_filter: Optional[rest_models.Filter] = None,
search_params: Optional[rest_models.SearchParams] = None,
limit: int = 10,
group_size: int = 1,
with_payload: Union[bool, Sequence[str], rest_models.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
with_lookup: Optional[types.WithLookupInterface] = None,
**kwargs: Any,
) -> types.GroupsResult:
collection = self._get_collection(collection_name)
with_lookup_collection = None
if with_lookup is not None:
if isinstance(with_lookup, str):
with_lookup_collection = self._get_collection(with_lookup)
else:
with_lookup_collection = self._get_collection(with_lookup.collection)
return collection.search_groups(
query_vector=query_vector,
query_filter=query_filter,
limit=limit,
group_by=group_by,
group_size=group_size,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
with_lookup=with_lookup,
with_lookup_collection=with_lookup_collection,
)
def _resolve_query_input(
self,
collection_name: str,
query: Optional[types.Query],
using: Optional[str],
lookup_from: Optional[types.LookupLocation],
) -> Tuple[types.Query, Set[types.PointId]]:
"""
        Resolve any point ids in the query into their stored vectors and return a new
        query object, along with the set of mentioned point ids that should be
        excluded from the search results.
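
        For example (hypothetical values): a ``NearestQuery(nearest=42)`` where point
        ``42`` stores the vector ``[0.1, 0.2]`` becomes
        ``NearestQuery(nearest=[0.1, 0.2])``, and ``{42}`` is returned so the point
        cannot match itself.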
"""
lookup_collection_name = lookup_from.collection if lookup_from else collection_name
collection = self._get_collection(lookup_collection_name)
search_in_vector_name = using if using is not None else DEFAULT_VECTOR_NAME
vector_name = (
lookup_from.vector
if lookup_from is not None and lookup_from.vector is not None
else search_in_vector_name
)
sparse = vector_name in collection.sparse_vectors
multi = vector_name in collection.multivectors
if sparse:
collection_vectors = collection.sparse_vectors
elif multi:
collection_vectors = collection.multivectors
else:
collection_vectors = collection.vectors
# mentioned ids in the search collection which should be excluded from search
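        # (e.g. a point passed as a positive recommendation example must not
        # appear in its own results)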
mentioned_ids: Set[types.PointId] = set()
def input_into_vector(
vector_input: types.VectorInput,
) -> types.VectorInput:
if isinstance(vector_input, get_args(types.PointId)):
point_id = vector_input # rename for clarity
if point_id not in collection.ids:
raise ValueError(f"Point {point_id} is not found in the collection")
idx = collection.ids[point_id]
if vector_name in collection_vectors:
vec = collection_vectors[vector_name][idx]
else:
raise ValueError(f"Vector {vector_name} not found")
if isinstance(vec, np.ndarray):
vec = vec.tolist()
if collection_name == lookup_collection_name:
mentioned_ids.add(point_id)
return vec
else:
return vector_input
query = deepcopy(query)
if isinstance(query, rest_models.NearestQuery):
query.nearest = input_into_vector(query.nearest)
elif isinstance(query, rest_models.RecommendQuery):
if query.recommend.negative is not None:
query.recommend.negative = [
input_into_vector(vector_input) for vector_input in query.recommend.negative
]
if query.recommend.positive is not None:
query.recommend.positive = [
input_into_vector(vector_input) for vector_input in query.recommend.positive
]
elif isinstance(query, rest_models.DiscoverQuery):
query.discover.target = input_into_vector(query.discover.target)
pairs = (
query.discover.context
if isinstance(query.discover.context, list)
else [query.discover.context]
)
query.discover.context = [
rest_models.ContextPair(
positive=input_into_vector(pair.positive),
negative=input_into_vector(pair.negative),
)
for pair in pairs
]
elif isinstance(query, rest_models.ContextQuery):
pairs = query.context if isinstance(query.context, list) else [query.context]
query.context = [
rest_models.ContextPair(
positive=input_into_vector(pair.positive),
negative=input_into_vector(pair.negative),
)
for pair in pairs
]
elif isinstance(query, rest_models.OrderByQuery):
pass
elif isinstance(query, rest_models.FusionQuery):
pass
return query, mentioned_ids
def _resolve_prefetches_input(
self,
prefetch: Optional[Union[Sequence[types.Prefetch], types.Prefetch]],
collection_name: str,
) -> List[types.Prefetch]:
if prefetch is None:
return []
if isinstance(prefetch, list) and len(prefetch) == 0:
return []
        prefetches = []
        if isinstance(prefetch, types.Prefetch):
            # nested prefetches are resolved recursively in _resolve_prefetch_input,
            # so they must not be flattened into the top-level list here
            prefetches = [prefetch]
        elif isinstance(prefetch, Sequence):
            prefetches = list(prefetch)
return [
self._resolve_prefetch_input(prefetch, collection_name)
for prefetch in prefetches
if prefetch is not None
]
def _resolve_prefetch_input(
self, prefetch: types.Prefetch, collection_name: str
) -> types.Prefetch:
if prefetch.query is None:
return prefetch
prefetch = deepcopy(prefetch)
query, mentioned_ids = self._resolve_query_input(
collection_name,
prefetch.query,
prefetch.using,
prefetch.lookup_from,
)
prefetch.query = query
prefetch.filter = ignore_mentioned_ids_filter(prefetch.filter, list(mentioned_ids))
prefetch.prefetch = self._resolve_prefetches_input(prefetch.prefetch, collection_name)
return prefetch
    def query_points(
self,
collection_name: str,
query: Optional[types.Query] = None,
using: Optional[str] = None,
prefetch: Union[types.Prefetch, List[types.Prefetch], None] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: Optional[int] = None,
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
lookup_from: Optional[types.LookupLocation] = None,
**kwargs: Any,
) -> types.QueryResponse:
collection = self._get_collection(collection_name)
if query is not None:
query, mentioned_ids = self._resolve_query_input(
collection_name, query, using, lookup_from
)
query_filter = ignore_mentioned_ids_filter(query_filter, list(mentioned_ids))
prefetch = self._resolve_prefetches_input(prefetch, collection_name)
return collection.query_points(
query=query,
prefetch=prefetch,
query_filter=query_filter,
using=using,
score_threshold=score_threshold,
limit=limit,
offset=offset or 0,
with_payload=with_payload,
with_vectors=with_vectors,
)
    def query_batch_points(
self,
collection_name: str,
requests: Sequence[types.QueryRequest],
**kwargs: Any,
) -> List[types.QueryResponse]:
collection = self._get_collection(collection_name)
return [
collection.query_points(
query=request.query,
prefetch=request.prefetch,
query_filter=request.filter,
limit=request.limit,
offset=request.offset or 0,
with_payload=request.with_payload,
with_vectors=request.with_vector,
score_threshold=request.score_threshold,
using=request.using,
lookup_from_collection=self._get_collection(request.lookup_from.collection)
if request.lookup_from
else None,
lookup_from_vector_name=request.lookup_from.vector
if request.lookup_from
else None,
)
for request in requests
]
    def query_points_groups(
self,
collection_name: str,
group_by: str,
query: Union[
types.PointId,
List[float],
List[List[float]],
types.SparseVector,
types.Query,
types.NumpyArray,
types.Document,
None,
] = None,
using: Optional[str] = None,
prefetch: Union[types.Prefetch, List[types.Prefetch], None] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
group_size: int = 3,
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
with_lookup: Optional[types.WithLookupInterface] = None,
lookup_from: Optional[types.LookupLocation] = None,
**kwargs: Any,
) -> types.GroupsResult:
collection = self._get_collection(collection_name)
if query is not None:
query, mentioned_ids = self._resolve_query_input(
collection_name, query, using, lookup_from
)
query_filter = ignore_mentioned_ids_filter(query_filter, list(mentioned_ids))
with_lookup_collection = None
if with_lookup is not None:
if isinstance(with_lookup, str):
with_lookup_collection = self._get_collection(with_lookup)
else:
with_lookup_collection = self._get_collection(with_lookup.collection)
return collection.query_groups(
query=query,
query_filter=query_filter,
using=using,
prefetch=prefetch,
limit=limit,
group_by=group_by,
group_size=group_size,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
with_lookup=with_lookup,
with_lookup_collection=with_lookup_collection,
)
    def recommend_batch(
self,
collection_name: str,
requests: Sequence[types.RecommendRequest],
**kwargs: Any,
) -> List[List[types.ScoredPoint]]:
collection = self._get_collection(collection_name)
return [
collection.recommend(
positive=request.positive,
negative=request.negative,
query_filter=request.filter,
limit=request.limit,
offset=request.offset,
with_payload=request.with_payload,
with_vectors=request.with_vector,
score_threshold=request.score_threshold,
using=request.using,
lookup_from_collection=self._get_collection(request.lookup_from.collection)
if request.lookup_from
else None,
lookup_from_vector_name=request.lookup_from.vector
if request.lookup_from
else None,
strategy=request.strategy,
)
for request in requests
]
    def recommend(
self,
collection_name: str,
positive: Optional[Sequence[RecommendExample]] = None,
negative: Optional[Sequence[RecommendExample]] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: int = 0,
with_payload: Union[bool, List[str], types.PayloadSelector] = True,
with_vectors: Union[bool, List[str]] = False,
score_threshold: Optional[float] = None,
using: Optional[str] = None,
lookup_from: Optional[types.LookupLocation] = None,
strategy: Optional[types.RecommendStrategy] = None,
**kwargs: Any,
) -> List[types.ScoredPoint]:
collection = self._get_collection(collection_name)
return collection.recommend(
positive=positive,
negative=negative,
query_filter=query_filter,
limit=limit,
offset=offset,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
using=using,
lookup_from_collection=self._get_collection(lookup_from.collection)
if lookup_from
else None,
lookup_from_vector_name=lookup_from.vector if lookup_from else None,
strategy=strategy,
)
    def recommend_groups(
self,
collection_name: str,
group_by: str,
positive: Optional[Sequence[Union[types.PointId, List[float]]]] = None,
negative: Optional[Sequence[Union[types.PointId, List[float]]]] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
group_size: int = 1,
score_threshold: Optional[float] = None,
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
using: Optional[str] = None,
lookup_from: Optional[types.LookupLocation] = None,
with_lookup: Optional[types.WithLookupInterface] = None,
strategy: Optional[types.RecommendStrategy] = None,
**kwargs: Any,
) -> types.GroupsResult:
collection = self._get_collection(collection_name)
with_lookup_collection = None
if with_lookup is not None:
if isinstance(with_lookup, str):
with_lookup_collection = self._get_collection(with_lookup)
else:
with_lookup_collection = self._get_collection(with_lookup.collection)
return collection.recommend_groups(
positive=positive,
negative=negative,
group_by=group_by,
group_size=group_size,
query_filter=query_filter,
limit=limit,
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
using=using,
lookup_from_collection=self._get_collection(lookup_from.collection)
if lookup_from
else None,
lookup_from_vector_name=lookup_from.vector if lookup_from else None,
with_lookup=with_lookup,
with_lookup_collection=with_lookup_collection,
strategy=strategy,
)
    def discover(
self,
collection_name: str,
target: Optional[types.TargetVector] = None,
context: Optional[Sequence[types.ContextExamplePair]] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: int = 0,
with_payload: Union[bool, List[str], types.PayloadSelector] = True,
with_vectors: Union[bool, List[str]] = False,
using: Optional[str] = None,
lookup_from: Optional[types.LookupLocation] = None,
consistency: Optional[types.ReadConsistency] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> List[types.ScoredPoint]:
collection = self._get_collection(collection_name)
return collection.discover(
target=target,
context=context,
query_filter=query_filter,
limit=limit,
offset=offset,
with_payload=with_payload,
with_vectors=with_vectors,
using=using,
lookup_from_collection=self._get_collection(lookup_from.collection)
if lookup_from
else None,
lookup_from_vector_name=lookup_from.vector if lookup_from else None,
)
    def discover_batch(
self,
collection_name: str,
requests: Sequence[types.DiscoverRequest],
**kwargs: Any,
) -> List[List[types.ScoredPoint]]:
collection = self._get_collection(collection_name)
return [
collection.discover(
target=request.target,
context=request.context,
query_filter=request.filter,
limit=request.limit,
offset=request.offset,
with_payload=request.with_payload,
with_vectors=request.with_vector,
using=request.using,
lookup_from_collection=self._get_collection(request.lookup_from.collection)
if request.lookup_from
else None,
lookup_from_vector_name=request.lookup_from.vector
if request.lookup_from
else None,
)
for request in requests
]
    def count(
self,
collection_name: str,
count_filter: Optional[types.Filter] = None,
exact: bool = True,
**kwargs: Any,
) -> types.CountResult:
collection = self._get_collection(collection_name)
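        # `exact` is accepted for interface compatibility; local counts are always exact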
return collection.count(count_filter=count_filter)
    def facet(
self,
collection_name: str,
key: str,
facet_filter: Optional[types.Filter] = None,
limit: int = 10,
exact: bool = False,
**kwargs: Any,
) -> types.FacetResponse:
collection = self._get_collection(collection_name)
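        # `exact` is accepted for interface compatibility; local facets are computed exactly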
return collection.facet(key=key, facet_filter=facet_filter, limit=limit)
    def upsert(
self, collection_name: str, points: types.Points, **kwargs: Any
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.upsert(points)
return self._default_update_result()
    def update_vectors(
self,
collection_name: str,
points: Sequence[types.PointVectors],
**kwargs: Any,
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.update_vectors(points)
return self._default_update_result()
    def delete_vectors(
self,
collection_name: str,
vectors: Sequence[str],
points: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.delete_vectors(vectors, points)
return self._default_update_result()
    def retrieve(
self,
collection_name: str,
ids: Sequence[types.PointId],
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
**kwargs: Any,
) -> List[types.Record]:
collection = self._get_collection(collection_name)
return collection.retrieve(ids, with_payload, with_vectors)
@classmethod
def _default_update_result(cls, operation_id: int = 0) -> types.UpdateResult:
return types.UpdateResult(
operation_id=operation_id,
status=rest_models.UpdateStatus.COMPLETED,
)
    def delete(
self, collection_name: str, points_selector: types.PointsSelector, **kwargs: Any
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.delete(points_selector)
return self._default_update_result()
    def set_payload(
self,
collection_name: str,
payload: types.Payload,
points: types.PointsSelector,
key: Optional[str] = None,
**kwargs: Any,
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.set_payload(payload=payload, selector=points, key=key)
return self._default_update_result()
    def overwrite_payload(
self,
collection_name: str,
payload: types.Payload,
points: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.overwrite_payload(payload=payload, selector=points)
return self._default_update_result()
    def delete_payload(
self,
collection_name: str,
keys: Sequence[str],
points: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.delete_payload(keys=keys, selector=points)
return self._default_update_result()
    def clear_payload(
self, collection_name: str, points_selector: types.PointsSelector, **kwargs: Any
) -> types.UpdateResult:
collection = self._get_collection(collection_name)
collection.clear_payload(selector=points_selector)
return self._default_update_result()
    def batch_update_points(
self,
collection_name: str,
update_operations: Sequence[types.UpdateOperation],
**kwargs: Any,
) -> List[types.UpdateResult]:
collection = self._get_collection(collection_name)
collection.batch_update_points(update_operations)
return [self._default_update_result()] * len(update_operations)
    def update_collection_aliases(
self, change_aliases_operations: Sequence[types.AliasOperations], **kwargs: Any
) -> bool:
for operation in change_aliases_operations:
if isinstance(operation, rest_models.CreateAliasOperation):
self._get_collection(operation.create_alias.collection_name)
self.aliases[operation.create_alias.alias_name] = (
operation.create_alias.collection_name
)
elif isinstance(operation, rest_models.DeleteAliasOperation):
self.aliases.pop(operation.delete_alias.alias_name, None)
elif isinstance(operation, rest_models.RenameAliasOperation):
new_name = operation.rename_alias.new_alias_name
old_name = operation.rename_alias.old_alias_name
self.aliases[new_name] = self.aliases.pop(old_name)
else:
raise ValueError(f"Unknown operation: {operation}")
self._save()
return True
    def get_collection_aliases(
self, collection_name: str, **kwargs: Any
) -> types.CollectionsAliasesResponse:
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
return types.CollectionsAliasesResponse(
aliases=[
rest_models.AliasDescription(
alias_name=alias_name,
collection_name=name,
)
for alias_name, name in self.aliases.items()
if name == collection_name
]
)
    def get_aliases(self, **kwargs: Any) -> types.CollectionsAliasesResponse:
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
return types.CollectionsAliasesResponse(
aliases=[
rest_models.AliasDescription(
alias_name=alias_name,
collection_name=name,
)
for alias_name, name in self.aliases.items()
]
)
    def get_collections(self, **kwargs: Any) -> types.CollectionsResponse:
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
return types.CollectionsResponse(
collections=[
rest_models.CollectionDescription(name=name)
for name, _ in self.collections.items()
]
)
    def get_collection(self, collection_name: str, **kwargs: Any) -> types.CollectionInfo:
collection = self._get_collection(collection_name)
return collection.info()
    def collection_exists(self, collection_name: str, **kwargs: Any) -> bool:
try:
self._get_collection(collection_name)
return True
except ValueError:
return False
    def update_collection(
self,
collection_name: str,
sparse_vectors_config: Optional[Mapping[str, types.SparseVectorParams]] = None,
**kwargs: Any,
) -> bool:
_collection = self._get_collection(collection_name)
if sparse_vectors_config is not None:
for vector_name, vector_params in sparse_vectors_config.items():
_collection.update_sparse_vectors_config(vector_name, vector_params)
return True
return False
def _collection_path(self, collection_name: str) -> Optional[str]:
if self.persistent:
return os.path.join(self.location, "collection", collection_name)
else:
return None
    def delete_collection(self, collection_name: str, **kwargs: Any) -> bool:
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
_collection = self.collections.pop(collection_name, None)
del _collection
self.aliases = {
alias_name: name
for alias_name, name in self.aliases.items()
if name != collection_name
}
collection_path = self._collection_path(collection_name)
if collection_path is not None:
shutil.rmtree(collection_path, ignore_errors=True)
self._save()
return True
    def create_collection(
self,
collection_name: str,
vectors_config: Union[types.VectorParams, Mapping[str, types.VectorParams]],
init_from: Optional[types.InitFrom] = None,
sparse_vectors_config: Optional[Mapping[str, types.SparseVectorParams]] = None,
**kwargs: Any,
) -> bool:
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
src_collection = None
from_collection_name = None
if init_from is not None:
from_collection_name = (
init_from if isinstance(init_from, str) else init_from.collection
)
src_collection = self._get_collection(from_collection_name)
if collection_name in self.collections:
raise ValueError(f"Collection {collection_name} already exists")
collection_path = self._collection_path(collection_name)
if collection_path is not None:
os.makedirs(collection_path, exist_ok=True)
collection = LocalCollection(
rest_models.CreateCollection(
vectors=vectors_config,
sparse_vectors=sparse_vectors_config,
),
location=collection_path,
force_disable_check_same_thread=self.force_disable_check_same_thread,
)
self.collections[collection_name] = collection
if src_collection and from_collection_name:
            batch_size = 100
            records, next_offset = self.scroll(
                from_collection_name, limit=batch_size, with_vectors=True
            )
            # It is not crucial to replace upload_records here: this is internal
            # usage, and we don't have custom shard keys in local Qdrant.
            self.upload_records(collection_name, records)
while next_offset is not None:
records, next_offset = self.scroll(
from_collection_name, offset=next_offset, limit=batch_size, with_vectors=True
)
self.upload_records(collection_name, records)
self._save()
return True
    def recreate_collection(
self,
collection_name: str,
vectors_config: Union[types.VectorParams, Mapping[str, types.VectorParams]],
init_from: Optional[types.InitFrom] = None,
sparse_vectors_config: Optional[Mapping[str, types.SparseVectorParams]] = None,
**kwargs: Any,
) -> bool:
self.delete_collection(collection_name)
return self.create_collection(
collection_name, vectors_config, init_from, sparse_vectors_config
)
    def upload_points(
self, collection_name: str, points: Iterable[types.PointStruct], **kwargs: Any
) -> None:
self._upload_points(collection_name, points)
    def upload_records(
self, collection_name: str, records: Iterable[types.Record], **kwargs: Any
) -> None:
# upload_records in local mode behaves like upload_records with wait=True in server mode
self._upload_points(collection_name, records)
def _upload_points(
self,
collection_name: str,
points: Iterable[Union[types.PointStruct, types.Record]],
) -> None:
collection = self._get_collection(collection_name)
collection.upsert(
[
rest_models.PointStruct(
id=point.id,
vector=point.vector or {},
payload=point.payload or {},
)
for point in points
]
)
    def upload_collection(
self,
collection_name: str,
vectors: Union[
Dict[str, types.NumpyArray], types.NumpyArray, Iterable[types.VectorStruct]
],
payload: Optional[Iterable[Dict[Any, Any]]] = None,
ids: Optional[Iterable[types.PointId]] = None,
**kwargs: Any,
) -> None:
# upload_collection in local mode behaves like upload_collection with wait=True in server mode
def uuid_generator() -> Generator[str, None, None]:
while True:
yield str(uuid4())
collection = self._get_collection(collection_name)
if isinstance(vectors, dict) and any(isinstance(v, np.ndarray) for v in vectors.values()):
            assert (
                len({arr.shape[0] for arr in vectors.values()}) == 1
            ), "Each named vector field must contain the same number of vectors"
num_vectors = next(iter(vectors.values())).shape[0]
# convert Dict[str, np.ndarray] to List[Dict[str, List[float]]]
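            # e.g. (hypothetical field names) {"text": ndarray(2, 3), "image": ndarray(2, 5)}
            # becomes two points: [{"text": [3 floats], "image": [5 floats]}, {...}]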
vectors = [
{name: vectors[name][i].tolist() for name in vectors.keys()}
for i in range(num_vectors)
]
collection.upsert(
[
rest_models.PointStruct(
id=point_id,
vector=(vector.tolist() if isinstance(vector, np.ndarray) else vector) or {},
payload=payload or {},
)
for (point_id, vector, payload) in zip(
ids or uuid_generator(),
iter(vectors),
payload or itertools.cycle([{}]),
)
]
)
    def create_payload_index(
self,
collection_name: str,
field_name: str,
field_schema: Optional[types.PayloadSchemaType] = None,
field_type: Optional[types.PayloadSchemaType] = None,
**kwargs: Any,
) -> types.UpdateResult:
logging.warning(
"Payload indexes have no effect in the local Qdrant. Please use server Qdrant if you need payload indexes."
)
return self._default_update_result()
    def delete_payload_index(
self, collection_name: str, field_name: str, **kwargs: Any
) -> types.UpdateResult:
logging.warning(
"Payload indexes have no effect in the local Qdrant. Please use server Qdrant if you need payload indexes."
)
return self._default_update_result()
    def list_snapshots(
self, collection_name: str, **kwargs: Any
) -> List[types.SnapshotDescription]:
return []
    def create_snapshot(
        self, collection_name: str, **kwargs: Any
    ) -> Optional[types.SnapshotDescription]:
        raise NotImplementedError(
            "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
        )
    def delete_snapshot(self, collection_name: str, snapshot_name: str, **kwargs: Any) -> bool:
        raise NotImplementedError(
            "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
        )
    def list_full_snapshots(self, **kwargs: Any) -> List[types.SnapshotDescription]:
return []
    def create_full_snapshot(self, **kwargs: Any) -> types.SnapshotDescription:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
)
    def delete_full_snapshot(self, snapshot_name: str, **kwargs: Any) -> bool:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
)
    def recover_snapshot(self, collection_name: str, location: str, **kwargs: Any) -> bool:
        raise NotImplementedError(
            "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
        )
    def list_shard_snapshots(
self, collection_name: str, shard_id: int, **kwargs: Any
) -> List[types.SnapshotDescription]:
return []
    def create_shard_snapshot(
self, collection_name: str, shard_id: int, **kwargs: Any
) -> Optional[types.SnapshotDescription]:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
)
    def delete_shard_snapshot(
self, collection_name: str, shard_id: int, snapshot_name: str, **kwargs: Any
) -> bool:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
)
    def recover_shard_snapshot(
self,
collection_name: str,
shard_id: int,
location: str,
**kwargs: Any,
) -> bool:
raise NotImplementedError(
"Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
)
    def lock_storage(self, reason: str, **kwargs: Any) -> types.LocksOption:
        raise NotImplementedError(
            "Locks are not supported in the local Qdrant. Please use server Qdrant if you need storage locks."
        )
    def unlock_storage(self, **kwargs: Any) -> types.LocksOption:
        raise NotImplementedError(
            "Locks are not supported in the local Qdrant. Please use server Qdrant if you need storage locks."
        )
    def get_locks(self, **kwargs: Any) -> types.LocksOption:
return types.LocksOption(
error_message=None,
write=False,
)
    def create_shard_key(
self,
collection_name: str,
shard_key: types.ShardKey,
shards_number: Optional[int] = None,
replication_factor: Optional[int] = None,
placement: Optional[List[int]] = None,
**kwargs: Any,
) -> bool:
raise NotImplementedError(
"Sharding is not supported in the local Qdrant. Please use server Qdrant if you need sharding."
)
    def delete_shard_key(
self,
collection_name: str,
shard_key: types.ShardKey,
**kwargs: Any,
) -> bool:
raise NotImplementedError(
"Sharding is not supported in the local Qdrant. Please use server Qdrant if you need sharding."
)
    def info(self) -> types.VersionInfo:
version = importlib.metadata.version("qdrant-client")
return rest_models.VersionInfo(
title="qdrant - vector search engine", version=version, commit=None
)