自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Kafka實(shí)時(shí)數(shù)據(jù)即席查詢應(yīng)用與實(shí)踐

大數(shù)據(jù)
在定位一些實(shí)時(shí)數(shù)據(jù)的Case時(shí),如果沒有對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行歷史歸檔,在排查問題時(shí),沒有日志追述,會(huì)很難定位是哪個(gè)環(huán)節(jié)的問題。因此,我們需要對(duì)處理的這些實(shí)時(shí)數(shù)據(jù)進(jìn)行記錄歸檔并存儲(chǔ)。

一、背景

Kafka中的實(shí)時(shí)數(shù)據(jù)是以Topic的概念進(jìn)行分類存儲(chǔ),而Topic的數(shù)據(jù)是有一定時(shí)效性的,比如保存24小時(shí)、36小時(shí)、48小時(shí)等。

二、內(nèi)容

2.1 案例分析

這里以i視頻和vivo短視頻實(shí)時(shí)數(shù)據(jù)為例,之前存在這樣的協(xié)作問題:

數(shù)據(jù)上游內(nèi)容方提供實(shí)時(shí)Topic(存放i視頻和vivo短視頻相關(guān)實(shí)時(shí)數(shù)據(jù)),數(shù)據(jù)側(cè)對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行邏輯處理后,發(fā)送給下游工程去建庫實(shí)時(shí)索引,當(dāng)任務(wù)執(zhí)行一段時(shí)間后,工程側(cè)建索引偶爾會(huì)提出數(shù)據(jù)沒有發(fā)送過去的Case,前期由于沒有對(duì)數(shù)據(jù)做存儲(chǔ),在定位問題的時(shí)候會(huì)比較麻煩,經(jīng)常需求查看實(shí)時(shí)日志,需要花費(fèi)很長(zhǎng)的時(shí)間來分析這些Case是出現(xiàn)在哪個(gè)環(huán)節(jié)。

為了解決這個(gè)問題,我們可以將實(shí)時(shí)Topic中的數(shù)據(jù),在發(fā)送給其他Topic的時(shí)候,添加跟蹤機(jī)制,進(jìn)行數(shù)據(jù)分流,Sink到存儲(chǔ)介質(zhì)(比如HDFS、Hive等)。這里,我們選擇使用Hive來進(jìn)行存儲(chǔ),主要是查詢方便,支持SQL來快速查詢。如下圖所示:

圖片

在實(shí)現(xiàn)優(yōu)化后的方案時(shí),有兩種方式可以實(shí)現(xiàn)跟蹤機(jī)制,它們分別是Flink SQL寫Hive、Flink DataStream寫Hive。接下來,分別對(duì)這兩種實(shí)現(xiàn)方案進(jìn)行介紹和實(shí)踐。

2.2 方案一:Flink SQL寫Hive

這種方式比較直接,可以在Flink任務(wù)里面直接操作實(shí)時(shí)Topic數(shù)據(jù)后,將消費(fèi)后的數(shù)據(jù)進(jìn)行分流跟蹤,作為日志記錄寫入到Hive表中,具體實(shí)現(xiàn)步驟如下:

  • 構(gòu)造Hive Catalog;
  • 創(chuàng)建Hive表;
  • 寫入實(shí)時(shí)數(shù)據(jù)到Hive表。

2.2.1 構(gòu)造Hive Catalog

在構(gòu)造Hive Catalog時(shí),需要初始化Hive的相關(guān)信息,部分代碼片段如下所示:

// 設(shè)置執(zhí)行環(huán)境
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
 StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings);
 // 構(gòu)造 Hive Catalog 名稱
 String name = "video-hive-catalog";
 // 初始化數(shù)據(jù)庫名
 String defaultDatabase = "comsearch";
 // Hive 配置文件路徑地址
 String hiveConfDir = "/appcom/hive/conf";
 // Hive 版本號(hào)
 String version = "3.1.2";
 // 實(shí)例化一個(gè) HiveCatalog 對(duì)象
 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
 // 注冊(cè)HiveCatalog
 tEnv.registerCatalog(name, hive);
 // 設(shè)定當(dāng)前 HiveCatalog
 tEnv.useCatalog(name);
 // 設(shè)置執(zhí)行SQL為Hive
 tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
 // 使用數(shù)據(jù)庫
 tEnv.useDatabase("db1");

