首页后端开发PHP教你使用mixphp打造多进程异步邮件发送

教你使用mixphp打造多进程异步邮件发送

时间2024-02-02 10:33:03发布访客分类PHP浏览147
导读:收集整理的这篇文章主要介绍了教你使用mixphp打造多进程异步邮件发送,觉得挺不错的,现在分享给大家,也给大家做个参考。注意:这个是 MixpHP V1 的范例邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步...
收集整理的这篇文章主要介绍了教你使用mixphp打造多进程异步邮件发送,觉得挺不错的,现在分享给大家,也给大家做个参考。注意:这个是 MixpHP V1 的范例

邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。

传统 MVC 框架由于缺少多进程开发能力,通常是采用同一个脚本执行多次,产生多个进程的方式,mixphp 封装了 TaskExecutor 专用于多进程开发,用户能非常简单的开发出功能完善的高可用多进程应用。

推荐:《PHP视频教程》

下面演示一个异步邮件发送系统的开发过程,涉及知识点:

  • 异步
  • 消息队列
  • 多进程
  • 守护进程

如何使用消息队列实现异步

PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:

  • redis
  • rabbITmq
  • kafka

本次我们选用 redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:

// 入列$redis->
    lpush($key, $data);
    // 出列$data = $redis->
    rpop($key);
    // 阻塞出列$data = $redis->
    brpop($key, 10);
    

架构设计

本实例由传统 MVC 框架投递邮件发送需求,MixPHP 多进程执行发送任务。

邮件发送库选型

以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。

由于发送任务是由 MixPHP 执行,所以 swiftmailer 是安装在 MixPHP 项目中,在项目根目录中执行以下命令安装:

composer require swiftmailer/swiftmailer

生产者开发

在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 mixphp 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。

在传统 MVC 框架的控制器中增加如下代码:

通常框架中使用 redis 会安装一个类库来使用,本例使用原生代码,便于理解。
// 连接$redis = new \Redis();
    if (!$redis->
connect('127.0.0.1', 6379)) {
        throw new \Exception('Redis Connect Failure');
}
    $redis->
    auth('');
    $redis->
    select(0);
    // 投递任务$data = [    'to'      =>
     ['***@QQ.COM' =>
     'A name'],    'body'    =>
     'Here is the message itself',    'subject' =>
     'The title content',];
    $redis->
    lpush('queue:email', serialize($data));
    

通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有执行。

消费者开发

本例我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。

TaskExecutor 的 MODE_PUSH 模式有二种进程:

  • 左进程:负责从消息队列取出任务数据,投放给中进程。

  • 中进程:负责执行邮件发送任务。

PushCommand.php 代码如下:

?phpnamespace apps\daemon\commands;
    use mix\console\ExitCode;
    use mix\faCADes\Input;
    use mix\facades\Redis;
    use mix\task\centerPRocess;
    use mix\task\LeftProcess;
    use mix\task\TaskExecutor;
    /** * 推送模式范例 * @author 刘健 coder.liu@qq.com>
 */class PushCommand extends BaseCommand{
        // 配置信息    const HOST = 'smtpdm.aliyun.com';
        const PORT = 465;
        const SECURITY = 'ssl';
        const USERNAME = '****@email.***.com';
        const PASSWORD = '****';
    // 初始化事件    public function onInitialize()    {
            parent::onInitialize();
     // TODO: Change the autogenerated stub        // 获取程序名称        $this->
    programName = Input::getCommandName();
            // 设置piDFile        $this->
pidFile = "/VAR/run/{
    $this->
programName}
    .pid";
    }
    /**     * 获取服务     * @return TaskExecutor     */    public function getTaskService()    {
            return create_object(            [                // 类路径                'class'         =>
     'mix\task\TaskExecutor',                // 服务名称                'name'          =>
 "mix-daemon: {
    $this->
programName}
    ",                // 执行类型                'type'          =>
     \mix\task\TaskExecutor::TYPE_DAEMON,                // 执行模式                'mode'          =>
     \mix\task\TaskExecutor::MODE_PUSH,                // 左进程数                'leftProcess'   =>
     1,                // 中进程数                'centerProcess' =>
     5,                // 任务超时时间 (秒)                'timeout'       =>
     5,            ]        );
    }
    // 启动    public function actionStart()    {
        // 预处理        if (!parent::actionStart()) {
                return ExitCode::UNSPECIFIED_ERROR;
        }
            // 启动服务        $service = $this->
    getTaskService();
            $service->
    on('LeftStart', [$this, 'onLeftStart']);
            $service->
    on('CenterStart', [$this, 'onCenterStart']);
            $service->
    start();
            // 返回退出码        return ExitCode::OK;
    }
    // 左进程启动事件回调函数    public function onLeftStart(LeftProcess $worker)    {
        try {
                // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线            $queueModel = Redis::getInstance();
                // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出            for ($j = 0;
     $j  16000;
 $j++) {
                    // 从消息队列中间件阻塞获取一条消息                $data = $queueModel->
    brpop('queue:email', 10);
                if (empty($data)) {
                        continue;
                }
                    list(, $data) = $data;
                    // 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.htML)                $worker->
    push($data, false);
            }
        }
 catch (\Exception $e) {
                // 休息一会,避免 CPU 出现 100%            sleep(1);
                // 抛出错误            throw $e;
        }
    }
    // 中进程启动事件回调函数    public function onCenterStart(CenterProcess $worker)    {
            // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出        for ($j = 0;
     $j  16000;
 $j++) {
                // 从进程消息队列中抢占一条消息            $data = $worker->
    pop();
            if (empty($data)) {
                    continue;
            }
            // 处理消息            try {
                    // 处理消息,比如:发送短信、发送邮件、微信推送                var_dump($data);
                    $ret = self::sendEmail($data);
                    var_dump($ret);
            }
 catch (\Exception $e) {
                    // 回退数据到消息队列                $worker->
    rollback($data);
                    // 休息一会,避免 CPU 出现 100%                sleep(1);
                    // 抛出错误                throw $e;
            }
        }
    }
    // 发送邮件    public static function sendEmail($data)    {
            // Create the Transport        $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))            ->
    setUsername(self::USERNAME)            ->
    setPassword(self::PASSWORD);
            // Create the Mailer using your created Transport        $mailer = new \Swift_Mailer($transport);
            // Create a message        $message = (new \Swift_Message($data['subject']))            ->
    setFrom([self::USERNAME =>
     '**网'])            ->
    setTo($data['to'])            ->
    setBody($data['body']);
            // Send the message        $result = $mailer->
    send($message);
            return $result;
    }
}
    

测试

1.在 shell 中启动 push 常驻程序。
[root@localhost bin]# ./mix-daemon push startmix-daemon 'push' start successed.
1.调用接口往消息队列投放任务。

此时 shell 终端将打印:

成功收到测试邮件:

MixPHP

GitHub: https://github.com/mix-php/mix
官网:http://www.mixphp.cn/

以上就是教你使用Mixphp打造多进程异步邮件发送的详细内容,更多请关注其它相关文章!

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!


若转载请注明出处: 教你使用mixphp打造多进程异步邮件发送
本文地址: https://pptw.com/jishu/596725.html
值得一看!高级PHP工程师必备的编码技巧及思维 详解php命令行写shell实例

游客 回复需填写必要信息