Tutorial de flask para realizar un notificador de Wallapop

El tutorial es un taller que realicé en la PyConES17. Su nombre original es Vigilante de Wallapop (Aplicación para vigilar precios en Wallapop). El código es libre y puedes usarlo para cualquier uso no comercial mientras hagas referencia a esta página. Es una práctica perfecta para adentrarte profesionalmente a Flask y conocer todas sus herramientas.

El código fuente lo puedes encontrar en Github:

Demo

Actualmente esta implementado en un sitio real:

Run (Ejecutar)

Para los impacientes, podéis jugar con el ejercicio acabado. Debéis descargar el código y ejecutar los siguientes comandos.

git clone https://github.com/tanrax/flask-wallapop-watcher.git
cd flask-wallapop-watcher
pip3 install virtualenv
virtualenv --python=python3 .venv
source .venv/bin/activate
pip3 install -r requirements.txt
python3 models.py db init
python3 models.py db migrate
python3 models.py db upgrade
python3 app.py

Después abrir en tu navegador favorito, que posiblemente será el fantástico Firefox, una pestaña nueva con


Requisitos mínimos

Comprobamos que tenemos todo

python3 --help
wget --help
sqlite3 --help
ping -c 5 google.com

Parte 1 - Nucleo de Flask y Buscador 50 min

1.1 Ready?

Preparamos nuestro entorno virtual.

mkdir flask-wallapop-watcher
cd flask-wallapop-watcher
pip3 install virtualenv
virtualenv --python=python3 .venv
source .venv/bin/activate
wget https://raw.githubusercontent.com/tanrax/flask-wallapop-watcher/master/requirements.txt
pip3 install -r requirements.txt

1.1 Hello PyConES17

Plantilla Flask. Creamos un nuevo archivo llamado app.py.

from flask import Flask

# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'

@app.route('/')
def buscador():
    return 'Hello PyConES17 !!!'


if __name__ == '__main__':
    app.run()

Ejecutamos y comprobamos que todo funciona.

python3 app.py
http://127.0.0.1:5000

1.2 Templates

Creamos una carpeta llamada templates. Dentro dos más: layouts y items. En layouts haremos uno nuevo con el nombre master.html.

<!DOCTYPE html>
<html lang="es">
    <head>
        <title>{% block title %}{% endblock %} | Vigilador de Wallapop</title>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1">
        <link rel="stylesheet" href="https://bootswatch.com/superhero/bootstrap.css">
    </head>
    <body>
        <div class="container">
            <ul class="nav nav-pills nav-justified">
                <li role="presentation" {% if active_page == "buscador" or not active_page %}class="active"{% endif %}><a href="">Buscador</a></li>
                <li role="presentation" {% if active_page == "programadas" %}class="active"{% endif %}><a href="#">Programadas</a></li>
            </ul>
            {% block body %}{% endblock %}
        </div>
    </body>
</html>

En items vamos a tener nuestra primera página real que va a heredar de master.html. Dentro de items creamos buscador.html.

{% extends 'layouts/master.html' %}
{% set active_page = "buscador" %}
{% block title %}Buscador{% endblock %}
{% block body %}
<h1>Buscador</h1>
{% endblock %}

Actulizamos app.py para que trabaje nuestro motor de plantillas.

from flask import Flask, render_template

# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'


@app.route('/')
def buscador():
    return render_template('items/buscador.html')


if __name__ == '__main__':
    app.run()

Creamos la segunda página donde tendremos nuestras busquedas almacenadas. Dentro de items creamos un fichero nuevo con el nombre de programadas.html.

{% extends 'layouts/master.html' %}
{% set active_page = "programadas" %}
{% block title %}Programadas{% endblock %}
{% block body %}
<h1>Soy la página donde estará las programadas</h1>
{% endblock %}
{% endhighlight %}


Actulizamos **app.py** con la nueva página.

{% highlight python3 %}
from flask import Flask, render_template

# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'


@app.route('/')
def buscador():
    return render_template('items/buscador.html')


@app.route('/programadas')
def programadas():
    return render_template('items/programadas.html')


if __name__ == '__main__':
    app.run()
{% endhighlight %}


Como último detalle haremos que nuestros botones del navegador tengan las rutas correctas.

