elasticSearch 是目前来说,最强大的开源搜索引擎,对于一些搜索,放到ElasticSearch中,速度会快很多,当然,这个玩意也是非常消耗资源。
下面是,使用yii2,将数据批量导入到ES中,单行插入的效率太低,使用批量插入,速度还是可以。
安装ElasticSearch 这个参看
安装ElasticSearch ,以及在yii2中的使用
2. 安装yii2-ElasticSearch插件
https://github.com/yiisoft/yii2-elasticsearch
3. 配置
'elasticsearch_TA' => [
'class' => 'yii\elasticsearch\Connection',
'nodes' => [
['http_address' => '192.168.0.199:9200'],
['http_address' => '192.168.0.210:9200'],
],
],
4.使用
传递数据,我们还是用shell 脚本来传递数据 /appta/shell/customer/syncCustomerDataToEs.sh
#!/bin/sh
DIR=$(cd `dirname $0`; pwd)
# sync mongodb to elasticsearch
echo 'sync custom data to es'
processDate=$1
websiteIds=$2
arr=$(echo $websiteIds|tr "," "\n");
for website_id in $arr; do
echo "website_id:".$website_id;
variable=`$DIR/../../../yii ta/migrate/elasticsearch/customerdatapagecount $processDate $website_id`
echo "$variable.."
for (( i=1; i<=$variable; i++ ))
do
$DIR/../../../yii ta/migrate/elasticsearch/customerdata $processDate $website_id $i
echo "Page $i done"
done
done
controller文件:
<?php
namespace appadmin\code\Ta\console\migrate;
use Yii;
use appadmin\code\Ta\models\WebsiteBaseInfo;
use yii\console\Controller;
use appadmin\code\Ta\helper\mongoDb as MongoDb;
use appadmin\code\Ta\models\mongo\CustomerData as MgCustomerData;
use appadmin\code\Ta\models\elasticSearch\CustomerData as EsCustomerData;
use appadmin\code\Ta\models\mongo\TraceData as MgTraceData;
use appadmin\code\Ta\models\elasticSearch\TraceData as EsTraceData;
class ElasticsearchController extends Controller
{
public $numPerPage = 1000;
//public $dbName = "ta_".$processDate;
//public $collName;
public function initParam($processDate,$website_id){
//$thidbName = "ta_".$processDate;
$collName = "ta_".$website_id."_customer_data";
//echo $processDate;exit;
MongoDb::setDbByDate($processDate);
MgCustomerData::initCollName($website_id);
MgTraceData::initCollName($website_id);
}
# customer data 数据的总页数
public function actionCustomerdatapagecount($processDate,$website_id){
$this->initParam($processDate,$website_id);
$count = MgCustomerData::find()->count();
//var_dump(MgCustomerData::getDb());
//echo $count;exit;
echo ceil($count/$this->numPerPage);
}
# 同步customer data的数据到ElasticSearch
public function actionCustomerdata($processDate,$website_id,$pageNum){
$this->initParam($processDate,$website_id);
$skip = $this->numPerPage * ($pageNum - 1);
$data = MgCustomerData::find()
->asArray()
->limit($this->numPerPage)
->offset($skip)
->all();
$arr = [];
$i = 0;
if(is_array($data) && !empty($data )){
$elasticsearch = Yii::$app->elasticsearch_TA;
$bulkclient = $elasticsearch->createBulkCommand();
//EsCustomerData::initDb($website_id);
$index_name = 'ta_'.$website_id;
$one_day_type = 'customer_data';
//$EsCustomerDataOne = EsCustomerData::findOne($a['_id']);
foreach($data as $one){
$i++;
$a = [];
$a['id'] = $one['_id'];
$value = $one['value'];
if(is_array($value) && !empty($value )){
foreach($value as $k => $v){
if($k == 'data'){
//var_dump($v);
$v = serialize($v);
}
$a[$k] = $v;
}
}
$bulkclient->addAction(array(
'index' => array(
'_index'=> $index_name,
'_type' => $one_day_type,
'_id' => $one['_id'],
)
), $a);
/*
# 保存数据到ES
EsCustomerData::initDb($website_id);
$EsCustomerDataOne = EsCustomerData::findOne($a['_id']);
if(!$EsCustomerDataOne){
$EsCustomerDataOne = new EsCustomerData;
$EsCustomerDataOne->setPrimaryKey($a['_id']);
}
$EsCustomerDataOne->id = $a['_id'];
$attributes = $EsCustomerDataOne->attributes();
foreach($a as $k=>$v){
if(in_array($k,$attributes)){
if($k == 'data'){
//var_dump($v);
$v = serialize($v);
}
$EsCustomerDataOne[$k] = $v;
}
}
$mtime=explode(' ',microtime());
$startTime=$mtime[1]+$mtime[0];
$EsCustomerDataOne->save();
$mtime=explode(' ',microtime());
$endTime=$mtime[1]+$mtime[0];
echo "chaju_time :($i)".($endTime-$startTime)."\n";
//$arr[] = $a;
*/
}
$bulkclient->execute();
}
}
# customer data 数据的总页数
public function actionTracedatapagecount($processDate,$website_id){
$this->initParam($processDate,$website_id);
$count = MgTraceData::find()->count();
//var_dump(MgCustomerData::getDb());
//echo $count;exit;
echo ceil($count/$this->numPerPage);
}
# 同步customer data的数据到ElasticSearch
public function actionTracedata($processDate,$website_id,$pageNum){
$this->initParam($processDate,$website_id);
$skip = $this->numPerPage * ($pageNum - 1);
$data = MgTraceData::find()
->asArray()
->limit($this->numPerPage)
->offset($skip)
->all();
$arr = [];
$i = 0;
if(is_array($data) && !empty($data )){
$elasticsearch = Yii::$app->elasticsearch_TA;
$bulkclient = $elasticsearch->createBulkCommand();
//EsCustomerData::initDb($website_id);
$index_name = 'ta_'.$website_id;
$one_day_type = 'trace_data';
//$EsCustomerDataOne = EsCustomerData::findOne($a['_id']);
foreach($data as $one){
$i++;
$a = [];
if(is_array($one) && !empty($one )){
foreach($one as $k => $v){
$a[$k] = $v;
}
}
$a['id'] = $a['_id'];
unset($a['_id']);
$bulkclient->addAction(array(
'index' => array(
'_index'=> $index_name,
'_type' => $one_day_type,
'_id' => $one['_id'],
)
), $a);
}
$bulkclient->execute();
}
}
}
appadmin\code\Ta\models\mongo\CustomerData
<?php
# 商家SELLER 和 对应的 SELLERID 的设置。
namespace appadmin\code\Ta\models\mongo;
use yii\mongodb\ActiveRecord;
use fec\helpers\CDate;
use fec\helpers\CConfig;
use Yii;
use appadmin\code\Ta\helper\mongoDb;
# use appadmin\code\Ta\models\mongo\CustomerData;
class CustomerData extends ActiveRecord
{
public static $_collectionName;
# 定义db
public static function getDb()
{
return \Yii::$app->get('mongodb_ta_date');
}
# 定义collection name
public static function collectionName()
{
return self::$_collectionName;
}
public static function initCollName($website_id){
self::$_collectionName = "ta_".$website_id."_customer_data";
}
public function attributes()
{
// path mapping for '_id' is setup to field 'id'
return [
'_id',
'value',
];
}
}
appadmin\code\Ta\models\ElasticSearch\CustomerData
<?php
namespace appadmin\code\Ta\models\elasticSearch;
use yii\elasticsearch\ActiveRecord;
class CustomerData extends ActiveRecord
{
public static $currentIndex;
# 定义db链接
public static function getDb()
{
return \Yii::$app->get('elasticsearch_TA');
}
# 不同的website 使用的是不同的db ,使用前需要先初始化
# db的名字
public static function initDb($website_id){
//echo 888;
if($website_id){
//echo 999;
self::$currentIndex = 'ta'."_".$website_id;
//echo self::$currentIndex;
//echo 3;
}
}
# db
public static function index()
{
return self::$currentIndex;
}
# table
public static function type()
{
return 'customer_data';
}
public function attributes()
{
// path mapping for '_id' is setup to field 'id'
return [
'id',
'uuid',
'customer_id',
'pv',
'ip',
'service_date_str',
'service_datetime',
'service_timestamp',
'devide',
'user_agent',
'browser_name',
'browser_version',
'browser_date',
'browser_lang',
'operate',
'operate_relase',
'domain',
'url',
'title',
'refer_url',
'first_referrer_domain',
'is_return',
'uuid',
'device_pixel_ratio',
'resolution',
'color_depth',
'website_id',
'sku',
'country_code',
'country_name',
'data',
'order_status',
'cart',
'order',
'category',
'login_email',
'register_email',
'search',
'currency',
'stay_seconds',
];
}
}