一个简单的Promise多线程队列实现

打算搞一个把多个 git 仓库合并成一个独立仓库的轮子,为了提高速度,需要同时拉取多个仓库。

node 提供了很方便的子进程库,在新版本中也有 promisify 接口require('util').promisify,不过好像并没有多线程队列。

然后网上找了下现成的库也不怎么好使,要么没反应要么调用有问题,于是干脆自己实现一个好了。

基本思路是搞几个 Promise 塞到一个数组里,Promise 的数量就算线程数,然后接收一个队列,从队列里 pop 出一个 promise,await 完之后继续递归。

不是很复杂,直接上代码好了:

const os = require('os')

function PromiseQueue(queue = [], threadnum = os.cpus.length || 4) {
	return new Promise(onReturn => {
		const thread = function(threadId) {
			return new Promise(resolve => {
				const onFinal = function() {
					if (threads[threadId] instanceof Promise) {
						threads[threadId] = thread(threadId)
					}
					resolve()
				}
				if (queue.length > 0) {
					let func = queue.pop()
					if (typeof func === 'function') {
						let promise = func()
						if (promise instanceof Promise) {
							return promise.then(onFinal)
						}
					}
					onFinal()
				} else onReturn()
			})
		}
		let threads = Array(threadnum)
			.fill(thread)
			.map((callback, i) => callback(i))
	})
}
js

Demo:

function timer() {
	return new Promise(resolve => {
		setTimeout(() => {
			console.log(Date.now())
			resolve()
		}, 500)
	})
}

PromiseQueue(Array(20).fill(timer)).then(() => {
	PromiseQueue(Array(20).fill(timer))
})
js

用了一段时间后发现还有优化空间,于是加上了错误重试和 promise 返回值处理(高亮行):

function PromiseQueue(queue = [], threadnum = os.cpus.length || 4) {
	return new Promise(onReturn => {
		let result = []
		const thread = function(threadId) {
			return new Promise(resolve => {
				const onFinal = function(ret) {
					if (threads[threadId] instanceof Promise) {
						threads[threadId] = thread(threadId)
					}
					if (ret) result.push(ret)
					resolve()
				}
				if (queue.length > 0) {
					let func = queue.pop()
					if (typeof func === 'function' && (typeof func.retry !== 'number' || func.retry < 3)) {
						let promise = func()
						if (promise instanceof Promise) {
							return promise.then(onFinal).catch(err => {
								func.errmsg = func.errmsg || []
								func.errmsg.push(err)
								func.retry = (Number(func.retry) | 0) + 1
								queue.push(func)
								onFinal()
							})
						}
					}
					if (Array.isArray(func.errmsg)) {
						func.errmsg.forEach(err => console.log(err))
					}
					onFinal()
				} else onReturn(result)
			})
		}
		let threads = Array(threadnum)
			.fill(thread)
			.map((callback, i) => callback(i))
	})
}
js