一.使用logstash同步订单数据(订单表和订单项表)到ElasticSearch:
1.到官网下载logstash:https://www.elastic.co/cn/downloads/logstash
2.安装logstash前,确保需要先安装java的jdk环境
3.下载后解压。解压后千万不要到bin目录下直接双击logstash.bat来启动,这样会报错(正确的启动方式见第8步,需要在命令行中用 -f 指定配置文件)
4.接下来,在logstash安装目录找到config文件夹,在那里新增一个文件夹,我新建的为shop文件夹,然后在里面添加如下文件:
5.首次运行前,last_run_item.txt和last_run_order.txt这两个文件是空的(没有数据)
6.logstash_order.conf文件的配置如下:
# Logstash pipeline: two scheduled JDBC inputs (order_mast and order_item tables)
# -> mutate filter -> one Elasticsearch index per table, plus stdout for debugging.
input {
jdbc {
type => "order_mast" # used in the output section to route events by type; may be omitted when only one table is synced
jdbc_driver_library => "../config/shop/mysql-connector-java-5.1.6-bin.jar" # JDBC driver jar placed in the shop folder
jdbc_paging_enabled => "true"
jdbc_page_size => "2000"
jdbc_driver_class => "com.mysql.jdbc.Driver"
# change the JDBC URL, user and password to match your environment
jdbc_connection_string => "jdbc:mysql://192.168.50.117:3306/shop_dm?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false"
jdbc_user => "shop"
jdbc_password => "shop"
schedule => "* * * * *" # cron expression: sync once every minute
statement_filepath => "../config/shop/order_mast.sql" # SQL file placed in the shop folder
record_last_run => true
use_column_value => false
last_run_metadata_path => "../config/shop/last_run_order.txt" # records the time of the previous run, which is what makes the sync incremental
clean_run => false
# whether to convert column names to lowercase
lowercase_column_names => false
}
jdbc {
type => "order_item" # used in the output section to route events by type; may be omitted when only one table is synced
jdbc_driver_library => "../config/shop/mysql-connector-java-5.1.6-bin.jar" # JDBC driver jar placed in the shop folder
jdbc_paging_enabled => "true"
jdbc_page_size => "2000"
jdbc_driver_class => "com.mysql.jdbc.Driver"
# change the JDBC URL, user and password to match your environment
jdbc_connection_string => "jdbc:mysql://192.168.50.117:3306/shop_dm?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false"
jdbc_user => "shop"
jdbc_password => "shop"
schedule => "* * * * *" # cron expression: sync once every minute
statement_filepath => "../config/shop/order_item.sql" # SQL file placed in the shop folder
record_last_run => true
use_column_value => false
last_run_metadata_path => "../config/shop/last_run_item.txt" # records the time of the previous run, which is what makes the sync incremental
clean_run => false
# whether to convert column names to lowercase
lowercase_column_names => false
}
}
filter {
# the jdbc input emits JSON by default; no way to change that has been found yet
#json {
# source => "message"
# remove_field => ["message"]
#}
mutate {
# fields to drop before indexing
# NOTE(review): remove_field is given twice; a single array ["@timestamp", "@version"]
# would be clearer - confirm how Logstash merges duplicate settings before relying on this
remove_field => "@timestamp"
remove_field => "@version"
}
}
output {
if [type]=="order_mast"{
elasticsearch {
hosts => ["http://localhost:9200"]
# if authentication is required, fill in the credentials below and uncomment them
#user => elastic
#password => "elastic@test.com"
index => "shop_order_mast"
document_type => "order_mast" # mapping types no longer exist from ES 7.0 onward
document_id => "%{cod_order_id}"
}
}
if [type]=="order_item"{
elasticsearch {
hosts => ["http://localhost:9200"]
# if authentication is required, fill in the credentials below and uncomment them
#user => elastic
#password => "elastic@test.com"
index => "shop_order_item"
document_type => "order_item"
document_id => "%{cod_order_item_id}"
}
}
stdout {
codec => json_lines
}
}
//如果只有一张表的时候,单表output的配置:
# Alternative output section for the single-table case: no [type] routing is needed,
# every event goes to the one index.
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    # if authentication is required, fill in the credentials below and uncomment them
    #user => elastic
    #password => "elastic@test.com"
    index => "shop_order_mast"
    document_type => "order_mast" # mapping types no longer exist from ES 7.0 onward
    document_id => "%{cod_order_id}"
  }
  # echo every event to the console as JSON lines (handy while debugging)
  stdout {
    codec => json_lines
  }
}
//sql的写法,这里只提供orderItem
-- Incremental export of shop_order_item rows for the Logstash jdbc input.
--
-- cod_order_item_id is selected twice on purpose: the first, un-aliased column is the
-- value substituted into document_id => "%{cod_order_item_id}" in logstash_order.conf
-- (i.e. the Elasticsearch document id); the aliased copy is the "orderItemId" field
-- stored inside the document.
SELECT
    `cod_order_item_id`,
    `cod_order_item_id`    AS "orderItemId",
    `cod_order_id`         AS "orderId",
    `flg_item_type`        AS "itemType",
    `cod_market_id`        AS "marketId",
    `cod_item_id`          AS "itemId",
    `cod_item_id_main`     AS "mainItemId",
    `txt_name`             AS "itemTitle",
    `cod_item_quantity`    AS "quantity",
    `amt_item`             AS "itemPrice",
    `cod_score_total`      AS "scoreTotal",
    `amt_score`            AS "scoreAmount",
    `amt_charge`           AS "chargeAmount",
    `amt_standard_price`   AS "standardPrice",
    `amt_balance_discount` AS "balanceDiscountAmount",
    `amt_payment_total`    AS "itemTotalAmount",
    `amt_coupon_total`     AS "couponTotalAmount",
    `amt_act_discount`     AS "actDiscountAmount",
    `cod_order_parent_id`  AS "parentOrderId",
    `cod_merchant_no`      AS "shopId",
    `cod_create_user`      AS "createUserId",
    DATE_FORMAT(`dat_modify`, '%Y-%m-%d %T') AS "updateTime",
    DATE_FORMAT(`dat_create`, '%Y-%m-%d %T') AS "createTime",
    `cod_modify_user`      AS "updateUserId"
