Shortcuts

Source code for qdrant_client.local.payload_filters

from datetime import date, datetime, timezone
from typing import Any, Optional, Union, Dict

import numpy as np

from qdrant_client.http import models
from qdrant_client.local import datetime_utils
from qdrant_client.local.geo import boolean_point_in_polygon, geo_distance
from qdrant_client.local.payload_value_extractor import value_by_key


[docs]def get_value_counts(values: list[Any]) -> list[int]: counts = [] if all(value is None for value in values): counts.append(0) else: for value in values: if value is None: counts.append(0) elif hasattr(value, "__len__") and not isinstance(value, str): counts.append(len(value)) else: counts.append(1) return counts
[docs]def check_values_count(condition: models.ValuesCount, values: Optional[list[Any]]) -> bool: if values is None: return False counts = get_value_counts(values) if condition.lt is not None and all(count >= condition.lt for count in counts): return False if condition.lte is not None and all(count > condition.lte for count in counts): return False if condition.gt is not None and all(count <= condition.gt for count in counts): return False if condition.gte is not None and all(count < condition.gte for count in counts): return False return True
[docs]def check_geo_radius(condition: models.GeoRadius, values: Any) -> bool: if isinstance(values, dict) and "lat" in values and "lon" in values: lat = values["lat"] lon = values["lon"] distance = geo_distance( lon1=lon, lat1=lat, lon2=condition.center.lon, lat2=condition.center.lat, ) return distance < condition.radius return False
[docs]def check_geo_bounding_box(condition: models.GeoBoundingBox, values: Any) -> bool: if isinstance(values, dict) and "lat" in values and "lon" in values: lat = values["lat"] lon = values["lon"] # handle anti-meridian crossing case if condition.top_left.lon > condition.bottom_right.lon: longitude_condition = ( condition.top_left.lon <= lon <= 180 or -180 <= lon <= condition.bottom_right.lon ) else: longitude_condition = condition.top_left.lon <= lon <= condition.bottom_right.lon latitude_condition = condition.top_left.lat >= lat >= condition.bottom_right.lat return longitude_condition and latitude_condition return False
[docs]def check_geo_polygon(condition: models.GeoPolygon, values: Any) -> bool: if isinstance(values, dict) and "lat" in values and "lon" in values: lat = values["lat"] lon = values["lon"] exterior = [(point.lat, point.lon) for point in condition.exterior.points] interiors = [] if condition.interiors is not None: interiors = [ [(point.lat, point.lon) for point in interior.points] for interior in condition.interiors ] return boolean_point_in_polygon(point=(lat, lon), exterior=exterior, interiors=interiors) return False
[docs]def check_range_interface(condition: models.RangeInterface, value: Any) -> bool: if isinstance(condition, models.Range): return check_range(condition, value) if isinstance(condition, models.DatetimeRange): return check_datetime_range(condition, value) return False
[docs]def check_range(condition: models.Range, value: Any) -> bool: if not isinstance(value, (int, float)): return False return ( (condition.lt is None or value < condition.lt) and (condition.lte is None or value <= condition.lte) and (condition.gt is None or value > condition.gt) and (condition.gte is None or value >= condition.gte) )
[docs]def check_datetime_range(condition: models.DatetimeRange, value: Any) -> bool: def make_condition_tz_aware(dt: Optional[Union[datetime, date]]) -> Optional[datetime]: if isinstance(dt, date) and not isinstance(dt, datetime): dt = datetime.combine(dt, datetime.min.time()) if dt is None or dt.tzinfo is not None: return dt # Assume UTC if no timezone is provided return dt.replace(tzinfo=timezone.utc) if not isinstance(value, str): return False dt = datetime_utils.parse(value) if dt is None: return False condition.lt = make_condition_tz_aware(condition.lt) condition.lte = make_condition_tz_aware(condition.lte) condition.gt = make_condition_tz_aware(condition.gt) condition.gte = make_condition_tz_aware(condition.gte) return ( (condition.lt is None or dt < condition.lt) and (condition.lte is None or dt <= condition.lte) and (condition.gt is None or dt > condition.gt) and (condition.gte is None or dt >= condition.gte) )
[docs]def check_match(condition: models.Match, value: Any) -> bool: if isinstance(condition, models.MatchValue): return value == condition.value if isinstance(condition, models.MatchText): return value is not None and condition.text in value if isinstance(condition, models.MatchAny): return value in condition.any if isinstance(condition, models.MatchExcept): return value not in condition.except_ raise ValueError(f"Unknown match condition: {condition}")
[docs]def check_nested_filter(nested_filter: models.Filter, values: list[Any]) -> bool: return any(check_filter(nested_filter, v, point_id=-1, has_vector={}) for v in values)
[docs]def check_condition( condition: models.Condition, payload: dict, point_id: models.ExtendedPointId, has_vector: Dict[str, bool], ) -> bool: if isinstance(condition, models.IsNullCondition): values = value_by_key(payload, condition.is_null.key, flat=False) if values is None: return False if any(v is None for v in values): return True elif isinstance(condition, models.IsEmptyCondition): values = value_by_key(payload, condition.is_empty.key, flat=False) if ( values is None or len(values) == 0 or all((v is None or (isinstance(v, list) and len(v) == 0)) for v in values) ): return True elif isinstance(condition, models.HasIdCondition): if point_id in condition.has_id: return True elif isinstance(condition, models.HasVectorCondition): if condition.has_vector in has_vector and has_vector[condition.has_vector]: return True elif isinstance(condition, models.FieldCondition): values = value_by_key(payload, condition.key) if condition.match is not None: if values is None: return False return any(check_match(condition.match, v) for v in values) if condition.range is not None: if values is None: return False return any(check_range_interface(condition.range, v) for v in values) if condition.geo_bounding_box is not None: if values is None: return False return any(check_geo_bounding_box(condition.geo_bounding_box, v) for v in values) if condition.geo_radius is not None: if values is None: return False return any(check_geo_radius(condition.geo_radius, v) for v in values) if condition.values_count is not None: values = value_by_key(payload, condition.key, flat=False) return check_values_count(condition.values_count, values) if condition.geo_polygon is not None: if values is None: return False return any(check_geo_polygon(condition.geo_polygon, v) for v in values) elif isinstance(condition, models.NestedCondition): values = value_by_key(payload, condition.nested.key) if values is None: return False return check_nested_filter(condition.nested.filter, values) elif isinstance(condition, models.Filter): return check_filter(condition, payload, point_id, has_vector) else: raise ValueError(f"Unknown condition: {condition}") return False
[docs]def check_must( conditions: list[models.Condition], payload: dict, point_id: models.ExtendedPointId, has_vector: Dict[str, bool], ) -> bool: return all( check_condition(condition, payload, point_id, has_vector) for condition in conditions )
[docs]def check_must_not( conditions: list[models.Condition], payload: dict, point_id: models.ExtendedPointId, has_vector: Dict[str, bool], ) -> bool: return all( not check_condition(condition, payload, point_id, has_vector) for condition in conditions )
[docs]def check_should( conditions: list[models.Condition], payload: dict, point_id: models.ExtendedPointId, has_vector: Dict[str, bool], ) -> bool: return any( check_condition(condition, payload, point_id, has_vector) for condition in conditions )
[docs]def check_min_should( conditions: list[models.Condition], payload: dict, point_id: models.ExtendedPointId, vectors: Dict[str, Any], min_count: int, ) -> bool: return ( sum(check_condition(condition, payload, point_id, vectors) for condition in conditions) >= min_count )
[docs]def check_filter( payload_filter: models.Filter, payload: dict, point_id: models.ExtendedPointId, has_vector: Dict[str, bool], ) -> bool: if payload_filter.must is not None: if not check_must(payload_filter.must, payload, point_id, has_vector): return False if payload_filter.must_not is not None: if not check_must_not(payload_filter.must_not, payload, point_id, has_vector): return False if payload_filter.should is not None: if not check_should(payload_filter.should, payload, point_id, has_vector): return False if payload_filter.min_should is not None: if not check_min_should( payload_filter.min_should.conditions, payload, point_id, has_vector, payload_filter.min_should.min_count, ): return False return True
[docs]def calculate_payload_mask( payloads: list[dict], payload_filter: Optional[models.Filter], ids_inv: list[models.ExtendedPointId], deleted_per_vector: Dict[str, np.ndarray], ) -> np.ndarray: if payload_filter is None: return np.ones(len(payloads), dtype=bool) mask = np.zeros(len(payloads), dtype=bool) for i, payload in enumerate(payloads): has_vector = {} for vector_name, deleted in deleted_per_vector.items(): if not deleted[i]: has_vector[vector_name] = True if check_filter(payload_filter, payload, ids_inv[i], has_vector): mask[i] = True return mask

Qdrant

Learn more about Qdrant vector search project and ecosystem

Discover Qdrant

Similarity Learning

Explore practical problem solving with Similarity Learning

Learn Similarity Learning

Community

Find people dealing with similar problems and get answers to your questions

Join Community