php简易多进程tcp服务器模型

作者: 白云飞 分类: php,workerman 发布时间: 2017-04-27 13:37 阅读:

我最近在看 workerman,我看的是 3.4.1 版本。整体上大致梳理了一下 workerman 框架的执行流程。当然,我感觉盗用一个图说,更容易理解:

 

相信大家都看的明白,我也就大概说两句吧,这里master 进程创建 tcp 服务器,并启动 子进程 workers 去接受客户端的链接,处理逻辑。而子进程中,又采用的 epoll 方式,高效的处理连接事件。这里,我大致,简易的写几个类,放在一个文件中,方便大家去理解,workerman 的大致流程。当然,其中还要牵扯很多细节方面的东西,我在此,便不做过多的扩展。因为 workerman 已经在这了,写的很优秀,大家直接看源码即可。

1、Worker 类

class Worker
{
    // 所有运行的进程的id
    public static $workers = [];
    // 事件轮询库对象
    public static $loop = null;
    // 服务器 socket
    public $mainSocket = null;
    // 连接的 socket
    public $conn_socket = null;
 
    /**
     * 架构函数
     * Worker constructor.
     */
    public function __construct()
    {  
        // fork 子进程
        $this->forkWorker();
        // 监管子进程
        $this->monitor();
    }
 
    /**
     * fork 子进程
     */
    public function forkWorker()
    {
        // 获取主进程 ID
        $mainPid = posix_getpid();
        // 设置主进程的名称  方便通过 ps 命令查询
        $this->setProcessTitle("master " . $mainPid . " process");
 
        // 创建 tcp 服务器,并监听 2000 端口
        $this->mainSocket = stream_socket_server("tcp://0.0.0.0:2000", $errno, $errstr);
        // 设置为非阻塞模式
        stream_set_blocking($this->mainSocket, 0);
 
        // fork 出4个子进程
        for($i = 0; $i < 4; $i++){
 
            $this->forkOne();
        }
         
    }
 
    /**
     * fork 一个子进程
     */
    public function forkOne()
    {
        // fork 子进程
        $pid = pcntl_fork();
        if($pid > 0){
            // 父进程中记录 worker 子进程的 id
            self::$workers[$pid] = $pid;
        }else if(0 === $pid){
            // 子进程 中设置子进程名称,方便 ps 查看
            $this->setProcessTitle("child " . posix_getpid() . " process");
 
            // 实例化轮询库
            if(is_null(self::$loop)){
                self::$loop = new Libevent();
            }
 
            // 添加轮询 回调 函数
            self::$loop->add($this->mainSocket, [$this, 'acceptCb']);
            // 启动事件轮询
            self::$loop->loop();
            exit(0);
        }else{
            // fork 失败退出
            exit("fork one worker fail");
        }
         
    }
 
    /**
     * 监管子进程
     */
    public function monitor()
    {
        while(1){
 
            // 如果有信号到来,尝试触发信号处理函数
            pcntl_signal_dispatch();
            // 挂起进程,直到有子进程退出或者被信号打断
            $status = 0;
            $pid = pcntl_wait($status);
            // 如果有信号到来,尝试触发信号处理函数
            pcntl_signal_dispatch();
         
            // 子进程退出信号
            if($pid > 0){
                 
                // 如果不是正常退出,是被kill等杀掉的
                if($status !== 0){
                    exit("worker {$pid} exit with status $status");
                }
                // 删除已经退出的子进程id
                unset(self::$workers[$pid]);
                // 重新 fork 一个子进程
                $this->forkOne();
            }  
        }
    }
 
    /**
     * 接收连接 回调
     * @param $socket
     * @param $flag
     * @param $base
     */
    public function acceptCb($socket, $flag, $base)
    {
        // 接受连接
        $this->conn_socket = @stream_socket_accept($socket, 0);
        // 设置为非阻塞
        stream_set_blocking($this->conn_socket, 0);
 
        // 实例化连接对象
        new Connection($this->conn_socket, self::$loop);
    }
 
    /**
     * 设置当前进程的名称,在ps aux命令中有用
     * 注意 需要php>=5.5或者安装了protitle扩展
     * @param string $title
     * @return void
     */
    protected function setProcessTitle($title)
    {
        if (!empty($title)){
            // 需要扩展
            if(extension_loaded('proctitle') && function_exists('setproctitle')){
                @setproctitle($title);
                 
            }else if (function_exists('cli_set_process_title')){
                // >=php 5.5
                cli_set_process_title($title);
            }
        }
    }
}

