Browse Source

框架

pull/1/head
pengda 9 months ago
parent
commit
3957e4c34f
  1. 118
      queue/base/cronBase.php
  2. 242
      queue/base/dealBase.php
  3. 14
      queue/config/daemonconf.php
  4. 43
      queue/cron_monitor.php
  5. 68
      queue/model/qTool.php

118
queue/base/cronBase.php

@ -0,0 +1,118 @@
<?php
/**
* 启动并监控cron的队列脚本状态
*
* @copyright (c) 2010, weibo All rights reserved.
* @author 王勇 <wangyong1@staff.sina.com.cn>
* @version 2.0 - 2011-11-25
* @package Daemon
*/
include_once(dirname(dirname(dirname(__FILE__)))."/library/publicBase.php");
date_default_timezone_set("Asia/Shanghai");
abstract class cronBase {
protected $process; // 队列进程列表,必选项
protected $procnumlist; // 队列进程数限制,必选项
protected $procnum = 0; // 默认启动进程数
protected $maxtimelist; // 队列进程最大执行时间限制列表
protected $maxtime = 480; // 队列一次循环的默认最长时间, 未自定义则该项有效, 超过该值进程将被杀死
protected $bin_php = '/usr/local/bin/php'; // php执行文件路径
private $path_daemon; // ROOT根目录
private $proc_start; // 启动队列的进程信息
private $path_prochealth; // 进程健康路径
private $path_conf; // 配置文件路径
public function __construct() {
$this->path_conf = dirname(dirname(__FILE__))."/config/daemonconf.php";
include_once($this->path_conf);
$this->path_daemon = dirname(dirname(dirname(__FILE__))).'/queue/deal/';
$this->path_prochealth = dirname(dirname(dirname(__FILE__))).'/queue/cache/proc/deal_%s.txt';
$this->setPara();
$this->prepare();
$this->checkProc();
$this->startProc();
return true;
}
/* 子类必须实现,设置队列运行所须属性 */
public abstract function setPara();
private function prepare() {
$dir_prochealth = dirname($this->path_prochealth);
if(!is_dir($dir_prochealth)) system("mkdir -p $dir_prochealth");
if(empty($this->process)) exit("No dealscript.\n");
}
private function checkProc() {
$obj = new mBase();
$server_ip = $obj->getServerIp();
$rdobj = $obj->initRedis();
// 检测
foreach($this->process as $deal_flag => $proc) {
$active_proc_codes = array();
// 搜集存活的进程号、数
$path_proc = $this->path_daemon.$proc;
$chk_shell = "ps -ef|grep '{$path_proc}'|grep -v grep|grep -v '\/bin\/sh'|awk -F ' ' '{print $2\" \"$10\" \"$11}'";
$maxtime = $this->maxtimelist[$deal_flag]+0>0 ? $this->maxtimelist[$deal_flag] : $this->maxtime;
$fp = @popen($chk_shell, 'r');
while (!feof($fp)) {
$buffer = fgets($fp, 4096);
$procinfo = explode(' ', trim($buffer));
$procid = $procinfo[0]+0;
$proc_code = is_numeric($procinfo[2]) ? $procinfo[2] : $procinfo[1]+0;
if($procid==0) continue;
// 杀死僵死进程
$lasttime = $rdobj->get(sprintf(_DAEMON_HEALTH, $server_ip, $deal_flag, $proc_code));
if($lasttime === false || time()-$lasttime > $maxtime) {
system("/bin/kill -9 $procid");
error_log('['.date('Y-m-d H:i:s').']kill:'.$path_proc.'|'.$proc_code.'_'.md5($path_proc).'|'.time().'|'.$lasttime.'|'.$maxtime."\n", 3, dirname(dirname(__FILE__)).'/cache/proc/cron_track.log');
} else {
// 存活的进程
$active_proc_codes[] = $proc_code;
}
}
$procnum = $this->procnumlist[$deal_flag]+0>0 ? $this->procnumlist[$deal_flag] : $this->procnum;
for($i=0;$i<$procnum;$i++) {
if(in_array($i, $active_proc_codes)) continue;
$this->proc_start[$deal_flag]['proc'] = $path_proc;
$this->proc_start[$deal_flag]['startnum'][] = $i;
}
if($fp) @pclose($fp);
}
}
/* 检查进程运行情况检测, 杀僵死进程, 启动进程 */
private function startProc() {
// 启动队列
if(empty($this->proc_start)) return ;
foreach($this->proc_start as $deal_flag => $proc) {
if(empty($proc['startnum'])) continue;
foreach ($proc['startnum'] as $proc_code) {
$cmd = "{$this->bin_php} {$proc['proc']} {$deal_flag} {$proc_code} &";
//echo $cmd."\n";
//error_log('['.date('Y-m-d H:i:s').']startproc:'.$cmd."\n", 3, dirname(__FILE__).'/cron_track.log');
$fp = @popen($cmd, "r");
if($fp) @pclose($fp);
}
}
}
}

