はじめに
急に仕事が忙しくなり始め,業務で取り扱うデータ量が大きくなってきました。
今まではあまり実行時間を気にしなかったのですが,あまり手を入れずに処理を短時間で終わらせたいと考えました。
そこで,今まであまり調べてなかったJulia言語の並列処理化に手をつけてみました。ここでは,あまり頭を使わずに実現できる方法をピックアップしてみました。
並列処理化
並列処理化には
- プロセスによる並列化
- スレッドによる並列化
の二種類があるのですが,今回はわりと手軽に感じたスレッドによる並列化を試してみます。
スレッドによる並列化を利用する時には,Juliaの環境を実行する前に準備が必要です。
一つは,julia言語を起動する時に julia -t 4
のように引数で最大スレッド数を指定する方法,もう一つはjuliaを起動する前に環境変数JULIA_NUM_THREADS
に
最大スレッド数を指定する方法です。
プロセス並列化の場合はjuliaを起動した後にプロセス数の変更が可能ですが,スレッド並列化の場合は起動前に指定する必要があります。
なお,デフォルトではスレッド数は1
となっていて,
Threads.nthreads()
で現在の環境のスレッド数を確認することができます。
並列化に適した処理
並列化に適した処理は 「それぞれ別のファイルのデータにフィルタをかけて,別々のファイル保存する」とかのように,それぞれの処理に依存関係がなく独立しているものです。
- グローバル変数を利用しない
- 入力データは関数の引数で入力する
- 入力データ(入力引数)を不用意に書き換えない
- 出力先のファイルをそれぞれの処理で共有しない
等気をつければ,それほど難しくはないでしょうか? よく分かりません。
for文による並列化
例えば,次のような処理を行う時間がかかる処理があったとします。
- データファイル読み込み
- データを演算
- 演算したデータを保存
ここでは仮に次のようなサンプルを用意します。
function myfile_task(filename::AbstractString, outdir::AbstractString) read_dummy(x) = (sleep(2); x) calc_dummy(_) = (sleep(2); rand()) save_dummy(_, _) = (sleep(2); nothing) dummy_data = read_dummy(filename) # データ読み込み(なんちゃって) dummy_calced = calc_dummy(dummy_data) # データ演算(なんちゃって) save_dummy(dummy_calced, outdir) # データ保存(なんちゃって) end
julia> filenames = "myfile" .* string.(collect(1:10)) 10-element Vector{String}: "myfile1" "myfile2" "myfile3" "myfile4" "myfile5" "myfile6" "myfile7" "myfile8" "myfile9" "myfile10" julia> outdir = "./hogehoge" "./hogehoge" julia> @time for k in 1:10 myfile_task(filenames[k], outdir) end 60.109670 seconds (23.23 k allocations: 1.143 MiB, 0.03% compilation time)
上の例だと60秒かかります。
ここで,スレッド数を7
の環境で次のように実行します。
julia> Threads.nthreads() 7 julia> @time Threads.@threads for k in 1:10 myfile_task(filenames[k], outdir) end 12.067657 seconds (59.38 k allocations: 2.953 MiB, 1.19% compilation time)
約1/5の実行時間になりました。
map関数の並列化
スレッド並列化の基礎のスライドの中に,map関数のマルチスレッド化
というものがあります。
次のような実装です。
function threaded_map(fn, array::AbstractArray) tasks = [Threads.@spawn(fn(v)) for v in array] [fetch(task) for task in tasks] end
この関数を試してみます。 時間がかかる処理を次のようにしました。
function mytask(k) sleep(2) # 時間がかかる処理のつもり k * k end
通常のmap関数と実行時間を比較したものを次に示します。 処理時間が短くなっていることが分かります。
julia> @time hoge = map(mytask, 1:10) 20.030014 seconds (43 allocations: 1.516 KiB) 10-element Vector{Int64}: 1 4 9 16 25 36 49 64 81 100 julia> @time hoge = threaded_map(mytask, 1:10) 2.027161 seconds (26.51 k allocations: 1.315 MiB, 2.86% compilation time) 10-element Vector{Int64}: 1 4 9 16 25 36 49 64 81 100
filter関数の並列化
上の例を応用して,filter関数で実装してみます。
function threaded_filter(fn, ar::AbstractArray) tasks = [Threads.@spawn(fn(v)) for v in ar] [a for (task, a) in zip(tasks, ar) if fetch(task)] end
この関数を試してみます。処理がかかる関数のつもりは次のような感じ。
function myfilter(k) sleep(2) # 時間がかかる処理のつもり isodd(k) end
julia> @time hoge = filter(myfilter, 1:10) 20.030471 seconds (44 allocations: 1.328 KiB) 5-element Vector{Int64}: 1 3 5 7 9 julia> @time hoge = threaded_filter(myfilter, 1:10) 2.008133 seconds (1.56 k allocations: 76.562 KiB, 1.71% compilation time) 5-element Vector{Int64}: 1 3 5 7 9
処理時間が短くなっていることが分かります。
まとめ
ここでは,マルチスレッド処理のうち簡単に使えるもののみ紹介しました。 それぞれの依存関係がある処理はマルチスレッド化するのは難しいのですが, それぞれ独立性が高い処理は比較的簡単にマルチスレッド化できるようです。
自分がやっている業務では,上記のように比較的簡単にマルチスレッド化できる処理が多く, もう少し早く真剣に検討すれば良かったと思う今日この頃です。
ただ,メモリの使用量がかなり大きくなるので,たくさんメモリを積んだ環境が必要となるのは言わずもがなです。