abcdefGets

ゲッツ!

WebWorker & SharedArrayBuffer & Atomics (1)

遂にShared Memory and AtomicsがES2017のstage4に
JSの世界にマルチスレッド・共有メモリ・排他制御の仕組みが備わることになった。

Javascript/Browserにおけるマルチスレッド・並列実行

Javascriptの世界では長らく協調的スレッドと呼ばれるような仕組みで擬似的に並列実行を行っていた。
いわゆるノンプリエンプティブなスレッドである。

例えば

(function loop1() {
    console.log('loop1 start');
    console.log('loop1 end');
    setTimeout(loop1, 100);
})();
(function loop2() {
    console.log('loop2 start');
    console.log('loop2 end');
    setTimeout(loop2, 100);
})();

これはタイマーを利用した再帰処理だが、ブラウザがスケジューリングしたタイミングでそれぞれ
loop1とloop2が処理を行う。
loop1とloop2はそれぞれ非同期に実行されるが、全てメインスレッドでスケジューリングされるため、
各処理に割り込みが入ることはない。

つまり必ず

loop[1] start
loop1[1] end

の順番で出力される。

loop[1] start
loop1[2] start

とはならない。

Concurrent.Thread

Concurrent.Threadという古のライブラリがある。
このライブラリはこんなふうに使う

function hello(id) {
    document.write("[" + id + "] " + "hello,<br />");
    Concurrent.Thread.sleep(id * 1000);
    document.write("[" + id + "] " + "world!<br />");
}
Concurrent.Thread.create(hello, 1);
Concurrent.Thread.create(hello, 2);
Concurrent.Thread.create(hello, 3);

こうすると

1hello
2hello
3hello

1world
2world
3world

というまさにプリエンプションが行われたかのような結果となる。
このライブラリは渡された関数の各行をsetTimeoutでラップすることで、各ステートメントを非同期化し、
再スケジューリングすることでコンテキストスイッチを実現している。

しかしあくまでメインスレッド上で分割して実行しているため、
あくまで、並列であり並行ではない。

並行と並列の定義の違いはparallel と concurrent、並列と並行の違い
を参照

Web Worker

Web Workerというものがある。これはまさに前述の並行処理を可能にするもので、本物のスレッドである。
しかし各スレッド間はメッセージングでやりとりを行い、JSONのやり取りしかできない。

つまり処理は並列だが、各スレッドが共有する値にアクセスすることはできない。
ただ、このWokerの仕組みを採用した事により排他制御の複雑さからは逃れられている。

Shared Array Buffers

SharedArrayBufferはWebWorker間で値を共有できない問題を解決するために導入された、
Worker間をまたいだバイト配列共有の仕組みである。

しかしSharedArrayBufferをスレッド間で共有すると問題が発生する。

スレッド間可視性

// main.js
sharedArrayBuffer[0] = 1;
sharedArrayBuffer[0] = 2;

// thread.js
while (sharedArrayBuffer[0] === 1) {}
console.log(sharedArrayBuffer[0]); //A

この場合Aの値は1かも知れないし、2かもしれない。
1か2のどちらかになるだろう。

これは一例であるが、

  1. main 代入処理 sharedArrayBuffer[0] = 1
  2. thread グローバルなメモリ領域からsharedArrayBuffer[0]を読み込み
  3. thread sharedArrayBuffer[0]を読み込み
  4. main 代入処理 sharedArrayBuffer[0] = 2

といったような順序で処理が行われている場合、AのsharedArrayBufferはまだ初期化されていない可能性がある。

というようにスレッドの可視性の問題によって、また、CPU・コンパイラによる各種最適化によって
Aスレッドで更新された値がBスレッドから確実に読めるかという疑問はかなり不確定な答えになってしまう。

Atomics

これを解決するのがAtomicsである。

Atomics.store(sharedArray, 0, 1);

while (Atomics.load(sharedArray, 0) === 1) {}
console.log(Atomics.load(sharedArray, 0));

この場合、
sharedArrayBuffer間の値の読み書きの順序は整合性を持つことになる。

説明としてはこんぐらい。
基本的にスレッド間の変数の整合性・一貫性を保つのがAtomicsの役割。

Atomics API

Atomics.load(typedArray: SharedArrayBuffer, index: number): number

アトミックにtypedArrayのindexから値を取得する。

Atomics.store(typedArray: SharedArrayBuffer, index: number, value: number): number

アトミックにtypedArrayのindexにvalueを書きこみその値を返す。

Atomics.sub(typedArray: SharedArrayBuffer, index: number, value: number): number

アトミックにtypedArrayのindexの値からvalueを引きその結果を返す。

Atomics.wait(typedArray: SharedArrayBuffer, index: number, timeout: number): number

typedArrayのindexがwakeされるか、timeoutまでsleepする。
mainスレッドでは呼び出せない。

var int32Array = new Int32Array(sharedArray);
Atomics.wait(int32Array, 0, 0);
// int32Arrayの0番目の値が0の場合スリープし続ける。

Atomics.wake(typedArray: SharedArrayBuffer, index: number, count: number): void

count個のwait中のtypedArrayのindexをwakeする。

var int32Array = new Int32Array(sharedArray);
Atomics.wake(int32Array, 0, 1);
// 1つのwaitしているSharedArrayBufferのsleepを解除する。

Atomics.or(typedArray: SharedArrayBuffer, index: number, value: number): number

アトミックにtypedArrayのindexとvalueの論理和を行いその結果を返す

Atomics.add(typedArray: SharedArrayBuffer, index: number, value: numebr): SharedArrayBuffer

指定されたtypedArrayの指定されたindexにvalueを設定する。
内部的にはAtomicReadModifyWrite(typedArray, index, value, op)を使い、Atomicに値をロード、変更、storeする。

Atomics.and(typedArray: SharedArrayBuffer, index: number, value: numebr): SharedArrayBuffer

指定されたtypedArrayの指定されたindexの値と指定されたvalueの論理積とり、指定されたindexに設定する。
内部的にはAtomicReadModifyWrite(typedArray, index, value, op)を使い、Atomicに値をロード、変更、storeする。

Atomics.compareExchange(typedArray: SharedArrayBuffer, index: number, expectedValue: number, replacementValue: number): number

typedArrayのindexの値をexpectedValueと比較し、等しければreplacementValueを書き込む。
戻り値は更新前のtypedArrayのindexの値。

例.

let computedValue = 1;
let initialValue;
let totalValue = Atomcs.load(sharedArray, 0);
const append = 10;
do {
  initialValue = totalValue;
  computedValue = initialValue + append;
  totalValue = Atomics.compareExchange(sharedArray, 0, initialValue, computedValue);
  // 値がinitialValueと等しければ+10にアトミックに更新
  // 値がinitialValueと等しクない場合は、他のスレッドから更新されたので、再度initialValueをアップデートして再更新
} while (initialValue !== totalValue);

Atomics.exchange(typedArray: SharedArrayBuffer, index: number, value: number): number

typedArrayのindexにvalueを書き込み、以前のindexの値を返す。

Atomics.isLockFree(size: number): boolean

指定されたサイズがロックフリーオペレーションが可能かを返す。

まとめ

WebWorkerでShareArrayBufferを使う場合は常にAtomics経由でアクセスしたほうが良い。
もちろんしなくても良いパターンもあるが、Atomicオペレーションのコストより安全側に倒したい。

すごく適当な説明だけど、すごく疲れたから詳細はまた次回