自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

大數(shù)據(jù)Hadoop之EFAK和Confluent KSQL簡單使用

大數(shù)據(jù) Hadoop
kafka 0.9.x以后的版本新增了advertised.listeners?配置,kafka 0.9.x以后的版本不要使用 advertised.host.name 和 advertised.host.port 已經(jīng)deprecatedhost.name 和 port 為 deprecated,使用?listeners代替。

一、EFAK概述和安裝

關(guān)于EFAK的概述和安裝,可以參考我這篇文章:大數(shù)據(jù)Hadoop之——Kafka 圖形化工具 EFAK(EFAK環(huán)境部署)在講EFAK使用之前,這里先講一下listeners 和 advertised.listeners的用法,其實企業(yè)里還是會配置著兩個參數(shù)的。

二、listeners和advertised.listeners配置詳解

kafka 0.9.x以后的版本新增了advertised.listeners配置,kafka 0.9.x以后的版本不要使用 advertised.host.name 和 advertised.host.port 已經(jīng)deprecatedhost.name 和 port 為 deprecated,使用listeners代替。

  • listeners:就是主要用來定義Kafka Broker的Listener的配置項,listeners是kafka真正bind的地址。
  • advertised.listeners:參數(shù)的作用就是將Broker的Listener信息發(fā)布到Zookeeper中,是暴露給外部的listeners,如果沒有設(shè)置,會用listeners。
  • listener.security.protocol.map:配置監(jiān)聽者的安全協(xié)議的,主要有以下幾種協(xié)議:
  1. PLAINTEXT => PLAINTEXT 不需要授權(quán),非加密通道
  2. SSL => SSL 使用SSL加密通道
  3. SASL_PLAINTEXT => SASL_PLAINTEXT 使用SASL認證非加密通道
  4. SASL_SSL => SASL_SSL 使用SASL認證并且SSL加密通道
  • inter.broker.listener.name:專門用于Kafka集群中Broker之間的通信。
listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
# 外網(wǎng)訪問192.168.0.113:19092,內(nèi)網(wǎng)訪問:192.168.0.113:19093,當kafka部署在k8s時候就很有用
listeners=EXTERNAL://192.168.0.113:19092,INTERNAL://192.168.0.113:19093
inter.broker.listener.name=INTERNAL
# 如果advertised.listeners沒配置就使用listeners的配置
#advertised.listeners=EXTERNAL://192.168.0.113:19092,INTERNAL://192.168.0.113:19093

使用場景

  • 只有內(nèi)網(wǎng):比如在公司搭建的 kafka 集群,只有內(nèi)網(wǎng)中的服務(wù)可以用,這種情況下,只需要用 listeners 就行。示例如下:
# listeners=<協(xié)議名稱>://<內(nèi)網(wǎng)ip>:<端口>
listeners=EXTERNAL://192.168.0.113:19092
  • 內(nèi)外網(wǎng):在 k8s 中或者 在類似阿里云主機上部署 kafka 集群,這種情況下是 需要用到 advertised_listeners。示例如下:
listeners=INSIDE://0.0.0.0:9092,OUTSIDE://<公網(wǎng) ip>:端口(或者 0.0.0.0:端口)
advertised.listeners=INSIDE://localhost:9092,OUTSIDE://<宿主機ip>:<宿主機暴露的端口>
listener.security.protocol.map=INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT
kafka_inter_broker_listener_name:inter.broker.listener.name=INSIDE

三、KSQL使用

KSQL是一個用于Apache kafka的流式SQL引擎,KSQL在內(nèi)部使用Kafka的Streams API,KSQL降低了進入流處理的門檻,提供了一個簡單的、完全交互式的SQL接口,用于處理Kafka的數(shù)據(jù),可以讓我們在流數(shù)據(jù)上持續(xù)執(zhí)行 SQL 查詢,KSQL支持廣泛的強大的流處理操作,包括聚合、連接、窗口、會話等等。官方文檔:https://www.rittmanmead.com/blog/2017/10/ksql-streaming-sql-for-apache-kafka/

