# ======================================================================
# MODULE DETAILS
# This section provides metadata about the module, including its
# creation date, author, copyright information, and a brief description
# of the module's purpose and functionality.
# ======================================================================
# __| \ _ \ | _ \ __| __ __| __ __|
# ( _ \ / | ( | (_ | | |
# \___| _/ _\ _|_\ ____| \___/ \___| _| _|
# src/carlogtt_python_library/database/database_dynamo.py
# Created 9/30/23 - 4:38 PM UK Time (London) by carlogtt
"""
This module ...
"""
# ======================================================================
# EXCEPTIONS
# This section documents any exceptions made or code quality rules.
# These exceptions may be necessary due to specific coding requirements
# or to bypass false positives.
# ======================================================================
#
# ======================================================================
# IMPORTS
# Importing required libraries and modules for the application.
# ======================================================================
# Standard Library Imports
import decimal
import logging
import numbers
import time
from collections.abc import Generator, Iterable, Mapping, MutableMapping, Sequence
from typing import Any, Optional, TypedDict, Union
# Third Party Library Imports
import botocore.config
import botocore.exceptions
import mypy_boto3_dynamodb
from mypy_boto3_dynamodb import type_defs
# Local Folder (Relative) Imports
from .. import aws_boto3, exceptions, utils
# END IMPORTS
# ======================================================================
# List of public names in the module
__all__ = [
'DynamoDB',
]
# Setting up logger for current module
module_logger = logging.getLogger(__name__)
# Type aliases
DynamoDBClient = mypy_boto3_dynamodb.client.DynamoDBClient
# Placeholder, replace Any with AttributeValue later
DynamoDbList = Sequence[Any]
DynamoDbListDeserialized = Sequence[Any]
DynamoDbMap = Mapping[str, Any]
DynamoDbMapDeserialized = Mapping[str, Any]
PartitionKeyValue = Union[bytes, str, float]
SortKeyValue = Union[bytes, str, float]
AttributeValue = Union[
str,
bytes,
bytearray,
int,
float,
decimal.Decimal,
set[str],
set[bytes],
set[int],
set[float],
set[decimal.Decimal],
DynamoDbList,
DynamoDbMap,
bool,
None,
]
AttributeValueDeserialized = Union[
str,
bytes,
bytearray,
int,
float,
set[str],
set[bytes],
set[int],
set[float],
DynamoDbListDeserialized,
DynamoDbMapDeserialized,
bool,
None,
]
# Now replace the placeholders with actual definition
DynamoDbList = Sequence[AttributeValue] # type: ignore
DynamoDbListDeserialized = Sequence[AttributeValueDeserialized] # type: ignore
DynamoDbMap = Mapping[str, AttributeValue] # type: ignore
DynamoDbMapDeserialized = Mapping[str, AttributeValueDeserialized] # type: ignore
# General DynamoDB type annotations
PartitionKeyTypeDef = TypedDict(
"PartitionKeyTypeDef",
{
"S": str,
"N": str,
"B": bytes,
},
total=False,
)
PartitionKeyItem = dict[str, PartitionKeyTypeDef]
Item = dict[str, type_defs.AttributeValueTypeDef]
[docs]
class DynamoDB(aws_boto3.aws_service_base.AwsServiceBase[DynamoDBClient]):
"""
The DynamoDB class provides a simplified interface for interacting
with Amazon DynamoDB services within a Python application.
It includes an option to cache the client session to minimize
the number of AWS API call.
:param aws_region_name: The name of the AWS region where the
service is to be used. This parameter is required to
configure the AWS client.
:param aws_profile_name: The name of the AWS profile to use for
credentials. This is useful if you have multiple profiles
configured in your AWS credentials file.
Default is None, which means the default profile or
environment variables will be used if not provided.
:param aws_access_key_id: The AWS access key ID for
programmatically accessing AWS services. This parameter
is optional and only needed if not using a profile from
the AWS credentials file.
:param aws_secret_access_key: The AWS secret access key
corresponding to the provided access key ID. Like the
access key ID, this parameter is optional and only needed
if not using a profile.
:param aws_session_token: The AWS temporary session token
corresponding to the provided access key ID. Like the
access key ID, this parameter is optional and only needed
if not using a profile.
:param caching: Determines whether to enable caching for the
client session. If set to True, the client session will
be cached to improve performance and reduce the number
of API calls. Default is False.
:param client_parameters: A key-value pair object of parameters that
will be passed to the low-level service client.
"""
def __init__(
self,
aws_region_name: str,
*,
aws_profile_name: Optional[str] = None,
aws_access_key_id: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
aws_session_token: Optional[str] = None,
caching: bool = False,
client_parameters: Optional[dict[str, Any]] = None,
) -> None:
super().__init__(
aws_region_name=aws_region_name,
aws_profile_name=aws_profile_name,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token,
caching=caching,
client_parameters=client_parameters,
aws_service_name="dynamodb",
exception_type=exceptions.DynamoDBError,
)
self._serializer = DynamoDbSerializer()
[docs]
@utils.retry(exception_to_check=exceptions.DynamoDBError, delay_secs=1)
def get_tables(self) -> list[str]:
"""
Returns an array of table names associated with the current
account and endpoint.
:return: List of table names.
:raise DynamoDBError: If listing fails.
"""
try:
ddb_response = self._client.list_tables()
except botocore.exceptions.ClientError as ex:
raise exceptions.DynamoDBError(str(ex.response)) from None
except Exception as ex:
raise exceptions.DynamoDBError(str(ex)) from None
# If TableNames is not present then return an empty list
try:
response = ddb_response['TableNames']
except KeyError:
response = []
return response
[docs]
def get_items(self, table: str) -> Generator[dict[str, AttributeValueDeserialized], None, None]:
"""
Returns an Iterable of deserialized items in the table.
:param table: DynamoDB table name.
:return: Generator of deserialized items.
Iterable of dictionaries of all the columns in DynamoDB
i.e. {dynamodb_column_name: column_value, ...}
:raise DynamoDBError: If retrieval fails.
"""
ddb_scan_args: type_defs.ScanInputTypeDef = {'TableName': table}
try:
while True:
try:
with utils.retry(exception_to_check=Exception, delay_secs=1) as retryer:
ddb_response = retryer(self._client.scan, **ddb_scan_args)
except botocore.exceptions.ClientError as ex:
raise exceptions.DynamoDBError(str(ex.response))
except Exception as ex:
raise exceptions.DynamoDBError(str(ex))
if ddb_response.get('Items') and len(ddb_response['Items']) > 0:
# Convert the DynamoDB attribute values to
# deserialized values
deser_items = (
{
key: self._serializer.deserialize_att(value)
for key, value in ddb_item.items()
}
for ddb_item in ddb_response['Items']
)
yield from deser_items
else:
# Nothing to yield
yield from ()
# If LastEvaluatedKey is present then we need to scan
# for more items
if ddb_response.get('LastEvaluatedKey'):
ddb_scan_args['ExclusiveStartKey'] = ddb_response['LastEvaluatedKey']
# If no LastEvaluatedKey then break out of the while
# loop as we're done
else:
break
except Exception as ex:
raise exceptions.DynamoDBError(str(ex)) from None
[docs]
def get_items_count(self, table: str) -> int:
"""
Returns the number of items in a table.
:param table: DynamoDB table name.
:return: Item count.
:raise DynamoDBError: If count fails.
"""
running_total = 0
for _ in self.get_items(table):
running_total += 1
return running_total
[docs]
def get_item(
self,
table: str,
partition_key_key: str,
partition_key_value: PartitionKeyValue,
sort_key_key: Optional[str] = None,
sort_key_value: Optional[SortKeyValue] = None,
) -> Optional[dict[str, AttributeValueDeserialized]]:
"""
The get_item_from_table operation returns a dictionary of
deserialized attribute values for the item with the given
primary key. If there is no matching item, get_item_from_table
returns None.
:param table: DynamoDB table name.
:param partition_key_key: The key of the partition key.
:param partition_key_value: The value of the partition key.:
:param sort_key_key: The key of the sort key.
:param sort_key_value: The value of the sort key.
:return: Deserialized item or None.
:raise DynamoDBError: If retrieval fails.
"""
pk = self._serializer.serialize_p_key(
pk_key=partition_key_key,
pk_value=partition_key_value,
sk_key=sort_key_key,
sk_value=sort_key_value,
)
try:
with utils.retry(exception_to_check=Exception, delay_secs=1) as retryer:
ddb_response = retryer(self._client.get_item, TableName=table, Key=pk)
except botocore.exceptions.ClientError as ex:
raise exceptions.DynamoDBError(str(ex.response)) from None
except Exception as ex:
raise exceptions.DynamoDBError(str(ex)) from None
ddb_item = ddb_response.get('Item')
if not ddb_item:
return None
# Convert the DynamoDB attribute values to deserialized values
response = {key: self._serializer.deserialize_att(value) for key, value in ddb_item.items()}
return response
[docs]
def put_item(
self,
table: str,
partition_key_key: str,
partition_key_value: Optional[PartitionKeyValue] = None,
sort_key_key: Optional[str] = None,
sort_key_value: Optional[SortKeyValue] = None,
auto_generate_partition_key_value: Optional[bool] = False,
**items: AttributeValue,
) -> dict[str, AttributeValueDeserialized]:
"""
Creates a new item. If an item that has the same primary key as
the new item already exists in the specified table, the
operation will fail.
:param table: DynamoDB table name.
:param partition_key_key: The key of the partition key.
:param partition_key_value: The value of the partition key.
:param sort_key_key: The key of the sort key.
:param sort_key_value: The value of the sort key.
:param auto_generate_partition_key_value: If set to True, this
option instructs DynamoDB to automatically generate a
partition key value based on a counter mechanism.
For this to work, your table must contain a special item
with its partition key value set to '__PK_VALUE_COUNTER__'.
This item should have a numerical attribute named
'current_counter_value', which will be used and incremented
as the basis for generating new partition key values.
:param items: Additional items to add.
:return: The stored DynamoDB Item deserialized.
:raise DynamoDBError: If operation fails.
:raise DynamoDBConflictError: If put item fails due to a
conflict.
"""
if partition_key_value is not None and auto_generate_partition_key_value is True:
raise exceptions.DynamoDBError(
"If auto_generate_partition_key_value is enabled, a partition_key_value MUST NOT be"
" passed in."
)
elif partition_key_value is None and auto_generate_partition_key_value is False:
raise exceptions.DynamoDBError(
"If auto_generate_partition_key_value is disabled, a partition_key_value MUST be"
" passed in."
)
elif partition_key_value is not None and auto_generate_partition_key_value is False:
# If we don't need to increment the counter we just put the
# item in the table
try:
item_put = self._put_single_item(
table=table,
partition_key_key=partition_key_key,
partition_key_value=partition_key_value,
sort_key_key=sort_key_key,
sort_key_value=sort_key_value,
**items,
)
except Exception as ex:
raise exceptions.DynamoDBError(str(ex)) from None
elif partition_key_value is None and auto_generate_partition_key_value is True:
# If we need to increment the counter we do it with an
# atomic write
# Put new item
put_in_db: list[dict[str, AttributeValue]] = [
{
'TableName': table,
'PartitionKeyKey': partition_key_key,
'AutoGeneratePartitionKeyValue': True,
'Items': items,
},
]
atomic_write_response = self.atomic_writes(put=put_in_db)
# If we get here it means that the item has been added
# successfully therefore we return it
item_put = atomic_write_response['Put'][0]
else:
raise exceptions.DynamoDBError(
"Unable to determine a valid operation with the provided 'partition_key_value' and"
" 'auto_generate_partition_key_value'."
)
return item_put
[docs]
def update_item(
self,
table: str,
partition_key: dict[str, PartitionKeyValue],
sort_key: Optional[dict[str, SortKeyValue]] = None,
condition_attribute: Optional[dict[str, Any]] = None,
**items: AttributeValue,
) -> dict[str, AttributeValueDeserialized]:
"""
Performs a strict update on an existing item in DynamoDB.
This method enforces that the item with the specified partition
key must already exist before updating. If the item does not
exist, or if any specified condition does not match, the update
fails with a `DynamoDBConflictError`. No new item is created in
this scenario.
Edits an existing item’s attributes. If
condition_attribute_value is passed, the item will be updated
only if condition_attribute_value is a match with the value
stored in DynamoDB.
:param table: DynamoDB table name.
:param partition_key: DynamoDB partition key as dict of
partition_key {key: value}.
:param sort_key: DynamoDB sort key as dict of sort_key
{key: value}.
:param condition_attribute: DynamoDB attribute to matched as
dict of attribute_to_match {key: value}. When sent to
DynamoDB, the attribute will be as a condition to match.
:param items: Values for items to be updated.
:return: The updated DynamoDB Item deserialized.
:raise DynamoDBError: If update fails.
:raise DynamoDBConflictError: If update fails due to a conflict.
"""
# items is an optional parameter by default as using the **
# However, if no values are passed as **items we raise an
# exception as there is nothing to update
if not items:
raise exceptions.DynamoDBError(
"No values to update were passed to the DynamoDB update_item_in_table method."
)
# Initialize a dictionary with all the arguments to pass into
# the DynamoDB update_item call
ddb_update_item_args: dict[str, Any] = {
'TableName': table,
'ReturnValues': 'ALL_OLD',
}
# Serialize partition key
pk_key, pk_value = next(iter(partition_key.items()))
sk_key, sk_value = next(iter(sort_key.items())) if sort_key is not None else (None, None)
pk_ser = self._serializer.serialize_p_key(
pk_key=pk_key, pk_value=pk_value, sk_key=sk_key, sk_value=sk_value
)
# Serialize attributes
exp, exp_att_names, exp_att_values = self._serializer.serialize_update_items(**items)
# Build a condition expression for “strict update”
ddb_update_item_args.update({'ConditionExpression': f"attribute_exists(#{pk_key})"})
exp_att_names.update({f"#{pk_key}": pk_key})
# Check if a condition is required
if condition_attribute is not None:
# Unpack condition attribute dictionary
# We cant mutate the original dictionary because of the
# retry decorator will need to run through it again in case
# of failure
cond_att_key, cond_att_value = next(iter(condition_attribute.items()))
# If condition attribute exists pass it to the DynamoDB call
ddb_update_item_args.update({
'ConditionExpression': (
f"{ddb_update_item_args['ConditionExpression']} AND"
f" #{cond_att_key} = :condition_attribute_value_placeholder"
)
})
# #condition_attribute_key has to be passed
# along the ExpressionAttributeNames because is used by the
# ConditionExpression
exp_att_names[f"#{cond_att_key}"] = cond_att_key
# :condition_attribute_value_placeholder has to be passed
# along the ExpressionAttributeValues because is used by the
# ConditionExpression
exp_att_values[':condition_attribute_value_placeholder'] = (
self._serializer.serialize_att(cond_att_value)
)
# Update DynamoDB call arguments
ddb_update_item_args['Key'] = pk_ser
ddb_update_item_args['UpdateExpression'] = exp
ddb_update_item_args['ExpressionAttributeNames'] = exp_att_names
ddb_update_item_args['ExpressionAttributeValues'] = exp_att_values
module_logger.debug(ddb_update_item_args)
try:
with utils.retry(exception_to_check=Exception, delay_secs=1) as retryer:
ddb_response = retryer(self._client.update_item, **ddb_update_item_args)
except botocore.exceptions.ClientError as ex:
if "ConditionalCheckFailed" in str(ex):
raise exceptions.DynamoDBConflictError(str(ex.response)) from None
else:
raise exceptions.DynamoDBError(str(ex.response)) from None
except Exception as ex:
raise exceptions.DynamoDBError(str(ex)) from None
# If we get here it means that the item has been updated
# successfully therefore we return it
item = ddb_response.get('Attributes', {})
item_deser = {key: self._serializer.deserialize_att(value) for key, value in item.items()}
pk_key, pk_value, sk_key, sk_value = self._serializer.deserialize_p_key(pk_ser)
updated_item = {**item_deser, **items, **{pk_key: pk_value}}
if sk_key is not None:
updated_item.update({sk_key: sk_value})
# Because the **items in the updated_item dict is not serialized
# we need to normalize the whole dict before returning a dict of
# type dict[str, AttributeValueDeserialized]
updated_item_deser = self._serializer.normalize_item(updated_item)
return updated_item_deser
[docs]
def upsert_item(
self,
table: str,
partition_key: dict[str, PartitionKeyValue],
sort_key: Optional[dict[str, SortKeyValue]] = None,
condition_attribute: Optional[dict[str, Any]] = None,
**items: AttributeValue,
) -> dict[str, AttributeValueDeserialized]:
"""
Upsert-like operation.
1. Tries to update an existing item (strict update).
2. If that update fails because the item does not exist, it
puts a brand-new item.
:param table: DynamoDB table name.
:param partition_key: DynamoDB partition key as dict of
partition_key {key: value}.
:param sort_key: DynamoDB sort key as dict of sort_key
{key: value}.
:param condition_attribute: DynamoDB attribute to matched as
dict of attribute_to_match {key: value}. When sent to
DynamoDB, the attribute will be as a condition to match.
:param items: Values for items to be updated.
:return: The updated DynamoDB Item deserialized.
:raise DynamoDBError: If update fails.
:raise DynamoDBConflictError: If update fails due to a conflict.
"""
# items is an optional parameter by default as using the **
# However, if no values are passed as **items we raise an
# exception as there is nothing to update
if not items:
raise exceptions.DynamoDBError(
"No values to update were passed to the DynamoDB update_item_in_table method."
)
# Initialize a dictionary with all the arguments to pass into
# the DynamoDB update_item call
ddb_update_item_args: dict[str, Any] = {
'TableName': table,
'ReturnValues': 'ALL_OLD',
}
# Serialize partition key
pk_key, pk_value = next(iter(partition_key.items()))
sk_key, sk_value = next(iter(sort_key.items())) if sort_key is not None else (None, None)
pk_ser = self._serializer.serialize_p_key(
pk_key=pk_key, pk_value=pk_value, sk_key=sk_key, sk_value=sk_value
)
# Serialize attributes
exp, exp_att_names, exp_att_values = self._serializer.serialize_update_items(**items)
# Check if a condition is required
if condition_attribute is not None:
# Unpack condition attribute dictionary
# We cant mutate the original dictionary because of the
# retry decorator will need to run through it again in case
# of failure
cond_att_key, cond_att_value = next(iter(condition_attribute.items()))
# If condition attribute exists pass it to the DynamoDB call
ddb_update_item_args.update(
{'ConditionExpression': f"#{cond_att_key} = :condition_attribute_value_placeholder"}
)
# #condition_attribute_key has to be passed
# along the ExpressionAttributeNames because is used by the
# ConditionExpression
exp_att_names[f"#{cond_att_key}"] = cond_att_key
# :condition_attribute_value_placeholder has to be passed
# along the ExpressionAttributeValues because is used by the
# ConditionExpression
exp_att_values[':condition_attribute_value_placeholder'] = (
self._serializer.serialize_att(cond_att_value)
)
# Update DynamoDB call arguments
ddb_update_item_args['Key'] = pk_ser
ddb_update_item_args['UpdateExpression'] = exp
ddb_update_item_args['ExpressionAttributeNames'] = exp_att_names
ddb_update_item_args['ExpressionAttributeValues'] = exp_att_values
module_logger.debug(ddb_update_item_args)
try:
with utils.retry(exception_to_check=Exception, delay_secs=1) as retryer:
ddb_response = retryer(self._client.update_item, **ddb_update_item_args)
except botocore.exceptions.ClientError as ex:
if "ConditionalCheckFailed" in str(ex):
raise exceptions.DynamoDBConflictError(str(ex.response)) from None
else:
raise exceptions.DynamoDBError(str(ex.response)) from None
except Exception as ex:
raise exceptions.DynamoDBError(str(ex)) from None
# If we get here it means that the item has been updated
# successfully therefore we return it
item = ddb_response.get('Attributes', {})
item_deser = {key: self._serializer.deserialize_att(value) for key, value in item.items()}
pk_key, pk_value, sk_key, sk_value = self._serializer.deserialize_p_key(pk_ser)
updated_item = {**item_deser, **items, **{pk_key: pk_value}}
if sk_key is not None:
updated_item.update({sk_key: sk_value})
# Because the **items in the updated_item dict is not serialized
# we need to normalize the whole dict before returning a dict of
# type dict[str, AttributeValueDeserialized]
updated_item_deser = self._serializer.normalize_item(updated_item)
return updated_item_deser
[docs]
def delete_item(
self,
table: str,
partition_key_key: str,
partition_key_value: Union[PartitionKeyValue, Iterable[PartitionKeyValue]],
sort_key_key: Optional[str] = None,
sort_key_value: Optional[Union[SortKeyValue, Iterable[SortKeyValue]]] = None,
) -> list[dict[str, AttributeValueDeserialized]]:
"""
Deletes item(s) in a table by primary key.
If a single partition key value is provided, it deletes the
corresponding item. If multiple partition key values are
provided, it deletes all the corresponding items.
:param table: DynamoDB table name.
:param partition_key_key: The key of the partition key.
:param partition_key_value: The value or an iterable of values
of the partition key of the item or items to delete from
DynamoDB.
:param sort_key_key: The key of the sort key.
:param sort_key_value: The value or an iterable of values
of the sort key of the item or items to delete from
DynamoDB.
:return: A list of deleted DynamoDB Items deserialized.
:raise DynamoDBError: If deletion fails.
"""
if (sort_key_key is None) ^ (sort_key_value is None):
raise exceptions.DynamoDBError(
"Both sort_key_key and sort_key_value must be provided or both must be None."
)
has_sk = sort_key_key is not None and sort_key_value is not None
# Check if it's only one item to delete or many
pk_values: list[PartitionKeyValue] = []
if isinstance(partition_key_value, (str, bytes, int, float)):
pk_values.append(partition_key_value)
else:
pk_values.extend(partition_key_value)
sk_values: list[Optional[SortKeyValue]] = []
if has_sk:
assert sort_key_value is not None
if isinstance(sort_key_value, (str, bytes, int, float)):
sk_values.append(sort_key_value)
else:
sk_values.extend(sort_key_value)
else:
sk_values.extend([None] * len(pk_values))
if len(pk_values) != len(sk_values):
raise exceptions.DynamoDBError(
"The number of partition key values and sort key values must be the same."
)
# Prepare response list
response: list[dict[str, AttributeValueDeserialized]] = []
for pk_val, sk_val in zip(pk_values, sk_values):
pk_ser = self._serializer.serialize_p_key(
pk_key=partition_key_key, pk_value=pk_val, sk_key=sort_key_key, sk_value=sk_val
)
# Initialize a dictionary with all the arguments to pass
# into the DynamoDB delete_item call
ddb_delete_item_args: dict[str, Any] = {
'TableName': table,
'Key': pk_ser,
'ReturnValues': 'ALL_OLD',
}
try:
with utils.retry(exception_to_check=Exception, delay_secs=1) as retryer:
ddb_response = retryer(self._client.delete_item, **ddb_delete_item_args)
except botocore.exceptions.ClientError as ex:
raise exceptions.DynamoDBError(str(ex.response)) from None
except Exception as ex:
raise exceptions.DynamoDBError(str(ex)) from None
# If we get here it means that the item has been deleted
# successfully therefore we return it
item = ddb_response.get('Attributes', {})
item_deser = {
key: self._serializer.deserialize_att(value) for key, value in item.items()
}
pk_key, pk_value, sk_key, sk_value = self._serializer.deserialize_p_key(pk_ser)
deleted_item = {**item_deser, **{pk_key: pk_value}}
if sk_key is not None:
deleted_item.update({sk_key: sk_value})
response.append(deleted_item)
return response
[docs]
def delete_item_att(
self,
table: str,
partition_key_key: str,
partition_key_value: PartitionKeyValue,
attributes_to_delete: Iterable[str],
sort_key_key: Optional[str] = None,
sort_key_value: Optional[SortKeyValue] = None,
) -> dict[str, AttributeValueDeserialized]:
"""
Deletes item specific values in a table by primary key.
:param table: DynamoDB table name.
:param partition_key_key: The key of the partition key.
:param partition_key_value: The value of the partition key.
:param sort_key_key: The key of the sort key.
:param sort_key_value: The value of the sort key.
:param attributes_to_delete: An iterable of specific attributes
that are to be deleted from DynamoDB.
:return: The updated DynamoDB Item deserialized.
:raise DynamoDBError: If deletion fails.
"""
pk_ser = self._serializer.serialize_p_key(
pk_key=partition_key_key,
pk_value=partition_key_value,
sk_key=sort_key_key,
sk_value=sort_key_value,
)
att_updates: dict[str, type_defs.AttributeValueUpdateTypeDef] = {
item_value: {'Action': "DELETE"} for item_value in attributes_to_delete
}
ddb_update_item_args: dict[str, Any] = {
'TableName': table,
'Key': pk_ser,
'AttributeUpdates': att_updates,
'ReturnValues': 'ALL_NEW',
}
try:
with utils.retry(exception_to_check=Exception, delay_secs=1) as retryer:
ddb_response = retryer(self._client.update_item, **ddb_update_item_args)
except botocore.exceptions.ClientError as ex:
if "ConditionalCheckFailed" in str(ex):
raise exceptions.DynamoDBConflictError(str(ex.response)) from None
else:
raise exceptions.DynamoDBError(str(ex.response)) from None
except Exception as ex:
raise exceptions.DynamoDBError(str(ex)) from None
# If we get here it means that the item has been updated
# successfully therefore we return it
item = ddb_response.get('Attributes', {})
item_deser = {key: self._serializer.deserialize_att(value) for key, value in item.items()}
return item_deser
[docs]
def atomic_writes(
self,
put: Optional[Iterable[dict[str, AttributeValue]]] = None,
update: Optional[Iterable[dict[str, AttributeValue]]] = None,
upsert: Optional[Iterable[dict[str, AttributeValue]]] = None,
delete: Optional[Iterable[dict[str, AttributeValue]]] = None,
condition_check: Optional[Iterable[dict[str, AttributeValue]]] = None,
**kwargs,
) -> dict[str, list[dict[str, AttributeValueDeserialized]]]:
"""
A synchronous write operation that groups up to 100 action
requests. These actions can target items in different tables.
The actions are completed atomically so that either all of them
succeed, or all of them fail.
:param put: Initiates a PutItem operation to write a new item.
schema = {
'TableName': "string DynamoDB Table Name",
'PartitionKeyKey': "string of the PartitionKey key",
'PartitionKeyValue': "OPTIONAL - partition key value",
'SortKeyKey': "OPTIONAL - string of the SortKey key",
'SortKeyValue': "OPTIONAL - sort key value",
'AutoGeneratePartitionKeyValue': "OPTIONAL - bool",
'Items': "dict containing all the items to put in DynamoDB",
}
:param update: Initiates an UpdateItem operation to update an
existing item.
schema = {
'TableName': "string DynamoDB Table Name",
'PartitionKey': "The partition key as dict of partition_key
{key: value}",
'SortKey': "OPTIONAL - The sort key as dict of sort_key
{key: value}",
'Items': "dict containing all the values for items to be
updated",
'ConditionAttribute': "OPTIONAL - attribute to matched
as dict of attribute_to_match {key: value}",
}
:param upsert: Initiates an UpdateOrInsertItem operation to
update an existing item or insert if not in the table.
schema = {
'TableName': "string DynamoDB Table Name",
'PartitionKey': "The partition key as dict of partition_key
{key: value}",
'SortKey': "OPTIONAL - The sort key as dict of sort_key
{key: value}",
'Items': "dict containing all the values for items to be
updated",
'ConditionAttribute': "OPTIONAL - attribute to matched
as dict of attribute_to_match {key: value}",
}
:param delete: Initiates a DeleteItem operation to delete an
existing item.
schema = {
'TableName': "string DynamoDB Table Name",
'PartitionKey': "The partition key as dict of partition_key
{key: value}",
'SortKey': "OPTIONAL - The sort key as dict of sort_key
{key: value}",
}
:param condition_check: Applies a condition to an item that is
not being modified by the transaction. The condition must
be satisfied for the transaction to succeed.
schema = {
'TableName': "string DynamoDB Table Name",
'PartitionKey': "The partition key as dict of partition_key
{key: value}",
'SortKey': "OPTIONAL - The sort key as dict of sort_key
{key: value}",
}
:return: A dictionary with keys
'Put', 'Update', 'Delete', 'ConditionCheck',
and a list of items writes to the DynamoDB in the same
order as they were passed in.
schema = {
'Put': "list of items writes to DynamoDB",
'Update': "list of items writes to DynamoDB",
'Upsert': "list of items writes to DynamoDB",
'Delete': "list of items writes to DynamoDB",
'ConditionCheck': "list of items writes to DynamoDB",
}
:raise DynamoDBError: If atomic writes fail.
:raise DynamoDBConflictError: If atomic writes fail due to a
conflict.
"""
# Initialize missing arguments
put = put or {}
update = update or {}
upsert = upsert or {}
delete = delete or {}
condition_check = condition_check or {}
# Prepare the list of transactional items to be passed to the
# DynamoDB call
transact_items: list[type_defs.TransactWriteItemTypeDef] = []
# Prepare the response object
response: dict[str, list[dict[str, AttributeValueDeserialized]]] = {
'Put': [],
'Update': [],
'Upsert': [],
'Delete': [],
'ConditionCheck': [],
}
module_logger.debug(
f"Atomic Writes in Table initial request - Put: {put}, Update: {update}, Delete:"
f" {delete}, ConditionCheck: {condition_check}"
)
transact_items, response = self._atomic_writes_put(
put=put,
transact_items=transact_items,
response=response,
)
transact_items, response = self._atomic_writes_update(
update=update,
transact_items=transact_items,
response=response,
)
transact_items, response = self._atomic_writes_upsert(
upsert=upsert,
transact_items=transact_items,
response=response,
)
transact_items, response = self._atomic_writes_delete(
delete=delete,
transact_items=transact_items,
response=response,
)
transact_items, response = self._atomic_writes_condition_check(
condition_check=condition_check,
transact_items=transact_items,
response=response,
)
module_logger.debug(f"Atomic Writes in Table serialized request - {transact_items}")
# Make the DynamoDB atomic api call
try:
with utils.retry(exception_to_check=Exception, delay_secs=1) as retryer:
retryer(self._client.transact_write_items, TransactItems=transact_items, **kwargs)
except botocore.exceptions.ClientError as ex:
if "ConditionalCheckFailed" in str(ex):
raise exceptions.DynamoDBConflictError(str(ex.response)) from None
else:
raise exceptions.DynamoDBError(str(ex.response)) from None
except Exception as ex:
raise exceptions.DynamoDBError(str(ex)) from None
return response
def _atomic_writes_put(
self,
put: Iterable[dict[str, AttributeValue]],
transact_items: list[type_defs.TransactWriteItemTypeDef],
response: dict[str, list[dict[str, AttributeValueDeserialized]]],
) -> tuple[
list[type_defs.TransactWriteItemTypeDef],
dict[str, list[dict[str, AttributeValueDeserialized]]],
]:
"""
A helper method to prepare the atomic writes for put items.
:param put: A list of put items.
:param transact_items:
:param response:
:return:
"""
# Normalize each put item in the list for DynamoDB
# transactional call
for el in put:
assert isinstance(el['TableName'], str)
assert isinstance(el['Items'], Mapping)
assert isinstance(el['PartitionKeyKey'], str)
pk_value = el.get('PartitionKeyValue')
ag_pk_value = el.get('AutoGeneratePartitionKeyValue', False)
if pk_value is not None and ag_pk_value is True:
raise exceptions.DynamoDBError(
"If AutoGeneratePartitionKeyValue is enabled, a PartitionKeyValue MUST NOT be"
" passed in."
)
elif pk_value is not None and ag_pk_value is False:
# This is the case where we use el['PartitionKeyValue']
# as 'PartitionKeyValue'
pass
elif pk_value is None and ag_pk_value is True:
# This is the case where we auto generate the
# PartitionKeyValue
cur_counter, last_mod_timestamp = self._get_atomic_counter(table=el['TableName'])
new_counter = cur_counter + 1
# Check what is the type of the PartitionKey key of
# the table
pk_type = self._get_pk_type(table=el['TableName'])
if issubclass(pk_type, str):
auto_generate_pk_value: PartitionKeyValue = str(new_counter)
elif issubclass(pk_type, bytes):
auto_generate_pk_value = str(new_counter).encode()
elif issubclass(pk_type, float):
auto_generate_pk_value = new_counter
elif pk_value is None and ag_pk_value is False:
raise exceptions.DynamoDBError(
"If AutoGeneratePartitionKeyValue is disabled, a PartitionKeyValue MUST be"
" passed in."
)
else:
raise exceptions.DynamoDBError(
"Unable to determine a valid operation with the provided PartitionKeyValue and"
" AutoGeneratePartitionKeyValue."
)
pk_value_resolved = el.get('PartitionKeyValue') or auto_generate_pk_value
assert isinstance(pk_value_resolved, (str, bytes, int, float))
pk: dict[str, PartitionKeyValue] = {el['PartitionKeyKey']: pk_value_resolved}
if (el.get('SortKeyKey') is None) ^ (el.get('SortKeyValue') is None):
raise exceptions.DynamoDBError(
"Both SortKeyKey and SortKeyValue must be provided or both must be None."
)
has_sk = el.get('SortKeyKey') is not None and el.get('SortKeyValue') is not None
sk: dict[str, PartitionKeyValue] = {}
if has_sk:
sk_key = el['SortKeyKey']
sk_value = el['SortKeyValue']
assert isinstance(sk_key, str)
assert isinstance(sk_value, (str, bytes, int, float))
sk = {sk_key: sk_value}
# If we don't need to increment the counter we just put the
# item in the table
el_put_ser: type_defs.PutTypeDef = {
'TableName': el['TableName'],
'Item': self._serializer.serialize_put_items(**pk, **sk, **el['Items']),
'ConditionExpression': f"attribute_not_exists({el['PartitionKeyKey']})",
}
# Append the 'put' item to the DynamoDB atomic call
transact_items.append({'Put': el_put_ser})
# Append the 'put' item to the return list
response['Put'].append({
key: self._serializer.deserialize_att(value) # type: ignore
for key, value in el_put_ser['Item'].items()
})
# If we need to increment the counter we update the counter
if ag_pk_value:
# Update the counter
counter_update_ser = self._set_atomic_counter(
table=el['TableName'],
counter_value=new_counter,
last_modified_timestamp=last_mod_timestamp,
)
# Append the 'update' item to the DynamoDB atomic call
transact_items.append({'Update': counter_update_ser})
# Append the 'update' item to the return list
updated_item: dict[str, Any] = {
key[1:-12]: self._serializer.deserialize_att(value) # type: ignore
for key, value in counter_update_ser['ExpressionAttributeValues'].items()
}
del updated_item['condition_attribute_value']
# Ignoring sk because _set_atomic_counter is not
# using it
p_key_k, p_key_v, *_ = self._serializer.deserialize_p_key(
counter_update_ser['Key'] # type: ignore
)
updated_item[p_key_k] = p_key_v
response['Update'].append(updated_item)
return transact_items, response
def _atomic_writes_update(
self,
update: Iterable[dict[str, AttributeValue]],
transact_items: list[type_defs.TransactWriteItemTypeDef],
response: dict[str, list[dict[str, AttributeValueDeserialized]]],
) -> tuple[
list[type_defs.TransactWriteItemTypeDef],
dict[str, list[dict[str, AttributeValueDeserialized]]],
]:
# Normalize each update item in the list for DynamoDB
# transactional call
for el in update:
assert isinstance(el['TableName'], str)
assert isinstance(el['Items'], Mapping)
assert isinstance(el['PartitionKey'], MutableMapping)
pk_key, pk_value = next(iter(el['PartitionKey'].items()))
assert isinstance(pk_value, (str, bytes, int, float))
# Check if a sort_key is passed in
if el.get('SortKey') is not None:
# Unpack sort_key attribute dictionary
assert isinstance(el['SortKey'], MutableMapping)
sk_key, sk_value = next(iter(el['SortKey'].items()))
assert isinstance(sk_value, (str, bytes, int, float))
else:
sk_key, sk_value = (None, None)
pk_ser = self._serializer.serialize_p_key(
pk_key=pk_key, pk_value=pk_value, sk_key=sk_key, sk_value=sk_value
)
exp, exp_att_names, exp_att_values = self._serializer.serialize_update_items(
**el['Items']
)
# Build a condition expression for “strict update”
cond_exp = f"attribute_exists(#{pk_key})"
exp_att_names.update({f"#{pk_key}": pk_key})
el_update_ser: type_defs.UpdateTypeDef = {
'TableName': el['TableName'],
'Key': pk_ser,
'ConditionExpression': cond_exp,
'UpdateExpression': exp,
'ExpressionAttributeNames': exp_att_names,
'ExpressionAttributeValues': exp_att_values,
}
# Check if a condition is passed in
if el.get('ConditionAttribute') is not None:
# Unpack condition attribute dictionary
assert isinstance(el['ConditionAttribute'], MutableMapping)
# We cant mutate the original dictionary because of the
# retry decorator will need to run through it again in
# case of failure
cond_att_key, cond_att_value = next(iter(el['ConditionAttribute'].items()))
# If condition attribute exists pass it to the DynamoDB
# call
el_update_ser.update({
'ConditionExpression': (
f"{el_update_ser['ConditionExpression']} AND"
f" #{cond_att_key} = :condition_attribute_value_placeholder"
)
})
# #condition_att_key has to be passed
# along the ExpressionAttributeNames because is used by
# the ConditionExpression
exp_att_names[f"#{cond_att_key}"] = cond_att_key
# :condition_attribute_value_placeholder has to be
# passed along the ExpressionAttributeValues because
# is used by the ConditionExpression
exp_att_values[':condition_attribute_value_placeholder'] = (
self._serializer.serialize_att(cond_att_value)
)
# Append the 'update' item to the DynamoDB atomic call
transact_items.append({'Update': el_update_ser})
# Append the 'update' item to the return list
updated_item = {
key[1:-12]: self._serializer.deserialize_att(value)
for key, value in exp_att_values.items()
}
if el.get('ConditionAttribute') is not None:
del updated_item['condition_attribute_value']
pk_key, pk_value, sk_key, sk_value = self._serializer.deserialize_p_key(pk_ser)
updated_item[pk_key] = pk_value
if sk_key is not None:
updated_item[sk_key] = sk_value
response['Update'].append(updated_item)
return transact_items, response
def _atomic_writes_upsert(
self,
upsert: Iterable[dict[str, AttributeValue]],
transact_items: list[type_defs.TransactWriteItemTypeDef],
response: dict[str, list[dict[str, AttributeValueDeserialized]]],
) -> tuple[
list[type_defs.TransactWriteItemTypeDef],
dict[str, list[dict[str, AttributeValueDeserialized]]],
]:
# Normalize each upsert item in the list for DynamoDB
# transactional call
for el in upsert:
assert isinstance(el['TableName'], str)
assert isinstance(el['Items'], Mapping)
assert isinstance(el['PartitionKey'], MutableMapping)
pk_key, pk_value = next(iter(el['PartitionKey'].items()))
assert isinstance(pk_value, (str, bytes, int, float))
# Check if a sort_key is passed in
if el.get('SortKey') is not None:
# Unpack sort_key attribute dictionary
assert isinstance(el['SortKey'], MutableMapping)
sk_key, sk_value = next(iter(el['SortKey'].items()))
assert isinstance(sk_value, (str, bytes, int, float))
else:
sk_key, sk_value = (None, None)
pk_ser = self._serializer.serialize_p_key(
pk_key=pk_key, pk_value=pk_value, sk_key=sk_key, sk_value=sk_value
)
exp, exp_att_names, exp_att_values = self._serializer.serialize_update_items(
**el['Items']
)
el_upsert_ser: type_defs.UpdateTypeDef = {
'TableName': el['TableName'],
'Key': pk_ser,
'UpdateExpression': exp,
'ExpressionAttributeNames': exp_att_names,
'ExpressionAttributeValues': exp_att_values,
}
# Check if a condition is passed in
if el.get('ConditionAttribute') is not None:
# Unpack condition attribute dictionary
assert isinstance(el['ConditionAttribute'], MutableMapping)
# We cant mutate the original dictionary because of the
# retry decorator will need to run through it again in
# case of failure
cond_att_key, cond_att_value = next(iter(el['ConditionAttribute'].items()))
# If condition attribute exists pass it to the DynamoDB
# call
el_upsert_ser.update({
'ConditionExpression': (
f"#{cond_att_key} = :condition_attribute_value_placeholder"
)
})
# #condition_att_key has to be passed
# along the ExpressionAttributeNames because is used by
# the ConditionExpression
exp_att_names[f"#{cond_att_key}"] = cond_att_key
# :condition_attribute_value_placeholder has to be
# passed along the ExpressionAttributeValues because
# is used by the ConditionExpression
exp_att_values[':condition_attribute_value_placeholder'] = (
self._serializer.serialize_att(cond_att_value)
)
# Append the 'update' item to the DynamoDB atomic call
transact_items.append({'Update': el_upsert_ser})
# Append the 'upsert' item to the return list
upsert_item = {
key[1:-12]: self._serializer.deserialize_att(value)
for key, value in exp_att_values.items()
}
if el.get('ConditionAttribute') is not None:
del upsert_item['condition_attribute_value']
pk_key, pk_value, sk_key, sk_value = self._serializer.deserialize_p_key(pk_ser)
upsert_item[pk_key] = pk_value
if sk_key is not None:
upsert_item[sk_key] = sk_value
response['Upsert'].append(upsert_item)
return transact_items, response
def _atomic_writes_delete(
self,
delete: Iterable[dict[str, AttributeValue]],
transact_items: list[type_defs.TransactWriteItemTypeDef],
response: dict[str, list[dict[str, AttributeValueDeserialized]]],
) -> tuple[
list[type_defs.TransactWriteItemTypeDef],
dict[str, list[dict[str, AttributeValueDeserialized]]],
]:
# Normalize each delete item in the list for DynamoDB
# transactional call
for el in delete:
assert isinstance(el['TableName'], str)
assert isinstance(el['PartitionKey'], MutableMapping)
pk_key, pk_value = next(iter(el['PartitionKey'].items()))
assert isinstance(pk_value, (str, bytes, int, float))
# Check if a sort_key is passed in
if el.get('SortKey') is not None:
# Unpack sort_key attribute dictionary
assert isinstance(el['SortKey'], MutableMapping)
sk_key, sk_value = next(iter(el['SortKey'].items()))
assert isinstance(sk_value, (str, bytes, int, float))
else:
sk_key, sk_value = (None, None)
pk_ser = self._serializer.serialize_p_key(
pk_key=pk_key, pk_value=pk_value, sk_key=sk_key, sk_value=sk_value
)
el_delete_ser: type_defs.DeleteTypeDef = {
'TableName': el['TableName'],
'Key': pk_ser,
}
# Append the 'delete' item to the DynamoDB atomic call
transact_items.append({'Delete': el_delete_ser})
# Append the 'delete' item to the return list
deleted_item: dict[str, AttributeValueDeserialized] = {}
pk_key, pk_value, sk_key, sk_value = self._serializer.deserialize_p_key(pk_ser)
deleted_item[pk_key] = pk_value
if sk_key is not None:
deleted_item[sk_key] = sk_value
response['Delete'].append(deleted_item)
return transact_items, response
def _atomic_writes_condition_check(
self,
condition_check: Iterable[dict[str, AttributeValue]],
transact_items: list[type_defs.TransactWriteItemTypeDef],
response: dict[str, list[dict[str, AttributeValueDeserialized]]],
) -> tuple[
list[type_defs.TransactWriteItemTypeDef],
dict[str, list[dict[str, AttributeValueDeserialized]]],
]:
# Normalize each conditional check item in the list for DynamoDB
# transactional call
for el in condition_check:
assert isinstance(el['TableName'], str)
assert isinstance(el['PartitionKey'], MutableMapping)
pk_key, pk_value = next(iter(el['PartitionKey'].items()))
assert isinstance(pk_value, (str, bytes, int, float))
# Check if a sort_key is passed in
if el.get('SortKey') is not None:
# Unpack sort_key attribute dictionary
assert isinstance(el['SortKey'], MutableMapping)
sk_key, sk_value = next(iter(el['SortKey'].items()))
assert isinstance(sk_value, (str, bytes, int, float))
else:
sk_key, sk_value = (None, None)
pk_ser = self._serializer.serialize_p_key(
pk_key=pk_key, pk_value=pk_value, sk_key=sk_key, sk_value=sk_value
)
el_cond_check_ser: type_defs.ConditionCheckTypeDef = {
'TableName': el['TableName'],
'Key': pk_ser,
# TODO(carlogtt): not sure how to use the below yet
'ConditionExpression': 'string',
'ExpressionAttributeNames': {'string': 'string'},
'ExpressionAttributeValues': {},
}
# Append the 'condition_check' item to the DynamoDB atomic
# call
transact_items.append({'ConditionCheck': el_cond_check_ser})
# Append the 'condition_check' item to the return list
cond_check_item: dict[str, AttributeValueDeserialized] = {}
pk_key, pk_value, sk_key, sk_value = self._serializer.deserialize_p_key(pk_ser)
cond_check_item[pk_key] = pk_value
if sk_key is not None:
cond_check_item[sk_key] = sk_value
response['ConditionCheck'].append(cond_check_item)
return transact_items, response
[docs]
@utils.retry(exception_to_check=exceptions.DynamoDBError, delay_secs=1)
def put_atomic_counter(
self,
table: str,
):
"""
In Amazon DynamoDB, there isn't an in-built auto-increment
functionality like in SQL databases for generating record IDs
(Primary Key values). However, we can achieve a similar outcome
by managing an atomic counter.
This method put the initial __PK_VALUE_COUNTER__ item in the
table and set the value to 0.
If the __PK_VALUE_COUNTER__ item already exists in the table
then it does nothing.
:param table: DynamoDB table name.
:return: None
:raise DynamoDBError: If operation fails.
"""
# Using the tableName_SysItems table for lookup
sys_table = table + "_SysItems"
pk_ser = self._serializer.serialize_p_key(pk_key="pk_id", pk_value="__PK_VALUE_COUNTER__")
try:
ddb_response = self._client.get_item(TableName=sys_table, Key=pk_ser)
counter = ddb_response.get('Item')
except self._client.exceptions.ResourceNotFoundException:
module_logger.debug(f"Table: {sys_table} not found in DynamoDB, Creating it.")
# Initialize a dictionary with all the arguments to pass
# into the DynamoDB put_item call
ddb_create_table_args: dict[str, Any] = {
'TableName': sys_table,
'BillingMode': 'PAY_PER_REQUEST',
'AttributeDefinitions': [{
'AttributeName': 'pk_id',
'AttributeType': 'S',
}],
'KeySchema': [{
'AttributeName': 'pk_id',
'KeyType': 'HASH',
}],
'DeletionProtectionEnabled': True,
}
try:
self._client.create_table(**ddb_create_table_args)
# Give it time to create the table
time.sleep(15)
# Setting the counter to None, so we can put the counter
# item in the newly created table
counter = None
except botocore.exceptions.ClientError as ex:
raise exceptions.DynamoDBError(str(ex.response))
except Exception as ex:
raise exceptions.DynamoDBError(str(ex))
except botocore.exceptions.ClientError as ex:
raise exceptions.DynamoDBError(str(ex.response)) from None
except Exception as ex:
raise exceptions.DynamoDBError(str(ex)) from None
if not counter:
self.put_item(
table=sys_table,
partition_key_key="pk_id",
partition_key_value="__PK_VALUE_COUNTER__",
current_counter_value=0,
last_modified_timestamp=time.time_ns(),
)
def _put_single_item(
self,
table: str,
partition_key_key: str,
partition_key_value: PartitionKeyValue,
sort_key_key: Optional[str] = None,
sort_key_value: Optional[SortKeyValue] = None,
**items: AttributeValue,
) -> dict[str, AttributeValueDeserialized]:
"""
Put the item in the table.
:param table: DynamoDB table name.
:param partition_key_key: The key of the partition key.
:param partition_key_value: The value of the partition key.
:param sort_key_key: The key of the sort key.
:param sort_key_value: The value of the sort key.
:param items: Additional items to add.
:return: The stored DynamoDB Item deserialized.
:raise DynamoDBError: If operation fails.
:raise DynamoDBConflictError: If put item fails due to a
conflict.
"""
pk_ser = self._serializer.serialize_p_key(
pk_key=partition_key_key,
pk_value=partition_key_value,
sk_key=sort_key_key,
sk_value=sort_key_value,
)
additional_items = self._serializer.serialize_put_items(**items)
items_ser = {**additional_items, **pk_ser}
# Initialize a dictionary with all the arguments to pass into
# the DynamoDB put_item call
ddb_put_item_args: dict[str, Any] = {
'TableName': table,
'Item': items_ser,
'ConditionExpression': f"attribute_not_exists({partition_key_key})",
}
try:
with utils.retry(exception_to_check=Exception, delay_secs=1) as retryer:
retryer(self._client.put_item, **ddb_put_item_args)
except botocore.exceptions.ClientError as ex:
if "ConditionalCheckFailed" in str(ex):
raise exceptions.DynamoDBConflictError(str(ex.response))
else:
raise exceptions.DynamoDBError(str(ex.response))
except Exception as ex:
raise exceptions.DynamoDBError(str(ex))
# If we get here it means that the item has been added
# successfully therefore we return it
item_put = {
key: self._serializer.deserialize_att(value) for key, value in items_ser.items()
}
return item_put
def _get_atomic_counter(self, table: str) -> tuple[int, int]:
"""
This method will get the value of the atomic counter.
:param table: DynamoDB table name.
:return: Counter value and last_modified_timestamp as tuple.
:raise DynamoDBError: If counter not found.
"""
# Using the tableName_SysItems table for lookup
sys_table = table + "_SysItems"
try:
item = self.get_item(
table=sys_table,
partition_key_key="pk_id",
partition_key_value="__PK_VALUE_COUNTER__",
)
assert item is not None
current_counter_value = item['current_counter_value']
assert isinstance(current_counter_value, int)
last_modified_timestamp = item['last_modified_timestamp']
assert isinstance(last_modified_timestamp, int)
except AssertionError:
raise exceptions.DynamoDBError(
f"table: {table!r} doesn't have the '__PK_VALUE_COUNTER__' item, call"
f" '{self.put_atomic_counter.__name__}' to create one."
)
except (TypeError, KeyError):
raise exceptions.DynamoDBError(
f"Item '__PK_VALUE_COUNTER__' in table: {table!r} is missing some or all of the"
" mandatory attributes: 'current_counter_value' and 'last_modified_timestamp'."
)
except Exception as ex:
if "ResourceNotFoundException" in str(ex):
raise exceptions.DynamoDBError(
f"'__PK_VALUE_COUNTER__' not found as table: '{table}_SysItems' doesn't exists,"
f" call '{self.put_atomic_counter.__name__}('{table}')' to create one."
)
else:
raise
return current_counter_value, last_modified_timestamp
def _set_atomic_counter(
self, table: str, counter_value: int, last_modified_timestamp: int
) -> type_defs.UpdateTypeDef:
"""
This method will prepare an atomic update dictionary.
:param table: DynamoDB table name.
:param counter_value: The new value to se to the counter.
:param last_modified_timestamp: The last modified timestamp to
use as condition expression.
:return: An atomic update dictionary.
:raise DynamoDBError: If operation fails.
"""
# Using the tableName_SysItems table for lookup
sys_table = table + "_SysItems"
# Update the counter
exp, exp_att_names, exp_att_values = self._serializer.serialize_update_items(**{
'current_counter_value': counter_value,
'last_modified_timestamp': time.time_ns(),
})
# :condition_attribute_value_placeholder has to be
# passed along the ExpressionAttributeValues because
# is used by the ConditionExpression
exp_att_values[':condition_attribute_value_placeholder'] = self._serializer.serialize_att(
last_modified_timestamp
)
pk_ser = self._serializer.serialize_p_key(pk_key="pk_id", pk_value="__PK_VALUE_COUNTER__")
el_update_ser: type_defs.UpdateTypeDef = {
'TableName': sys_table,
'Key': pk_ser,
'UpdateExpression': exp,
'ExpressionAttributeNames': exp_att_names,
'ExpressionAttributeValues': exp_att_values,
'ConditionExpression': (
"#last_modified_timestamp = :condition_attribute_value_placeholder"
),
}
return el_update_ser
def _get_pk_type(self, table: str) -> Union[type[bytes], type[str], type[float]]:
"""
Scan the table and return the type of the PartitionKeyItem Key.
:param table: DynamoDB table name.
:return: The type of the PartitionKey key.
:raise DynamoDBError: If operation fails.
"""
# Initialize values to prevent UnboundLocalError
pk_key = ""
pk_key_type = ""
try:
with utils.retry(exception_to_check=Exception, delay_secs=1) as retryer:
ddb_response = retryer(self._client.describe_table, TableName=table)
except botocore.exceptions.ClientError as ex:
raise exceptions.DynamoDBError(str(ex.response))
except Exception as ex:
raise exceptions.DynamoDBError(str(ex))
# Get the PartitionKey Key
for idx, schemas in enumerate(ddb_response['Table']['KeySchema']):
for k, v in schemas.items():
if k == 'KeyType' and v == 'HASH':
pk_key = ddb_response['Table']['KeySchema'][idx]['AttributeName']
# Get the PartitionKey Key Type
for idx, attributes in enumerate(ddb_response['Table']['AttributeDefinitions']):
for k, v in attributes.items():
if k == 'AttributeName' and v == pk_key:
pk_key_type = ddb_response['Table']['AttributeDefinitions'][idx][
'AttributeType'
]
# Convert to Python type and return
if pk_key_type == 'S':
return str
elif pk_key_type == 'B':
return bytes
elif pk_key_type == 'N':
return float
else:
raise exceptions.DynamoDBError("PartitionKey Key Type not found")
class DynamoDbSerializer:
"""
DynamoDbSerializer is a utility class for serializing and
deserializing Python data types to DynamoDB compatible formats and
vice versa.
It provides methods for converting Python objects to DynamoDB
attributes and for converting DynamoDB attributes back to Python
objects.
This class is useful for preparing data for storage in a DynamoDB
table, and for reading data back into Python objects.
"""
def __init__(self):
self.string_utils = utils.StringUtils()
def serialize_att(self, attribute_value: AttributeValue) -> type_defs.AttributeValueTypeDef:
"""
Serialize a Python data type into a format suitable for
AWS DynamoDB.
Transforms a Python data type into a format that is compatible
with AWS DynamoDB by mapping it into its corresponding DynamoDB
data type.
:param attribute_value: The attribute value to be serialized.
:return: A dictionary containing the serialized attribute and
its corresponding DynamoDB data type descriptor.
i.e. "string" -> {"S": "string"}
:raise DynamoDBError: If the provided attribute_value type is
not supported.
"""
if attribute_value is None:
return {"NULL": True}
elif isinstance(attribute_value, bool):
return {"BOOL": attribute_value}
elif isinstance(attribute_value, str):
return {"S": attribute_value}
elif isinstance(attribute_value, bytes) or isinstance(attribute_value, bytearray):
return {"B": attribute_value}
elif isinstance(attribute_value, (numbers.Real, decimal.Decimal)):
return {"N": str(attribute_value)}
elif isinstance(attribute_value, set):
# Check the set contains at least one element
try:
set_el_sample = next(iter(attribute_value))
except StopIteration:
raise exceptions.DynamoDBError(
f"Object of type {type(attribute_value)!r} for {attribute_value!r} is not"
f" supported by DynamoDB serialization because {type(attribute_value)!r} empty."
)
# Check if set is homogeneous
if not all(isinstance(el, type(set_el_sample)) for el in attribute_value):
raise exceptions.DynamoDBError(
f"Object of type {type(attribute_value)!r} for {attribute_value!r} is not"
f" supported by DynamoDB serialization because {type(attribute_value)!r} must"
" be homogeneous."
)
if isinstance(set_el_sample, str):
return {"SS": list(attribute_value)} # type: ignore
elif isinstance(set_el_sample, bytes):
return {"BS": list(attribute_value)} # type: ignore
elif isinstance(set_el_sample, (numbers.Real, decimal.Decimal)):
return {"NS": [str(el) for el in attribute_value]}
else:
raise exceptions.DynamoDBError(
f"Object of type {type(attribute_value)!r} for {attribute_value!r} is not"
f" supported by DynamoDB {type(attribute_value)!r}."
f" {type(attribute_value)!r} must be homogeneous of [str | bytes | int | float]"
)
elif isinstance(attribute_value, Sequence):
list_value = list(attribute_value)
for idx, el in enumerate(list_value):
list_value[idx] = self.serialize_att(el)
return {"L": list_value}
elif isinstance(attribute_value, MutableMapping):
dict_value = dict(attribute_value)
for key, val in dict_value.items():
dict_value[key] = self.serialize_att(val)
return {"M": dict_value}
else:
raise exceptions.DynamoDBError(
f"Object of type {type(attribute_value)!r} for {attribute_value!r} is not supported"
" by DynamoDB serialization."
)
def deserialize_att(
self,
dynamodb_attribute: Union[type_defs.AttributeValueTypeDef, PartitionKeyTypeDef],
) -> AttributeValueDeserialized:
"""
Deserialize an AWS DynamoDB data type into its corresponding
Python data type. Transforms an AWS DynamoDB attribute into a
Python data type by identifying the DynamoDB data type
descriptor and mapping it to its corresponding Python data type.
:param dynamodb_attribute: The DynamoDB attribute to be
deserialized. It should be a dictionary containing the
DynamoDB data type descriptor and the attribute value.
:return: The deserialized Python data type.
i.e. i.e. {"S": "string"} -> "string"
:raise DynamoDBError: If the provided dynamodb_attribute is not
supported for deserialization.
"""
# The sentinel value is a unique object identifier used as a
# default fallback when querying dictionary keys during the
# deserialization process. The unique ensures that the
# sentinel is not accidentally found in `dynamodb_attribute`
# dictionary values. Utilizing sentinel helps in distinguishing
# between a None value and absence of a key. During the
# deserialization, if the `.get()` method returns the sentinel,
# it implies the key was not found in `dynamodb_attribute`;
# otherwise, it returns the actual value (which might be None
# or other falsy values) related to the looked-up key.
sentinel = object()
if dynamodb_attribute.get("NULL", sentinel) is not sentinel:
return None
elif dynamodb_attribute.get("BOOL", sentinel) is not sentinel:
return bool(dynamodb_attribute["BOOL"]) # type: ignore
elif dynamodb_attribute.get("S", sentinel) is not sentinel:
return str(dynamodb_attribute["S"])
elif dynamodb_attribute.get("B", sentinel) is not sentinel:
return dynamodb_attribute["B"]
elif dynamodb_attribute.get("N", sentinel) is not sentinel:
if '.' in dynamodb_attribute["N"]:
return float(dynamodb_attribute["N"])
else:
return int(dynamodb_attribute["N"])
elif dynamodb_attribute.get("SS", sentinel) is not sentinel:
return set(dynamodb_attribute["SS"]) # type: ignore
elif dynamodb_attribute.get("BS", sentinel) is not sentinel:
return set(dynamodb_attribute["BS"]) # type: ignore
elif dynamodb_attribute.get("NS", sentinel) is not sentinel:
if "." in dynamodb_attribute["NS"][0]: # type: ignore
return {float(el) for el in dynamodb_attribute["NS"]} # type: ignore
else:
return {int(el) for el in dynamodb_attribute["NS"]} # type: ignore
elif dynamodb_attribute.get("L", sentinel) is not sentinel:
return [self.deserialize_att(el) for el in dynamodb_attribute["L"]] # type: ignore
elif dynamodb_attribute.get("M", sentinel) is not sentinel:
return {
key: self.deserialize_att(val)
for key, val in dynamodb_attribute["M"].items() # type: ignore
}
else:
raise exceptions.DynamoDBError(
f"Object {dynamodb_attribute!r} of type {type(dynamodb_attribute)!r} is not"
" supported for DynamoDB deserialization."
)
def serialize_p_key(
self,
pk_key: str,
pk_value: PartitionKeyValue,
sk_key: Optional[str] = None,
sk_value: Optional[SortKeyValue] = None,
) -> PartitionKeyItem:
"""
Return a serialized DynamoDB partition key.
:param pk_key: The key of the partition key.
:param pk_value: The value of the partition key.
:param sk_key: The key of the sort key.
:param sk_value: The value of the sort key.
:return: Serialized partition key.
i.e. {"id": {"S": "string"}, "sortKey": {"S": "string"}}
"""
if (sk_key is None) ^ (sk_value is None):
raise exceptions.DynamoDBError(
"Both sort_key_key and sort_key_value must be provided or both must be None."
)
has_sk = sk_key is not None and sk_value is not None
pk_ser: PartitionKeyItem = {pk_key: self.serialize_att(pk_value)}
if has_sk:
assert sk_key is not None
pk_ser[sk_key] = self.serialize_att(sk_value)
return pk_ser
def deserialize_p_key(
self, pk_serialized: PartitionKeyItem
) -> tuple[str, PartitionKeyValue, Optional[str], Optional[SortKeyValue]]:
"""
Deserialize a serialized DynamoDB partition key.
:param pk_serialized: The serialized partition key.
i.e., {"id": {"S": "string"}, "sortKey": {"S": "string"}}.
:return: A tuple containing the partition key and its value.
"""
iter_pk_ser = iter(pk_serialized.items())
pk_key, pk_value_ser = next(iter_pk_ser)
try:
sk_key, sk_value_ser = next(iter_pk_ser)
except StopIteration:
sk_key, sk_value_ser = (None, None)
pk_value = self.deserialize_att(pk_value_ser)
assert isinstance(pk_value, (str, bytes, int, float))
if sk_value_ser is not None:
sk_value = self.deserialize_att(sk_value_ser)
assert isinstance(sk_value, (str, bytes, int, float))
else:
sk_value = None
return pk_key, pk_value, sk_key, sk_value
def serialize_put_items(self, **items: AttributeValue) -> Item:
"""
Returns a dictionary of additional items with keys and values
serialized for DynamoDB put_item call.
:param items: Key-value pairs to serialize.
:return: Items ready for put_item.
i.e. {"col1": {"S": "value1"}, "col2": {"S": "value2"},
"col3": {"S": "value3"}, ...}
"""
additional_items: Item = {}
for key, value in items.items():
normalized_key = self.string_utils.snake_case_v2(key)
ddb_attribute = self.serialize_att(value)
# Now add it to the additional_items serialized dictionary
additional_items[normalized_key] = ddb_attribute
return additional_items
def serialize_update_items(self, **items: AttributeValue) -> tuple[str, dict[str, str], Item]:
"""
Returns a tuple containing the UpdateExpression and the
ExpressionAttributeValues ready to be passed to the DynamoDB
update_item call.
:param items: Key-value pairs to serialize.
:return: A tuple with UpdateExpression, ExpressionAttributeNames
and ExpressionAttributeValues.
"""
update_exp = "SET "
# In DynamoDB operations(such as UpdateItem, Query, or Scan),
# you can use ExpressionAttributeNames to provide alternate
# names for attributes in your expressions.This is most
# commonly used to work around DynamoDB's reserved keywords or
# to use attribute names that contain special characters not
# allowed in expressions.
exp_att_names: dict[str, str] = {}
# In DynamoDB API, the ExpressionAttributeValues dictionary is
# used to pass in placeholders for values that will be used in
# your UpdateExpression and ConditionExpression. The keys for
# these placeholders should start with a : and should not be
# confused with actual column names.
exp_att_values: Item = {}
for key, value in items.items():
normalized_key = self.string_utils.snake_case_v2(key)
ddb_attribute = self.serialize_att(value)
# Add to the update_expression string
update_exp += f"#{normalized_key} = :{normalized_key}_placeholder, "
# Add to the expression_attribute_names dict
exp_att_names[f"#{normalized_key}"] = normalized_key
# Add to the expression_attribute_values serialized dict
exp_att_values[f":{normalized_key}_placeholder"] = ddb_attribute
# Removing the trailing ", " from the update_expression string
update_exp = update_exp[:-2]
return update_exp, exp_att_names, exp_att_values
def normalize_item(self, item: dict[str, Any]) -> dict[str, AttributeValueDeserialized]:
"""
Serialize a Python item to DynamoDB format and then deserialize
it back to ensure a fully consistent DynamoDB-deserialized
structure.
:param item: A dict of Python data, possibly partially
deserialized or in raw Python format.
:return: A dict representing the item in full
DynamoDB-deserialized format
(i.e., matching AttributeValueDeserialized).
"""
item_ser = {key: self.serialize_att(value) for key, value in item.items()}
item_deser = {key: self.deserialize_att(value) for key, value in item_ser.items()}
return item_deser