En estos momentos me encuentro trabajando en un proyecto con arquitectura limpia. Ha llegado un punto donde necesito interactuar con Redis, una base de datos en memoria muy popular, pero cuando tu código se organiza por capas (como demanda la arquitectura limpia) no puedes lanzar comandos directamente a una aplicación… por muy fácil y tentador que resulte. Debes crear un adaptador, una interfaz que gestione la comunicación. De este modo podrás cambiar de base de datos sin tener que modificar todo tu código.
Los requerimientos son los siguientes:
get
: Obtener un valor por clave.set
: Establecer un valor por clave.get_list
: Obtener todos los elementos de una lista.add_to_list
: Añadir un elemento a una lista.update_list
: Actualizar los elementos de una lista que coincidan con los filtros adecuados (where
si estuvieramos en SQL).remove_from_list
: Eliminar un elemento de una lista indicando la clave y los filtros.search_list
: Buscar los elementos de una lista que coincidan con los filtros adecuados.
Primero necesitamos instalar las dependencias necesarias para interactuar con Redis. En nuestro caso necesitaremos dos:
redis
: La librería oficial de Redis para Python.python-redis-lock
: Una librería que nos permitirá bloquear la cola de Redis para evitar problemas de concurrencia.
El código sería el siguiente:
import json
import os
from typing import Any
import redis
import redis_lock
class ResponseTypes:
SUCCESS = 'Success' # The process ended correctly
PARAMETERS_ERROR = 'ParametersError' # Missing or invalid parameters
RESOURCE_ERROR = 'ResourceError' # The process ended correctly but the resource is not available (DB, file, etc)
SYSTEM_ERROR = (
'SystemError' # The process ended with an error. Python error
)
class RedisStore:
def __init__(self):
"""
Initializes the connection to the Redis server
"""
self.connection = redis.from_url(
os.environ.get('STORE_URI', 'redis://redis:6379')
)
self.expiration = 60
if not self.connection.ping():
return {
'type': ResponseTypes.RESOURCE_ERROR,
'errors': ['Connection failed'],
'data': None,
}
def __is_json(self, text: str) -> bool:
"""
Checks if a string is a valid JSON
:param text: The text to check
:return: True if the text is a valid JSON, False otherwise
"""
try:
json.loads(text)
except ValueError:
return False
return True
def __get_search_index(
self, key: str, filters: dict | None = None
) -> list:
"""
Searches the positions of the elements that match the filters
:param key: The key of the list
:param filters: The filters to apply
:return: The positions of the elements that match all the filters
"""
# Return all the positions if there are no filters
if filters is None or filters == {}:
return list(range(0, len(self.get_list(key)['data'])))
# Search the positions to update
find_index = []
for index, item in enumerate(self.get_list(key)['data']):
if isinstance(item, str) and self.__is_json(item):
item = json.loads(item)
if isinstance(item, dict):
matchs = 0
# Check if the key exists and the value matches
for filter_key, filter_value in filters.items():
if filter_key in item and item[filter_key] == filter_value:
matchs += 1
# If all filters match, add the index
if matchs == len(filters):
find_index.append(index)
return find_index
def set(self, key: str, value: str | int | bool | dict | list) -> dict:
"""
Sets a key-value pair in the Redis server
Example:
--
set('name', 'John Doe')
set('age', 25)
set('is_active', True)
set('profile', {'name': 'John Doe', 'age': 25})
set('tasks', ['Task 1', 'Task 2', 'Task 3']) # Must be used with add_to_list
--
:param key: The key to set
:param value: The value to set
"""
if key == '':
return {
'type': ResponseTypes.PARAMETERS_ERROR,
'errors': ['Invalid key'],
'data': None,
}
if isinstance(value, dict | list):
value = json.dumps(value)
if isinstance(value, bool):
value = str(value)
try:
return {
'type': ResponseTypes.SUCCESS,
'errors': None,
'data': self.connection.set(key, value),
}
except Exception:
return {
'type': ResponseTypes.PARAMETERS_ERROR,
'errors': [f'Error setting key: {key}'],
'data': None,
}
def get(self, key: str) -> dict:
"""
Gets the value of a key in the Redis server
Example:
--
result = get('name')
result['data']
# 'John Doe'
--
:param key: The key to get
:return: The value of the key
"""
try:
return {
'type': ResponseTypes.SUCCESS,
'errors': None,
'data': self.connection.get(key).decode('utf-8'),
}
except Exception:
return {
'type': ResponseTypes.PARAMETERS_ERROR,
'errors': None,
'data': None,
}
def get_list(self, key: str) -> dict:
"""
Custom method to get all elements from a list
Example:
--
result = get_list('cats')
result['data']
# ['Garfield', 'Felix', 'Tom' ...]
--
:param key: The key of the list
:return: A list with all the elements
"""
try:
data = [
item_raw.decode('utf-8')
for item_raw in self.connection.lrange(key, 0, -1)
]
data_fixed_bool = list(
map(
lambda item: item == 'True'
if item in ['True', 'False']
else item,
data,
)
)
def format_item(item: str | int | bool | dict | list) -> Any:
if (
isinstance(item, str)
and not isinstance(item, dict | list)
and self.__is_json(item)
):
return json.loads(item)
return item
data_final = list(map(format_item, data_fixed_bool))
return {
'type': ResponseTypes.SUCCESS,
'errors': None,
'data': data_final,
}
except Exception as e:
return {
'type': ResponseTypes.PARAMETERS_ERROR,
'errors': [str(e)],
'data': [],
}
def add_to_list(
self,
key: str,
data: str | int | bool | dict | list,
) -> dict:
"""
Adds an element to a list
Example:
--
add_to_list('cats', 'Garfield')
add_to_list('cats', {'name': 'Felix', 'age': 5, 'color': 'black'})
--
:param key: The key of the list
:param data: The data to add
:param index: The index to add the element. If None, the element is added at the end
"""
if isinstance(data, dict | list):
data = json.dumps(data)
if isinstance(data, bool):
data = str(data)
with redis_lock.Lock(
self.connection,
name=key,
expire=self.expiration,
auto_renewal=True,
):
return {
'type': ResponseTypes.SUCCESS,
'errors': None,
'data': self.connection.rpush(key, data),
}
def update_list(
self,
key: str,
data: str | int | bool | dict | list,
filters: dict | None = None,
) -> dict:
"""
Updates the elements of a list that match the filters
Example:
--
update_list('cats', {'name': 'Felix', 'age': 5, 'color': 'black'}, {'name': 'Felix', 'age': 11})
--
:param key: The key of the list
:param data: The data to update
:param filters: The filters to apply
"""
if filters is None:
return {
'type': ResponseTypes.PARAMETERS_ERROR,
'errors': ['Filters are required'],
'data': None,
}
# Lock the queue
# https://pypi.org/project/python-redis-lock/
if isinstance(data, dict | list):
data = json.dumps(data)
with redis_lock.Lock(
self.connection,
name=key,
expire=self.expiration,
auto_renewal=True,
):
# Search the positions to update
update_positions = self.__get_search_index(key, filters)
# Update the positions
for position in update_positions:
try:
self.connection.lset(key, position, data)
except Exception as e:
return {
'type': ResponseTypes.PARAMETERS_ERROR,
'errors': ['Error updating list: ' + str(e)],
'data': None,
}
return {
'type': ResponseTypes.SUCCESS,
'errors': None,
'data': update_positions,
}
def remove_from_list(self, key: str, filters: dict | None = None) -> dict:
"""
Removes an element from a list
Example:
--
remove_from_list('cats', {'name': 'Felix', 'age': 5, 'color': 'black'})
--
:param key: The key of the list
:param filters: The filters to apply
"""
try:
with redis_lock.Lock(
self.connection,
name=key,
expire=self.expiration,
auto_renewal=True,
):
# The reason there there isn't a remove by index operation is that it does not play well with Redis atomic operations. There is no way without an explicit lock to use it in a safe way.
# https://groups.google.com/g/redis-db/c/c-IpJ0YWa9I
key_delete = '__DELETED__'
def delete_item():
find_index = self.__get_search_index(key, filters)
if len(find_index) > 0:
self.connection.lset(key, find_index[0], key_delete)
self.connection.lrem(key, 0, key_delete)
delete_item()
delete_item()
return {
'type': ResponseTypes.SUCCESS,
'errors': None,
'data': None,
}
except Exception as e:
return {
'type': ResponseTypes.PARAMETERS_ERROR,
'errors': ['Error removing element from list: ' + str(e)],
'data': None,
}
def search_list(self, key: str, filters: dict | None = None) -> dict:
"""
Searches the positions of the elements that match the filters
Example:
--
result = search_list('cats', {'age': 5})
result['data']
# [{'name': 'Felix', 'age': 5, 'color': 'black'}]
result = search_list('tasks', {'completed': True})
result['data']
# [{'name': 'Task 1', 'completed': True}, {'name': 'Task 3', 'completed': True}]
--
:param key: The key of the list
:param filters: The filters to apply
:return: The positions of the elements that match the filters
"""
if filters is None:
return {
'type': ResponseTypes.PARAMETERS_ERROR,
'errors': ['Filters are required'],
'data': None,
}
with redis_lock.Lock(
self.connection,
name=key,
expire=self.expiration,
auto_renewal=True,
):
if isinstance(filters, dict):
# Dictionary
find_index = self.__get_search_index(key, filters)
else:
return {
'type': ResponseTypes.PARAMETERS_ERROR,
'errors': ['Invalid filters'],
'data': None,
}
key_list = self.get_list(key)['data']
data = [key_list[position] for position in find_index]
return {
'type': ResponseTypes.SUCCESS,
'errors': None,
'data': data,
}
Los test que implementé están adaptados a Pytest. Aquí te dejo un ejemplo de cómo podrías testear la clase:
import json
import pytest
from RedisStore import RedisStore, ResponseTypes
"""
Test the RedisStore class
test_redis_store_1: set and get. Set a key-value pair.
test_redis_store_2: set and get. Set wrong key.
test_redis_store_3: set and get. Set value str.
test_redis_store_4: set and get. Set value int.
test_redis_store_5: set and get. Set value bool.
test_redis_store_6: set and get. Set value dict.
test_redis_store_7: set and get. Set value list.
test_redis_store_8: get_list. Get all elements from a list.
test_redis_store_9: get_list. Get all elements from a list with wrong key.
test_redis_store_10: add_to_list. Add an element to a list.
test_redis_store_11: update_list. Update elements of a list that match the filters.
test_redis_store_12: update_list. Update elements of a list without filters.
test_redis_store_13: update_list. Update elements of a list with wrong key.
test_redis_store_14: search_list. Search the positions of the elements that match the filters.
test_redis_store_15: search_list. Search the positions of the elements that match the filters with wrong key.
test_redis_store_16: search_list. Search the positions of the elements without filters.
test_redis_store_17: remove_from_list. Remove an element from a list.
test_redis_store_18: remove_from_list. Remove an element from a list with wrong key.
test_redis_store_19: remove_from_list. Remove an element from a list with wrong index.
"""
@pytest.fixture()
def connection():
return RedisStore()
@pytest.fixture()
def key():
return 'test'
@pytest.fixture()
def key_list():
return 'test_list'
def clear_list(connection, key_list):
return connection.remove_from_list(key_list, {})
def test_redis_store_1(connection, key):
# Given
value = 'foo'
# When
result = connection.set(key, value)
# Then
assert (
result['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert connection.get(key)['data'] == value, 'The value is not correct'
def test_redis_store_2(connection, key):
# Given
value = 'foo'
# When
result = connection.set('', value)
# Then
assert (
result['type'] == ResponseTypes.PARAMETERS_ERROR
), 'The response type is not correct'
assert result['errors'] == ['Invalid key'], 'The errors are not correct'
def test_redis_store_3(connection, key):
# Given
value_1 = 'foo'
value_2 = 'bar'
# When
connection.set(key, value_1)
result = connection.set(key, value_2)
# Then
assert (
result['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert connection.get(key)['data'] == value_2, 'The value is not correct'
def test_redis_store_4(connection, key):
# Given
value = 1
# When
result = connection.set(key, value)
# Then
assert (
result['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert (
int(connection.get(key)['data']) == value
), 'The value is not correct'
def test_redis_store_5(connection, key):
# Given
value = True
# When
result = connection.set(key, value)
# Then
assert (
result['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert bool(connection.get(key)['data']), 'The value is not correct'
def test_redis_store_6(connection, key):
# Given
value = {'foo': 'bar'}
# When
result = connection.set(key, value)
# Then
assert (
result['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert (
json.loads(connection.get(key)['data']) == value
), 'The value is not correct'
def test_redis_store_7(connection, key):
# Given
value = ['foo', 'bar']
# When
result = connection.set(key, value)
# Then
assert (
result['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert (
json.loads(connection.get(key)['data']) == value
), 'The value is not correct'
def test_redis_store_8(connection, key_list):
# Given
clear_list(connection, key_list)
my_list = ['foo', 'bar']
# Add the item to the list
for item in my_list:
connection.add_to_list(key_list, item)
# When
result = connection.get_list(key_list)
# Then
assert (
result['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert result['data'] == my_list, 'The value is not correct'
def test_redis_store_9(connection, key_list):
# Given
clear_list(connection, key_list)
my_list = ['foo', 'bar']
# Add the item to the list
for item in my_list:
connection.add_to_list(key_list, item)
# When
result = connection.get_list('wrong_key')
# Then
assert (
result['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert len(result['data']) == 0, 'The value is not correct'
def test_redis_store_10(connection, key_list):
# Given
clear_list(connection, key_list)
my_list = ['foo', 23, True, {'foo': 'bar'}, ['foo', 'bar']]
# When
results = []
for item in my_list:
results.append(connection.add_to_list(key_list, item))
# Then
for result in results:
assert (
result['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
store_list = connection.get_list(key_list)['data']
store_list[1] = int(store_list[1])
assert store_list == my_list, 'The value is not correct'
def test_redis_store_11(connection, key_list):
# Given
clear_list(connection, key_list)
bar_dict_1 = {'to': 'vo'}
bar_dict_2 = {'mo': 'so', 'to': 'do'}
my_list = [
'foo',
'bar',
{'fo': 'mo', 'to': 'vo'},
{'to': 'vo'},
{'to': 'do', 'bo': 'co', 'do': 'eo', 'fo': 'go', 'mo': 'so'},
]
for item in my_list:
connection.add_to_list(key_list, item)
# When
connection.update_list(key_list, 'match', bar_dict_1) # Match
connection.update_list(key_list, 'nothing', {'nothing': 'zoo'}) # No match
connection.update_list(key_list, 'double match', bar_dict_2) # Match
# Then
store_list = connection.get_list(key_list)['data']
assert store_list == [
'foo',
'bar',
'match',
'match',
'double match',
], 'The value is not correct'
def test_redis_store_12(connection, key_list):
# Given
clear_list(connection, key_list)
my_list = ['foo', 'bar', 'foo', 'bar']
# Add the item to the list
for item in my_list:
connection.add_to_list(key_list, item)
# When
response = connection.update_list(key_list, 'coo')
# Then
assert (
response['type'] == ResponseTypes.PARAMETERS_ERROR
), 'The response type is not correct'
def test_redis_store_13(connection, key_list):
# Given
clear_list(connection, key_list)
my_list = ['foo', 'bar', 'foo', 'bar']
# Add the item to the list
for item in my_list:
connection.add_to_list(key_list, item)
# When
response = connection.update_list('wrong_key', 'coo')
# Then
assert (
response['type'] == ResponseTypes.PARAMETERS_ERROR
), 'The response type is not correct'
def test_redis_store_14(connection, key_list):
# Given
clear_list(connection, key_list)
bar_dict_1 = {'to': 'vo'}
bar_dict_2 = {'mo': 'so', 'to': 'do'}
bar_dict_3 = {'to': 'do', 'bo': 'co', 'do': 'eo', 'fo': 'go', 'mo': 'so'}
my_list = [
'foo',
'bar',
bar_dict_1,
{'fo': 'mo', 'to': 'vo'},
bar_dict_2,
bar_dict_3,
]
for item in my_list:
connection.add_to_list(key_list, item)
# When
results_1 = connection.search_list(key_list, bar_dict_1) # Match
results_2 = connection.search_list(
key_list, {'nothing': 'zoo'}
) # No match
results_3 = connection.search_list(key_list, bar_dict_2) # Match
# Then
assert (
results_1['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert (
results_2['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert (
results_3['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert results_1['data'] == [
bar_dict_1,
{'fo': 'mo', 'to': 'vo'},
], 'The value is not correct'
assert results_2['data'] == [], 'The value is not correct'
assert results_3['data'] == [
bar_dict_2,
bar_dict_3,
], 'The value is not correct'
def test_redis_store_15(connection, key_list):
# Given
clear_list(connection, key_list)
# When
result = connection.search_list('wrong_key', {'wrong': 'dict'})
# Then
assert (
result['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert len(result['data']) == 0, 'The value is not correct'
def test_redis_store_16(connection, key_list):
# Given
clear_list(connection, key_list)
my_list = ['foo', 'bar', 'foo', 'bar']
# Add the item to the list
for item in my_list:
connection.add_to_list(key_list, item)
# When
result = connection.search_list(key_list)
# Then
assert (
result['type'] == ResponseTypes.PARAMETERS_ERROR
), 'The response type is not correct'
def test_redis_store_17(connection, key_list):
# Given
clear_list(connection, key_list)
my_list = [{'foo': 'bar'}, {'bar': 'foo'}, {'boo': 'moo'}]
for item in my_list:
connection.add_to_list(key_list, item)
# When
result_1 = connection.remove_from_list(key_list, {'boo': 'moo'})
save_list_1 = connection.get_list(key_list)['data']
result_2 = connection.remove_from_list(key_list, {})
save_list_2 = connection.get_list(key_list)['data']
# Then
assert (
result_1['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert save_list_1 == [
{'foo': 'bar'},
{'bar': 'foo'},
], 'The value is not correct'
assert (
result_2['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert save_list_2 == [], 'The value is not correct'
def test_redis_store_18(connection, key_list):
# Given
clear_list(connection, key_list)
my_list = [{'foo': 'bar'}, {'bar': 'foo'}, {'boo': 'moo'}]
for item in my_list:
connection.add_to_list(key_list, item)
# When
result = connection.remove_from_list('wrong_key', {'boo': 'moo'})
# Then
assert (
result['type'] == ResponseTypes.SUCCESS
), 'The response type is not correct'
assert (
connection.get_list(key_list)['data'] == my_list
), 'The value is not correct'
def test_redis_store_19(connection, key_list):
# Given
clear_list(connection, key_list)
my_list = [{'foo': 'bar'}, {'bar': 'foo'}, {'boo': 'moo'}]
for item in my_list:
connection.add_to_list(key_list, item)
# When
result = connection.remove_from_list(key_list, 3)
# Then
assert (
result['type'] == ResponseTypes.PARAMETERS_ERROR
), 'The response type is not correct'
assert (
connection.get_list(key_list)['data'] == my_list
), 'The value is not correct'
En cada función puedes encontrar un ejemplo de como utilizarlo. Todos siguen el mismo patrón.
Primero debes instanciar la clase RedisStore
:
my_store = RedisStore()
Después puedes utilizar los métodos que necesites:
my_store.set('name', 'John Doe')
Si quieres saber más sobre como adaptar tu proyecto en Python a una arquitectura limpia, te recomiendo que leas el artículo.
Espero que te sirva de ayuda para tu proyecto. ¡Happy coding! 🚀
{{ comments.length }} comentarios