from asyncio import get_event_loop
from functools import lru_cache
from typing import Any, Awaitable, Callable, Dict, Generic, Type, TypeVar, overload
from httpx import AsyncClient, Client, Request, Response
from pydantic import ValidationError
from qdrant_client.http.api.beta_api import AsyncBetaApi, SyncBetaApi
from qdrant_client.http.api.cluster_api import AsyncClusterApi, SyncClusterApi
from qdrant_client.http.api.collections_api import AsyncCollectionsApi, SyncCollectionsApi
from qdrant_client.http.api.points_api import AsyncPointsApi, SyncPointsApi
from qdrant_client.http.api.service_api import AsyncServiceApi, SyncServiceApi
from qdrant_client.http.api.snapshots_api import AsyncSnapshotsApi, SyncSnapshotsApi
from qdrant_client.http.exceptions import ResponseHandlingException, UnexpectedResponse
# Type variable bound to ``ApiClient`` (a forward reference: the class is defined further down in
# this module). Used as the generic parameter of ``SyncApis``.
ClientT = TypeVar("ClientT", bound="ApiClient")
# Async counterpart, bound to ``AsyncApiClient``. Used as the generic parameter of ``AsyncApis``.
AsyncClientT = TypeVar("AsyncClientT", bound="AsyncApiClient")
class AsyncApis(Generic[AsyncClientT]):
    """Bundle of every asynchronous API wrapper, all sharing one ``AsyncApiClient``."""

    def __init__(self, host: str = None, **kwargs: Any):
        """Build the shared client and hand it to each API wrapper.

        :param host: Passed to ``AsyncApiClient`` as its host.
        :param kwargs: Passed through to ``AsyncApiClient``.
        """
        api_client = AsyncApiClient(host, **kwargs)
        self.client = api_client

        # Every wrapper receives the very same client instance.
        self.beta_api = AsyncBetaApi(api_client)
        self.cluster_api = AsyncClusterApi(api_client)
        self.collections_api = AsyncCollectionsApi(api_client)
        self.points_api = AsyncPointsApi(api_client)
        self.service_api = AsyncServiceApi(api_client)
        self.snapshots_api = AsyncSnapshotsApi(api_client)

    async def aclose(self) -> None:
        """Close the shared client by awaiting its ``aclose``."""
        shared_client = self.client
        await shared_client.aclose()
class SyncApis(Generic[ClientT]):
    """Bundle of every synchronous API wrapper, all sharing one ``ApiClient``."""

    def __init__(self, host: str = None, **kwargs: Any):
        """Build the shared client and hand it to each API wrapper.

        :param host: Passed to ``ApiClient`` as its host.
        :param kwargs: Passed through to ``ApiClient``.
        """
        api_client = ApiClient(host, **kwargs)
        self.client = api_client

        # Every wrapper receives the very same client instance.
        self.beta_api = SyncBetaApi(api_client)
        self.cluster_api = SyncClusterApi(api_client)
        self.collections_api = SyncCollectionsApi(api_client)
        self.points_api = SyncPointsApi(api_client)
        self.service_api = SyncServiceApi(api_client)
        self.snapshots_api = SyncSnapshotsApi(api_client)

    def close(self) -> None:
        """Close the shared client by calling its ``close``."""
        shared_client = self.client
        shared_client.close()
