Spaces:
Running
Running
# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"). You | |
# may not use this file except in compliance with the License. A copy of | |
# the License is located at | |
# | |
# https://aws.amazon.com/apache2.0/ | |
# | |
# or in the "license" file accompanying this file. This file is | |
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF | |
# ANY KIND, either express or implied. See the License for the specific | |
# language governing permissions and limitations under the License. | |
import re | |
from collections import namedtuple | |
from boto3.exceptions import ( | |
DynamoDBNeedsConditionError, | |
DynamoDBNeedsKeyConditionError, | |
DynamoDBOperationNotSupportedError, | |
) | |
# Matches each run of characters that contains none of '.', '[' or ']' and
# that is not sitting inside a bracketed index: the negative lookahead rejects
# a run when a ']' can be reached from its end without crossing a '['.
# For 'a.b[0]' this yields the parts 'a' and 'b' and leaves '[0]' untouched.
ATTR_NAME_REGEX = re.compile(r'[^.\[\]]+(?![^\[]*\])')
class ConditionBase:
    """Common base for all condition objects.

    Subclasses set ``expression_format`` and ``expression_operator``; the
    positional arguments passed to the constructor are kept as the
    condition's values and returned by :meth:`get_expression`.
    """

    expression_format = ''
    expression_operator = ''
    has_grouped_values = False

    def __init__(self, *values):
        # Stored as the tuple of positional arguments, in call order.
        self._values = values

    def __and__(self, other):
        """Support ``self & other``; only another condition is accepted."""
        if isinstance(other, ConditionBase):
            return And(self, other)
        raise DynamoDBOperationNotSupportedError('AND', other)

    def __or__(self, other):
        """Support ``self | other``; only another condition is accepted."""
        if isinstance(other, ConditionBase):
            return Or(self, other)
        raise DynamoDBOperationNotSupportedError('OR', other)

    def __invert__(self):
        """Support ``~self`` by wrapping this condition in ``Not``."""
        return Not(self)

    def get_expression(self):
        """Return the format string, operator and values of this condition.

        :rtype: dict
        :returns: A dictionary with the keys ``format``, ``operator`` and
            ``values``.
        """
        expression = {}
        expression['format'] = self.expression_format
        expression['operator'] = self.expression_operator
        expression['values'] = self._values
        return expression

    def __eq__(self, other):
        # Equal only when ``other`` is an instance of this condition's own
        # type and carries equal values.
        if not isinstance(other, type(self)):
            return False
        same_values = self._values == other._values
        return bool(same_values)

    def __ne__(self, other):
        equal = self.__eq__(other)
        return not equal
class AttributeBase:
    """Base class for named attributes that conditions are built from.

    The logical operators ``&``, ``|`` and ``~`` are rejected on a bare
    attribute; a condition has to be created first with one of the methods
    below.
    """

    def __init__(self, name):
        self.name = name

    def __and__(self, value):
        raise DynamoDBOperationNotSupportedError('AND', self)

    def __or__(self, value):
        raise DynamoDBOperationNotSupportedError('OR', self)

    def __invert__(self):
        raise DynamoDBOperationNotSupportedError('NOT', self)

    def eq(self, value):
        """Build an ``Equals`` condition for this attribute.

        :param value: The value the attribute must be equal to.
        """
        return Equals(self, value)

    def lt(self, value):
        """Build a ``LessThan`` condition for this attribute.

        :param value: The value the attribute must be less than.
        """
        return LessThan(self, value)

    def lte(self, value):
        """Build a ``LessThanEquals`` condition for this attribute.

        :param value: The value the attribute must be less than or equal to.
        """
        return LessThanEquals(self, value)

    def gt(self, value):
        """Build a ``GreaterThan`` condition for this attribute.

        :param value: The value the attribute must be greater than.
        """
        return GreaterThan(self, value)

    def gte(self, value):
        """Build a ``GreaterThanEquals`` condition for this attribute.

        :param value: The value the attribute must be greater than or equal
            to.
        """
        return GreaterThanEquals(self, value)

    def begins_with(self, value):
        """Build a ``BeginsWith`` condition for this attribute.

        :param value: The value the attribute must begin with.
        """
        return BeginsWith(self, value)

    def between(self, low_value, high_value):
        """Build a ``Between`` condition for this attribute.

        :param low_value: The value the attribute must be greater than or
            equal to.
        :param high_value: The value the attribute must be less than or
            equal to.
        """
        return Between(self, low_value, high_value)

    def __eq__(self, other):
        # Attributes compare equal when ``other`` is an instance of this
        # attribute's own type and the names match.
        if not isinstance(other, type(self)):
            return False
        return self.name == other.name

    def __ne__(self, other):
        equal = self.__eq__(other)
        return not equal