242
queue/base/dealBase.php

@ -0,0 +1,242 @@
<?php
/**
* 队列进程基类--生成子进程
*
* @version 2.0 - 2011-11-25
* @package Daemon
*/
set_time_limit(0);
include_once(dirname(dirname(dirname(__FILE__)))."/library/publicBase.php");
abstract class dealBase extends publicBase {
protected $processnum = 0; // 程序限制进程数,默认为1
protected $is_while = true; // 是否无限循环,默认为是
protected $deal_sleep = 1; // 每执行一次后等待的时间,单位为妙,默认为1
protected $proc_code; // 当前队列自定义进程数号码标识
protected $server_ip;
private $path_conf; // 配置文件路径
private $path_proc; // 进程状态路径
private $conf_md5; // 配置文件MD5
private $deal_md5; // 执行队列文件MD5
private $base_md5; // 基础类文件MD5
private $file_deal; // 队列文件路径
private $path_deal; // 队列文件路径(包含标识)
private $deal_flag=''; // 队列类别标识
private $close_path; // 开关标识文件
private $restart_server; // 重启服务
public function __construct() {
$this->path_conf = dirname(dirname(__FILE__))."/config/daemonconf.php";
$this->path_proc = dirname(dirname(dirname(__FILE__))).'/queue/cache/proc/deal_%s.txt';
$this->close_path = dirname(dirname(__FILE__)).'/cache/proc/close.txt';
$this->file_deal = $_SERVER['SCRIPT_FILENAME'];
// 识别队列自定义进程数号码标识
if(count($_SERVER['argv'])==3) {
$this->deal_flag = $_SERVER['argv'][1];
$this->proc_code = $_SERVER['argv'][2];
} else {
$this->proc_code = $_SERVER['argv'][1];
}
$this->path_deal = trim($this->file_deal.' '.$this->deal_flag);
include_once($this->path_conf);
$this->server_ip = $this->getServerIp();
$this->setPara();
$this->collectVerifyMd5();
// 循环达到一定次数,重置最大进程数
$i=0;
do {
if($i == GET_DAEMON_MAXNUM_CYCLE_TIMES) $i=0;
if($i == 0) {
$obj = new mDaemon();
$this->processnum = $obj->getDaemonNum($this->server_ip, $this->deal_flag);
}
$this->setProcHealth();
if(file_exists($this->close_path)) {
sleep(10);
return ;
}
$this->checkVerifyMd5();
if(!$this->cProcessNum()) exit("Process limit <{$this->processnum}>\n"); // 每次循环执行任务调用ps系统命令导致cpu使用过高
$this->deal();
sleep($this->deal_sleep);
$i++;
} while($this->is_while);
return;
}
/* 抽象方法,子类必须实现,负责设置daemon运行所必须的属性 */
abstract function setPara();
/* 抽象方法,子类必须实现,负责每条日志处理的过程 */
abstract function deal();
/* 收集文件MD5 */
private function collectVerifyMd5() {
$this->conf_md5 = md5_file($this->path_conf);
$this->deal_md5 = md5_file($this->file_deal);
$this->base_md5 = md5_file(__FILE__);
$obj = new mBase();
$rdobj = $obj->initRedis();
$this->restart_server = $rdobj->get(sprintf(_RD_RESTART_SERVER, $this->server_ip));
}
/* 校验三个文件是否修改,如修改,退出 */
private function checkVerifyMd5() {
if($this->conf_md5 != md5_file($this->path_conf)) exit("daemon conf file modified.\n");
if($this->deal_md5 != md5_file($this->file_deal)) exit("deal file modified.\n");
if($this->base_md5 != md5_file(__FILE__)) exit("base class file modified.\n");
$obj = new mBase();
$rdobj = $obj->initRedis();
$restart_server = $rdobj->get(sprintf(_RD_RESTART_SERVER, $this->server_ip));
if($this->restart_server != $restart_server) exit("restart server.\n");
}
/**
* 设置进程健康状态
*
* @return boolean
*/
protected function setProcHealth() {
$obj = new mBase();
$rdobj = $obj->initRedis();
$res = $rdobj->set(sprintf(_DAEMON_HEALTH, $this->server_ip, $this->deal_flag, $this->proc_code), time());
return true;
}
/**
* 进程上限限制
*
* @return boolean
*/
private function cProcessNum() {
if($this->proc_code+1>$this->processnum) {
$obj = new mBase();
$rdobj = $obj->initRedis();
$is_exist = $rdobj->get(sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code));
if($is_exist) return true;
//error_log('['.date('Y-m-d H:i:s').']'.$this->proc_code."|procnum:{$this->processnum}\n", 3, dirname(__FILE__).'/deal_track.log');
return false;
}
return true;
}
/**
* 跟踪日志
* @param int $type
* @param string $log
* @return boolean
*/
public function trackLog($type, $log, $log_path='') {
if(empty($log_path)) $log_path = sprintf(LOG_TRACK_SAVE_PATH, date('Y-m-d'), $type);
$log_dir = dirname($log_path);
$isfirst = false;
if(!is_dir($log_dir)) {
mkdir($log_dir, 0775, true);
chown($log_dir, 'nobody');
chgrp($log_dir, 'nobody');
$isfirst = true;
}
error_log(date('H:i:s').'|'.$log."\r\n", 3, $log_path);
chown($log_path, 'nobody');
chgrp($log_path, 'nobody');
return true;
}
/**
* 获取上传/下载任务
* @param unknown $task_key
* @param unknown $auto_type // 0上传 1下载
*/
public function getTask($task_key, $auto_type=0) {
$obj = new mBase();
$robj = $obj->initRedis();
// 该进程是否存在上次未处理完的任务
$proc_key = sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code);
$sale_id = $robj->get($proc_key);
if($sale_id==false) {
// 不存在未处理完的任务, 从队列中取
$sale_id = $robj->rpop($task_key);
if($sale_id === false) return false;
$res = $robj->set($proc_key, $sale_id);
}
// key saleid
// value 进程key
// 用于 重新上传 重新下载时 把原占用进程删除, 防止原进程一直在占用这个saleid
$active_key = sprintf(_RD_UPLOAD_ACTIVE_PROC, $sale_id);
if($auto_type==AUTO_CHECK_DOWN) $active_key = sprintf(_RD_DOWN_ACTIVE_PROC, $sale_id);
$this->setProcKey($active_key, $proc_key, 24*60*60);
return $sale_id;
}
/**
* 重新放入任务队列
* @param unknown $key
* @param unknown $val
* @param string $proc_key
* @return boolean
*/
public function lpushToList($key, $val, $proc_key='') {
$obj = new mBase();
$robj = $obj->initRedis();
$robj->lpush($key, $val);
if($proc_key) $this->delProcKey($proc_key);
return true;
}
/**
* 删除进程key
* @param unknown $proc_key
*/
public function delProcKey($proc_key) {
$obj = new mBase();
$robj = $obj->initRedis();
$robj->del($proc_key);
return true;
}
/**
* 设置进程key
* @param unknown $key
* @param unknown $val
* @param number $expire_time
* @return boolean
*/
public function setProcKey($key, $val, $expire_time=0) {
$obj = new mBase();
$robj = $obj->initRedis();
$robj->set($key, $val);
if($expire_time>0) $robj->expire($key, $expire_time);
return true;
}
}