这个类,主要就是 创建 tcp 服务器,并且监听 2000 端口。fork 出 4 个子进程,来应对客户端的链接。同时,主进程监听子进程的动态,有子进程完成任务退出后,立马补上新的 子进程。

2、Connetction 类

class Connection
{
    // 连接的 socket
    public $_socket = null;
 
    /**
     * 构造函数
     * Connection constructor.
     * @param $socket
     * @param $libevent
     */
    public function __construct($socket, $libevent)
    {
        $this->_socket = $socket;
        // 添加读取回调
        $libevent->add($socket, [$this, 'readCb']);
    }
 
    /**
     * 读取内容回调函数
     * @param $socket
     * @param $flag
     * @param $base
     */
    public function readCb($socket, $flag, $base)
    {
        $buffer = @fread($socket, 1024);
 
        if ($buffer === '' || $buffer === false) {
            if ((feof($socket) || !is_resource($socket) || $buffer === false)) {
                // 接收完成,清除轮询事件,关闭 socket 连接
                $this->del();
                return;
            }
        } else {
            echo $buffer . PHP_EOL;
        }
        // 向客户端发送欢迎信息
        fwrite($socket, "welcome " . (int)$this->_socket); 
    }
 
    /**
     * 关闭轮询
     */
    public function del()
    {
        Worker::$loop->del($this->_socket);
    }
}

每一个客户端的链接,都会实例化一个 connection 对象来处理这些链接。包括读取客户端发来的信息,以及发送给客户端一些信息。

 

3、Libevent 类

class Libevent
{
    public $base = null;
    public $events = [];
 
    public function __construct()
    {
        // 创建并且初始事件
        $this->base = event_base_new();
    }
 
    /**
     * 添加事件轮询
     * @param $socket
     * @param $func
     */
    public function add($socket, $func)
    {
        // 创建一个新的事件
        $event = event_new();
        // 准备想要在event_add中添加回调事件
        event_set($event, $socket, EV_READ | EV_PERSIST, $func, $this->base);
        // 关联事件到事件base
        event_base_set($event, $this->base);
        // 向指定的设置中添加一个执行事件
        event_add($event);
 
        // 记录事件
        $this->events[(int)$socket] = $event;
    }
 
    /**
     * 启动事件轮询
     */
    public function loop()
    {
        event_base_loop($this->base);
    }
 
    /**
     * 移除事件轮询
     * @param $socket
     */
    public function del($socket)
    {
        event_del($this->events[(int)$socket]);
        unset($this->events[(int)$socket]);
        @fclose($socket);
    }
}

这个类,采用 php libevent 扩展,通过 epoll 模式高效的去接收客户端的请求。

 

测试方法如下 client.php

<?php
/**
 * author: NickBai
 * createTime: 2016/12/17 0017 下午 3:00
 */
$socket_client = stream_socket_client('tcp://127.0.0.1:2000', $errno, $errstr, 30);
fwrite($socket_client, "hello world!");
sleep(2);
$return = fread($socket_client, 1024);
echo "come from server : " . $return . PHP_EOL;
sleep(2);
  
fwrite($socket_client, "send again!");
$return = fread($socket_client, 1024);
echo "come from server : " . $return . PHP_EOL;

Worker 类完整 单文件如下 worker.php

<?php
class Worker
{
    // 所有运行的进程的id
    public static $workers = [];
    // 事件轮询库对象
    public static $loop = null;
    // 服务器 socket
    public $mainSocket = null;
    // 连接的 socket
    public $conn_socket = null;
 
    /**
     * 架构函数
     * Worker constructor.
     */
    public function __construct()
    {  
        // fork 子进程
        $this->forkWorker();
        // 监管子进程
        $this->monitor();
    }
 
    /**
     * fork 子进程
     */
    public function forkWorker()
    {
        // 获取主进程 ID
        $mainPid = posix_getpid();
        // 设置主进程的名称  方便通过 ps 命令查询
        $this->setProcessTitle("master " . $mainPid . " process");
 
        // 创建 tcp 服务器,并监听 2000 端口
        $this->mainSocket = stream_socket_server("tcp://0.0.0.0:2000", $errno, $errstr);
        // 设置为非阻塞模式
        stream_set_blocking($this->mainSocket, 0);
 
        // fork 出4个子进程
        for($i = 0; $i < 4; $i++){
 
            $this->forkOne();
        }
         
    }
 
