えちょ記

語らないブログ

タスク並列ということ

何故かReactive Extensionsのぐぐる検索で一時トップになってて慌てましたが少し下がってホッとしました。いあ、どう考えてもうちはトップじゃないでしょぐぐるさん!学習向けの紹介サイトならこことかこことかが分りやすくまとめていますので、まずはこちらを覗いてみることをお勧めします。

追い越し禁止?

さて表題の件ですが。私はRxを処理パイプのような感覚で理解しようしてたのですが、少し疑問がありました。「Rxはどんな単位で処理を並列化、あるいは直列化してるのか?」。もうちょっと具体的には「処理パイプの中で後から投入されたデータの処理が先に終わることはありうるのか?」ということ。

ということで実際にやってみた。

処理パイプの構成

今回の試験用処理パイプは、以下の構成になっています。

  • 入力は1〜9のRange
  • スケジューラはScheduler.ThreadPool
  • 実行処理はコンソールへの出力(スレッド番号など)
  • 処理内でランダムにSleepを入れる

この構成の部品を適当に組み合わせてパイプを構成し、試験することとします。

コード(超長いので折りたたむ)

using System;
using System.Collections.Generic;
using System.Concurrency;
using System.Threading;
using System.Linq;
using System.Text;

namespace ZStudy.Rx.Parallel
{
    public static class Program
    {
        static void Main(string[] args)
        {
            // スレッドプールに対してジョブを発行するパイプ
            var pipe = Observable
                .Range(1, 9, Scheduler.ThreadPool)
                ;
            var stopwatch = new System.Diagnostics.Stopwatch();

            // ■1つのパイプを実行
            Console.WriteLine("#### TEST 1 ####");
            stopwatch.Restart();
            pipe
                .ConsoleWrite("1 ")
                .Run();
            stopwatch.Stop();
            Console.WriteLine("#### TEST 1 ==> time {0:00000}ms\n\n"
                , stopwatch.ElapsedMilliseconds);
            // ---> 処理は直列化されています。
            //      1つのタスクが終わるまで次はスケジュールされません。


            // ■2つのパイプをマージ
            Console.WriteLine("#### TEST 2 ####");
            stopwatch.Restart();
            Observable.Merge(
                pipe.ConsoleWrite("1 "),
                pipe.ConsoleWrite(" 2"))
                .Run(cnt => {
                    var text = string.Format("({0:0})[--] :                         MERGE({1:00})",
                        cnt, Thread.CurrentThread.ManagedThreadId);
                    Console.WriteLine("({1:HH:mm:ss.fff}) {0}", text, DateTimeOffset.Now);
                });
            stopwatch.Stop();
            Console.WriteLine("#### TEST 2 ==> time {0:00000}ms\n\n"
                , stopwatch.ElapsedMilliseconds);
            // ---> 2つのタスクは並列実行されます。
            //      それぞれのタスクは直列に実行されています。


            // ■2つのパイプをZip
            Console.WriteLine("#### TEST 3 ####");
            stopwatch.Restart();
            Observable.Zip(
                pipe.ConsoleWrite("1 "),
                pipe.ConsoleWrite(" 2"),
                (a, b) => Tuple.Create(a, b))
                .Run(pair => {
                    var text = string.Format("({0:0})[--] :                         ZIP({1:00}) [{2:00},{3:00}]",
                        pair.Item1, Thread.CurrentThread.ManagedThreadId,
                        pair.Item1, pair.Item2);
                    Console.WriteLine("({1:HH:mm:ss.fff}) {0}", text, DateTimeOffset.Now);
                })
                ;
            stopwatch.Stop();
            Console.WriteLine("#### TEST 2 ==> time {0:00000}ms\n\n"
                , stopwatch.ElapsedMilliseconds);
            // ---> 2つのタスクは並列実行されます。
            //      それぞれのタスクは直列に実行されています。


            // ■1つのパイプに2つの処理を続けて実行
            Console.WriteLine("#### TEST 4 ####");
            stopwatch.Restart();
            pipe
                .ConsoleWrite("1 ")
                .ConsoleWrite(" 2")
                .Run();
            stopwatch.Stop();
            Console.WriteLine("#### TEST 4 ==> time {0:00000}ms\n\n"
                , stopwatch.ElapsedMilliseconds);
            // ---> 2つの処理は直列実行されます。
            //      2つの処理が終わるまで次のタスクはスケジュールされません。


            // ■1つのパイプに2つの処理をスケジューラ切り替えをはさんで実行
            Console.WriteLine("#### TEST 5 ####");
            stopwatch.Restart();
            pipe
                .ObserveOn(Scheduler.ThreadPool)
                .ConsoleWrite("1 ")
                .ObserveOn(Scheduler.ThreadPool)
                .ConsoleWrite(" 2")
                .Run();
            stopwatch.Stop();
            Console.WriteLine("#### TEST 5 ==> time {0:00000}ms\n\n"
                , stopwatch.ElapsedMilliseconds);
            // ---> 2つの処理は並列実行されます。
            //      パイプ内で処理が追い抜かれることはありません。


            Console.WriteLine("\n#### 終了しました、何かキーを押してください。 ####");
            Console.Read();
        }