{% highlight jinja2 %}{% raw %}
<!DOCTYPE html>
<html lang="es">
    <head>
        <title>{% block title %}{% endblock %} | Vigilador de Wallapop</title>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1">
        <link rel="stylesheet" href="https://bootswatch.com/superhero/bootstrap.css">
    </head>
    <body>
        <div class="container">
            <ul class="nav nav-pills nav-justified">
                <li role="presentation" {% if active_page == "buscador" or not active_page %}class="active"{% endif %}><a href="{{ url_for('buscador') }}">Buscador</a></li>
                <li role="presentation" {% if active_page == "programadas" %}class="active"{% endif %}><a href="{{ url_for('programadas') }}">Programadas</a></li>
            </ul>
            {% block body %}{% endblock %}
        </div>
    </body>
</html>

1.3 Forms

Realizamos el nuevo archivo forms.py.

from flask_wtf import FlaskForm
from wtforms import StringField, IntegerField
from wtforms.validators import DataRequired, Length, NumberRange, Optional


class SearchForm(FlaskForm):
    name = StringField('Nombre', [Length(min=1, max=100, message='Es demasiado largo'), DataRequired(message='Campo obligatorio')])
    price_max = IntegerField('Precio', [NumberRange(1, message='No puede ser inferior a 1'), Optional()])

Lo cargamos y se lo pasamos a la plantilla.

from flask import Flask, render_template
from forms import SearchForm

# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'

@app.route('/', methods=('GET', 'POST'))
def buscador():
    form = SearchForm()
    if form.validate_on_submit():
        pass
    return render_template('items/buscador.html', form=form)


@app.route('/programadas')
def programadas():
    return render_template('items/programadas.html')


if __name__ == '__main__':
    app.run()

Imprimimos los campos con un bucle en nuestra plantilla buscador.html.

