Browse Source

同步推送数据

pull/14/head
13146336667 3 years ago
parent
commit
a7266324e4
  1. 13
      config/database.ini
  2. 6
      config/define.php
  3. 1
      control/index.php
  4. 4
      cron.conf
  5. 4
      data/dOrder.php
  6. 21
      library/pinduoduo/pdd_api.php
  7. 10
      model/mOrder.php
  8. 24
      model/mPdd.php
  9. 12
      model/mShop.php
  10. 12
      queue/config/daemonconf.php
  11. 279
      queue/crontab/sync_push_data.php

13
config/database.ini

@ -9,8 +9,15 @@ user_r = "kuaileorder_r"
passwd_r = "kuaileorder@R"
db = "kuaileorder"
[pushdata]
master = "100.65.10.33:32001"
user_r = "kuaileorder_r"
passwd_r = "kuaileorder@R"
db = "pdp_db"
[redis]
host = "saas-d90dad49(VIP: 100.65.10.41)"
port = "6379"
db = 0
;暂时未用到redis 用到时再配置
;host = "saas-d90dad49"
;port = "6379"
;db = 0

6
config/define.php

@ -104,6 +104,12 @@
32 => '补寄待商家发货',
);
// 订单是否风控
define('ORDER_RISK_NO', 0); // 正常订单
define('ORDER_RISK_YES', 1); // 风控订单
// rds_id
define('RDS_ID', '48B26FD166F024F7');
# 同步历史订单
define('_RQ_SYNC_HISTORICAL_ORDERS', 'rq_sync_historical_orders');

1
control/index.php

@ -125,6 +125,7 @@ class index extends publicBase {
if (empty($list)) break;
foreach ($list as $info) {
if ($info['risk_control_status'] == ORDER_RISK_YES) continue;
echo $info['order_sn']."\t\n";
}
}

4
cron.conf

@ -1 +1,3 @@
* * * * * /usr/bin/php /var/www/html/queue/cron_monitor.php
* * * * * /usr/bin/php /var/www/html/queue/cron_monitor.php
*/5 * * * * /usr/bin/php /var/www/html/queue/crontab/sync_push_data.php

4
data/dOrder.php

