<?php
/**
 * Queue process base class -- drives one worker process of a queue daemon.
 *
 * A concrete queue script extends dealBase, implements setPara() and deal(),
 * and instantiates the class. All work happens inside the constructor: it
 * parses the command line, loads the daemon configuration and then runs the
 * deal() loop until the process is told to stop.
 *
 * @version 2.0 - 2011-11-25
 * @package Daemon
 */
set_time_limit(0);
include_once(dirname(dirname(dirname(__FILE__)))."/library/publicBase.php");

abstract class dealBase extends publicBase
{
    /** Process-count limit for this queue. Starts at 0 and is refreshed from mDaemon::getDaemonNum() inside the main loop. */
    protected $processnum = 0;
    /** Whether to keep looping forever; defaults to true. */
    protected $is_while = true;
    /** Seconds to sleep after each deal() call; defaults to 1. */
    protected $deal_sleep = 1;
    /** Process number of this worker within its queue, taken from the command line. */
    protected $proc_code;
    /** Value returned by getServerIp() (presumably inherited from publicBase -- confirm). */
    protected $server_ip;

    /** Path of the daemon configuration file. */
    private $path_conf;
    /** Path pattern of the process status file (not read anywhere in this class). */
    private $path_proc;
    /** MD5 of the configuration file, captured at start-up. */
    private $conf_md5;
    /** MD5 of the running queue script, captured at start-up. */
    private $deal_md5;
    /** MD5 of this base class file, captured at start-up. */
    private $base_md5;
    /** Path of the running queue script. */
    private $file_deal;
    /** Path of the running queue script followed by the queue flag. */
    private $path_deal;
    /** Queue category flag, taken from the command line (empty when not given). */
    private $deal_flag = '';
    /** Path of the "close" switch file; its presence stops the loop. */
    private $close_path;
    /** Restart marker read from Redis at start-up; a later change makes the process exit. */
    private $restart_server;

    /**
     * Parse the command line, load configuration and run the processing loop.
     *
     * Command line forms, as read from $_SERVER['argv']:
     *   script.php <deal_flag> <proc_code>   (exactly three argv entries)
     *   script.php <proc_code>               (any other count)
     *
     * The loop ends when $is_while is false or the close file exists. The
     * process exits when a watched file or the restart marker changes, or
     * when this worker is above the configured process limit.
     */
    public function __construct()
    {
        $this->path_conf  = dirname(dirname(__FILE__))."/config/daemonconf.php";
        $this->path_proc  = dirname(dirname(dirname(__FILE__))).'/queue/cache/proc/deal_%s.txt';
        $this->close_path = dirname(dirname(__FILE__)).'/cache/proc/close.txt';
        $this->file_deal  = $_SERVER['SCRIPT_FILENAME'];

        // Identify the queue flag and the process number. argv may be missing
        // or too short (e.g. started without arguments), so guard every read
        // instead of triggering undefined-index notices or a count() error.
        $argv = (isset($_SERVER['argv']) && is_array($_SERVER['argv'])) ? $_SERVER['argv'] : array();
        if (count($argv) == 3) {
            $this->deal_flag = $argv[1];
            $this->proc_code = $argv[2];
        } else {
            $this->proc_code = isset($argv[1]) ? $argv[1] : null;
        }
        $this->path_deal = trim($this->file_deal.' '.$this->deal_flag);

        include_once($this->path_conf);
        $this->server_ip = $this->getServerIp();
        $this->setPara();
        $this->collectVerifyMd5();

        // The process limit is reloaded on the first pass and again every
        // GET_DAEMON_MAXNUM_CYCLE_TIMES iterations.
        $i = 0;
        do {
            if ($i == GET_DAEMON_MAXNUM_CYCLE_TIMES) {
                $i = 0;
            }
            if ($i == 0) {
                $obj = new mDaemon();
                $this->processnum = $obj->getDaemonNum($this->server_ip, $this->deal_flag);
            }

            $this->setProcHealth();

            // Close switch present: pause briefly, then leave the constructor.
            if (file_exists($this->close_path)) {
                sleep(10);
                return;
            }

            $this->checkVerifyMd5();
            if (!$this->cProcessNum()) {
                exit("Process limit <{$this->processnum}>\n");
            }

            // Original note: calling the ps system command for the task on
            // every loop iteration caused high CPU usage.
            $this->deal();
            sleep($this->deal_sleep);
            $i++;
        } while ($this->is_while);

        return;
    }

