passengerを読み解く(例のあのプロセスとの通信) vol4

passengerを読み解く(例のあのプロセスが動くまで) vol2 - I am Cruby!
で起動したプロセスは何を行っているんだろうか.
 
I am Cruby!
ではApacheが生成したプロセスと通信している姿が見られた.
今回はその謎を追う事に.
 

プロセスの起動先

			dup2(fds[0], SERVER_SOCKET_FD);
                 ...
			// execute!
			execlp(
				#if 0
					"valgrind",
					"valgrind",
				#else
					m_serverExecutable.c_str(),
				#endif
				m_serverExecutable.c_str(),
				toString(Passenger::getLogLevel()).c_str(),
				m_spawnServerCommand.c_str(),
				m_logFile.c_str(),
				m_rubyCommand.c_str(),
				m_user.c_str(),
				statusReportFIFO.c_str(),
				NULL);

では何が呼び出されたのか.
 
ApplicationPoolServerExecutableが実行されている.
以下はそのmain部

int
main(int argc, char *argv[]) {
	try {
		Server server(SERVER_SOCKET_FD, atoi(argv[1]),
			argv[2], argv[3], argv[4], argv[5], argv[6]);
		return server.start();
	} catch (const exception &e) {
		P_ERROR(e.what());
		return 1;
	}
}

SERVER_SOCKET_FDは 3 である.親プロセスでdup2した事を思い出して欲しい.
serverをnewしている.
 

Server()

	Server(int serverSocket,
	       const unsigned int logLevel,
	       const string &spawnServerCommand,
	       const string &logFile,
	       const string &rubyCommand,
	       const string &user,
	       const string &statusReportFIFO)
		: pool(spawnServerCommand, logFile, rubyCommand, user) {
		
		Passenger::setLogLevel(logLevel);
		this->serverSocket = serverSocket;
		this->statusReportFIFO = statusReportFIFO;
	}

poolについては次回詳しく説明する.
その他はserverSocketを設定したり,状態を設定するファイル名を設定したり.
statusReportFIFOに入っている文字列は以下のパターンである.
 
/tmp/passenger_status.プロセス名.fifo
 

server.start()

コードは以下.さすがに長い...orz

int
Server::start() {
    ...
                /* (1) */
		if (!statusReportFIFO.empty()) {
			statusReportThread = ptr(
				new Thread(
					bind(&Server::statusReportThreadMain, this),
					1024 * 128
				)
			);
		}
		
		while (!this_thread::interruption_requested()) {   /* (2) */
			int fds[2], ret;
			char x;
			
                        /* (3) */
			// The received data only serves to wake up the server socket,
			// and is not important.
			ret = InterruptableCalls::read(serverSocket, &x, 1);

                  ...
			
			// We have an incoming connect request from an
			// ApplicationPool client.
			do {
				ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
			} while (ret == -1 && errno == EINTR);
			if (ret == -1) {
				throw SystemException("Cannot create an anonymous Unix socket", errno);
			}
			

                  ...
                        /* (4) */
			MessageChannel(serverSocket).writeFileDescriptor(fds[1]);
			InterruptableCalls::close(fds[1]);
			
                        /* (5) */
			ClientPtr client(new Client(*this, fds[0]));

                  ...

			{
				mutex::scoped_lock l(lock);
				clients.insert(client);
			}
                  ...
                        /* (6) */
			client->start(client);
		}
        ...

	return 0;
}

このメソッドを起動して,main()は終了する.
つまりこのコードがforkで起動したプロセスの肝である.
 

(1)状態を報告するスレッドの起動
		if (!statusReportFIFO.empty()) {
			statusReportThread = ptr(
				new Thread(
					bind(&Server::statusReportThreadMain, this),
					1024 * 128
				)
			);
		}

この部分では,このプロセスの状態を報告するスレッドを起動する様である.
ここは処理の本質ではなさそうなので,深くは読まない.名前で判断.
 

