https://d226lax1qjow5r.cloudfront.net/blog/blogposts/fast-voice-broadcast-python-dr/livecoding-voice-broadcast.png

Emisión de voz superrápida con Python asíncrono y Sanic

Publicado el May 7, 2021

Tiempo de lectura: 11 minutos

Los SMS se han convertido en el método de facto para enviar notificaciones cuando push no está disponible. Tanto es así que ya casi nunca recibo un SMS de una "persona" real. Mis colegas usan Slack, mis amigos Facebook Messenger, mis amigos preocupados por la seguridad Telegram y mis amigos paranoicos Signal. Incluso mi madre, que no tuvo su primer smartphone hasta el año pasado, ahora me envía fotos bonitas de mi sobrina por WhatsApp en lugar de por correo electrónico.

A medida que dejaba de ver los SMS como una forma de comunicarme con amigos y familiares para convertirlos en un canal de notificación de servicios, me di cuenta de que ya no reaccionaba de la misma manera ante el nuevo sonido de los SMS. No transmite la misma urgencia que antes; ahora sé que es más probable que sea un cupón para mi pizzería local que algo que requiera mi atención inmediata.

Y a menudo esto está bien. No todas las notificaciones son urgentes. Pero, ¿qué pasa con los mensajes que sí lo son? ¿Las alertas críticas? Notificaciones sobre las que hay que actuar ahora mismo, como cortes de servicio o avisos de condiciones meteorológicas extremas. Para eso necesitamos algo más difícil de ignorar: un teléfono que suena.

¿No es curioso? Oyes sonar un teléfono y podría ser cualquiera. Pero a un teléfono que suena hay que contestar, ¿no? - El que llama, Phone Booth (2002)

Antes de empezar

Antes de empezar, necesitarás algunas cosas.

  • Python 3.5 o superior, vamos a utilizar algunas de las nuevas funciones async de Pythonpor lo que necesitaremos una versión bastante moderna. También recomendaría virtualenv ya que vamos a necesitar instalar algunas dependencias.

  • MongoDB instalado localmente

  • ngrok o una forma similar de exponer su aplicación a Internet

Vonage API Account

To complete this tutorial, you will need a Vonage API account. If you don’t have one already, you can sign up today and start building with free credit. Once you have an account, you can find your API Key and API Secret at the top of the Vonage API Dashboard.

This tutorial also uses a virtual phone number. To purchase one, go to Numbers > Buy Numbers and search for one that meets your needs.

Configurar nuestra base de datos

Vamos a utilizar MongoDB para almacenar los números de las personas a las que tenemos que llamar. Nuestros documentos van a ser especialmente simples:

{
    "_id" : ObjectId("599d3f2c736544a32f48d3c4"),
    "number" : "<NUMBER TO CALL>"
}

Así que antes de empezar vamos a abrir nuestro shell y añadir algunos documentos a nuestra colección. Inicia la shell de MongoDB ejecutando mongodb.

use contactsDatabase
db.contactsCollection.insert([{"number": "<NUMBER TO CALL>"}, {"number": "<NUMBER TO CALL>"}])

El método .insert() acepta una lista de documentos, así que añade algunos números diferentes a los que quieras llamar como parte de este ejemplo. Estos también pueden ser Nexmo Virtual Numbers si necesita algunos extras para pruebas.

Configuración de nuestro proyecto Python Voice Broadcast

Todo el código de este ejemplo se encuentra en Comunidad Nexmo Github. Deberíamos clonar esto ahora e instalar nuestros requisitos. A partir de ahora asegúrate de que estás ejecutando todos estos comandos dentro de tu entorno virtual. A medida que avancemos necesitarás múltiples ventanas de terminal, así que recuerda activar el entorno en cada una de ellas.

View the Python voice broadcast example code

git clone
cd python-sanic-voice-broadcast/
pip install -r requirements.txt

Ahora que tenemos todo nuestro código localmente, y hemos instalado todas nuestras dependencias con pipnos queda un último paso. Para utilizar la Voice API de Nexmonecesitaremos el ID de nuestra aplicación, la clave privada para la aplicación, y el número virtual Nexmo a utilizar como número de la persona que llama.

