path_conf = dirname(dirname(__FILE__))."/config/daemonconf.php";
        $this->path_proc = dirname(dirname(dirname(__FILE__))).'/queue/cache/proc/deal_%s.txt';
        $this->close_path = dirname(dirname(__FILE__)).'/cache/proc/close.txt';
        $this->file_deal = $_SERVER['SCRIPT_FILENAME'];

        // Identify the queue's custom process-number identifier
        if(count($_SERVER['argv'])==3) {
            $this->deal_flag = $_SERVER['argv'][1];
            $this->proc_code = $_SERVER['argv'][2];
        } else {
            $this->proc_code = $_SERVER['argv'][1];
        }
        $this->path_deal = trim($this->file_deal.' '.$this->deal_flag);

        include_once($this->path_conf);

        $this->server_ip = $this->getServerIp();
        $this->setPara();
        $this->collectVerifyMd5();

        // Once the loop has run a certain number of times, reset the maximum process count
        $i=0;
        do {
            if($i == GET_DAEMON_MAXNUM_CYCLE_TIMES) $i=0;
            if($i == 0) {
                $obj = new mDaemon();
                $this->processnum = $obj->getDaemonNum($this->server_ip, $this->deal_flag);
            }
            $this->setProcHealth();

            if(file_exists($this->close_path)) {
                sleep(10);
                return ;
            }
            $this->checkVerifyMd5();
            if(!$this->cProcessNum()) exit("Process limit <{$this->processnum}>\n");
            // Calling the ps system command on every task loop iteration caused excessive CPU usage
            $this->deal();
            sleep($this->deal_sleep);
            $i++;
        } while($this->is_while);

        return;
    }

    /* Abstract method; subclasses must implement it. Sets the properties the daemon needs in order to run. */
    abstract function setPara();

    /* Abstract method; subclasses must implement it. Handles the processing of each log entry. */
    abstract function deal();

    /**
     * Record the MD5 of the three watched files and the current restart marker.
     *
     * The files are the daemon config ($this->path_conf), the running script
     * ($this->file_deal) and this base class file. The restart marker is the
     * Redis value stored under _RD_RESTART_SERVER for this server IP.
     * checkVerifyMd5() later compares against these stored values.
     */
    private function collectVerifyMd5()
    {
        $this->conf_md5 = md5_file($this->path_conf);
        $this->deal_md5 = md5_file($this->file_deal);
        $this->base_md5 = md5_file(__FILE__);

        $this->restart_server = $this->readRestartFlag();
    }

    /**
     * Terminate the process if any watched file or the restart marker changed.
     *
     * Each check calls exit() with its own message. The file checks run first,
     * so Redis is only queried when all three files are unchanged.
     */
    private function checkVerifyMd5()
    {
        if($this->conf_md5 != md5_file($this->path_conf)) exit("daemon conf file modified.\n");
        if($this->deal_md5 != md5_file($this->file_deal)) exit("deal file modified.\n");
        if($this->base_md5 != md5_file(__FILE__)) exit("base class file modified.\n");

        $restart_server = $this->readRestartFlag();
        if($this->restart_server != $restart_server) exit("restart server.\n");
    }

    /**
     * Set the process health status.
     *
     * Stores the current Unix timestamp in Redis under the _DAEMON_HEALTH key
     * built from the server IP, deal flag and process code. The result of the
     * Redis write is not inspected.
     *
     * @return boolean always true
     */
    protected function setProcHealth()
    {
        $health_key = sprintf(_DAEMON_HEALTH, $this->server_ip, $this->deal_flag, $this->proc_code);

        $rdobj = $this->openDaemonRedis();
        $rdobj->set($health_key, time());

        return true;
    }

    /**
     * Process-count limit check.
     *
     * A process whose code is within the limit (proc_code + 1 <= processnum)
     * always passes. A process beyond the limit passes only while Redis still
     * holds a truthy value under its _RD_UP_DOWN key, which is the key getTask()
     * uses to remember the task this process has in progress.
     *
     * @return boolean true if the process may keep running, false otherwise
     */
    private function cProcessNum()
    {
        if($this->proc_code+1 <= $this->processnum) return true;

        $rdobj = $this->openDaemonRedis();
        $is_exist = $rdobj->get(sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code));
        if($is_exist) return true;

        return false;
    }

    /**
     * Append a line to a tracking log.
     *
     * The line is written as "H:i:s|<log>\r\n". When no path is given, the path
     * is built from LOG_TRACK_SAVE_PATH with today's date and $type. A missing
     * log directory is created recursively. Ownership of a newly created
     * directory, and of the log file after every write, is set to 'nobody'.
     *
     * @param int    $type     log type, used only to build the default path
     * @param string $log      message to append
     * @param string $log_path explicit log file path; empty means use the default
     * @return boolean false if the directory could not be created or the write
     *                 failed, true otherwise
     */
    public function trackLog($type, $log, $log_path='')
    {
        if(empty($log_path)) $log_path = sprintf(LOG_TRACK_SAVE_PATH, date('Y-m-d'), $type);

        $log_dir = dirname($log_path);
        if(!is_dir($log_dir)) {
            // mkdir may fail because another process created the directory first,
            // so re-check is_dir() before treating it as an error.
            if(!mkdir($log_dir, 0775, true) && !is_dir($log_dir)) return false;
            chown($log_dir, 'nobody');
            chgrp($log_dir, 'nobody');
        }

        // message_type 3 appends the message to the file named by $log_path
        $written = error_log(date('H:i:s').'|'.$log."\r\n", 3, $log_path);
        chown($log_path, 'nobody');
        chgrp($log_path, 'nobody');

        return $written;
    }

    /**
     * Fetch an upload/download task for this process.
     *
     * If Redis already holds a task under this process's _RD_UP_DOWN key, that
     * task is resumed. Otherwise one is popped from the right end of $task_key
     * and stored under the process key. In both cases the "active process" key
     * for the task is then pointed at this process with a 24-hour expiry.
     *
     * @param string $task_key  Redis list to pop new tasks from
     * @param int    $auto_type compared with AUTO_CHECK_DOWN to select the download
     *                          key; per the original note: 0 = upload, 1 = download
     * @return mixed the task's sale id, or false when the list yields no task
     */
    public function getTask($task_key, $auto_type=0)
    {
        $robj = $this->openDaemonRedis();

        // Does this process still have a task it did not finish last time?
        $proc_key = sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code);
        $sale_id = $robj->get($proc_key);
        // Loose comparison: any falsy stored value (false, null, '', '0', 0) counts as "no pending task"
        if($sale_id==false) {
            // No unfinished task, so take one from the queue
            $sale_id = $robj->rpop($task_key);
            if($sale_id === false) return false;

            $robj->set($proc_key, $sale_id);
        }

        // key:   sale id
        // value: process key
        // On re-upload / re-download this lets the previously occupying process be
        // removed, so the old process does not keep holding this sale id forever.
        if($auto_type==AUTO_CHECK_DOWN) {
            $active_key = sprintf(_RD_DOWN_ACTIVE_PROC, $sale_id);
        } else {
            $active_key = sprintf(_RD_UPLOAD_ACTIVE_PROC, $sale_id);
        }
        $this->setProcKey($active_key, $proc_key, 24*60*60);

        return $sale_id;
    }

    /**
     * Put a task back into a task queue.
     *
     * Pushes $val onto the left end of the Redis list $key and, when a process
     * key is given, deletes that key afterwards.
     *
     * @param string $key      Redis list key
     * @param mixed  $val      value to push
     * @param string $proc_key process key to delete; empty means delete nothing
     * @return boolean always true
     */
    public function lpushToList($key, $val, $proc_key='')
    {
        $robj = $this->openDaemonRedis();
        $robj->lpush($key, $val);

        if($proc_key) $this->delProcKey($proc_key);

        return true;
    }

    /**
     * Delete a process key from Redis.
     *
     * @param string $proc_key key to delete
     * @return boolean always true
     */
    public function delProcKey($proc_key)
    {
        $robj = $this->openDaemonRedis();
        $robj->del($proc_key);

        return true;
    }

    /**
     * Set a process key in Redis, optionally with an expiry.
     *
     * @param string $key         Redis key
     * @param mixed  $val         value to store
     * @param number $expire_time expiry in seconds; 0 or less means no expiry is set
     * @return boolean always true
     */
    public function setProcKey($key, $val, $expire_time=0)
    {
        $robj = $this->openDaemonRedis();
        $robj->set($key, $val);
        // NOTE(review): set and expire are two separate calls; if the client offers an
        // atomic set-with-expiry it may be preferable, but confirm against initRedis() first.
        if($expire_time>0) $robj->expire($key, $expire_time);

        return true;
    }

    /**
     * Obtain a Redis client through mBase::initRedis().
     *
     * A fresh mBase is created on every call, exactly as each method previously
     * did inline.
     *
     * @return mixed whatever mBase::initRedis() returns
     */
    private function openDaemonRedis()
    {
        $obj = new mBase();
        return $obj->initRedis();
    }

    /**
     * Read this server's restart marker (_RD_RESTART_SERVER) from Redis.
     *
     * @return mixed the stored value as returned by the Redis client's get()
     */
    private function readRestartFlag()
    {
        $rdobj = $this->openDaemonRedis();
        return $rdobj->get(sprintf(_RD_RESTART_SERVER, $this->server_ip));
    }
}