モチベーション

ブロッキングI/O,ノンブロッキングI/O,同期I/O,非同期I/O,I/O多重化を理解する

記事概要

非同期I/Oはインターネット上の文献を読んでいると、ノンブロッキングI/Oと同じだと解釈されていることが多い。同僚もノンブロッキングI/Oと非同期I/Oは同じだと話していたけど、この辺の話はあまり詳しくないから納得できなかったが、本当にそうなんだろうか。本記事では、非同期I/O、ノンブロッキングI/Oをの違いを整理することを主眼として、関連する同期I/O、ブロッキングI/O、I/O多重化などI/O周りのあやふやなに知識を焼きなおそうと思う。

とりあえず結論から

結論からいうと、ブロッキングI/Oと同期I/Oは同じであるが、ノンブロッキングI/Oと非同期I/Oは違うものと自分は判断した。

ブロッキングI/O & 同期I/O(Syncronous I/O)

ブロッキングI/O(同期I/O)ではI/O処理時に対象のファイルディスクリプタが準備完了になっていない場合、ブロック状態、つまりプロセスはシステムコールの返答待ち状態になり張り付く。その間プログラムの処理が進むことはない。以下は、ブロッキングI/Oのユーザーモードとカーネルモードの遷移を表す。カーネルモードにコンテキストが切り替わり、システムコールの実行が完了すると元のユーザモードにコンテキストが戻り、ブロック状態からも解放される。(以降に出てくる図は”Boost application performance using asynchronous I/O”から引用させて頂いたものである)

20150422_ブロッキングIO

ノンブロッキングI/O

ノンブロッキングI/OではI/O対象のファイルディスクリプタの準備完が了していないことをアプリケーション側に伝えるため即座にエラーが返る(errnoにEGAINが格納されて返ってくる)。一般に、O_NONBLOCKフラグを利用してノンブロッキングモードを宣言するが、この時プロセスはブロック状態にならず、CPUを他の処理に回すことができるためI/O待ち時間を有効活用できる。エラーはアプリケーション側でハンドリングしてリトライするタイミングを定義する必要がある。ノンブロッキングI/Oはソケットなどのネットワークに用いられるI/OモデルでディスクI/Oには使わない。C10K問題の対策としてノンブロッキングI/O+イベントループモデルを採用することでシングルスレッドで複数の接続を処理する方法がある。

20150422_syncronous+nonblockingI/O

非同期I/O(Asyncronous I/O)

I/O処理が完了したタイミングで通知するI/Oモデルを非同期I/Oという。非同期I/Oは、通知はシグナルもしくはコールバックによって行われ、通知があるまではアプリケーション側で他の処理を進めることができる。プロセスがブロック状態にならない点ではノンブロッキングI/Oと一緒だが、非同期I/OはI/Oの処理が完了したら通知をし、ノンブロッキングI/OはI/O処理が可能な状態にないことをエラーで通知するので動きは異なる。

20150422_非同期&ノンブロッキングIO

ノンブロッキングI/Oと非同期I/Oはどこが違う?

ノンブロッキングI/Oは処理がすぐできない時はエラーを返し、ブロック状態にさせない方式。一方、非同期I/Oは処理がすぐできない時は処理が完了するまでバックグラウンドで待機して、終了したタイミングで通知を返す方式(通知が来たら既に処理が終わっている)。

I/Oの多重化(I/O Multiplexing)

I/O関連の調査をしていると必ず出てくるのがI/Oの多重化。このI/O多重化とはpoll()、select()、epollシステムコールを利用して、複数のファイルディスクリプタを1つのプロセスで管理すること。これらのシステムコールはファイルディスクリプタの状態変化を監視できるため、複数のファイルディスクリプタのいずれかが入出力準備が完了したかわかる。select、poll、epollの違いは以下の通り。

select

selectはI/Oを多重化するためのシステムコールの1つ。監視対象のファイルディスクリプタn個をループの中で1つずつ状態確認する。そのため,計算量的には*O(n)*になる.また,扱えるファイルディスクリプタの数に上限があるため、現在はこのシステムコールを積極的に利用する理由は特にない。以下はselectを使ったsocket()、bind()、listen()後に準備ができているソケットを監視するプログラムだ。

void accept_loop(int soc)
{
    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
    int child[MAX_CHILD];
    struct timeval timeout;
    struct sockaddr_storage from;
    int acc, child_no, width, i, count, pos, ret;
    socklen_t len;
    fd_set mask;

    /* child配列の初期化 */
    for (i = 0; i < MAX_CHILD; i++) {
        child[i] = -1;
    }
    child_no = 0;
    for (;;) {
        /* select()用マスクの作成 */
        FD_ZERO(&mask);
        FD_SET(soc, &mask);
        width = soc + 1;
        count = 0;
        for (i = 0; i < child_no; i++) {
            if (child[i] != -1) {
                FD_SET(child[i], &mask);
                if (child[i] + 1 > width) {
                    width = child[i] + 1;
                    count++;
                }
            }
        }
        (void) fprintf(stderr, "<<child count:%d>>\n", count);
        /* select()用タイムアウト値のセット */
        timeout.tv_sec = 10;
        timeout.tv_usec = 0;
        switch (select(width, (fd_set *) &mask, NULL, NULL, &timeout)) {
        case -1:
            /* エラー */
            perror("select");
            break;
        case 0:
            /* タイムアウト */
            break;
        default:
            /* レディ有り */
            if (FD_ISSET(soc, &mask)) {
                /* サーバソケットレディ */
                len = (socklen_t) sizeof(from);
                /* 接続受付 */
                if ((acc = accept(soc, (struct sockaddr *)&from, &len)) == -1) {
                    if(errno!=EINTR){
                        perror("accept");
                    }
                } else {
                    (void) getnameinfo((struct sockaddr *) &from, len,
                               hbuf, sizeof(hbuf),
                               sbuf, sizeof(sbuf),
                               NI_NUMERICHOST | NI_NUMERICSERV);
                    (void) fprintf(stderr, "accept:%s:%s\n", hbuf, sbuf);
                    /* childの空きを検索 */
                    pos = -1;
                    for (i = 0; i < child_no; i++) {
                        if (child[i] == -1) {
                            pos = i;
                            break;
                        }
                    }
                    if (pos == -1) {
                        /* 空きが無い */
                        if (child_no + 1 >= MAX_CHILD) {
                            /* childにこれ以上格納できない */
                            (void) fprintf(stderr,
					   "child is full : cannot accept\n");
                            /* クローズしてしまう */
                            (void) close(acc);
                        } else {
                            child_no++;
                            pos = child_no - 1;
                        }
                    }
                    if (pos != -1) {
                        /* childに格納 */
                        child[pos] = acc;
                    }
                }
            }
/* 準備が完了しているファイルディスクプタを調べる= 計算量:O(n) */            
            for (i = 0; i < child_no; i++) {
                if (child[i] != -1) {
                    if (FD_ISSET(child[i], &mask)) {
                        /* 送受信 */
                        if ((ret = send_recv(child[i], i)) == -1) {
                            /* エラーまたは切断 */
                            /* クローズ */
                            (void) close(child[i]);
                            /* childを空きに */
                            child[i] = -1;
                        }
                    }
                }
            }
	    break;
        }
    }
}



注目するのはこの部分。ここで監視するファイルディスクリプタの数だけループを回すことになる。

for (i = 0; i < child_no; i++) {
  if (child[i] != -1) {
    if (FD_ISSET(child[i], &mask)) {
      /* 送受信 */
      if ((ret = send_recv(child[i], i)) == -1) {
        /* エラーまたは切断 */
        /* クローズ */
        (void) close(child[i]);
        /* childを空きに */
        child[i] = -1;
      }
    }
  }
}

poll

pollはI/Oを多重化するためのシステムコールの1つで、selectで問題だったファイルディスクリプタの数が無制限になっている。しかし,select同様監視対象のファイルディスクリプタを1つずつ状態確認するため、その計算量は*O(n)*となる。そのため、ファイルディスクリプタの数が増えるとパフォーマンスが低下する。selectを利用するならpollを使う方が良い。以下はpollを使ったsocket(),bind(),listen()後に準備ができているソケットを監視するプログラムだ。