        public static IObservable<int> ConsoleWrite(this IObservable<int> pipe, string name)
        {
            var rand = new Random(name.GetHashCode());
            return pipe
                .Do(count => {
                    var sleepTime = rand.Next(999);
                    var title = string.Format("({0:0})[{1}]", count, name);
                    Console.WriteLine("({0:HH:mm:ss.fff}) {1} : START({2:00}/{3:000}ms)"
                        , DateTimeOffset.Now, title
                        , Thread.CurrentThread.ManagedThreadId, sleepTime);

                    Thread.Sleep(sleepTime);

                    Console.WriteLine("({0:HH:mm:ss.fff}) {1} :                 END({2:00})"
                        , DateTimeOffset.Now, title
                        , Thread.CurrentThread.ManagedThreadId, sleepTime);
                })
                ;
        }

    }

}

TEST1:1処理だけのパイプに10個投入

実行結果

#### TEST 1 ####
(01:38:22.493) (1)[1 ] : START(06/630ms)
(01:38:23.132) (1)[1 ] :                 END(06)
(01:38:23.135) (2)[1 ] : START(10/668ms)
(01:38:23.803) (2)[1 ] :                 END(10)
(01:38:23.803) (3)[1 ] : START(06/391ms)
(01:38:24.194) (3)[1 ] :                 END(06)
(01:38:24.194) (4)[1 ] : START(12/875ms)
(01:38:25.069) (4)[1 ] :                 END(12)
(01:38:25.069) (5)[1 ] : START(06/276ms)
(01:38:25.345) (5)[1 ] :                 END(06)
(01:38:25.345) (6)[1 ] : START(11/449ms)
(01:38:25.794) (6)[1 ] :                 END(11)
(01:38:25.794) (7)[1 ] : START(10/250ms)
(01:38:26.044) (7)[1 ] :                 END(10)
(01:38:26.044) (8)[1 ] : START(11/184ms)
(01:38:26.228) (8)[1 ] :                 END(11)
(01:38:26.228) (9)[1 ] : START(10/608ms)
(01:38:26.836) (9)[1 ] :                 END(10)
#### TEST 1 ==> time 04371ms

ここから読み取れるのは、

  • 1処理は任意のスレッドプールに割り振られる
  • スレッドプールは切り替わるが、処理の追い越しは発生しない

1パイプ内のデータは、たとえスリープが発生したとしても処理の追い越しは発生しません。全ての処理が一度にスケジュールされるのではなく、1処理が終わるたびに次の処理がスケジュールされるようです。
考えてみればこれはタスク並列、またはアクターモデルにおける重要な性格。1つの処理パイプにおいては処理が直列化されることで、並行処理にまつわるもろもろの同期処理を意識せずにロジックを組むことが出来るのです。

TEST2:2つのパイプをマージ

今度はパイプを2つ用意し、Mergeしてみました。