    /**
     * Must be implemented by subclasses: set the properties the daemon needs
     * in order to run. Called once, before the processing loop starts.
     */
    abstract public function setPara();

    /**
     * Must be implemented by subclasses: process one unit of work. Called
     * once per loop iteration.
     */
    abstract public function deal();

    /**
     * Record the MD5 of the configuration file, the queue script and this
     * file, plus the current restart marker from Redis, as the baseline that
     * checkVerifyMd5() compares against.
     */
    private function collectVerifyMd5()
    {
        $this->conf_md5 = md5_file($this->path_conf);
        $this->deal_md5 = md5_file($this->file_deal);
        $this->base_md5 = md5_file(__FILE__);

        $obj   = new mBase();
        $rdobj = $obj->initRedis();
        $this->restart_server = $rdobj->get(sprintf(_RD_RESTART_SERVER, $this->server_ip));
    }

    /**
     * Exit the process if any of the three watched files has been modified
     * since start-up, or if the restart marker in Redis has changed.
     */
    private function checkVerifyMd5()
    {
        // Strict comparison: with != two different digests that both look
        // numeric (all digits, or "0e" followed by digits) could compare equal.
        if ($this->conf_md5 !== md5_file($this->path_conf)) {
            exit("daemon conf file modified.\n");
        }
        if ($this->deal_md5 !== md5_file($this->file_deal)) {
            exit("deal file modified.\n");
        }
        if ($this->base_md5 !== md5_file(__FILE__)) {
            exit("base class file modified.\n");
        }

        $obj   = new mBase();
        $rdobj = $obj->initRedis();
        $restart_server = $rdobj->get(sprintf(_RD_RESTART_SERVER, $this->server_ip));
        // Kept as a loose comparison: the values stored under this key are
        // not visible here. NOTE(review): confirm before tightening to !==.
        if ($this->restart_server != $restart_server) {
            exit("restart server.\n");
        }
    }

    /**
     * Record the process health state: write the current Unix timestamp to
     * the Redis key built from _DAEMON_HEALTH, the server IP, the queue flag
     * and the process number.
     *
     * @return boolean always true; the result of the Redis write is not checked
     */
    protected function setProcHealth()
    {
        $obj   = new mBase();
        $rdobj = $obj->initRedis();
        $rdobj->set(sprintf(_DAEMON_HEALTH, $this->server_ip, $this->deal_flag, $this->proc_code), time());
        return true;
    }

