上一章节我们了解了 shuffle 相关的概念和原理后其实可以发现一个问题,那就是 shuffle 比较容易造成数据倾斜的情况。 例如上一节我们看到的图,在这批数据中,hello 这个单词的行占据了绝大部分,当我们执行 groupByKey 的时候触发了 shuffle。这时候大部分的数据 (Hello) 都汇集到了一个 partition 上。这种极端的情况就会造成著名的长尾现象,就是说由于大部分数据都汇集到了一个 partition 而造成了这个 partition 的 task 运行的十分慢。而其他的 task 早已完成,整个任务都在等这个大尾巴 task 的结束。 这种现象破坏了分布式计算的设计初衷,因为最终大部分的计算任务都在一个单点上执行了。所以极端的数据分布就成为了机器学习和大数据处理这类产品的劲敌,我跟我司的研发人员聊的时候,他们也觉得数据倾斜的情况比较难处理,当然我们可以做 repartition(重新分片) 来重新整合 parition 的数量和分布等操作,以及避免或者减少 shuffle 的成本,各家不同的业务有不同的做法。在做这类产品的性能测试的时候,也跟我们以往的互联网模式不同,产品的压力不在于并发量上,而在于数据量和数据分布上。
造数工具
一般我们需要模拟以下这些情况的数据:
- 数据拥有大量的分片
- 数据倾斜
- 宽表
- 空表
- 空行
- 空文件
- 中文行和中文列
- 超长列名
- 包含特殊字符的数据
下面是造数工具的架构图:
解释一下原理:
- 通过spark-submit把分布式计算任务提交到集群中, 可以是hadoop集群可以是spark自建的集群也可以是k8s集群, 其目的就是利用分布式计算的原理, 把造数任务分布在多个机器上进行处理。 最后汇总数据落盘到分布式存储设备中。 所以我们需要把造数逻辑编写到spark脚本中
- 在落盘的时候,面对不同的业务可以选择只输出到一个存储设备中, 但如果面对的是比较复杂的大数据或者机器学习系统, 那么有可能会需要把一份数据双写或者多写到不同的存储设备中, 而在spark中也是有这样的连接器帮助我们对接各种存储设备。
- 如果数据没有构造的特别庞大,也可以像上图那样把数据压缩导出到磁盘中,下一次换个环境使用的时候可以原封不动的再导入进去, 但是这样做的前提是数据不能非常庞大。
下面给一个使用pyspark构造数据的DEMO:
from pyspark import SparkContext, SparkConf, SQLContext
import random
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
def choice_one_random(sequence):
return random.choice(sequence)
def choice_one_with_weights(sequence, weight):
return random.choices(sequence, weights=weight)[0]
def randomInt(start, end):
return random.randint(start, end)
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
title = ['程序员', '教师', '测试人员', '产品经理']
gender = ['男性', '女性']
gender_weights = weights = [0.6, 0.4]
nums = sc.parallelize(range(10000))
rdd = nums.map(
lambda x: (
randomInt(1, 100),
choice_one_random(title),
choice_one_with_weights(gender, gender_weights)))
schema = StructType([
StructField("age", IntegerType(), True),
StructField("title", StringType(), True),
StructField("gender", StringType(), True),
])
data = sqlContext.createDataFrame(rdd, schema=schema)
data.show()
原理:
- 通过 range 在内存中生成足够数量元素的 list
- 通过 rdd.map 函数遍历每一个元素, 然后按照规则生成自己需要的数据
- 转成 dataframe 并保存到存储设备中
可以参考上面的DEMO来完成自己的造数任务, 可以看到python的库中有很多好用的功能, 比如上面我通过random库不仅可以生成随机的数据, 也可以给存储一个列表,让数据生成的时候从这个列表中选一个并且给不同的值不同的权重来控制数据的分布, 这就可以造出数据倾斜的场景。
下面给一个用java实现的例子(实际的项目中我是用java的,下面就是一个项目中的构造程序):
package yuanhang;
import generator.field.random.RandomDateField;
import generator.field.random.RandomIntField;
import generator.field.random.RandomScopeField;
import generator.field.random.RandomStringField;
import generator.table.XRange;
import generator.utils.DateUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.sql.Date;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.spark.util.LongAccumulator;
import java.time.LocalDate;
import java.util.Random;
/**
*/
public class event {
public static void main(String[] args) {
// SparkConf conf = new SparkConf().setAppName("data produce")
// .setMaster("local");
SparkConf conf = new SparkConf().setAppName("data produce");
JavaSparkContext sc = new JavaSparkContext(conf);
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.getOrCreate();
// SparkContext sparkSC = spark.sparkContext();
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("uin", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("app_key", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("event_time", DataTypes.DateType, true));
fields.add(DataTypes.createStructField("event_code", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("ds", DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField("i001", DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField("i002", DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField("s001", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("s002", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("s003", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("s004", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("s005", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("s006", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("s007", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("s008", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("d001", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
final LongAccumulator accumulator = sc.sc().longAccumulator();
// LocalDate startDate = LocalDate.of(2023, 5, 1);
LocalDate startDate = LocalDate.of(2022, 10, 30);
// LocalDate beginDate = LocalDate.of(2022, 5, 1);
LocalDate endDate = LocalDate.of(2023, 10, 30);
//default time zone
ZoneId defaultZoneId = ZoneId.systemDefault();
RandomScopeField event_codes = new RandomScopeField();
event_codes.getValues().add("app_jhapp_search_res");
event_codes.getValues().add("app_jhapp_tab_switch");
event_codes.getValues().add("app_jhapp_applnch");
event_codes.getValues().add("app_jhapp_search_ck");
event_codes.getValues().add("app_jhapp_search_page_imp");
event_codes.getValues().add("app_jhapp_search_page_ck");
event_codes.getValues().add("app_jhapp_explore_subtab_imp");
event_codes.getValues().add("app_jhapp_carousels_imp");
event_codes.getValues().add("app_jhapp_carousels_intrct");
event_codes.getValues().add("app_jhapp_first_open");
event_codes.getValues().add("app_jhapp_content_detail_imp");
event_codes.getValues().add("app_jhapp_content_detail_interact");
event_codes.getValues().add("app_jhapp_tab_imp");
while (!startDate.isAfter(endDate)) {
// System.out.println(startDate);
// List data1 = new XRange(1000);
List data1 = new XRange(274000);
JavaRDD distData = sc.parallelize(data1, 20);
DateFormat dateformat = new SimpleDateFormat("yyyyMMddhh");
Date date = new Date(java.util.Date.from(startDate.atStartOfDay(defaultZoneId).toInstant()).getTime());
// Date bDate = new Date(java.util.Date.from(beginDate.atStartOfDay(defaultZoneId).toInstant()).getTime());
// Date eDate = new Date(java.util.Date.from(endDate.atStartOfDay(defaultZoneId).toInstant()).getTime());
int ds = Integer.parseInt(dateformat.format(date));
JavaRDD rowRDD = distData.map( record ->{
RandomIntField userId = new RandomIntField();
userId.setMax(1000000);
userId.setMin(1);
// Date date = DateUtil.randomDate("2023-05-01", "2023-06-19");
RandomIntField r = new RandomIntField();
r.setMin(1);
r.setMax(100);
int i001 = Integer.parseInt(r.gen().toString());
int i002 = Integer.parseInt(r.gen().toString());
String s001 = "C"+ r.gen();
String s002 = "当前一级板" + r.gen();
String s003 = "去向一级板块" + r.gen();
String s004 = "去向一级板块" + r.gen();
String s005 = "去向一级板块" + r.gen();
String s006 = "去向一级板块" + r.gen();
String s007 = "2022090701";
String s008 = "2023090701";
double d001 = Double.parseDouble(r.gen().toString());
return RowFactory.create(userId.gen().toString(), "0WEB05LD02D5FL6K",date,event_codes.gen(),ds,i001,i002,s001,s002,s003,s004,s005,s006,s007,s008,d001);
});
Dataset dataset =spark.createDataFrame(rowRDD, schema);
dataset.show();
// DataFrameWriter writer = new DataFrameWriter(dataset);
String jdbcUrl = "jdbc:clickhouse://clickhouse-hs:8123/beacon_olap";
// String jdbcUrl = "jdbc:clickhouse://10.27.20.122:8123/beacon_olap";
Properties ckProperties = new Properties();
// ckProperties.setProperty("user", "beidou");
ckProperties.setProperty("user", "default");
ckProperties.setProperty("password", "QdFx@00700!*");
// ckProperties.setProperty("password", "Beidou@qidian");
ckProperties.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver");
// ckProperties.setProperty("dbtable", "event_record_240");
// ckProperties.setProperty("batchsize", "50000");
// ckProperties.setProperty("isolationLevel", "NONE");
// ckProperties.setProperty("numPartitions", "12");
// ckProperties.setProperty("url", "jdbc:clickhouse://clickhouse-hs:8123/beacon_olap");
dataset.write().option("batchsize", "50000")
.option("isolationLevel", "NONE")
.option("numPartitions", "10")
.option("truncate", "true")
.option("compression", "snappy")
.mode(SaveMode.Append)
.jdbc(jdbcUrl, "event_record_273", ckProperties);
// dataset.write().mode(SaveMode.Append).jdbc(jdbcUrl, "event_record_240", ckProperties);
startDate = startDate.plusDays(1);
}
}
}
生成海量小文件
上一篇中也提到了海量的小文件是所有分布式存储设备的天敌, 我们之前在做一个车企的项目的时候, 也是要为客户搭建一个人工智能系统, 但是客户那边有20亿张图片的数据量需要做模型的训练推理(主要是计算机视觉方向,所以都是图片数据)。 所以专门引入了一个适合存储小文件的分布式存储设备(比如可以用ceph), 这时候就需要测试在这样庞大的文件数量下,不仅仅是存储系统,还有我们的产品本身是否能符合客户的性能需要。
海量小文件的构建与之前所讲的构造方式完全不同, spark可以造大量的数据,但这些数据都是在少数文件中的, 它无法构建海量的小文件, 这是因为在spark中每个parition(这里可以理解为一个小文件, 因为如果一个比较大的数据被切分成了很多很小的文件, 那么即便这个文件只有1k,在它读取到内存的时候也会当成一个partition处理)都会生成一个独立的task来计算, 一个task可以理解为一个线程。 所以当文件数量过多时,spark就会启动非常多的线程争抢cpu资源。所以不仅仅是分布式存储系统, 在分布式计算本身,过多的文件数量都是噩梦,试想一下,当一个文件只有100w条数据,但是每条数据都单独保存在一个文件中。 这时候spark就要开启100w个线程来处理这个数据,这是多么可怕的事情。
所以以前的构造方式是无法满足我们的需要的。 就要引入另外一种机制 -- 异步IO,这是一种利用少量线程就可以支撑大并发量的技术。 因为我们常见的普通的同步IO是无法满足我们的需要的,它有如下的缺点:
- IO:不管是生成图片,还是把与数据库交互都会消耗网络和磁盘 io, 20 亿张图片对于 IO 的考验是比较大的
- CPU:如果按照传统的思路,为了提升造数性能, 会开很多个线程来并发生成图片,计算元数据和数据交互。 但线程开的太多 CPU 的上下文切换也很损耗性能。 尤其我们使用的是 ssd 磁盘,多线程的模式可能是无法最大化利用磁盘 IO 的。
后面经过讨论, 最后的方案是使用 golang 语言, 用协程 + 异步 IO 来进行造数:
- 首先 golang 语言的 IO 库都是使用 netpoll 进行优化过的,netpoll 底层用的就是 epoll。 这种异步 IO 技术能保证用更少的线程处理更多的文件。 关于 epoll 为什么性能好可以参考这篇文章:https://www.cnblogs.com/Hijack-you/p/13057792.html 也可以去查一下同步 IO 和异步 IO 的区别。 我大概总结一下就是, 传统的多线程 + 同步 IO 模型是开多个线程抗压力, 每个线程同一时间只处理一个 IO。 一旦 IO 触发开始读写文件线程就会处于阻塞状态, 这个线程就会从 CPU 队列中移除也就是切换线程。 等 IO 操作结束了再把线程放到 CPU 队列里让线程继续执行下面的操作,而异步 IO 是如果一个线程遇到了 IO 操作,它不会进入阻塞状态, 而是继续处理其他的事, 等到那个 IO 操作结束了再通知程序(通过系统中断),调用回调函数处理后面的事情。 这样就保证了异步 IO 的机制下可以用更少的线程处理更多的 IO 操作。 这是为什么异步 IO 性能更好的原因,也是为什么异步 IO 能最大化利用磁盘性能。 比如在这个造图片的场景里, 我在内存中造好图片后,开始写入文件系统, 如果是同步 IO 那这时候就要阻塞了,直到文件写入完毕线程才会继续处理, 但因为我用的异步 IO, 调用玩函数让内存中的数据写入到文件就不管了, 直接开始造下一张图片, 什么时候 IO 结束通知我我在回过头来处理。 所以是可以用更少的线程来完成更多的 IO 操作也就是异步 IO 能很容易的把磁盘性能打满。 我自己测试的时候再自己的笔记本上造 100k 的图片, 大概是 1s 就能造 1W 张图片
- 其次 golang 的 GMP 模型本身就很高效, 编写异步程序也非常简单。 我也就花了一上午就把脚本写完了。
- 最后利用 k8s 集群把造数任务调度到集群中, 充分利用分布式计算的优势, 在多台机器上启动多个造数任务共同完成。
代码实现:
package main
import (
"crypto/md5"
"encoding/hex"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"os"
"path"
"sync"
"time"
)
var (
fileQueue = make(chan FileInfo, 1000) // 缓冲队列,用来存储文件key, 方便后面的协程取出来插入数据库
sourceFiles = []string{"asfdf.png"} // 复制的源文件
sourceFileCache sync.Map // 缓存源文件内容, 避免每次copy都重新读取源文件
)
const (
destDir = "file" // 需要复制的目录路径
copyNumber = 10 // 每个协程需要copy的文件数量
)
// 文件 信息
type FileInfo struct {
key string
createdAt time.Time
}
// 生成随机字符串
func GetRandomString(n int) string {
str := "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
bytes := []byte(str)
var result []byte
for i := 0; i < n; i++ {
result = append(result, bytes[rand.Intn(len(bytes))])
}
return string(result)
}
func copyFile(src, dst string) ([]byte, int64, error) {
var input []byte
if data, ok := sourceFileCache.Load(src); ok {
input = data.([]byte)
} else {
data, err := ioutil.ReadFile(src)
if err != nil {
return []byte{}, 0, err
}
input = data
sourceFileCache.Store(src, input)
}
err := ioutil.WriteFile(dst, input, 0644)
if err != nil {
return []byte{}, 0, err
}
fi, err := os.Stat(dst)
if err != nil {
return []byte{}, 0, err
}
return input, fi.Size(), nil
}
func copyFiles(wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < copyNumber; i++ {
// 随机种子
rand.Seed(time.Now().UnixNano())
// 从源文件中选择一个文件进行copy
sourceFileCount := len(sourceFiles)
sourceFilePath := sourceFiles[rand.Intn(sourceFileCount)]
// 生成随机的文件名称
fileName := GetRandomString(30) + ".jpg"
destFilePath := path.Join(destDir, fileName)
data, size, err := copyFile(sourceFilePath, destFilePath)
if err != nil {
fmt.Printf("copyFile file from %s to %s err, the message is %s", sourceFilePath, destFilePath, err.Error())
}
key, err := NewUploadFileKey("superadmin", fileName, md5V(string(data)), size)
if err != nil {
fmt.Printf("gen file key error, the message is %s", err.Error())
}
fileQueue <- FileInfo{
key: key,
createdAt: time.Now(),
}
}
}
func parseBasenameFromURI(uri string) (string, error) {
r, _ := http.NewRequest("GET", uri, nil)
return path.Base(r.URL.Path), nil
}
func NewUploadFileKey(uin, filename, hash string, size int64) (string, error) {
str := fmt.Sprintf("uin:%s-hash:%s-size:%d", uin, hash, size)
basename, err := parseBasenameFromURI(filename)
if err != nil {
return "", err
}
partHash := md5V(str)
result := md5V(fmt.Sprintf("hash:%s-name:%s", partHash, filename))
content := md5V(fmt.Sprintf("%s-%s-%s", result[8:10], result[10:], basename))
u := fmt.Sprintf("%s%s/%s/%s-%s", "upload/", result[8:10], content[8:10], result[10:], basename)
return u, nil
}
func md5V(str string) string {
h := md5.New()
h.Write([]byte(str))
return hex.EncodeToString(h.Sum(nil))
}
func main() {
var wg1 sync.WaitGroup
wg1.Add(10)
for i := 0; i < 10; i++ {
go copyFiles(&wg1)
}
// 等待所有复制文件的协程结束
go func() {
wg1.Wait()
// 关闭chan, 通知插入数据的协程, 文件都已经复制完毕
close(fileQueue)
fmt.Println("关闭通道")
}()
// 使用10个线程来插入数据库
var wg2 sync.WaitGroup
wg2.Add(10)
for i := 0; i < 10; i++ {
go insert(&wg2)
}
wg2.Wait()
fmt.Println("数据生成完毕")
}
// 从chan中取出文件key插入数据库
func insert(wg *sync.WaitGroup) {
defer wg.Done()
var cache []FileInfo
for fileInfo := range fileQueue {
if len(cache) < 1000 {
cache = append(cache, fileInfo)
//fmt.Println("数据没到1000条, 继续缓存")
} else {
// todo 将1000条数据插入到数据库中
fmt.Println("积累了1k个文件, 开始插入数据库")
}
}
// todo for循环结束说明队列已经被关闭, 所有文件都copy完毕这时候需要缓存中剩余的记录一块插入到数据库中
fmt.Println("通道已经关闭, 现在开始把剩余的插入到数据库中")
//fmt.Println(cache)
for _, fileInfo := range cache{
fmt.Println(fileInfo.key)
}
}
因为golang原生就支持异步IO,实现起来最简单便选择了golang语言,对于go语言不熟悉的同学也可以查找一下python语言的异步io库
更多内容欢迎来到我的知识星球: