English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
Requisito: La función A necesita llamar a una API externa para obtener datos, y la API externa es un método de procesamiento asincrónico. Después de la llamada, devuelve datos y estado {data: "resultado de la consulta", "status": "en procesamiento asincrónico"}. Por lo tanto, se necesita esperar un tiempo antes de volver a llamar a la API externa para obtener datos. Para que el usuario no tenga que esperar porque la API externa está en procesamiento asincrónico al usar la función A, se añade la solicitud del usuario a la cola de tareas, se devuelve parte de los datos y se cierra la solicitud. Luego, se extraen tareas de la cola de tareas de forma programada y se llama a la API externa. Si el estado devuelto es "en procesamiento asincrónico", se vuelve a añadir la tarea a la cola de tareas. Si el estado devuelto es "procesado completamente", se almacena el retorno de datos en la base de datos.
Basado en los problemas mencionados anteriormente, se considera el uso de Node.js + Redis sorted set se utiliza para implementar la cola de tareas. Node.js implementa su propia API para aceptar solicitudes de usuarios, combina los datos almacenados en la base de datos con los datos devueltos por la API y los devuelve al usuario, y añade tareas a la cola de tareas. Utiliza Node.js child process y cron para extraer tareas de la cola de tareas y ejecutarlas.
Al diseñar la cola de tareas, es necesario considerar varios problemas
Solución para los problemas mencionados anteriormente
示例代码
// remote_api.js 模拟第三方 API 'use strict'; const app = require('express')(); app.get('/', (req, res) => { setTimeout(() => { let arr = [200, 300]; // 200 代表成功,300 代表失败需要重新请求 res.status(200).send({ 'status': arr[parseInt(Math.random() * 2]); }, 3000); }); app.listen('9001', () => { console.log('API 服务监听端口:9001); }); // producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列 'use strict'; const app = require('express')(); const redisClient = require('redis').createClient(); const QUEUE_NAME = 'queue:example'; function addTaskToQueue(taskName, callback) { // 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列 redisClient.zscore(QUEUE_NAME, taskName, (error, task) => { if (error) { console.log(error); } if (task) { console.log('任务已存在,不新增相同任务'); callback(null, task); } redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => { if (error) { callback(error); } callback(null, result); } }); } } }); } app.get('/', (req, res) => { let taskName = req.query['task-name']; addTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } res.status(200).send('正在查询中......'); } }); }); app.listen(9002, () => { console.log('生产者服务监听端口:9002); }); // consumer.js 定时获取任务并执行 'use strict'; const redisClient = require('redis').createClient(); const request = require('request'); const schedule = require('node-schedule); const QUEUE_NAME = 'queue:expmple'; const PARALLEL_TASK_NUMBER = 2; // número de tareas ejecutadas en paralelo function getTasksFromQueue(callback) { // obtener múltiples tareas redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => { if (error) { callback(error); } // asignar el valor de la puntuación de la tarea a 0, lo que significa que está en proceso if (tasks.length > 0) { let tmp = []; tasks.forEach((task) => { tmp.push(0); tmp.push(task); }); redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => { if (error) { callback(error); } callback(null, tasks); } }); } } }); } function addFailedTaskToQueue(taskName, callback) { redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => { if (error) { callback(error); } callback(null, result); } }); } function removeSucceedTaskFromQueue(taskName, callback) { redisClient.zrem(QUEUE_NAME, taskName, (error, result) => { if (error) { callback(error); } callback(null, result); } } } function execTask(taskName) { return new Promise((resolve, reject) => { let requestOptions = { 'url': 'http://127.0.0.1:9001', 'method': 'GET', 'timeout': 5000 }; request(requestOptions, (error, response, body) => { if (error) { resolve('failed'); console.log(error); addFailedTaskToQueue(taskName, (error) => { if (error) { console.log(error); } } }); } try { body = typeof body !== 'object' ? JSON.parse(body) : body; } resolve('failed'); console.log(error); addFailedTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } } }); return; } if (body.status !== 200) { resolve('failed'); addFailedTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } } }); } resolve('succeed'); removeSucceedTaskFromQueue(taskName, (error, result) => { if (error) { console.log(error); } } }); } } }); }); } // Programar, cada 5 secundos obtener nuevas tareas para ejecutar let job = schedule.scheduleJob('*/5 * * * * *', () => { console.log('Obtener nuevas tareas'); getTasksFromQueue((error, tasks) => { if (error) { console.log(error); } if (tasks.length > 0) { console.log(tasks); Promise.all(tasks.map(execTask)) .then((results) => { console.log(results); } .catch((error) => { console.log(error); }); } } }); });