void accept_loop(int soc)
{
    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
    int child[MAX_CHILD];
    struct sockaddr_storage from;
    int acc, child_no, i, j, count, pos, ret;
    socklen_t len;
    struct pollfd targets[MAX_CHILD + 1];
    /* child配列の初期化 */
    for (i = 0; i < MAX_CHILD; i++) {
        child[i] = -1;
    }
    child_no = 0;
    for (;;) {
        /* poll()用データの作成 */
        count = 0;
        targets[count].fd = soc;
        targets[count].events = POLLIN;
        count++;
        for (i = 0; i < child_no; i++) {
            if (child[i] != -1) {
                targets[count].fd = child[i];
                targets[count].events = POLLIN;
                count++;
            }
        }
        (void) fprintf(stderr,"<<child count:%d>>\n", count - 1);
        switch (poll(targets, count, 10 * 1000)) {
        case -1:
            /* エラー */
            perror("poll");
            break;
        case 0:
            /* タイムアウト */
            break;
        default:
            if (targets[0].revents & POLLIN) {
                /* サーバソケットレディ */
                len = (socklen_t) sizeof(from);
                /* 接続受付 */
                if ((acc = accept(soc, (struct sockaddr *)&from, &len)) == -1) {
                    if(errno!=EINTR){
                        perror("accept");
                    }
                } else {
                    (void) getnameinfo((struct sockaddr *) &from, len,
                               hbuf, sizeof(hbuf),
                               sbuf, sizeof(sbuf),
                               NI_NUMERICHOST | NI_NUMERICSERV);
                    (void) fprintf(stderr, "accept:%s:%s\n", hbuf, sbuf);
                    /* childの空きを検索 */
                    pos = -1;
                    for (i = 0; i < child_no; i++) {
                        if (child[i] == -1) {
                            pos = i;
                            break;
                        }
                    }
                    if (pos == -1) {
                        /* 空きが無い */
                        if (child_no + 1 >= MAX_CHILD) {
                            /* childにこれ以上格納できない */
                            (void) fprintf(stderr,
					   "child is full : cannot accept\n");
                            /* クローズしてしまう */
                            (void) close(acc);
                        } else {
                            child_no++;
                            pos = child_no - 1;
                        }
                    }
                    if (pos != -1) {
                        /* childに格納 */
                        child[pos] = acc;
                    }
                }
            }
/* 準備が完了しているファイルディスクプタを調べる= 計算量:O(n) */            
            for (i = 1; i < count; i++) {
                if (targets[i].revents & (POLLIN | POLLERR)) {
                    /* 送受信 */
                    if ((ret = send_recv(targets[i].fd, i - 1)) == -1) {
                        /* エラーまたは切断 */
                        /* クローズ */
                        (void) close(targets[i].fd);
                        /* childを空きに */
                        for (j = 0; j < child_no; j++) {
                            if (child[j] == targets[i].fd) {
                                child[j] = -1;
                                break;
                            }
                        }
                    }
                }
            }
	    break;
        }
    }
}



selectを使ったプログラムと同様にループで監視するファイルディスクリプタの数だけループを回す必要があるためパフォーマンスを気にする場合は次のepollを利用したほうがいい。

for (i = 1; i < count; i++) {
  if (targets[i].revents & (pollin | pollerr)) {
    /* 送受信 */
    if ((ret = send_recv(targets[i].fd, i - 1)) == -1) {
      /* エラーまたは切断 */
      /* クローズ */
      (void) close(targets[i].fd);
      /* childを空きに */
      for (j = 0; j < child_no; j++) {
        if (child[j] == targets[i].fd) {
          child[j] = -1;
          break;
        }
      }
    }
  }
}

epoll

epoll API は Linux カーネル 2.5.44 に導入されたもので、2.6以降なら利用可能。epollはファイルディスクリプタの数に制限が無いのに加えて、ファイルディスクリプタの状態変化監視も改善されている。具体的には,ファイルディスクリプタの状態をカーネルで監視しており、変化したものを直接通知できるためselect、pollの時のようにループを使った監視をする必要がない。これにより、計算量は*O(1)*となり、よりパフォーマンスの高い多重化I/Oを実現できる。

