プログラム言語 非同期処理
目次
用語解説
非同期処理
複数の別の処理を同時に行う。
マルチコア環境(複数のCPUで処理する)の場合、高速化する可能性あり。
並列(並行)処理
単一の処理を複数に分割して行う。
プロセス
プログラムの実行単位
プロセスは1つ以上のスレッド、固有のヒープメモリ領域等で構成される。
起動や切り替えに時間が掛かる。
複数プロセスに対する処理時間の割り当てはOSが行う。
具体的にはプロセス内のスレッドに対して処理を割り当てる。
スレッド
CPUの利用単位
スレッド単位でスタック、CPUレジスタのコピーを持つ
ヒープメモリ領域は同一プロセス内のスレッドで共用する
起動や切り替えが速い。
複数スレッドに対する処理時間の割り当てはOSが行う。
UIスレッド
=メインスレッド
※エンド・ユーザーからの入力を受け付けたり、UIを描画したりするスレッド
ワーカースレッド
UIスレッドのバックグラウンドで動作する別スレッド
非同期処理(マルチスレッド)のメリット
プロセスA
スレッド1
スレッド2
プロセスB
スレッド1
だとプロセスAの方が多くの処理時間が割り当てられる。
よってマルチスレッド処理は有用。
同期制御
複数のスレッドがタイミングを計りながらリソースに対してアクセスする事。
プリエンプティブ
マルチタスク、マルチスレッドの処理におけるスケジューリング方法の1つ。
タスク(スレッド)が動作中であろうが一定時間が経過すると強制的に処理を別タスクへ移す。
強調的(ノン・プリエンプティブ)
各タスクが自己申告で都合の良い場所まで処理を終えてから処理を別タスクへ移す。
実際にはシステムコールを呼び出すタイミングで処理が移される。
多段フィードバックキュー
・短いタスクを優先する
・FIFO(FirstInFirstOut)方式で最初にスケジューリングされたタスクを優先する
等の優先順位の重みに違いがあるプリエンプティブ方式
スレッドセーフ
1つのデータ(オブジェクト)に対して、複数のスレッドが同時にアクセスした場合、データの不整合が発生する。
排他制御を施し、この現象が発生しないクラスのこと。
スレッドセーフなクラスは継承禁止にすべき。
排他制御処理は時間が掛かる為、無用な使用は控えるべき。
スタック
FIFO方式で使用するメモリ領域。
スタックポインタ(SP)レジスタにスタックのアドレスを保持する事で、
スタックの読み込み先、書き込み先を判定する。
ローカル変数、関数の戻り先等に使用。
マルチスレッドにおけるスタック利用
スレッドを別に作成する毎にスタック領域が確保される(デフォルト:1MB)。
スレッドの切り替え時にはCPUレジスタの内容を退避/復帰する必要がある為、退避用メモリ領域も必要(デフォルト:1MB)。
容量及び、確保に時間を要するのでマルチスレッドはコストが高いと言われる。
スレッドプール
スレッド用のプログラムを保持しておくメモリ領域。
マルチスレッド用のスタック領域を破棄せずに使いまわす事で容量及び処理時間を短縮する目的で使用される。
.NET4.0以前
ThreadPool.QueueUserWorkItemメソッドを利用
(省略)
.NET4.0以降
=Taskクラスを利用したスレッド
実例
別スレッド呼び出し(引数無し)
非同期処理はスレッドプール(Taskクラス)を使う。
static void Main(string[] args)
{
ThreadStartデリゲートにメソッドを指定
Thread other1 = new Thread(start: new ThreadStart(OtherThread1));
other1.Start();
パラメータ用ThreadStartデリゲートにメソッドを指定
Thread other2 = new Thread(start: OtherThread1);
other2.Start();
ラムダ式
Thread other3 = new Thread(() => OtherThread1());
other3.Start();
}
public static void OtherThread1()
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine(@”Other!”);
}
}
非同期処理
});
myTask.Start();
new Task( () =>
{
非同期処理
}).Start();
Task.Run(() =>
{
非同期処理
});
var ret = Task.Run(() =>
{
非同期処理
return 5;
});
ret.Result:5
{
await Task.Run(() =>
{
非同期処理
});
return @"AAA";
}
var ret = myAsyncAwait().Result;
ret:AAA
Action func = async () =>
{
await Task.Run(() =>
{
非同期処理
});
};
func();
var html = Task.Run(async () => {
return await new HttpClient().GetStringAsync(@"https://office-yone.com/");
});
html.Result:<html>~
Windowsコントロールへの非同期処理反映
SampleAsync();
Invoke等の処理は不要
public async void SampleAsync() {
this.Windowsコントロール.~ = "実行中";
await Task.Run( () => {
非同期処理
} );
this.Windowsコントロール.~ = "完了";
}
@Override
public void run(){
try{
for (int i=0; i<10; i++){
System.out.println( "A" );
Thread.sleep(100);
}
}catch(InterruptedException e){
System.out.println(e.getMessage());
}
}
}
public class ThreadB extends Thread{
public void run(){
try{
for (int i=0; i<10; i++){
System.out.println( "B" );
Thread.sleep(100);
}
}catch(InterruptedException e){
System.out.println(e.getMessage());
}
}
}
public class App{
public static void main( String[] args )<{
new ThreadA().start();
new ThreadB().start();
}
}
→A B B A A B B A A B A B B A B A B A A B
public void run(){
try{
for (int i=0; i<10; i++){
System.out.println( "A" );
Thread.sleep(100);
}
}catch(InterruptedException e){
System.out.println(e.getMessage());
}
}
}
public class ThreadB implements Runnable{
public void run(){
try{
for (int i=0; i<10; i++){
System.out.println( "B" );
Thread.sleep(100);
}
}catch(InterruptedException e){
System.out.println(e.getMessage());
}
}
}
public class App{
public static void main( String[] args ){
new Thread(new ThreadA()).start();
new Thread(new ThreadB()).start();
}
}
→A B B A B A B A A B B A B A B A A B A B
別スレッド呼び出し(引数有り)
非同期処理はスレッドプール(Taskクラス)を使う。
ラムダ式
Thread other4 = new Thread(() => OtherThread2(@”4″));
//other4.Start();
匿名メソッド
new Thread(start: delegate()
{
OtherThread2(“5”);
}).Start();
パラメータ用ThreadStartデリゲートに引数を渡す
Thread other6 = new Thread(start: OtherThread3);
other6.Start(parameter: @”6″);
for (int i = 0; i < 10; i++)
{
Console.WriteLine(@”Main!”);
}
}
static void OtherThread2(string msg)
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine(msg);
}
Console.ReadKey();
}
static void OtherThread3(object msg)
{
※ パラメータ用ThreadStartデリゲートに引数の引数は変換が必要
string myMsg = (string)msg;
for (int i = 0; i < 10; i++)
{
Console.WriteLine(myMsg);
}
Console.ReadKey();
}
private String _s;
public MyThread(String s){
this._s = s;
}
public void run(){
System.out.println( this._s );
}
}
public class App{
public static void main( String[] args ){
new MyThread("Java").run();
}
}
private String _s;
public MyThread(String s){
this._s = s;
}
public void run(){
System.out.println( this._s );
}
}
public class App{
public static void main( String[] args ){
new Thread(new MyThread("Java")).start();
}
}
スレッドの終了を待つ
var myTask = new Task( () =>
{
Console.WriteLine("myTask : start");
System.Threading.Thread.Sleep(1000);
Console.WriteLine("myTask : end");
});
Console.WriteLine("program : start");
myTask.Start(); タスク開始
myTask.Wait(); タスクの終了を待つ
Console.WriteLine("program : end");
→
program : start
myTask : start
myTask : end
program : start
非同期処理が成功してから処理を開始する
var myTask = new Task(() =>
{
Console.WriteLine("myTask1 : start");
System.Threading.Thread.Sleep(5000);
Console.WriteLine("myTask1 : end");
});
myTask.Start();
↑ スレッドの終了後に開始
myTask.ContinueWith(_ =>
{
Console.WriteLine(@"myTask was finished.");
});
Console.WriteLine(@"program : end");
↑ スレッドの終了を待たずに実行される。
↑ スレッドは非同期
var myTask = new Task(() =>
{
Console.WriteLine("myTask1 : start");
System.Threading.Thread.Sleep(5000);
Console.WriteLine("myTask1 : end");
});
myTask.Start();
myTask.ContinueWith(_ =>
{
Console.WriteLine(@"myTask was finished.");
});
↑ スレッドの終了を待つ。
Task.WaitAll(myTask);
Console.WriteLine(@"program : end");
複数のスレッドの終了を待つ
{
Console.WriteLine("myThread1 : start");
System.Threading.Thread.Sleep(1000);
Console.WriteLine("myThread1 : end");
});
var myThread2 = new Thread(() =>
{
Console.WriteLine("myThread2 : start");
System.Threading.Thread.Sleep(2000);
Console.WriteLine("myThread2 : end");
});
Console.WriteLine("program : start");
myThread1.Start();
myThread2.Start();
myThread1.Join();
myThread2.Join();
Console.WriteLine("program : end");
スレッドが2つとも終了してから処理される
※タスククラスを使用した場合1
using System.Threading.Tasks;
var myTask1 = new Task(() =>
{
Console.WriteLine("myTask1 : start");
System.Threading.Thread.Sleep(1000);
Console.WriteLine("myTask1 : end");
});
var myTask2 = new Task(() =>
{
Console.WriteLine("myTask2 : start");
System.Threading.Thread.Sleep(2000);
Console.WriteLine("myTask2 : end");
});
Console.WriteLine("program : start");
myTask1.Start();
myTask2.Start();
Task.WaitAll(myTask1, myTask2);
Console.WriteLine("program : end");
スレッドが2つとも終了してから処理される
※タスククラスを使用した場合2
using System.Threading.Tasks;
Parallel.Invoke(() =>
{
Console.WriteLine("myTask1 : start");
System.Threading.Thread.Sleep(1000);
Console.WriteLine("myTask1 : end");
},
() =>
{
Console.WriteLine("myTask2 : start");
System.Threading.Thread.Sleep(2000);
Console.WriteLine("myTask2 : end");
});
Console.WriteLine("program : end");
スレッドが2つとも終了してから処理される
排他制御
複数のスレッドから共通のリソース(データ)にほぼ同時にアクセスする事によるデータの不整合を制御する事
非同期処理をロックしない場合
static void myThread(int prm)
{
Console.WriteLine(@"[");
Thread.Sleep(prm * 1000);
Console.WriteLine(@"]");
}
static void Main(string[] args)
{
Parallel.Invoke(
() => myThread(prm: 1),
() => myThread(prm: 2)
);
}
→ [ [ ] ]
非同期処理をロックする場合
private static object myLock = new object();
static void myThread(int prm)
{
lock (myLock)
{
Console.WriteLine(@"[");
Thread.Sleep(prm * 1000);
Console.WriteLine(@"]");
}
}
static void Main(string[] args)
{
Parallel.Invoke(
() => myThread(prm: 1),
() => myThread(prm: 2)
);
}
→ [ ] [ ]
private Bathroom room;
private String name;
public FamilyTread(Bathroom room, String name) {
this.room = room;
this.name = name;
}
public void run() {
this.room.in(this.name);
}
}
public class Bathroom {
synchronized付与により、メソッドを同時呼び出し不可に
public synchronized void in(String name) {
System.out.println(name + ":enter");
try {
System.out.println(name + ":enterd");
3秒待機
Thread.sleep(3000);
} catch (InterruptedException e) {
}
System.out.println(name + ":out");
}
}
public class Main {
public static void main(String[] args) {
Bathroom room = new Bathroom();
FamilyTread father = new FamilyTread(room, "父");
FamilyTread mother = new FamilyTread(room, "母");
FamilyTread sister = new FamilyTread(room, "姉");
FamilyTread me = new FamilyTread(room, "自分");
father.start();
mother.start();
sister.start();
me.start();
}
}
結果:synchronized無
母:enter
父:enter
姉:enter
自分:enter
姉:enterd
姉:out
父:enterd
父:out
母:enterd
母:out
自分:enterd
自分:out
結果:synchronized有
父:enter
父:enterd
父:out
自分:enter
自分:enterd
自分:out
姉:enter
姉:enterd
姉:out
母:enter
母:enterd
母:out
並列処理
Parallel.For(0, 10, (n) =>
{
Console.WriteLine(n);
});
→0~9まで数字が出力されるが毎回結果が異なる
(出力に関して並列で処理が行われている)
ForEach利用
var ary = Enumerable.Range(start: 0, count: 10);
Parallel.ForEach(ary, n => {
Console.WriteLine(n);
});
LINQの並列処理(PLINQ)
static void Main(string[] args)
{
var oldAry = Enumerable.Range(start: 0, count: 10);
var newAry1 = oldAry.Where(n => n > retNum()).Select(n => n).ToArray();
時間は掛かるが0~9が順番通り格納される
var newAry2 = oldAry.Where(n => n > retNum()).AsParallel().Select(n => n).ToArray();
0~9がバラバラに格納される(並列で処理される)。
var newAry3 = oldAry.Where(n => n > retNum()).AsParallel().AsOrdered().Select(n => n).ToArray();
時間は掛かるが0~9が順番通り格納される(並列で処理されるがソートされる)。
}
時間を掛けて0を返すだけの関数
static int retNum()
{
Thread.Sleep(millisecondsTimeout: 1000);
return 0;
}
タイマー
フォームコントロールのタイマーとは別。
処理が軽い。
private delegate void _delegate();
※フォーム操作用
private System.Threading.Timer _timer();
※タイマーオブジェクトは外部に保持しておかないとガーベージコレクションの対象となる
private void MyForm_Load(object sender, EventArgs e)
{
定期的に実行する処理を指定(callBackMethod)
TimerCallback timerDelegate = new TimerCallback(callBackMethod);
タイマー実行
_timer = new System.Threading.Timer(
callback: timerDelegate,
state: null,
dueTime: 0,
period: 1000);
}
定期的に実行される処理
private void callBackMethod(object o)
{
処理
処理
フォームを操作する場合
Invoke(method: new _delegate(method));
}
/複数行に渡る処理を行う場合
private void method()
{
this.MyLabel.Text = “フォーム操作”;
}
IAsyncResultクラス利用による非同期処理
内部的にスレッドプールを使用している為処理が軽くなる。
基本
delegate string SampleDelegate(int prm1, string prm2);
static void Main(string[] args)
{
移譲オブジェクト作成
SampleDelegate sd = new SampleDelegate(DelegateMethod);
非同期処理開始
IAsyncResult ar = sd.BeginInvoke(prm1: 100, prm2: @”test”, callback: null, @object: null);
非同期処理終了
string result = sd.EndInvoke(result: ar);
}
非同期で実行させる(重たい)処理
static string DelegateMethod(int prm1, string prm2)
{
return “”;
}
待ち合わせ・待機
delegate string SampleDelegate(int prm1, string prm2);
static void Main(string[] args)
{
移譲オブジェクト作成
SampleDelegate sd = new SampleDelegate(DelegateMethod);
非同期処理開始
IAsyncResult ar = sd.BeginInvoke(prm1: 100, prm2: @”test”, callback: null, @object: null);
非同期処理を2秒中断
ar.AsyncWaitHandle.WaitOne(millisecondsTimeout: 2000);
非同期処理終了まで待機
ar.AsyncWaitHandle.WaitOne();
if (ar.IsCompleted)
{
非同期処理終了
string result = sd.EndInvoke(result: ar);
}
}
非同期で実行させる(重たい)処理
static string DelegateMethod(int prm1, string prm2)
{
return “”;
}
コールバック関数利用
delegate string SampleDelegate(int prm1, string prm2);
static void Main(string[] args)
{
移譲オブジェクト作成
SampleDelegate sd = new SampleDelegate(DelegateMethod);
コールバック関数定義
AsyncCallback cb = new AsyncCallback(CallBackMethod);
非同期処理開始
IAsyncResult ar = sd.BeginInvoke(prm1: 100, prm2: @”test”, callback: cb, @object: null);
非同期処理終了
string result = sd.EndInvoke(result: ar);
}
非同期で実行させる(重たい)処理
static string DelegateMethod(int prm1, string prm2)
{
Console.WriteLine(“asynchronous”);
Console.ReadKey();
return “”;
}
static void CallBackMethod(IAsyncResult ar)
{
Console.WriteLine(“callback”);
Console.ReadKey();
}
スレッド関連メソッド/プロパティ
Thread インスタンス = new Thread(start: new ThreadStart(メソッド名));
インスタンス.Start();
※別スレッドで起動できるのは戻り値無し、引数無しのメソッドのみ
戻り値やパラメータを利用したいときは、後述のデリゲートを使用
バックグラウンド・スレッド
メインのスレッドが終了すると自身も終了するスレッド
フォアグラウンド・スレッド
メインのスレッドが終了しても終了しないスレッド
バックグラウンド・スレッドとする
インスタンス.IsBackground = true;
優先度変更
インスタンス.Priority = ThreadPriority.Highest;
インスタンス.Priority = ThreadPriority.Normal;
インスタンス.Priority = ThreadPriority.Lowest;
スレッドの一時停止
インスタンス.Suspend();
スレッドの再開
インスタンス.Resume();
スレッドの処理が終了するまで待つ
インスタンス.Join();
スレッドの強制終了
インスタンス.Abort();
1秒間待つ
Thread.Sleep(1000);