来源:我是码农,转载请保留出处和链接!

本文链接:http://www.54manong/?id=1214

1 ETL详解

1.1 ETL

https://wwwblogs/yjd_hycf_space/p/7772722.html

ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程。

ETL的设计分三部分:数据抽取、数据的清洗转换、数据的加载。

1.1.1 ElasticSearch

全文搜索引擎:http://www.ruanyifeng/blog/2017/08/elasticsearch.html

1.1.2 Kibana

通过Kibana,能够对Elasticsearch中的数据进行可视化并在Elastic Stack进行操作。

Kibana核心产品搭载了一批经典功能:柱状图、线状图、饼图、旭日图等。

https://www.elastic.co/cn/products/kibana

Kibana是一个针对Elasticsearch的开源分析及可视化平台,用来搜索、查看交互存储在Elasticsearch索引中的数据。使用Kibana,可以通过各种图表进行高级数据分析及展示

1.1.3 Logstash

https://www.elastic.co/cn/products/logstash

Logstash是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据存储到数据库。

Logstash支持各种输入选择,可以在同一时间从众多常用来源捕捉时间。能够以连续的流式传输方式,轻松地从日志、指标、Web应用、数据存储以及各种AWS服务采集数据。

Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。

2 ETL启动报错

2.0.1 启动检查没有通过

修改/etc/security/limits.conf

修改/etc/security/limits.d/90-nproc.conf

3 ETL实例

3.1 elasticsearch常用请求

3.1.1 查看索引目录(装有es的机器上执行)

curl "localhost:9200/_cat/indices?v"

3.1.2 创建customer索引

curl -XPUT "localhost:9200/customer?pretty"

3.1.3 给customer索引创建文档

curl -XPUT "localhost:9200/customer/external/1?pretty" -d '{"name":"TEST"}'

3.1.4 在索引上查找文档

curl -XGET "localhost:9200/customer/external/1?pretty"

3.1.5 修改索引下的文档

curl -XPOST "localhost:9200/customer/external/1/_update?pretty" -d ' { "doc": { "name": "Lenovo","Location":"Beijing" }}'

3.2 ELK日志采集

ELK+syslog+nginx访问日志收集+分词处理

http://blog.51cto/lrtao2010/1949334

https://www.2cto/kf/201610/560348.html

https://blog.csdn/qq_22211217/article/details/80764568

3.2.1 控制台输入,控制台输出测试

编辑配置文件

将集群中的logstash停止,然后通过命令启动并指定刚创建的配置文件。

logstash -f test.conf

3.2.2 集群中修改配置文件logstash-data-source

添加配置组

添加配置如下

检测/var/log/ambary-server/ambary-server.log日志文件

查看ElasticSearch索引

curl "node18.sleap:9200/_cat/indices?v"

查看kibana展示

http://node16.sleap:5601

3.2.3 问题

logstash配置文件如何编写(过滤部分)

http://wwwblogs/yincheng/p/logstash.html

kibana如何使用

https://www.elastic.co/cn/products/kibana

https://www.elastic.co/guide/cn/kibana/current/dashboard.html

ElasticSearch中数据如何查看

https://www.yiibai/elasticsearch/elasticsearch-getting-start.html

3.3 ELK与kafka整合

参考连接

https://sematext/blog/kafka-connect-elasticsearch-how-to/

https://wwwblogs/smartloli/p/6978645.html

https://www.confluent.io/blog/the-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/

http://wwwblogs/JetpropelledSnake/p/10057545.html

http://www.demodashi/demo/10181.html

https://blog.csdn/qq_37502106/article/details/79262721

3.3.1 日志处理流程

使用java把日志传入kafka,然后通过kafka将日志发送给logstash,logstash再将日志写入elasticsearch,这样elasticsearch就有了日志数据了,最后,则使用kibana将存放在elasticsearch中的日志数据显示出来,并且可以做实时的数据图表分析等等。

zookeeper查看leader节点:echo stat | nc node17.sleap 2181

3.3.2 kafka常用命令

https://wwwblogs/xtdxs/p/7112683.html

创建topic data1

kafka-topics --create --zookeeper node17.sleap:2181 --replication-factor 2 --partitions 1 --topic data2

查看所有topic

kafka-topics --list --zookeeper node17.sleap:2181

创建消费者消费topic data1