14
queue/config/daemonconf.php

@ -0,0 +1,14 @@
<?php
$GLOBALS['DAEMON_MAXTIME'] = array(
);
$GLOBALS['DAEMON_LIST'] = array(
);
$GLOBALS['DAEMON_LIST_DESC'] = array(
);
$GLOBALS['DAEMON_NUMLIMIT'] = array(
);

43
queue/cron_monitor.php

@ -0,0 +1,43 @@
<?php
/**
* Cron监控程序
*
* @copyright(c)2011,weibo All rights reserved.
* @author 王勇 <wangyong1@staff.sina.com.cn>
* @version 3.0 - 2011-11-25
* @package crontab
*/
include_once dirname(__FILE__).'/base/cronBase.php';
include_once dirname(__FILE__).'/base/dealBase.php';
class MonCron extends cronBase {
public function setPara() {
$obj = new mDaemon();
$rdobj = $obj->initRedis();
$server_ip = $obj->getServerIp();
// 是否存在需要启动的代理
$proxy_exec_list = $obj->getProxyExecListByIp($server_ip);
if(is_array($proxy_exec_list) && count($proxy_exec_list) >0) {
$res = $rdobj->lpush(sprintf(_RQ_START_PROXY, $server_ip), $server_ip);
}
$timestamp = $rdobj->get(sprintf(_RD_RESTART_SERVER, $server_ip));
if(empty($timestamp)) {
$rdobj->set(sprintf(_RD_RESTART_SERVER, $server_ip), time());
}
$daemon = $obj->getStartProcConf($server_ip);
$this->process = $daemon['flag2proc'];
$this->procnumlist = $daemon['flag2maxnum'];
$this->maxtimelist = $daemon['flag2maxtime'];
return ;
}
}
new MonCron;

