<?php
/**
 * 队列进程基类--生成子进程
 *
 * @version 2.0 - 2011-11-25
 * @package Daemon
 */

// Daemons built on this class loop indefinitely, so lift PHP's execution time limit.
set_time_limit(0);

// Load the file expected to define the parent class (publicBase) from <project root>/library.
include_once(dirname(dirname(dirname(__FILE__)))."/library/publicBase.php");
/**
 * Base class for queue worker daemons.
 *
 * A concrete worker extends this class and implements setPara() and deal().
 * Instantiating the worker starts the run loop directly from the constructor:
 * the constructor does not return while $is_while is true, unless the close
 * flag file appears, or the process exits because a watched file or the
 * restart marker changed, or the process-limit check fails.
 *
 * Relies on names not defined in this file: getServerIp() (presumably inherited
 * from publicBase), the classes mBase and mDaemon, and the constants
 * GET_DAEMON_MAXNUM_CYCLE_TIMES, _RD_RESTART_SERVER, _DAEMON_HEALTH, _RD_UP_DOWN,
 * _RD_UPLOAD_ACTIVE_PROC, _RD_DOWN_ACTIVE_PROC, AUTO_CHECK_DOWN and
 * LOG_TRACK_SAVE_PATH.
 */
abstract class dealBase extends publicBase {
    /** Process limit for this queue. Starts at 0 and is refreshed from mDaemon::getDaemonNum() inside the run loop. */
    protected $processnum = 0;
    /** Whether the run loop repeats; when false, deal() is executed once. */
    protected $is_while = true;
    /** Seconds to sleep after each deal() call. */
    protected $deal_sleep = 1;
    /** Process number identifier of this worker, taken from the last command-line argument. */
    protected $proc_code;
    /** Value returned by getServerIp(); used when building Redis keys. */
    protected $server_ip;

    /** Path of the daemon configuration file. */
    private $path_conf;
    /** Process state path pattern; assigned in the constructor but not read anywhere in this class. */
    private $path_proc;
    /** MD5 of the configuration file captured at startup. */
    private $conf_md5;
    /** MD5 of the running worker script captured at startup. */
    private $deal_md5;
    /** MD5 of this base class file captured at startup. */
    private $base_md5;
    /** Path of the running worker script ($_SERVER['SCRIPT_FILENAME']). */
    private $file_deal;
    /** Worker script path followed by the queue flag (when one was given). */
    private $path_deal;
    /** Queue category flag; only set when exactly two command-line arguments are passed. */
    private $deal_flag = '';
    /** Path of the switch file whose existence makes the loop stop. */
    private $close_path;

    /** Restart marker read from Redis at startup; a later change makes the process exit. */
    private $restart_server;

    /**
     * Initialise paths and identifiers, then run the worker loop.
     *
     * Each iteration: refresh the process limit every GET_DAEMON_MAXNUM_CYCLE_TIMES
     * iterations, write the health timestamp, stop if the close file exists, exit if
     * a watched file or the restart marker changed, exit if the process limit is
     * exceeded, call deal(), then sleep for $deal_sleep seconds.
     */
    public function __construct() {
        $this->path_conf = dirname(dirname(__FILE__))."/config/daemonconf.php";
        $this->path_proc = dirname(dirname(dirname(__FILE__))).'/queue/cache/proc/deal_%s.txt';
        $this->close_path = dirname(dirname(__FILE__)).'/cache/proc/close.txt';

        $this->file_deal = $_SERVER['SCRIPT_FILENAME'];

        $this->readCliArguments();

        $this->path_deal = trim($this->file_deal.' '.$this->deal_flag);

        include_once($this->path_conf);

        $this->server_ip = $this->getServerIp();

        $this->setPara();

        $this->collectVerifyMd5();

        // $cycle counts iterations since the process limit was last refreshed.
        $cycle = 0;
        do {
            if ($cycle == GET_DAEMON_MAXNUM_CYCLE_TIMES) {
                $cycle = 0;
            }

            if ($cycle == 0) {
                $daemon = new mDaemon();
                $this->processnum = $daemon->getDaemonNum($this->server_ip, $this->deal_flag);
            }

            $this->setProcHealth();
            if (file_exists($this->close_path)) {
                // Close switch is present: wait, then leave the loop without running deal().
                sleep(10);
                return;
            }

            $this->checkVerifyMd5();
            // Original author's note (translated): calling the `ps` system command on every
            // loop iteration caused high CPU usage — presumably why the limit is checked
            // through cProcessNum() instead; confirm against history.
            if (!$this->cProcessNum()) {
                exit("Process limit <{$this->processnum}>\n");
            }
            $this->deal();

            sleep($this->deal_sleep);

            $cycle++;

        } while ($this->is_while);
        return;
    }