kafka-console-consumer --zookeeper node17.sleap:2181 --topic data2 --from-beginning

创建生产者

kafka-console-producer --broker-list node17.sleap:6667 --topic data2

3.3.3 查看ElasticSearch索引

curl "node18.sleap:9200/_cat/indices?v"

curl –XGET "node18.sleap:9200/kafka-logstash/_search"

3.3.4 kafka配置参数

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

https://segmentfault/a/1190000016595992

logstash高低版本配置不同:(具体配置可查看对应版本的官网配置介绍)

https://www.elastic.co/guide/en/logstash/index.html

logstash 5

bootstrap_servers => “node17.sleap:6667”

topics => [“data1”]

logstash 2.4

zk_connect => “node17.sleap:2181”

topic_id => “data1”

控制台输入,控制台输出stdin-stdout.conf

input {

  stdin {}

}

output {

  stdout {

    codec => rubydebug

  }

}

kafka输入数据到logstash

input-kafka.conf

input {

  kafka {

    bootstrap_servers => "10.110.181.39:6667"

    topics => ["data1"]

    type => "kafka.logstash"

  }

}

output {

  if [type] == "kafka.logstash" {

    stdout {

      codec => rubydebug

    }

  }

}

logstash输出数据到kafka

input {

  stdin {}

}

output {

  kafka {

    bootstrap_servers => "10.110.181.39:9092"

    topic_id => "data1"

  }

}

采集ambari-server.log到elasticsearch

# test ambari log

ambari-log.conf

input {

  file {

    path => "/var/log/ambari-server/ambari-server.log"

    start_position => beginning

    type => "ambari.log"

  }

}

filter {

 

}

output {

  if [type] == "ambari.log" {

    elasticsearch {

      hosts => ["node17.sleap:9200","node18.sleap:9200"]

      index => "ambari-log"

    }

  }

}

查看logstash插件版本是否与kafka版本兼容。

./bin/logstash-plugin list –verbose

logstash-input-kafka (5.1.7)

logstash-output-kafka (5.1.6)

logstash_LEAP3.4.4.0-5.4.1+ldh1.2.0+c0001-b0048.el6.x86_64.rpm

kafka_LEAP3.4.4.0-0.9.0+ldh1.2.0+c0001-b0061.el6.x86_64.rpm

elasticsearch_LEAP3.4.4.0-5.4.1+ldh1.2.0+c0001-b0061.NOARCH.rpm

https://cwiki.apache/confluence/display/KAFKA/Compatibility+Matrix

kafka版本不兼容:Kafka Connect: versions <= 0.10.0 or >= 0.10.2

plugins-inputs-kafka配置

https://www.elastic.co/guide/en/logstash/5.4/plugins-inputs-kafka.html

3.3.5 kafka问题: Error reading field 'topics'

https://blog.csdn/badyting/article/details/56667873

3.4 hive数据导入ElasticSearch

https://blog.csdn/qianshangding0708/article/details/50388750

http://www.voidcn/article/p-ftrfzdop-bqu.html

3.5 ElasticSearch数据导入hive

https://blog.csdn/shan1369678/article/details/51331296

4 本地验证

centos7 下kafka的安装介绍:

https://segmentfault/a/1190000012990954

centos7 安装部署ELK 6.2.4:

http://blog.51cto/andyxu/2124697

Logstash连接kafka指南

https://wdxtub/2016/08/18/logstash-kafka-guide/

启动zookeeper

zookeeper-server-start.sh kafka_2.12-1.0.0/config/zookeeper.properties > /dev/null 2>&1 &

启动kafka

kafka-server-start.sh kafka_2.12-1.0.0/config/server.properties > /dev/null 2>&1 &

创建topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic data1

查看topic

kafka-topics.sh --list --zookeeper localhost:2181

创建消费者

kafka-console-consumer.sh  --zookeeper 10.110.181.50:2181 --topic data1 --from-beginning

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

创建生产者

kafka-console-producer.sh --broker-list localhost:9092 --topic test

删除topic

kafka-topics.sh --zookeeper localhost:2181 --topic data1

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf

查看elasticsearch的index

查看input-kafka的内容

/usr/share/logstash/bin/logstash -f input-kafka.conf 

5 ELK实例

kafka接收数据,导入到logstash,过滤出特定ip日志,存储在hive。

更多推荐

ETL入门教程