1)KSQL架構(gòu)

GitHub地址:https://github.com/confluentinc/ksql

圖片

圖片

2)Confluent安裝(ZK/KAFKA/KSQL)

ksql支持kafka0.11之后的版本,在confluent的V3和V4版本中默認并沒有加入ksql server程序,當然V3和V4是支持ksql的,在V5版本之后已經(jīng)默認加入ksql了,這里選擇最新版本7.1。其實Confluent 就是kafka的增加版,包含了kafka和zk。

下載地址:https://packages.confluent.io/archive/

1、下載confluent

$ cd /opt/bigdata/hadoop/software
$ wget https://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz
$ tar -xf confluent-7.1.1.tar.gz -C /opt/bigdata/hadoop/server/

2、配置環(huán)境變量

$ vi /etc/profile
export CONFLUENT_HOME=/opt/bigdata/hadoop/server/confluent-7.1.1
export PATH=$CONFLUENT_HOME/bin:$PATH

$ source /etc/profile

3、創(chuàng)建log和data目錄

$ mkdir $CONFLUENT_HOME/etc/kafka/zookeeper_data $CONFLUENT_HOME/etc/kafka/zookeeper_logs $CONFLUENT_HOME/etc/kafka/logs

4、配置zk和kafka

  • 配置zk
$ cat > $CONFLUENT_HOME/etc/kafka/zookeeper.properties <<-EOF
# 配置主要修改如下:
#數(shù)據(jù)目錄
dataDir=/opt/bigdata/hadoop/server/confluent-7.1.1/etc/kafka/zookeeper_data
#日志目錄
# dataLogDir=/opt/bigdata/hadoop/server/confluent-7.1.1/etc/kafka/zookeeper_logs
#心跳間隔時間,zookeeper中使用的基本時間單位,毫秒值。每隔2秒發(fā)送一個心跳
tickTime=2000
#leader與客戶端連接超時時間。表示5個心跳間隔
initLimit=5
#Leader與Follower之間的超時時間,表示2個心跳間隔
syncLimit=2
#客戶端連接端口,默認端口2181
clientPort=12181
admin.enableServer=false
# admin.serverPort=8080
# zookeeper集群配置項,server.1,server.2,server.3是zk集群節(jié)點;hadoop-node1,hadoop-node2,hadoop-node3是主機名稱;2888是主從通信端口;3888用來選舉leader
server.1=hadoop-node1:2888:3888
server.2=hadoop-node2:2888:3888
server.3=hadoop-node3:2888:3888
EOF
  • 配置kafka
$ cat > $CONFLUENT_HOME/etc/kafka/server.properties <<-EOF
#broker的全局唯一編號,不能重復(fù)
broker.id=0

listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
# broker 服務(wù)器要監(jiān)聽的地址及端口 . 默認是 localhost:9092 ,0.0.0.0的話 ,表示監(jiān)聽本機的所有ip地址.本機配置:
# localhost : 只監(jiān)聽本機的地址請求, 客戶端也只能用 localhost 來請求
# 127.0.0.1 : 同localhost, 在請求上可能有與區(qū)分 , 看client的請求吧 . 客戶端也只能用127.0.0.1來請求
# 192.168.0.1 : 你的局域網(wǎng)不一定是 192.168 段的.所以一般不選這個
# 0.0.0.0 : 本機的所有地址都監(jiān)聽 , 包含 localhost , 127.0.0.1, 及不同網(wǎng)卡的所有ip地址 , 都監(jiān)聽 .
listeners=EXTERNAL://0.0.0.0:19092,INTERNAL://0.0.0.0:19093

# 是暴露給外部的listeners,如果沒有設(shè)置,會用listeners,參數(shù)的作用就是將Broker的Listener信息發(fā)布到Zookeeper中,注意其它節(jié)點得修改成本身的hostnaem或者ip,不支持0.0.0.0
advertised.listeners=EXTERNAL://hadoop-node1:19092,INTERNAL://hadoop-node1:19093

