El tutorial es un taller que realicé en la PyConES17. Su nombre original es Vigilante de Wallapop (Aplicación para vigilar precios en Wallapop). El código es libre y puedes usarlo para cualquier uso no comercial mientras hagas referencia a esta página. Es una práctica perfecta para adentrarte profesionalmente a Flask y conocer todas sus herramientas.
El código fuente lo puedes encontrar en Github:
https://github.com/tanrax/flask-wallapop-watcher.git
Demo
Actualmente esta implementado en un sitio real:
Run (Ejecutar)
Para los impacientes, podéis jugar con el ejercicio acabado. Debéis descargar el código y ejecutar los siguientes comandos.
git clone https://github.com/tanrax/flask-wallapop-watcher.git
cd flask-wallapop-watcher
pip3 install virtualenv
virtualenv --python=python3 .venv
source .venv/bin/activate
pip3 install -r requirements.txt
python3 models.py db init
python3 models.py db migrate
python3 models.py db upgrade
python3 app.py
Después abrir en tu navegador favorito, que posiblemente será el fantástico Firefox, una pestaña nueva con http://127.0.0.1:5000
Requisitos mínimos
- python3
- wget
- Editor de texto enriquecido: Vim, Sublime Text, VSCode, Atom…
- sqlite3
- Conexión a internet
Comprobamos que tenemos todo
python3 --help
wget --help
sqlite3 --help
ping -c 5 google.com
Parte 1 - Nucleo de Flask y Buscador 50 min
1.1 Ready?
Preparamos nuestro entorno virtual.
mkdir flask-wallapop-watcher
cd flask-wallapop-watcher
pip3 install virtualenv
virtualenv --python=python3 .venv
source .venv/bin/activate
wget https://raw.githubusercontent.com/tanrax/flask-wallapop-watcher/master/requirements.txt
pip3 install -r requirements.txt
1.1 Hello PyConES17
Plantilla Flask. Creamos un nuevo archivo llamado app.py.
from flask import Flask
# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'
@app.route('/')
def buscador():
return 'Hello PyConES17 !!!'
if __name__ == '__main__':
app.run()
Ejecutamos y comprobamos que todo funciona.
python3 app.py
http://127.0.0.1:5000
1.2 Templates
Creamos una carpeta llamada templates. Dentro dos más: layouts y items. En layouts haremos uno nuevo con el nombre master.html.
<!DOCTYPE html>
<html lang="es">
<head>
<title>{% block title %}{% endblock %} | Vigilador de Wallapop</title>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://bootswatch.com/superhero/bootstrap.css">
</head>
<body>
<div class="container">
<ul class="nav nav-pills nav-justified">
<li role="presentation" {% if active_page == "buscador" or not active_page %}class="active"{% endif %}><a href="">Buscador</a></li>
<li role="presentation" {% if active_page == "programadas" %}class="active"{% endif %}><a href="#">Programadas</a></li>
</ul>
{% block body %}{% endblock %}
</div>
</body>
</html>
En items vamos a tener nuestra primera página real que va a heredar de master.html. Dentro de items creamos buscador.html.
{% extends 'layouts/master.html' %}
{% set active_page = "buscador" %}
{% block title %}Buscador{% endblock %}
{% block body %}
<h1>Buscador</h1>
{% endblock %}
Actulizamos app.py para que trabaje nuestro motor de plantillas.
from flask import Flask, render_template
# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'
@app.route('/')
def buscador():
return render_template('items/buscador.html')
if __name__ == '__main__':
app.run()
Creamos la segunda página donde tendremos nuestras busquedas almacenadas. Dentro de items creamos un fichero nuevo con el nombre de programadas.html.
{% extends 'layouts/master.html' %}
{% set active_page = "programadas" %}
{% block title %}Programadas{% endblock %}
{% block body %}
<h1>Soy la página donde estará las programadas</h1>
{% endblock %}
Actulizamos app.py con la nueva página.
from flask import Flask, render_template
# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'
@app.route('/')
def buscador():
return render_template('items/buscador.html')
@app.route('/programadas')
def programadas():
return render_template('items/programadas.html')
if __name__ == '__main__':
app.run()
Como último detalle haremos que nuestros botones del navegador tengan las rutas correctas.
``` jinja2{% raw %} <!DOCTYPE html>
---
#### 1.3 Forms
Realizamos el nuevo archivo **forms.py**.
``` python3
from flask_wtf import FlaskForm
from wtforms import StringField, IntegerField
from wtforms.validators import DataRequired, Length, NumberRange, Optional
class SearchForm(FlaskForm):
name = StringField('Nombre', [Length(min=1, max=100, message='Es demasiado largo'), DataRequired(message='Campo obligatorio')])
price_max = IntegerField('Precio', [NumberRange(1, message='No puede ser inferior a 1'), Optional()])
Lo cargamos y se lo pasamos a la plantilla.
from flask import Flask, render_template
from forms import SearchForm
# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'
@app.route('/', methods=('GET', 'POST'))
def buscador():
form = SearchForm()
if form.validate_on_submit():
pass
return render_template('items/buscador.html', form=form)
@app.route('/programadas')
def programadas():
return render_template('items/programadas.html')
if __name__ == '__main__':
app.run()
Imprimimos los campos con un bucle en nuestra plantilla buscador.html.
{% extends 'layouts/master.html' %}
{% set active_page = "buscador" %}
{% block title %}Buscador{% endblock %}
{% block body %}
<h1>Buscador</h1>
<div class="row">
<div class="col-xs-12">
<form method="post">
{{ form.csrf_token }}
{% for input in form %}
{% if input.type != 'CSRFTokenField' %}
<div class="form-group">
{# Label #}
{{ input.label }}
{# Input #}
{{ input(class="form-control") }}
{# Errors #}
{% if input.errors %}
<div class="has-error">
{% for error in input.errors %}
<label class="help-block">
{{ error }}
</label>
{% endfor %}
</div>
{% endif %}
</div>
{% endif %}
{% endfor %}
<input type="submit" class="btn btn-primary" value="Buscar">
</form>
</div>
</div>
{% endblock %}
1.4 Search
Ha llegado la hora de lo divertido. Primero actulizamos nuestro app.py para obtener los datos del formulario si pasa las validaciones. Después, con esa información, haremos una llamada al API de Wallapop. Solo necesitaremos la URL que utilizan en su APP. Con urllib3 tendremos todos los resultados en un sencillo diccionario. Lo cual es magnífico, ya que es fácil de iterar dentro de nuestra plantilla.
from flask import Flask, render_template, request
from forms import SearchForm
# Get data Wallapop
import json
from urllib3 import PoolManager
import urllib.parse
# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'
@app.route('/', methods=('GET', 'POST'))
def buscador():
form = SearchForm()
results = None
if form.validate_on_submit():
name = form.name.data
price_max = form.price_max.data or ''
# Search in Wallapop
results = get_resultados(name, price_max)
return render_template('items/buscador.html', form=form, results=results)
@app.route('/programadas')
def programadas():
return render_template('items/programadas.html')
def get_resultados(name='', price_max=''):
http = PoolManager()
url_api = 'http://es.wallapop.com/rest/items?minPrice=&maxPrice={price_max}&dist=&order=creationDate-des&lat=41.398077&lng=2.170432&kws={kws}'.format(
kws=urllib.parse.quote(name, safe=''),
price_max=price_max
)
results = http.request('GET', url_api)
results = json.loads(
results.data.decode('utf-8')
)
return results['items']
if __name__ == '__main__':
app.run()
Y en nuestra plantilla de buscador.html.
{% extends 'layouts/master.html' %}
{% set active_page = "buscador" %}
{% block title %}Buscador{% endblock %}
{% block body %}
<h1>Buscador</h1>
<div class="row">
<div class="col-xs-12">
<form method="post">
{{ form.csrf_token }}
{% for input in form %}
{% if input.type != 'CSRFTokenField' %}
<div class="form-group">
{# Label #}
{{ input.label }}
{# Input #}
{{ input(class="form-control") }}
{# Errors #}
{% if input.errors %}
<div class="has-error">
{% for error in input.errors %}
<label class="help-block">
{{ error }}
</label>
{% endfor %}
</div>
{% endif %}
</div>
{% endif %}
{% endfor %}
<input type="submit" class="btn btn-primary" value="Buscar">
</form>
</div>
</div>
{% if results %}
<table class="table">
{% for item in results %}
<tr>
<td><img class="img-responsive" src="{{ item.pictureURL }}" alt="{{ item.title }}"></td>
<td>{{ item.title }}</td>
<td>{{ item.price }}</td>
<td><a href="#" class="btn btn-success">+</a></td>
</tr>
{% endfor %}
</table>
{% endif %}
{% endblock %}
Y… ya tenemos un buscador no oficial.
Bases de datos y CRUD elementos con Flask
2.1 Models
Con Flask-alquemy vamos a definir la estructura de nuestra base de datos. En este caso tendremos una única tabla llamada Programado con los campos: id, title y last_item. Para ello crearemos un nuevo archivo con el nombre models.py.
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.sqlite'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
class Programado(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(128))
last_item = db.Column(db.Integer)
Esta forma de trabajar tan limpia carece de varias funcionalidades básicas, como migraciones o la posibilidad de ejecutar ordenes por medio del terminal. Para ello le sumaremos Flask-Migrate para las migraciones automáticas y Flask-Script para su gestión.
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.sqlite'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
migrate = Migrate(app, db)
manager = Manager(app)
manager.add_command('db', MigrateCommand)
class Programado(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(128))
last_item = db.Column(db.Integer)
if __name__ == "__main__":
manager.run()
Abrimos nuestro terminal e iniciamos la base de datos, creamos la primera migración y actualizamos la base de datos.
python3 models.py db init
python3 models.py db migrate
python3 models.py db upgrade
Comprobamos que todo ha ido bien.
sqlite3 database.sqlite
.schema Programado
.exit
2.2 Save item
Para guarda un elemento necesitamos modificar nuestra plantilla buscador.html. Enviaremos un POST. Qué sencillamente será un <form> con las variables ocultas. En este caso lo que haremos será mostrar el botón solamente visible cuando recibamos una petición POST. Su finalidad será realizar una petición a la página programadas_nuevo() con el nombre que hemos buscado.
{% extends 'layouts/master.html' %}
{% set active_page = "buscador" %}
{% block title %}Buscador{% endblock %}
{% block body %}
<h1>Buscador</h1>
<div class="row">
<div class="col-xs-12">
<form method="post">
{{ form.csrf_token }}
{% for input in form %}
{% if input.type != 'CSRFTokenField' %}
<div class="form-group">
{# Label #}
{{ input.label }}
{# Input #}
{{ input(class="form-control") }}
{# Errors #}
{% if input.errors %}
<div class="has-error">
{% for error in input.errors %}
<label class="help-block">
{{ error }}
</label>
{% endfor %}
</div>
{% endif %}
</div>
{% endif %}
{% endfor %}
<input type="submit" class="btn btn-primary" value="Buscar">
</form>
{% if results %}
<form action="{{ url_for('programadas_nuevo') }}" method="post">
<input type="hidden" name="title" value="{{ form.name.data }}">
<input type="submit" class="btn btn-success" value="Programar">
</form>
{% endif %}
</div>
</div>
{% if results %}
<table class="table">
{% for item in results %}
<tr>
<td><img class="img-responsive" src="{{ item.pictureURL }}" alt="{{ item.title }}"></td>
<td>{{ item.title }}</td>
<td>{{ item.price }}</td>
</tr>
{% endfor %}
</table>
{% endif %}
{% endblock %}
Ahora, es el momento de crear la función para programadas_nuevo() en app.py. Lo primero que le indicamos es que solo puede aceptar peticiones POST. A continuación creamos variables para guardar la información del formulario. Después creamos el registro en la base de datos. Por último redireccionamos a la anterior página para ver el nuevo elemento.
Vayamos por partes. Importamos db que será nuestro ORM, y Programado que será la tabla a manipular.
from models import db, Programado
Creamos el nuevo registro.
my_program = Programado(
title=title
)
Lo añadimos a la cola.
db.session.add(my_program)
Y ejecutamos las modificaciones. En caso que fallara, dejaría los datos como estaban.
try:
db.session.commit()
except:
db.session.rollback()
Todo junto quedará así.
from flask import Flask, render_template, request, redirect, url_for
from forms import SearchForm
# Get data Wallapop
import json
from urllib3 import PoolManager
import urllib.parse
# Database
from models import db, Programado
# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'
@app.route('/', methods=('GET', 'POST'))
def buscador():
form = SearchForm()
results = None
if form.validate_on_submit():
name = form.name.data
price_max = form.price_max.data or ''
# Search in Wallapop
results = get_resultados(name, price_max)
return render_template('items/buscador.html', form=form, results=results)
def get_resultados(name='', price_max=''):
http = PoolManager()
url_api = 'http://es.wallapop.com/rest/items?minPrice=&maxPrice={price_max}&dist=&order=creationDate-des&lat=41.398077&lng=2.170432&kws={kws}'.format(
kws=urllib.parse.quote(name, safe=''),
price_max=price_max
)
results = http.request('GET', url_api)
results = json.loads(
results.data.decode('utf-8')
)
return results['items']
@app.route('/programadas')
def programadas():
return render_template('items/programadas.html')
@app.route('/programadas/nuevo', methods=('POST',))
def programadas_nuevo():
title = request.form['title']
# We saved in the database
my_program = Programado(
title=title
)
db.session.add(my_program)
try:
db.session.commit()
except:
db.session.rollback()
return redirect(url_for('programadas'))
if __name__ == '__main__':
app.run()
2.3 View items
Lamentablemente veremos la página vacía. Por ahora. Haremos una consulta a la base de datos para que nos de todos los registros de la tabla Programado, y se lo pasaremos a la plantilla. Para ello modificaremos la función que renderiza la plantilla programadas.html, que en nuestro caso se llama programadas() y esta en app.py.
from flask import Flask, render_template, request, redirect, url_for
from forms import SearchForm
# Get data Wallapop
import json
from urllib3 import PoolManager
import urllib.parse
# Database
from models import db, Programado
# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'
@app.route('/', methods=('GET', 'POST'))
def buscador():
form = SearchForm()
results = None
if form.validate_on_submit():
name = form.name.data
price_max = form.price_max.data or ''
# Search in Wallapop
results = get_resultados(name, price_max)
return render_template('items/buscador.html', form=form, results=results)
@app.route('/programadas')
def programadas():
programado_all = Programado.query.all()
return render_template('items/programadas.html', programado_all=programado_all)
def get_resultados(name='', price_max=''):
http = PoolManager()
url_api = 'http://es.wallapop.com/rest/items?minPrice=&maxPrice={price_max}&dist=&order=creationDate-des&lat=41.398077&lng=2.170432&kws={kws}'.format(
kws=urllib.parse.quote(name, safe=''),
price_max=price_max
)
results = http.request('GET', url_api)
results = json.loads(
results.data.decode('utf-8')
)
return results['items']
@app.route('/programadas/nuevo', methods=('POST',))
def programadas_nuevo():
title = request.form['title']
# We saved in the database
my_program = Programado(
title=title
)
db.session.add(my_program)
try:
db.session.commit()
except:
db.session.rollback()
return redirect(url_for('programadas'))
if __name__ == '__main__':
app.run()
Actualizamos la plantilla programadas.html con un bucle para mostrar todos los resultados en una tabla de HTML.
``` jinja2{% raw %} {% extends ‘layouts/master.html’ %} {% set active_page = “programadas” %} {% block title %}Programadas{% endblock %} {% block body %}
Programadas
Titulo |
---|
{{ item.title }} |
{% endblock %}
---
#### 2.4 Delete item
Repetiremos la estrategia anterior. En el *bucle* que muestra todos los resultados en **programadas.html**, añadimos un formulario que nos envíe un *id* a una futura función que definiremos en **app.py**.
``` jinja2
{% extends 'layouts/master.html' %}
{% set active_page = "programadas" %}
{% block title %}Programadas{% endblock %}
{% block body %}
<h1>Programadas</h1>
<table class="table">
<thead>
<tr>
<th scope="col">Titulo</th>
<th></th>
</tr>
</thead>
<tbody>
{% for item in programado_all %}
<tr>
<td>{{ item.title }}</td>
<td>
<form action="{{ url_for('programadas_borrar') }}" method="post">
<input type="hidden" name="id" value="{{ item.id }}">
<input type="submit" class="btn btn-danger" value="-">
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endblock %}
La manera de eliminar un registro consiste en realizar una busqueda de los elementos que quieres eliminar, y luego ponerlo en la cola para eliminarlos. Por último ejecutas la orden como antes.
db.session.delete(my_program)
try:
db.session.commit()
except:
db.session.rollback()
Quedaría así.
from flask import Flask, render_template, request, redirect, url_for
from forms import SearchForm
# Get data Wallapop
import json
from urllib3 import PoolManager
import urllib.parse
# Database
from models import db, Programado
# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'
@app.route('/', methods=('GET', 'POST'))
def buscador():
form = SearchForm()
results = None
if form.validate_on_submit():
name = form.name.data
price_max = form.price_max.data or ''
# Search in Wallapop
results = get_resultados(name, price_max)
return render_template('items/buscador.html', form=form, results=results)
@app.route('/programadas')
def programadas():
programado_all = Programado.query.all()
return render_template('items/programadas.html', programado_all=programado_all)
def get_resultados(name='', price_max=''):
http = PoolManager()
url_api = 'http://es.wallapop.com/rest/items?minPrice=&maxPrice={price_max}&dist=&order=creationDate-des&lat=41.398077&lng=2.170432&kws={kws}'.format(
kws=urllib.parse.quote(name, safe=''),
price_max=price_max
)
results = http.request('GET', url_api)
results = json.loads(
results.data.decode('utf-8')
)
return results['items']
@app.route('/programadas/nuevo', methods=('POST',))
def programadas_nuevo():
title = request.form['title']
# We saved in the database
my_program = Programado(
title=title
)
db.session.add(my_program)
try:
db.session.commit()
except:
db.session.rollback()
return redirect(url_for('programadas'))
@app.route('/programadas/borrar', methods=('POST',))
def programadas_borrar():
my_program = Programado.query.get(request.form['id'])
db.session.delete(my_program)
try:
db.session.commit()
except:
db.session.rollback()
return redirect(url_for('programadas'))
if __name__ == '__main__':
app.run()
2.5 Flash messages
Tenemos un problema de usabilidad: ¡El usuario esta a ciegas cuando añade o borra! Tenemos que informarle de que esta pasando cuando apreta botones. Para ello nos haremos uso de los Fash messages. Como queremos que se vean en todas nuestras páginas, modificamos master.html.
<!DOCTYPE html>
<html lang="es">
<head>
<title>{% block title %}{% endblock %} | Vigilador de Wallapop</title>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://bootswatch.com/superhero/bootstrap.css">
</head>
<body>
<div class="container">
<ul class="nav nav-pills nav-justified">
<li role="presentation" {% if active_page == "buscador" or not active_page %}class="active"{% endif %}><a href="{{ url_for('buscador') }}">Buscador</a></li>
<li role="presentation" {% if active_page == "programadas" %}class="active"{% endif %}><a href="{{ url_for('programadas') }}">Programadas</a></li>
</ul>
{# Flashed messages #}
{% with messages = get_flashed_messages() %}
{% if messages %}
{% for message in messages %}
<div class="alert alert-success" role="alert">{{ message }}</div>
{% endfor %}
{% endif %}
{% endwith %}
{# End Flashed messages #}
{% block body %}{% endblock %}
</div>
</body>
</html>
Ahora ya será visible todos los mensajes en cajas elegantes de Bootstrap. Importamos flash.
from flask import Flask, render_template, request, redirect, url_for, flash
Y añadimos los mensajes que deseamos ver. Por ejemplo.
flash('Añadida con éxito.')
Nuestro app.py se nos quedaría así.
from flask import Flask, render_template, request, redirect, url_for, flash
from forms import SearchForm
# Get data Wallapop
import json
from urllib3 import PoolManager
import urllib.parse
# Database
from models import db, Programado
# Flask
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'mi secreto'
@app.route('/', methods=('GET', 'POST'))
def buscador():
form = SearchForm()
results = None
if form.validate_on_submit():
name = form.name.data
price_max = form.price_max.data or ''
# Search in Wallapop
results = get_resultados(name, price_max)
return render_template('items/buscador.html', form=form, results=results)
@app.route('/programadas')
def programadas():
programado_all = Programado.query.all()
return render_template('items/programadas.html', programado_all=programado_all)
def get_resultados(name='', price_max=''):
http = PoolManager()
url_api = 'http://es.wallapop.com/rest/items?minPrice=&maxPrice={price_max}&dist=&order=creationDate-des&lat=41.398077&lng=2.170432&kws={kws}'.format(
kws=urllib.parse.quote(name, safe=''),
price_max=price_max
)
results = http.request('GET', url_api)
results = json.loads(
results.data.decode('utf-8')
)
return results['items']
@app.route('/programadas/nuevo', methods=('POST',))
def programadas_nuevo():
title = request.form['title']
# We saved in the database
my_program = Programado(
title=title
)
db.session.add(my_program)
try:
db.session.commit()
flash('Añadida con éxito.')
except:
db.session.rollback()
return redirect(url_for('programadas'))
@app.route('/programadas/borrar', methods=('POST',))
def programadas_borrar():
my_program = Programado.query.get(request.form['id'])
db.session.delete(my_program)
try:
db.session.commit()
flash('Borrada "{title}".'.format(title=my_program.title))
except:
db.session.rollback()
return redirect(url_for('programadas'))
if __name__ == '__main__':
app.run()
Envío de emails con nuevos elementos
3.1 Command
Ya tenemos montado nuestra interfaz para gestionar nuestras busquedas. Lo siguiente será crear un script que se encargue de verificar si hay nuevos resultados. Y si es así, enviarnos un email. El primer paso será crear con Flask un comando personalizado. Realizamos un nuevo archivo llamado avisador.py.
#!/usr/bin/env python3
from flask_script import Manager
from app import app
manager = Manager(app)
@manager.command
def hello():
print('hello PyConES17')
if __name__ == "__main__":
manager.run()
Para probar que funciona ejecutamos, siempre con el entorno virtual activo, lo siguiente.
chmod +x avisador.py
./avisador.py hello
Si todo ha ido bien nos responderá.
hello PyConES17
3.2 SMTP Server
Para enviar un email si o si necesitaremos una servidor SMTP. Podéis usar GMail, Hotmail, Fastmail… o cualquier cuenta de correo. Para el taller, usaremos Mailgun. Un poderoso servicio profesional para el envío de emails. Nos permite 10.000 envíos mensuales gratuitos. Suficientes para lo que necesitamos. Entramos aquí.
Creamos una nueva cuenta.
Confirmamos nuestra cuenta por el enlace que nos han enviado a nuestro email.
Al pulsar sobre el enlace nos llevará a esta página. Confirmamos nuestro teléfono y pulsamos en Domains. ¡Ojo! Si no confirmáis vuestro teléfono, Mailgun no funcionará.
Entramos en nuestro dominio activo.
Aquí tendremos los accesos que necesitaremos. Dejamos abierta esta página para más adelante.
3.3 Send email
Ya tenemos nuestro servidor de email. Ahora vamos a enviar un correo de prueba. Abrimos avisador.py para importar flask-mail.
from flask_mail import Mail, Message
Configuramos flask-mail. MAIL_USERNAME será Default SMTP Login de mailgun. Y MAIL_PASSWORD será Default Password de mailgun.
app.config.update(
MAIL_SERVER='smtp.mailgun.org',
MAIL_PORT=587,
MAIL_USERNAME='tu_default_smtp_login',
MAIL_PASSWORD='tu_default_password'
)
mail = Mail(app)
Creamos un comando para que nos envíe un email de prueba.
@manager.command
def send_email():
msg = Message(
"Nuevo aviso",
sender="no-reply@pycon17.es",
recipients=["tu_email"]
)
msg.body = "testing"
msg.html = "<b>testing</b>"
mail.send(msg)
Todo junto quedaría así.
#!/usr/bin/env python3
from flask_script import Manager
from app import app
from flask_mail import Mail, Message
app.config.update(
MAIL_SERVER='smtp.mailgun.org',
MAIL_PORT=587,
MAIL_USERNAME='tu_default_smtp_login',
MAIL_PASSWORD='tu_default_password'
)
mail = Mail(app)
manager = Manager(app)
@manager.command
def send_email():
msg = Message(
"Nuevo aviso",
sender="no-reply@pycon17.es",
recipients=["tu_email"]
)
msg.body = "testing"
msg.html = "<b>testing</b>"
mail.send(msg)
if __name__ == "__main__":
manager.run()
Ejecutamos.
./avisador.py send_email
Revisamos nuestra bandeja de entrada. En caso contrario buscamos en spam.
3.4 Notification
Estamos listos para realizar el sistema de notificación. La lógica será básica: buscamos todos los productos que tenga la palabra que tenemos guarda. Nos quedamos con la primera id. Si la id es la misma que tenemos en la base de datos, no hacemos nada. Si es diferente, actualizamos la base de datos y enviamos un email.
Abrimos avisador.py. Primero, importamos nuestra funcion para obtener los elementos del API de Wallapop.
from app import app, get_resultados
Además, importamos nuestra tabla con las palabras guardadas.
from models import db, Programado
Recorremos todas las palabras almacenadas.
@manager.command
def send_email():
programados = Programado.query.all()
for item in programados:
Obtenemos el primer id. Que lo usaremos para comparar si la tenemos guardada.
@manager.command
def send_email():
programados = Programado.query.all()
for item in programados:
# Get last id
results = get_resultados(item.title)
itemId = results[0]['itemId']
if int(itemId) != item.last_item:
Para actualizar en SQLAlchemy hay que obtener el resultado, modificar el objeto, y devolverlo. Para ilustrar la forma de trabajar dejo un ejemplo. Modifico la columna gana que es un boolean y la columna nombre que es un string.
spartano = User.query.filter_by(id=1).first()
spartano.gana = False
spartano.nombre = 'Leónidas'
db.session.add(spartano)
db.session.commit()
En nuestro código quedaría implementado de la siguiente forma.
@manager.command
def send_email():
programados = Programado.query.all()
for item in programados:
# Get last id
results = get_resultados(item.title)
itemId = results[0]['itemId']
# Update last item in database
if int(itemId) != item.last_item:
programado_update = Programado.query.filter_by(id=item.id).first()
programado_update.last_item = itemId
db.session.add(programado_update)
try:
db.session.commit()
except:
db.session.rollback()
Ya solo nos queda enviar el email.
@manager.command
def send_email():
programados = Programado.query.all()
for item in programados:
# Get last id
results = get_resultados(item.title)
itemId = results[0]['itemId']
# Update last item in database
if int(itemId) != item.last_item:
programado_update = Programado.query.filter_by(id=item.id).first()
programado_update.last_item = itemId
db.session.add(programado_update)
try:
db.session.commit()
except:
db.session.rollback()
# Send email
msg = Message(
"Nuevo aviso",
sender="no-reply@pycon17.es",
recipients=["tu email"]
)
msg.body = render_template('emails/notificacion.txt', title=results[0]['title'], id=itemId)
msg.html = render_template('emails/notificacion.html', title=results[0]['title'], id=itemId)
mail.send(msg)
Todo junto quedaría.
#!/usr/bin/env python3
from flask import render_template
from flask_script import Manager
from app import app, get_resultados
from flask_mail import Mail, Message
from models import db, Programado
app.config.update(
MAIL_SERVER='smtp.mailgun.org',
MAIL_PORT=587,
MAIL_USERNAME='tu_default_smtp_login',
MAIL_PASSWORD='tu_default_password'
)
mail = Mail(app)
manager = Manager(app)
@manager.command
def send_email():
programados = Programado.query.all()
for item in programados:
# Get last id
results = get_resultados(item.title)
itemId = results[0]['itemId']
# Update last item in database
if int(itemId) != item.last_item:
programado_update = Programado.query.filter_by(id=item.id).first()
programado_update.last_item = itemId
db.session.add(programado_update)
try:
db.session.commit()
except:
db.session.rollback()
# Send email
msg = Message(
"Nuevo aviso",
sender="no-reply@pycon17.es",
recipients=["tu email"]
)
msg.body = render_template('emails/notificacion.txt', title=results[0]['title'], id=itemId)
msg.html = render_template('emails/notificacion.html', title=results[0]['title'], id=itemId)
mail.send(msg)
if __name__ == "__main__":
manager.run()
3.5 View email
Ya no estoy enviando un texto sencillo en el email. Necesito la magia de flask con su render_template. Puedes observar como hago uso de dos plantillas donde paso dos variables. El titulo y la id del item.
Creamos una carpeta nueva dentro de templates con el nombre emails. Y dentro de esta, el archivo notificacion.html y notificacion.txt. Quedará la estructura así.
--> templates
--> emails
--> notificacion.html
--> notificacion.txt
--> items
--> buscador.html
--> programadas.html
--> layouts
--> master.html
Abrimos notificacion.txt e introducimos.
Aviso
{{ title }}
http://p.wallapop.com/i/{{ id }}?_pid=web&_me=www&campaign=mobile_item
Y en notificacion.html lo siguiente.
<!DOCTYPE html>
<html lang="es">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Notificacion</title>
</head>
<body>
<h1>Aviso</h1>
<h2>{{ title }}</h2>
<a href="http://p.wallapop.com/i/{{ id }}?_pid=web&_me=www&campaign=mobile_item">Ver</a>
</body>
</html>
¡E voilà! Ya hemos terminado. Cuando nos llegue los nuevos avisos podremos pulsar en el enlace para ver la ficha del producto. Y si estamos en el smartphone, se nos abrirá la aplicación oficial.
3.6 Automation
Para revisar y recibir los emails solo tendrás que ejecutar el comando personalizado. De la misma forma que antes.
./avisador.py send_email
Mi recomendación es ejecutarlo en un cron cada hora y listo.
{{ comments.length }} comentarios