読者です 読者をやめる 読者になる 読者になる

はじめての並行プログラミング(1)

第53回 社内勉強会に使用したテキストファイ
「はじめての並行プログラミング(1)」

今回話すこと

  • スレッド?
  • 並行プログラミングの難しさ

複数の仕事を同時に

『並行プログラミング』
とあるプログラムで何かの処理を同時におこないたい
 
方法は大きく分けて二つ

  • プロセス
  • スレッド(軽量プロセス)

※ スレッド in プロセス の関係。

プロセスとスレッドの違い(簡単に言えば)

プロセスは「資源を共有しない」。
スレッドは「資源を共有する」。

  • スレッドが共有するもの
  • スレッドが共有しないもの
    • プログラムカウンタ、スタック、ローカル変数

スレッドを使うメリット

よいとこ

  • 一つのプロセス内で並行処理が可能
    • 非同期な処理とか
  • 起動も低コスト
  • データの同期が低コスト

わるいところ

  • 共有データの扱いが難しい(超重要)

スレッドの安全性(safety)

スレッドセーフなコード = 共有データをマルチスレッドで安全に扱っているコード
※ スレッドを使う場合は「スレッドセーフティ」を常に意識!

共有データを扱ってみよう

以下のコードはスレッドセーフでしょうか??

class Counter
  def initialize
    @next_value = 0
  end

  # 重複のない値を返したい
  def next_value
    @next_value+=1
    sleep(0.001)
    return @next_value
  end
end
c = Counter.new
res = []
a = Thread.new do
  1000.times do
    res << c.next_value
  end
end
b = Thread.new do
  1000.times do
    res << c.next_value
  end
end

sleep(3)
p res.length
p res.uniq.length

スレッドセーフにしてみよう

class SafeCounter
  def initialize
    @m = Mutex.new
    @next_value = 0
  end

  def next_value
    @m.synchronize do
      @next_value+=1
      sleep(0.001)
      return @next_value
    end
  end
end
c = SafeCounter.new
res = []
a = Thread.new do
  1000.times do
    res << c.next_value
  end
end
b = Thread.new do
  1000.times do
    res << c.next_value
  end
end

sleep(3)
p res.length
p res.uniq.length

排他制御

ココで使ったのはミューテックス
MUTEX - MUTual EXclusion(相互排他)
『使用可能な機材の上には小さな旗が置かれ、その旗を手にいれた者がその機材の使用権利を獲得する』
 
mutex.lock 旗を手に入れる
mutex.unlock 旗を手放す
 
mutexは排他制御やりかたの内の一つ
さまざまな排他制御のやりかたがある(詳細はまた次回)

  • ロック - スピンロック
  • モニター
  • セマフォ
  • その他

見えづらいスレッドアンセーフなコード(上級編)

UnsafeCounter.java

import java.lang.Thread;
import java.util.*;

// Threadセーフでないカウンター
public class UnsafeCounter
{
    private int nextValue = 0;
    public static List<Integer> res = Collections.synchronizedList(new ArrayList());

    public int getNextValue() {
        return ++this.nextValue;
    }

    public static void main(String[] args) throws InterruptedException {
        UnsafeCounter c = new UnsafeCounter();
        ThreadA a = new ThreadA(c);
        ThreadB b = new ThreadB(c);
        a.start();
        b.start();


        Thread.sleep(100);
        System.out.println(res.size());
        HashSet<Integer> hashSet = new HashSet<Integer>();
        hashSet.addAll(res);
        System.out.println(hashSet.size());
    }

    static class ThreadA extends Thread
    {
        private UnsafeCounter c;
        ThreadA(UnsafeCounter c) {
            this.c = c;
        }

        public void run() {
            for(int i=0; i < 3000; i++) {
                UnsafeCounter.res.add(this.c.getNextValue());
            }
        }
    }

    static class ThreadB extends Thread
    {
        private UnsafeCounter c;
        ThreadB(UnsafeCounter c) {
            this.c = c;
        }

        public void run() {
            for(int i=0; i < 3000; i++) {
                UnsafeCounter.res.add(this.c.getNextValue());
            }
        }
    }
};

なぜスレッドセーフではないのか?

JVMバイトコード

public int getNextValue();
  Code:
   0:   aload_0
   1:   dup
   2:   getfield        #2; //Field nextValue:I
   5:   iconst_1
   6:   iadd
   7:   dup_x1
   8:   putfield        #2; //Field nextValue:I
   11:  ireturn

7でスレッドが切り替わる可能性がある

tmp = nextValue; // tmp = 50
tmp = tmp + 1;   // aがbで切り替わる可能性、もしくはa, b同時実行 tmp = 51
nextValue = tmp;
return nextValue; // a, bに nextValue = tmp = 50 が返ってしまう

これにより同じカウントが出てくる可能性があるため、スレッドセーフではない。

スレッドセーフにしてみよう!

getNextValue に synchronized を付けるだけ。

public int getNextValue();
  Code:
   0:   aload_0
   1:   dup
   2:   astore_1
   3:   monitorenter
   4:   aload_0
   5:   dup
   6:   getfield        #2; //Field nextValue:I
   9:   iconst_1
   10:  iadd
   11:  dup_x1
   12:  putfield        #2; //Field nextValue:I
   15:  aload_1
   16:  monitorexit
   17:  ireturn
   18:  astore_2
   19:  aload_1
   20:  monitorexit
   21:  aload_2
   22:  athrow

3, 16 の辺りでロックがかかっている
スレッドセーフ!

言語のスレッドの中身

OSが提供するスレッド(ネイティブスレッド)

ほとんどの言語は上記の関数群を利用してスレッド処理を実現している。

言語処理系がエミュレートしたもの

  • グリーンスレッド

1プロセス、1スレッド、複数エミューレトスレッド。
ruby1.8までがこれ。1.9はGVL。

Ruby1.9のGVL

GVL(Global VM Lock)なるものが実装されている。
何かしらブロックする処理のときにスレッドが切り替わる。例えばsleepやIOなど。
タイマーでも切り替わるよ(前田さん)

$x = []
t = Thread.start{ 100000.times{$x << 1}}
100000.times{$x << 2}
p t.join
puts $x

1と2が混ざって出力される。おお、ほんとだ。
 
Ruby1.9からはネイティブスレッド実装。Thread.start = ネイティブスレッドで実行。
ただし、GVLによって1度に実行するスレッドは1つだけとなる。
そのためマルチプロセッサ環境でも1プロセッサのみしか使用されない。
 
GVLは誰のために嬉しいのだろう?

  • CRubyでC拡張ライブラリを書くようなときにスレッドセーフなコードでなくてもよい。
  • CRubyの内部のコードをすべてスレッドセーフにしなくてすむ

 
一方、JVMはGVLのようなものがないので、マルチプロセッサ環境では完全に並行に動けるのであった。

次回話すこと

  • スレッドセーフ