Beanstalkd 学习研究


Beanstalkd 学习研究


1. Beanstalkd 介绍

Beanstalkd是一个简单、高效的工作队列系统,其最初设计目的是通过后台异步执行耗时任务方式降低高容量Web应用的页面延时。而其简单、轻量、易用等特点,和对任务优先级(priority)任务延时(delay)任务超时重发(time-to-run)任务预留(buired)等控制,以及众多语言版本的客户端的良好支持,使其能够很好的支持分布式的后台任务和定时任务处理。

beanstalkd还提供了binlog机制,当重启beanstalkd,当前任务的状态能够从记录的本地binlog中恢复。

2. Beanstalkd 中的重要概念

2.1 核心概念

beanstalkd-architecture

Beanstalkd 使用 Producer-Consumer设计模式,无论是其协议结构还是使用方式都是类似Memcached风格的。以下是Beanstalkd设计思想中核心概念:

job - 任务

job是一个需要异步处理的处理,是 Beanstalkd中的基本单元,每个job都会有一个id和优先级,job需要放在一个tube中。 Beanstalkd中的任务(job)类似于其消息队列中的消息(message)的概念。

tube - 管道

管道即某一种类型的任务队列,其类似于消息的主题(topic),是ProducerConsumer的操作对象。一个Beanstalkd中可以有多个管道,每个管道都有自己的生产者(Producer)和消费者(Consumer),管道之间互相不影响。

producer - 生产者

任务(job)的生产者,通过put命令来将一个job放到一个tube中。

consumer - 消费者

任务(job)的消费者,通过reserve来获取job,通过deletereleasebury来改变job的状态。

2.2 任务生命周期

Beanstalkd中的任务(job)替代了消息(message)的概念,任务会有一系列状态。任务的生命周期如下:

beanstalkd-job-status

一个 Beanstalkd任务可能会包含以下状态:

  • READY - 需要立即处理的任务。当producter直接put一个任务时,任务就处理READY状态,以等待consumer来处理。当延时(DELAYED)任务到期后会自动成为当前READY状态的任务
  • RESERVED - 已经被消费者获取,正在执行的任务。当consumer获取了当前READY的任务后,该任务的状态就会迁移到RESERVED状态,这时其它的consumer就不能再操作该任务。Beanstalkd会检查任务是否在TTRtime-to-run)内完成
  • DELETED - 消息被删除,Beanstalkd不再维持这些消息。即任务生命周期结束
  • DELAYED - 延迟执行的任务。当任务被延时put时,任务就处理DELAYED状态。等待时间过后,任务会被迁移到READY状态。当消费者处理任务后,可以将任务再次放回DELAYED队列延迟执行
  • BURIED - 埋葬的任务,这时任务不会被执行,也不会消失。当consumer完成该任务后,可以选择deletereleasebury操作
    • delete后,任务被删除,生命周期结束
    • release操作可以把任务状态迁移回READY状态或DELAYED状态,使其它consumer可以继续获取和执行该任务
    • bury操作会埋葬任务,等需要该任务时,再将埋葬的任务kickREADY,也可以通过delete删除BURIED状态的任务

2.3 Beanstalkd特点

  • 任务优先级(priority)

任务(job)可以有0~2^32个优先级,0表示优先级最高。Beanstalkd采用最大最小堆(Minx-max heap)处理任务优先级排序,任何时刻调用reverse命令的消费者总是能拿到当前优先级最高的任务,时间复杂度为O(logn)

  • 任务延时(delay)

Beanstalkd中可以通过两种方式延时执行任务:生产者发布任务时指定延时;或者当任务处理完毕后,消费者再次将任务放入队列延时执行(release with delay)。这种机制可以实现分布式定时任务,这种任务机制的优势是:如果某个消费者节点故障,任务超时重发(time-to-run)以保证任务转移到其它节点执行

  • 任务超时重发(time-to-run)

Beanstalkd把任务返回给消费者后,消费者必须在预设的TTR(time-to-run)时间内发送delete、或release、或bury命令改变任务的状态;否则Beanstalkd会认为任务处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在TTR时间内无法完成任务,可以发送touch命令,以使Beanstalkd重新计算TTR

  • 任务预留(buried)