@ -25,6 +25,10 @@ class dOrder extends dBase {
'buyer_memo',
'risk_control_status'
),
'sync_data_modify_time' => array(
'id',
'pdp_modified'
),
);
protected $primary_keys = array(

21
library/pinduoduo/pdd_api.php

@ -238,10 +238,11 @@ class PDD {
* @param number $page_size 每页记录数,默认200,最大200
* @return boolean|mixed
*
* { ["ddy_pdp_users_get_response"]=> array(3) { ["request_id"]=> string(17) "16601863037551775" ["users"]=> array(0) { } ["total_results"]=> int(0) } }
* {"ddy_pdp_users_get_response":{"request_id":"16602046604027416","users":[],"total_results":0}}
*
* {"ddy_pdp_users_get_response":{"request_id":"16602031726529724","users":[{"owner_id":959603435,"rds_id":"48B26FD166F024F7","history_days":0,"status":1}],"total_results":1}}
*/
public function getDdyPdpUsers($owner_id='', $start_modified='', $end_modified='', $page_no=1, $page_size=200) {
$params = $this->getPublicParams();
$params['type'] = 'pdd.ddy.pdp.users.get';
if ($owner_id) $params['owner_id'] = $owner_id;
@ -250,22 +251,23 @@ class PDD {
if ($page_no) $params['page_no'] = $page_no;
if ($page_size) $params['page_size'] = $page_size;
$params['sign'] = $this->getSignature($params);
return $this->postRequest(json_encode($params));
return $this->getPostRequestResult($params);
}
/**
* 添加数据推送用户
* https://open.pinduoduo.com/application/document/api?id=pdd.ddy.pdp.user.add
*
* @param unknown $rds_id
* @param unknown $history_days
* @param unknown $rds_id rds实例编号
* @param unknown $history_days 推送历史数据天数,只能为30天内,包含30天。当此参数不填时,表示以页面中应用配置的历史天数为准;如果为0表示这个用户不推送历史数据;其它表示推送的历史天数
*
* {"ddy_pdp_user_add_response":{"is_success":true,"request_id":"16602048767776597"}}
*
*/
public function addDdyPdpUser($rds_id, $history_days=-1) {
$params['type'] = 'pdd.ddy.pdp.user.add';
$params['rds_id'] = $rds_id;
if ($history_days > -1) $params['history_days'] = $history_days;
$params['history_days'] = $history_days;
return $this->getPostRequestResult($params);
}
@ -288,8 +290,7 @@ class PDD {
$pub_params = $this->getPublicParams();
$nparams = array_merge($pub_params, $params);
$nparams['sign'] = $this->getSignature($params);
$nparams['sign'] = $this->getSignature($nparams);
return $this->postRequest(json_encode($nparams));
}

10
model/mOrder.php

@ -8,6 +8,7 @@ include_once(SERVER_ROOT."/model/mBase.php");
class mOrder extends mBase {
private $obj;
private $order;
private $sync_data_modify_time;
// 订单编号
public $order_sn = '';
@ -54,6 +55,7 @@ class mOrder extends mBase {
public function __construct() {
$this->obj = new dOrder();
$this->order = 'order_list';
$this->sync_data_modify_time = 'sync_data_modify_time';
}
public function addOrder($shop_id, $uid) {
@ -411,4 +413,12 @@ class mOrder extends mBase {
return $data;
}
public function getSyncDataModifyTime() {
return $this->obj->select($this->sync_data_modify_time);
}
public function updateSyncDataModifyTime($id, $pdp_modified) {
return $this->obj->update($this->sync_data_modify_time, array('pdp_modified'=>$pdp_modified), array('sql'=>'`id`=?', 'vals'=>array($id)));
}
}

24
model/mPdd.php

@ -261,6 +261,8 @@ class mPdd extends mBase {
$obj = new PDD(PDD_CLIENT_ID, PDD_CLIENT_SECRET);
$res = $obj->getDdyPdpUsers($owner_id, $start_modified, $end_modified, $page_no, $page_size);
$this->writeLog('pdd', 'getPddDdyPdpUsers.log', $res);
$res = json_decode($res, true);
if (isset($res['error_response'])) {
$this->setError($res['error_response']['error_msg'].' '.$res['error_response']['sub_msg']);
@ -271,22 +273,16 @@ class mPdd extends mBase {
}
public function addPddDdyPdpUser($rds_id, $history_days=-1) {
$obj = new PDD(PDD_CLIENT_ID, PDD_CLIENT_SECRET);
$res = $obj->addDdyPdpUser($rds_id, $history_days);
$res = json_decode($res, true);
if (isset($res['error_response'])) {
$this->setError($res['error_response']['error_msg'].' '.$res['error_response']['sub_msg']);
public function addPddDdyPdpUser($name) {
$access_token = $this->getPddToken($name);
if (!$access_token) {
$this->setError($this->getError());
return false;
}
$obj = new PDD(PDD_CLIENT_ID, PDD_CLIENT_SECRET, $access_token);
$res = $obj->addDdyPdpUser(RDS_ID);
return $res['ddy_pdp_user_add_response'];
}
public function deletePddDdyPdpUser($owner_id) {
$obj = new PDD(PDD_CLIENT_ID, PDD_CLIENT_SECRET);
$res = $obj->deleteDdyPdpUser($owner_id);
$this->writeLog('pdd', 'addPddDdyPdpUser.log', $res);
$res = json_decode($res, true);
if (isset($res['error_response'])) {
@ -294,7 +290,7 @@ class mPdd extends mBase {
return false;
}
return $res['ddy_pdp_users_delete_response'];
return $res['ddy_pdp_user_add_response'];
}
}

12
model/mShop.php

@ -16,6 +16,11 @@ class mShop extends mBase {
}
public function addShop($name, $access_token, $owner_id, $expire_time, $uid=0) {
// 设置推送
$pobj = new mPdd();
$r = $pobj->getPddDdyPdpUsers($owner_id);
if (empty($r['users'])) $pobj->addPddDdyPdpUser($name);
$data = array();
$data['access_token'] = $access_token;
$data['owner_id'] = $owner_id;
@ -43,9 +48,8 @@ class mShop extends mBase {
$res = $this->obj->insert($this->shop, $data);
$this->syncHistoricalOrders($name);
$info = $this->getShopByName($name);
return $info;
}
@ -83,6 +87,10 @@ class mShop extends mBase {
return $this->obj->select($this->shop, array('sql'=>'`id`=?', 'vals'=>array($id)));
}
public function getShopByOwnerid($owner_id) {
return $this->obj->select($this->shop, array('sql'=>'`owner_id`=?', 'vals'=>array($owner_id)));
}
public function updateShop($id, $data) {
return $this->obj->update($this->shop, $data, array('sql'=>'`id`=?', 'vals'=>array($id)));

12
queue/config/daemonconf.php

@ -5,13 +5,13 @@ define('DAEMON_SYNC_INCREMENT_ORDERS', 'daemon_sync_increment_orders');
define('DAEMON_SYNC_ORDERS_STATUS', 'daemon_sync_orders_status');
$GLOBALS['DAEMON_LIST'] = array(
DAEMON_SYNC_HISTORICAL_ORDERS => 'sync_historical_orders.php',
DAEMON_SYNC_INCREMENT_ORDERS => 'sync_increment_orders.php',
DAEMON_SYNC_ORDERS_STATUS => 'sync_orders_status.php',
// DAEMON_SYNC_HISTORICAL_ORDERS => 'sync_historical_orders.php',
// DAEMON_SYNC_INCREMENT_ORDERS => 'sync_increment_orders.php',
// DAEMON_SYNC_ORDERS_STATUS => 'sync_orders_status.php',
);
$GLOBALS['DAEMON_NUMLIMIT'] = array(
DAEMON_SYNC_HISTORICAL_ORDERS => 10,
DAEMON_SYNC_INCREMENT_ORDERS => 1,
DAEMON_SYNC_ORDERS_STATUS => 1,
// DAEMON_SYNC_HISTORICAL_ORDERS => 10,
// DAEMON_SYNC_INCREMENT_ORDERS => 1,
// DAEMON_SYNC_ORDERS_STATUS => 1,
);

279
queue/crontab/sync_push_data.php

@ -0,0 +1,279 @@
<?php
/**
* 同步推送数据
*/
include_once dirname(dirname(__FILE__)).'/base/dealBase.php';
class syncPushData {
private $table;
protected $dbflag = 'pushdata';
private static $db_w = null;
private static $db_r = null;
private static $link;
private $ismaster = false;
public function __construct() {
$obj = new mOrder();
$info = $obj->getSyncDataModifyTime();
if ($info['pdp_modified'] == '0000-00-00 00:00:00') {
$sql = "select min(pdp_modified) as pdp_modified from `pdp_tb_trade`";
$r = $this->execute($sql, false, true);
$start = $r['pdp_modified'];
} else {
$start = $info['pdp_modified'];
}
$end = date("Y-m-d H:i:s", strtotime($start)+300);
$list = $this->selectAll('pdp_tb_trade', array('sql'=>'`pdp_modified` >=? and `pdp_modified` <= ?', 'vals'=>array($start, $end)), 'pdp_modified desc');
foreach ($list as $info) {
if (empty($info['pdp_response'])) continue;
$order = json_decode($info['pdp_response'], true);
/*
array(17) {
["order_sn"]=>
string(22) "220811-669893293090781"
["trade_type"]=>
int(0)
["order_status"]=>
int(2)
["group_status"]=>
int(1)
["confirm_status"]=>
int(1)
["refund_status"]=>
int(1)
["after_sales_status"]=>
int(0)
["risk_control_status"]=>
int(0)
["created_time"]=>
string(19) "2022-08-11 10:58:31"
["pay_time"]=>
string(19) "2022-08-11 10:58:40"
["confirm_time"]=>
string(19) "2022-08-11 10:58:40"
["last_ship_time"]=>
string(19) "2022-08-13 10:58:40"
["updated_at"]=>
string(19) "2022-08-11 11:01:12"
["pay_amount"]=>
float(10.3)
["item_list"]=>
array(1) {
[0]=>
array(6) {
["goods_id"]=>
int(329073140054)
["sku_id"]=>
int(1119777051922)
["goods_name"]=>
string(87) "万方查重万方数据官网本科硕士博士毕业论文查重论文检测系统2.0"
["goods_price"]=>
float(2.6)
["goods_spec"]=>
string(42) "万方通用版(期刊作业报告等)"
["goods_count"]=>
int(4)
}
}
["buyer_memo"]=>
string(0) ""
["urge_shipping_time"]=>
string(0) ""
}
*/
$obj->order_sn = $order['order_sn'];
$obj->order_status = $order['order_status'];
$obj->refund_status = $order['refund_status'];
$obj->after_sales_status = $order['after_sales_status'];
$obj->pay_amount = $order['pay_amount'];
$obj->pay_time = $order['pay_time'];
$obj->goods_id = $order['item_list'][0]['goods_id'];
$obj->sku_id = $order['item_list'][0]['sku_id'];
$obj->goods_price = $order['item_list'][0]['goods_price'];
$obj->goods_count = $order['item_list'][0]['goods_count'];
$obj->urge_shipping_time = $order['urge_shipping_time'];
$obj->last_ship_time = $order['last_ship_time'];
$obj->buyer_memo = $order['buyer_memo'];
$obj->risk_control_status = $order['risk_control_status'];
$sobj = new mShop();
$shop = $sobj->getShopByOwnerid($info['mall_id']);
$res = $obj->addOrder($shop['id'], $shop['uid']);
$obj->writeLog('pdd', 'sync_push_data.log', $start.'|'.$end.'|'.$order['order_sn'].'|'.$res);
}
$obj->updateSyncDataModifyTime($info['id'], $end);
}
/**
* 解析数据库配置信息
* 获得数据库的相关连接信息
* @param sting $dbflag 数据库标识
* @param boolean $master 是否为主库
*/
private function parseDbCnf($dbflag,$master=false) {
$configs = parse_ini_file(_Storage_CNF_PATH, true);
$config = $configs[strtolower($dbflag)];
if($master) {
$host_port = $config['master'];
} else {
$host_port = $config['slave'];
}
$hps = explode(',', $host_port);
$random = rand(0, count($hps)-1);
$hp = $hps[$random];
list($host, $port) = explode(':', $hp);
$cnf = array();
$cnf['host'] = trim($host);
$cnf['port'] = trim($port);
$cnf['user'] = $master==false ? trim($config['user_r']) : trim($config['user']);
$cnf['pwd'] = $master==false ? trim($config['passwd_r']) : trim($config['passwd']);
$cnf['db'] = trim($config['db']);
return $cnf;
}
private function getInstance() {
$dbpara = $this->ismaster==false ? 'db_r' : 'db_w';
if(!is_null(self::$$dbpara)) {
self::$link = self::$$dbpara;
// 判断mysql是否gone away
$status = self::$link->getAttribute(PDO::ATTR_SERVER_INFO);
if($status == 'MySQL server has gone away') {
self::$link = self::$$dbpara = $this->toDb();
}
} else {
self::$link = self::$$dbpara = $this->toDb();
}
return self::$link;
}
private function initDb($table, $ismaster=false) {
$this->table = $table;
$this->ismaster = $ismaster;
return self::getInstance();
}
private function toDb() {
$type = strtolower($this->dbflag);
$cnf = $this->parseDbCnf($this->dbflag, $this->ismaster);
if (count($cnf)<=0 && $cnf['host'] == '') {
return false;
}
try {
$conn[PDO::ATTR_TIMEOUT] = 3;
if($GLOBALS['pconnect_db']===true) $conn[PDO::ATTR_PERSISTENT] = true;
$conn[PDO::MYSQL_ATTR_INIT_COMMAND] = "SET NAMES UTF8;";
$db = new PDO('mysql:host='.$cnf['host'].';port='.$cnf['port'].';dbname='.$cnf['db'],$cnf['user'],$cnf['pwd'], $conn);
} catch(PDOException $e) {
//error_log('['.date('Y-m-d H:i:s').'][first-fail]'.implode('|', $cnf).':'.$e->getMessage()."\n", 3, LOG_PATH_BASE.'/mysql/stat_todb_'.date('Y-m-d').'.log');
try {
$conn[PDO::ATTR_TIMEOUT] = 3;
if($GLOBALS['pconnect_db']===true) $conn[PDO::ATTR_PERSISTENT] = true;
$conn[PDO::MYSQL_ATTR_INIT_COMMAND] = "SET NAMES UTF8;";
$db = new PDO('mysql:host='.$cnf['host'].';port='.$cnf['port'].';dbname='.$cnf['db'],$cnf['user'],$cnf['pwd'], $conn);
} catch(PDOException $e) {
//error_log('['.date('Y-m-d H:i:s').'][second-fail]'.implode('|', $cnf).':'.$e->getMessage()."\n", 3, LOG_PATH_BASE.'/mysql/stat_todb_'.date('Y-m-d').'.log');
return false;
}
//error_log('['.date('Y-m-d H:i:s').'][second-succ]'.implode('|', $cnf)."\n", 3, LOG_PATH_BASE.'/mysql/stat_todb_'.date('Y-m-d').'.log');
return $db;
}
//error_log('['.date('Y-m-d H:i:s').'][first-succ]'.implode('|', $cnf)."\n", 3, LOG_PATH_BASE.'/mysql/stat_todb_'.date('Y-m-d').'.log');
return $db;
}
/**
* 数据库Select多数据
* 封装标准方法
* @param string $table 表名
* @param string $where where条件,比如:array('sql'=>'name1=? and name2=?', 'vals'=>array('name1', 'name2'))
* @param string $orderby 排序,比如:"ctime desc, name asc"
* @param array $limit 读取数量 array(起始数, 数量)
*/
public function selectAll($table, $where=array(), $orderby='', $limit=array()) {
$this->initDb($table);
try {
$wheresql = '';
if(!empty($where)) {
$wheresql = "WHERE {$where['sql']}";
$vals = $where['vals'];
}
$orderby = "ORDER BY {$orderby}";
if($orderby === null) $orderby = '';
$limitstr = empty($limit) ? '' : "LIMIT {$limit[0]}, {$limit[1]}";
$sql = "SELECT * FROM `{$this->table}` {$wheresql} {$orderby} {$limitstr}";
$st = self::$link->prepare($sql);
$res = $st->execute($vals);
if(!$res) return false;
return $st->fetchAll(PDO::FETCH_ASSOC);
} catch(PDOException $e) {
return false;
}
}
/**
* 执行SQL查询语句
*
* 当SQL语句为查询语句时返回执行后的全部数据
* @access public
* @param string $sql SQL语句
* @param boolean $all_rows 是否显示全部数据开关,当为true时,显示全部数据,为false时,显示一行数据,默认为true
* @param boolean $is_select 是否为查询语句
* @return array | void
*/
public function execute($sql, $all_rows=true, $is_select=false) {
$sql = trim($sql);
if(!$sql) return false;
$this->ismaster = $is_select==false ? true : false;
self::getInstance();
try {
$st = self::$link->prepare($sql);
$res = $st->execute();
if(!$res) {
error_log('['.date('Y-m-d H:i:s').']'.$sql.':'.json_encode($st->errorInfo())."\n", 3, LOG_PATH_BASE.'/mysql/execute_'.date('Y-m-d').'.log');
return false;
}
if(!$is_select) return $res;
if($all_rows==true) {
return $st->fetchAll(PDO::FETCH_ASSOC);
} else {
return $st->fetch(PDO::FETCH_ASSOC);
}
} catch(PDOException $e) {
return false;
}
}
}
new syncPushData();
Loading…
Cancel
Save