录医案
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

242 lines
7.7 KiB

<?php
/**
* 队列进程基类--生成子进程
*
* @version 2.0 - 2011-11-25
* @package Daemon
*/
set_time_limit(0);
include_once(dirname(dirname(dirname(__FILE__)))."/library/publicBase.php");
abstract class dealBase extends publicBase {
protected $processnum = 0; // 程序限制进程数,默认为1
protected $is_while = true; // 是否无限循环,默认为是
protected $deal_sleep = 1; // 每执行一次后等待的时间,单位为妙,默认为1
protected $proc_code; // 当前队列自定义进程数号码标识
protected $server_ip;
private $path_conf; // 配置文件路径
private $path_proc; // 进程状态路径
private $conf_md5; // 配置文件MD5
private $deal_md5; // 执行队列文件MD5
private $base_md5; // 基础类文件MD5
private $file_deal; // 队列文件路径
private $path_deal; // 队列文件路径(包含标识)
private $deal_flag=''; // 队列类别标识
private $close_path; // 开关标识文件
private $restart_server; // 重启服务
public function __construct() {
$this->path_conf = dirname(dirname(__FILE__))."/config/daemonconf.php";
$this->path_proc = dirname(dirname(dirname(__FILE__))).'/queue/cache/proc/deal_%s.txt';
$this->close_path = dirname(dirname(__FILE__)).'/cache/proc/close.txt';
$this->file_deal = $_SERVER['SCRIPT_FILENAME'];
// 识别队列自定义进程数号码标识
if(count($_SERVER['argv'])==3) {
$this->deal_flag = $_SERVER['argv'][1];
$this->proc_code = $_SERVER['argv'][2];
} else {
$this->proc_code = $_SERVER['argv'][1];
}
$this->path_deal = trim($this->file_deal.' '.$this->deal_flag);
include_once($this->path_conf);
$this->server_ip = $this->getServerIp();
$this->setPara();
$this->collectVerifyMd5();
// 循环达到一定次数,重置最大进程数
$i=0;
do {
if($i == GET_DAEMON_MAXNUM_CYCLE_TIMES) $i=0;
if($i == 0) {
$obj = new mDaemon();
$this->processnum = $obj->getDaemonNum($this->server_ip, $this->deal_flag);
}
$this->setProcHealth();
if(file_exists($this->close_path)) {
sleep(10);
return ;
}
$this->checkVerifyMd5();
if(!$this->cProcessNum()) exit("Process limit <{$this->processnum}>\n"); // 每次循环执行任务调用ps系统命令导致cpu使用过高
$this->deal();
sleep($this->deal_sleep);
$i++;
} while($this->is_while);
return;
}
/* 抽象方法,子类必须实现,负责设置daemon运行所必须的属性 */
abstract function setPara();
/* 抽象方法,子类必须实现,负责每条日志处理的过程 */
abstract function deal();
/* 收集文件MD5 */
private function collectVerifyMd5() {
$this->conf_md5 = md5_file($this->path_conf);
$this->deal_md5 = md5_file($this->file_deal);
$this->base_md5 = md5_file(__FILE__);
$obj = new mBase();
$rdobj = $obj->initRedis();
$this->restart_server = $rdobj->get(sprintf(_RD_RESTART_SERVER, $this->server_ip));
}
/* 校验三个文件是否修改,如修改,退出 */
private function checkVerifyMd5() {
if($this->conf_md5 != md5_file($this->path_conf)) exit("daemon conf file modified.\n");
if($this->deal_md5 != md5_file($this->file_deal)) exit("deal file modified.\n");
if($this->base_md5 != md5_file(__FILE__)) exit("base class file modified.\n");
$obj = new mBase();
$rdobj = $obj->initRedis();
$restart_server = $rdobj->get(sprintf(_RD_RESTART_SERVER, $this->server_ip));
if($this->restart_server != $restart_server) exit("restart server.\n");
}
/**
* 设置进程健康状态
*
* @return boolean
*/
protected function setProcHealth() {
$obj = new mBase();
$rdobj = $obj->initRedis();
$res = $rdobj->set(sprintf(_DAEMON_HEALTH, $this->server_ip, $this->deal_flag, $this->proc_code), time());
return true;
}
/**
* 进程上限限制
*
* @return boolean
*/
private function cProcessNum() {
if($this->proc_code+1>$this->processnum) {
$obj = new mBase();
$rdobj = $obj->initRedis();
$is_exist = $rdobj->get(sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code));
if($is_exist) return true;
//error_log('['.date('Y-m-d H:i:s').']'.$this->proc_code."|procnum:{$this->processnum}\n", 3, dirname(__FILE__).'/deal_track.log');
return false;
}
return true;
}
/**
* 跟踪日志
* @param int $type
* @param string $log
* @return boolean
*/
public function trackLog($type, $log, $log_path='') {
if(empty($log_path)) $log_path = sprintf(LOG_TRACK_SAVE_PATH, date('Y-m-d'), $type);
$log_dir = dirname($log_path);
$isfirst = false;
if(!is_dir($log_dir)) {
mkdir($log_dir, 0775, true);
chown($log_dir, 'nobody');
chgrp($log_dir, 'nobody');
$isfirst = true;
}
error_log(date('H:i:s').'|'.$log."\r\n", 3, $log_path);
chown($log_path, 'nobody');
chgrp($log_path, 'nobody');
return true;
}
/**
* 获取上传/下载任务
* @param unknown $task_key
* @param unknown $auto_type // 0上传 1下载
*/
public function getTask($task_key, $auto_type=0) {
$obj = new mBase();
$robj = $obj->initRedis();
// 该进程是否存在上次未处理完的任务
$proc_key = sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code);
$sale_id = $robj->get($proc_key);
if($sale_id==false) {
// 不存在未处理完的任务, 从队列中取
$sale_id = $robj->rpop($task_key);
if($sale_id === false) return false;
$res = $robj->set($proc_key, $sale_id);
}
// key saleid
// value 进程key
// 用于 重新上传 重新下载时 把原占用进程删除, 防止原进程一直在占用这个saleid
$active_key = sprintf(_RD_UPLOAD_ACTIVE_PROC, $sale_id);
if($auto_type==AUTO_CHECK_DOWN) $active_key = sprintf(_RD_DOWN_ACTIVE_PROC, $sale_id);
$this->setProcKey($active_key, $proc_key, 24*60*60);
return $sale_id;
}
/**
* 重新放入任务队列
* @param unknown $key
* @param unknown $val
* @param string $proc_key
* @return boolean
*/
public function lpushToList($key, $val, $proc_key='') {
$obj = new mBase();
$robj = $obj->initRedis();
$robj->lpush($key, $val);
if($proc_key) $this->delProcKey($proc_key);
return true;
}
/**
* 删除进程key
* @param unknown $proc_key
*/
public function delProcKey($proc_key) {
$obj = new mBase();
$robj = $obj->initRedis();
$robj->del($proc_key);
return true;
}
/**
* 设置进程key
* @param unknown $key
* @param unknown $val
* @param number $expire_time
* @return boolean
*/
public function setProcKey($key, $val, $expire_time=0) {
$obj = new mBase();
$robj = $obj->initRedis();
$robj->set($key, $val);
if($expire_time>0) $robj->expire($key, $expire_time);
return true;
}
}