RESERVED状态的任务因为某些原因无法执行时,消费者可以使用bury命令将其设置为buried状态,这时Beanstalkd会继续保留这些任务。在具备任务执行条件时,再通过kick将任务迁移回READY状态

3. beanstalkd安装使用

Beanstalkd分为服务端和客户端两部分。可以在其官网查找相关安装包及安装方法:

3.1 服务端

源码安装

下载、解压并进入源码目录后,执行makemake install命令即可:

$ sudo make
// 或
$ sudo make install
// 或
$ sudo make install PERFIX=/usr/bin/beanstalkd

安装包安装

在Unbuntu或Debian系统中,可以使用以下命令安装:

$ sudo apt-get install beanstalkd

在CentOS或RHEL系统中,首先需要更新EPEL源,然后再使用yum命令安装。

小提示:更多关于EPEL的知识可以查阅下面的资料:

在RHEL6/CentOS6中使用以下命令更新源:

$ su -c 'rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm'

在RHEL7中:

$ su -c 'rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-9.noarch.rpm'

检查EPEL源是否更新成功:

$ yum repolist enabled | grep epel
 * epel: mirrors.yun-idc.com
epel                  Extra Packages for Enterprise Linux 6 - i386        10,254

执行安装:

$ yum install -y beanstalkd

加入开启自启动:

$ chkconfig beanstalkd  on

添加用户组:

$ groupadd beanstalkd

添加用户:

$ useradd -M -g beanstalkd -s /sbin/nologin beanstalkd

创建binlog存放目录并修改所有者/所属组(有权限写入):

$ mkdir -p /data/beanstalkd/binlog/
$ chown -R beanstalkd:beanstalkd  /data/beanstalkd

修改配置文件中存放binlog的目录:

$ vi /etc/sysconfig/beanstalkd

BEANSTALKD_BINLOG_DIR=/data/beanstalkd/binlog

运行beanstalkd

Beanstalkd安装后,就可以通过beanstalkd命令来启动或配置Beanstalkd。该命令的使用格式如下:

beanstalkd [OPTIONS]

可选[OPTIONS]参数有:

  • -b DIR - wal目录(开启binlog,断电重启后会自动恢复任务)
  • -f MS - 指定MS毫秒内的 fsync (-f0 为"always fsync")
  • -F - 从不 fsync (默认)
  • -l ADDR - 指定监听地址(默认为:0.0.0.0)
  • -p PORT - 指定监听端口(默认为:11300)
  • -u USER - 用户与用户组
  • -z BYTE - 最大的任务大小(默认为:65535)
  • -s BYTE - 每个wal文件的大小(默认为:10485760)
  • -c - 压缩binlog(默认)
  • -n - 不压缩binlog
  • -v - 显示版本信息
  • -h - 显示帮助

我们使用nohup&来配合启动程序,这样能免疫 Ctrl-C发送的SIGINT信号和关闭session发送的SIGHUP信号

$ nohup beanstalkd -l 0.0.0.0 -p 11300 -b /data/beanstalkd/binlog/ -u beanstalkd &

3.2 客户端

客户端包含了Beanstalkd设计概念中的任务生产者(Producer)消费者(Consumer)。Beanstalkd有很多语言版本客户端的实现,点击Beanstalkd 客户端查找自已所需要的版本,如果都不能满足需要,还可以根据Beanstalkd 协议自行实现。

笔者日常工作中,接触PHP语言较多,以下用一个PHP版本的Beanstalkd 客户端:pda/pheanstalk为例,简单演示Beanstalkd的任务处理流程。

安装 pda/pheanstalk

$ composer require pda/pheanstalk

生产 job

创建一个 producer 来生产 job

producer.php

<?php

require_once('./vendor/autoload.php');

use Pheanstalk\Pheanstalk;

$pheanstalk  = new Pheanstalk('127.0.0.1', 11300);

$tubeName = 'syslog';

$jobData = [
    'type' => 'Debug',
    'level' => 3, // error log
    'content' => 'queue connect failed',
    'timestamp' => round(microtime(true) * 1000),
    'timeCreated' => date('Y-m-d H:i:s'),
];