inter.broker.listener.name=INTERNAL

#刪除topic功能使能
delete.topic.enable=true
#處理網(wǎng)絡(luò)請求的線程數(shù)量
num.network.threads=3
#用來處理磁盤IO的現(xiàn)成數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka數(shù)據(jù)的存儲位置
log.dirs=/opt/bigdata/hadoop/server/confluent-7.1.1/etc/kafka/logs
#topic在當前broker上的分區(qū)個數(shù)
num.partitinotallow=1
#用來恢復(fù)和清理data下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.cnotallow=hadoop-node1:12181,hadoop-node2:12181,hadoop-node3:12181
#zookeeper連接超時時間
zookeeper.connection.timeout.ms=60000

EOF

3、把confluent copy到其它節(jié)點

$ scp -r $CONFLUENT_HOME hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r $CONFLUENT_HOME hadoop-node2:/opt/bigdata/hadoop/server/

【溫馨提示】其它幾點修改以下三點:

  • 記得要修改broker.id,把hadoop-node2上的broker.id設(shè)置1,把hadoop-node2上的broker.id設(shè)置2;
  • 設(shè)置環(huán)境變量
$ vi /etc/profile
export CONFLUENT_HOME=/opt/bigdata/hadoop/server/confluent-7.1.1
export PATH=$CONFLUENT_HOME/bin:$PATH

$ source /etc/profile
  • advertised.listeners地址修改成本機地址

4、設(shè)置zk myid

# 在hadoop-node1配置如下:
$ echo 1 > $CONFLUENT_HOME/etc/kafka/zookeeper_data/myid
# 在hadoop-node2配置如下:
$ echo 2 > $CONFLUENT_HOME/etc/kafka/zookeeper_data/myid
# 在hadoop-node3配置如下:
$ echo 3 > $CONFLUENT_HOME/etc/kafka/zookeeper_data/myid

5、修改ksql-server.properties文件

$ vi $CONFLUENT_HOME/etc/ksqldb/ksql-server.properties
#修改對應(yīng)的kafka的bootstrap server
bootstrap.servers=hadoop-node1:19092,hadoop-node2:19092,hadoop-node3:19092

6、開啟Kafka JMX監(jiān)控

# 在kafka-server-start文件中添加export JMX_PORT="9988",端口自定義就行
$ vi $CONFLUENT_HOME/bin/kafka-server-start

7、啟動服務(wù)(zk,kafka,ksql)

先停掉之前的kafka和zk

$ $KAFKA_HOME/bin/zookeeper-server-stop.sh
$ $KAFKA_HOME/bin/kafka-server-stop.sh

啟動zk(必須先啟動zk再啟動kafka)

$ cd $CONFLUENT_HOME
$ bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties

啟動kafka

$ cd $CONFLUENT_HOME
$ bin/kafka-server-start -daemon etc/kafka/server.properties

啟動KSQL server

# 默認端口8088,可以修改listeners字段來修改port
$ cd $CONFLUENT_HOME
$ ./bin/ksql-server-start ./etc/ksqldb/ksql-server.properties
# 后臺啟動
$ ./bin/ksql-server-start -daemon ./etc/ksqldb/ksql-server.properties

圖片

啟動ksql cli端

$ cd $CONFLUENT_HOME
$ ./bin/ksql http://0.0.0.0:8088

圖片

【溫習提示】其實也可以使用外部的zk和kafka

3)KSQL簡單使用

1、table和stream概述

  • stream:stream是一個無序的數(shù)據(jù)結(jié)構(gòu),stream中發(fā)生的事件是不可改變的,已經(jīng)被認定為事實,新的事件加入到這個stream中,現(xiàn)有的事實都不會改變。Streams可以從Kafka的topic創(chuàng)建出來,也可以從現(xiàn)有streams中派生出來。streams的基礎(chǔ)數(shù)據(jù)在Kafka的broker里的topic持久保存(持久化)。
  • table:table是stream的一個視圖,表示不斷變化的事實的集合。 它相當于傳統(tǒng)的數(shù)據(jù)庫表,但通過流式語義(如窗口)進行了豐富。 table中的事實是可變的,這意味著可以將新事實插入表中,同時可以更新或刪除現(xiàn)有事實。 可以從Kafka主題創(chuàng)建table,也可以從現(xiàn)有streams和tables中派生。 在這兩種情況下,table的基礎(chǔ)數(shù)據(jù)都在Kafka的broker里的topic中持久存儲(持久化)。

