from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union
from qdrant_client.conversions import common_types as types
from qdrant_client.http import models
[docs]class QdrantBase:
def __init__(self, **kwargs: Any):
pass
[docs] def search_batch(
self,
collection_name: str,
requests: Sequence[types.SearchRequest],
**kwargs: Any,
) -> List[List[types.ScoredPoint]]:
raise NotImplementedError()
[docs] def search(
self,
collection_name: str,
query_vector: Union[
types.NumpyArray,
Sequence[float],
Tuple[str, List[float]],
types.NamedVector,
types.NamedSparseVector,
],
query_filter: Optional[models.Filter] = None,
search_params: Optional[models.SearchParams] = None,
limit: int = 10,
offset: Optional[int] = None,
with_payload: Union[bool, Sequence[str], models.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
**kwargs: Any,
) -> List[types.ScoredPoint]:
raise NotImplementedError()
[docs] def search_groups(
self,
collection_name: str,
query_vector: Union[
types.NumpyArray,
Sequence[float],
Tuple[str, List[float]],
types.NamedVector,
types.NamedSparseVector,
],
group_by: str,
query_filter: Optional[models.Filter] = None,
search_params: Optional[models.SearchParams] = None,
limit: int = 10,
group_size: int = 1,
with_payload: Union[bool, Sequence[str], models.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
with_lookup: Optional[types.WithLookupInterface] = None,
**kwargs: Any,
) -> types.GroupsResult:
raise NotImplementedError()
[docs] def search_matrix_offsets(
self,
collection_name: str,
query_filter: Optional[types.Filter] = None,
limit: int = 3,
sample: int = 10,
using: Optional[str] = None,
**kwargs: Any,
) -> types.SearchMatrixOffsetsResponse:
raise NotImplementedError()
[docs] def search_matrix_pairs(
self,
collection_name: str,
query_filter: Optional[types.Filter] = None,
limit: int = 3,
sample: int = 10,
using: Optional[str] = None,
**kwargs: Any,
) -> types.SearchMatrixPairsResponse:
raise NotImplementedError()
[docs] def query_batch_points(
self,
collection_name: str,
requests: Sequence[types.QueryRequest],
**kwargs: Any,
) -> List[types.QueryResponse]:
raise NotImplementedError()
[docs] def query_points(
self,
collection_name: str,
query: Union[
types.PointId,
List[float],
List[List[float]],
types.SparseVector,
types.Query,
types.NumpyArray,
types.Document,
None,
] = None,
using: Optional[str] = None,
prefetch: Union[types.Prefetch, List[types.Prefetch], None] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: Optional[int] = None,
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
lookup_from: Optional[types.LookupLocation] = None,
**kwargs: Any,
) -> types.QueryResponse:
raise NotImplementedError()
[docs] def query_points_groups(
self,
collection_name: str,
group_by: str,
query: Union[
types.PointId,
List[float],
List[List[float]],
types.SparseVector,
types.Query,
types.NumpyArray,
types.Document,
None,
] = None,
using: Optional[str] = None,
prefetch: Union[types.Prefetch, List[types.Prefetch], None] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
group_size: int = 3,
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
score_threshold: Optional[float] = None,
with_lookup: Optional[types.WithLookupInterface] = None,
lookup_from: Optional[types.LookupLocation] = None,
**kwargs: Any,
) -> types.GroupsResult:
raise NotImplementedError()
[docs] def recommend_batch(
self,
collection_name: str,
requests: Sequence[types.RecommendRequest],
**kwargs: Any,
) -> List[List[types.ScoredPoint]]:
raise NotImplementedError()
[docs] def recommend(
self,
collection_name: str,
positive: Optional[Sequence[types.RecommendExample]] = None,
negative: Optional[Sequence[types.RecommendExample]] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: int = 0,
with_payload: Union[bool, List[str], types.PayloadSelector] = True,
with_vectors: Union[bool, List[str]] = False,
score_threshold: Optional[float] = None,
using: Optional[str] = None,
lookup_from: Optional[types.LookupLocation] = None,
strategy: Optional[types.RecommendStrategy] = None,
**kwargs: Any,
) -> List[types.ScoredPoint]:
raise NotImplementedError()
[docs] def recommend_groups(
self,
collection_name: str,
group_by: str,
positive: Optional[Sequence[types.RecommendExample]] = None,
negative: Optional[Sequence[types.RecommendExample]] = None,
query_filter: Optional[models.Filter] = None,
search_params: Optional[models.SearchParams] = None,
limit: int = 10,
group_size: int = 1,
score_threshold: Optional[float] = None,
with_payload: Union[bool, Sequence[str], models.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
using: Optional[str] = None,
lookup_from: Optional[models.LookupLocation] = None,
with_lookup: Optional[types.WithLookupInterface] = None,
strategy: Optional[types.RecommendStrategy] = None,
**kwargs: Any,
) -> types.GroupsResult:
raise NotImplementedError()
[docs] def discover(
self,
collection_name: str,
target: Optional[types.TargetVector] = None,
context: Optional[Sequence[types.ContextExamplePair]] = None,
query_filter: Optional[types.Filter] = None,
search_params: Optional[types.SearchParams] = None,
limit: int = 10,
offset: int = 0,
with_payload: Union[bool, List[str], types.PayloadSelector] = True,
with_vectors: Union[bool, List[str]] = False,
using: Optional[str] = None,
lookup_from: Optional[types.LookupLocation] = None,
consistency: Optional[types.ReadConsistency] = None,
**kwargs: Any,
) -> List[types.ScoredPoint]:
raise NotImplementedError()
[docs] def discover_batch(
self,
collection_name: str,
requests: Sequence[types.DiscoverRequest],
**kwargs: Any,
) -> List[List[types.ScoredPoint]]:
raise NotImplementedError()
[docs] def count(
self,
collection_name: str,
count_filter: Optional[types.Filter] = None,
exact: bool = True,
**kwargs: Any,
) -> types.CountResult:
raise NotImplementedError()
[docs] def facet(
self,
collection_name: str,
key: str,
facet_filter: Optional[types.Filter] = None,
limit: int = 10,
exact: bool = False,
**kwargs: Any,
) -> types.FacetResponse:
raise NotImplementedError()
[docs] def upsert(
self,
collection_name: str,
points: types.Points,
**kwargs: Any,
) -> types.UpdateResult:
raise NotImplementedError()
[docs] def update_vectors(
self,
collection_name: str,
points: Sequence[types.PointVectors],
**kwargs: Any,
) -> types.UpdateResult:
raise NotImplementedError()
[docs] def delete_vectors(
self,
collection_name: str,
vectors: Sequence[str],
points: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
raise NotImplementedError()
[docs] def retrieve(
self,
collection_name: str,
ids: Sequence[types.PointId],
with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
with_vectors: Union[bool, Sequence[str]] = False,
**kwargs: Any,
) -> List[types.Record]:
raise NotImplementedError()
[docs] def delete(
self,
collection_name: str,
points_selector: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
raise NotImplementedError()
[docs] def set_payload(
self,
collection_name: str,
payload: types.Payload,
points: types.PointsSelector,
key: Optional[str] = None,
**kwargs: Any,
) -> types.UpdateResult:
raise NotImplementedError()
[docs] def overwrite_payload(
self,
collection_name: str,
payload: types.Payload,
points: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
raise NotImplementedError()
[docs] def delete_payload(
self,
collection_name: str,
keys: Sequence[str],
points: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
raise NotImplementedError()
[docs] def clear_payload(
self,
collection_name: str,
points_selector: types.PointsSelector,
**kwargs: Any,
) -> types.UpdateResult:
raise NotImplementedError()
[docs] def batch_update_points(
self,
collection_name: str,
update_operations: Sequence[types.UpdateOperation],
**kwargs: Any,
) -> List[types.UpdateResult]:
raise NotImplementedError()
[docs] def update_collection_aliases(
self,
change_aliases_operations: Sequence[types.AliasOperations],
**kwargs: Any,
) -> bool:
raise NotImplementedError()
[docs] def get_collection_aliases(
self, collection_name: str, **kwargs: Any
) -> types.CollectionsAliasesResponse:
raise NotImplementedError()
[docs] def get_aliases(self, **kwargs: Any) -> types.CollectionsAliasesResponse:
raise NotImplementedError()
[docs] def get_collections(self, **kwargs: Any) -> types.CollectionsResponse:
raise NotImplementedError()
[docs] def get_collection(self, collection_name: str, **kwargs: Any) -> types.CollectionInfo:
raise NotImplementedError()
[docs] def collection_exists(self, collection_name: str, **kwargs: Any) -> bool:
raise NotImplementedError()
[docs] def update_collection(
self,
collection_name: str,
**kwargs: Any,
) -> bool:
raise NotImplementedError()
[docs] def delete_collection(self, collection_name: str, **kwargs: Any) -> bool:
raise NotImplementedError()
[docs] def create_collection(
self,
collection_name: str,
vectors_config: Union[types.VectorParams, Mapping[str, types.VectorParams]],
**kwargs: Any,
) -> bool:
raise NotImplementedError()
[docs] def recreate_collection(
self,
collection_name: str,
vectors_config: Union[types.VectorParams, Mapping[str, types.VectorParams]],
**kwargs: Any,
) -> bool:
raise NotImplementedError()
[docs] def upload_records(
self,
collection_name: str,
records: Iterable[types.Record],
**kwargs: Any,
) -> None:
raise NotImplementedError()
[docs] def upload_points(
self,
collection_name: str,
points: Iterable[types.PointStruct],
**kwargs: Any,
) -> None:
raise NotImplementedError()
[docs] def upload_collection(
self,
collection_name: str,
vectors: Union[
Dict[str, types.NumpyArray], types.NumpyArray, Iterable[types.VectorStruct]
],
payload: Optional[Iterable[Dict[Any, Any]]] = None,
ids: Optional[Iterable[types.PointId]] = None,
**kwargs: Any,
) -> None:
raise NotImplementedError()
[docs] def create_payload_index(
self,
collection_name: str,
field_name: str,
field_schema: Optional[types.PayloadSchemaType] = None,
field_type: Optional[types.PayloadSchemaType] = None,
**kwargs: Any,
) -> types.UpdateResult:
raise NotImplementedError()
[docs] def delete_payload_index(
self,
collection_name: str,
field_name: str,
**kwargs: Any,
) -> types.UpdateResult:
raise NotImplementedError()
[docs] def list_snapshots(
self, collection_name: str, **kwargs: Any
) -> List[types.SnapshotDescription]:
raise NotImplementedError()
[docs] def create_snapshot(
self, collection_name: str, **kwargs: Any
) -> Optional[types.SnapshotDescription]:
raise NotImplementedError()
[docs] def delete_snapshot(
self, collection_name: str, snapshot_name: str, **kwargs: Any
) -> Optional[bool]:
raise NotImplementedError()
[docs] def list_full_snapshots(self, **kwargs: Any) -> List[types.SnapshotDescription]:
raise NotImplementedError()
[docs] def create_full_snapshot(self, **kwargs: Any) -> Optional[types.SnapshotDescription]:
raise NotImplementedError()
[docs] def delete_full_snapshot(self, snapshot_name: str, **kwargs: Any) -> Optional[bool]:
raise NotImplementedError()
[docs] def recover_snapshot(
self,
collection_name: str,
location: str,
**kwargs: Any,
) -> Optional[bool]:
raise NotImplementedError()
[docs] def list_shard_snapshots(
self, collection_name: str, shard_id: int, **kwargs: Any
) -> List[types.SnapshotDescription]:
raise NotImplementedError()
[docs] def create_shard_snapshot(
self, collection_name: str, shard_id: int, **kwargs: Any
) -> Optional[types.SnapshotDescription]:
raise NotImplementedError()
[docs] def delete_shard_snapshot(
self, collection_name: str, shard_id: int, snapshot_name: str, **kwargs: Any
) -> Optional[bool]:
raise NotImplementedError()
[docs] def recover_shard_snapshot(
self,
collection_name: str,
shard_id: int,
location: str,
**kwargs: Any,
) -> Optional[bool]:
raise NotImplementedError()
[docs] def lock_storage(self, reason: str, **kwargs: Any) -> types.LocksOption:
raise NotImplementedError()
[docs] def unlock_storage(self, **kwargs: Any) -> types.LocksOption:
raise NotImplementedError()
[docs] def get_locks(self, **kwargs: Any) -> types.LocksOption:
raise NotImplementedError()
[docs] def close(self, **kwargs: Any) -> None:
pass
[docs] def migrate(
self,
dest_client: "QdrantBase",
collection_names: Optional[List[str]] = None,
batch_size: int = 100,
recreate_on_collision: bool = False,
) -> None:
raise NotImplementedError()
[docs] def create_shard_key(
self,
collection_name: str,
shard_key: types.ShardKey,
shards_number: Optional[int] = None,
replication_factor: Optional[int] = None,
placement: Optional[List[int]] = None,
**kwargs: Any,
) -> bool:
raise NotImplementedError()
[docs] def delete_shard_key(
self,
collection_name: str,
shard_key: types.ShardKey,
**kwargs: Any,
) -> bool:
raise NotImplementedError()
[docs] def info(self) -> types.VersionInfo:
raise NotImplementedError()