    /**
     * Read the queue flag and process number from the command line.
     *
     * With exactly two arguments (argv has three entries) the first is the queue
     * flag and the second the process number; otherwise the first argument is the
     * process number. Missing values are tolerated: $proc_code becomes null, as it
     * did before, but without raising an undefined-index notice or calling count()
     * on a non-array.
     */
    private function readCliArguments() {
        $argv = array();
        if (isset($_SERVER['argv']) && is_array($_SERVER['argv'])) {
            $argv = $_SERVER['argv'];
        }

        if (count($argv) === 3) {
            $this->deal_flag = $argv[1];
            $this->proc_code = $argv[2];
            return;
        }

        $this->proc_code = isset($argv[1]) ? $argv[1] : null;
    }

    /**
     * Obtain a Redis handle through mBase::initRedis().
     *
     * A new mBase is created on every call, exactly as each method did inline
     * before; nothing is cached here.
     */
    private function openRedis() {
        $base = new mBase();
        return $base->initRedis();
    }

    /** Read this server's restart marker from Redis (key built from _RD_RESTART_SERVER). */
    private function fetchRestartMarker() {
        return $this->openRedis()->get(sprintf(_RD_RESTART_SERVER, $this->server_ip));
    }

    /** Must be implemented by subclasses; called once from the constructor before the loop starts. */
    abstract public function setPara();

    /** Must be implemented by subclasses; called once per loop iteration. */
    abstract public function deal();

    /** Capture the MD5 of the config file, the worker script and this file, plus the restart marker. */
    private function collectVerifyMd5() {
        $this->conf_md5 = md5_file($this->path_conf);
        $this->deal_md5 = md5_file($this->file_deal);
        $this->base_md5 = md5_file(__FILE__);

        $this->restart_server = $this->fetchRestartMarker();
    }

    /** Exit the process if any of the three watched files or the restart marker differs from the startup snapshot. */
    private function checkVerifyMd5() {
        if ($this->conf_md5 != md5_file($this->path_conf)) exit("daemon conf file modified.\n");
        if ($this->deal_md5 != md5_file($this->file_deal)) exit("deal file modified.\n");
        if ($this->base_md5 != md5_file(__FILE__)) exit("base class file modified.\n");

        if ($this->restart_server != $this->fetchRestartMarker()) exit("restart server.\n");
    }

    /**
     * Record the current time under this process's health key in Redis.
     *
     * The key is built from _DAEMON_HEALTH with the server IP, queue flag and
     * process number. The result of the Redis call is not checked.
     *
     * @return boolean always true
     */
    protected function setProcHealth() {
        $health_key = sprintf(_DAEMON_HEALTH, $this->server_ip, $this->deal_flag, $this->proc_code);
        $this->openRedis()->set($health_key, time());
        return true;
    }

    /**
     * Check whether this process may keep running under the current process limit.
     *
     * A process whose number plus one exceeds $processnum is only allowed to
     * continue while its _RD_UP_DOWN key in Redis holds a truthy value (the same
     * key getTask() uses to remember the task claimed by this process).
     *
     * @return boolean true to continue, false when the process is over the limit
     */
    private function cProcessNum() {
        if ($this->proc_code + 1 <= $this->processnum) {
            return true;
        }

        $slot_key = sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code);
        if ($this->openRedis()->get($slot_key)) {
            return true;
        }

