Create a Hive table directly from a DataFrame and write the DataFrame as one of its partitions
test 1
```scala
table
  .write
  .format("hive")
  .mode("overwrite")
  .option("path", inputPath + "_table")
  .insertInto(tableName)
```
This fails: the table has to be created first.
```
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: hdp_lbg_ectech_ads.zp_compensate_ad_detail_test1;
```
test 2
First, check whether the table exists.
```scala
import org.apache.spark.sql.functions.col

def checkTableExist(tableName: String): Boolean = {
  // https://www.mikulskibartosz.name/save-spark-dataframe-as-partitioned-hive-table/
  val tableDbName = tableName.split("\\.")
  // Dog.print(tableName, tableDbName.mkString(".."))
  SparkEnvir.ss.sql(s"SHOW TABLES IN ${tableDbName(0)}")
    .where(col("tableName") === tableDbName(1))
    .count() == 1
}
```
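On newer Spark versions the same check can also be done through the catalog API; a minimal sketch, assuming Spark 2.1+ where `Catalog.tableExists(tableName)` is available (the helper name is made up):

```scala
import org.apache.spark.sql.SparkSession

// Alternative existence check via the catalog API (assumes Spark 2.1+).
// Accepts a fully qualified "db.table" name, same as checkTableExist above.
def checkTableExistViaCatalog(ss: SparkSession, tableName: String): Boolean =
  ss.catalog.tableExists(tableName)
```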
If the table exists, insert into it directly; otherwise create it.
ss.sql("SET hive.exec.dynamic.partition=true")
ss.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
ss.sql("SET hive.exec.max.dynamic.partitions.pernode=400")
val writer: DataFrameWriter[Row] = table
.write
.mode("append") // overwrite
.format("parquet") // difference of hive and parquet(different with format(SERDE, INPUT, OUTPUT))
.option("path", outputTablePath) // external table
val tableExist = SparkUtils.checkTableExist(tableName)
if (tableExist) {
Dog.print(s"[insertInto(tableName)] ${tableName}")
writer.insertInto(tableName)
} else {
Dog.print(s"[saveAsTable(tableName)] ${tableName}")
writer
.partitionBy("dt")
.saveAsTable(tableName)
}
|
Creating the table this way works fine: partition directories are generated automatically under path as path/partition_name=xx.
But insertInto(tableName) keeps hanging, and the cause has not been tracked down yet; possibly an issue with the external path?
Also note that creating the table needs the partition information, while insertInto does not: it picks up the partition columns from the table metadata. See the references below.
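One more thing worth checking when insertInto misbehaves: it matches columns by position rather than by name, and for dynamic-partition inserts the partition column has to come last. A minimal sketch of aligning the DataFrame before the insert, assuming dt is the partition column (the helper name is made up):

```scala
import org.apache.spark.sql.DataFrame

// insertInto resolves columns by position against the target table schema,
// so move the partition column ("dt" here) to the end before inserting.
def alignForInsert(df: DataFrame, partitionCol: String = "dt"): DataFrame = {
  val dataCols = df.columns.filterNot(_ == partitionCol)
  df.select((dataCols :+ partitionCol).map(df.col): _*)
}

// alignForInsert(table).write.mode("append").insertInto(tableName)
```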
test 3 (worked)
Create the table with saveAsTable, write the partition data with parquet, and then run MSCK REPAIR TABLE ${tableName} to refresh the mapping from partitions to data paths.
ss.sql("SET hive.exec.dynamic.partition=true")
ss.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
ss.sql("SET hive.exec.max.dynamic.partitions.pernode=400")
val writer: DataFrameWriter[Row] = table
.coalesce(1)
.write
.mode("append") // overwrite
.format("parquet") // difference of hive and parquet(different with format(SERDE, INPUT, OUTPUT))
.option("path", outputTablePath) // external table
.option("partitionOverwriteMode", "dynamic")
// .partitionBy("dt")
val tableExist = SparkUtils.checkTableExist(tableName)
if (tableExist) {
Dog.print(s"[insertInto(tableName)] ${tableName}")
writer.parquet(outputTablePath)
// writer.insertInto(tableName)
} else {
Dog.print(s"[saveAsTable(tableName)] ${tableName}")
writer.saveAsTable(tableName)
}
ss.sql(s"MSCK REPAIR TABLE ${tableName}")
val selectExample = s"SELECT * FROM ${tableName} WHERE dt=${date}"
Dog.print("[selectExample] " + selectExample)
ss.sql(selectExample).show()
SparkEnvir.close()
|
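After the MSCK REPAIR step it can be worth confirming that the new partition actually shows up in the metastore; a small check, reusing ss and tableName from above:

```scala
// List the partitions registered in the metastore for this table.
ss.sql(s"SHOW PARTITIONS ${tableName}").show(truncate = false)
```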
Others
1 Generate the CREATE TABLE SQL from the DataFrame's DDL
```scala
import org.apache.spark.sql.DataFrame

def getCreateTableSql(df: DataFrame, tableName: String, partitioned: Array[String]): String = {
  // https://stackoverflow.com/a/69365683/6494418
  val schemaDDL = df.schema.toDDL
  val partitionedStr = partitioned.map(_ + " STRING").mkString(",")
  val createTableSql = s"CREATE TABLE ${tableName} (${schemaDDL}) PARTITIONED BY (${partitionedStr})"
  Dog.print("[df.schema.fields] " + df.schema.fields.mkString(", "))
  Dog.print("[create table sql] " + createTableSql)
  Dog.print("[schemaDDL] " + schemaDDL)
  createTableSql
}
```
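A possible way to wire this helper into the flow above when the table is missing; this is only a sketch and reuses table, tableName and ss from the earlier examples:

```scala
// Create the table from the generated DDL, assuming "dt" is the partition column.
if (!SparkUtils.checkTableExist(tableName)) {
  val createTableSql = getCreateTableSql(table, tableName, Array("dt"))
  ss.sql(createTableSql)
}
```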
2 The difference between format("hive") and format("parquet") is the serialization; inspect the generated statement with SHOW CREATE TABLE tableName.
parquet
```sql
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
```
hive
```sql
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
```
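Both statements can also be pulled from Spark instead of the Hive CLI; a minimal sketch:

```scala
// Print the full CREATE TABLE statement, including the SERDE and the
// INPUT/OUTPUT formats, so the "hive" and "parquet" variants can be compared.
ss.sql(s"SHOW CREATE TABLE ${tableName}").show(truncate = false)
```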
3 Other approaches
Previously, writing one partition of a Hive table from a DataFrame was always done by hand with HiveQL:
```sql
---- CREATE TABLE IF NOT EXISTS
CREATE EXTERNAL TABLE IF NOT EXISTS ${table_name} (
  ${col_normal_str}
)
COMMENT "create by wangke09"
PARTITIONED BY (
  ${col_partition_str}
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY "${fields_terminated}"
  LINES TERMINATED BY "\n"
STORED AS TEXTFILE LOCATION "${table_location}";

---- LINK DATA
ALTER TABLE ${table_name}
ADD IF NOT EXISTS PARTITION(${partition_valued}) -- partition values separated by ",", e.g. dt='1', bg='2'
LOCATION "${partition_location}";
-- partition_location is linked to partition_valued (it does not need to follow table_location/1/2)

---- DROP TABLE IF EXISTS ${table_name};
---- SELECT * FROM ${table_name} WHERE ${partition_valued_and} ORDER BY RAND() LIMIT 500;
```
The core of it is ADD PARTITION, which links the external data. Tables can now be created and linked in the simpler ways above, but the following issues may come up (not verified):
- If the external path does not follow the standard layout created for the table, MSCK REPAIR may not be able to scan it (untested); in that case the only way to link the data is ADD PARTITION, as in the sketch after this list.
- parquet is a serialization format (like json, hive?, csv, etc.), not a text format, so the raw data cannot be viewed directly with hadoop fs -text.
- A table comment cannot be added when creating the table this way; the metadata has to be edited separately.
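For the first point, a fallback could look like the sketch below, which registers one partition by hand from Spark; the partition value and location are illustrative placeholders:

```scala
// Register a single partition explicitly and point it at its external path,
// for layouts that MSCK REPAIR cannot discover. Values below are made up.
val partitionDt = "20220101"                       // hypothetical partition value
val partitionLocation = s"${outputTablePath}/any"  // hypothetical non-standard path
ss.sql(
  s"""ALTER TABLE ${tableName}
     |ADD IF NOT EXISTS PARTITION (dt='${partitionDt}')
     |LOCATION '${partitionLocation}'""".stripMargin)
```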
4 Can the table be created directly from parquet?
5 NULL handling
Selecting all columns (*) without handling NULL values seems to be very slow; in the end the missing values were filled in.
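For the missing-value filling, DataFrameNaFunctions covers the common cases; a minimal sketch (column names and default values are made up):

```scala
// Fill NULLs before writing so later full-column scans do not hit them.
// "click_cnt" and "city" are illustrative column names, not from the real table.
val filled = table
  .na.fill(0L, Seq("click_cnt")) // numeric default
  .na.fill("", Seq("city"))      // string default
```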
json reader
json does not have ...
```scala
var rawData: DataFrame = ss.read.json(files: _*).repartition(numPartition).cache()
```
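If schema inference over many JSON files turns out to be slow, passing an explicit schema skips the extra pass over the data; a sketch with made-up fields:

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Illustrative explicit schema: ss.read.json normally scans the input once
// just to infer the schema, which this avoids.
val jsonSchema = StructType(Seq(
  StructField("dt", StringType),
  StructField("click_cnt", LongType)
))
val rawDataTyped = ss.read.schema(jsonSchema).json(files: _*)
  .repartition(numPartition)
  .cache()
```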
refs
- https://www.jianshu.com/p/c1b0dc86f9b0 msck repair
- https://www.mikulskibartosz.name/save-spark-dataframe-as-partitioned-hive-table/
- https://stackoverflow.com/a/31371571/6494418
- https://stackoverflow.com/a/48251020/6494418
- https://stackoverflow.com/a/45706191/6494418