在以上代碼中,我們首先設(shè)置了 Flink 的執(zhí)行環(huán)境和表環(huán)境,然后創(chuàng)建了一個(gè) HiveCatalog,并將其注冊(cè)到表環(huán)境中。

2.2.2 創(chuàng)建Hive表

如果Hive表不存在,可以通過在程序中執(zhí)行建表語句,具體SQL見表語句代碼如下所示:

-- 創(chuàng)建表語句 
tEnv.executeSql("CREATE TABLE IF NOT EXISTS TABLE `xxx_table`(
  `content_id` string,
  `status` int)
PARTITIONED BY (
  `dt` string,
  `h` string,
  `m` string)
stored as ORC
TBLPROPERTIES (
  'auto-compaction'='true',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)")

在創(chuàng)建Hive表時(shí)我們使用了IF NOT EXISTS關(guān)鍵字,如果Hive中該表不存在會(huì)自動(dòng)在Hive上創(chuàng)建,也可以提前在Hive中創(chuàng)建好該表,F(xiàn)link SQL中就無需再執(zhí)行建表SQL,因?yàn)橛昧薍ive的Catalog,F(xiàn)link SQL運(yùn)行時(shí)會(huì)找到表。這里,我們?cè)O(shè)置了auto-compaction屬性為true,用來使小文件自動(dòng)合并,1.12版的新特性,解決了實(shí)時(shí)寫Hive產(chǎn)生的小文件問題。同時(shí),指定metastore值是專門用于寫入Hive的,也需要指定success-file值,這樣CheckPoint觸發(fā)完數(shù)據(jù)寫入磁盤后會(huì)創(chuàng)建_SUCCESS文件以及Hive metastore上創(chuàng)建元數(shù)據(jù),這樣Hive才能夠?qū)@些寫入的數(shù)據(jù)可查。

2.2.3 寫入實(shí)時(shí)數(shù)據(jù)到Hive表

在準(zhǔn)備完成2.2.1和2.2.2中的步驟后,接下來就可以在Flink任務(wù)中通過SQL來對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行操作了,具體實(shí)現(xiàn)代碼片段如下所示:

// 編寫業(yè)務(wù)SQL
 String insertSql = "insert into  xxx_table SELECT content_id, status, " +
                    " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM xxx_rt";
 // 執(zhí)行 Hive SQL
 tEnv.executeSql(insertSql);
 // 執(zhí)行任務(wù)
 env.execute();

將消費(fèi)后的數(shù)據(jù)進(jìn)行分類,編寫業(yè)務(wù)SQL語句,將消費(fèi)的數(shù)據(jù)作為日志記錄,發(fā)送到Hive表進(jìn)行存儲(chǔ),這樣Kafka中的實(shí)時(shí)數(shù)據(jù)就存儲(chǔ)到Hive了,方便使用Hive來對(duì)Kafka數(shù)據(jù)進(jìn)行即席分析。

2.2.4 避坑技巧

使用這種方式在處理的過程中,如果配置使用的是EventTime,在程序中配置'sink.partition-commit.trigger'='partition-time',最后會(huì)出現(xiàn)無法提交分區(qū)的情況。經(jīng)過對(duì)源代碼PartitionTimeCommitTigger的分析,找到了出現(xiàn)這種異常情況的原因。

我們可以通過看

org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitionsorg.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions

中的一個(gè)函數(shù),來說明具體的問題,部分源代碼片段如下:

// PartitionTimeCommitTigger源代碼函數(shù)代碼片段
@Override
public List<String> committablePartitions(long checkpointId) {
 if (!watermarks.containsKey(checkpointId)) {
  throw new IllegalArgumentException(String.format(
    "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
    checkpointId, watermarks));
 }
 long watermark = watermarks.get(checkpointId);
 watermarks.headMap(checkpointId, true).clear();
 
 List<String> needCommit = new ArrayList<>();
 Iterator<String> iter = pendingPartitions.iterator();
 while (iter.hasNext()) {
  String partition = iter.next();
  // 通過分區(qū)的值來獲取分區(qū)的時(shí)間
  LocalDateTime partTime = extractor.extract(
    partitionKeys, extractPartitionValues(new Path(partition)));
  // 判斷水印是否大于分區(qū)創(chuàng)建時(shí)間+延遲時(shí)間
  if (watermark > toMills(partTime) + commitDelay) {
   needCommit.add(partition);
   iter.remove();
  }
 }
 return needCommit;
}