FROM
    shop_order_item
WHERE
    -- :sql_last_value is read from last_run_item.txt in the shop folder. On the very
    -- first run the file is empty, so Logstash falls back to its initial value
    -- (1 January 1970 per the jdbc input documentation), which makes the first run
    -- effectively a full load.
    dat_modify >= :sql_last_value
7.如果运行过一次后,打开last_run_item.txt可以看到
8.启动logstash:需要保证你的ES已经启动了,并创建了对应的index和type
Windows环境:在安装目录的bin文件夹下打开命令窗口(或者先打开命令窗口,再切换到该路径),然后执行: logstash -f ../config/shop/logstash_order.conf
如果是在linux环境,切换安装的bin目录执行:
nohup logstash -f ../config/shop/logstash_order.conf > ../logs/logstash.out &
9.之后打开ES查询数据
可以看到数据已经同步过来了
10.之后可以在项目中进行对应的数据操作了,因为该同步是一分钟同步一次,所以对于实时性要求特别高的,可以在代码中使用ES的crud操作也进行同步,这样就可以保证万无一失了
11.ES相关操作可以参考:https://www.cnblogs.com/yangxiaohui227/p/11237268.html
12.附上一个orderItem表的(ES版本为6.4.3)操作
@Configuration
public class ElasticsearchConfig implements InitializingBean{
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);
@Value("${elasticsearch.cluster.name}")
private String clusterName;
@Value("${elasticsearch.port}")
private Integer port;
@Value("${elasticsearch.host}")
private String host;
/**
* Springboot整合Elasticsearch 在项目启动前设置一下的属性,防止报错
* 解决netty冲突后初始化client时还会抛出异常
* java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4]
*/
@PostConstruct
void init() {
System.setProperty("es.set.netty.runtime.available.processors", "false");
}
// @Before
@Bean
public TransportClient getTransportClient() {
TransportClient client=null;
LOGGER.info("elasticsearch init.");
try {
Settings settings = Settings.builder()
.put("cluster.name", clusterName) //集群名字
.put("client.transport.sniff", true)//增加嗅探机制,找到ES集群
.put("thread_pool.search.size", 5).build();//增加线程池个数
client = new PreBuiltTransportClient(settings);
TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(host), port);
client.addTransportAddresses(transportAddress);
LOGGER.info("elasticsearch init success.");
return client;
} catch (Exception e) {
throw new RuntimeException("elasticsearch init fail."+ e);
}
} }
/**
 * Advanced-query object: a plain mutable holder for order search criteria plus paging.
 *
 * <p>The class carries no logic; every property is exposed through a getter/setter pair.
 * Property names (including their existing spelling) are part of the public contract and
 * are kept as they are.
 */
