A Python interface for interacting with Redis | Programador Web Valencia

I am currently working on a project that follows clean architecture. I have reached a point where I need to interact with Redis, a very popular in-memory database. However, when your code is organized in layers (as clean architecture demands), you cannot send commands straight to an external service, however easy and tempting that may be. You need to create an adapter: an interface that manages the communication. That way you can switch databases without having to modify all of your code.

The requirements are the following:

  • get: Get a value by key.
  • set: Set a value by key.
  • get_list: Get all the elements of a list.
  • add_to_list: Add an element to a list.
  • update_list: Update the elements of a list that match the given filters (a WHERE clause, if this were SQL).
  • remove_from_list: Remove the elements of a list that match the given key and filters.
  • search_list: Find the elements of a list that match the given filters.
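The requirements above can be written down as an explicit contract before touching Redis. The sketch below is an assumption, not part of the original code: `KeyValueStore` and `count_matches` are hypothetical names, and the response dictionary (`type`, `errors`, `data`) mirrors the shape used by the class later in this post.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class KeyValueStore(Protocol):
    """Hypothetical contract listing the seven required operations."""

    def get(self, key: str) -> dict: ...

    def set(self, key: str, value: Any) -> dict: ...

    def get_list(self, key: str) -> dict: ...

    def add_to_list(self, key: str, data: Any) -> dict: ...

    def update_list(
        self, key: str, data: Any, filters: Optional[dict] = None
    ) -> dict: ...

    def remove_from_list(
        self, key: str, filters: Optional[dict] = None
    ) -> dict: ...

    def search_list(self, key: str, filters: Optional[dict] = None) -> dict: ...


def count_matches(store: KeyValueStore, key: str, filters: dict) -> int:
    """Inner-layer code depends only on the contract, never on Redis."""
    return len(store.search_list(key, filters)['data'])
```

Any class that offers these methods, whether it talks to Redis or to something else, can be passed to `count_matches` without changing it.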

First we need to install the dependencies required to interact with Redis. In our case there are two:

  • redis: The official Redis client library for Python.
  • python-redis-lock: A library that lets us lock the Redis list (the queue) to avoid concurrency problems.

The code would be the following:

import json
import os
from typing import Any

import redis
import redis_lock

class ResponseTypes:
    SUCCESS = 'Success'  # The process ended correctly
    PARAMETERS_ERROR = 'ParametersError'  # Missing or invalid parameters
    RESOURCE_ERROR = 'ResourceError'  # The process ended correctly but the resource is not available (DB, file, etc)
    SYSTEM_ERROR = 'SystemError'  # The process ended with an error. Python error



class RedisStore:
    def __init__(self):
        """
        Initializes the connection to the Redis server
        """
        self.connection = redis.from_url(
            os.environ.get('STORE_URI', 'redis://redis:6379')
        )
        self.expiration = 60
        # __init__ must return None, so a failed connection is reported by raising
        if not self.connection.ping():
            raise ConnectionError('Connection failed')

    def __is_json(self, text: str) -> bool:
        """
        Checks if a string is a valid JSON

        :param text: The text to check
        :return: True if the text is a valid JSON, False otherwise
        """
        try:
            json.loads(text)
        except ValueError:
            return False
        return True

    def __get_search_index(
        self, key: str, filters: dict | None = None
    ) -> list:
        """
        Searches the positions of the elements that match the filters

        :param key: The key of the list
        :param filters: The filters to apply
        :return: The positions of the elements that match all the filters
        """
        # Return all the positions if there are no filters
        if filters is None or filters == {}:
            return list(range(0, len(self.get_list(key)['data'])))
        # Search the positions to update
        find_index = []
        for index, item in enumerate(self.get_list(key)['data']):
            if isinstance(item, str) and self.__is_json(item):
                item = json.loads(item)
            if isinstance(item, dict):
                # Every filter key must exist and its value must match
                if all(
                    filter_key in item and item[filter_key] == filter_value
                    for filter_key, filter_value in filters.items()
                ):
                    find_index.append(index)

        return find_index

    def set(self, key: str, value: str | int | bool | dict | list) -> dict:
        """
        Sets a key-value pair in the Redis server

        Example:
        --
        set('name', 'John Doe')
        set('age', 25)
        set('is_active', True)
        set('profile', {'name': 'John Doe', 'age': 25})
        set('tasks', ['Task 1', 'Task 2', 'Task 3']) # Stored as a JSON string; use add_to_list for Redis lists
        --

        :param key: The key to set
        :param value: The value to set
        """
        if key == '':
            return {
                'type': ResponseTypes.PARAMETERS_ERROR,
                'errors': ['Invalid key'],
                'data': None,
            }
        if isinstance(value, dict | list):
            value = json.dumps(value)
        if isinstance(value, bool):
            value = str(value)
        try:
            return {
                'type': ResponseTypes.SUCCESS,
                'errors': None,
                'data': self.connection.set(key, value),
            }
        except Exception:
            return {
                'type': ResponseTypes.PARAMETERS_ERROR,
                'errors': [f'Error setting key: {key}'],
                'data': None,
            }

    def get(self, key: str) -> dict:
        """
        Gets the value of a key in the Redis server

        Example:
        --
        result = get('name')
        result['data']
        # 'John Doe'
        --

        :param key: The key to get
        :return: The value of the key
        """

        try:
            return {
                'type': ResponseTypes.SUCCESS,
                'errors': None,
                'data': self.connection.get(key).decode('utf-8'),
            }
        except Exception:
            # A missing key returns None, so .decode() fails and lands here
            return {
                'type': ResponseTypes.PARAMETERS_ERROR,
                'errors': [f'Error getting key: {key}'],
                'data': None,
            }

    def get_list(self, key: str) -> dict:
        """
        Custom method to get all elements from a list

        Example:
        --
        result = get_list('cats')
        result['data']
        # ['Garfield', 'Felix', 'Tom' ...]
        --

        :param key: The key of the list
        :return: A list with all the elements
        """
        try:
            data = [
                item_raw.decode('utf-8')
                for item_raw in self.connection.lrange(key, 0, -1)
            ]
            # Booleans are stored as the strings 'True' / 'False'
            data_fixed_bool = [
                item == 'True' if item in ('True', 'False') else item
                for item in data
            ]

            def format_item(item: str | bool) -> Any:
                # Decode JSON strings (dicts, lists, numbers); leave the rest as is
                if isinstance(item, str) and self.__is_json(item):
                    return json.loads(item)
                return item

            data_final = [format_item(item) for item in data_fixed_bool]
            return {
                'type': ResponseTypes.SUCCESS,
                'errors': None,
                'data': data_final,
            }
        except Exception as e:
            return {
                'type': ResponseTypes.PARAMETERS_ERROR,
                'errors': [str(e)],
                'data': [],
            }

    def add_to_list(
        self,
        key: str,
        data: str | int | bool | dict | list,
    ) -> dict:
        """
        Adds an element to a list

        Example:
        --
        add_to_list('cats', 'Garfield')
        add_to_list('cats', {'name': 'Felix', 'age': 5, 'color': 'black'})
        --

        :param key: The key of the list
        :param data: The data to add
        """
        if isinstance(data, dict | list):
            data = json.dumps(data)
        if isinstance(data, bool):
            data = str(data)
        with redis_lock.Lock(
            self.connection,
            name=key,
            expire=self.expiration,
            auto_renewal=True,
        ):
            return {
                'type': ResponseTypes.SUCCESS,
                'errors': None,
                'data': self.connection.rpush(key, data),
            }

    def update_list(
        self,
        key: str,
        data: str | int | bool | dict | list,
        filters: dict | None = None,
    ) -> dict:
        """
        Updates the elements of a list that match the filters

        Example:
        --
        update_list('cats', {'name': 'Felix', 'age': 5, 'color': 'black'}, {'name': 'Felix', 'age': 11})
        --

        :param key: The key of the list
        :param data: The data to update
        :param filters: The filters to apply
        """
        if filters is None:
            return {
                'type': ResponseTypes.PARAMETERS_ERROR,
                'errors': ['Filters are required'],
                'data': None,
            }
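        if isinstance(data, dict | list):
            data = json.dumps(data)
        # Booleans are stored as strings, as in set and add_to_list
        if isinstance(data, bool):
            data = str(data)
        # Lock the queue
        # https://pypi.org/project/python-redis-lock/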
        with redis_lock.Lock(
            self.connection,
            name=key,
            expire=self.expiration,
            auto_renewal=True,
        ):
            # Search the positions to update
            update_positions = self.__get_search_index(key, filters)
            # Update the positions
            for position in update_positions:
                try:
                    self.connection.lset(key, position, data)
                except Exception as e:
                    return {
                        'type': ResponseTypes.PARAMETERS_ERROR,
                        'errors': ['Error updating list: ' + str(e)],
                        'data': None,
                    }
            return {
                'type': ResponseTypes.SUCCESS,
                'errors': None,
                'data': update_positions,
            }

    def remove_from_list(self, key: str, filters: dict | None = None) -> dict:
        """
        Removes every element of a list that matches the filters

        Example:
        --
        remove_from_list('cats', {'name': 'Felix', 'age': 5, 'color': 'black'})
        --

        :param key: The key of the list
        :param filters: The filters to apply
        """
        try:
            with redis_lock.Lock(
                self.connection,
                name=key,
                expire=self.expiration,
                auto_renewal=True,
            ):
                # The reason there isn't a remove by index operation is that it does not play well with Redis atomic operations. There is no way without an explicit lock to use it in a safe way.
                # https://groups.google.com/g/redis-db/c/c-IpJ0YWa9I
                key_delete = '__DELETED__'

                # Indexes shift after each removal, so search again until
                # nothing matches. A loop avoids hitting the recursion limit
                # on long lists.
                while True:
                    find_index = self.__get_search_index(key, filters)
                    if len(find_index) == 0:
                        break
                    # Mark the first match, then remove the marked value
                    self.connection.lset(key, find_index[0], key_delete)
                    self.connection.lrem(key, 0, key_delete)
                return {
                    'type': ResponseTypes.SUCCESS,
                    'errors': None,
                    'data': None,
                }
        except Exception as e:
            return {
                'type': ResponseTypes.PARAMETERS_ERROR,
                'errors': ['Error removing element from list: ' + str(e)],
                'data': None,
            }

    def search_list(self, key: str, filters: dict | None = None) -> dict:
        """
        Searches the elements of a list that match the filters

        Example:
        --
        result = search_list('cats', {'age': 5})
        result['data']
        # [{'name': 'Felix', 'age': 5, 'color': 'black'}]
        result = search_list('tasks', {'completed': True})
        result['data']
        # [{'name': 'Task 1', 'completed': True}, {'name': 'Task 3', 'completed': True}]
        --

        :param key: The key of the list
        :param filters: The filters to apply
        :return: The elements that match the filters
        """
        if filters is None:
            return {
                'type': ResponseTypes.PARAMETERS_ERROR,
                'errors': ['Filters are required'],
                'data': None,
            }
        with redis_lock.Lock(
            self.connection,
            name=key,
            expire=self.expiration,
            auto_renewal=True,
        ):
            if isinstance(filters, dict):
                # Dictionary
                find_index = self.__get_search_index(key, filters)
            else:
                return {
                    'type': ResponseTypes.PARAMETERS_ERROR,
                    'errors': ['Invalid filters'],
                    'data': None,
                }
            key_list = self.get_list(key)['data']
            data = [key_list[position] for position in find_index]
            return {
                'type': ResponseTypes.SUCCESS,
                'errors': None,
                'data': data,
            }
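The filter matching that `update_list`, `remove_from_list` and `search_list` rely on can be tried out without a Redis server. The function below is a standalone sketch of that logic, assuming the list has already been read into Python; `matching_indexes` is a hypothetical name and not a method of the class.

```python
import json


def matching_indexes(items: list, filters: dict) -> list:
    """Return the positions of the dict items that satisfy every filter."""
    if not filters:
        # No filters: every position matches
        return list(range(len(items)))
    found = []
    for index, item in enumerate(items):
        if isinstance(item, str):
            try:
                item = json.loads(item)  # JSON strings are decoded first
            except ValueError:
                continue  # plain strings can never match a filter
        # Only dictionaries can match, and every filter must be satisfied
        if isinstance(item, dict) and all(
            key in item and item[key] == value
            for key, value in filters.items()
        ):
            found.append(index)
    return found


cats = ['Garfield', {'name': 'Felix', 'age': 5}, '{"name": "Tom", "age": 5}']
print(matching_indexes(cats, {'age': 5}))  # [1, 2]
```

Filters are combined with AND: an item is selected only when all the keys are present with equal values, which is why a filter on a missing key matches nothing.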

The tests I implemented are written for pytest. Here is an example of how you could test the class:

import json

import pytest

from RedisStore import RedisStore, ResponseTypes

"""
Test the RedisStore class

test_redis_store_1: set and get. Set a key-value pair.
test_redis_store_2: set and get. Set wrong key.
test_redis_store_3: set and get. Set value str.
test_redis_store_4: set and get. Set value int.
test_redis_store_5: set and get. Set value bool.
test_redis_store_6: set and get. Set value dict.
test_redis_store_7: set and get. Set value list.
test_redis_store_8: get_list. Get all elements from a list.
test_redis_store_9: get_list. Get all elements from a list with wrong key.
test_redis_store_10: add_to_list. Add an element to a list.
test_redis_store_11: update_list. Update elements of a list that match the filters.
test_redis_store_12: update_list. Update elements of a list without filters.
test_redis_store_13: update_list. Update elements of a list with wrong key.
test_redis_store_14: search_list. Search the positions of the elements that match the filters.
test_redis_store_15: search_list. Search the positions of the elements that match the filters with wrong key.
test_redis_store_16: search_list. Search the positions of the elements without filters.
test_redis_store_17: remove_from_list. Remove an element from a list.
test_redis_store_18: remove_from_list. Remove an element from a list with wrong key.
test_redis_store_19: remove_from_list. Remove an element from a list with wrong index.
"""


@pytest.fixture()
def connection():
    return RedisStore()


@pytest.fixture()
def key():
    return 'test'


@pytest.fixture()
def key_list():
    return 'test_list'


def clear_list(connection, key_list):
    return connection.remove_from_list(key_list, {})


def test_redis_store_1(connection, key):
    # Given
    value = 'foo'

    # When
    result = connection.set(key, value)

    # Then
    assert (
        result['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert connection.get(key)['data'] == value, 'The value is not correct'


def test_redis_store_2(connection, key):
    # Given
    value = 'foo'

    # When
    result = connection.set('', value)

    # Then
    assert (
        result['type'] == ResponseTypes.PARAMETERS_ERROR
    ), 'The response type is not correct'
    assert result['errors'] == ['Invalid key'], 'The errors are not correct'


def test_redis_store_3(connection, key):
    # Given
    value_1 = 'foo'
    value_2 = 'bar'

    # When
    connection.set(key, value_1)
    result = connection.set(key, value_2)

    # Then
    assert (
        result['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert connection.get(key)['data'] == value_2, 'The value is not correct'


def test_redis_store_4(connection, key):
    # Given
    value = 1

    # When
    result = connection.set(key, value)

    # Then
    assert (
        result['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert (
        int(connection.get(key)['data']) == value
    ), 'The value is not correct'


def test_redis_store_5(connection, key):
    # Given
    value = True

    # When
    result = connection.set(key, value)

    # Then
    assert (
        result['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    # bool('False') is also True, so compare against the stored string
    assert connection.get(key)['data'] == str(value), 'The value is not correct'


def test_redis_store_6(connection, key):
    # Given
    value = {'foo': 'bar'}

    # When
    result = connection.set(key, value)

    # Then
    assert (
        result['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert (
        json.loads(connection.get(key)['data']) == value
    ), 'The value is not correct'


def test_redis_store_7(connection, key):
    # Given
    value = ['foo', 'bar']

    # When
    result = connection.set(key, value)

    # Then
    assert (
        result['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert (
        json.loads(connection.get(key)['data']) == value
    ), 'The value is not correct'


def test_redis_store_8(connection, key_list):
    # Given
    clear_list(connection, key_list)
    my_list = ['foo', 'bar']

    # Add the item to the list
    for item in my_list:
        connection.add_to_list(key_list, item)

    # When
    result = connection.get_list(key_list)

    # Then
    assert (
        result['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert result['data'] == my_list, 'The value is not correct'


def test_redis_store_9(connection, key_list):
    # Given
    clear_list(connection, key_list)
    my_list = ['foo', 'bar']

    # Add the item to the list
    for item in my_list:
        connection.add_to_list(key_list, item)

    # When
    result = connection.get_list('wrong_key')

    # Then
    assert (
        result['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert len(result['data']) == 0, 'The value is not correct'


def test_redis_store_10(connection, key_list):
    # Given
    clear_list(connection, key_list)
    my_list = ['foo', 23, True, {'foo': 'bar'}, ['foo', 'bar']]

    # When
    results = []
    for item in my_list:
        results.append(connection.add_to_list(key_list, item))

    # Then
    for result in results:
        assert (
            result['type'] == ResponseTypes.SUCCESS
        ), 'The response type is not correct'
    store_list = connection.get_list(key_list)['data']
    store_list[1] = int(store_list[1])
    assert store_list == my_list, 'The value is not correct'


def test_redis_store_11(connection, key_list):
    # Given
    clear_list(connection, key_list)
    bar_dict_1 = {'to': 'vo'}
    bar_dict_2 = {'mo': 'so', 'to': 'do'}
    my_list = [
        'foo',
        'bar',
        {'fo': 'mo', 'to': 'vo'},
        {'to': 'vo'},
        {'to': 'do', 'bo': 'co', 'do': 'eo', 'fo': 'go', 'mo': 'so'},
    ]
    for item in my_list:
        connection.add_to_list(key_list, item)

    # When
    connection.update_list(key_list, 'match', bar_dict_1)  # Match
    connection.update_list(key_list, 'nothing', {'nothing': 'zoo'})  # No match
    connection.update_list(key_list, 'double match', bar_dict_2)  # Match

    # Then
    store_list = connection.get_list(key_list)['data']
    assert store_list == [
        'foo',
        'bar',
        'match',
        'match',
        'double match',
    ], 'The value is not correct'


def test_redis_store_12(connection, key_list):
    # Given
    clear_list(connection, key_list)
    my_list = ['foo', 'bar', 'foo', 'bar']

    # Add the item to the list
    for item in my_list:
        connection.add_to_list(key_list, item)

    # When
    response = connection.update_list(key_list, 'coo')

    # Then
    assert (
        response['type'] == ResponseTypes.PARAMETERS_ERROR
    ), 'The response type is not correct'


def test_redis_store_13(connection, key_list):
    # Given
    clear_list(connection, key_list)
    my_list = ['foo', 'bar', 'foo', 'bar']

    # Add the item to the list
    for item in my_list:
        connection.add_to_list(key_list, item)

    # When
    response = connection.update_list('wrong_key', 'coo')

    # Then
    assert (
        response['type'] == ResponseTypes.PARAMETERS_ERROR
    ), 'The response type is not correct'


def test_redis_store_14(connection, key_list):
    # Given
    clear_list(connection, key_list)
    bar_dict_1 = {'to': 'vo'}
    bar_dict_2 = {'mo': 'so', 'to': 'do'}
    bar_dict_3 = {'to': 'do', 'bo': 'co', 'do': 'eo', 'fo': 'go', 'mo': 'so'}
    my_list = [
        'foo',
        'bar',
        bar_dict_1,
        {'fo': 'mo', 'to': 'vo'},
        bar_dict_2,
        bar_dict_3,
    ]
    for item in my_list:
        connection.add_to_list(key_list, item)

    # When
    results_1 = connection.search_list(key_list, bar_dict_1)  # Match
    results_2 = connection.search_list(
        key_list, {'nothing': 'zoo'}
    )  # No match
    results_3 = connection.search_list(key_list, bar_dict_2)  # Match

    # Then
    assert (
        results_1['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert (
        results_2['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert (
        results_3['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert results_1['data'] == [
        bar_dict_1,
        {'fo': 'mo', 'to': 'vo'},
    ], 'The value is not correct'
    assert results_2['data'] == [], 'The value is not correct'
    assert results_3['data'] == [
        bar_dict_2,
        bar_dict_3,
    ], 'The value is not correct'


def test_redis_store_15(connection, key_list):
    # Given
    clear_list(connection, key_list)

    # When
    result = connection.search_list('wrong_key', {'wrong': 'dict'})

    # Then
    assert (
        result['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert len(result['data']) == 0, 'The value is not correct'


def test_redis_store_16(connection, key_list):
    # Given
    clear_list(connection, key_list)
    my_list = ['foo', 'bar', 'foo', 'bar']

    # Add the item to the list
    for item in my_list:
        connection.add_to_list(key_list, item)

    # When
    result = connection.search_list(key_list)

    # Then
    assert (
        result['type'] == ResponseTypes.PARAMETERS_ERROR
    ), 'The response type is not correct'


def test_redis_store_17(connection, key_list):
    # Given
    clear_list(connection, key_list)
    my_list = [{'foo': 'bar'}, {'bar': 'foo'}, {'boo': 'moo'}]
    for item in my_list:
        connection.add_to_list(key_list, item)

    # When
    result_1 = connection.remove_from_list(key_list, {'boo': 'moo'})
    save_list_1 = connection.get_list(key_list)['data']
    result_2 = connection.remove_from_list(key_list, {})
    save_list_2 = connection.get_list(key_list)['data']

    # Then
    assert (
        result_1['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert save_list_1 == [
        {'foo': 'bar'},
        {'bar': 'foo'},
    ], 'The value is not correct'
    assert (
        result_2['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert save_list_2 == [], 'The value is not correct'


def test_redis_store_18(connection, key_list):
    # Given
    clear_list(connection, key_list)
    my_list = [{'foo': 'bar'}, {'bar': 'foo'}, {'boo': 'moo'}]
    for item in my_list:
        connection.add_to_list(key_list, item)

    # When
    result = connection.remove_from_list('wrong_key', {'boo': 'moo'})

    # Then
    assert (
        result['type'] == ResponseTypes.SUCCESS
    ), 'The response type is not correct'
    assert (
        connection.get_list(key_list)['data'] == my_list
    ), 'The value is not correct'


def test_redis_store_19(connection, key_list):
    # Given
    clear_list(connection, key_list)
    my_list = [{'foo': 'bar'}, {'bar': 'foo'}, {'boo': 'moo'}]
    for item in my_list:
        connection.add_to_list(key_list, item)

    # When
    result = connection.remove_from_list(key_list, 3)

    # Then
    assert (
        result['type'] == ResponseTypes.PARAMETERS_ERROR
    ), 'The response type is not correct'
    assert (
        connection.get_list(key_list)['data'] == my_list
    ), 'The value is not correct'
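The tests above need a running Redis server. Since callers only see the adapter, a stand-in with the same method names and response shape could replace it in unit tests. What follows is a minimal sketch under that assumption: `InMemoryStore` is a hypothetical class that is not part of the original project, it covers only some of the methods, and it keeps Python values as they are instead of reproducing how Redis stores them as strings.

```python
class InMemoryStore:
    """Hypothetical stand-in that keeps everything in Python dictionaries."""

    def __init__(self):
        self.values = {}
        self.lists = {}

    @staticmethod
    def _ok(data):
        return {'type': 'Success', 'errors': None, 'data': data}

    @staticmethod
    def _error(message):
        return {'type': 'ParametersError', 'errors': [message], 'data': None}

    def set(self, key, value):
        if key == '':
            return self._error('Invalid key')
        self.values[key] = value
        return self._ok(True)

    def get(self, key):
        if key not in self.values:
            return self._error(f'Error getting key: {key}')
        return self._ok(self.values[key])

    def add_to_list(self, key, data):
        self.lists.setdefault(key, []).append(data)
        return self._ok(len(self.lists[key]))

    def get_list(self, key):
        # A missing list behaves like an empty one
        return self._ok(list(self.lists.get(key, [])))

    def search_list(self, key, filters=None):
        if filters is None:
            return self._error('Filters are required')
        if not isinstance(filters, dict):
            return self._error('Invalid filters')
        items = self.lists.get(key, [])
        if not filters:
            return self._ok(list(items))
        matches = [
            item
            for item in items
            if isinstance(item, dict)
            and all(k in item and item[k] == v for k, v in filters.items())
        ]
        return self._ok(matches)


store = InMemoryStore()
store.add_to_list('cats', 'Garfield')
store.add_to_list('cats', {'name': 'Felix', 'age': 5})
print(store.search_list('cats', {'age': 5})['data'])
```

A pytest fixture could return `InMemoryStore()` instead of `RedisStore()` for tests that only exercise the calling code, while the tests above keep checking the real adapter.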

Each method includes an example of how to use it in its docstring. They all follow the same pattern.

First, instantiate the RedisStore class:

my_store = RedisStore()

Then you can call whichever methods you need:

my_store.set('name', 'John Doe')
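Every method returns a dictionary with `type`, `errors` and `data`, so callers have to check `type` before trusting `data`. A small helper can centralise that check. This is a sketch, not part of the original class: `unwrap` and `StoreError` are hypothetical names, and the literal `'Success'` matches the value of `ResponseTypes.SUCCESS`.

```python
class StoreError(Exception):
    """Hypothetical exception raised when a store response is not a success."""


def unwrap(response: dict):
    """Return response['data'], or raise if the operation failed."""
    if response['type'] != 'Success':  # same value as ResponseTypes.SUCCESS
        errors = response.get('errors') or ['Unknown error']
        message = '; '.join(errors)
        raise StoreError(f"{response['type']}: {message}")
    return response['data']


# Responses shaped like the ones RedisStore returns
ok = {'type': 'Success', 'errors': None, 'data': 'John Doe'}
failed = {'type': 'ParametersError', 'errors': ['Invalid key'], 'data': None}

print(unwrap(ok))  # John Doe
try:
    unwrap(failed)
except StoreError as error:
    print(error)  # ParametersError: Invalid key
```

With the real class the call would look like `name = unwrap(my_store.get('name'))`.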

If you want to know more about adapting your Python project to clean architecture, I recommend reading the article.

I hope this helps with your project. Happy coding! 🚀

This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License.