#### TEST 2 ####
(01:38:26.852) (1)[1 ] : START(06/630ms)
(01:38:26.853) (1)[ 2] : START(11/349ms)
(01:38:27.203) (1)[ 2] :                 END(11)
(01:38:27.204) (1)[--] :                         MERGE(11)
(01:38:27.204) (2)[ 2] : START(10/506ms)
(01:38:27.483) (1)[1 ] :                 END(06)
(01:38:27.483) (1)[--] :                         MERGE(06)
(01:38:27.484) (2)[1 ] : START(06/668ms)
(01:38:27.711) (2)[ 2] :                 END(10)
(01:38:27.711) (2)[--] :                         MERGE(10)
(01:38:27.712) (3)[ 2] : START(11/316ms)
(01:38:28.028) (3)[ 2] :                 END(11)
(01:38:28.028) (3)[--] :                         MERGE(11)
(01:38:28.029) (4)[ 2] : START(10/052ms)
(01:38:28.084) (4)[ 2] :                 END(10)
(01:38:28.084) (4)[--] :                         MERGE(10)
(01:38:28.085) (5)[ 2] : START(11/758ms)
(01:38:28.153) (2)[1 ] :                 END(06)
(01:38:28.153) (2)[--] :                         MERGE(06)
(01:38:28.154) (3)[1 ] : START(13/391ms)
(01:38:28.545) (3)[1 ] :                 END(13)
(01:38:28.545) (3)[--] :                         MERGE(13)
(01:38:28.546) (4)[1 ] : START(12/875ms)
(01:38:28.843) (5)[ 2] :                 END(11)
(01:38:28.843) (5)[--] :                         MERGE(11)
(01:38:28.844) (6)[ 2] : START(13/837ms)
(01:38:29.422) (4)[1 ] :                 END(12)
(01:38:29.422) (4)[--] :                         MERGE(12)
(01:38:29.423) (5)[1 ] : START(11/276ms)
(01:38:29.681) (6)[ 2] :                 END(13)
(01:38:29.681) (6)[--] :                         MERGE(13)
(01:38:29.682) (7)[ 2] : START(12/041ms)
(01:38:29.700) (5)[1 ] :                 END(11)
(01:38:29.700) (5)[--] :                         MERGE(11)
(01:38:29.701) (6)[1 ] : START(10/449ms)
(01:38:29.723) (7)[ 2] :                 END(12)
(01:38:29.723) (7)[--] :                         MERGE(12)
(01:38:29.724) (8)[ 2] : START(11/672ms)
(01:38:30.150) (6)[1 ] :                 END(10)
(01:38:30.150) (6)[--] :                         MERGE(10)
(01:38:30.151) (7)[1 ] : START(13/250ms)
(01:38:30.397) (8)[ 2] :                 END(11)
(01:38:30.397) (8)[--] :                         MERGE(11)
(01:38:30.398) (9)[ 2] : START(10/397ms)
(01:38:30.401) (7)[1 ] :                 END(13)
(01:38:30.401) (7)[--] :                         MERGE(13)
(01:38:30.401) (8)[1 ] : START(12/184ms)
(01:38:30.586) (8)[1 ] :                 END(12)
(01:38:30.586) (8)[--] :                         MERGE(12)
(01:38:30.587) (9)[1 ] : START(06/608ms)
(01:38:30.795) (9)[ 2] :                 END(10)
(01:38:30.795) (9)[--] :                         MERGE(10)
(01:38:31.195) (9)[1 ] :                 END(06)
(01:38:31.195) (9)[--] :                         MERGE(06)
#### TEST 2 ==> time 04356ms

予想通り2つのパイプは平行に動作し、Mergeポイントでのみデータが直列化されます。Mergeではデータが来た順に受け付けますので、2つのパイプの処理のどちらが早いかによって受け付け順が異なることになります。

TEST3:2つのパイプをZip

Mergeあらため、Zip。ここで少し予想外な挙動です。

#### TEST 3 ####
(01:41:15.755) (1)[1 ] : START(11/630ms)
(01:41:15.755) (1)[ 2] : START(10/349ms)
(01:41:16.105) (1)[ 2] :                 END(10)
(01:41:16.111) (2)[ 2] : START(14/506ms)
(01:41:16.385) (1)[1 ] :                 END(11)
(01:41:16.390) (1)[--] :                         ZIP(11) [01,01]
(01:41:16.390) (2)[1 ] : START(13/668ms)
(01:41:16.618) (2)[ 2] :                 END(14)
(01:41:16.618) (3)[ 2] : START(11/316ms)
(01:41:16.935) (3)[ 2] :                 END(11)
(01:41:16.935) (4)[ 2] : START(14/052ms)
(01:41:16.988) (4)[ 2] :                 END(14)
(01:41:16.988) (5)[ 2] : START(14/758ms)
(01:41:17.059) (2)[1 ] :                 END(13)
(01:41:17.059) (2)[--] :                         ZIP(13) [02,02]
(01:41:17.060) (3)[1 ] : START(11/391ms)
(01:41:17.454) (3)[1 ] :                 END(11)
(01:41:17.454) (3)[--] :                         ZIP(11) [03,03]
(01:41:17.455) (4)[1 ] : START(10/875ms)
(01:41:17.747) (5)[ 2] :                 END(14)
(01:41:17.747) (6)[ 2] : START(14/837ms)
(01:41:18.330) (4)[1 ] :                 END(10)
(01:41:18.330) (4)[--] :                         ZIP(10) [04,04]
(01:41:18.331) (5)[1 ] : START(13/276ms)
(01:41:18.585) (6)[ 2] :                 END(14)
(01:41:18.585) (7)[ 2] : START(15/041ms)
(01:41:18.607) (5)[1 ] :                 END(13)
(01:41:18.607) (5)[--] :                         ZIP(13) [05,05]
(01:41:18.608) (6)[1 ] : START(11/449ms)
(01:41:18.627) (7)[ 2] :                 END(15)
(01:41:18.627) (8)[ 2] : START(15/672ms)
(01:41:19.057) (6)[1 ] :                 END(11)
(01:41:19.057) (6)[--] :                         ZIP(11) [06,06]
(01:41:19.058) (7)[1 ] : START(11/250ms)
(01:41:19.300) (8)[ 2] :                 END(15)
(01:41:19.300) (9)[ 2] : START(10/397ms)
(01:41:19.308) (7)[1 ] :                 END(11)
(01:41:19.308) (7)[--] :                         ZIP(11) [07,07]
(01:41:19.309) (8)[1 ] : START(11/184ms)
(01:41:19.493) (8)[1 ] :                 END(11)
(01:41:19.493) (8)[--] :                         ZIP(11) [08,08]
(01:41:19.494) (9)[1 ] : START(14/608ms)
(01:41:19.698) (9)[ 2] :                 END(10)
(01:41:20.102) (9)[1 ] :                 END(14)
(01:41:20.102) (9)[--] :                         ZIP(14) [09,09]
#### TEST 3 ==> time 04368ms