    /**
     * fork 一个子进程
     */
    public function forkOne()
    {
        // fork 子进程
        $pid = pcntl_fork();
        if($pid > 0){
            // 父进程中记录 worker 子进程的 id
            self::$workers[$pid] = $pid;
        }else if(0 === $pid){
            // 子进程 中设置子进程名称,方便 ps 查看
            $this->setProcessTitle("child " . posix_getpid() . " process");
 
            // 实例化轮询库
            if(is_null(self::$loop)){
                self::$loop = new Libevent();
            }
 
            // 添加轮询 回调 函数
            self::$loop->add($this->mainSocket, [$this, 'acceptCb']);
            // 启动事件轮询
            self::$loop->loop();
            exit(0);
        }else{
            // fork 失败退出
            exit("fork one worker fail");
        }
         
    }
 
    /**
     * 监管子进程
     */
    public function monitor()
    {
        while(1){
 
            // 如果有信号到来,尝试触发信号处理函数
            pcntl_signal_dispatch();
            // 挂起进程,直到有子进程退出或者被信号打断
            $status = 0;
            $pid = pcntl_wait($status);
            // 如果有信号到来,尝试触发信号处理函数
            pcntl_signal_dispatch();
         
            // 子进程退出信号
            if($pid > 0){
                 
                // 如果不是正常退出,是被kill等杀掉的
                if($status !== 0){
                    exit("worker {$pid} exit with status $status");
                }
                // 删除已经退出的子进程id
                unset(self::$workers[$pid]);
                // 重新 fork 一个子进程
                $this->forkOne();
            }  
        }
    }
 
    /**
     * 接收连接 回调
     * @param $socket
     * @param $flag
     * @param $base
     */
    public function acceptCb($socket, $flag, $base)
    {
        // 接受连接
        $this->conn_socket = @stream_socket_accept($socket, 0);
        // 设置为非阻塞
        stream_set_blocking($this->conn_socket, 0);
 
        // 实例化连接对象
        new Connection($this->conn_socket, self::$loop);
    }
 
    /**
     * 设置当前进程的名称,在ps aux命令中有用
     * 注意 需要php>=5.5或者安装了protitle扩展
     * @param string $title
     * @return void
     */
    protected function setProcessTitle($title)
    {
        if (!empty($title)){
            // 需要扩展
            if(extension_loaded('proctitle') && function_exists('setproctitle')){
                @setproctitle($title);
                 
            }else if (function_exists('cli_set_process_title')){
                // >=php 5.5
                cli_set_process_title($title);
            }
        }
    }
}
 
class Connection
{
    // 连接的 socket
    public $_socket = null;
 
    /**
     * 构造函数
     * Connection constructor.
     * @param $socket
     * @param $libevent
     */
    public function __construct($socket, $libevent)
    {
        $this->_socket = $socket;
        // 添加读取回调
        $libevent->add($socket, [$this, 'readCb']);
    }
 
    /**
     * 读取内容回调函数
     * @param $socket
     * @param $flag
     * @param $base
     */
    public function readCb($socket, $flag, $base)
    {
        $buffer = @fread($socket, 1024);
 
        if ($buffer === '' || $buffer === false) {
            if ((feof($socket) || !is_resource($socket) || $buffer === false)) {
                // 接收完成,清除轮询事件,关闭 socket 连接
                $this->del();
                return;
            }
        } else {
            echo $buffer . PHP_EOL;
        }
        // 向客户端发送欢迎信息
        fwrite($socket, "welcome " . (int)$this->_socket); 
    }
 
    /**
     * 关闭轮询
     */
    public function del()
    {
        Worker::$loop->del($this->_socket);
    }
}
 
class Libevent
{
    public $base = null;
    public $events = [];
 
    public function __construct()
    {
        // 创建并且初始事件
        $this->base = event_base_new();
    }
 
    /**
     * 添加事件轮询
     * @param $socket
     * @param $func
     */
    public function add($socket, $func)
    {
        // 创建一个新的事件
        $event = event_new();
        // 准备想要在event_add中添加回调事件
        event_set($event, $socket, EV_READ | EV_PERSIST, $func, $this->base);
        // 关联事件到事件base
        event_base_set($event, $this->base);
        // 向指定的设置中添加一个执行事件
        event_add($event);
 
        // 记录事件
        $this->events[(int)$socket] = $event;
    }
 
    /**
     * 启动事件轮询
     */
    public function loop()
    {
        event_base_loop($this->base);
    }
 
    /**
     * 移除事件轮询
     * @param $socket
     */
    public function del($socket)
    {
        event_del($this->events[(int)$socket]);
        unset($this->events[(int)$socket]);
        @fclose($socket);
    }
}
 
new Worker();

 

记住,一定要在 linux 下运行,并且保证你的 php > 5.4 ,以及你安装了 php libevent 扩展 和 pcntl 扩展

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

发表评论

电子邮件地址不会被公开。