class ConditionAttributeBase(ConditionBase, AttributeBase):
    """Base for conditions that also expose the attribute methods.

    ``Size`` is one such condition: on its own it is incomplete, and another
    ``AttributeBase`` method such as ``eq()`` has to be applied to it to
    finish the condition.
    """

    def __init__(self, *values):
        # The first value is assumed to be the attribute the condition is
        # applied to; its name is reused as this object's attribute name.
        source_attribute = values[0]
        ConditionBase.__init__(self, *values)
        AttributeBase.__init__(self, source_attribute.name)

    def __eq__(self, other):
        # Both the condition values and the attribute name have to match.
        if not ConditionBase.__eq__(self, other):
            return False
        return AttributeBase.__eq__(self, other)

    def __ne__(self, other):
        equal = self.__eq__(other)
        return not equal
class ComparisonCondition(ConditionBase):
    """Base for two-operand conditions rendered as ``{0} {operator} {1}``."""

    expression_format = '{0} {operator} {1}'
class Equals(ComparisonCondition):
    """Comparison condition using the ``=`` operator."""

    expression_operator = '='
class NotEquals(ComparisonCondition):
    """Comparison condition using the ``<>`` operator."""

    expression_operator = '<>'
class LessThan(ComparisonCondition):
    """Comparison condition using the ``<`` operator."""

    expression_operator = '<'
class LessThanEquals(ComparisonCondition):
    """Comparison condition using the ``<=`` operator."""

    expression_operator = '<='
class GreaterThan(ComparisonCondition):
    """Comparison condition using the ``>`` operator."""

    expression_operator = '>'
class GreaterThanEquals(ComparisonCondition):
    """Comparison condition using the ``>=`` operator."""

    expression_operator = '>='
class In(ComparisonCondition):
    """Comparison condition using the ``IN`` operator.

    ``has_grouped_values`` is set, so the right-hand value is treated as a
    group of values rather than as one single value.
    """

    has_grouped_values = True
    expression_operator = 'IN'
class Between(ConditionBase):
    """Condition rendered as ``{0} BETWEEN {1} AND {2}``."""

    expression_format = '{0} {operator} {1} AND {2}'
    expression_operator = 'BETWEEN'
class BeginsWith(ConditionBase):
    """Condition rendered as the function call ``begins_with({0}, {1})``."""

    expression_format = '{operator}({0}, {1})'
    expression_operator = 'begins_with'
class Contains(ConditionBase):
    """Condition rendered as the function call ``contains({0}, {1})``."""

    expression_format = '{operator}({0}, {1})'
    expression_operator = 'contains'
class Size(ConditionAttributeBase):
    """Condition rendered as ``size({0})``.

    Being a ``ConditionAttributeBase``, it also offers the attribute methods
    (``eq()``, ``lt()``, ...) used to complete the condition.
    """

    expression_format = '{operator}({0})'
    expression_operator = 'size'
class AttributeType(ConditionBase):
    """Condition rendered as the function call ``attribute_type({0}, {1})``."""

    expression_format = '{operator}({0}, {1})'
    expression_operator = 'attribute_type'
class AttributeExists(ConditionBase):
    """Condition rendered as the function call ``attribute_exists({0})``."""

    expression_format = '{operator}({0})'
    expression_operator = 'attribute_exists'
class AttributeNotExists(ConditionBase):
    """Condition rendered as the function call ``attribute_not_exists({0})``."""

    expression_format = '{operator}({0})'
    expression_operator = 'attribute_not_exists'
class And(ConditionBase):
    """Two operands joined as ``({0} AND {1})``."""

    expression_format = '({0} {operator} {1})'
    expression_operator = 'AND'
class Or(ConditionBase):
    """Two operands joined as ``({0} OR {1})``."""

    expression_format = '({0} {operator} {1})'
    expression_operator = 'OR'
class Not(ConditionBase):
    """A single operand negated as ``(NOT {0})``."""

    expression_format = '({operator} {0})'
    expression_operator = 'NOT'
class Key(AttributeBase):
    """Attribute type required by the expression builder for key conditions.

    It adds nothing to ``AttributeBase``; the builder checks for this type
    when it is asked to build a key condition.
    """
class Attr(AttributeBase):
    """Represents a DynamoDB item's attribute."""

    def ne(self, value):
        """Build a ``NotEquals`` condition for this attribute.

        :param value: The value the attribute must not be equal to.
        """
        return NotEquals(self, value)

    def is_in(self, value):
        """Build an ``In`` condition for this attribute.

        :type value: list
        :param value: The values the attribute may be one of.
        """
        return In(self, value)

    def exists(self):
        """Build an ``AttributeExists`` condition for this attribute."""
        return AttributeExists(self)

    def not_exists(self):
        """Build an ``AttributeNotExists`` condition for this attribute."""
        return AttributeNotExists(self)

    def contains(self, value):
        """Build a ``Contains`` condition for this attribute.

        :param value: The value the attribute must contain.
        """
        return Contains(self, value)

    def size(self):
        """Build a ``Size`` condition for this attribute.

        The returned object is not a complete condition yet: another
        ``AttributeBase`` method (for example ``eq()``) has to be called on
        it to obtain a valid DynamoDB condition.
        """
        return Size(self)

    def attribute_type(self, value):
        """Build an ``AttributeType`` condition for this attribute.

        :param value: The type the attribute must have.
        """
        return AttributeType(self, value)
