Shortcuts

Source code for qdrant_client.http.api_client

from asyncio import get_event_loop
from functools import lru_cache
from typing import Any, Awaitable, Callable, Dict, Generic, Type, TypeVar, overload

from httpx import AsyncClient, Client, Request, Response
from pydantic import ValidationError
from qdrant_client.http.api.beta_api import AsyncBetaApi, SyncBetaApi
from qdrant_client.http.api.cluster_api import AsyncClusterApi, SyncClusterApi
from qdrant_client.http.api.collections_api import AsyncCollectionsApi, SyncCollectionsApi
from qdrant_client.http.api.points_api import AsyncPointsApi, SyncPointsApi
from qdrant_client.http.api.service_api import AsyncServiceApi, SyncServiceApi
from qdrant_client.http.api.snapshots_api import AsyncSnapshotsApi, SyncSnapshotsApi
from qdrant_client.http.exceptions import ResponseHandlingException, UnexpectedResponse

ClientT = TypeVar("ClientT", bound="ApiClient")
AsyncClientT = TypeVar("AsyncClientT", bound="AsyncApiClient")


class AsyncApis(Generic[AsyncClientT]):
    """Entry point grouping every asynchronous API wrapper around one client.

    A single ``AsyncApiClient`` is created and handed to each generated API
    class, so all API groups issue their requests through the same client.

    Args:
        host: Base URL passed to ``AsyncApiClient``.
        **kwargs: Extra keyword arguments forwarded unchanged to
            ``AsyncApiClient``.
    """

    def __init__(self, host: str = None, **kwargs: Any):
        shared_client = AsyncApiClient(host, **kwargs)
        self.client = shared_client

        # One wrapper per API group, all bound to the same client instance.
        self.beta_api = AsyncBetaApi(shared_client)
        self.cluster_api = AsyncClusterApi(shared_client)
        self.collections_api = AsyncCollectionsApi(shared_client)
        self.points_api = AsyncPointsApi(shared_client)
        self.service_api = AsyncServiceApi(shared_client)
        self.snapshots_api = AsyncSnapshotsApi(shared_client)

    async def aclose(self) -> None:
        """Close the shared client by awaiting its ``aclose()``."""
        await self.client.aclose()
class SyncApis(Generic[ClientT]):
    """Entry point grouping every synchronous API wrapper around one client.

    A single ``ApiClient`` is created and handed to each generated API class,
    so all API groups issue their requests through the same client.

    Args:
        host: Base URL passed to ``ApiClient``.
        **kwargs: Extra keyword arguments forwarded unchanged to ``ApiClient``.
    """

    def __init__(self, host: str = None, **kwargs: Any):
        shared_client = ApiClient(host, **kwargs)
        self.client = shared_client

        # One wrapper per API group, all bound to the same client instance.
        self.beta_api = SyncBetaApi(shared_client)
        self.cluster_api = SyncClusterApi(shared_client)
        self.collections_api = SyncCollectionsApi(shared_client)
        self.points_api = SyncPointsApi(shared_client)
        self.service_api = SyncServiceApi(shared_client)
        self.snapshots_api = SyncSnapshotsApi(shared_client)

    def close(self) -> None:
        """Close the shared client by calling its ``close()``."""
        self.client.close()
# Generic placeholder for the type a response body is parsed into.
T = TypeVar("T")

# Synchronous pipeline: a "send" callable turns a Request into a Response, and
# a middleware receives the request together with the next "send" in the chain.
Send = Callable[[Request], Response]
MiddlewareT = Callable[[Request, Send], Response]