{% extends 'layouts/master.html' %}
{% set active_page = "buscador" %}
{% block title %}Buscador{% endblock %}
{% block body %}
<h1>Buscador</h1>
<div class="row">
    <div class="col-xs-12">
        <form method="post">
            {{ form.csrf_token }}
            {% for input in form %}
                {% if input.type != 'CSRFTokenField' %}
                    <div class="form-group">
                        {# Label #}
                        {{ input.label }}
                        {# Input #}
                        {{ input(class="form-control") }}
                        {# Errors #}
                        {% if input.errors %}
                            <div class="has-error">
                            {% for error in input.errors %}
                                <label class="help-block">
                                    {{ error }}
                                </label>
                            {% endfor %}
                            </div>
                        {% endif %}
                    </div>
                {% endif %}
            {% endfor %}
            <input type="submit" class="btn btn-primary" value="Buscar">
        </form>
    </div>
</div>
{% endblock %}

Ha llegado la hora de lo divertido. Primero actulizamos nuestro app.py para obtener los datos del formulario si pasa las validaciones. Después, con esa información, haremos una llamada al API de Wallapop. Solo necesitaremos la URL que utilizan en su APP. Con urllib3 tendremos todos los resultados en un sencillo diccionario. Lo cual es magnífico, ya que es fácil de iterar dentro de nuestra plantilla.

from flask import Flask, render_template, request
from forms import SearchForm
# Get data Wallapop
import json
from urllib3 import PoolManager
import urllib.parse

# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'

@app.route('/', methods=('GET', 'POST'))
def buscador():
    form = SearchForm()
    results = None
    if form.validate_on_submit():
        name = form.name.data
        price_max = form.price_max.data or ''
        # Search in Wallapop
        results = get_resultados(name, price_max)
    return render_template('items/buscador.html', form=form, results=results)


@app.route('/programadas')
def programadas():
    return render_template('items/programadas.html')


def get_resultados(name='', price_max=''):
    http = PoolManager()
    url_api = 'http://es.wallapop.com/rest/items?minPrice=&maxPrice={price_max}&dist=&order=creationDate-des&lat=41.398077&lng=2.170432&kws={kws}'.format(
        kws=urllib.parse.quote(name, safe=''),
        price_max=price_max
    )
    results = http.request('GET', url_api)
    results = json.loads(
        results.data.decode('utf-8')
    )
    return results['items']


if __name__ == '__main__':
    app.run()

Y en nuestra plantilla de buscador.html.

{% extends 'layouts/master.html' %}
{% set active_page = "buscador" %}
{% block title %}Buscador{% endblock %}
{% block body %}
<h1>Buscador</h1>
<div class="row">
    <div class="col-xs-12">
        <form method="post">
            {{ form.csrf_token }}
            {% for input in form %}
                {% if input.type != 'CSRFTokenField' %}
                    <div class="form-group">
                        {# Label #}
                        {{ input.label }}
                        {# Input #}
                        {{ input(class="form-control") }}
                        {# Errors #}
                        {% if input.errors %}
                            <div class="has-error">
                            {% for error in input.errors %}
                                <label class="help-block">
                                    {{ error }}
                                </label>
                            {% endfor %}
                            </div>
                        {% endif %}
                    </div>
                {% endif %}
            {% endfor %}
            <input type="submit" class="btn btn-primary" value="Buscar">
        </form>
    </div>
</div>
{% if results %}
    <table class="table">
        {% for item in results %}
        <tr>
            <td><img class="img-responsive" src="{{ item.pictureURL }}" alt="{{ item.title }}"></td>
            <td>{{ item.title }}</td>
            <td>{{ item.price }}</td>
            <td><a href="#" class="btn btn-success">+</a></td>
        </tr>
        {% endfor %}
    </table>
{% endif %}
{% endblock %}

Y… ya tenemos un buscador no oficial.

Bases de datos y CRUD elementos con Flask

2.1 Models

Con Flask-alquemy vamos a definir la estructura de nuestra base de datos. En este caso tendremos una única tabla llamada Programado con los campos: id, title y last_item. Para ello crearemos un nuevo archivo con el nombre models.py.

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.sqlite'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

class Programado(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(128))
    last_item = db.Column(db.Integer)

Esta forma de trabajar tan limpia carece de varias funcionalidades básicas, como migraciones o la posibilidad de ejecutar ordenes por medio del terminal. Para ello le sumaremos Flask-Migrate para las migraciones automáticas y Flask-Script para su gestión.

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.sqlite'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

migrate = Migrate(app, db)
manager = Manager(app)
manager.add_command('db', MigrateCommand)

class Programado(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(128))
    last_item = db.Column(db.Integer)

if __name__ == "__main__":
    manager.run()

Abrimos nuestro terminal e iniciamos la base de datos, creamos la primera migración y actualizamos la base de datos.

python3 models.py db init
python3 models.py db migrate
python3 models.py db upgrade

Comprobamos que todo ha ido bien.

sqlite3 database.sqlite
.schema Programado
.exit

2.2 Save item

Para guarda un elemento necesitamos modificar nuestra plantilla buscador.html. Enviaremos un POST. Qué sencillamente será un <form> con las variables ocultas. En este caso lo que haremos será mostrar el botón solamente visible cuando recibamos una petición POST. Su finalidad será realizar una petición a la página programadas_nuevo() con el nombre que hemos buscado.

{% extends 'layouts/master.html' %}
{% set active_page = "buscador" %}
{% block title %}Buscador{% endblock %}
{% block body %}
<h1>Buscador</h1>
<div class="row">
    <div class="col-xs-12">
        <form method="post">
            {{ form.csrf_token }}
            {% for input in form %}
                {% if input.type != 'CSRFTokenField' %}
                    <div class="form-group">
                        {# Label #}
                        {{ input.label }}
                        {# Input #}
                        {{ input(class="form-control") }}
                        {# Errors #}
                        {% if input.errors %}
                            <div class="has-error">
                            {% for error in input.errors %}
                                <label class="help-block">
                                    {{ error }}
                                </label>
                            {% endfor %}
                            </div>
                        {% endif %}
                    </div>
                {% endif %}
            {% endfor %}
            <input type="submit" class="btn btn-primary" value="Buscar">
            </form>
            {% if results %}
                <form action="{{ url_for('programadas_nuevo') }}" method="post">
                    <input type="hidden" name="title" value="{{ form.name.data }}">
                    <input type="submit" class="btn btn-success" value="Programar">
                </form>
            {% endif %}
    </div>
</div>
{% if results %}
    <table class="table">
        {% for item in results %}
        <tr>
            <td><img class="img-responsive" src="{{ item.pictureURL }}" alt="{{ item.title }}"></td>
            <td>{{ item.title }}</td>
            <td>{{ item.price }}</td>
        </tr>
        {% endfor %}
    </table>
{% endif %}
{% endblock %}
{% endhighlight %}


Ahora, es el momento de crear la función para *programadas_nuevo()* en **app.py**. Lo primero que le indicamos es que solo puede aceptar peticiones *POST*. A continuación creamos variables para guardar la información del formulario. Después creamos el registro en la base de datos. Por último redireccionamos a la anterior página para ver el nuevo elemento.


Vayamos por partes. Importamos **db** que será nuestro *ORM*, y **Programado** que será la tabla a manipular.

{% highlight python3 %}
from models import db, Programado
{% endhighlight %}


Creamos el nuevo registro.

{% highlight python3 %}
my_program = Programado(
    title=title
    )
{% endhighlight %}


Lo añadimos a la cola.

{% highlight python3 %}
db.session.add(my_program)
{% endhighlight %}


Y ejecutamos las modificaciones. En caso que fallara, dejaría los datos como estaban. 

{% highlight python3 %}
try:
    db.session.commit()
except:
    db.session.rollback()
{% endhighlight %}


Todo junto quedará así.

{% highlight python3 %}
from flask import Flask, render_template, request, redirect, url_for
from forms import SearchForm
# Get data Wallapop
import json
from urllib3 import PoolManager
import urllib.parse
# Database
from models import db, Programado

# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'

@app.route('/', methods=('GET', 'POST'))
def buscador():
    form = SearchForm()
    results = None
    if form.validate_on_submit():
        name = form.name.data
        price_max = form.price_max.data or ''
        # Search in Wallapop
        results = get_resultados(name, price_max)
    return render_template('items/buscador.html', form=form, results=results)


def get_resultados(name='', price_max=''):
    http = PoolManager()
    url_api = 'http://es.wallapop.com/rest/items?minPrice=&maxPrice={price_max}&dist=&order=creationDate-des&lat=41.398077&lng=2.170432&kws={kws}'.format(
        kws=urllib.parse.quote(name, safe=''),
        price_max=price_max
    )
    results = http.request('GET', url_api)
    results = json.loads(
        results.data.decode('utf-8')
    )
    return results['items']

@app.route('/programadas')
def programadas():
    return render_template('items/programadas.html')

@app.route('/programadas/nuevo', methods=('POST',))
def programadas_nuevo():
    title = request.form['title']
    # We saved in the database
    my_program = Programado(
        title=title
        )
    db.session.add(my_program)
    try:
        db.session.commit()
    except:
        db.session.rollback()

    return redirect(url_for('programadas'))


if __name__ == '__main__':
    app.run()
{% endhighlight %}

---
#### 2.3 View items


Lamentablemente veremos la página vacía. Por ahora. Haremos una consulta a la base de datos para que nos de todos los registros de la tabla **Programado**, y se lo pasaremos a la plantilla. Para ello modificaremos la función que renderiza la plantilla **programadas.html**, que en nuestro caso se llama **programadas()** y esta en **app.py**.

{% highlight python3 %}
from flask import Flask, render_template, request, redirect, url_for
from forms import SearchForm
# Get data Wallapop
import json
from urllib3 import PoolManager
import urllib.parse
# Database
from models import db, Programado

# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'

@app.route('/', methods=('GET', 'POST'))
def buscador():
    form = SearchForm()
    results = None
    if form.validate_on_submit():
        name = form.name.data
        price_max = form.price_max.data or ''
        # Search in Wallapop
        results = get_resultados(name, price_max)
    return render_template('items/buscador.html', form=form, results=results)


@app.route('/programadas')
def programadas():
    programado_all = Programado.query.all()
    return render_template('items/programadas.html', programado_all=programado_all)


def get_resultados(name='', price_max=''):
    http = PoolManager()
    url_api = 'http://es.wallapop.com/rest/items?minPrice=&maxPrice={price_max}&dist=&order=creationDate-des&lat=41.398077&lng=2.170432&kws={kws}'.format(
        kws=urllib.parse.quote(name, safe=''),
        price_max=price_max
    )
    results = http.request('GET', url_api)
    results = json.loads(
        results.data.decode('utf-8')
    )
    return results['items']


@app.route('/programadas/nuevo', methods=('POST',))
def programadas_nuevo():
    title = request.form['title']
    # We saved in the database

    my_program = Programado(
        title=title
        )
    db.session.add(my_program)
    try:
        db.session.commit()
    except:
        db.session.rollback()

    return redirect(url_for('programadas'))


if __name__ == '__main__':
    app.run()
{% endhighlight %}


Actualizamos la plantilla **programadas.html** con un *bucle* para mostrar todos los resultados en una tabla de HTML.

{% highlight jinja2 %}{% raw %}
{% extends 'layouts/master.html' %}
{% set active_page = "programadas" %}
{% block title %}Programadas{% endblock %}
{% block body %}
<h1>Programadas</h1>
<table class="table">
    <thead>
        <tr>
            <th scope="col">Titulo</th>
        </tr>
    </thead>
    <tbody>
        {% for item in programado_all %}
        <tr>
            <td>{{ item.title }}</td>
        </tr>
        {% endfor %}
    </tbody>
</table>
{% endblock %}

2.4 Delete item

Repetiremos la estrategia anterior. En el bucle que muestra todos los resultados en programadas.html, añadimos un formulario que nos envíe un id a una futura función que definiremos en app.py.

{% extends 'layouts/master.html' %}
{% set active_page = "programadas" %}
{% block title %}Programadas{% endblock %}
{% block body %}
<h1>Programadas</h1>
<table class="table">
    <thead>
        <tr>
            <th scope="col">Titulo</th>
            <th></th>
        </tr>
    </thead>
    <tbody>
        {% for item in programado_all %}
        <tr>
            <td>{{ item.title }}</td>
            <td>
                <form action="{{ url_for('programadas_borrar') }}" method="post">
                    <input type="hidden" name="id" value="{{ item.id }}">                    
                    <input type="submit" class="btn btn-danger" value="-">
                </form>
            </td>
        </tr>
        {% endfor %}
    </tbody>
</table>
{% endblock %}

La manera de eliminar un registro consiste en realizar una busqueda de los elementos que quieres eliminar, y luego ponerlo en la cola para eliminarlos. Por último ejecutas la orden como antes.

db.session.delete(my_program)
try:
    db.session.commit()
except:
    db.session.rollback()

Quedaría así.

from flask import Flask, render_template, request, redirect, url_for
from forms import SearchForm
# Get data Wallapop
import json
from urllib3 import PoolManager
import urllib.parse
# Database
from models import db, Programado

# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'

@app.route('/', methods=('GET', 'POST'))
def buscador():
    form = SearchForm()
    results = None
    if form.validate_on_submit():
        name = form.name.data
        price_max = form.price_max.data or ''
        # Search in Wallapop
        results = get_resultados(name, price_max)
    return render_template('items/buscador.html', form=form, results=results)


@app.route('/programadas')
def programadas():
    programado_all = Programado.query.all()
    return render_template('items/programadas.html', programado_all=programado_all)


def get_resultados(name='', price_max=''):
    http = PoolManager()
    url_api = 'http://es.wallapop.com/rest/items?minPrice=&maxPrice={price_max}&dist=&order=creationDate-des&lat=41.398077&lng=2.170432&kws={kws}'.format(
        kws=urllib.parse.quote(name, safe=''),
        price_max=price_max
    )
    results = http.request('GET', url_api)
    results = json.loads(
        results.data.decode('utf-8')
    )
    return results['items']


@app.route('/programadas/nuevo', methods=('POST',))
def programadas_nuevo():
    title = request.form['title']
    # We saved in the database
    my_program = Programado(
        title=title
        )
    db.session.add(my_program)
    try:
        db.session.commit()
    except:
        db.session.rollback()

    return redirect(url_for('programadas'))


@app.route('/programadas/borrar', methods=('POST',))
def programadas_borrar():
    my_program = Programado.query.get(request.form['id'])
    db.session.delete(my_program)
    try:
        db.session.commit()
    except:
        db.session.rollback()

    return redirect(url_for('programadas'))


if __name__ == '__main__':
    app.run()

2.5 Flash messages

Tenemos un problema de usabilidad: ¡El usuario esta a ciegas cuando añade o borra! Tenemos que informarle de que esta pasando cuando apreta botones. Para ello nos haremos uso de los Fash messages. Como queremos que se vean en todas nuestras páginas, modificamos master.html.

<!DOCTYPE html>
<html lang="es">
    <head>
        <title>{% block title %}{% endblock %} | Vigilador de Wallapop</title>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1">
        <link rel="stylesheet" href="https://bootswatch.com/superhero/bootstrap.css">
    </head>
    <body>
        <div class="container">
            <ul class="nav nav-pills nav-justified">
                <li role="presentation" {% if active_page == "buscador" or not active_page %}class="active"{% endif %}><a href="{{ url_for('buscador') }}">Buscador</a></li>
                <li role="presentation" {% if active_page == "programadas" %}class="active"{% endif %}><a href="{{ url_for('programadas') }}">Programadas</a></li>
            </ul>
            {# Flashed messages #}
            {% with messages = get_flashed_messages() %}
              {% if messages %}
                {% for message in messages %}
                  <div class="alert alert-success" role="alert">{{ message }}</div>
                {% endfor %}
              {% endif %}
            {% endwith %}
            {# End Flashed messages #}
            {% block body %}{% endblock %}
        </div>
    </body>
</html>

Ahora ya será visible todos los mensajes en cajas elegantes de Bootstrap. Importamos flash.

from flask import Flask, render_template, request, redirect, url_for, flash

Y añadimos los mensajes que deseamos ver. Por ejemplo.

flash('Añadida con éxito.')

Nuestro app.py se nos quedaría así.

from flask import Flask, render_template, request, redirect, url_for, flash
from forms import SearchForm
# Get data Wallapop
import json
from urllib3 import PoolManager
import urllib.parse
# Database
from models import db, Programado

# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'

@app.route('/', methods=('GET', 'POST'))
def buscador():
    form = SearchForm()
    results = None
    if form.validate_on_submit():
        name = form.name.data
        price_max = form.price_max.data or ''
        # Search in Wallapop
        results = get_resultados(name, price_max)
    return render_template('items/buscador.html', form=form, results=results)


@app.route('/programadas')
def programadas():
    programado_all = Programado.query.all()
    return render_template('items/programadas.html', programado_all=programado_all)


def get_resultados(name='', price_max=''):
    http = PoolManager()
    url_api = 'http://es.wallapop.com/rest/items?minPrice=&maxPrice={price_max}&dist=&order=creationDate-des&lat=41.398077&lng=2.170432&kws={kws}'.format(
        kws=urllib.parse.quote(name, safe=''),
        price_max=price_max
    )
    results = http.request('GET', url_api)
    results = json.loads(
        results.data.decode('utf-8')
    )
    return results['items']


@app.route('/programadas/nuevo', methods=('POST',))
def programadas_nuevo():
    title = request.form['title']
    # We saved in the database
    my_program = Programado(
        title=title
        )
    db.session.add(my_program)
    try:
        db.session.commit()
        flash('Añadida con éxito.')
    except:
        db.session.rollback()

    return redirect(url_for('programadas'))


@app.route('/programadas/borrar', methods=('POST',))
def programadas_borrar():
    my_program = Programado.query.get(request.form['id'])
    db.session.delete(my_program)
    try:
        db.session.commit()
        flash('Borrada "{title}".'.format(title=my_program.title))
    except:
        db.session.rollback()

    return redirect(url_for('programadas'))


if __name__ == '__main__':
    app.run()

Envío de emails con nuevos elementos

3.1 Command

Ya tenemos montado nuestra interfaz para gestionar nuestras busquedas. Lo siguiente será crear un script que se encargue de verificar si hay nuevos resultados. Y si es así, enviarnos un email. El primer paso será crear con Flask un comando personalizado. Realizamos un nuevo archivo llamado avisador.py.

#!/usr/bin/env python3
from flask_script import Manager
from app import app

manager = Manager(app)

@manager.command
def hello():
    print('hello PyConES17')

if __name__ == "__main__":
    manager.run()

Para probar que funciona ejecutamos, siempre con el entorno virtual activo, lo siguiente.

chmod +x avisador.py
./avisador.py hello

Si todo ha ido bien nos responderá.

hello PyConES17

3.2 SMTP Server

Para enviar un email si o si necesitaremos una servidor SMTP. Podéis usar GMail, Hotmail, Fastmail… o cualquier cuenta de correo. Para el taller, usaremos Mailgun. Un poderoso servicio profesional para el envío de emails. Nos permite 10.000 envíos mensuales gratuitos. Suficientes para lo que necesitamos. Entramos aquí.

Creamos una nueva cuenta.

Confirmamos nuestra cuenta por el enlace que nos han enviado a nuestro email.

Al pulsar sobre el enlace nos llevará a esta página. Confirmamos nuestro teléfono y pulsamos en Domains. ¡Ojo! Si no confirmáis vuestro teléfono, Mailgun no funcionará.

Entramos en nuestro dominio activo.

Aquí tendremos los accesos que necesitaremos. Dejamos abierta esta página para más adelante.


3.3 Send email

Ya tenemos nuestro servidor de email. Ahora vamos a enviar un correo de prueba. Abrimos avisador.py para importar flask-mail.

from flask_mail import Mail, Message

Configuramos flask-mail. MAIL_USERNAME será Default SMTP Login de mailgun. Y MAIL_PASSWORD será Default Password de mailgun.

app.config.update(
    MAIL_SERVER='smtp.mailgun.org',
    MAIL_PORT=587,
    MAIL_USERNAME='tu_default_smtp_login',
    MAIL_PASSWORD='tu_default_password'
)
mail = Mail(app)

Creamos un comando para que nos envíe un email de prueba.

@manager.command
def send_email():
    msg = Message(
        "Nuevo aviso",
        sender="no-reply@pycon17.es",
        recipients=["tu_email"]
        )
    msg.body = "testing"
    msg.html = "<b>testing</b>"
    mail.send(msg)

Todo junto quedaría así.

#!/usr/bin/env python3
from flask_script import Manager
from app import app
from flask_mail import Mail, Message

app.config.update(
    MAIL_SERVER='smtp.mailgun.org',
    MAIL_PORT=587,
    MAIL_USERNAME='tu_default_smtp_login',
    MAIL_PASSWORD='tu_default_password'
)
mail = Mail(app)

manager = Manager(app)

@manager.command
def send_email():
    msg = Message(
        "Nuevo aviso",
        sender="no-reply@pycon17.es",
        recipients=["tu_email"]
        )
    msg.body = "testing"
    msg.html = "<b>testing</b>"
    mail.send(msg)


if __name__ == "__main__":
    manager.run()

Ejecutamos.

./avisador.py send_email

Revisamos nuestra bandeja de entrada. En caso contrario buscamos en spam.


3.4 Notification

Estamos listos para realizar el sistema de notificación. La lógica será básica: buscamos todos los productos que tenga la palabra que tenemos guarda. Nos quedamos con la primera id. Si la id es la misma que tenemos en la base de datos, no hacemos nada. Si es diferente, actualizamos la base de datos y enviamos un email.

Abrimos avisador.py. Primero, importamos nuestra funcion para obtener los elementos del API de Wallapop.

from app import app, get_resultados

Además, importamos nuestra tabla con las palabras guardadas.

from models import db, Programado

Recorremos todas las palabras almacenadas.

@manager.command
def send_email():
    programados = Programado.query.all()
    for item in programados:

Obtenemos el primer id. Que lo usaremos para comparar si la tenemos guardada.

@manager.command
def send_email():
    programados = Programado.query.all()
    for item in programados:
        # Get last id
        results = get_resultados(item.title)
        itemId = results[0]['itemId']
        if int(itemId) != item.last_item:

Para actualizar en SQLAlchemy hay que obtener el resultado, modificar el objeto, y devolverlo. Para ilustrar la forma de trabajar dejo un ejemplo. Modifico la columna gana que es un boolean y la columna nombre que es un string.

spartano = User.query.filter_by(id=1).first()
spartano.gana = False
spartano.nombre = 'Leónidas'
db.session.add(spartano)
db.session.commit()

En nuestro código quedaría implementado de la siguiente forma.

@manager.command
def send_email():
    programados = Programado.query.all()
    for item in programados:
        # Get last id
        results = get_resultados(item.title)
        itemId = results[0]['itemId']
        # Update last item in database
        if int(itemId) != item.last_item:
            programado_update = Programado.query.filter_by(id=item.id).first()
            programado_update.last_item = itemId
            db.session.add(programado_update)
            try:
                db.session.commit()
            except:
                db.session.rollback()

Ya solo nos queda enviar el email.

@manager.command
def send_email():
    programados = Programado.query.all()
    for item in programados:
        # Get last id
        results = get_resultados(item.title)
        itemId = results[0]['itemId']
        # Update last item in database
        if int(itemId) != item.last_item:
            programado_update = Programado.query.filter_by(id=item.id).first()
            programado_update.last_item = itemId
            db.session.add(programado_update)
            try:
                db.session.commit()
            except:
                db.session.rollback()
            # Send email
            msg = Message(
                "Nuevo aviso",
                sender="no-reply@pycon17.es",
                recipients=["tu email"]
            )
            msg.body = render_template('emails/notificacion.txt', title=results[0]['title'], id=itemId)
            msg.html = render_template('emails/notificacion.html', title=results[0]['title'], id=itemId)
            mail.send(msg)

Todo junto quedaría.

#!/usr/bin/env python3
from flask import render_template
from flask_script import Manager
from app import app, get_resultados
from flask_mail import Mail, Message
from models import db, Programado

app.config.update(
    MAIL_SERVER='smtp.mailgun.org',
    MAIL_PORT=587,
    MAIL_USERNAME='tu_default_smtp_login',
    MAIL_PASSWORD='tu_default_password'
)
mail = Mail(app)

manager = Manager(app)

@manager.command
def send_email():
    programados = Programado.query.all()
    for item in programados:
        # Get last id
        results = get_resultados(item.title)
        itemId = results[0]['itemId']
        # Update last item in database
        if int(itemId) != item.last_item:
            programado_update = Programado.query.filter_by(id=item.id).first()
            programado_update.last_item = itemId
            db.session.add(programado_update)
            try:
                db.session.commit()
            except:
                db.session.rollback()
            # Send email
            msg = Message(
                "Nuevo aviso",
                sender="no-reply@pycon17.es",
                recipients=["tu email"]
            )
            msg.body = render_template('emails/notificacion.txt', title=results[0]['title'], id=itemId)
            msg.html = render_template('emails/notificacion.html', title=results[0]['title'], id=itemId)
            mail.send(msg)

if __name__ == "__main__":
    manager.run()

3.5 View email

Ya no estoy enviando un texto sencillo en el email. Necesito la magia de flask con su render_template. Puedes observar como hago uso de dos plantillas donde paso dos variables. El titulo y la id del item.

Creamos una carpeta nueva dentro de templates con el nombre emails. Y dentro de esta, el archivo notificacion.html y notificacion.txt. Quedará la estructura así.

--> templates
        --> emails
            --> notificacion.html
            --> notificacion.txt
        --> items
            --> buscador.html
            --> programadas.html
        --> layouts
            --> master.html

Abrimos notificacion.txt e introducimos.

Aviso

{{ title }}

http://p.wallapop.com/i/{{ id }}?_pid=web&_me=www&campaign=mobile_item

Y en notificacion.html lo siguiente.

<!DOCTYPE html>
<html lang="es">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Notificacion</title>
</head>
<body>
   <h1>Aviso</h1> 
   <h2>{{ title }}</h2>
   <a href="http://p.wallapop.com/i/{{ id }}?_pid=web&_me=www&campaign=mobile_item">Ver</a>
</body>
</html>

¡E voilà! Ya hemos terminado. Cuando nos llegue los nuevos avisos podremos pulsar en el enlace para ver la ficha del producto. Y si estamos en el smartphone, se nos abrirá la aplicación oficial.


3.6 Automation

Para revisar y recibir los emails solo tendrás que ejecutar el comando personalizado. De la misma forma que antes.

./avisador.py send_email

Mi recomendación es ejecutarlo en un cron cada hora y listo.

Versión escritorio