        return false;
    }

    /**
     * Append a line to a tracking log.
     *
     * When no path is given, it is built from LOG_TRACK_SAVE_PATH with today's date
     * and $type. The directory is created if missing, and ownership of the directory
     * (when created) and of the log file (on every call) is set to user/group "nobody".
     *
     * @param int $type         log type, substituted into the default path
     * @param string $log       message; written as "H:i:s|message" followed by CRLF
     * @param string $log_path  optional explicit log file path
     * @return boolean always true; failures of the underlying calls are not reported
     */
    public function trackLog($type, $log, $log_path='') {
        if (empty($log_path)) {
            $log_path = sprintf(LOG_TRACK_SAVE_PATH, date('Y-m-d'), $type);
        }

        $log_dir = dirname($log_path);
        if (!is_dir($log_dir)) {
            mkdir($log_dir, 0775, true);
            chown($log_dir, 'nobody');
            chgrp($log_dir, 'nobody');
        }

        // Message type 3 appends the text to the file given as destination.
        error_log(date('H:i:s').'|'.$log."\r\n", 3, $log_path);
        chown($log_path, 'nobody');
        chgrp($log_path, 'nobody');

        return true;
    }

    /**
     * Fetch the task this process should work on.
     *
     * If this process's _RD_UP_DOWN key already holds a value, that value is reused;
     * otherwise a task is popped from the tail of the $task_key list and stored under
     * that key. The process key is then recorded under the task's "active process"
     * key with a 24-hour expiry. Per the original comments, that mapping lets a
     * re-upload / re-download remove the process that was holding the task.
     *
     * @param string $task_key   Redis list to pop tasks from
     * @param int $auto_type     compared with AUTO_CHECK_DOWN to choose the download key;
     *                           the original comment states 0 = upload, 1 = download
     * @return mixed the task value, or false when rpop() returned false
     */
    public function getTask($task_key, $auto_type=0) {
        $redis = $this->openRedis();

        $proc_key = sprintf(_RD_UP_DOWN, $this->server_ip, $this->deal_flag, $this->proc_code);
        $sale_id = $redis->get($proc_key);
        if ($sale_id == false) {
            // Nothing left over from a previous run: take the next task from the queue.
            $sale_id = $redis->rpop($task_key);
            if ($sale_id === false) return false;

            $redis->set($proc_key, $sale_id);
        }

        if ($auto_type == AUTO_CHECK_DOWN) {
            $active_key = sprintf(_RD_DOWN_ACTIVE_PROC, $sale_id);
        } else {
            $active_key = sprintf(_RD_UPLOAD_ACTIVE_PROC, $sale_id);
        }
        $this->setProcKey($active_key, $proc_key, 24*60*60);

        return $sale_id;
    }

    /**
     * Push a value onto the head of a Redis list and optionally delete a process key.
     *
     * @param string $key       Redis list key
     * @param mixed $val        value to push
     * @param string $proc_key  process key to delete afterwards; skipped when empty
     * @return boolean always true
     */
    public function lpushToList($key, $val, $proc_key='') {
        $this->openRedis()->lpush($key, $val);
        if ($proc_key) {
            $this->delProcKey($proc_key);
        }
        return true;
    }

    /**
     * Delete a process key from Redis.
     *
     * @param string $proc_key
     * @return boolean always true
     */
    public function delProcKey($proc_key) {
        $this->openRedis()->del($proc_key);
        return true;
    }

    /**
     * Store a value in Redis, optionally with an expiry.
     *
     * The value is written first and the expiry applied in a second call.
     *
     * @param string $key
     * @param mixed $val
     * @param int $expire_time  passed to expire() when greater than 0; 0 means no expiry is set
     * @return boolean always true
     */
    public function setProcKey($key, $val, $expire_time=0) {
        $redis = $this->openRedis();

        $redis->set($key, $val);
        if ($expire_time > 0) {
            $redis->expire($key, $expire_time);
        }

        return true;
    }

}