通過分析上述代碼片段,我們可以知道系統(tǒng)通過分區(qū)值來抽取相應(yīng)的分區(qū)來創(chuàng)建時(shí)間,然后進(jìn)行比對(duì),比如我們?cè)O(shè)置的時(shí)間 pattern 是 '$dt $h:$m:00' , 某一時(shí)刻我們正在往 /2022-02-26/18/20/ 這個(gè)分區(qū)下寫數(shù)據(jù),那么程序根據(jù)分區(qū)值,得到的 pattern 將會(huì)是2022-02-26 18:20:00,這個(gè)值在SQL中是根據(jù) DATA_FORMAT 函數(shù)獲取的。

而這個(gè)值是帶有時(shí)區(qū)的,比如我們的時(shí)區(qū)設(shè)置為東八區(qū),2022-02-26 18:20:00這個(gè)時(shí)間是東八區(qū)的時(shí)間,換成標(biāo)準(zhǔn) UTC 時(shí)間是減去8個(gè)小時(shí),也就是2022-02-26 10:20:00,而在源代碼中的 toMills 函數(shù)在處理這個(gè)東八區(qū)的時(shí)間時(shí),并沒有對(duì)時(shí)區(qū)進(jìn)行處理,把這個(gè)其實(shí)應(yīng)該是東八區(qū)的時(shí)間當(dāng)做了 UTC 時(shí)間來處理,這樣計(jì)算出來的值就比實(shí)際值大8小時(shí),導(dǎo)致一直沒有觸發(fā)分區(qū)的提交。

如果我們?cè)跀?shù)據(jù)源中構(gòu)造的分區(qū)是 UTC 時(shí)間,也就是不帶分區(qū)的時(shí)間,那么這個(gè)邏輯就是沒有問題的,但是這樣又不符合我們的實(shí)際情況,比如對(duì)于分區(qū)2022-02-26 18:20:00,我希望我的分區(qū)肯定是東八區(qū)的時(shí)間,而不是比東八區(qū)小8個(gè)小時(shí)的UTC時(shí)間2022-02-26 10:20:00。

在明白了原因之后,我們就可以針對(duì)上述異常情況進(jìn)行優(yōu)化我們的實(shí)現(xiàn)方案,比如自定義一個(gè)分區(qū)類、或者修改缺省的時(shí)間分區(qū)類。比如,我們使用TimeZoneTableFunction類來實(shí)現(xiàn)一個(gè)自定義時(shí)區(qū),部分參考代碼片段如下:

public class CustomTimeZoneTableFunction implements TimeZoneTableFunction {
  private transient DateTimeFormatter formatter;
  private String timeZoneId;
  public CustomTimeZoneTableFunction(String timeZoneId) {
    this.timeZoneId = timeZoneId;
  }
  @Override
  public void open(FunctionContext context) throws Exception {
    // 初始化 DateTimeFormatter 對(duì)象
    formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:00");
    formatter = formatter.withZone(ZoneId.of(timeZoneId));
  }
  @Override
  public void eval(Long timestamp, Collector<TimestampWithTimeZone> out) {
    // 將時(shí)間戳轉(zhuǎn)換為 LocalDateTime 對(duì)象
    LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC);
    // 將 LocalDateTime 對(duì)象轉(zhuǎn)換為指定時(shí)區(qū)下的 LocalDateTime 對(duì)象
    LocalDateTime targetDateTime = localDateTime.atZone(ZoneId.of(timeZoneId)).toLocalDateTime();
    // 將 LocalDateTime 對(duì)象轉(zhuǎn)換為 TimestampWithTimeZone 對(duì)象,并輸出到下游
    out.collect(TimestampWithTimeZone.fromLocalDateTime(targetDateTime, ZoneId.of(timeZoneId)));
  }
}

2.3 方案二:Flink DataStream寫Hive

