如果有仔细看过 swoole task 的文档的话,应该都会注意到这句话

task 操作的次数必须小于onTask处理速度,如果投递容量超过处理能力,task会塞满缓存区,导致worker进程发生阻塞。worker进程将无法接收新的请求

task 如果阻塞会引发 woker 进程阻塞,造成服务无法工作,引发问题。

我曾经使用 task 发送服务的链路日志,接收日志的服务出现bug,造成发送日志的 task 阻塞,然后服务 gg 的情况,之后我就对 task 做了一波优化。

思路就是使用 swoole channel (https://wiki.swoole.com/wiki/page/647.html)和 swoole user process (https://wiki.swoole.com/wiki/page/390.html)实现一套 task 。

使用 channel 接收数据,然后在 user process 消费数据,假如 channel 满了仅仅会造成 push 数据失败,并不会引发阻塞,因为是链路日志,是允许丢失的,所以这个方案完全没问题。

在swoole user process 消费 channel 的策略的伪代码如下

$sleepTime = 5;
$maxSleepTime = 100;
while (true) {
    $task = $chan->pop();
    if ($task === false) {
        $sleepTime = $sleepTime + 5;
        if ($sleepTime > $maxSleepTime) {
            $sleepTime = $maxSleepTime;
        }
        usleep($sleepTime * 1000);
        continue;
    }
    $sleepTime = 0;
    // 处理数据
}

如果消费到channel的数据,就使用死循环处理数据,因为处理数据过程中是有其他操作的,所以并不会占用大量 cpu。

如果消费不到数据,就 sleep 5ms,sleep的时间依次累加,直到达到最大值 100ms,达到 cpu 使用率和处理数据实时性的一个平衡,具体平衡点可以根据自己的业务按需调整。

另外,还有使用channel还有一个小坑,往 channel 发送数据使用的 swoole 进程通信机制,swoole 的进程通信有个点需要注意一下,那就是超过8k的数据会在磁盘产生临时文件,而且这个临时文件 swoole 自己并不会清理,因为这个问题,测试环境的磁盘被这些临时文件打爆过。

怎么处理这个问题呢?我的思路就是使用gzip压缩一下数据,这样可以解决在我的应用场景中超过8k的数据,但是如果在其他场景下,即使处理过数据依然超过8k怎么办呢?写个脚本,定时删吧,好像也没很好的方法。