Webエンジニアのメモ帳

技術的な話を中心に書いています。

【Java】マルチスレッドとRateLimiterによるQPSの制御について

マルチスレッドとは

あるクラスが、別のあるクラスのインスタンスを複数生成し、そのインスタンスが持つメソッドを同時に実行したい、というような状況は、Webアプリなどで重い処理を何度も行う時などにはよくあることです。(APIを叩く処理など)

このように、時間がかかるような処理を並行で複数動かすことをマルチスレッドといいます。

マルチスレッドでプロセスを動かす例

ここではMainクラスでSampleThreadというクラスのインスタンスを2つ生成し、マルチスレッドで動かす例を考えてみます。 SampleThread.javaのrun()メソッドは、1秒ごとに0から2までの数字を出力します。

・Main.java

public class Main {
  public static void main(String[] args) {
    Thread th1 = new SampleThread("スレッド1");
    Thread th2 = new SampleThread("スレッド2");
    th1.start();
    th2.start();
  }
}

・SampleThread.java

class SampleThread extends Thread {
  String name;
  
  public SampleThread(String name) {
    this.name = name;
  }

  public void run() {
    try {
      for (int i = 0; i < 3; i++) {
        Thread.sleep(1000);
        System.out.println(name + ": " + i);
      }
    } catch(Exception e) {
      System.out.println("エラーです");
    }
  }
}

出力は以下のようになり、1秒に2行ずつ表示されていくはずです。

スレッド1: 0
スレッド2: 0
スレッド1: 1
スレッド2: 1
スレッド1: 2
スレッド2: 2

注意点として、Mainクラスでth1.start()ではなくth1.run()としてSampleThreadクラスのrun()を呼び出すこともできますが、これだとシングルスレッドで動いてしまい、出力が

スレッド1: 0
スレッド1: 1
スレッド1: 2
スレッド2: 0
スレッド2: 1
スレッド2: 2

となってしまいます。

QPSの制御

さて、SampleThread.javaが次のようなコードである場合を考えます。 先ほどとの違いは、sleep()を挟んでいない点です。

・SampleThread.java

class SampleThread extends Thread {
  String name;
  
  public SampleThread(String name) {
    this.name = name;
  }

  public void run() {
    try {
      for (int i = 0; i < 3; i++) {
        System.out.println(name + ": " + i);
      }
    } catch(Exception e) {
      System.out.println("エラーです");
    }
  }
}

run()メソッドが実行されると、先ほどとは違って、0から2までの数字が瞬時に出力されます。

ここで、「SampleThreadのインスタンスを2つ作成して、run()メソッドを並行で実行したいが、1秒間にrun()メソッドが実行される回数は、2つのインスタンスを合計して1回以下にしたい」という状況を考えます。

今回はrun()メソッドは数字を出力しているだけですが、記事の冒頭に書いたように、もしもrun()メソッドがWebサービスのプログラムの一部で、何らかのAPIを叩いていたりすれば、このような状況は容易に想像できます。

SampleThreadクラスで2秒スリープを挟むというのは一つの手です。2つのスレッドが動くのであれば、平均すれば1秒に一回run()が実行されて数字が出力されます。しかし、あまりスマートな方法ではありません。

そのような時に有効なのが、RateLimiterというクラスです。 このクラスを使って、Main.javaとSampleThread.javaを書き換えます。

・Main.java

import com.revinate.guava.util.concurrent.RateLimiter;

public class Main {
  public static void main(String[] args) {
    RateLimiter limiter = RateLimiter.create(1);
    Thread th1 = new SampleThread("スレッド1", limiter);
    Thread th2 = new SampleThread("スレッド2", limiter);
    th1.start();
    th2.start();
  }
}

・SampleThread.java

import com.revinate.guava.util.concurrent.RateLimiter;

class SampleThread extends Thread {
  String name;
  RateLimiter limiter;

  public SampleThread(String name, RateLimiter limiter) {
    this.name = name;
    this.limiter = limiter;
  }

  public void run() {
    try {
      for (int i = 0; i < 3; i++) {
        limiter.acquire();
        System.out.println(name + ": " + i);
      }
    } catch(Exception e) {
      System.out.println("エラーです");
    }
  }
}

Main.java

RateLimiter limiter = RateLimiter.create(1);

としてRateLimiterクラスのインスタンスを作成しているのがポイントです。引数は1秒間に実行したい回数です。

そして、このインスタンスをSampleThreadクラスの2つのインスタンスの両方に渡しています。

SampleThreadクラスのrun()メソッドでは

limiter.acquire();

という処理を実行しています。

RateLimiter.create()で設定したQPSを超えそうなら、ここで処理が止まるということですね。

このようにすると、1秒に1回、数字が出力されるようになります。

スレッド1: 0
スレッド2: 0
スレッド1: 1
スレッド2: 1
スレッド1: 2
スレッド2: 2

最後に、pom.xmlののところには以下のようにdependencyを追記してください。

<dependency>
  <groupId>com.revinate</groupId>
  <artifactId>guava-rate-limiter</artifactId>
  <version>19.0</version>
</dependency>

追記

RateLimiterを使う場合、threadを使わなくても、同様の動きが可能です。

・Main.java

import com.revinate.guava.util.concurrent.RateLimiter;

public class Main {
  public static void main(String[] args) {
    RateLimiter limiter = RateLimiter.create(1);
    SampleThread th1 = new SampleThread("スレッド1", limiter);
    SampleThread th2 = new SampleThread("スレッド2", limiter);
    
    // start()を使わない
    th1.run();
    th2.run();
  }
}

・SampleThread.java

import com.revinate.guava.util.concurrent.RateLimiter;

# Threadクラスを継承していない
class SampleThread {
  String name;
  RateLimiter limiter;
  
  public SampleThread(String name, RateLimiter limiter) {
    this.name = name;
    this.limiter = limiter;
  }

  public void run() {
    try {
      for (int i = 0; i < 3; i++) {
        limiter.acquire();
        System.out.println(name + ": " + i);
      }
    }
  }
}