    /**
     * Enforce the process-count limit.
     *
     * A worker whose number is above the limit (proc_code + 1 > processnum)
     * may only continue while its _RD_UP_DOWN key in Redis holds a truthy
     * value, i.e. while it still owns an unfinished task (see getTask()).
     *
     * @return boolean true if the process may keep running, false otherwise
     */
    private function cProcessNum()
    {
        if ($this->proc_code + 1 > $this->processnum) {
            $obj   = new mBase();
            $rdobj = $obj->initRedis();
            $is_exist = $rdobj->get(sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code));
            if ($is_exist) {
                return true;
            }
            // Debug trace (disabled):
            //error_log('['.date('Y-m-d H:i:s').']'.$this->proc_code."|procnum:{$this->processnum}\n", 3, dirname(__FILE__).'/deal_track.log');
            return false;
        }
        return true;
    }

    /**
     * Append a line to a tracking log.
     *
     * The line is written as "H:i:s|<log>\r\n". When $log_path is empty the
     * path is built from LOG_TRACK_SAVE_PATH with today's date and $type.
     * A missing log directory is created (mode 0775, recursive). The
     * directory, when created, and the log file, on every call, have their
     * owner and group set to 'nobody'.
     *
     * @param int    $type     log type, used only to build the default path
     * @param string $log      message to append
     * @param string $log_path explicit log file path; empty to use the default
     * @return boolean always true; failures of the file operations are not checked
     */
    public function trackLog($type, $log, $log_path = '')
    {
        if (empty($log_path)) {
            $log_path = sprintf(LOG_TRACK_SAVE_PATH, date('Y-m-d'), $type);
        }

        $log_dir = dirname($log_path);
        if (!is_dir($log_dir)) {
            mkdir($log_dir, 0775, true);
            chown($log_dir, 'nobody');
            chgrp($log_dir, 'nobody');
        }

        error_log(date('H:i:s').'|'.$log."\r\n", 3, $log_path);
        chown($log_path, 'nobody');
        chgrp($log_path, 'nobody');
        return true;
    }

    /**
     * Fetch the next upload/download task for this process.
     *
     * If this process still holds a task (its _RD_UP_DOWN key has a truthy
     * value), that task is returned again. Otherwise a new one is popped
     * from the right end of the $task_key list and stored under the process
     * key. In both cases the "active process" key for the task is set to
     * this process key with a 24-hour expiry.
     *
     * @param string $task_key  Redis list holding pending task ids
     * @param int    $auto_type AUTO_CHECK_DOWN selects the download key
     *                          pattern, anything else the upload one
     *                          (original note: 0 upload, 1 download)
     * @return mixed the task (sale) id, or false when the list is empty
     */
    public function getTask($task_key, $auto_type = 0)
    {
        $obj  = new mBase();
        $robj = $obj->initRedis();

        // Does this process still hold a task left unfinished last time?
        $proc_key = sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code);
        $sale_id  = $robj->get($proc_key);
        // NOTE(review): loose check, so a stored id of "0" or "" is treated
        // as "no pending task" -- confirm such ids cannot occur.
        if ($sale_id == false) {
            // Nothing pending: take the next task from the queue.
            $sale_id = $robj->rpop($task_key);
            if ($sale_id === false) {
                return false;
            }
            $robj->set($proc_key, $sale_id);
        }

        // Key: sale id, value: process key. Used on re-upload / re-download
        // to delete the process that originally held the sale id, so that it
        // does not keep occupying it.
        $active_key = sprintf(_RD_UPLOAD_ACTIVE_PROC, $sale_id);
        if ($auto_type == AUTO_CHECK_DOWN) {
            $active_key = sprintf(_RD_DOWN_ACTIVE_PROC, $sale_id);
        }
        $this->setProcKey($active_key, $proc_key, 24*60*60);

        return $sale_id;
    }

    /**
     * Put a task back into a queue by pushing it onto the left end of the
     * list, optionally deleting the process key that was holding it.
     *
     * @param string $key      Redis list key
     * @param mixed  $val      value to push
     * @param string $proc_key process key to delete afterwards; empty to skip
     * @return boolean always true
     */
    public function lpushToList($key, $val, $proc_key = '')
    {
        $obj  = new mBase();
        $robj = $obj->initRedis();
        $robj->lpush($key, $val);
        if ($proc_key) {
            $this->delProcKey($proc_key);
        }
        return true;
    }

    /**
     * Delete a process key from Redis.
     *
     * @param string $proc_key key to delete
     * @return boolean always true
     */
    public function delProcKey($proc_key)
    {
        $obj  = new mBase();
        $robj = $obj->initRedis();
        $robj->del($proc_key);
        return true;
    }

    /**
     * Set a key in Redis, optionally with an expiry.
     *
     * The value is written first and the expiry applied in a second call.
     *
     * @param string $key         key to set
     * @param mixed  $val         value to store
     * @param int    $expire_time expiry passed to expire(); 0 or less means no expiry
     * @return boolean always true
     */
    public function setProcKey($key, $val, $expire_time = 0)
    {
        $obj  = new mBase();
        $robj = $obj->initRedis();
        $robj->set($key, $val);
        if ($expire_time > 0) {
            $robj->expire($key, $expire_time);
        }
        return true;
    }
}