ElasticSearch 是目前来说最强大的开源搜索引擎。把一些搜索类的查询放到 ElasticSearch 中执行,速度会快很多;当然,它也非常消耗资源。
下面介绍如何使用 yii2 将数据批量导入到 ES 中:逐条插入的效率太低,而使用批量(bulk)插入,速度还是可以的。
1. 安装 ElasticSearch(安装步骤请参看相关安装文档)
2. 安装yii2-ElasticSearch插件
https://github.com/yiisoft/yii2-elasticsearch
3. 配置
// Elasticsearch connection component; the console controller reaches it as Yii::$app->elasticsearch_TA.
'elasticsearch_TA' => [
    'class' => 'yii\elasticsearch\Connection',
    // HTTP addresses of the Elasticsearch nodes configured for this connection.
    'nodes' => [
        ['http_address' => '192.168.0.199:9200'],
        ['http_address' => '192.168.0.210:9200'],
    ],
],
4.使用
我们仍然使用 shell 脚本来驱动数据同步,脚本路径为 /appta/shell/customer/syncCustomerDataToEs.sh。脚本接收两个参数:第一个是处理日期(processDate),第二个是用逗号分隔的 website id 列表(websiteIds):
#!/bin/bash
# Sync customer data from MongoDB to Elasticsearch, one page per yii invocation.
#
# Usage: syncCustomerDataToEs.sh <processDate> <websiteId[,websiteId...]>
#
# bash (not plain sh) is required: the C-style `for (( ... ))` loop below is
# not available in POSIX sh.

# Absolute directory of this script, so the yii entry script can be located
# regardless of the caller's working directory.
DIR=$(cd "$(dirname "$0")" && pwd)

echo 'sync custom data to es'

processDate=$1
websiteIds=$2

if [ -z "$processDate" ] || [ -z "$websiteIds" ]; then
    echo "usage: $0 <processDate> <websiteId[,websiteId...]>" >&2
    exit 1
fi

# Turn the comma-separated id list into whitespace-separated words.
arr=$(echo "$websiteIds" | tr "," "\n")

for website_id in $arr; do
    echo "website_id:$website_id"

    # Ask the console controller how many pages this website has.
    variable=$("$DIR/../../../yii" ta/migrate/elasticsearch/customerdatapagecount "$processDate" "$website_id")
    echo "$variable.."

    # Anything other than a plain non-negative integer means the page-count
    # command failed or printed an error; skip this website instead of
    # feeding garbage into the arithmetic loop.
    case "$variable" in
        ''|*[!0-9]*)
            echo "unexpected page count for website_id $website_id: $variable" >&2
            continue
            ;;
    esac

    # Sync page by page; each page is sent as a single bulk request.
    for (( i=1; i<=variable; i++ )); do
        "$DIR/../../../yii" ta/migrate/elasticsearch/customerdata "$processDate" "$website_id" "$i"
        echo "Page $i done"
    done
done
controller文件:
<?php
namespace appadmin\code\Ta\console\migrate;

use Yii;
use appadmin\code\Ta\models\WebsiteBaseInfo;
use yii\console\Controller;
use appadmin\code\Ta\helper\mongoDb as MongoDb;
use appadmin\code\Ta\models\mongo\CustomerData as MgCustomerData;
use appadmin\code\Ta\models\elasticSearch\CustomerData as EsCustomerData;
use appadmin\code\Ta\models\mongo\TraceData as MgTraceData;
use appadmin\code\Ta\models\elasticSearch\TraceData as EsTraceData;

/**
 * Console controller that copies MongoDB collections into Elasticsearch
 * page by page, sending each page as a single bulk request.
 *
 * Every action takes the processing date and the website id; both are passed
 * to initParam() to select the MongoDB database and collections.
 */
class ElasticsearchController extends Controller
{
    /**
     * @var int number of MongoDB documents read per page and sent per bulk request.
     */
    public $numPerPage = 1000;

    /**
     * Selects the MongoDB database for the given date and the per-website
     * collection names of the customer-data and trace-data models.
     *
     * @param string $processDate date passed through to MongoDb::setDbByDate()
     * @param string|int $website_id website whose collections are used
     */
    public function initParam($processDate, $website_id)
    {
        MongoDb::setDbByDate($processDate);
        MgCustomerData::initCollName($website_id);
        MgTraceData::initCollName($website_id);
    }