Copie la clave privada que Nexmo generó cuando creó su aplicación de voz y colóquela en su directorio python-sanic-voice-broadcast/ directorio. Deberá cambiarle el nombre a broadcast.key también.

A continuación, vamos a guardar nuestro ID de aplicación y número virtual como variables de entorno para que podamos acceder a ellos en nuestro código. Necesitarás export estas variables cada vez que abras un nuevo Terminal, o si estás usando virtualenvwrapper puedes usar postactivate para establecerlas automáticamente cuando actives tu entorno virtual.

export BROADCAST_APPLICATION_ID="<YOUR APPLICATION ID>"
export BROADCAST_NUMBER_FROM="<YOUR NEXMO VIRTUAL NUMBER>"

Llamadas de voz salientes y objetos de control de llamadas Nexmo

Tenemos que indicar a la API Nexmo qué acciones debe realizar cada vez que el usuario responda a nuestra llamada. Al igual que en mi anterior entrada del blog de texto-a-voz vamos a utilizar la talk acción y una voz sintetizada para leer nuestra notificación al usuario.

La API de Nexmo realizará una solicitud GET a la URL de respuesta que proporcionaste al crear tu aplicación. Esta URL tendrá que ser accesible por Nexmo, así que si vas a ejecutar tu servidor localmente, tendrás que utilizar una herramienta como ngrok para exponer tu servidor local a la Internet pública.

¡Hay que ir rápido! Creación de un servidor asíncrono Python Sanic

Sanic running in a TerminalSanic running in a Terminal

Puedes encontrar el código para el servidor en el archivo server.py pero concentrémonos en la ruta answer por ahora.

@app.route("/")
async def answer(request):
    return json([{
        'action': 'talk',
        'text': 'This is a message from the Nexmo broadcast system'
    }])

Nuestra OCN es increíblemente simple. Tenemos una única acción que leerá el texto "Este es un mensaje del sistema de difusión Nexmo" cada vez que un usuario responda a nuestra llamada saliente. Estamos utilizando el método Sanic json para asegurarnos de que enviamos las cabeceras HTTP correctas con nuestra respuesta JSON.

Intentémoslo ahora. En tu terminal ejecuta python server.py y visita http://127.0.0.1:8000 en tu navegador.

Este servidor sólo es accesible localmente, pero necesitamos que esté disponible para la API de Nexmo. Si estás usando ngrok para crear un túnel a tu localhost, entonces este sería un buen momento para abrir otra Terminal e iniciar ngrok:

ngrok http 8000

Recuerda actualizar la respuesta de tu aplicación de voz y las URL de los eventos para que coincidan con tu dirección ngrok. Puedes encontrar la ruta del evento en el archivo server.py archivo.

Realizar una llamada de voz síncrona

En este ejemplo, vamos a utilizar el cliente cliente Python Nexmo. Pero el código es prácticamente el mismo para JavaScript, Java, PHP, Ruby o ASP.NET.

import os
import nexmo
from pymongo import MongoClient


if __name__ == '__main__':
    # Connect to our mongo database
    db_client = MongoClient('mongodb://localhost:27017/')
    collection = db_client.contactsDatabase.contactsCollection

    # Create our Nexmo client
    nexmo_client = nexmo.Client(
        application_id=os.environ['BROADCAST_APPLICATION_ID'],
        private_key='broadcast.key'
    )

    # Grab a single contact from our database
    contact = collection.find_one()

    # Create an outbound call to the selected user
    response = nexmo_client.create_call({
        'to': [{'type': 'phone', 'number': contact['number']}],
        'from': {'type': 'phone', 'number': os.environ['BROADCAST_NUMBER_FROM']},
        'answer_url': ['https://nexmo-broadcast.ngrok.io']
    })

    print(response)

El proceso es sencillo: nos conectamos a nuestro almacén de datos, obtenemos un número de teléfono al que llamar, creamos una nueva llamada saliente utilizando la API de Nexmo y la librería cliente Python.

Utilizar el cliente Python es la forma más sencilla de realizar una llamada saliente. Pero también es síncrono. Nuestro cliente Python utiliza la increíble librería requests. Pero desafortunadamente, requests no es una librería asíncrona, aunque está en camino.