public class EsQueryObject {

    // ---- order / customer identity ----
    private String orderId;
    private String customerId;
    private String txtOrderTitle;

    // ---- status filters ----
    private Integer orderStatus;
    private Integer paymentStatus;

    // ---- receiver details ----
    private String phone;
    private String recieveName;
    private String addresss;

    // ---- time ranges (presumably _S = start and _E = end of the range; confirm with callers) ----
    private String orderSubmitTime_S;
    private String orderSubmitTime_E;
    private String payTime_S;
    private String payTime_E;

    // ---- paid-amount range ----
    private BigDecimal minPayAmount;
    private BigDecimal maxPayAmount;

    // ---- shop / item filters ----
    private String shopId;
    private String itemId;
    private String itemTile;

    // ---- paging ----
    private Page page;

    /** @return the order id criterion */
    public String getOrderId() {
        return orderId;
    }

    /** @param orderId the order id criterion */
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    /** @return the customer id criterion */
    public String getCustomerId() {
        return customerId;
    }

    /** @param customerId the customer id criterion */
    public void setCustomerId(String customerId) {
        this.customerId = customerId;
    }

    /** @return the order title criterion */
    public String getTxtOrderTitle() {
        return txtOrderTitle;
    }

    /** @param txtOrderTitle the order title criterion */
    public void setTxtOrderTitle(String txtOrderTitle) {
        this.txtOrderTitle = txtOrderTitle;
    }

    /** @return the order status criterion */
    public Integer getOrderStatus() {
        return orderStatus;
    }

    /** @param orderStatus the order status criterion */
    public void setOrderStatus(Integer orderStatus) {
        this.orderStatus = orderStatus;
    }

    /** @return the payment status criterion */
    public Integer getPaymentStatus() {
        return paymentStatus;
    }

    /** @param paymentStatus the payment status criterion */
    public void setPaymentStatus(Integer paymentStatus) {
        this.paymentStatus = paymentStatus;
    }

    /** @return the phone criterion */
    public String getPhone() {
        return phone;
    }

    /** @param phone the phone criterion */
    public void setPhone(String phone) {
        this.phone = phone;
    }

    /** @return the receiver name criterion */
    public String getRecieveName() {
        return recieveName;
    }

    /** @param recieveName the receiver name criterion */
    public void setRecieveName(String recieveName) {
        this.recieveName = recieveName;
    }

    /** @return the address criterion */
    public String getAddresss() {
        return addresss;
    }

    /** @param addresss the address criterion */
    public void setAddresss(String addresss) {
        this.addresss = addresss;
    }

    /** @return the {@code orderSubmitTime_S} value */
    public String getOrderSubmitTime_S() {
        return orderSubmitTime_S;
    }

    /** @param orderSubmitTime_S the {@code orderSubmitTime_S} value */
    public void setOrderSubmitTime_S(String orderSubmitTime_S) {
        this.orderSubmitTime_S = orderSubmitTime_S;
    }

    /** @return the {@code orderSubmitTime_E} value */
    public String getOrderSubmitTime_E() {
        return orderSubmitTime_E;
    }

    /** @param orderSubmitTime_E the {@code orderSubmitTime_E} value */
    public void setOrderSubmitTime_E(String orderSubmitTime_E) {
        this.orderSubmitTime_E = orderSubmitTime_E;
    }

    /** @return the {@code payTime_S} value */
    public String getPayTime_S() {
        return payTime_S;
    }

    /** @param payTime_S the {@code payTime_S} value */
    public void setPayTime_S(String payTime_S) {
        this.payTime_S = payTime_S;
    }