片方が先に到着したらそちらの処理は待機するかと思いましたが、待機せずそのまま続行しています。Zip処理では値をバッファし、両方がそろった段階でバッファから取り出す実装です。考えてみればパイプへのデータ投入は待ったなしですので、受け身の処理が基本であるRxではデータの同期はZip側で面倒見る必要があるのですね。

TEST4:1つのパイプに2つの処理

パイプに、同じ処理を2回実行させてみます。

#### TEST 4 ####
(01:41:20.104) (1)[1 ] : START(11/630ms)
(01:41:20.735) (1)[1 ] :                 END(11)
(01:41:20.735) (1)[ 2] : START(11/349ms)
(01:41:21.085) (1)[ 2] :                 END(11)
(01:41:21.085) (2)[1 ] : START(14/668ms)
(01:41:21.754) (2)[1 ] :                 END(14)
(01:41:21.754) (2)[ 2] : START(14/506ms)
(01:41:22.262) (2)[ 2] :                 END(14)
(01:41:22.262) (3)[1 ] : START(15/391ms)
(01:41:22.654) (3)[1 ] :                 END(15)
(01:41:22.654) (3)[ 2] : START(15/316ms)
(01:41:22.971) (3)[ 2] :                 END(15)
(01:41:22.971) (4)[1 ] : START(11/875ms)
(01:41:23.847) (4)[1 ] :                 END(11)
(01:41:23.847) (4)[ 2] : START(11/052ms)
(01:41:23.900) (4)[ 2] :                 END(11)
(01:41:23.900) (5)[1 ] : START(12/276ms)
(01:41:24.177) (5)[1 ] :                 END(12)
(01:41:24.177) (5)[ 2] : START(12/758ms)
(01:41:24.936) (5)[ 2] :                 END(12)
(01:41:24.936) (6)[1 ] : START(15/449ms)
(01:41:25.386) (6)[1 ] :                 END(15)
(01:41:25.386) (6)[ 2] : START(15/837ms)
(01:41:26.224) (6)[ 2] :                 END(15)
(01:41:26.224) (7)[1 ] : START(13/250ms)
(01:41:26.475) (7)[1 ] :                 END(13)
(01:41:26.475) (7)[ 2] : START(13/041ms)
(01:41:26.517) (7)[ 2] :                 END(13)
(01:41:26.517) (8)[1 ] : START(11/184ms)
(01:41:26.702) (8)[1 ] :                 END(11)
(01:41:26.702) (8)[ 2] : START(11/672ms)
(01:41:27.375) (8)[ 2] :                 END(11)
(01:41:27.375) (9)[1 ] : START(12/608ms)
(01:41:27.984) (9)[1 ] :                 END(12)
(01:41:27.984) (9)[ 2] : START(12/397ms)
(01:41:28.382) (9)[ 2] :                 END(12)
#### TEST 4 ==> time 08276ms

結果は、「2つの処理が終わるまで次はスケジュールされない」です。スケジューラへの処理割振り単位は「2つの処理がまとめられて」行われています。単純に関数合成がされる範囲においては並行処理には分割されません。