2、通過ksql-datagen工具創(chuàng)建topic和data

confluent自帶了一個ksql-datagen工具,可以創(chuàng)建和產(chǎn)生相關(guān)的topic和數(shù)據(jù),ksql-datagen可以指定的參數(shù)如下:

$ ksql-datagen

圖片

$ cd $CONFLUENT_HOME
$ ./bin/ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500 bootstrap-server=hadoop-node1:19092

ps:以上命令會源源不斷在stdin上輸出數(shù)據(jù),就是工具自己產(chǎn)生的數(shù)據(jù),如下樣例

[1653124249561L] --> ([ 1653124249561L | 'User_3' | 'Page_82' ]) ts:1653124249561
[1653124249561L] --> ([ 1653124249561L | 'User_9' | 'Page_24' ]) ts:1653124249561
[1653124249561L] --> ([ 1653124249561L | 'User_9' | 'Page_91' ]) ts:1653124249561
[1653124249561L] --> ([ 1653124249561L | 'User_2' | 'Page_61' ]) ts:1653124249561

ps:不過使用consumer消費出來的數(shù)據(jù)是如下樣式:

1653124249561L,User_3,Page_82
1653124249561L,User_9,Page_24
1653124249561L,User_9,Page_91
1653124249561L,User_2,Page_61

創(chuàng)建users,數(shù)據(jù)格式為json

$ cd $CONFLUENT_HOME
$ ./bin/ksql-datagen quickstart=users format=json topic=users maxInterval=100 bootstrap-server=hadoop-node1:19092

ps:以上命令會源源不斷在stdin上輸出數(shù)據(jù),就是工具自己產(chǎn)生的數(shù)據(jù),如下樣例:

['User_6'] --> ([ 1495933043739L | 'User_6' | 'Region_8' | 'OTHER' ]) ts:1653124467578
['User_3'] --> ([ 1489611795658L | 'User_3' | 'Region_7' | 'MALE' ]) ts:1653124467578
['User_5'] --> ([ 1496009798562L | 'User_5' | 'Region_2' | 'MALE' ]) ts:1653124467578

不過使用consumer消費出來的數(shù)據(jù)是如下樣式:

{"registertime":1495933043739L,"userid":"User_6","regionid":"Region_8","gender":"OTHER"}
{"registertime":1489611795658L,"userid":"User_3","regionid":"Region_7","gender":"MALE"}
{"registertime":1496009798562L,"userid":"User_5","regionid":"Region_2","gender":"MALE"}

3、創(chuàng)建stream和table

  • 創(chuàng)建stream

根據(jù)topic pageviews創(chuàng)建一個stream pageviews_original,value_format為DELIMITED

$ cd $CONFLUENT_HOME
$ ./bin/ksql http://0.0.0.0:8088

# 【溫馨提示】value_format 有三種格式 JSON(json格式)、DELIMITED(原生格式)、AVRO(Avro 格式是 Hadoop 的一種基于行的存儲格式)
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');

# 查看表詳情
DESCRIBE pageviews_original;
# 刪表
DROP STREAM pageviews_original;
# 查看topic
SHOW topics;
# 查看STREAMS
SHOW STREAMS;
  • 創(chuàng)建table

根據(jù)topic users創(chuàng)建一個table users_original,value_format為json,必須設(shè)置一個為主鍵,也可以指定副本很分區(qū)數(shù),默認都是1,, PARTITIONS=1, REPLICAS=1

CREATE TABLE users_original (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY) WITH (kafka_topic='users', value_format='JSON')