# Unconstrained type variable: the type a successful response body is parsed into.
T = TypeVar("T")
# Synchronous "send" step: takes a request and returns its response.
Send = Callable[[Request], Response]
# Asynchronous "send" step: takes a request and returns an awaitable response.
SendAsync = Callable[[Request], Awaitable[Response]]
# Synchronous middleware: receives the request plus the next ``Send`` step and returns a response.
MiddlewareT = Callable[[Request, Send], Response]
# Asynchronous middleware: receives the request plus the next ``SendAsync`` step; must be awaited.
AsyncMiddlewareT = Callable[[Request, SendAsync], Awaitable[Response]]
class ApiClient:
    """Synchronous HTTP transport used by the ``Sync*Api`` wrappers.

    Wraps an ``httpx.Client``, passes every request through a middleware chain and parses
    successful responses into the requested type.
    """

    def __init__(self, host: str = None, **kwargs: Any) -> None:
        """Create the client.

        :param host: Prefix prepended to every request URL; ``None`` is treated as ``""``.
        :param kwargs: Forwarded unchanged to ``httpx.Client``.
        """
        self.host = host
        self.middleware: MiddlewareT = BaseMiddleware()
        self._client = Client(**kwargs)

    @overload
    def request(self, *, type_: Type[T], method: str, url: str, path_params: Dict[str, Any] = None, **kwargs: Any) -> T:
        ...

    @overload  # noqa F811
    def request(self, *, type_: None, method: str, url: str, path_params: Dict[str, Any] = None, **kwargs: Any) -> None:
        ...

    def request(  # noqa F811
        self, *, type_: Any, method: str, url: str, path_params: Dict[str, Any] = None, **kwargs: Any
    ) -> Any:
        """Build a request, send it and return the parsed response.

        :param type_: Type the response JSON is parsed into.
        :param method: HTTP method.
        :param url: URL template; ``{name}`` placeholders are filled from ``path_params``.
        :param path_params: Values for the placeholders in ``url``.
        :param kwargs: Forwarded to ``httpx.Client.build_request``.
        :raises ResponseHandlingException: If sending fails or the body does not validate.
        :raises UnexpectedResponse: If the status code is not 200, 201 or 202.
        """
        if path_params is None:
            path_params = {}
        url = (self.host or "") + url.format(**path_params)

        # A ``timeout`` query parameter is also applied as the HTTP timeout of this request.
        # ``params`` may be absent or None, in which case there is nothing to propagate.
        params = kwargs.get("params")
        if params and "timeout" in params:
            kwargs["timeout"] = int(params["timeout"])

        request = self._client.build_request(method, url, **kwargs)
        return self.send(request, type_)

    @overload
    def request_sync(self, *, type_: Type[T], **kwargs: Any) -> T:
        ...

    @overload  # noqa F811
    def request_sync(self, *, type_: None, **kwargs: Any) -> None:
        ...

    def request_sync(self, *, type_: Any, **kwargs: Any) -> Any:  # noqa F811
        """
        This method is not used by the generated apis, but is included for convenience
        """
        # ``request`` is already synchronous on this class, so its result is returned directly.
        # Feeding that result to an event loop's ``run_until_complete`` would raise TypeError.
        return self.request(type_=type_, **kwargs)

    def send(self, request: Request, type_: Type[T]) -> T:
        """Run ``request`` through the middleware chain and parse the response.

        :param request: Prepared request to send.
        :param type_: Type the response JSON is parsed into.
        :raises ResponseHandlingException: If the response body fails validation.
        :raises UnexpectedResponse: If the status code is not 200, 201 or 202.
        """
        response = self.middleware(request, self.send_inner)
        if response.status_code in [200, 201, 202]:
            try:
                return parse_as_type(response.json(), type_)
            except ValidationError as e:
                raise ResponseHandlingException(e) from e
        raise UnexpectedResponse.for_response(response)

    def send_inner(self, request: Request) -> Response:
        """Send ``request`` with the underlying client; this is the innermost step of the chain.

        :raises ResponseHandlingException: Wraps any exception raised while sending.
        """
        try:
            response = self._client.send(request)
        except Exception as e:
            raise ResponseHandlingException(e) from e
        return response

    def close(self) -> None:
        """Close the underlying client."""
        self._client.close()

    def add_middleware(self, middleware: MiddlewareT) -> None:
        """Wrap the current middleware chain with ``middleware``.

        The newly added middleware runs first; its ``call_next`` continues into the
        previously installed chain.
        """
        current_middleware = self.middleware

        def new_middleware(request: Request, call_next: Send) -> Response:
            def inner_send(request: Request) -> Response:
                return current_middleware(request, call_next)

            return middleware(request, inner_send)

        self.middleware = new_middleware
