目的:有一段代码,后面暂时用不到,但是又很耗时,占了当前R session,难道只能等半个小时,等到它结束才能画图?
可以使用R多线程,在支线进程中执行耗时任务,同时不阻塞当前R进程,可以继续干活。
- 在Rstudio下只能使用 plan(multisession, workers=5)。
- 在shell R下,还可以使用
plan(multiprocess),plan(multicore)[Not supported on Windows.]
Strategy ‘multiprocess’ is deprecated in future (>= 1.20.0) [2020-10-30].
Instead, explicitly specify either ‘multisession’ (recommended) or ‘multicore’. In the current R session, ‘multiprocess’ equals ‘multisession’.
1. 非阻塞的启动R多进程支线任务
# 第一步,开启多线程。只能linux系统,必须开多线程
library(future)
plan(multisession, workers = 3) # 设置为多进程模式
# 第二部:定义耗时任务
save_task <- future({
pid=Sys.getpid()
print( paste0("Start ...", pid) )
#save.image("before.RData")
Sys.sleep(30) # 模拟耗时1小时
# 这里保存Rds文件,可能需要花费几十分钟到2个小时
# save.image("end.RData")
print( paste0("End...", pid) )
100 #最后一行是线程的返回值,类似函数的感觉。
})
# 第三步,继续执行其他任务
# 在任务运行时,可以继续执行其他代码,不用等待耗时任务结束
print("正在后台保存文件,可以继续执行其他任务...")
2.检查一个支线任务是否结束
- 要检查任务的状态,可以使用 value() 函数,它会阻塞直到任务完成并返回结果。
- resolved() 函数用于检查任务是否已完成,是非阻塞的。
- 如果想非阻塞地检查结果,可以结合使用 resolved() 和 value() 函数。
# f 是一个future任务
# 非阻塞地检查是否完成
if (resolved(f)) {
result <- value(f) # 如果已完成,获取结果
print(result)
} else {
print("任务尚未完成")
}
继续本示例:
resolved(save_task) #非阻塞的查看状态,可以随时查看状态,返回T/F
> value(save_task ) #阻塞的查看返回结果:最后一行的值
[1] "Start ...1389"
[1] "End...1389"
plan(sequential) # 恢复 设置为单进程模式
3. 等待所有支线任务都结束
场景:分别计算每个亚群的高变基因,每个亚群都计算完才能进行下一步。
# arr 是数组,其成员是 future 变量
while(any(!resolved(arr))){ } #阻塞,直到所有支线都出结果:直到任何一个都是T
4. 竞速模式:等待最快的一个线程得到结果
场景:分别使用多个网站下载数据,只要有一个途径下载好,即可开始下一步。
require(future)
#plan(multiprocess)
plan(multisession, workers=5)
longRunningFunction <- function(value, seed=0) {
set.seed(seed)
random1<- runif(min= 5 ,max = 30,n = 1)
Sys.sleep(random1)
return(value)
}
arr = list()
#changed starting number to 1 since R lists start at 1, not 0
i=1
#If the number of futures generated is more than the number of cores available, then the main thread will block until the first future completes and allows more futures to be started
while(i < 6) {
arr[[i]] = future(longRunningFunction(i, seed = i), seed = T)
i = i + 1
}
# 一开始都是F,都没有解析出;最后都是T,都解析了。
while(any(!resolved(arr))){ } #阻塞,直到所有支线都出结果:直到任何一个都是T
#while(all(!resolved(arr))){ } #阻塞,直到有一个支线出结果: 直到至少一个是T[竞速模式]
raceresults_from_future<-lapply(arr[resolved(arr)], value)
print(paste("raceresults_from_future: ",raceresults_from_future) )
5. 传入环境,给多线程的内存消耗瘦身
future开启的支线默认是复制主进程的全部环境,这会导致R的内存开销很大,而其中很大一部分是用不到的拷贝。
可以手动指定某些变量传入执行环境,来达到多进程内存瘦身的目的。
out.a=123
# 1 开启多线程。只能linux系统,必须开多线程
library(future)
plan(multisession, workers = 3) # 设置为多进程模式
# 2 创建环境
# 用于限定多进程拷贝的变量个数,默认是拷贝父进程的整个环境
e1 <- new.env(parent = baseenv())
e1$a=out.a+1902
get("a", e1) #2025 #获取环境中的变量值
assign("b", -3210, envir = e1) #给环境e1中的变量b赋值
get("b", envir = e1) #03210 获取环境中的变量值
# 3 环境作为future的第二个参数
task2 <- future({
pid=Sys.getpid()
print(sprintf("task [%s]", pid))
Sys.sleep(5)
a+100 #最后一行是线程的返回值,类似函数的感觉。
},
envir = e1 ) #envir 传入变量,否则默认传入父环境
# 可以执行其他任务
# 4 查看多进程结果
resolved(task2) #非阻塞(立刻返回)的查看状态,可以随时查看状态,返回T/F
while(!resolved(task2)){} #阻塞,直到支线任务完成
value(task2 ) #阻塞的(直到有结果)查看返回结果:最后一行的值 2125
task2
plan(sequential) # 恢复 设置为单进程模式
Ref:
- https://rstudio.github.io/promises/articles/promises_03_overview.html
- 竞速模式 https://stackoverflow.com/questions/52040744/r-waiting-for-a-list-of-promises-to-resolve