# Result of building a condition: the expression string followed by the
# name-placeholder mapping and the value-placeholder mapping.
_BUILT_CONDITION_FIELDS = (
    'condition_expression',
    'attribute_name_placeholders',
    'attribute_value_placeholders',
)
BuiltConditionExpression = namedtuple(
    'BuiltConditionExpression', _BUILT_CONDITION_FIELDS
)
class ConditionExpressionBuilder:
    """This class is used to build condition expressions with placeholders.

    Two counters are kept on the instance so that every name placeholder
    (``#n0``, ``#n1``, ...) and every value placeholder (``:v0``, ``:v1``,
    ...) handed out is distinct until :meth:`reset` is called.
    """

    def __init__(self):
        self._name_count = 0
        self._value_count = 0
        self._name_placeholder = 'n'
        self._value_placeholder = 'v'

    def _get_name_placeholder(self):
        """Return the name placeholder for the current name counter."""
        return '#' + self._name_placeholder + str(self._name_count)

    def _get_value_placeholder(self):
        """Return the value placeholder for the current value counter."""
        return ':' + self._value_placeholder + str(self._value_count)

    def reset(self):
        """Resets the placeholder name and value counters to zero."""
        self._name_count = 0
        self._value_count = 0

    def build_expression(self, condition, is_key_condition=False):
        """Builds the condition expression and the dictionary of placeholders.

        :type condition: ConditionBase
        :param condition: A condition to be built into a condition expression
            string with any necessary placeholders.

        :type is_key_condition: Boolean
        :param is_key_condition: True if the expression is for a
            KeyConditionExpression. False otherwise.

        :rtype: BuiltConditionExpression
        :returns: A named tuple ``(condition_expression,
            attribute_name_placeholders, attribute_value_placeholders)``:
            a string representing the condition with placeholders inserted
            where necessary, a dictionary of placeholders for attribute
            names, and a dictionary of placeholders for attribute values.
            Here is a sample return value:

            ('#n0 = :v0', {'#n0': 'myattribute'}, {':v0': 'myvalue'})

        :raises DynamoDBNeedsConditionError: If ``condition`` is not a
            ``ConditionBase`` instance.
        """
        if not isinstance(condition, ConditionBase):
            raise DynamoDBNeedsConditionError(condition)
        attribute_name_placeholders = {}
        attribute_value_placeholders = {}
        # The two dictionaries are filled in place while the expression
        # tree is walked.
        condition_expression = self._build_expression(
            condition,
            attribute_name_placeholders,
            attribute_value_placeholders,
            is_key_condition=is_key_condition,
        )
        return BuiltConditionExpression(
            condition_expression=condition_expression,
            attribute_name_placeholders=attribute_name_placeholders,
            attribute_value_placeholders=attribute_value_placeholders,
        )

    def _build_expression(
        self,
        condition,
        attribute_name_placeholders,
        attribute_value_placeholders,
        is_key_condition,
    ):
        """Render one condition, recursing into nested conditions."""
        expression_dict = condition.get_expression()
        # Build the necessary placeholders for each value. Placeholders are
        # built for both attribute names and values.
        replaced_values = [
            self._build_expression_component(
                value,
                attribute_name_placeholders,
                attribute_value_placeholders,
                condition.has_grouped_values,
                is_key_condition,
            )
            for value in expression_dict['values']
        ]
        # Fill out the expression using the operator and the
        # values that have been replaced with placeholders.
        return expression_dict['format'].format(
            *replaced_values, operator=expression_dict['operator']
        )

    def _build_expression_component(
        self,
        value,
        attribute_name_placeholders,
        attribute_value_placeholders,
        has_grouped_values,
        is_key_condition,
    ):
        """Turn one operand of a condition into its expression text.

        :raises DynamoDBNeedsKeyConditionError: If a key condition is being
            built and ``value`` is an attribute that is not a ``Key``.
        """
        # Continue to recurse if the value is a ConditionBase in order
        # to extract out all parts of the expression.
        if isinstance(value, ConditionBase):
            return self._build_expression(
                value,
                attribute_name_placeholders,
                attribute_value_placeholders,
                is_key_condition,
            )
        # If it is not a ConditionBase, we can recurse no further.
        # So we check if it is an attribute and add placeholders for
        # its name.
        if isinstance(value, AttributeBase):
            if is_key_condition and not isinstance(value, Key):
                raise DynamoDBNeedsKeyConditionError(
                    f'Attribute object {value.name} is of type {type(value)}. '
                    f'KeyConditionExpression only supports Attribute objects '
                    f'of type Key'
                )
            return self._build_name_placeholder(
                value, attribute_name_placeholders
            )
        # If it is anything else, we treat it as a value and thus
        # placeholders are needed for the value.
        return self._build_value_placeholder(
            value, attribute_value_placeholders, has_grouped_values
        )

    def _build_name_placeholder(self, value, attribute_name_placeholders):
        """Replace every matched part of ``value.name`` with a placeholder.

        Each part matched by ``ATTR_NAME_REGEX`` gets its own placeholder,
        recorded in ``attribute_name_placeholders``; the rest of the name is
        kept verbatim.
        """

        def substitute(match):
            name_placeholder = self._get_name_placeholder()
            self._name_count += 1
            # Add the placeholder and the matched part to the dictionary of
            # name placeholders.
            attribute_name_placeholders[name_placeholder] = match.group(0)
            return name_placeholder

        # Substituting in a single pass (instead of inserting '%s' markers
        # and %-formatting afterwards) keeps a literal '%' in the unmatched
        # remainder of the name from being read as a format directive.
        return ATTR_NAME_REGEX.sub(substitute, value.name)

    def _build_value_placeholder(
        self, value, attribute_value_placeholders, has_grouped_values=False
    ):
        """Register ``value`` under fresh value placeholder(s).

        :returns: A single placeholder, or a parenthesised, comma separated
            list with one placeholder per element when ``has_grouped_values``
            is true.
        """
        # If the values are grouped, we need to add a placeholder for
        # each element inside of the actual value.
        if has_grouped_values:
            placeholder_list = []
            for v in value:
                value_placeholder = self._get_value_placeholder()
                self._value_count += 1
                placeholder_list.append(value_placeholder)
                attribute_value_placeholders[value_placeholder] = v
            # Assuming the values are grouped by parentheses.
            # IN is currently the only condition that uses this, so it may
            # need to be changed in the future.
            return '(' + ', '.join(placeholder_list) + ')'
        # Otherwise, treat the value as a single value that needs only
        # one placeholder.
        value_placeholder = self._get_value_placeholder()
        self._value_count += 1
        attribute_value_placeholders[value_placeholder] = value
        return value_placeholder