68
queue/model/qTool.php

@ -0,0 +1,68 @@
<?php
/*
*
*/
include_once(SERVER_ROOT."/model/mBase.php");
class qTool extends mBase {
/**
* 加入重试日志
* @param unknown $type
* @param unknown $id
*/
public function addRetryLog($type, $id) {
return error_log($id."\n", 3, PAPER_UPFAIL_PATH);
}
/**
* 跟踪日志
* @param int $type
* @param string $log
* @return boolean
*/
public function trackLog($type, $log, $log_path='') {
if(empty($log_path)) $log_path = sprintf(LOG_TRACK_SAVE_PATH, date('Y-m-d'), $type);
$log_dir = dirname($log_path);
$isfirst = false;
if(!is_dir($log_dir)) {
mkdir($log_dir, 0775, true);
chown($log_dir, 'nobody');
chgrp($log_dir, 'nobody');
$isfirst = true;
}
error_log(date('H:i:s').'|'.$log."\r\n", 3, $log_path);
chown($log_path, 'nobody');
chgrp($log_path, 'nobody');
return true;
}
public function trackApiLog($type, $log) {
$priv_data = array(
'_uid' => 143,
'_token' => '3073e44edb118b3f5d8014ac7d17d73a',
'log' => $log
);
$url = KUAILELUNWEN_API_URL.'/track_log?type='.$type;
$serobj = new mService();
$is_jd = $serobj->isJingDong();
if($is_jd) $url = KUAILELUNWEN_API_URL_JD.'/track_log?type='.$type;
for($i=0;$i<3;$i++) {
$inf = $this->postCUrl($url, $priv_data, 60);
$inf = json_decode(trim($inf,chr(239).chr(187).chr(191)),true);
if($inf['status']) return true;
}
return true;
}
}
Loading…
Cancel
Save