PHP 实现 企业级 异步多任务模板消息任务推送

实现功能点

1:异步发送,添加端只管添加,不需要考虑发送
2:定时定点,误差不超过5s(任务多会排队)
3:保证每条消息发送成功(尝试N次后自动丢弃)
4:可设置多线程
5:保证不重复发送

实现原理
crontab(定时执行) +php(实现推送)+mysql(消息存储)

步骤
1:创建消息表,保存消息(友情提示 注意索引)

CREATE TABLE `sent_tpl_task` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
   `tpl_id` varchar(64) NOT NULL DEFAULT '', COMMENT '模板消息id',
  `data` varchar(2048) NOT NULL DEFAULT '', COMMENT '模板消息json',
  `send_time` int(11) NOT NULL DEFAULT '0',
  `status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '0 删除 1待发送 2已成功 3发送失败',
  `do_times` tinyint(4) NOT NULL DEFAULT '0',
  `err_msg` varchar(255) NOT NULL DEFAULT '',
  `remark` varchar(255) NOT NULL DEFAULT '' COMMENT '备注',
  PRIMARY KEY (`id`),
  KEY `dt_st_s` (`do_times`,`send_time`,`status`) USING BTREE,
  KEY `tpl_id` (`tpl_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

2:PHP MinuteTask(定时任务入口)

<?php
/**
 * Created by PhpStorm.
 * User: 05
 * Date: 2019/7/12
 * 此文件由Crontab每分钟执行一次
 */

$day=date('d');
$h = date('H');
$min = date('i');

$host=$_SERVER['HTTP_HOST'];
if (!empty($host)){
    return ;//外部访问 直接返回
}


$host='http://www.test'; //Task 项目域名

doTplTask($host);//每分钟执行

//每10分钟执行
if ($min%10==0){
//TOOD other
}

//这里为了保证所有任务都能执行,使用http 非阻塞访问task,task 应该直接返回true 后在执行任务

function doTplTask($host){
    $token="m_token";//自定义token,防止攻击
    $url =$host.'/api/task/doTempleMsgTask?m_token='.$token;
    $task_count=1;//大于0 并发大要考虑带宽
    for ($i=0;$i<$task_count;$i++){
        httpsPost($url);
    }

}


//使用http 请求
function httpsPost($url, $data = null)
{
    $curl = curl_init();
    curl_setopt($curl, CURLOPT_URL, $url);
    curl_setopt($curl, CURLOPT_SSL_VERIFYPEER, FALSE);
    curl_setopt($curl, CURLOPT_SSL_VERIFYHOST, FALSE);
    if (!empty($data)) {
        curl_setopt($curl, CURLOPT_POST, 1);
        curl_setopt($curl, CURLOPT_POSTFIELDS, $data);
    }

    curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
    $output = curl_exec($curl);
    curl_close($curl);

    return $output;
}

Task 处理

<?php

namespace app\Api\controller;
use think\Controller;
use think\Exception;

class Task extends Controller {

	//处理消息
    public function doTempleMsgTask(){
        echo true;
        fastcgi_finish_request();//直接返回成功 保证上游任务调用的实时完成

		//token 验证,防止外部攻击
        $token=input("m_token");
        if (empty($token)){
        	//自行验证token
            exit();
        }


        $t=time();
        $wxmsg = load_wechat('message'); //加载微信模板消息类库
        
        //进入工作模式 
        for (;;){
            try{
				//每条进程最多执行一分钟 自动结束,等带下一个进程
                if (time()-$t>59){
                    exit("time over");
                }
				
				//这里必须开启事务,不然多任务时可能拿到同一个消息
                model("TplTask")->startTrans();

				//这里获取消息,并发高数据量大注意建立索引 或考虑使用redis 代替(注意持久化) 
                $task=model("TplTask")
                       ->field("id")  //只取主键,提高并发
                       //条件为状态 未发送 and 发送时间<= 当前时间,执行次数小于规定次数
                    ->where(array('status'=>1,'send_time'=>array('elt',time(),'do_times'=>array('lt',1))))
                    //排序,执行次数,发送时间
                    ->order('do_times,send_time')
                	//必须加锁写锁,防止并发重复读取
                    ->lock(true)
                    ->find();
                    
				//任务表为空 则 休息5秒,防止频繁操作空数据库
                if (empty($task)){
                    model("TplTask")->commit();
                    sleep(5);
                    continue;
                }
                
                //根据id 修改状态,直接提交事务,减少事务时间
                model("TplTask")->where(array('id'=>$task['id']))->update(array('do_times'=>1,'status'=>2));
                model("TplTask")->commit();
				
				//根据id 找到数据
				   $task=model("TplTask")
                    ->where(array('id'=>$task['id']))
                    ->find();
				//发送数据
                $data=json_decode($task['data'],1);
                $result=$wxmsg->sendTemplateMessage($data);
                if ($result['errcode']!=0){
               		 //发送失败后修改状态跟次数,并记录错误信息
                    model("TplTask")->where(array('id'=>$task['id']))->update(array('status'=>3,'err_msg'=>$result['errcode']."|".$result['errmsg']));
                }

            }catch (Exception $e){

            }
        }
    }

}

3:crontab 执行

* * * * * php /home/wwwroot/Test/MinuteTask.php

更多推荐

PHP 实现异步定时多任务消息推送(企业实用级)