class AsyncApiClient:
    """Asynchronous HTTP transport used by the ``Async*Api`` wrappers.

    Wraps an ``httpx.AsyncClient``, passes every request through an async middleware chain and
    parses successful responses into the requested type.
    """

    def __init__(self, host: str = None, **kwargs: Any) -> None:
        """Create the client.

        :param host: Prefix prepended to every request URL; ``None`` is treated as ``""``.
        :param kwargs: Forwarded unchanged to ``httpx.AsyncClient``.
        """
        self.host = host
        self.middleware: AsyncMiddlewareT = BaseAsyncMiddleware()
        self._async_client = AsyncClient(**kwargs)

    @overload
    async def request(
        self, *, type_: Type[T], method: str, url: str, path_params: Dict[str, Any] = None, **kwargs: Any
    ) -> T:
        ...

    @overload  # noqa F811
    async def request(
        self, *, type_: None, method: str, url: str, path_params: Dict[str, Any] = None, **kwargs: Any
    ) -> None:
        ...

    async def request(  # noqa F811
        self, *, type_: Any, method: str, url: str, path_params: Dict[str, Any] = None, **kwargs: Any
    ) -> Any:
        """Build a request, send it and return the parsed response.

        :param type_: Type the response JSON is parsed into.
        :param method: HTTP method.
        :param url: URL template; ``{name}`` placeholders are filled from ``path_params``.
        :param path_params: Values for the placeholders in ``url``.
        :param kwargs: Forwarded to ``httpx.AsyncClient.build_request``.
        :raises ResponseHandlingException: If sending fails or the body does not validate.
        :raises UnexpectedResponse: If the status code is not 200, 201 or 202.
        """
        if path_params is None:
            path_params = {}
        url = (self.host or "") + url.format(**path_params)

        # Same rule as ``ApiClient.request``: a ``timeout`` query parameter is also applied as
        # the HTTP timeout of this request. ``params`` may be absent or None.
        params = kwargs.get("params")
        if params and "timeout" in params:
            kwargs["timeout"] = int(params["timeout"])

        request = self._async_client.build_request(method, url, **kwargs)
        return await self.send(request, type_)

    @overload
    def request_sync(self, *, type_: Type[T], **kwargs: Any) -> T:
        ...

    @overload  # noqa F811
    def request_sync(self, *, type_: None, **kwargs: Any) -> None:
        ...

    def request_sync(self, *, type_: Any, **kwargs: Any) -> Any:  # noqa F811
        """
        This method is not used by the generated apis, but is included for convenience
        """
        # Drives the ``request`` coroutine to completion on the loop from ``get_event_loop()``.
        return get_event_loop().run_until_complete(self.request(type_=type_, **kwargs))

    async def send(self, request: Request, type_: Type[T]) -> T:
        """Run ``request`` through the middleware chain and parse the response.

        :param request: Prepared request to send.
        :param type_: Type the response JSON is parsed into.
        :raises ResponseHandlingException: If the response body fails validation.
        :raises UnexpectedResponse: If the status code is not 200, 201 or 202.
        """
        response = await self.middleware(request, self.send_inner)
        if response.status_code in [200, 201, 202]:
            try:
                return parse_as_type(response.json(), type_)
            except ValidationError as e:
                raise ResponseHandlingException(e) from e
        raise UnexpectedResponse.for_response(response)

    async def send_inner(self, request: Request) -> Response:
        """Send ``request`` with the underlying client; this is the innermost step of the chain.

        :raises ResponseHandlingException: Wraps any exception raised while sending.
        """
        try:
            response = await self._async_client.send(request)
        except Exception as e:
            raise ResponseHandlingException(e) from e
        return response

    async def aclose(self) -> None:
        """Close the underlying async client."""
        await self._async_client.aclose()

    def add_middleware(self, middleware: AsyncMiddlewareT) -> None:
        """Wrap the current middleware chain with ``middleware``.

        The newly added middleware runs first; its ``call_next`` continues into the
        previously installed chain.
        """
        current_middleware = self.middleware

        async def new_middleware(request: Request, call_next: SendAsync) -> Response:
            async def inner_send(request: Request) -> Response:
                return await current_middleware(request, call_next)

            return await middleware(request, inner_send)

        self.middleware = new_middleware
class BaseAsyncMiddleware:
    """Pass-through async middleware: hands the request to the next step unchanged."""

    async def __call__(self, request: Request, call_next: SendAsync) -> Response:
        """Await ``call_next(request)`` and return its response as-is."""
        response = await call_next(request)
        return response
class BaseMiddleware:
    """Pass-through middleware: hands the request to the next step unchanged."""

    def __call__(self, request: Request, call_next: Send) -> Response:
        """Call ``call_next(request)`` and return its response as-is."""
        response = call_next(request)
        return response
@lru_cache(maxsize=None)
def _get_parsing_type(type_: Any, source: str) -> Any:
    """Create a pydantic model with one required field ``obj`` annotated as ``type_``.

    Results are memoised by ``lru_cache``, so the same arguments return the same model class.

    :param type_: Annotation for the ``obj`` field.
    :param source: Label embedded in the generated model's name.
    """
    from pydantic.main import create_model

    # Use the type's ``__name__`` when it has one, otherwise its string form.
    fallback_label = str(type_)
    label = getattr(type_, "__name__", fallback_label)
    model_name = f"ParsingModel[{label}] (for {source})"
    return create_model(model_name, obj=(type_, ...))
def parse_as_type(obj: Any, type_: Type[T]) -> T:
    """Validate ``obj`` against ``type_`` and return the parsed value.

    The value is passed as the ``obj`` field of the model built by ``_get_parsing_type`` and
    read back from the resulting instance.

    :param obj: Raw value to parse.
    :param type_: Target type.
    """
    wrapper_cls = _get_parsing_type(type_, source=parse_as_type.__name__)
    validated = wrapper_cls(obj=obj)
    return validated.obj