You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					243 lines
				
				7.7 KiB
			
		
		
			
		
	
	
					243 lines
				
				7.7 KiB
			| 
								 
											3 years ago
										 
									 | 
							
								<?php
								/**
								 * Queue process base class -- spawns child processes.
								 *
								 * @version 2.0 - 2011-11-25
								 * @package Daemon
								 */

								// A limit of 0 removes the script execution time limit.
								set_time_limit(0);

								// Shared base library located three directory levels above this file
								// (presumably where publicBase is declared -- confirm).
								include_once dirname(dirname(dirname(__FILE__))) . '/library/publicBase.php';
								/**
								 * Base class for queue daemon workers.
								 *
								 * Instantiating a concrete subclass runs the worker: the constructor reads
								 * the command-line arguments, includes the daemon configuration, calls
								 * setPara(), records file checksums and then enters the processing loop,
								 * invoking deal() once per iteration.
								 */
								abstract class dealBase extends publicBase {
								    protected $processnum = 0;      // process limit; starts at 0 and is refreshed from mDaemon::getDaemonNum() inside the loop
								    protected $is_while = true;     // whether the processing loop repeats; defaults to true
								    protected $deal_sleep = 1;      // seconds to sleep after each deal() call; defaults to 1
								    protected $proc_code;           // process number of this worker, taken from the command line
								    protected $server_ip;           // value returned by getServerIp()

								    private $path_conf;             // daemon configuration file path
								    private $path_proc;             // process status file path pattern (assigned, but not read in this class)
								    private $conf_md5;              // MD5 of the configuration file
								    private $deal_md5;              // MD5 of the running queue script
								    private $base_md5;              // MD5 of this base class file
								    private $file_deal;             // queue script path
								    private $path_deal;             // queue script path plus flag (assigned, but not read in this class)
								    private $deal_flag='';          // queue category flag
								    private $close_path;            // switch file; the worker stops while it exists

								    private $restart_server;        // restart marker read from Redis at start-up

								    /**
								     * Initialise the worker and run the processing loop.
								     *
								     * Command line: "<script> <proc_code>" or "<script> <deal_flag> <proc_code>".
								     * The loop ends when $is_while is false, when the close switch file exists,
								     * or when one of the checks terminates the process via exit().
								     */
								    public function __construct() {
								        $this->path_conf = dirname(dirname(__FILE__))."/config/daemonconf.php";
								        $this->path_proc = dirname(dirname(dirname(__FILE__))).'/queue/cache/proc/deal_%s.txt';
								        $this->close_path = dirname(dirname(__FILE__)).'/cache/proc/close.txt';

								        $this->file_deal = $_SERVER['SCRIPT_FILENAME'];

								        // Read the queue flag and process number from the command line.
								        // argv is guarded so a missing argument leaves proc_code as null
								        // instead of raising an undefined-index notice.
								        $argv = (isset($_SERVER['argv']) && is_array($_SERVER['argv'])) ? $_SERVER['argv'] : array();
								        if(count($argv)==3) {
								            $this->deal_flag = $argv[1];
								            $this->proc_code = $argv[2];
								        } else {
								            $this->proc_code = isset($argv[1]) ? $argv[1] : null;
								        }

								        $this->path_deal = trim($this->file_deal.' '.$this->deal_flag);

								        include_once($this->path_conf);

								        $this->server_ip = $this->getServerIp();

								        $this->setPara();

								        $this->collectVerifyMd5();

								        // Every GET_DAEMON_MAXNUM_CYCLE_TIMES iterations the counter wraps to 0
								        // and the process limit is reloaded.
								        $i=0;
								        do {
								            if($i == GET_DAEMON_MAXNUM_CYCLE_TIMES) $i=0;

								            if($i == 0) {
								                $obj = new mDaemon();
								                $this->processnum = $obj->getDaemonNum($this->server_ip, $this->deal_flag);
								            }

								            $this->setProcHealth();
								            if(file_exists($this->close_path)) {
								                // Close switch is present: wait 10 seconds, then stop the worker.
								                sleep(10);
								                return ;
								            }

								            $this->checkVerifyMd5();
								            // Historical note: calling the system "ps" command on every iteration
								            // caused high CPU usage.
								            if(!$this->cProcessNum()) exit("Process limit <{$this->processnum}>\n");
								            $this->deal();

								            sleep($this->deal_sleep);

								            $i++;

								        } while($this->is_while);
								        return;
								    }

								    /* Abstract method, must be implemented by subclasses: sets the properties the daemon needs to run. */
								    abstract public function setPara();

								    /* Abstract method, must be implemented by subclasses: handles one unit of work per loop iteration. */
								    abstract public function deal();

								    /**
								     * Build the Redis client used by this class.
								     *
								     * Deliberately performs the same "new mBase(); initRedis()" sequence on
								     * every call that each method previously inlined; nothing is cached here.
								     *
								     * @return mixed whatever mBase::initRedis() returns
								     */
								    private function dealBaseRedis() {
								        $obj = new mBase();
								        return $obj->initRedis();
								    }

								    /* Record the MD5 of the watched files and the current restart marker. */
								    private function collectVerifyMd5() {
								        $this->conf_md5 = md5_file($this->path_conf);
								        $this->deal_md5 = md5_file($this->file_deal);
								        $this->base_md5 = md5_file(__FILE__);

								        $rdobj = $this->dealBaseRedis();
								        $this->restart_server = $rdobj->get(sprintf(_RD_RESTART_SERVER, $this->server_ip));
								    }

								    /**
								     * Exit when any of the three watched files, or the restart marker, changed.
								     *
								     * Comparisons are strict: with loose "!=" two different digests of the
								     * form "0e<digits>" compare equal as numeric strings, and a marker going
								     * from missing (false) to "0" would go unnoticed.
								     */
								    private function checkVerifyMd5() {
								        if($this->conf_md5 !== md5_file($this->path_conf)) exit("daemon conf file modified.\n");
								        if($this->deal_md5 !== md5_file($this->file_deal)) exit("deal file modified.\n");
								        if($this->base_md5 !== md5_file(__FILE__)) exit("base class file modified.\n");

								        $rdobj = $this->dealBaseRedis();
								        $restart_server = $rdobj->get(sprintf(_RD_RESTART_SERVER, $this->server_ip));
								        if($this->restart_server !== $restart_server) exit("restart server.\n");
								    }

								    /**
								     * Record the process health status: stores the current timestamp under
								     * the _DAEMON_HEALTH key of this server / flag / process number.
								     *
								     * @return	boolean always true
								     */
								    protected function setProcHealth() {
								        $rdobj = $this->dealBaseRedis();
								        $rdobj->set(sprintf(_DAEMON_HEALTH, $this->server_ip, $this->deal_flag, $this->proc_code), time());
								        return true;
								    }

								    /**
								     * Enforce the process limit.
								     *
								     * A worker whose number exceeds the limit may continue only while its
								     * _RD_UP_DOWN key still holds a value.
								     *
								     * @return	boolean false when this worker is over the limit and has no such value
								     */
								    private function cProcessNum() {
								        if($this->proc_code+1>$this->processnum) {
								            $rdobj = $this->dealBaseRedis();
								            $is_exist = $rdobj->get(sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code));
								            if($is_exist) return true;

								            return false;
								        }
								        return true;
								    }

								    /**
								     * Append a line to the tracking log.
								     *
								     * @param int $type       log type, substituted into LOG_TRACK_SAVE_PATH
								     * @param string $log     message text
								     * @param string $log_path explicit log file; when empty the path is built
								     *                         from LOG_TRACK_SAVE_PATH, today's date and $type
								     * @return boolean always true
								     */
								    public function trackLog($type, $log, $log_path='') {
								        if(empty($log_path)) $log_path = sprintf(LOG_TRACK_SAVE_PATH, date('Y-m-d'), $type);
								        $log_dir = dirname($log_path);
								        if(!is_dir($log_dir)) {
								            mkdir($log_dir, 0775, true);
								            chown($log_dir, 'nobody');
								            chgrp($log_dir, 'nobody');
								        }

								        error_log(date('H:i:s').'|'.$log."\r\n", 3, $log_path);
								        chown($log_path, 'nobody');
								        chgrp($log_path, 'nobody');

								        return true;
								    }

								    /**
								     * Fetch an upload/download task.
								     *
								     * @param string $task_key  Redis list the tasks are popped from
								     * @param int $auto_type    0 upload, 1 download (compared with AUTO_CHECK_DOWN)
								     * @return mixed the task value, or false when the list yields nothing
								     */
								    public function getTask($task_key, $auto_type=0) {
								        $robj = $this->dealBaseRedis();

								        // Does this process still hold a task it did not finish last time?
								        $proc_key = sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code);
								        $sale_id = $robj->get($proc_key);
								        // Loose comparison kept on purpose: any falsy stored value counts as "no pending task".
								        if($sale_id==false) {
								            // No unfinished task: take the next one from the queue.
								            $sale_id = $robj->rpop($task_key);
								            if($sale_id === false) return false;

								            // NOTE(review): pop and set are two separate commands, so a crash
								            // between them drops the task.
								            $robj->set($proc_key, $sale_id);
								        }

								        // key:   sale id
								        // value: process key
								        // Lets a re-upload / re-download remove the process that held this
								        // sale id, so the old process does not keep occupying it.
								        $active_key = sprintf(_RD_UPLOAD_ACTIVE_PROC, $sale_id);
								        if($auto_type==AUTO_CHECK_DOWN) $active_key = sprintf(_RD_DOWN_ACTIVE_PROC, $sale_id);
								        $this->setProcKey($active_key, $proc_key, 24*60*60);

								        return $sale_id;
								    }

								    /**
								     * Put a task back into the task queue.
								     *
								     * @param string $key       Redis list key
								     * @param mixed $val        value to push
								     * @param string $proc_key  when non-empty, this process key is deleted afterwards
								     * @return boolean always true
								     */
								    public function lpushToList($key, $val, $proc_key='') {
								        $robj = $this->dealBaseRedis();

								        $robj->lpush($key, $val);
								        if($proc_key) $this->delProcKey($proc_key);
								        return true;
								    }

								    /**
								     * Delete a process key.
								     *
								     * @param string $proc_key  Redis key to delete
								     * @return boolean always true
								     */
								    public function delProcKey($proc_key) {
								        $robj = $this->dealBaseRedis();
								        $robj->del($proc_key);
								        return true;
								    }

								    /**
								     * Set a process key.
								     *
								     * @param string $key         Redis key
								     * @param mixed $val          value to store
								     * @param number $expire_time expiry passed to expire(); applied only when greater than 0
								     * @return boolean always true
								     */
								    public function setProcKey($key, $val, $expire_time=0) {
								        $robj = $this->dealBaseRedis();

								        $robj->set($key, $val);
								        if($expire_time>0) $robj->expire($key, $expire_time);

								        return true;
								    }

								}