From 3957e4c34f78beb3545be57fe6064b5961e48279 Mon Sep 17 00:00:00 2001 From: pengda <10266652509@qq.com> Date: Wed, 28 Aug 2024 12:28:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A1=86=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- queue/base/cronBase.php | 118 +++++++++++++++++++++ queue/base/dealBase.php | 242 ++++++++++++++++++++++++++++++++++++++++++++ queue/config/daemonconf.php | 14 +++ queue/cron_monitor.php | 43 ++++++++ queue/model/qTool.php | 68 +++++++++++++ 5 files changed, 485 insertions(+) create mode 100644 queue/base/cronBase.php create mode 100644 queue/base/dealBase.php create mode 100644 queue/config/daemonconf.php create mode 100644 queue/cron_monitor.php create mode 100644 queue/model/qTool.php diff --git a/queue/base/cronBase.php b/queue/base/cronBase.php new file mode 100644 index 0000000..eec9f95 --- /dev/null +++ b/queue/base/cronBase.php @@ -0,0 +1,118 @@ + + * @version 2.0 - 2011-11-25 + * @package Daemon + */ + +include_once(dirname(dirname(dirname(__FILE__)))."/library/publicBase.php"); + +date_default_timezone_set("Asia/Shanghai"); +abstract class cronBase { + protected $process; // 队列进程列表,必选项 + protected $procnumlist; // 队列进程数限制,必选项 + protected $procnum = 0; // 默认启动进程数 + protected $maxtimelist; // 队列进程最大执行时间限制列表 + protected $maxtime = 480; // 队列一次循环的默认最长时间, 未自定义则该项有效, 超过该值进程将被杀死 + protected $bin_php = '/usr/local/bin/php'; // php执行文件路径 + + private $path_daemon; // ROOT根目录 + private $proc_start; // 启动队列的进程信息 + private $path_prochealth; // 进程健康路径 + private $path_conf; // 配置文件路径 + + public function __construct() { + $this->path_conf = dirname(dirname(__FILE__))."/config/daemonconf.php"; + include_once($this->path_conf); + + $this->path_daemon = dirname(dirname(dirname(__FILE__))).'/queue/deal/'; + $this->path_prochealth = dirname(dirname(dirname(__FILE__))).'/queue/cache/proc/deal_%s.txt'; + + $this->setPara(); + $this->prepare(); + $this->checkProc(); + $this->startProc(); + + return true; + } + + /* 子类必须实现,设置队列运行所须属性 */ + public abstract function setPara(); + + private function prepare() { + $dir_prochealth = dirname($this->path_prochealth); + if(!is_dir($dir_prochealth)) system("mkdir -p $dir_prochealth"); + if(empty($this->process)) exit("No dealscript.\n"); + } + + private function checkProc() { + $obj = new mBase(); + $server_ip = $obj->getServerIp(); + + $rdobj = $obj->initRedis(); + // 检测 + foreach($this->process as $deal_flag => $proc) { + $active_proc_codes = array(); + // 搜集存活的进程号、数 + $path_proc = $this->path_daemon.$proc; + + $chk_shell = "ps -ef|grep '{$path_proc}'|grep -v grep|grep -v '\/bin\/sh'|awk -F ' ' '{print $2\" \"$10\" \"$11}'"; + + $maxtime = $this->maxtimelist[$deal_flag]+0>0 ? $this->maxtimelist[$deal_flag] : $this->maxtime; + + $fp = @popen($chk_shell, 'r'); + + while (!feof($fp)) { + $buffer = fgets($fp, 4096); + $procinfo = explode(' ', trim($buffer)); + + $procid = $procinfo[0]+0; + $proc_code = is_numeric($procinfo[2]) ? $procinfo[2] : $procinfo[1]+0; + + if($procid==0) continue; + + // 杀死僵死进程 + $lasttime = $rdobj->get(sprintf(_DAEMON_HEALTH, $server_ip, $deal_flag, $proc_code)); + + if($lasttime === false || time()-$lasttime > $maxtime) { + system("/bin/kill -9 $procid"); + error_log('['.date('Y-m-d H:i:s').']kill:'.$path_proc.'|'.$proc_code.'_'.md5($path_proc).'|'.time().'|'.$lasttime.'|'.$maxtime."\n", 3, dirname(dirname(__FILE__)).'/cache/proc/cron_track.log'); + + } else { + // 存活的进程 + $active_proc_codes[] = $proc_code; + } + } + + $procnum = $this->procnumlist[$deal_flag]+0>0 ? $this->procnumlist[$deal_flag] : $this->procnum; + for($i=0;$i<$procnum;$i++) { + if(in_array($i, $active_proc_codes)) continue; + $this->proc_start[$deal_flag]['proc'] = $path_proc; + $this->proc_start[$deal_flag]['startnum'][] = $i; + } + if($fp) @pclose($fp); + } + } + + /* 检查进程运行情况检测, 杀僵死进程, 启动进程 */ + private function startProc() { + // 启动队列 + if(empty($this->proc_start)) return ; + foreach($this->proc_start as $deal_flag => $proc) { + if(empty($proc['startnum'])) continue; + + foreach ($proc['startnum'] as $proc_code) { + $cmd = "{$this->bin_php} {$proc['proc']} {$deal_flag} {$proc_code} &"; + //echo $cmd."\n"; + //error_log('['.date('Y-m-d H:i:s').']startproc:'.$cmd."\n", 3, dirname(__FILE__).'/cron_track.log'); + $fp = @popen($cmd, "r"); + if($fp) @pclose($fp); + } + } + } + +} + diff --git a/queue/base/dealBase.php b/queue/base/dealBase.php new file mode 100644 index 0000000..0849090 --- /dev/null +++ b/queue/base/dealBase.php @@ -0,0 +1,242 @@ +path_conf = dirname(dirname(__FILE__))."/config/daemonconf.php"; + $this->path_proc = dirname(dirname(dirname(__FILE__))).'/queue/cache/proc/deal_%s.txt'; + $this->close_path = dirname(dirname(__FILE__)).'/cache/proc/close.txt'; + + $this->file_deal = $_SERVER['SCRIPT_FILENAME']; + + // 识别队列自定义进程数号码标识 + if(count($_SERVER['argv'])==3) { + $this->deal_flag = $_SERVER['argv'][1]; + $this->proc_code = $_SERVER['argv'][2]; + } else { + $this->proc_code = $_SERVER['argv'][1]; + } + + $this->path_deal = trim($this->file_deal.' '.$this->deal_flag); + + include_once($this->path_conf); + + $this->server_ip = $this->getServerIp(); + + $this->setPara(); + + $this->collectVerifyMd5(); + + // 循环达到一定次数,重置最大进程数 + $i=0; + do { + if($i == GET_DAEMON_MAXNUM_CYCLE_TIMES) $i=0; + + if($i == 0) { + $obj = new mDaemon(); + $this->processnum = $obj->getDaemonNum($this->server_ip, $this->deal_flag); + } + + $this->setProcHealth(); + if(file_exists($this->close_path)) { + sleep(10); + return ; + } + + $this->checkVerifyMd5(); + if(!$this->cProcessNum()) exit("Process limit <{$this->processnum}>\n"); // 每次循环执行任务调用ps系统命令导致cpu使用过高 + $this->deal(); + + sleep($this->deal_sleep); + + $i++; + + } while($this->is_while); + return; + } + + /* 抽象方法,子类必须实现,负责设置daemon运行所必须的属性 */ + abstract function setPara(); + + /* 抽象方法,子类必须实现,负责每条日志处理的过程 */ + abstract function deal(); + + /* 收集文件MD5 */ + private function collectVerifyMd5() { + $this->conf_md5 = md5_file($this->path_conf); + $this->deal_md5 = md5_file($this->file_deal); + $this->base_md5 = md5_file(__FILE__); + + $obj = new mBase(); + $rdobj = $obj->initRedis(); + $this->restart_server = $rdobj->get(sprintf(_RD_RESTART_SERVER, $this->server_ip)); + } + + /* 校验三个文件是否修改,如修改,退出 */ + private function checkVerifyMd5() { + if($this->conf_md5 != md5_file($this->path_conf)) exit("daemon conf file modified.\n"); + if($this->deal_md5 != md5_file($this->file_deal)) exit("deal file modified.\n"); + if($this->base_md5 != md5_file(__FILE__)) exit("base class file modified.\n"); + + $obj = new mBase(); + $rdobj = $obj->initRedis(); + $restart_server = $rdobj->get(sprintf(_RD_RESTART_SERVER, $this->server_ip)); + if($this->restart_server != $restart_server) exit("restart server.\n"); + } + + /** + * 设置进程健康状态 + * + * @return boolean + */ + protected function setProcHealth() { + $obj = new mBase(); + $rdobj = $obj->initRedis(); + $res = $rdobj->set(sprintf(_DAEMON_HEALTH, $this->server_ip, $this->deal_flag, $this->proc_code), time()); + return true; + } + + /** + * 进程上限限制 + * + * @return boolean + */ + private function cProcessNum() { + if($this->proc_code+1>$this->processnum) { + $obj = new mBase(); + $rdobj = $obj->initRedis(); + $is_exist = $rdobj->get(sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code)); + if($is_exist) return true; + + //error_log('['.date('Y-m-d H:i:s').']'.$this->proc_code."|procnum:{$this->processnum}\n", 3, dirname(__FILE__).'/deal_track.log'); + return false; + } + return true; + } + + /** + * 跟踪日志 + * @param int $type + * @param string $log + * @return boolean + */ + public function trackLog($type, $log, $log_path='') { + if(empty($log_path)) $log_path = sprintf(LOG_TRACK_SAVE_PATH, date('Y-m-d'), $type); + $log_dir = dirname($log_path); + $isfirst = false; + if(!is_dir($log_dir)) { + mkdir($log_dir, 0775, true); + chown($log_dir, 'nobody'); + chgrp($log_dir, 'nobody'); + $isfirst = true; + } + + error_log(date('H:i:s').'|'.$log."\r\n", 3, $log_path); + chown($log_path, 'nobody'); + chgrp($log_path, 'nobody'); + + return true; + } + + /** + * 获取上传/下载任务 + * @param unknown $task_key + * @param unknown $auto_type // 0上传 1下载 + */ + public function getTask($task_key, $auto_type=0) { + $obj = new mBase(); + $robj = $obj->initRedis(); + + // 该进程是否存在上次未处理完的任务 + $proc_key = sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code); + $sale_id = $robj->get($proc_key); + if($sale_id==false) { + // 不存在未处理完的任务, 从队列中取 + $sale_id = $robj->rpop($task_key); + if($sale_id === false) return false; + + $res = $robj->set($proc_key, $sale_id); + } + + // key saleid + // value 进程key + // 用于 重新上传 重新下载时 把原占用进程删除, 防止原进程一直在占用这个saleid + $active_key = sprintf(_RD_UPLOAD_ACTIVE_PROC, $sale_id); + if($auto_type==AUTO_CHECK_DOWN) $active_key = sprintf(_RD_DOWN_ACTIVE_PROC, $sale_id); + $this->setProcKey($active_key, $proc_key, 24*60*60); + + return $sale_id; + } + + /** + * 重新放入任务队列 + * @param unknown $key + * @param unknown $val + * @param string $proc_key + * @return boolean + */ + public function lpushToList($key, $val, $proc_key='') { + $obj = new mBase(); + $robj = $obj->initRedis(); + + $robj->lpush($key, $val); + if($proc_key) $this->delProcKey($proc_key); + return true; + } + + /** + * 删除进程key + * @param unknown $proc_key + */ + public function delProcKey($proc_key) { + $obj = new mBase(); + $robj = $obj->initRedis(); + $robj->del($proc_key); + return true; + } + + /** + * 设置进程key + * @param unknown $key + * @param unknown $val + * @param number $expire_time + * @return boolean + */ + public function setProcKey($key, $val, $expire_time=0) { + $obj = new mBase(); + $robj = $obj->initRedis(); + + $robj->set($key, $val); + if($expire_time>0) $robj->expire($key, $expire_time); + + return true; + } + +} + diff --git a/queue/config/daemonconf.php b/queue/config/daemonconf.php new file mode 100644 index 0000000..eccee43 --- /dev/null +++ b/queue/config/daemonconf.php @@ -0,0 +1,14 @@ + + * @version 3.0 - 2011-11-25 + * @package crontab + */ +include_once dirname(__FILE__).'/base/cronBase.php'; +include_once dirname(__FILE__).'/base/dealBase.php'; + +class MonCron extends cronBase { + + public function setPara() { + + $obj = new mDaemon(); + $rdobj = $obj->initRedis(); + + $server_ip = $obj->getServerIp(); + + // 是否存在需要启动的代理 + $proxy_exec_list = $obj->getProxyExecListByIp($server_ip); + if(is_array($proxy_exec_list) && count($proxy_exec_list) >0) { + $res = $rdobj->lpush(sprintf(_RQ_START_PROXY, $server_ip), $server_ip); + } + + $timestamp = $rdobj->get(sprintf(_RD_RESTART_SERVER, $server_ip)); + if(empty($timestamp)) { + $rdobj->set(sprintf(_RD_RESTART_SERVER, $server_ip), time()); + } + + $daemon = $obj->getStartProcConf($server_ip); + + $this->process = $daemon['flag2proc']; + $this->procnumlist = $daemon['flag2maxnum']; + $this->maxtimelist = $daemon['flag2maxtime']; + + return ; + } +} + +new MonCron; diff --git a/queue/model/qTool.php b/queue/model/qTool.php new file mode 100644 index 0000000..91af9d5 --- /dev/null +++ b/queue/model/qTool.php @@ -0,0 +1,68 @@ + 143, + '_token' => '3073e44edb118b3f5d8014ac7d17d73a', + 'log' => $log + ); + + $url = KUAILELUNWEN_API_URL.'/track_log?type='.$type; + $serobj = new mService(); + $is_jd = $serobj->isJingDong(); + if($is_jd) $url = KUAILELUNWEN_API_URL_JD.'/track_log?type='.$type; + + for($i=0;$i<3;$i++) { + $inf = $this->postCUrl($url, $priv_data, 60); + $inf = json_decode(trim($inf,chr(239).chr(187).chr(191)),true); + if($inf['status']) return true; + } + return true; + } + + + + + +}