# Asynchronous counterparts: same shapes, but the results are awaitable.
SendAsync = Callable[[Request], Awaitable[Response]]
AsyncMiddlewareT = Callable[[Request, SendAsync], Awaitable[Response]]
class ApiClient:
    """Synchronous HTTP client used by the generated ``Sync*Api`` classes.

    Requests are built with an ``httpx.Client``, passed through a middleware
    chain, and the JSON body of a successful response is parsed into the
    requested type.

    Args:
        host: Prefix prepended to every request URL. ``None`` is treated as an
            empty prefix.
        **kwargs: Forwarded unchanged to the ``httpx.Client`` constructor.
    """

    def __init__(self, host: str = None, **kwargs: Any) -> None:
        self.host = host
        # Starts as a pass-through; ``add_middleware`` wraps it further.
        self.middleware: MiddlewareT = BaseMiddleware()
        self._client = Client(**kwargs)

    @overload
    def request(
        self, *, type_: Type[T], method: str, url: str, path_params: Dict[str, Any] = None, **kwargs: Any
    ) -> T:
        ...

    @overload  # noqa F811
    def request(
        self, *, type_: None, method: str, url: str, path_params: Dict[str, Any] = None, **kwargs: Any
    ) -> None:
        ...

    def request(  # noqa F811
        self, *, type_: Any, method: str, url: str, path_params: Dict[str, Any] = None, **kwargs: Any
    ) -> Any:
        """Build a request, send it, and return the parsed response body.

        Args:
            type_: Type the JSON response body is parsed into.
            method: HTTP method name.
            url: URL template; ``{placeholders}`` are filled from
                ``path_params`` with ``str.format``.
            path_params: Values for the placeholders in ``url``.
            **kwargs: Forwarded to ``httpx.Client.build_request``.

        Returns:
            The response body parsed as ``type_``.

        Raises:
            ResponseHandlingException: Sending failed or the body did not
                validate against ``type_``.
            UnexpectedResponse: The status code was not 200, 201 or 202.
        """
        if path_params is None:
            path_params = {}
        url = (self.host or "") + url.format(**path_params)

        # Mirror a ``timeout`` query parameter as the HTTP timeout, presumably
        # so the HTTP call does not give up before the server-side operation
        # does. The ``None`` guard tolerates an explicit ``params=None``.
        params = kwargs.get("params")
        if params is not None and "timeout" in params:
            kwargs["timeout"] = int(params["timeout"])

        request = self._client.build_request(method, url, **kwargs)
        return self.send(request, type_)

    @overload
    def request_sync(self, *, type_: Type[T], **kwargs: Any) -> T:
        ...

    @overload  # noqa F811
    def request_sync(self, *, type_: None, **kwargs: Any) -> None:
        ...

    def request_sync(self, *, type_: Any, **kwargs: Any) -> Any:  # noqa F811
        """
        This method is not used by the generated apis, but is included for convenience
        """
        # ``request`` is already synchronous and returns the parsed result, not
        # a coroutine. Passing that result to ``run_until_complete`` raised
        # ``TypeError`` after the HTTP call had been made, so delegate directly.
        return self.request(type_=type_, **kwargs)

    def send(self, request: Request, type_: Type[T]) -> T:
        """Run ``request`` through the middleware chain and parse the response.

        Raises:
            ResponseHandlingException: The body failed validation as ``type_``.
            UnexpectedResponse: The status code was not 200, 201 or 202.
        """
        response = self.middleware(request, self.send_inner)
        if response.status_code in (200, 201, 202):
            try:
                return parse_as_type(response.json(), type_)
            except ValidationError as e:
                raise ResponseHandlingException(e) from e
        raise UnexpectedResponse.for_response(response)

    def send_inner(self, request: Request) -> Response:
        """Send ``request`` with the underlying httpx client.

        Any exception raised while sending is wrapped in
        ``ResponseHandlingException`` so callers deal with a single error type.
        """
        try:
            response = self._client.send(request)
        except Exception as e:
            raise ResponseHandlingException(e) from e
        return response

    def close(self) -> None:
        """Close the underlying httpx client."""
        self._client.close()

    def add_middleware(self, middleware: MiddlewareT) -> None:
        """Wrap the current middleware chain with ``middleware``.

        The newly added middleware runs first. The ``call_next`` it receives
        invokes the previously registered chain, which in turn ends in the
        actual send.
        """
        current_middleware = self.middleware

        def new_middleware(request: Request, call_next: Send) -> Response:
            def inner_send(request: Request) -> Response:
                return current_middleware(request, call_next)

            return middleware(request, inner_send)

        self.middleware = new_middleware