void accept_loop(int soc)
{
    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
    struct sockaddr_storage from;
    int acc, count, i, epollfd, nfds, ret;
    socklen_t len;
    struct epoll_event ev, events[MAX_CHILD];

    if ((epollfd = epoll_create(MAX_CHILD + 1)) == -1) {
        perror("epoll_create");
        return;
    }
    /* EPOLL用データの作成 */
    ev.data.fd = soc;
    ev.events = EPOLLIN;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, soc, &ev) == -1) {
        perror("epoll_ctl");
        (void) close(epollfd);
        return;
    }
    count = 0;
    for (;;) {
        (void) fprintf(stderr,"<<child count:%d>>\n", count);
        switch ((nfds = epoll_wait(epollfd, events, MAX_CHILD+1, 10 * 1000))) {
        case -1:
            /* エラー */
            perror("epoll_wait");
            break;
        case 0:
            /* タイムアウト */
            break;
        default:
            /* ソケットがレディ */
            for (i = 0; i < nfds; i++) {
                /* events[i].data.fdは全て読み込み可能 */
                if (events[i].data.fd == soc) {
                    /* サーバソケットレディ */
                    len = (socklen_t) sizeof(from);
                    /* 接続受付 */
                    if ((acc = accept(soc, (struct sockaddr *)&from, &len))==-1) {
                        if (errno != EINTR){
                            perror("accept");
                        }
                    } else {
                        (void) getnameinfo((struct sockaddr *) &from, len,
                                           hbuf, sizeof(hbuf),
                                           sbuf, sizeof(sbuf),
                                           NI_NUMERICHOST | NI_NUMERICSERV);
                        (void) fprintf(stderr, "accept:%s:%s\n", hbuf, sbuf);
                        /* 空きが無い */
                        if (count + 1 >= MAX_CHILD) {
                            /* これ以上接続できない */
                            (void) fprintf(stderr,
					  "connection is full : cannot accept\n");
                            /* クローズしてしまう */
                            (void) close(acc);
                        } else {
                            ev.data.fd = acc;
                            ev.events = EPOLLIN;
                            if (epoll_ctl(epollfd, EPOLL_CTL_ADD, acc, &ev) == -1) {
                                perror("epoll_ctl");
                                (void) close(acc);
                                (void) close(epollfd);
                                return;
                            }
                            count++;
                        }
                    }
                } else {
                    /* 送受信 */
                    if ((ret = send_recv(events[i].data.fd, events[i].data.fd))
		        == -1) {
                        /* エラーまたは切断 */
                        if (epoll_ctl(epollfd,
				      EPOLL_CTL_DEL,
				      events[i].data.fd,
				      &ev) == -1) {
                            perror("epoll_ctl");
                            (void) close(acc);
                            (void) close(epollfd);
                            return;
                        }
                        /* クローズ */
                        (void) close(events[i].data.fd);
                        count--;
                    }
                }
            }
	    break;
        }
    }
    (void) close(epollfd);
}



epollはselectやpollと異なりeventsに入っているデータは全て読み込み可能なファイルディスクリプタなので、O(1)の計算量で済む。

/* events[i].data.fdは全て読み込み可能 */
if (events[i].data.fd == soc) {

まとめ

I/O周りの様々な用語をまとめた。今回、自分の中ではノンブロッキングI/Oと非同期I/Oは異なるものだという結論をしたが、インターネット上の情報を参照する時はそれが何を表しているかは文脈で判断したい。また、今後は、Node.jsやNginxといった高度なI/O処理テクニックを利用しているサーバの実装について見ていこうと思う。

参考

今回の話は人によって意見が異なるので、参考文献は多めに載せておくこととする。

書籍

Linuxネットワークプログラミングバイブルは本書籍の多重化の章は説明もプログラムも豊富でC言語でI/O多重化の実装の流れを掴みたいときは参考になると思う。

Linuxネットワークプログラミングバイブル
  • Author: 小俣光之
  • Manufacturer: 秀和システム
  • Publish date: 2011-02-01
  • 全般

    ノンブロッキングI/O

    I/Oの多重化

    C10K問題の歴史

    その他