(2)メインループ
		while (!this_thread::interruption_requested()) {   /* (2) */

これも名前で判断.
「リクエストが中断されるまで,処理を繰り返す」
という意味だと思う.
 

(3)通信待ち受け
			// The received data only serves to wake up the server socket,
			// and is not important.
			ret = InterruptableCalls::read(serverSocket, &x, 1);

serverSocketは起動した親プロセスと繋がっているソケットである.
ココで親プロセスからの通信を待ち受ける.
 
実は他プロセスから起動するコードは前回紹介済み.
applicationPoolServer->connect()内の話である.

ApplicationPoolPtr connect() {
                ...
                        // Write some random data to wake up the server.
                        channel.writeRaw("x", 1);
                ...
        }

ココ.この処理が実行されるのは,Apacheが生成した子プロセスなので,親プロセスは一緒であるが無関係である.
ただ,親プロセスでつないでおいたソケットはそのまま,子プロセスに引き継がれるので,無関係ではないか.
腹違いの兄弟?
 

(4)他プロセスへのファイルディスクプリタ受け渡し
			MessageChannel(serverSocket).writeFileDescriptor(fds[1]);
			InterruptableCalls::close(fds[1]);

ここでは自分を起動したプロセスさんに,socketpairで新しく生成したの片方のファイルディスクプリタを渡す.

	/**
	 * Pass a file descriptor. This only works if the underlying file
	 * descriptor is a Unix socket.
	 *
	 * @param fileDescriptor The file descriptor to pass.
	 * @throws SystemException Something went wrong during file descriptor passing.
	 * @throws boost::thread_interrupted
	 * @pre <tt>fileDescriptor >= 0</tt>
	 * @see readFileDescriptor()
	 */
	void writeFileDescriptor(int fileDescriptor) {
		struct msghdr msg;
		struct iovec vec;
		char dummy[1];
		char control_data[CMSG_SPACE(sizeof(int))];
		struct cmsghdr *control_header;
		int ret;
	
		msg.msg_name = NULL;
		msg.msg_namelen = 0;
	
		/* Linux and Solaris require msg_iov to be non-NULL. */
		dummy[0]       = '\0';
		vec.iov_base   = dummy;
		vec.iov_len    = sizeof(dummy);
		msg.msg_iov    = &vec;
		msg.msg_iovlen = 1;
	
		msg.msg_control    = (caddr_t) &control_data;
		msg.msg_controllen = sizeof(control_data);
		msg.msg_flags      = 0;
		
		control_header = CMSG_FIRSTHDR(&msg);
		control_header->cmsg_level = SOL_SOCKET;
		control_header->cmsg_type  = SCM_RIGHTS;
                control_header->cmsg_len = CMSG_LEN(sizeof(int));
                memcpy(CMSG_DATA(control_header), &fileDescriptor, sizeof(int));
		
		ret = InterruptableCalls::sendmsg(fd, &msg, 0);
	}

sendmsg(2)でfdの受け渡しをしている.
 
受け取り側は前回紹介ずみである.

ApplicationPoolPtr connect() {
                ...
                        clientConnection = channel.readFileDescriptor();
                ...
        }

違うプロセス間でfdを受け渡す方法としてはポピュラな方法なんだろうか.
ファイル記述子をUnixドメインソケット経由で渡す - bkブログによると,詳細Unixの第15章に乗っているらしい.
おぉ.glibc内部でも使っているのか.勉強になるなぁ.
 

(5)Client生成
			ClientPtr client(new Client(*this, fds[0]));

clientというのは何だろう?
 

/*****************************************
 * Client
 *****************************************/

/**
 * Represents a single ApplicationPool client, connected to this server.
 *
 * @invariant
 * The life time of a Client object is guaranteed to be less than
 * that of its associated Server object.
 */
class Client {
private:

  ...

public:
	/**
	 * Create a new Client object.
	 *
	 * @param the_server The Server object that this Client belongs to.
	 * @param connection The connection to the ApplicationPool client.
	 *
	 * @note
	 * <tt>connection</tt> will be closed upon destruction
	 */
	Client(Server &the_server, int connection)
		: server(the_server),
		  fd(connection),
		  channel(connection) {
		thr = NULL;
		lastSessionID = 0;
	}

コメントを訳すと,
「一つのApplicationPoolのクライアントを表し,このサーバと繋がっている.」
どうやら,ApplicationPoolとApacheのサブプロセスをつないでいるもののよう.
さっきの,ApplicationPoolServerのpoolメンバの事であろう.
 
clientを生成してからの続き

			ClientPtr client(new Client(*this, fds[0]));
                    ...
			{
				mutex::scoped_lock l(lock);
				clients.insert(client);
			}

mutexをロックして,clientsにinsertしている.
clientsは?

class Server {
private:
   ...
	set<ClientPtr> clients;

おぉ,setか.
setはinsertなどすると自動でソートしてくれるクラスだった様な.
http://www.geocities.jp/ky_webid/cpp/library/009.html
そうみたい.ふむふむ.
ClientPtrはClientのshared_ptrである.GCのにおいが少し.
 

(6)Clientのスタート!
	/**
	 * Start the thread for handling the connection with this client.
	 *
	 * @param self The iterator of this Client object inside the server's
	 *        <tt>clients</tt> set. This is used to remove itself from
	 *        the <tt>clients</tt> set once the client has closed the
	 *        connection.
	 */
	void start(const weak_ptr<Client> self) {
		thr = new Thread(
			bind(&Client::threadMain, this, self),
			CLIENT_THREAD_STACK_SIZE
		);
	}

コメントでは,「clientがコネクトをハンドルするようのThreadを起動する」と書いてある.
 
ココで一つ疑問.new ThreadってC++にあるんすか?
System.h

	using namespace boost;
	
      ....

	/**
	 * Thread class with system call interruption support.
	 */
	class Thread: public thread {
	public:
		template <class F>
		explicit Thread(F f, unsigned int stackSize = 0)
			: thread(f, stackSize) {}
		
		/**
		 * Interrupt the thread. This method behaves just like
		 * boost::thread::interrupt(), but will also respect the interruption
		 * points defined in Passenger::InterruptableCalls.
		 *
		 * Note that an interruption request may get lost, depending on the
		 * current execution point of the thread. Thus, one should call this
		 * method in a loop, until a certain goal condition has been fulfilled.
		 * interruptAndJoin() is a convenience method that implements this
		 * pattern.
		 */
		void interrupt() {
			int ret;
			
			thread::interrupt();
			do {
				ret = pthread_kill(native_handle(),
					INTERRUPTION_SIGNAL);
			} while (ret == EINTR);
		}
		
		/**
		 * Keep interrupting the thread until it's done, then join it.
		 *
		 * @throws boost::thread_interrupted
		 */
		void interruptAndJoin() {
			bool done = false;
			while (!done) {
				interrupt();
				done = timed_join(posix_time::millisec(10));
			}
		}
	};

boostのthreadを継承したクラスなのか.boostって知らなかったや.勉強になるなぁ.
 
あと,bindだけど,関数ポインタみたいなものを作るよう.
なので,新しく作成されたThreadで走るメソッドは以下.

	/**
	 * The entry point of the thread that handles the client connection.
	 */
	void threadMain(const weak_ptr<Client> self) {
		vector<string> args;
		try {
			while (!this_thread::interruption_requested()) {
                                ...
					if (!channel.read(args)) {
						// Client closed connection.
						break;
					}
                                ...

				P_TRACE(4, "Client " << this << ": received message: " <<
					toString(args));
				
				if (args[0] == "get" && args.size() == 7) {
					processGet(args);
				} else if (args[0] == "close" && args.size() == 2) {
					processClose(args);
				} else if (args[0] == "clear" && args.size() == 1) {
					processClear(args);
				} else if (args[0] == "setMaxIdleTime" && args.size() == 2) {
					processSetMaxIdleTime(args);
				} else if (args[0] == "setMax" && args.size() == 2) {
					processSetMax(args);
				} else if (args[0] == "getActive" && args.size() == 1) {
					processGetActive(args);
				} else if (args[0] == "getCount" && args.size() == 1) {
					processGetCount(args);
				} else if (args[0] == "setMaxPerApp" && args.size() == 2) {
					processSetMaxPerApp(atoi(args[1]));
				} else if (args[0] == "getSpawnServerPid" && args.size() == 1) {
					processGetSpawnServerPid(args);
				} else {
					processUnknownMessage(args);
					break;
				}
				args.clear();
			}
		} catch (const boost::thread_interrupted &) {
			P_TRACE(2, "Client thread " << this << " interrupted.");
		} catch (const exception &e) {
			P_TRACE(2, "Uncaught exception in ApplicationPoolServer client thread:\n"
				<< "   message: " << toString(args) << "\n"
				<< "   exception: " << e.what());
		}
		
                /* (1) */
		mutex::scoped_lock l(server.lock);
		ClientPtr myself(self.lock());
		if (myself != NULL) {
			server.clients.erase(myself);
		}
	}

 
この"get" や, "setMaxIdel"などは前回の記事で紹介した,以下の部分とリンクする.

virtual void setMax(unsigned int max) {
    MessageChannel channel(data->server);
    mutex::scoped_lock l(data->lock);
    channel.write("setMax", toString(max).c_str(), NULL);
}

これはinitChild内部のserver->connect()で呼び出されるメソッドである.
ふむ,なるほど.こうやってパラメータを設定したりするのか.
 
(1)
Clientが用済みになった場合,最終的にはself.lock()でweak_ptrからポインタを取り出して,clientsから削除している.
lock()の挙動についてはこちら.letsboost::weak_ptr
削除されれば参照カウンタが0になり,deleteされるはずである.
 

まとめ

  • 起動されたプロセスの役割
    • 決まったディスクプリタで待ち受け( 3 )
    • Apacheが生成したプロセスはその決まったディスクプリタに書き込み.
    • 待ち受けが解除され,通信するクライアントを生成し,片方のファイルディスクプリタをApache側へ渡す.
    • 生成されたクライアントはさらに通信を待ち受け
    • 待ち受けが解除され,読み込んだ文字列を元にプールしているApplicationPoolに対して操作を行う

 

次回は

ApplicationPoolServerのpoolメンバに何が入っているのか!
について.