class AsyncApiClient:
    """Asynchronous HTTP client used by the generated ``Async*Api`` classes.

    Requests are built with an ``httpx.AsyncClient``, passed through an async
    middleware chain, and the JSON body of a successful response is parsed
    into the requested type.

    Args:
        host: Prefix prepended to every request URL. ``None`` is treated as an
            empty prefix.
        **kwargs: Forwarded unchanged to the ``httpx.AsyncClient`` constructor.
    """

    def __init__(self, host: str = None, **kwargs: Any) -> None:
        self.host = host
        # Starts as a pass-through; ``add_middleware`` wraps it further.
        self.middleware: AsyncMiddlewareT = BaseAsyncMiddleware()
        self._async_client = AsyncClient(**kwargs)

    @overload
    async def request(
        self, *, type_: Type[T], method: str, url: str, path_params: Dict[str, Any] = None, **kwargs: Any
    ) -> T:
        ...

    @overload  # noqa F811
    async def request(
        self, *, type_: None, method: str, url: str, path_params: Dict[str, Any] = None, **kwargs: Any
    ) -> None:
        ...

    async def request(  # noqa F811
        self, *, type_: Any, method: str, url: str, path_params: Dict[str, Any] = None, **kwargs: Any
    ) -> Any:
        """Build a request, send it, and return the parsed response body.

        Args:
            type_: Type the JSON response body is parsed into.
            method: HTTP method name.
            url: URL template; ``{placeholders}`` are filled from
                ``path_params`` with ``str.format``.
            path_params: Values for the placeholders in ``url``.
            **kwargs: Forwarded to ``httpx.AsyncClient.build_request``.

        Returns:
            The response body parsed as ``type_``.

        Raises:
            ResponseHandlingException: Sending failed or the body did not
                validate against ``type_``.
            UnexpectedResponse: The status code was not 200, 201 or 202.
        """
        if path_params is None:
            path_params = {}
        url = (self.host or "") + url.format(**path_params)

        # Keep parity with the synchronous ``ApiClient.request``: mirror a
        # ``timeout`` query parameter as the HTTP timeout, presumably so the
        # HTTP call does not give up before the server-side operation does.
        params = kwargs.get("params")
        if params is not None and "timeout" in params:
            kwargs["timeout"] = int(params["timeout"])

        request = self._async_client.build_request(method, url, **kwargs)
        return await self.send(request, type_)

    @overload
    def request_sync(self, *, type_: Type[T], **kwargs: Any) -> T:
        ...

    @overload  # noqa F811
    def request_sync(self, *, type_: None, **kwargs: Any) -> None:
        ...

    def request_sync(self, *, type_: Any, **kwargs: Any) -> Any:  # noqa F811
        """
        This method is not used by the generated apis, but is included for convenience
        """
        # NOTE: drives the coroutine on the current event loop, so it must not
        # be called from code that is already running inside that loop.
        return get_event_loop().run_until_complete(self.request(type_=type_, **kwargs))

    async def send(self, request: Request, type_: Type[T]) -> T:
        """Run ``request`` through the middleware chain and parse the response.

        Raises:
            ResponseHandlingException: The body failed validation as ``type_``.
            UnexpectedResponse: The status code was not 200, 201 or 202.
        """
        response = await self.middleware(request, self.send_inner)
        if response.status_code in (200, 201, 202):
            try:
                return parse_as_type(response.json(), type_)
            except ValidationError as e:
                raise ResponseHandlingException(e) from e
        raise UnexpectedResponse.for_response(response)

    async def send_inner(self, request: Request) -> Response:
        """Send ``request`` with the underlying httpx async client.

        Any exception raised while sending is wrapped in
        ``ResponseHandlingException`` so callers deal with a single error type.
        """
        try:
            response = await self._async_client.send(request)
        except Exception as e:
            raise ResponseHandlingException(e) from e
        return response

    async def aclose(self) -> None:
        """Close the underlying httpx async client."""
        await self._async_client.aclose()

    def add_middleware(self, middleware: AsyncMiddlewareT) -> None:
        """Wrap the current middleware chain with ``middleware``.

        The newly added middleware runs first. The ``call_next`` it receives
        invokes the previously registered chain, which in turn ends in the
        actual send.
        """
        current_middleware = self.middleware

        async def new_middleware(request: Request, call_next: SendAsync) -> Response:
            async def inner_send(request: Request) -> Response:
                return await current_middleware(request, call_next)

            return await middleware(request, inner_send)

        self.middleware = new_middleware
class BaseAsyncMiddleware:
    """Pass-through async middleware: forwards the request without changes."""

    async def __call__(self, request: Request, call_next: SendAsync) -> Response:
        """Await ``call_next`` on the untouched request and return its response."""
        response = await call_next(request)
        return response
class BaseMiddleware:
    """Pass-through middleware: forwards the request without changes."""

    def __call__(self, request: Request, call_next: Send) -> Response:
        """Invoke ``call_next`` on the untouched request and return its response."""
        response = call_next(request)
        return response
@lru_cache(maxsize=None)
def _get_parsing_type(type_: Any, source: str) -> Any:
    """Create a pydantic model with one required field ``obj`` of type ``type_``.

    Results are memoized without bound by ``lru_cache``, so each distinct
    ``(type_, source)`` pair builds its model only once. Both arguments
    therefore have to be hashable.

    Args:
        type_: Type assigned to the model's ``obj`` field.
        source: Label embedded in the generated model's name.

    Returns:
        The dynamically created model class.
    """
    # Imported lazily, as in the original, rather than at module import time.
    from pydantic.main import create_model

    # Fall back to the string form for types that lack ``__name__``.
    display_name = getattr(type_, "__name__", str(type_))
    model_name = f"ParsingModel[{display_name}] (for {source})"

    # ``...`` marks the field as required.
    field_definition = (type_, ...)
    return create_model(model_name, obj=field_definition)
def parse_as_type(obj: Any, type_: Type[T]) -> T:
    """Validate ``obj`` against ``type_`` and return the parsed value.

    The value is placed in the ``obj`` field of a cached wrapper model built
    for ``type_``, and the field is read back after the model is constructed.

    Args:
        obj: Raw value to parse, e.g. a decoded JSON body.
        type_: Target type of the parsed value.

    Returns:
        The content of the wrapper model's ``obj`` field.
    """
    wrapper_cls = _get_parsing_type(type_, source=parse_as_type.__name__)
    parsed = wrapper_cls(obj=obj)
    return parsed.obj

Qdrant

Learn more about the Qdrant vector search project and ecosystem

Discover Qdrant

Similarity Learning

Explore practical problem solving with Similarity Learning

Learn Similarity Learning

Community

Find people dealing with similar problems and get answers to your questions

Join Community