Crear un chat bot para XMPP | Programador Web Valencia

Crear un chat bot para XMPP

2 minutos

Chatbot

Os voy a enseñar a crear un bot para XMPP usando Python y la librería slixmpp. Me resulta de mucha utilidad para automatizar tareas en mis servidores y para crear pequeñas aplicaciones que se comuniquen conmigo. Por ejemplo: saber el uso de memoria RAM, avisos ante ciertos eventos (como un gran uso repentino de CPU), conocer cuantos clientes están conectados, ejecutar rutinas de mantenimiento, lanzar despliegues u otras más banales (toma este hilo de conversación y hazme un resumen, por ejemplo).

¡Atención! No es compatible con el protocolo de encritación end-to-end OMEMO. En caso de necesidad, puedes usar el fork slixmpp-omemo con sus respectivas configuraciones.

Sencillo echo

Empezaremos creando un bot que repita los mensajes que le enviamos. Para ello, vamos a crear una clase que herede de slixmpp.ClientXMPP y que implemente los métodos start y message. El primero se ejecuta cuando se inicia la sesión y el segundo cuando recibimos un mensaje.

Crearemos un archivo llamado bot.py con el siguiente contenido:

import slixmpp


class Bot(slixmpp.ClientXMPP):

    def __init__(self, jid, password):
        super().__init__(jid, password)
        self.add_event_handler("session_start", self.start)
        self.add_event_handler("message", self.message)

    async def start(self, event):
        self.send_presence()
        await self.get_roster()

    def message(self, msg):
        # Evento que se ejecuta cuando recibimos un mensaje
        if msg["type"] in ("chat", "normal"):
            # Envia el mensaje de vuelta. msg['body'] es el mensaje recibido.
            msg.reply(msg['body']).send()

# Configurar información de la cuenta
jid = "tuusuario@dominio.com"
password = "tu contraseña"
xmpp = Bot(jid, password)

# Conectar al servidor XMPP
xmpp.connect()
xmpp.process(forever=False)

Para ejecutarlo usaremos:

python3 bot.py

Para pararlo debes pulsar Ctrl + C.

Con la base anterior podemos ir todo lo lejos que queramos.

Un ejemplo más completo

Si ya hemos entendido como hacerlo funcionar, le daremos algo más de complejidad.

En este caso vamos a crear un bot que responda a los siguientes comandos:

  • help: Muestra una ayuda con los comandos disponibles.
  • echo: Repite el mensaje.
  • mem: Muestra el uso de memoria RAM.

Además incluiremos un decorador llamado loading que nos permitirá mostrar un mensaje de espera mientras se ejecuta el comando.

import slixmpp
import threading
import subprocess


class Bot(slixmpp.ClientXMPP):

    def __init__(self, jid, password):
        super().__init__(jid, password)
        self.add_event_handler("session_start", self.start)
        self.add_event_handler("message", self.message)

    async def start(self, event):
        self.send_presence()
        await self.get_roster()

    @staticmethod
    def loading(func):
        def wrapper(*args, **kwargs):
            if "msg" in kwargs:
                msg = kwargs["msg"]
                def callback(msg):
                    msg.reply("Espere por favor...").send()
                threading.Thread(target=wrapper, args=(msg,)).start()
            func(*args, **kwargs)
        return wrapper

    def message(self, msg):
        body = msg['body']
        command = body.split(" ")[0] if " " in body else body
        args = body.split(" ")[1:] if " " in body else None
        if msg["type"] in ("chat", "normal"):
            match command:
                case "help":
                    self.run_help(command, args, msg)
                case "echo":
                    msg.reply(" ".join(args)).send()
                case "mem":
                    self.run_mem(command, args, msg)
                case _:
                    pass

    def run_help(self, command, args, msg):
        msg.reply("Comandos disponibles: \n" +
                  "help: Muestra esta ayuda\n" +
                  "echo: Repite el mensaje\n" +
                  "mem: Muestra el uso de memoria RAM").send()

    def run_echo(self, command, args, msg):
        def callback():
            msg.reply(" ".join(args)).send()
        threading.Thread(target=callback).start()

    @loading
    def run_mem(self, command, args, msg):
        def callback():
            result = subprocess.run(["free", "-h"], stdout=subprocess.PIPE)
            msg.reply(result.stdout.decode("utf-8")).send()
        threading.Thread(target=callback).start()

# Configurar información de la cuenta
jid = "tuusuario@dominio.com"
password = "tu contraseña"
xmpp = Bot(jid, password)

# Conectar al servidor XMPP
xmpp.connect()
xmpp.process(forever=False)

Si lo complementas con tu propio servidor XMPP, puedes tener un bot privado que te responda a tus necesidades.

Espero que os resulte útil.

Esta obra está bajo una Licencia Creative Commons Atribución-NoComercial-SinDerivadas 4.0 Internacional.

Atribución/Reconocimiento-NoComercial-SinDerivados 4.0 Internacional

¿Me ayudas?

Comprame un café
Pulsa sobre la imagen

No te sientas obligado a realizar una donación, pero cada aportación mantiene el sitio en activo logrando que continúe existiendo y sea accesible para otras personas. Además me motiva a crear nuevo contenido.

Comentarios

{{ comments.length }} comentarios

Nuevo comentario

Nueva replica  {{ formatEllipsisAuthor(replyComment.author) }}

Acepto la política de Protección de Datos.

Escribe el primer comentario

Tal vez también te interese...