![Captura de pantalla del cliente Nexmo Python ejecutándose en la clase Terminal"](https://d226lax1qjow5r.cloudfront.net/blog/blogposts/super-fast-voice-broadcast-with-asynchronous-python-and-sanic/making-outbound-voice-call-terminal.png "Captura de pantalla del cliente Nexmo Python ejecutándose en la clase Terminal")

Si usted tiene un pequeño número de notificaciones para enviar esto es probablemente lo suficientemente bueno. La API de Nexmo es rápida, pero hay que conectarse a ella en línea. Va a haber algo de latencia. Desde mi oficina en Glasgow, Escocia se tarda aproximadamente 1 segundo para la solicitud y respuesta de la API Nexmo. Así que el envío de unas pocas notificaciones de esta manera probablemente estaría bien, pero si quería enviar miles de notificaciones, o incluso cientos de miles de notificaciones de forma sincrónica probablemente no es la mejor manera.

Hacer una llamada de voz asíncrona

Todo el código de esta sección se encuentra en el archivo broadcast.py archivo. Es un poco más complejo que el anterior ya que vamos a recrear una pequeña parte del cliente Python de Nexmo; de forma que soporte llamadas asíncronas.

Echemos un vistazo primero a nuestro bucle de eventos.

def run_event_loop():
    loop = asyncio.get_event_loop()
    future = asyncio.Future()

    asyncio.ensure_future(broadcast(future, loop))
    loop.run_until_complete(future)

    logger.debug(future.result())
    loop.close()

Aquí tenemos un futuro y una coroutine, esta Future se ejecutará hasta que nuestro método broadcast indique que se ha completado llamando a future.set_result. Es esta broadcast coroutine la que reunirá todas las llamadas que necesitamos hacer. Veámosla a continuación.

async def broadcast(future, loop):
    # Connect to MongoDB
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    contacts_collection = client.contactsDatabase.contactsCollection
    cursor = contacts_collection.find()

    # Use the aiohttp client which is async
    async with aiohttp.ClientSession(loop=loop) as session:

        # Use a list comprehension to call create_call with each number
        tasks = [
            create_call(session=session, number=document['number'])
            for document in await cursor.to_list(length=100)
        ]
        await asyncio.gather(*tasks)

    # Signal that our future is now complete
    future.set_result(f'attempted to ring {len(tasks)} people')

Lo primero que hay que tener en cuenta es que se trata de una coroutine, y que estamos usando la sintaxis async def lo que significa que necesitaremos una versión de Python >= 3.5

Python Motor MascotPython Motor Mascot

Nuestro código de cliente MongoDB también ha cambiado algo. Ahora usando motor en lugar de la librería pymongo. Hemos hecho este cambio porque pymongo no es asíncrono. Para evitar que nuestras llamadas a Mongo se bloqueen tenemos que usar motor en su lugar.

La parte más importante de esta coroutine es la introducción de aiohttp. Este módulo nos permite hacer peticiones HTTP asíncronas. Vamos a crear una sesión de cliente asíncrona y pasarla a nuestra create_call coroutine.

Escribir nuestro método create_call

Puede que hayas notado en el primer ejemplo de bloqueo que el cliente Nexmo Python tiene un método create_callvamos a utilizar el mismo nombre para nuestra coroutine que llamamos anteriormente para cada uno de los números en nuestro MongoDB.

Al inicio de nuestra coroutine, estamos creando un nuevo archivo BroadcastClient.

# Wrap our JWT generation in a new class
client = await BroadcastClient.create(number_to=number)
headers = client.get_headers()
payload = client.get_payload()

La Voice API de Nexmo utiliza JSON Web Tokens (JWT) para la autenticación. Este BroadcastClient generará nuestro token y nos proporcionará métodos para crear las cabeceras correctas y la carga útil que debemos enviar con nuestra solicitud de API. Veamos esto primero antes de volver a nuestro método create_call método.

Tokens web JSON y nuestra carga útil de alerta de voz

Si se fija en el BroadcastClientnotarás que no tiene un método __init__ esto es a propósito. Necesitamos que la creación de nuestro cliente no sea bloqueante, pero también necesitamos leer el contenido de nuestra clave privada desde el disco. Normalmente, realizaríamos esta configuración en el método __init__ pero los métodos mágicos de Python no están diseñados para trabajar con async/await. En su lugar, vamos a utilizar el patrón de fábrica.

El sitio BroadcastClient tiene un método create que es asíncrono y devuelve un objeto BroadcastClient que tiene los atributos de clase correctos, incluyendo el contenido de nuestro archivo de clave privada. Puedes ver cómo lo usamos para instanciar nuestro cliente en el ejemplo de código anterior.

client = await BroadcastClient.create(number_to=number)

Una vez que tenemos nuestro cliente instanciado podemos generar las cabeceras de nuestra petición, que incluirán nuestro token.

def get_headers(self):
    iat = int(time.time())
    payload = {
        'iat': iat,
        'application_id': self.APPLICATION_ID,
        'exp': iat + 60,
        'jti': str(uuid.uuid4())
    }

    token = jwt.encode(payload, self.PRIVATE_KEY, algorithm='RS256')

    headers = {
        'User-Agent': self.USER_AGENT,
        'Authorization': 'Bearer ' + token.decode('utf-8')
    }

    return headers

Si has trabajado antes con JWT este código te resultará familiar. Si no lo has hecho, te recomiendo que leas la especificación especificación completa para entender cómo funciona.

La clave privada utilizada para codificar el token debe coincidir con la clave pública configurada para su aplicación de voz. Otra cosa a tener en cuenta es el atributo User-Agent se utiliza para identificar la aplicación y debe ser único.

La carga útil de la alerta de voz

Observará que es muy similar al diccionario que se pasa al cliente cliente Nexmo en nuestro primer ejemplo síncrono:

# Nexmo Python client (synchronous)
response = nexmo_client.create_call({
    'to': [{'type': 'phone', 'number': contact['number']}],
    'from': {'type': 'phone', 'number': os.environ['BROADCAST_NUMBER_FROM']},
    'answer_url': ['https://nexmo-broadcast.ngrok.io']
})

# Get payload method for our asynchronous example
def get_payload(self):
    return {
        'to': [{'type': 'phone', 'number': self.NUMBER_TO}],
        'from': {'type': 'phone', 'number': self.NUMBER_FROM},
        'answer_url': [self.ANSWER_URL]
    }

Las librerías cliente de Nexmo son finas sobre una API REST. Como se puede ver, incluso al escribir nuestro código sin utilizar el cliente Python, se ve muy similar.

Bien, volvamos a nuestro método create_call y veamos cómo usamos nuestro método BroadcastClient para enviar nuestras notificaciones urgentes.

Llamada a la Voice API de Nexmo

async def create_call(session, number):
    logger.info(f'calling {number}')

    # Wrap our JWT generation in a new class
    client = await BroadcastClient.create(number_to=number)
    headers = client.get_headers()
    payload = client.get_payload()

    # POST to the Nexmo API
    async with session.post('https://api.nexmo.com/v1/calls', headers=headers, json=payload) as response:
        status = response.status
        nexmo_response = await response.text()

        # 429 == rate limited, need to back off
        if status == 429:
            raise NexmoRateError

        logger.info(f'call requested to {number} ({status})')

    # The Nexmo JSON response will contain 'started' as the status
    # if everything has gone to plan
    return 'started' in nexmo_response

Una vez que hemos instanciado nuestro cliente, utilizamos la sesión aiohttp para POST a la calls de la API de Nexmo. Nuestras cabeceras ahora incluirán nuestro token JWT, y la carga útil es una representación JSON del diccionario devuelto por get_payload.

Después de hacer la solicitud POST a la API Nexmo, debemos comprobar el 429 estado HTTP. Si hemos excedido nuestro límite de velocidad este es el código que Nexmo devolverá. Así que si recibimos un 429deberíamos backoff. Actualmente, el límite de velocidad de la API para solicitudes POST a la Voice API es de dos solicitudes por segundo. Veremos los decoradores backoff dentro de un momento.

Finalmente, nuestra create_call devolverá True o False dependiendo de si la cadena JSON contiene o no el estado "iniciado". Aunque esta comprobación parece un poco rudimentaria, veremos su importancia en la siguiente sección.

Retroceder y ser un usuario educado de la API

Tenemos los siguientes decoradores en nuestro método create_call método.

@backoff.on_exception(backoff.expo, NexmoRateError, on_backoff=backoff_exception_handler)
@backoff.on_predicate(backoff.fibo, on_backoff=backoff_predicate_handler, max_tries=5)

Aquí estamos utilizando la biblioteca backoff para volver a ejecutar nuestra llamada a la API si falla, pero también esperará antes de volver a intentarlo para que no martilleemos el endpoint de la API.

Tenemos dos decoradores, cada uno esperando un tipo diferente de error de la API. El decorador on_exception se activa cuando la coroutine lanza una excepción NexmoRateError; esta excepción se produce cada vez que tenemos un estado HTTP de 429. No hemos establecido un número máximo de reintentos para este decorador, pero le hemos ordenado que utilice una duración exponencial para nuestro backoff, con jitter.

Llamadas con backoff exponencial y sin jitter

Graph showing clustering with no jitterGraph showing clustering with no jitter

Llamadas con backoff exponencial y jitter total

Graph with no clustering as jitter is appliedGraph with no clustering as jitter is applied

Gráficos de "Backoff Exponencial y Jitter" del Blog de arquitectura de AWS

Como podemos ver en el segundo gráfico, hay muchos menos grupos de llamadas cuando añadimos jitter a nuestro algoritmo. El blog de arquitectura de AWS lo explica especialmente bien en su post Backoff exponencial y fluctuación de fase.

Nuestro segundo decorador on_predicate se activa cada vez que la coroutine devuelve un valor Falsey valor. Nuestro generador para el tiempo de espera en este ejemplo es fiboque producirá los números de la secuencia de secuencia de Fibonaccide nuevo con un poco de fluctuación para evitar la agrupación.

def fibo(max_value=None):
    a = 1
    b = 1
    while True:
        if max_value is None or a < max_value:
            yield a
            a, b = b, a + b
        else:
            yield max_value

Si los generadores exponencial o Fibonacci no se adaptan a su caso de uso, es sencillo escribir los suyos propios. Según xkcd 221 aquí hay un generador que siempre produce una duración de espera aleatoria.

XKCD 221 - Random number generatorXKCD 221 - Random number generator

def xkcd():
    while True:
        yield 4  # chosen by fair dice roll, guaranteed to be random

Nuestra coroutine create_call coroutine devolverá un valor Falsey cada vez que la respuesta JSON de la API Nexmo no contenga started. Hay muchas razones diferentes por las que esto podría ser: nuestra clave privada podría ser incorrecta, tenemos valores no válidos en nuestra carga útil, no tenemos suficiente crédito en nuestra cuenta Nexmo, etc.. Estos no son problemas que se resolverán simplemente llamando a la API de nuevo. Así que en este ejemplo, le damos un par de intentos para permitir que un parpadeo menor en la conectividad o similar, y luego simplemente nos damos por vencidos.

Probarlo todo

Screencast showing multiple async tasks making outbound voice callsScreencast showing multiple async tasks making outbound voice calls

Antes de intentar ejecutar cualquiera de los dos scripts recuerde que necesitará tener Sanic y ngrok para que Nexmo pueda recuperar tu archivo NCCO.

También tendrás que completar la sección de configuración al principio de este artículo. Asegúrese de que tiene MongoDB ejecutándose con varios documentos en tu contactsCollectionhas configurado las variables de entorno necesarias, has guardado tu clave privada en la raíz de tu proyecto como broadcast.keyy que has instalado todos los requisitos en tu entorno virtual usando pip.

Una vez que haya completado todo lo anterior puede ejecutar la tarea sincrónica con:

python blocking_broadcast.py

Y para probar la versión asíncrona ejecuta:

python broadcast.py

Observe la salida del script asíncrono; el orden de las etiquetas calling number y call requested to number son probablemente diferentes ya que es asíncrono.

Compartir:

https://a.storyblok.com/f/270183/150x150/a3d03a85fd/placeholder.svg
Aaron BassettAntiguos alumnos de Vonage

Aaron era un defensor de los desarrolladores en Nexmo. Ingeniero de software experimentado y aspirante a artista digital, Aaron suele crear cosas con código o electrónica; a veces ambas cosas. Cuando está trabajando en algo nuevo, suele percibir el olor a componentes quemados en el aire.