    /**
     * Prints the number of pages of customer data.
     *
     * @param string $processDate
     * @param string|int $website_id
     */
    public function actionCustomerdatapagecount($processDate, $website_id)
    {
        $this->initParam($processDate, $website_id);
        echo $this->pageCount(MgCustomerData::find()->count());
    }

    /**
     * Syncs one page of customer data to Elasticsearch.
     *
     * Each MongoDB row is flattened: the entries of its `value` array become
     * top-level fields of the Elasticsearch document, and the `data` entry is
     * stored in PHP-serialized form.
     *
     * @param string $processDate
     * @param string|int $website_id
     * @param string|int $pageNum 1-based page number
     * @return int exit code: 0 on success, 1 when Elasticsearch reported item failures
     */
    public function actionCustomerdata($processDate, $website_id, $pageNum)
    {
        $this->initParam($processDate, $website_id);
        $rows = $this->fetchPage(MgCustomerData::find(), $pageNum);

        $documents = [];
        foreach ($rows as $row) {
            $id = $this->normalizeId($row['_id']);
            $doc = ['id' => $id];
            $value = isset($row['value']) ? $row['value'] : null;
            if (is_array($value)) {
                foreach ($value as $field => $fieldValue) {
                    // Strict comparison: with a loose `==` an integer key 0
                    // would also match 'data' on PHP < 8.
                    $doc[$field] = ($field === 'data') ? serialize($fieldValue) : $fieldValue;
                }
            }
            $documents[] = [$id, $doc];
        }

        return $this->bulkIndex($website_id, 'customer_data', $documents);
    }

    /**
     * Prints the number of pages of trace data.
     *
     * @param string $processDate
     * @param string|int $website_id
     */
    public function actionTracedatapagecount($processDate, $website_id)
    {
        $this->initParam($processDate, $website_id);
        echo $this->pageCount(MgTraceData::find()->count());
    }

    /**
     * Syncs one page of trace data to Elasticsearch.
     *
     * Rows are copied as they are, except that `_id` is moved to an `id` field.
     *
     * @param string $processDate
     * @param string|int $website_id
     * @param string|int $pageNum 1-based page number
     * @return int exit code: 0 on success, 1 when Elasticsearch reported item failures
     */
    public function actionTracedata($processDate, $website_id, $pageNum)
    {
        $this->initParam($processDate, $website_id);
        $rows = $this->fetchPage(MgTraceData::find(), $pageNum);

        $documents = [];
        foreach ($rows as $row) {
            $id = $this->normalizeId($row['_id']);
            $doc = $row;
            unset($doc['_id']);
            $doc['id'] = $id;
            $documents[] = [$id, $doc];
        }

        return $this->bulkIndex($website_id, 'trace_data', $documents);
    }

    /**
     * Number of pages needed for $count documents at $numPerPage per page.
     *
     * @param int $count total number of documents
     * @return int
     */
    private function pageCount($count)
    {
        return (int) ceil($count / $this->numPerPage);
    }

    /**
     * Reads one page of rows from the given query as plain arrays.
     *
     * The rows are ordered by `_id`: every page is fetched by a separate
     * process, and skip/limit without an explicit sort gives no guarantee
     * that consecutive pages neither overlap nor miss documents.
     *
     * @param \yii\db\QueryInterface $query
     * @param string|int $pageNum 1-based page number; values below 1 are treated as 1
     * @return array
     */
    private function fetchPage($query, $pageNum)
    {
        $page = max(1, (int) $pageNum);

        return $query
            ->asArray()
            ->orderBy(['_id' => SORT_ASC])
            ->limit($this->numPerPage)
            ->offset($this->numPerPage * ($page - 1))
            ->all();
    }

    /**
     * Converts an id object that can be cast to string into that string;
     * any other value is returned unchanged.
     *
     * NOTE(review): presumably needed when `_id` is a driver id object
     * (e.g. an ObjectId) rather than a scalar - confirm against the real data.
     *
     * @param mixed $id
     * @return mixed
     */
    private function normalizeId($id)
    {
        if (is_object($id) && method_exists($id, '__toString')) {
            return (string) $id;
        }

        return $id;
    }