$pheanstalk
    ->useTube($tubeName)
    ->put(json_encode($jobData));

运行 producer.php

$ php producer.php

消费 job

创建一个 consumer 来消费 job

consumer.php

<?php

if (PHP_SAPI !== 'cli') {
    echo 'Warning: should be invoked via the CLI version of PHP, not the '.PHP_SAPI.' SAPI'.PHP_EOL;
}

require_once('./vendor/autoload.php');

use Pheanstalk\Pheanstalk;

$pheanstalk  = new Pheanstalk('127.0.0.1', 11300);

$tubeName = 'syslog';

while (true) {
    // 从指定队列获取信息,reserve 阻塞获取
    $job = $pheanstalk->useTube($tubeName)->watch($tubeName)->ignore('default')->reserve(60);

    if ($job !== false) {
        // do stuff
        echo $data = $job->getData();
        // 处理完成,删除 job
        $pheanstalk->delete($job);
    }

    usleep(500000); // 0.5 s
}

运行 consumer.php

$ php consumer.php
{"type":"Debug","level":3,"content":"queue connect failed","timestamp":1576113851568,"timeCreated":"2019-12-12 01:24:11"}

可以使用deamontoolssupervisor等将php consumer.php变为常驻内存的进程。

监控 beanstalkd 状态

创建一个 heartbeat 来检查与服务器的连接状态

heartbeat.php

<?php
/**
 * 心跳检查脚本:定期在定时任务系统(crontab、MySQL Event Scheduler、Elastic-Job)上运行并收集与服务器连接状态信息,
 * 如果连接不是活的状态,则可以发送消息报警(sms、email)
 */

if (PHP_SAPI !== 'cli') {
    echo 'Warning: should be invoked via the CLI version of PHP, not the '.PHP_SAPI.' SAPI'.PHP_EOL;
}

require_once('./vendor/autoload.php');

use Pheanstalk\Pheanstalk;

$pheanstalk  = new Pheanstalk('127.0.0.1', 11300);

$isAlive = $pheanstalk->getConnection()->isServiceListening();

var_dump($isAlive);

运行 heartbeat.php

$ php heartbeat.php
bool(true)

4. beanstalkd 管理工具

Tools:https://github.com/beanstalkd/beanstalkd/wiki/Tools

笔者经常使用的两款工具:

web 界面:https://github.com/ptrofimov/beanstalk_console

命令行:https://github.com/src-d/beanstool

对 beanstalkd 的操作也可以使用telnet,比如 telnet 127.0.0.1 11300。然后便可以执行 beanstalkd 的各命令,如 stats 查看信息,use, put, watch 等等。

telnet对beanstalkd的操作:

$ telnet 127.0.0.1 11300
stats
OK 929
---
current-jobs-urgent: 0
current-jobs-ready: 0
current-jobs-reserved: 0
current-jobs-delayed: 0
current-jobs-buried: 0
...

list-tubes
OK 23
---
- default
- syslog

5. Beanstalkd 使用总结

  • 如果需要对job持久化的需要,在启动beanstalkd时可以使用-b参数来开启binlog(二进制日志), 通过binlog可以将job及其状态记录到文件里,如果断电,则可以使用相同的选项重新启动beanstalkd,它将读取binlog来恢复之前的job及状态

  • put前先要use tube xxxtube,这样put的时候就会把job放到指定名称的tube中,否则会放到一个defaulttube

  • reservereserve-with-timeout前先要watch xxxtube,可以同时监控多个tube,这样可以同时取几个队列的任务。但是,千万要小心,如果在一个进程中,不小心watch到了多个tube,那么有时候会取错任务,一般取job的步骤为:useTube xxxtube -> watch xxxtube -> ignore default -> reserve

  • job处理完成,应该delete删除掉,或者release再放回队列,或者bury把它埋葬掉,这个取决于你的设计

6. Beanstalkd 不足

  • 无最大内存控制,如果有消息堆积或者业务使用方式有误,而导致内存暴涨拖垮机器

  • Memcached类似,没有master-slave故障切换机制,需要自己解决单点问题

7. 参考资料


文章作者: 张权
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 张权 !
评论
  目录