Spark BulkLoad
About BulkLoad
Recommended reading: HBase BulkLoad
Version info
- scala: 2.11
- spark: 2.4.8
- hbase: 1.2.0
pom file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com</groupId>
    <artifactId>bulkload</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.8</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.8</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.8</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.32</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Scala compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run the shade goal on the package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>BulkLoad</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Sample code
Hive -> HBase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, KeyValue, TableName}
import org.apache.hadoop.hbase.client.Admin
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 *
 * @author 7e2hj
 * @date 2022-01-04
 * @version 1.0
 */
object BulkLoad {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("yarn")
      .appName("BulkLoad")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext
    val hBaseConf = getHBaseConf()          // custom helper method
    val hBaseConn = getHBaseConn(hBaseConf) // custom helper method
    val admin = hBaseConn.getAdmin
    val fileSystem = FileSystem.get(new Configuration())
    val permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
    // HFile output path
    val filePath = new Path("hFilePath")
    // if the HFile output path already exists, delete it
    if (fileSystem.exists(filePath)) {
      fileSystem.delete(filePath, true)
      print("deleted the existing path on HDFS")
    }
    val hTableName: TableName = TableName.valueOf("hBaseNameSpace", "hBaseTableName")
    val hTable = hBaseConn.getTable(hTableName)
    // column family of the target HBase table (placeholder, like the other names above)
    val hBaseFamilyName = "hBaseFamilyName"
    // read the data from the Hive table
    val sql = "select * from table"
    val df: DataFrame = spark.sql(sql)
    val data: RDD[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))] =
      df.rdd.map(row => {
        val rowKey = row.getString(0)
        val familyName = row.getString(1)
        val columnName = row.getString(2)
        val str = row.getString(3)
        (Bytes.toBytes(rowKey), (Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(str)))
      })
    println("Hive data prepared")
    // create the HBase table if it does not exist yet
    createOrNot(admin, hTableName, hBaseFamilyName)
    // configure the job used by HFileOutputFormat2
    val job = Job.getInstance(hBaseConf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    HFileOutputFormat2.configureIncrementalLoadMap(job, hTable)
    // generate the HFiles
    data
      .map(cell => {
        val keyValue = new KeyValue(cell._1, cell._2._1, cell._2._2, cell._2._3)
        (new ImmutableBytesWritable(cell._1), keyValue)
      })
      .sortBy(x => (x._1, x._2.getKeyString), ascending = true)
      .saveAsNewAPIHadoopFile(filePath.toString,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        job.getConfiguration)
    println("HFile generation finished")
    // fix the permissions on the output directory and files
    if (fileSystem.exists(filePath)) {
      setPermission(fileSystem, filePath, permission)
    }
    // move the HFiles into HBase
    val bulkLoader = new LoadIncrementalHFiles(hBaseConf)
    val regionLocator = hBaseConn.getRegionLocator(hTableName)
    bulkLoader.doBulkLoad(filePath, admin, hTable, regionLocator)
    println("bulk load finished")
    admin.close()
    hBaseConn.close()
    fileSystem.close()
    sc.stop()
  }
  // recursively apply the given permission to a path and everything under it
  def setPermission(fs: FileSystem, path: Path, permission: FsPermission): Unit = {
    try {
      fs.setPermission(path, permission)
      if (fs.getFileStatus(path).isDirectory) {
        val fileStatus: Array[FileStatus] = fs.listStatus(path)
        for (status <- fileStatus) {
          setPermission(fs, status.getPath, permission)
        }
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
    }
  }
  // create the HBase table with the given column family if it does not exist
  def createOrNot(admin: Admin, tableName: TableName, familyName: String): Unit = {
    if (!admin.tableExists(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      tableDesc.addFamily(new HColumnDescriptor(familyName.getBytes()))
      admin.createTable(tableDesc)
    }
  }
}
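The sample code calls two custom helpers, getHBaseConf and getHBaseConn, without showing them. A minimal sketch of what they might look like, assuming the connection is configured through the ZooKeeper quorum (the quorum and port values are placeholders for your cluster):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

// Build an HBase configuration; the quorum/port values are placeholders.
def getHBaseConf(): Configuration = {
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf
}

// Open a connection from the configuration.
def getHBaseConn(conf: Configuration): Connection = ConnectionFactory.createConnection(conf)

Creating a connection is relatively expensive, which is why the sample code reuses a single connection for the admin, the table, and the region locator.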
Common issues
Sorting issue
Error: Caused by: java.io.IOException: Added a key not lexically larger than previous
Cause: HFiles require cells to be written in order of rowkey + column family + column qualifier, so the data must be globally sorted on that composite key before the HFiles are generated.
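As a minimal sketch of one way to get that ordering, assuming the same (rowkey, (family, qualifier, value)) tuple layout as the sample code and ASCII keys (so string comparison matches byte-wise comparison):

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

// Sort cells by rowkey + column family + qualifier before writing HFiles.
// For binary rowkeys, sort on the raw bytes instead of the decoded strings.
def toSortedKeyValues(data: RDD[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))])
    : RDD[(ImmutableBytesWritable, KeyValue)] = {
  data
    .map { case (rowKey, (family, qualifier, value)) =>
      ((Bytes.toString(rowKey), Bytes.toString(family), Bytes.toString(qualifier)),
        (rowKey, family, qualifier, value))
    }
    .sortByKey()
    .map { case (_, (rowKey, family, qualifier, value)) =>
      (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, qualifier, value))
    }
}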
Permission issue
Error: Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=hbase, access=WRITE, inode="..."
Cause: the HFiles are generated by the hadoop user, while the user that moves them into HBase is hbase.
Solutions:
- run the job as the hbase user
- change the permissions on the HFile files and directories after they are generated (see the sketch after this list)
- disable HDFS permission checking
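For the second option, a minimal sketch that uses Hadoop's FsShell to recursively open up the HFile output directory before doBulkLoad; the directory argument is the same "hFilePath" placeholder used in the sample code, and whether 777 is acceptable depends on your cluster's policy:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FsShell

// Equivalent to `hdfs dfs -chmod -R 777 <hFileDir>`: lets the hbase user
// move the generated HFiles into the table during doBulkLoad.
def chmodHFileDir(conf: Configuration, hFileDir: String): Unit = {
  new FsShell(conf).run(Array("-chmod", "-R", "777", hFileDir))
}

This delegates the recursive chmod to FsShell instead of walking the directory tree by hand, which is what the setPermission helper in the sample code does.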
References
spark实现BulkLoad批量加载方式导入Hbase数据 - 夜空中最亮的仔 - 博客园 (cnblogs.com)