    /** @return the {@code payTime_E} value */
    public String getPayTime_E() {
        return payTime_E;
    }

    /** @param payTime_E the {@code payTime_E} value */
    public void setPayTime_E(String payTime_E) {
        this.payTime_E = payTime_E;
    }

    /** @return the minimum paid amount criterion */
    public BigDecimal getMinPayAmount() {
        return minPayAmount;
    }

    /** @param minPayAmount the minimum paid amount criterion */
    public void setMinPayAmount(BigDecimal minPayAmount) {
        this.minPayAmount = minPayAmount;
    }

    /** @return the maximum paid amount criterion */
    public BigDecimal getMaxPayAmount() {
        return maxPayAmount;
    }

    /** @param maxPayAmount the maximum paid amount criterion */
    public void setMaxPayAmount(BigDecimal maxPayAmount) {
        this.maxPayAmount = maxPayAmount;
    }

    /** @return the shop id criterion */
    public String getShopId() {
        return shopId;
    }

    /** @param shopId the shop id criterion */
    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    /** @return the item id criterion */
    public String getItemId() {
        return itemId;
    }

    /** @param itemId the item id criterion */
    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    /** @return the {@code itemTile} value */
    public String getItemTile() {
        return itemTile;
    }

    /** @param itemTile the {@code itemTile} value */
    public void setItemTile(String itemTile) {
        this.itemTile = itemTile;
    }

    /** @return the paging information */
    public Page getPage() {
        return page;
    }