TEST5:2つの処理の間にスケジューラ切り替えを挟む

今度は処理の間にObserveOnを挟んでみました。

#### TEST 5 ####
(01:41:28.390) (1)[1 ] : START(15/630ms)
(01:41:29.020) (1)[1 ] :                 END(15)
(01:41:29.020) (2)[1 ] : START(15/668ms)
(01:41:29.020) (1)[ 2] : START(12/349ms)
(01:41:29.373) (1)[ 2] :                 END(12)
(01:41:29.691) (2)[1 ] :                 END(15)
(01:41:29.691) (3)[1 ] : START(15/391ms)
(01:41:29.691) (2)[ 2] : START(14/506ms)
(01:41:30.083) (3)[1 ] :                 END(15)
(01:41:30.083) (4)[1 ] : START(12/875ms)
(01:41:30.199) (2)[ 2] :                 END(14)
(01:41:30.199) (3)[ 2] : START(14/316ms)
(01:41:30.516) (3)[ 2] :                 END(14)
(01:41:30.959) (4)[1 ] :                 END(12)
(01:41:30.959) (5)[1 ] : START(12/276ms)
(01:41:30.959) (4)[ 2] : START(10/052ms)
(01:41:31.013) (4)[ 2] :                 END(10)
(01:41:31.236) (5)[1 ] :                 END(12)
(01:41:31.236) (5)[ 2] : START(11/758ms)
(01:41:31.236) (6)[1 ] : START(13/449ms)
(01:41:31.686) (6)[1 ] :                 END(13)
(01:41:31.686) (7)[1 ] : START(10/250ms)
(01:41:31.937) (7)[1 ] :                 END(10)
(01:41:31.937) (8)[1 ] : START(14/184ms)
(01:41:31.995) (5)[ 2] :                 END(11)
(01:41:31.995) (6)[ 2] : START(12/837ms)
(01:41:32.122) (8)[1 ] :                 END(14)
(01:41:32.122) (9)[1 ] : START(10/608ms)
(01:41:32.731) (9)[1 ] :                 END(10)
(01:41:32.833) (6)[ 2] :                 END(12)
(01:41:32.833) (7)[ 2] : START(14/041ms)
(01:41:32.875) (7)[ 2] :                 END(14)
(01:41:32.875) (8)[ 2] : START(10/672ms)
(01:41:33.548) (8)[ 2] :                 END(10)
(01:41:33.548) (9)[ 2] : START(11/397ms)
(01:41:33.946) (9)[ 2] :                 END(11)
#### TEST 5 ==> time 05562ms

すると、処理1と2は同時に実行されるようになります。スケジュール単位はObserveOnで区切られるため、処理の追い抜きが発生しない範囲で並行処理が行われるようになりました。

総論:タスク並列とデータ並列の使い分け

Rxに限らずアクターモデル系技術の根幹の思想は「如何に並行処理を感じさせずに非同期処理をプログラム出来るか」です。「実行は非同期化されること」「同期処理のような直感的なコードが書けること」をフレームワークが保証するからこそ、デバッグが容易な非同期処理を実装できるようになっています。

処理の流れにいつ終わるかわからない非同期処理が挟まれる場合においても、コードを分散させずに処理の流れを記述する事が出来ます。また、パイプ内の処理は直列化されますので同期を心配する必要はありません。
そのかわり、処理順序にこだわる必要が無い場合においても1パイプ内で処理が直列化されますので、CPUをぶんぶん回すような並行処理分散が行われるわけではありません。

もう一つの並行処理技術としてPLINQの実装がありますが、こちらは逆にデータ並列モデルを採用しています。大量のデータをなるべくCPUを遊ばせることなしに処理させることが目的であるため、PLINQでは最終結果に影響しない限り、処理順序に拘りません。コード記述者は処理順序が影響しない単位でデータ処理を分割することを心がけることで最大の効果を発揮する事が出来ます。

タスク並列アプローチでも適切な関数を準備すればデータ並列的な処理分散は可能なのですが、.NETのアプローチは2つの役割を明確に分離しているようで、見たところRxには2つのパイプをまとめる関数はあっても、複数のパイプに分散したり実行順序を守らなくていい処理分散を明示的に行う関数群は用意されていないようです。負荷分散に関してはPLINQが専門家ですね。

幸いどちらもAPIの見た目は似ていますし相互乗り入れも可能です。利用者はどちらか一方だけを盲目的に使うのではなく、処理の内容により適切な技術を選ぶのが.NET流の並行処理プログラミングのスタイルじゃないかと思いました。