圖片

  • 創(chuàng)建持久查詢
# 上面創(chuàng)建的表時不能直接查詢數(shù)據(jù)的
SELECT * FROM USERS_ORIGINAL LIMIT 3;
CREATE TABLE QUERYABLE_USERS_ORIGINAL AS SELECT * FROM USERS_ORIGINAL;

# 查詢數(shù)據(jù)
SELECT * FROM QUERYABLE_USERS_ORIGINAL LIMIT 5;

圖片

  • Kafka Connect 接收器
CREATE SINK CONNECTOR es_sink WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'topics' = 'clicks_transformed',
'key.ignore' = 'true',
'schema.ignore' = 'true',
'type.name' = '',
'connection.url' = 'http://hadoop-node1:9200');

4、持久化查詢

持久化查詢可以源源不斷的把查詢出的數(shù)據(jù)發(fā)送到你指定的topic中去,查詢的時候在select前面添加create stream關(guān)鍵字即可創(chuàng)建持久化查詢。

# 生成數(shù)據(jù)
$ cd $CONFLUENT_HOME
$ ./bin/ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500 bootstrap-server=hadoop-node1:19092

# 先刪除
DROP STREAM pageviews_original;
# 創(chuàng)建查詢
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');

# 直接查
select * from pageviews_original limit 10;

# 持久化查詢,PO會對應(yīng)一個topic
CREATE STREAM PO AS SELECT userid FROM pageviews_original EMIT CHANGES;
# 查詢新stream
SHOW STREAMS;
# 查詢執(zhí)行任務(wù)
SHOW QUERIES;

圖片

消費新數(shù)據(jù)

$ kafka-console-consumer --bootstrap-server 192.168.0.113:19092 --from-beginning --topic PO

終止查詢?nèi)蝿?wù)

SHOW QUERIES;
TERMINATE CSAS_PV_15;

四、Mock(生產(chǎn)者)

這個就是模擬測試的功能,就是推送數(shù)據(jù)到kafka topic,其實就是生產(chǎn)者。一般作為測試用。

圖片

消費數(shù)據(jù)

$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:19092 --topic test001 --from-beginning

圖片

五、Manager

添加、查看、刪除topic配置等功能,操作也很簡單

圖片

六、Hub

topic數(shù)據(jù)遷移或是數(shù)據(jù)平衡

圖片

責任編輯:武曉燕 來源: 大數(shù)據(jù)與云原生技術(shù)分享
相關(guān)推薦

2018-07-11 13:33:43

大數(shù)據(jù)人工智能Hadoop

2015-05-05 11:18:18

大數(shù)據(jù)Hadoop技術(shù)處理

2013-05-06 10:22:28

大數(shù)據(jù)Hadoop

2012-12-03 23:19:12

Etu 知意圖大數(shù)據(jù)一體機

2014-11-11 10:47:19

hadoop數(shù)據(jù)流

2022-10-24 00:26:51

大數(shù)據(jù)Hadoop存儲層

2014-07-29 10:38:25

大數(shù)據(jù)Hadoop

2016-12-20 18:21:29

Hadoop大數(shù)據(jù)面試

2021-12-14 09:56:51

HadoopSparkKafka

2012-05-31 15:56:23

Hadoop大數(shù)據(jù)

2017-10-25 14:15:55

大數(shù)據(jù)Hadoop維度建模

2022-07-20 15:10:38

Docker大數(shù)據(jù)平臺

2011-10-09 13:37:25

大數(shù)據(jù)BigDataHadoop

2021-06-21 17:04:55

大數(shù)據(jù)Hadoop

2015-08-10 09:23:05

2012-08-08 09:53:23

HadoopMapReduce

2015-07-23 14:29:28

大數(shù)據(jù)sparkhadoop

2017-10-26 09:31:14

Hadoop維度建模Kimball

2020-04-22 14:34:42

大數(shù)據(jù)Hadoop技術(shù)

2013-04-12 10:56:31

大數(shù)據(jù)
點贊
收藏

51CTO技術(shù)棧公眾號