php简易多进程tcp服务器模型
我最近在看 workerman,我看的是 3.4.1 版本。整体上大致梳理了一下 workerman 框架的执行流程。当然,我感觉盗用一个图说,更容易理解:
相信大家都看的明白,我也就大概说两句吧,这里master 进程创建 tcp 服务器,并启动 子进程 workers 去接受客户端的链接,处理逻辑。而子进程中,又采用的 epoll 方式,高效的处理连接事件。这里,我大致,简易的写几个类,放在一个文件中,方便大家去理解,workerman 的大致流程。当然,其中还要牵扯很多细节方面的东西,我在此,便不做过多的扩展。因为 workerman 已经在这了,写的很优秀,大家直接看源码即可。
1、Worker 类
class Worker
{
// 所有运行的进程的id
public static $workers = [];
// 事件轮询库对象
public static $loop = null;
// 服务器 socket
public $mainSocket = null;
// 连接的 socket
public $conn_socket = null;
/**
* 架构函数
* Worker constructor.
*/
public function __construct()
{
// fork 子进程
$this->forkWorker();
// 监管子进程
$this->monitor();
}
/**
* fork 子进程
*/
public function forkWorker()
{
// 获取主进程 ID
$mainPid = posix_getpid();
// 设置主进程的名称 方便通过 ps 命令查询
$this->setProcessTitle("master " . $mainPid . " process");
// 创建 tcp 服务器,并监听 2000 端口
$this->mainSocket = stream_socket_server("tcp://0.0.0.0:2000", $errno, $errstr);
// 设置为非阻塞模式
stream_set_blocking($this->mainSocket, 0);
// fork 出4个子进程
for($i = 0; $i < 4; $i++){
$this->forkOne();
}
}
/**
* fork 一个子进程
*/
public function forkOne()
{
// fork 子进程
$pid = pcntl_fork();
if($pid > 0){
// 父进程中记录 worker 子进程的 id
self::$workers[$pid] = $pid;
}else if(0 === $pid){
// 子进程 中设置子进程名称,方便 ps 查看
$this->setProcessTitle("child " . posix_getpid() . " process");
// 实例化轮询库
if(is_null(self::$loop)){
self::$loop = new Libevent();
}
// 添加轮询 回调 函数
self::$loop->add($this->mainSocket, [$this, 'acceptCb']);
// 启动事件轮询
self::$loop->loop();
exit(0);
}else{
// fork 失败退出
exit("fork one worker fail");
}
}
/**
* 监管子进程
*/
public function monitor()
{
while(1){
// 如果有信号到来,尝试触发信号处理函数
pcntl_signal_dispatch();
// 挂起进程,直到有子进程退出或者被信号打断
$status = 0;
$pid = pcntl_wait($status);
// 如果有信号到来,尝试触发信号处理函数
pcntl_signal_dispatch();
// 子进程退出信号
if($pid > 0){
// 如果不是正常退出,是被kill等杀掉的
if($status !== 0){
exit("worker {$pid} exit with status $status");
}
// 删除已经退出的子进程id
unset(self::$workers[$pid]);
// 重新 fork 一个子进程
$this->forkOne();
}
}
}
/**
* 接收连接 回调
* @param $socket
* @param $flag
* @param $base
*/
public function acceptCb($socket, $flag, $base)
{
// 接受连接
$this->conn_socket = @stream_socket_accept($socket, 0);
// 设置为非阻塞
stream_set_blocking($this->conn_socket, 0);
// 实例化连接对象
new Connection($this->conn_socket, self::$loop);
}
/**
* 设置当前进程的名称,在ps aux命令中有用
* 注意 需要php>=5.5或者安装了protitle扩展
* @param string $title
* @return void
*/
protected function setProcessTitle($title)
{
if (!empty($title)){
// 需要扩展
if(extension_loaded('proctitle') && function_exists('setproctitle')){
@setproctitle($title);
}else if (function_exists('cli_set_process_title')){
// >=php 5.5
cli_set_process_title($title);
}
}
}
}
这个类,主要就是 创建 tcp 服务器,并且监听 2000 端口。fork 出 4 个子进程,来应对客户端的链接。同时,主进程监听子进程的动态,有子进程完成任务退出后,立马补上新的 子进程。
2、Connetction 类
class Connection
{
// 连接的 socket
public $_socket = null;
/**
* 构造函数
* Connection constructor.
* @param $socket
* @param $libevent
*/
public function __construct($socket, $libevent)
{
$this->_socket = $socket;
// 添加读取回调
$libevent->add($socket, [$this, 'readCb']);
}
/**
* 读取内容回调函数
* @param $socket
* @param $flag
* @param $base
*/
public function readCb($socket, $flag, $base)
{
$buffer = @fread($socket, 1024);
if ($buffer === '' || $buffer === false) {
if ((feof($socket) || !is_resource($socket) || $buffer === false)) {
// 接收完成,清除轮询事件,关闭 socket 连接
$this->del();
return;
}
} else {
echo $buffer . PHP_EOL;
}
// 向客户端发送欢迎信息
fwrite($socket, "welcome " . (int)$this->_socket);
}
/**
* 关闭轮询
*/
public function del()
{
Worker::$loop->del($this->_socket);
}
}
每一个客户端的链接,都会实例化一个 connection 对象来处理这些链接。包括读取客户端发来的信息,以及发送给客户端一些信息。
3、Libevent 类
class Libevent
{
public $base = null;
public $events = [];
public function __construct()
{
// 创建并且初始事件
$this->base = event_base_new();
}
/**
* 添加事件轮询
* @param $socket
* @param $func
*/
public function add($socket, $func)
{
// 创建一个新的事件
$event = event_new();
// 准备想要在event_add中添加回调事件
event_set($event, $socket, EV_READ | EV_PERSIST, $func, $this->base);
// 关联事件到事件base
event_base_set($event, $this->base);
// 向指定的设置中添加一个执行事件
event_add($event);
// 记录事件
$this->events[(int)$socket] = $event;
}
/**
* 启动事件轮询
*/
public function loop()
{
event_base_loop($this->base);
}
/**
* 移除事件轮询
* @param $socket
*/
public function del($socket)
{
event_del($this->events[(int)$socket]);
unset($this->events[(int)$socket]);
@fclose($socket);
}
}
这个类,采用 php libevent 扩展,通过 epoll 模式高效的去接收客户端的请求。
测试方法如下 client.php
<?php
/**
* author: NickBai
* createTime: 2016/12/17 0017 下午 3:00
*/
$socket_client = stream_socket_client('tcp://127.0.0.1:2000', $errno, $errstr, 30);
fwrite($socket_client, "hello world!");
sleep(2);
$return = fread($socket_client, 1024);
echo "come from server : " . $return . PHP_EOL;
sleep(2);
fwrite($socket_client, "send again!");
$return = fread($socket_client, 1024);
echo "come from server : " . $return . PHP_EOL;
Worker 类完整 单文件如下 worker.php
<?php
class Worker
{
// 所有运行的进程的id
public static $workers = [];
// 事件轮询库对象
public static $loop = null;
// 服务器 socket
public $mainSocket = null;
// 连接的 socket
public $conn_socket = null;
/**
* 架构函数
* Worker constructor.
*/
public function __construct()
{
// fork 子进程
$this->forkWorker();
// 监管子进程
$this->monitor();
}
/**
* fork 子进程
*/
public function forkWorker()
{
// 获取主进程 ID
$mainPid = posix_getpid();
// 设置主进程的名称 方便通过 ps 命令查询
$this->setProcessTitle("master " . $mainPid . " process");
// 创建 tcp 服务器,并监听 2000 端口
$this->mainSocket = stream_socket_server("tcp://0.0.0.0:2000", $errno, $errstr);
// 设置为非阻塞模式
stream_set_blocking($this->mainSocket, 0);
// fork 出4个子进程
for($i = 0; $i < 4; $i++){
$this->forkOne();
}
}
/**
* fork 一个子进程
*/
public function forkOne()
{
// fork 子进程
$pid = pcntl_fork();
if($pid > 0){
// 父进程中记录 worker 子进程的 id
self::$workers[$pid] = $pid;
}else if(0 === $pid){
// 子进程 中设置子进程名称,方便 ps 查看
$this->setProcessTitle("child " . posix_getpid() . " process");
// 实例化轮询库
if(is_null(self::$loop)){
self::$loop = new Libevent();
}
// 添加轮询 回调 函数
self::$loop->add($this->mainSocket, [$this, 'acceptCb']);
// 启动事件轮询
self::$loop->loop();
exit(0);
}else{
// fork 失败退出
exit("fork one worker fail");
}
}
/**
* 监管子进程
*/
public function monitor()
{
while(1){
// 如果有信号到来,尝试触发信号处理函数
pcntl_signal_dispatch();
// 挂起进程,直到有子进程退出或者被信号打断
$status = 0;
$pid = pcntl_wait($status);
// 如果有信号到来,尝试触发信号处理函数
pcntl_signal_dispatch();
// 子进程退出信号
if($pid > 0){
// 如果不是正常退出,是被kill等杀掉的
if($status !== 0){
exit("worker {$pid} exit with status $status");
}
// 删除已经退出的子进程id
unset(self::$workers[$pid]);
// 重新 fork 一个子进程
$this->forkOne();
}
}
}
/**
* 接收连接 回调
* @param $socket
* @param $flag
* @param $base
*/
public function acceptCb($socket, $flag, $base)
{
// 接受连接
$this->conn_socket = @stream_socket_accept($socket, 0);
// 设置为非阻塞
stream_set_blocking($this->conn_socket, 0);
// 实例化连接对象
new Connection($this->conn_socket, self::$loop);
}
/**
* 设置当前进程的名称,在ps aux命令中有用
* 注意 需要php>=5.5或者安装了protitle扩展
* @param string $title
* @return void
*/
protected function setProcessTitle($title)
{
if (!empty($title)){
// 需要扩展
if(extension_loaded('proctitle') && function_exists('setproctitle')){
@setproctitle($title);
}else if (function_exists('cli_set_process_title')){
// >=php 5.5
cli_set_process_title($title);
}
}
}
}
class Connection
{
// 连接的 socket
public $_socket = null;
/**
* 构造函数
* Connection constructor.
* @param $socket
* @param $libevent
*/
public function __construct($socket, $libevent)
{
$this->_socket = $socket;
// 添加读取回调
$libevent->add($socket, [$this, 'readCb']);
}
/**
* 读取内容回调函数
* @param $socket
* @param $flag
* @param $base
*/
public function readCb($socket, $flag, $base)
{
$buffer = @fread($socket, 1024);
if ($buffer === '' || $buffer === false) {
if ((feof($socket) || !is_resource($socket) || $buffer === false)) {
// 接收完成,清除轮询事件,关闭 socket 连接
$this->del();
return;
}
} else {
echo $buffer . PHP_EOL;
}
// 向客户端发送欢迎信息
fwrite($socket, "welcome " . (int)$this->_socket);
}
/**
* 关闭轮询
*/
public function del()
{
Worker::$loop->del($this->_socket);
}
}
class Libevent
{
public $base = null;
public $events = [];
public function __construct()
{
// 创建并且初始事件
$this->base = event_base_new();
}
/**
* 添加事件轮询
* @param $socket
* @param $func
*/
public function add($socket, $func)
{
// 创建一个新的事件
$event = event_new();
// 准备想要在event_add中添加回调事件
event_set($event, $socket, EV_READ | EV_PERSIST, $func, $this->base);
// 关联事件到事件base
event_base_set($event, $this->base);
// 向指定的设置中添加一个执行事件
event_add($event);
// 记录事件
$this->events[(int)$socket] = $event;
}
/**
* 启动事件轮询
*/
public function loop()
{
event_base_loop($this->base);
}
/**
* 移除事件轮询
* @param $socket
*/
public function del($socket)
{
event_del($this->events[(int)$socket]);
unset($this->events[(int)$socket]);
@fclose($socket);
}
}
new Worker();
记住,一定要在 linux 下运行,并且保证你的 php > 5.4 ,以及你安装了 php libevent 扩展 和 pcntl 扩展