    /**
     * Sends the given documents to Elasticsearch in one bulk request.
     *
     * The index is `ta_<website_id>`. The bulk response is inspected because
     * the bulk API reports failed items in the response body (`errors` flag).
     *
     * @param string|int $website_id
     * @param string $type Elasticsearch type name
     * @param array $documents list of [id, document] pairs
     * @return int exit code: 0 on success (or nothing to send), 1 on item failures
     */
    private function bulkIndex($website_id, $type, array $documents)
    {
        if (empty($documents)) {
            return 0;
        }

        $index = 'ta_' . $website_id;
        $bulk = Yii::$app->elasticsearch_TA->createBulkCommand();
        foreach ($documents as $pair) {
            list($id, $doc) = $pair;
            $bulk->addAction([
                'index' => [
                    '_index' => $index,
                    '_type' => $type,
                    '_id' => $id,
                ],
            ], $doc);
        }

        $response = $bulk->execute();
        if (is_array($response) && !empty($response['errors'])) {
            $failed = 0;
            if (isset($response['items']) && is_array($response['items'])) {
                foreach ($response['items'] as $item) {
                    // Each item is keyed by its action name ("index").
                    $result = is_array($item) ? reset($item) : null;
                    if (is_array($result) && isset($result['error'])) {
                        $failed++;
                    }
                }
            }
            $this->stderr("Elasticsearch bulk request to {$index}/{$type} reported {$failed} failed item(s)\n");

            return 1;
        }

        return 0;
    }
}
appadmin\code\Ta\models\mongo\CustomerData
<?php
/**
 * MongoDB model for the per-website customer-data collection.
 *
 * Usage: use appadmin\code\Ta\models\mongo\CustomerData;
 */
namespace appadmin\code\Ta\models\mongo;

use yii\mongodb\ActiveRecord;
use fec\helpers\CDate;
use fec\helpers\CConfig;
use Yii;
use appadmin\code\Ta\helper\mongoDb;

class CustomerData extends ActiveRecord
{
    /**
     * @var string collection name; stays unset until initCollName() is called.
     */
    public static $_collectionName;

    /**
     * Connection used by this model: the `mongodb_ta_date` application component.
     */
    public static function getDb()
    {
        return Yii::$app->get('mongodb_ta_date');
    }

    /**
     * Name of the collection, as last set by initCollName().
     */
    public static function collectionName()
    {
        return self::$_collectionName;
    }

    /**
     * Points the model at the customer-data collection of one website.
     *
     * @param string|int $website_id
     */
    public static function initCollName($website_id)
    {
        self::$_collectionName = "ta_{$website_id}_customer_data";
    }

    /**
     * Attributes of a document in this collection.
     */
    public function attributes()
    {
        $names = ['_id', 'value'];

        return $names;
    }
}
appadmin\code\Ta\models\elasticSearch\CustomerData
<?php
namespace appadmin\code\Ta\models\elasticSearch;

use yii\elasticsearch\ActiveRecord;

/**
 * Elasticsearch model for customer data.
 *
 * Every website has its own index, so initDb() must be called before the
 * model is used; until then index() returns null.
 */
class CustomerData extends ActiveRecord
{
    /**
     * @var string name of the index currently in use, set by initDb().
     */
    public static $currentIndex;

    /**
     * Connection used by this model: the `elasticsearch_TA` application component.
     */
    public static function getDb()
    {
        return \Yii::$app->get('elasticsearch_TA');
    }

    /**
     * Selects the index of the given website (`ta_<website_id>`).
     * An empty website id leaves the current index untouched.
     *
     * @param string|int $website_id
     */
    public static function initDb($website_id)
    {
        if ($website_id) {
            self::$currentIndex = 'ta' . "_" . $website_id;
        }
    }

    /**
     * Index name, as last set by initDb().
     */
    public static function index()
    {
        return self::$currentIndex;
    }

    /**
     * Type name of the documents.
     */
    public static function type()
    {
        return 'customer_data';
    }

    /**
     * Attributes of a customer-data document.
     *
     * 'uuid' used to be listed twice; the duplicate entry has been dropped.
     */
    public function attributes()
    {
        return [
            'id',
            'uuid',
            'customer_id',
            'pv',
            'ip',
            'service_date_str',
            'service_datetime',
            'service_timestamp',
            'devide',
            'user_agent',
            'browser_name',
            'browser_version',
            'browser_date',
            'browser_lang',
            'operate',
            'operate_relase',
            'domain',
            'url',
            'title',
            'refer_url',
            'first_referrer_domain',
            'is_return',
            'device_pixel_ratio',
            'resolution',
            'color_depth',
            'website_id',
            'sku',
            'country_code',
            'country_name',
            'data',
            'order_status',
            'cart',
            'order',
            'category',
            'login_email',
            'register_email',
            'search',
            'currency',
            'stay_seconds',
        ];
    }
}