在一些特殊的場(chǎng)景下,F(xiàn)link SQL如果無法實(shí)現(xiàn)我們復(fù)雜的業(yè)務(wù)需求,那么我們可以考慮使用Flink DataStream寫Hive這種實(shí)現(xiàn)方案。比如如下業(yè)務(wù)場(chǎng)景,現(xiàn)在需要實(shí)現(xiàn)這樣一個(gè)業(yè)務(wù)需求,內(nèi)容方將實(shí)時(shí)數(shù)據(jù)寫入到Kafka消息隊(duì)列中,然后由數(shù)據(jù)側(cè)通過Flink任務(wù)消費(fèi)內(nèi)容方提供的數(shù)據(jù)源,接著對(duì)消費(fèi)的數(shù)據(jù)進(jìn)行分流處理(這里的步驟和Flink SQL寫Hive的步驟類似),每分鐘進(jìn)行存儲(chǔ)到HDFS(MapReduce任務(wù)需要計(jì)算和重跑HDFS數(shù)據(jù)),然后通過MapReduce任務(wù)將HDFS上的這些日志數(shù)據(jù)生成Hive所需要格式,最后將這些Hive格式數(shù)據(jù)文件加載到Hive表中。實(shí)現(xiàn)Kafka數(shù)據(jù)到Hive的即席分析功能,具體實(shí)現(xiàn)流程細(xì)節(jié)如下圖所示:

圖片

具體核心實(shí)現(xiàn)步驟如下:

  • 消費(fèi)內(nèi)容方Topic實(shí)時(shí)數(shù)據(jù);
  • 生成數(shù)據(jù)預(yù)處理策略;
  • 加載數(shù)據(jù);
  • 使用Hive SQL對(duì)Kafka數(shù)據(jù)進(jìn)行即席分析。

2.3.1 消費(fèi)內(nèi)容方Topic實(shí)時(shí)數(shù)據(jù)

編寫消費(fèi)Topic的Flink代碼,這里不對(duì)Topic中的數(shù)據(jù)做邏輯處理,在后面統(tǒng)一交給MapReduce來做數(shù)據(jù)預(yù)處理,直接消費(fèi)并存儲(chǔ)到HDFS上。具體實(shí)現(xiàn)代碼如下所示:

public class Kafka2Hdfs {
    public static void main(String[] args) {
        // 判斷參數(shù)是否有效
        if (args.length != 3) {
            LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist.");
            return;
        }
        // 初始化Kafka連接地址和HDFS存儲(chǔ)地址以及Flink并行度
        String bootStrapServer = args[0];
        String hdfsPath = args[1];
        int parallelism = Integer.parseInt(args[2]);
 
        // 實(shí)例化一個(gè)Flink任務(wù)對(duì)象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(parallelism);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Flink消費(fèi)Topic中的數(shù)據(jù)
        DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_topic", new SimpleStringSchema(), configByKafkaServer(bootStrapServer)));
        // 實(shí)例化一個(gè)HDFS存儲(chǔ)對(duì)象
        BucketingSink<String> sink = new BucketingSink<>(hdfsPath);
        // 自定義存儲(chǔ)到HDFS上的文件名,用小時(shí)和分鐘來命名,方便后面算策略
        sink.setBucketer(new DateTimeBucketer<String>("HH-mm"));
        // 設(shè)置存儲(chǔ)HDFS的文件大小和存儲(chǔ)文件時(shí)間頻率
        sink.setBatchSize(1024 * 1024 * 4);
        sink.setBatchRolloverInterval(1000 * 30);
        transction.addSink(sink);
        env.execute("Kafka2Hdfs");
    }
    // 初始化Kafka對(duì)象連接信息
    private static Object configByKafkaServer(String bootStrapServer) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootStrapServer);
        props.setProperty("group.id", "test_bll_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}

注意事項(xiàng):

  • 這里我們把時(shí)間窗口設(shè)置小一些,每30s做一次Checkpoint,如果該批次的時(shí)間窗口沒有數(shù)據(jù)過來,就生成一個(gè)文件落地到HDFS上;
  • 另外,我們重寫了Bucketer為DateTimeBucketer,邏輯并不復(fù)雜,在原有的方法上加一個(gè)年-月-日/時(shí)-分的文件生成路徑,例如在HDFS上的生成路徑:xxxx/2022-02-26/00-00。

具體DateTimeBucketer實(shí)現(xiàn)代碼如下所示:

public class DateMinuteBucketer implements Bucketer<String> {
    private SimpleDateFormat baseFormatDay = new SimpleDateFormat("yyyy-MM-dd");
    private SimpleDateFormat baseFormatMin = new SimpleDateFormat("HH-mm");
    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        return new Path(basePath + "/" + baseFormatDay.format(new Date()) + "/" + baseFormatMin.format(new Date()));
    }
}

