Yii2 – 批量插入数据到 elasticSearch

Elasticsearch 是目前最强大的开源搜索引擎。把一些搜索放到 Elasticsearch 中,速度会快很多;当然,它也非常消耗资源。

下面介绍如何使用 Yii2 将数据批量导入到 ES 中:逐条插入的效率太低,而使用批量插入,速度还是可以的。

1. 安装 ElasticSearch,这个参看:

安装ElasticSearch ,以及在yii2中的使用

2. 安装yii2-ElasticSearch插件

https://github.com/yiisoft/yii2-elasticsearch

3. 配置

// Application component for the Elasticsearch cluster; the console controller
// in this article reads it as Yii::$app->elasticsearch_TA.
'elasticsearch_TA' => [
    'class' => 'yii\elasticsearch\Connection',
    // One entry per cluster node, each given as host:port in 'http_address'.
    'nodes' => [
        ['http_address' => '192.168.0.199:9200'],
        ['http_address' => '192.168.0.210:9200'],
    ],
],

4.使用

我们还是用 shell 脚本来传递数据,脚本路径为 /appta/shell/customer/syncCustomerDataToEs.sh:

#!/bin/bash
#
# Sync customer data from MongoDB into Elasticsearch, one page at a time.
#
# Usage: syncCustomerDataToEs.sh <processDate> <websiteIds>
#   processDate  passed through unchanged to the yii console commands
#   websiteIds   comma-separated list of website ids, e.g. "1,2,3"
#
# bash is required: the page loop below uses the C-style for (( ... )) form,
# which a plain POSIX sh (e.g. dash) does not support.

# Directory of this script; quoted so paths containing spaces still work.
DIR=$(cd "$(dirname "$0")" && pwd)
YII="$DIR/../../../yii"

# sync mongodb to elasticsearch
echo 'sync custom data to es'
processDate=$1
websiteIds=$2

if [ -z "$processDate" ] || [ -z "$websiteIds" ]; then
  echo "usage: $0 <processDate> <websiteIds (comma separated)>" >&2
  exit 1
fi