    /** @param page the paging information */
    public void setPage(Page page) {
        this.page = page;
    }
}
package com.tft.shop.service.order;
import com.alibaba.fastjson.JSON;
import com.bootcrabframework.cloud.core.common.base.GenericBaseService;
import com.bootcrabframework.cloud.core.util.CommonUtil;
import com.bootcrabframework.cloud.core.util.DateUtil;
import com.google.common.collect.Lists;
import com.tft.shop.constant.order.OrderConstant;
import com.tft.shop.entity.es.EsShopOrderItem;
import com.tft.shop.entity.es.EsShopOrderItemRequestDTO;
import com.tft.shop.entity.order.ShopOrderItem;
import com.tft.shop.util.StringUtil;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Service
public class EsShopOrderItemService extends GenericBaseService {
@Resource
private TransportClient transportClient;
//批量新增
public void batchInsert(List<EsShopOrderItem> list){
if(CommonUtil.isNull(list)){
return;
}
BulkRequest bulkRequest = new BulkRequest();
list.forEach(a->{
IndexRequest indexRequest = new IndexRequest(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, a.getOrderItemId());
indexRequest.source(JSON.toJSONString(a), XContentType.JSON);
bulkRequest.add(indexRequest);
});
ActionFuture<BulkResponse> bulk = transportClient.bulk(bulkRequest);
boolean failures = bulk.actionGet().hasFailures();
if(!failures){
return; //没有失败
}
//如果有失败,输出哪一条是失败的
try {
BulkResponse bulkItemResponses = bulk.get();
if(bulkItemResponses==null){
return;
}
if(CommonUtil.isNull(bulkItemResponses.getItems())){
return;
}
for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) {
boolean failed = bulkItemResponse.isFailed();
if(failed){
logger.error("订单项插入ES失败,错误信息{},对应订单项编号{}",bulkItemResponse.getId(),bulkItemResponse.getFailureMessage());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
//单条新增
public void insertOne(EsShopOrderItem item){
if(null==item){
return;
}
List<EsShopOrderItem> list =Lists.newArrayList();
list.add(item);
this.batchInsert(list);
}
//单条新增
public void insertOne(ShopOrderItem orderItem){
this.insertOne(shopOrderItemChangeToEsOrderItem(orderItem));
}
private EsShopOrderItem shopOrderItemChangeToEsOrderItem(ShopOrderItem orderItem){
if(null==orderItem){
return null;
}
EsShopOrderItem shopOrderItem = new EsShopOrderItem();
shopOrderItem.setOrderItemId(orderItem.getCodOrderItemId());
shopOrderItem.setOrderId(orderItem.getCodOrderId());
shopOrderItem.setItemType(orderItem.getFlgItemType());
shopOrderItem.setMarketId(orderItem.getCodMarketId());
shopOrderItem.setItemId(orderItem.getCodItemId());
shopOrderItem.setMainItemId(orderItem.getCodItemIdMain());
shopOrderItem.setItemTitle(orderItem.getTxtName());
shopOrderItem.setQuantity(orderItem.getCodItemQuantity());
shopOrderItem.setItemPrice(orderItem.getAmtItem());
shopOrderItem.setScoreTotal(orderItem.getCodScoreTotal());
shopOrderItem.setScoreAmount(orderItem.getAmtScore());
shopOrderItem.setChargeAmount(orderItem.getAmtCharge());
shopOrderItem.setStandardPrice(orderItem.getAmtStandardPrice());
shopOrderItem.setBalanceDiscountAmount(orderItem.getAmtBalanceDiscount());
shopOrderItem.setItemTotalAmount(orderItem.getAmtPaymentTotal());
shopOrderItem.setActDiscountAmount(orderItem.getAmtActDiscount());
shopOrderItem.setCouponTotalAmount(orderItem.getAmtCouponTotal());
shopOrderItem.setParentOrderId(orderItem.getCodOrderParentId());
shopOrderItem.setShopId(orderItem.getCodMerchantNo());
shopOrderItem.setCreateUserId(orderItem.getCodCreateUser());
if(null!=orderItem.getDatCreate()){
shopOrderItem.setCreateTime(DateUtil.dateFormat(orderItem.getDatCreate(),DateUtil.TIME_FORMAT_FULL));
}
if(null!=orderItem.getDatModify()){
shopOrderItem.setUpdateTime(DateUtil.dateFormat(orderItem.getDatModify(),DateUtil.TIME_FORMAT_FULL));
}
shopOrderItem.setUpdateUserId(orderItem.getCodModifyUser());
return shopOrderItem;
}
//删除
public void deleteOne(String orderItemId){
if(CommonUtil.isNull(orderItemId)){
return;
}
ActionFuture<DeleteResponse> actionFuture = transportClient.delete(new DeleteRequest(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, orderItemId));
if(actionFuture==null){
return;
}
DeleteResponse deleteResponse = actionFuture.actionGet();
if(null==deleteResponse || null==deleteResponse.status()){
return;
}
if(deleteResponse.status().getStatus()!=200){
logger.error("删除ES订单项,编号为{},删除失败",orderItemId);
}
}
//修改
public void updateOne(EsShopOrderItem esShopOrderItem){
if(null==esShopOrderItem){
return;
}
UpdateResponse updateResponse = transportClient.prepareUpdate(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, esShopOrderItem.getOrderItemId())
.setDoc(JSON.toJSONString(esShopOrderItem), XContentType.JSON).execute().actionGet();
if(null==updateResponse || null==updateResponse.status()){
return;
}
if(updateResponse.status().getStatus()!=200){
logger.error("修改ES订单项失败,编号为{}",esShopOrderItem.getOrderItemId());
}
}
//修改
public void updateOne(ShopOrderItem orderItem){
this.updateOne(this.shopOrderItemChangeToEsOrderItem(orderItem));
}
//查询单个
public EsShopOrderItem selectById(String orderItemId){
if(StringUtil.isEmpty(orderItemId)){
return null;
}
GetRequestBuilder ret = transportClient.prepareGet(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, orderItemId);
if(null==ret || null==ret.get()){
return null;
}
GetResponse response = ret.get();
if(StringUtil.isEmpty(response.getSourceAsString())){
return null;
}
return JSON.parseObject(response.getSourceAsString(),EsShopOrderItem.class);
}
/**
*
*
* @param req 高级查询对象,当用商品标题查询的时候,限制只返回最大2000条
* @return
*/
public List<EsShopOrderItem> queryAdvanced(EsShopOrderItemRequestDTO req){
if(null==req){
return null;
}
SearchRequest searchRequest = new SearchRequest(OrderConstant.ES_ORDER_ITEM_INDEX);
searchRequest.types(OrderConstant.ES_ORDER_ITEM_TYPE);
// 构造查询器
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
if(!StringUtils.isEmpty(req.getItemTitle())){
boolQueryBuilder.must(QueryBuilders.matchQuery("itemTitle",req.getItemTitle()));
}
if(!StringUtils.isEmpty(req.getItemId())){
boolQueryBuilder.must(QueryBuilders.termQuery("itemId",req.getItemId()));
}
if(!StringUtils.isEmpty(req.getShopId())){
boolQueryBuilder.must(QueryBuilders.termQuery("shopId",req.getShopId()));
}
if(!StringUtils.isEmpty(req.getCustomerId())){
boolQueryBuilder.must(QueryBuilders.termQuery("createUserId",req.getCustomerId()));
}
if(!StringUtils.isEmpty(req.getParentOrderId())){
boolQueryBuilder.must(QueryBuilders.termQuery("parentOrderId",req.getParentOrderId()));
}
if(!StringUtils.isEmpty(req.getOrderId())){
boolQueryBuilder.must(QueryBuilders.termQuery("orderId",req.getOrderId()));
}
if(null!=req.getItemType() && req.getItemType()>=0){
boolQueryBuilder.must(QueryBuilders.termQuery("itemType",req.getItemType()));
}
if(!StringUtils.isEmpty(req.getCreateStartTime())){
boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").gte(req.getCreateStartTime()));
}
if(!StringUtils.isEmpty(req.getCreateEndTime())){
boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").lte(req.getCreateEndTime()));
}
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
if(!StringUtils.isEmpty(req.getItemTitle())){ //注意分页from()的参数并不是页码,而是偏移量,如页数为num时,偏移量=(num-1)* pageSize
sourceBuilder.from(0).size(2000);
}
sourceBuilder.sort(new FieldSortBuilder("createTime").order(SortOrder.DESC));
searchRequest.source(sourceBuilder);
searchRequest.searchType(SearchType.QUERY_THEN_FETCH);
SearchResponse searchResponse = transportClient.search(searchRequest).actionGet();
if(null==searchResponse || null==searchResponse.getHits() || searchResponse.getHits().totalHits<=0){
return null;
}
List<EsShopOrderItem> list = new ArrayList<>();
SearchHits hits = searchResponse.getHits();
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
EsShopOrderItem orderItem = JSON.parseObject(sourceAsString, EsShopOrderItem.class);
list.add(orderItem);
}
return list;
}
public List<String> queryOrderIdList(EsShopOrderItemRequestDTO req){
if(null==req){
return null;
}
List<EsShopOrderItem> shopOrderItems = this.queryAdvanced(req);
if(CommonUtil.isNull(shopOrderItems)){
return null;
}
return shopOrderItems.stream().map(a->a.getOrderId()).collect(Collectors.toList());
}
}
//附上shop_order_item的mapping配置:
put shop_order_item
{ "settings": { "analysis": { "analyzer": { "thai_analyzer": { "type": "custom", "tokenizer": "thai", "filter": [ "lowercase", "asciifolding" ] }, "caseSensitive": { "filter": "lowercase", "type": "custom", "tokenizer": "keyword" } } } },
"mappings": { "order_item": { "properties": { "orderId": { "type": "keyword" }, "parentOrderId": { "type": "keyword" }, "shopId": { "type": "keyword" },
"orderItemId": { "type": "keyword" }, "itemTitle": { "type": "text", "analyzer": "thai_analyzer", "search_analyzer": "thai_analyzer" }, "itemId": { "type": "keyword" }, "mainItemId": { "type": "keyword" }, "marketId": { "type": "keyword" }, "itemType": { "type": "integer" }, "quantity": { "type": "integer" }, "scoreTotal": { "type": "integer" },
"scoreAmount": { "type": "double" }, "chargeAmount": { "type": "double" }, "itemPrice": { "type": "double" }, "standardPrice": { "type": "double" }, "itemTotalAmount": { "type": "double" },
"couponTotalAmount": { "type": "double" }, "balanceDiscountAmount": { "type": "double" }, "actDiscountAmount": { "type": "double" },
"createTime": { "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd", "type": "date" }, "updateTime": { "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd", "type": "date" },
"createUserId": { "type": "keyword" }, "updateUserId": { "type": "keyword" }
} } } }
|