背景:
项目属于部署在内网环境,不对互联网开放访问,因为项目的特殊性,会涉及到与很多的业务系统进行数据对接,但又由于其他业务系统建设比较早,早已过了运维期,要改造几乎不可能,所以对方给过来的数据,基本上是csv或txt格式的数据,而且单个文件大小也是很大,csv数据量至少100w级别,txt文件的数据量更大,达到了千万级。
这就给我们的业务系统带了一个技术挑战,要读取这些文件,且要快速入库。
读取数据可能就一个通用方法,以文件流的形式一行行读取,我的开发笔记本100w数据,全部读完大概是4s的样子,以此推测1000w行数据40s,本来就是读文件流的方式,速度上跟机器性能有一些关系,理论上服务器的性能肯定要比开发笔记本要快几倍。
读完数据是第一步,第二步就是最关键的问题了,如果是100w数据,通过循环一行行插表,你觉得时间要多久?那么1000w行数据呢,可能你会傻眼。
那么数据写入,可能大家会想到几个方案:
方案一:最懒的方式(拿到数据集,循环一条条插表)
这个方案对于数据量不大(1w以内),你可以这样偷懒的去做,应该问题不大,但是对于超过100w的数据集,这种方案去处理,可能业务系统就要出“事故”了。
方案二:把数据集进行分段拆分,分批次批量插表
什么是分段拆分,这么说吧,假如现在又10w数据集,我们可以按照5000行来进行分段拆分,也就是说这10w行数据,按照5000行为一段来进行拆分,10w数据也就是拆分成20段,然后分20次批量插表,每次批量插入5000行,这样也能节省一些数据库的开销。
方案三:基于方案二的原理,再进行升华,以多线程的方式去分批处理
不好理解?这么捋一下,首先还是和方案二一样,把数据进行分段拆分,具体拆分的数据行可以根据数据总量来拆,拆完后,把每个拆完后的数据集用多线程去处理,这样就不要每次等着上一个分段的数据集处理完,程序再处理下一个,多线程的方式下可以并行处理多个分段的数据集,这样在效率上又能比方案二上快。
多线程是实现并发机制的一种有效手段。进程和线程一样,都是实现并发的一个基本单位。线程是比进程更小的执行单位,线程是进程的基础之上进行进一步的划分。所谓多线程是指一个进程在执行过程中可以产生多个更小的程序单元,这些更小的单元称为线程,这些线程可以同时存在,同时运行,一个进程可能包含多个同时执行的线程。
在 Java 中实现多线程有两种手段,一种是继承 Thread 类,另一种就是实现 Runnable 接口。下面我们就分别来介绍这两种方式的使用。
实现 Runnable 接口
class MyThread implements Runnable{ // 实现Runnable接口,作为线程的实现类
private String name ; // 表示线程的名称
public MyThread(String name){
this.name = name ; // 通过构造方法配置name属性
}
public void run(){ // 覆写run()方法,作为线程 的操作主体
for(int i=0;i<10;i++){
System.out.println(name + "运行,i = " + i) ;
}
}
};
public class RunnableDemo01{
public static void main(String args[]){
MyThread mt1 = new MyThread("线程A ") ; // 实例化对象
MyThread mt2 = new MyThread("线程B ") ; // 实例化对象
Thread t1 = new Thread(mt1) ; // 实例化Thread类对象
Thread t2 = new Thread(mt2) ; // 实例化Thread类对象
t1.start() ; // 启动多线程
t2.start() ; // 启动多线程
}
};
**
**
继承 Thread 类
class MyThread extends Thread{ // 继承Thread类,作为线程的实现类
private String name ; // 表示线程的名称
public MyThread(String name){
this.name = name ; // 通过构造方法配置name属性
}
public void run(){ // 覆写run()方法,作为线程 的操作主体
for(int i=0;i<10;i++){
System.out.println(name + "运行,i = " + i) ;
}
}
};
public class ThreadDemo02{
public static void main(String args[]){
MyThread mt1 = new MyThread("线程A ") ; // 实例化对象
MyThread mt2 = new MyThread("线程B ") ; // 实例化对象
mt1.start() ; // 调用线程主体
mt2.start() ; // 调用线程主体
}
};
从程序可以看出,现在的两个线程对象是交错运行的,哪个线程对象抢到了 CPU 资源,哪个线程就可以运行,所以程序每次的运行结果肯定是不一样的,在线程启动虽然调用的是 start() 方法,但实际上调用的却是 run() 方法定义的主体。
这是一个读取csv的方式代码,txt文件网上有很多读取方式,这里就不列举。
读csv的依赖,pom文件
<dependency>
<groupId>net.sourceforge.javacsv</groupId>
<artifactId>javacsv</artifactId>
<version>2.0</version>
</dependency>
测试类ReadAndWriterCsvFlie
public class ReadAndWriterCsvFlie {
// 需要写入的 csv 文件路径
public static final String WRITE_CSV_FILE_PATH = "D:\\ftptest\\20210727.csv";
// 线程池
public static ExecutorService executorService = new ThreadPoolExecutor(20, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
/**
* 读取 csv 文件
*/
public static void readCsvFile(String readCsvFilePath) {
try {
// 创建 CSV Reader 对象, 参数说明(读取的文件路径,分隔符,编码格式)
CsvReader csvReader = new CsvReader(readCsvFilePath, ',', Charset.forName("UTF8"));
// 跳过表头
csvReader.readHeaders();
Date start = new Date();
List<String> list = Lists.newArrayList();
List<String> tempList = Lists.newArrayList();
//每次存储条数
Integer size = 100000;
// 计数器
int count = 1;
int total = 0;
// 读取除表头外的内容
while (csvReader.readRecord()) {
// 读取一整行
String line = csvReader.getRawRecord();
list.add(line);
total += 1;
}
for (int i = 0; i < list.size(); i++, count++) {
tempList.add(list.get(i));
if (count % size == 0) {
executorService.execute(new InsertDate(tempList));
tempList.clear();
count = 0;
}
}
if (!tempList.isEmpty()) {
executorService.execute(new InsertDate(tempList));
tempList.clear();
}
System.out.println(total);
System.out.println(new Date().getTime() - start.getTime() + "ms");
csvReader.close();
} catch (Exception e) {
e.printStackTrace();
}finally {
executorService.shutdown();
}
}
public static void main(String[] args) {
readCsvFile(WRITE_CSV_FILE_PATH);
}
static class InsertDate implements Runnable{
List<String> tempList = Lists.newArrayList();
public InsertDate(List<String> limodel){
limodel.forEach((model)->{
String[] str = model.split(",");
if(str != null){
for(int i=0;i<str.length;i++){
String s = str[i];
}
}
tempList.add(model);
});
}
public void run() {
//保存数据库
System.out.println(Thread.currentThread().getName()+"条数:"+tempList.size());
}
}
}