2.3.2 生成數(shù)據(jù)預(yù)處理策略

這里,我們需要對(duì)落地到HDFS上的文件進(jìn)行預(yù)處理,處理的邏輯是這樣的。比如,現(xiàn)在是2022-02-26 14:00,那么我們需要將當(dāng)天的13:55,13:56,13:57,13:58,13:59這最近5分鐘的數(shù)據(jù)處理到一起,并加載到Hive的最近5分鐘的一個(gè)分區(qū)里面去。那么,我們需要生成這樣一個(gè)邏輯策略集合,用HH-mm作為key,與之最近的5個(gè)文件作為value,進(jìn)行數(shù)據(jù)預(yù)處理合并。具體實(shí)現(xiàn)代碼步驟如下:

  • 步驟一:獲取小時(shí)循環(huán)策略;
  • 步驟二:獲取分鐘循環(huán)策略;
  • 步驟三:判斷是否為5分鐘的倍數(shù);
  • 步驟四:對(duì)分鐘級(jí)別小于10的數(shù)字做0補(bǔ)齊(比如9補(bǔ)齊后變成09);
  • 步驟五:對(duì)小時(shí)級(jí)別小于10的數(shù)字做0補(bǔ)齊(比如1補(bǔ)齊后變成01);
  • 步驟六:生成時(shí)間范圍;
  • 步驟七:輸出結(jié)果。

其中,主要的邏輯是在生成時(shí)間范圍的過程中,根據(jù)小時(shí)和分鐘數(shù)的不同情況,生成不同的時(shí)間范圍,并輸出結(jié)果。在生成時(shí)間范圍時(shí),需要注意前導(dǎo)0的處理,以及特殊情況(如小時(shí)為0、分鐘為0等)的處理。最后,將生成的時(shí)間范圍輸出即可。

根據(jù)上述步驟編寫對(duì)應(yīng)的實(shí)現(xiàn)代碼,生成當(dāng)天所有日期命名規(guī)則,預(yù)覽部分結(jié)果如下:

圖片

需要注意的是,如果發(fā)生了第二天00:00,那么我們需要用到前一天的00-00=>23-59,23-58,23-57,23-56,23-55這5個(gè)文件中的數(shù)據(jù)來做預(yù)處理。

2.3.3 加載數(shù)據(jù)

在完成2.3.1和2.3.2里面的內(nèi)容后,接下來,我們可以使用Hive的load命令直接加載HDFS上預(yù)處理后的文件,把數(shù)據(jù)加載到對(duì)應(yīng)的Hive表中,具體實(shí)現(xiàn)命令如下:

-- 加載數(shù)據(jù)到Hive表
load data inpath '<hdfs_path_hfile>' overwrite into table xxx.table partition(day='2022-02-26',hour='14',min='05')

2.3.4 即席分析

之后,我們使用Hive SQL來對(duì)Kafka數(shù)據(jù)進(jìn)行即席分析,示例SQL如下所示:

-- 查詢某5分鐘分區(qū)數(shù)據(jù)
select * from xxx.table where day='2022-02-26' and hour='14' and min='05'

2.4 Flink SQL與 Flink DataStream如何選擇

Flink SQL 和 Flink DataStream 都是 Flink 中用于處理數(shù)據(jù)的核心組件,我們可以根據(jù)自己實(shí)際的業(yè)務(wù)場(chǎng)景來選擇使用哪一種組件。

Flink SQL 是一種基于 SQL 語言的數(shù)據(jù)處理引擎,它可以將 SQL 查詢語句轉(zhuǎn)換為 Flink 的數(shù)據(jù)流處理程序。相比于 Flink DataStream,F(xiàn)link SQL 更加易于使用和維護(hù),同時(shí)具有更快的開發(fā)速度和更高的代碼復(fù)用性。Flink SQL 適用于需要快速開發(fā)和部署數(shù)據(jù)處理任務(wù)的場(chǎng)景,比如數(shù)據(jù)倉庫、實(shí)時(shí)報(bào)表、數(shù)據(jù)清洗等。

Flink DataStream API是Flink數(shù)據(jù)流處理標(biāo)準(zhǔn)API,SQL是Flink后期版本提供的新的數(shù)據(jù)處理操作接口。SQL的引入為提高了Flink使用的靈活性??梢哉J(rèn)為Flink SQL是一種通過字符串來定義數(shù)據(jù)流處理邏輯的描述語言。

因此,在選擇 Flink SQL 和 Flink DataStream 時(shí),需要根據(jù)具體的業(yè)務(wù)需求和數(shù)據(jù)處理任務(wù)的特點(diǎn)來進(jìn)行選擇。如果需要快速開發(fā)和部署任務(wù),可以選擇使用 Flink SQL;如果需要進(jìn)行更為深入和定制化的數(shù)據(jù)處理操作,可以選擇使用 Flink DataStream。同時(shí),也可以根據(jù)實(shí)際情況,結(jié)合使用 Flink SQL 和 Flink DataStream 來完成復(fù)雜的數(shù)據(jù)處理任務(wù)。

三、 總結(jié)

在實(shí)際應(yīng)用中,Kafka實(shí)時(shí)數(shù)據(jù)即席查詢可以用于多種場(chǎng)景,如實(shí)時(shí)監(jiān)控、實(shí)時(shí)報(bào)警、實(shí)時(shí)統(tǒng)計(jì)、實(shí)時(shí)分析等。具體應(yīng)用和實(shí)踐中,需要注意以下幾點(diǎn):

  • 數(shù)據(jù)質(zhì)量:Kafka實(shí)時(shí)數(shù)據(jù)即席查詢需要保證數(shù)據(jù)質(zhì)量,避免數(shù)據(jù)重復(fù)、丟失或錯(cuò)誤等問題,需要進(jìn)行數(shù)據(jù)質(zhì)量監(jiān)控和調(diào)優(yōu)。
  • 系統(tǒng)復(fù)雜性:Kafka實(shí)時(shí)數(shù)據(jù)即席查詢需要涉及到多個(gè)系統(tǒng)和組件,包括Kafka、數(shù)據(jù)處理引擎(比如Flink)、查詢引擎(比如Hive)等,需要對(duì)系統(tǒng)進(jìn)行配置和管理,增加了系統(tǒng)的復(fù)雜性。
  • 安全性:Kafka實(shí)時(shí)數(shù)據(jù)即席查詢需要加強(qiáng)數(shù)據(jù)安全性保障,避免數(shù)據(jù)泄露或數(shù)據(jù)篡改等安全問題,做好Hive的權(quán)限管控。
  • 性能優(yōu)化:Kafka實(shí)時(shí)數(shù)據(jù)即席查詢需要對(duì)系統(tǒng)進(jìn)行性能優(yōu)化,包括優(yōu)化數(shù)據(jù)處理引擎、查詢引擎等,提高系統(tǒng)的性能和效率。
責(zé)任編輯:龐桂玉 來源: vivo互聯(lián)網(wǎng)技術(shù)
相關(guān)推薦

2023-10-13 07:25:50

2021-07-22 18:29:58

AI

2023-08-29 10:20:00

2022-05-23 13:30:48

數(shù)據(jù)胡實(shí)踐

2023-05-06 07:19:48

數(shù)倉架構(gòu)技術(shù)架構(gòu)

2023-12-11 08:00:00

架構(gòu)FlinkDruid

2021-11-30 07:49:00

大數(shù)據(jù)工具 Presto

2018-08-23 07:40:58

Spark流處理數(shù)據(jù)流

2023-10-11 14:37:21

工具開發(fā)

2022-07-14 15:29:26

數(shù)據(jù)庫實(shí)踐

2021-07-29 08:00:00

開源數(shù)據(jù)技術(shù)

2021-09-13 13:46:29

Apache HudiB 站數(shù)據(jù)湖

2022-06-27 09:09:34

快手Flink數(shù)倉建設(shè)

2023-07-27 07:44:07

云音樂數(shù)倉平臺(tái)

2022-07-07 10:19:05

數(shù)據(jù)畫像

2021-08-31 10:18:34

Flink 數(shù)倉一體快手

2023-11-21 08:11:48

Kafka的分區(qū)策略

2020-05-29 17:10:15

數(shù)據(jù)架構(gòu)數(shù)據(jù)一切數(shù)據(jù)體系

2020-04-28 11:04:51

數(shù)據(jù)架構(gòu)互聯(lián)網(wǎng)Flink

2024-12-19 09:45:24

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)