# Commas become spaces and the expansion is left unquoted on purpose so the
# shell splits it into one word per website id.
for website_id in ${websiteIds//,/ }; do
  echo "website_id:$website_id"

  # The console command prints the number of pages for this website.
  pageCount=$("$YII" ta/migrate/elasticsearch/customerdatapagecount "$processDate" "$website_id")
  echo "$pageCount.."

  # Anything other than a plain number (e.g. a PHP warning or error text)
  # must not reach the arithmetic loop below.
  if ! [[ "$pageCount" =~ ^[0-9]+$ ]]; then
    echo "unexpected page count for website_id $website_id: $pageCount" >&2
    continue
  fi

  for (( i = 1; i <= pageCount; i++ )); do
    "$YII" ta/migrate/elasticsearch/customerdata "$processDate" "$website_id" "$i"
    echo "Page $i done"
  done
done







controller文件:

<?php
namespace appadmin\code\Ta\console\migrate;
use Yii;
use appadmin\code\Ta\models\WebsiteBaseInfo;
use yii\console\Controller;
use appadmin\code\Ta\helper\mongoDb as MongoDb;
use appadmin\code\Ta\models\mongo\CustomerData as MgCustomerData;
use appadmin\code\Ta\models\elasticSearch\CustomerData as EsCustomerData;

use appadmin\code\Ta\models\mongo\TraceData as MgTraceData;
use appadmin\code\Ta\models\elasticSearch\TraceData as EsTraceData;


/**
 * Console commands that copy one website's MongoDB collections into Elasticsearch.
 *
 * Each "...pagecount" action prints how many pages of $numPerPage documents a
 * collection holds; the matching data action reads one of those pages from
 * MongoDB and indexes it into the index "ta_<website_id>" with a single bulk
 * request. The actions are meant to be driven page by page from a shell loop,
 * so every page is handled by a separate process.
 */
class ElasticsearchController extends Controller
{
    /** @var int Number of documents read from MongoDB and sent per bulk request. */
    public $numPerPage = 1000;

    /**
     * Points the MongoDB helper and both Mongo models at the requested data set.
     *
     * @param string $processDate value handed to MongoDb::setDbByDate()
     * @param string|int $website_id website whose collections are used
     */
    public function initParam($processDate, $website_id)
    {
        MongoDb::setDbByDate($processDate);
        MgCustomerData::initCollName($website_id);
        MgTraceData::initCollName($website_id);
    }

    /**
     * Prints the number of pages in the customer data collection.
     *
     * @param string $processDate
     * @param string|int $website_id
     */
    public function actionCustomerdatapagecount($processDate, $website_id)
    {
        $this->initParam($processDate, $website_id);
        echo $this->countPages(MgCustomerData::find());
    }

    /**
     * Indexes one page of customer data into Elasticsearch (type "customer_data").
     *
     * @param string $processDate
     * @param string|int $website_id
     * @param string|int $pageNum 1-based page number
     * @return int process exit status: 0 on success, 1 if the bulk request reported errors
     */
    public function actionCustomerdata($processDate, $website_id, $pageNum)
    {
        $this->initParam($processDate, $website_id);

        $documents = [];
        foreach ($this->fetchPage(MgCustomerData::find(), $pageNum) as $row) {
            $documents[] = [$row['_id'], $this->buildCustomerDocument($row)];
        }

        return $this->bulkIndex('ta_' . $website_id, 'customer_data', $documents);
    }

    /**
     * Prints the number of pages in the trace data collection.
     *
     * @param string $processDate
     * @param string|int $website_id
     */
    public function actionTracedatapagecount($processDate, $website_id)
    {
        $this->initParam($processDate, $website_id);
        echo $this->countPages(MgTraceData::find());
    }

    /**
     * Indexes one page of trace data into Elasticsearch (type "trace_data").
     *
     * @param string $processDate
     * @param string|int $website_id
     * @param string|int $pageNum 1-based page number
     * @return int process exit status: 0 on success, 1 if the bulk request reported errors
     */
    public function actionTracedata($processDate, $website_id, $pageNum)
    {
        $this->initParam($processDate, $website_id);

        $documents = [];
        foreach ($this->fetchPage(MgTraceData::find(), $pageNum) as $row) {
            $documents[] = [$row['_id'], $this->buildTraceDocument($row)];
        }

        return $this->bulkIndex('ta_' . $website_id, 'trace_data', $documents);
    }

    /**
     * Number of pages needed to cover every document matched by $query.
     *
     * @param object $query Mongo query as returned by the model's find()
     * @return float result of ceil(); echoes as a plain integer
     */
    private function countPages($query)
    {
        return ceil($query->count() / $this->numPerPage);
    }

    /**
     * Reads one page of rows as plain arrays.
     *
     * Rows are sorted by _id: every page is fetched by a separate process, and
     * without a fixed order offset/limit paging gives no guarantee that the
     * pages do not overlap or skip documents.
     *
     * @param object $query Mongo query as returned by the model's find()
     * @param string|int $pageNum 1-based page number
     * @return array[] rows of the page, empty when the page is past the end
     * @throws \InvalidArgumentException when $pageNum is smaller than 1
     */
    private function fetchPage($query, $pageNum)
    {
        $page = (int) $pageNum;
        if ($page < 1) {
            throw new \InvalidArgumentException('pageNum must be >= 1, got: ' . $pageNum);
        }

        $rows = $query
            ->asArray()
            ->orderBy(['_id' => SORT_ASC])
            ->limit($this->numPerPage)
            ->offset($this->numPerPage * ($page - 1))
            ->all();

        return is_array($rows) ? $rows : [];
    }

    /**
     * Flattens a customer data row: "id" is the Mongo _id and every entry of
     * the row's "value" array becomes a top-level field.
     *
     * @param array $row Mongo row with "_id" and (normally) "value"
     * @return array document body for Elasticsearch
     */
    private function buildCustomerDocument(array $row)
    {
        $document = ['id' => $row['_id']];

        $value = isset($row['value']) ? $row['value'] : null;
        if (!is_array($value)) {
            return $document;
        }

        foreach ($value as $field => $fieldValue) {
            // "data" is stored as a PHP-serialized string. Strict comparison,
            // because on PHP < 8 an integer key 0 loosely equals 'data'.
            $document[$field] = ($field === 'data') ? serialize($fieldValue) : $fieldValue;
        }

        return $document;
    }

    /**
     * Copies a trace data row, exposing the Mongo _id as "id" in the body.
     *
     * @param array $row Mongo row including "_id"
     * @return array document body for Elasticsearch
     */
    private function buildTraceDocument(array $row)
    {
        $document = $row;
        $document['id'] = $row['_id'];
        unset($document['_id']);

        return $document;
    }

    /**
     * Sends the documents as "index" actions in one bulk request and reports
     * per-item failures instead of silently dropping them.
     *
     * NOTE(review): the "_type" metadata field depends on the Elasticsearch
     * version in use (mapping types were removed in later major versions) -
     * confirm against the target cluster.
     *
     * @param string $indexName target index
     * @param string $typeName target mapping type
     * @param array[] $documents list of [documentId, documentBody] pairs
     * @return int 0 on success (or nothing to send), 1 if the response flagged errors
     */
    private function bulkIndex($indexName, $typeName, array $documents)
    {
        if (empty($documents)) {
            return 0;
        }

        $bulk = Yii::$app->elasticsearch_TA->createBulkCommand();
        foreach ($documents as $pair) {
            $bulk->addAction([
                'index' => [
                    '_index' => $indexName,
                    '_type' => $typeName,
                    '_id' => $pair[0],
                ],
            ], $pair[1]);
        }

        $response = $bulk->execute();
        if (!is_array($response) || empty($response['errors'])) {
            return 0;
        }

        // The bulk API answers 200 even when individual items fail, so the
        // failures have to be counted from the per-item results.
        $failed = 0;
        $items = isset($response['items']) && is_array($response['items']) ? $response['items'] : [];
        foreach ($items as $item) {
            $result = is_array($item) ? reset($item) : null;
            if (is_array($result) && isset($result['error'])) {
                $failed++;
            }
        }
        $this->stderr("bulk index into {$indexName}/{$typeName}: {$failed} of " . count($documents) . " documents failed\n");

        return 1;
    }
}

appadmin\code\Ta\models\mongo\CustomerData

<?php  
# MongoDB model for a website's customer data collection (ta_<website_id>_customer_data).
namespace appadmin\code\Ta\models\mongo; 
use yii\mongodb\ActiveRecord;
use fec\helpers\CDate;
use fec\helpers\CConfig;
use Yii;
use appadmin\code\Ta\helper\mongoDb;
# use appadmin\code\Ta\models\mongo\CustomerData; 
/**
 * MongoDB ActiveRecord for the customer data collection of one website.
 *
 * The collection name is not fixed: call initCollName() with the website id
 * before running any query.
 */
class CustomerData extends ActiveRecord
{
    /** @var string|null Collection name set by initCollName(); null until then. */
    public static $_collectionName;

    /**
     * Selects the collection "ta_<website_id>_customer_data" for later queries.
     *
     * @param string|int $website_id
     */
    public static function initCollName($website_id)
    {
        self::$_collectionName = "ta_{$website_id}_customer_data";
    }

    /**
     * @return string|null collection name chosen through initCollName()
     */
    public static function collectionName()
    {
        return self::$_collectionName;
    }

    /**
     * @return object the "mongodb_ta_date" application component
     */
    public static function getDb()
    {
        return Yii::$app->get('mongodb_ta_date');
    }

    /**
     * @return string[] document fields exposed by this model
     */
    public function attributes()
    {
        return ['_id', 'value'];
    }
}

appadmin\code\Ta\models\elasticSearch\CustomerData

<?php

namespace appadmin\code\Ta\models\elasticSearch;

use yii\elasticsearch\ActiveRecord;

/**
 * Elasticsearch ActiveRecord for customer data (type "customer_data").
 *
 * Every website has its own index, "ta_<website_id>"; call initDb() with the
 * website id before using the model.
 */
class CustomerData extends ActiveRecord
{
    /** @var string|null Index name set by initDb(); null until then. */
    public static $currentIndex;

    /**
     * @return object the "elasticsearch_TA" application component
     */
    public static function getDb()
    {
        return \Yii::$app->get('elasticsearch_TA');
    }

    /**
     * Selects the index of the given website.
     *
     * A falsy $website_id is ignored, so the previously selected index (if
     * any) stays in effect.
     *
     * @param string|int $website_id
     */
    public static function initDb($website_id)
    {
        if ($website_id) {
            self::$currentIndex = 'ta' . "_" . $website_id;
        }
    }

    /**
     * @return string|null index name chosen through initDb()
     */
    public static function index()
    {
        return self::$currentIndex;
    }

    /**
     * @return string mapping type of the documents
     */
    public static function type()
    {
        return 'customer_data';
    }

    /**
     * Field names of a customer data document.
     *
     * 'uuid' used to be listed twice; the duplicate entry has been removed.
     * NOTE(review): 'devide' and 'operate_relase' look like misspellings, but
     * they are field names - presumably matching the keys of the source data -
     * so they are kept as is; verify before renaming.
     *
     * @return string[]
     */
    public function attributes()
    {
        return [
            'id',

            'uuid',
            'customer_id',
            'pv',

            'ip',
            'service_date_str',
            'service_datetime',
            'service_timestamp',
            'devide',
            'user_agent',
            'browser_name',
            'browser_version',
            'browser_date',
            'browser_lang',
            'operate',
            'operate_relase',
            'domain',
            'url',
            'title',
            'refer_url',
            'first_referrer_domain',
            'is_return',
            'device_pixel_ratio',
            'resolution',
            'color_depth',
            'website_id',
            'sku',
            'country_code',
            'country_name',

            'data',

            'order_status',
            'cart',
            'order',
            'category',
            'login_email',
            'register_email',
            'search',
            'currency',
            'stay_seconds',
        ];
    }
}

 

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注